breez_sdk_spark/persist/
sqlite.rs

1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5    Connection, Row, ToSql, TransactionBehavior, params,
6    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11    AssetFilter, Contact, ConversionInfo, DepositInfo, ListContactsRequest, LnurlPayInfo,
12    LnurlReceiveMetadata, LnurlWithdrawInfo, PaymentDetails, PaymentMethod, SparkHtlcDetails,
13    SparkHtlcStatus, TokenTransactionType,
14    error::DepositClaimError,
15    persist::{
16        PaymentMetadata, SetLnurlMetadataItem, StorageListPaymentsRequest,
17        StoragePaymentDetailsFilter, UpdateDepositPayload,
18    },
19    sync_storage::{
20        IncomingChange, OutgoingChange, Record, RecordChange, RecordId, UnversionedRecordChange,
21    },
22};
23
24use std::collections::HashMap;
25
26use tracing::warn;
27
28use super::{Payment, Storage, StorageError};
29
// Name of the database file created inside the storage directory.
const DEFAULT_DB_FILENAME: &str = "storage.sql";
/// SQLite-based storage implementation
///
/// Holds only the directory containing the database file; a fresh
/// connection is opened per operation (see `get_connection`), no pooling.
pub struct SqliteStorage {
    // Directory in which the `storage.sql` database file lives.
    db_dir: PathBuf,
}
35
impl SqliteStorage {
    /// Creates a new `SQLite` storage
    ///
    /// # Arguments
    ///
    /// * `path` - Directory in which the `SQLite` database file is created
    ///   (the file itself is named by `DEFAULT_DB_FILENAME`)
    ///
    /// # Returns
    ///
    /// A new `SqliteStorage` instance or an error
    pub fn new(path: &Path) -> Result<Self, StorageError> {
        let storage = Self {
            db_dir: path.to_path_buf(),
        };

        // On wasm there is no filesystem to prepare; elsewhere make sure the
        // database directory exists before the first connection is opened.
        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        std::fs::create_dir_all(path)
            .map_err(|e| StorageError::InitializationError(e.to_string()))?;

        storage.migrate()?;
        Ok(storage)
    }

    /// Opens a fresh connection to the database file.
    ///
    /// Connections are not pooled; every storage operation opens its own.
    pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
        Ok(Connection::open(self.get_db_path())?)
    }

    // Full path of the database file inside the storage directory.
    fn get_db_path(&self) -> PathBuf {
        self.db_dir.join(DEFAULT_DB_FILENAME)
    }

    /// Applies all pending schema migrations, then runs version-gated data
    /// backfills that only make sense when upgrading from an older schema.
    fn migrate(&self) -> Result<(), StorageError> {
        let migrations =
            Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
        let mut conn = self.get_connection()?;
        // Capture the schema version *before* migrating so the backfill
        // below knows whether it still needs to run.
        let previous_version = match migrations.current_version(&conn)? {
            SchemaVersion::Inside(previous_version) => previous_version.get(),
            _ => 0,
        };
        migrations.to_latest(&mut conn)?;

        // Schema version 6 added the `lnurl_description` column; populate it
        // from the stored `lnurl_pay_info` JSON for pre-existing databases.
        if previous_version < 6 {
            Self::migrate_lnurl_metadata_description(&mut conn)?;
        }

        Ok(())
    }

    /// Backfills `payment_metadata.lnurl_description` by extracting the
    /// description from each row's serialized `lnurl_pay_info` payload.
    /// Rows without pay info or without an extractable description are left
    /// untouched.
    fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
        let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
        // Materialize all rows first so the SELECT statement is finished
        // before the UPDATEs below run on the same connection.
        let pay_infos: Vec<_> = stmt
            .query_map([], |row| {
                let payment_id: String = row.get(0)?;
                let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
                Ok((payment_id, lnurl_pay_info))
            })?
            .collect::<Result<_, _>>()?;
        let pay_infos = pay_infos
            .into_iter()
            .filter_map(|(payment_id, lnurl_pay_info)| {
                let pay_info = lnurl_pay_info?;
                let description = pay_info.extract_description()?;
                Some((payment_id, description))
            })
            .collect::<Vec<_>>();

        for pay_info in pay_infos {
            conn.execute(
                "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
                params![pay_info.1, pay_info.0],
            )?;
        }

        Ok(())
    }

    /// Ordered, append-only list of schema migrations; each entry's position
    /// corresponds to a schema version (rusqlite_migration derives the
    /// version from the list index). Never edit or reorder existing entries —
    /// only append.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn current_migrations() -> Vec<&'static str> {
        vec![
            "CREATE TABLE IF NOT EXISTS payments (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount INTEGER NOT NULL,
              fees INTEGER NOT NULL,
              timestamp INTEGER NOT NULL,
              details TEXT,
              method TEXT
            );",
            "CREATE TABLE IF NOT EXISTS settings (
              key TEXT PRIMARY KEY,
              value TEXT NOT NULL
            );",
            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
              txid TEXT NOT NULL,
              vout INTEGER NOT NULL,
              amount_sats INTEGER,
              claim_error TEXT,
              refund_tx TEXT,
              refund_tx_id TEXT,
              PRIMARY KEY (txid, vout)
            );",
            "CREATE TABLE IF NOT EXISTS payment_metadata (
              payment_id TEXT PRIMARY KEY,
              lnurl_pay_info TEXT
            );",
            "CREATE TABLE IF NOT EXISTS deposit_refunds (
              deposit_tx_id TEXT NOT NULL,
              deposit_vout INTEGER NOT NULL,
              refund_tx TEXT NOT NULL,
              refund_tx_id TEXT NOT NULL,
              PRIMARY KEY (deposit_tx_id, deposit_vout)              
            );",
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
            "
            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN spark INTEGER;
            CREATE TABLE payment_details_lightning (
              payment_id TEXT PRIMARY KEY,
              invoice TEXT NOT NULL,
              payment_hash TEXT NOT NULL,
              destination_pubkey TEXT NOT NULL,
              description TEXT,
              preimage TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'), 
                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'), 
                json_extract(details, '$.Lightning.preimage') 
            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

            ALTER TABLE payments DROP COLUMN details;

            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
            ",
            "CREATE TABLE payment_details_token (
              payment_id TEXT PRIMARY KEY,
              metadata TEXT NOT NULL,
              tx_hash TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );",
            // Migration to change payments amount and fees from INTEGER to TEXT
            "CREATE TABLE payments_new (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount TEXT NOT NULL,
              fees TEXT NOT NULL,
              timestamp INTEGER NOT NULL,
              method TEXT,
              withdraw_tx_id TEXT,
              deposit_tx_id TEXT,
              spark INTEGER
            );",
            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
             SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
             FROM payments;",
            "DROP TABLE payments;",
            "ALTER TABLE payments_new RENAME TO payments;",
            "CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
            // sync_revision: tracks the last committed revision (from server-acknowledged
            // or server-received records). Does NOT include pending outgoing queue ids.
            // sync_outgoing.revision stores a local queue id for ordering/de-duplication only.
            "CREATE TABLE sync_revision (
                revision INTEGER NOT NULL DEFAULT 0
            );
            INSERT INTO sync_revision (revision) VALUES (0);
            CREATE TABLE sync_outgoing(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                updated_fields_json TEXT NOT NULL,
                revision INTEGER NOT NULL
            );
            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
            CREATE TABLE sync_state(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id)
            );",
            "CREATE TABLE sync_incoming(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id, revision)
            );
            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
            "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
            CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT,
              htlc_details TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_spark (payment_id, invoice_details)
             SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
            DROP TABLE tmp_payment_details_spark;",
            "CREATE TABLE lnurl_receive_metadata (
                payment_hash TEXT NOT NULL PRIMARY KEY,
                nostr_zap_request TEXT,
                nostr_zap_receipt TEXT,
                sender_comment TEXT
            );",
            // Delete all unclaimed deposits to clear old claim_error JSON format.
            // Deposits will be recovered on next sync.
            "DELETE FROM unclaimed_deposits;",
            // Clear all sync tables due to BreezSigner signature change.
            // This forces users to sync from scratch to the sync server.
            // Also delete the sync_initial_complete flag to force re-populating
            // all payment metadata for outgoing sync using the new key.
            "DELETE FROM sync_outgoing;
             DELETE FROM sync_incoming;
             DELETE FROM sync_state;
             UPDATE sync_revision SET revision = 0;
             DELETE FROM settings WHERE key = 'sync_initial_complete';",
            "ALTER TABLE payment_metadata ADD COLUMN token_conversion_info TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN parent_payment_id TEXT;",
            "
            ALTER TABLE payment_metadata DROP COLUMN token_conversion_info;
            ALTER TABLE payment_metadata ADD COLUMN conversion_info TEXT;
            ",
            // Add tx_type column with a default value of 'transfer'.
            // Reset only the token sync position (not bitcoin offset) to trigger token re-sync.
            // This will update all token payment records with the correct tx_type values.
            // Note: This intentionally couples to the CachedSyncInfo schema at migration time.
            "ALTER TABLE payment_details_token ADD COLUMN tx_type TEXT NOT NULL DEFAULT 'transfer';
            UPDATE settings
            SET value = json_set(value, '$.last_synced_final_token_payment_id', NULL)
            WHERE key = 'sync_offset' AND json_valid(value) AND json_type(value, '$.last_synced_final_token_payment_id') IS NOT NULL;",
            "DELETE FROM sync_outgoing;
             DELETE FROM sync_incoming;
             DELETE FROM sync_state;
             UPDATE sync_revision SET revision = 0;
             DELETE FROM settings WHERE key = 'sync_initial_complete';",
            "ALTER TABLE payment_details_lightning ADD COLUMN htlc_status TEXT NOT NULL DEFAULT 'WaitingForPreimage';
             ALTER TABLE payment_details_lightning ADD COLUMN htlc_expiry_time INTEGER NOT NULL DEFAULT 0;",
            // Backfill htlc_status for existing Lightning payments where it's NULL.
            // After this migration, htlc_status is required for all Lightning payments.
            // Also reset the bitcoin sync offset to trigger a full resync, which will
            // correct the backfilled expiry_time values.
            "UPDATE payment_details_lightning
             SET htlc_status = CASE
                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'completed' THEN 'PreimageShared'
                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'pending' THEN 'WaitingForPreimage'
                     ELSE 'Returned'
                 END;
             UPDATE settings
             SET value = json_set(value, '$.offset', 0)
             WHERE key = 'sync_offset' AND json_valid(value);",
            // Add preimage column for LUD-21 and NIP-57 support
            "ALTER TABLE lnurl_receive_metadata ADD COLUMN preimage TEXT;",
            // Clear the lnurl_metadata_updated_after setting to force re-sync
            // This ensures clients get the new preimage field from the server
            "DELETE FROM settings WHERE key = 'lnurl_metadata_updated_after';",
            // Clear cached lightning address - schema changed from string to LnurlInfo struct
            "DELETE FROM settings WHERE key = 'lightning_address';",
            // Add index on payment_hash for JOIN with lnurl_receive_metadata
            "CREATE INDEX IF NOT EXISTS idx_payment_details_lightning_payment_hash ON payment_details_lightning(payment_hash);",
            "CREATE TABLE contacts (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                payment_identifier TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            );",
        ]
    }
}
326
327/// Maps a rusqlite error to the appropriate `StorageError`.
328/// Database busy/locked errors are mapped to `Connection` (transient),
329/// other errors are mapped to `Implementation`.
330#[allow(clippy::needless_pass_by_value)]
331fn map_sqlite_error(e: rusqlite::Error) -> StorageError {
332    match e {
333        rusqlite::Error::SqliteFailure(err, _)
334            if err.code == rusqlite::ErrorCode::DatabaseBusy
335                || err.code == rusqlite::ErrorCode::DatabaseLocked =>
336        {
337            StorageError::Connection(e.to_string())
338        }
339        _ => StorageError::Implementation(e.to_string()),
340    }
341}
342
/// Converts rusqlite errors via [`map_sqlite_error`], preserving the
/// transient (busy/locked) vs. permanent distinction.
impl From<rusqlite::Error> for StorageError {
    fn from(value: rusqlite::Error) -> Self {
        map_sqlite_error(value)
    }
}
348
349impl From<rusqlite_migration::Error> for StorageError {
350    fn from(value: rusqlite_migration::Error) -> Self {
351        StorageError::Implementation(value.to_string())
352    }
353}
354
355#[async_trait]
356impl Storage for SqliteStorage {
    /// Lists payments matching the filters in `request`.
    ///
    /// Builds a dynamic WHERE clause from the optional filters (payment
    /// type, status, timestamp range, asset, and payment-details filters)
    /// and applies sorting and pagination. Payment-details filters are
    /// OR-ed together (any match includes the payment), while all other
    /// filter groups are AND-ed. Child payments — rows whose metadata has a
    /// `parent_payment_id` — are always excluded.
    ///
    /// NOTE(review): relies on `SELECT_PAYMENT_SQL` (defined elsewhere in
    /// this file) joining aliases `p` (payments), `t` (token details),
    /// `s` (spark details), `l` (lightning details), `pm` (metadata) and
    /// `lrm` (lnurl receive metadata) — keep the aliases in sync.
    #[allow(clippy::too_many_lines)]
    async fn list_payments(
        &self,
        request: StorageListPaymentsRequest,
    ) -> Result<Vec<Payment>, StorageError> {
        let connection = self.get_connection()?;

        // Build WHERE clauses based on filters. `params` collects the bound
        // values in the same order the `?` placeholders are appended.
        let mut where_clauses = Vec::new();
        let mut params: Vec<Box<dyn ToSql>> = Vec::new();

        // Filter by payment type
        if let Some(ref type_filter) = request.type_filter
            && !type_filter.is_empty()
        {
            let placeholders = type_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.payment_type IN ({placeholders})"));
            for payment_type in type_filter {
                params.push(Box::new(payment_type.to_string()));
            }
        }

        // Filter by status
        if let Some(ref status_filter) = request.status_filter
            && !status_filter.is_empty()
        {
            let placeholders = status_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.status IN ({placeholders})"));
            for status in status_filter {
                params.push(Box::new(status.to_string()));
            }
        }

        // Filter by timestamp range: inclusive lower bound, exclusive upper.
        if let Some(from_timestamp) = request.from_timestamp {
            where_clauses.push("p.timestamp >= ?".to_string());
            params.push(Box::new(from_timestamp));
        }

        if let Some(to_timestamp) = request.to_timestamp {
            where_clauses.push("p.timestamp < ?".to_string());
            params.push(Box::new(to_timestamp));
        }

        // Filter by asset: Bitcoin payments have no token metadata row.
        if let Some(ref asset_filter) = request.asset_filter {
            match asset_filter {
                AssetFilter::Bitcoin => {
                    where_clauses.push("t.metadata IS NULL".to_string());
                }
                AssetFilter::Token { token_identifier } => {
                    where_clauses.push("t.metadata IS NOT NULL".to_string());
                    if let Some(identifier) = token_identifier {
                        // Filter by specific token identifier
                        where_clauses
                            .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
                        params.push(Box::new(identifier.clone()));
                    }
                }
            }
        }

        // Filter by payment details. If any filter matches, we include the payment
        if let Some(ref payment_details_filter) = request.payment_details_filter {
            let mut all_payment_details_clauses = Vec::new();
            for payment_details_filter in payment_details_filter {
                let mut payment_details_clauses = Vec::new();
                // Filter by HTLC status (Spark or Lightning)
                let htlc_filter = match payment_details_filter {
                    StoragePaymentDetailsFilter::Spark {
                        htlc_status: Some(s),
                        ..
                    } if !s.is_empty() => Some(("s", s)),
                    StoragePaymentDetailsFilter::Lightning {
                        htlc_status: Some(s),
                        ..
                    } if !s.is_empty() => Some(("l", s)),
                    _ => None,
                };
                if let Some((alias, htlc_statuses)) = htlc_filter {
                    let placeholders = htlc_statuses
                        .iter()
                        .map(|_| "?")
                        .collect::<Vec<_>>()
                        .join(", ");
                    if alias == "l" {
                        // Lightning: htlc_status is a direct column
                        payment_details_clauses.push(format!("l.htlc_status IN ({placeholders})"));
                    } else {
                        // Spark: htlc_details is still JSON
                        payment_details_clauses.push(format!(
                            "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
                        ));
                    }
                    for htlc_status in htlc_statuses {
                        params.push(Box::new(htlc_status.to_string()));
                    }
                }
                // Filter by conversion info presence
                // (`p.spark = 1` distinguishes Spark rows from Token rows,
                // which have `spark` NULL — see insert_payment.)
                let conversion_filter = match payment_details_filter {
                    StoragePaymentDetailsFilter::Spark {
                        conversion_refund_needed: Some(v),
                        ..
                    } => Some((v, "p.spark = 1")),
                    StoragePaymentDetailsFilter::Token {
                        conversion_refund_needed: Some(v),
                        ..
                    } => Some((v, "p.spark IS NULL")),
                    _ => None,
                };
                if let Some((conversion_refund_needed, type_check)) = conversion_filter {
                    let refund_needed = if *conversion_refund_needed {
                        "= 'RefundNeeded'"
                    } else {
                        "!= 'RefundNeeded'"
                    };
                    payment_details_clauses.push(format!(
                        "{type_check} AND pm.conversion_info IS NOT NULL AND
                         json_extract(pm.conversion_info, '$.status') {refund_needed}"
                    ));
                }
                // Filter by token transaction hash
                if let StoragePaymentDetailsFilter::Token {
                    tx_hash: Some(tx_hash),
                    ..
                } = payment_details_filter
                {
                    payment_details_clauses.push("t.tx_hash = ?".to_string());
                    params.push(Box::new(tx_hash.clone()));
                }

                // Filter by token transaction type
                if let StoragePaymentDetailsFilter::Token {
                    tx_type: Some(tx_type),
                    ..
                } = payment_details_filter
                {
                    payment_details_clauses.push("t.tx_type = ?".to_string());
                    params.push(Box::new(tx_type.to_string()));
                }

                // Filter by LNURL preimage status
                if let StoragePaymentDetailsFilter::Lightning {
                    has_lnurl_preimage: Some(has_preimage),
                    ..
                } = payment_details_filter
                {
                    if *has_preimage {
                        payment_details_clauses.push("lrm.preimage IS NOT NULL".to_string());
                    } else {
                        // Has lnurl metadata, lightning preimage exists, but lnurl preimage not yet sent
                        payment_details_clauses.push(
                            "lrm.payment_hash IS NOT NULL AND l.preimage IS NOT NULL AND lrm.preimage IS NULL".to_string(),
                        );
                    }
                }

                // Conditions within one details filter are AND-ed...
                if !payment_details_clauses.is_empty() {
                    all_payment_details_clauses
                        .push(format!("({})", payment_details_clauses.join(" AND ")));
                }
            }

            // ...while the filters themselves are OR-ed together.
            if !all_payment_details_clauses.is_empty() {
                where_clauses.push(format!("({})", all_payment_details_clauses.join(" OR ")));
            }
        }

        // Exclude child payments (those with a parent_payment_id)
        // Child payments are accessed via the parent's related_payments field
        where_clauses.push("pm.parent_payment_id IS NULL".to_string());

        // Build the WHERE clause
        let where_sql = if where_clauses.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", where_clauses.join(" AND "))
        };

        // Determine sort order (newest first by default)
        let order_direction = if request.sort_ascending.unwrap_or(false) {
            "ASC"
        } else {
            "DESC"
        };

        // LIMIT/OFFSET are interpolated directly; safe because both are u32.
        let query = format!(
            "{SELECT_PAYMENT_SQL} {where_sql} ORDER BY p.timestamp {order_direction} LIMIT {} OFFSET {}",
            request.limit.unwrap_or(u32::MAX),
            request.offset.unwrap_or(0)
        );

        let mut stmt = connection.prepare(&query)?;
        let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
        let payments = stmt
            .query_map(param_refs.as_slice(), map_payment)?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(payments)
    }
564
    /// Inserts or updates a payment and its type-specific detail row.
    ///
    /// Runs inside an IMMEDIATE transaction so the main `payments` row and
    /// the matching `payment_details_*` row are committed atomically.
    /// Detail upserts use COALESCE so that a NULL in the new data never
    /// overwrites a previously stored value (e.g. a known preimage).
    #[allow(clippy::too_many_lines)]
    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction_with_behavior(TransactionBehavior::Immediate)?;

        // Compute detail columns for the main payments row
        let (withdraw_tx_id, deposit_tx_id, spark): (Option<&str>, Option<&str>, Option<bool>) =
            match &payment.details {
                Some(PaymentDetails::Withdraw { tx_id }) => (Some(tx_id.as_str()), None, None),
                Some(PaymentDetails::Deposit { tx_id }) => (None, Some(tx_id.as_str()), None),
                Some(PaymentDetails::Spark { .. }) => (None, None, Some(true)),
                _ => (None, None, None),
            };

        // Insert or update main payment record (including detail columns atomically)
        tx.execute(
            "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
             ON CONFLICT(id) DO UPDATE SET
                payment_type=excluded.payment_type,
                status=excluded.status,
                amount=excluded.amount,
                fees=excluded.fees,
                timestamp=excluded.timestamp,
                method=excluded.method,
                withdraw_tx_id=excluded.withdraw_tx_id,
                deposit_tx_id=excluded.deposit_tx_id,
                spark=excluded.spark",
            params![
                payment.id,
                payment.payment_type.to_string(),
                payment.status.to_string(),
                // u128 amounts are stored as TEXT via the wrapper's ToSql impl
                U128SqlWrapper(payment.amount),
                U128SqlWrapper(payment.fees),
                payment.timestamp,
                payment.method,
                withdraw_tx_id,
                deposit_tx_id,
                spark,
            ],
        )?;

        // Persist the type-specific detail row, if any.
        match payment.details {
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
                ..
            }) => {
                if invoice_details.is_some() || htlc_details.is_some() {
                    // Upsert both details together and avoid overwriting existing data with NULLs
                    tx.execute(
                        "INSERT INTO payment_details_spark (payment_id, invoice_details, htlc_details)
                         VALUES (?, ?, ?)
                         ON CONFLICT(payment_id) DO UPDATE SET
                            invoice_details=COALESCE(excluded.invoice_details, payment_details_spark.invoice_details),
                            htlc_details=COALESCE(excluded.htlc_details, payment_details_spark.htlc_details)",
                        params![
                            payment.id,
                            invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                            htlc_details.as_ref().map(serde_json::to_string).transpose()?,
                        ],
                    )?;
                }
            }
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                tx_type,
                invoice_details,
                ..
            }) => {
                tx.execute(
                    "INSERT INTO payment_details_token (payment_id, metadata, tx_hash, tx_type, invoice_details)
                     VALUES (?, ?, ?, ?, ?)
                     ON CONFLICT(payment_id) DO UPDATE SET 
                        metadata=excluded.metadata,
                        tx_hash=excluded.tx_hash,
                        tx_type=excluded.tx_type,
                        invoice_details=COALESCE(excluded.invoice_details, payment_details_token.invoice_details)",
                    params![
                        payment.id,
                        serde_json::to_string(&metadata)?,
                        tx_hash,
                        tx_type.to_string(),
                        invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                    ],
                )?;
            }
            Some(PaymentDetails::Lightning {
                invoice,
                destination_pubkey,
                description,
                htlc_details,
                ..
            }) => {
                tx.execute(
                    "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage, htlc_status, htlc_expiry_time)
                     VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                     ON CONFLICT(payment_id) DO UPDATE SET
                        invoice=excluded.invoice,
                        payment_hash=excluded.payment_hash,
                        destination_pubkey=excluded.destination_pubkey,
                        description=excluded.description,
                        preimage=COALESCE(excluded.preimage, payment_details_lightning.preimage),
                        htlc_status=COALESCE(excluded.htlc_status, payment_details_lightning.htlc_status),
                        htlc_expiry_time=COALESCE(excluded.htlc_expiry_time, payment_details_lightning.htlc_expiry_time)",
                    params![
                        payment.id,
                        invoice,
                        htlc_details.payment_hash,
                        destination_pubkey,
                        description,
                        htlc_details.preimage,
                        htlc_details.status.to_string(),
                        htlc_details.expiry_time,
                    ],
                )?;
            }
            // Withdraw/Deposit details live entirely in the payments row
            // (withdraw_tx_id / deposit_tx_id columns set above).
            Some(PaymentDetails::Withdraw { .. } | PaymentDetails::Deposit { .. }) | None => {}
        }

        tx.commit()?;
        Ok(())
    }
689
    /// Inserts or merges auxiliary metadata for a payment.
    ///
    /// On conflict each column is merged with `COALESCE(excluded.x, x)`, so a
    /// later call that carries only some fields never clears values written by
    /// an earlier call. In the DO UPDATE arms the unqualified column name
    /// refers to the existing row, `excluded.x` to the incoming value.
    async fn insert_payment_metadata(
        &self,
        payment_id: String,
        metadata: PaymentMetadata,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;

        connection.execute(
            "INSERT INTO payment_metadata (payment_id, parent_payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description, conversion_info)
             VALUES (?, ?, ?, ?, ?, ?)
             ON CONFLICT(payment_id) DO UPDATE SET
                parent_payment_id = COALESCE(excluded.parent_payment_id, parent_payment_id),
                lnurl_pay_info = COALESCE(excluded.lnurl_pay_info, lnurl_pay_info),
                lnurl_withdraw_info = COALESCE(excluded.lnurl_withdraw_info, lnurl_withdraw_info),
                lnurl_description = COALESCE(excluded.lnurl_description, lnurl_description),
                conversion_info = COALESCE(excluded.conversion_info, conversion_info)",
            params![
                payment_id,
                metadata.parent_payment_id,
                metadata.lnurl_pay_info,
                metadata.lnurl_withdraw_info,
                metadata.lnurl_description,
                // conversion_info is persisted as a JSON string; None stays NULL.
                metadata.conversion_info.as_ref().map(serde_json::to_string).transpose()?,
            ],
        )?;

        Ok(())
    }
718
719    async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
720        let connection = self.get_connection()?;
721
722        connection.execute(
723            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
724            params![key, value],
725        )?;
726
727        Ok(())
728    }
729
730    async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
731        let connection = self.get_connection()?;
732
733        let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
734
735        let result = stmt.query_row(params![key], |row| {
736            let value_str: String = row.get(0)?;
737            Ok(value_str)
738        });
739
740        match result {
741            Ok(value) => Ok(Some(value)),
742            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
743            Err(e) => Err(e.into()),
744        }
745    }
746
747    async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
748        let connection = self.get_connection()?;
749
750        connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
751
752        Ok(())
753    }
754
755    async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
756        let connection = self.get_connection()?;
757        let query = format!("{SELECT_PAYMENT_SQL} WHERE p.id = ?");
758        let mut stmt = connection.prepare(&query)?;
759        let payment = stmt.query_row(params![id], map_payment)?;
760        Ok(payment)
761    }
762
763    async fn get_payment_by_invoice(
764        &self,
765        invoice: String,
766    ) -> Result<Option<Payment>, StorageError> {
767        let connection = self.get_connection()?;
768        let query = format!("{SELECT_PAYMENT_SQL} WHERE l.invoice = ?");
769        let mut stmt = connection.prepare(&query)?;
770        let payment = stmt.query_row(params![invoice], map_payment);
771        match payment {
772            Ok(payment) => Ok(Some(payment)),
773            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
774            Err(e) => Err(e.into()),
775        }
776    }
777
    /// Groups related payments under their parent payment ids.
    ///
    /// Returns a map from parent id to its child payments, children ordered by
    /// timestamp ascending. Parents with no children are simply absent from
    /// the map; an empty input returns an empty map without touching the db.
    async fn get_payments_by_parent_ids(
        &self,
        parent_payment_ids: Vec<String>,
    ) -> Result<HashMap<String, Vec<Payment>>, StorageError> {
        if parent_payment_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let connection = self.get_connection()?;

        // Early exit if no related payments exist
        let has_related: bool = connection.query_row(
            "SELECT EXISTS(SELECT 1 FROM payment_metadata WHERE parent_payment_id IS NOT NULL LIMIT 1)",
            [],
            |row| row.get(0),
        )?;
        if !has_related {
            return Ok(HashMap::new());
        }

        // Build the IN clause with placeholders
        let placeholders: Vec<&str> = parent_payment_ids.iter().map(|_| "?").collect();
        let in_clause = placeholders.join(", ");

        let query = format!(
            "{SELECT_PAYMENT_SQL} WHERE pm.parent_payment_id IN ({in_clause}) ORDER BY p.timestamp ASC"
        );

        let mut stmt = connection.prepare(&query)?;
        let params: Vec<&dyn ToSql> = parent_payment_ids
            .iter()
            .map(|id| id as &dyn ToSql)
            .collect();
        let rows = stmt.query_map(params.as_slice(), |row| {
            let payment = map_payment(row)?;
            // Column 30 (pm.parent_payment_id) is selected by
            // SELECT_PAYMENT_SQL solely for this grouping query.
            let parent_payment_id: String = row.get(30)?;
            Ok((parent_payment_id, payment))
        })?;

        let mut result: HashMap<String, Vec<Payment>> = HashMap::new();
        for row in rows {
            let (parent_id, related_payment) = row?;
            result.entry(parent_id).or_default().push(related_payment);
        }

        Ok(result)
    }
825
826    async fn add_deposit(
827        &self,
828        txid: String,
829        vout: u32,
830        amount_sats: u64,
831    ) -> Result<(), StorageError> {
832        let connection = self.get_connection()?;
833        connection.execute(
834            "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats) 
835             VALUES (?, ?, ?)",
836            params![txid, vout, amount_sats,],
837        )?;
838        Ok(())
839    }
840
841    async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
842        let connection = self.get_connection()?;
843        connection.execute(
844            "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
845            params![txid, vout],
846        )?;
847        Ok(())
848    }
849
850    async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
851        let connection = self.get_connection()?;
852        let mut stmt =
853            connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
854        let rows = stmt.query_map(params![], |row| {
855            Ok(DepositInfo {
856                txid: row.get(0)?,
857                vout: row.get(1)?,
858                amount_sats: row.get(2)?,
859                claim_error: row.get(3)?,
860                refund_tx: row.get(4)?,
861                refund_tx_id: row.get(5)?,
862            })
863        })?;
864        let mut deposits = Vec::new();
865        for row in rows {
866            deposits.push(row?);
867        }
868        Ok(deposits)
869    }
870
    /// Updates the claim state of an unclaimed deposit.
    ///
    /// The two payload variants are mutually exclusive by design: recording a
    /// claim error clears any previously stored refund transaction, and
    /// recording a refund clears any previously stored claim error.
    async fn update_deposit(
        &self,
        txid: String,
        vout: u32,
        payload: UpdateDepositPayload,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;
        match payload {
            UpdateDepositPayload::ClaimError { error } => {
                connection.execute(
                    "UPDATE unclaimed_deposits SET claim_error = ?, refund_tx = NULL, refund_tx_id = NULL WHERE txid = ? AND vout = ?",
                    params![error, txid, vout],
                )?;
            }
            UpdateDepositPayload::Refund {
                refund_txid,
                refund_tx,
            } => {
                connection.execute(
                    "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ?, claim_error = NULL WHERE txid = ? AND vout = ?",
                    params![refund_tx, refund_txid, txid, vout],
                )?;
            }
        }
        Ok(())
    }
897
898    async fn set_lnurl_metadata(
899        &self,
900        metadata: Vec<SetLnurlMetadataItem>,
901    ) -> Result<(), StorageError> {
902        let connection = self.get_connection()?;
903        for metadata in metadata {
904            connection.execute(
905                "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment, preimage)
906                 VALUES (?, ?, ?, ?, ?)",
907                params![
908                    metadata.payment_hash,
909                    metadata.nostr_zap_request,
910                    metadata.nostr_zap_receipt,
911                    metadata.sender_comment,
912                    metadata.preimage,
913                ],
914            )?;
915        }
916        Ok(())
917    }
918
919    async fn list_contacts(
920        &self,
921        request: ListContactsRequest,
922    ) -> Result<Vec<Contact>, StorageError> {
923        let limit = request.limit.unwrap_or(u32::MAX);
924        let offset = request.offset.unwrap_or(0);
925        let connection = self.get_connection()?;
926        let query = "SELECT id, name, payment_identifier, created_at, updated_at FROM contacts ORDER BY name ASC LIMIT ? OFFSET ?";
927
928        let mut stmt = connection.prepare(query)?;
929        let contacts = stmt
930            .query_map(params![limit, offset], |row| {
931                Ok(Contact {
932                    id: row.get(0)?,
933                    name: row.get(1)?,
934                    payment_identifier: row.get(2)?,
935                    created_at: row.get(3)?,
936                    updated_at: row.get(4)?,
937                })
938            })?
939            .collect::<Result<Vec<_>, _>>()?;
940        Ok(contacts)
941    }
942
943    async fn get_contact(&self, id: String) -> Result<Contact, StorageError> {
944        let connection = self.get_connection()?;
945        let mut stmt = connection.prepare(
946            "SELECT id, name, payment_identifier, created_at, updated_at FROM contacts WHERE id = ?",
947        )?;
948        stmt.query_row(params![id], |row| {
949            Ok(Contact {
950                id: row.get(0)?,
951                name: row.get(1)?,
952                payment_identifier: row.get(2)?,
953                created_at: row.get(3)?,
954                updated_at: row.get(4)?,
955            })
956        })
957        .map_err(|e| match e {
958            rusqlite::Error::QueryReturnedNoRows => StorageError::NotFound,
959            other => other.into(),
960        })
961    }
962
963    async fn insert_contact(&self, contact: Contact) -> Result<(), StorageError> {
964        let connection = self.get_connection()?;
965        connection.execute(
966            "INSERT INTO contacts (id, name, payment_identifier, created_at, updated_at)
967             VALUES (?, ?, ?, ?, ?)
968             ON CONFLICT(id) DO UPDATE SET
969               name = excluded.name,
970               payment_identifier = excluded.payment_identifier,
971               updated_at = excluded.updated_at",
972            params![
973                contact.id,
974                contact.name,
975                contact.payment_identifier,
976                contact.created_at,
977                contact.updated_at,
978            ],
979        )?;
980        Ok(())
981    }
982
983    async fn delete_contact(&self, id: String) -> Result<(), StorageError> {
984        let connection = self.get_connection()?;
985        connection.execute("DELETE FROM contacts WHERE id = ?", params![id])?;
986        Ok(())
987    }
988
    /// Queues a local record change for upload to the sync server.
    ///
    /// Returns the queue position (`local_revision`) assigned to the change.
    /// The `MAX(revision) + 1` read and the insert happen inside one immediate
    /// (write-locking) transaction so two concurrent writers cannot be handed
    /// the same local revision.
    async fn add_outgoing_change(
        &self,
        record: UnversionedRecordChange,
    ) -> Result<u64, StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        // This revision is a local queue id for pending rows, not a server revision.
        let local_revision: u64 = tx
            .query_row(
                "SELECT COALESCE(MAX(revision), 0) + 1 FROM sync_outgoing",
                [],
                |row| row.get(0),
            )
            .map_err(map_sqlite_error)?;

        tx.execute(
            "INSERT INTO sync_outgoing (
                record_type
            ,   data_id
            ,   schema_version
            ,   commit_time
            ,   updated_fields_json
            ,   revision
            )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.updated_fields)?,
                local_revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(local_revision)
    }
1030
    /// Finalizes a successfully uploaded outgoing change.
    ///
    /// In one immediate transaction: removes the queued row (matched by its
    /// local revision so a newer queued change for the same record survives),
    /// stores the server-acknowledged record as the new synced state, and
    /// bumps the global `sync_revision` watermark (MAX keeps it monotonic).
    async fn complete_outgoing_sync(
        &self,
        record: Record,
        local_revision: u64,
    ) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        let rows_deleted = tx
            .execute(
                "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
                params![record.id.r#type, record.id.data_id, local_revision],
            )
            .map_err(map_sqlite_error)?;

        // A zero-row delete is unexpected but not fatal: log and still record
        // the synced state below.
        if rows_deleted == 0 {
            warn!(
                "complete_outgoing_sync: DELETE from sync_outgoing matched 0 rows \
                 (type={}, data_id={}, revision={})",
                record.id.r#type, record.id.data_id, local_revision
            );
        }

        tx.execute(
            "INSERT OR REPLACE INTO sync_state (
                record_type
            ,   data_id
            ,   schema_version
            ,   commit_time
            ,   data
            ,   revision
            )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.data)?,
                record.revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        tx.execute(
            "UPDATE sync_revision SET revision = MAX(revision, ?)",
            params![record.revision],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }
1085
    /// Returns up to `limit` queued outgoing changes, oldest (lowest local
    /// revision) first.
    ///
    /// Each change is paired with `parent`: the last synced state of the same
    /// record from `sync_state`, or `None` if the record has never been
    /// synced. Column mapping is positional: 0-5 come from `sync_outgoing`
    /// (record_type, data_id, schema_version, commit_time,
    /// updated_fields_json, revision) and 6-9 from the joined `sync_state`
    /// row (schema_version, commit_time, data, revision).
    async fn get_pending_outgoing_changes(
        &self,
        limit: u32,
    ) -> Result<Vec<OutgoingChange>, StorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
            ,       o.data_id
            ,       o.schema_version
            ,       o.commit_time
            ,       o.updated_fields_json
            ,       o.revision
            ,       e.schema_version AS existing_schema_version
            ,       e.commit_time AS existing_commit_time
            ,       e.data AS existing_data
            ,       e.revision AS existing_revision
             FROM sync_outgoing o
             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
             ORDER BY o.revision ASC
             LIMIT ?",
            )
            .map_err(map_sqlite_error)?;
        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();
        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            // A NULL existing_data (no sync_state row) means no parent state.
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                local_revision: row.get(5).map_err(map_sqlite_error)?,
            };
            results.push(OutgoingChange { change, parent });
        }

        Ok(results)
    }
1144
1145    async fn get_last_revision(&self) -> Result<u64, StorageError> {
1146        let connection = self.get_connection()?;
1147
1148        let revision: u64 = connection
1149            .query_row("SELECT revision FROM sync_revision", [], |row| row.get(0))
1150            .map_err(map_sqlite_error)?;
1151
1152        Ok(revision)
1153    }
1154
    /// Stores a batch of server records in the incoming sync queue.
    ///
    /// All rows are written atomically inside one immediate transaction;
    /// `INSERT OR REPLACE` lets a newer copy of a record overwrite an
    /// already-queued one. An empty batch returns without touching the db.
    async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), StorageError> {
        if records.is_empty() {
            return Ok(());
        }

        let mut connection = self.get_connection()?;
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        for record in records {
            tx.execute(
                "INSERT OR REPLACE INTO sync_incoming (
                    record_type
                ,   data_id
                ,   schema_version
                ,   commit_time
                ,   data
                ,   revision
                )
                 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
                params![
                    record.id.r#type,
                    record.id.data_id,
                    record.schema_version.clone(),
                    serde_json::to_string(&record.data)?,
                    record.revision,
                ],
            )
            .map_err(map_sqlite_error)?;
        }

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }
1190
1191    async fn delete_incoming_record(&self, record: Record) -> Result<(), StorageError> {
1192        let connection = self.get_connection()?;
1193
1194        connection
1195            .execute(
1196                "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
1197                params![record.id.r#type, record.id.data_id, record.revision],
1198            )
1199            .map_err(map_sqlite_error)?;
1200
1201        Ok(())
1202    }
1203
    /// Returns up to `limit` queued incoming server records, lowest revision
    /// first, each paired with the locally synced state of the same record.
    ///
    /// Column mapping is positional and differs from the outgoing variant:
    /// 0-4 come from `sync_incoming` (record_type, data_id, schema_version,
    /// data, revision — no commit_time selected) and 5-8 from the joined
    /// `sync_state` row (schema_version, commit_time, data, revision).
    async fn get_incoming_records(&self, limit: u32) -> Result<Vec<IncomingChange>, StorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare(
                "SELECT i.record_type
            ,       i.data_id
            ,       i.schema_version
            ,       i.data
            ,       i.revision
            ,       e.schema_version AS existing_schema_version
            ,       e.commit_time AS existing_commit_time
            ,       e.data AS existing_data
            ,       e.revision AS existing_revision
             FROM sync_incoming i
             LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
             ORDER BY i.revision ASC
             LIMIT ?",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();

        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            // NULL existing_data (no sync_state row) means no prior local state.
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(5).map_err(map_sqlite_error)?,
                    revision: row.get(8).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let record = Record {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
                revision: row.get(4).map_err(map_sqlite_error)?,
            };
            results.push(IncomingChange {
                new_state: record,
                old_state: parent,
            });
        }

        Ok(results)
    }
1261
    /// Returns the most recently queued outgoing change (highest local
    /// revision), or `None` when the outgoing queue is empty.
    ///
    /// Same query and positional column layout as
    /// `get_pending_outgoing_changes` (0-5 from `sync_outgoing`, 6-9 from the
    /// joined `sync_state` row), just ordered DESC with LIMIT 1.
    async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, StorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
            ,       o.data_id
            ,       o.schema_version
            ,       o.commit_time
            ,       o.updated_fields_json
            ,       o.revision
            ,       e.schema_version AS existing_schema_version
            ,       e.commit_time AS existing_commit_time
            ,       e.data AS existing_data
            ,       e.revision AS existing_revision
             FROM sync_outgoing o
             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
             ORDER BY o.revision DESC
             LIMIT 1",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query([]).map_err(map_sqlite_error)?;

        if let Some(row) = rows.next().map_err(map_sqlite_error)? {
            // NULL existing_data (no sync_state row) means no parent state.
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                local_revision: row.get(5).map_err(map_sqlite_error)?,
            };

            return Ok(Some(OutgoingChange { change, parent }));
        }

        Ok(None)
    }
1319
    /// Applies an incoming server record as the new locally synced state.
    ///
    /// In one immediate transaction: upserts the record into `sync_state`
    /// and raises the global `sync_revision` watermark (MAX keeps it from
    /// ever moving backwards).
    async fn update_record_from_incoming(&self, record: Record) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        tx.execute(
            "INSERT OR REPLACE INTO sync_state (
                record_type
            ,   data_id
            ,   schema_version
            ,   commit_time
            ,   data
            ,   revision
            )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.data)?,
                record.revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        tx.execute(
            "UPDATE sync_revision SET revision = MAX(revision, ?)",
            params![record.revision],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }
1355}
1356
/// Base query for payment lookups.
///
/// Joins the core `payments` row with its per-method detail tables
/// (lightning, token, spark), the shared `payment_metadata` table, and the
/// LNURL receive metadata keyed by the lightning payment hash.
/// Column indices 0-29 are used by `map_payment`, index 30 (`parent_payment_id`) is only used by `get_payments_by_parent_ids`.
const SELECT_PAYMENT_SQL: &str = "
    SELECT p.id,
           p.payment_type,
           p.status,
           p.amount,
           p.fees,
           p.timestamp,
           p.method,
           p.withdraw_tx_id,
           p.deposit_tx_id,
           p.spark,
           l.invoice AS lightning_invoice,
           l.payment_hash AS lightning_payment_hash,
           l.destination_pubkey AS lightning_destination_pubkey,
           COALESCE(l.description, pm.lnurl_description) AS lightning_description,
           l.preimage AS lightning_preimage,
           l.htlc_status AS lightning_htlc_status,
           l.htlc_expiry_time AS lightning_htlc_expiry_time,
           pm.lnurl_pay_info,
           pm.lnurl_withdraw_info,
           pm.conversion_info,
           t.metadata AS token_metadata,
           t.tx_hash AS token_tx_hash,
           t.tx_type AS token_tx_type,
           t.invoice_details AS token_invoice_details,
           s.invoice_details AS spark_invoice_details,
           s.htlc_details AS spark_htlc_details,
           lrm.nostr_zap_request AS lnurl_nostr_zap_request,
           lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt,
           lrm.sender_comment AS lnurl_sender_comment,
           lrm.payment_hash AS lnurl_payment_hash,
           pm.parent_payment_id
      FROM payments p
      LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
      LEFT JOIN payment_details_token t ON p.id = t.payment_id
      LEFT JOIN payment_details_spark s ON p.id = s.payment_id
      LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
      LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash";
1397
/// Converts one row produced by `SELECT_PAYMENT_SQL` into a `Payment`.
///
/// The payment method is inferred from which detail columns are non-NULL,
/// checked in priority order: lightning invoice, withdraw tx, deposit tx,
/// spark marker, token metadata; if none match, `details` is `None`.
/// Column indices are positional and must stay in lockstep with
/// `SELECT_PAYMENT_SQL`.
#[allow(clippy::too_many_lines)]
fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
    // Discriminator columns, one per payment method.
    let withdraw_tx_id: Option<String> = row.get(7)?;
    let deposit_tx_id: Option<String> = row.get(8)?;
    let spark: Option<i32> = row.get(9)?;
    let lightning_invoice: Option<String> = row.get(10)?;
    let token_metadata: Option<String> = row.get(20)?;
    let details = match (
        lightning_invoice,
        withdraw_tx_id,
        deposit_tx_id,
        spark,
        token_metadata,
    ) {
        (Some(invoice), _, _, _, _) => {
            let payment_hash: String = row.get(11)?;
            let destination_pubkey: String = row.get(12)?;
            let description: Option<String> = row.get(13)?;
            let preimage: Option<String> = row.get(14)?;
            // htlc_status is nullable in the schema but mandatory for a
            // lightning payment, so a NULL here is a conversion error.
            let htlc_status: SparkHtlcStatus =
                row.get::<_, Option<SparkHtlcStatus>>(15)?.ok_or_else(|| {
                    rusqlite::Error::FromSqlConversionFailure(
                        15,
                        rusqlite::types::Type::Null,
                        "htlc_status is required for Lightning payments".into(),
                    )
                })?;
            let htlc_expiry_time: u64 = row.get(16)?;
            let htlc_details = SparkHtlcDetails {
                payment_hash,
                preimage,
                expiry_time: htlc_expiry_time,
                status: htlc_status,
            };
            let lnurl_pay_info: Option<LnurlPayInfo> = row.get(17)?;
            let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(18)?;
            let lnurl_nostr_zap_request: Option<String> = row.get(26)?;
            let lnurl_nostr_zap_receipt: Option<String> = row.get(27)?;
            let lnurl_sender_comment: Option<String> = row.get(28)?;
            let lnurl_payment_hash: Option<String> = row.get(29)?;
            // A non-NULL lrm.payment_hash means an lnurl_receive_metadata row
            // was joined; build the metadata struct only in that case.
            let lnurl_receive_metadata = if lnurl_payment_hash.is_some() {
                Some(LnurlReceiveMetadata {
                    nostr_zap_request: lnurl_nostr_zap_request,
                    nostr_zap_receipt: lnurl_nostr_zap_receipt,
                    sender_comment: lnurl_sender_comment,
                })
            } else {
                None
            };
            Some(PaymentDetails::Lightning {
                invoice,
                destination_pubkey,
                description,
                htlc_details,
                lnurl_pay_info,
                lnurl_withdraw_info,
                lnurl_receive_metadata,
            })
        }
        (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
        (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
        (_, _, _, Some(_), _) => {
            // Spark details: JSON-encoded columns, each optional.
            let invoice_details_str: Option<String> = row.get(24)?;
            let invoice_details = invoice_details_str
                .map(|s| serde_json_from_str(&s, 24))
                .transpose()?;
            let htlc_details_str: Option<String> = row.get(25)?;
            let htlc_details = htlc_details_str
                .map(|s| serde_json_from_str(&s, 25))
                .transpose()?;
            let conversion_info_str: Option<String> = row.get(19)?;
            let conversion_info: Option<ConversionInfo> = conversion_info_str
                .map(|s: String| serde_json_from_str(&s, 19))
                .transpose()?;
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
                conversion_info,
            })
        }
        (_, _, _, _, Some(metadata)) => {
            // Token details: metadata is mandatory JSON, the rest optional.
            let tx_type: TokenTransactionType = row.get(22)?;
            let invoice_details_str: Option<String> = row.get(23)?;
            let invoice_details = invoice_details_str
                .map(|s| serde_json_from_str(&s, 23))
                .transpose()?;
            let conversion_info_str: Option<String> = row.get(19)?;
            let conversion_info: Option<ConversionInfo> = conversion_info_str
                .map(|s: String| serde_json_from_str(&s, 19))
                .transpose()?;
            Some(PaymentDetails::Token {
                metadata: serde_json_from_str(&metadata, 20)?,
                tx_hash: row.get(21)?,
                tx_type,
                invoice_details,
                conversion_info,
            })
        }
        _ => None,
    };
    Ok(Payment {
        id: row.get(0)?,
        // payment_type/status are stored as text and parsed back via FromStr;
        // parse failures are surfaced as column conversion errors.
        payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
            rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
        })?,
        status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
            rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
        })?,
        amount: row.get::<_, U128SqlWrapper>(3)?.0,
        fees: row.get::<_, U128SqlWrapper>(4)?.0,
        timestamp: row.get(5)?,
        details,
        method: row.get(6)?,
        // Conversion details are resolved by callers, not stored per row.
        conversion_details: None,
    })
}
1514
// `PaymentDetails` round-trips through a TEXT column as a JSON string.
impl ToSql for PaymentDetails {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for PaymentDetails {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1526
1527impl ToSql for PaymentMethod {
1528    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1529        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1530    }
1531}
1532
1533impl FromSql for PaymentMethod {
1534    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1535        match value {
1536            ValueRef::Text(i) => {
1537                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1538                // NOTE: trim_matches/to_lowercase is here, because this used to be serde_json serialized.
1539                let payment_method: PaymentMethod = s
1540                    .trim_matches('"')
1541                    .to_lowercase()
1542                    .parse()
1543                    .map_err(|()| FromSqlError::InvalidType)?;
1544                Ok(payment_method)
1545            }
1546            _ => Err(FromSqlError::InvalidType),
1547        }
1548    }
1549}
1550
1551impl ToSql for TokenTransactionType {
1552    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1553        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1554    }
1555}
1556
1557impl FromSql for TokenTransactionType {
1558    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1559        match value {
1560            ValueRef::Text(i) => {
1561                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1562                let tx_type: TokenTransactionType =
1563                    s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1564                Ok(tx_type)
1565            }
1566            _ => Err(FromSqlError::InvalidType),
1567        }
1568    }
1569}
1570
1571impl ToSql for SparkHtlcStatus {
1572    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1573        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1574    }
1575}
1576
1577impl FromSql for SparkHtlcStatus {
1578    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1579        match value {
1580            ValueRef::Text(i) => {
1581                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1582                let status: SparkHtlcStatus =
1583                    s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1584                Ok(status)
1585            }
1586            _ => Err(FromSqlError::InvalidType),
1587        }
1588    }
1589}
1590
// `DepositClaimError` round-trips through a TEXT column as a JSON string.
impl ToSql for DepositClaimError {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for DepositClaimError {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1602
// `LnurlPayInfo` round-trips through a TEXT column as a JSON string.
impl ToSql for LnurlPayInfo {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for LnurlPayInfo {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1614
// `LnurlWithdrawInfo` round-trips through a TEXT column as a JSON string.
impl ToSql for LnurlWithdrawInfo {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for LnurlWithdrawInfo {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1626
1627fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1628where
1629    T: serde::Serialize,
1630{
1631    let json = serde_json::to_string(&value)
1632        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1633    Ok(rusqlite::types::ToSqlOutput::from(json))
1634}
1635
1636fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1637where
1638    T: serde::de::DeserializeOwned,
1639{
1640    match value {
1641        ValueRef::Text(i) => {
1642            let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1643            let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1644            Ok(deserialized)
1645        }
1646        _ => Err(FromSqlError::InvalidType),
1647    }
1648}
1649
1650fn serde_json_from_str<T>(value: &str, index: usize) -> Result<T, rusqlite::Error>
1651where
1652    T: serde::de::DeserializeOwned,
1653{
1654    serde_json::from_str(value).map_err(|e| {
1655        rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(e))
1656    })
1657}
1658
1659struct U128SqlWrapper(u128);
1660
1661impl ToSql for U128SqlWrapper {
1662    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1663        let string = self.0.to_string();
1664        Ok(rusqlite::types::ToSqlOutput::from(string))
1665    }
1666}
1667
1668impl FromSql for U128SqlWrapper {
1669    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1670        match value {
1671            ValueRef::Text(i) => {
1672                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1673                let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1674                Ok(U128SqlWrapper(integer))
1675            }
1676            _ => Err(FromSqlError::InvalidType),
1677        }
1678    }
1679}
1680
/// Tests for `SqliteStorage`: runs of the backend-agnostic storage
/// test-suite plus SQLite-specific schema-migration regression tests.
#[cfg(test)]
mod tests {

    use crate::SqliteStorage;
    use std::path::PathBuf;

    /// Helper function to create a temporary directory for tests
    /// Uses std library to avoid external dependency
    fn create_temp_dir(name: &str) -> PathBuf {
        let mut path = std::env::temp_dir();
        // Use UUID for uniqueness to avoid conflicts between parallel tests
        path.push(format!("breez-test-{}-{}", name, uuid::Uuid::new_v4()));
        std::fs::create_dir_all(&path).unwrap();
        path
    }

    // The tests below each create a fresh on-disk database and delegate to
    // the shared, backend-agnostic suite in `crate::persist::tests`.

    #[tokio::test]
    async fn test_storage() {
        let temp_dir = create_temp_dir("sqlite_storage");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        Box::pin(crate::persist::tests::test_storage(Box::new(storage))).await;
    }

    #[tokio::test]
    async fn test_unclaimed_deposits_crud() {
        let temp_dir = create_temp_dir("sqlite_storage_deposits");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_deposit_refunds() {
        let temp_dir = create_temp_dir("sqlite_storage_refund_tx");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_type_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_type_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_status_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_status_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_asset_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_asset_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_timestamp_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_timestamp_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_spark_htlc_status_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_htlc_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_lightning_htlc_details_and_status_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_htlc_details");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_lightning_htlc_details_and_status_filtering(Box::new(storage))
            .await;
    }

    #[tokio::test]
    async fn test_conversion_refund_needed_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_conversion_refund_needed_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_conversion_refund_needed_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_token_transaction_type_filtering() {
        let temp_dir = create_temp_dir("sqlite_storage_token_transaction_type_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_token_transaction_type_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_combined_filters() {
        let temp_dir = create_temp_dir("sqlite_storage_combined_filter");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_combined_filters(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sort_order() {
        let temp_dir = create_temp_dir("sqlite_storage_sort_order");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_sort_order(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_metadata() {
        let temp_dir = create_temp_dir("sqlite_storage_payment_request_metadata");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_metadata(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_details_update_persistence() {
        let temp_dir = create_temp_dir("sqlite_storage_payment_details_update");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_details_update_persistence(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_pending_lnurl_preimages() {
        let temp_dir = create_temp_dir("sqlite_storage_pending_lnurl_preimages");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_pending_lnurl_preimages(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sync_storage() {
        let temp_dir = create_temp_dir("sqlite_sync_storage");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_sync_storage(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_metadata_merge() {
        let temp_dir = create_temp_dir("sqlite_payment_metadata_merge");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_payment_metadata_merge(Box::new(storage)).await;
    }

    // Regression test: the migration that introduced the `tx_type` column on
    // `payment_details_token` must backfill pre-existing rows with the
    // default `transfer` type.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_migration_tx_type() {
        use crate::{
            Payment, PaymentDetails, PaymentMethod, PaymentStatus, PaymentType, Storage,
            TokenMetadata, TokenTransactionType,
            persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
        };
        use rusqlite::{Connection, params};
        use rusqlite_migration::{M, Migrations};

        let temp_dir = create_temp_dir("sqlite_migration_tx_type");
        let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);

        // Step 1: Create database at version 21 (before tx_type migration)
        {
            let mut conn = Connection::open(&db_path).unwrap();
            let migrations_before_tx_type: Vec<_> = SqliteStorage::current_migrations()
                .iter()
                .take(22) // Migrations 0-21 (index 22 is the tx_type migration)
                .map(|s| M::up(s))
                .collect();
            let migrations = Migrations::new(migrations_before_tx_type);
            migrations.to_latest(&mut conn).unwrap();
        }

        // Step 2: Insert a token payment WITHOUT tx_type column
        {
            let conn = Connection::open(&db_path).unwrap();

            // Insert into payments table
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "token-migration-test",
                    "send",
                    "completed",
                    "5000",
                    "10",
                    1_234_567_890_i64,
                    "\"token\""
                ],
            )
            .unwrap();

            // Insert into payment_details_token WITHOUT tx_type (pre-migration)
            let metadata = serde_json::json!({
                "identifier": "test-token-id",
                "issuer_public_key": format!("02{}", "a".repeat(64)),
                "name": "Test Token",
                "ticker": "TST",
                "decimals": 8,
                "max_supply": 1_000_000_u128,
                "is_freezable": false
            });

            conn.execute(
                "INSERT INTO payment_details_token (payment_id, metadata, tx_hash)
                 VALUES (?, ?, ?)",
                params![
                    "token-migration-test",
                    metadata.to_string(),
                    "0xabcdef1234567890"
                ],
            )
            .unwrap();
        }

        // Step 3: Open with SqliteStorage (triggers migration to latest)
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        // Step 4: Verify the migrated token payment
        let migrated_payment = storage
            .get_payment_by_id("token-migration-test".to_string())
            .await
            .unwrap();

        assert_eq!(migrated_payment.id, "token-migration-test");
        assert_eq!(migrated_payment.amount, 5000);
        assert_eq!(migrated_payment.fees, 10);
        assert_eq!(migrated_payment.status, PaymentStatus::Completed);
        assert_eq!(migrated_payment.payment_type, PaymentType::Send);
        assert_eq!(migrated_payment.method, PaymentMethod::Token);

        // Verify token payment details have the default txType
        match migrated_payment.details {
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                tx_type,
                ..
            }) => {
                assert_eq!(metadata.identifier, "test-token-id");
                assert_eq!(metadata.name, "Test Token");
                assert_eq!(metadata.ticker, "TST");
                assert_eq!(metadata.decimals, 8);
                assert_eq!(tx_hash, "0xabcdef1234567890");
                // Key assertion: migration added default tx_type
                assert_eq!(
                    tx_type,
                    TokenTransactionType::Transfer,
                    "Migration should add default txType 'transfer' to token payments"
                );
            }
            _ => panic!("Expected Token payment details"),
        }

        // Step 5: Insert a new token payment with explicit tx_type
        let new_payment = Payment {
            id: "new-token-payment".to_string(),
            payment_type: PaymentType::Receive,
            status: PaymentStatus::Completed,
            amount: 8000,
            fees: 20,
            timestamp: 1_234_567_891,
            method: PaymentMethod::Token,
            details: Some(PaymentDetails::Token {
                metadata: TokenMetadata {
                    identifier: "another-token-id".to_string(),
                    issuer_public_key: format!("02{}", "b".repeat(64)),
                    name: "Another Token".to_string(),
                    ticker: "ATK".to_string(),
                    decimals: 6,
                    max_supply: 2_000_000,
                    is_freezable: true,
                },
                tx_hash: "0x1111222233334444".to_string(),
                tx_type: TokenTransactionType::Mint,
                invoice_details: None,
                conversion_info: None,
            }),
            conversion_details: None,
        };

        storage.insert_payment(new_payment).await.unwrap();

        // Step 6: List all payments
        let request = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: None,
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let payments = storage.list_payments(request).await.unwrap();
        assert_eq!(payments.len(), 2, "Should have both payments");

        // Verify migrated payment has Transfer type
        let migrated = payments
            .iter()
            .find(|p| p.id == "token-migration-test")
            .unwrap();
        match &migrated.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Transfer);
            }
            _ => panic!("Expected Token payment details"),
        }

        // Verify new payment has Mint type
        let new = payments
            .iter()
            .find(|p| p.id == "new-token-payment")
            .unwrap();
        match &new.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Mint);
            }
            _ => panic!("Expected Token payment details"),
        }

        // Step 7: Test filtering by token transaction type
        let transfer_filter = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Token {
                conversion_refund_needed: None,
                tx_hash: None,
                tx_type: Some(TokenTransactionType::Transfer),
            }]),
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let transfer_payments = storage.list_payments(transfer_filter).await.unwrap();
        assert_eq!(
            transfer_payments.len(),
            1,
            "Should find only the Transfer payment"
        );
        assert_eq!(transfer_payments[0].id, "token-migration-test");
    }

    // Regression test: the htlc_status backfill migration must map payment
    // status to the correct SparkHtlcStatus (completed -> PreimageShared,
    // pending -> WaitingForPreimage, failed -> Returned).
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_migration_htlc_details() {
        use crate::{
            PaymentDetails, SparkHtlcStatus, Storage,
            persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
        };
        use rusqlite::{Connection, params};
        use rusqlite_migration::{M, Migrations};

        let temp_dir = create_temp_dir("sqlite_migration_htlc_details");
        let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);

        // Step 1: Create database at version 23 (before the htlc_status backfill migration)
        // This includes the ALTER TABLE that adds htlc_status and htlc_expiry_time columns (migration 22)
        // but not the backfill UPDATE (migration 23).
        {
            let mut conn = Connection::open(&db_path).unwrap();
            let migrations_before_backfill: Vec<_> = SqliteStorage::current_migrations()
                .iter()
                .take(23) // Migrations 0-22
                .map(|s| M::up(s))
                .collect();
            let migrations = Migrations::new(migrations_before_backfill);
            migrations.to_latest(&mut conn).unwrap();
        }

        // Step 2: Insert Lightning payments with different statuses to test all branches
        {
            let conn = Connection::open(&db_path).unwrap();

            // Insert a Completed Lightning payment
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-completed",
                    "send",
                    "completed",
                    "1000",
                    "10",
                    1_700_000_001_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, preimage)
                 VALUES (?, ?, ?, ?, ?)",
                params![
                    "ln-completed",
                    "lnbc_completed",
                    "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567",
                    "03pubkey1",
                    "preimage_completed"
                ],
            )
            .unwrap();

            // Insert a Pending Lightning payment
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-pending",
                    "receive",
                    "pending",
                    "2000",
                    "0",
                    1_700_000_002_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
                 VALUES (?, ?, ?, ?)",
                params![
                    "ln-pending",
                    "lnbc_pending",
                    "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678",
                    "03pubkey2"
                ],
            )
            .unwrap();

            // Insert a Failed Lightning payment
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-failed",
                    "send",
                    "failed",
                    "3000",
                    "5",
                    1_700_000_003_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
                 VALUES (?, ?, ?, ?)",
                params![
                    "ln-failed",
                    "lnbc_failed",
                    "hash_failed_0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
                    "03pubkey3"
                ],
            )
            .unwrap();
        }

        // Step 3: Open with SqliteStorage (triggers migration 23 - the backfill)
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        // Step 4: Verify Completed → PreimageShared
        let completed = storage
            .get_payment_by_id("ln-completed".to_string())
            .await
            .unwrap();
        match &completed.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::PreimageShared);
                assert_eq!(htlc_details.expiry_time, 0);
                assert_eq!(
                    htlc_details.payment_hash,
                    "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567"
                );
                assert_eq!(htlc_details.preimage.as_deref(), Some("preimage_completed"));
            }
            _ => panic!("Expected Lightning payment details for ln-completed"),
        }

        // Step 5: Verify Pending → WaitingForPreimage
        let pending = storage
            .get_payment_by_id("ln-pending".to_string())
            .await
            .unwrap();
        match &pending.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::WaitingForPreimage);
                assert_eq!(htlc_details.expiry_time, 0);
                assert_eq!(
                    htlc_details.payment_hash,
                    "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678"
                );
                assert!(htlc_details.preimage.is_none());
            }
            _ => panic!("Expected Lightning payment details for ln-pending"),
        }

        // Step 6: Verify Failed → Returned
        let failed = storage
            .get_payment_by_id("ln-failed".to_string())
            .await
            .unwrap();
        match &failed.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::Returned);
                assert_eq!(htlc_details.expiry_time, 0);
            }
            _ => panic!("Expected Lightning payment details for ln-failed"),
        }

        // Step 7: Verify filtering by htlc_status works on migrated data
        let waiting_payments = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::WaitingForPreimage]),
                    has_lnurl_preimage: None,
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(waiting_payments.len(), 1);
        assert_eq!(waiting_payments[0].id, "ln-pending");

        let preimage_shared = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::PreimageShared]),
                    has_lnurl_preimage: None,
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(preimage_shared.len(), 1);
        assert_eq!(preimage_shared[0].id, "ln-completed");

        let returned = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::Returned]),
                    has_lnurl_preimage: None,
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(returned.len(), 1);
        assert_eq!(returned[0].id, "ln-failed");
    }

    #[tokio::test]
    async fn test_contacts_crud() {
        let temp_dir = create_temp_dir("contacts_crud");
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        crate::persist::tests::test_contacts_crud(Box::new(storage)).await;
    }
}