breez_sdk_spark/persist/
sqlite.rs

1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5    Connection, Row, ToSql, Transaction, params,
6    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11    AssetFilter, DepositInfo, ListPaymentsRequest, LnurlPayInfo, LnurlReceiveMetadata,
12    LnurlWithdrawInfo, PaymentDetails, PaymentMethod,
13    error::DepositClaimError,
14    persist::{PaymentMetadata, SetLnurlMetadataItem, UpdateDepositPayload},
15    sync_storage::{
16        IncomingChange, OutgoingChange, Record, RecordChange, RecordId, SyncStorage,
17        SyncStorageError, UnversionedRecordChange,
18    },
19};
20
21use super::{Payment, Storage, StorageError};
22
23const DEFAULT_DB_FILENAME: &str = "storage.sql";
24/// SQLite-based storage implementation
25pub struct SqliteStorage {
26    db_dir: PathBuf,
27}
28
29impl SqliteStorage {
30    /// Creates a new `SQLite` storage
31    ///
32    /// # Arguments
33    ///
34    /// * `path` - Path to the `SQLite` database file
35    ///
36    /// # Returns
37    ///
38    /// A new `SqliteStorage` instance or an error
39    pub fn new(path: &Path) -> Result<Self, StorageError> {
40        let storage = Self {
41            db_dir: path.to_path_buf(),
42        };
43
44        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
45        std::fs::create_dir_all(path)
46            .map_err(|e| StorageError::InitializationError(e.to_string()))?;
47
48        storage.migrate()?;
49        Ok(storage)
50    }
51
52    pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
53        Ok(Connection::open(self.get_db_path())?)
54    }
55
56    fn get_db_path(&self) -> PathBuf {
57        self.db_dir.join(DEFAULT_DB_FILENAME)
58    }
59
60    fn migrate(&self) -> Result<(), StorageError> {
61        let migrations =
62            Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
63        let mut conn = self.get_connection()?;
64        let previous_version = match migrations.current_version(&conn)? {
65            SchemaVersion::Inside(previous_version) => previous_version.get(),
66            _ => 0,
67        };
68        migrations.to_latest(&mut conn)?;
69
70        if previous_version < 6 {
71            Self::migrate_lnurl_metadata_description(&mut conn)?;
72        }
73
74        Ok(())
75    }
76
77    fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
78        let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
79        let pay_infos: Vec<_> = stmt
80            .query_map([], |row| {
81                let payment_id: String = row.get(0)?;
82                let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
83                Ok((payment_id, lnurl_pay_info))
84            })?
85            .collect::<Result<_, _>>()?;
86        let pay_infos = pay_infos
87            .into_iter()
88            .filter_map(|(payment_id, lnurl_pay_info)| {
89                let pay_info = lnurl_pay_info?;
90                let description = pay_info.extract_description()?;
91                Some((payment_id, description))
92            })
93            .collect::<Vec<_>>();
94
95        for pay_info in pay_infos {
96            conn.execute(
97                "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
98                params![pay_info.1, pay_info.0],
99            )?;
100        }
101
102        Ok(())
103    }
104
105    #[allow(clippy::too_many_lines)]
106    pub(crate) fn current_migrations() -> Vec<&'static str> {
107        vec![
108            "CREATE TABLE IF NOT EXISTS payments (
109              id TEXT PRIMARY KEY,
110              payment_type TEXT NOT NULL,
111              status TEXT NOT NULL,
112              amount INTEGER NOT NULL,
113              fees INTEGER NOT NULL,
114              timestamp INTEGER NOT NULL,
115              details TEXT,
116              method TEXT
117            );",
118            "CREATE TABLE IF NOT EXISTS settings (
119              key TEXT PRIMARY KEY,
120              value TEXT NOT NULL
121            );",
122            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
123              txid TEXT NOT NULL,
124              vout INTEGER NOT NULL,
125              amount_sats INTEGER,
126              claim_error TEXT,
127              refund_tx TEXT,
128              refund_tx_id TEXT,
129              PRIMARY KEY (txid, vout)
130            );",
131            "CREATE TABLE IF NOT EXISTS payment_metadata (
132              payment_id TEXT PRIMARY KEY,
133              lnurl_pay_info TEXT
134            );",
135            "CREATE TABLE IF NOT EXISTS deposit_refunds (
136              deposit_tx_id TEXT NOT NULL,
137              deposit_vout INTEGER NOT NULL,
138              refund_tx TEXT NOT NULL,
139              refund_tx_id TEXT NOT NULL,
140              PRIMARY KEY (deposit_tx_id, deposit_vout)              
141            );",
142            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
143            "
144            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
145            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
146            ALTER TABLE payments ADD COLUMN spark INTEGER;
147            CREATE TABLE payment_details_lightning (
148              payment_id TEXT PRIMARY KEY,
149              invoice TEXT NOT NULL,
150              payment_hash TEXT NOT NULL,
151              destination_pubkey TEXT NOT NULL,
152              description TEXT,
153              preimage TEXT,
154              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
155            );
156            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
157            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'), 
158                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'), 
159                json_extract(details, '$.Lightning.preimage') 
160            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;
161
162            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
163            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;
164
165            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
166            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;
167
168            ALTER TABLE payments DROP COLUMN details;
169
170            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
171            ",
172            "CREATE TABLE payment_details_token (
173              payment_id TEXT PRIMARY KEY,
174              metadata TEXT NOT NULL,
175              tx_hash TEXT NOT NULL,
176              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
177            );",
178            // Migration to change payments amount and fees from INTEGER to TEXT
179            "CREATE TABLE payments_new (
180              id TEXT PRIMARY KEY,
181              payment_type TEXT NOT NULL,
182              status TEXT NOT NULL,
183              amount TEXT NOT NULL,
184              fees TEXT NOT NULL,
185              timestamp INTEGER NOT NULL,
186              method TEXT,
187              withdraw_tx_id TEXT,
188              deposit_tx_id TEXT,
189              spark INTEGER
190            );",
191            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
192             SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
193             FROM payments;",
194            "DROP TABLE payments;",
195            "ALTER TABLE payments_new RENAME TO payments;",
196            "CREATE TABLE payment_details_spark (
197              payment_id TEXT NOT NULL PRIMARY KEY,
198              invoice_details TEXT NOT NULL,
199              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
200            );
201            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
202            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
203            "CREATE TABLE sync_revision (
204                revision INTEGER NOT NULL DEFAULT 0
205            );
206            INSERT INTO sync_revision (revision) VALUES (0);
207            CREATE TABLE sync_outgoing(
208                record_type TEXT NOT NULL,
209                data_id TEXT NOT NULL,
210                schema_version TEXT NOT NULL,
211                commit_time INTEGER NOT NULL,
212                updated_fields_json TEXT NOT NULL,
213                revision INTEGER NOT NULL
214            );
215            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
216            CREATE TABLE sync_state(
217                record_type TEXT NOT NULL,
218                data_id TEXT NOT NULL,
219                schema_version TEXT NOT NULL,
220                commit_time INTEGER NOT NULL,
221                data TEXT NOT NULL,
222                revision INTEGER NOT NULL,
223                PRIMARY KEY(record_type, data_id)
224            );",
225            "CREATE TABLE sync_incoming(
226                record_type TEXT NOT NULL,
227                data_id TEXT NOT NULL,
228                schema_version TEXT NOT NULL,
229                commit_time INTEGER NOT NULL,
230                data TEXT NOT NULL,
231                revision INTEGER NOT NULL,
232                PRIMARY KEY(record_type, data_id, revision)
233            );
234            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
235            "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
236            CREATE TABLE payment_details_spark (
237              payment_id TEXT NOT NULL PRIMARY KEY,
238              invoice_details TEXT,
239              htlc_details TEXT,
240              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
241            );
242            INSERT INTO payment_details_spark (payment_id, invoice_details)
243             SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
244            DROP TABLE tmp_payment_details_spark;",
245            "CREATE TABLE lnurl_receive_metadata (
246                payment_hash TEXT NOT NULL PRIMARY KEY,
247                nostr_zap_request TEXT,
248                nostr_zap_receipt TEXT,
249                sender_comment TEXT
250            );",
251            // Delete all unclaimed deposits to clear old claim_error JSON format.
252            // Deposits will be recovered on next sync.
253            "DELETE FROM unclaimed_deposits;",
254        ]
255    }
256}
257
258impl From<rusqlite::Error> for StorageError {
259    fn from(value: rusqlite::Error) -> Self {
260        StorageError::Implementation(value.to_string())
261    }
262}
263
264impl From<rusqlite_migration::Error> for StorageError {
265    fn from(value: rusqlite_migration::Error) -> Self {
266        StorageError::Implementation(value.to_string())
267    }
268}
269
270#[async_trait]
271impl Storage for SqliteStorage {
272    #[allow(clippy::too_many_lines)]
273    async fn list_payments(
274        &self,
275        request: ListPaymentsRequest,
276    ) -> Result<Vec<Payment>, StorageError> {
277        let connection = self.get_connection()?;
278
279        // Build WHERE clauses based on filters
280        let mut where_clauses = Vec::new();
281        let mut params: Vec<Box<dyn ToSql>> = Vec::new();
282
283        // Filter by payment type
284        if let Some(ref type_filter) = request.type_filter
285            && !type_filter.is_empty()
286        {
287            let placeholders = type_filter
288                .iter()
289                .map(|_| "?")
290                .collect::<Vec<_>>()
291                .join(", ");
292            where_clauses.push(format!("p.payment_type IN ({placeholders})"));
293            for payment_type in type_filter {
294                params.push(Box::new(payment_type.to_string()));
295            }
296        }
297
298        // Filter by status
299        if let Some(ref status_filter) = request.status_filter
300            && !status_filter.is_empty()
301        {
302            let placeholders = status_filter
303                .iter()
304                .map(|_| "?")
305                .collect::<Vec<_>>()
306                .join(", ");
307            where_clauses.push(format!("p.status IN ({placeholders})"));
308            for status in status_filter {
309                params.push(Box::new(status.to_string()));
310            }
311        }
312
313        // Filter by timestamp range
314        if let Some(from_timestamp) = request.from_timestamp {
315            where_clauses.push("p.timestamp >= ?".to_string());
316            params.push(Box::new(from_timestamp));
317        }
318
319        if let Some(to_timestamp) = request.to_timestamp {
320            where_clauses.push("p.timestamp < ?".to_string());
321            params.push(Box::new(to_timestamp));
322        }
323
324        // Filter by asset
325        if let Some(ref asset_filter) = request.asset_filter {
326            match asset_filter {
327                AssetFilter::Bitcoin => {
328                    where_clauses.push("t.metadata IS NULL".to_string());
329                }
330                AssetFilter::Token { token_identifier } => {
331                    where_clauses.push("t.metadata IS NOT NULL".to_string());
332                    if let Some(identifier) = token_identifier {
333                        // Filter by specific token identifier
334                        where_clauses
335                            .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
336                        params.push(Box::new(identifier.clone()));
337                    }
338                }
339            }
340        }
341
342        // Filter by Spark HTLC status
343        if let Some(ref htlc_status_filter) = request.spark_htlc_status_filter
344            && !htlc_status_filter.is_empty()
345        {
346            let placeholders = htlc_status_filter
347                .iter()
348                .map(|_| "?")
349                .collect::<Vec<_>>()
350                .join(", ");
351            where_clauses.push(format!(
352                "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
353            ));
354            for htlc_status in htlc_status_filter {
355                params.push(Box::new(htlc_status.to_string()));
356            }
357        }
358
359        // Build the WHERE clause
360        let where_sql = if where_clauses.is_empty() {
361            String::new()
362        } else {
363            format!("WHERE {}", where_clauses.join(" AND "))
364        };
365
366        // Determine sort order
367        let order_direction = if request.sort_ascending.unwrap_or(false) {
368            "ASC"
369        } else {
370            "DESC"
371        };
372
373        let query = format!(
374            "SELECT p.id
375            ,       p.payment_type
376            ,       p.status
377            ,       p.amount
378            ,       p.fees
379            ,       p.timestamp
380            ,       p.method
381            ,       p.withdraw_tx_id
382            ,       p.deposit_tx_id
383            ,       p.spark
384            ,       l.invoice AS lightning_invoice
385            ,       l.payment_hash AS lightning_payment_hash
386            ,       l.destination_pubkey AS lightning_destination_pubkey
387            ,       COALESCE(l.description, pm.lnurl_description) AS lightning_description
388            ,       l.preimage AS lightning_preimage
389            ,       pm.lnurl_pay_info
390            ,       pm.lnurl_withdraw_info
391            ,       t.metadata AS token_metadata
392            ,       t.tx_hash AS token_tx_hash
393            ,       t.invoice_details AS token_invoice_details
394            ,       s.invoice_details AS spark_invoice_details
395            ,       s.htlc_details AS spark_htlc_details
396            ,       lrm.nostr_zap_request AS lnurl_nostr_zap_request
397            ,       lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
398            ,       lrm.sender_comment AS lnurl_sender_comment
399             FROM payments p
400             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
401             LEFT JOIN payment_details_token t ON p.id = t.payment_id
402             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
403             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
404             LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
405             {}
406             ORDER BY p.timestamp {} 
407             LIMIT {} OFFSET {}",
408            where_sql,
409            order_direction,
410            request.limit.unwrap_or(u32::MAX),
411            request.offset.unwrap_or(0)
412        );
413
414        let mut stmt = connection.prepare(&query)?;
415        let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
416        let payments = stmt
417            .query_map(param_refs.as_slice(), map_payment)?
418            .collect::<Result<Vec<_>, _>>()?;
419        Ok(payments)
420    }
421
422    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
423        let mut connection = self.get_connection()?;
424        let tx = connection.transaction()?;
425        tx.execute(
426            "INSERT OR REPLACE INTO payments (id, payment_type, status, amount, fees, timestamp, method) 
427             VALUES (?, ?, ?, ?, ?, ?, ?)",
428            params![
429                payment.id,
430                payment.payment_type.to_string(),
431                payment.status.to_string(),
432                U128SqlWrapper(payment.amount),
433                U128SqlWrapper(payment.fees),
434                payment.timestamp,
435                payment.method,
436            ],
437        )?;
438
439        match payment.details {
440            Some(PaymentDetails::Withdraw { tx_id }) => {
441                tx.execute(
442                    "UPDATE payments SET withdraw_tx_id = ? WHERE id = ?",
443                    params![tx_id, payment.id],
444                )?;
445            }
446            Some(PaymentDetails::Deposit { tx_id }) => {
447                tx.execute(
448                    "UPDATE payments SET deposit_tx_id = ? WHERE id = ?",
449                    params![tx_id, payment.id],
450                )?;
451            }
452            Some(PaymentDetails::Spark {
453                invoice_details,
454                htlc_details,
455            }) => {
456                tx.execute(
457                    "UPDATE payments SET spark = 1 WHERE id = ?",
458                    params![payment.id],
459                )?;
460                if let Some(invoice_details) = invoice_details {
461                    tx.execute("INSERT OR REPLACE INTO payment_details_spark (payment_id, invoice_details) VALUES (?, ?)",
462                        params![payment.id, serde_json::to_string(&invoice_details)?],
463                    )?;
464                }
465                if let Some(htlc_details) = htlc_details {
466                    tx.execute("INSERT OR REPLACE INTO payment_details_spark (payment_id, htlc_details) VALUES (?, ?)",
467                        params![payment.id, serde_json::to_string(&htlc_details)?],
468                    )?;
469                }
470            }
471            Some(PaymentDetails::Token {
472                metadata,
473                tx_hash,
474                invoice_details,
475            }) => {
476                tx.execute(
477                    "INSERT OR REPLACE INTO payment_details_token (payment_id, metadata, tx_hash, invoice_details) VALUES (?, ?, ?, ?)",
478                    params![payment.id, serde_json::to_string(&metadata)?, tx_hash, invoice_details.map(|d| serde_json::to_string(&d)).transpose()?],
479                )?;
480            }
481            Some(PaymentDetails::Lightning {
482                invoice,
483                payment_hash,
484                destination_pubkey,
485                description,
486                preimage,
487                ..
488            }) => {
489                tx.execute(
490                    "INSERT OR REPLACE INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage) 
491                     VALUES (?, ?, ?, ?, ?, ?)",
492                    params![
493                        payment.id,
494                        invoice,
495                        payment_hash,
496                        destination_pubkey,
497                        description,
498                        preimage,
499                    ],
500                )?;
501            }
502            None => {}
503        }
504
505        tx.commit()?;
506        Ok(())
507    }
508
509    async fn set_payment_metadata(
510        &self,
511        payment_id: String,
512        metadata: PaymentMetadata,
513    ) -> Result<(), StorageError> {
514        let connection = self.get_connection()?;
515
516        connection.execute(
517            "INSERT OR REPLACE INTO payment_metadata (payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description) VALUES (?, ?, ?, ?)",
518            params![payment_id, metadata.lnurl_pay_info, metadata.lnurl_withdraw_info, metadata.lnurl_description],
519        )?;
520
521        Ok(())
522    }
523
524    async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
525        let connection = self.get_connection()?;
526
527        connection.execute(
528            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
529            params![key, value],
530        )?;
531
532        Ok(())
533    }
534
535    async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
536        let connection = self.get_connection()?;
537
538        let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
539
540        let result = stmt.query_row(params![key], |row| {
541            let value_str: String = row.get(0)?;
542            Ok(value_str)
543        });
544
545        match result {
546            Ok(value) => Ok(Some(value)),
547            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
548            Err(e) => Err(e.into()),
549        }
550    }
551
552    async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
553        let connection = self.get_connection()?;
554
555        connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
556
557        Ok(())
558    }
559
560    async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
561        let connection = self.get_connection()?;
562
563        let mut stmt = connection.prepare(
564            "SELECT p.id
565            ,       p.payment_type
566            ,       p.status
567            ,       p.amount
568            ,       p.fees
569            ,       p.timestamp
570            ,       p.method
571            ,       p.withdraw_tx_id
572            ,       p.deposit_tx_id
573            ,       p.spark
574            ,       l.invoice AS lightning_invoice
575            ,       l.payment_hash AS lightning_payment_hash
576            ,       l.destination_pubkey AS lightning_destination_pubkey
577            ,       COALESCE(l.description, pm.lnurl_description) AS lightning_description
578            ,       l.preimage AS lightning_preimage
579            ,       pm.lnurl_pay_info
580            ,       pm.lnurl_withdraw_info
581            ,       t.metadata AS token_metadata
582            ,       t.tx_hash AS token_tx_hash
583            ,       t.invoice_details AS token_invoice_details
584            ,       s.invoice_details AS spark_invoice_details
585            ,       s.htlc_details AS spark_htlc_details
586            ,       lrm.nostr_zap_request AS lnurl_nostr_zap_request
587            ,       lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
588            ,       lrm.sender_comment AS lnurl_sender_comment
589             FROM payments p
590             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
591             LEFT JOIN payment_details_token t ON p.id = t.payment_id
592             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
593             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
594             LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
595             WHERE p.id = ?",
596        )?;
597
598        let payment = stmt.query_row(params![id], map_payment)?;
599        Ok(payment)
600    }
601
602    async fn get_payment_by_invoice(
603        &self,
604        invoice: String,
605    ) -> Result<Option<Payment>, StorageError> {
606        let connection = self.get_connection()?;
607
608        let mut stmt = connection.prepare(
609            "SELECT p.id
610            ,       p.payment_type
611            ,       p.status
612            ,       p.amount
613            ,       p.fees
614            ,       p.timestamp
615            ,       p.method
616            ,       p.withdraw_tx_id
617            ,       p.deposit_tx_id
618            ,       p.spark
619            ,       l.invoice AS lightning_invoice
620            ,       l.payment_hash AS lightning_payment_hash
621            ,       l.destination_pubkey AS lightning_destination_pubkey
622            ,       COALESCE(l.description, pm.lnurl_description) AS lightning_description
623            ,       l.preimage AS lightning_preimage
624            ,       pm.lnurl_pay_info
625            ,       pm.lnurl_withdraw_info
626            ,       t.metadata AS token_metadata
627            ,       t.tx_hash AS token_tx_hash
628            ,       t.invoice_details AS token_invoice_details
629            ,       s.invoice_details AS spark_invoice_details
630            ,       s.htlc_details AS spark_htlc_details
631            ,       lrm.nostr_zap_request AS lnurl_nostr_zap_request
632            ,       lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
633            ,       lrm.sender_comment AS lnurl_sender_comment
634             FROM payments p
635             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
636             LEFT JOIN payment_details_token t ON p.id = t.payment_id
637             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
638             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
639             LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
640             WHERE l.invoice = ?",
641        )?;
642
643        let payment = stmt.query_row(params![invoice], map_payment);
644        match payment {
645            Ok(payment) => Ok(Some(payment)),
646            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
647            Err(e) => Err(e.into()),
648        }
649    }
650
651    async fn add_deposit(
652        &self,
653        txid: String,
654        vout: u32,
655        amount_sats: u64,
656    ) -> Result<(), StorageError> {
657        let connection = self.get_connection()?;
658        connection.execute(
659            "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats) 
660             VALUES (?, ?, ?)",
661            params![txid, vout, amount_sats,],
662        )?;
663        Ok(())
664    }
665
666    async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
667        let connection = self.get_connection()?;
668        connection.execute(
669            "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
670            params![txid, vout],
671        )?;
672        Ok(())
673    }
674
675    async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
676        let connection = self.get_connection()?;
677        let mut stmt =
678            connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
679        let rows = stmt.query_map(params![], |row| {
680            Ok(DepositInfo {
681                txid: row.get(0)?,
682                vout: row.get(1)?,
683                amount_sats: row.get(2)?,
684                claim_error: row.get(3)?,
685                refund_tx: row.get(4)?,
686                refund_tx_id: row.get(5)?,
687            })
688        })?;
689        let mut deposits = Vec::new();
690        for row in rows {
691            deposits.push(row?);
692        }
693        Ok(deposits)
694    }
695
696    async fn update_deposit(
697        &self,
698        txid: String,
699        vout: u32,
700        payload: UpdateDepositPayload,
701    ) -> Result<(), StorageError> {
702        let connection = self.get_connection()?;
703        match payload {
704            UpdateDepositPayload::ClaimError { error } => {
705                connection.execute(
706                    "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
707                    params![error, txid, vout],
708                )?;
709            }
710            UpdateDepositPayload::Refund {
711                refund_txid,
712                refund_tx,
713            } => {
714                connection.execute(
715                    "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
716                    params![refund_tx, refund_txid, txid, vout],
717                )?;
718            }
719        }
720        Ok(())
721    }
722
723    async fn set_lnurl_metadata(
724        &self,
725        metadata: Vec<SetLnurlMetadataItem>,
726    ) -> Result<(), StorageError> {
727        let connection = self.get_connection()?;
728        for metadata in metadata {
729            connection.execute(
730                "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment)
731                 VALUES (?, ?, ?, ?)",
732                params![
733                    metadata.payment_hash,
734                    metadata.nostr_zap_request,
735                    metadata.nostr_zap_receipt,
736                    metadata.sender_comment,
737                ],
738            )?;
739        }
740        Ok(())
741    }
742}
743
744/// Bumps the revision number, locking the revision number for updates for the duration of the transaction.
745fn get_next_revision(tx: &Transaction<'_>) -> Result<u64, SyncStorageError> {
746    let revision = tx
747        .query_row(
748            "UPDATE sync_revision
749            SET revision = revision + 1
750            RETURNING revision",
751            [],
752            |row| row.get(0),
753        )
754        .map_err(map_sqlite_error)?;
755    Ok(revision)
756}
757
758impl From<StorageError> for SyncStorageError {
759    fn from(value: StorageError) -> Self {
760        match value {
761            StorageError::Implementation(s) => SyncStorageError::Implementation(s),
762            StorageError::InitializationError(s) => SyncStorageError::InitializationError(s),
763            StorageError::Serialization(s) => SyncStorageError::Serialization(s),
764        }
765    }
766}
767
768#[macros::async_trait]
769impl SyncStorage for SqliteStorage {
770    async fn add_outgoing_change(
771        &self,
772        record: UnversionedRecordChange,
773    ) -> Result<u64, SyncStorageError> {
774        let mut connection = self.get_connection()?;
775        let tx = connection.transaction().map_err(map_sqlite_error)?;
776        let revision = get_next_revision(&tx)?;
777
778        tx.execute(
779            "INSERT INTO sync_outgoing (
780                record_type
781            ,   data_id
782            ,   schema_version
783            ,   commit_time
784            ,   updated_fields_json
785            ,   revision
786            )
787             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
788            params![
789                record.id.r#type,
790                record.id.data_id,
791                record.schema_version.clone(),
792                serde_json::to_string(&record.updated_fields)?,
793                revision,
794            ],
795        )
796        .map_err(map_sqlite_error)?;
797
798        tx.commit().map_err(map_sqlite_error)?;
799        Ok(revision)
800    }
801
802    async fn complete_outgoing_sync(&self, record: Record) -> Result<(), SyncStorageError> {
803        let mut connection = self.get_connection()?;
804        let tx = connection.transaction().map_err(map_sqlite_error)?;
805
806        tx.execute(
807            "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
808            params![record.id.r#type, record.id.data_id, record.revision],
809        )
810        .map_err(map_sqlite_error)?;
811
812        tx.execute(
813            "INSERT OR REPLACE INTO sync_state (
814                record_type
815            ,   data_id
816            ,   schema_version
817            ,   commit_time
818            ,   data
819            ,   revision
820            )
821             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
822            params![
823                record.id.r#type,
824                record.id.data_id,
825                record.schema_version.clone(),
826                serde_json::to_string(&record.data)?,
827                record.revision,
828            ],
829        )
830        .map_err(map_sqlite_error)?;
831
832        tx.commit().map_err(map_sqlite_error)?;
833        Ok(())
834    }
835
836    async fn get_pending_outgoing_changes(
837        &self,
838        limit: u32,
839    ) -> Result<Vec<OutgoingChange>, SyncStorageError> {
840        let connection = self.get_connection()?;
841
842        let mut stmt = connection
843            .prepare(
844                "SELECT o.record_type
845            ,       o.data_id
846            ,       o.schema_version
847            ,       o.commit_time
848            ,       o.updated_fields_json
849            ,       o.revision
850            ,       e.schema_version AS existing_schema_version
851            ,       e.commit_time AS existing_commit_time
852            ,       e.data AS existing_data
853            ,       e.revision AS existing_revision
854             FROM sync_outgoing o
855             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
856             ORDER BY o.revision ASC
857             LIMIT ?",
858            )
859            .map_err(map_sqlite_error)?;
860        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
861        let mut results = Vec::new();
862        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
863            let parent = if let Some(existing_data) =
864                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
865            {
866                Some(Record {
867                    id: RecordId::new(
868                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
869                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
870                    ),
871                    schema_version: row.get(6).map_err(map_sqlite_error)?,
872                    revision: row.get(9).map_err(map_sqlite_error)?,
873                    data: serde_json::from_str(&existing_data)?,
874                })
875            } else {
876                None
877            };
878            let change = RecordChange {
879                id: RecordId::new(
880                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
881                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
882                ),
883                schema_version: row.get(2).map_err(map_sqlite_error)?,
884                updated_fields: serde_json::from_str(
885                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
886                )?,
887                revision: row.get(5).map_err(map_sqlite_error)?,
888            };
889            results.push(OutgoingChange { change, parent });
890        }
891
892        Ok(results)
893    }
894
895    async fn get_last_revision(&self) -> Result<u64, SyncStorageError> {
896        let connection = self.get_connection()?;
897
898        // Get the maximum revision from sync_state table
899        let mut stmt = connection
900            .prepare("SELECT COALESCE(MAX(revision), 0) FROM sync_state")
901            .map_err(map_sqlite_error)?;
902
903        let revision: u64 = stmt
904            .query_row([], |row| row.get(0))
905            .map_err(map_sqlite_error)?;
906
907        Ok(revision)
908    }
909
910    async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), SyncStorageError> {
911        if records.is_empty() {
912            return Ok(());
913        }
914
915        let mut connection = self.get_connection()?;
916        let tx = connection.transaction().map_err(map_sqlite_error)?;
917
918        for record in records {
919            tx.execute(
920                "INSERT OR REPLACE INTO sync_incoming (
921                    record_type
922                ,   data_id
923                ,   schema_version
924                ,   commit_time
925                ,   data
926                ,   revision
927                )
928                 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
929                params![
930                    record.id.r#type,
931                    record.id.data_id,
932                    record.schema_version.clone(),
933                    serde_json::to_string(&record.data)?,
934                    record.revision,
935                ],
936            )
937            .map_err(map_sqlite_error)?;
938        }
939
940        tx.commit().map_err(map_sqlite_error)?;
941        Ok(())
942    }
943
944    async fn delete_incoming_record(&self, record: Record) -> Result<(), SyncStorageError> {
945        let connection = self.get_connection()?;
946
947        connection
948            .execute(
949                "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
950                params![record.id.r#type, record.id.data_id, record.revision],
951            )
952            .map_err(map_sqlite_error)?;
953
954        Ok(())
955    }
956
957    async fn rebase_pending_outgoing_records(&self, revision: u64) -> Result<(), SyncStorageError> {
958        let mut connection = self.get_connection()?;
959        let tx = connection.transaction().map_err(map_sqlite_error)?;
960
961        let last_revision = tx
962            .query_row(
963                "SELECT COALESCE(MAX(revision), 0) FROM sync_state",
964                [],
965                |row| row.get(0),
966            )
967            .map_err(map_sqlite_error)?;
968
969        let diff = revision.saturating_sub(last_revision);
970
971        // Update all pending outgoing records to have revision numbers higher than the incoming record
972        tx.execute(
973            "UPDATE sync_outgoing 
974             SET revision = revision + ?",
975            params![diff],
976        )
977        .map_err(map_sqlite_error)?;
978
979        tx.commit().map_err(map_sqlite_error)?;
980        Ok(())
981    }
982
983    async fn get_incoming_records(
984        &self,
985        limit: u32,
986    ) -> Result<Vec<IncomingChange>, SyncStorageError> {
987        let connection = self.get_connection()?;
988
989        let mut stmt = connection
990            .prepare(
991                "SELECT i.record_type
992            ,       i.data_id
993            ,       i.schema_version
994            ,       i.data
995            ,       i.revision
996            ,       e.schema_version AS existing_schema_version
997            ,       e.commit_time AS existing_commit_time
998            ,       e.data AS existing_data
999            ,       e.revision AS existing_revision
1000             FROM sync_incoming i
1001             LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
1002             ORDER BY i.revision ASC
1003             LIMIT ?",
1004            )
1005            .map_err(map_sqlite_error)?;
1006
1007        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
1008        let mut results = Vec::new();
1009
1010        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
1011            let parent = if let Some(existing_data) =
1012                row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
1013            {
1014                Some(Record {
1015                    id: RecordId::new(
1016                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
1017                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
1018                    ),
1019                    schema_version: row.get(5).map_err(map_sqlite_error)?,
1020                    revision: row.get(8).map_err(map_sqlite_error)?,
1021                    data: serde_json::from_str(&existing_data)?,
1022                })
1023            } else {
1024                None
1025            };
1026            let record = Record {
1027                id: RecordId::new(
1028                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
1029                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
1030                ),
1031                schema_version: row.get(2).map_err(map_sqlite_error)?,
1032                data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
1033                revision: row.get(4).map_err(map_sqlite_error)?,
1034            };
1035            results.push(IncomingChange {
1036                new_state: record,
1037                old_state: parent,
1038            });
1039        }
1040
1041        Ok(results)
1042    }
1043
1044    async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, SyncStorageError> {
1045        let connection = self.get_connection()?;
1046
1047        let mut stmt = connection
1048            .prepare(
1049                "SELECT o.record_type
1050            ,       o.data_id
1051            ,       o.schema_version
1052            ,       o.commit_time
1053            ,       o.updated_fields_json
1054            ,       o.revision
1055            ,       e.schema_version AS existing_schema_version
1056            ,       e.commit_time AS existing_commit_time
1057            ,       e.data AS existing_data
1058            ,       e.revision AS existing_revision
1059             FROM sync_outgoing o
1060             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
1061             ORDER BY o.revision DESC
1062             LIMIT 1",
1063            )
1064            .map_err(map_sqlite_error)?;
1065
1066        let mut rows = stmt.query([]).map_err(map_sqlite_error)?;
1067
1068        if let Some(row) = rows.next().map_err(map_sqlite_error)? {
1069            let parent = if let Some(existing_data) =
1070                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
1071            {
1072                Some(Record {
1073                    id: RecordId::new(
1074                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
1075                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
1076                    ),
1077                    schema_version: row.get(6).map_err(map_sqlite_error)?,
1078                    revision: row.get(9).map_err(map_sqlite_error)?,
1079                    data: serde_json::from_str(&existing_data)?,
1080                })
1081            } else {
1082                None
1083            };
1084            let change = RecordChange {
1085                id: RecordId::new(
1086                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
1087                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
1088                ),
1089                schema_version: row.get(2).map_err(map_sqlite_error)?,
1090                updated_fields: serde_json::from_str(
1091                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1092                )?,
1093                revision: row.get(5).map_err(map_sqlite_error)?,
1094            };
1095
1096            return Ok(Some(OutgoingChange { change, parent }));
1097        }
1098
1099        Ok(None)
1100    }
1101
1102    async fn update_record_from_incoming(&self, record: Record) -> Result<(), SyncStorageError> {
1103        let connection = self.get_connection()?;
1104
1105        connection
1106            .execute(
1107                "INSERT OR REPLACE INTO sync_state (
1108                record_type
1109            ,   data_id
1110            ,   schema_version
1111            ,   commit_time
1112            ,   data
1113            ,   revision
1114            )
1115             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1116                params![
1117                    record.id.r#type,
1118                    record.id.data_id,
1119                    record.schema_version.clone(),
1120                    serde_json::to_string(&record.data)?,
1121                    record.revision,
1122                ],
1123            )
1124            .map_err(map_sqlite_error)?;
1125
1126        Ok(())
1127    }
1128}
1129
1130#[allow(clippy::needless_pass_by_value)]
1131fn map_sqlite_error(value: rusqlite::Error) -> SyncStorageError {
1132    SyncStorageError::Implementation(value.to_string())
1133}
1134
1135#[allow(clippy::too_many_lines)]
1136fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
1137    let withdraw_tx_id: Option<String> = row.get(7)?;
1138    let deposit_tx_id: Option<String> = row.get(8)?;
1139    let spark: Option<i32> = row.get(9)?;
1140    let lightning_invoice: Option<String> = row.get(10)?;
1141    let token_metadata: Option<String> = row.get(17)?;
1142    let details = match (
1143        lightning_invoice,
1144        withdraw_tx_id,
1145        deposit_tx_id,
1146        spark,
1147        token_metadata,
1148    ) {
1149        (Some(invoice), _, _, _, _) => {
1150            let payment_hash: String = row.get(11)?;
1151            let destination_pubkey: String = row.get(12)?;
1152            let description: Option<String> = row.get(13)?;
1153            let preimage: Option<String> = row.get(14)?;
1154            let lnurl_pay_info: Option<LnurlPayInfo> = row.get(15)?;
1155            let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(16)?;
1156            let lnurl_nostr_zap_request: Option<String> = row.get(22)?;
1157            let lnurl_nostr_zap_receipt: Option<String> = row.get(23)?;
1158            let lnurl_sender_comment: Option<String> = row.get(24)?;
1159            let lnurl_receive_metadata =
1160                if lnurl_nostr_zap_request.is_some() || lnurl_sender_comment.is_some() {
1161                    Some(LnurlReceiveMetadata {
1162                        nostr_zap_request: lnurl_nostr_zap_request,
1163                        nostr_zap_receipt: lnurl_nostr_zap_receipt,
1164                        sender_comment: lnurl_sender_comment,
1165                    })
1166                } else {
1167                    None
1168                };
1169            Some(PaymentDetails::Lightning {
1170                invoice,
1171                payment_hash,
1172                destination_pubkey,
1173                description,
1174                preimage,
1175                lnurl_pay_info,
1176                lnurl_withdraw_info,
1177                lnurl_receive_metadata,
1178            })
1179        }
1180        (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
1181        (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
1182        (_, _, _, Some(_), _) => {
1183            let invoice_details_str: Option<String> = row.get(20)?;
1184            let invoice_details = invoice_details_str
1185                .map(|s| {
1186                    serde_json::from_str(&s).map_err(|e| {
1187                        rusqlite::Error::FromSqlConversionFailure(
1188                            20,
1189                            rusqlite::types::Type::Text,
1190                            e.into(),
1191                        )
1192                    })
1193                })
1194                .transpose()?;
1195            let htlc_details_str: Option<String> = row.get(21)?;
1196            let htlc_details = htlc_details_str
1197                .map(|s| {
1198                    serde_json::from_str(&s).map_err(|e| {
1199                        rusqlite::Error::FromSqlConversionFailure(
1200                            21,
1201                            rusqlite::types::Type::Text,
1202                            e.into(),
1203                        )
1204                    })
1205                })
1206                .transpose()?;
1207            Some(PaymentDetails::Spark {
1208                invoice_details,
1209                htlc_details,
1210            })
1211        }
1212        (_, _, _, _, Some(metadata)) => {
1213            let invoice_details_str: Option<String> = row.get(19)?;
1214            let invoice_details = invoice_details_str
1215                .map(|s| {
1216                    serde_json::from_str(&s).map_err(|e| {
1217                        rusqlite::Error::FromSqlConversionFailure(
1218                            19,
1219                            rusqlite::types::Type::Text,
1220                            e.into(),
1221                        )
1222                    })
1223                })
1224                .transpose()?;
1225            Some(PaymentDetails::Token {
1226                metadata: serde_json::from_str(&metadata).map_err(|e| {
1227                    rusqlite::Error::FromSqlConversionFailure(
1228                        17,
1229                        rusqlite::types::Type::Text,
1230                        e.into(),
1231                    )
1232                })?,
1233                tx_hash: row.get(18)?,
1234                invoice_details,
1235            })
1236        }
1237        _ => None,
1238    };
1239    Ok(Payment {
1240        id: row.get(0)?,
1241        payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
1242            rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
1243        })?,
1244        status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
1245            rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
1246        })?,
1247        amount: row.get::<_, U128SqlWrapper>(3)?.0,
1248        fees: row.get::<_, U128SqlWrapper>(4)?.0,
1249        timestamp: row.get(5)?,
1250        details,
1251        method: row.get(6)?,
1252    })
1253}
1254
1255impl ToSql for PaymentDetails {
1256    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1257        to_sql_json(self)
1258    }
1259}
1260
1261impl FromSql for PaymentDetails {
1262    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1263        from_sql_json(value)
1264    }
1265}
1266
1267impl ToSql for PaymentMethod {
1268    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1269        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1270    }
1271}
1272
1273impl FromSql for PaymentMethod {
1274    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1275        match value {
1276            ValueRef::Text(i) => {
1277                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1278                // NOTE: trim_matches/to_lowercase is here, because this used to be serde_json serialized.
1279                let payment_method: PaymentMethod = s
1280                    .trim_matches('"')
1281                    .to_lowercase()
1282                    .parse()
1283                    .map_err(|()| FromSqlError::InvalidType)?;
1284                Ok(payment_method)
1285            }
1286            _ => Err(FromSqlError::InvalidType),
1287        }
1288    }
1289}
1290
1291impl ToSql for DepositClaimError {
1292    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1293        to_sql_json(self)
1294    }
1295}
1296
1297impl FromSql for DepositClaimError {
1298    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1299        from_sql_json(value)
1300    }
1301}
1302
1303impl ToSql for LnurlPayInfo {
1304    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1305        to_sql_json(self)
1306    }
1307}
1308
1309impl FromSql for LnurlPayInfo {
1310    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1311        from_sql_json(value)
1312    }
1313}
1314
1315impl ToSql for LnurlWithdrawInfo {
1316    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1317        to_sql_json(self)
1318    }
1319}
1320
1321impl FromSql for LnurlWithdrawInfo {
1322    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1323        from_sql_json(value)
1324    }
1325}
1326
1327fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1328where
1329    T: serde::Serialize,
1330{
1331    let json = serde_json::to_string(&value)
1332        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1333    Ok(rusqlite::types::ToSqlOutput::from(json))
1334}
1335
1336fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1337where
1338    T: serde::de::DeserializeOwned,
1339{
1340    match value {
1341        ValueRef::Text(i) => {
1342            let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1343            let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1344            Ok(deserialized)
1345        }
1346        _ => Err(FromSqlError::InvalidType),
1347    }
1348}
1349
1350struct U128SqlWrapper(u128);
1351
1352impl ToSql for U128SqlWrapper {
1353    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1354        let string = self.0.to_string();
1355        Ok(rusqlite::types::ToSqlOutput::from(string))
1356    }
1357}
1358
1359impl FromSql for U128SqlWrapper {
1360    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1361        match value {
1362            ValueRef::Text(i) => {
1363                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1364                let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1365                Ok(U128SqlWrapper(integer))
1366            }
1367            _ => Err(FromSqlError::InvalidType),
1368        }
1369    }
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374
1375    use crate::SqliteStorage;
1376
1377    #[tokio::test]
1378    async fn test_sqlite_storage() {
1379        let temp_dir = tempdir::TempDir::new("sqlite_storage").unwrap();
1380        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1381
1382        crate::persist::tests::test_sqlite_storage(Box::new(storage)).await;
1383    }
1384
1385    #[tokio::test]
1386    async fn test_unclaimed_deposits_crud() {
1387        let temp_dir = tempdir::TempDir::new("sqlite_storage_deposits").unwrap();
1388        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1389
1390        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
1391    }
1392
1393    #[tokio::test]
1394    async fn test_deposit_refunds() {
1395        let temp_dir = tempdir::TempDir::new("sqlite_storage_refund_tx").unwrap();
1396        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1397
1398        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
1399    }
1400
1401    #[tokio::test]
1402    async fn test_payment_type_filtering() {
1403        let temp_dir = tempdir::TempDir::new("sqlite_storage_type_filter").unwrap();
1404        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1405
1406        crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
1407    }
1408
1409    #[tokio::test]
1410    async fn test_payment_status_filtering() {
1411        let temp_dir = tempdir::TempDir::new("sqlite_storage_status_filter").unwrap();
1412        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1413
1414        crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
1415    }
1416
1417    #[tokio::test]
1418    async fn test_payment_details_filtering() {
1419        let temp_dir = tempdir::TempDir::new("sqlite_storage_details_filter").unwrap();
1420        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1421
1422        crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
1423    }
1424
1425    #[tokio::test]
1426    async fn test_timestamp_filtering() {
1427        let temp_dir = tempdir::TempDir::new("sqlite_storage_timestamp_filter").unwrap();
1428        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1429
1430        crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
1431    }
1432
1433    #[tokio::test]
1434    async fn test_spark_htlc_status_filtering() {
1435        let temp_dir = tempdir::TempDir::new("sqlite_storage_htlc_filter").unwrap();
1436        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1437
1438        crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
1439    }
1440
1441    #[tokio::test]
1442    async fn test_combined_filters() {
1443        let temp_dir = tempdir::TempDir::new("sqlite_storage_combined_filter").unwrap();
1444        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1445
1446        crate::persist::tests::test_combined_filters(Box::new(storage)).await;
1447    }
1448
1449    #[tokio::test]
1450    async fn test_sort_order() {
1451        let temp_dir = tempdir::TempDir::new("sqlite_storage_sort_order").unwrap();
1452        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1453
1454        crate::persist::tests::test_sort_order(Box::new(storage)).await;
1455    }
1456
1457    #[tokio::test]
1458    async fn test_payment_request_metadata() {
1459        let temp_dir = tempdir::TempDir::new("sqlite_storage_payment_request_metadata").unwrap();
1460        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1461
1462        crate::persist::tests::test_payment_request_metadata(Box::new(storage)).await;
1463    }
1464
1465    #[tokio::test]
1466    async fn test_sync_storage() {
1467        let temp_dir = tempdir::TempDir::new("sqlite_sync_storage").unwrap();
1468        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
1469
1470        crate::persist::tests::test_sqlite_sync_storage(Box::new(storage)).await;
1471    }
1472}