catcolab_backend/
document.rs

1//! Procedures to create and manipulate documents.
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use socketioxide::SocketIo;
7use ts_rs::TS;
8use uuid::Uuid;
9
10use crate::app::{AppCtx, AppError, AppState, Paginated};
11use crate::{auth::PermissionLevel, user::UserSummary};
12
13/// Creates a new document ref with initial content.
14pub async fn new_ref(ctx: AppCtx, content: Value) -> Result<Uuid, AppError> {
15    let ref_id = Uuid::now_v7();
16
17    // If the document is created but the db transaction doesn't complete, then the document will be
18    // orphaned. The only negative consequence of that is additional space used, but that should be
19    // negligible and we can later create a service which periodically cleans out the orphans
20    let new_doc_response = create_automerge_doc(&ctx.state.automerge_io, content.clone()).await?;
21
22    let mut transaction = ctx.state.db.begin().await?;
23
24    let user_id = ctx.user.map(|user| user.user_id);
25    let insert_ref = sqlx::query!(
26        "
27        WITH snapshot AS (
28            INSERT INTO snapshots(for_ref, content, last_updated, doc_id)
29            VALUES ($1, $2, NOW(), $3)
30            RETURNING id
31        )
32        INSERT INTO refs(id, head, created)
33        VALUES ($1, (SELECT id FROM snapshot), NOW())
34        ",
35        ref_id,
36        // Use the JSON provided by automerge as the authoritative content
37        new_doc_response.doc_json,
38        new_doc_response.doc_id,
39    );
40    insert_ref.execute(&mut *transaction).await?;
41
42    let insert_permission = sqlx::query!(
43        "
44        INSERT INTO permissions(subject, object, level)
45        VALUES ($1, $2, 'own')
46        ",
47        user_id,
48        ref_id,
49    );
50    insert_permission.execute(&mut *transaction).await?;
51
52    transaction.commit().await?;
53    Ok(ref_id)
54}
55
56/// Gets the content of the head snapshot for a document ref.
57pub async fn head_snapshot(state: AppState, ref_id: Uuid) -> Result<Value, AppError> {
58    let query = sqlx::query!(
59        "
60        SELECT content FROM snapshots
61        WHERE id = (SELECT head FROM refs WHERE id = $1)
62        ",
63        ref_id
64    );
65    Ok(query.fetch_one(&state.db).await?.content)
66}
67
68/// Saves the document by overwriting the snapshot at the current head.
69pub async fn autosave(state: AppState, data: RefContent) -> Result<(), AppError> {
70    let RefContent { ref_id, content } = data;
71    let query = sqlx::query!(
72        "
73        UPDATE snapshots
74        SET content = $2, last_updated = NOW()
75        WHERE id = (SELECT head FROM refs WHERE id = $1)
76        ",
77        ref_id,
78        content
79    );
80    query.execute(&state.db).await?;
81    Ok(())
82}
83
84/** Saves the document by replacing the head with a new snapshot.
85
86The snapshot at the previous head is *not* deleted.
87*/
88pub async fn create_snapshot(state: AppState, ref_id: Uuid) -> Result<(), AppError> {
89    let head_doc_id_query = sqlx::query!(
90        "
91        SELECT doc_id FROM snapshots
92        WHERE id = (SELECT head FROM refs WHERE id = $1)
93        ",
94        ref_id
95    );
96
97    let head_doc_id = head_doc_id_query.fetch_one(&state.db).await?.doc_id;
98    let new_doc_response = clone_automerge_doc(&state.automerge_io, ref_id, head_doc_id).await?;
99
100    let query = sqlx::query!(
101        "
102        WITH snapshot AS (
103            INSERT INTO snapshots(for_ref, content, last_updated, doc_id)
104            VALUES ($1, $2, NOW(), $3)
105            RETURNING id
106        )
107        UPDATE refs
108        SET head = (SELECT id FROM snapshot)
109        WHERE id = $1
110        ",
111        ref_id,
112        new_doc_response.doc_json,
113        new_doc_response.doc_id,
114    );
115    query.execute(&state.db).await?;
116    Ok(())
117}
118
119pub async fn doc_id(state: AppState, ref_id: Uuid) -> Result<String, AppError> {
120    let query = sqlx::query!(
121        "
122        SELECT doc_id FROM snapshots
123        WHERE id = (SELECT head FROM refs WHERE id = $1)
124        ",
125        ref_id
126    );
127
128    let doc_id = query.fetch_one(&state.db).await?.doc_id;
129
130    start_listening_automerge_doc(&state.automerge_io, ref_id, doc_id.clone()).await?;
131
132    Ok(doc_id)
133}
134
135async fn call_automerge_io<T, P>(
136    automerge_io: &SocketIo,
137    event: impl Into<String>,
138    payload: P,
139    fail_msg: impl Into<String>,
140) -> Result<T, AppError>
141where
142    P: Serialize,
143    T: for<'de> serde::Deserialize<'de>,
144{
145    let event = event.into();
146    let fail_msg = fail_msg.into();
147
148    let ack = automerge_io
149        .emit_with_ack::<Vec<Result<T, String>>>(event, payload)
150        .map_err(|e| AppError::AutomergeServer(format!("{fail_msg}: {e}")))?;
151
152    let response_array = ack.await?.data;
153    let response = response_array
154        .into_iter()
155        .next()
156        .ok_or_else(|| AppError::AutomergeServer("Empty ack response".to_string()))?;
157
158    response.map_err(AppError::AutomergeServer)
159}
160
161async fn start_listening_automerge_doc(
162    automerge_io: &SocketIo,
163    ref_id: Uuid,
164    doc_id: String,
165) -> Result<(), AppError> {
166    call_automerge_io::<(), _>(
167        automerge_io,
168        "startListening",
169        [ref_id.to_string(), doc_id],
170        "Failed to call startListening from backend".to_string(),
171    )
172    .await
173}
174
175async fn clone_automerge_doc(
176    automerge_io: &SocketIo,
177    ref_id: Uuid,
178    doc_id: String,
179) -> Result<NewDocSocketResponse, AppError> {
180    call_automerge_io::<NewDocSocketResponse, _>(
181        automerge_io,
182        "cloneDoc",
183        [ref_id.to_string(), doc_id],
184        "Failed to call cloneDoc from backend".to_string(),
185    )
186    .await
187}
188
189async fn create_automerge_doc(
190    automerge_io: &SocketIo,
191    content: serde_json::Value,
192) -> Result<NewDocSocketResponse, AppError> {
193    call_automerge_io::<NewDocSocketResponse, _>(
194        automerge_io,
195        "createDoc",
196        content,
197        "Failed to call createDoc from backend".to_string(),
198    )
199    .await
200}
201
202/// A document ref along with its content.
203#[derive(Debug, Serialize, Deserialize, TS)]
204pub struct RefContent {
205    #[serde(rename = "refId")]
206    pub ref_id: Uuid,
207    pub content: Value,
208}
209
210#[derive(Debug, Serialize, Deserialize)]
211pub struct NewDocSocketResponse {
212    #[serde(rename = "docId")]
213    pub doc_id: String,
214    #[serde(rename = "docJson")]
215    pub doc_json: Value,
216}
217
218/// A subset of user relevant information about a ref. Used for showing
219/// users information on a variety of refs without having to load whole
220/// refs.
221#[derive(Clone, Debug, Serialize, Deserialize, TS)]
222pub struct RefStub {
223    pub name: String,
224    #[serde(rename = "typeName")]
225    pub type_name: String,
226    #[serde(rename = "refId")]
227    pub ref_id: Uuid,
228    // permission level that the current user has on this ref
229    #[serde(rename = "permissionLevel")]
230    pub permission_level: PermissionLevel,
231    pub owner: Option<UserSummary>,
232    #[serde(rename = "createdAt")]
233    pub created_at: DateTime<Utc>,
234}
235
236/// Parameters for filtering a search of refs
237#[derive(Clone, Debug, Serialize, Deserialize, TS)]
238pub struct RefQueryParams {
239    #[serde(rename = "ownerUsernameQuery")]
240    pub owner_username_query: Option<String>,
241    #[serde(rename = "refNameQuery")]
242    pub ref_name_query: Option<String>,
243    #[serde(rename = "searcherMinLevel")]
244    pub searcher_min_level: Option<PermissionLevel>,
245    #[serde(rename = "includePublicDocuments")]
246    pub include_public_documents: Option<bool>,
247    pub limit: Option<i32>,
248    pub offset: Option<i32>,
249    // TODO: add param for document type
250}
251
252/// Searches for `RefStub`s that the current user has permission to access,
253/// returning lightweight metadata about each matching ref
254pub async fn search_ref_stubs(
255    ctx: AppCtx,
256    search_params: RefQueryParams,
257) -> Result<Paginated<RefStub>, AppError> {
258    let searcher_id = ctx.user.as_ref().map(|user| user.user_id.clone());
259
260    let min_level = search_params.searcher_min_level.unwrap_or(PermissionLevel::Read);
261
262    let limit = search_params.limit.unwrap_or(100);
263    let offset = search_params.offset.unwrap_or(0);
264
265    let results = sqlx::query!(
266        r#"
267        WITH effective_permissions AS (
268            /*
269              select at most one row per ref, the row is either:
270               - the searcher’s own permission, if it exists
271               - the public permission (subject IS NULL) when include_public_documents = TRUE and the
272                 searcher does not already have a row
273            */
274            SELECT DISTINCT ON (object)
275                   object,
276                   level
277            FROM   permissions
278            WHERE  (subject = $1)
279               OR  ($5 AND subject IS NULL)
280            ORDER BY object,
281                     (subject IS NOT NULL) DESC           -- prefer the user‑specific row
282        )
283        SELECT 
284            refs.id AS ref_id,
285            snapshots.content->>'name' AS name,
286            snapshots.content->>'type' AS type_name,
287            refs.created as created_at,
288            effective_permissions.level AS "permission_level: PermissionLevel",
289            owner.id AS "owner_id?",
290            owner.username AS "owner_username?",
291            owner.display_name AS "owner_display_name?",
292            COUNT(*) OVER()::int4 AS total_count
293        FROM refs
294        JOIN snapshots ON snapshots.id = refs.head
295        JOIN effective_permissions ON effective_permissions.object = refs.id
296        JOIN permissions AS p_owner 
297            ON p_owner.object = refs.id AND p_owner.level = 'own'
298        LEFT JOIN users AS owner
299            ON owner.id = p_owner.subject
300        WHERE (
301            owner.username = $2
302            OR $2 IS NULL
303        )
304        AND (
305            snapshots.content->>'name' ILIKE '%' || $3 || '%'
306            OR $3 IS NULL
307        )
308        AND (
309            effective_permissions.level >= $4
310        )
311        ORDER BY refs.created DESC
312        LIMIT $6::int4
313        OFFSET $7::int4;
314        "#,
315        searcher_id,
316        search_params.owner_username_query,
317        search_params.ref_name_query,
318        min_level as PermissionLevel,
319        search_params.include_public_documents.unwrap_or(false),
320        limit,
321        offset,
322    )
323    .fetch_all(&ctx.state.db)
324    .await?;
325
326    let total = results.first().and_then(|r| r.total_count).unwrap_or(0);
327
328    // We can't use sqlx::query_as! because name and type_name can be null
329    let items = results
330        .into_iter()
331        .map(|row| RefStub {
332            ref_id: row.ref_id,
333            name: row.name.unwrap_or_else(|| "untitled".to_string()),
334            type_name: row.type_name.expect("type_name should never be null"),
335            permission_level: row.permission_level,
336            created_at: row.created_at,
337            owner: match row.owner_id {
338                Some(id) => Some(UserSummary {
339                    id,
340                    username: row.owner_username,
341                    display_name: row.owner_display_name,
342                }),
343                _ => None,
344            },
345        })
346        .collect();
347
348    Ok(Paginated {
349        total,
350        offset,
351        items,
352    })
353}