breez_sdk_spark/persist/
sqlite.rs

1use macros::async_trait;
2use rusqlite::{
3    Connection, ToSql, params,
4    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
5};
6use rusqlite_migration::{M, Migrations};
7use std::path::{Path, PathBuf};
8
9use crate::{
10    DepositInfo, LnurlPayInfo, PaymentDetails, PaymentMethod,
11    error::DepositClaimError,
12    models::{PaymentStatus, PaymentType},
13    persist::{PaymentMetadata, UpdateDepositPayload},
14};
15
16use super::{Payment, Storage, StorageError};
17
/// File name of the database created inside the storage directory.
const DEFAULT_DB_FILENAME: &str = "storage.sql";

/// Storage backend that persists its data in a `SQLite` database.
pub struct SqliteStorage {
    /// Directory holding the database file (see `get_db_path`).
    db_dir: PathBuf,
}
23
24impl SqliteStorage {
25    /// Creates a new `SQLite` storage
26    ///
27    /// # Arguments
28    ///
29    /// * `path` - Path to the `SQLite` database file
30    ///
31    /// # Returns
32    ///
33    /// A new `SqliteStorage` instance or an error
34    pub fn new(path: &Path) -> Result<Self, StorageError> {
35        let storage = Self {
36            db_dir: path.to_path_buf(),
37        };
38
39        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
40        std::fs::create_dir_all(path)
41            .map_err(|e| StorageError::InitializationError(e.to_string()))?;
42
43        storage.migrate()?;
44        Ok(storage)
45    }
46
47    pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
48        Ok(Connection::open(self.get_db_path())?)
49    }
50
51    fn get_db_path(&self) -> PathBuf {
52        self.db_dir.join(DEFAULT_DB_FILENAME)
53    }
54
55    fn migrate(&self) -> Result<(), StorageError> {
56        let migrations =
57            Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
58        let mut conn = self.get_connection()?;
59        migrations.to_latest(&mut conn)?;
60        Ok(())
61    }
62
63    pub(crate) fn current_migrations() -> Vec<&'static str> {
64        vec![
65            "CREATE TABLE IF NOT EXISTS payments (
66              id TEXT PRIMARY KEY,
67              payment_type TEXT NOT NULL,
68              status TEXT NOT NULL,
69              amount INTEGER NOT NULL,
70              fees INTEGER NOT NULL,
71              timestamp INTEGER NOT NULL,
72              details TEXT,
73              method TEXT
74            );",
75            "CREATE TABLE IF NOT EXISTS settings (
76              key TEXT PRIMARY KEY,
77              value TEXT NOT NULL
78            );",
79            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
80              txid TEXT NOT NULL,
81              vout INTEGER NOT NULL,
82              amount_sats INTEGER,
83              claim_error TEXT,
84              refund_tx TEXT,
85              refund_tx_id TEXT,
86              PRIMARY KEY (txid, vout)
87            );",
88            "CREATE TABLE IF NOT EXISTS payment_metadata (
89              payment_id TEXT PRIMARY KEY,
90              lnurl_pay_info TEXT);",
91            "CREATE TABLE IF NOT EXISTS deposit_refunds (
92              deposit_tx_id TEXT NOT NULL,
93              deposit_vout INTEGER NOT NULL,
94              refund_tx TEXT NOT NULL,
95              refund_tx_id TEXT NOT NULL,
96              PRIMARY KEY (deposit_tx_id, deposit_vout)              
97            );",
98        ]
99    }
100}
101
102impl From<rusqlite::Error> for StorageError {
103    fn from(value: rusqlite::Error) -> Self {
104        StorageError::Implementation(value.to_string())
105    }
106}
107
108impl From<rusqlite_migration::Error> for StorageError {
109    fn from(value: rusqlite_migration::Error) -> Self {
110        StorageError::Implementation(value.to_string())
111    }
112}
113
114#[async_trait]
115impl Storage for SqliteStorage {
116    async fn list_payments(
117        &self,
118        offset: Option<u32>,
119        limit: Option<u32>,
120    ) -> Result<Vec<Payment>, StorageError> {
121        let connection = self.get_connection()?;
122
123        let query = format!(
124            "SELECT p.id, p.payment_type, p.status, p.amount, p.fees, p.timestamp, p.details, p.method, pm.lnurl_pay_info
125             FROM payments p
126             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
127             ORDER BY p.timestamp DESC 
128             LIMIT {} OFFSET {}",
129            limit.unwrap_or(u32::MAX),
130            offset.unwrap_or(0)
131        );
132
133        let mut stmt = connection.prepare(&query)?;
134
135        let payment_iter = stmt.query_map(params![], |row| {
136            let mut details: Option<PaymentDetails> = row.get(6)?;
137            if let Some(PaymentDetails::Lightning { lnurl_pay_info, .. }) = &mut details {
138                *lnurl_pay_info = row.get(8)?;
139            }
140
141            Ok(Payment {
142                id: row.get(0)?,
143                payment_type: PaymentType::from(row.get::<_, String>(1)?.as_str()),
144                status: PaymentStatus::from(row.get::<_, String>(2)?.as_str()),
145                amount: row.get(3)?,
146                fees: row.get(4)?,
147                timestamp: row.get(5)?,
148                details,
149                method: row.get(7)?,
150            })
151        })?;
152
153        let mut payments = Vec::new();
154        for payment in payment_iter {
155            payments.push(payment?);
156        }
157
158        Ok(payments)
159    }
160
161    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
162        let connection = self.get_connection()?;
163
164        connection.execute(
165            "INSERT OR REPLACE INTO payments (id, payment_type, status, amount, fees, timestamp, details, method) 
166             VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
167            params![
168                payment.id,
169                payment.payment_type.to_string(),
170                payment.status.to_string(),
171                payment.amount,
172                payment.fees,
173                payment.timestamp,
174                payment.details,
175                payment.method,
176            ],
177        )?;
178
179        Ok(())
180    }
181
182    async fn set_payment_metadata(
183        &self,
184        payment_id: String,
185        metadata: PaymentMetadata,
186    ) -> Result<(), StorageError> {
187        let connection = self.get_connection()?;
188
189        connection.execute(
190            "INSERT OR REPLACE INTO payment_metadata (payment_id, lnurl_pay_info) VALUES (?, ?)",
191            params![payment_id, metadata.lnurl_pay_info],
192        )?;
193
194        Ok(())
195    }
196
197    async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
198        let connection = self.get_connection()?;
199
200        connection.execute(
201            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
202            params![key, value],
203        )?;
204
205        Ok(())
206    }
207
208    async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
209        let connection = self.get_connection()?;
210
211        let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
212
213        let result = stmt.query_row(params![key], |row| {
214            let value_str: String = row.get(0)?;
215            Ok(value_str)
216        });
217
218        match result {
219            Ok(value) => Ok(Some(value)),
220            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
221            Err(e) => Err(e.into()),
222        }
223    }
224
225    async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
226        let connection = self.get_connection()?;
227
228        let mut stmt = connection.prepare(
229            "SELECT id, payment_type, status, amount, fees, timestamp, details, method, pm.lnurl_pay_info FROM payments LEFT JOIN payment_metadata pm ON payments.id = pm.payment_id WHERE payments.id = ?",
230        )?;
231
232        let result = stmt.query_row(params![id], |row| {
233            let mut details: Option<PaymentDetails> = row.get(6)?;
234            if let Some(PaymentDetails::Lightning { lnurl_pay_info, .. }) = &mut details {
235                *lnurl_pay_info = row.get(8)?;
236            }
237
238            Ok(Payment {
239                id: row.get(0)?,
240                payment_type: PaymentType::from(row.get::<_, String>(1)?.as_str()),
241                status: PaymentStatus::from(row.get::<_, String>(2)?.as_str()),
242                amount: row.get(3)?,
243                fees: row.get(4)?,
244                timestamp: row.get(5)?,
245                details,
246                method: row.get(7)?,
247            })
248        });
249        result.map_err(StorageError::from)
250    }
251
252    async fn add_deposit(
253        &self,
254        txid: String,
255        vout: u32,
256        amount_sats: u64,
257    ) -> Result<(), StorageError> {
258        let connection = self.get_connection()?;
259        connection.execute(
260            "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats) 
261             VALUES (?, ?, ?)",
262            params![txid, vout, amount_sats,],
263        )?;
264        Ok(())
265    }
266
267    async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
268        let connection = self.get_connection()?;
269        connection.execute(
270            "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
271            params![txid, vout],
272        )?;
273        Ok(())
274    }
275
276    async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
277        let connection = self.get_connection()?;
278        let mut stmt =
279            connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
280        let rows = stmt.query_map(params![], |row| {
281            Ok(DepositInfo {
282                txid: row.get(0)?,
283                vout: row.get(1)?,
284                amount_sats: row.get(2)?,
285                claim_error: row.get(3)?,
286                refund_tx: row.get(4)?,
287                refund_tx_id: row.get(5)?,
288            })
289        })?;
290        let mut deposits = Vec::new();
291        for row in rows {
292            deposits.push(row?);
293        }
294        Ok(deposits)
295    }
296
297    async fn update_deposit(
298        &self,
299        txid: String,
300        vout: u32,
301        payload: UpdateDepositPayload,
302    ) -> Result<(), StorageError> {
303        let connection = self.get_connection()?;
304        match payload {
305            UpdateDepositPayload::ClaimError { error } => {
306                connection.execute(
307                    "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
308                    params![error, txid, vout],
309                )?;
310            }
311            UpdateDepositPayload::Refund {
312                refund_txid,
313                refund_tx,
314            } => {
315                connection.execute(
316                    "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
317                    params![refund_tx, refund_txid, txid, vout],
318                )?;
319            }
320        }
321        Ok(())
322    }
323}
324
325impl ToSql for PaymentDetails {
326    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
327        let json = serde_json::to_string(self)
328            .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
329        Ok(rusqlite::types::ToSqlOutput::from(json))
330    }
331}
332
333impl FromSql for PaymentDetails {
334    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
335        match value {
336            ValueRef::Text(i) => {
337                let s = std::str::from_utf8(i).map_err(FromSqlError::other)?;
338                let payment_details: PaymentDetails =
339                    serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
340                Ok(payment_details)
341            }
342            _ => Err(FromSqlError::InvalidType),
343        }
344    }
345}
346
347impl ToSql for PaymentMethod {
348    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
349        let json = serde_json::to_string(self)
350            .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
351        Ok(rusqlite::types::ToSqlOutput::from(json))
352    }
353}
354
355impl FromSql for PaymentMethod {
356    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
357        match value {
358            ValueRef::Text(i) => {
359                let s = std::str::from_utf8(i).map_err(FromSqlError::other)?;
360                let payment_method: PaymentMethod =
361                    serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
362                Ok(payment_method)
363            }
364            _ => Err(FromSqlError::InvalidType),
365        }
366    }
367}
368
369impl ToSql for DepositClaimError {
370    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
371        let json = serde_json::to_string(self)
372            .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
373        Ok(rusqlite::types::ToSqlOutput::from(json))
374    }
375}
376
377impl ToSql for LnurlPayInfo {
378    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
379        let json = serde_json::to_string(self)
380            .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
381        Ok(rusqlite::types::ToSqlOutput::from(json))
382    }
383}
384
385impl FromSql for DepositClaimError {
386    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
387        match value {
388            ValueRef::Text(i) => {
389                let s = std::str::from_utf8(i).map_err(FromSqlError::other)?;
390                let deposit_claim_error: DepositClaimError =
391                    serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
392                Ok(deposit_claim_error)
393            }
394            _ => Err(FromSqlError::InvalidType),
395        }
396    }
397}
398
399impl FromSql for LnurlPayInfo {
400    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
401        match value {
402            ValueRef::Text(i) => {
403                let s = std::str::from_utf8(i).map_err(FromSqlError::other)?;
404                let lnurl_pay_info: LnurlPayInfo =
405                    serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
406                Ok(lnurl_pay_info)
407            }
408            _ => Err(FromSqlError::InvalidType),
409        }
410    }
411}
412
#[cfg(test)]
mod tests {
    use crate::SqliteStorage;

    /// Creates a storage inside a new temporary directory named after
    /// `prefix`. The directory guard is returned alongside the storage so it
    /// stays alive for the whole test.
    fn temp_storage(prefix: &str) -> (tempdir::TempDir, SqliteStorage) {
        let dir = tempdir::TempDir::new(prefix).unwrap();
        let storage = SqliteStorage::new(dir.path()).unwrap();
        (dir, storage)
    }

    #[tokio::test]
    async fn test_sqlite_storage() {
        let (_dir, storage) = temp_storage("sqlite_storage");

        crate::persist::tests::test_sqlite_storage(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_unclaimed_deposits_crud() {
        let (_dir, storage) = temp_storage("sqlite_storage_deposits");

        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_deposit_refunds() {
        let (_dir, storage) = temp_storage("sqlite_storage_refund_tx");

        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
    }
}