# Data Pipeline Architecture
The Hokusai data pipeline is a Metaflow-based system that measures how contributed data improves a machine learning model. It produces cryptographically verifiable attestations suitable for blockchain integration.
## System Overview

### Core Components

#### 1. Pipeline Orchestrator (`hokusai_pipeline.py`)

The main Metaflow pipeline that coordinates all steps:
```python
class HokusaiPipeline(FlowSpec):
    """Main pipeline orchestrating model evaluation"""

    @step
    def start(self):
        """Initialize pipeline with configuration"""
        self.config = load_config()
        self.mlflow_run_id = start_mlflow_run()
        self.next(self.load_baseline_model)

    @step
    def load_baseline_model(self):
        """Load or create baseline model"""
        # Implementation details...
        self.next(self.validate_contributed_data)
```
#### 2. Data Integration Module

Handles all data processing operations:
```python
class DataIntegrator:
    """Manages data loading, validation, and integration"""

    def validate_schema(self, data: pd.DataFrame) -> bool:
        """Ensure data matches the expected schema"""

    def detect_pii(self, data: pd.DataFrame) -> Dict[str, List[str]]:
        """Scan for personally identifiable information"""

    def integrate_datasets(
        self,
        baseline: pd.DataFrame,
        contributed: pd.DataFrame
    ) -> pd.DataFrame:
        """Merge contributed data with the baseline training set"""
```
**Key Features:**

- **Multi-format Support**: CSV, JSON, Parquet
- **Schema Validation**: Ensures data consistency
- **PII Detection**: Automatic scanning for sensitive data (a regex sketch follows this list)
- **Deduplication**: Removes redundant entries
- **Quality Scoring**: Assesses the value of a data contribution
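
As a rough illustration of the PII scan, a regex-based version of `detect_pii` might look like the following; the patterns are illustrative assumptions, and a production scanner would typically use a dedicated PII library:

```python
import re
from typing import Dict, List

import pandas as pd

# Illustrative patterns only; real deployments usually rely on a dedicated
# PII library rather than hand-rolled regexes.
PII_PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "phone": re.compile(r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b"),
}

def detect_pii(data: pd.DataFrame) -> Dict[str, List[str]]:
    """Return a mapping of column name -> PII types detected in that column."""
    report: Dict[str, List[str]] = {}
    for column in data.select_dtypes(include="object").columns:
        values = data[column].dropna().astype(str)
        found = [
            pii_type
            for pii_type, pattern in PII_PATTERNS.items()
            if values.str.contains(pattern).any()
        ]
        if found:
            report[column] = found
    return report
```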
#### 3. Model Training Module

Manages model training with contributed data:
```python
class ModelTrainer:
    """Handles model training operations"""

    def train_model(
        self,
        data: pd.DataFrame,
        config: TrainingConfig
    ) -> Model:
        """Train a model with the specified configuration"""

    def save_checkpoint(self, model: Model, iteration: int):
        """Save a training checkpoint for recovery"""
```
**Training Features:**

- **Deterministic Training**: Fixed seeds for reproducibility (see the seeding sketch below)
- **Hyperparameter Management**: Configurable training parameters
- **Checkpoint System**: Resume training after failures
- **Multi-framework Support**: TensorFlow, PyTorch, scikit-learn
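
`set_random_seeds` (called in Step 1 below) is not shown in this document; a plausible version covers all three supported frameworks, seeding each one only if it is installed:

```python
import os
import random

import numpy as np

def set_random_seeds(seed: int) -> None:
    """Seed every RNG the pipeline may touch so runs are reproducible."""
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)

    # Framework-specific seeding, applied only when the framework is present.
    try:
        import torch
        torch.manual_seed(seed)
    except ImportError:
        pass
    try:
        import tensorflow as tf
        tf.random.set_seed(seed)
    except ImportError:
        pass
```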
#### 4. Evaluation Module

Compares model performance:
```python
class Evaluator:
    """Evaluates and compares model performance"""

    def evaluate_model(
        self,
        model: Model,
        test_data: pd.DataFrame
    ) -> Dict[str, float]:
        """Calculate performance metrics"""

    def compare_models(
        self,
        baseline_metrics: Dict,
        new_metrics: Dict
    ) -> DeltaResult:
        """Compute performance improvements"""
```
**Metrics Calculated:**

- **Classification**: Accuracy, Precision, Recall, F1, AUROC (a scikit-learn sketch follows)
- **Regression**: MSE, MAE, R², MAPE
- **Custom Metrics**: Domain-specific evaluations
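
Since scikit-learn is the configured framework, the classification metrics map directly onto `sklearn.metrics`. A sketch of `evaluate_model`, assuming a binary task with a `label` column (both assumptions, not from this document):

```python
from typing import Dict

import pandas as pd
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)

def evaluate_model(model, test_data: pd.DataFrame) -> Dict[str, float]:
    """Compute the classification metrics listed above on a held-out set."""
    X = test_data.drop(columns=["label"])  # "label" is an assumed column name
    y = test_data["label"]
    preds = model.predict(X)
    return {
        "accuracy": accuracy_score(y, preds),
        "precision": precision_score(y, preds, average="weighted"),
        "recall": recall_score(y, preds, average="weighted"),
        "f1_score": f1_score(y, preds, average="weighted"),
        # AUROC here assumes a binary task and a predict_proba-capable model.
        "auroc": roc_auc_score(y, model.predict_proba(X)[:, 1]),
    }
```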
#### 5. Attestation Generator

Creates verifiable outputs:
```python
class AttestationGenerator:
    """Generates ZK-proof-ready attestations"""

    def generate_attestation(
        self,
        delta_result: DeltaResult,
        contributor_info: ContributorInfo,
        pipeline_metadata: PipelineMetadata
    ) -> Attestation:
        """Create a cryptographically verifiable attestation"""
```
## Pipeline Flow Details

### Step 1: Initialization
```python
@step
def start(self):
    """Pipeline initialization"""
    # Load configuration
    self.config = load_pipeline_config()

    # Set random seeds for reproducibility
    set_random_seeds(self.config.random_seed)

    # Initialize MLFlow tracking
    self.mlflow_run_id = init_mlflow_tracking()

    self.next(self.load_baseline_model)
```
### Step 2: Load Baseline Model
```python
@step
def load_baseline_model(self):
    """Load existing baseline or create a mock for dry runs"""
    if self.dry_run:
        self.baseline_model = create_mock_baseline()
    else:
        self.baseline_model = load_model(self.baseline_path)

    # Log baseline metrics
    log_baseline_metrics(self.baseline_model)

    self.next(self.validate_contributed_data)
```
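
`create_mock_baseline` is not defined in this document; for a dry run, a trivially fitted scikit-learn `DummyClassifier` would be enough to exercise the rest of the pipeline (an assumption, not the project's actual mock):

```python
import numpy as np
from sklearn.dummy import DummyClassifier

def create_mock_baseline() -> DummyClassifier:
    """Return a minimally fitted stand-in model for dry runs."""
    model = DummyClassifier(strategy="most_frequent")
    # Fit on a tiny synthetic sample so predict()/predict_proba() work downstream.
    model.fit(np.zeros((4, 2)), np.array([0, 0, 1, 1]))
    return model
```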
### Step 3: Data Validation
```python
@step
def validate_contributed_data(self):
    """Validate and prepare contributed data"""
    # Load data
    data = load_contributed_data(self.contributed_data_path)

    # Validate schema
    validate_schema(data)

    # Check for PII and hash any flagged fields
    pii_report = detect_pii(data)
    if pii_report.has_pii:
        data = hash_pii_fields(data, pii_report)

    # Quality checks
    self.data_quality_score = assess_data_quality(data)

    self.validated_data = data
    self.next(self.integrate_data)
```
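
`hash_pii_fields` is not defined here either. Consistent with the "scanning and hashing" behavior described under Security Considerations below, a minimal salted-hash sketch (the dict-shaped report follows the `DataIntegrator.detect_pii` signature; the salt handling is an assumption):

```python
import hashlib
from typing import Dict, List

import pandas as pd

def hash_pii_fields(
    data: pd.DataFrame,
    pii_report: Dict[str, List[str]],
    salt: str = "hokusai",  # illustrative; a real salt would come from config
) -> pd.DataFrame:
    """Replace every value in a flagged column with a salted SHA-256 digest."""
    data = data.copy()
    for column in pii_report:
        data[column] = data[column].astype(str).map(
            lambda value: hashlib.sha256((salt + value).encode()).hexdigest()
        )
    return data
```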
### Step 4: Data Integration
```python
@step
def integrate_data(self):
    """Merge contributed data with the training set"""
    # Load baseline training data
    baseline_data = load_baseline_training_data()

    # Integrate datasets
    integrated_data = integrate_datasets(
        baseline_data,
        self.validated_data
    )

    # Apply sampling if needed
    if len(integrated_data) > self.config.max_samples:
        integrated_data = stratified_sample(
            integrated_data,
            self.config.max_samples
        )

    self.training_data = integrated_data
    self.next(self.train_model)
```
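
`stratified_sample` can be built directly on scikit-learn's `train_test_split`; a sketch, assuming a `label` column to stratify on (the column name is an assumption):

```python
import pandas as pd
from sklearn.model_selection import train_test_split

def stratified_sample(
    data: pd.DataFrame,
    max_samples: int,
    label_column: str = "label",  # assumed column name
) -> pd.DataFrame:
    """Downsample to max_samples rows while preserving class proportions."""
    if len(data) <= max_samples:
        return data
    sampled, _ = train_test_split(
        data,
        train_size=max_samples,
        stratify=data[label_column],
        random_state=42,
    )
    return sampled
```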
### Step 5: Model Training
```python
@step
def train_model(self):
    """Train an improved model on the integrated data"""
    # Initialize trainer
    trainer = ModelTrainer(self.config.training_config)

    # Train model
    self.improved_model = trainer.train(
        self.training_data,
        callbacks=[
            MLFlowCallback(),
            CheckpointCallback()
        ]
    )

    # Save model artifacts
    save_model_artifacts(self.improved_model)

    self.next(self.evaluate_models)
```
### Step 6: Evaluation
```python
@step
def evaluate_models(self):
    """Evaluate both models on the benchmark"""
    # Load evaluation dataset (stored on self so compute_delta can reuse it)
    self.eval_data = load_evaluation_data()

    # Evaluate baseline
    self.baseline_metrics = evaluate_model(
        self.baseline_model,
        self.eval_data
    )

    # Evaluate improved model
    self.improved_metrics = evaluate_model(
        self.improved_model,
        self.eval_data
    )

    # Log to MLFlow
    log_evaluation_results(
        self.baseline_metrics,
        self.improved_metrics
    )

    self.next(self.compute_delta)
```
### Step 7: Delta Computation
```python
@step
def compute_delta(self):
    """Calculate performance improvements"""
    # Compute per-metric deltas
    self.delta_result = compute_metric_deltas(
        self.baseline_metrics,
        self.improved_metrics
    )

    # Test statistical significance
    self.significance = test_significance(
        self.baseline_metrics,
        self.improved_metrics,
        self.eval_data
    )

    # Aggregate improvement score
    self.delta_one_score = calculate_delta_one_score(
        self.delta_result
    )

    self.next(self.generate_attestation)
```
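
Neither `compute_metric_deltas` nor `calculate_delta_one_score` is defined in this section. A plausible reading is a per-metric difference plus a weighted aggregate; the uniform default weighting below is an assumption:

```python
from typing import Dict, Optional

def compute_metric_deltas(
    baseline: Dict[str, float],
    improved: Dict[str, float],
) -> Dict[str, float]:
    """Per-metric improvement; positive values mean the new model is better."""
    return {name: improved[name] - baseline[name] for name in baseline}

def calculate_delta_one_score(
    deltas: Dict[str, float],
    weights: Optional[Dict[str, float]] = None,  # illustrative weighting
) -> float:
    """Collapse per-metric deltas into a single contribution score."""
    weights = weights or {name: 1.0 for name in deltas}
    total = sum(weights.values())
    return sum(weights[name] * deltas[name] for name in deltas) / total
```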
### Step 8: Attestation Generation
```python
@step
def generate_attestation(self):
    """Create the final attestation output"""
    # Prepare contributor info
    contributor = ContributorInfo(
        address=self.contributor_address,
        data_hash=hash_data(self.validated_data),
        sample_count=len(self.validated_data)
    )

    # Generate attestation
    attestation = generate_attestation(
        delta_result=self.delta_result,
        contributor_info=contributor,
        pipeline_metadata=self.get_metadata()
    )

    # Write output
    write_attestation_output(attestation, self.output_dir)

    self.next(self.end)
```
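
The exact attestation format is not specified in this section. One common way to make such an output verifiable is to hash a canonical JSON serialization of the attestation body, as in the sketch below; the field names are illustrative, loosely mirroring `ContributorInfo` and `DeltaResult`:

```python
import hashlib
import json
from dataclasses import asdict, dataclass

@dataclass
class Attestation:
    delta_metrics: dict         # per-metric improvements
    contributor_address: str    # contributor identity used on-chain
    data_hash: str              # hash of the validated contributed data
    pipeline_version: str
    attestation_hash: str = ""  # filled in by finalize()

def finalize(attestation: Attestation) -> Attestation:
    """Hash a canonical (sorted-key, no-whitespace) JSON encoding of the body."""
    body = asdict(attestation)
    body.pop("attestation_hash")
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    attestation.attestation_hash = hashlib.sha256(canonical.encode()).hexdigest()
    return attestation
```

Canonical serialization matters here: any party holding the same fields can recompute the digest and compare it against the published value.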
## Configuration System

### Pipeline Configuration
```yaml
pipeline:
  name: "hokusai-evaluation"
  version: "1.0.0"
  random_seed: 42

metaflow:
  max_workers: 4
  max_retry_count: 3

training:
  framework: "scikit-learn"
  model_type: "RandomForestClassifier"
  hyperparameters:
    n_estimators: 100
    max_depth: 10

evaluation:
  test_split: 0.2
  stratify: true
  metrics:
    - accuracy
    - precision
    - recall
    - f1_score
    - auroc
```
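
A sketch of how `load_pipeline_config` (called in Step 1) might parse this file, assuming PyYAML; the dataclass layout is illustrative:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

import yaml

@dataclass
class TrainingConfig:
    framework: str
    model_type: str
    hyperparameters: Dict[str, Any] = field(default_factory=dict)

@dataclass
class PipelineConfig:
    name: str
    version: str
    random_seed: int
    training: TrainingConfig
    metrics: List[str] = field(default_factory=list)

def load_pipeline_config(path: str = "./config/pipeline.yaml") -> PipelineConfig:
    """Parse the YAML above into typed configuration objects."""
    with open(path) as f:
        raw = yaml.safe_load(f)
    return PipelineConfig(
        name=raw["pipeline"]["name"],
        version=raw["pipeline"]["version"],
        random_seed=raw["pipeline"]["random_seed"],
        training=TrainingConfig(**raw["training"]),
        metrics=raw["evaluation"]["metrics"],
    )
```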
### Environment Variables
```bash
# Core settings
HOKUSAI_CONFIG_PATH=./config/pipeline.yaml
HOKUSAI_OUTPUT_DIR=./outputs
HOKUSAI_LOG_LEVEL=INFO

# MLFlow settings
MLFLOW_TRACKING_URI=./mlruns
MLFLOW_EXPERIMENT_NAME=hokusai-evaluation

# Performance settings
METAFLOW_MEMORY=8192
METAFLOW_CPU=4
```
## Error Handling

### Retry Logic
```python
@retry(times=3, minutes_between_retries=1)
@step
def train_model(self):
    """Training with automatic retry on failure"""
    try:
        self.model = train_model(self.data)
    except MemoryError:
        # Reduce batch size, then re-raise so @retry runs the step again
        self.config.batch_size //= 2
        raise
    except Exception as e:
        # Log the error and fail the step
        log_error(e)
        raise
```
### Recovery Mechanisms

- **Checkpointing**: Resume from the last successful step
- **Partial Results**: Save intermediate outputs
- **Graceful Degradation**: Continue with reduced functionality
## Performance Optimization

### 1. Data Processing

- **Chunked Reading**: Process large files in chunks (sketched below)
- **Parallel Validation**: Multi-threaded data validation
- **Memory Mapping**: Efficient handling of large files
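
A sketch of what chunked reading looks like with pandas; the function name and the expected-columns check are illustrative:

```python
from typing import List

import pandas as pd

def validate_in_chunks(
    path: str,
    expected_columns: List[str],
    chunk_size: int = 100_000,
) -> int:
    """Stream a large CSV in bounded-memory chunks; returns the total row count."""
    total_rows = 0
    for chunk in pd.read_csv(path, chunksize=chunk_size):
        if list(chunk.columns) != expected_columns:
            raise ValueError(f"unexpected columns in {path}: {list(chunk.columns)}")
        total_rows += len(chunk)
    return total_rows
```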
### 2. Model Training

- **GPU Acceleration**: Automatic GPU detection and usage
- **Distributed Training**: Multi-node support via Metaflow
- **Mixed Precision**: FP16 training where supported
### 3. Caching Strategy

- **Model Cache**: Reuse loaded models
- **Data Cache**: Cache processed datasets
- **Result Cache**: Store evaluation results
## Monitoring and Observability

### Logging
```python
# Structured logging throughout the pipeline
logger.info("Training started", extra={
    "model_type": self.config.model_type,
    "data_size": len(self.training_data),
    "run_id": self.mlflow_run_id
})
```
### Metrics Collection

- **Pipeline Metrics**: Step duration and memory usage (see the sketch below)
- **Model Metrics**: Training loss, validation scores
- **System Metrics**: CPU, memory, disk I/O
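
One lightweight way to capture step duration and memory is a context manager around each step body; the `psutil`-based approach and the `step_metrics` name are assumptions:

```python
import logging
import time
from contextlib import contextmanager

import psutil

logger = logging.getLogger("hokusai.pipeline")

@contextmanager
def step_metrics(step_name: str):
    """Log wall-clock duration and resident-memory delta for a step."""
    process = psutil.Process()
    start_time = time.monotonic()
    start_rss = process.memory_info().rss
    try:
        yield
    finally:
        logger.info("step finished", extra={
            "step": step_name,
            "duration_s": round(time.monotonic() - start_time, 3),
            "rss_delta_mb": (process.memory_info().rss - start_rss) / 2**20,
        })
```

Used as `with step_metrics("train_model"): ...` inside a step.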
### MLFlow Integration

- **Experiment Tracking**: All runs are logged (see the sketch below)
- **Artifact Storage**: Models and data saved as artifacts
- **Metric Visualization**: Real-time dashboards
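
The calls behind this are the standard MLFlow tracking API; a sketch of how `log_evaluation_results` (used in Step 6) might record both models' metrics on the run opened during initialization:

```python
import mlflow

def log_evaluation_results(
    baseline_metrics: dict,
    improved_metrics: dict,
) -> None:
    """Log both models' metrics to the active MLFlow run, prefixed by model."""
    for name, value in baseline_metrics.items():
        mlflow.log_metric(f"baseline_{name}", value)
    for name, value in improved_metrics.items():
        mlflow.log_metric(f"improved_{name}", value)
```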
## Security Considerations

### Data Security

- **PII Detection**: Automatic scanning and hashing
- **Encryption**: Sensitive data encrypted at rest
- **Access Control**: Role-based permissions

### Pipeline Security

- **Input Validation**: Strict schema enforcement
- **Sandboxing**: Isolated execution environments
- **Audit Logging**: Complete activity trails
## Scalability

### Horizontal Scaling

- **Metaflow**: Native support for distributed execution
- **Data Partitioning**: Process subsets in parallel
- **Model Parallelism**: Distribute a model across nodes

### Vertical Scaling

- **Resource Allocation**: Configurable CPU/memory
- **Batch Processing**: Adjustable batch sizes
- **Streaming**: Handle arbitrarily large datasets
## Next Steps

- **Configuration Guide**: detailed configuration options
- **Data Validation**: data formats and validation
- **Smart Contracts**: attestation and verification
- **API Reference**: API documentation