breez_sdk_spark/sdk/
payments.rs

1use bitcoin::hashes::sha256;
2use spark_wallet::{ExitSpeed, SparkAddress, TransferId, TransferTokenOutput};
3use std::str::FromStr;
4use tokio::select;
5use tokio::sync::mpsc;
6use tokio::time::timeout;
7use tracing::{Instrument, error, info, warn};
8use web_time::Duration;
9
10use crate::{
11    BitcoinAddressDetails, Bolt11InvoiceDetails, ClaimHtlcPaymentRequest, ClaimHtlcPaymentResponse,
12    ConversionEstimate, ConversionOptions, ConversionPurpose, ConversionType, FeePolicy,
13    FetchConversionLimitsRequest, FetchConversionLimitsResponse, GetPaymentRequest,
14    GetPaymentResponse, InputType, OnchainConfirmationSpeed, PaymentStatus, SendOnchainFeeQuote,
15    SendPaymentMethod, SendPaymentOptions, SparkHtlcOptions, SparkInvoiceDetails,
16    WaitForPaymentIdentifier,
17    error::SdkError,
18    events::SdkEvent,
19    models::{
20        ListPaymentsRequest, ListPaymentsResponse, Payment, PaymentDetails,
21        PrepareSendPaymentRequest, PrepareSendPaymentResponse, ReceivePaymentMethod,
22        ReceivePaymentRequest, ReceivePaymentResponse, SendPaymentRequest, SendPaymentResponse,
23    },
24    persist::PaymentMetadata,
25    token_conversion::{
26        ConversionAmount, DEFAULT_CONVERSION_TIMEOUT_SECS, TokenConversionResponse,
27    },
28    utils::{
29        payments::{get_payment_and_emit_event, get_payment_with_conversion_details},
30        send_payment_validation::{get_dust_limit_sats, validate_prepare_send_payment_request},
31        token::map_and_persist_token_transaction,
32    },
33};
34use bitcoin::secp256k1::PublicKey;
35use platform_utils::tokio;
36use spark_wallet::{InvoiceDescription, Preimage};
37use web_time::SystemTime;
38
39use super::{
40    BreezSdk, SyncType,
41    helpers::{InternalEventListener, get_or_create_deposit_address, is_payment_match},
42};
43
44#[cfg_attr(feature = "uniffi", uniffi::export(async_runtime = "tokio"))]
45#[allow(clippy::needless_pass_by_value)]
46impl BreezSdk {
47    pub async fn receive_payment(
48        &self,
49        request: ReceivePaymentRequest,
50    ) -> Result<ReceivePaymentResponse, SdkError> {
51        self.ensure_spark_private_mode_initialized().await?;
52        match request.payment_method {
53            ReceivePaymentMethod::SparkAddress => Ok(ReceivePaymentResponse {
54                fee: 0,
55                payment_request: self
56                    .spark_wallet
57                    .get_spark_address()?
58                    .to_address_string()
59                    .map_err(|e| {
60                        SdkError::Generic(format!("Failed to convert Spark address to string: {e}"))
61                    })?,
62            }),
63            ReceivePaymentMethod::SparkInvoice {
64                amount,
65                token_identifier,
66                expiry_time,
67                description,
68                sender_public_key,
69            } => {
70                let invoice = self
71                    .spark_wallet
72                    .create_spark_invoice(
73                        amount,
74                        token_identifier.clone(),
75                        expiry_time
76                            .map(|time| {
77                                SystemTime::UNIX_EPOCH
78                                    .checked_add(Duration::from_secs(time))
79                                    .ok_or(SdkError::Generic("Invalid expiry time".to_string()))
80                            })
81                            .transpose()?,
82                        description,
83                        sender_public_key.map(|key| PublicKey::from_str(&key).unwrap()),
84                    )
85                    .await?;
86                Ok(ReceivePaymentResponse {
87                    fee: 0,
88                    payment_request: invoice,
89                })
90            }
91            ReceivePaymentMethod::BitcoinAddress => {
92                let address =
93                    get_or_create_deposit_address(&self.spark_wallet, self.storage.clone()).await?;
94                Ok(ReceivePaymentResponse {
95                    payment_request: address,
96                    fee: 0,
97                })
98            }
99            ReceivePaymentMethod::Bolt11Invoice {
100                description,
101                amount_sats,
102                expiry_secs,
103                payment_hash,
104            } => {
105                self.receive_bolt11_invoice(description, amount_sats, expiry_secs, payment_hash)
106                    .await
107            }
108        }
109    }
110
111    pub async fn claim_htlc_payment(
112        &self,
113        request: ClaimHtlcPaymentRequest,
114    ) -> Result<ClaimHtlcPaymentResponse, SdkError> {
115        let preimage = Preimage::from_hex(&request.preimage)
116            .map_err(|_| SdkError::InvalidInput("Invalid preimage".to_string()))?;
117        let payment_hash = preimage.compute_hash();
118
119        // Check if there is a claimable HTLC with the given payment hash
120        let claimable_htlc_transfers = self
121            .spark_wallet
122            .list_claimable_htlc_transfers(None)
123            .await?;
124        if !claimable_htlc_transfers
125            .iter()
126            .filter_map(|t| t.htlc_preimage_request.as_ref())
127            .any(|p| p.payment_hash == payment_hash)
128        {
129            return Err(SdkError::InvalidInput(
130                "No claimable HTLC with the given payment hash".to_string(),
131            ));
132        }
133
134        let transfer = self.spark_wallet.claim_htlc(&preimage).await?;
135        let payment: Payment = transfer.try_into()?;
136
137        // Insert the payment into storage to make it immediately available for listing
138        self.storage.insert_payment(payment.clone()).await?;
139
140        Ok(ClaimHtlcPaymentResponse { payment })
141    }
142
143    #[allow(clippy::too_many_lines)]
144    pub async fn prepare_send_payment(
145        &self,
146        request: PrepareSendPaymentRequest,
147    ) -> Result<PrepareSendPaymentResponse, SdkError> {
148        let parsed_input = self.parse(&request.payment_request).await?;
149
150        validate_prepare_send_payment_request(
151            &parsed_input,
152            &request,
153            &self.spark_wallet.get_identity_public_key().to_string(),
154        )?;
155
156        let fee_policy = request.fee_policy.unwrap_or_default();
157        let token_identifier = request.token_identifier.clone();
158
159        match &parsed_input {
160            InputType::SparkAddress(spark_address_details) => {
161                let amount = request
162                    .amount
163                    .ok_or(SdkError::InvalidInput("Amount is required".to_string()))?;
164
165                // FeesIncluded doesn't support conversion (validated earlier)
166                let conversion_estimate = if fee_policy == FeePolicy::FeesIncluded {
167                    None
168                } else {
169                    let conversion_options = self
170                        .get_conversion_options_for_payment(
171                            request.conversion_options.as_ref(),
172                            token_identifier.as_ref(),
173                            amount,
174                        )
175                        .await?;
176                    self.token_converter
177                        .validate(
178                            conversion_options.as_ref(),
179                            token_identifier.as_ref(),
180                            amount,
181                        )
182                        .await?
183                };
184
185                Ok(PrepareSendPaymentResponse {
186                    payment_method: SendPaymentMethod::SparkAddress {
187                        address: spark_address_details.address.clone(),
188                        fee: 0,
189                        token_identifier: token_identifier.clone(),
190                    },
191                    amount,
192                    token_identifier,
193                    conversion_estimate,
194                    fee_policy,
195                })
196            }
197            InputType::SparkInvoice(spark_invoice_details) => {
198                // Use request's token_identifier if provided, otherwise fall back to invoice's
199                let effective_token_identifier =
200                    token_identifier.or_else(|| spark_invoice_details.token_identifier.clone());
201
202                let amount = spark_invoice_details
203                    .amount
204                    .or(request.amount)
205                    .ok_or(SdkError::InvalidInput("Amount is required".to_string()))?;
206
207                // FeesIncluded doesn't support conversion (validated earlier)
208                let conversion_estimate = if fee_policy == FeePolicy::FeesIncluded {
209                    None
210                } else {
211                    let conversion_options = self
212                        .get_conversion_options_for_payment(
213                            request.conversion_options.as_ref(),
214                            effective_token_identifier.as_ref(),
215                            amount,
216                        )
217                        .await?;
218                    self.token_converter
219                        .validate(
220                            conversion_options.as_ref(),
221                            effective_token_identifier.as_ref(),
222                            amount,
223                        )
224                        .await?
225                };
226
227                Ok(PrepareSendPaymentResponse {
228                    payment_method: SendPaymentMethod::SparkInvoice {
229                        spark_invoice_details: spark_invoice_details.clone(),
230                        fee: 0,
231                        token_identifier: effective_token_identifier.clone(),
232                    },
233                    amount,
234                    token_identifier: effective_token_identifier,
235                    conversion_estimate,
236                    fee_policy,
237                })
238            }
239            InputType::Bolt11Invoice(detailed_bolt11_invoice) => {
240                let spark_address: Option<SparkAddress> = self
241                    .spark_wallet
242                    .extract_spark_address(&request.payment_request)?;
243
244                let spark_transfer_fee_sats = if spark_address.is_some() {
245                    Some(0)
246                } else {
247                    None
248                };
249
250                let amount = request
251                    .amount
252                    .or(detailed_bolt11_invoice
253                        .amount_msat
254                        .map(|msat| u128::from(msat).saturating_div(1000)))
255                    .ok_or(SdkError::InvalidInput("Amount is required".to_string()))?;
256
257                // For FeesIncluded, estimate fee for user's full amount
258                let lightning_fee_sats = self
259                    .spark_wallet
260                    .fetch_lightning_send_fee_estimate(
261                        &request.payment_request,
262                        Some(amount.try_into()?),
263                    )
264                    .await?;
265
266                // Validate receiver amount is positive for FeesIncluded
267                if fee_policy == FeePolicy::FeesIncluded
268                    && detailed_bolt11_invoice.amount_msat.is_none()
269                {
270                    let amount_u64: u64 = amount.try_into()?;
271                    if amount_u64 <= lightning_fee_sats {
272                        return Err(SdkError::InvalidInput(
273                            "Amount too small to cover fees".to_string(),
274                        ));
275                    }
276                }
277
278                // FeesIncluded doesn't support conversion (validated earlier)
279                let conversion_estimate = if fee_policy == FeePolicy::FeesIncluded {
280                    None
281                } else {
282                    let total_amount = amount.saturating_add(u128::from(lightning_fee_sats));
283                    let conversion_options = self
284                        .get_conversion_options_for_payment(
285                            request.conversion_options.as_ref(),
286                            token_identifier.as_ref(),
287                            total_amount,
288                        )
289                        .await?;
290                    self.token_converter
291                        .validate(
292                            conversion_options.as_ref(),
293                            token_identifier.as_ref(),
294                            total_amount,
295                        )
296                        .await?
297                };
298
299                Ok(PrepareSendPaymentResponse {
300                    payment_method: SendPaymentMethod::Bolt11Invoice {
301                        invoice_details: detailed_bolt11_invoice.clone(),
302                        spark_transfer_fee_sats,
303                        lightning_fee_sats,
304                    },
305                    amount,
306                    token_identifier,
307                    conversion_estimate,
308                    fee_policy,
309                })
310            }
311            InputType::BitcoinAddress(withdrawal_address) => {
312                let amount = request
313                    .amount
314                    .ok_or(SdkError::InvalidInput("Amount is required".to_string()))?;
315
316                // Validate the amount meets the dust limit before making any network calls.
317                // For FeesIncluded the output will be smaller after fees, but if the total
318                // amount is already below dust there's no point fetching a fee quote.
319                let dust_limit_sats = get_dust_limit_sats(&withdrawal_address.address)?;
320                let amount_u64: u64 = amount.try_into()?;
321                if amount_u64 < dust_limit_sats {
322                    return Err(SdkError::InvalidInput(format!(
323                        "Amount is below the minimum of {dust_limit_sats} sats required for this address"
324                    )));
325                }
326
327                let fee_quote: SendOnchainFeeQuote = self
328                    .spark_wallet
329                    .fetch_coop_exit_fee_quote(
330                        &withdrawal_address.address,
331                        Some(amount.try_into()?),
332                    )
333                    .await?
334                    .into();
335
336                // For FeesIncluded, validate the output after fees using the best case
337                // (slow/lowest fee). Only reject if even the cheapest option results in dust.
338                if fee_policy == FeePolicy::FeesIncluded {
339                    let min_fee_sats = fee_quote.speed_slow.total_fee_sat();
340                    let output_amount_sats = amount_u64.saturating_sub(min_fee_sats);
341                    if output_amount_sats < dust_limit_sats {
342                        return Err(SdkError::InvalidInput(format!(
343                            "Amount is below the minimum of {dust_limit_sats} sats required for this address after lowest fees of {min_fee_sats} sats"
344                        )));
345                    }
346                }
347
348                // FeesIncluded doesn't support conversion (validated earlier)
349                let conversion_estimate = if fee_policy == FeePolicy::FeesIncluded {
350                    None
351                } else {
352                    // For conversion estimate, use fast fee as worst case
353                    let total_amount =
354                        amount.saturating_add(u128::from(fee_quote.speed_fast.total_fee_sat()));
355                    let conversion_options = self
356                        .get_conversion_options_for_payment(
357                            request.conversion_options.as_ref(),
358                            token_identifier.as_ref(),
359                            total_amount,
360                        )
361                        .await?;
362                    self.token_converter
363                        .validate(
364                            conversion_options.as_ref(),
365                            token_identifier.as_ref(),
366                            total_amount,
367                        )
368                        .await?
369                };
370
371                Ok(PrepareSendPaymentResponse {
372                    payment_method: SendPaymentMethod::BitcoinAddress {
373                        address: withdrawal_address.clone(),
374                        fee_quote,
375                    },
376                    amount,
377                    token_identifier,
378                    conversion_estimate,
379                    fee_policy,
380                })
381            }
382            _ => Err(SdkError::InvalidInput(
383                "Unsupported payment method".to_string(),
384            )),
385        }
386    }
387
388    pub async fn send_payment(
389        &self,
390        request: SendPaymentRequest,
391    ) -> Result<SendPaymentResponse, SdkError> {
392        self.ensure_spark_private_mode_initialized().await?;
393        Box::pin(self.maybe_convert_token_send_payment(request, false, None)).await
394    }
395
396    pub async fn fetch_conversion_limits(
397        &self,
398        request: FetchConversionLimitsRequest,
399    ) -> Result<FetchConversionLimitsResponse, SdkError> {
400        self.token_converter
401            .fetch_limits(&request)
402            .await
403            .map_err(Into::into)
404    }
405
406    /// Lists payments from the storage with pagination
407    ///
408    /// This method provides direct access to the payment history stored in the database.
409    /// It returns payments in reverse chronological order (newest first).
410    ///
411    /// # Arguments
412    ///
413    /// * `request` - Contains pagination parameters (offset and limit)
414    ///
415    /// # Returns
416    ///
417    /// * `Ok(ListPaymentsResponse)` - Contains the list of payments if successful
418    /// * `Err(SdkError)` - If there was an error accessing the storage
419    pub async fn list_payments(
420        &self,
421        request: ListPaymentsRequest,
422    ) -> Result<ListPaymentsResponse, SdkError> {
423        let mut payments = self.storage.list_payments(request.into()).await?;
424
425        // Collect all parent IDs and batch query for related payments
426        let parent_ids: Vec<String> = payments.iter().map(|p| p.id.clone()).collect();
427
428        if !parent_ids.is_empty() {
429            let related_payments_map = self.storage.get_payments_by_parent_ids(parent_ids).await?;
430
431            // Add conversion details of each payments
432            for payment in &mut payments {
433                if let Some(related_payments) = related_payments_map.get(&payment.id) {
434                    match related_payments.try_into() {
435                        Ok(conversion_details) => {
436                            payment.conversion_details = Some(conversion_details);
437                        }
438                        Err(e) => {
439                            warn!("Found payments couldn't be converted to ConversionDetails: {e}");
440                        }
441                    }
442                }
443            }
444        }
445
446        Ok(ListPaymentsResponse { payments })
447    }
448
449    pub async fn get_payment(
450        &self,
451        request: GetPaymentRequest,
452    ) -> Result<GetPaymentResponse, SdkError> {
453        let payment =
454            get_payment_with_conversion_details(request.payment_id, self.storage.clone()).await?;
455
456        Ok(GetPaymentResponse { payment })
457    }
458}
459
460// Private payment methods
461impl BreezSdk {
462    async fn receive_bolt11_invoice(
463        &self,
464        description: String,
465        amount_sats: Option<u64>,
466        expiry_secs: Option<u32>,
467        payment_hash: Option<String>,
468    ) -> Result<ReceivePaymentResponse, SdkError> {
469        let invoice = if let Some(payment_hash_hex) = payment_hash {
470            let hash = sha256::Hash::from_str(&payment_hash_hex)
471                .map_err(|e| SdkError::InvalidInput(format!("Invalid payment hash: {e}")))?;
472            self.spark_wallet
473                .create_hodl_lightning_invoice(
474                    amount_sats.unwrap_or_default(),
475                    Some(InvoiceDescription::Memo(description.clone())),
476                    hash,
477                    None,
478                    expiry_secs,
479                )
480                .await?
481                .invoice
482        } else {
483            self.spark_wallet
484                .create_lightning_invoice(
485                    amount_sats.unwrap_or_default(),
486                    Some(InvoiceDescription::Memo(description.clone())),
487                    None,
488                    expiry_secs,
489                    self.config.prefer_spark_over_lightning,
490                )
491                .await?
492                .invoice
493        };
494        Ok(ReceivePaymentResponse {
495            payment_request: invoice,
496            fee: 0,
497        })
498    }
499
500    pub(super) async fn maybe_convert_token_send_payment(
501        &self,
502        request: SendPaymentRequest,
503        mut suppress_payment_event: bool,
504        amount_override: Option<u64>,
505    ) -> Result<SendPaymentResponse, SdkError> {
506        let token_identifier = request.prepare_response.token_identifier.clone();
507
508        // Check the idempotency key is valid and payment doesn't already exist
509        if request.idempotency_key.is_some() && token_identifier.is_some() {
510            return Err(SdkError::InvalidInput(
511                "Idempotency key is not supported for token payments".to_string(),
512            ));
513        }
514        if let Some(idempotency_key) = &request.idempotency_key {
515            // If an idempotency key is provided, check if a payment with that id already exists
516            if let Ok(payment) = self
517                .storage
518                .get_payment_by_id(idempotency_key.clone())
519                .await
520            {
521                return Ok(SendPaymentResponse { payment });
522            }
523        }
524        // Perform the send payment, with conversion if requested
525        let res = if let Some(ConversionEstimate {
526            options: conversion_options,
527            ..
528        }) = &request.prepare_response.conversion_estimate
529        {
530            Box::pin(self.convert_token_send_payment_internal(
531                conversion_options,
532                &request,
533                &mut suppress_payment_event,
534            ))
535            .await
536        } else {
537            Box::pin(self.send_payment_internal(&request, amount_override)).await
538        };
539        // Emit payment status event and trigger wallet state sync
540        if let Ok(response) = &res {
541            if !suppress_payment_event {
542                // Emit the payment with metadata already included
543                self.event_emitter
544                    .emit(&SdkEvent::from_payment(response.payment.clone()))
545                    .await;
546            }
547            self.sync_coordinator
548                .trigger_sync_no_wait(SyncType::WalletState, true)
549                .await;
550        }
551        res
552    }
553
554    #[allow(clippy::too_many_lines)]
555    async fn convert_token_send_payment_internal(
556        &self,
557        conversion_options: &ConversionOptions,
558        request: &SendPaymentRequest,
559        suppress_payment_event: &mut bool,
560    ) -> Result<SendPaymentResponse, SdkError> {
561        // FeesIncluded not supported with token conversion (validated earlier)
562        if request.prepare_response.fee_policy == FeePolicy::FeesIncluded {
563            return Err(SdkError::InvalidInput(
564                "FeesIncluded not supported with token conversion".to_string(),
565            ));
566        }
567
568        // Prevent auto-convert from running while this payment is in progress.
569        let _lock_guard = match (
570            &request.prepare_response.token_identifier,
571            &self.stable_balance,
572        ) {
573            (None, Some(sb)) => Some(sb.create_payment_lock_guard()),
574            _ => None,
575        };
576
577        let amount = request.prepare_response.amount;
578        let token_identifier = request.prepare_response.token_identifier.clone();
579
580        // Perform a conversion before sending the payment
581        let (conversion_response, conversion_purpose) =
582            match &request.prepare_response.payment_method {
583                SendPaymentMethod::SparkAddress { address, .. } => {
584                    let spark_address = address
585                        .parse::<SparkAddress>()
586                        .map_err(|_| SdkError::InvalidInput("Invalid spark address".to_string()))?;
587                    let conversion_purpose = if spark_address.identity_public_key
588                        == self.spark_wallet.get_identity_public_key()
589                    {
590                        ConversionPurpose::SelfTransfer
591                    } else {
592                        ConversionPurpose::OngoingPayment {
593                            payment_request: address.clone(),
594                        }
595                    };
596                    let conversion_response = self
597                        .token_converter
598                        .convert(
599                            conversion_options,
600                            &conversion_purpose,
601                            token_identifier.as_ref(),
602                            ConversionAmount::MinAmountOut(amount),
603                        )
604                        .await?;
605                    (conversion_response, conversion_purpose)
606                }
607                SendPaymentMethod::SparkInvoice {
608                    spark_invoice_details:
609                        SparkInvoiceDetails {
610                            identity_public_key,
611                            invoice,
612                            ..
613                        },
614                    ..
615                } => {
616                    let own_identity_public_key =
617                        self.spark_wallet.get_identity_public_key().to_string();
618                    let conversion_purpose = if identity_public_key == &own_identity_public_key {
619                        ConversionPurpose::SelfTransfer
620                    } else {
621                        ConversionPurpose::OngoingPayment {
622                            payment_request: invoice.clone(),
623                        }
624                    };
625                    let conversion_response = self
626                        .token_converter
627                        .convert(
628                            conversion_options,
629                            &conversion_purpose,
630                            token_identifier.as_ref(),
631                            ConversionAmount::MinAmountOut(amount),
632                        )
633                        .await?;
634                    (conversion_response, conversion_purpose)
635                }
636                SendPaymentMethod::Bolt11Invoice {
637                    spark_transfer_fee_sats,
638                    lightning_fee_sats,
639                    invoice_details,
640                    ..
641                } => {
642                    let conversion_purpose = ConversionPurpose::OngoingPayment {
643                        payment_request: invoice_details.invoice.bolt11.clone(),
644                    };
645                    let conversion_response = self
646                        .convert_token_for_bolt11_invoice(
647                            conversion_options,
648                            *spark_transfer_fee_sats,
649                            *lightning_fee_sats,
650                            request,
651                            &conversion_purpose,
652                            amount,
653                            token_identifier.as_ref(),
654                        )
655                        .await?;
656                    (conversion_response, conversion_purpose)
657                }
658                SendPaymentMethod::BitcoinAddress { address, fee_quote } => {
659                    let conversion_purpose = ConversionPurpose::OngoingPayment {
660                        payment_request: address.address.clone(),
661                    };
662                    let conversion_response = self
663                        .convert_token_for_bitcoin_address(
664                            conversion_options,
665                            fee_quote,
666                            request,
667                            &conversion_purpose,
668                            amount,
669                            token_identifier.as_ref(),
670                        )
671                        .await?;
672                    (conversion_response, conversion_purpose)
673                }
674            };
675        // Trigger a wallet state sync if converting from Bitcoin to token
676        if matches!(
677            conversion_options.conversion_type,
678            ConversionType::FromBitcoin
679        ) {
680            self.sync_coordinator
681                .trigger_sync_no_wait(SyncType::WalletState, true)
682                .await;
683        }
684        // Wait for the received conversion payment to complete
685        let payment = self
686            .wait_for_payment(
687                WaitForPaymentIdentifier::PaymentId(
688                    conversion_response.received_payment_id.clone(),
689                ),
690                conversion_options
691                    .completion_timeout_secs
692                    .unwrap_or(DEFAULT_CONVERSION_TIMEOUT_SECS),
693            )
694            .await
695            .map_err(|e| {
696                SdkError::Generic(format!("Timeout waiting for conversion to complete: {e}"))
697            })?;
698        // For self-payments, we can skip sending the actual payment
699        if conversion_purpose == ConversionPurpose::SelfTransfer {
700            *suppress_payment_event = true;
701            return Ok(SendPaymentResponse { payment });
702        }
703        // Now send the actual payment
704        let response = Box::pin(self.send_payment_internal(request, None)).await?;
705        // Set payment metadata to link the payments
706        self.storage
707            .insert_payment_metadata(
708                conversion_response.sent_payment_id,
709                PaymentMetadata {
710                    parent_payment_id: Some(response.payment.id.clone()),
711                    ..Default::default()
712                },
713            )
714            .await?;
715        self.storage
716            .insert_payment_metadata(
717                conversion_response.received_payment_id,
718                PaymentMetadata {
719                    parent_payment_id: Some(response.payment.id.clone()),
720                    ..Default::default()
721                },
722            )
723            .await?;
724        // Fetch the updated payment with conversion details
725        get_payment_with_conversion_details(response.payment.id, self.storage.clone())
726            .await
727            .map(|payment| SendPaymentResponse { payment })
728        // _lock_guard drops here, releasing the distributed lock if no other payments are in-flight
729    }
730
731    pub(super) async fn send_payment_internal(
732        &self,
733        request: &SendPaymentRequest,
734        amount_override: Option<u64>,
735    ) -> Result<SendPaymentResponse, SdkError> {
736        let amount = request.prepare_response.amount;
737        let token_identifier = request.prepare_response.token_identifier.clone();
738
739        match &request.prepare_response.payment_method {
740            SendPaymentMethod::SparkAddress { address, .. } => {
741                self.send_spark_address(
742                    address,
743                    token_identifier,
744                    amount,
745                    request.options.as_ref(),
746                    request.idempotency_key.clone(),
747                )
748                .await
749            }
750            SendPaymentMethod::SparkInvoice {
751                spark_invoice_details,
752                ..
753            } => {
754                self.send_spark_invoice(&spark_invoice_details.invoice, request, amount)
755                    .await
756            }
757            SendPaymentMethod::Bolt11Invoice {
758                invoice_details,
759                spark_transfer_fee_sats,
760                lightning_fee_sats,
761                ..
762            } => {
763                Box::pin(self.send_bolt11_invoice(
764                    invoice_details,
765                    *spark_transfer_fee_sats,
766                    *lightning_fee_sats,
767                    request,
768                    amount_override,
769                    amount,
770                ))
771                .await
772            }
773            SendPaymentMethod::BitcoinAddress { address, fee_quote } => {
774                self.send_bitcoin_address(address, fee_quote, request).await
775            }
776        }
777    }
778
779    async fn send_spark_address(
780        &self,
781        address: &str,
782        token_identifier: Option<String>,
783        amount: u128,
784        options: Option<&SendPaymentOptions>,
785        idempotency_key: Option<String>,
786    ) -> Result<SendPaymentResponse, SdkError> {
787        let spark_address = address
788            .parse::<SparkAddress>()
789            .map_err(|_| SdkError::InvalidInput("Invalid spark address".to_string()))?;
790
791        // If HTLC options are provided, send an HTLC transfer
792        if let Some(SendPaymentOptions::SparkAddress { htlc_options }) = options
793            && let Some(htlc_options) = htlc_options
794        {
795            if token_identifier.is_some() {
796                return Err(SdkError::InvalidInput(
797                    "Can't provide both token identifier and HTLC options".to_string(),
798                ));
799            }
800
801            return self
802                .send_spark_htlc(
803                    &spark_address,
804                    amount.try_into()?,
805                    htlc_options,
806                    idempotency_key,
807                )
808                .await;
809        }
810
811        let payment = if let Some(identifier) = token_identifier {
812            self.send_spark_token_address(identifier, amount, spark_address)
813                .await?
814        } else {
815            let transfer_id = idempotency_key
816                .as_ref()
817                .map(|key| TransferId::from_str(key))
818                .transpose()?;
819            let transfer = self
820                .spark_wallet
821                .transfer(amount.try_into()?, &spark_address, transfer_id)
822                .await?;
823            transfer.try_into()?
824        };
825
826        // Insert the payment into storage to make it immediately available for listing
827        self.storage.insert_payment(payment.clone()).await?;
828
829        Ok(SendPaymentResponse { payment })
830    }
831
832    async fn send_spark_htlc(
833        &self,
834        address: &SparkAddress,
835        amount_sat: u64,
836        htlc_options: &SparkHtlcOptions,
837        idempotency_key: Option<String>,
838    ) -> Result<SendPaymentResponse, SdkError> {
839        let payment_hash = sha256::Hash::from_str(&htlc_options.payment_hash)
840            .map_err(|_| SdkError::InvalidInput("Invalid payment hash".to_string()))?;
841
842        if htlc_options.expiry_duration_secs == 0 {
843            return Err(SdkError::InvalidInput(
844                "Expiry duration must be greater than 0".to_string(),
845            ));
846        }
847        let expiry_duration = Duration::from_secs(htlc_options.expiry_duration_secs);
848
849        let transfer_id = idempotency_key
850            .as_ref()
851            .map(|key| TransferId::from_str(key))
852            .transpose()?;
853        let transfer = self
854            .spark_wallet
855            .create_htlc(
856                amount_sat,
857                address,
858                &payment_hash,
859                expiry_duration,
860                transfer_id,
861            )
862            .await?;
863
864        let payment: Payment = transfer.try_into()?;
865
866        // Insert the payment into storage to make it immediately available for listing
867        self.storage.insert_payment(payment.clone()).await?;
868
869        Ok(SendPaymentResponse { payment })
870    }
871
872    async fn send_spark_token_address(
873        &self,
874        token_identifier: String,
875        amount: u128,
876        receiver_address: SparkAddress,
877    ) -> Result<Payment, SdkError> {
878        let token_transaction = self
879            .spark_wallet
880            .transfer_tokens(
881                vec![TransferTokenOutput {
882                    token_id: token_identifier,
883                    amount,
884                    receiver_address: receiver_address.clone(),
885                    spark_invoice: None,
886                }],
887                None,
888                None,
889            )
890            .await?;
891
892        map_and_persist_token_transaction(&self.spark_wallet, &self.storage, &token_transaction)
893            .await
894    }
895
896    async fn send_spark_invoice(
897        &self,
898        invoice: &str,
899        request: &SendPaymentRequest,
900        amount: u128,
901    ) -> Result<SendPaymentResponse, SdkError> {
902        let transfer_id = request
903            .idempotency_key
904            .as_ref()
905            .map(|key| TransferId::from_str(key))
906            .transpose()?;
907
908        let payment = match self
909            .spark_wallet
910            .fulfill_spark_invoice(invoice, Some(amount), transfer_id)
911            .await?
912        {
913            spark_wallet::FulfillSparkInvoiceResult::Transfer(wallet_transfer) => {
914                (*wallet_transfer).try_into()?
915            }
916            spark_wallet::FulfillSparkInvoiceResult::TokenTransaction(token_transaction) => {
917                map_and_persist_token_transaction(
918                    &self.spark_wallet,
919                    &self.storage,
920                    &token_transaction,
921                )
922                .await?
923            }
924        };
925
926        // Insert the payment into storage to make it immediately available for listing
927        self.storage.insert_payment(payment.clone()).await?;
928
929        Ok(SendPaymentResponse { payment })
930    }
931
932    /// For `FeesIncluded` + amountless Bolt11: calculates the amount to send
933    /// (`receiver_amount` + any overpayment from fee decrease).
934    async fn calculate_fees_included_bolt11_amount(
935        &self,
936        invoice: &str,
937        user_amount: u64,
938        stored_fee: u64,
939    ) -> Result<u64, SdkError> {
940        let receiver_amount = user_amount.saturating_sub(stored_fee);
941        if receiver_amount == 0 {
942            return Err(SdkError::InvalidInput(
943                "Amount too small to cover fees".to_string(),
944            ));
945        }
946
947        // Re-estimate current fee for receiver amount
948        let current_fee = self
949            .spark_wallet
950            .fetch_lightning_send_fee_estimate(invoice, Some(receiver_amount))
951            .await?;
952
953        // If current fee exceeds stored fee, fail
954        if current_fee > stored_fee {
955            return Err(SdkError::Generic(
956                "Fee increased since prepare. Please retry.".to_string(),
957            ));
958        }
959
960        // Calculate overpayment
961        let overpayment = stored_fee.saturating_sub(current_fee);
962
963        // Protect against excessive fee overpayment.
964        // Allow overpayment up to 100% of actual fee, with a minimum of 1 sat.
965        let max_allowed_overpayment = current_fee.max(1);
966        if overpayment > max_allowed_overpayment {
967            return Err(SdkError::Generic(format!(
968                "Fee overpayment ({overpayment} sats) exceeds allowed maximum ({max_allowed_overpayment} sats)"
969            )));
970        }
971
972        if overpayment > 0 {
973            info!(
974                overpayment_sats = overpayment,
975                stored_fee_sats = stored_fee,
976                current_fee_sats = current_fee,
977                "FeesIncluded fee overpayment applied for Bolt11"
978            );
979        }
980
981        Ok(receiver_amount.saturating_add(overpayment))
982    }
983
984    async fn send_bolt11_invoice(
985        &self,
986        invoice_details: &Bolt11InvoiceDetails,
987        spark_transfer_fee_sats: Option<u64>,
988        lightning_fee_sats: u64,
989        request: &SendPaymentRequest,
990        amount_override: Option<u64>,
991        amount: u128,
992    ) -> Result<SendPaymentResponse, SdkError> {
993        // Handle FeesIncluded for amountless Bolt11 invoices
994        let amount_to_send = if request.prepare_response.fee_policy == FeePolicy::FeesIncluded
995            && invoice_details.amount_msat.is_none()
996            && amount_override.is_none()
997        {
998            let amt = self
999                .calculate_fees_included_bolt11_amount(
1000                    &invoice_details.invoice.bolt11,
1001                    amount.try_into()?,
1002                    lightning_fee_sats,
1003                )
1004                .await?;
1005            Some(u128::from(amt))
1006        } else {
1007            match amount_override {
1008                // Amount override provided
1009                Some(amt) => Some(amt.into()),
1010                None => match invoice_details.amount_msat {
1011                    // We are not sending amount in case the invoice contains it.
1012                    Some(_) => None,
1013                    // We are sending amount for zero amount invoice
1014                    None => Some(amount),
1015                },
1016            }
1017        };
1018        let (prefer_spark, completion_timeout_secs) = match request.options {
1019            Some(SendPaymentOptions::Bolt11Invoice {
1020                prefer_spark,
1021                completion_timeout_secs,
1022            }) => (prefer_spark, completion_timeout_secs),
1023            _ => (self.config.prefer_spark_over_lightning, None),
1024        };
1025        let fee_sats = match (prefer_spark, spark_transfer_fee_sats, lightning_fee_sats) {
1026            (true, Some(fee), _) => fee,
1027            _ => lightning_fee_sats,
1028        };
1029        let transfer_id = request
1030            .idempotency_key
1031            .as_ref()
1032            .map(|idempotency_key| TransferId::from_str(idempotency_key))
1033            .transpose()?;
1034
1035        let payment_response = Box::pin(
1036            self.spark_wallet.pay_lightning_invoice(
1037                &invoice_details.invoice.bolt11,
1038                amount_to_send
1039                    .map(|a| Ok::<u64, SdkError>(a.try_into()?))
1040                    .transpose()?,
1041                Some(fee_sats),
1042                prefer_spark,
1043                transfer_id,
1044            ),
1045        )
1046        .await?;
1047        let payment = match payment_response.lightning_payment {
1048            Some(lightning_payment) => {
1049                let ssp_id = lightning_payment.id.clone();
1050                let htlc_details = payment_response
1051                    .transfer
1052                    .htlc_preimage_request
1053                    .ok_or_else(|| {
1054                        SdkError::Generic(
1055                            "Missing HTLC details for Lightning send payment".to_string(),
1056                        )
1057                    })?
1058                    .try_into()?;
1059                let payment = Payment::from_lightning(
1060                    lightning_payment,
1061                    amount,
1062                    payment_response.transfer.id.to_string(),
1063                    htlc_details,
1064                )?;
1065                self.poll_lightning_send_payment(&payment, ssp_id);
1066                payment
1067            }
1068            None => payment_response.transfer.try_into()?,
1069        };
1070
1071        let completion_timeout_secs = completion_timeout_secs.unwrap_or(0);
1072
1073        if completion_timeout_secs == 0 {
1074            // Insert the payment into storage to make it immediately available for listing
1075            self.storage.insert_payment(payment.clone()).await?;
1076
1077            return Ok(SendPaymentResponse { payment });
1078        }
1079
1080        let payment = self
1081            .wait_for_payment(
1082                WaitForPaymentIdentifier::PaymentId(payment.id.clone()),
1083                completion_timeout_secs,
1084            )
1085            .await
1086            .unwrap_or(payment);
1087
1088        // Insert the payment into storage to make it immediately available for listing
1089        self.storage.insert_payment(payment.clone()).await?;
1090
1091        Ok(SendPaymentResponse { payment })
1092    }
1093
1094    async fn send_bitcoin_address(
1095        &self,
1096        address: &BitcoinAddressDetails,
1097        fee_quote: &SendOnchainFeeQuote,
1098        request: &SendPaymentRequest,
1099    ) -> Result<SendPaymentResponse, SdkError> {
1100        // Extract confirmation speed from options
1101        let confirmation_speed = match &request.options {
1102            Some(SendPaymentOptions::BitcoinAddress { confirmation_speed }) => {
1103                confirmation_speed.clone()
1104            }
1105            None => OnchainConfirmationSpeed::Fast, // Default to fast
1106            _ => {
1107                return Err(SdkError::InvalidInput(
1108                    "Invalid options for Bitcoin address payment".to_string(),
1109                ));
1110            }
1111        };
1112
1113        let exit_speed: ExitSpeed = confirmation_speed.clone().into();
1114
1115        // Calculate fee based on selected speed
1116        let fee_sats = match confirmation_speed {
1117            OnchainConfirmationSpeed::Fast => fee_quote.speed_fast.total_fee_sat(),
1118            OnchainConfirmationSpeed::Medium => fee_quote.speed_medium.total_fee_sat(),
1119            OnchainConfirmationSpeed::Slow => fee_quote.speed_slow.total_fee_sat(),
1120        };
1121
1122        // Compute amount - for FeesIncluded, receiver gets total minus fees
1123        let amount_sats: u64 = if request.prepare_response.fee_policy == FeePolicy::FeesIncluded {
1124            let total_sats: u64 = request.prepare_response.amount.try_into()?;
1125            total_sats.saturating_sub(fee_sats)
1126        } else {
1127            request.prepare_response.amount.try_into()?
1128        };
1129
1130        // Validate the output amount meets the dust limit for this address type
1131        let dust_limit_sats = get_dust_limit_sats(&address.address)?;
1132        if amount_sats < dust_limit_sats {
1133            return Err(SdkError::InvalidInput(format!(
1134                "Amount is below the minimum of {dust_limit_sats} sats required for this address"
1135            )));
1136        }
1137
1138        let transfer_id = request
1139            .idempotency_key
1140            .as_ref()
1141            .map(|idempotency_key| TransferId::from_str(idempotency_key))
1142            .transpose()?;
1143        let response = self
1144            .spark_wallet
1145            .withdraw(
1146                &address.address,
1147                Some(amount_sats),
1148                exit_speed,
1149                fee_quote.clone().into(),
1150                transfer_id,
1151            )
1152            .await?;
1153
1154        let payment: Payment = response.try_into()?;
1155
1156        self.storage.insert_payment(payment.clone()).await?;
1157
1158        Ok(SendPaymentResponse { payment })
1159    }
1160
1161    pub(super) async fn wait_for_payment(
1162        &self,
1163        identifier: WaitForPaymentIdentifier,
1164        completion_timeout_secs: u32,
1165    ) -> Result<Payment, SdkError> {
1166        let (tx, mut rx) = mpsc::channel(20);
1167        let id = self
1168            .add_event_listener(Box::new(InternalEventListener::new(tx)))
1169            .await;
1170
1171        // First check if we already have the completed payment in storage
1172        let payment = match &identifier {
1173            WaitForPaymentIdentifier::PaymentId(payment_id) => self
1174                .storage
1175                .get_payment_by_id(payment_id.clone())
1176                .await
1177                .ok(),
1178            WaitForPaymentIdentifier::PaymentRequest(payment_request) => {
1179                self.storage
1180                    .get_payment_by_invoice(payment_request.clone())
1181                    .await?
1182            }
1183        };
1184        if let Some(payment) = payment
1185            && payment.status == PaymentStatus::Completed
1186        {
1187            self.remove_event_listener(&id).await;
1188            return Ok(payment);
1189        }
1190
1191        let timeout_res = timeout(Duration::from_secs(completion_timeout_secs.into()), async {
1192            loop {
1193                let Some(event) = rx.recv().await else {
1194                    return Err(SdkError::Generic("Event channel closed".to_string()));
1195                };
1196
1197                let SdkEvent::PaymentSucceeded { payment } = event else {
1198                    continue;
1199                };
1200
1201                if is_payment_match(&payment, &identifier) {
1202                    return Ok(payment);
1203                }
1204            }
1205        })
1206        .await
1207        .map_err(|_| SdkError::Generic("Timeout waiting for payment".to_string()));
1208
1209        self.remove_event_listener(&id).await;
1210        timeout_res?
1211    }
1212
1213    // Pools the lightning send payment until it is in completed state.
1214    fn poll_lightning_send_payment(&self, payment: &Payment, ssp_id: String) {
1215        const MAX_POLL_ATTEMPTS: u32 = 20;
1216        let payment_id = payment.id.clone();
1217        info!("Polling lightning send payment {}", payment_id);
1218
1219        let Some(htlc_details) = payment.details.as_ref().and_then(|d| match d {
1220            PaymentDetails::Lightning { htlc_details, .. } => Some(htlc_details.clone()),
1221            _ => None,
1222        }) else {
1223            error!(
1224                "Missing HTLC details for lightning send payment {payment_id}, skipping polling"
1225            );
1226            return;
1227        };
1228        let spark_wallet = self.spark_wallet.clone();
1229        let storage = self.storage.clone();
1230        let sync_coordinator = self.sync_coordinator.clone();
1231        let event_emitter = self.event_emitter.clone();
1232        let payment = payment.clone();
1233        let payment_id = payment_id.clone();
1234        let mut shutdown = self.shutdown_sender.subscribe();
1235        let span = tracing::Span::current();
1236
1237        tokio::spawn(async move {
1238            for i in 0..MAX_POLL_ATTEMPTS {
1239                info!(
1240                    "Polling lightning send payment {} attempt {}",
1241                    payment_id, i
1242                );
1243                select! {
1244                    _ = shutdown.changed() => {
1245                        info!("Shutdown signal received");
1246                        return;
1247                    },
1248                    p = spark_wallet.fetch_lightning_send_payment(&ssp_id) => {
1249                        if let Ok(Some(p)) = p && let Ok(payment) = Payment::from_lightning(p.clone(), payment.amount, payment.id.clone(), htlc_details.clone()) {
1250                            info!("Polling payment status = {} {:?}", payment.status, p.status);
1251                            if payment.status != PaymentStatus::Pending {
1252                                info!("Polling payment completed status = {}", payment.status);
1253                                // Update storage before emitting event so that
1254                                // get_payment returns the correct status immediately.
1255                                if let Err(e) = storage.insert_payment(payment.clone()).await {
1256                                    error!("Failed to update payment in storage: {e:?}");
1257                                }
1258                                // Fetch the payment to include already stored metadata
1259                                get_payment_and_emit_event(&storage, &event_emitter, payment.clone()).await;
1260                                sync_coordinator
1261                                    .trigger_sync_no_wait(SyncType::WalletState, true)
1262                                    .await;
1263                                return;
1264                            }
1265                        }
1266
1267                        let sleep_time = if i < 5 {
1268                            Duration::from_secs(1)
1269                        } else {
1270                            Duration::from_secs(i.into())
1271                        };
1272                        tokio::time::sleep(sleep_time).await;
1273                    }
1274                }
1275            }
1276        }.instrument(span));
1277    }
1278
1279    #[expect(clippy::too_many_arguments)]
1280    async fn convert_token_for_bolt11_invoice(
1281        &self,
1282        conversion_options: &ConversionOptions,
1283        spark_transfer_fee_sats: Option<u64>,
1284        lightning_fee_sats: u64,
1285        request: &SendPaymentRequest,
1286        conversion_purpose: &ConversionPurpose,
1287        amount: u128,
1288        token_identifier: Option<&String>,
1289    ) -> Result<TokenConversionResponse, SdkError> {
1290        // Determine the fee to be used based on preference
1291        let fee_sats = match request.options {
1292            Some(SendPaymentOptions::Bolt11Invoice { prefer_spark, .. }) => {
1293                match (prefer_spark, spark_transfer_fee_sats) {
1294                    (true, Some(fee)) => fee,
1295                    _ => lightning_fee_sats,
1296                }
1297            }
1298            _ => lightning_fee_sats,
1299        };
1300        // The absolute minimum amount out is the lightning invoice amount plus fee
1301        let min_amount_out = amount.saturating_add(u128::from(fee_sats));
1302
1303        self.token_converter
1304            .convert(
1305                conversion_options,
1306                conversion_purpose,
1307                token_identifier,
1308                ConversionAmount::MinAmountOut(min_amount_out),
1309            )
1310            .await
1311            .map_err(Into::into)
1312    }
1313
1314    /// Gets conversion options for a payment, auto-populating from stable balance config if needed.
1315    ///
1316    /// Returns the provided options if set, or auto-populates from stable balance config
1317    /// if configured and there's not enough sats balance to cover the payment.
1318    async fn get_conversion_options_for_payment(
1319        &self,
1320        options: Option<&ConversionOptions>,
1321        token_identifier: Option<&String>,
1322        payment_amount: u128,
1323    ) -> Result<Option<ConversionOptions>, SdkError> {
1324        if let Some(stable_balance) = &self.stable_balance {
1325            stable_balance
1326                .get_conversion_options(options, token_identifier, payment_amount)
1327                .await
1328                .map_err(Into::into)
1329        } else {
1330            Ok(options.cloned())
1331        }
1332    }
1333
1334    async fn convert_token_for_bitcoin_address(
1335        &self,
1336        conversion_options: &ConversionOptions,
1337        fee_quote: &SendOnchainFeeQuote,
1338        request: &SendPaymentRequest,
1339        conversion_purpose: &ConversionPurpose,
1340        amount: u128,
1341        token_identifier: Option<&String>,
1342    ) -> Result<TokenConversionResponse, SdkError> {
1343        // Derive fee_sats from request.options confirmation speed
1344        let fee_sats = match &request.options {
1345            Some(SendPaymentOptions::BitcoinAddress { confirmation_speed }) => {
1346                match confirmation_speed {
1347                    OnchainConfirmationSpeed::Slow => fee_quote.speed_slow.total_fee_sat(),
1348                    OnchainConfirmationSpeed::Medium => fee_quote.speed_medium.total_fee_sat(),
1349                    OnchainConfirmationSpeed::Fast => fee_quote.speed_fast.total_fee_sat(),
1350                }
1351            }
1352            _ => fee_quote.speed_fast.total_fee_sat(), // Default to fast
1353        };
1354
1355        // The absolute minimum amount out is the amount plus fee
1356        let min_amount_out = amount.saturating_add(u128::from(fee_sats));
1357
1358        self.token_converter
1359            .convert(
1360                conversion_options,
1361                conversion_purpose,
1362                token_identifier,
1363                ConversionAmount::MinAmountOut(min_amount_out),
1364            )
1365            .await
1366            .map_err(Into::into)
1367    }
1368}