1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5 Connection, Row, ToSql, Transaction, params,
6 types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11 AssetFilter, ConversionInfo, DepositInfo, ListPaymentsRequest, LnurlPayInfo,
12 LnurlReceiveMetadata, LnurlWithdrawInfo, PaymentDetails, PaymentDetailsFilter, PaymentMethod,
13 error::DepositClaimError,
14 persist::{PaymentMetadata, SetLnurlMetadataItem, UpdateDepositPayload},
15 sync_storage::{
16 IncomingChange, OutgoingChange, Record, RecordChange, RecordId, SyncStorage,
17 SyncStorageError, UnversionedRecordChange,
18 },
19};
20
21use super::{Payment, Storage, StorageError};
22
/// File name of the SQLite database; it is joined onto the storage
/// directory to form the full database path.
const DEFAULT_DB_FILENAME: &str = "storage.sql";
/// SQLite-backed storage whose database file lives inside `db_dir`.
pub struct SqliteStorage {
    /// Directory that contains the database file.
    db_dir: PathBuf,
}
28
29impl SqliteStorage {
30 pub fn new(path: &Path) -> Result<Self, StorageError> {
40 let storage = Self {
41 db_dir: path.to_path_buf(),
42 };
43
44 #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
45 std::fs::create_dir_all(path)
46 .map_err(|e| StorageError::InitializationError(e.to_string()))?;
47
48 storage.migrate()?;
49 Ok(storage)
50 }
51
52 pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
53 Ok(Connection::open(self.get_db_path())?)
54 }
55
56 fn get_db_path(&self) -> PathBuf {
57 self.db_dir.join(DEFAULT_DB_FILENAME)
58 }
59
60 fn migrate(&self) -> Result<(), StorageError> {
61 let migrations =
62 Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
63 let mut conn = self.get_connection()?;
64 let previous_version = match migrations.current_version(&conn)? {
65 SchemaVersion::Inside(previous_version) => previous_version.get(),
66 _ => 0,
67 };
68 migrations.to_latest(&mut conn)?;
69
70 if previous_version < 6 {
71 Self::migrate_lnurl_metadata_description(&mut conn)?;
72 }
73
74 Ok(())
75 }
76
77 fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
78 let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
79 let pay_infos: Vec<_> = stmt
80 .query_map([], |row| {
81 let payment_id: String = row.get(0)?;
82 let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
83 Ok((payment_id, lnurl_pay_info))
84 })?
85 .collect::<Result<_, _>>()?;
86 let pay_infos = pay_infos
87 .into_iter()
88 .filter_map(|(payment_id, lnurl_pay_info)| {
89 let pay_info = lnurl_pay_info?;
90 let description = pay_info.extract_description()?;
91 Some((payment_id, description))
92 })
93 .collect::<Vec<_>>();
94
95 for pay_info in pay_infos {
96 conn.execute(
97 "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
98 params![pay_info.1, pay_info.0],
99 )?;
100 }
101
102 Ok(())
103 }
104
/// Returns the ordered list of SQL scripts that make up the schema
/// migrations.
///
/// NOTE(review): `migrate` compares the schema version against a fixed
/// number, so entries look positional — presumably new scripts should only
/// ever be appended; confirm before reordering or editing old entries.
#[allow(clippy::too_many_lines)]
pub(crate) fn current_migrations() -> Vec<&'static str> {
    vec![
        // Initial payments table; payment details are stored in `details`.
        "CREATE TABLE IF NOT EXISTS payments (
            id TEXT PRIMARY KEY,
            payment_type TEXT NOT NULL,
            status TEXT NOT NULL,
            amount INTEGER NOT NULL,
            fees INTEGER NOT NULL,
            timestamp INTEGER NOT NULL,
            details TEXT,
            method TEXT
        );",
        // Key/value settings table.
        "CREATE TABLE IF NOT EXISTS settings (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        );",
        // Deposits that have not been claimed, keyed by (txid, vout).
        "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
            txid TEXT NOT NULL,
            vout INTEGER NOT NULL,
            amount_sats INTEGER,
            claim_error TEXT,
            refund_tx TEXT,
            refund_tx_id TEXT,
            PRIMARY KEY (txid, vout)
        );",
        // Per-payment metadata, starting with the LNURL pay info.
        "CREATE TABLE IF NOT EXISTS payment_metadata (
            payment_id TEXT PRIMARY KEY,
            lnurl_pay_info TEXT
        );",
        // Refund transactions keyed by the deposit outpoint.
        "CREATE TABLE IF NOT EXISTS deposit_refunds (
            deposit_tx_id TEXT NOT NULL,
            deposit_vout INTEGER NOT NULL,
            refund_tx TEXT NOT NULL,
            refund_tx_id TEXT NOT NULL,
            PRIMARY KEY (deposit_tx_id, deposit_vout)
        );",
        // Adds the `lnurl_description` column to the payment metadata.
        "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
        // Replaces the JSON `details` column: Lightning details move to their
        // own table, withdraw/deposit tx ids become columns, then `details`
        // is dropped.
        "
        ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
        ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
        ALTER TABLE payments ADD COLUMN spark INTEGER;
        CREATE TABLE payment_details_lightning (
            payment_id TEXT PRIMARY KEY,
            invoice TEXT NOT NULL,
            payment_hash TEXT NOT NULL,
            destination_pubkey TEXT NOT NULL,
            description TEXT,
            preimage TEXT,
            FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
        );
        INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
        SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'),
            json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'),
            json_extract(details, '$.Lightning.preimage')
        FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

        UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
        WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

        UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
        WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

        ALTER TABLE payments DROP COLUMN details;

        CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
        ",
        // Token payment details table.
        "CREATE TABLE payment_details_token (
            payment_id TEXT PRIMARY KEY,
            metadata TEXT NOT NULL,
            tx_hash TEXT NOT NULL,
            FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
        );",
        // The next four entries rebuild `payments` with `amount` and `fees`
        // stored as TEXT instead of INTEGER: create, copy, drop, rename.
        "CREATE TABLE payments_new (
            id TEXT PRIMARY KEY,
            payment_type TEXT NOT NULL,
            status TEXT NOT NULL,
            amount TEXT NOT NULL,
            fees TEXT NOT NULL,
            timestamp INTEGER NOT NULL,
            method TEXT,
            withdraw_tx_id TEXT,
            deposit_tx_id TEXT,
            spark INTEGER
        );",
        "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
        SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
        FROM payments;",
        "DROP TABLE payments;",
        "ALTER TABLE payments_new RENAME TO payments;",
        // Spark payment details table, plus invoice details for token payments.
        "CREATE TABLE payment_details_spark (
            payment_id TEXT NOT NULL PRIMARY KEY,
            invoice_details TEXT NOT NULL,
            FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
        );
        ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
        // Adds the LNURL withdraw info column to the payment metadata.
        "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
        // Sync bookkeeping: a single-row revision counter, the outgoing
        // change queue and the per-record sync state.
        "CREATE TABLE sync_revision (
            revision INTEGER NOT NULL DEFAULT 0
        );
        INSERT INTO sync_revision (revision) VALUES (0);
        CREATE TABLE sync_outgoing(
            record_type TEXT NOT NULL,
            data_id TEXT NOT NULL,
            schema_version TEXT NOT NULL,
            commit_time INTEGER NOT NULL,
            updated_fields_json TEXT NOT NULL,
            revision INTEGER NOT NULL
        );
        CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
        CREATE TABLE sync_state(
            record_type TEXT NOT NULL,
            data_id TEXT NOT NULL,
            schema_version TEXT NOT NULL,
            commit_time INTEGER NOT NULL,
            data TEXT NOT NULL,
            revision INTEGER NOT NULL,
            PRIMARY KEY(record_type, data_id)
        );",
        // Queue of incoming sync records, indexed by revision.
        "CREATE TABLE sync_incoming(
            record_type TEXT NOT NULL,
            data_id TEXT NOT NULL,
            schema_version TEXT NOT NULL,
            commit_time INTEGER NOT NULL,
            data TEXT NOT NULL,
            revision INTEGER NOT NULL,
            PRIMARY KEY(record_type, data_id, revision)
        );
        CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
        // Rebuilds `payment_details_spark` so `invoice_details` becomes
        // nullable and an `htlc_details` column is added; existing rows are
        // copied over.
        "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
        CREATE TABLE payment_details_spark (
            payment_id TEXT NOT NULL PRIMARY KEY,
            invoice_details TEXT,
            htlc_details TEXT,
            FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
        );
        INSERT INTO payment_details_spark (payment_id, invoice_details)
        SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
        DROP TABLE tmp_payment_details_spark;",
        // Metadata received through LNURL, keyed by payment hash.
        "CREATE TABLE lnurl_receive_metadata (
            payment_hash TEXT NOT NULL PRIMARY KEY,
            nostr_zap_request TEXT,
            nostr_zap_receipt TEXT,
            sender_comment TEXT
        );",
        // Removes every stored unclaimed deposit.
        "DELETE FROM unclaimed_deposits;",
        // Resets all sync state: queues, stored state, the revision counter
        // and the `sync_initial_complete` setting.
        "DELETE FROM sync_outgoing;
        DELETE FROM sync_incoming;
        DELETE FROM sync_state;
        UPDATE sync_revision SET revision = 0;
        DELETE FROM settings WHERE key = 'sync_initial_complete';",
        // Adds token conversion info to the payment metadata.
        "ALTER TABLE payment_metadata ADD COLUMN token_conversion_info TEXT;",
        // Adds a parent payment reference to the payment metadata.
        "ALTER TABLE payment_metadata ADD COLUMN parent_payment_id TEXT;",
        // Replaces `token_conversion_info` with `conversion_info`; the old
        // column is dropped without copying its contents.
        "
        ALTER TABLE payment_metadata DROP COLUMN token_conversion_info;
        ALTER TABLE payment_metadata ADD COLUMN conversion_info TEXT;
        ",
    ]
}
271}
272
273impl From<rusqlite::Error> for StorageError {
274 fn from(value: rusqlite::Error) -> Self {
275 StorageError::Implementation(value.to_string())
276 }
277}
278
279impl From<rusqlite_migration::Error> for StorageError {
280 fn from(value: rusqlite_migration::Error) -> Self {
281 StorageError::Implementation(value.to_string())
282 }
283}
284
285#[async_trait]
286impl Storage for SqliteStorage {
/// Lists stored payments that match the filters in `request`.
///
/// The `WHERE` clause is assembled dynamically: every filter that is set
/// contributes one SQL fragment (fragments are joined with `AND`) and
/// pushes its bound values onto `params` in the same order as its `?`
/// placeholders appear. Rows are ordered by `p.timestamp`, descending
/// unless `request.sort_ascending` is `Some(true)`, and paged with
/// `request.limit` (default `u32::MAX`) and `request.offset` (default 0).
///
/// # Errors
///
/// Returns a `StorageError` if the connection cannot be opened, the
/// statement cannot be prepared, or a row cannot be mapped.
#[allow(clippy::too_many_lines)]
async fn list_payments(
    &self,
    request: ListPaymentsRequest,
) -> Result<Vec<Payment>, StorageError> {
    let connection = self.get_connection()?;

    // SQL fragments and their bound values, kept in matching order.
    let mut where_clauses = Vec::new();
    let mut params: Vec<Box<dyn ToSql>> = Vec::new();

    // Payment type filter: `IN (?, ?, ...)`; skipped when the list is empty.
    if let Some(ref type_filter) = request.type_filter
        && !type_filter.is_empty()
    {
        let placeholders = type_filter
            .iter()
            .map(|_| "?")
            .collect::<Vec<_>>()
            .join(", ");
        where_clauses.push(format!("p.payment_type IN ({placeholders})"));
        for payment_type in type_filter {
            params.push(Box::new(payment_type.to_string()));
        }
    }

    // Status filter: `IN (?, ?, ...)`; skipped when the list is empty.
    if let Some(ref status_filter) = request.status_filter
        && !status_filter.is_empty()
    {
        let placeholders = status_filter
            .iter()
            .map(|_| "?")
            .collect::<Vec<_>>()
            .join(", ");
        where_clauses.push(format!("p.status IN ({placeholders})"));
        for status in status_filter {
            params.push(Box::new(status.to_string()));
        }
    }

    // Time range: lower bound is inclusive, upper bound is exclusive.
    if let Some(from_timestamp) = request.from_timestamp {
        where_clauses.push("p.timestamp >= ?".to_string());
        params.push(Box::new(from_timestamp));
    }

    if let Some(to_timestamp) = request.to_timestamp {
        where_clauses.push("p.timestamp < ?".to_string());
        params.push(Box::new(to_timestamp));
    }

    // Asset filter: a payment counts as a token payment when it has token
    // metadata (a joined `payment_details_token` row), Bitcoin otherwise.
    if let Some(ref asset_filter) = request.asset_filter {
        match asset_filter {
            AssetFilter::Bitcoin => {
                where_clauses.push("t.metadata IS NULL".to_string());
            }
            AssetFilter::Token { token_identifier } => {
                where_clauses.push("t.metadata IS NOT NULL".to_string());
                if let Some(identifier) = token_identifier {
                    where_clauses
                        .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
                    params.push(Box::new(identifier.clone()));
                }
            }
        }
    }

    // Payment details filters: conditions inside one filter are ANDed, the
    // filters themselves are ORed together.
    if let Some(ref payment_details_filter) = request.payment_details_filter {
        let mut all_payment_details_clauses = Vec::new();
        for payment_details_filter in payment_details_filter {
            let mut payment_details_clauses = Vec::new();
            // Spark HTLC status, read from the JSON in `s.htlc_details`.
            if let PaymentDetailsFilter::Spark {
                htlc_status: Some(htlc_statuses),
                ..
            } = payment_details_filter
                && !htlc_statuses.is_empty()
            {
                let placeholders = htlc_statuses
                    .iter()
                    .map(|_| "?")
                    .collect::<Vec<_>>()
                    .join(", ");
                payment_details_clauses.push(format!(
                    "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
                ));
                for htlc_status in htlc_statuses {
                    params.push(Box::new(htlc_status.to_string()));
                }
            }
            // Conversion refund state, for both Spark and Token filters.
            if let PaymentDetailsFilter::Spark {
                conversion_refund_needed: Some(conversion_refund_needed),
                ..
            }
            | PaymentDetailsFilter::Token {
                conversion_refund_needed: Some(conversion_refund_needed),
                ..
            } = payment_details_filter
            {
                // Restrict to the payment kind named by the filter variant.
                let type_check = match payment_details_filter {
                    PaymentDetailsFilter::Spark { .. } => "p.spark = 1",
                    PaymentDetailsFilter::Token { .. } => "p.spark IS NULL",
                };
                let refund_needed = if *conversion_refund_needed {
                    "= 'RefundNeeded'"
                } else {
                    "!= 'RefundNeeded'"
                };
                // Only fixed string fragments are interpolated here, so this
                // clause needs no bound parameter.
                payment_details_clauses.push(format!(
                    "{type_check} AND pm.conversion_info IS NOT NULL AND
                    json_extract(pm.conversion_info, '$.status') {refund_needed}"
                ));
            }
            // Token transaction hash.
            if let PaymentDetailsFilter::Token {
                tx_hash: Some(tx_hash),
                ..
            } = payment_details_filter
            {
                payment_details_clauses.push("t.tx_hash = ?".to_string());
                params.push(Box::new(tx_hash.clone()));
            }

            if !payment_details_clauses.is_empty() {
                all_payment_details_clauses
                    .push(format!("({})", payment_details_clauses.join(" AND ")));
            }
        }

        if !all_payment_details_clauses.is_empty() {
            where_clauses.push(format!("({})", all_payment_details_clauses.join(" OR ")));
        }
    }

    let where_sql = if where_clauses.is_empty() {
        String::new()
    } else {
        format!("WHERE {}", where_clauses.join(" AND "))
    };

    // Descending is the default when `sort_ascending` is not set.
    let order_direction = if request.sort_ascending.unwrap_or(false) {
        "ASC"
    } else {
        "DESC"
    };

    // The direction is one of two fixed strings, and limit/offset are
    // formatted straight into the SQL text rather than bound.
    let query = format!(
        "SELECT p.id
        , p.payment_type
        , p.status
        , p.amount
        , p.fees
        , p.timestamp
        , p.method
        , p.withdraw_tx_id
        , p.deposit_tx_id
        , p.spark
        , l.invoice AS lightning_invoice
        , l.payment_hash AS lightning_payment_hash
        , l.destination_pubkey AS lightning_destination_pubkey
        , COALESCE(l.description, pm.lnurl_description) AS lightning_description
        , l.preimage AS lightning_preimage
        , pm.lnurl_pay_info
        , pm.lnurl_withdraw_info
        , pm.conversion_info
        , t.metadata AS token_metadata
        , t.tx_hash AS token_tx_hash
        , t.invoice_details AS token_invoice_details
        , s.invoice_details AS spark_invoice_details
        , s.htlc_details AS spark_htlc_details
        , lrm.nostr_zap_request AS lnurl_nostr_zap_request
        , lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
        , lrm.sender_comment AS lnurl_sender_comment
        FROM payments p
        LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
        LEFT JOIN payment_details_token t ON p.id = t.payment_id
        LEFT JOIN payment_details_spark s ON p.id = s.payment_id
        LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
        LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
        {}
        ORDER BY p.timestamp {}
        LIMIT {} OFFSET {}",
        where_sql,
        order_direction,
        request.limit.unwrap_or(u32::MAX),
        request.offset.unwrap_or(0)
    );

    let mut stmt = connection.prepare(&query)?;
    // Borrow the boxed values as trait objects for binding.
    let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
    let payments = stmt
        .query_map(param_refs.as_slice(), map_payment)?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(payments)
}
489
/// Inserts `payment`, or updates the stored row with the same id, together
/// with its variant-specific details.
///
/// Everything runs in one transaction: the base row in `payments` is
/// upserted first, then the details are written according to the
/// `PaymentDetails` variant (none when `payment.details` is `None`).
///
/// # Errors
///
/// Returns a `StorageError` if opening the connection, serializing a
/// details value, executing a statement or committing fails.
#[allow(clippy::too_many_lines)]
async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
    let mut connection = self.get_connection()?;
    let tx = connection.transaction()?;
    // Base row; an existing row with the same id has these columns overwritten.
    tx.execute(
        "INSERT INTO payments (id, payment_type, status, amount, fees, timestamp, method)
         VALUES (?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(id) DO UPDATE SET
            payment_type=excluded.payment_type,
            status=excluded.status,
            amount=excluded.amount,
            fees=excluded.fees,
            timestamp=excluded.timestamp,
            method=excluded.method",
        params![
            payment.id,
            payment.payment_type.to_string(),
            payment.status.to_string(),
            U128SqlWrapper(payment.amount),
            U128SqlWrapper(payment.fees),
            payment.timestamp,
            payment.method,
        ],
    )?;

    match payment.details {
        // Withdraw and deposit details are plain columns on `payments`.
        Some(PaymentDetails::Withdraw { tx_id }) => {
            tx.execute(
                "UPDATE payments SET withdraw_tx_id = ? WHERE id = ?",
                params![tx_id, payment.id],
            )?;
        }
        Some(PaymentDetails::Deposit { tx_id }) => {
            tx.execute(
                "UPDATE payments SET deposit_tx_id = ? WHERE id = ?",
                params![tx_id, payment.id],
            )?;
        }
        Some(PaymentDetails::Spark {
            invoice_details,
            htlc_details,
            ..
        }) => {
            // Flag the payment as a Spark payment.
            tx.execute(
                "UPDATE payments SET spark = 1 WHERE id = ?",
                params![payment.id],
            )?;
            // A details row is written only when there is something to
            // store; COALESCE keeps the stored value when the new one is NULL.
            if invoice_details.is_some() || htlc_details.is_some() {
                tx.execute(
                    "INSERT INTO payment_details_spark (payment_id, invoice_details, htlc_details)
                     VALUES (?, ?, ?)
                     ON CONFLICT(payment_id) DO UPDATE SET
                        invoice_details=COALESCE(excluded.invoice_details, payment_details_spark.invoice_details),
                        htlc_details=COALESCE(excluded.htlc_details, payment_details_spark.htlc_details)",
                    params![
                        payment.id,
                        invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                        htlc_details.as_ref().map(serde_json::to_string).transpose()?,
                    ],
                )?;
            }
        }
        Some(PaymentDetails::Token {
            metadata,
            tx_hash,
            invoice_details,
            ..
        }) => {
            // Metadata and tx hash are always overwritten; invoice details
            // are kept when the new value is NULL.
            tx.execute(
                "INSERT INTO payment_details_token (payment_id, metadata, tx_hash, invoice_details)
                 VALUES (?, ?, ?, ?)
                 ON CONFLICT(payment_id) DO UPDATE SET
                    metadata=excluded.metadata,
                    tx_hash=excluded.tx_hash,
                    invoice_details=COALESCE(excluded.invoice_details, payment_details_token.invoice_details)",
                params![
                    payment.id,
                    serde_json::to_string(&metadata)?,
                    tx_hash,
                    invoice_details.as_ref().map(serde_json::to_string).transpose()?,
                ],
            )?;
        }
        Some(PaymentDetails::Lightning {
            invoice,
            payment_hash,
            destination_pubkey,
            description,
            preimage,
            ..
        }) => {
            // All columns are overwritten except the preimage, which is kept
            // when the new value is NULL.
            tx.execute(
                "INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
                 VALUES (?, ?, ?, ?, ?, ?)
                 ON CONFLICT(payment_id) DO UPDATE SET
                    invoice=excluded.invoice,
                    payment_hash=excluded.payment_hash,
                    destination_pubkey=excluded.destination_pubkey,
                    description=excluded.description,
                    preimage=COALESCE(excluded.preimage, payment_details_lightning.preimage)",
                params![
                    payment.id,
                    invoice,
                    payment_hash,
                    destination_pubkey,
                    description,
                    preimage,
                ],
            )?;
        }
        None => {}
    }

    tx.commit()?;
    Ok(())
}
607
608 async fn set_payment_metadata(
609 &self,
610 payment_id: String,
611 metadata: PaymentMetadata,
612 ) -> Result<(), StorageError> {
613 let connection = self.get_connection()?;
614
615 connection.execute(
616 "INSERT OR REPLACE INTO payment_metadata (payment_id, parent_payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description, conversion_info)
617 VALUES (?, ?, ?, ?, ?, ?)",
618 params![
619 payment_id,
620 metadata.parent_payment_id,
621 metadata.lnurl_pay_info,
622 metadata.lnurl_withdraw_info,
623 metadata.lnurl_description,
624 metadata.conversion_info.as_ref().map(serde_json::to_string).transpose()?,
625 ],
626 )?;
627
628 Ok(())
629 }
630
631 async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
632 let connection = self.get_connection()?;
633
634 connection.execute(
635 "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
636 params![key, value],
637 )?;
638
639 Ok(())
640 }
641
642 async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
643 let connection = self.get_connection()?;
644
645 let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
646
647 let result = stmt.query_row(params![key], |row| {
648 let value_str: String = row.get(0)?;
649 Ok(value_str)
650 });
651
652 match result {
653 Ok(value) => Ok(Some(value)),
654 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
655 Err(e) => Err(e.into()),
656 }
657 }
658
659 async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
660 let connection = self.get_connection()?;
661
662 connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
663
664 Ok(())
665 }
666
667 async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
668 let connection = self.get_connection()?;
669
670 let mut stmt = connection.prepare(
671 "SELECT p.id
672 , p.payment_type
673 , p.status
674 , p.amount
675 , p.fees
676 , p.timestamp
677 , p.method
678 , p.withdraw_tx_id
679 , p.deposit_tx_id
680 , p.spark
681 , l.invoice AS lightning_invoice
682 , l.payment_hash AS lightning_payment_hash
683 , l.destination_pubkey AS lightning_destination_pubkey
684 , COALESCE(l.description, pm.lnurl_description) AS lightning_description
685 , l.preimage AS lightning_preimage
686 , pm.lnurl_pay_info
687 , pm.lnurl_withdraw_info
688 , pm.conversion_info
689 , t.metadata AS token_metadata
690 , t.tx_hash AS token_tx_hash
691 , t.invoice_details AS token_invoice_details
692 , s.invoice_details AS spark_invoice_details
693 , s.htlc_details AS spark_htlc_details
694 , lrm.nostr_zap_request AS lnurl_nostr_zap_request
695 , lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
696 , lrm.sender_comment AS lnurl_sender_comment
697 FROM payments p
698 LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
699 LEFT JOIN payment_details_token t ON p.id = t.payment_id
700 LEFT JOIN payment_details_spark s ON p.id = s.payment_id
701 LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
702 LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
703 WHERE p.id = ?",
704 )?;
705
706 let payment = stmt.query_row(params![id], map_payment)?;
707 Ok(payment)
708 }
709
710 async fn get_payment_by_invoice(
711 &self,
712 invoice: String,
713 ) -> Result<Option<Payment>, StorageError> {
714 let connection = self.get_connection()?;
715
716 let mut stmt = connection.prepare(
717 "SELECT p.id
718 , p.payment_type
719 , p.status
720 , p.amount
721 , p.fees
722 , p.timestamp
723 , p.method
724 , p.withdraw_tx_id
725 , p.deposit_tx_id
726 , p.spark
727 , l.invoice AS lightning_invoice
728 , l.payment_hash AS lightning_payment_hash
729 , l.destination_pubkey AS lightning_destination_pubkey
730 , COALESCE(l.description, pm.lnurl_description) AS lightning_description
731 , l.preimage AS lightning_preimage
732 , pm.lnurl_pay_info
733 , pm.lnurl_withdraw_info
734 , pm.conversion_info
735 , t.metadata AS token_metadata
736 , t.tx_hash AS token_tx_hash
737 , t.invoice_details AS token_invoice_details
738 , s.invoice_details AS spark_invoice_details
739 , s.htlc_details AS spark_htlc_details
740 , lrm.nostr_zap_request AS lnurl_nostr_zap_request
741 , lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
742 , lrm.sender_comment AS lnurl_sender_comment
743 FROM payments p
744 LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
745 LEFT JOIN payment_details_token t ON p.id = t.payment_id
746 LEFT JOIN payment_details_spark s ON p.id = s.payment_id
747 LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
748 LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
749 WHERE l.invoice = ?",
750 )?;
751
752 let payment = stmt.query_row(params![invoice], map_payment);
753 match payment {
754 Ok(payment) => Ok(Some(payment)),
755 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
756 Err(e) => Err(e.into()),
757 }
758 }
759
760 async fn add_deposit(
761 &self,
762 txid: String,
763 vout: u32,
764 amount_sats: u64,
765 ) -> Result<(), StorageError> {
766 let connection = self.get_connection()?;
767 connection.execute(
768 "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats)
769 VALUES (?, ?, ?)",
770 params![txid, vout, amount_sats,],
771 )?;
772 Ok(())
773 }
774
775 async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
776 let connection = self.get_connection()?;
777 connection.execute(
778 "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
779 params![txid, vout],
780 )?;
781 Ok(())
782 }
783
784 async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
785 let connection = self.get_connection()?;
786 let mut stmt =
787 connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
788 let rows = stmt.query_map(params![], |row| {
789 Ok(DepositInfo {
790 txid: row.get(0)?,
791 vout: row.get(1)?,
792 amount_sats: row.get(2)?,
793 claim_error: row.get(3)?,
794 refund_tx: row.get(4)?,
795 refund_tx_id: row.get(5)?,
796 })
797 })?;
798 let mut deposits = Vec::new();
799 for row in rows {
800 deposits.push(row?);
801 }
802 Ok(deposits)
803 }
804
805 async fn update_deposit(
806 &self,
807 txid: String,
808 vout: u32,
809 payload: UpdateDepositPayload,
810 ) -> Result<(), StorageError> {
811 let connection = self.get_connection()?;
812 match payload {
813 UpdateDepositPayload::ClaimError { error } => {
814 connection.execute(
815 "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
816 params![error, txid, vout],
817 )?;
818 }
819 UpdateDepositPayload::Refund {
820 refund_txid,
821 refund_tx,
822 } => {
823 connection.execute(
824 "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
825 params![refund_tx, refund_txid, txid, vout],
826 )?;
827 }
828 }
829 Ok(())
830 }
831
832 async fn set_lnurl_metadata(
833 &self,
834 metadata: Vec<SetLnurlMetadataItem>,
835 ) -> Result<(), StorageError> {
836 let connection = self.get_connection()?;
837 for metadata in metadata {
838 connection.execute(
839 "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment)
840 VALUES (?, ?, ?, ?)",
841 params![
842 metadata.payment_hash,
843 metadata.nostr_zap_request,
844 metadata.nostr_zap_receipt,
845 metadata.sender_comment,
846 ],
847 )?;
848 }
849 Ok(())
850 }
851}
852
853fn get_next_revision(tx: &Transaction<'_>) -> Result<u64, SyncStorageError> {
855 let revision = tx
856 .query_row(
857 "UPDATE sync_revision
858 SET revision = revision + 1
859 RETURNING revision",
860 [],
861 |row| row.get(0),
862 )
863 .map_err(map_sqlite_error)?;
864 Ok(revision)
865}
866
867impl From<StorageError> for SyncStorageError {
868 fn from(value: StorageError) -> Self {
869 match value {
870 StorageError::Implementation(s) => SyncStorageError::Implementation(s),
871 StorageError::InitializationError(s) => SyncStorageError::InitializationError(s),
872 StorageError::Serialization(s) => SyncStorageError::Serialization(s),
873 }
874 }
875}
876
877#[macros::async_trait]
878impl SyncStorage for SqliteStorage {
879 async fn add_outgoing_change(
880 &self,
881 record: UnversionedRecordChange,
882 ) -> Result<u64, SyncStorageError> {
883 let mut connection = self.get_connection()?;
884 let tx = connection.transaction().map_err(map_sqlite_error)?;
885 let revision = get_next_revision(&tx)?;
886
887 tx.execute(
888 "INSERT INTO sync_outgoing (
889 record_type
890 , data_id
891 , schema_version
892 , commit_time
893 , updated_fields_json
894 , revision
895 )
896 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
897 params![
898 record.id.r#type,
899 record.id.data_id,
900 record.schema_version.clone(),
901 serde_json::to_string(&record.updated_fields)?,
902 revision,
903 ],
904 )
905 .map_err(map_sqlite_error)?;
906
907 tx.commit().map_err(map_sqlite_error)?;
908 Ok(revision)
909 }
910
911 async fn complete_outgoing_sync(&self, record: Record) -> Result<(), SyncStorageError> {
912 let mut connection = self.get_connection()?;
913 let tx = connection.transaction().map_err(map_sqlite_error)?;
914
915 tx.execute(
916 "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
917 params![record.id.r#type, record.id.data_id, record.revision],
918 )
919 .map_err(map_sqlite_error)?;
920
921 tx.execute(
922 "INSERT OR REPLACE INTO sync_state (
923 record_type
924 , data_id
925 , schema_version
926 , commit_time
927 , data
928 , revision
929 )
930 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
931 params![
932 record.id.r#type,
933 record.id.data_id,
934 record.schema_version.clone(),
935 serde_json::to_string(&record.data)?,
936 record.revision,
937 ],
938 )
939 .map_err(map_sqlite_error)?;
940
941 tx.commit().map_err(map_sqlite_error)?;
942 Ok(())
943 }
944
945 async fn get_pending_outgoing_changes(
946 &self,
947 limit: u32,
948 ) -> Result<Vec<OutgoingChange>, SyncStorageError> {
949 let connection = self.get_connection()?;
950
951 let mut stmt = connection
952 .prepare(
953 "SELECT o.record_type
954 , o.data_id
955 , o.schema_version
956 , o.commit_time
957 , o.updated_fields_json
958 , o.revision
959 , e.schema_version AS existing_schema_version
960 , e.commit_time AS existing_commit_time
961 , e.data AS existing_data
962 , e.revision AS existing_revision
963 FROM sync_outgoing o
964 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
965 ORDER BY o.revision ASC
966 LIMIT ?",
967 )
968 .map_err(map_sqlite_error)?;
969 let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
970 let mut results = Vec::new();
971 while let Some(row) = rows.next().map_err(map_sqlite_error)? {
972 let parent = if let Some(existing_data) =
973 row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
974 {
975 Some(Record {
976 id: RecordId::new(
977 row.get::<_, String>(0).map_err(map_sqlite_error)?,
978 row.get::<_, String>(1).map_err(map_sqlite_error)?,
979 ),
980 schema_version: row.get(6).map_err(map_sqlite_error)?,
981 revision: row.get(9).map_err(map_sqlite_error)?,
982 data: serde_json::from_str(&existing_data)?,
983 })
984 } else {
985 None
986 };
987 let change = RecordChange {
988 id: RecordId::new(
989 row.get::<_, String>(0).map_err(map_sqlite_error)?,
990 row.get::<_, String>(1).map_err(map_sqlite_error)?,
991 ),
992 schema_version: row.get(2).map_err(map_sqlite_error)?,
993 updated_fields: serde_json::from_str(
994 &row.get::<_, String>(4).map_err(map_sqlite_error)?,
995 )?,
996 revision: row.get(5).map_err(map_sqlite_error)?,
997 };
998 results.push(OutgoingChange { change, parent });
999 }
1000
1001 Ok(results)
1002 }
1003
1004 async fn get_last_revision(&self) -> Result<u64, SyncStorageError> {
1005 let connection = self.get_connection()?;
1006
1007 let mut stmt = connection
1009 .prepare("SELECT COALESCE(MAX(revision), 0) FROM sync_state")
1010 .map_err(map_sqlite_error)?;
1011
1012 let revision: u64 = stmt
1013 .query_row([], |row| row.get(0))
1014 .map_err(map_sqlite_error)?;
1015
1016 Ok(revision)
1017 }
1018
1019 async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), SyncStorageError> {
1020 if records.is_empty() {
1021 return Ok(());
1022 }
1023
1024 let mut connection = self.get_connection()?;
1025 let tx = connection.transaction().map_err(map_sqlite_error)?;
1026
1027 for record in records {
1028 tx.execute(
1029 "INSERT OR REPLACE INTO sync_incoming (
1030 record_type
1031 , data_id
1032 , schema_version
1033 , commit_time
1034 , data
1035 , revision
1036 )
1037 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1038 params![
1039 record.id.r#type,
1040 record.id.data_id,
1041 record.schema_version.clone(),
1042 serde_json::to_string(&record.data)?,
1043 record.revision,
1044 ],
1045 )
1046 .map_err(map_sqlite_error)?;
1047 }
1048
1049 tx.commit().map_err(map_sqlite_error)?;
1050 Ok(())
1051 }
1052
1053 async fn delete_incoming_record(&self, record: Record) -> Result<(), SyncStorageError> {
1054 let connection = self.get_connection()?;
1055
1056 connection
1057 .execute(
1058 "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
1059 params![record.id.r#type, record.id.data_id, record.revision],
1060 )
1061 .map_err(map_sqlite_error)?;
1062
1063 Ok(())
1064 }
1065
1066 async fn rebase_pending_outgoing_records(&self, revision: u64) -> Result<(), SyncStorageError> {
1067 let mut connection = self.get_connection()?;
1068 let tx = connection.transaction().map_err(map_sqlite_error)?;
1069
1070 let last_revision = tx
1071 .query_row(
1072 "SELECT COALESCE(MAX(revision), 0) FROM sync_state",
1073 [],
1074 |row| row.get(0),
1075 )
1076 .map_err(map_sqlite_error)?;
1077
1078 let diff = revision.saturating_sub(last_revision);
1079
1080 tx.execute(
1082 "UPDATE sync_outgoing
1083 SET revision = revision + ?",
1084 params![diff],
1085 )
1086 .map_err(map_sqlite_error)?;
1087
1088 tx.commit().map_err(map_sqlite_error)?;
1089 Ok(())
1090 }
1091
1092 async fn get_incoming_records(
1093 &self,
1094 limit: u32,
1095 ) -> Result<Vec<IncomingChange>, SyncStorageError> {
1096 let connection = self.get_connection()?;
1097
1098 let mut stmt = connection
1099 .prepare(
1100 "SELECT i.record_type
1101 , i.data_id
1102 , i.schema_version
1103 , i.data
1104 , i.revision
1105 , e.schema_version AS existing_schema_version
1106 , e.commit_time AS existing_commit_time
1107 , e.data AS existing_data
1108 , e.revision AS existing_revision
1109 FROM sync_incoming i
1110 LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
1111 ORDER BY i.revision ASC
1112 LIMIT ?",
1113 )
1114 .map_err(map_sqlite_error)?;
1115
1116 let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
1117 let mut results = Vec::new();
1118
1119 while let Some(row) = rows.next().map_err(map_sqlite_error)? {
1120 let parent = if let Some(existing_data) =
1121 row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
1122 {
1123 Some(Record {
1124 id: RecordId::new(
1125 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1126 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1127 ),
1128 schema_version: row.get(5).map_err(map_sqlite_error)?,
1129 revision: row.get(8).map_err(map_sqlite_error)?,
1130 data: serde_json::from_str(&existing_data)?,
1131 })
1132 } else {
1133 None
1134 };
1135 let record = Record {
1136 id: RecordId::new(
1137 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1138 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1139 ),
1140 schema_version: row.get(2).map_err(map_sqlite_error)?,
1141 data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
1142 revision: row.get(4).map_err(map_sqlite_error)?,
1143 };
1144 results.push(IncomingChange {
1145 new_state: record,
1146 old_state: parent,
1147 });
1148 }
1149
1150 Ok(results)
1151 }
1152
1153 async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, SyncStorageError> {
1154 let connection = self.get_connection()?;
1155
1156 let mut stmt = connection
1157 .prepare(
1158 "SELECT o.record_type
1159 , o.data_id
1160 , o.schema_version
1161 , o.commit_time
1162 , o.updated_fields_json
1163 , o.revision
1164 , e.schema_version AS existing_schema_version
1165 , e.commit_time AS existing_commit_time
1166 , e.data AS existing_data
1167 , e.revision AS existing_revision
1168 FROM sync_outgoing o
1169 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
1170 ORDER BY o.revision DESC
1171 LIMIT 1",
1172 )
1173 .map_err(map_sqlite_error)?;
1174
1175 let mut rows = stmt.query([]).map_err(map_sqlite_error)?;
1176
1177 if let Some(row) = rows.next().map_err(map_sqlite_error)? {
1178 let parent = if let Some(existing_data) =
1179 row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
1180 {
1181 Some(Record {
1182 id: RecordId::new(
1183 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1184 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1185 ),
1186 schema_version: row.get(6).map_err(map_sqlite_error)?,
1187 revision: row.get(9).map_err(map_sqlite_error)?,
1188 data: serde_json::from_str(&existing_data)?,
1189 })
1190 } else {
1191 None
1192 };
1193 let change = RecordChange {
1194 id: RecordId::new(
1195 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1196 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1197 ),
1198 schema_version: row.get(2).map_err(map_sqlite_error)?,
1199 updated_fields: serde_json::from_str(
1200 &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1201 )?,
1202 revision: row.get(5).map_err(map_sqlite_error)?,
1203 };
1204
1205 return Ok(Some(OutgoingChange { change, parent }));
1206 }
1207
1208 Ok(None)
1209 }
1210
1211 async fn update_record_from_incoming(&self, record: Record) -> Result<(), SyncStorageError> {
1212 let connection = self.get_connection()?;
1213
1214 connection
1215 .execute(
1216 "INSERT OR REPLACE INTO sync_state (
1217 record_type
1218 , data_id
1219 , schema_version
1220 , commit_time
1221 , data
1222 , revision
1223 )
1224 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1225 params![
1226 record.id.r#type,
1227 record.id.data_id,
1228 record.schema_version.clone(),
1229 serde_json::to_string(&record.data)?,
1230 record.revision,
1231 ],
1232 )
1233 .map_err(map_sqlite_error)?;
1234
1235 Ok(())
1236 }
1237}
1238
1239#[allow(clippy::needless_pass_by_value)]
1240fn map_sqlite_error(value: rusqlite::Error) -> SyncStorageError {
1241 SyncStorageError::Implementation(value.to_string())
1242}
1243
1244#[allow(clippy::too_many_lines)]
1245fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
1246 let withdraw_tx_id: Option<String> = row.get(7)?;
1247 let deposit_tx_id: Option<String> = row.get(8)?;
1248 let spark: Option<i32> = row.get(9)?;
1249 let lightning_invoice: Option<String> = row.get(10)?;
1250 let token_metadata: Option<String> = row.get(18)?;
1251 let details = match (
1252 lightning_invoice,
1253 withdraw_tx_id,
1254 deposit_tx_id,
1255 spark,
1256 token_metadata,
1257 ) {
1258 (Some(invoice), _, _, _, _) => {
1259 let payment_hash: String = row.get(11)?;
1260 let destination_pubkey: String = row.get(12)?;
1261 let description: Option<String> = row.get(13)?;
1262 let preimage: Option<String> = row.get(14)?;
1263 let lnurl_pay_info: Option<LnurlPayInfo> = row.get(15)?;
1264 let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(16)?;
1265 let lnurl_nostr_zap_request: Option<String> = row.get(23)?;
1266 let lnurl_nostr_zap_receipt: Option<String> = row.get(24)?;
1267 let lnurl_sender_comment: Option<String> = row.get(25)?;
1268 let lnurl_receive_metadata =
1269 if lnurl_nostr_zap_request.is_some() || lnurl_sender_comment.is_some() {
1270 Some(LnurlReceiveMetadata {
1271 nostr_zap_request: lnurl_nostr_zap_request,
1272 nostr_zap_receipt: lnurl_nostr_zap_receipt,
1273 sender_comment: lnurl_sender_comment,
1274 })
1275 } else {
1276 None
1277 };
1278 Some(PaymentDetails::Lightning {
1279 invoice,
1280 payment_hash,
1281 destination_pubkey,
1282 description,
1283 preimage,
1284 lnurl_pay_info,
1285 lnurl_withdraw_info,
1286 lnurl_receive_metadata,
1287 })
1288 }
1289 (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
1290 (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
1291 (_, _, _, Some(_), _) => {
1292 let invoice_details_str: Option<String> = row.get(21)?;
1293 let invoice_details = invoice_details_str
1294 .map(|s| serde_json_from_str(&s, 21))
1295 .transpose()?;
1296 let htlc_details_str: Option<String> = row.get(22)?;
1297 let htlc_details = htlc_details_str
1298 .map(|s| serde_json_from_str(&s, 22))
1299 .transpose()?;
1300 let conversion_info_str: Option<String> = row.get(17)?;
1301 let conversion_info: Option<ConversionInfo> = conversion_info_str
1302 .map(|s: String| serde_json_from_str(&s, 17))
1303 .transpose()?;
1304 Some(PaymentDetails::Spark {
1305 invoice_details,
1306 htlc_details,
1307 conversion_info,
1308 })
1309 }
1310 (_, _, _, _, Some(metadata)) => {
1311 let invoice_details_str: Option<String> = row.get(20)?;
1312 let invoice_details = invoice_details_str
1313 .map(|s| serde_json_from_str(&s, 20))
1314 .transpose()?;
1315 let conversion_info_str: Option<String> = row.get(17)?;
1316 let conversion_info: Option<ConversionInfo> = conversion_info_str
1317 .map(|s: String| serde_json_from_str(&s, 17))
1318 .transpose()?;
1319 Some(PaymentDetails::Token {
1320 metadata: serde_json_from_str(&metadata, 18)?,
1321 tx_hash: row.get(19)?,
1322 invoice_details,
1323 conversion_info,
1324 })
1325 }
1326 _ => None,
1327 };
1328 Ok(Payment {
1329 id: row.get(0)?,
1330 payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
1331 rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
1332 })?,
1333 status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
1334 rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
1335 })?,
1336 amount: row.get::<_, U128SqlWrapper>(3)?.0,
1337 fees: row.get::<_, U128SqlWrapper>(4)?.0,
1338 timestamp: row.get(5)?,
1339 details,
1340 method: row.get(6)?,
1341 })
1342}
1343
1344impl ToSql for PaymentDetails {
1345 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1346 to_sql_json(self)
1347 }
1348}
1349
1350impl FromSql for PaymentDetails {
1351 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1352 from_sql_json(value)
1353 }
1354}
1355
1356impl ToSql for PaymentMethod {
1357 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1358 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1359 }
1360}
1361
1362impl FromSql for PaymentMethod {
1363 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1364 match value {
1365 ValueRef::Text(i) => {
1366 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1367 let payment_method: PaymentMethod = s
1369 .trim_matches('"')
1370 .to_lowercase()
1371 .parse()
1372 .map_err(|()| FromSqlError::InvalidType)?;
1373 Ok(payment_method)
1374 }
1375 _ => Err(FromSqlError::InvalidType),
1376 }
1377 }
1378}
1379
1380impl ToSql for DepositClaimError {
1381 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1382 to_sql_json(self)
1383 }
1384}
1385
1386impl FromSql for DepositClaimError {
1387 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1388 from_sql_json(value)
1389 }
1390}
1391
1392impl ToSql for LnurlPayInfo {
1393 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1394 to_sql_json(self)
1395 }
1396}
1397
1398impl FromSql for LnurlPayInfo {
1399 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1400 from_sql_json(value)
1401 }
1402}
1403
1404impl ToSql for LnurlWithdrawInfo {
1405 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1406 to_sql_json(self)
1407 }
1408}
1409
1410impl FromSql for LnurlWithdrawInfo {
1411 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1412 from_sql_json(value)
1413 }
1414}
1415
1416fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1417where
1418 T: serde::Serialize,
1419{
1420 let json = serde_json::to_string(&value)
1421 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1422 Ok(rusqlite::types::ToSqlOutput::from(json))
1423}
1424
1425fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1426where
1427 T: serde::de::DeserializeOwned,
1428{
1429 match value {
1430 ValueRef::Text(i) => {
1431 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1432 let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1433 Ok(deserialized)
1434 }
1435 _ => Err(FromSqlError::InvalidType),
1436 }
1437}
1438
1439fn serde_json_from_str<T>(value: &str, index: usize) -> Result<T, rusqlite::Error>
1440where
1441 T: serde::de::DeserializeOwned,
1442{
1443 serde_json::from_str(value).map_err(|e| {
1444 rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(e))
1445 })
1446}
1447
1448struct U128SqlWrapper(u128);
1449
1450impl ToSql for U128SqlWrapper {
1451 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1452 let string = self.0.to_string();
1453 Ok(rusqlite::types::ToSqlOutput::from(string))
1454 }
1455}
1456
1457impl FromSql for U128SqlWrapper {
1458 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1459 match value {
1460 ValueRef::Text(i) => {
1461 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1462 let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1463 Ok(U128SqlWrapper(integer))
1464 }
1465 _ => Err(FromSqlError::InvalidType),
1466 }
1467 }
1468}
1469
#[cfg(test)]
mod tests {

    use crate::SqliteStorage;
    use std::path::PathBuf;

    /// Creates a uniquely named directory under the system temp dir and returns its path.
    fn create_temp_dir(name: &str) -> PathBuf {
        let dir_name = format!("breez-test-{}-{}", name, uuid::Uuid::new_v4());
        let dir = std::env::temp_dir().join(dir_name);
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    /// Opens a fresh `SqliteStorage` in its own temp directory labelled with `name`.
    fn new_storage(name: &str) -> SqliteStorage {
        let dir = create_temp_dir(name);
        SqliteStorage::new(&dir).unwrap()
    }

    // Each test below runs one shared suite from `crate::persist::tests` against a fresh
    // SQLite-backed storage.

    #[tokio::test]
    async fn test_sqlite_storage() {
        let storage = new_storage("sqlite_storage");
        crate::persist::tests::test_sqlite_storage(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_unclaimed_deposits_crud() {
        let storage = new_storage("sqlite_storage_deposits");
        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_deposit_refunds() {
        let storage = new_storage("sqlite_storage_refund_tx");
        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_type_filtering() {
        let storage = new_storage("sqlite_storage_type_filter");
        crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_status_filtering() {
        let storage = new_storage("sqlite_storage_status_filter");
        crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_asset_filtering() {
        let storage = new_storage("sqlite_storage_asset_filter");
        crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_timestamp_filtering() {
        let storage = new_storage("sqlite_storage_timestamp_filter");
        crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_spark_htlc_status_filtering() {
        let storage = new_storage("sqlite_storage_htlc_filter");
        crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_conversion_refund_needed_filtering() {
        let storage = new_storage("sqlite_storage_conversion_refund_needed_filter");
        crate::persist::tests::test_conversion_refund_needed_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_combined_filters() {
        let storage = new_storage("sqlite_storage_combined_filter");
        crate::persist::tests::test_combined_filters(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sort_order() {
        let storage = new_storage("sqlite_storage_sort_order");
        crate::persist::tests::test_sort_order(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_metadata() {
        let storage = new_storage("sqlite_storage_payment_request_metadata");
        crate::persist::tests::test_payment_metadata(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_details_update_persistence() {
        let storage = new_storage("sqlite_storage_payment_details_update");
        crate::persist::tests::test_payment_details_update_persistence(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sync_storage() {
        let storage = new_storage("sqlite_sync_storage");
        crate::persist::tests::test_sqlite_sync_storage(Box::new(storage)).await;
    }
}