breez_sdk_liquid/
send_swap.rs

1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, Result};
6use boltz_client::boltz::SubmarineClaimTxResponse;
7use boltz_client::swaps::boltz;
8use boltz_client::swaps::{boltz::CreateSubmarineResponse, boltz::SubSwapStates};
9use futures_util::TryFutureExt;
10use log::{debug, error, info, warn};
11use lwk_wollet::elements::{LockTime, Transaction};
12use lwk_wollet::hashes::{sha256, Hash};
13use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed};
14use tokio::sync::broadcast;
15use web_time::{SystemTime, UNIX_EPOCH};
16
17use crate::chain::liquid::LiquidChainService;
18use crate::model::{
19    BlockListener, Config, PaymentState::*, SendSwap, LIQUID_FEE_RATE_MSAT_PER_VBYTE,
20};
21use crate::persist::model::{PaymentTxBalance, PaymentTxDetails};
22use crate::prelude::{PaymentTxData, PaymentType, Swap};
23use crate::recover::recoverer::Recoverer;
24use crate::swapper::Swapper;
25use crate::utils;
26use crate::wallet::OnchainWallet;
27use crate::{
28    error::PaymentError,
29    model::{PaymentState, Transaction as SdkTransaction},
30    persist::Persister,
31};
32
33#[derive(Clone)]
34pub(crate) struct SendSwapHandler {
35    config: Config,
36    onchain_wallet: Arc<dyn OnchainWallet>,
37    persister: std::sync::Arc<Persister>,
38    swapper: Arc<dyn Swapper>,
39    chain_service: Arc<dyn LiquidChainService>,
40    subscription_notifier: broadcast::Sender<String>,
41    recoverer: Arc<Recoverer>,
42}
43
44#[sdk_macros::async_trait]
45impl BlockListener for SendSwapHandler {
46    async fn on_bitcoin_block(&self, _height: u32) {}
47
48    async fn on_liquid_block(&self, _height: u32) {
49        if let Err(err) = self.check_refunds().await {
50            warn!("Could not refund expired swaps, error: {err:?}");
51        }
52    }
53}
54
55impl SendSwapHandler {
56    pub(crate) fn new(
57        config: Config,
58        onchain_wallet: Arc<dyn OnchainWallet>,
59        persister: std::sync::Arc<Persister>,
60        swapper: Arc<dyn Swapper>,
61        chain_service: Arc<dyn LiquidChainService>,
62        recoverer: Arc<Recoverer>,
63    ) -> Self {
64        let (subscription_notifier, _) = broadcast::channel::<String>(30);
65        Self {
66            config,
67            onchain_wallet,
68            persister,
69            swapper,
70            chain_service,
71            subscription_notifier,
72            recoverer,
73        }
74    }
75
76    pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
77        self.subscription_notifier.subscribe()
78    }
79
80    /// Handles status updates from Boltz for Send swaps
81    pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
82        let id = &update.id;
83        let status = &update.status;
84        let swap_state = SubSwapStates::from_str(status)
85            .map_err(|_| anyhow!("Invalid SubSwapState for Send Swap {id}: {status}"))?;
86        let swap = self.fetch_send_swap_by_id(id)?;
87        info!("Handling Send Swap transition to {swap_state:?} for swap {id}");
88
89        // See https://docs.boltz.exchange/v/api/lifecycle#normal-submarine-swaps
90        match swap_state {
91            // Boltz has locked the HTLC
92            SubSwapStates::InvoiceSet => {
93                warn!("Received `invoice.set` state for Send Swap {id}");
94                Ok(())
95            }
96
97            // Boltz has detected the lockup in the mempool. If the swap is not to be batched
98            // we can speed up the claim by doing so cooperatively.
99            SubSwapStates::TransactionClaimPending => {
100                if swap.metadata.is_local {
101                    let preimage = match self.swapper.get_send_claim_tx_details(&swap).await {
102                        Ok(claim_tx_response) => {
103                            match self.cooperate_claim(&swap, claim_tx_response.clone()).await {
104                                Ok(_) => Some(claim_tx_response.preimage),
105                                Err(e) => {
106                                    warn!("Could not cooperate Send Swap {id} claim: {e:?}");
107                                    None
108                                }
109                            }
110                        }
111                        Err(e) => {
112                            warn!("Could not get claim tx details for Send Swap {id}: {e:?}");
113                            None
114                        }
115                    };
116                    let preimage = match preimage {
117                        Some(preimage) => preimage,
118                        None => {
119                            let preimage = self.swapper.get_submarine_preimage(&swap.id).await?;
120                            utils::verify_payment_hash(&preimage, &swap.invoice)?;
121                            info!("Fetched Send Swap {id} preimage cooperatively");
122                            preimage
123                        }
124                    };
125                    self.update_swap_info(&swap.id, Complete, Some(&preimage), None, None)?;
126                }
127
128                Ok(())
129            }
130
131            // Boltz announced they successfully broadcast the (cooperative or non-cooperative) claim tx
132            SubSwapStates::TransactionClaimed => {
133                debug!("Send Swap {id} has been claimed");
134
135                match swap.preimage {
136                    Some(_) => {
137                        debug!("The claim tx was a key path spend (cooperative claim)");
138                        // Preimage was already validated and stored, PaymentSucceeded event emitted,
139                        // when the cooperative claim was handled.
140                    }
141                    None => {
142                        debug!("The claim tx was a script path spend (non-cooperative claim)");
143                        let mut swaps = vec![Swap::Send(swap.clone())];
144                        self.recoverer
145                            .recover_from_onchain(&mut swaps, None)
146                            .await?;
147
148                        let Swap::Send(s) = swaps[0].clone() else {
149                            return Err(anyhow!("Expected a Send swap"));
150                        };
151                        self.update_swap(s)?;
152                    }
153                }
154
155                Ok(())
156            }
157
158            // If swap state is unrecoverable, either:
159            // 1. Boltz failed to pay
160            // 2. The swap has expired (>24h)
161            // 3. Lockup failed (we sent too little funds)
162            // We initiate a cooperative refund, and then fallback to a regular one
163            SubSwapStates::TransactionLockupFailed
164            | SubSwapStates::InvoiceFailedToPay
165            | SubSwapStates::SwapExpired => {
166                match swap.lockup_tx_id {
167                    Some(_) => match swap.refund_tx_id {
168                        Some(refund_tx_id) => warn!(
169                        "Refund tx for Send Swap {id} was already broadcast: txid {refund_tx_id}"
170                    ),
171                        None => {
172                            warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has been broadcast.");
173                            let refund_tx_id = match self.refund(&swap, true).await {
174                                Ok(refund_tx_id) => Some(refund_tx_id),
175                                Err(e) => {
176                                    warn!("Could not refund Send swap {id} cooperatively: {e:?}");
177                                    None
178                                }
179                            };
180                            // Set the payment state to `RefundPending`. This ensures that the
181                            // background thread will pick it up and try to refund it
182                            // periodically
183                            self.update_swap_info(
184                                &swap.id,
185                                RefundPending,
186                                None,
187                                None,
188                                refund_tx_id.as_deref(),
189                            )?;
190                        }
191                    },
192                    // Do not attempt broadcasting a refund if lockup tx was never sent and swap is
193                    // unrecoverable. We resolve the payment as failed.
194                    None => {
195                        warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has never been broadcast. Resolving payment as failed.");
196                        self.update_swap_info(id, Failed, None, None, None)?;
197                    }
198                }
199
200                Ok(())
201            }
202
203            _ => {
204                debug!("Unhandled state for Send Swap {id}: {swap_state:?}");
205                Ok(())
206            }
207        }
208    }
209
210    pub(crate) async fn try_lockup(
211        &self,
212        swap: &SendSwap,
213        create_response: &CreateSubmarineResponse,
214    ) -> Result<Transaction, PaymentError> {
215        if swap.lockup_tx_id.is_some() {
216            debug!("Lockup tx was already broadcast for Send Swap {}", swap.id);
217            return Err(PaymentError::PaymentInProgress);
218        }
219
220        let swap_id = &swap.id;
221        debug!(
222            "Initiated Send Swap: send {} sats to liquid address {}",
223            create_response.expected_amount, create_response.address
224        );
225
226        let lockup_tx = self
227            .onchain_wallet
228            .build_tx_or_drain_tx(
229                Some(LIQUID_FEE_RATE_MSAT_PER_VBYTE),
230                &create_response.address,
231                &self.config.lbtc_asset_id(),
232                create_response.expected_amount,
233            )
234            .await?;
235        let lockup_tx_id = lockup_tx.txid().to_string();
236
237        self.persister
238            .set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
239
240        info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",);
241
242        let broadcast_result = self.chain_service.broadcast(&lockup_tx).await;
243
244        if let Err(err) = broadcast_result {
245            debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}");
246            self.persister
247                .unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
248            return Err(err.into());
249        }
250
251        info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
252
253        // We insert a pseudo-lockup-tx in case LWK fails to pick up the new mempool tx for a while
254        // This makes the tx known to the SDK (get_info, list_payments) instantly
255        let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum();
256        self.persister.insert_or_update_payment(
257            PaymentTxData {
258                tx_id: lockup_tx_id.clone(),
259                timestamp: Some(utils::now()),
260                fees_sat: lockup_tx_fees_sat,
261                is_confirmed: false,
262                unblinding_data: None,
263            },
264            &[PaymentTxBalance {
265                asset_id: self.config.lbtc_asset_id(),
266                amount: create_response.expected_amount,
267                payment_type: PaymentType::Send,
268            }],
269            None,
270            false,
271        )?;
272
273        self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None)?;
274
275        Ok(lockup_tx)
276    }
277
278    fn fetch_send_swap_by_id(&self, swap_id: &str) -> Result<SendSwap, PaymentError> {
279        self.persister
280            .fetch_send_swap_by_id(swap_id)
281            .map_err(|e| {
282                error!("Failed to fetch send swap by id: {e:?}");
283                PaymentError::PersistError
284            })?
285            .ok_or(PaymentError::Generic {
286                err: format!("Send Swap not found {swap_id}"),
287            })
288    }
289
290    // Updates the swap without state transition validation
291    pub(crate) fn update_swap(&self, updated_swap: SendSwap) -> Result<(), PaymentError> {
292        let swap = self.fetch_send_swap_by_id(&updated_swap.id)?;
293        let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
294        if updated_swap != swap || lnurl_info_updated {
295            info!(
296                "Updating Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})",
297                updated_swap.id,
298                updated_swap.state,
299                updated_swap.lockup_tx_id,
300                updated_swap.refund_tx_id
301            );
302            self.persister.insert_or_update_send_swap(&updated_swap)?;
303            let _ = self.subscription_notifier.send(updated_swap.id);
304        }
305        Ok(())
306    }
307
308    pub(crate) fn update_swap_lnurl_info(
309        &self,
310        swap: &SendSwap,
311        updated_swap: &SendSwap,
312    ) -> Result<bool> {
313        if swap.preimage.is_none() {
314            let Some(tx_id) = updated_swap.lockup_tx_id.clone() else {
315                return Ok(false);
316            };
317            let Some(ref preimage_str) = updated_swap.preimage.clone() else {
318                return Ok(false);
319            };
320            if let Some(PaymentTxDetails {
321                destination,
322                description,
323                lnurl_info: Some(mut lnurl_info),
324                bip353_address,
325                ..
326            }) = self.persister.get_payment_details(&tx_id)?
327            {
328                if let Some(SuccessAction::Aes { data }) =
329                    lnurl_info.lnurl_pay_unprocessed_success_action.clone()
330                {
331                    debug!(
332                        "Decrypting AES success action with preimage for Send Swap {}",
333                        swap.id
334                    );
335                    let preimage = sha256::Hash::from_str(preimage_str)?;
336                    let preimage_arr = preimage.to_byte_array();
337                    let result = match (data, &preimage_arr).try_into() {
338                        Ok(data) => AesSuccessActionDataResult::Decrypted { data },
339                        Err(e) => AesSuccessActionDataResult::ErrorStatus {
340                            reason: e.to_string(),
341                        },
342                    };
343                    lnurl_info.lnurl_pay_success_action =
344                        Some(SuccessActionProcessed::Aes { result });
345                    self.persister
346                        .insert_or_update_payment_details(PaymentTxDetails {
347                            tx_id,
348                            destination,
349                            description,
350                            lnurl_info: Some(lnurl_info),
351                            bip353_address,
352                            ..Default::default()
353                        })?;
354                    return Ok(true);
355                }
356            }
357        }
358        Ok(false)
359    }
360
361    // Updates the swap state with validation
362    pub(crate) fn update_swap_info(
363        &self,
364        swap_id: &str,
365        to_state: PaymentState,
366        preimage: Option<&str>,
367        lockup_tx_id: Option<&str>,
368        refund_tx_id: Option<&str>,
369    ) -> Result<(), PaymentError> {
370        info!(
371            "Transitioning Send swap {swap_id} to {to_state:?} (lockup_tx_id = {lockup_tx_id:?}, refund_tx_id = {refund_tx_id:?})"
372        );
373        let swap = self.fetch_send_swap_by_id(swap_id)?;
374        Self::validate_state_transition(swap.state, to_state)?;
375        self.persister.try_handle_send_swap_update(
376            swap_id,
377            to_state,
378            preimage,
379            lockup_tx_id,
380            refund_tx_id,
381        )?;
382        let updated_swap = self.fetch_send_swap_by_id(swap_id)?;
383        let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
384        if updated_swap != swap || lnurl_info_updated {
385            let _ = self.subscription_notifier.send(updated_swap.id);
386        }
387        Ok(())
388    }
389
390    async fn cooperate_claim(
391        &self,
392        send_swap: &SendSwap,
393        claim_tx_response: SubmarineClaimTxResponse,
394    ) -> Result<(), PaymentError> {
395        debug!(
396            "Claim is pending for Send Swap {}. Initiating cooperative claim",
397            &send_swap.id
398        );
399        let refund_address = match send_swap.refund_address {
400            Some(ref refund_address) => refund_address.clone(),
401            None => {
402                // If no refund address is set, we get an unused one
403                let address = self.onchain_wallet.next_unused_address().await?.to_string();
404                self.persister
405                    .set_send_swap_refund_address(&send_swap.id, &address)?;
406                address
407            }
408        };
409
410        self.swapper
411            .claim_send_swap_cooperative(send_swap, claim_tx_response, &refund_address)
412            .await?;
413        Ok(())
414    }
415
416    pub(crate) async fn refund(
417        &self,
418        swap: &SendSwap,
419        is_cooperative: bool,
420    ) -> Result<String, PaymentError> {
421        info!(
422            "Initiating refund for Send Swap {}, is_cooperative: {is_cooperative}",
423            swap.id
424        );
425
426        let swap_script = swap.get_swap_script()?;
427        let refund_address = match swap.refund_address {
428            Some(ref refund_address) => refund_address.clone(),
429            None => {
430                // If no refund address is set, we get an unused one
431                let address = self.onchain_wallet.next_unused_address().await?.to_string();
432                self.persister
433                    .set_send_swap_refund_address(&swap.id, &address)?;
434                address
435            }
436        };
437
438        let script_pk = swap_script
439            .to_address(self.config.network.into())
440            .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
441            .to_unconfidential()
442            .script_pubkey();
443        let utxos = self.chain_service.get_script_utxos(&script_pk).await?;
444        let SdkTransaction::Liquid(refund_tx) = self
445            .swapper
446            .create_refund_tx(
447                Swap::Send(swap.clone()),
448                &refund_address,
449                utxos,
450                None,
451                is_cooperative,
452            )
453            .await?
454        else {
455            return Err(PaymentError::Generic {
456                err: format!(
457                    "Unexpected refund tx type returned for Send swap {}",
458                    swap.id
459                ),
460            });
461        };
462        let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string();
463
464        info!(
465            "Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}",
466            swap.id
467        );
468
469        Ok(refund_tx_id)
470    }
471
472    async fn check_swap_expiry(&self, swap: &SendSwap) -> Result<bool> {
473        let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64);
474        let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?;
475        if duration_since_creation_time.as_secs() < 60 * 10 {
476            return Ok(false);
477        }
478
479        let swap_script = swap.get_swap_script()?;
480        let current_height = self.onchain_wallet.tip().await;
481        let locktime_from_height = LockTime::from_height(current_height)?;
482
483        info!("Checking Send Swap {} expiration: locktime_from_height = {locktime_from_height:?},  swap_script.locktime = {:?}", swap.id, swap_script.locktime);
484        Ok(utils::is_locktime_expired(
485            locktime_from_height,
486            swap_script.locktime,
487        ))
488    }
489
490    // Attempts both cooperative and non-cooperative refunds, and updates the swap info accordingly
491    pub(crate) async fn try_refund_all(&self, swaps: &[SendSwap]) {
492        for swap in swaps {
493            if swap.refund_tx_id.is_some() {
494                continue;
495            }
496
497            let has_swap_expired = self.check_swap_expiry(swap).await.unwrap_or(false);
498
499            if !has_swap_expired && swap.state == Pending {
500                continue;
501            }
502
503            let refund_tx_id_result = match swap.state {
504                Pending => self.refund(swap, false).await,
505                RefundPending => match has_swap_expired {
506                    true => {
507                        self.refund(swap, true)
508                            .or_else(|e| {
509                                warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
510                                self.refund(swap, false)
511                            })
512                            .await
513                    }
514                    false => self.refund(swap, true).await,
515                },
516                _ => {
517                    continue;
518                }
519            };
520
521            if let Ok(refund_tx_id) = refund_tx_id_result {
522                let update_swap_info_result =
523                    self.update_swap_info(&swap.id, RefundPending, None, None, Some(&refund_tx_id));
524                if let Err(err) = update_swap_info_result {
525                    warn!(
526                        "Could not update Send swap {} information, error: {err:?}",
527                        swap.id
528                    );
529                };
530            }
531        }
532    }
533
534    // Attempts refunding all payments whose state is `RefundPending` and with no
535    // refund_tx_id field present
536    pub(crate) async fn check_refunds(&self) -> Result<(), PaymentError> {
537        let pending_swaps = self.persister.list_pending_send_swaps()?;
538        self.try_refund_all(&pending_swaps).await;
539        Ok(())
540    }
541
542    fn validate_state_transition(
543        from_state: PaymentState,
544        to_state: PaymentState,
545    ) -> Result<(), PaymentError> {
546        match (from_state, to_state) {
547            (TimedOut, Created) => Ok(()),
548            (_, Created) => Err(PaymentError::Generic {
549                err: format!("Cannot transition from {from_state:?} to Created state"),
550            }),
551
552            (Created | Pending, Pending) => Ok(()),
553            (_, Pending) => Err(PaymentError::Generic {
554                err: format!("Cannot transition from {from_state:?} to Pending state"),
555            }),
556
557            (Created | Pending, Complete) => Ok(()),
558            (_, Complete) => Err(PaymentError::Generic {
559                err: format!("Cannot transition from {from_state:?} to Complete state"),
560            }),
561
562            (Created | TimedOut, TimedOut) => Ok(()),
563            (_, TimedOut) => Err(PaymentError::Generic {
564                err: format!("Cannot transition from {from_state:?} to TimedOut state"),
565            }),
566
567            (_, Refundable) => Err(PaymentError::Generic {
568                err: format!("Cannot transition from {from_state:?} to Refundable state"),
569            }),
570
571            (Pending, RefundPending) => Ok(()),
572            (_, RefundPending) => Err(PaymentError::Generic {
573                err: format!("Cannot transition from {from_state:?} to RefundPending state"),
574            }),
575
576            (Complete, Failed) => Err(PaymentError::Generic {
577                err: format!("Cannot transition from {from_state:?} to Failed state"),
578            }),
579            (_, Failed) => Ok(()),
580
581            (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
582                err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
583            }),
584        }
585    }
586}
587
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use anyhow::Result;

    use crate::{
        model::PaymentState::{self, *},
        test_utils::{
            persist::{create_persister, new_send_swap},
            send_swap::new_send_swap_handler,
        },
    };

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Checks which state transitions `update_swap_info` accepts and which it rejects.
    ///
    /// For every listed initial state, a fresh swap is stored in that state and an update to
    /// each target state is attempted: the listed targets must succeed, and every other state
    /// of the checked universe must be rejected.
    #[sdk_macros::async_test_all]
    async fn test_send_swap_state_transitions() -> Result<()> {
        create_persister!(storage);
        let send_swap_handler = new_send_swap_handler(storage.clone())?;

        // Test valid combinations of states
        let valid_combinations = HashMap::from([
            (
                Created,
                HashSet::from([Pending, Complete, TimedOut, Failed]),
            ),
            (
                Pending,
                HashSet::from([Pending, RefundPending, Complete, Failed]),
            ),
            (TimedOut, HashSet::from([TimedOut, Created, Failed])),
            (Complete, HashSet::from([])),
            (Refundable, HashSet::from([Failed])),
            (Failed, HashSet::from([Failed])),
        ]);

        for (first_state, allowed_states) in valid_combinations.iter() {
            for allowed_state in allowed_states {
                let send_swap = new_send_swap(Some(*first_state), None);
                storage.insert_or_update_send_swap(&send_swap)?;

                assert!(send_swap_handler
                    .update_swap_info(&send_swap.id, *allowed_state, None, None, None)
                    .is_ok());
            }
        }

        // Test invalid combinations of states
        // The universe covers every target state, so that rejected transitions into
        // `Refundable`, `RefundPending` and `WaitingFeeAcceptance` are checked as well
        let all_states = HashSet::from([
            Created,
            Pending,
            Complete,
            TimedOut,
            Failed,
            Refundable,
            RefundPending,
            WaitingFeeAcceptance,
        ]);
        let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
            .iter()
            .map(|(first_state, allowed_states)| {
                (
                    *first_state,
                    all_states.difference(allowed_states).cloned().collect(),
                )
            })
            .collect();

        for (first_state, disallowed_states) in invalid_combinations.iter() {
            for disallowed_state in disallowed_states {
                let send_swap = new_send_swap(Some(*first_state), None);
                storage.insert_or_update_send_swap(&send_swap)?;

                assert!(send_swap_handler
                    .update_swap_info(&send_swap.id, *disallowed_state, None, None, None)
                    .is_err());
            }
        }

        Ok(())
    }
}
662}