付録B: 詳細なトラブルシューティング
よくあるエラーと解決法
メモリ関連エラー
🧪 概念例(擬似コード)
# エラー: MemoryError
# 解決法: 「チャンク処理 + 逐次処理(/逐次書き出し)」でメモリ常駐を避ける
#
# ポイント:
# - チャンク化しても、results のように全結果をリストに溜めるとメモリを消費する
# - 逐次返却(generator)/ 逐次書き出し(ファイル・DB等)を前提にする
import csv
def process_chunk(lines):
"""
チャンク処理の最小例。
- この例では入力が TSV 形式(タブ区切り)である想定。
- 実運用では要件に応じて parse/検証/集約 等を行い、逐次的に結果を返す。
"""
for line in lines:
# 例: TSV -> list[str]
yield line.split("\t")
def iter_large_file_chunked(filename, chunk_size=10_000):
"""大規模ファイルをチャンクで処理し、結果を逐次返す(メモリに溜めない)"""
with open(filename, "r", encoding="utf-8") as f:
chunk = []
for line in f:
# Windows の CRLF も吸収する
chunk.append(line.rstrip("\r\n"))
if len(chunk) >= chunk_size:
# チャンクの処理(例: 解析結果を iterable として返す想定)
yield from process_chunk(chunk)
chunk.clear()
# 最後のチャンク
if chunk:
yield from process_chunk(chunk)
# 逐次書き出し例(CSV/TSV/JSONL/Parquet等、要件に合わせて選択)
def process_large_file_to_tsv(in_path, out_path, chunk_size=10_000):
# row は list/tuple のように「列」を表すシーケンスである想定(TSVとして書き出す)。
with open(out_path, "w", encoding="utf-8", newline="") as out:
writer = csv.writer(out, delimiter="\t", lineterminator="\n")
for row in iter_large_file_chunked(in_path, chunk_size=chunk_size):
writer.writerow(row)
並列処理のデッドロック
🧪 概念例(擬似コード)
# エラー: デッドロック発生
# 解決法: タイムアウトとリトライの実装
import concurrent.futures
import time
def safe_parallel_execution(tasks, max_workers=4, timeout=60):
"""安全な並列実行"""
results = []
failed_tasks = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# タスクを投入
future_to_task = {
executor.submit(task): task for task in tasks
}
# 結果を収集
for future in concurrent.futures.as_completed(future_to_task, timeout=timeout):
task = future_to_task[future]
try:
result = future.result(timeout=10)
results.append(result)
except concurrent.futures.TimeoutError:
failed_tasks.append(task)
print(f"Task timed out: {task}")
except Exception as e:
failed_tasks.append(task)
print(f"Task failed: {task}, Error: {e}")
# リトライ処理
if failed_tasks:
print(f"Retrying {len(failed_tasks)} failed tasks...")
# 実装省略
return results
数値精度の問題
🧪 概念例(擬似コード)
# エラー: 数値オーバーフロー/アンダーフロー
# 解決法: 対数空間での計算
import numpy as np
def stable_probability_computation(values):
"""数値的に安定な確率計算"""
# 対数空間での計算
log_values = np.log(values + 1e-300) # ゼロ除算回避
# LogSumExp トリック
max_val = np.max(log_values)
log_sum = max_val + np.log(np.sum(np.exp(log_values - max_val)))
# 正規化された確率
log_probs = log_values - log_sum
probs = np.exp(log_probs)
return probs