diff --git a/.gitignore b/.gitignore index 0c995d8..7d7c8ae 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ *.pem tmp/ client_secret_*.apps.googleusercontent.com.json -node_modules/ \ No newline at end of file +node_modules/ +*.pyc \ No newline at end of file diff --git a/scripts/repair-secrets.template.csv b/scripts/repair-secrets.template.csv new file mode 100644 index 0000000..efff45d --- /dev/null +++ b/scripts/repair-secrets.template.csv @@ -0,0 +1 @@ +entry_id,secret_name,secret_value \ No newline at end of file diff --git a/scripts/repair_secrets_from_csv.py b/scripts/repair_secrets_from_csv.py new file mode 100644 index 0000000..f9b5bd6 --- /dev/null +++ b/scripts/repair_secrets_from_csv.py @@ -0,0 +1,383 @@ +#!/usr/bin/env python3 +""" +Batch re-encrypt secret fields from a CSV file. + +CSV format: + entry_id,secret_name,secret_value + 019d...,api_key,sk-xxxx + 019d...,password,hunter2 + +The script groups rows by entry_id, then calls `secrets_update` with `secrets_obj` +so the server re-encrypts the provided plaintext values with the current key. + +Warnings: +- Keep the CSV outside version control whenever possible. +- Delete the filled CSV after the repair is complete. +""" + +from __future__ import annotations + +import argparse +import csv +import json +import sys +import urllib.error +import urllib.request +from collections import OrderedDict +from pathlib import Path +from typing import Any + + +DEFAULT_USER_AGENT = "Cursor/3.0.12 (darwin arm64)" +REQUIRED_COLUMNS = {"entry_id", "secret_name", "secret_value"} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Repair secret ciphertexts by re-submitting plaintext via secrets_update." + ) + parser.add_argument( + "--csv", + required=True, + help="Path to CSV file with columns: entry_id,secret_name,secret_value", + ) + parser.add_argument( + "--mcp-json", + default=str(Path.home() / ".cursor" / "mcp.json"), + help="Path to mcp.json used to resolve URL and headers", + ) + parser.add_argument( + "--server", + default="secrets", + help="MCP server name inside mcp.json (default: secrets)", + ) + parser.add_argument("--url", help="Override MCP URL") + parser.add_argument("--auth", help="Override Authorization header value") + parser.add_argument("--encryption-key", help="Override X-Encryption-Key header value") + parser.add_argument( + "--user-agent", + default=DEFAULT_USER_AGENT, + help=f"User-Agent header (default: {DEFAULT_USER_AGENT})", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Parse and print grouped updates without sending requests", + ) + return parser.parse_args() + + +def load_mcp_config(path: str, server_name: str) -> dict[str, Any]: + data = json.loads(Path(path).read_text(encoding="utf-8")) + servers = data.get("mcpServers", {}) + if server_name not in servers: + raise KeyError(f"Server '{server_name}' not found in {path}") + return servers[server_name] + + +def resolve_connection_settings(args: argparse.Namespace) -> tuple[str, str, str]: + server = load_mcp_config(args.mcp_json, args.server) + headers = server.get("headers", {}) + + url = args.url or server.get("url") + auth = args.auth or headers.get("Authorization") + encryption_key = args.encryption_key or headers.get("X-Encryption-Key") + + if not url: + raise ValueError("Missing MCP URL. Pass --url or configure it in mcp.json.") + if not auth: + raise ValueError( + "Missing Authorization header. Pass --auth or configure it in mcp.json." + ) + if not encryption_key: + raise ValueError( + "Missing X-Encryption-Key. Pass --encryption-key or configure it in mcp.json." + ) + + return url, auth, encryption_key + + +def load_updates(csv_path: str) -> OrderedDict[str, OrderedDict[str, str]]: + grouped: OrderedDict[str, OrderedDict[str, str]] = OrderedDict() + + with Path(csv_path).open("r", encoding="utf-8-sig", newline="") as fh: + reader = csv.DictReader(fh) + fieldnames = set(reader.fieldnames or []) + missing = REQUIRED_COLUMNS - fieldnames + if missing: + raise ValueError( + "CSV missing required columns: " + ", ".join(sorted(missing)) + ) + + for line_no, row in enumerate(reader, start=2): + entry_id = (row.get("entry_id") or "").strip() + secret_name = (row.get("secret_name") or "").strip() + secret_value = row.get("secret_value") or "" + + if not entry_id and not secret_name and not secret_value: + continue + if not entry_id: + raise ValueError(f"Line {line_no}: entry_id is required") + if not secret_name: + raise ValueError(f"Line {line_no}: secret_name is required") + + entry_group = grouped.setdefault(entry_id, OrderedDict()) + if secret_name in entry_group: + raise ValueError( + f"Line {line_no}: duplicate secret_name '{secret_name}' for entry_id '{entry_id}'" + ) + entry_group[secret_name] = secret_value + + if not grouped: + raise ValueError("CSV contains no updates") + + return grouped + + +def post_json( + url: str, + payload: dict[str, Any], + auth: str, + encryption_key: str, + user_agent: str, + session_id: str | None = None, +) -> tuple[int, str | None, str]: + headers = { + "Content-Type": "application/json", + "Accept": "application/json, text/event-stream", + "Authorization": auth, + "X-Encryption-Key": encryption_key, + "User-Agent": user_agent, + } + if session_id: + headers["mcp-session-id"] = session_id + + req = urllib.request.Request( + url, + data=json.dumps(payload).encode("utf-8"), + headers=headers, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return ( + resp.status, + resp.headers.get("mcp-session-id") or session_id, + resp.read().decode("utf-8"), + ) + except urllib.error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="replace") + return exc.code, session_id, body + + +def parse_sse_json(body: str) -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] + for line in body.splitlines(): + if line.startswith("data: {"): + items.append(json.loads(line[6:])) + return items + + +def initialize_session( + url: str, auth: str, encryption_key: str, user_agent: str +) -> str: + status, session_id, body = post_json( + url, + { + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2025-06-18", + "capabilities": {}, + "clientInfo": {"name": "repair-script", "version": "1.0"}, + }, + }, + auth, + encryption_key, + user_agent, + ) + if status != 200 or not session_id: + raise RuntimeError(f"initialize failed: status={status}, body={body[:500]}") + + status, _, body = post_json( + url, + {"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}, + auth, + encryption_key, + user_agent, + session_id, + ) + if status not in (200, 202): + raise RuntimeError( + f"notifications/initialized failed: status={status}, body={body[:500]}" + ) + return session_id + + +def load_entry_index( + url: str, auth: str, encryption_key: str, user_agent: str, session_id: str +) -> dict[str, tuple[str, str]]: + status, _, body = post_json( + url, + { + "jsonrpc": "2.0", + "id": 999_001, + "method": "tools/call", + "params": { + "name": "secrets_find", + "arguments": { + "limit": 1000, + }, + }, + }, + auth, + encryption_key, + user_agent, + session_id, + ) + items = parse_sse_json(body) + last = items[-1] if items else {"raw": body[:1000]} + if status != 200: + raise RuntimeError( + f"secrets_find failed: status={status}, body={body[:500]}" + ) + if "error" in last: + raise RuntimeError(f"secrets_find returned error: {last}") + + content = last.get("result", {}).get("content", []) + if not content: + raise RuntimeError("secrets_find returned no content") + payload = json.loads(content[0]["text"]) + + index: dict[str, tuple[str, str]] = {} + for entry in payload.get("entries", []): + entry_id = entry.get("id") + name = entry.get("name") + folder = entry.get("folder", "") + if entry_id and name is not None: + index[entry_id] = (name, folder) + return index + + +def call_secrets_update( + url: str, + auth: str, + encryption_key: str, + user_agent: str, + session_id: str, + request_id: int, + entry_id: str, + entry_name: str, + entry_folder: str, + secrets_obj: dict[str, str], +) -> dict[str, Any]: + payload = { + "jsonrpc": "2.0", + "id": request_id, + "method": "tools/call", + "params": { + "name": "secrets_update", + "arguments": { + "id": entry_id, + "name": entry_name, + "folder": entry_folder, + "secrets_obj": secrets_obj, + # Pass the key as an argument too, so repair can still work + # even when a client/proxy mishandles custom headers. + "encryption_key": encryption_key, + }, + }, + } + status, _, body = post_json( + url, payload, auth, encryption_key, user_agent, session_id + ) + items = parse_sse_json(body) + last = items[-1] if items else {"raw": body[:1000]} + if status != 200: + raise RuntimeError( + f"secrets_update failed for {entry_id}: status={status}, body={body[:500]}" + ) + return last + + +def main() -> int: + args = parse_args() + + try: + url, auth, encryption_key = resolve_connection_settings(args) + updates = load_updates(args.csv) + except Exception as exc: + print(f"ERROR: {exc}", file=sys.stderr) + return 1 + + print(f"Loaded {len(updates)} entries from {args.csv}") + + if args.dry_run: + for entry_id, secrets_obj in updates.items(): + print( + json.dumps( + {"id": entry_id, "secrets_obj": secrets_obj}, + ensure_ascii=False, + indent=2, + ) + ) + return 0 + + try: + session_id = initialize_session(url, auth, encryption_key, args.user_agent) + entry_index = load_entry_index( + url, auth, encryption_key, args.user_agent, session_id + ) + except Exception as exc: + print(f"ERROR: {exc}", file=sys.stderr) + return 1 + + success = 0 + failures = 0 + for request_id, (entry_id, secrets_obj) in enumerate(updates.items(), start=2): + try: + if entry_id not in entry_index: + raise RuntimeError( + f"entry id not found in secrets_find results: {entry_id}" + ) + entry_name, entry_folder = entry_index[entry_id] + result = call_secrets_update( + url, + auth, + encryption_key, + args.user_agent, + session_id, + request_id, + entry_id, + entry_name, + entry_folder, + secrets_obj, + ) + if "error" in result: + failures += 1 + print( + json.dumps( + {"id": entry_id, "status": "error", "result": result}, + ensure_ascii=False, + ), + file=sys.stderr, + ) + else: + success += 1 + print( + json.dumps( + {"id": entry_id, "status": "ok", "result": result}, + ensure_ascii=False, + ) + ) + except Exception as exc: + failures += 1 + print(f"{entry_id}: ERROR: {exc}", file=sys.stderr) + + print(f"Done. success={success} failure={failures}") + return 0 if failures == 0 else 2 + + +if __name__ == "__main__": + raise SystemExit(main())