use anyhow::Result; use serde_json::{Map, Value}; use sqlx::PgPool; use uuid::Uuid; use crate::crypto; use crate::db; use crate::models::EntryRow; use crate::service::add::{ collect_field_paths, collect_key_paths, flatten_json_fields, insert_path, parse_key_path, parse_kv, remove_path, }; #[derive(Debug, serde::Serialize)] pub struct UpdateResult { pub namespace: String, pub kind: String, pub name: String, pub add_tags: Vec, pub remove_tags: Vec, pub meta_keys: Vec, pub remove_meta: Vec, pub secret_keys: Vec, pub remove_secrets: Vec, } pub struct UpdateParams<'a> { pub namespace: &'a str, pub kind: &'a str, pub name: &'a str, pub add_tags: &'a [String], pub remove_tags: &'a [String], pub meta_entries: &'a [String], pub remove_meta: &'a [String], pub secret_entries: &'a [String], pub remove_secrets: &'a [String], pub user_id: Option, } pub async fn run( pool: &PgPool, params: UpdateParams<'_>, master_key: &[u8; 32], ) -> Result { let mut tx = pool.begin().await?; let row: Option = if let Some(uid) = params.user_id { sqlx::query_as( "SELECT id, version, tags, metadata FROM entries \ WHERE user_id = $1 AND namespace = $2 AND kind = $3 AND name = $4 FOR UPDATE", ) .bind(uid) .bind(params.namespace) .bind(params.kind) .bind(params.name) .fetch_optional(&mut *tx) .await? } else { sqlx::query_as( "SELECT id, version, tags, metadata FROM entries \ WHERE user_id IS NULL AND namespace = $1 AND kind = $2 AND name = $3 FOR UPDATE", ) .bind(params.namespace) .bind(params.kind) .bind(params.name) .fetch_optional(&mut *tx) .await? }; let row = row.ok_or_else(|| { anyhow::anyhow!( "Not found: [{}/{}] {}. Use `add` to create it first.", params.namespace, params.kind, params.name ) })?; if let Err(e) = db::snapshot_entry_history( &mut tx, db::EntrySnapshotParams { entry_id: row.id, user_id: params.user_id, namespace: params.namespace, kind: params.kind, name: params.name, version: row.version, action: "update", tags: &row.tags, metadata: &row.metadata, }, ) .await { tracing::warn!(error = %e, "failed to snapshot entry history before update"); } let mut tags: Vec = row.tags.clone(); for t in params.add_tags { if !tags.contains(t) { tags.push(t.clone()); } } tags.retain(|t| !params.remove_tags.contains(t)); let mut meta_map: Map = match row.metadata.clone() { Value::Object(m) => m, _ => Map::new(), }; for entry in params.meta_entries { let (path, value) = parse_kv(entry)?; insert_path(&mut meta_map, &path, value)?; } for key in params.remove_meta { let path = parse_key_path(key)?; remove_path(&mut meta_map, &path)?; } let metadata = Value::Object(meta_map); let result = sqlx::query( "UPDATE entries SET tags = $1, metadata = $2, version = version + 1, updated_at = NOW() \ WHERE id = $3 AND version = $4", ) .bind(&tags) .bind(&metadata) .bind(row.id) .bind(row.version) .execute(&mut *tx) .await?; if result.rows_affected() == 0 { tx.rollback().await?; anyhow::bail!( "Concurrent modification detected for [{}/{}] {}. Please retry.", params.namespace, params.kind, params.name ); } let new_version = row.version + 1; for entry in params.secret_entries { let (path, field_value) = parse_kv(entry)?; let flat = flatten_json_fields("", &{ let mut m = Map::new(); insert_path(&mut m, &path, field_value)?; Value::Object(m) }); for (field_name, fv) in &flat { let encrypted = crypto::encrypt_json(master_key, fv)?; #[derive(sqlx::FromRow)] struct ExistingField { id: Uuid, encrypted: Vec, } let ef: Option = sqlx::query_as( "SELECT id, encrypted FROM secrets WHERE entry_id = $1 AND field_name = $2", ) .bind(row.id) .bind(field_name) .fetch_optional(&mut *tx) .await?; if let Some(ef) = &ef && let Err(e) = db::snapshot_secret_history( &mut tx, db::SecretSnapshotParams { entry_id: row.id, secret_id: ef.id, entry_version: row.version, field_name, encrypted: &ef.encrypted, action: "update", }, ) .await { tracing::warn!(error = %e, "failed to snapshot secret field history"); } sqlx::query( "INSERT INTO secrets (entry_id, field_name, encrypted) VALUES ($1, $2, $3) \ ON CONFLICT (entry_id, field_name) DO UPDATE SET \ encrypted = EXCLUDED.encrypted, version = secrets.version + 1, updated_at = NOW()", ) .bind(row.id) .bind(field_name) .bind(&encrypted) .execute(&mut *tx) .await?; } } for key in params.remove_secrets { let path = parse_key_path(key)?; let field_name = path.join("."); #[derive(sqlx::FromRow)] struct FieldToDelete { id: Uuid, encrypted: Vec, } let field: Option = sqlx::query_as( "SELECT id, encrypted FROM secrets WHERE entry_id = $1 AND field_name = $2", ) .bind(row.id) .bind(&field_name) .fetch_optional(&mut *tx) .await?; if let Some(f) = field { if let Err(e) = db::snapshot_secret_history( &mut tx, db::SecretSnapshotParams { entry_id: row.id, secret_id: f.id, entry_version: new_version, field_name: &field_name, encrypted: &f.encrypted, action: "delete", }, ) .await { tracing::warn!(error = %e, "failed to snapshot secret field history before delete"); } sqlx::query("DELETE FROM secrets WHERE id = $1") .bind(f.id) .execute(&mut *tx) .await?; } } let meta_keys = collect_key_paths(params.meta_entries)?; let remove_meta_keys = collect_field_paths(params.remove_meta)?; let secret_keys = collect_key_paths(params.secret_entries)?; let remove_secret_keys = collect_field_paths(params.remove_secrets)?; crate::audit::log_tx( &mut tx, "update", params.namespace, params.kind, params.name, serde_json::json!({ "add_tags": params.add_tags, "remove_tags": params.remove_tags, "meta_keys": meta_keys, "remove_meta": remove_meta_keys, "secret_keys": secret_keys, "remove_secrets": remove_secret_keys, }), ) .await; tx.commit().await?; Ok(UpdateResult { namespace: params.namespace.to_string(), kind: params.kind.to_string(), name: params.name.to_string(), add_tags: params.add_tags.to_vec(), remove_tags: params.remove_tags.to_vec(), meta_keys, remove_meta: remove_meta_keys, secret_keys, remove_secrets: remove_secret_keys, }) }