付録I: データベース利用の実践ガイド
対象読者: バイオインフォマティクス研究でデータベースを効果的に活用したい研究者・技術者
本付録では、バイオインフォマティクス研究において重要な役割を果たす各種データベースの実践的な利用方法を、具体的なユースケースとコード例を交えて解説します。
I.1 概要:バイオインフォマティクスにおけるデータベースの重要性
I.1.1 データベースエコシステムの理解
バイオインフォマティクス研究では、多様なデータベースが相互に連携し、包括的な生物学的情報基盤を形成しています。
I.1.2 データベース選択の戦略的アプローチ
研究目的に応じた効果的なデータベース選択フレームワーク:
Step 1: 研究クエスチョンの分類
def classify_research_question(question_type, data_scope, analysis_depth):
    """Recommend databases for a research question.

    The three arguments index a nested lookup table
    (question type -> data scope -> analysis depth).

    Args:
        question_type: "functional", "structural" or "clinical".
            ("evolutionary" has no entry in the table and therefore yields
            the error result.)
        data_scope: "single_gene", "pathway" or "genome_wide", depending on
            the question type ("multi_omics" has no entry in the table).
        analysis_depth: "descriptive", "comparative", "predictive" or
            "causal".

    Returns:
        dict: ``primary_databases``, ``access_strategy`` and
        ``integration_approach`` for a known combination; otherwise
        ``{"error": "Invalid combination of parameters"}``.
    """

    def by_depth(descriptive, comparative, predictive, causal):
        # One table row: the four analysis depths for a (type, scope) pair.
        return {
            "descriptive": descriptive,
            "comparative": comparative,
            "predictive": predictive,
            "causal": causal,
        }

    catalog = {
        "functional": {
            "single_gene": by_depth(
                ["UniProt", "GO", "InterPro"],
                ["UniProt", "GO", "OMA"],
                ["STRING", "GO", "KEGG"],
                ["GO", "KEGG", "Reactome"],
            ),
            "pathway": by_depth(
                ["KEGG", "Reactome", "BioCyc"],
                ["KEGG", "STRING", "GO"],
                ["KEGG", "STRING", "MetaCyc"],
                ["Reactome", "KEGG", "SIGNOR"],
            ),
        },
        "structural": {
            "single_gene": by_depth(
                ["PDB", "UniProt", "Pfam"],
                ["PDB", "CATH", "SCOP"],
                ["AlphaFold", "ModBase", "I-TASSER"],
                ["PDB", "CASTp", "ConCavity"],
            ),
        },
        "clinical": {
            "single_gene": by_depth(
                ["ClinVar", "OMIM", "PharmGKB"],
                ["ClinVar", "COSMIC", "ExAC"],
                ["ClinVar", "PharmGKB", "DGIdb"],
                ["ClinVar", "OMIM", "DisGeNET"],
            ),
            "genome_wide": by_depth(
                ["GWAS Catalog", "UK Biobank", "GTEx"],
                ["GWAS Catalog", "PhenoScanner", "Open Targets"],
                ["PRS Catalog", "GWAS Catalog", "UK Biobank"],
                ["Open Targets", "DisGeNET", "STRING"],
            ),
        },
    }

    try:
        # The table lookup comes first so that an unknown combination is
        # rejected before any strategy is computed.
        databases = catalog[question_type][data_scope][analysis_depth]
        strategy = generate_access_strategy(question_type, data_scope)
        integration = suggest_integration_methods(data_scope, analysis_depth)
    except KeyError:
        return {"error": "Invalid combination of parameters"}

    return {
        "primary_databases": databases,
        "access_strategy": strategy,
        "integration_approach": integration,
    }
def generate_access_strategy(question_type, data_scope):
    """Pick a data-access strategy from the data scope.

    Args:
        question_type: Accepted for interface symmetry; not consulted.
        data_scope: "genome_wide" and "multi_omics" select bulk download;
            every other value selects query-based access.

    Returns:
        dict: ``method``, ``tools``, ``preprocessing`` and ``storage``.
    """
    bulk_scopes = ("genome_wide", "multi_omics")
    if data_scope not in bulk_scopes:
        # Small scopes: fetch on demand and rely on a cache.
        return {
            "method": "query_based",
            "tools": ["REST_API", "web_interface"],
            "preprocessing": "minimal",
            "storage": "cache_sufficient",
        }
    # Large scopes: mirror the data locally before analysing it.
    return {
        "method": "bulk_download",
        "tools": ["FTP", "API", "rsync"],
        "preprocessing": "required",
        "storage": "local_database_recommended",
    }
def suggest_integration_methods(data_scope, analysis_depth):
    """Suggest data-integration methods for a (scope, depth) pair.

    Args:
        data_scope: Scope of the data, e.g. "single_gene" or "genome_wide".
        analysis_depth: Depth of the analysis, e.g. "predictive".

    Returns:
        list: Method names for a known pair, otherwise
        ``["custom_integration"]``.
    """
    known_pairs = {
        ("single_gene", "descriptive"): ["manual_curation", "simple_joins"],
        ("single_gene", "comparative"): ["orthology_mapping", "sequence_alignment"],
        ("pathway", "predictive"): ["network_analysis", "enrichment_analysis"],
        ("genome_wide", "causal"): ["mendelian_randomization", "colocalization"],
        ("multi_omics", "predictive"): ["multi_modal_ML", "network_integration"],
    }
    try:
        return known_pairs[(data_scope, analysis_depth)]
    except KeyError:
        # Pairs without a dedicated recipe get the generic fallback.
        return ["custom_integration"]
# Usage example: ("clinical", "single_gene", "predictive") is a combination
# present in the lookup table, so the result carries 'primary_databases' and
# 'access_strategy' rather than an 'error' key.
recommendation = classify_research_question("clinical", "single_gene", "predictive")
print(f"推奨データベース: {recommendation['primary_databases']}")
print(f"アクセス戦略: {recommendation['access_strategy']['method']}")
I.2 主要データベースの実践的活用法
I.2.1 NCBI データベース群の効率的利用
GenBank/RefSeq: 配列データの取得と品質管理
from Bio import Entrez, SeqIO
import requests
import pandas as pd
from typing import List, Dict, Optional
import time
import logging
class NCBIDataRetriever:
    """Convenience wrapper around Biopython's ``Entrez`` client.

    Provides searching (esearch), batched record download (efetch) and a
    gene summary lookup (esearch + esummary + elink).
    """

    def __init__(self, email: str, api_key: Optional[str] = None):
        """
        Args:
            email: Contact address, stored on ``Entrez.email``.
            api_key: Optional NCBI API key, stored on ``Entrez.api_key``.
        """
        Entrez.email = email
        if api_key:
            Entrez.api_key = api_key
        # Logging setup
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        # Pause between requests. The tutorial quotes NCBI limits of
        # 10 req/s with an API key and 3 req/s without; these delays
        # (~3 req/s and 1 req/s) deliberately stay well below both.
        self.request_delay = 0.34 if api_key else 1.0

    @staticmethod
    def _build_query(query: str, filters: Optional[Dict] = None) -> str:
        """Append the supported filter terms to an Entrez query string."""
        if not filters:
            return query
        field_tags = (
            ("organism", "Organism"),
            ("molecular_type", "Properties"),
            ("date_range", "Publication Date"),
        )
        terms = [f"{filters[key]}[{tag}]" for key, tag in field_tags if key in filters]
        if terms:
            query += " AND " + " AND ".join(terms)
        return query

    @staticmethod
    def _read_and_close(handle):
        """Parse an Entrez XML handle and close it even if parsing fails."""
        try:
            return Entrez.read(handle)
        finally:
            handle.close()

    def search_sequences(self, query: str, database: str = "nucleotide",
                         max_results: int = 100, filters: Optional[Dict] = None) -> List[str]:
        """Search an Entrez database and return the matching IDs.

        Args:
            query: Entrez query, e.g. "BRCA1[Gene] AND human[Organism]".
            database: Entrez database to search.
            max_results: Maximum number of IDs requested (``retmax``).
            filters: Optional dict with any of the keys "organism",
                "molecular_type" and "date_range"; each present key is
                AND-ed onto the query.

        Returns:
            List[str]: The ``IdList`` entries of the esearch result.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            query = self._build_query(query, filters)
            self.logger.info(f"実行クエリ: {query}")
            search_results = self._read_and_close(
                Entrez.esearch(db=database, term=query, retmax=max_results)
            )
            id_list = search_results["IdList"]
            self.logger.info(f"検索結果: {len(id_list)}件")
            return id_list
        except Exception as e:
            self.logger.error(f"検索エラー: {e}")
            raise

    def fetch_sequences_batch(self, id_list: List[str], database: str = "nucleotide",
                              batch_size: int = 100, format: str = "fasta") -> Dict[str, str]:
        """Download records for a list of IDs.

        FASTA records are fetched one batch per request and keyed by the
        record ID found in each FASTA header. Any other format is fetched
        one ID per request: a multi-record text payload cannot be split
        reliably, and reading one shared handle once per ID (as the previous
        code did) gave the whole payload to the first ID and an empty string
        to the rest.

        Args:
            id_list: IDs to fetch.
            database: Entrez database name.
            batch_size: Number of IDs handled per batch.
            format: Entrez ``rettype`` ("fasta", "gb", ...).

        Returns:
            Dict[str, str]: For FASTA, record ID -> sequence string; for
            other formats, requested ID -> raw record text. Batches that
            fail are logged and skipped.
        """
        sequences: Dict[str, str] = {}
        for start in range(0, len(id_list), batch_size):
            batch_ids = id_list[start:start + batch_size]
            batch_no = start // batch_size + 1
            try:
                self.logger.info(f"バッチ {batch_no}: {len(batch_ids)}件処理中")
                if format == "fasta":
                    handle = Entrez.efetch(
                        db=database,
                        id=",".join(batch_ids),
                        rettype=format,
                        retmode="text",
                    )
                    try:
                        for record in SeqIO.parse(handle, "fasta"):
                            sequences[record.id] = str(record.seq)
                    finally:
                        handle.close()
                else:
                    for position, record_id in enumerate(batch_ids):
                        if position:
                            # Keep pacing between the per-ID requests too.
                            time.sleep(self.request_delay)
                        handle = Entrez.efetch(
                            db=database,
                            id=record_id,
                            rettype=format,
                            retmode="text",
                        )
                        try:
                            sequences[record_id] = handle.read()
                        finally:
                            handle.close()
                # Rate limiting between batches
                time.sleep(self.request_delay)
            except Exception as e:
                self.logger.error(f"バッチ処理エラー (batch {batch_no}): {e}")
                continue
        return sequences

    def get_gene_summary(self, gene_symbol: str, organism: str = "human") -> Dict:
        """Collect summary information for one gene.

        Args:
            gene_symbol: Gene symbol, e.g. "BRCA1".
            organism: Organism name used in the ``[Organism]`` filter.

        Returns:
            Dict: Gene ID, descriptive fields, the number of linked
            nucleotide records and the first ten of their IDs; or
            ``{"error": ...}`` when the gene is not found or a request fails.
        """
        try:
            # Look the gene up in the Gene database
            gene_query = f"{gene_symbol}[Gene] AND {organism}[Organism]"
            gene_results = self._read_and_close(
                Entrez.esearch(db="gene", term=gene_query, retmax=1)
            )
            if not gene_results["IdList"]:
                return {"error": f"遺伝子 {gene_symbol} が見つかりません"}
            gene_id = gene_results["IdList"][0]

            # NOTE(review): assumes the parsed esummary result is a sequence
            # whose first item is a mapping of summary fields — confirm
            # against the installed Biopython version.
            summary = self._read_and_close(Entrez.esummary(db="gene", id=gene_id))[0]

            # Linked nucleotide records
            link_results = self._read_and_close(
                Entrez.elink(dbfrom="gene", db="nucleotide", id=gene_id)
            )
            nucleotide_ids = []
            if link_results[0]["LinkSetDb"]:
                nucleotide_ids = [
                    link["Id"] for link in link_results[0]["LinkSetDb"][0]["Link"]
                ]

            return {
                "gene_id": gene_id,
                "symbol": summary.get("Name", ""),
                "description": summary.get("Description", ""),
                "summary": summary.get("Summary", ""),
                "chromosome": summary.get("Chromosome", ""),
                "map_location": summary.get("MapLocation", ""),
                "gene_type": summary.get("GeneType", ""),
                "associated_sequences": len(nucleotide_ids),
                "nucleotide_ids": nucleotide_ids[:10],  # first 10 only
            }
        except Exception as e:
            self.logger.error(f"遺伝子情報取得エラー: {e}")
            return {"error": str(e)}
# Usage example
if __name__ == "__main__":
    # Replace the placeholder e-mail address and API key before real use
    # (the API key is optional).
    retriever = NCBIDataRetriever(
        email="your.email@example.com",
        api_key="your_api_key_here",
    )

    # 1. Basic information about the BRCA1 gene
    brca1_info = retriever.get_gene_summary("BRCA1", "human")
    print("BRCA1遺伝子情報:")
    for label, field in (
        ("説明", "description"),
        ("染色体", "chromosome"),
        ("関連配列数", "associated_sequences"),
    ):
        print(f"- {label}: {brca1_info.get(field, 'N/A')}")

    # 2. Search for and download COVID-19 related sequences
    covid_filters = {
        "organism": "SARS-CoV-2",
        "molecular_type": "genomic RNA",
        "date_range": "2020/01/01:2024/12/31",
    }
    covid_ids = retriever.search_sequences(
        query="complete genome",
        database="nucleotide",
        max_results=50,
        filters=covid_filters,
    )
    if covid_ids:
        # Fetch only the first 10 hits, in batches of 5, as a smoke test
        sample_ids = covid_ids[:10]
        covid_sequences = retriever.fetch_sequences_batch(sample_ids, batch_size=5)
        print(f"\n取得したCOVID-19配列: {len(covid_sequences)}件")
        preview = list(covid_sequences.items())[:3]
        for seq_id, sequence in preview:
            print(f"- {seq_id}: {len(sequence)} bp")
SRA: シークエンシングデータの効率的アクセス
import subprocess
import os
import pandas as pd
from pathlib import Path
import xml.etree.ElementTree as ET
import requests
from typing import List, Dict, Optional
import concurrent.futures
import hashlib
class SRADataManager:
    """Helpers for searching SRA (Sequence Read Archive) and downloading runs.

    Searching uses the NCBI E-utilities over HTTP; downloading shells out to
    the ``fastq-dump`` command.
    """

    # Seconds to wait for an HTTP response before giving up, so a stalled
    # connection cannot hang the caller indefinitely.
    _HTTP_TIMEOUT = 60

    def __init__(self, work_dir: str = "./sra_data", max_workers: int = 4):
        """
        Args:
            work_dir: Working directory; created (with parents) if missing.
            max_workers: Default number of parallel downloads.
        """
        self.work_dir = Path(work_dir)
        # parents=True so that a nested work_dir does not fail on first use
        self.work_dir.mkdir(parents=True, exist_ok=True)
        self.max_workers = max_workers
        # Check that the SRA Toolkit is available
        self._check_sra_toolkit()

    def _check_sra_toolkit(self):
        """Report whether ``fastq-dump --version`` runs successfully."""
        try:
            result = subprocess.run(["fastq-dump", "--version"],
                                    capture_output=True, text=True)
        except FileNotFoundError:
            result = None
        if result is not None and result.returncode == 0:
            print(f"SRA Toolkit確認済み: {result.stdout.strip()}")
            return
        # Missing binary and non-zero exit status are reported the same way.
        print("警告: SRA Toolkitが見つかりません")
        print("インストール方法: conda install -c bioconda sra-tools")

    def search_sra_studies(self, query: str, max_results: int = 100) -> pd.DataFrame:
        """Search the SRA database and summarise the hits.

        Args:
            query: Search query, e.g. "RNA-seq AND human AND cancer".
            max_results: Maximum number of IDs requested.

        Returns:
            pd.DataFrame: One row per ``DocSum`` element, with ``SRA_ID``
            plus whichever of the selected ``Item`` names are present.
            Empty on no hits or on any error (which is printed).
        """
        wanted_items = {"Title", "Platform", "Organism", "LibraryStrategy",
                        "LibrarySource", "SampleAccession", "StudyAccession"}
        try:
            # ESearch: collect the SRA IDs
            search_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
            search_params = {
                "db": "sra",
                "term": query,
                "retmax": max_results,
                "retmode": "xml",
            }
            response = requests.get(search_url, params=search_params,
                                    timeout=self._HTTP_TIMEOUT)
            # Surface HTTP errors directly instead of as an XML parse error
            response.raise_for_status()
            root = ET.fromstring(response.content)
            sra_ids = [id_elem.text for id_elem in root.findall(".//Id")]
            if not sra_ids:
                print("検索結果なし")
                return pd.DataFrame()

            # ESummary: fetch the details for those IDs
            summary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
            summary_params = {
                "db": "sra",
                "id": ",".join(sra_ids),
                "retmode": "xml",
            }
            response = requests.get(summary_url, params=summary_params,
                                    timeout=self._HTTP_TIMEOUT)
            response.raise_for_status()
            root = ET.fromstring(response.content)

            studies = []
            for doc_sum in root.findall(".//DocSum"):
                study_info = {"SRA_ID": doc_sum.find("Id").text}
                for item in doc_sum.findall(".//Item"):
                    name = item.get("Name")
                    if name in wanted_items:
                        study_info[name] = item.text or ""
                studies.append(study_info)
            df = pd.DataFrame(studies)
            print(f"検索結果: {len(df)}件のSRAスタディを取得")
            return df
        except Exception as e:
            print(f"SRA検索エラー: {e}")
            return pd.DataFrame()

    def get_run_info(self, study_accession: str) -> pd.DataFrame:
        """Fetch the RunInfo table for a study.

        Args:
            study_accession: SRA study accession, e.g. SRP123456.

        Returns:
            pd.DataFrame: The RunInfo CSV restricted to the columns of
            interest that are present; empty on HTTP failure or any error.
        """
        try:
            # Pass the query through ``params`` so the accession is
            # URL-encoded rather than spliced into the URL verbatim.
            response = requests.get(
                "https://www.ncbi.nlm.nih.gov/Traces/sra/sra.cgi",
                params={
                    "save": "efetch",
                    "db": "sra",
                    "rettype": "runinfo",
                    "term": study_accession,
                },
                timeout=self._HTTP_TIMEOUT,
            )
            if response.status_code != 200:
                print(f"RunInfo取得失敗: HTTP {response.status_code}")
                return pd.DataFrame()

            from io import StringIO
            df = pd.read_csv(StringIO(response.text))
            important_cols = [
                "Run", "SampleName", "Experiment", "LibraryStrategy",
                "LibrarySource", "Platform", "Instrument", "InsertSize",
                "LibraryLayout", "spots", "bases", "download_path",
            ]
            available_cols = [col for col in important_cols if col in df.columns]
            df_filtered = df[available_cols]
            print(f"ラン情報取得: {len(df_filtered)}件")
            return df_filtered
        except Exception as e:
            print(f"RunInfo取得エラー: {e}")
            return pd.DataFrame()

    def download_fastq(self, run_accession: str, output_dir: Optional[str] = None,
                       paired: bool = True, compressed: bool = True) -> Dict[str, str]:
        """Download one run as FASTQ with ``fastq-dump``.

        Args:
            run_accession: Run accession, e.g. SRR123456.
            output_dir: Output directory; defaults to ``<work_dir>/fastq``.
            paired: Pass ``--split-files`` and look for ``_1``/``_2`` files.
            compressed: Pass ``--gzip`` and look for ``.fastq.gz`` files.

        Returns:
            Dict[str, str]: ``read_1``/``read_2`` (paired) or ``reads``
            (single) mapped to the paths of the files that exist afterwards;
            empty when the command fails.
        """
        if output_dir is None:
            output_dir = self.work_dir / "fastq"
        output_dir = Path(output_dir)
        # parents=True so a caller-supplied nested directory is accepted
        output_dir.mkdir(parents=True, exist_ok=True)
        try:
            cmd = ["fastq-dump"]
            if paired:
                cmd.append("--split-files")  # one file per mate
            if compressed:
                cmd.append("--gzip")
            cmd.extend(["--outdir", str(output_dir)])
            cmd.append(run_accession)
            print(f"FASTQダウンロード開始: {run_accession}")
            print(f"コマンド: {' '.join(cmd)}")

            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"ダウンロードエラー: {result.stderr}")
                return {}

            # Report only the files that actually exist on disk
            suffix = ".fastq.gz" if compressed else ".fastq"
            if paired:
                expected = {
                    f"read_{mate}": output_dir / f"{run_accession}_{mate}{suffix}"
                    for mate in (1, 2)
                }
            else:
                expected = {"reads": output_dir / f"{run_accession}{suffix}"}
            downloaded_files = {
                label: str(path) for label, path in expected.items() if path.exists()
            }
            print(f"ダウンロード完了: {len(downloaded_files)}ファイル")
            return downloaded_files
        except Exception as e:
            print(f"FASTQダウンロードエラー: {e}")
            return {}

    def batch_download(self, run_list: List[str], max_concurrent: int = None) -> Dict[str, Dict]:
        """Download several runs in parallel threads.

        Args:
            run_list: Run accessions.
            max_concurrent: Thread count; defaults to ``self.max_workers``.

        Returns:
            Dict[str, Dict]: Per-run result of ``download_fastq``, or
            ``{"error": ...}`` if the worker raised.
        """
        if max_concurrent is None:
            max_concurrent = self.max_workers
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor:
            future_to_run = {
                executor.submit(self.download_fastq, run): run
                for run in run_list
            }
            for future in concurrent.futures.as_completed(future_to_run):
                run = future_to_run[future]
                try:
                    results[run] = future.result()
                    print(f"完了: {run}")
                except Exception as e:
                    print(f"エラー {run}: {e}")
                    results[run] = {"error": str(e)}
        return results

    def verify_download_integrity(self, file_path: str, expected_md5: str = None) -> bool:
        """Check that a file exists and, optionally, matches an MD5 digest.

        Args:
            file_path: Path of the file to check.
            expected_md5: Expected hex digest. Compared case-insensitively
                and ignoring surrounding whitespace, because hex digests are
                commonly published in either case.

        Returns:
            bool: False if the file is missing or the digest differs; True
            otherwise (existence alone when no digest is given).
        """
        if not os.path.exists(file_path):
            return False
        if expected_md5:
            hasher = hashlib.md5()
            with open(file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(65536), b""):
                    hasher.update(chunk)
            return hasher.hexdigest() == expected_md5.strip().lower()
        # Without a digest only existence is verified
        return True
# Usage example
if __name__ == "__main__":
    # Create the SRA data manager
    sra_manager = SRADataManager(work_dir="./sra_analysis", max_workers=2)

    # 1. Search for COVID-19 related RNA-seq data
    covid_studies = sra_manager.search_sra_studies(
        query="COVID-19 AND RNA-seq AND human",
        max_results=20,
    )
    if not covid_studies.empty:
        study_columns = ["StudyAccession", "Title", "Platform"]
        print("\n検索結果(最初の5件):")
        print(covid_studies.head()[study_columns].to_string())

        # 2. Run information for the first study found
        first_study = covid_studies.iloc[0]["StudyAccession"]
        run_info = sra_manager.get_run_info(first_study)
        if not run_info.empty:
            run_columns = ["Run", "LibraryStrategy", "spots", "bases"]
            print(f"\n{first_study}のラン情報:")
            print(run_info.head()[run_columns].to_string())

            # 3. Download the first two runs one at a time, as a test
            test_runs = run_info.head(2)["Run"].tolist()
            print(f"\nテストダウンロード開始: {test_runs}")
            for run in test_runs:
                result = sra_manager.download_fastq(run, compressed=True)
                if not result:
                    print(f"ダウンロード失敗: {run}")
                else:
                    print(f"ダウンロード成功 {run}: {list(result.keys())}")
I.2.2 UniProt データベースの高度な活用
import requests
import pandas as pd
import json
from typing import List, Dict, Optional, Union
import time
import re
from urllib.parse import urlencode
import xml.etree.ElementTree as ET
class UniProtAnalyzer:
    """Query and summarise UniProtKB entries through the UniProt REST API.

    NOTE(review): the JSON field names used by the ``_extract_*`` helpers
    ("comments", "commentType", "texts", "features", ...) follow the
    UniProtKB REST JSON layout as understood at review time — confirm
    against a live entry before relying on them.
    """

    # Seconds to wait for an HTTP response before giving up.
    REQUEST_TIMEOUT = 60

    # First letter of a GO "GoTerm" property value -> result bucket.
    _GO_ASPECTS = {
        "F": "molecular_function",
        "P": "biological_process",
        "C": "cellular_component",
    }

    def __init__(self, rate_limit: float = 1.0):
        """
        Args:
            rate_limit: Seconds to sleep after each request.
        """
        self.base_url = "https://rest.uniprot.org"
        self.rate_limit = rate_limit
        self.session = requests.Session()
        # Frequently used return-field sets
        self.common_fields = {
            "basic": [
                "accession", "id", "gene_names", "protein_name",
                "organism_name", "length", "mass"
            ],
            "sequence": [
                "accession", "sequence", "length", "mass",
                "cc_subcellular_location", "ft_domain"
            ],
            "function": [
                "accession", "protein_name", "cc_function",
                "go_c", "go_f", "go_p", "cc_pathway"
            ],
            "disease": [
                "accession", "gene_names", "cc_disease",
                "cc_involvement_in_disease", "cc_allergen", "cc_toxic_dose"
            ],
            "structure": [
                "accession", "ft_helix", "ft_strand", "ft_turn",
                "ft_disulfid", "xref_pdb", "cc_similarity"
            ]
        }

    def search_proteins(self, query: str, organism: Optional[str] = None,
                        reviewed: Optional[bool] = None, max_results: int = 100,
                        fields: List[str] = None) -> pd.DataFrame:
        """Search UniProtKB and return the hits as a table.

        Args:
            query: Search text (gene name, protein name, keywords, ...).
            organism: Organism name (e.g. "human") or numeric taxonomy ID
                (e.g. "9606").
            reviewed: True for reviewed entries only, False for unreviewed
                only, None for both.
            max_results: Requested page size, capped at 500.
            fields: Return fields; defaults to ``common_fields["basic"]``.

        Returns:
            pd.DataFrame: Parsed TSV response; empty on a request error.
        """
        search_terms = [query]
        if organism:
            if organism.isdigit():
                search_terms.append(f"taxonomy_id:{organism}")
            else:
                # The current REST API names this field ``organism_name``;
                # the legacy ``organism`` field is not a valid search field.
                search_terms.append(f"organism_name:{organism}")
        if reviewed is not None:
            search_terms.append("reviewed:true" if reviewed else "reviewed:false")
        final_query = " AND ".join(search_terms)

        if fields is None:
            fields = self.common_fields["basic"]
        params = {
            "query": final_query,
            "format": "tsv",
            "fields": ",".join(fields),
            "size": min(max_results, 500)  # API page-size limit
        }
        try:
            response = self.session.get(f"{self.base_url}/uniprotkb/search",
                                        params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            from io import StringIO
            df = pd.read_csv(StringIO(response.text), sep='\t')
            print(f"検索結果: {len(df)}件のタンパク質")
            return df
        except requests.exceptions.RequestException as e:
            print(f"UniProt検索エラー: {e}")
            return pd.DataFrame()
        finally:
            time.sleep(self.rate_limit)

    def get_protein_details(self, accession: str,
                            include_features: bool = True,
                            include_interactions: bool = True) -> Dict:
        """Fetch one UniProtKB entry and restructure it.

        Args:
            accession: UniProt accession.
            include_features: Add the ``features`` list.
            include_interactions: Add ``interactions`` (one extra request,
                see ``get_protein_interactions``).

        Returns:
            Dict: Structured entry data; empty on a request error.
        """
        try:
            response = self.session.get(f"{self.base_url}/uniprotkb/{accession}",
                                        timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            protein_data = response.json()
            details = {
                "accession": accession,
                "entry_name": protein_data.get("uniProtkbId", ""),
                "protein_names": self._extract_protein_names(protein_data),
                "gene_names": self._extract_gene_names(protein_data),
                "organism": self._extract_organism(protein_data),
                "sequence_info": self._extract_sequence_info(protein_data),
                "subcellular_location": self._extract_subcellular_location(protein_data),
                "function": self._extract_function(protein_data),
                "go_annotations": self._extract_go_annotations(protein_data),
                "pathways": self._extract_pathways(protein_data),
                "diseases": self._extract_diseases(protein_data)
            }
            if include_features:
                details["features"] = self._extract_features(protein_data)
            if include_interactions:
                details["interactions"] = self.get_protein_interactions(accession)
            return details
        except requests.exceptions.RequestException as e:
            print(f"タンパク質詳細取得エラー: {e}")
            return {}
        finally:
            time.sleep(self.rate_limit)

    @staticmethod
    def _comments_of_type(data: Dict, comment_type: str) -> List[Dict]:
        """Return the entry's comment blocks whose commentType matches."""
        return [
            comment for comment in (data.get("comments") or [])
            if comment.get("commentType") == comment_type
        ]

    @staticmethod
    def _comment_texts(comment: Dict) -> List[str]:
        """Return the non-empty text values of one comment block."""
        return [
            text.get("value", "") for text in (comment.get("texts") or [])
            if text.get("value")
        ]

    def _extract_protein_names(self, data: Dict) -> Dict:
        """Extract recommended, alternative and short protein names."""
        names = {"recommended": "", "alternative": [], "short": []}
        desc = data.get("proteinDescription")
        if desc is None:
            return names
        if "recommendedName" in desc:
            names["recommended"] = desc["recommendedName"].get("fullName", {}).get("value", "")
        for alt in desc.get("alternativeNames", []):
            if "fullName" in alt:
                names["alternative"].append(alt["fullName"].get("value", ""))
            if "shortNames" in alt:
                names["short"].extend(sn.get("value", "") for sn in alt["shortNames"])
        return names

    def _extract_gene_names(self, data: Dict) -> Dict:
        """Extract gene names; with several genes the last primary name wins."""
        genes = {"primary": "", "synonyms": [], "ordered_locus": [], "orf": []}
        list_fields = (
            ("synonyms", "synonyms"),
            ("orderedLocusNames", "ordered_locus"),
            ("orfNames", "orf"),
        )
        for gene in data.get("genes", []):
            if gene.get("geneName"):
                genes["primary"] = gene["geneName"].get("value", "")
            for source_key, target_key in list_fields:
                if source_key in gene:
                    genes[target_key].extend(
                        item.get("value", "") for item in gene[source_key]
                    )
        return genes

    def _extract_organism(self, data: Dict) -> Dict:
        """Extract scientific name, common name and taxonomy ID."""
        organism = {"scientific_name": "", "common_name": "", "taxonomy_id": 0}
        if "organism" in data:
            org = data["organism"]
            organism["scientific_name"] = org.get("scientificName", "")
            organism["common_name"] = org.get("commonName", "")
            organism["taxonomy_id"] = org.get("taxonId", 0)
        return organism

    def _extract_sequence_info(self, data: Dict) -> Dict:
        """Extract sequence length, mass, checksum and residues."""
        seq_info = {"length": 0, "mass": 0, "checksum": "", "sequence": ""}
        if "sequence" in data:
            seq = data["sequence"]
            seq_info["length"] = seq.get("length", 0)
            seq_info["mass"] = seq.get("molWeight", 0)
            seq_info["checksum"] = seq.get("crc64", "")
            seq_info["sequence"] = seq.get("value", "")
        return seq_info

    def _extract_subcellular_location(self, data: Dict) -> List[str]:
        """Extract location names from SUBCELLULAR LOCATION comments."""
        locations = []
        for comment in self._comments_of_type(data, "SUBCELLULAR LOCATION"):
            for entry in comment.get("subcellularLocations") or []:
                value = (entry.get("location") or {}).get("value", "")
                if value:
                    locations.append(value)
        return locations

    def _extract_function(self, data: Dict) -> List[str]:
        """Extract the free-text statements of FUNCTION comments."""
        statements = []
        for comment in self._comments_of_type(data, "FUNCTION"):
            statements.extend(self._comment_texts(comment))
        return statements

    def _extract_pathways(self, data: Dict) -> List[str]:
        """Extract PATHWAY comment texts.

        Plain strings are returned so that callers can use them as dict
        keys (see ``functional_enrichment_analysis``).
        """
        pathways = []
        for comment in self._comments_of_type(data, "PATHWAY"):
            pathways.extend(self._comment_texts(comment))
        return pathways

    def _extract_diseases(self, data: Dict) -> List[Dict]:
        """Extract name, acronym, accession and description of DISEASE comments."""
        diseases = []
        for comment in self._comments_of_type(data, "DISEASE"):
            disease = comment.get("disease")
            if not disease:
                continue
            diseases.append({
                "name": disease.get("diseaseId", ""),
                "acronym": disease.get("acronym", ""),
                "accession": disease.get("diseaseAccession", ""),
                "description": disease.get("description", ""),
            })
        return diseases

    def _extract_features(self, data: Dict) -> List[Dict]:
        """Extract type, description and start/end position of each feature."""
        features = []
        for feature in data.get("features") or []:
            location = feature.get("location") or {}
            features.append({
                "type": feature.get("type", ""),
                "description": feature.get("description", ""),
                "start": (location.get("start") or {}).get("value"),
                "end": (location.get("end") or {}).get("value"),
            })
        return features

    def _extract_go_annotations(self, data: Dict) -> Dict:
        """Group GO cross-references by aspect.

        The ``GoTerm`` property has the form "<aspect>:<term>", where the
        aspect is F, P or C. The aspect letter is split off before the term
        is stored, and classification uses that letter. (The previous code
        tested the already-stripped term for "F:" etc., so nothing was ever
        classified.)

        Returns:
            Dict: ``molecular_function``, ``biological_process`` and
            ``cellular_component`` lists of ``{"id", "term", "evidence"}``.
        """
        go_terms = {"molecular_function": [], "biological_process": [], "cellular_component": []}
        for ref in data.get("uniProtKBCrossReferences", []):
            if ref.get("database") != "GO":
                continue
            go_id = ref.get("id", "")
            aspect_code, term, evidence = "", "", ""
            for prop in ref.get("properties", []):
                key = prop.get("key")
                if key == "GoTerm":
                    raw_value = prop.get("value", "")
                    if ":" in raw_value:
                        # Split on the first colon only so that terms which
                        # themselves contain a colon are kept intact.
                        aspect_code, _, term = raw_value.partition(":")
                    else:
                        aspect_code, term = "", ""
                elif key == "GoEvidenceType":
                    evidence = prop.get("value", "")
            bucket = self._GO_ASPECTS.get(aspect_code)
            if bucket is not None and go_id.startswith("GO:"):
                go_terms[bucket].append({"id": go_id, "term": term, "evidence": evidence})
        return go_terms

    def get_protein_interactions(self, accession: str, max_interactions: int = 50,
                                 species: int = 9606) -> List[Dict]:
        """Fetch interaction data for a protein from the STRING API.

        Args:
            accession: Identifier passed to STRING.
            max_interactions: Sent as the ``limit`` parameter.
            species: NCBI taxonomy ID sent to STRING (default 9606, human).

        Returns:
            List[Dict]: ``partner_a``, ``partner_b``, ``score`` and
            ``interaction_type`` per record; empty on a request error.
        """
        try:
            # NOTE(review): STRING documents ``limit`` for the
            # ``interaction_partners`` endpoint; confirm it is honoured by
            # ``network`` as used here.
            string_url = "https://string-db.org/api/json/network"
            params = {
                "identifiers": accession,
                "species": species,
                "limit": max_interactions
            }
            response = self.session.get(string_url, params=params,
                                        timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            return [
                {
                    "partner_a": interaction.get("preferredName_A", ""),
                    "partner_b": interaction.get("preferredName_B", ""),
                    "score": interaction.get("score", 0),
                    "interaction_type": "protein-protein"
                }
                for interaction in response.json()
            ]
        except requests.exceptions.RequestException as e:
            print(f"相互作用情報取得エラー: {e}")
            return []
        finally:
            time.sleep(self.rate_limit)

    def analyze_protein_family(self, gene_family: str, organisms: List[str] = None) -> pd.DataFrame:
        """Search one protein family across several organisms.

        Args:
            gene_family: Family name used as the query, e.g. "kinase".
            organisms: Organisms to search; defaults to human, mouse, rat
                and zebrafish.

        Returns:
            pd.DataFrame: Concatenated reviewed hits with added
            ``organism`` and ``family`` columns; empty if nothing was found.
        """
        if organisms is None:
            organisms = ["human", "mouse", "rat", "zebrafish"]
        family_data = []
        for organism in organisms:
            print(f"{organism}で{gene_family}ファミリーを検索中...")
            # search_proteins already sleeps for rate_limit after each
            # request, so no extra pause is needed here.
            results = self.search_proteins(
                query=gene_family,
                organism=organism,
                reviewed=True,
                max_results=100,
                fields=self.common_fields["basic"] + ["cc_function", "go_f"]
            )
            if not results.empty:
                results["organism"] = organism
                results["family"] = gene_family
                family_data.append(results)
        if not family_data:
            return pd.DataFrame()
        combined_df = pd.concat(family_data, ignore_index=True)
        print(f"\n{gene_family}ファミリー解析結果:")
        print(f"- 総タンパク質数: {len(combined_df)}")
        print(f"- 生物種別分布:")
        print(combined_df["organism"].value_counts().to_string())
        return combined_df

    def functional_enrichment_analysis(self, protein_list: List[str],
                                       max_proteins: int = 20) -> Dict:
        """Count GO terms and pathways over a list of proteins.

        This is a frequency tally, not a statistical enrichment test.

        Args:
            protein_list: UniProt accessions.
            max_proteins: Only the first ``max_proteins`` accessions are
                fetched (default 20).

        Returns:
            Dict: Top-10 ``(name, count)`` lists for the three GO aspects
            and for pathways, plus ``total_proteins``; empty if no entry
            could be fetched.
        """
        proteins_data = []
        for accession in protein_list[:max_proteins]:
            # Interactions are not used below, so the extra STRING request
            # is skipped; get_protein_details paces itself.
            details = self.get_protein_details(accession, include_features=False,
                                               include_interactions=False)
            if details:
                proteins_data.append(details)
        if not proteins_data:
            return {}

        go_counts = {"molecular_function": {}, "biological_process": {}, "cellular_component": {}}
        pathway_counts = {}
        for protein in proteins_data:
            for category, terms in protein.get("go_annotations", {}).items():
                tally = go_counts[category]
                for term in terms:
                    term_name = term.get("term", "Unknown")
                    tally[term_name] = tally.get(term_name, 0) + 1
            for pathway in protein.get("pathways", []):
                pathway_counts[pathway] = pathway_counts.get(pathway, 0) + 1

        def top_ten(counts):
            # Highest count first; ties keep first-seen order (stable sort).
            return sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]

        return {
            "go_molecular_function": top_ten(go_counts["molecular_function"]),
            "go_biological_process": top_ten(go_counts["biological_process"]),
            "go_cellular_component": top_ten(go_counts["cellular_component"]),
            "pathways": top_ten(pathway_counts),
            "total_proteins": len(proteins_data)
        }
# Usage example
if __name__ == "__main__":
    # Create the UniProt analysis helper
    uniprot = UniProtAnalyzer(rate_limit=1.0)

    # 1. Detailed look at the BRCA1 protein
    print("=== BRCA1タンパク質詳細解析 ===")
    brca1_details = uniprot.get_protein_details(
        "P38398",
        include_features=True,
        include_interactions=True,
    )
    if brca1_details:
        sequence_info = brca1_details['sequence_info']
        print(f"タンパク質名: {brca1_details['protein_names']['recommended']}")
        print(f"遺伝子名: {brca1_details['gene_names']['primary']}")
        print(f"配列長: {sequence_info['length']} aa")
        print(f"分子量: {sequence_info['mass']} Da")
        print(f"相互作用数: {len(brca1_details.get('interactions', []))}")

    # 2. Comparative analysis of the kinase family
    print("\n=== キナーゼファミリー比較解析 ===")
    kinase_family = uniprot.analyze_protein_family(
        gene_family="protein kinase",
        organisms=["human", "mouse"],
    )
    if not kinase_family.empty:
        family_columns = ["Entry", "Gene Names", "Protein names", "organism"]
        print("\n上位10タンパク質:")
        print(kinase_family.head(10)[family_columns].to_string())

    # 3. Functional term counts for cancer-related proteins
    print("\n=== がん関連タンパク質の機能的濃縮解析 ===")
    cancer_proteins = ["P53_HUMAN", "P38398", "P04637", "P21359", "Q02952"]  # example
    # The accessions analysed below come from a fresh search rather than
    # from the example list above.
    cancer_search = uniprot.search_proteins(
        "cancer AND tumor suppressor",
        organism="human",
        reviewed=True,
        max_results=20,
    )
    if not cancer_search.empty:
        cancer_accessions = cancer_search["Entry"].tolist()[:10]
        enrichment = uniprot.functional_enrichment_analysis(cancer_accessions)
        print(f"解析対象タンパク質数: {enrichment.get('total_proteins', 0)}")
        for heading, result_key in (
            ("\n上位GO Molecular Function:", "go_molecular_function"),
            ("\n上位GO Biological Process:", "go_biological_process"),
        ):
            print(heading)
            for term, count in enrichment.get(result_key, [])[:5]:
                print(f" {term}: {count}件")
I.3 専門データベースの戦略的活用
I.3.1 臨床データベースの統合的活用
ClinVar + COSMIC + PharmGKB の統合解析
import requests
import pandas as pd
import json
from typing import List, Dict, Optional
import time
import sqlite3
from pathlib import Path
import xml.etree.ElementTree as ET
class ClinicalVariantAnalyzer:
"""臨床変異データベースの統合解析プラットフォーム"""
def __init__(self, cache_dir: str = "./clinical_cache"):
"""
Args:
cache_dir: データキャッシュディレクトリ
"""
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
# SQLiteキャッシュデータベース
self.db_path = self.cache_dir / "clinical_variants.db"
self._init_cache_db()
# API設定
self.apis = {
"clinvar": "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/",
"cosmic": "https://cancer.sanger.ac.uk/cosmic/",
"pharmgkb": "https://api.pharmgkb.org/v1/",
"ensembl": "https://rest.ensembl.org/"
}
self.rate_limits = {
"clinvar": 1.0, # 1秒間隔
"cosmic": 2.0, # 2秒間隔
"pharmgkb": 1.5, # 1.5秒間隔
"ensembl": 0.5 # 0.5秒間隔
}
def _init_cache_db(self):
"""キャッシュデータベースの初期化"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# ClinVarテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS clinvar_variants (
id INTEGER PRIMARY KEY,
variation_id TEXT UNIQUE,
gene_symbol TEXT,
hgvs_c TEXT,
hgvs_p TEXT,
clinical_significance TEXT,
condition TEXT,
review_status TEXT,
last_updated TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# COSMICテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS cosmic_variants (
id INTEGER PRIMARY KEY,
cosmic_id TEXT UNIQUE,
gene_symbol TEXT,
mutation_type TEXT,
mutation_description TEXT,
tissue_type TEXT,
histology TEXT,
primary_site TEXT,
sample_source TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# PharmGKBテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS pharmgkb_variants (
id INTEGER PRIMARY KEY,
rsid TEXT,
gene_symbol TEXT,
variant_annotation TEXT,
drug_association TEXT,
phenotype TEXT,
evidence_level TEXT,
population TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def search_clinvar_variants(self, gene_symbol: str, max_results: int = 100) -> pd.DataFrame:
    """Look up ClinVar variants for a gene via the E-utilities.

    Runs an esearch for the variant IDs, waits for the configured ClinVar
    delay, then runs an esummary for their details. The resulting table is
    handed to ``self._cache_clinvar_data`` before being returned.

    Args:
        gene_symbol: Gene symbol to search for.
        max_results: Maximum number of IDs requested.

    Returns:
        pd.DataFrame: One row per variant; empty when nothing is found or a
        request fails.
    """
    try:
        eutils_base = self.apis['clinvar']

        # Step 1: esearch for the variant IDs
        id_response = requests.get(
            f"{eutils_base}esearch.fcgi",
            params={
                "db": "clinvar",
                "term": f"{gene_symbol}[gene] AND single_gene[prop]",
                "retmax": max_results,
                "retmode": "json",
            },
        )
        id_response.raise_for_status()
        variant_ids = id_response.json()["esearchresult"]["idlist"]
        if not variant_ids:
            print(f"ClinVarで{gene_symbol}の変異が見つかりませんでした")
            return pd.DataFrame()

        # Step 2: esummary for the details, after the rate-limit pause
        time.sleep(self.rate_limits["clinvar"])
        detail_response = requests.get(
            f"{eutils_base}esummary.fcgi",
            params={
                "db": "clinvar",
                "id": ",".join(variant_ids),
                "retmode": "json",
            },
        )
        detail_response.raise_for_status()
        records = detail_response.json()["result"]

        # Step 3: flatten each record; the "uids" entry is an index, not a record
        variants = []
        for variant_id, data in records.items():
            if variant_id == "uids":
                continue
            significance = data.get("clinical_significance", {})
            trait_set = data.get("trait_set")
            condition = trait_set[0].get("trait_name", "") if trait_set else ""
            variants.append({
                "variation_id": variant_id,
                "gene_symbol": gene_symbol,
                "title": data.get("title", ""),
                "clinical_significance": significance.get("description", ""),
                "review_status": significance.get("review_status", ""),
                "condition": condition,
                "variation_type": data.get("variation_type", ""),
                "last_updated": data.get("last_updated", ""),
            })

        df = pd.DataFrame(variants)
        print(f"ClinVar検索結果: {len(df)}件の変異")
        # Store the result in the cache
        self._cache_clinvar_data(df)
        return df
    except requests.exceptions.RequestException as e:
        print(f"ClinVar検索エラー: {e}")
        return pd.DataFrame()
def search_cosmic_variants(self, gene_symbol: str, cancer_type: Optional[str] = None) -> pd.DataFrame:
    """Return placeholder COSMIC variants for a gene.

    No request is made: the method fabricates ten dummy missense records
    (the original note says the real COSMIC API requires authentication).

    Args:
        gene_symbol: Gene symbol copied into every record.
        cancer_type: "lung" sets ``primary_site`` to "lung"; any other
            value, including None, yields "breast".

    Returns:
        pd.DataFrame: Ten dummy rows.
    """
    print(f"COSMIC検索: {gene_symbol}")
    site = "lung" if cancer_type == "lung" else "breast"
    # Dummy data (a real implementation would use authenticated API access)
    placeholder_rows = [
        {
            "cosmic_id": f"COSM{1000000 + index}",
            "gene_symbol": gene_symbol,
            "mutation_type": "Substitution - Missense",
            "mutation_description": f"c.{100 + index*10}G>A p.V{34 + index}M",
            "tissue_type": "Carcinoma",
            "histology": "adenocarcinoma",
            "primary_site": site,
            "sample_source": "surgical resection",
        }
        for index in range(10)
    ]
    df = pd.DataFrame(placeholder_rows)
    print(f"COSMIC検索結果: {len(df)}件の変異")
    return df
def search_pharmgkb_variants(self, gene_symbol: str) -> pd.DataFrame:
    """Return placeholder PharmGKB annotations for a gene.

    No request is made. The unused URL and parameter locals of the earlier
    version were removed; a real implementation would query
    ``<pharmgkb base>data/variantAnnotation`` with ``gene`` and ``format``
    parameters (the original note says the API may require authentication).

    Args:
        gene_symbol: Gene symbol, matched case-insensitively against a
            fixed list of pharmacogenes.

    Returns:
        pd.DataFrame: Three dummy rows for CYP2D6, CYP2C19, SLCO1B1 or
        DPYD; empty for any other gene or on an unexpected error.
    """
    try:
        print(f"PharmGKB検索: {gene_symbol}")
        pharmacogenes = {"CYP2D6", "CYP2C19", "SLCO1B1", "DPYD"}
        dummy_drugs = ("warfarin", "clopidogrel", "simvastatin")
        dummy_pharmgkb_data = []
        if gene_symbol.upper() in pharmacogenes:
            dummy_pharmgkb_data = [
                {
                    "rsid": f"rs{1000000 + index}",
                    "gene_symbol": gene_symbol,
                    "variant_annotation": f"{gene_symbol}*{index+2}",
                    "drug_association": drug,
                    "phenotype": "altered drug metabolism",
                    "evidence_level": "1A",
                    "population": "European",
                }
                for index, drug in enumerate(dummy_drugs)
            ]
        df = pd.DataFrame(dummy_pharmgkb_data)
        print(f"PharmGKB検索結果: {len(df)}件の薬理遺伝学変異")
        return df
    except Exception as e:
        # Best effort: a malformed gene_symbol yields an empty result
        print(f"PharmGKB検索エラー: {e}")
        return pd.DataFrame()
def integrated_variant_analysis(self, gene_symbol: str,
include_population_data: bool = True) -> Dict:
"""
統合的変異解析
Args:
gene_symbol: 遺伝子シンボル
include_population_data: 集団データを含める場合True
Returns:
Dict: 統合解析結果
"""
print(f"\n=== {gene_symbol} 統合変異解析 ===")
# 各データベースから情報取得
clinvar_data = self.search_clinvar_variants(gene_symbol, max_results=50)
cosmic_data = self.search_cosmic_variants(gene_symbol)
pharmgkb_data = self.search_pharmgkb_variants(gene_symbol)
# 集団データ(gnomAD等からの頻度情報)
population_data = {}
if include_population_data:
population_data = self._get_population_frequencies(gene_symbol)
# 統合解析結果
analysis_results = {
"gene_symbol": gene_symbol,
"total_clinvar_variants": len(clinvar_data),
"total_cosmic_variants": len(cosmic_data),
"total_pharmgkb_variants": len(pharmgkb_data),
"clinvar_significance_distribution": {},
"cosmic_cancer_distribution": {},
"pharmgkb_drug_associations": {},
"high_impact_variants": [],
"population_frequencies": population_data
}
# ClinVar臨床的意義分布
if not clinvar_data.empty:
significance_counts = clinvar_data["clinical_significance"].value_counts()
analysis_results["clinvar_significance_distribution"] = significance_counts.to_dict()
# 高影響度変異の特定
high_impact = clinvar_data[
clinvar_data["clinical_significance"].str.contains(
"Pathogenic|Likely pathogenic", na=False, case=False
)
]
analysis_results["high_impact_variants"] = high_impact.to_dict('records')
# COSMICがん種分布
if not cosmic_data.empty:
cancer_counts = cosmic_data["primary_site"].value_counts()
analysis_results["cosmic_cancer_distribution"] = cancer_counts.to_dict()
# PharmGKB薬物関連
if not pharmgkb_data.empty:
drug_counts = pharmgkb_data["drug_association"].value_counts()
analysis_results["pharmgkb_drug_associations"] = drug_counts.to_dict()
return analysis_results
def _get_population_frequencies(self, gene_symbol: str) -> Dict:
    """
    Look up a human gene through the Ensembl REST ``lookup/symbol`` endpoint.

    Despite the method name, no allele-frequency data (e.g. from gnomAD) is
    retrieved here: the result only carries the gene identifier, genomic
    coordinates, strand and biotype found in the Ensembl response.

    Args:
        gene_symbol: Gene symbol appended to the lookup URL.

    Returns:
        Dict: ``ensembl_gene_id``, ``chromosome``, ``start``, ``end``,
        ``strand`` and ``biotype`` on success; otherwise a dict with a
        single ``error`` message (non-200 status or any raised exception).
    """
    try:
        url = f"{self.apis['ensembl']}lookup/symbol/homo_sapiens/{gene_symbol}"
        response = requests.get(
            url,
            params={"expand": "1"},
            headers={"Content-Type": "application/json"},
            # Without a timeout a stalled connection would block forever
            timeout=30,
        )

        if response.status_code != 200:
            return {"error": f"Ensembl lookup failed: {response.status_code}"}

        gene_data = response.json()
        return {
            "ensembl_gene_id": gene_data.get("id", ""),
            "chromosome": gene_data.get("seq_region_name", ""),
            "start": gene_data.get("start", 0),
            "end": gene_data.get("end", 0),
            "strand": gene_data.get("strand", 0),
            "biotype": gene_data.get("biotype", "")
        }

    except Exception as e:
        # Best-effort lookup: callers receive the failure as data
        return {"error": f"Population data error: {e}"}
def _cache_clinvar_data(self, df: pd.DataFrame):
"""ClinVarデータのキャッシュ保存"""
if df.empty:
return
conn = sqlite3.connect(self.db_path)
for _, row in df.iterrows():
try:
conn.execute('''
INSERT OR REPLACE INTO clinvar_variants
(variation_id, gene_symbol, clinical_significance,
condition, review_status, last_updated)
VALUES (?, ?, ?, ?, ?, ?)
''', (
row.get("variation_id", ""),
row.get("gene_symbol", ""),
row.get("clinical_significance", ""),
row.get("condition", ""),
row.get("review_status", ""),
row.get("last_updated", "")
))
except Exception as e:
print(f"キャッシュ保存エラー: {e}")
conn.commit()
conn.close()
def generate_clinical_report(self, gene_symbol: str, output_format: str = "markdown") -> str:
"""
臨床変異レポートの生成
Args:
gene_symbol: 遺伝子シンボル
output_format: 出力形式("markdown", "html", "json")
Returns:
str: レポート内容
"""
# 統合解析実行
analysis = self.integrated_variant_analysis(gene_symbol)
if output_format == "markdown":
report = f"""# {gene_symbol} 臨床変異解析レポート
## 概要
- **遺伝子**: {gene_symbol}
- **ClinVar変異数**: {analysis['total_clinvar_variants']}
- **COSMIC変異数**: {analysis['total_cosmic_variants']}
- **PharmGKB薬理遺伝学変異数**: {analysis['total_pharmgkb_variants']}
## ClinVar臨床的意義分布
"""
for significance, count in analysis["clinvar_significance_distribution"].items():
report += f"- {significance}: {count}件\n"
report += "\n## 高影響度変異\n"
for i, variant in enumerate(analysis["high_impact_variants"][:5], 1):
report += f"{i}. {variant.get('title', 'N/A')} - {variant.get('clinical_significance', 'N/A')}\n"
if analysis["cosmic_cancer_distribution"]:
report += "\n## COSMIC がん種分布\n"
for cancer, count in analysis["cosmic_cancer_distribution"].items():
report += f"- {cancer}: {count}件\n"
if analysis["pharmgkb_drug_associations"]:
report += "\n## PharmGKB 薬物関連\n"
for drug, count in analysis["pharmgkb_drug_associations"].items():
report += f"- {drug}: {count}件\n"
report += f"\n---\n生成日時: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
return report
elif output_format == "json":
return json.dumps(analysis, indent=2, ensure_ascii=False)
else:
return str(analysis)
# 使用例
if __name__ == "__main__":
    # Create the clinical variant analyzer with its on-disk cache directory
    analyzer = ClinicalVariantAnalyzer(cache_dir="./clinical_analysis_cache")

    # Well-known cancer-related genes used for the demonstration
    cancer_genes = ["BRCA1", "TP53", "EGFR", "KRAS"]

    for gene in cancer_genes:
        print(f"\n{'='*50}")
        print(f"{gene} 遺伝子の臨床変異解析")
        print(f"{'='*50}")

        # Run the integrated analysis and print a short summary
        results = analyzer.integrated_variant_analysis(gene, include_population_data=True)
        print(f"\n解析結果サマリー:")
        for label, key in (
            ("ClinVar", "total_clinvar_variants"),
            ("COSMIC", "total_cosmic_variants"),
            ("PharmGKB", "total_pharmgkb_variants"),
        ):
            print(f"- {label}変異: {results[key]}件")

        # Render the Markdown report and write it next to the script
        report = analyzer.generate_clinical_report(gene, output_format="markdown")
        report_file = Path(f"./clinical_report_{gene}.md")
        report_file.write_text(report, encoding='utf-8')
        print(f"レポート保存: {report_file}")

        # Pause between genes
        time.sleep(2)

    print("\n全ての解析が完了しました。")
I.4 実践的なワークフロー構築
I.4.1 マルチデータベース統合解析パイプライン
import asyncio
import aiohttp
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Tuple
import json
import sqlite3
from pathlib import Path
import logging
from dataclasses import dataclass
from datetime import datetime
import yaml
@dataclass
class AnalysisConfig:
    """Settings for one multi-database integration run."""

    # Required: what to analyse and how to report it
    gene_list: List[str]
    databases: List[str]
    analysis_types: List[str]
    output_formats: List[str]

    # Optional tuning knobs
    max_concurrent_requests: int = 5
    cache_expiry_hours: int = 24
    enable_parallel_processing: bool = True
class MultiDatabaseIntegrator:
"""マルチデータベース統合解析システム"""
def __init__(self, config_file: Optional[str] = None, work_dir: str = "./integration_analysis"):
    """
    Prepare the working directory, logging, configuration and local database.

    Args:
        config_file: Path to a YAML configuration file; built-in defaults
            are used when it is omitted or does not exist.
        work_dir: Directory that receives the log file and the SQLite
            database. Missing parent directories are created as well.
    """
    self.work_dir = Path(work_dir)
    # parents=True: a nested work_dir must not fail on a missing parent
    self.work_dir.mkdir(parents=True, exist_ok=True)

    # Logging to both a file in the work directory and the console
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(self.work_dir / 'integration.log'),
            logging.StreamHandler()
        ]
    )
    self.logger = logging.getLogger(__name__)

    # Load the run configuration
    self.config = self._load_config(config_file)

    # Create the integrated SQLite database if needed
    self.db_path = self.work_dir / "integrated_data.db"
    self._init_integrated_db()

    # Remote data sources; rate_limit is the pause (seconds) awaited after
    # a request to that source
    self.data_sources = {
        "uniprot": {
            "base_url": "https://rest.uniprot.org",
            "rate_limit": 1.0,
            "auth_required": False
        },
        "ensembl": {
            "base_url": "https://rest.ensembl.org",
            "rate_limit": 0.5,
            "auth_required": False
        },
        "kegg": {
            "base_url": "https://rest.kegg.jp",
            "rate_limit": 1.0,
            "auth_required": False
        },
        "string": {
            "base_url": "https://string-db.org/api",
            "rate_limit": 1.0,
            "auth_required": False
        },
        "ncbi": {
            "base_url": "https://eutils.ncbi.nlm.nih.gov/entrez/eutils",
            "rate_limit": 1.0,
            "auth_required": False
        }
    }
def _load_config(self, config_file: Optional[str]) -> AnalysisConfig:
    """
    Build the analysis configuration from a YAML file or from defaults.

    Args:
        config_file: Path to a YAML file, or None.

    Returns:
        AnalysisConfig: Values from the file, with per-key fallbacks for
        missing entries; the built-in default configuration when no path is
        given or the file does not exist. An empty YAML file is treated
        like a file without any keys.
    """
    if not (config_file and Path(config_file).exists()):
        # Built-in default configuration
        return AnalysisConfig(
            gene_list=[],
            databases=['uniprot', 'ensembl', 'string'],
            analysis_types=['functional', 'pathway', 'interaction'],
            output_formats=['json', 'excel', 'html'],
            max_concurrent_requests=5,
            cache_expiry_hours=24,
            enable_parallel_processing=True
        )

    with open(config_file, 'r', encoding='utf-8') as f:
        # safe_load returns None for an empty document; fall back to an
        # empty mapping so the .get() calls below keep working
        config_data = yaml.safe_load(f) or {}

    return AnalysisConfig(
        gene_list=config_data.get('gene_list', []),
        databases=config_data.get('databases', ['uniprot', 'ensembl']),
        analysis_types=config_data.get('analysis_types', ['functional', 'pathway']),
        output_formats=config_data.get('output_formats', ['json', 'excel']),
        max_concurrent_requests=config_data.get('max_concurrent_requests', 5),
        cache_expiry_hours=config_data.get('cache_expiry_hours', 24),
        enable_parallel_processing=config_data.get('enable_parallel_processing', True)
    )
def _init_integrated_db(self):
"""統合データベースの初期化"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 遺伝子基本情報テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS genes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gene_symbol TEXT UNIQUE NOT NULL,
ensembl_id TEXT,
uniprot_id TEXT,
ncbi_gene_id TEXT,
description TEXT,
chromosome TEXT,
start_position INTEGER,
end_position INTEGER,
strand INTEGER,
biotype TEXT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 機能アノテーションテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS functional_annotations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gene_id INTEGER,
annotation_type TEXT,
annotation_value TEXT,
source_database TEXT,
evidence_code TEXT,
confidence_score REAL,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (gene_id) REFERENCES genes (id)
)
''')
# パスウェイ情報テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS pathway_annotations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gene_id INTEGER,
pathway_id TEXT,
pathway_name TEXT,
pathway_category TEXT,
source_database TEXT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (gene_id) REFERENCES genes (id)
)
''')
# タンパク質相互作用テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS protein_interactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gene_a_id INTEGER,
gene_b_id INTEGER,
interaction_type TEXT,
confidence_score REAL,
source_database TEXT,
experimental_evidence TEXT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (gene_a_id) REFERENCES genes (id),
FOREIGN KEY (gene_b_id) REFERENCES genes (id)
)
''')
# 解析結果キャッシュテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS analysis_cache (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cache_key TEXT UNIQUE,
analysis_type TEXT,
gene_list TEXT,
result_data TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP
)
''')
conn.commit()
conn.close()
async def fetch_gene_basic_info(self, session: aiohttp.ClientSession,
                                gene_symbol: str) -> Dict:
    """
    Fetch basic information about a human gene from Ensembl and UniProt.

    Each source is queried independently: a failure is logged and the
    fields of that source are simply left out. After both lookups the
    Ensembl ``rate_limit`` pause is awaited.

    Args:
        session: Open aiohttp client session used for both requests.
        gene_symbol: Gene symbol to look up.

    Returns:
        Dict: Always contains ``gene_symbol``; on a successful Ensembl
        lookup also ``ensembl_id``, ``description``, ``chromosome``,
        ``start_position``, ``end_position``, ``strand`` and ``biotype``;
        on a UniProt hit also ``uniprot_id``.
    """
    gene_info = {"gene_symbol": gene_symbol}

    # (target key, Ensembl key, default) for the Ensembl lookup response
    ensembl_fields = (
        ("ensembl_id", "id", ""),
        ("description", "description", ""),
        ("chromosome", "seq_region_name", ""),
        ("start_position", "start", 0),
        ("end_position", "end", 0),
        ("strand", "strand", 0),
        ("biotype", "biotype", ""),
    )

    # Ensembl: identifier and genomic location
    try:
        ensembl_url = f"{self.data_sources['ensembl']['base_url']}/lookup/symbol/homo_sapiens/{gene_symbol}"
        json_headers = {"Content-Type": "application/json"}
        async with session.get(ensembl_url, headers=json_headers) as response:
            if response.status == 200:
                ensembl_data = await response.json()
                gene_info.update(
                    {target: ensembl_data.get(source, default)
                     for target, source, default in ensembl_fields}
                )
    except Exception as e:
        self.logger.error(f"Ensembl取得エラー ({gene_symbol}): {e}")

    # UniProt: primary accession of the first search hit
    try:
        uniprot_url = f"{self.data_sources['uniprot']['base_url']}/uniprotkb/search"
        search_params = {
            "query": f"gene:{gene_symbol} AND organism_id:9606",
            "format": "json",
            "size": 1,
        }
        async with session.get(uniprot_url, params=search_params) as response:
            if response.status == 200:
                uniprot_data = await response.json()
                hits = uniprot_data.get("results")
                if hits:
                    gene_info["uniprot_id"] = hits[0].get("primaryAccession", "")
    except Exception as e:
        self.logger.error(f"UniProt取得エラー ({gene_symbol}): {e}")

    await asyncio.sleep(self.data_sources['ensembl']['rate_limit'])
    return gene_info
async def fetch_functional_annotations(self, session: aiohttp.ClientSession,
gene_symbol: str, uniprot_id: str) -> List[Dict]:
"""機能アノテーションの非同期取得"""
annotations = []
if uniprot_id:
try:
# UniProtから詳細な機能情報取得
uniprot_url = f"{self.data_sources['uniprot']['base_url']}/uniprotkb/{uniprot_id}"
async with session.get(uniprot_url) as response:
if response.status == 200:
protein_data = await response.json()
# GO terms
if "uniProtKBCrossReferences" in protein_data:
for ref in protein_data["uniProtKBCrossReferences"]:
if ref.get("database") == "GO":
go_id = ref.get("id", "")
go_term = ""
aspect = ""
if "properties" in ref:
for prop in ref["properties"]:
if prop.get("key") == "GoTerm":
go_term = prop.get("value", "")
elif prop.get("key") == "GoEvidenceType":
aspect = prop.get("value", "")
annotations.append({
"annotation_type": "GO_term",
"annotation_value": f"{go_id}|{go_term}",
"source_database": "UniProt",
"evidence_code": aspect,
"confidence_score": 0.8
})
# 機能コメント
if "comments" in protein_data:
for comment in protein_data["comments"]:
if comment.get("commentType") == "FUNCTION":
annotations.append({
"annotation_type": "function_description",
"annotation_value": comment.get("texts", [{}])[0].get("value", ""),
"source_database": "UniProt",
"evidence_code": "manual_curation",
"confidence_score": 0.9
})
except Exception as e:
self.logger.error(f"機能アノテーション取得エラー ({gene_symbol}): {e}")
await asyncio.sleep(self.data_sources['uniprot']['rate_limit'])
return annotations
async def fetch_pathway_information(self, session: aiohttp.ClientSession,
                                    gene_symbol: str) -> List[Dict]:
    """
    Query the KEGG ``find/pathway`` endpoint with the gene symbol.

    Every tab-separated line of a 200 response becomes one record with
    ``pathway_id``, ``pathway_name``, a fixed ``pathway_category`` of
    ``"metabolic"`` and ``source_database`` ``"KEGG"``. Errors are logged
    and yield the records collected so far. The KEGG ``rate_limit`` pause
    is awaited before returning.

    NOTE(review): ``find/pathway/<term>`` looks like a keyword search over
    pathway entries rather than a gene-to-pathway link - confirm this is
    the intended lookup. The category is hard-coded for every hit.

    Args:
        session: Open aiohttp client session.
        gene_symbol: Search term appended to the KEGG URL.

    Returns:
        List[Dict]: Pathway records (possibly empty).
    """
    pathways = []

    try:
        kegg_url = f"{self.data_sources['kegg']['base_url']}/find/pathway/{gene_symbol}"
        async with session.get(kegg_url) as response:
            if response.status == 200:
                body = (await response.text()).strip()
                if body:
                    for entry in body.split('\n'):
                        # Only lines of the form "<id>\t<name>" are usable
                        pathway_id, tab, pathway_name = entry.partition('\t')
                        if not tab:
                            continue
                        pathways.append({
                            "pathway_id": pathway_id,
                            "pathway_name": pathway_name,
                            "pathway_category": "metabolic",
                            "source_database": "KEGG"
                        })
    except Exception as e:
        self.logger.error(f"KEGG パスウェイ取得エラー ({gene_symbol}): {e}")

    await asyncio.sleep(self.data_sources['kegg']['rate_limit'])
    return pathways
async def fetch_protein_interactions(self, session: aiohttp.ClientSession,
gene_symbol: str) -> List[Dict]:
"""タンパク質相互作用の非同期取得"""
interactions = []
try:
# STRINGから相互作用情報取得
string_url = f"{self.data_sources['string']['base_url']}/json/network"
params = {
"identifiers": gene_symbol,
"species": 9606,
"limit": 20
}
async with session.get(string_url, params=params) as response:
if response.status == 200:
string_data = await response.json()
for interaction in string_data:
interactions.append({
"partner_a": interaction.get("preferredName_A", ""),
"partner_b": interaction.get("preferredName_B", ""),
"interaction_type": "protein-protein",
"confidence_score": interaction.get("score", 0) / 1000.0, # 正規化
"source_database": "STRING",
"experimental_evidence": "high_throughput"
})
except Exception as e:
self.logger.error(f"STRING 相互作用取得エラー ({gene_symbol}): {e}")
await asyncio.sleep(self.data_sources['string']['rate_limit'])
return interactions
async def process_gene_batch(self, gene_batch: List[str]) -> Dict:
    """
    Fetch basic and detailed information for a batch of genes concurrently.

    Basic information is fetched for all genes first; for every gene whose
    lookup succeeded, functional annotations, pathways and interactions are
    then fetched concurrently. Failed lookups are logged and skipped.

    Every annotation, pathway and interaction record is tagged with the
    ``gene_symbol`` it was fetched for (unless the record already has that
    key), so the flattened lists keep their gene association.

    Args:
        gene_batch: Gene symbols to process.

    Returns:
        Dict: Lists under ``genes``, ``annotations``, ``pathways`` and
        ``interactions``.
    """
    results = {"genes": [], "annotations": [], "pathways": [], "interactions": []}

    connector = aiohttp.TCPConnector(limit=self.config.max_concurrent_requests)
    timeout = aiohttp.ClientTimeout(total=300)  # 5-minute overall timeout

    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        # Basic information for every gene in the batch
        gene_info_list = await asyncio.gather(
            *(self.fetch_gene_basic_info(session, gene) for gene in gene_batch),
            return_exceptions=True
        )

        valid_gene_info = []
        for i, info in enumerate(gene_info_list):
            if isinstance(info, BaseException):
                self.logger.error(f"遺伝子情報取得失敗 ({gene_batch[i]}): {info}")
            else:
                valid_gene_info.append(info)
        results["genes"].extend(valid_gene_info)

        # Detail lookups; task_origin remembers which gene and which result
        # list each task belongs to
        detail_tasks = []
        task_origin = []
        for gene_info in valid_gene_info:
            gene_symbol = gene_info["gene_symbol"]
            uniprot_id = gene_info.get("uniprot_id", "")

            detail_tasks.append(
                self.fetch_functional_annotations(session, gene_symbol, uniprot_id)
            )
            task_origin.append((gene_symbol, "annotations"))

            detail_tasks.append(self.fetch_pathway_information(session, gene_symbol))
            task_origin.append((gene_symbol, "pathways"))

            detail_tasks.append(self.fetch_protein_interactions(session, gene_symbol))
            task_origin.append((gene_symbol, "interactions"))

        detail_results = await asyncio.gather(*detail_tasks, return_exceptions=True)

    # Merge the detail results, keeping track of the originating gene
    for (gene_symbol, result_key), result in zip(task_origin, detail_results):
        if isinstance(result, BaseException):
            self.logger.error(f"詳細情報取得失敗: {result}")
            continue
        for record in result:
            record.setdefault("gene_symbol", gene_symbol)
        results[result_key].extend(result)

    return results
def save_to_database(self, results: Dict):
"""結果をデータベースに保存"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
# 遺伝子基本情報の保存
for gene_info in results.get("genes", []):
cursor.execute('''
INSERT OR REPLACE INTO genes
(gene_symbol, ensembl_id, uniprot_id, description, chromosome,
start_position, end_position, strand, biotype)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
gene_info.get("gene_symbol", ""),
gene_info.get("ensembl_id", ""),
gene_info.get("uniprot_id", ""),
gene_info.get("description", ""),
gene_info.get("chromosome", ""),
gene_info.get("start_position", 0),
gene_info.get("end_position", 0),
gene_info.get("strand", 0),
gene_info.get("biotype", "")
))
gene_id = cursor.lastrowid or cursor.execute(
"SELECT id FROM genes WHERE gene_symbol = ?",
(gene_info.get("gene_symbol", ""),)
).fetchone()[0]
# 機能アノテーション保存
for annotation in results.get("annotations", []):
cursor.execute('''
INSERT INTO functional_annotations
(gene_id, annotation_type, annotation_value, source_database,
evidence_code, confidence_score)
VALUES (?, ?, ?, ?, ?, ?)
''', (
gene_id,
annotation.get("annotation_type", ""),
annotation.get("annotation_value", ""),
annotation.get("source_database", ""),
annotation.get("evidence_code", ""),
annotation.get("confidence_score", 0.0)
))
# パスウェイ情報保存
for pathway in results.get("pathways", []):
cursor.execute('''
INSERT INTO pathway_annotations
(gene_id, pathway_id, pathway_name, pathway_category, source_database)
VALUES (?, ?, ?, ?, ?)
''', (
gene_id,
pathway.get("pathway_id", ""),
pathway.get("pathway_name", ""),
pathway.get("pathway_category", ""),
pathway.get("source_database", "")
))
conn.commit()
self.logger.info(f"データベース保存完了: {len(results.get('genes', []))}遺伝子")
except Exception as e:
self.logger.error(f"データベース保存エラー: {e}")
conn.rollback()
finally:
conn.close()
async def run_integrated_analysis(self, gene_list: List[str],
batch_size: int = 10) -> Dict:
"""統合解析の実行"""
self.logger.info(f"統合解析開始: {len(gene_list)}遺伝子")
all_results = {"genes": [], "annotations": [], "pathways": [], "interactions": []}
# バッチ処理
for i in range(0, len(gene_list), batch_size):
batch = gene_list[i:i + batch_size]
self.logger.info(f"バッチ {i//batch_size + 1} 処理中: {batch}")
batch_results = await self.process_gene_batch(batch)
# 結果の統合
for key in all_results:
all_results[key].extend(batch_results.get(key, []))
# データベースに保存
self.save_to_database(batch_results)
# バッチ間の待機
if i + batch_size < len(gene_list):
await asyncio.sleep(2)
self.logger.info("統合解析完了")
return all_results
def generate_analysis_report(self, results: Dict, output_format: str = "html") -> str:
    """
    Render integration results as an HTML page, JSON text or plain text.

    Args:
        results: Dict with optional ``genes``, ``annotations``,
            ``pathways`` and ``interactions`` lists.
        output_format: ``"html"`` for a summary page listing the first 20
            genes, ``"json"`` for an indented JSON dump; any other value
            yields ``str(results)``.

    Returns:
        str: The rendered report. In the HTML page every gene value is
        HTML-escaped, because the values originate from external services.
    """
    if output_format == "json":
        return json.dumps(results, indent=2, ensure_ascii=False)
    if output_format != "html":
        return str(results)

    # Local import: only the HTML branch needs it
    from html import escape

    def cell(gene, key):
        """Return one escaped table cell for a gene record."""
        return f"<td>{escape(str(gene.get(key, 'N/A')))}</td>"

    genes = results.get("genes", [])

    # Rule bodies for the selectors used by the page (the previous style
    # block listed bare selectors, which is not valid CSS)
    css_rules = [
        "body { font-family: sans-serif; margin: 40px; }",
        "h1 { color: #2c3e50; }",
        "h2 { color: #34495e; }",
        "table { border-collapse: collapse; width: 100%; }",
        "th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }",
        "th { background-color: #f2f2f2; }",
        ".summary { background-color: #ecf0f1; padding: 20px; border-radius: 5px; }",
    ]

    header = "\n".join([
        "",
        "<!DOCTYPE html>",
        "<html>",
        "<head>",
        # Declare the encoding so the Japanese text renders correctly
        '<meta charset="utf-8">',
        "<title>マルチデータベース統合解析レポート</title>",
        "<style>",
        *css_rules,
        "</style>",
        "</head>",
        "<body>",
        "<h1>マルチデータベース統合解析レポート</h1>",
        '<div class="summary">',
        "<h2>解析サマリー</h2>",
        "<ul>",
        f"<li>解析遺伝子数: {len(genes)}</li>",
        f"<li>機能アノテーション数: {len(results.get('annotations', []))}</li>",
        f"<li>パスウェイ情報数: {len(results.get('pathways', []))}</li>",
        f"<li>相互作用情報数: {len(results.get('interactions', []))}</li>",
        "</ul>",
        "</div>",
        "<h2>遺伝子情報</h2>",
        "<table>",
        "<tr>",
        "<th>遺伝子シンボル</th>",
        "<th>Ensembl ID</th>",
        "<th>UniProt ID</th>",
        "<th>染色体</th>",
        "<th>バイオタイプ</th>",
        "</tr>",
        "",
    ])

    columns = ("gene_symbol", "ensembl_id", "uniprot_id", "chromosome", "biotype")
    rows = [
        "\n".join(["", "<tr>", *(cell(gene, key) for key in columns), "</tr>", ""])
        for gene in genes[:20]  # only the first 20 genes are shown
    ]

    footer = "\n".join([
        "",
        "</table>",
        f"<p>生成日時: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>",
        "</body>",
        "</html>",
        "",
    ])

    return header + "".join(rows) + footer
# 使用例
async def main():
    """Demonstrate the integrator: write a config, run the analysis, save reports."""
    # Example configuration, written to a YAML file first
    config = {
        "gene_list": ["BRCA1", "TP53", "EGFR", "KRAS", "MYC", "PIK3CA", "APC", "PTEN"],
        "databases": ["uniprot", "ensembl", "kegg", "string"],
        "analysis_types": ["functional", "pathway", "interaction"],
        "output_formats": ["html", "json", "excel"],
        "max_concurrent_requests": 5,
        "cache_expiry_hours": 24,
        "enable_parallel_processing": True
    }
    config_file = Path("./integration_config.yaml")
    config_file.write_text(yaml.dump(config, allow_unicode=True), encoding='utf-8')

    # Build the integrator from that file
    integrator = MultiDatabaseIntegrator(
        config_file=str(config_file),
        work_dir="./multi_db_analysis"
    )

    # Run the integrated analysis in batches of four genes
    results = await integrator.run_integrated_analysis(
        gene_list=config["gene_list"],
        batch_size=4
    )

    # HTML report
    report_file = integrator.work_dir / "integration_report.html"
    report_file.write_text(
        integrator.generate_analysis_report(results, output_format="html"),
        encoding='utf-8'
    )

    print(f"\n統合解析完了!")
    print(f"解析対象遺伝子: {len(results['genes'])}件")
    print(f"レポート保存: {report_file}")

    # Raw results as JSON
    json_file = integrator.work_dir / "integration_results.json"
    json_file.write_text(
        json.dumps(results, indent=2, ensure_ascii=False),
        encoding='utf-8'
    )
    print(f"JSON結果: {json_file}")


if __name__ == "__main__":
    # Run the asynchronous entry point
    asyncio.run(main())
I.5 パフォーマンス最適化とベストプラクティス
I.5.1 効率的なデータアクセス設計
import asyncio
import aiohttp
import aiofiles
import aiomysql
import redis.asyncio as redis
import time
from typing import List, Dict, Optional, AsyncGenerator
from dataclasses import dataclass, asdict
import json
import hashlib
from pathlib import Path
import logging
from contextlib import asynccontextmanager
@dataclass
class QueryMetrics:
    """Performance record of a single query."""

    query_id: str
    database: str
    start_time: float
    end_time: float
    response_size: int
    cache_hit: bool
    error: Optional[str] = None

    @property
    def duration(self) -> float:
        """Difference between ``end_time`` and ``start_time``."""
        elapsed = self.end_time - self.start_time
        return elapsed
class PerformanceOptimizer:
"""データベースアクセス性能最適化システム"""
def __init__(self, config: Dict):
"""
Args:
config: 設定辞書
"""
self.config = config
self.logger = logging.getLogger(__name__)
# Redis接続プール
self.redis_pool = None
# MySQL接続プール
self.mysql_pool = None
# HTTP セッション管理
self.http_sessions = {}
# メトリクス収集
self.metrics = []
# キャッシュ戦略
self.cache_strategies = {
"uniprot": {"ttl": 3600, "prefix": "up:"}, # 1時間
"ensembl": {"ttl": 7200, "prefix": "en:"}, # 2時間
"ncbi": {"ttl": 1800, "prefix": "ncbi:"}, # 30分
"kegg": {"ttl": 14400, "prefix": "kegg:"} # 4時間
}
async def initialize(self):
    """
    Create the Redis pool, the MySQL pool and one HTTP session per database.

    The Redis and MySQL pools are only created when their config section
    has ``enabled`` set. One aiohttp session is created for every entry of
    the ``databases`` config section.

    Raises:
        Exception: Any error raised during setup is logged and re-raised.
            Resources that were already created are released through
            ``cleanup`` first, so a failed initialization does not leak
            open sessions or pools.
    """
    try:
        # Redis connection pool
        if self.config.get("redis", {}).get("enabled", False):
            redis_config = self.config["redis"]
            self.redis_pool = redis.ConnectionPool(
                host=redis_config.get("host", "localhost"),
                port=redis_config.get("port", 6379),
                db=redis_config.get("db", 0),
                max_connections=redis_config.get("max_connections", 20),
                decode_responses=True
            )
            self.logger.info("Redis接続プール初期化完了")

        # MySQL connection pool
        if self.config.get("mysql", {}).get("enabled", False):
            mysql_config = self.config["mysql"]
            self.mysql_pool = await aiomysql.create_pool(
                host=mysql_config.get("host", "localhost"),
                port=mysql_config.get("port", 3306),
                user=mysql_config.get("user", "root"),
                password=mysql_config.get("password", ""),
                db=mysql_config.get("database", "bioinformatics"),
                minsize=mysql_config.get("min_connections", 5),
                maxsize=mysql_config.get("max_connections", 20),
                autocommit=True
            )
            self.logger.info("MySQL接続プール初期化完了")

        # One HTTP session per configured database
        for db_name, db_config in self.config.get("databases", {}).items():
            connector = aiohttp.TCPConnector(
                limit=db_config.get("max_connections", 10),
                limit_per_host=db_config.get("limit_per_host", 5),
                ttl_dns_cache=300,
                use_dns_cache=True
            )
            timeout = aiohttp.ClientTimeout(
                total=db_config.get("timeout", 30),
                connect=db_config.get("connect_timeout", 10)
            )
            self.http_sessions[db_name] = aiohttp.ClientSession(
                connector=connector,
                timeout=timeout,
                headers=db_config.get("headers", {})
            )

        self.logger.info("パフォーマンス最適化システム初期化完了")

    except Exception as e:
        self.logger.error(f"初期化エラー: {e}")
        # Release whatever was created before the failure; a cleanup
        # problem must not hide the original error
        try:
            await self.cleanup()
        except Exception as cleanup_error:
            self.logger.warning(f"初期化失敗後のクリーンアップエラー: {cleanup_error}")
        raise
async def cleanup(self):
"""リソースのクリーンアップ"""
# HTTPセッション閉じる
for session in self.http_sessions.values():
await session.close()
# MySQL接続プール閉じる
if self.mysql_pool:
self.mysql_pool.close()
await self.mysql_pool.wait_closed()
# Redis接続プール閉じる
if self.redis_pool:
await self.redis_pool.disconnect()
self.logger.info("リソースクリーンアップ完了")
def _generate_cache_key(self, database: str, query: str, params: Dict = None) -> str:
"""キャッシュキーの生成"""
cache_data = {"db": database, "query": query, "params": params or {}}
cache_string = json.dumps(cache_data, sort_keys=True)
hash_key = hashlib.md5(cache_string.encode()).hexdigest()
prefix = self.cache_strategies.get(database, {}).get("prefix", "default:")
return f"{prefix}{hash_key}"
@asynccontextmanager
async def get_redis_connection(self):
"""Redis接続の取得(コンテキストマネージャ)"""
if not self.redis_pool:
yield None
return
connection = redis.Redis(connection_pool=self.redis_pool)
try:
yield connection
finally:
await connection.close()
async def get_cached_result(self, cache_key: str) -> Optional[Dict]:
"""キャッシュからの結果取得"""
async with self.get_redis_connection() as redis_conn:
if not redis_conn:
return None
try:
cached_data = await redis_conn.get(cache_key)
if cached_data:
return json.loads(cached_data)
except Exception as e:
self.logger.warning(f"キャッシュ取得エラー: {e}")
return None
async def set_cached_result(self, cache_key: str, data: Dict, database: str):
"""結果のキャッシュ保存"""
async with self.get_redis_connection() as redis_conn:
if not redis_conn:
return
try:
ttl = self.cache_strategies.get(database, {}).get("ttl", 3600)
await redis_conn.setex(
cache_key,
ttl,
json.dumps(data, ensure_ascii=False)
)
except Exception as e:
self.logger.warning(f"キャッシュ保存エラー: {e}")
async def batch_query_with_optimization(self,
database: str,
queries: List[Dict],
batch_size: int = 10,
enable_cache: bool = True) -> List[Dict]:
"""最適化されたバッチクエリ実行"""
results = []
# クエリをバッチに分割
for i in range(0, len(queries), batch_size):
batch = queries[i:i + batch_size]
self.logger.info(f"バッチ {i//batch_size + 1}/{(len(queries)-1)//batch_size + 1} 実行中")
batch_results = await self._execute_batch_with_cache(
database, batch, enable_cache
)
results.extend(batch_results)
# バッチ間の適切な待機
if i + batch_size < len(queries):
rate_limit = self.config.get("databases", {}).get(database, {}).get("rate_limit", 1.0)
await asyncio.sleep(rate_limit)
return results
async def _execute_batch_with_cache(self,
database: str,
batch: List[Dict],
enable_cache: bool) -> List[Dict]:
"""キャッシュ機能付きバッチ実行"""
results = []
uncached_queries = []
cache_keys = []
# キャッシュチェック
if enable_cache:
for query in batch:
cache_key = self._generate_cache_key(database,
query.get("endpoint", ""),
query.get("params"))
cache_keys.append(cache_key)
cached_result = await self.get_cached_result(cache_key)
if cached_result:
# キャッシュヒット
results.append(cached_result)
self._record_metrics(database, True, len(str(cached_result)))
else:
# キャッシュミス
uncached_queries.append((query, cache_key))
else:
uncached_queries = [(query, None) for query in batch]
# キャッシュミスしたクエリを並列実行
if uncached_queries:
session = self.http_sessions.get(database)
if not session:
self.logger.error(f"セッションが見つかりません: {database}")
return results
tasks = []
for query, cache_key in uncached_queries:
task = self._execute_single_query(session, database, query, cache_key, enable_cache)
tasks.append(task)
query_results = await asyncio.gather(*tasks, return_exceptions=True)
for result in query_results:
if isinstance(result, Exception):
self.logger.error(f"クエリ実行エラー: {result}")
results.append({"error": str(result)})
else:
results.append(result)
return results
async def _execute_single_query(self,
                                session: aiohttp.ClientSession,
                                database: str,
                                query: Dict,
                                cache_key: Optional[str],
                                enable_cache: bool) -> Dict:
    """
    Execute one HTTP query, cache its result and record its metrics.

    Args:
        session: aiohttp session used for the request.
        database: Database name (for caching and metrics).
        query: Dict with ``url`` and optional ``params``, ``headers``,
            ``method`` (``GET`` by default, or ``POST``) and, for POST,
            ``data`` (sent as JSON).
        cache_key: Key to store the result under, or None.
        enable_cache: The result is cached only when this is True and a
            cache key is given.

    Returns:
        Dict: The decoded JSON response; for a GET response whose content
        type is not JSON, ``{"text": <body>}``; on any failure (HTTP error
        status, unsupported method, ...) ``{"error": <message>}``.
    """
    start_time = time.time()

    try:
        url = query.get("url", "")
        params = query.get("params", {})
        headers = query.get("headers", {})
        method = query.get("method", "GET").upper()

        if method == "GET":
            async with session.get(url, params=params, headers=headers) as response:
                response.raise_for_status()
                content_type = response.headers.get('content-type', '')
                if 'application/json' in content_type:
                    result = await response.json()
                else:
                    result = {"text": await response.text()}
        elif method == "POST":
            data = query.get("data", {})
            async with session.post(url, json=data, params=params, headers=headers) as response:
                response.raise_for_status()
                result = await response.json()
        else:
            raise ValueError(f"サポートされていないHTTPメソッド: {query.get('method', 'GET')}")

        end_time = time.time()
        response_size = len(json.dumps(result))

        if enable_cache and cache_key:
            await self.set_cached_result(cache_key, result, database)

        # Record the measured request time; recording a fresh pair of
        # timestamps here would report every successful query as taking
        # zero seconds.
        self.metrics.append(QueryMetrics(
            query_id=f"{database}_{int(end_time)}",
            database=database,
            start_time=start_time,
            end_time=end_time,
            response_size=response_size,
            cache_hit=False
        ))

        return result

    except Exception as e:
        end_time = time.time()
        self.logger.error(f"クエリ実行エラー ({database}): {e}")

        # Failed queries are recorded too, with the error message
        self.metrics.append(QueryMetrics(
            query_id=str(hash(str(query))),
            database=database,
            start_time=start_time,
            end_time=end_time,
            response_size=0,
            cache_hit=False,
            error=str(e)
        ))

        return {"error": str(e)}
def _record_metrics(self, database: str, cache_hit: bool, response_size: int):
    """
    Append a metrics entry stamped with the current time.

    Start and end time are both taken at the moment of the call, and the
    query id combines the database name with the current whole second.

    Args:
        database: Database name.
        cache_hit: Whether the result came from the cache.
        response_size: Size of the response.
    """
    self.metrics.append(
        QueryMetrics(
            query_id=f"{database}_{int(time.time())}",
            database=database,
            start_time=time.time(),
            end_time=time.time(),
            response_size=response_size,
            cache_hit=cache_hit,
        )
    )
async def optimize_database_schema(self):
"""データベーススキーマの最適化"""
if not self.mysql_pool:
return
optimization_queries = [
# インデックス最適化
"""
ALTER TABLE genes
ADD INDEX idx_gene_symbol (gene_symbol),
ADD INDEX idx_chromosome (chromosome),
ADD INDEX idx_biotype (biotype)
""",
"""
ALTER TABLE functional_annotations
ADD INDEX idx_gene_annotation (gene_id, annotation_type),
ADD INDEX idx_source_db (source_database),
ADD INDEX idx_confidence (confidence_score)
""",
"""
ALTER TABLE pathway_annotations
ADD INDEX idx_gene_pathway (gene_id, pathway_id),
ADD INDEX idx_pathway_category (pathway_category)
""",
"""
ALTER TABLE protein_interactions
ADD INDEX idx_gene_pair (gene_a_id, gene_b_id),
ADD INDEX idx_interaction_type (interaction_type),
ADD INDEX idx_confidence_score (confidence_score)
""",
# パーティション設定(日付ベース)
"""
ALTER TABLE analysis_cache
PARTITION BY RANGE (TO_DAYS(created_at)) (
PARTITION p0 VALUES LESS THAN (TO_DAYS('2024-01-01')),
PARTITION p1 VALUES LESS THAN (TO_DAYS('2024-04-01')),
PARTITION p2 VALUES LESS THAN (TO_DAYS('2024-07-01')),
PARTITION p3 VALUES LESS THAN (TO_DAYS('2024-10-01')),
PARTITION p4 VALUES LESS THAN (TO_DAYS('2025-01-01'))
)
"""
]
async with self.mysql_pool.acquire() as connection:
async with connection.cursor() as cursor:
for query in optimization_queries:
try:
await cursor.execute(query)
self.logger.info(f"スキーマ最適化実行: {query[:50]}...")
except Exception as e:
self.logger.warning(f"スキーマ最適化スキップ: {e}")
def generate_performance_report(self) -> Dict:
"""パフォーマンスレポートの生成"""
if not self.metrics:
return {"message": "メトリクスデータがありません"}
# 基本統計
total_queries = len(self.metrics)
cache_hits = sum(1 for m in self.metrics if m.cache_hit)
errors = sum(1 for m in self.metrics if m.error)
# データベース別統計
db_stats = {}
for metric in self.metrics:
db = metric.database
if db not in db_stats:
db_stats[db] = {
"total_queries": 0,
"cache_hits": 0,
"errors": 0,
"total_duration": 0,
"total_size": 0
}
db_stats[db]["total_queries"] += 1
if metric.cache_hit:
db_stats[db]["cache_hits"] += 1
if metric.error:
db_stats[db]["errors"] += 1
db_stats[db]["total_duration"] += metric.duration
db_stats[db]["total_size"] += metric.response_size
# パフォーマンスサマリー
report = {
"summary": {
"total_queries": total_queries,
"cache_hit_rate": cache_hits / total_queries if total_queries > 0 else 0,
"error_rate": errors / total_queries if total_queries > 0 else 0,
"average_response_time": sum(m.duration for m in self.metrics) / total_queries if total_queries > 0 else 0
},
"database_stats": {},
"recommendations": []
}
# データベース別詳細統計
for db, stats in db_stats.items():
report["database_stats"][db] = {
"total_queries": stats["total_queries"],
"cache_hit_rate": stats["cache_hits"] / stats["total_queries"] if stats["total_queries"] > 0 else 0,
"error_rate": stats["errors"] / stats["total_queries"] if stats["total_queries"] > 0 else 0,
"average_response_time": stats["total_duration"] / stats["total_queries"] if stats["total_queries"] > 0 else 0,
"average_response_size": stats["total_size"] / stats["total_queries"] if stats["total_queries"] > 0 else 0
}
# 最適化推奨事項
for db, stats in report["database_stats"].items():
if stats["cache_hit_rate"] < 0.5:
report["recommendations"].append(f"{db}: キャッシュTTLの延長を検討")
if stats["error_rate"] > 0.1:
report["recommendations"].append(f"{db}: エラーハンドリング強化が必要")
if stats["average_response_time"] > 5.0:
report["recommendations"].append(f"{db}: レスポンス時間最適化が必要")
return report
# 使用例
async def main():
    """Run the performance-optimization demo end to end.

    Builds a demo configuration, initializes a ``PerformanceOptimizer``,
    runs the same batch of UniProt search queries twice (the second run
    is there to exercise the cache), and prints the resulting performance
    report. ``optimizer.cleanup()`` is awaited in a ``finally`` block, so
    it runs even when initialization or a query raises.
    """
    config = {
        "redis": {
            "enabled": True,
            "host": "localhost",
            "port": 6379,
            "db": 0,
            "max_connections": 20,
        },
        "mysql": {
            "enabled": False,  # MySQL is not used in this demo
            "host": "localhost",
            "port": 3306,
            # Demo-only credentials; do not reuse outside a local sandbox.
            "user": "root",
            "password": "",
            "database": "bioinformatics",
            "min_connections": 5,
            "max_connections": 20,
        },
        "databases": {
            "uniprot": {
                "base_url": "https://rest.uniprot.org",
                "max_connections": 10,
                "limit_per_host": 5,
                "timeout": 30,
                "rate_limit": 1.0,
                "headers": {"Accept": "application/json"},
            },
            "ensembl": {
                "base_url": "https://rest.ensembl.org",
                "max_connections": 8,
                "limit_per_host": 4,
                "timeout": 20,
                "rate_limit": 0.5,
                "headers": {"Content-Type": "application/json"},
            },
        },
    }

    optimizer = PerformanceOptimizer(config)
    try:
        await optimizer.initialize()

        # One UniProt search per gene, restricted to organism_id 9606.
        genes = ["BRCA1", "TP53", "EGFR", "MYC", "KRAS"]
        search_url = f"{config['databases']['uniprot']['base_url']}/uniprotkb/search"
        sample_queries = [
            {
                "url": search_url,
                "params": {
                    "query": f"gene:{gene} AND organism_id:9606",
                    "format": "json",
                    "size": 1,
                },
                "method": "GET",
            }
            for gene in genes
        ]

        # First run.
        print("最適化バッチクエリ実行中...")
        results = await optimizer.batch_query_with_optimization(
            database="uniprot",
            queries=sample_queries,
            batch_size=3,
            enable_cache=True,
        )
        print(f"結果取得: {len(results)}件")

        # Second run with identical queries to exercise the cache; only its
        # effect on the recorded metrics matters, so the result is not kept.
        print("\n2回目実行(キャッシュテスト)...")
        await optimizer.batch_query_with_optimization(
            database="uniprot",
            queries=sample_queries,
            batch_size=3,
            enable_cache=True,
        )

        report = optimizer.generate_performance_report()
        print("\n=== パフォーマンスレポート ===")
        summary = report.get("summary")
        if summary is None:
            # With no recorded metrics the report carries only a "message"
            # entry; print that instead of failing with KeyError.
            print(report.get("message", ""))
        else:
            print(f"総クエリ数: {summary['total_queries']}")
            print(f"キャッシュヒット率: {summary['cache_hit_rate']:.2%}")
            print(f"エラー率: {summary['error_rate']:.2%}")
            print(f"平均レスポンス時間: {summary['average_response_time']:.3f}秒")

            recommendations = report.get("recommendations")
            if recommendations:
                print("\n推奨事項:")
                for rec in recommendations:
                    print(f"- {rec}")
    finally:
        await optimizer.cleanup()
if __name__ == "__main__":
    # Script entry point: drive the async demo to completion.
    # NOTE(review): the original comment described this as a "simplified
    # version for when Redis is not needed", yet the demo config in main()
    # sets redis "enabled": True - confirm which is intended.
    demo_coroutine = main()
    asyncio.run(demo_coroutine)
まとめ
本付録では、バイオインフォマティクス研究における主要データベースの実践的な活用方法を包括的に解説しました。
🎯 キーポイント
- 戦略的データベース選択: 研究クエスチョンに応じた効果的なデータベースの選び方
- 効率的なデータアクセス: API活用、バッチ処理、キャッシュ戦略による高速化
- 統合解析アプローチ: 複数データベースの情報を統合した包括的解析
- パフォーマンス最適化: 大規模データ処理のための実装上の工夫
💡 実践での活用
- 研究初期段階: データベース横断検索による仮説生成
- 詳細解析段階: 特化データベースを用いた深掘り調査
- 統合解析段階: マルチオミクスデータの統合的解釈
- 結果検証段階: 複数ソースでのクロスバリデーション
🔄 継続的改善
データベースの内容やAPIは常に更新されるため、定期的な手法の見直しと最適化が重要です。本ガイドで紹介した基本的なアプローチを基に、各研究プロジェクトに最適化したワークフローを構築してください。
戻る: 付録一覧 | 次へ: 付録J