第14章:大規模データとモデルの管理

14.1 AI協働を考慮したGit LFS設定

AI協働時代のGit LFS

AI生成モデルやデータセットの管理には、第2章の協働履歴と組み合わせたGit LFS設定が重要です。AIによるアセット生成を追跡可能にします。

Git LFSの初期設定

インストールと有効化

# Git LFSのインストール
# macOS
brew install git-lfs

# Ubuntu/Debian
curl -s https://packagecloud.io/install/repositories/github/git-lfs/script.deb.sh | sudo bash
sudo apt-get install git-lfs

# Windows
# Git for Windowsに含まれている

# Git LFSの初期化
git lfs install

# リポジトリでLFSを有効化
cd your-repo
# AIアセットのトラッキング(AI協働履歴付き)
git lfs track "*.pth"  # PyTorchモデル
git lfs track "*.h5"   # Keras/TensorFlowモデル
git lfs track "*.onnx" # ONNXモデル
git lfs track "*.pkl"  # Pickleファイル
git lfs track "*.parquet" # データファイル
git lfs track "ai-generated/*" # AI生成アセット専用
git lfs track "experiments/*/models/*" # 実験モデル

# .gitattributesをコミット
git add .gitattributes
git commit -m "Configure Git LFS tracking"

LFS設定の最適化

.lfsconfigファイル

# .lfsconfig
[lfs]
    # GitHubのLFSエンドポイント(GitHub Enterprise Serverまたはカスタムサーバー用)
    # 注意: GitHub.comを使用している場合、この設定は通常不要です
    # url = https://github.com/org/repo.git/info/lfs
    
    # 同時転送数
    concurrenttransfers = 8
    
    # バッチサイズ
    batch = true
    
    # 転送時の検証
    transfer.maxverifies = 3

[lfs "transfer"]
    # カスタム転送アダプター(高速化)
    maxretries = 3
    maxverifyretries = 3

[lfs "extension"]
    # 追加の拡張子
    clean = git-lfs clean -- %f
    smudge = git-lfs smudge -- %f
    process = git-lfs filter-process

注意事項:

  • url設定はGitHub Enterprise ServerやセルフホストのGit LFSサーバーを使用する場合にのみ必要です
  • GitHub.comのパブリック/プライベートリポジトリでは、LFSエンドポイントは自動的に解決されるため設定不要です
  • カスタムLFSサーバーを使用する場合のみ、適切なエンドポイントURLを設定してください

LFSの使用パターン

モデルファイルの管理

# scripts/model_management.py
import subprocess
import hashlib
from pathlib import Path

class ModelLFSManager:
    def __init__(self, repo_path='.'):
        self.repo_path = Path(repo_path)
        
    def add_model_to_lfs(self, model_path, auto_commit=True):
        """モデルをLFSに追加"""
        model_path = Path(model_path)
        
        # ファイルサイズを確認
        size_mb = model_path.stat().st_size / (1024 * 1024)
        print(f"Model size: {size_mb:.2f} MB")
        
        # LFSに追加
        subprocess.run(['git', 'lfs', 'track', f'*{model_path.suffix}'])
        subprocess.run(['git', 'add', '.gitattributes'])
        subprocess.run(['git', 'add', str(model_path)])
        
        if auto_commit:
            # モデルのハッシュを計算
            model_hash = self._calculate_hash(model_path)[:8]
            
            commit_message = f"Add model: {model_path.name} (hash: {model_hash})"
            subprocess.run(['git', 'commit', '-m', commit_message])
            
        return {
            'path': str(model_path),
            'size_mb': size_mb,
            'hash': model_hash
        }
    
    def prune_old_models(self, keep_latest=3):
        """古いモデルを削除"""
        models_dir = self.repo_path / 'models'
        
        # モデルファイルを日付順にソート
        model_files = sorted(
            models_dir.glob('*.pth'),
            key=lambda x: x.stat().st_mtime,
            reverse=True
        )
        
        # 保持するファイル以外を削除
        for model_file in model_files[keep_latest:]:
            print(f"Removing old model: {model_file}")
            
            # LFSから削除
            subprocess.run(['git', 'rm', str(model_file)])
            
        # LFSのガベージコレクション
        subprocess.run(['git', 'lfs', 'prune'])
    
    def _calculate_hash(self, file_path):
        """ファイルのハッシュを計算"""
        sha256_hash = hashlib.sha256()
        
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
                
        return sha256_hash.hexdigest()

13.2 DVC(Data Version Control)との連携

DVCの基本設定

初期設定

# DVCのインストール
pip install dvc[s3]  # AWS S3サポート付き
# または
pip install dvc[gdrive]  # Google Driveサポート付き

# DVCの初期化
dvc init

# リモートストレージの設定
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc remote modify myremote access_key_id ${AWS_ACCESS_KEY_ID}
dvc remote modify myremote secret_access_key ${AWS_SECRET_ACCESS_KEY}

# または Google Drive
dvc remote add -d myremote gdrive://folder-id

DVCパイプラインの構築

dvc.yamlの設定

# dvc.yaml
stages:
  download_data:
    cmd: python scripts/download_data.py
    deps:
      - scripts/download_data.py
    params:
      - data.source_url
      - data.dataset_name
    outs:
      - data/raw/

  preprocess:
    cmd: python scripts/preprocess.py
    deps:
      - scripts/preprocess.py
      - data/raw/
    params:
      - preprocessing.image_size
      - preprocessing.augmentation
    outs:
      - data/processed/

  train:
    cmd: python train.py
    deps:
      - train.py
      - data/processed/
      - src/
    params:
      - model.architecture
      - training.epochs
      - training.batch_size
      - training.learning_rate
    outs:
      - models/model.pth
      - logs/training.log
    metrics:
      - metrics/train_metrics.json:
          cache: false

  evaluate:
    cmd: python evaluate.py
    deps:
      - evaluate.py
      - models/model.pth
      - data/processed/test/
    outs:
      - reports/evaluation.html
      - reports/confusion_matrix.png
    metrics:
      - metrics/eval_metrics.json:
          cache: false

DVCとGitの統合

自動化スクリプト

# scripts/dvc_integration.py
import subprocess
import yaml
import json
from pathlib import Path

class DVCGitIntegration:
    def __init__(self):
        self.dvc_files = ['.dvc', 'dvc.yaml', 'dvc.lock']
        
    def run_experiment(self, params_override=None):
        """実験を実行してバージョン管理"""
        # パラメータを更新
        if params_override:
            self.update_params(params_override)
            
        # DVCパイプラインを実行
        subprocess.run(['dvc', 'repro'], check=True)
        
        # メトリクスを取得
        metrics = self.get_metrics()
        
        # Git にコミット
        self.commit_experiment(metrics)
        
        return metrics
    
    def update_params(self, params):
        """パラメータファイルを更新"""
        params_file = Path('params.yaml')
        
        with open(params_file) as f:
            current_params = yaml.safe_load(f)
            
        # パラメータを更新
        for key, value in params.items():
            keys = key.split('.')
            target = current_params
            
            for k in keys[:-1]:
                target = target[k]
            target[keys[-1]] = value
            
        with open(params_file, 'w') as f:
            yaml.dump(current_params, f)
    
    def get_metrics(self):
        """DVCメトリクスを取得"""
        metrics = {}
        
        # メトリクスファイルを読み込み
        for metric_file in Path('metrics').glob('*.json'):
            with open(metric_file) as f:
                data = json.load(f)
                metrics.update(data)
                
        return metrics
    
    def commit_experiment(self, metrics):
        """実験結果をコミット"""
        # DVCファイルを追加
        for file in self.dvc_files:
            subprocess.run(['git', 'add', file])
            
        # メトリクスファイルを追加
        subprocess.run(['git', 'add', 'metrics/'])
        
        # コミットメッセージを生成
        message = f"""Experiment: {metrics.get('model_name', 'unnamed')}

Metrics:
- Accuracy: {metrics.get('accuracy', 'N/A')}
- Loss: {metrics.get('loss', 'N/A')}
- F1 Score: {metrics.get('f1_score', 'N/A')}

Parameters:
- Learning Rate: {metrics.get('learning_rate', 'N/A')}
- Batch Size: {metrics.get('batch_size', 'N/A')}
- Epochs: {metrics.get('epochs', 'N/A')}
"""
        
        subprocess.run(['git', 'commit', '-m', message])
        
    def compare_experiments(self, branch1='HEAD~1', branch2='HEAD'):
        """実験結果を比較"""
        # メトリクスの差分を表示
        result = subprocess.run(
            ['dvc', 'metrics', 'diff', branch1, branch2],
            capture_output=True,
            text=True
        )
        
        print(result.stdout)
        
        # パラメータの差分を表示
        result = subprocess.run(
            ['dvc', 'params', 'diff', branch1, branch2],
            capture_output=True,
            text=True
        )
        
        print(result.stdout)

13.3 モデルレジストリの構築

MLflowとの連携

MLflow設定

# mlflow_config.py
import mlflow
from mlflow.tracking import MlflowClient
import os

class ModelRegistry:
    def __init__(self, tracking_uri="http://localhost:5000"):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()
        
    def register_model(self, model_path, model_name, metrics, params, tags=None):
        """モデルをレジストリに登録"""
        with mlflow.start_run() as run:
            # パラメータを記録
            for key, value in params.items():
                mlflow.log_param(key, value)
                
            # メトリクスを記録
            for key, value in metrics.items():
                mlflow.log_metric(key, value)
                
            # タグを記録
            if tags:
                for key, value in tags.items():
                    mlflow.set_tag(key, value)
                    
            # モデルを記録
            if model_path.endswith('.pth'):
                mlflow.pytorch.log_model(
                    pytorch_model=model_path,
                    artifact_path="model",
                    registered_model_name=model_name
                )
            elif model_path.endswith('.h5'):
                mlflow.tensorflow.log_model(
                    tf_saved_model_dir=model_path,
                    artifact_path="model",
                    registered_model_name=model_name
                )
                
            # モデルのバージョンを取得
            model_version = self.get_latest_version(model_name)
            
            return {
                'run_id': run.info.run_id,
                'model_name': model_name,
                'model_version': model_version,
                'status': 'registered'
            }
    
    def promote_model(self, model_name, version, stage="Production"):
        """モデルをプロダクションに昇格"""
        self.client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage=stage
        )
        
        # 以前のプロダクションモデルをアーカイブ
        for mv in self.client.search_model_versions(f"name='{model_name}'"):
            if mv.current_stage == "Production" and mv.version != version:
                self.client.transition_model_version_stage(
                    name=model_name,
                    version=mv.version,
                    stage="Archived"
                )
    
    def get_production_model(self, model_name):
        """プロダクションモデルを取得"""
        filter_string = f"name='{model_name}'"
        results = self.client.search_model_versions(filter_string)
        
        for mv in results:
            if mv.current_stage == "Production":
                return {
                    'version': mv.version,
                    'run_id': mv.run_id,
                    'uri': mv.source,
                    'status': mv.status
                }
                
        return None
    
    def compare_models(self, model_name, version1, version2):
        """2つのモデルバージョンを比較"""
        def get_run_data(version):
            mv = self.client.get_model_version(model_name, version)
            run = self.client.get_run(mv.run_id)
            return run.data
            
        run1 = get_run_data(version1)
        run2 = get_run_data(version2)
        
        comparison = {
            'metrics': {},
            'params': {}
        }
        
        # メトリクスの比較
        for key in run1.metrics.keys():
            if key in run2.metrics:
                comparison['metrics'][key] = {
                    'v1': run1.metrics[key],
                    'v2': run2.metrics[key],
                    'improvement': run2.metrics[key] - run1.metrics[key]
                }
                
        # パラメータの比較
        for key in run1.params.keys():
            if key in run2.params:
                if run1.params[key] != run2.params[key]:
                    comparison['params'][key] = {
                        'v1': run1.params[key],
                        'v2': run2.params[key]
                    }
                    
        return comparison

カスタムモデルレジストリ

S3ベースのレジストリ

# s3_model_registry.py
import boto3
import json
from datetime import datetime
from pathlib import Path
import hashlib

class S3ModelRegistry:
    def __init__(self, bucket_name, prefix='models'):
        self.s3 = boto3.client('s3')
        self.bucket_name = bucket_name
        self.prefix = prefix
        
    def upload_model(self, local_path, model_name, version=None, metadata=None):
        """モデルをS3にアップロード"""
        local_path = Path(local_path)
        
        # バージョンを生成
        if version is None:
            version = datetime.now().strftime('%Y%m%d_%H%M%S')
            
        # S3キーを生成
        s3_key = f"{self.prefix}/{model_name}/{version}/model.pth"
        
        # メタデータを準備
        upload_metadata = {
            'model_name': model_name,
            'version': version,
            'upload_time': datetime.now().isoformat(),
            'file_size': local_path.stat().st_size,
            'file_hash': self._calculate_hash(local_path)
        }
        
        if metadata:
            upload_metadata.update(metadata)
            
        # モデルをアップロード
        self.s3.upload_file(
            str(local_path),
            self.bucket_name,
            s3_key,
            ExtraArgs={
                'Metadata': {
                    k: str(v) for k, v in upload_metadata.items()
                }
            }
        )
        
        # メタデータファイルをアップロード
        metadata_key = f"{self.prefix}/{model_name}/{version}/metadata.json"
        self.s3.put_object(
            Bucket=self.bucket_name,
            Key=metadata_key,
            Body=json.dumps(upload_metadata, indent=2)
        )
        
        return {
            'model_name': model_name,
            'version': version,
            's3_uri': f"s3://{self.bucket_name}/{s3_key}",
            'metadata_uri': f"s3://{self.bucket_name}/{metadata_key}"
        }
    
    def list_models(self, model_name=None):
        """モデル一覧を取得"""
        prefix = f"{self.prefix}/"
        if model_name:
            prefix += f"{model_name}/"
            
        response = self.s3.list_objects_v2(
            Bucket=self.bucket_name,
            Prefix=prefix,
            Delimiter='/'
        )
        
        models = []
        
        if 'CommonPrefixes' in response:
            for prefix_info in response['CommonPrefixes']:
                model_path = prefix_info['Prefix']
                model_name = model_path.split('/')[-2]
                
                # バージョン一覧を取得
                versions = self.list_versions(model_name)
                
                models.append({
                    'model_name': model_name,
                    'versions': versions,
                    'latest_version': versions[0] if versions else None
                })
                
        return models
    
    def list_versions(self, model_name):
        """モデルのバージョン一覧を取得"""
        prefix = f"{self.prefix}/{model_name}/"
        
        response = self.s3.list_objects_v2(
            Bucket=self.bucket_name,
            Prefix=prefix,
            Delimiter='/'
        )
        
        versions = []
        
        if 'CommonPrefixes' in response:
            for prefix_info in response['CommonPrefixes']:
                version_path = prefix_info['Prefix']
                version = version_path.split('/')[-2]
                
                # メタデータを取得
                metadata = self.get_metadata(model_name, version)
                
                versions.append({
                    'version': version,
                    'created_at': metadata.get('upload_time'),
                    'size': metadata.get('file_size'),
                    'metrics': metadata.get('metrics', {})
                })
                
        # 日付順にソート
        versions.sort(key=lambda x: x['created_at'], reverse=True)
        
        return versions
    
    def download_model(self, model_name, version, local_path):
        """モデルをダウンロード"""
        s3_key = f"{self.prefix}/{model_name}/{version}/model.pth"
        
        self.s3.download_file(
            self.bucket_name,
            s3_key,
            str(local_path)
        )
        
        return local_path
    
    def _calculate_hash(self, file_path):
        """ファイルのハッシュを計算"""
        sha256_hash = hashlib.sha256()
        
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
                
        return sha256_hash.hexdigest()

13.4 アクセス制御を考慮したデータ管理

データアクセス階層

アクセス制御の実装

# data_access_control.py
from enum import Enum
import json
from pathlib import Path

class DataAccessLevel(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    RESTRICTED = "restricted"
    CONFIDENTIAL = "confidential"

class DataAccessController:
    def __init__(self, config_path='data_access_config.json'):
        self.config = self._load_config(config_path)
        self.user_permissions = {}
        
    def _load_config(self, config_path):
        """アクセス制御設定を読み込み"""
        with open(config_path) as f:
            return json.load(f)
            
    def grant_access(self, user, data_path, access_level):
        """ユーザーにデータアクセス権限を付与"""
        if user not in self.user_permissions:
            self.user_permissions[user] = {}
            
        self.user_permissions[user][str(data_path)] = access_level
        
        # 監査ログに記録
        self._log_access_grant(user, data_path, access_level)
        
    def check_access(self, user, data_path):
        """アクセス権限をチェック"""
        data_path = str(data_path)
        
        # ユーザーの権限を確認
        if user in self.user_permissions:
            if data_path in self.user_permissions[user]:
                return True
                
        # データのアクセスレベルを確認
        data_level = self._get_data_access_level(data_path)
        user_level = self._get_user_access_level(user)
        
        return self._compare_access_levels(user_level, data_level)
    
    def encrypt_sensitive_data(self, data_path, access_level):
        """機密データを暗号化"""
        if access_level in [DataAccessLevel.RESTRICTED, DataAccessLevel.CONFIDENTIAL]:
            # 暗号化処理
            from cryptography.fernet import Fernet
            
            key = self._get_encryption_key(access_level)
            fernet = Fernet(key)
            
            with open(data_path, 'rb') as f:
                encrypted_data = fernet.encrypt(f.read())
                
            encrypted_path = data_path.with_suffix('.encrypted')
            with open(encrypted_path, 'wb') as f:
                f.write(encrypted_data)
                
            # 元ファイルを削除
            data_path.unlink()
            
            return encrypted_path
            
        return data_path
    
    def create_data_manifest(self, data_dir):
        """データマニフェストを作成"""
        manifest = {
            'created_at': datetime.now().isoformat(),
            'data_files': []
        }
        
        for data_file in Path(data_dir).rglob('*'):
            if data_file.is_file():
                file_info = {
                    'path': str(data_file.relative_to(data_dir)),
                    'size': data_file.stat().st_size,
                    'hash': self._calculate_hash(data_file),
                    'access_level': self._get_data_access_level(data_file),
                    'encrypted': data_file.suffix == '.encrypted'
                }
                manifest['data_files'].append(file_info)
                
        manifest_path = Path(data_dir) / 'manifest.json'
        with open(manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)
            
        return manifest_path

データガバナンス

データ系譜の追跡

# data_lineage.py
import networkx as nx
from datetime import datetime
import json

class DataLineageTracker:
    def __init__(self):
        self.lineage_graph = nx.DiGraph()
        
    def add_data_source(self, source_id, metadata):
        """データソースを追加"""
        self.lineage_graph.add_node(
            source_id,
            type='source',
            created_at=datetime.now().isoformat(),
            **metadata
        )
        
    def add_transformation(self, input_ids, output_id, transformation_metadata):
        """データ変換を記録"""
        # 出力ノードを追加
        self.lineage_graph.add_node(
            output_id,
            type='derived',
            created_at=datetime.now().isoformat(),
            **transformation_metadata
        )
        
        # 入力から出力へのエッジを追加
        for input_id in input_ids:
            self.lineage_graph.add_edge(
                input_id,
                output_id,
                transformation=transformation_metadata.get('operation', 'unknown')
            )
            
    def get_data_provenance(self, data_id):
        """データの来歴を取得"""
        if data_id not in self.lineage_graph:
            return None
            
        # 祖先ノードを取得
        ancestors = nx.ancestors(self.lineage_graph, data_id)
        
        # サブグラフを抽出
        nodes = list(ancestors) + [data_id]
        subgraph = self.lineage_graph.subgraph(nodes)
        
        # 来歴情報を構築
        provenance = {
            'data_id': data_id,
            'lineage': []
        }
        
        # トポロジカルソート
        for node in nx.topological_sort(subgraph):
            node_data = self.lineage_graph.nodes[node]
            
            predecessors = list(subgraph.predecessors(node))
            
            provenance['lineage'].append({
                'id': node,
                'type': node_data['type'],
                'created_at': node_data['created_at'],
                'inputs': predecessors,
                'metadata': {
                    k: v for k, v in node_data.items()
                    if k not in ['type', 'created_at']
                }
            })
            
        return provenance
    
    def export_lineage(self, output_path):
        """系譜グラフをエクスポート"""
        lineage_data = {
            'nodes': [
                {
                    'id': node,
                    **self.lineage_graph.nodes[node]
                }
                for node in self.lineage_graph.nodes()
            ],
            'edges': [
                {
                    'source': edge[0],
                    'target': edge[1],
                    **self.lineage_graph.edges[edge]
                }
                for edge in self.lineage_graph.edges()
            ]
        }
        
        with open(output_path, 'w') as f:
            json.dump(lineage_data, f, indent=2)
            
    def visualize_lineage(self, data_id=None, output_path='lineage.png'):
        """系譜を可視化"""
        import matplotlib.pyplot as plt
        
        if data_id:
            # 特定のデータの系譜のみ
            ancestors = nx.ancestors(self.lineage_graph, data_id)
            nodes = list(ancestors) + [data_id]
            subgraph = self.lineage_graph.subgraph(nodes)
        else:
            subgraph = self.lineage_graph
            
        # レイアウトを計算
        pos = nx.spring_layout(subgraph)
        
        # ノードの色を設定
        node_colors = []
        for node in subgraph.nodes():
            if subgraph.nodes[node]['type'] == 'source':
                node_colors.append('lightblue')
            else:
                node_colors.append('lightgreen')
                
        # グラフを描画
        plt.figure(figsize=(12, 8))
        nx.draw(
            subgraph,
            pos,
            node_color=node_colors,
            with_labels=True,
            node_size=3000,
            font_size=8,
            font_weight='bold',
            arrows=True,
            edge_color='gray'
        )
        
        plt.title('Data Lineage Graph')
        plt.axis('off')
        plt.tight_layout()
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        plt.close()

14.5 AI協働実験の追跡と管理

実験管理システム

AI協働実験トラッカー

# ai_experiment_tracker.py
import json
import yaml
from pathlib import Path
from datetime import datetime
import pandas as pd

class AICollaborativeExperimentTracker:
    def __init__(self, experiments_dir='experiments'):
        self.experiments_dir = Path(experiments_dir)
        self.experiments_dir.mkdir(exist_ok=True)
        
    def start_experiment(self, name, config, description="", ai_collaboration=None):
        """AI協働実験を開始"""
        exp_id = f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        exp_dir = self.experiments_dir / exp_id
        exp_dir.mkdir()
        
        # 実験設定を保存
        config_path = exp_dir / 'config.yaml'
        with open(config_path, 'w') as f:
            yaml.dump(config, f)
            
        # AI協働情報の記録
        ai_info = ai_collaboration or {}
        ai_metadata = {
            'ai_tools_used': ai_info.get('tools', []),
            'ai_assistance_level': ai_info.get('level', 'none'),  # none, low, medium, high
            'ai_generated_components': ai_info.get('components', []),
            'prompt_templates': ai_info.get('prompts', []),
            'human_review_points': ai_info.get('review_points', [])
        }
            
        # 実験メタデータを保存(AI協働情報含む)
        metadata = {
            'id': exp_id,
            'name': name,
            'description': description,
            'status': 'running',
            'start_time': datetime.now().isoformat(),
            'git_commit': self._get_git_commit(),
            'config_path': str(config_path),
            'ai_collaboration': ai_metadata,  # 第2章のフレームワーク適用
            'collaboration_framework': 'CLEAR'  # 使用したフレームワーク
        }
        
        metadata_path = exp_dir / 'metadata.json'
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
            
        return exp_id
    
    def log_metrics(self, exp_id, metrics, step=None):
        """メトリクスを記録"""
        exp_dir = self.experiments_dir / exp_id
        metrics_file = exp_dir / 'metrics.jsonl'
        
        entry = {
            'timestamp': datetime.now().isoformat(),
            'step': step,
            **metrics
        }
        
        with open(metrics_file, 'a') as f:
            f.write(json.dumps(entry) + '\n')
            
    def log_artifact(self, exp_id, artifact_path, artifact_type='model'):
        """成果物を記録"""
        exp_dir = self.experiments_dir / exp_id
        artifacts_dir = exp_dir / 'artifacts'
        artifacts_dir.mkdir(exist_ok=True)
        
        # 成果物をコピー
        artifact_path = Path(artifact_path)
        dest_path = artifacts_dir / artifact_path.name
        
        import shutil
        shutil.copy2(artifact_path, dest_path)
        
        # 成果物情報を記録
        artifact_info = {
            'type': artifact_type,
            'original_path': str(artifact_path),
            'stored_path': str(dest_path),
            'size': artifact_path.stat().st_size,
            'timestamp': datetime.now().isoformat()
        }
        
        artifacts_log = exp_dir / 'artifacts.jsonl'
        with open(artifacts_log, 'a') as f:
            f.write(json.dumps(artifact_info) + '\n')
            
    def complete_experiment(self, exp_id, final_metrics=None):
        """実験を完了"""
        exp_dir = self.experiments_dir / exp_id
        
        # メタデータを更新
        metadata_path = exp_dir / 'metadata.json'
        with open(metadata_path) as f:
            metadata = json.load(f)
            
        metadata['status'] = 'completed'
        metadata['end_time'] = datetime.now().isoformat()
        
        if final_metrics:
            metadata['final_metrics'] = final_metrics
            
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
            
        # サマリーレポートを生成
        self._generate_summary(exp_id)
        
    def compare_experiments(self, exp_ids, metrics_to_compare=None):
        """複数の実験を比較"""
        comparison_data = []
        
        for exp_id in exp_ids:
            exp_dir = self.experiments_dir / exp_id
            
            # メタデータを読み込み
            with open(exp_dir / 'metadata.json') as f:
                metadata = json.load(f)
                
            # 最終メトリクスを取得
            final_metrics = metadata.get('final_metrics', {})
            
            # 設定を読み込み
            with open(exp_dir / 'config.yaml') as f:
                config = yaml.safe_load(f)
                
            row = {
                'experiment_id': exp_id,
                'name': metadata['name'],
                'status': metadata['status'],
                **final_metrics,
                **self._flatten_dict(config, prefix='config.')
            }
            
            comparison_data.append(row)
            
        # DataFrameに変換
        df = pd.DataFrame(comparison_data)
        
        # 指定されたメトリクスのみを表示
        if metrics_to_compare:
            columns = ['experiment_id', 'name'] + metrics_to_compare
            df = df[columns]
            
        return df
    
    def _generate_summary(self, exp_id):
        """実験サマリーを生成"""
        exp_dir = self.experiments_dir / exp_id
        
        # メトリクスの履歴を読み込み
        metrics_history = []
        metrics_file = exp_dir / 'metrics.jsonl'
        
        if metrics_file.exists():
            with open(metrics_file) as f:
                for line in f:
                    metrics_history.append(json.loads(line))
                    
        # サマリーレポートを作成
        summary = {
            'experiment_id': exp_id,
            'metrics_summary': self._summarize_metrics(metrics_history),
            'artifacts': self._list_artifacts(exp_dir),
            'duration': self._calculate_duration(exp_dir)
        }
        
        summary_path = exp_dir / 'summary.json'
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)
            
    def _flatten_dict(self, d, prefix=''):
        """辞書をフラット化"""
        items = []
        for k, v in d.items():
            new_key = f"{prefix}{k}" if prefix else k
            if isinstance(v, dict):
                items.extend(self._flatten_dict(v, f"{new_key}.").items())
            else:
                items.append((new_key, v))
        return dict(items)

まとめ

本章では、大規模データとモデルの管理について学習しました:

  • Git LFSで大容量ファイルを効率的に管理
  • DVCでデータパイプラインをバージョン管理
  • MLflowやカスタムレジストリでモデルを体系的に管理
  • アクセス制御とデータガバナンスの実装
  • 実験結果の追跡と比較分析

次章では、外部協力者との連携について学習します。

確認事項

  • Git LFSを設定して大容量ファイルを管理できる
  • DVCでデータパイプラインを構築できる
  • モデルレジストリを活用できる
  • データアクセス制御を実装できる
  • 実験結果を体系的に管理できる