breez_sdk_liquid/persist/
receive.rs

1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::CreateReverseResponse;
3use rusqlite::{named_params, params, Connection, Row, TransactionBehavior};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::ensure_sdk;
8use crate::error::PaymentError;
9use crate::model::*;
10use crate::persist::{get_where_clause_state_in, where_clauses_to_string, Persister};
11use crate::sync::model::data::ReceiveSyncData;
12use crate::sync::model::RecordType;
13
14impl Persister {
15    pub(crate) fn insert_or_update_receive_swap_inner(
16        con: &Connection,
17        receive_swap: &ReceiveSwap,
18    ) -> Result<()> {
19        let id_hash = sha256::Hash::hash(receive_swap.id.as_bytes()).to_hex();
20        con.execute(
21            "
22            INSERT INTO receive_swaps (
23                id,
24                id_hash,
25                preimage,
26                create_response_json,
27                claim_private_key,
28                invoice,
29                timeout_block_height,
30                payment_hash,
31                destination_pubkey,
32                payer_amount_sat,
33                receiver_amount_sat,
34                created_at,
35                claim_fees_sat,
36                mrh_address,
37                state,
38                pair_fees_json
39            )
40            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
41            ON CONFLICT DO NOTHING
42            ",
43            (
44                &receive_swap.id,
45                id_hash,
46                &receive_swap.preimage,
47                &receive_swap.create_response_json,
48                &receive_swap.claim_private_key,
49                &receive_swap.invoice,
50                &receive_swap.timeout_block_height,
51                &receive_swap.payment_hash,
52                &receive_swap.destination_pubkey,
53                &receive_swap.payer_amount_sat,
54                &receive_swap.receiver_amount_sat,
55                &receive_swap.created_at,
56                &receive_swap.claim_fees_sat,
57                &receive_swap.mrh_address,
58                &receive_swap.state,
59                &receive_swap.pair_fees_json,
60            ),
61        )?;
62
63        let rows_affected = con.execute(
64            "UPDATE receive_swaps
65            SET
66                bolt12_offer = :bolt12_offer,
67                description = :description,
68                payer_note = :payer_note,
69                claim_address = :claim_address,
70                claim_tx_id = :claim_tx_id,
71                lockup_tx_id = :lockup_tx_id,
72                mrh_tx_id = :mrh_tx_id,
73                payer_amount_sat = :payer_amount_sat,
74                receiver_amount_sat = :receiver_amount_sat,
75                state = :state
76            WHERE
77                id = :id AND
78                version = :version",
79            named_params! {
80                ":id": &receive_swap.id,
81                ":bolt12_offer": &receive_swap.bolt12_offer,
82                ":description": &receive_swap.description,
83                ":payer_note": &receive_swap.payer_note,
84                ":claim_address": &receive_swap.claim_address,
85                ":claim_tx_id": &receive_swap.claim_tx_id,
86                ":lockup_tx_id": &receive_swap.lockup_tx_id,
87                ":mrh_tx_id": &receive_swap.mrh_tx_id,
88                // When the swap is paid via MRH, the recoverer sets the
89                // payer/receiver amount to the MRH tx amount.
90                // This is to show no fees in the payment.
91                ":payer_amount_sat": &receive_swap.payer_amount_sat,
92                ":receiver_amount_sat": &receive_swap.receiver_amount_sat,
93                ":state": &receive_swap.state,
94                ":version": &receive_swap.metadata.version,
95            },
96        )?;
97        ensure_sdk!(
98            rows_affected > 0,
99            anyhow!("Version mismatch for receive swap {}", receive_swap.id)
100        );
101
102        if receive_swap.mrh_tx_id.is_some() {
103            Self::delete_reserved_address_inner(con, &receive_swap.mrh_address)?;
104        }
105
106        Ok(())
107    }
108
109    pub(crate) fn insert_or_update_receive_swap(&self, receive_swap: &ReceiveSwap) -> Result<()> {
110        let maybe_swap = self.fetch_receive_swap_by_id(&receive_swap.id)?;
111        let updated_fields = ReceiveSyncData::updated_fields(maybe_swap, receive_swap);
112
113        let mut con = self.get_connection()?;
114        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
115
116        Self::insert_or_update_receive_swap_inner(&tx, receive_swap)?;
117
118        // Trigger a sync if:
119        // - updated_fields is None (swap is inserted, not updated)
120        // - updated_fields in a non empty list of updated fields
121        let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
122        match trigger_sync {
123            true => {
124                self.commit_outgoing(&tx, &receive_swap.id, RecordType::Receive, updated_fields)?;
125                tx.commit()?;
126                self.trigger_sync();
127            }
128            false => {
129                tx.commit()?;
130            }
131        };
132
133        Ok(())
134    }
135
136    fn list_receive_swaps_query(where_clauses: Vec<String>) -> String {
137        let where_clause_str = where_clauses_to_string(where_clauses);
138
139        format!(
140            "
141            SELECT
142                rs.id,
143                rs.preimage,
144                rs.create_response_json,
145                rs.claim_private_key,
146                rs.invoice,
147                rs.bolt12_offer,
148                rs.payment_hash,
149                rs.destination_pubkey,
150                rs.timeout_block_height,
151                rs.description,
152                rs.payer_note,
153                rs.payer_amount_sat,
154                rs.receiver_amount_sat,
155                rs.claim_fees_sat,
156                rs.claim_address,
157                rs.claim_tx_id,
158                rs.lockup_tx_id,
159                rs.mrh_address,
160                rs.mrh_tx_id,
161                rs.created_at,
162                rs.state,
163                rs.pair_fees_json,
164                rs.version,
165                rs.last_updated_at,
166
167                sync_state.is_local
168            FROM receive_swaps AS rs
169            LEFT JOIN sync_state ON rs.id = sync_state.data_id
170            {where_clause_str}
171            ORDER BY rs.created_at
172        "
173        )
174    }
175
176    pub(crate) fn fetch_receive_swap_by_id(&self, id: &str) -> Result<Option<ReceiveSwap>> {
177        let con: Connection = self.get_connection()?;
178        let query = Self::list_receive_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
179        let res = con.query_row(&query, [id], Self::sql_row_to_receive_swap);
180
181        Ok(res.ok())
182    }
183
184    pub(crate) fn fetch_receive_swap_by_invoice(
185        &self,
186        invoice: &str,
187    ) -> Result<Option<ReceiveSwap>> {
188        let con: Connection = self.get_connection()?;
189        let query = Self::list_receive_swaps_query(vec!["invoice= ?1".to_string()]);
190        let res = con.query_row(&query, [invoice], Self::sql_row_to_receive_swap);
191
192        Ok(res.ok())
193    }
194
195    fn sql_row_to_receive_swap(row: &Row) -> rusqlite::Result<ReceiveSwap> {
196        Ok(ReceiveSwap {
197            id: row.get(0)?,
198            preimage: row.get(1)?,
199            create_response_json: row.get(2)?,
200            claim_private_key: row.get(3)?,
201            invoice: row.get(4)?,
202            bolt12_offer: row.get(5)?,
203            payment_hash: row.get(6)?,
204            destination_pubkey: row.get(7)?,
205            timeout_block_height: row.get(8)?,
206            description: row.get(9)?,
207            payer_note: row.get(10)?,
208            payer_amount_sat: row.get(11)?,
209            receiver_amount_sat: row.get(12)?,
210            claim_fees_sat: row.get(13)?,
211            claim_address: row.get(14)?,
212            claim_tx_id: row.get(15)?,
213            lockup_tx_id: row.get(16)?,
214            mrh_address: row.get(17)?,
215            mrh_tx_id: row.get(18)?,
216            created_at: row.get(19)?,
217            state: row.get(20)?,
218            pair_fees_json: row.get(21)?,
219            metadata: SwapMetadata {
220                version: row.get(22)?,
221                last_updated_at: row.get(23)?,
222                is_local: row.get::<usize, Option<bool>>(24)?.unwrap_or(true),
223            },
224        })
225    }
226
227    pub(crate) fn list_receive_swaps_where(
228        &self,
229        con: &Connection,
230        where_clauses: Vec<String>,
231    ) -> Result<Vec<ReceiveSwap>> {
232        let query = Self::list_receive_swaps_query(where_clauses);
233        let ongoing_receive = con
234            .prepare(&query)?
235            .query_map(params![], Self::sql_row_to_receive_swap)?
236            .map(|i| i.unwrap())
237            .collect();
238        Ok(ongoing_receive)
239    }
240
241    pub(crate) fn list_ongoing_receive_swaps(&self) -> Result<Vec<ReceiveSwap>> {
242        let con = self.get_connection()?;
243        let where_clauses = vec![get_where_clause_state_in(&[
244            PaymentState::Created,
245            PaymentState::Pending,
246        ])];
247
248        self.list_receive_swaps_where(&con, where_clauses)
249    }
250
251    pub(crate) fn list_recoverable_receive_swaps(&self) -> Result<Vec<ReceiveSwap>> {
252        let con = self.get_connection()?;
253        let where_clause = vec![get_where_clause_state_in(&[
254            PaymentState::Created,
255            PaymentState::Pending,
256        ])];
257
258        self.list_receive_swaps_where(&con, where_clause)
259    }
260
261    pub(crate) fn set_receive_swap_claim_address(
262        &self,
263        swap_id: &str,
264        claim_address: &str,
265    ) -> Result<(), PaymentError> {
266        let con = self.get_connection()?;
267        con.execute(
268            "UPDATE receive_swaps
269            SET claim_address = :claim_address
270            WHERE id = :id",
271            named_params! {
272                        ":id": swap_id,
273                        ":claim_address": claim_address,
274            },
275        )?;
276        Ok(())
277    }
278
279    // Only set the Receive Swap claim_tx_id if not set, otherwise return an error
280    pub(crate) fn set_receive_swap_claim_tx_id(
281        &self,
282        swap_id: &str,
283        claim_tx_id: &str,
284    ) -> Result<(), PaymentError> {
285        let con = self.get_connection()?;
286        let row_count = con
287            .execute(
288                "UPDATE receive_swaps 
289            SET claim_tx_id = :claim_tx_id
290            WHERE id = :id AND claim_tx_id IS NULL",
291                named_params! {
292                            ":id": swap_id,
293                            ":claim_tx_id": claim_tx_id,
294                },
295            )
296            .map_err(|_| PaymentError::PersistError)?;
297        match row_count {
298            1 => Ok(()),
299            _ => Err(PaymentError::AlreadyClaimed),
300        }
301    }
302
303    // Only unset the Receive Swap claim_tx_id if set with the same tx id
304    pub(crate) fn unset_receive_swap_claim_tx_id(
305        &self,
306        swap_id: &str,
307        claim_tx_id: &str,
308    ) -> Result<(), PaymentError> {
309        let con = self.get_connection()?;
310        con.execute(
311            "UPDATE receive_swaps 
312            SET claim_tx_id = NULL
313            WHERE id = :id AND claim_tx_id = :claim_tx_id",
314            named_params! {
315                        ":id": swap_id,
316                        ":claim_tx_id": claim_tx_id,
317            },
318        )
319        .map_err(|_| PaymentError::PersistError)?;
320        Ok(())
321    }
322
323    pub(crate) fn try_handle_receive_swap_update(
324        &self,
325        swap_id: &str,
326        to_state: PaymentState,
327        claim_tx_id: Option<&str>,
328        lockup_tx_id: Option<&str>,
329        mrh_tx_id: Option<&str>,
330        mrh_amount_sat: Option<u64>,
331    ) -> Result<(), PaymentError> {
332        // Do not overwrite claim_tx_id, lockup_tx_id, mrh_tx_id
333        let mut con = self.get_connection()?;
334        let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
335
336        tx.execute(
337            "UPDATE receive_swaps
338            SET
339                claim_tx_id = COALESCE(claim_tx_id, :claim_tx_id),
340                lockup_tx_id = COALESCE(lockup_tx_id, :lockup_tx_id),
341                mrh_tx_id = COALESCE(mrh_tx_id, :mrh_tx_id),
342
343                payer_amount_sat = COALESCE(:mrh_amount_sat, payer_amount_sat),
344                receiver_amount_sat = COALESCE(:mrh_amount_sat, receiver_amount_sat),
345                state = :state
346            WHERE
347                id = :id",
348            named_params! {
349                ":id": swap_id,
350                ":lockup_tx_id": lockup_tx_id,
351                ":claim_tx_id": claim_tx_id,
352                ":mrh_tx_id": mrh_tx_id,
353                ":mrh_amount_sat": mrh_amount_sat,
354                ":state": to_state,
355            },
356        )?;
357
358        // NOTE: Receive currently does not update any fields, bypassing the commit logic for now
359        // let updated_fields = None;
360        // Self::commit_outgoing(&tx, swap_id, RecordType::Receive, updated_fields)?;
361        // self.sync_trigger
362        //     .try_send(())
363        //     .map_err(|err| PaymentError::Generic {
364        //         err: format!("Could not trigger manual sync: {err:?}"),
365        //     })?;
366
367        tx.commit()?;
368
369        Ok(())
370    }
371}
372
373#[derive(Clone, Debug, Serialize, Deserialize)]
374pub(crate) struct InternalCreateReverseResponse {
375    pub swap_tree: InternalSwapTree,
376    pub lockup_address: String,
377    pub refund_public_key: String,
378    pub timeout_block_height: u32,
379    pub onchain_amount: u64,
380    pub blinding_key: Option<String>,
381}
382impl InternalCreateReverseResponse {
383    pub(crate) fn try_convert_from_boltz(
384        boltz_create_response: &CreateReverseResponse,
385        expected_swap_id: &str,
386        expected_invoice: Option<&str>,
387    ) -> Result<Self, PaymentError> {
388        // Do not store the CreateResponse fields that are already stored separately
389        // Before skipping them, ensure they match the separately stored ones
390        ensure_sdk!(
391            boltz_create_response.id == expected_swap_id,
392            PaymentError::PersistError
393        );
394        match (&boltz_create_response.invoice, expected_invoice) {
395            (Some(invoice), Some(expected_invoice)) => {
396                ensure_sdk!(invoice == expected_invoice, PaymentError::PersistError);
397            }
398            (None, None) => {}
399            _ => {
400                return Err(PaymentError::PersistError);
401            }
402        }
403
404        let res = InternalCreateReverseResponse {
405            swap_tree: boltz_create_response.swap_tree.clone().into(),
406            lockup_address: boltz_create_response.lockup_address.clone(),
407            refund_public_key: boltz_create_response.refund_public_key.to_string(),
408            timeout_block_height: boltz_create_response.timeout_block_height,
409            onchain_amount: boltz_create_response.onchain_amount,
410            blinding_key: boltz_create_response.blinding_key.clone(),
411        };
412        Ok(res)
413    }
414}
415
#[cfg(test)]
mod tests {
    use anyhow::{anyhow, Result};

    use crate::test_utils::persist::{create_persister, new_receive_swap};

    use super::PaymentState;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// A stored swap can be found by id and by invoice; an unknown id yields `None`.
    #[sdk_macros::test_all]
    fn test_fetch_receive_swap() -> Result<()> {
        create_persister!(storage);

        let receive_swap = new_receive_swap(None, None);

        storage.insert_or_update_receive_swap(&receive_swap)?;

        // Fetch swap by id. The fetchers return `Ok(None)` when nothing is found, so
        // the presence of the row has to be asserted, not just the absence of an error.
        let by_id = storage
            .fetch_receive_swap_by_id(&receive_swap.id)?
            .ok_or_else(|| anyhow!("Could not find Receive swap by id"))?;
        assert_eq!(by_id.id, receive_swap.id);

        // Fetch swap by invoice
        assert!(storage
            .fetch_receive_swap_by_invoice(&receive_swap.invoice)?
            .is_some());

        // An id that was never stored is reported as absent, not as an error
        assert!(storage
            .fetch_receive_swap_by_id("non-existent-swap-id")?
            .is_none());

        Ok(())
    }

    /// Listing returns every stored swap, and the ongoing filter includes `Pending`.
    #[sdk_macros::test_all]
    fn test_list_receive_swap() -> Result<()> {
        create_persister!(storage);

        // List general receive swaps
        let range = 0..3;
        for _ in range.clone() {
            storage.insert_or_update_receive_swap(&new_receive_swap(None, None))?;
        }

        let con = storage.get_connection()?;
        let swaps = storage.list_receive_swaps_where(&con, vec![])?;
        assert_eq!(swaps.len(), range.len());

        // List ongoing receive swaps: the swaps inserted above plus one pending swap
        storage
            .insert_or_update_receive_swap(&new_receive_swap(Some(PaymentState::Pending), None))?;
        let ongoing_swaps = storage.list_ongoing_receive_swaps()?;
        assert_eq!(ongoing_swaps.len(), range.len() + 1);

        Ok(())
    }

    /// `try_handle_receive_swap_update` persists the new state and claim tx id.
    #[sdk_macros::test_all]
    fn test_update_receive_swap() -> Result<()> {
        create_persister!(storage);

        let receive_swap = new_receive_swap(None, None);
        storage.insert_or_update_receive_swap(&receive_swap)?;

        // Update metadata
        let new_state = PaymentState::Pending;
        let claim_tx_id = Some("claim_tx_id");

        storage.try_handle_receive_swap_update(
            &receive_swap.id,
            new_state,
            claim_tx_id,
            None,
            None,
            None,
        )?;

        let updated_receive_swap = storage
            .fetch_receive_swap_by_id(&receive_swap.id)?
            .ok_or(anyhow!("Could not find Receive swap in database"))?;

        assert_eq!(new_state, updated_receive_swap.state);
        assert_eq!(claim_tx_id, updated_receive_swap.claim_tx_id.as_deref());

        Ok(())
    }

    /// Writing back a swap that was modified in between must be rejected.
    #[sdk_macros::async_test_all]
    async fn test_writing_stale_swap() -> Result<()> {
        create_persister!(storage);

        let receive_swap = new_receive_swap(None, None);
        storage.insert_or_update_receive_swap(&receive_swap)?;

        // read - update - write works if there are no updates in between
        let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
        receive_swap.lockup_tx_id = Some("tx_id".to_string());
        storage.insert_or_update_receive_swap(&receive_swap)?;

        // read - update - write works if there are no updates in between even if no field changes
        let receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
        storage.insert_or_update_receive_swap(&receive_swap)?;

        // read - update - write fails if there are any updates in between
        let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
        receive_swap.lockup_tx_id = Some("tx_id_2".to_string());
        // Concurrent update
        storage.set_receive_swap_claim_tx_id(&receive_swap.id, "tx_id")?;
        assert!(storage
            .insert_or_update_receive_swap(&receive_swap)
            .is_err());

        Ok(())
    }
}