breez_sdk_liquid/persist/
receive.rs

1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::CreateReverseResponse;
3use rusqlite::{named_params, params, Connection, Row, TransactionBehavior};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::ensure_sdk;
8use crate::error::PaymentError;
9use crate::model::*;
10use crate::persist::{get_where_clause_state_in, where_clauses_to_string, Persister};
11use crate::sync::model::data::ReceiveSyncData;
12use crate::sync::model::RecordType;
13use crate::utils::{from_optional_u64_to_row, from_row_to_u64, from_u64_to_row};
14
15impl Persister {
16    pub(crate) fn insert_or_update_receive_swap_inner(
17        con: &Connection,
18        receive_swap: &ReceiveSwap,
19    ) -> Result<()> {
20        let id_hash = sha256::Hash::hash(receive_swap.id.as_bytes()).to_hex();
21        con.execute(
22            "
23            INSERT INTO receive_swaps (
24                id,
25                id_hash,
26                preimage,
27                create_response_json,
28                claim_private_key,
29                invoice,
30                timeout_block_height,
31                payment_hash,
32                destination_pubkey,
33                payer_amount_sat,
34                receiver_amount_sat,
35                created_at,
36                claim_fees_sat,
37                mrh_address,
38                state,
39                pair_fees_json
40            )
41            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
42            ON CONFLICT DO NOTHING
43            ",
44            (
45                &receive_swap.id,
46                id_hash,
47                &receive_swap.preimage,
48                &receive_swap.create_response_json,
49                &receive_swap.claim_private_key,
50                &receive_swap.invoice,
51                &receive_swap.timeout_block_height,
52                &receive_swap.payment_hash,
53                &receive_swap.destination_pubkey,
54                from_u64_to_row(receive_swap.payer_amount_sat)?,
55                from_u64_to_row(receive_swap.receiver_amount_sat)?,
56                &receive_swap.created_at,
57                from_u64_to_row(receive_swap.claim_fees_sat)?,
58                &receive_swap.mrh_address,
59                &receive_swap.state,
60                &receive_swap.pair_fees_json,
61            ),
62        )?;
63
64        let rows_affected = con.execute(
65            "UPDATE receive_swaps
66            SET
67                bolt12_offer = :bolt12_offer,
68                description = :description,
69                payer_note = :payer_note,
70                claim_address = :claim_address,
71                claim_tx_id = :claim_tx_id,
72                lockup_tx_id = :lockup_tx_id,
73                mrh_tx_id = :mrh_tx_id,
74                payer_amount_sat = :payer_amount_sat,
75                receiver_amount_sat = :receiver_amount_sat,
76                state = :state
77            WHERE
78                id = :id AND
79                version = :version",
80            named_params! {
81                ":id": &receive_swap.id,
82                ":bolt12_offer": &receive_swap.bolt12_offer,
83                ":description": &receive_swap.description,
84                ":payer_note": &receive_swap.payer_note,
85                ":claim_address": &receive_swap.claim_address,
86                ":claim_tx_id": &receive_swap.claim_tx_id,
87                ":lockup_tx_id": &receive_swap.lockup_tx_id,
88                ":mrh_tx_id": &receive_swap.mrh_tx_id,
89                // When the swap is paid via MRH, the recoverer sets the
90                // payer/receiver amount to the MRH tx amount.
91                // This is to show no fees in the payment.
92                ":payer_amount_sat": from_u64_to_row(receive_swap.payer_amount_sat)?,
93                ":receiver_amount_sat": from_u64_to_row(receive_swap.receiver_amount_sat)?,
94                ":state": &receive_swap.state,
95                ":version": from_u64_to_row(receive_swap.metadata.version)?,
96            },
97        )?;
98        ensure_sdk!(
99            rows_affected > 0,
100            anyhow!("Version mismatch for receive swap {}", receive_swap.id)
101        );
102
103        if receive_swap.mrh_tx_id.is_some() {
104            Self::delete_reserved_address_inner(con, &receive_swap.mrh_address)?;
105        }
106
107        Ok(())
108    }
109
110    pub(crate) fn insert_or_update_receive_swap(&self, receive_swap: &ReceiveSwap) -> Result<()> {
111        let maybe_swap = self.fetch_receive_swap_by_id(&receive_swap.id)?;
112        let updated_fields = ReceiveSyncData::updated_fields(maybe_swap, receive_swap);
113
114        let mut con = self.get_connection()?;
115        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
116
117        Self::insert_or_update_receive_swap_inner(&tx, receive_swap)?;
118
119        // Trigger a sync if:
120        // - updated_fields is None (swap is inserted, not updated)
121        // - updated_fields in a non empty list of updated fields
122        let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
123        match trigger_sync {
124            true => {
125                self.commit_outgoing(&tx, &receive_swap.id, RecordType::Receive, updated_fields)?;
126                tx.commit()?;
127                self.trigger_sync();
128            }
129            false => {
130                tx.commit()?;
131            }
132        };
133
134        Ok(())
135    }
136
137    fn list_receive_swaps_query(where_clauses: Vec<String>) -> String {
138        let where_clause_str = where_clauses_to_string(where_clauses);
139
140        format!(
141            "
142            SELECT
143                rs.id,
144                rs.preimage,
145                rs.create_response_json,
146                rs.claim_private_key,
147                rs.invoice,
148                rs.bolt12_offer,
149                rs.payment_hash,
150                rs.destination_pubkey,
151                rs.timeout_block_height,
152                rs.description,
153                rs.payer_note,
154                rs.payer_amount_sat,
155                rs.receiver_amount_sat,
156                rs.claim_fees_sat,
157                rs.claim_address,
158                rs.claim_tx_id,
159                rs.lockup_tx_id,
160                rs.mrh_address,
161                rs.mrh_tx_id,
162                rs.created_at,
163                rs.state,
164                rs.pair_fees_json,
165                rs.version,
166                rs.last_updated_at,
167
168                sync_state.is_local
169            FROM receive_swaps AS rs
170            LEFT JOIN sync_state ON rs.id = sync_state.data_id
171            {where_clause_str}
172            ORDER BY rs.created_at
173        "
174        )
175    }
176
177    pub(crate) fn fetch_receive_swap_by_id(&self, id: &str) -> Result<Option<ReceiveSwap>> {
178        let con: Connection = self.get_connection()?;
179        let query = Self::list_receive_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
180        let res = con.query_row(&query, [id], Self::sql_row_to_receive_swap);
181
182        Ok(res.ok())
183    }
184
185    pub(crate) fn fetch_receive_swap_by_invoice(
186        &self,
187        invoice: &str,
188    ) -> Result<Option<ReceiveSwap>> {
189        let con: Connection = self.get_connection()?;
190        let query = Self::list_receive_swaps_query(vec!["invoice= ?1".to_string()]);
191        let res = con.query_row(&query, [invoice], Self::sql_row_to_receive_swap);
192
193        Ok(res.ok())
194    }
195
196    fn sql_row_to_receive_swap(row: &Row) -> rusqlite::Result<ReceiveSwap> {
197        Ok(ReceiveSwap {
198            id: row.get(0)?,
199            preimage: row.get(1)?,
200            create_response_json: row.get(2)?,
201            claim_private_key: row.get(3)?,
202            invoice: row.get(4)?,
203            bolt12_offer: row.get(5)?,
204            payment_hash: row.get(6)?,
205            destination_pubkey: row.get(7)?,
206            timeout_block_height: row.get(8)?,
207            description: row.get(9)?,
208            payer_note: row.get(10)?,
209            payer_amount_sat: from_row_to_u64(row, 11)?,
210            receiver_amount_sat: from_row_to_u64(row, 12)?,
211            claim_fees_sat: from_row_to_u64(row, 13)?,
212            claim_address: row.get(14)?,
213            claim_tx_id: row.get(15)?,
214            lockup_tx_id: row.get(16)?,
215            mrh_address: row.get(17)?,
216            mrh_tx_id: row.get(18)?,
217            created_at: row.get(19)?,
218            state: row.get(20)?,
219            pair_fees_json: row.get(21)?,
220            metadata: SwapMetadata {
221                version: from_row_to_u64(row, 22)?,
222                last_updated_at: row.get(23)?,
223                is_local: row.get::<usize, Option<bool>>(24)?.unwrap_or(true),
224            },
225        })
226    }
227
228    pub(crate) fn list_receive_swaps_where(
229        &self,
230        con: &Connection,
231        where_clauses: Vec<String>,
232    ) -> Result<Vec<ReceiveSwap>> {
233        let query = Self::list_receive_swaps_query(where_clauses);
234        let ongoing_receive = con
235            .prepare(&query)?
236            .query_map(params![], Self::sql_row_to_receive_swap)?
237            .map(|i| i.unwrap())
238            .collect();
239        Ok(ongoing_receive)
240    }
241
242    pub(crate) fn list_ongoing_receive_swaps(&self) -> Result<Vec<ReceiveSwap>> {
243        let con = self.get_connection()?;
244        let where_clauses = vec![get_where_clause_state_in(&[
245            PaymentState::Created,
246            PaymentState::Pending,
247        ])];
248
249        self.list_receive_swaps_where(&con, where_clauses)
250    }
251
252    pub(crate) fn list_recoverable_receive_swaps(&self) -> Result<Vec<ReceiveSwap>> {
253        let con = self.get_connection()?;
254        let where_clause = vec![get_where_clause_state_in(&[
255            PaymentState::Created,
256            PaymentState::Pending,
257        ])];
258
259        self.list_receive_swaps_where(&con, where_clause)
260    }
261
262    pub(crate) fn set_receive_swap_claim_address(
263        &self,
264        swap_id: &str,
265        claim_address: &str,
266    ) -> Result<(), PaymentError> {
267        let con = self.get_connection()?;
268        con.execute(
269            "UPDATE receive_swaps
270            SET claim_address = :claim_address
271            WHERE id = :id",
272            named_params! {
273                        ":id": swap_id,
274                        ":claim_address": claim_address,
275            },
276        )?;
277        Ok(())
278    }
279
280    // Only set the Receive Swap claim_tx_id if not set, otherwise return an error
281    pub(crate) fn set_receive_swap_claim_tx_id(
282        &self,
283        swap_id: &str,
284        claim_tx_id: &str,
285    ) -> Result<(), PaymentError> {
286        let con = self.get_connection()?;
287        let row_count = con
288            .execute(
289                "UPDATE receive_swaps 
290            SET claim_tx_id = :claim_tx_id
291            WHERE id = :id AND claim_tx_id IS NULL",
292                named_params! {
293                            ":id": swap_id,
294                            ":claim_tx_id": claim_tx_id,
295                },
296            )
297            .map_err(|_| PaymentError::PersistError)?;
298        match row_count {
299            1 => Ok(()),
300            _ => Err(PaymentError::AlreadyClaimed),
301        }
302    }
303
304    // Only unset the Receive Swap claim_tx_id if set with the same tx id
305    pub(crate) fn unset_receive_swap_claim_tx_id(
306        &self,
307        swap_id: &str,
308        claim_tx_id: &str,
309    ) -> Result<(), PaymentError> {
310        let con = self.get_connection()?;
311        con.execute(
312            "UPDATE receive_swaps 
313            SET claim_tx_id = NULL
314            WHERE id = :id AND claim_tx_id = :claim_tx_id",
315            named_params! {
316                        ":id": swap_id,
317                        ":claim_tx_id": claim_tx_id,
318            },
319        )
320        .map_err(|_| PaymentError::PersistError)?;
321        Ok(())
322    }
323
324    pub(crate) fn try_handle_receive_swap_update(
325        &self,
326        swap_id: &str,
327        to_state: PaymentState,
328        claim_tx_id: Option<&str>,
329        lockup_tx_id: Option<&str>,
330        mrh_tx_id: Option<&str>,
331        mrh_amount_sat: Option<u64>,
332    ) -> Result<(), PaymentError> {
333        // Do not overwrite claim_tx_id, lockup_tx_id, mrh_tx_id
334        let mut con = self.get_connection()?;
335        let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
336
337        tx.execute(
338            "UPDATE receive_swaps
339            SET
340                claim_tx_id = COALESCE(claim_tx_id, :claim_tx_id),
341                lockup_tx_id = COALESCE(lockup_tx_id, :lockup_tx_id),
342                mrh_tx_id = COALESCE(mrh_tx_id, :mrh_tx_id),
343
344                payer_amount_sat = COALESCE(:mrh_amount_sat, payer_amount_sat),
345                receiver_amount_sat = COALESCE(:mrh_amount_sat, receiver_amount_sat),
346                state = :state
347            WHERE
348                id = :id",
349            named_params! {
350                ":id": swap_id,
351                ":lockup_tx_id": lockup_tx_id,
352                ":claim_tx_id": claim_tx_id,
353                ":mrh_tx_id": mrh_tx_id,
354                ":mrh_amount_sat": from_optional_u64_to_row(&mrh_amount_sat)?,
355                ":state": to_state,
356            },
357        )?;
358
359        // NOTE: Receive currently does not update any fields, bypassing the commit logic for now
360        // let updated_fields = None;
361        // Self::commit_outgoing(&tx, swap_id, RecordType::Receive, updated_fields)?;
362        // self.sync_trigger
363        //     .try_send(())
364        //     .map_err(|err| PaymentError::Generic {
365        //         err: format!("Could not trigger manual sync: {err:?}"),
366        //     })?;
367
368        tx.commit()?;
369
370        Ok(())
371    }
372}
373
/// Serializable subset of the Boltz `CreateReverseResponse`.
///
/// It is built by `try_convert_from_boltz`, which leaves out the response fields that are
/// already stored separately (the swap id and the invoice).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct InternalCreateReverseResponse {
    /// Swap tree copied from the Boltz response, converted into the internal type.
    pub swap_tree: InternalSwapTree,
    /// Lockup address copied from the Boltz response.
    pub lockup_address: String,
    /// String form of the refund public key from the Boltz response.
    pub refund_public_key: String,
    /// Timeout block height copied from the Boltz response.
    pub timeout_block_height: u32,
    /// Onchain amount copied from the Boltz response.
    pub onchain_amount: u64,
    /// Optional blinding key copied from the Boltz response.
    pub blinding_key: Option<String>,
}
383impl InternalCreateReverseResponse {
384    pub(crate) fn try_convert_from_boltz(
385        boltz_create_response: &CreateReverseResponse,
386        expected_swap_id: &str,
387        expected_invoice: Option<&str>,
388    ) -> Result<Self, PaymentError> {
389        // Do not store the CreateResponse fields that are already stored separately
390        // Before skipping them, ensure they match the separately stored ones
391        ensure_sdk!(
392            boltz_create_response.id == expected_swap_id,
393            PaymentError::PersistError
394        );
395        match (&boltz_create_response.invoice, expected_invoice) {
396            (Some(invoice), Some(expected_invoice)) => {
397                ensure_sdk!(invoice == expected_invoice, PaymentError::PersistError);
398            }
399            (None, None) => {}
400            _ => {
401                return Err(PaymentError::PersistError);
402            }
403        }
404
405        let res = InternalCreateReverseResponse {
406            swap_tree: boltz_create_response.swap_tree.clone().into(),
407            lockup_address: boltz_create_response.lockup_address.clone(),
408            refund_public_key: boltz_create_response.refund_public_key.to_string(),
409            timeout_block_height: boltz_create_response.timeout_block_height,
410            onchain_amount: boltz_create_response.onchain_amount,
411            blinding_key: boltz_create_response.blinding_key.clone(),
412        };
413        Ok(res)
414    }
415}
416
#[cfg(test)]
mod tests {
    use anyhow::{anyhow, Result};

    use crate::test_utils::persist::{create_persister, new_receive_swap};

    use super::PaymentState;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// A stored swap can be found both by id and by invoice.
    #[sdk_macros::test_all]
    fn test_fetch_receive_swap() -> Result<()> {
        create_persister!(storage);

        let receive_swap = new_receive_swap(None, None);

        storage.insert_or_update_receive_swap(&receive_swap)?;
        // Fetch swap by id: the swap must actually be found, not merely "no error"
        assert!(storage
            .fetch_receive_swap_by_id(&receive_swap.id)?
            .is_some());
        // Fetch swap by invoice
        assert!(storage
            .fetch_receive_swap_by_invoice(&receive_swap.invoice)?
            .is_some());

        Ok(())
    }

    /// Listing returns every stored swap, and the ongoing filter includes pending swaps.
    #[sdk_macros::test_all]
    fn test_list_receive_swap() -> Result<()> {
        create_persister!(storage);

        // List general receive swaps
        let range = 0..3;
        for _ in range.clone() {
            storage.insert_or_update_receive_swap(&new_receive_swap(None, None))?;
        }

        let con = storage.get_connection()?;
        let swaps = storage.list_receive_swaps_where(&con, vec![])?;
        assert_eq!(swaps.len(), range.len());

        // List ongoing receive swaps
        storage
            .insert_or_update_receive_swap(&new_receive_swap(Some(PaymentState::Pending), None))?;
        let ongoing_swaps = storage.list_ongoing_receive_swaps()?;
        assert_eq!(ongoing_swaps.len(), 4);

        Ok(())
    }

    /// A state/claim update is persisted and visible on the next fetch.
    #[sdk_macros::test_all]
    fn test_update_receive_swap() -> Result<()> {
        create_persister!(storage);

        let receive_swap = new_receive_swap(None, None);
        storage.insert_or_update_receive_swap(&receive_swap)?;

        // Update metadata
        let new_state = PaymentState::Pending;
        let claim_tx_id = Some("claim_tx_id");

        storage.try_handle_receive_swap_update(
            &receive_swap.id,
            new_state,
            claim_tx_id,
            None,
            None,
            None,
        )?;

        let updated_receive_swap = storage
            .fetch_receive_swap_by_id(&receive_swap.id)?
            .ok_or_else(|| anyhow!("Could not find Receive swap in database"))?;

        assert_eq!(new_state, updated_receive_swap.state);
        assert_eq!(claim_tx_id, updated_receive_swap.claim_tx_id.as_deref());

        Ok(())
    }

    /// Writing a swap that was read before a concurrent update must be rejected.
    #[sdk_macros::async_test_all]
    async fn test_writing_stale_swap() -> Result<()> {
        create_persister!(storage);

        let receive_swap = new_receive_swap(None, None);
        storage.insert_or_update_receive_swap(&receive_swap)?;

        // read - update - write works if there are no updates in between
        let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
        receive_swap.lockup_tx_id = Some("tx_id".to_string());
        storage.insert_or_update_receive_swap(&receive_swap)?;

        // read - update - write works if there are no updates in between even if no field changes
        let receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
        storage.insert_or_update_receive_swap(&receive_swap)?;

        // read - update - write fails if there are any updates in between
        let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap();
        receive_swap.lockup_tx_id = Some("tx_id_2".to_string());
        // Concurrent update
        storage.set_receive_swap_claim_tx_id(&receive_swap.id, "tx_id")?;
        assert!(storage
            .insert_or_update_receive_swap(&receive_swap)
            .is_err());

        Ok(())
    }
}