第9章 演習問題解答

問題1:マイクロサービス認証設計

解答

Eコマースシステムのマイクロサービス構成と認証アーキテクチャ

サービス一覧と役割

# Service catalogue for the e-commerce system.
# Each entry is keyed "<number>_<service name>" and carries the service's
# role plus one boolean flag naming its dominant characteristic.
services:
  1_api_gateway:
    role: "外部リクエストの受付、認証、ルーティング"
    public: true
    
  2_auth_service:
    role: "認証・認可・トークン管理"
    critical: true
    
  3_user_service:
    role: "ユーザー情報管理"
    pii: true
    
  4_product_service:
    role: "商品カタログ管理"
    cacheable: true
    
  5_inventory_service:
    role: "在庫管理"
    realtime: true
    
  6_cart_service:
    role: "ショッピングカート管理"
    session_bound: true
    
  7_order_service:
    role: "注文処理"
    transactional: true
    
  8_payment_service:
    role: "決済処理"
    pci_compliant: true
    
  9_notification_service:
    role: "通知(メール、SMS、プッシュ)"
    async: true
    
  10_analytics_service:
    role: "分析・レポーティング"
    read_only: true

認証フロー設計

class EcommerceAuthFlow:
    """Reference authentication design for the e-commerce microservices.

    Every method returns a plain dict describing one aspect of the design.
    Sample implementations are returned as source-code strings; nothing in
    this class executes them.
    """
    
    def user_authentication_flow(self):
        """Return the end-user login sequence and a sample gateway handler.

        Returns:
            dict with 'sequence' (numbered steps and token lifetimes) and
            'implementation' (source text of the login endpoint).
        """
        # The sample handler declares `request: Request` because it reads
        # request.headers; without that parameter the name is undefined.
        return {
            'sequence': '''
            1. Client → API Gateway: Login request
            2. API Gateway → Auth Service: Validate credentials
            3. Auth Service → User Service: Get user details
            4. Auth Service → API Gateway: Issue tokens
            5. API Gateway → Client: Return tokens
            
            Token Strategy:
            - Access Token: JWT (15分)
            - Refresh Token: Opaque (7日)
            - ID Token: JWT (ユーザー情報)
            ''',
            
            'implementation': '''
            @api_gateway.post("/auth/login")
            async def login(credentials: LoginRequest, request: Request):
                # 1. 基本的な検証
                if not validate_input(credentials):
                    raise ValidationError()
                
                # 2. Auth Serviceへの認証要求
                auth_result = await auth_service.authenticate(
                    email=credentials.email,
                    password=credentials.password,
                    device_id=request.headers.get("X-Device-ID")
                )
                
                if not auth_result.success:
                    # レート制限の更新
                    await rate_limiter.record_failure(credentials.email)
                    raise AuthenticationError()
                
                # 3. トークン生成
                tokens = await auth_service.create_tokens(
                    user_id=auth_result.user_id,
                    roles=auth_result.roles,
                    device_id=request.headers.get("X-Device-ID")
                )
                
                # 4. セッション情報の保存
                await session_store.create(
                    session_id=tokens.session_id,
                    user_id=auth_result.user_id,
                    tokens=tokens
                )
                
                return {
                    "access_token": tokens.access_token,
                    "refresh_token": tokens.refresh_token,
                    "expires_in": 900
                }
            '''
        }
    
    def service_to_service_flow(self):
        """Return the service-to-service authentication pattern.

        Returns:
            dict with 'pattern' (pattern name) and 'implementation' (source
            text of a sample ServiceAuthenticator).
        """
        return {
            'pattern': 'Service Mesh with mTLS + Service Tokens',
            
            'implementation': '''
            class ServiceAuthenticator:
                def __init__(self):
                    self.service_registry = ServiceRegistry()
                    self.token_issuer = ServiceTokenIssuer()
                
                async def authenticate_service(self, cert: Certificate) -> ServiceIdentity:
                    # 1. mTLSによる相互認証
                    service_name = extract_service_name(cert)
                    if not self.verify_certificate(cert):
                        raise InvalidCertificateError()
                    
                    # 2. サービスの登録確認
                    service = await self.service_registry.get(service_name)
                    if not service or not service.active:
                        raise UnregisteredServiceError()
                    
                    # 3. サービストークンの発行
                    token = self.token_issuer.issue(
                        service_name=service_name,
                        permissions=service.permissions,
                        validity=timedelta(minutes=5)
                    )
                    
                    return ServiceIdentity(
                        name=service_name,
                        token=token,
                        permissions=service.permissions
                    )
            '''
        }
    
    def token_strategy(self):
        """Return the token types used by the design.

        Returns:
            dict with 'token_types', mapping each token kind to its format,
            example claims, time-to-live and intended usage.
        """
        return {
            'token_types': {
                'user_access_token': {
                    'format': 'JWT',
                    'claims': {
                        'sub': 'user_id',
                        'roles': ['customer', 'premium'],
                        'sid': 'session_id',
                        'did': 'device_id'
                    },
                    'ttl': '15 minutes',
                    'usage': 'API呼び出し'
                },
                
                'service_token': {
                    'format': 'JWT (内部署名)',
                    'claims': {
                        'iss': 'auth-service',
                        'sub': 'service-name',
                        'aud': 'target-service',
                        'permissions': ['read', 'write']
                    },
                    'ttl': '5 minutes',
                    'usage': 'サービス間通信'
                },
                
                'delegation_token': {
                    'format': 'JWT',
                    'claims': {
                        'act': 'acting-service',
                        'sub': 'original-user',
                        'dlg': 'delegation-chain'
                    },
                    'ttl': '2 minutes',
                    'usage': 'サービス連鎖での権限委譲'
                }
            }
        }
    
    def error_handling(self):
        """Return the error-response catalogue and a circuit-breaker sample.

        Returns:
            dict with 'error_responses' (per-condition policy and response
            body) and 'circuit_breaker' (source text of the guarded call).
        """
        # In the sample, a timeout is re-raised as the breaker's
        # expected_exception so failures are counted and the breaker can
        # open; the cache fallback lives in a wrapper outside the guarded
        # call. CircuitOpenError is a placeholder name for whatever the
        # breaker raises while open.
        return {
            'error_responses': {
                'auth_service_down': {
                    'fallback': 'Cache-based validation',
                    'response': {
                        'status': 503,
                        'error': 'AUTH_SERVICE_UNAVAILABLE',
                        'message': 'Authentication service is temporarily unavailable',
                        'retry_after': 30
                    }
                },
                
                'token_expired': {
                    'auto_refresh': True,
                    'response': {
                        'status': 401,
                        'error': 'TOKEN_EXPIRED',
                        'message': 'Access token has expired',
                        'refresh_endpoint': '/auth/refresh'
                    }
                },
                
                'insufficient_permissions': {
                    'log_attempt': True,
                    'response': {
                        'status': 403,
                        'error': 'INSUFFICIENT_PERMISSIONS',
                        'message': 'You do not have permission to access this resource',
                        'required_roles': ['admin']
                    }
                }
            },
            
            'circuit_breaker': '''
            @circuit_breaker(
                failure_threshold=5,
                recovery_timeout=60,
                expected_exception=ServiceUnavailableError
            )
            async def call_auth_service(request):
                try:
                    return await auth_service.validate(request)
                except TimeoutError as exc:
                    raise ServiceUnavailableError() from exc
            
            async def validate_with_fallback(request):
                try:
                    return await call_auth_service(request)
                except (ServiceUnavailableError, CircuitOpenError):
                    # フォールバック処理
                    return await validate_from_cache(request)
            '''
        }

問題2:API Gateway実装

解答

import asyncio
import json
import logging
import os
import time
import uuid
from datetime import datetime, timedelta
from typing import Optional, Dict, Any

import httpx
import jwt
import redis
from fastapi import FastAPI, Request, Response, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

# FastAPI application object; the middleware and routes below register on it.
app = FastAPI(title="API Gateway")

# 設定
class Config:
    """Static configuration for the gateway.

    JWT_SECRET and REDIS_URL can be overridden through environment
    variables of the same name; the previous hard-coded values remain as
    defaults so existing setups keep working.
    """

    # Signing key for HS256 tokens. The default is a placeholder only:
    # set JWT_SECRET in the environment for any real deployment.
    JWT_SECRET = os.environ.get("JWT_SECRET", "your-secret-key")
    JWT_ALGORITHM = "HS256"
    REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
    # Allowed requests per user within one window.
    RATE_LIMIT_REQUESTS = 100
    RATE_LIMIT_WINDOW = 3600  # window length in seconds (1 hour)
    
    # Service registry: first URL path segment -> upstream base URL.
    SERVICES = {
        "users": "http://user-service:8080",
        "products": "http://product-service:8080",
        "orders": "http://order-service:8080"
    }

# Shared module-level objects used by the dependencies and handlers below.
security = HTTPBearer()
# decode_responses=True makes the client return str instead of bytes.
redis_client = redis.from_url(Config.REDIS_URL, decode_responses=True)
logger = logging.getLogger(__name__)

# JWT検証
class JWTValidator:
    """Verifies bearer tokens against the gateway's configured secret."""

    @staticmethod
    def decode_token(token: str) -> Dict[str, Any]:
        """Verify *token* and return its claims.

        The token must carry both ``exp`` and ``sub``: a token without
        ``exp`` would never expire, and callers index the claims by ``sub``.

        Args:
            token: Encoded JWT taken from the Authorization header.

        Returns:
            The decoded claims.

        Raises:
            HTTPException: 401 when the token is expired, malformed, badly
                signed, or missing a required claim.
        """
        try:
            return jwt.decode(
                token,
                Config.JWT_SECRET,
                algorithms=[Config.JWT_ALGORITHM],
                options={"require": ["exp", "sub"]},
            )
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token has expired")
        except jwt.InvalidTokenError:
            # Also covers a missing required claim.
            raise HTTPException(status_code=401, detail="Invalid token")

# レート制限
class RateLimiter:
    """Per-user sliding-window-log rate limiter stored in a Redis sorted set."""

    def __init__(self, redis_client):
        """Keep a reference to the Redis client used for all operations."""
        self.redis = redis_client
        
    async def check_rate_limit(self, user_id: str) -> bool:
        """Record one request for *user_id* and report whether it is allowed.

        Args:
            user_id: Identifier used to build the ``rate_limit:<user_id>`` key.

        Returns:
            True while the number of requests recorded in the last
            ``Config.RATE_LIMIT_WINDOW`` seconds (this one included) does not
            exceed ``Config.RATE_LIMIT_REQUESTS``; False otherwise.
        """
        key = f"rate_limit:{user_id}"
        current_time = int(time.time())
        window_start = current_time - Config.RATE_LIMIT_WINDOW
        # Sorted-set members are unique. Using the bare timestamp as the
        # member made every request within the same second overwrite the
        # previous one, so bursts were counted once. A random suffix gives
        # each request its own entry.
        member = f"{current_time}:{uuid.uuid4().hex}"
        
        # Queue the four commands and send them together.
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)        # drop expired entries
        pipe.zadd(key, {member: current_time})             # record this request
        pipe.zcount(key, window_start, current_time)       # count the window
        pipe.expire(key, Config.RATE_LIMIT_WINDOW)         # let idle keys vanish
        
        results = pipe.execute()
        request_count = results[2]
        
        return request_count <= Config.RATE_LIMIT_REQUESTS

# 監査ログ
class AuditLogger:
    """Writes one audit record per proxied request to Redis and the logger."""

    def __init__(self, redis_client):
        """Keep a reference to the Redis client used to store records."""
        self.redis = redis_client
        
    async def log_request(self, user_id: str, request: Request, response_status: int):
        """Store an audit record for a handled request.

        Args:
            user_id: Authenticated user the request is attributed to.
            request: Incoming request; method, path, query, client address,
                user agent and request id are read from it.
            response_status: HTTP status that was (or will be) returned.
        """
        # One timestamp for both the record and the key, so a call that
        # straddles midnight cannot land in the wrong day's list.
        now = datetime.utcnow()
        # request.client is None when the server provides no peer address.
        client = request.client
        log_entry = {
            "timestamp": now.isoformat(),
            "user_id": user_id,
            "method": request.method,
            "path": str(request.url.path),
            "query_params": dict(request.query_params),
            "client_ip": client.host if client else None,
            "user_agent": request.headers.get("user-agent"),
            "response_status": response_status,
            # Absent when the request-id middleware did not run.
            "request_id": getattr(request.state, "request_id", None)
        }
        
        # Append to the per-day, per-user list and keep it for 30 days.
        key = f"audit_log:{now.strftime('%Y%m%d')}:{user_id}"
        pipe = self.redis.pipeline()
        pipe.rpush(key, json.dumps(log_entry))
        pipe.expire(key, 30 * 24 * 3600)
        pipe.execute()
        
        # Mirror the record to the application log.
        logger.info("API Request: %s", log_entry)

# Module-level singletons shared by the dependency functions and handlers.
rate_limiter = RateLimiter(redis_client)
audit_logger = AuditLogger(redis_client)
jwt_validator = JWTValidator()

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """FastAPI dependency: authenticate the bearer token and rate-limit the user.

    Args:
        credentials: Bearer credentials extracted by ``security``.

    Returns:
        The decoded token claims.

    Raises:
        HTTPException: 401 when the token is invalid or has no ``sub``
            claim; 429 when the user exceeded the rate limit.
    """
    token = credentials.credentials
    user_data = jwt_validator.decode_token(token)

    # Without a subject there is nobody to rate-limit or attribute the
    # request to; answer 401 instead of failing with KeyError (HTTP 500).
    user_id = user_data.get("sub")
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid token")
    
    # Rate-limit check
    if not await rate_limiter.check_rate_limit(user_id):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            # Derived from the configured window so the two cannot drift apart.
            headers={"Retry-After": str(Config.RATE_LIMIT_WINDOW)}
        )
    
    return user_data

# プロキシ機能
class ServiceProxy:
    """Forwards requests to the upstream services listed in Config.SERVICES."""

    # Request headers that describe the client<->gateway connection or the
    # original message framing; they are not copied to the upstream request.
    _DROPPED_REQUEST_HEADERS = frozenset({
        "host",
        "content-length",
        "connection",
        "keep-alive",
        "transfer-encoding",
        "upgrade",
        "te",
        "trailer",
        "proxy-authorization",
        "proxy-authenticate",
    })

    def __init__(self):
        """Create the shared HTTP client (30 s timeout)."""
        self.client = httpx.AsyncClient(timeout=30.0)
        
    async def forward_request(
        self,
        service: str,
        path: str,
        method: str,
        headers: dict,
        body: bytes,
        params: dict
    ) -> httpx.Response:
        """Send the request to *service* and return the upstream response.

        Args:
            service: Key into ``Config.SERVICES``.
            path: Path below the service base URL.
            method: HTTP method to use.
            headers: Headers to forward; names are matched case-insensitively.
            body: Raw request body.
            params: Query parameters.

        Returns:
            The upstream ``httpx.Response``.

        Raises:
            HTTPException: 404 for an unknown service, 504 on upstream
                timeout, 502 on any other httpx error.
        """
        if service not in Config.SERVICES:
            raise HTTPException(status_code=404, detail="Service not found")
        
        service_url = Config.SERVICES[service]
        url = f"{service_url}/{path}"
        
        # Normalise names to lower case (header names are case-insensitive)
        # so that a later entry overrides an earlier one differing only in
        # case, and lookups below do not depend on the caller's casing.
        internal_headers = {}
        for name, value in headers.items():
            lowered = name.lower()
            if lowered not in self._DROPPED_REQUEST_HEADERS:
                internal_headers[lowered] = value

        # Internal headers: the marker is always set by the gateway, never
        # taken from the caller; the request id is passed through if present.
        internal_headers["x-internal-request"] = "true"
        internal_headers.setdefault("x-request-id", "")
        
        try:
            response = await self.client.request(
                method=method,
                url=url,
                headers=internal_headers,
                content=body,
                params=params
            )
            return response
        except httpx.TimeoutException:
            raise HTTPException(status_code=504, detail="Service timeout")
        except httpx.HTTPError:
            raise HTTPException(status_code=502, detail="Service error")

proxy = ServiceProxy()

# ミドルウェア
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    """Tag every request and its response with an X-Request-ID.

    The id supplied by the caller is reused; when the header is absent a
    fresh UUID4 is generated. The id is stored on ``request.state`` and
    echoed back on the response.
    """
    supplied = request.headers.get("X-Request-ID")
    request_id = str(uuid.uuid4()) if supplied is None else supplied
    request.state.request_id = request_id

    outgoing = await call_next(request)
    outgoing.headers["X-Request-ID"] = request_id
    return outgoing

# エンドポイント
@app.get("/health")
async def health_check():
    """Report a static healthy status together with the current UTC time."""
    checked_at = datetime.utcnow().isoformat()
    return dict(status="healthy", timestamp=checked_at)

@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_proxy(
    service: str,
    path: str,
    request: Request,
    current_user: dict = Depends(get_current_user)
):
    """Authenticate, forward the request to *service*, audit it, relay the reply.

    Args:
        service: First path segment; selects the upstream service.
        path: Remainder of the path, forwarded to the upstream.
        request: Incoming request.
        current_user: Token claims resolved by ``get_current_user``.

    Returns:
        A ``Response`` carrying the upstream status, body and headers.

    Raises:
        HTTPException: Whatever ``proxy.forward_request`` raises
            (404/502/504); the failure is audited before re-raising.
    """
    # Headers the gateway sets itself. Incoming header names are lower-case,
    # so a client-sent "x-user-id" would otherwise travel next to the
    # gateway's "X-User-ID" and could be picked up by the upstream.
    gateway_owned = {"x-user-id", "x-user-roles", "x-internal-request", "x-request-id"}
    # The body relayed below is response.content, which httpx has already
    # decoded; the upstream's framing/encoding headers no longer describe it.
    stale_response_headers = {
        "content-encoding",
        "content-length",
        "transfer-encoding",
        "connection",
    }

    # Read the request body
    body = await request.body()
    
    # Prepare the forwarded headers
    headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() not in gateway_owned
    }
    headers["X-User-ID"] = str(current_user["sub"])
    headers["X-User-Roles"] = ",".join(current_user.get("roles", []))
    # Prefer the id assigned by the request-id middleware so the upstream
    # sees the same id that is returned to the client.
    headers["X-Request-ID"] = (
        getattr(request.state, "request_id", None)
        or request.headers.get("X-Request-ID", "")
    )
    
    # Proxy to the service; audit failed attempts as well
    try:
        response = await proxy.forward_request(
            service=service,
            path=path,
            method=request.method,
            headers=headers,
            body=body,
            params=dict(request.query_params)
        )
    except HTTPException as exc:
        await audit_logger.log_request(
            user_id=current_user["sub"],
            request=request,
            response_status=exc.status_code
        )
        raise
    
    # Audit log
    await audit_logger.log_request(
        user_id=current_user["sub"],
        request=request,
        response_status=response.status_code
    )
    
    # Relay the response
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers={
            name: value
            for name, value in response.headers.items()
            if name.lower() not in stale_response_headers
        }
    )

# 管理用エンドポイント
def _count_pool_connections(client) -> Optional[int]:
    """Best-effort count of connections held by an httpx client's pool.

    httpx exposes no public API for this, so private attributes are probed.
    NOTE(review): attribute names depend on the installed httpx/httpcore
    versions — confirm against the pinned versions.

    Returns:
        The number of pooled connections, or None when it cannot be read.
    """
    pool = getattr(client, "_pool", None)
    if pool is None:
        pool = getattr(getattr(client, "_transport", None), "_pool", None)
    for attr in ("connections", "_connections"):
        connections = getattr(pool, attr, None)
        if connections is None:
            continue
        try:
            return len(connections)
        except TypeError:
            continue
    return None


@app.get("/admin/metrics")
async def get_metrics(current_user: dict = Depends(get_current_user)):
    """Return gateway metrics; restricted to users holding the "admin" role.

    Returns:
        dict with 'timestamp', 'active_connections' (int, or None when the
        pool size cannot be determined) and 'rate_limit_status' (per-user
        request count and configured limit).

    Raises:
        HTTPException: 403 when the caller lacks the "admin" role.
    """
    if "admin" not in current_user.get("roles", []):
        raise HTTPException(status_code=403, detail="Admin access required")
    
    # Collect metrics
    metrics = {
        "timestamp": datetime.utcnow().isoformat(),
        "active_connections": _count_pool_connections(proxy.client),
        "rate_limit_status": {}
    }
    
    # Current rate-limit counters
    for key in redis_client.scan_iter("rate_limit:*"):
        # Split once on the prefix separator so user ids that themselves
        # contain ":" are kept whole.
        user_id = key.split(":", 1)[1]
        count = redis_client.zcount(key, "-inf", "+inf")
        metrics["rate_limit_status"][user_id] = {
            "current_requests": count,
            "limit": Config.RATE_LIMIT_REQUESTS
        }
    
    return metrics

if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on 0.0.0.0:8000.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

問題3:Zero Trust設計

解答

金融システムへのZero Trust原則適用設計

class FinancialZeroTrustDesign:
    """Zero Trust design for a financial system, expressed as data.

    Every method returns a plain dict. Sample implementations are returned
    as source-code strings; nothing in this class executes them.
    """
    
    def risk_assessment_factors(self):
        """Return the risk-assessment factors (twelve, in six groups).

        Returns:
            dict mapping factor group -> factor name -> settings. Each
            factor has a 'weight' plus either a scoring table, a list of
            checks, or a textual/source description of the calculation.
        """
        return {
            'device_factors': {
                'device_trust_score': {
                    'weight': 15,
                    'calculation': '''
                    - 管理デバイス: 100
                    - 登録済み個人デバイス: 70
                    - 未登録デバイス: 30
                    - ルート化/Jailbreak: 0
                    '''
                },
                'device_compliance': {
                    'weight': 10,
                    'checks': [
                        'OS最新パッチ適用',
                        'アンチウイルス有効',
                        'ディスク暗号化',
                        'ファイアウォール有効'
                    ]
                }
            },
            
            'location_factors': {
                'geolocation_risk': {
                    'weight': 20,
                    'scoring': '''
                    def calculate_location_risk(ip_address, gps_coords):
                        country_risk = get_country_risk_score(ip_address)
                        
                        # 高リスク国からのアクセス
                        if country_risk > 80:
                            return 100
                        
                        # オフィスからのアクセス
                        if is_office_location(gps_coords):
                            return 10
                        
                        # VPN使用
                        if is_vpn(ip_address):
                            return 70
                        
                        return country_risk
                    '''
                },
                'impossible_travel': {
                    'weight': 25,
                    'detection': 'Previous location vs current location / time'
                }
            },
            
            'behavioral_factors': {
                'access_pattern': {
                    'weight': 10,
                    'analysis': [
                        '通常のアクセス時間帯か',
                        'アクセス頻度の異常',
                        '通常と異なるアプリケーション使用'
                    ]
                },
                'transaction_behavior': {
                    'weight': 20,
                    'checks': [
                        '取引金額の異常',
                        '送金先の異常',
                        '取引頻度の急激な変化'
                    ]
                }
            },
            
            'authentication_factors': {
                'auth_method_strength': {
                    'weight': 15,
                    'scores': {
                        'password_only': 30,
                        'password_mfa_sms': 60,
                        'password_mfa_app': 80,
                        'biometric_mfa': 95,
                        'hardware_key': 100
                    }
                },
                'session_age': {
                    'weight': 5,
                    'calculation': 'exponential_decay(session_duration)'
                }
            },
            
            'network_factors': {
                'network_reputation': {
                    'weight': 10,
                    'checks': [
                        'Known malicious IP',
                        'Tor exit node',
                        'Public WiFi',
                        'Corporate network'
                    ]
                },
                'connection_security': {
                    'weight': 5,
                    'requirements': [
                        'TLS 1.3',
                        'Certificate pinning',
                        'No weak ciphers'
                    ]
                }
            },
            
            'resource_sensitivity': {
                'data_classification': {
                    'weight': 30,
                    'levels': {
                        'public': 10,
                        'internal': 30,
                        'confidential': 60,
                        'restricted': 100
                    }
                },
                'operation_risk': {
                    'weight': 25,
                    'operations': {
                        'read_balance': 20,
                        'internal_transfer': 50,
                        'external_transfer': 80,
                        'wire_transfer': 100,
                        'account_closure': 90
                    }
                }
            }
        }
    
    def dynamic_access_control(self):
        """Return the dynamic access-control rules.

        Returns:
            dict with 'rule_engine' (source text mapping a risk score to
            LOW/MEDIUM/HIGH/CRITICAL), 'access_rules' (decision per risk
            level and data class) and 'step_up_authentication' (source text).
        """
        return {
            'rule_engine': '''
            class DynamicAccessController:
                def evaluate_access(self, context: AccessContext) -> AccessDecision:
                    risk_score = self.calculate_risk_score(context)
                    
                    # リスクレベルの判定
                    if risk_score < 30:
                        risk_level = "LOW"
                    elif risk_score < 60:
                        risk_level = "MEDIUM"
                    elif risk_score < 80:
                        risk_level = "HIGH"
                    else:
                        risk_level = "CRITICAL"
                    
                    # リソース感度の取得
                    resource_sensitivity = self.get_resource_sensitivity(context.resource)
                    
                    # 動的ルールの適用
                    return self.apply_rules(risk_level, resource_sensitivity, context)
            ''',
            
            'access_rules': {
                'low_risk': {
                    'public_data': 'ALLOW',
                    'internal_data': 'ALLOW',
                    'confidential_data': 'ALLOW',
                    'restricted_data': 'ALLOW_WITH_LOGGING'
                },
                
                'medium_risk': {
                    'public_data': 'ALLOW',
                    'internal_data': 'ALLOW',
                    'confidential_data': 'REQUIRE_MFA',
                    'restricted_data': 'REQUIRE_MFA_AND_APPROVAL'
                },
                
                'high_risk': {
                    'public_data': 'ALLOW',
                    'internal_data': 'REQUIRE_MFA',
                    'confidential_data': 'REQUIRE_STEP_UP_AUTH',
                    'restricted_data': 'DENY'
                },
                
                'critical_risk': {
                    'public_data': 'ALLOW_READ_ONLY',
                    'internal_data': 'DENY',
                    'confidential_data': 'DENY',
                    'restricted_data': 'DENY_AND_ALERT'
                }
            },
            
            'step_up_authentication': '''
            async def require_step_up_auth(user: User, required_level: str):
                current_auth_level = user.current_auth_level
                
                if required_level == "MFA" and current_auth_level < 2:
                    return await prompt_mfa(user)
                
                elif required_level == "BIOMETRIC" and current_auth_level < 3:
                    return await prompt_biometric(user)
                
                elif required_level == "TRANSACTION_SIGNING":
                    return await prompt_transaction_signing(user)
            '''
        }
    
    def continuous_verification(self):
        """Return the continuous-verification triggers and a sample engine.

        Returns:
            dict with 'verification_triggers' (list of trigger descriptions)
            and 'implementation' (source text of the monitoring loop).
        """
        # Two details of the sample loop matter:
        # - it sleeps for timedelta.total_seconds(); the .seconds attribute
        #   omits whole days and is wrong for intervals of a day or more;
        # - it reloads the session after every sleep, otherwise the
        #   `session.active` loop condition could never observe a logout.
        return {
            'verification_triggers': [
                'Periodic time-based (every 5 minutes)',
                'Resource access attempt',
                'Behavioral anomaly detected',
                'Risk score change > 20 points',
                'Network change',
                'New device detected'
            ],
            
            'implementation': '''
            class ContinuousVerificationEngine:
                def __init__(self):
                    self.verification_interval = timedelta(minutes=5)
                    self.risk_threshold_delta = 20
                
                async def monitor_session(self, session_id: str):
                    session = await self.get_session(session_id)
                    last_risk_score = session.risk_score
                    
                    while session.active:
                        # 定期的な検証
                        await asyncio.sleep(self.verification_interval.total_seconds())
                        session = await self.get_session(session_id)
                        if not session.active:
                            break
                        
                        # リスク再評価
                        current_context = await self.build_context(session)
                        new_risk_score = await self.calculate_risk(current_context)
                        
                        # 大幅なリスク変化の検出
                        if abs(new_risk_score - last_risk_score) > self.risk_threshold_delta:
                            await self.handle_risk_change(session, new_risk_score)
                        
                        # 行動分析
                        anomalies = await self.detect_anomalies(session)
                        if anomalies:
                            await self.handle_anomalies(session, anomalies)
                        
                        last_risk_score = new_risk_score
            '''
        }
    
    def incident_response_flow(self):
        """Return the incident-response flow.

        Returns:
            dict with 'detection_to_response' (source text of the six-step
            orchestrator) and 'response_actions' (action lists per phase and
            a notification matrix keyed by severity).
        """
        return {
            'detection_to_response': '''
            class IncidentResponseOrchestrator:
                async def handle_security_incident(self, incident: SecurityIncident):
                    # 1. 即座の封じ込め
                    if incident.severity >= Severity.HIGH:
                        await self.immediate_containment(incident)
                    
                    # 2. 調査
                    investigation = await self.investigate(incident)
                    
                    # 3. 影響評価
                    impact = await self.assess_impact(investigation)
                    
                    # 4. 対応実施
                    response_plan = self.create_response_plan(impact)
                    await self.execute_response(response_plan)
                    
                    # 5. 復旧
                    await self.recovery_actions(incident)
                    
                    # 6. 事後分析
                    await self.post_incident_analysis(incident)
            ''',
            
            'response_actions': {
                'immediate_containment': [
                    'セッションの即時無効化',
                    'アカウントの一時凍結',
                    '関連するAPIキーの無効化',
                    'IPアドレスのブロック'
                ],
                
                'investigation_steps': [
                    'ログの収集と分析',
                    '影響を受けたリソースの特定',
                    'アクセスパターンの分析',
                    '関連するセッションの調査'
                ],
                
                'recovery_actions': [
                    'パスワードリセット要求',
                    'MFA再登録',
                    'デバイス再認証',
                    'セキュリティ質問の更新'
                ],
                
                'notification_matrix': {
                    'low_severity': ['Security team'],
                    'medium_severity': ['Security team', 'User'],
                    'high_severity': ['Security team', 'User', 'Management'],
                    'critical_severity': ['All above', 'CISO', 'Legal', 'PR']
                }
            }
        }

問題4:サービスメッシュ設定

解答

Istioを使用したセキュリティポリシー実装

# 1. mTLS — required between all services
# NOTE(review): mesh-wide scope assumes istio-system is the mesh root
# namespace — confirm against the installation.
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: istio-system
spec:
  mtls:
    mode: STRICT

---
# 2. Restrict access to the database service
# Only the three listed service accounts may reach workloads labelled
# app: database, and only on port 5432.
# The rule uses no HTTP-only fields such as `methods`: port 5432 is
# presumably the PostgreSQL wire protocol (plain TCP), and an ALLOW rule
# carrying HTTP-only fields is invalid for TCP traffic, which would leave
# no matching ALLOW rule and deny every connection.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: database-access-policy
  namespace: production
spec:
  selector:
    matchLabels:
      app: database
  action: ALLOW
  rules:
  - from:
    - source:
        principals: 
        - "cluster.local/ns/production/sa/order-service"
        - "cluster.local/ns/production/sa/user-service"
        - "cluster.local/ns/production/sa/inventory-service"
    to:
    - operation:
        ports: ["5432"]

---
# 3. Admit external traffic only through the API gateway
# HTTPS listener on port 443 for api.example.com, bound to the workloads
# selected by istio: ingressgateway; the TLS credential is named api-cert.
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: api-gateway
  namespace: production
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 443
      name: https
      protocol: HTTPS
    tls:
      mode: SIMPLE
      credentialName: api-cert
    hosts:
    - "api.example.com"

---
# Routes requests for api.example.com that arrive via the api-gateway
# Gateway and whose path starts with /api/v1/ to api-gateway-service:8080.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: api-routing
  namespace: production
spec:
  hosts:
  - "api.example.com"
  gateways:
  - api-gateway
  http:
  - match:
    - uri:
        prefix: "/api/v1/"
    route:
    - destination:
        host: api-gateway-service
        port:
          number: 8080

---
# Deny direct access from outside
# For workloads labelled internal: "true", reject any request whose source
# namespace is neither production nor istio-system.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: deny-external-access
  namespace: production
spec:
  selector:
    matchLabels:
      internal: "true"
  action: DENY
  rules:
  - from:
    - source:
        notNamespaces: ["production", "istio-system"]

---
# 4. Per-service rate limits
# Configuration file for the rate limit service (mounted by the ratelimit
# Deployment). One descriptor per service, keyed by `service`, each with a
# requests-per-minute budget in the production-ratelimit domain.
apiVersion: v1
kind: ConfigMap
metadata:
  name: ratelimit-config
  namespace: production
data:
  config.yaml: |
    domain: production-ratelimit
    descriptors:
      - key: service
        value: "user-service"
        rate_limit:
          unit: minute
          requests_per_unit: 1000
      - key: service
        value: "order-service"
        rate_limit:
          unit: minute
          requests_per_unit: 500
      - key: service
        value: "payment-service"
        rate_limit:
          unit: minute
          requests_per_unit: 100
      - key: service
        value: "analytics-service"
        rate_limit:
          unit: minute
          requests_per_unit: 2000

---
# Service exposing TCP port 8081 of the pods labelled app: ratelimit.
apiVersion: v1
kind: Service
metadata:
  name: ratelimit
  namespace: production
spec:
  ports:
  - port: 8081
    protocol: TCP
  selector:
    app: ratelimit

---
# Two replicas of the rate limit service.
# Counters are kept in Redis at redis:6379; the ratelimit-config ConfigMap
# is mounted at /data/ratelimit/config, i.e. under RUNTIME_ROOT/
# RUNTIME_SUBDIRECTORY.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ratelimit
  namespace: production
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ratelimit
  template:
    metadata:
      labels:
        app: ratelimit
    spec:
      containers:
      - name: ratelimit
        image: envoyproxy/ratelimit:v1.4.0
        command: ["/bin/ratelimit"]
        env:
        # NOTE(review): debug logging is verbose — confirm this is intended
        # for the production namespace.
        - name: LOG_LEVEL
          value: debug
        - name: REDIS_SOCKET_TYPE
          value: tcp
        - name: REDIS_URL
          value: redis:6379
        - name: USE_STATSD
          value: "false"
        - name: RUNTIME_ROOT
          value: /data
        - name: RUNTIME_SUBDIRECTORY
          value: ratelimit
        ports:
        # 8081 is the port published by the ratelimit Service.
        - containerPort: 8080
        - containerPort: 8081
        - containerPort: 6070
        volumeMounts:
        - name: config-volume
          mountPath: /data/ratelimit/config
      volumes:
      - name: config-volume
        configMap:
          name: ratelimit-config

---
# Apply rate limiting through an EnvoyFilter
# Inserts the HTTP rate limit filter into the inbound sidecar HTTP
# connection manager and points it at the ratelimit Service.
# cluster_name uses the Istio-generated outbound cluster for
# ratelimit.production.svc.cluster.local:8081; a free-form name such as
# "rate_limit_service" would need a cluster defined separately.
# failure_mode_deny: false lets requests through when the rate limit
# service cannot be reached.
apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: ratelimit-filter
  namespace: production
spec:
  configPatches:
  - applyTo: HTTP_FILTER
    match:
      context: SIDECAR_INBOUND
      listener:
        filterChain:
          filter:
            name: "envoy.filters.network.http_connection_manager"
    patch:
      operation: INSERT_BEFORE
      value:
        name: envoy.filters.http.ratelimit
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
          domain: production-ratelimit
          failure_mode_deny: false
          rate_limit_service:
            grpc_service:
              envoy_grpc:
                cluster_name: outbound|8081||ratelimit.production.svc.cluster.local
              timeout: 0.25s
            transport_api_version: V3

---
# Mesh-wide observability settings
# Declares two security metrics: a request counter broken down by source,
# destination, mTLS result and response code, and a counter of 403
# responses per source/destination pair.
# NOTE(review): this is a plain ConfigMap; which component reads
# custom_metrics.yaml and whether this schema is supported by the installed
# Istio version is not visible here — confirm.
apiVersion: v1
kind: ConfigMap
metadata:
  name: istio-custom-telemetry
  namespace: istio-system
data:
  custom_metrics.yaml: |
    telemetry:
    - name: security-metrics
      dimensions:
        source_service: source.workload.name | "unknown"
        destination_service: destination.service.name | "unknown"
        auth_result: connection.mtls | "none"
        response_code: response.code | 0
      metrics:
      - name: security_request_count
        dimensions:
          - source_service
          - destination_service
          - auth_result
          - response_code
        value: "1"
      - name: unauthorized_access_attempts
        dimensions:
          - source_service
          - destination_service
        value: response.code == 403 ? 1 : 0

問題5:パフォーマンス最適化

解答

認証処理ボトルネック最適化計画

class AuthPerformanceOptimization:
    """認証パフォーマンス最適化"""
    
    def current_analysis(self):
        """現状分析"""
        return {
            'latency_measurements': {
                'auth_service_calls': {
                    'p50': '150ms',
                    'p95': '500ms',
                    'p99': '1200ms',
                    'breakdown': {
                        'network': '20ms',
                        'jwt_validation': '80ms',
                        'database_lookup': '200ms',
                        'permission_check': '150ms',
                        'response_serialization': '50ms'
                    }
                },
                
                'bottleneck_identification': '''
                # プロファイリング結果
                1. DB クエリ (40% of time)
                   - User lookup: 150ms avg
                   - Permission fetch: 100ms avg
                   - N+1 問題あり
                
                2. JWT 検証 (16% of time)
                   - RSA署名検証: 80ms
                   - 毎回公開鍵取得
                
                3. 権限チェック (30% of time)
                   - 複雑なRBACルール
                   - キャッシュなし
                
                4. ネットワーク (14% of time)
                   - サービス間の往復
                   - TLS handshake
                '''
            }
        }
    
    def optimization_strategies(self):
        """Return the proposed optimizations (five or more), keyed by name.

        Keys carry an ordinal prefix (``1_caching_strategy`` ...
        ``5_edge_computing``). Every entry is a dict with three string
        fields: ``description``, ``implementation`` (an illustrative code
        snippet that is returned as text, never executed here) and
        ``expected_improvement``.

        Two snippets are written defensively on purpose:

        * The JWT validator serves and stores cached claims only while
          their ``exp`` lies in the future, so a token cannot outlive its
          expiry through the 300-second pre-validation cache.
        * The edge worker copies the incoming request before setting the
          verified-identity headers, because the headers of an incoming
          Workers request are immutable.
        """
        return {
            '1_caching_strategy': {
                'description': 'マルチレベルキャッシング',
                'implementation': '''
                class MultiLevelCache:
                    def __init__(self):
                        # L1: プロセス内キャッシュ(超高速)
                        self.l1_cache = LRUCache(maxsize=10000, ttl=60)
                        
                        # L2: Redis(高速、分散)
                        self.l2_cache = RedisCache(ttl=300)
                        
                        # L3: CDN(エッジキャッシング)
                        self.l3_cache = CDNCache(ttl=600)
                    
                    async def get_user_permissions(self, user_id: str):
                        # L1チェック
                        if data := self.l1_cache.get(user_id):
                            return data
                        
                        # L2チェック
                        if data := await self.l2_cache.get(user_id):
                            self.l1_cache.set(user_id, data)
                            return data
                        
                        # DBから取得
                        data = await self.fetch_from_db(user_id)
                        
                        # 全レベルにキャッシュ
                        await self.cache_all_levels(user_id, data)
                        
                        return data
                ''',
                'expected_improvement': '60-80% reduction in DB calls'
            },
            
            '2_jwt_optimization': {
                'description': 'JWT検証の最適化',
                # The snippet checks `exp` on every cache hit and before every
                # cache store; see the docstring for the reason.
                'implementation': '''
                class OptimizedJWTValidator:
                    def __init__(self):
                        # 公開鍵のキャッシュ
                        self.key_cache = {}
                        
                        # より高速なアルゴリズムへ移行
                        self.algorithm = 'ES256'  # ECDSAは RSAより高速
                        
                        # JWTのプリバリデーション
                        self.prevalidation_cache = TTLCache(maxsize=10000, ttl=300)
                    
                    def validate_token_fast(self, token: str):
                        # キャッシュチェック(期限切れのエントリは使わない)
                        cached = self.prevalidation_cache.get(token)
                        if cached is not None:
                            if cached.get('exp', 0) > time.time():
                                return cached
                            self.prevalidation_cache.pop(token, None)
                        
                        # 高速な基本チェック
                        if not self.quick_format_check(token):
                            return None
                        
                        # 署名検証(最適化済み)
                        claims = self.verify_signature_optimized(token)
                        
                        # 有効期限内の検証済みトークンのみキャッシュに保存
                        if claims and claims.get('exp', 0) > time.time():
                            self.prevalidation_cache[token] = claims
                        
                        return claims
                ''',
                'expected_improvement': 'JWT validation from 80ms to 10ms'
            },
            
            '3_database_optimization': {
                'description': 'データベースクエリ最適化',
                'implementation': '''
                -- 複合インデックスの追加
                CREATE INDEX idx_user_permissions ON user_permissions(user_id, resource_type, permission);
                
                -- マテリアライズドビューの作成
                CREATE MATERIALIZED VIEW user_effective_permissions AS
                SELECT 
                    u.id as user_id,
                    r.name as role,
                    array_agg(DISTINCT p.permission) as permissions
                FROM users u
                JOIN user_roles ur ON u.id = ur.user_id
                JOIN roles r ON ur.role_id = r.id
                JOIN role_permissions rp ON r.id = rp.role_id
                JOIN permissions p ON rp.permission_id = p.id
                GROUP BY u.id, r.name;
                
                -- コネクションプーリングの最適化
                class OptimizedDBPool:
                    def __init__(self):
                        self.pool = asyncpg.create_pool(
                            min_size=20,
                            max_size=100,
                            max_queries=50000,
                            max_inactive_connection_lifetime=300,
                            command_timeout=10
                        )
                ''',
                'expected_improvement': 'Query time from 200ms to 20ms'
            },
            
            '4_service_mesh_optimization': {
                'description': 'サービスメッシュレベルの最適化',
                'implementation': '''
                # gRPC の使用(HTTPより効率的)
                service AuthService {
                    rpc ValidateToken(TokenRequest) returns (TokenResponse);
                    rpc GetPermissions(PermissionRequest) returns (PermissionResponse);
                }
                
                # バッチリクエスト
                service BatchAuthService {
                    rpc BatchValidate(BatchTokenRequest) returns (BatchTokenResponse);
                }
                
                # コネクション再利用
                class ServiceClient:
                    def __init__(self):
                        self.channel_pool = {
                            'auth': grpc.aio.insecure_channel(
                                'auth-service:50051',
                                options=[
                                    ('grpc.keepalive_time_ms', 10000),
                                    ('grpc.keepalive_timeout_ms', 5000),
                                    ('grpc.http2.max_pings_without_data', 0),
                                    ('grpc.http2.min_time_between_pings_ms', 10000),
                                ]
                            )
                        }
                ''',
                'expected_improvement': 'Network overhead reduced by 40%'
            },
            
            '5_edge_computing': {
                'description': 'エッジでの認証処理',
                # The snippet forwards a mutable copy of the request; see the
                # docstring for the reason.
                'implementation': '''
                # CloudFlare Workers での JWT検証
                addEventListener('fetch', event => {
                    event.respondWith(handleRequest(event.request))
                })
                
                async function handleRequest(request) {
                    const token = request.headers.get('Authorization')
                    
                    // エッジでの高速JWT検証
                    const claims = await verifyJWT(token)
                    
                    if (!claims) {
                        return new Response('Unauthorized', { status: 401 })
                    }
                    
                    // 検証済みヘッダーを追加してオリジンへ
                    // (受信リクエストのヘッダーは変更不可のため、複製してから設定する)
                    const upstream = new Request(request)
                    upstream.headers.set('X-Verified-User', claims.sub)
                    upstream.headers.set('X-Verified-Roles', claims.roles.join(','))
                    
                    return fetch(upstream)
                }
                ''',
                'expected_improvement': 'Reduce origin auth calls by 90%'
            }
        }
    
    def implementation_priority(self):
        """Return the rollout plan grouped by time horizon.

        The single top-level key ``priority_matrix`` maps ``immediate``,
        ``short_term`` and ``long_term`` to lists of work items; each item
        has the string fields ``task``, ``effort``, ``impact`` and ``risk``.
        """
        def work_item(task, effort, impact, risk):
            # Key order matches the order the fields are listed in.
            return {'task': task, 'effort': effort, 'impact': impact, 'risk': risk}

        now = [
            work_item('L1/L2キャッシング実装', '1 week', 'High', 'Low'),
            work_item('データベースインデックス追加', '2 days', 'Medium', 'Low'),
        ]
        soon = [
            work_item('JWT アルゴリズム変更', '2 weeks', 'Medium', 'Medium'),
            work_item('コネクションプーリング最適化', '1 week', 'Medium', 'Low'),
        ]
        later = [
            work_item('gRPC移行', '1 month', 'High', 'Medium'),
            work_item('エッジコンピューティング', '2 months', 'Very High', 'Medium'),
        ]

        matrix = {'immediate': now, 'short_term': soon, 'long_term': later}
        return {'priority_matrix': matrix}
    
    def expected_results(self):
        """期待される改善効果"""
        return {
            'performance_targets': {
                'current': {
                    'p50': '150ms',
                    'p95': '500ms',
                    'p99': '1200ms',
                    'throughput': '1000 req/s'
                },
                
                'after_optimization': {
                    'p50': '20ms',
                    'p95': '50ms',
                    'p99': '100ms',
                    'throughput': '10000 req/s'
                }
            },
            
            'cost_benefit': {
                'implementation_cost': '$50,000',
                'infrastructure_savings': '$20,000/month',
                'roi_period': '3 months'
            },
            
            'monitoring_plan': '''
            # Prometheusメトリクス
            - auth_request_duration_seconds
            - auth_cache_hit_rate
            - auth_db_query_duration_seconds
            - auth_jwt_validation_duration_seconds
            
            # アラート設定
            - P95 latency > 100ms
            - Cache hit rate < 80%
            - Error rate > 1%
            '''
        }