付録B: トラブルシューティング

トラブルシューティング項目

この付録では、単に「解決法」を列挙するのではなく、運用で再利用しやすいように 症状 / 原因 / 確認コマンド / 対処 / 再発防止 の順で整理します。実際の解析では、入力データ、実行環境、使用ツール、ログ、実行時間、メモリ使用量を同じチケットや解析ノートに残してください。

メモリ関連エラー

  • 症状: MemoryError、プロセスの強制終了、スワップ増加、Jupyter kernel の停止。
  • 原因: FASTQ/VCF/発現行列などを一括でメモリに読み込み、途中結果を listDataFrame に保持し続けている。
  • 確認コマンド: du -h <input>, wc -l <input>, free -h, /usr/bin/time -v <command> で入力サイズと最大メモリ使用量を確認する。
  • 対処: チャンク処理、generator、逐次書き出し、列型の縮小、不要な中間オブジェクトの解放を組み合わせる。
  • 再発防止: 解析手順に想定入力サイズ、メモリ上限、出力先、失敗時の再開単位を明記し、代表的な小規模データで smoke test を用意する。

🧪 概念例(チャンク処理の説明例)

# エラー: MemoryError
# 解決法: 「チャンク処理 + 逐次処理(/逐次書き出し)」でメモリ常駐を避ける
#
# ポイント:
# - チャンク化しても、results のように全結果をリストに溜めるとメモリを消費する
# - 逐次返却(generator)/ 逐次書き出し(ファイル・DB等)を前提にする
import csv


def process_chunk(lines):
    """
    チャンク処理の最小例。

    - この例では入力が TSV 形式(タブ区切り)である想定。
    - 実運用では要件に応じて parse/検証/集約 等を行い、逐次的に結果を返す。
    """
    for line in lines:
        # 例: TSV -> list[str]
        yield line.split("\t")


def iter_large_file_chunked(filename, chunk_size=10_000):
    """大規模ファイルをチャンクで処理し、結果を逐次返す(メモリに溜めない)"""
    with open(filename, "r", encoding="utf-8") as f:
        chunk = []
        for line in f:
            # Windows の CRLF も吸収する
            chunk.append(line.rstrip("\r\n"))

            if len(chunk) >= chunk_size:
                # チャンクの処理(例: 解析結果を iterable として返す想定)
                yield from process_chunk(chunk)
                chunk.clear()

        # 最後のチャンク
        if chunk:
            yield from process_chunk(chunk)


# 逐次書き出し例(CSV/TSV/JSONL/Parquet等、要件に合わせて選択)
def process_large_file_to_tsv(in_path, out_path, chunk_size=10_000):
    # row は list/tuple のように「列」を表すシーケンスである想定(TSVとして書き出す)。
    with open(out_path, "w", encoding="utf-8", newline="") as out:
        writer = csv.writer(out, delimiter="\t", lineterminator="\n")
        for row in iter_large_file_chunked(in_path, chunk_size=chunk_size):
            writer.writerow(row)

並列処理のデッドロック

  • 症状: 並列処理が終了しない、CPU使用率が低いまま待ち続ける、ログが途中で止まる。
  • 原因: 共有リソースの待ち合わせ、ワーカー数過多、キュー詰まり、タイムアウト未設定、リトライ時の冪等性不足。
  • 確認コマンド: ps -ef, top, jobs -l, 実行ログのタイムスタンプ、ワーカーごとの入力IDを確認する。
  • 対処: タイムアウト、キャンセル、上限回数付きリトライ、backoff、ワーカー数の制限、タスク単位のログ出力を導入する。
  • 再発防止: 並列化前に逐次実行で正しさを確認し、ワーカー数、タイムアウト、リトライ回数、失敗時の再実行単位を設定ファイルに残す。

🧪 概念例(リトライ制御の説明例)

# エラー: デッドロック発生
# 解決法: タイムアウトとリトライの実装
import concurrent.futures

def safe_parallel_execution(tasks, max_workers=4, timeout=60):
    """安全な並列実行(timeout秒で未完了のタスクを失敗扱いにする)"""
    results = []
    failed_tasks = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # タスクを投入
        future_to_task = {
            executor.submit(task): task for task in tasks
        }

        # 全体の待ち時間を制限し、未完了タスクを明示的に回収する
        done, not_done = concurrent.futures.wait(
            future_to_task,
            timeout=timeout,
        )

        for future in done:
            task = future_to_task[future]
            try:
                results.append(future.result())
            except Exception as e:
                failed_tasks.append(task)
                print(f"Task failed: {task}, Error: {e}")

        for future in not_done:
            task = future_to_task[future]
            future.cancel()
            failed_tasks.append(task)
            print(f"Task timed out: {task}")

    # リトライ処理
    if failed_tasks:
        print(f"Retrying {len(failed_tasks)} failed tasks...")
        # 実装省略(実運用では backoff、上限回数、冪等性を確認する)

    return results

数値精度の問題

  • 症状: nan / inf の発生、確率がすべて0になる、正規化後の合計が1からずれる、環境により結果が微妙に変わる。
  • 原因: 極端に小さい確率や大きい尤度を通常空間で計算している、dtype が不足している、ゼロ割りや丸め誤差を監視していない。
  • 確認コマンド: 入力配列の min / max / dtype、np.isfinite(...)、異常値件数、乱数 seed、ライブラリ version を確認する。
  • 対処: 対数空間、LogSumExp、適切な epsilon、dtype の見直し、異常値の明示的な検出を使う。
  • 再発防止: 境界値を含むテストデータ、期待値に許容誤差を持たせたテスト、乱数 seed とライブラリ version の記録を用意する。

🧪 概念例(数値安定化の説明例)

# エラー: 数値オーバーフロー/アンダーフロー
# 解決法: 対数空間での計算
import numpy as np

def stable_probability_computation(values):
    """数値的に安定な確率計算"""
    # 対数空間での計算
    log_values = np.log(values + 1e-300)  # ゼロ除算回避
    
    # LogSumExp トリック
    max_val = np.max(log_values)
    log_sum = max_val + np.log(np.sum(np.exp(log_values - max_val)))
    
    # 正規化された確率
    log_probs = log_values - log_sum
    probs = np.exp(log_probs)
    
    return probs

← 前へ: 環境構築ガイド 目次に戻る 次へ: パフォーマンス最適化 →