backend/document.rs

//! Procedures to create and manipulate documents.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use socketioxide::SocketIo;
use ts_rs::TS;
use uuid::Uuid;

use crate::app::{AppCtx, AppError, AppState, Paginated};
use crate::{auth::PermissionLevel, user::UserSummary};

/// Creates a new document ref with initial content.
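///
/// A sketch of intended usage (assuming a request `ctx: AppCtx` and a `document`
/// value that serializes to a valid `notebook_types::VersionedDocument`):
///
/// ```ignore
/// let content = serde_json::to_value(&document)?;
/// let ref_id = new_ref(ctx, content).await?;
/// // The returned ref id identifies the document in later calls (`head_snapshot`, `doc_id`, ...).
/// ```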
pub async fn new_ref(ctx: AppCtx, content: Value) -> Result<Uuid, AppError> {
    // Validate document structure by attempting to deserialize it.
    let _validated_doc: notebook_types::VersionedDocument = serde_json::from_value(content.clone())
        .map_err(|e| AppError::Invalid(format!("Failed to parse document: {}", e)))?;

    let ref_id = Uuid::now_v7();

    // If the document is created but the db transaction doesn't complete, then the document will
    // be orphaned. The only negative consequence is the additional space used, which should be
    // negligible; we can later add a service that periodically cleans out the orphans.
    let new_doc_response = create_automerge_doc(&ctx.state.automerge_io, content.clone()).await?;

    let mut transaction = ctx.state.db.begin().await?;

    let user_id = ctx.user.map(|user| user.user_id);
    let insert_ref = sqlx::query!(
        "
        WITH snapshot AS (
            INSERT INTO snapshots(for_ref, content, last_updated, doc_id)
            VALUES ($1, $2, NOW(), $3)
            RETURNING id
        )
        INSERT INTO refs(id, head, created)
        VALUES ($1, (SELECT id FROM snapshot), NOW())
        ",
        ref_id,
        // Use the JSON provided by automerge as the authoritative content.
        new_doc_response.doc_json,
        new_doc_response.doc_id,
    );
    insert_ref.execute(&mut *transaction).await?;

    let insert_permission = sqlx::query!(
        "
        INSERT INTO permissions(subject, object, level)
        VALUES ($1, $2, 'own')
        ",
        user_id,
        ref_id,
    );
    insert_permission.execute(&mut *transaction).await?;

    transaction.commit().await?;
    Ok(ref_id)
}

/// Gets the content of the head snapshot for a document ref.
pub async fn head_snapshot(state: AppState, ref_id: Uuid) -> Result<Value, AppError> {
    let query = sqlx::query!(
        "
        SELECT content FROM snapshots
        WHERE id = (SELECT head FROM refs WHERE id = $1)
        ",
        ref_id
    );
    Ok(query.fetch_one(&state.db).await?.content)
}

/// Saves the document by overwriting the snapshot at the current head.
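///
/// A sketch of expected usage (assuming an `AppState` and a ref created via `new_ref`):
///
/// ```ignore
/// autosave(state, RefContent { ref_id, content }).await?;
/// ```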
pub async fn autosave(state: AppState, data: RefContent) -> Result<(), AppError> {
    let RefContent { ref_id, content } = data;
    let query = sqlx::query!(
        "
        UPDATE snapshots
        SET content = $2, last_updated = NOW()
        WHERE id = (SELECT head FROM refs WHERE id = $1)
        ",
        ref_id,
        content
    );
    query.execute(&state.db).await?;
    Ok(())
}

/// Saves the document by replacing the head with a new snapshot.
///
/// The snapshot at the previous head is *not* deleted.
pub async fn create_snapshot(state: AppState, ref_id: Uuid) -> Result<(), AppError> {
    let head_doc_id_query = sqlx::query!(
        "
        SELECT doc_id FROM snapshots
        WHERE id = (SELECT head FROM refs WHERE id = $1)
        ",
        ref_id
    );

    let head_doc_id = head_doc_id_query.fetch_one(&state.db).await?.doc_id;
    let new_doc_response = clone_automerge_doc(&state.automerge_io, ref_id, head_doc_id).await?;

    let query = sqlx::query!(
        "
        WITH snapshot AS (
            INSERT INTO snapshots(for_ref, content, last_updated, doc_id)
            VALUES ($1, $2, NOW(), $3)
            RETURNING id
        )
        UPDATE refs
        SET head = (SELECT id FROM snapshot)
        WHERE id = $1
        ",
        ref_id,
        new_doc_response.doc_json,
        new_doc_response.doc_id,
    );
    query.execute(&state.db).await?;
    Ok(())
}

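/// Gets the Automerge document ID backing the head snapshot of a ref.
///
/// Also asks the automerge server to start listening for changes on that document.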
pub async fn doc_id(state: AppState, ref_id: Uuid) -> Result<String, AppError> {
    let query = sqlx::query!(
        "
        SELECT doc_id FROM snapshots
        WHERE id = (SELECT head FROM refs WHERE id = $1)
        ",
        ref_id
    );

    let doc_id = query.fetch_one(&state.db).await?.doc_id;

    start_listening_automerge_doc(&state.automerge_io, ref_id, doc_id.clone()).await?;

    Ok(doc_id)
}

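/// Emits an event to the automerge socket server and awaits its acknowledgement.
///
/// The server is expected to acknowledge with a one-element array whose entry is
/// either the deserialized payload or an error message; the first element is
/// unwrapped and any failure is surfaced as `AppError::AutomergeServer`.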
async fn call_automerge_io<T, P>(
    automerge_io: &SocketIo,
    event: impl Into<String>,
    payload: P,
    fail_msg: impl Into<String>,
) -> Result<T, AppError>
where
    P: Serialize,
    T: for<'de> serde::Deserialize<'de>,
{
    let event = event.into();
    let fail_msg = fail_msg.into();

    let ack = automerge_io
        .emit_with_ack::<Vec<Result<T, String>>>(event, payload)
        .map_err(|e| AppError::AutomergeServer(format!("{fail_msg}: {e}")))?;

    let response_array = ack.await?.data;
    let response = response_array
        .into_iter()
        .next()
        .ok_or_else(|| AppError::AutomergeServer("Empty ack response".to_string()))?;

    response.map_err(AppError::AutomergeServer)
}

async fn start_listening_automerge_doc(
    automerge_io: &SocketIo,
    ref_id: Uuid,
    doc_id: String,
) -> Result<(), AppError> {
    call_automerge_io::<(), _>(
        automerge_io,
        "startListening",
        [ref_id.to_string(), doc_id],
        "Failed to call startListening from backend",
    )
    .await
}

async fn clone_automerge_doc(
    automerge_io: &SocketIo,
    ref_id: Uuid,
    doc_id: String,
) -> Result<NewDocSocketResponse, AppError> {
    call_automerge_io::<NewDocSocketResponse, _>(
        automerge_io,
        "cloneDoc",
        [ref_id.to_string(), doc_id],
        "Failed to call cloneDoc from backend",
    )
    .await
}

async fn create_automerge_doc(
    automerge_io: &SocketIo,
    content: serde_json::Value,
) -> Result<NewDocSocketResponse, AppError> {
    call_automerge_io::<NewDocSocketResponse, _>(
        automerge_io,
        "createDoc",
        content,
        "Failed to call createDoc from backend",
    )
    .await
}

/// A document ref along with its content.
#[derive(Debug, Serialize, Deserialize, TS)]
pub struct RefContent {
    #[serde(rename = "refId")]
    pub ref_id: Uuid,
    pub content: Value,
}

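/// Response from the automerge socket server after creating or cloning a document.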
#[derive(Debug, Serialize, Deserialize)]
pub struct NewDocSocketResponse {
    #[serde(rename = "docId")]
    pub doc_id: String,
    #[serde(rename = "docJson")]
    pub doc_json: Value,
}

/// A subset of user-relevant information about a ref. Used for showing users
/// information on a variety of refs without having to load whole refs.
#[derive(Clone, Debug, Serialize, Deserialize, TS)]
pub struct RefStub {
    pub name: String,
    #[serde(rename = "typeName")]
    pub type_name: String,
    #[serde(rename = "refId")]
    pub ref_id: Uuid,
    /// Permission level that the current user has on this ref.
    #[serde(rename = "permissionLevel")]
    pub permission_level: PermissionLevel,
    pub owner: Option<UserSummary>,
    #[serde(rename = "createdAt")]
    pub created_at: DateTime<Utc>,
}

/// Parameters for filtering a search of refs.
#[derive(Clone, Debug, Serialize, Deserialize, TS)]
pub struct RefQueryParams {
    #[serde(rename = "ownerUsernameQuery")]
    pub owner_username_query: Option<String>,
    #[serde(rename = "refNameQuery")]
    pub ref_name_query: Option<String>,
    #[serde(rename = "searcherMinLevel")]
    pub searcher_min_level: Option<PermissionLevel>,
    #[serde(rename = "includePublicDocuments")]
    pub include_public_documents: Option<bool>,
    pub limit: Option<i32>,
    pub offset: Option<i32>,
    // TODO: add param for document type
}

/// Searches for `RefStub`s that the current user has permission to access,
/// returning lightweight metadata about each matching ref.
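///
/// A sketch of expected usage (the query values here are purely illustrative):
///
/// ```ignore
/// let page = search_ref_stubs(ctx, RefQueryParams {
///     owner_username_query: None,
///     ref_name_query: Some("my notebook".to_string()),
///     searcher_min_level: None, // falls back to PermissionLevel::Read
///     include_public_documents: Some(true),
///     limit: Some(20),
///     offset: Some(0),
/// })
/// .await?;
/// println!("showing {} of {} matching refs", page.items.len(), page.total);
/// ```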
pub async fn search_ref_stubs(
    ctx: AppCtx,
    search_params: RefQueryParams,
) -> Result<Paginated<RefStub>, AppError> {
    let searcher_id = ctx.user.as_ref().map(|user| user.user_id.clone());

    let min_level = search_params.searcher_min_level.unwrap_or(PermissionLevel::Read);

    let limit = search_params.limit.unwrap_or(100);
    let offset = search_params.offset.unwrap_or(0);

    let results = sqlx::query!(
        r#"
        WITH effective_permissions AS (
            /*
              Select at most one row per ref; the row is either:
               - the searcher's own permission, if it exists
               - the public permission (subject IS NULL) when include_public_documents = TRUE
                 and the searcher does not already have a row
            */
            SELECT DISTINCT ON (object)
                   object,
                   level
            FROM   permissions
            WHERE  (subject = $1)
               OR  ($5 AND subject IS NULL)
            ORDER BY object,
                     (subject IS NOT NULL) DESC           -- prefer the user-specific row
        )
        SELECT
            refs.id AS ref_id,
            snapshots.content->>'name' AS name,
            snapshots.content->>'type' AS type_name,
            refs.created AS created_at,
            effective_permissions.level AS "permission_level: PermissionLevel",
            owner.id AS "owner_id?",
            owner.username AS "owner_username?",
            owner.display_name AS "owner_display_name?",
            COUNT(*) OVER()::int4 AS total_count
        FROM refs
        JOIN snapshots ON snapshots.id = refs.head
        JOIN effective_permissions ON effective_permissions.object = refs.id
        JOIN permissions AS p_owner
            ON p_owner.object = refs.id AND p_owner.level = 'own'
        LEFT JOIN users AS owner
            ON owner.id = p_owner.subject
        WHERE (
            owner.username = $2
            OR $2 IS NULL
        )
        AND (
            snapshots.content->>'name' ILIKE '%' || $3 || '%'
            OR $3 IS NULL
        )
        AND (
            effective_permissions.level >= $4
        )
        ORDER BY refs.created DESC
        LIMIT $6::int4
        OFFSET $7::int4;
        "#,
        searcher_id,
        search_params.owner_username_query,
        search_params.ref_name_query,
        min_level as PermissionLevel,
        search_params.include_public_documents.unwrap_or(false),
        limit,
        offset,
    )
    .fetch_all(&ctx.state.db)
    .await?;

    let total = results.first().and_then(|r| r.total_count).unwrap_or(0);

    // We can't use sqlx::query_as! because name and type_name can be null.
    let items = results
        .into_iter()
        .map(|row| RefStub {
            ref_id: row.ref_id,
            name: row.name.unwrap_or_else(|| "untitled".to_string()),
            type_name: row.type_name.expect("type_name should never be null"),
            permission_level: row.permission_level,
            created_at: row.created_at,
            owner: match row.owner_id {
                Some(id) => Some(UserSummary {
                    id,
                    username: row.owner_username,
                    display_name: row.owner_display_name,
                }),
                _ => None,
            },
        })
        .collect();

    Ok(Paginated {
        total,
        offset,
        items,
    })
}