breez_sdk_spark/persist/
sqlite.rs

1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5    Connection, Row, ToSql, TransactionBehavior, params,
6    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11    AssetFilter, Contact, ConversionDetails, ConversionInfo, ConversionStatus, DepositInfo,
12    ListContactsRequest, LnurlPayInfo, LnurlReceiveMetadata, LnurlWithdrawInfo, PaymentDetails,
13    PaymentMethod, SparkHtlcDetails, SparkHtlcStatus, TokenTransactionType,
14    error::DepositClaimError,
15    persist::{
16        PaymentMetadata, SetLnurlMetadataItem, StorageListPaymentsRequest,
17        StoragePaymentDetailsFilter, UpdateDepositPayload,
18    },
19    sync_storage::{
20        IncomingChange, OutgoingChange, Record, RecordChange, RecordId, UnversionedRecordChange,
21    },
22};
23
24use std::collections::HashMap;
25
26use tracing::warn;
27
28use super::{Payment, Storage, StorageError};
29
/// File name of the database file that is created inside the storage directory.
const DEFAULT_DB_FILENAME: &str = "storage.sql";

/// SQLite-based storage implementation.
///
/// The value only remembers where the database lives; connections are opened
/// on demand from that location.
#[derive(Debug)]
pub struct SqliteStorage {
    /// Directory that contains the database file.
    db_dir: PathBuf,
}
35
36impl SqliteStorage {
37    /// Creates a new `SQLite` storage
38    ///
39    /// # Arguments
40    ///
41    /// * `path` - Path to the `SQLite` database file
42    ///
43    /// # Returns
44    ///
45    /// A new `SqliteStorage` instance or an error
46    pub fn new(path: &Path) -> Result<Self, StorageError> {
47        let storage = Self {
48            db_dir: path.to_path_buf(),
49        };
50
51        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
52        std::fs::create_dir_all(path)
53            .map_err(|e| StorageError::InitializationError(e.to_string()))?;
54
55        storage.migrate()?;
56        Ok(storage)
57    }
58
59    pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
60        Ok(Connection::open(self.get_db_path())?)
61    }
62
63    fn get_db_path(&self) -> PathBuf {
64        self.db_dir.join(DEFAULT_DB_FILENAME)
65    }
66
67    fn migrate(&self) -> Result<(), StorageError> {
68        let migrations =
69            Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
70        let mut conn = self.get_connection()?;
71        let previous_version = match migrations.current_version(&conn)? {
72            SchemaVersion::Inside(previous_version) => previous_version.get(),
73            _ => 0,
74        };
75        migrations.to_latest(&mut conn)?;
76
77        if previous_version < 6 {
78            Self::migrate_lnurl_metadata_description(&mut conn)?;
79        }
80
81        Ok(())
82    }
83
84    fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
85        let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
86        let pay_infos: Vec<_> = stmt
87            .query_map([], |row| {
88                let payment_id: String = row.get(0)?;
89                let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
90                Ok((payment_id, lnurl_pay_info))
91            })?
92            .collect::<Result<_, _>>()?;
93        let pay_infos = pay_infos
94            .into_iter()
95            .filter_map(|(payment_id, lnurl_pay_info)| {
96                let pay_info = lnurl_pay_info?;
97                let description = pay_info.extract_description()?;
98                Some((payment_id, description))
99            })
100            .collect::<Vec<_>>();
101
102        for pay_info in pay_infos {
103            conn.execute(
104                "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
105                params![pay_info.1, pay_info.0],
106            )?;
107        }
108
109        Ok(())
110    }
111
112    #[allow(clippy::too_many_lines)]
113    pub(crate) fn current_migrations() -> Vec<&'static str> {
114        vec![
115            "CREATE TABLE IF NOT EXISTS payments (
116              id TEXT PRIMARY KEY,
117              payment_type TEXT NOT NULL,
118              status TEXT NOT NULL,
119              amount INTEGER NOT NULL,
120              fees INTEGER NOT NULL,
121              timestamp INTEGER NOT NULL,
122              details TEXT,
123              method TEXT
124            );",
125            "CREATE TABLE IF NOT EXISTS settings (
126              key TEXT PRIMARY KEY,
127              value TEXT NOT NULL
128            );",
129            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
130              txid TEXT NOT NULL,
131              vout INTEGER NOT NULL,
132              amount_sats INTEGER,
133              claim_error TEXT,
134              refund_tx TEXT,
135              refund_tx_id TEXT,
136              PRIMARY KEY (txid, vout)
137            );",
138            "CREATE TABLE IF NOT EXISTS payment_metadata (
139              payment_id TEXT PRIMARY KEY,
140              lnurl_pay_info TEXT
141            );",
142            "CREATE TABLE IF NOT EXISTS deposit_refunds (
143              deposit_tx_id TEXT NOT NULL,
144              deposit_vout INTEGER NOT NULL,
145              refund_tx TEXT NOT NULL,
146              refund_tx_id TEXT NOT NULL,
147              PRIMARY KEY (deposit_tx_id, deposit_vout)              
148            );",
149            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
150            "
151            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
152            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
153            ALTER TABLE payments ADD COLUMN spark INTEGER;
154            CREATE TABLE payment_details_lightning (
155              payment_id TEXT PRIMARY KEY,
156              invoice TEXT NOT NULL,
157              payment_hash TEXT NOT NULL,
158              destination_pubkey TEXT NOT NULL,
159              description TEXT,
160              preimage TEXT,
161              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
162            );
163            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
164            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'), 
165                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'), 
166                json_extract(details, '$.Lightning.preimage') 
167            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;
168
169            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
170            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;
171
172            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
173            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;
174
175            ALTER TABLE payments DROP COLUMN details;
176
177            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
178            ",
179            "CREATE TABLE payment_details_token (
180              payment_id TEXT PRIMARY KEY,
181              metadata TEXT NOT NULL,
182              tx_hash TEXT NOT NULL,
183              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
184            );",
185            // Migration to change payments amount and fees from INTEGER to TEXT
186            "CREATE TABLE payments_new (
187              id TEXT PRIMARY KEY,
188              payment_type TEXT NOT NULL,
189              status TEXT NOT NULL,
190              amount TEXT NOT NULL,
191              fees TEXT NOT NULL,
192              timestamp INTEGER NOT NULL,
193              method TEXT,
194              withdraw_tx_id TEXT,
195              deposit_tx_id TEXT,
196              spark INTEGER
197            );",
198            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
199             SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
200             FROM payments;",
201            "DROP TABLE payments;",
202            "ALTER TABLE payments_new RENAME TO payments;",
203            "CREATE TABLE payment_details_spark (
204              payment_id TEXT NOT NULL PRIMARY KEY,
205              invoice_details TEXT NOT NULL,
206              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
207            );
208            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
209            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
210            // sync_revision: tracks the last committed revision (from server-acknowledged
211            // or server-received records). Does NOT include pending outgoing queue ids.
212            // sync_outgoing.revision stores a local queue id for ordering/de-duplication only.
213            "CREATE TABLE sync_revision (
214                revision INTEGER NOT NULL DEFAULT 0
215            );
216            INSERT INTO sync_revision (revision) VALUES (0);
217            CREATE TABLE sync_outgoing(
218                record_type TEXT NOT NULL,
219                data_id TEXT NOT NULL,
220                schema_version TEXT NOT NULL,
221                commit_time INTEGER NOT NULL,
222                updated_fields_json TEXT NOT NULL,
223                revision INTEGER NOT NULL
224            );
225            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
226            CREATE TABLE sync_state(
227                record_type TEXT NOT NULL,
228                data_id TEXT NOT NULL,
229                schema_version TEXT NOT NULL,
230                commit_time INTEGER NOT NULL,
231                data TEXT NOT NULL,
232                revision INTEGER NOT NULL,
233                PRIMARY KEY(record_type, data_id)
234            );",
235            "CREATE TABLE sync_incoming(
236                record_type TEXT NOT NULL,
237                data_id TEXT NOT NULL,
238                schema_version TEXT NOT NULL,
239                commit_time INTEGER NOT NULL,
240                data TEXT NOT NULL,
241                revision INTEGER NOT NULL,
242                PRIMARY KEY(record_type, data_id, revision)
243            );
244            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
245            "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
246            CREATE TABLE payment_details_spark (
247              payment_id TEXT NOT NULL PRIMARY KEY,
248              invoice_details TEXT,
249              htlc_details TEXT,
250              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
251            );
252            INSERT INTO payment_details_spark (payment_id, invoice_details)
253             SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
254            DROP TABLE tmp_payment_details_spark;",
255            "CREATE TABLE lnurl_receive_metadata (
256                payment_hash TEXT NOT NULL PRIMARY KEY,
257                nostr_zap_request TEXT,
258                nostr_zap_receipt TEXT,
259                sender_comment TEXT
260            );",
261            // Delete all unclaimed deposits to clear old claim_error JSON format.
262            // Deposits will be recovered on next sync.
263            "DELETE FROM unclaimed_deposits;",
264            // Clear all sync tables due to BreezSigner signature change.
265            // This forces users to sync from scratch to the sync server.
266            // Also delete the sync_initial_complete flag to force re-populating
267            // all payment metadata for outgoing sync using the new key.
268            "DELETE FROM sync_outgoing;
269             DELETE FROM sync_incoming;
270             DELETE FROM sync_state;
271             UPDATE sync_revision SET revision = 0;
272             DELETE FROM settings WHERE key = 'sync_initial_complete';",
273            "ALTER TABLE payment_metadata ADD COLUMN token_conversion_info TEXT;",
274            "ALTER TABLE payment_metadata ADD COLUMN parent_payment_id TEXT;",
275            "
276            ALTER TABLE payment_metadata DROP COLUMN token_conversion_info;
277            ALTER TABLE payment_metadata ADD COLUMN conversion_info TEXT;
278            ",
279            // Add tx_type column with a default value of 'transfer'.
280            // Reset only the token sync position (not bitcoin offset) to trigger token re-sync.
281            // This will update all token payment records with the correct tx_type values.
282            // Note: This intentionally couples to the CachedSyncInfo schema at migration time.
283            "ALTER TABLE payment_details_token ADD COLUMN tx_type TEXT NOT NULL DEFAULT 'transfer';
284            UPDATE settings
285            SET value = json_set(value, '$.last_synced_final_token_payment_id', NULL)
286            WHERE key = 'sync_offset' AND json_valid(value) AND json_type(value, '$.last_synced_final_token_payment_id') IS NOT NULL;",
287            "DELETE FROM sync_outgoing;
288             DELETE FROM sync_incoming;
289             DELETE FROM sync_state;
290             UPDATE sync_revision SET revision = 0;
291             DELETE FROM settings WHERE key = 'sync_initial_complete';",
292            "ALTER TABLE payment_details_lightning ADD COLUMN htlc_status TEXT NOT NULL DEFAULT 'WaitingForPreimage';
293             ALTER TABLE payment_details_lightning ADD COLUMN htlc_expiry_time INTEGER NOT NULL DEFAULT 0;",
294            // Backfill htlc_status for existing Lightning payments where it's NULL.
295            // After this migration, htlc_status is required for all Lightning payments.
296            // Also reset the bitcoin sync offset to trigger a full resync, which will
297            // correct the backfilled expiry_time values.
298            "UPDATE payment_details_lightning
299             SET htlc_status = CASE
300                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'completed' THEN 'PreimageShared'
301                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'pending' THEN 'WaitingForPreimage'
302                     ELSE 'Returned'
303                 END;
304             UPDATE settings
305             SET value = json_set(value, '$.offset', 0)
306             WHERE key = 'sync_offset' AND json_valid(value);",
307            // Add preimage column for LUD-21 and NIP-57 support
308            "ALTER TABLE lnurl_receive_metadata ADD COLUMN preimage TEXT;",
309            // Clear the lnurl_metadata_updated_after setting to force re-sync
310            // This ensures clients get the new preimage field from the server
311            "DELETE FROM settings WHERE key = 'lnurl_metadata_updated_after';",
312            // Clear cached lightning address - schema changed from string to LnurlInfo struct
313            "DELETE FROM settings WHERE key = 'lightning_address';",
314            // Add index on payment_hash for JOIN with lnurl_receive_metadata
315            "CREATE INDEX IF NOT EXISTS idx_payment_details_lightning_payment_hash ON payment_details_lightning(payment_hash);",
316            "CREATE TABLE contacts (
317                id TEXT PRIMARY KEY,
318                name TEXT NOT NULL,
319                payment_identifier TEXT NOT NULL,
320                created_at INTEGER NOT NULL,
321                updated_at INTEGER NOT NULL
322            );",
323            // Drop preimage column from lnurl_receive_metadata - no longer needed
324            // since the server handles preimage tracking via webhooks.
325            "ALTER TABLE lnurl_receive_metadata DROP COLUMN preimage;",
326            // Clear cached lightning address - format changed to CachedLightningAddress wrapper
327            "DELETE FROM settings WHERE key = 'lightning_address';",
328            "ALTER TABLE unclaimed_deposits ADD COLUMN is_mature INTEGER NOT NULL DEFAULT 1;",
329            // Add conversion_status to payment_metadata
330            "ALTER TABLE payment_metadata ADD COLUMN conversion_status TEXT;",
331        ]
332    }
333}
334
335/// Maps a rusqlite error to the appropriate `StorageError`.
336/// Database busy/locked errors are mapped to `Connection` (transient),
337/// other errors are mapped to `Implementation`.
338#[allow(clippy::needless_pass_by_value)]
339fn map_sqlite_error(e: rusqlite::Error) -> StorageError {
340    match e {
341        rusqlite::Error::SqliteFailure(err, _)
342            if err.code == rusqlite::ErrorCode::DatabaseBusy
343                || err.code == rusqlite::ErrorCode::DatabaseLocked =>
344        {
345            StorageError::Connection(e.to_string())
346        }
347        _ => StorageError::Implementation(e.to_string()),
348    }
349}
350
351impl From<rusqlite::Error> for StorageError {
352    fn from(value: rusqlite::Error) -> Self {
353        map_sqlite_error(value)
354    }
355}
356
357impl From<rusqlite_migration::Error> for StorageError {
358    fn from(value: rusqlite_migration::Error) -> Self {
359        StorageError::Implementation(value.to_string())
360    }
361}
362
363#[async_trait]
364impl Storage for SqliteStorage {
365    #[allow(clippy::too_many_lines)]
366    async fn list_payments(
367        &self,
368        request: StorageListPaymentsRequest,
369    ) -> Result<Vec<Payment>, StorageError> {
370        let connection = self.get_connection()?;
371
372        // Build WHERE clauses based on filters
373        let mut where_clauses = Vec::new();
374        let mut params: Vec<Box<dyn ToSql>> = Vec::new();
375
376        // Filter by payment type
377        if let Some(ref type_filter) = request.type_filter
378            && !type_filter.is_empty()
379        {
380            let placeholders = type_filter
381                .iter()
382                .map(|_| "?")
383                .collect::<Vec<_>>()
384                .join(", ");
385            where_clauses.push(format!("p.payment_type IN ({placeholders})"));
386            for payment_type in type_filter {
387                params.push(Box::new(payment_type.to_string()));
388            }
389        }
390
391        // Filter by status
392        if let Some(ref status_filter) = request.status_filter
393            && !status_filter.is_empty()
394        {
395            let placeholders = status_filter
396                .iter()
397                .map(|_| "?")
398                .collect::<Vec<_>>()
399                .join(", ");
400            where_clauses.push(format!("p.status IN ({placeholders})"));
401            for status in status_filter {
402                params.push(Box::new(status.to_string()));
403            }
404        }
405
406        // Filter by timestamp range
407        if let Some(from_timestamp) = request.from_timestamp {
408            where_clauses.push("p.timestamp >= ?".to_string());
409            params.push(Box::new(from_timestamp));
410        }
411
412        if let Some(to_timestamp) = request.to_timestamp {
413            where_clauses.push("p.timestamp < ?".to_string());
414            params.push(Box::new(to_timestamp));
415        }
416
417        // Filter by asset
418        if let Some(ref asset_filter) = request.asset_filter {
419            match asset_filter {
420                AssetFilter::Bitcoin => {
421                    where_clauses.push("t.metadata IS NULL".to_string());
422                }
423                AssetFilter::Token { token_identifier } => {
424                    where_clauses.push("t.metadata IS NOT NULL".to_string());
425                    if let Some(identifier) = token_identifier {
426                        // Filter by specific token identifier
427                        where_clauses
428                            .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
429                        params.push(Box::new(identifier.clone()));
430                    }
431                }
432            }
433        }
434
435        // Filter by payment details. If any filter matches, we include the payment
436        if let Some(ref payment_details_filter) = request.payment_details_filter {
437            let mut all_payment_details_clauses = Vec::new();
438            for payment_details_filter in payment_details_filter {
439                let mut payment_details_clauses = Vec::new();
440                // Filter by HTLC status (Spark or Lightning)
441                let htlc_filter = match payment_details_filter {
442                    StoragePaymentDetailsFilter::Spark {
443                        htlc_status: Some(s),
444                        ..
445                    } if !s.is_empty() => Some(("s", s)),
446                    StoragePaymentDetailsFilter::Lightning {
447                        htlc_status: Some(s),
448                        ..
449                    } if !s.is_empty() => Some(("l", s)),
450                    _ => None,
451                };
452                if let Some((alias, htlc_statuses)) = htlc_filter {
453                    let placeholders = htlc_statuses
454                        .iter()
455                        .map(|_| "?")
456                        .collect::<Vec<_>>()
457                        .join(", ");
458                    if alias == "l" {
459                        // Lightning: htlc_status is a direct column
460                        payment_details_clauses.push(format!("l.htlc_status IN ({placeholders})"));
461                    } else {
462                        // Spark: htlc_details is still JSON
463                        payment_details_clauses.push(format!(
464                            "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
465                        ));
466                    }
467                    for htlc_status in htlc_statuses {
468                        params.push(Box::new(htlc_status.to_string()));
469                    }
470                }
471                // Filter by conversion info presence
472                let conversion_filter = match payment_details_filter {
473                    StoragePaymentDetailsFilter::Spark {
474                        conversion_refund_needed: Some(v),
475                        ..
476                    } => Some((v, "p.spark = 1")),
477                    StoragePaymentDetailsFilter::Token {
478                        conversion_refund_needed: Some(v),
479                        ..
480                    } => Some((v, "p.spark IS NULL")),
481                    _ => None,
482                };
483                if let Some((conversion_refund_needed, type_check)) = conversion_filter {
484                    let refund_needed = if *conversion_refund_needed {
485                        "= 'RefundNeeded'"
486                    } else {
487                        "!= 'RefundNeeded'"
488                    };
489                    payment_details_clauses.push(format!(
490                        "{type_check} AND pm.conversion_info IS NOT NULL AND
491                         json_extract(pm.conversion_info, '$.status') {refund_needed}"
492                    ));
493                }
494                // Filter by token transaction hash
495                if let StoragePaymentDetailsFilter::Token {
496                    tx_hash: Some(tx_hash),
497                    ..
498                } = payment_details_filter
499                {
500                    payment_details_clauses.push("t.tx_hash = ?".to_string());
501                    params.push(Box::new(tx_hash.clone()));
502                }
503
504                // Filter by token transaction type
505                if let StoragePaymentDetailsFilter::Token {
506                    tx_type: Some(tx_type),
507                    ..
508                } = payment_details_filter
509                {
510                    payment_details_clauses.push("t.tx_type = ?".to_string());
511                    params.push(Box::new(tx_type.to_string()));
512                }
513
514                if !payment_details_clauses.is_empty() {
515                    all_payment_details_clauses
516                        .push(format!("({})", payment_details_clauses.join(" AND ")));
517                }
518            }
519
520            if !all_payment_details_clauses.is_empty() {
521                where_clauses.push(format!("({})", all_payment_details_clauses.join(" OR ")));
522            }
523        }
524
525        // Exclude child payments (those with a parent_payment_id)
526        // Child payments are accessed via the parent's related_payments field
527        where_clauses.push("pm.parent_payment_id IS NULL".to_string());
528
529        // Build the WHERE clause
530        let where_sql = if where_clauses.is_empty() {
531            String::new()
532        } else {
533            format!("WHERE {}", where_clauses.join(" AND "))
534        };
535
536        // Determine sort order
537        let order_direction = if request.sort_ascending.unwrap_or(false) {
538            "ASC"
539        } else {
540            "DESC"
541        };
542
543        let query = format!(
544            "{SELECT_PAYMENT_SQL} {where_sql} ORDER BY p.timestamp {order_direction} LIMIT {} OFFSET {}",
545            request.limit.unwrap_or(u32::MAX),
546            request.offset.unwrap_or(0)
547        );
548
549        let mut stmt = connection.prepare(&query)?;
550        let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
551        let payments = stmt
552            .query_map(param_refs.as_slice(), map_payment)?
553            .collect::<Result<Vec<_>, _>>()?;
554        Ok(payments)
555    }
556
557    #[allow(clippy::too_many_lines)]
558    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
559        let mut connection = self.get_connection()?;
560        let tx = connection.transaction_with_behavior(TransactionBehavior::Immediate)?;
561
562        // Compute detail columns for the main payments row
563        let (withdraw_tx_id, deposit_tx_id, spark): (Option<&str>, Option<&str>, Option<bool>) =
564            match &payment.details {
565                Some(PaymentDetails::Withdraw { tx_id }) => (Some(tx_id.as_str()), None, None),
566                Some(PaymentDetails::Deposit { tx_id }) => (None, Some(tx_id.as_str()), None),
567                Some(PaymentDetails::Spark { .. }) => (None, None, Some(true)),
568                _ => (None, None, None),
569            };
570
571        // Insert or update main payment record (including detail columns atomically)
572        tx.execute(
573            "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
574             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
575             ON CONFLICT(id) DO UPDATE SET
576                payment_type=excluded.payment_type,
577                status=excluded.status,
578                amount=excluded.amount,
579                fees=excluded.fees,
580                timestamp=excluded.timestamp,
581                method=excluded.method,
582                withdraw_tx_id=excluded.withdraw_tx_id,
583                deposit_tx_id=excluded.deposit_tx_id,
584                spark=excluded.spark",
585            params![
586                payment.id,
587                payment.payment_type.to_string(),
588                payment.status.to_string(),
589                U128SqlWrapper(payment.amount),
590                U128SqlWrapper(payment.fees),
591                payment.timestamp,
592                payment.method,
593                withdraw_tx_id,
594                deposit_tx_id,
595                spark,
596            ],
597        )?;
598
599        match payment.details {
600            Some(PaymentDetails::Spark {
601                invoice_details,
602                htlc_details,
603                ..
604            }) => {
605                if invoice_details.is_some() || htlc_details.is_some() {
606                    // Upsert both details together and avoid overwriting existing data with NULLs
607                    tx.execute(
608                        "INSERT INTO payment_details_spark (payment_id, invoice_details, htlc_details)
609                         VALUES (?, ?, ?)
610                         ON CONFLICT(payment_id) DO UPDATE SET
611                            invoice_details=COALESCE(excluded.invoice_details, payment_details_spark.invoice_details),
612                            htlc_details=COALESCE(excluded.htlc_details, payment_details_spark.htlc_details)",
613                        params![
614                            payment.id,
615                            invoice_details.as_ref().map(serde_json::to_string).transpose()?,
616                            htlc_details.as_ref().map(serde_json::to_string).transpose()?,
617                        ],
618                    )?;
619                }
620            }
621            Some(PaymentDetails::Token {
622                metadata,
623                tx_hash,
624                tx_type,
625                invoice_details,
626                ..
627            }) => {
628                tx.execute(
629                    "INSERT INTO payment_details_token (payment_id, metadata, tx_hash, tx_type, invoice_details)
630                     VALUES (?, ?, ?, ?, ?)
631                     ON CONFLICT(payment_id) DO UPDATE SET 
632                        metadata=excluded.metadata,
633                        tx_hash=excluded.tx_hash,
634                        tx_type=excluded.tx_type,
635                        invoice_details=COALESCE(excluded.invoice_details, payment_details_token.invoice_details)",
636                    params![
637                        payment.id,
638                        serde_json::to_string(&metadata)?,
639                        tx_hash,
640                        tx_type.to_string(),
641                        invoice_details.as_ref().map(serde_json::to_string).transpose()?,
642                    ],
643                )?;
644            }
645            Some(PaymentDetails::Lightning {
646                invoice,
647                destination_pubkey,
648                description,
649                htlc_details,
650                ..
651            }) => {
652                tx.execute(
653                    "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage, htlc_status, htlc_expiry_time)
654                     VALUES (?, ?, ?, ?, ?, ?, ?, ?)
655                     ON CONFLICT(payment_id) DO UPDATE SET
656                        invoice=excluded.invoice,
657                        payment_hash=excluded.payment_hash,
658                        destination_pubkey=excluded.destination_pubkey,
659                        description=excluded.description,
660                        preimage=COALESCE(excluded.preimage, payment_details_lightning.preimage),
661                        htlc_status=COALESCE(excluded.htlc_status, payment_details_lightning.htlc_status),
662                        htlc_expiry_time=COALESCE(excluded.htlc_expiry_time, payment_details_lightning.htlc_expiry_time)",
663                    params![
664                        payment.id,
665                        invoice,
666                        htlc_details.payment_hash,
667                        destination_pubkey,
668                        description,
669                        htlc_details.preimage,
670                        htlc_details.status.to_string(),
671                        htlc_details.expiry_time,
672                    ],
673                )?;
674            }
675            Some(PaymentDetails::Withdraw { .. } | PaymentDetails::Deposit { .. }) | None => {}
676        }
677
678        tx.commit()?;
679        Ok(())
680    }
681
682    async fn insert_payment_metadata(
683        &self,
684        payment_id: String,
685        metadata: PaymentMetadata,
686    ) -> Result<(), StorageError> {
687        let connection = self.get_connection()?;
688
689        connection.execute(
690            "INSERT INTO payment_metadata (payment_id, parent_payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description, conversion_info, conversion_status)
691             VALUES (?, ?, ?, ?, ?, ?, ?)
692             ON CONFLICT(payment_id) DO UPDATE SET
693                parent_payment_id = COALESCE(excluded.parent_payment_id, parent_payment_id),
694                lnurl_pay_info = COALESCE(excluded.lnurl_pay_info, lnurl_pay_info),
695                lnurl_withdraw_info = COALESCE(excluded.lnurl_withdraw_info, lnurl_withdraw_info),
696                lnurl_description = COALESCE(excluded.lnurl_description, lnurl_description),
697                conversion_info = COALESCE(excluded.conversion_info, conversion_info),
698                conversion_status = COALESCE(excluded.conversion_status, conversion_status)",
699            params![
700                payment_id,
701                metadata.parent_payment_id,
702                metadata.lnurl_pay_info,
703                metadata.lnurl_withdraw_info,
704                metadata.lnurl_description,
705                metadata.conversion_info.as_ref().map(serde_json::to_string).transpose()?,
706                metadata.conversion_status.as_ref().map(std::string::ToString::to_string),
707            ],
708        )?;
709
710        Ok(())
711    }
712
713    async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
714        let connection = self.get_connection()?;
715
716        connection.execute(
717            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
718            params![key, value],
719        )?;
720
721        Ok(())
722    }
723
724    async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
725        let connection = self.get_connection()?;
726
727        let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
728
729        let result = stmt.query_row(params![key], |row| {
730            let value_str: String = row.get(0)?;
731            Ok(value_str)
732        });
733
734        match result {
735            Ok(value) => Ok(Some(value)),
736            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
737            Err(e) => Err(e.into()),
738        }
739    }
740
741    async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
742        let connection = self.get_connection()?;
743
744        connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
745
746        Ok(())
747    }
748
749    async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
750        let connection = self.get_connection()?;
751        let query = format!("{SELECT_PAYMENT_SQL} WHERE p.id = ?");
752        let mut stmt = connection.prepare(&query)?;
753        let payment = stmt.query_row(params![id], map_payment)?;
754        Ok(payment)
755    }
756
757    async fn get_payment_by_invoice(
758        &self,
759        invoice: String,
760    ) -> Result<Option<Payment>, StorageError> {
761        let connection = self.get_connection()?;
762        let query = format!("{SELECT_PAYMENT_SQL} WHERE l.invoice = ?");
763        let mut stmt = connection.prepare(&query)?;
764        let payment = stmt.query_row(params![invoice], map_payment);
765        match payment {
766            Ok(payment) => Ok(Some(payment)),
767            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
768            Err(e) => Err(e.into()),
769        }
770    }
771
772    async fn get_payments_by_parent_ids(
773        &self,
774        parent_payment_ids: Vec<String>,
775    ) -> Result<HashMap<String, Vec<Payment>>, StorageError> {
776        if parent_payment_ids.is_empty() {
777            return Ok(HashMap::new());
778        }
779
780        let connection = self.get_connection()?;
781
782        // Early exit if no related payments exist
783        let has_related: bool = connection.query_row(
784            "SELECT EXISTS(SELECT 1 FROM payment_metadata WHERE parent_payment_id IS NOT NULL LIMIT 1)",
785            [],
786            |row| row.get(0),
787        )?;
788        if !has_related {
789            return Ok(HashMap::new());
790        }
791
792        // Build the IN clause with placeholders
793        let placeholders: Vec<&str> = parent_payment_ids.iter().map(|_| "?").collect();
794        let in_clause = placeholders.join(", ");
795
796        let query = format!(
797            "{SELECT_PAYMENT_SQL} WHERE pm.parent_payment_id IN ({in_clause}) ORDER BY p.timestamp ASC"
798        );
799
800        let mut stmt = connection.prepare(&query)?;
801        let params: Vec<&dyn ToSql> = parent_payment_ids
802            .iter()
803            .map(|id| id as &dyn ToSql)
804            .collect();
805        let rows = stmt.query_map(params.as_slice(), |row| {
806            let payment = map_payment(row)?;
807            let parent_payment_id: String = row.get(31)?;
808            Ok((parent_payment_id, payment))
809        })?;
810
811        let mut result: HashMap<String, Vec<Payment>> = HashMap::new();
812        for row in rows {
813            let (parent_id, related_payment) = row?;
814            result.entry(parent_id).or_default().push(related_payment);
815        }
816
817        Ok(result)
818    }
819
820    async fn add_deposit(
821        &self,
822        txid: String,
823        vout: u32,
824        amount_sats: u64,
825        is_mature: bool,
826    ) -> Result<(), StorageError> {
827        let connection = self.get_connection()?;
828        connection.execute(
829            "INSERT INTO unclaimed_deposits (txid, vout, amount_sats, is_mature)
830             VALUES (?, ?, ?, ?)
831             ON CONFLICT(txid, vout) DO UPDATE SET is_mature = excluded.is_mature, amount_sats = excluded.amount_sats",
832            params![txid, vout, amount_sats, is_mature],
833        )?;
834        Ok(())
835    }
836
837    async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
838        let connection = self.get_connection()?;
839        connection.execute(
840            "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
841            params![txid, vout],
842        )?;
843        Ok(())
844    }
845
846    async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
847        let connection = self.get_connection()?;
848        let mut stmt =
849            connection.prepare("SELECT txid, vout, amount_sats, is_mature, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
850        let rows = stmt.query_map(params![], |row| {
851            Ok(DepositInfo {
852                txid: row.get(0)?,
853                vout: row.get(1)?,
854                amount_sats: row.get(2)?,
855                is_mature: row.get(3)?,
856                claim_error: row.get(4)?,
857                refund_tx: row.get(5)?,
858                refund_tx_id: row.get(6)?,
859            })
860        })?;
861        let mut deposits = Vec::new();
862        for row in rows {
863            deposits.push(row?);
864        }
865        Ok(deposits)
866    }
867
868    async fn update_deposit(
869        &self,
870        txid: String,
871        vout: u32,
872        payload: UpdateDepositPayload,
873    ) -> Result<(), StorageError> {
874        let connection = self.get_connection()?;
875        match payload {
876            UpdateDepositPayload::ClaimError { error } => {
877                connection.execute(
878                    "UPDATE unclaimed_deposits SET claim_error = ?, refund_tx = NULL, refund_tx_id = NULL WHERE txid = ? AND vout = ?",
879                    params![error, txid, vout],
880                )?;
881            }
882            UpdateDepositPayload::Refund {
883                refund_txid,
884                refund_tx,
885            } => {
886                connection.execute(
887                    "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ?, claim_error = NULL WHERE txid = ? AND vout = ?",
888                    params![refund_tx, refund_txid, txid, vout],
889                )?;
890            }
891        }
892        Ok(())
893    }
894
895    async fn set_lnurl_metadata(
896        &self,
897        metadata: Vec<SetLnurlMetadataItem>,
898    ) -> Result<(), StorageError> {
899        let connection = self.get_connection()?;
900        for metadata in metadata {
901            connection.execute(
902                "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment)
903                 VALUES (?, ?, ?, ?)",
904                params![
905                    metadata.payment_hash,
906                    metadata.nostr_zap_request,
907                    metadata.nostr_zap_receipt,
908                    metadata.sender_comment,
909                ],
910            )?;
911        }
912        Ok(())
913    }
914
915    async fn list_contacts(
916        &self,
917        request: ListContactsRequest,
918    ) -> Result<Vec<Contact>, StorageError> {
919        let limit = request.limit.unwrap_or(u32::MAX);
920        let offset = request.offset.unwrap_or(0);
921        let connection = self.get_connection()?;
922        let query = "SELECT id, name, payment_identifier, created_at, updated_at FROM contacts ORDER BY name ASC LIMIT ? OFFSET ?";
923
924        let mut stmt = connection.prepare(query)?;
925        let contacts = stmt
926            .query_map(params![limit, offset], |row| {
927                Ok(Contact {
928                    id: row.get(0)?,
929                    name: row.get(1)?,
930                    payment_identifier: row.get(2)?,
931                    created_at: row.get(3)?,
932                    updated_at: row.get(4)?,
933                })
934            })?
935            .collect::<Result<Vec<_>, _>>()?;
936        Ok(contacts)
937    }
938
939    async fn get_contact(&self, id: String) -> Result<Contact, StorageError> {
940        let connection = self.get_connection()?;
941        let mut stmt = connection.prepare(
942            "SELECT id, name, payment_identifier, created_at, updated_at FROM contacts WHERE id = ?",
943        )?;
944        stmt.query_row(params![id], |row| {
945            Ok(Contact {
946                id: row.get(0)?,
947                name: row.get(1)?,
948                payment_identifier: row.get(2)?,
949                created_at: row.get(3)?,
950                updated_at: row.get(4)?,
951            })
952        })
953        .map_err(|e| match e {
954            rusqlite::Error::QueryReturnedNoRows => StorageError::NotFound,
955            other => other.into(),
956        })
957    }
958
959    async fn insert_contact(&self, contact: Contact) -> Result<(), StorageError> {
960        let connection = self.get_connection()?;
961        connection.execute(
962            "INSERT INTO contacts (id, name, payment_identifier, created_at, updated_at)
963             VALUES (?, ?, ?, ?, ?)
964             ON CONFLICT(id) DO UPDATE SET
965               name = excluded.name,
966               payment_identifier = excluded.payment_identifier,
967               updated_at = excluded.updated_at",
968            params![
969                contact.id,
970                contact.name,
971                contact.payment_identifier,
972                contact.created_at,
973                contact.updated_at,
974            ],
975        )?;
976        Ok(())
977    }
978
979    async fn delete_contact(&self, id: String) -> Result<(), StorageError> {
980        let connection = self.get_connection()?;
981        connection.execute("DELETE FROM contacts WHERE id = ?", params![id])?;
982        Ok(())
983    }
984
985    async fn add_outgoing_change(
986        &self,
987        record: UnversionedRecordChange,
988    ) -> Result<u64, StorageError> {
989        let mut connection = self.get_connection()?;
990        let tx = connection
991            .transaction_with_behavior(TransactionBehavior::Immediate)
992            .map_err(map_sqlite_error)?;
993
994        // This revision is a local queue id for pending rows, not a server revision.
995        let local_revision: u64 = tx
996            .query_row(
997                "SELECT COALESCE(MAX(revision), 0) + 1 FROM sync_outgoing",
998                [],
999                |row| row.get(0),
1000            )
1001            .map_err(map_sqlite_error)?;
1002
1003        tx.execute(
1004            "INSERT INTO sync_outgoing (
1005                record_type
1006            ,   data_id
1007            ,   schema_version
1008            ,   commit_time
1009            ,   updated_fields_json
1010            ,   revision
1011            )
1012             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1013            params![
1014                record.id.r#type,
1015                record.id.data_id,
1016                record.schema_version.clone(),
1017                serde_json::to_string(&record.updated_fields)?,
1018                local_revision,
1019            ],
1020        )
1021        .map_err(map_sqlite_error)?;
1022
1023        tx.commit().map_err(map_sqlite_error)?;
1024        Ok(local_revision)
1025    }
1026
1027    async fn complete_outgoing_sync(
1028        &self,
1029        record: Record,
1030        local_revision: u64,
1031    ) -> Result<(), StorageError> {
1032        let mut connection = self.get_connection()?;
1033        let tx = connection
1034            .transaction_with_behavior(TransactionBehavior::Immediate)
1035            .map_err(map_sqlite_error)?;
1036
1037        let rows_deleted = tx
1038            .execute(
1039                "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
1040                params![record.id.r#type, record.id.data_id, local_revision],
1041            )
1042            .map_err(map_sqlite_error)?;
1043
1044        if rows_deleted == 0 {
1045            warn!(
1046                "complete_outgoing_sync: DELETE from sync_outgoing matched 0 rows \
1047                 (type={}, data_id={}, revision={})",
1048                record.id.r#type, record.id.data_id, local_revision
1049            );
1050        }
1051
1052        tx.execute(
1053            "INSERT OR REPLACE INTO sync_state (
1054                record_type
1055            ,   data_id
1056            ,   schema_version
1057            ,   commit_time
1058            ,   data
1059            ,   revision
1060            )
1061             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1062            params![
1063                record.id.r#type,
1064                record.id.data_id,
1065                record.schema_version.clone(),
1066                serde_json::to_string(&record.data)?,
1067                record.revision,
1068            ],
1069        )
1070        .map_err(map_sqlite_error)?;
1071
1072        tx.execute(
1073            "UPDATE sync_revision SET revision = MAX(revision, ?)",
1074            params![record.revision],
1075        )
1076        .map_err(map_sqlite_error)?;
1077
1078        tx.commit().map_err(map_sqlite_error)?;
1079        Ok(())
1080    }
1081
1082    async fn get_pending_outgoing_changes(
1083        &self,
1084        limit: u32,
1085    ) -> Result<Vec<OutgoingChange>, StorageError> {
1086        let connection = self.get_connection()?;
1087
1088        let mut stmt = connection
1089            .prepare(
1090                "SELECT o.record_type
1091            ,       o.data_id
1092            ,       o.schema_version
1093            ,       o.commit_time
1094            ,       o.updated_fields_json
1095            ,       o.revision
1096            ,       e.schema_version AS existing_schema_version
1097            ,       e.commit_time AS existing_commit_time
1098            ,       e.data AS existing_data
1099            ,       e.revision AS existing_revision
1100             FROM sync_outgoing o
1101             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
1102             ORDER BY o.revision ASC
1103             LIMIT ?",
1104            )
1105            .map_err(map_sqlite_error)?;
1106        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
1107        let mut results = Vec::new();
1108        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
1109            let parent = if let Some(existing_data) =
1110                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
1111            {
1112                Some(Record {
1113                    id: RecordId::new(
1114                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
1115                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
1116                    ),
1117                    schema_version: row.get(6).map_err(map_sqlite_error)?,
1118                    revision: row.get(9).map_err(map_sqlite_error)?,
1119                    data: serde_json::from_str(&existing_data)?,
1120                })
1121            } else {
1122                None
1123            };
1124            let change = RecordChange {
1125                id: RecordId::new(
1126                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
1127                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
1128                ),
1129                schema_version: row.get(2).map_err(map_sqlite_error)?,
1130                updated_fields: serde_json::from_str(
1131                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1132                )?,
1133                local_revision: row.get(5).map_err(map_sqlite_error)?,
1134            };
1135            results.push(OutgoingChange { change, parent });
1136        }
1137
1138        Ok(results)
1139    }
1140
1141    async fn get_last_revision(&self) -> Result<u64, StorageError> {
1142        let connection = self.get_connection()?;
1143
1144        let revision: u64 = connection
1145            .query_row("SELECT revision FROM sync_revision", [], |row| row.get(0))
1146            .map_err(map_sqlite_error)?;
1147
1148        Ok(revision)
1149    }
1150
1151    async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), StorageError> {
1152        if records.is_empty() {
1153            return Ok(());
1154        }
1155
1156        let mut connection = self.get_connection()?;
1157        let tx = connection
1158            .transaction_with_behavior(TransactionBehavior::Immediate)
1159            .map_err(map_sqlite_error)?;
1160
1161        for record in records {
1162            tx.execute(
1163                "INSERT OR REPLACE INTO sync_incoming (
1164                    record_type
1165                ,   data_id
1166                ,   schema_version
1167                ,   commit_time
1168                ,   data
1169                ,   revision
1170                )
1171                 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1172                params![
1173                    record.id.r#type,
1174                    record.id.data_id,
1175                    record.schema_version.clone(),
1176                    serde_json::to_string(&record.data)?,
1177                    record.revision,
1178                ],
1179            )
1180            .map_err(map_sqlite_error)?;
1181        }
1182
1183        tx.commit().map_err(map_sqlite_error)?;
1184        Ok(())
1185    }
1186
1187    async fn delete_incoming_record(&self, record: Record) -> Result<(), StorageError> {
1188        let connection = self.get_connection()?;
1189
1190        connection
1191            .execute(
1192                "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
1193                params![record.id.r#type, record.id.data_id, record.revision],
1194            )
1195            .map_err(map_sqlite_error)?;
1196
1197        Ok(())
1198    }
1199
1200    async fn get_incoming_records(&self, limit: u32) -> Result<Vec<IncomingChange>, StorageError> {
1201        let connection = self.get_connection()?;
1202
1203        let mut stmt = connection
1204            .prepare(
1205                "SELECT i.record_type
1206            ,       i.data_id
1207            ,       i.schema_version
1208            ,       i.data
1209            ,       i.revision
1210            ,       e.schema_version AS existing_schema_version
1211            ,       e.commit_time AS existing_commit_time
1212            ,       e.data AS existing_data
1213            ,       e.revision AS existing_revision
1214             FROM sync_incoming i
1215             LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
1216             ORDER BY i.revision ASC
1217             LIMIT ?",
1218            )
1219            .map_err(map_sqlite_error)?;
1220
1221        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
1222        let mut results = Vec::new();
1223
1224        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
1225            let parent = if let Some(existing_data) =
1226                row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
1227            {
1228                Some(Record {
1229                    id: RecordId::new(
1230                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
1231                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
1232                    ),
1233                    schema_version: row.get(5).map_err(map_sqlite_error)?,
1234                    revision: row.get(8).map_err(map_sqlite_error)?,
1235                    data: serde_json::from_str(&existing_data)?,
1236                })
1237            } else {
1238                None
1239            };
1240            let record = Record {
1241                id: RecordId::new(
1242                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
1243                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
1244                ),
1245                schema_version: row.get(2).map_err(map_sqlite_error)?,
1246                data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
1247                revision: row.get(4).map_err(map_sqlite_error)?,
1248            };
1249            results.push(IncomingChange {
1250                new_state: record,
1251                old_state: parent,
1252            });
1253        }
1254
1255        Ok(results)
1256    }
1257
1258    async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, StorageError> {
1259        let connection = self.get_connection()?;
1260
1261        let mut stmt = connection
1262            .prepare(
1263                "SELECT o.record_type
1264            ,       o.data_id
1265            ,       o.schema_version
1266            ,       o.commit_time
1267            ,       o.updated_fields_json
1268            ,       o.revision
1269            ,       e.schema_version AS existing_schema_version
1270            ,       e.commit_time AS existing_commit_time
1271            ,       e.data AS existing_data
1272            ,       e.revision AS existing_revision
1273             FROM sync_outgoing o
1274             LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
1275             ORDER BY o.revision DESC
1276             LIMIT 1",
1277            )
1278            .map_err(map_sqlite_error)?;
1279
1280        let mut rows = stmt.query([]).map_err(map_sqlite_error)?;
1281
1282        if let Some(row) = rows.next().map_err(map_sqlite_error)? {
1283            let parent = if let Some(existing_data) =
1284                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
1285            {
1286                Some(Record {
1287                    id: RecordId::new(
1288                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
1289                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
1290                    ),
1291                    schema_version: row.get(6).map_err(map_sqlite_error)?,
1292                    revision: row.get(9).map_err(map_sqlite_error)?,
1293                    data: serde_json::from_str(&existing_data)?,
1294                })
1295            } else {
1296                None
1297            };
1298            let change = RecordChange {
1299                id: RecordId::new(
1300                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
1301                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
1302                ),
1303                schema_version: row.get(2).map_err(map_sqlite_error)?,
1304                updated_fields: serde_json::from_str(
1305                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1306                )?,
1307                local_revision: row.get(5).map_err(map_sqlite_error)?,
1308            };
1309
1310            return Ok(Some(OutgoingChange { change, parent }));
1311        }
1312
1313        Ok(None)
1314    }
1315
1316    async fn update_record_from_incoming(&self, record: Record) -> Result<(), StorageError> {
1317        let mut connection = self.get_connection()?;
1318        let tx = connection
1319            .transaction_with_behavior(TransactionBehavior::Immediate)
1320            .map_err(map_sqlite_error)?;
1321
1322        tx.execute(
1323            "INSERT OR REPLACE INTO sync_state (
1324                record_type
1325            ,   data_id
1326            ,   schema_version
1327            ,   commit_time
1328            ,   data
1329            ,   revision
1330            )
1331             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1332            params![
1333                record.id.r#type,
1334                record.id.data_id,
1335                record.schema_version.clone(),
1336                serde_json::to_string(&record.data)?,
1337                record.revision,
1338            ],
1339        )
1340        .map_err(map_sqlite_error)?;
1341
1342        tx.execute(
1343            "UPDATE sync_revision SET revision = MAX(revision, ?)",
1344            params![record.revision],
1345        )
1346        .map_err(map_sqlite_error)?;
1347
1348        tx.commit().map_err(map_sqlite_error)?;
1349        Ok(())
1350    }
1351}
1352
/// Base query for payment lookups.
///
/// Selects from `payments` and LEFT JOINs the per-kind detail tables
/// (`payment_details_lightning`, `payment_details_token`, `payment_details_spark`),
/// `payment_metadata`, and `lnurl_receive_metadata` (matched on the Lightning payment hash).
/// The query ends after the joins, so callers append their own `WHERE` / `ORDER BY` clause.
///
/// `lightning_description` falls back to `pm.lnurl_description` when the Lightning
/// description is NULL.
///
/// Column indices 0-30 are used by `map_payment`, index 31 (`parent_payment_id`) is only used by `get_payments_by_parent_ids`.
/// Column positions are read by numeric index, so adding or reordering columns here
/// requires updating those readers.
const SELECT_PAYMENT_SQL: &str = "
    SELECT p.id,
           p.payment_type,
           p.status,
           p.amount,
           p.fees,
           p.timestamp,
           p.method,
           p.withdraw_tx_id,
           p.deposit_tx_id,
           p.spark,
           l.invoice AS lightning_invoice,
           l.payment_hash AS lightning_payment_hash,
           l.destination_pubkey AS lightning_destination_pubkey,
           COALESCE(l.description, pm.lnurl_description) AS lightning_description,
           l.preimage AS lightning_preimage,
           l.htlc_status AS lightning_htlc_status,
           l.htlc_expiry_time AS lightning_htlc_expiry_time,
           pm.lnurl_pay_info,
           pm.lnurl_withdraw_info,
           pm.conversion_info,
           t.metadata AS token_metadata,
           t.tx_hash AS token_tx_hash,
           t.tx_type AS token_tx_type,
           t.invoice_details AS token_invoice_details,
           s.invoice_details AS spark_invoice_details,
           s.htlc_details AS spark_htlc_details,
           lrm.nostr_zap_request AS lnurl_nostr_zap_request,
           lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt,
           lrm.sender_comment AS lnurl_sender_comment,
           lrm.payment_hash AS lnurl_payment_hash,
           pm.conversion_status,
           pm.parent_payment_id
      FROM payments p
      LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
      LEFT JOIN payment_details_token t ON p.id = t.payment_id
      LEFT JOIN payment_details_spark s ON p.id = s.payment_id
      LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
      LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash";
1394
1395#[allow(clippy::too_many_lines)]
1396fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
1397    let withdraw_tx_id: Option<String> = row.get(7)?;
1398    let deposit_tx_id: Option<String> = row.get(8)?;
1399    let spark: Option<i32> = row.get(9)?;
1400    let lightning_invoice: Option<String> = row.get(10)?;
1401    let token_metadata: Option<String> = row.get(20)?;
1402    let details = match (
1403        lightning_invoice,
1404        withdraw_tx_id,
1405        deposit_tx_id,
1406        spark,
1407        token_metadata,
1408    ) {
1409        (Some(invoice), _, _, _, _) => {
1410            let payment_hash: String = row.get(11)?;
1411            let destination_pubkey: String = row.get(12)?;
1412            let description: Option<String> = row.get(13)?;
1413            let preimage: Option<String> = row.get(14)?;
1414            let htlc_status: SparkHtlcStatus =
1415                row.get::<_, Option<SparkHtlcStatus>>(15)?.ok_or_else(|| {
1416                    rusqlite::Error::FromSqlConversionFailure(
1417                        15,
1418                        rusqlite::types::Type::Null,
1419                        "htlc_status is required for Lightning payments".into(),
1420                    )
1421                })?;
1422            let htlc_expiry_time: u64 = row.get(16)?;
1423            let htlc_details = SparkHtlcDetails {
1424                payment_hash,
1425                preimage,
1426                expiry_time: htlc_expiry_time,
1427                status: htlc_status,
1428            };
1429            let lnurl_pay_info: Option<LnurlPayInfo> = row.get(17)?;
1430            let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(18)?;
1431            let lnurl_nostr_zap_request: Option<String> = row.get(26)?;
1432            let lnurl_nostr_zap_receipt: Option<String> = row.get(27)?;
1433            let lnurl_sender_comment: Option<String> = row.get(28)?;
1434            let lnurl_payment_hash: Option<String> = row.get(29)?;
1435            let lnurl_receive_metadata = if lnurl_payment_hash.is_some() {
1436                Some(LnurlReceiveMetadata {
1437                    nostr_zap_request: lnurl_nostr_zap_request,
1438                    nostr_zap_receipt: lnurl_nostr_zap_receipt,
1439                    sender_comment: lnurl_sender_comment,
1440                })
1441            } else {
1442                None
1443            };
1444            Some(PaymentDetails::Lightning {
1445                invoice,
1446                destination_pubkey,
1447                description,
1448                htlc_details,
1449                lnurl_pay_info,
1450                lnurl_withdraw_info,
1451                lnurl_receive_metadata,
1452            })
1453        }
1454        (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
1455        (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
1456        (_, _, _, Some(_), _) => {
1457            let invoice_details_str: Option<String> = row.get(24)?;
1458            let invoice_details = invoice_details_str
1459                .map(|s| serde_json_from_str(&s, 24))
1460                .transpose()?;
1461            let htlc_details_str: Option<String> = row.get(25)?;
1462            let htlc_details = htlc_details_str
1463                .map(|s| serde_json_from_str(&s, 25))
1464                .transpose()?;
1465            let conversion_info_str: Option<String> = row.get(19)?;
1466            let conversion_info: Option<ConversionInfo> = conversion_info_str
1467                .map(|s: String| serde_json_from_str(&s, 19))
1468                .transpose()?;
1469            Some(PaymentDetails::Spark {
1470                invoice_details,
1471                htlc_details,
1472                conversion_info,
1473            })
1474        }
1475        (_, _, _, _, Some(metadata)) => {
1476            let tx_type: TokenTransactionType = row.get(22)?;
1477            let invoice_details_str: Option<String> = row.get(23)?;
1478            let invoice_details = invoice_details_str
1479                .map(|s| serde_json_from_str(&s, 23))
1480                .transpose()?;
1481            let conversion_info_str: Option<String> = row.get(19)?;
1482            let conversion_info: Option<ConversionInfo> = conversion_info_str
1483                .map(|s: String| serde_json_from_str(&s, 19))
1484                .transpose()?;
1485            Some(PaymentDetails::Token {
1486                metadata: serde_json_from_str(&metadata, 20)?,
1487                tx_hash: row.get(21)?,
1488                tx_type,
1489                invoice_details,
1490                conversion_info,
1491            })
1492        }
1493        _ => None,
1494    };
1495    // Read conversion_status from payment_metadata (column 30)
1496    let conversion_status: Option<ConversionStatus> = row.get(30)?;
1497    let conversion_details = conversion_status.map(|status| ConversionDetails {
1498        status,
1499        from: None,
1500        to: None,
1501    });
1502
1503    Ok(Payment {
1504        id: row.get(0)?,
1505        payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
1506            rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
1507        })?,
1508        status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
1509            rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
1510        })?,
1511        amount: row.get::<_, U128SqlWrapper>(3)?.0,
1512        fees: row.get::<_, U128SqlWrapper>(4)?.0,
1513        timestamp: row.get(5)?,
1514        details,
1515        method: row.get(6)?,
1516        conversion_details,
1517    })
1518}
1519
1520impl ToSql for PaymentDetails {
1521    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1522        to_sql_json(self)
1523    }
1524}
1525
1526impl FromSql for PaymentDetails {
1527    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1528        from_sql_json(value)
1529    }
1530}
1531
1532impl ToSql for PaymentMethod {
1533    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1534        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1535    }
1536}
1537
1538impl FromSql for PaymentMethod {
1539    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1540        match value {
1541            ValueRef::Text(i) => {
1542                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1543                // NOTE: trim_matches/to_lowercase is here, because this used to be serde_json serialized.
1544                let payment_method: PaymentMethod = s
1545                    .trim_matches('"')
1546                    .to_lowercase()
1547                    .parse()
1548                    .map_err(|()| FromSqlError::InvalidType)?;
1549                Ok(payment_method)
1550            }
1551            _ => Err(FromSqlError::InvalidType),
1552        }
1553    }
1554}
1555
1556impl ToSql for TokenTransactionType {
1557    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1558        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1559    }
1560}
1561
1562impl FromSql for TokenTransactionType {
1563    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1564        match value {
1565            ValueRef::Text(i) => {
1566                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1567                let tx_type: TokenTransactionType =
1568                    s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1569                Ok(tx_type)
1570            }
1571            _ => Err(FromSqlError::InvalidType),
1572        }
1573    }
1574}
1575
1576impl ToSql for SparkHtlcStatus {
1577    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1578        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1579    }
1580}
1581
1582impl FromSql for SparkHtlcStatus {
1583    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1584        match value {
1585            ValueRef::Text(i) => {
1586                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1587                let status: SparkHtlcStatus =
1588                    s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1589                Ok(status)
1590            }
1591            _ => Err(FromSqlError::InvalidType),
1592        }
1593    }
1594}
1595
1596impl ToSql for ConversionStatus {
1597    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1598        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1599    }
1600}
1601
1602impl FromSql for ConversionStatus {
1603    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1604        match value {
1605            ValueRef::Text(i) => {
1606                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1607                let status: ConversionStatus =
1608                    s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1609                Ok(status)
1610            }
1611            _ => Err(FromSqlError::InvalidType),
1612        }
1613    }
1614}
1615
1616impl ToSql for DepositClaimError {
1617    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1618        to_sql_json(self)
1619    }
1620}
1621
1622impl FromSql for DepositClaimError {
1623    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1624        from_sql_json(value)
1625    }
1626}
1627
1628impl ToSql for LnurlPayInfo {
1629    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1630        to_sql_json(self)
1631    }
1632}
1633
1634impl FromSql for LnurlPayInfo {
1635    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1636        from_sql_json(value)
1637    }
1638}
1639
1640impl ToSql for LnurlWithdrawInfo {
1641    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1642        to_sql_json(self)
1643    }
1644}
1645
1646impl FromSql for LnurlWithdrawInfo {
1647    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1648        from_sql_json(value)
1649    }
1650}
1651
1652fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1653where
1654    T: serde::Serialize,
1655{
1656    let json = serde_json::to_string(&value)
1657        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1658    Ok(rusqlite::types::ToSqlOutput::from(json))
1659}
1660
1661fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1662where
1663    T: serde::de::DeserializeOwned,
1664{
1665    match value {
1666        ValueRef::Text(i) => {
1667            let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1668            let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1669            Ok(deserialized)
1670        }
1671        _ => Err(FromSqlError::InvalidType),
1672    }
1673}
1674
1675fn serde_json_from_str<T>(value: &str, index: usize) -> Result<T, rusqlite::Error>
1676where
1677    T: serde::de::DeserializeOwned,
1678{
1679    serde_json::from_str(value).map_err(|e| {
1680        rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(e))
1681    })
1682}
1683
1684struct U128SqlWrapper(u128);
1685
1686impl ToSql for U128SqlWrapper {
1687    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1688        let string = self.0.to_string();
1689        Ok(rusqlite::types::ToSqlOutput::from(string))
1690    }
1691}
1692
1693impl FromSql for U128SqlWrapper {
1694    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1695        match value {
1696            ValueRef::Text(i) => {
1697                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1698                let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1699                Ok(U128SqlWrapper(integer))
1700            }
1701            _ => Err(FromSqlError::InvalidType),
1702        }
1703    }
1704}
1705
1706#[cfg(test)]
1707mod tests {
1708
1709    use crate::SqliteStorage;
1710    use std::path::PathBuf;
1711
1712    /// Helper function to create a temporary directory for tests
1713    /// Uses std library to avoid external dependency
1714    fn create_temp_dir(name: &str) -> PathBuf {
1715        let mut path = std::env::temp_dir();
1716        // Use UUID for uniqueness to avoid conflicts between parallel tests
1717        path.push(format!("breez-test-{}-{}", name, uuid::Uuid::new_v4()));
1718        std::fs::create_dir_all(&path).unwrap();
1719        path
1720    }
1721
1722    #[tokio::test]
1723    async fn test_storage() {
1724        let temp_dir = create_temp_dir("sqlite_storage");
1725        let storage = SqliteStorage::new(&temp_dir).unwrap();
1726
1727        Box::pin(crate::persist::tests::test_storage(Box::new(storage))).await;
1728    }
1729
1730    #[tokio::test]
1731    async fn test_unclaimed_deposits_crud() {
1732        let temp_dir = create_temp_dir("sqlite_storage_deposits");
1733        let storage = SqliteStorage::new(&temp_dir).unwrap();
1734
1735        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
1736    }
1737
1738    #[tokio::test]
1739    async fn test_deposit_refunds() {
1740        let temp_dir = create_temp_dir("sqlite_storage_refund_tx");
1741        let storage = SqliteStorage::new(&temp_dir).unwrap();
1742
1743        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
1744    }
1745
1746    #[tokio::test]
1747    async fn test_payment_type_filtering() {
1748        let temp_dir = create_temp_dir("sqlite_storage_type_filter");
1749        let storage = SqliteStorage::new(&temp_dir).unwrap();
1750
1751        crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
1752    }
1753
1754    #[tokio::test]
1755    async fn test_payment_status_filtering() {
1756        let temp_dir = create_temp_dir("sqlite_storage_status_filter");
1757        let storage = SqliteStorage::new(&temp_dir).unwrap();
1758
1759        crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
1760    }
1761
1762    #[tokio::test]
1763    async fn test_payment_asset_filtering() {
1764        let temp_dir = create_temp_dir("sqlite_storage_asset_filter");
1765        let storage = SqliteStorage::new(&temp_dir).unwrap();
1766
1767        crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
1768    }
1769
1770    #[tokio::test]
1771    async fn test_timestamp_filtering() {
1772        let temp_dir = create_temp_dir("sqlite_storage_timestamp_filter");
1773        let storage = SqliteStorage::new(&temp_dir).unwrap();
1774
1775        crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
1776    }
1777
1778    #[tokio::test]
1779    async fn test_spark_htlc_status_filtering() {
1780        let temp_dir = create_temp_dir("sqlite_storage_htlc_filter");
1781        let storage = SqliteStorage::new(&temp_dir).unwrap();
1782
1783        crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
1784    }
1785
1786    #[tokio::test]
1787    async fn test_lightning_htlc_details_and_status_filtering() {
1788        let temp_dir = create_temp_dir("sqlite_storage_htlc_details");
1789        let storage = SqliteStorage::new(&temp_dir).unwrap();
1790
1791        crate::persist::tests::test_lightning_htlc_details_and_status_filtering(Box::new(storage))
1792            .await;
1793    }
1794
1795    #[tokio::test]
1796    async fn test_conversion_refund_needed_filtering() {
1797        let temp_dir = create_temp_dir("sqlite_storage_conversion_refund_needed_filter");
1798        let storage = SqliteStorage::new(&temp_dir).unwrap();
1799
1800        crate::persist::tests::test_conversion_refund_needed_filtering(Box::new(storage)).await;
1801    }
1802
1803    #[tokio::test]
1804    async fn test_token_transaction_type_filtering() {
1805        let temp_dir = create_temp_dir("sqlite_storage_token_transaction_type_filter");
1806        let storage = SqliteStorage::new(&temp_dir).unwrap();
1807
1808        crate::persist::tests::test_token_transaction_type_filtering(Box::new(storage)).await;
1809    }
1810
1811    #[tokio::test]
1812    async fn test_combined_filters() {
1813        let temp_dir = create_temp_dir("sqlite_storage_combined_filter");
1814        let storage = SqliteStorage::new(&temp_dir).unwrap();
1815
1816        crate::persist::tests::test_combined_filters(Box::new(storage)).await;
1817    }
1818
1819    #[tokio::test]
1820    async fn test_sort_order() {
1821        let temp_dir = create_temp_dir("sqlite_storage_sort_order");
1822        let storage = SqliteStorage::new(&temp_dir).unwrap();
1823
1824        crate::persist::tests::test_sort_order(Box::new(storage)).await;
1825    }
1826
1827    #[tokio::test]
1828    async fn test_payment_metadata() {
1829        let temp_dir = create_temp_dir("sqlite_storage_payment_request_metadata");
1830        let storage = SqliteStorage::new(&temp_dir).unwrap();
1831
1832        crate::persist::tests::test_payment_metadata(Box::new(storage)).await;
1833    }
1834
1835    #[tokio::test]
1836    async fn test_payment_details_update_persistence() {
1837        let temp_dir = create_temp_dir("sqlite_storage_payment_details_update");
1838        let storage = SqliteStorage::new(&temp_dir).unwrap();
1839
1840        crate::persist::tests::test_payment_details_update_persistence(Box::new(storage)).await;
1841    }
1842
1843    #[tokio::test]
1844    async fn test_sync_storage() {
1845        let temp_dir = create_temp_dir("sqlite_sync_storage");
1846        let storage = SqliteStorage::new(&temp_dir).unwrap();
1847
1848        crate::persist::tests::test_sync_storage(Box::new(storage)).await;
1849    }
1850
1851    #[tokio::test]
1852    async fn test_payment_metadata_merge() {
1853        let temp_dir = create_temp_dir("sqlite_payment_metadata_merge");
1854        let storage = SqliteStorage::new(&temp_dir).unwrap();
1855
1856        crate::persist::tests::test_payment_metadata_merge(Box::new(storage)).await;
1857    }
1858
    /// Checks that a token payment written before the `tx_type` migration is readable after
    /// migrating to the latest schema and reports `TokenTransactionType::Transfer`.
    ///
    /// The test builds a database from only the first 22 entries of
    /// `SqliteStorage::current_migrations()`, inserts a token payment with raw SQL, reopens
    /// the database through `SqliteStorage::new` (which runs the remaining migrations), and
    /// then verifies reading, inserting a new `Mint` payment, listing, and filtering by
    /// `tx_type`.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_migration_tx_type() {
        use crate::{
            Payment, PaymentDetails, PaymentMethod, PaymentStatus, PaymentType, Storage,
            TokenMetadata, TokenTransactionType,
            persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
        };
        use rusqlite::{Connection, params};
        use rusqlite_migration::{M, Migrations};

        let temp_dir = create_temp_dir("sqlite_migration_tx_type");
        let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);

        // Step 1: Create database at version 21 (before tx_type migration)
        // NOTE(review): `take(22)` applies migrations 0-21; whether that is "version 21"
        // depends on how versions are counted — confirm the wording.
        {
            let mut conn = Connection::open(&db_path).unwrap();
            let migrations_before_tx_type: Vec<_> = SqliteStorage::current_migrations()
                .iter()
                .take(22) // Migrations 0-21 (index 22 is the tx_type migration)
                .map(|s| M::up(s))
                .collect();
            let migrations = Migrations::new(migrations_before_tx_type);
            migrations.to_latest(&mut conn).unwrap();
        }

        // Step 2: Insert a token payment WITHOUT tx_type column
        {
            let conn = Connection::open(&db_path).unwrap();

            // Insert into payments table
            // The method is written JSON-quoted ("\"token\""); `FromSql for PaymentMethod`
            // trims the surrounding quotes when reading it back.
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "token-migration-test",
                    "send",
                    "completed",
                    "5000",
                    "10",
                    1_234_567_890_i64,
                    "\"token\""
                ],
            )
            .unwrap();

            // Insert into payment_details_token WITHOUT tx_type (pre-migration)
            let metadata = serde_json::json!({
                "identifier": "test-token-id",
                "issuer_public_key": format!("02{}", "a".repeat(64)),
                "name": "Test Token",
                "ticker": "TST",
                "decimals": 8,
                "max_supply": 1_000_000_u128,
                "is_freezable": false
            });

            conn.execute(
                "INSERT INTO payment_details_token (payment_id, metadata, tx_hash)
                 VALUES (?, ?, ?)",
                params![
                    "token-migration-test",
                    metadata.to_string(),
                    "0xabcdef1234567890"
                ],
            )
            .unwrap();
        }

        // Step 3: Open with SqliteStorage (triggers migration to latest)
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        // Step 4: Verify the migrated token payment
        let migrated_payment = storage
            .get_payment_by_id("token-migration-test".to_string())
            .await
            .unwrap();

        assert_eq!(migrated_payment.id, "token-migration-test");
        assert_eq!(migrated_payment.amount, 5000);
        assert_eq!(migrated_payment.fees, 10);
        assert_eq!(migrated_payment.status, PaymentStatus::Completed);
        assert_eq!(migrated_payment.payment_type, PaymentType::Send);
        assert_eq!(migrated_payment.method, PaymentMethod::Token);

        // Verify token payment details have the default txType
        match migrated_payment.details {
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                tx_type,
                ..
            }) => {
                assert_eq!(metadata.identifier, "test-token-id");
                assert_eq!(metadata.name, "Test Token");
                assert_eq!(metadata.ticker, "TST");
                assert_eq!(metadata.decimals, 8);
                assert_eq!(tx_hash, "0xabcdef1234567890");
                // Key assertion: migration added default tx_type
                assert_eq!(
                    tx_type,
                    TokenTransactionType::Transfer,
                    "Migration should add default txType 'transfer' to token payments"
                );
            }
            _ => panic!("Expected Token payment details"),
        }

        // Step 5: Insert a new token payment with explicit tx_type
        let new_payment = Payment {
            id: "new-token-payment".to_string(),
            payment_type: PaymentType::Receive,
            status: PaymentStatus::Completed,
            amount: 8000,
            fees: 20,
            timestamp: 1_234_567_891,
            method: PaymentMethod::Token,
            details: Some(PaymentDetails::Token {
                metadata: TokenMetadata {
                    identifier: "another-token-id".to_string(),
                    issuer_public_key: format!("02{}", "b".repeat(64)),
                    name: "Another Token".to_string(),
                    ticker: "ATK".to_string(),
                    decimals: 6,
                    max_supply: 2_000_000,
                    is_freezable: true,
                },
                tx_hash: "0x1111222233334444".to_string(),
                tx_type: TokenTransactionType::Mint,
                invoice_details: None,
                conversion_info: None,
            }),
            conversion_details: None,
        };

        storage.insert_payment(new_payment).await.unwrap();

        // Step 6: List all payments
        let request = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: None,
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let payments = storage.list_payments(request).await.unwrap();
        assert_eq!(payments.len(), 2, "Should have both payments");

        // Verify migrated payment has Transfer type
        let migrated = payments
            .iter()
            .find(|p| p.id == "token-migration-test")
            .unwrap();
        match &migrated.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Transfer);
            }
            _ => panic!("Expected Token payment details"),
        }

        // Verify new payment has Mint type
        let new = payments
            .iter()
            .find(|p| p.id == "new-token-payment")
            .unwrap();
        match &new.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Mint);
            }
            _ => panic!("Expected Token payment details"),
        }

        // Step 7: Test filtering by token transaction type
        let transfer_filter = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Token {
                conversion_refund_needed: None,
                tx_hash: None,
                tx_type: Some(TokenTransactionType::Transfer),
            }]),
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let transfer_payments = storage.list_payments(transfer_filter).await.unwrap();
        assert_eq!(
            transfer_payments.len(),
            1,
            "Should find only the Transfer payment"
        );
        assert_eq!(transfer_payments[0].id, "token-migration-test");
    }
2061
    /// Checks that Lightning payments written before the `htlc_status` backfill get an HTLC
    /// status after migrating to the latest schema.
    ///
    /// The test builds a database from only the first 23 entries of
    /// `SqliteStorage::current_migrations()`, inserts completed, pending and failed Lightning
    /// payments with raw SQL, reopens the database through `SqliteStorage::new`, and asserts
    /// the resulting statuses (`PreimageShared`, `WaitingForPreimage`, `Returned`), an
    /// `expiry_time` of 0, and that filtering by `htlc_status` finds each payment.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_migration_htlc_details() {
        use crate::{
            PaymentDetails, SparkHtlcStatus, Storage,
            persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
        };
        use rusqlite::{Connection, params};
        use rusqlite_migration::{M, Migrations};

        let temp_dir = create_temp_dir("sqlite_migration_htlc_details");
        let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);

        // Step 1: Create database at version 23 (before the htlc_status backfill migration)
        // This includes the ALTER TABLE that adds htlc_status and htlc_expiry_time columns (migration 22)
        // but not the backfill UPDATE (migration 23).
        // NOTE(review): `test_migration_tx_type` describes index 22 as the tx_type migration,
        // while this comment attributes the htlc column ALTER TABLE to migration 22 — confirm
        // which numbering matches the current migration list.
        {
            let mut conn = Connection::open(&db_path).unwrap();
            let migrations_before_backfill: Vec<_> = SqliteStorage::current_migrations()
                .iter()
                .take(23) // Migrations 0-22
                .map(|s| M::up(s))
                .collect();
            let migrations = Migrations::new(migrations_before_backfill);
            migrations.to_latest(&mut conn).unwrap();
        }

        // Step 2: Insert Lightning payments with different statuses to test all branches
        {
            let conn = Connection::open(&db_path).unwrap();

            // Insert a Completed Lightning payment
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-completed",
                    "send",
                    "completed",
                    "1000",
                    "10",
                    1_700_000_001_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            // Only the completed payment is inserted with a preimage.
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, preimage)
                 VALUES (?, ?, ?, ?, ?)",
                params![
                    "ln-completed",
                    "lnbc_completed",
                    "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567",
                    "03pubkey1",
                    "preimage_completed"
                ],
            )
            .unwrap();

            // Insert a Pending Lightning payment
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-pending",
                    "receive",
                    "pending",
                    "2000",
                    "0",
                    1_700_000_002_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
                 VALUES (?, ?, ?, ?)",
                params![
                    "ln-pending",
                    "lnbc_pending",
                    "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678",
                    "03pubkey2"
                ],
            )
            .unwrap();

            // Insert a Failed Lightning payment
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-failed",
                    "send",
                    "failed",
                    "3000",
                    "5",
                    1_700_000_003_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
                 VALUES (?, ?, ?, ?)",
                params![
                    "ln-failed",
                    "lnbc_failed",
                    "hash_failed_0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
                    "03pubkey3"
                ],
            )
            .unwrap();
        }

        // Step 3: Open with SqliteStorage (triggers migration 23 - the backfill)
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        // Step 4: Verify Completed → PreimageShared
        let completed = storage
            .get_payment_by_id("ln-completed".to_string())
            .await
            .unwrap();
        match &completed.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::PreimageShared);
                assert_eq!(htlc_details.expiry_time, 0);
                assert_eq!(
                    htlc_details.payment_hash,
                    "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567"
                );
                assert_eq!(htlc_details.preimage.as_deref(), Some("preimage_completed"));
            }
            _ => panic!("Expected Lightning payment details for ln-completed"),
        }

        // Step 5: Verify Pending → WaitingForPreimage
        let pending = storage
            .get_payment_by_id("ln-pending".to_string())
            .await
            .unwrap();
        match &pending.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::WaitingForPreimage);
                assert_eq!(htlc_details.expiry_time, 0);
                assert_eq!(
                    htlc_details.payment_hash,
                    "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678"
                );
                assert!(htlc_details.preimage.is_none());
            }
            _ => panic!("Expected Lightning payment details for ln-pending"),
        }

        // Step 6: Verify Failed → Returned
        let failed = storage
            .get_payment_by_id("ln-failed".to_string())
            .await
            .unwrap();
        match &failed.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::Returned);
                assert_eq!(htlc_details.expiry_time, 0);
            }
            _ => panic!("Expected Lightning payment details for ln-failed"),
        }

        // Step 7: Verify filtering by htlc_status works on migrated data
        // Each status filter is expected to match exactly one of the three inserted payments.
        let waiting_payments = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::WaitingForPreimage]),
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(waiting_payments.len(), 1);
        assert_eq!(waiting_payments[0].id, "ln-pending");

        let preimage_shared = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::PreimageShared]),
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(preimage_shared.len(), 1);
        assert_eq!(preimage_shared[0].id, "ln-completed");

        let returned = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::Returned]),
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(returned.len(), 1);
        assert_eq!(returned[0].id, "ln-failed");
    }
2265
2266    #[tokio::test]
2267    async fn test_contacts_crud() {
2268        let temp_dir = create_temp_dir("contacts_crud");
2269        let storage = SqliteStorage::new(&temp_dir).unwrap();
2270
2271        crate::persist::tests::test_contacts_crud(Box::new(storage)).await;
2272    }
2273
2274    #[tokio::test]
2275    async fn test_conversion_status_persistence() {
2276        let temp_dir = create_temp_dir("sqlite_conversion_status_persistence");
2277        let storage = SqliteStorage::new(&temp_dir).unwrap();
2278
2279        crate::persist::tests::test_conversion_status_persistence(Box::new(storage)).await;
2280    }
2281}