1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5 Connection, Row, ToSql, TransactionBehavior, params,
6 types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11 AssetFilter, Contact, ConversionDetails, ConversionInfo, ConversionStatus, DepositInfo,
12 ListContactsRequest, LnurlPayInfo, LnurlReceiveMetadata, LnurlWithdrawInfo, PaymentDetails,
13 PaymentMethod, SparkHtlcDetails, SparkHtlcStatus, TokenTransactionType,
14 error::DepositClaimError,
15 persist::{
16 PaymentMetadata, SetLnurlMetadataItem, StorageListPaymentsRequest,
17 StoragePaymentDetailsFilter, UpdateDepositPayload,
18 },
19 sync_storage::{
20 IncomingChange, OutgoingChange, Record, RecordChange, RecordId, UnversionedRecordChange,
21 },
22};
23
24use std::collections::HashMap;
25
26use tracing::warn;
27
28use super::{Payment, Storage, StorageError};
29
/// File name of the SQLite database inside the storage directory.
const DEFAULT_DB_FILENAME: &str = "storage.sql";

/// SQLite-backed implementation of the [`Storage`] trait.
///
/// Only the directory path is held; a fresh [`Connection`] is opened per
/// operation through `get_connection`.
pub struct SqliteStorage {
    // Directory that contains the database file (DEFAULT_DB_FILENAME).
    db_dir: PathBuf,
}
35
impl SqliteStorage {
    /// Creates the storage directory (on non-wasm targets), opens the database
    /// file inside it and runs all pending schema migrations.
    ///
    /// `path` is the directory for the database; the file itself is named by
    /// `DEFAULT_DB_FILENAME`.
    pub fn new(path: &Path) -> Result<Self, StorageError> {
        let storage = Self {
            db_dir: path.to_path_buf(),
        };

        // wasm32-unknown-unknown has no std filesystem, so directory creation
        // is skipped there — presumably a VFS shim resolves the path instead
        // (TODO confirm).
        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        std::fs::create_dir_all(path)
            .map_err(|e| StorageError::InitializationError(e.to_string()))?;

        storage.migrate()?;
        Ok(storage)
    }

    /// Opens a new connection to the database file.
    ///
    /// NOTE(review): no `PRAGMA foreign_keys = ON` is issued here, yet several
    /// tables below declare `ON DELETE CASCADE`; SQLite disables foreign-key
    /// enforcement by default — confirm cascades are either not relied upon or
    /// enabled elsewhere.
    pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
        Ok(Connection::open(self.get_db_path())?)
    }

    // Full path of the database file inside the storage directory.
    fn get_db_path(&self) -> PathBuf {
        self.db_dir.join(DEFAULT_DB_FILENAME)
    }

    /// Applies all outstanding migrations, then performs the one data backfill
    /// that cannot be expressed in plain SQL (lnurl descriptions, added by
    /// migration 6).
    fn migrate(&self) -> Result<(), StorageError> {
        let migrations =
            Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
        let mut conn = self.get_connection()?;
        // Schema version before migrating; 0 for a brand-new database.
        let previous_version = match migrations.current_version(&conn)? {
            SchemaVersion::Inside(previous_version) => previous_version.get(),
            _ => 0,
        };
        migrations.to_latest(&mut conn)?;

        // Migration 6 introduced `lnurl_description`; databases crossing that
        // version need the column populated from the stored pay-info JSON.
        if previous_version < 6 {
            Self::migrate_lnurl_metadata_description(&mut conn)?;
        }

        Ok(())
    }

    /// Backfills `payment_metadata.lnurl_description` from each row's
    /// `lnurl_pay_info`, for rows where a description can be extracted.
    fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
        let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
        let pay_infos: Vec<_> = stmt
            .query_map([], |row| {
                let payment_id: String = row.get(0)?;
                let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
                Ok((payment_id, lnurl_pay_info))
            })?
            .collect::<Result<_, _>>()?;
        // Keep only rows whose pay info yields a description.
        let pay_infos = pay_infos
            .into_iter()
            .filter_map(|(payment_id, lnurl_pay_info)| {
                let pay_info = lnurl_pay_info?;
                let description = pay_info.extract_description()?;
                Some((payment_id, description))
            })
            .collect::<Vec<_>>();

        for pay_info in pay_infos {
            conn.execute(
                "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
                params![pay_info.1, pay_info.0],
            )?;
        }

        Ok(())
    }

    /// Ordered migration list. Append-only: the index of each entry is its
    /// schema version, so shipped entries must never be edited or reordered.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn current_migrations() -> Vec<&'static str> {
        vec![
            // 1: initial payments table (details as a JSON blob; removed in 7).
            "CREATE TABLE IF NOT EXISTS payments (
                id TEXT PRIMARY KEY,
                payment_type TEXT NOT NULL,
                status TEXT NOT NULL,
                amount INTEGER NOT NULL,
                fees INTEGER NOT NULL,
                timestamp INTEGER NOT NULL,
                details TEXT,
                method TEXT
            );",
            // 2: generic key/value cache.
            "CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );",
            // 3: on-chain deposits not yet claimed.
            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
                txid TEXT NOT NULL,
                vout INTEGER NOT NULL,
                amount_sats INTEGER,
                claim_error TEXT,
                refund_tx TEXT,
                refund_tx_id TEXT,
                PRIMARY KEY (txid, vout)
            );",
            // 4: per-payment metadata side table.
            "CREATE TABLE IF NOT EXISTS payment_metadata (
                payment_id TEXT PRIMARY KEY,
                lnurl_pay_info TEXT
            );",
            // 5: refund transactions for deposits.
            "CREATE TABLE IF NOT EXISTS deposit_refunds (
                deposit_tx_id TEXT NOT NULL,
                deposit_vout INTEGER NOT NULL,
                refund_tx TEXT NOT NULL,
                refund_tx_id TEXT NOT NULL,
                PRIMARY KEY (deposit_tx_id, deposit_vout)
            );",
            // 6: lnurl description column (backfilled in Rust, see migrate()).
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
            // 7: split the JSON `details` blob into typed columns/tables.
            "
            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN spark INTEGER;
            CREATE TABLE payment_details_lightning (
                payment_id TEXT PRIMARY KEY,
                invoice TEXT NOT NULL,
                payment_hash TEXT NOT NULL,
                destination_pubkey TEXT NOT NULL,
                description TEXT,
                preimage TEXT,
                FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'),
                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'),
                json_extract(details, '$.Lightning.preimage')
            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

            ALTER TABLE payments DROP COLUMN details;

            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
            ",
            // 8: token payment details.
            "CREATE TABLE payment_details_token (
                payment_id TEXT PRIMARY KEY,
                metadata TEXT NOT NULL,
                tx_hash TEXT NOT NULL,
                FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );",
            // 9-12: rebuild payments with TEXT amount/fees (u128 stored as text).
            "CREATE TABLE payments_new (
                id TEXT PRIMARY KEY,
                payment_type TEXT NOT NULL,
                status TEXT NOT NULL,
                amount TEXT NOT NULL,
                fees TEXT NOT NULL,
                timestamp INTEGER NOT NULL,
                method TEXT,
                withdraw_tx_id TEXT,
                deposit_tx_id TEXT,
                spark INTEGER
            );",
            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
            SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
            FROM payments;",
            "DROP TABLE payments;",
            "ALTER TABLE payments_new RENAME TO payments;",
            // 13: spark details table + token invoice details.
            "CREATE TABLE payment_details_spark (
                payment_id TEXT NOT NULL PRIMARY KEY,
                invoice_details TEXT NOT NULL,
                FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
            // 14: lnurl-withdraw metadata.
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
            // 15: remote-sync bookkeeping tables (revision counter, outgoing
            // queue and last-known server state).
            "CREATE TABLE sync_revision (
                revision INTEGER NOT NULL DEFAULT 0
            );
            INSERT INTO sync_revision (revision) VALUES (0);
            CREATE TABLE sync_outgoing(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                updated_fields_json TEXT NOT NULL,
                revision INTEGER NOT NULL
            );
            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
            CREATE TABLE sync_state(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id)
            );",
            // 16: incoming sync changes, keyed per revision.
            "CREATE TABLE sync_incoming(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id, revision)
            );
            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
            // 17: rebuild spark details — invoice_details now optional, add
            // htlc_details.
            "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
            CREATE TABLE payment_details_spark (
                payment_id TEXT NOT NULL PRIMARY KEY,
                invoice_details TEXT,
                htlc_details TEXT,
                FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_spark (payment_id, invoice_details)
            SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
            DROP TABLE tmp_payment_details_spark;",
            // 18: lnurl receive metadata keyed by payment hash.
            "CREATE TABLE lnurl_receive_metadata (
                payment_hash TEXT NOT NULL PRIMARY KEY,
                nostr_zap_request TEXT,
                nostr_zap_receipt TEXT,
                sender_comment TEXT
            );",
            // 19: drop cached deposits so they are re-discovered.
            "DELETE FROM unclaimed_deposits;",
            // 20: full sync reset (queue, incoming, state, revision, flag).
            "DELETE FROM sync_outgoing;
            DELETE FROM sync_incoming;
            DELETE FROM sync_state;
            UPDATE sync_revision SET revision = 0;
            DELETE FROM settings WHERE key = 'sync_initial_complete';",
            // 21-23: conversion info columns (21 was superseded by 23).
            "ALTER TABLE payment_metadata ADD COLUMN token_conversion_info TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN parent_payment_id TEXT;",
            "
            ALTER TABLE payment_metadata DROP COLUMN token_conversion_info;
            ALTER TABLE payment_metadata ADD COLUMN conversion_info TEXT;
            ",
            // 24: token tx_type column + invalidate part of the sync offset.
            "ALTER TABLE payment_details_token ADD COLUMN tx_type TEXT NOT NULL DEFAULT 'transfer';
            UPDATE settings
            SET value = json_set(value, '$.last_synced_final_token_payment_id', NULL)
            WHERE key = 'sync_offset' AND json_valid(value) AND json_type(value, '$.last_synced_final_token_payment_id') IS NOT NULL;",
            // 25: another full sync reset.
            "DELETE FROM sync_outgoing;
            DELETE FROM sync_incoming;
            DELETE FROM sync_state;
            UPDATE sync_revision SET revision = 0;
            DELETE FROM settings WHERE key = 'sync_initial_complete';",
            // 26: lightning HTLC tracking columns.
            "ALTER TABLE payment_details_lightning ADD COLUMN htlc_status TEXT NOT NULL DEFAULT 'WaitingForPreimage';
            ALTER TABLE payment_details_lightning ADD COLUMN htlc_expiry_time INTEGER NOT NULL DEFAULT 0;",
            // 27: backfill htlc_status from payment status; reset sync offset.
            "UPDATE payment_details_lightning
            SET htlc_status = CASE
                WHEN (SELECT status FROM payments WHERE id = payment_id) = 'completed' THEN 'PreimageShared'
                WHEN (SELECT status FROM payments WHERE id = payment_id) = 'pending' THEN 'WaitingForPreimage'
                ELSE 'Returned'
            END;
            UPDATE settings
            SET value = json_set(value, '$.offset', 0)
            WHERE key = 'sync_offset' AND json_valid(value);",
            // 28: preimage column (dropped again in 33).
            "ALTER TABLE lnurl_receive_metadata ADD COLUMN preimage TEXT;",
            // 29-31: cache invalidations + lookup index.
            "DELETE FROM settings WHERE key = 'lnurl_metadata_updated_after';",
            "DELETE FROM settings WHERE key = 'lightning_address';",
            "CREATE INDEX IF NOT EXISTS idx_payment_details_lightning_payment_hash ON payment_details_lightning(payment_hash);",
            // 32: contacts table.
            "CREATE TABLE contacts (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                payment_identifier TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            );",
            // 33-36: preimage rollback, cache invalidation, deposit maturity,
            // conversion status.
            "ALTER TABLE lnurl_receive_metadata DROP COLUMN preimage;",
            "DELETE FROM settings WHERE key = 'lightning_address';",
            "ALTER TABLE unclaimed_deposits ADD COLUMN is_mature INTEGER NOT NULL DEFAULT 1;",
            "ALTER TABLE payment_metadata ADD COLUMN conversion_status TEXT;",
        ]
    }
}
334
335#[allow(clippy::needless_pass_by_value)]
339fn map_sqlite_error(e: rusqlite::Error) -> StorageError {
340 match e {
341 rusqlite::Error::SqliteFailure(err, _)
342 if err.code == rusqlite::ErrorCode::DatabaseBusy
343 || err.code == rusqlite::ErrorCode::DatabaseLocked =>
344 {
345 StorageError::Connection(e.to_string())
346 }
347 _ => StorageError::Implementation(e.to_string()),
348 }
349}
350
351impl From<rusqlite::Error> for StorageError {
352 fn from(value: rusqlite::Error) -> Self {
353 map_sqlite_error(value)
354 }
355}
356
357impl From<rusqlite_migration::Error> for StorageError {
358 fn from(value: rusqlite_migration::Error) -> Self {
359 StorageError::Implementation(value.to_string())
360 }
361}
362
363#[async_trait]
364impl Storage for SqliteStorage {
    /// Lists payments matching `request`.
    ///
    /// Builds one WHERE clause per active filter plus a positional parameter
    /// list in the same order — clause pushes and `params` pushes must stay in
    /// lockstep. Aliases: `p` = payments, `t` = token details, `l` = lightning
    /// details, `s` = spark details, `pm` = payment_metadata (all defined in
    /// `SELECT_PAYMENT_SQL`, declared elsewhere in this file).
    #[allow(clippy::too_many_lines)]
    async fn list_payments(
        &self,
        request: StorageListPaymentsRequest,
    ) -> Result<Vec<Payment>, StorageError> {
        let connection = self.get_connection()?;

        let mut where_clauses = Vec::new();
        let mut params: Vec<Box<dyn ToSql>> = Vec::new();

        // Payment-type filter: p.payment_type IN (?, ...).
        if let Some(ref type_filter) = request.type_filter
            && !type_filter.is_empty()
        {
            let placeholders = type_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.payment_type IN ({placeholders})"));
            for payment_type in type_filter {
                params.push(Box::new(payment_type.to_string()));
            }
        }

        // Status filter: p.status IN (?, ...).
        if let Some(ref status_filter) = request.status_filter
            && !status_filter.is_empty()
        {
            let placeholders = status_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.status IN ({placeholders})"));
            for status in status_filter {
                params.push(Box::new(status.to_string()));
            }
        }

        // Timestamp range: inclusive lower bound, exclusive upper bound.
        if let Some(from_timestamp) = request.from_timestamp {
            where_clauses.push("p.timestamp >= ?".to_string());
            params.push(Box::new(from_timestamp));
        }

        if let Some(to_timestamp) = request.to_timestamp {
            where_clauses.push("p.timestamp < ?".to_string());
            params.push(Box::new(to_timestamp));
        }

        // Asset filter: Bitcoin payments are those with no token-metadata row.
        if let Some(ref asset_filter) = request.asset_filter {
            match asset_filter {
                AssetFilter::Bitcoin => {
                    where_clauses.push("t.metadata IS NULL".to_string());
                }
                AssetFilter::Token { token_identifier } => {
                    where_clauses.push("t.metadata IS NOT NULL".to_string());
                    if let Some(identifier) = token_identifier {
                        where_clauses
                            .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
                        params.push(Box::new(identifier.clone()));
                    }
                }
            }
        }

        // Payment-details filters: each filter entry builds one AND-group;
        // the groups are OR-ed together into a single clause.
        if let Some(ref payment_details_filter) = request.payment_details_filter {
            let mut all_payment_details_clauses = Vec::new();
            for payment_details_filter in payment_details_filter {
                let mut payment_details_clauses = Vec::new();
                // HTLC status is a column for lightning ("l") but lives inside
                // the htlc_details JSON for spark ("s").
                let htlc_filter = match payment_details_filter {
                    StoragePaymentDetailsFilter::Spark {
                        htlc_status: Some(s),
                        ..
                    } if !s.is_empty() => Some(("s", s)),
                    StoragePaymentDetailsFilter::Lightning {
                        htlc_status: Some(s),
                        ..
                    } if !s.is_empty() => Some(("l", s)),
                    _ => None,
                };
                if let Some((alias, htlc_statuses)) = htlc_filter {
                    let placeholders = htlc_statuses
                        .iter()
                        .map(|_| "?")
                        .collect::<Vec<_>>()
                        .join(", ");
                    if alias == "l" {
                        payment_details_clauses.push(format!("l.htlc_status IN ({placeholders})"));
                    } else {
                        payment_details_clauses.push(format!(
                            "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
                        ));
                    }
                    for htlc_status in htlc_statuses {
                        params.push(Box::new(htlc_status.to_string()));
                    }
                }
                // Conversion-refund filter; `p.spark = 1` marks spark rows,
                // token rows have spark IS NULL.
                let conversion_filter = match payment_details_filter {
                    StoragePaymentDetailsFilter::Spark {
                        conversion_refund_needed: Some(v),
                        ..
                    } => Some((v, "p.spark = 1")),
                    StoragePaymentDetailsFilter::Token {
                        conversion_refund_needed: Some(v),
                        ..
                    } => Some((v, "p.spark IS NULL")),
                    _ => None,
                };
                if let Some((conversion_refund_needed, type_check)) = conversion_filter {
                    let refund_needed = if *conversion_refund_needed {
                        "= 'RefundNeeded'"
                    } else {
                        "!= 'RefundNeeded'"
                    };
                    payment_details_clauses.push(format!(
                        "{type_check} AND pm.conversion_info IS NOT NULL AND
                        json_extract(pm.conversion_info, '$.status') {refund_needed}"
                    ));
                }
                if let StoragePaymentDetailsFilter::Token {
                    tx_hash: Some(tx_hash),
                    ..
                } = payment_details_filter
                {
                    payment_details_clauses.push("t.tx_hash = ?".to_string());
                    params.push(Box::new(tx_hash.clone()));
                }

                if let StoragePaymentDetailsFilter::Token {
                    tx_type: Some(tx_type),
                    ..
                } = payment_details_filter
                {
                    payment_details_clauses.push("t.tx_type = ?".to_string());
                    params.push(Box::new(tx_type.to_string()));
                }

                if !payment_details_clauses.is_empty() {
                    all_payment_details_clauses
                        .push(format!("({})", payment_details_clauses.join(" AND ")));
                }
            }

            if !all_payment_details_clauses.is_empty() {
                where_clauses.push(format!("({})", all_payment_details_clauses.join(" OR ")));
            }
        }

        // Only top-level payments; children are fetched separately via
        // get_payments_by_parent_ids.
        where_clauses.push("pm.parent_payment_id IS NULL".to_string());

        // Never empty in practice because of the unconditional push above;
        // kept defensive anyway.
        let where_sql = if where_clauses.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", where_clauses.join(" AND "))
        };

        let order_direction = if request.sort_ascending.unwrap_or(false) {
            "ASC"
        } else {
            "DESC"
        };

        // LIMIT/OFFSET are numeric and formatted inline; all user-supplied
        // strings go through bound parameters only.
        let query = format!(
            "{SELECT_PAYMENT_SQL} {where_sql} ORDER BY p.timestamp {order_direction} LIMIT {} OFFSET {}",
            request.limit.unwrap_or(u32::MAX),
            request.offset.unwrap_or(0)
        );

        let mut stmt = connection.prepare(&query)?;
        let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
        let payments = stmt
            .query_map(param_refs.as_slice(), map_payment)?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(payments)
    }
556
    /// Upserts a payment row plus its variant-specific details row, atomically.
    ///
    /// On conflict most columns are overwritten, but detail fields such as
    /// preimage/htlc/invoice details are merged with COALESCE so an update
    /// never erases previously stored values with NULL.
    #[allow(clippy::too_many_lines)]
    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        // Immediate: acquire the write lock up front so the two-statement
        // upsert cannot hit a busy error halfway through.
        let tx = connection.transaction_with_behavior(TransactionBehavior::Immediate)?;

        // Denormalized columns on `payments`, derived from the details variant.
        let (withdraw_tx_id, deposit_tx_id, spark): (Option<&str>, Option<&str>, Option<bool>) =
            match &payment.details {
                Some(PaymentDetails::Withdraw { tx_id }) => (Some(tx_id.as_str()), None, None),
                Some(PaymentDetails::Deposit { tx_id }) => (None, Some(tx_id.as_str()), None),
                Some(PaymentDetails::Spark { .. }) => (None, None, Some(true)),
                _ => (None, None, None),
            };

        tx.execute(
            "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                payment_type=excluded.payment_type,
                status=excluded.status,
                amount=excluded.amount,
                fees=excluded.fees,
                timestamp=excluded.timestamp,
                method=excluded.method,
                withdraw_tx_id=excluded.withdraw_tx_id,
                deposit_tx_id=excluded.deposit_tx_id,
                spark=excluded.spark",
            params![
                payment.id,
                payment.payment_type.to_string(),
                payment.status.to_string(),
                // u128 amounts are stored as TEXT through the wrapper.
                U128SqlWrapper(payment.amount),
                U128SqlWrapper(payment.fees),
                payment.timestamp,
                payment.method,
                withdraw_tx_id,
                deposit_tx_id,
                spark,
            ],
        )?;

        // Variant-specific details row; Withdraw/Deposit are fully captured by
        // the denormalized columns above, so they need no extra table.
        match payment.details {
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
                ..
            }) => {
                // Only write a row when there is something to store; COALESCE
                // keeps existing values when the new ones are NULL.
                if invoice_details.is_some() || htlc_details.is_some() {
                    tx.execute(
                        "INSERT INTO payment_details_spark (payment_id, invoice_details, htlc_details)
                        VALUES (?, ?, ?)
                        ON CONFLICT(payment_id) DO UPDATE SET
                            invoice_details=COALESCE(excluded.invoice_details, payment_details_spark.invoice_details),
                            htlc_details=COALESCE(excluded.htlc_details, payment_details_spark.htlc_details)",
                        params![
                            payment.id,
                            invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                            htlc_details.as_ref().map(serde_json::to_string).transpose()?,
                        ],
                    )?;
                }
            }
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                tx_type,
                invoice_details,
                ..
            }) => {
                tx.execute(
                    "INSERT INTO payment_details_token (payment_id, metadata, tx_hash, tx_type, invoice_details)
                    VALUES (?, ?, ?, ?, ?)
                    ON CONFLICT(payment_id) DO UPDATE SET
                        metadata=excluded.metadata,
                        tx_hash=excluded.tx_hash,
                        tx_type=excluded.tx_type,
                        invoice_details=COALESCE(excluded.invoice_details, payment_details_token.invoice_details)",
                    params![
                        payment.id,
                        serde_json::to_string(&metadata)?,
                        tx_hash,
                        tx_type.to_string(),
                        invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                    ],
                )?;
            }
            Some(PaymentDetails::Lightning {
                invoice,
                destination_pubkey,
                description,
                htlc_details,
                ..
            }) => {
                tx.execute(
                    "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage, htlc_status, htlc_expiry_time)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(payment_id) DO UPDATE SET
                        invoice=excluded.invoice,
                        payment_hash=excluded.payment_hash,
                        destination_pubkey=excluded.destination_pubkey,
                        description=excluded.description,
                        preimage=COALESCE(excluded.preimage, payment_details_lightning.preimage),
                        htlc_status=COALESCE(excluded.htlc_status, payment_details_lightning.htlc_status),
                        htlc_expiry_time=COALESCE(excluded.htlc_expiry_time, payment_details_lightning.htlc_expiry_time)",
                    params![
                        payment.id,
                        invoice,
                        htlc_details.payment_hash,
                        destination_pubkey,
                        description,
                        htlc_details.preimage,
                        htlc_details.status.to_string(),
                        htlc_details.expiry_time,
                    ],
                )?;
            }
            Some(PaymentDetails::Withdraw { .. } | PaymentDetails::Deposit { .. }) | None => {}
        }

        tx.commit()?;
        Ok(())
    }
681
682 async fn insert_payment_metadata(
683 &self,
684 payment_id: String,
685 metadata: PaymentMetadata,
686 ) -> Result<(), StorageError> {
687 let connection = self.get_connection()?;
688
689 connection.execute(
690 "INSERT INTO payment_metadata (payment_id, parent_payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description, conversion_info, conversion_status)
691 VALUES (?, ?, ?, ?, ?, ?, ?)
692 ON CONFLICT(payment_id) DO UPDATE SET
693 parent_payment_id = COALESCE(excluded.parent_payment_id, parent_payment_id),
694 lnurl_pay_info = COALESCE(excluded.lnurl_pay_info, lnurl_pay_info),
695 lnurl_withdraw_info = COALESCE(excluded.lnurl_withdraw_info, lnurl_withdraw_info),
696 lnurl_description = COALESCE(excluded.lnurl_description, lnurl_description),
697 conversion_info = COALESCE(excluded.conversion_info, conversion_info),
698 conversion_status = COALESCE(excluded.conversion_status, conversion_status)",
699 params![
700 payment_id,
701 metadata.parent_payment_id,
702 metadata.lnurl_pay_info,
703 metadata.lnurl_withdraw_info,
704 metadata.lnurl_description,
705 metadata.conversion_info.as_ref().map(serde_json::to_string).transpose()?,
706 metadata.conversion_status.as_ref().map(std::string::ToString::to_string),
707 ],
708 )?;
709
710 Ok(())
711 }
712
713 async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
714 let connection = self.get_connection()?;
715
716 connection.execute(
717 "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
718 params![key, value],
719 )?;
720
721 Ok(())
722 }
723
724 async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
725 let connection = self.get_connection()?;
726
727 let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
728
729 let result = stmt.query_row(params![key], |row| {
730 let value_str: String = row.get(0)?;
731 Ok(value_str)
732 });
733
734 match result {
735 Ok(value) => Ok(Some(value)),
736 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
737 Err(e) => Err(e.into()),
738 }
739 }
740
741 async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
742 let connection = self.get_connection()?;
743
744 connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
745
746 Ok(())
747 }
748
749 async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
750 let connection = self.get_connection()?;
751 let query = format!("{SELECT_PAYMENT_SQL} WHERE p.id = ?");
752 let mut stmt = connection.prepare(&query)?;
753 let payment = stmt.query_row(params![id], map_payment)?;
754 Ok(payment)
755 }
756
757 async fn get_payment_by_invoice(
758 &self,
759 invoice: String,
760 ) -> Result<Option<Payment>, StorageError> {
761 let connection = self.get_connection()?;
762 let query = format!("{SELECT_PAYMENT_SQL} WHERE l.invoice = ?");
763 let mut stmt = connection.prepare(&query)?;
764 let payment = stmt.query_row(params![invoice], map_payment);
765 match payment {
766 Ok(payment) => Ok(Some(payment)),
767 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
768 Err(e) => Err(e.into()),
769 }
770 }
771
    /// Returns all child payments grouped by parent id, for the given parents.
    ///
    /// Children within each group are ordered by ascending timestamp.
    async fn get_payments_by_parent_ids(
        &self,
        parent_payment_ids: Vec<String>,
    ) -> Result<HashMap<String, Vec<Payment>>, StorageError> {
        if parent_payment_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let connection = self.get_connection()?;

        // Cheap existence probe: skip the join query entirely when no payment
        // has a parent at all (the common case).
        let has_related: bool = connection.query_row(
            "SELECT EXISTS(SELECT 1 FROM payment_metadata WHERE parent_payment_id IS NOT NULL LIMIT 1)",
            [],
            |row| row.get(0),
        )?;
        if !has_related {
            return Ok(HashMap::new());
        }

        let placeholders: Vec<&str> = parent_payment_ids.iter().map(|_| "?").collect();
        let in_clause = placeholders.join(", ");

        let query = format!(
            "{SELECT_PAYMENT_SQL} WHERE pm.parent_payment_id IN ({in_clause}) ORDER BY p.timestamp ASC"
        );

        let mut stmt = connection.prepare(&query)?;
        let params: Vec<&dyn ToSql> = parent_payment_ids
            .iter()
            .map(|id| id as &dyn ToSql)
            .collect();
        let rows = stmt.query_map(params.as_slice(), |row| {
            let payment = map_payment(row)?;
            // Positional index 31 is assumed to be pm.parent_payment_id in
            // SELECT_PAYMENT_SQL (declared elsewhere) — keep in sync with that
            // query's column list; TODO confirm.
            let parent_payment_id: String = row.get(31)?;
            Ok((parent_payment_id, payment))
        })?;

        let mut result: HashMap<String, Vec<Payment>> = HashMap::new();
        for row in rows {
            let (parent_id, related_payment) = row?;
            result.entry(parent_id).or_default().push(related_payment);
        }

        Ok(result)
    }
819
820 async fn add_deposit(
821 &self,
822 txid: String,
823 vout: u32,
824 amount_sats: u64,
825 is_mature: bool,
826 ) -> Result<(), StorageError> {
827 let connection = self.get_connection()?;
828 connection.execute(
829 "INSERT INTO unclaimed_deposits (txid, vout, amount_sats, is_mature)
830 VALUES (?, ?, ?, ?)
831 ON CONFLICT(txid, vout) DO UPDATE SET is_mature = excluded.is_mature, amount_sats = excluded.amount_sats",
832 params![txid, vout, amount_sats, is_mature],
833 )?;
834 Ok(())
835 }
836
837 async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
838 let connection = self.get_connection()?;
839 connection.execute(
840 "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
841 params![txid, vout],
842 )?;
843 Ok(())
844 }
845
846 async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
847 let connection = self.get_connection()?;
848 let mut stmt =
849 connection.prepare("SELECT txid, vout, amount_sats, is_mature, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
850 let rows = stmt.query_map(params![], |row| {
851 Ok(DepositInfo {
852 txid: row.get(0)?,
853 vout: row.get(1)?,
854 amount_sats: row.get(2)?,
855 is_mature: row.get(3)?,
856 claim_error: row.get(4)?,
857 refund_tx: row.get(5)?,
858 refund_tx_id: row.get(6)?,
859 })
860 })?;
861 let mut deposits = Vec::new();
862 for row in rows {
863 deposits.push(row?);
864 }
865 Ok(deposits)
866 }
867
868 async fn update_deposit(
869 &self,
870 txid: String,
871 vout: u32,
872 payload: UpdateDepositPayload,
873 ) -> Result<(), StorageError> {
874 let connection = self.get_connection()?;
875 match payload {
876 UpdateDepositPayload::ClaimError { error } => {
877 connection.execute(
878 "UPDATE unclaimed_deposits SET claim_error = ?, refund_tx = NULL, refund_tx_id = NULL WHERE txid = ? AND vout = ?",
879 params![error, txid, vout],
880 )?;
881 }
882 UpdateDepositPayload::Refund {
883 refund_txid,
884 refund_tx,
885 } => {
886 connection.execute(
887 "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ?, claim_error = NULL WHERE txid = ? AND vout = ?",
888 params![refund_tx, refund_txid, txid, vout],
889 )?;
890 }
891 }
892 Ok(())
893 }
894
895 async fn set_lnurl_metadata(
896 &self,
897 metadata: Vec<SetLnurlMetadataItem>,
898 ) -> Result<(), StorageError> {
899 let connection = self.get_connection()?;
900 for metadata in metadata {
901 connection.execute(
902 "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment)
903 VALUES (?, ?, ?, ?)",
904 params![
905 metadata.payment_hash,
906 metadata.nostr_zap_request,
907 metadata.nostr_zap_receipt,
908 metadata.sender_comment,
909 ],
910 )?;
911 }
912 Ok(())
913 }
914
915 async fn list_contacts(
916 &self,
917 request: ListContactsRequest,
918 ) -> Result<Vec<Contact>, StorageError> {
919 let limit = request.limit.unwrap_or(u32::MAX);
920 let offset = request.offset.unwrap_or(0);
921 let connection = self.get_connection()?;
922 let query = "SELECT id, name, payment_identifier, created_at, updated_at FROM contacts ORDER BY name ASC LIMIT ? OFFSET ?";
923
924 let mut stmt = connection.prepare(query)?;
925 let contacts = stmt
926 .query_map(params![limit, offset], |row| {
927 Ok(Contact {
928 id: row.get(0)?,
929 name: row.get(1)?,
930 payment_identifier: row.get(2)?,
931 created_at: row.get(3)?,
932 updated_at: row.get(4)?,
933 })
934 })?
935 .collect::<Result<Vec<_>, _>>()?;
936 Ok(contacts)
937 }
938
939 async fn get_contact(&self, id: String) -> Result<Contact, StorageError> {
940 let connection = self.get_connection()?;
941 let mut stmt = connection.prepare(
942 "SELECT id, name, payment_identifier, created_at, updated_at FROM contacts WHERE id = ?",
943 )?;
944 stmt.query_row(params![id], |row| {
945 Ok(Contact {
946 id: row.get(0)?,
947 name: row.get(1)?,
948 payment_identifier: row.get(2)?,
949 created_at: row.get(3)?,
950 updated_at: row.get(4)?,
951 })
952 })
953 .map_err(|e| match e {
954 rusqlite::Error::QueryReturnedNoRows => StorageError::NotFound,
955 other => other.into(),
956 })
957 }
958
959 async fn insert_contact(&self, contact: Contact) -> Result<(), StorageError> {
960 let connection = self.get_connection()?;
961 connection.execute(
962 "INSERT INTO contacts (id, name, payment_identifier, created_at, updated_at)
963 VALUES (?, ?, ?, ?, ?)
964 ON CONFLICT(id) DO UPDATE SET
965 name = excluded.name,
966 payment_identifier = excluded.payment_identifier,
967 updated_at = excluded.updated_at",
968 params![
969 contact.id,
970 contact.name,
971 contact.payment_identifier,
972 contact.created_at,
973 contact.updated_at,
974 ],
975 )?;
976 Ok(())
977 }
978
979 async fn delete_contact(&self, id: String) -> Result<(), StorageError> {
980 let connection = self.get_connection()?;
981 connection.execute("DELETE FROM contacts WHERE id = ?", params![id])?;
982 Ok(())
983 }
984
    /// Queues a local record change for upload and returns the local revision
    /// assigned to it.
    async fn add_outgoing_change(
        &self,
        record: UnversionedRecordChange,
    ) -> Result<u64, StorageError> {
        let mut connection = self.get_connection()?;
        // Immediate: hold the write lock while computing MAX(revision)+1 so two
        // concurrent writers cannot allocate the same local revision.
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        // Next local revision: one past the highest queued revision (1 when
        // the queue is empty).
        let local_revision: u64 = tx
            .query_row(
                "SELECT COALESCE(MAX(revision), 0) + 1 FROM sync_outgoing",
                [],
                |row| row.get(0),
            )
            .map_err(map_sqlite_error)?;

        tx.execute(
            "INSERT INTO sync_outgoing (
                record_type
                , data_id
                , schema_version
                , commit_time
                , updated_fields_json
                , revision
            )
            VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.updated_fields)?,
                local_revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(local_revision)
    }
1026
    /// Finalizes an uploaded change: removes it from the outgoing queue,
    /// stores the server-acknowledged record as the new known state and bumps
    /// the global sync revision (monotonically, via MAX).
    async fn complete_outgoing_sync(
        &self,
        record: Record,
        local_revision: u64,
    ) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        let rows_deleted = tx
            .execute(
                "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
                params![record.id.r#type, record.id.data_id, local_revision],
            )
            .map_err(map_sqlite_error)?;

        // A zero-row delete means the queued entry vanished (or was replaced)
        // between upload and ack; logged but deliberately not treated as fatal.
        if rows_deleted == 0 {
            warn!(
                "complete_outgoing_sync: DELETE from sync_outgoing matched 0 rows \
                (type={}, data_id={}, revision={})",
                record.id.r#type, record.id.data_id, local_revision
            );
        }

        tx.execute(
            "INSERT OR REPLACE INTO sync_state (
                record_type
                , data_id
                , schema_version
                , commit_time
                , data
                , revision
            )
            VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.data)?,
                record.revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        // MAX keeps the global revision monotonic even if acks arrive
        // out of order.
        tx.execute(
            "UPDATE sync_revision SET revision = MAX(revision, ?)",
            params![record.revision],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }
1081
    /// Return up to `limit` queued outgoing changes, oldest revision first.
    ///
    /// Each change is paired with the last server-acked version of the same
    /// record from `sync_state` (the "parent"), when one exists, so the
    /// caller can reason about the delta being uploaded.
    async fn get_pending_outgoing_changes(
        &self,
        limit: u32,
    ) -> Result<Vec<OutgoingChange>, StorageError> {
        let connection = self.get_connection()?;

        // Columns 0..=5 are the queued change; 6..=9 are the (possibly NULL)
        // matching sync_state row produced by the LEFT JOIN.
        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
                , o.data_id
                , o.schema_version
                , o.commit_time
                , o.updated_fields_json
                , o.revision
                , e.schema_version AS existing_schema_version
                , e.commit_time AS existing_commit_time
                , e.data AS existing_data
                , e.revision AS existing_revision
                FROM sync_outgoing o
                LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
                ORDER BY o.revision ASC
                LIMIT ?",
            )
            .map_err(map_sqlite_error)?;
        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();
        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            // existing_data (col 8) is NULL when no acked version exists yet.
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                local_revision: row.get(5).map_err(map_sqlite_error)?,
            };
            results.push(OutgoingChange { change, parent });
        }

        Ok(results)
    }
1140
1141 async fn get_last_revision(&self) -> Result<u64, StorageError> {
1142 let connection = self.get_connection()?;
1143
1144 let revision: u64 = connection
1145 .query_row("SELECT revision FROM sync_revision", [], |row| row.get(0))
1146 .map_err(map_sqlite_error)?;
1147
1148 Ok(revision)
1149 }
1150
    /// Stage a batch of records received from the remote into `sync_incoming`.
    ///
    /// The whole batch is applied in one IMMEDIATE transaction (all or
    /// nothing). INSERT OR REPLACE keeps a single staged row per table key —
    /// presumably (record_type, data_id); the schema is defined in the
    /// migrations, confirm there.
    async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), StorageError> {
        // Avoid opening a transaction for an empty batch.
        if records.is_empty() {
            return Ok(());
        }

        let mut connection = self.get_connection()?;
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        for record in records {
            tx.execute(
                "INSERT OR REPLACE INTO sync_incoming (
                    record_type
                    , data_id
                    , schema_version
                    , commit_time
                    , data
                    , revision
                )
                VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
                params![
                    record.id.r#type,
                    record.id.data_id,
                    record.schema_version.clone(),
                    serde_json::to_string(&record.data)?,
                    record.revision,
                ],
            )
            .map_err(map_sqlite_error)?;
        }

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }
1186
1187 async fn delete_incoming_record(&self, record: Record) -> Result<(), StorageError> {
1188 let connection = self.get_connection()?;
1189
1190 connection
1191 .execute(
1192 "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
1193 params![record.id.r#type, record.id.data_id, record.revision],
1194 )
1195 .map_err(map_sqlite_error)?;
1196
1197 Ok(())
1198 }
1199
    /// Return up to `limit` staged incoming records, oldest revision first.
    ///
    /// Each record is paired with the currently-acked local version from
    /// `sync_state` (`old_state`), when one exists, so the caller can merge
    /// or detect conflicts.
    async fn get_incoming_records(&self, limit: u32) -> Result<Vec<IncomingChange>, StorageError> {
        let connection = self.get_connection()?;

        // Columns 0..=4 are the staged record; 5..=8 are the (possibly NULL)
        // matching sync_state row produced by the LEFT JOIN.
        let mut stmt = connection
            .prepare(
                "SELECT i.record_type
                , i.data_id
                , i.schema_version
                , i.data
                , i.revision
                , e.schema_version AS existing_schema_version
                , e.commit_time AS existing_commit_time
                , e.data AS existing_data
                , e.revision AS existing_revision
                FROM sync_incoming i
                LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
                ORDER BY i.revision ASC
                LIMIT ?",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();

        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            // existing_data (col 7) is NULL when there is no local version.
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(5).map_err(map_sqlite_error)?,
                    revision: row.get(8).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let record = Record {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
                revision: row.get(4).map_err(map_sqlite_error)?,
            };
            results.push(IncomingChange {
                new_state: record,
                old_state: parent,
            });
        }

        Ok(results)
    }
1257
    /// Return the queued outgoing change with the highest local revision,
    /// or `None` when the queue is empty.
    ///
    /// Same row shape and decoding as `get_pending_outgoing_changes`, but
    /// ordered descending with LIMIT 1.
    async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, StorageError> {
        let connection = self.get_connection()?;

        // Columns 0..=5 are the queued change; 6..=9 are the (possibly NULL)
        // matching sync_state row produced by the LEFT JOIN.
        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
                , o.data_id
                , o.schema_version
                , o.commit_time
                , o.updated_fields_json
                , o.revision
                , e.schema_version AS existing_schema_version
                , e.commit_time AS existing_commit_time
                , e.data AS existing_data
                , e.revision AS existing_revision
                FROM sync_outgoing o
                LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
                ORDER BY o.revision DESC
                LIMIT 1",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query([]).map_err(map_sqlite_error)?;

        if let Some(row) = rows.next().map_err(map_sqlite_error)? {
            // existing_data (col 8) is NULL when no acked version exists yet.
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                local_revision: row.get(5).map_err(map_sqlite_error)?,
            };

            return Ok(Some(OutgoingChange { change, parent }));
        }

        Ok(None)
    }
1315
    /// Apply an accepted incoming record to the local acked state.
    ///
    /// Upserts the record into `sync_state` and advances the tracked remote
    /// revision, both in one IMMEDIATE transaction. Mirrors the second half
    /// of `complete_outgoing_sync`.
    async fn update_record_from_incoming(&self, record: Record) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(map_sqlite_error)?;

        // INSERT OR REPLACE keeps only the latest acked version per record.
        tx.execute(
            "INSERT OR REPLACE INTO sync_state (
                record_type
                , data_id
                , schema_version
                , commit_time
                , data
                , revision
            )
            VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.data)?,
                record.revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        // MAX keeps the last-seen remote revision monotonically
        // non-decreasing.
        tx.execute(
            "UPDATE sync_revision SET revision = MAX(revision, ?)",
            params![record.revision],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }
1351}
1352
/// Shared SELECT for reading payments together with their per-method detail
/// rows and metadata.
///
/// Rows from this query are decoded *positionally* by `map_payment`; the
/// column order here and the indices used there must change together.
/// NOTE(review): `pm.parent_payment_id` (index 31) is selected but not read
/// by `map_payment` — presumably consumed elsewhere; confirm before removing.
const SELECT_PAYMENT_SQL: &str = "
    SELECT p.id,
        p.payment_type,
        p.status,
        p.amount,
        p.fees,
        p.timestamp,
        p.method,
        p.withdraw_tx_id,
        p.deposit_tx_id,
        p.spark,
        l.invoice AS lightning_invoice,
        l.payment_hash AS lightning_payment_hash,
        l.destination_pubkey AS lightning_destination_pubkey,
        COALESCE(l.description, pm.lnurl_description) AS lightning_description,
        l.preimage AS lightning_preimage,
        l.htlc_status AS lightning_htlc_status,
        l.htlc_expiry_time AS lightning_htlc_expiry_time,
        pm.lnurl_pay_info,
        pm.lnurl_withdraw_info,
        pm.conversion_info,
        t.metadata AS token_metadata,
        t.tx_hash AS token_tx_hash,
        t.tx_type AS token_tx_type,
        t.invoice_details AS token_invoice_details,
        s.invoice_details AS spark_invoice_details,
        s.htlc_details AS spark_htlc_details,
        lrm.nostr_zap_request AS lnurl_nostr_zap_request,
        lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt,
        lrm.sender_comment AS lnurl_sender_comment,
        lrm.payment_hash AS lnurl_payment_hash,
        pm.conversion_status,
        pm.parent_payment_id
    FROM payments p
    LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
    LEFT JOIN payment_details_token t ON p.id = t.payment_id
    LEFT JOIN payment_details_spark s ON p.id = s.payment_id
    LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
    LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash";
1394
/// Decode one row of `SELECT_PAYMENT_SQL` into a [`Payment`].
///
/// Columns are read by position, so the indices below must stay in lock step
/// with the column order of `SELECT_PAYMENT_SQL`.
///
/// The payment-details variant is chosen by which joined detail table
/// produced a non-NULL marker column, tested in priority order:
/// lightning invoice, withdraw tx, deposit tx, spark marker, token metadata.
#[allow(clippy::too_many_lines)]
fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
    let withdraw_tx_id: Option<String> = row.get(7)?;
    let deposit_tx_id: Option<String> = row.get(8)?;
    let spark: Option<i32> = row.get(9)?;
    let lightning_invoice: Option<String> = row.get(10)?;
    let token_metadata: Option<String> = row.get(20)?;
    let details = match (
        lightning_invoice,
        withdraw_tx_id,
        deposit_tx_id,
        spark,
        token_metadata,
    ) {
        (Some(invoice), _, _, _, _) => {
            // Lightning: 11..=16 from payment_details_lightning, 17..=18 from
            // payment_metadata, 26..=29 from lnurl_receive_metadata.
            let payment_hash: String = row.get(11)?;
            let destination_pubkey: String = row.get(12)?;
            let description: Option<String> = row.get(13)?;
            let preimage: Option<String> = row.get(14)?;
            // htlc_status must be present for lightning rows; surface a typed
            // conversion error rather than panicking when it is NULL.
            let htlc_status: SparkHtlcStatus =
                row.get::<_, Option<SparkHtlcStatus>>(15)?.ok_or_else(|| {
                    rusqlite::Error::FromSqlConversionFailure(
                        15,
                        rusqlite::types::Type::Null,
                        "htlc_status is required for Lightning payments".into(),
                    )
                })?;
            let htlc_expiry_time: u64 = row.get(16)?;
            let htlc_details = SparkHtlcDetails {
                payment_hash,
                preimage,
                expiry_time: htlc_expiry_time,
                status: htlc_status,
            };
            let lnurl_pay_info: Option<LnurlPayInfo> = row.get(17)?;
            let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(18)?;
            let lnurl_nostr_zap_request: Option<String> = row.get(26)?;
            let lnurl_nostr_zap_receipt: Option<String> = row.get(27)?;
            let lnurl_sender_comment: Option<String> = row.get(28)?;
            let lnurl_payment_hash: Option<String> = row.get(29)?;
            // A non-NULL lrm.payment_hash means an lnurl_receive_metadata row
            // was joined; only then build the metadata struct.
            let lnurl_receive_metadata = if lnurl_payment_hash.is_some() {
                Some(LnurlReceiveMetadata {
                    nostr_zap_request: lnurl_nostr_zap_request,
                    nostr_zap_receipt: lnurl_nostr_zap_receipt,
                    sender_comment: lnurl_sender_comment,
                })
            } else {
                None
            };
            Some(PaymentDetails::Lightning {
                invoice,
                destination_pubkey,
                description,
                htlc_details,
                lnurl_pay_info,
                lnurl_withdraw_info,
                lnurl_receive_metadata,
            })
        }
        (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
        (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
        (_, _, _, Some(_), _) => {
            // Spark: JSON blobs from payment_details_spark (24, 25) plus
            // conversion_info from payment_metadata (19).
            let invoice_details_str: Option<String> = row.get(24)?;
            let invoice_details = invoice_details_str
                .map(|s| serde_json_from_str(&s, 24))
                .transpose()?;
            let htlc_details_str: Option<String> = row.get(25)?;
            let htlc_details = htlc_details_str
                .map(|s| serde_json_from_str(&s, 25))
                .transpose()?;
            let conversion_info_str: Option<String> = row.get(19)?;
            let conversion_info: Option<ConversionInfo> = conversion_info_str
                .map(|s: String| serde_json_from_str(&s, 19))
                .transpose()?;
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
                conversion_info,
            })
        }
        (_, _, _, _, Some(metadata)) => {
            // Token: 21..=23 from payment_details_token, 19 from
            // payment_metadata.
            let tx_type: TokenTransactionType = row.get(22)?;
            let invoice_details_str: Option<String> = row.get(23)?;
            let invoice_details = invoice_details_str
                .map(|s| serde_json_from_str(&s, 23))
                .transpose()?;
            let conversion_info_str: Option<String> = row.get(19)?;
            let conversion_info: Option<ConversionInfo> = conversion_info_str
                .map(|s: String| serde_json_from_str(&s, 19))
                .transpose()?;
            Some(PaymentDetails::Token {
                metadata: serde_json_from_str(&metadata, 20)?,
                tx_hash: row.get(21)?,
                tx_type,
                invoice_details,
                conversion_info,
            })
        }
        // No detail table matched this payment.
        _ => None,
    };
    // Only the conversion status is reconstructed here; from/to legs are not
    // part of this query.
    let conversion_status: Option<ConversionStatus> = row.get(30)?;
    let conversion_details = conversion_status.map(|status| ConversionDetails {
        status,
        from: None,
        to: None,
    });

    Ok(Payment {
        id: row.get(0)?,
        payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
            rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
        })?,
        status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
            rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
        })?,
        // Amounts/fees are stored as decimal TEXT to accommodate u128.
        amount: row.get::<_, U128SqlWrapper>(3)?.0,
        fees: row.get::<_, U128SqlWrapper>(4)?.0,
        timestamp: row.get(5)?,
        details,
        method: row.get(6)?,
        conversion_details,
    })
}
1519
impl ToSql for PaymentDetails {
    /// Persisted as a JSON TEXT column via `to_sql_json`.
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}
1525
impl FromSql for PaymentDetails {
    /// Decoded from a JSON TEXT column via `from_sql_json`.
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1531
1532impl ToSql for PaymentMethod {
1533 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1534 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1535 }
1536}
1537
1538impl FromSql for PaymentMethod {
1539 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1540 match value {
1541 ValueRef::Text(i) => {
1542 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1543 let payment_method: PaymentMethod = s
1545 .trim_matches('"')
1546 .to_lowercase()
1547 .parse()
1548 .map_err(|()| FromSqlError::InvalidType)?;
1549 Ok(payment_method)
1550 }
1551 _ => Err(FromSqlError::InvalidType),
1552 }
1553 }
1554}
1555
1556impl ToSql for TokenTransactionType {
1557 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1558 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1559 }
1560}
1561
1562impl FromSql for TokenTransactionType {
1563 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1564 match value {
1565 ValueRef::Text(i) => {
1566 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1567 let tx_type: TokenTransactionType =
1568 s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1569 Ok(tx_type)
1570 }
1571 _ => Err(FromSqlError::InvalidType),
1572 }
1573 }
1574}
1575
1576impl ToSql for SparkHtlcStatus {
1577 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1578 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1579 }
1580}
1581
1582impl FromSql for SparkHtlcStatus {
1583 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1584 match value {
1585 ValueRef::Text(i) => {
1586 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1587 let status: SparkHtlcStatus =
1588 s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1589 Ok(status)
1590 }
1591 _ => Err(FromSqlError::InvalidType),
1592 }
1593 }
1594}
1595
1596impl ToSql for ConversionStatus {
1597 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1598 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1599 }
1600}
1601
1602impl FromSql for ConversionStatus {
1603 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1604 match value {
1605 ValueRef::Text(i) => {
1606 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1607 let status: ConversionStatus =
1608 s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1609 Ok(status)
1610 }
1611 _ => Err(FromSqlError::InvalidType),
1612 }
1613 }
1614}
1615
impl ToSql for DepositClaimError {
    /// Persisted as a JSON TEXT column via `to_sql_json`.
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}
1621
impl FromSql for DepositClaimError {
    /// Decoded from a JSON TEXT column via `from_sql_json`.
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1627
impl ToSql for LnurlPayInfo {
    /// Persisted as a JSON TEXT column via `to_sql_json`.
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}
1633
impl FromSql for LnurlPayInfo {
    /// Decoded from a JSON TEXT column via `from_sql_json`.
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1639
impl ToSql for LnurlWithdrawInfo {
    /// Persisted as a JSON TEXT column via `to_sql_json`.
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}
1645
impl FromSql for LnurlWithdrawInfo {
    /// Decoded from a JSON TEXT column via `from_sql_json`.
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}
1651
1652fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1653where
1654 T: serde::Serialize,
1655{
1656 let json = serde_json::to_string(&value)
1657 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1658 Ok(rusqlite::types::ToSqlOutput::from(json))
1659}
1660
1661fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1662where
1663 T: serde::de::DeserializeOwned,
1664{
1665 match value {
1666 ValueRef::Text(i) => {
1667 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1668 let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1669 Ok(deserialized)
1670 }
1671 _ => Err(FromSqlError::InvalidType),
1672 }
1673}
1674
1675fn serde_json_from_str<T>(value: &str, index: usize) -> Result<T, rusqlite::Error>
1676where
1677 T: serde::de::DeserializeOwned,
1678{
1679 serde_json::from_str(value).map_err(|e| {
1680 rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(e))
1681 })
1682}
1683
/// Wrapper enabling `u128` amounts to round-trip through SQLite as decimal
/// TEXT (SQLite's native INTEGER is only 64-bit).
struct U128SqlWrapper(u128);
1685
1686impl ToSql for U128SqlWrapper {
1687 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1688 let string = self.0.to_string();
1689 Ok(rusqlite::types::ToSqlOutput::from(string))
1690 }
1691}
1692
1693impl FromSql for U128SqlWrapper {
1694 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1695 match value {
1696 ValueRef::Text(i) => {
1697 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1698 let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1699 Ok(U128SqlWrapper(integer))
1700 }
1701 _ => Err(FromSqlError::InvalidType),
1702 }
1703 }
1704}
1705
1706#[cfg(test)]
1707mod tests {
1708
1709 use crate::SqliteStorage;
1710 use std::path::PathBuf;
1711
1712 fn create_temp_dir(name: &str) -> PathBuf {
1715 let mut path = std::env::temp_dir();
1716 path.push(format!("breez-test-{}-{}", name, uuid::Uuid::new_v4()));
1718 std::fs::create_dir_all(&path).unwrap();
1719 path
1720 }
1721
1722 #[tokio::test]
1723 async fn test_storage() {
1724 let temp_dir = create_temp_dir("sqlite_storage");
1725 let storage = SqliteStorage::new(&temp_dir).unwrap();
1726
1727 Box::pin(crate::persist::tests::test_storage(Box::new(storage))).await;
1728 }
1729
1730 #[tokio::test]
1731 async fn test_unclaimed_deposits_crud() {
1732 let temp_dir = create_temp_dir("sqlite_storage_deposits");
1733 let storage = SqliteStorage::new(&temp_dir).unwrap();
1734
1735 crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
1736 }
1737
1738 #[tokio::test]
1739 async fn test_deposit_refunds() {
1740 let temp_dir = create_temp_dir("sqlite_storage_refund_tx");
1741 let storage = SqliteStorage::new(&temp_dir).unwrap();
1742
1743 crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
1744 }
1745
1746 #[tokio::test]
1747 async fn test_payment_type_filtering() {
1748 let temp_dir = create_temp_dir("sqlite_storage_type_filter");
1749 let storage = SqliteStorage::new(&temp_dir).unwrap();
1750
1751 crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
1752 }
1753
1754 #[tokio::test]
1755 async fn test_payment_status_filtering() {
1756 let temp_dir = create_temp_dir("sqlite_storage_status_filter");
1757 let storage = SqliteStorage::new(&temp_dir).unwrap();
1758
1759 crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
1760 }
1761
1762 #[tokio::test]
1763 async fn test_payment_asset_filtering() {
1764 let temp_dir = create_temp_dir("sqlite_storage_asset_filter");
1765 let storage = SqliteStorage::new(&temp_dir).unwrap();
1766
1767 crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
1768 }
1769
1770 #[tokio::test]
1771 async fn test_timestamp_filtering() {
1772 let temp_dir = create_temp_dir("sqlite_storage_timestamp_filter");
1773 let storage = SqliteStorage::new(&temp_dir).unwrap();
1774
1775 crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
1776 }
1777
1778 #[tokio::test]
1779 async fn test_spark_htlc_status_filtering() {
1780 let temp_dir = create_temp_dir("sqlite_storage_htlc_filter");
1781 let storage = SqliteStorage::new(&temp_dir).unwrap();
1782
1783 crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
1784 }
1785
1786 #[tokio::test]
1787 async fn test_lightning_htlc_details_and_status_filtering() {
1788 let temp_dir = create_temp_dir("sqlite_storage_htlc_details");
1789 let storage = SqliteStorage::new(&temp_dir).unwrap();
1790
1791 crate::persist::tests::test_lightning_htlc_details_and_status_filtering(Box::new(storage))
1792 .await;
1793 }
1794
1795 #[tokio::test]
1796 async fn test_conversion_refund_needed_filtering() {
1797 let temp_dir = create_temp_dir("sqlite_storage_conversion_refund_needed_filter");
1798 let storage = SqliteStorage::new(&temp_dir).unwrap();
1799
1800 crate::persist::tests::test_conversion_refund_needed_filtering(Box::new(storage)).await;
1801 }
1802
1803 #[tokio::test]
1804 async fn test_token_transaction_type_filtering() {
1805 let temp_dir = create_temp_dir("sqlite_storage_token_transaction_type_filter");
1806 let storage = SqliteStorage::new(&temp_dir).unwrap();
1807
1808 crate::persist::tests::test_token_transaction_type_filtering(Box::new(storage)).await;
1809 }
1810
1811 #[tokio::test]
1812 async fn test_combined_filters() {
1813 let temp_dir = create_temp_dir("sqlite_storage_combined_filter");
1814 let storage = SqliteStorage::new(&temp_dir).unwrap();
1815
1816 crate::persist::tests::test_combined_filters(Box::new(storage)).await;
1817 }
1818
1819 #[tokio::test]
1820 async fn test_sort_order() {
1821 let temp_dir = create_temp_dir("sqlite_storage_sort_order");
1822 let storage = SqliteStorage::new(&temp_dir).unwrap();
1823
1824 crate::persist::tests::test_sort_order(Box::new(storage)).await;
1825 }
1826
1827 #[tokio::test]
1828 async fn test_payment_metadata() {
1829 let temp_dir = create_temp_dir("sqlite_storage_payment_request_metadata");
1830 let storage = SqliteStorage::new(&temp_dir).unwrap();
1831
1832 crate::persist::tests::test_payment_metadata(Box::new(storage)).await;
1833 }
1834
1835 #[tokio::test]
1836 async fn test_payment_details_update_persistence() {
1837 let temp_dir = create_temp_dir("sqlite_storage_payment_details_update");
1838 let storage = SqliteStorage::new(&temp_dir).unwrap();
1839
1840 crate::persist::tests::test_payment_details_update_persistence(Box::new(storage)).await;
1841 }
1842
1843 #[tokio::test]
1844 async fn test_sync_storage() {
1845 let temp_dir = create_temp_dir("sqlite_sync_storage");
1846 let storage = SqliteStorage::new(&temp_dir).unwrap();
1847
1848 crate::persist::tests::test_sync_storage(Box::new(storage)).await;
1849 }
1850
1851 #[tokio::test]
1852 async fn test_payment_metadata_merge() {
1853 let temp_dir = create_temp_dir("sqlite_payment_metadata_merge");
1854 let storage = SqliteStorage::new(&temp_dir).unwrap();
1855
1856 crate::persist::tests::test_payment_metadata_merge(Box::new(storage)).await;
1857 }
1858
    /// End-to-end check of the migration that backfills `tx_type` on token
    /// payment details.
    ///
    /// Builds a DB at the schema just before the tx_type migration (first 22
    /// migrations), seeds a token payment without a tx_type, then opens the
    /// store normally so the remaining migrations run, and asserts:
    /// the legacy row reads back with the default `Transfer` type, new
    /// writes round-trip, and tx_type filtering distinguishes the two rows.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_migration_tx_type() {
        use crate::{
            Payment, PaymentDetails, PaymentMethod, PaymentStatus, PaymentType, Storage,
            TokenMetadata, TokenTransactionType,
            persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
        };
        use rusqlite::{Connection, params};
        use rusqlite_migration::{M, Migrations};

        let temp_dir = create_temp_dir("sqlite_migration_tx_type");
        let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);

        {
            // Apply only the first 22 migrations: the schema as it existed
            // before tx_type was introduced.
            let mut conn = Connection::open(&db_path).unwrap();
            let migrations_before_tx_type: Vec<_> = SqliteStorage::current_migrations()
                .iter()
                .take(22)
                .map(|s| M::up(s))
                .collect();
            let migrations = Migrations::new(migrations_before_tx_type);
            migrations.to_latest(&mut conn).unwrap();
        }

        {
            // Seed a legacy token payment directly via SQL (no tx_type yet).
            let conn = Connection::open(&db_path).unwrap();

            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "token-migration-test",
                    "send",
                    "completed",
                    "5000",
                    "10",
                    1_234_567_890_i64,
                    "\"token\""
                ],
            )
            .unwrap();

            let metadata = serde_json::json!({
                "identifier": "test-token-id",
                "issuer_public_key": format!("02{}", "a".repeat(64)),
                "name": "Test Token",
                "ticker": "TST",
                "decimals": 8,
                "max_supply": 1_000_000_u128,
                "is_freezable": false
            });

            conn.execute(
                "INSERT INTO payment_details_token (payment_id, metadata, tx_hash)
                VALUES (?, ?, ?)",
                params![
                    "token-migration-test",
                    metadata.to_string(),
                    "0xabcdef1234567890"
                ],
            )
            .unwrap();
        }

        // Opening the storage normally runs the remaining migrations.
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        let migrated_payment = storage
            .get_payment_by_id("token-migration-test".to_string())
            .await
            .unwrap();

        assert_eq!(migrated_payment.id, "token-migration-test");
        assert_eq!(migrated_payment.amount, 5000);
        assert_eq!(migrated_payment.fees, 10);
        assert_eq!(migrated_payment.status, PaymentStatus::Completed);
        assert_eq!(migrated_payment.payment_type, PaymentType::Send);
        assert_eq!(migrated_payment.method, PaymentMethod::Token);

        match migrated_payment.details {
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                tx_type,
                ..
            }) => {
                assert_eq!(metadata.identifier, "test-token-id");
                assert_eq!(metadata.name, "Test Token");
                assert_eq!(metadata.ticker, "TST");
                assert_eq!(metadata.decimals, 8);
                assert_eq!(tx_hash, "0xabcdef1234567890");
                assert_eq!(
                    tx_type,
                    TokenTransactionType::Transfer,
                    "Migration should add default txType 'transfer' to token payments"
                );
            }
            _ => panic!("Expected Token payment details"),
        }

        // A post-migration write must round-trip with its explicit tx_type.
        let new_payment = Payment {
            id: "new-token-payment".to_string(),
            payment_type: PaymentType::Receive,
            status: PaymentStatus::Completed,
            amount: 8000,
            fees: 20,
            timestamp: 1_234_567_891,
            method: PaymentMethod::Token,
            details: Some(PaymentDetails::Token {
                metadata: TokenMetadata {
                    identifier: "another-token-id".to_string(),
                    issuer_public_key: format!("02{}", "b".repeat(64)),
                    name: "Another Token".to_string(),
                    ticker: "ATK".to_string(),
                    decimals: 6,
                    max_supply: 2_000_000,
                    is_freezable: true,
                },
                tx_hash: "0x1111222233334444".to_string(),
                tx_type: TokenTransactionType::Mint,
                invoice_details: None,
                conversion_info: None,
            }),
            conversion_details: None,
        };

        storage.insert_payment(new_payment).await.unwrap();

        // Unfiltered listing should return both rows.
        let request = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: None,
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let payments = storage.list_payments(request).await.unwrap();
        assert_eq!(payments.len(), 2, "Should have both payments");

        let migrated = payments
            .iter()
            .find(|p| p.id == "token-migration-test")
            .unwrap();
        match &migrated.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Transfer);
            }
            _ => panic!("Expected Token payment details"),
        }

        let new = payments
            .iter()
            .find(|p| p.id == "new-token-payment")
            .unwrap();
        match &new.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Mint);
            }
            _ => panic!("Expected Token payment details"),
        }

        // Filtering on tx_type = Transfer must select only the migrated row.
        let transfer_filter = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Token {
                conversion_refund_needed: None,
                tx_hash: None,
                tx_type: Some(TokenTransactionType::Transfer),
            }]),
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let transfer_payments = storage.list_payments(transfer_filter).await.unwrap();
        assert_eq!(
            transfer_payments.len(),
            1,
            "Should find only the Transfer payment"
        );
        assert_eq!(transfer_payments[0].id, "token-migration-test");
    }
2061
2062 #[tokio::test]
2063 #[allow(clippy::too_many_lines)]
2064 async fn test_migration_htlc_details() {
2065 use crate::{
2066 PaymentDetails, SparkHtlcStatus, Storage,
2067 persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
2068 };
2069 use rusqlite::{Connection, params};
2070 use rusqlite_migration::{M, Migrations};
2071
2072 let temp_dir = create_temp_dir("sqlite_migration_htlc_details");
2073 let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);
2074
2075 {
2079 let mut conn = Connection::open(&db_path).unwrap();
2080 let migrations_before_backfill: Vec<_> = SqliteStorage::current_migrations()
2081 .iter()
2082 .take(23) .map(|s| M::up(s))
2084 .collect();
2085 let migrations = Migrations::new(migrations_before_backfill);
2086 migrations.to_latest(&mut conn).unwrap();
2087 }
2088
2089 {
2091 let conn = Connection::open(&db_path).unwrap();
2092
2093 conn.execute(
2095 "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
2096 VALUES (?, ?, ?, ?, ?, ?, ?)",
2097 params![
2098 "ln-completed",
2099 "send",
2100 "completed",
2101 "1000",
2102 "10",
2103 1_700_000_001_i64,
2104 "\"lightning\""
2105 ],
2106 )
2107 .unwrap();
2108 conn.execute(
2109 "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, preimage)
2110 VALUES (?, ?, ?, ?, ?)",
2111 params![
2112 "ln-completed",
2113 "lnbc_completed",
2114 "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567",
2115 "03pubkey1",
2116 "preimage_completed"
2117 ],
2118 )
2119 .unwrap();
2120
2121 conn.execute(
2123 "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
2124 VALUES (?, ?, ?, ?, ?, ?, ?)",
2125 params![
2126 "ln-pending",
2127 "receive",
2128 "pending",
2129 "2000",
2130 "0",
2131 1_700_000_002_i64,
2132 "\"lightning\""
2133 ],
2134 )
2135 .unwrap();
2136 conn.execute(
2137 "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
2138 VALUES (?, ?, ?, ?)",
2139 params![
2140 "ln-pending",
2141 "lnbc_pending",
2142 "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678",
2143 "03pubkey2"
2144 ],
2145 )
2146 .unwrap();
2147
2148 conn.execute(
2150 "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
2151 VALUES (?, ?, ?, ?, ?, ?, ?)",
2152 params![
2153 "ln-failed",
2154 "send",
2155 "failed",
2156 "3000",
2157 "5",
2158 1_700_000_003_i64,
2159 "\"lightning\""
2160 ],
2161 )
2162 .unwrap();
2163 conn.execute(
2164 "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
2165 VALUES (?, ?, ?, ?)",
2166 params![
2167 "ln-failed",
2168 "lnbc_failed",
2169 "hash_failed_0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2170 "03pubkey3"
2171 ],
2172 )
2173 .unwrap();
2174 }
2175
2176 let storage = SqliteStorage::new(&temp_dir).unwrap();
2178
2179 let completed = storage
2181 .get_payment_by_id("ln-completed".to_string())
2182 .await
2183 .unwrap();
2184 match &completed.details {
2185 Some(PaymentDetails::Lightning { htlc_details, .. }) => {
2186 assert_eq!(htlc_details.status, SparkHtlcStatus::PreimageShared);
2187 assert_eq!(htlc_details.expiry_time, 0);
2188 assert_eq!(
2189 htlc_details.payment_hash,
2190 "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567"
2191 );
2192 assert_eq!(htlc_details.preimage.as_deref(), Some("preimage_completed"));
2193 }
2194 _ => panic!("Expected Lightning payment details for ln-completed"),
2195 }
2196
2197 let pending = storage
2199 .get_payment_by_id("ln-pending".to_string())
2200 .await
2201 .unwrap();
2202 match &pending.details {
2203 Some(PaymentDetails::Lightning { htlc_details, .. }) => {
2204 assert_eq!(htlc_details.status, SparkHtlcStatus::WaitingForPreimage);
2205 assert_eq!(htlc_details.expiry_time, 0);
2206 assert_eq!(
2207 htlc_details.payment_hash,
2208 "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678"
2209 );
2210 assert!(htlc_details.preimage.is_none());
2211 }
2212 _ => panic!("Expected Lightning payment details for ln-pending"),
2213 }
2214
2215 let failed = storage
2217 .get_payment_by_id("ln-failed".to_string())
2218 .await
2219 .unwrap();
2220 match &failed.details {
2221 Some(PaymentDetails::Lightning { htlc_details, .. }) => {
2222 assert_eq!(htlc_details.status, SparkHtlcStatus::Returned);
2223 assert_eq!(htlc_details.expiry_time, 0);
2224 }
2225 _ => panic!("Expected Lightning payment details for ln-failed"),
2226 }
2227
2228 let waiting_payments = storage
2230 .list_payments(StorageListPaymentsRequest {
2231 payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
2232 htlc_status: Some(vec![SparkHtlcStatus::WaitingForPreimage]),
2233 }]),
2234 ..Default::default()
2235 })
2236 .await
2237 .unwrap();
2238 assert_eq!(waiting_payments.len(), 1);
2239 assert_eq!(waiting_payments[0].id, "ln-pending");
2240
2241 let preimage_shared = storage
2242 .list_payments(StorageListPaymentsRequest {
2243 payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
2244 htlc_status: Some(vec![SparkHtlcStatus::PreimageShared]),
2245 }]),
2246 ..Default::default()
2247 })
2248 .await
2249 .unwrap();
2250 assert_eq!(preimage_shared.len(), 1);
2251 assert_eq!(preimage_shared[0].id, "ln-completed");
2252
2253 let returned = storage
2254 .list_payments(StorageListPaymentsRequest {
2255 payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
2256 htlc_status: Some(vec![SparkHtlcStatus::Returned]),
2257 }]),
2258 ..Default::default()
2259 })
2260 .await
2261 .unwrap();
2262 assert_eq!(returned.len(), 1);
2263 assert_eq!(returned[0].id, "ln-failed");
2264 }
2265
2266 #[tokio::test]
2267 async fn test_contacts_crud() {
2268 let temp_dir = create_temp_dir("contacts_crud");
2269 let storage = SqliteStorage::new(&temp_dir).unwrap();
2270
2271 crate::persist::tests::test_contacts_crud(Box::new(storage)).await;
2272 }
2273
2274 #[tokio::test]
2275 async fn test_conversion_status_persistence() {
2276 let temp_dir = create_temp_dir("sqlite_conversion_status_persistence");
2277 let storage = SqliteStorage::new(&temp_dir).unwrap();
2278
2279 crate::persist::tests::test_conversion_status_persistence(Box::new(storage)).await;
2280 }
2281}