backend/
user_state_updates.rs1use crate::app::{AppError, AppState};
4use crate::user_state::read_user_state_from_db;
5
6pub async fn update_user_state(state: &AppState, user_id: &str) -> Result<(), AppError> {
12 let doc_id = {
13 let initialized = state.initialized_user_states.read().await;
14 match initialized.get(user_id) {
15 Some(id) => id.clone(),
16 None => return Ok(()),
17 }
18 };
19
20 tracing::debug!(
21 user_id = %user_id,
22 doc_id = %doc_id,
23 "Updating user state for user",
24 );
25
26 let user_state = read_user_state_from_db(user_id.to_string(), &state.db).await?;
27
28 let doc_handle =
29 state.repo.find(doc_id).await?.ok_or_else(|| {
30 AppError::UserStateSync("User state doc not found in repo".to_string())
31 })?;
32
33 doc_handle.with_document(|doc| user_state.reconcile_into(doc))?;
34
35 Ok(())
36}
37
38pub async fn update_ref_for_users(
44 state: &AppState,
45 ref_id: uuid::Uuid,
46 extra_user_ids: Vec<String>,
47) -> Result<(), AppError> {
48 let mut holders: Vec<String> = sqlx::query_scalar::<_, String>(
49 "SELECT DISTINCT subject FROM permissions WHERE object = $1 AND subject IS NOT NULL",
50 )
51 .bind(ref_id)
52 .fetch_all(&state.db)
53 .await?;
54
55 holders.extend(extra_user_ids);
56
57 update_initialized_users(state, holders).await;
58 Ok(())
59}
60
61pub async fn update_profile_for_users(state: &AppState, user_id: &str) -> Result<(), AppError> {
67 let mut affected: Vec<String> = sqlx::query_scalar::<_, String>(
68 r#"
69 SELECT DISTINCT p2.subject
70 FROM permissions p1
71 JOIN permissions p2 ON p1.object = p2.object
72 WHERE p1.subject = $1
73 AND p2.subject IS NOT NULL
74 AND p2.subject != $1
75 "#,
76 )
77 .bind(user_id)
78 .fetch_all(&state.db)
79 .await?;
80
81 affected.push(user_id.to_string());
82
83 update_initialized_users(state, affected).await;
84 Ok(())
85}
86
87async fn update_initialized_users(state: &AppState, user_ids: Vec<String>) {
89 let initialized = state.initialized_user_states.read().await;
90 let to_update: Vec<String> = user_ids
91 .into_iter()
92 .collect::<std::collections::HashSet<_>>()
93 .into_iter()
94 .filter(|uid| initialized.contains_key(uid))
95 .collect();
96
97 for user_id in to_update {
98 if let Err(e) = update_user_state(state, &user_id).await {
99 tracing::error!(%user_id, error = %e, "Failed to update user state");
100 }
101 }
102}