- 拆分 web.rs 为 web/ 子模块;统一 client_ip 提取 - core: user_scope SQL 复用、env_map N+1 消除、FETCH_ALL 上限调整 - entries 列表页并行查询;PgPool 去 Arc;结构化 NotFound 等错误 - CI: SSH 私钥安全写入;crypto/hex 与依赖清理;MCP 输入长度校验 - AGENTS: API Key 明文存储设计说明
453 lines
14 KiB
Rust
453 lines
14 KiB
Rust
use std::collections::HashSet;
|
|
|
|
use anyhow::Result;
|
|
use serde_json::Value;
|
|
use sqlx::PgPool;
|
|
use uuid::Uuid;
|
|
|
|
use crate::db;
|
|
|
|
#[derive(Debug, serde::Serialize)]
|
|
pub struct RollbackResult {
|
|
pub name: String,
|
|
pub folder: String,
|
|
#[serde(rename = "type")]
|
|
pub entry_type: String,
|
|
pub restored_version: i64,
|
|
}
|
|
|
|
/// Roll back entry `name` to `to_version` (or the most recent snapshot if None).
|
|
/// `folder` is optional; if omitted and multiple entries share the name, an error is returned.
|
|
pub async fn run(
|
|
pool: &PgPool,
|
|
name: &str,
|
|
folder: Option<&str>,
|
|
to_version: Option<i64>,
|
|
user_id: Option<Uuid>,
|
|
) -> Result<RollbackResult> {
|
|
#[derive(sqlx::FromRow)]
|
|
struct EntryHistoryRow {
|
|
folder: String,
|
|
#[sqlx(rename = "type")]
|
|
entry_type: String,
|
|
version: i64,
|
|
action: String,
|
|
tags: Vec<String>,
|
|
metadata: Value,
|
|
}
|
|
|
|
// Disambiguate: find the unique entry_id for (name, folder).
|
|
// Query entries_history by entry_id once we know it; first resolve via name + optional folder.
|
|
let entry_id: Option<Uuid> = if let Some(uid) = user_id {
|
|
if let Some(f) = folder {
|
|
sqlx::query_scalar(
|
|
"SELECT DISTINCT entry_id FROM entries_history \
|
|
WHERE name = $1 AND folder = $2 AND user_id = $3 LIMIT 1",
|
|
)
|
|
.bind(name)
|
|
.bind(f)
|
|
.bind(uid)
|
|
.fetch_optional(pool)
|
|
.await?
|
|
} else {
|
|
let ids: Vec<Uuid> = sqlx::query_scalar(
|
|
"SELECT DISTINCT entry_id FROM entries_history \
|
|
WHERE name = $1 AND user_id = $2",
|
|
)
|
|
.bind(name)
|
|
.bind(uid)
|
|
.fetch_all(pool)
|
|
.await?;
|
|
match ids.len() {
|
|
0 => None,
|
|
1 => Some(ids[0]),
|
|
_ => {
|
|
let folders: Vec<String> = sqlx::query_scalar(
|
|
"SELECT DISTINCT folder FROM entries_history \
|
|
WHERE name = $1 AND user_id = $2",
|
|
)
|
|
.bind(name)
|
|
.bind(uid)
|
|
.fetch_all(pool)
|
|
.await?;
|
|
anyhow::bail!(
|
|
"Ambiguous: entries named '{}' exist in folders: [{}]. \
|
|
Specify 'folder' to disambiguate.",
|
|
name,
|
|
folders.join(", ")
|
|
)
|
|
}
|
|
}
|
|
}
|
|
} else if let Some(f) = folder {
|
|
sqlx::query_scalar(
|
|
"SELECT DISTINCT entry_id FROM entries_history \
|
|
WHERE name = $1 AND folder = $2 AND user_id IS NULL LIMIT 1",
|
|
)
|
|
.bind(name)
|
|
.bind(f)
|
|
.fetch_optional(pool)
|
|
.await?
|
|
} else {
|
|
let ids: Vec<Uuid> = sqlx::query_scalar(
|
|
"SELECT DISTINCT entry_id FROM entries_history \
|
|
WHERE name = $1 AND user_id IS NULL",
|
|
)
|
|
.bind(name)
|
|
.fetch_all(pool)
|
|
.await?;
|
|
match ids.len() {
|
|
0 => None,
|
|
1 => Some(ids[0]),
|
|
_ => {
|
|
let folders: Vec<String> = sqlx::query_scalar(
|
|
"SELECT DISTINCT folder FROM entries_history \
|
|
WHERE name = $1 AND user_id IS NULL",
|
|
)
|
|
.bind(name)
|
|
.fetch_all(pool)
|
|
.await?;
|
|
anyhow::bail!(
|
|
"Ambiguous: entries named '{}' exist in folders: [{}]. \
|
|
Specify 'folder' to disambiguate.",
|
|
name,
|
|
folders.join(", ")
|
|
)
|
|
}
|
|
}
|
|
};
|
|
|
|
let entry_id = entry_id.ok_or_else(|| anyhow::anyhow!("No history found for '{}'", name))?;
|
|
|
|
let snap: Option<EntryHistoryRow> = if let Some(ver) = to_version {
|
|
sqlx::query_as(
|
|
"SELECT folder, type, version, action, tags, metadata \
|
|
FROM entries_history \
|
|
WHERE entry_id = $1 AND version = $2 ORDER BY id ASC LIMIT 1",
|
|
)
|
|
.bind(entry_id)
|
|
.bind(ver)
|
|
.fetch_optional(pool)
|
|
.await?
|
|
} else {
|
|
sqlx::query_as(
|
|
"SELECT folder, type, version, action, tags, metadata \
|
|
FROM entries_history \
|
|
WHERE entry_id = $1 ORDER BY id DESC LIMIT 1",
|
|
)
|
|
.bind(entry_id)
|
|
.fetch_optional(pool)
|
|
.await?
|
|
};
|
|
|
|
let snap = snap.ok_or_else(|| {
|
|
anyhow::anyhow!(
|
|
"No history found for '{}'{}.",
|
|
name,
|
|
to_version
|
|
.map(|v| format!(" at version {}", v))
|
|
.unwrap_or_default()
|
|
)
|
|
})?;
|
|
|
|
let snap_secret_snapshot = db::entry_secret_snapshot_from_metadata(&snap.metadata);
|
|
let snap_metadata = db::strip_secret_snapshot_from_metadata(&snap.metadata);
|
|
|
|
let mut tx = pool.begin().await?;
|
|
|
|
#[derive(sqlx::FromRow)]
|
|
struct LiveEntry {
|
|
id: Uuid,
|
|
version: i64,
|
|
folder: String,
|
|
#[sqlx(rename = "type")]
|
|
entry_type: String,
|
|
tags: Vec<String>,
|
|
metadata: Value,
|
|
}
|
|
|
|
// Lock the live entry if it exists (matched by entry_id for precision).
|
|
let live: Option<LiveEntry> = sqlx::query_as(
|
|
"SELECT id, version, folder, type, tags, metadata FROM entries \
|
|
WHERE id = $1 FOR UPDATE",
|
|
)
|
|
.bind(entry_id)
|
|
.fetch_optional(&mut *tx)
|
|
.await?;
|
|
|
|
let live_entry_id = if let Some(ref lr) = live {
|
|
let history_metadata =
|
|
match db::metadata_with_secret_snapshot(&mut tx, lr.id, &lr.metadata).await {
|
|
Ok(v) => v,
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "failed to build secret snapshot for entry history");
|
|
lr.metadata.clone()
|
|
}
|
|
};
|
|
|
|
if let Err(e) = db::snapshot_entry_history(
|
|
&mut tx,
|
|
db::EntrySnapshotParams {
|
|
entry_id: lr.id,
|
|
user_id,
|
|
folder: &lr.folder,
|
|
entry_type: &lr.entry_type,
|
|
name,
|
|
version: lr.version,
|
|
action: "rollback",
|
|
tags: &lr.tags,
|
|
metadata: &history_metadata,
|
|
},
|
|
)
|
|
.await
|
|
{
|
|
tracing::warn!(error = %e, "failed to snapshot entry before rollback");
|
|
}
|
|
|
|
#[derive(sqlx::FromRow)]
|
|
struct LiveField {
|
|
id: Uuid,
|
|
name: String,
|
|
encrypted: Vec<u8>,
|
|
}
|
|
let live_fields: Vec<LiveField> = sqlx::query_as(
|
|
"SELECT s.id, s.name, s.encrypted \
|
|
FROM entry_secrets es \
|
|
JOIN secrets s ON s.id = es.secret_id \
|
|
WHERE es.entry_id = $1",
|
|
)
|
|
.bind(lr.id)
|
|
.fetch_all(&mut *tx)
|
|
.await?;
|
|
|
|
for f in &live_fields {
|
|
if let Err(e) = db::snapshot_secret_history(
|
|
&mut tx,
|
|
db::SecretSnapshotParams {
|
|
secret_id: f.id,
|
|
name: &f.name,
|
|
encrypted: &f.encrypted,
|
|
action: "rollback",
|
|
},
|
|
)
|
|
.await
|
|
{
|
|
tracing::warn!(error = %e, "failed to snapshot secret field before rollback");
|
|
}
|
|
}
|
|
|
|
sqlx::query(
|
|
"UPDATE entries SET folder = $1, type = $2, tags = $3, metadata = $4, version = version + 1, \
|
|
updated_at = NOW() WHERE id = $5",
|
|
)
|
|
.bind(&snap.folder)
|
|
.bind(&snap.entry_type)
|
|
.bind(&snap.tags)
|
|
.bind(&snap_metadata)
|
|
.bind(lr.id)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
lr.id
|
|
} else {
|
|
if let Some(uid) = user_id {
|
|
sqlx::query_scalar(
|
|
"INSERT INTO entries \
|
|
(user_id, folder, type, name, notes, tags, metadata, version, updated_at) \
|
|
VALUES ($1, $2, $3, $4, '', $5, $6, $7, NOW()) RETURNING id",
|
|
)
|
|
.bind(uid)
|
|
.bind(&snap.folder)
|
|
.bind(&snap.entry_type)
|
|
.bind(name)
|
|
.bind(&snap.tags)
|
|
.bind(&snap_metadata)
|
|
.bind(snap.version)
|
|
.fetch_one(&mut *tx)
|
|
.await?
|
|
} else {
|
|
sqlx::query_scalar(
|
|
"INSERT INTO entries \
|
|
(folder, type, name, notes, tags, metadata, version, updated_at) \
|
|
VALUES ($1, $2, $3, '', $4, $5, $6, NOW()) RETURNING id",
|
|
)
|
|
.bind(&snap.folder)
|
|
.bind(&snap.entry_type)
|
|
.bind(name)
|
|
.bind(&snap.tags)
|
|
.bind(&snap_metadata)
|
|
.bind(snap.version)
|
|
.fetch_one(&mut *tx)
|
|
.await?
|
|
}
|
|
};
|
|
|
|
if let Some(secret_snapshot) = snap_secret_snapshot {
|
|
restore_entry_secrets(&mut tx, live_entry_id, user_id, &secret_snapshot).await?;
|
|
}
|
|
|
|
crate::audit::log_tx(
|
|
&mut tx,
|
|
user_id,
|
|
"rollback",
|
|
&snap.folder,
|
|
&snap.entry_type,
|
|
name,
|
|
serde_json::json!({
|
|
"restored_version": snap.version,
|
|
"original_action": snap.action,
|
|
}),
|
|
)
|
|
.await;
|
|
|
|
tx.commit().await?;
|
|
|
|
Ok(RollbackResult {
|
|
name: name.to_string(),
|
|
folder: snap.folder,
|
|
entry_type: snap.entry_type,
|
|
restored_version: snap.version,
|
|
})
|
|
}
|
|
|
|
async fn restore_entry_secrets(
|
|
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
|
entry_id: Uuid,
|
|
user_id: Option<Uuid>,
|
|
snapshot: &[db::EntrySecretSnapshot],
|
|
) -> Result<()> {
|
|
#[derive(sqlx::FromRow)]
|
|
struct LinkedSecret {
|
|
id: Uuid,
|
|
name: String,
|
|
encrypted: Vec<u8>,
|
|
}
|
|
|
|
let linked: Vec<LinkedSecret> = sqlx::query_as(
|
|
"SELECT s.id, s.name, s.encrypted \
|
|
FROM entry_secrets es \
|
|
JOIN secrets s ON s.id = es.secret_id \
|
|
WHERE es.entry_id = $1",
|
|
)
|
|
.bind(entry_id)
|
|
.fetch_all(&mut **tx)
|
|
.await?;
|
|
|
|
let target_names: HashSet<&str> = snapshot.iter().map(|s| s.name.as_str()).collect();
|
|
|
|
for s in &linked {
|
|
if target_names.contains(s.name.as_str()) {
|
|
continue;
|
|
}
|
|
if let Err(e) = db::snapshot_secret_history(
|
|
tx,
|
|
db::SecretSnapshotParams {
|
|
secret_id: s.id,
|
|
name: &s.name,
|
|
encrypted: &s.encrypted,
|
|
action: "rollback",
|
|
},
|
|
)
|
|
.await
|
|
{
|
|
tracing::warn!(error = %e, "failed to snapshot secret before rollback unlink");
|
|
}
|
|
|
|
sqlx::query("DELETE FROM entry_secrets WHERE entry_id = $1 AND secret_id = $2")
|
|
.bind(entry_id)
|
|
.bind(s.id)
|
|
.execute(&mut **tx)
|
|
.await?;
|
|
|
|
sqlx::query(
|
|
"DELETE FROM secrets s \
|
|
WHERE s.id = $1 \
|
|
AND NOT EXISTS (SELECT 1 FROM entry_secrets es WHERE es.secret_id = s.id)",
|
|
)
|
|
.bind(s.id)
|
|
.execute(&mut **tx)
|
|
.await?;
|
|
}
|
|
|
|
for snap in snapshot {
|
|
let encrypted = ::hex::decode(&snap.encrypted_hex).map_err(|e| {
|
|
anyhow::anyhow!("invalid secret snapshot data for '{}': {}", snap.name, e)
|
|
})?;
|
|
|
|
#[derive(sqlx::FromRow)]
|
|
struct ExistingSecret {
|
|
id: Uuid,
|
|
encrypted: Vec<u8>,
|
|
}
|
|
|
|
let existing: Option<ExistingSecret> = if let Some(uid) = user_id {
|
|
sqlx::query_as("SELECT id, encrypted FROM secrets WHERE user_id = $1 AND name = $2")
|
|
.bind(uid)
|
|
.bind(&snap.name)
|
|
.fetch_optional(&mut **tx)
|
|
.await?
|
|
} else {
|
|
sqlx::query_as("SELECT id, encrypted FROM secrets WHERE user_id IS NULL AND name = $1")
|
|
.bind(&snap.name)
|
|
.fetch_optional(&mut **tx)
|
|
.await?
|
|
};
|
|
|
|
let secret_id = if let Some(ex) = existing {
|
|
if ex.encrypted != encrypted
|
|
&& let Err(e) = db::snapshot_secret_history(
|
|
tx,
|
|
db::SecretSnapshotParams {
|
|
secret_id: ex.id,
|
|
name: &snap.name,
|
|
encrypted: &ex.encrypted,
|
|
action: "rollback",
|
|
},
|
|
)
|
|
.await
|
|
{
|
|
tracing::warn!(error = %e, "failed to snapshot secret before rollback restore");
|
|
}
|
|
sqlx::query(
|
|
"UPDATE secrets SET type = $1, encrypted = $2, version = version + 1, updated_at = NOW() \
|
|
WHERE id = $3",
|
|
)
|
|
.bind(&snap.secret_type)
|
|
.bind(&encrypted)
|
|
.bind(ex.id)
|
|
.execute(&mut **tx)
|
|
.await?;
|
|
ex.id
|
|
} else if let Some(uid) = user_id {
|
|
sqlx::query_scalar(
|
|
"INSERT INTO secrets (user_id, name, type, encrypted) VALUES ($1, $2, $3, $4) RETURNING id",
|
|
)
|
|
.bind(uid)
|
|
.bind(&snap.name)
|
|
.bind(&snap.secret_type)
|
|
.bind(&encrypted)
|
|
.fetch_one(&mut **tx)
|
|
.await?
|
|
} else {
|
|
sqlx::query_scalar(
|
|
"INSERT INTO secrets (user_id, name, type, encrypted) VALUES (NULL, $1, $2, $3) RETURNING id",
|
|
)
|
|
.bind(&snap.name)
|
|
.bind(&snap.secret_type)
|
|
.bind(&encrypted)
|
|
.fetch_one(&mut **tx)
|
|
.await?
|
|
};
|
|
|
|
sqlx::query(
|
|
"INSERT INTO entry_secrets (entry_id, secret_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
|
|
)
|
|
.bind(entry_id)
|
|
.bind(secret_id)
|
|
.execute(&mut **tx)
|
|
.await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|