breez_sdk_spark/persist/sqlite.rs

use std::path::{Path, PathBuf};

use macros::async_trait;
use rusqlite::{
    Connection, Row, ToSql, Transaction, params,
    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
};
use rusqlite_migration::{M, Migrations, SchemaVersion};

use crate::{
    AssetFilter, ConversionInfo, DepositInfo, ListPaymentsRequest, LnurlPayInfo,
    LnurlReceiveMetadata, LnurlWithdrawInfo, PaymentDetails, PaymentDetailsFilter, PaymentMethod,
    error::DepositClaimError,
    persist::{PaymentMetadata, SetLnurlMetadataItem, UpdateDepositPayload},
    sync_storage::{
        IncomingChange, OutgoingChange, Record, RecordChange, RecordId, SyncStorage,
        SyncStorageError, UnversionedRecordChange,
    },
};

use super::{Payment, Storage, StorageError};

const DEFAULT_DB_FILENAME: &str = "storage.sql";
/// SQLite-based storage implementation
pub struct SqliteStorage {
    db_dir: PathBuf,
}

impl SqliteStorage {
    /// Creates a new `SQLite` storage
    ///
    /// # Arguments
    ///
    /// * `path` - Directory in which the `SQLite` database file is created
    ///
    /// # Returns
    ///
    /// A new `SqliteStorage` instance or an error
    pub fn new(path: &Path) -> Result<Self, StorageError> {
        let storage = Self {
            db_dir: path.to_path_buf(),
        };

        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        std::fs::create_dir_all(path)
            .map_err(|e| StorageError::InitializationError(e.to_string()))?;

        storage.migrate()?;
        Ok(storage)
    }

    pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
        Ok(Connection::open(self.get_db_path())?)
    }

    fn get_db_path(&self) -> PathBuf {
        self.db_dir.join(DEFAULT_DB_FILENAME)
    }

    fn migrate(&self) -> Result<(), StorageError> {
        let migrations =
            Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
        let mut conn = self.get_connection()?;
        let previous_version = match migrations.current_version(&conn)? {
            SchemaVersion::Inside(previous_version) => previous_version.get(),
            _ => 0,
        };
        migrations.to_latest(&mut conn)?;

        if previous_version < 6 {
            Self::migrate_lnurl_metadata_description(&mut conn)?;
        }

        Ok(())
    }

    fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
        let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
        let pay_infos: Vec<_> = stmt
            .query_map([], |row| {
                let payment_id: String = row.get(0)?;
                let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
                Ok((payment_id, lnurl_pay_info))
            })?
            .collect::<Result<_, _>>()?;
        let pay_infos = pay_infos
            .into_iter()
            .filter_map(|(payment_id, lnurl_pay_info)| {
                let pay_info = lnurl_pay_info?;
                let description = pay_info.extract_description()?;
                Some((payment_id, description))
            })
            .collect::<Vec<_>>();

        for pay_info in pay_infos {
            conn.execute(
                "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
                params![pay_info.1, pay_info.0],
            )?;
        }

        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    pub(crate) fn current_migrations() -> Vec<&'static str> {
        vec![
            "CREATE TABLE IF NOT EXISTS payments (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount INTEGER NOT NULL,
              fees INTEGER NOT NULL,
              timestamp INTEGER NOT NULL,
              details TEXT,
              method TEXT
            );",
            "CREATE TABLE IF NOT EXISTS settings (
              key TEXT PRIMARY KEY,
              value TEXT NOT NULL
            );",
            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
              txid TEXT NOT NULL,
              vout INTEGER NOT NULL,
              amount_sats INTEGER,
              claim_error TEXT,
              refund_tx TEXT,
              refund_tx_id TEXT,
              PRIMARY KEY (txid, vout)
            );",
            "CREATE TABLE IF NOT EXISTS payment_metadata (
              payment_id TEXT PRIMARY KEY,
              lnurl_pay_info TEXT
            );",
            "CREATE TABLE IF NOT EXISTS deposit_refunds (
              deposit_tx_id TEXT NOT NULL,
              deposit_vout INTEGER NOT NULL,
              refund_tx TEXT NOT NULL,
              refund_tx_id TEXT NOT NULL,
              PRIMARY KEY (deposit_tx_id, deposit_vout)
            );",
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
            "
            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN spark INTEGER;
            CREATE TABLE payment_details_lightning (
              payment_id TEXT PRIMARY KEY,
              invoice TEXT NOT NULL,
              payment_hash TEXT NOT NULL,
              destination_pubkey TEXT NOT NULL,
              description TEXT,
              preimage TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'),
                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'),
                json_extract(details, '$.Lightning.preimage')
            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

            ALTER TABLE payments DROP COLUMN details;

            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
            ",
            "CREATE TABLE payment_details_token (
              payment_id TEXT PRIMARY KEY,
              metadata TEXT NOT NULL,
              tx_hash TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );",
            // Migration to change payments amount and fees from INTEGER to TEXT
            "CREATE TABLE payments_new (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount TEXT NOT NULL,
              fees TEXT NOT NULL,
              timestamp INTEGER NOT NULL,
              method TEXT,
              withdraw_tx_id TEXT,
              deposit_tx_id TEXT,
              spark INTEGER
            );",
            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
             SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
             FROM payments;",
            "DROP TABLE payments;",
            "ALTER TABLE payments_new RENAME TO payments;",
            "CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
            "CREATE TABLE sync_revision (
                revision INTEGER NOT NULL DEFAULT 0
            );
            INSERT INTO sync_revision (revision) VALUES (0);
            CREATE TABLE sync_outgoing(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                updated_fields_json TEXT NOT NULL,
                revision INTEGER NOT NULL
            );
            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
            CREATE TABLE sync_state(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id)
            );",
            "CREATE TABLE sync_incoming(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id, revision)
            );
            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
            "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
            CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT,
              htlc_details TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_spark (payment_id, invoice_details)
             SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
            DROP TABLE tmp_payment_details_spark;",
            "CREATE TABLE lnurl_receive_metadata (
                payment_hash TEXT NOT NULL PRIMARY KEY,
                nostr_zap_request TEXT,
                nostr_zap_receipt TEXT,
                sender_comment TEXT
            );",
            // Delete all unclaimed deposits to clear old claim_error JSON format.
            // Deposits will be recovered on next sync.
            "DELETE FROM unclaimed_deposits;",
            // Clear all sync tables due to BreezSigner signature change.
            // This forces users to sync from scratch to the sync server.
            // Also delete the sync_initial_complete flag to force re-populating
            // all payment metadata for outgoing sync using the new key.
            "DELETE FROM sync_outgoing;
             DELETE FROM sync_incoming;
             DELETE FROM sync_state;
             UPDATE sync_revision SET revision = 0;
             DELETE FROM settings WHERE key = 'sync_initial_complete';",
            "ALTER TABLE payment_metadata ADD COLUMN token_conversion_info TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN parent_payment_id TEXT;",
            "
            ALTER TABLE payment_metadata DROP COLUMN token_conversion_info;
            ALTER TABLE payment_metadata ADD COLUMN conversion_info TEXT;
            ",
        ]
    }
}
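
// Example (not part of the original file): a minimal sketch of how the migration list above
// could be sanity-checked in isolation. It assumes `rusqlite_migration`'s `Migrations::validate`
// helper, which applies the "up" migrations against a temporary in-memory database.
#[cfg(test)]
mod migration_validation_example {
    use super::*;

    #[test]
    fn current_migrations_are_valid() {
        let migrations =
            Migrations::new(SqliteStorage::current_migrations().into_iter().map(M::up).collect());
        // Every migration script must apply cleanly, in order.
        assert!(migrations.validate().is_ok());
    }
}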

impl From<rusqlite::Error> for StorageError {
    fn from(value: rusqlite::Error) -> Self {
        StorageError::Implementation(value.to_string())
    }
}

impl From<rusqlite_migration::Error> for StorageError {
    fn from(value: rusqlite_migration::Error) -> Self {
        StorageError::Implementation(value.to_string())
    }
}

#[async_trait]
impl Storage for SqliteStorage {
    #[allow(clippy::too_many_lines)]
    async fn list_payments(
        &self,
        request: ListPaymentsRequest,
    ) -> Result<Vec<Payment>, StorageError> {
        let connection = self.get_connection()?;

        // Build WHERE clauses based on filters
        let mut where_clauses = Vec::new();
        let mut params: Vec<Box<dyn ToSql>> = Vec::new();

        // Filter by payment type
        if let Some(ref type_filter) = request.type_filter
            && !type_filter.is_empty()
        {
            let placeholders = type_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.payment_type IN ({placeholders})"));
            for payment_type in type_filter {
                params.push(Box::new(payment_type.to_string()));
            }
        }

        // Filter by status
        if let Some(ref status_filter) = request.status_filter
            && !status_filter.is_empty()
        {
            let placeholders = status_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.status IN ({placeholders})"));
            for status in status_filter {
                params.push(Box::new(status.to_string()));
            }
        }

        // Filter by timestamp range
        if let Some(from_timestamp) = request.from_timestamp {
            where_clauses.push("p.timestamp >= ?".to_string());
            params.push(Box::new(from_timestamp));
        }

        if let Some(to_timestamp) = request.to_timestamp {
            where_clauses.push("p.timestamp < ?".to_string());
            params.push(Box::new(to_timestamp));
        }

        // Filter by asset
        if let Some(ref asset_filter) = request.asset_filter {
            match asset_filter {
                AssetFilter::Bitcoin => {
                    where_clauses.push("t.metadata IS NULL".to_string());
                }
                AssetFilter::Token { token_identifier } => {
                    where_clauses.push("t.metadata IS NOT NULL".to_string());
                    if let Some(identifier) = token_identifier {
                        // Filter by specific token identifier
                        where_clauses
                            .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
                        params.push(Box::new(identifier.clone()));
                    }
                }
            }
        }

        // Filter by payment details. If any filter matches, we include the payment
        if let Some(ref payment_details_filter) = request.payment_details_filter {
            let mut all_payment_details_clauses = Vec::new();
            for payment_details_filter in payment_details_filter {
                let mut payment_details_clauses = Vec::new();
                // Filter by Spark HTLC status
                if let PaymentDetailsFilter::Spark {
                    htlc_status: Some(htlc_statuses),
                    ..
                } = payment_details_filter
                    && !htlc_statuses.is_empty()
                {
                    let placeholders = htlc_statuses
                        .iter()
                        .map(|_| "?")
                        .collect::<Vec<_>>()
                        .join(", ");
                    payment_details_clauses.push(format!(
                        "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
                    ));
                    for htlc_status in htlc_statuses {
                        params.push(Box::new(htlc_status.to_string()));
                    }
                }
                // Filter by conversion info presence
                if let PaymentDetailsFilter::Spark {
                    conversion_refund_needed: Some(conversion_refund_needed),
                    ..
                }
                | PaymentDetailsFilter::Token {
                    conversion_refund_needed: Some(conversion_refund_needed),
                    ..
                } = payment_details_filter
                {
                    let type_check = match payment_details_filter {
                        PaymentDetailsFilter::Spark { .. } => "p.spark = 1",
                        PaymentDetailsFilter::Token { .. } => "p.spark IS NULL",
                    };
                    let refund_needed = if *conversion_refund_needed {
                        "= 'RefundNeeded'"
                    } else {
                        "!= 'RefundNeeded'"
                    };
                    payment_details_clauses.push(format!(
                        "{type_check} AND pm.conversion_info IS NOT NULL AND
                         json_extract(pm.conversion_info, '$.status') {refund_needed}"
                    ));
                }
                // Filter by token transaction hash
                if let PaymentDetailsFilter::Token {
                    tx_hash: Some(tx_hash),
                    ..
                } = payment_details_filter
                {
                    payment_details_clauses.push("t.tx_hash = ?".to_string());
                    params.push(Box::new(tx_hash.clone()));
                }

                if !payment_details_clauses.is_empty() {
                    all_payment_details_clauses
                        .push(format!("({})", payment_details_clauses.join(" AND ")));
                }
            }

            if !all_payment_details_clauses.is_empty() {
                where_clauses.push(format!("({})", all_payment_details_clauses.join(" OR ")));
            }
        }

        // Build the WHERE clause
        let where_sql = if where_clauses.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", where_clauses.join(" AND "))
        };

        // Determine sort order
        let order_direction = if request.sort_ascending.unwrap_or(false) {
            "ASC"
        } else {
            "DESC"
        };

        let query = format!(
            "SELECT p.id
            ,       p.payment_type
            ,       p.status
            ,       p.amount
            ,       p.fees
            ,       p.timestamp
            ,       p.method
            ,       p.withdraw_tx_id
            ,       p.deposit_tx_id
            ,       p.spark
            ,       l.invoice AS lightning_invoice
            ,       l.payment_hash AS lightning_payment_hash
            ,       l.destination_pubkey AS lightning_destination_pubkey
            ,       COALESCE(l.description, pm.lnurl_description) AS lightning_description
            ,       l.preimage AS lightning_preimage
            ,       pm.lnurl_pay_info
            ,       pm.lnurl_withdraw_info
            ,       pm.conversion_info
            ,       t.metadata AS token_metadata
            ,       t.tx_hash AS token_tx_hash
            ,       t.invoice_details AS token_invoice_details
            ,       s.invoice_details AS spark_invoice_details
            ,       s.htlc_details AS spark_htlc_details
            ,       lrm.nostr_zap_request AS lnurl_nostr_zap_request
            ,       lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
            ,       lrm.sender_comment AS lnurl_sender_comment
             FROM payments p
             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
             LEFT JOIN payment_details_token t ON p.id = t.payment_id
             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
             LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
             {}
             ORDER BY p.timestamp {}
             LIMIT {} OFFSET {}",
            where_sql,
            order_direction,
            request.limit.unwrap_or(u32::MAX),
            request.offset.unwrap_or(0)
        );

        let mut stmt = connection.prepare(&query)?;
        let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
        let payments = stmt
            .query_map(param_refs.as_slice(), map_payment)?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(payments)
    }

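    // Note on the upserts below: detail columns that may legitimately be missing on a later
    // re-insert (`preimage`, `invoice_details`, `htlc_details`) are written with
    // COALESCE(excluded.x, existing.x) so that updating a payment with partial data never
    // erases values that were persisted earlier.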
    #[allow(clippy::too_many_lines)]
    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction()?;
        tx.execute(
            "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             ON CONFLICT(id) DO UPDATE SET
                payment_type=excluded.payment_type,
                status=excluded.status,
                amount=excluded.amount,
                fees=excluded.fees,
                timestamp=excluded.timestamp,
                method=excluded.method",
            params![
                payment.id,
                payment.payment_type.to_string(),
                payment.status.to_string(),
                U128SqlWrapper(payment.amount),
                U128SqlWrapper(payment.fees),
                payment.timestamp,
                payment.method,
            ],
        )?;

        match payment.details {
            Some(PaymentDetails::Withdraw { tx_id }) => {
                tx.execute(
                    "UPDATE payments SET withdraw_tx_id = ? WHERE id = ?",
                    params![tx_id, payment.id],
                )?;
            }
            Some(PaymentDetails::Deposit { tx_id }) => {
                tx.execute(
                    "UPDATE payments SET deposit_tx_id = ? WHERE id = ?",
                    params![tx_id, payment.id],
                )?;
            }
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
                ..
            }) => {
                tx.execute(
                    "UPDATE payments SET spark = 1 WHERE id = ?",
                    params![payment.id],
                )?;
                if invoice_details.is_some() || htlc_details.is_some() {
                    // Upsert both details together and avoid overwriting existing data with NULLs
                    tx.execute(
                        "INSERT INTO payment_details_spark (payment_id, invoice_details, htlc_details)
                         VALUES (?, ?, ?)
                         ON CONFLICT(payment_id) DO UPDATE SET
                            invoice_details=COALESCE(excluded.invoice_details, payment_details_spark.invoice_details),
                            htlc_details=COALESCE(excluded.htlc_details, payment_details_spark.htlc_details)",
                        params![
                            payment.id,
                            invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                            htlc_details.as_ref().map(serde_json::to_string).transpose()?,
                        ],
                    )?;
                }
            }
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                invoice_details,
                ..
            }) => {
                tx.execute(
                    "INSERT INTO payment_details_token (payment_id, metadata, tx_hash, invoice_details)
                     VALUES (?, ?, ?, ?)
                     ON CONFLICT(payment_id) DO UPDATE SET
                        metadata=excluded.metadata,
                        tx_hash=excluded.tx_hash,
                        invoice_details=COALESCE(excluded.invoice_details, payment_details_token.invoice_details)",
                    params![
                        payment.id,
                        serde_json::to_string(&metadata)?,
                        tx_hash,
                        invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                    ],
                )?;
            }
            Some(PaymentDetails::Lightning {
                invoice,
                payment_hash,
                destination_pubkey,
                description,
                preimage,
                ..
            }) => {
                tx.execute(
                    "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
                     VALUES (?, ?, ?, ?, ?, ?)
                     ON CONFLICT(payment_id) DO UPDATE SET
                        invoice=excluded.invoice,
                        payment_hash=excluded.payment_hash,
                        destination_pubkey=excluded.destination_pubkey,
                        description=excluded.description,
                        preimage=COALESCE(excluded.preimage, payment_details_lightning.preimage)",
                    params![
                        payment.id,
                        invoice,
                        payment_hash,
                        destination_pubkey,
                        description,
                        preimage,
                    ],
                )?;
            }
            None => {}
        }

        tx.commit()?;
        Ok(())
    }

    async fn set_payment_metadata(
        &self,
        payment_id: String,
        metadata: PaymentMetadata,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;

        connection.execute(
            "INSERT OR REPLACE INTO payment_metadata (payment_id, parent_payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description, conversion_info)
             VALUES (?, ?, ?, ?, ?, ?)",
            params![
                payment_id,
                metadata.parent_payment_id,
                metadata.lnurl_pay_info,
                metadata.lnurl_withdraw_info,
                metadata.lnurl_description,
                metadata.conversion_info.as_ref().map(serde_json::to_string).transpose()?,
            ],
        )?;

        Ok(())
    }

    async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
        let connection = self.get_connection()?;

        connection.execute(
            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
            params![key, value],
        )?;

        Ok(())
    }

    async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;

        let result = stmt.query_row(params![key], |row| {
            let value_str: String = row.get(0)?;
            Ok(value_str)
        });

        match result {
            Ok(value) => Ok(Some(value)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
        let connection = self.get_connection()?;

        connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;

        Ok(())
    }

    async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection.prepare(
            "SELECT p.id
            ,       p.payment_type
            ,       p.status
            ,       p.amount
            ,       p.fees
            ,       p.timestamp
            ,       p.method
            ,       p.withdraw_tx_id
            ,       p.deposit_tx_id
            ,       p.spark
            ,       l.invoice AS lightning_invoice
            ,       l.payment_hash AS lightning_payment_hash
            ,       l.destination_pubkey AS lightning_destination_pubkey
            ,       COALESCE(l.description, pm.lnurl_description) AS lightning_description
            ,       l.preimage AS lightning_preimage
            ,       pm.lnurl_pay_info
            ,       pm.lnurl_withdraw_info
            ,       pm.conversion_info
            ,       t.metadata AS token_metadata
            ,       t.tx_hash AS token_tx_hash
            ,       t.invoice_details AS token_invoice_details
            ,       s.invoice_details AS spark_invoice_details
            ,       s.htlc_details AS spark_htlc_details
            ,       lrm.nostr_zap_request AS lnurl_nostr_zap_request
            ,       lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
            ,       lrm.sender_comment AS lnurl_sender_comment
             FROM payments p
             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
             LEFT JOIN payment_details_token t ON p.id = t.payment_id
             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
             LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
             WHERE p.id = ?",
        )?;

        let payment = stmt.query_row(params![id], map_payment)?;
        Ok(payment)
    }

    async fn get_payment_by_invoice(
        &self,
        invoice: String,
    ) -> Result<Option<Payment>, StorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection.prepare(
            "SELECT p.id
            ,       p.payment_type
            ,       p.status
            ,       p.amount
            ,       p.fees
            ,       p.timestamp
            ,       p.method
            ,       p.withdraw_tx_id
            ,       p.deposit_tx_id
            ,       p.spark
            ,       l.invoice AS lightning_invoice
            ,       l.payment_hash AS lightning_payment_hash
            ,       l.destination_pubkey AS lightning_destination_pubkey
            ,       COALESCE(l.description, pm.lnurl_description) AS lightning_description
            ,       l.preimage AS lightning_preimage
            ,       pm.lnurl_pay_info
            ,       pm.lnurl_withdraw_info
            ,       pm.conversion_info
            ,       t.metadata AS token_metadata
            ,       t.tx_hash AS token_tx_hash
            ,       t.invoice_details AS token_invoice_details
            ,       s.invoice_details AS spark_invoice_details
            ,       s.htlc_details AS spark_htlc_details
            ,       lrm.nostr_zap_request AS lnurl_nostr_zap_request
            ,       lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
            ,       lrm.sender_comment AS lnurl_sender_comment
             FROM payments p
             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
             LEFT JOIN payment_details_token t ON p.id = t.payment_id
             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
             LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
             WHERE l.invoice = ?",
        )?;

        let payment = stmt.query_row(params![invoice], map_payment);
        match payment {
            Ok(payment) => Ok(Some(payment)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    async fn add_deposit(
        &self,
        txid: String,
        vout: u32,
        amount_sats: u64,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;
        connection.execute(
            "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats)
             VALUES (?, ?, ?)",
            params![txid, vout, amount_sats],
        )?;
        Ok(())
    }

    async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
        let connection = self.get_connection()?;
        connection.execute(
            "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
            params![txid, vout],
        )?;
        Ok(())
    }

    async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
        let connection = self.get_connection()?;
        let mut stmt =
            connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
        let rows = stmt.query_map(params![], |row| {
            Ok(DepositInfo {
                txid: row.get(0)?,
                vout: row.get(1)?,
                amount_sats: row.get(2)?,
                claim_error: row.get(3)?,
                refund_tx: row.get(4)?,
                refund_tx_id: row.get(5)?,
            })
        })?;
        let mut deposits = Vec::new();
        for row in rows {
            deposits.push(row?);
        }
        Ok(deposits)
    }

    async fn update_deposit(
        &self,
        txid: String,
        vout: u32,
        payload: UpdateDepositPayload,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;
        match payload {
            UpdateDepositPayload::ClaimError { error } => {
                connection.execute(
                    "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
                    params![error, txid, vout],
                )?;
            }
            UpdateDepositPayload::Refund {
                refund_txid,
                refund_tx,
            } => {
                connection.execute(
                    "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
                    params![refund_tx, refund_txid, txid, vout],
                )?;
            }
        }
        Ok(())
    }

    async fn set_lnurl_metadata(
        &self,
        metadata: Vec<SetLnurlMetadataItem>,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;
        for metadata in metadata {
            connection.execute(
                "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment)
                 VALUES (?, ?, ?, ?)",
                params![
                    metadata.payment_hash,
                    metadata.nostr_zap_request,
                    metadata.nostr_zap_receipt,
                    metadata.sender_comment,
                ],
            )?;
        }
        Ok(())
    }
}
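
// Example (sketch, not from the original sources): exercising the storage file layout and the
// `settings` table directly through `get_connection`, without depending on a specific async
// runtime. The temporary directory name is a placeholder chosen for illustration.
#[cfg(test)]
mod settings_table_example {
    use super::*;

    #[test]
    fn settings_round_trip() {
        let dir = std::env::temp_dir().join("breez_sdk_spark_sqlite_example");
        // `new` creates the directory (on non-wasm targets) and runs all migrations.
        let storage = SqliteStorage::new(&dir).expect("storage initializes and migrates");
        let conn = storage.get_connection().expect("connection opens");

        conn.execute(
            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
            params!["example_key", "example_value"],
        )
        .expect("insert succeeds");

        let value: String = conn
            .query_row(
                "SELECT value FROM settings WHERE key = ?",
                params!["example_key"],
                |row| row.get(0),
            )
            .expect("value is present");
        assert_eq!(value, "example_value");
    }
}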

/// Bumps the revision counter and returns the new value, locking it against concurrent updates
/// for the duration of the transaction.
fn get_next_revision(tx: &Transaction<'_>) -> Result<u64, SyncStorageError> {
    let revision = tx
        .query_row(
            "UPDATE sync_revision
            SET revision = revision + 1
            RETURNING revision",
            [],
            |row| row.get(0),
        )
        .map_err(map_sqlite_error)?;
    Ok(revision)
}
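
// Example (sketch, not from the original sources): the revision counter pattern above in
// isolation. It assumes the linked SQLite supports `UPDATE ... RETURNING` (SQLite 3.35+).
#[cfg(test)]
mod revision_counter_example {
    use super::*;

    #[test]
    fn revision_increments_per_call() {
        let mut conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE sync_revision (revision INTEGER NOT NULL DEFAULT 0);
             INSERT INTO sync_revision (revision) VALUES (0);",
        )
        .expect("schema created");

        let tx = conn.transaction().expect("transaction starts");
        // Each call bumps the single-row counter and returns the new value.
        assert_eq!(get_next_revision(&tx).expect("first bump"), 1);
        assert_eq!(get_next_revision(&tx).expect("second bump"), 2);
        tx.commit().expect("commit succeeds");
    }
}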

impl From<StorageError> for SyncStorageError {
    fn from(value: StorageError) -> Self {
        match value {
            StorageError::Implementation(s) => SyncStorageError::Implementation(s),
            StorageError::InitializationError(s) => SyncStorageError::InitializationError(s),
            StorageError::Serialization(s) => SyncStorageError::Serialization(s),
        }
    }
}

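// Overview of the sync tables used by the `SyncStorage` implementation below:
// - `sync_outgoing` queues local changes (`add_outgoing_change`) until they are pushed and
//   acknowledged (`complete_outgoing_sync`), at which point the merged record lands in
//   `sync_state`.
// - `sync_incoming` stages records pulled from the sync server (`insert_incoming_records`);
//   once processed they are applied to `sync_state` (`update_record_from_incoming`) and
//   removed (`delete_incoming_record`).
// - `sync_revision` is a single-row counter bumped inside a transaction so pending outgoing
//   changes get strictly increasing revisions; `rebase_pending_outgoing_records` shifts them
//   above newly received remote revisions.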
#[macros::async_trait]
impl SyncStorage for SqliteStorage {
    async fn add_outgoing_change(
        &self,
        record: UnversionedRecordChange,
    ) -> Result<u64, SyncStorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction().map_err(map_sqlite_error)?;
        let revision = get_next_revision(&tx)?;

        tx.execute(
            "INSERT INTO sync_outgoing (
                record_type
            ,   data_id
            ,   schema_version
            ,   commit_time
            ,   updated_fields_json
            ,   revision
            )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.updated_fields)?,
                revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(revision)
    }

    async fn complete_outgoing_sync(&self, record: Record) -> Result<(), SyncStorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction().map_err(map_sqlite_error)?;

        tx.execute(
            "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
            params![record.id.r#type, record.id.data_id, record.revision],
        )
        .map_err(map_sqlite_error)?;

        tx.execute(
            "INSERT OR REPLACE INTO sync_state (
                record_type
            ,   data_id
            ,   schema_version
            ,   commit_time
            ,   data
            ,   revision
            )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.data)?,
                record.revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }

    async fn get_pending_outgoing_changes(
        &self,
        limit: u32,
    ) -> Result<Vec<OutgoingChange>, SyncStorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
            ,       o.data_id
            ,       o.schema_version
            ,       o.commit_time
            ,       o.updated_fields_json
            ,       o.revision
            ,       e.schema_version AS existing_schema_version
            ,       e.commit_time AS existing_commit_time
            ,       e.data AS existing_data
            ,       e.revision AS existing_revision
             FROM sync_outgoing o
             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
             ORDER BY o.revision ASC
             LIMIT ?",
            )
            .map_err(map_sqlite_error)?;
        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();
        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                revision: row.get(5).map_err(map_sqlite_error)?,
            };
            results.push(OutgoingChange { change, parent });
        }

        Ok(results)
    }

    async fn get_last_revision(&self) -> Result<u64, SyncStorageError> {
        let connection = self.get_connection()?;

        // Get the maximum revision from sync_state table
        let mut stmt = connection
            .prepare("SELECT COALESCE(MAX(revision), 0) FROM sync_state")
            .map_err(map_sqlite_error)?;

        let revision: u64 = stmt
            .query_row([], |row| row.get(0))
            .map_err(map_sqlite_error)?;

        Ok(revision)
    }

    async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), SyncStorageError> {
        if records.is_empty() {
            return Ok(());
        }

        let mut connection = self.get_connection()?;
        let tx = connection.transaction().map_err(map_sqlite_error)?;

        for record in records {
            tx.execute(
                "INSERT OR REPLACE INTO sync_incoming (
                    record_type
                ,   data_id
                ,   schema_version
                ,   commit_time
                ,   data
                ,   revision
                )
                 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
                params![
                    record.id.r#type,
                    record.id.data_id,
                    record.schema_version.clone(),
                    serde_json::to_string(&record.data)?,
                    record.revision,
                ],
            )
            .map_err(map_sqlite_error)?;
        }

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }

    async fn delete_incoming_record(&self, record: Record) -> Result<(), SyncStorageError> {
        let connection = self.get_connection()?;

        connection
            .execute(
                "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
                params![record.id.r#type, record.id.data_id, record.revision],
            )
            .map_err(map_sqlite_error)?;

        Ok(())
    }

    async fn rebase_pending_outgoing_records(&self, revision: u64) -> Result<(), SyncStorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction().map_err(map_sqlite_error)?;

        let last_revision = tx
            .query_row(
                "SELECT COALESCE(MAX(revision), 0) FROM sync_state",
                [],
                |row| row.get(0),
            )
            .map_err(map_sqlite_error)?;

        let diff = revision.saturating_sub(last_revision);

        // Update all pending outgoing records to have revision numbers higher than the incoming record
        tx.execute(
            "UPDATE sync_outgoing
             SET revision = revision + ?",
            params![diff],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }

    async fn get_incoming_records(
        &self,
        limit: u32,
    ) -> Result<Vec<IncomingChange>, SyncStorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare(
                "SELECT i.record_type
            ,       i.data_id
            ,       i.schema_version
            ,       i.data
            ,       i.revision
            ,       e.schema_version AS existing_schema_version
            ,       e.commit_time AS existing_commit_time
            ,       e.data AS existing_data
            ,       e.revision AS existing_revision
             FROM sync_incoming i
             LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
             ORDER BY i.revision ASC
             LIMIT ?",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();

        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(5).map_err(map_sqlite_error)?,
                    revision: row.get(8).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let record = Record {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
                revision: row.get(4).map_err(map_sqlite_error)?,
            };
            results.push(IncomingChange {
                new_state: record,
                old_state: parent,
            });
        }

        Ok(results)
    }

    async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, SyncStorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
            ,       o.data_id
            ,       o.schema_version
            ,       o.commit_time
            ,       o.updated_fields_json
            ,       o.revision
            ,       e.schema_version AS existing_schema_version
            ,       e.commit_time AS existing_commit_time
            ,       e.data AS existing_data
            ,       e.revision AS existing_revision
             FROM sync_outgoing o
             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
             ORDER BY o.revision DESC
             LIMIT 1",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query([]).map_err(map_sqlite_error)?;

        if let Some(row) = rows.next().map_err(map_sqlite_error)? {
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                revision: row.get(5).map_err(map_sqlite_error)?,
            };

            return Ok(Some(OutgoingChange { change, parent }));
        }

        Ok(None)
    }

    async fn update_record_from_incoming(&self, record: Record) -> Result<(), SyncStorageError> {
        let connection = self.get_connection()?;

        connection
            .execute(
                "INSERT OR REPLACE INTO sync_state (
                record_type
            ,   data_id
            ,   schema_version
            ,   commit_time
            ,   data
            ,   revision
            )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
                params![
                    record.id.r#type,
                    record.id.data_id,
                    record.schema_version.clone(),
                    serde_json::to_string(&record.data)?,
                    record.revision,
                ],
            )
            .map_err(map_sqlite_error)?;

        Ok(())
    }
}

#[allow(clippy::needless_pass_by_value)]
fn map_sqlite_error(value: rusqlite::Error) -> SyncStorageError {
    SyncStorageError::Implementation(value.to_string())
}

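// `map_payment` relies on the fixed column order produced by the SELECT statements in
// `list_payments`, `get_payment_by_id` and `get_payment_by_invoice`:
//   0..=9   payments columns (id, payment_type, status, amount, fees, timestamp, method,
//           withdraw_tx_id, deposit_tx_id, spark)
//   10..=14 lightning details (invoice, payment_hash, destination_pubkey, description, preimage)
//   15..=17 payment metadata (lnurl_pay_info, lnurl_withdraw_info, conversion_info)
//   18..=20 token details (metadata, tx_hash, invoice_details)
//   21..=22 spark details (invoice_details, htlc_details)
//   23..=25 lnurl receive metadata (nostr_zap_request, nostr_zap_receipt, sender_comment)
// Changing either side requires updating the other.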
1244#[allow(clippy::too_many_lines)]
1245fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
1246    let withdraw_tx_id: Option<String> = row.get(7)?;
1247    let deposit_tx_id: Option<String> = row.get(8)?;
1248    let spark: Option<i32> = row.get(9)?;
1249    let lightning_invoice: Option<String> = row.get(10)?;
1250    let token_metadata: Option<String> = row.get(18)?;
1251    let details = match (
1252        lightning_invoice,
1253        withdraw_tx_id,
1254        deposit_tx_id,
1255        spark,
1256        token_metadata,
1257    ) {
1258        (Some(invoice), _, _, _, _) => {
1259            let payment_hash: String = row.get(11)?;
1260            let destination_pubkey: String = row.get(12)?;
1261            let description: Option<String> = row.get(13)?;
1262            let preimage: Option<String> = row.get(14)?;
1263            let lnurl_pay_info: Option<LnurlPayInfo> = row.get(15)?;
1264            let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(16)?;
1265            let lnurl_nostr_zap_request: Option<String> = row.get(23)?;
1266            let lnurl_nostr_zap_receipt: Option<String> = row.get(24)?;
1267            let lnurl_sender_comment: Option<String> = row.get(25)?;
1268            let lnurl_receive_metadata =
1269                if lnurl_nostr_zap_request.is_some() || lnurl_sender_comment.is_some() {
1270                    Some(LnurlReceiveMetadata {
1271                        nostr_zap_request: lnurl_nostr_zap_request,
1272                        nostr_zap_receipt: lnurl_nostr_zap_receipt,
1273                        sender_comment: lnurl_sender_comment,
1274                    })
1275                } else {
1276                    None
1277                };
1278            Some(PaymentDetails::Lightning {
1279                invoice,
1280                payment_hash,
1281                destination_pubkey,
1282                description,
1283                preimage,
1284                lnurl_pay_info,
1285                lnurl_withdraw_info,
1286                lnurl_receive_metadata,
1287            })
1288        }
1289        (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
1290        (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
1291        (_, _, _, Some(_), _) => {
1292            let invoice_details_str: Option<String> = row.get(21)?;
1293            let invoice_details = invoice_details_str
1294                .map(|s| serde_json_from_str(&s, 21))
1295                .transpose()?;
1296            let htlc_details_str: Option<String> = row.get(22)?;
1297            let htlc_details = htlc_details_str
1298                .map(|s| serde_json_from_str(&s, 22))
1299                .transpose()?;
1300            let conversion_info_str: Option<String> = row.get(17)?;
1301            let conversion_info: Option<ConversionInfo> = conversion_info_str
1302                .map(|s: String| serde_json_from_str(&s, 17))
1303                .transpose()?;
1304            Some(PaymentDetails::Spark {
1305                invoice_details,
1306                htlc_details,
1307                conversion_info,
1308            })
1309        }
1310        (_, _, _, _, Some(metadata)) => {
1311            let invoice_details_str: Option<String> = row.get(20)?;
1312            let invoice_details = invoice_details_str
1313                .map(|s| serde_json_from_str(&s, 20))
1314                .transpose()?;
1315            let conversion_info_str: Option<String> = row.get(17)?;
1316            let conversion_info: Option<ConversionInfo> = conversion_info_str
1317                .map(|s: String| serde_json_from_str(&s, 17))
1318                .transpose()?;
1319            Some(PaymentDetails::Token {
1320                metadata: serde_json_from_str(&metadata, 18)?,
1321                tx_hash: row.get(19)?,
1322                invoice_details,
1323                conversion_info,
1324            })
1325        }
1326        _ => None,
1327    };
1328    Ok(Payment {
1329        id: row.get(0)?,
1330        payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
1331            rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
1332        })?,
1333        status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
1334            rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
1335        })?,
1336        amount: row.get::<_, U128SqlWrapper>(3)?.0,
1337        fees: row.get::<_, U128SqlWrapper>(4)?.0,
1338        timestamp: row.get(5)?,
1339        details,
1340        method: row.get(6)?,
1341    })
1342}
1343
impl ToSql for PaymentDetails {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for PaymentDetails {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}

impl ToSql for PaymentMethod {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
    }
}

impl FromSql for PaymentMethod {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        match value {
            ValueRef::Text(i) => {
                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
                // NOTE: `trim_matches`/`to_lowercase` handle legacy rows where this value was
                // serialized with serde_json (quoted and possibly differently cased).
                let payment_method: PaymentMethod = s
                    .trim_matches('"')
                    .to_lowercase()
                    .parse()
                    .map_err(|()| FromSqlError::InvalidType)?;
                Ok(payment_method)
            }
            _ => Err(FromSqlError::InvalidType),
        }
    }
}

impl ToSql for DepositClaimError {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for DepositClaimError {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}

impl ToSql for LnurlPayInfo {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for LnurlPayInfo {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}

impl ToSql for LnurlWithdrawInfo {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for LnurlWithdrawInfo {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}

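/// Serializes a value to its JSON text representation for storage in a TEXT column.
/// Shared by the `ToSql` implementations above.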
fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
where
    T: serde::Serialize,
{
    let json = serde_json::to_string(&value)
        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
    Ok(rusqlite::types::ToSqlOutput::from(json))
}

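/// Deserializes a JSON TEXT column back into the requested type.
/// Non-text values are rejected with `FromSqlError::InvalidType`.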
fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
where
    T: serde::de::DeserializeOwned,
{
    match value {
        ValueRef::Text(i) => {
            let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
            let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
            Ok(deserialized)
        }
        _ => Err(FromSqlError::InvalidType),
    }
}

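/// Deserializes a JSON string read from the column at `index`, mapping serde errors to
/// `rusqlite::Error::FromSqlConversionFailure` so the failing column is reported.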
fn serde_json_from_str<T>(value: &str, index: usize) -> Result<T, rusqlite::Error>
where
    T: serde::de::DeserializeOwned,
{
    serde_json::from_str(value).map_err(|e| {
        rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(e))
    })
}

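/// Newtype wrapper that stores `u128` amounts as TEXT, since SQLite integers are
/// limited to 64 bits.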
struct U128SqlWrapper(u128);

impl ToSql for U128SqlWrapper {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        let string = self.0.to_string();
        Ok(rusqlite::types::ToSqlOutput::from(string))
    }
}

impl FromSql for U128SqlWrapper {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        match value {
            ValueRef::Text(i) => {
                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
                let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
                Ok(U128SqlWrapper(integer))
            }
            _ => Err(FromSqlError::InvalidType),
        }
    }
}

#[cfg(test)]
mod tests {

    use crate::SqliteStorage;
    use std::path::PathBuf;

    /// Helper function to create a temporary directory for tests.
    /// Uses `std::env::temp_dir` to avoid a dedicated temp-dir dependency.
    fn create_temp_dir(name: &str) -> PathBuf {
        let mut path = std::env::temp_dir();
        // Use a UUID suffix for uniqueness to avoid conflicts between parallel tests
        path.push(format!("breez-test-{}-{}", name, uuid::Uuid::new_v4()));
        std::fs::create_dir_all(&path).unwrap();
        path
    }

    #[tokio::test]
    async fn test_sqlite_storage() {
        let temp_dir = create_temp_dir("sqlite_storage");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_sqlite_storage(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_unclaimed_deposits_crud() {
        let temp_dir = create_temp_dir("sqlite_storage_deposits");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_deposit_refunds() {
        let temp_dir = create_temp_dir("sqlite_storage_refund_tx");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_type_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_type_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_status_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_status_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_asset_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_asset_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_timestamp_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_timestamp_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_spark_htlc_status_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_htlc_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_conversion_refund_needed_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_conversion_refund_needed_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_conversion_refund_needed_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_combined_filters() {
        let temp_dir = create_temp_dir("sqlite_storage_combined_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_combined_filters(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sort_order() {
        let temp_dir = create_temp_dir("sqlite_storage_sort_order");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_sort_order(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_metadata() {
        let temp_dir = create_temp_dir("sqlite_storage_payment_request_metadata");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_metadata(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_details_update_persistence() {
        let temp_dir = create_temp_dir("sqlite_storage_payment_details_update");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_details_update_persistence(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sync_storage() {
        let temp_dir = create_temp_dir("sqlite_sync_storage");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_sqlite_sync_storage(Box::new(storage)).await;
    }
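
    // A minimal round-trip sketch: exercises `U128SqlWrapper`'s TEXT-based `u128`
    // encoding directly through `ToSql`/`FromSql`, without opening a database.
    #[test]
    fn test_u128_sql_wrapper_roundtrip() {
        use super::U128SqlWrapper;
        use rusqlite::{
            ToSql,
            types::{FromSql, ToSqlOutput, Value, ValueRef},
        };

        // u128::MAX does not fit in SQLite's 64-bit integers, so it is written as text.
        let original = u128::MAX;
        let text = match U128SqlWrapper(original).to_sql().unwrap() {
            ToSqlOutput::Owned(Value::Text(s)) => s,
            _ => panic!("expected an owned TEXT value"),
        };

        // Reading the same text back through `FromSql` restores the original value.
        let parsed = U128SqlWrapper::column_result(ValueRef::Text(text.as_bytes())).unwrap();
        assert_eq!(parsed.0, original);
    }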
}