1mod address;
2pub(crate) mod asset_metadata;
3mod backup;
4pub(crate) mod bolt12_offer;
5pub(crate) mod cache;
6pub(crate) mod chain;
7mod migrations;
8pub(crate) mod model;
9pub(crate) mod receive;
10pub(crate) mod send;
11pub(crate) mod sync;
12pub(crate) mod wallet_updates;
13
14use std::collections::{HashMap, HashSet};
15use std::ops::Not;
16use std::{path::PathBuf, str::FromStr};
17
18use crate::elements::AssetId;
19use crate::model::*;
20use crate::sync::model::RecordType;
21use crate::utils;
22use anyhow::{anyhow, Result};
23use boltz_client::boltz::{ChainPair, ReversePair, SubmarinePair};
24use log::{error, warn};
25use lwk_wollet::WalletTx;
26use migrations::current_migrations;
27use model::{PaymentTxBalance, PaymentTxDetails};
28use rusqlite::backup::Backup;
29use rusqlite::{
30 params, params_from_iter, Connection, OptionalExtension, Row, ToSql, TransactionBehavior,
31};
32use rusqlite_migration::{Migrations, M};
33use tokio::sync::broadcast::{self, Sender};
34
/// File name of the main database, joined onto the persister's directory in `get_db_path`.
const DEFAULT_DB_FILENAME: &str = "storage.sql";
36
/// Handle to the SDK's SQLite storage; connections are opened on demand via `get_connection`.
pub struct Persister {
    /// Directory holding the database file (see `get_db_path`).
    main_db_dir: PathBuf,
    /// Network passed to the migrations and used to resolve the L-BTC asset id.
    network: LiquidNetwork,
    /// Broadcast sender created in `new_inner` only when sync is enabled; `None` otherwise.
    pub(crate) sync_trigger: Option<Sender<()>>,
}
42
43fn get_where_clause_state_in(allowed_states: &[PaymentState]) -> String {
45 format!(
46 "state in ({})",
47 allowed_states
48 .iter()
49 .map(|t| format!("'{}'", *t as i8))
50 .collect::<Vec<_>>()
51 .join(", ")
52 )
53}
54
/// Combines individual predicates into a single SQL `WHERE` clause.
///
/// Returns an empty string when there are no predicates; otherwise returns `WHERE ` followed
/// by the predicates joined with ` AND `.
fn where_clauses_to_string(where_clauses: Vec<String>) -> String {
    if where_clauses.is_empty() {
        return String::new();
    }
    format!("WHERE {}", where_clauses.join(" AND "))
}
63
64impl Persister {
65 #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
67 pub fn new_using_fs(
68 working_dir: &str,
69 network: LiquidNetwork,
70 sync_enabled: bool,
71 asset_metadata: Option<Vec<AssetMetadata>>,
72 ) -> Result<Self> {
73 let main_db_dir = PathBuf::from_str(working_dir)?;
74 if !main_db_dir.exists() {
75 std::fs::create_dir_all(&main_db_dir)?;
76 }
77 Self::new_inner(main_db_dir, network, sync_enabled, asset_metadata, None)
78 }
79
80 #[cfg(all(target_family = "wasm", target_os = "unknown"))]
85 pub fn new_in_memory(
86 database_id: &str,
87 network: LiquidNetwork,
88 sync_enabled: bool,
89 asset_metadata: Option<Vec<AssetMetadata>>,
90 backup_bytes: Option<Vec<u8>>,
91 ) -> Result<Self> {
92 let main_db_dir = PathBuf::from_str(database_id)?;
93 let backup_con = backup_bytes
94 .map(|data| {
95 let size = data.len();
96 let cursor = std::io::Cursor::new(data);
97 let mut conn = Connection::open_in_memory()?;
98 conn.deserialize_read_exact(rusqlite::MAIN_DB, cursor, size, false)?;
99 Ok::<Connection, anyhow::Error>(conn)
100 })
101 .transpose()
102 .unwrap_or_else(|e| {
103 error!("Failed to deserialize backup data: {e} - proceeding without it");
104 None
105 });
106 Self::new_inner(
107 main_db_dir,
108 network,
109 sync_enabled,
110 asset_metadata,
111 backup_con,
112 )
113 }
114
115 fn new_inner(
116 main_db_dir: PathBuf,
117 network: LiquidNetwork,
118 sync_enabled: bool,
119 asset_metadata: Option<Vec<AssetMetadata>>,
120 backup_con: Option<Connection>,
121 ) -> Result<Self> {
122 let mut sync_trigger = None;
123 if sync_enabled {
124 let (events_notifier, _) = broadcast::channel::<()>(1);
125 sync_trigger = Some(events_notifier);
126 }
127
128 let persister = Persister {
129 main_db_dir,
130 network,
131 sync_trigger,
132 };
133
134 if let Some(backup_con) = backup_con {
135 if let Err(e) = (|| {
136 let mut dst_con = persister.get_connection()?;
137 let backup = Backup::new(&backup_con, &mut dst_con)?;
138 backup.step(-1)?;
139 Ok::<(), anyhow::Error>(())
140 })() {
141 error!("Failed to restore from backup: {e} - proceeding without it");
142 }
143 }
144
145 persister.init()?;
146 persister.replace_asset_metadata(asset_metadata)?;
147
148 Ok(persister)
149 }
150
151 fn get_db_path(&self) -> PathBuf {
152 self.main_db_dir.join(DEFAULT_DB_FILENAME)
153 }
154
155 #[cfg(all(target_family = "wasm", target_os = "unknown"))]
160 pub fn clear_in_memory_db(&self) -> Result<()> {
161 rusqlite::ffi::mem_vfs::MemVfsUtil::new().delete_db(
162 self.get_db_path()
163 .to_str()
164 .ok_or(anyhow!("Failed to get db path str"))?,
165 );
166 Ok(())
167 }
168
169 pub(crate) fn get_connection(&self) -> Result<Connection> {
170 Ok(Connection::open(self.get_db_path())?)
171 }
172
173 pub fn init(&self) -> Result<()> {
174 self.migrate_main_db()?;
175 Ok(())
176 }
177
178 #[cfg(all(target_family = "wasm", target_os = "unknown"))]
179 pub fn serialize(&self) -> Result<Vec<u8>> {
180 let con = self.get_connection()?;
181 let db_bytes = con.serialize(rusqlite::MAIN_DB)?;
182 Ok(db_bytes.to_vec())
183 }
184
185 #[cfg(any(test, feature = "test-utils"))]
186 pub(crate) fn get_database_dir(&self) -> &PathBuf {
187 &self.main_db_dir
188 }
189
190 fn migrate_main_db(&self) -> Result<()> {
191 let migrations = Migrations::new(
192 current_migrations(self.network)
193 .into_iter()
194 .map(M::up)
195 .collect(),
196 );
197 let mut conn = self.get_connection()?;
198 migrations.to_latest(&mut conn)?;
199 Ok(())
200 }
201
202 pub(crate) fn fetch_swap_by_id(&self, id: &str) -> Result<Swap> {
203 match self.fetch_send_swap_by_id(id) {
204 Ok(Some(send_swap)) => Ok(Swap::Send(send_swap)),
205 _ => match self.fetch_receive_swap_by_id(id) {
206 Ok(Some(receive_swap)) => Ok(Swap::Receive(receive_swap)),
207 _ => match self.fetch_chain_swap_by_id(id) {
208 Ok(Some(chain_swap)) => Ok(Swap::Chain(chain_swap)),
209 _ => Err(anyhow!("Could not find Swap {id}")),
210 },
211 },
212 }
213 }
214
215 pub(crate) fn insert_or_update_payment_with_wallet_tx(&self, tx: &WalletTx) -> Result<()> {
216 let tx_id = tx.txid.to_string();
217 let is_tx_confirmed = tx.height.is_some();
218
219 let mut tx_balances: HashMap<AssetId, i64> = HashMap::new();
220 for input in &tx.inputs {
221 let Some(input) = input else {
222 continue;
223 };
224 let value = input.unblinded.value as i64;
225 tx_balances
226 .entry(input.unblinded.asset)
227 .and_modify(|v| *v -= value)
228 .or_insert_with(|| -value);
229 }
230 for output in &tx.outputs {
231 let Some(output) = output else {
232 continue;
233 };
234 let value = output.unblinded.value as i64;
235 tx_balances
236 .entry(output.unblinded.asset)
237 .and_modify(|v| *v += value)
238 .or_insert_with(|| value);
239 }
240
241 if tx_balances.is_empty() {
242 warn!("Attempted to persist a payment with no balance: tx_id {tx_id} balances {tx_balances:?}");
243 return Ok(());
244 }
245
246 let lbtc_asset_id = utils::lbtc_asset_id(self.network);
247 let payment_balances: Vec<PaymentTxBalance> = tx_balances
248 .into_iter()
249 .map(|(asset_id, balance)| {
250 let payment_type = match balance >= 0 {
251 true => PaymentType::Receive,
252 false => PaymentType::Send,
253 };
254 let mut amount = balance.unsigned_abs();
255 if payment_type == PaymentType::Send && asset_id == lbtc_asset_id {
256 amount = amount.saturating_sub(tx.fee);
257 }
258 let asset_id = asset_id.to_string();
259 PaymentTxBalance {
260 payment_type,
261 asset_id,
262 amount,
263 }
264 })
265 .collect();
266
267 let maybe_address = tx
268 .outputs
269 .iter()
270 .find(|output| output.is_some())
271 .and_then(|output| {
272 output.clone().and_then(|o| {
273 o.address.blinding_pubkey.map(|blinding_pubkey| {
274 o.address.to_confidential(blinding_pubkey).to_string()
275 })
276 })
277 });
278 let unblinding_data = tx
279 .unblinded_url("")
280 .replace(&format!("tx/{tx_id}#blinded="), "");
281 self.insert_or_update_payment(
282 PaymentTxData {
283 tx_id: tx_id.clone(),
284 timestamp: tx.timestamp,
285 fees_sat: tx.fee,
286 is_confirmed: is_tx_confirmed,
287 unblinding_data: Some(unblinding_data),
288 },
289 &payment_balances,
290 maybe_address.map(|destination| PaymentTxDetails {
291 tx_id,
292 destination,
293 ..Default::default()
294 }),
295 true,
296 )
297 }
298
299 pub(crate) fn list_unconfirmed_payment_txs_data(&self) -> Result<Vec<PaymentTxData>> {
300 let con = self.get_connection()?;
301 let mut stmt = con.prepare(
302 "SELECT tx_id,
303 timestamp,
304 fees_sat,
305 is_confirmed,
306 unblinding_data
307 FROM payment_tx_data
308 WHERE is_confirmed = 0",
309 )?;
310 let payments: Vec<PaymentTxData> = stmt
311 .query_map([], |row| {
312 Ok(PaymentTxData {
313 tx_id: row.get(0)?,
314 timestamp: row.get(1)?,
315 fees_sat: row.get(2)?,
316 is_confirmed: row.get(3)?,
317 unblinding_data: row.get(4)?,
318 })
319 })?
320 .map(|i| i.unwrap())
321 .collect();
322 Ok(payments)
323 }
324
325 fn insert_or_update_payment_balance(
326 con: &Connection,
327 tx_id: &str,
328 balance: &PaymentTxBalance,
329 ) -> Result<()> {
330 con.execute(
331 "INSERT OR REPLACE INTO payment_balance (
332 tx_id,
333 asset_id,
334 payment_type,
335 amount
336 )
337 VALUES (?, ?, ?, ?)",
338 (
339 tx_id,
340 &balance.asset_id,
341 balance.payment_type,
342 balance.amount,
343 ),
344 )?;
345 Ok(())
346 }
347
348 pub(crate) fn insert_or_update_payment(
349 &self,
350 ptx: PaymentTxData,
351 balances: &[PaymentTxBalance],
352 payment_tx_details: Option<PaymentTxDetails>,
353 from_wallet_tx_data: bool,
354 ) -> Result<()> {
355 let mut con = self.get_connection()?;
356 let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
357 tx.execute(
358 "INSERT INTO payment_tx_data (
359 tx_id,
360 timestamp,
361 fees_sat,
362 is_confirmed,
363 unblinding_data
364 )
365 VALUES (?, ?, ?, ?, ?)
366 ON CONFLICT (tx_id)
367 DO UPDATE SET timestamp = CASE WHEN excluded.is_confirmed = 1 THEN excluded.timestamp ELSE timestamp END,
368 fees_sat = excluded.fees_sat,
369 is_confirmed = excluded.is_confirmed,
370 unblinding_data = excluded.unblinding_data
371 ",
372 (
373 &ptx.tx_id,
374 ptx.timestamp.or(Some(utils::now())),
375 ptx.fees_sat,
376 ptx.is_confirmed,
377 ptx.unblinding_data,
378 ),
379 )?;
380
381 for balance in balances {
382 Self::insert_or_update_payment_balance(&tx, &ptx.tx_id, balance)?;
383 }
384
385 let mut trigger_sync = false;
386 if let Some(ref payment_tx_details) = payment_tx_details {
387 Self::insert_or_update_payment_details_inner(
391 &tx,
392 payment_tx_details,
393 from_wallet_tx_data,
394 )?;
395 if !from_wallet_tx_data {
396 self.commit_outgoing(
397 &tx,
398 &payment_tx_details.tx_id,
399 RecordType::PaymentDetails,
400 None,
401 )?;
402 trigger_sync = true;
403 }
404 }
405
406 tx.commit()?;
407 if trigger_sync {
408 self.trigger_sync();
409 }
410
411 Ok(())
412 }
413
414 pub(crate) fn delete_payment_tx_data(&self, tx_id: &str) -> Result<()> {
415 let con = self.get_connection()?;
416
417 con.execute("DELETE FROM payment_tx_data WHERE tx_id = ?", [tx_id])?;
418
419 Ok(())
420 }
421
422 fn insert_or_update_payment_details_inner(
423 con: &Connection,
424 payment_tx_details: &PaymentTxDetails,
425 skip_destination_update: bool,
426 ) -> Result<()> {
427 let destination_update = if skip_destination_update.not() {
428 "destination = excluded.destination,"
429 } else {
430 Default::default()
431 };
432 con.execute(
433 &format!(
434 "INSERT INTO payment_details (
435 tx_id,
436 destination,
437 description,
438 lnurl_info_json,
439 bip353_address,
440 payer_note,
441 asset_fees
442 )
443 VALUES (?, ?, ?, ?, ?, ?, ?)
444 ON CONFLICT (tx_id)
445 DO UPDATE SET
446 {destination_update}
447 description = COALESCE(excluded.description, description),
448 lnurl_info_json = COALESCE(excluded.lnurl_info_json, lnurl_info_json),
449 bip353_address = COALESCE(excluded.bip353_address, bip353_address),
450 payer_note = COALESCE(excluded.payer_note, payer_note),
451 asset_fees = COALESCE(excluded.asset_fees, asset_fees)
452 "
453 ),
454 (
455 &payment_tx_details.tx_id,
456 &payment_tx_details.destination,
457 &payment_tx_details.description,
458 payment_tx_details
459 .lnurl_info
460 .as_ref()
461 .map(|info| serde_json::to_string(&info).ok()),
462 &payment_tx_details.bip353_address,
463 &payment_tx_details.payer_note,
464 &payment_tx_details.asset_fees,
465 ),
466 )?;
467 Ok(())
468 }
469
470 pub(crate) fn insert_or_update_payment_details(
471 &self,
472 payment_tx_details: PaymentTxDetails,
473 ) -> Result<()> {
474 let mut con = self.get_connection()?;
475 let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
476
477 Self::insert_or_update_payment_details_inner(&tx, &payment_tx_details, false)?;
478 self.commit_outgoing(
479 &tx,
480 &payment_tx_details.tx_id,
481 RecordType::PaymentDetails,
482 None,
483 )?;
484 tx.commit()?;
485 self.trigger_sync();
486
487 Ok(())
488 }
489
490 pub(crate) fn get_payment_details(&self, tx_id: &str) -> Result<Option<PaymentTxDetails>> {
491 let con = self.get_connection()?;
492 let mut stmt = con.prepare(
493 "SELECT destination, description, lnurl_info_json, bip353_address, payer_note, asset_fees
494 FROM payment_details
495 WHERE tx_id = ?",
496 )?;
497 let res = stmt.query_row([tx_id], |row| {
498 let destination = row.get(0)?;
499 let description = row.get(1)?;
500 let maybe_lnurl_info_json: Option<String> = row.get(2)?;
501 let maybe_bip353_address = row.get(3)?;
502 let maybe_payer_note = row.get(4)?;
503 let maybe_asset_fees = row.get(5)?;
504 Ok(PaymentTxDetails {
505 tx_id: tx_id.to_string(),
506 destination,
507 description,
508 lnurl_info: maybe_lnurl_info_json
509 .and_then(|info| serde_json::from_str::<LnUrlInfo>(&info).ok()),
510 bip353_address: maybe_bip353_address,
511 payer_note: maybe_payer_note,
512 asset_fees: maybe_asset_fees,
513 })
514 });
515 Ok(res.ok())
516 }
517
518 pub(crate) fn list_ongoing_swaps(&self) -> Result<Vec<Swap>> {
519 let ongoing_send_swaps: Vec<Swap> = self
520 .list_ongoing_send_swaps()?
521 .into_iter()
522 .map(Swap::Send)
523 .collect();
524 let ongoing_receive_swaps: Vec<Swap> = self
525 .list_ongoing_receive_swaps()?
526 .into_iter()
527 .map(Swap::Receive)
528 .collect();
529 let ongoing_chain_swaps: Vec<Swap> = self
530 .list_ongoing_chain_swaps()?
531 .into_iter()
532 .map(Swap::Chain)
533 .collect();
534 Ok([
535 ongoing_send_swaps,
536 ongoing_receive_swaps,
537 ongoing_chain_swaps,
538 ]
539 .concat())
540 }
541
    /// Builds the SQL query that joins payment tx data with balances, swaps, payment details
    /// and asset metadata. The column order of the SELECT list is what `sql_row_to_payment`
    /// reads by index, so the two must be kept in sync.
    ///
    /// * `where_clause` - extra predicate appended with `AND`; defaults to `true`. It is
    ///   interpolated verbatim into the query text, so it must not contain untrusted input
    ///   (values should be bound as statement parameters by the caller).
    /// * `offset` - `OFFSET` value; defaults to `0`.
    /// * `limit` - `LIMIT` value; defaults to `u32::MAX`.
    /// * `sort_ascending` - `ASC` when `Some(true)`, otherwise `DESC`.
    /// * `include_all_states` - when `Some(true)`, the receive/chain swap sub-selects are not
    ///   filtered; otherwise they are restricted by tx id presence and swap state.
    fn select_payment_query(
        &self,
        where_clause: Option<&str>,
        offset: Option<u32>,
        limit: Option<u32>,
        sort_ascending: Option<bool>,
        include_all_states: Option<bool>,
    ) -> String {
        let (where_receive_swap_clause, where_chain_swap_clause) = if include_all_states
            .unwrap_or_default()
        {
            ("true", "true")
        } else {
            (
                // Receive swaps: require at least one associated tx id and exclude states
                // 0, 3 and 4.
                // NOTE(review): these are raw `PaymentState` discriminants - confirm which
                // states they denote against the enum definition.
                "COALESCE(claim_tx_id, lockup_tx_id, mrh_tx_id) IS NOT NULL AND state NOT IN (0, 3, 4)",
                // Chain swaps: require a user lockup or claim tx id and exclude states 0 and 4.
                "COALESCE(user_lockup_tx_id, claim_tx_id) IS NOT NULL AND state NOT IN (0, 4)",
            )
        };

        // The positional `{}` placeholders are filled, in order, with: receive swap filter,
        // chain swap filter, caller predicate, sort direction, limit and offset.
        format!(
            "
            SELECT
                ptx.tx_id,
                ptx.timestamp,
                ptx.fees_sat,
                ptx.is_confirmed,
                ptx.unblinding_data,
                pb.amount,
                pb.asset_id,
                pb.payment_type,
                rs.id,
                rs.created_at,
                rs.timeout_block_height,
                rs.invoice,
                rs.bolt12_offer,
                rs.payment_hash,
                rs.destination_pubkey,
                rs.description,
                rs.payer_note,
                rs.preimage,
                rs.payer_amount_sat,
                rs.receiver_amount_sat,
                rs.state,
                rs.pair_fees_json,
                rs.claim_tx_id,
                ss.id,
                ss.created_at,
                ss.timeout_block_height,
                ss.invoice,
                ss.bolt12_offer,
                ss.payment_hash,
                ss.destination_pubkey,
                ss.description,
                ss.preimage,
                ss.refund_tx_id,
                ss.payer_amount_sat,
                ss.receiver_amount_sat,
                ss.state,
                ss.pair_fees_json,
                cs.id,
                cs.created_at,
                cs.timeout_block_height,
                cs.direction,
                cs.preimage,
                cs.description,
                cs.refund_tx_id,
                cs.payer_amount_sat,
                cs.receiver_amount_sat,
                cs.claim_address,
                cs.lockup_address,
                cs.state,
                cs.pair_fees_json,
                cs.actual_payer_amount_sat,
                cs.accepted_receiver_amount_sat,
                cs.auto_accepted_fees,
                cs.user_lockup_tx_id,
                cs.claim_tx_id,
                rb.amount,
                pd.destination,
                pd.description,
                pd.lnurl_info_json,
                pd.bip353_address,
                pd.payer_note,
                pd.asset_fees,
                am.name,
                am.ticker,
                am.precision
            FROM payment_tx_data AS ptx          -- Payment tx (each tx results in a Payment)
            LEFT JOIN payment_balance AS pb
                ON pb.tx_id = ptx.tx_id          -- Payment tx balances, split by asset
            FULL JOIN (
                SELECT * FROM receive_swaps WHERE {}
            ) rs                                 -- Receive Swap data
                ON ptx.tx_id in (rs.claim_tx_id, rs.mrh_tx_id)
            FULL JOIN (
                SELECT * FROM chain_swaps WHERE {}
            ) cs                                 -- Chain Swap data
                ON ptx.tx_id in (cs.user_lockup_tx_id, cs.claim_tx_id)
            LEFT JOIN send_swaps AS ss           -- Send Swap data
                ON ptx.tx_id = ss.lockup_tx_id
            LEFT JOIN payment_balance AS rb      -- Refund tx balance
                ON rb.tx_id in (ss.refund_tx_id, cs.refund_tx_id)
            LEFT JOIN payment_details AS pd      -- Payment details
                ON pd.tx_id = ptx.tx_id
            LEFT JOIN asset_metadata AS am       -- Asset metadata
                ON am.asset_id = pb.asset_id
            WHERE
                (ptx.tx_id IS NULL               -- Filter out refund txs from Chain/Send Swaps
                    OR ptx.tx_id NOT IN (SELECT refund_tx_id FROM send_swaps WHERE refund_tx_id NOT NULL)
                    AND ptx.tx_id NOT IN (SELECT refund_tx_id FROM chain_swaps WHERE refund_tx_id NOT NULL))
            AND {}
            ORDER BY                             -- Order by swap creation time or tx timestamp (in case of direct tx)
                COALESCE(rs.created_at, ss.created_at, cs.created_at, ptx.timestamp) {}
            LIMIT {}
            OFFSET {}
            ",
            where_receive_swap_clause,
            where_chain_swap_clause,
            where_clause.unwrap_or("true"),
            match sort_ascending.unwrap_or(false) {
                true => "ASC",
                false => "DESC",
            },
            limit.unwrap_or(u32::MAX),
            offset.unwrap_or(0),
        )
    }
671
    /// Converts one row produced by the query from `select_payment_query` into a `Payment`.
    ///
    /// Columns are read by position, so the indices below must match the SELECT list of
    /// `select_payment_query`: 0-7 tx data and balance, 8-22 receive swap, 23-36 send swap,
    /// 37-54 chain swap, 55 refund balance, 56-61 payment details, 62-64 asset metadata.
    ///
    /// At most one swap is attached to the payment, checked in the order receive, send, chain.
    /// A row with neither tx data nor swap data yields the error from reading column 0.
    fn sql_row_to_payment(&self, row: &Row) -> Result<Payment, rusqlite::Error> {
        // Column 0 is read as a `Result` on purpose: a failure to read it as a `String`
        // (e.g. NULL for swap-only rows) means there is no tx data for this row.
        let maybe_tx_tx_id: Result<String, rusqlite::Error> = row.get(0);
        let tx_with_balance = match maybe_tx_tx_id {
            Ok(ref tx_id) => Some((
                PaymentTxData {
                    tx_id: tx_id.to_string(),
                    timestamp: row.get(1)?,
                    fees_sat: row.get(2)?,
                    is_confirmed: row.get(3)?,
                    unblinding_data: row.get(4)?,
                },
                PaymentTxBalance {
                    amount: row.get(5)?,
                    asset_id: row.get(6)?,
                    payment_type: row.get(7)?,
                },
            )),
            _ => None,
        };

        // Receive swap columns (rs.*). Pair fees that fail to parse as JSON become `None`.
        let maybe_receive_swap_id: Option<String> = row.get(8)?;
        let maybe_receive_swap_created_at: Option<u32> = row.get(9)?;
        let maybe_receive_swap_timeout_block_height: Option<u32> = row.get(10)?;
        let maybe_receive_swap_invoice: Option<String> = row.get(11)?;
        let maybe_receive_swap_bolt12_offer: Option<String> = row.get(12)?;
        let maybe_receive_swap_payment_hash: Option<String> = row.get(13)?;
        let maybe_receive_swap_destination_pubkey: Option<String> = row.get(14)?;
        let maybe_receive_swap_description: Option<String> = row.get(15)?;
        let maybe_receive_swap_payer_note: Option<String> = row.get(16)?;
        let maybe_receive_swap_preimage: Option<String> = row.get(17)?;
        let maybe_receive_swap_payer_amount_sat: Option<u64> = row.get(18)?;
        let maybe_receive_swap_receiver_amount_sat: Option<u64> = row.get(19)?;
        let maybe_receive_swap_receiver_state: Option<PaymentState> = row.get(20)?;
        let maybe_receive_swap_pair_fees_json: Option<String> = row.get(21)?;
        let maybe_receive_swap_pair_fees: Option<ReversePair> =
            maybe_receive_swap_pair_fees_json.and_then(|pair| serde_json::from_str(&pair).ok());
        let maybe_receive_swap_claim_tx_id: Option<String> = row.get(22)?;

        // Send swap columns (ss.*).
        let maybe_send_swap_id: Option<String> = row.get(23)?;
        let maybe_send_swap_created_at: Option<u32> = row.get(24)?;
        let maybe_send_swap_timeout_block_height: Option<u32> = row.get(25)?;
        let maybe_send_swap_invoice: Option<String> = row.get(26)?;
        let maybe_send_swap_bolt12_offer: Option<String> = row.get(27)?;
        let maybe_send_swap_payment_hash: Option<String> = row.get(28)?;
        let maybe_send_swap_destination_pubkey: Option<String> = row.get(29)?;
        let maybe_send_swap_description: Option<String> = row.get(30)?;
        let maybe_send_swap_preimage: Option<String> = row.get(31)?;
        let maybe_send_swap_refund_tx_id: Option<String> = row.get(32)?;
        let maybe_send_swap_payer_amount_sat: Option<u64> = row.get(33)?;
        let maybe_send_swap_receiver_amount_sat: Option<u64> = row.get(34)?;
        let maybe_send_swap_state: Option<PaymentState> = row.get(35)?;
        let maybe_send_swap_pair_fees_json: Option<String> = row.get(36)?;
        let maybe_send_swap_pair_fees: Option<SubmarinePair> =
            maybe_send_swap_pair_fees_json.and_then(|pair| serde_json::from_str(&pair).ok());

        // Chain swap columns (cs.*).
        let maybe_chain_swap_id: Option<String> = row.get(37)?;
        let maybe_chain_swap_created_at: Option<u32> = row.get(38)?;
        let maybe_chain_swap_timeout_block_height: Option<u32> = row.get(39)?;
        let maybe_chain_swap_direction: Option<Direction> = row.get(40)?;
        let maybe_chain_swap_preimage: Option<String> = row.get(41)?;
        let maybe_chain_swap_description: Option<String> = row.get(42)?;
        let maybe_chain_swap_refund_tx_id: Option<String> = row.get(43)?;
        let maybe_chain_swap_payer_amount_sat: Option<u64> = row.get(44)?;
        let maybe_chain_swap_receiver_amount_sat: Option<u64> = row.get(45)?;
        let maybe_chain_swap_claim_address: Option<String> = row.get(46)?;
        let maybe_chain_swap_lockup_address: Option<String> = row.get(47)?;
        let maybe_chain_swap_state: Option<PaymentState> = row.get(48)?;
        let maybe_chain_swap_pair_fees_json: Option<String> = row.get(49)?;
        let maybe_chain_swap_pair_fees: Option<ChainPair> =
            maybe_chain_swap_pair_fees_json.and_then(|pair| serde_json::from_str(&pair).ok());
        let maybe_chain_swap_actual_payer_amount_sat: Option<u64> = row.get(50)?;
        let maybe_chain_swap_accepted_receiver_amount_sat: Option<u64> = row.get(51)?;
        let maybe_chain_swap_auto_accepted_fees: Option<bool> = row.get(52)?;
        let maybe_chain_swap_user_lockup_tx_id: Option<String> = row.get(53)?;
        let maybe_chain_swap_claim_tx_id: Option<String> = row.get(54)?;

        // Amount of the refund tx balance (rb.amount), joined on the send/chain refund tx id.
        let maybe_swap_refund_tx_amount_sat: Option<u64> = row.get(55)?;

        // Payment details columns (pd.*). LNURL info that fails to parse becomes `None`.
        let maybe_payment_details_destination: Option<String> = row.get(56)?;
        let maybe_payment_details_description: Option<String> = row.get(57)?;
        let maybe_payment_details_lnurl_info_json: Option<String> = row.get(58)?;
        let maybe_payment_details_lnurl_info: Option<LnUrlInfo> =
            maybe_payment_details_lnurl_info_json.and_then(|info| serde_json::from_str(&info).ok());
        let maybe_payment_details_bip353_address: Option<String> = row.get(59)?;
        let maybe_payment_details_payer_note: Option<String> = row.get(60)?;
        let maybe_payment_details_asset_fees: Option<u64> = row.get(61)?;

        // Asset metadata columns (am.*).
        let maybe_asset_metadata_name: Option<String> = row.get(62)?;
        let maybe_asset_metadata_ticker: Option<String> = row.get(63)?;
        let maybe_asset_metadata_precision: Option<u8> = row.get(64)?;

        // The Bitcoin address reported for a chain swap depends on its direction:
        // lockup address for incoming swaps, claim address for outgoing ones.
        let bitcoin_address = match maybe_chain_swap_direction {
            Some(Direction::Incoming) => maybe_chain_swap_lockup_address,
            Some(Direction::Outgoing) => maybe_chain_swap_claim_address,
            None => None,
        };

        // Pick the swap attached to this row (receive, then send, then chain) and the
        // payment type that goes with it. Without any swap the type defaults to `Send`.
        let (swap, payment_type) = match maybe_receive_swap_id {
            Some(receive_swap_id) => {
                let payer_amount_sat = maybe_receive_swap_payer_amount_sat.unwrap_or(0);

                (
                    Some(PaymentSwapData {
                        swap_id: receive_swap_id,
                        swap_type: PaymentSwapType::Receive,
                        created_at: maybe_receive_swap_created_at.unwrap_or(utils::now()),
                        expiration_blockheight: maybe_receive_swap_timeout_block_height
                            .unwrap_or(0),
                        preimage: maybe_receive_swap_preimage,
                        invoice: maybe_receive_swap_invoice.clone(),
                        bolt12_offer: maybe_receive_swap_bolt12_offer,
                        payment_hash: maybe_receive_swap_payment_hash,
                        destination_pubkey: maybe_receive_swap_destination_pubkey,
                        // Description fallback chain: stored description, then the one
                        // extracted from the invoice, then a generic label.
                        description: maybe_receive_swap_description.unwrap_or_else(|| {
                            maybe_receive_swap_invoice
                                .and_then(|invoice| {
                                    utils::get_invoice_description(&invoice).ok().flatten()
                                })
                                .unwrap_or("Lightning payment".to_string())
                        }),
                        payer_note: maybe_receive_swap_payer_note,
                        payer_amount_sat,
                        receiver_amount_sat: maybe_receive_swap_receiver_amount_sat.unwrap_or(0),
                        // Swapper fee is derived from the payer amount for receive swaps.
                        swapper_fees_sat: maybe_receive_swap_pair_fees
                            .map(|pair| pair.fees.boltz(payer_amount_sat))
                            .unwrap_or(0),
                        refund_tx_id: None,
                        refund_tx_amount_sat: None,
                        bitcoin_address: None,
                        status: maybe_receive_swap_receiver_state.unwrap_or(PaymentState::Created),
                    }),
                    PaymentType::Receive,
                )
            }
            None => match maybe_send_swap_id {
                Some(send_swap_id) => {
                    let receiver_amount_sat = maybe_send_swap_receiver_amount_sat.unwrap_or(0);
                    (
                        Some(PaymentSwapData {
                            swap_id: send_swap_id,
                            swap_type: PaymentSwapType::Send,
                            created_at: maybe_send_swap_created_at.unwrap_or(utils::now()),
                            expiration_blockheight: maybe_send_swap_timeout_block_height
                                .unwrap_or(0),
                            preimage: maybe_send_swap_preimage,
                            invoice: maybe_send_swap_invoice,
                            bolt12_offer: maybe_send_swap_bolt12_offer,
                            payment_hash: maybe_send_swap_payment_hash,
                            destination_pubkey: maybe_send_swap_destination_pubkey,
                            description: maybe_send_swap_description
                                .unwrap_or("Lightning payment".to_string()),
                            payer_note: None,
                            payer_amount_sat: maybe_send_swap_payer_amount_sat.unwrap_or(0),
                            receiver_amount_sat,
                            // Swapper fee is derived from the receiver amount for send swaps.
                            swapper_fees_sat: maybe_send_swap_pair_fees
                                .map(|pair| pair.fees.boltz(receiver_amount_sat))
                                .unwrap_or(0),
                            refund_tx_id: maybe_send_swap_refund_tx_id,
                            refund_tx_amount_sat: maybe_swap_refund_tx_amount_sat,
                            bitcoin_address: None,
                            status: maybe_send_swap_state.unwrap_or(PaymentState::Created),
                        }),
                        PaymentType::Send,
                    )
                }
                None => match maybe_chain_swap_id {
                    Some(chain_swap_id) => {
                        let (payer_amount_sat, receiver_amount_sat) = match (
                            maybe_chain_swap_actual_payer_amount_sat,
                            maybe_chain_swap_payer_amount_sat,
                        ) {
                            // Stored payer amount is 0 but an actual payer amount is known:
                            // use the actual amount for both payer and receiver.
                            // NOTE(review): presumably the amountless chain swap case - confirm.
                            (Some(actual_payer_amount_sat), Some(0)) => {
                                (actual_payer_amount_sat, actual_payer_amount_sat)
                            }
                            // Otherwise use the stored amounts, defaulting to 0.
                            _ => (
                                maybe_chain_swap_payer_amount_sat.unwrap_or(0),
                                maybe_chain_swap_receiver_amount_sat.unwrap_or(0),
                            ),
                        };
                        // An accepted receiver amount, when present, overrides the value above.
                        let receiver_amount_sat =
                            match maybe_chain_swap_accepted_receiver_amount_sat {
                                Some(accepted_receiver_amount_sat) => accepted_receiver_amount_sat,
                                None => receiver_amount_sat,
                            };
                        // Percentage fee applied to the payer amount, rounded up.
                        let swapper_fees_sat = maybe_chain_swap_pair_fees
                            .map(|pair| pair.fees.percentage)
                            .map(|fr| ((fr / 100.0) * payer_amount_sat as f64).ceil() as u64)
                            .unwrap_or(0);

                        (
                            Some(PaymentSwapData {
                                swap_id: chain_swap_id,
                                swap_type: PaymentSwapType::Chain,
                                created_at: maybe_chain_swap_created_at.unwrap_or(utils::now()),
                                expiration_blockheight: maybe_chain_swap_timeout_block_height
                                    .unwrap_or(0),
                                preimage: maybe_chain_swap_preimage,
                                invoice: None,
                                bolt12_offer: None, payment_hash: None,
                                destination_pubkey: None,
                                description: maybe_chain_swap_description
                                    .unwrap_or("Bitcoin transfer".to_string()),
                                payer_note: None,
                                payer_amount_sat,
                                receiver_amount_sat,
                                swapper_fees_sat,
                                refund_tx_id: maybe_chain_swap_refund_tx_id,
                                refund_tx_amount_sat: maybe_swap_refund_tx_amount_sat,
                                bitcoin_address: bitcoin_address.clone(),
                                status: maybe_chain_swap_state.unwrap_or(PaymentState::Created),
                            }),
                            // Payment type is converted from the swap direction
                            // (`Outgoing` when the direction is missing).
                            maybe_chain_swap_direction
                                .unwrap_or(Direction::Outgoing)
                                .into(),
                        )
                    }
                    None => (None, PaymentType::Send),
                },
            },
        };

        // A receive swap's claim tx id takes precedence over a chain swap's.
        let maybe_claim_tx_id = maybe_receive_swap_claim_tx_id.or(maybe_chain_swap_claim_tx_id);
        let description = swap.as_ref().map(|s| s.description.clone());
        // Build the user-facing details: Lightning for receive/send swaps, Bitcoin for chain
        // swaps, Liquid when no swap is attached.
        let payment_details = match swap.clone() {
            Some(
                PaymentSwapData {
                    swap_type: PaymentSwapType::Receive,
                    swap_id,
                    invoice,
                    bolt12_offer,
                    payment_hash,
                    destination_pubkey,
                    payer_note,
                    refund_tx_id,
                    preimage,
                    refund_tx_amount_sat,
                    expiration_blockheight,
                    ..
                }
                | PaymentSwapData {
                    swap_type: PaymentSwapType::Send,
                    swap_id,
                    invoice,
                    bolt12_offer,
                    payment_hash,
                    destination_pubkey,
                    payer_note,
                    preimage,
                    refund_tx_id,
                    refund_tx_amount_sat,
                    expiration_blockheight,
                    ..
                },
            ) => PaymentDetails::Lightning {
                swap_id,
                preimage,
                invoice: invoice.clone(),
                bolt12_offer: bolt12_offer.clone(),
                payment_hash,
                // Fall back to the pubkey derived from the invoice when none is stored.
                destination_pubkey: destination_pubkey.or_else(|| {
                    invoice.and_then(|invoice| {
                        utils::get_invoice_destination_pubkey(&invoice, bolt12_offer.is_some()).ok()
                    })
                }),
                lnurl_info: maybe_payment_details_lnurl_info,
                bip353_address: maybe_payment_details_bip353_address,
                payer_note: payer_note.or(maybe_payment_details_payer_note),
                claim_tx_id: maybe_claim_tx_id,
                refund_tx_id,
                refund_tx_amount_sat,
                // The payment details description wins over the swap description.
                description: maybe_payment_details_description
                    .unwrap_or(description.unwrap_or("Lightning transfer".to_string())),
                liquid_expiration_blockheight: expiration_blockheight,
            },
            Some(PaymentSwapData {
                swap_type: PaymentSwapType::Chain,
                swap_id,
                refund_tx_id,
                refund_tx_amount_sat,
                expiration_blockheight,
                ..
            }) => {
                // The swap's expiration height is reported as the Bitcoin height for incoming
                // swaps and as the Liquid height otherwise.
                let (bitcoin_expiration_blockheight, liquid_expiration_blockheight) =
                    match maybe_chain_swap_direction {
                        Some(Direction::Incoming) => (Some(expiration_blockheight), None),
                        Some(Direction::Outgoing) | None => (None, Some(expiration_blockheight)),
                    };
                let auto_accepted_fees = maybe_chain_swap_auto_accepted_fees.unwrap_or(false);

                PaymentDetails::Bitcoin {
                    swap_id,
                    bitcoin_address: bitcoin_address.unwrap_or_default(),
                    lockup_tx_id: maybe_chain_swap_user_lockup_tx_id,
                    claim_tx_id: maybe_claim_tx_id,
                    refund_tx_id,
                    refund_tx_amount_sat,
                    description: description.unwrap_or("Bitcoin transfer".to_string()),
                    liquid_expiration_blockheight,
                    bitcoin_expiration_blockheight,
                    auto_accepted_fees,
                }
            }
            _ => {
                // No swap: a direct Liquid payment. Without tx data the amount is 0 and the
                // asset defaults to L-BTC for this network.
                let (amount, asset_id) = tx_with_balance.clone().map_or(
                    (0, utils::lbtc_asset_id(self.network).to_string()),
                    |(_, b)| (b.amount, b.asset_id),
                );
                // Asset info is only provided when name, ticker and precision are all known.
                let asset_info = match (
                    maybe_asset_metadata_name,
                    maybe_asset_metadata_ticker,
                    maybe_asset_metadata_precision,
                ) {
                    (Some(name), Some(ticker), Some(precision)) => {
                        let asset_metadata = AssetMetadata {
                            asset_id: asset_id.clone(),
                            name: name.clone(),
                            ticker: ticker.clone(),
                            precision,
                            fiat_id: None,
                        };
                        // When asset fees were recorded, they are subtracted from the amount
                        // and reported separately.
                        let (amount, fees) =
                            maybe_payment_details_asset_fees.map_or((amount, None), |fees| {
                                (
                                    amount.saturating_sub(fees),
                                    Some(asset_metadata.amount_from_sat(fees)),
                                )
                            });

                        Some(AssetInfo {
                            name,
                            ticker,
                            amount: asset_metadata.amount_from_sat(amount),
                            fees,
                        })
                    }
                    _ => None,
                };

                PaymentDetails::Liquid {
                    destination: maybe_payment_details_destination
                        .unwrap_or("Destination unknown".to_string()),
                    description: maybe_payment_details_description
                        .unwrap_or("Liquid transfer".to_string()),
                    asset_id,
                    asset_info,
                    lnurl_info: maybe_payment_details_lnurl_info,
                    bip353_address: maybe_payment_details_bip353_address,
                    payer_note: maybe_payment_details_payer_note,
                }
            }
        };

        match (tx_with_balance, swap.clone()) {
            // `tx_with_balance` is `None` only when reading column 0 failed, so the error is
            // present and the `unwrap` cannot panic.
            (None, None) => Err(maybe_tx_tx_id.err().unwrap()),
            (None, Some(swap)) => Ok(Payment::from_pending_swap(
                swap,
                payment_type,
                payment_details,
            )),
            (Some((tx, balance)), None) => {
                Ok(Payment::from_tx_data(tx, balance, None, payment_details))
            }
            (Some((tx, balance)), Some(swap)) => Ok(Payment::from_tx_data(
                tx,
                balance,
                Some(swap),
                payment_details,
            )),
        }
    }
1047
1048 pub fn get_payment(&self, id: &str) -> Result<Option<Payment>> {
1049 Ok(self
1050 .get_connection()?
1051 .query_row(
1052 &self.select_payment_query(
1053 Some("(ptx.tx_id = ?1 OR COALESCE(rs.id, ss.id, cs.id) = ?1)"),
1054 None,
1055 None,
1056 None,
1057 None,
1058 ),
1059 params![id],
1060 |row| self.sql_row_to_payment(row),
1061 )
1062 .optional()?)
1063 }
1064
1065 pub fn get_payment_by_request(&self, req: &GetPaymentRequest) -> Result<Option<Payment>> {
1066 let (where_clause, param) = match req {
1067 GetPaymentRequest::PaymentHash { payment_hash } => (
1068 "(rs.payment_hash = ?1 OR ss.payment_hash = ?1)",
1069 payment_hash,
1070 ),
1071 GetPaymentRequest::SwapId { swap_id } => (
1072 "(rs.id = ?1 OR ss.id = ?1 OR cs.id = ?1 OR \
1073 rs.id_hash = ?1 OR ss.id_hash = ?1 OR cs.id_hash = ?1)",
1074 swap_id,
1075 ),
1076 };
1077 Ok(self
1078 .get_connection()?
1079 .query_row(
1080 &self.select_payment_query(Some(where_clause), None, None, None, Some(true)),
1081 params![param],
1082 |row| self.sql_row_to_payment(row),
1083 )
1084 .optional()?)
1085 }
1086
1087 pub fn get_payments(&self, req: &ListPaymentsRequest) -> Result<Vec<Payment>> {
1088 let (where_clause, where_params) = filter_to_where_clause(req);
1089 let maybe_where_clause = match where_clause.is_empty() {
1090 false => Some(where_clause.as_str()),
1091 true => None,
1092 };
1093
1094 let con = self.get_connection()?;
1096 let mut stmt = con.prepare(&self.select_payment_query(
1097 maybe_where_clause,
1098 req.offset,
1099 req.limit,
1100 req.sort_ascending,
1101 None,
1102 ))?;
1103 let payments: Vec<Payment> = stmt
1104 .query_map(params_from_iter(where_params), |row| {
1105 self.sql_row_to_payment(row)
1106 })?
1107 .map(|i| i.unwrap())
1108 .collect();
1109 Ok(payments)
1110 }
1111
1112 pub fn get_payments_by_tx_id(
1113 &self,
1114 req: &ListPaymentsRequest,
1115 ) -> Result<HashMap<String, Payment>> {
1116 let res: HashMap<String, Payment> = self
1117 .get_payments(req)?
1118 .into_iter()
1119 .flat_map(|payment| {
1120 let mut res = vec![];
1122 if let Some(tx_id) = payment.tx_id.clone() {
1123 res.push((tx_id, payment.clone()));
1124 }
1125 if let Some(refund_tx_id) = payment.get_refund_tx_id() {
1126 res.push((refund_tx_id, payment));
1127 }
1128 res
1129 })
1130 .collect();
1131 Ok(res)
1132 }
1133}
1134
1135fn filter_to_where_clause(req: &ListPaymentsRequest) -> (String, Vec<Box<dyn ToSql + '_>>) {
1136 let mut where_clause: Vec<String> = Vec::new();
1137 let mut where_params: Vec<Box<dyn ToSql>> = Vec::new();
1138
1139 if let Some(t) = req.from_timestamp {
1140 where_clause.push("coalesce(ptx.timestamp, rs.created_at) >= ?".to_string());
1141 where_params.push(Box::new(t));
1142 };
1143 if let Some(t) = req.to_timestamp {
1144 where_clause.push("coalesce(ptx.timestamp, rs.created_at) <= ?".to_string());
1145 where_params.push(Box::new(t));
1146 };
1147
1148 if let Some(filters) = &req.filters {
1149 if !filters.is_empty() {
1150 let mut type_filter_clause: HashSet<i8> = HashSet::new();
1151
1152 for type_filter in filters {
1153 type_filter_clause.insert(*type_filter as i8);
1154 }
1155
1156 where_clause.push(format!(
1157 "pb.payment_type in ({})",
1158 type_filter_clause
1159 .iter()
1160 .map(|t| format!("{t}"))
1161 .collect::<Vec<_>>()
1162 .join(", ")
1163 ));
1164 }
1165 }
1166
1167 if let Some(states) = &req.states {
1168 if !states.is_empty() {
1169 let deduped_states: Vec<PaymentState> = states
1170 .clone()
1171 .into_iter()
1172 .collect::<HashSet<PaymentState>>()
1173 .into_iter()
1174 .collect();
1175 let states_param = deduped_states
1176 .iter()
1177 .map(|t| (*t as i8).to_string())
1178 .collect::<Vec<_>>()
1179 .join(", ");
1180 let tx_comfirmed_param = deduped_states
1181 .iter()
1182 .filter_map(|state| match state {
1183 PaymentState::Pending | PaymentState::Complete => {
1184 Some(((*state == PaymentState::Complete) as i8).to_string())
1185 }
1186 _ => None,
1187 })
1188 .collect::<Vec<_>>()
1189 .join(", ");
1190 let states_query = match tx_comfirmed_param.is_empty() {
1191 true => format!("COALESCE(rs.state, ss.state, cs.state) in ({states_param})"),
1192 false => format!("(COALESCE(rs.id, ss.id, cs.id) IS NULL AND ptx.is_confirmed in ({tx_comfirmed_param}) OR COALESCE(rs.state, ss.state, cs.state) in ({states_param}))"),
1193 };
1194 where_clause.push(states_query);
1195 }
1196 }
1197
1198 if let Some(details) = &req.details {
1199 match details {
1200 ListPaymentDetails::Bitcoin { address } => {
1201 where_clause.push("cs.id IS NOT NULL".to_string());
1202 if let Some(address) = address {
1203 where_clause.push(
1205 "(cs.direction = 0 AND cs.lockup_address = ? OR cs.direction = 1 AND cs.claim_address = ?)"
1206 .to_string(),
1207 );
1208 where_params.push(Box::new(address));
1209 where_params.push(Box::new(address));
1210 }
1211 }
1212 ListPaymentDetails::Liquid {
1213 asset_id,
1214 destination,
1215 } => {
1216 where_clause.push("COALESCE(rs.id, ss.id, cs.id) IS NULL".to_string());
1217 if let Some(asset_id) = asset_id {
1218 where_clause.push("pb.asset_id = ?".to_string());
1219 where_params.push(Box::new(asset_id));
1220 }
1221 if let Some(destination) = destination {
1222 where_clause.push("pd.destination = ?".to_string());
1223 where_params.push(Box::new(destination));
1224 }
1225 }
1226 }
1227 }
1228
1229 (where_clause.join(" and "), where_params)
1230}
1231
#[cfg(test)]
mod tests {
    use anyhow::Result;

    use crate::{
        model::LiquidNetwork,
        persist::PaymentTxDetails,
        prelude::ListPaymentsRequest,
        test_utils::persist::{
            create_persister, new_payment_tx_data, new_receive_swap, new_send_swap,
        },
    };

    use super::{PaymentState, PaymentType};

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// A stored send payment is returned by the unfiltered listing and can be
    /// looked up by its transaction id.
    #[sdk_macros::test_all]
    fn test_get_payments() -> Result<()> {
        create_persister!(storage);

        let (tx_data, tx_balance) = new_payment_tx_data(LiquidNetwork::Testnet, PaymentType::Send);
        let tx_id = tx_data.tx_id.clone();
        let tx_details = PaymentTxDetails {
            destination: "mock-address".to_string(),
            ..Default::default()
        };
        storage.insert_or_update_payment(tx_data, &[tx_balance], Some(tx_details), false)?;

        let all_payments = storage.get_payments(&ListPaymentsRequest::default())?;
        assert!(!all_payments.is_empty());

        let found = storage.get_payment(&tx_id)?;
        assert!(found.is_some());

        Ok(())
    }

    /// One send swap and one pending receive swap are both listed as ongoing.
    #[sdk_macros::test_all]
    fn test_list_ongoing_swaps() -> Result<()> {
        create_persister!(storage);

        let send_swap = new_send_swap(None, None);
        storage.insert_or_update_send_swap(&send_swap)?;
        let receive_swap = new_receive_swap(Some(PaymentState::Pending), None);
        storage.insert_or_update_receive_swap(&receive_swap)?;

        let ongoing = storage.list_ongoing_swaps()?;
        assert_eq!(ongoing.len(), 2);

        Ok(())
    }
}
1289
/// Public forwarding wrappers around `Persister` methods, compiled only when
/// the `test-utils` feature is enabled.
#[cfg(feature = "test-utils")]
pub mod test_helpers {
    use super::*;

    impl Persister {
        /// Forwards to `insert_or_update_send_swap`.
        pub fn test_insert_or_update_send_swap(&self, swap: &SendSwap) -> Result<()> {
            Self::insert_or_update_send_swap(self, swap)
        }

        /// Forwards to `insert_or_update_receive_swap`.
        pub fn test_insert_or_update_receive_swap(&self, swap: &ReceiveSwap) -> Result<()> {
            Self::insert_or_update_receive_swap(self, swap)
        }

        /// Forwards to `list_ongoing_swaps`.
        pub fn test_list_ongoing_swaps(&self) -> Result<Vec<Swap>> {
            Self::list_ongoing_swaps(self)
        }
    }
}