breez_sdk_liquid/persist/
chain.rs

1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::{ChainSwapDetails, CreateChainResponse};
3use rusqlite::{named_params, params, Connection, Row, TransactionBehavior};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::ensure_sdk;
8use crate::error::PaymentError;
9use crate::model::*;
10use crate::persist::{get_where_clause_state_in, where_clauses_to_string, Persister};
11use crate::sync::model::data::ChainSyncData;
12use crate::sync::model::RecordType;
13
14impl Persister {
15    pub(crate) fn insert_or_update_chain_swap_inner(
16        con: &Connection,
17        chain_swap: &ChainSwap,
18    ) -> Result<()> {
19        // There is a limit of 16 param elements in a single tuple in rusqlite,
20        // so we split up the insert into two statements.
21        let id_hash = sha256::Hash::hash(chain_swap.id.as_bytes()).to_hex();
22        con.execute(
23            "
24            INSERT INTO chain_swaps (
25                id,
26                id_hash,
27                direction,
28                lockup_address,
29                timeout_block_height,
30                preimage,
31                payer_amount_sat,
32                receiver_amount_sat,
33                accept_zero_conf,
34                create_response_json,
35                claim_private_key,
36                refund_private_key,
37                claim_fees_sat,
38                created_at,
39                state
40            )
41            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
42		    ON CONFLICT DO NOTHING",
43            (
44                &chain_swap.id,
45                &id_hash,
46                &chain_swap.direction,
47                &chain_swap.lockup_address,
48                &chain_swap.timeout_block_height,
49                &chain_swap.preimage,
50                &chain_swap.payer_amount_sat,
51                &chain_swap.receiver_amount_sat,
52                &chain_swap.accept_zero_conf,
53                &chain_swap.create_response_json,
54                &chain_swap.claim_private_key,
55                &chain_swap.refund_private_key,
56                &chain_swap.claim_fees_sat,
57                &chain_swap.created_at,
58                &chain_swap.state,
59            ),
60        )?;
61
62        let rows_affected = con.execute(
63            "UPDATE chain_swaps
64            SET
65                description = :description,
66                accept_zero_conf = :accept_zero_conf,
67                server_lockup_tx_id = :server_lockup_tx_id,
68                user_lockup_tx_id = :user_lockup_tx_id,
69                claim_address = :claim_address,
70                claim_tx_id = :claim_tx_id,
71                refund_address = :refund_address,
72                refund_tx_id = :refund_tx_id,
73                pair_fees_json = :pair_fees_json,
74                state = :state,
75                actual_payer_amount_sat = :actual_payer_amount_sat,
76                accepted_receiver_amount_sat = :accepted_receiver_amount_sat
77            WHERE
78                id = :id AND
79                version = :version",
80            named_params! {
81                ":id": &chain_swap.id,
82                ":description": &chain_swap.description,
83                ":accept_zero_conf": &chain_swap.accept_zero_conf,
84                ":server_lockup_tx_id": &chain_swap.server_lockup_tx_id,
85                ":user_lockup_tx_id": &chain_swap.user_lockup_tx_id,
86                ":claim_address": &chain_swap.claim_address,
87                ":claim_tx_id": &chain_swap.claim_tx_id,
88                ":refund_address": &chain_swap.refund_address,
89                ":refund_tx_id": &chain_swap.refund_tx_id,
90                ":pair_fees_json": &chain_swap.pair_fees_json,
91                ":state": &chain_swap.state,
92                ":actual_payer_amount_sat": &chain_swap.actual_payer_amount_sat,
93                ":accepted_receiver_amount_sat": &chain_swap.accepted_receiver_amount_sat,
94                ":version": &chain_swap.metadata.version,
95            },
96        )?;
97        ensure_sdk!(
98            rows_affected > 0,
99            anyhow!("Version mismatch for chain swap {}", chain_swap.id)
100        );
101
102        Ok(())
103    }
104
105    pub(crate) fn insert_or_update_chain_swap(&self, chain_swap: &ChainSwap) -> Result<()> {
106        let maybe_swap = self.fetch_chain_swap_by_id(&chain_swap.id)?;
107        let updated_fields = ChainSyncData::updated_fields(maybe_swap, chain_swap);
108
109        let mut con = self.get_connection()?;
110        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
111
112        Self::insert_or_update_chain_swap_inner(&tx, chain_swap)?;
113
114        // Trigger a sync if:
115        // - updated_fields is None (swap is inserted, not updated)
116        // - updated_fields in a non empty list of updated fields
117        let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
118        match trigger_sync {
119            true => {
120                self.commit_outgoing(&tx, &chain_swap.id, RecordType::Chain, updated_fields)?;
121                tx.commit()?;
122                self.trigger_sync();
123            }
124            false => {
125                tx.commit()?;
126            }
127        };
128
129        Ok(())
130    }
131
132    fn list_chain_swaps_query(where_clauses: Vec<String>) -> String {
133        let where_clause_str = where_clauses_to_string(where_clauses);
134
135        format!(
136            "
137            SELECT
138                id,
139                direction,
140                claim_address,
141                lockup_address,
142                refund_address,
143                timeout_block_height,
144                preimage,
145                description,
146                payer_amount_sat,
147                receiver_amount_sat,
148                accept_zero_conf,
149                create_response_json,
150                claim_private_key,
151                refund_private_key,
152                server_lockup_tx_id,
153                user_lockup_tx_id,
154                claim_fees_sat,
155                claim_tx_id,
156                refund_tx_id,
157                created_at,
158                state,
159                pair_fees_json,
160                actual_payer_amount_sat,
161                accepted_receiver_amount_sat,
162                auto_accepted_fees,
163                version,
164                last_updated_at,
165
166                sync_state.is_local
167            FROM chain_swaps
168            LEFT JOIN sync_state ON chain_swaps.id = sync_state.data_id
169            {where_clause_str}
170            ORDER BY created_at
171        "
172        )
173    }
174
175    pub(crate) fn fetch_chain_swap_by_id(&self, id: &str) -> Result<Option<ChainSwap>> {
176        let con: Connection = self.get_connection()?;
177        let query = Self::list_chain_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
178        let res = con.query_row(&query, [id], Self::sql_row_to_chain_swap);
179
180        Ok(res.ok())
181    }
182
183    pub(crate) fn fetch_chain_swap_by_lockup_address(
184        &self,
185        lockup_address: &str,
186    ) -> Result<Option<ChainSwap>> {
187        let con: Connection = self.get_connection()?;
188        let query = Self::list_chain_swaps_query(vec!["lockup_address = ?1".to_string()]);
189        let res = con.query_row(&query, [lockup_address], Self::sql_row_to_chain_swap);
190
191        Ok(res.ok())
192    }
193
    /// Converts one result row into a `ChainSwap`.
    ///
    /// Columns are read by position, so the indices below must stay aligned with the
    /// column order of the `SELECT` built by `list_chain_swaps_query`.
    ///
    /// # Errors
    ///
    /// Returns the first `rusqlite` error raised while reading a column.
    fn sql_row_to_chain_swap(row: &Row) -> rusqlite::Result<ChainSwap> {
        Ok(ChainSwap {
            id: row.get(0)?,
            direction: row.get(1)?,
            claim_address: row.get(2)?,
            lockup_address: row.get(3)?,
            refund_address: row.get(4)?,
            timeout_block_height: row.get(5)?,
            preimage: row.get(6)?,
            description: row.get(7)?,
            payer_amount_sat: row.get(8)?,
            receiver_amount_sat: row.get(9)?,
            accept_zero_conf: row.get(10)?,
            create_response_json: row.get(11)?,
            claim_private_key: row.get(12)?,
            refund_private_key: row.get(13)?,
            server_lockup_tx_id: row.get(14)?,
            user_lockup_tx_id: row.get(15)?,
            claim_fees_sat: row.get(16)?,
            claim_tx_id: row.get(17)?,
            refund_tx_id: row.get(18)?,
            created_at: row.get(19)?,
            state: row.get(20)?,
            pair_fees_json: row.get(21)?,
            actual_payer_amount_sat: row.get(22)?,
            accepted_receiver_amount_sat: row.get(23)?,
            auto_accepted_fees: row.get(24)?,
            metadata: SwapMetadata {
                version: row.get(25)?,
                last_updated_at: row.get(26)?,
                // `sync_state.is_local` comes from a LEFT JOIN and may be NULL;
                // a NULL value is treated as `true`.
                is_local: row.get::<usize, Option<bool>>(27)?.unwrap_or(true),
            },
        })
    }
228
229    pub(crate) fn list_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
230        let con: Connection = self.get_connection()?;
231        self.list_chain_swaps_where(&con, vec![])
232    }
233
234    pub(crate) fn list_chain_swaps_where(
235        &self,
236        con: &Connection,
237        where_clauses: Vec<String>,
238    ) -> Result<Vec<ChainSwap>> {
239        let query = Self::list_chain_swaps_query(where_clauses);
240        let chain_swaps = con
241            .prepare(&query)?
242            .query_map(params![], Self::sql_row_to_chain_swap)?
243            .map(|i| i.unwrap())
244            .collect();
245        Ok(chain_swaps)
246    }
247
248    pub(crate) fn list_chain_swaps_by_state(
249        &self,
250        states: Vec<PaymentState>,
251    ) -> Result<Vec<ChainSwap>> {
252        let con = self.get_connection()?;
253        let where_clause = vec![get_where_clause_state_in(&states)];
254        self.list_chain_swaps_where(&con, where_clause)
255    }
256
257    pub(crate) fn list_ongoing_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
258        self.list_chain_swaps_by_state(vec![
259            PaymentState::Created,
260            PaymentState::Pending,
261            PaymentState::WaitingFeeAcceptance,
262        ])
263    }
264
265    pub(crate) fn list_pending_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
266        self.list_chain_swaps_by_state(vec![
267            PaymentState::Pending,
268            PaymentState::RefundPending,
269            PaymentState::WaitingFeeAcceptance,
270        ])
271    }
272
273    pub(crate) fn list_refundable_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
274        self.list_chain_swaps_by_state(vec![PaymentState::Refundable, PaymentState::RefundPending])
275    }
276
277    pub(crate) fn update_chain_swap_accept_zero_conf(
278        &self,
279        swap_id: &str,
280        accept_zero_conf: bool,
281    ) -> Result<(), PaymentError> {
282        let mut con: Connection = self.get_connection()?;
283        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
284
285        tx.execute(
286            "UPDATE chain_swaps
287            SET
288                accept_zero_conf = :accept_zero_conf
289            WHERE
290                id = :id",
291            named_params! {
292                ":id": swap_id,
293                ":accept_zero_conf": accept_zero_conf,
294            },
295        )?;
296        self.commit_outgoing(
297            &tx,
298            swap_id,
299            RecordType::Chain,
300            Some(vec!["accept_zero_conf".to_string()]),
301        )?;
302        tx.commit()?;
303        self.trigger_sync();
304
305        Ok(())
306    }
307
308    /// Used for receive chain swaps, when the sender over/underpays
309    pub(crate) fn update_actual_payer_amount(
310        &self,
311        swap_id: &str,
312        actual_payer_amount_sat: u64,
313    ) -> Result<(), PaymentError> {
314        log::info!(
315            "Updating chain swap {swap_id}: actual_payer_amount_sat = {actual_payer_amount_sat}"
316        );
317        let con: Connection = self.get_connection()?;
318        con.execute(
319            "UPDATE chain_swaps 
320            SET actual_payer_amount_sat = :actual_payer_amount_sat
321            WHERE id = :id",
322            named_params! {
323                ":id": swap_id,
324                ":actual_payer_amount_sat": actual_payer_amount_sat,
325            },
326        )?;
327
328        Ok(())
329    }
330
331    /// Used for receive chain swaps, when fees are accepted and thus the agreed received amount is known
332    ///
333    /// Can also be used to erase a previously persisted accepted amount in case of failure to accept.
334    pub(crate) fn update_accepted_receiver_amount(
335        &self,
336        swap_id: &str,
337        accepted_receiver_amount_sat: Option<u64>,
338    ) -> Result<(), PaymentError> {
339        log::info!(
340            "Updating chain swap {swap_id}: accepted_receiver_amount_sat = {accepted_receiver_amount_sat:?}"
341        );
342        let mut con: Connection = self.get_connection()?;
343        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
344
345        tx.execute(
346            "UPDATE chain_swaps 
347            SET accepted_receiver_amount_sat = :accepted_receiver_amount_sat
348            WHERE id = :id",
349            named_params! {
350                ":id": swap_id,
351                ":accepted_receiver_amount_sat": accepted_receiver_amount_sat,
352            },
353        )?;
354        self.commit_outgoing(
355            &tx,
356            swap_id,
357            RecordType::Chain,
358            Some(vec!["accepted_receiver_amount_sat".to_string()]),
359        )?;
360        tx.commit()?;
361        self.trigger_sync();
362
363        Ok(())
364    }
365
366    pub(crate) fn set_chain_swap_refund_address(
367        &self,
368        swap_id: &str,
369        refund_address: &str,
370    ) -> Result<(), PaymentError> {
371        let con = self.get_connection()?;
372        con.execute(
373            "UPDATE chain_swaps
374            SET refund_address = :refund_address
375            WHERE id = :id",
376            named_params! {
377                        ":id": swap_id,
378                        ":refund_address": refund_address,
379            },
380        )?;
381        Ok(())
382    }
383
384    pub(crate) fn set_chain_swap_auto_accepted_fees(
385        &self,
386        swap_id: &str,
387    ) -> Result<(), PaymentError> {
388        log::info!("Setting chain swap {swap_id}: auto_accepted_fees to TRUE");
389
390        let mut con = self.get_connection()?;
391        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
392
393        tx.execute(
394            "UPDATE chain_swaps
395            SET auto_accepted_fees = 1
396            WHERE id = :id",
397            named_params! {
398                ":id": swap_id,
399            },
400        )?;
401        self.commit_outgoing(
402            &tx,
403            swap_id,
404            RecordType::Chain,
405            Some(vec!["auto_accepted_fees".to_string()]),
406        )?;
407        tx.commit()?;
408        self.trigger_sync();
409        Ok(())
410    }
411
412    // Only set the Chain Swap claim_tx_id if not set, otherwise return an error
413    pub(crate) fn set_chain_swap_claim(
414        &self,
415        swap_id: &str,
416        claim_address: Option<String>,
417        claim_tx_id: &str,
418    ) -> Result<(), PaymentError> {
419        let con = self.get_connection()?;
420        let row_count = con
421            .execute(
422                "UPDATE chain_swaps
423            SET claim_address = :claim_address, claim_tx_id = :claim_tx_id
424            WHERE id = :id AND claim_tx_id IS NULL",
425                named_params! {
426                            ":id": swap_id,
427                            ":claim_address": claim_address,
428                            ":claim_tx_id": claim_tx_id,
429                },
430            )
431            .map_err(|_| PaymentError::PersistError)?;
432        match row_count {
433            1 => Ok(()),
434            _ => Err(PaymentError::AlreadyClaimed),
435        }
436    }
437
438    // Only unset the Chain Swap claim_tx_id if set with the same tx id
439    pub(crate) fn unset_chain_swap_claim_tx_id(
440        &self,
441        swap_id: &str,
442        claim_tx_id: &str,
443    ) -> Result<(), PaymentError> {
444        let con = self.get_connection()?;
445        con.execute(
446            "UPDATE chain_swaps
447            SET claim_tx_id = NULL
448            WHERE id = :id AND claim_tx_id = :claim_tx_id",
449            named_params! {
450                        ":id": swap_id,
451                        ":claim_tx_id": claim_tx_id,
452            },
453        )
454        .map_err(|_| PaymentError::PersistError)?;
455        Ok(())
456    }
457
458    pub(crate) fn try_handle_chain_swap_update(
459        &self,
460        swap_update: &ChainSwapUpdate,
461    ) -> Result<(), PaymentError> {
462        // Do not overwrite server_lockup_tx_id, user_lockup_tx_id, claim_address, claim_tx_id
463        // Overwrite refund_tx_id if provided (refund tx fee bump)
464        let mut con = self.get_connection()?;
465        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
466
467        tx.execute(
468            "UPDATE chain_swaps
469            SET
470                server_lockup_tx_id = COALESCE(server_lockup_tx_id, :server_lockup_tx_id),
471                user_lockup_tx_id = COALESCE(user_lockup_tx_id, :user_lockup_tx_id),
472                claim_address = COALESCE(claim_address, :claim_address),
473                claim_tx_id = COALESCE(claim_tx_id, :claim_tx_id),
474
475                refund_tx_id = COALESCE(:refund_tx_id, refund_tx_id),
476                state = :state
477            WHERE
478                id = :id",
479            named_params! {
480                ":id": swap_update.swap_id,
481                ":server_lockup_tx_id": swap_update.server_lockup_tx_id,
482                ":user_lockup_tx_id": swap_update.user_lockup_tx_id,
483                ":claim_address": swap_update.claim_address,
484                ":claim_tx_id": swap_update.claim_tx_id,
485                ":refund_tx_id": swap_update.refund_tx_id,
486                ":state": swap_update.to_state,
487            },
488        )?;
489
490        tx.commit()?;
491
492        Ok(())
493    }
494}
495
/// Reduced form of a Boltz `CreateChainResponse`, keeping only the claim and lockup details.
///
/// Derives `Serialize`/`Deserialize`, so the field names are part of the serialized form.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct InternalCreateChainResponse {
    /// Copy of `CreateChainResponse::claim_details`.
    pub(crate) claim_details: ChainSwapDetails,
    /// Copy of `CreateChainResponse::lockup_details`.
    pub(crate) lockup_details: ChainSwapDetails,
}
501impl InternalCreateChainResponse {
502    pub(crate) fn try_convert_from_boltz(
503        boltz_create_response: &CreateChainResponse,
504        expected_swap_id: &str,
505    ) -> Result<InternalCreateChainResponse, PaymentError> {
506        // Do not store the CreateResponse fields that are already stored separately
507        // Before skipping them, ensure they match the separately stored ones
508        ensure_sdk!(
509            boltz_create_response.id == expected_swap_id,
510            PaymentError::PersistError
511        );
512
513        let res = InternalCreateChainResponse {
514            claim_details: boltz_create_response.claim_details.clone(),
515            lockup_details: boltz_create_response.lockup_details.clone(),
516        };
517        Ok(res)
518    }
519}
520
/// Tests for the chain swap persistence layer.
#[cfg(test)]
mod tests {
    use crate::model::Direction;
    use crate::test_utils::chain_swap::new_chain_swap;
    use crate::test_utils::persist::create_persister;
    use anyhow::Result;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Checks that writing a swap read before a concurrent update is rejected,
    /// while read-update-write cycles without interleaved updates succeed.
    #[sdk_macros::async_test_all]
    async fn test_writing_stale_swap() -> Result<()> {
        create_persister!(storage);

        // Initial insert of a fresh swap.
        let chain_swap = new_chain_swap(Direction::Incoming, None, false, None, false, false, None);
        storage.insert_or_update_chain_swap(&chain_swap)?;

        // read - update - write works if there are no updates in between
        let mut chain_swap = storage.fetch_chain_swap_by_id(&chain_swap.id)?.unwrap();
        chain_swap.claim_tx_id = Some("tx_id".to_string());
        storage.insert_or_update_chain_swap(&chain_swap)?;

        // read - update - write works if there are no updates in between even if no field changes
        let chain_swap = storage.fetch_chain_swap_by_id(&chain_swap.id)?.unwrap();
        storage.insert_or_update_chain_swap(&chain_swap)?;

        // read - update - write fails if there are any updates in between
        let mut chain_swap = storage.fetch_chain_swap_by_id(&chain_swap.id)?.unwrap();
        chain_swap.claim_tx_id = Some("tx_id_2".to_string());
        // Concurrent update
        storage.update_chain_swap_accept_zero_conf(&chain_swap.id, true)?;
        // The in-memory copy was read before the concurrent update, so the write must fail.
        assert!(storage.insert_or_update_chain_swap(&chain_swap).is_err());

        Ok(())
    }
}
556}