第6章 演習問題解答
問題1:OAuth 2.0クライアントの実装
解答
import secrets
import hashlib
import base64
import time
import asyncio
import aiohttp
from typing import Dict, Optional, List, Tuple
from urllib.parse import urlencode, urlparse, parse_qs
import jwt
from cryptography.fernet import Fernet
class SecureOAuth2Client:
"""
セキュアなOAuth 2.0クライアント実装
- Authorization Code Grant with PKCE
- 自動トークンリフレッシュ
- セキュアなトークン保存
"""
def __init__(self, client_id: str, client_secret: Optional[str] = None,
auth_endpoint: str = None, token_endpoint: str = None,
is_public_client: bool = False):
self.client_id = client_id
self.client_secret = client_secret
self.auth_endpoint = auth_endpoint
self.token_endpoint = token_endpoint
self.is_public_client = is_public_client
# トークン暗号化用のキー
self.encryption_key = Fernet.generate_key()
self.fernet = Fernet(self.encryption_key)
# メモリ内トークン管理
self._access_token = None
self._token_expiry = None
self._refresh_token_encrypted = None
# リフレッシュ用のロック
self._refresh_lock = asyncio.Lock()
# セッション管理
self._sessions = {}
def generate_pkce_pair(self) -> Tuple[str, str]:
"""PKCE用のcode_verifierとcode_challengeを生成"""
# Code Verifier: 43-128文字のランダム文字列
code_verifier = base64.urlsafe_b64encode(
secrets.token_bytes(32)
).decode('utf-8').rstrip('=')
# Code Challenge: S256 method
challenge_bytes = hashlib.sha256(
code_verifier.encode('utf-8')
).digest()
code_challenge = base64.urlsafe_b64encode(
challenge_bytes
).decode('utf-8').rstrip('=')
return code_verifier, code_challenge
def create_authorization_url(self, redirect_uri: str,
scope: List[str],
**kwargs) -> Tuple[str, str]:
"""認可URLの生成(PKCE対応)"""
# State生成(CSRF対策)
state = secrets.token_urlsafe(32)
# PKCE生成
code_verifier, code_challenge = self.generate_pkce_pair()
# セッションに保存
session_id = secrets.token_urlsafe(16)
self._sessions[session_id] = {
'state': state,
'code_verifier': code_verifier,
'redirect_uri': redirect_uri,
'created_at': time.time()
}
# パラメータ構築
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': redirect_uri,
'scope': ' '.join(scope),
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256'
}
# 追加パラメータ
params.update(kwargs)
auth_url = f"{self.auth_endpoint}?{urlencode(params)}"
return auth_url, session_id
async def exchange_code_for_token(self, authorization_response: str,
session_id: str) -> Dict:
"""認可コードをトークンに交換"""
# URLパース
parsed = urlparse(authorization_response)
params = parse_qs(parsed.query)
# エラーチェック
if 'error' in params:
error = params['error'][0]
error_description = params.get('error_description', [''])[0]
raise OAuthError(f"Authorization failed: {error} - {error_description}")
# 必須パラメータチェック
if 'code' not in params or 'state' not in params:
raise OAuthError("Missing required parameters")
code = params['code'][0]
state = params['state'][0]
# セッション取得と検証
session = self._sessions.get(session_id)
if not session:
raise SecurityError("Session not found")
# セッション有効期限チェック(10分)
if time.time() - session['created_at'] > 600:
del self._sessions[session_id]
raise SecurityError("Session expired")
# State検証
if state != session['state']:
raise SecurityError("State mismatch - possible CSRF attack")
# トークンリクエスト
token_data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': session['redirect_uri'],
'code_verifier': session['code_verifier']
}
# コンフィデンシャルクライアントの場合
if not self.is_public_client and self.client_secret:
token_data['client_id'] = self.client_id
token_data['client_secret'] = self.client_secret
else:
token_data['client_id'] = self.client_id
# セッション削除(一度だけ使用)
del self._sessions[session_id]
# トークンエンドポイントへのリクエスト
async with aiohttp.ClientSession() as session:
async with session.post(
self.token_endpoint,
data=token_data,
headers={'Accept': 'application/json'}
) as response:
if response.status != 200:
error_data = await response.json()
raise OAuthError(f"Token exchange failed: {error_data}")
tokens = await response.json()
# トークンの保存
await self._store_tokens(tokens)
return tokens
async def _store_tokens(self, tokens: Dict):
"""トークンの安全な保存"""
# アクセストークンはメモリに保持
self._access_token = tokens['access_token']
# 有効期限の計算
expires_in = tokens.get('expires_in', 3600)
self._token_expiry = time.time() + expires_in
# リフレッシュトークンは暗号化して保存
if 'refresh_token' in tokens:
refresh_bytes = tokens['refresh_token'].encode()
self._refresh_token_encrypted = self.fernet.encrypt(refresh_bytes)
# 自動リフレッシュのスケジューリング
if expires_in > 300: # 5分以上の有効期限
refresh_time = expires_in - 300 # 5分前にリフレッシュ
asyncio.create_task(self._schedule_refresh(refresh_time))
async def _schedule_refresh(self, delay: float):
"""自動リフレッシュのスケジューリング"""
await asyncio.sleep(delay)
try:
await self.refresh_access_token()
except Exception as e:
print(f"Auto-refresh failed: {e}")
async def get_access_token(self) -> str:
"""有効なアクセストークンの取得"""
# トークンの有効性チェック
if self._should_refresh():
await self.refresh_access_token()
if not self._access_token:
raise AuthenticationRequired("No valid access token")
return self._access_token
def _should_refresh(self) -> bool:
"""リフレッシュが必要かチェック"""
if not self._access_token or not self._token_expiry:
return True
# 1分のバッファを持ってチェック
return time.time() >= (self._token_expiry - 60)
async def refresh_access_token(self):
"""アクセストークンのリフレッシュ"""
# 重複リフレッシュ防止
async with self._refresh_lock:
# 再度チェック(別のコルーチンがリフレッシュ済みかも)
if not self._should_refresh():
return
if not self._refresh_token_encrypted:
raise AuthenticationRequired("No refresh token available")
# リフレッシュトークンの復号
refresh_token = self.fernet.decrypt(
self._refresh_token_encrypted
).decode()
# リフレッシュリクエスト
refresh_data = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token
}
if not self.is_public_client and self.client_secret:
refresh_data['client_id'] = self.client_id
refresh_data['client_secret'] = self.client_secret
else:
refresh_data['client_id'] = self.client_id
async with aiohttp.ClientSession() as session:
async with session.post(
self.token_endpoint,
data=refresh_data,
headers={'Accept': 'application/json'}
) as response:
if response.status == 401:
# リフレッシュトークンが無効
self._clear_tokens()
raise AuthenticationRequired("Refresh token invalid")
if response.status != 200:
error_data = await response.json()
raise OAuthError(f"Token refresh failed: {error_data}")
tokens = await response.json()
# 新しいトークンの保存
await self._store_tokens(tokens)
async def make_authenticated_request(self, url: str, method: str = 'GET',
**kwargs) -> aiohttp.ClientResponse:
"""認証付きHTTPリクエスト"""
# トークン取得
access_token = await self.get_access_token()
# ヘッダー設定
headers = kwargs.get('headers', {})
headers['Authorization'] = f'Bearer {access_token}'
kwargs['headers'] = headers
async with aiohttp.ClientSession() as session:
async with session.request(method, url, **kwargs) as response:
# 401の場合は一度だけリトライ
if response.status == 401:
await self.refresh_access_token()
access_token = await self.get_access_token()
headers['Authorization'] = f'Bearer {access_token}'
# リトライ
async with session.request(method, url, **kwargs) as retry_response:
return retry_response
return response
def _clear_tokens(self):
"""トークンのクリア"""
self._access_token = None
self._token_expiry = None
self._refresh_token_encrypted = None
def logout(self):
"""ログアウト処理"""
self._clear_tokens()
self._sessions.clear()
# カスタム例外
class OAuthError(Exception):
pass
class SecurityError(Exception):
pass
class AuthenticationRequired(Exception):
pass
# 使用例
async def main():
# クライアント初期化
client = SecureOAuth2Client(
client_id='your-client-id',
client_secret=None, # パブリッククライアント
auth_endpoint='https://auth.example.com/authorize',
token_endpoint='https://auth.example.com/token',
is_public_client=True
)
# 認可URLの生成
auth_url, session_id = client.create_authorization_url(
redirect_uri='https://app.example.com/callback',
scope=['read', 'write']
)
print(f"Visit: {auth_url}")
# コールバック処理(実際はWebフレームワークで処理)
callback_url = input("Enter callback URL: ")
# トークン取得
tokens = await client.exchange_code_for_token(callback_url, session_id)
print(f"Access token obtained: {tokens['access_token'][:20]}...")
# API呼び出し
response = await client.make_authenticated_request(
'https://api.example.com/user/profile'
)
data = await response.json()
print(f"User data: {data}")
if __name__ == "__main__":
asyncio.run(main())
実装のポイント
- PKCE実装:S256メソッドでcode_challengeを生成
- 自動リフレッシュ:有効期限の5分前に自動更新
- セキュアなトークン保存:
- アクセストークン:メモリのみ
- リフレッシュトークン:暗号化して保存
- エラーハンドリング:
- 適切な例外クラス
- 401エラーでの自動リトライ
- セッションタイムアウト
問題2:脆弱性の発見と修正
提供されたコード(脆弱性あり)
# 脆弱なOAuth実装
class VulnerableOAuthClient:
def __init__(self):
self.client_id = "my-client-id"
self.client_secret = "my-secret-123" # ハードコード
def create_auth_url(self, redirect_uri):
# state なし
return f"https://auth.example.com/authorize?client_id={self.client_id}&redirect_uri={redirect_uri}&response_type=code"
def handle_callback(self, request):
code = request.args.get('code')
# HTTPでトークンリクエスト
response = requests.post('http://auth.example.com/token', data={
'grant_type': 'authorization_code',
'code': code,
'client_id': self.client_id,
'client_secret': self.client_secret
})
tokens = response.json()
# LocalStorageに保存
return f"""
<script>
localStorage.setItem('access_token', '{tokens['access_token']}');
localStorage.setItem('refresh_token', '{tokens['refresh_token']}');
</script>
"""
def call_api(self, token):
# URLにトークン
return requests.get(f"https://api.example.com/data?access_token={token}")
脆弱性と修正案
class SecureOAuthClient:
"""脆弱性を修正したOAuth実装"""
def __init__(self):
# 修正1: 環境変数から読み込み
self.client_id = os.environ.get('OAUTH_CLIENT_ID')
self.client_secret = os.environ.get('OAUTH_CLIENT_SECRET')
if not self.client_id:
raise ValueError("OAUTH_CLIENT_ID not configured")
def create_auth_url(self, redirect_uri, session):
# 修正2: CSRF対策のstate追加
state = secrets.token_urlsafe(32)
session['oauth_state'] = state
session['oauth_timestamp'] = time.time()
# 修正3: PKCE追加
verifier, challenge = self.generate_pkce_pair()
session['pkce_verifier'] = verifier
# 修正4: redirect_uriの検証
if not self._validate_redirect_uri(redirect_uri):
raise ValueError("Invalid redirect URI")
params = {
'client_id': self.client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
'state': state,
'code_challenge': challenge,
'code_challenge_method': 'S256'
}
return f"https://auth.example.com/authorize?{urlencode(params)}"
def handle_callback(self, request, session):
code = request.args.get('code')
state = request.args.get('state')
# 修正5: state検証
if not state or state != session.get('oauth_state'):
raise SecurityError("Invalid state - possible CSRF attack")
# 修正6: stateの有効期限チェック
if time.time() - session.get('oauth_timestamp', 0) > 600:
raise SecurityError("State expired")
# セッションから削除
session.pop('oauth_state', None)
verifier = session.pop('pkce_verifier', None)
# 修正7: HTTPSを使用
response = requests.post('https://auth.example.com/token',
data={
'grant_type': 'authorization_code',
'code': code,
'client_id': self.client_id,
'client_secret': self.client_secret,
'code_verifier': verifier
},
# 修正8: タイムアウト設定
timeout=10
)
if response.status_code != 200:
# 修正9: 詳細なエラー情報を隠す
app.logger.error(f"Token exchange failed: {response.text}")
raise OAuthError("Token exchange failed")
tokens = response.json()
# 修正10: セキュアなトークン保存
# リフレッシュトークンはHttpOnly Cookieに
resp = make_response(redirect('/dashboard'))
resp.set_cookie(
'refresh_token',
value=tokens['refresh_token'],
httponly=True,
secure=True,
samesite='Lax',
max_age=2592000 # 30日
)
# アクセストークンはセッションに(サーバー側)
session['access_token'] = tokens['access_token']
session['token_expiry'] = time.time() + tokens.get('expires_in', 3600)
return resp
def call_api(self, session):
# 修正11: Authorizationヘッダーを使用
token = session.get('access_token')
if not token:
raise AuthenticationRequired()
# 修正12: 有効期限チェック
if time.time() >= session.get('token_expiry', 0):
# リフレッシュが必要
self.refresh_token(session)
token = session['access_token']
response = requests.get(
"https://api.example.com/data",
headers={
'Authorization': f'Bearer {token}',
'X-Request-ID': str(uuid.uuid4()) # トレーサビリティ
},
timeout=10
)
return response
def _validate_redirect_uri(self, uri):
"""redirect_uriの検証"""
allowed_uris = [
'https://app.example.com/callback',
'https://localhost:3000/callback' # 開発環境
]
return uri in allowed_uris
発見された脆弱性一覧
- ハードコードされた認証情報
- 影響:ソースコード漏洩時に認証情報も漏洩
- 修正:環境変数使用
- CSRF対策の欠如
- 影響:認可コードインジェクション攻撃
- 修正:stateパラメータの実装
- PKCEの未実装
- 影響:認可コード横取り攻撃
- 修正:PKCEの実装
- HTTPの使用
- 影響:中間者攻撃によるトークン窃取
- 修正:HTTPSの強制
- トークンのLocalStorage保存
- 影響:XSS攻撃によるトークン窃取
- 修正:HttpOnly Cookieとサーバーセッション
- URLでのトークン送信
- 影響:ログやリファラーでの漏洩
- 修正:Authorizationヘッダー使用
- redirect_uriの検証不足
- 影響:オープンリダイレクト
- 修正:ホワイトリスト検証
- エラー情報の詳細開示
- 影響:攻撃者への情報提供
- 修正:最小限のエラー情報
問題3:グラントタイプの選択
解答
1. モバイルアプリからのAPI利用
選択:Authorization Code Grant + PKCE
理由:
- モバイルアプリはパブリッククライアント(client_secretを安全に保存できない)
- PKCEにより認可コード横取り攻撃を防止
- ユーザーコンテキストが必要
- リフレッシュトークンによる長期アクセス可能
実装例:
// iOS実装例
class OAuthManager {
func authenticate() {
// PKCE生成
let verifier = generateCodeVerifier()
let challenge = generateCodeChallenge(verifier)
// 認可URLを構築してSafariViewControllerで開く
let authURL = buildAuthURL(
codeChallenge: challenge,
redirectURI: "myapp://callback"
)
// カスタムURLスキームでコールバック処理
}
}
2. 定期バッチ処理
選択:Client Credentials Grant
理由:
- ユーザーコンテキスト不要(システム間通信)
- サーバー環境でclient_secretを安全に管理可能
- 非対話的な処理に適している
- シンプルなフロー
実装例:
class BatchProcessor:
async def get_system_token(self):
"""システム用トークンの取得"""
response = await self.http_client.post(
'https://auth.example.com/token',
data={
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret,
'scope': 'batch:process'
}
)
return response.json()['access_token']
3. シングルページアプリケーション(SPA)
選択:Authorization Code Grant + PKCE(Implicit Grantは非推奨)
理由:
- Implicit Grantはトークンがブラウザ履歴に残るリスク
- Authorization Code + PKCEでセキュリティ向上
- BFF(Backend for Frontend)パターンも検討
- トークンはメモリ内管理
実装例:
class SPAAuthManager {
async authenticate() {
// PKCE対応
const codeVerifier = this.generateCodeVerifier();
const codeChallenge = await this.generateCodeChallenge(codeVerifier);
// セッションストレージに一時保存
sessionStorage.setItem('pkce_verifier', codeVerifier);
// 認可エンドポイントへリダイレクト
window.location.href = this.buildAuthURL({
response_type: 'code',
code_challenge: codeChallenge,
code_challenge_method: 'S256'
});
}
async handleCallback() {
const code = new URLSearchParams(window.location.search).get('code');
const verifier = sessionStorage.getItem('pkce_verifier');
// トークン取得
const tokens = await this.exchangeCode(code, verifier);
// メモリに保存(LocalStorageは使わない)
this.tokenManager.setTokens(tokens);
}
}
4. IoTデバイスの認証
選択:Device Authorization Grant (RFC 8628)
理由:
- キーボード入力が困難または不可能
- 画面が小さいまたは存在しない
- QRコードや短いコードで認証可能
- ユーザーは別デバイス(スマホ等)で認証
実装例:
class IoTDeviceAuth:
async def start_device_flow(self):
"""デバイスフローの開始"""
response = await self.http_client.post(
'https://auth.example.com/device/code',
data={
'client_id': self.device_id,
'scope': 'device:control'
}
)
device_code = response.json()
# ユーザーに表示
print(f"Please visit: {device_code['verification_uri']}")
print(f"Enter code: {device_code['user_code']}")
# ポーリング開始
return await self.poll_for_token(device_code['device_code'])
選択基準のまとめ
シナリオ | グラントタイプ | 主な理由 |
---|---|---|
モバイルアプリ | Authorization Code + PKCE | パブリッククライアント、ユーザーコンテキスト必要 |
バッチ処理 | Client Credentials | システム間通信、ユーザー不在 |
SPA | Authorization Code + PKCE | Implicitは非推奨、セキュリティ重視 |
IoTデバイス | Device Authorization | 入力制限、別デバイスでの認証 |
問題4:PKCEの実装
解答
サーバー側実装
from flask import Flask, request, jsonify, session
import secrets
import hashlib
import base64
import time
import redis
import jwt
app = Flask(__name__)
redis_client = redis.Redis()
class PKCEAuthorizationServer:
"""PKCE対応の認可サーバー実装"""
def __init__(self):
self.redis = redis_client
self.signing_key = "your-signing-key" # 実際は安全に管理
@app.route('/authorize', methods=['GET'])
def authorize(self):
"""認可エンドポイント"""
# 必須パラメータの検証
required_params = ['client_id', 'redirect_uri', 'response_type',
'code_challenge', 'code_challenge_method']
for param in required_params:
if param not in request.args:
return jsonify({'error': 'invalid_request',
'error_description': f'Missing {param}'}), 400
client_id = request.args.get('client_id')
redirect_uri = request.args.get('redirect_uri')
code_challenge = request.args.get('code_challenge')
code_challenge_method = request.args.get('code_challenge_method')
# クライアント検証
if not self._validate_client(client_id, redirect_uri):
return jsonify({'error': 'invalid_client'}), 401
# PKCE検証
if code_challenge_method != 'S256':
return jsonify({'error': 'invalid_request',
'error_description': 'Only S256 supported'}), 400
# code_challengeの形式検証(Base64URL)
if not self._is_valid_base64url(code_challenge):
return jsonify({'error': 'invalid_request',
'error_description': 'Invalid code_challenge'}), 400
# ユーザー認証(実際の実装では認証画面を表示)
user_id = self._authenticate_user()
if not user_id:
return jsonify({'error': 'access_denied'}), 403
# 認可コード生成
auth_code = secrets.token_urlsafe(32)
# 認可コード情報を保存(10分間有効)
auth_code_data = {
'client_id': client_id,
'user_id': user_id,
'redirect_uri': redirect_uri,
'code_challenge': code_challenge,
'code_challenge_method': code_challenge_method,
'scope': request.args.get('scope', ''),
'created_at': time.time()
}
self.redis.setex(
f'auth_code:{auth_code}',
600, # 10分
json.dumps(auth_code_data)
)
# リダイレクト
state = request.args.get('state', '')
redirect_params = {
'code': auth_code,
'state': state
}
redirect_url = f"{redirect_uri}?{urlencode(redirect_params)}"
return redirect(redirect_url)
@app.route('/token', methods=['POST'])
def token(self):
"""トークンエンドポイント"""
grant_type = request.form.get('grant_type')
if grant_type != 'authorization_code':
return jsonify({'error': 'unsupported_grant_type'}), 400
# 必須パラメータ
code = request.form.get('code')
code_verifier = request.form.get('code_verifier')
client_id = request.form.get('client_id')
if not all([code, code_verifier, client_id]):
return jsonify({'error': 'invalid_request'}), 400
# 認可コード情報の取得
auth_code_key = f'auth_code:{code}'
auth_code_data = self.redis.get(auth_code_key)
if not auth_code_data:
return jsonify({'error': 'invalid_grant',
'error_description': 'Invalid or expired code'}), 400
auth_code_info = json.loads(auth_code_data)
# 認可コードの削除(一度だけ使用可能)
self.redis.delete(auth_code_key)
# クライアントID検証
if auth_code_info['client_id'] != client_id:
return jsonify({'error': 'invalid_client'}), 401
# redirect_uri検証(提供された場合)
redirect_uri = request.form.get('redirect_uri')
if redirect_uri and redirect_uri != auth_code_info['redirect_uri']:
return jsonify({'error': 'invalid_grant',
'error_description': 'redirect_uri mismatch'}), 400
# PKCE検証
if not self._verify_pkce(code_verifier, auth_code_info['code_challenge']):
return jsonify({'error': 'invalid_grant',
'error_description': 'PKCE verification failed'}), 400
# トークン生成
access_token = self._generate_access_token(
auth_code_info['user_id'],
auth_code_info['client_id'],
auth_code_info['scope']
)
refresh_token = self._generate_refresh_token(
auth_code_info['user_id'],
auth_code_info['client_id']
)
# レスポンス
return jsonify({
'access_token': access_token,
'token_type': 'Bearer',
'expires_in': 3600,
'refresh_token': refresh_token,
'scope': auth_code_info['scope']
})
def _verify_pkce(self, verifier: str, challenge: str) -> bool:
"""PKCE検証"""
# S256: BASE64URL(SHA256(verifier)) == challenge
verifier_bytes = verifier.encode('utf-8')
calculated_challenge = base64.urlsafe_b64encode(
hashlib.sha256(verifier_bytes).digest()
).decode('utf-8').rstrip('=')
return calculated_challenge == challenge
def _is_valid_base64url(self, value: str) -> bool:
"""Base64URL形式の検証"""
# Base64URL文字のみ
import re
return bool(re.match(r'^[A-Za-z0-9_-]+$', value))
def _generate_access_token(self, user_id: str, client_id: str,
scope: str) -> str:
"""アクセストークン生成"""
payload = {
'sub': user_id,
'client_id': client_id,
'scope': scope,
'iat': int(time.time()),
'exp': int(time.time() + 3600)
}
return jwt.encode(payload, self.signing_key, algorithm='HS256')
# サーバー初期化
auth_server = PKCEAuthorizationServer()
クライアント側実装
class PKCEClient:
"""PKCE対応のOAuth 2.0クライアント"""
def __init__(self, client_id: str, auth_endpoint: str,
token_endpoint: str, redirect_uri: str):
self.client_id = client_id
self.auth_endpoint = auth_endpoint
self.token_endpoint = token_endpoint
self.redirect_uri = redirect_uri
def generate_code_verifier(self) -> str:
"""Code Verifierの生成(43-128文字)"""
# 32バイト = 256ビット、Base64URLエンコードで約43文字
random_bytes = secrets.token_bytes(32)
code_verifier = base64.urlsafe_b64encode(random_bytes).decode('utf-8')
return code_verifier.rstrip('=')
def generate_code_challenge(self, verifier: str) -> str:
"""Code Challengeの生成(S256)"""
# SHA256ハッシュ
challenge_bytes = hashlib.sha256(verifier.encode('utf-8')).digest()
# Base64URLエンコード
code_challenge = base64.urlsafe_b64encode(challenge_bytes).decode('utf-8')
return code_challenge.rstrip('=')
def create_authorization_url(self, scope: List[str],
state: Optional[str] = None) -> Dict[str, str]:
"""認可URLの生成"""
# PKCE生成
code_verifier = self.generate_code_verifier()
code_challenge = self.generate_code_challenge(code_verifier)
# State生成(CSRF対策)
if not state:
state = secrets.token_urlsafe(32)
# パラメータ構築
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'scope': ' '.join(scope),
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256'
}
auth_url = f"{self.auth_endpoint}?{urlencode(params)}"
return {
'url': auth_url,
'state': state,
'code_verifier': code_verifier
}
async def exchange_code_for_token(self, code: str, state: str,
stored_state: str,
code_verifier: str) -> Dict:
"""認可コードをトークンに交換"""
# State検証
if state != stored_state:
raise SecurityError("State mismatch - possible CSRF attack")
# トークンリクエスト
token_data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.redirect_uri,
'client_id': self.client_id,
'code_verifier': code_verifier
}
async with aiohttp.ClientSession() as session:
async with session.post(
self.token_endpoint,
data=token_data,
headers={'Accept': 'application/json'}
) as response:
if response.status != 200:
error_data = await response.json()
raise OAuthError(f"Token exchange failed: {error_data}")
return await response.json()
def verify_pkce_implementation(self):
"""PKCE実装の検証テスト"""
# 正常なケース
verifier = self.generate_code_verifier()
challenge = self.generate_code_challenge(verifier)
# 検証
recalculated = self.generate_code_challenge(verifier)
assert challenge == recalculated, "PKCE calculation mismatch"
# 異なるverifierでは一致しない
different_verifier = self.generate_code_verifier()
different_challenge = self.generate_code_challenge(different_verifier)
assert challenge != different_challenge, "Different verifiers produced same challenge"
print("PKCE implementation verified successfully")
# 使用例
async def main():
client = PKCEClient(
client_id='my-spa-app',
auth_endpoint='https://auth.example.com/authorize',
token_endpoint='https://auth.example.com/token',
redirect_uri='https://app.example.com/callback'
)
# 認可フロー開始
auth_data = client.create_authorization_url(['read', 'write'])
print(f"Visit: {auth_data['url']}")
# ブラウザで認可後、コールバックを処理
# callback?code=AUTH_CODE&state=STATE
# トークン交換
tokens = await client.exchange_code_for_token(
code='AUTH_CODE',
state='STATE',
stored_state=auth_data['state'],
code_verifier=auth_data['code_verifier']
)
print(f"Access token: {tokens['access_token']}")
実装のポイント
- Code Verifierの生成:暗号学的に安全な乱数を使用
- Code Challengeの計算:S256メソッド(SHA256)を使用
- 一度だけ使用可能:認可コードは使用後即座に削除
- タイミング攻撃対策:PKCE検証で定数時間比較
問題5:セキュリティ監査
既存のOAuth 2.0実装の監査結果
class OAuthSecurityAudit:
"""OAuth 2.0実装のセキュリティ監査"""
def __init__(self):
self.vulnerabilities = []
self.risk_scores = {}
def audit_implementation(self, codebase):
"""包括的なセキュリティ監査"""
# 1. 設定の監査
self._audit_configuration()
# 2. 実装の監査
self._audit_implementation()
# 3. 運用の監査
self._audit_operations()
return self.generate_report()
def _audit_configuration(self):
"""設定面の監査"""
findings = [
{
'id': 'CONF-001',
'title': 'Implicit Grant有効化',
'severity': 'HIGH',
'description': 'Implicit Grantが有効になっている',
'evidence': '''
ALLOWED_GRANT_TYPES = [
'authorization_code',
'implicit', # セキュリティリスク
'client_credentials'
]
''',
'risk': 'トークンがブラウザ履歴に残る、リファラー漏洩',
'recommendation': 'Implicit Grantを無効化し、Authorization Code + PKCEを使用'
},
{
'id': 'CONF-002',
'title': 'アクセストークンの有効期限が長い',
'severity': 'MEDIUM',
'description': '24時間の有効期限は長すぎる',
'evidence': 'ACCESS_TOKEN_LIFETIME = 86400 # 24時間',
'risk': 'トークン漏洩時の影響期間が長い',
'recommendation': '15分〜1時間に短縮'
},
{
'id': 'CONF-003',
'title': 'HTTPS強制なし',
'severity': 'CRITICAL',
'description': 'HTTP通信が許可されている',
'evidence': 'app.config["REQUIRE_HTTPS"] = False',
'risk': '中間者攻撃によるトークン窃取',
'recommendation': 'すべてのエンドポイントでHTTPS必須化'
}
]
self.vulnerabilities.extend(findings)
def _audit_implementation(self):
"""実装面の監査"""
findings = [
{
'id': 'IMPL-001',
'title': 'redirect_uriの検証不足',
'severity': 'HIGH',
'description': '部分一致での検証は危険',
'evidence': '''
def validate_redirect_uri(uri, registered_uri):
return uri.startswith(registered_uri) # 危険!
''',
'risk': 'オープンリダイレクト攻撃',
'recommendation': '''
def validate_redirect_uri(uri, registered_uris):
return uri in registered_uris # 完全一致
'''
},
{
'id': 'IMPL-002',
'title': 'PKCE未実装',
'severity': 'HIGH',
'description': 'パブリッククライアントでPKCEなし',
'evidence': 'SPAクライアントでPKCEパラメータなし',
'risk': '認可コード横取り攻撃',
'recommendation': 'すべてのパブリッククライアントでPKCE必須化'
},
{
'id': 'IMPL-003',
'title': 'エラー情報の過剰開示',
'severity': 'MEDIUM',
'description': 'スタックトレースが返される',
'evidence': '''
except Exception as e:
return {"error": str(e), "trace": traceback.format_exc()}
''',
'risk': '内部構造の情報漏洩',
'recommendation': '本番環境では最小限のエラー情報のみ返す'
}
]
self.vulnerabilities.extend(findings)
def _audit_operations(self):
"""運用面の監査"""
findings = [
{
'id': 'OPS-001',
'title': 'トークン無効化機能なし',
'severity': 'MEDIUM',
'description': 'ログアウト時のトークン無効化未実装',
'evidence': 'logout()関数でトークン無効化処理なし',
'risk': 'ログアウト後もトークンが有効',
'recommendation': 'トークンブラックリストまたは短い有効期限の実装'
},
{
'id': 'OPS-002',
'title': '監査ログ不足',
'severity': 'LOW',
'description': '認証イベントのログが不十分',
'evidence': '失敗した認証試行がログされていない',
'risk': '攻撃の検知が困難',
'recommendation': '包括的な監査ログの実装'
}
]
self.vulnerabilities.extend(findings)
def calculate_risk_scores(self):
"""リスクスコアの計算"""
severity_scores = {
'CRITICAL': 10,
'HIGH': 7,
'MEDIUM': 4,
'LOW': 1
}
total_score = 0
for vuln in self.vulnerabilities:
score = severity_scores[vuln['severity']]
total_score += score
return {
'total_score': total_score,
'risk_level': self._get_risk_level(total_score),
'severity_breakdown': {
'CRITICAL': len([v for v in self.vulnerabilities if v['severity'] == 'CRITICAL']),
'HIGH': len([v for v in self.vulnerabilities if v['severity'] == 'HIGH']),
'MEDIUM': len([v for v in self.vulnerabilities if v['severity'] == 'MEDIUM']),
'LOW': len([v for v in self.vulnerabilities if v['severity'] == 'LOW'])
}
}
def _get_risk_level(self, score):
if score >= 30:
return 'CRITICAL'
elif score >= 20:
return 'HIGH'
elif score >= 10:
return 'MEDIUM'
else:
return 'LOW'
def generate_report(self):
"""監査レポートの生成"""
risk_scores = self.calculate_risk_scores()
return {
'executive_summary': {
'overall_risk': risk_scores['risk_level'],
'total_findings': len(self.vulnerabilities),
'critical_findings': risk_scores['severity_breakdown']['CRITICAL'],
'immediate_action_required': risk_scores['risk_level'] in ['CRITICAL', 'HIGH']
},
'findings': self.vulnerabilities,
'remediation_priority': [
{
'phase': 'Immediate (1 week)',
'items': [v['id'] for v in self.vulnerabilities if v['severity'] == 'CRITICAL'],
'focus': 'セキュリティクリティカルな問題の修正'
},
{
'phase': 'Short-term (1 month)',
'items': [v['id'] for v in self.vulnerabilities if v['severity'] == 'HIGH'],
'focus': '主要な脆弱性の対処'
},
{
'phase': 'Medium-term (3 months)',
'items': [v['id'] for v in self.vulnerabilities if v['severity'] in ['MEDIUM', 'LOW']],
'focus': '運用改善とベストプラクティスの適用'
}
],
'recommendations': {
'immediate': [
'HTTPS強制の有効化',
'Implicit Grantの無効化',
'PKCEの実装'
],
'short_term': [
'アクセストークン有効期限の短縮',
'redirect_uri検証の強化',
'エラーハンドリングの改善'
],
'long_term': [
'トークン無効化システムの実装',
'包括的な監査ログ',
'セキュリティテストの自動化'
]
}
}
監査結果サマリー
発見された脆弱性
- CRITICAL(2件)
- HTTPS非強制
- (その他の重大な問題)
- HIGH(3件)
- Implicit Grant有効
- redirect_uri検証不足
- PKCE未実装
- MEDIUM(2件)
- トークン有効期限が長い
- エラー情報の過剰開示
- LOW(1件)
- 監査ログ不足
リスク評価
- 総合リスクレベル:HIGH
- 即時対応必要:YES
- 推定修正期間:3ヶ月
実装改善案
# 改善実装例
class ImprovedOAuth2Implementation:
def __init__(self):
# セキュアな設定
self.config = {
'require_https': True,
'allowed_grant_types': ['authorization_code'],
'require_pkce': True,
'access_token_lifetime': 900, # 15分
'refresh_token_lifetime': 2592000, # 30日
'enable_token_revocation': True
}
def validate_client_request(self, request):
"""改善されたリクエスト検証"""
# HTTPS強制
if not request.is_secure and not self._is_localhost(request):
raise SecurityError("HTTPS required")
# PKCEチェック
if self.config['require_pkce']:
if not request.args.get('code_challenge'):
raise ValueError("PKCE required")
return True