1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use socketioxide::SocketIo;
7use ts_rs::TS;
8use uuid::Uuid;
9
10use crate::app::{AppCtx, AppError, AppState, Paginated};
11use crate::{auth::PermissionLevel, user::UserSummary};
12
13pub async fn new_ref(ctx: AppCtx, content: Value) -> Result<Uuid, AppError> {
15 let ref_id = Uuid::now_v7();
16
17 let new_doc_response = create_automerge_doc(&ctx.state.automerge_io, content.clone()).await?;
21
22 let mut transaction = ctx.state.db.begin().await?;
23
24 let user_id = ctx.user.map(|user| user.user_id);
25 let insert_ref = sqlx::query!(
26 "
27 WITH snapshot AS (
28 INSERT INTO snapshots(for_ref, content, last_updated, doc_id)
29 VALUES ($1, $2, NOW(), $3)
30 RETURNING id
31 )
32 INSERT INTO refs(id, head, created)
33 VALUES ($1, (SELECT id FROM snapshot), NOW())
34 ",
35 ref_id,
36 new_doc_response.doc_json,
38 new_doc_response.doc_id,
39 );
40 insert_ref.execute(&mut *transaction).await?;
41
42 let insert_permission = sqlx::query!(
43 "
44 INSERT INTO permissions(subject, object, level)
45 VALUES ($1, $2, 'own')
46 ",
47 user_id,
48 ref_id,
49 );
50 insert_permission.execute(&mut *transaction).await?;
51
52 transaction.commit().await?;
53 Ok(ref_id)
54}
55
56pub async fn head_snapshot(state: AppState, ref_id: Uuid) -> Result<Value, AppError> {
58 let query = sqlx::query!(
59 "
60 SELECT content FROM snapshots
61 WHERE id = (SELECT head FROM refs WHERE id = $1)
62 ",
63 ref_id
64 );
65 Ok(query.fetch_one(&state.db).await?.content)
66}
67
68pub async fn autosave(state: AppState, data: RefContent) -> Result<(), AppError> {
70 let RefContent { ref_id, content } = data;
71 let query = sqlx::query!(
72 "
73 UPDATE snapshots
74 SET content = $2, last_updated = NOW()
75 WHERE id = (SELECT head FROM refs WHERE id = $1)
76 ",
77 ref_id,
78 content
79 );
80 query.execute(&state.db).await?;
81 Ok(())
82}
83
84pub async fn create_snapshot(state: AppState, ref_id: Uuid) -> Result<(), AppError> {
89 let head_doc_id_query = sqlx::query!(
90 "
91 SELECT doc_id FROM snapshots
92 WHERE id = (SELECT head FROM refs WHERE id = $1)
93 ",
94 ref_id
95 );
96
97 let head_doc_id = head_doc_id_query.fetch_one(&state.db).await?.doc_id;
98 let new_doc_response = clone_automerge_doc(&state.automerge_io, ref_id, head_doc_id).await?;
99
100 let query = sqlx::query!(
101 "
102 WITH snapshot AS (
103 INSERT INTO snapshots(for_ref, content, last_updated, doc_id)
104 VALUES ($1, $2, NOW(), $3)
105 RETURNING id
106 )
107 UPDATE refs
108 SET head = (SELECT id FROM snapshot)
109 WHERE id = $1
110 ",
111 ref_id,
112 new_doc_response.doc_json,
113 new_doc_response.doc_id,
114 );
115 query.execute(&state.db).await?;
116 Ok(())
117}
118
119pub async fn doc_id(state: AppState, ref_id: Uuid) -> Result<String, AppError> {
120 let query = sqlx::query!(
121 "
122 SELECT doc_id FROM snapshots
123 WHERE id = (SELECT head FROM refs WHERE id = $1)
124 ",
125 ref_id
126 );
127
128 let doc_id = query.fetch_one(&state.db).await?.doc_id;
129
130 start_listening_automerge_doc(&state.automerge_io, ref_id, doc_id.clone()).await?;
131
132 Ok(doc_id)
133}
134
135async fn call_automerge_io<T, P>(
136 automerge_io: &SocketIo,
137 event: impl Into<String>,
138 payload: P,
139 fail_msg: impl Into<String>,
140) -> Result<T, AppError>
141where
142 P: Serialize,
143 T: for<'de> serde::Deserialize<'de>,
144{
145 let event = event.into();
146 let fail_msg = fail_msg.into();
147
148 let ack = automerge_io
149 .emit_with_ack::<Vec<Result<T, String>>>(event, payload)
150 .map_err(|e| AppError::AutomergeServer(format!("{fail_msg}: {e}")))?;
151
152 let response_array = ack.await?.data;
153 let response = response_array
154 .into_iter()
155 .next()
156 .ok_or_else(|| AppError::AutomergeServer("Empty ack response".to_string()))?;
157
158 response.map_err(AppError::AutomergeServer)
159}
160
161async fn start_listening_automerge_doc(
162 automerge_io: &SocketIo,
163 ref_id: Uuid,
164 doc_id: String,
165) -> Result<(), AppError> {
166 call_automerge_io::<(), _>(
167 automerge_io,
168 "startListening",
169 [ref_id.to_string(), doc_id],
170 "Failed to call startListening from backend".to_string(),
171 )
172 .await
173}
174
175async fn clone_automerge_doc(
176 automerge_io: &SocketIo,
177 ref_id: Uuid,
178 doc_id: String,
179) -> Result<NewDocSocketResponse, AppError> {
180 call_automerge_io::<NewDocSocketResponse, _>(
181 automerge_io,
182 "cloneDoc",
183 [ref_id.to_string(), doc_id],
184 "Failed to call cloneDoc from backend".to_string(),
185 )
186 .await
187}
188
189async fn create_automerge_doc(
190 automerge_io: &SocketIo,
191 content: serde_json::Value,
192) -> Result<NewDocSocketResponse, AppError> {
193 call_automerge_io::<NewDocSocketResponse, _>(
194 automerge_io,
195 "createDoc",
196 content,
197 "Failed to call createDoc from backend".to_string(),
198 )
199 .await
200}
201
202#[derive(Debug, Serialize, Deserialize, TS)]
204pub struct RefContent {
205 #[serde(rename = "refId")]
206 pub ref_id: Uuid,
207 pub content: Value,
208}
209
210#[derive(Debug, Serialize, Deserialize)]
211pub struct NewDocSocketResponse {
212 #[serde(rename = "docId")]
213 pub doc_id: String,
214 #[serde(rename = "docJson")]
215 pub doc_json: Value,
216}
217
218#[derive(Clone, Debug, Serialize, Deserialize, TS)]
222pub struct RefStub {
223 pub name: String,
224 #[serde(rename = "typeName")]
225 pub type_name: String,
226 #[serde(rename = "refId")]
227 pub ref_id: Uuid,
228 #[serde(rename = "permissionLevel")]
230 pub permission_level: PermissionLevel,
231 pub owner: Option<UserSummary>,
232 #[serde(rename = "createdAt")]
233 pub created_at: DateTime<Utc>,
234}
235
236#[derive(Clone, Debug, Serialize, Deserialize, TS)]
238pub struct RefQueryParams {
239 #[serde(rename = "ownerUsernameQuery")]
240 pub owner_username_query: Option<String>,
241 #[serde(rename = "refNameQuery")]
242 pub ref_name_query: Option<String>,
243 #[serde(rename = "searcherMinLevel")]
244 pub searcher_min_level: Option<PermissionLevel>,
245 #[serde(rename = "includePublicDocuments")]
246 pub include_public_documents: Option<bool>,
247 pub limit: Option<i32>,
248 pub offset: Option<i32>,
249 }
251
252pub async fn search_ref_stubs(
255 ctx: AppCtx,
256 search_params: RefQueryParams,
257) -> Result<Paginated<RefStub>, AppError> {
258 let searcher_id = ctx.user.as_ref().map(|user| user.user_id.clone());
259
260 let min_level = search_params.searcher_min_level.unwrap_or(PermissionLevel::Read);
261
262 let limit = search_params.limit.unwrap_or(100);
263 let offset = search_params.offset.unwrap_or(0);
264
265 let results = sqlx::query!(
266 r#"
267 WITH effective_permissions AS (
268 /*
269 select at most one row per ref, the row is either:
270 - the searcher’s own permission, if it exists
271 - the public permission (subject IS NULL) when include_public_documents = TRUE and the
272 searcher does not already have a row
273 */
274 SELECT DISTINCT ON (object)
275 object,
276 level
277 FROM permissions
278 WHERE (subject = $1)
279 OR ($5 AND subject IS NULL)
280 ORDER BY object,
281 (subject IS NOT NULL) DESC -- prefer the user‑specific row
282 )
283 SELECT
284 refs.id AS ref_id,
285 snapshots.content->>'name' AS name,
286 snapshots.content->>'type' AS type_name,
287 refs.created as created_at,
288 effective_permissions.level AS "permission_level: PermissionLevel",
289 owner.id AS "owner_id?",
290 owner.username AS "owner_username?",
291 owner.display_name AS "owner_display_name?",
292 COUNT(*) OVER()::int4 AS total_count
293 FROM refs
294 JOIN snapshots ON snapshots.id = refs.head
295 JOIN effective_permissions ON effective_permissions.object = refs.id
296 JOIN permissions AS p_owner
297 ON p_owner.object = refs.id AND p_owner.level = 'own'
298 LEFT JOIN users AS owner
299 ON owner.id = p_owner.subject
300 WHERE (
301 owner.username = $2
302 OR $2 IS NULL
303 )
304 AND (
305 snapshots.content->>'name' ILIKE '%' || $3 || '%'
306 OR $3 IS NULL
307 )
308 AND (
309 effective_permissions.level >= $4
310 )
311 ORDER BY refs.created DESC
312 LIMIT $6::int4
313 OFFSET $7::int4;
314 "#,
315 searcher_id,
316 search_params.owner_username_query,
317 search_params.ref_name_query,
318 min_level as PermissionLevel,
319 search_params.include_public_documents.unwrap_or(false),
320 limit,
321 offset,
322 )
323 .fetch_all(&ctx.state.db)
324 .await?;
325
326 let total = results.first().and_then(|r| r.total_count).unwrap_or(0);
327
328 let items = results
330 .into_iter()
331 .map(|row| RefStub {
332 ref_id: row.ref_id,
333 name: row.name.unwrap_or_else(|| "untitled".to_string()),
334 type_name: row.type_name.expect("type_name should never be null"),
335 permission_level: row.permission_level,
336 created_at: row.created_at,
337 owner: match row.owner_id {
338 Some(id) => Some(UserSummary {
339 id,
340 username: row.owner_username,
341 display_name: row.owner_display_name,
342 }),
343 _ => None,
344 },
345 })
346 .collect();
347
348 Ok(Paginated {
349 total,
350 offset,
351 items,
352 })
353}