breez_sdk_liquid/
send_swap.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use anyhow::{anyhow, Result};
5use boltz_client::boltz::SubmarineClaimTxResponse;
6use boltz_client::swaps::boltz;
7use boltz_client::swaps::{boltz::CreateSubmarineResponse, boltz::SubSwapStates};
8use futures_util::TryFutureExt;
9use log::{debug, info, warn};
10use lwk_wollet::elements::{LockTime, Transaction};
11use lwk_wollet::hashes::{sha256, Hash};
12use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed};
13use sdk_common::utils::Arc;
14use tokio::sync::broadcast;
15use web_time::{SystemTime, UNIX_EPOCH};
16
17use crate::chain::liquid::LiquidChainService;
18use crate::model::{
19    BlockListener, Config, PaymentState::*, SendSwap, LIQUID_FEE_RATE_MSAT_PER_VBYTE,
20};
21use crate::persist::model::{PaymentTxBalance, PaymentTxDetails};
22use crate::prelude::{PaymentTxData, PaymentType, Swap};
23use crate::recover::recoverer::Recoverer;
24use crate::swapper::Swapper;
25use crate::utils;
26use crate::wallet::OnchainWallet;
27use crate::{
28    error::PaymentError,
29    model::{PaymentState, Transaction as SdkTransaction},
30    persist::Persister,
31};
32
33#[derive(Clone)]
34pub(crate) struct SendSwapHandler {
35    config: Config,
36    onchain_wallet: Arc<dyn OnchainWallet>,
37    persister: std::sync::Arc<Persister>,
38    swapper: Arc<dyn Swapper>,
39    chain_service: Arc<dyn LiquidChainService>,
40    subscription_notifier: broadcast::Sender<String>,
41    recoverer: Arc<Recoverer>,
42}
43
44#[sdk_macros::async_trait]
45impl BlockListener for SendSwapHandler {
46    async fn on_bitcoin_block(&self, _height: u32) {}
47
48    async fn on_liquid_block(&self, _height: u32) {
49        if let Err(err) = self.check_refunds().await {
50            warn!("Could not refund expired swaps, error: {err:?}");
51        }
52    }
53}
54
55impl SendSwapHandler {
56    pub(crate) fn new(
57        config: Config,
58        onchain_wallet: Arc<dyn OnchainWallet>,
59        persister: std::sync::Arc<Persister>,
60        swapper: Arc<dyn Swapper>,
61        chain_service: Arc<dyn LiquidChainService>,
62        recoverer: Arc<Recoverer>,
63    ) -> Self {
64        let (subscription_notifier, _) = broadcast::channel::<String>(30);
65        Self {
66            config,
67            onchain_wallet,
68            persister,
69            swapper,
70            chain_service,
71            subscription_notifier,
72            recoverer,
73        }
74    }
75
76    pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
77        self.subscription_notifier.subscribe()
78    }
79
80    /// Handles status updates from Boltz for Send swaps
81    pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
82        let id = &update.id;
83        let status = &update.status;
84        let swap_state = SubSwapStates::from_str(status)
85            .map_err(|_| anyhow!("Invalid SubSwapState for Send Swap {id}: {status}"))?;
86        let swap = self.fetch_send_swap_by_id(id)?;
87        info!("Handling Send Swap transition to {swap_state:?} for swap {id}");
88
89        // See https://docs.boltz.exchange/v/api/lifecycle#normal-submarine-swaps
90        match swap_state {
91            // Boltz has locked the HTLC
92            SubSwapStates::InvoiceSet => {
93                warn!("Received `invoice.set` state for Send Swap {id}");
94                Ok(())
95            }
96
97            // Boltz has detected the lockup in the mempool. If the swap is not to be batched
98            // we can speed up the claim by doing so cooperatively.
99            SubSwapStates::TransactionClaimPending => {
100                if swap.metadata.is_local {
101                    let preimage = match self.swapper.get_send_claim_tx_details(&swap).await {
102                        Ok(claim_tx_response) => {
103                            match self.cooperate_claim(&swap, claim_tx_response.clone()).await {
104                                Ok(_) => Some(claim_tx_response.preimage),
105                                Err(e) => {
106                                    warn!("Could not cooperate Send Swap {id} claim: {e:?}");
107                                    None
108                                }
109                            }
110                        }
111                        Err(e) => {
112                            warn!("Could not get claim tx details for Send Swap {id}: {e:?}");
113                            None
114                        }
115                    };
116                    let preimage = match preimage {
117                        Some(preimage) => preimage,
118                        None => {
119                            let preimage = self.swapper.get_submarine_preimage(&swap.id).await?;
120                            utils::verify_payment_hash(&preimage, &swap.invoice)?;
121                            info!("Fetched Send Swap {id} preimage cooperatively");
122                            preimage
123                        }
124                    };
125                    self.update_swap_info(&swap.id, Complete, Some(&preimage), None, None)?;
126                }
127
128                Ok(())
129            }
130
131            // Boltz announced they successfully broadcast the (cooperative or non-cooperative) claim tx
132            SubSwapStates::TransactionClaimed => {
133                debug!("Send Swap {id} has been claimed");
134
135                match swap.preimage {
136                    Some(_) => {
137                        debug!("The claim tx was a key path spend (cooperative claim)");
138                        // Preimage was already validated and stored, PaymentSucceeded event emitted,
139                        // when the cooperative claim was handled.
140                    }
141                    None => {
142                        debug!("The claim tx was a script path spend (non-cooperative claim)");
143                        let mut swaps = vec![Swap::Send(swap.clone())];
144                        self.recoverer
145                            .recover_from_onchain(&mut swaps, None)
146                            .await?;
147
148                        let Swap::Send(s) = swaps[0].clone() else {
149                            return Err(anyhow!("Expected a Send swap"));
150                        };
151                        self.update_swap(s)?;
152                    }
153                }
154
155                Ok(())
156            }
157
158            // If swap state is unrecoverable, either:
159            // 1. Boltz failed to pay
160            // 2. The swap has expired (>24h)
161            // 3. Lockup failed (we sent too little funds)
162            // We initiate a cooperative refund, and then fallback to a regular one
163            SubSwapStates::TransactionLockupFailed
164            | SubSwapStates::InvoiceFailedToPay
165            | SubSwapStates::SwapExpired => {
166                match swap.lockup_tx_id {
167                    Some(_) => match swap.refund_tx_id {
168                        Some(refund_tx_id) => warn!(
169                        "Refund tx for Send Swap {id} was already broadcast: txid {refund_tx_id}"
170                    ),
171                        None => {
172                            warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has been broadcast.");
173                            let refund_tx_id = match self.refund(&swap, true).await {
174                                Ok(refund_tx_id) => Some(refund_tx_id),
175                                Err(e) => {
176                                    warn!("Could not refund Send swap {id} cooperatively: {e:?}");
177                                    None
178                                }
179                            };
180                            // Set the payment state to `RefundPending`. This ensures that the
181                            // background thread will pick it up and try to refund it
182                            // periodically
183                            self.update_swap_info(
184                                &swap.id,
185                                RefundPending,
186                                None,
187                                None,
188                                refund_tx_id.as_deref(),
189                            )?;
190                        }
191                    },
192                    // Do not attempt broadcasting a refund if lockup tx was never sent and swap is
193                    // unrecoverable. We resolve the payment as failed.
194                    None => {
195                        warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has never been broadcast. Resolving payment as failed.");
196                        self.update_swap_info(id, Failed, None, None, None)?;
197                    }
198                }
199
200                Ok(())
201            }
202
203            _ => {
204                debug!("Unhandled state for Send Swap {id}: {swap_state:?}");
205                Ok(())
206            }
207        }
208    }
209
210    pub(crate) async fn try_lockup(
211        &self,
212        swap: &SendSwap,
213        create_response: &CreateSubmarineResponse,
214    ) -> Result<Transaction, PaymentError> {
215        if swap.lockup_tx_id.is_some() {
216            debug!("Lockup tx was already broadcast for Send Swap {}", swap.id);
217            return Err(PaymentError::PaymentInProgress);
218        }
219
220        let swap_id = &swap.id;
221        debug!(
222            "Initiated Send Swap: send {} sats to liquid address {}",
223            create_response.expected_amount, create_response.address
224        );
225
226        let lockup_tx = self
227            .onchain_wallet
228            .build_tx_or_drain_tx(
229                Some(LIQUID_FEE_RATE_MSAT_PER_VBYTE),
230                &create_response.address,
231                &self.config.lbtc_asset_id(),
232                create_response.expected_amount,
233            )
234            .await?;
235        let lockup_tx_id = lockup_tx.txid().to_string();
236
237        self.persister
238            .set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
239
240        info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",);
241
242        let broadcast_result = self.chain_service.broadcast(&lockup_tx).await;
243
244        if let Err(err) = broadcast_result {
245            debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}");
246            self.persister
247                .unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
248            return Err(err.into());
249        }
250
251        info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
252
253        // We insert a pseudo-lockup-tx in case LWK fails to pick up the new mempool tx for a while
254        // This makes the tx known to the SDK (get_info, list_payments) instantly
255        let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum();
256        self.persister.insert_or_update_payment(
257            PaymentTxData {
258                tx_id: lockup_tx_id.clone(),
259                timestamp: Some(utils::now()),
260                fees_sat: lockup_tx_fees_sat,
261                is_confirmed: false,
262                unblinding_data: None,
263            },
264            &[PaymentTxBalance {
265                asset_id: self.config.lbtc_asset_id(),
266                amount: create_response.expected_amount,
267                payment_type: PaymentType::Send,
268            }],
269            None,
270            false,
271        )?;
272
273        self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None)?;
274
275        Ok(lockup_tx)
276    }
277
278    fn fetch_send_swap_by_id(&self, swap_id: &str) -> Result<SendSwap, PaymentError> {
279        self.persister
280            .fetch_send_swap_by_id(swap_id)
281            .map_err(|_| PaymentError::PersistError)?
282            .ok_or(PaymentError::Generic {
283                err: format!("Send Swap not found {swap_id}"),
284            })
285    }
286
287    // Updates the swap without state transition validation
288    pub(crate) fn update_swap(&self, updated_swap: SendSwap) -> Result<(), PaymentError> {
289        let swap = self.fetch_send_swap_by_id(&updated_swap.id)?;
290        let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
291        if updated_swap != swap || lnurl_info_updated {
292            info!(
293                "Updating Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})",
294                updated_swap.id,
295                updated_swap.state,
296                updated_swap.lockup_tx_id,
297                updated_swap.refund_tx_id
298            );
299            self.persister.insert_or_update_send_swap(&updated_swap)?;
300            let _ = self.subscription_notifier.send(updated_swap.id);
301        }
302        Ok(())
303    }
304
305    pub(crate) fn update_swap_lnurl_info(
306        &self,
307        swap: &SendSwap,
308        updated_swap: &SendSwap,
309    ) -> Result<bool> {
310        if swap.preimage.is_none() {
311            let Some(tx_id) = updated_swap.lockup_tx_id.clone() else {
312                return Ok(false);
313            };
314            let Some(ref preimage_str) = updated_swap.preimage.clone() else {
315                return Ok(false);
316            };
317            if let Some(PaymentTxDetails {
318                destination,
319                description,
320                lnurl_info: Some(mut lnurl_info),
321                bip353_address,
322                ..
323            }) = self.persister.get_payment_details(&tx_id)?
324            {
325                if let Some(SuccessAction::Aes { data }) =
326                    lnurl_info.lnurl_pay_unprocessed_success_action.clone()
327                {
328                    debug!(
329                        "Decrypting AES success action with preimage for Send Swap {}",
330                        swap.id
331                    );
332                    let preimage = sha256::Hash::from_str(preimage_str)?;
333                    let preimage_arr = preimage.to_byte_array();
334                    let result = match (data, &preimage_arr).try_into() {
335                        Ok(data) => AesSuccessActionDataResult::Decrypted { data },
336                        Err(e) => AesSuccessActionDataResult::ErrorStatus {
337                            reason: e.to_string(),
338                        },
339                    };
340                    lnurl_info.lnurl_pay_success_action =
341                        Some(SuccessActionProcessed::Aes { result });
342                    self.persister
343                        .insert_or_update_payment_details(PaymentTxDetails {
344                            tx_id,
345                            destination,
346                            description,
347                            lnurl_info: Some(lnurl_info),
348                            bip353_address,
349                            ..Default::default()
350                        })?;
351                    return Ok(true);
352                }
353            }
354        }
355        Ok(false)
356    }
357
358    // Updates the swap state with validation
359    pub(crate) fn update_swap_info(
360        &self,
361        swap_id: &str,
362        to_state: PaymentState,
363        preimage: Option<&str>,
364        lockup_tx_id: Option<&str>,
365        refund_tx_id: Option<&str>,
366    ) -> Result<(), PaymentError> {
367        info!(
368            "Transitioning Send swap {swap_id} to {to_state:?} (lockup_tx_id = {lockup_tx_id:?}, refund_tx_id = {refund_tx_id:?})"
369        );
370        let swap = self.fetch_send_swap_by_id(swap_id)?;
371        Self::validate_state_transition(swap.state, to_state)?;
372        self.persister.try_handle_send_swap_update(
373            swap_id,
374            to_state,
375            preimage,
376            lockup_tx_id,
377            refund_tx_id,
378        )?;
379        let updated_swap = self.fetch_send_swap_by_id(swap_id)?;
380        let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
381        if updated_swap != swap || lnurl_info_updated {
382            let _ = self.subscription_notifier.send(updated_swap.id);
383        }
384        Ok(())
385    }
386
387    async fn cooperate_claim(
388        &self,
389        send_swap: &SendSwap,
390        claim_tx_response: SubmarineClaimTxResponse,
391    ) -> Result<(), PaymentError> {
392        debug!(
393            "Claim is pending for Send Swap {}. Initiating cooperative claim",
394            &send_swap.id
395        );
396        let refund_address = match send_swap.refund_address {
397            Some(ref refund_address) => refund_address.clone(),
398            None => {
399                // If no refund address is set, we get an unused one
400                let address = self.onchain_wallet.next_unused_address().await?.to_string();
401                self.persister
402                    .set_send_swap_refund_address(&send_swap.id, &address)?;
403                address
404            }
405        };
406
407        self.swapper
408            .claim_send_swap_cooperative(send_swap, claim_tx_response, &refund_address)
409            .await?;
410        Ok(())
411    }
412
413    pub(crate) async fn refund(
414        &self,
415        swap: &SendSwap,
416        is_cooperative: bool,
417    ) -> Result<String, PaymentError> {
418        info!(
419            "Initiating refund for Send Swap {}, is_cooperative: {is_cooperative}",
420            swap.id
421        );
422
423        let swap_script = swap.get_swap_script()?;
424        let refund_address = match swap.refund_address {
425            Some(ref refund_address) => refund_address.clone(),
426            None => {
427                // If no refund address is set, we get an unused one
428                let address = self.onchain_wallet.next_unused_address().await?.to_string();
429                self.persister
430                    .set_send_swap_refund_address(&swap.id, &address)?;
431                address
432            }
433        };
434
435        let script_pk = swap_script
436            .to_address(self.config.network.into())
437            .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
438            .to_unconfidential()
439            .script_pubkey();
440        let utxos = self.chain_service.get_script_utxos(&script_pk).await?;
441        let SdkTransaction::Liquid(refund_tx) = self
442            .swapper
443            .create_refund_tx(
444                Swap::Send(swap.clone()),
445                &refund_address,
446                utxos,
447                None,
448                is_cooperative,
449            )
450            .await?
451        else {
452            return Err(PaymentError::Generic {
453                err: format!(
454                    "Unexpected refund tx type returned for Send swap {}",
455                    swap.id
456                ),
457            });
458        };
459        let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string();
460
461        info!(
462            "Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}",
463            swap.id
464        );
465
466        Ok(refund_tx_id)
467    }
468
469    async fn check_swap_expiry(&self, swap: &SendSwap) -> Result<bool> {
470        let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64);
471        let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?;
472        if duration_since_creation_time.as_secs() < 60 * 10 {
473            return Ok(false);
474        }
475
476        let swap_script = swap.get_swap_script()?;
477        let current_height = self.onchain_wallet.tip().await;
478        let locktime_from_height = LockTime::from_height(current_height)?;
479
480        info!("Checking Send Swap {} expiration: locktime_from_height = {locktime_from_height:?},  swap_script.locktime = {:?}", swap.id, swap_script.locktime);
481        Ok(utils::is_locktime_expired(
482            locktime_from_height,
483            swap_script.locktime,
484        ))
485    }
486
487    // Attempts both cooperative and non-cooperative refunds, and updates the swap info accordingly
488    pub(crate) async fn try_refund_all(&self, swaps: &[SendSwap]) {
489        for swap in swaps {
490            if swap.refund_tx_id.is_some() {
491                continue;
492            }
493
494            let has_swap_expired = self.check_swap_expiry(swap).await.unwrap_or(false);
495
496            if !has_swap_expired && swap.state == Pending {
497                continue;
498            }
499
500            let refund_tx_id_result = match swap.state {
501                Pending => self.refund(swap, false).await,
502                RefundPending => match has_swap_expired {
503                    true => {
504                        self.refund(swap, true)
505                            .or_else(|e| {
506                                warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
507                                self.refund(swap, false)
508                            })
509                            .await
510                    }
511                    false => self.refund(swap, true).await,
512                },
513                _ => {
514                    continue;
515                }
516            };
517
518            if let Ok(refund_tx_id) = refund_tx_id_result {
519                let update_swap_info_result =
520                    self.update_swap_info(&swap.id, RefundPending, None, None, Some(&refund_tx_id));
521                if let Err(err) = update_swap_info_result {
522                    warn!(
523                        "Could not update Send swap {} information, error: {err:?}",
524                        swap.id
525                    );
526                };
527            }
528        }
529    }
530
531    // Attempts refunding all payments whose state is `RefundPending` and with no
532    // refund_tx_id field present
533    pub(crate) async fn check_refunds(&self) -> Result<(), PaymentError> {
534        let pending_swaps = self.persister.list_pending_send_swaps()?;
535        self.try_refund_all(&pending_swaps).await;
536        Ok(())
537    }
538
539    fn validate_state_transition(
540        from_state: PaymentState,
541        to_state: PaymentState,
542    ) -> Result<(), PaymentError> {
543        match (from_state, to_state) {
544            (TimedOut, Created) => Ok(()),
545            (_, Created) => Err(PaymentError::Generic {
546                err: format!("Cannot transition from {from_state:?} to Created state"),
547            }),
548
549            (Created | Pending, Pending) => Ok(()),
550            (_, Pending) => Err(PaymentError::Generic {
551                err: format!("Cannot transition from {from_state:?} to Pending state"),
552            }),
553
554            (Created | Pending, Complete) => Ok(()),
555            (_, Complete) => Err(PaymentError::Generic {
556                err: format!("Cannot transition from {from_state:?} to Complete state"),
557            }),
558
559            (Created | TimedOut, TimedOut) => Ok(()),
560            (_, TimedOut) => Err(PaymentError::Generic {
561                err: format!("Cannot transition from {from_state:?} to TimedOut state"),
562            }),
563
564            (_, Refundable) => Err(PaymentError::Generic {
565                err: format!("Cannot transition from {from_state:?} to Refundable state"),
566            }),
567
568            (Pending, RefundPending) => Ok(()),
569            (_, RefundPending) => Err(PaymentError::Generic {
570                err: format!("Cannot transition from {from_state:?} to RefundPending state"),
571            }),
572
573            (Complete, Failed) => Err(PaymentError::Generic {
574                err: format!("Cannot transition from {from_state:?} to Failed state"),
575            }),
576            (_, Failed) => Ok(()),
577
578            (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
579                err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
580            }),
581        }
582    }
583}
584
585#[cfg(test)]
586mod tests {
587    use std::collections::{HashMap, HashSet};
588
589    use anyhow::Result;
590
591    use crate::{
592        model::PaymentState::{self, *},
593        test_utils::{
594            persist::{create_persister, new_send_swap},
595            send_swap::new_send_swap_handler,
596        },
597    };
598
599    #[cfg(feature = "browser-tests")]
600    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
601
602    #[sdk_macros::async_test_all]
603    async fn test_send_swap_state_transitions() -> Result<()> {
604        create_persister!(storage);
605        let send_swap_handler = new_send_swap_handler(storage.clone())?;
606
607        // Test valid combinations of states
608        let valid_combinations = HashMap::from([
609            (
610                Created,
611                HashSet::from([Pending, Complete, TimedOut, Failed]),
612            ),
613            (
614                Pending,
615                HashSet::from([Pending, RefundPending, Complete, Failed]),
616            ),
617            (TimedOut, HashSet::from([TimedOut, Created, Failed])),
618            (Complete, HashSet::from([])),
619            (Refundable, HashSet::from([Failed])),
620            (Failed, HashSet::from([Failed])),
621        ]);
622
623        for (first_state, allowed_states) in valid_combinations.iter() {
624            for allowed_state in allowed_states {
625                let send_swap = new_send_swap(Some(*first_state), None);
626                storage.insert_or_update_send_swap(&send_swap)?;
627
628                assert!(send_swap_handler
629                    .update_swap_info(&send_swap.id, *allowed_state, None, None, None)
630                    .is_ok());
631            }
632        }
633
634        // Test invalid combinations of states
635        let all_states = HashSet::from([Created, Pending, Complete, TimedOut, Failed]);
636        let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
637            .iter()
638            .map(|(first_state, allowed_states)| {
639                (
640                    *first_state,
641                    all_states.difference(allowed_states).cloned().collect(),
642                )
643            })
644            .collect();
645
646        for (first_state, disallowed_states) in invalid_combinations.iter() {
647            for disallowed_state in disallowed_states {
648                let send_swap = new_send_swap(Some(*first_state), None);
649                storage.insert_or_update_send_swap(&send_swap)?;
650
651                assert!(send_swap_handler
652                    .update_swap_info(&send_swap.id, *disallowed_state, None, None, None)
653                    .is_err());
654            }
655        }
656
657        Ok(())
658    }
659}