1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5 Connection, Row, ToSql, TransactionBehavior, params,
6 types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11 AssetFilter, Contact, ConversionInfo, DepositInfo, ListContactsRequest, LnurlPayInfo,
12 LnurlReceiveMetadata, LnurlWithdrawInfo, PaymentDetails, PaymentMethod, SparkHtlcDetails,
13 SparkHtlcStatus, TokenTransactionType,
14 error::DepositClaimError,
15 persist::{
16 PaymentMetadata, SetLnurlMetadataItem, StorageListPaymentsRequest,
17 StoragePaymentDetailsFilter, UpdateDepositPayload,
18 },
19 sync_storage::{
20 IncomingChange, OutgoingChange, Record, RecordChange, RecordId, UnversionedRecordChange,
21 },
22};
23
24use std::collections::HashMap;
25
26use tracing::warn;
27
28use super::{Payment, Storage, StorageError};
29
30const DEFAULT_DB_FILENAME: &str = "storage.sql";
/// SQLite-backed storage.
///
/// Only the directory is stored; the database file inside it is named by
/// `DEFAULT_DB_FILENAME`, and `get_connection` opens a new connection to that
/// file on every call.
pub struct SqliteStorage {
    /// Directory that contains the SQLite database file.
    db_dir: PathBuf,
}
35
36impl SqliteStorage {
37 pub fn new(path: &Path) -> Result<Self, StorageError> {
47 let storage = Self {
48 db_dir: path.to_path_buf(),
49 };
50
51 #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
52 std::fs::create_dir_all(path)
53 .map_err(|e| StorageError::InitializationError(e.to_string()))?;
54
55 storage.migrate()?;
56 Ok(storage)
57 }
58
59 pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
60 Ok(Connection::open(self.get_db_path())?)
61 }
62
63 fn get_db_path(&self) -> PathBuf {
64 self.db_dir.join(DEFAULT_DB_FILENAME)
65 }
66
67 fn migrate(&self) -> Result<(), StorageError> {
68 let migrations =
69 Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
70 let mut conn = self.get_connection()?;
71 let previous_version = match migrations.current_version(&conn)? {
72 SchemaVersion::Inside(previous_version) => previous_version.get(),
73 _ => 0,
74 };
75 migrations.to_latest(&mut conn)?;
76
77 if previous_version < 6 {
78 Self::migrate_lnurl_metadata_description(&mut conn)?;
79 }
80
81 Ok(())
82 }
83
84 fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
85 let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
86 let pay_infos: Vec<_> = stmt
87 .query_map([], |row| {
88 let payment_id: String = row.get(0)?;
89 let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
90 Ok((payment_id, lnurl_pay_info))
91 })?
92 .collect::<Result<_, _>>()?;
93 let pay_infos = pay_infos
94 .into_iter()
95 .filter_map(|(payment_id, lnurl_pay_info)| {
96 let pay_info = lnurl_pay_info?;
97 let description = pay_info.extract_description()?;
98 Some((payment_id, description))
99 })
100 .collect::<Vec<_>>();
101
102 for pay_info in pay_infos {
103 conn.execute(
104 "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
105 params![pay_info.1, pay_info.0],
106 )?;
107 }
108
109 Ok(())
110 }
111
    /// Ordered list of schema migrations; `migrate` wraps each entry with
    /// `M::up` and applies them in sequence.
    ///
    /// The position of an entry is its schema version (1-based), so entries
    /// must only ever be appended: editing or reordering existing entries
    /// would desynchronise databases that have already applied them.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn current_migrations() -> Vec<&'static str> {
        vec![
            // 1: base payments table (amount/fees as INTEGER, details as JSON text).
            "CREATE TABLE IF NOT EXISTS payments (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount INTEGER NOT NULL,
              fees INTEGER NOT NULL,
              timestamp INTEGER NOT NULL,
              details TEXT,
              method TEXT
            );",
            // 2: key/value settings (also used by the cached-item accessors).
            "CREATE TABLE IF NOT EXISTS settings (
              key TEXT PRIMARY KEY,
              value TEXT NOT NULL
            );",
            // 3: unclaimed deposits keyed by (txid, vout).
            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
              txid TEXT NOT NULL,
              vout INTEGER NOT NULL,
              amount_sats INTEGER,
              claim_error TEXT,
              refund_tx TEXT,
              refund_tx_id TEXT,
              PRIMARY KEY (txid, vout)
            );",
            // 4: per-payment metadata.
            "CREATE TABLE IF NOT EXISTS payment_metadata (
              payment_id TEXT PRIMARY KEY,
              lnurl_pay_info TEXT
            );",
            // 5: deposit refunds keyed by (deposit_tx_id, deposit_vout).
            "CREATE TABLE IF NOT EXISTS deposit_refunds (
              deposit_tx_id TEXT NOT NULL,
              deposit_vout INTEGER NOT NULL,
              refund_tx TEXT NOT NULL,
              refund_tx_id TEXT NOT NULL,
              PRIMARY KEY (deposit_tx_id, deposit_vout)
            );",
            // 6: adds lnurl_description; `migrate` backfills it for databases
            // that started below version 6.
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
            // 7: moves the JSON `details` column into dedicated columns and a
            // payment_details_lightning table, then drops `details`.
            "
            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN spark INTEGER;
            CREATE TABLE payment_details_lightning (
              payment_id TEXT PRIMARY KEY,
              invoice TEXT NOT NULL,
              payment_hash TEXT NOT NULL,
              destination_pubkey TEXT NOT NULL,
              description TEXT,
              preimage TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'),
                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'),
                json_extract(details, '$.Lightning.preimage')
            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

            ALTER TABLE payments DROP COLUMN details;

            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
            ",
            // 8: token payment details.
            "CREATE TABLE payment_details_token (
              payment_id TEXT PRIMARY KEY,
              metadata TEXT NOT NULL,
              tx_hash TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );",
            // 9-12: rebuild `payments` with amount/fees stored as TEXT instead
            // of INTEGER (create new table, copy with CAST, drop old, rename).
            "CREATE TABLE payments_new (
              id TEXT PRIMARY KEY,
              payment_type TEXT NOT NULL,
              status TEXT NOT NULL,
              amount TEXT NOT NULL,
              fees TEXT NOT NULL,
              timestamp INTEGER NOT NULL,
              method TEXT,
              withdraw_tx_id TEXT,
              deposit_tx_id TEXT,
              spark INTEGER
            );",
            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
             SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
             FROM payments;",
            "DROP TABLE payments;",
            "ALTER TABLE payments_new RENAME TO payments;",
            // 13: spark payment details table; invoice details for tokens.
            "CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT NOT NULL,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
            // 14: LNURL-withdraw info on payment metadata.
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
            // 15: sync bookkeeping: a revision counter row, the outgoing
            // queue, and the last known state per record.
            "CREATE TABLE sync_revision (
                revision INTEGER NOT NULL DEFAULT 0
            );
            INSERT INTO sync_revision (revision) VALUES (0);
            CREATE TABLE sync_outgoing(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                updated_fields_json TEXT NOT NULL,
                revision INTEGER NOT NULL
            );
            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
            CREATE TABLE sync_state(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id)
            );",
            // 16: incoming sync queue, indexed by revision.
            "CREATE TABLE sync_incoming(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id, revision)
            );
            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
            // 17: rebuild payment_details_spark so invoice_details becomes
            // nullable and htlc_details is added.
            "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
            CREATE TABLE payment_details_spark (
              payment_id TEXT NOT NULL PRIMARY KEY,
              invoice_details TEXT,
              htlc_details TEXT,
              FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_spark (payment_id, invoice_details)
             SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
            DROP TABLE tmp_payment_details_spark;",
            // 18: metadata for LNURL receives, keyed by payment hash.
            "CREATE TABLE lnurl_receive_metadata (
                payment_hash TEXT NOT NULL PRIMARY KEY,
                nostr_zap_request TEXT,
                nostr_zap_receipt TEXT,
                sender_comment TEXT
            );",
            // 19: clears all stored unclaimed deposits.
            "DELETE FROM unclaimed_deposits;",
            // 20: resets all sync state (queues, state, revision counter and
            // the 'sync_initial_complete' setting).
            "DELETE FROM sync_outgoing;
             DELETE FROM sync_incoming;
             DELETE FROM sync_state;
             UPDATE sync_revision SET revision = 0;
             DELETE FROM settings WHERE key = 'sync_initial_complete';",
            // 21: token conversion info (replaced by conversion_info in 23).
            "ALTER TABLE payment_metadata ADD COLUMN token_conversion_info TEXT;",
            // 22: link from a payment to its parent payment.
            "ALTER TABLE payment_metadata ADD COLUMN parent_payment_id TEXT;",
            // 23: replaces token_conversion_info with conversion_info (the old
            // column's contents are dropped, not copied).
            "
            ALTER TABLE payment_metadata DROP COLUMN token_conversion_info;
            ALTER TABLE payment_metadata ADD COLUMN conversion_info TEXT;
            ",
            // 24: token transaction type (existing rows default to
            // 'transfer'), and nulls last_synced_final_token_payment_id in the
            // 'sync_offset' setting when present.
            "ALTER TABLE payment_details_token ADD COLUMN tx_type TEXT NOT NULL DEFAULT 'transfer';
            UPDATE settings
            SET value = json_set(value, '$.last_synced_final_token_payment_id', NULL)
            WHERE key = 'sync_offset' AND json_valid(value) AND json_type(value, '$.last_synced_final_token_payment_id') IS NOT NULL;",
            // 25: resets all sync state again (same statements as 20).
            "DELETE FROM sync_outgoing;
             DELETE FROM sync_incoming;
             DELETE FROM sync_state;
             UPDATE sync_revision SET revision = 0;
             DELETE FROM settings WHERE key = 'sync_initial_complete';",
            // 26: HTLC status and expiry on lightning details.
            "ALTER TABLE payment_details_lightning ADD COLUMN htlc_status TEXT NOT NULL DEFAULT 'WaitingForPreimage';
             ALTER TABLE payment_details_lightning ADD COLUMN htlc_expiry_time INTEGER NOT NULL DEFAULT 0;",
            // 27: derives htlc_status from the owning payment's status and
            // resets the '$.offset' field of the 'sync_offset' setting to 0.
            "UPDATE payment_details_lightning
             SET htlc_status = CASE
                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'completed' THEN 'PreimageShared'
                     WHEN (SELECT status FROM payments WHERE id = payment_id) = 'pending' THEN 'WaitingForPreimage'
                     ELSE 'Returned'
                 END;
             UPDATE settings
             SET value = json_set(value, '$.offset', 0)
             WHERE key = 'sync_offset' AND json_valid(value);",
            // 28: preimage on LNURL receive metadata.
            "ALTER TABLE lnurl_receive_metadata ADD COLUMN preimage TEXT;",
            // 29: removes the 'lnurl_metadata_updated_after' setting.
            "DELETE FROM settings WHERE key = 'lnurl_metadata_updated_after';",
            // 30: removes the 'lightning_address' setting.
            "DELETE FROM settings WHERE key = 'lightning_address';",
            // 31: index for lookups by lightning payment hash.
            "CREATE INDEX IF NOT EXISTS idx_payment_details_lightning_payment_hash ON payment_details_lightning(payment_hash);",
            // 32: contacts table.
            "CREATE TABLE contacts (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                payment_identifier TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            );",
        ]
    }
325}
326
327#[allow(clippy::needless_pass_by_value)]
331fn map_sqlite_error(e: rusqlite::Error) -> StorageError {
332 match e {
333 rusqlite::Error::SqliteFailure(err, _)
334 if err.code == rusqlite::ErrorCode::DatabaseBusy
335 || err.code == rusqlite::ErrorCode::DatabaseLocked =>
336 {
337 StorageError::Connection(e.to_string())
338 }
339 _ => StorageError::Implementation(e.to_string()),
340 }
341}
342
343impl From<rusqlite::Error> for StorageError {
344 fn from(value: rusqlite::Error) -> Self {
345 map_sqlite_error(value)
346 }
347}
348
349impl From<rusqlite_migration::Error> for StorageError {
350 fn from(value: rusqlite_migration::Error) -> Self {
351 StorageError::Implementation(value.to_string())
352 }
353}
354
355#[async_trait]
356impl Storage for SqliteStorage {
    /// Lists payments matching `request`, ordered by timestamp.
    ///
    /// The WHERE clause is assembled from the optional filters in `request`.
    /// Every `?` placeholder added to `where_clauses` has its value pushed to
    /// `params` at the same moment, so the two stay in the same order; keep
    /// that pairing intact when adding filters.
    ///
    /// The table aliases used below (`p`, `t`, `s`, `l`, `pm`, `lrm`) come from
    /// `SELECT_PAYMENT_SQL`, which is defined elsewhere in this file.
    ///
    /// Rows whose metadata has a non-NULL `parent_payment_id` are always
    /// excluded.
    ///
    /// # Errors
    ///
    /// Any `rusqlite` failure, converted into a `StorageError`.
    #[allow(clippy::too_many_lines)]
    async fn list_payments(
        &self,
        request: StorageListPaymentsRequest,
    ) -> Result<Vec<Payment>, StorageError> {
        let connection = self.get_connection()?;

        // SQL fragments (joined with AND) and their positional parameters.
        let mut where_clauses = Vec::new();
        let mut params: Vec<Box<dyn ToSql>> = Vec::new();

        // Payment type filter; an empty list means "no filter".
        if let Some(ref type_filter) = request.type_filter
            && !type_filter.is_empty()
        {
            let placeholders = type_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.payment_type IN ({placeholders})"));
            for payment_type in type_filter {
                params.push(Box::new(payment_type.to_string()));
            }
        }

        // Status filter; an empty list means "no filter".
        if let Some(ref status_filter) = request.status_filter
            && !status_filter.is_empty()
        {
            let placeholders = status_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.status IN ({placeholders})"));
            for status in status_filter {
                params.push(Box::new(status.to_string()));
            }
        }

        // Time range: lower bound inclusive, upper bound exclusive.
        if let Some(from_timestamp) = request.from_timestamp {
            where_clauses.push("p.timestamp >= ?".to_string());
            params.push(Box::new(from_timestamp));
        }

        if let Some(to_timestamp) = request.to_timestamp {
            where_clauses.push("p.timestamp < ?".to_string());
            params.push(Box::new(to_timestamp));
        }

        // Asset filter: Bitcoin means no token metadata on the row; Token
        // means token metadata is present, optionally with a given identifier.
        if let Some(ref asset_filter) = request.asset_filter {
            match asset_filter {
                AssetFilter::Bitcoin => {
                    where_clauses.push("t.metadata IS NULL".to_string());
                }
                AssetFilter::Token { token_identifier } => {
                    where_clauses.push("t.metadata IS NOT NULL".to_string());
                    if let Some(identifier) = token_identifier {
                        where_clauses
                            .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
                        params.push(Box::new(identifier.clone()));
                    }
                }
            }
        }

        // Payment-details filters: conditions within one filter are ANDed,
        // and the filters themselves are ORed together.
        if let Some(ref payment_details_filter) = request.payment_details_filter {
            let mut all_payment_details_clauses = Vec::new();
            for payment_details_filter in payment_details_filter {
                let mut payment_details_clauses = Vec::new();
                // HTLC status: Lightning compares the `l.htlc_status` column,
                // Spark compares the status inside the `s.htlc_details` JSON.
                let htlc_filter = match payment_details_filter {
                    StoragePaymentDetailsFilter::Spark {
                        htlc_status: Some(s),
                        ..
                    } if !s.is_empty() => Some(("s", s)),
                    StoragePaymentDetailsFilter::Lightning {
                        htlc_status: Some(s),
                        ..
                    } if !s.is_empty() => Some(("l", s)),
                    _ => None,
                };
                if let Some((alias, htlc_statuses)) = htlc_filter {
                    let placeholders = htlc_statuses
                        .iter()
                        .map(|_| "?")
                        .collect::<Vec<_>>()
                        .join(", ");
                    if alias == "l" {
                        payment_details_clauses.push(format!("l.htlc_status IN ({placeholders})"));
                    } else {
                        payment_details_clauses.push(format!(
                            "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
                        ));
                    }
                    for htlc_status in htlc_statuses {
                        params.push(Box::new(htlc_status.to_string()));
                    }
                }
                // Conversion refund filter: Spark rows are selected with
                // `p.spark = 1`, Token rows with `p.spark IS NULL`.
                let conversion_filter = match payment_details_filter {
                    StoragePaymentDetailsFilter::Spark {
                        conversion_refund_needed: Some(v),
                        ..
                    } => Some((v, "p.spark = 1")),
                    StoragePaymentDetailsFilter::Token {
                        conversion_refund_needed: Some(v),
                        ..
                    } => Some((v, "p.spark IS NULL")),
                    _ => None,
                };
                if let Some((conversion_refund_needed, type_check)) = conversion_filter {
                    let refund_needed = if *conversion_refund_needed {
                        "= 'RefundNeeded'"
                    } else {
                        "!= 'RefundNeeded'"
                    };
                    // Only fixed literals are interpolated here, no user input.
                    payment_details_clauses.push(format!(
                        "{type_check} AND pm.conversion_info IS NOT NULL AND
                         json_extract(pm.conversion_info, '$.status') {refund_needed}"
                    ));
                }
                // Token transaction hash filter.
                if let StoragePaymentDetailsFilter::Token {
                    tx_hash: Some(tx_hash),
                    ..
                } = payment_details_filter
                {
                    payment_details_clauses.push("t.tx_hash = ?".to_string());
                    params.push(Box::new(tx_hash.clone()));
                }

                // Token transaction type filter.
                if let StoragePaymentDetailsFilter::Token {
                    tx_type: Some(tx_type),
                    ..
                } = payment_details_filter
                {
                    payment_details_clauses.push("t.tx_type = ?".to_string());
                    params.push(Box::new(tx_type.to_string()));
                }

                // LNURL preimage filter. `true`: the LNURL metadata row has a
                // preimage. `false`: an LNURL metadata row exists and the
                // lightning details have a preimage, but the metadata row
                // does not yet.
                if let StoragePaymentDetailsFilter::Lightning {
                    has_lnurl_preimage: Some(has_preimage),
                    ..
                } = payment_details_filter
                {
                    if *has_preimage {
                        payment_details_clauses.push("lrm.preimage IS NOT NULL".to_string());
                    } else {
                        payment_details_clauses.push(
                            "lrm.payment_hash IS NOT NULL AND l.preimage IS NOT NULL AND lrm.preimage IS NULL".to_string(),
                        );
                    }
                }

                if !payment_details_clauses.is_empty() {
                    all_payment_details_clauses
                        .push(format!("({})", payment_details_clauses.join(" AND ")));
                }
            }

            if !all_payment_details_clauses.is_empty() {
                where_clauses.push(format!("({})", all_payment_details_clauses.join(" OR ")));
            }
        }

        // Always exclude rows that have a parent payment id in their metadata.
        where_clauses.push("pm.parent_payment_id IS NULL".to_string());

        // NOTE(review): the clause above is pushed unconditionally, so
        // `where_clauses` is never empty here and the first branch below is
        // unreachable.
        let where_sql = if where_clauses.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", where_clauses.join(" AND "))
        };

        // Descending unless the request explicitly asks for ascending.
        let order_direction = if request.sort_ascending.unwrap_or(false) {
            "ASC"
        } else {
            "DESC"
        };

        // LIMIT/OFFSET are interpolated as numbers rather than bound; a
        // missing limit becomes `u32::MAX`, a missing offset becomes 0.
        let query = format!(
            "{SELECT_PAYMENT_SQL} {where_sql} ORDER BY p.timestamp {order_direction} LIMIT {} OFFSET {}",
            request.limit.unwrap_or(u32::MAX),
            request.offset.unwrap_or(0)
        );

        let mut stmt = connection.prepare(&query)?;
        let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
        let payments = stmt
            .query_map(param_refs.as_slice(), map_payment)?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(payments)
    }
564
    /// Inserts `payment`, or updates it if a row with the same id exists,
    /// together with its variant-specific details row.
    ///
    /// Everything runs in one immediate transaction: first the `payments` row
    /// is upserted, then (depending on `payment.details`) a row in
    /// `payment_details_spark`, `payment_details_token` or
    /// `payment_details_lightning`. Withdraw and Deposit payments only store
    /// their transaction id on the `payments` row itself.
    ///
    /// Several detail columns are updated through `COALESCE(excluded.x, x)`,
    /// so a NULL in the new payment keeps the previously stored value instead
    /// of erasing it.
    ///
    /// # Errors
    ///
    /// Any `rusqlite` or JSON serialization failure, converted into a
    /// `StorageError`. The transaction is not committed on error.
    #[allow(clippy::too_many_lines)]
    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction_with_behavior(TransactionBehavior::Immediate)?;

        // Columns on `payments` that depend on the details variant.
        let (withdraw_tx_id, deposit_tx_id, spark): (Option<&str>, Option<&str>, Option<bool>) =
            match &payment.details {
                Some(PaymentDetails::Withdraw { tx_id }) => (Some(tx_id.as_str()), None, None),
                Some(PaymentDetails::Deposit { tx_id }) => (None, Some(tx_id.as_str()), None),
                Some(PaymentDetails::Spark { .. }) => (None, None, Some(true)),
                _ => (None, None, None),
            };

        // Upsert the main row; on conflict every column is overwritten.
        tx.execute(
            "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
             ON CONFLICT(id) DO UPDATE SET
                payment_type=excluded.payment_type,
                status=excluded.status,
                amount=excluded.amount,
                fees=excluded.fees,
                timestamp=excluded.timestamp,
                method=excluded.method,
                withdraw_tx_id=excluded.withdraw_tx_id,
                deposit_tx_id=excluded.deposit_tx_id,
                spark=excluded.spark",
            params![
                payment.id,
                payment.payment_type.to_string(),
                payment.status.to_string(),
                U128SqlWrapper(payment.amount),
                U128SqlWrapper(payment.fees),
                payment.timestamp,
                payment.method,
                withdraw_tx_id,
                deposit_tx_id,
                spark,
            ],
        )?;

        match payment.details {
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
                ..
            }) => {
                // A details row is written only when there is something to
                // store; both values are serialized as JSON text.
                if invoice_details.is_some() || htlc_details.is_some() {
                    tx.execute(
                        "INSERT INTO payment_details_spark (payment_id, invoice_details, htlc_details)
                         VALUES (?, ?, ?)
                         ON CONFLICT(payment_id) DO UPDATE SET
                            invoice_details=COALESCE(excluded.invoice_details, payment_details_spark.invoice_details),
                            htlc_details=COALESCE(excluded.htlc_details, payment_details_spark.htlc_details)",
                        params![
                            payment.id,
                            invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                            htlc_details.as_ref().map(serde_json::to_string).transpose()?,
                        ],
                    )?;
                }
            }
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                tx_type,
                invoice_details,
                ..
            }) => {
                // Token metadata and invoice details are stored as JSON text.
                tx.execute(
                    "INSERT INTO payment_details_token (payment_id, metadata, tx_hash, tx_type, invoice_details)
                     VALUES (?, ?, ?, ?, ?)
                     ON CONFLICT(payment_id) DO UPDATE SET
                        metadata=excluded.metadata,
                        tx_hash=excluded.tx_hash,
                        tx_type=excluded.tx_type,
                        invoice_details=COALESCE(excluded.invoice_details, payment_details_token.invoice_details)",
                    params![
                        payment.id,
                        serde_json::to_string(&metadata)?,
                        tx_hash,
                        tx_type.to_string(),
                        invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                    ],
                )?;
            }
            Some(PaymentDetails::Lightning {
                invoice,
                destination_pubkey,
                description,
                htlc_details,
                ..
            }) => {
                // payment_hash, preimage, status and expiry all come from
                // `htlc_details`.
                tx.execute(
                    "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage, htlc_status, htlc_expiry_time)
                     VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                     ON CONFLICT(payment_id) DO UPDATE SET
                        invoice=excluded.invoice,
                        payment_hash=excluded.payment_hash,
                        destination_pubkey=excluded.destination_pubkey,
                        description=excluded.description,
                        preimage=COALESCE(excluded.preimage, payment_details_lightning.preimage),
                        htlc_status=COALESCE(excluded.htlc_status, payment_details_lightning.htlc_status),
                        htlc_expiry_time=COALESCE(excluded.htlc_expiry_time, payment_details_lightning.htlc_expiry_time)",
                    params![
                        payment.id,
                        invoice,
                        htlc_details.payment_hash,
                        destination_pubkey,
                        description,
                        htlc_details.preimage,
                        htlc_details.status.to_string(),
                        htlc_details.expiry_time,
                    ],
                )?;
            }
            // Nothing beyond the `payments` row for these.
            Some(PaymentDetails::Withdraw { .. } | PaymentDetails::Deposit { .. }) | None => {}
        }

        tx.commit()?;
        Ok(())
    }
689
690 async fn insert_payment_metadata(
691 &self,
692 payment_id: String,
693 metadata: PaymentMetadata,
694 ) -> Result<(), StorageError> {
695 let connection = self.get_connection()?;
696
697 connection.execute(
698 "INSERT INTO payment_metadata (payment_id, parent_payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description, conversion_info)
699 VALUES (?, ?, ?, ?, ?, ?)
700 ON CONFLICT(payment_id) DO UPDATE SET
701 parent_payment_id = COALESCE(excluded.parent_payment_id, parent_payment_id),
702 lnurl_pay_info = COALESCE(excluded.lnurl_pay_info, lnurl_pay_info),
703 lnurl_withdraw_info = COALESCE(excluded.lnurl_withdraw_info, lnurl_withdraw_info),
704 lnurl_description = COALESCE(excluded.lnurl_description, lnurl_description),
705 conversion_info = COALESCE(excluded.conversion_info, conversion_info)",
706 params![
707 payment_id,
708 metadata.parent_payment_id,
709 metadata.lnurl_pay_info,
710 metadata.lnurl_withdraw_info,
711 metadata.lnurl_description,
712 metadata.conversion_info.as_ref().map(serde_json::to_string).transpose()?,
713 ],
714 )?;
715
716 Ok(())
717 }
718
719 async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
720 let connection = self.get_connection()?;
721
722 connection.execute(
723 "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
724 params![key, value],
725 )?;
726
727 Ok(())
728 }
729
730 async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
731 let connection = self.get_connection()?;
732
733 let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
734
735 let result = stmt.query_row(params![key], |row| {
736 let value_str: String = row.get(0)?;
737 Ok(value_str)
738 });
739
740 match result {
741 Ok(value) => Ok(Some(value)),
742 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
743 Err(e) => Err(e.into()),
744 }
745 }
746
747 async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
748 let connection = self.get_connection()?;
749
750 connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
751
752 Ok(())
753 }
754
755 async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
756 let connection = self.get_connection()?;
757 let query = format!("{SELECT_PAYMENT_SQL} WHERE p.id = ?");
758 let mut stmt = connection.prepare(&query)?;
759 let payment = stmt.query_row(params![id], map_payment)?;
760 Ok(payment)
761 }
762
763 async fn get_payment_by_invoice(
764 &self,
765 invoice: String,
766 ) -> Result<Option<Payment>, StorageError> {
767 let connection = self.get_connection()?;
768 let query = format!("{SELECT_PAYMENT_SQL} WHERE l.invoice = ?");
769 let mut stmt = connection.prepare(&query)?;
770 let payment = stmt.query_row(params![invoice], map_payment);
771 match payment {
772 Ok(payment) => Ok(Some(payment)),
773 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
774 Err(e) => Err(e.into()),
775 }
776 }
777
778 async fn get_payments_by_parent_ids(
779 &self,
780 parent_payment_ids: Vec<String>,
781 ) -> Result<HashMap<String, Vec<Payment>>, StorageError> {
782 if parent_payment_ids.is_empty() {
783 return Ok(HashMap::new());
784 }
785
786 let connection = self.get_connection()?;
787
788 let has_related: bool = connection.query_row(
790 "SELECT EXISTS(SELECT 1 FROM payment_metadata WHERE parent_payment_id IS NOT NULL LIMIT 1)",
791 [],
792 |row| row.get(0),
793 )?;
794 if !has_related {
795 return Ok(HashMap::new());
796 }
797
798 let placeholders: Vec<&str> = parent_payment_ids.iter().map(|_| "?").collect();
800 let in_clause = placeholders.join(", ");
801
802 let query = format!(
803 "{SELECT_PAYMENT_SQL} WHERE pm.parent_payment_id IN ({in_clause}) ORDER BY p.timestamp ASC"
804 );
805
806 let mut stmt = connection.prepare(&query)?;
807 let params: Vec<&dyn ToSql> = parent_payment_ids
808 .iter()
809 .map(|id| id as &dyn ToSql)
810 .collect();
811 let rows = stmt.query_map(params.as_slice(), |row| {
812 let payment = map_payment(row)?;
813 let parent_payment_id: String = row.get(30)?;
814 Ok((parent_payment_id, payment))
815 })?;
816
817 let mut result: HashMap<String, Vec<Payment>> = HashMap::new();
818 for row in rows {
819 let (parent_id, related_payment) = row?;
820 result.entry(parent_id).or_default().push(related_payment);
821 }
822
823 Ok(result)
824 }
825
826 async fn add_deposit(
827 &self,
828 txid: String,
829 vout: u32,
830 amount_sats: u64,
831 ) -> Result<(), StorageError> {
832 let connection = self.get_connection()?;
833 connection.execute(
834 "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats)
835 VALUES (?, ?, ?)",
836 params![txid, vout, amount_sats,],
837 )?;
838 Ok(())
839 }
840
841 async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
842 let connection = self.get_connection()?;
843 connection.execute(
844 "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
845 params![txid, vout],
846 )?;
847 Ok(())
848 }
849
850 async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
851 let connection = self.get_connection()?;
852 let mut stmt =
853 connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
854 let rows = stmt.query_map(params![], |row| {
855 Ok(DepositInfo {
856 txid: row.get(0)?,
857 vout: row.get(1)?,
858 amount_sats: row.get(2)?,
859 claim_error: row.get(3)?,
860 refund_tx: row.get(4)?,
861 refund_tx_id: row.get(5)?,
862 })
863 })?;
864 let mut deposits = Vec::new();
865 for row in rows {
866 deposits.push(row?);
867 }
868 Ok(deposits)
869 }
870
871 async fn update_deposit(
872 &self,
873 txid: String,
874 vout: u32,
875 payload: UpdateDepositPayload,
876 ) -> Result<(), StorageError> {
877 let connection = self.get_connection()?;
878 match payload {
879 UpdateDepositPayload::ClaimError { error } => {
880 connection.execute(
881 "UPDATE unclaimed_deposits SET claim_error = ?, refund_tx = NULL, refund_tx_id = NULL WHERE txid = ? AND vout = ?",
882 params![error, txid, vout],
883 )?;
884 }
885 UpdateDepositPayload::Refund {
886 refund_txid,
887 refund_tx,
888 } => {
889 connection.execute(
890 "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ?, claim_error = NULL WHERE txid = ? AND vout = ?",
891 params![refund_tx, refund_txid, txid, vout],
892 )?;
893 }
894 }
895 Ok(())
896 }
897
898 async fn set_lnurl_metadata(
899 &self,
900 metadata: Vec<SetLnurlMetadataItem>,
901 ) -> Result<(), StorageError> {
902 let connection = self.get_connection()?;
903 for metadata in metadata {
904 connection.execute(
905 "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment, preimage)
906 VALUES (?, ?, ?, ?, ?)",
907 params![
908 metadata.payment_hash,
909 metadata.nostr_zap_request,
910 metadata.nostr_zap_receipt,
911 metadata.sender_comment,
912 metadata.preimage,
913 ],
914 )?;
915 }
916 Ok(())
917 }
918
919 async fn list_contacts(
920 &self,
921 request: ListContactsRequest,
922 ) -> Result<Vec<Contact>, StorageError> {
923 let limit = request.limit.unwrap_or(u32::MAX);
924 let offset = request.offset.unwrap_or(0);
925 let connection = self.get_connection()?;
926 let query = "SELECT id, name, payment_identifier, created_at, updated_at FROM contacts ORDER BY name ASC LIMIT ? OFFSET ?";
927
928 let mut stmt = connection.prepare(query)?;
929 let contacts = stmt
930 .query_map(params![limit, offset], |row| {
931 Ok(Contact {
932 id: row.get(0)?,
933 name: row.get(1)?,
934 payment_identifier: row.get(2)?,
935 created_at: row.get(3)?,
936 updated_at: row.get(4)?,
937 })
938 })?
939 .collect::<Result<Vec<_>, _>>()?;
940 Ok(contacts)
941 }
942
943 async fn get_contact(&self, id: String) -> Result<Contact, StorageError> {
944 let connection = self.get_connection()?;
945 let mut stmt = connection.prepare(
946 "SELECT id, name, payment_identifier, created_at, updated_at FROM contacts WHERE id = ?",
947 )?;
948 stmt.query_row(params![id], |row| {
949 Ok(Contact {
950 id: row.get(0)?,
951 name: row.get(1)?,
952 payment_identifier: row.get(2)?,
953 created_at: row.get(3)?,
954 updated_at: row.get(4)?,
955 })
956 })
957 .map_err(|e| match e {
958 rusqlite::Error::QueryReturnedNoRows => StorageError::NotFound,
959 other => other.into(),
960 })
961 }
962
963 async fn insert_contact(&self, contact: Contact) -> Result<(), StorageError> {
964 let connection = self.get_connection()?;
965 connection.execute(
966 "INSERT INTO contacts (id, name, payment_identifier, created_at, updated_at)
967 VALUES (?, ?, ?, ?, ?)
968 ON CONFLICT(id) DO UPDATE SET
969 name = excluded.name,
970 payment_identifier = excluded.payment_identifier,
971 updated_at = excluded.updated_at",
972 params![
973 contact.id,
974 contact.name,
975 contact.payment_identifier,
976 contact.created_at,
977 contact.updated_at,
978 ],
979 )?;
980 Ok(())
981 }
982
983 async fn delete_contact(&self, id: String) -> Result<(), StorageError> {
984 let connection = self.get_connection()?;
985 connection.execute("DELETE FROM contacts WHERE id = ?", params![id])?;
986 Ok(())
987 }
988
989 async fn add_outgoing_change(
990 &self,
991 record: UnversionedRecordChange,
992 ) -> Result<u64, StorageError> {
993 let mut connection = self.get_connection()?;
994 let tx = connection
995 .transaction_with_behavior(TransactionBehavior::Immediate)
996 .map_err(map_sqlite_error)?;
997
998 let local_revision: u64 = tx
1000 .query_row(
1001 "SELECT COALESCE(MAX(revision), 0) + 1 FROM sync_outgoing",
1002 [],
1003 |row| row.get(0),
1004 )
1005 .map_err(map_sqlite_error)?;
1006
1007 tx.execute(
1008 "INSERT INTO sync_outgoing (
1009 record_type
1010 , data_id
1011 , schema_version
1012 , commit_time
1013 , updated_fields_json
1014 , revision
1015 )
1016 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1017 params![
1018 record.id.r#type,
1019 record.id.data_id,
1020 record.schema_version.clone(),
1021 serde_json::to_string(&record.updated_fields)?,
1022 local_revision,
1023 ],
1024 )
1025 .map_err(map_sqlite_error)?;
1026
1027 tx.commit().map_err(map_sqlite_error)?;
1028 Ok(local_revision)
1029 }
1030
1031 async fn complete_outgoing_sync(
1032 &self,
1033 record: Record,
1034 local_revision: u64,
1035 ) -> Result<(), StorageError> {
1036 let mut connection = self.get_connection()?;
1037 let tx = connection
1038 .transaction_with_behavior(TransactionBehavior::Immediate)
1039 .map_err(map_sqlite_error)?;
1040
1041 let rows_deleted = tx
1042 .execute(
1043 "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
1044 params![record.id.r#type, record.id.data_id, local_revision],
1045 )
1046 .map_err(map_sqlite_error)?;
1047
1048 if rows_deleted == 0 {
1049 warn!(
1050 "complete_outgoing_sync: DELETE from sync_outgoing matched 0 rows \
1051 (type={}, data_id={}, revision={})",
1052 record.id.r#type, record.id.data_id, local_revision
1053 );
1054 }
1055
1056 tx.execute(
1057 "INSERT OR REPLACE INTO sync_state (
1058 record_type
1059 , data_id
1060 , schema_version
1061 , commit_time
1062 , data
1063 , revision
1064 )
1065 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1066 params![
1067 record.id.r#type,
1068 record.id.data_id,
1069 record.schema_version.clone(),
1070 serde_json::to_string(&record.data)?,
1071 record.revision,
1072 ],
1073 )
1074 .map_err(map_sqlite_error)?;
1075
1076 tx.execute(
1077 "UPDATE sync_revision SET revision = MAX(revision, ?)",
1078 params![record.revision],
1079 )
1080 .map_err(map_sqlite_error)?;
1081
1082 tx.commit().map_err(map_sqlite_error)?;
1083 Ok(())
1084 }
1085
1086 async fn get_pending_outgoing_changes(
1087 &self,
1088 limit: u32,
1089 ) -> Result<Vec<OutgoingChange>, StorageError> {
1090 let connection = self.get_connection()?;
1091
1092 let mut stmt = connection
1093 .prepare(
1094 "SELECT o.record_type
1095 , o.data_id
1096 , o.schema_version
1097 , o.commit_time
1098 , o.updated_fields_json
1099 , o.revision
1100 , e.schema_version AS existing_schema_version
1101 , e.commit_time AS existing_commit_time
1102 , e.data AS existing_data
1103 , e.revision AS existing_revision
1104 FROM sync_outgoing o
1105 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
1106 ORDER BY o.revision ASC
1107 LIMIT ?",
1108 )
1109 .map_err(map_sqlite_error)?;
1110 let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
1111 let mut results = Vec::new();
1112 while let Some(row) = rows.next().map_err(map_sqlite_error)? {
1113 let parent = if let Some(existing_data) =
1114 row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
1115 {
1116 Some(Record {
1117 id: RecordId::new(
1118 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1119 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1120 ),
1121 schema_version: row.get(6).map_err(map_sqlite_error)?,
1122 revision: row.get(9).map_err(map_sqlite_error)?,
1123 data: serde_json::from_str(&existing_data)?,
1124 })
1125 } else {
1126 None
1127 };
1128 let change = RecordChange {
1129 id: RecordId::new(
1130 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1131 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1132 ),
1133 schema_version: row.get(2).map_err(map_sqlite_error)?,
1134 updated_fields: serde_json::from_str(
1135 &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1136 )?,
1137 local_revision: row.get(5).map_err(map_sqlite_error)?,
1138 };
1139 results.push(OutgoingChange { change, parent });
1140 }
1141
1142 Ok(results)
1143 }
1144
1145 async fn get_last_revision(&self) -> Result<u64, StorageError> {
1146 let connection = self.get_connection()?;
1147
1148 let revision: u64 = connection
1149 .query_row("SELECT revision FROM sync_revision", [], |row| row.get(0))
1150 .map_err(map_sqlite_error)?;
1151
1152 Ok(revision)
1153 }
1154
1155 async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), StorageError> {
1156 if records.is_empty() {
1157 return Ok(());
1158 }
1159
1160 let mut connection = self.get_connection()?;
1161 let tx = connection
1162 .transaction_with_behavior(TransactionBehavior::Immediate)
1163 .map_err(map_sqlite_error)?;
1164
1165 for record in records {
1166 tx.execute(
1167 "INSERT OR REPLACE INTO sync_incoming (
1168 record_type
1169 , data_id
1170 , schema_version
1171 , commit_time
1172 , data
1173 , revision
1174 )
1175 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1176 params![
1177 record.id.r#type,
1178 record.id.data_id,
1179 record.schema_version.clone(),
1180 serde_json::to_string(&record.data)?,
1181 record.revision,
1182 ],
1183 )
1184 .map_err(map_sqlite_error)?;
1185 }
1186
1187 tx.commit().map_err(map_sqlite_error)?;
1188 Ok(())
1189 }
1190
1191 async fn delete_incoming_record(&self, record: Record) -> Result<(), StorageError> {
1192 let connection = self.get_connection()?;
1193
1194 connection
1195 .execute(
1196 "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
1197 params![record.id.r#type, record.id.data_id, record.revision],
1198 )
1199 .map_err(map_sqlite_error)?;
1200
1201 Ok(())
1202 }
1203
1204 async fn get_incoming_records(&self, limit: u32) -> Result<Vec<IncomingChange>, StorageError> {
1205 let connection = self.get_connection()?;
1206
1207 let mut stmt = connection
1208 .prepare(
1209 "SELECT i.record_type
1210 , i.data_id
1211 , i.schema_version
1212 , i.data
1213 , i.revision
1214 , e.schema_version AS existing_schema_version
1215 , e.commit_time AS existing_commit_time
1216 , e.data AS existing_data
1217 , e.revision AS existing_revision
1218 FROM sync_incoming i
1219 LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
1220 ORDER BY i.revision ASC
1221 LIMIT ?",
1222 )
1223 .map_err(map_sqlite_error)?;
1224
1225 let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
1226 let mut results = Vec::new();
1227
1228 while let Some(row) = rows.next().map_err(map_sqlite_error)? {
1229 let parent = if let Some(existing_data) =
1230 row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
1231 {
1232 Some(Record {
1233 id: RecordId::new(
1234 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1235 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1236 ),
1237 schema_version: row.get(5).map_err(map_sqlite_error)?,
1238 revision: row.get(8).map_err(map_sqlite_error)?,
1239 data: serde_json::from_str(&existing_data)?,
1240 })
1241 } else {
1242 None
1243 };
1244 let record = Record {
1245 id: RecordId::new(
1246 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1247 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1248 ),
1249 schema_version: row.get(2).map_err(map_sqlite_error)?,
1250 data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
1251 revision: row.get(4).map_err(map_sqlite_error)?,
1252 };
1253 results.push(IncomingChange {
1254 new_state: record,
1255 old_state: parent,
1256 });
1257 }
1258
1259 Ok(results)
1260 }
1261
1262 async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, StorageError> {
1263 let connection = self.get_connection()?;
1264
1265 let mut stmt = connection
1266 .prepare(
1267 "SELECT o.record_type
1268 , o.data_id
1269 , o.schema_version
1270 , o.commit_time
1271 , o.updated_fields_json
1272 , o.revision
1273 , e.schema_version AS existing_schema_version
1274 , e.commit_time AS existing_commit_time
1275 , e.data AS existing_data
1276 , e.revision AS existing_revision
1277 FROM sync_outgoing o
1278 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
1279 ORDER BY o.revision DESC
1280 LIMIT 1",
1281 )
1282 .map_err(map_sqlite_error)?;
1283
1284 let mut rows = stmt.query([]).map_err(map_sqlite_error)?;
1285
1286 if let Some(row) = rows.next().map_err(map_sqlite_error)? {
1287 let parent = if let Some(existing_data) =
1288 row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
1289 {
1290 Some(Record {
1291 id: RecordId::new(
1292 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1293 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1294 ),
1295 schema_version: row.get(6).map_err(map_sqlite_error)?,
1296 revision: row.get(9).map_err(map_sqlite_error)?,
1297 data: serde_json::from_str(&existing_data)?,
1298 })
1299 } else {
1300 None
1301 };
1302 let change = RecordChange {
1303 id: RecordId::new(
1304 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1305 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1306 ),
1307 schema_version: row.get(2).map_err(map_sqlite_error)?,
1308 updated_fields: serde_json::from_str(
1309 &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1310 )?,
1311 local_revision: row.get(5).map_err(map_sqlite_error)?,
1312 };
1313
1314 return Ok(Some(OutgoingChange { change, parent }));
1315 }
1316
1317 Ok(None)
1318 }
1319
1320 async fn update_record_from_incoming(&self, record: Record) -> Result<(), StorageError> {
1321 let mut connection = self.get_connection()?;
1322 let tx = connection
1323 .transaction_with_behavior(TransactionBehavior::Immediate)
1324 .map_err(map_sqlite_error)?;
1325
1326 tx.execute(
1327 "INSERT OR REPLACE INTO sync_state (
1328 record_type
1329 , data_id
1330 , schema_version
1331 , commit_time
1332 , data
1333 , revision
1334 )
1335 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1336 params![
1337 record.id.r#type,
1338 record.id.data_id,
1339 record.schema_version.clone(),
1340 serde_json::to_string(&record.data)?,
1341 record.revision,
1342 ],
1343 )
1344 .map_err(map_sqlite_error)?;
1345
1346 tx.execute(
1347 "UPDATE sync_revision SET revision = MAX(revision, ?)",
1348 params![record.revision],
1349 )
1350 .map_err(map_sqlite_error)?;
1351
1352 tx.commit().map_err(map_sqlite_error)?;
1353 Ok(())
1354 }
1355}
1356
/// Base `SELECT` for reading payments together with their optional detail rows.
///
/// Joins `payments` with the lightning, token and spark detail tables, with
/// `payment_metadata`, and (via the lightning payment hash) with
/// `lnurl_receive_metadata`. It has no `WHERE`/`ORDER BY` clause.
///
/// The column order is significant: `map_payment` reads columns by position.
/// Zero-based indexes:
///
/// | idx | column | idx | column |
/// |-----|--------|-----|--------|
/// | 0 | `p.id` | 16 | `lightning_htlc_expiry_time` |
/// | 1 | `p.payment_type` | 17 | `pm.lnurl_pay_info` |
/// | 2 | `p.status` | 18 | `pm.lnurl_withdraw_info` |
/// | 3 | `p.amount` | 19 | `pm.conversion_info` |
/// | 4 | `p.fees` | 20 | `token_metadata` |
/// | 5 | `p.timestamp` | 21 | `token_tx_hash` |
/// | 6 | `p.method` | 22 | `token_tx_type` |
/// | 7 | `p.withdraw_tx_id` | 23 | `token_invoice_details` |
/// | 8 | `p.deposit_tx_id` | 24 | `spark_invoice_details` |
/// | 9 | `p.spark` | 25 | `spark_htlc_details` |
/// | 10 | `lightning_invoice` | 26 | `lnurl_nostr_zap_request` |
/// | 11 | `lightning_payment_hash` | 27 | `lnurl_nostr_zap_receipt` |
/// | 12 | `lightning_destination_pubkey` | 28 | `lnurl_sender_comment` |
/// | 13 | `lightning_description` | 29 | `lnurl_payment_hash` |
/// | 14 | `lightning_preimage` | 30 | `pm.parent_payment_id` |
/// | 15 | `lightning_htlc_status` | | |
const SELECT_PAYMENT_SQL: &str = "
    SELECT p.id,
           p.payment_type,
           p.status,
           p.amount,
           p.fees,
           p.timestamp,
           p.method,
           p.withdraw_tx_id,
           p.deposit_tx_id,
           p.spark,
           l.invoice AS lightning_invoice,
           l.payment_hash AS lightning_payment_hash,
           l.destination_pubkey AS lightning_destination_pubkey,
           COALESCE(l.description, pm.lnurl_description) AS lightning_description,
           l.preimage AS lightning_preimage,
           l.htlc_status AS lightning_htlc_status,
           l.htlc_expiry_time AS lightning_htlc_expiry_time,
           pm.lnurl_pay_info,
           pm.lnurl_withdraw_info,
           pm.conversion_info,
           t.metadata AS token_metadata,
           t.tx_hash AS token_tx_hash,
           t.tx_type AS token_tx_type,
           t.invoice_details AS token_invoice_details,
           s.invoice_details AS spark_invoice_details,
           s.htlc_details AS spark_htlc_details,
           lrm.nostr_zap_request AS lnurl_nostr_zap_request,
           lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt,
           lrm.sender_comment AS lnurl_sender_comment,
           lrm.payment_hash AS lnurl_payment_hash,
           pm.parent_payment_id
      FROM payments p
      LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
      LEFT JOIN payment_details_token t ON p.id = t.payment_id
      LEFT JOIN payment_details_spark s ON p.id = s.payment_id
      LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
      LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash";
1397
1398#[allow(clippy::too_many_lines)]
1399fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
1400 let withdraw_tx_id: Option<String> = row.get(7)?;
1401 let deposit_tx_id: Option<String> = row.get(8)?;
1402 let spark: Option<i32> = row.get(9)?;
1403 let lightning_invoice: Option<String> = row.get(10)?;
1404 let token_metadata: Option<String> = row.get(20)?;
1405 let details = match (
1406 lightning_invoice,
1407 withdraw_tx_id,
1408 deposit_tx_id,
1409 spark,
1410 token_metadata,
1411 ) {
1412 (Some(invoice), _, _, _, _) => {
1413 let payment_hash: String = row.get(11)?;
1414 let destination_pubkey: String = row.get(12)?;
1415 let description: Option<String> = row.get(13)?;
1416 let preimage: Option<String> = row.get(14)?;
1417 let htlc_status: SparkHtlcStatus =
1418 row.get::<_, Option<SparkHtlcStatus>>(15)?.ok_or_else(|| {
1419 rusqlite::Error::FromSqlConversionFailure(
1420 15,
1421 rusqlite::types::Type::Null,
1422 "htlc_status is required for Lightning payments".into(),
1423 )
1424 })?;
1425 let htlc_expiry_time: u64 = row.get(16)?;
1426 let htlc_details = SparkHtlcDetails {
1427 payment_hash,
1428 preimage,
1429 expiry_time: htlc_expiry_time,
1430 status: htlc_status,
1431 };
1432 let lnurl_pay_info: Option<LnurlPayInfo> = row.get(17)?;
1433 let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(18)?;
1434 let lnurl_nostr_zap_request: Option<String> = row.get(26)?;
1435 let lnurl_nostr_zap_receipt: Option<String> = row.get(27)?;
1436 let lnurl_sender_comment: Option<String> = row.get(28)?;
1437 let lnurl_payment_hash: Option<String> = row.get(29)?;
1438 let lnurl_receive_metadata = if lnurl_payment_hash.is_some() {
1439 Some(LnurlReceiveMetadata {
1440 nostr_zap_request: lnurl_nostr_zap_request,
1441 nostr_zap_receipt: lnurl_nostr_zap_receipt,
1442 sender_comment: lnurl_sender_comment,
1443 })
1444 } else {
1445 None
1446 };
1447 Some(PaymentDetails::Lightning {
1448 invoice,
1449 destination_pubkey,
1450 description,
1451 htlc_details,
1452 lnurl_pay_info,
1453 lnurl_withdraw_info,
1454 lnurl_receive_metadata,
1455 })
1456 }
1457 (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
1458 (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
1459 (_, _, _, Some(_), _) => {
1460 let invoice_details_str: Option<String> = row.get(24)?;
1461 let invoice_details = invoice_details_str
1462 .map(|s| serde_json_from_str(&s, 24))
1463 .transpose()?;
1464 let htlc_details_str: Option<String> = row.get(25)?;
1465 let htlc_details = htlc_details_str
1466 .map(|s| serde_json_from_str(&s, 25))
1467 .transpose()?;
1468 let conversion_info_str: Option<String> = row.get(19)?;
1469 let conversion_info: Option<ConversionInfo> = conversion_info_str
1470 .map(|s: String| serde_json_from_str(&s, 19))
1471 .transpose()?;
1472 Some(PaymentDetails::Spark {
1473 invoice_details,
1474 htlc_details,
1475 conversion_info,
1476 })
1477 }
1478 (_, _, _, _, Some(metadata)) => {
1479 let tx_type: TokenTransactionType = row.get(22)?;
1480 let invoice_details_str: Option<String> = row.get(23)?;
1481 let invoice_details = invoice_details_str
1482 .map(|s| serde_json_from_str(&s, 23))
1483 .transpose()?;
1484 let conversion_info_str: Option<String> = row.get(19)?;
1485 let conversion_info: Option<ConversionInfo> = conversion_info_str
1486 .map(|s: String| serde_json_from_str(&s, 19))
1487 .transpose()?;
1488 Some(PaymentDetails::Token {
1489 metadata: serde_json_from_str(&metadata, 20)?,
1490 tx_hash: row.get(21)?,
1491 tx_type,
1492 invoice_details,
1493 conversion_info,
1494 })
1495 }
1496 _ => None,
1497 };
1498 Ok(Payment {
1499 id: row.get(0)?,
1500 payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
1501 rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
1502 })?,
1503 status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
1504 rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
1505 })?,
1506 amount: row.get::<_, U128SqlWrapper>(3)?.0,
1507 fees: row.get::<_, U128SqlWrapper>(4)?.0,
1508 timestamp: row.get(5)?,
1509 details,
1510 method: row.get(6)?,
1511 conversion_details: None,
1512 })
1513}
1514
1515impl ToSql for PaymentDetails {
1516 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1517 to_sql_json(self)
1518 }
1519}
1520
1521impl FromSql for PaymentDetails {
1522 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1523 from_sql_json(value)
1524 }
1525}
1526
1527impl ToSql for PaymentMethod {
1528 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1529 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1530 }
1531}
1532
1533impl FromSql for PaymentMethod {
1534 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1535 match value {
1536 ValueRef::Text(i) => {
1537 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1538 let payment_method: PaymentMethod = s
1540 .trim_matches('"')
1541 .to_lowercase()
1542 .parse()
1543 .map_err(|()| FromSqlError::InvalidType)?;
1544 Ok(payment_method)
1545 }
1546 _ => Err(FromSqlError::InvalidType),
1547 }
1548 }
1549}
1550
1551impl ToSql for TokenTransactionType {
1552 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1553 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1554 }
1555}
1556
1557impl FromSql for TokenTransactionType {
1558 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1559 match value {
1560 ValueRef::Text(i) => {
1561 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1562 let tx_type: TokenTransactionType =
1563 s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1564 Ok(tx_type)
1565 }
1566 _ => Err(FromSqlError::InvalidType),
1567 }
1568 }
1569}
1570
1571impl ToSql for SparkHtlcStatus {
1572 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1573 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1574 }
1575}
1576
1577impl FromSql for SparkHtlcStatus {
1578 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1579 match value {
1580 ValueRef::Text(i) => {
1581 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1582 let status: SparkHtlcStatus =
1583 s.parse().map_err(|_: String| FromSqlError::InvalidType)?;
1584 Ok(status)
1585 }
1586 _ => Err(FromSqlError::InvalidType),
1587 }
1588 }
1589}
1590
1591impl ToSql for DepositClaimError {
1592 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1593 to_sql_json(self)
1594 }
1595}
1596
1597impl FromSql for DepositClaimError {
1598 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1599 from_sql_json(value)
1600 }
1601}
1602
1603impl ToSql for LnurlPayInfo {
1604 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1605 to_sql_json(self)
1606 }
1607}
1608
1609impl FromSql for LnurlPayInfo {
1610 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1611 from_sql_json(value)
1612 }
1613}
1614
1615impl ToSql for LnurlWithdrawInfo {
1616 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1617 to_sql_json(self)
1618 }
1619}
1620
1621impl FromSql for LnurlWithdrawInfo {
1622 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1623 from_sql_json(value)
1624 }
1625}
1626
1627fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1628where
1629 T: serde::Serialize,
1630{
1631 let json = serde_json::to_string(&value)
1632 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1633 Ok(rusqlite::types::ToSqlOutput::from(json))
1634}
1635
1636fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1637where
1638 T: serde::de::DeserializeOwned,
1639{
1640 match value {
1641 ValueRef::Text(i) => {
1642 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1643 let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1644 Ok(deserialized)
1645 }
1646 _ => Err(FromSqlError::InvalidType),
1647 }
1648}
1649
1650fn serde_json_from_str<T>(value: &str, index: usize) -> Result<T, rusqlite::Error>
1651where
1652 T: serde::de::DeserializeOwned,
1653{
1654 serde_json::from_str(value).map_err(|e| {
1655 rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(e))
1656 })
1657}
1658
1659struct U128SqlWrapper(u128);
1660
1661impl ToSql for U128SqlWrapper {
1662 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1663 let string = self.0.to_string();
1664 Ok(rusqlite::types::ToSqlOutput::from(string))
1665 }
1666}
1667
1668impl FromSql for U128SqlWrapper {
1669 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1670 match value {
1671 ValueRef::Text(i) => {
1672 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1673 let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1674 Ok(U128SqlWrapper(integer))
1675 }
1676 _ => Err(FromSqlError::InvalidType),
1677 }
1678 }
1679}
1680
1681#[cfg(test)]
1682mod tests {
1683
1684 use crate::SqliteStorage;
1685 use std::path::PathBuf;
1686
1687 fn create_temp_dir(name: &str) -> PathBuf {
1690 let mut path = std::env::temp_dir();
1691 path.push(format!("breez-test-{}-{}", name, uuid::Uuid::new_v4()));
1693 std::fs::create_dir_all(&path).unwrap();
1694 path
1695 }
1696
1697 #[tokio::test]
1698 async fn test_storage() {
1699 let temp_dir = create_temp_dir("sqlite_storage");
1700 let storage = SqliteStorage::new(&temp_dir).unwrap();
1701
1702 Box::pin(crate::persist::tests::test_storage(Box::new(storage))).await;
1703 }
1704
1705 #[tokio::test]
1706 async fn test_unclaimed_deposits_crud() {
1707 let temp_dir = create_temp_dir("sqlite_storage_deposits");
1708 let storage = SqliteStorage::new(&temp_dir).unwrap();
1709
1710 crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
1711 }
1712
1713 #[tokio::test]
1714 async fn test_deposit_refunds() {
1715 let temp_dir = create_temp_dir("sqlite_storage_refund_tx");
1716 let storage = SqliteStorage::new(&temp_dir).unwrap();
1717
1718 crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
1719 }
1720
1721 #[tokio::test]
1722 async fn test_payment_type_filtering() {
1723 let temp_dir = create_temp_dir("sqlite_storage_type_filter");
1724 let storage = SqliteStorage::new(&temp_dir).unwrap();
1725
1726 crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
1727 }
1728
1729 #[tokio::test]
1730 async fn test_payment_status_filtering() {
1731 let temp_dir = create_temp_dir("sqlite_storage_status_filter");
1732 let storage = SqliteStorage::new(&temp_dir).unwrap();
1733
1734 crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
1735 }
1736
1737 #[tokio::test]
1738 async fn test_payment_asset_filtering() {
1739 let temp_dir = create_temp_dir("sqlite_storage_asset_filter");
1740 let storage = SqliteStorage::new(&temp_dir).unwrap();
1741
1742 crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
1743 }
1744
1745 #[tokio::test]
1746 async fn test_timestamp_filtering() {
1747 let temp_dir = create_temp_dir("sqlite_storage_timestamp_filter");
1748 let storage = SqliteStorage::new(&temp_dir).unwrap();
1749
1750 crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
1751 }
1752
1753 #[tokio::test]
1754 async fn test_spark_htlc_status_filtering() {
1755 let temp_dir = create_temp_dir("sqlite_storage_htlc_filter");
1756 let storage = SqliteStorage::new(&temp_dir).unwrap();
1757
1758 crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
1759 }
1760
1761 #[tokio::test]
1762 async fn test_lightning_htlc_details_and_status_filtering() {
1763 let temp_dir = create_temp_dir("sqlite_storage_htlc_details");
1764 let storage = SqliteStorage::new(&temp_dir).unwrap();
1765
1766 crate::persist::tests::test_lightning_htlc_details_and_status_filtering(Box::new(storage))
1767 .await;
1768 }
1769
1770 #[tokio::test]
1771 async fn test_conversion_refund_needed_filtering() {
1772 let temp_dir = create_temp_dir("sqlite_storage_conversion_refund_needed_filter");
1773 let storage = SqliteStorage::new(&temp_dir).unwrap();
1774
1775 crate::persist::tests::test_conversion_refund_needed_filtering(Box::new(storage)).await;
1776 }
1777
1778 #[tokio::test]
1779 async fn test_token_transaction_type_filtering() {
1780 let temp_dir = create_temp_dir("sqlite_storage_token_transaction_type_filter");
1781 let storage = SqliteStorage::new(&temp_dir).unwrap();
1782
1783 crate::persist::tests::test_token_transaction_type_filtering(Box::new(storage)).await;
1784 }
1785
1786 #[tokio::test]
1787 async fn test_combined_filters() {
1788 let temp_dir = create_temp_dir("sqlite_storage_combined_filter");
1789 let storage = SqliteStorage::new(&temp_dir).unwrap();
1790
1791 crate::persist::tests::test_combined_filters(Box::new(storage)).await;
1792 }
1793
1794 #[tokio::test]
1795 async fn test_sort_order() {
1796 let temp_dir = create_temp_dir("sqlite_storage_sort_order");
1797 let storage = SqliteStorage::new(&temp_dir).unwrap();
1798
1799 crate::persist::tests::test_sort_order(Box::new(storage)).await;
1800 }
1801
1802 #[tokio::test]
1803 async fn test_payment_metadata() {
1804 let temp_dir = create_temp_dir("sqlite_storage_payment_request_metadata");
1805 let storage = SqliteStorage::new(&temp_dir).unwrap();
1806
1807 crate::persist::tests::test_payment_metadata(Box::new(storage)).await;
1808 }
1809
1810 #[tokio::test]
1811 async fn test_payment_details_update_persistence() {
1812 let temp_dir = create_temp_dir("sqlite_storage_payment_details_update");
1813 let storage = SqliteStorage::new(&temp_dir).unwrap();
1814
1815 crate::persist::tests::test_payment_details_update_persistence(Box::new(storage)).await;
1816 }
1817
1818 #[tokio::test]
1819 async fn test_pending_lnurl_preimages() {
1820 let temp_dir = create_temp_dir("sqlite_storage_pending_lnurl_preimages");
1821 let storage = SqliteStorage::new(&temp_dir).unwrap();
1822
1823 crate::persist::tests::test_pending_lnurl_preimages(Box::new(storage)).await;
1824 }
1825
1826 #[tokio::test]
1827 async fn test_sync_storage() {
1828 let temp_dir = create_temp_dir("sqlite_sync_storage");
1829 let storage = SqliteStorage::new(&temp_dir).unwrap();
1830
1831 crate::persist::tests::test_sync_storage(Box::new(storage)).await;
1832 }
1833
1834 #[tokio::test]
1835 async fn test_payment_metadata_merge() {
1836 let temp_dir = create_temp_dir("sqlite_payment_metadata_merge");
1837 let storage = SqliteStorage::new(&temp_dir).unwrap();
1838
1839 crate::persist::tests::test_payment_metadata_merge(Box::new(storage)).await;
1840 }
1841
1842 #[tokio::test]
1843 #[allow(clippy::too_many_lines)]
1844 async fn test_migration_tx_type() {
1845 use crate::{
1846 Payment, PaymentDetails, PaymentMethod, PaymentStatus, PaymentType, Storage,
1847 TokenMetadata, TokenTransactionType,
1848 persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
1849 };
1850 use rusqlite::{Connection, params};
1851 use rusqlite_migration::{M, Migrations};
1852
1853 let temp_dir = create_temp_dir("sqlite_migration_tx_type");
1854 let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);
1855
1856 {
1858 let mut conn = Connection::open(&db_path).unwrap();
1859 let migrations_before_tx_type: Vec<_> = SqliteStorage::current_migrations()
1860 .iter()
1861 .take(22) .map(|s| M::up(s))
1863 .collect();
1864 let migrations = Migrations::new(migrations_before_tx_type);
1865 migrations.to_latest(&mut conn).unwrap();
1866 }
1867
1868 {
1870 let conn = Connection::open(&db_path).unwrap();
1871
1872 conn.execute(
1874 "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
1875 VALUES (?, ?, ?, ?, ?, ?, ?)",
1876 params![
1877 "token-migration-test",
1878 "send",
1879 "completed",
1880 "5000",
1881 "10",
1882 1_234_567_890_i64,
1883 "\"token\""
1884 ],
1885 )
1886 .unwrap();
1887
1888 let metadata = serde_json::json!({
1890 "identifier": "test-token-id",
1891 "issuer_public_key": format!("02{}", "a".repeat(64)),
1892 "name": "Test Token",
1893 "ticker": "TST",
1894 "decimals": 8,
1895 "max_supply": 1_000_000_u128,
1896 "is_freezable": false
1897 });
1898
1899 conn.execute(
1900 "INSERT INTO payment_details_token (payment_id, metadata, tx_hash)
1901 VALUES (?, ?, ?)",
1902 params![
1903 "token-migration-test",
1904 metadata.to_string(),
1905 "0xabcdef1234567890"
1906 ],
1907 )
1908 .unwrap();
1909 }
1910
1911 let storage = SqliteStorage::new(&temp_dir).unwrap();
1913
1914 let migrated_payment = storage
1916 .get_payment_by_id("token-migration-test".to_string())
1917 .await
1918 .unwrap();
1919
1920 assert_eq!(migrated_payment.id, "token-migration-test");
1921 assert_eq!(migrated_payment.amount, 5000);
1922 assert_eq!(migrated_payment.fees, 10);
1923 assert_eq!(migrated_payment.status, PaymentStatus::Completed);
1924 assert_eq!(migrated_payment.payment_type, PaymentType::Send);
1925 assert_eq!(migrated_payment.method, PaymentMethod::Token);
1926
1927 match migrated_payment.details {
1929 Some(PaymentDetails::Token {
1930 metadata,
1931 tx_hash,
1932 tx_type,
1933 ..
1934 }) => {
1935 assert_eq!(metadata.identifier, "test-token-id");
1936 assert_eq!(metadata.name, "Test Token");
1937 assert_eq!(metadata.ticker, "TST");
1938 assert_eq!(metadata.decimals, 8);
1939 assert_eq!(tx_hash, "0xabcdef1234567890");
1940 assert_eq!(
1942 tx_type,
1943 TokenTransactionType::Transfer,
1944 "Migration should add default txType 'transfer' to token payments"
1945 );
1946 }
1947 _ => panic!("Expected Token payment details"),
1948 }
1949
1950 let new_payment = Payment {
1952 id: "new-token-payment".to_string(),
1953 payment_type: PaymentType::Receive,
1954 status: PaymentStatus::Completed,
1955 amount: 8000,
1956 fees: 20,
1957 timestamp: 1_234_567_891,
1958 method: PaymentMethod::Token,
1959 details: Some(PaymentDetails::Token {
1960 metadata: TokenMetadata {
1961 identifier: "another-token-id".to_string(),
1962 issuer_public_key: format!("02{}", "b".repeat(64)),
1963 name: "Another Token".to_string(),
1964 ticker: "ATK".to_string(),
1965 decimals: 6,
1966 max_supply: 2_000_000,
1967 is_freezable: true,
1968 },
1969 tx_hash: "0x1111222233334444".to_string(),
1970 tx_type: TokenTransactionType::Mint,
1971 invoice_details: None,
1972 conversion_info: None,
1973 }),
1974 conversion_details: None,
1975 };
1976
1977 storage.insert_payment(new_payment).await.unwrap();
1978
1979 let request = StorageListPaymentsRequest {
1981 type_filter: None,
1982 status_filter: None,
1983 asset_filter: None,
1984 payment_details_filter: None,
1985 from_timestamp: None,
1986 to_timestamp: None,
1987 offset: None,
1988 limit: None,
1989 sort_ascending: Some(true),
1990 };
1991
1992 let payments = storage.list_payments(request).await.unwrap();
1993 assert_eq!(payments.len(), 2, "Should have both payments");
1994
1995 let migrated = payments
1997 .iter()
1998 .find(|p| p.id == "token-migration-test")
1999 .unwrap();
2000 match &migrated.details {
2001 Some(PaymentDetails::Token { tx_type, .. }) => {
2002 assert_eq!(*tx_type, TokenTransactionType::Transfer);
2003 }
2004 _ => panic!("Expected Token payment details"),
2005 }
2006
2007 let new = payments
2009 .iter()
2010 .find(|p| p.id == "new-token-payment")
2011 .unwrap();
2012 match &new.details {
2013 Some(PaymentDetails::Token { tx_type, .. }) => {
2014 assert_eq!(*tx_type, TokenTransactionType::Mint);
2015 }
2016 _ => panic!("Expected Token payment details"),
2017 }
2018
2019 let transfer_filter = StorageListPaymentsRequest {
2021 type_filter: None,
2022 status_filter: None,
2023 asset_filter: None,
2024 payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Token {
2025 conversion_refund_needed: None,
2026 tx_hash: None,
2027 tx_type: Some(TokenTransactionType::Transfer),
2028 }]),
2029 from_timestamp: None,
2030 to_timestamp: None,
2031 offset: None,
2032 limit: None,
2033 sort_ascending: Some(true),
2034 };
2035
2036 let transfer_payments = storage.list_payments(transfer_filter).await.unwrap();
2037 assert_eq!(
2038 transfer_payments.len(),
2039 1,
2040 "Should find only the Transfer payment"
2041 );
2042 assert_eq!(transfer_payments[0].id, "token-migration-test");
2043 }
2044
2045 #[tokio::test]
2046 #[allow(clippy::too_many_lines)]
2047 async fn test_migration_htlc_details() {
2048 use crate::{
2049 PaymentDetails, SparkHtlcStatus, Storage,
2050 persist::{StorageListPaymentsRequest, StoragePaymentDetailsFilter},
2051 };
2052 use rusqlite::{Connection, params};
2053 use rusqlite_migration::{M, Migrations};
2054
2055 let temp_dir = create_temp_dir("sqlite_migration_htlc_details");
2056 let db_path = temp_dir.join(super::DEFAULT_DB_FILENAME);
2057
2058 {
2062 let mut conn = Connection::open(&db_path).unwrap();
2063 let migrations_before_backfill: Vec<_> = SqliteStorage::current_migrations()
2064 .iter()
2065 .take(23) .map(|s| M::up(s))
2067 .collect();
2068 let migrations = Migrations::new(migrations_before_backfill);
2069 migrations.to_latest(&mut conn).unwrap();
2070 }
2071
2072 {
2074 let conn = Connection::open(&db_path).unwrap();
2075
2076 conn.execute(
2078 "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
2079 VALUES (?, ?, ?, ?, ?, ?, ?)",
2080 params![
2081 "ln-completed",
2082 "send",
2083 "completed",
2084 "1000",
2085 "10",
2086 1_700_000_001_i64,
2087 "\"lightning\""
2088 ],
2089 )
2090 .unwrap();
2091 conn.execute(
2092 "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, preimage)
2093 VALUES (?, ?, ?, ?, ?)",
2094 params![
2095 "ln-completed",
2096 "lnbc_completed",
2097 "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567",
2098 "03pubkey1",
2099 "preimage_completed"
2100 ],
2101 )
2102 .unwrap();
2103
2104 conn.execute(
2106 "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
2107 VALUES (?, ?, ?, ?, ?, ?, ?)",
2108 params![
2109 "ln-pending",
2110 "receive",
2111 "pending",
2112 "2000",
2113 "0",
2114 1_700_000_002_i64,
2115 "\"lightning\""
2116 ],
2117 )
2118 .unwrap();
2119 conn.execute(
2120 "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
2121 VALUES (?, ?, ?, ?)",
2122 params![
2123 "ln-pending",
2124 "lnbc_pending",
2125 "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678",
2126 "03pubkey2"
2127 ],
2128 )
2129 .unwrap();
2130
2131 conn.execute(
2133 "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
2134 VALUES (?, ?, ?, ?, ?, ?, ?)",
2135 params![
2136 "ln-failed",
2137 "send",
2138 "failed",
2139 "3000",
2140 "5",
2141 1_700_000_003_i64,
2142 "\"lightning\""
2143 ],
2144 )
2145 .unwrap();
2146 conn.execute(
2147 "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey)
2148 VALUES (?, ?, ?, ?)",
2149 params![
2150 "ln-failed",
2151 "lnbc_failed",
2152 "hash_failed_0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2153 "03pubkey3"
2154 ],
2155 )
2156 .unwrap();
2157 }
2158
2159 let storage = SqliteStorage::new(&temp_dir).unwrap();
2161
2162 let completed = storage
2164 .get_payment_by_id("ln-completed".to_string())
2165 .await
2166 .unwrap();
2167 match &completed.details {
2168 Some(PaymentDetails::Lightning { htlc_details, .. }) => {
2169 assert_eq!(htlc_details.status, SparkHtlcStatus::PreimageShared);
2170 assert_eq!(htlc_details.expiry_time, 0);
2171 assert_eq!(
2172 htlc_details.payment_hash,
2173 "hash_completed_0123456789abcdef0123456789abcdef0123456789abcdef01234567"
2174 );
2175 assert_eq!(htlc_details.preimage.as_deref(), Some("preimage_completed"));
2176 }
2177 _ => panic!("Expected Lightning payment details for ln-completed"),
2178 }
2179
2180 let pending = storage
2182 .get_payment_by_id("ln-pending".to_string())
2183 .await
2184 .unwrap();
2185 match &pending.details {
2186 Some(PaymentDetails::Lightning { htlc_details, .. }) => {
2187 assert_eq!(htlc_details.status, SparkHtlcStatus::WaitingForPreimage);
2188 assert_eq!(htlc_details.expiry_time, 0);
2189 assert_eq!(
2190 htlc_details.payment_hash,
2191 "hash_pending_0123456789abcdef0123456789abcdef0123456789abcdef012345678"
2192 );
2193 assert!(htlc_details.preimage.is_none());
2194 }
2195 _ => panic!("Expected Lightning payment details for ln-pending"),
2196 }
2197
2198 let failed = storage
2200 .get_payment_by_id("ln-failed".to_string())
2201 .await
2202 .unwrap();
2203 match &failed.details {
2204 Some(PaymentDetails::Lightning { htlc_details, .. }) => {
2205 assert_eq!(htlc_details.status, SparkHtlcStatus::Returned);
2206 assert_eq!(htlc_details.expiry_time, 0);
2207 }
2208 _ => panic!("Expected Lightning payment details for ln-failed"),
2209 }
2210
2211 let waiting_payments = storage
2213 .list_payments(StorageListPaymentsRequest {
2214 payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
2215 htlc_status: Some(vec![SparkHtlcStatus::WaitingForPreimage]),
2216 has_lnurl_preimage: None,
2217 }]),
2218 ..Default::default()
2219 })
2220 .await
2221 .unwrap();
2222 assert_eq!(waiting_payments.len(), 1);
2223 assert_eq!(waiting_payments[0].id, "ln-pending");
2224
2225 let preimage_shared = storage
2226 .list_payments(StorageListPaymentsRequest {
2227 payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
2228 htlc_status: Some(vec![SparkHtlcStatus::PreimageShared]),
2229 has_lnurl_preimage: None,
2230 }]),
2231 ..Default::default()
2232 })
2233 .await
2234 .unwrap();
2235 assert_eq!(preimage_shared.len(), 1);
2236 assert_eq!(preimage_shared[0].id, "ln-completed");
2237
2238 let returned = storage
2239 .list_payments(StorageListPaymentsRequest {
2240 payment_details_filter: Some(vec![StoragePaymentDetailsFilter::Lightning {
2241 htlc_status: Some(vec![SparkHtlcStatus::Returned]),
2242 has_lnurl_preimage: None,
2243 }]),
2244 ..Default::default()
2245 })
2246 .await
2247 .unwrap();
2248 assert_eq!(returned.len(), 1);
2249 assert_eq!(returned[0].id, "ln-failed");
2250 }
2251
2252 #[tokio::test]
2253 async fn test_contacts_crud() {
2254 let temp_dir = create_temp_dir("contacts_crud");
2255 let storage = SqliteStorage::new(&temp_dir).unwrap();
2256
2257 crate::persist::tests::test_contacts_crud(Box::new(storage)).await;
2258 }
2259}