1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::CreateReverseResponse;
3use rusqlite::{named_params, params, Connection, Row, TransactionBehavior};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::ensure_sdk;
8use crate::error::PaymentError;
9use crate::model::*;
10use crate::persist::{get_where_clause_state_in, where_clauses_to_string, Persister};
11use crate::sync::model::data::ReceiveSyncData;
12use crate::sync::model::RecordType;
13use crate::utils::{from_optional_u64_to_row, from_row_to_u64, from_u64_to_row};
14
15impl Persister {
16 pub(crate) fn insert_or_update_receive_swap_inner(
17 con: &Connection,
18 receive_swap: &ReceiveSwap,
19 ) -> Result<()> {
20 let id_hash = sha256::Hash::hash(receive_swap.id.as_bytes()).to_hex();
21 con.execute(
22 "
23 INSERT INTO receive_swaps (
24 id,
25 id_hash,
26 preimage,
27 create_response_json,
28 claim_private_key,
29 invoice,
30 timeout_block_height,
31 payment_hash,
32 destination_pubkey,
33 payer_amount_sat,
34 receiver_amount_sat,
35 created_at,
36 claim_fees_sat,
37 mrh_address,
38 state,
39 pair_fees_json
40 )
41 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
42 ON CONFLICT DO NOTHING
43 ",
44 (
45 &receive_swap.id,
46 id_hash,
47 &receive_swap.preimage,
48 &receive_swap.create_response_json,
49 &receive_swap.claim_private_key,
50 &receive_swap.invoice,
51 &receive_swap.timeout_block_height,
52 &receive_swap.payment_hash,
53 &receive_swap.destination_pubkey,
54 from_u64_to_row(receive_swap.payer_amount_sat)?,
55 from_u64_to_row(receive_swap.receiver_amount_sat)?,
56 &receive_swap.created_at,
57 from_u64_to_row(receive_swap.claim_fees_sat)?,
58 &receive_swap.mrh_address,
59 &receive_swap.state,
60 &receive_swap.pair_fees_json,
61 ),
62 )?;
63
64 let rows_affected = con.execute(
65 "UPDATE receive_swaps
66 SET
67 bolt12_offer = :bolt12_offer,
68 description = :description,
69 payer_note = :payer_note,
70 claim_address = :claim_address,
71 claim_tx_id = :claim_tx_id,
72 lockup_tx_id = :lockup_tx_id,
73 mrh_tx_id = :mrh_tx_id,
74 payer_amount_sat = :payer_amount_sat,
75 receiver_amount_sat = :receiver_amount_sat,
76 state = :state
77 WHERE
78 id = :id AND
79 version = :version",
80 named_params! {
81 ":id": &receive_swap.id,
82 ":bolt12_offer": &receive_swap.bolt12_offer,
83 ":description": &receive_swap.description,
84 ":payer_note": &receive_swap.payer_note,
85 ":claim_address": &receive_swap.claim_address,
86 ":claim_tx_id": &receive_swap.claim_tx_id,
87 ":lockup_tx_id": &receive_swap.lockup_tx_id,
88 ":mrh_tx_id": &receive_swap.mrh_tx_id,
89 ":payer_amount_sat": from_u64_to_row(receive_swap.payer_amount_sat)?,
93 ":receiver_amount_sat": from_u64_to_row(receive_swap.receiver_amount_sat)?,
94 ":state": &receive_swap.state,
95 ":version": from_u64_to_row(receive_swap.metadata.version)?,
96 },
97 )?;
98 ensure_sdk!(
99 rows_affected > 0,
100 anyhow!("Version mismatch for receive swap {}", receive_swap.id)
101 );
102
103 if receive_swap.mrh_tx_id.is_some() {
104 Self::delete_reserved_address_inner(con, &receive_swap.mrh_address)?;
105 }
106
107 Ok(())
108 }
109
110 pub(crate) fn insert_or_update_receive_swap(&self, receive_swap: &ReceiveSwap) -> Result<()> {
111 let maybe_swap = self.fetch_receive_swap_by_id(&receive_swap.id)?;
112 let updated_fields = ReceiveSyncData::updated_fields(maybe_swap, receive_swap);
113
114 let mut con = self.get_connection()?;
115 let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
116
117 Self::insert_or_update_receive_swap_inner(&tx, receive_swap)?;
118
119 let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
123 match trigger_sync {
124 true => {
125 self.commit_outgoing(&tx, &receive_swap.id, RecordType::Receive, updated_fields)?;
126 tx.commit()?;
127 self.trigger_sync();
128 }
129 false => {
130 tx.commit()?;
131 }
132 };
133
134 Ok(())
135 }
136
137 fn list_receive_swaps_query(where_clauses: Vec<String>) -> String {
138 let where_clause_str = where_clauses_to_string(where_clauses);
139
140 format!(
141 "
142 SELECT
143 rs.id,
144 rs.preimage,
145 rs.create_response_json,
146 rs.claim_private_key,
147 rs.invoice,
148 rs.bolt12_offer,
149 rs.payment_hash,
150 rs.destination_pubkey,
151 rs.timeout_block_height,
152 rs.description,
153 rs.payer_note,
154 rs.payer_amount_sat,
155 rs.receiver_amount_sat,
156 rs.claim_fees_sat,
157 rs.claim_address,
158 rs.claim_tx_id,
159 rs.lockup_tx_id,
160 rs.mrh_address,
161 rs.mrh_tx_id,
162 rs.created_at,
163 rs.state,
164 rs.pair_fees_json,
165 rs.version,
166 rs.last_updated_at,
167
168 sync_state.is_local
169 FROM receive_swaps AS rs
170 LEFT JOIN sync_state ON rs.id = sync_state.data_id
171 {where_clause_str}
172 ORDER BY rs.created_at
173 "
174 )
175 }
176
177 pub(crate) fn fetch_receive_swap_by_id(&self, id: &str) -> Result<Option<ReceiveSwap>> {
178 let con: Connection = self.get_connection()?;
179 let query = Self::list_receive_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
180 let res = con.query_row(&query, [id], Self::sql_row_to_receive_swap);
181
182 Ok(res.ok())
183 }
184
185 pub(crate) fn fetch_receive_swap_by_invoice(
186 &self,
187 invoice: &str,
188 ) -> Result<Option<ReceiveSwap>> {
189 let con: Connection = self.get_connection()?;
190 let query = Self::list_receive_swaps_query(vec!["invoice= ?1".to_string()]);
191 let res = con.query_row(&query, [invoice], Self::sql_row_to_receive_swap);
192
193 Ok(res.ok())
194 }
195
196 fn sql_row_to_receive_swap(row: &Row) -> rusqlite::Result<ReceiveSwap> {
197 Ok(ReceiveSwap {
198 id: row.get(0)?,
199 preimage: row.get(1)?,
200 create_response_json: row.get(2)?,
201 claim_private_key: row.get(3)?,
202 invoice: row.get(4)?,
203 bolt12_offer: row.get(5)?,
204 payment_hash: row.get(6)?,
205 destination_pubkey: row.get(7)?,
206 timeout_block_height: row.get(8)?,
207 description: row.get(9)?,
208 payer_note: row.get(10)?,
209 payer_amount_sat: from_row_to_u64(row, 11)?,
210 receiver_amount_sat: from_row_to_u64(row, 12)?,
211 claim_fees_sat: from_row_to_u64(row, 13)?,
212 claim_address: row.get(14)?,
213 claim_tx_id: row.get(15)?,
214 lockup_tx_id: row.get(16)?,
215 mrh_address: row.get(17)?,
216 mrh_tx_id: row.get(18)?,
217 created_at: row.get(19)?,
218 state: row.get(20)?,
219 pair_fees_json: row.get(21)?,
220 metadata: SwapMetadata {
221 version: from_row_to_u64(row, 22)?,
222 last_updated_at: row.get(23)?,
223 is_local: row.get::<usize, Option<bool>>(24)?.unwrap_or(true),
224 },
225 })
226 }
227
228 pub(crate) fn list_receive_swaps_where(
229 &self,
230 con: &Connection,
231 where_clauses: Vec<String>,
232 ) -> Result<Vec<ReceiveSwap>> {
233 let query = Self::list_receive_swaps_query(where_clauses);
234 let ongoing_receive = con
235 .prepare(&query)?
236 .query_map(params![], Self::sql_row_to_receive_swap)?
237 .map(|i| i.unwrap())
238 .collect();
239 Ok(ongoing_receive)
240 }
241
242 pub(crate) fn list_ongoing_receive_swaps(&self) -> Result<Vec<ReceiveSwap>> {
243 let con = self.get_connection()?;
244 let where_clauses = vec![get_where_clause_state_in(&[
245 PaymentState::Created,
246 PaymentState::Pending,
247 ])];
248
249 self.list_receive_swaps_where(&con, where_clauses)
250 }
251
252 pub(crate) fn list_recoverable_receive_swaps(&self) -> Result<Vec<ReceiveSwap>> {
253 let con = self.get_connection()?;
254 let where_clause = vec![get_where_clause_state_in(&[
255 PaymentState::Created,
256 PaymentState::Pending,
257 ])];
258
259 self.list_receive_swaps_where(&con, where_clause)
260 }
261
262 pub(crate) fn set_receive_swap_claim_address(
263 &self,
264 swap_id: &str,
265 claim_address: &str,
266 ) -> Result<(), PaymentError> {
267 let con = self.get_connection()?;
268 con.execute(
269 "UPDATE receive_swaps
270 SET claim_address = :claim_address
271 WHERE id = :id",
272 named_params! {
273 ":id": swap_id,
274 ":claim_address": claim_address,
275 },
276 )?;
277 Ok(())
278 }
279
280 pub(crate) fn set_receive_swap_claim_tx_id(
282 &self,
283 swap_id: &str,
284 claim_tx_id: &str,
285 ) -> Result<(), PaymentError> {
286 let con = self.get_connection()?;
287 let row_count = con
288 .execute(
289 "UPDATE receive_swaps
290 SET claim_tx_id = :claim_tx_id
291 WHERE id = :id AND claim_tx_id IS NULL",
292 named_params! {
293 ":id": swap_id,
294 ":claim_tx_id": claim_tx_id,
295 },
296 )
297 .map_err(|_| PaymentError::PersistError)?;
298 match row_count {
299 1 => Ok(()),
300 _ => Err(PaymentError::AlreadyClaimed),
301 }
302 }
303
304 pub(crate) fn unset_receive_swap_claim_tx_id(
306 &self,
307 swap_id: &str,
308 claim_tx_id: &str,
309 ) -> Result<(), PaymentError> {
310 let con = self.get_connection()?;
311 con.execute(
312 "UPDATE receive_swaps
313 SET claim_tx_id = NULL
314 WHERE id = :id AND claim_tx_id = :claim_tx_id",
315 named_params! {
316 ":id": swap_id,
317 ":claim_tx_id": claim_tx_id,
318 },
319 )
320 .map_err(|_| PaymentError::PersistError)?;
321 Ok(())
322 }
323
324 pub(crate) fn try_handle_receive_swap_update(
325 &self,
326 swap_id: &str,
327 to_state: PaymentState,
328 claim_tx_id: Option<&str>,
329 lockup_tx_id: Option<&str>,
330 mrh_tx_id: Option<&str>,
331 mrh_amount_sat: Option<u64>,
332 ) -> Result<(), PaymentError> {
333 let mut con = self.get_connection()?;
335 let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
336
337 tx.execute(
338 "UPDATE receive_swaps
339 SET
340 claim_tx_id = COALESCE(claim_tx_id, :claim_tx_id),
341 lockup_tx_id = COALESCE(lockup_tx_id, :lockup_tx_id),
342 mrh_tx_id = COALESCE(mrh_tx_id, :mrh_tx_id),
343
344 payer_amount_sat = COALESCE(:mrh_amount_sat, payer_amount_sat),
345 receiver_amount_sat = COALESCE(:mrh_amount_sat, receiver_amount_sat),
346 state = :state
347 WHERE
348 id = :id",
349 named_params! {
350 ":id": swap_id,
351 ":lockup_tx_id": lockup_tx_id,
352 ":claim_tx_id": claim_tx_id,
353 ":mrh_tx_id": mrh_tx_id,
354 ":mrh_amount_sat": from_optional_u64_to_row(&mrh_amount_sat)?,
355 ":state": to_state,
356 },
357 )?;
358
359 tx.commit()?;
369
370 Ok(())
371 }
372}
373
374#[derive(Clone, Debug, Serialize, Deserialize)]
375pub(crate) struct InternalCreateReverseResponse {
376 pub swap_tree: InternalSwapTree,
377 pub lockup_address: String,
378 pub refund_public_key: String,
379 pub timeout_block_height: u32,
380 pub onchain_amount: u64,
381 pub blinding_key: Option<String>,
382}
383impl InternalCreateReverseResponse {
384 pub(crate) fn try_convert_from_boltz(
385 boltz_create_response: &CreateReverseResponse,
386 expected_swap_id: &str,
387 expected_invoice: Option<&str>,
388 ) -> Result<Self, PaymentError> {
389 ensure_sdk!(
392 boltz_create_response.id == expected_swap_id,
393 PaymentError::PersistError
394 );
395 match (&boltz_create_response.invoice, expected_invoice) {
396 (Some(invoice), Some(expected_invoice)) => {
397 ensure_sdk!(invoice == expected_invoice, PaymentError::PersistError);
398 }
399 (None, None) => {}
400 _ => {
401 return Err(PaymentError::PersistError);
402 }
403 }
404
405 let res = InternalCreateReverseResponse {
406 swap_tree: boltz_create_response.swap_tree.clone().into(),
407 lockup_address: boltz_create_response.lockup_address.clone(),
408 refund_public_key: boltz_create_response.refund_public_key.to_string(),
409 timeout_block_height: boltz_create_response.timeout_block_height,
410 onchain_amount: boltz_create_response.onchain_amount,
411 blinding_key: boltz_create_response.blinding_key.clone(),
412 };
413 Ok(res)
414 }
415}
416
417#[cfg(test)]
418mod tests {
419 use anyhow::{anyhow, Result};
420
421 use crate::test_utils::persist::{create_persister, new_receive_swap};
422
423 use super::PaymentState;
424
425 #[cfg(feature = "browser-tests")]
426 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
427
428 #[sdk_macros::test_all]
429 fn test_fetch_receive_swap() -> Result<()> {
430 create_persister!(storage);
431
432 let receive_swap = new_receive_swap(None, None);
433
434 storage.insert_or_update_receive_swap(&receive_swap)?;
435 assert!(storage.fetch_receive_swap_by_id(&receive_swap.id).is_ok());
437 assert!(storage
439 .fetch_receive_swap_by_invoice(&receive_swap.invoice)
440 .is_ok());
441
442 Ok(())
443 }
444
445 #[sdk_macros::test_all]
446 fn test_list_receive_swap() -> Result<()> {
447 create_persister!(storage);
448
449 let range = 0..3;
451 for _ in range.clone() {
452 storage.insert_or_update_receive_swap(&new_receive_swap(None, None))?;
453 }
454
455 let con = storage.get_connection()?;
456 let swaps = storage.list_receive_swaps_where(&con, vec![])?;
457 assert_eq!(swaps.len(), range.len());
458
459 storage
461 .insert_or_update_receive_swap(&new_receive_swap(Some(PaymentState::Pending), None))?;
462 let ongoing_swaps = storage.list_ongoing_receive_swaps()?;
463 assert_eq!(ongoing_swaps.len(), 4);
464
465 Ok(())
466 }
467
468 #[sdk_macros::test_all]
469 fn test_update_receive_swap() -> Result<()> {
470 create_persister!(storage);
471
472 let receive_swap = new_receive_swap(None, None);
473 storage.insert_or_update_receive_swap(&receive_swap)?;
474
475 let new_state = PaymentState::Pending;
477 let claim_tx_id = Some("claim_tx_id");
478
479 storage.try_handle_receive_swap_update(
480 &receive_swap.id,
481 new_state,
482 claim_tx_id,
483 None,
484 None,
485 None,
486 )?;
487
488 let updated_receive_swap = storage
489 .fetch_receive_swap_by_id(&receive_swap.id)?
490 .ok_or(anyhow!("Could not find Receive swap in database"))?;
491
492 assert_eq!(new_state, updated_receive_swap.state);
493 assert_eq!(claim_tx_id, updated_receive_swap.claim_tx_id.as_deref());
494
495 Ok(())
496 }
497
498 #[sdk_macros::async_test_all]
499 async fn test_writing_stale_swap() -> Result<()> {
500 create_persister!(storage);
501
502 let receive_swap = new_receive_swap(None, None);
503 storage.insert_or_update_receive_swap(&receive_swap)?;
504
505 let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
507 receive_swap.lockup_tx_id = Some("tx_id".to_string());
508 storage.insert_or_update_receive_swap(&receive_swap)?;
509
510 let receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
512 storage.insert_or_update_receive_swap(&receive_swap)?;
513
514 let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
516 receive_swap.lockup_tx_id = Some("tx_id_2".to_string());
517 storage.set_receive_swap_claim_tx_id(&receive_swap.id, "tx_id")?;
519 assert!(storage
520 .insert_or_update_receive_swap(&receive_swap)
521 .is_err());
522
523 Ok(())
524 }
525}