1use std::collections::HashSet;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use anyhow::{anyhow, bail, Context, Result};
6use boltz_client::swaps::boltz::RevSwapStates;
7use boltz_client::{boltz, Serialize, ToHex};
8use log::{debug, error, info, warn};
9use lwk_wollet::elements::secp256k1_zkp::Secp256k1;
10use lwk_wollet::elements::{Transaction, Txid};
11use lwk_wollet::hashes::hex::DisplayHex;
12use lwk_wollet::secp256k1::SecretKey;
13use tokio::sync::{broadcast, Mutex};
14
15use crate::chain::liquid::LiquidChainService;
16use crate::error::is_txn_mempool_conflict_error;
17use crate::model::{BlockListener, PaymentState::*};
18use crate::model::{Config, PaymentTxData, PaymentType, ReceiveSwap};
19use crate::persist::model::PaymentTxBalance;
20use crate::prelude::Swap;
21use crate::{ensure_sdk, utils};
22use crate::{
23 error::PaymentError, model::PaymentState, persist::Persister, swapper::Swapper,
24 wallet::OnchainWallet,
25};
26
/// Default zero-conf ceiling, in satoshis.
///
/// NOTE(review): presumably the fallback behind `Config::zero_conf_max_amount_sat()`, which the
/// receive-swap handler uses to decide whether a lockup may be claimed before confirmation —
/// confirm against the `Config` implementation.
pub const DEFAULT_ZERO_CONF_MAX_SAT: u64 = 1_000_000;
29
30pub(crate) struct ReceiveSwapHandler {
31 config: Config,
32 onchain_wallet: Arc<dyn OnchainWallet>,
33 persister: std::sync::Arc<Persister>,
34 swapper: Arc<dyn Swapper>,
35 subscription_notifier: broadcast::Sender<String>,
36 liquid_chain_service: Arc<dyn LiquidChainService>,
37 claiming_swaps: Arc<Mutex<HashSet<String>>>,
38}
39
40#[sdk_macros::async_trait]
41impl BlockListener for ReceiveSwapHandler {
42 async fn on_bitcoin_block(&self, _height: u32) {}
43
44 async fn on_liquid_block(&self, height: u32) {
45 if let Err(e) = self.claim_confirmed_lockups(height).await {
46 error!("Error claiming confirmed lockups: {e:?}");
47 }
48 }
49}
50
51impl ReceiveSwapHandler {
52 pub(crate) fn new(
53 config: Config,
54 onchain_wallet: Arc<dyn OnchainWallet>,
55 persister: std::sync::Arc<Persister>,
56 swapper: Arc<dyn Swapper>,
57 liquid_chain_service: Arc<dyn LiquidChainService>,
58 ) -> Self {
59 let (subscription_notifier, _) = broadcast::channel::<String>(30);
60 Self {
61 config,
62 onchain_wallet,
63 persister,
64 swapper,
65 subscription_notifier,
66 liquid_chain_service,
67 claiming_swaps: Arc::new(Mutex::new(HashSet::new())),
68 }
69 }
70
71 pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
72 self.subscription_notifier.subscribe()
73 }
74
75 pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
77 let id = &update.id;
78 let status = &update.status;
79 let swap_state = RevSwapStates::from_str(status)
80 .map_err(|_| anyhow!("Invalid RevSwapState for Receive Swap {id}: {status}"))?;
81 let receive_swap = self.fetch_receive_swap_by_id(id)?;
82
83 info!("Handling Receive Swap transition to {swap_state:?} for swap {id}");
84
85 match swap_state {
86 RevSwapStates::SwapExpired
87 | RevSwapStates::InvoiceExpired
88 | RevSwapStates::TransactionFailed
89 | RevSwapStates::TransactionRefunded => {
90 match receive_swap.mrh_tx_id {
91 Some(mrh_tx_id) => {
92 warn!("Swap {id} is expired but MRH payment was received: txid {mrh_tx_id}")
93 }
94 None => {
95 error!("Swap {id} entered into an unrecoverable state: {swap_state:?}");
96 self.update_swap_info(id, Failed, None, None, None, None)?;
97 }
98 }
99 Ok(())
100 }
101 RevSwapStates::TransactionMempool => {
104 let Some(transaction) = update.transaction.clone() else {
105 return Err(anyhow!("Unexpected payload from Boltz status stream"));
106 };
107
108 if let Some(claim_tx_id) = receive_swap.claim_tx_id {
109 return Err(anyhow!(
110 "Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}"
111 ));
112 }
113
114 if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
116 return Err(anyhow!(
117 "MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
118 ));
119 }
120
121 let tx_hex = transaction.hex.ok_or(anyhow!(
123 "Missing lockup transaction hex in swap status update"
124 ))?;
125 let lockup_tx = utils::deserialize_tx_hex(&tx_hex)
126 .context("Failed to deserialize tx hex in swap status update")?;
127 debug!(
128 "Broadcasting lockup tx received in swap status update for receive swap {id}"
129 );
130 if let Err(e) = self.liquid_chain_service.broadcast(&lockup_tx).await {
131 warn!(
132 "Failed to broadcast lockup tx in swap status update: {e:?} - maybe the \
133 tx depends on inputs that haven't been seen yet, falling back to waiting for \
134 it to appear in the mempool"
135 );
136 if let Err(e) = self
137 .verify_lockup_tx_status(&receive_swap, &transaction.id, &tx_hex, false)
138 .await
139 {
140 return Err(anyhow!(
141 "Swapper mempool reported lockup could not be verified. txid: {}, err: {}",
142 transaction.id,
143 e
144 ));
145 }
146 }
147
148 if let Err(e) = self
149 .verify_lockup_tx_amount(&receive_swap, &lockup_tx)
150 .await
151 {
152 self.update_swap_info(id, Failed, None, None, None, None)?;
154 return Err(anyhow!(
155 "Swapper underpaid lockup amount. txid: {}, err: {}",
156 transaction.id,
157 e
158 ));
159 }
160 info!("Swapper lockup was verified");
161
162 let lockup_tx_id = &transaction.id;
163 self.update_swap_info(id, Pending, None, Some(lockup_tx_id), None, None)?;
164
165 let max_amount_sat = self.config.zero_conf_max_amount_sat();
167 let receiver_amount_sat = receive_swap.receiver_amount_sat;
168 if receiver_amount_sat > max_amount_sat {
169 warn!("[Receive Swap {id}] Amount is too high to claim with zero-conf ({receiver_amount_sat} sat > {max_amount_sat} sat). Waiting for confirmation...");
170 return Ok(());
171 }
172
173 debug!("[Receive Swap {id}] Amount is within valid range for zero-conf ({receiver_amount_sat} < {max_amount_sat} sat)");
174
175 let rbf_explicit = lockup_tx.input.iter().any(|input| input.sequence.is_rbf());
178 if rbf_explicit {
181 warn!("[Receive Swap {id}] Lockup transaction signals RBF. Waiting for confirmation...");
182 return Ok(());
183 }
184 debug!("[Receive Swap {id}] Lockup tx does not signal RBF. Proceeding...");
185
186 if let Err(err) = self.claim(id).await {
187 match err {
188 PaymentError::AlreadyClaimed => {
189 warn!("Funds already claimed for Receive Swap {id}")
190 }
191 _ => error!("Claim for Receive Swap {id} failed: {err}"),
192 }
193 }
194
195 Ok(())
196 }
197 RevSwapStates::TransactionConfirmed => {
198 let Some(transaction) = update.transaction.clone() else {
199 return Err(anyhow!("Unexpected payload from Boltz status stream"));
200 };
201
202 if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
204 return Err(anyhow!(
205 "MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
206 ));
207 }
208
209 let tx_hex = transaction.hex.ok_or(anyhow!(
211 "Missing lockup transaction hex in swap status update"
212 ))?;
213 let lockup_tx = match self
214 .verify_lockup_tx_status(&receive_swap, &transaction.id, &tx_hex, true)
215 .await
216 {
217 Ok(lockup_tx) => lockup_tx,
218 Err(e) => {
219 return Err(anyhow!(
220 "Swapper reported lockup could not be verified. txid: {}, err: {}",
221 transaction.id,
222 e
223 ));
224 }
225 };
226
227 if let Err(e) = self
228 .verify_lockup_tx_amount(&receive_swap, &lockup_tx)
229 .await
230 {
231 self.update_swap_info(id, Failed, None, None, None, None)?;
233 return Err(anyhow!(
234 "Swapper underpaid lockup amount. txid: {}, err: {}",
235 transaction.id,
236 e
237 ));
238 }
239 info!("Swapper lockup was verified, moving to claim");
240
241 match receive_swap.claim_tx_id {
242 Some(claim_tx_id) => {
243 warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}")
244 }
245 None => {
246 self.update_swap_info(&receive_swap.id, Pending, None, None, None, None)?;
247
248 if let Err(err) = self.claim(id).await {
249 match err {
250 PaymentError::AlreadyClaimed => {
251 warn!("Funds already claimed for Receive Swap {id}")
252 }
253 _ => error!("Claim for Receive Swap {id} failed: {err}"),
254 }
255 }
256 }
257 }
258 Ok(())
259 }
260
261 _ => {
262 debug!("Unhandled state for Receive Swap {id}: {swap_state:?}");
263 Ok(())
264 }
265 }
266 }
267
268 fn fetch_receive_swap_by_id(&self, swap_id: &str) -> Result<ReceiveSwap, PaymentError> {
269 self.persister
270 .fetch_receive_swap_by_id(swap_id)
271 .map_err(|e| {
272 error!("Failed to fetch receive swap by id: {e:?}");
273 PaymentError::PersistError
274 })?
275 .ok_or(PaymentError::Generic {
276 err: format!("Receive Swap not found {swap_id}"),
277 })
278 }
279
280 pub(crate) fn update_swap(&self, updated_swap: ReceiveSwap) -> Result<(), PaymentError> {
282 let swap = self.fetch_receive_swap_by_id(&updated_swap.id)?;
283 if updated_swap != swap {
284 info!(
285 "Updating Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})",
286 updated_swap.id, updated_swap.state, updated_swap.claim_tx_id, updated_swap.lockup_tx_id, updated_swap.mrh_tx_id
287 );
288 self.persister
289 .insert_or_update_receive_swap(&updated_swap)?;
290 let _ = self.subscription_notifier.send(updated_swap.id);
291 }
292 Ok(())
293 }
294
295 pub(crate) fn update_swap_info(
297 &self,
298 swap_id: &str,
299 to_state: PaymentState,
300 claim_tx_id: Option<&str>,
301 lockup_tx_id: Option<&str>,
302 mrh_tx_id: Option<&str>,
303 mrh_amount_sat: Option<u64>,
304 ) -> Result<(), PaymentError> {
305 info!(
306 "Transitioning Receive swap {swap_id} to {to_state:?} (claim_tx_id = {claim_tx_id:?}, lockup_tx_id = {lockup_tx_id:?}, mrh_tx_id = {mrh_tx_id:?})"
307 );
308 let swap = self.fetch_receive_swap_by_id(swap_id)?;
309 Self::validate_state_transition(swap.state, to_state)?;
310 self.persister.try_handle_receive_swap_update(
311 swap_id,
312 to_state,
313 claim_tx_id,
314 lockup_tx_id,
315 mrh_tx_id,
316 mrh_amount_sat,
317 )?;
318 let updated_swap = self.fetch_receive_swap_by_id(swap_id)?;
319
320 if mrh_tx_id.is_some() {
321 self.persister.delete_reserved_address(&swap.mrh_address)?;
322 }
323
324 if updated_swap != swap {
325 let _ = self.subscription_notifier.send(updated_swap.id);
326 }
327 Ok(())
328 }
329
330 async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> {
331 {
332 let mut claiming_guard = self.claiming_swaps.lock().await;
333 if claiming_guard.contains(swap_id) {
334 debug!("Claim for swap {swap_id} already in progress, skipping.");
335 return Ok(());
336 }
337 claiming_guard.insert(swap_id.to_string());
338 }
339
340 let result = self.claim_inner(swap_id).await;
341
342 {
343 let mut claiming_guard = self.claiming_swaps.lock().await;
344 claiming_guard.remove(swap_id);
345 }
346
347 result
348 }
349
350 async fn claim_inner(&self, swap_id: &str) -> Result<(), PaymentError> {
351 let swap = self.fetch_receive_swap_by_id(swap_id)?;
352 ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed);
353
354 let liquid_tip = self.liquid_chain_service.tip().await?;
356 let is_cooperative = liquid_tip <= swap.timeout_block_height.saturating_sub(10);
357 if !is_cooperative {
358 info!(
359 "Using non-cooperative claim for Receive Swap {swap_id} as timeout block height {} is near or past (liquid tip: {liquid_tip})",
360 swap.timeout_block_height
361 );
362 }
363
364 info!("Initiating claim for Receive Swap {swap_id}");
365 let claim_address = match swap.claim_address {
366 Some(ref claim_address) => claim_address.clone(),
367 None => {
368 let address = self.onchain_wallet.next_unused_address().await?.to_string();
370 self.persister
371 .set_receive_swap_claim_address(&swap.id, &address)?;
372 address
373 }
374 };
375
376 let crate::prelude::Transaction::Liquid(claim_tx) = self
377 .swapper
378 .create_claim_tx(
379 Swap::Receive(swap.clone()),
380 Some(claim_address.clone()),
381 is_cooperative,
382 )
383 .await?
384 else {
385 return Err(PaymentError::Generic {
386 err: format!("Constructed invalid transaction for Receive swap {swap_id}"),
387 });
388 };
389
390 let tx_id = claim_tx.txid().to_hex();
393 match self.persister.set_receive_swap_claim_tx_id(swap_id, &tx_id) {
394 Ok(_) => {
395 let broadcast_res = match self.liquid_chain_service.broadcast(&claim_tx).await {
397 Ok(tx_id) => Ok(tx_id.to_hex()),
398 Err(e) if is_txn_mempool_conflict_error(&e) => {
399 Err(PaymentError::AlreadyClaimed)
400 }
401 Err(err) => {
402 debug!(
403 "Could not broadcast claim tx via chain service for Receive swap {swap_id}: {err:?}"
404 );
405 let claim_tx_hex = claim_tx.serialize().to_lower_hex_string();
406 self.swapper
407 .broadcast_tx(self.config.network.into(), &claim_tx_hex)
408 .await
409 }
410 };
411 match broadcast_res {
412 Ok(claim_tx_id) => {
413 self.persister.insert_or_update_payment(
416 PaymentTxData {
417 tx_id: claim_tx_id.clone(),
418 timestamp: Some(utils::now()),
419 fees_sat: 0,
420 is_confirmed: false,
421 unblinding_data: None,
422 },
423 &[PaymentTxBalance {
424 amount: swap.receiver_amount_sat,
425 payment_type: PaymentType::Receive,
426 asset_id: self.config.lbtc_asset_id(),
427 }],
428 None,
429 false,
430 )?;
431
432 info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}");
433 _ = self.subscription_notifier.send(claim_tx_id);
436 Ok(())
437 }
438 Err(err) => {
439 debug!(
441 "Could not broadcast claim tx via swapper for Receive swap {swap_id}: {err:?}"
442 );
443 self.persister
444 .unset_receive_swap_claim_tx_id(swap_id, &tx_id)?;
445 Err(err)
446 }
447 }
448 }
449 Err(err) => {
450 debug!(
451 "Failed to set claim_tx_id after creating tx for Receive swap {swap_id}: txid {tx_id}"
452 );
453 Err(err)
454 }
455 }
456 }
457
458 async fn claim_confirmed_lockups(&self, height: u32) -> Result<()> {
459 let receive_swaps: Vec<ReceiveSwap> = self
460 .persister
461 .list_ongoing_receive_swaps()?
462 .into_iter()
463 .filter(|s| s.lockup_tx_id.is_some() && s.claim_tx_id.is_none())
464 .collect();
465 info!(
466 "Rescanning {} Receive Swap(s) lockup txs at height {}",
467 receive_swaps.len(),
468 height
469 );
470 for swap in receive_swaps {
471 if let Err(e) = self.claim_confirmed_lockup(&swap).await {
472 error!("Error rescanning Receive Swap {}: {e:?}", swap.id,);
473 }
474 }
475 Ok(())
476 }
477
478 async fn claim_confirmed_lockup(&self, receive_swap: &ReceiveSwap) -> Result<()> {
479 let Some(tx_id) = receive_swap.lockup_tx_id.clone() else {
480 return Ok(());
482 };
483 let swap_id = &receive_swap.id;
484 let tx_hex = self
485 .liquid_chain_service
486 .get_transaction_hex(&Txid::from_str(&tx_id)?)
487 .await?
488 .ok_or(anyhow!("Lockup tx not found for Receive swap {swap_id}"))?
489 .serialize()
490 .to_lower_hex_string();
491 let lockup_tx = self
492 .verify_lockup_tx_status(receive_swap, &tx_id, &tx_hex, true)
493 .await?;
494 if let Err(e) = self.verify_lockup_tx_amount(receive_swap, &lockup_tx).await {
495 self.update_swap_info(swap_id, Failed, None, None, None, None)?;
496 return Err(e);
497 }
498 info!("Receive Swap {swap_id} lockup tx is confirmed");
499 self.claim(swap_id)
500 .await
501 .map_err(|e| anyhow!("Could not claim Receive Swap {swap_id}: {e:?}"))
502 }
503
504 fn validate_state_transition(
505 from_state: PaymentState,
506 to_state: PaymentState,
507 ) -> Result<(), PaymentError> {
508 match (from_state, to_state) {
509 (_, Created) => Err(PaymentError::Generic {
510 err: "Cannot transition to Created state".to_string(),
511 }),
512
513 (Created | Pending, Pending) => Ok(()),
514 (_, Pending) => Err(PaymentError::Generic {
515 err: format!("Cannot transition from {from_state:?} to Pending state"),
516 }),
517
518 (Created | Pending, Complete) => Ok(()),
519 (_, Complete) => Err(PaymentError::Generic {
520 err: format!("Cannot transition from {from_state:?} to Complete state"),
521 }),
522
523 (Created | TimedOut, TimedOut) => Ok(()),
524 (_, TimedOut) => Err(PaymentError::Generic {
525 err: format!("Cannot transition from {from_state:?} to TimedOut state"),
526 }),
527
528 (_, Refundable) => Err(PaymentError::Generic {
529 err: format!("Cannot transition from {from_state:?} to Refundable state"),
530 }),
531
532 (_, RefundPending) => Err(PaymentError::Generic {
533 err: format!("Cannot transition from {from_state:?} to RefundPending state"),
534 }),
535
536 (Complete, Failed) => Err(PaymentError::Generic {
537 err: format!("Cannot transition from {from_state:?} to Failed state"),
538 }),
539 (_, Failed) => Ok(()),
540
541 (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
542 err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
543 }),
544 }
545 }
546
547 async fn verify_lockup_tx_status(
548 &self,
549 receive_swap: &ReceiveSwap,
550 tx_id: &str,
551 tx_hex: &str,
552 verify_confirmation: bool,
553 ) -> Result<Transaction> {
554 let script = receive_swap.get_swap_script()?;
556 let address = script
557 .to_address(self.config.network.into())
558 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
559 self.liquid_chain_service
560 .verify_tx(&address, tx_id, tx_hex, verify_confirmation)
561 .await
562 }
563
564 async fn verify_lockup_tx_amount(
565 &self,
566 receive_swap: &ReceiveSwap,
567 lockup_tx: &Transaction,
568 ) -> Result<()> {
569 let secp = Secp256k1::new();
570 let script = receive_swap.get_swap_script()?;
571 let address = script
572 .to_address(self.config.network.into())
573 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
574 let blinding_key = receive_swap
575 .get_boltz_create_response()?
576 .blinding_key
577 .ok_or(anyhow!("Missing blinding key"))?;
578 let tx_out = lockup_tx
579 .output
580 .iter()
581 .find(|tx_out| tx_out.script_pubkey == address.script_pubkey())
582 .ok_or(anyhow!("Failed to get tx output"))?;
583 let lockup_amount_sat = tx_out
584 .unblind(&secp, SecretKey::from_str(&blinding_key)?)
585 .map(|o| o.value)?;
586 let expected_lockup_amount_sat =
587 receive_swap.receiver_amount_sat + receive_swap.claim_fees_sat;
588 if lockup_amount_sat < expected_lockup_amount_sat {
589 bail!(
590 "Failed to verify lockup amount for Receive Swap {}: {} sat vs {} sat",
591 receive_swap.id,
592 expected_lockup_amount_sat,
593 lockup_amount_sat
594 );
595 }
596 Ok(())
597 }
598}
599
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use anyhow::Result;

    use crate::{
        model::PaymentState::{self, *},
        test_utils::{
            persist::{create_persister, new_receive_swap},
            receive_swap::new_receive_swap_handler,
        },
    };

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Checks `update_swap_info` against the transition table of `validate_state_transition`:
    /// every listed transition must succeed and every other target state must be rejected.
    #[sdk_macros::async_test_all]
    async fn test_receive_swap_state_transitions() -> Result<()> {
        create_persister!(persister);

        let receive_swap_state_handler = new_receive_swap_handler(persister.clone())?;

        // Start state -> target states that must be accepted.
        let valid_combinations = HashMap::from([
            (
                Created,
                HashSet::from([Pending, Complete, TimedOut, Failed]),
            ),
            (Pending, HashSet::from([Pending, Complete, Failed])),
            (TimedOut, HashSet::from([TimedOut, Failed])),
            (Complete, HashSet::from([])),
            (Refundable, HashSet::from([Failed])),
            (RefundPending, HashSet::from([Failed])),
            (Failed, HashSet::from([Failed])),
        ]);

        for (first_state, allowed_states) in valid_combinations.iter() {
            for allowed_state in allowed_states {
                let receive_swap = new_receive_swap(Some(*first_state), None);
                persister.insert_or_update_receive_swap(&receive_swap)?;

                let result = receive_swap_state_handler.update_swap_info(
                    &receive_swap.id,
                    *allowed_state,
                    None,
                    None,
                    None,
                    None,
                );
                assert!(
                    result.is_ok(),
                    "expected transition {first_state:?} -> {allowed_state:?} to be allowed, got {result:?}"
                );
            }
        }

        // Every target state is covered, including those that are never valid targets for a
        // Receive Swap (Refundable, RefundPending, WaitingFeeAcceptance).
        let all_states = HashSet::from([
            Created,
            Pending,
            Complete,
            TimedOut,
            Failed,
            Refundable,
            RefundPending,
            WaitingFeeAcceptance,
        ]);
        let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
            .iter()
            .map(|(first_state, allowed_states)| {
                (
                    *first_state,
                    all_states.difference(allowed_states).cloned().collect(),
                )
            })
            .collect();

        for (first_state, disallowed_states) in invalid_combinations.iter() {
            for disallowed_state in disallowed_states {
                let receive_swap = new_receive_swap(Some(*first_state), None);
                persister.insert_or_update_receive_swap(&receive_swap)?;

                let result = receive_swap_state_handler.update_swap_info(
                    &receive_swap.id,
                    *disallowed_state,
                    None,
                    None,
                    None,
                    None,
                );
                assert!(
                    result.is_err(),
                    "expected transition {first_state:?} -> {disallowed_state:?} to be rejected"
                );
            }
        }

        Ok(())
    }
}