breez_sdk_spark/persist/
sqlite.rs

1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5    Connection, Row, ToSql, Transaction, params,
6    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11    AssetFilter, DepositInfo, ListPaymentsRequest, LnurlPayInfo, LnurlWithdrawInfo, PaymentDetails,
12    PaymentMethod,
13    error::DepositClaimError,
14    persist::{PaymentMetadata, UpdateDepositPayload},
15    sync_storage::{
16        IncomingChange, OutgoingChange, Record, RecordChange, RecordId, SyncStorage,
17        SyncStorageError, UnversionedRecordChange,
18    },
19};
20
21use super::{Payment, Storage, StorageError};
22
/// File name of the database, joined onto the storage directory by `get_db_path`.
const DEFAULT_DB_FILENAME: &str = "storage.sql";
/// SQLite-based storage implementation
///
/// Only the directory path is stored; `get_connection` opens a new connection
/// each time it is called.
pub struct SqliteStorage {
    /// Directory that contains the database file.
    db_dir: PathBuf,
}
28
29impl SqliteStorage {
30    /// Creates a new `SQLite` storage
31    ///
32    /// # Arguments
33    ///
34    /// * `path` - Path to the `SQLite` database file
35    ///
36    /// # Returns
37    ///
38    /// A new `SqliteStorage` instance or an error
39    pub fn new(path: &Path) -> Result<Self, StorageError> {
40        let storage = Self {
41            db_dir: path.to_path_buf(),
42        };
43
44        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
45        std::fs::create_dir_all(path)
46            .map_err(|e| StorageError::InitializationError(e.to_string()))?;
47
48        storage.migrate()?;
49        Ok(storage)
50    }
51
52    pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
53        Ok(Connection::open(self.get_db_path())?)
54    }
55
56    fn get_db_path(&self) -> PathBuf {
57        self.db_dir.join(DEFAULT_DB_FILENAME)
58    }
59
60    fn migrate(&self) -> Result<(), StorageError> {
61        let migrations =
62            Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
63        let mut conn = self.get_connection()?;
64        let previous_version = match migrations.current_version(&conn)? {
65            SchemaVersion::Inside(previous_version) => previous_version.get(),
66            _ => 0,
67        };
68        migrations.to_latest(&mut conn)?;
69
70        if previous_version < 6 {
71            Self::migrate_lnurl_metadata_description(&mut conn)?;
72        }
73
74        Ok(())
75    }
76
77    fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
78        let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
79        let pay_infos: Vec<_> = stmt
80            .query_map([], |row| {
81                let payment_id: String = row.get(0)?;
82                let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
83                Ok((payment_id, lnurl_pay_info))
84            })?
85            .collect::<Result<_, _>>()?;
86        let pay_infos = pay_infos
87            .into_iter()
88            .filter_map(|(payment_id, lnurl_pay_info)| {
89                let pay_info = lnurl_pay_info?;
90                let description = pay_info.extract_description()?;
91                Some((payment_id, description))
92            })
93            .collect::<Vec<_>>();
94
95        for pay_info in pay_infos {
96            conn.execute(
97                "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
98                params![pay_info.1, pay_info.0],
99            )?;
100        }
101
102        Ok(())
103    }
104
    /// Returns the ordered list of SQL migration scripts for the database schema.
    ///
    /// `migrate` wraps each entry in an "up" migration and applies them in order.
    // NOTE(review): presumably entries must only ever be appended, since already-applied
    // positions are tracked by schema version — confirm against `rusqlite_migration` docs.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn current_migrations() -> Vec<&'static str> {
        vec![
            // Initial payments table (details stored as a JSON text column).
            "CREATE TABLE IF NOT EXISTS payments (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount INTEGER NOT NULL,
              fees INTEGER NOT NULL,
              timestamp INTEGER NOT NULL,
              details TEXT,
              method TEXT
            );",
            // Key/value settings table, used by the cached-item methods.
            "CREATE TABLE IF NOT EXISTS settings (
              key TEXT PRIMARY KEY,
              value TEXT NOT NULL
            );",
            // Deposits that have not been claimed, keyed by (txid, vout).
            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
              txid TEXT NOT NULL,
              vout INTEGER NOT NULL,
              amount_sats INTEGER,
              claim_error TEXT,
              refund_tx TEXT,
              refund_tx_id TEXT,
              PRIMARY KEY (txid, vout)
            );",
            // Per-payment metadata, initially only the LNURL pay info.
            "CREATE TABLE IF NOT EXISTS payment_metadata (
              payment_id TEXT PRIMARY KEY,
              lnurl_pay_info TEXT
            );",
            // Refund transactions keyed by the deposit outpoint.
            "CREATE TABLE IF NOT EXISTS deposit_refunds (
              deposit_tx_id TEXT NOT NULL,
              deposit_vout INTEGER NOT NULL,
              refund_tx TEXT NOT NULL,
              refund_tx_id TEXT NOT NULL,
              PRIMARY KEY (deposit_tx_id, deposit_vout)              
            );",
            // Sixth entry: adds the description column that `migrate` backfills
            // when the previous schema version was below 6.
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
            // Splits the JSON `details` column into dedicated columns and a
            // lightning details table, copies the existing values over, then drops `details`.
            "
            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN spark INTEGER;
            CREATE TABLE payment_details_lightning (
              payment_id TEXT PRIMARY KEY,
              invoice TEXT NOT NULL,
              payment_hash TEXT NOT NULL,
              destination_pubkey TEXT NOT NULL,
              description TEXT,
              preimage TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'), 
                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'), 
                json_extract(details, '$.Lightning.preimage') 
            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

            ALTER TABLE payments DROP COLUMN details;

            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
            ",
            // Token payment details table.
            "CREATE TABLE payment_details_token (
              payment_id TEXT PRIMARY KEY,
              metadata TEXT NOT NULL,
              tx_hash TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );",
            // Migration to change payments amount and fees from INTEGER to TEXT.
            // Done in four steps: create the new table, copy rows with CAST,
            // drop the old table, rename the new one into place.
            "CREATE TABLE payments_new (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount TEXT NOT NULL,
              fees TEXT NOT NULL,
              timestamp INTEGER NOT NULL,
              method TEXT,
              withdraw_tx_id TEXT,
              deposit_tx_id TEXT,
              spark INTEGER
            );",
            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
             SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
             FROM payments;",
            "DROP TABLE payments;",
            "ALTER TABLE payments_new RENAME TO payments;",
            // Spark invoice details table, plus an invoice details column for token payments.
            "CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
            // LNURL withdraw info column on the metadata table.
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
            // Sync tables: a single-row revision counter (seeded with 0), the
            // outgoing change queue, and the per-record sync state.
            "CREATE TABLE sync_revision (
                revision INTEGER NOT NULL DEFAULT 0
            );
            INSERT INTO sync_revision (revision) VALUES (0);
            CREATE TABLE sync_outgoing(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                updated_fields_json TEXT NOT NULL,
                revision INTEGER NOT NULL
            );
            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
            CREATE TABLE sync_state(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id)
            );",
            // Incoming sync records, keyed by (record_type, data_id, revision).
            "CREATE TABLE sync_incoming(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id, revision)
            );
            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
        ]
    }
237}
238
239impl From<rusqlite::Error> for StorageError {
240    fn from(value: rusqlite::Error) -> Self {
241        StorageError::Implementation(value.to_string())
242    }
243}
244
245impl From<rusqlite_migration::Error> for StorageError {
246    fn from(value: rusqlite_migration::Error) -> Self {
247        StorageError::Implementation(value.to_string())
248    }
249}
250
251#[async_trait]
252impl Storage for SqliteStorage {
253    #[allow(clippy::too_many_lines)]
254    async fn list_payments(
255        &self,
256        request: ListPaymentsRequest,
257    ) -> Result<Vec<Payment>, StorageError> {
258        let connection = self.get_connection()?;
259
260        // Build WHERE clauses based on filters
261        let mut where_clauses = Vec::new();
262        let mut params: Vec<Box<dyn ToSql>> = Vec::new();
263
264        // Filter by payment type
265        if let Some(ref type_filter) = request.type_filter
266            && !type_filter.is_empty()
267        {
268            let placeholders = type_filter
269                .iter()
270                .map(|_| "?")
271                .collect::<Vec<_>>()
272                .join(", ");
273            where_clauses.push(format!("p.payment_type IN ({placeholders})"));
274            for payment_type in type_filter {
275                params.push(Box::new(payment_type.to_string()));
276            }
277        }
278
279        // Filter by status
280        if let Some(ref status_filter) = request.status_filter
281            && !status_filter.is_empty()
282        {
283            let placeholders = status_filter
284                .iter()
285                .map(|_| "?")
286                .collect::<Vec<_>>()
287                .join(", ");
288            where_clauses.push(format!("p.status IN ({placeholders})"));
289            for status in status_filter {
290                params.push(Box::new(status.to_string()));
291            }
292        }
293
294        // Filter by timestamp range
295        if let Some(from_timestamp) = request.from_timestamp {
296            where_clauses.push("p.timestamp >= ?".to_string());
297            params.push(Box::new(from_timestamp));
298        }
299
300        if let Some(to_timestamp) = request.to_timestamp {
301            where_clauses.push("p.timestamp < ?".to_string());
302            params.push(Box::new(to_timestamp));
303        }
304
305        // Filter by asset
306        if let Some(ref asset_filter) = request.asset_filter {
307            match asset_filter {
308                AssetFilter::Bitcoin => {
309                    where_clauses.push("t.metadata IS NULL".to_string());
310                }
311                AssetFilter::Token { token_identifier } => {
312                    where_clauses.push("t.metadata IS NOT NULL".to_string());
313                    if let Some(identifier) = token_identifier {
314                        // Filter by specific token identifier
315                        where_clauses
316                            .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
317                        params.push(Box::new(identifier.clone()));
318                    }
319                }
320            }
321        }
322
323        // Build the WHERE clause
324        let where_sql = if where_clauses.is_empty() {
325            String::new()
326        } else {
327            format!("WHERE {}", where_clauses.join(" AND "))
328        };
329
330        // Determine sort order
331        let order_direction = if request.sort_ascending.unwrap_or(false) {
332            "ASC"
333        } else {
334            "DESC"
335        };
336
337        let query = format!(
338            "SELECT p.id
339            ,       p.payment_type
340            ,       p.status
341            ,       p.amount
342            ,       p.fees
343            ,       p.timestamp
344            ,       p.method
345            ,       p.withdraw_tx_id
346            ,       p.deposit_tx_id
347            ,       p.spark
348            ,       l.invoice AS lightning_invoice
349            ,       l.payment_hash AS lightning_payment_hash
350            ,       l.destination_pubkey AS lightning_destination_pubkey
351            ,       COALESCE(l.description, pm.lnurl_description) AS lightning_description
352            ,       l.preimage AS lightning_preimage
353            ,       pm.lnurl_pay_info
354            ,       pm.lnurl_withdraw_info
355            ,       t.metadata AS token_metadata
356            ,       t.tx_hash AS token_tx_hash
357            ,       t.invoice_details AS token_invoice_details
358            ,       s.invoice_details AS spark_invoice_details
359             FROM payments p
360             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
361             LEFT JOIN payment_details_token t ON p.id = t.payment_id
362             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
363             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
364             {}
365             ORDER BY p.timestamp {} 
366             LIMIT {} OFFSET {}",
367            where_sql,
368            order_direction,
369            request.limit.unwrap_or(u32::MAX),
370            request.offset.unwrap_or(0)
371        );
372
373        let mut stmt = connection.prepare(&query)?;
374        let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
375        let payments = stmt
376            .query_map(param_refs.as_slice(), map_payment)?
377            .collect::<Result<Vec<_>, _>>()?;
378        Ok(payments)
379    }
380
381    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
382        let mut connection = self.get_connection()?;
383        let tx = connection.transaction()?;
384        tx.execute(
385            "INSERT OR REPLACE INTO payments (id, payment_type, status, amount, fees, timestamp, method) 
386             VALUES (?, ?, ?, ?, ?, ?, ?)",
387            params![
388                payment.id,
389                payment.payment_type.to_string(),
390                payment.status.to_string(),
391                U128SqlWrapper(payment.amount),
392                U128SqlWrapper(payment.fees),
393                payment.timestamp,
394                payment.method,
395            ],
396        )?;
397
398        match payment.details {
399            Some(PaymentDetails::Withdraw { tx_id }) => {
400                tx.execute(
401                    "UPDATE payments SET withdraw_tx_id = ? WHERE id = ?",
402                    params![tx_id, payment.id],
403                )?;
404            }
405            Some(PaymentDetails::Deposit { tx_id }) => {
406                tx.execute(
407                    "UPDATE payments SET deposit_tx_id = ? WHERE id = ?",
408                    params![tx_id, payment.id],
409                )?;
410            }
411            Some(PaymentDetails::Spark { invoice_details }) => {
412                tx.execute(
413                    "UPDATE payments SET spark = 1 WHERE id = ?",
414                    params![payment.id],
415                )?;
416                if let Some(invoice_details) = invoice_details {
417                    tx.execute("INSERT OR REPLACE INTO payment_details_spark (payment_id, invoice_details) VALUES (?, ?)",
418                        params![payment.id, serde_json::to_string(&invoice_details)?],
419                    )?;
420                }
421            }
422            Some(PaymentDetails::Token {
423                metadata,
424                tx_hash,
425                invoice_details,
426            }) => {
427                tx.execute(
428                    "INSERT OR REPLACE INTO payment_details_token (payment_id, metadata, tx_hash, invoice_details) VALUES (?, ?, ?, ?)",
429                    params![payment.id, serde_json::to_string(&metadata)?, tx_hash, invoice_details.map(|d| serde_json::to_string(&d)).transpose()?],
430                )?;
431            }
432            Some(PaymentDetails::Lightning {
433                invoice,
434                payment_hash,
435                destination_pubkey,
436                description,
437                preimage,
438                ..
439            }) => {
440                tx.execute(
441                    "INSERT OR REPLACE INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage) 
442                     VALUES (?, ?, ?, ?, ?, ?)",
443                    params![
444                        payment.id,
445                        invoice,
446                        payment_hash,
447                        destination_pubkey,
448                        description,
449                        preimage,
450                    ],
451                )?;
452            }
453            None => {}
454        }
455
456        tx.commit()?;
457        Ok(())
458    }
459
460    async fn set_payment_metadata(
461        &self,
462        payment_id: String,
463        metadata: PaymentMetadata,
464    ) -> Result<(), StorageError> {
465        let connection = self.get_connection()?;
466
467        connection.execute(
468            "INSERT OR REPLACE INTO payment_metadata (payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description) VALUES (?, ?, ?, ?)",
469            params![payment_id, metadata.lnurl_pay_info, metadata.lnurl_withdraw_info, metadata.lnurl_description],
470        )?;
471
472        Ok(())
473    }
474
475    async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
476        let connection = self.get_connection()?;
477
478        connection.execute(
479            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
480            params![key, value],
481        )?;
482
483        Ok(())
484    }
485
486    async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
487        let connection = self.get_connection()?;
488
489        let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
490
491        let result = stmt.query_row(params![key], |row| {
492            let value_str: String = row.get(0)?;
493            Ok(value_str)
494        });
495
496        match result {
497            Ok(value) => Ok(Some(value)),
498            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
499            Err(e) => Err(e.into()),
500        }
501    }
502
503    async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
504        let connection = self.get_connection()?;
505
506        connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
507
508        Ok(())
509    }
510
511    async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
512        let connection = self.get_connection()?;
513
514        let mut stmt = connection.prepare(
515            "SELECT p.id
516            ,       p.payment_type
517            ,       p.status
518            ,       p.amount
519            ,       p.fees
520            ,       p.timestamp
521            ,       p.method
522            ,       p.withdraw_tx_id
523            ,       p.deposit_tx_id
524            ,       p.spark
525            ,       l.invoice AS lightning_invoice
526            ,       l.payment_hash AS lightning_payment_hash
527            ,       l.destination_pubkey AS lightning_destination_pubkey
528            ,       COALESCE(l.description, pm.lnurl_description) AS lightning_description
529            ,       l.preimage AS lightning_preimage
530            ,       pm.lnurl_pay_info
531            ,       pm.lnurl_withdraw_info
532            ,       t.metadata AS token_metadata
533            ,       t.tx_hash AS token_tx_hash
534            ,       t.invoice_details AS token_invoice_details
535            ,       s.invoice_details AS spark_invoice_details
536             FROM payments p
537             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
538             LEFT JOIN payment_details_token t ON p.id = t.payment_id
539             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
540             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
541             WHERE p.id = ?",
542        )?;
543
544        let payment = stmt.query_row(params![id], map_payment)?;
545        Ok(payment)
546    }
547
548    async fn get_payment_by_invoice(
549        &self,
550        invoice: String,
551    ) -> Result<Option<Payment>, StorageError> {
552        let connection = self.get_connection()?;
553
554        let mut stmt = connection.prepare(
555            "SELECT p.id
556            ,       p.payment_type
557            ,       p.status
558            ,       p.amount
559            ,       p.fees
560            ,       p.timestamp
561            ,       p.method
562            ,       p.withdraw_tx_id
563            ,       p.deposit_tx_id
564            ,       p.spark
565            ,       l.invoice AS lightning_invoice
566            ,       l.payment_hash AS lightning_payment_hash
567            ,       l.destination_pubkey AS lightning_destination_pubkey
568            ,       COALESCE(l.description, pm.lnurl_description) AS lightning_description
569            ,       l.preimage AS lightning_preimage
570            ,       pm.lnurl_pay_info
571            ,       pm.lnurl_withdraw_info
572            ,       t.metadata AS token_metadata
573            ,       t.tx_hash AS token_tx_hash
574            ,       t.invoice_details AS token_invoice_details
575            ,       s.invoice_details AS spark_invoice_details
576             FROM payments p
577             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
578             LEFT JOIN payment_details_token t ON p.id = t.payment_id
579             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
580             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
581             WHERE l.invoice = ?",
582        )?;
583
584        let payment = stmt.query_row(params![invoice], map_payment);
585        match payment {
586            Ok(payment) => Ok(Some(payment)),
587            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
588            Err(e) => Err(e.into()),
589        }
590    }
591
592    async fn add_deposit(
593        &self,
594        txid: String,
595        vout: u32,
596        amount_sats: u64,
597    ) -> Result<(), StorageError> {
598        let connection = self.get_connection()?;
599        connection.execute(
600            "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats) 
601             VALUES (?, ?, ?)",
602            params![txid, vout, amount_sats,],
603        )?;
604        Ok(())
605    }
606
607    async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
608        let connection = self.get_connection()?;
609        connection.execute(
610            "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
611            params![txid, vout],
612        )?;
613        Ok(())
614    }
615
616    async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
617        let connection = self.get_connection()?;
618        let mut stmt =
619            connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
620        let rows = stmt.query_map(params![], |row| {
621            Ok(DepositInfo {
622                txid: row.get(0)?,
623                vout: row.get(1)?,
624                amount_sats: row.get(2)?,
625                claim_error: row.get(3)?,
626                refund_tx: row.get(4)?,
627                refund_tx_id: row.get(5)?,
628            })
629        })?;
630        let mut deposits = Vec::new();
631        for row in rows {
632            deposits.push(row?);
633        }
634        Ok(deposits)
635    }
636
637    async fn update_deposit(
638        &self,
639        txid: String,
640        vout: u32,
641        payload: UpdateDepositPayload,
642    ) -> Result<(), StorageError> {
643        let connection = self.get_connection()?;
644        match payload {
645            UpdateDepositPayload::ClaimError { error } => {
646                connection.execute(
647                    "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
648                    params![error, txid, vout],
649                )?;
650            }
651            UpdateDepositPayload::Refund {
652                refund_txid,
653                refund_tx,
654            } => {
655                connection.execute(
656                    "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
657                    params![refund_tx, refund_txid, txid, vout],
658                )?;
659            }
660        }
661        Ok(())
662    }
663}
664
665/// Bumps the revision number, locking the revision number for updates for the duration of the transaction.
666fn get_next_revision(tx: &Transaction<'_>) -> Result<u64, SyncStorageError> {
667    let revision = tx
668        .query_row(
669            "UPDATE sync_revision
670            SET revision = revision + 1
671            RETURNING revision",
672            [],
673            |row| row.get(0),
674        )
675        .map_err(map_sqlite_error)?;
676    Ok(revision)
677}
678
679impl From<StorageError> for SyncStorageError {
680    fn from(value: StorageError) -> Self {
681        match value {
682            StorageError::Implementation(s) => SyncStorageError::Implementation(s),
683            StorageError::InitializationError(s) => SyncStorageError::InitializationError(s),
684            StorageError::Serialization(s) => SyncStorageError::Serialization(s),
685        }
686    }
687}
688
689#[macros::async_trait]
690impl SyncStorage for SqliteStorage {
691    async fn add_outgoing_change(
692        &self,
693        record: UnversionedRecordChange,
694    ) -> Result<u64, SyncStorageError> {
695        let mut connection = self.get_connection()?;
696        let tx = connection.transaction().map_err(map_sqlite_error)?;
697        let revision = get_next_revision(&tx)?;
698
699        tx.execute(
700            "INSERT INTO sync_outgoing (
701                record_type
702            ,   data_id
703            ,   schema_version
704            ,   commit_time
705            ,   updated_fields_json
706            ,   revision
707            )
708             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
709            params![
710                record.id.r#type,
711                record.id.data_id,
712                record.schema_version.clone(),
713                serde_json::to_string(&record.updated_fields)?,
714                revision,
715            ],
716        )
717        .map_err(map_sqlite_error)?;
718
719        tx.commit().map_err(map_sqlite_error)?;
720        Ok(revision)
721    }
722
723    async fn complete_outgoing_sync(&self, record: Record) -> Result<(), SyncStorageError> {
724        let mut connection = self.get_connection()?;
725        let tx = connection.transaction().map_err(map_sqlite_error)?;
726
727        tx.execute(
728            "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
729            params![record.id.r#type, record.id.data_id, record.revision],
730        )
731        .map_err(map_sqlite_error)?;
732
733        tx.execute(
734            "INSERT OR REPLACE INTO sync_state (
735                record_type
736            ,   data_id
737            ,   schema_version
738            ,   commit_time
739            ,   data
740            ,   revision
741            )
742             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
743            params![
744                record.id.r#type,
745                record.id.data_id,
746                record.schema_version.clone(),
747                serde_json::to_string(&record.data)?,
748                record.revision,
749            ],
750        )
751        .map_err(map_sqlite_error)?;
752
753        tx.commit().map_err(map_sqlite_error)?;
754        Ok(())
755    }
756
757    async fn get_pending_outgoing_changes(
758        &self,
759        limit: u32,
760    ) -> Result<Vec<OutgoingChange>, SyncStorageError> {
761        let connection = self.get_connection()?;
762
763        let mut stmt = connection
764            .prepare(
765                "SELECT o.record_type
766            ,       o.data_id
767            ,       o.schema_version
768            ,       o.commit_time
769            ,       o.updated_fields_json
770            ,       o.revision
771            ,       e.schema_version AS existing_schema_version
772            ,       e.commit_time AS existing_commit_time
773            ,       e.data AS existing_data
774            ,       e.revision AS existing_revision
775             FROM sync_outgoing o
776             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
777             ORDER BY o.revision ASC
778             LIMIT ?",
779            )
780            .map_err(map_sqlite_error)?;
781        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
782        let mut results = Vec::new();
783        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
784            let parent = if let Some(existing_data) =
785                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
786            {
787                Some(Record {
788                    id: RecordId::new(
789                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
790                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
791                    ),
792                    schema_version: row.get(6).map_err(map_sqlite_error)?,
793                    revision: row.get(9).map_err(map_sqlite_error)?,
794                    data: serde_json::from_str(&existing_data)?,
795                })
796            } else {
797                None
798            };
799            let change = RecordChange {
800                id: RecordId::new(
801                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
802                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
803                ),
804                schema_version: row.get(2).map_err(map_sqlite_error)?,
805                updated_fields: serde_json::from_str(
806                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
807                )?,
808                revision: row.get(5).map_err(map_sqlite_error)?,
809            };
810            results.push(OutgoingChange { change, parent });
811        }
812
813        Ok(results)
814    }
815
816    async fn get_last_revision(&self) -> Result<u64, SyncStorageError> {
817        let connection = self.get_connection()?;
818
819        // Get the maximum revision from sync_state table
820        let mut stmt = connection
821            .prepare("SELECT COALESCE(MAX(revision), 0) FROM sync_state")
822            .map_err(map_sqlite_error)?;
823
824        let revision: u64 = stmt
825            .query_row([], |row| row.get(0))
826            .map_err(map_sqlite_error)?;
827
828        Ok(revision)
829    }
830
831    async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), SyncStorageError> {
832        if records.is_empty() {
833            return Ok(());
834        }
835
836        let mut connection = self.get_connection()?;
837        let tx = connection.transaction().map_err(map_sqlite_error)?;
838
839        for record in records {
840            tx.execute(
841                "INSERT OR REPLACE INTO sync_incoming (
842                    record_type
843                ,   data_id
844                ,   schema_version
845                ,   commit_time
846                ,   data
847                ,   revision
848                )
849                 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
850                params![
851                    record.id.r#type,
852                    record.id.data_id,
853                    record.schema_version.clone(),
854                    serde_json::to_string(&record.data)?,
855                    record.revision,
856                ],
857            )
858            .map_err(map_sqlite_error)?;
859        }
860
861        tx.commit().map_err(map_sqlite_error)?;
862        Ok(())
863    }
864
865    async fn delete_incoming_record(&self, record: Record) -> Result<(), SyncStorageError> {
866        let connection = self.get_connection()?;
867
868        connection
869            .execute(
870                "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
871                params![record.id.r#type, record.id.data_id, record.revision],
872            )
873            .map_err(map_sqlite_error)?;
874
875        Ok(())
876    }
877
878    async fn rebase_pending_outgoing_records(&self, revision: u64) -> Result<(), SyncStorageError> {
879        let mut connection = self.get_connection()?;
880        let tx = connection.transaction().map_err(map_sqlite_error)?;
881
882        let last_revision = tx
883            .query_row(
884                "SELECT COALESCE(MAX(revision), 0) FROM sync_state",
885                [],
886                |row| row.get(0),
887            )
888            .map_err(map_sqlite_error)?;
889
890        let diff = revision.saturating_sub(last_revision);
891
892        // Update all pending outgoing records to have revision numbers higher than the incoming record
893        tx.execute(
894            "UPDATE sync_outgoing 
895             SET revision = revision + ?",
896            params![diff],
897        )
898        .map_err(map_sqlite_error)?;
899
900        tx.commit().map_err(map_sqlite_error)?;
901        Ok(())
902    }
903
904    async fn get_incoming_records(
905        &self,
906        limit: u32,
907    ) -> Result<Vec<IncomingChange>, SyncStorageError> {
908        let connection = self.get_connection()?;
909
910        let mut stmt = connection
911            .prepare(
912                "SELECT i.record_type
913            ,       i.data_id
914            ,       i.schema_version
915            ,       i.data
916            ,       i.revision
917            ,       e.schema_version AS existing_schema_version
918            ,       e.commit_time AS existing_commit_time
919            ,       e.data AS existing_data
920            ,       e.revision AS existing_revision
921             FROM sync_incoming i
922             LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
923             ORDER BY i.revision ASC
924             LIMIT ?",
925            )
926            .map_err(map_sqlite_error)?;
927
928        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
929        let mut results = Vec::new();
930
931        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
932            let parent = if let Some(existing_data) =
933                row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
934            {
935                Some(Record {
936                    id: RecordId::new(
937                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
938                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
939                    ),
940                    schema_version: row.get(5).map_err(map_sqlite_error)?,
941                    revision: row.get(8).map_err(map_sqlite_error)?,
942                    data: serde_json::from_str(&existing_data)?,
943                })
944            } else {
945                None
946            };
947            let record = Record {
948                id: RecordId::new(
949                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
950                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
951                ),
952                schema_version: row.get(2).map_err(map_sqlite_error)?,
953                data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
954                revision: row.get(4).map_err(map_sqlite_error)?,
955            };
956            results.push(IncomingChange {
957                new_state: record,
958                old_state: parent,
959            });
960        }
961
962        Ok(results)
963    }
964
965    async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, SyncStorageError> {
966        let connection = self.get_connection()?;
967
968        let mut stmt = connection
969            .prepare(
970                "SELECT o.record_type
971            ,       o.data_id
972            ,       o.schema_version
973            ,       o.commit_time
974            ,       o.updated_fields_json
975            ,       o.revision
976            ,       e.schema_version AS existing_schema_version
977            ,       e.commit_time AS existing_commit_time
978            ,       e.data AS existing_data
979            ,       e.revision AS existing_revision
980             FROM sync_outgoing o
981             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
982             ORDER BY o.revision DESC
983             LIMIT 1",
984            )
985            .map_err(map_sqlite_error)?;
986
987        let mut rows = stmt.query([]).map_err(map_sqlite_error)?;
988
989        if let Some(row) = rows.next().map_err(map_sqlite_error)? {
990            let parent = if let Some(existing_data) =
991                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
992            {
993                Some(Record {
994                    id: RecordId::new(
995                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
996                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
997                    ),
998                    schema_version: row.get(6).map_err(map_sqlite_error)?,
999                    revision: row.get(9).map_err(map_sqlite_error)?,
1000                    data: serde_json::from_str(&existing_data)?,
1001                })
1002            } else {
1003                None
1004            };
1005            let change = RecordChange {
1006                id: RecordId::new(
1007                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
1008                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
1009                ),
1010                schema_version: row.get(2).map_err(map_sqlite_error)?,
1011                updated_fields: serde_json::from_str(
1012                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1013                )?,
1014                revision: row.get(5).map_err(map_sqlite_error)?,
1015            };
1016
1017            return Ok(Some(OutgoingChange { change, parent }));
1018        }
1019
1020        Ok(None)
1021    }
1022
1023    async fn update_record_from_incoming(&self, record: Record) -> Result<(), SyncStorageError> {
1024        let connection = self.get_connection()?;
1025
1026        connection
1027            .execute(
1028                "INSERT OR REPLACE INTO sync_state (
1029                record_type
1030            ,   data_id
1031            ,   schema_version
1032            ,   commit_time
1033            ,   data
1034            ,   revision
1035            )
1036             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1037                params![
1038                    record.id.r#type,
1039                    record.id.data_id,
1040                    record.schema_version.clone(),
1041                    serde_json::to_string(&record.data)?,
1042                    record.revision,
1043                ],
1044            )
1045            .map_err(map_sqlite_error)?;
1046
1047        Ok(())
1048    }
1049}
1050
1051#[allow(clippy::needless_pass_by_value)]
1052fn map_sqlite_error(value: rusqlite::Error) -> SyncStorageError {
1053    SyncStorageError::Implementation(value.to_string())
1054}
1055
1056fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
1057    let withdraw_tx_id: Option<String> = row.get(7)?;
1058    let deposit_tx_id: Option<String> = row.get(8)?;
1059    let spark: Option<i32> = row.get(9)?;
1060    let lightning_invoice: Option<String> = row.get(10)?;
1061    let token_metadata: Option<String> = row.get(17)?;
1062    let details = match (
1063        lightning_invoice,
1064        withdraw_tx_id,
1065        deposit_tx_id,
1066        spark,
1067        token_metadata,
1068    ) {
1069        (Some(invoice), _, _, _, _) => {
1070            let payment_hash: String = row.get(11)?;
1071            let destination_pubkey: String = row.get(12)?;
1072            let description: Option<String> = row.get(13)?;
1073            let preimage: Option<String> = row.get(14)?;
1074            let lnurl_pay_info: Option<LnurlPayInfo> = row.get(15)?;
1075            let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(16)?;
1076
1077            Some(PaymentDetails::Lightning {
1078                invoice,
1079                payment_hash,
1080                destination_pubkey,
1081                description,
1082                preimage,
1083                lnurl_pay_info,
1084                lnurl_withdraw_info,
1085            })
1086        }
1087        (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
1088        (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
1089        (_, _, _, Some(_), _) => {
1090            let invoice_details_str: Option<String> = row.get(20)?;
1091            let invoice_details = invoice_details_str
1092                .map(|s| {
1093                    serde_json::from_str(&s).map_err(|e| {
1094                        rusqlite::Error::FromSqlConversionFailure(
1095                            20,
1096                            rusqlite::types::Type::Text,
1097                            e.into(),
1098                        )
1099                    })
1100                })
1101                .transpose()?;
1102            Some(PaymentDetails::Spark { invoice_details })
1103        }
1104        (_, _, _, _, Some(metadata)) => {
1105            let invoice_details_str: Option<String> = row.get(19)?;
1106            let invoice_details = invoice_details_str
1107                .map(|s| {
1108                    serde_json::from_str(&s).map_err(|e| {
1109                        rusqlite::Error::FromSqlConversionFailure(
1110                            19,
1111                            rusqlite::types::Type::Text,
1112                            e.into(),
1113                        )
1114                    })
1115                })
1116                .transpose()?;
1117            Some(PaymentDetails::Token {
1118                metadata: serde_json::from_str(&metadata).map_err(|e| {
1119                    rusqlite::Error::FromSqlConversionFailure(
1120                        17,
1121                        rusqlite::types::Type::Text,
1122                        e.into(),
1123                    )
1124                })?,
1125                tx_hash: row.get(18)?,
1126                invoice_details,
1127            })
1128        }
1129        _ => None,
1130    };
1131    Ok(Payment {
1132        id: row.get(0)?,
1133        payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
1134            rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
1135        })?,
1136        status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
1137            rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
1138        })?,
1139        amount: row.get::<_, U128SqlWrapper>(3)?.0,
1140        fees: row.get::<_, U128SqlWrapper>(4)?.0,
1141        timestamp: row.get(5)?,
1142        details,
1143        method: row.get(6)?,
1144    })
1145}
1146
1147impl ToSql for PaymentDetails {
1148    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1149        to_sql_json(self)
1150    }
1151}
1152
1153impl FromSql for PaymentDetails {
1154    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1155        from_sql_json(value)
1156    }
1157}
1158
1159impl ToSql for PaymentMethod {
1160    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1161        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1162    }
1163}
1164
1165impl FromSql for PaymentMethod {
1166    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1167        match value {
1168            ValueRef::Text(i) => {
1169                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1170                // NOTE: trim_matches/to_lowercase is here, because this used to be serde_json serialized.
1171                let payment_method: PaymentMethod = s
1172                    .trim_matches('"')
1173                    .to_lowercase()
1174                    .parse()
1175                    .map_err(|()| FromSqlError::InvalidType)?;
1176                Ok(payment_method)
1177            }
1178            _ => Err(FromSqlError::InvalidType),
1179        }
1180    }
1181}
1182
1183impl ToSql for DepositClaimError {
1184    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1185        to_sql_json(self)
1186    }
1187}
1188
1189impl FromSql for DepositClaimError {
1190    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1191        from_sql_json(value)
1192    }
1193}
1194
1195impl ToSql for LnurlPayInfo {
1196    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1197        to_sql_json(self)
1198    }
1199}
1200
1201impl FromSql for LnurlPayInfo {
1202    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1203        from_sql_json(value)
1204    }
1205}
1206
1207impl ToSql for LnurlWithdrawInfo {
1208    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1209        to_sql_json(self)
1210    }
1211}
1212
1213impl FromSql for LnurlWithdrawInfo {
1214    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1215        from_sql_json(value)
1216    }
1217}
1218
1219fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1220where
1221    T: serde::Serialize,
1222{
1223    let json = serde_json::to_string(&value)
1224        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1225    Ok(rusqlite::types::ToSqlOutput::from(json))
1226}
1227
1228fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1229where
1230    T: serde::de::DeserializeOwned,
1231{
1232    match value {
1233        ValueRef::Text(i) => {
1234            let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1235            let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1236            Ok(deserialized)
1237        }
1238        _ => Err(FromSqlError::InvalidType),
1239    }
1240}
1241
1242struct U128SqlWrapper(u128);
1243
1244impl ToSql for U128SqlWrapper {
1245    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1246        let string = self.0.to_string();
1247        Ok(rusqlite::types::ToSqlOutput::from(string))
1248    }
1249}
1250
1251impl FromSql for U128SqlWrapper {
1252    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1253        match value {
1254            ValueRef::Text(i) => {
1255                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1256                let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1257                Ok(U128SqlWrapper(integer))
1258            }
1259            _ => Err(FromSqlError::InvalidType),
1260        }
1261    }
1262}
1263
#[cfg(test)]
mod tests {

    use crate::SqliteStorage;

    /// Creates a temporary directory with the given name `prefix` and opens a
    /// `SqliteStorage` inside it.
    ///
    /// The `TempDir` handle is returned alongside the storage so each test keeps it in
    /// scope until the test body finishes.
    fn open_temp_storage(prefix: &str) -> (tempdir::TempDir, SqliteStorage) {
        let dir = tempdir::TempDir::new(prefix).unwrap();
        let storage = SqliteStorage::new(dir.path()).unwrap();
        (dir, storage)
    }

    #[tokio::test]
    async fn test_sqlite_storage() {
        let (_dir, storage) = open_temp_storage("sqlite_storage");

        crate::persist::tests::test_sqlite_storage(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_unclaimed_deposits_crud() {
        let (_dir, storage) = open_temp_storage("sqlite_storage_deposits");

        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_deposit_refunds() {
        let (_dir, storage) = open_temp_storage("sqlite_storage_refund_tx");

        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_type_filtering() {
        let (_dir, storage) = open_temp_storage("sqlite_storage_type_filter");

        crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_status_filtering() {
        let (_dir, storage) = open_temp_storage("sqlite_storage_status_filter");

        crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
    }

    // Runs the shared asset-filtering suite; the test name is kept as-is.
    #[tokio::test]
    async fn test_payment_details_filtering() {
        let (_dir, storage) = open_temp_storage("sqlite_storage_details_filter");

        crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_timestamp_filtering() {
        let (_dir, storage) = open_temp_storage("sqlite_storage_timestamp_filter");

        crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_combined_filters() {
        let (_dir, storage) = open_temp_storage("sqlite_storage_combined_filter");

        crate::persist::tests::test_combined_filters(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sort_order() {
        let (_dir, storage) = open_temp_storage("sqlite_storage_sort_order");

        crate::persist::tests::test_sort_order(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_request_metadata() {
        let (_dir, storage) = open_temp_storage("sqlite_storage_payment_request_metadata");

        crate::persist::tests::test_payment_request_metadata(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sync_storage() {
        let (_dir, storage) = open_temp_storage("sqlite_sync_storage");

        crate::persist::tests::test_sqlite_sync_storage(Box::new(storage)).await;
    }
}