import asyncio
import aiohttp
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Tuple
import json
import sqlite3
from pathlib import Path
import logging
from dataclasses import dataclass
from datetime import datetime
import yaml
@dataclass
class AnalysisConfig:
"""解析設定クラス"""
gene_list: List[str]
databases: List[str]
analysis_types: List[str]
output_formats: List[str]
max_concurrent_requests: int = 5
cache_expiry_hours: int = 24
enable_parallel_processing: bool = True
class MultiDatabaseIntegrator:
"""マルチデータベース統合解析システム"""
def __init__(self, config_file: Optional[str] = None, work_dir: str = "./integration_analysis"):
"""
Args:
config_file: 設定ファイルパス(YAML)
work_dir: 作業ディレクトリ
"""
self.work_dir = Path(work_dir)
self.work_dir.mkdir(exist_ok=True)
# ログ設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(self.work_dir / 'integration.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
# 設定読み込み
self.config = self._load_config(config_file)
# 統合データベース初期化
self.db_path = self.work_dir / "integrated_data.db"
self._init_integrated_db()
# データソース定義
self.data_sources = {
"uniprot": {
"base_url": "https://rest.uniprot.org",
"rate_limit": 1.0,
"auth_required": False
},
"ensembl": {
"base_url": "https://rest.ensembl.org",
"rate_limit": 0.5,
"auth_required": False
},
"kegg": {
"base_url": "https://rest.kegg.jp",
"rate_limit": 1.0,
"auth_required": False
},
"string": {
"base_url": "https://string-db.org/api",
"rate_limit": 1.0,
"auth_required": False
},
"ncbi": {
"base_url": "https://eutils.ncbi.nlm.nih.gov/entrez/eutils",
"rate_limit": 1.0,
"auth_required": False
}
}
def _load_config(self, config_file: Optional[str]) -> AnalysisConfig:
"""設定ファイルの読み込み"""
if config_file and Path(config_file).exists():
with open(config_file, 'r', encoding='utf-8') as f:
config_data = yaml.safe_load(f)
return AnalysisConfig(
gene_list=config_data.get('gene_list', []),
databases=config_data.get('databases', ['uniprot', 'ensembl']),
analysis_types=config_data.get('analysis_types', ['functional', 'pathway']),
output_formats=config_data.get('output_formats', ['json', 'excel']),
max_concurrent_requests=config_data.get('max_concurrent_requests', 5),
cache_expiry_hours=config_data.get('cache_expiry_hours', 24),
enable_parallel_processing=config_data.get('enable_parallel_processing', True)
)
else:
# デフォルト設定
return AnalysisConfig(
gene_list=[],
databases=['uniprot', 'ensembl', 'string'],
analysis_types=['functional', 'pathway', 'interaction'],
output_formats=['json', 'excel', 'html'],
max_concurrent_requests=5,
cache_expiry_hours=24,
enable_parallel_processing=True
)
def _init_integrated_db(self):
"""統合データベースの初期化"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 遺伝子基本情報テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS genes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gene_symbol TEXT UNIQUE NOT NULL,
ensembl_id TEXT,
uniprot_id TEXT,
ncbi_gene_id TEXT,
description TEXT,
chromosome TEXT,
start_position INTEGER,
end_position INTEGER,
strand INTEGER,
biotype TEXT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 機能アノテーションテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS functional_annotations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gene_id INTEGER,
annotation_type TEXT,
annotation_value TEXT,
source_database TEXT,
evidence_code TEXT,
confidence_score REAL,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (gene_id) REFERENCES genes (id)
)
''')
# パスウェイ情報テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS pathway_annotations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gene_id INTEGER,
pathway_id TEXT,
pathway_name TEXT,
pathway_category TEXT,
source_database TEXT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (gene_id) REFERENCES genes (id)
)
''')
# タンパク質相互作用テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS protein_interactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gene_a_id INTEGER,
gene_b_id INTEGER,
interaction_type TEXT,
confidence_score REAL,
source_database TEXT,
experimental_evidence TEXT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (gene_a_id) REFERENCES genes (id),
FOREIGN KEY (gene_b_id) REFERENCES genes (id)
)
''')
# 解析結果キャッシュテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS analysis_cache (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cache_key TEXT UNIQUE,
analysis_type TEXT,
gene_list TEXT,
result_data TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP
)
''')
conn.commit()
conn.close()
async def fetch_gene_basic_info(self, session: aiohttp.ClientSession,
gene_symbol: str) -> Dict:
"""遺伝子基本情報の非同期取得"""
gene_info = {"gene_symbol": gene_symbol}
# Ensemblから基本情報取得
try:
ensembl_url = f"{self.data_sources['ensembl']['base_url']}/lookup/symbol/homo_sapiens/{gene_symbol}"
async with session.get(ensembl_url, headers={"Content-Type": "application/json"}) as response:
if response.status == 200:
ensembl_data = await response.json()
gene_info.update({
"ensembl_id": ensembl_data.get("id", ""),
"description": ensembl_data.get("description", ""),
"chromosome": ensembl_data.get("seq_region_name", ""),
"start_position": ensembl_data.get("start", 0),
"end_position": ensembl_data.get("end", 0),
"strand": ensembl_data.get("strand", 0),
"biotype": ensembl_data.get("biotype", "")
})
except Exception as e:
self.logger.error(f"Ensembl取得エラー ({gene_symbol}): {e}")
# UniProtから基本情報取得
try:
uniprot_url = f"{self.data_sources['uniprot']['base_url']}/uniprotkb/search"
params = {"query": f"gene:{gene_symbol} AND organism_id:9606", "format": "json", "size": 1}
async with session.get(uniprot_url, params=params) as response:
if response.status == 200:
uniprot_data = await response.json()
if uniprot_data.get("results"):
result = uniprot_data["results"][0]
gene_info["uniprot_id"] = result.get("primaryAccession", "")
except Exception as e:
self.logger.error(f"UniProt取得エラー ({gene_symbol}): {e}")
await asyncio.sleep(self.data_sources['ensembl']['rate_limit'])
return gene_info
async def fetch_functional_annotations(self, session: aiohttp.ClientSession,
gene_symbol: str, uniprot_id: str) -> List[Dict]:
"""機能アノテーションの非同期取得"""
annotations = []
if uniprot_id:
try:
# UniProtから詳細な機能情報取得
uniprot_url = f"{self.data_sources['uniprot']['base_url']}/uniprotkb/{uniprot_id}"
async with session.get(uniprot_url) as response:
if response.status == 200:
protein_data = await response.json()
# GO terms
if "uniProtKBCrossReferences" in protein_data:
for ref in protein_data["uniProtKBCrossReferences"]:
if ref.get("database") == "GO":
go_id = ref.get("id", "")
go_term = ""
aspect = ""
if "properties" in ref:
for prop in ref["properties"]:
if prop.get("key") == "GoTerm":
go_term = prop.get("value", "")
elif prop.get("key") == "GoEvidenceType":
aspect = prop.get("value", "")
annotations.append({
"annotation_type": "GO_term",
"annotation_value": f"{go_id}|{go_term}",
"source_database": "UniProt",
"evidence_code": aspect,
"confidence_score": 0.8
})
# 機能コメント
if "comments" in protein_data:
for comment in protein_data["comments"]:
if comment.get("commentType") == "FUNCTION":
annotations.append({
"annotation_type": "function_description",
"annotation_value": comment.get("texts", [{}])[0].get("value", ""),
"source_database": "UniProt",
"evidence_code": "manual_curation",
"confidence_score": 0.9
})
except Exception as e:
self.logger.error(f"機能アノテーション取得エラー ({gene_symbol}): {e}")
await asyncio.sleep(self.data_sources['uniprot']['rate_limit'])
return annotations
async def fetch_pathway_information(self, session: aiohttp.ClientSession,
gene_symbol: str) -> List[Dict]:
"""パスウェイ情報の非同期取得"""
pathways = []
try:
# KEGGからパスウェイ情報取得
kegg_url = f"{self.data_sources['kegg']['base_url']}/find/pathway/{gene_symbol}"
async with session.get(kegg_url) as response:
if response.status == 200:
kegg_text = await response.text()
if kegg_text.strip():
for line in kegg_text.strip().split('\n'):
if '\t' in line:
pathway_id, pathway_name = line.split('\t', 1)
pathways.append({
"pathway_id": pathway_id,
"pathway_name": pathway_name,
"pathway_category": "metabolic",
"source_database": "KEGG"
})
except Exception as e:
self.logger.error(f"KEGG パスウェイ取得エラー ({gene_symbol}): {e}")
await asyncio.sleep(self.data_sources['kegg']['rate_limit'])
return pathways
async def fetch_protein_interactions(self, session: aiohttp.ClientSession,
gene_symbol: str) -> List[Dict]:
"""タンパク質相互作用の非同期取得"""
interactions = []
try:
# STRINGから相互作用情報取得
string_url = f"{self.data_sources['string']['base_url']}/json/network"
params = {
"identifiers": gene_symbol,
"species": 9606,
"limit": 20
}
async with session.get(string_url, params=params) as response:
if response.status == 200:
string_data = await response.json()
for interaction in string_data:
interactions.append({
"partner_a": interaction.get("preferredName_A", ""),
"partner_b": interaction.get("preferredName_B", ""),
"interaction_type": "protein-protein",
"confidence_score": interaction.get("score", 0) / 1000.0, # 正規化
"source_database": "STRING",
"experimental_evidence": "high_throughput"
})
except Exception as e:
self.logger.error(f"STRING 相互作用取得エラー ({gene_symbol}): {e}")
await asyncio.sleep(self.data_sources['string']['rate_limit'])
return interactions
async def process_gene_batch(self, gene_batch: List[str]) -> Dict:
"""遺伝子バッチの並列処理"""
results = {"genes": [], "annotations": [], "pathways": [], "interactions": []}
connector = aiohttp.TCPConnector(limit=self.config.max_concurrent_requests)
timeout = aiohttp.ClientTimeout(total=300) # 5分タイムアウト
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
# 基本情報の並列取得
basic_info_tasks = [
self.fetch_gene_basic_info(session, gene) for gene in gene_batch
]
gene_info_list = await asyncio.gather(*basic_info_tasks, return_exceptions=True)
# エラーハンドリング
valid_gene_info = []
for i, info in enumerate(gene_info_list):
if isinstance(info, Exception):
self.logger.error(f"遺伝子情報取得失敗 ({gene_batch[i]}): {info}")
else:
valid_gene_info.append(info)
results["genes"].append(info)
# 詳細情報の並列取得
detail_tasks = []
for gene_info in valid_gene_info:
gene_symbol = gene_info["gene_symbol"]
uniprot_id = gene_info.get("uniprot_id", "")
# 機能アノテーション
detail_tasks.append(
self.fetch_functional_annotations(session, gene_symbol, uniprot_id)
)
# パスウェイ情報
detail_tasks.append(
self.fetch_pathway_information(session, gene_symbol)
)
# 相互作用情報
detail_tasks.append(
self.fetch_protein_interactions(session, gene_symbol)
)
detail_results = await asyncio.gather(*detail_tasks, return_exceptions=True)
# 結果の整理
for i, result in enumerate(detail_results):
if isinstance(result, Exception):
self.logger.error(f"詳細情報取得失敗: {result}")
continue
task_type = i % 3
if task_type == 0: # 機能アノテーション
results["annotations"].extend(result)
elif task_type == 1: # パスウェイ情報
results["pathways"].extend(result)
elif task_type == 2: # 相互作用情報
results["interactions"].extend(result)
return results
def save_to_database(self, results: Dict):
"""結果をデータベースに保存"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
# 遺伝子基本情報の保存
for gene_info in results.get("genes", []):
cursor.execute('''
INSERT OR REPLACE INTO genes
(gene_symbol, ensembl_id, uniprot_id, description, chromosome,
start_position, end_position, strand, biotype)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
gene_info.get("gene_symbol", ""),
gene_info.get("ensembl_id", ""),
gene_info.get("uniprot_id", ""),
gene_info.get("description", ""),
gene_info.get("chromosome", ""),
gene_info.get("start_position", 0),
gene_info.get("end_position", 0),
gene_info.get("strand", 0),
gene_info.get("biotype", "")
))
gene_id = cursor.lastrowid or cursor.execute(
"SELECT id FROM genes WHERE gene_symbol = ?",
(gene_info.get("gene_symbol", ""),)
).fetchone()[0]
# 機能アノテーション保存
for annotation in results.get("annotations", []):
cursor.execute('''
INSERT INTO functional_annotations
(gene_id, annotation_type, annotation_value, source_database,
evidence_code, confidence_score)
VALUES (?, ?, ?, ?, ?, ?)
''', (
gene_id,
annotation.get("annotation_type", ""),
annotation.get("annotation_value", ""),
annotation.get("source_database", ""),
annotation.get("evidence_code", ""),
annotation.get("confidence_score", 0.0)
))
# パスウェイ情報保存
for pathway in results.get("pathways", []):
cursor.execute('''
INSERT INTO pathway_annotations
(gene_id, pathway_id, pathway_name, pathway_category, source_database)
VALUES (?, ?, ?, ?, ?)
''', (
gene_id,
pathway.get("pathway_id", ""),
pathway.get("pathway_name", ""),
pathway.get("pathway_category", ""),
pathway.get("source_database", "")
))
conn.commit()
self.logger.info(f"データベース保存完了: {len(results.get('genes', []))}遺伝子")
except Exception as e:
self.logger.error(f"データベース保存エラー: {e}")
conn.rollback()
finally:
conn.close()
async def run_integrated_analysis(self, gene_list: List[str],
batch_size: int = 10) -> Dict:
"""統合解析の実行"""
self.logger.info(f"統合解析開始: {len(gene_list)}遺伝子")
all_results = {"genes": [], "annotations": [], "pathways": [], "interactions": []}
# バッチ処理
for i in range(0, len(gene_list), batch_size):
batch = gene_list[i:i + batch_size]
self.logger.info(f"バッチ {i//batch_size + 1} 処理中: {batch}")
batch_results = await self.process_gene_batch(batch)
# 結果の統合
for key in all_results:
all_results[key].extend(batch_results.get(key, []))
# データベースに保存
self.save_to_database(batch_results)
# バッチ間の待機
if i + batch_size < len(gene_list):
await asyncio.sleep(2)
self.logger.info("統合解析完了")
return all_results
def generate_analysis_report(self, results: Dict, output_format: str = "html") -> str:
"""解析レポートの生成"""
if output_format == "html":
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>マルチデータベース統合解析レポート</title>
<style>
body
h1
h2
table
th, td
th
.summary
</style>
</head>
<body>
<h1>マルチデータベース統合解析レポート</h1>
<div class="summary">
<h2>解析サマリー</h2>
<ul>
<li>解析遺伝子数: {len(results.get('genes', []))}</li>
<li>機能アノテーション数: {len(results.get('annotations', []))}</li>
<li>パスウェイ情報数: {len(results.get('pathways', []))}</li>
<li>相互作用情報数: {len(results.get('interactions', []))}</li>
</ul>
</div>
<h2>遺伝子情報</h2>
<table>
<tr>
<th>遺伝子シンボル</th>
<th>Ensembl ID</th>
<th>UniProt ID</th>
<th>染色体</th>
<th>バイオタイプ</th>
</tr>
"""
for gene in results.get("genes", [])[:20]: # 最初の20件のみ表示
html_content += f"""
<tr>
<td>{gene.get('gene_symbol', 'N/A')}</td>
<td>{gene.get('ensembl_id', 'N/A')}</td>
<td>{gene.get('uniprot_id', 'N/A')}</td>
<td>{gene.get('chromosome', 'N/A')}</td>
<td>{gene.get('biotype', 'N/A')}</td>
</tr>
"""
html_content += """
</table>
<p>生成日時: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """</p>
</body>
</html>
"""
return html_content
elif output_format == "json":
return json.dumps(results, indent=2, ensure_ascii=False)
else:
return str(results)
# 使用例
async def main():
"""メイン実行関数"""
# 設定ファイル作成(例)
config = {
"gene_list": ["BRCA1", "TP53", "EGFR", "KRAS", "MYC", "PIK3CA", "APC", "PTEN"],
"databases": ["uniprot", "ensembl", "kegg", "string"],
"analysis_types": ["functional", "pathway", "interaction"],
"output_formats": ["html", "json", "excel"],
"max_concurrent_requests": 5,
"cache_expiry_hours": 24,
"enable_parallel_processing": True
}
config_file = Path("./integration_config.yaml")
with open(config_file, 'w', encoding='utf-8') as f:
yaml.dump(config, f, allow_unicode=True)
# 統合解析器の初期化
integrator = MultiDatabaseIntegrator(
config_file=str(config_file),
work_dir="./multi_db_analysis"
)
# 統合解析実行
results = await integrator.run_integrated_analysis(
gene_list=config["gene_list"],
batch_size=4
)
# レポート生成
html_report = integrator.generate_analysis_report(results, output_format="html")
# レポート保存
report_file = integrator.work_dir / "integration_report.html"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(html_report)
print(f"\n統合解析完了!")
print(f"解析対象遺伝子: {len(results['genes'])}件")
print(f"レポート保存: {report_file}")
# JSON結果も保存
json_file = integrator.work_dir / "integration_results.json"
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"JSON結果: {json_file}")
if __name__ == "__main__":
# 非同期実行
asyncio.run(main())