1use std::collections::HashSet;
2use std::str::FromStr;
3
4use anyhow::{anyhow, bail, Result};
5use boltz_client::swaps::boltz::RevSwapStates;
6use boltz_client::{boltz, Serialize, ToHex};
7use log::{debug, error, info, warn};
8use lwk_wollet::elements::secp256k1_zkp::Secp256k1;
9use lwk_wollet::elements::{Transaction, Txid};
10use lwk_wollet::hashes::hex::DisplayHex;
11use lwk_wollet::secp256k1::SecretKey;
12use sdk_common::utils::Arc;
13use tokio::sync::{broadcast, Mutex};
14
15use crate::chain::liquid::LiquidChainService;
16use crate::model::{BlockListener, PaymentState::*};
17use crate::model::{Config, PaymentTxData, PaymentType, ReceiveSwap};
18use crate::prelude::Swap;
19use crate::{ensure_sdk, utils};
20use crate::{
21 error::PaymentError, model::PaymentState, persist::Persister, swapper::Swapper,
22 wallet::OnchainWallet,
23};
24
/// Default zero-conf amount limit, in satoshis.
///
/// NOTE(review): presumably the fallback behind `Config::zero_conf_max_amount_sat`, which this
/// file uses as the upper bound for claiming a lockup before it confirms — confirm against
/// `Config`.
pub const DEFAULT_ZERO_CONF_MAX_SAT: u64 = 1_000_000;
27
/// Drives Receive Swaps: reacts to swapper status updates and new Liquid blocks, persists the
/// resulting state transitions and claims verified lockups.
pub(crate) struct ReceiveSwapHandler {
    /// SDK configuration; read here for the network, the zero-conf limit and the L-BTC asset id.
    config: Config,
    /// Wallet used to obtain a fresh address when a swap has no claim address yet.
    onchain_wallet: Arc<dyn OnchainWallet>,
    /// Storage for swaps, payments and reserved addresses.
    persister: Arc<Persister>,
    /// Swapper client; builds claim transactions and serves as fallback broadcaster.
    swapper: Arc<dyn Swapper>,
    /// Broadcast channel on which ids of updated swaps (and broadcast claim tx ids) are sent.
    subscription_notifier: broadcast::Sender<String>,
    /// Liquid chain access: transaction lookup, verification and broadcast.
    liquid_chain_service: Arc<dyn LiquidChainService>,
    /// Ids of swaps whose claim is currently in progress, used to skip concurrent claims.
    claiming_swaps: Arc<Mutex<HashSet<String>>>,
}
37
38#[sdk_macros::async_trait]
39impl BlockListener for ReceiveSwapHandler {
40 async fn on_bitcoin_block(&self, _height: u32) {}
41
42 async fn on_liquid_block(&self, height: u32) {
43 if let Err(e) = self.claim_confirmed_lockups(height).await {
44 error!("Error claiming confirmed lockups: {e:?}");
45 }
46 }
47}
48
49impl ReceiveSwapHandler {
50 pub(crate) fn new(
51 config: Config,
52 onchain_wallet: Arc<dyn OnchainWallet>,
53 persister: Arc<Persister>,
54 swapper: Arc<dyn Swapper>,
55 liquid_chain_service: Arc<dyn LiquidChainService>,
56 ) -> Self {
57 let (subscription_notifier, _) = broadcast::channel::<String>(30);
58 Self {
59 config,
60 onchain_wallet,
61 persister,
62 swapper,
63 subscription_notifier,
64 liquid_chain_service,
65 claiming_swaps: Arc::new(Mutex::new(HashSet::new())),
66 }
67 }
68
/// Returns a new receiver on the notification channel.
///
/// The handler sends the id of a swap on this channel whenever the stored swap changes, and
/// the claim tx id once a claim transaction was broadcast.
pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
    self.subscription_notifier.subscribe()
}
72
73 pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
75 let id = &update.id;
76 let status = &update.status;
77 let swap_state = RevSwapStates::from_str(status)
78 .map_err(|_| anyhow!("Invalid RevSwapState for Receive Swap {id}: {status}"))?;
79 let receive_swap = self.fetch_receive_swap_by_id(id)?;
80
81 info!("Handling Receive Swap transition to {swap_state:?} for swap {id}");
82
83 match swap_state {
84 RevSwapStates::SwapExpired
85 | RevSwapStates::InvoiceExpired
86 | RevSwapStates::TransactionFailed
87 | RevSwapStates::TransactionRefunded => {
88 match receive_swap.mrh_tx_id {
89 Some(mrh_tx_id) => {
90 warn!("Swap {id} is expired but MRH payment was received: txid {mrh_tx_id}")
91 }
92 None => {
93 error!("Swap {id} entered into an unrecoverable state: {swap_state:?}");
94 self.update_swap_info(id, Failed, None, None, None, None)?;
95 }
96 }
97 Ok(())
98 }
99 RevSwapStates::TransactionMempool => {
102 let Some(transaction) = update.transaction.clone() else {
103 return Err(anyhow!("Unexpected payload from Boltz status stream"));
104 };
105
106 if let Some(claim_tx_id) = receive_swap.claim_tx_id {
107 return Err(anyhow!(
108 "Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}"
109 ));
110 }
111
112 if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
114 return Err(anyhow!(
115 "MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
116 ));
117 }
118
119 let tx_hex = transaction.hex.ok_or(anyhow!(
121 "Missing lockup transaction hex in swap status update"
122 ))?;
123 let lockup_tx = match self
124 .verify_lockup_tx(&receive_swap, &transaction.id, &tx_hex, false)
125 .await
126 {
127 Ok(lockup_tx) => lockup_tx,
128 Err(e) => {
129 return Err(anyhow!(
130 "Swapper mempool reported lockup could not be verified. txid: {}, err: {}",
131 transaction.id,
132 e
133 ));
134 }
135 };
136
137 if let Err(e) = self
138 .verify_lockup_tx_amount(&receive_swap, &lockup_tx)
139 .await
140 {
141 self.update_swap_info(id, Failed, None, None, None, None)?;
143 return Err(anyhow!(
144 "Swapper underpaid lockup amount. txid: {}, err: {}",
145 transaction.id,
146 e
147 ));
148 }
149 info!("Swapper lockup was verified");
150
151 let lockup_tx_id = &transaction.id;
152 self.update_swap_info(id, Pending, None, Some(lockup_tx_id), None, None)?;
153
154 let max_amount_sat = self.config.zero_conf_max_amount_sat();
156 let receiver_amount_sat = receive_swap.receiver_amount_sat;
157 if receiver_amount_sat > max_amount_sat {
158 warn!("[Receive Swap {id}] Amount is too high to claim with zero-conf ({receiver_amount_sat} sat > {max_amount_sat} sat). Waiting for confirmation...");
159 return Ok(());
160 }
161
162 debug!("[Receive Swap {id}] Amount is within valid range for zero-conf ({receiver_amount_sat} < {max_amount_sat} sat)");
163
164 let rbf_explicit = lockup_tx.input.iter().any(|input| input.sequence.is_rbf());
167 if rbf_explicit {
170 warn!("[Receive Swap {id}] Lockup transaction signals RBF. Waiting for confirmation...");
171 return Ok(());
172 }
173 debug!("[Receive Swap {id}] Lockup tx does not signal RBF. Proceeding...");
174
175 if receive_swap.metadata.is_local {
176 if let Err(err) = self.claim(id).await {
178 match err {
179 PaymentError::AlreadyClaimed => {
180 warn!("Funds already claimed for Receive Swap {id}")
181 }
182 _ => error!("Claim for Receive Swap {id} failed: {err}"),
183 }
184 }
185 }
186
187 Ok(())
188 }
189 RevSwapStates::TransactionConfirmed => {
190 let Some(transaction) = update.transaction.clone() else {
191 return Err(anyhow!("Unexpected payload from Boltz status stream"));
192 };
193
194 if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
196 return Err(anyhow!(
197 "MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
198 ));
199 }
200
201 let tx_hex = transaction.hex.ok_or(anyhow!(
203 "Missing lockup transaction hex in swap status update"
204 ))?;
205 let lockup_tx = match self
206 .verify_lockup_tx(&receive_swap, &transaction.id, &tx_hex, true)
207 .await
208 {
209 Ok(lockup_tx) => lockup_tx,
210 Err(e) => {
211 return Err(anyhow!(
212 "Swapper reported lockup could not be verified. txid: {}, err: {}",
213 transaction.id,
214 e
215 ));
216 }
217 };
218
219 if let Err(e) = self
220 .verify_lockup_tx_amount(&receive_swap, &lockup_tx)
221 .await
222 {
223 self.update_swap_info(id, Failed, None, None, None, None)?;
225 return Err(anyhow!(
226 "Swapper underpaid lockup amount. txid: {}, err: {}",
227 transaction.id,
228 e
229 ));
230 }
231 info!("Swapper lockup was verified, moving to claim");
232
233 match receive_swap.claim_tx_id {
234 Some(claim_tx_id) => {
235 warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}")
236 }
237 None => {
238 self.update_swap_info(&receive_swap.id, Pending, None, None, None, None)?;
239
240 if receive_swap.metadata.is_local {
241 if let Err(err) = self.claim(id).await {
243 match err {
244 PaymentError::AlreadyClaimed => {
245 warn!("Funds already claimed for Receive Swap {id}")
246 }
247 _ => error!("Claim for Receive Swap {id} failed: {err}"),
248 }
249 }
250 }
251 }
252 }
253 Ok(())
254 }
255
256 _ => {
257 debug!("Unhandled state for Receive Swap {id}: {swap_state:?}");
258 Ok(())
259 }
260 }
261 }
262
263 fn fetch_receive_swap_by_id(&self, swap_id: &str) -> Result<ReceiveSwap, PaymentError> {
264 self.persister
265 .fetch_receive_swap_by_id(swap_id)
266 .map_err(|_| PaymentError::PersistError)?
267 .ok_or(PaymentError::Generic {
268 err: format!("Receive Swap not found {swap_id}"),
269 })
270 }
271
272 pub(crate) fn update_swap(&self, updated_swap: ReceiveSwap) -> Result<(), PaymentError> {
274 let swap = self.fetch_receive_swap_by_id(&updated_swap.id)?;
275 if updated_swap != swap {
276 info!(
277 "Updating Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})",
278 updated_swap.id, updated_swap.state, updated_swap.claim_tx_id, updated_swap.lockup_tx_id, updated_swap.mrh_tx_id
279 );
280 self.persister
281 .insert_or_update_receive_swap(&updated_swap)?;
282 let _ = self.subscription_notifier.send(updated_swap.id);
283 }
284 Ok(())
285 }
286
287 pub(crate) fn update_swap_info(
289 &self,
290 swap_id: &str,
291 to_state: PaymentState,
292 claim_tx_id: Option<&str>,
293 lockup_tx_id: Option<&str>,
294 mrh_tx_id: Option<&str>,
295 mrh_amount_sat: Option<u64>,
296 ) -> Result<(), PaymentError> {
297 info!(
298 "Transitioning Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})",
299 swap_id, to_state, claim_tx_id, lockup_tx_id, mrh_tx_id
300 );
301 let swap = self.fetch_receive_swap_by_id(swap_id)?;
302 Self::validate_state_transition(swap.state, to_state)?;
303 self.persister.try_handle_receive_swap_update(
304 swap_id,
305 to_state,
306 claim_tx_id,
307 lockup_tx_id,
308 mrh_tx_id,
309 mrh_amount_sat,
310 )?;
311 let updated_swap = self.fetch_receive_swap_by_id(swap_id)?;
312
313 if mrh_tx_id.is_some() {
314 self.persister.delete_reserved_address(&swap.mrh_address)?;
315 }
316
317 if updated_swap != swap {
318 let _ = self.subscription_notifier.send(updated_swap.id);
319 }
320 Ok(())
321 }
322
323 async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> {
324 {
325 let mut claiming_guard = self.claiming_swaps.lock().await;
326 if claiming_guard.contains(swap_id) {
327 debug!("Claim for swap {swap_id} already in progress, skipping.");
328 return Ok(());
329 }
330 claiming_guard.insert(swap_id.to_string());
331 }
332
333 let result = self.claim_inner(swap_id).await;
334
335 {
336 let mut claiming_guard = self.claiming_swaps.lock().await;
337 claiming_guard.remove(swap_id);
338 }
339
340 result
341 }
342
343 async fn claim_inner(&self, swap_id: &str) -> Result<(), PaymentError> {
344 let swap = self.fetch_receive_swap_by_id(swap_id)?;
345 ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed);
346
347 info!("Initiating claim for Receive Swap {swap_id}");
348 let claim_address = match swap.claim_address {
349 Some(ref claim_address) => claim_address.clone(),
350 None => {
351 let address = self.onchain_wallet.next_unused_address().await?.to_string();
353 self.persister
354 .set_receive_swap_claim_address(&swap.id, &address)?;
355 address
356 }
357 };
358
359 let crate::prelude::Transaction::Liquid(claim_tx) = self
360 .swapper
361 .create_claim_tx(Swap::Receive(swap.clone()), Some(claim_address.clone()))
362 .await?
363 else {
364 return Err(PaymentError::Generic {
365 err: format!("Constructed invalid transaction for Receive swap {swap_id}"),
366 });
367 };
368
369 let tx_id = claim_tx.txid().to_hex();
372 match self.persister.set_receive_swap_claim_tx_id(swap_id, &tx_id) {
373 Ok(_) => {
374 let broadcast_res = match self.liquid_chain_service.broadcast(&claim_tx).await {
376 Ok(tx_id) => Ok(tx_id.to_hex()),
377 Err(err) => {
378 debug!(
379 "Could not broadcast claim tx via chain service for Receive swap {swap_id}: {err:?}"
380 );
381 let claim_tx_hex = claim_tx.serialize().to_lower_hex_string();
382 self.swapper
383 .broadcast_tx(self.config.network.into(), &claim_tx_hex)
384 .await
385 }
386 };
387 match broadcast_res {
388 Ok(claim_tx_id) => {
389 self.persister.insert_or_update_payment(
392 PaymentTxData {
393 tx_id: claim_tx_id.clone(),
394 timestamp: Some(utils::now()),
395 asset_id: self.config.lbtc_asset_id(),
396 amount: swap.receiver_amount_sat,
397 fees_sat: 0,
398 payment_type: PaymentType::Receive,
399 is_confirmed: false,
400 unblinding_data: None,
401 },
402 None,
403 false,
404 )?;
405
406 info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}");
407 _ = self.subscription_notifier.send(claim_tx_id);
410 Ok(())
411 }
412 Err(err) => {
413 debug!(
415 "Could not broadcast claim tx via swapper for Receive swap {swap_id}: {err:?}"
416 );
417 self.persister
418 .unset_receive_swap_claim_tx_id(swap_id, &tx_id)?;
419 Err(err)
420 }
421 }
422 }
423 Err(err) => {
424 debug!(
425 "Failed to set claim_tx_id after creating tx for Receive swap {swap_id}: txid {tx_id}"
426 );
427 Err(err)
428 }
429 }
430 }
431
432 async fn claim_confirmed_lockups(&self, height: u32) -> Result<()> {
433 let receive_swaps: Vec<ReceiveSwap> = self
434 .persister
435 .list_ongoing_receive_swaps(Some(true))?
436 .into_iter()
437 .filter(|s| s.lockup_tx_id.is_some() && s.claim_tx_id.is_none())
438 .collect();
439 info!(
440 "Rescanning {} Receive Swap(s) lockup txs at height {}",
441 receive_swaps.len(),
442 height
443 );
444 for swap in receive_swaps {
445 if let Err(e) = self.claim_confirmed_lockup(&swap).await {
446 error!("Error rescanning Receive Swap {}: {e:?}", swap.id,);
447 }
448 }
449 Ok(())
450 }
451
452 async fn claim_confirmed_lockup(&self, receive_swap: &ReceiveSwap) -> Result<()> {
453 let Some(tx_id) = receive_swap.lockup_tx_id.clone() else {
454 return Ok(());
456 };
457 let swap_id = &receive_swap.id;
458 let tx_hex = self
459 .liquid_chain_service
460 .get_transaction_hex(&Txid::from_str(&tx_id)?)
461 .await?
462 .ok_or(anyhow!("Lockup tx not found for Receive swap {swap_id}"))?
463 .serialize()
464 .to_lower_hex_string();
465 let lockup_tx = self
466 .verify_lockup_tx(receive_swap, &tx_id, &tx_hex, true)
467 .await?;
468 if let Err(e) = self.verify_lockup_tx_amount(receive_swap, &lockup_tx).await {
469 self.update_swap_info(swap_id, Failed, None, None, None, None)?;
470 return Err(e);
471 }
472 info!("Receive Swap {swap_id} lockup tx is confirmed");
473 self.claim(swap_id)
474 .await
475 .map_err(|e| anyhow!("Could not claim Receive Swap {swap_id}: {e:?}"))
476 }
477
478 fn validate_state_transition(
479 from_state: PaymentState,
480 to_state: PaymentState,
481 ) -> Result<(), PaymentError> {
482 match (from_state, to_state) {
483 (_, Created) => Err(PaymentError::Generic {
484 err: "Cannot transition to Created state".to_string(),
485 }),
486
487 (Created | Pending, Pending) => Ok(()),
488 (_, Pending) => Err(PaymentError::Generic {
489 err: format!("Cannot transition from {from_state:?} to Pending state"),
490 }),
491
492 (Created | Pending, Complete) => Ok(()),
493 (_, Complete) => Err(PaymentError::Generic {
494 err: format!("Cannot transition from {from_state:?} to Complete state"),
495 }),
496
497 (Created | TimedOut, TimedOut) => Ok(()),
498 (_, TimedOut) => Err(PaymentError::Generic {
499 err: format!("Cannot transition from {from_state:?} to TimedOut state"),
500 }),
501
502 (_, Refundable) => Err(PaymentError::Generic {
503 err: format!("Cannot transition from {from_state:?} to Refundable state"),
504 }),
505
506 (_, RefundPending) => Err(PaymentError::Generic {
507 err: format!("Cannot transition from {from_state:?} to RefundPending state"),
508 }),
509
510 (Complete, Failed) => Err(PaymentError::Generic {
511 err: format!("Cannot transition from {from_state:?} to Failed state"),
512 }),
513 (_, Failed) => Ok(()),
514
515 (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
516 err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
517 }),
518 }
519 }
520
521 async fn verify_lockup_tx(
522 &self,
523 receive_swap: &ReceiveSwap,
524 tx_id: &str,
525 tx_hex: &str,
526 verify_confirmation: bool,
527 ) -> Result<Transaction> {
528 let script = receive_swap.get_swap_script()?;
530 let address = script
531 .to_address(self.config.network.into())
532 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
533 self.liquid_chain_service
534 .verify_tx(&address, tx_id, tx_hex, verify_confirmation)
535 .await
536 }
537
538 async fn verify_lockup_tx_amount(
539 &self,
540 receive_swap: &ReceiveSwap,
541 lockup_tx: &Transaction,
542 ) -> Result<()> {
543 let secp = Secp256k1::new();
544 let script = receive_swap.get_swap_script()?;
545 let address = script
546 .to_address(self.config.network.into())
547 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
548 let blinding_key = receive_swap
549 .get_boltz_create_response()?
550 .blinding_key
551 .ok_or(anyhow!("Missing blinding key"))?;
552 let tx_out = lockup_tx
553 .output
554 .iter()
555 .find(|tx_out| tx_out.script_pubkey == address.script_pubkey())
556 .ok_or(anyhow!("Failed to get tx output"))?;
557 let lockup_amount_sat = tx_out
558 .unblind(&secp, SecretKey::from_str(&blinding_key)?)
559 .map(|o| o.value)?;
560 let expected_lockup_amount_sat =
561 receive_swap.receiver_amount_sat + receive_swap.claim_fees_sat;
562 if lockup_amount_sat < expected_lockup_amount_sat {
563 bail!(
564 "Failed to verify lockup amount for Receive Swap {}: {} sat vs {} sat",
565 receive_swap.id,
566 expected_lockup_amount_sat,
567 lockup_amount_sat
568 );
569 }
570 Ok(())
571 }
572}
573
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use anyhow::Result;

    use crate::{
        model::PaymentState::{self, *},
        test_utils::{
            persist::{create_persister, new_receive_swap},
            receive_swap::new_receive_swap_handler,
        },
    };

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Checks `update_swap_info` against a table of allowed state transitions: every listed
    /// transition must succeed, and every other transition to one of the tested target states
    /// must be rejected.
    #[sdk_macros::async_test_all]
    async fn test_receive_swap_state_transitions() -> Result<()> {
        create_persister!(persister);

        let receive_swap_state_handler = new_receive_swap_handler(persister.clone())?;

        // Stores a fresh swap in state `from` and reports whether moving it to `to` succeeds
        let transition_succeeds = |from: PaymentState, to: PaymentState| -> Result<bool> {
            let receive_swap = new_receive_swap(Some(from), None);
            persister.insert_or_update_receive_swap(&receive_swap)?;
            let outcome = receive_swap_state_handler.update_swap_info(
                &receive_swap.id,
                to,
                None,
                None,
                None,
                None,
            );
            Ok(outcome.is_ok())
        };

        // Start state -> set of states it may transition to
        let valid_combinations = HashMap::from([
            (
                Created,
                HashSet::from([Pending, Complete, TimedOut, Failed]),
            ),
            (Pending, HashSet::from([Pending, Complete, Failed])),
            (TimedOut, HashSet::from([TimedOut, Failed])),
            (Complete, HashSet::from([])),
            (Refundable, HashSet::from([Failed])),
            (RefundPending, HashSet::from([Failed])),
            (Failed, HashSet::from([Failed])),
        ]);

        for (first_state, allowed_states) in &valid_combinations {
            for allowed_state in allowed_states {
                assert!(transition_succeeds(*first_state, *allowed_state)?);
            }
        }

        // Target states exercised for the rejection check
        let all_states = HashSet::from([Created, Pending, Complete, TimedOut, Failed]);

        for (first_state, allowed_states) in &valid_combinations {
            for disallowed_state in all_states.difference(allowed_states) {
                assert!(!transition_succeeds(*first_state, *disallowed_state)?);
            }
        }

        Ok(())
    }
}