#!/usr/bin/env python3
"""
Batch re-encrypt secret fields from a CSV file.

CSV format:
    entry_id,secret_name,secret_value
    019d...,api_key,sk-xxxx
    019d...,password,hunter2

The script groups rows by entry_id, then calls `secrets_update` with
`secrets_obj` so the server re-encrypts the provided plaintext values with the
current key.

Warnings:
- Keep the CSV outside version control whenever possible.
- Delete the filled CSV after the repair is complete.
"""

from __future__ import annotations

import argparse
import csv
import json
import sys
import urllib.error
import urllib.request
from collections import OrderedDict
from pathlib import Path
from typing import Any

DEFAULT_USER_AGENT = "Cursor/3.0.12 (darwin arm64)"
REQUIRED_COLUMNS = {"entry_id", "secret_name", "secret_value"}


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the repair script."""
    parser = argparse.ArgumentParser(
        description="Repair secret ciphertexts by re-submitting plaintext via secrets_update."
    )
    parser.add_argument(
        "--csv",
        required=True,
        help="Path to CSV file with columns: entry_id,secret_name,secret_value",
    )
    parser.add_argument(
        "--mcp-json",
        default=str(Path.home() / ".cursor" / "mcp.json"),
        help="Path to mcp.json used to resolve URL and headers",
    )
    parser.add_argument(
        "--server",
        default="secrets",
        help="MCP server name inside mcp.json (default: secrets)",
    )
    parser.add_argument("--url", help="Override MCP URL")
    parser.add_argument("--auth", help="Override Authorization header value")
    parser.add_argument("--encryption-key", help="Override X-Encryption-Key header value")
    parser.add_argument(
        "--user-agent",
        default=DEFAULT_USER_AGENT,
        help=f"User-Agent header (default: {DEFAULT_USER_AGENT})",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and print grouped updates without sending requests",
    )
    return parser.parse_args()


def load_mcp_config(path: str, server_name: str) -> dict[str, Any]:
    """Return the ``mcpServers[server_name]`` section of the JSON file at *path*.

    Raises:
        KeyError: if *server_name* is not present under ``mcpServers``.
        OSError / json.JSONDecodeError: if the file cannot be read or parsed.
    """
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    servers = data.get("mcpServers", {})
    if server_name not in servers:
        raise KeyError(f"Server '{server_name}' not found in {path}")
    return servers[server_name]


def resolve_connection_settings(args: argparse.Namespace) -> tuple[str, str, str]:
    """Resolve ``(url, auth, encryption_key)`` from CLI overrides and mcp.json.

    Command-line overrides win. mcp.json is consulted only when at least one
    of the three values was not supplied on the command line, so a fully
    specified invocation works without any mcp.json on disk.

    Raises:
        ValueError: if a value is missing from both the CLI and mcp.json.
        KeyError / OSError / json.JSONDecodeError: propagated from
            ``load_mcp_config`` when the file has to be read.
    """
    url = args.url
    auth = args.auth
    encryption_key = args.encryption_key

    if not (url and auth and encryption_key):
        server = load_mcp_config(args.mcp_json, args.server)
        headers = server.get("headers") or {}
        url = url or server.get("url")
        auth = auth or headers.get("Authorization")
        encryption_key = encryption_key or headers.get("X-Encryption-Key")

    if not url:
        raise ValueError("Missing MCP URL. Pass --url or configure it in mcp.json.")
    if not auth:
        raise ValueError(
            "Missing Authorization header. Pass --auth or configure it in mcp.json."
        )
    if not encryption_key:
        raise ValueError(
            "Missing X-Encryption-Key. Pass --encryption-key or configure it in mcp.json."
        )
    return url, auth, encryption_key


def load_updates(csv_path: str) -> OrderedDict[str, OrderedDict[str, str]]:
    """Read the CSV and group secrets by entry id, preserving file order.

    Returns a mapping ``entry_id -> {secret_name: secret_value}``. Rows whose
    three fields are all empty are skipped. ``entry_id`` and ``secret_name``
    are stripped of surrounding whitespace; ``secret_value`` is kept verbatim.

    Raises:
        ValueError: on missing columns, a row lacking ``entry_id`` or
            ``secret_name``, a duplicate secret name within one entry, or a
            file that yields no updates at all.
    """
    grouped: OrderedDict[str, OrderedDict[str, str]] = OrderedDict()
    with Path(csv_path).open("r", encoding="utf-8-sig", newline="") as fh:
        reader = csv.DictReader(fh)
        fieldnames = set(reader.fieldnames or [])
        missing = REQUIRED_COLUMNS - fieldnames
        if missing:
            raise ValueError(
                "CSV missing required columns: " + ", ".join(sorted(missing))
            )

        for row in reader:
            # reader.line_num counts physical lines consumed so far, so it
            # stays accurate when blank lines are skipped or a quoted value
            # spans several lines (a plain row counter would drift).
            line_no = reader.line_num
            entry_id = (row.get("entry_id") or "").strip()
            secret_name = (row.get("secret_name") or "").strip()
            secret_value = row.get("secret_value") or ""

            if not entry_id and not secret_name and not secret_value:
                continue
            if not entry_id:
                raise ValueError(f"Line {line_no}: entry_id is required")
            if not secret_name:
                raise ValueError(f"Line {line_no}: secret_name is required")

            entry_group = grouped.setdefault(entry_id, OrderedDict())
            if secret_name in entry_group:
                raise ValueError(
                    f"Line {line_no}: duplicate secret_name '{secret_name}' for entry_id '{entry_id}'"
                )
            entry_group[secret_name] = secret_value

    if not grouped:
        raise ValueError("CSV contains no updates")
    return grouped


def post_json(
    url: str,
    payload: dict[str, Any],
    auth: str,
    encryption_key: str,
    user_agent: str,
    session_id: str | None = None,
) -> tuple[int, str | None, str]:
    """POST *payload* as JSON and return ``(status, session_id, body_text)``.

    The returned session id is the response's ``mcp-session-id`` header when
    present, otherwise the *session_id* passed in. HTTP error statuses are
    returned (not raised) together with their body; other ``urllib`` errors
    such as connection failures propagate to the caller.
    """
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "Authorization": auth,
        "X-Encryption-Key": encryption_key,
        "User-Agent": user_agent,
    }
    if session_id:
        headers["mcp-session-id"] = session_id

    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return (
                resp.status,
                resp.headers.get("mcp-session-id") or session_id,
                # Tolerate stray bytes the same way the error path does, so a
                # bad byte cannot mask an otherwise usable response.
                resp.read().decode("utf-8", errors="replace"),
            )
    except urllib.error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")
        return exc.code, session_id, body


def parse_sse_json(body: str) -> list[dict[str, Any]]:
    """Extract JSON object messages from a response body.

    Each ``data:`` line whose payload starts with ``{`` is decoded (the single
    space after the colon is optional). If no such line exists, the whole
    body is tried as a plain JSON document, because the request's Accept
    header also allows ``application/json`` replies. Returns an empty list
    when nothing decodable is found.

    Raises:
        json.JSONDecodeError: if a ``data:`` line looks like an object but is
            not valid JSON.
    """
    items: list[dict[str, Any]] = []
    for line in body.splitlines():
        if not line.startswith("data:"):
            continue
        data = line[5:]
        if data.startswith(" "):
            data = data[1:]
        if data.startswith("{"):
            items.append(json.loads(data))
    if items:
        return items

    # Fallback: a non-streamed reply is a single JSON document.
    stripped = body.strip()
    if stripped.startswith(("{", "[")):
        try:
            decoded = json.loads(stripped)
        except json.JSONDecodeError:
            return items
        if isinstance(decoded, dict):
            items.append(decoded)
        elif isinstance(decoded, list):
            items.extend(msg for msg in decoded if isinstance(msg, dict))
    return items


def is_error_message(message: dict[str, Any]) -> bool:
    """Return True if a JSON-RPC *message* reports a failure.

    Covers both a top-level ``error`` member and a tool result carrying a
    truthy ``isError`` flag.
    """
    if "error" in message:
        return True
    result = message.get("result")
    return isinstance(result, dict) and bool(result.get("isError"))


def initialize_session(
    url: str, auth: str, encryption_key: str, user_agent: str
) -> str:
    """Perform the ``initialize`` handshake and return the session id.

    Sends ``initialize`` followed by ``notifications/initialized``.

    Raises:
        RuntimeError: if ``initialize`` does not return status 200 with a
            session id, or the notification is not answered with 200/202.
    """
    status, session_id, body = post_json(
        url,
        {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-06-18",
                "capabilities": {},
                "clientInfo": {"name": "repair-script", "version": "1.0"},
            },
        },
        auth,
        encryption_key,
        user_agent,
    )
    if status != 200 or not session_id:
        raise RuntimeError(f"initialize failed: status={status}, body={body[:500]}")

    status, _, body = post_json(
        url,
        {"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}},
        auth,
        encryption_key,
        user_agent,
        session_id,
    )
    if status not in (200, 202):
        raise RuntimeError(
            f"notifications/initialized failed: status={status}, body={body[:500]}"
        )
    return session_id


def load_entry_index(
    url: str, auth: str, encryption_key: str, user_agent: str, session_id: str
) -> dict[str, tuple[str, str]]:
    """Call ``secrets_find`` and return ``{entry_id: (name, folder)}``.

    Raises:
        RuntimeError: on a non-200 status, an error reply, or empty content.
    """
    status, _, body = post_json(
        url,
        {
            "jsonrpc": "2.0",
            "id": 999_001,
            "method": "tools/call",
            "params": {
                "name": "secrets_find",
                # NOTE(review): a single page of 1000 is requested; entries
                # beyond that would be reported as "not found" by main().
                # Confirm whether the server supports pagination.
                "arguments": {
                    "limit": 1000,
                },
            },
        },
        auth,
        encryption_key,
        user_agent,
        session_id,
    )
    # Check the HTTP status before parsing so a malformed error body cannot
    # hide the real failure behind a JSON decoding exception.
    if status != 200:
        raise RuntimeError(
            f"secrets_find failed: status={status}, body={body[:500]}"
        )

    items = parse_sse_json(body)
    last = items[-1] if items else {"raw": body[:1000]}
    if is_error_message(last):
        raise RuntimeError(f"secrets_find returned error: {last}")

    content = last.get("result", {}).get("content", [])
    if not content:
        raise RuntimeError("secrets_find returned no content")

    payload = json.loads(content[0]["text"])
    index: dict[str, tuple[str, str]] = {}
    for entry in payload.get("entries", []):
        entry_id = entry.get("id")
        name = entry.get("name")
        folder = entry.get("folder", "")
        if entry_id and name is not None:
            index[entry_id] = (name, folder)
    return index


def call_secrets_update(
    url: str,
    auth: str,
    encryption_key: str,
    user_agent: str,
    session_id: str,
    request_id: int,
    entry_id: str,
    entry_name: str,
    entry_folder: str,
    secrets_obj: dict[str, str],
) -> dict[str, Any]:
    """Call ``secrets_update`` for one entry and return the last reply message.

    The returned message may still describe a failure; callers should check
    it with ``is_error_message``.

    Raises:
        RuntimeError: on a non-200 status, or when the response contains no
            decodable JSON message (the outcome cannot be confirmed).
    """
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "secrets_update",
            "arguments": {
                "id": entry_id,
                "name": entry_name,
                "folder": entry_folder,
                "secrets_obj": secrets_obj,
                # Pass the key as an argument too, so repair can still work
                # even when a client/proxy mishandles custom headers.
                "encryption_key": encryption_key,
            },
        },
    }
    status, _, body = post_json(
        url, payload, auth, encryption_key, user_agent, session_id
    )
    if status != 200:
        raise RuntimeError(
            f"secrets_update failed for {entry_id}: status={status}, body={body[:500]}"
        )

    items = parse_sse_json(body)
    if not items:
        # Without a decodable reply the update cannot be confirmed; reporting
        # it as a success would be misleading.
        raise RuntimeError(
            f"secrets_update returned no parsable response for {entry_id}: body={body[:500]}"
        )
    return items[-1]


def main() -> int:
    """Run the repair and return the process exit code.

    Returns 0 when every entry was updated (or on a dry run), 1 when setup
    fails (settings, CSV, session, entry index), and 2 when at least one
    entry update failed.
    """
    args = parse_args()
    try:
        url, auth, encryption_key = resolve_connection_settings(args)
        updates = load_updates(args.csv)
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1

    print(f"Loaded {len(updates)} entries from {args.csv}")

    if args.dry_run:
        for entry_id, secrets_obj in updates.items():
            print(
                json.dumps(
                    {"id": entry_id, "secrets_obj": secrets_obj},
                    ensure_ascii=False,
                    indent=2,
                )
            )
        return 0

    try:
        session_id = initialize_session(url, auth, encryption_key, args.user_agent)
        entry_index = load_entry_index(
            url, auth, encryption_key, args.user_agent, session_id
        )
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1

    success = 0
    failures = 0
    # Request ids start at 2 because id 1 is used by the initialize call.
    for request_id, (entry_id, secrets_obj) in enumerate(updates.items(), start=2):
        try:
            if entry_id not in entry_index:
                raise RuntimeError(
                    f"entry id not found in secrets_find results: {entry_id}"
                )
            entry_name, entry_folder = entry_index[entry_id]
            result = call_secrets_update(
                url,
                auth,
                encryption_key,
                args.user_agent,
                session_id,
                request_id,
                entry_id,
                entry_name,
                entry_folder,
                secrets_obj,
            )
            if is_error_message(result):
                failures += 1
                print(
                    json.dumps(
                        {"id": entry_id, "status": "error", "result": result},
                        ensure_ascii=False,
                    ),
                    file=sys.stderr,
                )
            else:
                success += 1
                print(
                    json.dumps(
                        {"id": entry_id, "status": "ok", "result": result},
                        ensure_ascii=False,
                    )
                )
        except Exception as exc:
            # One failing entry must not abort the rest of the batch.
            failures += 1
            print(f"{entry_id}: ERROR: {exc}", file=sys.stderr)

    print(f"Done. success={success} failure={failures}")
    return 0 if failures == 0 else 2


if __name__ == "__main__":
    raise SystemExit(main())