Skip to main content

breez_sdk_spark/sdk/
sync.rs

1use platform_utils::time::{Instant, SystemTime};
2use platform_utils::tokio;
3use std::sync::Arc;
4use tracing::{debug, error, info, trace, warn};
5
6use super::{
7    BreezSdk, CLAIM_TX_SIZE_VBYTES, SYNC_PAGING_LIMIT, SyncType, helpers::update_balances,
8    parse_input,
9};
10use crate::{
11    DepositInfo, InputType, MaxFee, PaymentDetails, PaymentType,
12    error::SdkError,
13    events::{InternalSyncedEvent, SdkEvent},
14    lnurl::ListMetadataRequest,
15    models::{Payment, SyncWalletRequest, SyncWalletResponse},
16    persist::{ObjectCacheRepository, UpdateDepositPayload},
17    sync::SparkSyncService,
18    utils::{
19        deposit_chain_syncer::{DepositChainSyncer, TxOutput},
20        utxo_fetcher::DetailedUtxo,
21    },
22};
23
24impl BreezSdk {
25    pub(in crate::sdk) async fn sync_single_lnurl_metadata(&self, payment: &mut Payment) {
26        if payment.payment_type != PaymentType::Receive {
27            return;
28        }
29
30        let Some(PaymentDetails::Lightning {
31            invoice,
32            lnurl_receive_metadata,
33            ..
34        }) = &mut payment.details
35        else {
36            return;
37        };
38
39        if lnurl_receive_metadata.is_some() {
40            // Already have lnurl metadata
41            return;
42        }
43
44        let Ok(input) = parse_input(invoice, None).await else {
45            error!(
46                "Failed to parse invoice for lnurl metadata sync: {}",
47                invoice
48            );
49            return;
50        };
51
52        let InputType::Bolt11Invoice(details) = input else {
53            error!(
54                "Input is not a Bolt11 invoice for lnurl metadata sync: {}",
55                invoice
56            );
57            return;
58        };
59
60        // If there is a description hash, we assume this is a lnurl payment.
61        if details.description_hash.is_none() {
62            return;
63        }
64
65        // Let's check whether the lnurl receive metadata was already synced, then return early.
66        // Important: Only return early if metadata is actually present (Some), otherwise we need
67        // to trigger a sync. This prevents a race condition where the payment is in storage but
68        // metadata sync from TransferClaimStarting hasn't completed yet.
69        if let Ok(db_payment) = self.storage.get_payment_by_id(payment.id.clone()).await
70            && let Some(PaymentDetails::Lightning {
71                lnurl_receive_metadata: db_lnurl_receive_metadata @ Some(_),
72                ..
73            }) = db_payment.details
74        {
75            *lnurl_receive_metadata = db_lnurl_receive_metadata;
76            return;
77        }
78
79        // Sync lnurl metadata directly instead of going through the sync trigger,
80        // because this function is called from the sync loop's event handler,
81        // which would deadlock waiting for itself to process the trigger.
82        if let Err(e) = self.sync_lnurl_metadata().await {
83            error!("Failed to sync lnurl metadata for invoice {invoice}: {e}");
84            return;
85        }
86
87        let db_payment = match self.storage.get_payment_by_id(payment.id.clone()).await {
88            Ok(p) => p,
89            Err(e) => {
90                debug!("Payment not found in storage for invoice {}: {e}", invoice);
91                return;
92            }
93        };
94
95        let Some(PaymentDetails::Lightning {
96            lnurl_receive_metadata: db_lnurl_receive_metadata,
97            ..
98        }) = db_payment.details
99        else {
100            debug!(
101                "No lnurl receive metadata in storage for invoice {}",
102                invoice
103            );
104            return;
105        };
106        *lnurl_receive_metadata = db_lnurl_receive_metadata;
107    }
108
109    #[allow(clippy::too_many_lines)]
110    pub(super) async fn sync_wallet_internal(
111        &self,
112        sync_type: SyncType,
113        force: bool,
114    ) -> Result<(), SdkError> {
115        let cache = ObjectCacheRepository::new(self.storage.clone());
116        let sync_interval_secs = u64::from(self.config.sync_interval_secs);
117
118        let now = SystemTime::now()
119            .duration_since(SystemTime::UNIX_EPOCH)
120            .map_or(0, |d| d.as_secs());
121
122        // Skip if we synced recently (unless forced).
123        if !force
124            && let Some(last) = cache.get_last_sync_time().await?
125            && now.saturating_sub(last) < sync_interval_secs
126        {
127            debug!("sync_wallet_internal: Synced recently, skipping");
128            // When another instance shares our storage and keeps winning the sync
129            // race, we would otherwise never emit a Synced event. Emit it here so
130            // consumers are still notified that storage is up to date.
131            self.event_emitter.emit(&SdkEvent::Synced).await;
132            return Ok(());
133        }
134
135        // Update last sync time if this is a full sync.
136        if sync_type.contains(SyncType::Full)
137            && let Err(e) = cache.set_last_sync_time(now).await
138        {
139            error!("sync_wallet_internal: Failed to update last sync time: {e:?}");
140        }
141
142        let start_time = Instant::now();
143
144        let sync_wallet = async {
145            let wallet_synced = if sync_type.contains(SyncType::Wallet) {
146                debug!("sync_wallet_internal: Starting Wallet sync");
147                let wallet_start = Instant::now();
148                match self.spark_wallet.sync().await {
149                    Ok(()) => {
150                        debug!(
151                            "sync_wallet_internal: Wallet sync completed in {:?}",
152                            wallet_start.elapsed()
153                        );
154                        true
155                    }
156                    Err(e) => {
157                        error!(
158                            "sync_wallet_internal: Spark wallet sync failed in {:?}: {e:?}",
159                            wallet_start.elapsed()
160                        );
161                        false
162                    }
163                }
164            } else {
165                trace!("sync_wallet_internal: Skipping Wallet sync");
166                false
167            };
168
169            let wallet_state_synced = if sync_type.contains(SyncType::WalletState) {
170                debug!("sync_wallet_internal: Starting WalletState sync");
171                let wallet_state_start = Instant::now();
172                match self.sync_wallet_state_to_storage().await {
173                    Ok(()) => {
174                        debug!(
175                            "sync_wallet_internal: WalletState sync completed in {:?}",
176                            wallet_state_start.elapsed()
177                        );
178                        true
179                    }
180                    Err(e) => {
181                        error!(
182                            "sync_wallet_internal: Failed to sync wallet state to storage in {:?}: {e:?}",
183                            wallet_state_start.elapsed()
184                        );
185                        false
186                    }
187                }
188            } else {
189                trace!("sync_wallet_internal: Skipping WalletState sync");
190                false
191            };
192
193            (wallet_synced, wallet_state_synced)
194        };
195
196        let sync_lnurl = async {
197            if sync_type.contains(SyncType::LnurlMetadata) {
198                debug!("sync_wallet_internal: Starting LnurlMetadata sync");
199                let lnurl_start = Instant::now();
200                match self.sync_lnurl_metadata().await {
201                    Ok(()) => {
202                        debug!(
203                            "sync_wallet_internal: LnurlMetadata sync completed in {:?}",
204                            lnurl_start.elapsed()
205                        );
206                        true
207                    }
208                    Err(e) => {
209                        error!(
210                            "sync_wallet_internal: Failed to sync lnurl metadata in {:?}: {e:?}",
211                            lnurl_start.elapsed()
212                        );
213                        false
214                    }
215                }
216            } else {
217                trace!("sync_wallet_internal: Skipping LnurlMetadata sync");
218                false
219            }
220        };
221
222        let sync_deposits = async {
223            if sync_type.contains(SyncType::Deposits) {
224                debug!("sync_wallet_internal: Starting Deposits sync");
225                let deposits_start = Instant::now();
226                match self.check_and_claim_static_deposits().await {
227                    Ok(()) => {
228                        debug!(
229                            "sync_wallet_internal: Deposits sync completed in {:?}",
230                            deposits_start.elapsed()
231                        );
232                        true
233                    }
234                    Err(e) => {
235                        error!(
236                            "sync_wallet_internal: Failed to check and claim static deposits in {:?}: {e:?}",
237                            deposits_start.elapsed()
238                        );
239                        false
240                    }
241                }
242            } else {
243                trace!("sync_wallet_internal: Skipping Deposits sync");
244                false
245            }
246        };
247
248        let ((wallet, wallet_state), lnurl_metadata, deposits) =
249            tokio::join!(sync_wallet, sync_lnurl, sync_deposits);
250
251        let elapsed = start_time.elapsed();
252        let event = InternalSyncedEvent {
253            wallet,
254            wallet_state,
255            lnurl_metadata,
256            deposits,
257            storage_incoming: None,
258        };
259        info!("sync_wallet_internal: Wallet sync completed in {elapsed:?}: {event:?}");
260        self.event_emitter.emit_synced(&event).await;
261        Ok(())
262    }
263
264    /// Synchronizes wallet state to persistent storage, making sure we have the latest balances and payments.
265    pub(super) async fn sync_wallet_state_to_storage(&self) -> Result<(), SdkError> {
266        update_balances(self.spark_wallet.clone(), self.storage.clone()).await?;
267
268        let initial_sync_complete = *self.initial_synced_watcher.borrow();
269        let sync_service = SparkSyncService::new(
270            self.spark_wallet.clone(),
271            self.storage.clone(),
272            self.event_emitter.clone(),
273        );
274        sync_service.sync_payments(initial_sync_complete).await?;
275
276        Ok(())
277    }
278
279    pub(super) async fn check_and_claim_static_deposits(&self) -> Result<(), SdkError> {
280        self.maybe_ensure_spark_private_mode_initialized().await?;
281        let existing_deposits = self.storage.list_deposits().await?;
282        let existing_keys: std::collections::HashSet<TxOutput> = existing_deposits
283            .iter()
284            .map(|d| TxOutput {
285                txid: d.txid.clone(),
286                vout: d.vout,
287            })
288            .collect();
289
290        let all_utxos = DepositChainSyncer::new(
291            self.chain_service.clone(),
292            self.storage.clone(),
293            self.spark_wallet.clone(),
294        )
295        .sync()
296        .await?;
297
298        // Emit NewDeposits for any deposits not previously known
299        let new_deposits: Vec<DepositInfo> = all_utxos
300            .iter()
301            .filter(|(u, _)| {
302                !existing_keys.contains(&TxOutput {
303                    txid: u.txid.to_string(),
304                    vout: u.vout,
305                })
306            })
307            .map(|(u, is_mature)| u.clone().into_deposit_info(*is_mature))
308            .collect();
309        if !new_deposits.is_empty() {
310            self.event_emitter
311                .emit(&SdkEvent::NewDeposits { new_deposits })
312                .await;
313        }
314
315        // Only claim UTXOs with sufficient confirmations
316        let to_claim: Vec<_> = all_utxos
317            .into_iter()
318            .filter(|(_, is_mature)| *is_mature)
319            .map(|(u, _)| u)
320            .collect();
321
322        let mut claimed_deposits: Vec<DepositInfo> = Vec::new();
323        let mut unclaimed_deposits: Vec<DepositInfo> = Vec::new();
324        for detailed_utxo in to_claim {
325            match self
326                .claim_utxo(&detailed_utxo, self.config.max_deposit_claim_fee.clone())
327                .await
328            {
329                Ok(_) => {
330                    info!("Claimed utxo {}:{}", detailed_utxo.txid, detailed_utxo.vout);
331                    self.storage
332                        .delete_deposit(detailed_utxo.txid.to_string(), detailed_utxo.vout)
333                        .await?;
334                    claimed_deposits.push(detailed_utxo.into_deposit_info(true));
335                }
336                Err(e) => {
337                    warn!(
338                        "Failed to claim utxo {}:{}: {e}",
339                        detailed_utxo.txid, detailed_utxo.vout
340                    );
341                    self.storage
342                        .update_deposit(
343                            detailed_utxo.txid.to_string(),
344                            detailed_utxo.vout,
345                            UpdateDepositPayload::ClaimError {
346                                error: e.clone().into(),
347                            },
348                        )
349                        .await?;
350                    let mut unclaimed_deposit = detailed_utxo.into_deposit_info(true);
351                    unclaimed_deposit.claim_error = Some(e.into());
352                    unclaimed_deposits.push(unclaimed_deposit);
353                }
354            }
355        }
356
357        info!("background claim completed, unclaimed deposits: {unclaimed_deposits:?}");
358
359        if !unclaimed_deposits.is_empty() {
360            self.event_emitter
361                .emit(&SdkEvent::UnclaimedDeposits { unclaimed_deposits })
362                .await;
363        }
364        if !claimed_deposits.is_empty() {
365            self.event_emitter
366                .emit(&SdkEvent::ClaimedDeposits { claimed_deposits })
367                .await;
368        }
369        Ok(())
370    }
371
372    pub(super) async fn sync_lnurl_metadata(&self) -> Result<(), SdkError> {
373        let Some(lnurl_server_client) = self.lnurl_server_client.clone() else {
374            return Ok(());
375        };
376
377        let cache = ObjectCacheRepository::new(Arc::clone(&self.storage));
378        let mut updated_after = cache.fetch_lnurl_metadata_updated_after().await?;
379
380        loop {
381            debug!("Syncing lnurl metadata from updated_after {updated_after}");
382            let metadata = lnurl_server_client
383                .list_metadata(&ListMetadataRequest {
384                    offset: None,
385                    limit: Some(SYNC_PAGING_LIMIT),
386                    updated_after: Some(updated_after),
387                })
388                .await?;
389
390            if metadata.metadata.is_empty() {
391                debug!("No more lnurl metadata on offset {updated_after}");
392                break;
393            }
394
395            let len = u32::try_from(metadata.metadata.len())?;
396            let last_updated_at = metadata.metadata.last().map(|m| m.updated_at);
397            self.storage
398                .set_lnurl_metadata(metadata.metadata.into_iter().map(From::from).collect())
399                .await?;
400
401            debug!(
402                "Synchronized {} lnurl metadata at updated_after {updated_after}",
403                len
404            );
405            updated_after = last_updated_at.unwrap_or(updated_after);
406            cache
407                .save_lnurl_metadata_updated_after(updated_after)
408                .await?;
409
410            if len < SYNC_PAGING_LIMIT {
411                // No more invoices to fetch
412                break;
413            }
414        }
415
416        Ok(())
417    }
418
419    /// Submits a static deposit claim for `detailed_utxo` and returns the
420    /// resulting transfer id.
421    pub(super) async fn claim_utxo(
422        &self,
423        detailed_utxo: &DetailedUtxo,
424        max_claim_fee: Option<MaxFee>,
425    ) -> Result<String, SdkError> {
426        info!(
427            "Fetching static deposit claim quote for deposit tx {}:{} and amount: {}",
428            detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value
429        );
430        let quote = self
431            .spark_wallet
432            .fetch_static_deposit_claim_quote(detailed_utxo.tx.clone(), Some(detailed_utxo.vout))
433            .await?;
434
435        let spark_requested_fee_sats = detailed_utxo.value.saturating_sub(quote.credit_amount_sats);
436
437        let spark_requested_fee_rate = spark_requested_fee_sats.div_ceil(CLAIM_TX_SIZE_VBYTES);
438
439        let Some(max_deposit_claim_fee) = max_claim_fee else {
440            return Err(SdkError::MaxDepositClaimFeeExceeded {
441                tx: detailed_utxo.txid.to_string(),
442                vout: detailed_utxo.vout,
443                max_fee: None,
444                required_fee_sats: spark_requested_fee_sats,
445                required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
446            });
447        };
448        let max_fee = max_deposit_claim_fee
449            .to_fee(self.chain_service.as_ref())
450            .await?;
451        let max_fee_sats = max_fee.to_sats(CLAIM_TX_SIZE_VBYTES);
452        info!(
453            "User max fee: {} spark requested fee: {}",
454            max_fee_sats, spark_requested_fee_sats
455        );
456        if spark_requested_fee_sats > max_fee_sats {
457            return Err(SdkError::MaxDepositClaimFeeExceeded {
458                tx: detailed_utxo.txid.to_string(),
459                vout: detailed_utxo.vout,
460                max_fee: Some(max_fee),
461                required_fee_sats: spark_requested_fee_sats,
462                required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
463            });
464        }
465
466        info!(
467            "Claiming static deposit for utxo {}:{}",
468            detailed_utxo.txid, detailed_utxo.vout
469        );
470        let credit_amount_sats = quote.credit_amount_sats;
471        let transfer_id = self.spark_wallet.claim_static_deposit(quote).await?;
472        info!(
473            "Claimed static deposit for utxo {}:{} (deposit value {}, credit {}), transfer {transfer_id}",
474            detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value, credit_amount_sats,
475        );
476        Ok(transfer_id)
477    }
478}
479
480#[cfg_attr(feature = "uniffi", uniffi::export(async_runtime = "tokio"))]
481#[allow(clippy::needless_pass_by_value)]
482impl BreezSdk {
483    /// Synchronizes the wallet with the Spark network
484    #[allow(unused_variables)]
485    pub async fn sync_wallet(
486        &self,
487        request: SyncWalletRequest,
488    ) -> Result<SyncWalletResponse, SdkError> {
489        self.runtime
490            .run_user_sync(self, super::SyncType::Full, true)
491            .await?;
492        Ok(SyncWalletResponse {})
493    }
494}