1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, Result};
6use boltz_client::boltz::SubmarineClaimTxResponse;
7use boltz_client::swaps::boltz;
8use boltz_client::swaps::{boltz::CreateSubmarineResponse, boltz::SubSwapStates};
9use futures_util::TryFutureExt;
10use log::{debug, error, info, warn};
11use lwk_wollet::elements::{LockTime, Transaction};
12use lwk_wollet::hashes::{sha256, Hash};
13use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed};
14use tokio::sync::broadcast;
15use web_time::{SystemTime, UNIX_EPOCH};
16
17use crate::chain::liquid::LiquidChainService;
18use crate::model::{
19 BlockListener, Config, PaymentState::*, SendSwap, LIQUID_FEE_RATE_MSAT_PER_VBYTE,
20};
21use crate::persist::model::{PaymentTxBalance, PaymentTxDetails};
22use crate::prelude::{PaymentTxData, PaymentType, Swap};
23use crate::recover::recoverer::Recoverer;
24use crate::swapper::Swapper;
25use crate::utils;
26use crate::wallet::OnchainWallet;
27use crate::{
28 error::PaymentError,
29 model::{PaymentState, Transaction as SdkTransaction},
30 persist::Persister,
31};
32
33#[derive(Clone)]
34pub(crate) struct SendSwapHandler {
35 config: Config,
36 onchain_wallet: Arc<dyn OnchainWallet>,
37 persister: std::sync::Arc<Persister>,
38 swapper: Arc<dyn Swapper>,
39 chain_service: Arc<dyn LiquidChainService>,
40 subscription_notifier: broadcast::Sender<String>,
41 recoverer: Arc<Recoverer>,
42}
43
44#[sdk_macros::async_trait]
45impl BlockListener for SendSwapHandler {
46 async fn on_bitcoin_block(&self, _height: u32) {}
47
48 async fn on_liquid_block(&self, _height: u32) {
49 if let Err(err) = self.check_refunds().await {
50 warn!("Could not refund expired swaps, error: {err:?}");
51 }
52 }
53}
54
55impl SendSwapHandler {
56 pub(crate) fn new(
57 config: Config,
58 onchain_wallet: Arc<dyn OnchainWallet>,
59 persister: std::sync::Arc<Persister>,
60 swapper: Arc<dyn Swapper>,
61 chain_service: Arc<dyn LiquidChainService>,
62 recoverer: Arc<Recoverer>,
63 ) -> Self {
64 let (subscription_notifier, _) = broadcast::channel::<String>(30);
65 Self {
66 config,
67 onchain_wallet,
68 persister,
69 swapper,
70 chain_service,
71 subscription_notifier,
72 recoverer,
73 }
74 }
75
76 pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
77 self.subscription_notifier.subscribe()
78 }
79
80 pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
82 let id = &update.id;
83 let status = &update.status;
84 let swap_state = SubSwapStates::from_str(status)
85 .map_err(|_| anyhow!("Invalid SubSwapState for Send Swap {id}: {status}"))?;
86 let swap = self.fetch_send_swap_by_id(id)?;
87 info!("Handling Send Swap transition to {swap_state:?} for swap {id}");
88
89 match swap_state {
91 SubSwapStates::InvoiceSet => {
93 warn!("Received `invoice.set` state for Send Swap {id}");
94 Ok(())
95 }
96
97 SubSwapStates::TransactionClaimPending => {
100 if swap.metadata.is_local {
101 let preimage = match self.swapper.get_send_claim_tx_details(&swap).await {
102 Ok(claim_tx_response) => {
103 match self.cooperate_claim(&swap, claim_tx_response.clone()).await {
104 Ok(_) => Some(claim_tx_response.preimage),
105 Err(e) => {
106 warn!("Could not cooperate Send Swap {id} claim: {e:?}");
107 None
108 }
109 }
110 }
111 Err(e) => {
112 warn!("Could not get claim tx details for Send Swap {id}: {e:?}");
113 None
114 }
115 };
116 let preimage = match preimage {
117 Some(preimage) => preimage,
118 None => {
119 let preimage = self.swapper.get_submarine_preimage(&swap.id).await?;
120 utils::verify_payment_hash(&preimage, &swap.invoice)?;
121 info!("Fetched Send Swap {id} preimage cooperatively");
122 preimage
123 }
124 };
125 self.update_swap_info(&swap.id, Complete, Some(&preimage), None, None)?;
126 }
127
128 Ok(())
129 }
130
131 SubSwapStates::TransactionClaimed => {
133 debug!("Send Swap {id} has been claimed");
134
135 match swap.preimage {
136 Some(_) => {
137 debug!("The claim tx was a key path spend (cooperative claim)");
138 }
141 None => {
142 debug!("The claim tx was a script path spend (non-cooperative claim)");
143 let mut swaps = vec![Swap::Send(swap.clone())];
144 self.recoverer
145 .recover_from_onchain(&mut swaps, None)
146 .await?;
147
148 let Swap::Send(s) = swaps[0].clone() else {
149 return Err(anyhow!("Expected a Send swap"));
150 };
151 self.update_swap(s)?;
152 }
153 }
154
155 Ok(())
156 }
157
158 SubSwapStates::TransactionLockupFailed
164 | SubSwapStates::InvoiceFailedToPay
165 | SubSwapStates::SwapExpired => {
166 match swap.lockup_tx_id {
167 Some(_) => match swap.refund_tx_id {
168 Some(refund_tx_id) => warn!(
169 "Refund tx for Send Swap {id} was already broadcast: txid {refund_tx_id}"
170 ),
171 None => {
172 warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has been broadcast.");
173 let refund_tx_id = match self.refund(&swap, true).await {
174 Ok(refund_tx_id) => Some(refund_tx_id),
175 Err(e) => {
176 warn!("Could not refund Send swap {id} cooperatively: {e:?}");
177 None
178 }
179 };
180 self.update_swap_info(
184 &swap.id,
185 RefundPending,
186 None,
187 None,
188 refund_tx_id.as_deref(),
189 )?;
190 }
191 },
192 None => {
195 warn!("Send Swap {id} is in an unrecoverable state: {swap_state:?}, and lockup tx has never been broadcast. Resolving payment as failed.");
196 self.update_swap_info(id, Failed, None, None, None)?;
197 }
198 }
199
200 Ok(())
201 }
202
203 _ => {
204 debug!("Unhandled state for Send Swap {id}: {swap_state:?}");
205 Ok(())
206 }
207 }
208 }
209
210 pub(crate) async fn try_lockup(
211 &self,
212 swap: &SendSwap,
213 create_response: &CreateSubmarineResponse,
214 ) -> Result<Transaction, PaymentError> {
215 if swap.lockup_tx_id.is_some() {
216 debug!("Lockup tx was already broadcast for Send Swap {}", swap.id);
217 return Err(PaymentError::PaymentInProgress);
218 }
219
220 let swap_id = &swap.id;
221 debug!(
222 "Initiated Send Swap: send {} sats to liquid address {}",
223 create_response.expected_amount, create_response.address
224 );
225
226 let lockup_tx = self
227 .onchain_wallet
228 .build_tx_or_drain_tx(
229 Some(LIQUID_FEE_RATE_MSAT_PER_VBYTE),
230 &create_response.address,
231 &self.config.lbtc_asset_id(),
232 create_response.expected_amount,
233 )
234 .await?;
235 let lockup_tx_id = lockup_tx.txid().to_string();
236
237 self.persister
238 .set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
239
240 info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",);
241
242 let broadcast_result = self.chain_service.broadcast(&lockup_tx).await;
243
244 if let Err(err) = broadcast_result {
245 debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}");
246 self.persister
247 .unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
248 return Err(err.into());
249 }
250
251 info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
252
253 let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum();
256 self.persister.insert_or_update_payment(
257 PaymentTxData {
258 tx_id: lockup_tx_id.clone(),
259 timestamp: Some(utils::now()),
260 fees_sat: lockup_tx_fees_sat,
261 is_confirmed: false,
262 unblinding_data: None,
263 },
264 &[PaymentTxBalance {
265 asset_id: self.config.lbtc_asset_id(),
266 amount: create_response.expected_amount,
267 payment_type: PaymentType::Send,
268 }],
269 None,
270 false,
271 )?;
272
273 self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None)?;
274
275 Ok(lockup_tx)
276 }
277
278 fn fetch_send_swap_by_id(&self, swap_id: &str) -> Result<SendSwap, PaymentError> {
279 self.persister
280 .fetch_send_swap_by_id(swap_id)
281 .map_err(|e| {
282 error!("Failed to fetch send swap by id: {e:?}");
283 PaymentError::PersistError
284 })?
285 .ok_or(PaymentError::Generic {
286 err: format!("Send Swap not found {swap_id}"),
287 })
288 }
289
290 pub(crate) fn update_swap(&self, updated_swap: SendSwap) -> Result<(), PaymentError> {
292 let swap = self.fetch_send_swap_by_id(&updated_swap.id)?;
293 let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
294 if updated_swap != swap || lnurl_info_updated {
295 info!(
296 "Updating Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})",
297 updated_swap.id,
298 updated_swap.state,
299 updated_swap.lockup_tx_id,
300 updated_swap.refund_tx_id
301 );
302 self.persister.insert_or_update_send_swap(&updated_swap)?;
303 let _ = self.subscription_notifier.send(updated_swap.id);
304 }
305 Ok(())
306 }
307
308 pub(crate) fn update_swap_lnurl_info(
309 &self,
310 swap: &SendSwap,
311 updated_swap: &SendSwap,
312 ) -> Result<bool> {
313 if swap.preimage.is_none() {
314 let Some(tx_id) = updated_swap.lockup_tx_id.clone() else {
315 return Ok(false);
316 };
317 let Some(ref preimage_str) = updated_swap.preimage.clone() else {
318 return Ok(false);
319 };
320 if let Some(PaymentTxDetails {
321 destination,
322 description,
323 lnurl_info: Some(mut lnurl_info),
324 bip353_address,
325 ..
326 }) = self.persister.get_payment_details(&tx_id)?
327 {
328 if let Some(SuccessAction::Aes { data }) =
329 lnurl_info.lnurl_pay_unprocessed_success_action.clone()
330 {
331 debug!(
332 "Decrypting AES success action with preimage for Send Swap {}",
333 swap.id
334 );
335 let preimage = sha256::Hash::from_str(preimage_str)?;
336 let preimage_arr = preimage.to_byte_array();
337 let result = match (data, &preimage_arr).try_into() {
338 Ok(data) => AesSuccessActionDataResult::Decrypted { data },
339 Err(e) => AesSuccessActionDataResult::ErrorStatus {
340 reason: e.to_string(),
341 },
342 };
343 lnurl_info.lnurl_pay_success_action =
344 Some(SuccessActionProcessed::Aes { result });
345 self.persister
346 .insert_or_update_payment_details(PaymentTxDetails {
347 tx_id,
348 destination,
349 description,
350 lnurl_info: Some(lnurl_info),
351 bip353_address,
352 ..Default::default()
353 })?;
354 return Ok(true);
355 }
356 }
357 }
358 Ok(false)
359 }
360
361 pub(crate) fn update_swap_info(
363 &self,
364 swap_id: &str,
365 to_state: PaymentState,
366 preimage: Option<&str>,
367 lockup_tx_id: Option<&str>,
368 refund_tx_id: Option<&str>,
369 ) -> Result<(), PaymentError> {
370 info!(
371 "Transitioning Send swap {swap_id} to {to_state:?} (lockup_tx_id = {lockup_tx_id:?}, refund_tx_id = {refund_tx_id:?})"
372 );
373 let swap = self.fetch_send_swap_by_id(swap_id)?;
374 Self::validate_state_transition(swap.state, to_state)?;
375 self.persister.try_handle_send_swap_update(
376 swap_id,
377 to_state,
378 preimage,
379 lockup_tx_id,
380 refund_tx_id,
381 )?;
382 let updated_swap = self.fetch_send_swap_by_id(swap_id)?;
383 let lnurl_info_updated = self.update_swap_lnurl_info(&swap, &updated_swap)?;
384 if updated_swap != swap || lnurl_info_updated {
385 let _ = self.subscription_notifier.send(updated_swap.id);
386 }
387 Ok(())
388 }
389
390 async fn cooperate_claim(
391 &self,
392 send_swap: &SendSwap,
393 claim_tx_response: SubmarineClaimTxResponse,
394 ) -> Result<(), PaymentError> {
395 debug!(
396 "Claim is pending for Send Swap {}. Initiating cooperative claim",
397 &send_swap.id
398 );
399 let refund_address = match send_swap.refund_address {
400 Some(ref refund_address) => refund_address.clone(),
401 None => {
402 let address = self.onchain_wallet.next_unused_address().await?.to_string();
404 self.persister
405 .set_send_swap_refund_address(&send_swap.id, &address)?;
406 address
407 }
408 };
409
410 self.swapper
411 .claim_send_swap_cooperative(send_swap, claim_tx_response, &refund_address)
412 .await?;
413 Ok(())
414 }
415
416 pub(crate) async fn refund(
417 &self,
418 swap: &SendSwap,
419 is_cooperative: bool,
420 ) -> Result<String, PaymentError> {
421 info!(
422 "Initiating refund for Send Swap {}, is_cooperative: {is_cooperative}",
423 swap.id
424 );
425
426 let swap_script = swap.get_swap_script()?;
427 let refund_address = match swap.refund_address {
428 Some(ref refund_address) => refund_address.clone(),
429 None => {
430 let address = self.onchain_wallet.next_unused_address().await?.to_string();
432 self.persister
433 .set_send_swap_refund_address(&swap.id, &address)?;
434 address
435 }
436 };
437
438 let script_pk = swap_script
439 .to_address(self.config.network.into())
440 .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
441 .to_unconfidential()
442 .script_pubkey();
443 let utxos = self.chain_service.get_script_utxos(&script_pk).await?;
444 let SdkTransaction::Liquid(refund_tx) = self
445 .swapper
446 .create_refund_tx(
447 Swap::Send(swap.clone()),
448 &refund_address,
449 utxos,
450 None,
451 is_cooperative,
452 )
453 .await?
454 else {
455 return Err(PaymentError::Generic {
456 err: format!(
457 "Unexpected refund tx type returned for Send swap {}",
458 swap.id
459 ),
460 });
461 };
462 let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string();
463
464 info!(
465 "Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}",
466 swap.id
467 );
468
469 Ok(refund_tx_id)
470 }
471
472 async fn check_swap_expiry(&self, swap: &SendSwap) -> Result<bool> {
473 let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64);
474 let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?;
475 if duration_since_creation_time.as_secs() < 60 * 10 {
476 return Ok(false);
477 }
478
479 let swap_script = swap.get_swap_script()?;
480 let current_height = self.onchain_wallet.tip().await;
481 let locktime_from_height = LockTime::from_height(current_height)?;
482
483 info!("Checking Send Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime);
484 Ok(utils::is_locktime_expired(
485 locktime_from_height,
486 swap_script.locktime,
487 ))
488 }
489
490 pub(crate) async fn try_refund_all(&self, swaps: &[SendSwap]) {
492 for swap in swaps {
493 if swap.refund_tx_id.is_some() {
494 continue;
495 }
496
497 let has_swap_expired = self.check_swap_expiry(swap).await.unwrap_or(false);
498
499 if !has_swap_expired && swap.state == Pending {
500 continue;
501 }
502
503 let refund_tx_id_result = match swap.state {
504 Pending => self.refund(swap, false).await,
505 RefundPending => match has_swap_expired {
506 true => {
507 self.refund(swap, true)
508 .or_else(|e| {
509 warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
510 self.refund(swap, false)
511 })
512 .await
513 }
514 false => self.refund(swap, true).await,
515 },
516 _ => {
517 continue;
518 }
519 };
520
521 if let Ok(refund_tx_id) = refund_tx_id_result {
522 let update_swap_info_result =
523 self.update_swap_info(&swap.id, RefundPending, None, None, Some(&refund_tx_id));
524 if let Err(err) = update_swap_info_result {
525 warn!(
526 "Could not update Send swap {} information, error: {err:?}",
527 swap.id
528 );
529 };
530 }
531 }
532 }
533
534 pub(crate) async fn check_refunds(&self) -> Result<(), PaymentError> {
537 let pending_swaps = self.persister.list_pending_send_swaps()?;
538 self.try_refund_all(&pending_swaps).await;
539 Ok(())
540 }
541
542 fn validate_state_transition(
543 from_state: PaymentState,
544 to_state: PaymentState,
545 ) -> Result<(), PaymentError> {
546 match (from_state, to_state) {
547 (TimedOut, Created) => Ok(()),
548 (_, Created) => Err(PaymentError::Generic {
549 err: format!("Cannot transition from {from_state:?} to Created state"),
550 }),
551
552 (Created | Pending, Pending) => Ok(()),
553 (_, Pending) => Err(PaymentError::Generic {
554 err: format!("Cannot transition from {from_state:?} to Pending state"),
555 }),
556
557 (Created | Pending, Complete) => Ok(()),
558 (_, Complete) => Err(PaymentError::Generic {
559 err: format!("Cannot transition from {from_state:?} to Complete state"),
560 }),
561
562 (Created | TimedOut, TimedOut) => Ok(()),
563 (_, TimedOut) => Err(PaymentError::Generic {
564 err: format!("Cannot transition from {from_state:?} to TimedOut state"),
565 }),
566
567 (_, Refundable) => Err(PaymentError::Generic {
568 err: format!("Cannot transition from {from_state:?} to Refundable state"),
569 }),
570
571 (Pending, RefundPending) => Ok(()),
572 (_, RefundPending) => Err(PaymentError::Generic {
573 err: format!("Cannot transition from {from_state:?} to RefundPending state"),
574 }),
575
576 (Complete, Failed) => Err(PaymentError::Generic {
577 err: format!("Cannot transition from {from_state:?} to Failed state"),
578 }),
579 (_, Failed) => Ok(()),
580
581 (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
582 err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
583 }),
584 }
585 }
586}
587
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use anyhow::Result;

    use crate::{
        model::PaymentState::{self, *},
        test_utils::{
            persist::{create_persister, new_send_swap},
            send_swap::new_send_swap_handler,
        },
    };

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Checks `update_swap_info` against a table of allowed state transitions:
    /// every listed transition must succeed, and every transition from the same
    /// starting state to any state not listed must fail.
    #[sdk_macros::async_test_all]
    async fn test_send_swap_state_transitions() -> Result<()> {
        create_persister!(storage);
        let send_swap_handler = new_send_swap_handler(storage.clone())?;

        // Starting state -> target states that must be accepted.
        let valid_combinations = HashMap::from([
            (
                Created,
                HashSet::from([Pending, Complete, TimedOut, Failed]),
            ),
            (
                Pending,
                HashSet::from([Pending, RefundPending, Complete, Failed]),
            ),
            (TimedOut, HashSet::from([TimedOut, Created, Failed])),
            (Complete, HashSet::from([])),
            (Refundable, HashSet::from([Failed])),
            (Failed, HashSet::from([Failed])),
        ]);

        for (first_state, allowed_states) in valid_combinations.iter() {
            for allowed_state in allowed_states {
                let send_swap = new_send_swap(Some(*first_state), None);
                storage.insert_or_update_send_swap(&send_swap)?;

                let result =
                    send_swap_handler.update_swap_info(&send_swap.id, *allowed_state, None, None, None);
                assert!(
                    result.is_ok(),
                    "Expected transition {first_state:?} -> {allowed_state:?} to be allowed, got {result:?}"
                );
            }
        }

        // Every payment state is a candidate target, so transitions into
        // `RefundPending`, `Refundable` and `WaitingFeeAcceptance` are also
        // checked as disallowed wherever they are not listed above.
        let all_states = HashSet::from([
            Created,
            Pending,
            Complete,
            TimedOut,
            Failed,
            RefundPending,
            Refundable,
            WaitingFeeAcceptance,
        ]);
        let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
            .iter()
            .map(|(first_state, allowed_states)| {
                (
                    *first_state,
                    all_states.difference(allowed_states).cloned().collect(),
                )
            })
            .collect();

        for (first_state, disallowed_states) in invalid_combinations.iter() {
            for disallowed_state in disallowed_states {
                let send_swap = new_send_swap(Some(*first_state), None);
                storage.insert_or_update_send_swap(&send_swap)?;

                let result = send_swap_handler.update_swap_info(
                    &send_swap.id,
                    *disallowed_state,
                    None,
                    None,
                    None,
                );
                assert!(
                    result.is_err(),
                    "Expected transition {first_state:?} -> {disallowed_state:?} to be rejected"
                );
            }
        }

        Ok(())
    }
}