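This document describes the overall architecture, core components, and technology choices of the LLM-driven web security health report generation system.
The system uses a microservice-oriented, cloud-native architecture. Its core design principles include: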
```text
┌─────────────────────────────────────────────────────────────────┐
│                           User Layer                            │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
│  │ Web Browser  │    │  Mobile App  │    │   CLI Tool   │       │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘       │
│         │                   │                   │               │
└─────────┼───────────────────┼───────────────────┼───────────────┘
          │                   │                   │
          └───────────────────┼───────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                           API Gateway                           │
│                (Nginx / Kong / AWS API Gateway)                 │
│   [Authentication] [Authorization] [Rate limiting] [Routing]    │
└───────────────────────────────┬─────────────────────────────────┘
                                │
         ┌──────────────────────┼──────────────────────┐
         │                      │                      │
         ▼                      ▼                      ▼
  ┌──────────────┐       ┌──────────────┐       ┌──────────────┐
  │   Frontend   │       │   Backend    │       │     Auth     │
  │   Service    │       │   Service    │       │   Service    │
  │  (Next.js)   │       │  (FastAPI)   │       │  (Keycloak)  │
  └──────────────┘       └──────┬───────┘       └──────────────┘
                                │
                                ▼
                       ┌──────────────────┐
                       │ LLM Orchestrator │ ◄─── LLM API (OpenAI/Claude)
                       │ Core coordinator │
                       └────────┬─────────┘
                                │
         ┌──────────────────────┼──────────────────────┐
         │                      │                      │
         ▼                      ▼                      ▼
  ┌──────────────┐       ┌──────────────┐       ┌──────────────┐
  │  Knowledge   │       │     Task     │       │   Sandbox    │
  │     Base     │       │    Queue     │       │   Manager    │
  │ (PostgreSQL) │       │   (Redis)    │       │   (Docker)   │
  └──────────────┘       └──────────────┘       └──────┬───────┘
                                                       │
                                                       ▼
                                              ┌──────────────────┐
                                              │    Execution     │
                                              │   Sandbox Pool   │
                                              │  ┌────┐ ┌────┐   │
                                              │  │ C1 │ │ C2 │   │ (Container)
                                              │  └────┘ └────┘   │
                                              └────────┬─────────┘
                                                       │
                                                       ▼
                                              ┌──────────────────┐
                                              │       Tool       │
                                              │   Integration    │
                                              │      Layer       │
                                              │ (nmap, curl...)  │
                                              └──────────────────┘
```
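1. Dashboard
   - Recent scan history
   - Risk trend charts
   - Statistics on open vulnerabilities
2. Scan launch interface
   - UI for switching between the four scan modes
   - Target input and validation
   - Scan parameter settings
3. Conversational UI
   - ChatGPT-style chat window
   - Markdown rendering
   - Script preview and confirmation
4. Report viewer
   - Structured display of results
   - Visualization of the detection-logic graph
   - Export (PDF / JSON)
5. Educational case library browser
   - Search and filtering
   - Case detail pages
   - Tagging and bookmarking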
```text
# Scan management
POST   /api/v1/scans                          # Create a new scan task
GET    /api/v1/scans/{scan_id}                # Get scan status
DELETE /api/v1/scans/{scan_id}                # Cancel/delete a scan

# Results
GET    /api/v1/scans/{scan_id}/results        # Get scan results
GET    /api/v1/scans/{scan_id}/raw-output     # Get raw output

# Conversational mode
POST   /api/v1/conversations                  # Start a conversation
POST   /api/v1/conversations/{id}/messages    # Send a message

# Educational cases
GET    /api/v1/educational-cases              # Browse the case library
GET    /api/v1/educational-cases/{id}         # Get case details

# CVE lookup
GET    /api/v1/cve/search?q=apache            # Search CVEs
GET    /api/v1/cve/{cve_id}                   # Get CVE details

# System administration
GET    /api/v1/health                         # Health check
GET    /api/v1/metrics                        # System metrics
```
```python
from pydantic import BaseModel, HttpUrl, Field
from typing import Literal, Optional
from datetime import datetime


class ScanCreateRequest(BaseModel):
    target: HttpUrl
    scan_mode: Literal["guided", "intelligence", "conversational", "automated"]
    selected_tests: Optional[list[str]] = None
    options: Optional[dict] = None


class ScanStatusResponse(BaseModel):
    scan_id: str
    status: Literal["pending", "running", "completed", "failed"]
    progress: int = Field(ge=0, le=100)
    current_step: Optional[str] = None
    created_at: datetime
    estimated_completion: Optional[datetime] = None
```
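Clients or workers that need the same checks without Pydantic can do the request validation with the standard library alone. The sketch below is hypothetical: `validate_scan_request` and `ALLOWED_MODES` are illustrative names. Rejecting non-HTTP(S) schemes and URLs without a host is an assumption about what "target input and validation" should enforce.

```python
from urllib.parse import urlparse

# Mirrors the scan_mode Literal in ScanCreateRequest.
ALLOWED_MODES = {"guided", "intelligence", "conversational", "automated"}


def validate_scan_request(payload: dict) -> dict:
    """Return a normalized copy of payload or raise ValueError (hypothetical helper)."""
    target = payload.get("target")
    if not isinstance(target, str):
        raise ValueError("target must be a string URL")
    parsed = urlparse(target)
    # Assumption: only http/https targets with an explicit host are scannable.
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        raise ValueError(f"target must be an http(s) URL with a host: {target!r}")

    mode = payload.get("scan_mode")
    if mode not in ALLOWED_MODES:
        raise ValueError(f"scan_mode must be one of {sorted(ALLOWED_MODES)}")

    tests = payload.get("selected_tests")
    if tests is not None and (
        not isinstance(tests, list) or not all(isinstance(t, str) for t in tests)
    ):
        raise ValueError("selected_tests must be a list of strings")

    return {
        "target": target,
        "scan_mode": mode,
        "selected_tests": tests,
        "options": payload.get("options"),
    }
```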
The LLM Orchestrator is the system's "brain": it makes the intelligent decisions and schedules the workflow.
```python
from __future__ import annotations  # the domain types below are defined elsewhere


class LLMOrchestrator:
    """
    LLM orchestration core: the hub of the system's intelligence.

    ScanRequest, ScanPlan, Task, ScriptArtifact, Context, Analysis and
    SecurityError are domain types defined elsewhere; the _-prefixed
    helpers are not shown.
    """

    def __init__(self, llm_client, knowledge_base, sandbox_manager):
        self.llm_client = llm_client  # OpenAI / Anthropic client
        self.kb = knowledge_base  # knowledge base access
        self.sandbox = sandbox_manager

    async def process_scan_request(self, request: ScanRequest) -> ScanPlan:
        """
        Build an execution plan from a scan request.
        """
        # 1. Understand the intent
        intent = await self._understand_intent(request)

        # 2. Look up CVE intelligence (mode B)
        if request.mode == "intelligence":
            cve_suggestions = await self._query_cve_database(request.target)
            intent.enrich_with_cve(cve_suggestions)

        # 3. Select tools
        tools = await self._select_tools(intent)

        # 4. Build the execution plan
        plan = ScanPlan(intent=intent, tools=tools)
        return plan

    async def generate_script(self, task: Task) -> ScriptArtifact:
        """
        Generate a Python script for a specific task.
        """
        prompt = self._build_script_generation_prompt(task)
        response = await self.llm_client.complete(prompt)

        # Validate the script before accepting it
        script = self._extract_code_from_response(response)
        if not self._validate_script_safety(script):
            raise SecurityError("Generated script contains unsafe operations")

        return ScriptArtifact(code=script, task=task)

    async def interpret_results(self, raw_output: str, context: Context) -> Analysis:
        """
        Interpret scan results.
        """
        # raw_output comes from the scanned target, so it is untrusted input to the LLM
        prompt = f"""
        As a senior cybersecurity analyst, interpret the following scan results:
        Context:
        - Target: {context.target}
        - Tool used: {context.tool}
        - Goal: {context.goal}
        Raw Output:
        {raw_output}
        Provide:
        1. Executive summary
        2. Detailed findings with severity
        3. Remediation recommendations
        4. Educational explanation
        """
        response = await self.llm_client.complete(prompt)
        return Analysis.from_llm_response(response)
```
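`generate_script` relies on `_extract_code_from_response`, whose implementation is not shown. The sketch below assumes the model wraps the script in a Markdown code fence tagged `python`, `py`, or nothing at all. The standalone function `extract_code_from_response` is a hypothetical name.

```python
import re

TICKS = "`" * 3  # a Markdown code fence delimiter

# Opening fence with an optional language tag, the body, then the closing fence.
_FENCE_RE = re.compile(TICKS + r"([\w+-]*)[ \t]*\n(.*?)" + TICKS, re.DOTALL)


def extract_code_from_response(response: str) -> str:
    """Return the body of the first python/py/untagged fenced block."""
    for match in _FENCE_RE.finditer(response):
        lang = match.group(1).lower()
        if lang in ("python", "py", ""):
            return match.group(2).strip()
    # No usable block: refuse rather than treat prose or shell snippets as a script.
    raise ValueError("no Python code block found in LLM response")
```

Scanning block by block with `finditer` means a closing fence is never mistaken for an opening one. This matters when the model emits, say, a `bash` block before the Python block.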
The system maintains a library of prompt templates for different tools and tasks:
Template example: nmap version detection
```python
NMAP_VERSION_DETECTION_PROMPT = """
As a senior penetration tester, write a Python script to perform service version detection on the target {target}.
Requirements:
1. Use the 'subprocess' module to call nmap
2. Scan ports: {ports}
3. Use the -sV flag for version detection
4. Output results in XML format using -oX -
5. Parse the XML and print a JSON structure with:
   - port (int)
   - protocol (str)
   - service_name (str)
   - service_version (str)
6. Handle errors gracefully
7. Set a timeout of 300 seconds
IMPORTANT SECURITY RULES:
- Do NOT execute shell commands directly with shell=True
- Do NOT accept user input without validation
- Use only the nmap binary at /usr/bin/nmap
Example output format:
[
  {{"port": 80, "protocol": "tcp", "service_name": "http", "service_version": "Apache httpd 2.4.41"}}
]
"""
```
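The braces in the example output are doubled, which suggests the templates are filled with `str.format`. That call turns `{{`/`}}` into literal braces while substituting `{target}` and `{ports}`. The sketch below uses a shortened copy of the template, and `render_prompt` is a hypothetical helper.

```python
# Shortened copy of the nmap template above (assumption: str.format is used).
TEMPLATE = """Scan the target {target} on ports {ports}.
Example output format:
[
  {{"port": 80, "protocol": "tcp"}}
]
"""


def render_prompt(template: str, **params: str) -> str:
    """Fill a prompt template, turning a missing placeholder into a clear error."""
    try:
        return template.format(**params)
    except KeyError as missing:
        raise ValueError(f"missing template parameter: {missing}") from None
```

Values such as `target` should be validated before rendering, because they become part of the text the LLM acts on.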
Template example: CVE-specific vulnerability verification
```python
CVE_VERIFICATION_PROMPT = """
As a senior penetration tester, write a Python script to test if the target {target} is vulnerable to {cve_id}.
CVE Details:
{cve_description}
Verification Method:
{verification_method}
Requirements:
1. Use the 'requests' library for HTTP interactions
2. Implement the exact verification steps described above
3. Print "VULNERABLE" if the target is confirmed vulnerable
4. Print "NOT_VULNERABLE" if the target appears patched
5. Print "UNCERTAIN" if the test is inconclusive
6. Include evidence in JSON format
7. Set appropriate timeouts and error handling
IMPORTANT:
- This is authorized security testing
- Do NOT cause damage or disruption
- Use minimal payloads necessary for verification
"""
```
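The verification script reports its verdict on stdout, so the orchestrator has to map that output back to one of the three states. The sketch below assumes the verdict is printed on its own line and the evidence is one JSON object on a single line. `parse_verification_output` is a hypothetical name.

```python
import json

VERDICTS = ("VULNERABLE", "NOT_VULNERABLE", "UNCERTAIN")


def parse_verification_output(stdout: str) -> dict:
    """Extract the first verdict line and the first JSON evidence line."""
    verdict = None
    evidence = None
    for line in stdout.splitlines():
        stripped = line.strip()
        # Exact comparison: a substring test would find "VULNERABLE"
        # inside "NOT_VULNERABLE".
        if stripped in VERDICTS and verdict is None:
            verdict = stripped
        elif stripped.startswith("{") and evidence is None:
            try:
                evidence = json.loads(stripped)
            except json.JSONDecodeError:
                pass  # not valid JSON; keep looking
    # A missing verdict is treated as inconclusive, never as "not vulnerable".
    return {"verdict": verdict or "UNCERTAIN", "evidence": evidence}
```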
Zero-trust principle: assume that any generated script may contain malicious code or errors.
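Under that assumption, a static pass over the script can reject the most obvious violations of the prompt's security rules before anything runs. The sketch below is a hypothetical `_validate_script_safety`-style check built on Python's `ast` module, and its deny-list is only illustrative. Aliased imports (`from os import system`) and dynamic calls (`getattr`) slip past it, so the sandbox remains the real control.

```python
import ast

# Illustrative deny-list (hypothetical); a production policy would be broader.
BANNED_CALLS = {"eval", "exec", "compile", "__import__", "os.system", "os.popen"}


def _call_name(node: ast.Call) -> str:
    """Return a dotted name such as 'subprocess.run' for simple call targets."""
    func = node.func
    parts = []
    while isinstance(func, ast.Attribute):
        parts.append(func.attr)
        func = func.value
    if isinstance(func, ast.Name):
        parts.append(func.id)
    return ".".join(reversed(parts))


def find_unsafe_operations(source: str) -> list[str]:
    problems = []
    for node in ast.walk(ast.parse(source)):  # SyntaxError for malformed scripts
        if not isinstance(node, ast.Call):
            continue
        name = _call_name(node)
        if name in BANNED_CALLS:
            problems.append(f"banned call: {name}")
        for kw in node.keywords:
            # Flag any shell= argument that is not the literal False.
            if kw.arg == "shell" and not (
                isinstance(kw.value, ast.Constant) and kw.value.value is False
            ):
                problems.append(f"shell=... in call to {name}")
    return problems


def validate_script_safety(source: str) -> bool:
    try:
        return not find_unsafe_operations(source)
    except SyntaxError:
        return False
```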
Dockerfile
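```dockerfile
FROM python:3.11-slim

# Install security tools
RUN apt-get update && apt-get install -y \
    nmap \
    curl \
    dnsutils \
    nikto \
    && rm -rf /var/lib/apt/lists/*

# Install Python libraries
RUN pip install --no-cache-dir \
    requests \
    beautifulsoup4 \
    python-nmap \
    lxml

# Create a non-root user
RUN useradd -m -u 1000 scanner

# Restrict write access: create the output directory while still root,
# so this step does not depend on scanner being able to write to /workspace
RUN mkdir -p /workspace/output \
    && chown scanner:scanner /workspace/output \
    && chmod 700 /workspace/output

USER scanner
WORKDIR /workspace

ENTRYPOINT ["python", "/workspace/script.py"]
```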
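Docker run options

```bash
# Flags are annotated here because a comment after a trailing backslash
# would end the command:
#   --rm                                remove the container after it exits
#   --network=scan_net                  isolated network
#   --cpus / --memory                   CPU and memory limits
#   --memory-swap equal to --memory     no extra swap
#   --pids-limit                        cap on the number of processes
#   --read-only                         read-only root filesystem
#   --tmpfs /tmp                        scratch space, mounted noexec,nosuid
#   --cap-drop=ALL / --cap-add=NET_RAW  drop all capabilities, re-add only what nmap needs
docker run \
  --rm \
  --network=scan_net \
  --cpus="1.0" \
  --memory="512m" \
  --memory-swap="512m" \
  --pids-limit=100 \
  --read-only \
  --tmpfs /tmp:rw,noexec,nosuid \
  --security-opt=no-new-privileges \
  --cap-drop=ALL \
  --cap-add=NET_RAW \
  -v "$(pwd)/script.py:/workspace/script.py:ro" \
  -v "$(pwd)/output:/workspace/output:rw" \
  scanner-sandbox:latest
```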
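```python
from __future__ import annotations

import asyncio

import docker


class SandboxManager:
    """
    ExecutionResult and ExecutionTimeoutError are domain types defined elsewhere;
    _prepare_script_file, _cleanup_script_file and _get_execution_time are helpers
    not shown here.
    """

    def __init__(self, max_concurrent: int = 5):
        self.client = docker.from_env()
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(self.max_concurrent)

    async def execute_script(self, script: str, timeout: int = 300) -> ExecutionResult:
        """
        Run a script inside an isolated sandbox container.
        """
        async with self.semaphore:  # cap the number of concurrent containers
            container = None
            script_path = None  # defined up front so the finally block is safe
            try:
                # Write the script to a file on the host
                script_path = self._prepare_script_file(script)

                # Start the container with the same hardening as the docker run
                # options above. The Docker SDK is blocking, so it runs in a thread.
                container = await asyncio.to_thread(
                    self.client.containers.run,
                    image="scanner-sandbox:latest",
                    volumes={
                        script_path: {"bind": "/workspace/script.py", "mode": "ro"}
                    },
                    network_mode="scan_net",
                    mem_limit="512m",
                    memswap_limit="512m",
                    cpu_period=100000,
                    cpu_quota=100000,  # 1 CPU
                    pids_limit=100,
                    read_only=True,
                    tmpfs={"/tmp": "rw,noexec,nosuid"},
                    security_opt=["no-new-privileges"],
                    cap_drop=["ALL"],
                    cap_add=["NET_RAW"],
                    detach=True,
                    remove=False,  # removed manually so the logs can be read first
                )

                # Wait for completion, bounded by the timeout
                exit_code = await asyncio.wait_for(
                    self._wait_container(container), timeout=timeout
                )

                # Collect stdout and stderr separately
                stdout = await asyncio.to_thread(container.logs, stdout=True, stderr=False)
                stderr = await asyncio.to_thread(container.logs, stdout=False, stderr=True)
                return ExecutionResult(
                    exit_code=exit_code,
                    stdout=stdout.decode("utf-8", errors="replace"),
                    stderr=stderr.decode("utf-8", errors="replace"),
                    execution_time=self._get_execution_time(container),
                )
            except asyncio.TimeoutError:
                if container is not None:
                    await asyncio.to_thread(container.kill)
                raise ExecutionTimeoutError(f"Script execution exceeded {timeout}s")
            finally:
                if container is not None:
                    await asyncio.to_thread(container.remove, force=True)
                if script_path is not None:
                    self._cleanup_script_file(script_path)

    async def _wait_container(self, container) -> int:
        """Poll the container without blocking the event loop until it stops."""
        while True:
            await asyncio.to_thread(container.reload)
            if container.status in ("exited", "dead"):
                return container.attrs["State"]["ExitCode"]
            await asyncio.sleep(1)
```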
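The concurrency cap and timeout in `SandboxManager` can be exercised without Docker. In this standard-library-only model, `ConcurrencyLimiter` is a hypothetical stand-in. Each `job` plays the role of "start a container and wait for it".

```python
import asyncio


class ConcurrencyLimiter:
    """Models SandboxManager's semaphore + wait_for pattern (hypothetical)."""

    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.running = 0
        self.peak = 0  # highest number of jobs observed running at once

    async def run(self, job, timeout: float):
        async with self.semaphore:
            self.running += 1
            self.peak = max(self.peak, self.running)
            try:
                # wait_for cancels the job when the timeout expires
                return await asyncio.wait_for(job(), timeout=timeout)
            finally:
                self.running -= 1


async def demo():
    limiter = ConcurrencyLimiter(max_concurrent=2)

    async def fast():
        await asyncio.sleep(0.01)
        return "done"

    async def hangs():
        await asyncio.sleep(10)

    results = await asyncio.gather(*(limiter.run(fast, timeout=1) for _ in range(5)))
    try:
        await limiter.run(hangs, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return results, limiter.peak, timed_out
```

In this demo, five jobs queue behind a limit of two, so at most two ever run at once. The hanging job is cancelled at the timeout, just as `SandboxManager` then kills its container.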
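Sandboxes run on a custom Docker network, restricted with iptables rules:

```bash
# Create an isolated network (inter-container traffic disabled)
docker network create \
  --driver bridge \
  --subnet 172.20.0.0/16 \
  --opt "com.docker.network.bridge.enable_icc=false" \
  scan_net

# iptables rules: allow scanning of external targets, block internal networks.
# -I inserts at the top of DOCKER-USER; rules appended with -A can land after
# the chain's default RETURN rule and never match.
iptables -I DOCKER-USER -s 172.20.0.0/16 -d 10.0.0.0/8 -j DROP
iptables -I DOCKER-USER -s 172.20.0.0/16 -d 172.16.0.0/12 -j DROP
iptables -I DOCKER-USER -s 172.20.0.0/16 -d 192.168.0.0/16 -j DROP
# Link-local range, which includes cloud instance metadata endpoints
iptables -I DOCKER-USER -s 172.20.0.0/16 -d 169.254.0.0/16 -j DROP
```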
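Application-level checks can complement the iptables rules by refusing targets that resolve to internal addresses before a scan is queued. The sketch below uses `ipaddress` and `socket.getaddrinfo`. `is_blocked_address` and `resolve_and_check` are hypothetical names, and the loopback and link-local ranges are added as an assumption. A check at submission time cannot stop DNS rebinding, so the network rules remain the enforcement point.

```python
import ipaddress
import socket

# The private ranges blocked by iptables, plus loopback and link-local (assumption).
BLOCKED_NETWORKS = [
    ipaddress.ip_network(n)
    for n in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16",
              "127.0.0.0/8", "169.254.0.0/16")
]


def is_blocked_address(address: str) -> bool:
    ip = ipaddress.ip_address(address)
    if ip.version == 6:
        # Conservative choice: only globally routable IPv6 targets are allowed.
        return not ip.is_global
    return any(ip in net for net in BLOCKED_NETWORKS)


def resolve_and_check(hostname: str) -> list[str]:
    """Resolve hostname; raise if any resulting address is internal."""
    addresses = sorted({info[4][0] for info in socket.getaddrinfo(hostname, None)})
    blocked = [a for a in addresses if is_blocked_address(a)]
    if blocked:
        raise PermissionError(f"{hostname} resolves to internal address(es): {blocked}")
    return addresses
```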
The Data Transformer parses the raw output of each tool into a unified JSON structure.
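```python
from __future__ import annotations  # Finding is the shared result model, defined elsewhere

import re
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod


class OutputParser(ABC):
    """Base class for parsers."""

    @abstractmethod
    def parse(self, raw_output: str) -> list[Finding]:
        ...


class NmapXMLParser(OutputParser):
    """Parser for nmap XML output (-oX); _assess_severity and _build_description are not shown."""

    def parse(self, raw_output: str) -> list[Finding]:
        root = ET.fromstring(raw_output)
        findings = []
        for host in root.findall('.//host'):
            for port in host.findall('.//port'):
                # nmap also lists closed/filtered ports; report only open ones
                state = port.find('state')
                if state is None or state.get('state') != 'open':
                    continue
                # <service> is absent when nmap cannot identify the service
                service = port.find('service')
                svc = service.attrib if service is not None else {}
                # product and version are separate attributes, e.g. "Apache httpd" + "2.4.41"
                version = " ".join(
                    v for v in (svc.get('product'), svc.get('version')) if v
                ) or None
                finding = Finding(
                    severity=self._assess_severity(port),
                    title=f"Open port detected: {port.get('portid')}",
                    description=self._build_description(port),
                    evidence={
                        "port": int(port.get('portid')),
                        "protocol": port.get('protocol'),
                        "service_name": svc.get('name'),
                        "service_version": version,
                    },
                )
                findings.append(finding)
        return findings


class NiktoParser(OutputParser):
    """Parser for Nikto output."""

    # Nikto's plain-text output prefixes each line with "+ "; this also matches
    # header lines such as "+ Target IP: ...", which may need filtering.
    LINE_PATTERN = re.compile(r'\+ ([^:]+): (.+)')

    def parse(self, raw_output: str) -> list[Finding]:
        findings = []
        for match in self.LINE_PATTERN.finditer(raw_output):
            finding = Finding(
                severity="Medium",  # Nikto findings default to Medium
                title=match.group(1),
                description=match.group(2),
                evidence={"raw_line": match.group(0)},
            )
            findings.append(finding)
        return findings


class ParserFactory:
    """Parser factory."""

    _parsers = {
        "nmap": NmapXMLParser(),
        "nikto": NiktoParser(),
        # ... other tools
    }

    @classmethod
    def get_parser(cls, tool_name: str) -> OutputParser:
        parser = cls._parsers.get(tool_name.lower())
        if parser is None:
            raise ValueError(f"No parser registered for tool: {tool_name}")
        return parser
```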
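`Finding`, `_assess_severity`, and `_build_description` are defined elsewhere. The nmap parsing logic can still be checked in isolation with a hypothetical minimal `Finding` dataclass and a trimmed, hand-written sample of nmap's XML. The fixed `Info` severity is a placeholder assumption.

```python
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field

# Trimmed sample in the shape of nmap -oX output.
SAMPLE = """<nmaprun>
  <host>
    <ports>
      <port protocol="tcp" portid="80">
        <state state="open"/>
        <service name="http" product="Apache httpd" version="2.4.41"/>
      </port>
      <port protocol="tcp" portid="8080">
        <state state="closed"/>
      </port>
    </ports>
  </host>
</nmaprun>"""


@dataclass
class Finding:
    """Hypothetical minimal Finding; the real model is defined elsewhere."""
    severity: str
    title: str
    description: str
    evidence: dict = field(default_factory=dict)


def parse_nmap_xml(raw: str) -> list[Finding]:
    findings = []
    for port in ET.fromstring(raw).iter("port"):
        state = port.find("state")
        if state is None or state.get("state") != "open":
            continue  # skip closed/filtered ports
        service = port.find("service")
        attrs = service.attrib if service is not None else {}
        version = " ".join(v for v in (attrs.get("product"), attrs.get("version")) if v)
        findings.append(Finding(
            severity="Info",  # placeholder for _assess_severity
            title=f"Open port detected: {port.get('portid')}",
            description=f"{attrs.get('name', 'unknown')} {version}".strip(),
            evidence={
                "port": int(port.get("portid")),
                "protocol": port.get("protocol"),
                "service_name": attrs.get("name"),
                "service_version": version or None,
            },
        ))
    return findings
```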
PostgreSQL 15+ with JSONB support
See proposal_core_values.html for the table schemas.
Data sources:
- NVD (National Vulnerability Database)
- MITRE CVE List
- Exploit-DB
Update strategy:
- Sync new CVEs automatically every day
- Use the CVE JSON Feed API
Query optimization:
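```sql
-- Indexes
CREATE INDEX idx_cve_cpe ON cve_database USING GIN (affected_cpe jsonb_path_ops);
CREATE INDEX idx_cve_severity ON cve_database (severity);
CREATE INDEX idx_cve_published ON cve_database (published_date DESC);

-- Example: CVEs affecting Apache HTTP Server 2.4.49.
-- The jsonb_path_ops GIN index serves the @> containment operator. Assuming
-- affected_cpe stores full CPE 2.3 strings (13 components), the literal must match exactly.
SELECT cve_id, description, severity, cvss_score
FROM cve_database
WHERE affected_cpe @> '["cpe:2.3:a:apache:http_server:2.4.49:*:*:*:*:*:*:*"]'::jsonb
ORDER BY cvss_score DESC NULLS LAST;
```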
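The CPE literal in the query must be the full CPE 2.3 string, and the JSON array is safer passed as a query parameter than spliced into SQL. In the sketch below, `cpe23` and `cve_by_cpe_params` are hypothetical helpers, and the `%s` placeholder assumes a psycopg-style driver. The sketch does not handle special characters in CPE components, which the CPE 2.3 specification requires to be escaped. Exact containment also only finds rows that list this exact CPE; it does not evaluate version ranges.

```python
import json

# cpe:2.3:part:vendor:product:version:update:edition:language:sw_edition:target_sw:target_hw:other
CPE_FIELDS = 13


def cpe23(part: str, vendor: str, product: str, version: str) -> str:
    """Build a CPE 2.3 formatted string, wildcarding the remaining fields."""
    components = ["cpe", "2.3", part, vendor, product, version]
    components += ["*"] * (CPE_FIELDS - len(components))
    return ":".join(components)


# Parameterized form of the query above (psycopg-style placeholder, an assumption).
CVE_BY_CPE_SQL = """
SELECT cve_id, description, severity, cvss_score
FROM cve_database
WHERE affected_cpe @> %s::jsonb
ORDER BY cvss_score DESC NULLS LAST
"""


def cve_by_cpe_params(cpe: str) -> tuple[str]:
    # json.dumps builds the JSON array; no string formatting touches the SQL.
    return (json.dumps([cpe]),)
```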
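| Category | Tool | Purpose |
|---|---|---|
| Information gathering | nmap | Port scanning, version detection |
| | curl | HTTP requests and header analysis |
| | whois | Domain registration lookup |
| | dig | DNS record lookup |
| Web scanning | Nikto | Web server vulnerability scanning |
| | GoBuster | Directory and file enumeration |
| | WhatWeb | CMS identification |
| Vulnerability scanning | SQLMap | SQL injection detection |
| | WPScan | WordPress-specific scanning |
| | SSLyze | SSL/TLS configuration testing |
| Python libraries | requests | HTTP client |
| | beautifulsoup4 | HTML parsing |
| | python-nmap | Python interface to nmap |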
Each tool gets a standardized wrapper:
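```python
from abc import ABC, abstractmethod


class ToolWrapper(ABC):
    @abstractmethod
    def build_command(self, params: dict) -> list[str]:
        """Build the command-line arguments."""

    @abstractmethod
    def parse_output(self, output: str) -> dict:
        """Parse the tool's output."""


class NmapWrapper(ToolWrapper):
    def build_command(self, params: dict) -> list[str]:
        target = params["target"]
        # A target starting with "-" would be interpreted by nmap as an option
        if target.startswith("-"):
            raise ValueError(f"Invalid target: {target!r}")

        cmd = ["/usr/bin/nmap"]
        if params.get("version_detection"):
            cmd.append("-sV")
        if params.get("ports"):
            cmd.extend(["-p", params["ports"]])
        cmd.extend(["-oX", "-"])  # XML output to stdout
        cmd.append(target)
        return cmd

    def parse_output(self, output: str) -> dict:
        parser = NmapXMLParser()  # from the Data Transformer layer
        findings = parser.parse(output)
        # .dict() is the Pydantic v1 API; Pydantic v2 models use model_dump()
        return {"findings": [f.dict() for f in findings]}
```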
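The list returned by `build_command` can be executed without a shell, which matches the template's rule against `shell=True` and its 300-second timeout. `run_tool` is a hypothetical helper.

```python
import subprocess
import sys


def run_tool(cmd: list[str], timeout: int = 300) -> dict:
    """Run a wrapper-built command without a shell and capture its output."""
    try:
        completed = subprocess.run(
            cmd,                  # argument list: no shell parsing takes place
            capture_output=True,
            text=True,
            timeout=timeout,      # the child is killed if this expires
            check=False,          # non-zero exit codes are reported, not raised
        )
    except subprocess.TimeoutExpired:
        return {"exit_code": None, "stdout": "", "stderr": f"timed out after {timeout}s"}
    return {
        "exit_code": completed.returncode,
        "stdout": completed.stdout,
        "stderr": completed.stderr,
    }


if __name__ == "__main__":
    # Usage with a harmless command; in the system, cmd would come from NmapWrapper.
    print(run_tool([sys.executable, "-c", "print('hello')"]))
```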
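```text
[User] submits a scan request
   ↓
[API Gateway] verifies authorization and applies rate limiting
   ↓
[Backend Service] creates a Scan record (status=pending)
   ↓
[Task Queue] adds the task to the Redis queue
   ↓
[LLM Orchestrator] takes the task from the queue
   ↓
[LLM API] generates the detection script
   ↓
[LLM Orchestrator] validates script safety
   ↓
[Sandbox Manager] runs the script in a Docker container
   ↓
[Tool Integration Layer] runs the tools and returns raw output
   ↓
[Data Transformer] parses the output into structured JSON
   ↓
[LLM Orchestrator] calls the LLM again to interpret the results
   ↓
[Backend Service] updates the Scan record (status=completed)
   ↓
[Frontend] polls for the results and displays them
```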
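The statuses in this flow follow the `ScanStatusResponse` values. The sketch below lists the allowed transitions. Allowing `pending` → `failed` (for errors before execution starts) is an assumption. Cancellation via `DELETE /api/v1/scans/{scan_id}` would need an extra status that the current `Literal` does not include. `TRANSITIONS`, `advance`, and `InvalidTransition` are hypothetical names.

```python
# Allowed status transitions, using the ScanStatusResponse statuses.
TRANSITIONS = {
    "pending": {"running", "failed"},
    "running": {"completed", "failed"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
}


class InvalidTransition(Exception):
    pass


def advance(current: str, new: str) -> str:
    """Return the new status, or raise if the move is not allowed."""
    if new not in TRANSITIONS.get(current, set()):
        raise InvalidTransition(f"{current} -> {new} is not allowed")
    return new
```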
```python
class ScanOrchestrator:
    # self.db, self.llm and TaskExecutionError are defined elsewhere.
    async def execute_scan(self, scan_id: str):
        try:
            # Mark the scan as running
            await self.db.update_scan_status(scan_id, "running")

            # Build the execution plan. process_scan_request takes the scan
            # request, not its ID (get_scan_request is a hypothetical loader).
            request = await self.db.get_scan_request(scan_id)
            plan = await self.llm.process_scan_request(request)

            # Run each task
            for task in plan.tasks:
                try:
                    result = await self._execute_task(task)
                    await self.db.save_task_result(scan_id, result)
                except TaskExecutionError as e:
                    # A single failed task is logged; the scan continues
                    await self.db.log_task_error(scan_id, task.id, str(e))
                    continue

            # Generate the final report
            await self._generate_report(scan_id)

            # Mark the scan as completed
            await self.db.update_scan_status(scan_id, "completed")
        except Exception as e:
            await self.db.update_scan_status(scan_id, "failed")
            await self.db.log_error(scan_id, str(e))
            raise
```
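Because errors are handled per task, one failing task does not abort the scan. The control flow can be reproduced with in-memory stand-ins. `FakeDB`, `execute_tasks`, and the two sample tasks are hypothetical.

```python
import asyncio


class TaskExecutionError(Exception):
    """Stand-in for the domain exception used above."""


class FakeDB:
    """In-memory stand-in for the scan database (hypothetical)."""

    def __init__(self):
        self.status = {}
        self.results = []
        self.errors = []

    async def update_scan_status(self, scan_id, status):
        self.status[scan_id] = status

    async def save_task_result(self, scan_id, result):
        self.results.append(result)

    async def log_task_error(self, scan_id, task_id, message):
        self.errors.append((task_id, message))


async def execute_tasks(db, scan_id, tasks):
    """Same per-task control flow as execute_scan, without the LLM and report steps."""
    await db.update_scan_status(scan_id, "running")
    for task_id, task in tasks:
        try:
            await db.save_task_result(scan_id, await task())
        except TaskExecutionError as e:
            await db.log_task_error(scan_id, task_id, str(e))
    await db.update_scan_status(scan_id, "completed")


async def ok():
    return "port 80 open"


async def broken():
    raise TaskExecutionError("nmap exited with code 1")
```

One consequence of this design is that a scan whose tasks all failed still ends as `completed`. Surfacing the task error count next to the status may be worth considering.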
```python
# Redis cache layer: TTL per data category, in seconds
cache_config = {
    "cve_data": 3600,      # CVE data: 1 hour
    "llm_prompts": 86400,  # prompt templates: 24 hours
    "scan_results": 1800,  # scan results: 30 minutes
}
```
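These TTLs fit a cache-aside pattern: read the key, and on a miss load the value and store it with its category's TTL. In Redis this corresponds to `SET` with the `EX` option. The sketch below substitutes a hypothetical in-memory `TTLCache` with an injectable clock so that expiry can be tested. `cached` is also a hypothetical helper.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """In-memory stand-in for Redis keys stored with an expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._store: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # expired: behave like a missing key
            return None
        return value

    def set(self, key: str, value: Any, ttl: int) -> None:
        self._store[key] = (self._clock() + ttl, value)


def cached(cache: TTLCache, category: str, key: str,
           ttl_by_category: dict, loader: Callable[[], Any]) -> Any:
    """Cache-aside read: return the cached value, or load and store it."""
    full_key = f"{category}:{key}"
    value = cache.get(full_key)
    if value is None:
        value = loader()
        cache.set(full_key, value, ttl_by_category[category])
    return value
```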
| Metric | Target |
|---|---|
| API response time (P95) | < 200 ms |
| Scan start latency | < 5 s |
| Single tool execution time | < 5 min |
| Full automated scan time | < 30 min |
| Concurrent scans (system-wide) | > 50 |
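Checking the P95 target requires a percentile definition. The sketch below uses the nearest-rank method, one of several common definitions; interpolating definitions can give slightly different values. `percentile_nearest_rank` and `meets_p95_target` are hypothetical names.

```python
import math


def percentile_nearest_rank(samples: list[float], pct: float) -> float:
    """Smallest sample such that at least pct% of samples are <= it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    # pct * n / 100 keeps the arithmetic exact for integer pct and n
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def meets_p95_target(latencies_ms: list[float], target_ms: float = 200) -> bool:
    return percentile_nearest_rank(latencies_ms, 95) < target_ms
```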
Last updated: 2025-10-21