付録B: 詳細なトラブルシューティング

よくあるエラーと解決法

メモリ関連エラー

🧪 概念例(擬似コード)

# エラー: MemoryError
# 解決法: 「チャンク処理 + 逐次処理(/逐次書き出し)」でメモリ常駐を避ける
#
# ポイント:
# - チャンク化しても、results のように全結果をリストに溜めるとメモリを消費する
# - 逐次返却(generator)/ 逐次書き出し(ファイル・DB等)を前提にする
import csv


def process_chunk(lines):
    """
    チャンク処理の最小例。

    - この例では入力が TSV 形式(タブ区切り)である想定。
    - 実運用では要件に応じて parse/検証/集約 等を行い、逐次的に結果を返す。
    """
    for line in lines:
        # 例: TSV -> list[str]
        yield line.split("\t")


def iter_large_file_chunked(filename, chunk_size=10_000):
    """大規模ファイルをチャンクで処理し、結果を逐次返す(メモリに溜めない)"""
    with open(filename, "r", encoding="utf-8") as f:
        chunk = []
        for line in f:
            # Windows の CRLF も吸収する
            chunk.append(line.rstrip("\r\n"))

            if len(chunk) >= chunk_size:
                # チャンクの処理(例: 解析結果を iterable として返す想定)
                yield from process_chunk(chunk)
                chunk.clear()

        # 最後のチャンク
        if chunk:
            yield from process_chunk(chunk)


# 逐次書き出し例(CSV/TSV/JSONL/Parquet等、要件に合わせて選択)
def process_large_file_to_tsv(in_path, out_path, chunk_size=10_000):
    # row は list/tuple のように「列」を表すシーケンスである想定(TSVとして書き出す)。
    with open(out_path, "w", encoding="utf-8", newline="") as out:
        writer = csv.writer(out, delimiter="\t", lineterminator="\n")
        for row in iter_large_file_chunked(in_path, chunk_size=chunk_size):
            writer.writerow(row)

並列処理のデッドロック

🧪 概念例(擬似コード)

# エラー: デッドロック発生
# 解決法: タイムアウトとリトライの実装
import concurrent.futures
import time

def safe_parallel_execution(tasks, max_workers=4, timeout=60):
    """安全な並列実行"""
    results = []
    failed_tasks = []
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # タスクを投入
        future_to_task = {
            executor.submit(task): task for task in tasks
        }
        
        # 結果を収集
        for future in concurrent.futures.as_completed(future_to_task, timeout=timeout):
            task = future_to_task[future]
            try:
                result = future.result(timeout=10)
                results.append(result)
            except concurrent.futures.TimeoutError:
                failed_tasks.append(task)
                print(f"Task timed out: {task}")
            except Exception as e:
                failed_tasks.append(task)
                print(f"Task failed: {task}, Error: {e}")
    
    # リトライ処理
    if failed_tasks:
        print(f"Retrying {len(failed_tasks)} failed tasks...")
        # 実装省略
    
    return results

数値精度の問題

🧪 概念例(擬似コード)

# エラー: 数値オーバーフロー/アンダーフロー
# 解決法: 対数空間での計算
import numpy as np

def stable_probability_computation(values):
    """数値的に安定な確率計算"""
    # 対数空間での計算
    log_values = np.log(values + 1e-300)  # ゼロ除算回避
    
    # LogSumExp トリック
    max_val = np.max(log_values)
    log_sum = max_val + np.log(np.sum(np.exp(log_values - max_val)))
    
    # 正規化された確率
    log_probs = log_values - log_sum
    probs = np.exp(log_probs)
    
    return probs