1use std::collections::HashSet;
2use std::str::FromStr;
3
4use anyhow::{anyhow, bail, Context, Result};
5use boltz_client::swaps::boltz::RevSwapStates;
6use boltz_client::{boltz, Serialize, ToHex};
7use log::{debug, error, info, warn};
8use lwk_wollet::elements::secp256k1_zkp::Secp256k1;
9use lwk_wollet::elements::{Transaction, Txid};
10use lwk_wollet::hashes::hex::DisplayHex;
11use lwk_wollet::secp256k1::SecretKey;
12use sdk_common::utils::Arc;
13use tokio::sync::{broadcast, Mutex};
14
15use crate::chain::liquid::LiquidChainService;
16use crate::error::is_txn_mempool_conflict_error;
17use crate::model::{BlockListener, PaymentState::*};
18use crate::model::{Config, PaymentTxData, PaymentType, ReceiveSwap};
19use crate::prelude::Swap;
20use crate::{ensure_sdk, utils};
21use crate::{
22 error::PaymentError, model::PaymentState, persist::Persister, swapper::Swapper,
23 wallet::OnchainWallet,
24};
25
26pub const DEFAULT_ZERO_CONF_MAX_SAT: u64 = 1_000_000;
28
29pub(crate) struct ReceiveSwapHandler {
30 config: Config,
31 onchain_wallet: Arc<dyn OnchainWallet>,
32 persister: std::sync::Arc<Persister>,
33 swapper: Arc<dyn Swapper>,
34 subscription_notifier: broadcast::Sender<String>,
35 liquid_chain_service: Arc<dyn LiquidChainService>,
36 claiming_swaps: Arc<Mutex<HashSet<String>>>,
37}
38
39#[sdk_macros::async_trait]
40impl BlockListener for ReceiveSwapHandler {
41 async fn on_bitcoin_block(&self, _height: u32) {}
42
43 async fn on_liquid_block(&self, height: u32) {
44 if let Err(e) = self.claim_confirmed_lockups(height).await {
45 error!("Error claiming confirmed lockups: {e:?}");
46 }
47 }
48}
49
50impl ReceiveSwapHandler {
51 pub(crate) fn new(
52 config: Config,
53 onchain_wallet: Arc<dyn OnchainWallet>,
54 persister: std::sync::Arc<Persister>,
55 swapper: Arc<dyn Swapper>,
56 liquid_chain_service: Arc<dyn LiquidChainService>,
57 ) -> Self {
58 let (subscription_notifier, _) = broadcast::channel::<String>(30);
59 Self {
60 config,
61 onchain_wallet,
62 persister,
63 swapper,
64 subscription_notifier,
65 liquid_chain_service,
66 claiming_swaps: Arc::new(Mutex::new(HashSet::new())),
67 }
68 }
69
70 pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
71 self.subscription_notifier.subscribe()
72 }
73
74 pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
76 let id = &update.id;
77 let status = &update.status;
78 let swap_state = RevSwapStates::from_str(status)
79 .map_err(|_| anyhow!("Invalid RevSwapState for Receive Swap {id}: {status}"))?;
80 let receive_swap = self.fetch_receive_swap_by_id(id)?;
81
82 info!("Handling Receive Swap transition to {swap_state:?} for swap {id}");
83
84 match swap_state {
85 RevSwapStates::SwapExpired
86 | RevSwapStates::InvoiceExpired
87 | RevSwapStates::TransactionFailed
88 | RevSwapStates::TransactionRefunded => {
89 match receive_swap.mrh_tx_id {
90 Some(mrh_tx_id) => {
91 warn!("Swap {id} is expired but MRH payment was received: txid {mrh_tx_id}")
92 }
93 None => {
94 error!("Swap {id} entered into an unrecoverable state: {swap_state:?}");
95 self.update_swap_info(id, Failed, None, None, None, None)?;
96 }
97 }
98 Ok(())
99 }
100 RevSwapStates::TransactionMempool => {
103 let Some(transaction) = update.transaction.clone() else {
104 return Err(anyhow!("Unexpected payload from Boltz status stream"));
105 };
106
107 if let Some(claim_tx_id) = receive_swap.claim_tx_id {
108 return Err(anyhow!(
109 "Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}"
110 ));
111 }
112
113 if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
115 return Err(anyhow!(
116 "MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
117 ));
118 }
119
120 let tx_hex = transaction.hex.ok_or(anyhow!(
122 "Missing lockup transaction hex in swap status update"
123 ))?;
124 let lockup_tx = utils::deserialize_tx_hex(&tx_hex)
125 .context("Failed to deserialize tx hex in swap status update")?;
126 debug!(
127 "Broadcasting lockup tx received in swap status update for receive swap {id}"
128 );
129 if let Err(e) = self.liquid_chain_service.broadcast(&lockup_tx).await {
130 warn!(
131 "Failed to broadcast lockup tx in swap status update: {e:?} - maybe the \
132 tx depends on inputs that haven't been seen yet, falling back to waiting for \
133 it to appear in the mempool"
134 );
135 if let Err(e) = self
136 .verify_lockup_tx_status(&receive_swap, &transaction.id, &tx_hex, false)
137 .await
138 {
139 return Err(anyhow!(
140 "Swapper mempool reported lockup could not be verified. txid: {}, err: {}",
141 transaction.id,
142 e
143 ));
144 }
145 }
146
147 if let Err(e) = self
148 .verify_lockup_tx_amount(&receive_swap, &lockup_tx)
149 .await
150 {
151 self.update_swap_info(id, Failed, None, None, None, None)?;
153 return Err(anyhow!(
154 "Swapper underpaid lockup amount. txid: {}, err: {}",
155 transaction.id,
156 e
157 ));
158 }
159 info!("Swapper lockup was verified");
160
161 let lockup_tx_id = &transaction.id;
162 self.update_swap_info(id, Pending, None, Some(lockup_tx_id), None, None)?;
163
164 let max_amount_sat = self.config.zero_conf_max_amount_sat();
166 let receiver_amount_sat = receive_swap.receiver_amount_sat;
167 if receiver_amount_sat > max_amount_sat {
168 warn!("[Receive Swap {id}] Amount is too high to claim with zero-conf ({receiver_amount_sat} sat > {max_amount_sat} sat). Waiting for confirmation...");
169 return Ok(());
170 }
171
172 debug!("[Receive Swap {id}] Amount is within valid range for zero-conf ({receiver_amount_sat} < {max_amount_sat} sat)");
173
174 let rbf_explicit = lockup_tx.input.iter().any(|input| input.sequence.is_rbf());
177 if rbf_explicit {
180 warn!("[Receive Swap {id}] Lockup transaction signals RBF. Waiting for confirmation...");
181 return Ok(());
182 }
183 debug!("[Receive Swap {id}] Lockup tx does not signal RBF. Proceeding...");
184
185 if let Err(err) = self.claim(id).await {
186 match err {
187 PaymentError::AlreadyClaimed => {
188 warn!("Funds already claimed for Receive Swap {id}")
189 }
190 _ => error!("Claim for Receive Swap {id} failed: {err}"),
191 }
192 }
193
194 Ok(())
195 }
196 RevSwapStates::TransactionConfirmed => {
197 let Some(transaction) = update.transaction.clone() else {
198 return Err(anyhow!("Unexpected payload from Boltz status stream"));
199 };
200
201 if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
203 return Err(anyhow!(
204 "MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
205 ));
206 }
207
208 let tx_hex = transaction.hex.ok_or(anyhow!(
210 "Missing lockup transaction hex in swap status update"
211 ))?;
212 let lockup_tx = match self
213 .verify_lockup_tx_status(&receive_swap, &transaction.id, &tx_hex, true)
214 .await
215 {
216 Ok(lockup_tx) => lockup_tx,
217 Err(e) => {
218 return Err(anyhow!(
219 "Swapper reported lockup could not be verified. txid: {}, err: {}",
220 transaction.id,
221 e
222 ));
223 }
224 };
225
226 if let Err(e) = self
227 .verify_lockup_tx_amount(&receive_swap, &lockup_tx)
228 .await
229 {
230 self.update_swap_info(id, Failed, None, None, None, None)?;
232 return Err(anyhow!(
233 "Swapper underpaid lockup amount. txid: {}, err: {}",
234 transaction.id,
235 e
236 ));
237 }
238 info!("Swapper lockup was verified, moving to claim");
239
240 match receive_swap.claim_tx_id {
241 Some(claim_tx_id) => {
242 warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}")
243 }
244 None => {
245 self.update_swap_info(&receive_swap.id, Pending, None, None, None, None)?;
246
247 if let Err(err) = self.claim(id).await {
248 match err {
249 PaymentError::AlreadyClaimed => {
250 warn!("Funds already claimed for Receive Swap {id}")
251 }
252 _ => error!("Claim for Receive Swap {id} failed: {err}"),
253 }
254 }
255 }
256 }
257 Ok(())
258 }
259
260 _ => {
261 debug!("Unhandled state for Receive Swap {id}: {swap_state:?}");
262 Ok(())
263 }
264 }
265 }
266
267 fn fetch_receive_swap_by_id(&self, swap_id: &str) -> Result<ReceiveSwap, PaymentError> {
268 self.persister
269 .fetch_receive_swap_by_id(swap_id)
270 .map_err(|_| PaymentError::PersistError)?
271 .ok_or(PaymentError::Generic {
272 err: format!("Receive Swap not found {swap_id}"),
273 })
274 }
275
276 pub(crate) fn update_swap(&self, updated_swap: ReceiveSwap) -> Result<(), PaymentError> {
278 let swap = self.fetch_receive_swap_by_id(&updated_swap.id)?;
279 if updated_swap != swap {
280 info!(
281 "Updating Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})",
282 updated_swap.id, updated_swap.state, updated_swap.claim_tx_id, updated_swap.lockup_tx_id, updated_swap.mrh_tx_id
283 );
284 self.persister
285 .insert_or_update_receive_swap(&updated_swap)?;
286 let _ = self.subscription_notifier.send(updated_swap.id);
287 }
288 Ok(())
289 }
290
291 pub(crate) fn update_swap_info(
293 &self,
294 swap_id: &str,
295 to_state: PaymentState,
296 claim_tx_id: Option<&str>,
297 lockup_tx_id: Option<&str>,
298 mrh_tx_id: Option<&str>,
299 mrh_amount_sat: Option<u64>,
300 ) -> Result<(), PaymentError> {
301 info!(
302 "Transitioning Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})",
303 swap_id, to_state, claim_tx_id, lockup_tx_id, mrh_tx_id
304 );
305 let swap = self.fetch_receive_swap_by_id(swap_id)?;
306 Self::validate_state_transition(swap.state, to_state)?;
307 self.persister.try_handle_receive_swap_update(
308 swap_id,
309 to_state,
310 claim_tx_id,
311 lockup_tx_id,
312 mrh_tx_id,
313 mrh_amount_sat,
314 )?;
315 let updated_swap = self.fetch_receive_swap_by_id(swap_id)?;
316
317 if mrh_tx_id.is_some() {
318 self.persister.delete_reserved_address(&swap.mrh_address)?;
319 }
320
321 if updated_swap != swap {
322 let _ = self.subscription_notifier.send(updated_swap.id);
323 }
324 Ok(())
325 }
326
327 async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> {
328 {
329 let mut claiming_guard = self.claiming_swaps.lock().await;
330 if claiming_guard.contains(swap_id) {
331 debug!("Claim for swap {swap_id} already in progress, skipping.");
332 return Ok(());
333 }
334 claiming_guard.insert(swap_id.to_string());
335 }
336
337 let result = self.claim_inner(swap_id).await;
338
339 {
340 let mut claiming_guard = self.claiming_swaps.lock().await;
341 claiming_guard.remove(swap_id);
342 }
343
344 result
345 }
346
347 async fn claim_inner(&self, swap_id: &str) -> Result<(), PaymentError> {
348 let swap = self.fetch_receive_swap_by_id(swap_id)?;
349 ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed);
350
351 info!("Initiating claim for Receive Swap {swap_id}");
352 let claim_address = match swap.claim_address {
353 Some(ref claim_address) => claim_address.clone(),
354 None => {
355 let address = self.onchain_wallet.next_unused_address().await?.to_string();
357 self.persister
358 .set_receive_swap_claim_address(&swap.id, &address)?;
359 address
360 }
361 };
362
363 let crate::prelude::Transaction::Liquid(claim_tx) = self
364 .swapper
365 .create_claim_tx(Swap::Receive(swap.clone()), Some(claim_address.clone()))
366 .await?
367 else {
368 return Err(PaymentError::Generic {
369 err: format!("Constructed invalid transaction for Receive swap {swap_id}"),
370 });
371 };
372
373 let tx_id = claim_tx.txid().to_hex();
376 match self.persister.set_receive_swap_claim_tx_id(swap_id, &tx_id) {
377 Ok(_) => {
378 let broadcast_res = match self.liquid_chain_service.broadcast(&claim_tx).await {
380 Ok(tx_id) => Ok(tx_id.to_hex()),
381 Err(e) if is_txn_mempool_conflict_error(&e) => {
382 Err(PaymentError::AlreadyClaimed)
383 }
384 Err(err) => {
385 debug!(
386 "Could not broadcast claim tx via chain service for Receive swap {swap_id}: {err:?}"
387 );
388 let claim_tx_hex = claim_tx.serialize().to_lower_hex_string();
389 self.swapper
390 .broadcast_tx(self.config.network.into(), &claim_tx_hex)
391 .await
392 }
393 };
394 match broadcast_res {
395 Ok(claim_tx_id) => {
396 self.persister.insert_or_update_payment(
399 PaymentTxData {
400 tx_id: claim_tx_id.clone(),
401 timestamp: Some(utils::now()),
402 asset_id: self.config.lbtc_asset_id(),
403 amount: swap.receiver_amount_sat,
404 fees_sat: 0,
405 payment_type: PaymentType::Receive,
406 is_confirmed: false,
407 unblinding_data: None,
408 },
409 None,
410 false,
411 )?;
412
413 info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}");
414 _ = self.subscription_notifier.send(claim_tx_id);
417 Ok(())
418 }
419 Err(err) => {
420 debug!(
422 "Could not broadcast claim tx via swapper for Receive swap {swap_id}: {err:?}"
423 );
424 self.persister
425 .unset_receive_swap_claim_tx_id(swap_id, &tx_id)?;
426 Err(err)
427 }
428 }
429 }
430 Err(err) => {
431 debug!(
432 "Failed to set claim_tx_id after creating tx for Receive swap {swap_id}: txid {tx_id}"
433 );
434 Err(err)
435 }
436 }
437 }
438
439 async fn claim_confirmed_lockups(&self, height: u32) -> Result<()> {
440 let receive_swaps: Vec<ReceiveSwap> = self
441 .persister
442 .list_ongoing_receive_swaps()?
443 .into_iter()
444 .filter(|s| s.lockup_tx_id.is_some() && s.claim_tx_id.is_none())
445 .collect();
446 info!(
447 "Rescanning {} Receive Swap(s) lockup txs at height {}",
448 receive_swaps.len(),
449 height
450 );
451 for swap in receive_swaps {
452 if let Err(e) = self.claim_confirmed_lockup(&swap).await {
453 error!("Error rescanning Receive Swap {}: {e:?}", swap.id,);
454 }
455 }
456 Ok(())
457 }
458
459 async fn claim_confirmed_lockup(&self, receive_swap: &ReceiveSwap) -> Result<()> {
460 let Some(tx_id) = receive_swap.lockup_tx_id.clone() else {
461 return Ok(());
463 };
464 let swap_id = &receive_swap.id;
465 let tx_hex = self
466 .liquid_chain_service
467 .get_transaction_hex(&Txid::from_str(&tx_id)?)
468 .await?
469 .ok_or(anyhow!("Lockup tx not found for Receive swap {swap_id}"))?
470 .serialize()
471 .to_lower_hex_string();
472 let lockup_tx = self
473 .verify_lockup_tx_status(receive_swap, &tx_id, &tx_hex, true)
474 .await?;
475 if let Err(e) = self.verify_lockup_tx_amount(receive_swap, &lockup_tx).await {
476 self.update_swap_info(swap_id, Failed, None, None, None, None)?;
477 return Err(e);
478 }
479 info!("Receive Swap {swap_id} lockup tx is confirmed");
480 self.claim(swap_id)
481 .await
482 .map_err(|e| anyhow!("Could not claim Receive Swap {swap_id}: {e:?}"))
483 }
484
485 fn validate_state_transition(
486 from_state: PaymentState,
487 to_state: PaymentState,
488 ) -> Result<(), PaymentError> {
489 match (from_state, to_state) {
490 (_, Created) => Err(PaymentError::Generic {
491 err: "Cannot transition to Created state".to_string(),
492 }),
493
494 (Created | Pending, Pending) => Ok(()),
495 (_, Pending) => Err(PaymentError::Generic {
496 err: format!("Cannot transition from {from_state:?} to Pending state"),
497 }),
498
499 (Created | Pending, Complete) => Ok(()),
500 (_, Complete) => Err(PaymentError::Generic {
501 err: format!("Cannot transition from {from_state:?} to Complete state"),
502 }),
503
504 (Created | TimedOut, TimedOut) => Ok(()),
505 (_, TimedOut) => Err(PaymentError::Generic {
506 err: format!("Cannot transition from {from_state:?} to TimedOut state"),
507 }),
508
509 (_, Refundable) => Err(PaymentError::Generic {
510 err: format!("Cannot transition from {from_state:?} to Refundable state"),
511 }),
512
513 (_, RefundPending) => Err(PaymentError::Generic {
514 err: format!("Cannot transition from {from_state:?} to RefundPending state"),
515 }),
516
517 (Complete, Failed) => Err(PaymentError::Generic {
518 err: format!("Cannot transition from {from_state:?} to Failed state"),
519 }),
520 (_, Failed) => Ok(()),
521
522 (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
523 err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
524 }),
525 }
526 }
527
528 async fn verify_lockup_tx_status(
529 &self,
530 receive_swap: &ReceiveSwap,
531 tx_id: &str,
532 tx_hex: &str,
533 verify_confirmation: bool,
534 ) -> Result<Transaction> {
535 let script = receive_swap.get_swap_script()?;
537 let address = script
538 .to_address(self.config.network.into())
539 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
540 self.liquid_chain_service
541 .verify_tx(&address, tx_id, tx_hex, verify_confirmation)
542 .await
543 }
544
545 async fn verify_lockup_tx_amount(
546 &self,
547 receive_swap: &ReceiveSwap,
548 lockup_tx: &Transaction,
549 ) -> Result<()> {
550 let secp = Secp256k1::new();
551 let script = receive_swap.get_swap_script()?;
552 let address = script
553 .to_address(self.config.network.into())
554 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
555 let blinding_key = receive_swap
556 .get_boltz_create_response()?
557 .blinding_key
558 .ok_or(anyhow!("Missing blinding key"))?;
559 let tx_out = lockup_tx
560 .output
561 .iter()
562 .find(|tx_out| tx_out.script_pubkey == address.script_pubkey())
563 .ok_or(anyhow!("Failed to get tx output"))?;
564 let lockup_amount_sat = tx_out
565 .unblind(&secp, SecretKey::from_str(&blinding_key)?)
566 .map(|o| o.value)?;
567 let expected_lockup_amount_sat =
568 receive_swap.receiver_amount_sat + receive_swap.claim_fees_sat;
569 if lockup_amount_sat < expected_lockup_amount_sat {
570 bail!(
571 "Failed to verify lockup amount for Receive Swap {}: {} sat vs {} sat",
572 receive_swap.id,
573 expected_lockup_amount_sat,
574 lockup_amount_sat
575 );
576 }
577 Ok(())
578 }
579}
580
581#[cfg(test)]
582mod tests {
583 use std::collections::{HashMap, HashSet};
584
585 use anyhow::Result;
586
587 use crate::{
588 model::PaymentState::{self, *},
589 test_utils::{
590 persist::{create_persister, new_receive_swap},
591 receive_swap::new_receive_swap_handler,
592 },
593 };
594
595 #[cfg(feature = "browser-tests")]
596 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
597
598 #[sdk_macros::async_test_all]
599 async fn test_receive_swap_state_transitions() -> Result<()> {
600 create_persister!(persister);
601
602 let receive_swap_state_handler = new_receive_swap_handler(persister.clone())?;
603
604 let valid_combinations = HashMap::from([
606 (
607 Created,
608 HashSet::from([Pending, Complete, TimedOut, Failed]),
609 ),
610 (Pending, HashSet::from([Pending, Complete, Failed])),
611 (TimedOut, HashSet::from([TimedOut, Failed])),
612 (Complete, HashSet::from([])),
613 (Refundable, HashSet::from([Failed])),
614 (RefundPending, HashSet::from([Failed])),
615 (Failed, HashSet::from([Failed])),
616 ]);
617
618 for (first_state, allowed_states) in valid_combinations.iter() {
619 for allowed_state in allowed_states {
620 let receive_swap = new_receive_swap(Some(*first_state), None);
621 persister.insert_or_update_receive_swap(&receive_swap)?;
622
623 assert!(receive_swap_state_handler
624 .update_swap_info(&receive_swap.id, *allowed_state, None, None, None, None)
625 .is_ok());
626 }
627 }
628
629 let all_states = HashSet::from([Created, Pending, Complete, TimedOut, Failed]);
631 let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
632 .iter()
633 .map(|(first_state, allowed_states)| {
634 (
635 *first_state,
636 all_states.difference(allowed_states).cloned().collect(),
637 )
638 })
639 .collect();
640
641 for (first_state, disallowed_states) in invalid_combinations.iter() {
642 for disallowed_state in disallowed_states {
643 let receive_swap = new_receive_swap(Some(*first_state), None);
644 persister.insert_or_update_receive_swap(&receive_swap)?;
645
646 assert!(receive_swap_state_handler
647 .update_swap_info(&receive_swap.id, *disallowed_state, None, None, None, None)
648 .is_err());
649 }
650 }
651
652 Ok(())
653 }
654}