1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use socketioxide::SocketIo;
7use ts_rs::TS;
8use uuid::Uuid;
9
10use crate::app::{AppCtx, AppError, AppState, Paginated};
11use crate::{auth::PermissionLevel, user::UserSummary};
12
13pub async fn new_ref(ctx: AppCtx, content: Value) -> Result<Uuid, AppError> {
15 let _validated_doc: notebook_types::VersionedDocument = serde_json::from_value(content.clone())
17 .map_err(|e| AppError::Invalid(format!("Failed to parse document: {}", e)))?;
18
19 let ref_id = Uuid::now_v7();
20
21 let new_doc_response = create_automerge_doc(&ctx.state.automerge_io, content.clone()).await?;
25
26 let mut transaction = ctx.state.db.begin().await?;
27
28 let user_id = ctx.user.map(|user| user.user_id);
29 let insert_ref = sqlx::query!(
30 "
31 WITH snapshot AS (
32 INSERT INTO snapshots(for_ref, content, last_updated, doc_id)
33 VALUES ($1, $2, NOW(), $3)
34 RETURNING id
35 )
36 INSERT INTO refs(id, head, created)
37 VALUES ($1, (SELECT id FROM snapshot), NOW())
38 ",
39 ref_id,
40 new_doc_response.doc_json,
42 new_doc_response.doc_id,
43 );
44 insert_ref.execute(&mut *transaction).await?;
45
46 let insert_permission = sqlx::query!(
47 "
48 INSERT INTO permissions(subject, object, level)
49 VALUES ($1, $2, 'own')
50 ",
51 user_id,
52 ref_id,
53 );
54 insert_permission.execute(&mut *transaction).await?;
55
56 transaction.commit().await?;
57 Ok(ref_id)
58}
59
60pub async fn head_snapshot(state: AppState, ref_id: Uuid) -> Result<Value, AppError> {
62 let query = sqlx::query!(
63 "
64 SELECT content FROM snapshots
65 WHERE id = (SELECT head FROM refs WHERE id = $1)
66 ",
67 ref_id
68 );
69 Ok(query.fetch_one(&state.db).await?.content)
70}
71
72pub async fn autosave(state: AppState, data: RefContent) -> Result<(), AppError> {
74 let RefContent { ref_id, content } = data;
75 let query = sqlx::query!(
76 "
77 UPDATE snapshots
78 SET content = $2, last_updated = NOW()
79 WHERE id = (SELECT head FROM refs WHERE id = $1)
80 ",
81 ref_id,
82 content
83 );
84 query.execute(&state.db).await?;
85 Ok(())
86}
87
88pub async fn create_snapshot(state: AppState, ref_id: Uuid) -> Result<(), AppError> {
92 let head_doc_id_query = sqlx::query!(
93 "
94 SELECT doc_id FROM snapshots
95 WHERE id = (SELECT head FROM refs WHERE id = $1)
96 ",
97 ref_id
98 );
99
100 let head_doc_id = head_doc_id_query.fetch_one(&state.db).await?.doc_id;
101 let new_doc_response = clone_automerge_doc(&state.automerge_io, ref_id, head_doc_id).await?;
102
103 let query = sqlx::query!(
104 "
105 WITH snapshot AS (
106 INSERT INTO snapshots(for_ref, content, last_updated, doc_id)
107 VALUES ($1, $2, NOW(), $3)
108 RETURNING id
109 )
110 UPDATE refs
111 SET head = (SELECT id FROM snapshot)
112 WHERE id = $1
113 ",
114 ref_id,
115 new_doc_response.doc_json,
116 new_doc_response.doc_id,
117 );
118 query.execute(&state.db).await?;
119 Ok(())
120}
121
122pub async fn doc_id(state: AppState, ref_id: Uuid) -> Result<String, AppError> {
123 let query = sqlx::query!(
124 "
125 SELECT doc_id FROM snapshots
126 WHERE id = (SELECT head FROM refs WHERE id = $1)
127 ",
128 ref_id
129 );
130
131 let doc_id = query.fetch_one(&state.db).await?.doc_id;
132
133 start_listening_automerge_doc(&state.automerge_io, ref_id, doc_id.clone()).await?;
134
135 Ok(doc_id)
136}
137
138async fn call_automerge_io<T, P>(
139 automerge_io: &SocketIo,
140 event: impl Into<String>,
141 payload: P,
142 fail_msg: impl Into<String>,
143) -> Result<T, AppError>
144where
145 P: Serialize,
146 T: for<'de> serde::Deserialize<'de>,
147{
148 let event = event.into();
149 let fail_msg = fail_msg.into();
150
151 let ack = automerge_io
152 .emit_with_ack::<Vec<Result<T, String>>>(event, payload)
153 .map_err(|e| AppError::AutomergeServer(format!("{fail_msg}: {e}")))?;
154
155 let response_array = ack.await?.data;
156 let response = response_array
157 .into_iter()
158 .next()
159 .ok_or_else(|| AppError::AutomergeServer("Empty ack response".to_string()))?;
160
161 response.map_err(AppError::AutomergeServer)
162}
163
164async fn start_listening_automerge_doc(
165 automerge_io: &SocketIo,
166 ref_id: Uuid,
167 doc_id: String,
168) -> Result<(), AppError> {
169 call_automerge_io::<(), _>(
170 automerge_io,
171 "startListening",
172 [ref_id.to_string(), doc_id],
173 "Failed to call startListening from backend".to_string(),
174 )
175 .await
176}
177
178async fn clone_automerge_doc(
179 automerge_io: &SocketIo,
180 ref_id: Uuid,
181 doc_id: String,
182) -> Result<NewDocSocketResponse, AppError> {
183 call_automerge_io::<NewDocSocketResponse, _>(
184 automerge_io,
185 "cloneDoc",
186 [ref_id.to_string(), doc_id],
187 "Failed to call cloneDoc from backend".to_string(),
188 )
189 .await
190}
191
192async fn create_automerge_doc(
193 automerge_io: &SocketIo,
194 content: serde_json::Value,
195) -> Result<NewDocSocketResponse, AppError> {
196 call_automerge_io::<NewDocSocketResponse, _>(
197 automerge_io,
198 "createDoc",
199 content,
200 "Failed to call createDoc from backend".to_string(),
201 )
202 .await
203}
204
205#[derive(Debug, Serialize, Deserialize, TS)]
207pub struct RefContent {
208 #[serde(rename = "refId")]
209 pub ref_id: Uuid,
210 pub content: Value,
211}
212
213#[derive(Debug, Serialize, Deserialize)]
214pub struct NewDocSocketResponse {
215 #[serde(rename = "docId")]
216 pub doc_id: String,
217 #[serde(rename = "docJson")]
218 pub doc_json: Value,
219}
220
221#[derive(Clone, Debug, Serialize, Deserialize, TS)]
224pub struct RefStub {
225 pub name: String,
226 #[serde(rename = "typeName")]
227 pub type_name: String,
228 #[serde(rename = "refId")]
229 pub ref_id: Uuid,
230 #[serde(rename = "permissionLevel")]
232 pub permission_level: PermissionLevel,
233 pub owner: Option<UserSummary>,
234 #[serde(rename = "createdAt")]
235 pub created_at: DateTime<Utc>,
236}
237
238#[derive(Clone, Debug, Serialize, Deserialize, TS)]
240pub struct RefQueryParams {
241 #[serde(rename = "ownerUsernameQuery")]
242 pub owner_username_query: Option<String>,
243 #[serde(rename = "refNameQuery")]
244 pub ref_name_query: Option<String>,
245 #[serde(rename = "searcherMinLevel")]
246 pub searcher_min_level: Option<PermissionLevel>,
247 #[serde(rename = "includePublicDocuments")]
248 pub include_public_documents: Option<bool>,
249 pub limit: Option<i32>,
250 pub offset: Option<i32>,
251 }
253
254pub async fn search_ref_stubs(
257 ctx: AppCtx,
258 search_params: RefQueryParams,
259) -> Result<Paginated<RefStub>, AppError> {
260 let searcher_id = ctx.user.as_ref().map(|user| user.user_id.clone());
261
262 let min_level = search_params.searcher_min_level.unwrap_or(PermissionLevel::Read);
263
264 let limit = search_params.limit.unwrap_or(100);
265 let offset = search_params.offset.unwrap_or(0);
266
267 let results = sqlx::query!(
268 r#"
269 WITH effective_permissions AS (
270 /*
271 select at most one row per ref, the row is either:
272 - the searcher’s own permission, if it exists
273 - the public permission (subject IS NULL) when include_public_documents = TRUE and the
274 searcher does not already have a row
275 */
276 SELECT DISTINCT ON (object)
277 object,
278 level
279 FROM permissions
280 WHERE (subject = $1)
281 OR ($5 AND subject IS NULL)
282 ORDER BY object,
283 (subject IS NOT NULL) DESC -- prefer the user‑specific row
284 )
285 SELECT
286 refs.id AS ref_id,
287 snapshots.content->>'name' AS name,
288 snapshots.content->>'type' AS type_name,
289 refs.created as created_at,
290 effective_permissions.level AS "permission_level: PermissionLevel",
291 owner.id AS "owner_id?",
292 owner.username AS "owner_username?",
293 owner.display_name AS "owner_display_name?",
294 COUNT(*) OVER()::int4 AS total_count
295 FROM refs
296 JOIN snapshots ON snapshots.id = refs.head
297 JOIN effective_permissions ON effective_permissions.object = refs.id
298 JOIN permissions AS p_owner
299 ON p_owner.object = refs.id AND p_owner.level = 'own'
300 LEFT JOIN users AS owner
301 ON owner.id = p_owner.subject
302 WHERE (
303 owner.username = $2
304 OR $2 IS NULL
305 )
306 AND (
307 snapshots.content->>'name' ILIKE '%' || $3 || '%'
308 OR $3 IS NULL
309 )
310 AND (
311 effective_permissions.level >= $4
312 )
313 ORDER BY refs.created DESC
314 LIMIT $6::int4
315 OFFSET $7::int4;
316 "#,
317 searcher_id,
318 search_params.owner_username_query,
319 search_params.ref_name_query,
320 min_level as PermissionLevel,
321 search_params.include_public_documents.unwrap_or(false),
322 limit,
323 offset,
324 )
325 .fetch_all(&ctx.state.db)
326 .await?;
327
328 let total = results.first().and_then(|r| r.total_count).unwrap_or(0);
329
330 let items = results
332 .into_iter()
333 .map(|row| RefStub {
334 ref_id: row.ref_id,
335 name: row.name.unwrap_or_else(|| "untitled".to_string()),
336 type_name: row.type_name.expect("type_name should never be null"),
337 permission_level: row.permission_level,
338 created_at: row.created_at,
339 owner: match row.owner_id {
340 Some(id) => Some(UserSummary {
341 id,
342 username: row.owner_username,
343 display_name: row.owner_display_name,
344 }),
345 _ => None,
346 },
347 })
348 .collect();
349
350 Ok(Paginated {
351 total,
352 offset,
353 items,
354 })
355}