I.5 パフォーマンス最適化とベストプラクティス

I.5.1 効率的なデータアクセス設計

import asyncio
import aiohttp
import aiofiles
import aiomysql
import redis.asyncio as redis
import time
from typing import List, Dict, Optional, AsyncGenerator
from dataclasses import dataclass, asdict
import json
import hashlib
from pathlib import Path
import logging
from contextlib import asynccontextmanager

@dataclass
class QueryMetrics:
    """クエリ性能メトリクス"""
    query_id: str
    database: str
    start_time: float
    end_time: float
    response_size: int
    cache_hit: bool
    error: Optional[str] = None
    
    @property
    def duration(self) -> float:
        return self.end_time - self.start_time

class PerformanceOptimizer:
    """データベースアクセス性能最適化システム"""
    
    def __init__(self, config: Dict):
        """
        Args:
            config: 設定辞書
        """
        self.config = config
        self.logger = logging.getLogger(__name__)
        
        # Redis接続プール
        self.redis_pool = None
        
        # MySQL接続プール  
        self.mysql_pool = None
        
        # HTTP セッション管理
        self.http_sessions = {}
        
        # メトリクス収集
        self.metrics = []
        
        # キャッシュ戦略
        self.cache_strategies = {
            "uniprot": {"ttl": 3600, "prefix": "up:"},      # 1時間
            "ensembl": {"ttl": 7200, "prefix": "en:"},      # 2時間
            "ncbi": {"ttl": 1800, "prefix": "ncbi:"},       # 30分
            "kegg": {"ttl": 14400, "prefix": "kegg:"}       # 4時間
        }
    
    async def initialize(self):
        """リソースの初期化"""
        try:
            # Redis接続プール初期化
            if self.config.get("redis", {}).get("enabled", False):
                redis_config = self.config["redis"]
                self.redis_pool = redis.ConnectionPool(
                    host=redis_config.get("host", "localhost"),
                    port=redis_config.get("port", 6379),
                    db=redis_config.get("db", 0),
                    max_connections=redis_config.get("max_connections", 20),
                    decode_responses=True
                )
                self.logger.info("Redis接続プール初期化完了")
            
            # MySQL接続プール初期化
            if self.config.get("mysql", {}).get("enabled", False):
                mysql_config = self.config["mysql"]
                self.mysql_pool = await aiomysql.create_pool(
                    host=mysql_config.get("host", "localhost"),
                    port=mysql_config.get("port", 3306),
                    user=mysql_config.get("user", "root"),
                    password=mysql_config.get("password", ""),
                    db=mysql_config.get("database", "bioinformatics"),
                    minsize=mysql_config.get("min_connections", 5),
                    maxsize=mysql_config.get("max_connections", 20),
                    autocommit=True
                )
                self.logger.info("MySQL接続プール初期化完了")
            
            # HTTP セッションプール初期化
            for db_name, db_config in self.config.get("databases", {}).items():
                connector = aiohttp.TCPConnector(
                    limit=db_config.get("max_connections", 10),
                    limit_per_host=db_config.get("limit_per_host", 5),
                    ttl_dns_cache=300,
                    use_dns_cache=True
                )
                
                timeout = aiohttp.ClientTimeout(
                    total=db_config.get("timeout", 30),
                    connect=db_config.get("connect_timeout", 10)
                )
                
                self.http_sessions[db_name] = aiohttp.ClientSession(
                    connector=connector,
                    timeout=timeout,
                    headers=db_config.get("headers", {})
                )
            
            self.logger.info("パフォーマンス最適化システム初期化完了")
            
        except Exception as e:
            self.logger.error(f"初期化エラー: {e}")
            raise
    
    async def cleanup(self):
        """リソースのクリーンアップ"""
        # HTTPセッション閉じる
        for session in self.http_sessions.values():
            await session.close()
        
        # MySQL接続プール閉じる
        if self.mysql_pool:
            self.mysql_pool.close()
            await self.mysql_pool.wait_closed()
        
        # Redis接続プール閉じる
        if self.redis_pool:
            await self.redis_pool.disconnect()
        
        self.logger.info("リソースクリーンアップ完了")
    
    def _generate_cache_key(self, database: str, query: str, params: Dict = None) -> str:
        """キャッシュキーの生成"""
        cache_data = {"db": database, "query": query, "params": params or {}}
        cache_string = json.dumps(cache_data, sort_keys=True)
        hash_key = hashlib.md5(cache_string.encode()).hexdigest()
        
        prefix = self.cache_strategies.get(database, {}).get("prefix", "default:")
        return f"{prefix}{hash_key}"
    
    @asynccontextmanager
    async def get_redis_connection(self):
        """Redis接続の取得(コンテキストマネージャ)"""
        if not self.redis_pool:
            yield None
            return
        
        connection = redis.Redis(connection_pool=self.redis_pool)
        try:
            yield connection
        finally:
            await connection.close()
    
    async def get_cached_result(self, cache_key: str) -> Optional[Dict]:
        """キャッシュからの結果取得"""
        async with self.get_redis_connection() as redis_conn:
            if not redis_conn:
                return None
            
            try:
                cached_data = await redis_conn.get(cache_key)
                if cached_data:
                    return json.loads(cached_data)
            except Exception as e:
                self.logger.warning(f"キャッシュ取得エラー: {e}")
            
        return None
    
    async def set_cached_result(self, cache_key: str, data: Dict, database: str):
        """結果のキャッシュ保存"""
        async with self.get_redis_connection() as redis_conn:
            if not redis_conn:
                return
            
            try:
                ttl = self.cache_strategies.get(database, {}).get("ttl", 3600)
                await redis_conn.setex(
                    cache_key, 
                    ttl, 
                    json.dumps(data, ensure_ascii=False)
                )
            except Exception as e:
                self.logger.warning(f"キャッシュ保存エラー: {e}")
    
    async def batch_query_with_optimization(self, 
                                          database: str,
                                          queries: List[Dict],
                                          batch_size: int = 10,
                                          enable_cache: bool = True) -> List[Dict]:
        """最適化されたバッチクエリ実行"""
        results = []
        
        # クエリをバッチに分割
        for i in range(0, len(queries), batch_size):
            batch = queries[i:i + batch_size]
            self.logger.info(f"バッチ {i//batch_size + 1}/{(len(queries)-1)//batch_size + 1} 実行中")
            
            batch_results = await self._execute_batch_with_cache(
                database, batch, enable_cache
            )
            results.extend(batch_results)
            
            # バッチ間の適切な待機
            if i + batch_size < len(queries):
                rate_limit = self.config.get("databases", {}).get(database, {}).get("rate_limit", 1.0)
                await asyncio.sleep(rate_limit)
        
        return results
    
    async def _execute_batch_with_cache(self, 
                                       database: str, 
                                       batch: List[Dict],
                                       enable_cache: bool) -> List[Dict]:
        """キャッシュ機能付きバッチ実行"""
        results = []
        uncached_queries = []
        cache_keys = []
        
        # キャッシュチェック
        if enable_cache:
            for query in batch:
                cache_key = self._generate_cache_key(database, 
                                                   query.get("endpoint", ""), 
                                                   query.get("params"))
                cache_keys.append(cache_key)
                
                cached_result = await self.get_cached_result(cache_key)
                if cached_result:
                    # キャッシュヒット
                    results.append(cached_result)
                    self._record_metrics(database, True, len(str(cached_result)))
                else:
                    # キャッシュミス
                    uncached_queries.append((query, cache_key))
        else:
            uncached_queries = [(query, None) for query in batch]
        
        # キャッシュミスしたクエリを並列実行
        if uncached_queries:
            session = self.http_sessions.get(database)
            if not session:
                self.logger.error(f"セッションが見つかりません: {database}")
                return results
            
            tasks = []
            for query, cache_key in uncached_queries:
                task = self._execute_single_query(session, database, query, cache_key, enable_cache)
                tasks.append(task)
            
            query_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            for result in query_results:
                if isinstance(result, Exception):
                    self.logger.error(f"クエリ実行エラー: {result}")
                    results.append({"error": str(result)})
                else:
                    results.append(result)
        
        return results
    
    async def _execute_single_query(self, 
                                   session: aiohttp.ClientSession,
                                   database: str,
                                   query: Dict,
                                   cache_key: Optional[str],
                                   enable_cache: bool) -> Dict:
        """単一クエリの実行"""
        start_time = time.time()
        
        try:
            url = query.get("url", "")
            params = query.get("params", {})
            headers = query.get("headers", {})
            method = query.get("method", "GET")
            
            if method.upper() == "GET":
                async with session.get(url, params=params, headers=headers) as response:
                    response.raise_for_status()
                    
                    content_type = response.headers.get('content-type', '')
                    if 'application/json' in content_type:
                        result = await response.json()
                    else:
                        result = {"text": await response.text()}
            
            elif method.upper() == "POST":
                data = query.get("data", {})
                async with session.post(url, json=data, params=params, headers=headers) as response:
                    response.raise_for_status()
                    result = await response.json()
            
            else:
                raise ValueError(f"サポートされていないHTTPメソッド: {method}")
            
            # レスポンスサイズ計算
            response_size = len(json.dumps(result))
            
            # キャッシュ保存
            if enable_cache and cache_key:
                await self.set_cached_result(cache_key, result, database)
            
            # メトリクス記録
            self._record_metrics(database, False, response_size)
            
            return result
            
        except Exception as e:
            end_time = time.time()
            self.logger.error(f"クエリ実行エラー ({database}): {e}")
            
            # エラーメトリクス記録
            metrics = QueryMetrics(
                query_id=str(hash(str(query))),
                database=database,
                start_time=start_time,
                end_time=end_time,
                response_size=0,
                cache_hit=False,
                error=str(e)
            )
            self.metrics.append(metrics)
            
            return {"error": str(e)}
    
    def _record_metrics(self, database: str, cache_hit: bool, response_size: int):
        """メトリクスの記録"""
        metrics = QueryMetrics(
            query_id=f"{database}_{int(time.time())}",
            database=database,
            start_time=time.time(),
            end_time=time.time(),
            response_size=response_size,
            cache_hit=cache_hit
        )
        self.metrics.append(metrics)
    
    async def optimize_database_schema(self):
        """データベーススキーマの最適化"""
        if not self.mysql_pool:
            return
        
        optimization_queries = [
            # インデックス最適化
            """
            ALTER TABLE genes 
            ADD INDEX idx_gene_symbol (gene_symbol),
            ADD INDEX idx_chromosome (chromosome),
            ADD INDEX idx_biotype (biotype)
            """,
            
            """
            ALTER TABLE functional_annotations 
            ADD INDEX idx_gene_annotation (gene_id, annotation_type),
            ADD INDEX idx_source_db (source_database),
            ADD INDEX idx_confidence (confidence_score)
            """,
            
            """
            ALTER TABLE pathway_annotations 
            ADD INDEX idx_gene_pathway (gene_id, pathway_id),
            ADD INDEX idx_pathway_category (pathway_category)
            """,
            
            """
            ALTER TABLE protein_interactions 
            ADD INDEX idx_gene_pair (gene_a_id, gene_b_id),
            ADD INDEX idx_interaction_type (interaction_type),
            ADD INDEX idx_confidence_score (confidence_score)
            """,
            
            # パーティション設定(日付ベース)
            """
            ALTER TABLE analysis_cache 
            PARTITION BY RANGE (TO_DAYS(created_at)) (
                PARTITION p0 VALUES LESS THAN (TO_DAYS('2024-01-01')),
                PARTITION p1 VALUES LESS THAN (TO_DAYS('2024-04-01')),
                PARTITION p2 VALUES LESS THAN (TO_DAYS('2024-07-01')),
                PARTITION p3 VALUES LESS THAN (TO_DAYS('2024-10-01')),
                PARTITION p4 VALUES LESS THAN (TO_DAYS('2025-01-01'))
            )
            """
        ]
        
        async with self.mysql_pool.acquire() as connection:
            async with connection.cursor() as cursor:
                for query in optimization_queries:
                    try:
                        await cursor.execute(query)
                        self.logger.info(f"スキーマ最適化実行: {query[:50]}...")
                    except Exception as e:
                        self.logger.warning(f"スキーマ最適化スキップ: {e}")
    
    def generate_performance_report(self) -> Dict:
        """パフォーマンスレポートの生成"""
        if not self.metrics:
            return {"message": "メトリクスデータがありません"}
        
        # 基本統計
        total_queries = len(self.metrics)
        cache_hits = sum(1 for m in self.metrics if m.cache_hit)
        errors = sum(1 for m in self.metrics if m.error)
        
        # データベース別統計
        db_stats = {}
        for metric in self.metrics:
            db = metric.database
            if db not in db_stats:
                db_stats[db] = {
                    "total_queries": 0,
                    "cache_hits": 0,
                    "errors": 0,
                    "total_duration": 0,
                    "total_size": 0
                }
            
            db_stats[db]["total_queries"] += 1
            if metric.cache_hit:
                db_stats[db]["cache_hits"] += 1
            if metric.error:
                db_stats[db]["errors"] += 1
            db_stats[db]["total_duration"] += metric.duration
            db_stats[db]["total_size"] += metric.response_size
        
        # パフォーマンスサマリー
        report = {
            "summary": {
                "total_queries": total_queries,
                "cache_hit_rate": cache_hits / total_queries if total_queries > 0 else 0,
                "error_rate": errors / total_queries if total_queries > 0 else 0,
                "average_response_time": sum(m.duration for m in self.metrics) / total_queries if total_queries > 0 else 0
            },
            "database_stats": {},
            "recommendations": []
        }
        
        # データベース別詳細統計
        for db, stats in db_stats.items():
            report["database_stats"][db] = {
                "total_queries": stats["total_queries"],
                "cache_hit_rate": stats["cache_hits"] / stats["total_queries"] if stats["total_queries"] > 0 else 0,
                "error_rate": stats["errors"] / stats["total_queries"] if stats["total_queries"] > 0 else 0,
                "average_response_time": stats["total_duration"] / stats["total_queries"] if stats["total_queries"] > 0 else 0,
                "average_response_size": stats["total_size"] / stats["total_queries"] if stats["total_queries"] > 0 else 0
            }
        
        # 最適化推奨事項
        for db, stats in report["database_stats"].items():
            if stats["cache_hit_rate"] < 0.5:
                report["recommendations"].append(f"{db}: キャッシュTTLの延長を検討")
            if stats["error_rate"] > 0.1:
                report["recommendations"].append(f"{db}: エラーハンドリング強化が必要")
            if stats["average_response_time"] > 5.0:
                report["recommendations"].append(f"{db}: レスポンス時間最適化が必要")
        
        return report

# 使用例
async def main():
    """パフォーマンス最適化デモ"""
    config = {
        "redis": {
            "enabled": True,
            "host": "localhost",
            "port": 6379,
            "db": 0,
            "max_connections": 20
        },
        "mysql": {
            "enabled": False,  # MySQL未使用の場合
            "host": "localhost",
            "port": 3306,
            "user": "root",
            "password": "",
            "database": "bioinformatics",
            "min_connections": 5,
            "max_connections": 20
        },
        "databases": {
            "uniprot": {
                "base_url": "https://rest.uniprot.org",
                "max_connections": 10,
                "limit_per_host": 5,
                "timeout": 30,
                "rate_limit": 1.0,
                "headers": {"Accept": "application/json"}
            },
            "ensembl": {
                "base_url": "https://rest.ensembl.org",
                "max_connections": 8,
                "limit_per_host": 4,
                "timeout": 20,
                "rate_limit": 0.5,
                "headers": {"Content-Type": "application/json"}
            }
        }
    }
    
    optimizer = PerformanceOptimizer(config)
    
    try:
        await optimizer.initialize()
        
        # サンプルクエリ生成
        sample_queries = []
        genes = ["BRCA1", "TP53", "EGFR", "MYC", "KRAS"]
        
        for gene in genes:
            sample_queries.append({
                "url": f"{config['databases']['uniprot']['base_url']}/uniprotkb/search",
                "params": {"query": f"gene:{gene} AND organism_id:9606", "format": "json", "size": 1},
                "method": "GET"
            })
        
        # 最適化されたバッチクエリ実行
        print("最適化バッチクエリ実行中...")
        results = await optimizer.batch_query_with_optimization(
            database="uniprot",
            queries=sample_queries,
            batch_size=3,
            enable_cache=True
        )
        
        print(f"結果取得: {len(results)}")
        
        # 2回目実行(キャッシュ効果確認)
        print("\n2回目実行(キャッシュテスト)...")
        results2 = await optimizer.batch_query_with_optimization(
            database="uniprot",
            queries=sample_queries,
            batch_size=3,
            enable_cache=True
        )
        
        # パフォーマンスレポート生成
        report = optimizer.generate_performance_report()
        print("\n=== パフォーマンスレポート ===")
        print(f"総クエリ数: {report['summary']['total_queries']}")
        print(f"キャッシュヒット率: {report['summary']['cache_hit_rate']:.2%}")
        print(f"エラー率: {report['summary']['error_rate']:.2%}")
        print(f"平均レスポンス時間: {report['summary']['average_response_time']:.3f}")
        
        if report['recommendations']:
            print("\n推奨事項:")
            for rec in report['recommendations']:
                print(f"- {rec}")
        
    finally:
        await optimizer.cleanup()

if __name__ == "__main__":
    # Redis不要な場合の簡易版
    asyncio.run(main())

まとめ

本付録では、バイオインフォマティクス研究における主要データベースの実践的な活用方法を包括的に解説しました。

🎯 キーポイント

  1. 戦略的データベース選択: 研究クエスチョンに応じた効果的なデータベースの選び方
  2. 効率的なデータアクセス: API活用、バッチ処理、キャッシュ戦略による高速化
  3. 統合解析アプローチ: 複数データベースの情報を統合した包括的解析
  4. パフォーマンス最適化: 大規模データ処理のための実装上の工夫

💡 実践での活用

  • 研究初期段階: データベース横断検索による仮説生成
  • 詳細解析段階: 特化データベースを用いた深掘り調査
  • 統合解析段階: マルチオミクスデータの統合的解釈
  • 結果検証段階: 複数ソースでのクロスバリデーション

🔄 継続的改善

データベースの内容やAPIは常に更新されるため、定期的な手法の見直しと最適化が重要です。本ガイドで紹介した基本的なアプローチを基に、各研究プロジェクトに最適化したワークフローを構築してください。


本節は分割されました: I.2 主要データベースの実践的活用法

本付録は分割されています: I.1 概要 / I.2 主要DB ほか

本節は分割されました: I.3 専門データベースの戦略的活用

本節は分割されました: 付録I-4: 実践的なワークフロー構築