backend/storage/
postgres.rs1use samod::storage::{Storage, StorageKey};
2use sqlx::PgPool;
3use std::collections::HashMap;
4
5#[derive(Clone)]
17pub struct PostgresStorage {
18 pool: PgPool,
19}
20
21impl PostgresStorage {
22 pub fn new(pool: PgPool) -> Self {
24 Self { pool }
25 }
26}
27
28impl Storage for PostgresStorage {
29 async fn load(&self, key: StorageKey) -> Option<Vec<u8>> {
30 let key_parts: Vec<String> = key.into_iter().collect();
31
32 let result = sqlx::query_scalar::<_, Vec<u8>>("SELECT data FROM storage WHERE key = $1")
33 .bind(&key_parts)
34 .fetch_optional(&self.pool)
35 .await;
36
37 match result {
38 Ok(data) => data,
39 Err(e) => {
40 tracing::error!("Failed to load from storage: {}", e);
41 None
42 }
43 }
44 }
45
46 async fn load_range(&self, prefix: StorageKey) -> HashMap<StorageKey, Vec<u8>> {
47 let prefix_parts: Vec<String> = prefix.into_iter().collect();
48
49 let result = if prefix_parts.is_empty() {
50 sqlx::query_as::<_, (Vec<String>, Vec<u8>)>("SELECT key, data FROM storage")
51 .fetch_all(&self.pool)
52 .await
53 } else {
54 sqlx::query_as::<_, (Vec<String>, Vec<u8>)>(
55 "SELECT key, data FROM storage WHERE key[1:cardinality($1::text[])] = $1::text[]",
56 )
57 .bind(&prefix_parts)
58 .fetch_all(&self.pool)
59 .await
60 };
61
62 match result {
63 Ok(rows) => {
64 let mut map = HashMap::new();
65 for (key_parts, data) in rows {
66 if let Ok(storage_key) = StorageKey::from_parts(key_parts) {
67 map.insert(storage_key, data);
68 }
69 }
70 map
71 }
72 Err(e) => {
73 tracing::error!("Failed to load range from storage: {}", e);
74 HashMap::new()
75 }
76 }
77 }
78
79 async fn put(&self, key: StorageKey, data: Vec<u8>) {
80 let key_parts: Vec<String> = key.into_iter().collect();
81
82 let result = sqlx::query(
83 "
84 INSERT INTO storage (key, data)
85 VALUES ($1, $2)
86 ON CONFLICT (key) DO UPDATE SET data = $2
87 ",
88 )
89 .bind(&key_parts)
90 .bind(&data)
91 .execute(&self.pool)
92 .await;
93
94 if let Err(e) = result {
95 tracing::error!("Failed to put to storage: {}", e);
96 }
97 }
98
99 async fn delete(&self, key: StorageKey) {
100 let key_parts: Vec<String> = key.into_iter().collect();
101
102 let result = sqlx::query("DELETE FROM storage WHERE key = $1")
103 .bind(&key_parts)
104 .execute(&self.pool)
105 .await;
106
107 if let Err(e) = result {
108 tracing::error!("Failed to delete from storage: {}", e);
109 }
110 }
111}