breez_sdk_spark/sdk/
sync.rs

1use spark_wallet::WalletEvent;
2use std::sync::Arc;
3use tokio::sync::watch;
4use tokio_with_wasm::alias as tokio;
5use tracing::{Instrument, debug, error, info, trace, warn};
6use web_time::{Duration, Instant, SystemTime};
7
8use crate::{
9    DepositInfo, InputType, MaxFee, PaymentDetails, PaymentType,
10    error::SdkError,
11    events::{InternalSyncedEvent, SdkEvent},
12    lnurl::ListMetadataRequest,
13    models::{Payment, SyncWalletRequest, SyncWalletResponse},
14    persist::{ObjectCacheRepository, UpdateDepositPayload},
15    sync::SparkSyncService,
16    utils::{
17        deposit_chain_syncer::DepositChainSyncer, run_with_shutdown, utxo_fetcher::DetailedUtxo,
18    },
19};
20
21use super::{
22    BreezSdk, CLAIM_TX_SIZE_VBYTES, SYNC_PAGING_LIMIT, SyncRequest, SyncType,
23    helpers::{BalanceWatcher, update_balances},
24    parse_input,
25};
26
27impl BreezSdk {
28    pub(super) fn periodic_sync(&self, initial_synced_sender: watch::Sender<bool>) {
29        let sdk = self.clone();
30        let mut shutdown_receiver = sdk.shutdown_sender.subscribe();
31        let mut subscription = sdk.spark_wallet.subscribe_events();
32        let sync_coordinator = sdk.sync_coordinator.clone();
33        let mut sync_trigger_receiver = sdk.sync_coordinator.subscribe();
34        let mut last_sync_time = SystemTime::now();
35
36        let sync_interval = u64::from(self.config.sync_interval_secs);
37        let span = tracing::Span::current();
38        tokio::spawn(async move {
39            let balance_watcher =
40                BalanceWatcher::new(sdk.spark_wallet.clone(), sdk.storage.clone());
41            let balance_watcher_id = sdk.add_event_listener(Box::new(balance_watcher)).await;
42            loop {
43                tokio::select! {
44                    _ = shutdown_receiver.changed() => {
45                        if !sdk.remove_event_listener(&balance_watcher_id).await {
46                            error!("Failed to remove balance watcher listener");
47                        }
48                        info!("Deposit tracking loop shutdown signal received");
49                        return;
50                    }
51                    event = subscription.recv() => {
52                        match event {
53                            Ok(event) => {
54                                info!("Received event: {event}");
55                                trace!("Received event: {:?}", event);
56                                sdk.handle_wallet_event(event).await;
57                            }
58                            Err(e) => {
59                                error!("Failed to receive event: {e:?}");
60                            }
61                        }
62                    }
63                    sync_type_res = sync_trigger_receiver.recv() => {
64                        let Ok(sync_request) = sync_type_res else {
65                            continue;
66                        };
67                        info!("Sync trigger changed: {:?}", &sync_request);
68                        let cloned_sdk = sdk.clone();
69                        let initial_synced_sender = initial_synced_sender.clone();
70                        if let Some(true) = Box::pin(run_with_shutdown(shutdown_receiver.clone(), "Sync trigger changed", async move {
71                            if let Err(e) = cloned_sdk.sync_wallet_internal(&sync_request).await {
72                                error!("Failed to sync wallet: {e:?}");
73                                let () = sync_request.reply(Some(e)).await;
74                                return false;
75                            }
76                            // Notify that the requested sync is complete
77                            let () = sync_request.reply(None).await;
78                            // If this was a full sync, notify the initial synced watcher
79                            if sync_request.sync_type.contains(SyncType::Full) {
80                                if let Err(e) = initial_synced_sender.send(true) {
81                                    error!("Failed to send initial synced signal: {e:?}");
82                                }
83                                return true;
84                            }
85
86                            false
87                        })).await {
88                            last_sync_time = SystemTime::now();
89                        }
90                    }
91                    // Ensure we sync at least the configured interval
92                    () = tokio::time::sleep(Duration::from_secs(10)) => {
93                        let now = SystemTime::now();
94                        if let Ok(elapsed) = now.duration_since(last_sync_time) && elapsed.as_secs() >= sync_interval {
95                            sync_coordinator.trigger_sync_no_wait(SyncType::Full, false).await;
96                        }
97                    }
98                }
99            }
100        }.instrument(span));
101    }
102
103    pub(super) async fn handle_wallet_event(&self, event: WalletEvent) {
104        match event {
105            WalletEvent::DepositConfirmed(_) => {
106                info!("Deposit confirmed");
107            }
108            WalletEvent::StreamConnected => {
109                info!("Stream connected");
110            }
111            WalletEvent::StreamDisconnected => {
112                info!("Stream disconnected");
113            }
114            WalletEvent::Synced => {
115                info!("Synced");
116                self.sync_coordinator
117                    .trigger_sync_no_wait(super::SyncType::Full, true)
118                    .await;
119            }
120            WalletEvent::TransferClaimed(transfer) => {
121                info!("Transfer claimed");
122                if let Ok(mut payment) = Payment::try_from(transfer) {
123                    // Insert the payment into storage to make it immediately available for listing
124                    if let Err(e) = self.storage.insert_payment(payment.clone()).await {
125                        error!("Failed to insert succeeded payment: {e:?}");
126                    }
127
128                    // Ensure potential lnurl metadata is synced before emitting the event.
129                    // Note this is already synced at TransferClaimStarting, but it might not have completed yet, so that could race.
130                    self.sync_single_lnurl_metadata(&mut payment).await;
131
132                    // Trigger preimage publisher now that the payment is completed.
133                    // The lnurl metadata was likely already synced during TransferClaimStarting,
134                    // but the payment was still pending at that point so the preimage publisher
135                    // couldn't process it. Now that the payment is completed, re-trigger.
136                    let _ = self.lnurl_preimage_trigger.send(());
137
138                    self.event_emitter
139                        .emit(&SdkEvent::PaymentSucceeded { payment })
140                        .await;
141                }
142                self.sync_coordinator
143                    .trigger_sync_no_wait(super::SyncType::WalletState, true)
144                    .await;
145            }
146            WalletEvent::TransferClaimStarting(transfer) => {
147                info!("Transfer claim starting");
148                if let Ok(mut payment) = Payment::try_from(transfer) {
149                    // Insert the payment into storage to make it immediately available for listing
150                    if let Err(e) = self.storage.insert_payment(payment.clone()).await {
151                        error!("Failed to insert pending payment: {e:?}");
152                    }
153
154                    // Ensure potential lnurl metadata is synced before emitting the event
155                    self.sync_single_lnurl_metadata(&mut payment).await;
156
157                    self.event_emitter
158                        .emit(&SdkEvent::PaymentPending { payment })
159                        .await;
160                }
161                self.sync_coordinator
162                    .trigger_sync_no_wait(super::SyncType::WalletState, true)
163                    .await;
164            }
165            WalletEvent::Optimization(event) => {
166                info!("Optimization event: {:?}", event);
167            }
168        }
169    }
170
171    pub(super) async fn sync_single_lnurl_metadata(&self, payment: &mut Payment) {
172        if payment.payment_type != PaymentType::Receive {
173            return;
174        }
175
176        let Some(PaymentDetails::Lightning {
177            invoice,
178            lnurl_receive_metadata,
179            ..
180        }) = &mut payment.details
181        else {
182            return;
183        };
184
185        if lnurl_receive_metadata.is_some() {
186            // Already have lnurl metadata
187            return;
188        }
189
190        let Ok(input) = parse_input(invoice, None).await else {
191            error!(
192                "Failed to parse invoice for lnurl metadata sync: {}",
193                invoice
194            );
195            return;
196        };
197
198        let InputType::Bolt11Invoice(details) = input else {
199            error!(
200                "Input is not a Bolt11 invoice for lnurl metadata sync: {}",
201                invoice
202            );
203            return;
204        };
205
206        // If there is a description hash, we assume this is a lnurl payment.
207        if details.description_hash.is_none() {
208            return;
209        }
210
211        // Let's check whether the lnurl receive metadata was already synced, then return early.
212        // Important: Only return early if metadata is actually present (Some), otherwise we need
213        // to trigger a sync. This prevents a race condition where the payment is in storage but
214        // metadata sync from TransferClaimStarting hasn't completed yet.
215        if let Ok(db_payment) = self.storage.get_payment_by_id(payment.id.clone()).await
216            && let Some(PaymentDetails::Lightning {
217                lnurl_receive_metadata: db_lnurl_receive_metadata @ Some(_),
218                ..
219            }) = db_payment.details
220        {
221            *lnurl_receive_metadata = db_lnurl_receive_metadata;
222            return;
223        }
224
225        // Sync lnurl metadata directly instead of going through the sync trigger,
226        // because this function is called from the sync loop's event handler,
227        // which would deadlock waiting for itself to process the trigger.
228        if let Err(e) = self.sync_lnurl_metadata().await {
229            error!("Failed to sync lnurl metadata for invoice {invoice}: {e}");
230            return;
231        }
232
233        let db_payment = match self.storage.get_payment_by_id(payment.id.clone()).await {
234            Ok(p) => p,
235            Err(e) => {
236                debug!("Payment not found in storage for invoice {}: {e}", invoice);
237                return;
238            }
239        };
240
241        let Some(PaymentDetails::Lightning {
242            lnurl_receive_metadata: db_lnurl_receive_metadata,
243            ..
244        }) = db_payment.details
245        else {
246            debug!(
247                "No lnurl receive metadata in storage for invoice {}",
248                invoice
249            );
250            return;
251        };
252        *lnurl_receive_metadata = db_lnurl_receive_metadata;
253    }
254
255    #[allow(clippy::too_many_lines)]
256    pub(super) async fn sync_wallet_internal(&self, request: &SyncRequest) -> Result<(), SdkError> {
257        let cache = ObjectCacheRepository::new(self.storage.clone());
258        let sync_interval_secs = u64::from(self.config.sync_interval_secs);
259
260        let now = SystemTime::now()
261            .duration_since(SystemTime::UNIX_EPOCH)
262            .map(|d| d.as_secs())
263            .unwrap_or(0);
264
265        // Skip if we synced recently (unless forced).
266        if !request.force
267            && let Some(last) = cache.get_last_sync_time().await?
268            && now.saturating_sub(last) < sync_interval_secs
269        {
270            debug!("sync_wallet_internal: Synced recently, skipping");
271            return Ok(());
272        }
273
274        // Update last sync time if this is a full sync.
275        if request.sync_type.contains(SyncType::Full)
276            && let Err(e) = cache.set_last_sync_time(now).await
277        {
278            error!("sync_wallet_internal: Failed to update last sync time: {e:?}");
279        }
280
281        let start_time = Instant::now();
282
283        let sync_wallet = async {
284            let wallet_synced = if request.sync_type.contains(SyncType::Wallet) {
285                debug!("sync_wallet_internal: Starting Wallet sync");
286                let wallet_start = Instant::now();
287                match self.spark_wallet.sync().await {
288                    Ok(()) => {
289                        debug!(
290                            "sync_wallet_internal: Wallet sync completed in {:?}",
291                            wallet_start.elapsed()
292                        );
293                        true
294                    }
295                    Err(e) => {
296                        error!(
297                            "sync_wallet_internal: Spark wallet sync failed in {:?}: {e:?}",
298                            wallet_start.elapsed()
299                        );
300                        false
301                    }
302                }
303            } else {
304                trace!("sync_wallet_internal: Skipping Wallet sync");
305                false
306            };
307
308            let wallet_state_synced = if request.sync_type.contains(SyncType::WalletState) {
309                debug!("sync_wallet_internal: Starting WalletState sync");
310                let wallet_state_start = Instant::now();
311                match self.sync_wallet_state_to_storage().await {
312                    Ok(()) => {
313                        debug!(
314                            "sync_wallet_internal: WalletState sync completed in {:?}",
315                            wallet_state_start.elapsed()
316                        );
317                        true
318                    }
319                    Err(e) => {
320                        error!(
321                            "sync_wallet_internal: Failed to sync wallet state to storage in {:?}: {e:?}",
322                            wallet_state_start.elapsed()
323                        );
324                        false
325                    }
326                }
327            } else {
328                trace!("sync_wallet_internal: Skipping WalletState sync");
329                false
330            };
331
332            (wallet_synced, wallet_state_synced)
333        };
334
335        let sync_lnurl = async {
336            if request.sync_type.contains(SyncType::LnurlMetadata) {
337                debug!("sync_wallet_internal: Starting LnurlMetadata sync");
338                let lnurl_start = Instant::now();
339                match self.sync_lnurl_metadata().await {
340                    Ok(()) => {
341                        debug!(
342                            "sync_wallet_internal: LnurlMetadata sync completed in {:?}",
343                            lnurl_start.elapsed()
344                        );
345                        true
346                    }
347                    Err(e) => {
348                        error!(
349                            "sync_wallet_internal: Failed to sync lnurl metadata in {:?}: {e:?}",
350                            lnurl_start.elapsed()
351                        );
352                        false
353                    }
354                }
355            } else {
356                trace!("sync_wallet_internal: Skipping LnurlMetadata sync");
357                false
358            }
359        };
360
361        let sync_deposits = async {
362            if request.sync_type.contains(SyncType::Deposits) {
363                debug!("sync_wallet_internal: Starting Deposits sync");
364                let deposits_start = Instant::now();
365                match self.check_and_claim_static_deposits().await {
366                    Ok(()) => {
367                        debug!(
368                            "sync_wallet_internal: Deposits sync completed in {:?}",
369                            deposits_start.elapsed()
370                        );
371                        true
372                    }
373                    Err(e) => {
374                        error!(
375                            "sync_wallet_internal: Failed to check and claim static deposits in {:?}: {e:?}",
376                            deposits_start.elapsed()
377                        );
378                        false
379                    }
380                }
381            } else {
382                trace!("sync_wallet_internal: Skipping Deposits sync");
383                false
384            }
385        };
386
387        let ((wallet, wallet_state), lnurl_metadata, deposits) =
388            tokio::join!(sync_wallet, sync_lnurl, sync_deposits);
389
390        // Trigger auto-conversion after sync
391        if wallet_state && let Some(stable_balance) = &self.stable_balance {
392            stable_balance.trigger_auto_convert();
393        }
394
395        let elapsed = start_time.elapsed();
396        let event = InternalSyncedEvent {
397            wallet,
398            wallet_state,
399            lnurl_metadata,
400            deposits,
401            storage_incoming: None,
402        };
403        info!("sync_wallet_internal: Wallet sync completed in {elapsed:?}: {event:?}");
404        self.event_emitter.emit_synced(&event).await;
405        Ok(())
406    }
407
408    /// Synchronizes wallet state to persistent storage, making sure we have the latest balances and payments.
409    pub(super) async fn sync_wallet_state_to_storage(&self) -> Result<(), SdkError> {
410        update_balances(self.spark_wallet.clone(), self.storage.clone()).await?;
411
412        let initial_sync_complete = *self.initial_synced_watcher.borrow();
413        let sync_service = SparkSyncService::new(
414            self.spark_wallet.clone(),
415            self.storage.clone(),
416            self.event_emitter.clone(),
417        );
418        sync_service.sync_payments(initial_sync_complete).await?;
419
420        Ok(())
421    }
422
423    pub(super) async fn check_and_claim_static_deposits(&self) -> Result<(), SdkError> {
424        self.ensure_spark_private_mode_initialized().await?;
425        let to_claim = DepositChainSyncer::new(
426            self.chain_service.clone(),
427            self.storage.clone(),
428            self.spark_wallet.clone(),
429        )
430        .sync()
431        .await?;
432
433        let mut claimed_deposits: Vec<DepositInfo> = Vec::new();
434        let mut unclaimed_deposits: Vec<DepositInfo> = Vec::new();
435        for detailed_utxo in to_claim {
436            match self
437                .claim_utxo(&detailed_utxo, self.config.max_deposit_claim_fee.clone())
438                .await
439            {
440                Ok(_) => {
441                    info!("Claimed utxo {}:{}", detailed_utxo.txid, detailed_utxo.vout);
442                    self.storage
443                        .delete_deposit(detailed_utxo.txid.to_string(), detailed_utxo.vout)
444                        .await?;
445                    claimed_deposits.push(detailed_utxo.into());
446                }
447                Err(e) => {
448                    warn!(
449                        "Failed to claim utxo {}:{}: {e}",
450                        detailed_utxo.txid, detailed_utxo.vout
451                    );
452                    self.storage
453                        .update_deposit(
454                            detailed_utxo.txid.to_string(),
455                            detailed_utxo.vout,
456                            UpdateDepositPayload::ClaimError {
457                                error: e.clone().into(),
458                            },
459                        )
460                        .await?;
461                    let mut unclaimed_deposit: DepositInfo = detailed_utxo.clone().into();
462                    unclaimed_deposit.claim_error = Some(e.into());
463                    unclaimed_deposits.push(unclaimed_deposit);
464                }
465            }
466        }
467
468        info!("background claim completed, unclaimed deposits: {unclaimed_deposits:?}");
469
470        if !unclaimed_deposits.is_empty() {
471            self.event_emitter
472                .emit(&SdkEvent::UnclaimedDeposits { unclaimed_deposits })
473                .await;
474        }
475        if !claimed_deposits.is_empty() {
476            self.event_emitter
477                .emit(&SdkEvent::ClaimedDeposits { claimed_deposits })
478                .await;
479        }
480        Ok(())
481    }
482
483    pub(super) async fn sync_lnurl_metadata(&self) -> Result<(), SdkError> {
484        let Some(lnurl_server_client) = self.lnurl_server_client.clone() else {
485            return Ok(());
486        };
487
488        let cache = ObjectCacheRepository::new(Arc::clone(&self.storage));
489        let mut updated_after = cache.fetch_lnurl_metadata_updated_after().await?;
490
491        loop {
492            debug!("Syncing lnurl metadata from updated_after {updated_after}");
493            let metadata = lnurl_server_client
494                .list_metadata(&ListMetadataRequest {
495                    offset: None,
496                    limit: Some(SYNC_PAGING_LIMIT),
497                    updated_after: Some(updated_after),
498                })
499                .await?;
500
501            if metadata.metadata.is_empty() {
502                debug!("No more lnurl metadata on offset {updated_after}");
503                break;
504            }
505
506            let len = u32::try_from(metadata.metadata.len())?;
507            let last_updated_at = metadata.metadata.last().map(|m| m.updated_at);
508            self.storage
509                .set_lnurl_metadata(metadata.metadata.into_iter().map(From::from).collect())
510                .await?;
511
512            debug!(
513                "Synchronized {} lnurl metadata at updated_after {updated_after}",
514                len
515            );
516            updated_after = last_updated_at.unwrap_or(updated_after);
517            cache
518                .save_lnurl_metadata_updated_after(updated_after)
519                .await?;
520
521            let _ = self.lnurl_preimage_trigger.send(());
522            if len < SYNC_PAGING_LIMIT {
523                // No more invoices to fetch
524                break;
525            }
526        }
527
528        Ok(())
529    }
530
531    pub(super) async fn claim_utxo(
532        &self,
533        detailed_utxo: &DetailedUtxo,
534        max_claim_fee: Option<MaxFee>,
535    ) -> Result<spark_wallet::WalletTransfer, SdkError> {
536        info!(
537            "Fetching static deposit claim quote for deposit tx {}:{} and amount: {}",
538            detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value
539        );
540        let quote = self
541            .spark_wallet
542            .fetch_static_deposit_claim_quote(detailed_utxo.tx.clone(), Some(detailed_utxo.vout))
543            .await?;
544
545        let spark_requested_fee_sats = detailed_utxo.value.saturating_sub(quote.credit_amount_sats);
546
547        let spark_requested_fee_rate = spark_requested_fee_sats.div_ceil(CLAIM_TX_SIZE_VBYTES);
548
549        let Some(max_deposit_claim_fee) = max_claim_fee else {
550            return Err(SdkError::MaxDepositClaimFeeExceeded {
551                tx: detailed_utxo.txid.to_string(),
552                vout: detailed_utxo.vout,
553                max_fee: None,
554                required_fee_sats: spark_requested_fee_sats,
555                required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
556            });
557        };
558        let max_fee = max_deposit_claim_fee
559            .to_fee(self.chain_service.as_ref())
560            .await?;
561        let max_fee_sats = max_fee.to_sats(CLAIM_TX_SIZE_VBYTES);
562        info!(
563            "User max fee: {} spark requested fee: {}",
564            max_fee_sats, spark_requested_fee_sats
565        );
566        if spark_requested_fee_sats > max_fee_sats {
567            return Err(SdkError::MaxDepositClaimFeeExceeded {
568                tx: detailed_utxo.txid.to_string(),
569                vout: detailed_utxo.vout,
570                max_fee: Some(max_fee),
571                required_fee_sats: spark_requested_fee_sats,
572                required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
573            });
574        }
575
576        info!(
577            "Claiming static deposit for utxo {}:{}",
578            detailed_utxo.txid, detailed_utxo.vout
579        );
580        let transfer = self.spark_wallet.claim_static_deposit(quote).await?;
581        info!(
582            "Claimed static deposit transfer for utxo {}:{}, value {}",
583            detailed_utxo.txid, detailed_utxo.vout, transfer.total_value_sat,
584        );
585        Ok(transfer)
586    }
587}
588
589#[cfg_attr(feature = "uniffi", uniffi::export(async_runtime = "tokio"))]
590#[allow(clippy::needless_pass_by_value)]
591impl BreezSdk {
592    /// Synchronizes the wallet with the Spark network
593    #[allow(unused_variables)]
594    pub async fn sync_wallet(
595        &self,
596        request: SyncWalletRequest,
597    ) -> Result<SyncWalletResponse, SdkError> {
598        // Use the coordinator to coalesce duplicate sync requests
599        self.sync_coordinator
600            .trigger_sync_and_wait(super::SyncType::Full, true)
601            .await?;
602        Ok(SyncWalletResponse {})
603    }
604}