backend/
ref_actor.rs

1//! Per-ref actor that coordinates changes to a document ref's state.
2
3use std::time::Duration;
4
5use crate::app::{AppError, AppState, RefMsg, RefReply};
6use crate::document;
7use futures_util::stream::StreamExt;
8use samod::DocHandle;
9use tokio::sync::mpsc;
10use tokio::time::Instant;
11use uuid::Uuid;
12
13const SNAPSHOT_DEBOUNCE: Duration = Duration::from_millis(500);
14
15/// Ensures a ref actor is running for the given ref, spawning one if needed.
16pub async fn ensure_ref_actor(state: AppState, ref_id: Uuid, doc_handle: DocHandle) {
17    let mut actors = state.ref_actors.write().await;
18    if actors.contains_key(&ref_id) {
19        return;
20    }
21
22    let (tx, rx) = mpsc::channel(8);
23    actors.insert(ref_id, tx);
24    drop(actors);
25
26    tokio::spawn(run_ref_actor(state, ref_id, doc_handle, rx));
27}
28
29/// Gets the sender for the ref actor, starting one on demand if needed.
30async fn get_or_start_actor(
31    state: &AppState,
32    ref_id: Uuid,
33) -> Result<mpsc::Sender<(RefMsg, RefReply)>, AppError> {
34    if let Some(tx) = state.ref_actors.read().await.get(&ref_id).cloned() {
35        return Ok(tx);
36    }
37
38    let doc_id = document::get_doc_id(state.clone(), ref_id).await?;
39    let doc_handle = state
40        .repo
41        .find(doc_id)
42        .await?
43        .ok_or_else(|| AppError::Invalid("Document not found".to_string()))?;
44
45    ensure_ref_actor(state.clone(), ref_id, doc_handle).await;
46
47    state
48        .ref_actors
49        .read()
50        .await
51        .get(&ref_id)
52        .cloned()
53        .ok_or_else(|| AppError::Invalid(format!("Failed to start ref actor for {ref_id}")))
54}
55
56/// Send a message to the ref actor for `ref_id`, starting one if needed.
57pub async fn send_to_actor(state: &AppState, ref_id: Uuid, msg: RefMsg) -> Result<(), AppError> {
58    let tx = get_or_start_actor(state, ref_id).await?;
59
60    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
61
62    tx.send((msg, reply_tx))
63        .await
64        .map_err(|_| AppError::Invalid(format!("Ref actor for {ref_id} stopped")))?;
65
66    reply_rx
67        .await
68        .map_err(|_| AppError::Invalid(format!("Ref actor for {ref_id} dropped reply")))?
69}
70
71/// The main actor loop for a single document ref.
72async fn run_ref_actor(
73    state: AppState,
74    ref_id: Uuid,
75    doc_handle: DocHandle,
76    mut rx: mpsc::Receiver<(RefMsg, RefReply)>,
77) {
78    let mut changes = doc_handle.changes();
79    let mut deadline: Option<Instant> = None;
80    let mut skip_changes: u32 = 0;
81
82    loop {
83        let sleep = match deadline {
84            Some(d) => tokio::time::sleep_until(d),
85            None => tokio::time::sleep(Duration::MAX),
86        };
87        tokio::pin!(sleep);
88
89        tokio::select! {
90            biased;
91
92            Some((msg, reply)) = rx.recv() => {
93                let result = match msg {
94                    RefMsg::CreateSnapshot => {
95                        deadline = None;
96                        document::create_snapshot(state.clone(), ref_id).await
97                    }
98                    RefMsg::LoadSnapshot { snapshot_id } => {
99                        deadline = None;
100                        skip_changes += 1;
101                        document::load_snapshot(
102                            &state, ref_id, snapshot_id, &doc_handle,
103                        ).await
104                    }
105                    RefMsg::Delete => {
106                        deadline = None;
107                        document::delete_ref(state.clone(), ref_id).await
108                    }
109                    RefMsg::Restore => {
110                        deadline = None;
111                        document::restore_ref(state.clone(), ref_id).await
112                    }
113                };
114
115                let _ = reply.send(result);
116            }
117
118            change = changes.next() => {
119                if change.is_none() {
120                    break;
121                }
122
123                if skip_changes > 0 {
124                    skip_changes -= 1;
125                    continue;
126                }
127
128                deadline = Some(Instant::now() + SNAPSHOT_DEBOUNCE);
129            }
130
131            _ = &mut sleep => {
132                deadline = None;
133                if let Err(e) = document::create_snapshot(state.clone(), ref_id).await {
134                    tracing::error!("Autosave snapshot failed for ref {}: {:?}", ref_id, e);
135                }
136            }
137        }
138    }
139
140    state.ref_actors.write().await.remove(&ref_id);
141}