use anyhow::Result; use serde_json::Value; use sqlx::PgPool; use sqlx::postgres::PgPoolOptions; use crate::audit::current_actor; pub async fn create_pool(database_url: &str) -> Result { tracing::debug!("connecting to database"); let pool = PgPoolOptions::new() .max_connections(5) .acquire_timeout(std::time::Duration::from_secs(5)) .connect(database_url) .await?; tracing::debug!("database connection established"); Ok(pool) } pub async fn migrate(pool: &PgPool) -> Result<()> { tracing::debug!("running migrations"); sqlx::raw_sql( r#" -- ── entries: top-level entities (server, service, key, …) ────────────── CREATE TABLE IF NOT EXISTS entries ( id UUID PRIMARY KEY DEFAULT uuidv7(), namespace VARCHAR(64) NOT NULL, kind VARCHAR(64) NOT NULL, name VARCHAR(256) NOT NULL, tags TEXT[] NOT NULL DEFAULT '{}', metadata JSONB NOT NULL DEFAULT '{}', version BIGINT NOT NULL DEFAULT 1, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(namespace, kind, name) ); CREATE INDEX IF NOT EXISTS idx_entries_namespace ON entries(namespace); CREATE INDEX IF NOT EXISTS idx_entries_kind ON entries(kind); CREATE INDEX IF NOT EXISTS idx_entries_tags ON entries USING GIN(tags); CREATE INDEX IF NOT EXISTS idx_entries_metadata ON entries USING GIN(metadata jsonb_path_ops); -- ── secrets: one row per encrypted field, plaintext schema metadata ──── CREATE TABLE IF NOT EXISTS secrets ( id UUID PRIMARY KEY DEFAULT uuidv7(), entry_id UUID NOT NULL REFERENCES entries(id) ON DELETE CASCADE, field_name VARCHAR(256) NOT NULL, field_type VARCHAR(32) NOT NULL DEFAULT 'string', value_len INT NOT NULL DEFAULT 0, encrypted BYTEA NOT NULL DEFAULT '\x', version BIGINT NOT NULL DEFAULT 1, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(entry_id, field_name) ); CREATE INDEX IF NOT EXISTS idx_secrets_entry_id ON secrets(entry_id); -- ── kv_config: global key-value store (Argon2id salt, etc.) ──────────── CREATE TABLE IF NOT EXISTS kv_config ( key TEXT PRIMARY KEY, value BYTEA NOT NULL ); -- ── audit_log: append-only operation log ──────────────────────────────── CREATE TABLE IF NOT EXISTS audit_log ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, action VARCHAR(32) NOT NULL, namespace VARCHAR(64) NOT NULL, kind VARCHAR(64) NOT NULL, name VARCHAR(256) NOT NULL, detail JSONB NOT NULL DEFAULT '{}', actor VARCHAR(128) NOT NULL DEFAULT '', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_audit_log_created ON audit_log(created_at DESC); CREATE INDEX IF NOT EXISTS idx_audit_log_ns_kind ON audit_log(namespace, kind); -- ── entries_history: entry-level snapshot (tags + metadata) ───────────── CREATE TABLE IF NOT EXISTS entries_history ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, entry_id UUID NOT NULL, namespace VARCHAR(64) NOT NULL, kind VARCHAR(64) NOT NULL, name VARCHAR(256) NOT NULL, version BIGINT NOT NULL, action VARCHAR(16) NOT NULL, tags TEXT[] NOT NULL DEFAULT '{}', metadata JSONB NOT NULL DEFAULT '{}', actor VARCHAR(128) NOT NULL DEFAULT '', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_entries_history_entry_id ON entries_history(entry_id, version DESC); CREATE INDEX IF NOT EXISTS idx_entries_history_ns_kind_name ON entries_history(namespace, kind, name, version DESC); -- ── secrets_history: field-level snapshot ─────────────────────────────── CREATE TABLE IF NOT EXISTS secrets_history ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, entry_id UUID NOT NULL, secret_id UUID NOT NULL, entry_version BIGINT NOT NULL, field_name VARCHAR(256) NOT NULL, field_type VARCHAR(32) NOT NULL DEFAULT 'string', value_len INT NOT NULL DEFAULT 0, encrypted BYTEA NOT NULL DEFAULT '\x', action VARCHAR(16) NOT NULL, actor VARCHAR(128) NOT NULL DEFAULT '', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_secrets_history_entry_id ON secrets_history(entry_id, entry_version DESC); CREATE INDEX IF NOT EXISTS idx_secrets_history_secret_id ON secrets_history(secret_id); "#, ) .execute(pool) .await?; tracing::debug!("migrations complete"); Ok(()) } // ── Entry-level history snapshot ──────────────────────────────────────────── pub struct EntrySnapshotParams<'a> { pub entry_id: uuid::Uuid, pub namespace: &'a str, pub kind: &'a str, pub name: &'a str, pub version: i64, pub action: &'a str, pub tags: &'a [String], pub metadata: &'a Value, } /// Snapshot an entry row into `entries_history` before a write operation. pub async fn snapshot_entry_history( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, p: EntrySnapshotParams<'_>, ) -> Result<()> { let actor = current_actor(); sqlx::query( "INSERT INTO entries_history \ (entry_id, namespace, kind, name, version, action, tags, metadata, actor) \ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", ) .bind(p.entry_id) .bind(p.namespace) .bind(p.kind) .bind(p.name) .bind(p.version) .bind(p.action) .bind(p.tags) .bind(p.metadata) .bind(&actor) .execute(&mut **tx) .await?; Ok(()) } // ── Secret field-level history snapshot ───────────────────────────────────── pub struct SecretSnapshotParams<'a> { pub entry_id: uuid::Uuid, pub secret_id: uuid::Uuid, pub entry_version: i64, pub field_name: &'a str, pub field_type: &'a str, pub value_len: i32, pub encrypted: &'a [u8], pub action: &'a str, } /// Snapshot a single secret field into `secrets_history`. pub async fn snapshot_secret_history( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, p: SecretSnapshotParams<'_>, ) -> Result<()> { let actor = current_actor(); sqlx::query( "INSERT INTO secrets_history \ (entry_id, secret_id, entry_version, field_name, field_type, value_len, encrypted, action, actor) \ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", ) .bind(p.entry_id) .bind(p.secret_id) .bind(p.entry_version) .bind(p.field_name) .bind(p.field_type) .bind(p.value_len) .bind(p.encrypted) .bind(p.action) .bind(&actor) .execute(&mut **tx) .await?; Ok(()) } // ── Argon2 salt helpers ────────────────────────────────────────────────────── /// Load the Argon2id salt from the database. pub async fn load_argon2_salt(pool: &PgPool) -> Result>> { let row: Option<(Vec,)> = sqlx::query_as("SELECT value FROM kv_config WHERE key = 'argon2_salt'") .fetch_optional(pool) .await?; Ok(row.map(|(v,)| v)) } /// Store the Argon2id salt in the database (only called once on first device init). pub async fn store_argon2_salt(pool: &PgPool, salt: &[u8]) -> Result<()> { sqlx::query( "INSERT INTO kv_config (key, value) VALUES ('argon2_salt', $1) \ ON CONFLICT (key) DO NOTHING", ) .bind(salt) .execute(pool) .await?; Ok(()) }