1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5 Connection, Row, ToSql, TransactionBehavior, params,
6 types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11 AssetFilter, ConversionInfo, DepositInfo, LnurlPayInfo, LnurlReceiveMetadata,
12 LnurlWithdrawInfo, PaymentDetails, PaymentMethod, SparkHtlcDetails, SparkHtlcStatus,
13 TokenTransactionType,
14 error::DepositClaimError,
15 persist::{
16 PaymentMetadata, SetLnurlMetadataItem, StorageListPaymentsRequest,
17 StoragePaymentDetailsFilter, UpdateDepositPayload,
18 },
19 sync_storage::{
20 IncomingChange, OutgoingChange, Record, RecordChange, RecordId, UnversionedRecordChange,
21 },
22};
23
24use std::collections::HashMap;
25
26use tracing::warn;
27
28use super::{Payment, Storage, StorageError};
29
/// File name of the SQLite database, joined onto the storage directory by `get_db_path`.
const DEFAULT_DB_FILENAME: &str = "storage.sql";
/// SQLite-backed storage.
///
/// The struct only remembers the directory of the database; a new connection is opened
/// through `get_connection` whenever one is needed.
pub struct SqliteStorage {
    /// Directory that holds the database file.
    db_dir: PathBuf,
}
35
36impl SqliteStorage {
37 pub fn new(path: &Path) -> Result<Self, StorageError> {
47 let storage = Self {
48 db_dir: path.to_path_buf(),
49 };
50
51 #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
52 std::fs::create_dir_all(path)
53 .map_err(|e| StorageError::InitializationError(e.to_string()))?;
54
55 storage.migrate()?;
56 Ok(storage)
57 }
58
59 pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
60 Ok(Connection::open(self.get_db_path())?)
61 }
62
63 fn get_db_path(&self) -> PathBuf {
64 self.db_dir.join(DEFAULT_DB_FILENAME)
65 }
66
67 fn migrate(&self) -> Result<(), StorageError> {
68 let migrations =
69 Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
70 let mut conn = self.get_connection()?;
71 let previous_version = match migrations.current_version(&conn)? {
72 SchemaVersion::Inside(previous_version) => previous_version.get(),
73 _ => 0,
74 };
75 migrations.to_latest(&mut conn)?;
76
77 if previous_version < 6 {
78 Self::migrate_lnurl_metadata_description(&mut conn)?;
79 }
80
81 Ok(())
82 }
83
84 fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
85 let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
86 let pay_infos: Vec<_> = stmt
87 .query_map([], |row| {
88 let payment_id: String = row.get(0)?;
89 let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
90 Ok((payment_id, lnurl_pay_info))
91 })?
92 .collect::<Result<_, _>>()?;
93 let pay_infos = pay_infos
94 .into_iter()
95 .filter_map(|(payment_id, lnurl_pay_info)| {
96 let pay_info = lnurl_pay_info?;
97 let description = pay_info.extract_description()?;
98 Some((payment_id, description))
99 })
100 .collect::<Vec<_>>();
101
102 for pay_info in pay_infos {
103 conn.execute(
104 "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
105 params![pay_info.1, pay_info.0],
106 )?;
107 }
108
109 Ok(())
110 }
111
    /// Returns the ordered list of SQL migration scripts for the storage schema.
    ///
    /// `migrate` wraps every entry in `M::up` and hands the list to `Migrations`.
    /// NOTE(review): the 1-based position of an entry appears to be the schema version that
    /// `migrate` compares against (`previous_version < 6` matches the sixth entry, which adds
    /// `lnurl_description`), so entries should presumably only be appended — confirm.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn current_migrations() -> Vec<&'static str> {
        vec![
            // Base payments table.
            "CREATE TABLE IF NOT EXISTS payments (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount INTEGER NOT NULL,
              fees INTEGER NOT NULL,
              timestamp INTEGER NOT NULL,
              details TEXT,
              method TEXT
            );",
            // Key/value settings used by the cached-item accessors.
            "CREATE TABLE IF NOT EXISTS settings (
              key TEXT PRIMARY KEY,
              value TEXT NOT NULL
            );",
            // Deposits that have not been claimed, keyed by outpoint.
            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
              txid TEXT NOT NULL,
              vout INTEGER NOT NULL,
              amount_sats INTEGER,
              claim_error TEXT,
              refund_tx TEXT,
              refund_tx_id TEXT,
              PRIMARY KEY (txid, vout)
            );",
            // Per-payment metadata.
            "CREATE TABLE IF NOT EXISTS payment_metadata (
              payment_id TEXT PRIMARY KEY,
              lnurl_pay_info TEXT
            );",
            "CREATE TABLE IF NOT EXISTS deposit_refunds (
              deposit_tx_id TEXT NOT NULL,
              deposit_vout INTEGER NOT NULL,
              refund_tx TEXT NOT NULL,
              refund_tx_id TEXT NOT NULL,
              PRIMARY KEY (deposit_tx_id, deposit_vout)
            );",
            // Sixth entry: the column back-filled by `migrate_lnurl_metadata_description`.
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
            // Moves the JSON `details` column into dedicated columns/tables, then drops it.
            "
            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN spark INTEGER;
            CREATE TABLE payment_details_lightning (
              payment_id TEXT PRIMARY KEY,
              invoice TEXT NOT NULL,
              payment_hash TEXT NOT NULL,
              destination_pubkey TEXT NOT NULL,
              description TEXT,
              preimage TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'),
                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'),
                json_extract(details, '$.Lightning.preimage')
            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

            ALTER TABLE payments DROP COLUMN details;

            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
            ",
            "CREATE TABLE payment_details_token (
              payment_id TEXT PRIMARY KEY,
              metadata TEXT NOT NULL,
              tx_hash TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );",
            // The next four entries rebuild `payments` with `amount`/`fees` stored as TEXT.
            "CREATE TABLE payments_new (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount TEXT NOT NULL,
              fees TEXT NOT NULL,
              timestamp INTEGER NOT NULL,
              method TEXT,
              withdraw_tx_id TEXT,
              deposit_tx_id TEXT,
              spark INTEGER
            );",
            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
             SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
             FROM payments;",
            "DROP TABLE payments;",
            "ALTER TABLE payments_new RENAME TO payments;",
            "CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
            // Sync bookkeeping: a single-row revision counter, the outgoing queue and the
            // last known state per record.
            "CREATE TABLE sync_revision (
                revision INTEGER NOT NULL DEFAULT 0
            );
            INSERT INTO sync_revision (revision) VALUES (0);
            CREATE TABLE sync_outgoing(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                updated_fields_json TEXT NOT NULL,
                revision INTEGER NOT NULL
            );
            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
            CREATE TABLE sync_state(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id)
            );",
            "CREATE TABLE sync_incoming(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id, revision)
            );
            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
            // Rebuilds `payment_details_spark` so `invoice_details` is nullable and
            // `htlc_details` exists.
            "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
            CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT,
              htlc_details TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_spark (payment_id, invoice_details)
             SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
            DROP TABLE tmp_payment_details_spark;",
            "CREATE TABLE lnurl_receive_metadata (
                payment_hash TEXT NOT NULL PRIMARY KEY,
                nostr_zap_request TEXT,
                nostr_zap_receipt TEXT,
                sender_comment TEXT
            );",
            // Clears all stored unclaimed deposits.
            "DELETE FROM unclaimed_deposits;",
            // Resets all sync tables, the revision counter and the initial-sync marker.
            "DELETE FROM sync_outgoing;
             DELETE FROM sync_incoming;
             DELETE FROM sync_state;
             UPDATE sync_revision SET revision = 0;
             DELETE FROM settings WHERE key = 'sync_initial_complete';",
            "ALTER TABLE payment_metadata ADD COLUMN token_conversion_info TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN parent_payment_id TEXT;",
            // Replaces `token_conversion_info` with `conversion_info` (existing values are dropped).
            "
            ALTER TABLE payment_metadata DROP COLUMN token_conversion_info;
            ALTER TABLE payment_metadata ADD COLUMN conversion_info TEXT;
            ",
            // Adds the token transaction type and nulls the stored
            // `last_synced_final_token_payment_id` inside the `sync_offset` setting.
            "ALTER TABLE payment_details_token ADD COLUMN tx_type TEXT NOT NULL DEFAULT 'transfer';
            UPDATE settings
            SET value = json_set(value, '$.last_synced_final_token_payment_id', NULL)
            WHERE key = 'sync_offset' AND json_valid(value) AND json_type(value, '$.last_synced_final_token_payment_id') IS NOT NULL;",
            // Second full reset of the sync tables (same statements as the earlier reset).
            "DELETE FROM sync_outgoing;
             DELETE FROM sync_incoming;
             DELETE FROM sync_state;
             UPDATE sync_revision SET revision = 0;
             DELETE FROM settings WHERE key = 'sync_initial_complete';",
            "ALTER TABLE payment_details_lightning ADD COLUMN htlc_status TEXT NOT NULL DEFAULT 'WaitingForPreimage';
             ALTER TABLE payment_details_lightning ADD COLUMN htlc_expiry_time INTEGER NOT NULL DEFAULT 0;",
            // Derives `htlc_status` from the payment status and resets the `offset` field of
            // the `sync_offset` setting to 0.
            "UPDATE payment_details_lightning
             SET htlc_status = CASE
                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'completed' THEN 'PreimageShared'
                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'pending' THEN 'WaitingForPreimage'
                     ELSE 'Returned'
                 END;
             UPDATE settings
             SET value = json_set(value, '$.offset', 0)
             WHERE key = 'sync_offset' AND json_valid(value);",
            "ALTER TABLE lnurl_receive_metadata ADD COLUMN preimage TEXT;",
            "DELETE FROM settings WHERE key = 'lnurl_metadata_updated_after';",
            "DELETE FROM settings WHERE key = 'lightning_address';",
        ]
    }
316}
317
318#[allow(clippy::needless_pass_by_value)]
322fn map_sqlite_error(e: rusqlite::Error) -> StorageError {
323 match e {
324 rusqlite::Error::SqliteFailure(err, _)
325 if err.code == rusqlite::ErrorCode::DatabaseBusy
326 || err.code == rusqlite::ErrorCode::DatabaseLocked =>
327 {
328 StorageError::Connection(e.to_string())
329 }
330 _ => StorageError::Implementation(e.to_string()),
331 }
332}
333
/// Lets `?` convert `rusqlite` errors; the classification is delegated to `map_sqlite_error`.
impl From<rusqlite::Error> for StorageError {
    fn from(value: rusqlite::Error) -> Self {
        map_sqlite_error(value)
    }
}
339
340impl From<rusqlite_migration::Error> for StorageError {
341 fn from(value: rusqlite_migration::Error) -> Self {
342 StorageError::Implementation(value.to_string())
343 }
344}
345
346#[async_trait]
347impl Storage for SqliteStorage {
348 #[allow(clippy::too_many_lines)]
349 async fn list_payments(
350 &self,
351 request: StorageListPaymentsRequest,
352 ) -> Result<Vec<Payment>, StorageError> {
353 let connection = self.get_connection()?;
354
355 let mut where_clauses = Vec::new();
357 let mut params: Vec<Box<dyn ToSql>> = Vec::new();
358
359 if let Some(ref type_filter) = request.type_filter
361 && !type_filter.is_empty()
362 {
363 let placeholders = type_filter
364 .iter()
365 .map(|_| "?")
366 .collect::<Vec<_>>()
367 .join(", ");
368 where_clauses.push(format!("p.payment_type IN ({placeholders})"));
369 for payment_type in type_filter {
370 params.push(Box::new(payment_type.to_string()));
371 }
372 }
373
374 if let Some(ref status_filter) = request.status_filter
376 && !status_filter.is_empty()
377 {
378 let placeholders = status_filter
379 .iter()
380 .map(|_| "?")
381 .collect::<Vec<_>>()
382 .join(", ");
383 where_clauses.push(format!("p.status IN ({placeholders})"));
384 for status in status_filter {
385 params.push(Box::new(status.to_string()));
386 }
387 }
388
389 if let Some(from_timestamp) = request.from_timestamp {
391 where_clauses.push("p.timestamp >= ?".to_string());
392 params.push(Box::new(from_timestamp));
393 }
394
395 if let Some(to_timestamp) = request.to_timestamp {
396 where_clauses.push("p.timestamp < ?".to_string());
397 params.push(Box::new(to_timestamp));
398 }
399
400 if let Some(ref asset_filter) = request.asset_filter {
402 match asset_filter {
403 AssetFilter::Bitcoin => {
404 where_clauses.push("t.metadata IS NULL".to_string());
405 }
406 AssetFilter::Token { token_identifier } => {
407 where_clauses.push("t.metadata IS NOT NULL".to_string());
408 if let Some(identifier) = token_identifier {
409 where_clauses
411 .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
412 params.push(Box::new(identifier.clone()));
413 }
414 }
415 }
416 }
417
418 if let Some(ref payment_details_filter) = request.payment_details_filter {
420 let mut all_payment_details_clauses = Vec::new();
421 for payment_details_filter in payment_details_filter {
422 let mut payment_details_clauses = Vec::new();
423 let htlc_filter = match payment_details_filter {
425 StoragePaymentDetailsFilter::Spark {
426 htlc_status: Some(s),
427 ..
428 } if !s.is_empty() => Some(("s", s)),
429 StoragePaymentDetailsFilter::Lightning {
430 htlc_status: Some(s),
431 ..
432 } if !s.is_empty() => Some(("l", s)),
433 _ => None,
434 };
435 if let Some((alias, htlc_statuses)) = htlc_filter {
436 let placeholders = htlc_statuses
437 .iter()
438 .map(|_| "?")
439 .collect::<Vec<_>>()
440 .join(", ");
441 if alias == "l" {
442 payment_details_clauses.push(format!("l.htlc_status IN ({placeholders})"));
444 } else {
445 payment_details_clauses.push(format!(
447 "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
448 ));
449 }
450 for htlc_status in htlc_statuses {
451 params.push(Box::new(htlc_status.to_string()));
452 }
453 }
454 let conversion_filter = match payment_details_filter {
456 StoragePaymentDetailsFilter::Spark {
457 conversion_refund_needed: Some(v),
458 ..
459 } => Some((v, "p.spark = 1")),
460 StoragePaymentDetailsFilter::Token {
461 conversion_refund_needed: Some(v),
462 ..
463 } => Some((v, "p.spark IS NULL")),
464 _ => None,
465 };
466 if let Some((conversion_refund_needed, type_check)) = conversion_filter {
467 let refund_needed = if *conversion_refund_needed {
468 "= 'RefundNeeded'"
469 } else {
470 "!= 'RefundNeeded'"
471 };
472 payment_details_clauses.push(format!(
473 "{type_check} AND pm.conversion_info IS NOT NULL AND
474 json_extract(pm.conversion_info, '$.status') {refund_needed}"
475 ));
476 }
477 if let StoragePaymentDetailsFilter::Token {
479 tx_hash: Some(tx_hash),
480 ..
481 } = payment_details_filter
482 {
483 payment_details_clauses.push("t.tx_hash = ?".to_string());
484 params.push(Box::new(tx_hash.clone()));
485 }
486
487 if let StoragePaymentDetailsFilter::Token {
489 tx_type: Some(tx_type),
490 ..
491 } = payment_details_filter
492 {
493 payment_details_clauses.push("t.tx_type = ?".to_string());
494 params.push(Box::new(tx_type.to_string()));
495 }
496
497 if let StoragePaymentDetailsFilter::Lightning {
499 has_lnurl_preimage: Some(has_preimage),
500 ..
501 } = payment_details_filter
502 {
503 if *has_preimage {
504 payment_details_clauses.push("lrm.preimage IS NOT NULL".to_string());
505 } else {
506 payment_details_clauses.push(
508 "lrm.payment_hash IS NOT NULL AND l.preimage IS NOT NULL AND lrm.preimage IS NULL".to_string(),
509 );
510 }
511 }
512
513 if !payment_details_clauses.is_empty() {
514 all_payment_details_clauses
515 .push(format!("({})", payment_details_clauses.join(" AND ")));
516 }
517 }
518
519 if !all_payment_details_clauses.is_empty() {
520 where_clauses.push(format!("({})", all_payment_details_clauses.join(" OR ")));
521 }
522 }
523
524 where_clauses.push("pm.parent_payment_id IS NULL".to_string());
527
528 let where_sql = if where_clauses.is_empty() {
530 String::new()
531 } else {
532 format!("WHERE {}", where_clauses.join(" AND "))
533 };
534
535 let order_direction = if request.sort_ascending.unwrap_or(false) {
537 "ASC"
538 } else {
539 "DESC"
540 };
541
542 let query = format!(
543 "{SELECT_PAYMENT_SQL} {where_sql} ORDER BY p.timestamp {order_direction} LIMIT {} OFFSET {}",
544 request.limit.unwrap_or(u32::MAX),
545 request.offset.unwrap_or(0)
546 );
547
548 let mut stmt = connection.prepare(&query)?;
549 let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
550 let payments = stmt
551 .query_map(param_refs.as_slice(), map_payment)?
552 .collect::<Result<Vec<_>, _>>()?;
553 Ok(payments)
554 }
555
556 #[allow(clippy::too_many_lines)]
557 async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
558 let mut connection = self.get_connection()?;
559 let tx = connection.transaction_with_behavior(TransactionBehavior::Immediate)?;
560 tx.execute(
561 "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
562 VALUES (?, ?, ?, ?, ?, ?, ?)
563 ON CONFLICT(id) DO UPDATE SET
564 payment_type=excluded.payment_type,
565 status=excluded.status,
566 amount=excluded.amount,
567 fees=excluded.fees,
568 timestamp=excluded.timestamp,
569 method=excluded.method",
570 params![
571 payment.id,
572 payment.payment_type.to_string(),
573 payment.status.to_string(),
574 U128SqlWrapper(payment.amount),
575 U128SqlWrapper(payment.fees),
576 payment.timestamp,
577 payment.method,
578 ],
579 )?;
580
581 match payment.details {
582 Some(PaymentDetails::Withdraw { tx_id }) => {
583 tx.execute(
584 "UPDATE payments SET withdraw_tx_id = ? WHERE id = ?",
585 params![tx_id, payment.id],
586 )?;
587 }
588 Some(PaymentDetails::Deposit { tx_id }) => {
589 tx.execute(
590 "UPDATE payments SET deposit_tx_id = ? WHERE id = ?",
591 params![tx_id, payment.id],
592 )?;
593 }
594 Some(PaymentDetails::Spark {
595 invoice_details,
596 htlc_details,
597 ..
598 }) => {
599 tx.execute(
600 "UPDATE payments SET spark = 1 WHERE id = ?",
601 params![payment.id],
602 )?;
603 if invoice_details.is_some() || htlc_details.is_some() {
604 tx.execute(
606 "INSERT INTO payment_details_spark (payment_id, invoice_details, htlc_details)
607 VALUES (?, ?, ?)
608 ON CONFLICT(payment_id) DO UPDATE SET
609 invoice_details=COALESCE(excluded.invoice_details, payment_details_spark.invoice_details),
610 htlc_details=COALESCE(excluded.htlc_details, payment_details_spark.htlc_details)",
611 params![
612 payment.id,
613 invoice_details.as_ref().map(serde_json::to_string).transpose()?,
614 htlc_details.as_ref().map(serde_json::to_string).transpose()?,
615 ],
616 )?;
617 }
618 }
619 Some(PaymentDetails::Token {
620 metadata,
621 tx_hash,
622 tx_type,
623 invoice_details,
624 ..
625 }) => {
626 tx.execute(
627 "INSERT INTO payment_details_token (payment_id, metadata, tx_hash, tx_type, invoice_details)
628 VALUES (?, ?, ?, ?, ?)
629 ON CONFLICT(payment_id) DO UPDATE SET
630 metadata=excluded.metadata,
631 tx_hash=excluded.tx_hash,
632 tx_type=excluded.tx_type,
633 invoice_details=COALESCE(excluded.invoice_details, payment_details_token.invoice_details)",
634 params![
635 payment.id,
636 serde_json::to_string(&metadata)?,
637 tx_hash,
638 tx_type.to_string(),
639 invoice_details.as_ref().map(serde_json::to_string).transpose()?,
640 ],
641 )?;
642 }
643 Some(PaymentDetails::Lightning {
644 invoice,
645 destination_pubkey,
646 description,
647 htlc_details,
648 ..
649 }) => {
650 tx.execute(
651 "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage, htlc_status, htlc_expiry_time)
652 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
653 ON CONFLICT(payment_id) DO UPDATE SET
654 invoice=excluded.invoice,
655 payment_hash=excluded.payment_hash,
656 destination_pubkey=excluded.destination_pubkey,
657 description=excluded.description,
658 preimage=COALESCE(excluded.preimage, payment_details_lightning.preimage),
659 htlc_status=COALESCE(excluded.htlc_status, payment_details_lightning.htlc_status),
660 htlc_expiry_time=COALESCE(excluded.htlc_expiry_time, payment_details_lightning.htlc_expiry_time)",
661 params![
662 payment.id,
663 invoice,
664 htlc_details.payment_hash,
665 destination_pubkey,
666 description,
667 htlc_details.preimage,
668 htlc_details.status.to_string(),
669 htlc_details.expiry_time,
670 ],
671 )?;
672 }
673 None => {}
674 }
675
676 tx.commit()?;
677 Ok(())
678 }
679
680 async fn insert_payment_metadata(
681 &self,
682 payment_id: String,
683 metadata: PaymentMetadata,
684 ) -> Result<(), StorageError> {
685 let connection = self.get_connection()?;
686
687 connection.execute(
688 "INSERT INTO payment_metadata (payment_id, parent_payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description, conversion_info)
689 VALUES (?, ?, ?, ?, ?, ?)
690 ON CONFLICT(payment_id) DO UPDATE SET
691 parent_payment_id = COALESCE(excluded.parent_payment_id, parent_payment_id),
692 lnurl_pay_info = COALESCE(excluded.lnurl_pay_info, lnurl_pay_info),
693 lnurl_withdraw_info = COALESCE(excluded.lnurl_withdraw_info, lnurl_withdraw_info),
694 lnurl_description = COALESCE(excluded.lnurl_description, lnurl_description),
695 conversion_info = COALESCE(excluded.conversion_info, conversion_info)",
696 params![
697 payment_id,
698 metadata.parent_payment_id,
699 metadata.lnurl_pay_info,
700 metadata.lnurl_withdraw_info,
701 metadata.lnurl_description,
702 metadata.conversion_info.as_ref().map(serde_json::to_string).transpose()?,
703 ],
704 )?;
705
706 Ok(())
707 }
708
709 async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
710 let connection = self.get_connection()?;
711
712 connection.execute(
713 "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
714 params![key, value],
715 )?;
716
717 Ok(())
718 }
719
720 async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
721 let connection = self.get_connection()?;
722
723 let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
724
725 let result = stmt.query_row(params![key], |row| {
726 let value_str: String = row.get(0)?;
727 Ok(value_str)
728 });
729
730 match result {
731 Ok(value) => Ok(Some(value)),
732 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
733 Err(e) => Err(e.into()),
734 }
735 }
736
737 async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
738 let connection = self.get_connection()?;
739
740 connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
741
742 Ok(())
743 }
744
745 async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
746 let connection = self.get_connection()?;
747 let query = format!("{SELECT_PAYMENT_SQL} WHERE p.id = ?");
748 let mut stmt = connection.prepare(&query)?;
749 let payment = stmt.query_row(params![id], map_payment)?;
750 Ok(payment)
751 }
752
753 async fn get_payment_by_invoice(
754 &self,
755 invoice: String,
756 ) -> Result<Option<Payment>, StorageError> {
757 let connection = self.get_connection()?;
758 let query = format!("{SELECT_PAYMENT_SQL} WHERE l.invoice = ?");
759 let mut stmt = connection.prepare(&query)?;
760 let payment = stmt.query_row(params![invoice], map_payment);
761 match payment {
762 Ok(payment) => Ok(Some(payment)),
763 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
764 Err(e) => Err(e.into()),
765 }
766 }
767
768 async fn get_payments_by_parent_ids(
769 &self,
770 parent_payment_ids: Vec<String>,
771 ) -> Result<HashMap<String, Vec<Payment>>, StorageError> {
772 if parent_payment_ids.is_empty() {
773 return Ok(HashMap::new());
774 }
775
776 let connection = self.get_connection()?;
777
778 let has_related: bool = connection.query_row(
780 "SELECT EXISTS(SELECT 1 FROM payment_metadata WHERE parent_payment_id IS NOT NULL LIMIT 1)",
781 [],
782 |row| row.get(0),
783 )?;
784 if !has_related {
785 return Ok(HashMap::new());
786 }
787
788 let placeholders: Vec<&str> = parent_payment_ids.iter().map(|_| "?").collect();
790 let in_clause = placeholders.join(", ");
791
792 let query = format!(
793 "{SELECT_PAYMENT_SQL} WHERE pm.parent_payment_id IN ({in_clause}) ORDER BY p.timestamp ASC"
794 );
795
796 let mut stmt = connection.prepare(&query)?;
797 let params: Vec<&dyn ToSql> = parent_payment_ids
798 .iter()
799 .map(|id| id as &dyn ToSql)
800 .collect();
801 let rows = stmt.query_map(params.as_slice(), |row| {
802 let payment = map_payment(row)?;
803 let parent_payment_id: String = row.get(30)?;
804 Ok((parent_payment_id, payment))
805 })?;
806
807 let mut result: HashMap<String, Vec<Payment>> = HashMap::new();
808 for row in rows {
809 let (parent_id, related_payment) = row?;
810 result.entry(parent_id).or_default().push(related_payment);
811 }
812
813 Ok(result)
814 }
815
816 async fn add_deposit(
817 &self,
818 txid: String,
819 vout: u32,
820 amount_sats: u64,
821 ) -> Result<(), StorageError> {
822 let connection = self.get_connection()?;
823 connection.execute(
824 "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats)
825 VALUES (?, ?, ?)",
826 params![txid, vout, amount_sats,],
827 )?;
828 Ok(())
829 }
830
831 async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
832 let connection = self.get_connection()?;
833 connection.execute(
834 "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
835 params![txid, vout],
836 )?;
837 Ok(())
838 }
839
840 async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
841 let connection = self.get_connection()?;
842 let mut stmt =
843 connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
844 let rows = stmt.query_map(params![], |row| {
845 Ok(DepositInfo {
846 txid: row.get(0)?,
847 vout: row.get(1)?,
848 amount_sats: row.get(2)?,
849 claim_error: row.get(3)?,
850 refund_tx: row.get(4)?,
851 refund_tx_id: row.get(5)?,
852 })
853 })?;
854 let mut deposits = Vec::new();
855 for row in rows {
856 deposits.push(row?);
857 }
858 Ok(deposits)
859 }
860
861 async fn update_deposit(
862 &self,
863 txid: String,
864 vout: u32,
865 payload: UpdateDepositPayload,
866 ) -> Result<(), StorageError> {
867 let connection = self.get_connection()?;
868 match payload {
869 UpdateDepositPayload::ClaimError { error } => {
870 connection.execute(
871 "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
872 params![error, txid, vout],
873 )?;
874 }
875 UpdateDepositPayload::Refund {
876 refund_txid,
877 refund_tx,
878 } => {
879 connection.execute(
880 "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
881 params![refund_tx, refund_txid, txid, vout],
882 )?;
883 }
884 }
885 Ok(())
886 }
887
888 async fn set_lnurl_metadata(
889 &self,
890 metadata: Vec<SetLnurlMetadataItem>,
891 ) -> Result<(), StorageError> {
892 let connection = self.get_connection()?;
893 for metadata in metadata {
894 connection.execute(
895 "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment, preimage)
896 VALUES (?, ?, ?, ?, ?)",
897 params![
898 metadata.payment_hash,
899 metadata.nostr_zap_request,
900 metadata.nostr_zap_receipt,
901 metadata.sender_comment,
902 metadata.preimage,
903 ],
904 )?;
905 }
906 Ok(())
907 }
908
909 async fn add_outgoing_change(
910 &self,
911 record: UnversionedRecordChange,
912 ) -> Result<u64, StorageError> {
913 let mut connection = self.get_connection()?;
914 let tx = connection
915 .transaction_with_behavior(TransactionBehavior::Immediate)
916 .map_err(map_sqlite_error)?;
917
918 let local_revision: u64 = tx
920 .query_row(
921 "SELECT COALESCE(MAX(revision), 0) + 1 FROM sync_outgoing",
922 [],
923 |row| row.get(0),
924 )
925 .map_err(map_sqlite_error)?;
926
927 tx.execute(
928 "INSERT INTO sync_outgoing (
929 record_type
930 , data_id
931 , schema_version
932 , commit_time
933 , updated_fields_json
934 , revision
935 )
936 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
937 params![
938 record.id.r#type,
939 record.id.data_id,
940 record.schema_version.clone(),
941 serde_json::to_string(&record.updated_fields)?,
942 local_revision,
943 ],
944 )
945 .map_err(map_sqlite_error)?;
946
947 tx.commit().map_err(map_sqlite_error)?;
948 Ok(local_revision)
949 }
950
951 async fn complete_outgoing_sync(
952 &self,
953 record: Record,
954 local_revision: u64,
955 ) -> Result<(), StorageError> {
956 let mut connection = self.get_connection()?;
957 let tx = connection
958 .transaction_with_behavior(TransactionBehavior::Immediate)
959 .map_err(map_sqlite_error)?;
960
961 let rows_deleted = tx
962 .execute(
963 "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
964 params![record.id.r#type, record.id.data_id, local_revision],
965 )
966 .map_err(map_sqlite_error)?;
967
968 if rows_deleted == 0 {
969 warn!(
970 "complete_outgoing_sync: DELETE from sync_outgoing matched 0 rows \
971 (type={}, data_id={}, revision={})",
972 record.id.r#type, record.id.data_id, local_revision
973 );
974 }
975
976 tx.execute(
977 "INSERT OR REPLACE INTO sync_state (
978 record_type
979 , data_id
980 , schema_version
981 , commit_time
982 , data
983 , revision
984 )
985 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
986 params![
987 record.id.r#type,
988 record.id.data_id,
989 record.schema_version.clone(),
990 serde_json::to_string(&record.data)?,
991 record.revision,
992 ],
993 )
994 .map_err(map_sqlite_error)?;
995
996 tx.execute(
997 "UPDATE sync_revision SET revision = MAX(revision, ?)",
998 params![record.revision],
999 )
1000 .map_err(map_sqlite_error)?;
1001
1002 tx.commit().map_err(map_sqlite_error)?;
1003 Ok(())
1004 }
1005
1006 async fn get_pending_outgoing_changes(
1007 &self,
1008 limit: u32,
1009 ) -> Result<Vec<OutgoingChange>, StorageError> {
1010 let connection = self.get_connection()?;
1011
1012 let mut stmt = connection
1013 .prepare(
1014 "SELECT o.record_type
1015 , o.data_id
1016 , o.schema_version
1017 , o.commit_time
1018 , o.updated_fields_json
1019 , o.revision
1020 , e.schema_version AS existing_schema_version
1021 , e.commit_time AS existing_commit_time
1022 , e.data AS existing_data
1023 , e.revision AS existing_revision
1024 FROM sync_outgoing o
1025 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
1026 ORDER BY o.revision ASC
1027 LIMIT ?",
1028 )
1029 .map_err(map_sqlite_error)?;
1030 let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
1031 let mut results = Vec::new();
1032 while let Some(row) = rows.next().map_err(map_sqlite_error)? {
1033 let parent = if let Some(existing_data) =
1034 row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
1035 {
1036 Some(Record {
1037 id: RecordId::new(
1038 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1039 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1040 ),
1041 schema_version: row.get(6).map_err(map_sqlite_error)?,
1042 revision: row.get(9).map_err(map_sqlite_error)?,
1043 data: serde_json::from_str(&existing_data)?,
1044 })
1045 } else {
1046 None
1047 };
1048 let change = RecordChange {
1049 id: RecordId::new(
1050 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1051 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1052 ),
1053 schema_version: row.get(2).map_err(map_sqlite_error)?,
1054 updated_fields: serde_json::from_str(
1055 &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1056 )?,
1057 local_revision: row.get(5).map_err(map_sqlite_error)?,
1058 };
1059 results.push(OutgoingChange { change, parent });
1060 }
1061
1062 Ok(results)
1063 }
1064
1065 async fn get_last_revision(&self) -> Result<u64, StorageError> {
1066 let connection = self.get_connection()?;
1067
1068 let revision: u64 = connection
1069 .query_row("SELECT revision FROM sync_revision", [], |row| row.get(0))
1070 .map_err(map_sqlite_error)?;
1071
1072 Ok(revision)
1073 }
1074
1075 async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), StorageError> {
1076 if records.is_empty() {
1077 return Ok(());
1078 }
1079
1080 let mut connection = self.get_connection()?;
1081 let tx = connection
1082 .transaction_with_behavior(TransactionBehavior::Immediate)
1083 .map_err(map_sqlite_error)?;
1084
1085 for record in records {
1086 tx.execute(
1087 "INSERT OR REPLACE INTO sync_incoming (
1088 record_type
1089 , data_id
1090 , schema_version
1091 , commit_time
1092 , data
1093 , revision
1094 )
1095 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1096 params![
1097 record.id.r#type,
1098 record.id.data_id,
1099 record.schema_version.clone(),
1100 serde_json::to_string(&record.data)?,
1101 record.revision,
1102 ],
1103 )
1104 .map_err(map_sqlite_error)?;
1105 }
1106
1107 tx.commit().map_err(map_sqlite_error)?;
1108 Ok(())
1109 }
1110
1111 async fn delete_incoming_record(&self, record: Record) -> Result<(), StorageError> {
1112 let connection = self.get_connection()?;
1113
1114 connection
1115 .execute(
1116 "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
1117 params![record.id.r#type, record.id.data_id, record.revision],
1118 )
1119 .map_err(map_sqlite_error)?;
1120
1121 Ok(())
1122 }
1123
1124 async fn get_incoming_records(&self, limit: u32) -> Result<Vec<IncomingChange>, StorageError> {
1125 let connection = self.get_connection()?;
1126
1127 let mut stmt = connection
1128 .prepare(
1129 "SELECT i.record_type
1130 , i.data_id
1131 , i.schema_version
1132 , i.data
1133 , i.revision
1134 , e.schema_version AS existing_schema_version
1135 , e.commit_time AS existing_commit_time
1136 , e.data AS existing_data
1137 , e.revision AS existing_revision
1138 FROM sync_incoming i
1139 LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
1140 ORDER BY i.revision ASC
1141 LIMIT ?",
1142 )
1143 .map_err(map_sqlite_error)?;
1144
1145 let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
1146 let mut results = Vec::new();
1147
1148 while let Some(row) = rows.next().map_err(map_sqlite_error)? {
1149 let parent = if let Some(existing_data) =
1150 row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
1151 {
1152 Some(Record {
1153 id: RecordId::new(
1154 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1155 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1156 ),
1157 schema_version: row.get(5).map_err(map_sqlite_error)?,
1158 revision: row.get(8).map_err(map_sqlite_error)?,
1159 data: serde_json::from_str(&existing_data)?,
1160 })
1161 } else {
1162 None
1163 };
1164 let record = Record {
1165 id: RecordId::new(
1166 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1167 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1168 ),
1169 schema_version: row.get(2).map_err(map_sqlite_error)?,
1170 data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
1171 revision: row.get(4).map_err(map_sqlite_error)?,
1172 };
1173 results.push(IncomingChange {
1174 new_state: record,
1175 old_state: parent,
1176 });
1177 }
1178
1179 Ok(results)
1180 }
1181
1182 async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, StorageError> {
1183 let connection = self.get_connection()?;
1184
1185 let mut stmt = connection
1186 .prepare(
1187 "SELECT o.record_type
1188 , o.data_id
1189 , o.schema_version
1190 , o.commit_time
1191 , o.updated_fields_json
1192 , o.revision
1193 , e.schema_version AS existing_schema_version
1194 , e.commit_time AS existing_commit_time
1195 , e.data AS existing_data
1196 , e.revision AS existing_revision
1197 FROM sync_outgoing o
1198 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
1199 ORDER BY o.revision DESC
1200 LIMIT 1",
1201 )
1202 .map_err(map_sqlite_error)?;
1203
1204 let mut rows = stmt.query([]).map_err(map_sqlite_error)?;
1205
1206 if let Some(row) = rows.next().map_err(map_sqlite_error)? {
1207 let parent = if let Some(existing_data) =
1208 row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
1209 {
1210 Some(Record {
1211 id: RecordId::new(
1212 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1213 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1214 ),
1215 schema_version: row.get(6).map_err(map_sqlite_error)?,
1216 revision: row.get(9).map_err(map_sqlite_error)?,
1217 data: serde_json::from_str(&existing_data)?,
1218 })
1219 } else {
1220 None
1221 };
1222 let change = RecordChange {
1223 id: RecordId::new(
1224 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1225 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1226 ),
1227 schema_version: row.get(2).map_err(map_sqlite_error)?,
1228 updated_fields: serde_json::from_str(
1229 &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1230 )?,
1231 local_revision: row.get(5).map_err(map_sqlite_error)?,
1232 };
1233
1234 return Ok(Some(OutgoingChange { change, parent }));
1235 }
1236
1237 Ok(None)
1238 }
1239
1240 async fn update_record_from_incoming(&self, record: Record) -> Result<(), StorageError> {
1241 let mut connection = self.get_connection()?;
1242 let tx = connection
1243 .transaction_with_behavior(TransactionBehavior::Immediate)
1244 .map_err(map_sqlite_error)?;
1245
1246 tx.execute(
1247 "INSERT OR REPLACE INTO sync_state (
1248 record_type
1249 , data_id
1250 , schema_version
1251 , commit_time
1252 , data
1253 , revision
1254 )
1255 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1256 params![
1257 record.id.r#type,
1258 record.id.data_id,
1259 record.schema_version.clone(),
1260 serde_json::to_string(&record.data)?,
1261 record.revision,
1262 ],
1263 )
1264 .map_err(map_sqlite_error)?;
1265
1266 tx.execute(
1267 "UPDATE sync_revision SET revision = MAX(revision, ?)",
1268 params![record.revision],
1269 )
1270 .map_err(map_sqlite_error)?;
1271
1272 tx.commit().map_err(map_sqlite_error)?;
1273 Ok(())
1274 }
1275}
1276
/// Base `SELECT` that loads a payment row together with its optional detail
/// rows (lightning, token, spark, payment metadata and LNURL receive metadata),
/// all attached with `LEFT JOIN`s.
///
/// The statement ends after the joins: it has no `WHERE`, `ORDER BY` or
/// `LIMIT` clause.
///
/// `map_payment` reads these columns by position (for example index 7 is
/// `withdraw_tx_id`, 10 is `lightning_invoice`, 20 is `token_metadata`), so the
/// order of the select list must be kept in sync with that function.
const SELECT_PAYMENT_SQL: &str = "
    SELECT p.id,
           p.payment_type,
           p.status,
           p.amount,
           p.fees,
           p.timestamp,
           p.method,
           p.withdraw_tx_id,
           p.deposit_tx_id,
           p.spark,
           l.invoice AS lightning_invoice,
           l.payment_hash AS lightning_payment_hash,
           l.destination_pubkey AS lightning_destination_pubkey,
           COALESCE(l.description, pm.lnurl_description) AS lightning_description,
           l.preimage AS lightning_preimage,
           l.htlc_status AS lightning_htlc_status,
           l.htlc_expiry_time AS lightning_htlc_expiry_time,
           pm.lnurl_pay_info,
           pm.lnurl_withdraw_info,
           pm.conversion_info,
           t.metadata AS token_metadata,
           t.tx_hash AS token_tx_hash,
           t.tx_type AS token_tx_type,
           t.invoice_details AS token_invoice_details,
           s.invoice_details AS spark_invoice_details,
           s.htlc_details AS spark_htlc_details,
           lrm.nostr_zap_request AS lnurl_nostr_zap_request,
           lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt,
           lrm.sender_comment AS lnurl_sender_comment,
           lrm.payment_hash AS lnurl_payment_hash,
           pm.parent_payment_id
      FROM payments p
      LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
      LEFT JOIN payment_details_token t ON p.id = t.payment_id
      LEFT JOIN payment_details_spark s ON p.id = s.payment_id
      LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
      LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash";
1317
1318#[allow(clippy::too_many_lines)]
1319fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
1320 let withdraw_tx_id: Option<String> = row.get(7)?;
1321 let deposit_tx_id: Option<String> = row.get(8)?;
1322 let spark: Option<i32> = row.get(9)?;
1323 let lightning_invoice: Option<String> = row.get(10)?;
1324 let token_metadata: Option<String> = row.get(20)?;
1325 let details = match (
1326 lightning_invoice,
1327 withdraw_tx_id,
1328 deposit_tx_id,
1329 spark,
1330 token_metadata,
1331 ) {
1332 (Some(invoice), _, _, _, _) => {
1333 let payment_hash: String = row.get(11)?;
1334 let destination_pubkey: String = row.get(12)?;
1335 let description: Option<String> = row.get(13)?;
1336 let preimage: Option<String> = row.get(14)?;
1337 let htlc_status: SparkHtlcStatus =
1338 row.get::<_, Option<SparkHtlcStatus>>(15)?.ok_or_else(|| {
1339 rusqlite::Error::FromSqlConversionFailure(
1340 15,
1341 rusqlite::types::Type::Null,
1342 "htlc_status is required for Lightning payments".into(),
1343 )
1344 })?;
1345 let htlc_expiry_time: u64 = row.get(16)?;
1346 let htlc_details = SparkHtlcDetails {
1347 payment_hash,
1348 preimage,
1349 expiry_time: htlc_expiry_time,
1350 status: htlc_status,
1351 };
1352 let lnurl_pay_info: Option<LnurlPayInfo> = row.get(17)?;
1353 let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(18)?;
1354 let lnurl_nostr_zap_request: Option<String> = row.get(26)?;
1355 let lnurl_nostr_zap_receipt: Option<String> = row.get(27)?;
1356 let lnurl_sender_comment: Option<String> = row.get(28)?;
1357 let lnurl_payment_hash: Option<String> = row.get(29)?;
1358 let lnurl_receive_metadata = if lnurl_payment_hash.is_some() {
1359 Some(LnurlReceiveMetadata {
1360 nostr_zap_request: lnurl_nostr_zap_request,
1361 nostr_zap_receipt: lnurl_nostr_zap_receipt,
1362 sender_comment: lnurl_sender_comment,
1363 })
1364 } else {
1365 None
1366 };
1367 Some(PaymentDetails::Lightning {
1368 invoice,
1369 destination_pubkey,
1370 description,
1371 htlc_details,
1372 lnurl_pay_info,
1373 lnurl_withdraw_info,
1374 lnurl_receive_metadata,
1375 })
1376 }
1377 (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
1378 (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
1379 (_, _, _, Some(_), _) => {
1380 let invoice_details_str: Option<String> = row.get(24)?;
1381 let invoice_details = invoice_details_str
1382 .map(|s| serde_json_from_str(&s, 24))
1383 .transpose()?;
1384 let htlc_details_str: Option<String> = row.get(25)?;
1385 let htlc_details = htlc_details_str
1386 .map(|s| serde_json_from_str(&s, 25))
1387 .transpose()?;
1388 let conversion_info_str: Option<String> = row.get(19)?;
1389 let conversion_info: Option<ConversionInfo> = conversion_info_str
1390 .map(|s: String| serde_json_from_str(&s, 19))
1391 .transpose()?;
1392 Some(PaymentDetails::Spark {
1393 invoice_details,
1394 htlc_details,
1395 conversion_info,
1396 })
1397 }
1398 (_, _, _, _, Some(metadata)) => {
1399 let tx_type: TokenTransactionType = row.get(22)?;
1400 let invoice_details_str: Option<String> = row.get(23)?;
1401 let invoice_details = invoice_details_str
1402 .map(|s| serde_json_from_str(&s, 23))
1403 .transpose()?;
1404 let conversion_info_str: Option<String> = row.get(19)?;
1405 let conversion_info: Option<ConversionInfo> = conversion_info_str
1406 .map(|s: String| serde_json_from_str(&s, 19))
1407 .transpose()?;
1408 Some(PaymentDetails::Token {
1409 metadata: serde_json_from_str(&metadata, 20)?,
1410 tx_hash: row.get(21)?,
1411 tx_type,
1412 invoice_details,
1413 conversion_info,
1414 })
1415 }
1416 _ => None,
1417 };
1418 Ok(Payment {
1419 id: row.get(0)?,
1420 payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
1421 rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
1422 })?,
1423 status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
1424 rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
1425 })?,
1426 amount: row.get::<_, U128SqlWrapper>(3)?.0,
1427 fees: row.get::<_, U128SqlWrapper>(4)?.0,
1428 timestamp: row.get(5)?,
1429 details,
1430 method: row.get(6)?,
1431 conversion_details: None,
1432 })
1433}
1434
1435impl ToSql for PaymentDetails {
1436 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1437 to_sql_json(self)
1438 }
1439}
1440
1441impl FromSql for PaymentDetails {
1442 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1443 from_sql_json(value)
1444 }
1445}
1446
1447impl ToSql for PaymentMethod {
1448 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1449 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1450 }
1451}
1452
1453impl FromSql for PaymentMethod {
1454 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1455 match value {
1456 ValueRef::Text(i) => {
1457 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1458 let payment_method: PaymentMethod = s
1460 .trim_matches('"')
1461 .to_lowercase()
1462 .parse()
1463 .map_err(|()| FromSqlError::InvalidType)?;
1464 Ok(payment_method)
1465 }
1466 _ => Err(FromSqlError::InvalidType),
1467 }
1468 }
1469}
1470
1471impl ToSql for TokenTransactionType {
1472 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1473 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1474 }
1475}
1476
1477impl FromSql for TokenTransactionType {
1478 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1479 match value {
1480 ValueRef::Text(i) => {
1481 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1482 let tx_type: TokenTransactionType =
1483 s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1484 Ok(tx_type)
1485 }
1486 _ => Err(FromSqlError::InvalidType),
1487 }
1488 }
1489}
1490
1491impl ToSql for SparkHtlcStatus {
1492 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1493 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1494 }
1495}
1496
1497impl FromSql for SparkHtlcStatus {
1498 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1499 match value {
1500 ValueRef::Text(i) => {
1501 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1502 let status: SparkHtlcStatus =
1503 s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1504 Ok(status)
1505 }
1506 _ => Err(FromSqlError::InvalidType),
1507 }
1508 }
1509}
1510
1511impl ToSql for DepositClaimError {
1512 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1513 to_sql_json(self)
1514 }
1515}
1516
1517impl FromSql for DepositClaimError {
1518 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1519 from_sql_json(value)
1520 }
1521}
1522
1523impl ToSql for LnurlPayInfo {
1524 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1525 to_sql_json(self)
1526 }
1527}
1528
1529impl FromSql for LnurlPayInfo {
1530 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1531 from_sql_json(value)
1532 }
1533}
1534
1535impl ToSql for LnurlWithdrawInfo {
1536 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1537 to_sql_json(self)
1538 }
1539}
1540
1541impl FromSql for LnurlWithdrawInfo {
1542 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1543 from_sql_json(value)
1544 }
1545}
1546
1547fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1548where
1549 T: serde::Serialize,
1550{
1551 let json = serde_json::to_string(&value)
1552 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1553 Ok(rusqlite::types::ToSqlOutput::from(json))
1554}
1555
1556fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1557where
1558 T: serde::de::DeserializeOwned,
1559{
1560 match value {
1561 ValueRef::Text(i) => {
1562 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1563 let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1564 Ok(deserialized)
1565 }
1566 _ => Err(FromSqlError::InvalidType),
1567 }
1568}
1569
1570fn serde_json_from_str<T>(value: &str, index: usize) -> Result<T, rusqlite::Error>
1571where
1572 T: serde::de::DeserializeOwned,
1573{
1574 serde_json::from_str(value).map_err(|e| {
1575 rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(e))
1576 })
1577}
1578
/// Newtype around `u128` that is written to SQLite as decimal text and parsed
/// back from text (see its `ToSql` / `FromSql` impls).
struct U128SqlWrapper(u128);
1580
1581impl ToSql for U128SqlWrapper {
1582 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1583 let string = self.0.to_string();
1584 Ok(rusqlite::types::ToSqlOutput::from(string))
1585 }
1586}
1587
1588impl FromSql for U128SqlWrapper {
1589 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1590 match value {
1591 ValueRef::Text(i) => {
1592 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1593 let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1594 Ok(U128SqlWrapper(integer))
1595 }
1596 _ => Err(FromSqlError::InvalidType),
1597 }
1598 }
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603
1604 use crate::SqliteStorage;
1605 use std::path::PathBuf;
1606
1607 fn create_temp_dir(name: &str) -> PathBuf {
1610 let mut path = std::env::temp_dir();
1611 path.push(format!("breez-test-{}-{}", name, uuid::Uuid::new_v4()));
1613 std::fs::create_dir_all(&path).unwrap();
1614 path
1615 }
1616
1617 #[tokio::test]
1618 async fn test_storage() {
1619 let temp_dir = create_temp_dir("sqlite_storage");
1620 let storage = SqliteStorage::new(&temp_dir).unwrap();
1621
1622 Box::pin(crate::persist::tests::test_storage(Box::new(storage))).await;
1623 }
1624
1625 #[tokio::test]
1626 async fn test_unclaimed_deposits_crud() {
1627 let temp_dir = create_temp_dir("sqlite_storage_deposits");
1628 let storage = SqliteStorage::new(&temp_dir).unwrap();
1629
1630 crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
1631 }
1632
1633 #[tokio::test]
1634 async fn test_deposit_refunds() {
1635 let temp_dir = create_temp_dir("sqlite_storage_refund_tx");
1636 let storage = SqliteStorage::new(&temp_dir).unwrap();
1637
1638 crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
1639 }
1640
1641 #[tokio::test]
1642 async fn test_payment_type_filtering() {
1643 let temp_dir = create_temp_dir("sqlite_storage_type_filter");
1644 let storage = SqliteStorage::new(&temp_dir).unwrap();
1645
1646 crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
1647 }
1648
1649 #[tokio::test]
1650 async fn test_payment_status_filtering() {
1651 let temp_dir = create_temp_dir("sqlite_storage_status_filter");
1652 let storage = SqliteStorage::new(&temp_dir).unwrap();
1653
1654 crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
1655 }
1656
1657 #[tokio::test]
1658 async fn test_payment_asset_filtering() {
1659 let temp_dir = create_temp_dir("sqlite_storage_asset_filter");
1660 let storage = SqliteStorage::new(&temp_dir).unwrap();
1661
1662 crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
1663 }
1664
1665 #[tokio::test]
1666 async fn test_timestamp_filtering() {
1667 let temp_dir = create_temp_dir("sqlite_storage_timestamp_filter");
1668 let storage = SqliteStorage::new(&temp_dir).unwrap();
1669
1670 crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
1671 }
1672
1673 #[tokio::test]
1674 async fn test_spark_htlc_status_filtering() {
1675 let temp_dir = create_temp_dir("sqlite_storage_htlc_filter");
1676 let storage = SqliteStorage::new(&temp_dir).unwrap();
1677
1678 crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
1679 }
1680
1681 #[tokio::test]
1682 async fn test_lightning_htlc_details_and_status_filtering() {
1683 let temp_dir = create_temp_dir("sqlite_storage_htlc_details");
1684 let storage = SqliteStorage::new(&temp_dir).unwrap();
1685
1686 crate::persist::tests::test_lightning_htlc_details_and_status_filtering(Box::new(storage))
1687 .await;
1688 }
1689
1690 #[tokio::test]
1691 async fn test_conversion_refund_needed_filtering() {
1692 let temp_dir = create_temp_dir("sqlite_storage_conversion_refund_needed_filter");
1693 let storage = SqliteStorage::new(&temp_dir).unwrap();
1694
1695 crate::persist::tests::test_conversion_refund_needed_filtering(Box::new(storage)).await;
1696 }
1697
1698 #[tokio::test]
1699 async fn test_token_transaction_type_filtering() {
1700 let temp_dir = create_temp_dir("sqlite_storage_token_transaction_type_filter");
1701 let storage = SqliteStorage::new(&temp_dir).unwrap();
1702
1703 crate::persist::tests::test_token_transaction_type_filtering(Box::new(storage)).await;
1704 }
1705
1706 #[tokio::test]
1707 async fn test_combined_filters() {
1708 let temp_dir = create_temp_dir("sqlite_storage_combined_filter");
1709 let storage = SqliteStorage::new(&temp_dir).unwrap();
1710
1711 crate::persist::tests::test_combined_filters(Box::new(storage)).await;
1712 }
1713
1714 #[tokio::test]
1715 async fn test_sort_order() {
1716 let temp_dir = create_temp_dir("sqlite_storage_sort_order");
1717 let storage = SqliteStorage::new(&temp_dir).unwrap();
1718
1719 crate::persist::tests::test_sort_order(Box::new(storage)).await;
1720 }
1721
1722 #[tokio::test]
1723 async fn test_payment_metadata() {
1724 let temp_dir = create_temp_dir("sqlite_storage_payment_request_metadata");
1725 let storage = SqliteStorage::new(&temp_dir).unwrap();
1726
1727 crate::persist::tests::test_payment_metadata(Box::new(storage)).await;
1728 }
1729
1730 #[tokio::test]
1731 async fn test_payment_details_update_persistence() {
1732 let temp_dir = create_temp_dir("sqlite_storage_payment_details_update");
1733 let storage = SqliteStorage::new(&temp_dir).unwrap();
1734
1735 crate::persist::tests::test_payment_details_update_persistence(Box::new(storage)).await;
1736 }
1737
1738 #[tokio::test]
1739 async fn test_pending_lnurl_preimages() {
1740 let temp_dir = create_temp_dir("sqlite_storage_pending_lnurl_preimages");
1741 let storage = SqliteStorage::new(&temp_dir).unwrap();
1742
1743 crate::persist::tests::test_pending_lnurl_preimages(Box::new(storage)).await;
1744 }
1745
1746 #[tokio::test]
1747 async fn test_sync_storage() {
1748 let temp_dir = create_temp_dir("sqlite_sync_storage");
1749 let storage = SqliteStorage::new(&temp_dir).unwrap();
1750
1751 crate::persist::tests::test_sync_storage(Box::new(storage)).await;
1752 }
1753
1754 #[tokio::test]
1755 async fn test_payment_metadata_merge() {
1756 let temp_dir = create_temp_dir("sqlite_payment_metadata_merge");
1757 let storage = SqliteStorage::new(&temp_dir).unwrap();
1758
1759 crate::persist::tests::test_payment_metadata_merge(Box::new(storage)).await;
1760 }
1761
    /// Checks that a token payment stored before the `tx_type` column existed is
    /// readable after migrating to the latest schema.
    ///
    /// The database is first created with only the first 22 migrations, a token
    /// payment is inserted with raw SQL (no `tx_type`), and the store is then
    /// opened through `SqliteStorage::new`, which applies the remaining
    /// migrations. The test expects the old row to come back with
    /// `TokenTransactionType::Transfer`, and a newly inserted `Mint` payment to
    /// keep its own type and to be excluded by a `Transfer` filter.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_migration_tx_type() {
        use crate::{
            Payment, PaymentDetails, PaymentMethod, PaymentStatus, PaymentType, Storage,
            TokenMetadata, TokenTransactionType,
            persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
        };
        use rusqlite::{Connection, params};
        use rusqlite_migration::{M, Migrations};

        let temp_dir = create_temp_dir("sqlite_migration_tx_type");
        let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);

        // Step 1: build the schema using only the first 22 migrations.
        {
            let mut conn = Connection::open(&db_path).unwrap();
            let migrations_before_tx_type: Vec<_> = SqliteStorage::current_migrations()
                .iter()
                .take(22)
                .map(|s| M::up(s))
                .collect();
            let migrations = Migrations::new(migrations_before_tx_type);
            migrations.to_latest(&mut conn).unwrap();
        }

        // Step 2: insert a token payment with raw SQL, without a tx_type value.
        {
            let conn = Connection::open(&db_path).unwrap();

            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "token-migration-test",
                    "send",
                    "completed",
                    "5000",
                    "10",
                    1_234_567_890_i64,
                    "\"token\""
                ],
            )
            .unwrap();

            let metadata = serde_json::json!({
                "identifier": "test-token-id",
                "issuer_public_key": format!("02{}", "a".repeat(64)),
                "name": "Test Token",
                "ticker": "TST",
                "decimals": 8,
                "max_supply": 1_000_000_u128,
                "is_freezable": false
            });

            conn.execute(
                "INSERT INTO payment_details_token (payment_id, metadata, tx_hash)
                 VALUES (?, ?, ?)",
                params![
                    "token-migration-test",
                    metadata.to_string(),
                    "0xabcdef1234567890"
                ],
            )
            .unwrap();
        }

        // Step 3: opening the store runs the remaining migrations.
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        // Step 4: the pre-existing payment must load with the expected fields.
        let migrated_payment = storage
            .get_payment_by_id("token-migration-test".to_string())
            .await
            .unwrap();

        assert_eq!(migrated_payment.id, "token-migration-test");
        assert_eq!(migrated_payment.amount, 5000);
        assert_eq!(migrated_payment.fees, 10);
        assert_eq!(migrated_payment.status, PaymentStatus::Completed);
        assert_eq!(migrated_payment.payment_type, PaymentType::Send);
        assert_eq!(migrated_payment.method, PaymentMethod::Token);

        match migrated_payment.details {
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                tx_type,
                ..
            }) => {
                assert_eq!(metadata.identifier, "test-token-id");
                assert_eq!(metadata.name, "Test Token");
                assert_eq!(metadata.ticker, "TST");
                assert_eq!(metadata.decimals, 8);
                assert_eq!(tx_hash, "0xabcdef1234567890");
                assert_eq!(
                    tx_type,
                    TokenTransactionType::Transfer,
                    "Migration should add default txType 'transfer' to token payments"
                );
            }
            _ => panic!("Expected Token payment details"),
        }

        // Step 5: insert a new token payment with an explicit Mint type.
        let new_payment = Payment {
            id: "new-token-payment".to_string(),
            payment_type: PaymentType::Receive,
            status: PaymentStatus::Completed,
            amount: 8000,
            fees: 20,
            timestamp: 1_234_567_891,
            method: PaymentMethod::Token,
            details: Some(PaymentDetails::Token {
                metadata: TokenMetadata {
                    identifier: "another-token-id".to_string(),
                    issuer_public_key: format!("02{}", "b".repeat(64)),
                    name: "Another Token".to_string(),
                    ticker: "ATK".to_string(),
                    decimals: 6,
                    max_supply: 2_000_000,
                    is_freezable: true,
                },
                tx_hash: "0x1111222233334444".to_string(),
                tx_type: TokenTransactionType::Mint,
                invoice_details: None,
                conversion_info: None,
            }),
            conversion_details: None,
        };

        storage.insert_payment(new_payment).await.unwrap();

        // Step 6: an unfiltered listing returns both payments with their types.
        let request = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: None,
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let payments = storage.list_payments(request).await.unwrap();
        assert_eq!(payments.len(), 2, "Should have both payments");

        let migrated = payments
            .iter()
            .find(|p| p.id == "token-migration-test")
            .unwrap();
        match &migrated.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Transfer);
            }
            _ => panic!("Expected Token payment details"),
        }

        let new = payments
            .iter()
            .find(|p| p.id == "new-token-payment")
            .unwrap();
        match &new.details {
            Some(PaymentDetails::Token { tx_type, .. }) => {
                assert_eq!(*tx_type, TokenTransactionType::Mint);
            }
            _ => panic!("Expected Token payment details"),
        }

        // Step 7: filtering by tx_type = Transfer returns only the old payment.
        let transfer_filter = StorageListPaymentsRequest {
            type_filter: None,
            status_filter: None,
            asset_filter: None,
            payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Token {
                conversion_refund_needed: None,
                tx_hash: None,
                tx_type: Some(TokenTransactionType::Transfer),
            }]),
            from_timestamp: None,
            to_timestamp: None,
            offset: None,
            limit: None,
            sort_ascending: Some(true),
        };

        let transfer_payments = storage.list_payments(transfer_filter).await.unwrap();
        assert_eq!(
            transfer_payments.len(),
            1,
            "Should find only the Transfer payment"
        );
        assert_eq!(transfer_payments[0].id, "token-migration-test");
    }
1964
    /// Checks the HTLC fields of Lightning payments that were stored before the
    /// remaining migrations ran.
    ///
    /// The database is created with only the first 23 migrations, three
    /// Lightning payments (completed, pending, failed) are inserted with raw
    /// SQL, and the store is then opened through `SqliteStorage::new`, which
    /// applies the rest. The test expects the HTLC status to follow the payment
    /// status (completed -> `PreimageShared`, pending -> `WaitingForPreimage`,
    /// failed -> `Returned`), the expiry time to be 0, and filtering by HTLC
    /// status to return exactly the matching payment.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_migration_htlc_details() {
        use crate::{
            PaymentDetails, SparkHtlcStatus, Storage,
            persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
        };
        use rusqlite::{Connection, params};
        use rusqlite_migration::{M, Migrations};

        let temp_dir = create_temp_dir("sqlite_migration_htlc_details");
        let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);

        // Step 1: build the schema using only the first 23 migrations.
        {
            let mut conn = Connection::open(&db_path).unwrap();
            let migrations_before_backfill: Vec<_> = SqliteStorage::current_migrations()
                .iter()
                .take(23)
                .map(|s| M::up(s))
                .collect();
            let migrations = Migrations::new(migrations_before_backfill);
            migrations.to_latest(&mut conn).unwrap();
        }

        // Step 2: insert Lightning payments in three different states with raw SQL.
        {
            let conn = Connection::open(&db_path).unwrap();

            // Completed send, with a preimage.
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-completed",
                    "send",
                    "completed",
                    "1000",
                    "10",
                    1_700_000_001_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, preimage)
                 VALUES (?, ?, ?, ?, ?)",
                params![
                    "ln-completed",
                    "lnbc_completed",
                    "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567",
                    "03pubkey1",
                    "preimage_completed"
                ],
            )
            .unwrap();

            // Pending receive, no preimage column supplied.
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-pending",
                    "receive",
                    "pending",
                    "2000",
                    "0",
                    1_700_000_002_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
                 VALUES (?, ?, ?, ?)",
                params![
                    "ln-pending",
                    "lnbc_pending",
                    "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678",
                    "03pubkey2"
                ],
            )
            .unwrap();

            // Failed send, no preimage column supplied.
            conn.execute(
                "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    "ln-failed",
                    "send",
                    "failed",
                    "3000",
                    "5",
                    1_700_000_003_i64,
                    "\"lightning\""
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
                 VALUES (?, ?, ?, ?)",
                params![
                    "ln-failed",
                    "lnbc_failed",
                    "hash_failed_0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
                    "03pubkey3"
                ],
            )
            .unwrap();
        }

        // Step 3: opening the store runs the remaining migrations.
        let storage = SqliteStorage::new(&temp_dir).unwrap();

        // Step 4: completed payment is expected to load as PreimageShared.
        let completed = storage
            .get_payment_by_id("ln-completed".to_string())
            .await
            .unwrap();
        match &completed.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::PreimageShared);
                assert_eq!(htlc_details.expiry_time, 0);
                assert_eq!(
                    htlc_details.payment_hash,
                    "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567"
                );
                assert_eq!(htlc_details.preimage.as_deref(), Some("preimage_completed"));
            }
            _ => panic!("Expected Lightning payment details for ln-completed"),
        }

        // Step 5: pending payment is expected to load as WaitingForPreimage.
        let pending = storage
            .get_payment_by_id("ln-pending".to_string())
            .await
            .unwrap();
        match &pending.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::WaitingForPreimage);
                assert_eq!(htlc_details.expiry_time, 0);
                assert_eq!(
                    htlc_details.payment_hash,
                    "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678"
                );
                assert!(htlc_details.preimage.is_none());
            }
            _ => panic!("Expected Lightning payment details for ln-pending"),
        }

        // Step 6: failed payment is expected to load as Returned.
        let failed = storage
            .get_payment_by_id("ln-failed".to_string())
            .await
            .unwrap();
        match &failed.details {
            Some(PaymentDetails::Lightning { htlc_details, .. }) => {
                assert_eq!(htlc_details.status, SparkHtlcStatus::Returned);
                assert_eq!(htlc_details.expiry_time, 0);
            }
            _ => panic!("Expected Lightning payment details for ln-failed"),
        }

        // Step 7: filtering by each HTLC status returns exactly one payment.
        let waiting_payments = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::WaitingForPreimage]),
                    has_lnurl_preimage: None,
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(waiting_payments.len(), 1);
        assert_eq!(waiting_payments[0].id, "ln-pending");

        let preimage_shared = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::PreimageShared]),
                    has_lnurl_preimage: None,
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(preimage_shared.len(), 1);
        assert_eq!(preimage_shared[0].id, "ln-completed");

        let returned = storage
            .list_payments(StorageListPaymentsRequest {
                payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
                    htlc_status: Some(vec![SparkHtlcStatus::Returned]),
                    has_lnurl_preimage: None,
                }]),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(returned.len(), 1);
        assert_eq!(returned[0].id, "ln-failed");
    }
2171}