use std::path::{Path, PathBuf};

use macros::async_trait;
use rusqlite::{
    Connection, Row, ToSql, Transaction, params,
    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
};
use rusqlite_migration::{M, Migrations, SchemaVersion};

use crate::{
    AssetFilter, DepositInfo, ListPaymentsRequest, LnurlPayInfo, LnurlReceiveMetadata,
    LnurlWithdrawInfo, PaymentDetails, PaymentMethod,
    error::DepositClaimError,
    persist::{PaymentMetadata, SetLnurlMetadataItem, UpdateDepositPayload},
    sync_storage::{
        IncomingChange, OutgoingChange, Record, RecordChange, RecordId, SyncStorage,
        SyncStorageError, UnversionedRecordChange,
    },
};

use super::{Payment, Storage, StorageError};

const DEFAULT_DB_FILENAME: &str = "storage.sql";
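/// SQLite-backed implementation of [`Storage`] and [`SyncStorage`].
///
/// All data lives in a single `storage.sql` file inside the directory passed to
/// [`SqliteStorage::new`]; each method opens its own short-lived [`Connection`].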
pub struct SqliteStorage {
    db_dir: PathBuf,
}

impl SqliteStorage {
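    /// Creates the storage directory if needed (skipped on wasm targets, where there
    /// is no filesystem to prepare) and runs any pending schema migrations.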
    pub fn new(path: &Path) -> Result<Self, StorageError> {
        let storage = Self {
            db_dir: path.to_path_buf(),
        };

        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        std::fs::create_dir_all(path)
            .map_err(|e| StorageError::InitializationError(e.to_string()))?;

        storage.migrate()?;
        Ok(storage)
    }

    pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
        Ok(Connection::open(self.get_db_path())?)
    }

    fn get_db_path(&self) -> PathBuf {
        self.db_dir.join(DEFAULT_DB_FILENAME)
    }

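    /// Applies all migrations from `current_migrations` in order, then runs the one-off
    /// LNURL description backfill for databases that were still below schema version 6
    /// (the migration that added the `lnurl_description` column).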
    fn migrate(&self) -> Result<(), StorageError> {
        let migrations =
            Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
        let mut conn = self.get_connection()?;
        let previous_version = match migrations.current_version(&conn)? {
            SchemaVersion::Inside(previous_version) => previous_version.get(),
            _ => 0,
        };
        migrations.to_latest(&mut conn)?;

        if previous_version < 6 {
            Self::migrate_lnurl_metadata_description(&mut conn)?;
        }

        Ok(())
    }

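    /// Backfills `payment_metadata.lnurl_description` from the description embedded in
    /// each stored `lnurl_pay_info`, for rows written before the column existed.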
    fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
        let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
        let pay_infos: Vec<_> = stmt
            .query_map([], |row| {
                let payment_id: String = row.get(0)?;
                let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
                Ok((payment_id, lnurl_pay_info))
            })?
            .collect::<Result<_, _>>()?;
        let pay_infos = pay_infos
            .into_iter()
            .filter_map(|(payment_id, lnurl_pay_info)| {
                let pay_info = lnurl_pay_info?;
                let description = pay_info.extract_description()?;
                Some((payment_id, description))
            })
            .collect::<Vec<_>>();

        for pay_info in pay_infos {
            conn.execute(
                "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
                params![pay_info.1, pay_info.0],
            )?;
        }

        Ok(())
    }

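    /// Returns the full, ordered list of schema migrations. Each entry is applied as a
    /// separate `rusqlite_migration` "up" step, so new migrations must only ever be
    /// appended to the end of this list.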
    #[allow(clippy::too_many_lines)]
    pub(crate) fn current_migrations() -> Vec<&'static str> {
        vec![
            "CREATE TABLE IF NOT EXISTS payments (
                id TEXT PRIMARY KEY,
                payment_type TEXT NOT NULL,
                status TEXT NOT NULL,
                amount INTEGER NOT NULL,
                fees INTEGER NOT NULL,
                timestamp INTEGER NOT NULL,
                details TEXT,
                method TEXT
            );",
            "CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );",
            "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
                txid TEXT NOT NULL,
                vout INTEGER NOT NULL,
                amount_sats INTEGER,
                claim_error TEXT,
                refund_tx TEXT,
                refund_tx_id TEXT,
                PRIMARY KEY (txid, vout)
            );",
            "CREATE TABLE IF NOT EXISTS payment_metadata (
                payment_id TEXT PRIMARY KEY,
                lnurl_pay_info TEXT
            );",
            "CREATE TABLE IF NOT EXISTS deposit_refunds (
                deposit_tx_id TEXT NOT NULL,
                deposit_vout INTEGER NOT NULL,
                refund_tx TEXT NOT NULL,
                refund_tx_id TEXT NOT NULL,
                PRIMARY KEY (deposit_tx_id, deposit_vout)
            );",
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
            "
            ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
            ALTER TABLE payments ADD COLUMN spark INTEGER;
            CREATE TABLE payment_details_lightning (
                payment_id TEXT PRIMARY KEY,
                invoice TEXT NOT NULL,
                payment_hash TEXT NOT NULL,
                destination_pubkey TEXT NOT NULL,
                description TEXT,
                preimage TEXT,
                FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
            SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'),
                json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'),
                json_extract(details, '$.Lightning.preimage')
            FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

            UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
            WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

            UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
            WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

            ALTER TABLE payments DROP COLUMN details;

            CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
            ",
            "CREATE TABLE payment_details_token (
                payment_id TEXT PRIMARY KEY,
                metadata TEXT NOT NULL,
                tx_hash TEXT NOT NULL,
                FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );",
            "CREATE TABLE payments_new (
                id TEXT PRIMARY KEY,
                payment_type TEXT NOT NULL,
                status TEXT NOT NULL,
                amount TEXT NOT NULL,
                fees TEXT NOT NULL,
                timestamp INTEGER NOT NULL,
                method TEXT,
                withdraw_tx_id TEXT,
                deposit_tx_id TEXT,
                spark INTEGER
            );",
            "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
            SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
            FROM payments;",
            "DROP TABLE payments;",
            "ALTER TABLE payments_new RENAME TO payments;",
            "CREATE TABLE payment_details_spark (
                payment_id TEXT NOT NULL PRIMARY KEY,
                invoice_details TEXT NOT NULL,
                FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
            "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
            "CREATE TABLE sync_revision (
                revision INTEGER NOT NULL DEFAULT 0
            );
            INSERT INTO sync_revision (revision) VALUES (0);
            CREATE TABLE sync_outgoing(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                updated_fields_json TEXT NOT NULL,
                revision INTEGER NOT NULL
            );
            CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
            CREATE TABLE sync_state(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id)
            );",
            "CREATE TABLE sync_incoming(
                record_type TEXT NOT NULL,
                data_id TEXT NOT NULL,
                schema_version TEXT NOT NULL,
                commit_time INTEGER NOT NULL,
                data TEXT NOT NULL,
                revision INTEGER NOT NULL,
                PRIMARY KEY(record_type, data_id, revision)
            );
            CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
            "ALTER TABLE payment_details_spark RENAME TO tmp_payment_details_spark;
            CREATE TABLE payment_details_spark (
                payment_id TEXT NOT NULL PRIMARY KEY,
                invoice_details TEXT,
                htlc_details TEXT,
                FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
            );
            INSERT INTO payment_details_spark (payment_id, invoice_details)
            SELECT payment_id, invoice_details FROM tmp_payment_details_spark;
            DROP TABLE tmp_payment_details_spark;",
            "CREATE TABLE lnurl_receive_metadata (
                payment_hash TEXT NOT NULL PRIMARY KEY,
                nostr_zap_request TEXT,
                nostr_zap_receipt TEXT,
                sender_comment TEXT
            );",
            "DELETE FROM unclaimed_deposits;",
        ]
    }
}

impl From<rusqlite::Error> for StorageError {
    fn from(value: rusqlite::Error) -> Self {
        StorageError::Implementation(value.to_string())
    }
}

impl From<rusqlite_migration::Error> for StorageError {
    fn from(value: rusqlite_migration::Error) -> Self {
        StorageError::Implementation(value.to_string())
    }
}

#[async_trait]
impl Storage for SqliteStorage {
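    /// Lists payments with optional type, status, timestamp, asset and Spark HTLC status
    /// filters. Filters are assembled into a dynamic `WHERE` clause using positional `?`
    /// placeholders, so user-provided values never end up in the SQL text itself.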
    #[allow(clippy::too_many_lines)]
    async fn list_payments(
        &self,
        request: ListPaymentsRequest,
    ) -> Result<Vec<Payment>, StorageError> {
        let connection = self.get_connection()?;

        let mut where_clauses = Vec::new();
        let mut params: Vec<Box<dyn ToSql>> = Vec::new();

        if let Some(ref type_filter) = request.type_filter
            && !type_filter.is_empty()
        {
            let placeholders = type_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.payment_type IN ({placeholders})"));
            for payment_type in type_filter {
                params.push(Box::new(payment_type.to_string()));
            }
        }

        if let Some(ref status_filter) = request.status_filter
            && !status_filter.is_empty()
        {
            let placeholders = status_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!("p.status IN ({placeholders})"));
            for status in status_filter {
                params.push(Box::new(status.to_string()));
            }
        }

        if let Some(from_timestamp) = request.from_timestamp {
            where_clauses.push("p.timestamp >= ?".to_string());
            params.push(Box::new(from_timestamp));
        }

        if let Some(to_timestamp) = request.to_timestamp {
            where_clauses.push("p.timestamp < ?".to_string());
            params.push(Box::new(to_timestamp));
        }

        if let Some(ref asset_filter) = request.asset_filter {
            match asset_filter {
                AssetFilter::Bitcoin => {
                    where_clauses.push("t.metadata IS NULL".to_string());
                }
                AssetFilter::Token { token_identifier } => {
                    where_clauses.push("t.metadata IS NOT NULL".to_string());
                    if let Some(identifier) = token_identifier {
                        where_clauses
                            .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
                        params.push(Box::new(identifier.clone()));
                    }
                }
            }
        }

        if let Some(ref htlc_status_filter) = request.spark_htlc_status_filter
            && !htlc_status_filter.is_empty()
        {
            let placeholders = htlc_status_filter
                .iter()
                .map(|_| "?")
                .collect::<Vec<_>>()
                .join(", ");
            where_clauses.push(format!(
                "json_extract(s.htlc_details, '$.status') IN ({placeholders})"
            ));
            for htlc_status in htlc_status_filter {
                params.push(Box::new(htlc_status.to_string()));
            }
        }

        let where_sql = if where_clauses.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", where_clauses.join(" AND "))
        };

        let order_direction = if request.sort_ascending.unwrap_or(false) {
            "ASC"
        } else {
            "DESC"
        };

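        // The column order below must stay in sync with the indexes read in `map_payment`.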
        let query = format!(
            "SELECT p.id
                 , p.payment_type
                 , p.status
                 , p.amount
                 , p.fees
                 , p.timestamp
                 , p.method
                 , p.withdraw_tx_id
                 , p.deposit_tx_id
                 , p.spark
                 , l.invoice AS lightning_invoice
                 , l.payment_hash AS lightning_payment_hash
                 , l.destination_pubkey AS lightning_destination_pubkey
                 , COALESCE(l.description, pm.lnurl_description) AS lightning_description
                 , l.preimage AS lightning_preimage
                 , pm.lnurl_pay_info
                 , pm.lnurl_withdraw_info
                 , t.metadata AS token_metadata
                 , t.tx_hash AS token_tx_hash
                 , t.invoice_details AS token_invoice_details
                 , s.invoice_details AS spark_invoice_details
                 , s.htlc_details AS spark_htlc_details
                 , lrm.nostr_zap_request AS lnurl_nostr_zap_request
                 , lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
                 , lrm.sender_comment AS lnurl_sender_comment
             FROM payments p
             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
             LEFT JOIN payment_details_token t ON p.id = t.payment_id
             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
             LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
             {}
             ORDER BY p.timestamp {}
             LIMIT {} OFFSET {}",
            where_sql,
            order_direction,
            request.limit.unwrap_or(u32::MAX),
            request.offset.unwrap_or(0)
        );

        let mut stmt = connection.prepare(&query)?;
        let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
        let payments = stmt
            .query_map(param_refs.as_slice(), map_payment)?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(payments)
    }

    async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction()?;
        tx.execute(
            "INSERT OR REPLACE INTO payments (id, payment_type, status, amount, fees, timestamp, method)
             VALUES (?, ?, ?, ?, ?, ?, ?)",
            params![
                payment.id,
                payment.payment_type.to_string(),
                payment.status.to_string(),
                U128SqlWrapper(payment.amount),
                U128SqlWrapper(payment.fees),
                payment.timestamp,
                payment.method,
            ],
        )?;

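        // Each details variant is written either to a column on the payments row
        // (Withdraw, Deposit) or to its own detail table keyed by the payment id.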
        match payment.details {
            Some(PaymentDetails::Withdraw { tx_id }) => {
                tx.execute(
                    "UPDATE payments SET withdraw_tx_id = ? WHERE id = ?",
                    params![tx_id, payment.id],
                )?;
            }
            Some(PaymentDetails::Deposit { tx_id }) => {
                tx.execute(
                    "UPDATE payments SET deposit_tx_id = ? WHERE id = ?",
                    params![tx_id, payment.id],
                )?;
            }
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
            }) => {
                tx.execute(
                    "UPDATE payments SET spark = 1 WHERE id = ?",
                    params![payment.id],
                )?;
                // Write both optional detail columns in a single statement so a payment
                // carrying both invoice and HTLC details does not have one overwritten
                // by the other.
                if invoice_details.is_some() || htlc_details.is_some() {
                    tx.execute(
                        "INSERT OR REPLACE INTO payment_details_spark (payment_id, invoice_details, htlc_details) VALUES (?, ?, ?)",
                        params![
                            payment.id,
                            invoice_details
                                .map(|d| serde_json::to_string(&d))
                                .transpose()?,
                            htlc_details
                                .map(|d| serde_json::to_string(&d))
                                .transpose()?,
                        ],
                    )?;
                }
            }
            Some(PaymentDetails::Token {
                metadata,
                tx_hash,
                invoice_details,
            }) => {
                tx.execute(
                    "INSERT OR REPLACE INTO payment_details_token (payment_id, metadata, tx_hash, invoice_details) VALUES (?, ?, ?, ?)",
                    params![payment.id, serde_json::to_string(&metadata)?, tx_hash, invoice_details.map(|d| serde_json::to_string(&d)).transpose()?],
                )?;
            }
            Some(PaymentDetails::Lightning {
                invoice,
                payment_hash,
                destination_pubkey,
                description,
                preimage,
                ..
            }) => {
                tx.execute(
                    "INSERT OR REPLACE INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
                     VALUES (?, ?, ?, ?, ?, ?)",
                    params![
                        payment.id,
                        invoice,
                        payment_hash,
                        destination_pubkey,
                        description,
                        preimage,
                    ],
                )?;
            }
            None => {}
        }

        tx.commit()?;
        Ok(())
    }

    async fn set_payment_metadata(
        &self,
        payment_id: String,
        metadata: PaymentMetadata,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;

        connection.execute(
            "INSERT OR REPLACE INTO payment_metadata (payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description) VALUES (?, ?, ?, ?)",
            params![payment_id, metadata.lnurl_pay_info, metadata.lnurl_withdraw_info, metadata.lnurl_description],
        )?;

        Ok(())
    }

    async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
        let connection = self.get_connection()?;

        connection.execute(
            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
            params![key, value],
        )?;

        Ok(())
    }

    async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;

        let result = stmt.query_row(params![key], |row| {
            let value_str: String = row.get(0)?;
            Ok(value_str)
        });

        match result {
            Ok(value) => Ok(Some(value)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
        let connection = self.get_connection()?;

        connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;

        Ok(())
    }

    async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection.prepare(
            "SELECT p.id
                 , p.payment_type
                 , p.status
                 , p.amount
                 , p.fees
                 , p.timestamp
                 , p.method
                 , p.withdraw_tx_id
                 , p.deposit_tx_id
                 , p.spark
                 , l.invoice AS lightning_invoice
                 , l.payment_hash AS lightning_payment_hash
                 , l.destination_pubkey AS lightning_destination_pubkey
                 , COALESCE(l.description, pm.lnurl_description) AS lightning_description
                 , l.preimage AS lightning_preimage
                 , pm.lnurl_pay_info
                 , pm.lnurl_withdraw_info
                 , t.metadata AS token_metadata
                 , t.tx_hash AS token_tx_hash
                 , t.invoice_details AS token_invoice_details
                 , s.invoice_details AS spark_invoice_details
                 , s.htlc_details AS spark_htlc_details
                 , lrm.nostr_zap_request AS lnurl_nostr_zap_request
                 , lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
                 , lrm.sender_comment AS lnurl_sender_comment
             FROM payments p
             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
             LEFT JOIN payment_details_token t ON p.id = t.payment_id
             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
             LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
             WHERE p.id = ?",
        )?;

        let payment = stmt.query_row(params![id], map_payment)?;
        Ok(payment)
    }

    async fn get_payment_by_invoice(
        &self,
        invoice: String,
    ) -> Result<Option<Payment>, StorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection.prepare(
            "SELECT p.id
                 , p.payment_type
                 , p.status
                 , p.amount
                 , p.fees
                 , p.timestamp
                 , p.method
                 , p.withdraw_tx_id
                 , p.deposit_tx_id
                 , p.spark
                 , l.invoice AS lightning_invoice
                 , l.payment_hash AS lightning_payment_hash
                 , l.destination_pubkey AS lightning_destination_pubkey
                 , COALESCE(l.description, pm.lnurl_description) AS lightning_description
                 , l.preimage AS lightning_preimage
                 , pm.lnurl_pay_info
                 , pm.lnurl_withdraw_info
                 , t.metadata AS token_metadata
                 , t.tx_hash AS token_tx_hash
                 , t.invoice_details AS token_invoice_details
                 , s.invoice_details AS spark_invoice_details
                 , s.htlc_details AS spark_htlc_details
                 , lrm.nostr_zap_request AS lnurl_nostr_zap_request
                 , lrm.nostr_zap_receipt AS lnurl_nostr_zap_receipt
                 , lrm.sender_comment AS lnurl_sender_comment
             FROM payments p
             LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
             LEFT JOIN payment_details_token t ON p.id = t.payment_id
             LEFT JOIN payment_details_spark s ON p.id = s.payment_id
             LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
             LEFT JOIN lnurl_receive_metadata lrm ON l.payment_hash = lrm.payment_hash
             WHERE l.invoice = ?",
        )?;

        let payment = stmt.query_row(params![invoice], map_payment);
        match payment {
            Ok(payment) => Ok(Some(payment)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    async fn add_deposit(
        &self,
        txid: String,
        vout: u32,
        amount_sats: u64,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;
        connection.execute(
            "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats)
             VALUES (?, ?, ?)",
            params![txid, vout, amount_sats],
        )?;
        Ok(())
    }

    async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
        let connection = self.get_connection()?;
        connection.execute(
            "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
            params![txid, vout],
        )?;
        Ok(())
    }

    async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
        let connection = self.get_connection()?;
        let mut stmt =
            connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
        let rows = stmt.query_map(params![], |row| {
            Ok(DepositInfo {
                txid: row.get(0)?,
                vout: row.get(1)?,
                amount_sats: row.get(2)?,
                claim_error: row.get(3)?,
                refund_tx: row.get(4)?,
                refund_tx_id: row.get(5)?,
            })
        })?;
        let mut deposits = Vec::new();
        for row in rows {
            deposits.push(row?);
        }
        Ok(deposits)
    }

    async fn update_deposit(
        &self,
        txid: String,
        vout: u32,
        payload: UpdateDepositPayload,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;
        match payload {
            UpdateDepositPayload::ClaimError { error } => {
                connection.execute(
                    "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
                    params![error, txid, vout],
                )?;
            }
            UpdateDepositPayload::Refund {
                refund_txid,
                refund_tx,
            } => {
                connection.execute(
                    "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
                    params![refund_tx, refund_txid, txid, vout],
                )?;
            }
        }
        Ok(())
    }

    async fn set_lnurl_metadata(
        &self,
        metadata: Vec<SetLnurlMetadataItem>,
    ) -> Result<(), StorageError> {
        let connection = self.get_connection()?;
        for metadata in metadata {
            connection.execute(
                "INSERT OR REPLACE INTO lnurl_receive_metadata (payment_hash, nostr_zap_request, nostr_zap_receipt, sender_comment)
                 VALUES (?, ?, ?, ?)",
                params![
                    metadata.payment_hash,
                    metadata.nostr_zap_request,
                    metadata.nostr_zap_receipt,
                    metadata.sender_comment,
                ],
            )?;
        }
        Ok(())
    }
}

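/// Atomically increments and returns the local sync revision counter stored in the
/// single-row `sync_revision` table.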
fn get_next_revision(tx: &Transaction<'_>) -> Result<u64, SyncStorageError> {
    let revision = tx
        .query_row(
            "UPDATE sync_revision
             SET revision = revision + 1
             RETURNING revision",
            [],
            |row| row.get(0),
        )
        .map_err(map_sqlite_error)?;
    Ok(revision)
}

impl From<StorageError> for SyncStorageError {
    fn from(value: StorageError) -> Self {
        match value {
            StorageError::Implementation(s) => SyncStorageError::Implementation(s),
            StorageError::InitializationError(s) => SyncStorageError::InitializationError(s),
            StorageError::Serialization(s) => SyncStorageError::Serialization(s),
        }
    }
}

#[macros::async_trait]
impl SyncStorage for SqliteStorage {
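    /// Queues a local change for upload: bumps the revision counter and stores the changed
    /// fields as JSON in `sync_outgoing`, all within a single transaction.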
    async fn add_outgoing_change(
        &self,
        record: UnversionedRecordChange,
    ) -> Result<u64, SyncStorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction().map_err(map_sqlite_error)?;
        let revision = get_next_revision(&tx)?;

        tx.execute(
            "INSERT INTO sync_outgoing (
                 record_type
                 , data_id
                 , schema_version
                 , commit_time
                 , updated_fields_json
                 , revision
             )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.updated_fields)?,
                revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(revision)
    }

    async fn complete_outgoing_sync(&self, record: Record) -> Result<(), SyncStorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction().map_err(map_sqlite_error)?;

        tx.execute(
            "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
            params![record.id.r#type, record.id.data_id, record.revision],
        )
        .map_err(map_sqlite_error)?;

        tx.execute(
            "INSERT OR REPLACE INTO sync_state (
                 record_type
                 , data_id
                 , schema_version
                 , commit_time
                 , data
                 , revision
             )
             VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
            params![
                record.id.r#type,
                record.id.data_id,
                record.schema_version.clone(),
                serde_json::to_string(&record.data)?,
                record.revision,
            ],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }

    async fn get_pending_outgoing_changes(
        &self,
        limit: u32,
    ) -> Result<Vec<OutgoingChange>, SyncStorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
                     , o.data_id
                     , o.schema_version
                     , o.commit_time
                     , o.updated_fields_json
                     , o.revision
                     , e.schema_version AS existing_schema_version
                     , e.commit_time AS existing_commit_time
                     , e.data AS existing_data
                     , e.revision AS existing_revision
                 FROM sync_outgoing o
                 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
                 ORDER BY o.revision ASC
                 LIMIT ?",
            )
            .map_err(map_sqlite_error)?;
        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();
        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                revision: row.get(5).map_err(map_sqlite_error)?,
            };
            results.push(OutgoingChange { change, parent });
        }

        Ok(results)
    }

    async fn get_last_revision(&self) -> Result<u64, SyncStorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare("SELECT COALESCE(MAX(revision), 0) FROM sync_state")
            .map_err(map_sqlite_error)?;

        let revision: u64 = stmt
            .query_row([], |row| row.get(0))
            .map_err(map_sqlite_error)?;

        Ok(revision)
    }

    async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), SyncStorageError> {
        if records.is_empty() {
            return Ok(());
        }

        let mut connection = self.get_connection()?;
        let tx = connection.transaction().map_err(map_sqlite_error)?;

        for record in records {
            tx.execute(
                "INSERT OR REPLACE INTO sync_incoming (
                     record_type
                     , data_id
                     , schema_version
                     , commit_time
                     , data
                     , revision
                 )
                 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
                params![
                    record.id.r#type,
                    record.id.data_id,
                    record.schema_version.clone(),
                    serde_json::to_string(&record.data)?,
                    record.revision,
                ],
            )
            .map_err(map_sqlite_error)?;
        }

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }

    async fn delete_incoming_record(&self, record: Record) -> Result<(), SyncStorageError> {
        let connection = self.get_connection()?;

        connection
            .execute(
                "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
                params![record.id.r#type, record.id.data_id, record.revision],
            )
            .map_err(map_sqlite_error)?;

        Ok(())
    }

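    /// Shifts all pending outgoing revisions forward so they stay ahead of the remote
    /// state: every `sync_outgoing` revision is bumped by the difference between the
    /// given remote revision and the highest locally synced revision.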
    async fn rebase_pending_outgoing_records(&self, revision: u64) -> Result<(), SyncStorageError> {
        let mut connection = self.get_connection()?;
        let tx = connection.transaction().map_err(map_sqlite_error)?;

        let last_revision = tx
            .query_row(
                "SELECT COALESCE(MAX(revision), 0) FROM sync_state",
                [],
                |row| row.get(0),
            )
            .map_err(map_sqlite_error)?;

        let diff = revision.saturating_sub(last_revision);

        tx.execute(
            "UPDATE sync_outgoing
             SET revision = revision + ?",
            params![diff],
        )
        .map_err(map_sqlite_error)?;

        tx.commit().map_err(map_sqlite_error)?;
        Ok(())
    }

    async fn get_incoming_records(
        &self,
        limit: u32,
    ) -> Result<Vec<IncomingChange>, SyncStorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare(
                "SELECT i.record_type
                     , i.data_id
                     , i.schema_version
                     , i.data
                     , i.revision
                     , e.schema_version AS existing_schema_version
                     , e.commit_time AS existing_commit_time
                     , e.data AS existing_data
                     , e.revision AS existing_revision
                 FROM sync_incoming i
                 LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
                 ORDER BY i.revision ASC
                 LIMIT ?",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
        let mut results = Vec::new();

        while let Some(row) = rows.next().map_err(map_sqlite_error)? {
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(5).map_err(map_sqlite_error)?,
                    revision: row.get(8).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let record = Record {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
                revision: row.get(4).map_err(map_sqlite_error)?,
            };
            results.push(IncomingChange {
                new_state: record,
                old_state: parent,
            });
        }

        Ok(results)
    }

    async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, SyncStorageError> {
        let connection = self.get_connection()?;

        let mut stmt = connection
            .prepare(
                "SELECT o.record_type
                     , o.data_id
                     , o.schema_version
                     , o.commit_time
                     , o.updated_fields_json
                     , o.revision
                     , e.schema_version AS existing_schema_version
                     , e.commit_time AS existing_commit_time
                     , e.data AS existing_data
                     , e.revision AS existing_revision
                 FROM sync_outgoing o
                 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
                 ORDER BY o.revision DESC
                 LIMIT 1",
            )
            .map_err(map_sqlite_error)?;

        let mut rows = stmt.query([]).map_err(map_sqlite_error)?;

        if let Some(row) = rows.next().map_err(map_sqlite_error)? {
            let parent = if let Some(existing_data) =
                row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
            {
                Some(Record {
                    id: RecordId::new(
                        row.get::<_, String>(0).map_err(map_sqlite_error)?,
                        row.get::<_, String>(1).map_err(map_sqlite_error)?,
                    ),
                    schema_version: row.get(6).map_err(map_sqlite_error)?,
                    revision: row.get(9).map_err(map_sqlite_error)?,
                    data: serde_json::from_str(&existing_data)?,
                })
            } else {
                None
            };
            let change = RecordChange {
                id: RecordId::new(
                    row.get::<_, String>(0).map_err(map_sqlite_error)?,
                    row.get::<_, String>(1).map_err(map_sqlite_error)?,
                ),
                schema_version: row.get(2).map_err(map_sqlite_error)?,
                updated_fields: serde_json::from_str(
                    &row.get::<_, String>(4).map_err(map_sqlite_error)?,
                )?,
                revision: row.get(5).map_err(map_sqlite_error)?,
            };

            return Ok(Some(OutgoingChange { change, parent }));
        }

        Ok(None)
    }

    async fn update_record_from_incoming(&self, record: Record) -> Result<(), SyncStorageError> {
        let connection = self.get_connection()?;

        connection
            .execute(
                "INSERT OR REPLACE INTO sync_state (
                     record_type
                     , data_id
                     , schema_version
                     , commit_time
                     , data
                     , revision
                 )
                 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
                params![
                    record.id.r#type,
                    record.id.data_id,
                    record.schema_version.clone(),
                    serde_json::to_string(&record.data)?,
                    record.revision,
                ],
            )
            .map_err(map_sqlite_error)?;

        Ok(())
    }
}

#[allow(clippy::needless_pass_by_value)]
fn map_sqlite_error(value: rusqlite::Error) -> SyncStorageError {
    SyncStorageError::Implementation(value.to_string())
}

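/// Maps a row from the shared payments SELECT (see `list_payments`) into a [`Payment`].
/// The details variant is chosen from whichever joined table produced a non-NULL value,
/// checked in order: Lightning, Withdraw, Deposit, Spark, Token.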
#[allow(clippy::too_many_lines)]
fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
    let withdraw_tx_id: Option<String> = row.get(7)?;
    let deposit_tx_id: Option<String> = row.get(8)?;
    let spark: Option<i32> = row.get(9)?;
    let lightning_invoice: Option<String> = row.get(10)?;
    let token_metadata: Option<String> = row.get(17)?;
    let details = match (
        lightning_invoice,
        withdraw_tx_id,
        deposit_tx_id,
        spark,
        token_metadata,
    ) {
        (Some(invoice), _, _, _, _) => {
            let payment_hash: String = row.get(11)?;
            let destination_pubkey: String = row.get(12)?;
            let description: Option<String> = row.get(13)?;
            let preimage: Option<String> = row.get(14)?;
            let lnurl_pay_info: Option<LnurlPayInfo> = row.get(15)?;
            let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(16)?;
            let lnurl_nostr_zap_request: Option<String> = row.get(22)?;
            let lnurl_nostr_zap_receipt: Option<String> = row.get(23)?;
            let lnurl_sender_comment: Option<String> = row.get(24)?;
            let lnurl_receive_metadata =
                if lnurl_nostr_zap_request.is_some() || lnurl_sender_comment.is_some() {
                    Some(LnurlReceiveMetadata {
                        nostr_zap_request: lnurl_nostr_zap_request,
                        nostr_zap_receipt: lnurl_nostr_zap_receipt,
                        sender_comment: lnurl_sender_comment,
                    })
                } else {
                    None
                };
            Some(PaymentDetails::Lightning {
                invoice,
                payment_hash,
                destination_pubkey,
                description,
                preimage,
                lnurl_pay_info,
                lnurl_withdraw_info,
                lnurl_receive_metadata,
            })
        }
        (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
        (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
        (_, _, _, Some(_), _) => {
            let invoice_details_str: Option<String> = row.get(20)?;
            let invoice_details = invoice_details_str
                .map(|s| {
                    serde_json::from_str(&s).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(
                            20,
                            rusqlite::types::Type::Text,
                            e.into(),
                        )
                    })
                })
                .transpose()?;
            let htlc_details_str: Option<String> = row.get(21)?;
            let htlc_details = htlc_details_str
                .map(|s| {
                    serde_json::from_str(&s).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(
                            21,
                            rusqlite::types::Type::Text,
                            e.into(),
                        )
                    })
                })
                .transpose()?;
            Some(PaymentDetails::Spark {
                invoice_details,
                htlc_details,
            })
        }
        (_, _, _, _, Some(metadata)) => {
            let invoice_details_str: Option<String> = row.get(19)?;
            let invoice_details = invoice_details_str
                .map(|s| {
                    serde_json::from_str(&s).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(
                            19,
                            rusqlite::types::Type::Text,
                            e.into(),
                        )
                    })
                })
                .transpose()?;
            Some(PaymentDetails::Token {
                metadata: serde_json::from_str(&metadata).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(
                        17,
                        rusqlite::types::Type::Text,
                        e.into(),
                    )
                })?,
                tx_hash: row.get(18)?,
                invoice_details,
            })
        }
        _ => None,
    };
    Ok(Payment {
        id: row.get(0)?,
        payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
            rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
        })?,
        status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
            rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
        })?,
        amount: row.get::<_, U128SqlWrapper>(3)?.0,
        fees: row.get::<_, U128SqlWrapper>(4)?.0,
        timestamp: row.get(5)?,
        details,
        method: row.get(6)?,
    })
}

impl ToSql for PaymentDetails {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for PaymentDetails {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}

impl ToSql for PaymentMethod {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
    }
}

impl FromSql for PaymentMethod {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        match value {
            ValueRef::Text(i) => {
                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
                let payment_method: PaymentMethod = s
                    .trim_matches('"')
                    .to_lowercase()
                    .parse()
                    .map_err(|()| FromSqlError::InvalidType)?;
                Ok(payment_method)
            }
            _ => Err(FromSqlError::InvalidType),
        }
    }
}

impl ToSql for DepositClaimError {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for DepositClaimError {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}

impl ToSql for LnurlPayInfo {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for LnurlPayInfo {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}

impl ToSql for LnurlWithdrawInfo {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        to_sql_json(self)
    }
}

impl FromSql for LnurlWithdrawInfo {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        from_sql_json(value)
    }
}

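/// Shared helpers for types persisted as JSON TEXT columns: `to_sql_json` serializes a
/// value with serde_json, and `from_sql_json` deserializes a TEXT column back into it.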
fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
where
    T: serde::Serialize,
{
    let json = serde_json::to_string(&value)
        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
    Ok(rusqlite::types::ToSqlOutput::from(json))
}

fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
where
    T: serde::de::DeserializeOwned,
{
    match value {
        ValueRef::Text(i) => {
            let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
            let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
            Ok(deserialized)
        }
        _ => Err(FromSqlError::InvalidType),
    }
}

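/// Stores `u128` amounts as decimal TEXT, since SQLite INTEGER columns are limited to
/// 64 bits; values round-trip through their base-10 string representation.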
struct U128SqlWrapper(u128);

impl ToSql for U128SqlWrapper {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        let string = self.0.to_string();
        Ok(rusqlite::types::ToSqlOutput::from(string))
    }
}

impl FromSql for U128SqlWrapper {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        match value {
            ValueRef::Text(i) => {
                let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
                let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
                Ok(U128SqlWrapper(integer))
            }
            _ => Err(FromSqlError::InvalidType),
        }
    }
}

#[cfg(test)]
mod tests {

    use crate::SqliteStorage;

    #[tokio::test]
    async fn test_sqlite_storage() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_sqlite_storage(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_unclaimed_deposits_crud() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_deposits").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_deposit_refunds() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_refund_tx").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_type_filtering() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_type_filter").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_status_filtering() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_status_filter").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_details_filtering() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_details_filter").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_timestamp_filtering() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_timestamp_filter").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_spark_htlc_status_filtering() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_htlc_filter").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_spark_htlc_status_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_combined_filters() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_combined_filter").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_combined_filters(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sort_order() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_sort_order").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_sort_order(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_request_metadata() {
        let temp_dir = tempdir::TempDir::new("sqlite_storage_payment_request_metadata").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_payment_request_metadata(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sync_storage() {
        let temp_dir = tempdir::TempDir::new("sqlite_sync_storage").unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();

        crate::persist::tests::test_sqlite_sync_storage(Box::new(storage)).await;
    }
}