付録E: コード例

ゲノムデータ処理の完全な例

"""
完全なゲノム解析パイプラインの実装例
"""

import logging
from pathlib import Path
import yaml

class GenomicsPipeline:
    """Integrated genomics analysis pipeline.

    Loads a YAML configuration, prepares working directories and logging,
    then runs the pipeline stages in order via :meth:`run`.

    NOTE(review): the stage methods (``load_data``, ``quality_control``,
    ``analyze``, ``generate_report``) are not defined here — presumably a
    subclass or a later section supplies them; confirm before use.
    """

    def __init__(self, config_file):
        """Initialize the pipeline from a configuration file.

        Args:
            config_file: Path to a YAML configuration file.  Expected keys
                (from the methods below): ``paths`` with ``output``/``temp``/
                ``logs`` entries, and ``logging`` with ``level``/``file``.
        """
        with open(config_file, 'r') as f:
            self.config = yaml.safe_load(f)

        # Create directories BEFORE configuring logging: the FileHandler
        # below opens self.config['logging']['file'], which typically lives
        # inside the 'logs' directory created here — with the original
        # order a fresh installation crashes with FileNotFoundError.
        self.setup_directories()
        self.setup_logging()

    def setup_logging(self):
        """Configure logging to both a file and the console."""
        logging.basicConfig(
            level=getattr(logging, self.config['logging']['level']),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.config['logging']['file']),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def setup_directories(self):
        """Create the working directories listed under config['paths']."""
        for dir_key in ('output', 'temp', 'logs'):
            Path(self.config['paths'][dir_key]).mkdir(
                parents=True, exist_ok=True
            )

    def run(self):
        """Execute all pipeline stages in order.

        Raises:
            Exception: any stage failure is logged and then re-raised.
        """
        try:
            self.logger.info("Starting genomics pipeline")

            # 1. Load input data
            data = self.load_data()

            # 2. Quality control
            qc_data = self.quality_control(data)

            # 3. Analysis
            results = self.analyze(qc_data)

            # 4. Report generation
            self.generate_report(results)

            self.logger.info("Pipeline completed successfully")

        except Exception as e:
            # Log with context, then propagate so callers see the failure.
            self.logger.error(f"Pipeline failed: {e}")
            raise

設定ファイルの例

# config.yaml
# Example configuration consumed by GenomicsPipeline (read with yaml.safe_load).

# Working directories; 'output', 'temp' and 'logs' are created at startup
# by setup_directories() ('input' is expected to already exist).
paths:
  input: /data/input
  output: /data/output
  temp: /data/temp
  logs: /data/logs

# Logging level name (resolved via getattr(logging, level)) and log file path.
# Note: the file lives inside paths.logs, so that directory must exist first.
logging:
  level: INFO
  file: /data/logs/pipeline.log

# Analysis parameters — presumably consumed by the pipeline stage methods,
# which are not shown in this appendix; verify against the full implementation.
analysis:
  min_quality: 20
  min_coverage: 10
  threads: 8

# Cluster resource requests (memory limit, wall-clock time) — likely used by
# a job scheduler wrapper; not read by the code shown here.
resources:
  memory: 32G
  time: 24:00:00

ミニワークフロー(Snakemake 例)

# Snakefile (minimal example)
# DAG: paired-end reads -> map (BWA-MEM + sort/index) -> call (bcftools) -> VCF

rule all:
    input: "results/sample.vcf"

rule map:
    input:
        ref = "ref.fa",
        r1 = "data/sample_R1.fq.gz",
        r2 = "data/sample_R2.fq.gz"
    output:
        bam = "results/sample.sorted.bam",
        # Declare the index too, so Snakemake tracks the file that
        # `samtools index` produces instead of leaving it unmanaged.
        bai = "results/sample.sorted.bam.bai"
    # Use the threads directive instead of hard-coding `-t 8` in the shell
    # command, so the scheduler can account for cores (and --cores caps it).
    threads: 8
    shell:
        """
        bwa mem -t {threads} {input.ref} {input.r1} {input.r2} \
          | samtools view -bS - \
          | samtools sort -o {output.bam}
        samtools index {output.bam}
        """

rule call:
    input:
        ref = "ref.fa",
        bam = "results/sample.sorted.bam"
    output:
        vcf = "results/sample.vcf"
    shell:
        """
        bcftools mpileup -Ou -f {input.ref} {input.bam} \
          | bcftools call -mv -Ov -o {output.vcf}
        """

ミニワークフロー(Nextflow 例)

// main.nf (minimal example, Nextflow DSL2)
// Pipeline: paired-end reads -> MAP (bwa mem + samtools sort/index)
//           -> CALL (bcftools mpileup/call) -> VCF
nextflow.enable.dsl = 2

params.ref = file('ref.fa')
params.r1  = file('data/sample_R1.fq.gz')
params.r2  = file('data/sample_R2.fq.gz')

// Map paired-end reads to the reference; emit a coordinate-sorted BAM.
process MAP {
  publishDir 'results'
  input:
    path ref
    path r1
    path r2
  output:
    path 'sample.sorted.bam'
  script:
  """
  bwa mem -t 8 $ref $r1 $r2 \
    | samtools view -bS - \
    | samtools sort -o sample.sorted.bam
  samtools index sample.sorted.bam
  """
}

// Call variants from the sorted BAM against the reference.
// Note: no DSL1 `from MAP.out` here — in DSL2, channel wiring happens
// exclusively in the workflow block; mixing `from` into a DSL2 process
// input declaration is a syntax error.
process CALL {
  publishDir 'results'
  input:
    path ref
    path bam
  output:
    path 'sample.vcf'
  script:
  """
  bcftools mpileup -Ou -f $ref $bam \
    | bcftools call -mv -Ov -o sample.vcf
  """
}

workflow {
  ref = file(params.ref)
  r1  = file(params.r1)
  r2  = file(params.r2)
  // MAP's output channel feeds CALL's `bam` input positionally.
  CALL(ref, MAP(ref, r1, r2))
}