breez_sdk_liquid/persist/
send.rs

1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::CreateSubmarineResponse;
3use rusqlite::{named_params, params, Connection, Row};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::error::PaymentError;
8use crate::model::*;
9use crate::persist::{get_where_clause_state_in, Persister};
10use crate::sync::model::data::SendSyncData;
11use crate::sync::model::RecordType;
12use crate::utils::{from_row_to_u64, from_u64_to_row};
13use crate::{ensure_sdk, get_updated_fields};
14
15use super::where_clauses_to_string;
16
17impl Persister {
18    pub(crate) fn insert_or_update_send_swap_inner(
19        con: &Connection,
20        send_swap: &SendSwap,
21    ) -> Result<()> {
22        let id_hash = sha256::Hash::hash(send_swap.id.as_bytes()).to_hex();
23        con.execute(
24            "
25            INSERT INTO send_swaps (
26                id,
27                id_hash,
28                invoice,
29                bolt12_offer,
30                payment_hash,
31                destination_pubkey,
32                timeout_block_height,
33                payer_amount_sat,
34                receiver_amount_sat,
35                create_response_json,
36                refund_private_key,
37                created_at,
38                state,
39                pair_fees_json
40            )
41            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
42            ON CONFLICT DO NOTHING
43            ",
44            (
45                &send_swap.id,
46                &id_hash,
47                &send_swap.invoice,
48                &send_swap.bolt12_offer,
49                &send_swap.payment_hash,
50                &send_swap.destination_pubkey,
51                from_u64_to_row(send_swap.timeout_block_height)?,
52                from_u64_to_row(send_swap.payer_amount_sat)?,
53                from_u64_to_row(send_swap.receiver_amount_sat)?,
54                &send_swap.create_response_json,
55                &send_swap.refund_private_key,
56                &send_swap.created_at,
57                &send_swap.state,
58                &send_swap.pair_fees_json,
59            ),
60        )?;
61
62        let rows_affected = con.execute(
63            "UPDATE send_swaps 
64            SET
65                description = :description,
66                preimage = :preimage,
67                lockup_tx_id = :lockup_tx_id,
68                refund_address = :refund_address,
69                refund_tx_id = :refund_tx_id,
70                state = :state
71            WHERE
72                id = :id AND
73                version = :version",
74            named_params! {
75                ":id": &send_swap.id,
76                ":description": &send_swap.description,
77                ":preimage": &send_swap.preimage,
78                ":lockup_tx_id": &send_swap.lockup_tx_id,
79                ":refund_address": &send_swap.refund_address,
80                ":refund_tx_id": &send_swap.refund_tx_id,
81                ":state": &send_swap.state,
82                ":version": from_u64_to_row(send_swap.metadata.version)?,
83            },
84        )?;
85        ensure_sdk!(
86            rows_affected > 0,
87            anyhow!("Version mismatch for send swap {}", send_swap.id)
88        );
89
90        Ok(())
91    }
92
93    pub(crate) fn insert_or_update_send_swap(&self, send_swap: &SendSwap) -> Result<()> {
94        let maybe_swap = self.fetch_send_swap_by_id(&send_swap.id)?;
95        let updated_fields = SendSyncData::updated_fields(maybe_swap, send_swap);
96
97        let mut con = self.get_connection()?;
98        let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
99
100        Self::insert_or_update_send_swap_inner(&tx, send_swap)?;
101
102        // Trigger a sync if:
103        // - updated_fields is None (swap is inserted, not updated)
104        // - updated_fields in a non empty list of updated fields
105        let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
106        match trigger_sync {
107            true => {
108                self.commit_outgoing(&tx, &send_swap.id, RecordType::Send, updated_fields)?;
109                tx.commit()?;
110                self.trigger_sync();
111            }
112            false => {
113                tx.commit()?;
114            }
115        };
116
117        Ok(())
118    }
119
120    pub(crate) fn update_send_swaps_by_state(
121        &self,
122        from_state: PaymentState,
123        to_state: PaymentState,
124        is_local: Option<bool>,
125    ) -> Result<()> {
126        let con = self.get_connection()?;
127        let mut where_clauses = vec!["state = :from_state".to_string()];
128        if let Some(is_local) = is_local {
129            let mut where_is_local = format!("sync_state.is_local = {}", is_local as u8);
130            if is_local {
131                where_is_local = format!("({where_is_local} OR sync_state.is_local IS NULL)");
132            }
133            where_clauses.push(where_is_local);
134        }
135
136        let where_clause_str = where_clauses_to_string(where_clauses);
137        let query = format!(
138            "
139            UPDATE send_swaps
140            SET state = :to_state
141            WHERE id IN (
142                SELECT id
143                FROM send_swaps AS ss
144                LEFT JOIN sync_state ON ss.id = sync_state.data_id
145                {where_clause_str}
146                ORDER BY created_at
147            )
148            "
149        );
150
151        con.execute(
152            &query,
153            named_params! {
154                ":from_state": from_state,
155                ":to_state": to_state,
156            },
157        )?;
158
159        Ok(())
160    }
161
162    fn list_send_swaps_query(where_clauses: Vec<String>) -> String {
163        let mut where_clause_str = String::new();
164        if !where_clauses.is_empty() {
165            where_clause_str = String::from("WHERE ");
166            where_clause_str.push_str(where_clauses.join(" AND ").as_str());
167        }
168
169        format!(
170            "
171            SELECT
172                id,
173                invoice,
174                bolt12_offer,
175                payment_hash,
176                destination_pubkey,
177                timeout_block_height,
178                description,
179                preimage,
180                payer_amount_sat,
181                receiver_amount_sat,
182                create_response_json,
183                refund_private_key,
184                lockup_tx_id,
185                refund_address,
186                refund_tx_id,
187                created_at,
188                state,
189                pair_fees_json,
190                version,
191                last_updated_at,
192
193                sync_state.is_local
194            FROM send_swaps AS ss
195            LEFT JOIN sync_state ON ss.id = sync_state.data_id
196            {where_clause_str}
197            ORDER BY created_at
198        "
199        )
200    }
201
202    pub(crate) fn fetch_send_swap_by_id(&self, id: &str) -> Result<Option<SendSwap>> {
203        let con: Connection = self.get_connection()?;
204        let query = Self::list_send_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
205        let res = con.query_row(&query, [id], Self::sql_row_to_send_swap);
206
207        Ok(res.ok())
208    }
209
210    pub(crate) fn fetch_send_swap_by_invoice(&self, invoice: &str) -> Result<Option<SendSwap>> {
211        let con: Connection = self.get_connection()?;
212        let query = Self::list_send_swaps_query(vec!["invoice= ?1".to_string()]);
213        let res = con.query_row(&query, [invoice], Self::sql_row_to_send_swap);
214
215        Ok(res.ok())
216    }
217
218    fn sql_row_to_send_swap(row: &Row) -> rusqlite::Result<SendSwap> {
219        Ok(SendSwap {
220            id: row.get(0)?,
221            invoice: row.get(1)?,
222            bolt12_offer: row.get(2)?,
223            payment_hash: row.get(3)?,
224            destination_pubkey: row.get(4)?,
225            timeout_block_height: from_row_to_u64(row, 5)?,
226            description: row.get(6)?,
227            preimage: row.get(7)?,
228            payer_amount_sat: from_row_to_u64(row, 8)?,
229            receiver_amount_sat: from_row_to_u64(row, 9)?,
230            create_response_json: row.get(10)?,
231            refund_private_key: row.get(11)?,
232            lockup_tx_id: row.get(12)?,
233            refund_address: row.get(13)?,
234            refund_tx_id: row.get(14)?,
235            created_at: row.get(15)?,
236            state: row.get(16)?,
237            pair_fees_json: row.get(17)?,
238            metadata: SwapMetadata {
239                version: from_row_to_u64(row, 18)?,
240                last_updated_at: row.get(19)?,
241                is_local: row.get::<usize, Option<bool>>(20)?.unwrap_or(true),
242            },
243        })
244    }
245
246    pub(crate) fn list_send_swaps_where(
247        &self,
248        con: &Connection,
249        where_clauses: Vec<String>,
250    ) -> Result<Vec<SendSwap>> {
251        let query = Self::list_send_swaps_query(where_clauses);
252        let ongoing_send = con
253            .prepare(&query)?
254            .query_map(params![], Self::sql_row_to_send_swap)?
255            .map(|i| i.unwrap())
256            .collect();
257        Ok(ongoing_send)
258    }
259
260    pub(crate) fn list_send_swaps_by_state(
261        &self,
262        states: Vec<PaymentState>,
263    ) -> Result<Vec<SendSwap>> {
264        let con = self.get_connection()?;
265        let where_clause = vec![get_where_clause_state_in(&states)];
266        self.list_send_swaps_where(&con, where_clause)
267    }
268
269    pub(crate) fn list_ongoing_send_swaps(&self) -> Result<Vec<SendSwap>> {
270        self.list_send_swaps_by_state(vec![PaymentState::Created, PaymentState::Pending])
271    }
272
273    pub(crate) fn list_pending_send_swaps(&self) -> Result<Vec<SendSwap>> {
274        self.list_send_swaps_by_state(vec![PaymentState::Pending, PaymentState::RefundPending])
275    }
276
277    pub(crate) fn list_recoverable_send_swaps(&self) -> Result<Vec<SendSwap>> {
278        self.list_send_swaps_by_state(vec![
279            PaymentState::Created,
280            PaymentState::Pending,
281            PaymentState::RefundPending,
282        ])
283    }
284
285    pub(crate) fn try_handle_send_swap_update(
286        &self,
287        swap_id: &str,
288        to_state: PaymentState,
289        preimage: Option<&str>,
290        lockup_tx_id: Option<&str>,
291        refund_tx_id: Option<&str>,
292    ) -> Result<(), PaymentError> {
293        // Do not overwrite preimage, lockup_tx_id, refund_tx_id
294        let mut con = self.get_connection()?;
295        let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
296
297        tx.execute(
298            "UPDATE send_swaps
299            SET
300                preimage = COALESCE(preimage, :preimage),
301                lockup_tx_id = COALESCE(lockup_tx_id, :lockup_tx_id),
302                refund_tx_id = COALESCE(refund_tx_id, :refund_tx_id),
303                state = :state
304            WHERE
305                id = :id",
306            named_params! {
307                ":id": swap_id,
308                ":preimage": preimage,
309                ":lockup_tx_id": lockup_tx_id,
310                ":refund_tx_id": refund_tx_id,
311                ":state": to_state,
312            },
313        )?;
314
315        let updated_fields = get_updated_fields!(preimage);
316        self.commit_outgoing(&tx, swap_id, RecordType::Send, updated_fields)?;
317        tx.commit()?;
318
319        self.trigger_sync();
320
321        Ok(())
322    }
323
324    pub(crate) fn set_send_swap_refund_address(
325        &self,
326        swap_id: &str,
327        refund_address: &str,
328    ) -> Result<(), PaymentError> {
329        let con = self.get_connection()?;
330        con.execute(
331            "UPDATE send_swaps
332            SET refund_address = :refund_address
333            WHERE id = :id",
334            named_params! {
335                        ":id": swap_id,
336                        ":refund_address": refund_address,
337            },
338        )?;
339        Ok(())
340    }
341
342    pub(crate) fn set_send_swap_lockup_tx_id(
343        &self,
344        swap_id: &str,
345        lockup_tx_id: &str,
346    ) -> Result<(), PaymentError> {
347        let con = self.get_connection()?;
348
349        let row_count = con
350            .execute(
351                "UPDATE send_swaps
352                SET lockup_tx_id = :lockup_tx_id
353                WHERE id = :id AND lockup_tx_id IS NULL",
354                named_params! {
355                    ":id": swap_id,
356                    ":lockup_tx_id": lockup_tx_id,
357                },
358            )
359            .map_err(|e| {
360                log::error!("Failed to set send swap lockup_tx_id: {e:?}");
361                PaymentError::PersistError
362            })?;
363        match row_count {
364            1 => Ok(()),
365            _ => Err(PaymentError::PaymentInProgress),
366        }
367    }
368
369    pub(crate) fn unset_send_swap_lockup_tx_id(
370        &self,
371        swap_id: &str,
372        lockup_tx_id: &str,
373    ) -> Result<(), PaymentError> {
374        let con = self.get_connection()?;
375        con.execute(
376            "UPDATE send_swaps
377            SET lockup_tx_id = NULL
378            WHERE id = :id AND lockup_tx_id = :lockup_tx_id",
379            named_params! {
380                ":id": swap_id,
381                ":lockup_tx_id": lockup_tx_id,
382            },
383        )
384        .map_err(|e| {
385            log::error!("Failed to unset send swap lockup_tx_id: {e:?}");
386            PaymentError::PersistError
387        })?;
388        Ok(())
389    }
390}
391
/// Serializable copy of the fields of a Boltz [`CreateSubmarineResponse`] that are kept
/// alongside a send swap.
///
/// The swap id is intentionally not part of this struct: it is stored separately and is
/// only checked for consistency when converting from the Boltz response.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct InternalCreateSubmarineResponse {
    pub(crate) accept_zero_conf: bool,
    pub(crate) address: String,
    pub(crate) bip21: String,
    /// String form of the Boltz response's `claim_public_key`.
    pub(crate) claim_public_key: String,
    pub(crate) expected_amount: u64,
    pub(crate) referral_id: Option<String>,
    pub(crate) swap_tree: InternalSwapTree,
    /// Falls back to `0` when the field is absent from the serialized data.
    #[serde(default)]
    pub(crate) timeout_block_height: u64,
    pub(crate) blinding_key: Option<String>,
}
405impl InternalCreateSubmarineResponse {
406    pub(crate) fn try_convert_from_boltz(
407        boltz_create_response: &CreateSubmarineResponse,
408        expected_swap_id: &str,
409    ) -> Result<InternalCreateSubmarineResponse, PaymentError> {
410        // Do not store the CreateResponse fields that are already stored separately
411        // Before skipping them, ensure they match the separately stored ones
412        ensure_sdk!(
413            boltz_create_response.id == expected_swap_id,
414            PaymentError::PersistError
415        );
416
417        let res = InternalCreateSubmarineResponse {
418            accept_zero_conf: boltz_create_response.accept_zero_conf,
419            address: boltz_create_response.address.clone(),
420            bip21: boltz_create_response.bip21.clone(),
421            claim_public_key: boltz_create_response.claim_public_key.to_string(),
422            expected_amount: boltz_create_response.expected_amount,
423            referral_id: boltz_create_response.referral_id.clone(),
424            swap_tree: boltz_create_response.swap_tree.clone().into(),
425            timeout_block_height: boltz_create_response.timeout_block_height,
426            blinding_key: boltz_create_response.blinding_key.clone(),
427        };
428        Ok(res)
429    }
430}
431
#[cfg(test)]
mod tests {
    use crate::test_utils::persist::{create_persister, new_send_swap};
    use anyhow::{anyhow, Result};

    use super::PaymentState;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// A stored swap must be retrievable both by its id and by its invoice.
    #[sdk_macros::test_all]
    fn test_fetch_send_swap() -> Result<()> {
        create_persister!(storage);
        let send_swap = new_send_swap(None, None);

        storage.insert_or_update_send_swap(&send_swap)?;

        // The fetchers return `Result<Option<_>>`, so checking `is_ok()` alone would also
        // pass when the swap is missing. Require that the swap is actually found.

        // Fetch swap by id
        let by_id = storage
            .fetch_send_swap_by_id(&send_swap.id)?
            .ok_or_else(|| anyhow!("Could not find Send swap by id"))?;
        assert_eq!(send_swap.id, by_id.id);

        // Fetch swap by invoice
        let by_invoice = storage
            .fetch_send_swap_by_invoice(&send_swap.invoice)?
            .ok_or_else(|| anyhow!("Could not find Send swap by invoice"))?;
        assert_eq!(send_swap.id, by_invoice.id);
        assert_eq!(send_swap.invoice, by_invoice.invoice);

        Ok(())
    }

    /// Listing honours the state filters of the ongoing/pending helpers.
    #[sdk_macros::test_all]
    fn test_list_send_swap() -> Result<()> {
        create_persister!(storage);

        // List general send swaps
        let range = 0..3;
        for _ in range.clone() {
            storage.insert_or_update_send_swap(&new_send_swap(None, None))?;
        }

        let con = storage.get_connection()?;
        let swaps = storage.list_send_swaps_where(&con, vec![])?;
        assert_eq!(swaps.len(), range.len());

        // List ongoing send swaps
        storage.insert_or_update_send_swap(&new_send_swap(Some(PaymentState::Pending), None))?;
        let ongoing_swaps = storage.list_ongoing_send_swaps()?;
        assert_eq!(ongoing_swaps.len(), 4);

        // List pending send swaps
        let pending_swaps = storage.list_pending_send_swaps()?;
        assert_eq!(pending_swaps.len(), 1);

        Ok(())
    }

    /// Updates through the dedicated update paths are visible on the next read.
    #[sdk_macros::test_all]
    fn test_update_send_swap() -> Result<()> {
        create_persister!(storage);

        let mut send_swap = new_send_swap(None, None);
        storage.insert_or_update_send_swap(&send_swap)?;

        // Update metadata
        let new_state = PaymentState::Pending;
        let preimage = Some("preimage");
        let lockup_tx_id = Some("lockup_tx_id");
        let refund_tx_id = Some("refund_tx_id");

        storage.try_handle_send_swap_update(
            &send_swap.id,
            new_state,
            preimage,
            lockup_tx_id,
            refund_tx_id,
        )?;

        let updated_send_swap = storage
            .fetch_send_swap_by_id(&send_swap.id)?
            .ok_or_else(|| anyhow!("Could not find Send swap in database"))?;

        assert_eq!(new_state, updated_send_swap.state);
        assert_eq!(preimage, updated_send_swap.preimage.as_deref());
        assert_eq!(lockup_tx_id, updated_send_swap.lockup_tx_id.as_deref());
        assert_eq!(refund_tx_id, updated_send_swap.refund_tx_id.as_deref());

        send_swap.state = new_state;

        // Update state (global)
        let new_state = PaymentState::Complete;
        storage.update_send_swaps_by_state(send_swap.state, PaymentState::Complete, None)?;
        let updated_send_swap = storage
            .fetch_send_swap_by_id(&send_swap.id)?
            .ok_or_else(|| anyhow!("Could not find Send swap in database"))?;
        assert_eq!(new_state, updated_send_swap.state);

        Ok(())
    }

    /// Writing back a swap that was modified in between must be rejected.
    #[sdk_macros::async_test_all]
    async fn test_writing_stale_swap() -> Result<()> {
        create_persister!(storage);

        let send_swap = new_send_swap(None, None);
        storage.insert_or_update_send_swap(&send_swap)?;

        // read - update - write works if there are no updates in between
        let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
        send_swap.refund_tx_id = Some("tx_id".to_string());
        storage.insert_or_update_send_swap(&send_swap)?;

        // read - update - write works if there are no updates in between even if no field changes
        let send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
        storage.insert_or_update_send_swap(&send_swap)?;

        // read - update - write fails if there are any updates in between
        let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
        send_swap.refund_tx_id = Some("tx_id_2".to_string());
        // Concurrent update
        storage.set_send_swap_lockup_tx_id(&send_swap.id, "tx_id")?;
        assert!(storage.insert_or_update_send_swap(&send_swap).is_err());

        Ok(())
    }
}