1use std::str::FromStr;
2use std::time::Duration;
3
4use anyhow::{anyhow, Result};
5use boltz_client::boltz::SubmarineClaimTxResponse;
6use boltz_client::swaps::boltz;
7use boltz_client::swaps::{boltz::CreateSubmarineResponse, boltz::SubSwapStates};
8use futures_util::TryFutureExt;
9use log::{debug, info, warn};
10use lwk_wollet::elements::{LockTime, Transaction};
11use lwk_wollet::hashes::{sha256, Hash};
12use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed};
13use sdk_common::utils::Arc;
14use tokio::sync::broadcast;
15use web_time::{SystemTime, UNIX_EPOCH};
16
17use crate::chain::liquid::LiquidChainService;
18use crate::model::{
19 BlockListener, Config, PaymentState::*, SendSwap, LIQUID_FEE_RATE_MSAT_PER_VBYTE,
20};
21use crate::persist::model::{PaymentTxBalance, PaymentTxDetails};
22use crate::prelude::{PaymentTxData, PaymentType, Swap};
23use crate::recover::recoverer::Recoverer;
24use crate::swapper::Swapper;
25use crate::utils;
26use crate::wallet::OnchainWallet;
27use crate::{
28 error::PaymentError,
29 model::{PaymentState, Transaction as SdkTransaction},
30 persist::Persister,
31};
32
33#[derive(Clone)]
34pub(crate) struct SendSwapHandler {
35 config: Config,
36 onchain_wallet: Arc<dyn OnchainWallet>,
37 persister: std::sync::Arc<Persister>,
38 swapper: Arc<dyn Swapper>,
39 chain_service: Arc<dyn LiquidChainService>,
40 subscription_notifier: broadcast::Sender<String>,
41 recoverer: Arc<Recoverer>,
42}
43
44#[sdk_macros::async_trait]
45impl BlockListener for SendSwapHandler {
46 async fn on_bitcoin_block(&self, _height: u32) {}
47
48 async fn on_liquid_block(&self, _height: u32) {
49 if let Err(err) = self.check_refunds().await {
50 warn!("Could not refund expired swaps, error: {err:?}");
51 }
52 }
53}
54
55impl SendSwapHandler {
56 pub(crate) fn new(
57 config: Config,
58 onchain_wallet: Arc<dyn OnchainWallet>,
59 persister: std::sync::Arc<Persister>,
60 swapper: Arc<dyn Swapper>,
61 chain_service: Arc<dyn LiquidChainService>,
62 recoverer: Arc<Recoverer>,
63 ) -> Self {
64 let (subscription_notifier, _) = broadcast::channel::<String>(30);
65 Self {
66 config,
67 onchain_wallet,
68 persister,
69 swapper,
70 chain_service,
71 subscription_notifier,
72 recoverer,
73 }
74 }
75
76 pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
77 self.subscription_notifier.subscribe()
78 }
79
80 pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
82 let id = &update.id;
83 let status = &update.status;
84 let swap_state = SubSwapStates::from_str(status)
85 .map_err(|_| anyhow!("Invalid SubSwapState for Send Swap {id}: {status}"))?;
86 let swap = self.fetch_send_swap_by_id(id)?;
87 info!("Handling Send Swap transition to {swap_state:?} for swap {id}");
88
89 match swap_state {
91 SubSwapStates::InvoiceSet => {
93 warn!("Received `invoice.set` state for Send Swap {id}");
94 Ok(())
95 }
96
97 SubSwapStates::TransactionClaimPending => {
100 if swap.metadata.is_local {
101 let preimage = match self.swapper.get_send_claim_tx_details(&swap).await {
102 Ok(claim_tx_response) => {
103 match self.cooperate_claim(&swap, claim_tx_response.clone()).await {
104 Ok(_) => Some(claim_tx_response.preimage),
105 Err(e) => {
106 warn!("Could not cooperate Send Swap {id} claim: {e:?}");
107 None
108 }
109 }
110 }
111 Err(e) => {
112 warn!("Could not get claim tx details for Send Swap {id}: {e:?}");
113 None
114 }
115 };
116 let preimage = match preimage {
117 Some(preimage) => preimage,
118 None => {
119 let preimage = self.swapper.get_submarine_preimage(&swap.id).await?;
120 utils::verify_payment_hash(&preimage, &swap.invoice)?;
121 info!("Fetched Send Swap {id} preimage cooperatively");
122 preimage
123 }
124 };
125 self.update_swap_info(&swap.id, Complete, Some(&preimage), None, None)?;
126 }
127
128 Ok(())
129 }
130
131 SubSwapStates::TransactionClaimed => {
133 debug!("Send Swap {id} has been claimed");
134
135 match swap.preimage {
136 Some(_) => {
137 debug!("The claim tx was a key path spend (cooperative claim)");
138 }
141 None => {
142 debug!("The claim tx was a script path spend (non-cooperative claim)");
143 let mut swaps = vec![Swap::Send(swap.clone())];
144 self.recoverer
145 .recover_from_onchain(&mut swaps, None)
146 .await?;
147
148 let Swap::Send(s) = swaps[0].clone() else {
149 return Err(anyhow!("Expected a Send swap"));
150 };
151 self.update_swap(s)?;
152 }
153 }
154
155 Ok(())
156 }
157
158 SubSwapStates::TransactionLockupFailed
164 | SubSwapStates::InvoiceFailedToPay
165 | SubSwapStates::SwapExpired => {
166 match swap.lockup_tx_id {
167 Some(_) => match swap.refund_tx_id {
168 Some(refund_tx_id) => warn!(
169 "Refund tx for Send Swap {id} was already broadcast: txid {refund_tx_id}"
170 ),
171 None => {
172 warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has been broadcast.");
173 let refund_tx_id = match self.refund(&swap, true).await {
174 Ok(refund_tx_id) => Some(refund_tx_id),
175 Err(e) => {
176 warn!("Could not refund Send swap {id} cooperatively: {e:?}");
177 None
178 }
179 };
180 self.update_swap_info(
184 &swap.id,
185 RefundPending,
186 None,
187 None,
188 refund_tx_id.as_deref(),
189 )?;
190 }
191 },
192 None => {
195 warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has never been broadcast. Resolving payment as failed.");
196 self.update_swap_info(id, Failed, None, None, None)?;
197 }
198 }
199
200 Ok(())
201 }
202
203 _ => {
204 debug!("Unhandled state for Send Swap {id}: {swap_state:?}");
205 Ok(())
206 }
207 }
208 }
209
210 pub(crate) async fn try_lockup(
211 &self,
212 swap: &SendSwap,
213 create_response: &CreateSubmarineResponse,
214 ) -> Result<Transaction, PaymentError> {
215 if swap.lockup_tx_id.is_some() {
216 debug!("Lockup tx was already broadcast for Send Swap {}", swap.id);
217 return Err(PaymentError::PaymentInProgress);
218 }
219
220 let swap_id = &swap.id;
221 debug!(
222 "Initiated Send Swap: send {} sats to liquid address {}",
223 create_response.expected_amount, create_response.address
224 );
225
226 let lockup_tx = self
227 .onchain_wallet
228 .build_tx_or_drain_tx(
229 Some(LIQUID_FEE_RATE_MSAT_PER_VBYTE),
230 &create_response.address,
231 &self.config.lbtc_asset_id(),
232 create_response.expected_amount,
233 )
234 .await?;
235 let lockup_tx_id = lockup_tx.txid().to_string();
236
237 self.persister
238 .set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
239
240 info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",);
241
242 let broadcast_result = self.chain_service.broadcast(&lockup_tx).await;
243
244 if let Err(err) = broadcast_result {
245 debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}");
246 self.persister
247 .unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
248 return Err(err.into());
249 }
250
251 info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
252
253 let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum();
256 self.persister.insert_or_update_payment(
257 PaymentTxData {
258 tx_id: lockup_tx_id.clone(),
259 timestamp: Some(utils::now()),
260 fees_sat: lockup_tx_fees_sat,
261 is_confirmed: false,
262 unblinding_data: None,
263 },
264 &[PaymentTxBalance {
265 asset_id: self.config.lbtc_asset_id(),
266 amount: create_response.expected_amount,
267 payment_type: PaymentType::Send,
268 }],
269 None,
270 false,
271 )?;
272
273 self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None)?;
274
275 Ok(lockup_tx)
276 }
277
278 fn fetch_send_swap_by_id(&self, swap_id: &str) -> Result<SendSwap, PaymentError> {
279 self.persister
280 .fetch_send_swap_by_id(swap_id)
281 .map_err(|_| PaymentError::PersistError)?
282 .ok_or(PaymentError::Generic {
283 err: format!("Send Swap not found {swap_id}"),
284 })
285 }
286
287 pub(crate) fn update_swap(&self, updated_swap: SendSwap) -> Result<(), PaymentError> {
289 let swap = self.fetch_send_swap_by_id(&updated_swap.id)?;
290 let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
291 if updated_swap != swap || lnurl_info_updated {
292 info!(
293 "Updating Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})",
294 updated_swap.id,
295 updated_swap.state,
296 updated_swap.lockup_tx_id,
297 updated_swap.refund_tx_id
298 );
299 self.persister.insert_or_update_send_swap(&updated_swap)?;
300 let _ = self.subscription_notifier.send(updated_swap.id);
301 }
302 Ok(())
303 }
304
305 pub(crate) fn update_swap_lnurl_info(
306 &self,
307 swap: &SendSwap,
308 updated_swap: &SendSwap,
309 ) -> Result<bool> {
310 if swap.preimage.is_none() {
311 let Some(tx_id) = updated_swap.lockup_tx_id.clone() else {
312 return Ok(false);
313 };
314 let Some(ref preimage_str) = updated_swap.preimage.clone() else {
315 return Ok(false);
316 };
317 if let Some(PaymentTxDetails {
318 destination,
319 description,
320 lnurl_info: Some(mut lnurl_info),
321 bip353_address,
322 ..
323 }) = self.persister.get_payment_details(&tx_id)?
324 {
325 if let Some(SuccessAction::Aes { data }) =
326 lnurl_info.lnurl_pay_unprocessed_success_action.clone()
327 {
328 debug!(
329 "Decrypting AES success action with preimage for Send Swap {}",
330 swap.id
331 );
332 let preimage = sha256::Hash::from_str(preimage_str)?;
333 let preimage_arr = preimage.to_byte_array();
334 let result = match (data, &preimage_arr).try_into() {
335 Ok(data) => AesSuccessActionDataResult::Decrypted { data },
336 Err(e) => AesSuccessActionDataResult::ErrorStatus {
337 reason: e.to_string(),
338 },
339 };
340 lnurl_info.lnurl_pay_success_action =
341 Some(SuccessActionProcessed::Aes { result });
342 self.persister
343 .insert_or_update_payment_details(PaymentTxDetails {
344 tx_id,
345 destination,
346 description,
347 lnurl_info: Some(lnurl_info),
348 bip353_address,
349 ..Default::default()
350 })?;
351 return Ok(true);
352 }
353 }
354 }
355 Ok(false)
356 }
357
358 pub(crate) fn update_swap_info(
360 &self,
361 swap_id: &str,
362 to_state: PaymentState,
363 preimage: Option<&str>,
364 lockup_tx_id: Option<&str>,
365 refund_tx_id: Option<&str>,
366 ) -> Result<(), PaymentError> {
367 info!(
368 "Transitioning Send swap {swap_id} to {to_state:?} (lockup_tx_id = {lockup_tx_id:?}, refund_tx_id = {refund_tx_id:?})"
369 );
370 let swap = self.fetch_send_swap_by_id(swap_id)?;
371 Self::validate_state_transition(swap.state, to_state)?;
372 self.persister.try_handle_send_swap_update(
373 swap_id,
374 to_state,
375 preimage,
376 lockup_tx_id,
377 refund_tx_id,
378 )?;
379 let updated_swap = self.fetch_send_swap_by_id(swap_id)?;
380 let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
381 if updated_swap != swap || lnurl_info_updated {
382 let _ = self.subscription_notifier.send(updated_swap.id);
383 }
384 Ok(())
385 }
386
387 async fn cooperate_claim(
388 &self,
389 send_swap: &SendSwap,
390 claim_tx_response: SubmarineClaimTxResponse,
391 ) -> Result<(), PaymentError> {
392 debug!(
393 "Claim is pending for Send Swap {}. Initiating cooperative claim",
394 &send_swap.id
395 );
396 let refund_address = match send_swap.refund_address {
397 Some(ref refund_address) => refund_address.clone(),
398 None => {
399 let address = self.onchain_wallet.next_unused_address().await?.to_string();
401 self.persister
402 .set_send_swap_refund_address(&send_swap.id, &address)?;
403 address
404 }
405 };
406
407 self.swapper
408 .claim_send_swap_cooperative(send_swap, claim_tx_response, &refund_address)
409 .await?;
410 Ok(())
411 }
412
413 pub(crate) async fn refund(
414 &self,
415 swap: &SendSwap,
416 is_cooperative: bool,
417 ) -> Result<String, PaymentError> {
418 info!(
419 "Initiating refund for Send Swap {}, is_cooperative: {is_cooperative}",
420 swap.id
421 );
422
423 let swap_script = swap.get_swap_script()?;
424 let refund_address = match swap.refund_address {
425 Some(ref refund_address) => refund_address.clone(),
426 None => {
427 let address = self.onchain_wallet.next_unused_address().await?.to_string();
429 self.persister
430 .set_send_swap_refund_address(&swap.id, &address)?;
431 address
432 }
433 };
434
435 let script_pk = swap_script
436 .to_address(self.config.network.into())
437 .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
438 .to_unconfidential()
439 .script_pubkey();
440 let utxos = self.chain_service.get_script_utxos(&script_pk).await?;
441 let SdkTransaction::Liquid(refund_tx) = self
442 .swapper
443 .create_refund_tx(
444 Swap::Send(swap.clone()),
445 &refund_address,
446 utxos,
447 None,
448 is_cooperative,
449 )
450 .await?
451 else {
452 return Err(PaymentError::Generic {
453 err: format!(
454 "Unexpected refund tx type returned for Send swap {}",
455 swap.id
456 ),
457 });
458 };
459 let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string();
460
461 info!(
462 "Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}",
463 swap.id
464 );
465
466 Ok(refund_tx_id)
467 }
468
469 async fn check_swap_expiry(&self, swap: &SendSwap) -> Result<bool> {
470 let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64);
471 let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?;
472 if duration_since_creation_time.as_secs() < 60 * 10 {
473 return Ok(false);
474 }
475
476 let swap_script = swap.get_swap_script()?;
477 let current_height = self.onchain_wallet.tip().await;
478 let locktime_from_height = LockTime::from_height(current_height)?;
479
480 info!("Checking Send Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime);
481 Ok(utils::is_locktime_expired(
482 locktime_from_height,
483 swap_script.locktime,
484 ))
485 }
486
487 pub(crate) async fn try_refund_all(&self, swaps: &[SendSwap]) {
489 for swap in swaps {
490 if swap.refund_tx_id.is_some() {
491 continue;
492 }
493
494 let has_swap_expired = self.check_swap_expiry(swap).await.unwrap_or(false);
495
496 if !has_swap_expired && swap.state == Pending {
497 continue;
498 }
499
500 let refund_tx_id_result = match swap.state {
501 Pending => self.refund(swap, false).await,
502 RefundPending => match has_swap_expired {
503 true => {
504 self.refund(swap, true)
505 .or_else(|e| {
506 warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
507 self.refund(swap, false)
508 })
509 .await
510 }
511 false => self.refund(swap, true).await,
512 },
513 _ => {
514 continue;
515 }
516 };
517
518 if let Ok(refund_tx_id) = refund_tx_id_result {
519 let update_swap_info_result =
520 self.update_swap_info(&swap.id, RefundPending, None, None, Some(&refund_tx_id));
521 if let Err(err) = update_swap_info_result {
522 warn!(
523 "Could not update Send swap {} information, error: {err:?}",
524 swap.id
525 );
526 };
527 }
528 }
529 }
530
531 pub(crate) async fn check_refunds(&self) -> Result<(), PaymentError> {
534 let pending_swaps = self.persister.list_pending_send_swaps()?;
535 self.try_refund_all(&pending_swaps).await;
536 Ok(())
537 }
538
539 fn validate_state_transition(
540 from_state: PaymentState,
541 to_state: PaymentState,
542 ) -> Result<(), PaymentError> {
543 match (from_state, to_state) {
544 (TimedOut, Created) => Ok(()),
545 (_, Created) => Err(PaymentError::Generic {
546 err: format!("Cannot transition from {from_state:?} to Created state"),
547 }),
548
549 (Created | Pending, Pending) => Ok(()),
550 (_, Pending) => Err(PaymentError::Generic {
551 err: format!("Cannot transition from {from_state:?} to Pending state"),
552 }),
553
554 (Created | Pending, Complete) => Ok(()),
555 (_, Complete) => Err(PaymentError::Generic {
556 err: format!("Cannot transition from {from_state:?} to Complete state"),
557 }),
558
559 (Created | TimedOut, TimedOut) => Ok(()),
560 (_, TimedOut) => Err(PaymentError::Generic {
561 err: format!("Cannot transition from {from_state:?} to TimedOut state"),
562 }),
563
564 (_, Refundable) => Err(PaymentError::Generic {
565 err: format!("Cannot transition from {from_state:?} to Refundable state"),
566 }),
567
568 (Pending, RefundPending) => Ok(()),
569 (_, RefundPending) => Err(PaymentError::Generic {
570 err: format!("Cannot transition from {from_state:?} to RefundPending state"),
571 }),
572
573 (Complete, Failed) => Err(PaymentError::Generic {
574 err: format!("Cannot transition from {from_state:?} to Failed state"),
575 }),
576 (_, Failed) => Ok(()),
577
578 (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
579 err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
580 }),
581 }
582 }
583}
584
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use anyhow::Result;

    use crate::{
        model::PaymentState::{self, *},
        test_utils::{
            persist::{create_persister, new_send_swap},
            send_swap::new_send_swap_handler,
        },
    };

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Checks `update_swap_info` against the allowed Send Swap state transitions:
    /// every listed transition must succeed and every other transition from the
    /// same starting state must be rejected.
    #[sdk_macros::async_test_all]
    async fn test_send_swap_state_transitions() -> Result<()> {
        create_persister!(storage);
        let send_swap_handler = new_send_swap_handler(storage.clone())?;

        // Starting state -> states it is allowed to move to.
        let valid_combinations = HashMap::from([
            (
                Created,
                HashSet::from([Pending, Complete, TimedOut, Failed]),
            ),
            (
                Pending,
                HashSet::from([Pending, RefundPending, Complete, Failed]),
            ),
            (TimedOut, HashSet::from([TimedOut, Created, Failed])),
            (Complete, HashSet::from([])),
            (Refundable, HashSet::from([Failed])),
            (Failed, HashSet::from([Failed])),
        ]);

        for (first_state, allowed_states) in valid_combinations.iter() {
            for allowed_state in allowed_states {
                let send_swap = new_send_swap(Some(*first_state), None);
                storage.insert_or_update_send_swap(&send_swap)?;

                let result =
                    send_swap_handler.update_swap_info(&send_swap.id, *allowed_state, None, None, None);
                assert!(
                    result.is_ok(),
                    "expected {first_state:?} -> {allowed_state:?} to be allowed"
                );
            }
        }

        // Every target state, so that transitions into `Refundable`, `RefundPending`
        // and `WaitingFeeAcceptance` are also checked for rejection.
        let all_states = HashSet::from([
            Created,
            Pending,
            Complete,
            TimedOut,
            Failed,
            Refundable,
            RefundPending,
            WaitingFeeAcceptance,
        ]);
        let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
            .iter()
            .map(|(first_state, allowed_states)| {
                (
                    *first_state,
                    all_states.difference(allowed_states).cloned().collect(),
                )
            })
            .collect();

        for (first_state, disallowed_states) in invalid_combinations.iter() {
            for disallowed_state in disallowed_states {
                let send_swap = new_send_swap(Some(*first_state), None);
                storage.insert_or_update_send_swap(&send_swap)?;

                let result = send_swap_handler.update_swap_info(
                    &send_swap.id,
                    *disallowed_state,
                    None,
                    None,
                    None,
                );
                assert!(
                    result.is_err(),
                    "expected {first_state:?} -> {disallowed_state:?} to be rejected"
                );
            }
        }

        Ok(())
    }
}