backend/storage/
postgres.rs

1use samod::storage::{Storage, StorageKey};
2use sqlx::PgPool;
3use std::collections::HashMap;
4
5/// A PostgreSQL-backed storage adapter for samod.
6///
7/// ## Database Schema
8///
9/// The adapter requires a table with the following structure:
10/// ```sql
11/// CREATE TABLE storage (
12///     key text[] PRIMARY KEY,
13///     data bytea NOT NULL
14/// );
15/// ```
16#[derive(Clone)]
17pub struct PostgresStorage {
18    pool: PgPool,
19}
20
21impl PostgresStorage {
22    /// Constructs a new PostgreSQL storage adapter.
23    pub fn new(pool: PgPool) -> Self {
24        Self { pool }
25    }
26}
27
28impl Storage for PostgresStorage {
29    async fn load(&self, key: StorageKey) -> Option<Vec<u8>> {
30        let key_parts: Vec<String> = key.into_iter().collect();
31
32        let result = sqlx::query_scalar::<_, Vec<u8>>("SELECT data FROM storage WHERE key = $1")
33            .bind(&key_parts)
34            .fetch_optional(&self.pool)
35            .await;
36
37        match result {
38            Ok(data) => data,
39            Err(e) => {
40                tracing::error!("Failed to load from storage: {}", e);
41                None
42            }
43        }
44    }
45
46    async fn load_range(&self, prefix: StorageKey) -> HashMap<StorageKey, Vec<u8>> {
47        let prefix_parts: Vec<String> = prefix.into_iter().collect();
48
49        let result = if prefix_parts.is_empty() {
50            sqlx::query_as::<_, (Vec<String>, Vec<u8>)>("SELECT key, data FROM storage")
51                .fetch_all(&self.pool)
52                .await
53        } else {
54            sqlx::query_as::<_, (Vec<String>, Vec<u8>)>(
55                "SELECT key, data FROM storage WHERE key[1:cardinality($1::text[])] = $1::text[]",
56            )
57            .bind(&prefix_parts)
58            .fetch_all(&self.pool)
59            .await
60        };
61
62        match result {
63            Ok(rows) => {
64                let mut map = HashMap::new();
65                for (key_parts, data) in rows {
66                    if let Ok(storage_key) = StorageKey::from_parts(key_parts) {
67                        map.insert(storage_key, data);
68                    }
69                }
70                map
71            }
72            Err(e) => {
73                tracing::error!("Failed to load range from storage: {}", e);
74                HashMap::new()
75            }
76        }
77    }
78
79    async fn put(&self, key: StorageKey, data: Vec<u8>) {
80        let key_parts: Vec<String> = key.into_iter().collect();
81
82        let result = sqlx::query(
83            "
84            INSERT INTO storage (key, data)
85            VALUES ($1, $2)
86            ON CONFLICT (key) DO UPDATE SET data = $2
87            ",
88        )
89        .bind(&key_parts)
90        .bind(&data)
91        .execute(&self.pool)
92        .await;
93
94        if let Err(e) = result {
95            tracing::error!("Failed to put to storage: {}", e);
96        }
97    }
98
99    async fn delete(&self, key: StorageKey) {
100        let key_parts: Vec<String> = key.into_iter().collect();
101
102        let result = sqlx::query("DELETE FROM storage WHERE key = $1")
103            .bind(&key_parts)
104            .execute(&self.pool)
105            .await;
106
107        if let Err(e) = result {
108            tracing::error!("Failed to delete from storage: {}", e);
109        }
110    }
111}