1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::CreateReverseResponse;
3use rusqlite::{named_params, params, Connection, Row, TransactionBehavior};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::ensure_sdk;
8use crate::error::PaymentError;
9use crate::model::*;
10use crate::persist::{get_where_clause_state_in, where_clauses_to_string, Persister};
11use crate::sync::model::data::ReceiveSyncData;
12use crate::sync::model::RecordType;
13
14impl Persister {
15 pub(crate) fn insert_or_update_receive_swap_inner(
16 con: &Connection,
17 receive_swap: &ReceiveSwap,
18 ) -> Result<()> {
19 let id_hash = sha256::Hash::hash(receive_swap.id.as_bytes()).to_hex();
20 con.execute(
21 "
22 INSERT INTO receive_swaps (
23 id,
24 id_hash,
25 preimage,
26 create_response_json,
27 claim_private_key,
28 invoice,
29 timeout_block_height,
30 payment_hash,
31 destination_pubkey,
32 payer_amount_sat,
33 receiver_amount_sat,
34 created_at,
35 claim_fees_sat,
36 mrh_address,
37 state,
38 pair_fees_json
39 )
40 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
41 ON CONFLICT DO NOTHING
42 ",
43 (
44 &receive_swap.id,
45 id_hash,
46 &receive_swap.preimage,
47 &receive_swap.create_response_json,
48 &receive_swap.claim_private_key,
49 &receive_swap.invoice,
50 &receive_swap.timeout_block_height,
51 &receive_swap.payment_hash,
52 &receive_swap.destination_pubkey,
53 &receive_swap.payer_amount_sat,
54 &receive_swap.receiver_amount_sat,
55 &receive_swap.created_at,
56 &receive_swap.claim_fees_sat,
57 &receive_swap.mrh_address,
58 &receive_swap.state,
59 &receive_swap.pair_fees_json,
60 ),
61 )?;
62
63 let rows_affected = con.execute(
64 "UPDATE receive_swaps
65 SET
66 bolt12_offer = :bolt12_offer,
67 description = :description,
68 payer_note = :payer_note,
69 claim_address = :claim_address,
70 claim_tx_id = :claim_tx_id,
71 lockup_tx_id = :lockup_tx_id,
72 mrh_tx_id = :mrh_tx_id,
73 payer_amount_sat = :payer_amount_sat,
74 receiver_amount_sat = :receiver_amount_sat,
75 state = :state
76 WHERE
77 id = :id AND
78 version = :version",
79 named_params! {
80 ":id": &receive_swap.id,
81 ":bolt12_offer": &receive_swap.bolt12_offer,
82 ":description": &receive_swap.description,
83 ":payer_note": &receive_swap.payer_note,
84 ":claim_address": &receive_swap.claim_address,
85 ":claim_tx_id": &receive_swap.claim_tx_id,
86 ":lockup_tx_id": &receive_swap.lockup_tx_id,
87 ":mrh_tx_id": &receive_swap.mrh_tx_id,
88 ":payer_amount_sat": &receive_swap.payer_amount_sat,
92 ":receiver_amount_sat": &receive_swap.receiver_amount_sat,
93 ":state": &receive_swap.state,
94 ":version": &receive_swap.metadata.version,
95 },
96 )?;
97 ensure_sdk!(
98 rows_affected > 0,
99 anyhow!("Version mismatch for receive swap {}", receive_swap.id)
100 );
101
102 if receive_swap.mrh_tx_id.is_some() {
103 Self::delete_reserved_address_inner(con, &receive_swap.mrh_address)?;
104 }
105
106 Ok(())
107 }
108
109 pub(crate) fn insert_or_update_receive_swap(&self, receive_swap: &ReceiveSwap) -> Result<()> {
110 let maybe_swap = self.fetch_receive_swap_by_id(&receive_swap.id)?;
111 let updated_fields = ReceiveSyncData::updated_fields(maybe_swap, receive_swap);
112
113 let mut con = self.get_connection()?;
114 let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
115
116 Self::insert_or_update_receive_swap_inner(&tx, receive_swap)?;
117
118 let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
122 match trigger_sync {
123 true => {
124 self.commit_outgoing(&tx, &receive_swap.id, RecordType::Receive, updated_fields)?;
125 tx.commit()?;
126 self.trigger_sync();
127 }
128 false => {
129 tx.commit()?;
130 }
131 };
132
133 Ok(())
134 }
135
136 fn list_receive_swaps_query(where_clauses: Vec<String>) -> String {
137 let where_clause_str = where_clauses_to_string(where_clauses);
138
139 format!(
140 "
141 SELECT
142 rs.id,
143 rs.preimage,
144 rs.create_response_json,
145 rs.claim_private_key,
146 rs.invoice,
147 rs.bolt12_offer,
148 rs.payment_hash,
149 rs.destination_pubkey,
150 rs.timeout_block_height,
151 rs.description,
152 rs.payer_note,
153 rs.payer_amount_sat,
154 rs.receiver_amount_sat,
155 rs.claim_fees_sat,
156 rs.claim_address,
157 rs.claim_tx_id,
158 rs.lockup_tx_id,
159 rs.mrh_address,
160 rs.mrh_tx_id,
161 rs.created_at,
162 rs.state,
163 rs.pair_fees_json,
164 rs.version,
165 rs.last_updated_at,
166
167 sync_state.is_local
168 FROM receive_swaps AS rs
169 LEFT JOIN sync_state ON rs.id = sync_state.data_id
170 {where_clause_str}
171 ORDER BY rs.created_at
172 "
173 )
174 }
175
176 pub(crate) fn fetch_receive_swap_by_id(&self, id: &str) -> Result<Option<ReceiveSwap>> {
177 let con: Connection = self.get_connection()?;
178 let query = Self::list_receive_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
179 let res = con.query_row(&query, [id], Self::sql_row_to_receive_swap);
180
181 Ok(res.ok())
182 }
183
184 pub(crate) fn fetch_receive_swap_by_invoice(
185 &self,
186 invoice: &str,
187 ) -> Result<Option<ReceiveSwap>> {
188 let con: Connection = self.get_connection()?;
189 let query = Self::list_receive_swaps_query(vec!["invoice= ?1".to_string()]);
190 let res = con.query_row(&query, [invoice], Self::sql_row_to_receive_swap);
191
192 Ok(res.ok())
193 }
194
195 fn sql_row_to_receive_swap(row: &Row) -> rusqlite::Result<ReceiveSwap> {
196 Ok(ReceiveSwap {
197 id: row.get(0)?,
198 preimage: row.get(1)?,
199 create_response_json: row.get(2)?,
200 claim_private_key: row.get(3)?,
201 invoice: row.get(4)?,
202 bolt12_offer: row.get(5)?,
203 payment_hash: row.get(6)?,
204 destination_pubkey: row.get(7)?,
205 timeout_block_height: row.get(8)?,
206 description: row.get(9)?,
207 payer_note: row.get(10)?,
208 payer_amount_sat: row.get(11)?,
209 receiver_amount_sat: row.get(12)?,
210 claim_fees_sat: row.get(13)?,
211 claim_address: row.get(14)?,
212 claim_tx_id: row.get(15)?,
213 lockup_tx_id: row.get(16)?,
214 mrh_address: row.get(17)?,
215 mrh_tx_id: row.get(18)?,
216 created_at: row.get(19)?,
217 state: row.get(20)?,
218 pair_fees_json: row.get(21)?,
219 metadata: SwapMetadata {
220 version: row.get(22)?,
221 last_updated_at: row.get(23)?,
222 is_local: row.get::<usize, Option<bool>>(24)?.unwrap_or(true),
223 },
224 })
225 }
226
227 pub(crate) fn list_receive_swaps_where(
228 &self,
229 con: &Connection,
230 where_clauses: Vec<String>,
231 ) -> Result<Vec<ReceiveSwap>> {
232 let query = Self::list_receive_swaps_query(where_clauses);
233 let ongoing_receive = con
234 .prepare(&query)?
235 .query_map(params![], Self::sql_row_to_receive_swap)?
236 .map(|i| i.unwrap())
237 .collect();
238 Ok(ongoing_receive)
239 }
240
241 pub(crate) fn list_ongoing_receive_swaps(&self) -> Result<Vec<ReceiveSwap>> {
242 let con = self.get_connection()?;
243 let where_clauses = vec![get_where_clause_state_in(&[
244 PaymentState::Created,
245 PaymentState::Pending,
246 ])];
247
248 self.list_receive_swaps_where(&con, where_clauses)
249 }
250
251 pub(crate) fn list_recoverable_receive_swaps(&self) -> Result<Vec<ReceiveSwap>> {
252 let con = self.get_connection()?;
253 let where_clause = vec![get_where_clause_state_in(&[
254 PaymentState::Created,
255 PaymentState::Pending,
256 ])];
257
258 self.list_receive_swaps_where(&con, where_clause)
259 }
260
261 pub(crate) fn set_receive_swap_claim_address(
262 &self,
263 swap_id: &str,
264 claim_address: &str,
265 ) -> Result<(), PaymentError> {
266 let con = self.get_connection()?;
267 con.execute(
268 "UPDATE receive_swaps
269 SET claim_address = :claim_address
270 WHERE id = :id",
271 named_params! {
272 ":id": swap_id,
273 ":claim_address": claim_address,
274 },
275 )?;
276 Ok(())
277 }
278
279 pub(crate) fn set_receive_swap_claim_tx_id(
281 &self,
282 swap_id: &str,
283 claim_tx_id: &str,
284 ) -> Result<(), PaymentError> {
285 let con = self.get_connection()?;
286 let row_count = con
287 .execute(
288 "UPDATE receive_swaps
289 SET claim_tx_id = :claim_tx_id
290 WHERE id = :id AND claim_tx_id IS NULL",
291 named_params! {
292 ":id": swap_id,
293 ":claim_tx_id": claim_tx_id,
294 },
295 )
296 .map_err(|_| PaymentError::PersistError)?;
297 match row_count {
298 1 => Ok(()),
299 _ => Err(PaymentError::AlreadyClaimed),
300 }
301 }
302
303 pub(crate) fn unset_receive_swap_claim_tx_id(
305 &self,
306 swap_id: &str,
307 claim_tx_id: &str,
308 ) -> Result<(), PaymentError> {
309 let con = self.get_connection()?;
310 con.execute(
311 "UPDATE receive_swaps
312 SET claim_tx_id = NULL
313 WHERE id = :id AND claim_tx_id = :claim_tx_id",
314 named_params! {
315 ":id": swap_id,
316 ":claim_tx_id": claim_tx_id,
317 },
318 )
319 .map_err(|_| PaymentError::PersistError)?;
320 Ok(())
321 }
322
323 pub(crate) fn try_handle_receive_swap_update(
324 &self,
325 swap_id: &str,
326 to_state: PaymentState,
327 claim_tx_id: Option<&str>,
328 lockup_tx_id: Option<&str>,
329 mrh_tx_id: Option<&str>,
330 mrh_amount_sat: Option<u64>,
331 ) -> Result<(), PaymentError> {
332 let mut con = self.get_connection()?;
334 let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
335
336 tx.execute(
337 "UPDATE receive_swaps
338 SET
339 claim_tx_id = COALESCE(claim_tx_id, :claim_tx_id),
340 lockup_tx_id = COALESCE(lockup_tx_id, :lockup_tx_id),
341 mrh_tx_id = COALESCE(mrh_tx_id, :mrh_tx_id),
342
343 payer_amount_sat = COALESCE(:mrh_amount_sat, payer_amount_sat),
344 receiver_amount_sat = COALESCE(:mrh_amount_sat, receiver_amount_sat),
345 state = :state
346 WHERE
347 id = :id",
348 named_params! {
349 ":id": swap_id,
350 ":lockup_tx_id": lockup_tx_id,
351 ":claim_tx_id": claim_tx_id,
352 ":mrh_tx_id": mrh_tx_id,
353 ":mrh_amount_sat": mrh_amount_sat,
354 ":state": to_state,
355 },
356 )?;
357
358 tx.commit()?;
368
369 Ok(())
370 }
371}
372
/// Serializable subset of a Boltz `CreateReverseResponse`.
///
/// Instances are built by `InternalCreateReverseResponse::try_convert_from_boltz`,
/// which copies the fields below from the Boltz response. The response's `id` and
/// `invoice` are validated there but not kept in this struct.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct InternalCreateReverseResponse {
    /// Converted from the Boltz response's `swap_tree`.
    pub swap_tree: InternalSwapTree,
    /// Copied from the Boltz response's `lockup_address`.
    pub lockup_address: String,
    /// String form (`to_string()`) of the Boltz response's `refund_public_key`.
    pub refund_public_key: String,
    /// Copied from the Boltz response's `timeout_block_height`.
    pub timeout_block_height: u32,
    /// Copied from the Boltz response's `onchain_amount`.
    pub onchain_amount: u64,
    /// Copied from the Boltz response's `blinding_key`; may be absent.
    pub blinding_key: Option<String>,
}
382impl InternalCreateReverseResponse {
383 pub(crate) fn try_convert_from_boltz(
384 boltz_create_response: &CreateReverseResponse,
385 expected_swap_id: &str,
386 expected_invoice: Option<&str>,
387 ) -> Result<Self, PaymentError> {
388 ensure_sdk!(
391 boltz_create_response.id == expected_swap_id,
392 PaymentError::PersistError
393 );
394 match (&boltz_create_response.invoice, expected_invoice) {
395 (Some(invoice), Some(expected_invoice)) => {
396 ensure_sdk!(invoice == expected_invoice, PaymentError::PersistError);
397 }
398 (None, None) => {}
399 _ => {
400 return Err(PaymentError::PersistError);
401 }
402 }
403
404 let res = InternalCreateReverseResponse {
405 swap_tree: boltz_create_response.swap_tree.clone().into(),
406 lockup_address: boltz_create_response.lockup_address.clone(),
407 refund_public_key: boltz_create_response.refund_public_key.to_string(),
408 timeout_block_height: boltz_create_response.timeout_block_height,
409 onchain_amount: boltz_create_response.onchain_amount,
410 blinding_key: boltz_create_response.blinding_key.clone(),
411 };
412 Ok(res)
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use anyhow::{anyhow, Result};
419
420 use crate::test_utils::persist::{create_persister, new_receive_swap};
421
422 use super::PaymentState;
423
424 #[cfg(feature = "browser-tests")]
425 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
426
427 #[sdk_macros::test_all]
428 fn test_fetch_receive_swap() -> Result<()> {
429 create_persister!(storage);
430
431 let receive_swap = new_receive_swap(None, None);
432
433 storage.insert_or_update_receive_swap(&receive_swap)?;
434 assert!(storage.fetch_receive_swap_by_id(&receive_swap.id).is_ok());
436 assert!(storage
438 .fetch_receive_swap_by_invoice(&receive_swap.invoice)
439 .is_ok());
440
441 Ok(())
442 }
443
444 #[sdk_macros::test_all]
445 fn test_list_receive_swap() -> Result<()> {
446 create_persister!(storage);
447
448 let range = 0..3;
450 for _ in range.clone() {
451 storage.insert_or_update_receive_swap(&new_receive_swap(None, None))?;
452 }
453
454 let con = storage.get_connection()?;
455 let swaps = storage.list_receive_swaps_where(&con, vec![])?;
456 assert_eq!(swaps.len(), range.len());
457
458 storage
460 .insert_or_update_receive_swap(&new_receive_swap(Some(PaymentState::Pending), None))?;
461 let ongoing_swaps = storage.list_ongoing_receive_swaps()?;
462 assert_eq!(ongoing_swaps.len(), 4);
463
464 Ok(())
465 }
466
467 #[sdk_macros::test_all]
468 fn test_update_receive_swap() -> Result<()> {
469 create_persister!(storage);
470
471 let receive_swap = new_receive_swap(None, None);
472 storage.insert_or_update_receive_swap(&receive_swap)?;
473
474 let new_state = PaymentState::Pending;
476 let claim_tx_id = Some("claim_tx_id");
477
478 storage.try_handle_receive_swap_update(
479 &receive_swap.id,
480 new_state,
481 claim_tx_id,
482 None,
483 None,
484 None,
485 )?;
486
487 let updated_receive_swap = storage
488 .fetch_receive_swap_by_id(&receive_swap.id)?
489 .ok_or(anyhow!("Could not find Receive swap in database"))?;
490
491 assert_eq!(new_state, updated_receive_swap.state);
492 assert_eq!(claim_tx_id, updated_receive_swap.claim_tx_id.as_deref());
493
494 Ok(())
495 }
496
497 #[sdk_macros::async_test_all]
498 async fn test_writing_stale_swap() -> Result<()> {
499 create_persister!(storage);
500
501 let receive_swap = new_receive_swap(None, None);
502 storage.insert_or_update_receive_swap(&receive_swap)?;
503
504 let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
506 receive_swap.lockup_tx_id = Some("tx_id".to_string());
507 storage.insert_or_update_receive_swap(&receive_swap)?;
508
509 let receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
511 storage.insert_or_update_receive_swap(&receive_swap)?;
512
513 let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
515 receive_swap.lockup_tx_id = Some("tx_id_2".to_string());
516 storage.set_receive_swap_claim_tx_id(&receive_swap.id, "tx_id")?;
518 assert!(storage
519 .insert_or_update_receive_swap(&receive_swap)
520 .is_err());
521
522 Ok(())
523 }
524}