1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::CreateSubmarineResponse;
3use rusqlite::{named_params, params, Connection, Row};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::error::PaymentError;
8use crate::model::*;
9use crate::persist::{get_where_clause_state_in, Persister};
10use crate::sync::model::data::SendSyncData;
11use crate::sync::model::RecordType;
12use crate::{ensure_sdk, get_updated_fields};
13
14use super::where_clauses_to_string;
15
16impl Persister {
17 pub(crate) fn insert_or_update_send_swap_inner(
18 con: &Connection,
19 send_swap: &SendSwap,
20 ) -> Result<()> {
21 let id_hash = sha256::Hash::hash(send_swap.id.as_bytes()).to_hex();
22 con.execute(
23 "
24 INSERT INTO send_swaps (
25 id,
26 id_hash,
27 invoice,
28 bolt12_offer,
29 payment_hash,
30 destination_pubkey,
31 timeout_block_height,
32 payer_amount_sat,
33 receiver_amount_sat,
34 create_response_json,
35 refund_private_key,
36 created_at,
37 state,
38 pair_fees_json
39 )
40 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
41 ON CONFLICT DO NOTHING
42 ",
43 (
44 &send_swap.id,
45 &id_hash,
46 &send_swap.invoice,
47 &send_swap.bolt12_offer,
48 &send_swap.payment_hash,
49 &send_swap.destination_pubkey,
50 &send_swap.timeout_block_height,
51 &send_swap.payer_amount_sat,
52 &send_swap.receiver_amount_sat,
53 &send_swap.create_response_json,
54 &send_swap.refund_private_key,
55 &send_swap.created_at,
56 &send_swap.state,
57 &send_swap.pair_fees_json,
58 ),
59 )?;
60
61 let rows_affected = con.execute(
62 "UPDATE send_swaps
63 SET
64 description = :description,
65 preimage = :preimage,
66 lockup_tx_id = :lockup_tx_id,
67 refund_address = :refund_address,
68 refund_tx_id = :refund_tx_id,
69 state = :state
70 WHERE
71 id = :id AND
72 version = :version",
73 named_params! {
74 ":id": &send_swap.id,
75 ":description": &send_swap.description,
76 ":preimage": &send_swap.preimage,
77 ":lockup_tx_id": &send_swap.lockup_tx_id,
78 ":refund_address": &send_swap.refund_address,
79 ":refund_tx_id": &send_swap.refund_tx_id,
80 ":state": &send_swap.state,
81 ":version": &send_swap.metadata.version,
82 },
83 )?;
84 ensure_sdk!(
85 rows_affected > 0,
86 anyhow!("Version mismatch for send swap {}", send_swap.id)
87 );
88
89 Ok(())
90 }
91
92 pub(crate) fn insert_or_update_send_swap(&self, send_swap: &SendSwap) -> Result<()> {
93 let maybe_swap = self.fetch_send_swap_by_id(&send_swap.id)?;
94 let updated_fields = SendSyncData::updated_fields(maybe_swap, send_swap);
95
96 let mut con = self.get_connection()?;
97 let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
98
99 Self::insert_or_update_send_swap_inner(&tx, send_swap)?;
100
101 let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
105 match trigger_sync {
106 true => {
107 self.commit_outgoing(&tx, &send_swap.id, RecordType::Send, updated_fields)?;
108 tx.commit()?;
109 self.trigger_sync();
110 }
111 false => {
112 tx.commit()?;
113 }
114 };
115
116 Ok(())
117 }
118
119 pub(crate) fn update_send_swaps_by_state(
120 &self,
121 from_state: PaymentState,
122 to_state: PaymentState,
123 is_local: Option<bool>,
124 ) -> Result<()> {
125 let con = self.get_connection()?;
126 let mut where_clauses = vec!["state = :from_state".to_string()];
127 if let Some(is_local) = is_local {
128 let mut where_is_local = format!("sync_state.is_local = {}", is_local as u8);
129 if is_local {
130 where_is_local = format!("({} OR sync_state.is_local IS NULL)", where_is_local);
131 }
132 where_clauses.push(where_is_local);
133 }
134
135 let where_clause_str = where_clauses_to_string(where_clauses);
136 let query = format!(
137 "
138 UPDATE send_swaps
139 SET state = :to_state
140 WHERE id IN (
141 SELECT id
142 FROM send_swaps AS ss
143 LEFT JOIN sync_state ON ss.id = sync_state.data_id
144 {where_clause_str}
145 ORDER BY created_at
146 )
147 "
148 );
149
150 con.execute(
151 &query,
152 named_params! {
153 ":from_state": from_state,
154 ":to_state": to_state,
155 },
156 )?;
157
158 Ok(())
159 }
160
/// Builds the `SELECT` statement used to load send swaps.
///
/// The selected columns are, in order, the ones `sql_row_to_send_swap` reads by
/// index (0 through 20), so the two must be kept in step.
///
/// `where_clauses` are combined with `AND`. Each clause is wrapped in parentheses
/// so that a clause containing `OR` (such as `id = ?1 or id_hash = ?1`) keeps its
/// meaning when combined with others; `AND` binds tighter than `OR` in SQL.
/// An empty list produces a query with no `WHERE` at all.
fn list_send_swaps_query(where_clauses: Vec<String>) -> String {
    let where_clause_str = if where_clauses.is_empty() {
        String::new()
    } else {
        let conditions: Vec<String> = where_clauses
            .iter()
            .map(|clause| format!("({clause})"))
            .collect();
        format!("WHERE {}", conditions.join(" AND "))
    };

    format!(
        "
        SELECT
            id,
            invoice,
            bolt12_offer,
            payment_hash,
            destination_pubkey,
            timeout_block_height,
            description,
            preimage,
            payer_amount_sat,
            receiver_amount_sat,
            create_response_json,
            refund_private_key,
            lockup_tx_id,
            refund_address,
            refund_tx_id,
            created_at,
            state,
            pair_fees_json,
            version,
            last_updated_at,

            sync_state.is_local
        FROM send_swaps AS ss
        LEFT JOIN sync_state ON ss.id = sync_state.data_id
        {where_clause_str}
        ORDER BY created_at
        "
    )
}
200
201 pub(crate) fn fetch_send_swap_by_id(&self, id: &str) -> Result<Option<SendSwap>> {
202 let con: Connection = self.get_connection()?;
203 let query = Self::list_send_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
204 let res = con.query_row(&query, [id], Self::sql_row_to_send_swap);
205
206 Ok(res.ok())
207 }
208
209 pub(crate) fn fetch_send_swap_by_invoice(&self, invoice: &str) -> Result<Option<SendSwap>> {
210 let con: Connection = self.get_connection()?;
211 let query = Self::list_send_swaps_query(vec!["invoice= ?1".to_string()]);
212 let res = con.query_row(&query, [invoice], Self::sql_row_to_send_swap);
213
214 Ok(res.ok())
215 }
216
217 fn sql_row_to_send_swap(row: &Row) -> rusqlite::Result<SendSwap> {
218 Ok(SendSwap {
219 id: row.get(0)?,
220 invoice: row.get(1)?,
221 bolt12_offer: row.get(2)?,
222 payment_hash: row.get(3)?,
223 destination_pubkey: row.get(4)?,
224 timeout_block_height: row.get(5)?,
225 description: row.get(6)?,
226 preimage: row.get(7)?,
227 payer_amount_sat: row.get(8)?,
228 receiver_amount_sat: row.get(9)?,
229 create_response_json: row.get(10)?,
230 refund_private_key: row.get(11)?,
231 lockup_tx_id: row.get(12)?,
232 refund_address: row.get(13)?,
233 refund_tx_id: row.get(14)?,
234 created_at: row.get(15)?,
235 state: row.get(16)?,
236 pair_fees_json: row.get(17)?,
237 metadata: SwapMetadata {
238 version: row.get(18)?,
239 last_updated_at: row.get(19)?,
240 is_local: row.get::<usize, Option<bool>>(20)?.unwrap_or(true),
241 },
242 })
243 }
244
245 pub(crate) fn list_send_swaps_where(
246 &self,
247 con: &Connection,
248 where_clauses: Vec<String>,
249 ) -> Result<Vec<SendSwap>> {
250 let query = Self::list_send_swaps_query(where_clauses);
251 let ongoing_send = con
252 .prepare(&query)?
253 .query_map(params![], Self::sql_row_to_send_swap)?
254 .map(|i| i.unwrap())
255 .collect();
256 Ok(ongoing_send)
257 }
258
259 pub(crate) fn list_send_swaps_by_state(
260 &self,
261 states: Vec<PaymentState>,
262 ) -> Result<Vec<SendSwap>> {
263 let con = self.get_connection()?;
264 let where_clause = vec![get_where_clause_state_in(&states)];
265 self.list_send_swaps_where(&con, where_clause)
266 }
267
268 pub(crate) fn list_ongoing_send_swaps(&self) -> Result<Vec<SendSwap>> {
269 self.list_send_swaps_by_state(vec![PaymentState::Created, PaymentState::Pending])
270 }
271
272 pub(crate) fn list_pending_send_swaps(&self) -> Result<Vec<SendSwap>> {
273 self.list_send_swaps_by_state(vec![PaymentState::Pending, PaymentState::RefundPending])
274 }
275
276 pub(crate) fn list_recoverable_send_swaps(&self) -> Result<Vec<SendSwap>> {
277 self.list_send_swaps_by_state(vec![
278 PaymentState::Created,
279 PaymentState::Pending,
280 PaymentState::RefundPending,
281 ])
282 }
283
284 pub(crate) fn try_handle_send_swap_update(
285 &self,
286 swap_id: &str,
287 to_state: PaymentState,
288 preimage: Option<&str>,
289 lockup_tx_id: Option<&str>,
290 refund_tx_id: Option<&str>,
291 ) -> Result<(), PaymentError> {
292 let mut con = self.get_connection()?;
294 let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
295
296 tx.execute(
297 "UPDATE send_swaps
298 SET
299 preimage = COALESCE(preimage, :preimage),
300 lockup_tx_id = COALESCE(lockup_tx_id, :lockup_tx_id),
301 refund_tx_id = COALESCE(refund_tx_id, :refund_tx_id),
302 state = :state
303 WHERE
304 id = :id",
305 named_params! {
306 ":id": swap_id,
307 ":preimage": preimage,
308 ":lockup_tx_id": lockup_tx_id,
309 ":refund_tx_id": refund_tx_id,
310 ":state": to_state,
311 },
312 )?;
313
314 let updated_fields = get_updated_fields!(preimage);
315 self.commit_outgoing(&tx, swap_id, RecordType::Send, updated_fields)?;
316 tx.commit()?;
317
318 self.trigger_sync();
319
320 Ok(())
321 }
322
323 pub(crate) fn set_send_swap_refund_address(
324 &self,
325 swap_id: &str,
326 refund_address: &str,
327 ) -> Result<(), PaymentError> {
328 let con = self.get_connection()?;
329 con.execute(
330 "UPDATE send_swaps
331 SET refund_address = :refund_address
332 WHERE id = :id",
333 named_params! {
334 ":id": swap_id,
335 ":refund_address": refund_address,
336 },
337 )?;
338 Ok(())
339 }
340
341 pub(crate) fn set_send_swap_lockup_tx_id(
342 &self,
343 swap_id: &str,
344 lockup_tx_id: &str,
345 ) -> Result<(), PaymentError> {
346 let con = self.get_connection()?;
347
348 let row_count = con
349 .execute(
350 "UPDATE send_swaps
351 SET lockup_tx_id = :lockup_tx_id
352 WHERE id = :id AND lockup_tx_id IS NULL",
353 named_params! {
354 ":id": swap_id,
355 ":lockup_tx_id": lockup_tx_id,
356 },
357 )
358 .map_err(|_| PaymentError::PersistError)?;
359 match row_count {
360 1 => Ok(()),
361 _ => Err(PaymentError::PaymentInProgress),
362 }
363 }
364
365 pub(crate) fn unset_send_swap_lockup_tx_id(
366 &self,
367 swap_id: &str,
368 lockup_tx_id: &str,
369 ) -> Result<(), PaymentError> {
370 let con = self.get_connection()?;
371 con.execute(
372 "UPDATE send_swaps
373 SET lockup_tx_id = NULL
374 WHERE id = :id AND lockup_tx_id = :lockup_tx_id",
375 named_params! {
376 ":id": swap_id,
377 ":lockup_tx_id": lockup_tx_id,
378 },
379 )
380 .map_err(|_| PaymentError::PersistError)?;
381 Ok(())
382 }
383}
384
385#[derive(Clone, Debug, Serialize, Deserialize)]
386pub(crate) struct InternalCreateSubmarineResponse {
387 pub(crate) accept_zero_conf: bool,
388 pub(crate) address: String,
389 pub(crate) bip21: String,
390 pub(crate) claim_public_key: String,
391 pub(crate) expected_amount: u64,
392 pub(crate) referral_id: Option<String>,
393 pub(crate) swap_tree: InternalSwapTree,
394 #[serde(default)]
395 pub(crate) timeout_block_height: u64,
396 pub(crate) blinding_key: Option<String>,
397}
398impl InternalCreateSubmarineResponse {
399 pub(crate) fn try_convert_from_boltz(
400 boltz_create_response: &CreateSubmarineResponse,
401 expected_swap_id: &str,
402 ) -> Result<InternalCreateSubmarineResponse, PaymentError> {
403 ensure_sdk!(
406 boltz_create_response.id == expected_swap_id,
407 PaymentError::PersistError
408 );
409
410 let res = InternalCreateSubmarineResponse {
411 accept_zero_conf: boltz_create_response.accept_zero_conf,
412 address: boltz_create_response.address.clone(),
413 bip21: boltz_create_response.bip21.clone(),
414 claim_public_key: boltz_create_response.claim_public_key.to_string(),
415 expected_amount: boltz_create_response.expected_amount,
416 referral_id: boltz_create_response.referral_id.clone(),
417 swap_tree: boltz_create_response.swap_tree.clone().into(),
418 timeout_block_height: boltz_create_response.timeout_block_height,
419 blinding_key: boltz_create_response.blinding_key.clone(),
420 };
421 Ok(res)
422 }
423}
424
425#[cfg(test)]
426mod tests {
427 use crate::test_utils::persist::{create_persister, new_send_swap};
428 use anyhow::{anyhow, Result};
429
430 use super::PaymentState;
431
432 #[cfg(feature = "browser-tests")]
433 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
434
435 #[sdk_macros::test_all]
436 fn test_fetch_send_swap() -> Result<()> {
437 create_persister!(storage);
438 let send_swap = new_send_swap(None, None);
439
440 storage.insert_or_update_send_swap(&send_swap)?;
441 assert!(storage.fetch_send_swap_by_id(&send_swap.id).is_ok());
443 assert!(storage
445 .fetch_send_swap_by_invoice(&send_swap.invoice)
446 .is_ok());
447
448 Ok(())
449 }
450
451 #[sdk_macros::test_all]
452 fn test_list_send_swap() -> Result<()> {
453 create_persister!(storage);
454
455 let range = 0..3;
457 for _ in range.clone() {
458 storage.insert_or_update_send_swap(&new_send_swap(None, None))?;
459 }
460
461 let con = storage.get_connection()?;
462 let swaps = storage.list_send_swaps_where(&con, vec![])?;
463 assert_eq!(swaps.len(), range.len());
464
465 storage.insert_or_update_send_swap(&new_send_swap(Some(PaymentState::Pending), None))?;
467 let ongoing_swaps = storage.list_ongoing_send_swaps()?;
468 assert_eq!(ongoing_swaps.len(), 4);
469
470 let ongoing_swaps = storage.list_pending_send_swaps()?;
472 assert_eq!(ongoing_swaps.len(), 1);
473
474 Ok(())
475 }
476
477 #[sdk_macros::test_all]
478 fn test_update_send_swap() -> Result<()> {
479 create_persister!(storage);
480
481 let mut send_swap = new_send_swap(None, None);
482 storage.insert_or_update_send_swap(&send_swap)?;
483
484 let new_state = PaymentState::Pending;
486 let preimage = Some("preimage");
487 let lockup_tx_id = Some("lockup_tx_id");
488 let refund_tx_id = Some("refund_tx_id");
489
490 storage.try_handle_send_swap_update(
491 &send_swap.id,
492 new_state,
493 preimage,
494 lockup_tx_id,
495 refund_tx_id,
496 )?;
497
498 let updated_send_swap = storage
499 .fetch_send_swap_by_id(&send_swap.id)?
500 .ok_or(anyhow!("Could not find Send swap in database"))?;
501
502 assert_eq!(new_state, updated_send_swap.state);
503 assert_eq!(preimage, updated_send_swap.preimage.as_deref());
504 assert_eq!(lockup_tx_id, updated_send_swap.lockup_tx_id.as_deref());
505 assert_eq!(refund_tx_id, updated_send_swap.refund_tx_id.as_deref());
506
507 send_swap.state = new_state;
508
509 let new_state = PaymentState::Complete;
511 storage.update_send_swaps_by_state(send_swap.state, PaymentState::Complete, None)?;
512 let updated_send_swap = storage
513 .fetch_send_swap_by_id(&send_swap.id)?
514 .ok_or(anyhow!("Could not find Send swap in database"))?;
515 assert_eq!(new_state, updated_send_swap.state);
516
517 Ok(())
518 }
519
520 #[sdk_macros::async_test_all]
521 async fn test_writing_stale_swap() -> Result<()> {
522 create_persister!(storage);
523
524 let send_swap = new_send_swap(None, None);
525 storage.insert_or_update_send_swap(&send_swap)?;
526
527 let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
529 send_swap.refund_tx_id = Some("tx_id".to_string());
530 storage.insert_or_update_send_swap(&send_swap)?;
531
532 let send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
534 storage.insert_or_update_send_swap(&send_swap)?;
535
536 let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
538 send_swap.refund_tx_id = Some("tx_id_2".to_string());
539 storage.set_send_swap_lockup_tx_id(&send_swap.id, "tx_id")?;
541 assert!(storage.insert_or_update_send_swap(&send_swap).is_err());
542
543 Ok(())
544 }
545}