1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::CreateSubmarineResponse;
3use rusqlite::{named_params, params, Connection, Row};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::error::PaymentError;
8use crate::model::*;
9use crate::persist::{get_where_clause_state_in, Persister};
10use crate::sync::model::data::SendSyncData;
11use crate::sync::model::RecordType;
12use crate::utils::{from_row_to_u64, from_u64_to_row};
13use crate::{ensure_sdk, get_updated_fields};
14
15use super::where_clauses_to_string;
16
17impl Persister {
18 pub(crate) fn insert_or_update_send_swap_inner(
19 con: &Connection,
20 send_swap: &SendSwap,
21 ) -> Result<()> {
22 let id_hash = sha256::Hash::hash(send_swap.id.as_bytes()).to_hex();
23 con.execute(
24 "
25 INSERT INTO send_swaps (
26 id,
27 id_hash,
28 invoice,
29 bolt12_offer,
30 payment_hash,
31 destination_pubkey,
32 timeout_block_height,
33 payer_amount_sat,
34 receiver_amount_sat,
35 create_response_json,
36 refund_private_key,
37 created_at,
38 state,
39 pair_fees_json
40 )
41 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
42 ON CONFLICT DO NOTHING
43 ",
44 (
45 &send_swap.id,
46 &id_hash,
47 &send_swap.invoice,
48 &send_swap.bolt12_offer,
49 &send_swap.payment_hash,
50 &send_swap.destination_pubkey,
51 from_u64_to_row(send_swap.timeout_block_height)?,
52 from_u64_to_row(send_swap.payer_amount_sat)?,
53 from_u64_to_row(send_swap.receiver_amount_sat)?,
54 &send_swap.create_response_json,
55 &send_swap.refund_private_key,
56 &send_swap.created_at,
57 &send_swap.state,
58 &send_swap.pair_fees_json,
59 ),
60 )?;
61
62 let rows_affected = con.execute(
63 "UPDATE send_swaps
64 SET
65 description = :description,
66 preimage = :preimage,
67 lockup_tx_id = :lockup_tx_id,
68 refund_address = :refund_address,
69 refund_tx_id = :refund_tx_id,
70 state = :state
71 WHERE
72 id = :id AND
73 version = :version",
74 named_params! {
75 ":id": &send_swap.id,
76 ":description": &send_swap.description,
77 ":preimage": &send_swap.preimage,
78 ":lockup_tx_id": &send_swap.lockup_tx_id,
79 ":refund_address": &send_swap.refund_address,
80 ":refund_tx_id": &send_swap.refund_tx_id,
81 ":state": &send_swap.state,
82 ":version": from_u64_to_row(send_swap.metadata.version)?,
83 },
84 )?;
85 ensure_sdk!(
86 rows_affected > 0,
87 anyhow!("Version mismatch for send swap {}", send_swap.id)
88 );
89
90 Ok(())
91 }
92
93 pub(crate) fn insert_or_update_send_swap(&self, send_swap: &SendSwap) -> Result<()> {
94 let maybe_swap = self.fetch_send_swap_by_id(&send_swap.id)?;
95 let updated_fields = SendSyncData::updated_fields(maybe_swap, send_swap);
96
97 let mut con = self.get_connection()?;
98 let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
99
100 Self::insert_or_update_send_swap_inner(&tx, send_swap)?;
101
102 let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
106 match trigger_sync {
107 true => {
108 self.commit_outgoing(&tx, &send_swap.id, RecordType::Send, updated_fields)?;
109 tx.commit()?;
110 self.trigger_sync();
111 }
112 false => {
113 tx.commit()?;
114 }
115 };
116
117 Ok(())
118 }
119
120 pub(crate) fn update_send_swaps_by_state(
121 &self,
122 from_state: PaymentState,
123 to_state: PaymentState,
124 is_local: Option<bool>,
125 ) -> Result<()> {
126 let con = self.get_connection()?;
127 let mut where_clauses = vec!["state = :from_state".to_string()];
128 if let Some(is_local) = is_local {
129 let mut where_is_local = format!("sync_state.is_local = {}", is_local as u8);
130 if is_local {
131 where_is_local = format!("({where_is_local} OR sync_state.is_local IS NULL)");
132 }
133 where_clauses.push(where_is_local);
134 }
135
136 let where_clause_str = where_clauses_to_string(where_clauses);
137 let query = format!(
138 "
139 UPDATE send_swaps
140 SET state = :to_state
141 WHERE id IN (
142 SELECT id
143 FROM send_swaps AS ss
144 LEFT JOIN sync_state ON ss.id = sync_state.data_id
145 {where_clause_str}
146 ORDER BY created_at
147 )
148 "
149 );
150
151 con.execute(
152 &query,
153 named_params! {
154 ":from_state": from_state,
155 ":to_state": to_state,
156 },
157 )?;
158
159 Ok(())
160 }
161
162 fn list_send_swaps_query(where_clauses: Vec<String>) -> String {
163 let mut where_clause_str = String::new();
164 if !where_clauses.is_empty() {
165 where_clause_str = String::from("WHERE ");
166 where_clause_str.push_str(where_clauses.join(" AND ").as_str());
167 }
168
169 format!(
170 "
171 SELECT
172 id,
173 invoice,
174 bolt12_offer,
175 payment_hash,
176 destination_pubkey,
177 timeout_block_height,
178 description,
179 preimage,
180 payer_amount_sat,
181 receiver_amount_sat,
182 create_response_json,
183 refund_private_key,
184 lockup_tx_id,
185 refund_address,
186 refund_tx_id,
187 created_at,
188 state,
189 pair_fees_json,
190 version,
191 last_updated_at,
192
193 sync_state.is_local
194 FROM send_swaps AS ss
195 LEFT JOIN sync_state ON ss.id = sync_state.data_id
196 {where_clause_str}
197 ORDER BY created_at
198 "
199 )
200 }
201
202 pub(crate) fn fetch_send_swap_by_id(&self, id: &str) -> Result<Option<SendSwap>> {
203 let con: Connection = self.get_connection()?;
204 let query = Self::list_send_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
205 let res = con.query_row(&query, [id], Self::sql_row_to_send_swap);
206
207 Ok(res.ok())
208 }
209
210 pub(crate) fn fetch_send_swap_by_invoice(&self, invoice: &str) -> Result<Option<SendSwap>> {
211 let con: Connection = self.get_connection()?;
212 let query = Self::list_send_swaps_query(vec!["invoice= ?1".to_string()]);
213 let res = con.query_row(&query, [invoice], Self::sql_row_to_send_swap);
214
215 Ok(res.ok())
216 }
217
218 fn sql_row_to_send_swap(row: &Row) -> rusqlite::Result<SendSwap> {
219 Ok(SendSwap {
220 id: row.get(0)?,
221 invoice: row.get(1)?,
222 bolt12_offer: row.get(2)?,
223 payment_hash: row.get(3)?,
224 destination_pubkey: row.get(4)?,
225 timeout_block_height: from_row_to_u64(row, 5)?,
226 description: row.get(6)?,
227 preimage: row.get(7)?,
228 payer_amount_sat: from_row_to_u64(row, 8)?,
229 receiver_amount_sat: from_row_to_u64(row, 9)?,
230 create_response_json: row.get(10)?,
231 refund_private_key: row.get(11)?,
232 lockup_tx_id: row.get(12)?,
233 refund_address: row.get(13)?,
234 refund_tx_id: row.get(14)?,
235 created_at: row.get(15)?,
236 state: row.get(16)?,
237 pair_fees_json: row.get(17)?,
238 metadata: SwapMetadata {
239 version: from_row_to_u64(row, 18)?,
240 last_updated_at: row.get(19)?,
241 is_local: row.get::<usize, Option<bool>>(20)?.unwrap_or(true),
242 },
243 })
244 }
245
246 pub(crate) fn list_send_swaps_where(
247 &self,
248 con: &Connection,
249 where_clauses: Vec<String>,
250 ) -> Result<Vec<SendSwap>> {
251 let query = Self::list_send_swaps_query(where_clauses);
252 let ongoing_send = con
253 .prepare(&query)?
254 .query_map(params![], Self::sql_row_to_send_swap)?
255 .map(|i| i.unwrap())
256 .collect();
257 Ok(ongoing_send)
258 }
259
260 pub(crate) fn list_send_swaps_by_state(
261 &self,
262 states: Vec<PaymentState>,
263 ) -> Result<Vec<SendSwap>> {
264 let con = self.get_connection()?;
265 let where_clause = vec![get_where_clause_state_in(&states)];
266 self.list_send_swaps_where(&con, where_clause)
267 }
268
269 pub(crate) fn list_ongoing_send_swaps(&self) -> Result<Vec<SendSwap>> {
270 self.list_send_swaps_by_state(vec![PaymentState::Created, PaymentState::Pending])
271 }
272
273 pub(crate) fn list_pending_send_swaps(&self) -> Result<Vec<SendSwap>> {
274 self.list_send_swaps_by_state(vec![PaymentState::Pending, PaymentState::RefundPending])
275 }
276
277 pub(crate) fn list_recoverable_send_swaps(&self) -> Result<Vec<SendSwap>> {
278 self.list_send_swaps_by_state(vec![
279 PaymentState::Created,
280 PaymentState::Pending,
281 PaymentState::RefundPending,
282 ])
283 }
284
285 pub(crate) fn try_handle_send_swap_update(
286 &self,
287 swap_id: &str,
288 to_state: PaymentState,
289 preimage: Option<&str>,
290 lockup_tx_id: Option<&str>,
291 refund_tx_id: Option<&str>,
292 ) -> Result<(), PaymentError> {
293 let mut con = self.get_connection()?;
295 let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
296
297 tx.execute(
298 "UPDATE send_swaps
299 SET
300 preimage = COALESCE(preimage, :preimage),
301 lockup_tx_id = COALESCE(lockup_tx_id, :lockup_tx_id),
302 refund_tx_id = COALESCE(refund_tx_id, :refund_tx_id),
303 state = :state
304 WHERE
305 id = :id",
306 named_params! {
307 ":id": swap_id,
308 ":preimage": preimage,
309 ":lockup_tx_id": lockup_tx_id,
310 ":refund_tx_id": refund_tx_id,
311 ":state": to_state,
312 },
313 )?;
314
315 let updated_fields = get_updated_fields!(preimage);
316 self.commit_outgoing(&tx, swap_id, RecordType::Send, updated_fields)?;
317 tx.commit()?;
318
319 self.trigger_sync();
320
321 Ok(())
322 }
323
324 pub(crate) fn set_send_swap_refund_address(
325 &self,
326 swap_id: &str,
327 refund_address: &str,
328 ) -> Result<(), PaymentError> {
329 let con = self.get_connection()?;
330 con.execute(
331 "UPDATE send_swaps
332 SET refund_address = :refund_address
333 WHERE id = :id",
334 named_params! {
335 ":id": swap_id,
336 ":refund_address": refund_address,
337 },
338 )?;
339 Ok(())
340 }
341
342 pub(crate) fn set_send_swap_lockup_tx_id(
343 &self,
344 swap_id: &str,
345 lockup_tx_id: &str,
346 ) -> Result<(), PaymentError> {
347 let con = self.get_connection()?;
348
349 let row_count = con
350 .execute(
351 "UPDATE send_swaps
352 SET lockup_tx_id = :lockup_tx_id
353 WHERE id = :id AND lockup_tx_id IS NULL",
354 named_params! {
355 ":id": swap_id,
356 ":lockup_tx_id": lockup_tx_id,
357 },
358 )
359 .map_err(|e| {
360 log::error!("Failed to set send swap lockup_tx_id: {e:?}");
361 PaymentError::PersistError
362 })?;
363 match row_count {
364 1 => Ok(()),
365 _ => Err(PaymentError::PaymentInProgress),
366 }
367 }
368
369 pub(crate) fn unset_send_swap_lockup_tx_id(
370 &self,
371 swap_id: &str,
372 lockup_tx_id: &str,
373 ) -> Result<(), PaymentError> {
374 let con = self.get_connection()?;
375 con.execute(
376 "UPDATE send_swaps
377 SET lockup_tx_id = NULL
378 WHERE id = :id AND lockup_tx_id = :lockup_tx_id",
379 named_params! {
380 ":id": swap_id,
381 ":lockup_tx_id": lockup_tx_id,
382 },
383 )
384 .map_err(|e| {
385 log::error!("Failed to unset send swap lockup_tx_id: {e:?}");
386 PaymentError::PersistError
387 })?;
388 Ok(())
389 }
390}
391
392#[derive(Clone, Debug, Serialize, Deserialize)]
393pub(crate) struct InternalCreateSubmarineResponse {
394 pub(crate) accept_zero_conf: bool,
395 pub(crate) address: String,
396 pub(crate) bip21: String,
397 pub(crate) claim_public_key: String,
398 pub(crate) expected_amount: u64,
399 pub(crate) referral_id: Option<String>,
400 pub(crate) swap_tree: InternalSwapTree,
401 #[serde(default)]
402 pub(crate) timeout_block_height: u64,
403 pub(crate) blinding_key: Option<String>,
404}
405impl InternalCreateSubmarineResponse {
406 pub(crate) fn try_convert_from_boltz(
407 boltz_create_response: &CreateSubmarineResponse,
408 expected_swap_id: &str,
409 ) -> Result<InternalCreateSubmarineResponse, PaymentError> {
410 ensure_sdk!(
413 boltz_create_response.id == expected_swap_id,
414 PaymentError::PersistError
415 );
416
417 let res = InternalCreateSubmarineResponse {
418 accept_zero_conf: boltz_create_response.accept_zero_conf,
419 address: boltz_create_response.address.clone(),
420 bip21: boltz_create_response.bip21.clone(),
421 claim_public_key: boltz_create_response.claim_public_key.to_string(),
422 expected_amount: boltz_create_response.expected_amount,
423 referral_id: boltz_create_response.referral_id.clone(),
424 swap_tree: boltz_create_response.swap_tree.clone().into(),
425 timeout_block_height: boltz_create_response.timeout_block_height,
426 blinding_key: boltz_create_response.blinding_key.clone(),
427 };
428 Ok(res)
429 }
430}
431
432#[cfg(test)]
433mod tests {
434 use crate::test_utils::persist::{create_persister, new_send_swap};
435 use anyhow::{anyhow, Result};
436
437 use super::PaymentState;
438
439 #[cfg(feature = "browser-tests")]
440 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
441
442 #[sdk_macros::test_all]
443 fn test_fetch_send_swap() -> Result<()> {
444 create_persister!(storage);
445 let send_swap = new_send_swap(None, None);
446
447 storage.insert_or_update_send_swap(&send_swap)?;
448 assert!(storage.fetch_send_swap_by_id(&send_swap.id).is_ok());
450 assert!(storage
452 .fetch_send_swap_by_invoice(&send_swap.invoice)
453 .is_ok());
454
455 Ok(())
456 }
457
458 #[sdk_macros::test_all]
459 fn test_list_send_swap() -> Result<()> {
460 create_persister!(storage);
461
462 let range = 0..3;
464 for _ in range.clone() {
465 storage.insert_or_update_send_swap(&new_send_swap(None, None))?;
466 }
467
468 let con = storage.get_connection()?;
469 let swaps = storage.list_send_swaps_where(&con, vec![])?;
470 assert_eq!(swaps.len(), range.len());
471
472 storage.insert_or_update_send_swap(&new_send_swap(Some(PaymentState::Pending), None))?;
474 let ongoing_swaps = storage.list_ongoing_send_swaps()?;
475 assert_eq!(ongoing_swaps.len(), 4);
476
477 let ongoing_swaps = storage.list_pending_send_swaps()?;
479 assert_eq!(ongoing_swaps.len(), 1);
480
481 Ok(())
482 }
483
484 #[sdk_macros::test_all]
485 fn test_update_send_swap() -> Result<()> {
486 create_persister!(storage);
487
488 let mut send_swap = new_send_swap(None, None);
489 storage.insert_or_update_send_swap(&send_swap)?;
490
491 let new_state = PaymentState::Pending;
493 let preimage = Some("preimage");
494 let lockup_tx_id = Some("lockup_tx_id");
495 let refund_tx_id = Some("refund_tx_id");
496
497 storage.try_handle_send_swap_update(
498 &send_swap.id,
499 new_state,
500 preimage,
501 lockup_tx_id,
502 refund_tx_id,
503 )?;
504
505 let updated_send_swap = storage
506 .fetch_send_swap_by_id(&send_swap.id)?
507 .ok_or(anyhow!("Could not find Send swap in database"))?;
508
509 assert_eq!(new_state, updated_send_swap.state);
510 assert_eq!(preimage, updated_send_swap.preimage.as_deref());
511 assert_eq!(lockup_tx_id, updated_send_swap.lockup_tx_id.as_deref());
512 assert_eq!(refund_tx_id, updated_send_swap.refund_tx_id.as_deref());
513
514 send_swap.state = new_state;
515
516 let new_state = PaymentState::Complete;
518 storage.update_send_swaps_by_state(send_swap.state, PaymentState::Complete, None)?;
519 let updated_send_swap = storage
520 .fetch_send_swap_by_id(&send_swap.id)?
521 .ok_or(anyhow!("Could not find Send swap in database"))?;
522 assert_eq!(new_state, updated_send_swap.state);
523
524 Ok(())
525 }
526
527 #[sdk_macros::async_test_all]
528 async fn test_writing_stale_swap() -> Result<()> {
529 create_persister!(storage);
530
531 let send_swap = new_send_swap(None, None);
532 storage.insert_or_update_send_swap(&send_swap)?;
533
534 let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
536 send_swap.refund_tx_id = Some("tx_id".to_string());
537 storage.insert_or_update_send_swap(&send_swap)?;
538
539 let send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
541 storage.insert_or_update_send_swap(&send_swap)?;
542
543 let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
545 send_swap.refund_tx_id = Some("tx_id_2".to_string());
546 storage.set_send_swap_lockup_tx_id(&send_swap.id, "tx_id")?;
548 assert!(storage.insert_or_update_send_swap(&send_swap).is_err());
549
550 Ok(())
551 }
552}