1use std::time::Duration;
4
5use crate::app::{AppError, AppState, RefMsg, RefReply};
6use crate::document;
7use futures_util::stream::StreamExt;
8use samod::DocHandle;
9use tokio::sync::mpsc;
10use tokio::time::Instant;
11use uuid::Uuid;
12
13const SNAPSHOT_DEBOUNCE: Duration = Duration::from_millis(500);
14
15pub async fn ensure_ref_actor(state: AppState, ref_id: Uuid, doc_handle: DocHandle) {
17 let mut actors = state.ref_actors.write().await;
18 if actors.contains_key(&ref_id) {
19 return;
20 }
21
22 let (tx, rx) = mpsc::channel(8);
23 actors.insert(ref_id, tx);
24 drop(actors);
25
26 tokio::spawn(run_ref_actor(state, ref_id, doc_handle, rx));
27}
28
29async fn get_or_start_actor(
31 state: &AppState,
32 ref_id: Uuid,
33) -> Result<mpsc::Sender<(RefMsg, RefReply)>, AppError> {
34 if let Some(tx) = state.ref_actors.read().await.get(&ref_id).cloned() {
35 return Ok(tx);
36 }
37
38 let doc_id = document::get_doc_id(state.clone(), ref_id).await?;
39 let doc_handle = state
40 .repo
41 .find(doc_id)
42 .await?
43 .ok_or_else(|| AppError::Invalid("Document not found".to_string()))?;
44
45 ensure_ref_actor(state.clone(), ref_id, doc_handle).await;
46
47 state
48 .ref_actors
49 .read()
50 .await
51 .get(&ref_id)
52 .cloned()
53 .ok_or_else(|| AppError::Invalid(format!("Failed to start ref actor for {ref_id}")))
54}
55
56pub async fn send_to_actor(state: &AppState, ref_id: Uuid, msg: RefMsg) -> Result<(), AppError> {
58 let tx = get_or_start_actor(state, ref_id).await?;
59
60 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
61
62 tx.send((msg, reply_tx))
63 .await
64 .map_err(|_| AppError::Invalid(format!("Ref actor for {ref_id} stopped")))?;
65
66 reply_rx
67 .await
68 .map_err(|_| AppError::Invalid(format!("Ref actor for {ref_id} dropped reply")))?
69}
70
71async fn run_ref_actor(
73 state: AppState,
74 ref_id: Uuid,
75 doc_handle: DocHandle,
76 mut rx: mpsc::Receiver<(RefMsg, RefReply)>,
77) {
78 let mut changes = doc_handle.changes();
79 let mut deadline: Option<Instant> = None;
80 let mut skip_changes: u32 = 0;
81
82 loop {
83 let sleep = match deadline {
84 Some(d) => tokio::time::sleep_until(d),
85 None => tokio::time::sleep(Duration::MAX),
86 };
87 tokio::pin!(sleep);
88
89 tokio::select! {
90 biased;
91
92 Some((msg, reply)) = rx.recv() => {
93 let result = match msg {
94 RefMsg::CreateSnapshot => {
95 deadline = None;
96 document::create_snapshot(state.clone(), ref_id).await
97 }
98 RefMsg::LoadSnapshot { snapshot_id } => {
99 deadline = None;
100 skip_changes += 1;
101 document::load_snapshot(
102 &state, ref_id, snapshot_id, &doc_handle,
103 ).await
104 }
105 RefMsg::Delete => {
106 deadline = None;
107 document::delete_ref(state.clone(), ref_id).await
108 }
109 RefMsg::Restore => {
110 deadline = None;
111 document::restore_ref(state.clone(), ref_id).await
112 }
113 };
114
115 let _ = reply.send(result);
116 }
117
118 change = changes.next() => {
119 if change.is_none() {
120 break;
121 }
122
123 if skip_changes > 0 {
124 skip_changes -= 1;
125 continue;
126 }
127
128 deadline = Some(Instant::now() + SNAPSHOT_DEBOUNCE);
129 }
130
131 _ = &mut sleep => {
132 deadline = None;
133 if let Err(e) = document::create_snapshot(state.clone(), ref_id).await {
134 tracing::error!("Autosave snapshot failed for ref {}: {:?}", ref_id, e);
135 }
136 }
137 }
138 }
139
140 state.ref_actors.write().await.remove(&ref_id);
141}