第7章:セキュリティ強化


目次に戻る: はじめに
前の章: 第6章:パフォーマンス最適化
次の章: 第8章:運用監視と自動化
学習フェーズ: Part III - 実装・運用編(セキュリティ)
学習レベル: 基礎 | 応用 | 発展
推定学習時間: 6〜8時間
難易度: 上級(セキュリティ・コンプライアンス知識必要) —

 セキュリティを城の防御システムで考えてみよう
├──  外堀(多層防御):複数の防御ラインで敵を阻止
├──  城門(高度なRLS):入城許可を細かく制御
├──  見張り台(セキュリティ監査):すべての出入りを記録
└──  警備隊(脆弱性対策):侵入者を即座に検知・対処

 Supabaseセキュリティシステム
├──  多層防御:認証・認可・暗号化・ファイアウォール
├──  高度なRLS:動的権限制御・時間制限・組織階層
├──  セキュリティ監査:ログ収集・コンプライアンス対応
└──  脆弱性対策:侵入検知・脆弱性スキャン・自動修復

この章で学ぶこと

このままプライベートプロジェクトや簡単なアプリだけを作っていると、「セキュリティ対策ってJWTトークンを使うだけで十分でしょ?」と思ってしまいがちです。しかし、実際のビジネスアプリケーションでは、もっと複雑で細かい権限制御が必要になります。

初心者が陥りがちな問題

# [NG] よくある初心者の認証実装
@app.route('/api/users/<user_id>')
def get_user(user_id):
    if not request.headers.get('Authorization'):
        return "ログインしてください", 401
    return {"user": get_user_data(user_id)}  # 誰のデータでも見れてしまう!

[OK] この章で作る安全な実装

# [OK] セキュリティを考慮した実装
@app.route('/api/users/<user_id>')
@require_auth
@check_permissions('user:read')
@audit_log('user_access')
@rate_limit('10/minute')
def get_user(user_id):
    # 自分のデータか、管理権限があるかチェック
    if not can_access_user_data(current_user, user_id):
        return "このユーザー情報にはアクセスできません", 403
    return {"user": get_user_data(user_id)}

学習進度別ガイド

レベル 対象者 学習内容 所要時間
基礎 JWT認証しか知らない方 RLS基本・権限概念・監査ログ 6〜8時間
応用 基本的なRBACができる方 時間制限・組織階層・動的権限 8〜12時間
発展 セキュリティ専門を目指す方 侵入検知・脆弱性対策・自動化 12〜16時間

この章で扱う構成

  • 構成: 共通(セキュリティ強化)
  • 推奨用途: 本番運用・監査/コンプライアンスが必要な段階
  • 非推奨用途: 短期検証でのみ使用し本番運用しないケース

7.1 RLSポリシー詳細設計(城門の複雑な入場許可システム)

城に入るとき、「身分証を見せれば入れる」という単純なルールだけでは不十分ですよね。実際の城では「この人は昼間だけ入場可能」「この部屋にはVIPしか入れない」「家族なら一緒に入れる」といった複雑なルールがあります。SupabaseのRLS(Row Level Security)も同じように、データベースの各行に対して細かい権限制御を実現できます。

Step 1: 基本的な動的権限チェック関数

まず、「この人はこの操作をしても良いか?」を判断する基本的な関数を作りましょう:

初心者向け解説:権限システムとは?

身近な例で権限システムを理解してみましょう:

場面 実際の権限制御 Supabaseでの権限制御
オフィスビル 「社員証で入館、部署に応じてフロア制限」 「JWTで認証、テーブルに応じてアクセス制限」
銀行ATM 「カード+暗証番号で本人確認、口座残高範囲で出金」 「認証+RLSで本人確認、データ範囲で操作制限」
電子カルテ 「医師は全患者、看護師は担当患者のみ」 「roleに応じて、関連するrowのみアクセス可能」

動的権限チェック関数の実装

レストランで「この人はキッチンに入っても良いか?」を判断するシステムのように、複数の条件を組み合わせて権限をチェックする関数を作ります:

高度なRLSパターン

--  動的権限チェック関数(レストランの入店判定システム)
CREATE OR REPLACE FUNCTION auth.user_has_permission(
    permission_name text,              -- 「キッチン利用」「食材発注」など具体的な権限名
    resource_type text DEFAULT NULL,   -- 「project」「task」などリソースの種類
    resource_id bigint DEFAULT NULL,   -- 具体的なリソースのID(特定のプロジェクトなど)
    organization_context bigint DEFAULT NULL  -- どの組織での権限かの指定
)
RETURNS boolean AS $$  -- true(許可)かfalse(拒否)を返す
DECLARE
    user_uuid uuid;                    -- 現在ログインしているユーザーのID
    has_permission boolean := false;   -- 権限があるかの判定結果(初期値:権限なし)
    org_id bigint;                     -- 対象となる組織のID
BEGIN
    --  Step 1: 現在のユーザー取得(誰が操作しようとしているか確認)
    user_uuid := auth.uid();
    IF user_uuid IS NULL THEN
        -- ログインしていない場合は即座に拒否
        RETURN false;
    END IF;
    
    --  Step 2: 組織コンテキスト決定(どの会社での権限を調べるか)
    org_id := COALESCE(
        organization_context,          -- 明示的に指定された組織
        (SELECT organization_id FROM current_user_context 
         WHERE user_id = user_uuid LIMIT 1)  -- ユーザーの現在のコンテキスト
    );
    
    --  Step 3: スーパーユーザーチェック(管理者は全権限あり)
    IF auth.is_superuser() THEN
        RETURN true;  -- システム管理者なら何でもOK(レストランオーナー権限)
    END IF;
    
    --  Step 4: 組織オーナー権限(会社の社長は社内で何でもできる)
    IF EXISTS (
        SELECT 1 FROM user_organizations uo
        WHERE uo.user_id = user_uuid 
        AND uo.organization_id = org_id 
        AND uo.role = 'owner'          -- オーナー役割
        AND uo.is_active = true        -- 現在アクティブな権限
    ) THEN
        RETURN true;  -- 組織オーナーなら組織内で全権限あり
    END IF;
    
    --  Step 5: 具体的権限チェック(詳細な権限を3つのルートで確認)
    WITH user_permissions AS (
        --  ルート1: 直接権限(個人に直接付与された権限)
        -- 「田中さんに特別にキッチン利用権限を付与」のような個別設定
        SELECT p.name 
        FROM user_permissions up
        JOIN permissions p ON up.permission_id = p.id
        WHERE up.user_id = user_uuid 
        AND up.is_active = true                              -- 現在有効な権限
        AND (up.expires_at IS NULL OR up.expires_at > now()) -- 期限切れでない権限
        
        UNION
        
        --  ルート2: ロール経由権限(役職による権限)
        -- 「料理長ロールには全キッチン権限あり」のような役職ベース
        SELECT p.name
        FROM user_roles ur
        JOIN role_permissions rp ON ur.role_id = rp.role_id
        JOIN permissions p ON rp.permission_id = p.id
        WHERE ur.user_id = user_uuid
        AND ur.is_active = true                              -- 現在有効な役職
        AND (ur.expires_at IS NULL OR ur.expires_at > now()) -- 期限切れでない役職
        
        UNION
        
        --  ルート3: 組織ロール権限(組織内での役職による権限)
        -- 「この支店のマネージャーには支店内全権限あり」のような組織ベース
        SELECT p.name
        FROM user_organizations uo
        JOIN organization_role_permissions orp ON uo.role = orp.role_name
        JOIN permissions p ON orp.permission_id = p.id
        WHERE uo.user_id = user_uuid
        AND uo.organization_id = org_id
        AND uo.is_active = true                              -- 現在有効な組織メンバーシップ
    )
    --  権限存在チェック:3つのルートのいずれかで必要な権限が見つかるか?
    SELECT EXISTS(
        SELECT 1 FROM user_permissions 
        WHERE name = permission_name
    ) INTO has_permission;
    
    --  Step 6: リソース固有権限チェック(特定のファイルやプロジェクトの所有者確認)
    IF has_permission AND resource_type IS NOT NULL AND resource_id IS NOT NULL THEN
        --  プロジェクト関連の場合:プロジェクト作成者かメンバーかチェック
        IF resource_type = 'project' THEN
            SELECT EXISTS(
                SELECT 1 FROM projects p
                WHERE p.id = resource_id
                AND (
                    --  プロジェクト作成者の場合(プロジェクトオーナー)
                    p.created_by = (SELECT id FROM auth.users WHERE auth.uid() = id)
                    OR EXISTS(
                        --  プロジェクトメンバーの場合(プロジェクトチーム)
                        SELECT 1 FROM project_members pm
                        WHERE pm.project_id = resource_id
                        AND pm.user_id = (SELECT id FROM auth.users WHERE auth.uid() = id)
                        AND pm.is_active = true        -- 現在アクティブなメンバー
                    )
                )
            ) INTO has_permission;
        END IF;
    END IF;
    
    --  最終判定結果を返す(true = 許可、false = 拒否)
    RETURN has_permission;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER STABLE;

初心者向け解説

この関数は、レストランの「入店許可システム」に例えると以下のような流れです:

ステップ レストランの例 実際の処理 判定結果
1 「この人は誰?」 auth.uid()でユーザー特定 ログイン確認
2 「どの支店での利用?」 組織コンテキスト決定 対象組織特定
3 「オーナー様ですか?」 スーパーユーザーチェック 全権限確認
4 「支店長ですか?」 組織オーナーチェック 組織内全権限確認
5 「利用許可証をお持ちですか?」 3ルートでの権限確認 具体的権限確認
6 「この席の予約者ですか?」 リソース所有者確認 特定データ権限

Step 2: 時間ベースアクセス制御(営業時間管理システム)

レストランに「平日9-18時のみ営業」「土日は休業」といった営業時間があるように、データベースアクセスにも時間制限を設けることができます:

--  時間ベースアクセス制御(営業時間チェックシステム)
CREATE OR REPLACE FUNCTION auth.check_time_based_access(
    resource_type text,     -- 「organization」「project」など制御対象
    resource_id bigint      -- 具体的なID
)
RETURNS boolean AS $$  -- true(営業時間内)かfalse(営業時間外)を返す
DECLARE
    current_time time;         -- 現在の時刻(例:14:30:00)
    current_dow int;           -- 現在の曜日(0=日曜日、1=月曜日...6=土曜日)
    access_schedule jsonb;     -- 営業時間の設定(JSON形式)
BEGIN
    --  現在の日時情報を取得
    current_time := CURRENT_TIME;                    -- 今何時?
    current_dow := EXTRACT(dow FROM CURRENT_DATE);   -- 今日は何曜日?
    
    --  リソースの営業時間設定を取得
    CASE resource_type
        WHEN 'organization' THEN
            --  組織の営業時間設定を取得
            SELECT settings->'access_schedule' INTO access_schedule
            FROM organizations WHERE id = resource_id;
        WHEN 'project' THEN
            --  プロジェクトの利用可能時間設定を取得
            SELECT settings->'access_schedule' INTO access_schedule
            FROM projects WHERE id = resource_id;
        ELSE
            --  その他の場合は24時間利用可能
            RETURN true; -- デフォルトは許可
    END CASE;
    
    --  営業時間の設定がない場合は24時間営業
    IF access_schedule IS NULL THEN
        RETURN true;
    END IF;
    
    --  現在の曜日・時間が営業時間内かチェック
    RETURN (
        --  今日が営業日かチェック(例:月曜日なら"1"のenabled = "true")
        access_schedule->(current_dow::text)->>'enabled' = 'true'
        AND (
            --  現在時刻が営業時間内かチェック(例:9:00-18:00の間)
            current_time BETWEEN 
                (access_schedule->(current_dow::text)->>'start_time')::time
                AND (access_schedule->(current_dow::text)->>'end_time')::time
        )
    );
END;
$$ LANGUAGE plpgsql SECURITY DEFINER STABLE;

初心者向け解説

この営業時間チェック関数の設定例(JSON形式):

{
  "access_schedule": {
    "0": { "enabled": false },                    // 日曜日は休業
    "1": { "enabled": true, "start_time": "09:00", "end_time": "18:00" },  // 月曜日 9-18
    "2": { "enabled": true, "start_time": "09:00", "end_time": "18:00" },  // 火曜日 9-18
    "3": { "enabled": true, "start_time": "09:00", "end_time": "18:00" },  // 水曜日 9-18
    "4": { "enabled": true, "start_time": "09:00", "end_time": "18:00" },  // 木曜日 9-18
    "5": { "enabled": true, "start_time": "09:00", "end_time": "18:00" },  // 金曜日 9-18
    "6": { "enabled": false }                     // 土曜日は休業
  }
}

Step 3: IPアドレス制限(入店許可リスト管理)

レストランで「会員カードをお持ちの方のみ入店可能」「特定地域の方は入店お断り」といった制限があるように、特定のIPアドレスからのみアクセス可能にすることができます:

--  IPアドレス制限(入店許可リスト管理システム)
CREATE OR REPLACE FUNCTION auth.check_ip_access(
    allowed_ips text[]      -- 許可するIPアドレスのリスト
)
RETURNS boolean AS $$  -- true(許可IPから)かfalse(禁止IPから)を返す
DECLARE
    client_ip inet;             -- クライアントのIPアドレス
    ip_allowed boolean := false; -- IP許可フラグ(初期値:拒否)
    allowed_ip text;            -- 許可IPリストの各項目
BEGIN
    --  クライアントのIPアドレス取得(どこからアクセスしているか)
    client_ip := COALESCE(
        -- プロキシ経由の場合(ロードバランサー越しなど)
        current_setting('request.headers', true)::json->>'x-forwarded-for',
        -- リバースプロキシ経由の場合
        current_setting('request.headers', true)::json->>'x-real-ip',
        -- ローカル環境の場合のデフォルト
        '127.0.0.1'
    )::inet;
    
    --  IP制限なしの場合は全て許可(フリーパス)
    IF allowed_ips IS NULL OR array_length(allowed_ips, 1) = 0 THEN
        RETURN true;
    END IF;
    
    --  許可IPリストを一つずつチェック
    FOREACH allowed_ip IN ARRAY allowed_ips
    LOOP
        --  CIDR記法対応(例:192.168.1.0/24 のような範囲指定)
        IF client_ip <<= allowed_ip::inet THEN
            ip_allowed := true;
            EXIT;  -- 一つでも一致すればOK(ループを抜ける)
        END IF;
    END LOOP;
    
    --  最終判定結果を返す
    RETURN ip_allowed;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER STABLE;

初心者向け解説

IPアドレス制限の設定例:

設定パターン 許可リスト例 用途 効果
単一IP ['192.168.1.100'] 管理者専用PC 特定の1台からのみアクセス可
社内ネットワーク ['192.168.1.0/24'] 社内からのみ 社内の256台からアクセス可
複数拠点 ['192.168.1.0/24', '10.0.0.0/8'] 本社+支社 複数のオフィスからアクセス可
制限なし [] または NULL 全世界 どこからでもアクセス可(デフォルト)

Step 4: 階層型組織のRLS実装(コンビニチェーン管理システム)

大手コンビニチェーンのように「本社→エリア→支店」の階層構造で、「エリアマネージャーは配下の全支店を管理可能」といった階層的な権限継承を実現します:

 コンビニチェーンの組織構造
本社(全社管理)
├── 関東エリア(エリア内管理)
│   ├── 渋谷支店
│   ├── 新宿支店
│   └── 池袋支店
└── 関西エリア(エリア内管理)
    ├── 大阪支店
    ├── 京都支店
    └── 神戸支店

 組織階層での権限継承
本社管理者 → 全エリア・全支店にアクセス可
エリアマネージャー → 配下の支店のみアクセス可
支店長 → 自分の支店のみアクセス可
--  階層組織アクセス制御(コンビニチェーン権限システム)
CREATE OR REPLACE FUNCTION auth.get_accessible_organizations(
    user_uuid uuid DEFAULT auth.uid(),      -- 対象ユーザー(デフォルト:現在のログインユーザー)
    include_inherited boolean DEFAULT true  -- 上位組織の権限を下位に継承するか
)
RETURNS TABLE(organization_id bigint, access_level text, inherited boolean) AS $$
BEGIN
    RETURN QUERY
    WITH RECURSIVE org_hierarchy AS (
        --  Step 1: 直接所属している組織を取得(本人が正式に配属されている場所)
        SELECT 
            uo.organization_id,                -- 組織ID
            uo.role as access_level,           -- アクセスレベル(owner/admin/member/viewer)
            false as inherited,                -- 直接権限なので継承ではない
            0 as depth                         -- 階層の深さ(0 = 直接所属)
        FROM user_organizations uo
        WHERE uo.user_id = user_uuid 
        AND uo.is_active = true                -- 現在アクティブな所属のみ
        
        UNION ALL
        
        --  Step 2: 上位組織の権限を下位組織に継承(権限の階層展開)
        SELECT 
            o.id as organization_id,           -- 下位組織のID
            CASE 
                -- 権限レベルを段階的に下げる(上位権限者が下位で暴走しないように)
                WHEN oh.access_level = 'owner' THEN 'admin'   -- オーナー → 管理者
                WHEN oh.access_level = 'admin' THEN 'member'   -- 管理者 → メンバー
                ELSE 'viewer'                                  -- その他 → 閲覧者
            END as access_level,
            true as inherited,                 -- 継承による権限
            oh.depth + 1                       -- 階層を一つ深く
        FROM organizations o
        JOIN org_hierarchy oh ON o.parent_id = oh.organization_id  -- 親子関係でジョイン
        WHERE include_inherited = true         -- 継承が有効な場合のみ
        AND oh.depth < 10                      -- 無限ループ防止(最大10階層)
        AND oh.access_level IN ('owner', 'admin')  -- 権限のある人のみ継承可能
    )
    --  結果の整理(同じ組織に複数の権限がある場合は最強の権限を選択)
    SELECT DISTINCT ON (oh.organization_id)
        oh.organization_id,
        oh.access_level,
        oh.inherited
    FROM org_hierarchy oh
    ORDER BY oh.organization_id, 
             oh.inherited ASC,                 -- 直接権限を優先
             CASE oh.access_level              -- 権限レベルの強さで順序付け
                WHEN 'owner' THEN 0 
                WHEN 'admin' THEN 1 
                WHEN 'member' THEN 2 
                ELSE 3 
             END;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER STABLE;

初心者向け解説

この階層組織システムの動作例:

ユーザー 直接所属 継承権限 アクセス可能組織 権限レベル
本社社長 本社(owner) 全エリア・全支店 本社、関東エリア、渋谷支店… admin→member
エリアマネージャー 関東エリア(admin) 配下支店のみ 関東エリア、渋谷支店、新宿支店… admin→member
支店長 渋谷支店(admin) なし 渋谷支店のみ admin
店員 渋谷支店(member) なし 渋谷支店のみ member

Step 5: 動的RLSポリシーの適用(総合セキュリティゲート)

これまで作った各種チェック関数を組み合わせて、包括的なセキュリティポリシーを作成します:

--  動的RLSポリシー適用(総合セキュリティゲートシステム)
CREATE POLICY "Dynamic organization access" ON projects
    FOR ALL USING (
        --  条件1: 組織メンバーシップチェック(この組織のメンバーか?)
        organization_id IN (
            SELECT organization_id 
            FROM auth.get_accessible_organizations()
        )
        --  条件2: 時間制限チェック(営業時間内か?)
        AND auth.check_time_based_access('project', id)
        --  条件3: 権限チェック(必要な権限を持っているか?)
        AND auth.user_has_permission('project:read', 'project', id)
        --  条件4: IPアドレス制限チェック(許可された場所からか?)
        AND auth.check_ip_access(
            (SELECT string_to_array(settings->>'allowed_ips', ',') 
             FROM organizations WHERE id = organization_id)
        )
    );

--  階層データアクセスポリシー(タスクテーブル用)
CREATE POLICY "Hierarchical data access" ON tasks
    FOR SELECT USING (
        --  条件1: プロジェクトの組織にアクセス権があるか
        project_id IN (
            SELECT p.id FROM projects p
            WHERE p.organization_id IN (
                SELECT organization_id 
                FROM auth.get_accessible_organizations()
            )
        )
        AND (
            --  条件2: タスクへの個人的な関係があるか
            created_by = auth.uid()              -- タスク作成者
            OR assignee_id = auth.uid()          -- タスク担当者
            OR auth.user_has_permission('task:read', 'task', id)  -- 権限保有者
        )
    );

初心者向け解説

この複合ポリシーは、空港のセキュリティチェックに例えると:

チェック段階 空港セキュリティ RLSポリシー 失敗時の結果
1 「チケットをお持ちですか?」 組織メンバーシップ データアクセス拒否
2 「搭乗時間内ですか?」 営業時間チェック 時間外アクセス拒否
3 「搭乗券の権限は?」 権限レベル確認 権限不足でアクセス拒否
4 「禁止地域からではないか?」 IPアドレス制限 不正地域からアクセス拒否

7.2 セキュリティ監査システム(見張り台の記録システム)

城の見張り台がすべての出入りを記録するように、データベースのすべての操作を記録・監視するシステムを構築します:

 古典的な見張り台システム
├──  監視:誰が・いつ・何を・したか
├──  記録:すべての出来事を詳細に記録
├── [CRITICAL] アラート:異常な行動を即座に通知
└──  分析:パターンを分析して脅威を検知

 デジタル監査システム
├──  監視:データアクセス・変更・削除を追跡
├──  記録:操作ログを改ざん不可能な形で保存
├── [CRITICAL] アラート:不審なアクセスパターンを検知
└──  分析:アクセスパターンから異常を検知

Step 1: セキュリティ監査ログテーブル設計

すべての操作を記録するための「デジタル監視記録簿」を作成します:

--  セキュリティ監査ログテーブル(デジタル見張り台の記録簿)
CREATE TABLE security_audit_logs (
    id bigserial PRIMARY KEY,                    -- ログID(通し番号)
    
    --  Who(誰が)
    user_id uuid REFERENCES auth.users(id),      -- 操作者のユーザーID
    user_email text,                             -- 操作者のメールアドレス(削除対策)
    user_role text,                              -- 操作時の役職・権限
    
    --  When(いつ)
    created_at timestamp with time zone DEFAULT now(),  -- 操作実行時刻
    
    --  What(何を)
    action text NOT NULL,                        -- 操作内容(CREATE/READ/UPDATE/DELETE)
    resource_type text NOT NULL,                 -- 対象リソース種類(projects/tasks/users)
    resource_id bigint,                          -- 対象リソースの具体的ID
    old_values jsonb,                            -- 変更前のデータ(UPDATE/DELETE用)
    new_values jsonb,                            -- 変更後のデータ(CREATE/UPDATE用)
    
    --  Where(どの組織で)
    organization_id bigint,                      -- 操作対象の組織
    
    --  セキュリティ情報(不正アクセス検知用)
    ip_address inet,                             -- アクセス元IPアドレス
    user_agent text,                             -- ブラウザ・アプリ情報
    session_id text,                             -- セッションID(追跡用)
    request_path text,                           -- アクセスしたURL/API
    
    -- [OK] アクセス結果
    access_granted boolean NOT NULL,             -- アクセス許可されたか(true/false)
    denial_reason text,                          -- 拒否の理由(権限不足/時間外/IP制限など)
    risk_score integer DEFAULT 0,               -- リスクスコア(0-100、高いほど危険)
    
    --  追加メタデータ
    request_details jsonb,                       -- リクエストの詳細情報(JSON)
    
    --  インデックス(高速検索用)
    INDEX idx_audit_user_time (user_id, created_at),
    INDEX idx_audit_resource (resource_type, resource_id),
    INDEX idx_audit_risk (risk_score DESC, created_at DESC),
    INDEX idx_audit_ip_suspicious (ip_address, access_granted)
);

--  監査ログの自動RLS(監視記録の閲覧制限)
ALTER TABLE security_audit_logs ENABLE ROW LEVEL SECURITY;

CREATE POLICY "Audit log access" ON security_audit_logs
    FOR SELECT USING (
        --  セキュリティ管理者:全ログ閲覧可能(監査部門の権限)
        auth.user_has_permission('security:audit_read')
        OR
        --  組織管理者:自組織のログのみ閲覧可能(支社長は自支社のみ)
        (
            auth.user_has_permission('organization:admin')
            AND organization_id IN (
                SELECT organization_id 
                FROM auth.get_accessible_organizations()
                WHERE access_level IN ('owner', 'admin')
            )
        )
        OR
        --  一般ユーザー:自分の活動記録のみ閲覧可能
        user_id = auth.uid()
    );

--  監査ログ記録関数(自動監視システム)
CREATE OR REPLACE FUNCTION log_security_event(
    p_action text,                      -- 操作種類(LOGIN/CREATE/UPDATE/DELETE)
    p_resource_type text,               -- 対象リソース(users/projects/tasks)
    p_resource_id bigint DEFAULT NULL,  -- 対象ID
    p_access_granted boolean DEFAULT true,  -- アクセス成功/失敗
    p_denial_reason text DEFAULT NULL,  -- 失敗理由
    p_request_details jsonb DEFAULT NULL  -- 詳細情報
)
RETURNS void AS $$
DECLARE
    current_user_id uuid;              -- 現在のユーザーID
    current_org_id bigint;             -- 現在の組織ID
    client_ip inet;                    -- クライアントIP
    client_user_agent text;            -- ブラウザ情報
    calculated_risk_score integer;     -- 計算されたリスクスコア
BEGIN
    --  現在のユーザー情報取得
    current_user_id := auth.uid();
    
    --  組織コンテキスト取得
    SELECT organization_id INTO current_org_id
    FROM current_user_context 
    WHERE user_id = current_user_id 
    LIMIT 1;
    
    --  クライアント情報取得(どこからアクセスしているか)
    client_ip := COALESCE(
        current_setting('request.headers', true)::json->>'x-forwarded-for',
        current_setting('request.headers', true)::json->>'x-real-ip',
        '127.0.0.1'
    )::inet;
    
    client_user_agent := current_setting('request.headers', true)::json->>'user-agent';
    
    -- [WARN] リスクスコア計算(不審度判定)
    calculated_risk_score := 0;
    
    -- 失敗アクセスはリスク+30
    IF NOT p_access_granted THEN
        calculated_risk_score := calculated_risk_score + 30;
    END IF;
    
    -- 管理者系操作はリスク+20
    IF p_action IN ('DELETE', 'ADMIN_LOGIN', 'PERMISSION_CHANGE') THEN
        calculated_risk_score := calculated_risk_score + 20;
    END IF;
    
    -- 深夜時間帯(22時-6時)はリスク+10
    IF EXTRACT(hour FROM now()) BETWEEN 22 AND 23 OR EXTRACT(hour FROM now()) BETWEEN 0 AND 6 THEN
        calculated_risk_score := calculated_risk_score + 10;
    END IF;
    
    --  監査ログ記録実行
    INSERT INTO security_audit_logs (
        user_id, user_email, user_role,         -- Who(誰が)
        action, resource_type, resource_id,     -- What(何を)
        organization_id,                        -- Where(どの組織で)
        ip_address, user_agent, request_path,   -- How(どのような方法で)
        access_granted, denial_reason,          -- Result(結果)
        risk_score, request_details             -- Analysis(分析情報)
    ) VALUES (
        current_user_id,
        (SELECT email FROM auth.users WHERE id = current_user_id),
        (SELECT role FROM user_organizations WHERE user_id = current_user_id AND organization_id = current_org_id LIMIT 1),
        p_action,
        p_resource_type,
        p_resource_id,
        current_org_id,
        client_ip,
        client_user_agent,
        current_setting('request.path', true),
        p_access_granted,
        p_denial_reason,
        calculated_risk_score,
        p_request_details
    );
    
    -- [CRITICAL] 高リスクイベントの即座通知(リスクスコア70以上)
    IF calculated_risk_score >= 70 THEN
        PERFORM notify_security_alert(
            'high_risk_activity',
            json_build_object(
                'user_id', current_user_id,
                'action', p_action,
                'risk_score', calculated_risk_score,
                'ip_address', client_ip
            )
        );
    END IF;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

初心者向け解説

監査ログシステムの活用例:

使用場面 検索クエリ例 見つかる情報 対処方法
不正ログイン調査 access_granted = false AND action = 'LOGIN' 失敗したログイン試行 IP制限・アカウントロック
データ削除の追跡 action = 'DELETE' AND resource_type = 'projects' プロジェクト削除履歴 バックアップからの復旧
深夜の不審活動 risk_score >= 50 AND created_at::time BETWEEN '22:00' AND '06:00' 深夜の高リスク操作 管理者への通知・調査
特定IPの監視 ip_address = '192.168.1.100' 特定端末からのアクセス 端末の安全性確認

Step 2: 実用的な監査レポート生成

経営陣や監査部門向けの分かりやすいレポートを自動生成する機能を作成します:

--  セキュリティサマリーレポート生成関数
CREATE OR REPLACE FUNCTION generate_security_summary_report(
    start_date date DEFAULT CURRENT_DATE - INTERVAL '30 days',
    end_date date DEFAULT CURRENT_DATE,
    organization_filter bigint DEFAULT NULL
)
RETURNS TABLE(
    report_section text,
    metric_name text, 
    metric_value text,
    risk_level text,
    recommendations text
) AS $$
BEGIN
    --  基本統計情報
    RETURN QUERY
    SELECT 
        '基本統計'::text,
        '総アクセス数'::text,
        COUNT(*)::text,
        CASE 
            WHEN COUNT(*) > 10000 THEN 'HIGH'
            WHEN COUNT(*) > 1000 THEN 'MEDIUM'
            ELSE 'LOW'
        END::text,
        '通常の利用レベルです'::text
    FROM security_audit_logs 
    WHERE created_at::date BETWEEN start_date AND end_date
    AND (organization_filter IS NULL OR organization_id = organization_filter);
    
    -- [CRITICAL] 失敗アクセス統計
    RETURN QUERY
    SELECT 
        '失敗アクセス'::text,
        '失敗ログイン数'::text,
        COUNT(*)::text,
        CASE 
            WHEN COUNT(*) > 100 THEN 'HIGH'
            WHEN COUNT(*) > 10 THEN 'MEDIUM'
            ELSE 'LOW'
        END::text,
        CASE 
            WHEN COUNT(*) > 100 THEN 'IP制限の強化を推奨'
            WHEN COUNT(*) > 10 THEN '監視を継続'
            ELSE '問題なし'
        END::text
    FROM security_audit_logs 
    WHERE created_at::date BETWEEN start_date AND end_date
    AND access_granted = false
    AND (organization_filter IS NULL OR organization_id = organization_filter);
    
    -- [WARN] 高リスク活動統計
    RETURN QUERY
    SELECT 
        '高リスク活動'::text,
        'リスクスコア50以上'::text,
        COUNT(*)::text,
        CASE 
            WHEN COUNT(*) > 50 THEN 'HIGH'
            WHEN COUNT(*) > 10 THEN 'MEDIUM'
            ELSE 'LOW'
        END::text,
        CASE 
            WHEN COUNT(*) > 50 THEN '詳細調査が必要'
            WHEN COUNT(*) > 10 THEN '継続監視推奨'
            ELSE '通常レベル'
        END::text
    FROM security_audit_logs 
    WHERE created_at::date BETWEEN start_date AND end_date
    AND risk_score >= 50
    AND (organization_filter IS NULL OR organization_id = organization_filter);
    
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

7.3 脆弱性対策と侵入検知(城の警備隊システム)

城に侵入者が現れたとき、即座に検知して対処する警備隊のように、システムへの攻撃を自動検知・防御するシステムを構築します:

 城の警備隊システム
├──  パトロール:定期的な脆弱性スキャン
├──  監視兵:リアルタイム侵入検知
├── [CRITICAL] 警報:異常検知時の即座アラート
└──  防御:自動ブロック・反撃機能

 デジタル侵入検知システム
├──  脆弱性スキャン:定期的なセキュリティチェック
├──  異常検知:パターン分析による攻撃検知
├── [CRITICAL] アラート:管理者への即座通知
└──  自動防御:IP遮断・レート制限・アカウント保護

Step 1: 異常アクセスパターン検知

「同じIPから短時間で大量ログイン失敗」「深夜の管理者権限操作」といった異常パターンを自動検知します:

--  異常アクセスパターン検知関数(不審者発見システム)
CREATE OR REPLACE FUNCTION detect_suspicious_activities()
RETURNS TABLE(
    threat_type text,
    description text,
    severity text,
    evidence jsonb,
    recommended_action text
) AS $$
BEGIN
    -- [CRITICAL] パターン1: 短時間での大量ログイン失敗(ブルートフォース攻撃)
    RETURN QUERY
    SELECT 
        'brute_force_attack'::text,
        'ブルートフォース攻撃の可能性'::text,
        'HIGH'::text,
        json_build_object(
            'ip_address', ip_address,
            'failed_attempts', COUNT(*),
            'time_window', '過去1時間'
        )::jsonb,
        'IPアドレスの即座ブロック推奨'::text
    FROM security_audit_logs 
    WHERE created_at > NOW() - INTERVAL '1 hour'
    AND access_granted = false
    AND action = 'LOGIN'
    GROUP BY ip_address
    HAVING COUNT(*) >= 10;  -- 1時間で10回以上のログイン失敗
    
    --  パターン2: 深夜の管理者権限操作(内部犯行の可能性)
    RETURN QUERY
    SELECT 
        'suspicious_admin_activity'::text,
        '深夜の管理者権限操作'::text,
        'MEDIUM'::text,
        json_build_object(
            'user_id', user_id,
            'user_email', user_email,
            'actions', array_agg(action),
            'time_range', '22:00-06:00'
        )::jsonb,
        'ユーザーへの確認とアクセス制限検討'::text
    FROM security_audit_logs 
    WHERE created_at::time BETWEEN '22:00' AND '06:00'
    AND action IN ('DELETE', 'PERMISSION_CHANGE', 'USER_CREATE')
    AND created_at > NOW() - INTERVAL '24 hours'
    GROUP BY user_id, user_email
    HAVING COUNT(*) >= 3;  -- 深夜に3回以上の管理操作
    
    --  パターン3: 異常な地理的アクセス(不正アクセスの可能性)
    RETURN QUERY
    SELECT 
        'unusual_geographic_access'::text,
        '複数地域からの同時アクセス'::text,
        'MEDIUM'::text,
        json_build_object(
            'user_id', user_id,
            'ip_addresses', array_agg(DISTINCT ip_address),
            'access_count', COUNT(DISTINCT ip_address)
        )::jsonb,
        'アカウント乗っ取りの可能性・パスワード変更推奨'::text
    FROM security_audit_logs 
    WHERE created_at > NOW() - INTERVAL '1 hour'
    AND access_granted = true
    GROUP BY user_id
    HAVING COUNT(DISTINCT ip_address) >= 3;  -- 1時間で3つ以上の異なるIP
    
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

初心者向け解説

異常検知システムの判定基準:

攻撃パターン 検知条件 深刻度 対処方法 実際の被害例
ブルートフォース 1時間で10回以上ログイン失敗 HIGH IP即座ブロック パスワード総当たり攻撃
内部犯行 深夜に管理操作3回以上 MEDIUM ユーザー確認 不正データ削除・持出し
アカウント乗っ取り 1時間で3つ以上の異なるIP MEDIUM パスワード変更促進 なりすましによる情報流出
権限昇格攻撃 短時間での権限変更試行 HIGH アカウント停止 管理者権限の不正取得

Step 2: 自動防御システム(自動迎撃機能)

異常を検知したら、人間の判断を待たずに自動的に防御措置を実行するシステムを構築します:

--  自動防御システム(自動迎撃機能)
CREATE OR REPLACE FUNCTION execute_automatic_defense(
    threat_type text,
    evidence jsonb
)
RETURNS jsonb AS $$
DECLARE
    defense_actions jsonb := '[]'::jsonb;
    blocked_ip inet;
    suspended_user_id uuid;
    alert_sent boolean := false;
BEGIN
    -- [CRITICAL] ブルートフォース攻撃への対処
    IF threat_type = 'brute_force_attack' THEN
        blocked_ip := (evidence->>'ip_address')::inet;
        
        --  IPアドレスを一時ブロック(24時間)
        INSERT INTO ip_blacklist (ip_address, reason, expires_at, created_by)
        VALUES (
            blocked_ip,
            'Brute force attack detected',
            NOW() + INTERVAL '24 hours',
            '00000000-0000-0000-0000-000000000000'  -- システム自動
        );
        
        defense_actions := defense_actions || 
            json_build_object('action', 'ip_blocked', 'target', blocked_ip, 'duration', '24時間')::jsonb;
    END IF;
    
    --  不審なユーザー活動への対処
    IF threat_type IN ('suspicious_admin_activity', 'unusual_geographic_access') THEN
        suspended_user_id := (evidence->>'user_id')::uuid;
        
        --  ユーザーアカウントを一時停止
        UPDATE auth.users 
        SET is_suspended = true, 
            suspension_reason = threat_type,
            suspended_at = NOW()
        WHERE id = suspended_user_id;
        
        --  すべてのセッションを無効化
        UPDATE user_sessions 
        SET is_active = false, 
            terminated_at = NOW(),
            termination_reason = 'security_suspension'
        WHERE user_id = suspended_user_id AND is_active = true;
        
        defense_actions := defense_actions || 
            json_build_object('action', 'user_suspended', 'user_id', suspended_user_id)::jsonb;
    END IF;
    
    --  管理者への緊急通知
    INSERT INTO security_alerts (
        alert_type, 
        severity, 
        message, 
        evidence, 
        auto_actions_taken,
        created_at
    ) VALUES (
        threat_type,
        'HIGH',
        '自動防御システムが脅威を検知し、防御措置を実行しました',
        evidence,
        defense_actions,
        NOW()
    );
    
    alert_sent := true;
    defense_actions := defense_actions || 
        json_build_object('action', 'alert_sent', 'status', 'success')::jsonb;
    
    --  防御実行結果を返す
    RETURN json_build_object(
        'defense_executed', true,
        'threat_type', threat_type,
        'actions_taken', defense_actions,
        'timestamp', NOW()
    );
    
EXCEPTION
    WHEN OTHERS THEN
        -- [NG] 防御実行失敗時のログ
        INSERT INTO system_errors (error_type, error_message, context)
        VALUES ('defense_system_failure', SQLERRM, evidence);
        
        RETURN json_build_object(
            'defense_executed', false,
            'error', SQLERRM,
            'timestamp', NOW()
        );
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

Step 3: 定期的な脅威スキャン(巡回警備システム)

城の警備兵が定期的に巡回するように、定期的に脅威をスキャンして予防するシステムを構築します:

--  定期脅威スキャン関数(巡回警備システム)
CREATE OR REPLACE FUNCTION perform_security_patrol()
RETURNS jsonb AS $$
DECLARE
    scan_result jsonb := '{"threats_detected": [], "scan_summary": {}}'::jsonb;
    threat_record record;
    total_threats integer := 0;
    high_severity_threats integer := 0;
BEGIN
    --  異常活動の検知実行
    FOR threat_record IN 
        SELECT * FROM detect_suspicious_activities()
    LOOP
        total_threats := total_threats + 1;
        
        -- 高深刻度の脅威をカウント
        IF threat_record.severity = 'HIGH' THEN
            high_severity_threats := high_severity_threats + 1;
            
            -- [CRITICAL] 自動防御システム発動
            PERFORM execute_automatic_defense(
                threat_record.threat_type, 
                threat_record.evidence
            );
        END IF;
        
        -- 脅威リストに追加
        scan_result := jsonb_set(
            scan_result,
            '{threats_detected}',
            (scan_result->'threats_detected') || 
            json_build_object(
                'type', threat_record.threat_type,
                'description', threat_record.description,
                'severity', threat_record.severity,
                'evidence', threat_record.evidence,
                'action', threat_record.recommended_action
            )::jsonb
        );
    END LOOP;
    
    --  スキャン結果サマリー
    scan_result := jsonb_set(
        scan_result,
        '{scan_summary}',
        json_build_object(
            'scan_time', NOW(),
            'total_threats', total_threats,
            'high_severity_threats', high_severity_threats,
            'auto_defenses_activated', high_severity_threats,
            'system_status', CASE 
                WHEN high_severity_threats > 0 THEN 'ALERT'
                WHEN total_threats > 0 THEN 'WARNING' 
                ELSE 'NORMAL'
            END
        )::jsonb
    );
    
    --  スキャン履歴を記録
    INSERT INTO security_scan_history (
        scan_type,
        scan_result,
        threats_found,
        created_at
    ) VALUES (
        'automated_patrol',
        scan_result,
        total_threats,
        NOW()
    );
    
    RETURN scan_result;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

初心者向け解説

定期スキャンシステムの動作例:

スキャン時間 検知される脅威 自動対処 管理者通知
毎時00分 ブルートフォース攻撃 IP自動ブロック 緊急メール送信
毎時30分 不審なログイン ユーザー一時停止 Slack通知
深夜2時 管理者権限乱用 セッション強制終了 SMS緊急通報
通常時 脅威なし 対処なし 日報に記録

第7章 学習まとめと実践演習

学習チェックリスト

基礎レベル(初心者)

  • [] RLSの基本概念を理解している
  • [] 権限システムの3つのルート(直接・ロール・組織)を説明できる
  • [] 監査ログの重要性を理解している
  • [] 基本的な異常検知パターンを知っている

応用レベル(中級者)

  • [] 時間ベースアクセス制御を実装できる
  • [] IPアドレス制限を設定できる
  • [] 階層組織での権限継承を設計できる
  • [] セキュリティ監査レポートを生成できる

発展レベル(上級者)

  • [] 動的RLSポリシーを作成できる
  • [] 異常検知システムを構築できる
  • [] 自動防御システムを実装できる
  • [] 包括的なセキュリティ戦略を立案できる

実践演習

演習1: 基本的なRLSポリシー作成( 基礎)

以下の要件でRLSポリシーを作成してください:

-- 要件:
-- 1. ユーザーは自分が作成したタスクのみ閲覧可能
-- 2. プロジェクトメンバーは プロジェクト内のタスクを閲覧可能
-- 3. 組織管理者は組織内の全タスクを閲覧可能

-- 解答例(テーブル/カラム名は本章のスキーマに合わせて調整)
CREATE POLICY "task_access_policy" ON tasks
FOR SELECT USING (
    -- 1. ユーザーは自分が作成したタスクのみ閲覧可能
    created_by = auth.uid()
    OR
    -- 2. プロジェクトメンバーは プロジェクト内のタスクを閲覧可能
    project_id IN (
        SELECT pm.project_id
        FROM project_members pm
        WHERE pm.user_id = auth.uid()
          AND pm.is_active = true
    )
    OR
    -- 3. 組織管理者(owner/admin)は組織内の全タスクを閲覧可能
    project_id IN (
        SELECT p.id
        FROM projects p
        JOIN user_organizations uo ON uo.organization_id = p.organization_id
        WHERE uo.user_id = auth.uid()
          AND uo.is_active = true
          AND uo.role IN ('owner', 'admin')
    )
);

演習2: 監査ログ分析( 応用)

以下の監査ログから異常を特定し、対処法を提案してください:

-- サンプルデータ
SELECT user_email, action, created_at, ip_address, access_granted 
FROM security_audit_logs 
WHERE created_at > '2024-01-01 02:00:00'::timestamp;

-- 結果例:
-- admin@company.com, DELETE, 2024-01-01 02:15:00, 192.168.1.100, true
-- admin@company.com, DELETE, 2024-01-01 02:16:00, 192.168.1.100, true  
-- admin@company.com, PERMISSION_CHANGE, 2024-01-01 02:17:00, 192.168.1.100, true

質問: このログから何が読み取れますか?どのような対処が必要でしょうか?

演習3: セキュリティシステム設計( 発展)

中規模SaaS企業(従業員100名、顧客データ10万件)のセキュリティシステムを設計してください:

要件:

  • 多段階認証の実装
  • 役割ベースアクセス制御
  • リアルタイム異常検知
  • 自動インシデント対応
  • コンプライアンス対応

実装課題

  1. 認証強化: 多要素認証(MFA)システムの実装
  2. 監査機能: SOC2準拠の監査ログシステム構築
  3. 異常検知: 機械学習を使った異常検知の導入
  4. 自動化: DevSecOpsパイプラインでの自動セキュリティテスト

成果物チェックリスト

実装完了時に以下を確認してください:

  • [] RLSポリシーが適切に設定されている
  • [] 権限システムが正常に動作している
  • [] 監査ログが記録されている
  • [] 異常検知システムが稼働している
  • [] 自動防御システムが機能している
  • [] セキュリティドキュメントが整備されている
  • [] インシデント対応手順が確立されている

推奨学習時間

  • 基礎習得: 8〜12時間
  • 応用実装: 12〜16時間
  • 発展システム: 16〜24時間
  • 実践課題: 20〜30時間

合計: 56〜82時間


第7章 学習まとめ

[OK] 習得できたスキル

  • [OK] エンタープライズ級多層セキュリティ防御システム構築
  • [OK] 高度なRLS・動的権限制御・組織階層管理
  • [OK] セキュリティ監査・コンプライアンス対応システム
  • [OK] 脅威検知・侵入防止・自動対応システム実装

セキュリティレベル比較

| セキュリティ要素 | 第1章〜第2章 (基礎) | 第7章 (エンタープライズ) | 適用場面 | |:—————-|:——————|:—————————-|:——–| | 認証| 基本JWT | 多要素認証・OAuth・SAML | 金融・医療・政府 | | 認可| 簡単RLS | 動的権限・階層制御 | 大企業・規制業界 | | 監査| [NG] なし | 完全監査証跡 | コンプライアンス | | 脅威対策| [WARN] 基本的 | [CRITICAL] AI脅威検知・自動対応 | 高セキュリティ環境 |

次の学習ステップ

第8章で学ぶ運用監視の前提知識:

  • [OK] セキュリティ監視システムの構築経験(運用監視への拡張)
  • [OK] ログ収集・分析の実装経験(システム監視への応用)
  • [OK] 自動対応システムの実装経験(運用自動化への応用)
  • [OK] アラート・通知システムの実装経験(運用アラートへの応用)

次章予告:運用監視と自動化

第8章では、「病院の生命維持管理システム」レベルの運用監視を実装します:

  • リアルタイム監視: システム健康状態の24時間監視
  • [CRITICAL] インテリジェント・アラート: AI による異常検知と自動対応
  • 自動復旧: 障害の自動検知・診断・修復システム
  • 運用ダッシュボード: 総合的なシステム状況可視化

実装目標: 「人間の睡眠時間も含めて、24時間365日安定稼働するシステム運用」


ナビゲーション