1use std::collections::HashSet;
2use std::str::FromStr;
3
4use anyhow::{anyhow, bail, Context, Result};
5use boltz_client::swaps::boltz::RevSwapStates;
6use boltz_client::{boltz, Serialize, ToHex};
7use log::{debug, error, info, warn};
8use lwk_wollet::elements::secp256k1_zkp::Secp256k1;
9use lwk_wollet::elements::{Transaction, Txid};
10use lwk_wollet::hashes::hex::DisplayHex;
11use lwk_wollet::secp256k1::SecretKey;
12use sdk_common::utils::Arc;
13use tokio::sync::{broadcast, Mutex};
14
15use crate::chain::liquid::LiquidChainService;
16use crate::model::{BlockListener, PaymentState::*};
17use crate::model::{Config, PaymentTxData, PaymentType, ReceiveSwap};
18use crate::prelude::Swap;
19use crate::{ensure_sdk, utils};
20use crate::{
21 error::PaymentError, model::PaymentState, persist::Persister, swapper::Swapper,
22 wallet::OnchainWallet,
23};
24
/// Default zero-conf amount ceiling: 1,000,000 sat.
///
/// NOTE(review): presumably the fallback behind `Config::zero_conf_max_amount_sat()`, which
/// the Receive Swap handler compares against the receiver amount before claiming an
/// unconfirmed lockup — confirm against `Config`.
pub const DEFAULT_ZERO_CONF_MAX_SAT: u64 = 1_000_000;
27
28pub(crate) struct ReceiveSwapHandler {
29 config: Config,
30 onchain_wallet: Arc<dyn OnchainWallet>,
31 persister: Arc<Persister>,
32 swapper: Arc<dyn Swapper>,
33 subscription_notifier: broadcast::Sender<String>,
34 liquid_chain_service: Arc<dyn LiquidChainService>,
35 claiming_swaps: Arc<Mutex<HashSet<String>>>,
36}
37
38#[sdk_macros::async_trait]
39impl BlockListener for ReceiveSwapHandler {
40 async fn on_bitcoin_block(&self, _height: u32) {}
41
42 async fn on_liquid_block(&self, height: u32) {
43 if let Err(e) = self.claim_confirmed_lockups(height).await {
44 error!("Error claiming confirmed lockups: {e:?}");
45 }
46 }
47}
48
49impl ReceiveSwapHandler {
50 pub(crate) fn new(
51 config: Config,
52 onchain_wallet: Arc<dyn OnchainWallet>,
53 persister: Arc<Persister>,
54 swapper: Arc<dyn Swapper>,
55 liquid_chain_service: Arc<dyn LiquidChainService>,
56 ) -> Self {
57 let (subscription_notifier, _) = broadcast::channel::<String>(30);
58 Self {
59 config,
60 onchain_wallet,
61 persister,
62 swapper,
63 subscription_notifier,
64 liquid_chain_service,
65 claiming_swaps: Arc::new(Mutex::new(HashSet::new())),
66 }
67 }
68
69 pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
70 self.subscription_notifier.subscribe()
71 }
72
73 pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
75 let id = &update.id;
76 let status = &update.status;
77 let swap_state = RevSwapStates::from_str(status)
78 .map_err(|_| anyhow!("Invalid RevSwapState for Receive Swap {id}: {status}"))?;
79 let receive_swap = self.fetch_receive_swap_by_id(id)?;
80
81 info!("Handling Receive Swap transition to {swap_state:?} for swap {id}");
82
83 match swap_state {
84 RevSwapStates::SwapExpired
85 | RevSwapStates::InvoiceExpired
86 | RevSwapStates::TransactionFailed
87 | RevSwapStates::TransactionRefunded => {
88 match receive_swap.mrh_tx_id {
89 Some(mrh_tx_id) => {
90 warn!("Swap {id} is expired but MRH payment was received: txid {mrh_tx_id}")
91 }
92 None => {
93 error!("Swap {id} entered into an unrecoverable state: {swap_state:?}");
94 self.update_swap_info(id, Failed, None, None, None, None)?;
95 }
96 }
97 Ok(())
98 }
99 RevSwapStates::TransactionMempool => {
102 let Some(transaction) = update.transaction.clone() else {
103 return Err(anyhow!("Unexpected payload from Boltz status stream"));
104 };
105
106 if let Some(claim_tx_id) = receive_swap.claim_tx_id {
107 return Err(anyhow!(
108 "Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}"
109 ));
110 }
111
112 if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
114 return Err(anyhow!(
115 "MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
116 ));
117 }
118
119 let tx_hex = transaction.hex.ok_or(anyhow!(
121 "Missing lockup transaction hex in swap status update"
122 ))?;
123 let lockup_tx = utils::deserialize_tx_hex(&tx_hex)
124 .context("Failed to deserialize tx hex in swap status update")?;
125 debug!(
126 "Broadcasting lockup tx received in swap status update for receive swap {id}"
127 );
128 if let Err(e) = self.liquid_chain_service.broadcast(&lockup_tx).await {
129 warn!(
130 "Failed to broadcast lockup tx in swap status update: {e:?} - maybe the \
131 tx depends on inputs that haven't been seen yet, falling back to waiting for \
132 it to appear in the mempool"
133 );
134 if let Err(e) = self
135 .verify_lockup_tx_status(&receive_swap, &transaction.id, &tx_hex, false)
136 .await
137 {
138 return Err(anyhow!(
139 "Swapper mempool reported lockup could not be verified. txid: {}, err: {}",
140 transaction.id,
141 e
142 ));
143 }
144 }
145
146 if let Err(e) = self
147 .verify_lockup_tx_amount(&receive_swap, &lockup_tx)
148 .await
149 {
150 self.update_swap_info(id, Failed, None, None, None, None)?;
152 return Err(anyhow!(
153 "Swapper underpaid lockup amount. txid: {}, err: {}",
154 transaction.id,
155 e
156 ));
157 }
158 info!("Swapper lockup was verified");
159
160 let lockup_tx_id = &transaction.id;
161 self.update_swap_info(id, Pending, None, Some(lockup_tx_id), None, None)?;
162
163 let max_amount_sat = self.config.zero_conf_max_amount_sat();
165 let receiver_amount_sat = receive_swap.receiver_amount_sat;
166 if receiver_amount_sat > max_amount_sat {
167 warn!("[Receive Swap {id}] Amount is too high to claim with zero-conf ({receiver_amount_sat} sat > {max_amount_sat} sat). Waiting for confirmation...");
168 return Ok(());
169 }
170
171 debug!("[Receive Swap {id}] Amount is within valid range for zero-conf ({receiver_amount_sat} < {max_amount_sat} sat)");
172
173 let rbf_explicit = lockup_tx.input.iter().any(|input| input.sequence.is_rbf());
176 if rbf_explicit {
179 warn!("[Receive Swap {id}] Lockup transaction signals RBF. Waiting for confirmation...");
180 return Ok(());
181 }
182 debug!("[Receive Swap {id}] Lockup tx does not signal RBF. Proceeding...");
183
184 if receive_swap.metadata.is_local {
185 if let Err(err) = self.claim(id).await {
187 match err {
188 PaymentError::AlreadyClaimed => {
189 warn!("Funds already claimed for Receive Swap {id}")
190 }
191 _ => error!("Claim for Receive Swap {id} failed: {err}"),
192 }
193 }
194 }
195
196 Ok(())
197 }
198 RevSwapStates::TransactionConfirmed => {
199 let Some(transaction) = update.transaction.clone() else {
200 return Err(anyhow!("Unexpected payload from Boltz status stream"));
201 };
202
203 if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
205 return Err(anyhow!(
206 "MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
207 ));
208 }
209
210 let tx_hex = transaction.hex.ok_or(anyhow!(
212 "Missing lockup transaction hex in swap status update"
213 ))?;
214 let lockup_tx = match self
215 .verify_lockup_tx_status(&receive_swap, &transaction.id, &tx_hex, true)
216 .await
217 {
218 Ok(lockup_tx) => lockup_tx,
219 Err(e) => {
220 return Err(anyhow!(
221 "Swapper reported lockup could not be verified. txid: {}, err: {}",
222 transaction.id,
223 e
224 ));
225 }
226 };
227
228 if let Err(e) = self
229 .verify_lockup_tx_amount(&receive_swap, &lockup_tx)
230 .await
231 {
232 self.update_swap_info(id, Failed, None, None, None, None)?;
234 return Err(anyhow!(
235 "Swapper underpaid lockup amount. txid: {}, err: {}",
236 transaction.id,
237 e
238 ));
239 }
240 info!("Swapper lockup was verified, moving to claim");
241
242 match receive_swap.claim_tx_id {
243 Some(claim_tx_id) => {
244 warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}")
245 }
246 None => {
247 self.update_swap_info(&receive_swap.id, Pending, None, None, None, None)?;
248
249 if receive_swap.metadata.is_local {
250 if let Err(err) = self.claim(id).await {
252 match err {
253 PaymentError::AlreadyClaimed => {
254 warn!("Funds already claimed for Receive Swap {id}")
255 }
256 _ => error!("Claim for Receive Swap {id} failed: {err}"),
257 }
258 }
259 }
260 }
261 }
262 Ok(())
263 }
264
265 _ => {
266 debug!("Unhandled state for Receive Swap {id}: {swap_state:?}");
267 Ok(())
268 }
269 }
270 }
271
272 fn fetch_receive_swap_by_id(&self, swap_id: &str) -> Result<ReceiveSwap, PaymentError> {
273 self.persister
274 .fetch_receive_swap_by_id(swap_id)
275 .map_err(|_| PaymentError::PersistError)?
276 .ok_or(PaymentError::Generic {
277 err: format!("Receive Swap not found {swap_id}"),
278 })
279 }
280
281 pub(crate) fn update_swap(&self, updated_swap: ReceiveSwap) -> Result<(), PaymentError> {
283 let swap = self.fetch_receive_swap_by_id(&updated_swap.id)?;
284 if updated_swap != swap {
285 info!(
286 "Updating Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})",
287 updated_swap.id, updated_swap.state, updated_swap.claim_tx_id, updated_swap.lockup_tx_id, updated_swap.mrh_tx_id
288 );
289 self.persister
290 .insert_or_update_receive_swap(&updated_swap)?;
291 let _ = self.subscription_notifier.send(updated_swap.id);
292 }
293 Ok(())
294 }
295
296 pub(crate) fn update_swap_info(
298 &self,
299 swap_id: &str,
300 to_state: PaymentState,
301 claim_tx_id: Option<&str>,
302 lockup_tx_id: Option<&str>,
303 mrh_tx_id: Option<&str>,
304 mrh_amount_sat: Option<u64>,
305 ) -> Result<(), PaymentError> {
306 info!(
307 "Transitioning Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})",
308 swap_id, to_state, claim_tx_id, lockup_tx_id, mrh_tx_id
309 );
310 let swap = self.fetch_receive_swap_by_id(swap_id)?;
311 Self::validate_state_transition(swap.state, to_state)?;
312 self.persister.try_handle_receive_swap_update(
313 swap_id,
314 to_state,
315 claim_tx_id,
316 lockup_tx_id,
317 mrh_tx_id,
318 mrh_amount_sat,
319 )?;
320 let updated_swap = self.fetch_receive_swap_by_id(swap_id)?;
321
322 if mrh_tx_id.is_some() {
323 self.persister.delete_reserved_address(&swap.mrh_address)?;
324 }
325
326 if updated_swap != swap {
327 let _ = self.subscription_notifier.send(updated_swap.id);
328 }
329 Ok(())
330 }
331
332 async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> {
333 {
334 let mut claiming_guard = self.claiming_swaps.lock().await;
335 if claiming_guard.contains(swap_id) {
336 debug!("Claim for swap {swap_id} already in progress, skipping.");
337 return Ok(());
338 }
339 claiming_guard.insert(swap_id.to_string());
340 }
341
342 let result = self.claim_inner(swap_id).await;
343
344 {
345 let mut claiming_guard = self.claiming_swaps.lock().await;
346 claiming_guard.remove(swap_id);
347 }
348
349 result
350 }
351
352 async fn claim_inner(&self, swap_id: &str) -> Result<(), PaymentError> {
353 let swap = self.fetch_receive_swap_by_id(swap_id)?;
354 ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed);
355
356 info!("Initiating claim for Receive Swap {swap_id}");
357 let claim_address = match swap.claim_address {
358 Some(ref claim_address) => claim_address.clone(),
359 None => {
360 let address = self.onchain_wallet.next_unused_address().await?.to_string();
362 self.persister
363 .set_receive_swap_claim_address(&swap.id, &address)?;
364 address
365 }
366 };
367
368 let crate::prelude::Transaction::Liquid(claim_tx) = self
369 .swapper
370 .create_claim_tx(Swap::Receive(swap.clone()), Some(claim_address.clone()))
371 .await?
372 else {
373 return Err(PaymentError::Generic {
374 err: format!("Constructed invalid transaction for Receive swap {swap_id}"),
375 });
376 };
377
378 let tx_id = claim_tx.txid().to_hex();
381 match self.persister.set_receive_swap_claim_tx_id(swap_id, &tx_id) {
382 Ok(_) => {
383 let broadcast_res = match self.liquid_chain_service.broadcast(&claim_tx).await {
385 Ok(tx_id) => Ok(tx_id.to_hex()),
386 Err(err) => {
387 debug!(
388 "Could not broadcast claim tx via chain service for Receive swap {swap_id}: {err:?}"
389 );
390 let claim_tx_hex = claim_tx.serialize().to_lower_hex_string();
391 self.swapper
392 .broadcast_tx(self.config.network.into(), &claim_tx_hex)
393 .await
394 }
395 };
396 match broadcast_res {
397 Ok(claim_tx_id) => {
398 self.persister.insert_or_update_payment(
401 PaymentTxData {
402 tx_id: claim_tx_id.clone(),
403 timestamp: Some(utils::now()),
404 asset_id: self.config.lbtc_asset_id(),
405 amount: swap.receiver_amount_sat,
406 fees_sat: 0,
407 payment_type: PaymentType::Receive,
408 is_confirmed: false,
409 unblinding_data: None,
410 },
411 None,
412 false,
413 )?;
414
415 info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}");
416 _ = self.subscription_notifier.send(claim_tx_id);
419 Ok(())
420 }
421 Err(err) => {
422 debug!(
424 "Could not broadcast claim tx via swapper for Receive swap {swap_id}: {err:?}"
425 );
426 self.persister
427 .unset_receive_swap_claim_tx_id(swap_id, &tx_id)?;
428 Err(err)
429 }
430 }
431 }
432 Err(err) => {
433 debug!(
434 "Failed to set claim_tx_id after creating tx for Receive swap {swap_id}: txid {tx_id}"
435 );
436 Err(err)
437 }
438 }
439 }
440
441 async fn claim_confirmed_lockups(&self, height: u32) -> Result<()> {
442 let receive_swaps: Vec<ReceiveSwap> = self
443 .persister
444 .list_ongoing_receive_swaps(Some(true))?
445 .into_iter()
446 .filter(|s| s.lockup_tx_id.is_some() && s.claim_tx_id.is_none())
447 .collect();
448 info!(
449 "Rescanning {} Receive Swap(s) lockup txs at height {}",
450 receive_swaps.len(),
451 height
452 );
453 for swap in receive_swaps {
454 if let Err(e) = self.claim_confirmed_lockup(&swap).await {
455 error!("Error rescanning Receive Swap {}: {e:?}", swap.id,);
456 }
457 }
458 Ok(())
459 }
460
461 async fn claim_confirmed_lockup(&self, receive_swap: &ReceiveSwap) -> Result<()> {
462 let Some(tx_id) = receive_swap.lockup_tx_id.clone() else {
463 return Ok(());
465 };
466 let swap_id = &receive_swap.id;
467 let tx_hex = self
468 .liquid_chain_service
469 .get_transaction_hex(&Txid::from_str(&tx_id)?)
470 .await?
471 .ok_or(anyhow!("Lockup tx not found for Receive swap {swap_id}"))?
472 .serialize()
473 .to_lower_hex_string();
474 let lockup_tx = self
475 .verify_lockup_tx_status(receive_swap, &tx_id, &tx_hex, true)
476 .await?;
477 if let Err(e) = self.verify_lockup_tx_amount(receive_swap, &lockup_tx).await {
478 self.update_swap_info(swap_id, Failed, None, None, None, None)?;
479 return Err(e);
480 }
481 info!("Receive Swap {swap_id} lockup tx is confirmed");
482 self.claim(swap_id)
483 .await
484 .map_err(|e| anyhow!("Could not claim Receive Swap {swap_id}: {e:?}"))
485 }
486
487 fn validate_state_transition(
488 from_state: PaymentState,
489 to_state: PaymentState,
490 ) -> Result<(), PaymentError> {
491 match (from_state, to_state) {
492 (_, Created) => Err(PaymentError::Generic {
493 err: "Cannot transition to Created state".to_string(),
494 }),
495
496 (Created | Pending, Pending) => Ok(()),
497 (_, Pending) => Err(PaymentError::Generic {
498 err: format!("Cannot transition from {from_state:?} to Pending state"),
499 }),
500
501 (Created | Pending, Complete) => Ok(()),
502 (_, Complete) => Err(PaymentError::Generic {
503 err: format!("Cannot transition from {from_state:?} to Complete state"),
504 }),
505
506 (Created | TimedOut, TimedOut) => Ok(()),
507 (_, TimedOut) => Err(PaymentError::Generic {
508 err: format!("Cannot transition from {from_state:?} to TimedOut state"),
509 }),
510
511 (_, Refundable) => Err(PaymentError::Generic {
512 err: format!("Cannot transition from {from_state:?} to Refundable state"),
513 }),
514
515 (_, RefundPending) => Err(PaymentError::Generic {
516 err: format!("Cannot transition from {from_state:?} to RefundPending state"),
517 }),
518
519 (Complete, Failed) => Err(PaymentError::Generic {
520 err: format!("Cannot transition from {from_state:?} to Failed state"),
521 }),
522 (_, Failed) => Ok(()),
523
524 (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
525 err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
526 }),
527 }
528 }
529
530 async fn verify_lockup_tx_status(
531 &self,
532 receive_swap: &ReceiveSwap,
533 tx_id: &str,
534 tx_hex: &str,
535 verify_confirmation: bool,
536 ) -> Result<Transaction> {
537 let script = receive_swap.get_swap_script()?;
539 let address = script
540 .to_address(self.config.network.into())
541 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
542 self.liquid_chain_service
543 .verify_tx(&address, tx_id, tx_hex, verify_confirmation)
544 .await
545 }
546
547 async fn verify_lockup_tx_amount(
548 &self,
549 receive_swap: &ReceiveSwap,
550 lockup_tx: &Transaction,
551 ) -> Result<()> {
552 let secp = Secp256k1::new();
553 let script = receive_swap.get_swap_script()?;
554 let address = script
555 .to_address(self.config.network.into())
556 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
557 let blinding_key = receive_swap
558 .get_boltz_create_response()?
559 .blinding_key
560 .ok_or(anyhow!("Missing blinding key"))?;
561 let tx_out = lockup_tx
562 .output
563 .iter()
564 .find(|tx_out| tx_out.script_pubkey == address.script_pubkey())
565 .ok_or(anyhow!("Failed to get tx output"))?;
566 let lockup_amount_sat = tx_out
567 .unblind(&secp, SecretKey::from_str(&blinding_key)?)
568 .map(|o| o.value)?;
569 let expected_lockup_amount_sat =
570 receive_swap.receiver_amount_sat + receive_swap.claim_fees_sat;
571 if lockup_amount_sat < expected_lockup_amount_sat {
572 bail!(
573 "Failed to verify lockup amount for Receive Swap {}: {} sat vs {} sat",
574 receive_swap.id,
575 expected_lockup_amount_sat,
576 lockup_amount_sat
577 );
578 }
579 Ok(())
580 }
581}
582
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use anyhow::Result;

    use crate::{
        model::PaymentState::{self, *},
        test_utils::{
            persist::{create_persister, new_receive_swap},
            receive_swap::new_receive_swap_handler,
        },
    };

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Exercises `update_swap_info` for every (initial state, target state) pair: the pairs
    /// listed in `valid_combinations` must succeed, every other pair must be rejected.
    #[sdk_macros::async_test_all]
    async fn test_receive_swap_state_transitions() -> Result<()> {
        create_persister!(persister);

        let receive_swap_state_handler = new_receive_swap_handler(persister.clone())?;

        // Initial state -> target states that must be accepted.
        let valid_combinations = HashMap::from([
            (
                Created,
                HashSet::from([Pending, Complete, TimedOut, Failed]),
            ),
            (Pending, HashSet::from([Pending, Complete, Failed])),
            (TimedOut, HashSet::from([TimedOut, Failed])),
            (Complete, HashSet::from([])),
            (Refundable, HashSet::from([Failed])),
            (RefundPending, HashSet::from([Failed])),
            (Failed, HashSet::from([Failed])),
        ]);

        for (first_state, allowed_states) in valid_combinations.iter() {
            for allowed_state in allowed_states {
                let receive_swap = new_receive_swap(Some(*first_state), None);
                persister.insert_or_update_receive_swap(&receive_swap)?;

                let res = receive_swap_state_handler.update_swap_info(
                    &receive_swap.id,
                    *allowed_state,
                    None,
                    None,
                    None,
                    None,
                );
                assert!(
                    res.is_ok(),
                    "expected {first_state:?} -> {allowed_state:?} to be allowed, got {res:?}"
                );
            }
        }

        // Every target state, including the ones a Receive Swap may never transition to, so
        // their rejection is exercised as well.
        let all_states = HashSet::from([
            Created,
            Pending,
            Complete,
            TimedOut,
            Failed,
            Refundable,
            RefundPending,
            WaitingFeeAcceptance,
        ]);
        let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
            .iter()
            .map(|(first_state, allowed_states)| {
                (
                    *first_state,
                    all_states.difference(allowed_states).cloned().collect(),
                )
            })
            .collect();

        for (first_state, disallowed_states) in invalid_combinations.iter() {
            for disallowed_state in disallowed_states {
                let receive_swap = new_receive_swap(Some(*first_state), None);
                persister.insert_or_update_receive_swap(&receive_swap)?;

                let res = receive_swap_state_handler.update_swap_info(
                    &receive_swap.id,
                    *disallowed_state,
                    None,
                    None,
                    None,
                    None,
                );
                assert!(
                    res.is_err(),
                    "expected {first_state:?} -> {disallowed_state:?} to be rejected"
                );
            }
        }

        Ok(())
    }
}