系統架構與核心組件

返回 主文件

本文件詳細說明 LLM 驅動 WEB 安全性健康報告生成系統的整體架構設計、核心組件與技術選型。


1. 整體架構概覽

1.1 設計理念

本系統採用微服務導向的雲原生架構,核心設計原則包括:

1.2 架構圖

┌─────────────────────────────────────────────────────────────────┐
│                          使用者層                                │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │ Web Browser  │  │  Mobile App  │  │   CLI Tool   │          │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘          │
│         │                  │                  │                   │
└─────────┼──────────────────┼──────────────────┼───────────────────┘
          │                  │                  │
          └──────────────────┴──────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                       API Gateway                                │
│              (Nginx / Kong / AWS API Gateway)                    │
│          [認證] [授權] [速率限制] [請求路由]                     │
└─────────────────────────┬───────────────────────────────────────┘
                          │
          ┌───────────────┼───────────────┐
          │               │               │
          ▼               ▼               ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│   Frontend   │  │   Backend    │  │   Auth       │
│   Service    │  │   Service    │  │   Service    │
│ (Next.js)    │  │  (FastAPI)   │  │  (Keycloak)  │
└──────────────┘  └──────┬───────┘  └──────────────┘
                         │
                         ▼
                  ┌──────────────────┐
                  │  LLM Orchestrator │ ◄─── LLM API (OpenAI/Claude)
                  │   協調核心         │
                  └──────┬───────────┘
                         │
          ┌──────────────┼──────────────┐
          │              │              │
          ▼              ▼              ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│  Knowledge   │  │   Task       │  │  Sandbox     │
│  Base        │  │   Queue      │  │  Manager     │
│ (PostgreSQL) │  │  (Redis)     │  │  (Docker)    │
└──────────────┘  └──────────────┘  └──────┬───────┘
                                            │
                                            ▼
                                    ┌──────────────────┐
                                    │  Execution       │
                                    │  Sandbox Pool    │
                                    │ ┌────┐ ┌────┐   │
                                    │ │ C1 │ │ C2 │   │ (Container)
                                    │ └────┘ └────┘   │
                                    └──────────────────┘
                                            │
                                            ▼
                                    ┌──────────────────┐
                                    │  Tool            │
                                    │  Integration     │
                                    │  Layer           │
                                    │ (nmap, curl...)  │
                                    └──────────────────┘

2. 核心組件詳述

2.1 使用者介面 (UI - Web Portal)

技術棧

核心功能模組

1. 儀表板 (Dashboard)
- 最近掃描歷史
- 風險趨勢圖表
- 待處理漏洞統計

2. 掃描啟動介面
- 四種模式的切換 UI
- 目標輸入與驗證
- 掃描參數設定

3. 對話介面 (Conversational UI)
- 仿 ChatGPT 的對話視窗
- 支援 Markdown 渲染
- 腳本預覽與確認機制

4. 報告檢視器
- 結構化結果展示
- 檢測邏輯圖視覺化
- 匯出功能(PDF / JSON)

5. 教學案例庫瀏覽器
- 搜尋與篩選功能
- 案例詳情頁
- 標記與收藏功能

安全考量


2.2 後端服務 (Backend Service)

技術棧

核心 API 端點

# 掃描管理
POST   /api/v1/scans              # 創建新掃描任務
GET    /api/v1/scans/{scan_id}    # 查詢掃描狀態
DELETE /api/v1/scans/{scan_id}    # 取消/刪除掃描

# 結果查詢
GET    /api/v1/scans/{scan_id}/results      # 取得掃描結果
GET    /api/v1/scans/{scan_id}/raw-output   # 取得原始輸出

# 對話模式
POST   /api/v1/conversations           # 開始對話
POST   /api/v1/conversations/{id}/messages  # 發送訊息

# 教學案例
GET    /api/v1/educational-cases      # 瀏覽案例庫
GET    /api/v1/educational-cases/{id} # 取得案例詳情

# CVE 查詢
GET    /api/v1/cve/search?q=apache    # 搜尋 CVE
GET    /api/v1/cve/{cve_id}           # 取得 CVE 詳情

# 系統管理
GET    /api/v1/health                 # 健康檢查
GET    /api/v1/metrics                # 系統指標

Pydantic 模型示例

from pydantic import BaseModel, HttpUrl, Field
from typing import Literal, Optional
from datetime import datetime

class ScanCreateRequest(BaseModel):
    target: HttpUrl
    scan_mode: Literal["guided", "intelligence", "conversational", "automated"]
    selected_tests: Optional[list[str]] = None
    options: Optional[dict] = None

class ScanStatusResponse(BaseModel):
    scan_id: str
    status: Literal["pending", "running", "completed", "failed"]
    progress: int = Field(ge=0, le=100)
    current_step: Optional[str] = None
    created_at: datetime
    estimated_completion: Optional[datetime] = None

2.3 LLM 協調核心 (LLM Orchestration Core)

這是系統的「大腦」,負責智能決策與工作流調度。

核心職責

  1. Prompt 工程與生成
  2. 腳本生成與驗證
  3. 結果解釋與分析
  4. 對話管理
  5. 工具選擇邏輯

架構設計

class LLMOrchestrator:
    """
    LLM 協調核心 - 系統智能的中樞
    """
    def __init__(self, llm_client, knowledge_base, sandbox_manager):
        self.llm_client = llm_client  # OpenAI / Anthropic 客戶端
        self.kb = knowledge_base      # 知識庫存取
        self.sandbox = sandbox_manager

    async def process_scan_request(self, request: ScanRequest) -> ScanPlan:
        """
        根據掃描請求生成執行計畫
        """
        # 1. 意圖理解
        intent = await self._understand_intent(request)

        # 2. CVE 情資查詢(模式 B)
        if request.mode == "intelligence":
            cve_suggestions = await self._query_cve_database(request.target)
            intent.enrich_with_cve(cve_suggestions)

        # 3. 工具選擇
        tools = await self._select_tools(intent)

        # 4. 生成執行計畫
        plan = ScanPlan(intent=intent, tools=tools)
        return plan

    async def generate_script(self, task: Task) -> ScriptArtifact:
        """
        針對特定任務生成 Python 腳本
        """
        prompt = self._build_script_generation_prompt(task)
        response = await self.llm_client.complete(prompt)

        # 腳本驗證
        script = self._extract_code_from_response(response)
        if not self._validate_script_safety(script):
            raise SecurityError("Generated script contains unsafe operations")

        return ScriptArtifact(code=script, task=task)

    async def interpret_results(self, raw_output: str, context: Context) -> Analysis:
        """
        解釋掃描結果
        """
        prompt = f"""
        As a senior cybersecurity analyst, interpret the following scan results:

        Context:
        - Target: {context.target}
        - Tool used: {context.tool}
        - Goal: {context.goal}

        Raw Output:
        {raw_output}

        Provide:
        1. Executive summary
        2. Detailed findings with severity
        3. Remediation recommendations
        4. Educational explanation
        """

        response = await self.llm_client.complete(prompt)
        return Analysis.from_llm_response(response)

Prompt 模板庫

系統維護一個 Prompt 模板庫,針對不同工具與任務:

模板範例:nmap 版本探測

NMAP_VERSION_DETECTION_PROMPT = """
As a senior penetration tester, write a Python script to perform service version detection on the target {target}.

Requirements:
1. Use the 'subprocess' module to call nmap
2. Scan ports: {ports}
3. Use the -sV flag for version detection
4. Output results in XML format using -oX -
5. Parse the XML and print a JSON structure with:
   - port (int)
   - protocol (str)
   - service_name (str)
   - service_version (str)
6. Handle errors gracefully
7. Set a timeout of 300 seconds

IMPORTANT SECURITY RULES:
- Do NOT execute shell commands directly with shell=True
- Do NOT accept user input without validation
- Use only the nmap binary at /usr/bin/nmap

Example output format:
[
  {{"port": 80, "protocol": "tcp", "service_name": "http", "service_version": "Apache httpd 2.4.41"}}
]
"""

模板範例:CVE 專用漏洞驗證

CVE_VERIFICATION_PROMPT = """
As a senior penetration tester, write a Python script to test if the target {target} is vulnerable to {cve_id}.

CVE Details:
{cve_description}

Verification Method:
{verification_method}

Requirements:
1. Use the 'requests' library for HTTP interactions
2. Implement the exact verification steps described above
3. Print "VULNERABLE" if the target is confirmed vulnerable
4. Print "NOT_VULNERABLE" if the target appears patched
5. Print "UNCERTAIN" if the test is inconclusive
6. Include evidence in JSON format
7. Set appropriate timeouts and error handling

IMPORTANT:
- This is authorized security testing
- Do NOT cause damage or disruption
- Use minimal payloads necessary for verification
"""

2.4 安全執行沙箱 (Secure Execution Sandbox)

設計原則

Zero-Trust 原則:假設生成的腳本可能包含惡意程式碼或錯誤。

Docker 容器配置

Dockerfile

FROM python:3.11-slim

# 安裝安全工具
RUN apt-get update && apt-get install -y \
    nmap \
    curl \
    dnsutils \
    nikto \
    && rm -rf /var/lib/apt/lists/*

# 安裝 Python 函式庫
RUN pip install --no-cache-dir \
    requests \
    beautifulsoup4 \
    python-nmap \
    lxml

# 建立非 root 使用者
RUN useradd -m -u 1000 scanner
USER scanner
WORKDIR /workspace

# 限制寫入權限
RUN mkdir /workspace/output && chmod 700 /workspace/output

ENTRYPOINT ["python", "/workspace/script.py"]

Docker Run 參數

docker run \
  --rm \                           # 執行後自動刪除
  --network=scan_net \             # 隔離網路
  --cpus="1.0" \                   # CPU 限制
  --memory="512m" \                # 記憶體限制
  --memory-swap="512m" \
  --pids-limit=100 \               # 行程數限制
  --read-only \                    # 唯讀檔案系統
  --tmpfs /tmp:rw,noexec,nosuid \  # 臨時檔案系統
  --security-opt=no-new-privileges \
  --cap-drop=ALL \                 # 移除所有能力
  --cap-add=NET_RAW \              # 僅添加必要能力(nmap)
  -v $(pwd)/script.py:/workspace/script.py:ro \
  -v $(pwd)/output:/workspace/output:rw \
  scanner-sandbox:latest

沙箱管理器 (Python)

import docker
import asyncio
from typing import Optional

class SandboxManager:
    def __init__(self):
        self.client = docker.from_env()
        self.max_concurrent = 5
        self.semaphore = asyncio.Semaphore(self.max_concurrent)

    async def execute_script(
        self,
        script: str,
        timeout: int = 300
    ) -> ExecutionResult:
        """
        在隔離沙箱中執行腳本
        """
        async with self.semaphore:  # 限制並行數
            container = None
            try:
                # 準備腳本檔案
                script_path = self._prepare_script_file(script)

                # 啟動容器
                container = self.client.containers.run(
                    image="scanner-sandbox:latest",
                    volumes={
                        script_path: {"bind": "/workspace/script.py", "mode": "ro"}
                    },
                    network_mode="scan_net",
                    mem_limit="512m",
                    cpu_period=100000,
                    cpu_quota=100000,  # 1 CPU
                    detach=True,
                    remove=False  # 手動刪除以捕獲日誌
                )

                # 等待執行完成(含 timeout)
                result = await asyncio.wait_for(
                    self._wait_container(container),
                    timeout=timeout
                )

                # 收集輸出
                logs = container.logs(stdout=True, stderr=True).decode('utf-8')

                return ExecutionResult(
                    exit_code=result,
                    stdout=logs,
                    stderr="",
                    execution_time=self._get_execution_time(container)
                )

            except asyncio.TimeoutError:
                if container:
                    container.kill()
                raise ExecutionTimeoutError(f"Script execution exceeded {timeout}s")

            finally:
                if container:
                    container.remove(force=True)
                self._cleanup_script_file(script_path)

    async def _wait_container(self, container):
        """異步等待容器完成"""
        while True:
            container.reload()
            if container.status == "exited":
                return container.attrs["State"]["ExitCode"]
            await asyncio.sleep(1)

網路隔離策略

使用 Docker 自訂網路,並透過 iptables 規則限制:

# 建立隔離網路
docker network create \
  --driver bridge \
  --subnet 172.20.0.0/16 \
  --opt "com.docker.network.bridge.enable_icc=false" \
  scan_net

# iptables 規則(僅允許外部掃描,禁止存取內部網路)
iptables -A DOCKER-USER -s 172.20.0.0/16 -d 10.0.0.0/8 -j DROP
iptables -A DOCKER-USER -s 172.20.0.0/16 -d 172.16.0.0/12 -j DROP
iptables -A DOCKER-USER -s 172.20.0.0/16 -d 192.168.0.0/16 -j DROP

2.5 數據轉換器 (Data Transformer)

負責將各種工具的原始輸出解析為統一的 JSON 結構。

架構設計

from abc import ABC, abstractmethod
import json
import xml.etree.ElementTree as ET

class OutputParser(ABC):
    """解析器基類"""
    @abstractmethod
    def parse(self, raw_output: str) -> list[Finding]:
        pass

class NmapXMLParser(OutputParser):
    """nmap XML 輸出解析器"""
    def parse(self, raw_output: str) -> list[Finding]:
        root = ET.fromstring(raw_output)
        findings = []

        for host in root.findall('.//host'):
            for port in host.findall('.//port'):
                finding = Finding(
                    severity=self._assess_severity(port),
                    title=f"Open port detected: {port.get('portid')}",
                    description=self._build_description(port),
                    evidence={
                        "port": int(port.get('portid')),
                        "protocol": port.get('protocol'),
                        "service_name": port.find('service').get('name'),
                        "service_version": port.find('service').get('version')
                    }
                )
                findings.append(finding)

        return findings

class NiktoParser(OutputParser):
    """Nikto 輸出解析器"""
    def parse(self, raw_output: str) -> list[Finding]:
        # Nikto 輸出為純文字,使用正則提取
        import re
        findings = []

        pattern = r'\+ ([^:]+): (.+)'
        for match in re.finditer(pattern, raw_output):
            finding = Finding(
                severity="Medium",  # Nikto 預設為 Medium
                title=match.group(1),
                description=match.group(2),
                evidence={"raw_line": match.group(0)}
            )
            findings.append(finding)

        return findings

class ParserFactory:
    """解析器工廠"""
    _parsers = {
        "nmap": NmapXMLParser(),
        "nikto": NiktoParser(),
        # ... 其他工具
    }

    @classmethod
    def get_parser(cls, tool_name: str) -> OutputParser:
        return cls._parsers.get(tool_name.lower())

2.6 知識庫與資料庫 (Knowledge Base & Database)

資料庫技術選型

PostgreSQL 15+ with JSONB support

Schema 設計

詳見 proposal_core_values.html 中的資料表結構。

CVE 知識庫整合

資料來源
- NVD (National Vulnerability Database)
- MITRE CVE List
- Exploit-DB

更新策略
- 每日自動同步最新 CVE
- 使用 CVE JSON Feed API

查詢優化

-- 索引設計
CREATE INDEX idx_cve_cpe ON cve_database USING GIN (affected_cpe jsonb_path_ops);
CREATE INDEX idx_cve_severity ON cve_database (severity);
CREATE INDEX idx_cve_published ON cve_database (published_date DESC);

-- 查詢範例:找出影響 Apache 2.4.49 的 CVE
SELECT cve_id, description, severity, cvss_score
FROM cve_database
WHERE affected_cpe @> '["cpe:2.3:a:apache:http_server:2.4.49"]'::jsonb
ORDER BY cvss_score DESC;

2.7 工具整合層 (Tool Integration Layer)

預裝工具清單

類別 工具 用途
資訊收集 nmap 連接埠掃描、版本探測
curl HTTP 請求與 Header 分析
whois 域名資訊查詢
dig DNS 記錄查詢
Web 掃描 Nikto Web 伺服器漏洞掃描
GoBuster 目錄與檔案枚舉
WhatWeb CMS 識別
漏洞掃描 SQLMap SQL Injection 檢測
WPScan WordPress 專用掃描
SSLyze SSL/TLS 配置檢測
Python 庫 requests HTTP 客戶端
beautifulsoup4 HTML 解析
python-nmap nmap Python 介面

工具包裝器設計

為每個工具建立標準化包裝器:

class ToolWrapper(ABC):
    @abstractmethod
    def build_command(self, params: dict) -> list[str]:
        """建構命令列參數"""
        pass

    @abstractmethod
    def parse_output(self, output: str) -> dict:
        """解析工具輸出"""
        pass

class NmapWrapper(ToolWrapper):
    def build_command(self, params: dict) -> list[str]:
        cmd = ["/usr/bin/nmap"]

        if params.get("version_detection"):
            cmd.append("-sV")

        if params.get("ports"):
            cmd.extend(["-p", params["ports"]])

        cmd.extend(["-oX", "-"])  # XML 輸出到 stdout
        cmd.append(params["target"])

        return cmd

    def parse_output(self, output: str) -> dict:
        parser = NmapXMLParser()
        findings = parser.parse(output)
        return {"findings": [f.dict() for f in findings]}

3. 系統整合流程

3.1 完整請求流程

[使用者] 提交掃描請求
    ↓
[API Gateway] 驗證授權、速率限制
    ↓
[Backend Service] 創建 Scan 記錄 (status=pending)
    ↓
[Task Queue] 將任務加入 Redis 佇列
    ↓
[LLM Orchestrator] 從佇列取出任務
    ↓
[LLM API] 生成檢測腳本
    ↓
[LLM Orchestrator] 驗證腳本安全性
    ↓
[Sandbox Manager] 在 Docker 容器中執行腳本
    ↓
[Tool Integration Layer] 工具執行並返回原始輸出
    ↓
[Data Transformer] 解析為結構化 JSON
    ↓
[LLM Orchestrator] 再次調用 LLM 解釋結果
    ↓
[Backend Service] 更新 Scan 記錄 (status=completed)
    ↓
[Frontend] 輪詢獲取結果並展示

3.2 錯誤處理與重試

class ScanOrchestrator:
    async def execute_scan(self, scan_id: str):
        try:
            # 更新狀態為 running
            await self.db.update_scan_status(scan_id, "running")

            # 生成執行計畫
            plan = await self.llm.process_scan_request(scan_id)

            # 執行各個任務
            for task in plan.tasks:
                try:
                    result = await self._execute_task(task)
                    await self.db.save_task_result(scan_id, result)
                except TaskExecutionError as e:
                    # 單一任務失敗,記錄但繼續
                    await self.db.log_task_error(scan_id, task.id, str(e))
                    continue

            # 生成最終報告
            await self._generate_report(scan_id)

            # 更新狀態為 completed
            await self.db.update_scan_status(scan_id, "completed")

        except Exception as e:
            await self.db.update_scan_status(scan_id, "failed")
            await self.db.log_error(scan_id, str(e))
            raise

4. 可擴展性與效能

4.1 水平擴展策略

4.2 快取策略

# Redis 快取層
cache_config = {
    "cve_data": 3600,        # CVE 資料快取 1 小時
    "llm_prompts": 86400,    # Prompt 模板快取 24 小時
    "scan_results": 1800     # 掃描結果快取 30 分鐘
}

4.3 效能指標目標

指標 目標值
API 回應時間 (P95) < 200ms
掃描啟動延遲 < 5s
單次工具執行時間 < 5min
完整自動化掃描時間 < 30min
系統並行掃描數 > 50

5. 安全與合規

5.1 安全檢查清單

5.2 合規考量


本文件最後更新:2025-10-21

返回 主文件