As machine learning projects grow in complexity, managing models, data, experiments, and deployments becomes increasingly challenging. Version control for AI goes beyond traditional code versioning—it encompasses models, datasets, experiments, hyperparameters, and the entire training environment. This comprehensive guide explores best practices and tools for managing the complete ML lifecycle.
## The Versioning Challenge in ML

### What Needs to Be Versioned?
Unlike traditional software, ML projects have multiple moving parts:
- Code: Training scripts, model architectures, preprocessing
- Data: Training datasets, validation splits, test sets
- Models: Trained weights, checkpoints, exported formats
- Experiments: Hyperparameters, metrics, visualizations
- Environment: Dependencies, Docker images, hardware configs
- Configuration: Training configs, feature definitions
```python
# Example: everything that defines a model
model_version = {
    'code_commit': 'abc123',
    'data_version': 'dataset_v2.3',
    'hyperparameters': {
        'learning_rate': 0.001,
        'batch_size': 32,
        'epochs': 100
    },
    'environment': {
        'python': '3.9',
        'pytorch': '2.0',
        'cuda': '11.8'
    },
    'metrics': {
        'accuracy': 0.95,
        'f1_score': 0.93
    },
    'model_path': 's3://models/model_v1.2.3.pt'
}
```
### Why Traditional Git Isn't Enough
Git excels at code but struggles with ML artifacts:
- Large files: Models and datasets can be gigabytes
- Binary formats: Git can't diff model weights meaningfully
- Metadata: Experiment tracking needs structured data
- Lineage: Understanding how a model was created
## Data Version Control with DVC

### Setting Up DVC
```bash
# Initialize DVC in a Git repository
pip install dvc
dvc init

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
```
### Tracking Data and Models
```bash
# Track a large dataset
dvc add data/training_data.csv

# This creates:
# - data/training_data.csv.dvc (small pointer file for Git)
# - a .gitignore entry for the actual file
git add data/training_data.csv.dvc data/.gitignore
git commit -m "Add training data v1"

# Track model files
dvc add models/model_v1.pt
git add models/model_v1.pt.dvc
git commit -m "Add trained model v1"

# Push the actual files to remote storage
dvc push
```
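Each `.dvc` pointer is a small, human-readable file that Git can diff and merge while the heavyweight artifact lives in remote storage. A sketch of what the pointer contains (the hash and size are illustrative, and the exact fields vary slightly across DVC versions):

```yaml
# data/training_data.csv.dvc
outs:
- md5: a304afb96060aad90176268345e10355
  size: 11870932
  path: training_data.csv
```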
### DVC Pipelines
```yaml
# dvc.yaml - define reproducible ML pipelines
stages:
  prepare:
    cmd: python src/prepare_data.py
    deps:
      - src/prepare_data.py
      - data/raw/
    outs:
      - data/processed/
    params:
      - prepare.split_ratio
      - prepare.seed
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/
    params:
      - train.learning_rate
      - train.batch_size
      - train.epochs
    outs:
      - models/model.pt
    metrics:
      - metrics.json:
          cache: false
    plots:
      - plots/training_curves.json:
          x: epoch
          y: loss
  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pt
      - data/processed/test/
    metrics:
      - evaluation_metrics.json:
          cache: false
```
```yaml
# params.yaml - centralized parameters
prepare:
  split_ratio: 0.2
  seed: 42

train:
  learning_rate: 0.001
  batch_size: 32
  epochs: 100
  model_type: resnet50
```
### Running and Reproducing Pipelines
```bash
# Run the full pipeline
dvc repro

# Run only a specific stage
dvc repro train

# Compare metrics across versions
dvc metrics show
dvc metrics diff

# Visualize the pipeline as a DAG
dvc dag
```
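Because Git stores the pointers and DVC stores the artifacts, checking out any past commit and syncing DVC restores the exact code-plus-data-plus-model state of that commit (the tag name below is illustrative):

```bash
# Return to an earlier release
git checkout v1.0
dvc checkout   # restore the data and model files recorded at that commit
dvc pull       # fetch any artifacts missing from the local cache
```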
## Experiment Tracking

### MLflow Integration
```python
import mlflow
import mlflow.pytorch

# create_model, train_epoch, validate and evaluate are assumed to be defined elsewhere

# Set tracking URI and experiment
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("image-classification")

def train_model(config):
    with mlflow.start_run():
        # Log parameters
        mlflow.log_params({
            'learning_rate': config['lr'],
            'batch_size': config['batch_size'],
            'model_type': config['model_type'],
            'optimizer': config['optimizer']
        })

        # Train
        model = create_model(config)
        for epoch in range(config['epochs']):
            train_loss, train_acc = train_epoch(model)
            val_loss, val_acc = validate(model)

            # Log metrics
            mlflow.log_metrics({
                'train_loss': train_loss,
                'train_acc': train_acc,
                'val_loss': val_loss,
                'val_acc': val_acc
            }, step=epoch)

        # Log final metrics
        test_metrics = evaluate(model)
        mlflow.log_metrics({
            'test_accuracy': test_metrics['accuracy'],
            'test_f1': test_metrics['f1']
        })

        # Log model
        mlflow.pytorch.log_model(model, "model")

        # Log artifacts
        mlflow.log_artifact("confusion_matrix.png")
        mlflow.log_artifact("config.yaml")

        # Set tags
        mlflow.set_tags({
            'framework': 'pytorch',
            'task': 'classification',
            'dataset': 'imagenet'
        })
```
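Once runs are logged, the tracking server can be queried programmatically. A short sketch, assuming MLflow 2.x, that finds the strongest runs by the `val_acc` metric logged above:

```python
import mlflow

mlflow.set_tracking_uri("http://mlflow-server:5000")

# Returns a pandas DataFrame, best validation accuracy first
runs = mlflow.search_runs(
    experiment_names=["image-classification"],
    order_by=["metrics.val_acc DESC"],
    max_results=5
)
print(runs[["run_id", "metrics.val_acc", "params.learning_rate"]])
```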
### Weights & Biases Integration
```python
import torch
import wandb

# create_model, train_step, validate, get_lr, optimizer, train_loader,
# val_loader and class_names are assumed to be defined elsewhere

def train_with_wandb(config):
    # Initialize run
    run = wandb.init(
        project="image-classification",
        config=config,
        tags=["resnet", "baseline"]
    )

    model = create_model(config)

    # Watch model for gradient tracking
    wandb.watch(model, log="all", log_freq=100)

    for epoch in range(config['epochs']):
        for batch_idx, (images, labels) in enumerate(train_loader):
            loss, acc = train_step(model, images, labels)

            # Log per-batch metrics
            wandb.log({
                'batch_loss': loss,
                'batch_acc': acc
            })

        # Log epoch metrics
        val_metrics = validate(model)
        wandb.log({
            'epoch': epoch,
            'val_loss': val_metrics['loss'],
            'val_accuracy': val_metrics['accuracy'],
            'learning_rate': get_lr(optimizer)
        })

        # Log sample predictions
        if epoch % 10 == 0:
            log_predictions(model, val_loader)

    # Log final model
    model_artifact = wandb.Artifact(
        name=f"model-{run.id}",
        type="model",
        description="Trained classification model"
    )
    model_artifact.add_file("model.pt")
    run.log_artifact(model_artifact)
    run.finish()

def log_predictions(model, loader):
    """Log sample predictions as a table."""
    table = wandb.Table(columns=["image", "prediction", "ground_truth"])
    model.eval()
    with torch.no_grad():
        for images, labels in loader:
            predictions = model(images).argmax(dim=1)
            for img, pred, label in zip(images[:10], predictions[:10], labels[:10]):
                table.add_data(
                    wandb.Image(img),
                    class_names[pred.item()],
                    class_names[label.item()]
                )
            break  # one batch of samples is enough
    wandb.log({"predictions": table})
```
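Logged artifacts are versioned automatically, so a later run can retrieve an exact model build by name. A minimal sketch (the artifact name follows the `model-{run.id}` pattern used above; `abc123` is a placeholder run ID):

```python
import wandb

run = wandb.init(project="image-classification", job_type="evaluation")

# Aliases like ":latest" or ":v3" pin a specific artifact version
artifact = run.use_artifact("model-abc123:latest")
model_dir = artifact.download()  # local directory containing model.pt
run.finish()
```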
## Model Registry

### MLflow Model Registry
```python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model (run_id comes from a previous training run)
result = mlflow.register_model(
    model_uri=f"runs:/{run_id}/model",
    name="image-classifier"
)

# Transition model stages
client.transition_model_version_stage(
    name="image-classifier",
    version=result.version,
    stage="Staging"
)

# After validation, promote to production
client.transition_model_version_stage(
    name="image-classifier",
    version=result.version,
    stage="Production"
)

# Add description and tags
client.update_model_version(
    name="image-classifier",
    version=result.version,
    description="ResNet50 trained on ImageNet, achieves 95% accuracy"
)
client.set_model_version_tag(
    name="image-classifier",
    version=result.version,
    key="validation_status",
    value="approved"
)

# Load model by stage
def load_production_model():
    model = mlflow.pytorch.load_model(
        model_uri="models:/image-classifier/Production"
    )
    return model

# List all versions
versions = client.search_model_versions("name='image-classifier'")
for v in versions:
    print(f"Version {v.version}: stage={v.current_stage}, status={v.status}")
```
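Recent MLflow releases deprecate stage transitions in favor of model version aliases; a sketch assuming MLflow >= 2.3:

```python
# Alias-based promotion (the newer alternative to stages)
client.set_registered_model_alias("image-classifier", "champion", result.version)

# Load whichever version currently holds the alias
model = mlflow.pytorch.load_model("models:/image-classifier@champion")
```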
### Custom Model Registry
```python
import hashlib
import json
import os
import shutil
from datetime import datetime

class ModelRegistry:
    """Custom model registry for version control."""

    def __init__(self, storage_path):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)  # ensure the registry root exists
        self.metadata_file = os.path.join(storage_path, "registry.json")
        self.metadata = self._load_metadata()

    def _load_metadata(self):
        if os.path.exists(self.metadata_file):
            with open(self.metadata_file) as f:
                return json.load(f)
        return {"models": {}}

    def _save_metadata(self):
        with open(self.metadata_file, 'w') as f:
            json.dump(self.metadata, f, indent=2)

    def _compute_hash(self, model_path):
        """Compute SHA256 hash of a model file."""
        sha256 = hashlib.sha256()
        with open(model_path, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                sha256.update(chunk)
        return sha256.hexdigest()

    def register(self, name, model_path, metrics, config, tags=None):
        """Register a new model version."""
        if name not in self.metadata["models"]:
            self.metadata["models"][name] = {"versions": []}

        # Compute version number
        versions = self.metadata["models"][name]["versions"]
        version = len(versions) + 1

        # Compute model hash and check for duplicates
        model_hash = self._compute_hash(model_path)
        for v in versions:
            if v["hash"] == model_hash:
                print(f"Identical model already registered as version {v['version']}")
                return v["version"]

        # Copy model to registry storage
        dest_path = os.path.join(
            self.storage_path, name, f"v{version}", "model.pt"
        )
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
        shutil.copy(model_path, dest_path)

        # Record metadata
        version_info = {
            "version": version,
            "path": dest_path,
            "hash": model_hash,
            "metrics": metrics,
            "config": config,
            "tags": tags or {},
            "created_at": datetime.now().isoformat(),
            "stage": "development"
        }
        versions.append(version_info)
        self._save_metadata()
        return version

    def promote(self, name, version, stage):
        """Promote a model to a new stage."""
        versions = self.metadata["models"][name]["versions"]

        # Demote any existing model in this stage
        for v in versions:
            if v["stage"] == stage:
                v["stage"] = "archived"

        # Promote the requested version
        for v in versions:
            if v["version"] == version:
                v["stage"] = stage
                break
        self._save_metadata()

    def get(self, name, version=None, stage=None):
        """Get a model by version or stage."""
        versions = self.metadata["models"][name]["versions"]
        if version is not None:
            for v in versions:
                if v["version"] == version:
                    return v
        if stage is not None:
            for v in versions:
                if v["stage"] == stage:
                    return v
        return None

    def compare(self, name, version1, version2):
        """Compare two model versions."""
        v1 = self.get(name, version=version1)
        v2 = self.get(name, version=version2)
        comparison = {
            "metrics_diff": {},
            "config_diff": {}
        }

        # Compare metrics
        for key in set(v1["metrics"]) | set(v2["metrics"]):
            m1 = v1["metrics"].get(key)
            m2 = v2["metrics"].get(key)
            if m1 != m2:
                comparison["metrics_diff"][key] = {
                    "v1": m1,
                    "v2": m2,
                    "diff": m2 - m1 if m1 is not None and m2 is not None else None
                }

        # Compare config
        for key in set(v1["config"]) | set(v2["config"]):
            c1 = v1["config"].get(key)
            c2 = v2["config"].get(key)
            if c1 != c2:
                comparison["config_diff"][key] = {"v1": c1, "v2": c2}
        return comparison
```
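Typical usage of this registry, with values echoing the earlier examples (the storage path is illustrative):

```python
registry = ModelRegistry("registry_storage")

version = registry.register(
    name="image-classifier",
    model_path="models/model_v1.pt",
    metrics={"accuracy": 0.95, "f1_score": 0.93},
    config={"learning_rate": 0.001, "batch_size": 32},
    tags={"framework": "pytorch"}
)

registry.promote("image-classifier", version, stage="production")
prod = registry.get("image-classifier", stage="production")
print(prod["path"], prod["metrics"])
```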
## Environment Management

### Docker for Reproducibility
```dockerfile
# Dockerfile for the ML training environment
FROM nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04

# System dependencies (python-is-python3 makes the "python" command available)
RUN apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    python-is-python3 \
    git \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt /app/
RUN pip install -r /app/requirements.txt

# Copy code
COPY src/ /app/src/
COPY configs/ /app/configs/

WORKDIR /app

# Environment variables
ENV PYTHONPATH=/app
ENV CUDA_VISIBLE_DEVICES=0

ENTRYPOINT ["python", "src/train.py"]
```
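Building and running the image pins the whole toolchain. A sketch (the image tag is illustrative; `--gpus all` requires the NVIDIA Container Toolkit on the host):

```bash
docker build -t ml-train:1.0 .

docker run --rm --gpus all \
  -v "$PWD/data:/app/data" \
  -v "$PWD/models:/app/models" \
  ml-train:1.0
```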
```yaml
# docker-compose.yml for training
version: '3.8'

services:
  train:
    build: .
    volumes:
      - ./data:/app/data
      - ./models:/app/models
      - ./logs:/app/logs
    environment:
      - WANDB_API_KEY=${WANDB_API_KEY}
      - MLFLOW_TRACKING_URI=${MLFLOW_TRACKING_URI}
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
```
### Conda Environments
```yaml
# environment.yml
name: ml-training
channels:
  - pytorch
  - nvidia
  - conda-forge
  - defaults
dependencies:
  - python=3.10
  - pytorch=2.0
  - torchvision
  - pytorch-cuda=11.8  # PyTorch 2.x uses pytorch-cuda (nvidia channel) rather than cudatoolkit
  - numpy
  - pandas
  - scikit-learn
  - pip:
      - mlflow
      - wandb
      - dvc[s3]
```
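The environment can then be created, activated, and re-exported with exact resolved versions for archival:

```bash
conda env create -f environment.yml
conda activate ml-training

# Snapshot the fully resolved environment alongside the model
conda env export --no-builds > environment.lock.yml
```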
### Requirements Lock Files
```python
# generate_requirements.py
import json
import subprocess

def generate_locked_requirements():
    """Generate locked requirements with exact versions."""
    # Get installed packages
    result = subprocess.run(
        ['pip', 'list', '--format', 'json'],
        capture_output=True, text=True
    )
    packages = json.loads(result.stdout)

    # Write locked requirements
    with open('requirements.lock', 'w') as f:
        for pkg in sorted(packages, key=lambda x: x['name'].lower()):
            f.write(f"{pkg['name']}=={pkg['version']}\n")

    def run(cmd):
        try:
            return subprocess.run(cmd, capture_output=True, text=True).stdout.strip()
        except FileNotFoundError:
            return None  # command not available on this machine

    # Also capture system info; nvidia-smi exposes the driver version
    # (the CUDA toolkit version appears in its default banner instead)
    system_info = {
        'python': run(['python', '--version']),
        'gpu_driver': run(['nvidia-smi', '--query-gpu=driver_version', '--format=csv,noheader']),
        'platform': run(['uname', '-a'])
    }
    with open('environment_info.json', 'w') as f:
        json.dump(system_info, f, indent=2)

if __name__ == '__main__':
    generate_locked_requirements()
```
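Running the script before each training job and committing its output keeps an exact record of the environment each model was trained in:

```bash
python generate_requirements.py
git add requirements.lock environment_info.json

# Later, on a fresh machine:
pip install -r requirements.lock
```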
## Lineage Tracking

### Data Lineage
```python
import json
import subprocess
from datetime import datetime

class DataLineageTracker:
    """Track data transformations and dependencies."""

    def __init__(self):
        self.lineage = []

    def record_operation(self, operation, inputs, output, params=None):
        """Record a data transformation."""
        self.lineage.append({
            'timestamp': datetime.now().isoformat(),
            'operation': operation,
            'inputs': inputs,
            'output': output,
            'params': params or {},
            'code_version': self._get_git_commit()
        })

    def _get_git_commit(self):
        try:
            result = subprocess.run(
                ['git', 'rev-parse', 'HEAD'],
                capture_output=True, text=True
            )
            if result.returncode != 0:
                return None  # not inside a Git repository
            return result.stdout.strip()
        except OSError:
            return None

    def save(self, path):
        with open(path, 'w') as f:
            json.dump(self.lineage, f, indent=2)

    def visualize(self):
        """Generate a lineage graph."""
        import graphviz
        dot = graphviz.Digraph()
        for i, op in enumerate(self.lineage):
            node_id = f"op_{i}"
            dot.node(node_id, op['operation'])
            for inp in op['inputs']:
                dot.edge(inp, node_id)
            dot.edge(node_id, op['output'])
        return dot

# Usage (load_data, preprocess and split_data are assumed to be defined elsewhere)
tracker = DataLineageTracker()

# Track data loading
raw_data = load_data("data/raw.csv")
tracker.record_operation(
    "load_data",
    inputs=["data/raw.csv"],
    output="raw_data"
)

# Track preprocessing
processed_data = preprocess(raw_data, normalize=True)
tracker.record_operation(
    "preprocess",
    inputs=["raw_data"],
    output="processed_data",
    params={"normalize": True}
)

# Track splitting
train, test = split_data(processed_data, ratio=0.8)
tracker.record_operation(
    "split_data",
    inputs=["processed_data"],
    output="train_test_split",
    params={"ratio": 0.8}
)
```
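After training, the lineage can be persisted next to the model and rendered for review (rendering requires the Graphviz system package in addition to the Python bindings):

```python
tracker.save("lineage.json")

graph = tracker.visualize()
graph.render("lineage_graph", format="png")  # writes lineage_graph.png plus the DOT source
```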
### Model Lineage
```python
import json

class ModelCard:
    """Document model provenance and characteristics."""

    def __init__(self, name, version):
        self.name = name
        self.version = version
        self.card = {
            'model_details': {},
            'training_data': {},
            'evaluation': {},
            'ethical_considerations': {},
            'caveats': {}
        }

    def set_model_details(self, architecture, framework, task,
                          input_format, output_format):
        self.card['model_details'] = {
            'name': self.name,
            'version': self.version,
            'architecture': architecture,
            'framework': framework,
            'task': task,
            'input_format': input_format,
            'output_format': output_format
        }

    def set_training_data(self, dataset_name, dataset_version,
                          num_examples, preprocessing):
        self.card['training_data'] = {
            'dataset': dataset_name,
            'version': dataset_version,
            'num_examples': num_examples,
            'preprocessing': preprocessing
        }

    def set_evaluation(self, metrics, test_dataset, evaluation_date):
        self.card['evaluation'] = {
            'metrics': metrics,
            'test_dataset': test_dataset,
            'evaluation_date': evaluation_date
        }

    def set_lineage(self, parent_model=None, training_run_id=None,
                    code_commit=None, data_commit=None):
        self.card['lineage'] = {
            'parent_model': parent_model,
            'training_run_id': training_run_id,
            'code_commit': code_commit,
            'data_commit': data_commit
        }

    def export(self, path):
        with open(path, 'w') as f:
            json.dump(self.card, f, indent=2)

    def to_markdown(self):
        md = f"# Model Card: {self.name} v{self.version}\n\n"
        md += "## Model Details\n"
        for k, v in self.card['model_details'].items():
            md += f"- {k}: {v}\n"
        md += "\n## Training Data\n"
        for k, v in self.card['training_data'].items():
            md += f"- {k}: {v}\n"
        md += "\n## Evaluation\n"
        for k, v in self.card['evaluation'].get('metrics', {}).items():
            md += f"- {k}: {v}\n"
        return md
```
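Filling in a card for the model from the earlier examples might look like this (the concrete values are illustrative):

```python
card = ModelCard("image-classifier", "1.2.3")
card.set_model_details(
    architecture="resnet50",
    framework="pytorch",
    task="classification",
    input_format="224x224 RGB image",
    output_format="class probabilities"
)
card.set_training_data(
    dataset_name="imagenet",
    dataset_version="dataset_v2.3",
    num_examples=1281167,
    preprocessing="resize, normalize"
)
card.set_evaluation(
    metrics={"accuracy": 0.95, "f1": 0.93},
    test_dataset="imagenet-val",
    evaluation_date="2024-01-15"
)
card.set_lineage(code_commit="abc123", data_commit="dataset_v2.3")
card.export("model_card.json")
print(card.to_markdown())
```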
## Best Practices

### Project Structure
```
my-ml-project/
├── .dvc/                  # DVC configuration
├── .git/                  # Git repository
├── data/
│   ├── raw/               # Raw data (DVC tracked)
│   ├── processed/         # Processed data (DVC tracked)
│   └── .gitignore
├── models/
│   ├── checkpoints/       # Training checkpoints
│   └── exported/          # Export-ready models
├── src/
│   ├── data/              # Data loading and preprocessing
│   ├── models/            # Model architectures
│   ├── training/          # Training scripts
│   └── evaluation/        # Evaluation scripts
├── configs/
│   ├── train_config.yaml
│   └── model_config.yaml
├── notebooks/             # Exploration notebooks
├── tests/                 # Unit tests
├── dvc.yaml               # DVC pipeline
├── params.yaml            # Parameters
├── requirements.txt
├── Dockerfile
└── README.md
```
### Semantic Versioning for Models
```python
class ModelVersioner:
    """Semantic versioning for ML models."""

    @staticmethod
    def get_version_type(old_metrics, new_metrics, config_changed):
        """
        Determine the version bump type:
        - MAJOR: breaking changes (different input/output format)
        - MINOR: significant improvements (>5% relative metric gain)
        - PATCH: bug fixes, minor improvements
        """
        if config_changed.get('input_format') or config_changed.get('output_format'):
            return 'major'

        metric_improvement = (
            new_metrics['accuracy'] - old_metrics['accuracy']
        ) / old_metrics['accuracy']

        if metric_improvement > 0.05:
            return 'minor'
        return 'patch'

    @staticmethod
    def bump_version(current_version, bump_type):
        major, minor, patch = map(int, current_version.split('.'))
        if bump_type == 'major':
            return f"{major + 1}.0.0"
        elif bump_type == 'minor':
            return f"{major}.{minor + 1}.0"
        else:
            return f"{major}.{minor}.{patch + 1}"
```
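For example, a jump from 0.90 to 0.95 accuracy is a ~5.6% relative improvement, so it triggers a minor bump:

```python
bump = ModelVersioner.get_version_type(
    old_metrics={"accuracy": 0.90},
    new_metrics={"accuracy": 0.95},
    config_changed={}
)
print(bump)                                        # 'minor'
print(ModelVersioner.bump_version("1.2.3", bump))  # '1.3.0'
```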
## Conclusion
Effective version control for AI projects requires a holistic approach that goes beyond traditional code versioning. By combining tools like DVC and MLflow with careful project organization, you can ensure reproducibility, traceability, and collaboration throughout the ML lifecycle.
Key takeaways:
- Version everything: Code, data, models, configs, and environments
- Use specialized tools: DVC for data, MLflow/W&B for experiments
- Establish a model registry: Track model versions and stages
- Track lineage: Understand how models and data were created
- Containerize environments: Docker ensures reproducibility
- Document thoroughly: Model cards capture essential information
With proper version control, you can confidently reproduce experiments, roll back to previous versions, collaborate effectively, and maintain a clear audit trail of your ML development process.