1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, bail, Result};
6use boltz_client::boltz::SubmarineClaimTxResponse;
7use boltz_client::swaps::boltz;
8use boltz_client::swaps::{boltz::CreateSubmarineResponse, boltz::SubSwapStates};
9use futures_util::TryFutureExt;
10use log::{debug, error, info, warn};
11use lwk_wollet::elements::{LockTime, Transaction};
12use lwk_wollet::hashes::{sha256, Hash};
13use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed};
14use tokio::sync::broadcast;
15use web_time::{SystemTime, UNIX_EPOCH};
16
17use crate::chain::liquid::LiquidChainService;
18use crate::model::{
19 BlockListener, Config, PaymentState::*, SendSwap, LIQUID_FEE_RATE_MSAT_PER_VBYTE,
20};
21use crate::persist::model::{PaymentTxBalance, PaymentTxDetails};
22use crate::prelude::{PaymentTxData, PaymentType, Swap};
23use crate::recover::recoverer::Recoverer;
24use crate::swapper::Swapper;
25use crate::utils;
26use crate::wallet::OnchainWallet;
27use crate::{
28 error::PaymentError,
29 model::{PaymentState, Transaction as SdkTransaction},
30 persist::Persister,
31};
32
33#[derive(Clone)]
34pub(crate) struct SendSwapHandler {
35 config: Config,
36 onchain_wallet: Arc<dyn OnchainWallet>,
37 persister: std::sync::Arc<Persister>,
38 swapper: Arc<dyn Swapper>,
39 chain_service: Arc<dyn LiquidChainService>,
40 subscription_notifier: broadcast::Sender<String>,
41 recoverer: Arc<Recoverer>,
42}
43
44#[sdk_macros::async_trait]
45impl BlockListener for SendSwapHandler {
46 async fn on_bitcoin_block(&self, _height: u32) {}
47
48 async fn on_liquid_block(&self, _height: u32) {
49 if let Err(err) = self.check_refunds().await {
50 warn!("Could not refund expired swaps, error: {err:?}");
51 }
52 }
53}
54
55impl SendSwapHandler {
56 pub(crate) fn new(
57 config: Config,
58 onchain_wallet: Arc<dyn OnchainWallet>,
59 persister: std::sync::Arc<Persister>,
60 swapper: Arc<dyn Swapper>,
61 chain_service: Arc<dyn LiquidChainService>,
62 recoverer: Arc<Recoverer>,
63 ) -> Self {
64 let (subscription_notifier, _) = broadcast::channel::<String>(30);
65 Self {
66 config,
67 onchain_wallet,
68 persister,
69 swapper,
70 chain_service,
71 subscription_notifier,
72 recoverer,
73 }
74 }
75
76 pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
77 self.subscription_notifier.subscribe()
78 }
79
80 pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
82 let id = &update.id;
83 let status = &update.status;
84 let swap_state = SubSwapStates::from_str(status)
85 .map_err(|_| anyhow!("Invalid SubSwapState for Send Swap {id}: {status}"))?;
86 let swap = self.fetch_send_swap_by_id(id)?;
87 info!("Handling Send Swap transition to {swap_state:?} for swap {id}");
88
89 match swap_state {
91 SubSwapStates::InvoiceSet => {
93 warn!("Received `invoice.set` state for Send Swap {id}");
94 Ok(())
95 }
96
97 SubSwapStates::TransactionClaimPending => {
100 if swap.metadata.is_local {
101 let preimage = match self.swapper.get_send_claim_tx_details(&swap).await {
102 Ok(claim_tx_response) => {
103 match self.cooperate_claim(&swap, claim_tx_response.clone()).await {
104 Ok(_) => Some(claim_tx_response.preimage),
105 Err(e) => {
106 warn!("Could not cooperate Send Swap {id} claim: {e:?}");
107 None
108 }
109 }
110 }
111 Err(e) => {
112 warn!("Could not get claim tx details for Send Swap {id}: {e:?}");
113 None
114 }
115 };
116 let preimage = match preimage {
117 Some(preimage) => preimage,
118 None => {
119 let preimage = self.swapper.get_submarine_preimage(&swap.id).await?;
120 utils::verify_payment_hash(&preimage, &swap.invoice)?;
121 info!("Fetched Send Swap {id} preimage cooperatively");
122 preimage
123 }
124 };
125 self.update_swap_info(&swap.id, Complete, Some(&preimage), None, None)?;
126 }
127
128 Ok(())
129 }
130
131 SubSwapStates::TransactionClaimed => {
133 debug!("Send Swap {id} has been claimed");
134
135 match swap.preimage {
136 Some(_) => {
137 debug!("The claim tx was a key path spend (cooperative claim)");
138 }
141 None => {
142 debug!("The claim tx was a script path spend (non-cooperative claim)");
143 let mut swaps = vec![Swap::Send(swap.clone())];
144 self.recoverer
145 .recover_from_onchain(&mut swaps, None)
146 .await?;
147
148 let Swap::Send(s) = swaps[0].clone() else {
149 return Err(anyhow!("Expected a Send swap"));
150 };
151 self.update_swap(s)?;
152 }
153 }
154
155 Ok(())
156 }
157
158 SubSwapStates::InvoicePaid => {
160 info!("Received `InvoicePaid` state from Boltz, saving invoice settlement time.");
161 let Some(lockup_tx_id) = swap.lockup_tx_id else {
162 bail!("Could not save invoice settlement time: no lockup tx id found.");
163 };
164 self.persister
165 .insert_or_update_payment_details(PaymentTxDetails {
166 tx_id: lockup_tx_id,
167 destination: swap.invoice,
168 settled_at: Some(utils::now()),
169 ..Default::default()
170 })
171 .map_err(|err| {
172 anyhow!(
173 "Could not persist invoice settlement time for Send Swap {id}: {err}"
174 )
175 })?;
176 Ok(())
177 }
178
179 SubSwapStates::TransactionLockupFailed
185 | SubSwapStates::InvoiceFailedToPay
186 | SubSwapStates::SwapExpired => {
187 match swap.lockup_tx_id {
188 Some(_) => match swap.refund_tx_id {
189 Some(refund_tx_id) => warn!(
190 "Refund tx for Send Swap {id} was already broadcast: txid {refund_tx_id}"
191 ),
192 None => {
193 warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has been broadcast.");
194 let refund_tx_id = match self.refund(&swap, true).await {
195 Ok(refund_tx_id) => Some(refund_tx_id),
196 Err(e) => {
197 warn!("Could not refund Send swap {id} cooperatively: {e:?}");
198 None
199 }
200 };
201 self.update_swap_info(
205 &swap.id,
206 RefundPending,
207 None,
208 None,
209 refund_tx_id.as_deref(),
210 )?;
211 }
212 },
213 None => {
216 warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has never been broadcast. Resolving payment as failed.");
217 self.update_swap_info(id, Failed, None, None, None)?;
218 }
219 }
220
221 Ok(())
222 }
223
224 _ => {
225 debug!("Unhandled state for Send Swap {id}: {swap_state:?}");
226 Ok(())
227 }
228 }
229 }
230
231 pub(crate) async fn try_lockup(
232 &self,
233 swap: &SendSwap,
234 create_response: &CreateSubmarineResponse,
235 ) -> Result<Transaction, PaymentError> {
236 if swap.lockup_tx_id.is_some() {
237 debug!("Lockup tx was already broadcast for Send Swap {}", swap.id);
238 return Err(PaymentError::PaymentInProgress);
239 }
240
241 let swap_id = &swap.id;
242 debug!(
243 "Initiated Send Swap: send {} sats to liquid address {}",
244 create_response.expected_amount, create_response.address
245 );
246
247 let lockup_tx = self
248 .onchain_wallet
249 .build_tx_or_drain_tx(
250 Some(LIQUID_FEE_RATE_MSAT_PER_VBYTE),
251 &create_response.address,
252 &self.config.lbtc_asset_id(),
253 create_response.expected_amount,
254 )
255 .await?;
256 let lockup_tx_id = lockup_tx.txid().to_string();
257
258 self.persister
259 .set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
260
261 info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",);
262
263 let broadcast_result = self.chain_service.broadcast(&lockup_tx).await;
264
265 if let Err(err) = broadcast_result {
266 debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}");
267 self.persister
268 .unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
269 return Err(err.into());
270 }
271
272 info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
273
274 let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum();
277 self.persister.insert_or_update_payment(
278 PaymentTxData {
279 tx_id: lockup_tx_id.clone(),
280 timestamp: Some(utils::now()),
281 fees_sat: lockup_tx_fees_sat,
282 is_confirmed: false,
283 unblinding_data: None,
284 },
285 &[PaymentTxBalance {
286 asset_id: self.config.lbtc_asset_id(),
287 amount: create_response.expected_amount,
288 payment_type: PaymentType::Send,
289 }],
290 None,
291 false,
292 )?;
293
294 self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None)?;
295
296 Ok(lockup_tx)
297 }
298
299 fn fetch_send_swap_by_id(&self, swap_id: &str) -> Result<SendSwap, PaymentError> {
300 self.persister
301 .fetch_send_swap_by_id(swap_id)
302 .map_err(|e| {
303 error!("Failed to fetch send swap by id: {e:?}");
304 PaymentError::PersistError
305 })?
306 .ok_or(PaymentError::Generic {
307 err: format!("Send Swap not found {swap_id}"),
308 })
309 }
310
311 pub(crate) fn update_swap(&self, updated_swap: SendSwap) -> Result<(), PaymentError> {
313 let swap_id = &updated_swap.id;
314 let swap = self.fetch_send_swap_by_id(swap_id)?;
315 let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
316 if updated_swap != swap || lnurl_info_updated {
317 info!(
318 "Updating Send swap {swap_id} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})",
319 updated_swap.state, updated_swap.lockup_tx_id, updated_swap.refund_tx_id
320 );
321 self.persister.insert_or_update_send_swap(&updated_swap)?;
322 let _ = self.subscription_notifier.send(swap_id.clone());
323 }
324
325 if updated_swap.state == Complete {
329 utils::update_invoice_settled_at(
330 &self.persister,
331 swap_id,
332 updated_swap.lockup_tx_id.as_ref(),
333 updated_swap.invoice.clone(),
334 );
335 }
336
337 Ok(())
338 }
339
340 pub(crate) fn update_swap_lnurl_info(
341 &self,
342 swap: &SendSwap,
343 updated_swap: &SendSwap,
344 ) -> Result<bool> {
345 if swap.preimage.is_none() {
346 let Some(tx_id) = updated_swap.lockup_tx_id.clone() else {
347 return Ok(false);
348 };
349 let Some(ref preimage_str) = updated_swap.preimage.clone() else {
350 return Ok(false);
351 };
352 if let Some(PaymentTxDetails {
353 destination,
354 description,
355 lnurl_info: Some(mut lnurl_info),
356 bip353_address,
357 ..
358 }) = self.persister.get_payment_details(&tx_id)?
359 {
360 if let Some(SuccessAction::Aes { data }) =
361 lnurl_info.lnurl_pay_unprocessed_success_action.clone()
362 {
363 debug!(
364 "Decrypting AES success action with preimage for Send Swap {}",
365 swap.id
366 );
367 let preimage = sha256::Hash::from_str(preimage_str)?;
368 let preimage_arr = preimage.to_byte_array();
369 let result = match (data, &preimage_arr).try_into() {
370 Ok(data) => AesSuccessActionDataResult::Decrypted { data },
371 Err(e) => AesSuccessActionDataResult::ErrorStatus {
372 reason: e.to_string(),
373 },
374 };
375 lnurl_info.lnurl_pay_success_action =
376 Some(SuccessActionProcessed::Aes { result });
377 self.persister
378 .insert_or_update_payment_details(PaymentTxDetails {
379 tx_id,
380 destination,
381 description,
382 lnurl_info: Some(lnurl_info),
383 bip353_address,
384 ..Default::default()
385 })?;
386 return Ok(true);
387 }
388 }
389 }
390 Ok(false)
391 }
392
393 pub(crate) fn update_swap_info(
395 &self,
396 swap_id: &str,
397 to_state: PaymentState,
398 preimage: Option<&str>,
399 lockup_tx_id: Option<&str>,
400 refund_tx_id: Option<&str>,
401 ) -> Result<(), PaymentError> {
402 info!(
403 "Transitioning Send swap {swap_id} to {to_state:?} (lockup_tx_id = {lockup_tx_id:?}, refund_tx_id = {refund_tx_id:?})"
404 );
405 let swap = self.fetch_send_swap_by_id(swap_id)?;
406 Self::validate_state_transition(swap.state, to_state)?;
407 self.persister.try_handle_send_swap_update(
408 swap_id,
409 to_state,
410 preimage,
411 lockup_tx_id,
412 refund_tx_id,
413 )?;
414 let updated_swap = self.fetch_send_swap_by_id(swap_id)?;
415 let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
416 if updated_swap != swap || lnurl_info_updated {
417 let _ = self.subscription_notifier.send(updated_swap.id);
418 }
419 Ok(())
420 }
421
422 async fn cooperate_claim(
423 &self,
424 send_swap: &SendSwap,
425 claim_tx_response: SubmarineClaimTxResponse,
426 ) -> Result<(), PaymentError> {
427 debug!(
428 "Claim is pending for Send Swap {}. Initiating cooperative claim",
429 &send_swap.id
430 );
431 let refund_address = match send_swap.refund_address {
432 Some(ref refund_address) => refund_address.clone(),
433 None => {
434 let address = self.onchain_wallet.next_unused_address().await?.to_string();
436 self.persister
437 .set_send_swap_refund_address(&send_swap.id, &address)?;
438 address
439 }
440 };
441
442 self.swapper
443 .claim_send_swap_cooperative(send_swap, claim_tx_response, &refund_address)
444 .await?;
445 Ok(())
446 }
447
448 pub(crate) async fn refund(
449 &self,
450 swap: &SendSwap,
451 is_cooperative: bool,
452 ) -> Result<String, PaymentError> {
453 info!(
454 "Initiating refund for Send Swap {}, is_cooperative: {is_cooperative}",
455 swap.id
456 );
457
458 let swap_script = swap.get_swap_script()?;
459 let refund_address = match swap.refund_address {
460 Some(ref refund_address) => refund_address.clone(),
461 None => {
462 let address = self.onchain_wallet.next_unused_address().await?.to_string();
464 self.persister
465 .set_send_swap_refund_address(&swap.id, &address)?;
466 address
467 }
468 };
469
470 let script_pk = swap_script
471 .to_address(self.config.network.into())
472 .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
473 .to_unconfidential()
474 .script_pubkey();
475 let utxos = self.chain_service.get_script_utxos(&script_pk).await?;
476 let SdkTransaction::Liquid(refund_tx) = self
477 .swapper
478 .create_refund_tx(
479 Swap::Send(swap.clone()),
480 &refund_address,
481 utxos,
482 None,
483 is_cooperative,
484 )
485 .await?
486 else {
487 return Err(PaymentError::Generic {
488 err: format!(
489 "Unexpected refund tx type returned for Send swap {}",
490 swap.id
491 ),
492 });
493 };
494 let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string();
495
496 info!(
497 "Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}",
498 swap.id
499 );
500
501 Ok(refund_tx_id)
502 }
503
504 async fn check_swap_expiry(&self, swap: &SendSwap) -> Result<bool> {
505 let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64);
506 let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?;
507 if duration_since_creation_time.as_secs() < 60 * 10 {
508 return Ok(false);
509 }
510
511 let swap_script = swap.get_swap_script()?;
512 let current_height = self.onchain_wallet.tip().await;
513 let locktime_from_height = LockTime::from_height(current_height)?;
514
515 info!("Checking Send Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime);
516 Ok(utils::is_locktime_expired(
517 locktime_from_height,
518 swap_script.locktime,
519 ))
520 }
521
522 pub(crate) async fn try_refund_all(&self, swaps: &[SendSwap]) {
524 for swap in swaps {
525 if swap.refund_tx_id.is_some() {
526 continue;
527 }
528
529 let has_swap_expired = self.check_swap_expiry(swap).await.unwrap_or(false);
530
531 if !has_swap_expired && swap.state == Pending {
532 continue;
533 }
534
535 let refund_tx_id_result = match swap.state {
536 Pending => self.refund(swap, false).await,
537 RefundPending => match has_swap_expired {
538 true => {
539 self.refund(swap, true)
540 .or_else(|e| {
541 warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
542 self.refund(swap, false)
543 })
544 .await
545 }
546 false => self.refund(swap, true).await,
547 },
548 _ => {
549 continue;
550 }
551 };
552
553 if let Ok(refund_tx_id) = refund_tx_id_result {
554 let update_swap_info_result =
555 self.update_swap_info(&swap.id, RefundPending, None, None, Some(&refund_tx_id));
556 if let Err(err) = update_swap_info_result {
557 warn!(
558 "Could not update Send swap {} information, error: {err:?}",
559 swap.id
560 );
561 };
562 }
563 }
564 }
565
566 pub(crate) async fn check_refunds(&self) -> Result<(), PaymentError> {
569 let pending_swaps = self.persister.list_pending_send_swaps()?;
570 self.try_refund_all(&pending_swaps).await;
571 Ok(())
572 }
573
574 fn validate_state_transition(
575 from_state: PaymentState,
576 to_state: PaymentState,
577 ) -> Result<(), PaymentError> {
578 match (from_state, to_state) {
579 (TimedOut, Created) => Ok(()),
580 (_, Created) => Err(PaymentError::Generic {
581 err: format!("Cannot transition from {from_state:?} to Created state"),
582 }),
583
584 (Created | Pending, Pending) => Ok(()),
585 (_, Pending) => Err(PaymentError::Generic {
586 err: format!("Cannot transition from {from_state:?} to Pending state"),
587 }),
588
589 (Created | Pending, Complete) => Ok(()),
590 (_, Complete) => Err(PaymentError::Generic {
591 err: format!("Cannot transition from {from_state:?} to Complete state"),
592 }),
593
594 (Created | TimedOut, TimedOut) => Ok(()),
595 (_, TimedOut) => Err(PaymentError::Generic {
596 err: format!("Cannot transition from {from_state:?} to TimedOut state"),
597 }),
598
599 (_, Refundable) => Err(PaymentError::Generic {
600 err: format!("Cannot transition from {from_state:?} to Refundable state"),
601 }),
602
603 (Pending, RefundPending) => Ok(()),
604 (_, RefundPending) => Err(PaymentError::Generic {
605 err: format!("Cannot transition from {from_state:?} to RefundPending state"),
606 }),
607
608 (Complete, Failed) => Err(PaymentError::Generic {
609 err: format!("Cannot transition from {from_state:?} to Failed state"),
610 }),
611 (_, Failed) => Ok(()),
612
613 (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
614 err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
615 }),
616 }
617 }
618}
619
620#[cfg(test)]
621mod tests {
622 use std::collections::{HashMap, HashSet};
623
624 use anyhow::Result;
625
626 use crate::{
627 model::PaymentState::{self, *},
628 test_utils::{
629 persist::{create_persister, new_send_swap},
630 send_swap::new_send_swap_handler,
631 },
632 };
633
634 #[cfg(feature = "browser-tests")]
635 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
636
637 #[sdk_macros::async_test_all]
638 async fn test_send_swap_state_transitions() -> Result<()> {
639 create_persister!(storage);
640 let send_swap_handler = new_send_swap_handler(storage.clone())?;
641
642 let valid_combinations = HashMap::from([
644 (
645 Created,
646 HashSet::from([Pending, Complete, TimedOut, Failed]),
647 ),
648 (
649 Pending,
650 HashSet::from([Pending, RefundPending, Complete, Failed]),
651 ),
652 (TimedOut, HashSet::from([TimedOut, Created, Failed])),
653 (Complete, HashSet::from([])),
654 (Refundable, HashSet::from([Failed])),
655 (Failed, HashSet::from([Failed])),
656 ]);
657
658 for (first_state, allowed_states) in valid_combinations.iter() {
659 for allowed_state in allowed_states {
660 let send_swap = new_send_swap(Some(*first_state), None);
661 storage.insert_or_update_send_swap(&send_swap)?;
662
663 assert!(send_swap_handler
664 .update_swap_info(&send_swap.id, *allowed_state, None, None, None)
665 .is_ok());
666 }
667 }
668
669 let all_states = HashSet::from([Created, Pending, Complete, TimedOut, Failed]);
671 let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
672 .iter()
673 .map(|(first_state, allowed_states)| {
674 (
675 *first_state,
676 all_states.difference(allowed_states).cloned().collect(),
677 )
678 })
679 .collect();
680
681 for (first_state, disallowed_states) in invalid_combinations.iter() {
682 for disallowed_state in disallowed_states {
683 let send_swap = new_send_swap(Some(*first_state), None);
684 storage.insert_or_update_send_swap(&send_swap)?;
685
686 assert!(send_swap_handler
687 .update_swap_info(&send_swap.id, *disallowed_state, None, None, None)
688 .is_err());
689 }
690 }
691
692 Ok(())
693 }
694}