scripts: 添加基于 CSV 的 MCP secrets 重加密修复工具

通过读取 entry_id/secret_name/secret_value 调用 secrets_update 让服务端用当前密钥重加密。附带模板 CSV,.gitignore 忽略 *.pyc。
This commit is contained in:
voson
2026-04-06 13:34:04 +08:00
parent 53d53ff96a
commit 8942718641
3 changed files with 386 additions and 1 deletions

3
.gitignore vendored
View File

@@ -5,4 +5,5 @@
*.pem *.pem
tmp/ tmp/
client_secret_*.apps.googleusercontent.com.json client_secret_*.apps.googleusercontent.com.json
node_modules/ node_modules/
*.pyc

View File

@@ -0,0 +1 @@
entry_id,secret_name,secret_value
1 entry_id secret_name secret_value

View File

@@ -0,0 +1,383 @@
#!/usr/bin/env python3
"""
Batch re-encrypt secret fields from a CSV file.
CSV format:
entry_id,secret_name,secret_value
019d...,api_key,sk-xxxx
019d...,password,hunter2
The script groups rows by entry_id, then calls `secrets_update` with `secrets_obj`
so the server re-encrypts the provided plaintext values with the current key.
Warnings:
- Keep the CSV outside version control whenever possible.
- Delete the filled CSV after the repair is complete.
"""
from __future__ import annotations
import argparse
import csv
import json
import sys
import urllib.error
import urllib.request
from collections import OrderedDict
from pathlib import Path
from typing import Any
DEFAULT_USER_AGENT = "Cursor/3.0.12 (darwin arm64)"
REQUIRED_COLUMNS = {"entry_id", "secret_name", "secret_value"}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Repair secret ciphertexts by re-submitting plaintext via secrets_update."
)
parser.add_argument(
"--csv",
required=True,
help="Path to CSV file with columns: entry_id,secret_name,secret_value",
)
parser.add_argument(
"--mcp-json",
default=str(Path.home() / ".cursor" / "mcp.json"),
help="Path to mcp.json used to resolve URL and headers",
)
parser.add_argument(
"--server",
default="secrets",
help="MCP server name inside mcp.json (default: secrets)",
)
parser.add_argument("--url", help="Override MCP URL")
parser.add_argument("--auth", help="Override Authorization header value")
parser.add_argument("--encryption-key", help="Override X-Encryption-Key header value")
parser.add_argument(
"--user-agent",
default=DEFAULT_USER_AGENT,
help=f"User-Agent header (default: {DEFAULT_USER_AGENT})",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Parse and print grouped updates without sending requests",
)
return parser.parse_args()
def load_mcp_config(path: str, server_name: str) -> dict[str, Any]:
data = json.loads(Path(path).read_text(encoding="utf-8"))
servers = data.get("mcpServers", {})
if server_name not in servers:
raise KeyError(f"Server '{server_name}' not found in {path}")
return servers[server_name]
def resolve_connection_settings(args: argparse.Namespace) -> tuple[str, str, str]:
server = load_mcp_config(args.mcp_json, args.server)
headers = server.get("headers", {})
url = args.url or server.get("url")
auth = args.auth or headers.get("Authorization")
encryption_key = args.encryption_key or headers.get("X-Encryption-Key")
if not url:
raise ValueError("Missing MCP URL. Pass --url or configure it in mcp.json.")
if not auth:
raise ValueError(
"Missing Authorization header. Pass --auth or configure it in mcp.json."
)
if not encryption_key:
raise ValueError(
"Missing X-Encryption-Key. Pass --encryption-key or configure it in mcp.json."
)
return url, auth, encryption_key
def load_updates(csv_path: str) -> OrderedDict[str, OrderedDict[str, str]]:
grouped: OrderedDict[str, OrderedDict[str, str]] = OrderedDict()
with Path(csv_path).open("r", encoding="utf-8-sig", newline="") as fh:
reader = csv.DictReader(fh)
fieldnames = set(reader.fieldnames or [])
missing = REQUIRED_COLUMNS - fieldnames
if missing:
raise ValueError(
"CSV missing required columns: " + ", ".join(sorted(missing))
)
for line_no, row in enumerate(reader, start=2):
entry_id = (row.get("entry_id") or "").strip()
secret_name = (row.get("secret_name") or "").strip()
secret_value = row.get("secret_value") or ""
if not entry_id and not secret_name and not secret_value:
continue
if not entry_id:
raise ValueError(f"Line {line_no}: entry_id is required")
if not secret_name:
raise ValueError(f"Line {line_no}: secret_name is required")
entry_group = grouped.setdefault(entry_id, OrderedDict())
if secret_name in entry_group:
raise ValueError(
f"Line {line_no}: duplicate secret_name '{secret_name}' for entry_id '{entry_id}'"
)
entry_group[secret_name] = secret_value
if not grouped:
raise ValueError("CSV contains no updates")
return grouped
def post_json(
url: str,
payload: dict[str, Any],
auth: str,
encryption_key: str,
user_agent: str,
session_id: str | None = None,
) -> tuple[int, str | None, str]:
headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"Authorization": auth,
"X-Encryption-Key": encryption_key,
"User-Agent": user_agent,
}
if session_id:
headers["mcp-session-id"] = session_id
req = urllib.request.Request(
url,
data=json.dumps(payload).encode("utf-8"),
headers=headers,
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return (
resp.status,
resp.headers.get("mcp-session-id") or session_id,
resp.read().decode("utf-8"),
)
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
return exc.code, session_id, body
def parse_sse_json(body: str) -> list[dict[str, Any]]:
items: list[dict[str, Any]] = []
for line in body.splitlines():
if line.startswith("data: {"):
items.append(json.loads(line[6:]))
return items
def initialize_session(
url: str, auth: str, encryption_key: str, user_agent: str
) -> str:
status, session_id, body = post_json(
url,
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "repair-script", "version": "1.0"},
},
},
auth,
encryption_key,
user_agent,
)
if status != 200 or not session_id:
raise RuntimeError(f"initialize failed: status={status}, body={body[:500]}")
status, _, body = post_json(
url,
{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}},
auth,
encryption_key,
user_agent,
session_id,
)
if status not in (200, 202):
raise RuntimeError(
f"notifications/initialized failed: status={status}, body={body[:500]}"
)
return session_id
def load_entry_index(
url: str, auth: str, encryption_key: str, user_agent: str, session_id: str
) -> dict[str, tuple[str, str]]:
status, _, body = post_json(
url,
{
"jsonrpc": "2.0",
"id": 999_001,
"method": "tools/call",
"params": {
"name": "secrets_find",
"arguments": {
"limit": 1000,
},
},
},
auth,
encryption_key,
user_agent,
session_id,
)
items = parse_sse_json(body)
last = items[-1] if items else {"raw": body[:1000]}
if status != 200:
raise RuntimeError(
f"secrets_find failed: status={status}, body={body[:500]}"
)
if "error" in last:
raise RuntimeError(f"secrets_find returned error: {last}")
content = last.get("result", {}).get("content", [])
if not content:
raise RuntimeError("secrets_find returned no content")
payload = json.loads(content[0]["text"])
index: dict[str, tuple[str, str]] = {}
for entry in payload.get("entries", []):
entry_id = entry.get("id")
name = entry.get("name")
folder = entry.get("folder", "")
if entry_id and name is not None:
index[entry_id] = (name, folder)
return index
def call_secrets_update(
url: str,
auth: str,
encryption_key: str,
user_agent: str,
session_id: str,
request_id: int,
entry_id: str,
entry_name: str,
entry_folder: str,
secrets_obj: dict[str, str],
) -> dict[str, Any]:
payload = {
"jsonrpc": "2.0",
"id": request_id,
"method": "tools/call",
"params": {
"name": "secrets_update",
"arguments": {
"id": entry_id,
"name": entry_name,
"folder": entry_folder,
"secrets_obj": secrets_obj,
# Pass the key as an argument too, so repair can still work
# even when a client/proxy mishandles custom headers.
"encryption_key": encryption_key,
},
},
}
status, _, body = post_json(
url, payload, auth, encryption_key, user_agent, session_id
)
items = parse_sse_json(body)
last = items[-1] if items else {"raw": body[:1000]}
if status != 200:
raise RuntimeError(
f"secrets_update failed for {entry_id}: status={status}, body={body[:500]}"
)
return last
def main() -> int:
args = parse_args()
try:
url, auth, encryption_key = resolve_connection_settings(args)
updates = load_updates(args.csv)
except Exception as exc:
print(f"ERROR: {exc}", file=sys.stderr)
return 1
print(f"Loaded {len(updates)} entries from {args.csv}")
if args.dry_run:
for entry_id, secrets_obj in updates.items():
print(
json.dumps(
{"id": entry_id, "secrets_obj": secrets_obj},
ensure_ascii=False,
indent=2,
)
)
return 0
try:
session_id = initialize_session(url, auth, encryption_key, args.user_agent)
entry_index = load_entry_index(
url, auth, encryption_key, args.user_agent, session_id
)
except Exception as exc:
print(f"ERROR: {exc}", file=sys.stderr)
return 1
success = 0
failures = 0
for request_id, (entry_id, secrets_obj) in enumerate(updates.items(), start=2):
try:
if entry_id not in entry_index:
raise RuntimeError(
f"entry id not found in secrets_find results: {entry_id}"
)
entry_name, entry_folder = entry_index[entry_id]
result = call_secrets_update(
url,
auth,
encryption_key,
args.user_agent,
session_id,
request_id,
entry_id,
entry_name,
entry_folder,
secrets_obj,
)
if "error" in result:
failures += 1
print(
json.dumps(
{"id": entry_id, "status": "error", "result": result},
ensure_ascii=False,
),
file=sys.stderr,
)
else:
success += 1
print(
json.dumps(
{"id": entry_id, "status": "ok", "result": result},
ensure_ascii=False,
)
)
except Exception as exc:
failures += 1
print(f"{entry_id}: ERROR: {exc}", file=sys.stderr)
print(f"Done. success={success} failure={failures}")
return 0 if failures == 0 else 2
if __name__ == "__main__":
raise SystemExit(main())