breez_sdk_liquid/persist/
send.rs

1use anyhow::{anyhow, Result};
2use boltz_client::swaps::boltz::CreateSubmarineResponse;
3use rusqlite::{named_params, params, Connection, Row};
4use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash};
5use serde::{Deserialize, Serialize};
6
7use crate::error::PaymentError;
8use crate::model::*;
9use crate::persist::{get_where_clause_state_in, Persister};
10use crate::sync::model::data::SendSyncData;
11use crate::sync::model::RecordType;
12use crate::{ensure_sdk, get_updated_fields};
13
14use super::where_clauses_to_string;
15
16impl Persister {
17    pub(crate) fn insert_or_update_send_swap_inner(
18        con: &Connection,
19        send_swap: &SendSwap,
20    ) -> Result<()> {
21        let id_hash = sha256::Hash::hash(send_swap.id.as_bytes()).to_hex();
22        con.execute(
23            "
24            INSERT INTO send_swaps (
25                id,
26                id_hash,
27                invoice,
28                bolt12_offer,
29                payment_hash,
30                destination_pubkey,
31                timeout_block_height,
32                payer_amount_sat,
33                receiver_amount_sat,
34                create_response_json,
35                refund_private_key,
36                created_at,
37                state,
38                pair_fees_json
39            )
40            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
41            ON CONFLICT DO NOTHING
42            ",
43            (
44                &send_swap.id,
45                &id_hash,
46                &send_swap.invoice,
47                &send_swap.bolt12_offer,
48                &send_swap.payment_hash,
49                &send_swap.destination_pubkey,
50                &send_swap.timeout_block_height,
51                &send_swap.payer_amount_sat,
52                &send_swap.receiver_amount_sat,
53                &send_swap.create_response_json,
54                &send_swap.refund_private_key,
55                &send_swap.created_at,
56                &send_swap.state,
57                &send_swap.pair_fees_json,
58            ),
59        )?;
60
61        let rows_affected = con.execute(
62            "UPDATE send_swaps 
63            SET
64                description = :description,
65                preimage = :preimage,
66                lockup_tx_id = :lockup_tx_id,
67                refund_address = :refund_address,
68                refund_tx_id = :refund_tx_id,
69                state = :state
70            WHERE
71                id = :id AND
72                version = :version",
73            named_params! {
74                ":id": &send_swap.id,
75                ":description": &send_swap.description,
76                ":preimage": &send_swap.preimage,
77                ":lockup_tx_id": &send_swap.lockup_tx_id,
78                ":refund_address": &send_swap.refund_address,
79                ":refund_tx_id": &send_swap.refund_tx_id,
80                ":state": &send_swap.state,
81                ":version": &send_swap.metadata.version,
82            },
83        )?;
84        ensure_sdk!(
85            rows_affected > 0,
86            anyhow!("Version mismatch for send swap {}", send_swap.id)
87        );
88
89        Ok(())
90    }
91
92    pub(crate) fn insert_or_update_send_swap(&self, send_swap: &SendSwap) -> Result<()> {
93        let maybe_swap = self.fetch_send_swap_by_id(&send_swap.id)?;
94        let updated_fields = SendSyncData::updated_fields(maybe_swap, send_swap);
95
96        let mut con = self.get_connection()?;
97        let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
98
99        Self::insert_or_update_send_swap_inner(&tx, send_swap)?;
100
101        // Trigger a sync if:
102        // - updated_fields is None (swap is inserted, not updated)
103        // - updated_fields in a non empty list of updated fields
104        let trigger_sync = updated_fields.as_ref().is_none_or(|u| !u.is_empty());
105        match trigger_sync {
106            true => {
107                self.commit_outgoing(&tx, &send_swap.id, RecordType::Send, updated_fields)?;
108                tx.commit()?;
109                self.trigger_sync();
110            }
111            false => {
112                tx.commit()?;
113            }
114        };
115
116        Ok(())
117    }
118
119    pub(crate) fn update_send_swaps_by_state(
120        &self,
121        from_state: PaymentState,
122        to_state: PaymentState,
123        is_local: Option<bool>,
124    ) -> Result<()> {
125        let con = self.get_connection()?;
126        let mut where_clauses = vec!["state = :from_state".to_string()];
127        if let Some(is_local) = is_local {
128            let mut where_is_local = format!("sync_state.is_local = {}", is_local as u8);
129            if is_local {
130                where_is_local = format!("({} OR sync_state.is_local IS NULL)", where_is_local);
131            }
132            where_clauses.push(where_is_local);
133        }
134
135        let where_clause_str = where_clauses_to_string(where_clauses);
136        let query = format!(
137            "
138            UPDATE send_swaps
139            SET state = :to_state
140            WHERE id IN (
141                SELECT id
142                FROM send_swaps AS ss
143                LEFT JOIN sync_state ON ss.id = sync_state.data_id
144                {where_clause_str}
145                ORDER BY created_at
146            )
147            "
148        );
149
150        con.execute(
151            &query,
152            named_params! {
153                ":from_state": from_state,
154                ":to_state": to_state,
155            },
156        )?;
157
158        Ok(())
159    }
160
161    fn list_send_swaps_query(where_clauses: Vec<String>) -> String {
162        let mut where_clause_str = String::new();
163        if !where_clauses.is_empty() {
164            where_clause_str = String::from("WHERE ");
165            where_clause_str.push_str(where_clauses.join(" AND ").as_str());
166        }
167
168        format!(
169            "
170            SELECT
171                id,
172                invoice,
173                bolt12_offer,
174                payment_hash,
175                destination_pubkey,
176                timeout_block_height,
177                description,
178                preimage,
179                payer_amount_sat,
180                receiver_amount_sat,
181                create_response_json,
182                refund_private_key,
183                lockup_tx_id,
184                refund_address,
185                refund_tx_id,
186                created_at,
187                state,
188                pair_fees_json,
189                version,
190                last_updated_at,
191
192                sync_state.is_local
193            FROM send_swaps AS ss
194            LEFT JOIN sync_state ON ss.id = sync_state.data_id
195            {where_clause_str}
196            ORDER BY created_at
197        "
198        )
199    }
200
201    pub(crate) fn fetch_send_swap_by_id(&self, id: &str) -> Result<Option<SendSwap>> {
202        let con: Connection = self.get_connection()?;
203        let query = Self::list_send_swaps_query(vec!["id = ?1 or id_hash = ?1".to_string()]);
204        let res = con.query_row(&query, [id], Self::sql_row_to_send_swap);
205
206        Ok(res.ok())
207    }
208
209    pub(crate) fn fetch_send_swap_by_invoice(&self, invoice: &str) -> Result<Option<SendSwap>> {
210        let con: Connection = self.get_connection()?;
211        let query = Self::list_send_swaps_query(vec!["invoice= ?1".to_string()]);
212        let res = con.query_row(&query, [invoice], Self::sql_row_to_send_swap);
213
214        Ok(res.ok())
215    }
216
217    fn sql_row_to_send_swap(row: &Row) -> rusqlite::Result<SendSwap> {
218        Ok(SendSwap {
219            id: row.get(0)?,
220            invoice: row.get(1)?,
221            bolt12_offer: row.get(2)?,
222            payment_hash: row.get(3)?,
223            destination_pubkey: row.get(4)?,
224            timeout_block_height: row.get(5)?,
225            description: row.get(6)?,
226            preimage: row.get(7)?,
227            payer_amount_sat: row.get(8)?,
228            receiver_amount_sat: row.get(9)?,
229            create_response_json: row.get(10)?,
230            refund_private_key: row.get(11)?,
231            lockup_tx_id: row.get(12)?,
232            refund_address: row.get(13)?,
233            refund_tx_id: row.get(14)?,
234            created_at: row.get(15)?,
235            state: row.get(16)?,
236            pair_fees_json: row.get(17)?,
237            metadata: SwapMetadata {
238                version: row.get(18)?,
239                last_updated_at: row.get(19)?,
240                is_local: row.get::<usize, Option<bool>>(20)?.unwrap_or(true),
241            },
242        })
243    }
244
245    pub(crate) fn list_send_swaps_where(
246        &self,
247        con: &Connection,
248        where_clauses: Vec<String>,
249    ) -> Result<Vec<SendSwap>> {
250        let query = Self::list_send_swaps_query(where_clauses);
251        let ongoing_send = con
252            .prepare(&query)?
253            .query_map(params![], Self::sql_row_to_send_swap)?
254            .map(|i| i.unwrap())
255            .collect();
256        Ok(ongoing_send)
257    }
258
259    pub(crate) fn list_send_swaps_by_state(
260        &self,
261        states: Vec<PaymentState>,
262    ) -> Result<Vec<SendSwap>> {
263        let con = self.get_connection()?;
264        let where_clause = vec![get_where_clause_state_in(&states)];
265        self.list_send_swaps_where(&con, where_clause)
266    }
267
268    pub(crate) fn list_ongoing_send_swaps(&self) -> Result<Vec<SendSwap>> {
269        self.list_send_swaps_by_state(vec![PaymentState::Created, PaymentState::Pending])
270    }
271
272    pub(crate) fn list_pending_send_swaps(&self) -> Result<Vec<SendSwap>> {
273        self.list_send_swaps_by_state(vec![PaymentState::Pending, PaymentState::RefundPending])
274    }
275
276    pub(crate) fn list_recoverable_send_swaps(&self) -> Result<Vec<SendSwap>> {
277        self.list_send_swaps_by_state(vec![
278            PaymentState::Created,
279            PaymentState::Pending,
280            PaymentState::RefundPending,
281        ])
282    }
283
284    pub(crate) fn try_handle_send_swap_update(
285        &self,
286        swap_id: &str,
287        to_state: PaymentState,
288        preimage: Option<&str>,
289        lockup_tx_id: Option<&str>,
290        refund_tx_id: Option<&str>,
291    ) -> Result<(), PaymentError> {
292        // Do not overwrite preimage, lockup_tx_id, refund_tx_id
293        let mut con = self.get_connection()?;
294        let tx = con.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
295
296        tx.execute(
297            "UPDATE send_swaps
298            SET
299                preimage = COALESCE(preimage, :preimage),
300                lockup_tx_id = COALESCE(lockup_tx_id, :lockup_tx_id),
301                refund_tx_id = COALESCE(refund_tx_id, :refund_tx_id),
302                state = :state
303            WHERE
304                id = :id",
305            named_params! {
306                ":id": swap_id,
307                ":preimage": preimage,
308                ":lockup_tx_id": lockup_tx_id,
309                ":refund_tx_id": refund_tx_id,
310                ":state": to_state,
311            },
312        )?;
313
314        let updated_fields = get_updated_fields!(preimage);
315        self.commit_outgoing(&tx, swap_id, RecordType::Send, updated_fields)?;
316        tx.commit()?;
317
318        self.trigger_sync();
319
320        Ok(())
321    }
322
323    pub(crate) fn set_send_swap_refund_address(
324        &self,
325        swap_id: &str,
326        refund_address: &str,
327    ) -> Result<(), PaymentError> {
328        let con = self.get_connection()?;
329        con.execute(
330            "UPDATE send_swaps
331            SET refund_address = :refund_address
332            WHERE id = :id",
333            named_params! {
334                        ":id": swap_id,
335                        ":refund_address": refund_address,
336            },
337        )?;
338        Ok(())
339    }
340
341    pub(crate) fn set_send_swap_lockup_tx_id(
342        &self,
343        swap_id: &str,
344        lockup_tx_id: &str,
345    ) -> Result<(), PaymentError> {
346        let con = self.get_connection()?;
347
348        let row_count = con
349            .execute(
350                "UPDATE send_swaps
351                SET lockup_tx_id = :lockup_tx_id
352                WHERE id = :id AND lockup_tx_id IS NULL",
353                named_params! {
354                    ":id": swap_id,
355                    ":lockup_tx_id": lockup_tx_id,
356                },
357            )
358            .map_err(|_| PaymentError::PersistError)?;
359        match row_count {
360            1 => Ok(()),
361            _ => Err(PaymentError::PaymentInProgress),
362        }
363    }
364
365    pub(crate) fn unset_send_swap_lockup_tx_id(
366        &self,
367        swap_id: &str,
368        lockup_tx_id: &str,
369    ) -> Result<(), PaymentError> {
370        let con = self.get_connection()?;
371        con.execute(
372            "UPDATE send_swaps
373            SET lockup_tx_id = NULL
374            WHERE id = :id AND lockup_tx_id = :lockup_tx_id",
375            named_params! {
376                ":id": swap_id,
377                ":lockup_tx_id": lockup_tx_id,
378            },
379        )
380        .map_err(|_| PaymentError::PersistError)?;
381        Ok(())
382    }
383}
384
385#[derive(Clone, Debug, Serialize, Deserialize)]
386pub(crate) struct InternalCreateSubmarineResponse {
387    pub(crate) accept_zero_conf: bool,
388    pub(crate) address: String,
389    pub(crate) bip21: String,
390    pub(crate) claim_public_key: String,
391    pub(crate) expected_amount: u64,
392    pub(crate) referral_id: Option<String>,
393    pub(crate) swap_tree: InternalSwapTree,
394    #[serde(default)]
395    pub(crate) timeout_block_height: u64,
396    pub(crate) blinding_key: Option<String>,
397}
398impl InternalCreateSubmarineResponse {
399    pub(crate) fn try_convert_from_boltz(
400        boltz_create_response: &CreateSubmarineResponse,
401        expected_swap_id: &str,
402    ) -> Result<InternalCreateSubmarineResponse, PaymentError> {
403        // Do not store the CreateResponse fields that are already stored separately
404        // Before skipping them, ensure they match the separately stored ones
405        ensure_sdk!(
406            boltz_create_response.id == expected_swap_id,
407            PaymentError::PersistError
408        );
409
410        let res = InternalCreateSubmarineResponse {
411            accept_zero_conf: boltz_create_response.accept_zero_conf,
412            address: boltz_create_response.address.clone(),
413            bip21: boltz_create_response.bip21.clone(),
414            claim_public_key: boltz_create_response.claim_public_key.to_string(),
415            expected_amount: boltz_create_response.expected_amount,
416            referral_id: boltz_create_response.referral_id.clone(),
417            swap_tree: boltz_create_response.swap_tree.clone().into(),
418            timeout_block_height: boltz_create_response.timeout_block_height,
419            blinding_key: boltz_create_response.blinding_key.clone(),
420        };
421        Ok(res)
422    }
423}
424
425#[cfg(test)]
426mod tests {
427    use crate::test_utils::persist::{create_persister, new_send_swap};
428    use anyhow::{anyhow, Result};
429
430    use super::PaymentState;
431
432    #[cfg(feature = "browser-tests")]
433    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
434
435    #[sdk_macros::test_all]
436    fn test_fetch_send_swap() -> Result<()> {
437        create_persister!(storage);
438        let send_swap = new_send_swap(None, None);
439
440        storage.insert_or_update_send_swap(&send_swap)?;
441        // Fetch swap by id
442        assert!(storage.fetch_send_swap_by_id(&send_swap.id).is_ok());
443        // Fetch swap by invoice
444        assert!(storage
445            .fetch_send_swap_by_invoice(&send_swap.invoice)
446            .is_ok());
447
448        Ok(())
449    }
450
451    #[sdk_macros::test_all]
452    fn test_list_send_swap() -> Result<()> {
453        create_persister!(storage);
454
455        // List general send swaps
456        let range = 0..3;
457        for _ in range.clone() {
458            storage.insert_or_update_send_swap(&new_send_swap(None, None))?;
459        }
460
461        let con = storage.get_connection()?;
462        let swaps = storage.list_send_swaps_where(&con, vec![])?;
463        assert_eq!(swaps.len(), range.len());
464
465        // List ongoing send swaps
466        storage.insert_or_update_send_swap(&new_send_swap(Some(PaymentState::Pending), None))?;
467        let ongoing_swaps = storage.list_ongoing_send_swaps()?;
468        assert_eq!(ongoing_swaps.len(), 4);
469
470        // List pending send swaps
471        let ongoing_swaps = storage.list_pending_send_swaps()?;
472        assert_eq!(ongoing_swaps.len(), 1);
473
474        Ok(())
475    }
476
477    #[sdk_macros::test_all]
478    fn test_update_send_swap() -> Result<()> {
479        create_persister!(storage);
480
481        let mut send_swap = new_send_swap(None, None);
482        storage.insert_or_update_send_swap(&send_swap)?;
483
484        // Update metadata
485        let new_state = PaymentState::Pending;
486        let preimage = Some("preimage");
487        let lockup_tx_id = Some("lockup_tx_id");
488        let refund_tx_id = Some("refund_tx_id");
489
490        storage.try_handle_send_swap_update(
491            &send_swap.id,
492            new_state,
493            preimage,
494            lockup_tx_id,
495            refund_tx_id,
496        )?;
497
498        let updated_send_swap = storage
499            .fetch_send_swap_by_id(&send_swap.id)?
500            .ok_or(anyhow!("Could not find Send swap in database"))?;
501
502        assert_eq!(new_state, updated_send_swap.state);
503        assert_eq!(preimage, updated_send_swap.preimage.as_deref());
504        assert_eq!(lockup_tx_id, updated_send_swap.lockup_tx_id.as_deref());
505        assert_eq!(refund_tx_id, updated_send_swap.refund_tx_id.as_deref());
506
507        send_swap.state = new_state;
508
509        // Update state (global)
510        let new_state = PaymentState::Complete;
511        storage.update_send_swaps_by_state(send_swap.state, PaymentState::Complete, None)?;
512        let updated_send_swap = storage
513            .fetch_send_swap_by_id(&send_swap.id)?
514            .ok_or(anyhow!("Could not find Send swap in database"))?;
515        assert_eq!(new_state, updated_send_swap.state);
516
517        Ok(())
518    }
519
520    #[sdk_macros::async_test_all]
521    async fn test_writing_stale_swap() -> Result<()> {
522        create_persister!(storage);
523
524        let send_swap = new_send_swap(None, None);
525        storage.insert_or_update_send_swap(&send_swap)?;
526
527        // read - update - write works if there are no updates in between
528        let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
529        send_swap.refund_tx_id = Some("tx_id".to_string());
530        storage.insert_or_update_send_swap(&send_swap)?;
531
532        // read - update - write works if there are no updates in between even if no field changes
533        let send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
534        storage.insert_or_update_send_swap(&send_swap)?;
535
536        // read - update - write fails if there are any updates in between
537        let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap();
538        send_swap.refund_tx_id = Some("tx_id_2".to_string());
539        // Concurrent update
540        storage.set_send_swap_lockup_tx_id(&send_swap.id, "tx_id")?;
541        assert!(storage.insert_or_update_send_swap(&send_swap).is_err());
542
543        Ok(())
544    }
545}