This document describes the system's core workflow, technical implementation details, prompt engineering strategy, and data processing pipeline.
┌─────────────────────────────────────────────────────────┐
│ Stage 1: Request intake and validation │
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
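│ 1. User submits a scan request (via UI or API) │
│ 2. API Gateway performs: │
│ • User authentication (JWT token verification) │
│ • Authorization check (is the user allowed to scan this target?) │
│ • Rate limiting (abuse prevention) │
│ 3. Input validation: │
│ • Target URL format validation │
│ • Blocklist check (RFC 1918 private IPs, localhost, etc.) │
│ • Parameter sanitization (injection prevention) │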
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Stage 2: Task planning and preparation │
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
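│ 4. Backend Service creates the Scan record │
│ • Generate scan_id (UUID) │
│ • Initial status: pending │
│ • Record request parameters and timestamp │
│ │
│ 5. Enqueue the task in the Redis task queue │
│ • Queue name: scan_tasks │
│ • Priority: Critical > High > Normal │
│ │
│ 6. Return scan_id to the frontend │
│ • Frontend starts polling the status (every 3 seconds) │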
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Stage 3: LLM orchestration and script generation │
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
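│ 7. LLM Orchestrator Worker dequeues the task │
│ • Update scan status: pending → running │
│ │
│ 8. Intent understanding and task decomposition │
│ • Parse the mode and checks selected by the user │
│ • Intelligence-driven mode: run quick recon + CVE lookup │
│ • Conversational mode: decide the next step from the conversation history │
│ • Automated mode: load a predefined strategy │
│ │
│ 9. Build the LLM prompt │
│ • Load the matching template from the prompt template library │
│ • Fill in variables (target, ports, cve_id, etc.) │
│ • Add security constraint rules │
│ │
│ 10. Call the LLM API to generate the script │
│ • Model: GPT-4 / Claude 3.5 Sonnet │
│ • Temperature: 0.2 (favors stable output) │
│ • Retry: up to 3 attempts │
│ │
│ 11. Script safety validation │
│ • Static analysis: check for dangerous functions │
│ • Syntax check: validate with ast.parse() │
│ • Blocklist check: forbid eval, exec, __import__, etc. │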
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Stage 4: Sandbox execution │
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
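│ 12. Sandbox Manager prepares the execution environment │
│ • Write the script to a temporary file │
│ • Configure Docker container parameters │
│ │
│ 13. Start the isolated container │
│ • Image: scanner-sandbox:latest │
│ • Resource limits: 1 CPU, 512MB RAM │
│ • Network: scan_net (isolated network) │
│ • Timeout: 300 seconds │
│ │
│ 14. Run the script and collect output │
│ • Capture stdout and stderr │
│ • Record execution time │
│ • Handle exceptions and timeouts │
│ │
│ 15. Clean up resources │
│ • Force-stop and remove the container │
│ • Delete temporary files │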
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Stage 5: Data transformation and parsing │
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
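│ 16. Data Transformer parses the raw output │
│ • Select the parser that matches the tool type │
│ • nmap: XML parsing │
│ • nikto: regular-expression extraction │
│ • custom_script: direct JSON parsing │
│ │
│ 17. Convert to the standardized JSON structure │
│ • Conforms to the ScanResult schema │
│ • Contains a findings array │
│ • Each finding includes: severity, title, description │
│ │
│ 18. Store the structured results in the database │
│ • Table: scan_results │
│ • Columns: raw_output (raw output) + structured_findings │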
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Stage 6: LLM interpretation and report generation │
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 19. Send the structured results back to the LLM │
│ • Prompt: "As a cybersecurity analyst, interpret..." │
│ • Context provided: target, tool, original objective │
│ │
│ 20. LLM generates a human-readable analysis │
│ • Executive Summary │
│ • Detailed Findings │
│ • Risk Assessment │
│ • Remediation Steps │
│ • Educational Insights (learning points) │
│ │
│ 21. Store the LLM analysis in the database │
│ • Column: llm_analysis (JSONB) │
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Stage 7: Completion and notification │
└─────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
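│ 22. Update scan status │
│ • Status: running → completed │
│ • Record the completed_at timestamp │
│ • Count findings and compute the severity distribution │
│ │
│ 23. Trigger notifications (if enabled) │
│ • Email notification (Critical vulnerabilities) │
│ • Webhook call (third-party integrations) │
│ • Slack message │
│ │
│ 24. Frontend fetches the full results │
│ • API: GET /api/v1/scans/{scan_id}/results │
│ • Render the report page │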
└─────────────┴───────────────────────────────────────────┘
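The blocklist check in step 3 could be sketched as follows. This is a minimal illustration under stated assumptions, not the system's actual validator: the name `validate_target` and the `BLOCKED_HOSTNAMES` set are hypothetical, only `http`/`https` URLs are accepted, and no DNS resolution is performed, so a hostname that resolves to a private address would still need a separate check.

```python
import ipaddress
from urllib.parse import urlparse

# Hypothetical blocklist for illustration only.
BLOCKED_HOSTNAMES = {"localhost"}


def validate_target(url: str) -> str:
    """Return the hostname if the URL is an acceptable scan target, else raise ValueError."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
    host = parsed.hostname
    if not host:
        raise ValueError("URL has no hostname")
    if host.lower() in BLOCKED_HOSTNAMES:
        raise ValueError(f"blocked hostname: {host}")
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal: the hostname passes this (purely syntactic) check.
        return host
    # is_private covers the RFC 1918 ranges; loopback and link-local are
    # checked explicitly for readability.
    if ip.is_private or ip.is_loopback or ip.is_link_local:
        raise ValueError(f"blocked address: {ip}")
    return host
```

A caller would typically convert the `ValueError` into the API's validation error response before any Scan record is created.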
The prompts in this system are designed according to the following principles:
````python
GENERIC_TOOL_SCRIPT_PROMPT = """
ROLE: You are a senior penetration tester with 10+ years of experience in web application security testing.
TASK: Write a Python script to {task_description} for the target {target}.
SECURITY RULES (CRITICAL - MUST FOLLOW):
1. This is authorized security testing with explicit permission
2. Do NOT cause damage, disruption, or data corruption
3. Use minimal payloads necessary for detection only
4. Do NOT execute shell commands with shell=True
5. Do NOT use eval(), exec(), or __import__()
6. Implement proper error handling and timeouts
7. Respect rate limits to avoid overwhelming the target
TECHNICAL REQUIREMENTS:
1. Use only the following approved libraries: {allowed_libraries}
2. Use the tool binary at: {tool_path}
3. Output format: {output_format}
4. Timeout: {timeout} seconds
5. Parse the tool output and convert to JSON structure
6. Handle all exceptions gracefully
TOOL-SPECIFIC PARAMETERS:
{tool_parameters}
EXPECTED OUTPUT STRUCTURE:
```json
{expected_output_schema}
```
IMPORTANT NOTES:
- Validate all inputs before use
- Log all actions for audit purposes
- If uncertain, fail safely and report the issue
Now, generate the complete Python script. Only output the code, no additional explanation.
"""
````
**Usage example**:
```python
import json

prompt = GENERIC_TOOL_SCRIPT_PROMPT.format(
    task_description="scan open ports and detect service versions",
    target="example.com",
    allowed_libraries="subprocess, json, sys",
    tool_path="/usr/bin/nmap",
    output_format="XML (using -oX - flag)",
    timeout=300,
    tool_parameters="""
- Scan ports: 80, 443, 8080, 8443
- Enable version detection: -sV flag
- Set timing template: -T4
- Disable DNS resolution: -n flag
""",
    # The schema is serialized so it can be embedded in the prompt verbatim
    expected_output_schema=json.dumps({
        "scan_time": "ISO-8601 timestamp",
        "target": "string",
        "ports": [
            {
                "port": "integer",
                "protocol": "string",
                "service": "string",
                "version": "string"
            }
        ]
    }, indent=2)
)
```
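Security rules 4 and 5 in the prompt are only instructions to the model, so the generated script still has to be checked before it runs. The sketch below shows one way the static check could work using the standard `ast` module. The function name `find_violations` and the `FORBIDDEN_CALLS` set are assumptions for illustration, and the check is deliberately narrow: it catches direct calls and a literal `shell=True`, but not aliasing or `getattr` tricks, so the sandbox remains the real isolation boundary.

```python
import ast

# Names taken from the prompt's security rules; the set itself is illustrative.
FORBIDDEN_CALLS = {"eval", "exec", "__import__"}


def find_violations(source: str) -> list:
    """Return human-readable violations found in a generated script."""
    try:
        tree = ast.parse(source)
    except SyntaxError as exc:
        return [f"syntax error: {exc.msg} (line {exc.lineno})"]

    violations = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        # Direct calls such as eval("...") or __import__("os")
        if isinstance(node.func, ast.Name) and node.func.id in FORBIDDEN_CALLS:
            violations.append(f"line {node.lineno}: call to {node.func.id}()")
        # Any call passing a literal shell=True, e.g. subprocess.run(cmd, shell=True)
        for kw in node.keywords:
            if (
                kw.arg == "shell"
                and isinstance(kw.value, ast.Constant)
                and kw.value.value is True
            ):
                violations.append(f"line {node.lineno}: shell=True")
    return violations
```

A script would be passed to the sandbox only when the returned list is empty; otherwise the violations could be fed back to the LLM in a retry prompt.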
````python
CVE_VERIFICATION_PROMPT = """
ROLE: You are a senior penetration tester specializing in vulnerability verification.
OBJECTIVE: Write a Python script to safely verify if {target} is vulnerable to {cve_id}.
CVE INFORMATION:
- CVE ID: {cve_id}
- Severity: {severity} (CVSS: {cvss_score})
- Affected Component: {affected_component}
- Vulnerability Type: {vulnerability_type}
- Description: {cve_description}
VERIFICATION METHODOLOGY:
{detection_method}
CRITICAL SAFETY RULES:
1. This is DETECTION only, NOT exploitation
2. Use the minimal payload necessary to confirm vulnerability
3. Do NOT attempt to extract sensitive data
4. Do NOT modify or delete any data on the target
5. Implement request throttling (1 request per second)
6. Set timeout to 5 seconds per request
7. If target returns 429 or 503, stop immediately
TECHNICAL REQUIREMENTS:
1. Use the 'requests' library for HTTP interactions
2. Implement User-Agent header: "Security-Scanner/1.0"
3. Verify SSL certificates (verify=True)
4. Follow redirects (allow_redirects=True)
5. Maximum of 3 verification attempts
OUTPUT FORMAT:
Print a JSON object to stdout with exactly this structure:
```json
{{
"status": "VULNERABLE" | "NOT_VULNERABLE" | "UNCERTAIN",
"confidence": 0.0 to 1.0,
"evidence": {{
// Specific evidence data
}},
"raw_response": {{
"status_code": 200,
"headers": {{}},
"body_snippet": "First 500 chars"
}}
}}
```
EXPECTED BEHAVIOR:
- Set "status" to "VULNERABLE" if the target exhibits the exact vulnerable behavior described
- Set "status" to "NOT_VULNERABLE" if the target is clearly patched or unaffected
- Set "status" to "UNCERTAIN" if the test is inconclusive
Now, generate the complete Python script. Output code only, no explanations.
"""
````
#### Template Type 3: Result Interpretation
````python
RESULT_INTERPRETATION_PROMPT = """
ROLE: You are a senior cybersecurity analyst and educator with expertise in translating technical security findings into actionable insights.
CONTEXT:
- Target: {target}
- Scan Type: {scan_type}
- Tool Used: {tool_name}
- User Level: {user_level} (beginner / intermediate / advanced)
RAW SCAN OUTPUT:
{raw_output}
STRUCTURED FINDINGS:
```json
{structured_findings}
```
YOUR TASK:
Provide a comprehensive analysis with the following sections:
EXECUTIVE SUMMARY
- Immediate actions required
DETAILED FINDINGS
For each finding:
- Severity justification
RISK ASSESSMENT
- Likelihood of exploitation
REMEDIATION RECOMMENDATIONS
For each finding, provide:
- Testing procedures to verify fix
EDUCATIONAL INSIGHTS (for {user_level} level)
TONE AND STYLE:
- Clear and professional
- Educational but not condescending
- Use analogies for complex concepts
- Provide examples and references
OUTPUT FORMAT: Markdown with clear section headers
Begin your analysis:
"""
````
### 2.3 System Prompt for Conversational Mode
```python
CONVERSATIONAL_SYSTEM_PROMPT = """
You are an experienced cybersecurity mentor and penetration testing expert. Your role is to guide users through web security testing in an interactive, educational manner.
YOUR TEACHING PHILOSOPHY:
- Ask questions to understand the user's current knowledge level
- Explain concepts before diving into technical details
- Use the Socratic method to encourage critical thinking
- Provide context for why each step is important
- Celebrate learning moments and progress
YOUR CAPABILITIES:
- Generate custom security testing scripts
- Analyze scan results and explain findings
- Recommend appropriate tools for different scenarios
- Connect abstract security concepts to concrete tests
- Adapt explanations to the user's level
BEHAVIORAL GUIDELINES:
1. ALWAYS explain the reasoning behind your suggestions
2. NEVER execute a scan without the user's explicit confirmation
3. When proposing a script, show the code and explain what it does
4. If the user seems confused, ask clarifying questions
5. Offer multiple paths: "quick scan" vs "deep dive learning"
6. Point out learning opportunities in the results
7. Connect findings back to theoretical concepts
SAFETY PROTOCOLS:
- Remind users that testing requires authorization
- Warn about potentially disruptive tests before running them
- Explain the ethical implications of security testing
- Encourage responsible disclosure practices
CONVERSATION STRUCTURE:
- Greet users warmly and assess their goals
- Break complex tasks into digestible steps
- Provide progress updates during scans
- Celebrate discoveries and learning moments
- Summarize key takeaways at the end
EXAMPLE INTERACTIONS:
User: "I want to test my website"
You: "Great! Let's start by understanding what you're looking for. Are you concerned about specific vulnerabilities, or would you like a comprehensive assessment? Also, can you confirm you own this website or have explicit permission to test it?"
User: "Just run a full scan"
You: "I can definitely do that, but let me first explain what a 'full scan' involves so you know what to expect. It typically includes [list components]. This can take 30-60 minutes and may generate significant traffic. Sound good? Or would you prefer to start with something lighter while I explain each step?"
Now, engage with the user based on their messages. Be friendly, educational, and security-conscious.
"""
```
```python
import subprocess
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Dict, List, Optional

# ToolExecutionError and DataTransformError are the exception classes defined
# later in this document; Finding is the project's standard finding model.


@dataclass
class NmapPort:
    port: int
    protocol: str
    state: str
    service_name: str
    service_version: Optional[str]
    service_product: Optional[str]


class NmapWrapper:
    BINARY_PATH = "/usr/bin/nmap"

    def __init__(self):
        self.default_timeout = 300  # 5 minutes

    def build_command(
        self,
        target: str,
        ports: str = "80,443,8080,8443",
        scan_type: str = "version_detection",
        timing: str = "T4"
    ) -> List[str]:
        """
        Build the nmap command.

        scan_type options:
        - port_scan: basic port scan
        - version_detection: service version detection (-sV)
        - os_detection: operating system detection (-O)
        """
        # A target starting with "-" would be parsed by nmap as an option
        if target.startswith("-"):
            raise ValueError("target must not start with '-'")
        cmd = [self.BINARY_PATH]
        # Scan type
        if scan_type == "version_detection":
            cmd.append("-sV")
        elif scan_type == "os_detection":
            # Note: -O requires root privileges
            cmd.extend(["-O", "--osscan-guess"])
        # Port range
        cmd.extend(["-p", ports])
        # Timing template
        cmd.append(f"-{timing}")
        # Disable reverse DNS lookups (faster)
        cmd.append("-n")
        # Write XML output to stdout
        cmd.extend(["-oX", "-"])
        # Target
        cmd.append(target)
        return cmd

    def execute(self, command: List[str]) -> str:
        """Run the nmap command and return its XML output."""
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=self.default_timeout,
                check=True
            )
            return result.stdout
        except subprocess.TimeoutExpired as e:
            raise ToolExecutionError(
                f"nmap scan exceeded {self.default_timeout}s timeout"
            ) from e
        except subprocess.CalledProcessError as e:
            raise ToolExecutionError(f"nmap execution failed: {e.stderr}") from e

    def parse_xml_output(self, xml_output: str) -> Dict:
        """Parse nmap XML output."""
        try:
            root = ET.fromstring(xml_output)
        except ET.ParseError as e:
            raise DataTransformError(f"Failed to parse nmap XML: {e}") from e
        # Scan metadata
        scan_info = {
            "start_time": root.get("start"),
            "version": root.get("version"),
            "args": root.get("args")
        }
        # Host information
        hosts = []
        for host in root.findall(".//host"):
            host_info = self._parse_host(host)
            if host_info:
                hosts.append(host_info)
        return {
            "scan_info": scan_info,
            "hosts": hosts
        }

    def _parse_host(self, host_element) -> Optional[Dict]:
        """Parse the information for a single host."""
        # Skip hosts that are not up
        status = host_element.find("status")
        if status is None or status.get("state") != "up":
            return None
        # IP address
        address_elem = host_element.find("address")
        ip = address_elem.get("addr") if address_elem is not None else "unknown"
        # Hostname (if any)
        hostname_elem = host_element.find(".//hostname")
        hostname = hostname_elem.get("name") if hostname_elem is not None else None
        # Ports
        ports = []
        for port_elem in host_element.findall(".//port"):
            port_info = self._parse_port(port_elem)
            if port_info:
                ports.append(port_info)
        return {
            "ip": ip,
            "hostname": hostname,
            "ports": ports
        }

    def _parse_port(self, port_element) -> Optional[Dict]:
        """Parse the information for a single port (open ports only)."""
        state_elem = port_element.find("state")
        if state_elem is None or state_elem.get("state") != "open":
            return None
        port_num = int(port_element.get("portid"))
        protocol = port_element.get("protocol")
        # Service information
        service_name = "unknown"
        version_str = None
        service_elem = port_element.find("service")
        if service_elem is not None:
            service_name = service_elem.get("name", "unknown")
            service_product = service_elem.get("product")
            service_version = service_elem.get("version")
            service_extrainfo = service_elem.get("extrainfo")
            # Combine into a full version string
            if service_product:
                version_str = service_product
                if service_version:
                    version_str += f" {service_version}"
                if service_extrainfo:
                    version_str += f" ({service_extrainfo})"
        return {
            "port": port_num,
            "protocol": protocol,
            "service_name": service_name,
            "service_version": version_str
        }

    def convert_to_findings(self, parsed_data: Dict) -> List["Finding"]:
        """Convert the parsed data into the standard Finding format."""
        findings = []
        for host in parsed_data["hosts"]:
            for port in host["ports"]:
                # Assess severity from the service and version
                severity = self._assess_port_severity(port)
                finding = Finding(
                    severity=severity,
                    title=f"Open port detected: {port['port']}/{port['protocol']}",
                    description=self._build_port_description(port),
                    cve=None,  # CVEs are not known at this stage
                    evidence={
                        "ip": host["ip"],
                        "hostname": host["hostname"],
                        "port": port["port"],
                        "protocol": port["protocol"],
                        "service_name": port["service_name"],
                        "service_version": port["service_version"]
                    }
                )
                findings.append(finding)
        return findings

    def _assess_port_severity(self, port: Dict) -> str:
        """Assess the severity of an open port."""
        # Common high-risk ports
        high_risk_ports = [23, 21, 3389, 22]  # Telnet, FTP, RDP, SSH
        if port["port"] in high_risk_ports:
            return "High"
        elif port["service_name"] in ["http", "https"]:
            return "Medium"
        else:
            return "Low"

    def _build_port_description(self, port: Dict) -> str:
        """Build the description for an open-port finding."""
        desc = f"Port {port['port']}/{port['protocol']} is open and running {port['service_name']}"
        if port["service_version"]:
            desc += f" (version: {port['service_version']})"
        desc += ". This port is reachable from the scanning host and may expose services to potential attackers."
        return desc
```
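For orientation, the parser above reads only a small part of nmap's XML output. The snippet below uses a hand-written, abbreviated sample (illustrative, not real scan output; the address is from a documentation range) to show the elements and attributes involved: `nmaprun`, `host/status`, `address`, and `port` with its `state` and `service` children.

```python
import xml.etree.ElementTree as ET

# Hand-written, abbreviated sample in the shape of `nmap -oX -` output.
SAMPLE_XML = """
<nmaprun args="nmap -sV -p 80,443 -T4 -n -oX - 203.0.113.10" start="1700000000" version="7.94">
  <host>
    <status state="up"/>
    <address addr="203.0.113.10" addrtype="ipv4"/>
    <ports>
      <port protocol="tcp" portid="80">
        <state state="open"/>
        <service name="http" product="nginx" version="1.24.0"/>
      </port>
      <port protocol="tcp" portid="443">
        <state state="closed"/>
      </port>
    </ports>
  </host>
</nmaprun>
"""

root = ET.fromstring(SAMPLE_XML.strip())

open_ports = []
for port in root.findall(".//host/ports/port"):
    state = port.find("state")
    # Mirror the wrapper's behavior: keep open ports only
    if state is None or state.get("state") != "open":
        continue
    service = port.find("service")
    open_ports.append({
        "port": int(port.get("portid")),
        "protocol": port.get("protocol"),
        "service_name": service.get("name", "unknown") if service is not None else "unknown",
    })

print(open_ports)
```

Only port 80 survives the filter here, because the sample marks port 443 as closed.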
```python
import asyncio
import json
from datetime import datetime, timedelta, timezone
from typing import AsyncIterator, Dict, Optional

import httpx

# `db` below stands for the application's async database handle
# (asyncpg-style $n placeholders); it is not defined in this document.


class NVDSyncService:
    """
    National Vulnerability Database sync service.
    """
    BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        # An API key allows a higher request rate
        self.rate_limit_delay = 0.6 if api_key else 6

    async def sync_recent_cves(self, days: int = 7) -> int:
        """Sync the CVEs published in the last N days."""
        # Use timezone-aware UTC so the date range is unambiguous
        end_date = datetime.now(timezone.utc)
        start_date = end_date - timedelta(days=days)
        count = 0
        async for cve in self._fetch_cves_by_date_range(start_date, end_date):
            await self._save_cve_to_database(cve)
            count += 1
        return count

    async def _fetch_cves_by_date_range(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> AsyncIterator[Dict]:
        """Fetch CVEs by publication date range (paginated)."""
        headers = {"apiKey": self.api_key} if self.api_key else {}
        async with httpx.AsyncClient() as client:
            start_index = 0
            results_per_page = 2000
            while True:
                params = {
                    "pubStartDate": start_date.isoformat(),
                    "pubEndDate": end_date.isoformat(),
                    "startIndex": start_index,
                    "resultsPerPage": results_per_page
                }
                response = await client.get(
                    self.BASE_URL,
                    params=params,
                    headers=headers,
                    timeout=30
                )
                response.raise_for_status()
                data = response.json()
                vulnerabilities = data.get("vulnerabilities", [])
                if not vulnerabilities:
                    break
                for vuln_data in vulnerabilities:
                    yield self._parse_nvd_cve(vuln_data)
                # Check whether more results remain
                total_results = data.get("totalResults", 0)
                if start_index + results_per_page >= total_results:
                    break
                start_index += results_per_page
                # Rate limiting
                await asyncio.sleep(self.rate_limit_delay)

    def _parse_nvd_cve(self, vuln_data: Dict) -> Dict:
        """Convert an NVD API record into the internal format."""
        cve_item = vuln_data["cve"]
        cve_id = cve_item["id"]
        # Description
        descriptions = cve_item.get("descriptions", [])
        description = next(
            (d["value"] for d in descriptions if d["lang"] == "en"),
            "No description available"
        )
        # CVSS score: prefer v3.1, then v3.0, then v2 (tolerates empty lists)
        metrics = cve_item.get("metrics", {})
        cvss_data = {}
        for key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
            entries = metrics.get(key)
            if entries:
                cvss_data = entries[0]
                break
        cvss_score = cvss_data.get("cvssData", {}).get("baseScore", 0.0)
        # v3.x stores baseSeverity inside cvssData; v2 stores it on the metric itself
        severity = (
            cvss_data.get("cvssData", {}).get("baseSeverity")
            or cvss_data.get("baseSeverity", "Unknown")
        )
        # Affected CPEs (Common Platform Enumeration)
        configurations = cve_item.get("configurations", [])
        affected_cpe = []
        for config in configurations:
            for node in config.get("nodes", []):
                for cpe_match in node.get("cpeMatch", []):
                    if cpe_match.get("vulnerable"):
                        affected_cpe.append(cpe_match.get("criteria"))
        # Publication date
        published = cve_item.get("published")
        # Reference links
        references = [
            ref["url"] for ref in cve_item.get("references", [])
        ]
        return {
            "cve_id": cve_id,
            "description": description,
            "cvss_score": cvss_score,
            "severity": severity,
            "affected_cpe": affected_cpe,
            "published_date": published,
            "references": references,
            "raw_data": vuln_data  # keep the raw record for future use
        }

    async def _save_cve_to_database(self, cve: Dict):
        """Upsert a CVE into the database."""
        # "references" is a reserved word in PostgreSQL, so it must be quoted
        query = """
            INSERT INTO cve_database (
                cve_id, description, cvss_score, severity,
                affected_cpe, published_date, "references", raw_data
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            ON CONFLICT (cve_id) DO UPDATE SET
                description = EXCLUDED.description,
                cvss_score = EXCLUDED.cvss_score,
                severity = EXCLUDED.severity,
                affected_cpe = EXCLUDED.affected_cpe,
                published_date = EXCLUDED.published_date,
                "references" = EXCLUDED."references",
                raw_data = EXCLUDED.raw_data,
                updated_at = NOW()
        """
        await db.execute(
            query,
            cve["cve_id"],
            cve["description"],
            cve["cvss_score"],
            cve["severity"],
            json.dumps(cve["affected_cpe"]),
            cve["published_date"],
            json.dumps(cve["references"]),
            json.dumps(cve["raw_data"])
        )
```
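The NVD API 2.0 limits a single publication-date range to 120 consecutive days, so a call such as `sync_recent_cves(days=365)` would need to be split into several requests. A minimal sketch of that splitting is shown below; the helper name `split_date_range` and the `MAX_RANGE` constant are hypothetical and not part of the service above.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple

MAX_RANGE = timedelta(days=120)  # NVD API 2.0 limit for one date range


def split_date_range(
    start: datetime,
    end: datetime,
    max_span: timedelta = MAX_RANGE,
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (window_start, window_end) pairs covering [start, end]."""
    if start > end:
        raise ValueError("start must not be after end")
    cursor = start
    while cursor < end:
        window_end = min(cursor + max_span, end)
        yield cursor, window_end
        cursor = window_end


# Example: iterate over the windows for a longer backfill
backfill_start = datetime(2025, 1, 1, tzinfo=timezone.utc)
backfill_end = datetime(2025, 10, 21, tzinfo=timezone.utc)
for window_start, window_end in split_date_range(backfill_start, backfill_end):
    print(window_start.date(), "->", window_end.date())
```

Adjacent windows share a boundary timestamp, so a CVE published exactly on a boundary may be fetched twice; the `ON CONFLICT` upsert above makes that harmless.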
```python
class SecurityScannerError(Exception):
    """Base exception class."""


class ValidationError(SecurityScannerError):
    """Input validation error."""


class AuthorizationError(SecurityScannerError):
    """Authorization check failed."""


class LLMError(SecurityScannerError):
    """LLM API related error."""


class ToolExecutionError(SecurityScannerError):
    """Tool execution error."""


class SandboxError(SecurityScannerError):
    """Sandbox execution error."""


class DataTransformError(SecurityScannerError):
    """Data transformation error."""
```
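Because every exception derives from `SecurityScannerError`, a worker can catch the base class once and still tell failure categories apart. The sketch below is illustrative only: the `FAILURE_CODES` table, its code strings, and `classify_failure` are hypothetical, and three of the classes are re-declared here just to keep the example self-contained.

```python
class SecurityScannerError(Exception):
    """Base exception class (re-declared for a self-contained example)."""


class ValidationError(SecurityScannerError):
    """Input validation error."""


class SandboxError(SecurityScannerError):
    """Sandbox execution error."""


# Hypothetical mapping from exception type to a stored failure code.
FAILURE_CODES = {
    ValidationError: "invalid_input",
    SandboxError: "sandbox_failure",
}


def classify_failure(exc: Exception) -> str:
    """Map an exception to a failure code for the scan record."""
    for exc_type, code in FAILURE_CODES.items():
        if isinstance(exc, exc_type):
            return code
    if isinstance(exc, SecurityScannerError):
        return "scanner_error"
    return "unexpected_error"
```

Keeping a generic fallback for the base class means a newly added subclass is still reported as a scanner failure rather than an unexpected one.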
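```python
import asyncio
import functools
import logging
from typing import Awaitable, Callable, Tuple, Type, TypeVar

import httpx

# Any logger works here; the structlog logger configured later in this
# document can be used instead.
logger = logging.getLogger(__name__)

T = TypeVar("T")


def retry_with_backoff(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,)
):
    """
    Retry decorator with exponential backoff (for async functions).

    Args:
        max_attempts: maximum number of attempts
        initial_delay: initial delay in seconds
        backoff_factor: multiplier applied to the delay after each failure
        exceptions: exception types that trigger a retry
    """
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            delay = initial_delay
            last_exception = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt == max_attempts:
                        # The final attempt failed: propagate the exception
                        raise
                    # Log the retry
                    logger.warning(
                        f"{func.__name__} attempt {attempt}/{max_attempts} failed: {e}. "
                        f"Retrying in {delay}s..."
                    )
                    await asyncio.sleep(delay)
                    delay *= backoff_factor
            # Not reached when max_attempts >= 1
            raise last_exception
        return wrapper
    return decorator


# Usage example (LLMError is defined above; llm_client is the application's
# LLM client and is not defined in this document)
@retry_with_backoff(
    max_attempts=3,
    initial_delay=1.0,
    exceptions=(httpx.HTTPError, LLMError)
)
async def call_llm_api(prompt: str) -> str:
    """Call the LLM API (with retries)."""
    response = await llm_client.complete(prompt)
    return response
```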
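To make the waiting time concrete, the delays the decorator sleeps between attempts can be computed directly. The helper below is a hypothetical illustration (it is not part of the decorator) that reproduces the same arithmetic: the delay starts at `initial_delay` and is multiplied by `backoff_factor` after each failed attempt.

```python
def backoff_delays(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
) -> list:
    """Return the sleep durations between attempts (one fewer than the attempts)."""
    return [initial_delay * backoff_factor ** i for i in range(max_attempts - 1)]


print(backoff_delays())        # [1.0, 2.0]
print(backoff_delays(5, 0.5))  # [0.5, 1.0, 2.0, 4.0]
```

With the defaults used for `call_llm_api`, a call that fails three times therefore adds 3 seconds of waiting on top of the request time itself.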
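```python
import logging

import structlog

# add_logger_name reads logger.name, so structlog is routed through the
# standard library logger here instead of its default print logger.
logging.basicConfig(format="%(message)s", level=logging.INFO)

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
)
logger = structlog.get_logger()


# Usage example (perform_scan is the application's scan routine and is not
# defined in this document)
async def execute_scan(scan_id: str, target: str):
    # Bind context once so every log line carries it
    log = logger.bind(
        scan_id=scan_id,
        target=target,
        component="scan_executor"
    )
    log.info("scan_started")
    try:
        result = await perform_scan(target)
        log.info("scan_completed",
                 duration_ms=result.duration,
                 findings_count=len(result.findings))
    except Exception as e:
        log.error("scan_failed",
                  error_type=type(e).__name__,
                  error_message=str(e))
        raise
```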
```python
import time

from prometheus_client import Counter, Histogram, Gauge

# Metric definitions
scan_requests_total = Counter(
    'scan_requests_total',
    'Total number of scan requests',
    ['scan_mode', 'status']
)
scan_duration_seconds = Histogram(
    'scan_duration_seconds',
    'Scan execution duration in seconds',
    ['scan_mode', 'tool']
)
active_scans = Gauge(
    'active_scans',
    'Number of currently running scans'
)
llm_api_calls_total = Counter(
    'llm_api_calls_total',
    'Total LLM API calls',
    ['purpose', 'model']
)


# Usage example (ScanRequest and perform_scan belong to the application and
# are not defined in this document)
async def execute_scan_with_metrics(scan_request: "ScanRequest"):
    scan_requests_total.labels(
        scan_mode=scan_request.mode,
        status='started'
    ).inc()
    active_scans.inc()
    # perf_counter is monotonic, so the duration is unaffected by clock changes
    start_time = time.perf_counter()
    try:
        result = await perform_scan(scan_request)
        scan_requests_total.labels(
            scan_mode=scan_request.mode,
            status='completed'
        ).inc()
        return result
    except Exception:
        scan_requests_total.labels(
            scan_mode=scan_request.mode,
            status='failed'
        ).inc()
        raise
    finally:
        # Always record the duration and release the gauge, even on failure
        duration = time.perf_counter() - start_time
        scan_duration_seconds.labels(
            scan_mode=scan_request.mode,
            tool='overall'
        ).observe(duration)
        active_scans.dec()
```
Last updated: 2025-10-21