breez_sdk_liquid/
send_swap.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use anyhow::{anyhow, Result};
5use boltz_client::boltz::SubmarineClaimTxResponse;
6use boltz_client::swaps::boltz;
7use boltz_client::swaps::{boltz::CreateSubmarineResponse, boltz::SubSwapStates};
8use futures_util::TryFutureExt;
9use log::{debug, info, warn};
10use lwk_wollet::elements::{LockTime, Transaction};
11use lwk_wollet::hashes::{sha256, Hash};
12use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed};
13use sdk_common::utils::Arc;
14use tokio::sync::broadcast;
15use web_time::{SystemTime, UNIX_EPOCH};
16
17use crate::chain::liquid::LiquidChainService;
18use crate::model::{
19    BlockListener, Config, PaymentState::*, SendSwap, LIQUID_FEE_RATE_MSAT_PER_VBYTE,
20};
21use crate::persist::model::PaymentTxDetails;
22use crate::prelude::{PaymentTxData, PaymentType, Swap};
23use crate::recover::recoverer::Recoverer;
24use crate::swapper::Swapper;
25use crate::utils;
26use crate::wallet::OnchainWallet;
27use crate::{
28    error::PaymentError,
29    model::{PaymentState, Transaction as SdkTransaction},
30    persist::Persister,
31};
32
33#[derive(Clone)]
34pub(crate) struct SendSwapHandler {
35    config: Config,
36    onchain_wallet: Arc<dyn OnchainWallet>,
37    persister: std::sync::Arc<Persister>,
38    swapper: Arc<dyn Swapper>,
39    chain_service: Arc<dyn LiquidChainService>,
40    subscription_notifier: broadcast::Sender<String>,
41    recoverer: Arc<Recoverer>,
42}
43
44#[sdk_macros::async_trait]
45impl BlockListener for SendSwapHandler {
46    async fn on_bitcoin_block(&self, _height: u32) {}
47
48    async fn on_liquid_block(&self, _height: u32) {
49        if let Err(err) = self.check_refunds().await {
50            warn!("Could not refund expired swaps, error: {err:?}");
51        }
52    }
53}
54
55impl SendSwapHandler {
56    pub(crate) fn new(
57        config: Config,
58        onchain_wallet: Arc<dyn OnchainWallet>,
59        persister: std::sync::Arc<Persister>,
60        swapper: Arc<dyn Swapper>,
61        chain_service: Arc<dyn LiquidChainService>,
62        recoverer: Arc<Recoverer>,
63    ) -> Self {
64        let (subscription_notifier, _) = broadcast::channel::<String>(30);
65        Self {
66            config,
67            onchain_wallet,
68            persister,
69            swapper,
70            chain_service,
71            subscription_notifier,
72            recoverer,
73        }
74    }
75
76    pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
77        self.subscription_notifier.subscribe()
78    }
79
80    /// Handles status updates from Boltz for Send swaps
81    pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
82        let id = &update.id;
83        let status = &update.status;
84        let swap_state = SubSwapStates::from_str(status)
85            .map_err(|_| anyhow!("Invalid SubSwapState for Send Swap {id}: {status}"))?;
86        let swap = self.fetch_send_swap_by_id(id)?;
87        info!("Handling Send Swap transition to {swap_state:?} for swap {id}");
88
89        // See https://docs.boltz.exchange/v/api/lifecycle#normal-submarine-swaps
90        match swap_state {
91            // Boltz has locked the HTLC
92            SubSwapStates::InvoiceSet => {
93                warn!("Received `invoice.set` state for Send Swap {id}");
94                Ok(())
95            }
96
97            // Boltz has detected the lockup in the mempool. If the swap is not to be batched
98            // we can speed up the claim by doing so cooperatively.
99            SubSwapStates::TransactionClaimPending => {
100                if swap.metadata.is_local {
101                    let preimage = match self.swapper.get_send_claim_tx_details(&swap).await {
102                        Ok(claim_tx_response) => {
103                            match self.cooperate_claim(&swap, claim_tx_response.clone()).await {
104                                Ok(_) => Some(claim_tx_response.preimage),
105                                Err(e) => {
106                                    warn!("Could not cooperate Send Swap {id} claim: {e:?}");
107                                    None
108                                }
109                            }
110                        }
111                        Err(e) => {
112                            warn!("Could not get claim tx details for Send Swap {id}: {e:?}");
113                            None
114                        }
115                    };
116                    let preimage = match preimage {
117                        Some(preimage) => preimage,
118                        None => {
119                            let preimage = self.swapper.get_submarine_preimage(&swap.id).await?;
120                            utils::verify_payment_hash(&preimage, &swap.invoice)?;
121                            info!("Fetched Send Swap {id} preimage cooperatively");
122                            preimage
123                        }
124                    };
125                    self.update_swap_info(&swap.id, Complete, Some(&preimage), None, None)?;
126                }
127
128                Ok(())
129            }
130
131            // Boltz announced they successfully broadcast the (cooperative or non-cooperative) claim tx
132            SubSwapStates::TransactionClaimed => {
133                debug!("Send Swap {id} has been claimed");
134
135                match swap.preimage {
136                    Some(_) => {
137                        debug!("The claim tx was a key path spend (cooperative claim)");
138                        // Preimage was already validated and stored, PaymentSucceeded event emitted,
139                        // when the cooperative claim was handled.
140                    }
141                    None => {
142                        debug!("The claim tx was a script path spend (non-cooperative claim)");
143                        let mut swaps = vec![Swap::Send(swap.clone())];
144                        self.recoverer
145                            .recover_from_onchain(&mut swaps, None)
146                            .await?;
147
148                        let Swap::Send(s) = swaps[0].clone() else {
149                            return Err(anyhow!("Expected a Send swap"));
150                        };
151                        self.update_swap(s)?;
152                    }
153                }
154
155                Ok(())
156            }
157
158            // If swap state is unrecoverable, either:
159            // 1. Boltz failed to pay
160            // 2. The swap has expired (>24h)
161            // 3. Lockup failed (we sent too little funds)
162            // We initiate a cooperative refund, and then fallback to a regular one
163            SubSwapStates::TransactionLockupFailed
164            | SubSwapStates::InvoiceFailedToPay
165            | SubSwapStates::SwapExpired => {
166                match swap.lockup_tx_id {
167                    Some(_) => match swap.refund_tx_id {
168                        Some(refund_tx_id) => warn!(
169                        "Refund tx for Send Swap {id} was already broadcast: txid {refund_tx_id}"
170                    ),
171                        None => {
172                            warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has been broadcast.");
173                            let refund_tx_id = match self.refund(&swap, true).await {
174                                Ok(refund_tx_id) => Some(refund_tx_id),
175                                Err(e) => {
176                                    warn!("Could not refund Send swap {id} cooperatively: {e:?}");
177                                    None
178                                }
179                            };
180                            // Set the payment state to `RefundPending`. This ensures that the
181                            // background thread will pick it up and try to refund it
182                            // periodically
183                            self.update_swap_info(
184                                &swap.id,
185                                RefundPending,
186                                None,
187                                None,
188                                refund_tx_id.as_deref(),
189                            )?;
190                        }
191                    },
192                    // Do not attempt broadcasting a refund if lockup tx was never sent and swap is
193                    // unrecoverable. We resolve the payment as failed.
194                    None => {
195                        warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has never been broadcast. Resolving payment as failed.");
196                        self.update_swap_info(id, Failed, None, None, None)?;
197                    }
198                }
199
200                Ok(())
201            }
202
203            _ => {
204                debug!("Unhandled state for Send Swap {id}: {swap_state:?}");
205                Ok(())
206            }
207        }
208    }
209
210    pub(crate) async fn try_lockup(
211        &self,
212        swap: &SendSwap,
213        create_response: &CreateSubmarineResponse,
214    ) -> Result<Transaction, PaymentError> {
215        if swap.lockup_tx_id.is_some() {
216            debug!("Lockup tx was already broadcast for Send Swap {}", swap.id);
217            return Err(PaymentError::PaymentInProgress);
218        }
219
220        let swap_id = &swap.id;
221        debug!(
222            "Initiated Send Swap: send {} sats to liquid address {}",
223            create_response.expected_amount, create_response.address
224        );
225
226        let lockup_tx = self
227            .onchain_wallet
228            .build_tx_or_drain_tx(
229                Some(LIQUID_FEE_RATE_MSAT_PER_VBYTE),
230                &create_response.address,
231                &self.config.lbtc_asset_id(),
232                create_response.expected_amount,
233            )
234            .await?;
235        let lockup_tx_id = lockup_tx.txid().to_string();
236
237        self.persister
238            .set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
239
240        info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",);
241
242        let broadcast_result = self.chain_service.broadcast(&lockup_tx).await;
243
244        if let Err(err) = broadcast_result {
245            debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}");
246            self.persister
247                .unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
248            return Err(err.into());
249        }
250
251        info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
252
253        // We insert a pseudo-lockup-tx in case LWK fails to pick up the new mempool tx for a while
254        // This makes the tx known to the SDK (get_info, list_payments) instantly
255        let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum();
256        self.persister.insert_or_update_payment(
257            PaymentTxData {
258                tx_id: lockup_tx_id.clone(),
259                timestamp: Some(utils::now()),
260                asset_id: self.config.lbtc_asset_id(),
261                amount: create_response.expected_amount,
262                fees_sat: lockup_tx_fees_sat,
263                payment_type: PaymentType::Send,
264                is_confirmed: false,
265                unblinding_data: None,
266            },
267            None,
268            false,
269        )?;
270
271        self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None)?;
272
273        Ok(lockup_tx)
274    }
275
276    fn fetch_send_swap_by_id(&self, swap_id: &str) -> Result<SendSwap, PaymentError> {
277        self.persister
278            .fetch_send_swap_by_id(swap_id)
279            .map_err(|_| PaymentError::PersistError)?
280            .ok_or(PaymentError::Generic {
281                err: format!("Send Swap not found {swap_id}"),
282            })
283    }
284
285    // Updates the swap without state transition validation
286    pub(crate) fn update_swap(&self, updated_swap: SendSwap) -> Result<(), PaymentError> {
287        let swap = self.fetch_send_swap_by_id(&updated_swap.id)?;
288        let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
289        if updated_swap != swap || lnurl_info_updated {
290            info!(
291                "Updating Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})",
292                updated_swap.id,
293                updated_swap.state,
294                updated_swap.lockup_tx_id,
295                updated_swap.refund_tx_id
296            );
297            self.persister.insert_or_update_send_swap(&updated_swap)?;
298            let _ = self.subscription_notifier.send(updated_swap.id);
299        }
300        Ok(())
301    }
302
303    pub(crate) fn update_swap_lnurl_info(
304        &self,
305        swap: &SendSwap,
306        updated_swap: &SendSwap,
307    ) -> Result<bool> {
308        if swap.preimage.is_none() {
309            let Some(tx_id) = updated_swap.lockup_tx_id.clone() else {
310                return Ok(false);
311            };
312            let Some(ref preimage_str) = updated_swap.preimage.clone() else {
313                return Ok(false);
314            };
315            if let Some(PaymentTxDetails {
316                destination,
317                description,
318                lnurl_info: Some(mut lnurl_info),
319                bip353_address,
320                ..
321            }) = self.persister.get_payment_details(&tx_id)?
322            {
323                if let Some(SuccessAction::Aes { data }) =
324                    lnurl_info.lnurl_pay_unprocessed_success_action.clone()
325                {
326                    debug!(
327                        "Decrypting AES success action with preimage for Send Swap {}",
328                        swap.id
329                    );
330                    let preimage = sha256::Hash::from_str(preimage_str)?;
331                    let preimage_arr = preimage.to_byte_array();
332                    let result = match (data, &preimage_arr).try_into() {
333                        Ok(data) => AesSuccessActionDataResult::Decrypted { data },
334                        Err(e) => AesSuccessActionDataResult::ErrorStatus {
335                            reason: e.to_string(),
336                        },
337                    };
338                    lnurl_info.lnurl_pay_success_action =
339                        Some(SuccessActionProcessed::Aes { result });
340                    self.persister
341                        .insert_or_update_payment_details(PaymentTxDetails {
342                            tx_id,
343                            destination,
344                            description,
345                            lnurl_info: Some(lnurl_info),
346                            bip353_address,
347                            ..Default::default()
348                        })?;
349                    return Ok(true);
350                }
351            }
352        }
353        Ok(false)
354    }
355
356    // Updates the swap state with validation
357    pub(crate) fn update_swap_info(
358        &self,
359        swap_id: &str,
360        to_state: PaymentState,
361        preimage: Option<&str>,
362        lockup_tx_id: Option<&str>,
363        refund_tx_id: Option<&str>,
364    ) -> Result<(), PaymentError> {
365        info!(
366            "Transitioning Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})",
367            swap_id, to_state, lockup_tx_id, refund_tx_id
368        );
369        let swap = self.fetch_send_swap_by_id(swap_id)?;
370        Self::validate_state_transition(swap.state, to_state)?;
371        self.persister.try_handle_send_swap_update(
372            swap_id,
373            to_state,
374            preimage,
375            lockup_tx_id,
376            refund_tx_id,
377        )?;
378        let updated_swap = self.fetch_send_swap_by_id(swap_id)?;
379        let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
380        if updated_swap != swap || lnurl_info_updated {
381            let _ = self.subscription_notifier.send(updated_swap.id);
382        }
383        Ok(())
384    }
385
386    async fn cooperate_claim(
387        &self,
388        send_swap: &SendSwap,
389        claim_tx_response: SubmarineClaimTxResponse,
390    ) -> Result<(), PaymentError> {
391        debug!(
392            "Claim is pending for Send Swap {}. Initiating cooperative claim",
393            &send_swap.id
394        );
395        let refund_address = match send_swap.refund_address {
396            Some(ref refund_address) => refund_address.clone(),
397            None => {
398                // If no refund address is set, we get an unused one
399                let address = self.onchain_wallet.next_unused_address().await?.to_string();
400                self.persister
401                    .set_send_swap_refund_address(&send_swap.id, &address)?;
402                address
403            }
404        };
405
406        self.swapper
407            .claim_send_swap_cooperative(send_swap, claim_tx_response, &refund_address)
408            .await?;
409        Ok(())
410    }
411
412    pub(crate) async fn refund(
413        &self,
414        swap: &SendSwap,
415        is_cooperative: bool,
416    ) -> Result<String, PaymentError> {
417        info!(
418            "Initiating refund for Send Swap {}, is_cooperative: {is_cooperative}",
419            swap.id
420        );
421
422        let swap_script = swap.get_swap_script()?;
423        let refund_address = match swap.refund_address {
424            Some(ref refund_address) => refund_address.clone(),
425            None => {
426                // If no refund address is set, we get an unused one
427                let address = self.onchain_wallet.next_unused_address().await?.to_string();
428                self.persister
429                    .set_send_swap_refund_address(&swap.id, &address)?;
430                address
431            }
432        };
433
434        let script_pk = swap_script
435            .to_address(self.config.network.into())
436            .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
437            .to_unconfidential()
438            .script_pubkey();
439        let utxos = self.chain_service.get_script_utxos(&script_pk).await?;
440        let SdkTransaction::Liquid(refund_tx) = self
441            .swapper
442            .create_refund_tx(
443                Swap::Send(swap.clone()),
444                &refund_address,
445                utxos,
446                None,
447                is_cooperative,
448            )
449            .await?
450        else {
451            return Err(PaymentError::Generic {
452                err: format!(
453                    "Unexpected refund tx type returned for Send swap {}",
454                    swap.id
455                ),
456            });
457        };
458        let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string();
459
460        info!(
461            "Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}",
462            swap.id
463        );
464
465        Ok(refund_tx_id)
466    }
467
468    async fn check_swap_expiry(&self, swap: &SendSwap) -> Result<bool> {
469        let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64);
470        let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?;
471        if duration_since_creation_time.as_secs() < 60 * 10 {
472            return Ok(false);
473        }
474
475        let swap_script = swap.get_swap_script()?;
476        let current_height = self.onchain_wallet.tip().await;
477        let locktime_from_height = LockTime::from_height(current_height)?;
478
479        info!("Checking Send Swap {} expiration: locktime_from_height = {locktime_from_height:?},  swap_script.locktime = {:?}", swap.id, swap_script.locktime);
480        Ok(utils::is_locktime_expired(
481            locktime_from_height,
482            swap_script.locktime,
483        ))
484    }
485
486    // Attempts both cooperative and non-cooperative refunds, and updates the swap info accordingly
487    pub(crate) async fn try_refund_all(&self, swaps: &[SendSwap]) {
488        for swap in swaps {
489            if swap.refund_tx_id.is_some() {
490                continue;
491            }
492
493            let has_swap_expired = self.check_swap_expiry(swap).await.unwrap_or(false);
494
495            if !has_swap_expired && swap.state == Pending {
496                continue;
497            }
498
499            let refund_tx_id_result = match swap.state {
500                Pending => self.refund(swap, false).await,
501                RefundPending => match has_swap_expired {
502                    true => {
503                        self.refund(swap, true)
504                            .or_else(|e| {
505                                warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
506                                self.refund(swap, false)
507                            })
508                            .await
509                    }
510                    false => self.refund(swap, true).await,
511                },
512                _ => {
513                    continue;
514                }
515            };
516
517            if let Ok(refund_tx_id) = refund_tx_id_result {
518                let update_swap_info_result =
519                    self.update_swap_info(&swap.id, RefundPending, None, None, Some(&refund_tx_id));
520                if let Err(err) = update_swap_info_result {
521                    warn!(
522                        "Could not update Send swap {} information, error: {err:?}",
523                        swap.id
524                    );
525                };
526            }
527        }
528    }
529
530    // Attempts refunding all payments whose state is `RefundPending` and with no
531    // refund_tx_id field present
532    pub(crate) async fn check_refunds(&self) -> Result<(), PaymentError> {
533        let pending_swaps = self.persister.list_pending_send_swaps()?;
534        self.try_refund_all(&pending_swaps).await;
535        Ok(())
536    }
537
538    fn validate_state_transition(
539        from_state: PaymentState,
540        to_state: PaymentState,
541    ) -> Result<(), PaymentError> {
542        match (from_state, to_state) {
543            (TimedOut, Created) => Ok(()),
544            (_, Created) => Err(PaymentError::Generic {
545                err: format!("Cannot transition from {from_state:?} to Created state"),
546            }),
547
548            (Created | Pending, Pending) => Ok(()),
549            (_, Pending) => Err(PaymentError::Generic {
550                err: format!("Cannot transition from {from_state:?} to Pending state"),
551            }),
552
553            (Created | Pending, Complete) => Ok(()),
554            (_, Complete) => Err(PaymentError::Generic {
555                err: format!("Cannot transition from {from_state:?} to Complete state"),
556            }),
557
558            (Created | TimedOut, TimedOut) => Ok(()),
559            (_, TimedOut) => Err(PaymentError::Generic {
560                err: format!("Cannot transition from {from_state:?} to TimedOut state"),
561            }),
562
563            (_, Refundable) => Err(PaymentError::Generic {
564                err: format!("Cannot transition from {from_state:?} to Refundable state"),
565            }),
566
567            (Pending, RefundPending) => Ok(()),
568            (_, RefundPending) => Err(PaymentError::Generic {
569                err: format!("Cannot transition from {from_state:?} to RefundPending state"),
570            }),
571
572            (Complete, Failed) => Err(PaymentError::Generic {
573                err: format!("Cannot transition from {from_state:?} to Failed state"),
574            }),
575            (_, Failed) => Ok(()),
576
577            (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
578                err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
579            }),
580        }
581    }
582}
583
584#[cfg(test)]
585mod tests {
586    use std::collections::{HashMap, HashSet};
587
588    use anyhow::Result;
589
590    use crate::{
591        model::PaymentState::{self, *},
592        test_utils::{
593            persist::{create_persister, new_send_swap},
594            send_swap::new_send_swap_handler,
595        },
596    };
597
598    #[cfg(feature = "browser-tests")]
599    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
600
601    #[sdk_macros::async_test_all]
602    async fn test_send_swap_state_transitions() -> Result<()> {
603        create_persister!(storage);
604        let send_swap_handler = new_send_swap_handler(storage.clone())?;
605
606        // Test valid combinations of states
607        let valid_combinations = HashMap::from([
608            (
609                Created,
610                HashSet::from([Pending, Complete, TimedOut, Failed]),
611            ),
612            (
613                Pending,
614                HashSet::from([Pending, RefundPending, Complete, Failed]),
615            ),
616            (TimedOut, HashSet::from([TimedOut, Created, Failed])),
617            (Complete, HashSet::from([])),
618            (Refundable, HashSet::from([Failed])),
619            (Failed, HashSet::from([Failed])),
620        ]);
621
622        for (first_state, allowed_states) in valid_combinations.iter() {
623            for allowed_state in allowed_states {
624                let send_swap = new_send_swap(Some(*first_state), None);
625                storage.insert_or_update_send_swap(&send_swap)?;
626
627                assert!(send_swap_handler
628                    .update_swap_info(&send_swap.id, *allowed_state, None, None, None)
629                    .is_ok());
630            }
631        }
632
633        // Test invalid combinations of states
634        let all_states = HashSet::from([Created, Pending, Complete, TimedOut, Failed]);
635        let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
636            .iter()
637            .map(|(first_state, allowed_states)| {
638                (
639                    *first_state,
640                    all_states.difference(allowed_states).cloned().collect(),
641                )
642            })
643            .collect();
644
645        for (first_state, disallowed_states) in invalid_combinations.iter() {
646            for disallowed_state in disallowed_states {
647                let send_swap = new_send_swap(Some(*first_state), None);
648                storage.insert_or_update_send_swap(&send_swap)?;
649
650                assert!(send_swap_handler
651                    .update_swap_info(&send_swap.id, *disallowed_state, None, None, None)
652                    .is_err());
653            }
654        }
655
656        Ok(())
657    }
658}