backend/
user_state_updates.rs

1//! User-state update helpers called from RPC handlers after mutations.
2
3use crate::app::{AppError, AppState};
4use crate::user_state::read_user_state_from_db;
5
6/// Re-read the full user state from the database and reconcile it into the
7/// user's Automerge doc.
8///
9/// No-op if the user has not been initialized (i.e. the user has never called
10/// `get_or_create_user_state_doc`).
11pub async fn update_user_state(state: &AppState, user_id: &str) -> Result<(), AppError> {
12    let doc_id = {
13        let initialized = state.initialized_user_states.read().await;
14        match initialized.get(user_id) {
15            Some(id) => id.clone(),
16            None => return Ok(()),
17        }
18    };
19
20    tracing::debug!(
21        user_id = %user_id,
22        doc_id = %doc_id,
23        "Updating user state for user",
24    );
25
26    let user_state = read_user_state_from_db(user_id.to_string(), &state.db).await?;
27
28    let doc_handle =
29        state.repo.find(doc_id).await?.ok_or_else(|| {
30            AppError::UserStateSync("User state doc not found in repo".to_string())
31        })?;
32
33    doc_handle.with_document(|doc| user_state.reconcile_into(doc))?;
34
35    Ok(())
36}
37
38/// Update state for all initialized users who hold permissions on a ref.
39///
40/// `extra_user_ids` allows callers to include additional users who should be
41/// updated (e.g. users whose permissions were revoked and who therefore no
42/// longer appear in the permissions table).
43pub async fn update_ref_for_users(
44    state: &AppState,
45    ref_id: uuid::Uuid,
46    extra_user_ids: Vec<String>,
47) -> Result<(), AppError> {
48    let mut holders: Vec<String> = sqlx::query_scalar::<_, String>(
49        "SELECT DISTINCT subject FROM permissions WHERE object = $1 AND subject IS NOT NULL",
50    )
51    .bind(ref_id)
52    .fetch_all(&state.db)
53    .await?;
54
55    holders.extend(extra_user_ids);
56
57    update_initialized_users(state, holders).await;
58    Ok(())
59}
60
61/// Update state for a user and all users who share documents with them.
62///
63/// Called after a profile update so that:
64/// - The user's own `profile` field is updated.
65/// - Other users' `known_users` entries are updated.
66pub async fn update_profile_for_users(state: &AppState, user_id: &str) -> Result<(), AppError> {
67    let mut affected: Vec<String> = sqlx::query_scalar::<_, String>(
68        r#"
69        SELECT DISTINCT p2.subject
70        FROM permissions p1
71        JOIN permissions p2 ON p1.object = p2.object
72        WHERE p1.subject = $1
73          AND p2.subject IS NOT NULL
74          AND p2.subject != $1
75        "#,
76    )
77    .bind(user_id)
78    .fetch_all(&state.db)
79    .await?;
80
81    affected.push(user_id.to_string());
82
83    update_initialized_users(state, affected).await;
84    Ok(())
85}
86
87/// Update each initialized user in the given set.
88async fn update_initialized_users(state: &AppState, user_ids: Vec<String>) {
89    let initialized = state.initialized_user_states.read().await;
90    let to_update: Vec<String> = user_ids
91        .into_iter()
92        .collect::<std::collections::HashSet<_>>()
93        .into_iter()
94        .filter(|uid| initialized.contains_key(uid))
95        .collect();
96
97    for user_id in to_update {
98        if let Err(e) = update_user_state(state, &user_id).await {
99            tracing::error!(%user_id, error = %e, "Failed to update user state");
100        }
101    }
102}