Chapter 9: アーキテクチャ選択演習 🏗️
📚 目次に戻る: 📖 学習ガイド
⬅️ 前の章: Chapter 8: 運用監視と自動化
➡️ 次の章: Chapter 10: トラブルシューティング
🎯 学習フェーズ: Part IV - 実践・応用編(演習)
🎯 学習レベル: 🌱 基礎 | 🚀 応用 | 💪 発展
⏱️ 推定学習時間: 4-6時間
📝 難易度: 中級(全章の理解が前提)
—
🏗️ 建築設計のプロセス
├── 📋 要件ヒアリング:「どんな建物が欲しいですか?」
├── 🎯 設計選択:一戸建て vs マンション vs 商業ビル
├── 📐 詳細設計:間取り・構造・設備の決定
└── 🔄 段階的施工:基礎→骨組み→内装→完成
💾 Supabaseアーキテクチャ選択
├── 📋 プロジェクト要件:ユーザー数・機能・予算・期間
├── 🎯 パターン選択:Client-side vs Edge vs API Server
├── 📐 詳細設計:データベース構造・セキュリティ・UI設計
└── 🔄 段階的実装:MVP→機能追加→スケーリング→運用最適化
🎯 この章で学ぶこと
建築家が「平屋の家を建てたいのか、30階建てのオフィスビルを建てたいのか」によって全く異なる設計をするように、Supabaseアプリケーションも、要件に応じて最適なアーキテクチャパターンを選択する必要があります。
🌱 初心者が陥りがちな問題
# ❌ よくある初心者の思考パターン
「とりあえずクライアントサイドで作り始めよう」
「後で必要になったら考えればいいでしょ」
「複雑そうだから一番安全そうなAPIサーバーにしよう」
# → 結果:過剰設計 or 後戻りコスト大
✅ この章で身につく選択力
# ✅ 要件ベースの体系的判断
user_count = 1000 # ユーザー数から規模を判断
complexity = "medium" # 機能複雑度を評価
team_size = 3 # チーム規模を考慮
timeline = 8 # 開発期間を検討
# → 結果:「この条件ならEdge Functionsが最適」
📚 学習進度別ガイド
レベル | 対象者 | 学習内容 | 所要時間 |
---|---|---|---|
🌱 基礎 | パターンの違いが分からない方 | 要件分析・パターン比較・基本選択手法 | 4-6時間 |
🚀 応用 | 各パターンは知っている方 | 移行戦略・ハイブリッド構成・実践演習 | 6-10時間 |
💪 発展 | 本格的な設計をしたい方 | 企業級要件対応・マイクロサービス・自動化 | 10-15時間 |
9.1 要件からのパターン選択フレームワーク(建築設計コンサルタントシステム)
レストランを開業するとき、「お客さんは何人くらい?」「どんな料理?」「予算は?」「いつオープン?」を聞いて店舗設計を決めるように、Supabaseアプリケーションも要件を聞いて最適なアーキテクチャパターンを決定します。
🎯 Step 1: 基本的な要件分析システム
まず、「この建物にはどんな機能が必要ですか?」を整理するシステムを作りましょう:
🔰 初心者向け解説:要件分析とは?
実際のビジネスと技術選択の関係を理解してみましょう:
ビジネス例 | 要件の例 | 適切な選択 | 不適切な選択 |
---|---|---|---|
🍕 個人のピザ屋 | 「近所の50人向け、注文管理」 | 小さなタブレット1台 | 大型POSシステム(過剰) |
🏪 コンビニチェーン | 「全国1000店舗、在庫管理」 | 本部システム+各店端末 | 手書き帳簿(力不足) |
🏦 銀行システム | 「数百万顧客、24時間運用」 | 冗長化された大規模システム | 個人PC(危険) |
これと同じように、Supabaseでも要件に応じて適切なアーキテクチャパターンを選択する必要があります。
📋 要件分析システムの実装
レストランで「何名様ですか?」「お食事の内容は?」「予算は?」を聞くように、アプリケーションの要件を体系的に分析するシステムを作ります:
アーキテクチャ決定フレームワーク
# 📋 アーキテクチャ要件分析システム(レストラン設計コンサルタント)
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import json
# 🏗️ プロジェクト規模分類(建物の種類)
class ProjectScale(Enum):
PROTOTYPE = "prototype" # 模型・試作品(個人的な実験)
SMALL = "small" # 個人商店レベル(< 1K users)
MEDIUM = "medium" # 支店・事業所レベル(1K-10K users)
LARGE = "large" # 本社ビルレベル(10K-100K users)
ENTERPRISE = "enterprise" # 超高層ビルレベル(> 100K users)
# 🧩 機能複雑度レベル(レストランのメニューの複雑さ)
class ComplexityLevel(Enum):
LOW = "low" # ファーストフード(基本CRUD)
MEDIUM = "medium" # 一般レストラン(ビジネスロジック有)
HIGH = "high" # 高級レストラン(複雑なワークフロー)
VERY_HIGH = "very_high" # ミシュラン星付き(企業レベル統合)
# ⏱️ 開発期間制約(工期)
class TimeConstraint(Enum):
URGENT = "urgent" # 急ぎ(< 4週間)- 仮設店舗レベル
NORMAL = "normal" # 通常(4-12週間)- 一般的な建設期間
RELAXED = "relaxed" # 余裕(> 12週間)- じっくり建設
# 👥 チーム規模(施工チームの人数)
class TeamSize(Enum):
SOLO = "solo" # 1名(個人事業主)
SMALL = "small" # 2-3名(小規模工務店)
MEDIUM = "medium" # 4-8名(中規模建設会社)
LARGE = "large" # 9名以上(大手建設会社)
# 🏗️ アーキテクチャパターン(建築工法)
class ArchitecturePattern(Enum):
CLIENT_SIDE = "client_side" # 木造住宅(シンプル・安価)
EDGE_FUNCTIONS = "edge_functions" # プレハブ工法(柔軟・中コスト)
API_SERVER = "api_server" # 鉄筋コンクリート(堅牢・高コスト)
HYBRID = "hybrid" # 混構造(適材適所)
MICROSERVICES = "microservices" # 高層ビル工法(最高性能・最高コスト)
# 📋 プロジェクト要件書(レストラン開業計画書)
@dataclass
class ProjectRequirements:
# 🏪 基本情報(店舗の基本データ)
project_name: str # プロジェクト名(店舗名)
description: str # 説明(どんなお店?)
# 📊 スケール要件(お客さんの規模)
expected_users: int # 想定ユーザー数(1日何人のお客さん?)
concurrent_users: int # 同時利用者数(一度に何人?)
data_volume_gb: float # データ容量(メニューの種類、顧客情報など)
requests_per_second: int # 秒間リクエスト数(忙しい時の注文頻度)
# 🎯 機能要件(お店で提供するサービス)
real_time_features: bool # リアルタイム機能(注文のライブ表示など)
offline_support: bool # オフライン対応(ネット切れても使える?)
mobile_app: bool # モバイルアプリ(スマホ注文対応)
third_party_integrations: int # 外部連携数(配達アプリ、決済システムなど)
payment_processing: bool # 決済処理(クレジットカード対応)
file_uploads: bool # ファイルアップロード(写真投稿など)
notifications: bool # 通知機能(注文完了のお知らせ)
analytics_reporting: bool # 分析レポート(売上分析など)
# 🛡️ 非機能要件(お店の品質基準)
availability_requirement: float # 稼働率要件(99.9% = ほぼ休まず営業)
response_time_requirement_ms: int # 応答時間要件(注文から何秒で応答?)
security_level: str # セキュリティレベル(basic/standard/high/enterprise)
compliance_requirements: List[str] # 法的要件(GDPR, 食品衛生法など)
# 👥 組織・プロジェクト制約(建設チーム・予算の制約)
team_size: TeamSize # チーム規模(何人で開発?)
development_timeline_weeks: int # 開発期間(何週間で完成?)
budget_constraint: str # 予算制約(low/medium/high/unlimited)
existing_infrastructure: List[str] # 既存インフラ(すでに持っている設備)
preferred_technologies: List[str] # 希望技術(Python、React など)
# 📈 成長予測(将来の拡張計画)
user_growth_rate_monthly: float # 月間ユーザー増加率(月何%ずつ成長?)
feature_expansion_planned: bool # 機能拡張予定(将来メニュー追加?)
international_expansion: bool # 海外展開(海外支店の予定?)
# 📊 アーキテクチャ推奨結果(建築設計提案書)
@dataclass
class ArchitectureRecommendation:
primary_pattern: ArchitecturePattern # 推奨パターン(木造/鉄筋など)
confidence_score: float # 信頼度スコア(0-100、確信度)
reasoning: List[str] # 選択理由(なぜこの工法?)
pros: List[str] # メリット(この工法の良い点)
cons: List[str] # デメリット(注意すべき点)
risks: List[str] # リスク(想定される問題)
migration_path: Optional[List[ArchitecturePattern]] # 移行経路(段階的建設計画)
estimated_development_time_weeks: int # 推定開発期間(建設期間)
estimated_maintenance_effort: str # 推定保守工数(メンテナンス負担)
class ArchitectureDecisionEngine:
"""🏗️ アーキテクチャ決定エンジン(建築設計コンサルタントAI)"""
def __init__(self):
# 💡 設計ルール読み込み(建築基準・設計指針)
self.decision_rules = self._load_decision_rules()
# ⚖️ 重み係数読み込み(何を重視するか)
self.weight_factors = self._load_weight_factors()
def analyze_requirements(self, requirements: ProjectRequirements) -> ArchitectureRecommendation:
"""📋 要件分析とアーキテクチャ推奨(設計コンサルティング)"""
print("🏗️ アーキテクチャ分析を開始します...")
print(f"📋 プロジェクト: {requirements.project_name}")
print(f"👥 想定ユーザー数: {requirements.expected_users:,}人")
print(f"⏱️ 開発期間: {requirements.development_timeline_weeks}週間")
# 🎯 各パターンのスコア計算(各工法の適合度採点)
pattern_scores = {}
print("\n📊 各パターンの適合度を計算中...")
for pattern in ArchitecturePattern:
print(f" 🔍 {pattern.value} パターンを評価中...")
score = self._calculate_pattern_score(requirements, pattern)
pattern_scores[pattern] = score
print(f" 📈 総合スコア: {score['total_score']:.1f}/100")
# 🏆 最適パターン選択(最高点の工法を選択)
best_pattern = max(pattern_scores.keys(), key=lambda p: pattern_scores[p]["total_score"])
best_score = pattern_scores[best_pattern]
print(f"\n🎯 推奨パターン: {best_pattern.value}")
print(f"✨ 信頼度スコア: {best_score['total_score']:.1f}/100")
# 📋 推奨内容生成(提案書作成)
recommendation = self._generate_recommendation(requirements, best_pattern, best_score, pattern_scores)
return recommendation
def _calculate_pattern_score(self, req: ProjectRequirements, pattern: ArchitecturePattern) -> Dict[str, Any]:
"""🎯 パターン別スコア計算(建築工法の適合度評価)"""
# 📊 評価項目(建築工法の評価基準)
scores = {
"scalability": 0, # 拡張性(増築のしやすさ)
"development_speed": 0, # 開発速度(建設の早さ)
"maintenance_ease": 0, # 保守性(メンテナンスの楽さ)
"security": 0, # セキュリティ(防犯性)
"cost_efficiency": 0, # コスト効率(建設・運用費用)
"team_fit": 0, # チーム適合性(施工チームとの相性)
"technology_match": 0 # 技術マッチ度(希望技術との適合)
}
penalties = [] # 減点要因(この工法の問題点)
bonuses = [] # 加点要因(この工法の利点)
print(f" 🔍 {pattern.value} の詳細評価:")
# 📋 パターン別評価実行(工法ごとの詳細チェック)
if pattern == ArchitecturePattern.CLIENT_SIDE:
scores.update(self._score_client_side_pattern(req, penalties, bonuses))
elif pattern == ArchitecturePattern.EDGE_FUNCTIONS:
scores.update(self._score_edge_functions_pattern(req, penalties, bonuses))
elif pattern == ArchitecturePattern.API_SERVER:
scores.update(self._score_api_server_pattern(req, penalties, bonuses))
elif pattern == ArchitecturePattern.HYBRID:
scores.update(self._score_hybrid_pattern(req, penalties, bonuses))
elif pattern == ArchitecturePattern.MICROSERVICES:
scores.update(self._score_microservices_pattern(req, penalties, bonuses))
# ⚖️ 重み付き総合スコア計算(項目の重要度を考慮)
# プロジェクトの性質に応じて重み付けを調整
weight_factors = self._get_weight_factors(req)
total_score = sum(
scores[criterion] * weight_factors.get(criterion, 0.14) # デフォルト重み: 1/7
for criterion in scores
)
# 📊 評価結果の詳細表示
for criterion, score in scores.items():
print(f" 📈 {criterion}: {score}/100")
if bonuses:
print(f" ✅ 加点要因: {len(bonuses)}個")
if penalties:
print(f" ❌ 減点要因: {len(penalties)}個")
return {
"scores": scores,
"total_score": total_score,
"penalties": penalties,
"bonuses": bonuses
}
def _get_weight_factors(self, req: ProjectRequirements) -> Dict[str, float]:
"""⚖️ 重み係数の動的計算(プロジェクトの性質に応じた重要度調整)"""
# 🎯 基本重み(すべて均等)
weights = {
"scalability": 0.14, # 拡張性
"development_speed": 0.14, # 開発速度
"maintenance_ease": 0.14, # 保守性
"security": 0.14, # セキュリティ
"cost_efficiency": 0.14, # コスト効率
"team_fit": 0.14, # チーム適合性
"technology_match": 0.16 # 技術マッチ度
}
# 📊 プロジェクトの特性に応じて重みを調整
# 大規模プロジェクトは拡張性重視
if req.expected_users > 10000:
weights["scalability"] += 0.1
weights["development_speed"] -= 0.05
weights["cost_efficiency"] -= 0.05
# 急ぎプロジェクトは開発速度重視
if req.development_timeline_weeks < 8:
weights["development_speed"] += 0.15
weights["maintenance_ease"] -= 0.075
weights["scalability"] -= 0.075
# 高セキュリティ要件はセキュリティ重視
if req.security_level in ["high", "enterprise"]:
weights["security"] += 0.1
weights["development_speed"] -= 0.05
weights["cost_efficiency"] -= 0.05
# 小規模チームはシンプルさ重視
if req.team_size in [TeamSize.SOLO, TeamSize.SMALL]:
weights["maintenance_ease"] += 0.1
weights["team_fit"] += 0.05
weights["scalability"] -= 0.15
return weights
def _score_client_side_pattern(self, req: ProjectRequirements, penalties: List, bonuses: List) -> Dict[str, int]:
"""🏠 クライアントサイドパターン評価(木造住宅工法)"""
scores = {}
print(" 🏠 木造住宅工法として評価中...")
# 🏗️ スケーラビリティ(増築のしやすさ)
if req.expected_users < 1000:
scores["scalability"] = 90
bonuses.append("個人住宅規模に最適(1000人未満)")
print(" ✅ 小規模利用に適している")
elif req.expected_users < 5000:
scores["scalability"] = 70
print(" ⚠️ 中規模では制約あり")
else:
scores["scalability"] = 30
penalties.append("大規模ユーザーには構造的に不適切(マンション建設が必要)")
print(" ❌ 大規模には向かない")
# ⚡ 開発速度(建設の早さ)
scores["development_speed"] = 95
bonuses.append("プレハブ工法のように迅速な開発が可能")
print(" ✅ 最速で建設可能")
# 🔧 メンテナンス性(保守の楽さ)
if req.complexity_level == ComplexityLevel.LOW:
scores["maintenance_ease"] = 85
bonuses.append("シンプルな構造で修理しやすい")
print(" ✅ シンプルで修理しやすい")
elif req.complexity_level == ComplexityLevel.MEDIUM:
scores["maintenance_ease"] = 65
print(" ⚠️ 複雑になると修理が大変")
else:
scores["maintenance_ease"] = 40
penalties.append("複雑なロジックのメンテナンスが困難(配線・配管が複雑)")
print(" ❌ 複雑な構造は木造では限界")
# 🛡️ セキュリティ(防犯性)
if req.security_level in ["basic", "standard"]:
scores["security"] = 70
print(" ✅ 一般的なセキュリティは確保")
else:
scores["security"] = 45
penalties.append("高度なセキュリティ要件に制限(銀行レベルは困難)")
print(" ❌ 高セキュリティは構造的に困難")
# 💰 コスト効率(建設・運用費用)
scores["cost_efficiency"] = 95
bonuses.append("運用コストがほぼゼロ(電気・ガス代不要)")
print(" ✅ 運用コストが最安")
# 👥 チーム適合性(施工チームとの相性)
if req.team_size in [TeamSize.SOLO, TeamSize.SMALL]:
scores["team_fit"] = 90
bonuses.append("1-2人の大工さんでも建設可能")
print(" ✅ 小規模チームに最適")
else:
scores["team_fit"] = 70
print(" ⚠️ 大きなチームには物足りない")
# 🔧 技術マッチ度(希望技術との適合)
if "python" in [tech.lower() for tech in req.preferred_technologies]:
scores["technology_match"] = 85
bonuses.append("Python Fletとの相性抜群")
print(" ✅ Python技術を活用可能")
else:
scores["technology_match"] = 75
print(" ✅ 一般的な技術で対応可能")
return scores
def _score_edge_functions_pattern(self, req: ProjectRequirements, penalties: List, bonuses: List) -> Dict[str, int]:
"""🏢 Edge Functionsパターン評価(プレハブ工法)"""
scores = {}
print(" 🏢 プレハブ工法として評価中...")
# 🏗️ スケーラビリティ(増築のしやすさ)
if req.expected_users < 50000:
scores["scalability"] = 85
bonuses.append("自動スケーリング対応(必要に応じて増設可能)")
print(" ✅ 中規模まで柔軟に対応")
else:
scores["scalability"] = 70
print(" ⚠️ 超大規模では制約あり")
# ⚡ 開発速度(建設の早さ)
if req.third_party_integrations > 2:
scores["development_speed"] = 80
bonuses.append("外部連携部品の組み込み得意(電気・水道・ガス工事)")
print(" ✅ 外部システム連携が得意")
else:
scores["development_speed"] = 70
print(" ✅ 標準的な建設速度")
# 🔧 メンテナンス性(保守の楽さ)
scores["maintenance_ease"] = 75
bonuses.append("モジュール化された構造で部分修理が容易")
print(" ✅ 部分的な修理・交換が容易")
# 🛡️ セキュリティ(防犯性)
scores["security"] = 85
bonuses.append("Supabase認証統合(セキュリティシステム標準装備)")
print(" ✅ 高品質なセキュリティ標準搭載")
# 💰 コスト効率(建設・運用費用)
scores["cost_efficiency"] = 80
bonuses.append("使用量に応じた従量課金(電気代は使った分だけ)")
print(" ✅ 使った分だけの課金")
# 👥 チーム適合性(施工チームとの相性)
if "typescript" in [tech.lower() for tech in req.preferred_technologies]:
scores["team_fit"] = 85
bonuses.append("TypeScript職人チームに最適")
print(" ✅ TypeScript技術者に最適")
else:
scores["team_fit"] = 65
penalties.append("TypeScript/Deno学習コスト(新しい工法の習得必要)")
print(" ⚠️ TypeScript学習が必要")
# 🔧 技術マッチ度(希望技術との適合)
if req.real_time_features:
scores["technology_match"] = 90
bonuses.append("リアルタイム機能統合(最新の配線・通信システム)")
print(" ✅ リアルタイム機能が強み")
else:
scores["technology_match"] = 75
print(" ✅ 一般的な機能は十分対応")
return scores
def _score_api_server_pattern(self, req: ProjectRequirements, penalties: List, bonuses: List) -> Dict[str, int]:
"""🏬 APIサーバーパターン評価(鉄筋コンクリート工法)"""
scores = {}
print(" 🏬 鉄筋コンクリート工法として評価中...")
# 🏗️ スケーラビリティ(増築のしやすさ)
if req.expected_users > 10000:
scores["scalability"] = 95
bonuses.append("大規模システムに最適(高層ビル建設可能)")
print(" ✅ 大規模・高層建築に最適")
else:
scores["scalability"] = 70
print(" ⚠️ 小規模には過剰設計")
# ⚡ 開発速度(建設の早さ)
if req.development_timeline_weeks > 12:
scores["development_speed"] = 75
print(" ✅ 十分な工期があれば高品質")
else:
scores["development_speed"] = 50
penalties.append("初期開発時間が長い(基礎工事に時間が必要)")
print(" ❌ 急ぎ工事は不向き")
# 🔧 メンテナンス性(保守の楽さ)
if req.team_size in [TeamSize.MEDIUM, TeamSize.LARGE]:
scores["maintenance_ease"] = 90
bonuses.append("大規模開発チームに適している(専門職種分業)")
print(" ✅ 大規模チームで真価発揮")
else:
scores["maintenance_ease"] = 60
penalties.append("小規模チームには複雑すぎる")
print(" ❌ 小規模チームには複雑")
# 🛡️ セキュリティ(防犯性)
if req.security_level in ["high", "enterprise"]:
scores["security"] = 95
bonuses.append("エンタープライズセキュリティ対応(銀行並みの堅牢性)")
print(" ✅ 最高レベルのセキュリティ")
else:
scores["security"] = 80
print(" ✅ 高いセキュリティを標準装備")
# 💰 コスト効率(建設・運用費用)
if req.budget_constraint in ["medium", "high", "unlimited"]:
scores["cost_efficiency"] = 70
print(" ✅ 予算に余裕があれば適切")
else:
scores["cost_efficiency"] = 40
penalties.append("運用コストが高い(維持費・光熱費が高額)")
print(" ❌ 運用コストが高額")
# 👥 チーム適合性(施工チームとの相性)
if req.team_size == TeamSize.LARGE:
scores["team_fit"] = 90
bonuses.append("大規模チームの専門技術を活用可能")
print(" ✅ 大規模チームが必要")
else:
scores["team_fit"] = 60
penalties.append("小規模チームには負担が大きい")
print(" ❌ 小規模チームには負担")
# 🔧 技術マッチ度(希望技術との適合)
if len(req.compliance_requirements) > 0:
scores["technology_match"] = 95
bonuses.append("コンプライアンス要件に対応(各種法規制対応)")
print(" ✅ 法規制・コンプライアンス対応")
else:
scores["technology_match"] = 80
print(" ✅ 汎用的な技術要件に対応")
return scores
def _score_hybrid_pattern(self, req: ProjectRequirements, penalties: List, bonuses: List) -> Dict[str, int]:
"""🏘️ ハイブリッドパターン評価(混構造工法)"""
scores = {}
print(" 🏘️ 混構造工法として評価中...")
# 🧩 複雑性が高い場合に推奨(適材適所の建設)
if req.complexity_level in [ComplexityLevel.HIGH, ComplexityLevel.VERY_HIGH]:
scores["scalability"] = 85
scores["development_speed"] = 70
scores["maintenance_ease"] = 75
scores["security"] = 85
scores["cost_efficiency"] = 70
scores["team_fit"] = 65
scores["technology_match"] = 80
bonuses.append("複雑な要件に柔軟対応(適材適所の工法選択)")
print(" ✅ 複雑な要件に最適な工法")
else:
# 複雑性が低い場合は過剰(過剰設計)
scores = {
"scalability": 50,
"development_speed": 50,
"maintenance_ease": 50,
"security": 50,
"cost_efficiency": 50,
"team_fit": 50,
"technology_match": 50
}
penalties.append("要件に対して過剰な複雑性(簡単な家に高級建材は不要)")
print(" ❌ シンプルな要件には過剰")
return scores
def _score_microservices_pattern(self, req: ProjectRequirements, penalties: List, bonuses: List) -> Dict[str, int]:
"""🏙️ マイクロサービスパターン評価(超高層ビル工法)"""
scores = {}
print(" 🏙️ 超高層ビル工法として評価中...")
# 🏢 大規模・高複雑性のみ推奨(超高層ビルが必要な場合のみ)
if (req.expected_users > 50000 and
req.complexity_level == ComplexityLevel.VERY_HIGH and
req.team_size == TeamSize.LARGE):
scores["scalability"] = 100
scores["development_speed"] = 50
scores["maintenance_ease"] = 60
scores["security"] = 80
scores["cost_efficiency"] = 50
scores["team_fit"] = 70
scores["technology_match"] = 75
bonuses.append("超大規模システムに最適(摩天楼建設)")
print(" ✅ 超大規模・超複雑案件に最適")
else:
scores = {
"scalability": 30,
"development_speed": 30,
"maintenance_ease": 30,
"security": 30,
"cost_efficiency": 30,
"team_fit": 30,
"technology_match": 30
}
penalties.append("要件に対して過度に複雑(戸建てに超高層ビル工法は不要)")
print(" ❌ 要件に対して過剰すぎる")
return scores
🔰 初心者向け解説:
このアーキテクチャ決定エンジンは、建築設計コンサルタントのように動作します:
評価プロセス | 建築設計の例 | システム設計の例 |
---|---|---|
Step 1 | 「何人家族ですか?」 | 「何人のユーザーですか?」 |
Step 2 | 「予算はいくらですか?」 | 「開発・運用予算は?」 |
Step 3 | 「どんな生活スタイル?」 | 「どんな機能が必要?」 |
Step 4 | 「いつまでに完成?」 | 「開発期間はどれくらい?」 |
Step 5 | 「工法の提案と見積もり」 | 「アーキテクチャパターンの推奨」 |
🎯 Step 2: 実用的な選択例(3つの典型的なプロジェクト)
実際のプロジェクト例で、どのように選択が変わるかを見てみましょう:
📊 プロジェクト比較表
項目 | 🏠 個人ブログ | 🏢 社内システム | 🏦 金融アプリ |
---|---|---|---|
ユーザー数 | 50人 | 500人 | 50,000人 |
複雑度 | 低(記事投稿) | 中(承認フロー) | 高(取引・決済) |
チーム規模 | 1人 | 3-5人 | 10人以上 |
開発期間 | 4週間 | 12週間 | 24週間 |
セキュリティ | 基本 | 標準 | 最高 |
予算 | 低 | 中 | 高 |
推奨パターン | 🏠 Client-side | 🏢 Edge Functions | 🏦 API Server |
信頼度 | 95% | 88% | 92% |
💡 実際の分析実行例
レストラン注文システムを作る場合の分析を見てみましょう:
# 🍕 レストラン注文システムの要件定義
restaurant_requirements = ProjectRequirements(
project_name="カフェ注文システム",
description="小さなカフェの注文・決済システム",
expected_users=200, # 1日200人のお客さん
concurrent_users=20, # 同時に20人が注文
data_volume_gb=5.0, # メニュー・注文データ
requests_per_second=10, # 秒間10件の注文
real_time_features=True, # 注文状況のリアルタイム表示
offline_support=False, # オンライン専用
mobile_app=True, # スマホ注文対応
third_party_integrations=2, # 決済サービス、配達アプリ
payment_processing=True, # クレジットカード決済
file_uploads=True, # メニュー写真
notifications=True, # 注文完了通知
analytics_reporting=False, # 分析は不要
availability_requirement=99.5, # 99.5%稼働(年3日の停止まで)
response_time_requirement_ms=1000, # 1秒以内に応答
security_level="standard", # 一般的なセキュリティ
compliance_requirements=[], # 特別な法規制なし
team_size=TeamSize.SMALL, # 2-3人チーム
development_timeline_weeks=8, # 8週間で完成
budget_constraint="low", # 予算は限られている
existing_infrastructure=[], # 既存システムなし
preferred_technologies=["Python"], # Python希望
user_growth_rate_monthly=5.0, # 月5%成長
feature_expansion_planned=True, # 将来の機能拡張予定
international_expansion=False # 海外展開なし
)
# 🔍 分析実行
engine = ArchitectureDecisionEngine()
recommendation = engine.analyze_requirements(restaurant_requirements)
print(f"📋 推奨アーキテクチャ: {recommendation.primary_pattern.value}")
print(f"✨ 信頼度: {recommendation.confidence_score:.1f}%")
print(f"⏱️ 開発期間: {recommendation.estimated_development_time_weeks}週間")
実行結果例:
🏗️ アーキテクチャ分析を開始します...
📋 プロジェクト: カフェ注文システム
👥 想定ユーザー数: 200人
⏱️ 開発期間: 8週間
📊 各パターンの適合度を計算中...
🔍 client_side パターンを評価中...
🏠 木造住宅工法として評価中...
📈 scalability: 90/100
📈 development_speed: 95/100
📈 maintenance_ease: 85/100
✅ 加点要因: 3個
🎯 推奨パターン: client_side
✨ 信頼度スコア: 87.3/100
📋 推奨アーキテクチャ: client_side
✨ 信頼度: 87.3%
⏱️ 開発期間: 6週間
🔰 初心者向け解説:
この例では、小さなカフェという「個人住宅レベル」の要件だったため、Client-side(木造住宅工法)が推奨されました。
選択理由:
- ✅ 200人という小規模ユーザー → 木造住宅で十分
- ✅ 8週間という短期間 → 迅速建設が可能
- ✅ 予算制約 → 低コストで実現
- ✅ Python希望 → Flet使用で技術マッチ
9.2 実践的な選択演習(建築設計コンサルティング実習)
🎯 Step 1: あなたのプロジェクト要件を入力してみよう
以下の質問に答えて、あなたのプロジェクトに最適なアーキテクチャパターンを見つけましょう:
def _generate_recommendation(
self,
req: ProjectRequirements,
best_pattern: ArchitecturePattern,
best_score: Dict[str, Any],
all_scores: Dict[ArchitecturePattern, Dict[str, Any]]
) -> ArchitectureRecommendation:
"""推奨内容生成"""
confidence = min(100, best_score["total_score"])
# 推奨理由
reasoning = [
f"総合スコア: {confidence:.1f}/100",
f"想定ユーザー数: {req.expected_users:,}名",
f"開発期間: {req.development_timeline_weeks}週間",
f"チーム規模: {req.team_size.value}"
]
reasoning.extend(best_score["bonuses"])
# メリット・デメリット
pros, cons = self._get_pattern_pros_cons(best_pattern, req)
# リスク評価
risks = self._assess_risks(best_pattern, req)
risks.extend(best_score["penalties"])
# 移行パス
migration_path = self._suggest_migration_path(req, best_pattern)
# 開発時間予測
dev_time = self._estimate_development_time(req, best_pattern)
# メンテナンス工数予測
maintenance_effort = self._estimate_maintenance_effort(req, best_pattern)
return ArchitectureRecommendation(
primary_pattern=best_pattern,
confidence_score=confidence,
reasoning=reasoning,
pros=pros,
cons=cons,
risks=risks,
migration_path=migration_path,
estimated_development_time_weeks=dev_time,
estimated_maintenance_effort=maintenance_effort
)
def _get_pattern_pros_cons(self, pattern: ArchitecturePattern, req: ProjectRequirements) -> Tuple[List[str], List[str]]:
"""パターン別メリット・デメリット"""
pattern_characteristics = {
ArchitecturePattern.CLIENT_SIDE: {
"pros": [
"迅速な開発・デプロイ",
"運用コストが低い",
"Supabaseとの自然な統合",
"リアルタイム機能標準対応"
],
"cons": [
"大規模スケーリング制限",
"複雑ビジネスロジック実装困難",
"セキュリティ境界の制限"
]
},
ArchitecturePattern.EDGE_FUNCTIONS: {
"pros": [
"自動スケーリング",
"Supabase認証統合",
"外部API連携に適している",
"サーバーレス運用"
],
"cons": [
"TypeScript/Deno学習コスト",
"コールドスタート遅延",
"デバッグの複雑性"
]
},
ArchitecturePattern.API_SERVER: {
"pros": [
"完全な制御と柔軟性",
"大規模システム対応",
"複雑ビジネスロジック実装可能",
"エンタープライズ機能対応"
],
"cons": [
"初期開発時間が長い",
"運用・保守コストが高い",
"インフラ管理が必要"
]
}
}
return (
pattern_characteristics.get(pattern, {}).get("pros", []),
pattern_characteristics.get(pattern, {}).get("cons", [])
)
def _assess_risks(self, pattern: ArchitecturePattern, req: ProjectRequirements) -> List[str]:
"""リスク評価"""
risks = []
if pattern == ArchitecturePattern.CLIENT_SIDE:
if req.expected_users > 1000:
risks.append("ユーザー増加時のパフォーマンス劣化リスク")
if req.security_level in ["high", "enterprise"]:
risks.append("セキュリティ要件を満たせないリスク")
elif pattern == ArchitecturePattern.EDGE_FUNCTIONS:
if req.development_timeline_weeks < 8:
risks.append("学習コストによるスケジュール遅延リスク")
risks.append("Deno/TypeScript エコシステム依存リスク")
elif pattern == ArchitecturePattern.API_SERVER:
if req.team_size in [TeamSize.SOLO, TeamSize.SMALL]:
risks.append("小規模チームでの運用負荷過大リスク")
if req.budget_constraint == "low":
risks.append("運用コスト超過リスク")
return risks
def _suggest_migration_path(self, req: ProjectRequirements, target_pattern: ArchitecturePattern) -> Optional[List[ArchitecturePattern]]:
"""移行パス提案"""
# 段階的移行が推奨される場合
if target_pattern == ArchitecturePattern.API_SERVER and req.development_timeline_weeks < 16:
return [
ArchitecturePattern.CLIENT_SIDE,
ArchitecturePattern.EDGE_FUNCTIONS,
ArchitecturePattern.API_SERVER
]
if target_pattern == ArchitecturePattern.MICROSERVICES:
return [
ArchitecturePattern.API_SERVER,
ArchitecturePattern.MICROSERVICES
]
return None
def _estimate_development_time(self, req: ProjectRequirements, pattern: ArchitecturePattern) -> int:
"""開発時間予測"""
base_times = {
ArchitecturePattern.CLIENT_SIDE: 4,
ArchitecturePattern.EDGE_FUNCTIONS: 8,
ArchitecturePattern.API_SERVER: 16,
ArchitecturePattern.HYBRID: 20,
ArchitecturePattern.MICROSERVICES: 32
}
base_time = base_times.get(pattern, 12)
# 複雑性による調整
complexity_multiplier = {
ComplexityLevel.LOW: 0.8,
ComplexityLevel.MEDIUM: 1.0,
ComplexityLevel.HIGH: 1.5,
ComplexityLevel.VERY_HIGH: 2.0
}
# チームサイズによる調整
team_multiplier = {
TeamSize.SOLO: 1.5,
TeamSize.SMALL: 1.0,
TeamSize.MEDIUM: 0.8,
TeamSize.LARGE: 0.7
}
adjusted_time = base_time * complexity_multiplier.get(req.complexity_level, 1.0) * team_multiplier.get(req.team_size, 1.0)
return int(adjusted_time)
def _estimate_maintenance_effort(self, req: ProjectRequirements, pattern: ArchitecturePattern) -> str:
"""メンテナンス工数予測"""
effort_scores = {
ArchitecturePattern.CLIENT_SIDE: 1,
ArchitecturePattern.EDGE_FUNCTIONS: 2,
ArchitecturePattern.API_SERVER: 4,
ArchitecturePattern.HYBRID: 5,
ArchitecturePattern.MICROSERVICES: 8
}
base_effort = effort_scores.get(pattern, 3)
# ユーザー数による調整
if req.expected_users > 10000:
base_effort += 2
elif req.expected_users > 1000:
base_effort += 1
# 複雑性による調整
if req.complexity_level in [ComplexityLevel.HIGH, ComplexityLevel.VERY_HIGH]:
base_effort += 2
if base_effort <= 2:
return "低"
elif base_effort <= 5:
return "中"
else:
return "高"
def _load_decision_rules(self) -> Dict[str, Any]:
"""決定ルール読み込み"""
# 実際の実装では外部設定ファイルから読み込み
return {}
def _load_weight_factors(self) -> Dict[str, float]:
"""重み係数読み込み"""
return {
"scalability": 0.20,
"development_speed": 0.15,
"maintenance_ease": 0.15,
"security": 0.15,
"cost_efficiency": 0.15,
"team_fit": 0.10,
"technology_match": 0.10
}
# 使用例とテストケース
def create_sample_requirements() -> List[Tuple[str, ProjectRequirements]]:
"""サンプル要件作成"""
return [
("スタートアップMVP", ProjectRequirements(
project_name="TaskFlow MVP",
description="タスク管理SaaS のMVP",
expected_users=500,
concurrent_users=50,
data_volume_gb=1.0,
requests_per_second=10,
real_time_features=True,
offline_support=False,
mobile_app=True,
third_party_integrations=2,
payment_processing=True,
file_uploads=False,
notifications=True,
analytics_reporting=False,
availability_requirement=99.0,
response_time_requirement_ms=2000,
security_level="standard",
compliance_requirements=[],
team_size=TeamSize.SMALL,
development_timeline_weeks=8,
budget_constraint="low",
existing_infrastructure=[],
preferred_technologies=["python", "react"],
user_growth_rate_monthly=0.2,
feature_expansion_planned=True,
international_expansion=False,
complexity_level=ComplexityLevel.MEDIUM
)),
("エンタープライズプラットフォーム", ProjectRequirements(
project_name="Enterprise Analytics Platform",
description="大企業向けデータ分析プラットフォーム",
expected_users=50000,
concurrent_users=5000,
data_volume_gb=500.0,
requests_per_second=1000,
real_time_features=True,
offline_support=True,
mobile_app=True,
third_party_integrations=15,
payment_processing=False,
file_uploads=True,
notifications=True,
analytics_reporting=True,
availability_requirement=99.9,
response_time_requirement_ms=500,
security_level="enterprise",
compliance_requirements=["GDPR", "SOX", "HIPAA"],
team_size=TeamSize.LARGE,
development_timeline_weeks=52,
budget_constraint="unlimited",
existing_infrastructure=["kubernetes", "postgresql"],
preferred_technologies=["python", "fastapi", "react"],
user_growth_rate_monthly=0.05,
feature_expansion_planned=True,
international_expansion=True,
complexity_level=ComplexityLevel.VERY_HIGH
)),
("社内ツール", ProjectRequirements(
project_name="Internal HR System",
description="社内人事管理システム",
expected_users=200,
concurrent_users=20,
data_volume_gb=10.0,
requests_per_second=5,
real_time_features=False,
offline_support=False,
mobile_app=False,
third_party_integrations=3,
payment_processing=False,
file_uploads=True,
notifications=True,
analytics_reporting=True,
availability_requirement=99.0,
response_time_requirement_ms=3000,
security_level="high",
compliance_requirements=["GDPR"],
team_size=TeamSize.SMALL,
development_timeline_weeks=12,
budget_constraint="medium",
existing_infrastructure=["supabase"],
preferred_technologies=["typescript", "vue"],
user_growth_rate_monthly=0.02,
feature_expansion_planned=False,
international_expansion=False,
complexity_level=ComplexityLevel.MEDIUM
)),
("サンプルプロジェクト", ProjectRequirements(
project_name="Sample Project",
description="A sample project for testing",
expected_users=1000,
concurrent_users=100,
data_volume_gb=10.0,
requests_per_second=50,
real_time_features=True,
offline_support=False,
mobile_app=True,
third_party_integrations=3,
payment_processing=True,
file_uploads=True,
notifications=True,
analytics_reporting=True,
availability_requirement=99.9,
response_time_requirement_ms=200,
security_level="standard",
compliance_requirements=["GDPR"],
team_size=TeamSize.SMALL,
development_timeline_weeks=12,
budget_constraint="medium",
existing_infrastructure=["AWS"],
preferred_technologies=["Python", "React"],
user_growth_rate_monthly=10.0,
feature_expansion_planned=True,
international_expansion=False
))
]
# テスト実行
def run_architecture_analysis():
"""アーキテクチャ分析実行"""
engine = ArchitectureDecisionEngine()
sample_requirements = create_sample_requirements()
for name, requirements in sample_requirements:
print(f"\n{'='*60}")
print(f"プロジェクト: {name}")
print(f"{'='*60}")
recommendation = engine.analyze_requirements(requirements)
print(f"推奨アーキテクチャ: {recommendation.primary_pattern.value}")
print(f"信頼度: {recommendation.confidence_score:.1f}%")
print(f"予想開発期間: {recommendation.estimated_development_time_weeks}週間")
print(f"メンテナンス工数: {recommendation.estimated_maintenance_effort}")
print("\n推奨理由:")
for reason in recommendation.reasoning:
print(f" • {reason}")
print("\nメリット:")
for pro in recommendation.pros:
print(f" ✓ {pro}")
print("\nデメリット:")
for con in recommendation.cons:
print(f" ✗ {con}")
if recommendation.risks:
print("\nリスク:")
for risk in recommendation.risks:
print(f" ⚠ {risk}")
if recommendation.migration_path:
print("\n移行パス:")
path_str = " → ".join([p.value for p in recommendation.migration_path])
print(f" {path_str}")
if __name__ == "__main__":
run_architecture_analysis()
9.2 移行戦略(パターン間の段階的移行)
段階的移行戦略実装
# migration_strategy.py
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
import json
class MigrationPhase(Enum):
PLANNING = "planning"
PREPARATION = "preparation"
IMPLEMENTATION = "implementation"
TESTING = "testing"
DEPLOYMENT = "deployment"
MONITORING = "monitoring"
COMPLETION = "completion"
class MigrationRisk(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class MigrationTask:
id: str
name: str
description: str
phase: MigrationPhase
estimated_hours: int
dependencies: List[str]
risk_level: MigrationRisk
owner: str
deadline: Optional[datetime] = None
completed: bool = False
@dataclass
class MigrationMilestone:
name: str
description: str
target_date: datetime
criteria: List[str]
rollback_plan: str
class MigrationStrategy:
"""段階的移行戦略"""
def __init__(self):
self.migration_patterns = {
("client_side", "edge_functions"): self._client_to_edge_migration,
("client_side", "api_server"): self._client_to_api_migration,
("edge_functions", "api_server"): self._edge_to_api_migration,
("api_server", "microservices"): self._api_to_microservices_migration
}
def create_migration_plan(
self,
current_pattern: str,
target_pattern: str,
project_constraints: Dict[str, Any]
) -> Dict[str, Any]:
"""移行計画作成"""
migration_key = (current_pattern, target_pattern)
if migration_key not in self.migration_patterns:
return self._create_unsupported_migration_plan(current_pattern, target_pattern)
migration_func = self.migration_patterns[migration_key]
return migration_func(project_constraints)
def _client_to_edge_migration(self, constraints: Dict[str, Any]) -> Dict[str, Any]:
"""クライアントサイド → Edge Functions 移行"""
tasks = [
MigrationTask(
id="analyze_business_logic",
name="ビジネスロジック分析",
description="クライアントサイドからサーバーサイドに移行すべきロジックを特定",
phase=MigrationPhase.PLANNING,
estimated_hours=16,
dependencies=[],
risk_level=MigrationRisk.MEDIUM,
owner="tech_lead"
),
MigrationTask(
id="setup_edge_functions_env",
name="Edge Functions環境構築",
description="Deno開発環境とデプロイパイプライン構築",
phase=MigrationPhase.PREPARATION,
estimated_hours=8,
dependencies=[],
risk_level=MigrationRisk.LOW,
owner="devops"
),
MigrationTask(
id="implement_auth_integration",
name="認証統合実装",
description="Edge FunctionsでのJWT検証とSupabase Auth統合",
phase=MigrationPhase.PREPARATION,
estimated_hours=12,
dependencies=["setup_edge_functions_env"],
risk_level=MigrationRisk.MEDIUM,
owner="backend_dev"
),
MigrationTask(
id="migrate_critical_functions",
name="重要機能移行",
description="決済処理など重要なビジネスロジックから順次移行",
phase=MigrationPhase.IMPLEMENTATION,
estimated_hours=40,
dependencies=["analyze_business_logic", "implement_auth_integration"],
risk_level=MigrationRisk.HIGH,
owner="backend_dev"
),
MigrationTask(
id="implement_feature_flags",
name="フィーチャーフラグ実装",
description="段階的切り替えのためのフィーチャーフラグシステム",
phase=MigrationPhase.IMPLEMENTATION,
estimated_hours=16,
dependencies=["setup_edge_functions_env"],
risk_level=MigrationRisk.MEDIUM,
owner="frontend_dev"
),
MigrationTask(
id="performance_testing",
name="パフォーマンステスト",
description="移行後の性能検証とボトルネック特定",
phase=MigrationPhase.TESTING,
estimated_hours=24,
dependencies=["migrate_critical_functions"],
risk_level=MigrationRisk.MEDIUM,
owner="qa_engineer"
),
MigrationTask(
id="gradual_rollout",
name="段階的ロールアウト",
description="ユーザーベースの段階的な新機能展開",
phase=MigrationPhase.DEPLOYMENT,
estimated_hours=32,
dependencies=["performance_testing", "implement_feature_flags"],
risk_level=MigrationRisk.HIGH,
owner="tech_lead"
),
MigrationTask(
id="monitoring_setup",
name="監視システム構築",
description="Edge Functions専用の監視・アラートシステム",
phase=MigrationPhase.MONITORING,
estimated_hours=20,
dependencies=["gradual_rollout"],
risk_level=MigrationRisk.MEDIUM,
owner="devops"
)
]
milestones = [
MigrationMilestone(
name="開発環境準備完了",
description="Edge Functions開発・テスト環境の構築完了",
target_date=datetime.now() + timedelta(weeks=2),
criteria=[
"Deno開発環境が稼働",
"CI/CDパイプラインが動作",
"基本的な認証機能が実装済み"
],
rollback_plan="既存クライアントサイド実装に戻す"
),
MigrationMilestone(
name="コア機能移行完了",
description="重要なビジネスロジックの移行完了",
target_date=datetime.now() + timedelta(weeks=6),
criteria=[
"決済処理がEdge Functionsで動作",
"パフォーマンステストが合格",
"セキュリティ監査が完了"
],
rollback_plan="フィーチャーフラグで旧実装に切り戻し"
),
MigrationMilestone(
name="移行完了",
description="全機能の移行と旧コード削除",
target_date=datetime.now() + timedelta(weeks=10),
criteria=[
"全ユーザーが新実装を使用",
"旧クライアントサイドコードを削除",
"監視システムが正常稼働"
],
rollback_plan="緊急時のみ全体ロールバック"
)
]
return {
"migration_type": "client_side_to_edge_functions",
"estimated_duration_weeks": 12,
"total_effort_hours": sum(task.estimated_hours for task in tasks),
"risk_assessment": "中程度",
"tasks": [task.__dict__ for task in tasks],
"milestones": [milestone.__dict__ for milestone in milestones],
"success_criteria": [
"レスポンス時間が現在の1.5倍以内",
"エラー率が0.1%以下",
"機能完全性の維持",
"セキュリティ強化の確認"
],
"rollback_triggers": [
"パフォーマンス劣化(2倍以上)",
"エラー率1%超過",
"セキュリティ問題発覚",
"ユーザー満足度大幅低下"
]
}
def _client_to_api_migration(self, constraints: Dict[str, Any]) -> Dict[str, Any]:
"""クライアントサイド → API Server 移行(段階的)"""
# 3段階での移行を推奨
phases = [
{
"name": "Phase 1: Edge Functions導入",
"duration_weeks": 8,
"description": "まずEdge Functionsで重要機能を移行"
},
{
"name": "Phase 2: API Server構築",
"duration_weeks": 12,
"description": "FastAPI サーバーの並行開発"
},
{
"name": "Phase 3: 統合移行",
"duration_weeks": 8,
"description": "Edge FunctionsからAPI Serverへの移行"
}
]
return {
"migration_type": "client_side_to_api_server_staged",
"estimated_duration_weeks": 28,
"total_effort_hours": 400,
"risk_assessment": "高",
"phases": phases,
"recommended_approach": "段階的移行(Client → Edge → API)",
"reasoning": [
"直接移行はリスクが高すぎる",
"Edge Functionsで段階的にサーバーサイド化",
"最終的にAPI Serverで完全制御を実現"
]
}
def _edge_to_api_migration(self, constraints: Dict[str, Any]) -> Dict[str, Any]:
"""Edge Functions → API Server 移行"""
tasks = [
MigrationTask(
id="api_architecture_design",
name="API アーキテクチャ設計",
description="FastAPI アプリケーションの詳細設計",
phase=MigrationPhase.PLANNING,
estimated_hours=24,
dependencies=[],
risk_level=MigrationRisk.MEDIUM,
owner="architect"
),
MigrationTask(
id="infrastructure_setup",
name="インフラストラクチャ構築",
description="Kubernetes/Docker環境構築とCI/CD設定",
phase=MigrationPhase.PREPARATION,
estimated_hours=40,
dependencies=[],
risk_level=MigrationRisk.HIGH,
owner="devops"
),
MigrationTask(
id="core_api_implementation",
name="コアAPI実装",
description="認証・認可・基本CRUD機能の実装",
phase=MigrationPhase.IMPLEMENTATION,
estimated_hours=80,
dependencies=["api_architecture_design", "infrastructure_setup"],
risk_level=MigrationRisk.MEDIUM,
owner="backend_team"
),
MigrationTask(
id="business_logic_migration",
name="ビジネスロジック移行",
description="Edge FunctionsからFastAPIへのロジック移植",
phase=MigrationPhase.IMPLEMENTATION,
estimated_hours=120,
dependencies=["core_api_implementation"],
risk_level=MigrationRisk.HIGH,
owner="backend_team"
),
MigrationTask(
id="data_migration_strategy",
name="データ移行戦略実装",
description="無停止データ移行のための仕組み構築",
phase=MigrationPhase.IMPLEMENTATION,
estimated_hours=60,
dependencies=["business_logic_migration"],
risk_level=MigrationRisk.CRITICAL,
owner="database_specialist"
),
MigrationTask(
id="load_testing",
name="負荷テスト",
description="本番想定負荷でのパフォーマンステスト",
phase=MigrationPhase.TESTING,
estimated_hours=40,
dependencies=["business_logic_migration"],
risk_level=MigrationRisk.MEDIUM,
owner="qa_team"
),
MigrationTask(
id="canary_deployment",
name="カナリアデプロイメント",
description="段階的なトラフィック移行",
phase=MigrationPhase.DEPLOYMENT,
estimated_hours=60,
dependencies=["load_testing", "data_migration_strategy"],
risk_level=MigrationRisk.HIGH,
owner="devops"
)
]
return {
"migration_type": "edge_functions_to_api_server",
"estimated_duration_weeks": 20,
"total_effort_hours": sum(task.estimated_hours for task in tasks),
"risk_assessment": "高",
"tasks": [task.__dict__ for task in tasks],
"critical_considerations": [
"Edge Functionsの並行運用期間の管理",
"データベース接続モデルの変更",
"認証・認可システムの統合",
"監視・ログシステムの移行"
]
}
def _api_to_microservices_migration(self, constraints: Dict[str, Any]) -> Dict[str, Any]:
"""API Server → Microservices 移行"""
# マイクロサービス分割戦略
service_decomposition = [
{
"service_name": "user-service",
"description": "ユーザー管理・認証",
"estimated_effort_hours": 120,
"priority": "high"
},
{
"service_name": "project-service",
"description": "プロジェクト・タスク管理",
"estimated_effort_hours": 160,
"priority": "high"
},
{
"service_name": "notification-service",
"description": "通知・メール配信",
"estimated_effort_hours": 80,
"priority": "medium"
},
{
"service_name": "analytics-service",
"description": "分析・レポート生成",
"estimated_effort_hours": 100,
"priority": "low"
}
]
return {
"migration_type": "api_server_to_microservices",
"estimated_duration_weeks": 32,
"total_effort_hours": 600,
"risk_assessment": "非常に高い",
"service_decomposition": service_decomposition,
"migration_sequence": [
"1. 分析・レポートサービス分離(低リスク)",
"2. 通知サービス分離",
"3. プロジェクト管理サービス分離",
"4. ユーザー管理サービス分離(最高リスク)"
],
"infrastructure_requirements": [
"サービスメッシュ(Istio/Linkerd)",
"API ゲートウェイ",
"分散トレーシング",
"集中ログ管理",
"サービス間通信(gRPC/REST)"
]
}
def _create_unsupported_migration_plan(self, current: str, target: str) -> Dict[str, Any]:
"""未対応移行パスの場合"""
return {
"migration_type": f"{current}_to_{target}",
"status": "unsupported",
"message": f"直接移行パス({current} → {target})は推奨されません",
"alternative_paths": self._suggest_alternative_paths(current, target)
}
def _suggest_alternative_paths(self, current: str, target: str) -> List[str]:
"""代替移行パス提案"""
# 推奨移行経路
migration_graph = {
"client_side": ["edge_functions", "api_server"],
"edge_functions": ["api_server", "microservices"],
"api_server": ["microservices"],
"microservices": []
}
# 幅優先探索で経路探索
from collections import deque
queue = deque([(current, [current])])
visited = {current}
while queue:
node, path = queue.popleft()
if node == target:
return [" → ".join(path)]
for neighbor in migration_graph.get(node, []):
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))
return ["直接移行パスが見つかりません"]
# 移行実行管理
class MigrationExecutor:
"""移行実行管理"""
def __init__(self):
self.current_migration = None
self.execution_log = []
def start_migration(self, migration_plan: Dict[str, Any]) -> str:
"""移行開始"""
migration_id = f"migration_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
self.current_migration = {
"id": migration_id,
"plan": migration_plan,
"status": "in_progress",
"started_at": datetime.now(),
"completed_tasks": [],
"current_phase": MigrationPhase.PLANNING
}
self.execution_log.append({
"timestamp": datetime.now(),
"event": "migration_started",
"migration_id": migration_id
})
return migration_id
def complete_task(self, task_id: str, notes: str = "") -> bool:
"""タスク完了"""
if not self.current_migration:
return False
task = next(
(t for t in self.current_migration["plan"]["tasks"] if t["id"] == task_id),
None
)
if not task:
return False
task["completed"] = True
task["completed_at"] = datetime.now()
task["notes"] = notes
self.current_migration["completed_tasks"].append(task_id)
self.execution_log.append({
"timestamp": datetime.now(),
"event": "task_completed",
"task_id": task_id,
"notes": notes
})
# フェーズ進行チェック
self._check_phase_completion()
return True
def _check_phase_completion(self):
"""フェーズ完了チェック"""
current_phase = self.current_migration["current_phase"]
tasks_in_phase = [
t for t in self.current_migration["plan"]["tasks"]
if t["phase"] == current_phase.value
]
completed_in_phase = [
t for t in tasks_in_phase if t.get("completed", False)
]
if len(completed_in_phase) == len(tasks_in_phase):
# 次のフェーズに進行
next_phase = self._get_next_phase(current_phase)
if next_phase:
self.current_migration["current_phase"] = next_phase
self.execution_log.append({
"timestamp": datetime.now(),
"event": "phase_completed",
"phase": current_phase.value,
"next_phase": next_phase.value
})
def _get_next_phase(self, current_phase: MigrationPhase) -> Optional[MigrationPhase]:
"""次フェーズ取得"""
phases = list(MigrationPhase)
try:
current_index = phases.index(current_phase)
if current_index < len(phases) - 1:
return phases[current_index + 1]
except ValueError:
pass
return None
def get_migration_status(self) -> Dict[str, Any]:
"""移行状況取得"""
if not self.current_migration:
return {"status": "no_active_migration"}
total_tasks = len(self.current_migration["plan"]["tasks"])
completed_tasks = len(self.current_migration["completed_tasks"])
progress = (completed_tasks / total_tasks) * 100 if total_tasks > 0 else 0
return {
"migration_id": self.current_migration["id"],
"status": self.current_migration["status"],
"current_phase": self.current_migration["current_phase"].value,
"progress_percent": progress,
"completed_tasks": completed_tasks,
"total_tasks": total_tasks,
"started_at": self.current_migration["started_at"],
"estimated_completion": self._estimate_completion_date()
}
def _estimate_completion_date(self) -> Optional[datetime]:
"""完了予定日予測"""
if not self.current_migration:
return None
# 簡単な線形予測
total_tasks = len(self.current_migration["plan"]["tasks"])
completed_tasks = len(self.current_migration["completed_tasks"])
if completed_tasks == 0:
return None
elapsed_time = datetime.now() - self.current_migration["started_at"]
average_time_per_task = elapsed_time / completed_tasks
remaining_tasks = total_tasks - completed_tasks
return datetime.now() + (average_time_per_task * remaining_tasks)
# 使用例
def demo_migration_planning():
"""移行計画デモ"""
strategy = MigrationStrategy()
# サンプル制約
constraints = {
"team_size": 5,
"timeline_weeks": 16,
"risk_tolerance": "medium",
"budget": "medium"
}
# 移行計画作成
plan = strategy.create_migration_plan("client_side", "edge_functions", constraints)
print("移行計画:")
print(json.dumps(plan, indent=2, default=str, ensure_ascii=False))
# 移行実行
executor = MigrationExecutor()
migration_id = executor.start_migration(plan)
print(f"\n移行開始: {migration_id}")
# サンプルタスク完了
executor.complete_task("analyze_business_logic", "要件定義完了")
executor.complete_task("setup_edge_functions_env", "開発環境構築完了")
status = executor.get_migration_status()
print(f"\n現在の進捗: {status['progress_percent']:.1f}%")
if __name__ == "__main__":
demo_migration_planning()
9.3 ハイブリッド構成設計
複合アーキテクチャ設計
# hybrid_architecture.py
from typing import Dict, Any, List, Optional, Set
from dataclasses import dataclass
from enum import Enum
import json
class ComponentType(Enum):
CLIENT_SIDE = "client_side"
EDGE_FUNCTION = "edge_function"
API_SERVER = "api_server"
DATABASE_DIRECT = "database_direct"
EXTERNAL_SERVICE = "external_service"
class DataSensitivity(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
class PerformanceRequirement(Enum):
LOW = "low" # >2秒
MEDIUM = "medium" # 500ms-2秒
HIGH = "high" # 100ms-500ms
CRITICAL = "critical" # <100ms
@dataclass
class ArchitectureComponent:
name: str
type: ComponentType
responsibilities: List[str]
data_access: List[str]
performance_req: PerformanceRequirement
security_req: DataSensitivity
dependencies: List[str]
class HybridArchitectureDesigner:
"""ハイブリッドアーキテクチャ設計"""
def __init__(self):
self.component_constraints = self._load_component_constraints()
def design_hybrid_architecture(
self,
functional_requirements: List[Dict[str, Any]],
non_functional_requirements: Dict[str, Any]
) -> Dict[str, Any]:
"""ハイブリッドアーキテクチャ設計"""
# 機能別最適配置決定
components = []
for req in functional_requirements:
component = self._design_component(req, non_functional_requirements)
components.append(component)
# 相互依存関係の最適化
optimized_components = self._optimize_dependencies(components)
# 統合アーキテクチャ生成
architecture = self._generate_integrated_architecture(optimized_components)
return architecture
def _design_component(
self,
functional_req: Dict[str, Any],
non_functional_reqs: Dict[str, Any]
) -> ArchitectureComponent:
"""個別コンポーネント設計"""
name = functional_req["name"]
responsibilities = functional_req["responsibilities"]
data_sensitivity = DataSensitivity(functional_req.get("data_sensitivity", "internal"))
performance_req = PerformanceRequirement(functional_req.get("performance", "medium"))
# 最適なコンポーネントタイプ決定
component_type = self._select_optimal_component_type(
responsibilities,
data_sensitivity,
performance_req,
non_functional_reqs
)
return ArchitectureComponent(
name=name,
type=component_type,
responsibilities=responsibilities,
data_access=functional_req.get("data_access", []),
performance_req=performance_req,
security_req=data_sensitivity,
dependencies=functional_req.get("dependencies", [])
)
def _select_optimal_component_type(
self,
responsibilities: List[str],
data_sensitivity: DataSensitivity,
performance_req: PerformanceRequirement,
non_functional_reqs: Dict[str, Any]
) -> ComponentType:
"""最適なコンポーネントタイプ選択"""
# 決定マトリックス
scores = {}
for comp_type in ComponentType:
score = self._calculate_component_score(
comp_type, responsibilities, data_sensitivity, performance_req, non_functional_reqs
)
scores[comp_type] = score
return max(scores.keys(), key=lambda x: scores[x])
def _calculate_component_score(
self,
comp_type: ComponentType,
responsibilities: List[str],
data_sensitivity: DataSensitivity,
performance_req: PerformanceRequirement,
non_functional_reqs: Dict[str, Any]
) -> float:
"""コンポーネントタイプスコア計算"""
score = 0.0
# 責務適合性
if comp_type == ComponentType.CLIENT_SIDE:
if any(r in ["ui_interaction", "real_time_updates", "offline_support"] for r in responsibilities):
score += 30
if any(r in ["payment_processing", "sensitive_data_processing"] for r in responsibilities):
score -= 20 # セキュリティリスク
elif comp_type == ComponentType.EDGE_FUNCTION:
if any(r in ["api_integration", "data_transformation", "notification"] for r in responsibilities):
score += 25
if any(r in ["complex_business_logic", "heavy_computation"] for r in responsibilities):
score -= 15 # 実行時間制限
elif comp_type == ComponentType.API_SERVER:
if any(r in ["complex_business_logic", "data_processing", "integration"] for r in responsibilities):
score += 35
if any(r in ["simple_crud", "static_content"] for r in responsibilities):
score -= 10 # オーバーエンジニアリング
# セキュリティ適合性
security_scores = {
ComponentType.CLIENT_SIDE: {
DataSensitivity.PUBLIC: 20,
DataSensitivity.INTERNAL: 10,
DataSensitivity.CONFIDENTIAL: -10,
DataSensitivity.RESTRICTED: -30
},
ComponentType.EDGE_FUNCTION: {
DataSensitivity.PUBLIC: 25,
DataSensitivity.INTERNAL: 20,
DataSensitivity.CONFIDENTIAL: 15,
DataSensitivity.RESTRICTED: 5
},
ComponentType.API_SERVER: {
DataSensitivity.PUBLIC: 15,
DataSensitivity.INTERNAL: 25,
DataSensitivity.CONFIDENTIAL: 30,
DataSensitivity.RESTRICTED: 35
}
}
score += security_scores.get(comp_type, {}).get(data_sensitivity, 0)
# パフォーマンス適合性
performance_scores = {
ComponentType.CLIENT_SIDE: {
PerformanceRequirement.CRITICAL: 30,
PerformanceRequirement.HIGH: 25,
PerformanceRequirement.MEDIUM: 20,
PerformanceRequirement.LOW: 15
},
ComponentType.EDGE_FUNCTION: {
PerformanceRequirement.CRITICAL: 15, # コールドスタート
PerformanceRequirement.HIGH: 25,
PerformanceRequirement.MEDIUM: 30,
PerformanceRequirement.LOW: 25
},
ComponentType.API_SERVER: {
PerformanceRequirement.CRITICAL: 20,
PerformanceRequirement.HIGH: 30,
PerformanceRequirement.MEDIUM: 25,
PerformanceRequirement.LOW: 20
}
}
score += performance_scores.get(comp_type, {}).get(performance_req, 0)
return score
def _optimize_dependencies(self, components: List[ArchitectureComponent]) -> List[ArchitectureComponent]:
"""依存関係最適化"""
# 依存関係グラフ構築
dependency_graph = {}
for comp in components:
dependency_graph[comp.name] = comp.dependencies
# 循環依存検出
cycles = self._detect_cycles(dependency_graph)
if cycles:
# 循環依存解決策提案
for cycle in cycles:
self._resolve_cycle(cycle, components)
# 通信コスト最適化
optimized_components = self._optimize_communication_costs(components)
return optimized_components
def _detect_cycles(self, graph: Dict[str, List[str]]) -> List[List[str]]:
"""循環依存検出"""
cycles = []
visited = set()
rec_stack = set()
path = []
def dfs(node):
if node in rec_stack:
cycle_start = path.index(node)
cycles.append(path[cycle_start:])
return
if node in visited:
return
visited.add(node)
rec_stack.add(node)
path.append(node)
for neighbor in graph.get(node, []):
dfs(neighbor)
rec_stack.remove(node)
path.pop()
for node in graph:
if node not in visited:
dfs(node)
return cycles
def _resolve_cycle(self, cycle: List[str], components: List[ArchitectureComponent]):
"""循環依存解決"""
# イベント駆動アーキテクチャの提案
print(f"循環依存検出: {' -> '.join(cycle)}")
print("解決策: イベント駆動パターンまたは共通インターフェース抽出を推奨")
def _optimize_communication_costs(self, components: List[ArchitectureComponent]) -> List[ArchitectureComponent]:
"""通信コスト最適化"""
# 高頻度通信の識別
high_frequency_pairs = self._identify_high_frequency_communication(components)
# 同一デプロイメント単位への統合提案
for pair in high_frequency_pairs:
comp1, comp2 = pair
if self._should_colocate(comp1, comp2):
print(f"推奨: {comp1.name} と {comp2.name} の同一デプロイメント検討")
return components
def _identify_high_frequency_communication(self, components: List[ArchitectureComponent]) -> List[tuple]:
"""高頻度通信の識別"""
# 実装簡略化
return []
def _should_colocate(self, comp1: ArchitectureComponent, comp2: ArchitectureComponent) -> bool:
"""同一配置判定"""
# 同じコンポーネントタイプかつ似た要件の場合
return (comp1.type == comp2.type and
comp1.performance_req == comp2.performance_req and
comp1.security_req == comp2.security_req)
def _generate_integrated_architecture(self, components: List[ArchitectureComponent]) -> Dict[str, Any]:
"""統合アーキテクチャ生成"""
# コンポーネントタイプ別グループ化
grouped_components = {}
for comp in components:
comp_type = comp.type.value
if comp_type not in grouped_components:
grouped_components[comp_type] = []
grouped_components[comp_type].append(comp)
# 通信パターン分析
communication_patterns = self._analyze_communication_patterns(components)
# セキュリティ境界定義
security_boundaries = self._define_security_boundaries(components)
# デプロイメント戦略
deployment_strategy = self._design_deployment_strategy(grouped_components)
return {
"architecture_type": "hybrid",
"components": {
comp_type: [self._component_to_dict(comp) for comp in comps]
for comp_type, comps in grouped_components.items()
},
"communication_patterns": communication_patterns,
"security_boundaries": security_boundaries,
"deployment_strategy": deployment_strategy,
"monitoring_strategy": self._design_monitoring_strategy(components),
"scalability_plan": self._design_scalability_plan(grouped_components)
}
def _analyze_communication_patterns(self, components: List[ArchitectureComponent]) -> Dict[str, Any]:
"""通信パターン分析"""
patterns = {
"synchronous": [],
"asynchronous": [],
"event_driven": []
}
for comp in components:
for dep in comp.dependencies:
# 性能要件に基づく通信パターン決定
if comp.performance_req == PerformanceRequirement.CRITICAL:
patterns["synchronous"].append(f"{comp.name} -> {dep}")
elif comp.type == ComponentType.EDGE_FUNCTION:
patterns["asynchronous"].append(f"{comp.name} -> {dep}")
else:
patterns["event_driven"].append(f"{comp.name} -> {dep}")
return patterns
def _define_security_boundaries(self, components: List[ArchitectureComponent]) -> List[Dict[str, Any]]:
"""セキュリティ境界定義"""
boundaries = []
# データ機密性レベル別境界
for sensitivity in DataSensitivity:
boundary_components = [
comp.name for comp in components
if comp.security_req == sensitivity
]
if boundary_components:
boundaries.append({
"name": f"{sensitivity.value}_boundary",
"components": boundary_components,
"security_controls": self._get_security_controls(sensitivity)
})
return boundaries
def _get_security_controls(self, sensitivity: DataSensitivity) -> List[str]:
"""セキュリティ制御取得"""
controls_map = {
DataSensitivity.PUBLIC: ["rate_limiting", "input_validation"],
DataSensitivity.INTERNAL: ["authentication", "authorization", "audit_logging"],
DataSensitivity.CONFIDENTIAL: ["encryption_at_rest", "encryption_in_transit", "access_monitoring"],
DataSensitivity.RESTRICTED: ["multi_factor_auth", "data_loss_prevention", "privileged_access_management"]
}
return controls_map.get(sensitivity, [])
def _design_deployment_strategy(self, grouped_components: Dict[str, List[ArchitectureComponent]]) -> Dict[str, Any]:
"""デプロイメント戦略設計"""
strategy = {}
for comp_type, components in grouped_components.items():
if comp_type == "client_side":
strategy[comp_type] = {
"deployment_method": "static_hosting",
"cdn": "cloudflare",
"build_pipeline": "github_actions"
}
elif comp_type == "edge_function":
strategy[comp_type] = {
"deployment_method": "supabase_functions",
"auto_scaling": True,
"monitoring": "built_in"
}
elif comp_type == "api_server":
strategy[comp_type] = {
"deployment_method": "kubernetes",
"container_registry": "ghcr.io",
"auto_scaling": "horizontal_pod_autoscaler",
"load_balancer": "nginx_ingress"
}
return strategy
def _design_monitoring_strategy(self, components: List[ArchitectureComponent]) -> Dict[str, Any]:
"""監視戦略設計"""
return {
"metrics_collection": "prometheus",
"log_aggregation": "elasticsearch",
"distributed_tracing": "jaeger",
"alerting": "alertmanager",
"dashboards": "grafana",
"component_specific": {
comp.name: self._get_component_monitoring(comp)
for comp in components
}
}
def _get_component_monitoring(self, component: ArchitectureComponent) -> List[str]:
"""コンポーネント別監視項目"""
base_metrics = ["response_time", "error_rate", "throughput"]
if component.type == ComponentType.CLIENT_SIDE:
return base_metrics + ["page_load_time", "user_interactions"]
elif component.type == ComponentType.EDGE_FUNCTION:
return base_metrics + ["cold_start_time", "memory_usage"]
elif component.type == ComponentType.API_SERVER:
return base_metrics + ["cpu_usage", "memory_usage", "database_connections"]
return base_metrics
def _design_scalability_plan(self, grouped_components: Dict[str, List[ArchitectureComponent]]) -> Dict[str, Any]:
"""スケーラビリティ計画設計"""
plan = {}
for comp_type, components in grouped_components.items():
if comp_type == "client_side":
plan[comp_type] = {
"scaling_method": "cdn_scaling",
"considerations": ["bundle_size_optimization", "lazy_loading"]
}
elif comp_type == "edge_function":
plan[comp_type] = {
"scaling_method": "automatic",
"considerations": ["cold_start_optimization", "memory_limits"]
}
elif comp_type == "api_server":
plan[comp_type] = {
"scaling_method": "horizontal",
"target_metrics": ["cpu_utilization", "memory_utilization"],
"considerations": ["stateless_design", "database_connection_pooling"]
}
return plan
def _component_to_dict(self, component: ArchitectureComponent) -> Dict[str, Any]:
"""コンポーネント辞書変換"""
return {
"name": component.name,
"type": component.type.value,
"responsibilities": component.responsibilities,
"data_access": component.data_access,
"performance_requirement": component.performance_req.value,
"security_requirement": component.security_req.value,
"dependencies": component.dependencies
}
def _load_component_constraints(self) -> Dict[str, Any]:
"""コンポーネント制約読み込み"""
# 実装省略
return {}
# 実用例
def design_sample_hybrid_architecture():
"""サンプルハイブリッドアーキテクチャ設計"""
designer = HybridArchitectureDesigner()
# 機能要件定義
functional_requirements = [
{
"name": "user_interface",
"responsibilities": ["ui_interaction", "real_time_updates", "offline_support"],
"data_sensitivity": "public",
"performance": "critical",
"data_access": ["user_profile", "dashboard_data"],
"dependencies": ["authentication_service", "api_gateway"]
},
{
"name": "authentication_service",
"responsibilities": ["user_authentication", "session_management", "token_validation"],
"data_sensitivity": "confidential",
"performance": "high",
"data_access": ["user_credentials", "session_data"],
"dependencies": ["user_database"]
},
{
"name": "payment_processing",
"responsibilities": ["payment_validation", "transaction_processing", "fraud_detection"],
"data_sensitivity": "restricted",
"performance": "high",
"data_access": ["payment_data", "transaction_history"],
"dependencies": ["external_payment_gateway", "fraud_detection_service"]
},
{
"name": "notification_service",
"responsibilities": ["email_notifications", "push_notifications", "sms_notifications"],
"data_sensitivity": "internal",
"performance": "medium",
"data_access": ["user_preferences", "notification_templates"],
"dependencies": ["email_service", "push_service"]
},
{
"name": "analytics_engine",
"responsibilities": ["data_aggregation", "report_generation", "trend_analysis"],
"data_sensitivity": "internal",
"performance": "low",
"data_access": ["event_data", "user_behavior"],
"dependencies": ["data_warehouse", "ml_service"]
}
]
# 非機能要件
non_functional_requirements = {
"availability": 99.9,
"scalability": "high",
"security_compliance": ["GDPR", "PCI_DSS"],
"performance_budget": "medium",
"team_size": 8,
"maintenance_preference": "automated"
}
# アーキテクチャ設計実行
architecture = designer.design_hybrid_architecture(
functional_requirements,
non_functional_requirements
)
print("ハイブリッドアーキテクチャ設計結果:")
print(json.dumps(architecture, indent=2, ensure_ascii=False))
return architecture
if __name__ == "__main__":
design_sample_hybrid_architecture()
トラブルシューティング
チーム開発・CI/CDの問題
問題1: 環境間でのデータベース不整合
症状:
- ローカル環境とステージング環境でテーブル構造が異なる
- デプロイ後にマイグレーションエラーが発生
- 開発者間でスキーマの競合
診断手順:
import subprocess
import json
from supabase import create_client
class EnvironmentSyncDiagnostics:
def __init__(self):
self.environments = {
'local': {'url': 'http://localhost:54321', 'key': 'local_anon_key'},
'staging': {'url': 'https://staging.supabase.co', 'key': 'staging_anon_key'},
'production': {'url': 'https://prod.supabase.co', 'key': 'prod_anon_key'}
}
async def diagnose_schema_differences(self):
"""スキーマ差分診断"""
schema_comparison = {}
for env_name, config in self.environments.items():
try:
client = create_client(config['url'], config['key'])
# テーブル構造取得
tables_query = """
SELECT
table_name,
column_name,
data_type,
is_nullable,
column_default
FROM information_schema.columns
WHERE table_schema = 'public'
ORDER BY table_name, ordinal_position;
"""
result = await client.rpc('execute_sql', {'query': tables_query}).execute()
schema_comparison[env_name] = result.data
except Exception as e:
schema_comparison[env_name] = {"error": str(e)}
# 差分分析
differences = self._analyze_schema_differences(schema_comparison)
return {
"environments": list(self.environments.keys()),
"schema_comparison": schema_comparison,
"differences": differences,
"recommendations": self._generate_sync_recommendations(differences)
}
def _analyze_schema_differences(self, schemas):
"""スキーマ差分分析"""
differences = []
if 'local' in schemas and 'staging' in schemas:
local_tables = {f"{row['table_name']}.{row['column_name']}"
for row in schemas['local'] if 'error' not in schemas['local']}
staging_tables = {f"{row['table_name']}.{row['column_name']}"
for row in schemas['staging'] if 'error' not in schemas['staging']}
# ローカルにのみ存在
local_only = local_tables - staging_tables
if local_only:
differences.append({
"type": "missing_in_staging",
"items": list(local_only)
})
# ステージングにのみ存在
staging_only = staging_tables - local_tables
if staging_only:
differences.append({
"type": "missing_in_local",
"items": list(staging_only)
})
return differences
def _generate_sync_recommendations(self, differences):
"""同期推奨事項生成"""
recommendations = []
for diff in differences:
if diff['type'] == 'missing_in_staging':
recommendations.append(
"ローカル→ステージングへのマイグレーション実行が必要"
)
elif diff['type'] == 'missing_in_local':
recommendations.append(
"ステージング→ローカルへのスキーマ同期が必要"
)
return recommendations
# 実行例
diagnostics = EnvironmentSyncDiagnostics()
result = await diagnostics.diagnose_schema_differences()
解決策:
class AutoMigrationManager:
def __init__(self, source_env, target_env):
self.source = source_env
self.target = target_env
async def generate_sync_migration(self):
"""同期用マイグレーション生成"""
# 1. 差分SQLの生成
diff_sql = await self._generate_diff_sql()
# 2. マイグレーションファイル作成
migration_file = f"sync_{self.source}_to_{self.target}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql"
with open(f"supabase/migrations/{migration_file}", 'w') as f:
f.write(diff_sql)
# 3. 検証スクリプト作成
validation_script = self._generate_validation_script()
return {
"migration_file": migration_file,
"validation_script": validation_script,
"next_steps": [
"1. マイグレーションファイルをレビュー",
"2. ステージング環境でテスト実行",
"3. 本番環境への適用"
]
}
async def _generate_diff_sql(self):
"""差分SQL生成"""
return """
-- スキーマ同期SQL
-- 注意: 本番環境での実行前に十分なテストを実施してください
BEGIN;
-- 新しいテーブル作成
CREATE TABLE IF NOT EXISTS new_feature_table (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 既存テーブルの修正
ALTER TABLE existing_table
ADD COLUMN IF NOT EXISTS new_column TEXT;
-- インデックス作成
CREATE INDEX IF NOT EXISTS idx_new_feature_name
ON new_feature_table(name);
COMMIT;
"""
問題2: テストの不安定性
症状:
- CI/CDでテストが間欠的に失敗
- ローカルでは成功するがCI環境で失敗
- テスト間でのデータ競合
診断・解決手法:
import pytest
import asyncio
from datetime import datetime
import uuid
class TestStabilityManager:
def __init__(self, supabase_client):
self.client = supabase_client
self.test_isolation_prefix = f"test_{uuid.uuid4().hex[:8]}_"
async def setup_isolated_test_environment(self):
"""分離されたテスト環境セットアップ"""
# 1. テスト専用スキーマ作成
test_schema = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
create_schema_sql = f"""
CREATE SCHEMA IF NOT EXISTS {test_schema};
SET search_path TO {test_schema}, public;
"""
await self.client.rpc('execute_sql', {'query': create_schema_sql}).execute()
# 2. テストデータの準備
test_data = await self._create_test_data()
return {
"test_schema": test_schema,
"test_data": test_data,
"cleanup_function": lambda: self._cleanup_test_environment(test_schema)
}
async def _create_test_data(self):
"""テストデータ作成"""
# 決定的なテストデータ(毎回同じ結果)
test_users = []
for i in range(5):
user_data = {
"id": f"{self.test_isolation_prefix}user_{i:03d}",
"email": f"test_user_{i}@example.com",
"name": f"Test User {i}",
"created_at": datetime.now().isoformat()
}
test_users.append(user_data)
# バッチ挿入でパフォーマンス向上
result = await self.client.table('users').insert(test_users).execute()
return {
"users": test_users,
"user_count": len(test_users)
}
async def _cleanup_test_environment(self, test_schema):
"""テスト環境クリーンアップ"""
cleanup_sql = f"""
DROP SCHEMA IF EXISTS {test_schema} CASCADE;
"""
await self.client.rpc('execute_sql', {'query': cleanup_sql}).execute()
# pytest設定例
@pytest.fixture
async def stable_test_env():
"""安定したテスト環境フィクスチャ"""
manager = TestStabilityManager(supabase_client)
env = await manager.setup_isolated_test_environment()
yield env
# クリーンアップ
await env['cleanup_function']()
# テストケース例
@pytest.mark.asyncio
async def test_user_creation_stable(stable_test_env):
"""安定したユーザー作成テスト"""
# テストデータを使用
test_data = stable_test_env['test_data']
# 決定的なアサーション
assert len(test_data['users']) == 5
assert all(user['email'].endswith('@example.com') for user in test_data['users'])
# 冪等性確認
result = await supabase.table('users').select('*').execute()
assert len(result.data) >= 5
問題3: デプロイメント失敗
症状:
- デプロイ途中でロールバック
- 環境変数の設定ミス
- マイグレーション適用失敗
自動デプロイメント検証:
class DeploymentValidator:
def __init__(self, target_environment):
self.env = target_environment
self.validation_results = []
async def validate_deployment(self):
"""デプロイメント検証実行"""
validations = [
self._validate_database_connectivity,
self._validate_environment_variables,
self._validate_api_endpoints,
self._validate_realtime_functionality,
self._validate_security_policies
]
for validation in validations:
try:
result = await validation()
self.validation_results.append(result)
except Exception as e:
self.validation_results.append({
"validation": validation.__name__,
"status": "failed",
"error": str(e)
})
return self._generate_deployment_report()
async def _validate_database_connectivity(self):
"""データベース接続検証"""
connection_tests = []
# 基本接続テスト
try:
result = await self.env['client'].table('_health').select('*').limit(1).execute()
connection_tests.append({"test": "basic_connection", "status": "passed"})
except Exception as e:
connection_tests.append({"test": "basic_connection", "status": "failed", "error": str(e)})
# 認証テスト
try:
auth_result = await self.env['client'].auth.get_user()
connection_tests.append({"test": "auth_service", "status": "passed"})
except Exception as e:
connection_tests.append({"test": "auth_service", "status": "failed", "error": str(e)})
return {
"validation": "database_connectivity",
"status": "passed" if all(t["status"] == "passed" for t in connection_tests) else "failed",
"details": connection_tests
}
async def _validate_environment_variables(self):
"""環境変数検証"""
required_vars = [
'SUPABASE_URL',
'SUPABASE_ANON_KEY',
'SUPABASE_SERVICE_ROLE_KEY',
'DATABASE_URL'
]
var_status = []
for var in required_vars:
value = os.getenv(var)
if value:
var_status.append({
"variable": var,
"status": "present",
"length": len(value)
})
else:
var_status.append({
"variable": var,
"status": "missing"
})
return {
"validation": "environment_variables",
"status": "passed" if all(v["status"] == "present" for v in var_status) else "failed",
"details": var_status
}
async def _validate_api_endpoints(self):
"""APIエンドポイント検証"""
endpoint_tests = []
# REST API テスト
try:
response = await self.env['client'].table('users').select('count').execute()
endpoint_tests.append({
"endpoint": "REST API",
"status": "passed",
"response_time": "< 1s"
})
except Exception as e:
endpoint_tests.append({
"endpoint": "REST API",
"status": "failed",
"error": str(e)
})
# Realtime テスト
try:
# リアルタイム接続テスト
subscription = self.env['client'].table('users').on('*', lambda x: None).subscribe()
endpoint_tests.append({
"endpoint": "Realtime",
"status": "passed"
})
subscription.unsubscribe()
except Exception as e:
endpoint_tests.append({
"endpoint": "Realtime",
"status": "failed",
"error": str(e)
})
return {
"validation": "api_endpoints",
"status": "passed" if all(t["status"] == "passed" for t in endpoint_tests) else "failed",
"details": endpoint_tests
}
def _generate_deployment_report(self):
"""デプロイメントレポート生成"""
passed_validations = sum(1 for v in self.validation_results if v["status"] == "passed")
total_validations = len(self.validation_results)
return {
"deployment_environment": self.env['name'],
"validation_timestamp": datetime.now().isoformat(),
"overall_status": "passed" if passed_validations == total_validations else "failed",
"validation_summary": {
"passed": passed_validations,
"failed": total_validations - passed_validations,
"total": total_validations
},
"detailed_results": self.validation_results,
"recommendations": self._generate_deployment_recommendations()
}
def _generate_deployment_recommendations(self):
"""デプロイメント推奨事項生成"""
recommendations = []
failed_validations = [v for v in self.validation_results if v["status"] == "failed"]
for validation in failed_validations:
if "database_connectivity" in validation["validation"]:
recommendations.append("データベース接続設定を確認してください")
elif "environment_variables" in validation["validation"]:
recommendations.append("環境変数の設定を確認してください")
elif "api_endpoints" in validation["validation"]:
recommendations.append("APIエンドポイントの設定を確認してください")
if not failed_validations:
recommendations.append("すべての検証に成功しました。デプロイメント完了です。")
return recommendations
開発プロセスの問題
問題4: コードレビューの品質問題
症状:
- セキュリティ問題の見落とし
- パフォーマンス問題の見逃し
- アーキテクチャガイドライン違反
自動コードレビューツール:
class AutomatedCodeReview:
def __init__(self, project_path):
self.project_path = project_path
self.review_rules = self._load_review_rules()
def _load_review_rules(self):
"""レビュールール定義"""
return {
"security": [
{
"rule": "no_hardcoded_secrets",
"pattern": r"(password|secret|key)\s*=\s*['\"][^'\"]+['\"]",
"severity": "critical"
},
{
"rule": "rls_policy_check",
"pattern": r"supabase\.table\([^)]+\)\.select\(",
"severity": "warning",
"check": "ensure_rls_enabled"
}
],
"performance": [
{
"rule": "avoid_n_plus_1",
"pattern": r"for\s+\w+\s+in\s+.*:\s*supabase\.table",
"severity": "warning"
},
{
"rule": "pagination_required",
"pattern": r"\.select\([^)]*\)\.execute\(\)",
"severity": "info",
"check": "suggest_pagination"
}
],
"architecture": [
{
"rule": "client_side_business_logic",
"pattern": r"class.*Manager|class.*Service",
"severity": "warning",
"context": "client_side_pattern"
}
]
}
def analyze_code_changes(self, file_changes):
"""コード変更分析"""
review_comments = []
for file_path, changes in file_changes.items():
file_comments = self._analyze_file(file_path, changes)
review_comments.extend(file_comments)
return {
"total_issues": len(review_comments),
"critical_issues": len([c for c in review_comments if c["severity"] == "critical"]),
"warnings": len([c for c in review_comments if c["severity"] == "warning"]),
"suggestions": len([c for c in review_comments if c["severity"] == "info"]),
"comments": review_comments,
"approval_recommendation": self._generate_approval_recommendation(review_comments)
}
def _analyze_file(self, file_path, changes):
"""ファイル分析"""
comments = []
for category, rules in self.review_rules.items():
for rule in rules:
matches = re.finditer(rule["pattern"], changes, re.MULTILINE)
for match in matches:
comment = {
"file": file_path,
"line": self._get_line_number(changes, match.start()),
"rule": rule["rule"],
"severity": rule["severity"],
"message": self._generate_message(rule, match),
"suggestion": self._generate_suggestion(rule)
}
comments.append(comment)
return comments
def _generate_approval_recommendation(self, comments):
"""承認推奨判定"""
critical_count = len([c for c in comments if c["severity"] == "critical"])
warning_count = len([c for c in comments if c["severity"] == "warning"])
if critical_count > 0:
return {
"recommendation": "reject",
"reason": f"{critical_count}個の重大な問題があります"
}
elif warning_count > 5:
return {
"recommendation": "request_changes",
"reason": f"{warning_count}個の警告があります。修正を検討してください"
}
else:
return {
"recommendation": "approve",
"reason": "重大な問題は検出されませんでした"
}
プロジェクト管理の問題
問題5: 技術債務の蓄積
症状:
- 開発速度の低下
- バグ修正時間の増加
- 新機能実装の困難さ
技術債務監視ダッシュボード:
class TechnicalDebtMonitor:
def __init__(self, project_metrics):
self.metrics = project_metrics
def analyze_technical_debt(self):
"""技術債務分析"""
debt_indicators = {
"code_complexity": self._analyze_code_complexity(),
"test_coverage": self._analyze_test_coverage(),
"performance_regression": self._analyze_performance_trends(),
"security_vulnerabilities": self._analyze_security_issues(),
"documentation_coverage": self._analyze_documentation()
}
debt_score = self._calculate_debt_score(debt_indicators)
return {
"overall_debt_score": debt_score,
"debt_level": self._categorize_debt_level(debt_score),
"indicators": debt_indicators,
"recommendations": self._generate_debt_reduction_plan(debt_indicators)
}
def _calculate_debt_score(self, indicators):
"""債務スコア計算"""
weights = {
"code_complexity": 0.25,
"test_coverage": 0.20,
"performance_regression": 0.20,
"security_vulnerabilities": 0.25,
"documentation_coverage": 0.10
}
weighted_score = sum(
indicators[indicator]["score"] * weights[indicator]
for indicator in indicators
)
return round(weighted_score, 2)
def _generate_debt_reduction_plan(self, indicators):
"""債務削減計画生成"""
plan = []
# 優先度順に並べ替え
sorted_indicators = sorted(
indicators.items(),
key=lambda x: x[1]["priority"],
reverse=True
)
for indicator_name, data in sorted_indicators:
if data["score"] < 7.0: # 7.0未満は改善対象
plan.append({
"area": indicator_name,
"current_score": data["score"],
"target_score": 8.0,
"actions": data.get("suggested_actions", []),
"estimated_effort": data.get("effort_estimate", "medium")
})
return plan
まとめ
Chapter 9では、実践的なアーキテクチャ選択演習を通じて、要件分析から最適なパターン選択までの体系的なアプローチを学習しました。
習得したスキル:
-
要件分析: 機能・非機能要件からの定量的評価
-
アーキテクチャ決定エンジン
-
要件駆動: 技術選択より要件を優先
-
段階的移行: 現行システムからのスムーズな移行
実践的フレームワーク:
- アーキテクチャ決定エンジン
- 移行実行管理システム
- ハイブリッド構成設計ツール
設計原則:
- 要件駆動: 技術選択より要件を優先
- 段階的移行: 現行システムからのスムーズな移行
📝 Chapter 9 学習まとめ
✅ 習得できたスキル
- ✅ 要件分析からのアーキテクチャパターン選択手法
- ✅ 定量的評価によるパターン比較・決定フレームワーク
- ✅ 実際のケーススタディによる実践的判断力
- ✅ 段階的移行・ハイブリッド構成の設計手法
🎯 アーキテクチャ決定の全体像
| 決定要素 | 重要度 | 判断基準 | 対応パターン | |:———|:——-|:———|:————-| | ユーザー規模 | ⭐⭐⭐ | 〜100人 / 〜10,000人 / 10,000人〜 | Client / Edge / API | | 機能複雑度 | ⭐⭐⭐ | CRUD / 処理 / エンタープライズ | Client / Edge / API | | チーム経験 | ⭐⭐ | 初心者 / 中級 / 上級 | 段階的選択 | | 予算・期間 | ⭐⭐ | 制約大 / 中程度 / 十分 | 最適化レベル選択 |
🔄 実践での応用
- ✅ 新規プロジェクトでの要件分析・アーキテクチャ選択
- ✅ 既存システムの移行計画・段階的実装
- ✅ プロジェクト途中でのアーキテクチャ変更判断
- ✅ チーム内でのアーキテクチャ議論・合意形成
🚀 最終章予告:トラブルシューティング
Chapter 10では、「緊急救命室の医師」レベルの問題解決力を身に着けます:
- 🚨 迅速な問題診断: 症状から根本原因の特定手法
- 🔧 即座の応急処置: システム停止を最小限に抑える緊急対応
- 💊 根本治療: 同じ問題を二度と起こさない恒久対策
- 📚 知識の体系化: 過去事例のナレッジベース構築
💡 実習目標: 「深夜2時のシステム障害でも、冷静に問題を解決できるエンジニア」
📍 ナビゲーション
- 📚 目次: 📖 学習ガイド
- ⬅️ 前の章: Chapter 8: 運用監視と自動化
- ➡️ 最終章: Chapter 10: トラブルシューティング
-
🏠 関連章: Chapter 3-5: アーキテクチャパターン 選択ガイド -
🔧 リソース: アーキテクチャ決定ツール 移行チェックリスト