付録C: パフォーマンス最適化

最適化前のトラブルシューティング手順

パフォーマンス最適化では、先に 症状 / 原因 / 確認コマンド / 対処 / 再発防止 を分けて記録します。測定せずに高速化を始めると、I/O待ち、アルゴリズム、メモリ、ネットワーク、外部API待ちのどれが支配的か分からず、変更後の効果も説明できません。

  • 症状: 実行時間が長い、メモリ使用量が上限に近い、クラウド費用が増える、CIやsmoke testがタイムアウトする。
  • 原因: アルゴリズムの計算量、不要な全件読み込み、I/O待ち、過剰な並列化、外部サービス待ち、ログ不足。
  • 確認コマンド: /usr/bin/time -v <command>, python -m cProfile <script.py>, du -h <input>, wc -l <input> で時間、メモリ、入力規模を記録する。
  • 対処: プロファイリング結果に基づき、データ型、チャンクサイズ、I/O、並列度、キャッシュ、アルゴリズムを一つずつ変更する。
  • 再発防止: ベースライン入力、許容実行時間、最大メモリ、測定コマンド、変更前後の結果を解析ノートやCIログに残す。

プロファイリングとボトルネック特定

  • 症状: 実行時間が予想より長いが、どの関数や処理段階が遅いか説明できない。
  • 原因: 測定なしに最適化している、I/O待ちとCPU計算を分けていない、入力サイズ別の傾向を記録していない。
  • 確認コマンド: python -m cProfile <script.py>, /usr/bin/time -v <command>, ログの開始・終了時刻を使ってボトルネックを特定する。
  • 対処: 上位の関数、I/O、外部API、アルゴリズムのどれが支配的かを分け、影響が大きい箇所から変更する。
  • 再発防止: 代表入力でのベースライン時間とメモリを記録し、変更後のベンチマークをPRや解析ノートに残す。

🧪 概念例(プロファイリング手法の説明例)

import cProfile
import pstats
from line_profiler import LineProfiler

def profile_function(func):
    """関数のプロファイリング"""
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()

        result = func(*args, **kwargs)

        profiler.disable()
        stats = pstats.Stats(profiler).sort_stats('cumulative')
        stats.print_stats(10)  # 上位10個の関数を表示

        return result

    return wrapper

### 使用例
@profile_function
def slow_function():
    # 処理内容
    pass

メモリ効率的なデータ構造

  • 症状: 実行途中でメモリ上限に近づく、同じ入力でも環境により処理が落ちる、GC後も使用量が下がらない。
  • 原因: 必要以上に大きい dtype、全件読み込み、不要なコピー、疎なデータを密行列で保持している。
  • 確認コマンド: /usr/bin/time -v <command>, du -h <input>, python -c "import numpy as np; a=np.load('<input.npy>'); print(a.dtype, a.shape, np.nanmin(a), np.nanmax(a))" などで dtype、shape、欠測率、最大値・最小値を確認する。
  • 対処: dtype の縮小、カテゴリ化、疎行列、チャンク処理、不要列の削除を検討する。
  • 再発防止: 入力スキーマ、許容 dtype、メモリ上限、縮小前後の精度差を記録し、境界値テストを残す。

🧪 概念例(メモリ最適化の説明例)

### NumPy配列の型指定によるメモリ削減
import numpy as np

def optimize_memory_usage(data):
    """データ型の最適化によるメモリ削減"""
    if data.dtype == np.float64:
        # float32で十分な精度の場合
        if np.all(np.abs(data) < 1e6):
            data = data.astype(np.float32)

    elif data.dtype == np.int64:
        # より小さい整数型で表現可能な場合
        if data.min() >= 0 and data.max() <= 255:
            data = data.astype(np.uint8)
        elif data.min() >= -128 and data.max() <= 127:
            data = data.astype(np.int8)

    return data

並列化の最適化

  • 症状: ワーカーを増やしても速くならない、CPU使用率が不安定、I/O待ちやメモリ使用量だけが増える。
  • 原因: 並列化対象が小さすぎる、共有リソースで競合している、I/Oが支配的、ワーカーごとのメモリ使用量を見積もっていない。
  • 確認コマンド: top, /usr/bin/time -v <command>, ワーカー数別の実行時間・最大メモリ・失敗率を表にする。
  • 対処: ワーカー数を段階的に変え、I/O待ちならバッチサイズやキャッシュ、CPU支配ならアルゴリズムやJITの効果を分けて確認する。
  • 再発防止: 推奨ワーカー数、入力サイズ、CPU/メモリ/I/O条件、タイムアウト設定を手順書に残す。

この例は Numba / LLVM / CPU 命令セットに依存するため、検証時は Python、Numba、NumPy、OS、CPU の情報をログに残します。Numba を利用できない環境では、純粋な NumPy 実装や小さな入力での逐次実装と出力を比較して代替確認します。

⚠️ 環境依存(Numba/JIT環境依存の最適化例)

from numba import jit, prange
import numpy as np

@jit(nopython=True, parallel=True)
def parallel_computation(data):
    """Numbaによる並列化"""
    n = len(data)
    result = np.zeros(n)

    for i in prange(n):
        # 並列実行される処理
        result[i] = expensive_operation(data[i])

    return result

@jit(nopython=True)
def expensive_operation(x):
    """計算集約的な処理(Numba用)"""
    return x ** 2 + np.sin(x) * np.exp(-x)

← 前へ: トラブルシューティング 目次に戻る 次へ: セキュリティ →