第13章:セキュリティとコンプライアンス
本章の意義と学習目標
なぜPodmanでマイクロサービスを学ぶのか
マイクロサービスアーキテクチャは現代のソフトウェア開発の主流となっていますが、その複雑性により開発環境の構築が課題となっています。Podmanは以下の価値を提供します:
- 開発環境の簡素化: Podとネットワーク機能による簡単なサービス間連携
- 本番環境との一貫性: Kubernetes互換性による開発から本番への円滑な移行
- リソース効率: 軽量なコンテナによる多数のサービス同時実行
- セキュリティ: Rootlessモードによる安全な開発環境
本章では、Podmanを使用した実践的なマイクロサービス開発を学びます。
13.1 マイクロサービスの基本概念
13.1.1 モノリスからマイクロサービスへ
なぜマイクロサービスが必要か
モノリシックアーキテクチャの課題:
- スケーラビリティの制限
- 技術スタックの固定化
- デプロイメントリスク
- チーム間の依存性
マイクロサービスがもたらす価値:
- 独立したスケーリング
- 技術選択の自由
- 障害の局所化
- チームの自律性
13.1.2 設計原則
単一責任の原則
# service-boundaries.yaml
services:
user-service:
responsibility: "ユーザー認証とプロファイル管理"
data_ownership: "users, profiles, sessions"
product-service:
responsibility: "商品カタログと在庫管理"
data_ownership: "products, inventory"
order-service:
responsibility: "注文処理とワークフロー"
data_ownership: "orders, order_items"
payment-service:
responsibility: "決済処理と請求"
data_ownership: "payments, invoices"
13.2 Podmanによるマイクロサービス環境構築
13.2.1 サービスディスカバリー
DNSベースのサービスディスカバリー
# ネットワーク作成
podman network create microservices
# データベースサービス
podman run -d \
--name postgres \
--network microservices \
-e POSTGRES_PASSWORD=secret \
postgres:15
# ユーザーサービス
podman run -d \
--name user-service \
--network microservices \
-e DB_HOST=postgres \
-e DB_NAME=users \
user-service:latest
# APIゲートウェイ
podman run -d \
--name api-gateway \
--network microservices \
-p 8080:8080 \
-e USER_SERVICE_URL=http://user-service:8000 \
api-gateway:latest
13.2.2 サービス間通信パターン
同期通信(REST)
# api_gateway.py
import aiohttp
from flask import Flask, jsonify
import asyncio
app = Flask(__name__)
class ServiceClient:
def __init__(self, base_url):
self.base_url = base_url
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def get(self, path):
async with self.session.get(f"{self.base_url}{path}") as resp:
return await resp.json()
@app.route('/api/users/<user_id>/orders')
async def get_user_orders(user_id):
async with ServiceClient("http://user-service:8000") as user_client, \
ServiceClient("http://order-service:8001") as order_client:
# 並列API呼び出し
user_task = asyncio.create_task(user_client.get(f"/users/{user_id}"))
orders_task = asyncio.create_task(order_client.get(f"/users/{user_id}/orders"))
user, orders = await asyncio.gather(user_task, orders_task)
return jsonify({
"user": user,
"orders": orders
})
非同期通信(メッセージング)
# order_service.py
import pika
import json
class OrderService:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq')
)
self.channel = self.connection.channel()
# イベント発行用のExchange
self.channel.exchange_declare(
exchange='order_events',
exchange_type='topic'
)
def create_order(self, order_data):
# 注文処理
order = self.process_order(order_data)
# イベント発行
event = {
'event_type': 'order_created',
'order_id': order['id'],
'user_id': order['user_id'],
'total': order['total'],
'timestamp': datetime.utcnow().isoformat()
}
self.channel.basic_publish(
exchange='order_events',
routing_key='order.created',
body=json.dumps(event)
)
return order
13.3 コンテナオーケストレーション
13.3.1 docker-compose.yml
version: '3.8'
services:
# インフラストラクチャサービス
postgres:
image: postgres:15
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: secret
volumes:
- postgres-data:/var/lib/postgresql/data
- ./init-databases.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U admin"]
interval: 10s
timeout: 5s
retries: 5
rabbitmq:
image: rabbitmq:3-management
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
ports:
- "15672:15672"
volumes:
- rabbitmq-data:/var/lib/rabbitmq
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis-data:/data
# アプリケーションサービス
user-service:
build: ./services/user-service
environment:
DATABASE_URL: postgresql://admin:secret@postgres/users
REDIS_URL: redis://redis:6379
JWT_SECRET: ${JWT_SECRET}
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_started
deploy:
replicas: 2
resources:
limits:
cpus: '0.5'
memory: 512M
order-service:
build: ./services/order-service
environment:
DATABASE_URL: postgresql://admin:secret@postgres/orders
RABBITMQ_URL: amqp://admin:secret@rabbitmq:5672
depends_on:
postgres:
condition: service_healthy
rabbitmq:
condition: service_started
deploy:
replicas: 3
payment-service:
build: ./services/payment-service
environment:
DATABASE_URL: postgresql://admin:secret@postgres/payments
RABBITMQ_URL: amqp://admin:secret@rabbitmq:5672
STRIPE_API_KEY: ${STRIPE_API_KEY}
depends_on:
postgres:
condition: service_healthy
rabbitmq:
condition: service_started
# API Gateway
api-gateway:
build: ./api-gateway
ports:
- "8080:8080"
environment:
USER_SERVICE_URL: http://user-service:8000
ORDER_SERVICE_URL: http://order-service:8001
PAYMENT_SERVICE_URL: http://payment-service:8002
depends_on:
- user-service
- order-service
- payment-service
# 監視サービス
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus-data:/prometheus
ports:
- "9090:9090"
grafana:
image: grafana/grafana:latest
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana-data:/var/lib/grafana
- ./grafana/dashboards:/etc/grafana/provisioning/dashboards
ports:
- "3000:3000"
depends_on:
- prometheus
volumes:
postgres-data:
rabbitmq-data:
redis-data:
prometheus-data:
grafana-data:
networks:
default:
driver: bridge
13.3.2 Podman Podでのマイクロサービス
#!/bin/bash
# deploy-microservices.sh
# Podの作成(共有ネットワーク名前空間)
podman pod create \
--name microservices-pod \
--publish 8080:8080 \
--publish 9090:9090 \
--publish 3000:3000
# PostgreSQL
podman run -d \
--pod microservices-pod \
--name postgres \
-e POSTGRES_PASSWORD=secret \
-v postgres-data:/var/lib/postgresql/data \
postgres:15
# Redis
podman run -d \
--pod microservices-pod \
--name redis \
-v redis-data:/data \
redis:7-alpine
# RabbitMQ
podman run -d \
--pod microservices-pod \
--name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=secret \
rabbitmq:3-management
# アプリケーションサービス
for service in user-service order-service payment-service api-gateway; do
podman run -d \
--pod microservices-pod \
--name $service \
--env-file ./$service/.env \
$service:latest
done
13.4 サービスメッシュパターン
13.4.1 サイドカープロキシ
# envoy-sidecar.yaml
apiVersion: v1
kind: Pod
metadata:
name: service-with-envoy
spec:
containers:
- name: service
image: myservice:latest
ports:
- containerPort: 8000
- name: envoy
image: envoyproxy/envoy:v1.25-latest
ports:
- containerPort: 9901 # Admin
- containerPort: 10000 # Proxy
volumeMounts:
- name: envoy-config
mountPath: /etc/envoy
command: ["envoy"]
args: ["-c", "/etc/envoy/envoy.yaml"]
volumes:
- name: envoy-config
configMap:
name: envoy-config
Envoy設定
# envoy.yaml
static_resources:
listeners:
- name: listener_0
address:
socket_address:
address: 0.0.0.0
port_value: 10000
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match:
prefix: "/"
route:
cluster: local_service
timeout: 30s
retry_policy:
retry_on: "5xx"
num_retries: 3
per_try_timeout: 10s
http_filters:
- name: envoy.filters.http.router
clusters:
- name: local_service
connect_timeout: 5s
type: STATIC
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: local_service
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 8000
health_checks:
- timeout: 3s
interval: 5s
unhealthy_threshold: 2
healthy_threshold: 2
http_health_check:
path: /health
13.5 分散トレーシング
13.5.1 OpenTelemetryの実装
# tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
def setup_tracing(service_name, otlp_endpoint="otel-collector:4317"):
"""分散トレーシングのセットアップ"""
# トレーサープロバイダー設定
trace.set_tracer_provider(TracerProvider())
tracer_provider = trace.get_tracer_provider()
# OTLPエクスポーター
otlp_exporter = OTLPSpanExporter(
endpoint=otlp_endpoint,
insecure=True
)
# バッチプロセッサー
span_processor = BatchSpanProcessor(otlp_exporter)
tracer_provider.add_span_processor(span_processor)
# 自動インストルメンテーション
FlaskInstrumentor().instrument()
RequestsInstrumentor().instrument()
SQLAlchemyInstrumentor().instrument()
return trace.get_tracer(service_name)
# 使用例
from flask import Flask, request
import requests
app = Flask(__name__)
tracer = setup_tracing("order-service")
@app.route('/orders', methods=['POST'])
def create_order():
with tracer.start_as_current_span("create_order") as span:
# リクエスト情報をスパンに追加
span.set_attribute("user.id", request.json.get('user_id'))
span.set_attribute("order.total", request.json.get('total'))
# ユーザーサービス呼び出し
with tracer.start_as_current_span("validate_user"):
user_response = requests.get(
f"http://user-service/users/{request.json['user_id']}"
)
# 在庫確認
with tracer.start_as_current_span("check_inventory"):
for item in request.json['items']:
inventory_response = requests.post(
"http://inventory-service/check",
json={'product_id': item['product_id'], 'quantity': item['quantity']}
)
# 注文作成
with tracer.start_as_current_span("save_order"):
order = create_order_in_db(request.json)
return jsonify(order), 201
13.5.2 分散ログの相関
# logging_config.py
import logging
import json
from opentelemetry import trace
class StructuredFormatter(logging.Formatter):
def format(self, record):
# 現在のスパンコンテキストを取得
span = trace.get_current_span()
span_context = span.get_span_context() if span else None
log_data = {
'timestamp': self.formatTime(record),
'level': record.levelname,
'service': 'order-service',
'message': record.getMessage(),
'logger': record.name,
}
# トレース情報を追加
if span_context and span_context.is_valid:
log_data['trace_id'] = format(span_context.trace_id, '032x')
log_data['span_id'] = format(span_context.span_id, '016x')
# 追加のコンテキスト情報
if hasattr(record, 'user_id'):
log_data['user_id'] = record.user_id
return json.dumps(log_data)
# ロガー設定
def setup_logging():
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
13.6 レジリエンスパターン
13.6.1 サーキットブレーカー
# circuit_breaker.py
import time
from enum import Enum
from functools import wraps
import threading
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
self._lock = threading.Lock()
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
with self._lock:
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise Exception(f"Circuit breaker is OPEN for {func.__name__}")
try:
result = func(*args, **kwargs)
with self._lock:
self._on_success()
return result
except self.expected_exception as e:
with self._lock:
self._on_failure()
raise e
return wrapper
def _should_attempt_reset(self):
return (
self.last_failure_time and
time.time() - self.last_failure_time >= self.recovery_timeout
)
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# 使用例
@CircuitBreaker(failure_threshold=3, recovery_timeout=30)
def call_payment_service(payment_data):
response = requests.post(
"http://payment-service/process",
json=payment_data,
timeout=5
)
response.raise_for_status()
return response.json()
13.6.2 リトライとタイムアウト
# resilience.py
import time
import random
from functools import wraps
def exponential_backoff_retry(max_retries=3, base_delay=1, max_delay=60):
"""指数バックオフによるリトライ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
delay = base_delay
while retries < max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
retries += 1
if retries == max_retries:
raise e
# ジッターを加えた指数バックオフ
jitter = random.uniform(0, delay * 0.1)
sleep_time = min(delay + jitter, max_delay)
time.sleep(sleep_time)
delay *= 2
return wrapper
return decorator
# 使用例
@exponential_backoff_retry(max_retries=3)
def fetch_user_data(user_id):
response = requests.get(
f"http://user-service/users/{user_id}",
timeout=3
)
response.raise_for_status()
return response.json()
13.7 実践演習
演習1: ECサイトのマイクロサービス化
目標: モノリシックなECサイトをマイクロサービスに分解し、Podmanで実装する
要件:
- ユーザー管理サービス
- 商品カタログサービス
- カートサービス
- 注文サービス
- 決済サービス
- 通知サービス
実装のポイント:
- サービス間の境界定義
- データの一貫性確保
- 分散トランザクション処理
- 監視とログの統合
演習2: サーキットブレーカーの実装と検証
目標: マイクロサービス間の通信にサーキットブレーカーを実装し、障害時の挙動を確認する
手順:
- 障害を模擬するサービスの作成
- サーキットブレーカーの実装
- 負荷テストツールでの検証
- メトリクスの可視化