breez_sdk_liquid/persist/
chain.rs

1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::{ChainSwapDetails, CreateChainResponse};
3use rusqlite::{named_params, params, Connection, Row, TransactionBehavior};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::ensure_sdk;
8use crate::error::PaymentError;
9use crate::model::*;
10use crate::persist::{get_where_clause_state_in, where_clauses_to_string, Persister};
11use crate::sync::model::data::ChainSyncData;
12use crate::sync::model::RecordType;
13
14impl Persister {
15    pub(crate) fn insert_or_update_chain_swap_inner(
16        con: &Connection,
17        chain_swap: &ChainSwap,
18    ) -> Result<()> {
19        // There is a limit of 16 param elements in a single tuple in rusqlite,
20        // so we split up the insert into two statements.
21        let id_hash = sha256::Hash::hash(chain_swap.id.as_bytes()).to_hex();
22        con.execute(
23            "
24            INSERT INTO chain_swaps (
25                id,
26                id_hash,
27                direction,
28                lockup_address,
29                timeout_block_height,
30                claim_timeout_block_height,
31                preimage,
32                payer_amount_sat,
33                receiver_amount_sat,
34                accept_zero_conf,
35                create_response_json,
36                claim_private_key,
37                refund_private_key,
38                claim_fees_sat,
39                created_at,
40                state
41            )
42            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
43		    ON CONFLICT DO NOTHING",
44            (
45                &chain_swap.id,
46                &id_hash,
47                &chain_swap.direction,
48                &chain_swap.lockup_address,
49                &chain_swap.timeout_block_height,
50                &chain_swap.claim_timeout_block_height,
51                &chain_swap.preimage,
52                &chain_swap.payer_amount_sat,
53                &chain_swap.receiver_amount_sat,
54                &chain_swap.accept_zero_conf,
55                &chain_swap.create_response_json,
56                &chain_swap.claim_private_key,
57                &chain_swap.refund_private_key,
58                &chain_swap.claim_fees_sat,
59                &chain_swap.created_at,
60                &chain_swap.state,
61            ),
62        )?;
63
64        let rows_affected = con.execute(
65            "UPDATE chain_swaps
66            SET
67                description = :description,
68                accept_zero_conf = :accept_zero_conf,
69                server_lockup_tx_id = :server_lockup_tx_id,
70                user_lockup_tx_id = :user_lockup_tx_id,
71                claim_address = :claim_address,
72                claim_tx_id = :claim_tx_id,
73                refund_address = :refund_address,
74                refund_tx_id = :refund_tx_id,
75                pair_fees_json = :pair_fees_json,
76                state = :state,
77                actual_payer_amount_sat = :actual_payer_amount_sat,
78                accepted_receiver_amount_sat = :accepted_receiver_amount_sat
79            WHERE
80                id = :id AND
81                version = :version",
82            named_params! {
83                ":id": &chain_swap.id,
84                ":description": &chain_swap.description,
85                ":accept_zero_conf": &chain_swap.accept_zero_conf,
86                ":server_lockup_tx_id": &chain_swap.server_lockup_tx_id,
87                ":user_lockup_tx_id": &chain_swap.user_lockup_tx_id,
88                ":claim_address": &chain_swap.claim_address,
89                ":claim_tx_id": &chain_swap.claim_tx_id,
90                ":refund_address": &chain_swap.refund_address,
91                ":refund_tx_id": &chain_swap.refund_tx_id,
92                ":pair_fees_json": &chain_swap.pair_fees_json,
93                ":state": &chain_swap.state,
94                ":actual_payer_amount_sat": &chain_swap.actual_payer_amount_sat,
95                ":accepted_receiver_amount_sat": &chain_swap.accepted_receiver_amount_sat,
96                ":version": &chain_swap.metadata.version,
97            },
98        )?;
99        ensure_sdk!(
100            rows_affected > 0,
101            anyhow!("Version mismatch for chain swap {}", chain_swap.id)
102        );
103
104        Ok(())
105    }
106
107    pub(crate) fn insert_or_update_chain_swap(&self, chain_swap: &ChainSwap) -> Result<()> {
108        let maybe_swap = self.fetch_chain_swap_by_id(&chain_swap.id)?;
109        let updated_fields = ChainSyncData::updated_fields(maybe_swap, chain_swap);
110
111        let mut con = self.get_connection()?;
112        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
113
114        Self::insert_or_update_chain_swap_inner(&tx, chain_swap)?;
115
116        // Trigger a sync if:
117        // - updated_fields is None (swap is inserted, not updated)
118        // - updated_fields in a non empty list of updated fields
119        let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
120        match trigger_sync {
121            true => {
122                self.commit_outgoing(&tx, &chain_swap.id, RecordType::Chain, updated_fields)?;
123                tx.commit()?;
124                self.trigger_sync();
125            }
126            false => {
127                tx.commit()?;
128            }
129        };
130
131        Ok(())
132    }
133
134    fn list_chain_swaps_query(where_clauses: Vec<String>) -> String {
135        let where_clause_str = where_clauses_to_string(where_clauses);
136
137        format!(
138            "
139            SELECT
140                id,
141                direction,
142                claim_address,
143                lockup_address,
144                refund_address,
145                timeout_block_height,
146                claim_timeout_block_height,
147                preimage,
148                description,
149                payer_amount_sat,
150                receiver_amount_sat,
151                accept_zero_conf,
152                create_response_json,
153                claim_private_key,
154                refund_private_key,
155                server_lockup_tx_id,
156                user_lockup_tx_id,
157                claim_fees_sat,
158                claim_tx_id,
159                refund_tx_id,
160                created_at,
161                state,
162                pair_fees_json,
163                actual_payer_amount_sat,
164                accepted_receiver_amount_sat,
165                auto_accepted_fees,
166                version,
167                last_updated_at,
168
169                sync_state.is_local
170            FROM chain_swaps
171            LEFT JOIN sync_state ON chain_swaps.id = sync_state.data_id
172            {where_clause_str}
173            ORDER BY created_at
174        "
175        )
176    }
177
178    pub(crate) fn fetch_chain_swap_by_id(&self, id: &str) -> Result<Option<ChainSwap>> {
179        let con: Connection = self.get_connection()?;
180        let query = Self::list_chain_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
181        let res = con.query_row(&query, [id], Self::sql_row_to_chain_swap);
182
183        Ok(res.ok())
184    }
185
186    pub(crate) fn fetch_chain_swap_by_lockup_address(
187        &self,
188        lockup_address: &str,
189    ) -> Result<Option<ChainSwap>> {
190        let con: Connection = self.get_connection()?;
191        let query = Self::list_chain_swaps_query(vec!["lockup_address = ?1".to_string()]);
192        let res = con.query_row(&query, [lockup_address], Self::sql_row_to_chain_swap);
193
194        Ok(res.ok())
195    }
196
197    fn sql_row_to_chain_swap(row: &Row) -> rusqlite::Result<ChainSwap> {
198        Ok(ChainSwap {
199            id: row.get(0)?,
200            direction: row.get(1)?,
201            claim_address: row.get(2)?,
202            lockup_address: row.get(3)?,
203            refund_address: row.get(4)?,
204            timeout_block_height: row.get(5)?,
205            claim_timeout_block_height: row.get(6)?,
206            preimage: row.get(7)?,
207            description: row.get(8)?,
208            payer_amount_sat: row.get(9)?,
209            receiver_amount_sat: row.get(10)?,
210            accept_zero_conf: row.get(11)?,
211            create_response_json: row.get(12)?,
212            claim_private_key: row.get(13)?,
213            refund_private_key: row.get(14)?,
214            server_lockup_tx_id: row.get(15)?,
215            user_lockup_tx_id: row.get(16)?,
216            claim_fees_sat: row.get(17)?,
217            claim_tx_id: row.get(18)?,
218            refund_tx_id: row.get(19)?,
219            created_at: row.get(20)?,
220            state: row.get(21)?,
221            pair_fees_json: row.get(22)?,
222            actual_payer_amount_sat: row.get(23)?,
223            accepted_receiver_amount_sat: row.get(24)?,
224            auto_accepted_fees: row.get(25)?,
225            metadata: SwapMetadata {
226                version: row.get(26)?,
227                last_updated_at: row.get(27)?,
228                is_local: row.get::<usize, Option<bool>>(28)?.unwrap_or(true),
229            },
230        })
231    }
232
233    pub(crate) fn list_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
234        let con: Connection = self.get_connection()?;
235        self.list_chain_swaps_where(&con, vec![])
236    }
237
238    pub(crate) fn list_chain_swaps_where(
239        &self,
240        con: &Connection,
241        where_clauses: Vec<String>,
242    ) -> Result<Vec<ChainSwap>> {
243        let query = Self::list_chain_swaps_query(where_clauses);
244        let chain_swaps = con
245            .prepare(&query)?
246            .query_map(params![], Self::sql_row_to_chain_swap)?
247            .map(|i| i.unwrap())
248            .collect();
249        Ok(chain_swaps)
250    }
251
252    pub(crate) fn list_chain_swaps_by_state(
253        &self,
254        states: Vec<PaymentState>,
255    ) -> Result<Vec<ChainSwap>> {
256        let con = self.get_connection()?;
257        let where_clause = vec![get_where_clause_state_in(&states)];
258        self.list_chain_swaps_where(&con, where_clause)
259    }
260
261    pub(crate) fn list_ongoing_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
262        self.list_chain_swaps_by_state(vec![
263            PaymentState::Created,
264            PaymentState::Pending,
265            PaymentState::WaitingFeeAcceptance,
266        ])
267    }
268
269    pub(crate) fn list_pending_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
270        self.list_chain_swaps_by_state(vec![
271            PaymentState::Pending,
272            PaymentState::RefundPending,
273            PaymentState::WaitingFeeAcceptance,
274        ])
275    }
276
277    pub(crate) fn list_refundable_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
278        self.list_chain_swaps_by_state(vec![PaymentState::Refundable, PaymentState::RefundPending])
279    }
280
281    pub(crate) fn update_chain_swap_accept_zero_conf(
282        &self,
283        swap_id: &str,
284        accept_zero_conf: bool,
285    ) -> Result<(), PaymentError> {
286        let mut con: Connection = self.get_connection()?;
287        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
288
289        tx.execute(
290            "UPDATE chain_swaps
291            SET
292                accept_zero_conf = :accept_zero_conf
293            WHERE
294                id = :id",
295            named_params! {
296                ":id": swap_id,
297                ":accept_zero_conf": accept_zero_conf,
298            },
299        )?;
300        self.commit_outgoing(
301            &tx,
302            swap_id,
303            RecordType::Chain,
304            Some(vec!["accept_zero_conf".to_string()]),
305        )?;
306        tx.commit()?;
307        self.trigger_sync();
308
309        Ok(())
310    }
311
312    /// Used for receive chain swaps, when the sender over/underpays
313    pub(crate) fn update_actual_payer_amount(
314        &self,
315        swap_id: &str,
316        actual_payer_amount_sat: u64,
317    ) -> Result<(), PaymentError> {
318        log::info!(
319            "Updating chain swap {swap_id}: actual_payer_amount_sat = {actual_payer_amount_sat}"
320        );
321        let con: Connection = self.get_connection()?;
322        con.execute(
323            "UPDATE chain_swaps 
324            SET actual_payer_amount_sat = :actual_payer_amount_sat
325            WHERE id = :id",
326            named_params! {
327                ":id": swap_id,
328                ":actual_payer_amount_sat": actual_payer_amount_sat,
329            },
330        )?;
331
332        Ok(())
333    }
334
335    /// Used for receive chain swaps, when fees are accepted and thus the agreed received amount is known
336    ///
337    /// Can also be used to erase a previously persisted accepted amount in case of failure to accept.
338    pub(crate) fn update_accepted_receiver_amount(
339        &self,
340        swap_id: &str,
341        accepted_receiver_amount_sat: Option<u64>,
342    ) -> Result<(), PaymentError> {
343        log::info!(
344            "Updating chain swap {swap_id}: accepted_receiver_amount_sat = {accepted_receiver_amount_sat:?}"
345        );
346        let mut con: Connection = self.get_connection()?;
347        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
348
349        tx.execute(
350            "UPDATE chain_swaps 
351            SET accepted_receiver_amount_sat = :accepted_receiver_amount_sat
352            WHERE id = :id",
353            named_params! {
354                ":id": swap_id,
355                ":accepted_receiver_amount_sat": accepted_receiver_amount_sat,
356            },
357        )?;
358        self.commit_outgoing(
359            &tx,
360            swap_id,
361            RecordType::Chain,
362            Some(vec!["accepted_receiver_amount_sat".to_string()]),
363        )?;
364        tx.commit()?;
365        self.trigger_sync();
366
367        Ok(())
368    }
369
370    pub(crate) fn set_chain_swap_refund_address(
371        &self,
372        swap_id: &str,
373        refund_address: &str,
374    ) -> Result<(), PaymentError> {
375        let con = self.get_connection()?;
376        con.execute(
377            "UPDATE chain_swaps
378            SET refund_address = :refund_address
379            WHERE id = :id",
380            named_params! {
381                        ":id": swap_id,
382                        ":refund_address": refund_address,
383            },
384        )?;
385        Ok(())
386    }
387
388    pub(crate) fn set_chain_swap_auto_accepted_fees(
389        &self,
390        swap_id: &str,
391    ) -> Result<(), PaymentError> {
392        log::info!("Setting chain swap {swap_id}: auto_accepted_fees to TRUE");
393
394        let mut con = self.get_connection()?;
395        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
396
397        tx.execute(
398            "UPDATE chain_swaps
399            SET auto_accepted_fees = 1
400            WHERE id = :id",
401            named_params! {
402                ":id": swap_id,
403            },
404        )?;
405        self.commit_outgoing(
406            &tx,
407            swap_id,
408            RecordType::Chain,
409            Some(vec!["auto_accepted_fees".to_string()]),
410        )?;
411        tx.commit()?;
412        self.trigger_sync();
413        Ok(())
414    }
415
416    // Only set the Chain Swap claim_tx_id if not set, otherwise return an error
417    pub(crate) fn set_chain_swap_claim(
418        &self,
419        swap_id: &str,
420        claim_address: Option<String>,
421        claim_tx_id: &str,
422    ) -> Result<(), PaymentError> {
423        let con = self.get_connection()?;
424        let row_count = con
425            .execute(
426                "UPDATE chain_swaps
427            SET claim_address = :claim_address, claim_tx_id = :claim_tx_id
428            WHERE id = :id AND claim_tx_id IS NULL",
429                named_params! {
430                            ":id": swap_id,
431                            ":claim_address": claim_address,
432                            ":claim_tx_id": claim_tx_id,
433                },
434            )
435            .map_err(|_| PaymentError::PersistError)?;
436        match row_count {
437            1 => Ok(()),
438            _ => Err(PaymentError::AlreadyClaimed),
439        }
440    }
441
442    // Only unset the Chain Swap claim_tx_id if set with the same tx id
443    pub(crate) fn unset_chain_swap_claim_tx_id(
444        &self,
445        swap_id: &str,
446        claim_tx_id: &str,
447    ) -> Result<(), PaymentError> {
448        let con = self.get_connection()?;
449        con.execute(
450            "UPDATE chain_swaps
451            SET claim_tx_id = NULL
452            WHERE id = :id AND claim_tx_id = :claim_tx_id",
453            named_params! {
454                        ":id": swap_id,
455                        ":claim_tx_id": claim_tx_id,
456            },
457        )
458        .map_err(|_| PaymentError::PersistError)?;
459        Ok(())
460    }
461
462    pub(crate) fn try_handle_chain_swap_update(
463        &self,
464        swap_update: &ChainSwapUpdate,
465    ) -> Result<(), PaymentError> {
466        // Do not overwrite server_lockup_tx_id, user_lockup_tx_id, claim_address, claim_tx_id
467        // Overwrite refund_tx_id if provided (refund tx fee bump)
468        let mut con = self.get_connection()?;
469        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
470
471        tx.execute(
472            "UPDATE chain_swaps
473            SET
474                server_lockup_tx_id = COALESCE(server_lockup_tx_id, :server_lockup_tx_id),
475                user_lockup_tx_id = COALESCE(user_lockup_tx_id, :user_lockup_tx_id),
476                claim_address = COALESCE(claim_address, :claim_address),
477                claim_tx_id = COALESCE(claim_tx_id, :claim_tx_id),
478
479                refund_tx_id = COALESCE(:refund_tx_id, refund_tx_id),
480                state = :state
481            WHERE
482                id = :id",
483            named_params! {
484                ":id": swap_update.swap_id,
485                ":server_lockup_tx_id": swap_update.server_lockup_tx_id,
486                ":user_lockup_tx_id": swap_update.user_lockup_tx_id,
487                ":claim_address": swap_update.claim_address,
488                ":claim_tx_id": swap_update.claim_tx_id,
489                ":refund_tx_id": swap_update.refund_tx_id,
490                ":state": swap_update.to_state,
491            },
492        )?;
493
494        tx.commit()?;
495
496        Ok(())
497    }
498}
499
500#[derive(Clone, Debug, Serialize, Deserialize)]
501pub(crate) struct InternalCreateChainResponse {
502    pub(crate) claim_details: ChainSwapDetails,
503    pub(crate) lockup_details: ChainSwapDetails,
504}
505impl InternalCreateChainResponse {
506    pub(crate) fn try_convert_from_boltz(
507        boltz_create_response: &CreateChainResponse,
508        expected_swap_id: &str,
509    ) -> Result<InternalCreateChainResponse, PaymentError> {
510        // Do not store the CreateResponse fields that are already stored separately
511        // Before skipping them, ensure they match the separately stored ones
512        ensure_sdk!(
513            boltz_create_response.id == expected_swap_id,
514            PaymentError::PersistError
515        );
516
517        let res = InternalCreateChainResponse {
518            claim_details: boltz_create_response.claim_details.clone(),
519            lockup_details: boltz_create_response.lockup_details.clone(),
520        };
521        Ok(res)
522    }
523}
524
#[cfg(test)]
mod tests {
    use crate::model::Direction;
    use crate::test_utils::chain_swap::new_chain_swap;
    use crate::test_utils::persist::create_persister;
    use anyhow::Result;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Writing back a swap that was read before another update happened must fail, while
    /// uninterrupted read - update - write cycles must succeed.
    #[sdk_macros::async_test_all]
    async fn test_writing_stale_swap() -> Result<()> {
        create_persister!(storage);

        let inserted = new_chain_swap(Direction::Incoming, None, false, None, false, false, None);
        storage.insert_or_update_chain_swap(&inserted)?;

        // read - update - write works if there are no updates in between
        let mut first_read = storage.fetch_chain_swap_by_id(&inserted.id)?.unwrap();
        first_read.claim_tx_id = Some("tx_id".to_string());
        storage.insert_or_update_chain_swap(&first_read)?;

        // read - update - write works if there are no updates in between even if no field changes
        let second_read = storage.fetch_chain_swap_by_id(&first_read.id)?.unwrap();
        storage.insert_or_update_chain_swap(&second_read)?;

        // read - update - write fails if there are any updates in between
        let mut stale = storage.fetch_chain_swap_by_id(&second_read.id)?.unwrap();
        stale.claim_tx_id = Some("tx_id_2".to_string());
        // Concurrent update
        storage.update_chain_swap_accept_zero_conf(&stale.id, true)?;
        assert!(storage.insert_or_update_chain_swap(&stale).is_err());

        Ok(())
    }
}