第14章:パフォーマンスチューニング
本章の意義と学習目標
なぜエンタープライズ環境でPodmanが選ばれるのか
エンタープライズ環境には、個人開発や小規模チームとは異なる厳格な要件があります:
- セキュリティコンプライアンス: 業界標準や規制への準拠
- 監査要件: すべての操作の追跡可能性
- 高可用性: ビジネス継続性の確保
- 既存システムとの統合: レガシーシステムとの共存
Podmanは、これらの要件に対して以下の価値を提供します:
- Rootlessによるセキュリティ強化
- systemd統合による運用の標準化
- Red Hatによるエンタープライズサポート
- 既存のLinux管理ツールとの親和性
14.1 エンタープライズセキュリティ
14.1.1 コンプライアンス対応
セキュリティポリシーの実装
#!/bin/bash
# security-compliance-check.sh
echo "=== Enterprise Security Compliance Check ==="
echo "Date: $(date)"
echo "Checking against: CIS Docker Benchmark v1.4.0 adapted for Podman"
# 結果を保存する配列
declare -a passed=()
declare -a failed=()
declare -a warnings=()
# チェック関数
check() {
local id=$1
local description=$2
local command=$3
local expected=$4
echo -n "[$id] $description... "
result=$(eval $command 2>/dev/null)
if [[ "$result" == *"$expected"* ]]; then
echo "PASS"
passed+=("$id: $description")
else
echo "FAIL"
failed+=("$id: $description")
fi
}
# 1. ホスト設定
check "1.1" "Rootlessモードの確認" "podman info --format '\{\{.Host.Security.Rootless\}\}'" "true"
check "1.2" "SELinuxの有効化" "getenforce" "Enforcing"
check "1.3" "監査ログの有効化" "auditctl -l | grep -c podman" "1"
# 2. Podman設定
check "2.1" "ユーザー名前空間の有効化" "sysctl user.max_user_namespaces" "0"
check "2.2" "seccompプロファイルの使用" "podman info --format '\{\{.Host.Security.SECCOMPEnabled\}\}'" "true"
# 3. イメージセキュリティ
echo -e "\n[3.x] イメージセキュリティチェック"
for image in $(podman images --format "\{\{.Repository\}\}:\{\{.Tag\}\}" | grep -v "localhost/"); do
echo " Checking image: $image"
# ルートユーザーチェック
user=$(podman run --rm $image whoami 2>/dev/null || echo "error")
if [ "$user" = "root" ]; then
warnings+=("Image $image runs as root")
fi
# 署名確認
if ! podman image trust show $image >/dev/null 2>&1; then
warnings+=("Image $image has no signature policy")
fi
done
# 4. コンテナランタイムセキュリティ
echo -e "\n[4.x] ランタイムセキュリティチェック"
for container in $(podman ps -q); do
name=$(podman inspect $container --format '\{\{.Name\}\}')
echo " Checking container: $name"
# 特権モードチェック
privileged=$(podman inspect $container --format '\{\{.HostConfig.Privileged\}\}')
if [ "$privileged" = "true" ]; then
failed+=("Container $name is running in privileged mode")
fi
# ケーパビリティチェック
caps=$(podman inspect $container --format '\{\{.EffectiveCaps\}\}')
if [ "$caps" != "[]" ] && [ "$caps" != "null" ]; then
warnings+=("Container $name has additional capabilities: $caps")
fi
done
# レポート生成
echo -e "\n=== Compliance Report Summary ==="
echo "Passed: ${#passed[@]}"
echo "Failed: ${#failed[@]}"
echo "Warnings: ${#warnings[@]}"
# 詳細レポート生成
cat > compliance-report-$(date +%Y%m%d-%H%M%S).json << EOF
{
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"summary": {
"passed": ${#passed[@]},
"failed": ${#failed[@]},
"warnings": ${#warnings[@]}
},
"details": {
"passed": [$(printf '"%s",' "${passed[@]}" | sed 's/,$//')]",
"failed": [$(printf '"%s",' "${failed[@]}" | sed 's/,$//')]",
"warnings": [$(printf '"%s",' "${warnings[@]}" | sed 's/,$//')"]
}
}
EOF
echo "Report saved to compliance-report-*.json"
14.1.2 監査ログの実装
#!/usr/bin/env python3
# audit_logger.py
import json
import datetime
import hashlib
import subprocess
import threading
import queue
from pathlib import Path
class PodmanAuditLogger:
def __init__(self, log_dir="/var/log/podman-audit"):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
self.event_queue = queue.Queue()
self.running = True
# バックグラウンドワーカー起動
self.worker_thread = threading.Thread(target=self._process_events)
self.worker_thread.start()
def start_monitoring(self):
"""Podmanイベントの監視開始"""
process = subprocess.Popen(
['podman', 'events', '--format', 'json'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
for line in process.stdout:
if line.strip():
try:
event = json.loads(line)
self.event_queue.put(event)
except json.JSONDecodeError:
continue
def _process_events(self):
"""イベント処理ワーカー"""
while self.running:
try:
event = self.event_queue.get(timeout=1)
self._log_event(event)
except queue.Empty:
continue
def _log_event(self, event):
"""監査ログエントリーの作成"""
timestamp = datetime.datetime.utcnow()
# 監査エントリー構築
audit_entry = {
"timestamp": timestamp.isoformat(),
"event_time": event.get("time", ""),
"event_type": event.get("Type", ""),
"action": event.get("Action", ""),
"actor": {
"id": event.get("Actor", {}).get("ID", ""),
"attributes": event.get("Actor", {}).get("Attributes", {})
},
"scope": event.get("scope", ""),
"time_nano": event.get("timeNano", ""),
"metadata": {}
}
# ユーザー情報の追加
try:
user_info = subprocess.run(
['id', '-un'],
capture_output=True,
text=True
).stdout.strip()
audit_entry["user"] = user_info
except:
audit_entry["user"] = "unknown"
# ハッシュ計算(改ざん防止)
entry_string = json.dumps(audit_entry, sort_keys=True)
audit_entry["hash"] = hashlib.sha256(entry_string.encode()).hexdigest()
# ログファイルへの書き込み
log_file = self.log_dir / f"audit-{timestamp.strftime('%Y%m%d')}.jsonl"
with open(log_file, 'a') as f:
f.write(json.dumps(audit_entry) + '\n')
# 重要イベントの即時アラート
if self._is_critical_event(event):
self._send_alert(audit_entry)
def _is_critical_event(self, event):
"""重要イベントの判定"""
critical_actions = [
"exec_died", # 不正なプロセス終了
"remove", # コンテナ削除
"kill" # コンテナ強制終了
]
suspicious_images = [
"alpine", # 最小イメージ(攻撃によく使われる)
"busybox"
]
action = event.get("Action", "")
image = event.get("Actor", {}).get("Attributes", {}).get("image", "")
return action in critical_actions or any(s in image for s in suspicious_images)
def _send_alert(self, audit_entry):
"""アラート送信(SIEM統合など)"""
# 実装例: Syslog, SIEM API, メール通知など
print(f"ALERT: Critical event detected - {audit_entry['action']}")
def generate_compliance_report(self, start_date, end_date):
"""コンプライアンスレポート生成"""
events = []
# 指定期間のログファイルを読み込み
for log_file in self.log_dir.glob("audit-*.jsonl"):
with open(log_file, 'r') as f:
for line in f:
event = json.loads(line)
event_time = datetime.datetime.fromisoformat(event['timestamp'])
if start_date <= event_time <= end_date:
events.append(event)
# レポート生成
report = {
"period": {
"start": start_date.isoformat(),
"end": end_date.isoformat()
},
"total_events": len(events),
"events_by_type": {},
"events_by_user": {},
"critical_events": []
}
for event in events:
# タイプ別集計
event_type = event['event_type']
report['events_by_type'][event_type] = report['events_by_type'].get(event_type, 0) + 1
# ユーザー別集計
user = event['user']
report['events_by_user'][user] = report['events_by_user'].get(user, 0) + 1
# 重要イベント抽出
if self._is_critical_event(event):
report['critical_events'].append(event)
return report
if __name__ == "__main__":
logger = PodmanAuditLogger()
logger.start_monitoring()
14.2 高可用性とディザスタリカバリ
14.2.1 アクティブ-スタンバイ構成
#!/bin/bash
# ha-setup.sh
# 高可用性Podmanクラスター構築スクリプト
ACTIVE_NODE="node1.example.com"
STANDBY_NODE="node2.example.com"
VIP="192.168.100.100"
SHARED_STORAGE="/mnt/shared-storage"
# Pacemakerクラスター設定
setup_pacemaker() {
echo "Setting up Pacemaker cluster..."
# 両ノードで実行
for node in $ACTIVE_NODE $STANDBY_NODE; do
ssh $node << 'EOF'
# Pacemaker/Corosyncインストール
dnf install -y pacemaker corosync pcs fence-agents-all
# サービス有効化
systemctl enable --now pcsd
systemctl enable pacemaker corosync
# haclusterユーザーのパスワード設定
echo "hacluster:StrongPassword123!" | chpasswd
EOF
done
# クラスター作成
pcs host auth $ACTIVE_NODE $STANDBY_NODE -u hacluster -p StrongPassword123!
pcs cluster setup podman-cluster $ACTIVE_NODE $STANDBY_NODE --force
pcs cluster start --all
pcs cluster enable --all
# 基本設定
pcs property set stonith-enabled=false
pcs property set no-quorum-policy=ignore
}
# 仮想IPリソース設定
setup_vip() {
echo "Setting up Virtual IP..."
pcs resource create virtual_ip ocf:heartbeat:IPaddr2 \
ip=$VIP \
cidr_netmask=24 \
op monitor interval=30s
}
# Podmanレジストリの高可用性設定
setup_ha_registry() {
echo "Setting up HA Registry..."
# 共有ストレージのマウント
cat > /etc/systemd/system/mnt-shared\x2dstorage.mount << EOF
[Unit]
Description=Shared Storage Mount
After=network-online.target
[Mount]
What=${SHARED_STORAGE}
Where=/mnt/shared-storage
Type=nfs
Options=_netdev,auto
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable --now mnt-shared\\x2dstorage.mount
# レジストリサービス作成
cat > /etc/systemd/system/podman-registry.service << 'EOF'
[Unit]
Description=Podman Registry
After=network-online.target mnt-shared\x2dstorage.mount
Requires=mnt-shared\x2dstorage.mount
[Service]
Type=notify
ExecStartPre=-/usr/bin/podman rm -f registry
ExecStart=/usr/bin/podman run \
--name registry \
--publish 5000:5000 \
--volume /mnt/shared-storage/registry:/var/lib/registry \
--sdnotify=conmon \
docker.io/registry:2
ExecStop=/usr/bin/podman stop -t 10 registry
Restart=on-failure
RestartSec=30
[Install]
WantedBy=multi-user.target
EOF
# Pacemakerリソースとして登録
pcs resource create podman-registry systemd:podman-registry \
op monitor interval=30s \
op start timeout=60s \
op stop timeout=60s
# リソースグループ作成
pcs resource group add podman-services virtual_ip podman-registry
}
# データレプリケーション設定
setup_data_replication() {
echo "Setting up data replication..."
# rsyncによる定期同期
cat > /usr/local/bin/sync-podman-data.sh << 'EOF'
#!/bin/bash
# アクティブノードからスタンバイノードへのデータ同期
ACTIVE=$1
STANDBY=$2
# コンテナボリューム同期
rsync -avz --delete \
${ACTIVE}:/var/lib/containers/storage/volumes/ \
${STANDBY}:/var/lib/containers/storage/volumes/
# イメージ同期
for image in $(ssh $ACTIVE podman images --format "\{\{.Repository\}\}:\{\{.Tag\}\}"); do
ssh $ACTIVE podman save $image | ssh $STANDBY podman load
done
EOF
chmod +x /usr/local/bin/sync-podman-data.sh
# cronジョブ設定
echo "*/5 * * * * /usr/local/bin/sync-podman-data.sh $ACTIVE_NODE $STANDBY_NODE" | crontab -
}
# 監視スクリプト
cat > /usr/local/bin/monitor-podman-ha.sh << 'EOF'
#!/bin/bash
# Podman HA監視スクリプト
while true; do
# クラスターステータス確認
if ! pcs status >/dev/null 2>&1; then
echo "ALERT: Pacemaker cluster is not healthy"
# アラート送信
fi
# レジストリヘルスチェック
if ! curl -s http://${VIP}:5000/v2/_catalog >/dev/null; then
echo "ALERT: Registry is not accessible"
# フェイルオーバートリガー
pcs resource move podman-services
fi
sleep 30
done
EOF
chmod +x /usr/local/bin/monitor-podman-ha.sh
# メイン実行
setup_pacemaker
setup_vip
setup_ha_registry
setup_data_replication
echo "HA setup completed!"
14.3 既存システムとの統合
14.3.1 LDAP/Active Directory統合
#!/usr/bin/env python3
# ldap_integration.py
import ldap
import json
import subprocess
from flask import Flask, request, jsonify
from functools import wraps
import jwt
import datetime
app = Flask(__name__)
class LDAPAuthenticator:
def __init__(self, config):
self.ldap_server = config['ldap_server']
self.base_dn = config['base_dn']
self.bind_dn = config['bind_dn']
self.bind_password = config['bind_password']
self.user_filter = config['user_filter']
self.group_filter = config['group_filter']
def authenticate(self, username, password):
"""LDAP認証"""
try:
# LDAP接続
conn = ldap.initialize(f"ldap://{self.ldap_server}")
conn.protocol_version = ldap.VERSION3
conn.set_option(ldap.OPT_REFERRALS, 0)
# バインドDNで接続
conn.simple_bind_s(self.bind_dn, self.bind_password)
# ユーザー検索
search_filter = self.user_filter.format(username=username)
result = conn.search_s(
self.base_dn,
ldap.SCOPE_SUBTREE,
search_filter,
['dn', 'memberOf', 'cn', 'mail']
)
if not result:
return False, None
user_dn = result[0][0]
user_attrs = result[0][1]
# ユーザー認証
try:
user_conn = ldap.initialize(f"ldap://{self.ldap_server}")
user_conn.simple_bind_s(user_dn, password)
user_conn.unbind()
# グループメンバーシップ確認
groups = self._get_user_groups(conn, user_attrs.get('memberOf', []))
user_info = {
'username': username,
'dn': user_dn,
'cn': user_attrs.get('cn', [b''])[0].decode(),
'email': user_attrs.get('mail', [b''])[0].decode(),
'groups': groups
}
return True, user_info
except ldap.INVALID_CREDENTIALS:
return False, None
except Exception as e:
print(f"LDAP error: {e}")
return False, None
finally:
if 'conn' in locals():
conn.unbind()
def _get_user_groups(self, conn, member_of):
"""ユーザーのグループ取得"""
groups = []
for group_dn in member_of:
group_dn = group_dn.decode()
# グループ名抽出
cn = group_dn.split(',')[0].split('=')[1]
groups.append(cn)
return groups
def authorize_podman_access(self, user_info):
"""Podmanアクセス権限の確認"""
allowed_groups = [
'PodmanAdmins',
'PodmanUsers',
'ContainerDevelopers'
]
user_groups = user_info.get('groups', [])
return any(group in allowed_groups for group in user_groups)
def get_resource_permissions(self, user_info):
"""リソースレベルの権限取得"""
permissions = {
'can_create_containers': False,
'can_pull_images': False,
'can_push_images': False,
'can_manage_volumes': False,
'resource_limits': {}
}
user_groups = user_info.get('groups', [])
if 'PodmanAdmins' in user_groups:
# 管理者は全権限
permissions.update({
'can_create_containers': True,
'can_pull_images': True,
'can_push_images': True,
'can_manage_volumes': True,
'resource_limits': {
'max_containers': -1, # 無制限
'max_cpu': -1,
'max_memory': -1
}
})
elif 'PodmanUsers' in user_groups:
# 一般ユーザー
permissions.update({
'can_create_containers': True,
'can_pull_images': True,
'can_push_images': False,
'can_manage_volumes': True,
'resource_limits': {
'max_containers': 10,
'max_cpu': 4,
'max_memory': '8G'
}
})
return permissions
# JWT設定
JWT_SECRET = 'your-secret-key'
JWT_ALGORITHM = 'HS256'
JWT_EXPIRATION_HOURS = 8
def generate_token(user_info):
"""JWTトークン生成"""
payload = {
'username': user_info['username'],
'groups': user_info['groups'],
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=JWT_EXPIRATION_HOURS)
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def require_auth(f):
"""認証デコレーター"""
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return jsonify({'error': 'No token provided'}), 401
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
request.user = payload
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token expired'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token'}), 401
return f(*args, **kwargs)
return decorated_function
# LDAP設定
ldap_config = {
'ldap_server': 'ldap.company.com',
'base_dn': 'DC=company,DC=com',
'bind_dn': 'CN=ServiceAccount,OU=Services,DC=company,DC=com',
'bind_password': 'service_password',
'user_filter': '(&(objectClass=user)(sAMAccountName={username}))',
'group_filter': '(&(objectClass=group)(member={user_dn}))'
}
authenticator = LDAPAuthenticator(ldap_config)
# APIエンドポイント
@app.route('/auth/login', methods=['POST'])
def login():
"""ログインエンドポイント"""
username = request.json.get('username')
password = request.json.get('password')
if not username or not password:
return jsonify({'error': 'Username and password required'}), 400
success, user_info = authenticator.authenticate(username, password)
if not success:
return jsonify({'error': 'Invalid credentials'}), 401
if not authenticator.authorize_podman_access(user_info):
return jsonify({'error': 'Access denied'}), 403
# 権限情報を含めてトークン生成
permissions = authenticator.get_resource_permissions(user_info)
user_info['permissions'] = permissions
token = generate_token(user_info)
return jsonify({
'token': token,
'user': user_info
})
@app.route('/api/containers', methods=['POST'])
@require_auth
def create_container():
"""コンテナ作成API"""
# 権限確認
user_groups = request.user.get('groups', [])
if 'PodmanUsers' not in user_groups and 'PodmanAdmins' not in user_groups:
return jsonify({'error': 'Insufficient permissions'}), 403
# コンテナ作成リクエストの検証
container_config = request.json
# リソース制限の適用
if 'PodmanUsers' in user_groups and 'PodmanAdmins' not in user_groups:
# 一般ユーザーにはリソース制限を適用
container_config['memory'] = min(
container_config.get('memory', '1G'),
'8G'
)
container_config['cpus'] = min(
container_config.get('cpus', 1),
4
)
# Podmanコマンド実行
try:
cmd = ['podman', 'create']
if 'name' in container_config:
cmd.extend(['--name', container_config['name']])
if 'memory' in container_config:
cmd.extend(['--memory', container_config['memory']])
if 'cpus' in container_config:
cmd.extend(['--cpus', str(container_config['cpus'])])
cmd.append(container_config['image'])
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return jsonify({'error': result.stderr}), 400
container_id = result.stdout.strip()
# 監査ログ記録
log_entry = {
'timestamp': datetime.datetime.utcnow().isoformat(),
'user': request.user['username'],
'action': 'create_container',
'container_id': container_id,
'config': container_config
}
# ログファイルに記録
with open('/var/log/podman-api-audit.log', 'a') as f:
f.write(json.dumps(log_entry) + '\n')
return jsonify({
'container_id': container_id,
'status': 'created'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, ssl_context='adhoc')
14.4 実践演習
演習1: エンタープライズ向けPodman環境の構築
目標: 以下の要件を満たすPodman環境を構築する
- LDAP認証統合
- 高可用性構成
- 自動バックアップ
- コンプライアンス監査
実装手順:
- LDAP/ADサーバーとの連携設定
- Pacemaker/Corosyncによるクラスター構築
- バックアップジョブの設定
- 監査ログシステムの実装
演習2: ディザスタリカバリ訓練
目標: 障害発生時の復旧手順を確立し、訓練を実施する
シナリオ:
- プライマリノードの完全障害
- データ破損からの復旧
- ランサムウェア攻撃からの復旧
期待される成果:
- RTO(目標復旧時間)の測定
- RPO(目標復旧時点)の確認
- 復旧手順書の作成