1use std::path::{Path, PathBuf};
2
3use macros::async_trait;
4use rusqlite::{
5 Connection, Row, ToSql, Transaction, params,
6 types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
7};
8use rusqlite_migration::{M, Migrations, SchemaVersion};
9
10use crate::{
11 AssetFilter, DepositInfo, ListPaymentsRequest, LnurlPayInfo, LnurlWithdrawInfo, PaymentDetails,
12 PaymentMethod,
13 error::DepositClaimError,
14 persist::{PaymentMetadata, UpdateDepositPayload},
15 sync_storage::{
16 IncomingChange, OutgoingChange, Record, RecordChange, RecordId, SyncStorage,
17 SyncStorageError, UnversionedRecordChange,
18 },
19};
20
21use super::{Payment, Storage, StorageError};
22
/// File name of the SQLite database, resolved relative to the storage directory.
const DEFAULT_DB_FILENAME: &str = "storage.sql";
/// Storage backend persisted in a SQLite database file located in `db_dir`.
pub struct SqliteStorage {
    /// Directory that contains the database file.
    db_dir: PathBuf,
}
28
29impl SqliteStorage {
30 pub fn new(path: &Path) -> Result<Self, StorageError> {
40 let storage = Self {
41 db_dir: path.to_path_buf(),
42 };
43
44 #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
45 std::fs::create_dir_all(path)
46 .map_err(|e| StorageError::InitializationError(e.to_string()))?;
47
48 storage.migrate()?;
49 Ok(storage)
50 }
51
52 pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
53 Ok(Connection::open(self.get_db_path())?)
54 }
55
56 fn get_db_path(&self) -> PathBuf {
57 self.db_dir.join(DEFAULT_DB_FILENAME)
58 }
59
60 fn migrate(&self) -> Result<(), StorageError> {
61 let migrations =
62 Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
63 let mut conn = self.get_connection()?;
64 let previous_version = match migrations.current_version(&conn)? {
65 SchemaVersion::Inside(previous_version) => previous_version.get(),
66 _ => 0,
67 };
68 migrations.to_latest(&mut conn)?;
69
70 if previous_version < 6 {
71 Self::migrate_lnurl_metadata_description(&mut conn)?;
72 }
73
74 Ok(())
75 }
76
77 fn migrate_lnurl_metadata_description(conn: &mut Connection) -> Result<(), StorageError> {
78 let mut stmt = conn.prepare("SELECT payment_id, lnurl_pay_info FROM payment_metadata")?;
79 let pay_infos: Vec<_> = stmt
80 .query_map([], |row| {
81 let payment_id: String = row.get(0)?;
82 let lnurl_pay_info: Option<LnurlPayInfo> = row.get(1)?;
83 Ok((payment_id, lnurl_pay_info))
84 })?
85 .collect::<Result<_, _>>()?;
86 let pay_infos = pay_infos
87 .into_iter()
88 .filter_map(|(payment_id, lnurl_pay_info)| {
89 let pay_info = lnurl_pay_info?;
90 let description = pay_info.extract_description()?;
91 Some((payment_id, description))
92 })
93 .collect::<Vec<_>>();
94
95 for pay_info in pay_infos {
96 conn.execute(
97 "UPDATE payment_metadata SET lnurl_description = ? WHERE payment_id = ?",
98 params![pay_info.1, pay_info.0],
99 )?;
100 }
101
102 Ok(())
103 }
104
/// Returns the ordered list of SQL migrations that define the database schema.
///
/// Each entry is one migration step; `migrate` turns every entry into an "up"
/// migration and applies them in this order. The position of an entry is its
/// schema version, so entries must only ever be appended.
#[allow(clippy::too_many_lines)]
pub(crate) fn current_migrations() -> Vec<&'static str> {
    vec![
        // 1: payments table, with details kept in a single `details` column.
        "CREATE TABLE IF NOT EXISTS payments (
 id TEXT PRIMARY KEY,
 payment_type TEXT NOT NULL,
 status TEXT NOT NULL,
 amount INTEGER NOT NULL,
 fees INTEGER NOT NULL,
 timestamp INTEGER NOT NULL,
 details TEXT,
 method TEXT
 );",
        // 2: key/value settings table.
        "CREATE TABLE IF NOT EXISTS settings (
 key TEXT PRIMARY KEY,
 value TEXT NOT NULL
 );",
        // 3: unclaimed deposits, keyed by (txid, vout).
        "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
 txid TEXT NOT NULL,
 vout INTEGER NOT NULL,
 amount_sats INTEGER,
 claim_error TEXT,
 refund_tx TEXT,
 refund_tx_id TEXT,
 PRIMARY KEY (txid, vout)
 );",
        // 4: per-payment metadata.
        "CREATE TABLE IF NOT EXISTS payment_metadata (
 payment_id TEXT PRIMARY KEY,
 lnurl_pay_info TEXT
 );",
        // 5: deposit refunds, keyed by (deposit_tx_id, deposit_vout).
        "CREATE TABLE IF NOT EXISTS deposit_refunds (
 deposit_tx_id TEXT NOT NULL,
 deposit_vout INTEGER NOT NULL,
 refund_tx TEXT NOT NULL,
 refund_tx_id TEXT NOT NULL,
 PRIMARY KEY (deposit_tx_id, deposit_vout)
 );",
        // 6: adds `lnurl_description`; `migrate` backfills it for databases
        // that started below version 6.
        "ALTER TABLE payment_metadata ADD COLUMN lnurl_description TEXT;",
        // 7: moves payment details out of the JSON `details` column into
        // dedicated columns and the `payment_details_lightning` table, then
        // drops `details`.
        "
 ALTER TABLE payments ADD COLUMN withdraw_tx_id TEXT;
 ALTER TABLE payments ADD COLUMN deposit_tx_id TEXT;
 ALTER TABLE payments ADD COLUMN spark INTEGER;
 CREATE TABLE payment_details_lightning (
 payment_id TEXT PRIMARY KEY,
 invoice TEXT NOT NULL,
 payment_hash TEXT NOT NULL,
 destination_pubkey TEXT NOT NULL,
 description TEXT,
 preimage TEXT,
 FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
 );
 INSERT INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
 SELECT id, json_extract(details, '$.Lightning.invoice'), json_extract(details, '$.Lightning.payment_hash'),
 json_extract(details, '$.Lightning.destination_pubkey'), json_extract(details, '$.Lightning.description'),
 json_extract(details, '$.Lightning.preimage')
 FROM payments WHERE json_extract(details, '$.Lightning.invoice') IS NOT NULL;

 UPDATE payments SET withdraw_tx_id = json_extract(details, '$.Withdraw.tx_id')
 WHERE json_extract(details, '$.Withdraw.tx_id') IS NOT NULL;

 UPDATE payments SET deposit_tx_id = json_extract(details, '$.Deposit.tx_id')
 WHERE json_extract(details, '$.Deposit.tx_id') IS NOT NULL;

 ALTER TABLE payments DROP COLUMN details;

 CREATE INDEX idx_payment_details_lightning_invoice ON payment_details_lightning(invoice);
 ",
        // 8: token payment details.
        "CREATE TABLE payment_details_token (
 payment_id TEXT PRIMARY KEY,
 metadata TEXT NOT NULL,
 tx_hash TEXT NOT NULL,
 FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
 );",
        // 9-12: rebuild `payments` with `amount` and `fees` stored as TEXT
        // (create the new table, copy rows with a cast, drop the old table,
        // rename the new one).
        "CREATE TABLE payments_new (
 id TEXT PRIMARY KEY,
 payment_type TEXT NOT NULL,
 status TEXT NOT NULL,
 amount TEXT NOT NULL,
 fees TEXT NOT NULL,
 timestamp INTEGER NOT NULL,
 method TEXT,
 withdraw_tx_id TEXT,
 deposit_tx_id TEXT,
 spark INTEGER
 );",
        "INSERT INTO payments_new (id, payment_type, status, amount, fees, timestamp, method, withdraw_tx_id, deposit_tx_id, spark)
 SELECT id, payment_type, status, CAST(amount AS TEXT), CAST(fees AS TEXT), timestamp, method, withdraw_tx_id, deposit_tx_id, spark
 FROM payments;",
        "DROP TABLE payments;",
        "ALTER TABLE payments_new RENAME TO payments;",
        // 13: spark payment details table, plus invoice details for token payments.
        "CREATE TABLE payment_details_spark (
 payment_id TEXT NOT NULL PRIMARY KEY,
 invoice_details TEXT NOT NULL,
 FOREIGN KEY (payment_id) REFERENCES payments(id) ON DELETE CASCADE
 );
 ALTER TABLE payment_details_token ADD COLUMN invoice_details TEXT;",
        // 14: LNURL-withdraw info in the payment metadata.
        "ALTER TABLE payment_metadata ADD COLUMN lnurl_withdraw_info TEXT;",
        // 15: sync bookkeeping: a single-row revision counter, the queue of
        // outgoing changes and the current state per record.
        "CREATE TABLE sync_revision (
 revision INTEGER NOT NULL DEFAULT 0
 );
 INSERT INTO sync_revision (revision) VALUES (0);
 CREATE TABLE sync_outgoing(
 record_type TEXT NOT NULL,
 data_id TEXT NOT NULL,
 schema_version TEXT NOT NULL,
 commit_time INTEGER NOT NULL,
 updated_fields_json TEXT NOT NULL,
 revision INTEGER NOT NULL
 );
 CREATE INDEX idx_sync_outgoing_data_id_record_type ON sync_outgoing(record_type, data_id);
 CREATE TABLE sync_state(
 record_type TEXT NOT NULL,
 data_id TEXT NOT NULL,
 schema_version TEXT NOT NULL,
 commit_time INTEGER NOT NULL,
 data TEXT NOT NULL,
 revision INTEGER NOT NULL,
 PRIMARY KEY(record_type, data_id)
 );",
        // 16: queue of incoming records, keyed by record and revision.
        "CREATE TABLE sync_incoming(
 record_type TEXT NOT NULL,
 data_id TEXT NOT NULL,
 schema_version TEXT NOT NULL,
 commit_time INTEGER NOT NULL,
 data TEXT NOT NULL,
 revision INTEGER NOT NULL,
 PRIMARY KEY(record_type, data_id, revision)
 );
 CREATE INDEX idx_sync_incoming_revision ON sync_incoming(revision);",
    ]
}
237}
238
239impl From<rusqlite::Error> for StorageError {
240 fn from(value: rusqlite::Error) -> Self {
241 StorageError::Implementation(value.to_string())
242 }
243}
244
245impl From<rusqlite_migration::Error> for StorageError {
246 fn from(value: rusqlite_migration::Error) -> Self {
247 StorageError::Implementation(value.to_string())
248 }
249}
250
251#[async_trait]
252impl Storage for SqliteStorage {
253 #[allow(clippy::too_many_lines)]
254 async fn list_payments(
255 &self,
256 request: ListPaymentsRequest,
257 ) -> Result<Vec<Payment>, StorageError> {
258 let connection = self.get_connection()?;
259
260 let mut where_clauses = Vec::new();
262 let mut params: Vec<Box<dyn ToSql>> = Vec::new();
263
264 if let Some(ref type_filter) = request.type_filter
266 && !type_filter.is_empty()
267 {
268 let placeholders = type_filter
269 .iter()
270 .map(|_| "?")
271 .collect::<Vec<_>>()
272 .join(", ");
273 where_clauses.push(format!("p.payment_type IN ({placeholders})"));
274 for payment_type in type_filter {
275 params.push(Box::new(payment_type.to_string()));
276 }
277 }
278
279 if let Some(ref status_filter) = request.status_filter
281 && !status_filter.is_empty()
282 {
283 let placeholders = status_filter
284 .iter()
285 .map(|_| "?")
286 .collect::<Vec<_>>()
287 .join(", ");
288 where_clauses.push(format!("p.status IN ({placeholders})"));
289 for status in status_filter {
290 params.push(Box::new(status.to_string()));
291 }
292 }
293
294 if let Some(from_timestamp) = request.from_timestamp {
296 where_clauses.push("p.timestamp >= ?".to_string());
297 params.push(Box::new(from_timestamp));
298 }
299
300 if let Some(to_timestamp) = request.to_timestamp {
301 where_clauses.push("p.timestamp < ?".to_string());
302 params.push(Box::new(to_timestamp));
303 }
304
305 if let Some(ref asset_filter) = request.asset_filter {
307 match asset_filter {
308 AssetFilter::Bitcoin => {
309 where_clauses.push("t.metadata IS NULL".to_string());
310 }
311 AssetFilter::Token { token_identifier } => {
312 where_clauses.push("t.metadata IS NOT NULL".to_string());
313 if let Some(identifier) = token_identifier {
314 where_clauses
316 .push("json_extract(t.metadata, '$.identifier') = ?".to_string());
317 params.push(Box::new(identifier.clone()));
318 }
319 }
320 }
321 }
322
323 let where_sql = if where_clauses.is_empty() {
325 String::new()
326 } else {
327 format!("WHERE {}", where_clauses.join(" AND "))
328 };
329
330 let order_direction = if request.sort_ascending.unwrap_or(false) {
332 "ASC"
333 } else {
334 "DESC"
335 };
336
337 let query = format!(
338 "SELECT p.id
339 , p.payment_type
340 , p.status
341 , p.amount
342 , p.fees
343 , p.timestamp
344 , p.method
345 , p.withdraw_tx_id
346 , p.deposit_tx_id
347 , p.spark
348 , l.invoice AS lightning_invoice
349 , l.payment_hash AS lightning_payment_hash
350 , l.destination_pubkey AS lightning_destination_pubkey
351 , COALESCE(l.description, pm.lnurl_description) AS lightning_description
352 , l.preimage AS lightning_preimage
353 , pm.lnurl_pay_info
354 , pm.lnurl_withdraw_info
355 , t.metadata AS token_metadata
356 , t.tx_hash AS token_tx_hash
357 , t.invoice_details AS token_invoice_details
358 , s.invoice_details AS spark_invoice_details
359 FROM payments p
360 LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
361 LEFT JOIN payment_details_token t ON p.id = t.payment_id
362 LEFT JOIN payment_details_spark s ON p.id = s.payment_id
363 LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
364 {}
365 ORDER BY p.timestamp {}
366 LIMIT {} OFFSET {}",
367 where_sql,
368 order_direction,
369 request.limit.unwrap_or(u32::MAX),
370 request.offset.unwrap_or(0)
371 );
372
373 let mut stmt = connection.prepare(&query)?;
374 let param_refs: Vec<&dyn ToSql> = params.iter().map(std::convert::AsRef::as_ref).collect();
375 let payments = stmt
376 .query_map(param_refs.as_slice(), map_payment)?
377 .collect::<Result<Vec<_>, _>>()?;
378 Ok(payments)
379 }
380
381 async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
382 let mut connection = self.get_connection()?;
383 let tx = connection.transaction()?;
384 tx.execute(
385 "INSERT OR REPLACE INTO payments (id, payment_type, status, amount, fees, timestamp, method)
386 VALUES (?, ?, ?, ?, ?, ?, ?)",
387 params![
388 payment.id,
389 payment.payment_type.to_string(),
390 payment.status.to_string(),
391 U128SqlWrapper(payment.amount),
392 U128SqlWrapper(payment.fees),
393 payment.timestamp,
394 payment.method,
395 ],
396 )?;
397
398 match payment.details {
399 Some(PaymentDetails::Withdraw { tx_id }) => {
400 tx.execute(
401 "UPDATE payments SET withdraw_tx_id = ? WHERE id = ?",
402 params![tx_id, payment.id],
403 )?;
404 }
405 Some(PaymentDetails::Deposit { tx_id }) => {
406 tx.execute(
407 "UPDATE payments SET deposit_tx_id = ? WHERE id = ?",
408 params![tx_id, payment.id],
409 )?;
410 }
411 Some(PaymentDetails::Spark { invoice_details }) => {
412 tx.execute(
413 "UPDATE payments SET spark = 1 WHERE id = ?",
414 params![payment.id],
415 )?;
416 if let Some(invoice_details) = invoice_details {
417 tx.execute("INSERT OR REPLACE INTO payment_details_spark (payment_id, invoice_details) VALUES (?, ?)",
418 params![payment.id, serde_json::to_string(&invoice_details)?],
419 )?;
420 }
421 }
422 Some(PaymentDetails::Token {
423 metadata,
424 tx_hash,
425 invoice_details,
426 }) => {
427 tx.execute(
428 "INSERT OR REPLACE INTO payment_details_token (payment_id, metadata, tx_hash, invoice_details) VALUES (?, ?, ?, ?)",
429 params![payment.id, serde_json::to_string(&metadata)?, tx_hash, invoice_details.map(|d| serde_json::to_string(&d)).transpose()?],
430 )?;
431 }
432 Some(PaymentDetails::Lightning {
433 invoice,
434 payment_hash,
435 destination_pubkey,
436 description,
437 preimage,
438 ..
439 }) => {
440 tx.execute(
441 "INSERT OR REPLACE INTO payment_details_lightning (payment_id, invoice, payment_hash, destination_pubkey, description, preimage)
442 VALUES (?, ?, ?, ?, ?, ?)",
443 params![
444 payment.id,
445 invoice,
446 payment_hash,
447 destination_pubkey,
448 description,
449 preimage,
450 ],
451 )?;
452 }
453 None => {}
454 }
455
456 tx.commit()?;
457 Ok(())
458 }
459
460 async fn set_payment_metadata(
461 &self,
462 payment_id: String,
463 metadata: PaymentMetadata,
464 ) -> Result<(), StorageError> {
465 let connection = self.get_connection()?;
466
467 connection.execute(
468 "INSERT OR REPLACE INTO payment_metadata (payment_id, lnurl_pay_info, lnurl_withdraw_info, lnurl_description) VALUES (?, ?, ?, ?)",
469 params![payment_id, metadata.lnurl_pay_info, metadata.lnurl_withdraw_info, metadata.lnurl_description],
470 )?;
471
472 Ok(())
473 }
474
475 async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
476 let connection = self.get_connection()?;
477
478 connection.execute(
479 "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
480 params![key, value],
481 )?;
482
483 Ok(())
484 }
485
486 async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
487 let connection = self.get_connection()?;
488
489 let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
490
491 let result = stmt.query_row(params![key], |row| {
492 let value_str: String = row.get(0)?;
493 Ok(value_str)
494 });
495
496 match result {
497 Ok(value) => Ok(Some(value)),
498 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
499 Err(e) => Err(e.into()),
500 }
501 }
502
503 async fn delete_cached_item(&self, key: String) -> Result<(), StorageError> {
504 let connection = self.get_connection()?;
505
506 connection.execute("DELETE FROM settings WHERE key = ?", params![key])?;
507
508 Ok(())
509 }
510
511 async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
512 let connection = self.get_connection()?;
513
514 let mut stmt = connection.prepare(
515 "SELECT p.id
516 , p.payment_type
517 , p.status
518 , p.amount
519 , p.fees
520 , p.timestamp
521 , p.method
522 , p.withdraw_tx_id
523 , p.deposit_tx_id
524 , p.spark
525 , l.invoice AS lightning_invoice
526 , l.payment_hash AS lightning_payment_hash
527 , l.destination_pubkey AS lightning_destination_pubkey
528 , COALESCE(l.description, pm.lnurl_description) AS lightning_description
529 , l.preimage AS lightning_preimage
530 , pm.lnurl_pay_info
531 , pm.lnurl_withdraw_info
532 , t.metadata AS token_metadata
533 , t.tx_hash AS token_tx_hash
534 , t.invoice_details AS token_invoice_details
535 , s.invoice_details AS spark_invoice_details
536 FROM payments p
537 LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
538 LEFT JOIN payment_details_token t ON p.id = t.payment_id
539 LEFT JOIN payment_details_spark s ON p.id = s.payment_id
540 LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
541 WHERE p.id = ?",
542 )?;
543
544 let payment = stmt.query_row(params![id], map_payment)?;
545 Ok(payment)
546 }
547
548 async fn get_payment_by_invoice(
549 &self,
550 invoice: String,
551 ) -> Result<Option<Payment>, StorageError> {
552 let connection = self.get_connection()?;
553
554 let mut stmt = connection.prepare(
555 "SELECT p.id
556 , p.payment_type
557 , p.status
558 , p.amount
559 , p.fees
560 , p.timestamp
561 , p.method
562 , p.withdraw_tx_id
563 , p.deposit_tx_id
564 , p.spark
565 , l.invoice AS lightning_invoice
566 , l.payment_hash AS lightning_payment_hash
567 , l.destination_pubkey AS lightning_destination_pubkey
568 , COALESCE(l.description, pm.lnurl_description) AS lightning_description
569 , l.preimage AS lightning_preimage
570 , pm.lnurl_pay_info
571 , pm.lnurl_withdraw_info
572 , t.metadata AS token_metadata
573 , t.tx_hash AS token_tx_hash
574 , t.invoice_details AS token_invoice_details
575 , s.invoice_details AS spark_invoice_details
576 FROM payments p
577 LEFT JOIN payment_details_lightning l ON p.id = l.payment_id
578 LEFT JOIN payment_details_token t ON p.id = t.payment_id
579 LEFT JOIN payment_details_spark s ON p.id = s.payment_id
580 LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
581 WHERE l.invoice = ?",
582 )?;
583
584 let payment = stmt.query_row(params![invoice], map_payment);
585 match payment {
586 Ok(payment) => Ok(Some(payment)),
587 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
588 Err(e) => Err(e.into()),
589 }
590 }
591
592 async fn add_deposit(
593 &self,
594 txid: String,
595 vout: u32,
596 amount_sats: u64,
597 ) -> Result<(), StorageError> {
598 let connection = self.get_connection()?;
599 connection.execute(
600 "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats)
601 VALUES (?, ?, ?)",
602 params![txid, vout, amount_sats,],
603 )?;
604 Ok(())
605 }
606
607 async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
608 let connection = self.get_connection()?;
609 connection.execute(
610 "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
611 params![txid, vout],
612 )?;
613 Ok(())
614 }
615
616 async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
617 let connection = self.get_connection()?;
618 let mut stmt =
619 connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
620 let rows = stmt.query_map(params![], |row| {
621 Ok(DepositInfo {
622 txid: row.get(0)?,
623 vout: row.get(1)?,
624 amount_sats: row.get(2)?,
625 claim_error: row.get(3)?,
626 refund_tx: row.get(4)?,
627 refund_tx_id: row.get(5)?,
628 })
629 })?;
630 let mut deposits = Vec::new();
631 for row in rows {
632 deposits.push(row?);
633 }
634 Ok(deposits)
635 }
636
637 async fn update_deposit(
638 &self,
639 txid: String,
640 vout: u32,
641 payload: UpdateDepositPayload,
642 ) -> Result<(), StorageError> {
643 let connection = self.get_connection()?;
644 match payload {
645 UpdateDepositPayload::ClaimError { error } => {
646 connection.execute(
647 "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
648 params![error, txid, vout],
649 )?;
650 }
651 UpdateDepositPayload::Refund {
652 refund_txid,
653 refund_tx,
654 } => {
655 connection.execute(
656 "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
657 params![refund_tx, refund_txid, txid, vout],
658 )?;
659 }
660 }
661 Ok(())
662 }
663}
664
665fn get_next_revision(tx: &Transaction<'_>) -> Result<u64, SyncStorageError> {
667 let revision = tx
668 .query_row(
669 "UPDATE sync_revision
670 SET revision = revision + 1
671 RETURNING revision",
672 [],
673 |row| row.get(0),
674 )
675 .map_err(map_sqlite_error)?;
676 Ok(revision)
677}
678
679impl From<StorageError> for SyncStorageError {
680 fn from(value: StorageError) -> Self {
681 match value {
682 StorageError::Implementation(s) => SyncStorageError::Implementation(s),
683 StorageError::InitializationError(s) => SyncStorageError::InitializationError(s),
684 StorageError::Serialization(s) => SyncStorageError::Serialization(s),
685 }
686 }
687}
688
689#[macros::async_trait]
690impl SyncStorage for SqliteStorage {
691 async fn add_outgoing_change(
692 &self,
693 record: UnversionedRecordChange,
694 ) -> Result<u64, SyncStorageError> {
695 let mut connection = self.get_connection()?;
696 let tx = connection.transaction().map_err(map_sqlite_error)?;
697 let revision = get_next_revision(&tx)?;
698
699 tx.execute(
700 "INSERT INTO sync_outgoing (
701 record_type
702 , data_id
703 , schema_version
704 , commit_time
705 , updated_fields_json
706 , revision
707 )
708 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
709 params![
710 record.id.r#type,
711 record.id.data_id,
712 record.schema_version.clone(),
713 serde_json::to_string(&record.updated_fields)?,
714 revision,
715 ],
716 )
717 .map_err(map_sqlite_error)?;
718
719 tx.commit().map_err(map_sqlite_error)?;
720 Ok(revision)
721 }
722
723 async fn complete_outgoing_sync(&self, record: Record) -> Result<(), SyncStorageError> {
724 let mut connection = self.get_connection()?;
725 let tx = connection.transaction().map_err(map_sqlite_error)?;
726
727 tx.execute(
728 "DELETE FROM sync_outgoing WHERE record_type = ? AND data_id = ? AND revision = ?",
729 params![record.id.r#type, record.id.data_id, record.revision],
730 )
731 .map_err(map_sqlite_error)?;
732
733 tx.execute(
734 "INSERT OR REPLACE INTO sync_state (
735 record_type
736 , data_id
737 , schema_version
738 , commit_time
739 , data
740 , revision
741 )
742 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
743 params![
744 record.id.r#type,
745 record.id.data_id,
746 record.schema_version.clone(),
747 serde_json::to_string(&record.data)?,
748 record.revision,
749 ],
750 )
751 .map_err(map_sqlite_error)?;
752
753 tx.commit().map_err(map_sqlite_error)?;
754 Ok(())
755 }
756
757 async fn get_pending_outgoing_changes(
758 &self,
759 limit: u32,
760 ) -> Result<Vec<OutgoingChange>, SyncStorageError> {
761 let connection = self.get_connection()?;
762
763 let mut stmt = connection
764 .prepare(
765 "SELECT o.record_type
766 , o.data_id
767 , o.schema_version
768 , o.commit_time
769 , o.updated_fields_json
770 , o.revision
771 , e.schema_version AS existing_schema_version
772 , e.commit_time AS existing_commit_time
773 , e.data AS existing_data
774 , e.revision AS existing_revision
775 FROM sync_outgoing o
776 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
777 ORDER BY o.revision ASC
778 LIMIT ?",
779 )
780 .map_err(map_sqlite_error)?;
781 let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
782 let mut results = Vec::new();
783 while let Some(row) = rows.next().map_err(map_sqlite_error)? {
784 let parent = if let Some(existing_data) =
785 row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
786 {
787 Some(Record {
788 id: RecordId::new(
789 row.get::<_, String>(0).map_err(map_sqlite_error)?,
790 row.get::<_, String>(1).map_err(map_sqlite_error)?,
791 ),
792 schema_version: row.get(6).map_err(map_sqlite_error)?,
793 revision: row.get(9).map_err(map_sqlite_error)?,
794 data: serde_json::from_str(&existing_data)?,
795 })
796 } else {
797 None
798 };
799 let change = RecordChange {
800 id: RecordId::new(
801 row.get::<_, String>(0).map_err(map_sqlite_error)?,
802 row.get::<_, String>(1).map_err(map_sqlite_error)?,
803 ),
804 schema_version: row.get(2).map_err(map_sqlite_error)?,
805 updated_fields: serde_json::from_str(
806 &row.get::<_, String>(4).map_err(map_sqlite_error)?,
807 )?,
808 revision: row.get(5).map_err(map_sqlite_error)?,
809 };
810 results.push(OutgoingChange { change, parent });
811 }
812
813 Ok(results)
814 }
815
816 async fn get_last_revision(&self) -> Result<u64, SyncStorageError> {
817 let connection = self.get_connection()?;
818
819 let mut stmt = connection
821 .prepare("SELECT COALESCE(MAX(revision), 0) FROM sync_state")
822 .map_err(map_sqlite_error)?;
823
824 let revision: u64 = stmt
825 .query_row([], |row| row.get(0))
826 .map_err(map_sqlite_error)?;
827
828 Ok(revision)
829 }
830
831 async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), SyncStorageError> {
832 if records.is_empty() {
833 return Ok(());
834 }
835
836 let mut connection = self.get_connection()?;
837 let tx = connection.transaction().map_err(map_sqlite_error)?;
838
839 for record in records {
840 tx.execute(
841 "INSERT OR REPLACE INTO sync_incoming (
842 record_type
843 , data_id
844 , schema_version
845 , commit_time
846 , data
847 , revision
848 )
849 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
850 params![
851 record.id.r#type,
852 record.id.data_id,
853 record.schema_version.clone(),
854 serde_json::to_string(&record.data)?,
855 record.revision,
856 ],
857 )
858 .map_err(map_sqlite_error)?;
859 }
860
861 tx.commit().map_err(map_sqlite_error)?;
862 Ok(())
863 }
864
865 async fn delete_incoming_record(&self, record: Record) -> Result<(), SyncStorageError> {
866 let connection = self.get_connection()?;
867
868 connection
869 .execute(
870 "DELETE FROM sync_incoming WHERE record_type = ? AND data_id = ? AND revision = ?",
871 params![record.id.r#type, record.id.data_id, record.revision],
872 )
873 .map_err(map_sqlite_error)?;
874
875 Ok(())
876 }
877
878 async fn rebase_pending_outgoing_records(&self, revision: u64) -> Result<(), SyncStorageError> {
879 let mut connection = self.get_connection()?;
880 let tx = connection.transaction().map_err(map_sqlite_error)?;
881
882 let last_revision = tx
883 .query_row(
884 "SELECT COALESCE(MAX(revision), 0) FROM sync_state",
885 [],
886 |row| row.get(0),
887 )
888 .map_err(map_sqlite_error)?;
889
890 let diff = revision.saturating_sub(last_revision);
891
892 tx.execute(
894 "UPDATE sync_outgoing
895 SET revision = revision + ?",
896 params![diff],
897 )
898 .map_err(map_sqlite_error)?;
899
900 tx.commit().map_err(map_sqlite_error)?;
901 Ok(())
902 }
903
904 async fn get_incoming_records(
905 &self,
906 limit: u32,
907 ) -> Result<Vec<IncomingChange>, SyncStorageError> {
908 let connection = self.get_connection()?;
909
910 let mut stmt = connection
911 .prepare(
912 "SELECT i.record_type
913 , i.data_id
914 , i.schema_version
915 , i.data
916 , i.revision
917 , e.schema_version AS existing_schema_version
918 , e.commit_time AS existing_commit_time
919 , e.data AS existing_data
920 , e.revision AS existing_revision
921 FROM sync_incoming i
922 LEFT JOIN sync_state e ON i.record_type = e.record_type AND i.data_id = e.data_id
923 ORDER BY i.revision ASC
924 LIMIT ?",
925 )
926 .map_err(map_sqlite_error)?;
927
928 let mut rows = stmt.query(params![limit]).map_err(map_sqlite_error)?;
929 let mut results = Vec::new();
930
931 while let Some(row) = rows.next().map_err(map_sqlite_error)? {
932 let parent = if let Some(existing_data) =
933 row.get::<_, Option<String>>(7).map_err(map_sqlite_error)?
934 {
935 Some(Record {
936 id: RecordId::new(
937 row.get::<_, String>(0).map_err(map_sqlite_error)?,
938 row.get::<_, String>(1).map_err(map_sqlite_error)?,
939 ),
940 schema_version: row.get(5).map_err(map_sqlite_error)?,
941 revision: row.get(8).map_err(map_sqlite_error)?,
942 data: serde_json::from_str(&existing_data)?,
943 })
944 } else {
945 None
946 };
947 let record = Record {
948 id: RecordId::new(
949 row.get::<_, String>(0).map_err(map_sqlite_error)?,
950 row.get::<_, String>(1).map_err(map_sqlite_error)?,
951 ),
952 schema_version: row.get(2).map_err(map_sqlite_error)?,
953 data: serde_json::from_str(&row.get::<_, String>(3).map_err(map_sqlite_error)?)?,
954 revision: row.get(4).map_err(map_sqlite_error)?,
955 };
956 results.push(IncomingChange {
957 new_state: record,
958 old_state: parent,
959 });
960 }
961
962 Ok(results)
963 }
964
965 async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, SyncStorageError> {
966 let connection = self.get_connection()?;
967
968 let mut stmt = connection
969 .prepare(
970 "SELECT o.record_type
971 , o.data_id
972 , o.schema_version
973 , o.commit_time
974 , o.updated_fields_json
975 , o.revision
976 , e.schema_version AS existing_schema_version
977 , e.commit_time AS existing_commit_time
978 , e.data AS existing_data
979 , e.revision AS existing_revision
980 FROM sync_outgoing o
981 LEFT JOIN sync_state e ON o.record_type = e.record_type AND o.data_id = e.data_id
982 ORDER BY o.revision DESC
983 LIMIT 1",
984 )
985 .map_err(map_sqlite_error)?;
986
987 let mut rows = stmt.query([]).map_err(map_sqlite_error)?;
988
989 if let Some(row) = rows.next().map_err(map_sqlite_error)? {
990 let parent = if let Some(existing_data) =
991 row.get::<_, Option<String>>(8).map_err(map_sqlite_error)?
992 {
993 Some(Record {
994 id: RecordId::new(
995 row.get::<_, String>(0).map_err(map_sqlite_error)?,
996 row.get::<_, String>(1).map_err(map_sqlite_error)?,
997 ),
998 schema_version: row.get(6).map_err(map_sqlite_error)?,
999 revision: row.get(9).map_err(map_sqlite_error)?,
1000 data: serde_json::from_str(&existing_data)?,
1001 })
1002 } else {
1003 None
1004 };
1005 let change = RecordChange {
1006 id: RecordId::new(
1007 row.get::<_, String>(0).map_err(map_sqlite_error)?,
1008 row.get::<_, String>(1).map_err(map_sqlite_error)?,
1009 ),
1010 schema_version: row.get(2).map_err(map_sqlite_error)?,
1011 updated_fields: serde_json::from_str(
1012 &row.get::<_, String>(4).map_err(map_sqlite_error)?,
1013 )?,
1014 revision: row.get(5).map_err(map_sqlite_error)?,
1015 };
1016
1017 return Ok(Some(OutgoingChange { change, parent }));
1018 }
1019
1020 Ok(None)
1021 }
1022
1023 async fn update_record_from_incoming(&self, record: Record) -> Result<(), SyncStorageError> {
1024 let connection = self.get_connection()?;
1025
1026 connection
1027 .execute(
1028 "INSERT OR REPLACE INTO sync_state (
1029 record_type
1030 , data_id
1031 , schema_version
1032 , commit_time
1033 , data
1034 , revision
1035 )
1036 VALUES (?, ?, ?, strftime('%s','now'), ?, ?)",
1037 params![
1038 record.id.r#type,
1039 record.id.data_id,
1040 record.schema_version.clone(),
1041 serde_json::to_string(&record.data)?,
1042 record.revision,
1043 ],
1044 )
1045 .map_err(map_sqlite_error)?;
1046
1047 Ok(())
1048 }
1049}
1050
1051#[allow(clippy::needless_pass_by_value)]
1052fn map_sqlite_error(value: rusqlite::Error) -> SyncStorageError {
1053 SyncStorageError::Implementation(value.to_string())
1054}
1055
1056fn map_payment(row: &Row<'_>) -> Result<Payment, rusqlite::Error> {
1057 let withdraw_tx_id: Option<String> = row.get(7)?;
1058 let deposit_tx_id: Option<String> = row.get(8)?;
1059 let spark: Option<i32> = row.get(9)?;
1060 let lightning_invoice: Option<String> = row.get(10)?;
1061 let token_metadata: Option<String> = row.get(17)?;
1062 let details = match (
1063 lightning_invoice,
1064 withdraw_tx_id,
1065 deposit_tx_id,
1066 spark,
1067 token_metadata,
1068 ) {
1069 (Some(invoice), _, _, _, _) => {
1070 let payment_hash: String = row.get(11)?;
1071 let destination_pubkey: String = row.get(12)?;
1072 let description: Option<String> = row.get(13)?;
1073 let preimage: Option<String> = row.get(14)?;
1074 let lnurl_pay_info: Option<LnurlPayInfo> = row.get(15)?;
1075 let lnurl_withdraw_info: Option<LnurlWithdrawInfo> = row.get(16)?;
1076
1077 Some(PaymentDetails::Lightning {
1078 invoice,
1079 payment_hash,
1080 destination_pubkey,
1081 description,
1082 preimage,
1083 lnurl_pay_info,
1084 lnurl_withdraw_info,
1085 })
1086 }
1087 (_, Some(tx_id), _, _, _) => Some(PaymentDetails::Withdraw { tx_id }),
1088 (_, _, Some(tx_id), _, _) => Some(PaymentDetails::Deposit { tx_id }),
1089 (_, _, _, Some(_), _) => {
1090 let invoice_details_str: Option<String> = row.get(20)?;
1091 let invoice_details = invoice_details_str
1092 .map(|s| {
1093 serde_json::from_str(&s).map_err(|e| {
1094 rusqlite::Error::FromSqlConversionFailure(
1095 20,
1096 rusqlite::types::Type::Text,
1097 e.into(),
1098 )
1099 })
1100 })
1101 .transpose()?;
1102 Some(PaymentDetails::Spark { invoice_details })
1103 }
1104 (_, _, _, _, Some(metadata)) => {
1105 let invoice_details_str: Option<String> = row.get(19)?;
1106 let invoice_details = invoice_details_str
1107 .map(|s| {
1108 serde_json::from_str(&s).map_err(|e| {
1109 rusqlite::Error::FromSqlConversionFailure(
1110 19,
1111 rusqlite::types::Type::Text,
1112 e.into(),
1113 )
1114 })
1115 })
1116 .transpose()?;
1117 Some(PaymentDetails::Token {
1118 metadata: serde_json::from_str(&metadata).map_err(|e| {
1119 rusqlite::Error::FromSqlConversionFailure(
1120 17,
1121 rusqlite::types::Type::Text,
1122 e.into(),
1123 )
1124 })?,
1125 tx_hash: row.get(18)?,
1126 invoice_details,
1127 })
1128 }
1129 _ => None,
1130 };
1131 Ok(Payment {
1132 id: row.get(0)?,
1133 payment_type: row.get::<_, String>(1)?.parse().map_err(|e: String| {
1134 rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, e.into())
1135 })?,
1136 status: row.get::<_, String>(2)?.parse().map_err(|e: String| {
1137 rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, e.into())
1138 })?,
1139 amount: row.get::<_, U128SqlWrapper>(3)?.0,
1140 fees: row.get::<_, U128SqlWrapper>(4)?.0,
1141 timestamp: row.get(5)?,
1142 details,
1143 method: row.get(6)?,
1144 })
1145}
1146
1147impl ToSql for PaymentDetails {
1148 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1149 to_sql_json(self)
1150 }
1151}
1152
1153impl FromSql for PaymentDetails {
1154 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1155 from_sql_json(value)
1156 }
1157}
1158
1159impl ToSql for PaymentMethod {
1160 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1161 Ok(rusqlite::types::ToSqlOutput::from(self.to_string()))
1162 }
1163}
1164
1165impl FromSql for PaymentMethod {
1166 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1167 match value {
1168 ValueRef::Text(i) => {
1169 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1170 let payment_method: PaymentMethod = s
1172 .trim_matches('"')
1173 .to_lowercase()
1174 .parse()
1175 .map_err(|()| FromSqlError::InvalidType)?;
1176 Ok(payment_method)
1177 }
1178 _ => Err(FromSqlError::InvalidType),
1179 }
1180 }
1181}
1182
1183impl ToSql for DepositClaimError {
1184 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1185 to_sql_json(self)
1186 }
1187}
1188
1189impl FromSql for DepositClaimError {
1190 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1191 from_sql_json(value)
1192 }
1193}
1194
1195impl ToSql for LnurlPayInfo {
1196 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1197 to_sql_json(self)
1198 }
1199}
1200
1201impl FromSql for LnurlPayInfo {
1202 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1203 from_sql_json(value)
1204 }
1205}
1206
1207impl ToSql for LnurlWithdrawInfo {
1208 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1209 to_sql_json(self)
1210 }
1211}
1212
1213impl FromSql for LnurlWithdrawInfo {
1214 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1215 from_sql_json(value)
1216 }
1217}
1218
1219fn to_sql_json<T>(value: T) -> rusqlite::Result<ToSqlOutput<'static>>
1220where
1221 T: serde::Serialize,
1222{
1223 let json = serde_json::to_string(&value)
1224 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
1225 Ok(rusqlite::types::ToSqlOutput::from(json))
1226}
1227
1228fn from_sql_json<T>(value: ValueRef<'_>) -> FromSqlResult<T>
1229where
1230 T: serde::de::DeserializeOwned,
1231{
1232 match value {
1233 ValueRef::Text(i) => {
1234 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1235 let deserialized: T = serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
1236 Ok(deserialized)
1237 }
1238 _ => Err(FromSqlError::InvalidType),
1239 }
1240}
1241
1242struct U128SqlWrapper(u128);
1243
1244impl ToSql for U128SqlWrapper {
1245 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
1246 let string = self.0.to_string();
1247 Ok(rusqlite::types::ToSqlOutput::from(string))
1248 }
1249}
1250
1251impl FromSql for U128SqlWrapper {
1252 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
1253 match value {
1254 ValueRef::Text(i) => {
1255 let s = std::str::from_utf8(i).map_err(|e| FromSqlError::Other(Box::new(e)))?;
1256 let integer = s.parse::<u128>().map_err(|_| FromSqlError::InvalidType)?;
1257 Ok(U128SqlWrapper(integer))
1258 }
1259 _ => Err(FromSqlError::InvalidType),
1260 }
1261 }
1262}
1263
#[cfg(test)]
mod tests {

    use crate::SqliteStorage;

    /// Creates a `SqliteStorage` inside a fresh temporary directory named after `prefix`.
    ///
    /// The `TempDir` guard is returned alongside the storage and must be kept alive for the
    /// duration of the test, since dropping it removes the directory.
    fn temp_storage(prefix: &str) -> (tempdir::TempDir, SqliteStorage) {
        let temp_dir = tempdir::TempDir::new(prefix).unwrap();
        let storage = SqliteStorage::new(temp_dir.path()).unwrap();
        (temp_dir, storage)
    }

    #[tokio::test]
    async fn test_sqlite_storage() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage");

        crate::persist::tests::test_sqlite_storage(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_unclaimed_deposits_crud() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage_deposits");

        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_deposit_refunds() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage_refund_tx");

        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_type_filtering() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage_type_filter");

        crate::persist::tests::test_payment_type_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_status_filtering() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage_status_filter");

        crate::persist::tests::test_payment_status_filtering(Box::new(storage)).await;
    }

    // Runs the shared asset-filtering suite.
    #[tokio::test]
    async fn test_payment_details_filtering() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage_details_filter");

        crate::persist::tests::test_asset_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_timestamp_filtering() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage_timestamp_filter");

        crate::persist::tests::test_timestamp_filtering(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_combined_filters() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage_combined_filter");

        crate::persist::tests::test_combined_filters(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sort_order() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage_sort_order");

        crate::persist::tests::test_sort_order(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_payment_request_metadata() {
        let (_temp_dir, storage) = temp_storage("sqlite_storage_payment_request_metadata");

        crate::persist::tests::test_payment_request_metadata(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_sync_storage() {
        let (_temp_dir, storage) = temp_storage("sqlite_sync_storage");

        crate::persist::tests::test_sqlite_sync_storage(Box::new(storage)).await;
    }
}