1use std::collections::HashSet;
2use std::str::FromStr;
3use std::time::Duration;
4
5use anyhow::{anyhow, bail, Context, Result};
6use boltz_client::{
7 boltz::{self},
8 swaps::boltz::{ChainSwapStates, CreateChainResponse, TransactionInfo},
9 ElementsLockTime, Secp256k1, Serialize, ToHex,
10};
11use elements::{hex::FromHex, Script, Transaction};
12use futures_util::TryFutureExt;
13use log::{debug, error, info, warn};
14use lwk_wollet::hashes::hex::DisplayHex;
15use sdk_common::utils::Arc;
16use tokio::sync::{broadcast, Mutex};
17use tokio_with_wasm::alias as tokio;
18
19use crate::{
20 chain::{bitcoin::BitcoinChainService, liquid::LiquidChainService},
21 elements, ensure_sdk,
22 error::{PaymentError, SdkError, SdkResult},
23 model::{
24 BlockListener, BtcHistory, ChainSwap, ChainSwapUpdate, Config, Direction, LBtcHistory,
25 PaymentState::{self, *},
26 PaymentTxData, PaymentType, Swap, SwapScriptV2, Transaction as SdkTransaction,
27 LIQUID_FEE_RATE_MSAT_PER_VBYTE,
28 },
29 persist::Persister,
30 swapper::Swapper,
31 utils,
32 wallet::OnchainWallet,
33};
34use crate::{
35 error::is_txn_mempool_conflict_error, model::DEFAULT_ONCHAIN_FEE_RATE_LEEWAY_SAT,
36 persist::model::PaymentTxBalance,
37};
38
39pub const ESTIMATED_BTC_CLAIM_TX_VSIZE: u64 = 111;
41
42pub(crate) struct ChainSwapHandler {
43 config: Config,
44 onchain_wallet: Arc<dyn OnchainWallet>,
45 persister: std::sync::Arc<Persister>,
46 swapper: Arc<dyn Swapper>,
47 liquid_chain_service: Arc<dyn LiquidChainService>,
48 bitcoin_chain_service: Arc<dyn BitcoinChainService>,
49 subscription_notifier: broadcast::Sender<String>,
50 claiming_swaps: Arc<Mutex<HashSet<String>>>,
51}
52
53#[sdk_macros::async_trait]
54impl BlockListener for ChainSwapHandler {
55 async fn on_bitcoin_block(&self, height: u32) {
56 if let Err(e) = self.claim_outgoing(height).await {
57 error!("Error claiming outgoing: {e:?}");
58 }
59 }
60
61 async fn on_liquid_block(&self, height: u32) {
62 if let Err(e) = self.refund_outgoing(height).await {
63 warn!("Error refunding outgoing: {e:?}");
64 }
65 if let Err(e) = self.claim_incoming(height).await {
66 error!("Error claiming incoming: {e:?}");
67 }
68 }
69}
70
71impl ChainSwapHandler {
72 pub(crate) fn new(
73 config: Config,
74 onchain_wallet: Arc<dyn OnchainWallet>,
75 persister: std::sync::Arc<Persister>,
76 swapper: Arc<dyn Swapper>,
77 liquid_chain_service: Arc<dyn LiquidChainService>,
78 bitcoin_chain_service: Arc<dyn BitcoinChainService>,
79 ) -> Result<Self> {
80 let (subscription_notifier, _) = broadcast::channel::<String>(30);
81 Ok(Self {
82 config,
83 onchain_wallet,
84 persister,
85 swapper,
86 liquid_chain_service,
87 bitcoin_chain_service,
88 subscription_notifier,
89 claiming_swaps: Arc::new(Mutex::new(HashSet::new())),
90 })
91 }
92
93 pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
94 self.subscription_notifier.subscribe()
95 }
96
97 pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
99 let id = &update.id;
100 let swap = self.fetch_chain_swap_by_id(id)?;
101
102 match swap.direction {
103 Direction::Incoming => self.on_new_incoming_status(&swap, update).await,
104 Direction::Outgoing => self.on_new_outgoing_status(&swap, update).await,
105 }
106 }
107
108 async fn claim_incoming(&self, height: u32) -> Result<()> {
109 let chain_swaps: Vec<ChainSwap> = self
110 .persister
111 .list_chain_swaps()?
112 .into_iter()
113 .filter(|s| {
114 s.direction == Direction::Incoming && s.state == Pending && s.claim_tx_id.is_none()
115 })
116 .collect();
117 info!(
118 "Rescanning {} incoming Chain Swap(s) server lockup txs at height {}",
119 chain_swaps.len(),
120 height
121 );
122 for swap in chain_swaps {
123 if let Err(e) = self.claim_confirmed_server_lockup(&swap).await {
124 error!(
125 "Error rescanning server lockup of incoming Chain Swap {}: {e:?}",
126 swap.id,
127 );
128 }
129 }
130 Ok(())
131 }
132
133 async fn claim_outgoing(&self, height: u32) -> Result<()> {
134 let chain_swaps: Vec<ChainSwap> = self
135 .persister
136 .list_chain_swaps()?
137 .into_iter()
138 .filter(|s| {
139 s.direction == Direction::Outgoing && s.state == Pending && s.claim_tx_id.is_none()
140 })
141 .collect();
142 info!(
143 "Rescanning {} outgoing Chain Swap(s) server lockup txs at height {}",
144 chain_swaps.len(),
145 height
146 );
147 for swap in chain_swaps {
148 if let Err(e) = self.claim_confirmed_server_lockup(&swap).await {
149 error!(
150 "Error rescanning server lockup of outgoing Chain Swap {}: {e:?}",
151 swap.id
152 );
153 }
154 }
155 Ok(())
156 }
157
158 async fn fetch_script_history(&self, swap_script: &SwapScriptV2) -> Result<Vec<(String, i32)>> {
159 let history = match swap_script {
160 SwapScriptV2::Liquid(_) => self
161 .fetch_liquid_script_history(swap_script)
162 .await?
163 .into_iter()
164 .map(|h| (h.txid.to_hex(), h.height))
165 .collect(),
166 SwapScriptV2::Bitcoin(_) => self
167 .fetch_bitcoin_script_history(swap_script)
168 .await?
169 .into_iter()
170 .map(|h| (h.txid.to_hex(), h.height))
171 .collect(),
172 };
173 Ok(history)
174 }
175
176 async fn claim_confirmed_server_lockup(&self, swap: &ChainSwap) -> Result<()> {
177 let Some(tx_id) = swap.server_lockup_tx_id.clone() else {
178 return Ok(());
180 };
181 let swap_id = &swap.id;
182 let swap_script = swap.get_claim_swap_script()?;
183 let script_history = self.fetch_script_history(&swap_script).await?;
184 let (_tx_history, tx_height) =
185 script_history
186 .iter()
187 .find(|h| h.0.eq(&tx_id))
188 .ok_or(anyhow!(
189 "Server lockup tx for Chain Swap {swap_id} was not found, txid={tx_id}"
190 ))?;
191 if *tx_height > 0 {
192 info!("Chain Swap {swap_id} server lockup tx is confirmed");
193 self.claim(swap_id)
194 .await
195 .map_err(|e| anyhow!("Could not claim Chain Swap {swap_id}: {e:?}"))?;
196 }
197 Ok(())
198 }
199
200 async fn on_new_incoming_status(
201 &self,
202 swap: &ChainSwap,
203 update: &boltz::SwapStatus,
204 ) -> Result<()> {
205 let id = update.id.clone();
206 let status = &update.status;
207 let swap_state = ChainSwapStates::from_str(status)
208 .map_err(|_| anyhow!("Invalid ChainSwapState for Chain Swap {id}: {status}"))?;
209
210 info!("Handling incoming Chain Swap transition to {status:?} for swap {id}");
211 match swap_state {
213 ChainSwapStates::TransactionMempool | ChainSwapStates::TransactionConfirmed => {
215 if let Some(zero_conf_rejected) = update.zero_conf_rejected {
216 info!("Is zero conf rejected for Chain Swap {id}: {zero_conf_rejected}");
217 self.persister
218 .update_chain_swap_accept_zero_conf(&id, !zero_conf_rejected)?;
219 }
220 if let Some(transaction) = update.transaction.clone() {
221 let actual_payer_amount =
222 self.fetch_incoming_swap_actual_payer_amount(swap).await?;
223 self.persister
224 .update_actual_payer_amount(&swap.id, actual_payer_amount)?;
225
226 self.update_swap_info(&ChainSwapUpdate {
227 swap_id: id,
228 to_state: Pending,
229 user_lockup_tx_id: Some(transaction.id),
230 ..Default::default()
231 })?;
232 }
233 Ok(())
234 }
235
236 ChainSwapStates::TransactionServerMempool => {
239 match swap.claim_tx_id.clone() {
240 None => {
241 let Some(transaction) = update.transaction.clone() else {
242 return Err(anyhow!("Unexpected payload from Boltz status stream"));
243 };
244
245 if let Err(e) = self.verify_user_lockup_tx(swap).await {
246 warn!("User lockup transaction for incoming Chain Swap {} could not be verified. err: {}", swap.id, e);
247 return Err(anyhow!("Could not verify user lockup transaction: {e}",));
248 }
249
250 if let Err(e) = self
251 .verify_server_lockup_tx(swap, &transaction, false)
252 .await
253 {
254 warn!("Server lockup mempool transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}",
255 swap.id,
256 transaction.id,
257 e);
258 return Err(anyhow!(
259 "Could not verify server lockup transaction {}: {e}",
260 transaction.id
261 ));
262 }
263
264 info!("Server lockup mempool transaction was verified for incoming Chain Swap {}", swap.id);
265 self.update_swap_info(&ChainSwapUpdate {
266 swap_id: id.clone(),
267 to_state: Pending,
268 server_lockup_tx_id: Some(transaction.id),
269 ..Default::default()
270 })?;
271
272 if swap.accept_zero_conf {
273 maybe_delay_before_claim(swap.metadata.is_local).await;
274 self.claim(&id).await.map_err(|e| {
275 error!("Could not cooperate Chain Swap {id} claim: {e}");
276 anyhow!("Could not post claim details. Err: {e:?}")
277 })?;
278 }
279 }
280 Some(claim_tx_id) => {
281 warn!("Claim tx for Chain Swap {id} was already broadcast: txid {claim_tx_id}")
282 }
283 };
284 Ok(())
285 }
286
287 ChainSwapStates::TransactionServerConfirmed => {
290 match swap.claim_tx_id.clone() {
291 None => {
292 let Some(transaction) = update.transaction.clone() else {
293 return Err(anyhow!("Unexpected payload from Boltz status stream"));
294 };
295
296 if let Err(e) = self.verify_user_lockup_tx(swap).await {
297 warn!("User lockup transaction for incoming Chain Swap {} could not be verified. err: {}", swap.id, e);
298 return Err(anyhow!("Could not verify user lockup transaction: {e}",));
299 }
300
301 let verify_res =
302 self.verify_server_lockup_tx(swap, &transaction, true).await;
303
304 self.update_swap_info(&ChainSwapUpdate {
308 swap_id: id.clone(),
309 to_state: Pending,
310 server_lockup_tx_id: Some(transaction.id.clone()),
311 ..Default::default()
312 })?;
313
314 match verify_res {
315 Ok(_) => {
316 info!("Server lockup transaction was verified for incoming Chain Swap {}", swap.id);
317
318 maybe_delay_before_claim(swap.metadata.is_local).await;
319 self.claim(&id).await.map_err(|e| {
320 error!("Could not cooperate Chain Swap {id} claim: {e}");
321 anyhow!("Could not post claim details. Err: {e:?}")
322 })?;
323 }
324 Err(e) => {
325 warn!("Server lockup transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}", swap.id, transaction.id, e);
326 return Err(anyhow!(
327 "Could not verify server lockup transaction {}: {e}",
328 transaction.id
329 ));
330 }
331 }
332 }
333 Some(claim_tx_id) => {
334 warn!("Claim tx for Chain Swap {id} was already broadcast: txid {claim_tx_id}")
335 }
336 };
337 Ok(())
338 }
339
340 ChainSwapStates::TransactionFailed
347 | ChainSwapStates::TransactionLockupFailed
348 | ChainSwapStates::TransactionRefunded
349 | ChainSwapStates::SwapExpired => {
350 let is_zero_amount = swap.payer_amount_sat == 0;
352 if matches!(swap_state, ChainSwapStates::TransactionLockupFailed) && is_zero_amount
353 {
354 match self.handle_amountless_update(swap).await {
355 Ok(_) => {
356 return Ok(()); }
359 Err(e) => error!("Failed to accept the quote for swap {}: {e:?}", &swap.id),
361 }
362 }
363
364 match swap.refund_tx_id.clone() {
365 None => {
366 warn!("Chain Swap {id} is in an unrecoverable state: {swap_state:?}");
367 if self
368 .user_lockup_tx_exists(swap)
369 .await
370 .context("Failed to check if user lockup tx exists")?
371 {
372 info!("Chain Swap {id} user lockup tx was broadcast. Setting the swap to refundable.");
373 self.update_swap_info(&ChainSwapUpdate {
374 swap_id: id,
375 to_state: Refundable,
376 ..Default::default()
377 })?;
378 } else {
379 info!("Chain Swap {id} user lockup tx was never broadcast. Resolving payment as failed.");
380 self.update_swap_info(&ChainSwapUpdate {
381 swap_id: id,
382 to_state: Failed,
383 ..Default::default()
384 })?;
385 }
386 }
387 Some(refund_tx_id) => warn!(
388 "Refund for Chain Swap {id} was already broadcast: txid {refund_tx_id}"
389 ),
390 };
391 Ok(())
392 }
393
394 _ => {
395 debug!("Unhandled state for Chain Swap {id}: {swap_state:?}");
396 Ok(())
397 }
398 }
399 }
400
401 async fn handle_amountless_update(&self, swap: &ChainSwap) -> Result<(), PaymentError> {
402 let id = swap.id.clone();
403
404 if swap.accepted_receiver_amount_sat.is_some() {
408 info!("Handling amountless update for swap {id} with existing accepted receiver amount. Erasing the accepted amount now...");
409 self.persister.update_accepted_receiver_amount(&id, None)?;
410 }
411
412 let quote = self
413 .swapper
414 .get_zero_amount_chain_swap_quote(&id)
415 .await
416 .map(|quote| quote.to_sat())?;
417 info!("Got quote of {quote} sat for swap {}", &id);
418
419 match self.validate_amountless_swap(swap, quote).await? {
420 ValidateAmountlessSwapResult::ReadyForAccepting {
421 user_lockup_amount_sat,
422 receiver_amount_sat,
423 } => {
424 debug!("Zero-amount swap validated. Auto-accepting...");
425 self.persister
426 .update_actual_payer_amount(&id, user_lockup_amount_sat)?;
427 self.persister
428 .update_accepted_receiver_amount(&id, Some(receiver_amount_sat))?;
429 self.swapper
430 .accept_zero_amount_chain_swap_quote(&id, quote)
431 .inspect_err(|e| {
432 error!("Failed to accept zero-amount swap {id} quote: {e} - trying to erase the accepted receiver amount...");
433 let _ = self.persister.update_accepted_receiver_amount(&id, None);
434 })
435 .await?;
436 self.persister.set_chain_swap_auto_accepted_fees(&id)
437 }
438 ValidateAmountlessSwapResult::RequiresUserAction {
439 user_lockup_amount_sat,
440 } => {
441 debug!("Zero-amount swap validated. Fees are too high for automatic accepting. Moving to WaitingFeeAcceptance");
442 self.persister
443 .update_actual_payer_amount(&id, user_lockup_amount_sat)?;
444 self.update_swap_info(&ChainSwapUpdate {
445 swap_id: id,
446 to_state: WaitingFeeAcceptance,
447 ..Default::default()
448 })
449 }
450 }
451 }
452
453 async fn validate_amountless_swap(
454 &self,
455 swap: &ChainSwap,
456 quote_server_lockup_amount_sat: u64,
457 ) -> Result<ValidateAmountlessSwapResult, PaymentError> {
458 debug!("Validating {swap:?}");
459
460 ensure_sdk!(
461 matches!(swap.direction, Direction::Incoming),
462 PaymentError::generic(format!(
463 "Only an incoming chain swap can be a zero-amount swap. Swap ID: {}",
464 &swap.id
465 ))
466 );
467
468 let script_pubkey = swap.get_receive_lockup_swap_script_pubkey(self.config.network)?;
469 let script_balance = self
470 .bitcoin_chain_service
471 .script_get_balance_with_retry(script_pubkey.as_script(), 10)
472 .await?;
473 debug!("Found lockup balance {script_balance:?}");
474 let user_lockup_amount_sat = match script_balance.confirmed > 0 {
475 true => script_balance.confirmed,
476 false => match script_balance.unconfirmed > 0 {
477 true => script_balance.unconfirmed.unsigned_abs(),
478 false => 0,
479 },
480 };
481 ensure_sdk!(
482 user_lockup_amount_sat > 0,
483 PaymentError::generic("Lockup address has no confirmed or unconfirmed balance")
484 );
485
486 let pair = swap.get_boltz_pair()?;
487
488 let server_fees_estimate_sat = pair.fees.server();
490 let service_fees_sat = pair.fees.boltz(user_lockup_amount_sat);
491 let server_lockup_amount_estimate_sat =
492 user_lockup_amount_sat - server_fees_estimate_sat - service_fees_sat;
493
494 let server_fees_leeway_sat = self
496 .config
497 .onchain_fee_rate_leeway_sat
498 .unwrap_or(DEFAULT_ONCHAIN_FEE_RATE_LEEWAY_SAT);
499 let min_auto_accept_server_lockup_amount_sat =
500 server_lockup_amount_estimate_sat.saturating_sub(server_fees_leeway_sat);
501
502 debug!(
503 "user_lockup_amount_sat = {user_lockup_amount_sat}, \
504 service_fees_sat = {service_fees_sat}, \
505 server_fees_estimate_sat = {server_fees_estimate_sat}, \
506 server_fees_leeway_sat = {server_fees_leeway_sat}, \
507 min_auto_accept_server_lockup_amount_sat = {min_auto_accept_server_lockup_amount_sat}, \
508 quote_server_lockup_amount_sat = {quote_server_lockup_amount_sat}",
509 );
510
511 if min_auto_accept_server_lockup_amount_sat > quote_server_lockup_amount_sat {
512 Ok(ValidateAmountlessSwapResult::RequiresUserAction {
513 user_lockup_amount_sat,
514 })
515 } else {
516 let receiver_amount_sat = quote_server_lockup_amount_sat - swap.claim_fees_sat;
517 Ok(ValidateAmountlessSwapResult::ReadyForAccepting {
518 user_lockup_amount_sat,
519 receiver_amount_sat,
520 })
521 }
522 }
523
524 async fn on_new_outgoing_status(
525 &self,
526 swap: &ChainSwap,
527 update: &boltz::SwapStatus,
528 ) -> Result<()> {
529 let id = update.id.clone();
530 let status = &update.status;
531 let swap_state = ChainSwapStates::from_str(status)
532 .map_err(|_| anyhow!("Invalid ChainSwapState for Chain Swap {id}: {status}"))?;
533
534 info!("Handling outgoing Chain Swap transition to {status:?} for swap {id}");
535 match swap_state {
537 ChainSwapStates::Created => {
539 match (swap.state, swap.user_lockup_tx_id.clone()) {
540 (TimedOut, _) => warn!("Chain Swap {id} timed out, do not broadcast a lockup tx"),
542
543 (_, None) => {
545 let create_response = swap.get_boltz_create_response()?;
546 let user_lockup_tx = self.lockup_funds(&id, &create_response).await?;
547 let lockup_tx_id = user_lockup_tx.txid().to_string();
548 let lockup_tx_fees_sat: u64 = user_lockup_tx.all_fees().values().sum();
549
550 self.persister.insert_or_update_payment(PaymentTxData {
553 tx_id: lockup_tx_id.clone(),
554 timestamp: Some(utils::now()),
555 fees_sat: lockup_tx_fees_sat,
556 is_confirmed: false,
557 unblinding_data: None,
558 },
559 &[PaymentTxBalance {
560 asset_id: self.config.lbtc_asset_id().to_string(),
561 amount: create_response.lockup_details.amount,
562 payment_type: PaymentType::Send,
563 }],
564 None, false)?;
565
566 self.update_swap_info(&ChainSwapUpdate {
567 swap_id: id,
568 to_state: Pending,
569 user_lockup_tx_id: Some(lockup_tx_id),
570 ..Default::default()
571 })?;
572 },
573
574 (_, Some(lockup_tx_id)) => warn!("User lockup tx for Chain Swap {id} was already broadcast: txid {lockup_tx_id}"),
576 };
577 Ok(())
578 }
579
580 ChainSwapStates::TransactionMempool | ChainSwapStates::TransactionConfirmed => {
582 if let Some(zero_conf_rejected) = update.zero_conf_rejected {
583 info!("Is zero conf rejected for Chain Swap {id}: {zero_conf_rejected}");
584 self.persister
585 .update_chain_swap_accept_zero_conf(&id, !zero_conf_rejected)?;
586 }
587 if let Some(transaction) = update.transaction.clone() {
588 self.update_swap_info(&ChainSwapUpdate {
589 swap_id: id,
590 to_state: Pending,
591 user_lockup_tx_id: Some(transaction.id),
592 ..Default::default()
593 })?;
594 }
595 Ok(())
596 }
597
598 ChainSwapStates::TransactionServerMempool => {
601 match swap.claim_tx_id.clone() {
602 None => {
603 let Some(transaction) = update.transaction.clone() else {
604 return Err(anyhow!("Unexpected payload from Boltz status stream"));
605 };
606
607 if let Err(e) = self.verify_user_lockup_tx(swap).await {
608 warn!("User lockup transaction for outgoing Chain Swap {} could not be verified. err: {}", swap.id, e);
609 return Err(anyhow!("Could not verify user lockup transaction: {e}",));
610 }
611
612 if let Err(e) = self
613 .verify_server_lockup_tx(swap, &transaction, false)
614 .await
615 {
616 warn!("Server lockup mempool transaction for outgoing Chain Swap {} could not be verified. txid: {}, err: {}",
617 swap.id,
618 transaction.id,
619 e);
620 return Err(anyhow!(
621 "Could not verify server lockup transaction {}: {e}",
622 transaction.id
623 ));
624 }
625
626 info!("Server lockup mempool transaction was verified for outgoing Chain Swap {}", swap.id);
627 self.update_swap_info(&ChainSwapUpdate {
628 swap_id: id.clone(),
629 to_state: Pending,
630 server_lockup_tx_id: Some(transaction.id),
631 ..Default::default()
632 })?;
633
634 if swap.accept_zero_conf {
635 maybe_delay_before_claim(swap.metadata.is_local).await;
636 self.claim(&id).await.map_err(|e| {
637 error!("Could not cooperate Chain Swap {id} claim: {e}");
638 anyhow!("Could not post claim details. Err: {e:?}")
639 })?;
640 }
641 }
642 Some(claim_tx_id) => {
643 warn!("Claim tx for Chain Swap {id} was already broadcast: txid {claim_tx_id}")
644 }
645 };
646 Ok(())
647 }
648
649 ChainSwapStates::TransactionServerConfirmed => {
652 match swap.claim_tx_id.clone() {
653 None => {
654 let Some(transaction) = update.transaction.clone() else {
655 return Err(anyhow!("Unexpected payload from Boltz status stream"));
656 };
657
658 if let Err(e) = self.verify_user_lockup_tx(swap).await {
659 warn!("User lockup transaction for outgoing Chain Swap {} could not be verified. err: {}", swap.id, e);
660 return Err(anyhow!("Could not verify user lockup transaction: {e}",));
661 }
662
663 if let Err(e) = self.verify_server_lockup_tx(swap, &transaction, true).await
664 {
665 warn!("Server lockup transaction for outgoing Chain Swap {} could not be verified. txid: {}, err: {}",
666 swap.id,
667 transaction.id,
668 e);
669 return Err(anyhow!(
670 "Could not verify server lockup transaction {}: {e}",
671 transaction.id
672 ));
673 }
674
675 info!(
676 "Server lockup transaction was verified for outgoing Chain Swap {}",
677 swap.id
678 );
679 self.update_swap_info(&ChainSwapUpdate {
680 swap_id: id.clone(),
681 to_state: Pending,
682 server_lockup_tx_id: Some(transaction.id),
683 ..Default::default()
684 })?;
685
686 maybe_delay_before_claim(swap.metadata.is_local).await;
687 self.claim(&id).await.map_err(|e| {
688 error!("Could not cooperate Chain Swap {id} claim: {e}");
689 anyhow!("Could not post claim details. Err: {e:?}")
690 })?;
691 }
692 Some(claim_tx_id) => {
693 warn!("Claim tx for Chain Swap {id} was already broadcast: txid {claim_tx_id}")
694 }
695 };
696 Ok(())
697 }
698
699 ChainSwapStates::TransactionFailed
706 | ChainSwapStates::TransactionLockupFailed
707 | ChainSwapStates::TransactionRefunded
708 | ChainSwapStates::SwapExpired => {
709 match &swap.refund_tx_id {
710 None => {
711 warn!("Chain Swap {id} is in an unrecoverable state: {swap_state:?}");
712 match swap.user_lockup_tx_id {
713 Some(_) => {
714 warn!("Chain Swap {id} user lockup tx has been broadcast.");
715 let refund_tx_id = match self.refund_outgoing_swap(swap, true).await
716 {
717 Ok(refund_tx_id) => Some(refund_tx_id),
718 Err(e) => {
719 warn!(
720 "Could not refund Send swap {id} cooperatively: {e:?}"
721 );
722 None
723 }
724 };
725 self.update_swap_info(&ChainSwapUpdate {
729 swap_id: id,
730 to_state: RefundPending,
731 refund_tx_id,
732 ..Default::default()
733 })?;
734 }
735 None => {
736 warn!("Chain Swap {id} user lockup tx was never broadcast. Resolving payment as failed.");
737 self.update_swap_info(&ChainSwapUpdate {
738 swap_id: id,
739 to_state: Failed,
740 ..Default::default()
741 })?;
742 }
743 }
744 }
745 Some(refund_tx_id) => warn!(
746 "Refund tx for Chain Swap {id} was already broadcast: txid {refund_tx_id}"
747 ),
748 };
749 Ok(())
750 }
751
752 _ => {
753 debug!("Unhandled state for Chain Swap {id}: {swap_state:?}");
754 Ok(())
755 }
756 }
757 }
758
759 async fn lockup_funds(
760 &self,
761 swap_id: &str,
762 create_response: &CreateChainResponse,
763 ) -> Result<Transaction, PaymentError> {
764 let lockup_details = create_response.lockup_details.clone();
765
766 debug!(
767 "Initiated Chain Swap: send {} sats to liquid address {}",
768 lockup_details.amount, lockup_details.lockup_address
769 );
770
771 let lockup_tx = self
772 .onchain_wallet
773 .build_tx_or_drain_tx(
774 Some(LIQUID_FEE_RATE_MSAT_PER_VBYTE),
775 &lockup_details.lockup_address,
776 &self.config.lbtc_asset_id().to_string(),
777 lockup_details.amount,
778 )
779 .await?;
780
781 let lockup_tx_id = self
782 .liquid_chain_service
783 .broadcast(&lockup_tx)
784 .await?
785 .to_string();
786
787 debug!(
788 "Successfully broadcast lockup transaction for Chain Swap {swap_id}. Lockup tx id: {lockup_tx_id}"
789 );
790 Ok(lockup_tx)
791 }
792
793 fn fetch_chain_swap_by_id(&self, swap_id: &str) -> Result<ChainSwap, PaymentError> {
794 self.persister
795 .fetch_chain_swap_by_id(swap_id)
796 .map_err(|_| PaymentError::PersistError)?
797 .ok_or(PaymentError::Generic {
798 err: format!("Chain Swap not found {swap_id}"),
799 })
800 }
801
802 pub(crate) fn update_swap(&self, updated_swap: ChainSwap) -> Result<(), PaymentError> {
804 let swap = self.fetch_chain_swap_by_id(&updated_swap.id)?;
805 if updated_swap != swap {
806 info!(
807 "Updating Chain swap {} to {:?} (user_lockup_tx_id = {:?}, server_lockup_tx_id = {:?}, claim_tx_id = {:?}, refund_tx_id = {:?})",
808 updated_swap.id,
809 updated_swap.state,
810 updated_swap.user_lockup_tx_id,
811 updated_swap.server_lockup_tx_id,
812 updated_swap.claim_tx_id,
813 updated_swap.refund_tx_id
814 );
815 self.persister.insert_or_update_chain_swap(&updated_swap)?;
816 let _ = self.subscription_notifier.send(updated_swap.id);
817 }
818 Ok(())
819 }
820
821 pub(crate) fn update_swap_info(
823 &self,
824 swap_update: &ChainSwapUpdate,
825 ) -> Result<(), PaymentError> {
826 info!("Updating Chain swap {swap_update:?}");
827 let swap = self.fetch_chain_swap_by_id(&swap_update.swap_id)?;
828 Self::validate_state_transition(swap.state, swap_update.to_state)?;
829 self.persister.try_handle_chain_swap_update(swap_update)?;
830 let updated_swap = self.fetch_chain_swap_by_id(&swap_update.swap_id)?;
831 if updated_swap != swap {
832 let _ = self.subscription_notifier.send(updated_swap.id);
833 }
834 Ok(())
835 }
836
837 async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> {
838 {
839 let mut claiming_guard = self.claiming_swaps.lock().await;
840 if claiming_guard.contains(swap_id) {
841 debug!("Claim for swap {swap_id} already in progress, skipping.");
842 return Ok(());
843 }
844 claiming_guard.insert(swap_id.to_string());
845 }
846
847 let result = self.claim_inner(swap_id).await;
848
849 {
850 let mut claiming_guard = self.claiming_swaps.lock().await;
851 claiming_guard.remove(swap_id);
852 }
853
854 result
855 }
856
857 async fn claim_inner(&self, swap_id: &str) -> Result<(), PaymentError> {
858 let swap = self.fetch_chain_swap_by_id(swap_id)?;
859 ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed);
860
861 debug!("Initiating claim for Chain Swap {swap_id}");
862 let claim_address = match (swap.direction, swap.claim_address.clone()) {
865 (Direction::Incoming, None) => {
866 Some(self.onchain_wallet.next_unused_address().await?.to_string())
867 }
868 _ => swap.claim_address.clone(),
869 };
870 let claim_tx = self
871 .swapper
872 .create_claim_tx(Swap::Chain(swap.clone()), claim_address.clone())
873 .await?;
874
875 let tx_id = claim_tx.txid();
878 match self
879 .persister
880 .set_chain_swap_claim(swap_id, claim_address, &tx_id)
881 {
882 Ok(_) => {
883 let broadcast_res = match claim_tx {
884 SdkTransaction::Liquid(tx) => {
886 match self.liquid_chain_service.broadcast(&tx).await {
887 Ok(tx_id) => Ok(tx_id.to_hex()),
888 Err(e) if is_txn_mempool_conflict_error(&e) => {
889 Err(PaymentError::AlreadyClaimed)
890 }
891 Err(err) => {
892 debug!(
893 "Could not broadcast claim tx via chain service for Chain swap {swap_id}: {err:?}"
894 );
895 let claim_tx_hex = tx.serialize().to_lower_hex_string();
896 self.swapper
897 .broadcast_tx(self.config.network.into(), &claim_tx_hex)
898 .await
899 }
900 }
901 }
902 SdkTransaction::Bitcoin(tx) => self
903 .bitcoin_chain_service
904 .broadcast(&tx)
905 .await
906 .map(|tx_id| tx_id.to_hex())
907 .map_err(|err| PaymentError::Generic {
908 err: err.to_string(),
909 }),
910 };
911
912 match broadcast_res {
913 Ok(claim_tx_id) => {
914 let payment_id = match swap.direction {
915 Direction::Incoming => {
916 self.persister.insert_or_update_payment(
919 PaymentTxData {
920 tx_id: claim_tx_id.clone(),
921 timestamp: Some(utils::now()),
922 fees_sat: 0,
923 is_confirmed: false,
924 unblinding_data: None,
925 },
926 &[PaymentTxBalance {
927 asset_id: self.config.lbtc_asset_id().to_string(),
928 amount: swap
929 .accepted_receiver_amount_sat
930 .unwrap_or(swap.receiver_amount_sat),
931 payment_type: PaymentType::Receive,
932 }],
933 None,
934 false,
935 )?;
936 Some(claim_tx_id.clone())
937 }
938 Direction::Outgoing => swap.user_lockup_tx_id,
939 };
940
941 info!("Successfully broadcast claim tx {claim_tx_id} for Chain Swap {swap_id}");
942 payment_id.and_then(|payment_id| {
945 self.subscription_notifier.send(payment_id).ok()
946 });
947 Ok(())
948 }
949 Err(err) => {
950 debug!(
952 "Could not broadcast claim tx via swapper for Chain swap {swap_id}: {err:?}"
953 );
954 self.persister
955 .unset_chain_swap_claim_tx_id(swap_id, &tx_id)?;
956 Err(err)
957 }
958 }
959 }
960 Err(err) => {
961 debug!(
962 "Failed to set claim_tx_id after creating tx for Chain swap {swap_id}: txid {tx_id}"
963 );
964 Err(err)
965 }
966 }
967 }
968
969 pub(crate) async fn prepare_refund(
970 &self,
971 lockup_address: &str,
972 refund_address: &str,
973 fee_rate_sat_per_vb: u32,
974 ) -> SdkResult<(u32, u64, Option<String>)> {
975 let swap = self
976 .persister
977 .fetch_chain_swap_by_lockup_address(lockup_address)?
978 .ok_or(SdkError::generic(format!(
979 "Chain Swap with lockup address {lockup_address} not found"
980 )))?;
981
982 let refund_tx_id = swap.refund_tx_id.clone();
983 if let Some(refund_tx_id) = &refund_tx_id {
984 warn!(
985 "A refund tx for Chain Swap {} was already broadcast: txid {refund_tx_id}",
986 swap.id
987 );
988 }
989
990 let (refund_tx_size, refund_tx_fees_sat) = self
991 .swapper
992 .estimate_refund_broadcast(
993 Swap::Chain(swap),
994 refund_address,
995 Some(fee_rate_sat_per_vb as f64),
996 true,
997 )
998 .await?;
999
1000 Ok((refund_tx_size, refund_tx_fees_sat, refund_tx_id))
1001 }
1002
1003 pub(crate) async fn refund_incoming_swap(
1004 &self,
1005 lockup_address: &str,
1006 refund_address: &str,
1007 broadcast_fee_rate_sat_per_vb: u32,
1008 is_cooperative: bool,
1009 ) -> Result<String, PaymentError> {
1010 let swap = self
1011 .persister
1012 .fetch_chain_swap_by_lockup_address(lockup_address)?
1013 .ok_or(PaymentError::Generic {
1014 err: format!("Swap for lockup address {lockup_address} not found"),
1015 })?;
1016 let id = &swap.id;
1017
1018 ensure_sdk!(
1019 swap.state.is_refundable(),
1020 PaymentError::Generic {
1021 err: format!("Chain Swap {id} was not in refundable state")
1022 }
1023 );
1024
1025 info!("Initiating refund for incoming Chain Swap {id}, is_cooperative: {is_cooperative}");
1026
1027 let SwapScriptV2::Bitcoin(swap_script) = swap.get_lockup_swap_script()? else {
1028 return Err(PaymentError::Generic {
1029 err: "Unexpected swap script type found".to_string(),
1030 });
1031 };
1032
1033 let script_pk = swap_script
1034 .to_address(self.config.network.as_bitcoin_chain())
1035 .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
1036 .script_pubkey();
1037 let utxos = self
1038 .bitcoin_chain_service
1039 .get_script_utxos(&script_pk)
1040 .await?;
1041
1042 let SdkTransaction::Bitcoin(refund_tx) = self
1043 .swapper
1044 .create_refund_tx(
1045 Swap::Chain(swap.clone()),
1046 refund_address,
1047 utxos,
1048 Some(broadcast_fee_rate_sat_per_vb as f64),
1049 is_cooperative,
1050 )
1051 .await?
1052 else {
1053 return Err(PaymentError::Generic {
1054 err: format!("Unexpected refund tx type returned for incoming Chain swap {id}",),
1055 });
1056 };
1057 let refund_tx_id = self
1058 .bitcoin_chain_service
1059 .broadcast(&refund_tx)
1060 .await?
1061 .to_string();
1062
1063 info!("Successfully broadcast refund for incoming Chain Swap {id}, is_cooperative: {is_cooperative}");
1064
1065 self.update_swap_info(&ChainSwapUpdate {
1069 swap_id: swap.id,
1070 to_state: RefundPending,
1071 refund_tx_id: Some(refund_tx_id.clone()),
1072 ..Default::default()
1073 })?;
1074
1075 Ok(refund_tx_id)
1076 }
1077
1078 pub(crate) async fn refund_outgoing_swap(
1079 &self,
1080 swap: &ChainSwap,
1081 is_cooperative: bool,
1082 ) -> Result<String, PaymentError> {
1083 ensure_sdk!(
1084 swap.refund_tx_id.is_none(),
1085 PaymentError::Generic {
1086 err: format!(
1087 "A refund tx for outgoing Chain Swap {} was already broadcast",
1088 swap.id
1089 )
1090 }
1091 );
1092
1093 info!(
1094 "Initiating refund for outgoing Chain Swap {}, is_cooperative: {is_cooperative}",
1095 swap.id
1096 );
1097
1098 let SwapScriptV2::Liquid(swap_script) = swap.get_lockup_swap_script()? else {
1099 return Err(PaymentError::Generic {
1100 err: "Unexpected swap script type found".to_string(),
1101 });
1102 };
1103
1104 let script_pk = swap_script
1105 .to_address(self.config.network.into())
1106 .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
1107 .to_unconfidential()
1108 .script_pubkey();
1109 let utxos = self
1110 .liquid_chain_service
1111 .get_script_utxos(&script_pk)
1112 .await?;
1113
1114 let refund_address = match swap.refund_address {
1115 Some(ref refund_address) => refund_address.clone(),
1116 None => {
1117 let address = self.onchain_wallet.next_unused_address().await?.to_string();
1119 self.persister
1120 .set_chain_swap_refund_address(&swap.id, &address)?;
1121 address
1122 }
1123 };
1124
1125 let SdkTransaction::Liquid(refund_tx) = self
1126 .swapper
1127 .create_refund_tx(
1128 Swap::Chain(swap.clone()),
1129 &refund_address,
1130 utxos,
1131 None,
1132 is_cooperative,
1133 )
1134 .await?
1135 else {
1136 return Err(PaymentError::Generic {
1137 err: format!(
1138 "Unexpected refund tx type returned for outgoing Chain swap {}",
1139 swap.id
1140 ),
1141 });
1142 };
1143 let refund_tx_id = self
1144 .liquid_chain_service
1145 .broadcast(&refund_tx)
1146 .await?
1147 .to_string();
1148
1149 info!(
1150 "Successfully broadcast refund for outgoing Chain Swap {}, is_cooperative: {is_cooperative}",
1151 swap.id
1152 );
1153
1154 Ok(refund_tx_id)
1155 }
1156
1157 async fn refund_outgoing(&self, height: u32) -> Result<(), PaymentError> {
1158 let pending_swaps: Vec<ChainSwap> = self
1160 .persister
1161 .list_pending_chain_swaps()?
1162 .into_iter()
1163 .filter(|s| s.direction == Direction::Outgoing && s.refund_tx_id.is_none())
1164 .collect();
1165 for swap in pending_swaps {
1166 let swap_script = swap.get_lockup_swap_script()?.as_liquid_script()?;
1167 let locktime_from_height = ElementsLockTime::from_height(height)
1168 .map_err(|e| PaymentError::Generic { err: e.to_string() })?;
1169 info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime);
1170 let has_swap_expired =
1171 utils::is_locktime_expired(locktime_from_height, swap_script.locktime);
1172 if has_swap_expired || swap.state == RefundPending {
1173 let refund_tx_id_res = match swap.state {
1174 Pending => self.refund_outgoing_swap(&swap, false).await,
1175 RefundPending => match has_swap_expired {
1176 true => {
1177 self.refund_outgoing_swap(&swap, true)
1178 .or_else(|e| {
1179 warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}");
1180 self.refund_outgoing_swap(&swap, false)
1181 })
1182 .await
1183 }
1184 false => self.refund_outgoing_swap(&swap, true).await,
1185 },
1186 _ => {
1187 continue;
1188 }
1189 };
1190
1191 if let Ok(refund_tx_id) = refund_tx_id_res {
1192 let update_swap_info_res = self.update_swap_info(&ChainSwapUpdate {
1193 swap_id: swap.id.clone(),
1194 to_state: RefundPending,
1195 refund_tx_id: Some(refund_tx_id),
1196 ..Default::default()
1197 });
1198 if let Err(err) = update_swap_info_res {
1199 warn!(
1200 "Could not update outgoing Chain swap {} information: {err:?}",
1201 swap.id
1202 );
1203 };
1204 }
1205 }
1206 }
1207 Ok(())
1208 }
1209
1210 fn validate_state_transition(
1211 from_state: PaymentState,
1212 to_state: PaymentState,
1213 ) -> Result<(), PaymentError> {
1214 match (from_state, to_state) {
1215 (_, Created) => Err(PaymentError::Generic {
1216 err: "Cannot transition to Created state".to_string(),
1217 }),
1218
1219 (Created | Pending | WaitingFeeAcceptance, Pending) => Ok(()),
1220 (_, Pending) => Err(PaymentError::Generic {
1221 err: format!("Cannot transition from {from_state:?} to Pending state"),
1222 }),
1223
1224 (Created | Pending | WaitingFeeAcceptance, WaitingFeeAcceptance) => Ok(()),
1225 (_, WaitingFeeAcceptance) => Err(PaymentError::Generic {
1226 err: format!("Cannot transition from {from_state:?} to WaitingFeeAcceptance state"),
1227 }),
1228
1229 (Created | Pending | WaitingFeeAcceptance | RefundPending, Complete) => Ok(()),
1230 (_, Complete) => Err(PaymentError::Generic {
1231 err: format!("Cannot transition from {from_state:?} to Complete state"),
1232 }),
1233
1234 (Created, TimedOut) => Ok(()),
1235 (_, TimedOut) => Err(PaymentError::Generic {
1236 err: format!("Cannot transition from {from_state:?} to TimedOut state"),
1237 }),
1238
1239 (
1240 Created | Pending | WaitingFeeAcceptance | RefundPending | Failed | Complete,
1241 Refundable,
1242 ) => Ok(()),
1243 (_, Refundable) => Err(PaymentError::Generic {
1244 err: format!("Cannot transition from {from_state:?} to Refundable state"),
1245 }),
1246
1247 (Pending | WaitingFeeAcceptance | Refundable | RefundPending, RefundPending) => Ok(()),
1248 (_, RefundPending) => Err(PaymentError::Generic {
1249 err: format!("Cannot transition from {from_state:?} to RefundPending state"),
1250 }),
1251
1252 (Complete, Failed) => Err(PaymentError::Generic {
1253 err: format!("Cannot transition from {from_state:?} to Failed state"),
1254 }),
1255
1256 (_, Failed) => Ok(()),
1257 }
1258 }
1259
1260 async fn fetch_incoming_swap_actual_payer_amount(&self, chain_swap: &ChainSwap) -> Result<u64> {
1261 let swap_script = chain_swap.get_lockup_swap_script()?;
1262 let script_pubkey = swap_script
1263 .as_bitcoin_script()?
1264 .to_address(self.config.network.as_bitcoin_chain())
1265 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?
1266 .script_pubkey();
1267
1268 let history = self.fetch_bitcoin_script_history(&swap_script).await?;
1269
1270 let first_tx_id = history
1272 .first()
1273 .ok_or(anyhow!(
1274 "No history found for user lockup script for swap {}",
1275 chain_swap.id
1276 ))?
1277 .txid
1278 .to_raw_hash()
1279 .into();
1280
1281 let txs = self
1283 .bitcoin_chain_service
1284 .get_transactions(&[first_tx_id])
1285 .await?;
1286 let user_lockup_tx = txs.first().ok_or(anyhow!(
1287 "No transactions found for user lockup script for swap {}",
1288 chain_swap.id
1289 ))?;
1290
1291 user_lockup_tx
1293 .output
1294 .iter()
1295 .find(|out| out.script_pubkey == script_pubkey)
1296 .map(|out| out.value.to_sat())
1297 .ok_or(anyhow!("No output found paying to user lockup script"))
1298 }
1299
1300 async fn verify_server_lockup_tx(
1301 &self,
1302 chain_swap: &ChainSwap,
1303 swap_update_tx: &TransactionInfo,
1304 verify_confirmation: bool,
1305 ) -> Result<()> {
1306 match chain_swap.direction {
1307 Direction::Incoming => {
1308 self.verify_incoming_server_lockup_tx(
1309 chain_swap,
1310 swap_update_tx,
1311 verify_confirmation,
1312 )
1313 .await
1314 }
1315 Direction::Outgoing => {
1316 self.verify_outgoing_server_lockup_tx(
1317 chain_swap,
1318 swap_update_tx,
1319 verify_confirmation,
1320 )
1321 .await
1322 }
1323 }
1324 }
1325
1326 async fn verify_incoming_server_lockup_tx(
1327 &self,
1328 chain_swap: &ChainSwap,
1329 swap_update_tx: &TransactionInfo,
1330 verify_confirmation: bool,
1331 ) -> Result<()> {
1332 let swap_script = chain_swap.get_claim_swap_script()?;
1333 let claim_details = chain_swap.get_boltz_create_response()?.claim_details;
1334 let liquid_swap_script = swap_script.as_liquid_script()?;
1336 let address = liquid_swap_script
1337 .to_address(self.config.network.into())
1338 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
1339 let tx_hex = swap_update_tx
1340 .hex
1341 .as_ref()
1342 .ok_or(anyhow!("Transaction info without hex"))?;
1343 let tx = self
1344 .liquid_chain_service
1345 .verify_tx(&address, &swap_update_tx.id, tx_hex, verify_confirmation)
1346 .await?;
1347 let rbf_explicit = tx.input.iter().any(|tx_in| tx_in.sequence.is_rbf());
1349 if !verify_confirmation && rbf_explicit {
1350 bail!("Transaction signals RBF");
1351 }
1352 let secp = Secp256k1::new();
1354 let to_address_output = tx
1355 .output
1356 .iter()
1357 .filter(|tx_out| tx_out.script_pubkey == address.script_pubkey());
1358 let mut value = 0;
1359 for tx_out in to_address_output {
1360 value += tx_out
1361 .unblind(&secp, liquid_swap_script.blinding_key.secret_key())?
1362 .value;
1363 }
1364 match chain_swap.accepted_receiver_amount_sat {
1365 None => {
1366 if value < claim_details.amount {
1367 bail!(
1368 "Transaction value {value} sats is less than {} sats",
1369 claim_details.amount
1370 );
1371 }
1372 }
1373 Some(accepted_receiver_amount_sat) => {
1374 let expected_server_lockup_amount_sat =
1375 accepted_receiver_amount_sat + chain_swap.claim_fees_sat;
1376 if value < expected_server_lockup_amount_sat {
1377 bail!(
1378 "Transaction value {value} sats is less than accepted {} sats",
1379 expected_server_lockup_amount_sat
1380 );
1381 }
1382 }
1383 }
1384
1385 Ok(())
1386 }
1387
1388 async fn verify_outgoing_server_lockup_tx(
1389 &self,
1390 chain_swap: &ChainSwap,
1391 swap_update_tx: &TransactionInfo,
1392 verify_confirmation: bool,
1393 ) -> Result<()> {
1394 let swap_script = chain_swap.get_claim_swap_script()?;
1395 let claim_details = chain_swap.get_boltz_create_response()?.claim_details;
1396 let address = swap_script
1398 .as_bitcoin_script()?
1399 .to_address(self.config.network.as_bitcoin_chain())
1400 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
1401 let tx_hex = swap_update_tx
1402 .hex
1403 .as_ref()
1404 .ok_or(anyhow!("Transaction info without hex"))?;
1405 let tx = self
1406 .bitcoin_chain_service
1407 .verify_tx(&address, &swap_update_tx.id, tx_hex, verify_confirmation)
1408 .await?;
1409 let rbf_explicit = tx.input.iter().any(|input| input.sequence.is_rbf());
1411 if !verify_confirmation && rbf_explicit {
1412 return Err(anyhow!("Transaction signals RBF"));
1413 }
1414 let value: u64 = tx
1416 .output
1417 .iter()
1418 .filter(|tx_out| tx_out.script_pubkey == address.script_pubkey())
1419 .map(|tx_out| tx_out.value.to_sat())
1420 .sum();
1421 if value < claim_details.amount {
1422 return Err(anyhow!(
1423 "Transaction value {value} sats is less than {} sats",
1424 claim_details.amount
1425 ));
1426 }
1427 Ok(())
1428 }
1429
1430 async fn user_lockup_tx_exists(&self, chain_swap: &ChainSwap) -> Result<bool> {
1431 let lockup_script = chain_swap.get_lockup_swap_script()?;
1432 let script_history = self.fetch_script_history(&lockup_script).await?;
1433
1434 match chain_swap.user_lockup_tx_id.clone() {
1435 Some(user_lockup_tx_id) => {
1436 if !script_history.iter().any(|h| h.0 == user_lockup_tx_id) {
1437 return Ok(false);
1438 }
1439 }
1440 None => {
1441 let (txid, _tx_height) = match script_history.into_iter().nth(0) {
1442 Some(h) => h,
1443 None => {
1444 return Ok(false);
1445 }
1446 };
1447 self.update_swap_info(&ChainSwapUpdate {
1448 swap_id: chain_swap.id.clone(),
1449 to_state: Pending,
1450 user_lockup_tx_id: Some(txid.clone()),
1451 ..Default::default()
1452 })?;
1453 }
1454 }
1455
1456 Ok(true)
1457 }
1458
1459 async fn verify_user_lockup_tx(&self, chain_swap: &ChainSwap) -> Result<()> {
1460 if !self.user_lockup_tx_exists(chain_swap).await? {
1461 bail!("User lockup tx not found in script history");
1462 }
1463
1464 if chain_swap.direction == Direction::Incoming {
1466 let actual_payer_amount_sat = match chain_swap.actual_payer_amount_sat {
1467 Some(amount) => amount,
1468 None => {
1469 let actual_payer_amount_sat = self
1470 .fetch_incoming_swap_actual_payer_amount(chain_swap)
1471 .await?;
1472 self.persister
1473 .update_actual_payer_amount(&chain_swap.id, actual_payer_amount_sat)?;
1474 actual_payer_amount_sat
1475 }
1476 };
1477 if chain_swap.payer_amount_sat > 0
1479 && chain_swap.payer_amount_sat != actual_payer_amount_sat
1480 {
1481 bail!("Invalid user lockup tx - user lockup amount ({actual_payer_amount_sat} sat) differs from agreed ({} sat)", chain_swap.payer_amount_sat);
1482 }
1483 }
1484
1485 Ok(())
1486 }
1487
1488 async fn fetch_bitcoin_script_history(
1489 &self,
1490 swap_script: &SwapScriptV2,
1491 ) -> Result<Vec<BtcHistory>> {
1492 let address = swap_script
1493 .as_bitcoin_script()?
1494 .to_address(self.config.network.as_bitcoin_chain())
1495 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
1496 let script_pubkey = address.script_pubkey();
1497 let script = script_pubkey.as_script();
1498 self.bitcoin_chain_service
1499 .get_script_history_with_retry(script, 10)
1500 .await
1501 }
1502
1503 async fn fetch_liquid_script_history(
1504 &self,
1505 swap_script: &SwapScriptV2,
1506 ) -> Result<Vec<LBtcHistory>> {
1507 let address = swap_script
1508 .as_liquid_script()?
1509 .to_address(self.config.network.into())
1510 .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?
1511 .to_unconfidential();
1512 let script = Script::from_hex(hex::encode(address.script_pubkey().as_bytes()).as_str())
1513 .map_err(|e| anyhow!("Failed to get script from address {e:?}"))?;
1514 self.liquid_chain_service
1515 .get_script_history_with_retry(&script, 10)
1516 .await
1517 }
1518}
1519
1520enum ValidateAmountlessSwapResult {
1521 ReadyForAccepting {
1522 user_lockup_amount_sat: u64,
1523 receiver_amount_sat: u64,
1524 },
1525 RequiresUserAction {
1526 user_lockup_amount_sat: u64,
1527 },
1528}
1529
1530async fn maybe_delay_before_claim(is_swap_local: bool) {
1531 if !is_swap_local {
1536 info!("Waiting 5 seconds before claim to reduce likelihood of concurrent claims");
1537 tokio::time::sleep(Duration::from_secs(5)).await;
1538 }
1539}
1540
1541#[cfg(test)]
1542mod tests {
1543 use anyhow::Result;
1544 use std::collections::{HashMap, HashSet};
1545
1546 use crate::{
1547 model::{
1548 ChainSwapUpdate, Direction,
1549 PaymentState::{self, *},
1550 },
1551 test_utils::{
1552 chain_swap::{new_chain_swap, new_chain_swap_handler},
1553 persist::create_persister,
1554 },
1555 };
1556
1557 #[cfg(feature = "browser-tests")]
1558 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
1559
1560 #[sdk_macros::async_test_all]
1561 async fn test_chain_swap_state_transitions() -> Result<()> {
1562 create_persister!(persister);
1563
1564 let chain_swap_handler = new_chain_swap_handler(persister.clone())?;
1565
1566 let all_states = HashSet::from([
1568 Created,
1569 Pending,
1570 WaitingFeeAcceptance,
1571 Complete,
1572 TimedOut,
1573 Failed,
1574 ]);
1575 let valid_combinations = HashMap::from([
1576 (
1577 Created,
1578 HashSet::from([
1579 Pending,
1580 WaitingFeeAcceptance,
1581 Complete,
1582 TimedOut,
1583 Refundable,
1584 Failed,
1585 ]),
1586 ),
1587 (
1588 Pending,
1589 HashSet::from([
1590 Pending,
1591 WaitingFeeAcceptance,
1592 Complete,
1593 Refundable,
1594 RefundPending,
1595 Failed,
1596 ]),
1597 ),
1598 (
1599 WaitingFeeAcceptance,
1600 HashSet::from([
1601 Pending,
1602 WaitingFeeAcceptance,
1603 Complete,
1604 Refundable,
1605 RefundPending,
1606 Failed,
1607 ]),
1608 ),
1609 (TimedOut, HashSet::from([Failed])),
1610 (Complete, HashSet::from([Refundable])),
1611 (Refundable, HashSet::from([RefundPending, Failed])),
1612 (
1613 RefundPending,
1614 HashSet::from([Refundable, Complete, Failed, RefundPending]),
1615 ),
1616 (Failed, HashSet::from([Failed, Refundable])),
1617 ]);
1618
1619 for (first_state, allowed_states) in valid_combinations.iter() {
1620 for allowed_state in allowed_states {
1621 let chain_swap = new_chain_swap(
1622 Direction::Incoming,
1623 Some(*first_state),
1624 false,
1625 None,
1626 false,
1627 false,
1628 None,
1629 );
1630 persister.insert_or_update_chain_swap(&chain_swap)?;
1631
1632 assert!(chain_swap_handler
1633 .update_swap_info(&ChainSwapUpdate {
1634 swap_id: chain_swap.id,
1635 to_state: *allowed_state,
1636 ..Default::default()
1637 })
1638 .is_ok());
1639 }
1640 }
1641
1642 let invalid_combinations: HashMap<PaymentState, HashSet<PaymentState>> = valid_combinations
1644 .iter()
1645 .map(|(first_state, allowed_states)| {
1646 (
1647 *first_state,
1648 all_states.difference(allowed_states).cloned().collect(),
1649 )
1650 })
1651 .collect();
1652
1653 for (first_state, disallowed_states) in invalid_combinations.iter() {
1654 for disallowed_state in disallowed_states {
1655 let chain_swap = new_chain_swap(
1656 Direction::Incoming,
1657 Some(*first_state),
1658 false,
1659 None,
1660 false,
1661 false,
1662 None,
1663 );
1664 persister.insert_or_update_chain_swap(&chain_swap)?;
1665
1666 assert!(chain_swap_handler
1667 .update_swap_info(&ChainSwapUpdate {
1668 swap_id: chain_swap.id,
1669 to_state: *disallowed_state,
1670 ..Default::default()
1671 })
1672 .is_err());
1673 }
1674 }
1675
1676 Ok(())
1677 }
1678}