付録C: パフォーマンス最適化
最適化前のトラブルシューティング手順
パフォーマンス最適化では、先に 症状 / 原因 / 確認コマンド / 対処 / 再発防止 を分けて記録します。測定せずに高速化を始めると、I/O待ち、アルゴリズム、メモリ、ネットワーク、外部API待ちのどれが支配的か分からず、変更後の効果も説明できません。
- 症状: 実行時間が長い、メモリ使用量が上限に近い、クラウド費用が増える、CIやsmoke testがタイムアウトする。
- 原因: アルゴリズムの計算量、不要な全件読み込み、I/O待ち、過剰な並列化、外部サービス待ち、ログ不足。
- 確認コマンド:
/usr/bin/time -v <command>,python -m cProfile <script.py>,du -h <input>,wc -l <input>で時間、メモリ、入力規模を記録する。 - 対処: プロファイリング結果に基づき、データ型、チャンクサイズ、I/O、並列度、キャッシュ、アルゴリズムを一つずつ変更する。
- 再発防止: ベースライン入力、許容実行時間、最大メモリ、測定コマンド、変更前後の結果を解析ノートやCIログに残す。
プロファイリングとボトルネック特定
- 症状: 実行時間が予想より長いが、どの関数や処理段階が遅いか説明できない。
- 原因: 測定なしに最適化している、I/O待ちとCPU計算を分けていない、入力サイズ別の傾向を記録していない。
- 確認コマンド:
python -m cProfile <script.py>,/usr/bin/time -v <command>, ログの開始・終了時刻を使ってボトルネックを特定する。 - 対処: 上位の関数、I/O、外部API、アルゴリズムのどれが支配的かを分け、影響が大きい箇所から変更する。
- 再発防止: 代表入力でのベースライン時間とメモリを記録し、変更後のベンチマークをPRや解析ノートに残す。
🧪 概念例(プロファイリング手法の説明例)
import cProfile
import pstats
from line_profiler import LineProfiler
def profile_function(func):
"""関数のプロファイリング"""
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
stats = pstats.Stats(profiler).sort_stats('cumulative')
stats.print_stats(10) # 上位10個の関数を表示
return result
return wrapper
### 使用例
@profile_function
def slow_function():
# 処理内容
pass
メモリ効率的なデータ構造
- 症状: 実行途中でメモリ上限に近づく、同じ入力でも環境により処理が落ちる、GC後も使用量が下がらない。
- 原因: 必要以上に大きい dtype、全件読み込み、不要なコピー、疎なデータを密行列で保持している。
- 確認コマンド:
/usr/bin/time -v <command>,du -h <input>,python -c "import numpy as np; a=np.load('<input.npy>'); print(a.dtype, a.shape, np.nanmin(a), np.nanmax(a))"などで dtype、shape、欠測率、最大値・最小値を確認する。 - 対処: dtype の縮小、カテゴリ化、疎行列、チャンク処理、不要列の削除を検討する。
- 再発防止: 入力スキーマ、許容 dtype、メモリ上限、縮小前後の精度差を記録し、境界値テストを残す。
🧪 概念例(メモリ最適化の説明例)
### NumPy配列の型指定によるメモリ削減
import numpy as np
def optimize_memory_usage(data):
"""データ型の最適化によるメモリ削減"""
if data.dtype == np.float64:
# float32で十分な精度の場合
if np.all(np.abs(data) < 1e6):
data = data.astype(np.float32)
elif data.dtype == np.int64:
# より小さい整数型で表現可能な場合
if data.min() >= 0 and data.max() <= 255:
data = data.astype(np.uint8)
elif data.min() >= -128 and data.max() <= 127:
data = data.astype(np.int8)
return data
並列化の最適化
- 症状: ワーカーを増やしても速くならない、CPU使用率が不安定、I/O待ちやメモリ使用量だけが増える。
- 原因: 並列化対象が小さすぎる、共有リソースで競合している、I/Oが支配的、ワーカーごとのメモリ使用量を見積もっていない。
- 確認コマンド:
top,/usr/bin/time -v <command>, ワーカー数別の実行時間・最大メモリ・失敗率を表にする。 - 対処: ワーカー数を段階的に変え、I/O待ちならバッチサイズやキャッシュ、CPU支配ならアルゴリズムやJITの効果を分けて確認する。
- 再発防止: 推奨ワーカー数、入力サイズ、CPU/メモリ/I/O条件、タイムアウト設定を手順書に残す。
この例は Numba / LLVM / CPU 命令セットに依存するため、検証時は Python、Numba、NumPy、OS、CPU の情報をログに残します。Numba を利用できない環境では、純粋な NumPy 実装や小さな入力での逐次実装と出力を比較して代替確認します。
⚠️ 環境依存(Numba/JIT環境依存の最適化例)
from numba import jit, prange
import numpy as np
@jit(nopython=True, parallel=True)
def parallel_computation(data):
"""Numbaによる並列化"""
n = len(data)
result = np.zeros(n)
for i in prange(n):
# 並列実行される処理
result[i] = expensive_operation(data[i])
return result
@jit(nopython=True)
def expensive_operation(x):
"""計算集約的な処理(Numba用)"""
return x ** 2 + np.sin(x) * np.exp(-x)
| ← 前へ: トラブルシューティング | 目次に戻る | 次へ: セキュリティ → |