Workflow and Technical Details

This document describes the system's core workflow, technical implementation details, prompt engineering strategy, and data processing flow.


1. Core Workflow

1.1 Complete Flow Diagram

┌─────────────────────────────────────────────────────────┐
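│  Stage 1: Request intake and validation                 │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  1. User submits a scan request (via UI or API)         │
│  2. API Gateway performs:                               │
│     • User authentication (JWT validation)              │
│     • Authorization check (may user scan this target?)  │
│     • Rate limiting (abuse prevention)                  │
│  3. Input validation:                                   │
│     • Target URL format validation                      │
│     • Blocklist check (RFC 1918 IPs, localhost, etc.)   │
│     • Parameter sanitization (injection prevention)     │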
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  Stage 2: Task planning and preparation                 │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  4. Backend Service creates the Scan record             │
│     • Generate scan_id (UUID)                           │
│     • Initial status: pending                           │
│     • Record request parameters and timestamp           │
│                                                         │
│  5. Enqueue the task in the Redis task queue            │
│     • Queue name: scan_tasks                            │
│     • Priority: Critical > High > Normal                │
│                                                         │
│  6. Return scan_id to the frontend                      │
│     • Frontend starts polling status (every 3 s)        │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
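│  Stage 3: LLM orchestration and script generation       │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  7. LLM Orchestrator Worker dequeues the task           │
│     • Update scan status: pending → running             │
│                                                         │
│  8. Intent understanding and task decomposition         │
│     • Parse the selected mode and detection items       │
│     • Intel-driven mode: quick recon + CVE lookup       │
│     • Conversational mode: next step from history       │
│     • Automated mode: load predefined strategy          │
│                                                         │
│  9. Build the LLM prompt                                │
│     • Load the matching template from the library       │
│     • Fill in variables (target, ports, cve_id, etc.)   │
│     • Add safety constraint rules                       │
│                                                         │
│  10. Call the LLM API to generate the script            │
│      • Model: GPT-4 / Claude 3.5 Sonnet                 │
│      • Temperature: 0.2 (favors stable output)          │
│      • Retries: up to 3                                 │
│                                                         │
│  11. Script safety validation                           │
│      • Static analysis: look for dangerous functions    │
│      • Syntax check: ast.parse()                        │
│      • Blocklist: eval, exec, __import__, etc.          │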
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
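│  Stage 4: Sandboxed execution                           │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  12. Sandbox Manager prepares the environment           │
│      • Write the script to a temporary file             │
│      • Configure Docker container parameters            │
│                                                         │
│  13. Start the isolated container                       │
│      • Image: scanner-sandbox:latest                    │
│      • Resource limits: 1 CPU, 512 MB RAM               │
│      • Network: scan_net (isolated network)             │
│      • Timeout: 300 seconds                             │
│                                                         │
│  14. Run the script and collect output                  │
│      • Capture stdout and stderr                        │
│      • Record execution time                            │
│      • Handle exceptions and timeouts                   │
│                                                         │
│  15. Clean up resources                                 │
│      • Force-stop and remove the container              │
│      • Delete temporary files                           │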
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  Stage 5: Data transformation and parsing               │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  16. Data Transformer parses the raw output             │
│      • Select a parser based on the tool type           │
│      • nmap: XML parsing                                │
│      • nikto: regular-expression extraction             │
│      • custom_script: parsed directly as JSON           │
│                                                         │
│  17. Convert to a standardized JSON structure           │
│      • Conforms to the ScanResult schema                │
│      • Contains a findings array                        │
│      • Each finding: severity, title, description       │
│                                                         │
│  18. Store structured results in the database           │
│      • Table: scan_results                              │
│      • Columns: raw_output + structured_findings        │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  Stage 6: LLM interpretation and report generation      │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  19. Send the structured results back to the LLM        │
│      • Prompt: "As a cybersecurity analyst,             │
│                interpret..."                            │
│      • Context: target, tool, original objective        │
│                                                         │
│  20. LLM produces a human-readable analysis             │
│      • Executive Summary                                │
│      • Detailed Findings                                │
│      • Risk Assessment                                  │
│      • Remediation Steps                                │
│      • Educational Insights                             │
│                                                         │
│  21. Store the LLM analysis in the database             │
│      • Column: llm_analysis (JSONB)                     │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  Stage 7: Completion and notification                   │
└─────────────┬───────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────┐
│  22. Update the scan status                             │
│      • Status: running → completed                      │
│      • Record the completed_at timestamp                │
│      • Count findings and severity distribution         │
│                                                         │
│  23. Trigger notifications (if enabled)                 │
│      • Email notification (Critical vulnerabilities)    │
│      • Webhook call (third-party integrations)          │
│      • Slack message                                    │
│                                                         │
│  24. Frontend fetches the full results                  │
│      • API: GET /api/v1/scans/{scan_id}/results         │
│      • Render the report page                           │
└─────────────┴───────────────────────────────────────────┘
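
Step 11 of the flow (script safety validation) can be sketched roughly as follows. This is a minimal illustration and not the system's actual validator: the function name `validate_generated_script`, the `FORBIDDEN_CALLS` set, and the extra `shell=True` check are assumptions. Only `ast.parse()` and the ban on `eval`, `exec`, and `__import__` come from the diagram.

```python
import ast

# Assumed blocklist; the flow above names eval, exec and __import__ explicitly.
FORBIDDEN_CALLS = {"eval", "exec", "__import__"}


def validate_generated_script(source: str) -> list[str]:
    """Return a list of problems found; an empty list means the script passed."""
    try:
        tree = ast.parse(source)  # syntax check
    except SyntaxError as exc:
        return [f"syntax error: {exc.msg} (line {exc.lineno})"]

    problems = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        # Direct calls to a forbidden builtin, e.g. eval("...")
        if isinstance(node.func, ast.Name) and node.func.id in FORBIDDEN_CALLS:
            problems.append(f"forbidden call {node.func.id}() on line {node.lineno}")
        # Calls that pass the literal keyword argument shell=True
        for kw in node.keywords:
            if (
                kw.arg == "shell"
                and isinstance(kw.value, ast.Constant)
                and kw.value.value is True
            ):
                problems.append(f"shell=True on line {node.lineno}")
    return problems


print(validate_generated_script("import json\nprint(json.dumps({'ok': True}))"))
print(validate_generated_script("eval('1+1')"))
```

A name-based check like this is easy to evade (for example through `getattr`), which is one reason the flow still runs every generated script inside the sandbox in stage 4.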

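2. Prompt Engineering Strategy

2.1 Prompt Design Principles

The system's prompts follow these principles:

  1. Clear role definition: tell the LLM explicitly which role it plays (a senior penetration tester).
  2. Concrete tasks: spell out the type and format of the code to generate.
  3. Safety constraints up front: place the safety rules near the top of the prompt, ahead of the technical requirements, to stress their importance.
  4. Example-driven: include an example of the expected output format.
  5. Rich context: supply enough background information (target, tool capabilities, limits).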
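
As a rough illustration of principles 1 to 3 and 5, a prompt can be assembled so that the role and the safety rules always precede the task-specific context. The helper `build_prompt` and its parameters are hypothetical and are not part of the system. The templates in section 2.2 are plain format strings.

```python
def build_prompt(role: str, task: str, safety_rules: list[str], context: dict[str, str]) -> str:
    """Assemble a prompt with the role and safety rules ahead of the details."""
    rules = "\n".join(f"{i}. {rule}" for i, rule in enumerate(safety_rules, start=1))
    ctx = "\n".join(f"- {key}: {value}" for key, value in context.items())
    return (
        f"ROLE: {role}\n\n"
        f"TASK: {task}\n\n"
        f"SECURITY RULES (CRITICAL - MUST FOLLOW):\n{rules}\n\n"
        f"CONTEXT:\n{ctx}\n\n"
        "Only output the code, no additional explanation."
    )


prompt = build_prompt(
    role="You are a senior penetration tester.",
    task="Write a Python script to scan open ports on example.com.",
    safety_rules=["Do NOT cause damage or disruption", "Do NOT use eval() or exec()"],
    context={"tool": "/usr/bin/nmap", "timeout": "300 seconds"},
)
print(prompt)
```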

2.2 Core Prompt Templates

Template 1: Generic tool script generation

GENERIC_TOOL_SCRIPT_PROMPT = """
ROLE: You are a senior penetration tester with 10+ years of experience in web application security testing.

TASK: Write a Python script to {task_description} for the target {target}.

SECURITY RULES (CRITICAL - MUST FOLLOW):
1. This is authorized security testing with explicit permission
2. Do NOT cause damage, disruption, or data corruption
3. Use minimal payloads necessary for detection only
4. Do NOT execute shell commands with shell=True
5. Do NOT use eval(), exec(), or __import__()
6. Implement proper error handling and timeouts
7. Respect rate limits to avoid overwhelming the target

TECHNICAL REQUIREMENTS:
1. Use only the following approved libraries: {allowed_libraries}
2. Use the tool binary at: {tool_path}
3. Output format: {output_format}
4. Timeout: {timeout} seconds
5. Parse the tool output and convert to JSON structure
6. Handle all exceptions gracefully

TOOL-SPECIFIC PARAMETERS:
{tool_parameters}

EXPECTED OUTPUT STRUCTURE:
```json
{expected_output_schema}
```

IMPORTANT NOTES:
- Validate all inputs before use
- Log all actions for audit purposes
- If uncertain, fail safely and report the issue

Now, generate the complete Python script. Only output the code, no additional explanation.
"""

Usage example:

import json

prompt = GENERIC_TOOL_SCRIPT_PROMPT.format(
    task_description="scan open ports and detect service versions",
    target="example.com",
    allowed_libraries="subprocess, json, sys",
    tool_path="/usr/bin/nmap",
    output_format="XML (using -oX - flag)",
    timeout=300,
    tool_parameters="""
    - Scan ports: 80, 443, 8080, 8443
    - Enable version detection: -sV flag
    - Set timing template: -T4
    - Disable DNS resolution: -n flag
    """,
    expected_output_schema=json.dumps({
        "scan_time": "ISO-8601 timestamp",
        "target": "string",
        "ports": [
            {
                "port": "integer",
                "protocol": "string",
                "service": "string",
                "version": "string"
            }
        ]
    }, indent=2)
)
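
One detail matters when templates are filled with `str.format()`: any literal brace in the template text, such as the braces of a JSON example, has to be doubled, otherwise `format()` treats it as a replacement field. A minimal sketch with a made-up template (`TEMPLATE` is hypothetical and is not one of the system's templates):

```python
# Hypothetical mini-template: {target} is a placeholder, {{ and }} are literal braces.
TEMPLATE = 'Target: {target}\nReply with JSON like {{"status": "UNCERTAIN"}}'

rendered = TEMPLATE.format(target="example.com")
print(rendered)

# Un-doubled braces are parsed as a replacement field and fail at format time.
try:
    'Reply with JSON like {"status": "UNCERTAIN"}'.format(target="example.com")
    failed = False
except (KeyError, ValueError, IndexError):
    failed = True
print(failed)
```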

Template 2: CVE-specific verification script

CVE_VERIFICATION_PROMPT = """
ROLE: You are a senior penetration tester specializing in vulnerability verification.

OBJECTIVE: Write a Python script to safely verify if {target} is vulnerable to {cve_id}.

CVE INFORMATION:
- CVE ID: {cve_id}
- Severity: {severity} (CVSS: {cvss_score})
- Affected Component: {affected_component}
- Vulnerability Type: {vulnerability_type}
- Description: {cve_description}

VERIFICATION METHODOLOGY:
{detection_method}

CRITICAL SAFETY RULES:
1. This is DETECTION only, NOT exploitation
2. Use the minimal payload necessary to confirm vulnerability
3. Do NOT attempt to extract sensitive data
4. Do NOT modify or delete any data on the target
5. Implement request throttling (1 request per second)
6. Set timeout to 5 seconds per request
7. If target returns 429 or 503, stop immediately

TECHNICAL REQUIREMENTS:
1. Use the 'requests' library for HTTP interactions
2. Implement User-Agent header: "Security-Scanner/1.0"
3. Verify SSL certificates (verify=True)
4. Follow redirects (allow_redirects=True)
5. Maximum of 3 verification attempts

OUTPUT FORMAT:
Print a JSON object to stdout with exactly this structure:
```json
{{
  "status": "VULNERABLE" | "NOT_VULNERABLE" | "UNCERTAIN",
  "confidence": 0.0 to 1.0,
  "evidence": {{
    // Specific evidence data
  }},
  "raw_response": {{
    "status_code": 200,
    "headers": {{}},
    "body_snippet": "First 500 chars"
  }}
}}
```

EXPECTED BEHAVIOR:
- Print "VULNERABLE" if the target exhibits the exact vulnerable behavior described
- Print "NOT_VULNERABLE" if the target is clearly patched or unaffected
- Print "UNCERTAIN" if the test is inconclusive

Now, generate the complete Python script. Output code only, no explanations.
"""

Template 3: Result interpretation

RESULT_INTERPRETATION_PROMPT = """
ROLE: You are a senior cybersecurity analyst and educator with expertise in translating technical security findings into actionable insights.

CONTEXT:
- Target: {target}
- Scan Type: {scan_type}
- Tool Used: {tool_name}
- User Level: {user_level} (beginner / intermediate / advanced)

RAW SCAN OUTPUT:

{raw_output}

STRUCTURED FINDINGS:
```json
{structured_findings}
```

YOUR TASK:
Provide a comprehensive analysis with the following sections:

1. EXECUTIVE SUMMARY (2-3 sentences)
   - Overall security posture
   - Number and severity of findings
   - Immediate actions required

2. DETAILED FINDINGS
   For each finding:
   - What was discovered
   - Why it matters (risk/impact)
   - How it can be exploited (educational explanation)
   - Severity justification

3. RISK ASSESSMENT
   - Priority ranking of issues
   - Business impact analysis
   - Likelihood of exploitation

4. REMEDIATION RECOMMENDATIONS
   For each finding, provide:
   - Specific fix steps
   - Code examples (if applicable)
   - Configuration changes
   - Testing procedures to verify fix

5. EDUCATIONAL INSIGHTS (for {user_level} level)
   - Key concepts demonstrated
   - Learning resources
   - Related vulnerabilities to explore

TONE AND STYLE:
- Clear and professional
- Educational but not condescending
- Use analogies for complex concepts
- Provide examples and references

OUTPUT FORMAT: Markdown with clear section headers

Begin your analysis:
"""

2.3 System Prompt for Conversational Mode

CONVERSATIONAL_SYSTEM_PROMPT = """
You are an experienced cybersecurity mentor and penetration testing expert. Your role is to guide users through web security testing in an interactive, educational manner.

YOUR TEACHING PHILOSOPHY:
- Ask questions to understand the user's current knowledge level
- Explain concepts before diving into technical details
- Use the Socratic method to encourage critical thinking
- Provide context for why each step is important
- Celebrate learning moments and progress

YOUR CAPABILITIES:
- Generate custom security testing scripts
- Analyze scan results and explain findings
- Recommend appropriate tools for different scenarios
- Connect abstract security concepts to concrete tests
- Adapt explanations to the user's level

BEHAVIORAL GUIDELINES:
1. ALWAYS explain the reasoning behind your suggestions
2. NEVER execute a scan without the user's explicit confirmation
3. When proposing a script, show the code and explain what it does
4. If the user seems confused, ask clarifying questions
5. Offer multiple paths: "quick scan" vs "deep dive learning"
6. Point out learning opportunities in the results
7. Connect findings back to theoretical concepts

SAFETY PROTOCOLS:
- Remind users that testing requires authorization
- Warn about potentially disruptive tests before running them
- Explain the ethical implications of security testing
- Encourage responsible disclosure practices

CONVERSATION STRUCTURE:
- Greet users warmly and assess their goals
- Break complex tasks into digestible steps
- Provide progress updates during scans
- Celebrate discoveries and learning moments
- Summarize key takeaways at the end

EXAMPLE INTERACTIONS:

User: "I want to test my website"
You: "Great! Let's start by understanding what you're looking for. Are you concerned about specific vulnerabilities, or would you like a comprehensive assessment? Also, can you confirm you own this website or have explicit permission to test it?"

User: "Just run a full scan"
You: "I can definitely do that, but let me first explain what a 'full scan' involves so you know what to expect. It typically includes [list components]. This can take 30-60 minutes and may generate significant traffic. Sound good? Or would you prefer to start with something lighter while I explain each step?"

Now, engage with the user based on their messages. Be friendly, educational, and security-conscious.
"""

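3. Tool Integration Implementation Details

3.1 nmap Integration

Wrapper implementation

# Note: ToolExecutionError and DataTransformError are defined in section 4.1;
# Finding is assumed to be the project's finding model, which is not shown here.
from typing import Optional, Dict, List
import subprocess
import xml.etree.ElementTree as ET
from dataclasses import dataclass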

@dataclass
class NmapPort:
    port: int
    protocol: str
    state: str
    service_name: str
    service_version: Optional[str]
    service_product: Optional[str]

class NmapWrapper:
    BINARY_PATH = "/usr/bin/nmap"

    def __init__(self):
        self.default_timeout = 300  # 5 minutes

    def build_command(
        self,
        target: str,
        ports: str = "80,443,8080,8443",
        scan_type: str = "version_detection",
        timing: str = "T4"
    ) -> List[str]:
        """
        Build the nmap command line.

        scan_type options:
        - port_scan: basic port scan
        - version_detection: service version detection (-sV)
        - os_detection: operating system detection (-O)
        """
        cmd = [self.BINARY_PATH]

        # Scan type
        if scan_type == "version_detection":
            cmd.append("-sV")
        elif scan_type == "os_detection":
            cmd.extend(["-O", "--osscan-guess"])

        # Port range
        cmd.extend(["-p", ports])

        # Timing template
        cmd.append(f"-{timing}")

        # Disable reverse DNS lookups (faster)
        cmd.append("-n")

        # Write XML output to stdout
        cmd.extend(["-oX", "-"])

        # Target
        cmd.append(target)

        return cmd

    def execute(self, command: List[str]) -> str:
        """Run the nmap command and return its XML output."""
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=self.default_timeout,
                check=True
            )
            return result.stdout

        except subprocess.TimeoutExpired:
            raise ToolExecutionError(f"nmap scan exceeded {self.default_timeout}s timeout")
        except subprocess.CalledProcessError as e:
            raise ToolExecutionError(f"nmap execution failed: {e.stderr}")

    def parse_xml_output(self, xml_output: str) -> Dict:
        """Parse nmap XML output."""
        try:
            root = ET.fromstring(xml_output)

            # Extract scan metadata
            scan_info = {
                "start_time": root.get("start"),
                "version": root.get("version"),
                "args": root.get("args")
            }

            # Extract host information
            hosts = []
            for host in root.findall(".//host"):
                host_info = self._parse_host(host)
                if host_info:
                    hosts.append(host_info)

            return {
                "scan_info": scan_info,
                "hosts": hosts
            }

        except ET.ParseError as e:
            raise DataTransformError(f"Failed to parse nmap XML: {e}")

    def _parse_host(self, host_element) -> Optional[Dict]:
        """Parse the information for a single host."""
        # Skip hosts that are not up
        status = host_element.find("status")
        if status is None or status.get("state") != "up":
            return None

        # IP address
        address_elem = host_element.find("address")
        ip = address_elem.get("addr") if address_elem is not None else "unknown"

        # Hostname (if present)
        hostname_elem = host_element.find(".//hostname")
        hostname = hostname_elem.get("name") if hostname_elem is not None else None

        # Parse ports
        ports = []
        for port_elem in host_element.findall(".//port"):
            port_info = self._parse_port(port_elem)
            if port_info:
                ports.append(port_info)

        return {
            "ip": ip,
            "hostname": hostname,
            "ports": ports
        }

    def _parse_port(self, port_element) -> Optional[Dict]:
        """Parse the information for a single port."""
        state_elem = port_element.find("state")
        if state_elem is None or state_elem.get("state") != "open":
            return None

        port_num = int(port_element.get("portid"))
        protocol = port_element.get("protocol")

        # Service information
        service_elem = port_element.find("service")
        if service_elem is not None:
            service_name = service_elem.get("name", "unknown")
            service_product = service_elem.get("product")
            service_version = service_elem.get("version")
            service_extrainfo = service_elem.get("extrainfo")

            # Build the full version string
            version_str = None
            if service_product:
                version_str = service_product
                if service_version:
                    version_str += f" {service_version}"
                if service_extrainfo:
                    version_str += f" ({service_extrainfo})"
        else:
            service_name = "unknown"
            version_str = None

        return {
            "port": port_num,
            "protocol": protocol,
            "service_name": service_name,
            "service_version": version_str
        }

    def convert_to_findings(self, parsed_data: Dict) -> List[Finding]:
        """Convert parsed data into the standard Finding format."""
        findings = []

        for host in parsed_data["hosts"]:
            for port in host["ports"]:
                # Assess severity from the port number and service name
                severity = self._assess_port_severity(port)

                finding = Finding(
                    severity=severity,
                    title=f"Open port detected: {port['port']}/{port['protocol']}",
                    description=self._build_port_description(port),
                    cve=None,  # the CVE is not yet known at this stage
                    evidence={
                        "ip": host["ip"],
                        "hostname": host["hostname"],
                        "port": port["port"],
                        "protocol": port["protocol"],
                        "service_name": port["service_name"],
                        "service_version": port["service_version"]
                    }
                )
                findings.append(finding)

        return findings

    def _assess_port_severity(self, port: Dict) -> str:
        """Assess the severity of an open port."""
        # Commonly high-risk ports
        high_risk_ports = [23, 21, 3389, 22]  # Telnet, FTP, RDP, SSH

        if port["port"] in high_risk_ports:
            return "High"
        elif port["service_name"] in ["http", "https"]:
            return "Medium"
        else:
            return "Low"

    def _build_port_description(self, port: Dict) -> str:
        """Build the description text for an open-port finding."""
        desc = f"Port {port['port']}/{port['protocol']} is open and running {port['service_name']}"

        if port["service_version"]:
            desc += f" (version: {port['service_version']})"

        desc += ". This port is accessible from the internet and may expose services to potential attackers."

        return desc
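
To make the XML shape that the parser relies on concrete, here is a small standalone sketch. `SAMPLE_XML` is a hand-written fragment that mimics the layout of `nmap -oX -` output (it is not real scan data), and `open_ports` is a hypothetical, condensed version of the host and port filtering done by the wrapper above.

```python
import xml.etree.ElementTree as ET

# Hand-written sample in the shape of nmap XML output; values are illustrative only.
SAMPLE_XML = """<nmaprun start="1700000000" version="7.94" args="nmap -sV -p 80,443 example.com">
  <host>
    <status state="up"/>
    <address addr="192.0.2.10" addrtype="ipv4"/>
    <ports>
      <port protocol="tcp" portid="80">
        <state state="open"/>
        <service name="http" product="nginx" version="1.24.0"/>
      </port>
      <port protocol="tcp" portid="443">
        <state state="closed"/>
      </port>
    </ports>
  </host>
</nmaprun>"""


def open_ports(xml_text: str) -> list[dict]:
    """Collect open ports from hosts that are up."""
    root = ET.fromstring(xml_text)
    found = []
    for host in root.findall(".//host"):
        status = host.find("status")
        if status is None or status.get("state") != "up":
            continue  # skip hosts that are down
        for port in host.findall(".//port"):
            state = port.find("state")
            if state is None or state.get("state") != "open":
                continue  # keep open ports only
            service = port.find("service")
            found.append({
                "port": int(port.get("portid")),
                "protocol": port.get("protocol"),
                "service_name": service.get("name", "unknown") if service is not None else "unknown",
            })
    return found


print(open_ports(SAMPLE_XML))
```

Only port 80 is returned, because the sample marks port 443 as closed.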

3.2 CVE Database Integration

NVD API synchronization

import asyncio
import json
from datetime import datetime, timedelta
from typing import AsyncIterator, Dict, Optional

import httpx

class NVDSyncService:
    """
    Synchronization service for the National Vulnerability Database (NVD).
    """
    BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        self.rate_limit_delay = 0.6 if api_key else 6  # an API key allows a higher request rate

    async def sync_recent_cves(self, days: int = 7) -> int:
        """Sync CVEs published in the last N days."""
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        count = 0
        async for cve in self._fetch_cves_by_date_range(start_date, end_date):
            await self._save_cve_to_database(cve)
            count += 1

        return count

    async def _fetch_cves_by_date_range(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> AsyncIterator[Dict]:
        """Fetch CVEs by publication date range (paginated)."""
        async with httpx.AsyncClient() as client:
            start_index = 0
            results_per_page = 2000

            while True:
                params = {
                    "pubStartDate": start_date.isoformat(),
                    "pubEndDate": end_date.isoformat(),
                    "startIndex": start_index,
                    "resultsPerPage": results_per_page
                }

                if self.api_key:
                    headers = {"apiKey": self.api_key}
                else:
                    headers = {}

                response = await client.get(
                    self.BASE_URL,
                    params=params,
                    headers=headers,
                    timeout=30
                )

                response.raise_for_status()
                data = response.json()

                vulnerabilities = data.get("vulnerabilities", [])
                if not vulnerabilities:
                    break

                for vuln_data in vulnerabilities:
                    cve = self._parse_nvd_cve(vuln_data)
                    yield cve

                # Check whether more results remain
                total_results = data.get("totalResults", 0)
                if start_index + results_per_page >= total_results:
                    break

                start_index += results_per_page

                # Rate limiting
                await asyncio.sleep(self.rate_limit_delay)

    def _parse_nvd_cve(self, vuln_data: Dict) -> Dict:
        """Convert an NVD API response item into the internal format."""
        cve_item = vuln_data["cve"]

        cve_id = cve_item["id"]

        # Description
        descriptions = cve_item.get("descriptions", [])
        description = next(
            (d["value"] for d in descriptions if d["lang"] == "en"),
            "No description available"
        )

        # CVSS score: use the newest CVSS version that has entries
        # (an empty list no longer raises IndexError)
        metrics = cve_item.get("metrics", {})
        metric_entries = (
            metrics.get("cvssMetricV31")
            or metrics.get("cvssMetricV30")
            or metrics.get("cvssMetricV2")
            or [{}]
        )
        cvss_metric = metric_entries[0]
        cvss_data = cvss_metric.get("cvssData", {})

        cvss_score = cvss_data.get("baseScore", 0.0)
        # CVSS v3.x reports baseSeverity inside cvssData; v2 reports it on the metric entry
        severity = cvss_data.get("baseSeverity") or cvss_metric.get("baseSeverity", "Unknown")

        # Affected CPEs (Common Platform Enumeration)
        configurations = vuln_data.get("cve", {}).get("configurations", [])
        affected_cpe = []
        for config in configurations:
            for node in config.get("nodes", []):
                for cpe_match in node.get("cpeMatch", []):
                    if cpe_match.get("vulnerable"):
                        affected_cpe.append(cpe_match.get("criteria"))

        # Publication date
        published = cve_item.get("published")

        # Reference links
        references = [
            ref["url"] for ref in cve_item.get("references", [])
        ]

        return {
            "cve_id": cve_id,
            "description": description,
            "cvss_score": cvss_score,
            "severity": severity,
            "affected_cpe": affected_cpe,
            "published_date": published,
            "references": references,
            "raw_data": vuln_data  # keep the raw data for future use
        }

    async def _save_cve_to_database(self, cve: Dict):
        """Store a CVE in the database (db is the application's database handle, defined elsewhere)."""
        # "references" is a reserved word in PostgreSQL, so the column name must be quoted
        query = """
        INSERT INTO cve_database (
            cve_id, description, cvss_score, severity,
            affected_cpe, published_date, "references", raw_data
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        ON CONFLICT (cve_id) DO UPDATE SET
            description = EXCLUDED.description,
            cvss_score = EXCLUDED.cvss_score,
            severity = EXCLUDED.severity,
            affected_cpe = EXCLUDED.affected_cpe,
            published_date = EXCLUDED.published_date,
            "references" = EXCLUDED."references",
            raw_data = EXCLUDED.raw_data,
            updated_at = NOW()
        """

        await db.execute(
            query,
            cve["cve_id"],
            cve["description"],
            cve["cvss_score"],
            cve["severity"],
            json.dumps(cve["affected_cpe"]),
            cve["published_date"],
            json.dumps(cve["references"]),
            json.dumps(cve["raw_data"])
        )
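
The CVSS lookup is the part of the parser most sensitive to the response layout, so it is worth isolating. The sketch below assumes the NVD API 2.0 layout in which CVSS v3.x entries carry `baseSeverity` inside `cvssData` while v2 entries carry it on the metric entry itself. `pick_cvss` is a hypothetical helper and the payload fragments are hand-written, not real NVD records.

```python
def pick_cvss(metrics: dict) -> tuple[float, str]:
    """Return (base score, severity) from the newest CVSS version present."""
    for key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
        entries = metrics.get(key) or []
        if entries:
            entry = entries[0]
            data = entry.get("cvssData", {})
            # v3.x keeps baseSeverity inside cvssData; v2 keeps it on the entry itself.
            severity = data.get("baseSeverity") or entry.get("baseSeverity", "Unknown")
            return data.get("baseScore", 0.0), severity
    return 0.0, "Unknown"


# Hand-written fragments in the assumed shape of the "metrics" object.
v31 = {"cvssMetricV31": [{"cvssData": {"baseScore": 9.8, "baseSeverity": "CRITICAL"}}]}
v2 = {"cvssMetricV2": [{"cvssData": {"baseScore": 7.5}, "baseSeverity": "HIGH"}]}

print(pick_cvss(v31))
print(pick_cvss(v2))
print(pick_cvss({}))
```

Iterating over the keys in preference order also covers the case where a key is present but holds an empty list.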

4. Error Handling and Retry Mechanism

4.1 Layered Error Handling

class SecurityScannerError(Exception):
    """Base exception class."""
    pass

class ValidationError(SecurityScannerError):
    """Input validation error."""
    pass

class AuthorizationError(SecurityScannerError):
    """Authorization check failed."""
    pass

class LLMError(SecurityScannerError):
    """LLM API error."""
    pass

class ToolExecutionError(SecurityScannerError):
    """Tool execution error."""
    pass

class SandboxError(SecurityScannerError):
    """Sandbox execution error."""
    pass

class DataTransformError(SecurityScannerError):
    """Data transformation error."""
    pass
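
One benefit of a shared base class is that callers can handle a specific error first and fall back to a single catch-all for every other scanner error, while unrelated exceptions still propagate. A minimal sketch follows. The `run_step` helper and the "rejected" and "failed" status strings are hypothetical, and the classes are repeated only so the example runs on its own.

```python
class SecurityScannerError(Exception):
    """Base class, mirroring the hierarchy above."""


class ValidationError(SecurityScannerError):
    """Input validation error."""


class SandboxError(SecurityScannerError):
    """Sandbox execution error."""


def run_step(step) -> str:
    """Run one pipeline step and translate failures into a status string."""
    try:
        step()
        return "completed"
    except ValidationError as exc:
        # Caller's fault: reject the request rather than mark the scan as failed.
        return f"rejected: {exc}"
    except SecurityScannerError as exc:
        # Any other scanner error, caught once through the shared base class.
        return f"failed: {type(exc).__name__}: {exc}"


def bad_input():
    raise ValidationError("target URL is malformed")


def sandbox_crash():
    raise SandboxError("container exited with code 137")


print(run_step(lambda: None))
print(run_step(bad_input))
print(run_step(sandbox_crash))
```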

4.2 Retry Decorator

import asyncio
import functools
import logging
from typing import Callable, TypeVar, Any

import httpx

logger = logging.getLogger(__name__)

T = TypeVar('T')

def retry_with_backoff(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: tuple = (Exception,)
):
    """
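    Retry decorator with exponential backoff.

    Args:
        max_attempts: maximum number of attempts
        initial_delay: initial delay in seconds
        backoff_factor: multiplier applied to the delay after each failure
        exceptions: exception types that trigger a retry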
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            delay = initial_delay
            last_exception = None

            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e

                    if attempt == max_attempts:
                        # The final attempt failed; re-raise the exception
                        raise

                    # Log the retry
                    logger.warning(
                        f"{func.__name__} attempt {attempt}/{max_attempts} failed: {e}. "
                        f"Retrying in {delay}s..."
                    )

                    await asyncio.sleep(delay)
                    delay *= backoff_factor

            # Reached only when max_attempts < 1, because the loop never runs
            raise last_exception

        return wrapper
    return decorator

# Usage example
@retry_with_backoff(
    max_attempts=3,
    initial_delay=1.0,
    exceptions=(httpx.HTTPError, LLMError)
)
async def call_llm_api(prompt: str) -> str:
    """Call the LLM API (with retries)."""
    response = await llm_client.complete(prompt)
    return response
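
The waiting times produced by the decorator follow a geometric sequence: the delay before retry k is `initial_delay * backoff_factor ** (k - 1)`, and nothing is slept after the final attempt. The helper `backoff_delays` below is hypothetical and only reproduces that arithmetic so the schedule can be checked without sleeping.

```python
def backoff_delays(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
) -> list[float]:
    """Delays slept between attempts; there is no sleep after the final attempt."""
    return [initial_delay * backoff_factor ** i for i in range(max_attempts - 1)]


print(backoff_delays())              # [1.0, 2.0]
print(backoff_delays(5, 0.5))        # [0.5, 1.0, 2.0, 4.0]
print(sum(backoff_delays(5, 0.5)))   # 7.5
```

With the defaults used in the usage example (three attempts, 1 second initial delay, factor 2), a call that keeps failing spends 3 seconds waiting in total before the exception is re-raised.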

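5. Monitoring and Logging

5.1 Structured Logging

import logging

import structlog

# The structlog.stdlib processors below expect standard-library loggers underneath
logging.basicConfig(format="%(message)s", level=logging.INFO)

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    logger_factory=structlog.stdlib.LoggerFactory()
)

logger = structlog.get_logger()

# Usage example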
async def execute_scan(scan_id: str, target: str):
    log = logger.bind(
        scan_id=scan_id,
        target=target,
        component="scan_executor"
    )

    log.info("scan_started")

    try:
        result = await perform_scan()
        log.info("scan_completed",
                 duration_ms=result.duration,
                 findings_count=len(result.findings))
    except Exception as e:
        log.error("scan_failed",
                  error_type=type(e).__name__,
                  error_message=str(e))
        raise

5.2 Performance Metrics Collection

import time

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
scan_requests_total = Counter(
    'scan_requests_total',
    'Total number of scan requests',
    ['scan_mode', 'status']
)

scan_duration_seconds = Histogram(
    'scan_duration_seconds',
    'Scan execution duration in seconds',
    ['scan_mode', 'tool']
)

active_scans = Gauge(
    'active_scans',
    'Number of currently running scans'
)

llm_api_calls_total = Counter(
    'llm_api_calls_total',
    'Total LLM API calls',
    ['purpose', 'model']
)

# Usage example
async def execute_scan_with_metrics(scan_request: ScanRequest):
    scan_requests_total.labels(
        scan_mode=scan_request.mode,
        status='started'
    ).inc()

    active_scans.inc()

    start_time = time.time()

    try:
        result = await perform_scan(scan_request)

        scan_requests_total.labels(
            scan_mode=scan_request.mode,
            status='completed'
        ).inc()

        return result
    except Exception:
        scan_requests_total.labels(
            scan_mode=scan_request.mode,
            status='failed'
        ).inc()
        raise
    finally:
        duration = time.time() - start_time
        scan_duration_seconds.labels(
            scan_mode=scan_request.mode,
            tool='overall'
        ).observe(duration)

        active_scans.dec()

Last updated: 2025-10-21
