breez_sdk_liquid/persist/
chain.rs

1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::{ChainSwapDetails, CreateChainResponse};
3use rusqlite::{named_params, params, Connection, Row, TransactionBehavior};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::ensure_sdk;
8use crate::error::PaymentError;
9use crate::model::*;
10use crate::persist::{get_where_clause_state_in, where_clauses_to_string, Persister};
11use crate::sync::model::data::ChainSyncData;
12use crate::sync::model::RecordType;
13use crate::utils::{
14    from_optional_u64_to_row, from_row_to_optional_u64, from_row_to_u64, from_u64_to_row,
15};
16
17impl Persister {
18    pub(crate) fn insert_or_update_chain_swap_inner(
19        con: &Connection,
20        chain_swap: &ChainSwap,
21    ) -> Result<()> {
22        // There is a limit of 16 param elements in a single tuple in rusqlite,
23        // so we split up the insert into two statements.
24        let id_hash = sha256::Hash::hash(chain_swap.id.as_bytes()).to_hex();
25        con.execute(
26            "
27            INSERT INTO chain_swaps (
28                id,
29                id_hash,
30                direction,
31                lockup_address,
32                timeout_block_height,
33                claim_timeout_block_height,
34                preimage,
35                payer_amount_sat,
36                receiver_amount_sat,
37                accept_zero_conf,
38                create_response_json,
39                claim_private_key,
40                refund_private_key,
41                claim_fees_sat,
42                created_at,
43                state
44            )
45            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
46		    ON CONFLICT DO NOTHING",
47            (
48                &chain_swap.id,
49                &id_hash,
50                &chain_swap.direction,
51                &chain_swap.lockup_address,
52                &chain_swap.timeout_block_height,
53                &chain_swap.claim_timeout_block_height,
54                &chain_swap.preimage,
55                from_u64_to_row(chain_swap.payer_amount_sat)?,
56                from_u64_to_row(chain_swap.receiver_amount_sat)?,
57                &chain_swap.accept_zero_conf,
58                &chain_swap.create_response_json,
59                &chain_swap.claim_private_key,
60                &chain_swap.refund_private_key,
61                from_u64_to_row(chain_swap.claim_fees_sat)?,
62                &chain_swap.created_at,
63                &chain_swap.state,
64            ),
65        )?;
66
67        let rows_affected = con.execute(
68            "UPDATE chain_swaps
69            SET
70                description = :description,
71                accept_zero_conf = :accept_zero_conf,
72                server_lockup_tx_id = :server_lockup_tx_id,
73                user_lockup_tx_id = :user_lockup_tx_id,
74                claim_address = :claim_address,
75                claim_tx_id = :claim_tx_id,
76                refund_address = :refund_address,
77                refund_tx_id = :refund_tx_id,
78                pair_fees_json = :pair_fees_json,
79                state = :state,
80                actual_payer_amount_sat = :actual_payer_amount_sat,
81                accepted_receiver_amount_sat = :accepted_receiver_amount_sat,
82                user_lockup_spent = :user_lockup_spent
83            WHERE
84                id = :id AND
85                version = :version",
86            named_params! {
87                ":id": &chain_swap.id,
88                ":description": &chain_swap.description,
89                ":accept_zero_conf": &chain_swap.accept_zero_conf,
90                ":server_lockup_tx_id": &chain_swap.server_lockup_tx_id,
91                ":user_lockup_tx_id": &chain_swap.user_lockup_tx_id,
92                ":claim_address": &chain_swap.claim_address,
93                ":claim_tx_id": &chain_swap.claim_tx_id,
94                ":refund_address": &chain_swap.refund_address,
95                ":refund_tx_id": &chain_swap.refund_tx_id,
96                ":pair_fees_json": &chain_swap.pair_fees_json,
97                ":state": &chain_swap.state,
98                ":actual_payer_amount_sat": from_optional_u64_to_row(&chain_swap.actual_payer_amount_sat)?,
99                ":accepted_receiver_amount_sat": from_optional_u64_to_row(&chain_swap.accepted_receiver_amount_sat)?,
100                ":user_lockup_spent": &chain_swap.user_lockup_spent,
101                ":version": from_u64_to_row(chain_swap.metadata.version)?,
102            },
103        )?;
104        ensure_sdk!(
105            rows_affected > 0,
106            anyhow!("Version mismatch for chain swap {}", chain_swap.id)
107        );
108
109        Ok(())
110    }
111
112    pub(crate) fn insert_or_update_chain_swap(&self, chain_swap: &ChainSwap) -> Result<()> {
113        let maybe_swap = self.fetch_chain_swap_by_id(&chain_swap.id)?;
114        let updated_fields = ChainSyncData::updated_fields(maybe_swap, chain_swap);
115
116        let mut con = self.get_connection()?;
117        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
118
119        Self::insert_or_update_chain_swap_inner(&tx, chain_swap)?;
120
121        // Trigger a sync if:
122        // - updated_fields is None (swap is inserted, not updated)
123        // - updated_fields in a non empty list of updated fields
124        let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
125        match trigger_sync {
126            true => {
127                self.commit_outgoing(&tx, &chain_swap.id, RecordType::Chain, updated_fields)?;
128                tx.commit()?;
129                self.trigger_sync();
130            }
131            false => {
132                tx.commit()?;
133            }
134        };
135
136        Ok(())
137    }
138
139    fn list_chain_swaps_query(where_clauses: Vec<String>) -> String {
140        let where_clause_str = where_clauses_to_string(where_clauses);
141
142        format!(
143            "
144            SELECT
145                id,
146                direction,
147                claim_address,
148                lockup_address,
149                refund_address,
150                timeout_block_height,
151                claim_timeout_block_height,
152                preimage,
153                description,
154                payer_amount_sat,
155                receiver_amount_sat,
156                accept_zero_conf,
157                create_response_json,
158                claim_private_key,
159                refund_private_key,
160                server_lockup_tx_id,
161                user_lockup_tx_id,
162                claim_fees_sat,
163                claim_tx_id,
164                refund_tx_id,
165                created_at,
166                state,
167                pair_fees_json,
168                actual_payer_amount_sat,
169                accepted_receiver_amount_sat,
170                auto_accepted_fees,
171                user_lockup_spent,
172                version,
173                last_updated_at,
174
175                sync_state.is_local
176            FROM chain_swaps
177            LEFT JOIN sync_state ON chain_swaps.id = sync_state.data_id
178            {where_clause_str}
179            ORDER BY created_at
180        "
181        )
182    }
183
184    pub(crate) fn fetch_chain_swap_by_id(&self, id: &str) -> Result<Option<ChainSwap>> {
185        let con: Connection = self.get_connection()?;
186        let query = Self::list_chain_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
187        let res = con.query_row(&query, [id], Self::sql_row_to_chain_swap);
188
189        Ok(res.ok())
190    }
191
192    pub(crate) fn fetch_chain_swap_by_lockup_address(
193        &self,
194        lockup_address: &str,
195    ) -> Result<Option<ChainSwap>> {
196        let con: Connection = self.get_connection()?;
197        let query = Self::list_chain_swaps_query(vec!["lockup_address = ?1".to_string()]);
198        let res = con.query_row(&query, [lockup_address], Self::sql_row_to_chain_swap);
199
200        Ok(res.ok())
201    }
202
203    fn sql_row_to_chain_swap(row: &Row) -> rusqlite::Result<ChainSwap> {
204        Ok(ChainSwap {
205            id: row.get(0)?,
206            direction: row.get(1)?,
207            claim_address: row.get(2)?,
208            lockup_address: row.get(3)?,
209            refund_address: row.get(4)?,
210            timeout_block_height: row.get(5)?,
211            claim_timeout_block_height: row.get(6)?,
212            preimage: row.get(7)?,
213            description: row.get(8)?,
214            payer_amount_sat: from_row_to_u64(row, 9)?,
215            receiver_amount_sat: from_row_to_u64(row, 10)?,
216            accept_zero_conf: row.get(11)?,
217            create_response_json: row.get(12)?,
218            claim_private_key: row.get(13)?,
219            refund_private_key: row.get(14)?,
220            server_lockup_tx_id: row.get(15)?,
221            user_lockup_tx_id: row.get(16)?,
222            claim_fees_sat: from_row_to_u64(row, 17)?,
223            claim_tx_id: row.get(18)?,
224            refund_tx_id: row.get(19)?,
225            created_at: row.get(20)?,
226            state: row.get(21)?,
227            pair_fees_json: row.get(22)?,
228            actual_payer_amount_sat: from_row_to_optional_u64(row, 23)?,
229            accepted_receiver_amount_sat: from_row_to_optional_u64(row, 24)?,
230            auto_accepted_fees: row.get(25)?,
231            user_lockup_spent: row.get(26)?,
232            metadata: SwapMetadata {
233                version: from_row_to_u64(row, 27)?,
234                last_updated_at: row.get(28)?,
235                is_local: row.get::<usize, Option<bool>>(29)?.unwrap_or(true),
236            },
237        })
238    }
239
240    pub(crate) fn list_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
241        let con: Connection = self.get_connection()?;
242        self.list_chain_swaps_where(&con, vec![])
243    }
244
245    pub(crate) fn list_chain_swaps_where(
246        &self,
247        con: &Connection,
248        where_clauses: Vec<String>,
249    ) -> Result<Vec<ChainSwap>> {
250        let query = Self::list_chain_swaps_query(where_clauses);
251        let chain_swaps = con
252            .prepare(&query)?
253            .query_map(params![], Self::sql_row_to_chain_swap)?
254            .map(|i| i.unwrap())
255            .collect();
256        Ok(chain_swaps)
257    }
258
259    pub(crate) fn list_chain_swaps_by_state(
260        &self,
261        states: Vec<PaymentState>,
262    ) -> Result<Vec<ChainSwap>> {
263        let con = self.get_connection()?;
264        let where_clause = vec![get_where_clause_state_in(&states)];
265        self.list_chain_swaps_where(&con, where_clause)
266    }
267
268    pub(crate) fn list_ongoing_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
269        self.list_chain_swaps_by_state(vec![
270            PaymentState::Created,
271            PaymentState::Pending,
272            PaymentState::WaitingFeeAcceptance,
273        ])
274    }
275
276    pub(crate) fn list_pending_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
277        self.list_chain_swaps_by_state(vec![
278            PaymentState::Pending,
279            PaymentState::RefundPending,
280            PaymentState::WaitingFeeAcceptance,
281        ])
282    }
283
284    pub(crate) fn list_refundable_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
285        self.list_chain_swaps_by_state(vec![PaymentState::Refundable, PaymentState::RefundPending])
286    }
287
288    pub(crate) fn update_chain_swap_accept_zero_conf(
289        &self,
290        swap_id: &str,
291        accept_zero_conf: bool,
292    ) -> Result<(), PaymentError> {
293        let mut con: Connection = self.get_connection()?;
294        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
295
296        tx.execute(
297            "UPDATE chain_swaps
298            SET
299                accept_zero_conf = :accept_zero_conf
300            WHERE
301                id = :id",
302            named_params! {
303                ":id": swap_id,
304                ":accept_zero_conf": accept_zero_conf,
305            },
306        )?;
307        self.commit_outgoing(
308            &tx,
309            swap_id,
310            RecordType::Chain,
311            Some(vec!["accept_zero_conf".to_string()]),
312        )?;
313        tx.commit()?;
314        self.trigger_sync();
315
316        Ok(())
317    }
318
319    /// Used for receive chain swaps, when the sender over/underpays
320    pub(crate) fn update_actual_payer_amount(
321        &self,
322        swap_id: &str,
323        actual_payer_amount_sat: u64,
324    ) -> Result<(), PaymentError> {
325        log::info!(
326            "Updating chain swap {swap_id}: actual_payer_amount_sat = {actual_payer_amount_sat}"
327        );
328        let con: Connection = self.get_connection()?;
329        con.execute(
330            "UPDATE chain_swaps 
331            SET actual_payer_amount_sat = :actual_payer_amount_sat
332            WHERE id = :id",
333            named_params! {
334                ":id": swap_id,
335                ":actual_payer_amount_sat": from_u64_to_row(actual_payer_amount_sat)?,
336            },
337        )?;
338
339        Ok(())
340    }
341
342    /// Used for receive chain swaps, when fees are accepted and thus the agreed received amount is known
343    ///
344    /// Can also be used to erase a previously persisted accepted amount in case of failure to accept.
345    pub(crate) fn update_accepted_receiver_amount(
346        &self,
347        swap_id: &str,
348        accepted_receiver_amount_sat: Option<u64>,
349    ) -> Result<(), PaymentError> {
350        log::info!(
351            "Updating chain swap {swap_id}: accepted_receiver_amount_sat = {accepted_receiver_amount_sat:?}"
352        );
353        let mut con: Connection = self.get_connection()?;
354        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
355
356        tx.execute(
357            "UPDATE chain_swaps 
358            SET accepted_receiver_amount_sat = :accepted_receiver_amount_sat
359            WHERE id = :id",
360            named_params! {
361                ":id": swap_id,
362                ":accepted_receiver_amount_sat": from_optional_u64_to_row(&accepted_receiver_amount_sat)?,
363            },
364        )?;
365        self.commit_outgoing(
366            &tx,
367            swap_id,
368            RecordType::Chain,
369            Some(vec!["accepted_receiver_amount_sat".to_string()]),
370        )?;
371        tx.commit()?;
372        self.trigger_sync();
373
374        Ok(())
375    }
376
377    pub(crate) fn set_chain_swap_refund_address(
378        &self,
379        swap_id: &str,
380        refund_address: &str,
381    ) -> Result<(), PaymentError> {
382        let con = self.get_connection()?;
383        con.execute(
384            "UPDATE chain_swaps
385            SET refund_address = :refund_address
386            WHERE id = :id",
387            named_params! {
388                        ":id": swap_id,
389                        ":refund_address": refund_address,
390            },
391        )?;
392        Ok(())
393    }
394
395    pub(crate) fn set_chain_swap_auto_accepted_fees(
396        &self,
397        swap_id: &str,
398    ) -> Result<(), PaymentError> {
399        log::info!("Setting chain swap {swap_id}: auto_accepted_fees to TRUE");
400
401        let mut con = self.get_connection()?;
402        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
403
404        tx.execute(
405            "UPDATE chain_swaps
406            SET auto_accepted_fees = 1
407            WHERE id = :id",
408            named_params! {
409                ":id": swap_id,
410            },
411        )?;
412        self.commit_outgoing(
413            &tx,
414            swap_id,
415            RecordType::Chain,
416            Some(vec!["auto_accepted_fees".to_string()]),
417        )?;
418        tx.commit()?;
419        self.trigger_sync();
420        Ok(())
421    }
422
423    // Only set the Chain Swap claim_tx_id if not set, otherwise return an error
424    pub(crate) fn set_chain_swap_claim(
425        &self,
426        swap_id: &str,
427        claim_address: Option<String>,
428        claim_tx_id: &str,
429    ) -> Result<(), PaymentError> {
430        let con = self.get_connection()?;
431        let row_count = con
432            .execute(
433                "UPDATE chain_swaps
434            SET claim_address = :claim_address, claim_tx_id = :claim_tx_id
435            WHERE id = :id AND claim_tx_id IS NULL",
436                named_params! {
437                            ":id": swap_id,
438                            ":claim_address": claim_address,
439                            ":claim_tx_id": claim_tx_id,
440                },
441            )
442            .map_err(|_| PaymentError::PersistError)?;
443        match row_count {
444            1 => Ok(()),
445            _ => Err(PaymentError::AlreadyClaimed),
446        }
447    }
448
449    // Only unset the Chain Swap claim_tx_id if set with the same tx id
450    pub(crate) fn unset_chain_swap_claim_tx_id(
451        &self,
452        swap_id: &str,
453        claim_tx_id: &str,
454    ) -> Result<(), PaymentError> {
455        let con = self.get_connection()?;
456        con.execute(
457            "UPDATE chain_swaps
458            SET claim_tx_id = NULL
459            WHERE id = :id AND claim_tx_id = :claim_tx_id",
460            named_params! {
461                        ":id": swap_id,
462                        ":claim_tx_id": claim_tx_id,
463            },
464        )
465        .map_err(|_| PaymentError::PersistError)?;
466        Ok(())
467    }
468
469    pub(crate) fn try_handle_chain_swap_update(
470        &self,
471        swap_update: &ChainSwapUpdate,
472    ) -> Result<(), PaymentError> {
473        // Do not overwrite server_lockup_tx_id, user_lockup_tx_id, claim_address, claim_tx_id
474        // Overwrite refund_tx_id if provided (refund tx fee bump)
475        let mut con = self.get_connection()?;
476        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
477
478        tx.execute(
479            "UPDATE chain_swaps
480            SET
481                server_lockup_tx_id = COALESCE(server_lockup_tx_id, :server_lockup_tx_id),
482                user_lockup_tx_id = COALESCE(user_lockup_tx_id, :user_lockup_tx_id),
483                claim_address = COALESCE(claim_address, :claim_address),
484                claim_tx_id = COALESCE(claim_tx_id, :claim_tx_id),
485
486                refund_tx_id = COALESCE(:refund_tx_id, refund_tx_id),
487                state = :state
488            WHERE
489                id = :id",
490            named_params! {
491                ":id": swap_update.swap_id,
492                ":server_lockup_tx_id": swap_update.server_lockup_tx_id,
493                ":user_lockup_tx_id": swap_update.user_lockup_tx_id,
494                ":claim_address": swap_update.claim_address,
495                ":claim_tx_id": swap_update.claim_tx_id,
496                ":refund_tx_id": swap_update.refund_tx_id,
497                ":state": swap_update.to_state,
498            },
499        )?;
500
501        tx.commit()?;
502
503        Ok(())
504    }
505}
506
507#[derive(Clone, Debug, Serialize, Deserialize)]
508pub(crate) struct InternalCreateChainResponse {
509    pub(crate) claim_details: ChainSwapDetails,
510    pub(crate) lockup_details: ChainSwapDetails,
511}
512impl InternalCreateChainResponse {
513    pub(crate) fn try_convert_from_boltz(
514        boltz_create_response: &CreateChainResponse,
515        expected_swap_id: &str,
516    ) -> Result<InternalCreateChainResponse, PaymentError> {
517        // Do not store the CreateResponse fields that are already stored separately
518        // Before skipping them, ensure they match the separately stored ones
519        ensure_sdk!(
520            boltz_create_response.id == expected_swap_id,
521            PaymentError::PersistError
522        );
523
524        let res = InternalCreateChainResponse {
525            claim_details: boltz_create_response.claim_details.clone(),
526            lockup_details: boltz_create_response.lockup_details.clone(),
527        };
528        Ok(res)
529    }
530}
531
#[cfg(test)]
mod tests {
    use crate::model::Direction;
    use crate::test_utils::chain_swap::new_chain_swap;
    use crate::test_utils::persist::create_persister;
    use anyhow::Result;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Writing back a swap that was read before another update touched the row must fail,
    /// while read-modify-write cycles without interleaved updates must succeed.
    #[sdk_macros::async_test_all]
    async fn test_writing_stale_swap() -> Result<()> {
        create_persister!(storage);

        // Initial insert.
        let initial_swap =
            new_chain_swap(Direction::Incoming, None, false, None, false, false, None);
        storage.insert_or_update_chain_swap(&initial_swap)?;

        // Read, modify, write: succeeds when nothing else updated the row in between.
        let mut modified_swap = storage.fetch_chain_swap_by_id(&initial_swap.id)?.unwrap();
        modified_swap.claim_tx_id = Some("tx_id".to_string());
        storage.insert_or_update_chain_swap(&modified_swap)?;

        // Read, write without changing any field: also succeeds.
        let untouched_swap = storage.fetch_chain_swap_by_id(&modified_swap.id)?.unwrap();
        storage.insert_or_update_chain_swap(&untouched_swap)?;

        // Read, modify, write: fails when another update lands in between.
        let mut stale_swap = storage.fetch_chain_swap_by_id(&untouched_swap.id)?.unwrap();
        stale_swap.claim_tx_id = Some("tx_id_2".to_string());
        // The interleaved update that makes `stale_swap` outdated.
        storage.update_chain_swap_accept_zero_conf(&stale_swap.id, true)?;
        let stale_write = storage.insert_or_update_chain_swap(&stale_swap);
        assert!(stale_write.is_err());

        Ok(())
    }
}