付録B: トラブルシューティング
トラブルシューティング項目
この付録では、単に「解決法」を列挙するのではなく、運用で再利用しやすいように 症状 / 原因 / 確認コマンド / 対処 / 再発防止 の順で整理します。実際の解析では、入力データ、実行環境、使用ツール、ログ、実行時間、メモリ使用量を同じチケットや解析ノートに残してください。
メモリ関連エラー
- 症状:
MemoryError、プロセスの強制終了、スワップ増加、Jupyter kernel の停止。 - 原因: FASTQ/VCF/発現行列などを一括でメモリに読み込み、途中結果を
listやDataFrameに保持し続けている。 - 確認コマンド:
du -h <input>,wc -l <input>,free -h,/usr/bin/time -v <command>で入力サイズと最大メモリ使用量を確認する。 - 対処: チャンク処理、generator、逐次書き出し、列型の縮小、不要な中間オブジェクトの解放を組み合わせる。
- 再発防止: 解析手順に想定入力サイズ、メモリ上限、出力先、失敗時の再開単位を明記し、代表的な小規模データで smoke test を用意する。
🧪 概念例(チャンク処理の説明例)
# エラー: MemoryError
# 解決法: 「チャンク処理 + 逐次処理(/逐次書き出し)」でメモリ常駐を避ける
#
# ポイント:
# - チャンク化しても、results のように全結果をリストに溜めるとメモリを消費する
# - 逐次返却(generator)/ 逐次書き出し(ファイル・DB等)を前提にする
import csv
def process_chunk(lines):
"""
チャンク処理の最小例。
- この例では入力が TSV 形式(タブ区切り)である想定。
- 実運用では要件に応じて parse/検証/集約 等を行い、逐次的に結果を返す。
"""
for line in lines:
# 例: TSV -> list[str]
yield line.split("\t")
def iter_large_file_chunked(filename, chunk_size=10_000):
"""大規模ファイルをチャンクで処理し、結果を逐次返す(メモリに溜めない)"""
with open(filename, "r", encoding="utf-8") as f:
chunk = []
for line in f:
# Windows の CRLF も吸収する
chunk.append(line.rstrip("\r\n"))
if len(chunk) >= chunk_size:
# チャンクの処理(例: 解析結果を iterable として返す想定)
yield from process_chunk(chunk)
chunk.clear()
# 最後のチャンク
if chunk:
yield from process_chunk(chunk)
# 逐次書き出し例(CSV/TSV/JSONL/Parquet等、要件に合わせて選択)
def process_large_file_to_tsv(in_path, out_path, chunk_size=10_000):
# row は list/tuple のように「列」を表すシーケンスである想定(TSVとして書き出す)。
with open(out_path, "w", encoding="utf-8", newline="") as out:
writer = csv.writer(out, delimiter="\t", lineterminator="\n")
for row in iter_large_file_chunked(in_path, chunk_size=chunk_size):
writer.writerow(row)
並列処理のデッドロック
- 症状: 並列処理が終了しない、CPU使用率が低いまま待ち続ける、ログが途中で止まる。
- 原因: 共有リソースの待ち合わせ、ワーカー数過多、キュー詰まり、タイムアウト未設定、リトライ時の冪等性不足。
- 確認コマンド:
ps -ef,top,jobs -l, 実行ログのタイムスタンプ、ワーカーごとの入力IDを確認する。 - 対処: タイムアウト、キャンセル、上限回数付きリトライ、backoff、ワーカー数の制限、タスク単位のログ出力を導入する。
- 再発防止: 並列化前に逐次実行で正しさを確認し、ワーカー数、タイムアウト、リトライ回数、失敗時の再実行単位を設定ファイルに残す。
🧪 概念例(リトライ制御の説明例)
# エラー: デッドロック発生
# 解決法: タイムアウトとリトライの実装
import concurrent.futures
def safe_parallel_execution(tasks, max_workers=4, timeout=60):
"""安全な並列実行(timeout秒で未完了のタスクを失敗扱いにする)"""
results = []
failed_tasks = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# タスクを投入
future_to_task = {
executor.submit(task): task for task in tasks
}
# 全体の待ち時間を制限し、未完了タスクを明示的に回収する
done, not_done = concurrent.futures.wait(
future_to_task,
timeout=timeout,
)
for future in done:
task = future_to_task[future]
try:
results.append(future.result())
except Exception as e:
failed_tasks.append(task)
print(f"Task failed: {task}, Error: {e}")
for future in not_done:
task = future_to_task[future]
future.cancel()
failed_tasks.append(task)
print(f"Task timed out: {task}")
# リトライ処理
if failed_tasks:
print(f"Retrying {len(failed_tasks)} failed tasks...")
# 実装省略(実運用では backoff、上限回数、冪等性を確認する)
return results
数値精度の問題
- 症状:
nan/infの発生、確率がすべて0になる、正規化後の合計が1からずれる、環境により結果が微妙に変わる。 - 原因: 極端に小さい確率や大きい尤度を通常空間で計算している、dtype が不足している、ゼロ割りや丸め誤差を監視していない。
- 確認コマンド: 入力配列の
min/max/ dtype、np.isfinite(...)、異常値件数、乱数 seed、ライブラリ version を確認する。 - 対処: 対数空間、LogSumExp、適切な epsilon、dtype の見直し、異常値の明示的な検出を使う。
- 再発防止: 境界値を含むテストデータ、期待値に許容誤差を持たせたテスト、乱数 seed とライブラリ version の記録を用意する。
🧪 概念例(数値安定化の説明例)
# エラー: 数値オーバーフロー/アンダーフロー
# 解決法: 対数空間での計算
import numpy as np
def stable_probability_computation(values):
"""数値的に安定な確率計算"""
# 対数空間での計算
log_values = np.log(values + 1e-300) # ゼロ除算回避
# LogSumExp トリック
max_val = np.max(log_values)
log_sum = max_val + np.log(np.sum(np.exp(log_values - max_val)))
# 正規化された確率
log_probs = log_values - log_sum
probs = np.exp(log_probs)
return probs
| ← 前へ: 環境構築ガイド | 目次に戻る | 次へ: パフォーマンス最適化 → |