I.5 パフォーマンス最適化とベストプラクティス
I.5.1 効率的なデータアクセス設計
🧪 概念例(擬似コード)
import asyncio
import aiohttp
import aiofiles
import aiomysql
import redis.asyncio as redis
import time
from typing import List, Dict, Optional, AsyncGenerator
from dataclasses import dataclass, asdict
import json
import hashlib
from pathlib import Path
import logging
from contextlib import asynccontextmanager
@dataclass
class QueryMetrics:
"""クエリ性能メトリクス"""
query_id: str
database: str
start_time: float
end_time: float
response_size: int
cache_hit: bool
error: Optional[str] = None
@property
def duration(self) -> float:
return self.end_time - self.start_time
class PerformanceOptimizer:
"""データベースアクセス性能最適化システム"""
def __init__(self, config: Dict):
"""
Args:
config: 設定辞書
"""
self.config = config
self.logger = logging.getLogger(__name__)
# Redis接続プール
self.redis_pool = None
# MySQL接続プール
self.mysql_pool = None
# HTTP セッション管理
self.http_sessions = {}
# メトリクス収集
self.metrics = []
# キャッシュ戦略
self.cache_strategies = {
"uniprot": {"ttl": 3600, "prefix": "up:"}, # 1時間
"ensembl": {"ttl": 7200, "prefix": "en:"}, # 2時間
"ncbi": {"ttl": 1800, "prefix": "ncbi:"}, # 30分
"kegg": {"ttl": 14400, "prefix": "kegg:"} # 4時間
}
async def initialize(self):
"""リソースの初期化"""
try:
# Redis接続プール初期化
if self.config.get("redis", {}).get("enabled", False):
redis_config = self.config["redis"]
self.redis_pool = redis.ConnectionPool(
host=redis_config.get("host", "localhost"),
port=redis_config.get("port", 6379),
db=redis_config.get("db", 0),
max_connections=redis_config.get("max_connections", 20),
decode_responses=True
)
self.logger.info("Redis接続プール初期化完了")
# MySQL接続プール初期化
if self.config.get("mysql", {}).get("enabled", False):
mysql_config = self.config["mysql"]
self.mysql_pool = await aiomysql.create_pool(
host=mysql_config.get("host", "localhost"),
port=mysql_config.get("port", 3306),
user=mysql_config.get("user", "root"),
password=mysql_config.get("password", ""),
db=mysql_config.get("database", "bioinformatics"),
minsize=mysql_config.get("min_connections", 5),
maxsize=mysql_config.get("max_connections", 20),
autocommit=True
)
self.logger.info("MySQL接続プール初期化完了")
# HTTP セッションプール初期化
for db_name, db_config in self.config.get("databases", {}).items():
connector = aiohttp.TCPConnector(
limit=db_config.get("max_connections", 10),
limit_per_host=db_config.get("limit_per_host", 5),
ttl_dns_cache=300,
use_dns_cache=True
)
timeout = aiohttp.ClientTimeout(
total=db_config.get("timeout", 30),
connect=db_config.get("connect_timeout", 10)
)
self.http_sessions[db_name] = aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers=db_config.get("headers", {})
)
self.logger.info("パフォーマンス最適化システム初期化完了")
except Exception as e:
self.logger.error(f"初期化エラー: {e}")
raise
async def cleanup(self):
"""リソースのクリーンアップ"""
# HTTPセッション閉じる
for session in self.http_sessions.values():
await session.close()
# MySQL接続プール閉じる
if self.mysql_pool:
self.mysql_pool.close()
await self.mysql_pool.wait_closed()
# Redis接続プール閉じる
if self.redis_pool:
await self.redis_pool.disconnect()
self.logger.info("リソースクリーンアップ完了")
def _generate_cache_key(self, database: str, query: str, params: Dict = None) -> str:
"""キャッシュキーの生成"""
cache_data = {"db": database, "query": query, "params": params or {}}
cache_string = json.dumps(cache_data, sort_keys=True)
hash_key = hashlib.md5(cache_string.encode()).hexdigest()
prefix = self.cache_strategies.get(database, {}).get("prefix", "default:")
return f"{prefix}{hash_key}"
@asynccontextmanager
async def get_redis_connection(self):
"""Redis接続の取得(コンテキストマネージャ)"""
if not self.redis_pool:
yield None
return
connection = redis.Redis(connection_pool=self.redis_pool)
try:
yield connection
finally:
await connection.close()
async def get_cached_result(self, cache_key: str) -> Optional[Dict]:
"""キャッシュからの結果取得"""
async with self.get_redis_connection() as redis_conn:
if not redis_conn:
return None
try:
cached_data = await redis_conn.get(cache_key)
if cached_data:
return json.loads(cached_data)
except Exception as e:
self.logger.warning(f"キャッシュ取得エラー: {e}")
return None
async def set_cached_result(self, cache_key: str, data: Dict, database: str):
"""結果のキャッシュ保存"""
async with self.get_redis_connection() as redis_conn:
if not redis_conn:
return
try:
ttl = self.cache_strategies.get(database, {}).get("ttl", 3600)
await redis_conn.setex(
cache_key,
ttl,
json.dumps(data, ensure_ascii=False)
)
except Exception as e:
self.logger.warning(f"キャッシュ保存エラー: {e}")
async def batch_query_with_optimization(self,
database: str,
queries: List[Dict],
batch_size: int = 10,
enable_cache: bool = True) -> List[Dict]:
"""最適化されたバッチクエリ実行"""
results = []
# クエリをバッチに分割
for i in range(0, len(queries), batch_size):
batch = queries[i:i + batch_size]
self.logger.info(f"バッチ {i//batch_size + 1}/{(len(queries)-1)//batch_size + 1} 実行中")
batch_results = await self._execute_batch_with_cache(
database, batch, enable_cache
)
results.extend(batch_results)
# バッチ間の適切な待機
if i + batch_size < len(queries):
rate_limit = self.config.get("databases", {}).get(database, {}).get("rate_limit", 1.0)
await asyncio.sleep(rate_limit)
return results
async def _execute_batch_with_cache(self,
database: str,
batch: List[Dict],
enable_cache: bool) -> List[Dict]:
"""キャッシュ機能付きバッチ実行"""
results = []
uncached_queries = []
cache_keys = []
# キャッシュチェック
if enable_cache:
for query in batch:
cache_key = self._generate_cache_key(database,
query.get("endpoint", ""),
query.get("params"))
cache_keys.append(cache_key)
cached_result = await self.get_cached_result(cache_key)
if cached_result:
# キャッシュヒット
results.append(cached_result)
self._record_metrics(database, True, len(str(cached_result)))
else:
# キャッシュミス
uncached_queries.append((query, cache_key))
else:
uncached_queries = [(query, None) for query in batch]
# キャッシュミスしたクエリを並列実行
if uncached_queries:
session = self.http_sessions.get(database)
if not session:
self.logger.error(f"セッションが見つかりません: {database}")
return results
tasks = []
for query, cache_key in uncached_queries:
task = self._execute_single_query(session, database, query, cache_key, enable_cache)
tasks.append(task)
query_results = await asyncio.gather(*tasks, return_exceptions=True)
for result in query_results:
if isinstance(result, Exception):
self.logger.error(f"クエリ実行エラー: {result}")
results.append({"error": str(result)})
else:
results.append(result)
return results
async def _execute_single_query(self,
session: aiohttp.ClientSession,
database: str,
query: Dict,
cache_key: Optional[str],
enable_cache: bool) -> Dict:
"""単一クエリの実行"""
start_time = time.time()
try:
url = query.get("url", "")
params = query.get("params", {})
headers = query.get("headers", {})
method = query.get("method", "GET")
if method.upper() == "GET":
async with session.get(url, params=params, headers=headers) as response:
response.raise_for_status()
content_type = response.headers.get('content-type', '')
if 'application/json' in content_type:
result = await response.json()
else:
result = {"text": await response.text()}
elif method.upper() == "POST":
data = query.get("data", {})
async with session.post(url, json=data, params=params, headers=headers) as response:
response.raise_for_status()
result = await response.json()
else:
raise ValueError(f"サポートされていないHTTPメソッド: {method}")
# レスポンスサイズ計算
response_size = len(json.dumps(result))
# キャッシュ保存
if enable_cache and cache_key:
await self.set_cached_result(cache_key, result, database)
# メトリクス記録
self._record_metrics(database, False, response_size)
return result
except Exception as e:
end_time = time.time()
self.logger.error(f"クエリ実行エラー ({database}): {e}")
# エラーメトリクス記録
metrics = QueryMetrics(
query_id=str(hash(str(query))),
database=database,
start_time=start_time,
end_time=end_time,
response_size=0,
cache_hit=False,
error=str(e)
)
self.metrics.append(metrics)
return {"error": str(e)}
def _record_metrics(self, database: str, cache_hit: bool, response_size: int):
"""メトリクスの記録"""
metrics = QueryMetrics(
query_id=f"{database}_{int(time.time())}",
database=database,
start_time=time.time(),
end_time=time.time(),
response_size=response_size,
cache_hit=cache_hit
)
self.metrics.append(metrics)
async def optimize_database_schema(self):
"""データベーススキーマの最適化"""
if not self.mysql_pool:
return
optimization_queries = [
# インデックス最適化
"""
ALTER TABLE genes
ADD INDEX idx_gene_symbol (gene_symbol),
ADD INDEX idx_chromosome (chromosome),
ADD INDEX idx_biotype (biotype)
""",
"""
ALTER TABLE functional_annotations
ADD INDEX idx_gene_annotation (gene_id, annotation_type),
ADD INDEX idx_source_db (source_database),
ADD INDEX idx_confidence (confidence_score)
""",
"""
ALTER TABLE pathway_annotations
ADD INDEX idx_gene_pathway (gene_id, pathway_id),
ADD INDEX idx_pathway_category (pathway_category)
""",
"""
ALTER TABLE protein_interactions
ADD INDEX idx_gene_pair (gene_a_id, gene_b_id),
ADD INDEX idx_interaction_type (interaction_type),
ADD INDEX idx_confidence_score (confidence_score)
""",
# パーティション設定(日付ベース)
"""
ALTER TABLE analysis_cache
PARTITION BY RANGE (TO_DAYS(created_at)) (
PARTITION p0 VALUES LESS THAN (TO_DAYS('2024-01-01')),
PARTITION p1 VALUES LESS THAN (TO_DAYS('2024-04-01')),
PARTITION p2 VALUES LESS THAN (TO_DAYS('2024-07-01')),
PARTITION p3 VALUES LESS THAN (TO_DAYS('2024-10-01')),
PARTITION p4 VALUES LESS THAN (TO_DAYS('2025-01-01'))
)
"""
]
async with self.mysql_pool.acquire() as connection:
async with connection.cursor() as cursor:
for query in optimization_queries:
try:
await cursor.execute(query)
self.logger.info(f"スキーマ最適化実行: {query[:50]}...")
except Exception as e:
self.logger.warning(f"スキーマ最適化スキップ: {e}")
def generate_performance_report(self) -> Dict:
"""パフォーマンスレポートの生成"""
if not self.metrics:
return {"message": "メトリクスデータがありません"}
# 基本統計
total_queries = len(self.metrics)
cache_hits = sum(1 for m in self.metrics if m.cache_hit)
errors = sum(1 for m in self.metrics if m.error)
# データベース別統計
db_stats = {}
for metric in self.metrics:
db = metric.database
if db not in db_stats:
db_stats[db] = {
"total_queries": 0,
"cache_hits": 0,
"errors": 0,
"total_duration": 0,
"total_size": 0
}
db_stats[db]["total_queries"] += 1
if metric.cache_hit:
db_stats[db]["cache_hits"] += 1
if metric.error:
db_stats[db]["errors"] += 1
db_stats[db]["total_duration"] += metric.duration
db_stats[db]["total_size"] += metric.response_size
# パフォーマンスサマリー
report = {
"summary": {
"total_queries": total_queries,
"cache_hit_rate": cache_hits / total_queries if total_queries > 0 else 0,
"error_rate": errors / total_queries if total_queries > 0 else 0,
"average_response_time": sum(m.duration for m in self.metrics) / total_queries if total_queries > 0 else 0
},
"database_stats": {},
"recommendations": []
}
# データベース別詳細統計
for db, stats in db_stats.items():
report["database_stats"][db] = {
"total_queries": stats["total_queries"],
"cache_hit_rate": stats["cache_hits"] / stats["total_queries"] if stats["total_queries"] > 0 else 0,
"error_rate": stats["errors"] / stats["total_queries"] if stats["total_queries"] > 0 else 0,
"average_response_time": stats["total_duration"] / stats["total_queries"] if stats["total_queries"] > 0 else 0,
"average_response_size": stats["total_size"] / stats["total_queries"] if stats["total_queries"] > 0 else 0
}
# 最適化推奨事項
for db, stats in report["database_stats"].items():
if stats["cache_hit_rate"] < 0.5:
report["recommendations"].append(f"{db}: キャッシュTTLの延長を検討")
if stats["error_rate"] > 0.1:
report["recommendations"].append(f"{db}: エラーハンドリング強化が必要")
if stats["average_response_time"] > 5.0:
report["recommendations"].append(f"{db}: レスポンス時間最適化が必要")
return report
# 使用例
async def main():
"""パフォーマンス最適化デモ"""
config = {
"redis": {
"enabled": True,
"host": "localhost",
"port": 6379,
"db": 0,
"max_connections": 20
},
"mysql": {
"enabled": False, # MySQL未使用の場合
"host": "localhost",
"port": 3306,
"user": "root",
"password": "",
"database": "bioinformatics",
"min_connections": 5,
"max_connections": 20
},
"databases": {
"uniprot": {
"base_url": "https://rest.uniprot.org",
"max_connections": 10,
"limit_per_host": 5,
"timeout": 30,
"rate_limit": 1.0,
"headers": {"Accept": "application/json"}
},
"ensembl": {
"base_url": "https://rest.ensembl.org",
"max_connections": 8,
"limit_per_host": 4,
"timeout": 20,
"rate_limit": 0.5,
"headers": {"Content-Type": "application/json"}
}
}
}
optimizer = PerformanceOptimizer(config)
try:
await optimizer.initialize()
# サンプルクエリ生成
sample_queries = []
genes = ["BRCA1", "TP53", "EGFR", "MYC", "KRAS"]
for gene in genes:
sample_queries.append({
"url": f"{config['databases']['uniprot']['base_url']}/uniprotkb/search",
"params": {"query": f"gene:{gene} AND organism_id:9606", "format": "json", "size": 1},
"method": "GET"
})
# 最適化されたバッチクエリ実行
print("最適化バッチクエリ実行中...")
results = await optimizer.batch_query_with_optimization(
database="uniprot",
queries=sample_queries,
batch_size=3,
enable_cache=True
)
print(f"結果取得: {len(results)}件")
# 2回目実行(キャッシュ効果確認)
print("\n2回目実行(キャッシュテスト)...")
results2 = await optimizer.batch_query_with_optimization(
database="uniprot",
queries=sample_queries,
batch_size=3,
enable_cache=True
)
# パフォーマンスレポート生成
report = optimizer.generate_performance_report()
print("\n=== パフォーマンスレポート ===")
print(f"総クエリ数: {report['summary']['total_queries']}")
print(f"キャッシュヒット率: {report['summary']['cache_hit_rate']:.2%}")
print(f"エラー率: {report['summary']['error_rate']:.2%}")
print(f"平均レスポンス時間: {report['summary']['average_response_time']:.3f}秒")
if report['recommendations']:
print("\n推奨事項:")
for rec in report['recommendations']:
print(f"- {rec}")
finally:
await optimizer.cleanup()
if __name__ == "__main__":
# Redis不要な場合の簡易版
asyncio.run(main())
まとめ
本節では、外部API/データベースへの問い合わせを大量に行う場面での、実装上のパフォーマンス最適化(キャッシュ・並列化・レート制限・再試行・観測性)を扱いました。
🎯 キーポイント
- 計測が先: レイテンシ、スループット、エラー率、キャッシュヒット率などを計測し、改善対象を特定する
- バッチ処理と並列化: I/O待ちを隠蔽しつつ、同時実行数を制御して過負荷を避ける
- キャッシュ戦略: キャッシュキー/TTLを設計し、メモリ/Redis等を使い分ける
- レート制限: APIの制約に合わせてクライアント側で制御する(429/503の扱いを含む)
- 耐障害性: タイムアウト、再試行、指数バックオフ等で失敗を局所化する
- 観測性: メトリクスとログを整備し、改善を継続できる形にする
💡 実践での活用
- 少量→大量へスケールする前に、計測とキャッシュで「再現性のある改善」を作る
- まずは同時実行数の上限とタイムアウト/再試行方針を決め、API側への影響を抑える
- 取得・解析パイプラインで同じクエリが繰り返される場合は、キャッシュが効果的
🔄 継続的改善
API仕様・レート制限・データ更新頻度は変化するため、メトリクスを継続収集し、TTL/並列数/バックオフなどのパラメータを定期的に見直してください。
注: 付録Iは分割されています。全体の目次は 付録I: データベース利用の実践ガイド を参照してください。