第16章:コンプライアンスとガバナンス

16.1 ライセンス管理の基本

オープンソースライセンスの選択

主要なライセンスの比較

# license_comparison.yaml
# Quick-reference matrix of common OSS licenses.
#
# Schema (per license):
#   permissions: rights granted to users of the code
#   conditions:  obligations that must be met when using/redistributing
#   limitations: protections the author does NOT provide (liability/warranty)
#   use_case:    short guidance note (Japanese)
#
# NOTE(review): each permissions/conditions/limitations entry is a one-key
# mapping inside a list; a plain mapping would be simpler — confirm how
# consumers parse this before changing the shape.
licenses:
  MIT:
    permissions:
      - commercial_use: true
      - modification: true
      - distribution: true
      - private_use: true
    conditions:
      - include_copyright: true
      - include_license: true
    limitations:
      - liability: false
      - warranty: false
    use_case: "最も制限が少ない。商用利用も自由"

  Apache-2.0:
    permissions:
      - commercial_use: true
      - modification: true
      - distribution: true
      - private_use: true
      - patent_use: true  # explicit patent grant — the key difference from MIT
    conditions:
      - include_copyright: true
      - include_license: true
      - state_changes: true
      - include_notice: true
    limitations:
      - liability: false
      - warranty: false
    use_case: "特許条項あり。企業向けプロジェクトに適している"

  GPL-3.0:
    permissions:
      - commercial_use: true
      - modification: true
      - distribution: true
      - private_use: true
      - patent_use: true
    conditions:
      - include_copyright: true
      - include_license: true
      - state_changes: true
      - disclose_source: true  # copyleft: derived source must be disclosed
      - same_license: true
    limitations:
      - liability: false
      - warranty: false
    use_case: "コピーレフト。派生物も同じライセンスが必要"

  BSD-3-Clause:
    permissions:
      - commercial_use: true
      - modification: true
      - distribution: true
      - private_use: true
    conditions:
      - include_copyright: true
      - include_license: true
    limitations:
      - liability: false
      - warranty: false
    use_case: "MITに似ているが、名前の使用に制限あり"
ライセンスチェッカーの実装

依存関係のライセンス確認

# scripts/license_checker.py
import subprocess
import json
import requests
from typing import Dict, List, Set

class LicenseChecker:
    """Check installed dependencies' licenses against an allow/deny policy.

    Relies on the external ``pip-licenses`` tool to enumerate the license of
    every package installed in the current environment.
    """

    # Explicit severity ordering (most severe first). The previous
    # implementation sorted severities alphabetically with reverse=True,
    # which listed 'medium' before 'high'.
    _SEVERITY_RANK = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3, 'info': 4}

    def __init__(self):
        # Permissive licenses considered safe for commercial, closed-source use.
        self.allowed_licenses = {
            'MIT', 'Apache-2.0', 'BSD-3-Clause', 'BSD-2-Clause',
            'ISC', 'Python-2.0', 'PSF', 'CC0-1.0', 'Unlicense'
        }

        # Copyleft licenses that may force disclosure of derived source.
        self.copyleft_licenses = {
            'GPL-2.0', 'GPL-3.0', 'AGPL-3.0', 'LGPL-2.1', 'LGPL-3.0'
        }

        # Licenses that forbid commercial use outright.
        self.commercial_restricted = {
            'CC-BY-NC', 'CC-BY-NC-SA', 'CC-BY-NC-ND'
        }

    def _run_pip_licenses(self):
        """Return pip-licenses JSON output as a list of package dicts.

        Each dict carries at least 'Name', 'Version' and 'License' keys.

        Raises:
            RuntimeError: if the pip-licenses subprocess exits non-zero.
        """
        result = subprocess.run(
            ['pip-licenses', '--format=json', '--with-urls'],
            capture_output=True,
            text=True
        )
        if result.returncode != 0:
            raise RuntimeError("Failed to run pip-licenses")
        return json.loads(result.stdout)

    def check_dependencies(self, requirements_file='requirements.txt'):
        """Check the licenses of installed dependencies.

        Args:
            requirements_file: kept for backward compatibility; the scan
                inspects the installed environment via pip-licenses, not the
                requirements file itself.

        Returns:
            List of issue dicts with package/version/license/type/severity/
            message keys.
        """
        issues = []

        for package in self._run_pip_licenses():
            license_name = package['License']

            if license_name in self.copyleft_licenses:
                issues.append({
                    'package': package['Name'],
                    'version': package['Version'],
                    'license': license_name,
                    'type': 'copyleft',
                    'severity': 'high',
                    'message': "Copyleft license may require source code disclosure"
                })

            elif license_name in self.commercial_restricted:
                issues.append({
                    'package': package['Name'],
                    'version': package['Version'],
                    'license': license_name,
                    'type': 'commercial_restriction',
                    'severity': 'high',
                    'message': "License restricts commercial use"
                })

            elif license_name not in self.allowed_licenses:
                if license_name == 'UNKNOWN':
                    severity = 'high'
                    message = "Unknown license - manual review required"
                else:
                    severity = 'medium'
                    message = "License not in allowed list"

                issues.append({
                    'package': package['Name'],
                    'version': package['Version'],
                    'license': license_name,
                    'type': 'not_allowed',
                    'severity': severity,
                    'message': message
                })

        return issues

    def _get_all_packages(self):
        """Return [{'name', 'version', 'license'}, ...] for installed packages."""
        return [
            {
                'name': pkg['Name'],
                'version': pkg['Version'],
                'license': pkg['License'],
            }
            for pkg in self._run_pip_licenses()
        ]

    def _count_packages(self):
        """Total number of installed packages reported by pip-licenses."""
        return len(self._run_pip_licenses())

    def _generate_recommendations(self, issues):
        """Build the human-readable recommendations section from found issues."""
        if not issues:
            return "- No license issues found. No action required.\n"

        lines = []
        types_found = {issue['type'] for issue in issues}
        if 'copyleft' in types_found:
            lines.append(
                "- Review copyleft dependencies: replacing them or isolating "
                "them behind a service boundary may avoid source disclosure."
            )
        if 'commercial_restriction' in types_found:
            lines.append(
                "- Remove or replace packages whose licenses restrict "
                "commercial use before shipping."
            )
        if 'not_allowed' in types_found:
            lines.append(
                "- Manually review packages whose licenses are unknown or not "
                "yet on the allowed list, then update the policy."
            )
        return '\n'.join(lines) + '\n'

    def generate_license_report(self):
        """Generate a Markdown license-compliance report.

        Packages are fetched once and reused for both the summary and the
        per-license breakdown to avoid repeated pip-licenses subprocesses.
        """
        issues = self.check_dependencies()
        packages = self._get_all_packages()

        # Group package names by license.
        by_license = {}
        for package in packages:
            by_license.setdefault(package['license'], []).append(package['name'])

        report = """# License Compliance Report

## Summary
- Total packages: {}
- License issues found: {}
- High severity issues: {}

## License Distribution
""".format(
            len(packages),
            len(issues),
            len([i for i in issues if i['severity'] == 'high'])
        )

        for license_name, pkg_names in sorted(by_license.items()):
            report += f"\n### {license_name} ({len(pkg_names)} packages)\n"
            if len(pkg_names) <= 10:
                for pkg in pkg_names:
                    report += f"- {pkg}\n"
            else:
                report += f"- {', '.join(pkg_names[:5])}... and {len(pkg_names)-5} more\n"

        if issues:
            report += "\n## Issues Found\n"

            # Sort by explicit severity rank so 'high' precedes 'medium'.
            ranked = sorted(
                issues,
                key=lambda x: self._SEVERITY_RANK.get(x['severity'], 99)
            )
            for issue in ranked:
                report += f"""
### {issue['package']} v{issue['version']}
- **License**: {issue['license']}
- **Type**: {issue['type']}
- **Severity**: {issue['severity']}
- **Issue**: {issue['message']}
"""

        report += "\n## Recommendations\n"
        report += self._generate_recommendations(issues)

        return report

    def check_license_compatibility(self, project_license, dependency_licenses):
        """Check dependency licenses against the project's own license.

        Returns:
            List of incompatibility dicts; empty when all dependencies are
            compatible or the project license is not in the matrix.
        """
        compatibility_matrix = {
            'MIT': {
                'compatible': ['MIT', 'BSD', 'Apache-2.0', 'ISC'],
                'incompatible': []
            },
            'Apache-2.0': {
                'compatible': ['MIT', 'BSD', 'Apache-2.0', 'ISC'],
                'incompatible': ['GPL-2.0']  # Apache-2.0 is incompatible with GPL-2.0
            },
            'GPL-3.0': {
                'compatible': ['MIT', 'BSD', 'Apache-2.0', 'LGPL', 'GPL-3.0'],
                'incompatible': ['proprietary']
            }
        }

        issues = []
        project_compat = compatibility_matrix.get(project_license, {})

        for dep_license in dependency_licenses:
            if dep_license in project_compat.get('incompatible', []):
                issues.append({
                    'project_license': project_license,
                    'dependency_license': dep_license,
                    'issue': 'License incompatibility detected'
                })

        return issues

ライセンスヘッダーの管理

自動ヘッダー挿入

# scripts/license_header.py
import os
from pathlib import Path
from datetime import datetime

class LicenseHeaderManager:
    """Insert license headers into source files across a project."""

    def __init__(self, license_type='MIT', organization='AI Research Lab'):
        """
        Args:
            license_type: key into the header templates ('MIT' or 'Apache-2.0').
            organization: copyright holder name substituted into the header.
        """
        self.license_type = license_type
        self.organization = organization
        self.year = datetime.now().year

        # Header templates, keyed by license; {year}/{organization} are
        # filled in at insertion time.
        self.headers = {
            'MIT': """# Copyright (c) {year} {organization}
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
""",
            'Apache-2.0': """# Copyright {year} {organization}
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
        }

    def add_header_to_file(self, file_path):
        """Add the license header to one file.

        Returns:
            True if the header was inserted, False if the file already
            appears to carry a copyright notice near the top.
        """
        file_path = Path(file_path)

        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Skip files that already carry a copyright notice near the top.
        if 'Copyright' in content[:500]:
            return False

        header = self.headers[self.license_type].format(
            year=self.year,
            organization=self.organization
        )

        # Keep a shebang as the very first line; the header goes after it.
        if content.startswith('#!'):
            parts = content.split('\n', 1)
            # A shebang-only file has no second part; the previous code
            # raised IndexError here.
            rest = parts[1] if len(parts) > 1 else ''
            new_content = parts[0] + '\n' + header + '\n' + rest
        else:
            new_content = header + '\n' + content

        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(new_content)

        return True

    def add_headers_to_project(self, root_dir, extensions=('.py', '.js')):
        """Add headers to every matching file under *root_dir*.

        Args:
            root_dir: project root to walk recursively.
            extensions: file suffixes to process (tuple default avoids the
                mutable-default-argument pitfall; lists still work).

        Returns:
            Number of files that received a header.
        """
        root_path = Path(root_dir)
        files_updated = 0

        for ext in extensions:
            for file_path in root_path.rglob(f'*{ext}'):
                # Skip vendored / generated / VCS directories.
                if any(part in file_path.parts for part in
                       ['venv', '__pycache__', 'node_modules', '.git']):
                    continue

                if self.add_header_to_file(file_path):
                    files_updated += 1
                    print(f"Added header to: {file_path}")

        return files_updated

16.2 Export Control対応

暗号化技術の輸出規制

輸出規制チェッカー

# scripts/export_control.py
import re
from pathlib import Path

class ExportControlChecker:
    """Scan a codebase for cryptography-related code that may be subject to
    export control regulations (EAR, Wassenaar)."""

    def __init__(self):
        # Keyword lists grouped by how strongly they indicate regulated
        # cryptographic functionality.
        self.crypto_keywords = {
            'high_risk': [
                'AES', 'RSA', 'DES', 'Blowfish', 'Twofish',
                'encrypt', 'decrypt', 'cipher', 'cryptography'
            ],
            'medium_risk': [
                'hash', 'SHA', 'MD5', 'HMAC', 'signature',
                'public_key', 'private_key', 'symmetric_key'
            ],
            'ml_specific': [
                'homomorphic_encryption', 'secure_aggregation',
                'differential_privacy', 'secure_multiparty'
            ]
        }

    def scan_codebase(self, root_dir):
        """Scan every .py file under *root_dir* and return a findings list."""
        findings = []
        root_path = Path(root_dir)

        for py_file in root_path.rglob('*.py'):
            try:
                with open(py_file, 'r', encoding='utf-8') as f:
                    content = f.read()

                file_findings = self._check_content(content, py_file)
                if file_findings:
                    findings.extend(file_findings)

            except Exception as e:
                # Best-effort scan: report unreadable files but keep going.
                print(f"Error reading {py_file}: {e}")

        return findings

    def _find_line_number(self, content, text, start=None):
        """Return the 1-based line number where *text* occurs in *content*.

        Args:
            content: full file text being scanned.
            text: matched substring to locate.
            start: character offset of the match when already known (from
                ``re.Match.start()``); passing it disambiguates repeated
                occurrences of the same text.

        Returns:
            1-based line number, or None if *text* does not occur.
        """
        if start is None:
            start = content.find(text)
            if start < 0:
                return None
        return content.count('\n', 0, start) + 1

    def _check_content(self, content, file_path):
        """Return findings for crypto-related imports and keywords."""
        findings = []

        # Flag imports of well-known crypto/TLS modules.
        imports = re.findall(r'^\s*(?:from|import)\s+([^\s]+)',
                             content, re.MULTILINE)

        for imp in imports:
            if any(crypto in imp.lower() for crypto in ['crypto', 'cipher', 'ssl']):
                findings.append({
                    'file': str(file_path),
                    'type': 'import',
                    'risk': 'high',
                    'detail': f"Cryptographic import: {imp}",
                    'line': self._find_line_number(content, imp)
                })

        # Flag bare keyword occurrences, tiered by risk level.
        for risk_level, keywords in self.crypto_keywords.items():
            for keyword in keywords:
                # re.escape is defensive; current keywords are plain words.
                pattern = rf'\b{re.escape(keyword)}\b'
                for match in re.finditer(pattern, content, re.IGNORECASE):
                    findings.append({
                        'file': str(file_path),
                        'type': 'keyword',
                        'risk': risk_level.replace('_risk', ''),
                        'detail': f"Found: {match.group()}",
                        'line': self._find_line_number(content, match.group(),
                                                       match.start())
                    })

        return findings

    def generate_eccn_classification(self, findings):
        """Estimate an ECCN (Export Control Classification Number).

        The result is a rough heuristic; official classification by an
        export-compliance specialist is still required.
        """
        classification = {
            'likely_eccn': None,
            'confidence': 'low',
            'requires_review': True,
            'notes': []
        }

        # Presence of cryptographic imports suggests 5D002 (crypto software).
        crypto_imports = [f for f in findings if
                          f['type'] == 'import' and f['risk'] == 'high']

        if crypto_imports:
            classification['likely_eccn'] = '5D002'  # cryptographic software
            classification['confidence'] = 'medium'
            classification['notes'].append(
                "Contains cryptographic functionality - likely subject to export controls"
            )

        # ML-specific privacy/crypto techniques get an extra note.
        ml_crypto = [f for f in findings if
                     any(kw in f['detail'] for kw in
                         self.crypto_keywords['ml_specific'])]

        if ml_crypto:
            classification['notes'].append(
                "Contains ML-specific encryption (federated learning, etc.)"
            )

        return classification

    def generate_export_notice(self):
        """Return the boilerplate export-control notice text."""
        return """# Export Control Notice

This software may be subject to export control laws and regulations, including but not limited to:

- U.S. Export Administration Regulations (EAR)
- EU Dual-Use Regulation
- Wassenaar Arrangement

## Classification
- ECCN: 5D002 (Provisional - requires official classification)
- License Exception: TSU (Technology and Software - Unrestricted)

## Restricted Countries
This software may NOT be exported/re-exported to:
- Countries under U.S. embargo
- Entities on denied parties lists

## Cryptographic Notice
This distribution includes cryptographic software. The country in which you currently reside may have restrictions on the import, possession, use, and/or re-export to another country, of encryption software.

## Compliance Requirements
Before using or distributing this software:
1. Verify your local laws regarding encryption
2. Ensure compliance with all applicable export regulations
3. Maintain records of distribution for compliance purposes

## Contact
For export compliance questions: compliance@example.com
"""

地域制限の実装

アクセス制御

# scripts/geo_restrictions.py
import requests
from functools import wraps

class GeoRestrictionManager:
    """Enforce geography-based access restrictions for export compliance."""

    def __init__(self):
        # Example restriction lists (ISO 3166-1 alpha-2 country codes).
        self.restricted_countries = {
            'embargoed': ['CU', 'IR', 'KP', 'SY'],  # full embargo
            'restricted': ['CN', 'RU'],  # partially restricted
        }

        # IP geolocation service endpoint.
        self.geo_api_url = "https://ipapi.co/{ip}/json/"

    def check_ip_location(self, ip_address):
        """Resolve an IP address to a location via the geolocation API.

        Returns:
            Dict with country_code/country_name/region/city, or None on any
            failure (network error, non-200 response).
        """
        try:
            response = requests.get(
                self.geo_api_url.format(ip=ip_address),
                timeout=5
            )

            if response.status_code == 200:
                data = response.json()
                return {
                    'country_code': data.get('country_code'),
                    'country_name': data.get('country_name'),
                    'region': data.get('region'),
                    'city': data.get('city')
                }
        except Exception as e:
            # Best-effort: geolocation failure must not crash the caller.
            print(f"Geolocation error: {e}")

        return None

    def is_restricted(self, country_code):
        """Return 'embargoed', 'restricted', or None for a country code."""
        if country_code in self.restricted_countries['embargoed']:
            return 'embargoed'
        elif country_code in self.restricted_countries['restricted']:
            return 'restricted'
        return None

    def get_client_ip(self, request):
        """Best-effort client IP extraction from a request object.

        Prefers the first entry of X-Forwarded-For (proxy setups), falling
        back to the request's remote address.

        NOTE(review): written against a generic request with ``.headers``
        and ``.remote_addr`` — adapt to your web framework's request API.
        """
        headers = getattr(request, 'headers', None) or {}
        forwarded = headers.get('X-Forwarded-For')
        if forwarded:
            return forwarded.split(',')[0].strip()
        return getattr(request, 'remote_addr', None)

    def enforce_geo_restrictions(self, func):
        """Decorator: block embargoed regions, flag restricted ones.

        Embargoed countries receive a 403 response; restricted countries
        proceed with ``restricted_access=True`` passed to the handler.
        """
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            ip = self.get_client_ip(request)
            location = self.check_ip_location(ip)

            if location:
                restriction = self.is_restricted(location['country_code'])

                if restriction == 'embargoed':
                    return {
                        'error': 'Access denied',
                        'message': 'This service is not available in your region due to export control regulations'
                    }, 403

                elif restriction == 'restricted':
                    # Partial restriction: the handler decides what to limit.
                    kwargs['restricted_access'] = True

            return func(request, *args, **kwargs)

        return wrapper

    def generate_distribution_log(self, download_info):
        """Append a distribution log entry (for compliance audits).

        Args:
            download_info: dict with at least 'ip', 'file' and 'version';
                'country' and 'user_agent' are optional.
        """
        # Local imports: this snippet's module header omits them.
        import json
        from datetime import datetime

        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'ip_address': download_info['ip'],
            'country': download_info.get('country'),
            'file': download_info['file'],
            'version': download_info['version'],
            'user_agent': download_info.get('user_agent'),
            'compliance_check': 'passed'
        }

        with open('distribution_log.jsonl', 'a') as f:
            f.write(json.dumps(log_entry) + '\n')

        return log_entry

16.3 AIツール利用時の知的財産権配慮

Copilot使用ポリシー

コード生成ガイドライン

# AI Code Generation Policy

## Purpose
This policy governs the use of AI-powered code generation tools (GitHub Copilot, etc.) in our projects.

## Guidelines

### 1. Acceptable Use
- ✅ Boilerplate code generation
- ✅ Test case generation
- ✅ Documentation generation
- ✅ Code refactoring suggestions

### 2. Prohibited Use
- ❌ Generating entire modules without review
- ❌ Using suggestions that appear to be copyrighted
- ❌ Blindly accepting security-sensitive code

### 3. Review Requirements
All AI-generated code MUST be:
1. Thoroughly reviewed by a human developer
2. Tested for functionality and security
3. Checked for potential license issues
4. Modified to meet project standards

### 4. Attribution
- Document when significant portions are AI-generated
- Include comments for complex AI-generated algorithms
- Maintain a log of AI tool usage

### 5. Intellectual Property
- Ensure generated code doesn't infringe on third-party rights
- Verify that generated code aligns with project license
- Report any suspicious similarities to known codebases

### 6. Security Considerations
- Never use AI to generate cryptographic code
- Avoid AI generation for authentication/authorization
- Manually review all security-critical sections

## Compliance Checklist
- [ ] Generated code has been reviewed
- [ ] No obvious copyright infringement
- [ ] Security implications considered
- [ ] Appropriate documentation added
- [ ] Team lead approval for significant usage

AI生成コードの追跡

メタデータ管理

# scripts/ai_code_tracker.py
import ast
import json
from datetime import datetime
from pathlib import Path

class AICodeTracker:
    """Track which parts of the codebase were generated by AI tools."""

    def __init__(self, metadata_file='ai_generated.json'):
        # JSON sidecar file recording AI-generated sections per source file.
        self.metadata_file = Path(metadata_file)
        self.metadata = self._load_metadata()

    def _load_metadata(self):
        """Load existing metadata, or start with an empty structure."""
        if self.metadata_file.exists():
            with open(self.metadata_file) as f:
                return json.load(f)
        return {'files': {}}

    def mark_ai_generated(self, file_path, lines, tool='copilot', confidence=0.8):
        """Record that *lines* ([start, end] pair) of *file_path* are AI-generated."""
        file_path = str(Path(file_path).resolve())

        if file_path not in self.metadata['files']:
            self.metadata['files'][file_path] = {
                'ai_sections': [],
                'last_reviewed': None
            }

        self.metadata['files'][file_path]['ai_sections'].append({
            'lines': lines,
            'tool': tool,
            'confidence': confidence,
            'marked_at': datetime.now().isoformat(),
            'reviewed': False
        })

        self._save_metadata()

    def add_review_comment(self, file_path, line_range, comment):
        """Attach a review comment to the AI section matching *line_range*."""
        file_path = str(Path(file_path).resolve())

        if file_path in self.metadata['files']:
            for section in self.metadata['files'][file_path]['ai_sections']:
                if section['lines'] == line_range:
                    section['review_comment'] = comment
                    section['reviewed'] = True
                    section['reviewed_at'] = datetime.now().isoformat()

        self._save_metadata()

    def generate_ai_usage_report(self):
        """Generate a Markdown report of AI-generated code and review status."""
        total_files = len(self.metadata['files'])
        total_sections = sum(
            len(file_data['ai_sections'])
            for file_data in self.metadata['files'].values()
        )

        reviewed_sections = sum(
            sum(1 for section in file_data['ai_sections'] if section['reviewed'])
            for file_data in self.metadata['files'].values()
        )

        # Guard against ZeroDivisionError when nothing has been marked yet;
        # an empty tracker is trivially 100% reviewed.
        coverage = (reviewed_sections / total_sections * 100) if total_sections else 100.0

        report = f"""# AI Code Generation Report

## Summary
- Files with AI-generated code: {total_files}
- Total AI-generated sections: {total_sections}
- Reviewed sections: {reviewed_sections} ({coverage:.1f}%)

## Files Requiring Review
"""

        for file_path, file_data in self.metadata['files'].items():
            unreviewed = [s for s in file_data['ai_sections'] if not s['reviewed']]

            if unreviewed:
                report += f"\n### {Path(file_path).name}\n"
                for section in unreviewed:
                    report += f"- Lines {section['lines'][0]}-{section['lines'][1]}"
                    report += f" (Tool: {section['tool']}, Confidence: {section['confidence']:.1%})\n"

        return report

    def check_similarity(self, code_snippet, threshold=0.8, known_code_dir='known_code'):
        """Compare *code_snippet* against known code files.

        Args:
            code_snippet: candidate (possibly AI-generated) code.
            threshold: similarity ratio above which a file is reported.
            known_code_dir: directory of reference code; the default
                preserves the previously hard-coded path.

        Returns:
            List of {'file', 'similarity', 'license'} dicts.
        """
        # Simple character-level ratio; production systems should use
        # token-level or AST-level similarity instead.
        from difflib import SequenceMatcher

        similar_snippets = []

        for known_file in Path(known_code_dir).rglob('*.py'):
            with open(known_file) as f:
                known_code = f.read()

            similarity = SequenceMatcher(None, code_snippet, known_code).ratio()

            if similarity > threshold:
                similar_snippets.append({
                    'file': str(known_file),
                    'similarity': similarity,
                    'license': self._get_file_license(known_file)
                })

        return similar_snippets

    def _get_file_license(self, file_path):
        """Best-effort license detection for a reference file.

        Looks for a well-known license name in the first ~50 lines and
        returns 'UNKNOWN' when nothing matches or the file is unreadable.
        """
        markers = ('Apache', 'GPL', 'BSD', 'MIT', 'MPL', 'ISC')
        try:
            with open(file_path, encoding='utf-8', errors='ignore') as f:
                head = ''.join(next(f, '') for _ in range(50))
        except OSError:
            return 'UNKNOWN'
        for marker in markers:
            if marker in head:
                return marker
        return 'UNKNOWN'

    def _save_metadata(self):
        """Persist metadata to the JSON sidecar file."""
        with open(self.metadata_file, 'w') as f:
            json.dump(self.metadata, f, indent=2)
16.4 監査要件への対応

監査ログシステム

包括的な監査ログ

# scripts/audit_system.py
import hashlib
import json
from datetime import datetime
from enum import Enum

class AuditEventType(Enum):
    """Categories of auditable events; values are the strings stored in the
    JSONL audit log (see AuditLogger)."""
    ACCESS_GRANTED = "access_granted"
    ACCESS_DENIED = "access_denied"
    DATA_DOWNLOAD = "data_download"
    MODEL_DEPLOYMENT = "model_deployment"
    CONFIGURATION_CHANGE = "configuration_change"
    SECURITY_INCIDENT = "security_incident"
    COMPLIANCE_CHECK = "compliance_check"

class AuditLogger:
    """Append-only JSONL audit logger with per-event tamper-evidence hashes."""

    def __init__(self, log_file='audit_log.jsonl'):
        # One JSON object per line; the file is only ever appended to.
        self.log_file = log_file

    def _generate_event_id(self):
        """Return a random, collision-resistant event identifier."""
        import uuid  # local import: this snippet's module header omits it
        return uuid.uuid4().hex

    def log_event(self, event_type, user, details, severity='info'):
        """Append an audit event and return its id.

        Args:
            event_type: enum member exposing a ``.value`` string
                (see AuditEventType).
            user: acting user identifier.
            details: JSON-serialisable payload describing the event.
            severity: 'info' | 'low' | 'medium' | 'high' | 'critical'.
        """
        event = {
            'id': self._generate_event_id(),
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type.value,
            'user': user,
            'severity': severity,
            'details': details,
            'hash': None  # placeholder; computed below over all other fields
        }

        # Tamper-evidence hash (covers every field except 'hash' itself).
        event['hash'] = self._calculate_hash(event)

        with open(self.log_file, 'a') as f:
            f.write(json.dumps(event) + '\n')

        # Important events trigger an immediate alert.
        if severity in ['critical', 'high']:
            self._send_alert(event)

        return event['id']

    def _send_alert(self, event):
        """Alert hook for high/critical events.

        Placeholder implementation: prints to stdout. Wire this to email /
        chat / paging infrastructure in production.
        """
        print(f"[AUDIT ALERT] {event['severity']}: {event['event_type']} "
              f"by {event['user']} (id={event['id']})")

    def verify_log_integrity(self):
        """Verify per-event hashes; return a list of detected issues."""
        issues = []

        with open(self.log_file) as f:
            for line_num, line in enumerate(f, 1):
                if not line.strip():
                    continue  # tolerate stray blank lines
                try:
                    event = json.loads(line)

                    # Recompute the hash and compare with the stored value.
                    stored_hash = event['hash']
                    event['hash'] = None
                    calculated_hash = self._calculate_hash(event)

                    if stored_hash != calculated_hash:
                        issues.append({
                            'line': line_num,
                            'event_id': event['id'],
                            'issue': 'Hash mismatch - possible tampering'
                        })

                except Exception as e:
                    issues.append({
                        'line': line_num,
                        'issue': f'Parse error: {str(e)}'
                    })

        return issues

    def _load_events(self, start_date=None, end_date=None):
        """Read events from the log file, optionally filtered by timestamp.

        ISO-8601 timestamps compare correctly as strings, so the bounds are
        applied lexicographically. A missing log file yields no events.
        """
        events = []
        try:
            with open(self.log_file) as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    event = json.loads(line)
                    ts = event.get('timestamp', '')
                    if start_date and ts < start_date:
                        continue
                    if end_date and ts > end_date:
                        continue
                    events.append(event)
        except FileNotFoundError:
            pass  # no log yet -> no events
        return events

    def generate_audit_report(self, start_date=None, end_date=None):
        """Generate an aggregated audit report for the given period."""
        events = self._load_events(start_date, end_date)

        report = {
            'period': {
                'start': start_date or 'all',
                'end': end_date or 'current'
            },
            'summary': {
                'total_events': len(events),
                'by_type': {},
                'by_severity': {},
                'by_user': {}
            },
            'critical_events': [],
            'compliance_status': 'compliant'
        }

        # Aggregate events along each dimension.
        for event in events:
            event_type = event['event_type']
            report['summary']['by_type'][event_type] = \
                report['summary']['by_type'].get(event_type, 0) + 1

            severity = event['severity']
            report['summary']['by_severity'][severity] = \
                report['summary']['by_severity'].get(severity, 0) + 1

            user = event['user']
            report['summary']['by_user'][user] = \
                report['summary']['by_user'].get(user, 0) + 1

            if event['severity'] in ['critical', 'high']:
                report['critical_events'].append(event)

        # Flag compliance violations found in the period.
        violations = self._check_compliance_violations(events)
        if violations:
            report['compliance_status'] = 'violations_found'
            report['violations'] = violations

        return report

    def _calculate_hash(self, event):
        """SHA-256 over the event's canonical JSON, excluding 'hash'."""
        event_copy = {k: v for k, v in event.items() if k != 'hash'}
        event_str = json.dumps(event_copy, sort_keys=True)

        return hashlib.sha256(event_str.encode()).hexdigest()

    def _check_compliance_violations(self, events):
        """Detect compliance violations (currently: excessive denied access)."""
        violations = []

        failed_access = [e for e in events if
                         e['event_type'] == AuditEventType.ACCESS_DENIED.value]

        # Group denied-access timestamps per user.
        from collections import defaultdict
        user_failures = defaultdict(list)

        for event in failed_access:
            user_failures[event['user']].append(event['timestamp'])

        for user, timestamps in user_failures.items():
            if len(timestamps) > 5:  # more than 5 failures
                violations.append({
                    'type': 'excessive_access_failures',
                    'user': user,
                    'count': len(timestamps),
                    'severity': 'high'
                })

        return violations

レギュレーション対応

GDPR対応

# scripts/gdpr_compliance.py
class GDPRCompliance:
    """GDPR helpers: anonymisation, erasure requests, data portability."""

    def __init__(self):
        # Field names treated as personal data by default.
        self.personal_data_fields = [
            'email', 'name', 'phone', 'address', 'ip_address',
            'user_id', 'device_id', 'location'
        ]

    @staticmethod
    def _hash_value(value):
        """Short one-way digest used to pseudonymise arbitrary fields."""
        import hashlib  # local import: this snippet's header omits it
        return hashlib.sha256(str(value).encode()).hexdigest()[:8]

    def anonymize_data(self, data, fields_to_anonymize=None):
        """Return a copy of *data* with personal fields anonymised.

        Well-formed emails keep their domain, dotted IPs keep their first
        two octets; all other fields — and malformed emails/IPs, which
        previously raised IndexError — fall back to a truncated SHA-256
        digest.
        """
        fields = fields_to_anonymize or self.personal_data_fields
        anonymized = data.copy()

        for field in fields:
            if field not in anonymized:
                continue
            value = str(anonymized[field])

            if field == 'email' and '@' in value:
                local, _, domain = value.partition('@')
                anonymized[field] = f"{local[:3]}****@{domain}"

            elif field == 'ip_address' and value.count('.') >= 2:
                parts = value.split('.')
                anonymized[field] = f"{parts[0]}.{parts[1]}.*.*"

            else:
                # Fallback (including malformed emails/IPs): hash.
                anonymized[field] = self._hash_value(anonymized[field])

        return anonymized

    def _delete_from_system(self, user_id, system):
        """Integration hook: delete *user_id*'s data from *system*.

        Must be implemented against the real backends. Raising here makes a
        missing integration visible as 'failed' in the deletion log instead
        of silently reporting success.
        """
        raise NotImplementedError(f"No deletion backend configured for {system}")

    def handle_deletion_request(self, user_id):
        """Process a GDPR right-to-erasure request across all systems.

        Each system is attempted independently; failures are recorded in
        the returned log rather than aborting the whole request.
        """
        from datetime import datetime  # local import: snippet header omits it

        deletion_log = {
            'user_id': user_id,
            'requested_at': datetime.now().isoformat(),
            'status': 'pending',
            'affected_systems': []
        }

        systems = ['database', 'logs', 'backups', 'analytics']

        for system in systems:
            try:
                self._delete_from_system(user_id, system)
                deletion_log['affected_systems'].append({
                    'system': system,
                    'status': 'deleted'
                })
            except Exception as e:
                deletion_log['affected_systems'].append({
                    'system': system,
                    'status': 'failed',
                    'error': str(e)
                })

        deletion_log['status'] = 'completed'
        deletion_log['completed_at'] = datetime.now().isoformat()

        return deletion_log

    def _collect_user_data(self, user_id, category):
        """Integration hook: collect *user_id*'s data for *category*.

        Returns None (nothing collected) until wired to real data stores.
        """
        return None

    def generate_privacy_report(self, user_id):
        """Generate a data-portability report for *user_id* (GDPR Art. 20)."""
        from datetime import datetime  # local import: snippet header omits it

        user_data = {
            'user_id': user_id,
            'generated_at': datetime.now().isoformat(),
            'data_categories': {}
        }

        categories = [
            'profile_data',
            'activity_logs',
            'preferences',
            'generated_content'
        ]

        for category in categories:
            data = self._collect_user_data(user_id, category)
            if data:
                user_data['data_categories'][category] = data

        return user_data
16.5 ポリシー違反の検出と対応

自動ポリシー検出

ポリシー違反検出器

# scripts/policy_violation_detector.py
class PolicyViolationDetector:
    """Scan a repository for policy violations and report/remediate them.

    Violations accumulate in ``self.violations`` as dicts carrying at least
    'type', 'severity', 'file' and 'remediation' keys.

    NOTE(review): ``_load_policies`` and several scanners referenced by
    ``scan_repository`` (licenses / security / code quality / data handling)
    are not shown in this excerpt and are assumed to be defined elsewhere.
    """

    def __init__(self, policy_config='policies.yaml'):
        """Load policy definitions from *policy_config* (a YAML file path)."""
        self.policies = self._load_policies(policy_config)
        self.violations = []

    def scan_repository(self, repo_path):
        """Run every registered scanner over *repo_path*.

        Returns:
            list[dict]: all violations found (also kept on ``self.violations``).
        """
        scanners = [
            self.scan_credentials,
            self.scan_licenses,
            self.scan_security,
            self.scan_code_quality,
            self.scan_data_handling
        ]

        for scanner in scanners:
            self.violations.extend(scanner(repo_path))

        return self.violations

    def scan_credentials(self, repo_path):
        """Detect hard-coded credentials in source files under *repo_path*.

        Only ``.py``/``.js``/``.yml`` files are scanned; files that cannot be
        read as text are skipped (best-effort scan).

        Returns:
            list[dict]: one critical 'credential_exposure' violation per match.
        """
        violations = []
        # Compile the patterns once, outside the per-file loop.
        patterns = [
            (re.compile(r'api[_-]?key\s*=\s*["\']([^"\']+)["\']', re.IGNORECASE), 'api_key'),
            (re.compile(r'password\s*=\s*["\']([^"\']+)["\']', re.IGNORECASE), 'password'),
            (re.compile(r'AWS[_-]?ACCESS[_-]?KEY[_-]?ID\s*=\s*([A-Z0-9]{20})', re.IGNORECASE), 'aws_key'),
            (re.compile(r'-----BEGIN (RSA|DSA|EC) PRIVATE KEY-----', re.IGNORECASE), 'private_key')
        ]
        scanned_suffixes = {'.py', '.js', '.yml'}

        for file_path in Path(repo_path).rglob('*'):
            if not (file_path.is_file() and file_path.suffix in scanned_suffixes):
                continue
            try:
                content = file_path.read_text()
            except (OSError, UnicodeDecodeError):
                # Unreadable or binary-ish file: skip it rather than abort the scan.
                continue

            for pattern, cred_type in patterns:
                for match in pattern.finditer(content):
                    violations.append({
                        'type': 'credential_exposure',
                        'severity': 'critical',
                        'file': str(file_path),
                        # 1-based line number where the match starts.
                        'line': content[:match.start()].count('\n') + 1,
                        'credential_type': cred_type,
                        'remediation': 'Use environment variables or secret management'
                    })

        return violations

    def generate_violation_report(self):
        """Render ``self.violations`` as a Markdown report string."""
        report = f"""# Policy Violation Report

Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## Summary
Total violations found: {len(self.violations)}

### By Severity
"""

        # Aggregate counts per severity level.
        by_severity = {}
        for violation in self.violations:
            sev = violation['severity']
            by_severity[sev] = by_severity.get(sev, 0) + 1

        for severity in ['critical', 'high', 'medium', 'low']:
            if severity in by_severity:
                report += f"- {severity.upper()}: {by_severity[severity]}\n"

        report += "\n## Critical Violations\n"

        # Show details for at most the first 10 critical violations.
        critical = [v for v in self.violations if v['severity'] == 'critical']

        for violation in critical[:10]:
            report += f"""
### {violation['type']}
- **File**: {violation['file']}
- **Line**: {violation.get('line', 'N/A')}
- **Details**: {violation.get('details', 'N/A')}
- **Remediation**: {violation['remediation']}
"""

        return report

    def auto_remediate(self, violation):
        """Attempt an automatic fix for *violation*.

        Returns:
            bool: True when a remediation was applied, False otherwise.
        """
        if violation['type'] == 'missing_license_header':
            # Add the missing license header in place.
            header_manager = LicenseHeaderManager()
            header_manager.add_header_to_file(violation['file'])
            return True

        if violation['type'] == 'code_formatting':
            # Reformat with black; list-form argv avoids shell interpretation.
            subprocess.run(['black', violation['file']], check=False)
            return True

        # No automatic remediation is available for this violation type.
        return False

インシデント対応プロセス

ポリシー違反時のワークフロー

# .github/workflows/policy-enforcement.yml
# Policy-enforcement pipeline: runs the policy scanner, files an issue when
# critical violations are found, and blocks the PR in that case.
name: Policy Enforcement

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 0 * * 0'  # weekly scan (Sundays at 00:00 UTC)

jobs:
  policy-check:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v3
      
      # Run the scanner and expose a 'critical_found' step output.
      - name: Run policy scanner
        id: scan
        run: |
          python scripts/policy_violation_detector.py > violations.json
          
          # Check whether any critical violations were found
          CRITICAL=$(jq '[.[] | select(.severity == "critical")] | length' violations.json)
          
          if [ $CRITICAL -gt 0 ]; then
            echo "critical_found=true" >> $GITHUB_OUTPUT
          fi
          
      # File a tracking issue listing every critical violation.
      - name: Create issue for violations
        if: steps.scan.outputs.critical_found == 'true'
        uses: actions/github-script@v6
        with:
          script: |
            const violations = require('./violations.json');
            const critical = violations.filter(v => v.severity === 'critical');
            
            const issueBody = `## 🚨 Critical Policy Violations Detected
            
            Found ${critical.length} critical violations that require immediate attention.
            
            ### Violations:
            ${critical.map(v => `- **${v.type}** in \`${v.file}\` (line ${v.line})`).join('\n')}
            
            ### Required Actions:
            1. Review the violations
            2. Apply recommended remediations
            3. Re-run the policy check
            
            cc @security-team
            `;
            
            github.rest.issues.create({
              owner: context.repo.owner,
              repo: context.repo.repo,
              title: '🚨 Critical Policy Violations',
              body: issueBody,
              labels: ['security', 'critical', 'policy-violation']
            });
            
      # Fail the job (and therefore the PR check) on critical violations.
      - name: Block PR if critical violations
        if: github.event_name == 'pull_request' && steps.scan.outputs.critical_found == 'true'
        run: |
          echo "Critical policy violations found. PR cannot be merged."
          exit 1
          
      # Keep the raw scan results as a build artifact for auditing.
      - name: Upload violation report
        uses: actions/upload-artifact@v3
        with:
          name: policy-violations
          path: |
            violations.json
            violation_report.html

まとめ

本章では、コンプライアンスとガバナンスについて学習しました:

  • オープンソースライセンスの適切な選択と管理
  • 輸出規制への対応と地域制限の実装
  • AI生成コードの知的財産権管理
  • 包括的な監査ログとレポート生成
  • ポリシー違反の自動検出と対応

確認事項

  • プロジェクトに適切なライセンスを選択できる
  • 依存関係のライセンス互換性を確認できる
  • 輸出規制に対応した実装ができる
  • AI生成コードを適切に管理できる
  • 監査要件を満たすシステムを構築できる