breez_sdk_spark/persist/
sqlite.rs

1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5    Connection, Row, ToSql, TransactionBehavior, params,
6    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11    AssetFilter, ConversionInfo, DepositInfo, LnurlPayInfo, LnurlReceiveMetadata,
12    LnurlWithdrawInfo, PaymentDetails, PaymentMethod, SparkHtlcDetails, SparkHtlcStatus,
13    TokenTransactionType,
14    error::DepositClaimError,
15    persist::{
16        PaymentMetadata, SetLnurlMetadataItem, StorageListPaymentsRequest,
17        StoragePaymentDetailsFilter, UpdateDepositPayload,
18    },
19    sync_storage::{
20        IncomingChange, OutgoingChange, Record, RecordChange, RecordId, UnversionedRecordChange,
21    },
22};
23
24use std::collections::HashMap;
25
26use tracing::warn;
27
28use super::{Payment, Storage, StorageError};
29
const DEFAULT_DB_FILENAME: &str = "storage.sql";
/// SQLite-based storage implementation
///
/// Holds only the directory path; a fresh [`Connection`] is opened per
/// operation (see `get_connection`).
pub struct SqliteStorage {
    // Directory containing the database file (`DEFAULT_DB_FILENAME` is
    // appended to it in `get_db_path`).
    db_dir: PathBuf,
}
35
impl SqliteStorage {
    /// Creates a new `SQLite` storage and brings the schema up to date.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the directory that holds (or will hold) the
    ///   `SQLite` database file (`storage.sql`)
    ///
    /// # Returns
    ///
    /// A new `SqliteStorage` instance or an error
    pub fn new(path: &Path) -> Result<Self, StorageError> {
        let storage = Self {
            db_dir: path.to_path_buf(),
        };

        // No real filesystem on wasm/unknown, so directory creation is skipped there.
        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        std::fs::create_dir_all(path)
            .map_err(|e| StorageError::InitializationError(e.to_string()))?;

        storage.migrate()?;
        Ok(storage)
    }

    /// Opens a new connection to the database file.
    ///
    /// Note: a fresh connection is opened on every call; callers reuse the
    /// returned handle for the duration of one logical operation.
    pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
        Ok(Connection::open(self.get_db_path())?)
    }

    /// Full path of the database file inside the storage directory.
    fn get_db_path(&self) -> PathBuf {
        self.db_dir.join(DEFAULT_DB_FILENAME)
    }

    /// Applies all pending schema migrations, then runs one-off data
    /// backfills that depend on which version we migrated from.
    fn migrate(&self) -> Result<(), StorageError> {
        let migrations =
            Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
        let mut conn = self.get_connection()?;
        // Capture the schema version BEFORE migrating so we know whether the
        // data backfill below still needs to run.
        let previous_version = match migrations.current_version(&conn)? {
            SchemaVersion::Inside(previous_version) => previous_version.get(),
            _ => 0,
        };
        migrations.to_latest(&mut conn)?;

        // Migration 6 added the `lnurl_description` column; databases created
        // before that need the description extracted from the stored pay info.
        if previous_version < 6 {
            Self::migrate_lnurl_metadata_description(&mut conn)?;
        }

        Ok(())
    }

    /// Data backfill: populates `payment_metadata.lnurl_description` from the
    /// description embedded in each row's serialized `lnurl_pay_info`.
    fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
        let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
        let pay_infos: Vec<_> = stmt
            .query_map([], |row| {
                let payment_id: String = row.get(0)?;
                let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
                Ok((payment_id, lnurl_pay_info))
            })?
            .collect::<Result<_, _>>()?;
        // Keep only rows that actually carry a pay info with an extractable description.
        let pay_infos = pay_infos
            .into_iter()
            .filter_map(|(payment_id, lnurl_pay_info)| {
                let pay_info = lnurl_pay_info?;
                let description = pay_info.extract_description()?;
                Some((payment_id, description))
            })
            .collect::<Vec<_>>();

        for pay_info in pay_infos {
            conn.execute(
                "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
                params![pay_info.1, pay_info.0],
            )?;
        }

        Ok(())
    }

    /// Ordered, append-only list of "up" migrations.
    ///
    /// Entries must never be edited or reordered once released — `migrate`
    /// relies on list positions (e.g. the `< 6` backfill check), and
    /// `rusqlite_migration` derives the schema version from the list index.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn current_migrations() -> Vec<&'static str> {
        vec![
            "CREATE TABLE IF NOT EXISTS payments (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount INTEGER NOT NULL,
              fees INTEGER NOT NULL,
              timestamp INTEGER NOT NULL,
              details TEXT,
              method TEXT
            );",
            "CREATE TABLE IF NOT EXISTS settings (
              key TEXT PRIMARY KEY,
              value TEXT NOT NULL
            );",
            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
              txid TEXT NOT NULL,
              vout INTEGER NOT NULL,
              amount_sats INTEGER,
              claim_error TEXT,
              refund_tx TEXT,
              refund_tx_id TEXT,
              PRIMARY KEY (txid, vout)
            );",
            "CREATE TABLE IF NOT EXISTS payment_metadata (
              payment_id TEXT PRIMARY KEY,
              lnurl_pay_info TEXT
            );",
            "CREATE TABLE IF NOT EXISTS deposit_refunds (
              deposit_tx_id TEXT NOT NULL,
              deposit_vout INTEGER NOT NULL,
              refund_tx TEXT NOT NULL,
              refund_tx_id TEXT NOT NULL,
              PRIMARY KEY (deposit_tx_id, deposit_vout)              
            );",
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
            // Migration 7: move Lightning/Withdraw/Deposit data out of the
            // `details` JSON column into dedicated columns/tables.
            "
            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN spark INTEGER;
            CREATE TABLE payment_details_lightning (
              payment_id TEXT PRIMARY KEY,
              invoice TEXT NOT NULL,
              payment_hash TEXT NOT NULL,
              destination_pubkey TEXT NOT NULL,
              description TEXT,
              preimage TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'), 
                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'), 
                json_extract(details, '$.Lightning.preimage') 
            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

            ALTER TABLE payments DROP COLUMN details;

            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
            ",
            "CREATE TABLE payment_details_token (
              payment_id TEXT PRIMARY KEY,
              metadata TEXT NOT NULL,
              tx_hash TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );",
            // Migration to change payments amount and fees from INTEGER to TEXT
            // (table-rebuild dance: new table, copy, drop, rename).
            "CREATE TABLE payments_new (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount TEXT NOT NULL,
              fees TEXT NOT NULL,
              timestamp INTEGER NOT NULL,
              method TEXT,
              withdraw_tx_id TEXT,
              deposit_tx_id TEXT,
              spark INTEGER
            );",
            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
             SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
             FROM payments;",
            "DROP TABLE payments;",
            "ALTER TABLE payments_new RENAME TO payments;",
            "CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
            // sync_revision: tracks the last committed revision (from server-acknowledged
            // or server-received records). Does NOT include pending outgoing queue ids.
            // sync_outgoing.revision stores a local queue id for ordering/de-duplication only.
            "CREATE TABLE sync_revision (
                revision INTEGER NOT NULL DEFAULT 0
            );
            INSERT INTO sync_revision (revision) VALUES (0);
            CREATE TABLE sync_outgoing(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                updated_fields_json TEXT NOT NULL,
                revision INTEGER NOT NULL
            );
            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
            CREATE TABLE sync_state(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id)
            );",
            "CREATE TABLE sync_incoming(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id, revision)
            );
            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
            // Rebuild payment_details_spark so invoice_details becomes nullable
            // and htlc_details is added.
            "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
            CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT,
              htlc_details TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_spark (payment_id, invoice_details)
             SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
            DROP TABLE tmp_payment_details_spark;",
            "CREATE TABLE lnurl_receive_metadata (
                payment_hash TEXT NOT NULL PRIMARY KEY,
                nostr_zap_request TEXT,
                nostr_zap_receipt TEXT,
                sender_comment TEXT
            );",
            // Delete all unclaimed deposits to clear old claim_error JSON format.
            // Deposits will be recovered on next sync.
            "DELETE FROM unclaimed_deposits;",
            // Clear all sync tables due to BreezSigner signature change.
            // This forces users to sync from scratch to the sync server.
            // Also delete the sync_initial_complete flag to force re-populating
            // all payment metadata for outgoing sync using the new key.
            "DELETE FROM sync_outgoing;
             DELETE FROM sync_incoming;
             DELETE FROM sync_state;
             UPDATE sync_revision SET revision = 0;
             DELETE FROM settings WHERE key = 'sync_initial_complete';",
            "ALTER TABLE payment_metadata ADD COLUMN token_conversion_info TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN parent_payment_id TEXT;",
            "
            ALTER TABLE payment_metadata DROP COLUMN token_conversion_info;
            ALTER TABLE payment_metadata ADD COLUMN conversion_info TEXT;
            ",
            // Add tx_type column with a default value of 'transfer'.
            // Reset only the token sync position (not bitcoin offset) to trigger token re-sync.
            // This will update all token payment records with the correct tx_type values.
            // Note: This intentionally couples to the CachedSyncInfo schema at migration time.
            "ALTER TABLE payment_details_token ADD COLUMN tx_type TEXT NOT NULL DEFAULT 'transfer';
            UPDATE settings
            SET value = json_set(value, '$.last_synced_final_token_payment_id', NULL)
            WHERE key = 'sync_offset' AND json_valid(value) AND json_type(value, '$.last_synced_final_token_payment_id') IS NOT NULL;",
            "DELETE FROM sync_outgoing;
             DELETE FROM sync_incoming;
             DELETE FROM sync_state;
             UPDATE sync_revision SET revision = 0;
             DELETE FROM settings WHERE key = 'sync_initial_complete';",
            "ALTER TABLE payment_details_lightning ADD COLUMN htlc_status TEXT NOT NULL DEFAULT 'WaitingForPreimage';
             ALTER TABLE payment_details_lightning ADD COLUMN htlc_expiry_time INTEGER NOT NULL DEFAULT 0;",
            // Backfill htlc_status for existing Lightning payments where it's NULL.
            // After this migration, htlc_status is required for all Lightning payments.
            // Also reset the bitcoin sync offset to trigger a full resync, which will
            // correct the backfilled expiry_time values.
            "UPDATE payment_details_lightning
             SET htlc_status = CASE
                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'completed' THEN 'PreimageShared'
                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'pending' THEN 'WaitingForPreimage'
                     ELSE 'Returned'
                 END;
             UPDATE settings
             SET value = json_set(value, '$.offset', 0)
             WHERE key = 'sync_offset' AND json_valid(value);",
            // Add preimage column for LUD-21 and NIP-57 support
            "ALTER TABLE lnurl_receive_metadata ADD COLUMN preimage TEXT;",
            // Clear the lnurl_metadata_updated_after setting to force re-sync
            // This ensures clients get the new preimage field from the server
            "DELETE FROM settings WHERE key = 'lnurl_metadata_updated_after';",
            // Clear cached lightning address - schema changed from string to LnurlInfo struct
            "DELETE FROM settings WHERE key = 'lightning_address';",
        ]
    }
}
317
318/// Maps a rusqlite error to the appropriate `StorageError`.
319/// Database busy/locked errors are mapped to `Connection` (transient),
320/// other errors are mapped to `Implementation`.
321#[allow(clippy::needless_pass_by_value)]
322fn map_sqlite_error(e: rusqlite::Error) -> StorageError {
323    match e {
324        rusqlite::Error::SqliteFailure(err, _)
325            if err.code == rusqlite::ErrorCode::DatabaseBusy
326                || err.code == rusqlite::ErrorCode::DatabaseLocked =>
327        {
328            StorageError::Connection(e.to_string())
329        }
330        _ => StorageError::Implementation(e.to_string()),
331    }
332}
333
334impl From<rusqlite::Error> for StorageError {
335    fn from(value: rusqlite::Error) -> Self {
336        map_sqlite_error(value)
337    }
338}
339
340impl From<rusqlite_migration::Error> for StorageError {
341    fn from(value: rusqlite_migration::Error) -> Self {
342        StorageError::Implementation(value.to_string())
343    }
344}
345
346#[async_trait]
347impl Storage for SqliteStorage {
    /// Lists payments matching the filters in `request`.
    ///
    /// The WHERE clause is assembled dynamically; every `?` placeholder pushed
    /// into a clause is matched by a push into `params` in the same order,
    /// because rusqlite binds parameters positionally.
    ///
    /// NOTE(review): the table aliases used below (p = payments, t = token
    /// details, l = lightning details, s = spark details, pm = payment
    /// metadata, lrm = lnurl receive metadata) are expected to be introduced
    /// by `SELECT_PAYMENT_SQL`, which is defined elsewhere in this file.
    #[allow(clippy::too_many_lines)]
    async fn list_payments(
        &self,
        request: StorageListPaymentsRequest,
    ) -> Result<Vec<Payment>, StorageError> {
        let connection = self.get_connection()?;

        // Build WHERE clauses based on filters
        let mut where_clauses = Vec::new();
        let mut params: Vec<Box<dyn ToSql>> = Vec::new();

        // Filter by payment type
        if let Some(ref type_filter) = request.type_filter
            && !type_filter.is_empty()
        {
            let placeholders = type_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.payment_type IN ({placeholders})"));
            for payment_type in type_filter {
                params.push(Box::new(payment_type.to_string()));
            }
        }

        // Filter by status
        if let Some(ref status_filter) = request.status_filter
            && !status_filter.is_empty()
        {
            let placeholders = status_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.status IN ({placeholders})"));
            for status in status_filter {
                params.push(Box::new(status.to_string()));
            }
        }

        // Filter by timestamp range (inclusive lower bound, exclusive upper bound)
        if let Some(from_timestamp) = request.from_timestamp {
            where_clauses.push("p.timestamp >= ?".to_string());
            params.push(Box::new(from_timestamp));
        }

        if let Some(to_timestamp) = request.to_timestamp {
            where_clauses.push("p.timestamp < ?".to_string());
            params.push(Box::new(to_timestamp));
        }

        // Filter by asset: token payments are the ones with a token metadata row
        if let Some(ref asset_filter) = request.asset_filter {
            match asset_filter {
                AssetFilter::Bitcoin => {
                    where_clauses.push("t.metadata IS NULL".to_string());
                }
                AssetFilter::Token { token_identifier } => {
                    where_clauses.push("t.metadata IS NOT NULL".to_string());
                    if let Some(identifier) = token_identifier {
                        // Filter by specific token identifier
                        where_clauses
                            .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
                        params.push(Box::new(identifier.clone()));
                    }
                }
            }
        }

        // Filter by payment details. If any filter matches, we include the payment
        // (conditions within one filter are ANDed, filters are ORed together).
        if let Some(ref payment_details_filter) = request.payment_details_filter {
            let mut all_payment_details_clauses = Vec::new();
            for payment_details_filter in payment_details_filter {
                let mut payment_details_clauses = Vec::new();
                // Filter by HTLC status (Spark or Lightning)
                let htlc_filter = match payment_details_filter {
                    StoragePaymentDetailsFilter::Spark {
                        htlc_status: Some(s),
                        ..
                    } if !s.is_empty() => Some(("s", s)),
                    StoragePaymentDetailsFilter::Lightning {
                        htlc_status: Some(s),
                        ..
                    } if !s.is_empty() => Some(("l", s)),
                    _ => None,
                };
                if let Some((alias, htlc_statuses)) = htlc_filter {
                    let placeholders = htlc_statuses
                        .iter()
                        .map(|_| "?")
                        .collect::<Vec<_>>()
                        .join(", ");
                    if alias == "l" {
                        // Lightning: htlc_status is a direct column
                        payment_details_clauses.push(format!("l.htlc_status IN ({placeholders})"));
                    } else {
                        // Spark: htlc_details is still JSON
                        payment_details_clauses.push(format!(
                            "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
                        ));
                    }
                    for htlc_status in htlc_statuses {
                        params.push(Box::new(htlc_status.to_string()));
                    }
                }
                // Filter by conversion info presence.
                // Spark payments have p.spark = 1 (set in insert_payment);
                // token payments leave it NULL — that distinguishes the two.
                let conversion_filter = match payment_details_filter {
                    StoragePaymentDetailsFilter::Spark {
                        conversion_refund_needed: Some(v),
                        ..
                    } => Some((v, "p.spark = 1")),
                    StoragePaymentDetailsFilter::Token {
                        conversion_refund_needed: Some(v),
                        ..
                    } => Some((v, "p.spark IS NULL")),
                    _ => None,
                };
                if let Some((conversion_refund_needed, type_check)) = conversion_filter {
                    let refund_needed = if *conversion_refund_needed {
                        "= 'RefundNeeded'"
                    } else {
                        "!= 'RefundNeeded'"
                    };
                    payment_details_clauses.push(format!(
                        "{type_check} AND pm.conversion_info IS NOT NULL AND
                         json_extract(pm.conversion_info, '$.status') {refund_needed}"
                    ));
                }
                // Filter by token transaction hash
                if let StoragePaymentDetailsFilter::Token {
                    tx_hash: Some(tx_hash),
                    ..
                } = payment_details_filter
                {
                    payment_details_clauses.push("t.tx_hash = ?".to_string());
                    params.push(Box::new(tx_hash.clone()));
                }

                // Filter by token transaction type
                if let StoragePaymentDetailsFilter::Token {
                    tx_type: Some(tx_type),
                    ..
                } = payment_details_filter
                {
                    payment_details_clauses.push("t.tx_type = ?".to_string());
                    params.push(Box::new(tx_type.to_string()));
                }

                // Filter by LNURL preimage status
                if let StoragePaymentDetailsFilter::Lightning {
                    has_lnurl_preimage: Some(has_preimage),
                    ..
                } = payment_details_filter
                {
                    if *has_preimage {
                        payment_details_clauses.push("lrm.preimage IS NOT NULL".to_string());
                    } else {
                        // Has lnurl metadata, lightning preimage exists, but lnurl preimage not yet sent
                        payment_details_clauses.push(
                            "lrm.payment_hash IS NOT NULL AND l.preimage IS NOT NULL AND lrm.preimage IS NULL".to_string(),
                        );
                    }
                }

                if !payment_details_clauses.is_empty() {
                    all_payment_details_clauses
                        .push(format!("({})", payment_details_clauses.join(" AND ")));
                }
            }

            if !all_payment_details_clauses.is_empty() {
                where_clauses.push(format!("({})", all_payment_details_clauses.join(" OR ")));
            }
        }

        // Exclude child payments (those with a parent_payment_id)
        // Child payments are accessed via the parent's related_payments field
        where_clauses.push("pm.parent_payment_id IS NULL".to_string());

        // Build the WHERE clause
        let where_sql = if where_clauses.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", where_clauses.join(" AND "))
        };

        // Determine sort order (default: newest first)
        let order_direction = if request.sort_ascending.unwrap_or(false) {
            "ASC"
        } else {
            "DESC"
        };

        // LIMIT/OFFSET are interpolated (not bound); both are numeric so this
        // is injection-safe. Missing limit means "all rows".
        let query = format!(
            "{SELECT_PAYMENT_SQL} {where_sql} ORDER BY p.timestamp {order_direction} LIMIT {} OFFSET {}",
            request.limit.unwrap_or(u32::MAX),
            request.offset.unwrap_or(0)
        );

        let mut stmt = connection.prepare(&query)?;
        let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
        let payments = stmt
            .query_map(param_refs.as_slice(), map_payment)?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(payments)
    }
555
    /// Upserts a payment's base row, then upserts its type-specific details,
    /// all inside a single write transaction.
    ///
    /// Detail columns use COALESCE upserts so a re-insert with NULL fields
    /// never erases previously stored data (e.g. a known preimage).
    #[allow(clippy::too_many_lines)]
    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        // Immediate behavior takes the write lock up front, so the multi-step
        // upsert cannot fail on a deferred read-to-write lock upgrade.
        let tx = connection.transaction_with_behavior(TransactionBehavior::Immediate)?;
        tx.execute(
            "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method) 
             VALUES (?, ?, ?, ?, ?, ?, ?)
             ON CONFLICT(id) DO UPDATE SET 
                payment_type=excluded.payment_type,
                status=excluded.status,
                amount=excluded.amount,
                fees=excluded.fees,
                timestamp=excluded.timestamp,
                method=excluded.method",
            params![
                payment.id,
                payment.payment_type.to_string(),
                payment.status.to_string(),
                // amount/fees go through U128SqlWrapper — stored as TEXT
                // since the INTEGER -> TEXT migration.
                U128SqlWrapper(payment.amount),
                U128SqlWrapper(payment.fees),
                payment.timestamp,
                payment.method,
            ],
        )?;

        match payment.details {
            // Withdraw/Deposit only store a tx id directly on the base row.
            Some(PaymentDetails::Withdraw { tx_id }) => {
                tx.execute(
                    "UPDATE payments SET withdraw_tx_id = ? WHERE id = ?",
                    params![tx_id, payment.id],
                )?;
            }
            Some(PaymentDetails::Deposit { tx_id }) => {
                tx.execute(
                    "UPDATE payments SET deposit_tx_id = ? WHERE id = ?",
                    params![tx_id, payment.id],
                )?;
            }
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
                ..
            }) => {
                // Mark the base row as a Spark payment (list_payments keys on this).
                tx.execute(
                    "UPDATE payments SET spark = 1 WHERE id = ?",
                    params![payment.id],
                )?;
                if invoice_details.is_some() || htlc_details.is_some() {
                    // Upsert both details together and avoid overwriting existing data with NULLs
                    tx.execute(
                        "INSERT INTO payment_details_spark (payment_id, invoice_details, htlc_details)
                         VALUES (?, ?, ?)
                         ON CONFLICT(payment_id) DO UPDATE SET
                            invoice_details=COALESCE(excluded.invoice_details, payment_details_spark.invoice_details),
                            htlc_details=COALESCE(excluded.htlc_details, payment_details_spark.htlc_details)",
                        params![
                            payment.id,
                            invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                            htlc_details.as_ref().map(serde_json::to_string).transpose()?,
                        ],
                    )?;
                }
            }
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                tx_type,
                invoice_details,
                ..
            }) => {
                tx.execute(
                    "INSERT INTO payment_details_token (payment_id, metadata, tx_hash, tx_type, invoice_details)
                     VALUES (?, ?, ?, ?, ?)
                     ON CONFLICT(payment_id) DO UPDATE SET 
                        metadata=excluded.metadata,
                        tx_hash=excluded.tx_hash,
                        tx_type=excluded.tx_type,
                        invoice_details=COALESCE(excluded.invoice_details, payment_details_token.invoice_details)",
                    params![
                        payment.id,
                        serde_json::to_string(&metadata)?,
                        tx_hash,
                        tx_type.to_string(),
                        invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                    ],
                )?;
            }
            Some(PaymentDetails::Lightning {
                invoice,
                destination_pubkey,
                description,
                htlc_details,
                ..
            }) => {
                tx.execute(
                    "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage, htlc_status, htlc_expiry_time)
                     VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                     ON CONFLICT(payment_id) DO UPDATE SET
                        invoice=excluded.invoice,
                        payment_hash=excluded.payment_hash,
                        destination_pubkey=excluded.destination_pubkey,
                        description=excluded.description,
                        preimage=COALESCE(excluded.preimage, payment_details_lightning.preimage),
                        htlc_status=COALESCE(excluded.htlc_status, payment_details_lightning.htlc_status),
                        htlc_expiry_time=COALESCE(excluded.htlc_expiry_time, payment_details_lightning.htlc_expiry_time)",
                    params![
                        payment.id,
                        invoice,
                        htlc_details.payment_hash,
                        destination_pubkey,
                        description,
                        htlc_details.preimage,
                        htlc_details.status.to_string(),
                        htlc_details.expiry_time,
                    ],
                )?;
            }
            // No details: only the base row is written.
            None => {}
        }

        tx.commit()?;
        Ok(())
    }
679
680    async fn insert_payment_metadata(
681        &self,
682        payment_id: String,
683        metadata: PaymentMetadata,
684    ) -> Result<(), StorageError> {
685        let connection = self.get_connection()?;
686
687        connection.execute(
688            "INSERT INTO payment_metadata (payment_id, parent_payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description, conversion_info)
689             VALUES (?, ?, ?, ?, ?, ?)
690             ON CONFLICT(payment_id) DO UPDATE SET
691                parent_payment_id = COALESCE(excluded.parent_payment_id, parent_payment_id),
692                lnurl_pay_info = COALESCE(excluded.lnurl_pay_info, lnurl_pay_info),
693                lnurl_withdraw_info = COALESCE(excluded.lnurl_withdraw_info, lnurl_withdraw_info),
694                lnurl_description = COALESCE(excluded.lnurl_description, lnurl_description),
695                conversion_info = COALESCE(excluded.conversion_info, conversion_info)",
696            params![
697                payment_id,
698                metadata.parent_payment_id,
699                metadata.lnurl_pay_info,
700                metadata.lnurl_withdraw_info,
701                metadata.lnurl_description,
702                metadata.conversion_info.as_ref().map(serde_json::to_string).transpose()?,
703            ],
704        )?;
705
706        Ok(())
707    }
708
709    async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
710        let connection = self.get_connection()?;
711
712        connection.execute(
713            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
714            params![key, value],
715        )?;
716
717        Ok(())
718    }
719
720    async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
721        let connection = self.get_connection()?;
722
723        let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
724
725        let result = stmt.query_row(params![key], |row| {
726            let value_str: String = row.get(0)?;
727            Ok(value_str)
728        });
729
730        match result {
731            Ok(value) => Ok(Some(value)),
732            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
733            Err(e) => Err(e.into()),
734        }
735    }
736
737    async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
738        let connection = self.get_connection()?;
739
740        connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
741
742        Ok(())
743    }
744
745    async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
746        let connection = self.get_connection()?;
747        let query = format!("{SELECT_PAYMENT_SQL} WHERE p.id = ?");
748        let mut stmt = connection.prepare(&query)?;
749        let payment = stmt.query_row(params![id], map_payment)?;
750        Ok(payment)
751    }
752
753    async fn get_payment_by_invoice(
754        &self,
755        invoice: String,
756    ) -> Result<Option<Payment>, StorageError> {
757        let connection = self.get_connection()?;
758        let query = format!("{SELECT_PAYMENT_SQL} WHERE l.invoice = ?");
759        let mut stmt = connection.prepare(&query)?;
760        let payment = stmt.query_row(params![invoice], map_payment);
761        match payment {
762            Ok(payment) => Ok(Some(payment)),
763            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
764            Err(e) => Err(e.into()),
765        }
766    }
767
768    async fn get_payments_by_parent_ids(
769        &self,
770        parent_payment_ids: Vec<String>,
771    ) -> Result<HashMap<String, Vec<Payment>>, StorageError> {
772        if parent_payment_ids.is_empty() {
773            return Ok(HashMap::new());
774        }
775
776        let connection = self.get_connection()?;
777
778        // Early exit if no related payments exist
779        let has_related: bool = connection.query_row(
780            "SELECT EXISTS(SELECT 1 FROM payment_metadata WHERE parent_payment_id IS NOT NULL LIMIT 1)",
781            [],
782            |row| row.get(0),
783        )?;
784        if !has_related {
785            return Ok(HashMap::new());
786        }
787
788        // Build the IN clause with placeholders
789        let placeholders: Vec<&str> = parent_payment_ids.iter().map(|_| "?").collect();
790        let in_clause = placeholders.join(", ");
791
792        let query = format!(
793            "{SELECT_PAYMENT_SQL} WHERE pm.parent_payment_id IN ({in_clause}) ORDER BY p.timestamp ASC"
794        );
795
796        let mut stmt = connection.prepare(&query)?;
797        let params: Vec<&dyn ToSql> = parent_payment_ids
798            .iter()
799            .map(|id| id as &dyn ToSql)
800            .collect();
801        let rows = stmt.query_map(params.as_slice(), |row| {
802            let payment = map_payment(row)?;
803            let parent_payment_id: String = row.get(30)?;
804            Ok((parent_payment_id, payment))
805        })?;
806
807        let mut result: HashMap<String, Vec<Payment>> = HashMap::new();
808        for row in rows {
809            let (parent_id, related_payment) = row?;
810            result.entry(parent_id).or_default().push(related_payment);
811        }
812
813        Ok(result)
814    }
815
816    async fn add_deposit(
817        &self,
818        txid: String,
819        vout: u32,
820        amount_sats: u64,
821    ) -> Result<(), StorageError> {
822        let connection = self.get_connection()?;
823        connection.execute(
824            "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats) 
825             VALUES (?, ?, ?)",
826            params![txid, vout, amount_sats,],
827        )?;
828        Ok(())
829    }
830
831    async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
832        let connection = self.get_connection()?;
833        connection.execute(
834            "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
835            params![txid, vout],
836        )?;
837        Ok(())
838    }
839
840    async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
841        let connection = self.get_connection()?;
842        let mut stmt =
843            connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
844        let rows = stmt.query_map(params![], |row| {
845            Ok(DepositInfo {
846                txid: row.get(0)?,
847                vout: row.get(1)?,
848                amount_sats: row.get(2)?,
849                claim_error: row.get(3)?,
850                refund_tx: row.get(4)?,
851                refund_tx_id: row.get(5)?,
852            })
853        })?;
854        let mut deposits = Vec::new();
855        for row in rows {
856            deposits.push(row?);
857        }
858        Ok(deposits)
859    }
860
861    async fn update_deposit(
862        &self,
863        txid: String,
864        vout: u32,
865        payload: UpdateDepositPayload,
866    ) -> Result<(), StorageError> {
867        let connection = self.get_connection()?;
868        match payload {
869            UpdateDepositPayload::ClaimError { error } => {
870                connection.execute(
871                    "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
872                    params![error, txid, vout],
873                )?;
874            }
875            UpdateDepositPayload::Refund {
876                refund_txid,
877                refund_tx,
878            } => {
879                connection.execute(
880                    "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
881                    params![refund_tx, refund_txid, txid, vout],
882                )?;
883            }
884        }
885        Ok(())
886    }
887
888    async fn set_lnurl_metadata(
889        &self,
890        metadata: Vec<SetLnurlMetadataItem>,
891    ) -> Result<(), StorageError> {
892        let connection = self.get_connection()?;
893        for metadata in metadata {
894            connection.execute(
895                "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment, preimage)
896                 VALUES (?, ?, ?, ?, ?)",
897                params![
898                    metadata.payment_hash,
899                    metadata.nostr_zap_request,
900                    metadata.nostr_zap_receipt,
901                    metadata.sender_comment,
902                    metadata.preimage,
903                ],
904            )?;
905        }
906        Ok(())
907    }
908
    /// Queues a local record change for upload to the sync server.
    ///
    /// Returns the local queue revision assigned to the entry, which the
    /// caller later passes back to `complete_outgoing_sync`.
    async fn add_outgoing_change(
        &self,
        record: UnversionedRecordChange,
    ) -> Result<u64, StorageError> {
        let mut connection = self.get_connection()?;
        // An IMMEDIATE transaction takes the write lock up front, so the
        // MAX(revision) read and the INSERT below are atomic with respect to
        // other writers on this database.
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        // This revision is a local queue id for pending rows, not a server revision.
        let local_revision: u64 = tx
            .query_row(
                "SELECT COALESCE(MAX(revision), 0) + 1 FROM sync_outgoing",
                [],
                |row| row.get(0),
            )
            .map_err(map_sqlite_error)?;

        tx.execute(
            "INSERT INTO sync_outgoing (
                record_type
            ,   data_id
            ,   schema_version
            ,   commit_time
            ,   updated_fields_json
            ,   revision
            )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.updated_fields)?,
                local_revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(local_revision)
    }
950
    /// Finalizes a successfully uploaded outgoing change.
    ///
    /// In one transaction: removes the queue entry identified by
    /// `local_revision`, stores the server-acknowledged `record` as the new
    /// local base state, and advances the server revision watermark.
    async fn complete_outgoing_sync(
        &self,
        record: Record,
        local_revision: u64,
    ) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        // `local_revision` is the queue id handed out by `add_outgoing_change`.
        let rows_deleted = tx
            .execute(
                "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
                params![record.id.r#type, record.id.data_id, local_revision],
            )
            .map_err(map_sqlite_error)?;

        // A zero-row delete is tolerated (the entry may already be gone) but
        // logged, since it can indicate a bookkeeping bug upstream.
        if rows_deleted == 0 {
            warn!(
                "complete_outgoing_sync: DELETE from sync_outgoing matched 0 rows \
                 (type={}, data_id={}, revision={})",
                record.id.r#type, record.id.data_id, local_revision
            );
        }

        // The acknowledged record becomes the new synced base state.
        tx.execute(
            "INSERT OR REPLACE INTO sync_state (
                record_type
            ,   data_id
            ,   schema_version
            ,   commit_time
            ,   data
            ,   revision
            )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.data)?,
                record.revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        // MAX(...) ensures the watermark never moves backwards.
        tx.execute(
            "UPDATE sync_revision SET revision = MAX(revision, ?)",
            params![record.revision],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }
1005
    /// Returns up to `limit` queued outgoing changes, oldest local revision
    /// first, each paired with the last synced state of the same record
    /// (`parent`), if one exists.
    async fn get_pending_outgoing_changes(
        &self,
        limit: u32,
    ) -> Result<Vec<OutgoingChange>, StorageError> {
        let connection = self.get_connection()?;

        // Columns 0-5 come from sync_outgoing (the queued change); columns
        // 6-9 come from the LEFT-JOINed sync_state row and are all NULL when
        // the record has never been synced.
        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
            ,       o.data_id
            ,       o.schema_version
            ,       o.commit_time
            ,       o.updated_fields_json
            ,       o.revision
            ,       e.schema_version AS existing_schema_version
            ,       e.commit_time AS existing_commit_time
            ,       e.data AS existing_data
            ,       e.revision AS existing_revision
             FROM sync_outgoing o
             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
             ORDER BY o.revision ASC
             LIMIT ?",
            )
            .map_err(map_sqlite_error)?;
        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();
        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            // existing_data (col 8) being non-NULL marks a matching
            // sync_state row, i.e. a known parent state.
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                local_revision: row.get(5).map_err(map_sqlite_error)?,
            };
            results.push(OutgoingChange { change, parent });
        }

        Ok(results)
    }
1064
1065    async fn get_last_revision(&self) -> Result<u64, StorageError> {
1066        let connection = self.get_connection()?;
1067
1068        let revision: u64 = connection
1069            .query_row("SELECT revision FROM sync_revision", [], |row| row.get(0))
1070            .map_err(map_sqlite_error)?;
1071
1072        Ok(revision)
1073    }
1074
1075    async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), StorageError> {
1076        if records.is_empty() {
1077            return Ok(());
1078        }
1079
1080        let mut connection = self.get_connection()?;
1081        let tx = connection
1082            .transaction_with_behavior(TransactionBehavior::Immediate)
1083            .map_err(map_sqlite_error)?;
1084
1085        for record in records {
1086            tx.execute(
1087                "INSERT OR REPLACE INTO sync_incoming (
1088                    record_type
1089                ,   data_id
1090                ,   schema_version
1091                ,   commit_time
1092                ,   data
1093                ,   revision
1094                )
1095                 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1096                params![
1097                    record.id.r#type,
1098                    record.id.data_id,
1099                    record.schema_version.clone(),
1100                    serde_json::to_string(&record.data)?,
1101                    record.revision,
1102                ],
1103            )
1104            .map_err(map_sqlite_error)?;
1105        }
1106
1107        tx.commit().map_err(map_sqlite_error)?;
1108        Ok(())
1109    }
1110
1111    async fn delete_incoming_record(&self, record: Record) -> Result<(), StorageError> {
1112        let connection = self.get_connection()?;
1113
1114        connection
1115            .execute(
1116                "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
1117                params![record.id.r#type, record.id.data_id, record.revision],
1118            )
1119            .map_err(map_sqlite_error)?;
1120
1121        Ok(())
1122    }
1123
    /// Returns up to `limit` staged incoming records, oldest server revision
    /// first, each paired with the current local state (`old_state`) of the
    /// same record, if one exists.
    async fn get_incoming_records(&self, limit: u32) -> Result<Vec<IncomingChange>, StorageError> {
        let connection = self.get_connection()?;

        // Columns 0-4 come from sync_incoming (the new server state); columns
        // 5-8 come from the LEFT-JOINed sync_state row and are all NULL when
        // the record has no local state yet.
        let mut stmt = connection
            .prepare(
                "SELECT i.record_type
            ,       i.data_id
            ,       i.schema_version
            ,       i.data
            ,       i.revision
            ,       e.schema_version AS existing_schema_version
            ,       e.commit_time AS existing_commit_time
            ,       e.data AS existing_data
            ,       e.revision AS existing_revision
             FROM sync_incoming i
             LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
             ORDER BY i.revision ASC
             LIMIT ?",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();

        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            // existing_data (col 7) being non-NULL marks a matching
            // sync_state row, i.e. known prior local state.
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(5).map_err(map_sqlite_error)?,
                    revision: row.get(8).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let record = Record {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
                revision: row.get(4).map_err(map_sqlite_error)?,
            };
            results.push(IncomingChange {
                new_state: record,
                old_state: parent,
            });
        }

        Ok(results)
    }
1181
    /// Returns the most recently queued outgoing change (highest local
    /// revision), paired with the last synced state of the same record, or
    /// `None` when the queue is empty.
    ///
    /// Mirrors `get_pending_outgoing_changes` but with `ORDER BY ... DESC
    /// LIMIT 1`; the column layout is identical.
    async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, StorageError> {
        let connection = self.get_connection()?;

        // Columns 0-5: sync_outgoing; columns 6-9: LEFT-JOINed sync_state
        // (all NULL when the record was never synced).
        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
            ,       o.data_id
            ,       o.schema_version
            ,       o.commit_time
            ,       o.updated_fields_json
            ,       o.revision
            ,       e.schema_version AS existing_schema_version
            ,       e.commit_time AS existing_commit_time
            ,       e.data AS existing_data
            ,       e.revision AS existing_revision
             FROM sync_outgoing o
             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
             ORDER BY o.revision DESC
             LIMIT 1",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query([]).map_err(map_sqlite_error)?;

        if let Some(row) = rows.next().map_err(map_sqlite_error)? {
            // existing_data (col 8) being non-NULL marks a known parent state.
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                local_revision: row.get(5).map_err(map_sqlite_error)?,
            };

            return Ok(Some(OutgoingChange { change, parent }));
        }

        Ok(None)
    }
1239
    /// Persists an incoming server record as the new local base state and
    /// advances the server revision watermark, atomically.
    async fn update_record_from_incoming(&self, record: Record) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        // Overwrite (or create) the synced base state for this record.
        tx.execute(
            "INSERT OR REPLACE INTO sync_state (
                record_type
            ,   data_id
            ,   schema_version
            ,   commit_time
            ,   data
            ,   revision
            )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.data)?,
                record.revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        // MAX(...) ensures the watermark never moves backwards.
        tx.execute(
            "UPDATE sync_revision SET revision = MAX(revision, ?)",
            params![record.revision],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }
1275}
1276
/// Base query for payment lookups.
/// Column indices 0-29 are used by `map_payment`, index 30 (`parent_payment_id`) is only used by `get_payments_by_parent_ids`.
///
/// Every detail table is LEFT JOINed, so columns from tables without a
/// matching row come back NULL; `map_payment` relies on that to choose the
/// `PaymentDetails` variant. Note that `lnurl_receive_metadata` is joined on
/// the lightning payment hash, not on the payment id. Callers append their
/// own `WHERE` / `ORDER BY` clauses to this string.
const SELECT_PAYMENT_SQL: &str = "
    SELECT p.id,
           p.payment_type,
           p.status,
           p.amount,
           p.fees,
           p.timestamp,
           p.method,
           p.withdraw_tx_id,
           p.deposit_tx_id,
           p.spark,
           l.invoice AS lightning_invoice,
           l.payment_hash AS lightning_payment_hash,
           l.destination_pubkey AS lightning_destination_pubkey,
           COALESCE(l.description, pm.lnurl_description) AS lightning_description,
           l.preimage AS lightning_preimage,
           l.htlc_status AS lightning_htlc_status,
           l.htlc_expiry_time AS lightning_htlc_expiry_time,
           pm.lnurl_pay_info,
           pm.lnurl_withdraw_info,
           pm.conversion_info,
           t.metadata AS token_metadata,
           t.tx_hash AS token_tx_hash,
           t.tx_type AS token_tx_type,
           t.invoice_details AS token_invoice_details,
           s.invoice_details AS spark_invoice_details,
           s.htlc_details AS spark_htlc_details,
           lrm.nostr_zap_request AS lnurl_nostr_zap_request,
           lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt,
           lrm.sender_comment AS lnurl_sender_comment,
           lrm.payment_hash AS lnurl_payment_hash,
           pm.parent_payment_id
      FROM payments p
      LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
      LEFT JOIN payment_details_token t ON p.id = t.payment_id
      LEFT JOIN payment_details_spark s ON p.id = s.payment_id
      LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
      LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash";
1317
/// Maps one row produced by [`SELECT_PAYMENT_SQL`] into a [`Payment`].
///
/// The `PaymentDetails` variant is chosen by which joined detail columns are
/// non-NULL, with priority: Lightning > Withdraw > Deposit > Spark > Token.
/// A row matching none of them yields `details: None`.
#[allow(clippy::too_many_lines)]
fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
    // Presence markers from the LEFT JOINs — NULL means "no row in that
    // detail table". `spark` is only checked for presence, never its value.
    let withdraw_tx_id: Option<String> = row.get(7)?;
    let deposit_tx_id: Option<String> = row.get(8)?;
    let spark: Option<i32> = row.get(9)?;
    let lightning_invoice: Option<String> = row.get(10)?;
    let token_metadata: Option<String> = row.get(20)?;
    let details = match (
        lightning_invoice,
        withdraw_tx_id,
        deposit_tx_id,
        spark,
        token_metadata,
    ) {
        (Some(invoice), _, _, _, _) => {
            let payment_hash: String = row.get(11)?;
            let destination_pubkey: String = row.get(12)?;
            let description: Option<String> = row.get(13)?;
            let preimage: Option<String> = row.get(14)?;
            // htlc_status is nullable in SQL but mandatory for a Lightning
            // payment, so a NULL here is surfaced as a conversion error.
            let htlc_status: SparkHtlcStatus =
                row.get::<_, Option<SparkHtlcStatus>>(15)?.ok_or_else(|| {
                    rusqlite::Error::FromSqlConversionFailure(
                        15,
                        rusqlite::types::Type::Null,
                        "htlc_status is required for Lightning payments".into(),
                    )
                })?;
            let htlc_expiry_time: u64 = row.get(16)?;
            let htlc_details = SparkHtlcDetails {
                payment_hash,
                preimage,
                expiry_time: htlc_expiry_time,
                status: htlc_status,
            };
            let lnurl_pay_info: Option<LnurlPayInfo> = row.get(17)?;
            let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(18)?;
            let lnurl_nostr_zap_request: Option<String> = row.get(26)?;
            let lnurl_nostr_zap_receipt: Option<String> = row.get(27)?;
            let lnurl_sender_comment: Option<String> = row.get(28)?;
            let lnurl_payment_hash: Option<String> = row.get(29)?;
            // A non-NULL lrm.payment_hash means the lnurl_receive_metadata
            // join matched; the individual fields may still be NULL.
            let lnurl_receive_metadata = if lnurl_payment_hash.is_some() {
                Some(LnurlReceiveMetadata {
                    nostr_zap_request: lnurl_nostr_zap_request,
                    nostr_zap_receipt: lnurl_nostr_zap_receipt,
                    sender_comment: lnurl_sender_comment,
                })
            } else {
                None
            };
            Some(PaymentDetails::Lightning {
                invoice,
                destination_pubkey,
                description,
                htlc_details,
                lnurl_pay_info,
                lnurl_withdraw_info,
                lnurl_receive_metadata,
            })
        }
        (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
        (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
        (_, _, _, Some(_), _) => {
            // The second argument to serde_json_from_str is the column index,
            // presumably used for error context — confirm in its definition.
            let invoice_details_str: Option<String> = row.get(24)?;
            let invoice_details = invoice_details_str
                .map(|s| serde_json_from_str(&s, 24))
                .transpose()?;
            let htlc_details_str: Option<String> = row.get(25)?;
            let htlc_details = htlc_details_str
                .map(|s| serde_json_from_str(&s, 25))
                .transpose()?;
            let conversion_info_str: Option<String> = row.get(19)?;
            let conversion_info: Option<ConversionInfo> = conversion_info_str
                .map(|s: String| serde_json_from_str(&s, 19))
                .transpose()?;
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
                conversion_info,
            })
        }
        (_, _, _, _, Some(metadata)) => {
            let tx_type: TokenTransactionType = row.get(22)?;
            let invoice_details_str: Option<String> = row.get(23)?;
            let invoice_details = invoice_details_str
                .map(|s| serde_json_from_str(&s, 23))
                .transpose()?;
            let conversion_info_str: Option<String> = row.get(19)?;
            let conversion_info: Option<ConversionInfo> = conversion_info_str
                .map(|s: String| serde_json_from_str(&s, 19))
                .transpose()?;
            Some(PaymentDetails::Token {
                metadata: serde_json_from_str(&metadata, 20)?,
                tx_hash: row.get(21)?,
                tx_type,
                invoice_details,
                conversion_info,
            })
        }
        _ => None,
    };
    Ok(Payment {
        id: row.get(0)?,
        // payment_type and status are stored as text and parsed via FromStr;
        // parse failures are reported with the offending column index.
        payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
            rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
        })?,
        status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
            rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
        })?,
        amount: row.get::<_, U128SqlWrapper>(3)?.0,
        fees: row.get::<_, U128SqlWrapper>(4)?.0,
        timestamp: row.get(5)?,
        details,
        method: row.get(6)?,
        // Not persisted by this query; populated elsewhere if at all.
        conversion_details: None,
    })
}
1434
impl ToSql for PaymentDetails {
    /// Serialized into a JSON TEXT column via `to_sql_json`.
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}
1440
impl FromSql for PaymentDetails {
    /// Deserialized from a JSON TEXT column via `from_sql_json`.
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1446
1447impl ToSql for PaymentMethod {
1448    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1449        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1450    }
1451}
1452
1453impl FromSql for PaymentMethod {
1454    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1455        match value {
1456            ValueRef::Text(i) => {
1457                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1458                // NOTE: trim_matches/to_lowercase is here, because this used to be serde_json serialized.
1459                let payment_method: PaymentMethod = s
1460                    .trim_matches('"')
1461                    .to_lowercase()
1462                    .parse()
1463                    .map_err(|()| FromSqlError::InvalidType)?;
1464                Ok(payment_method)
1465            }
1466            _ => Err(FromSqlError::InvalidType),
1467        }
1468    }
1469}
1470
1471impl ToSql for TokenTransactionType {
1472    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1473        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1474    }
1475}
1476
1477impl FromSql for TokenTransactionType {
1478    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1479        match value {
1480            ValueRef::Text(i) => {
1481                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1482                let tx_type: TokenTransactionType =
1483                    s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1484                Ok(tx_type)
1485            }
1486            _ => Err(FromSqlError::InvalidType),
1487        }
1488    }
1489}
1490
1491impl ToSql for SparkHtlcStatus {
1492    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1493        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1494    }
1495}
1496
1497impl FromSql for SparkHtlcStatus {
1498    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1499        match value {
1500            ValueRef::Text(i) => {
1501                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1502                let status: SparkHtlcStatus =
1503                    s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1504                Ok(status)
1505            }
1506            _ => Err(FromSqlError::InvalidType),
1507        }
1508    }
1509}
1510
// `DepositClaimError` is persisted as a JSON TEXT column via `to_sql_json`.
impl ToSql for DepositClaimError {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}
1516
// Reads back the JSON TEXT column written by the matching `ToSql` impl.
impl FromSql for DepositClaimError {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1522
// `LnurlPayInfo` is persisted as a JSON TEXT column via `to_sql_json`.
impl ToSql for LnurlPayInfo {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}
1528
// Reads back the JSON TEXT column written by the matching `ToSql` impl.
impl FromSql for LnurlPayInfo {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1534
// `LnurlWithdrawInfo` is persisted as a JSON TEXT column via `to_sql_json`.
impl ToSql for LnurlWithdrawInfo {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}
1540
// Reads back the JSON TEXT column written by the matching `ToSql` impl.
impl FromSql for LnurlWithdrawInfo {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1546
1547fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1548where
1549    T: serde::Serialize,
1550{
1551    let json = serde_json::to_string(&value)
1552        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1553    Ok(rusqlite::types::ToSqlOutput::from(json))
1554}
1555
1556fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1557where
1558    T: serde::de::DeserializeOwned,
1559{
1560    match value {
1561        ValueRef::Text(i) => {
1562            let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1563            let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1564            Ok(deserialized)
1565        }
1566        _ => Err(FromSqlError::InvalidType),
1567    }
1568}
1569
1570fn serde_json_from_str<T>(value: &str, index: usize) -> Result<T, rusqlite::Error>
1571where
1572    T: serde::de::DeserializeOwned,
1573{
1574    serde_json::from_str(value).map_err(|e| {
1575        rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(e))
1576    })
1577}
1578
/// Newtype for persisting a `u128` as a decimal TEXT column, since SQLite
/// integers are only 64-bit.
struct U128SqlWrapper(u128);
1580
1581impl ToSql for U128SqlWrapper {
1582    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1583        let string = self.0.to_string();
1584        Ok(rusqlite::types::ToSqlOutput::from(string))
1585    }
1586}
1587
1588impl FromSql for U128SqlWrapper {
1589    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1590        match value {
1591            ValueRef::Text(i) => {
1592                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1593                let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1594                Ok(U128SqlWrapper(integer))
1595            }
1596            _ => Err(FromSqlError::InvalidType),
1597        }
1598    }
1599}
1600
#[cfg(test)]
mod tests {

    use crate::SqliteStorage;
    use std::path::PathBuf;

    /// Helper function to create a temporary directory for tests
    /// Uses std library to avoid external dependency
    ///
    /// NOTE(review): directories created here are never removed; cleanup
    /// relies on the OS eventually purging its temp dir.
    fn create_temp_dir(name: &str) -> PathBuf {
        let mut path = std::env::temp_dir();
        // Use UUID for uniqueness to avoid conflicts between parallel tests
        path.push(format!("breez-test-{}-{}", name, uuid::Uuid::new_v4()));
        std::fs::create_dir_all(&path).unwrap();
        path
    }

    #[tokio::test]
    async fn test_storage() {
        let temp_dir = create_temp_dir("sqlite_storage");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        // Box::pin here is presumably to keep this large test future off the
        // stack — TODO confirm against the shared test helper's size.
        Box::pin(crate::persist::tests::test_storage(Box::new(storage))).await;
    }

    #[tokio::test]
    async fn test_unclaimed_deposits_crud() {
        let temp_dir = create_temp_dir("sqlite_storage_deposits");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_deposit_refunds() {
        let temp_dir = create_temp_dir("sqlite_storage_refund_tx");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_type_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_type_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_status_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_status_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_asset_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_asset_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_timestamp_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_timestamp_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_spark_htlc_status_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_htlc_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_lightning_htlc_details_and_status_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_htlc_details");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_lightning_htlc_details_and_status_filtering(Box::new(storage))
            .await;
    }

    #[tokio::test]
    async fn test_conversion_refund_needed_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_conversion_refund_needed_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_conversion_refund_needed_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_token_transaction_type_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_token_transaction_type_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_token_transaction_type_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_combined_filters() {
        let temp_dir = create_temp_dir("sqlite_storage_combined_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_combined_filters(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sort_order() {
        let temp_dir = create_temp_dir("sqlite_storage_sort_order");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_sort_order(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_metadata() {
        let temp_dir = create_temp_dir("sqlite_storage_payment_request_metadata");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_metadata(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_details_update_persistence() {
        let temp_dir = create_temp_dir("sqlite_storage_payment_details_update");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_details_update_persistence(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_pending_lnurl_preimages() {
        let temp_dir = create_temp_dir("sqlite_storage_pending_lnurl_preimages");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_pending_lnurl_preimages(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sync_storage() {
        let temp_dir = create_temp_dir("sqlite_sync_storage");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_sync_storage(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_metadata_merge() {
        let temp_dir = create_temp_dir("sqlite_payment_metadata_merge");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_metadata_merge(Box::new(storage)).await;
    }

    /// Verifies the schema migration that added the `tx_type` column to token
    /// payments: legacy rows get the default `transfer` type, new rows keep
    /// their explicit type, and the type filter works across both.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_migration_tx_type() {
        use crate::{
            Payment, PaymentDetails, PaymentMethod, PaymentStatus, PaymentType, Storage,
            TokenMetadata, TokenTransactionType,
            persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
        };
        use rusqlite::{Connection, params};
        use rusqlite_migration::{M, Migrations};

        let temp_dir = create_temp_dir("sqlite_migration_tx_type");
        let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);

        // Step 1: Create database at version 21 (before tx_type migration)
        {
            let mut conn = Connection::open(&db_path).unwrap();
            let migrations_before_tx_type: Vec<_> = SqliteStorage::current_migrations()
                .iter()
                .take(22) // Migrations 0-21 (index 22 is the tx_type migration)
                .map(|s| M::up(s))
                .collect();
            let migrations = Migrations::new(migrations_before_tx_type);
            migrations.to_latest(&mut conn).unwrap();
        }

        // Step 2: Insert a token payment WITHOUT tx_type column
        {
            let conn = Connection::open(&db_path).unwrap();

            // Insert into payments table
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "token-migration-test",
                    "send",
                    "completed",
                    "5000",
                    "10",
                    1_234_567_890_i64,
                    "\"token\""
                ],
            )
            .unwrap();

            // Insert into payment_details_token WITHOUT tx_type (pre-migration)
            let metadata = serde_json::json!({
                "identifier": "test-token-id",
                "issuer_public_key": format!("02{}", "a".repeat(64)),
                "name": "Test Token",
                "ticker": "TST",
                "decimals": 8,
                "max_supply": 1_000_000_u128,
                "is_freezable": false
            });

            conn.execute(
                "INSERT INTO payment_details_token (payment_id, metadata, tx_hash)
                 VALUES (?, ?, ?)",
                params![
                    "token-migration-test",
                    metadata.to_string(),
                    "0xabcdef1234567890"
                ],
            )
            .unwrap();
        }

        // Step 3: Open with SqliteStorage (triggers migration to latest)
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        // Step 4: Verify the migrated token payment
        let migrated_payment = storage
            .get_payment_by_id("token-migration-test".to_string())
            .await
            .unwrap();

        assert_eq!(migrated_payment.id, "token-migration-test");
        assert_eq!(migrated_payment.amount, 5000);
        assert_eq!(migrated_payment.fees, 10);
        assert_eq!(migrated_payment.status, PaymentStatus::Completed);
        assert_eq!(migrated_payment.payment_type, PaymentType::Send);
        assert_eq!(migrated_payment.method, PaymentMethod::Token);

        // Verify token payment details have the default txType
        match migrated_payment.details {
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                tx_type,
                ..
            }) => {
                assert_eq!(metadata.identifier, "test-token-id");
                assert_eq!(metadata.name, "Test Token");
                assert_eq!(metadata.ticker, "TST");
                assert_eq!(metadata.decimals, 8);
                assert_eq!(tx_hash, "0xabcdef1234567890");
                // Key assertion: migration added default tx_type
                assert_eq!(
                    tx_type,
                    TokenTransactionType::Transfer,
                    "Migration should add default txType 'transfer' to token payments"
                );
            }
            _ => panic!("Expected Token payment details"),
        }

        // Step 5: Insert a new token payment with explicit tx_type
        let new_payment = Payment {
            id: "new-token-payment".to_string(),
            payment_type: PaymentType::Receive,
            status: PaymentStatus::Completed,
            amount: 8000,
            fees: 20,
            timestamp: 1_234_567_891,
            method: PaymentMethod::Token,
            details: Some(PaymentDetails::Token {
                metadata: TokenMetadata {
                    identifier: "another-token-id".to_string(),
                    issuer_public_key: format!("02{}", "b".repeat(64)),
                    name: "Another Token".to_string(),
                    ticker: "ATK".to_string(),
                    decimals: 6,
                    max_supply: 2_000_000,
                    is_freezable: true,
                },
                tx_hash: "0x1111222233334444".to_string(),
                tx_type: TokenTransactionType::Mint,
                invoice_details: None,
                conversion_info: None,
            }),
            conversion_details: None,
        };

        storage.insert_payment(new_payment).await.unwrap();

        // Step 6: List all payments
        let request = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: None,
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let payments = storage.list_payments(request).await.unwrap();
        assert_eq!(payments.len(), 2, "Should have both payments");

        // Verify migrated payment has Transfer type
        let migrated = payments
            .iter()
            .find(|p| p.id == "token-migration-test")
            .unwrap();
        match &migrated.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Transfer);
            }
            _ => panic!("Expected Token payment details"),
        }

        // Verify new payment has Mint type
        let new = payments
            .iter()
            .find(|p| p.id == "new-token-payment")
            .unwrap();
        match &new.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Mint);
            }
            _ => panic!("Expected Token payment details"),
        }

        // Step 7: Test filtering by token transaction type
        let transfer_filter = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Token {
                conversion_refund_needed: None,
                tx_hash: None,
                tx_type: Some(TokenTransactionType::Transfer),
            }]),
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let transfer_payments = storage.list_payments(transfer_filter).await.unwrap();
        assert_eq!(
            transfer_payments.len(),
            1,
            "Should find only the Transfer payment"
        );
        assert_eq!(transfer_payments[0].id, "token-migration-test");
    }

    /// Verifies the backfill migration for Lightning HTLC details: payment
    /// status at migration time maps to an initial `htlc_status`
    /// (completed → PreimageShared, pending → WaitingForPreimage,
    /// failed → Returned) and status filtering works on migrated rows.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_migration_htlc_details() {
        use crate::{
            PaymentDetails, SparkHtlcStatus, Storage,
            persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
        };
        use rusqlite::{Connection, params};
        use rusqlite_migration::{M, Migrations};

        let temp_dir = create_temp_dir("sqlite_migration_htlc_details");
        let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);

        // Step 1: Create database at version 23 (before the htlc_status backfill migration)
        // This includes the ALTER TABLE that adds htlc_status and htlc_expiry_time columns (migration 22)
        // but not the backfill UPDATE (migration 23).
        {
            let mut conn = Connection::open(&db_path).unwrap();
            let migrations_before_backfill: Vec<_> = SqliteStorage::current_migrations()
                .iter()
                .take(23) // Migrations 0-22
                .map(|s| M::up(s))
                .collect();
            let migrations = Migrations::new(migrations_before_backfill);
            migrations.to_latest(&mut conn).unwrap();
        }

        // Step 2: Insert Lightning payments with different statuses to test all branches
        {
            let conn = Connection::open(&db_path).unwrap();

            // Insert a Completed Lightning payment
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-completed",
                    "send",
                    "completed",
                    "1000",
                    "10",
                    1_700_000_001_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, preimage)
                 VALUES (?, ?, ?, ?, ?)",
                params![
                    "ln-completed",
                    "lnbc_completed",
                    "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567",
                    "03pubkey1",
                    "preimage_completed"
                ],
            )
            .unwrap();

            // Insert a Pending Lightning payment
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-pending",
                    "receive",
                    "pending",
                    "2000",
                    "0",
                    1_700_000_002_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
                 VALUES (?, ?, ?, ?)",
                params![
                    "ln-pending",
                    "lnbc_pending",
                    "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678",
                    "03pubkey2"
                ],
            )
            .unwrap();

            // Insert a Failed Lightning payment
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-failed",
                    "send",
                    "failed",
                    "3000",
                    "5",
                    1_700_000_003_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
                 VALUES (?, ?, ?, ?)",
                params![
                    "ln-failed",
                    "lnbc_failed",
                    "hash_failed_0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
                    "03pubkey3"
                ],
            )
            .unwrap();
        }

        // Step 3: Open with SqliteStorage (triggers migration 23 - the backfill)
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        // Step 4: Verify Completed → PreimageShared
        let completed = storage
            .get_payment_by_id("ln-completed".to_string())
            .await
            .unwrap();
        match &completed.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::PreimageShared);
                assert_eq!(htlc_details.expiry_time, 0);
                assert_eq!(
                    htlc_details.payment_hash,
                    "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567"
                );
                assert_eq!(htlc_details.preimage.as_deref(), Some("preimage_completed"));
            }
            _ => panic!("Expected Lightning payment details for ln-completed"),
        }

        // Step 5: Verify Pending → WaitingForPreimage
        let pending = storage
            .get_payment_by_id("ln-pending".to_string())
            .await
            .unwrap();
        match &pending.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::WaitingForPreimage);
                assert_eq!(htlc_details.expiry_time, 0);
                assert_eq!(
                    htlc_details.payment_hash,
                    "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678"
                );
                assert!(htlc_details.preimage.is_none());
            }
            _ => panic!("Expected Lightning payment details for ln-pending"),
        }

        // Step 6: Verify Failed → Returned
        let failed = storage
            .get_payment_by_id("ln-failed".to_string())
            .await
            .unwrap();
        match &failed.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::Returned);
                assert_eq!(htlc_details.expiry_time, 0);
            }
            _ => panic!("Expected Lightning payment details for ln-failed"),
        }

        // Step 7: Verify filtering by htlc_status works on migrated data
        let waiting_payments = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::WaitingForPreimage]),
                    has_lnurl_preimage: None,
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(waiting_payments.len(), 1);
        assert_eq!(waiting_payments[0].id, "ln-pending");

        let preimage_shared = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::PreimageShared]),
                    has_lnurl_preimage: None,
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(preimage_shared.len(), 1);
        assert_eq!(preimage_shared[0].id, "ln-completed");

        let returned = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::Returned]),
                    has_lnurl_preimage: None,
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(returned.len(), 1);
        assert_eq!(returned[0].id, "ln-failed");
    }
}