第6章 OAuth 2.0
なぜこの章が重要か
現代のWebにおいて、OAuth 2.0は認可の事実上の標準プロトコルです。GitHubでログイン、Googleドライブへのアクセス許可、TwitterへのツイートBot - これらすべてがOAuth 2.0によって実現されています。しかし、その柔軟性ゆえに誤った実装も多く、セキュリティインシデントの原因となっています。この章では、OAuth 2.0が解決する問題、正しい実装方法、そして進化し続けるセキュリティ対策を学びます。
6.1 OAuth 2.0の設計思想 - なぜOAuthが生まれたのか
6.1.1 パスワードアンチパターンの問題
OAuth登場以前、サードパーティアプリケーションがユーザーのリソースにアクセスする方法は問題だらけでした。
class PreOAuthProblems:
"""OAuth以前の認可の問題"""
def password_antipattern(self):
"""パスワードアンチパターンの実例"""
# 2007年頃の典型的な実装
bad_practice = {
'scenario': '''
TwitterクライアントアプリがTwitter APIを使う場合:
1. アプリ:「Twitterのユーザー名とパスワードを入力してください」
2. ユーザー:自分のパスワードをサードパーティアプリに渡す
3. アプリ:受け取ったパスワードを保存(!)
4. アプリ:ユーザーのパスワードでTwitter APIにアクセス
''',
'problems': [
{
'issue': 'パスワードの漏洩リスク',
'impact': 'アプリのDBが侵害されるとユーザーのパスワードが流出',
'example': '2009年、某Twitter管理ツールから10万件のパスワード流出'
},
{
'issue': '過剰な権限',
'impact': 'パスワードを持つ = すべての操作が可能',
'example': 'ツイート投稿だけしたいのにDM閲覧も可能に'
},
{
'issue': 'アクセス取り消しの困難さ',
'impact': 'パスワード変更しないとアクセスを止められない',
'example': '使わなくなったアプリが永続的にアクセス可能'
},
{
'issue': '信頼の連鎖の破綻',
'impact': 'ユーザーはすべてのアプリを信頼する必要',
'example': '悪意あるアプリがパスワードを悪用'
}
]
}
return bad_practice
6.1.2 OAuthが解決する問題の本質
class OAuthPhilosophy:
"""OAuth 2.0の設計哲学"""
def core_principles(self):
"""OAuth 2.0の中核原則"""
return {
'delegation_not_impersonation': {
'concept': '委任であって、なりすましではない',
'meaning': '''
# 従来:なりすまし
app.login_as_user(username, password) # アプリがユーザーになりすます
# OAuth:委任
user.grant_permission_to(app, scope=['read_tweets']) # ユーザーが権限を委任
''',
'benefit': 'ユーザーが主体的に権限をコントロール'
},
'separation_of_concerns': {
'concept': '関心の分離',
'roles': {
'resource_owner': 'リソースの所有者(エンドユーザー)',
'client': 'リソースにアクセスしたいアプリケーション',
'authorization_server': '認可を管理するサーバー',
'resource_server': 'リソースを提供するAPIサーバー'
},
'benefit': '各コンポーネントが単一の責任を持つ'
},
'limited_scope': {
'concept': '権限の最小化',
'example': '''
# スコープによる権限制限
scopes = {
'read:profile': 'プロフィール情報の読み取り',
'write:tweets': 'ツイートの投稿',
'read:dm': 'ダイレクトメッセージの読み取り'
}
# アプリは必要最小限のスコープのみ要求
requested_scopes = ['read:profile', 'write:tweets']
''',
'benefit': '過剰な権限付与を防ぐ'
},
'revocability': {
'concept': 'いつでも取り消し可能',
'implementation': '''
# ユーザーはいつでもアクセスを取り消せる
def revoke_access(user_id: str, client_id: str):
tokens = Token.query.filter_by(
user_id=user_id,
client_id=client_id
).all()
for token in tokens:
token.revoked = True
db.session.commit()
''',
'benefit': 'ユーザーが自分のデータをコントロール'
}
}
6.1.3 なぜOAuth 2.0なのか(1.0との違い)
class OAuth2Evolution:
"""OAuth 1.0から2.0への進化"""
def why_oauth2(self):
"""なぜOAuth 2.0が必要だったのか"""
oauth1_problems = {
'signature_complexity': {
'issue': '署名の計算が複雑',
'oauth1_example': '''
# OAuth 1.0の署名計算
def create_oauth1_signature(method, url, params, consumer_secret, token_secret):
# 1. パラメータの正規化
normalized_params = normalize_parameters(params)
# 2. ベース文字列の作成
base_string = f"{method}&{percent_encode(url)}&{percent_encode(normalized_params)}"
# 3. 署名キーの作成
signing_key = f"{percent_encode(consumer_secret)}&{percent_encode(token_secret)}"
# 4. HMAC-SHA1で署名
signature = hmac.new(
signing_key.encode(),
base_string.encode(),
hashlib.sha1
).digest()
return base64.b64encode(signature).decode()
''',
'developer_impact': '実装ミスが頻発、デバッグが困難'
},
'limited_client_types': {
'issue': 'Webアプリケーション中心の設計',
'limitations': [
'モバイルアプリでの実装が困難',
'JavaScriptアプリ(SPA)での使用が非現実的',
'IoTデバイスでの実装が複雑'
]
},
'performance_overhead': {
'issue': '毎リクエストでの署名計算',
'impact': 'API呼び出しのオーバーヘッド増大'
}
}
oauth2_improvements = {
'simplicity': {
'change': 'Bearer Tokenによるシンプルな認可',
'example': '''
# OAuth 2.0のシンプルなAPI呼び出し
headers = {
'Authorization': f'Bearer {access_token}'
}
response = requests.get('https://api.example.com/user', headers=headers)
''',
'benefit': '実装が容易、エラーが少ない'
},
'flexibility': {
'change': '複数のグラントタイプ',
'types': {
'authorization_code': 'Webアプリ向け',
'implicit': 'SPAア向け(現在は非推奨)',
'client_credentials': 'サーバー間通信',
'resource_owner_password': 'レガシー対応'
},
'benefit': '様々なユースケースに対応'
},
'extensibility': {
'change': '拡張可能な仕様',
'extensions': [
'PKCE(Proof Key for Code Exchange)',
'Device Authorization Grant',
'Token Introspection',
'Token Revocation'
],
'benefit': '進化する脅威への対応が可能'
}
}
return {
'oauth1_problems': oauth1_problems,
'oauth2_improvements': oauth2_improvements,
'migration_impact': 'OAuth 2.0は後方互換性を捨てて、より良い設計を選択'
}
6.2 各種グラントタイプの使い分け - ユースケースに応じた選択
6.2.1 Authorization Code Grant - 最も安全な標準フロー
class AuthorizationCodeGrant:
"""認可コードグラントの実装"""
def __init__(self):
self.flow_explanation = self._explain_flow()
self.implementation = self._implement_flow()
def _explain_flow(self):
"""なぜ認可コードグラントが安全なのか"""
return {
'security_features': {
'code_exchange': {
'reason': 'アクセストークンがブラウザを経由しない',
'benefit': 'ブラウザ履歴やリファラーでの漏洩を防ぐ'
},
'client_authentication': {
'reason': 'トークンエンドポイントでクライアント認証',
'benefit': '認可コードを盗んでも、client_secretなしでは使えない'
},
'one_time_code': {
'reason': '認可コードは一度だけ使用可能',
'benefit': 'リプレイ攻撃を防ぐ'
},
'short_lived_code': {
'reason': '認可コードの有効期限は短い(通常10分)',
'benefit': '攻撃の時間窓を最小化'
}
},
'flow_diagram': '''
User Browser Client App Auth Server Resource Server
│ │ │ │ │
│ 1. Access App │ │ │
├───────────────────────────▶│ │ │
│ │ │ │ │
│ │ 2. Redirect to Auth │ │
│ │◀────────────────┤ │ │
│ │ │ │ │
│ │ 3. Authorization Request │ │
│ ├────────────────────────────────▶│ │
│ │ │ │ │
│ 4. Login & Consent │ │ │
│◀───────────┼────────────────────────────────┤ │
│ │ │ │ │
│ 5. Approve │ │ │ │
├────────────┼───────────────────────────────▶│ │
│ │ │ │ │
│ │ 6. Redirect with Code │ │
│ │◀────────────────────────────────┤ │
│ │ │ │ │
│ │ 7. Code │ │ │
│ ├────────────────▶│ │ │
│ │ │ │ │
│ │ │ 8. Exchange Code for Token │
│ │ ├──────────────▶│ │
│ │ │ │ │
│ │ │ 9. Access Token │
│ │ │◀──────────────┤ │
│ │ │ │ │
│ │ │ 10. API Request │
│ │ ├─────────────────────────────▶│
│ │ │ │ │
│ │ │ 11. Protected Resource │
│ │ │◀─────────────────────────────┤
'''
}
def _implement_flow(self):
"""認可コードフローの実装"""
from flask import Flask, request, redirect, session
import secrets
import requests
from urllib.parse import urlencode
class OAuthClient:
def __init__(self, client_id: str, client_secret: str,
auth_endpoint: str, token_endpoint: str):
self.client_id = client_id
self.client_secret = client_secret
self.auth_endpoint = auth_endpoint
self.token_endpoint = token_endpoint
def create_authorization_url(self, redirect_uri: str,
scope: List[str],
state: Optional[str] = None) -> str:
"""認可URLの作成"""
# CSRF対策のstate生成
if not state:
state = secrets.token_urlsafe(32)
session['oauth_state'] = state
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': redirect_uri,
'scope': ' '.join(scope),
'state': state
}
return f"{self.auth_endpoint}?{urlencode(params)}"
def exchange_code_for_token(self, code: str,
redirect_uri: str,
state: str) -> Dict:
"""認可コードをアクセストークンに交換"""
# State検証(CSRF対策)
if state != session.get('oauth_state'):
raise ValueError("Invalid state parameter")
# Stateを削除(再利用防止)
session.pop('oauth_state', None)
# トークンリクエスト
token_data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': redirect_uri,
'client_id': self.client_id,
'client_secret': self.client_secret
}
response = requests.post(
self.token_endpoint,
data=token_data,
headers={'Accept': 'application/json'}
)
if response.status_code != 200:
raise Exception(f"Token exchange failed: {response.text}")
tokens = response.json()
# トークンの検証
self._validate_tokens(tokens)
return tokens
def _validate_tokens(self, tokens: Dict):
"""取得したトークンの基本検証"""
required_fields = ['access_token', 'token_type']
for field in required_fields:
if field not in tokens:
raise ValueError(f"Missing required field: {field}")
if tokens['token_type'].lower() != 'bearer':
raise ValueError(f"Unsupported token type: {tokens['token_type']}")
# トークンの有効期限確認
if 'expires_in' in tokens and tokens['expires_in'] <= 0:
raise ValueError("Token already expired")
return OAuthClient
6.2.2 Client Credentials Grant - サービス間認証
class ClientCredentialsGrant:
"""クライアントクレデンシャルグラントの実装"""
def explain_use_case(self):
"""いつClient Credentialsを使うべきか"""
return {
'appropriate_scenarios': [
{
'scenario': 'マイクロサービス間通信',
'example': 'OrderServiceがUserServiceのAPIを呼ぶ',
'why': 'エンドユーザーが関与しない'
},
{
'scenario': 'バッチ処理',
'example': '夜間バッチがAPIを使ってデータ同期',
'why': 'ユーザーコンテキストが不要'
},
{
'scenario': 'システム管理タスク',
'example': '監視システムがメトリクスAPIにアクセス',
'why': 'システムレベルの操作'
}
],
'inappropriate_scenarios': [
{
'scenario': 'ユーザー固有のリソースアクセス',
'why': 'ユーザーの認可が必要',
'correct_grant': 'Authorization Code'
},
{
'scenario': 'モバイルアプリからの直接アクセス',
'why': 'クライアントシークレットを安全に保存できない',
'correct_grant': 'Authorization Code + PKCE'
}
]
}
def implement_grant(self):
"""Client Credentials実装"""
class ServiceAuthClient:
def __init__(self, client_id: str, client_secret: str,
token_endpoint: str):
self.client_id = client_id
self.client_secret = client_secret
self.token_endpoint = token_endpoint
self.token_cache = {}
async def get_access_token(self, scope: Optional[List[str]] = None) -> str:
"""アクセストークンの取得(キャッシュ付き)"""
cache_key = f"{self.client_id}:{':'.join(scope or [])}"
# キャッシュチェック
cached_token = self._get_cached_token(cache_key)
if cached_token:
return cached_token
# 新規取得
token_data = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret
}
if scope:
token_data['scope'] = ' '.join(scope)
response = await self._make_token_request(token_data)
# キャッシュに保存
self._cache_token(cache_key, response)
return response['access_token']
def _get_cached_token(self, cache_key: str) -> Optional[str]:
"""キャッシュからトークンを取得"""
if cache_key not in self.token_cache:
return None
cached = self.token_cache[cache_key]
# 有効期限チェック(5分のバッファ)
if time.time() < cached['expires_at'] - 300:
return cached['access_token']
# 期限切れの場合は削除
del self.token_cache[cache_key]
return None
def _cache_token(self, cache_key: str, token_response: Dict):
"""トークンをキャッシュに保存"""
expires_in = token_response.get('expires_in', 3600)
self.token_cache[cache_key] = {
'access_token': token_response['access_token'],
'expires_at': time.time() + expires_in
}
async def make_authenticated_request(self,
url: str,
method: str = 'GET',
**kwargs) -> requests.Response:
"""認証付きHTTPリクエスト"""
token = await self.get_access_token()
headers = kwargs.get('headers', {})
headers['Authorization'] = f'Bearer {token}'
kwargs['headers'] = headers
response = requests.request(method, url, **kwargs)
# 401の場合はトークンをリフレッシュして再試行
if response.status_code == 401:
# キャッシュクリア
self.token_cache.clear()
# 新しいトークンで再試行
token = await self.get_access_token()
headers['Authorization'] = f'Bearer {token}'
response = requests.request(method, url, **kwargs)
return response
return ServiceAuthClient
6.2.3 Refresh Token Grant - トークンの更新
class RefreshTokenGrant:
"""リフレッシュトークングラントの実装"""
def explain_refresh_token_design(self):
"""なぜリフレッシュトークンが必要か"""
return {
'design_rationale': {
'security_vs_usability': {
'problem': 'アクセストークンの有効期限のジレンマ',
'short_lived': {
'pros': 'セキュア(漏洩時の影響が限定的)',
'cons': '頻繁な再認証が必要'
},
'long_lived': {
'pros': 'ユーザビリティが高い',
'cons': '漏洩時の影響が大きい'
},
'solution': 'リフレッシュトークンによる両立'
},
'token_characteristics': {
'access_token': {
'lifetime': '15分〜1時間',
'usage': '頻繁(API呼び出しごと)',
'storage': 'メモリ推奨',
'scope': 'APIアクセス'
},
'refresh_token': {
'lifetime': '30日〜90日',
'usage': 'まれ(アクセストークン更新時のみ)',
'storage': 'セキュアストレージ',
'scope': '新しいアクセストークンの取得のみ'
}
}
}
}
def implement_refresh_flow(self):
"""リフレッシュフローの実装"""
class TokenManager:
def __init__(self, client_id: str, client_secret: str,
token_endpoint: str):
self.client_id = client_id
self.client_secret = client_secret
self.token_endpoint = token_endpoint
self.token_store = SecureTokenStore()
async def refresh_access_token(self, refresh_token: str) -> Dict:
"""リフレッシュトークンを使用して新しいアクセストークンを取得"""
refresh_data = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'client_id': self.client_id,
'client_secret': self.client_secret
}
try:
response = await self._make_token_request(refresh_data)
# 新しいトークンの保存
await self._store_tokens(response)
# リフレッシュトークンローテーション対応
if 'refresh_token' in response:
# 新しいリフレッシュトークンが発行された場合
await self._rotate_refresh_token(
old_token=refresh_token,
new_token=response['refresh_token']
)
return response
except TokenExpiredError:
# リフレッシュトークン自体が期限切れ
raise ReAuthenticationRequired()
except InvalidTokenError:
# リフレッシュトークンが無効(取り消されたなど)
raise ReAuthenticationRequired()
async def _rotate_refresh_token(self, old_token: str, new_token: str):
"""リフレッシュトークンのローテーション処理"""
# 古いトークンを無効化
await self.token_store.revoke_token(old_token)
# 新しいトークンを保存
await self.token_store.store_refresh_token(new_token)
# セキュリティログ
logging.info(f"Refresh token rotated for client {self.client_id}")
def implement_automatic_refresh(self):
"""自動リフレッシュの実装"""
class AutoRefreshClient:
def __init__(self, token_manager: TokenManager):
self.token_manager = token_manager
self.access_token = None
self.token_expiry = None
self.refresh_token = None
self.refresh_lock = asyncio.Lock()
async def make_request(self, url: str, **kwargs) -> Response:
"""自動リフレッシュ機能付きリクエスト"""
# トークンの有効性チェック
if await self._should_refresh():
await self._refresh_tokens()
# リクエスト実行
headers = kwargs.get('headers', {})
headers['Authorization'] = f'Bearer {self.access_token}'
kwargs['headers'] = headers
response = await aiohttp.request('GET', url, **kwargs)
# 401エラーの場合は再度リフレッシュ
if response.status == 401:
await self._refresh_tokens()
# 再試行
headers['Authorization'] = f'Bearer {self.access_token}'
response = await aiohttp.request('GET', url, **kwargs)
return response
async def _should_refresh(self) -> bool:
"""リフレッシュが必要かチェック"""
if not self.access_token:
return True
# 有効期限の5分前にリフレッシュ
buffer_time = 300
return time.time() >= (self.token_expiry - buffer_time)
async def _refresh_tokens(self):
"""トークンのリフレッシュ(重複防止付き)"""
async with self.refresh_lock:
# 別のコルーチンが既にリフレッシュしている場合
if not await self._should_refresh():
return
tokens = await self.token_manager.refresh_access_token(
self.refresh_token
)
self.access_token = tokens['access_token']
self.token_expiry = time.time() + tokens['expires_in']
if 'refresh_token' in tokens:
self.refresh_token = tokens['refresh_token']
return AutoRefreshClient
return TokenManager
6.2.4 最新のグラントタイプ
class ModernGrantTypes:
"""最新のOAuth 2.0グラントタイプ"""
def device_authorization_grant(self):
"""Device Authorization Grant (RFC 8628)"""
return {
'use_case': 'キーボード入力が困難なデバイス',
'examples': ['スマートTV', 'ゲーム機', 'IoTデバイス'],
'flow': '''
1. デバイス → 認可サーバー: デバイスコードをリクエスト
2. 認可サーバー → デバイス: device_code, user_code, verification_uri
3. デバイス → ユーザー: "https://example.com/device で ABCD-1234 を入力"
4. ユーザー → ブラウザ: コード入力と認可
5. デバイス → 認可サーバー: ポーリング(device_codeでトークン確認)
6. 認可サーバー → デバイス: アクセストークン
''',
'implementation': '''
class DeviceAuthFlow:
async def initiate_device_flow(self, client_id: str) -> Dict:
"""デバイスフローの開始"""
response = await self.http_client.post(
'https://auth.example.com/device/code',
data={
'client_id': client_id,
'scope': 'read write'
}
)
return {
'device_code': response['device_code'],
'user_code': response['user_code'],
'verification_uri': response['verification_uri'],
'expires_in': response['expires_in'],
'interval': response.get('interval', 5)
}
async def poll_for_token(self, device_code: str,
interval: int = 5) -> Dict:
"""トークンのポーリング"""
while True:
try:
response = await self.http_client.post(
'https://auth.example.com/token',
data={
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
'device_code': device_code,
'client_id': self.client_id
}
)
if response.status_code == 200:
return response.json()
elif response.json()['error'] == 'authorization_pending':
await asyncio.sleep(interval)
elif response.json()['error'] == 'slow_down':
interval += 5
await asyncio.sleep(interval)
else:
raise Exception(response.json()['error'])
except Exception as e:
raise DeviceAuthError(f"Polling failed: {e}")
'''
}
6.3 セキュリティ考慮事項(PKCE等)- 脆弱性の歴史と対策の進化
6.3.1 Authorization Code Injection Attack
class AuthCodeInjectionAttack:
"""認可コードインジェクション攻撃と対策"""
def explain_vulnerability(self):
"""脆弱性の説明"""
return {
'attack_scenario': '''
1. 攻撃者が正規のOAuthフローを開始
2. 認可サーバーから認可コードを取得(自分のコード)
3. 被害者に対して、攻撃者の認可コードを含むリダイレクトURLを送る
4. 被害者がそのURLをクリック
5. アプリが攻撃者の認可コードを使ってトークンを取得
6. 被害者が攻撃者のアカウントでログインしてしまう
''',
'impact': [
'被害者が攻撃者のアカウントで操作してしまう',
'被害者の情報が攻撃者のアカウントに保存される',
'CSRF攻撃の一種として悪用可能'
],
'traditional_mitigation': {
'state_parameter': '''
# stateパラメータによる対策
def create_auth_url():
state = secrets.token_urlsafe(32)
session['oauth_state'] = state
return f"{auth_url}?client_id={client_id}&state={state}"
def handle_callback(code, state):
if state != session.get('oauth_state'):
raise SecurityError("Invalid state")
''',
'limitation': 'stateは主にCSRF対策であり、code injection対策としては不完全'
}
}
def implement_pkce(self):
"""PKCE (RFC 7636) の実装"""
import hashlib
import base64
import secrets
class PKCEClient:
"""Proof Key for Code Exchange実装"""
def generate_pkce_pair(self) -> Tuple[str, str]:
"""PKCE用のcode_verifierとcode_challengeを生成"""
# Code Verifier: 43-128文字のランダム文字列
code_verifier = base64.urlsafe_b64encode(
secrets.token_bytes(32)
).decode('utf-8').rstrip('=')
# Code Challenge: VerifierのSHA256ハッシュ
challenge_bytes = hashlib.sha256(
code_verifier.encode('utf-8')
).digest()
code_challenge = base64.urlsafe_b64encode(
challenge_bytes
).decode('utf-8').rstrip('=')
return code_verifier, code_challenge
def create_authorization_url_with_pkce(self,
redirect_uri: str,
scope: List[str]) -> Tuple[str, str]:
"""PKCE対応の認可URL作成"""
# PKCEペアの生成
code_verifier, code_challenge = self.generate_pkce_pair()
# セッションに保存(後で使用)
session['pkce_verifier'] = code_verifier
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': redirect_uri,
'scope': ' '.join(scope),
'state': secrets.token_urlsafe(32),
# PKCE パラメータ
'code_challenge': code_challenge,
'code_challenge_method': 'S256'
}
auth_url = f"{self.auth_endpoint}?{urlencode(params)}"
return auth_url, code_verifier
def exchange_code_with_pkce(self,
code: str,
redirect_uri: str) -> Dict:
"""PKCE検証付きでコードをトークンに交換"""
# セッションからverifierを取得
code_verifier = session.pop('pkce_verifier', None)
if not code_verifier:
raise SecurityError("PKCE verifier not found")
token_data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': redirect_uri,
'client_id': self.client_id,
# PKCEパラメータ
'code_verifier': code_verifier
}
# パブリッククライアントの場合はclient_secretなし
if self.client_secret:
token_data['client_secret'] = self.client_secret
response = requests.post(self.token_endpoint, data=token_data)
if response.status_code != 200:
raise OAuthError(f"Token exchange failed: {response.text}")
return response.json()
def explain_pkce_security(self):
"""PKCEがなぜ安全なのか"""
return {
'attack_prevention': '''
1. 攻撃者が認可コードを盗んでも...
2. code_verifierを知らないため、トークン交換できない
3. code_challengeからcode_verifierを逆算することは不可能(SHA256)
''',
'flow_comparison': {
'without_pkce': '''
App → AS: code=ABCD
AS → App: access_token(攻撃者も同じcodeで取得可能)
''',
'with_pkce': '''
App → AS: code=ABCD, code_verifier=SECRET123
AS: SHA256(SECRET123) == stored_challenge?
AS → App: access_token(verifierなしでは取得不可)
'''
}
}
return PKCEClient
6.3.2 その他の主要な脆弱性と対策
class OAuthSecurityVulnerabilities:
"""OAuth 2.0の脆弱性と対策"""
def open_redirect_vulnerability(self):
"""オープンリダイレクト脆弱性"""
return {
'vulnerability': {
'description': 'redirect_uriの検証不足による任意のサイトへのリダイレクト',
'attack': '''
# 攻撃例
https://auth.example.com/authorize?
client_id=abc123&
redirect_uri=https://evil.com& # 悪意のあるサイト
response_type=code
''',
'impact': '認可コードやトークンの窃取'
},
'mitigation': '''
class RedirectURIValidator:
def __init__(self):
# 事前登録されたredirect_uri
self.registered_uris = {
'client_abc123': [
'https://app.example.com/callback',
'https://app.example.com/oauth/callback'
]
}
def validate_redirect_uri(self, client_id: str,
redirect_uri: str) -> bool:
"""redirect_uriの厳密な検証"""
registered = self.registered_uris.get(client_id, [])
# 完全一致のみ許可(部分一致は危険)
if redirect_uri not in registered:
return False
# プロトコルのダウングレード防止
parsed = urlparse(redirect_uri)
if parsed.scheme != 'https':
# localhost開発環境のみ例外
if not (parsed.hostname == 'localhost' and
parsed.port in [3000, 8080]):
return False
return True
'''
}
def token_leakage_in_referrer(self):
"""リファラーによるトークン漏洩"""
return {
'vulnerability': {
'description': 'URLフラグメントのトークンがリファラーで漏洩',
'scenario': 'Implicitフロー使用時の問題',
'example': '''
# ブラウザのアドレスバー
https://app.example.com/#access_token=SECRET_TOKEN
# このページから外部リンクをクリックすると...
Referer: https://app.example.com/#access_token=SECRET_TOKEN
'''
},
'mitigation': [
{
'method': 'Implicitフローの廃止',
'recommendation': 'Authorization Code + PKCEを使用'
},
{
'method': 'Referrer-Policyの設定',
'implementation': '''
# HTMLメタタグ
<meta name="referrer" content="no-referrer">
# HTTPヘッダー
Referrer-Policy: no-referrer
'''
},
{
'method': 'トークンの即座の処理',
'implementation': '''
// フラグメントからトークンを抽出して削除
if (window.location.hash) {
const params = new URLSearchParams(
window.location.hash.substring(1)
);
const token = params.get('access_token');
// トークンを安全な場所に保存
tokenManager.store(token);
// URLからトークンを削除
window.history.replaceState(
{},
document.title,
window.location.pathname
);
}
'''
}
]
}
def mix_up_attack(self):
"""Mix-Up攻撃"""
return {
'vulnerability': {
'description': '複数のASを使う場合の混乱攻撃',
'attack_flow': '''
1. クライアントがAS1に認可リクエスト
2. 攻撃者(AS2)が、AS1のclient_idでレスポンス
3. クライアントが誤ってAS2にトークンリクエスト
4. 攻撃者がトークンを取得
'''
},
'mitigation': {
'issuer_identification': '''
# Authorization Response にissuerを含める
def create_auth_response(code: str, state: str) -> str:
params = {
'code': code,
'state': state,
'iss': 'https://auth.example.com' # 発行者の明示
}
return f"{redirect_uri}?{urlencode(params)}"
# クライアント側での検証
def validate_auth_response(params: Dict, expected_issuer: str):
if params.get('iss') != expected_issuer:
raise SecurityError("Issuer mismatch")
'''
}
}
6.3.3 セキュリティベストプラクティス集
class OAuthSecurityBestPractices:
"""OAuth 2.0セキュリティのベストプラクティス"""
def comprehensive_security_checklist(self):
"""包括的なセキュリティチェックリスト"""
return {
'client_authentication': {
'public_clients': [
'PKCEを必須とする',
'client_secretを使用しない',
'redirect_uriの厳密な検証'
],
'confidential_clients': [
'client_secretの安全な管理',
'mTLSの検討',
'client assertion (JWT) の使用'
]
},
'token_handling': {
'storage': [
'アクセストークンはメモリに保持',
'リフレッシュトークンは暗号化して保存',
'トークンのスコープを最小限に'
],
'transmission': [
'HTTPSの必須化',
'Authorizationヘッダーの使用',
'URLパラメータでのトークン送信禁止'
],
'validation': [
'トークンの署名検証',
'有効期限の確認',
'audience (aud) の検証'
]
},
'implementation_security': '''
class SecureOAuthImplementation:
def __init__(self):
self.security_config = {
'token_lifetime': 900, # 15分
'refresh_token_lifetime': 2592000, # 30日
'auth_code_lifetime': 600, # 10分
'pkce_required': True,
'state_required': True
}
def validate_client_request(self, request: Request) -> bool:
"""クライアントリクエストの包括的検証"""
# HTTPS必須
if not request.is_secure:
raise SecurityError("HTTPS required")
# リクエストの完全性チェック
if self._detect_parameter_pollution(request):
raise SecurityError("Parameter pollution detected")
# レート制限
if not self._check_rate_limit(request.client_id):
raise RateLimitError("Too many requests")
return True
def _detect_parameter_pollution(self, request: Request) -> bool:
"""HTTPパラメータ汚染の検出"""
for key in request.args:
if len(request.args.getlist(key)) > 1:
# 同じパラメータが複数回出現
return True
return False
'''
}
6.4 実装時の落とし穴 - よくある実装ミスとその影響
6.4.1 State Parameter の誤用
class StatePitfalls:
"""Stateパラメータの実装ミス"""
def common_mistakes(self):
"""よくある間違いとその影響"""
return [
{
'mistake': '予測可能なstate値',
'bad_example': '''
# ❌ 悪い例:予測可能
state = str(int(time.time()))
state = f"oauth_state_{user_id}"
state = hashlib.md5(session_id.encode()).hexdigest()
''',
'good_example': '''
# ✅ 良い例:暗号学的に安全な乱数
import secrets
state = secrets.token_urlsafe(32)
''',
'impact': 'CSRF攻撃が可能になる'
},
{
'mistake': 'stateの再利用',
'bad_example': '''
# ❌ 悪い例:stateを使い回し
class OAuthClient:
def __init__(self):
self.state = "my_oauth_state" # 固定値
def create_auth_url(self):
return f"{auth_url}?state={self.state}"
''',
'good_example': '''
# ✅ 良い例:毎回新しいstateを生成
def create_auth_url():
state = secrets.token_urlsafe(32)
session['oauth_state'] = state
session['oauth_state_timestamp'] = time.time()
return f"{auth_url}?state={state}"
''',
'impact': 'リプレイ攻撃の可能性'
},
{
'mistake': 'stateの有効期限なし',
'bad_example': '''
# ❌ 悪い例:無期限に有効
def verify_state(state):
return state == session.get('oauth_state')
''',
'good_example': '''
# ✅ 良い例:有効期限付き
def verify_state(state):
stored_state = session.get('oauth_state')
timestamp = session.get('oauth_state_timestamp', 0)
# 10分以内のみ有効
if time.time() - timestamp > 600:
return False
# 一度だけ使用可能
if state == stored_state:
session.pop('oauth_state', None)
session.pop('oauth_state_timestamp', None)
return True
return False
'''
}
]
6.4.2 トークンの取り扱いミス
class TokenHandlingMistakes:
"""トークン取り扱いの実装ミス"""
def insecure_token_storage(self):
"""安全でないトークン保存"""
return {
'localStorage_abuse': {
'bad_example': '''
// ❌ 悪い例:LocalStorageに生のトークン
fetch('/oauth/callback?code=' + code)
.then(res => res.json())
.then(data => {
localStorage.setItem('access_token', data.access_token);
localStorage.setItem('refresh_token', data.refresh_token);
});
''',
'problems': [
'XSS攻撃で簡単に盗まれる',
'ブラウザ拡張からアクセス可能',
'デバッグツールで誰でも見える'
],
'good_example': '''
// ✅ 良い例:メモリ内管理 + HttpOnly Cookie
class TokenManager {
constructor() {
this.accessToken = null;
this.expiresAt = null;
}
setAccessToken(token, expiresIn) {
this.accessToken = token;
this.expiresAt = Date.now() + (expiresIn * 1000);
// リフレッシュトークンはHttpOnly Cookieで
// サーバー側で設定
}
getAccessToken() {
if (Date.now() >= this.expiresAt) {
return this.refreshAccessToken();
}
return this.accessToken;
}
}
'''
},
'token_in_url': {
'bad_example': '''
# ❌ 悪い例:URLにトークン
@app.route('/api/data')
def get_data():
token = request.args.get('access_token') # URLパラメータ
return fetch_user_data(token)
# 呼び出し
GET /api/data?access_token=SECRET_TOKEN
''',
'problems': [
'サーバーログに記録される',
'ブラウザ履歴に残る',
'リファラーで漏洩',
'プロキシログに記録'
],
'good_example': '''
# ✅ 良い例:Authorizationヘッダー
@app.route('/api/data')
def get_data():
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return {'error': 'Unauthorized'}, 401
token = auth_header.split(' ')[1]
return fetch_user_data(token)
# 呼び出し
GET /api/data
Authorization: Bearer SECRET_TOKEN
'''
}
}
def improper_token_validation(self):
"""不適切なトークン検証"""
return {
'no_expiration_check': {
'bad_example': '''
# ❌ 悪い例:有効期限チェックなし
def validate_token(token):
try:
payload = jwt.decode(token, key, options={"verify_exp": False})
return payload
except:
return None
''',
'good_example': '''
# ✅ 良い例:適切な検証
def validate_token(token):
try:
# 有効期限も含めて検証
payload = jwt.decode(
token,
key,
algorithms=['RS256'],
options={"verify_exp": True}
)
# 追加の検証
if 'aud' in payload and payload['aud'] != expected_audience:
raise InvalidAudienceError()
if 'iss' in payload and payload['iss'] != expected_issuer:
raise InvalidIssuerError()
return payload
except jwt.ExpiredSignatureError:
logging.warning("Token expired")
raise
except jwt.InvalidTokenError as e:
logging.error(f"Invalid token: {e}")
raise
'''
}
}
6.4.3 エラーハンドリングの落とし穴
class ErrorHandlingPitfalls:
"""エラーハンドリングの問題"""
def information_disclosure(self):
"""情報漏洩につながるエラー処理"""
return {
'detailed_error_messages': {
'bad_example': '''
# ❌ 悪い例:詳細すぎるエラー情報
@app.route('/oauth/token', methods=['POST'])
def token_endpoint():
try:
client_id = request.form['client_id']
client_secret = request.form['client_secret']
client = Client.query.filter_by(id=client_id).first()
if not client:
return {
'error': 'invalid_client',
'error_description': f'Client {client_id} not found in database'
}, 401
if client.secret != client_secret:
return {
'error': 'invalid_client',
'error_description': 'Client secret mismatch. Expected: ' + client.secret[:4] + '...'
}, 401
except Exception as e:
return {
'error': 'server_error',
'error_description': str(e),
'stack_trace': traceback.format_exc() # 絶対ダメ!
}, 500
''',
'good_example': '''
# ✅ 良い例:最小限のエラー情報
@app.route('/oauth/token', methods=['POST'])
def token_endpoint():
try:
# クライアント認証
if not authenticate_client(request):
# 詳細を隠す
return {
'error': 'invalid_client'
}, 401
# トークン処理
return process_token_request(request)
except InvalidGrantError:
return {'error': 'invalid_grant'}, 400
except Exception as e:
# 内部エラーはログに記録
app.logger.error(f"Token endpoint error: {e}", exc_info=True)
# クライアントには最小限の情報
return {'error': 'server_error'}, 500
'''
},
'timing_attacks': {
'bad_example': '''
# ❌ 悪い例:タイミング攻撃に脆弱
def verify_client_secret(client_id, provided_secret):
client = get_client(client_id)
if not client:
return False # すぐに返る
# 文字列比較(タイミングが異なる)
return client.secret == provided_secret
''',
'good_example': '''
# ✅ 良い例:定数時間比較
import hmac
def verify_client_secret(client_id, provided_secret):
client = get_client(client_id)
if not client:
# ダミーの比較を実行
expected = "dummy_secret_for_timing_protection"
else:
expected = client.secret
# 定数時間比較
return hmac.compare_digest(expected, provided_secret)
'''
}
}
6.4.4 設定ミスと実装の不整合
class ConfigurationMistakes:
"""設定ミスと実装の問題"""
def common_configuration_issues(self):
"""よくある設定ミス"""
return {
'development_settings_in_production': {
'bad_example': '''
# ❌ 悪い例:開発設定が本番に
class OAuthConfig:
# デバッグモードが有効
DEBUG = True
# HTTPSチェックが無効
REQUIRE_HTTPS = False
# すべてのredirect_uriを許可
ALLOW_ANY_REDIRECT_URI = True
# トークンの有効期限が長すぎる
ACCESS_TOKEN_LIFETIME = 86400 # 24時間
''',
'good_example': '''
# ✅ 良い例:環境別設定
class OAuthConfig:
def __init__(self, env='production'):
self.env = env
if env == 'production':
self.DEBUG = False
self.REQUIRE_HTTPS = True
self.ALLOW_ANY_REDIRECT_URI = False
self.ACCESS_TOKEN_LIFETIME = 900 # 15分
self.LOG_LEVEL = 'WARNING'
else:
# 開発環境
self.DEBUG = True
self.REQUIRE_HTTPS = False
self.ALLOW_ANY_REDIRECT_URI = False # それでも制限
self.ACCESS_TOKEN_LIFETIME = 3600
self.LOG_LEVEL = 'DEBUG'
'''
},
'grant_type_misconfiguration': {
'scenario': '不要なグラントタイプの有効化',
'bad_example': '''
# ❌ 悪い例:すべてのグラントタイプを有効化
ENABLED_GRANT_TYPES = [
'authorization_code',
'implicit', # 非推奨
'password', # 危険
'client_credentials',
'refresh_token'
]
''',
'good_example': '''
# ✅ 良い例:必要最小限のグラントタイプ
def get_allowed_grant_types(client):
if client.type == 'public':
# パブリッククライアントは限定的
return ['authorization_code'] # + PKCE必須
elif client.type == 'confidential':
return ['authorization_code', 'refresh_token']
elif client.type == 'service':
return ['client_credentials']
else:
return []
'''
}
}
まとめ
この章では、OAuth 2.0の設計思想から実装の詳細まで学びました:
- OAuth 2.0の設計思想
- パスワードアンチパターンの問題
- 委任による権限管理
- OAuth 1.0からの進化
- グラントタイプの使い分け
- Authorization Code:最も安全な標準フロー
- Client Credentials:サービス間認証
- Refresh Token:トークンの更新
- 最新のグラントタイプ
- セキュリティ考慮事項
- PKCEによるcode injection対策
- 各種脆弱性と対策
- セキュリティベストプラクティス
- 実装時の落とし穴
- Stateパラメータの誤用
- トークンの取り扱いミス
- エラーハンドリングの問題
- 設定ミスと実装の不整合
次章では、OpenID ConnectとSAMLについて、エンタープライズ環境でのSSO実現方法を学びます。
演習問題
問題1:OAuth 2.0クライアントの実装
以下の要件を満たすOAuth 2.0クライアントを実装しなさい:
- Authorization Code Grant with PKCE
- 自動的なトークンリフレッシュ
- 適切なエラーハンドリング
- セキュアなトークン保存
問題2:脆弱性の発見と修正
提供されたOAuth 2.0実装コードから脆弱性を見つけ、修正案を提示しなさい。
問題3:グラントタイプの選択
以下のシナリオに対して、適切なグラントタイプを選択し、理由を説明しなさい:
- モバイルアプリからのAPI利用
- 定期バッチ処理
- シングルページアプリケーション
- IoTデバイスの認証
問題4:PKCEの実装
PKCEを使用したAuthorization Code Grantの完全な実装を作成しなさい。サーバー側とクライアント側の両方を含むこと。
問題5:セキュリティ監査
既存のOAuth 2.0実装に対してセキュリティ監査を実施し、以下を報告しなさい:
- 発見された脆弱性
- リスク評価
- 修正優先度
- 実装改善案