As machine learning projects grow in complexity, managing models, data, experiments, and deployments becomes increasingly challenging. Version control for AI goes beyond traditional code versioning—it encompasses models, datasets, experiments, hyperparameters, and the entire training environment. This comprehensive guide explores best practices and tools for managing the complete ML lifecycle.

The Versioning Challenge in ML

What Needs to Be Versioned?

Unlike traditional software, ML projects have multiple moving parts:

  1. Code: Training scripts, model architectures, preprocessing
  2. Data: Training datasets, validation splits, test sets
  3. Models: Trained weights, checkpoints, exported formats
  4. Experiments: Hyperparameters, metrics, visualizations
  5. Environment: Dependencies, Docker images, hardware configs
  6. Configuration: Training configs, feature definitions

```python
# Example: everything that defines a model version
model_version = {
    'code_commit': 'abc123',
    'data_version': 'dataset_v2.3',
    'hyperparameters': {
        'learning_rate': 0.001,
        'batch_size': 32,
        'epochs': 100
    },
    'environment': {
        'python': '3.9',
        'pytorch': '2.0',
        'cuda': '11.8'
    },
    'metrics': {
        'accuracy': 0.95,
        'f1_score': 0.93
    },
    'model_path': 's3://models/model_v1.2.3.pt'
}
```

Why Traditional Git Isn't Enough

Git excels at code but struggles with ML artifacts:

  • Large files: Models and datasets can be gigabytes
  • Binary formats: Git can't diff model weights meaningfully
  • Metadata: Experiment tracking needs structured data
  • Lineage: Understanding how a model was created

Data Version Control with DVC

Setting Up DVC

```bash
# Initialize DVC in a git repository
pip install dvc
dvc init

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
```

Tracking Data and Models

```bash
# Track a large dataset
dvc add data/training_data.csv

# This creates:
# - data/training_data.csv.dvc (small pointer file for git)
# - a .gitignore entry for the actual file
git add data/training_data.csv.dvc data/.gitignore
git commit -m "Add training data v1"

# Track model files
dvc add models/model_v1.pt
git add models/model_v1.pt.dvc
git commit -m "Add trained model v1"

# Push the tracked files to remote storage
dvc push
```
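
With data under DVC control, older versions stay retrievable through Git history. Below is a small sketch using DVC's Python API to read a tracked file at a specific revision; the `v1` tag is a hypothetical Git tag marking the commit above:

```python
import dvc.api

# Read the dataset as it existed at Git revision "v1"
# ("v1" is a hypothetical tag; any commit hash or branch also works)
data_v1 = dvc.api.read("data/training_data.csv", rev="v1")

# Stream a large file instead of loading it into memory at once
with dvc.api.open("data/training_data.csv", rev="v1") as f:
    header = f.readline()
```

On the command line, `git checkout v1` followed by `dvc checkout` restores the same version into the working tree.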

DVC Pipelines

```yaml
# dvc.yaml - Define reproducible ML pipelines
stages:
  prepare:
    cmd: python src/prepare_data.py
    deps:
      - src/prepare_data.py
      - data/raw/
    outs:
      - data/processed/
    params:
      - prepare.split_ratio
      - prepare.seed

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/
    params:
      - train.learning_rate
      - train.batch_size
      - train.epochs
    outs:
      - models/model.pt
    metrics:
      - metrics.json:
          cache: false
    plots:
      - plots/training_curves.json:
          x: epoch
          y: loss

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pt
      - data/processed/test/
    metrics:
      - evaluation_metrics.json:
          cache: false
```

```yaml
# params.yaml - Centralized parameters
prepare:
  split_ratio: 0.2
  seed: 42

train:
  learning_rate: 0.001
  batch_size: 32
  epochs: 100
  model_type: resnet50
```
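
Stage scripts can read these values straight from params.yaml so the pipeline and the code never drift apart. A minimal sketch, assuming it runs from the repository root; `dvc.api.params_show()` loads the params file (plain `yaml.safe_load` works just as well):

```python
import dvc.api

# Returns the contents of params.yaml as a nested dict
params = dvc.api.params_show()

lr = params["train"]["learning_rate"]       # 0.001
batch_size = params["train"]["batch_size"]  # 32
epochs = params["train"]["epochs"]          # 100
```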

Running and Reproducing Pipelines

```bash
# Run the full pipeline
dvc repro

# Run only a specific stage (plus anything it depends on)
dvc repro train

# Compare metrics across versions
dvc metrics show
dvc metrics diff

# Visualize the pipeline as a DAG
dvc dag
```

Experiment Tracking

MLflow Integration

```python
import mlflow
import mlflow.pytorch

# Point the client at the tracking server
mlflow.set_tracking_uri("http://mlflow-server:5000")

# Group runs under a named experiment
mlflow.set_experiment("image-classification")

def train_model(config):
    # create_model, train_epoch, validate, and evaluate are
    # project-specific helpers assumed to exist
    with mlflow.start_run():
        # Log parameters
        mlflow.log_params({
            'learning_rate': config['lr'],
            'batch_size': config['batch_size'],
            'model_type': config['model_type'],
            'optimizer': config['optimizer']
        })

        # Train
        model = create_model(config)
        for epoch in range(config['epochs']):
            train_loss, train_acc = train_epoch(model)
            val_loss, val_acc = validate(model)

            # Log per-epoch metrics
            mlflow.log_metrics({
                'train_loss': train_loss,
                'train_acc': train_acc,
                'val_loss': val_loss,
                'val_acc': val_acc
            }, step=epoch)

        # Log final metrics
        test_metrics = evaluate(model)
        mlflow.log_metrics({
            'test_accuracy': test_metrics['accuracy'],
            'test_f1': test_metrics['f1']
        })

        # Log the model itself
        mlflow.pytorch.log_model(model, "model")

        # Log artifacts
        mlflow.log_artifact("confusion_matrix.png")
        mlflow.log_artifact("config.yaml")

        # Set tags
        mlflow.set_tags({
            'framework': 'pytorch',
            'task': 'classification',
            'dataset': 'imagenet'
        })
```
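
Once runs are logged this way, they can be compared programmatically rather than only in the UI. A short sketch using `mlflow.search_runs`, which returns a pandas DataFrame; the experiment, parameter, and metric names match the training code above:

```python
import mlflow

# Fetch all runs in the experiment, best validation accuracy first
runs = mlflow.search_runs(
    experiment_names=["image-classification"],
    order_by=["metrics.val_acc DESC"]
)

# Columns follow the "params.*" / "metrics.*" naming convention
print(runs[["run_id", "params.learning_rate", "metrics.val_acc"]].head())
```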

Weights & Biases Integration

```python
import torch
import wandb

# create_model, train_step, validate, get_lr, optimizer, train_loader,
# val_loader, and class_names are project-specific and assumed to exist

def train_with_wandb(config):
    # Initialize a run
    run = wandb.init(
        project="image-classification",
        config=config,
        tags=["resnet", "baseline"]
    )

    model = create_model(config)

    # Watch the model for gradient and parameter tracking
    wandb.watch(model, log="all", log_freq=100)

    for epoch in range(config['epochs']):
        for batch_idx, (images, labels) in enumerate(train_loader):
            loss, acc = train_step(model, images, labels)

            # Log per-batch metrics
            wandb.log({
                'batch_loss': loss,
                'batch_acc': acc
            })

        # Log epoch-level metrics
        val_metrics = validate(model)
        wandb.log({
            'epoch': epoch,
            'val_loss': val_metrics['loss'],
            'val_accuracy': val_metrics['accuracy'],
            'learning_rate': get_lr(optimizer)
        })

        # Log sample predictions every 10 epochs
        if epoch % 10 == 0:
            log_predictions(model, val_loader)

    # Log the final model as a versioned artifact
    model_artifact = wandb.Artifact(
        name=f"model-{run.id}",
        type="model",
        description="Trained classification model"
    )
    model_artifact.add_file("model.pt")
    run.log_artifact(model_artifact)

    run.finish()

def log_predictions(model, loader):
    """Log sample predictions as a table."""
    table = wandb.Table(columns=["image", "prediction", "ground_truth"])
    model.eval()
    with torch.no_grad():
        for images, labels in loader:
            predictions = model(images).argmax(dim=1)
            for img, pred, label in zip(images[:10], predictions[:10], labels[:10]):
                table.add_data(
                    wandb.Image(img),
                    class_names[pred.item()],
                    class_names[label.item()]
                )
            break  # one batch of samples is enough
    wandb.log({"predictions": table})
```
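
Because the model was logged as an artifact, any later run can pull a specific version of it by name. A brief sketch; `model-abc123` stands in for the `model-{run.id}` name generated above, and `:latest` is W&B's built-in alias for the newest version:

```python
import wandb

run = wandb.init(project="image-classification", job_type="inference")

# "model-abc123" is a hypothetical artifact name from a previous run;
# ":latest" resolves to the newest version (":v0", ":v1", ... also work)
artifact = run.use_artifact("model-abc123:latest", type="model")
model_dir = artifact.download()  # local directory containing model.pt

run.finish()
```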

Model Registry

MLflow Model Registry

```python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model from a completed run (run_id comes from the training run)
result = mlflow.register_model(
    model_uri=f"runs:/{run_id}/model",
    name="image-classifier"
)

# Transition model stages
client.transition_model_version_stage(
    name="image-classifier",
    version=result.version,
    stage="Staging"
)

# After validation, promote to production
client.transition_model_version_stage(
    name="image-classifier",
    version=result.version,
    stage="Production"
)

# Add a description and tags
client.update_model_version(
    name="image-classifier",
    version=result.version,
    description="ResNet50 trained on ImageNet, achieves 95% accuracy"
)

client.set_model_version_tag(
    name="image-classifier",
    version=result.version,
    key="validation_status",
    value="approved"
)

# Load the current production model by stage
def load_production_model():
    model = mlflow.pytorch.load_model(
        model_uri="models:/image-classifier/Production"
    )
    return model

# List all versions
versions = client.search_model_versions("name='image-classifier'")
for v in versions:
    print(f"Version {v.version}: stage={v.current_stage}, status={v.status}")
```
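
In practice, stage transitions are usually gated on metrics rather than done by hand. A sketch of a simple promotion check, assuming the version's source run logged a `test_accuracy` metric as in the training code earlier; the 0.9 threshold is illustrative:

```python
import mlflow
from mlflow.tracking import MlflowClient

def promote_if_good(name, version, threshold=0.9):
    """Promote a model version to Production only if it clears a metric bar."""
    client = MlflowClient()
    mv = client.get_model_version(name=name, version=version)

    # Look up the metrics logged by the run that produced this version
    run = mlflow.get_run(mv.run_id)
    accuracy = run.data.metrics.get("test_accuracy", 0.0)

    if accuracy >= threshold:
        client.transition_model_version_stage(
            name=name, version=version, stage="Production"
        )
        return True
    return False
```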

Custom Model Registry

```python
import hashlib
import json
import os
import shutil
from datetime import datetime

class ModelRegistry:
    """Custom model registry for version control."""

    def __init__(self, storage_path):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)
        self.metadata_file = os.path.join(storage_path, "registry.json")
        self.metadata = self._load_metadata()

    def _load_metadata(self):
        if os.path.exists(self.metadata_file):
            with open(self.metadata_file) as f:
                return json.load(f)
        return {"models": {}}

    def _save_metadata(self):
        with open(self.metadata_file, 'w') as f:
            json.dump(self.metadata, f, indent=2)

    def _compute_hash(self, model_path):
        """Compute the SHA256 hash of a model file."""
        sha256 = hashlib.sha256()
        with open(model_path, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                sha256.update(chunk)
        return sha256.hexdigest()

    def register(self, name, model_path, metrics, config, tags=None):
        """Register a new model version."""
        if name not in self.metadata["models"]:
            self.metadata["models"][name] = {"versions": []}

        # Compute the next version number
        versions = self.metadata["models"][name]["versions"]
        version = len(versions) + 1

        # Deduplicate on content hash
        model_hash = self._compute_hash(model_path)
        for v in versions:
            if v["hash"] == model_hash:
                print(f"Identical model already registered as version {v['version']}")
                return v["version"]

        # Copy the model into registry storage
        dest_path = os.path.join(
            self.storage_path, name, f"v{version}", "model.pt"
        )
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
        shutil.copy(model_path, dest_path)

        # Record metadata
        version_info = {
            "version": version,
            "path": dest_path,
            "hash": model_hash,
            "metrics": metrics,
            "config": config,
            "tags": tags or {},
            "created_at": datetime.now().isoformat(),
            "stage": "development"
        }
        versions.append(version_info)
        self._save_metadata()
        return version

    def promote(self, name, version, stage):
        """Promote a model to a new stage."""
        versions = self.metadata["models"][name]["versions"]

        # Archive whatever currently holds this stage
        for v in versions:
            if v["stage"] == stage:
                v["stage"] = "archived"

        # Promote the requested version
        for v in versions:
            if v["version"] == version:
                v["stage"] = stage
                break

        self._save_metadata()

    def get(self, name, version=None, stage=None):
        """Get a model by version number or by stage."""
        versions = self.metadata["models"][name]["versions"]
        if version:
            for v in versions:
                if v["version"] == version:
                    return v
        if stage:
            for v in versions:
                if v["stage"] == stage:
                    return v
        return None

    def compare(self, name, version1, version2):
        """Compare two model versions."""
        v1 = self.get(name, version=version1)
        v2 = self.get(name, version=version2)

        comparison = {"metrics_diff": {}, "config_diff": {}}

        # Compare metrics
        for key in set(v1["metrics"]) | set(v2["metrics"]):
            m1 = v1["metrics"].get(key)
            m2 = v2["metrics"].get(key)
            if m1 != m2:
                comparison["metrics_diff"][key] = {
                    "v1": m1,
                    "v2": m2,
                    "diff": m2 - m1 if m1 is not None and m2 is not None else None
                }

        # Compare config
        for key in set(v1["config"]) | set(v2["config"]):
            c1 = v1["config"].get(key)
            c2 = v2["config"].get(key)
            if c1 != c2:
                comparison["config_diff"][key] = {"v1": c1, "v2": c2}

        return comparison
```
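
A quick usage sketch of the registry above; the file paths, metrics, and configs are illustrative:

```python
registry = ModelRegistry("registry_storage")

# Register two candidate models
v1 = registry.register(
    name="image-classifier",
    model_path="models/model_v1.pt",
    metrics={"accuracy": 0.93, "f1": 0.91},
    config={"learning_rate": 0.001, "batch_size": 32},
)
v2 = registry.register(
    name="image-classifier",
    model_path="models/model_v2.pt",
    metrics={"accuracy": 0.95, "f1": 0.93},
    config={"learning_rate": 0.0005, "batch_size": 32},
)

# Promote the better version and inspect what changed between the two
registry.promote("image-classifier", v2, stage="production")
print(registry.compare("image-classifier", v1, v2))
```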

Environment Management

Docker for Reproducibility

```dockerfile
# Dockerfile for ML training environment
FROM nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04

# System dependencies
RUN apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt /app/
RUN pip install -r /app/requirements.txt

# Copy code
COPY src/ /app/src/
COPY configs/ /app/configs/

WORKDIR /app

# Environment variables
ENV PYTHONPATH=/app
ENV CUDA_VISIBLE_DEVICES=0

ENTRYPOINT ["python", "src/train.py"]
```

```yaml
# docker-compose.yml for training
version: '3.8'

services:
  train:
    build: .
    volumes:
      - ./data:/app/data
      - ./models:/app/models
      - ./logs:/app/logs
    environment:
      - WANDB_API_KEY=${WANDB_API_KEY}
      - MLFLOW_TRACKING_URI=${MLFLOW_TRACKING_URI}
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
```

Conda Environments

```yaml
# environment.yml
name: ml-training

channels:
  - pytorch
  - nvidia
  - conda-forge
  - defaults

dependencies:
  - python=3.10
  - pytorch=2.0
  - torchvision
  - pytorch-cuda=11.8  # CUDA package for PyTorch 2.0 (older releases used cudatoolkit)
  - numpy
  - pandas
  - scikit-learn
  - pip
  - pip:
      - mlflow
      - wandb
      - dvc[s3]
```

Requirements Lock Files

```python
# generate_requirements.py
import json
import subprocess

def generate_locked_requirements():
    """Generate locked requirements with exact versions."""
    # Get installed packages
    result = subprocess.run(
        ['pip', 'list', '--format', 'json'],
        capture_output=True, text=True
    )
    packages = json.loads(result.stdout)

    # Write locked requirements
    with open('requirements.lock', 'w') as f:
        for pkg in sorted(packages, key=lambda x: x['name']):
            f.write(f"{pkg['name']}=={pkg['version']}\n")

    # Also capture system info; note nvidia-smi's query interface exposes
    # the driver version (the CUDA version only appears in its default output)
    system_info = {
        'python': subprocess.run(
            ['python', '--version'],
            capture_output=True, text=True
        ).stdout.strip(),
        'gpu_driver': subprocess.run(
            ['nvidia-smi', '--query-gpu=driver_version', '--format=csv,noheader'],
            capture_output=True, text=True
        ).stdout.strip(),
        'platform': subprocess.run(
            ['uname', '-a'],
            capture_output=True, text=True
        ).stdout.strip()
    }

    with open('environment_info.json', 'w') as f:
        json.dump(system_info, f, indent=2)
```

Lineage Tracking

Data Lineage

```python
import json
import subprocess
from datetime import datetime

class DataLineageTracker:
    """Track data transformations and dependencies."""

    def __init__(self):
        self.lineage = []

    def record_operation(self, operation, inputs, output, params=None):
        """Record a data transformation."""
        self.lineage.append({
            'timestamp': datetime.now().isoformat(),
            'operation': operation,
            'inputs': inputs,
            'output': output,
            'params': params or {},
            'code_version': self._get_git_commit()
        })

    def _get_git_commit(self):
        try:
            result = subprocess.run(
                ['git', 'rev-parse', 'HEAD'],
                capture_output=True, text=True
            )
            return result.stdout.strip()
        except OSError:
            return None

    def save(self, path):
        with open(path, 'w') as f:
            json.dump(self.lineage, f, indent=2)

    def visualize(self):
        """Generate a lineage graph (requires the graphviz package)."""
        import graphviz

        dot = graphviz.Digraph()
        for i, op in enumerate(self.lineage):
            node_id = f"op_{i}"
            dot.node(node_id, op['operation'])
            for inp in op['inputs']:
                dot.edge(inp, node_id)
            dot.edge(node_id, op['output'])
        return dot

# Usage (load_data, preprocess, and split_data are project-specific helpers)
tracker = DataLineageTracker()

# Track data loading
raw_data = load_data("data/raw.csv")
tracker.record_operation(
    "load_data",
    inputs=["data/raw.csv"],
    output="raw_data"
)

# Track preprocessing
processed_data = preprocess(raw_data, normalize=True)
tracker.record_operation(
    "preprocess",
    inputs=["raw_data"],
    output="processed_data",
    params={"normalize": True}
)

# Track splitting
train, test = split_data(processed_data, ratio=0.8)
tracker.record_operation(
    "split_data",
    inputs=["processed_data"],
    output="train_test_split",
    params={"ratio": 0.8}
)
```

Model Lineage

```python
import json

class ModelCard:
    """Document model provenance and characteristics."""

    def __init__(self, name, version):
        self.name = name
        self.version = version
        self.card = {
            'model_details': {},
            'training_data': {},
            'evaluation': {},
            'ethical_considerations': {},
            'caveats': {}
        }

    def set_model_details(self, architecture, framework, task,
                          input_format, output_format):
        self.card['model_details'] = {
            'name': self.name,
            'version': self.version,
            'architecture': architecture,
            'framework': framework,
            'task': task,
            'input_format': input_format,
            'output_format': output_format
        }

    def set_training_data(self, dataset_name, dataset_version,
                          num_examples, preprocessing):
        self.card['training_data'] = {
            'dataset': dataset_name,
            'version': dataset_version,
            'num_examples': num_examples,
            'preprocessing': preprocessing
        }

    def set_evaluation(self, metrics, test_dataset, evaluation_date):
        self.card['evaluation'] = {
            'metrics': metrics,
            'test_dataset': test_dataset,
            'evaluation_date': evaluation_date
        }

    def set_lineage(self, parent_model=None, training_run_id=None,
                    code_commit=None, data_commit=None):
        self.card['lineage'] = {
            'parent_model': parent_model,
            'training_run_id': training_run_id,
            'code_commit': code_commit,
            'data_commit': data_commit
        }

    def export(self, path):
        with open(path, 'w') as f:
            json.dump(self.card, f, indent=2)

    def to_markdown(self):
        md = f"# Model Card: {self.name} v{self.version}\n\n"
        md += "## Model Details\n"
        for k, v in self.card['model_details'].items():
            md += f"- {k}: {v}\n"
        md += "\n## Training Data\n"
        for k, v in self.card['training_data'].items():
            md += f"- {k}: {v}\n"
        md += "\n## Evaluation\n"
        for k, v in self.card['evaluation'].get('metrics', {}).items():
            md += f"- {k}: {v}\n"
        return md
```
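
A short usage sketch of the card; all values are illustrative:

```python
card = ModelCard("image-classifier", "1.2.0")
card.set_model_details(
    architecture="ResNet50",
    framework="pytorch",
    task="classification",
    input_format="224x224 RGB image",
    output_format="1000-class probability vector"
)
card.set_training_data("imagenet", "dataset_v2.3", 1281167, "resize + normalize")
card.set_evaluation(
    metrics={"accuracy": 0.95, "f1": 0.93},
    test_dataset="imagenet-val",
    evaluation_date="2024-01-15"
)
card.set_lineage(code_commit="abc123", data_commit="dataset_v2.3")

card.export("model_card.json")
print(card.to_markdown())
```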

Best Practices

Project Structure

```
my-ml-project/
├── .dvc/                  # DVC configuration
├── .git/                  # Git repository
├── data/
│   ├── raw/               # Raw data (DVC tracked)
│   ├── processed/         # Processed data (DVC tracked)
│   └── .gitignore
├── models/
│   ├── checkpoints/       # Training checkpoints
│   └── exported/          # Export-ready models
├── src/
│   ├── data/              # Data loading and preprocessing
│   ├── models/            # Model architectures
│   ├── training/          # Training scripts
│   └── evaluation/        # Evaluation scripts
├── configs/
│   ├── train_config.yaml
│   └── model_config.yaml
├── notebooks/             # Exploration notebooks
├── tests/                 # Unit tests
├── dvc.yaml               # DVC pipeline
├── params.yaml            # Parameters
├── requirements.txt
├── Dockerfile
└── README.md
```

Semantic Versioning for Models

```python
class ModelVersioner:
    """Semantic versioning for ML models."""

    @staticmethod
    def get_version_type(old_metrics, new_metrics, config_changed):
        """
        Determine version bump type:
        - MAJOR: breaking changes (different input/output format)
        - MINOR: significant improvements (>5% relative metric gain)
        - PATCH: bug fixes, minor improvements
        """
        if config_changed.get('input_format') or config_changed.get('output_format'):
            return 'major'

        # Relative improvement in the primary metric
        metric_improvement = (
            new_metrics['accuracy'] - old_metrics['accuracy']
        ) / old_metrics['accuracy']

        if metric_improvement > 0.05:
            return 'minor'
        return 'patch'

    @staticmethod
    def bump_version(current_version, bump_type):
        major, minor, patch = map(int, current_version.split('.'))
        if bump_type == 'major':
            return f"{major + 1}.0.0"
        elif bump_type == 'minor':
            return f"{major}.{minor + 1}.0"
        else:
            return f"{major}.{minor}.{patch + 1}"
```
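
A quick check of the versioner in action; the metrics and version string are illustrative:

```python
old = {'accuracy': 0.90}
new = {'accuracy': 0.96}  # ~6.7% relative gain, so a MINOR bump

bump = ModelVersioner.get_version_type(old, new, config_changed={})
print(bump)                                        # minor
print(ModelVersioner.bump_version("1.2.3", bump))  # 1.3.0
```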

Conclusion

Effective version control for AI projects requires a holistic approach that goes beyond traditional code versioning. By combining tools like DVC, MLflow, and careful organization, you can ensure reproducibility, traceability, and collaboration throughout the ML lifecycle.

Key takeaways:

  1. Version everything: Code, data, models, configs, and environments
  2. Use specialized tools: DVC for data, MLflow/W&B for experiments
  3. Establish a model registry: Track model versions and stages
  4. Track lineage: Understand how models and data were created
  5. Containerize environments: Docker ensures reproducibility
  6. Document thoroughly: Model cards capture essential information

With proper version control, you can confidently reproduce experiments, roll back to previous versions, collaborate effectively, and maintain a clear audit trail of your ML development process.
