1use std::str::FromStr;
2use std::time::Duration;
3
4use anyhow::{anyhow, Result};
5use boltz_client::boltz::SubmarineClaimTxResponse;
6use boltz_client::swaps::boltz;
7use boltz_client::swaps::{boltz::CreateSubmarineResponse, boltz::SubSwapStates};
8use futures_util::TryFutureExt;
9use log::{debug, info, warn};
10use lwk_wollet::elements::{LockTime, Transaction};
11use lwk_wollet::hashes::{sha256, Hash};
12use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed};
13use sdk_common::utils::Arc;
14use tokio::sync::broadcast;
15use web_time::{SystemTime, UNIX_EPOCH};
16
17use crate::chain::liquid::LiquidChainService;
18use crate::model::{
19 BlockListener, Config, PaymentState::*, SendSwap, LIQUID_FEE_RATE_MSAT_PER_VBYTE,
20};
21use crate::persist::model::PaymentTxDetails;
22use crate::prelude::{PaymentTxData, PaymentType, Swap};
23use crate::recover::recoverer::Recoverer;
24use crate::swapper::Swapper;
25use crate::utils;
26use crate::wallet::OnchainWallet;
27use crate::{
28 error::PaymentError,
29 model::{PaymentState, Transaction as SdkTransaction},
30 persist::Persister,
31};
32
33#[derive(Clone)]
34pub(crate) struct SendSwapHandler {
35 config: Config,
36 onchain_wallet: Arc<dyn OnchainWallet>,
37 persister: std::sync::Arc<Persister>,
38 swapper: Arc<dyn Swapper>,
39 chain_service: Arc<dyn LiquidChainService>,
40 subscription_notifier: broadcast::Sender<String>,
41 recoverer: Arc<Recoverer>,
42}
43
44#[sdk_macros::async_trait]
45impl BlockListener for SendSwapHandler {
46 async fn on_bitcoin_block(&self, _height: u32) {}
47
48 async fn on_liquid_block(&self, _height: u32) {
49 if let Err(err) = self.check_refunds().await {
50 warn!("Could not refund expired swaps, error: {err:?}");
51 }
52 }
53}
54
55impl SendSwapHandler {
56 pub(crate) fn new(
57 config: Config,
58 onchain_wallet: Arc<dyn OnchainWallet>,
59 persister: std::sync::Arc<Persister>,
60 swapper: Arc<dyn Swapper>,
61 chain_service: Arc<dyn LiquidChainService>,
62 recoverer: Arc<Recoverer>,
63 ) -> Self {
64 let (subscription_notifier, _) = broadcast::channel::<String>(30);
65 Self {
66 config,
67 onchain_wallet,
68 persister,
69 swapper,
70 chain_service,
71 subscription_notifier,
72 recoverer,
73 }
74 }
75
76 pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
77 self.subscription_notifier.subscribe()
78 }
79
80 pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
82 let id = &update.id;
83 let status = &update.status;
84 let swap_state = SubSwapStates::from_str(status)
85 .map_err(|_| anyhow!("Invalid SubSwapState for Send Swap {id}: {status}"))?;
86 let swap = self.fetch_send_swap_by_id(id)?;
87 info!("Handling Send Swap transition to {swap_state:?} for swap {id}");
88
89 match swap_state {
91 SubSwapStates::InvoiceSet => {
93 warn!("Received `invoice.set` state for Send Swap {id}");
94 Ok(())
95 }
96
97 SubSwapStates::TransactionClaimPending => {
100 if swap.metadata.is_local {
101 let preimage = match self.swapper.get_send_claim_tx_details(&swap).await {
102 Ok(claim_tx_response) => {
103 match self.cooperate_claim(&swap, claim_tx_response.clone()).await {
104 Ok(_) => Some(claim_tx_response.preimage),
105 Err(e) => {
106 warn!("Could not cooperate Send Swap {id} claim: {e:?}");
107 None
108 }
109 }
110 }
111 Err(e) => {
112 warn!("Could not get claim tx details for Send Swap {id}: {e:?}");
113 None
114 }
115 };
116 let preimage = match preimage {
117 Some(preimage) => preimage,
118 None => {
119 let preimage = self.swapper.get_submarine_preimage(&swap.id).await?;
120 utils::verify_payment_hash(&preimage, &swap.invoice)?;
121 info!("Fetched Send Swap {id} preimage cooperatively");
122 preimage
123 }
124 };
125 self.update_swap_info(&swap.id, Complete, Some(&preimage), None, None)?;
126 }
127
128 Ok(())
129 }
130
131 SubSwapStates::TransactionClaimed => {
133 debug!("Send Swap {id} has been claimed");
134
135 match swap.preimage {
136 Some(_) => {
137 debug!("The claim tx was a key path spend (cooperative claim)");
138 }
141 None => {
142 debug!("The claim tx was a script path spend (non-cooperative claim)");
143 let mut swaps = vec![Swap::Send(swap.clone())];
144 self.recoverer
145 .recover_from_onchain(&mut swaps, None)
146 .await?;
147
148 let Swap::Send(s) = swaps[0].clone() else {
149 return Err(anyhow!("Expected a Send swap"));
150 };
151 self.update_swap(s)?;
152 }
153 }
154
155 Ok(())
156 }
157
158 SubSwapStates::TransactionLockupFailed
164 | SubSwapStates::InvoiceFailedToPay
165 | SubSwapStates::SwapExpired => {
166 match swap.lockup_tx_id {
167 Some(_) => match swap.refund_tx_id {
168 Some(refund_tx_id) => warn!(
169 "Refund tx for Send Swap {id} was already broadcast: txid {refund_tx_id}"
170 ),
171 None => {
172 warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has been broadcast.");
173 let refund_tx_id = match self.refund(&swap, true).await {
174 Ok(refund_tx_id) => Some(refund_tx_id),
175 Err(e) => {
176 warn!("Could not refund Send swap {id} cooperatively: {e:?}");
177 None
178 }
179 };
180 self.update_swap_info(
184 &swap.id,
185 RefundPending,
186 None,
187 None,
188 refund_tx_id.as_deref(),
189 )?;
190 }
191 },
192 None => {
195 warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has never been broadcast. Resolving payment as failed.");
196 self.update_swap_info(id, Failed, None, None, None)?;
197 }
198 }
199
200 Ok(())
201 }
202
203 _ => {
204 debug!("Unhandled state for Send Swap {id}: {swap_state:?}");
205 Ok(())
206 }
207 }
208 }
209
210 pub(crate) async fn try_lockup(
211 &self,
212 swap: &SendSwap,
213 create_response: &CreateSubmarineResponse,
214 ) -> Result<Transaction, PaymentError> {
215 if swap.lockup_tx_id.is_some() {
216 debug!("Lockup tx was already broadcast for Send Swap {}", swap.id);
217 return Err(PaymentError::PaymentInProgress);
218 }
219
220 let swap_id = &swap.id;
221 debug!(
222 "Initiated Send Swap: send {} sats to liquid address {}",
223 create_response.expected_amount, create_response.address
224 );
225
226 let lockup_tx = self
227 .onchain_wallet
228 .build_tx_or_drain_tx(
229 Some(LIQUID_FEE_RATE_MSAT_PER_VBYTE),
230 &create_response.address,
231 &self.config.lbtc_asset_id(),
232 create_response.expected_amount,
233 )
234 .await?;
235 let lockup_tx_id = lockup_tx.txid().to_string();
236
237 self.persister
238 .set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
239
240 info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",);
241
242 let broadcast_result = self.chain_service.broadcast(&lockup_tx).await;
243
244 if let Err(err) = broadcast_result {
245 debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}");
246 self.persister
247 .unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
248 return Err(err.into());
249 }
250
251 info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
252
253 let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum();
256 self.persister.insert_or_update_payment(
257 PaymentTxData {
258 tx_id: lockup_tx_id.clone(),
259 timestamp: Some(utils::now()),
260 asset_id: self.config.lbtc_asset_id(),
261 amount: create_response.expected_amount,
262 fees_sat: lockup_tx_fees_sat,
263 payment_type: PaymentType::Send,
264 is_confirmed: false,
265 unblinding_data: None,
266 },
267 None,
268 false,
269 )?;
270
271 self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None)?;
272
273 Ok(lockup_tx)
274 }
275
276 fn fetch_send_swap_by_id(&self, swap_id: &str) -> Result<SendSwap, PaymentError> {
277 self.persister
278 .fetch_send_swap_by_id(swap_id)
279 .map_err(|_| PaymentError::PersistError)?
280 .ok_or(PaymentError::Generic {
281 err: format!("Send Swap not found {swap_id}"),
282 })
283 }
284
285 pub(crate) fn update_swap(&self, updated_swap: SendSwap) -> Result<(), PaymentError> {
287 let swap = self.fetch_send_swap_by_id(&updated_swap.id)?;
288 let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
289 if updated_swap != swap || lnurl_info_updated {
290 info!(
291 "Updating Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})",
292 updated_swap.id,
293 updated_swap.state,
294 updated_swap.lockup_tx_id,
295 updated_swap.refund_tx_id
296 );
297 self.persister.insert_or_update_send_swap(&updated_swap)?;
298 let _ = self.subscription_notifier.send(updated_swap.id);
299 }
300 Ok(())
301 }
302
303 pub(crate) fn update_swap_lnurl_info(
304 &self,
305 swap: &SendSwap,
306 updated_swap: &SendSwap,
307 ) -> Result<bool> {
308 if swap.preimage.is_none() {
309 let Some(tx_id) = updated_swap.lockup_tx_id.clone() else {
310 return Ok(false);
311 };
312 let Some(ref preimage_str) = updated_swap.preimage.clone() else {
313 return Ok(false);
314 };
315 if let Some(PaymentTxDetails {
316 destination,
317 description,
318 lnurl_info: Some(mut lnurl_info),
319 bip353_address,
320 ..
321 }) = self.persister.get_payment_details(&tx_id)?
322 {
323 if let Some(SuccessAction::Aes { data }) =
324 lnurl_info.lnurl_pay_unprocessed_success_action.clone()
325 {
326 debug!(
327 "Decrypting AES success action with preimage for Send Swap {}",
328 swap.id
329 );
330 let preimage = sha256::Hash::from_str(preimage_str)?;
331 let preimage_arr = preimage.to_byte_array();
332 let result = match (data, &preimage_arr).try_into() {
333 Ok(data) => AesSuccessActionDataResult::Decrypted { data },
334 Err(e) => AesSuccessActionDataResult::ErrorStatus {
335 reason: e.to_string(),
336 },
337 };
338 lnurl_info.lnurl_pay_success_action =
339 Some(SuccessActionProcessed::Aes { result });
340 self.persister
341 .insert_or_update_payment_details(PaymentTxDetails {
342 tx_id,
343 destination,
344 description,
345 lnurl_info: Some(lnurl_info),
346 bip353_address,
347 ..Default::default()
348 })?;
349 return Ok(true);
350 }
351 }
352 }
353 Ok(false)
354 }
355
356 pub(crate) fn update_swap_info(
358 &self,
359 swap_id: &str,
360 to_state: PaymentState,
361 preimage: Option<&str>,
362 lockup_tx_id: Option<&str>,
363 refund_tx_id: Option<&str>,
364 ) -> Result<(), PaymentError> {
365 info!(
366 "Transitioning Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})",
367 swap_id, to_state, lockup_tx_id, refund_tx_id
368 );
369 let swap = self.fetch_send_swap_by_id(swap_id)?;
370 Self::validate_state_transition(swap.state, to_state)?;
371 self.persister.try_handle_send_swap_update(
372 swap_id,
373 to_state,
374 preimage,
375 lockup_tx_id,
376 refund_tx_id,
377 )?;
378 let updated_swap = self.fetch_send_swap_by_id(swap_id)?;
379 let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
380 if updated_swap != swap || lnurl_info_updated {
381 let _ = self.subscription_notifier.send(updated_swap.id);
382 }
383 Ok(())
384 }
385
386 async fn cooperate_claim(
387 &self,
388 send_swap: &SendSwap,
389 claim_tx_response: SubmarineClaimTxResponse,
390 ) -> Result<(), PaymentError> {
391 debug!(
392 "Claim is pending for Send Swap {}. Initiating cooperative claim",
393 &send_swap.id
394 );
395 let refund_address = match send_swap.refund_address {
396 Some(ref refund_address) => refund_address.clone(),
397 None => {
398 let address = self.onchain_wallet.next_unused_address().await?.to_string();
400 self.persister
401 .set_send_swap_refund_address(&send_swap.id, &address)?;
402 address
403 }
404 };
405
406 self.swapper
407 .claim_send_swap_cooperative(send_swap, claim_tx_response, &refund_address)
408 .await?;
409 Ok(())
410 }
411
412 pub(crate) async fn refund(
413 &self,
414 swap: &SendSwap,
415 is_cooperative: bool,
416 ) -> Result<String, PaymentError> {
417 info!(
418 "Initiating refund for Send Swap {}, is_cooperative: {is_cooperative}",
419 swap.id
420 );
421
422 let swap_script = swap.get_swap_script()?;
423 let refund_address = match swap.refund_address {
424 Some(ref refund_address) => refund_address.clone(),
425 None => {
426 let address = self.onchain_wallet.next_unused_address().await?.to_string();
428 self.persister
429 .set_send_swap_refund_address(&swap.id, &address)?;
430 address
431 }
432 };
433
434 let script_pk = swap_script
435 .to_address(self.config.network.into())
436 .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
437 .to_unconfidential()
438 .script_pubkey();
439 let utxos = self.chain_service.get_script_utxos(&script_pk).await?;
440 let SdkTransaction::Liquid(refund_tx) = self
441 .swapper
442 .create_refund_tx(
443 Swap::Send(swap.clone()),
444 &refund_address,
445 utxos,
446 None,
447 is_cooperative,
448 )
449 .await?
450 else {
451 return Err(PaymentError::Generic {
452 err: format!(
453 "Unexpected refund tx type returned for Send swap {}",
454 swap.id
455 ),
456 });
457 };
458 let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string();
459
460 info!(
461 "Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}",
462 swap.id
463 );
464
465 Ok(refund_tx_id)
466 }
467
468 async fn check_swap_expiry(&self, swap: &SendSwap) -> Result<bool> {
469 let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64);
470 let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?;
471 if duration_since_creation_time.as_secs() < 60 * 10 {
472 return Ok(false);
473 }
474
475 let swap_script = swap.get_swap_script()?;
476 let current_height = self.onchain_wallet.tip().await;
477 let locktime_from_height = LockTime::from_height(current_height)?;
478
479 info!("Checking Send Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime);
480 Ok(utils::is_locktime_expired(
481 locktime_from_height,
482 swap_script.locktime,
483 ))
484 }
485
486 pub(crate) async fn try_refund_all(&self, swaps: &[SendSwap]) {
488 for swap in swaps {
489 if swap.refund_tx_id.is_some() {
490 continue;
491 }
492
493 let has_swap_expired = self.check_swap_expiry(swap).await.unwrap_or(false);
494
495 if !has_swap_expired && swap.state == Pending {
496 continue;
497 }
498
499 let refund_tx_id_result = match swap.state {
500 Pending => self.refund(swap, false).await,
501 RefundPending => match has_swap_expired {
502 true => {
503 self.refund(swap, true)
504 .or_else(|e| {
505 warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
506 self.refund(swap, false)
507 })
508 .await
509 }
510 false => self.refund(swap, true).await,
511 },
512 _ => {
513 continue;
514 }
515 };
516
517 if let Ok(refund_tx_id) = refund_tx_id_result {
518 let update_swap_info_result =
519 self.update_swap_info(&swap.id, RefundPending, None, None, Some(&refund_tx_id));
520 if let Err(err) = update_swap_info_result {
521 warn!(
522 "Could not update Send swap {} information, error: {err:?}",
523 swap.id
524 );
525 };
526 }
527 }
528 }
529
530 pub(crate) async fn check_refunds(&self) -> Result<(), PaymentError> {
533 let pending_swaps = self.persister.list_pending_send_swaps()?;
534 self.try_refund_all(&pending_swaps).await;
535 Ok(())
536 }
537
538 fn validate_state_transition(
539 from_state: PaymentState,
540 to_state: PaymentState,
541 ) -> Result<(), PaymentError> {
542 match (from_state, to_state) {
543 (TimedOut, Created) => Ok(()),
544 (_, Created) => Err(PaymentError::Generic {
545 err: format!("Cannot transition from {from_state:?} to Created state"),
546 }),
547
548 (Created | Pending, Pending) => Ok(()),
549 (_, Pending) => Err(PaymentError::Generic {
550 err: format!("Cannot transition from {from_state:?} to Pending state"),
551 }),
552
553 (Created | Pending, Complete) => Ok(()),
554 (_, Complete) => Err(PaymentError::Generic {
555 err: format!("Cannot transition from {from_state:?} to Complete state"),
556 }),
557
558 (Created | TimedOut, TimedOut) => Ok(()),
559 (_, TimedOut) => Err(PaymentError::Generic {
560 err: format!("Cannot transition from {from_state:?} to TimedOut state"),
561 }),
562
563 (_, Refundable) => Err(PaymentError::Generic {
564 err: format!("Cannot transition from {from_state:?} to Refundable state"),
565 }),
566
567 (Pending, RefundPending) => Ok(()),
568 (_, RefundPending) => Err(PaymentError::Generic {
569 err: format!("Cannot transition from {from_state:?} to RefundPending state"),
570 }),
571
572 (Complete, Failed) => Err(PaymentError::Generic {
573 err: format!("Cannot transition from {from_state:?} to Failed state"),
574 }),
575 (_, Failed) => Ok(()),
576
577 (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
578 err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
579 }),
580 }
581 }
582}
583
584#[cfg(test)]
585mod tests {
586 use std::collections::{HashMap, HashSet};
587
588 use anyhow::Result;
589
590 use crate::{
591 model::PaymentState::{self, *},
592 test_utils::{
593 persist::{create_persister, new_send_swap},
594 send_swap::new_send_swap_handler,
595 },
596 };
597
598 #[cfg(feature = "browser-tests")]
599 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
600
601 #[sdk_macros::async_test_all]
602 async fn test_send_swap_state_transitions() -> Result<()> {
603 create_persister!(storage);
604 let send_swap_handler = new_send_swap_handler(storage.clone())?;
605
606 let valid_combinations = HashMap::from([
608 (
609 Created,
610 HashSet::from([Pending, Complete, TimedOut, Failed]),
611 ),
612 (
613 Pending,
614 HashSet::from([Pending, RefundPending, Complete, Failed]),
615 ),
616 (TimedOut, HashSet::from([TimedOut, Created, Failed])),
617 (Complete, HashSet::from([])),
618 (Refundable, HashSet::from([Failed])),
619 (Failed, HashSet::from([Failed])),
620 ]);
621
622 for (first_state, allowed_states) in valid_combinations.iter() {
623 for allowed_state in allowed_states {
624 let send_swap = new_send_swap(Some(*first_state), None);
625 storage.insert_or_update_send_swap(&send_swap)?;
626
627 assert!(send_swap_handler
628 .update_swap_info(&send_swap.id, *allowed_state, None, None, None)
629 .is_ok());
630 }
631 }
632
633 let all_states = HashSet::from([Created, Pending, Complete, TimedOut, Failed]);
635 let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
636 .iter()
637 .map(|(first_state, allowed_states)| {
638 (
639 *first_state,
640 all_states.difference(allowed_states).cloned().collect(),
641 )
642 })
643 .collect();
644
645 for (first_state, disallowed_states) in invalid_combinations.iter() {
646 for disallowed_state in disallowed_states {
647 let send_swap = new_send_swap(Some(*first_state), None);
648 storage.insert_or_update_send_swap(&send_swap)?;
649
650 assert!(send_swap_handler
651 .update_swap_info(&send_swap.id, *disallowed_state, None, None, None)
652 .is_err());
653 }
654 }
655
656 Ok(())
657 }
658}