Chapter 5-3: 拡張性設計とパフォーマンス最適化 📈
📚 目次に戻る: 📖 学習ガイド
⬅️ 前の章: Chapter 5-2: マルチテナンシーと複雑ビジネスロジック
➡️ 次の章: Chapter 6: パフォーマンス最適化
🏗️ アーキテクチャ: 独立APIサーバー(FastAPI + スケーリング・最適化)
🎯 学習レベル: 🌱 基礎 | 🚀 応用 | 💪 発展
⏱️ 推定学習時間: 5-7時間
📝 難易度: 上級(5-1,5-2完了・インフラ・運用知識必要)
—
🎯 この章で学ぶこと(初心者向け)
この章では、Chapter 5-1, 5-2で構築したSaaSプラットフォームを、「全国チェーン展開」レベルまで成長させる技術を学びます。
- 🌱 初心者: システムが重くならない仕組み(キャッシュ・最適化)がわかる
- 🚀 中級者: 大量アクセスに耐えるスケーリング戦略がわかる
- 💪 上級者: エンタープライズ級の監視・運用システムが構築できる
💡 まずは身近な例から:「全国チェーン展開する人気ラーメン店」
想像してみてください。あなたの小さなラーメン店が大人気になって、全国チェーン展開することになりました:
🍜 人気ラーメンチェーン「麺屋サプリ」の成長ストーリー
📍 Phase 1: 個人店(現在のシステム)
├── 👨🍳 店長1人:注文から配膳まで全て対応
├── 🪑 座席10席:お客さん10人まで
└── 📝 手書き注文:メモ帳で管理
📍 Phase 2: 地域チェーン(スケールアップ)
├── 👥 スタッフ5人:役割分担で効率化
├── 🪑 座席30席:より多くのお客さんに対応
└── 📱 デジタル注文:タブレットで管理
📍 Phase 3: 全国チェーン(スケールアウト)
├── 🏢 50店舗:各地に支店を展開
├── 🏭 セントラルキッチン:効率的な仕込み
├── 📊 本部システム:全店舗の一元管理
└── 🤖 自動化:注文・在庫・売上の自動化
🤔 なぜスケーリングが必要?
小さなシステムと大規模システムでは、まったく違うアプローチが必要です:
課題 | 個人店レベル | 地域チェーンレベル | 全国チェーンレベル | 解決方法 |
---|---|---|---|---|
🐌 処理速度 | 手作業でも十分 | タブレット導入 | 高速システム必須 | キャッシュ・データベース最適化 |
👥 利用者数 | 常連10人 | 地域住民100人 | 全国10万人 | ロードバランシング・サーバー増設 |
📊 データ量 | 手書きメモ | 1日100件の注文 | 1日10万件の注文 | データベース分散・アーカイブ |
🚨 障害対応 | 店長が対応 | マネージャーが対応 | 24時間監視体制 | 自動監視・アラートシステム |
🔧 メンテナンス | 閉店時に対応 | 深夜に対応 | 無停止更新 | CI/CD・ブルーグリーンデプロイ |
🎉 最適化されたシステムなら…
flowchart TD
A[🌍 全国のお客さん] --> B[🌐 ロードバランサー<br/>注文を適切に振り分け]
B --> C[🖥️ サーバー1<br/>東日本担当]
B --> D[🖥️ サーバー2<br/>西日本担当]
B --> E[🖥️ サーバー3<br/>予備サーバー]
C --> F[⚡ Redis キャッシュ<br/>よく注文される商品を高速表示]
D --> F
E --> F
F --> G[🗄️ メインデータベース<br/>全ての注文・顧客データ]
F --> H[📊 分析データベース<br/>売上・統計データ]
I[🤖 監視システム] --> C
I --> D
I --> E
I --> J[📧 障害時に自動メール通知]
メリット:
- ⚡ 高速処理: キャッシュでよく使うデータを瞬時に表示
- 🏔️ 高可用性: サーバーが落ちても他のサーバーで継続
- 📈 自動スケール: アクセス増加時に自動でサーバー追加
- 🔍 全自動監視: 問題発生時に即座に通知・対応
🔍 実際のコードを見てみよう!
📄 Step 1: 高速化の仕組み(Redis キャッシュ)
まず、「よく注文される商品を瞬時に表示する」仕組みを見てみましょう:
# src/chapter05-saas-platform/backend/app/core/cache.py(重要部分を抜粋)
import redis
import json
from datetime import timedelta
from typing import Any, Optional
from functools import wraps
class CacheManager:
"""高速キャッシュ管理システム(コンビニの冷蔵庫みたいな仕組み)"""
def __init__(self):
# Redis接続(超高速なデータ保存場所)
self.redis_client = redis.from_url(
settings.REDIS_URL,
decode_responses=False,
socket_keepalive=True, # 接続を維持して高速化
health_check_interval=30 # 30秒ごとに健康チェック
)
self.default_ttl = 3600 # 1時間でデータ自動削除
def get(self, key: str, default: Any = None) -> Any:
"""データを高速取得(冷蔵庫からドリンクを取り出す感じ)"""
try:
cached_data = self.redis_client.get(self._make_key(key))
if cached_data is None:
return default # キャッシュにない場合はデフォルト値
# データを元の形に復元
try:
return json.loads(cached_data) # JSON形式で保存されている場合
except json.JSONDecodeError:
import pickle
return pickle.loads(cached_data) # バイナリ形式の場合
except Exception as e:
print(f"キャッシュ取得エラー: {e}")
return default
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
"""データを高速保存(冷蔵庫にドリンクを入れる感じ)"""
try:
# 保存期間設定
if ttl is None:
ttl = self.default_ttl # デフォルト1時間
elif isinstance(ttl, timedelta):
ttl = int(ttl.total_seconds())
# データを保存用に変換
try:
serialized_value = json.dumps(value, default=str)
except (TypeError, ValueError):
import pickle
serialized_value = pickle.dumps(value)
# Redis に保存(期限付き)
return self.redis_client.setex(
self._make_key(key),
ttl,
serialized_value
)
except Exception as e:
print(f"キャッシュ保存エラー: {e}")
return False
def get_or_set(self, key: str, callable_obj, ttl: Optional[int] = None) -> Any:
"""キャッシュがあれば使う、なければ計算して保存"""
# まずキャッシュを確認
cached_value = self.get(key)
if cached_value is not None:
return cached_value # キャッシュヒット!高速で返す
# キャッシュになければ、時間のかかる処理を実行
fresh_value = callable_obj()
self.set(key, fresh_value, ttl) # 次回のために保存
return fresh_value
def _make_key(self, key: str) -> str:
"""キー名を統一(アプリ名を付けて他と区別)"""
return f"{settings.PROJECT_NAME}:{key}"
# グローバルキャッシュ(どこからでも使える)
cache = CacheManager()
# 魔法のデコレーター(関数の結果を自動キャッシュ)
def cached(ttl: Optional[int] = None):
"""関数の結果を自動でキャッシュするデコレーター"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 引数からキャッシュキーを生成
import hashlib
args_str = str(args) + str(sorted(kwargs.items()))
cache_key = f"{func.__name__}:{hashlib.md5(args_str.encode()).hexdigest()}"
# キャッシュがあれば使う、なければ実行して保存
return cache.get_or_set(cache_key, lambda: func(*args, **kwargs), ttl)
return wrapper
return decorator
# 使用例:重い計算を1回だけ実行
@cached(ttl=600) # 10分間キャッシュ
def get_user_dashboard_data(user_id: int):
"""ユーザーダッシュボードの重いデータ取得(最初だけ時間かかる)"""
print(f"重い計算実行中... ユーザー {user_id}")
# 重い処理(データベース検索、集計など)
# 2回目以降は瞬時に返ってくる!
time.sleep(2) # 2秒の重い処理をシミュレート
return {
"user_id": user_id,
"total_projects": 15,
"completed_tasks": 127,
"team_members": 8
}
🔰 初心者向け解説:
概念 | 何をしているか | 身近な例 |
---|---|---|
Redis |
超高速でデータを保存・取得できる場所 | コンビニの冷蔵庫(よく売れる商品を手前に) |
TTL(生存時間) |
データを自動で削除する期限 | 食材の賞味期限 |
get_or_set() |
あれば使う、なければ作って保存 | 「冷蔵庫にあれば取る、なければ作って入れる」 |
@cached デコレーター |
関数に魔法をかけて自動でキャッシュ | 「1回作ったら同じ料理は冷凍保存」 |
cache_key |
データを識別するラベル | 冷凍庫の食材に貼るラベル |
📄 Step 2: データベース最適化(大量データを賢く管理)
大量のお客さんが来ても、データベースが重くならない仕組みを見てみましょう:
# src/chapter05-saas-platform/backend/app/core/database_optimization.py(重要部分を抜粋)
from sqlalchemy import event, text
from sqlalchemy.engine import Engine
import time
import logging
class DatabaseOptimizer:
"""データベース最適化システム(図書館の司書さんみたいな仕組み)"""
def __init__(self, engine):
self.engine = engine
self._setup_query_monitoring() # クエリ監視を開始
def _setup_query_monitoring(self):
"""遅いクエリを自動で見つける仕組み"""
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
# クエリ開始時刻を記録
context._query_start_time = time.time()
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
# クエリ実行時間を計算
total = time.time() - context._query_start_time
# 1秒以上かかったクエリを警告(お客さんを待たせるレベル)
if total > 1.0:
logging.warning(
f"🐌 遅いクエリ発見!実行時間: {total:.2f}秒\n"
f"SQL: {statement}\n"
f"パラメータ: {parameters}"
)
def create_connection_pool(self):
"""コネクションプール設定(窓口カウンターの効率化)"""
# データベース接続を使い回す仕組み(窓口の人数を調整)
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
"postgresql://user:pass@localhost/db",
# プール設定(銀行窓口の人数管理みたいな感じ)
poolclass=QueuePool,
pool_size=10, # 常時待機する窓口数
max_overflow=20, # 混雑時に増やせる窓口数
pool_timeout=30, # 窓口待ちの最大時間(秒)
pool_recycle=3600, # 1時間で窓口担当者を交代
# 接続オプション
echo=False, # SQLログ出力(開発時はTrue)
future=True # 新しいAPIを使用
)
return engine
def setup_database_partitioning(self):
"""テーブル分割設定(大きな本棚を分割管理)"""
# 日付ベースでテーブル分割(月ごとの帳簿みたいに分ける)
partition_sql = """
-- タスクテーブルを月別に分割
CREATE TABLE tasks_y2024m01 PARTITION OF tasks
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE tasks_y2024m02 PARTITION OF tasks
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- 古いパーティションの自動削除(古い帳簿の廃棄)
CREATE OR REPLACE FUNCTION cleanup_old_partitions()
RETURNS void AS $$
DECLARE
partition_name text;
BEGIN
-- 1年以上古いパーティションを削除
FOR partition_name IN
SELECT schemaname||'.'||tablename
FROM pg_tables
WHERE tablename LIKE 'tasks_y%'
AND tablename < 'tasks_y' || to_char(CURRENT_DATE - INTERVAL '1 year', 'YYYY"m"MM')
LOOP
EXECUTE 'DROP TABLE IF EXISTS ' || partition_name;
RAISE NOTICE 'Dropped old partition: %', partition_name;
END LOOP;
END;
$$ LANGUAGE plpgsql;
"""
with self.engine.connect() as conn:
conn.execute(text(partition_sql))
conn.commit()
🔰 初心者向け解説:
概念 | 何をしているか | 身近な例 |
---|---|---|
@event.listens_for |
データベース操作を自動で監視 | 図書館で「本を借りる時間」を自動計測 |
コネクションプール |
データベース接続を使い回し | 銀行窓口の人数を効率的に管理 |
pool_size=10 |
常時待機する接続数 | 平常時の窓口担当者10人 |
max_overflow=20 |
混雑時に増やせる接続数 | 忙しい時だけ追加で20人投入 |
テーブル分割 |
大きなテーブルを月別に分ける | 年鑑を月別に分けて保管 |
📄 Step 3: ロードバランサー実装(お客さんの案内係)
複数のサーバーに上手にお客さんを振り分ける仕組みを見てみましょう:
# src/chapter05-saas-platform/backend/app/core/load_balancer.py(実装例)
import random
import time
from typing import List, Dict, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import asyncio
import aiohttp
@dataclass
class ServerNode:
"""サーバーノード情報(レストランの各店舗)"""
id: str
host: str
port: int
weight: int = 1 # 処理能力の重み(大きな店舗は重み大)
is_healthy: bool = True # サーバーの健康状態
current_connections: int = 0 # 現在の接続数
last_health_check: float = field(default_factory=time.time)
response_time: float = 0.0 # 平均応答時間
class LoadBalancingStrategy(Enum):
"""負荷分散方式"""
ROUND_ROBIN = "round_robin" # 順番に案内
WEIGHTED_ROUND_ROBIN = "weighted" # 能力に応じて案内
LEAST_CONNECTIONS = "least_conn" # 空いてる店舗優先
RANDOM = "random" # ランダム案内
class LoadBalancer:
"""ロードバランサー(チェーン店の案内係)"""
def __init__(self, strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN):
self.servers: List[ServerNode] = []
self.strategy = strategy
self.current_index = 0 # Round Robin用のカウンター
self.health_check_interval = 30 # 30秒ごとに健康診断
def add_server(self, server: ServerNode):
"""サーバーを追加(新店舗オープン)"""
self.servers.append(server)
print(f"🏪 新店舗追加: {server.host}:{server.port} (重み: {server.weight})")
def get_next_server(self) -> Optional[ServerNode]:
"""次のサーバーを選択(お客さんをどの店舗に案内するか)"""
# 健康なサーバーだけフィルタリング
healthy_servers = [s for s in self.servers if s.is_healthy]
if not healthy_servers:
print("⚠️ 利用可能な店舗がありません!")
return None
if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
return self._round_robin_select(healthy_servers)
elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
return self._weighted_round_robin_select(healthy_servers)
elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
return self._least_connections_select(healthy_servers)
else: # RANDOM
return random.choice(healthy_servers)
def _round_robin_select(self, servers: List[ServerNode]) -> ServerNode:
"""順番選択(順番に案内)"""
if not servers:
return None
# 現在のインデックスを更新
server = servers[self.current_index % len(servers)]
self.current_index += 1
print(f"🎯 Round Robin: {server.host}:{server.port} を選択")
return server
def _weighted_round_robin_select(self, servers: List[ServerNode]) -> ServerNode:
"""重み付き選択(大きな店舗により多くのお客さんを案内)"""
# 重みの合計を計算
total_weight = sum(server.weight for server in servers)
# ランダムな数値を重みで選択
random_value = random.randint(1, total_weight)
current_weight = 0
for server in servers:
current_weight += server.weight
if random_value <= current_weight:
print(f"🎯 重み付き選択: {server.host}:{server.port} (重み: {server.weight})")
return server
# フォールバック
return servers[0]
def _least_connections_select(self, servers: List[ServerNode]) -> ServerNode:
"""最小接続選択(一番空いてる店舗に案内)"""
# 接続数が最小のサーバーを選択
least_busy_server = min(servers, key=lambda s: s.current_connections)
print(f"🎯 最小接続: {least_busy_server.host}:{least_busy_server.port} "
f"(接続数: {least_busy_server.current_connections})")
return least_busy_server
async def health_check_loop(self):
"""定期健康診断(各店舗の状況チェック)"""
while True:
print("🏥 サーバー健康診断を開始...")
# 各サーバーの健康状態をチェック
for server in self.servers:
is_healthy = await self._check_server_health(server)
if server.is_healthy != is_healthy:
# 状態が変わった場合の通知
status = "回復" if is_healthy else "ダウン"
print(f"📢 {server.host}:{server.port} が{status}しました")
server.is_healthy = is_healthy
server.last_health_check = time.time()
# 健康な店舗数をレポート
healthy_count = sum(1 for s in self.servers if s.is_healthy)
print(f"📊 健康な店舗: {healthy_count}/{len(self.servers)}")
# 次の健康診断まで待機
await asyncio.sleep(self.health_check_interval)
async def _check_server_health(self, server: ServerNode) -> bool:
"""個別サーバー健康診断(店舗の営業確認)"""
try:
start_time = time.time()
# HTTPでサーバーの生存確認
async with aiohttp.ClientSession() as session:
async with session.get(
f"http://{server.host}:{server.port}/health",
timeout=aiohttp.ClientTimeout(total=5)
) as response:
# 応答時間を記録
server.response_time = time.time() - start_time
# HTTPステータスが200番台なら健康
is_healthy = 200 <= response.status < 300
if is_healthy:
print(f"✅ {server.host}:{server.port} は健康です (応答時間: {server.response_time:.2f}秒)")
else:
print(f"❌ {server.host}:{server.port} エラー応答: {response.status}")
return is_healthy
except asyncio.TimeoutError:
print(f"⏰ {server.host}:{server.port} タイムアウト")
return False
except Exception as e:
print(f"💥 {server.host}:{server.port} 接続エラー: {e}")
return False
def get_load_balancer_stats(self) -> Dict[str, Any]:
"""ロードバランサーの統計情報"""
total_servers = len(self.servers)
healthy_servers = sum(1 for s in self.servers if s.is_healthy)
total_connections = sum(s.current_connections for s in self.servers)
avg_response_time = sum(s.response_time for s in self.servers) / total_servers if total_servers > 0 else 0
return {
"strategy": self.strategy.value,
"total_servers": total_servers,
"healthy_servers": healthy_servers,
"total_connections": total_connections,
"average_response_time": avg_response_time,
"servers": [
{
"id": s.id,
"host": f"{s.host}:{s.port}",
"weight": s.weight,
"is_healthy": s.is_healthy,
"connections": s.current_connections,
"response_time": s.response_time
}
for s in self.servers
]
}
# 実際の使用例
async def setup_load_balancer():
"""ロードバランサーセットアップ例"""
# ロードバランサーを作成
lb = LoadBalancer(LoadBalancingStrategy.LEAST_CONNECTIONS)
# サーバーを追加(チェーン店舗追加)
lb.add_server(ServerNode("app1", "192.168.1.10", 8000, weight=1))
lb.add_server(ServerNode("app2", "192.168.1.11", 8000, weight=2)) # 高性能サーバー
lb.add_server(ServerNode("app3", "192.168.1.12", 8000, weight=1))
# 健康診断を開始
health_task = asyncio.create_task(lb.health_check_loop())
return lb, health_task
🔰 初心者向け解説:
概念 | 何をしているか | 身近な例 |
---|---|---|
ロードバランサー |
お客さんを効率よく店舗に案内 | ファストフード店の案内係 |
Round Robin |
順番に案内する方式 | 「1番窓口、次は2番窓口」の順番案内 |
重み付き選択 |
大きな店舗により多く案内 | 「大型店には多く、小型店には少なく」 |
最小接続 |
一番空いてる店舗に案内 | 「今一番空いてるレジに並んでください」 |
ヘルスチェック |
店舗が営業中か定期確認 | 30秒ごとに「営業してますか?」と確認 |
🔍 監視・運用システム(24時間見守るシステム)
📄 Step 4: システム監視とメトリクス収集
全国チェーンになったら、各店舗の状況をリアルタイムで監視する「本部の監視センター」が必要になります:
# src/chapter05-saas-platform/backend/app/core/monitoring.py(監視システム)
import psutil
import time
from typing import Dict, Any, List
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio
import json
@dataclass
class SystemMetrics:
"""システム指標(店舗の状況報告書)"""
timestamp: datetime
cpu_percent: float # CPU使用率(コックさんの忙しさ)
memory_percent: float # メモリ使用率(冷蔵庫の満杯度)
disk_percent: float # ディスク使用率(倉庫の満杯度)
network_io: Dict[str, int] # ネットワークIO(お客さんの出入り)
active_connections: int # アクティブ接続数(店内のお客さん数)
response_time: float # 平均応答時間(注文から提供までの時間)
class MetricsCollector:
"""メトリクス収集システム(本部の情報収集部門)"""
def __init__(self):
self.metrics_history: List[SystemMetrics] = []
self.collection_interval = 10 # 10秒ごとに収集
self.retention_hours = 24 # 24時間分保持
async def start_collection(self):
"""メトリクス収集開始(定期的な店舗巡回)"""
print("📊 システム監視を開始します...")
while True:
try:
# 現在のシステム状況を収集
metrics = self._collect_current_metrics()
# 履歴に追加
self.metrics_history.append(metrics)
# 古いデータを削除(24時間以上前)
self._cleanup_old_metrics()
# 異常検知
await self._detect_anomalies(metrics)
# 次の収集まで待機
await asyncio.sleep(self.collection_interval)
except Exception as e:
print(f"❌ メトリクス収集エラー: {e}")
await asyncio.sleep(5) # エラー時は少し待つ
def _collect_current_metrics(self) -> SystemMetrics:
"""現在のシステム指標を収集(店舗の現状調査)"""
# CPU使用率(コックさんがどれだけ忙しいか)
cpu_percent = psutil.cpu_percent(interval=1)
# メモリ使用率(冷蔵庫の満杯度)
memory = psutil.virtual_memory()
memory_percent = memory.percent
# ディスク使用率(倉庫の満杯度)
disk = psutil.disk_usage('/')
disk_percent = (disk.used / disk.total) * 100
# ネットワークIO(お客さんの出入り状況)
network = psutil.net_io_counters()
network_io = {
"bytes_sent": network.bytes_sent,
"bytes_recv": network.bytes_recv,
"packets_sent": network.packets_sent,
"packets_recv": network.packets_recv
}
# アクティブ接続数(店内のお客さん数)
active_connections = len(psutil.net_connections(kind='inet'))
# 応答時間は別途測定(簡略化)
response_time = self._measure_response_time()
metrics = SystemMetrics(
timestamp=datetime.utcnow(),
cpu_percent=cpu_percent,
memory_percent=memory_percent,
disk_percent=disk_percent,
network_io=network_io,
active_connections=active_connections,
response_time=response_time
)
# 収集した情報をログ出力
print(f"📈 {metrics.timestamp.strftime('%H:%M:%S')}: "
f"CPU:{cpu_percent:.1f}% "
f"MEM:{memory_percent:.1f}% "
f"接続:{active_connections}件")
return metrics
def _measure_response_time(self) -> float:
"""応答時間測定(注文から提供までの時間)"""
# 実際の実装では、アプリケーションレベルで測定
# ここでは簡略化
return 0.1 # 100ms
async def _detect_anomalies(self, metrics: SystemMetrics):
"""異常検知(問題の早期発見)"""
warnings = []
# CPU使用率が80%超過(コックさんが疲れ気味)
if metrics.cpu_percent > 80:
warnings.append(f"⚠️ CPU使用率が高いです: {metrics.cpu_percent:.1f}%")
# メモリ使用率が85%超過(冷蔵庫がほぼ満杯)
if metrics.memory_percent > 85:
warnings.append(f"⚠️ メモリ使用率が高いです: {metrics.memory_percent:.1f}%")
# ディスク使用率が90%超過(倉庫がほぼ満杯)
if metrics.disk_percent > 90:
warnings.append(f"⚠️ ディスク使用率が高いです: {metrics.disk_percent:.1f}%")
# 応答時間が1秒超過(お客さんを待たせすぎ)
if metrics.response_time > 1.0:
warnings.append(f"⚠️ 応答時間が遅いです: {metrics.response_time:.2f}秒")
# 警告がある場合はアラート送信
if warnings:
await self._send_alert(warnings)
async def _send_alert(self, warnings: List[str]):
"""アラート送信(緊急事態の通知)"""
alert_message = "🚨 システム警告:\n" + "\n".join(warnings)
print(alert_message)
# 実際の実装では以下のような通知を送信:
# - Slack通知
# - メール通知
# - PagerDuty等のインシデント管理システム
# - SMS通知(重要な障害の場合)
# 例: Slack通知(実装例)
# await self._send_slack_notification(alert_message)
def _cleanup_old_metrics(self):
"""古いメトリクスデータの削除(古い資料の廃棄)"""
cutoff_time = datetime.utcnow() - timedelta(hours=self.retention_hours)
# 24時間より古いデータを削除
self.metrics_history = [
m for m in self.metrics_history
if m.timestamp > cutoff_time
]
def get_metrics_summary(self, hours: int = 1) -> Dict[str, Any]:
"""指定時間のメトリクス集計(期間レポート)"""
since_time = datetime.utcnow() - timedelta(hours=hours)
recent_metrics = [
m for m in self.metrics_history
if m.timestamp > since_time
]
if not recent_metrics:
return {"error": "No metrics available"}
# 平均値、最大値、最小値を計算
cpu_values = [m.cpu_percent for m in recent_metrics]
memory_values = [m.memory_percent for m in recent_metrics]
response_times = [m.response_time for m in recent_metrics]
return {
"period_hours": hours,
"data_points": len(recent_metrics),
"cpu": {
"avg": sum(cpu_values) / len(cpu_values),
"max": max(cpu_values),
"min": min(cpu_values)
},
"memory": {
"avg": sum(memory_values) / len(memory_values),
"max": max(memory_values),
"min": min(memory_values)
},
"response_time": {
"avg": sum(response_times) / len(response_times),
"max": max(response_times),
"min": min(response_times)
},
"peak_connections": max(m.active_connections for m in recent_metrics)
}
class AlertManager:
"""アラート管理システム(緊急事態管理部門)"""
def __init__(self):
self.alert_rules = {
"cpu_critical": {"threshold": 90, "severity": "critical"},
"memory_critical": {"threshold": 95, "severity": "critical"},
"disk_critical": {"threshold": 95, "severity": "critical"},
"response_time_warning": {"threshold": 2.0, "severity": "warning"}
}
self.alert_history = []
async def process_metrics(self, metrics: SystemMetrics):
"""メトリクスからアラートをチェック"""
alerts = []
# CPU使用率チェック
if metrics.cpu_percent >= self.alert_rules["cpu_critical"]["threshold"]:
alerts.append({
"rule": "cpu_critical",
"message": f"CPU使用率が危険レベル: {metrics.cpu_percent:.1f}%",
"severity": "critical",
"timestamp": metrics.timestamp
})
# メモリ使用率チェック
if metrics.memory_percent >= self.alert_rules["memory_critical"]["threshold"]:
alerts.append({
"rule": "memory_critical",
"message": f"メモリ使用率が危険レベル: {metrics.memory_percent:.1f}%",
"severity": "critical",
"timestamp": metrics.timestamp
})
# 各アラートを処理
for alert in alerts:
await self._handle_alert(alert)
async def _handle_alert(self, alert: Dict[str, Any]):
"""個別アラート処理"""
# アラート履歴に記録
self.alert_history.append(alert)
# 重要度に応じた処理
if alert["severity"] == "critical":
print(f"🚨 【緊急】{alert['message']}")
# 即座に管理者に通知
await self._send_critical_notification(alert)
else:
print(f"⚠️ 【警告】{alert['message']}")
# 通常の警告通知
await self._send_warning_notification(alert)
async def _send_critical_notification(self, alert):
"""緊急通知送信"""
# SMS、電話、Slack即座通知など
pass
async def _send_warning_notification(self, alert):
"""警告通知送信"""
# メール、Slack通知など
pass
# 実際の使用例
async def setup_monitoring():
"""監視システムセットアップ"""
collector = MetricsCollector()
alert_manager = AlertManager()
# 監視開始
monitor_task = asyncio.create_task(collector.start_collection())
return collector, alert_manager, monitor_task
🔰 初心者向け解説:
概念 | 何をしているか | 身近な例 |
---|---|---|
psutil |
システムの状況を調べるツール | 体温計や血圧計(システムの健康診断) |
CPU使用率 |
コンピューターがどれだけ忙しいか | コックさんがどれだけ忙しく働いているか |
メモリ使用率 |
作業用メモリがどれだけ使われているか | 冷蔵庫がどれだけ満杯か |
アクティブ接続数 |
同時に利用している人数 | 店内にいるお客さんの数 |
異常検知 |
いつもと違う状況を自動で見つける | 「お客さんが多すぎて待ち時間が長い」を自動発見 |
📄 Step 5: アラートシステムとログ管理
問題が起きた時に、すぐに管理者に通知する仕組みを作りましょう:
# src/chapter05-saas-platform/backend/app/core/alerting.py(アラートシステム)
import asyncio
import aiohttp
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
import json
class AlertSeverity(Enum):
"""アラートの重要度(緊急度ランク)"""
INFO = "info" # 情報(お知らせ程度)
WARNING = "warning" # 警告(注意が必要)
CRITICAL = "critical" # 緊急(すぐ対応必要)
FATAL = "fatal" # 致命的(システム停止レベル)
@dataclass
class Alert:
"""アラート情報(緊急事態報告書)"""
id: str
title: str
message: str
severity: AlertSeverity
timestamp: datetime
source: str # どのシステムからの警告か
metadata: Dict[str, Any] = None
class NotificationChannel:
"""通知チャンネル(連絡方法)"""
async def send_slack_notification(self, alert: Alert, webhook_url: str):
"""Slack通知(チャット通知)"""
# 重要度に応じて色を変更
color_map = {
AlertSeverity.INFO: "#36a64f", # 緑
AlertSeverity.WARNING: "#ff9500", # オレンジ
AlertSeverity.CRITICAL: "#ff0000", # 赤
AlertSeverity.FATAL: "#8B0000" # 暗赤
}
# 重要度に応じて絵文字を変更
emoji_map = {
AlertSeverity.INFO: "ℹ️",
AlertSeverity.WARNING: "⚠️",
AlertSeverity.CRITICAL: "🚨",
AlertSeverity.FATAL: "💀"
}
payload = {
"attachments": [
{
"color": color_map[alert.severity],
"title": f"{emoji_map[alert.severity]} {alert.title}",
"text": alert.message,
"fields": [
{
"title": "重要度",
"value": alert.severity.value.upper(),
"short": True
},
{
"title": "発生時刻",
"value": alert.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
"short": True
},
{
"title": "発生源",
"value": alert.source,
"short": True
}
],
"footer": "SaaS Platform Monitoring",
"ts": alert.timestamp.timestamp()
}
]
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload) as response:
if response.status == 200:
print(f"✅ Slack通知送信成功: {alert.title}")
else:
print(f"❌ Slack通知送信失敗: {response.status}")
except Exception as e:
print(f"💥 Slack通知エラー: {e}")
async def send_email_notification(self, alert: Alert, email_config: Dict[str, str]):
"""メール通知(メール通知)"""
try:
# メール作成
msg = MIMEMultipart()
msg['From'] = email_config['from_email']
msg['To'] = email_config['to_email']
msg['Subject'] = f"[{alert.severity.value.upper()}] {alert.title}"
# HTML本文作成
html_body = f"""
<html>
<body>
<h2 style="color: {'red' if alert.severity in [AlertSeverity.CRITICAL, AlertSeverity.FATAL] else 'orange'};">
システム警告
</h2>
<table style="border-collapse: collapse; width: 100%;">
<tr>
<td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">タイトル</td>
<td style="border: 1px solid #ddd; padding: 8px;">{alert.title}</td>
</tr>
<tr>
<td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">重要度</td>
<td style="border: 1px solid #ddd; padding: 8px;">{alert.severity.value.upper()}</td>
</tr>
<tr>
<td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">発生時刻</td>
<td style="border: 1px solid #ddd; padding: 8px;">{alert.timestamp.strftime("%Y-%m-%d %H:%M:%S")}</td>
</tr>
<tr>
<td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">発生源</td>
<td style="border: 1px solid #ddd; padding: 8px;">{alert.source}</td>
</tr>
</table>
<h3>詳細メッセージ</h3>
<p>{alert.message}</p>
<hr>
<p><small>このメールは自動送信されました。</small></p>
</body>
</html>
"""
msg.attach(MIMEText(html_body, 'html'))
# SMTP送信
server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
server.starttls()
server.login(email_config['username'], email_config['password'])
server.send_message(msg)
server.quit()
print(f"✅ メール通知送信成功: {alert.title}")
except Exception as e:
print(f"💥 メール通知エラー: {e}")
class LogManager:
"""ログ管理システム(お店の日報管理)"""
def __init__(self, log_level: str = "INFO"):
self.log_level = log_level
self.log_buffer = [] # メモリ内ログバッファ
def log(self, level: str, message: str, extra_data: Dict[str, Any] = None):
"""ログ記録(日報への記録)"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": level,
"message": message,
"extra_data": extra_data or {}
}
# メモリバッファに追加
self.log_buffer.append(log_entry)
# コンソール出力(開発時)
timestamp_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp_str}] {level}: {message}")
# ファイルに書き込み(実際の実装では)
# self._write_to_file(log_entry)
# 長期保存用にデータベースに保存(実際の実装では)
# await self._save_to_database(log_entry)
def info(self, message: str, **kwargs):
"""情報ログ"""
self.log("INFO", message, kwargs)
def warning(self, message: str, **kwargs):
"""警告ログ"""
self.log("WARNING", message, kwargs)
def error(self, message: str, **kwargs):
"""エラーログ"""
self.log("ERROR", message, kwargs)
def critical(self, message: str, **kwargs):
"""緊急ログ"""
self.log("CRITICAL", message, kwargs)
def get_recent_logs(self, hours: int = 1, level: Optional[str] = None) -> List[Dict[str, Any]]:
"""最近のログ取得(期間指定で日報確認)"""
since_time = datetime.utcnow() - timedelta(hours=hours)
filtered_logs = []
for log_entry in self.log_buffer:
log_time = datetime.fromisoformat(log_entry["timestamp"])
# 時間範囲チェック
if log_time < since_time:
continue
# レベルフィルター
if level and log_entry["level"] != level:
continue
filtered_logs.append(log_entry)
return filtered_logs
class AlertingSystem:
"""統合アラートシステム(緊急事態対応本部)"""
def __init__(self):
self.notification_channel = NotificationChannel()
self.log_manager = LogManager()
self.alert_history = []
self.suppression_rules = {} # アラート抑制ルール
async def send_alert(
self,
title: str,
message: str,
severity: AlertSeverity,
source: str,
slack_webhook: Optional[str] = None,
email_config: Optional[Dict[str, str]] = None
):
"""アラート送信(緊急事態の発報)"""
# アラート作成
alert = Alert(
id=f"{source}_{datetime.utcnow().timestamp()}",
title=title,
message=message,
severity=severity,
timestamp=datetime.utcnow(),
source=source
)
# アラート抑制チェック(同じ警告の連発を防ぐ)
if self._should_suppress_alert(alert):
self.log_manager.info(f"アラート抑制: {alert.title}")
return
# アラート履歴に記録
self.alert_history.append(alert)
# ログに記録
self.log_manager.log(
level=severity.value.upper(),
message=f"アラート発生: {title}",
extra_data={"alert_id": alert.id, "source": source}
)
# 通知送信
notifications = []
if slack_webhook:
notifications.append(
self.notification_channel.send_slack_notification(alert, slack_webhook)
)
if email_config:
notifications.append(
self.notification_channel.send_email_notification(alert, email_config)
)
# 複数通知を並行実行
if notifications:
await asyncio.gather(*notifications, return_exceptions=True)
def _should_suppress_alert(self, alert: Alert) -> bool:
"""アラート抑制判定(同じ警告の連発防止)"""
# 過去5分間に同じタイプのアラートがあったら抑制
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
for past_alert in self.alert_history:
if (past_alert.timestamp > five_minutes_ago and
past_alert.title == alert.title and
past_alert.source == alert.source):
return True
return False
def get_alert_summary(self, hours: int = 24) -> Dict[str, Any]:
"""アラート集計(期間レポート)"""
since_time = datetime.utcnow() - timedelta(hours=hours)
recent_alerts = [
alert for alert in self.alert_history
if alert.timestamp > since_time
]
# 重要度別集計
severity_counts = {}
for severity in AlertSeverity:
severity_counts[severity.value] = len([
a for a in recent_alerts
if a.severity == severity
])
# 発生源別集計
source_counts = {}
for alert in recent_alerts:
source_counts[alert.source] = source_counts.get(alert.source, 0) + 1
return {
"period_hours": hours,
"total_alerts": len(recent_alerts),
"severity_breakdown": severity_counts,
"source_breakdown": source_counts,
"recent_alerts": [
{
"title": alert.title,
"severity": alert.severity.value,
"timestamp": alert.timestamp.isoformat(),
"source": alert.source
}
for alert in recent_alerts[-10:] # 最新10件
]
}
# 実際の使用例
async def setup_alerting():
"""アラートシステムセットアップ"""
alerting = AlertingSystem()
# テスト警告送信
await alerting.send_alert(
title="CPU使用率が高いです",
message="CPU使用率が85%を超えました。システムの負荷を確認してください。",
severity=AlertSeverity.WARNING,
source="monitoring_system",
slack_webhook="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
)
return alerting
🔰 初心者向け解説:
概念 | 何をしているか | 身近な例 |
---|---|---|
AlertSeverity |
緊急度のランク分け | 「お知らせ・注意・警報・緊急事態」の段階 |
Slack通知 |
チャットアプリへの即座通知 | LINE で「お店が忙しいです!」と連絡 |
アラート抑制 |
同じ警告の連発を防ぐ | 「5分以内の同じ警告は1回だけ」ルール |
ログ管理 |
起こったことの記録保存 | お店の日報(何時に何が起きたか記録) |
HTML メール |
見やすい形式のメール通知 | カラフルで読みやすい緊急連絡メール |
📄 Step 6: CI/CD パイプライン(自動デプロイシステム)
コードを更新したら自動でテスト・デプロイする仕組みを作りましょう:
# .github/workflows/deploy.yml(GitHub Actions設定)
name: SaaS Platform CI/CD Pipeline
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
env:
PYTHON_VERSION: 3.11
NODE_VERSION: 18
REGISTRY: ghcr.io
IMAGE_NAME: saas-platform
jobs:
# 🧪 Step 1: テスト実行(品質チェック)
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: test_db
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
redis:
image: redis:7
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
- name: 📥 コードチェックアウト
uses: actions/checkout@v3
- name: 🐍 Python環境セットアップ
uses: actions/setup-python@v4
with:
python-version: $
cache: 'pip'
- name: 📦 依存関係インストール
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: 🔍 コード品質チェック(リンター)
run: |
# フォーマットチェック
black --check .
# インポート整理チェック
isort --check-only .
# 静的解析
flake8 .
# 型チェック
mypy .
- name: 🧪 単体テスト実行
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
REDIS_URL: redis://localhost:6379/0
run: |
pytest tests/unit/ -v --cov=app --cov-report=xml
- name: 🔧 統合テスト実行
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
REDIS_URL: redis://localhost:6379/0
run: |
pytest tests/integration/ -v
- name: 🏃♂️ API テスト実行
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
REDIS_URL: redis://localhost:6379/0
run: |
# アプリケーション起動
uvicorn app.main:app --host 0.0.0.0 --port 8000 &
sleep 10
# API テスト実行
pytest tests/api/ -v
- name: 📊 テストカバレッジアップロード
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
fail_ci_if_error: true
# 🔨 Step 2: Docker イメージビルド
build:
needs: test
runs-on: ubuntu-latest
if: github.event_name == 'push'
outputs:
image-tag: $
image-digest: $
steps:
- name: 📥 コードチェックアウト
uses: actions/checkout@v3
- name: 🔧 Docker Buildx セットアップ
uses: docker/setup-buildx-action@v2
- name: 🔐 Container Registry ログイン
uses: docker/login-action@v2
with:
registry: $
username: $
password: $
- name: 🏷️ メタデータ生成
id: meta
uses: docker/metadata-action@v4
with:
images: $/$/$
tags: |
type=ref,event=branch
type=ref,event=pr
type=sha,prefix=-
type=raw,value=latest,enable=
- name: 🔨 Docker イメージビルド・プッシュ
id: build
uses: docker/build-push-action@v4
with:
context: .
file: ./Dockerfile
push: true
tags: $
labels: $
cache-from: type=gha
cache-to: type=gha,mode=max
# 🚀 Step 3: 開発環境デプロイ
deploy-dev:
needs: [test, build]
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/develop'
environment: development
steps:
- name: 🚀 開発環境デプロイ
run: |
echo "🌱 開発環境にデプロイ中..."
# Kubernetesにデプロイ(例)
# kubectl set image deployment/saas-platform \
# app=$ \
# --namespace=development
# ヘルスチェック
# kubectl rollout status deployment/saas-platform --namespace=development
echo "✅ 開発環境デプロイ完了!"
# 🌍 Step 4: 本番環境デプロイ
deploy-prod:
needs: [test, build]
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
environment: production
steps:
- name: 🔍 本番前チェック
run: |
echo "🔍 本番環境デプロイ前チェック..."
# データベースマイグレーションのドライラン
# alembic upgrade head --sql
# 設定ファイルの妥当性チェック
# python -c "from app.core.config import settings; print('設定OK')"
echo "✅ 事前チェック完了"
- name: 🚀 本番環境デプロイ
run: |
echo "🌍 本番環境にデプロイ中..."
# ブルーグリーンデプロイ(例)
# kubectl apply -f k8s/production/
# kubectl set image deployment/saas-platform \
# app=$ \
# --namespace=production
# ローリングアップデート完了待機
# kubectl rollout status deployment/saas-platform --namespace=production
echo "✅ 本番環境デプロイ完了!"
- name: 🧪 本番環境ヘルスチェック
run: |
echo "🏥 本番環境ヘルスチェック..."
# API ヘルスチェック
# curl -f https://api.yourapp.com/health || exit 1
# データベース接続チェック
# python scripts/health_check.py
echo "✅ 本番環境正常動作確認"
- name: 📢 デプロイ通知
if: always()
run: |
# Slack通知(実際の実装では)
# curl -X POST -H 'Content-type: application/json' \
# --data '{"text":"🚀 本番環境デプロイ完了"}' \
# $
echo "📢 関係者に通知送信完了"
# 🔄 Step 5: データベースマイグレーション
migrate:
needs: deploy-prod
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
environment: production
steps:
- name: 📥 コードチェックアウト
uses: actions/checkout@v3
- name: 🐍 Python環境セットアップ
uses: actions/setup-python@v4
with:
python-version: $
- name: 📦 依存関係インストール
run: |
pip install alembic psycopg2-binary
- name: 🗄️ データベースマイグレーション実行
env:
DATABASE_URL: $
run: |
echo "🗄️ データベースマイグレーション実行..."
# マイグレーション実行
alembic upgrade head
echo "✅ マイグレーション完了"
🔰 初心者向け解説:
概念 | 何をしているか | 身近な例 |
---|---|---|
CI/CD |
コード更新→テスト→デプロイを自動化 | 料理のレシピ通りに自動で調理する機械 |
GitHub Actions |
GitHubでのワークフロー自動実行 | 「コードが更新されたら自動でテスト開始」 |
pytest |
Pythonのテスト実行ツール | 料理が美味しくできたかの味見 |
Docker |
アプリを箱詰めして動かす技術 | 「どこでも同じ環境で動く」お弁当箱 |
ブルーグリーンデプロイ |
2つの環境を使って安全にデプロイ | 新店舗を準備してからお客さんを案内 |
まとめ
Chapter 5-3では、大規模なSaaSプラットフォームを支えるスケーリング戦略とパフォーマンス最適化を実装しました。
実装した機能:
- Redis による高速キャッシングシステム
- データベース最適化とコネクションプール管理
- ロードバランサーによる負荷分散
- リアルタイム監視とメトリクス収集
- アラートシステムとログ管理
- CI/CD パイプラインによる自動デプロイ
技術的な学習ポイント:
- 高トラフィック対応のアーキテクチャ設計
- 監視・運用システムの構築
- 自動化による運用効率化
- インフラレベルでの可用性確保
次章予告: Chapter 6では、セキュリティとコンプライアンス対応を深掘りします。
def get_connection_pool_status(self):
"""データベース接続の使用状況確認(駐車場の空き状況みたいな)"""
pool = self.engine.pool
return {
"total_size": pool.size(), # 総駐車台数
"checked_in": pool.checkedin(), # 空いている台数
"checked_out": pool.checkedout(), # 使用中の台数
"overflow": pool.overflow(), # 臨時駐車台数
"status": f"{pool.checkedout()}/{pool.size() + pool.overflow()}" # 使用率
}
def analyze_table_sizes(self):
"""どのテーブルが重いかを調査(図書館のどの棚が重いか確認)"""
with self.engine.connect() as conn:
# テーブルサイズ情報を取得
result = conn.execute(text("""
SELECT
tablename as table_name,
pg_size_pretty(pg_total_relation_size('public.'||tablename)) as size,
pg_total_relation_size('public.'||tablename) as size_bytes
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size('public.'||tablename) DESC
LIMIT 10;
"""))
table_sizes = []
for row in result:
table_sizes.append({
'table_name': row.table_name,
'readable_size': row.size, # "125 MB" のような読みやすい形式
'bytes': row.size_bytes # 正確なバイト数
})
return table_sizes
def find_unused_indexes(self):
"""使われていないインデックスを発見(使わない本棚を見つける)"""
with self.engine.connect() as conn:
result = conn.execute(text("""
SELECT
schemaname,
tablename,
indexname,
pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
WHERE idx_scan = 0 -- 1回も使われていないインデックス
AND schemaname = 'public'
ORDER BY pg_relation_size(indexrelid) DESC;
"""))
unused_indexes = []
for row in result:
unused_indexes.append({
'table': row.tablename,
'index': row.indexname,
'wasted_size': row.size
})
return unused_indexes
def create_monthly_partition(self, table_name: str, year: int, month: int):
"""月次パーティション作成(月ごとにファイルを分ける整理術)"""
partition_name = f"{table_name}_y{year}m{month:02d}"
start_date = f"{year}-{month:02d}-01"
# 次月の1日を終了日とする
if month == 12:
end_date = f"{year + 1}-01-01"
else:
end_date = f"{year}-{month + 1:02d}-01"
create_sql = f"""
-- {year}年{month}月のデータ専用テーブルを作成
CREATE TABLE IF NOT EXISTS {partition_name}
PARTITION OF {table_name}
FOR VALUES FROM ('{start_date}') TO ('{end_date}');
-- 高速検索用のインデックスを作成
CREATE INDEX IF NOT EXISTS {partition_name}_created_at_idx
ON {partition_name} (created_at);
CREATE INDEX IF NOT EXISTS {partition_name}_user_id_idx
ON {partition_name} (user_id);
"""
return create_sql
使用例:賢いデータベース管理
optimizer = DatabaseOptimizer(engine)
接続状況をチェック
pool_status = optimizer.get_connection_pool_status() print(f”データベース接続: {pool_status[‘status’]}”)
重いテーブルを確認
heavy_tables = optimizer.analyze_table_sizes() for table in heavy_tables[:3]: # 上位3つを表示 print(f”📊 {table[‘table_name’]}: {table[‘readable_size’]}”)
無駄なインデックスを確認
unused = optimizer.find_unused_indexes() if unused: print(f”🗑️ 使われていないインデックス: {len(unused)}個”)
**🔰 初心者向け解説**:
| 概念 | 何をしているか | 身近な例 |
|:-----|:-------------|:---------|
| `クエリ監視` | 遅いデータベース検索を自動で発見 | 図書館で「この本探すのに10分もかかってる」と気づく |
| `接続プール` | データベース接続を使い回して効率化 | タクシー会社の車両管理(無駄に車を増やさない) |
| `テーブルサイズ分析` | どのデータが重いかを把握 | 引っ越し時に「この箱が一番重い」と把握 |
| `パーティション` | 大きなテーブルを月別に分割 | 家計簿を月別ファイルに分けて管理 |
| `未使用インデックス` | 使われていない索引を削除して節約 | 使わない電話帳を捨ててスペース確保 |
### 📄 Step 3: ロードバランサー(お客さんを上手に案内する仕組み)
大量のアクセスを複数のサーバーに振り分ける「案内係」の仕組みを見てみましょう:
```python
# src/chapter05-saas-platform/backend/app/core/load_balancer.py(重要部分を抜粋)
from typing import List, Dict, Any
import random
import time
from dataclasses import dataclass
from enum import Enum
class HealthStatus(Enum):
"""サーバーの健康状態(レストランスタッフの出勤状況みたいな)"""
HEALTHY = "healthy" # 元気で働ける
UNHEALTHY = "unhealthy" # 体調不良で働けない
DEGRADED = "degraded" # 少し疲れているが働ける
@dataclass
class ServerInstance:
"""個別サーバーの情報(レストランの各店舗情報)"""
id: str # サーバーID(店舗名)
host: str # サーバーのアドレス
port: int # サーバーのポート番号
weight: int = 1 # 処理能力の重み(大きな店舗なら2、小さな店舗なら1)
current_connections: int = 0 # 現在の接続数(今いるお客さん数)
max_connections: int = 100 # 最大接続数(席数)
status: HealthStatus = HealthStatus.HEALTHY # 健康状態
last_health_check: float = 0 # 最後にチェックした時刻
response_time: float = 0 # 応答時間(注文から料理が出るまでの時間)
class LoadBalancer:
"""ロードバランサー(お客さんを適切な店舗に案内するシステム)"""
def __init__(self):
self.servers: List[ServerInstance] = [] # 管理している店舗一覧
self.algorithm = "weighted_round_robin" # 案内方法
self.current_index = 0 # 現在の案内先インデックス
def add_server(self, server: ServerInstance):
"""新しい店舗を追加"""
self.servers.append(server)
print(f"🏪 新店舗追加: {server.id} ({server.host}:{server.port})")
def get_healthy_servers(self) -> List[ServerInstance]:
"""営業中の店舗一覧を取得"""
healthy = [s for s in self.servers if s.status == HealthStatus.HEALTHY]
print(f"🟢 営業中店舗: {len(healthy)}店舗")
return healthy
def select_server(self) -> ServerInstance:
"""お客さんを案内する店舗を選択"""
healthy_servers = self.get_healthy_servers()
if not healthy_servers:
raise Exception("❌ 営業中の店舗がありません!")
# 案内方法による選択
if self.algorithm == "round_robin":
return self._round_robin_selection(healthy_servers)
elif self.algorithm == "weighted_round_robin":
return self._weighted_round_robin_selection(healthy_servers)
elif self.algorithm == "least_connections":
return self._least_connections_selection(healthy_servers)
else:
# ランダム選択(お任せ案内)
return random.choice(healthy_servers)
def _round_robin_selection(self, servers: List[ServerInstance]) -> ServerInstance:
"""順番に案内(公平に回す)"""
server = servers[self.current_index % len(servers)]
self.current_index += 1
print(f"📍 順番案内: {server.id} へご案内")
return server
def _weighted_round_robin_selection(self, servers: List[ServerInstance]) -> ServerInstance:
"""処理能力に応じて案内(大きな店舗により多くのお客さんを)"""
total_weight = sum(s.weight for s in servers)
rand_weight = random.randint(1, total_weight)
current_weight = 0
for server in servers:
current_weight += server.weight
if rand_weight <= current_weight:
print(f"⚖️ 重み付き案内: {server.id} へご案内 (weight: {server.weight})")
return server
return servers[0] # フォールバック
def _least_connections_selection(self, servers: List[ServerInstance]) -> ServerInstance:
"""一番空いている店舗に案内"""
least_busy = min(servers, key=lambda s: s.current_connections)
print(f"🎯 空いている店舗に案内: {least_busy.id} (お客さん{least_busy.current_connections}人)")
return least_busy
async def health_check(self):
"""全店舗の健康状態をチェック(巡回点検)"""
print("🔍 全店舗の健康チェック開始...")
for server in self.servers:
try:
# 店舗に「元気ですか?」と確認
start_time = time.time()
is_healthy = await self._check_server_health(server)
response_time = time.time() - start_time
server.response_time = response_time
server.last_health_check = time.time()
if is_healthy:
if response_time > 5.0: # 5秒以上は少し疲れ気味
server.status = HealthStatus.DEGRADED
print(f"😅 {server.id}: 少し疲れ気味 ({response_time:.1f}秒)")
else:
server.status = HealthStatus.HEALTHY
print(f"😊 {server.id}: 元気です ({response_time:.1f}秒)")
else:
server.status = HealthStatus.UNHEALTHY
print(f"😷 {server.id}: 体調不良で休業中")
except Exception as e:
server.status = HealthStatus.UNHEALTHY
print(f"💥 {server.id}: 連絡がつきません - {str(e)}")
async def _check_server_health(self, server: ServerInstance) -> bool:
"""個別店舗の健康チェック"""
import httpx
try:
async with httpx.AsyncClient(timeout=5.0) as client:
# 店舗のヘルスチェックAPI呼び出し
response = await client.get(f"http://{server.host}:{server.port}/health")
return response.status_code == 200
except:
return False
# 使用例:実際のロードバランサー運用
load_balancer = LoadBalancer()
# 店舗を追加
load_balancer.add_server(ServerInstance("tokyo-1", "10.0.1.100", 8000, weight=2)) # 大型店舗
load_balancer.add_server(ServerInstance("osaka-1", "10.0.1.101", 8000, weight=2)) # 大型店舗
load_balancer.add_server(ServerInstance("backup-1", "10.0.1.102", 8000, weight=1)) # 予備店舗
# お客さんが来たら適切な店舗に案内
try:
selected_server = load_balancer.select_server()
print(f"🎉 {selected_server.id} にお客様をご案内しました!")
except Exception as e:
print(f"😵 申し訳ございません: {e}")
🔰 初心者向け解説:
概念 | 何をしているか | 身近な例 |
---|---|---|
LoadBalancer |
お客さんを適切なサーバーに案内 | レストランチェーンの案内係 |
HealthStatus |
各サーバーの稼働状況を管理 | 各店舗の営業状況(営業中・休業中・混雑中) |
Round Robin |
順番にサーバーを使って公平に分散 | 「次のお客様は2号店へ、その次は3号店へ」 |
Weighted |
サーバーの性能に応じて配分調整 | 大きな店舗により多くのお客さんを案内 |
Least Connections |
一番空いているサーバーを選択 | 待ち時間が一番短い店舗を案内 |
Health Check |
定期的にサーバーの状態を確認 | 店長が各店舗に「営業できてる?」と確認電話 |
データベース最適化
# app/core/database_optimization.py
from sqlalchemy import event, text
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool
import time
import logging
from app.core.config import settings
# スロークエリ監視
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total = time.time() - context._query_start_time
# 1秒以上のクエリを警告
if total > 1.0:
logging.warning(
f"Slow query: {total:.2f}s\n"
f"Statement: {statement}\n"
f"Parameters: {parameters}"
)
class DatabaseOptimizer:
def __init__(self, engine):
self.engine = engine
def get_connection_pool_status(self):
"""コネクションプール状態取得"""
pool = self.engine.pool
return {
"size": pool.size(),
"checked_in": pool.checkedin(),
"checked_out": pool.checkedout(),
"overflow": pool.overflow(),
"status": f"{pool.checkedout()}/{pool.size() + pool.overflow()}"
}
def analyze_query_performance(self, query: str, limit: int = 10):
"""クエリパフォーマンス分析"""
with self.engine.connect() as conn:
# EXPLAIN ANALYZE実行
explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
result = conn.execute(text(explain_query))
explain_data = result.fetchone()[0]
return {
"execution_time": explain_data[0]["Execution Time"],
"planning_time": explain_data[0]["Planning Time"],
"plan": explain_data[0]["Plan"]
}
def get_database_statistics(self):
"""データベース統計情報取得"""
with self.engine.connect() as conn:
# テーブルサイズ
table_sizes = conn.execute(text("""
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size,
pg_total_relation_size(schemaname||'.'||tablename) as size_bytes
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
""")).fetchall()
# インデックス効率
index_usage = conn.execute(text("""
SELECT
schemaname,
tablename,
indexname,
idx_scan as index_scans,
idx_tup_read as tuples_read,
idx_tup_fetch as tuples_fetched
FROM pg_stat_user_indexes
ORDER BY idx_scan DESC;
""")).fetchall()
# 未使用インデックス
unused_indexes = conn.execute(text("""
SELECT
schemaname,
tablename,
indexname,
pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
WHERE idx_scan = 0
AND schemaname = 'public';
""")).fetchall()
return {
"table_sizes": [dict(row._mapping) for row in table_sizes],
"index_usage": [dict(row._mapping) for row in index_usage],
"unused_indexes": [dict(row._mapping) for row in unused_indexes]
}
# パーティショニング実装例
def create_audit_log_partition(year: int, month: int):
"""監査ログパーティション作成"""
partition_name = f"audit_logs_y{year}m{month:02d}"
start_date = f"{year}-{month:02d}-01"
if month == 12:
end_date = f"{year + 1}-01-01"
else:
end_date = f"{year}-{month + 1:02d}-01"
create_partition_sql = f"""
CREATE TABLE IF NOT EXISTS {partition_name}
PARTITION OF audit_logs
FOR VALUES FROM ('{start_date}') TO ('{end_date}');
CREATE INDEX IF NOT EXISTS {partition_name}_created_at_idx
ON {partition_name} (created_at);
CREATE INDEX IF NOT EXISTS {partition_name}_user_id_idx
ON {partition_name} (user_id);
"""
return create_partition_sql
ロードバランシング設定
# app/core/load_balancer.py
from typing import List, Dict, Any
import random
import time
from dataclasses import dataclass
from enum import Enum
class HealthStatus(Enum):
HEALTHY = "healthy"
UNHEALTHY = "unhealthy"
DEGRADED = "degraded"
@dataclass
class ServerInstance:
id: str
host: str
port: int
weight: int = 1
current_connections: int = 0
max_connections: int = 100
status: HealthStatus = HealthStatus.HEALTHY
last_health_check: float = 0
response_time: float = 0
class LoadBalancer:
def __init__(self):
self.servers: List[ServerInstance] = []
self.algorithm = "weighted_round_robin"
self.current_index = 0
def add_server(self, server: ServerInstance):
"""サーバー追加"""
self.servers.append(server)
def remove_server(self, server_id: str):
"""サーバー削除"""
self.servers = [s for s in self.servers if s.id != server_id]
def get_healthy_servers(self) -> List[ServerInstance]:
"""健全なサーバー一覧取得"""
return [s for s in self.servers if s.status == HealthStatus.HEALTHY]
def select_server(self) -> ServerInstance:
"""サーバー選択"""
healthy_servers = self.get_healthy_servers()
if not healthy_servers:
raise Exception("No healthy servers available")
if self.algorithm == "round_robin":
return self._round_robin_selection(healthy_servers)
elif self.algorithm == "weighted_round_robin":
return self._weighted_round_robin_selection(healthy_servers)
elif self.algorithm == "least_connections":
return self._least_connections_selection(healthy_servers)
else:
return random.choice(healthy_servers)
def _round_robin_selection(self, servers: List[ServerInstance]) -> ServerInstance:
"""ラウンドロビン選択"""
server = servers[self.current_index % len(servers)]
self.current_index += 1
return server
def _weighted_round_robin_selection(self, servers: List[ServerInstance]) -> ServerInstance:
"""重み付きラウンドロビン選択"""
total_weight = sum(s.weight for s in servers)
rand_weight = random.randint(1, total_weight)
current_weight = 0
for server in servers:
current_weight += server.weight
if rand_weight <= current_weight:
return server
return servers[0] # フォールバック
def _least_connections_selection(self, servers: List[ServerInstance]) -> ServerInstance:
"""最少コネクション選択"""
return min(servers, key=lambda s: s.current_connections)
async def health_check(self):
"""ヘルスチェック実行"""
for server in self.servers:
try:
# HTTPヘルスチェック実行
start_time = time.time()
is_healthy = await self._check_server_health(server)
response_time = time.time() - start_time
server.response_time = response_time
server.last_health_check = time.time()
if is_healthy:
if response_time > 5.0: # 5秒以上は劣化
server.status = HealthStatus.DEGRADED
else:
server.status = HealthStatus.HEALTHY
else:
server.status = HealthStatus.UNHEALTHY
except Exception:
server.status = HealthStatus.UNHEALTHY
async def _check_server_health(self, server: ServerInstance) -> bool:
"""個別サーバーヘルスチェック"""
import httpx
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"http://{server.host}:{server.port}/health")
return response.status_code == 200
except:
return False
# 使用例
load_balancer = LoadBalancer()
🔍 監視・運用システム(24時間見守るシステム)
📄 Step 4: システム監視とメトリクス収集
全国チェーンになったら、各店舗の状況をリアルタイムで監視する「本部の監視センター」が必要になります:
# app/core/metrics.py
import time
import psutil
from typing import Dict, Any, List
from dataclasses import dataclass, asdict
from datetime import datetime
import asyncio
@dataclass
class SystemMetrics:
timestamp: datetime
cpu_percent: float
memory_percent: float
memory_used_mb: float
disk_usage_percent: float
network_io: Dict[str, int]
process_count: int
@dataclass
class ApplicationMetrics:
timestamp: datetime
active_connections: int
request_count: int
error_count: int
avg_response_time: float
cache_hit_ratio: float
database_connections: int
class MetricsCollector:
def __init__(self):
self.system_metrics_history: List[SystemMetrics] = []
self.app_metrics_history: List[ApplicationMetrics] = []
self.max_history_size = 1000
# カウンター
self.request_count = 0
self.error_count = 0
self.response_times: List[float] = []
def collect_system_metrics(self) -> SystemMetrics:
"""システムメトリクス収集"""
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
network = psutil.net_io_counters()
metrics = SystemMetrics(
timestamp=datetime.utcnow(),
cpu_percent=psutil.cpu_percent(interval=1),
memory_percent=memory.percent,
memory_used_mb=memory.used / 1024 / 1024,
disk_usage_percent=disk.percent,
network_io={
"bytes_sent": network.bytes_sent,
"bytes_recv": network.bytes_recv
},
process_count=len(psutil.pids())
)
# 履歴保存
self.system_metrics_history.append(metrics)
if len(self.system_metrics_history) > self.max_history_size:
self.system_metrics_history.pop(0)
return metrics
def collect_application_metrics(self) -> ApplicationMetrics:
"""アプリケーションメトリクス収集"""
from app.core.database import engine
from app.core.cache import cache
# データベース接続数
db_connections = 0
if hasattr(engine.pool, 'checkedout'):
db_connections = engine.pool.checkedout()
# レスポンス時間平均
avg_response_time = 0
if self.response_times:
avg_response_time = sum(self.response_times) / len(self.response_times)
self.response_times = self.response_times[-100:] # 直近100件のみ保持
# キャッシュヒット率
cache_hit_ratio = 0
try:
cache_info = cache.redis_client.info('stats')
hits = cache_info.get('keyspace_hits', 0)
misses = cache_info.get('keyspace_misses', 0)
if hits + misses > 0:
cache_hit_ratio = hits / (hits + misses) * 100
except:
pass
metrics = ApplicationMetrics(
timestamp=datetime.utcnow(),
active_connections=0, # FastAPIから取得
request_count=self.request_count,
error_count=self.error_count,
avg_response_time=avg_response_time,
cache_hit_ratio=cache_hit_ratio,
database_connections=db_connections
)
self.app_metrics_history.append(metrics)
if len(self.app_metrics_history) > self.max_history_size:
self.app_metrics_history.pop(0)
return metrics
def record_request(self, response_time: float, is_error: bool = False):
"""リクエスト記録"""
self.request_count += 1
self.response_times.append(response_time)
if is_error:
self.error_count += 1
def get_summary(self) -> Dict[str, Any]:
"""メトリクス要約取得"""
if not self.system_metrics_history or not self.app_metrics_history:
return {}
recent_system = self.system_metrics_history[-10:] # 直近10件
recent_app = self.app_metrics_history[-10:]
return {
"system": {
"avg_cpu": sum(m.cpu_percent for m in recent_system) / len(recent_system),
"avg_memory": sum(m.memory_percent for m in recent_system) / len(recent_system),
"current_memory_mb": recent_system[-1].memory_used_mb,
"disk_usage": recent_system[-1].disk_usage_percent
},
"application": {
"total_requests": self.request_count,
"total_errors": self.error_count,
"error_rate": (self.error_count / self.request_count * 100) if self.request_count > 0 else 0,
"avg_response_time": recent_app[-1].avg_response_time,
"cache_hit_ratio": recent_app[-1].cache_hit_ratio,
"db_connections": recent_app[-1].database_connections
},
"timestamp": datetime.utcnow().isoformat()
}
# グローバルメトリクスコレクター
metrics_collector = MetricsCollector()
# ミドルウェア
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time
class MetricsMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.time()
try:
response = await call_next(request)
# リクエスト記録
response_time = time.time() - start_time
is_error = response.status_code >= 400
metrics_collector.record_request(response_time, is_error)
# レスポンスヘッダーに追加
response.headers["X-Response-Time"] = f"{response_time:.3f}"
return response
except Exception as e:
response_time = time.time() - start_time
metrics_collector.record_request(response_time, True)
raise
ログ管理
# app/core/logging_config.py
import logging
import logging.handlers
import json
from datetime import datetime
from typing import Dict, Any
import traceback
from app.core.config import settings
class JSONFormatter(logging.Formatter):
"""JSON形式ログフォーマッター"""
def format(self, record):
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# 例外情報追加
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__,
"message": str(record.exc_info[1]),
"traceback": traceback.format_exception(*record.exc_info)
}
# 追加フィールド
if hasattr(record, 'user_id'):
log_entry["user_id"] = record.user_id
if hasattr(record, 'organization_id'):
log_entry["organization_id"] = record.organization_id
if hasattr(record, 'request_id'):
log_entry["request_id"] = record.request_id
return json.dumps(log_entry)
def setup_logging():
"""ログ設定初期化"""
# ルートロガー設定
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, settings.LOG_LEVEL))
# 既存ハンドラー削除
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# コンソールハンドラー
console_handler = logging.StreamHandler()
console_handler.setFormatter(JSONFormatter())
root_logger.addHandler(console_handler)
# ファイルハンドラー(本番環境)
if not settings.DEBUG:
file_handler = logging.handlers.RotatingFileHandler(
"app.log",
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setFormatter(JSONFormatter())
root_logger.addHandler(file_handler)
# 外部ログ収集(例:Datadog)
if hasattr(settings, 'DATADOG_API_KEY'):
datadog_handler = DatadogHandler(settings.DATADOG_API_KEY)
root_logger.addHandler(datadog_handler)
class StructuredLogger:
"""構造化ログヘルパー"""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def info(self, message: str, **kwargs):
self._log(logging.INFO, message, **kwargs)
def warning(self, message: str, **kwargs):
self._log(logging.WARNING, message, **kwargs)
def error(self, message: str, **kwargs):
self._log(logging.ERROR, message, **kwargs)
def _log(self, level: int, message: str, **kwargs):
extra = {k: v for k, v in kwargs.items() if k not in ['exc_info']}
self.logger.log(level, message, extra=extra, exc_info=kwargs.get('exc_info'))
# アプリケーションロガー
app_logger = StructuredLogger("saas_platform")
# 使用例
def log_user_action(user_id: int, action: str, details: Dict[str, Any]):
app_logger.info(
f"User action: {action}",
user_id=user_id,
action=action,
details=details
)
アラート システム
# app/core/alerting.py
from typing import Dict, Any, List, Callable
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio
class AlertSeverity(Enum):
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass
class Alert:
id: str
title: str
description: str
severity: AlertSeverity
source: str
timestamp: datetime
metadata: Dict[str, Any]
resolved: bool = False
resolved_at: datetime = None
class AlertRule:
def __init__(
self,
name: str,
condition: Callable[[Dict[str, Any]], bool],
severity: AlertSeverity,
description: str,
cooldown_minutes: int = 15
):
self.name = name
self.condition = condition
self.severity = severity
self.description = description
self.cooldown = timedelta(minutes=cooldown_minutes)
self.last_triggered = None
def should_trigger(self, metrics: Dict[str, Any]) -> bool:
"""アラート発火条件チェック"""
# クールダウン期間中は発火しない
if self.last_triggered and datetime.utcnow() - self.last_triggered < self.cooldown:
return False
return self.condition(metrics)
def trigger(self) -> Alert:
"""アラート発火"""
self.last_triggered = datetime.utcnow()
return Alert(
id=f"{self.name}_{int(self.last_triggered.timestamp())}",
title=f"Alert: {self.name}",
description=self.description,
severity=self.severity,
source="system_monitor",
timestamp=self.last_triggered,
metadata={"rule": self.name}
)
class AlertManager:
def __init__(self):
self.rules: List[AlertRule] = []
self.active_alerts: Dict[str, Alert] = {}
self.alert_handlers: List[Callable[[Alert], None]] = []
# デフォルトルール設定
self._setup_default_rules()
def add_rule(self, rule: AlertRule):
"""アラートルール追加"""
self.rules.append(rule)
def add_handler(self, handler: Callable[[Alert], None]):
"""アラートハンドラー追加"""
self.alert_handlers.append(handler)
async def check_alerts(self, metrics: Dict[str, Any]):
"""アラートチェック実行"""
for rule in self.rules:
if rule.should_trigger(metrics):
alert = rule.trigger()
await self._handle_alert(alert)
async def _handle_alert(self, alert: Alert):
"""アラート処理"""
self.active_alerts[alert.id] = alert
# 各ハンドラーで処理
for handler in self.alert_handlers:
try:
await self._run_handler(handler, alert)
except Exception as e:
app_logger.error(f"Alert handler error: {e}", alert_id=alert.id)
async def _run_handler(self, handler: Callable, alert: Alert):
"""ハンドラー実行"""
if asyncio.iscoroutinefunction(handler):
await handler(alert)
else:
handler(alert)
def resolve_alert(self, alert_id: str):
"""アラート解決"""
if alert_id in self.active_alerts:
alert = self.active_alerts[alert_id]
alert.resolved = True
alert.resolved_at = datetime.utcnow()
del self.active_alerts[alert_id]
def _setup_default_rules(self):
"""デフォルトアラートルール設定"""
# CPU使用率アラート
self.add_rule(AlertRule(
name="high_cpu_usage",
condition=lambda m: m.get("system", {}).get("avg_cpu", 0) > 80,
severity=AlertSeverity.WARNING,
description="CPU使用率が80%を超えています"
))
# メモリ使用率アラート
self.add_rule(AlertRule(
name="high_memory_usage",
condition=lambda m: m.get("system", {}).get("avg_memory", 0) > 85,
severity=AlertSeverity.ERROR,
description="メモリ使用率が85%を超えています"
))
# エラー率アラート
self.add_rule(AlertRule(
name="high_error_rate",
condition=lambda m: m.get("application", {}).get("error_rate", 0) > 5,
severity=AlertSeverity.ERROR,
description="エラー率が5%を超えています"
))
# レスポンス時間アラート
self.add_rule(AlertRule(
name="slow_response_time",
condition=lambda m: m.get("application", {}).get("avg_response_time", 0) > 2.0,
severity=AlertSeverity.WARNING,
description="平均レスポンス時間が2秒を超えています"
))
# Slack通知ハンドラー
async def slack_alert_handler(alert: Alert):
"""Slack通知送信"""
import httpx
webhook_url = settings.SLACK_WEBHOOK_URL
if not webhook_url:
return
color_map = {
AlertSeverity.INFO: "#36a64f",
AlertSeverity.WARNING: "#ff9900",
AlertSeverity.ERROR: "#ff0000",
AlertSeverity.CRITICAL: "#8b0000"
}
message = {
"attachments": [{
"color": color_map.get(alert.severity, "#36a64f"),
"title": alert.title,
"text": alert.description,
"fields": [
{"title": "Severity", "value": alert.severity.value, "short": True},
{"title": "Source", "value": alert.source, "short": True},
{"title": "Time", "value": alert.timestamp.isoformat(), "short": False}
]
}]
}
async with httpx.AsyncClient() as client:
await client.post(webhook_url, json=message)
# グローバルアラートマネージャー
alert_manager = AlertManager()
alert_manager.add_handler(slack_alert_handler)
CI/CDとデプロイ戦略
Docker設定
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# システム依存関係
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Python依存関係
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# アプリケーションコード
COPY . .
# 非rootユーザー作成
RUN useradd --create-home --shell /bin/bash app \
&& chown -R app:app /app
USER app
# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
app:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://postgres:password@db:5432/saas_platform
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
volumes:
- ./logs:/app/logs
deploy:
replicas: 3
resources:
limits:
memory: 512M
reservations:
memory: 256M
db:
image: postgres:15
environment:
POSTGRES_DB: saas_platform
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
deploy:
resources:
limits:
memory: 1G
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
deploy:
resources:
limits:
memory: 256M
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- app
volumes:
postgres_data:
redis_data:
GitHub Actions CI/CD
# .github/workflows/ci-cd.yml
name: CI/CD Pipeline
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
env:
REGISTRY: ghcr.io
IMAGE_NAME: $
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: test_db
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
redis:
image: redis
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Cache dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: $-pip-$
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest pytest-cov pytest-asyncio
- name: Run linting
run: |
pip install flake8 black isort
flake8 app
black --check app
isort --check-only app
- name: Run tests
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
REDIS_URL: redis://localhost:6379
SECRET_KEY: test-secret-key
run: |
pytest tests/ -v --cov=app --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run security scan
run: |
pip install safety bandit
safety check
bandit -r app/
build:
needs: [test, security]
runs-on: ubuntu-latest
if: github.event_name == 'push'
steps:
- uses: actions/checkout@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Log in to Container Registry
uses: docker/login-action@v2
with:
registry: $
username: $
password: $
- name: Extract metadata
id: meta
uses: docker/metadata-action@v4
with:
images: $/$
tags: |
type=ref,event=branch
type=ref,event=pr
type=sha
- name: Build and push
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: $
labels: $
cache-from: type=gha
cache-to: type=gha,mode=max
deploy:
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
environment: production
steps:
- name: Deploy to production
run: |
echo "Deploying to production..."
# Kubernetes deployment script
kubectl set image deployment/saas-platform-app \
app=$/$:$
トラブルシューティング
FastAPI + SQLAlchemy固有の問題
問題1: SQLAlchemy接続エラー
症状:
connection timeout
エラーSQLSTATE[08006] server closed connection
エラー- データベース操作の失敗
診断手順:
# app/core/database_diagnostics.py
import sqlalchemy
from sqlalchemy import text, create_engine
from sqlalchemy.pool import StaticPool
from app.core.config import settings
import logging
logger = logging.getLogger(__name__)
async def diagnose_database_connection():
"""データベース接続診断"""
logger.info("Starting database connection diagnostics...")
# 1. 設定確認
db_url = settings.database_url
logger.info(f"Database URL configured: {bool(db_url)}")
logger.info(f"Database URL (masked): {db_url[:20]}...{db_url[-10:] if len(db_url) > 30 else db_url}")
# 2. 基本接続テスト
try:
engine = create_engine(
db_url,
poolclass=StaticPool,
pool_pre_ping=True,
echo=True # SQL ログ出力
)
with engine.connect() as connection:
result = connection.execute(text("SELECT 1 as test"))
test_value = result.scalar()
logger.info(f"Basic connection test: {test_value == 1}")
except Exception as e:
logger.error(f"Basic connection failed: {e}")
return False
# 3. 接続プール診断
try:
from app.core.database import engine as app_engine
pool = app_engine.pool
logger.info(f"Pool status:")
logger.info(f" - Size: {pool.size()}")
logger.info(f" - Checked in: {pool.checkedin()}")
logger.info(f" - Checked out: {pool.checkedout()}")
logger.info(f" - Overflow: {pool.overflow()}")
except Exception as e:
logger.error(f"Pool diagnostics failed: {e}")
# 4. パフォーマンステスト
try:
import time
start_time = time.time()
with engine.connect() as connection:
# 複数クエリ実行
for i in range(10):
connection.execute(text("SELECT pg_sleep(0.1)"))
execution_time = time.time() - start_time
logger.info(f"Performance test: {execution_time:.2f}s for 10 queries")
if execution_time > 5:
logger.warning("Slow database performance detected")
except Exception as e:
logger.error(f"Performance test failed: {e}")
return True
# 使用例
async def startup_diagnostics():
success = await diagnose_database_connection()
if not success:
raise Exception("Database diagnostics failed")
解決策:
# app/core/database_optimized.py
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool, StaticPool
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
import logging
logger = logging.getLogger(__name__)
class OptimizedDatabaseManager:
def __init__(self, database_url: str):
self.database_url = database_url
self.engine = None
self.SessionLocal = None
self._setup_engine()
def _setup_engine(self):
"""最適化されたエンジン設定"""
# 本番環境用設定
if settings.ENVIRONMENT == "production":
self.engine = create_engine(
self.database_url,
poolclass=QueuePool,
pool_size=20, # 基本接続数
max_overflow=30, # 追加接続数
pool_pre_ping=True, # 接続前ping
pool_recycle=3600, # 1時間で接続リサイクル
echo=False, # SQL ログ無効
connect_args={
"connect_timeout": 10,
"application_name": "fastapi_saas"
}
)
else:
# 開発環境用設定
self.engine = create_engine(
self.database_url,
poolclass=StaticPool,
pool_pre_ping=True,
echo=settings.DEBUG,
connect_args={"check_same_thread": False} if "sqlite" in self.database_url else {}
)
# 接続イベントリスナー
@event.listens_for(self.engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
if "sqlite" in self.database_url:
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
@event.listens_for(self.engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
logger.debug("Connection checked out from pool")
self.SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=self.engine
)
@contextmanager
def get_session(self):
"""安全なセッション管理"""
session = self.SessionLocal()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Database session error: {e}")
raise
finally:
session.close()
async def health_check(self) -> bool:
"""ヘルスチェック"""
try:
with self.get_session() as session:
session.execute(text("SELECT 1"))
return True
except Exception as e:
logger.error(f"Database health check failed: {e}")
return False
def get_connection_info(self) -> dict:
"""接続情報取得"""
pool = self.engine.pool
return {
"pool_size": pool.size(),
"checked_in": pool.checkedin(),
"checked_out": pool.checkedout(),
"overflow": pool.overflow(),
"invalid": pool.invalid()
}
# グローバルインスタンス
db_manager = OptimizedDatabaseManager(settings.database_url)
問題2: Alembicマイグレーションエラー
症状:
Target database is not up to date
エラー- マイグレーション実行失敗
- スキーマ不整合
診断手順:
# scripts/migration_diagnostics.py
import alembic
from alembic.config import Config
from alembic import command
from alembic.script import ScriptDirectory
from alembic.runtime.environment import EnvironmentContext
from sqlalchemy import create_engine, MetaData
import logging
logger = logging.getLogger(__name__)
class MigrationDiagnostics:
def __init__(self, alembic_cfg_path: str = "alembic.ini"):
self.alembic_cfg = Config(alembic_cfg_path)
self.engine = create_engine(settings.database_url)
def check_migration_status(self):
"""マイグレーション状態確認"""
logger.info("Checking migration status...")
# 現在のリビジョン確認
try:
with self.engine.connect() as connection:
context = EnvironmentContext(
self.alembic_cfg,
self.engine,
connection=connection
)
with context.begin_transaction():
current_rev = context.get_current_revision()
logger.info(f"Current database revision: {current_rev}")
except Exception as e:
logger.error(f"Failed to get current revision: {e}")
return False
# 利用可能なマイグレーション確認
try:
script_dir = ScriptDirectory.from_config(self.alembic_cfg)
heads = script_dir.get_heads()
logger.info(f"Available heads: {heads}")
# 待機中のマイグレーション
revisions = script_dir.walk_revisions("head", current_rev)
pending = list(revisions)
logger.info(f"Pending migrations: {len(pending)}")
for rev in pending[:5]: # 最初の5件のみ表示
logger.info(f" - {rev.revision}: {rev.doc}")
except Exception as e:
logger.error(f"Failed to check pending migrations: {e}")
return False
return True
def validate_models_vs_database(self):
"""モデルとデータベースの整合性確認"""
try:
from app.models import Base
# モデルから期待されるメタデータ
model_metadata = Base.metadata
# データベースの実際のメタデータ
db_metadata = MetaData()
db_metadata.reflect(bind=self.engine)
# テーブル比較
model_tables = set(model_metadata.tables.keys())
db_tables = set(db_metadata.tables.keys())
missing_in_db = model_tables - db_tables
extra_in_db = db_tables - model_tables
if missing_in_db:
logger.warning(f"Tables missing in database: {missing_in_db}")
if extra_in_db:
logger.warning(f"Extra tables in database: {extra_in_db}")
# 共通テーブルのカラム比較
common_tables = model_tables & db_tables
for table_name in common_tables:
model_table = model_metadata.tables[table_name]
db_table = db_metadata.tables[table_name]
model_columns = set(model_table.columns.keys())
db_columns = set(db_table.columns.keys())
missing_columns = model_columns - db_columns
extra_columns = db_columns - model_columns
if missing_columns:
logger.warning(f"Table {table_name} - missing columns: {missing_columns}")
if extra_columns:
logger.warning(f"Table {table_name} - extra columns: {extra_columns}")
return len(missing_in_db) == 0 and len(extra_in_db) == 0
except Exception as e:
logger.error(f"Model validation failed: {e}")
return False
def repair_migration_state(self):
"""マイグレーション状態修復"""
logger.info("Attempting to repair migration state...")
try:
# 1. データベースのバックアップ推奨メッセージ
logger.warning("IMPORTANT: Please backup your database before proceeding!")
# 2. マイグレーション履歴テーブル確認
with self.engine.connect() as connection:
result = connection.execute(text("""
SELECT table_name
FROM information_schema.tables
WHERE table_name = 'alembic_version'
"""))
if not result.fetchone():
logger.info("Creating alembic_version table...")
command.stamp(self.alembic_cfg, "head")
# 3. 強制的に最新リビジョンにスタンプ(慎重に)
logger.info("Stamping database to head revision...")
command.stamp(self.alembic_cfg, "head")
logger.info("Migration repair completed")
return True
except Exception as e:
logger.error(f"Migration repair failed: {e}")
return False
# 使用例
diagnostics = MigrationDiagnostics()
diagnostics.check_migration_status()
diagnostics.validate_models_vs_database()
解決策:
# scripts/safe_migration.sh
#!/bin/bash
set -e
echo "🔍 Starting safe migration process..."
# 1. バックアップ作成
echo "📦 Creating database backup..."
pg_dump $DATABASE_URL > backup_$(date +%Y%m%d_%H%M%S).sql
# 2. マイグレーション状態確認
echo "📋 Checking migration status..."
python -c "
from scripts.migration_diagnostics import MigrationDiagnostics
diag = MigrationDiagnostics()
diag.check_migration_status()
diag.validate_models_vs_database()
"
# 3. ドライラン実行
echo "🧪 Performing dry run..."
alembic upgrade head --sql > migration_preview.sql
echo "Migration preview saved to migration_preview.sql"
# 4. 確認プロンプト
read -p "🤔 Continue with migration? (y/N): " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
echo "❌ Migration cancelled"
exit 1
fi
# 5. 実際のマイグレーション実行
echo "🚀 Running migration..."
alembic upgrade head
# 6. 検証
echo "✅ Verifying migration..."
python -c "
from scripts.migration_diagnostics import MigrationDiagnostics
diag = MigrationDiagnostics()
success = diag.validate_models_vs_database()
exit(0 if success else 1)
"
echo "🎉 Migration completed successfully!"
問題3: マルチテナント分離の問題
症状:
- 別テナントのデータが見える
- RLS ポリシーが動作しない
- 認可エラー
診断手順:
# app/utils/tenant_diagnostics.py
from typing import List, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import text
from app.core.database import get_db
from app.models.organization import Organization
from app.models.user import User
import logging
logger = logging.getLogger(__name__)
class TenantIsolationDiagnostics:
def __init__(self, db: Session):
self.db = db
async def verify_rls_policies(self) -> Dict[str, Any]:
"""RLS ポリシー確認"""
logger.info("Verifying RLS policies...")
# 1. RLS が有効なテーブル確認
result = self.db.execute(text("""
SELECT schemaname, tablename, rowsecurity
FROM pg_tables
WHERE schemaname = 'public'
AND tablename IN ('users', 'organizations', 'projects', 'tasks')
"""))
rls_status = {}
for row in result:
rls_status[row.tablename] = row.rowsecurity
logger.info(f"RLS status: {rls_status}")
# 2. 適用されているポリシー確認
policies = self.db.execute(text("""
SELECT schemaname, tablename, policyname, cmd, qual, with_check
FROM pg_policies
WHERE schemaname = 'public'
ORDER BY tablename, policyname
"""))
policy_info = []
for policy in policies:
policy_info.append({
'table': policy.tablename,
'policy': policy.policyname,
'command': policy.cmd,
'condition': policy.qual,
'check': policy.with_check
})
logger.info(f"Active policies: {len(policy_info)}")
return {
'rls_enabled': rls_status,
'policies': policy_info
}
async def test_tenant_isolation(
self,
user1_id: int,
user2_id: int,
org1_id: int,
org2_id: int
) -> bool:
"""テナント分離テスト"""
logger.info("Testing tenant isolation...")
try:
# 1. ユーザー1でログイン状態をシミュレート
self.db.execute(text("""
SELECT set_config('request.jwt.claim.sub', :user_id, true)
"""), {"user_id": str(user1_id)})
# 2. 組織1のプロジェクト確認
org1_projects = self.db.execute(text("""
SELECT id, name, organization_id
FROM projects
WHERE organization_id = :org_id
"""), {"org_id": org1_id}).fetchall()
logger.info(f"User1 can see {len(org1_projects)} projects from org1")
# 3. 組織2のプロジェクト確認(見えてはいけない)
org2_projects = self.db.execute(text("""
SELECT id, name, organization_id
FROM projects
WHERE organization_id = :org_id
"""), {"org_id": org2_id}).fetchall()
logger.info(f"User1 can see {len(org2_projects)} projects from org2")
# 4. 分離確認
if len(org2_projects) > 0:
logger.error("SECURITY VIOLATION: User can see other tenant's data!")
return False
# 5. ユーザー2でテスト
self.db.execute(text("""
SELECT set_config('request.jwt.claim.sub', :user_id, true)
"""), {"user_id": str(user2_id)})
org1_projects_user2 = self.db.execute(text("""
SELECT id, name, organization_id
FROM projects
WHERE organization_id = :org_id
"""), {"org_id": org1_id}).fetchall()
if len(org1_projects_user2) > 0:
logger.error("SECURITY VIOLATION: User2 can see org1's data!")
return False
logger.info("✅ Tenant isolation test passed")
return True
except Exception as e:
logger.error(f"Tenant isolation test failed: {e}")
return False
async def fix_missing_rls_policies(self):
"""不足しているRLSポリシーの修復"""
logger.info("Checking and fixing RLS policies...")
# 必要なポリシー定義
required_policies = [
{
'table': 'projects',
'policy': 'tenant_isolation',
'sql': '''
CREATE POLICY tenant_isolation ON projects
FOR ALL USING (
organization_id IN (
SELECT organization_id
FROM user_organizations uo
WHERE uo.user_id = (current_setting('request.jwt.claim.sub'))::int
)
)
'''
},
{
'table': 'tasks',
'policy': 'tenant_isolation',
'sql': '''
CREATE POLICY tenant_isolation ON tasks
FOR ALL USING (
project_id IN (
SELECT p.id
FROM projects p
JOIN user_organizations uo ON p.organization_id = uo.organization_id
WHERE uo.user_id = (current_setting('request.jwt.claim.sub'))::int
)
)
'''
}
]
for policy_def in required_policies:
try:
# 既存ポリシー確認
existing = self.db.execute(text(f"""
SELECT 1 FROM pg_policies
WHERE tablename = '{policy_def['table']}'
AND policyname = '{policy_def['policy']}'
""")).fetchone()
if not existing:
logger.info(f"Creating policy {policy_def['policy']} for {policy_def['table']}")
self.db.execute(text(policy_def['sql']))
# RLS有効化
self.db.execute(text(f"""
ALTER TABLE {policy_def['table']} ENABLE ROW LEVEL SECURITY
"""))
self.db.commit()
except Exception as e:
logger.error(f"Failed to create policy for {policy_def['table']}: {e}")
self.db.rollback()
# 使用例
async def diagnose_tenant_security():
db = next(get_db())
diagnostics = TenantIsolationDiagnostics(db)
# RLS確認
rls_info = await diagnostics.verify_rls_policies()
# テナント分離テスト
isolation_ok = await diagnostics.test_tenant_isolation(
user1_id=1, user2_id=2, org1_id=1, org2_id=2
)
if not isolation_ok:
await diagnostics.fix_missing_rls_policies()
パフォーマンス問題
問題4: N+1クエリ問題
症状:
- 大量のデータベースクエリ実行
- ページ読み込みが遅い
- CPU使用率が高い
診断手順:
# app/utils/query_diagnostics.py
import time
import logging
from typing import List, Dict, Any
from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from contextlib import contextmanager
from collections import defaultdict
logger = logging.getLogger(__name__)
class QueryDiagnostics:
def __init__(self):
self.queries: List[Dict[str, Any]] = []
self.query_counts = defaultdict(int)
self.monitoring = False
def start_monitoring(self):
"""クエリ監視開始"""
self.monitoring = True
self.queries.clear()
self.query_counts.clear()
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
if self.monitoring:
context._query_start_time = time.time()
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
if self.monitoring:
total = time.time() - context._query_start_time
# クエリ正規化(パラメータを除去)
normalized_query = self._normalize_query(statement)
query_info = {
'query': statement,
'normalized': normalized_query,
'parameters': parameters,
'duration': total,
'timestamp': time.time()
}
self.queries.append(query_info)
self.query_counts[normalized_query] += 1
# スロークエリ警告
if total > 1.0:
logger.warning(f"Slow query detected: {total:.2f}s - {statement[:100]}...")
def stop_monitoring(self):
"""クエリ監視停止"""
self.monitoring = False
def _normalize_query(self, query: str) -> str:
"""クエリ正規化"""
import re
# パラメータを置換
normalized = re.sub(r'%\([^)]*\)s', '?', query)
normalized = re.sub(r'\$\d+', '?', normalized)
# 複数の空白を単一に
normalized = re.sub(r'\s+', ' ', normalized.strip())
return normalized
def analyze_queries(self) -> Dict[str, Any]:
"""クエリ分析"""
if not self.queries:
return {"error": "No queries recorded"}
total_queries = len(self.queries)
total_time = sum(q['duration'] for q in self.queries)
# N+1問題検出
n_plus_one_patterns = []
for normalized, count in self.query_counts.items():
if count > 10 and 'SELECT' in normalized and 'WHERE' in normalized:
n_plus_one_patterns.append({
'query': normalized,
'count': count,
'severity': 'high' if count > 50 else 'medium'
})
# 最遅クエリ
slowest_queries = sorted(self.queries, key=lambda x: x['duration'], reverse=True)[:5]
# 頻繁なクエリ
frequent_queries = sorted(
self.query_counts.items(),
key=lambda x: x[1],
reverse=True
)[:10]
return {
'total_queries': total_queries,
'total_time': total_time,
'avg_time': total_time / total_queries,
'n_plus_one_patterns': n_plus_one_patterns,
'slowest_queries': [
{
'query': q['query'][:200] + '...',
'duration': q['duration'],
'parameters': str(q['parameters'])[:100]
}
for q in slowest_queries
],
'frequent_queries': [
{'query': q[:200] + '...', 'count': count}
for q, count in frequent_queries
]
}
@contextmanager
def monitor_request(self, endpoint: str):
"""リクエスト単位でのクエリ監視"""
logger.info(f"Starting query monitoring for {endpoint}")
self.start_monitoring()
try:
yield self
finally:
self.stop_monitoring()
analysis = self.analyze_queries()
# N+1問題警告
if analysis.get('n_plus_one_patterns'):
logger.warning(f"N+1 query patterns detected in {endpoint}:")
for pattern in analysis['n_plus_one_patterns']:
logger.warning(f" - {pattern['query'][:100]}... (count: {pattern['count']})")
logger.info(f"Query analysis for {endpoint}: {analysis['total_queries']} queries, {analysis['total_time']:.2f}s total")
# グローバルインスタンス
query_diagnostics = QueryDiagnostics()
# FastAPI依存性注入
from fastapi import Depends
def get_query_monitor():
return query_diagnostics
# 使用例
@app.get("/api/v1/projects")
async def get_projects(
current_user: User = Depends(get_current_user),
query_monitor: QueryDiagnostics = Depends(get_query_monitor)
):
with query_monitor.monitor_request("GET /api/v1/projects"):
# この中でのクエリが監視される
projects = await project_service.get_user_projects(current_user.id)
return projects
解決策:
# app/services/optimized_project_service.py
from sqlalchemy.orm import Session, joinedload, selectinload
from typing import List
from app.models.project import Project
from app.models.user import User
class OptimizedProjectService:
def __init__(self, db: Session):
self.db = db
def get_user_projects_optimized(self, user_id: int) -> List[Project]:
"""N+1問題を解決したプロジェクト取得"""
# 悪い例(N+1問題あり):
# projects = self.db.query(Project).filter_by(user_id=user_id).all()
# for project in projects:
# _ = project.tasks # これが各プロジェクトごとに別のクエリを実行
# 良い例(eager loading使用):
projects = (
self.db.query(Project)
.options(
joinedload(Project.organization), # 1-to-1 relation
selectinload(Project.tasks).selectinload(Task.assignee), # 1-to-many relation
selectinload(Project.members).selectinload(ProjectMember.user)
)
.join(Project.members)
.filter(ProjectMember.user_id == user_id)
.filter(ProjectMember.is_active == True)
.all()
)
return projects
def get_project_with_tasks_batch(self, project_ids: List[int]) -> Dict[int, List[Task]]:
"""バッチでタスク取得"""
from app.models.task import Task
# プロジェクトIDでタスクを一括取得
tasks = (
self.db.query(Task)
.options(joinedload(Task.assignee))
.filter(Task.project_id.in_(project_ids))
.all()
)
# プロジェクトIDでグループ化
tasks_by_project = defaultdict(list)
for task in tasks:
tasks_by_project[task.project_id].append(task)
return dict(tasks_by_project)
async def get_dashboard_data_optimized(self, user_id: int) -> Dict[str, Any]:
"""ダッシュボード用最適化クエリ"""
# 1つのクエリで必要なデータを全て取得
result = self.db.execute(text("""
WITH user_projects AS (
SELECT DISTINCT p.id, p.name, p.status, p.organization_id
FROM projects p
JOIN project_members pm ON p.id = pm.project_id
WHERE pm.user_id = :user_id AND pm.is_active = true
),
project_stats AS (
SELECT
up.id as project_id,
up.name as project_name,
COUNT(t.id) as total_tasks,
COUNT(CASE WHEN t.status = 'completed' THEN 1 END) as completed_tasks,
COUNT(CASE WHEN t.assignee_id = :user_id THEN 1 END) as my_tasks
FROM user_projects up
LEFT JOIN tasks t ON up.id = t.project_id
GROUP BY up.id, up.name
)
SELECT
ps.*,
CASE
WHEN ps.total_tasks > 0
THEN (ps.completed_tasks * 100.0 / ps.total_tasks)
ELSE 0
END as completion_percentage
FROM project_stats ps
ORDER BY ps.project_name
"""), {"user_id": user_id})
projects = []
for row in result:
projects.append({
'id': row.project_id,
'name': row.project_name,
'total_tasks': row.total_tasks,
'completed_tasks': row.completed_tasks,
'my_tasks': row.my_tasks,
'completion_percentage': float(row.completion_percentage)
})
return {
'projects': projects,
'total_projects': len(projects),
'total_tasks': sum(p['total_tasks'] for p in projects),
'total_my_tasks': sum(p['my_tasks'] for p in projects)
}
デバッグとモニタリング
包括的ログ設定
# app/core/logging_config.py
import logging
import logging.config
from typing import Dict, Any
import json
import traceback
from datetime import datetime
class JSONFormatter(logging.Formatter):
"""JSON形式ログフォーマッター"""
def format(self, record):
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
# 例外情報追加
if record.exc_info:
log_entry['exception'] = {
'type': record.exc_info[0].__name__,
'message': str(record.exc_info[1]),
'traceback': traceback.format_exception(*record.exc_info)
}
# カスタム属性追加
for key, value in record.__dict__.items():
if key.startswith('custom_'):
log_entry[key[7:]] = value # 'custom_' プレフィックス除去
return json.dumps(log_entry, ensure_ascii=False)
def setup_logging(environment: str = "development") -> None:
"""ログ設定初期化"""
if environment == "production":
config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'json': {
'()': 'app.core.logging_config.JSONFormatter'
},
'standard': {
'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
'formatter': 'json',
'stream': 'ext://sys.stdout'
},
'file': {
'class': 'logging.handlers.RotatingFileHandler',
'level': 'INFO',
'formatter': 'json',
'filename': 'logs/app.log',
'maxBytes': 10485760, # 10MB
'backupCount': 5
},
'error_file': {
'class': 'logging.handlers.RotatingFileHandler',
'level': 'ERROR',
'formatter': 'json',
'filename': 'logs/error.log',
'maxBytes': 10485760,
'backupCount': 5
}
},
'loggers': {
'app': {
'level': 'INFO',
'handlers': ['console', 'file', 'error_file'],
'propagate': False
},
'sqlalchemy.engine': {
'level': 'WARNING',
'handlers': ['file'],
'propagate': False
},
'alembic': {
'level': 'INFO',
'handlers': ['file'],
'propagate': False
}
},
'root': {
'level': 'WARNING',
'handlers': ['console']
}
}
else:
# 開発環境用設定
config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'detailed': {
'format': '%(asctime)s [%(levelname)8s] %(name)s:%(lineno)d - %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG',
'formatter': 'detailed'
}
},
'loggers': {
'app': {
'level': 'DEBUG',
'handlers': ['console'],
'propagate': False
},
'sqlalchemy.engine': {
'level': 'INFO',
'handlers': ['console'],
'propagate': False
}
},
'root': {
'level': 'INFO',
'handlers': ['console']
}
}
logging.config.dictConfig(config)
# ログヘルパー関数
def log_function_call(func):
"""関数呼び出しログデコレーター"""
def wrapper(*args, **kwargs):
logger = logging.getLogger(func.__module__)
logger.debug(f"Calling {func.__name__} with args={args}, kwargs={kwargs}")
try:
result = func(*args, **kwargs)
logger.debug(f"{func.__name__} completed successfully")
return result
except Exception as e:
logger.error(f"{func.__name__} failed", extra={
'custom_function': func.__name__,
'custom_error_type': type(e).__name__
})
raise
return wrapper
# 使用例
logger = logging.getLogger(__name__)
@log_function_call
def create_project(project_data: dict):
logger.info("Creating new project", extra={
'custom_project_name': project_data.get('name'),
'custom_organization_id': project_data.get('organization_id')
})
# プロジェクト作成処理...
logger.info("Project created successfully", extra={
'custom_project_id': project.id
})
まとめ
Chapter 5-3では、エンタープライズレベルの拡張性とパフォーマンス最適化を実装しました。
実装した機能:
- キャッシュとデータベース最適化
- メトリクス収集と監視システム
- アラート・ログ管理
- CI/CDパイプラインとコンテナー化
アーキテクチャ比較まとめ:
項目 | クライアント実装 | Edge Functions | 独立APIサーバー |
---|---|---|---|
開発速度 | ★★★ | ★★ | ★ |
拡張性 | ★ | ★★ | ★★★ |
セキュリティ | ★★ | ★★★ | ★★★ |
運用負荷 | ★★★ | ★★ | ★ |
コスト効率 | ★★★ | ★★ | ★ |
これで独立APIサーバーパターンの実装が完了しました。要件に応じて最適なアーキテクチャを選択し、段階的に移行する戦略が重要です。
章末演習・確認事項
演習問題
- キャッシュ戦略設計
- 与えられたAPIエンドポイントに対して最適なキャッシュ戦略を設計してください
- TTL値の設定根拠を説明してください
- メトリクス分析
- サンプルメトリクスデータから性能課題を特定してください
- 改善提案を3つ以上提示してください
- アラート設定
- 想定シナリオに基づいてアラートルールを設計してください
- 誤検知を最小化する戦略を含めてください
- CI/CD設計
- セキュリティを考慮したCI/CDパイプラインを設計してください
- 各ステージの目的と実行内容を明記してください
確認事項
技術理解
- Redis を使用したキャッシュ実装ができる
- データベースパーティショニング戦略を説明できる
- ロードバランシングアルゴリズムの違いを理解している
- メトリクス収集とアラート設計ができる
- CI/CDパイプラインを構築できる
- コンテナー化のベストプラクティスを理解している
実装能力
- パフォーマンス最適化コードを書ける
- 監視システムを構築できる
- 自動化スクリプトを作成できる
- セキュリティを考慮した設定ができる
- スケーラブルなアーキテクチャを設計できる
推奨学習時間
- 理論学習: 6-8時間
- 実装演習: 12-16時間
- 運用演習: 4-6時間
合計: 22-30時間
📝 Chapter 5-3 学習まとめ
✅ 習得できたスキル
- ✅ エンタープライズ級スケーリング戦略・アーキテクチャ設計
- ✅ 高性能システムのパフォーマンス最適化・キャッシュ戦略
- ✅ 本番運用レベルの監視・アラート・自動化システム
- ✅ CI/CD・コンテナ化・DevOps による効率的運用体制
🎯 Chapter 5シリーズ最終達成状況
| Chapter | 主要テーマ | 技術習得レベル | 実用適用範囲 | |:——–|:———-|:————-|:————-| | 5-1 | APIサーバー基礎 | 🌱 FastAPI + SQLAlchemy | 小規模サービス・MVP | | 5-2 | マルチテナント | 🚀 企業分離・複雑権限 | 中規模SaaS・B2Bサービス | | 5-3 | スケーリング・運用 | 💪 エンタープライズ級 | 大規模サービス・国際展開 |
🔄 Part II(アーキテクチャパターン)完了
3つのアーキテクチャパターンの習得完了:
- ✅ Chapter 3: クライアントサイド(個人・小規模チーム向け)
- ✅ Chapter 4: Edge Functions(スタートアップ・中規模向け)
- ✅ Chapter 5: 独立APIサーバー(エンタープライズ・大規模向け)
🚀 次章予告:パフォーマンス最適化
Chapter 6では、「F1マシンのチューニング・エンジニア」レベルの最適化技術を学習します:
- ⚡ データベース最適化: インデックス・パーティション・クエリチューニング
- 🚀 API高速化: PostgREST設定・キャッシュ戦略・レスポンス最適化
- 📊 監視・測定: パフォーマンス計測・ボトルネック特定・継続改善
- 🔧 自動最適化: AI駆動の性能改善・プロアクティブなチューニング
💡 実装目標: 「ユーザーが体感する待ち時間を1秒未満に最適化する高速システム」
📍 ナビゲーション
- 📚 目次: 📖 学習ガイド
- ⬅️ 前の章: Chapter 5-2: マルチテナンシーと複雑ビジネスロジック
- ➡️ 次の章: Chapter 6: パフォーマンス最適化
-
🏠 関連章: Chapter 5-1: APIサーバー基礎 Chapter 8: 運用監視 -
🔧 リソース: スケーリング設計 運用チェックリスト