Chapter 5-3: 拡張性設計とパフォーマンス最適化 📈


📚 目次に戻る: 📖 学習ガイド
⬅️ 前の章: Chapter 5-2: マルチテナンシーと複雑ビジネスロジック
➡️ 次の章: Chapter 6: パフォーマンス最適化
🏗️ アーキテクチャ: 独立APIサーバー(FastAPI + スケーリング・最適化)
🎯 学習レベル: 🌱 基礎 | 🚀 応用 | 💪 発展
⏱️ 推定学習時間: 5-7時間
📝 難易度: 上級(5-1,5-2完了・インフラ・運用知識必要) —

🎯 この章で学ぶこと(初心者向け)

この章では、Chapter 5-1, 5-2で構築したSaaSプラットフォームを、「全国チェーン展開」レベルまで成長させる技術を学びます。

  • 🌱 初心者: システムが重くならない仕組み(キャッシュ・最適化)がわかる
  • 🚀 中級者: 大量アクセスに耐えるスケーリング戦略がわかる
  • 💪 上級者: エンタープライズ級の監視・運用システムが構築できる

💡 まずは身近な例から:「全国チェーン展開する人気ラーメン店」

想像してみてください。あなたの小さなラーメン店が大人気になって、全国チェーン展開することになりました:

🍜 人気ラーメンチェーン「麺屋サプリ」の成長ストーリー

📍 Phase 1: 個人店(現在のシステム)
├── 👨‍🍳 店長1人:注文から配膳まで全て対応
├── 🪑 座席10席:お客さん10人まで
└── 📝 手書き注文:メモ帳で管理

📍 Phase 2: 地域チェーン(スケールアップ)
├── 👥 スタッフ5人:役割分担で効率化
├── 🪑 座席30席:より多くのお客さんに対応
└── 📱 デジタル注文:タブレットで管理

📍 Phase 3: 全国チェーン(スケールアウト)
├── 🏢 50店舗:各地に支店を展開
├── 🏭 セントラルキッチン:効率的な仕込み
├── 📊 本部システム:全店舗の一元管理
└── 🤖 自動化:注文・在庫・売上の自動化

🤔 なぜスケーリングが必要?

小さなシステムと大規模システムでは、まったく違うアプローチが必要です:

課題 個人店レベル 地域チェーンレベル 全国チェーンレベル 解決方法
🐌 処理速度 手作業でも十分 タブレット導入 高速システム必須 キャッシュ・データベース最適化
👥 利用者数 常連10人 地域住民100人 全国10万人 ロードバランシング・サーバー増設
📊 データ量 手書きメモ 1日100件の注文 1日10万件の注文 データベース分散・アーカイブ
🚨 障害対応 店長が対応 マネージャーが対応 24時間監視体制 自動監視・アラートシステム
🔧 メンテナンス 閉店時に対応 深夜に対応 無停止更新 CI/CD・ブルーグリーンデプロイ

🎉 最適化されたシステムなら…

flowchart TD
    A[🌍 全国のお客さん] --> B[🌐 ロードバランサー<br/>注文を適切に振り分け]
    B --> C[🖥️ サーバー1<br/>東日本担当]
    B --> D[🖥️ サーバー2<br/>西日本担当]
    B --> E[🖥️ サーバー3<br/>予備サーバー]
    
    C --> F[⚡ Redis キャッシュ<br/>よく注文される商品を高速表示]
    D --> F
    E --> F
    
    F --> G[🗄️ メインデータベース<br/>全ての注文・顧客データ]
    F --> H[📊 分析データベース<br/>売上・統計データ]
    
    I[🤖 監視システム] --> C
    I --> D
    I --> E
    I --> J[📧 障害時に自動メール通知]

メリット

  • 高速処理: キャッシュでよく使うデータを瞬時に表示
  • 🏔️ 高可用性: サーバーが落ちても他のサーバーで継続
  • 📈 自動スケール: アクセス増加時に自動でサーバー追加
  • 🔍 全自動監視: 問題発生時に即座に通知・対応

🔍 実際のコードを見てみよう!

📄 Step 1: 高速化の仕組み(Redis キャッシュ)

まず、「よく注文される商品を瞬時に表示する」仕組みを見てみましょう:

# src/chapter05-saas-platform/backend/app/core/cache.py(重要部分を抜粋)

import redis
import json
from datetime import timedelta
from typing import Any, Optional
from functools import wraps

class CacheManager:
    """高速キャッシュ管理システム(コンビニの冷蔵庫みたいな仕組み)"""
    
    def __init__(self):
        # Redis接続(超高速なデータ保存場所)
        self.redis_client = redis.from_url(
            settings.REDIS_URL,
            decode_responses=False,
            socket_keepalive=True,  # 接続を維持して高速化
            health_check_interval=30  # 30秒ごとに健康チェック
        )
        self.default_ttl = 3600  # 1時間でデータ自動削除
    
    def get(self, key: str, default: Any = None) -> Any:
        """データを高速取得(冷蔵庫からドリンクを取り出す感じ)"""
        try:
            cached_data = self.redis_client.get(self._make_key(key))
            if cached_data is None:
                return default  # キャッシュにない場合はデフォルト値
            
            # データを元の形に復元
            try:
                return json.loads(cached_data)  # JSON形式で保存されている場合
            except json.JSONDecodeError:
                import pickle
                return pickle.loads(cached_data)  # バイナリ形式の場合
                
        except Exception as e:
            print(f"キャッシュ取得エラー: {e}")
            return default
    
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """データを高速保存(冷蔵庫にドリンクを入れる感じ)"""
        try:
            # 保存期間設定
            if ttl is None:
                ttl = self.default_ttl  # デフォルト1時間
            elif isinstance(ttl, timedelta):
                ttl = int(ttl.total_seconds())
            
            # データを保存用に変換
            try:
                serialized_value = json.dumps(value, default=str)
            except (TypeError, ValueError):
                import pickle
                serialized_value = pickle.dumps(value)
            
            # Redis に保存(期限付き)
            return self.redis_client.setex(
                self._make_key(key),
                ttl,
                serialized_value
            )
            
        except Exception as e:
            print(f"キャッシュ保存エラー: {e}")
            return False
    
    def get_or_set(self, key: str, callable_obj, ttl: Optional[int] = None) -> Any:
        """キャッシュがあれば使う、なければ計算して保存"""
        # まずキャッシュを確認
        cached_value = self.get(key)
        if cached_value is not None:
            return cached_value  # キャッシュヒット!高速で返す
        
        # キャッシュになければ、時間のかかる処理を実行
        fresh_value = callable_obj()
        self.set(key, fresh_value, ttl)  # 次回のために保存
        return fresh_value
    
    def _make_key(self, key: str) -> str:
        """キー名を統一(アプリ名を付けて他と区別)"""
        return f"{settings.PROJECT_NAME}:{key}"

# グローバルキャッシュ(どこからでも使える)
cache = CacheManager()

# 魔法のデコレーター(関数の結果を自動キャッシュ)
def cached(ttl: Optional[int] = None):
    """関数の結果を自動でキャッシュするデコレーター"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 引数からキャッシュキーを生成
            import hashlib
            args_str = str(args) + str(sorted(kwargs.items()))
            cache_key = f"{func.__name__}:{hashlib.md5(args_str.encode()).hexdigest()}"
            
            # キャッシュがあれば使う、なければ実行して保存
            return cache.get_or_set(cache_key, lambda: func(*args, **kwargs), ttl)
        return wrapper
    return decorator

# 使用例:重い計算を1回だけ実行
@cached(ttl=600)  # 10分間キャッシュ
def get_user_dashboard_data(user_id: int):
    """ユーザーダッシュボードの重いデータ取得(最初だけ時間かかる)"""
    print(f"重い計算実行中... ユーザー {user_id}")
    
    # 重い処理(データベース検索、集計など)
    # 2回目以降は瞬時に返ってくる!
    time.sleep(2)  # 2秒の重い処理をシミュレート
    
    return {
        "user_id": user_id,
        "total_projects": 15,
        "completed_tasks": 127,
        "team_members": 8
    }

🔰 初心者向け解説

概念 何をしているか 身近な例
Redis 超高速でデータを保存・取得できる場所 コンビニの冷蔵庫(よく売れる商品を手前に)
TTL(生存時間) データを自動で削除する期限 食材の賞味期限
get_or_set() あれば使う、なければ作って保存 「冷蔵庫にあれば取る、なければ作って入れる」
@cached デコレーター 関数に魔法をかけて自動でキャッシュ 「1回作ったら同じ料理は冷凍保存」
cache_key データを識別するラベル 冷凍庫の食材に貼るラベル

📄 Step 2: データベース最適化(大量データを賢く管理)

大量のお客さんが来ても、データベースが重くならない仕組みを見てみましょう:

# src/chapter05-saas-platform/backend/app/core/database_optimization.py(重要部分を抜粋)

from sqlalchemy import event, text
from sqlalchemy.engine import Engine
import time
import logging

class DatabaseOptimizer:
    """データベース最適化システム(図書館の司書さんみたいな仕組み)"""
    
    def __init__(self, engine):
        self.engine = engine
        self._setup_query_monitoring()  # クエリ監視を開始
    
    def _setup_query_monitoring(self):
        """遅いクエリを自動で見つける仕組み"""
        
        @event.listens_for(Engine, "before_cursor_execute")
        def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            # クエリ開始時刻を記録
            context._query_start_time = time.time()

        @event.listens_for(Engine, "after_cursor_execute")
        def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            # クエリ実行時間を計算
            total = time.time() - context._query_start_time
            
            # 1秒以上かかったクエリを警告(お客さんを待たせるレベル)
            if total > 1.0:
                logging.warning(
                    f"🐌 遅いクエリ発見!実行時間: {total:.2f}\n"
                    f"SQL: {statement}\n"
                    f"パラメータ: {parameters}"
                )
    
    def create_connection_pool(self):
        """コネクションプール設定(窓口カウンターの効率化)"""
        
        # データベース接続を使い回す仕組み(窓口の人数を調整)
        from sqlalchemy import create_engine
        from sqlalchemy.pool import QueuePool
        
        engine = create_engine(
            "postgresql://user:pass@localhost/db",
            
            # プール設定(銀行窓口の人数管理みたいな感じ)
            poolclass=QueuePool,
            pool_size=10,           # 常時待機する窓口数
            max_overflow=20,        # 混雑時に増やせる窓口数
            pool_timeout=30,        # 窓口待ちの最大時間(秒)
            pool_recycle=3600,      # 1時間で窓口担当者を交代
            
            # 接続オプション
            echo=False,             # SQLログ出力(開発時はTrue)
            future=True             # 新しいAPIを使用
        )
        
        return engine
    
    def setup_database_partitioning(self):
        """テーブル分割設定(大きな本棚を分割管理)"""
        
        # 日付ベースでテーブル分割(月ごとの帳簿みたいに分ける)
        partition_sql = """
        -- タスクテーブルを月別に分割
        CREATE TABLE tasks_y2024m01 PARTITION OF tasks
            FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
        
        CREATE TABLE tasks_y2024m02 PARTITION OF tasks  
            FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
        
        -- 古いパーティションの自動削除(古い帳簿の廃棄)
        CREATE OR REPLACE FUNCTION cleanup_old_partitions()
        RETURNS void AS $$
        DECLARE
            partition_name text;
        BEGIN
            -- 1年以上古いパーティションを削除
            FOR partition_name IN
                SELECT schemaname||'.'||tablename 
                FROM pg_tables 
                WHERE tablename LIKE 'tasks_y%' 
                AND tablename < 'tasks_y' || to_char(CURRENT_DATE - INTERVAL '1 year', 'YYYY"m"MM')
            LOOP
                EXECUTE 'DROP TABLE IF EXISTS ' || partition_name;
                RAISE NOTICE 'Dropped old partition: %', partition_name;
            END LOOP;
        END;
        $$ LANGUAGE plpgsql;
        """
        
        with self.engine.connect() as conn:
            conn.execute(text(partition_sql))
            conn.commit()

🔰 初心者向け解説

概念 何をしているか 身近な例
@event.listens_for データベース操作を自動で監視 図書館で「本を借りる時間」を自動計測
コネクションプール データベース接続を使い回し 銀行窓口の人数を効率的に管理
pool_size=10 常時待機する接続数 平常時の窓口担当者10人
max_overflow=20 混雑時に増やせる接続数 忙しい時だけ追加で20人投入
テーブル分割 大きなテーブルを月別に分ける 年鑑を月別に分けて保管

📄 Step 3: ロードバランサー実装(お客さんの案内係)

複数のサーバーに上手にお客さんを振り分ける仕組みを見てみましょう:

# src/chapter05-saas-platform/backend/app/core/load_balancer.py(実装例)

import random
import time
from typing import List, Dict, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import asyncio
import aiohttp

@dataclass
class ServerNode:
    """サーバーノード情報(レストランの各店舗)"""
    id: str
    host: str
    port: int
    weight: int = 1              # 処理能力の重み(大きな店舗は重み大)
    is_healthy: bool = True      # サーバーの健康状態
    current_connections: int = 0  # 現在の接続数
    last_health_check: float = field(default_factory=time.time)
    response_time: float = 0.0   # 平均応答時間

class LoadBalancingStrategy(Enum):
    """負荷分散方式"""
    ROUND_ROBIN = "round_robin"           # 順番に案内
    WEIGHTED_ROUND_ROBIN = "weighted"     # 能力に応じて案内  
    LEAST_CONNECTIONS = "least_conn"      # 空いてる店舗優先
    RANDOM = "random"                     # ランダム案内

class LoadBalancer:
    """ロードバランサー(チェーン店の案内係)"""
    
    def __init__(self, strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN):
        self.servers: List[ServerNode] = []
        self.strategy = strategy
        self.current_index = 0  # Round Robin用のカウンター
        self.health_check_interval = 30  # 30秒ごとに健康診断
        
    def add_server(self, server: ServerNode):
        """サーバーを追加(新店舗オープン)"""
        self.servers.append(server)
        print(f"🏪 新店舗追加: {server.host}:{server.port} (重み: {server.weight})")
    
    def get_next_server(self) -> Optional[ServerNode]:
        """次のサーバーを選択(お客さんをどの店舗に案内するか)"""
        
        # 健康なサーバーだけフィルタリング
        healthy_servers = [s for s in self.servers if s.is_healthy]
        
        if not healthy_servers:
            print("⚠️ 利用可能な店舗がありません!")
            return None
        
        if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
            return self._round_robin_select(healthy_servers)
        elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
            return self._weighted_round_robin_select(healthy_servers)
        elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
            return self._least_connections_select(healthy_servers)
        else:  # RANDOM
            return random.choice(healthy_servers)
    
    def _round_robin_select(self, servers: List[ServerNode]) -> ServerNode:
        """順番選択(順番に案内)"""
        if not servers:
            return None
            
        # 現在のインデックスを更新
        server = servers[self.current_index % len(servers)]
        self.current_index += 1
        
        print(f"🎯 Round Robin: {server.host}:{server.port} を選択")
        return server
    
    def _weighted_round_robin_select(self, servers: List[ServerNode]) -> ServerNode:
        """重み付き選択(大きな店舗により多くのお客さんを案内)"""
        
        # 重みの合計を計算
        total_weight = sum(server.weight for server in servers)
        
        # ランダムな数値を重みで選択
        random_value = random.randint(1, total_weight)
        current_weight = 0
        
        for server in servers:
            current_weight += server.weight
            if random_value <= current_weight:
                print(f"🎯 重み付き選択: {server.host}:{server.port} (重み: {server.weight})")
                return server
        
        # フォールバック
        return servers[0]
    
    def _least_connections_select(self, servers: List[ServerNode]) -> ServerNode:
        """最小接続選択(一番空いてる店舗に案内)"""
        
        # 接続数が最小のサーバーを選択
        least_busy_server = min(servers, key=lambda s: s.current_connections)
        
        print(f"🎯 最小接続: {least_busy_server.host}:{least_busy_server.port} "
              f"(接続数: {least_busy_server.current_connections})")
        
        return least_busy_server
    
    async def health_check_loop(self):
        """定期健康診断(各店舗の状況チェック)"""
        
        while True:
            print("🏥 サーバー健康診断を開始...")
            
            # 各サーバーの健康状態をチェック
            for server in self.servers:
                is_healthy = await self._check_server_health(server)
                
                if server.is_healthy != is_healthy:
                    # 状態が変わった場合の通知
                    status = "回復" if is_healthy else "ダウン"
                    print(f"📢 {server.host}:{server.port}{status}しました")
                
                server.is_healthy = is_healthy
                server.last_health_check = time.time()
            
            # 健康な店舗数をレポート
            healthy_count = sum(1 for s in self.servers if s.is_healthy)
            print(f"📊 健康な店舗: {healthy_count}/{len(self.servers)}")
            
            # 次の健康診断まで待機
            await asyncio.sleep(self.health_check_interval)
    
    async def _check_server_health(self, server: ServerNode) -> bool:
        """個別サーバー健康診断(店舗の営業確認)"""
        
        try:
            start_time = time.time()
            
            # HTTPでサーバーの生存確認
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"http://{server.host}:{server.port}/health",
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    
                    # 応答時間を記録
                    server.response_time = time.time() - start_time
                    
                    # HTTPステータスが200番台なら健康
                    is_healthy = 200 <= response.status < 300
                    
                    if is_healthy:
                        print(f"✅ {server.host}:{server.port} は健康です (応答時間: {server.response_time:.2f}秒)")
                    else:
                        print(f"❌ {server.host}:{server.port} エラー応答: {response.status}")
                    
                    return is_healthy
                    
        except asyncio.TimeoutError:
            print(f"⏰ {server.host}:{server.port} タイムアウト")
            return False
        except Exception as e:
            print(f"💥 {server.host}:{server.port} 接続エラー: {e}")
            return False
    
    def get_load_balancer_stats(self) -> Dict[str, Any]:
        """ロードバランサーの統計情報"""
        
        total_servers = len(self.servers)
        healthy_servers = sum(1 for s in self.servers if s.is_healthy)
        total_connections = sum(s.current_connections for s in self.servers)
        avg_response_time = sum(s.response_time for s in self.servers) / total_servers if total_servers > 0 else 0
        
        return {
            "strategy": self.strategy.value,
            "total_servers": total_servers,
            "healthy_servers": healthy_servers,
            "total_connections": total_connections,
            "average_response_time": avg_response_time,
            "servers": [
                {
                    "id": s.id,
                    "host": f"{s.host}:{s.port}",
                    "weight": s.weight,
                    "is_healthy": s.is_healthy,
                    "connections": s.current_connections,
                    "response_time": s.response_time
                }
                for s in self.servers
            ]
        }

# 実際の使用例
async def setup_load_balancer():
    """ロードバランサーセットアップ例"""
    
    # ロードバランサーを作成
    lb = LoadBalancer(LoadBalancingStrategy.LEAST_CONNECTIONS)
    
    # サーバーを追加(チェーン店舗追加)
    lb.add_server(ServerNode("app1", "192.168.1.10", 8000, weight=1))
    lb.add_server(ServerNode("app2", "192.168.1.11", 8000, weight=2))  # 高性能サーバー
    lb.add_server(ServerNode("app3", "192.168.1.12", 8000, weight=1))
    
    # 健康診断を開始
    health_task = asyncio.create_task(lb.health_check_loop())
    
    return lb, health_task

🔰 初心者向け解説

概念 何をしているか 身近な例
ロードバランサー お客さんを効率よく店舗に案内 ファストフード店の案内係
Round Robin 順番に案内する方式 「1番窓口、次は2番窓口」の順番案内
重み付き選択 大きな店舗により多く案内 「大型店には多く、小型店には少なく」
最小接続 一番空いてる店舗に案内 「今一番空いてるレジに並んでください」
ヘルスチェック 店舗が営業中か定期確認 30秒ごとに「営業してますか?」と確認

🔍 監視・運用システム(24時間見守るシステム)

📄 Step 4: システム監視とメトリクス収集

全国チェーンになったら、各店舗の状況をリアルタイムで監視する「本部の監視センター」が必要になります:

# src/chapter05-saas-platform/backend/app/core/monitoring.py(監視システム)

import psutil
import time
from typing import Dict, Any, List
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio
import json

@dataclass
class SystemMetrics:
    """システム指標(店舗の状況報告書)"""
    timestamp: datetime
    cpu_percent: float          # CPU使用率(コックさんの忙しさ)
    memory_percent: float       # メモリ使用率(冷蔵庫の満杯度)
    disk_percent: float         # ディスク使用率(倉庫の満杯度)
    network_io: Dict[str, int]  # ネットワークIO(お客さんの出入り)
    active_connections: int     # アクティブ接続数(店内のお客さん数)
    response_time: float        # 平均応答時間(注文から提供までの時間)

class MetricsCollector:
    """メトリクス収集システム(本部の情報収集部門)"""
    
    def __init__(self):
        self.metrics_history: List[SystemMetrics] = []
        self.collection_interval = 10  # 10秒ごとに収集
        self.retention_hours = 24      # 24時間分保持
        
    async def start_collection(self):
        """メトリクス収集開始(定期的な店舗巡回)"""
        print("📊 システム監視を開始します...")
        
        while True:
            try:
                # 現在のシステム状況を収集
                metrics = self._collect_current_metrics()
                
                # 履歴に追加
                self.metrics_history.append(metrics)
                
                # 古いデータを削除(24時間以上前)
                self._cleanup_old_metrics()
                
                # 異常検知
                await self._detect_anomalies(metrics)
                
                # 次の収集まで待機
                await asyncio.sleep(self.collection_interval)
                
            except Exception as e:
                print(f"❌ メトリクス収集エラー: {e}")
                await asyncio.sleep(5)  # エラー時は少し待つ
    
    def _collect_current_metrics(self) -> SystemMetrics:
        """現在のシステム指標を収集(店舗の現状調査)"""
        
        # CPU使用率(コックさんがどれだけ忙しいか)
        cpu_percent = psutil.cpu_percent(interval=1)
        
        # メモリ使用率(冷蔵庫の満杯度)
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        
        # ディスク使用率(倉庫の満杯度)
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        
        # ネットワークIO(お客さんの出入り状況)
        network = psutil.net_io_counters()
        network_io = {
            "bytes_sent": network.bytes_sent,
            "bytes_recv": network.bytes_recv,
            "packets_sent": network.packets_sent,
            "packets_recv": network.packets_recv
        }
        
        # アクティブ接続数(店内のお客さん数)
        active_connections = len(psutil.net_connections(kind='inet'))
        
        # 応答時間は別途測定(簡略化)
        response_time = self._measure_response_time()
        
        metrics = SystemMetrics(
            timestamp=datetime.utcnow(),
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            disk_percent=disk_percent,
            network_io=network_io,
            active_connections=active_connections,
            response_time=response_time
        )
        
        # 収集した情報をログ出力
        print(f"📈 {metrics.timestamp.strftime('%H:%M:%S')}: "
              f"CPU:{cpu_percent:.1f}% "
              f"MEM:{memory_percent:.1f}% "
              f"接続:{active_connections}件")
        
        return metrics
    
    def _measure_response_time(self) -> float:
        """応答時間測定(注文から提供までの時間)"""
        # 実際の実装では、アプリケーションレベルで測定
        # ここでは簡略化
        return 0.1  # 100ms
    
    async def _detect_anomalies(self, metrics: SystemMetrics):
        """異常検知(問題の早期発見)"""
        
        warnings = []
        
        # CPU使用率が80%超過(コックさんが疲れ気味)
        if metrics.cpu_percent > 80:
            warnings.append(f"⚠️ CPU使用率が高いです: {metrics.cpu_percent:.1f}%")
        
        # メモリ使用率が85%超過(冷蔵庫がほぼ満杯)
        if metrics.memory_percent > 85:
            warnings.append(f"⚠️ メモリ使用率が高いです: {metrics.memory_percent:.1f}%")
        
        # ディスク使用率が90%超過(倉庫がほぼ満杯)
        if metrics.disk_percent > 90:
            warnings.append(f"⚠️ ディスク使用率が高いです: {metrics.disk_percent:.1f}%")
        
        # 応答時間が1秒超過(お客さんを待たせすぎ)
        if metrics.response_time > 1.0:
            warnings.append(f"⚠️ 応答時間が遅いです: {metrics.response_time:.2f}秒")
        
        # 警告がある場合はアラート送信
        if warnings:
            await self._send_alert(warnings)
    
    async def _send_alert(self, warnings: List[str]):
        """アラート送信(緊急事態の通知)"""
        
        alert_message = "🚨 システム警告:\n" + "\n".join(warnings)
        print(alert_message)
        
        # 実際の実装では以下のような通知を送信:
        # - Slack通知
        # - メール通知
        # - PagerDuty等のインシデント管理システム
        # - SMS通知(重要な障害の場合)
        
        # 例: Slack通知(実装例)
        # await self._send_slack_notification(alert_message)
    
    def _cleanup_old_metrics(self):
        """古いメトリクスデータの削除(古い資料の廃棄)"""
        
        cutoff_time = datetime.utcnow() - timedelta(hours=self.retention_hours)
        
        # 24時間より古いデータを削除
        self.metrics_history = [
            m for m in self.metrics_history 
            if m.timestamp > cutoff_time
        ]
    
    def get_metrics_summary(self, hours: int = 1) -> Dict[str, Any]:
        """指定時間のメトリクス集計(期間レポート)"""
        
        since_time = datetime.utcnow() - timedelta(hours=hours)
        recent_metrics = [
            m for m in self.metrics_history 
            if m.timestamp > since_time
        ]
        
        if not recent_metrics:
            return {"error": "No metrics available"}
        
        # 平均値、最大値、最小値を計算
        cpu_values = [m.cpu_percent for m in recent_metrics]
        memory_values = [m.memory_percent for m in recent_metrics]
        response_times = [m.response_time for m in recent_metrics]
        
        return {
            "period_hours": hours,
            "data_points": len(recent_metrics),
            "cpu": {
                "avg": sum(cpu_values) / len(cpu_values),
                "max": max(cpu_values),
                "min": min(cpu_values)
            },
            "memory": {
                "avg": sum(memory_values) / len(memory_values),
                "max": max(memory_values),
                "min": min(memory_values)
            },
            "response_time": {
                "avg": sum(response_times) / len(response_times),
                "max": max(response_times),
                "min": min(response_times)
            },
            "peak_connections": max(m.active_connections for m in recent_metrics)
        }

class AlertManager:
    """アラート管理システム(緊急事態管理部門)"""
    
    def __init__(self):
        self.alert_rules = {
            "cpu_critical": {"threshold": 90, "severity": "critical"},
            "memory_critical": {"threshold": 95, "severity": "critical"},
            "disk_critical": {"threshold": 95, "severity": "critical"},
            "response_time_warning": {"threshold": 2.0, "severity": "warning"}
        }
        self.alert_history = []
    
    async def process_metrics(self, metrics: SystemMetrics):
        """メトリクスからアラートをチェック"""
        
        alerts = []
        
        # CPU使用率チェック
        if metrics.cpu_percent >= self.alert_rules["cpu_critical"]["threshold"]:
            alerts.append({
                "rule": "cpu_critical",
                "message": f"CPU使用率が危険レベル: {metrics.cpu_percent:.1f}%",
                "severity": "critical",
                "timestamp": metrics.timestamp
            })
        
        # メモリ使用率チェック
        if metrics.memory_percent >= self.alert_rules["memory_critical"]["threshold"]:
            alerts.append({
                "rule": "memory_critical", 
                "message": f"メモリ使用率が危険レベル: {metrics.memory_percent:.1f}%",
                "severity": "critical",
                "timestamp": metrics.timestamp
            })
        
        # 各アラートを処理
        for alert in alerts:
            await self._handle_alert(alert)
    
    async def _handle_alert(self, alert: Dict[str, Any]):
        """個別アラート処理"""
        
        # アラート履歴に記録
        self.alert_history.append(alert)
        
        # 重要度に応じた処理
        if alert["severity"] == "critical":
            print(f"🚨 【緊急】{alert['message']}")
            # 即座に管理者に通知
            await self._send_critical_notification(alert)
        else:
            print(f"⚠️ 【警告】{alert['message']}")
            # 通常の警告通知
            await self._send_warning_notification(alert)
    
    async def _send_critical_notification(self, alert):
        """緊急通知送信"""
        # SMS、電話、Slack即座通知など
        pass
    
    async def _send_warning_notification(self, alert):
        """警告通知送信"""
        # メール、Slack通知など
        pass

# 実際の使用例
async def setup_monitoring():
    """監視システムセットアップ"""
    
    collector = MetricsCollector()
    alert_manager = AlertManager()
    
    # 監視開始
    monitor_task = asyncio.create_task(collector.start_collection())
    
    return collector, alert_manager, monitor_task

🔰 初心者向け解説

概念 何をしているか 身近な例
psutil システムの状況を調べるツール 体温計や血圧計(システムの健康診断)
CPU使用率 コンピューターがどれだけ忙しいか コックさんがどれだけ忙しく働いているか
メモリ使用率 作業用メモリがどれだけ使われているか 冷蔵庫がどれだけ満杯か
アクティブ接続数 同時に利用している人数 店内にいるお客さんの数
異常検知 いつもと違う状況を自動で見つける 「お客さんが多すぎて待ち時間が長い」を自動発見

📄 Step 5: アラートシステムとログ管理

問題が起きた時に、すぐに管理者に通知する仕組みを作りましょう:

# src/chapter05-saas-platform/backend/app/core/alerting.py(アラートシステム)

import asyncio
import aiohttp
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
import json

class AlertSeverity(Enum):
    """アラートの重要度(緊急度ランク)"""
    INFO = "info"         # 情報(お知らせ程度)
    WARNING = "warning"   # 警告(注意が必要)
    CRITICAL = "critical" # 緊急(すぐ対応必要)
    FATAL = "fatal"       # 致命的(システム停止レベル)

@dataclass
class Alert:
    """アラート情報(緊急事態報告書)"""
    id: str
    title: str
    message: str
    severity: AlertSeverity
    timestamp: datetime
    source: str           # どのシステムからの警告か
    metadata: Dict[str, Any] = None

class NotificationChannel:
    """通知チャンネル(連絡方法)"""
    
    async def send_slack_notification(self, alert: Alert, webhook_url: str):
        """Slack通知(チャット通知)"""
        
        # 重要度に応じて色を変更
        color_map = {
            AlertSeverity.INFO: "#36a64f",      # 緑
            AlertSeverity.WARNING: "#ff9500",   # オレンジ
            AlertSeverity.CRITICAL: "#ff0000",  # 赤
            AlertSeverity.FATAL: "#8B0000"      # 暗赤
        }
        
        # 重要度に応じて絵文字を変更
        emoji_map = {
            AlertSeverity.INFO: "ℹ️",
            AlertSeverity.WARNING: "⚠️", 
            AlertSeverity.CRITICAL: "🚨",
            AlertSeverity.FATAL: "💀"
        }
        
        payload = {
            "attachments": [
                {
                    "color": color_map[alert.severity],
                    "title": f"{emoji_map[alert.severity]} {alert.title}",
                    "text": alert.message,
                    "fields": [
                        {
                            "title": "重要度",
                            "value": alert.severity.value.upper(),
                            "short": True
                        },
                        {
                            "title": "発生時刻",
                            "value": alert.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                            "short": True
                        },
                        {
                            "title": "発生源",
                            "value": alert.source,
                            "short": True
                        }
                    ],
                    "footer": "SaaS Platform Monitoring",
                    "ts": alert.timestamp.timestamp()
                }
            ]
        }
        
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(webhook_url, json=payload) as response:
                    if response.status == 200:
                        print(f"✅ Slack通知送信成功: {alert.title}")
                    else:
                        print(f"❌ Slack通知送信失敗: {response.status}")
                        
        except Exception as e:
            print(f"💥 Slack通知エラー: {e}")
    
    async def send_email_notification(self, alert: Alert, email_config: Dict[str, str]):
        """メール通知(メール通知)"""
        
        try:
            # メール作成
            msg = MIMEMultipart()
            msg['From'] = email_config['from_email']
            msg['To'] = email_config['to_email']
            msg['Subject'] = f"[{alert.severity.value.upper()}] {alert.title}"
            
            # HTML本文作成
            html_body = f"""
            <html>
            <body>
                <h2 style="color: {'red' if alert.severity in [AlertSeverity.CRITICAL, AlertSeverity.FATAL] else 'orange'};">
                    システム警告
                </h2>
                
                <table style="border-collapse: collapse; width: 100%;">
                    <tr>
                        <td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">タイトル</td>
                        <td style="border: 1px solid #ddd; padding: 8px;">{alert.title}</td>
                    </tr>
                    <tr>
                        <td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">重要度</td>
                        <td style="border: 1px solid #ddd; padding: 8px;">{alert.severity.value.upper()}</td>
                    </tr>
                    <tr>
                        <td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">発生時刻</td>
                        <td style="border: 1px solid #ddd; padding: 8px;">{alert.timestamp.strftime("%Y-%m-%d %H:%M:%S")}</td>
                    </tr>
                    <tr>
                        <td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">発生源</td>
                        <td style="border: 1px solid #ddd; padding: 8px;">{alert.source}</td>
                    </tr>
                </table>
                
                <h3>詳細メッセージ</h3>
                <p>{alert.message}</p>
                
                <hr>
                <p><small>このメールは自動送信されました。</small></p>
            </body>
            </html>
            """
            
            msg.attach(MIMEText(html_body, 'html'))
            
            # SMTP送信
            server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
            server.starttls()
            server.login(email_config['username'], email_config['password'])
            server.send_message(msg)
            server.quit()
            
            print(f"✅ メール通知送信成功: {alert.title}")
            
        except Exception as e:
            print(f"💥 メール通知エラー: {e}")

class LogManager:
    """ログ管理システム(お店の日報管理)"""
    
    def __init__(self, log_level: str = "INFO"):
        self.log_level = log_level
        self.log_buffer = []  # メモリ内ログバッファ
        
    def log(self, level: str, message: str, extra_data: Dict[str, Any] = None):
        """ログ記録(日報への記録)"""
        
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": level,
            "message": message,
            "extra_data": extra_data or {}
        }
        
        # メモリバッファに追加
        self.log_buffer.append(log_entry)
        
        # コンソール出力(開発時)
        timestamp_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp_str}] {level}: {message}")
        
        # ファイルに書き込み(実際の実装では)
        # self._write_to_file(log_entry)
        
        # 長期保存用にデータベースに保存(実際の実装では)
        # await self._save_to_database(log_entry)
    
    def info(self, message: str, **kwargs):
        """情報ログ"""
        self.log("INFO", message, kwargs)
    
    def warning(self, message: str, **kwargs):
        """警告ログ"""
        self.log("WARNING", message, kwargs)
    
    def error(self, message: str, **kwargs):
        """エラーログ"""
        self.log("ERROR", message, kwargs)
    
    def critical(self, message: str, **kwargs):
        """緊急ログ"""
        self.log("CRITICAL", message, kwargs)
    
    def get_recent_logs(self, hours: int = 1, level: Optional[str] = None) -> List[Dict[str, Any]]:
        """最近のログ取得(期間指定で日報確認)"""
        
        since_time = datetime.utcnow() - timedelta(hours=hours)
        
        filtered_logs = []
        for log_entry in self.log_buffer:
            log_time = datetime.fromisoformat(log_entry["timestamp"])
            
            # 時間範囲チェック
            if log_time < since_time:
                continue
            
            # レベルフィルター
            if level and log_entry["level"] != level:
                continue
            
            filtered_logs.append(log_entry)
        
        return filtered_logs

class AlertingSystem:
    """統合アラートシステム(緊急事態対応本部)"""
    
    def __init__(self):
        self.notification_channel = NotificationChannel()
        self.log_manager = LogManager()
        self.alert_history = []
        self.suppression_rules = {}  # アラート抑制ルール
        
    async def send_alert(
        self, 
        title: str, 
        message: str, 
        severity: AlertSeverity,
        source: str,
        slack_webhook: Optional[str] = None,
        email_config: Optional[Dict[str, str]] = None
    ):
        """アラート送信(緊急事態の発報)"""
        
        # アラート作成
        alert = Alert(
            id=f"{source}_{datetime.utcnow().timestamp()}",
            title=title,
            message=message,
            severity=severity,
            timestamp=datetime.utcnow(),
            source=source
        )
        
        # アラート抑制チェック(同じ警告の連発を防ぐ)
        if self._should_suppress_alert(alert):
            self.log_manager.info(f"アラート抑制: {alert.title}")
            return
        
        # アラート履歴に記録
        self.alert_history.append(alert)
        
        # ログに記録
        self.log_manager.log(
            level=severity.value.upper(),
            message=f"アラート発生: {title}",
            extra_data={"alert_id": alert.id, "source": source}
        )
        
        # 通知送信
        notifications = []
        
        if slack_webhook:
            notifications.append(
                self.notification_channel.send_slack_notification(alert, slack_webhook)
            )
        
        if email_config:
            notifications.append(
                self.notification_channel.send_email_notification(alert, email_config)
            )
        
        # 複数通知を並行実行
        if notifications:
            await asyncio.gather(*notifications, return_exceptions=True)
    
    def _should_suppress_alert(self, alert: Alert) -> bool:
        """アラート抑制判定(同じ警告の連発防止)"""
        
        # 過去5分間に同じタイプのアラートがあったら抑制
        five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
        
        for past_alert in self.alert_history:
            if (past_alert.timestamp > five_minutes_ago and 
                past_alert.title == alert.title and
                past_alert.source == alert.source):
                return True
        
        return False
    
    def get_alert_summary(self, hours: int = 24) -> Dict[str, Any]:
        """アラート集計(期間レポート)"""
        
        since_time = datetime.utcnow() - timedelta(hours=hours)
        recent_alerts = [
            alert for alert in self.alert_history 
            if alert.timestamp > since_time
        ]
        
        # 重要度別集計
        severity_counts = {}
        for severity in AlertSeverity:
            severity_counts[severity.value] = len([
                a for a in recent_alerts 
                if a.severity == severity
            ])
        
        # 発生源別集計
        source_counts = {}
        for alert in recent_alerts:
            source_counts[alert.source] = source_counts.get(alert.source, 0) + 1
        
        return {
            "period_hours": hours,
            "total_alerts": len(recent_alerts),
            "severity_breakdown": severity_counts,
            "source_breakdown": source_counts,
            "recent_alerts": [
                {
                    "title": alert.title,
                    "severity": alert.severity.value,
                    "timestamp": alert.timestamp.isoformat(),
                    "source": alert.source
                }
                for alert in recent_alerts[-10:]  # 最新10件
            ]
        }

# 実際の使用例
async def setup_alerting():
    """アラートシステムセットアップ"""
    
    alerting = AlertingSystem()
    
    # テスト警告送信
    await alerting.send_alert(
        title="CPU使用率が高いです",
        message="CPU使用率が85%を超えました。システムの負荷を確認してください。",
        severity=AlertSeverity.WARNING,
        source="monitoring_system",
        slack_webhook="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
    )
    
    return alerting

🔰 初心者向け解説

概念 何をしているか 身近な例
AlertSeverity 緊急度のランク分け 「お知らせ・注意・警報・緊急事態」の段階
Slack通知 チャットアプリへの即座通知 LINE で「お店が忙しいです!」と連絡
アラート抑制 同じ警告の連発を防ぐ 「5分以内の同じ警告は1回だけ」ルール
ログ管理 起こったことの記録保存 お店の日報(何時に何が起きたか記録)
HTML メール 見やすい形式のメール通知 カラフルで読みやすい緊急連絡メール

📄 Step 6: CI/CD パイプライン(自動デプロイシステム)

コードを更新したら自動でテスト・デプロイする仕組みを作りましょう:

# .github/workflows/deploy.yml(GitHub Actions設定)

name: SaaS Platform CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

env:
  PYTHON_VERSION: 3.11
  NODE_VERSION: 18
  REGISTRY: ghcr.io
  IMAGE_NAME: saas-platform

jobs:
  # 🧪 Step 1: テスト実行(品質チェック)
  test:
    runs-on: ubuntu-latest
    
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: test_db
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432
      
      redis:
        image: redis:7
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 6379:6379
    
    steps:
    - name: 📥 コードチェックアウト
      uses: actions/checkout@v3
    
    - name: 🐍 Python環境セットアップ
      uses: actions/setup-python@v4
      with:
        python-version: $
        cache: 'pip'
    
    - name: 📦 依存関係インストール
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install -r requirements-dev.txt
    
    - name: 🔍 コード品質チェック(リンター)
      run: |
        # フォーマットチェック
        black --check .
        
        # インポート整理チェック  
        isort --check-only .
        
        # 静的解析
        flake8 .
        
        # 型チェック
        mypy .
    
    - name: 🧪 単体テスト実行
      env:
        DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
        REDIS_URL: redis://localhost:6379/0
      run: |
        pytest tests/unit/ -v --cov=app --cov-report=xml
    
    - name: 🔧 統合テスト実行
      env:
        DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
        REDIS_URL: redis://localhost:6379/0
      run: |
        pytest tests/integration/ -v
    
    - name: 🏃‍♂️ API テスト実行
      env:
        DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
        REDIS_URL: redis://localhost:6379/0
      run: |
        # アプリケーション起動
        uvicorn app.main:app --host 0.0.0.0 --port 8000 &
        sleep 10
        
        # API テスト実行
        pytest tests/api/ -v
    
    - name: 📊 テストカバレッジアップロード
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml
        fail_ci_if_error: true

  # 🔨 Step 2: Docker イメージビルド
  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    
    outputs:
      image-tag: $
      image-digest: $
    
    steps:
    - name: 📥 コードチェックアウト
      uses: actions/checkout@v3
    
    - name: 🔧 Docker Buildx セットアップ
      uses: docker/setup-buildx-action@v2
    
    - name: 🔐 Container Registry ログイン
      uses: docker/login-action@v2
      with:
        registry: $
        username: $
        password: $
    
    - name: 🏷️ メタデータ生成
      id: meta
      uses: docker/metadata-action@v4
      with:
        images: $/$/$
        tags: |
          type=ref,event=branch
          type=ref,event=pr
          type=sha,prefix=-
          type=raw,value=latest,enable=
    
    - name: 🔨 Docker イメージビルド・プッシュ
      id: build
      uses: docker/build-push-action@v4
      with:
        context: .
        file: ./Dockerfile
        push: true
        tags: $
        labels: $
        cache-from: type=gha
        cache-to: type=gha,mode=max

  # 🚀 Step 3: 開発環境デプロイ
  deploy-dev:
    needs: [test, build]
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/develop'
    environment: development
    
    steps:
    - name: 🚀 開発環境デプロイ
      run: |
        echo "🌱 開発環境にデプロイ中..."
        
        # Kubernetesにデプロイ(例)
        # kubectl set image deployment/saas-platform \
        #   app=$ \
        #   --namespace=development
        
        # ヘルスチェック
        # kubectl rollout status deployment/saas-platform --namespace=development
        
        echo "✅ 開発環境デプロイ完了!"

  # 🌍 Step 4: 本番環境デプロイ
  deploy-prod:
    needs: [test, build]
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    
    steps:
    - name: 🔍 本番前チェック
      run: |
        echo "🔍 本番環境デプロイ前チェック..."
        
        # データベースマイグレーションのドライラン
        # alembic upgrade head --sql
        
        # 設定ファイルの妥当性チェック
        # python -c "from app.core.config import settings; print('設定OK')"
        
        echo "✅ 事前チェック完了"
    
    - name: 🚀 本番環境デプロイ
      run: |
        echo "🌍 本番環境にデプロイ中..."
        
        # ブルーグリーンデプロイ(例)
        # kubectl apply -f k8s/production/
        # kubectl set image deployment/saas-platform \
        #   app=$ \
        #   --namespace=production
        
        # ローリングアップデート完了待機
        # kubectl rollout status deployment/saas-platform --namespace=production
        
        echo "✅ 本番環境デプロイ完了!"
    
    - name: 🧪 本番環境ヘルスチェック
      run: |
        echo "🏥 本番環境ヘルスチェック..."
        
        # API ヘルスチェック
        # curl -f https://api.yourapp.com/health || exit 1
        
        # データベース接続チェック
        # python scripts/health_check.py
        
        echo "✅ 本番環境正常動作確認"
    
    - name: 📢 デプロイ通知
      if: always()
      run: |
        # Slack通知(実際の実装では)
        # curl -X POST -H 'Content-type: application/json' \
        #   --data '{"text":"🚀 本番環境デプロイ完了"}' \
        #   $
        
        echo "📢 関係者に通知送信完了"

  # 🔄 Step 5: データベースマイグレーション
  migrate:
    needs: deploy-prod
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    
    steps:
    - name: 📥 コードチェックアウト
      uses: actions/checkout@v3
    
    - name: 🐍 Python環境セットアップ
      uses: actions/setup-python@v4
      with:
        python-version: $
    
    - name: 📦 依存関係インストール
      run: |
        pip install alembic psycopg2-binary
    
    - name: 🗄️ データベースマイグレーション実行
      env:
        DATABASE_URL: $
      run: |
        echo "🗄️ データベースマイグレーション実行..."
        
        # マイグレーション実行
        alembic upgrade head
        
        echo "✅ マイグレーション完了"

🔰 初心者向け解説

概念 何をしているか 身近な例
CI/CD コード更新→テスト→デプロイを自動化 料理のレシピ通りに自動で調理する機械
GitHub Actions GitHubでのワークフロー自動実行 「コードが更新されたら自動でテスト開始」
pytest Pythonのテスト実行ツール 料理が美味しくできたかの味見
Docker アプリを箱詰めして動かす技術 「どこでも同じ環境で動く」お弁当箱
ブルーグリーンデプロイ 2つの環境を使って安全にデプロイ 新店舗を準備してからお客さんを案内

まとめ

Chapter 5-3では、大規模なSaaSプラットフォームを支えるスケーリング戦略とパフォーマンス最適化を実装しました。

実装した機能:

  • Redis による高速キャッシングシステム
  • データベース最適化とコネクションプール管理
  • ロードバランサーによる負荷分散
  • リアルタイム監視とメトリクス収集
  • アラートシステムとログ管理
  • CI/CD パイプラインによる自動デプロイ

技術的な学習ポイント:

  • 高トラフィック対応のアーキテクチャ設計
  • 監視・運用システムの構築
  • 自動化による運用効率化
  • インフラレベルでの可用性確保

次章予告: Chapter 6では、セキュリティとコンプライアンス対応を深掘りします。

def get_connection_pool_status(self):
    """データベース接続の使用状況確認(駐車場の空き状況みたいな)"""
    pool = self.engine.pool
    return {
        "total_size": pool.size(),           # 総駐車台数
        "checked_in": pool.checkedin(),      # 空いている台数
        "checked_out": pool.checkedout(),    # 使用中の台数
        "overflow": pool.overflow(),         # 臨時駐車台数
        "status": f"{pool.checkedout()}/{pool.size() + pool.overflow()}"  # 使用率
    }

def analyze_table_sizes(self):
    """どのテーブルが重いかを調査(図書館のどの棚が重いか確認)"""
    with self.engine.connect() as conn:
        # テーブルサイズ情報を取得
        result = conn.execute(text("""
            SELECT 
                tablename as table_name,
                pg_size_pretty(pg_total_relation_size('public.'||tablename)) as size,
                pg_total_relation_size('public.'||tablename) as size_bytes
            FROM pg_tables 
            WHERE schemaname = 'public'
            ORDER BY pg_total_relation_size('public.'||tablename) DESC
            LIMIT 10;
        """))
        
        table_sizes = []
        for row in result:
            table_sizes.append({
                'table_name': row.table_name,
                'readable_size': row.size,  # "125 MB" のような読みやすい形式
                'bytes': row.size_bytes     # 正確なバイト数
            })
        
        return table_sizes

def find_unused_indexes(self):
    """使われていないインデックスを発見(使わない本棚を見つける)"""
    with self.engine.connect() as conn:
        result = conn.execute(text("""
            SELECT 
                schemaname,
                tablename,
                indexname,
                pg_size_pretty(pg_relation_size(indexrelid)) as size
            FROM pg_stat_user_indexes 
            WHERE idx_scan = 0  -- 1回も使われていないインデックス
            AND schemaname = 'public'
            ORDER BY pg_relation_size(indexrelid) DESC;
        """))
        
        unused_indexes = []
        for row in result:
            unused_indexes.append({
                'table': row.tablename,
                'index': row.indexname,
                'wasted_size': row.size
            })
        
        return unused_indexes

def create_monthly_partition(self, table_name: str, year: int, month: int):
    """月次パーティション作成(月ごとにファイルを分ける整理術)"""
    
    partition_name = f"{table_name}_y{year}m{month:02d}"
    start_date = f"{year}-{month:02d}-01"
    
    # 次月の1日を終了日とする
    if month == 12:
        end_date = f"{year + 1}-01-01"
    else:
        end_date = f"{year}-{month + 1:02d}-01"
    
    create_sql = f"""
    -- {year}年{month}月のデータ専用テーブルを作成
    CREATE TABLE IF NOT EXISTS {partition_name} 
    PARTITION OF {table_name} 
    FOR VALUES FROM ('{start_date}') TO ('{end_date}');
    
    -- 高速検索用のインデックスを作成
    CREATE INDEX IF NOT EXISTS {partition_name}_created_at_idx 
    ON {partition_name} (created_at);
    
    CREATE INDEX IF NOT EXISTS {partition_name}_user_id_idx 
    ON {partition_name} (user_id);
    """
    
    return create_sql

使用例:賢いデータベース管理

optimizer = DatabaseOptimizer(engine)

接続状況をチェック

pool_status = optimizer.get_connection_pool_status() print(f”データベース接続: {pool_status[‘status’]}”)

重いテーブルを確認

heavy_tables = optimizer.analyze_table_sizes() for table in heavy_tables[:3]: # 上位3つを表示 print(f”📊 {table[‘table_name’]}: {table[‘readable_size’]}”)

無駄なインデックスを確認

unused = optimizer.find_unused_indexes() if unused: print(f”🗑️ 使われていないインデックス: {len(unused)}個”)


**🔰 初心者向け解説**:

| 概念 | 何をしているか | 身近な例 |
|:-----|:-------------|:---------| 
| `クエリ監視` | 遅いデータベース検索を自動で発見 | 図書館で「この本探すのに10分もかかってる」と気づく |
| `接続プール` | データベース接続を使い回して効率化 | タクシー会社の車両管理(無駄に車を増やさない) |
| `テーブルサイズ分析` | どのデータが重いかを把握 | 引っ越し時に「この箱が一番重い」と把握 |
| `パーティション` | 大きなテーブルを月別に分割 | 家計簿を月別ファイルに分けて管理 |
| `未使用インデックス` | 使われていない索引を削除して節約 | 使わない電話帳を捨ててスペース確保 |

### 📄 Step 3: ロードバランサー(お客さんを上手に案内する仕組み)

大量のアクセスを複数のサーバーに振り分ける「案内係」の仕組みを見てみましょう:

```python
# src/chapter05-saas-platform/backend/app/core/load_balancer.py(重要部分を抜粋)

from typing import List, Dict, Any
import random
import time
from dataclasses import dataclass
from enum import Enum

class HealthStatus(Enum):
    """サーバーの健康状態(レストランスタッフの出勤状況みたいな)"""
    HEALTHY = "healthy"      # 元気で働ける
    UNHEALTHY = "unhealthy"  # 体調不良で働けない
    DEGRADED = "degraded"    # 少し疲れているが働ける

@dataclass
class ServerInstance:
    """個別サーバーの情報(レストランの各店舗情報)"""
    id: str                          # サーバーID(店舗名)
    host: str                        # サーバーのアドレス
    port: int                        # サーバーのポート番号
    weight: int = 1                  # 処理能力の重み(大きな店舗なら2、小さな店舗なら1)
    current_connections: int = 0      # 現在の接続数(今いるお客さん数)
    max_connections: int = 100       # 最大接続数(席数)
    status: HealthStatus = HealthStatus.HEALTHY  # 健康状態
    last_health_check: float = 0     # 最後にチェックした時刻
    response_time: float = 0         # 応答時間(注文から料理が出るまでの時間)

class LoadBalancer:
    """ロードバランサー(お客さんを適切な店舗に案内するシステム)"""
    
    def __init__(self):
        self.servers: List[ServerInstance] = []  # 管理している店舗一覧
        self.algorithm = "weighted_round_robin"   # 案内方法
        self.current_index = 0                   # 現在の案内先インデックス
    
    def add_server(self, server: ServerInstance):
        """新しい店舗を追加"""
        self.servers.append(server)
        print(f"🏪 新店舗追加: {server.id} ({server.host}:{server.port})")
    
    def get_healthy_servers(self) -> List[ServerInstance]:
        """営業中の店舗一覧を取得"""
        healthy = [s for s in self.servers if s.status == HealthStatus.HEALTHY]
        print(f"🟢 営業中店舗: {len(healthy)}店舗")
        return healthy
    
    def select_server(self) -> ServerInstance:
        """お客さんを案内する店舗を選択"""
        healthy_servers = self.get_healthy_servers()
        
        if not healthy_servers:
            raise Exception("❌ 営業中の店舗がありません!")
        
        # 案内方法による選択
        if self.algorithm == "round_robin":
            return self._round_robin_selection(healthy_servers)
        elif self.algorithm == "weighted_round_robin":
            return self._weighted_round_robin_selection(healthy_servers)
        elif self.algorithm == "least_connections":
            return self._least_connections_selection(healthy_servers)
        else:
            # ランダム選択(お任せ案内)
            return random.choice(healthy_servers)
    
    def _round_robin_selection(self, servers: List[ServerInstance]) -> ServerInstance:
        """順番に案内(公平に回す)"""
        server = servers[self.current_index % len(servers)]
        self.current_index += 1
        print(f"📍 順番案内: {server.id} へご案内")
        return server
    
    def _weighted_round_robin_selection(self, servers: List[ServerInstance]) -> ServerInstance:
        """処理能力に応じて案内(大きな店舗により多くのお客さんを)"""
        total_weight = sum(s.weight for s in servers)
        rand_weight = random.randint(1, total_weight)
        
        current_weight = 0
        for server in servers:
            current_weight += server.weight
            if rand_weight <= current_weight:
                print(f"⚖️ 重み付き案内: {server.id} へご案内 (weight: {server.weight})")
                return server
        
        return servers[0]  # フォールバック
    
    def _least_connections_selection(self, servers: List[ServerInstance]) -> ServerInstance:
        """一番空いている店舗に案内"""
        least_busy = min(servers, key=lambda s: s.current_connections)
        print(f"🎯 空いている店舗に案内: {least_busy.id} (お客さん{least_busy.current_connections}人)")
        return least_busy
    
    async def health_check(self):
        """全店舗の健康状態をチェック(巡回点検)"""
        print("🔍 全店舗の健康チェック開始...")
        
        for server in self.servers:
            try:
                # 店舗に「元気ですか?」と確認
                start_time = time.time()
                is_healthy = await self._check_server_health(server)
                response_time = time.time() - start_time
                
                server.response_time = response_time
                server.last_health_check = time.time()
                
                if is_healthy:
                    if response_time > 5.0:  # 5秒以上は少し疲れ気味
                        server.status = HealthStatus.DEGRADED
                        print(f"😅 {server.id}: 少し疲れ気味 ({response_time:.1f}秒)")
                    else:
                        server.status = HealthStatus.HEALTHY
                        print(f"😊 {server.id}: 元気です ({response_time:.1f}秒)")
                else:
                    server.status = HealthStatus.UNHEALTHY
                    print(f"😷 {server.id}: 体調不良で休業中")
                    
            except Exception as e:
                server.status = HealthStatus.UNHEALTHY
                print(f"💥 {server.id}: 連絡がつきません - {str(e)}")
    
    async def _check_server_health(self, server: ServerInstance) -> bool:
        """個別店舗の健康チェック"""
        import httpx
        
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                # 店舗のヘルスチェックAPI呼び出し
                response = await client.get(f"http://{server.host}:{server.port}/health")
                return response.status_code == 200
        except:
            return False

# 使用例:実際のロードバランサー運用
load_balancer = LoadBalancer()

# 店舗を追加
load_balancer.add_server(ServerInstance("tokyo-1", "10.0.1.100", 8000, weight=2))    # 大型店舗
load_balancer.add_server(ServerInstance("osaka-1", "10.0.1.101", 8000, weight=2))    # 大型店舗  
load_balancer.add_server(ServerInstance("backup-1", "10.0.1.102", 8000, weight=1))   # 予備店舗

# お客さんが来たら適切な店舗に案内
try:
    selected_server = load_balancer.select_server()
    print(f"🎉 {selected_server.id} にお客様をご案内しました!")
except Exception as e:
    print(f"😵 申し訳ございません: {e}")

🔰 初心者向け解説

概念 何をしているか 身近な例
LoadBalancer お客さんを適切なサーバーに案内 レストランチェーンの案内係
HealthStatus 各サーバーの稼働状況を管理 各店舗の営業状況(営業中・休業中・混雑中)
Round Robin 順番にサーバーを使って公平に分散 「次のお客様は2号店へ、その次は3号店へ」
Weighted サーバーの性能に応じて配分調整 大きな店舗により多くのお客さんを案内
Least Connections 一番空いているサーバーを選択 待ち時間が一番短い店舗を案内
Health Check 定期的にサーバーの状態を確認 店長が各店舗に「営業できてる?」と確認電話

データベース最適化

# app/core/database_optimization.py
from sqlalchemy import event, text
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool
import time
import logging

from app.core.config import settings

# スロークエリ監視
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    context._query_start_time = time.time()

@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.time() - context._query_start_time
    
    # 1秒以上のクエリを警告
    if total > 1.0:
        logging.warning(
            f"Slow query: {total:.2f}s\n"
            f"Statement: {statement}\n"
            f"Parameters: {parameters}"
        )

class DatabaseOptimizer:
    def __init__(self, engine):
        self.engine = engine
    
    def get_connection_pool_status(self):
        """コネクションプール状態取得"""
        pool = self.engine.pool
        return {
            "size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
            "status": f"{pool.checkedout()}/{pool.size() + pool.overflow()}"
        }
    
    def analyze_query_performance(self, query: str, limit: int = 10):
        """クエリパフォーマンス分析"""
        with self.engine.connect() as conn:
            # EXPLAIN ANALYZE実行
            explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
            result = conn.execute(text(explain_query))
            explain_data = result.fetchone()[0]
            
            return {
                "execution_time": explain_data[0]["Execution Time"],
                "planning_time": explain_data[0]["Planning Time"],
                "plan": explain_data[0]["Plan"]
            }
    
    def get_database_statistics(self):
        """データベース統計情報取得"""
        with self.engine.connect() as conn:
            # テーブルサイズ
            table_sizes = conn.execute(text("""
                SELECT 
                    schemaname,
                    tablename,
                    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size,
                    pg_total_relation_size(schemaname||'.'||tablename) as size_bytes
                FROM pg_tables 
                WHERE schemaname = 'public'
                ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
            """)).fetchall()
            
            # インデックス効率
            index_usage = conn.execute(text("""
                SELECT 
                    schemaname,
                    tablename,
                    indexname,
                    idx_scan as index_scans,
                    idx_tup_read as tuples_read,
                    idx_tup_fetch as tuples_fetched
                FROM pg_stat_user_indexes 
                ORDER BY idx_scan DESC;
            """)).fetchall()
            
            # 未使用インデックス
            unused_indexes = conn.execute(text("""
                SELECT 
                    schemaname,
                    tablename,
                    indexname,
                    pg_size_pretty(pg_relation_size(indexrelid)) as size
                FROM pg_stat_user_indexes 
                WHERE idx_scan = 0
                AND schemaname = 'public';
            """)).fetchall()
            
            return {
                "table_sizes": [dict(row._mapping) for row in table_sizes],
                "index_usage": [dict(row._mapping) for row in index_usage],
                "unused_indexes": [dict(row._mapping) for row in unused_indexes]
            }

# パーティショニング実装例
def create_audit_log_partition(year: int, month: int):
    """監査ログパーティション作成"""
    partition_name = f"audit_logs_y{year}m{month:02d}"
    start_date = f"{year}-{month:02d}-01"
    
    if month == 12:
        end_date = f"{year + 1}-01-01"
    else:
        end_date = f"{year}-{month + 1:02d}-01"
    
    create_partition_sql = f"""
    CREATE TABLE IF NOT EXISTS {partition_name} 
    PARTITION OF audit_logs 
    FOR VALUES FROM ('{start_date}') TO ('{end_date}');
    
    CREATE INDEX IF NOT EXISTS {partition_name}_created_at_idx 
    ON {partition_name} (created_at);
    
    CREATE INDEX IF NOT EXISTS {partition_name}_user_id_idx 
    ON {partition_name} (user_id);
    """
    
    return create_partition_sql

ロードバランシング設定

# app/core/load_balancer.py
from typing import List, Dict, Any
import random
import time
from dataclasses import dataclass
from enum import Enum

class HealthStatus(Enum):
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    DEGRADED = "degraded"

@dataclass
class ServerInstance:
    id: str
    host: str
    port: int
    weight: int = 1
    current_connections: int = 0
    max_connections: int = 100
    status: HealthStatus = HealthStatus.HEALTHY
    last_health_check: float = 0
    response_time: float = 0

class LoadBalancer:
    def __init__(self):
        self.servers: List[ServerInstance] = []
        self.algorithm = "weighted_round_robin"
        self.current_index = 0
    
    def add_server(self, server: ServerInstance):
        """サーバー追加"""
        self.servers.append(server)
    
    def remove_server(self, server_id: str):
        """サーバー削除"""
        self.servers = [s for s in self.servers if s.id != server_id]
    
    def get_healthy_servers(self) -> List[ServerInstance]:
        """健全なサーバー一覧取得"""
        return [s for s in self.servers if s.status == HealthStatus.HEALTHY]
    
    def select_server(self) -> ServerInstance:
        """サーバー選択"""
        healthy_servers = self.get_healthy_servers()
        
        if not healthy_servers:
            raise Exception("No healthy servers available")
        
        if self.algorithm == "round_robin":
            return self._round_robin_selection(healthy_servers)
        elif self.algorithm == "weighted_round_robin":
            return self._weighted_round_robin_selection(healthy_servers)
        elif self.algorithm == "least_connections":
            return self._least_connections_selection(healthy_servers)
        else:
            return random.choice(healthy_servers)
    
    def _round_robin_selection(self, servers: List[ServerInstance]) -> ServerInstance:
        """ラウンドロビン選択"""
        server = servers[self.current_index % len(servers)]
        self.current_index += 1
        return server
    
    def _weighted_round_robin_selection(self, servers: List[ServerInstance]) -> ServerInstance:
        """重み付きラウンドロビン選択"""
        total_weight = sum(s.weight for s in servers)
        rand_weight = random.randint(1, total_weight)
        
        current_weight = 0
        for server in servers:
            current_weight += server.weight
            if rand_weight <= current_weight:
                return server
        
        return servers[0]  # フォールバック
    
    def _least_connections_selection(self, servers: List[ServerInstance]) -> ServerInstance:
        """最少コネクション選択"""
        return min(servers, key=lambda s: s.current_connections)
    
    async def health_check(self):
        """ヘルスチェック実行"""
        for server in self.servers:
            try:
                # HTTPヘルスチェック実行
                start_time = time.time()
                is_healthy = await self._check_server_health(server)
                response_time = time.time() - start_time
                
                server.response_time = response_time
                server.last_health_check = time.time()
                
                if is_healthy:
                    if response_time > 5.0:  # 5秒以上は劣化
                        server.status = HealthStatus.DEGRADED
                    else:
                        server.status = HealthStatus.HEALTHY
                else:
                    server.status = HealthStatus.UNHEALTHY
                    
            except Exception:
                server.status = HealthStatus.UNHEALTHY
    
    async def _check_server_health(self, server: ServerInstance) -> bool:
        """個別サーバーヘルスチェック"""
        import httpx
        
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"http://{server.host}:{server.port}/health")
                return response.status_code == 200
        except:
            return False

# 使用例
load_balancer = LoadBalancer()

🔍 監視・運用システム(24時間見守るシステム)

📄 Step 4: システム監視とメトリクス収集

全国チェーンになったら、各店舗の状況をリアルタイムで監視する「本部の監視センター」が必要になります:

# app/core/metrics.py
import time
import psutil
from typing import Dict, Any, List
from dataclasses import dataclass, asdict
from datetime import datetime
import asyncio

@dataclass
class SystemMetrics:
    timestamp: datetime
    cpu_percent: float
    memory_percent: float
    memory_used_mb: float
    disk_usage_percent: float
    network_io: Dict[str, int]
    process_count: int

@dataclass
class ApplicationMetrics:
    timestamp: datetime
    active_connections: int
    request_count: int
    error_count: int
    avg_response_time: float
    cache_hit_ratio: float
    database_connections: int

class MetricsCollector:
    def __init__(self):
        self.system_metrics_history: List[SystemMetrics] = []
        self.app_metrics_history: List[ApplicationMetrics] = []
        self.max_history_size = 1000
        
        # カウンター
        self.request_count = 0
        self.error_count = 0
        self.response_times: List[float] = []
    
    def collect_system_metrics(self) -> SystemMetrics:
        """システムメトリクス収集"""
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        network = psutil.net_io_counters()
        
        metrics = SystemMetrics(
            timestamp=datetime.utcnow(),
            cpu_percent=psutil.cpu_percent(interval=1),
            memory_percent=memory.percent,
            memory_used_mb=memory.used / 1024 / 1024,
            disk_usage_percent=disk.percent,
            network_io={
                "bytes_sent": network.bytes_sent,
                "bytes_recv": network.bytes_recv
            },
            process_count=len(psutil.pids())
        )
        
        # 履歴保存
        self.system_metrics_history.append(metrics)
        if len(self.system_metrics_history) > self.max_history_size:
            self.system_metrics_history.pop(0)
        
        return metrics
    
    def collect_application_metrics(self) -> ApplicationMetrics:
        """アプリケーションメトリクス収集"""
        from app.core.database import engine
        from app.core.cache import cache
        
        # データベース接続数
        db_connections = 0
        if hasattr(engine.pool, 'checkedout'):
            db_connections = engine.pool.checkedout()
        
        # レスポンス時間平均
        avg_response_time = 0
        if self.response_times:
            avg_response_time = sum(self.response_times) / len(self.response_times)
            self.response_times = self.response_times[-100:]  # 直近100件のみ保持
        
        # キャッシュヒット率
        cache_hit_ratio = 0
        try:
            cache_info = cache.redis_client.info('stats')
            hits = cache_info.get('keyspace_hits', 0)
            misses = cache_info.get('keyspace_misses', 0)
            if hits + misses > 0:
                cache_hit_ratio = hits / (hits + misses) * 100
        except:
            pass
        
        metrics = ApplicationMetrics(
            timestamp=datetime.utcnow(),
            active_connections=0,  # FastAPIから取得
            request_count=self.request_count,
            error_count=self.error_count,
            avg_response_time=avg_response_time,
            cache_hit_ratio=cache_hit_ratio,
            database_connections=db_connections
        )
        
        self.app_metrics_history.append(metrics)
        if len(self.app_metrics_history) > self.max_history_size:
            self.app_metrics_history.pop(0)
        
        return metrics
    
    def record_request(self, response_time: float, is_error: bool = False):
        """リクエスト記録"""
        self.request_count += 1
        self.response_times.append(response_time)
        
        if is_error:
            self.error_count += 1
    
    def get_summary(self) -> Dict[str, Any]:
        """メトリクス要約取得"""
        if not self.system_metrics_history or not self.app_metrics_history:
            return {}
        
        recent_system = self.system_metrics_history[-10:]  # 直近10件
        recent_app = self.app_metrics_history[-10:]
        
        return {
            "system": {
                "avg_cpu": sum(m.cpu_percent for m in recent_system) / len(recent_system),
                "avg_memory": sum(m.memory_percent for m in recent_system) / len(recent_system),
                "current_memory_mb": recent_system[-1].memory_used_mb,
                "disk_usage": recent_system[-1].disk_usage_percent
            },
            "application": {
                "total_requests": self.request_count,
                "total_errors": self.error_count,
                "error_rate": (self.error_count / self.request_count * 100) if self.request_count > 0 else 0,
                "avg_response_time": recent_app[-1].avg_response_time,
                "cache_hit_ratio": recent_app[-1].cache_hit_ratio,
                "db_connections": recent_app[-1].database_connections
            },
            "timestamp": datetime.utcnow().isoformat()
        }

# グローバルメトリクスコレクター
metrics_collector = MetricsCollector()

# ミドルウェア
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time

class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        
        try:
            response = await call_next(request)
            
            # リクエスト記録
            response_time = time.time() - start_time
            is_error = response.status_code >= 400
            metrics_collector.record_request(response_time, is_error)
            
            # レスポンスヘッダーに追加
            response.headers["X-Response-Time"] = f"{response_time:.3f}"
            
            return response
            
        except Exception as e:
            response_time = time.time() - start_time
            metrics_collector.record_request(response_time, True)
            raise

ログ管理

# app/core/logging_config.py
import logging
import logging.handlers
import json
from datetime import datetime
from typing import Dict, Any
import traceback

from app.core.config import settings

class JSONFormatter(logging.Formatter):
    """JSON形式ログフォーマッター"""
    
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        
        # 例外情報追加
        if record.exc_info:
            log_entry["exception"] = {
                "type": record.exc_info[0].__name__,
                "message": str(record.exc_info[1]),
                "traceback": traceback.format_exception(*record.exc_info)
            }
        
        # 追加フィールド
        if hasattr(record, 'user_id'):
            log_entry["user_id"] = record.user_id
        
        if hasattr(record, 'organization_id'):
            log_entry["organization_id"] = record.organization_id
        
        if hasattr(record, 'request_id'):
            log_entry["request_id"] = record.request_id
        
        return json.dumps(log_entry)

def setup_logging():
    """ログ設定初期化"""
    
    # ルートロガー設定
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, settings.LOG_LEVEL))
    
    # 既存ハンドラー削除
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
    
    # コンソールハンドラー
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(JSONFormatter())
    root_logger.addHandler(console_handler)
    
    # ファイルハンドラー(本番環境)
    if not settings.DEBUG:
        file_handler = logging.handlers.RotatingFileHandler(
            "app.log",
            maxBytes=10*1024*1024,  # 10MB
            backupCount=5
        )
        file_handler.setFormatter(JSONFormatter())
        root_logger.addHandler(file_handler)
    
    # 外部ログ収集(例:Datadog)
    if hasattr(settings, 'DATADOG_API_KEY'):
        datadog_handler = DatadogHandler(settings.DATADOG_API_KEY)
        root_logger.addHandler(datadog_handler)

class StructuredLogger:
    """構造化ログヘルパー"""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
    
    def info(self, message: str, **kwargs):
        self._log(logging.INFO, message, **kwargs)
    
    def warning(self, message: str, **kwargs):
        self._log(logging.WARNING, message, **kwargs)
    
    def error(self, message: str, **kwargs):
        self._log(logging.ERROR, message, **kwargs)
    
    def _log(self, level: int, message: str, **kwargs):
        extra = {k: v for k, v in kwargs.items() if k not in ['exc_info']}
        self.logger.log(level, message, extra=extra, exc_info=kwargs.get('exc_info'))

# アプリケーションロガー
app_logger = StructuredLogger("saas_platform")

# 使用例
def log_user_action(user_id: int, action: str, details: Dict[str, Any]):
    app_logger.info(
        f"User action: {action}",
        user_id=user_id,
        action=action,
        details=details
    )

アラート システム

# app/core/alerting.py
from typing import Dict, Any, List, Callable
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class Alert:
    id: str
    title: str
    description: str
    severity: AlertSeverity
    source: str
    timestamp: datetime
    metadata: Dict[str, Any]
    resolved: bool = False
    resolved_at: datetime = None

class AlertRule:
    def __init__(
        self,
        name: str,
        condition: Callable[[Dict[str, Any]], bool],
        severity: AlertSeverity,
        description: str,
        cooldown_minutes: int = 15
    ):
        self.name = name
        self.condition = condition
        self.severity = severity
        self.description = description
        self.cooldown = timedelta(minutes=cooldown_minutes)
        self.last_triggered = None
    
    def should_trigger(self, metrics: Dict[str, Any]) -> bool:
        """アラート発火条件チェック"""
        # クールダウン期間中は発火しない
        if self.last_triggered and datetime.utcnow() - self.last_triggered < self.cooldown:
            return False
        
        return self.condition(metrics)
    
    def trigger(self) -> Alert:
        """アラート発火"""
        self.last_triggered = datetime.utcnow()
        
        return Alert(
            id=f"{self.name}_{int(self.last_triggered.timestamp())}",
            title=f"Alert: {self.name}",
            description=self.description,
            severity=self.severity,
            source="system_monitor",
            timestamp=self.last_triggered,
            metadata={"rule": self.name}
        )

class AlertManager:
    def __init__(self):
        self.rules: List[AlertRule] = []
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_handlers: List[Callable[[Alert], None]] = []
        
        # デフォルトルール設定
        self._setup_default_rules()
    
    def add_rule(self, rule: AlertRule):
        """アラートルール追加"""
        self.rules.append(rule)
    
    def add_handler(self, handler: Callable[[Alert], None]):
        """アラートハンドラー追加"""
        self.alert_handlers.append(handler)
    
    async def check_alerts(self, metrics: Dict[str, Any]):
        """アラートチェック実行"""
        for rule in self.rules:
            if rule.should_trigger(metrics):
                alert = rule.trigger()
                await self._handle_alert(alert)
    
    async def _handle_alert(self, alert: Alert):
        """アラート処理"""
        self.active_alerts[alert.id] = alert
        
        # 各ハンドラーで処理
        for handler in self.alert_handlers:
            try:
                await self._run_handler(handler, alert)
            except Exception as e:
                app_logger.error(f"Alert handler error: {e}", alert_id=alert.id)
    
    async def _run_handler(self, handler: Callable, alert: Alert):
        """ハンドラー実行"""
        if asyncio.iscoroutinefunction(handler):
            await handler(alert)
        else:
            handler(alert)
    
    def resolve_alert(self, alert_id: str):
        """アラート解決"""
        if alert_id in self.active_alerts:
            alert = self.active_alerts[alert_id]
            alert.resolved = True
            alert.resolved_at = datetime.utcnow()
            del self.active_alerts[alert_id]
    
    def _setup_default_rules(self):
        """デフォルトアラートルール設定"""
        
        # CPU使用率アラート
        self.add_rule(AlertRule(
            name="high_cpu_usage",
            condition=lambda m: m.get("system", {}).get("avg_cpu", 0) > 80,
            severity=AlertSeverity.WARNING,
            description="CPU使用率が80%を超えています"
        ))
        
        # メモリ使用率アラート
        self.add_rule(AlertRule(
            name="high_memory_usage",
            condition=lambda m: m.get("system", {}).get("avg_memory", 0) > 85,
            severity=AlertSeverity.ERROR,
            description="メモリ使用率が85%を超えています"
        ))
        
        # エラー率アラート
        self.add_rule(AlertRule(
            name="high_error_rate",
            condition=lambda m: m.get("application", {}).get("error_rate", 0) > 5,
            severity=AlertSeverity.ERROR,
            description="エラー率が5%を超えています"
        ))
        
        # レスポンス時間アラート
        self.add_rule(AlertRule(
            name="slow_response_time",
            condition=lambda m: m.get("application", {}).get("avg_response_time", 0) > 2.0,
            severity=AlertSeverity.WARNING,
            description="平均レスポンス時間が2秒を超えています"
        ))

# Slack通知ハンドラー
async def slack_alert_handler(alert: Alert):
    """Slack通知送信"""
    import httpx
    
    webhook_url = settings.SLACK_WEBHOOK_URL
    if not webhook_url:
        return
    
    color_map = {
        AlertSeverity.INFO: "#36a64f",
        AlertSeverity.WARNING: "#ff9900", 
        AlertSeverity.ERROR: "#ff0000",
        AlertSeverity.CRITICAL: "#8b0000"
    }
    
    message = {
        "attachments": [{
            "color": color_map.get(alert.severity, "#36a64f"),
            "title": alert.title,
            "text": alert.description,
            "fields": [
                {"title": "Severity", "value": alert.severity.value, "short": True},
                {"title": "Source", "value": alert.source, "short": True},
                {"title": "Time", "value": alert.timestamp.isoformat(), "short": False}
            ]
        }]
    }
    
    async with httpx.AsyncClient() as client:
        await client.post(webhook_url, json=message)

# グローバルアラートマネージャー
alert_manager = AlertManager()
alert_manager.add_handler(slack_alert_handler)

CI/CDとデプロイ戦略

Docker設定

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# システム依存関係
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Python依存関係
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# アプリケーションコード
COPY . .

# 非rootユーザー作成
RUN useradd --create-home --shell /bin/bash app \
    && chown -R app:app /app
USER app

# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/saas_platform
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
        reservations:
          memory: 256M

  db:
    image: postgres:15
    environment:
      POSTGRES_DB: saas_platform
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    deploy:
      resources:
        limits:
          memory: 1G

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    deploy:
      resources:
        limits:
          memory: 256M

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - app

volumes:
  postgres_data:
  redis_data:

GitHub Actions CI/CD

# .github/workflows/ci-cd.yml
name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: $

jobs:
  test:
    runs-on: ubuntu-latest
    
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: test_db
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432
          
      redis:
        image: redis
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 6379:6379

    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'
        
    - name: Cache dependencies
      uses: actions/cache@v3
      with:
        path: ~/.cache/pip
        key: $-pip-$
        
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest pytest-cov pytest-asyncio
        
    - name: Run linting
      run: |
        pip install flake8 black isort
        flake8 app
        black --check app
        isort --check-only app
        
    - name: Run tests
      env:
        DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
        REDIS_URL: redis://localhost:6379
        SECRET_KEY: test-secret-key
      run: |
        pytest tests/ -v --cov=app --cov-report=xml
        
    - name: Upload coverage
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

  security:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Run security scan
      run: |
        pip install safety bandit
        safety check
        bandit -r app/

  build:
    needs: [test, security]
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v2
      
    - name: Log in to Container Registry
      uses: docker/login-action@v2
      with:
        registry: $
        username: $
        password: $
        
    - name: Extract metadata
      id: meta
      uses: docker/metadata-action@v4
      with:
        images: $/$
        tags: |
          type=ref,event=branch
          type=ref,event=pr
          type=sha
          
    - name: Build and push
      uses: docker/build-push-action@v4
      with:
        context: .
        push: true
        tags: $
        labels: $
        cache-from: type=gha
        cache-to: type=gha,mode=max

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    
    steps:
    - name: Deploy to production
      run: |
        echo "Deploying to production..."
        # Kubernetes deployment script
        kubectl set image deployment/saas-platform-app \
          app=$/$:$

トラブルシューティング

FastAPI + SQLAlchemy固有の問題

問題1: SQLAlchemy接続エラー

症状:

  • connection timeout エラー
  • SQLSTATE[08006] server closed connection エラー
  • データベース操作の失敗

診断手順:

# app/core/database_diagnostics.py
import sqlalchemy
from sqlalchemy import text, create_engine
from sqlalchemy.pool import StaticPool
from app.core.config import settings
import logging

logger = logging.getLogger(__name__)

async def diagnose_database_connection():
    """データベース接続診断"""
    
    logger.info("Starting database connection diagnostics...")
    
    # 1. 設定確認
    db_url = settings.database_url
    logger.info(f"Database URL configured: {bool(db_url)}")
    logger.info(f"Database URL (masked): {db_url[:20]}...{db_url[-10:] if len(db_url) > 30 else db_url}")
    
    # 2. 基本接続テスト
    try:
        engine = create_engine(
            db_url,
            poolclass=StaticPool,
            pool_pre_ping=True,
            echo=True  # SQL ログ出力
        )
        
        with engine.connect() as connection:
            result = connection.execute(text("SELECT 1 as test"))
            test_value = result.scalar()
            logger.info(f"Basic connection test: {test_value == 1}")
            
    except Exception as e:
        logger.error(f"Basic connection failed: {e}")
        return False
    
    # 3. 接続プール診断
    try:
        from app.core.database import engine as app_engine
        pool = app_engine.pool
        
        logger.info(f"Pool status:")
        logger.info(f"  - Size: {pool.size()}")
        logger.info(f"  - Checked in: {pool.checkedin()}")
        logger.info(f"  - Checked out: {pool.checkedout()}")
        logger.info(f"  - Overflow: {pool.overflow()}")
        
    except Exception as e:
        logger.error(f"Pool diagnostics failed: {e}")
    
    # 4. パフォーマンステスト
    try:
        import time
        start_time = time.time()
        
        with engine.connect() as connection:
            # 複数クエリ実行
            for i in range(10):
                connection.execute(text("SELECT pg_sleep(0.1)"))
        
        execution_time = time.time() - start_time
        logger.info(f"Performance test: {execution_time:.2f}s for 10 queries")
        
        if execution_time > 5:
            logger.warning("Slow database performance detected")
            
    except Exception as e:
        logger.error(f"Performance test failed: {e}")
    
    return True

# 使用例
async def startup_diagnostics():
    success = await diagnose_database_connection()
    if not success:
        raise Exception("Database diagnostics failed")

解決策:

# app/core/database_optimized.py
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool, StaticPool
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
import logging

logger = logging.getLogger(__name__)

class OptimizedDatabaseManager:
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.engine = None
        self.SessionLocal = None
        self._setup_engine()
    
    def _setup_engine(self):
        """最適化されたエンジン設定"""
        
        # 本番環境用設定
        if settings.ENVIRONMENT == "production":
            self.engine = create_engine(
                self.database_url,
                poolclass=QueuePool,
                pool_size=20,           # 基本接続数
                max_overflow=30,        # 追加接続数
                pool_pre_ping=True,     # 接続前ping
                pool_recycle=3600,      # 1時間で接続リサイクル
                echo=False,             # SQL ログ無効
                connect_args={
                    "connect_timeout": 10,
                    "application_name": "fastapi_saas"
                }
            )
        else:
            # 開発環境用設定
            self.engine = create_engine(
                self.database_url,
                poolclass=StaticPool,
                pool_pre_ping=True,
                echo=settings.DEBUG,
                connect_args={"check_same_thread": False} if "sqlite" in self.database_url else {}
            )
        
        # 接続イベントリスナー
        @event.listens_for(self.engine, "connect")
        def set_sqlite_pragma(dbapi_connection, connection_record):
            if "sqlite" in self.database_url:
                cursor = dbapi_connection.cursor()
                cursor.execute("PRAGMA foreign_keys=ON")
                cursor.close()
        
        @event.listens_for(self.engine, "checkout")
        def receive_checkout(dbapi_connection, connection_record, connection_proxy):
            logger.debug("Connection checked out from pool")
        
        self.SessionLocal = sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=self.engine
        )
    
    @contextmanager
    def get_session(self):
        """安全なセッション管理"""
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Database session error: {e}")
            raise
        finally:
            session.close()
    
    async def health_check(self) -> bool:
        """ヘルスチェック"""
        try:
            with self.get_session() as session:
                session.execute(text("SELECT 1"))
            return True
        except Exception as e:
            logger.error(f"Database health check failed: {e}")
            return False
    
    def get_connection_info(self) -> dict:
        """接続情報取得"""
        pool = self.engine.pool
        return {
            "pool_size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
            "invalid": pool.invalid()
        }

# グローバルインスタンス
db_manager = OptimizedDatabaseManager(settings.database_url)

問題2: Alembicマイグレーションエラー

症状:

  • Target database is not up to date エラー
  • マイグレーション実行失敗
  • スキーマ不整合

診断手順:

# scripts/migration_diagnostics.py
import alembic
from alembic.config import Config
from alembic import command
from alembic.script import ScriptDirectory
from alembic.runtime.environment import EnvironmentContext
from sqlalchemy import create_engine, MetaData
import logging

logger = logging.getLogger(__name__)

class MigrationDiagnostics:
    def __init__(self, alembic_cfg_path: str = "alembic.ini"):
        self.alembic_cfg = Config(alembic_cfg_path)
        self.engine = create_engine(settings.database_url)
    
    def check_migration_status(self):
        """マイグレーション状態確認"""
        
        logger.info("Checking migration status...")
        
        # 現在のリビジョン確認
        try:
            with self.engine.connect() as connection:
                context = EnvironmentContext(
                    self.alembic_cfg,
                    self.engine,
                    connection=connection
                )
                
                with context.begin_transaction():
                    current_rev = context.get_current_revision()
                    logger.info(f"Current database revision: {current_rev}")
        
        except Exception as e:
            logger.error(f"Failed to get current revision: {e}")
            return False
        
        # 利用可能なマイグレーション確認
        try:
            script_dir = ScriptDirectory.from_config(self.alembic_cfg)
            heads = script_dir.get_heads()
            logger.info(f"Available heads: {heads}")
            
            # 待機中のマイグレーション
            revisions = script_dir.walk_revisions("head", current_rev)
            pending = list(revisions)
            logger.info(f"Pending migrations: {len(pending)}")
            
            for rev in pending[:5]:  # 最初の5件のみ表示
                logger.info(f"  - {rev.revision}: {rev.doc}")
                
        except Exception as e:
            logger.error(f"Failed to check pending migrations: {e}")
            return False
        
        return True
    
    def validate_models_vs_database(self):
        """モデルとデータベースの整合性確認"""
        
        try:
            from app.models import Base
            
            # モデルから期待されるメタデータ
            model_metadata = Base.metadata
            
            # データベースの実際のメタデータ
            db_metadata = MetaData()
            db_metadata.reflect(bind=self.engine)
            
            # テーブル比較
            model_tables = set(model_metadata.tables.keys())
            db_tables = set(db_metadata.tables.keys())
            
            missing_in_db = model_tables - db_tables
            extra_in_db = db_tables - model_tables
            
            if missing_in_db:
                logger.warning(f"Tables missing in database: {missing_in_db}")
            
            if extra_in_db:
                logger.warning(f"Extra tables in database: {extra_in_db}")
            
            # 共通テーブルのカラム比較
            common_tables = model_tables & db_tables
            for table_name in common_tables:
                model_table = model_metadata.tables[table_name]
                db_table = db_metadata.tables[table_name]
                
                model_columns = set(model_table.columns.keys())
                db_columns = set(db_table.columns.keys())
                
                missing_columns = model_columns - db_columns
                extra_columns = db_columns - model_columns
                
                if missing_columns:
                    logger.warning(f"Table {table_name} - missing columns: {missing_columns}")
                
                if extra_columns:
                    logger.warning(f"Table {table_name} - extra columns: {extra_columns}")
            
            return len(missing_in_db) == 0 and len(extra_in_db) == 0
            
        except Exception as e:
            logger.error(f"Model validation failed: {e}")
            return False
    
    def repair_migration_state(self):
        """マイグレーション状態修復"""
        
        logger.info("Attempting to repair migration state...")
        
        try:
            # 1. データベースのバックアップ推奨メッセージ
            logger.warning("IMPORTANT: Please backup your database before proceeding!")
            
            # 2. マイグレーション履歴テーブル確認
            with self.engine.connect() as connection:
                result = connection.execute(text("""
                    SELECT table_name 
                    FROM information_schema.tables 
                    WHERE table_name = 'alembic_version'
                """))
                
                if not result.fetchone():
                    logger.info("Creating alembic_version table...")
                    command.stamp(self.alembic_cfg, "head")
                
            # 3. 強制的に最新リビジョンにスタンプ(慎重に)
            logger.info("Stamping database to head revision...")
            command.stamp(self.alembic_cfg, "head")
            
            logger.info("Migration repair completed")
            return True
            
        except Exception as e:
            logger.error(f"Migration repair failed: {e}")
            return False

# 使用例
diagnostics = MigrationDiagnostics()
diagnostics.check_migration_status()
diagnostics.validate_models_vs_database()

解決策:

# scripts/safe_migration.sh
#!/bin/bash

set -e

echo "🔍 Starting safe migration process..."

# 1. バックアップ作成
echo "📦 Creating database backup..."
pg_dump $DATABASE_URL > backup_$(date +%Y%m%d_%H%M%S).sql

# 2. マイグレーション状態確認
echo "📋 Checking migration status..."
python -c "
from scripts.migration_diagnostics import MigrationDiagnostics
diag = MigrationDiagnostics()
diag.check_migration_status()
diag.validate_models_vs_database()
"

# 3. ドライラン実行
echo "🧪 Performing dry run..."
alembic upgrade head --sql > migration_preview.sql
echo "Migration preview saved to migration_preview.sql"

# 4. 確認プロンプト
read -p "🤔 Continue with migration? (y/N): " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
    echo "❌ Migration cancelled"
    exit 1
fi

# 5. 実際のマイグレーション実行
echo "🚀 Running migration..."
alembic upgrade head

# 6. 検証
echo "✅ Verifying migration..."
python -c "
from scripts.migration_diagnostics import MigrationDiagnostics
diag = MigrationDiagnostics()
success = diag.validate_models_vs_database()
exit(0 if success else 1)
"

echo "🎉 Migration completed successfully!"

問題3: マルチテナント分離の問題

症状:

  • 別テナントのデータが見える
  • RLS ポリシーが動作しない
  • 認可エラー

診断手順:

# app/utils/tenant_diagnostics.py
from typing import List, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import text
from app.core.database import get_db
from app.models.organization import Organization
from app.models.user import User
import logging

logger = logging.getLogger(__name__)

class TenantIsolationDiagnostics:
    def __init__(self, db: Session):
        self.db = db
    
    async def verify_rls_policies(self) -> Dict[str, Any]:
        """RLS ポリシー確認"""
        
        logger.info("Verifying RLS policies...")
        
        # 1. RLS が有効なテーブル確認
        result = self.db.execute(text("""
            SELECT schemaname, tablename, rowsecurity 
            FROM pg_tables 
            WHERE schemaname = 'public' 
            AND tablename IN ('users', 'organizations', 'projects', 'tasks')
        """))
        
        rls_status = {}
        for row in result:
            rls_status[row.tablename] = row.rowsecurity
        
        logger.info(f"RLS status: {rls_status}")
        
        # 2. 適用されているポリシー確認
        policies = self.db.execute(text("""
            SELECT schemaname, tablename, policyname, cmd, qual, with_check
            FROM pg_policies 
            WHERE schemaname = 'public'
            ORDER BY tablename, policyname
        """))
        
        policy_info = []
        for policy in policies:
            policy_info.append({
                'table': policy.tablename,
                'policy': policy.policyname,
                'command': policy.cmd,
                'condition': policy.qual,
                'check': policy.with_check
            })
        
        logger.info(f"Active policies: {len(policy_info)}")
        
        return {
            'rls_enabled': rls_status,
            'policies': policy_info
        }
    
    async def test_tenant_isolation(
        self, 
        user1_id: int, 
        user2_id: int, 
        org1_id: int, 
        org2_id: int
    ) -> bool:
        """テナント分離テスト"""
        
        logger.info("Testing tenant isolation...")
        
        try:
            # 1. ユーザー1でログイン状態をシミュレート
            self.db.execute(text("""
                SELECT set_config('request.jwt.claim.sub', :user_id, true)
            """), {"user_id": str(user1_id)})
            
            # 2. 組織1のプロジェクト確認
            org1_projects = self.db.execute(text("""
                SELECT id, name, organization_id 
                FROM projects 
                WHERE organization_id = :org_id
            """), {"org_id": org1_id}).fetchall()
            
            logger.info(f"User1 can see {len(org1_projects)} projects from org1")
            
            # 3. 組織2のプロジェクト確認(見えてはいけない)
            org2_projects = self.db.execute(text("""
                SELECT id, name, organization_id 
                FROM projects 
                WHERE organization_id = :org_id
            """), {"org_id": org2_id}).fetchall()
            
            logger.info(f"User1 can see {len(org2_projects)} projects from org2")
            
            # 4. 分離確認
            if len(org2_projects) > 0:
                logger.error("SECURITY VIOLATION: User can see other tenant's data!")
                return False
            
            # 5. ユーザー2でテスト
            self.db.execute(text("""
                SELECT set_config('request.jwt.claim.sub', :user_id, true)
            """), {"user_id": str(user2_id)})
            
            org1_projects_user2 = self.db.execute(text("""
                SELECT id, name, organization_id 
                FROM projects 
                WHERE organization_id = :org_id
            """), {"org_id": org1_id}).fetchall()
            
            if len(org1_projects_user2) > 0:
                logger.error("SECURITY VIOLATION: User2 can see org1's data!")
                return False
            
            logger.info("✅ Tenant isolation test passed")
            return True
            
        except Exception as e:
            logger.error(f"Tenant isolation test failed: {e}")
            return False
    
    async def fix_missing_rls_policies(self):
        """不足しているRLSポリシーの修復"""
        
        logger.info("Checking and fixing RLS policies...")
        
        # 必要なポリシー定義
        required_policies = [
            {
                'table': 'projects',
                'policy': 'tenant_isolation',
                'sql': '''
                    CREATE POLICY tenant_isolation ON projects
                    FOR ALL USING (
                        organization_id IN (
                            SELECT organization_id 
                            FROM user_organizations uo
                            WHERE uo.user_id = (current_setting('request.jwt.claim.sub'))::int
                        )
                    )
                '''
            },
            {
                'table': 'tasks',
                'policy': 'tenant_isolation',
                'sql': '''
                    CREATE POLICY tenant_isolation ON tasks
                    FOR ALL USING (
                        project_id IN (
                            SELECT p.id 
                            FROM projects p
                            JOIN user_organizations uo ON p.organization_id = uo.organization_id
                            WHERE uo.user_id = (current_setting('request.jwt.claim.sub'))::int
                        )
                    )
                '''
            }
        ]
        
        for policy_def in required_policies:
            try:
                # 既存ポリシー確認
                existing = self.db.execute(text(f"""
                    SELECT 1 FROM pg_policies 
                    WHERE tablename = '{policy_def['table']}' 
                    AND policyname = '{policy_def['policy']}'
                """)).fetchone()
                
                if not existing:
                    logger.info(f"Creating policy {policy_def['policy']} for {policy_def['table']}")
                    self.db.execute(text(policy_def['sql']))
                    
                    # RLS有効化
                    self.db.execute(text(f"""
                        ALTER TABLE {policy_def['table']} ENABLE ROW LEVEL SECURITY
                    """))
                    
                    self.db.commit()
                    
            except Exception as e:
                logger.error(f"Failed to create policy for {policy_def['table']}: {e}")
                self.db.rollback()

# 使用例
async def diagnose_tenant_security():
    db = next(get_db())
    diagnostics = TenantIsolationDiagnostics(db)
    
    # RLS確認
    rls_info = await diagnostics.verify_rls_policies()
    
    # テナント分離テスト
    isolation_ok = await diagnostics.test_tenant_isolation(
        user1_id=1, user2_id=2, org1_id=1, org2_id=2
    )
    
    if not isolation_ok:
        await diagnostics.fix_missing_rls_policies()

パフォーマンス問題

問題4: N+1クエリ問題

症状:

  • 大量のデータベースクエリ実行
  • ページ読み込みが遅い
  • CPU使用率が高い

診断手順:

# app/utils/query_diagnostics.py
import time
import logging
from typing import List, Dict, Any
from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from contextlib import contextmanager
from collections import defaultdict

logger = logging.getLogger(__name__)

class QueryDiagnostics:
    def __init__(self):
        self.queries: List[Dict[str, Any]] = []
        self.query_counts = defaultdict(int)
        self.monitoring = False
    
    def start_monitoring(self):
        """クエリ監視開始"""
        self.monitoring = True
        self.queries.clear()
        self.query_counts.clear()
        
        @event.listens_for(Engine, "before_cursor_execute")
        def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            if self.monitoring:
                context._query_start_time = time.time()
        
        @event.listens_for(Engine, "after_cursor_execute")
        def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            if self.monitoring:
                total = time.time() - context._query_start_time
                
                # クエリ正規化(パラメータを除去)
                normalized_query = self._normalize_query(statement)
                
                query_info = {
                    'query': statement,
                    'normalized': normalized_query,
                    'parameters': parameters,
                    'duration': total,
                    'timestamp': time.time()
                }
                
                self.queries.append(query_info)
                self.query_counts[normalized_query] += 1
                
                # スロークエリ警告
                if total > 1.0:
                    logger.warning(f"Slow query detected: {total:.2f}s - {statement[:100]}...")
    
    def stop_monitoring(self):
        """クエリ監視停止"""
        self.monitoring = False
    
    def _normalize_query(self, query: str) -> str:
        """クエリ正規化"""
        import re
        # パラメータを置換
        normalized = re.sub(r'%\([^)]*\)s', '?', query)
        normalized = re.sub(r'\$\d+', '?', normalized)
        # 複数の空白を単一に
        normalized = re.sub(r'\s+', ' ', normalized.strip())
        return normalized
    
    def analyze_queries(self) -> Dict[str, Any]:
        """クエリ分析"""
        if not self.queries:
            return {"error": "No queries recorded"}
        
        total_queries = len(self.queries)
        total_time = sum(q['duration'] for q in self.queries)
        
        # N+1問題検出
        n_plus_one_patterns = []
        for normalized, count in self.query_counts.items():
            if count > 10 and 'SELECT' in normalized and 'WHERE' in normalized:
                n_plus_one_patterns.append({
                    'query': normalized,
                    'count': count,
                    'severity': 'high' if count > 50 else 'medium'
                })
        
        # 最遅クエリ
        slowest_queries = sorted(self.queries, key=lambda x: x['duration'], reverse=True)[:5]
        
        # 頻繁なクエリ
        frequent_queries = sorted(
            self.query_counts.items(), 
            key=lambda x: x[1], 
            reverse=True
        )[:10]
        
        return {
            'total_queries': total_queries,
            'total_time': total_time,
            'avg_time': total_time / total_queries,
            'n_plus_one_patterns': n_plus_one_patterns,
            'slowest_queries': [
                {
                    'query': q['query'][:200] + '...',
                    'duration': q['duration'],
                    'parameters': str(q['parameters'])[:100]
                }
                for q in slowest_queries
            ],
            'frequent_queries': [
                {'query': q[:200] + '...', 'count': count}
                for q, count in frequent_queries
            ]
        }
    
    @contextmanager
    def monitor_request(self, endpoint: str):
        """リクエスト単位でのクエリ監視"""
        logger.info(f"Starting query monitoring for {endpoint}")
        self.start_monitoring()
        
        try:
            yield self
        finally:
            self.stop_monitoring()
            analysis = self.analyze_queries()
            
            # N+1問題警告
            if analysis.get('n_plus_one_patterns'):
                logger.warning(f"N+1 query patterns detected in {endpoint}:")
                for pattern in analysis['n_plus_one_patterns']:
                    logger.warning(f"  - {pattern['query'][:100]}... (count: {pattern['count']})")
            
            logger.info(f"Query analysis for {endpoint}: {analysis['total_queries']} queries, {analysis['total_time']:.2f}s total")

# グローバルインスタンス
query_diagnostics = QueryDiagnostics()

# FastAPI依存性注入
from fastapi import Depends

def get_query_monitor():
    return query_diagnostics

# 使用例
@app.get("/api/v1/projects")
async def get_projects(
    current_user: User = Depends(get_current_user),
    query_monitor: QueryDiagnostics = Depends(get_query_monitor)
):
    with query_monitor.monitor_request("GET /api/v1/projects"):
        # この中でのクエリが監視される
        projects = await project_service.get_user_projects(current_user.id)
        return projects

解決策:

# app/services/optimized_project_service.py
from sqlalchemy.orm import Session, joinedload, selectinload
from typing import List
from app.models.project import Project
from app.models.user import User

class OptimizedProjectService:
    def __init__(self, db: Session):
        self.db = db
    
    def get_user_projects_optimized(self, user_id: int) -> List[Project]:
        """N+1問題を解決したプロジェクト取得"""
        
        # 悪い例(N+1問題あり):
        # projects = self.db.query(Project).filter_by(user_id=user_id).all()
        # for project in projects:
        #     _ = project.tasks  # これが各プロジェクトごとに別のクエリを実行
        
        # 良い例(eager loading使用):
        projects = (
            self.db.query(Project)
            .options(
                joinedload(Project.organization),  # 1-to-1 relation
                selectinload(Project.tasks).selectinload(Task.assignee),  # 1-to-many relation
                selectinload(Project.members).selectinload(ProjectMember.user)
            )
            .join(Project.members)
            .filter(ProjectMember.user_id == user_id)
            .filter(ProjectMember.is_active == True)
            .all()
        )
        
        return projects
    
    def get_project_with_tasks_batch(self, project_ids: List[int]) -> Dict[int, List[Task]]:
        """バッチでタスク取得"""
        
        from app.models.task import Task
        
        # プロジェクトIDでタスクを一括取得
        tasks = (
            self.db.query(Task)
            .options(joinedload(Task.assignee))
            .filter(Task.project_id.in_(project_ids))
            .all()
        )
        
        # プロジェクトIDでグループ化
        tasks_by_project = defaultdict(list)
        for task in tasks:
            tasks_by_project[task.project_id].append(task)
        
        return dict(tasks_by_project)
    
    async def get_dashboard_data_optimized(self, user_id: int) -> Dict[str, Any]:
        """ダッシュボード用最適化クエリ"""
        
        # 1つのクエリで必要なデータを全て取得
        result = self.db.execute(text("""
            WITH user_projects AS (
                SELECT DISTINCT p.id, p.name, p.status, p.organization_id
                FROM projects p
                JOIN project_members pm ON p.id = pm.project_id
                WHERE pm.user_id = :user_id AND pm.is_active = true
            ),
            project_stats AS (
                SELECT 
                    up.id as project_id,
                    up.name as project_name,
                    COUNT(t.id) as total_tasks,
                    COUNT(CASE WHEN t.status = 'completed' THEN 1 END) as completed_tasks,
                    COUNT(CASE WHEN t.assignee_id = :user_id THEN 1 END) as my_tasks
                FROM user_projects up
                LEFT JOIN tasks t ON up.id = t.project_id
                GROUP BY up.id, up.name
            )
            SELECT 
                ps.*,
                CASE 
                    WHEN ps.total_tasks > 0 
                    THEN (ps.completed_tasks * 100.0 / ps.total_tasks)
                    ELSE 0 
                END as completion_percentage
            FROM project_stats ps
            ORDER BY ps.project_name
        """), {"user_id": user_id})
        
        projects = []
        for row in result:
            projects.append({
                'id': row.project_id,
                'name': row.project_name,
                'total_tasks': row.total_tasks,
                'completed_tasks': row.completed_tasks,
                'my_tasks': row.my_tasks,
                'completion_percentage': float(row.completion_percentage)
            })
        
        return {
            'projects': projects,
            'total_projects': len(projects),
            'total_tasks': sum(p['total_tasks'] for p in projects),
            'total_my_tasks': sum(p['my_tasks'] for p in projects)
        }

デバッグとモニタリング

包括的ログ設定

# app/core/logging_config.py
import logging
import logging.config
from typing import Dict, Any
import json
import traceback
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """JSON形式ログフォーマッター"""
    
    def format(self, record):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        
        # 例外情報追加
        if record.exc_info:
            log_entry['exception'] = {
                'type': record.exc_info[0].__name__,
                'message': str(record.exc_info[1]),
                'traceback': traceback.format_exception(*record.exc_info)
            }
        
        # カスタム属性追加
        for key, value in record.__dict__.items():
            if key.startswith('custom_'):
                log_entry[key[7:]] = value  # 'custom_' プレフィックス除去
        
        return json.dumps(log_entry, ensure_ascii=False)

def setup_logging(environment: str = "development") -> None:
    """ログ設定初期化"""
    
    if environment == "production":
        config = {
            'version': 1,
            'disable_existing_loggers': False,
            'formatters': {
                'json': {
                    '()': 'app.core.logging_config.JSONFormatter'
                },
                'standard': {
                    'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
                }
            },
            'handlers': {
                'console': {
                    'class': 'logging.StreamHandler',
                    'level': 'INFO',
                    'formatter': 'json',
                    'stream': 'ext://sys.stdout'
                },
                'file': {
                    'class': 'logging.handlers.RotatingFileHandler',
                    'level': 'INFO',
                    'formatter': 'json',
                    'filename': 'logs/app.log',
                    'maxBytes': 10485760,  # 10MB
                    'backupCount': 5
                },
                'error_file': {
                    'class': 'logging.handlers.RotatingFileHandler',
                    'level': 'ERROR',
                    'formatter': 'json',
                    'filename': 'logs/error.log',
                    'maxBytes': 10485760,
                    'backupCount': 5
                }
            },
            'loggers': {
                'app': {
                    'level': 'INFO',
                    'handlers': ['console', 'file', 'error_file'],
                    'propagate': False
                },
                'sqlalchemy.engine': {
                    'level': 'WARNING',
                    'handlers': ['file'],
                    'propagate': False
                },
                'alembic': {
                    'level': 'INFO',
                    'handlers': ['file'],
                    'propagate': False
                }
            },
            'root': {
                'level': 'WARNING',
                'handlers': ['console']
            }
        }
    else:
        # 開発環境用設定
        config = {
            'version': 1,
            'disable_existing_loggers': False,
            'formatters': {
                'detailed': {
                    'format': '%(asctime)s [%(levelname)8s] %(name)s:%(lineno)d - %(message)s'
                }
            },
            'handlers': {
                'console': {
                    'class': 'logging.StreamHandler',
                    'level': 'DEBUG',
                    'formatter': 'detailed'
                }
            },
            'loggers': {
                'app': {
                    'level': 'DEBUG',
                    'handlers': ['console'],
                    'propagate': False
                },
                'sqlalchemy.engine': {
                    'level': 'INFO',
                    'handlers': ['console'],
                    'propagate': False
                }
            },
            'root': {
                'level': 'INFO',
                'handlers': ['console']
            }
        }
    
    logging.config.dictConfig(config)

# ログヘルパー関数
def log_function_call(func):
    """関数呼び出しログデコレーター"""
    def wrapper(*args, **kwargs):
        logger = logging.getLogger(func.__module__)
        logger.debug(f"Calling {func.__name__} with args={args}, kwargs={kwargs}")
        
        try:
            result = func(*args, **kwargs)
            logger.debug(f"{func.__name__} completed successfully")
            return result
        except Exception as e:
            logger.error(f"{func.__name__} failed", extra={
                'custom_function': func.__name__,
                'custom_error_type': type(e).__name__
            })
            raise
    
    return wrapper

# 使用例
logger = logging.getLogger(__name__)

@log_function_call
def create_project(project_data: dict):
    logger.info("Creating new project", extra={
        'custom_project_name': project_data.get('name'),
        'custom_organization_id': project_data.get('organization_id')
    })
    
    # プロジェクト作成処理...
    
    logger.info("Project created successfully", extra={
        'custom_project_id': project.id
    })

まとめ

Chapter 5-3では、エンタープライズレベルの拡張性とパフォーマンス最適化を実装しました。

実装した機能:

  • キャッシュとデータベース最適化
  • メトリクス収集と監視システム
  • アラート・ログ管理
  • CI/CDパイプラインとコンテナー化

アーキテクチャ比較まとめ:

項目 クライアント実装 Edge Functions 独立APIサーバー
開発速度 ★★★ ★★
拡張性 ★★ ★★★
セキュリティ ★★ ★★★ ★★★
運用負荷 ★★★ ★★
コスト効率 ★★★ ★★

これで独立APIサーバーパターンの実装が完了しました。要件に応じて最適なアーキテクチャを選択し、段階的に移行する戦略が重要です。


章末演習・確認事項

演習問題

  1. キャッシュ戦略設計
    • 与えられたAPIエンドポイントに対して最適なキャッシュ戦略を設計してください
    • TTL値の設定根拠を説明してください
  2. メトリクス分析
    • サンプルメトリクスデータから性能課題を特定してください
    • 改善提案を3つ以上提示してください
  3. アラート設定
    • 想定シナリオに基づいてアラートルールを設計してください
    • 誤検知を最小化する戦略を含めてください
  4. CI/CD設計
    • セキュリティを考慮したCI/CDパイプラインを設計してください
    • 各ステージの目的と実行内容を明記してください

確認事項

技術理解

  • Redis を使用したキャッシュ実装ができる
  • データベースパーティショニング戦略を説明できる
  • ロードバランシングアルゴリズムの違いを理解している
  • メトリクス収集とアラート設計ができる
  • CI/CDパイプラインを構築できる
  • コンテナー化のベストプラクティスを理解している

実装能力

  • パフォーマンス最適化コードを書ける
  • 監視システムを構築できる
  • 自動化スクリプトを作成できる
  • セキュリティを考慮した設定ができる
  • スケーラブルなアーキテクチャを設計できる

推奨学習時間

  • 理論学習: 6-8時間
  • 実装演習: 12-16時間
  • 運用演習: 4-6時間

合計: 22-30時間

📝 Chapter 5-3 学習まとめ

習得できたスキル

  • ✅ エンタープライズ級スケーリング戦略・アーキテクチャ設計
  • ✅ 高性能システムのパフォーマンス最適化・キャッシュ戦略
  • ✅ 本番運用レベルの監視・アラート・自動化システム
  • ✅ CI/CD・コンテナ化・DevOps による効率的運用体制

🎯 Chapter 5シリーズ最終達成状況

| Chapter | 主要テーマ | 技術習得レベル | 実用適用範囲 | |:——–|:———-|:————-|:————-| | 5-1 | APIサーバー基礎 | 🌱 FastAPI + SQLAlchemy | 小規模サービス・MVP | | 5-2 | マルチテナント | 🚀 企業分離・複雑権限 | 中規模SaaS・B2Bサービス | | 5-3 | スケーリング・運用 | 💪 エンタープライズ級 | 大規模サービス・国際展開 |

🔄 Part II(アーキテクチャパターン)完了

3つのアーキテクチャパターンの習得完了:

  • Chapter 3: クライアントサイド(個人・小規模チーム向け)
  • Chapter 4: Edge Functions(スタートアップ・中規模向け)
  • Chapter 5: 独立APIサーバー(エンタープライズ・大規模向け)

🚀 次章予告:パフォーマンス最適化

Chapter 6では、「F1マシンのチューニング・エンジニア」レベルの最適化技術を学習します:

  • データベース最適化: インデックス・パーティション・クエリチューニング
  • 🚀 API高速化: PostgREST設定・キャッシュ戦略・レスポンス最適化
  • 📊 監視・測定: パフォーマンス計測・ボトルネック特定・継続改善
  • 🔧 自動最適化: AI駆動の性能改善・プロアクティブなチューニング

💡 実装目標: 「ユーザーが体感する待ち時間を1秒未満に最適化する高速システム」


📍 ナビゲーション