breez_sdk_spark/sdk/
sync.rs

1use platform_utils::tokio;
2use spark_wallet::WalletEvent;
3use std::sync::Arc;
4use tokio::sync::watch;
5use tracing::{Instrument, debug, error, info, trace, warn};
6use web_time::{Duration, Instant, SystemTime};
7
8use crate::{
9    DepositInfo, InputType, MaxFee, PaymentDetails, PaymentType,
10    error::SdkError,
11    events::{InternalSyncedEvent, SdkEvent},
12    lnurl::ListMetadataRequest,
13    models::{Payment, SyncWalletRequest, SyncWalletResponse},
14    persist::{ObjectCacheRepository, UpdateDepositPayload},
15    sync::SparkSyncService,
16    utils::{
17        deposit_chain_syncer::DepositChainSyncer, payments::get_payment_and_emit_event,
18        run_with_shutdown, utxo_fetcher::DetailedUtxo,
19    },
20};
21
22use super::{
23    BreezSdk, CLAIM_TX_SIZE_VBYTES, SYNC_PAGING_LIMIT, SyncRequest, SyncType,
24    helpers::{BalanceWatcher, update_balances},
25    parse_input,
26};
27
28impl BreezSdk {
29    pub(super) fn periodic_sync(&self, initial_synced_sender: watch::Sender<bool>) {
30        let sdk = self.clone();
31        let mut shutdown_receiver = sdk.shutdown_sender.subscribe();
32        let mut subscription = sdk.spark_wallet.subscribe_events();
33        let sync_coordinator = sdk.sync_coordinator.clone();
34        let mut sync_trigger_receiver = sdk.sync_coordinator.subscribe();
35        let mut last_sync_time = SystemTime::now();
36
37        let sync_interval = u64::from(self.config.sync_interval_secs);
38        let span = tracing::Span::current();
39        tokio::spawn(async move {
40            let balance_watcher =
41                BalanceWatcher::new(sdk.spark_wallet.clone(), sdk.storage.clone());
42            let balance_watcher_id = sdk.add_event_listener(Box::new(balance_watcher)).await;
43            loop {
44                tokio::select! {
45                    _ = shutdown_receiver.changed() => {
46                        if !sdk.remove_event_listener(&balance_watcher_id).await {
47                            error!("Failed to remove balance watcher listener");
48                        }
49                        info!("Deposit tracking loop shutdown signal received");
50                        return;
51                    }
52                    event = subscription.recv() => {
53                        match event {
54                            Ok(event) => {
55                                info!("Received event: {event}");
56                                trace!("Received event: {:?}", event);
57                                sdk.handle_wallet_event(event).await;
58                            }
59                            Err(e) => {
60                                error!("Failed to receive event: {e:?}");
61                            }
62                        }
63                    }
64                    sync_type_res = sync_trigger_receiver.recv() => {
65                        let Ok(sync_request) = sync_type_res else {
66                            continue;
67                        };
68                        info!("Sync trigger changed: {:?}", &sync_request);
69                        let cloned_sdk = sdk.clone();
70                        let initial_synced_sender = initial_synced_sender.clone();
71                        if let Some(true) = Box::pin(run_with_shutdown(shutdown_receiver.clone(), "Sync trigger changed", async move {
72                            if let Err(e) = cloned_sdk.sync_wallet_internal(&sync_request).await {
73                                error!("Failed to sync wallet: {e:?}");
74                                let () = sync_request.reply(Some(e)).await;
75                                return false;
76                            }
77                            // Notify that the requested sync is complete
78                            let () = sync_request.reply(None).await;
79                            // If this was a full sync, notify the initial synced watcher
80                            if sync_request.sync_type.contains(SyncType::Full) {
81                                if let Err(e) = initial_synced_sender.send(true) {
82                                    error!("Failed to send initial synced signal: {e:?}");
83                                }
84                                return true;
85                            }
86
87                            false
88                        })).await {
89                            last_sync_time = SystemTime::now();
90                        }
91                    }
92                    // Ensure we sync at least the configured interval
93                    () = tokio::time::sleep(Duration::from_secs(10)) => {
94                        let now = SystemTime::now();
95                        if let Ok(elapsed) = now.duration_since(last_sync_time) && elapsed.as_secs() >= sync_interval {
96                            sync_coordinator.trigger_sync_no_wait(SyncType::Full, false).await;
97                        }
98                    }
99                }
100            }
101        }.instrument(span));
102    }
103
104    pub(super) async fn handle_wallet_event(&self, event: WalletEvent) {
105        match event {
106            WalletEvent::DepositConfirmed(_) => {
107                info!("Deposit confirmed");
108            }
109            WalletEvent::StreamConnected => {
110                info!("Stream connected");
111            }
112            WalletEvent::StreamDisconnected => {
113                info!("Stream disconnected");
114            }
115            WalletEvent::Synced => {
116                info!("Synced");
117                self.sync_coordinator
118                    .trigger_sync_no_wait(super::SyncType::Full, true)
119                    .await;
120            }
121            WalletEvent::TransferClaimed(transfer) => {
122                info!("Transfer claimed");
123                if let Ok(mut payment) = Payment::try_from(transfer) {
124                    // Insert the payment into storage to make it immediately available for listing
125                    if let Err(e) = self.storage.insert_payment(payment.clone()).await {
126                        error!("Failed to insert succeeded payment: {e:?}");
127                    }
128
129                    // Ensure potential lnurl metadata is synced before emitting the event.
130                    // Note this is already synced at TransferClaimStarting, but it might not have completed yet, so that could race.
131                    self.sync_single_lnurl_metadata(&mut payment).await;
132
133                    // Trigger preimage publisher now that the payment is completed.
134                    // The lnurl metadata was likely already synced during TransferClaimStarting,
135                    // but the payment was still pending at that point so the preimage publisher
136                    // couldn't process it. Now that the payment is completed, re-trigger.
137                    let _ = self.lnurl_preimage_trigger.send(());
138
139                    // Update balance before emitting the event so that listeners can immediately
140                    // query the new balance.
141                    if let Err(e) =
142                        update_balances(self.spark_wallet.clone(), self.storage.clone()).await
143                    {
144                        error!("Failed to update balances before PaymentSucceeded event: {e:?}");
145                    }
146
147                    // Fetch the payment to include already stored metadata
148                    get_payment_and_emit_event(&self.storage, &self.event_emitter, payment).await;
149                }
150                self.sync_coordinator
151                    .trigger_sync_no_wait(super::SyncType::WalletState, true)
152                    .await;
153            }
154            WalletEvent::TransferClaimStarting(transfer) => {
155                info!("Transfer claim starting");
156                if let Ok(mut payment) = Payment::try_from(transfer) {
157                    // Insert the payment into storage to make it immediately available for listing
158                    if let Err(e) = self.storage.insert_payment(payment.clone()).await {
159                        error!("Failed to insert pending payment: {e:?}");
160                    }
161
162                    // Ensure potential lnurl metadata is synced before emitting the event
163                    self.sync_single_lnurl_metadata(&mut payment).await;
164
165                    // Fetch the payment to include already stored metadata
166                    get_payment_and_emit_event(&self.storage, &self.event_emitter, payment).await;
167                }
168                self.sync_coordinator
169                    .trigger_sync_no_wait(super::SyncType::WalletState, true)
170                    .await;
171            }
172            WalletEvent::Optimization(event) => {
173                info!("Optimization event: {:?}", event);
174            }
175        }
176    }
177
178    pub(super) async fn sync_single_lnurl_metadata(&self, payment: &mut Payment) {
179        if payment.payment_type != PaymentType::Receive {
180            return;
181        }
182
183        let Some(PaymentDetails::Lightning {
184            invoice,
185            lnurl_receive_metadata,
186            ..
187        }) = &mut payment.details
188        else {
189            return;
190        };
191
192        if lnurl_receive_metadata.is_some() {
193            // Already have lnurl metadata
194            return;
195        }
196
197        let Ok(input) = parse_input(invoice, None).await else {
198            error!(
199                "Failed to parse invoice for lnurl metadata sync: {}",
200                invoice
201            );
202            return;
203        };
204
205        let InputType::Bolt11Invoice(details) = input else {
206            error!(
207                "Input is not a Bolt11 invoice for lnurl metadata sync: {}",
208                invoice
209            );
210            return;
211        };
212
213        // If there is a description hash, we assume this is a lnurl payment.
214        if details.description_hash.is_none() {
215            return;
216        }
217
218        // Let's check whether the lnurl receive metadata was already synced, then return early.
219        // Important: Only return early if metadata is actually present (Some), otherwise we need
220        // to trigger a sync. This prevents a race condition where the payment is in storage but
221        // metadata sync from TransferClaimStarting hasn't completed yet.
222        if let Ok(db_payment) = self.storage.get_payment_by_id(payment.id.clone()).await
223            && let Some(PaymentDetails::Lightning {
224                lnurl_receive_metadata: db_lnurl_receive_metadata @ Some(_),
225                ..
226            }) = db_payment.details
227        {
228            *lnurl_receive_metadata = db_lnurl_receive_metadata;
229            return;
230        }
231
232        // Sync lnurl metadata directly instead of going through the sync trigger,
233        // because this function is called from the sync loop's event handler,
234        // which would deadlock waiting for itself to process the trigger.
235        if let Err(e) = self.sync_lnurl_metadata().await {
236            error!("Failed to sync lnurl metadata for invoice {invoice}: {e}");
237            return;
238        }
239
240        let db_payment = match self.storage.get_payment_by_id(payment.id.clone()).await {
241            Ok(p) => p,
242            Err(e) => {
243                debug!("Payment not found in storage for invoice {}: {e}", invoice);
244                return;
245            }
246        };
247
248        let Some(PaymentDetails::Lightning {
249            lnurl_receive_metadata: db_lnurl_receive_metadata,
250            ..
251        }) = db_payment.details
252        else {
253            debug!(
254                "No lnurl receive metadata in storage for invoice {}",
255                invoice
256            );
257            return;
258        };
259        *lnurl_receive_metadata = db_lnurl_receive_metadata;
260    }
261
262    #[allow(clippy::too_many_lines)]
263    pub(super) async fn sync_wallet_internal(&self, request: &SyncRequest) -> Result<(), SdkError> {
264        let cache = ObjectCacheRepository::new(self.storage.clone());
265        let sync_interval_secs = u64::from(self.config.sync_interval_secs);
266
267        let now = SystemTime::now()
268            .duration_since(SystemTime::UNIX_EPOCH)
269            .map(|d| d.as_secs())
270            .unwrap_or(0);
271
272        // Skip if we synced recently (unless forced).
273        if !request.force
274            && let Some(last) = cache.get_last_sync_time().await?
275            && now.saturating_sub(last) < sync_interval_secs
276        {
277            debug!("sync_wallet_internal: Synced recently, skipping");
278            return Ok(());
279        }
280
281        // Update last sync time if this is a full sync.
282        if request.sync_type.contains(SyncType::Full)
283            && let Err(e) = cache.set_last_sync_time(now).await
284        {
285            error!("sync_wallet_internal: Failed to update last sync time: {e:?}");
286        }
287
288        let start_time = Instant::now();
289
290        let sync_wallet = async {
291            let wallet_synced = if request.sync_type.contains(SyncType::Wallet) {
292                debug!("sync_wallet_internal: Starting Wallet sync");
293                let wallet_start = Instant::now();
294                match self.spark_wallet.sync().await {
295                    Ok(()) => {
296                        debug!(
297                            "sync_wallet_internal: Wallet sync completed in {:?}",
298                            wallet_start.elapsed()
299                        );
300                        true
301                    }
302                    Err(e) => {
303                        error!(
304                            "sync_wallet_internal: Spark wallet sync failed in {:?}: {e:?}",
305                            wallet_start.elapsed()
306                        );
307                        false
308                    }
309                }
310            } else {
311                trace!("sync_wallet_internal: Skipping Wallet sync");
312                false
313            };
314
315            let wallet_state_synced = if request.sync_type.contains(SyncType::WalletState) {
316                debug!("sync_wallet_internal: Starting WalletState sync");
317                let wallet_state_start = Instant::now();
318                match self.sync_wallet_state_to_storage().await {
319                    Ok(()) => {
320                        debug!(
321                            "sync_wallet_internal: WalletState sync completed in {:?}",
322                            wallet_state_start.elapsed()
323                        );
324                        true
325                    }
326                    Err(e) => {
327                        error!(
328                            "sync_wallet_internal: Failed to sync wallet state to storage in {:?}: {e:?}",
329                            wallet_state_start.elapsed()
330                        );
331                        false
332                    }
333                }
334            } else {
335                trace!("sync_wallet_internal: Skipping WalletState sync");
336                false
337            };
338
339            (wallet_synced, wallet_state_synced)
340        };
341
342        let sync_lnurl = async {
343            if request.sync_type.contains(SyncType::LnurlMetadata) {
344                debug!("sync_wallet_internal: Starting LnurlMetadata sync");
345                let lnurl_start = Instant::now();
346                match self.sync_lnurl_metadata().await {
347                    Ok(()) => {
348                        debug!(
349                            "sync_wallet_internal: LnurlMetadata sync completed in {:?}",
350                            lnurl_start.elapsed()
351                        );
352                        true
353                    }
354                    Err(e) => {
355                        error!(
356                            "sync_wallet_internal: Failed to sync lnurl metadata in {:?}: {e:?}",
357                            lnurl_start.elapsed()
358                        );
359                        false
360                    }
361                }
362            } else {
363                trace!("sync_wallet_internal: Skipping LnurlMetadata sync");
364                false
365            }
366        };
367
368        let sync_deposits = async {
369            if request.sync_type.contains(SyncType::Deposits) {
370                debug!("sync_wallet_internal: Starting Deposits sync");
371                let deposits_start = Instant::now();
372                match self.check_and_claim_static_deposits().await {
373                    Ok(()) => {
374                        debug!(
375                            "sync_wallet_internal: Deposits sync completed in {:?}",
376                            deposits_start.elapsed()
377                        );
378                        true
379                    }
380                    Err(e) => {
381                        error!(
382                            "sync_wallet_internal: Failed to check and claim static deposits in {:?}: {e:?}",
383                            deposits_start.elapsed()
384                        );
385                        false
386                    }
387                }
388            } else {
389                trace!("sync_wallet_internal: Skipping Deposits sync");
390                false
391            }
392        };
393
394        let ((wallet, wallet_state), lnurl_metadata, deposits) =
395            tokio::join!(sync_wallet, sync_lnurl, sync_deposits);
396
397        // Trigger auto-conversion after sync
398        if wallet_state && let Some(stable_balance) = &self.stable_balance {
399            stable_balance.trigger_auto_convert();
400        }
401
402        let elapsed = start_time.elapsed();
403        let event = InternalSyncedEvent {
404            wallet,
405            wallet_state,
406            lnurl_metadata,
407            deposits,
408            storage_incoming: None,
409        };
410        info!("sync_wallet_internal: Wallet sync completed in {elapsed:?}: {event:?}");
411        self.event_emitter.emit_synced(&event).await;
412        Ok(())
413    }
414
415    /// Synchronizes wallet state to persistent storage, making sure we have the latest balances and payments.
416    pub(super) async fn sync_wallet_state_to_storage(&self) -> Result<(), SdkError> {
417        update_balances(self.spark_wallet.clone(), self.storage.clone()).await?;
418
419        let initial_sync_complete = *self.initial_synced_watcher.borrow();
420        let sync_service = SparkSyncService::new(
421            self.spark_wallet.clone(),
422            self.storage.clone(),
423            self.event_emitter.clone(),
424        );
425        sync_service.sync_payments(initial_sync_complete).await?;
426
427        Ok(())
428    }
429
430    pub(super) async fn check_and_claim_static_deposits(&self) -> Result<(), SdkError> {
431        self.ensure_spark_private_mode_initialized().await?;
432        let to_claim = DepositChainSyncer::new(
433            self.chain_service.clone(),
434            self.storage.clone(),
435            self.spark_wallet.clone(),
436        )
437        .sync()
438        .await?;
439
440        let mut claimed_deposits: Vec<DepositInfo> = Vec::new();
441        let mut unclaimed_deposits: Vec<DepositInfo> = Vec::new();
442        for detailed_utxo in to_claim {
443            match self
444                .claim_utxo(&detailed_utxo, self.config.max_deposit_claim_fee.clone())
445                .await
446            {
447                Ok(_) => {
448                    info!("Claimed utxo {}:{}", detailed_utxo.txid, detailed_utxo.vout);
449                    self.storage
450                        .delete_deposit(detailed_utxo.txid.to_string(), detailed_utxo.vout)
451                        .await?;
452                    claimed_deposits.push(detailed_utxo.into());
453                }
454                Err(e) => {
455                    warn!(
456                        "Failed to claim utxo {}:{}: {e}",
457                        detailed_utxo.txid, detailed_utxo.vout
458                    );
459                    self.storage
460                        .update_deposit(
461                            detailed_utxo.txid.to_string(),
462                            detailed_utxo.vout,
463                            UpdateDepositPayload::ClaimError {
464                                error: e.clone().into(),
465                            },
466                        )
467                        .await?;
468                    let mut unclaimed_deposit: DepositInfo = detailed_utxo.clone().into();
469                    unclaimed_deposit.claim_error = Some(e.into());
470                    unclaimed_deposits.push(unclaimed_deposit);
471                }
472            }
473        }
474
475        info!("background claim completed, unclaimed deposits: {unclaimed_deposits:?}");
476
477        if !unclaimed_deposits.is_empty() {
478            self.event_emitter
479                .emit(&SdkEvent::UnclaimedDeposits { unclaimed_deposits })
480                .await;
481        }
482        if !claimed_deposits.is_empty() {
483            self.event_emitter
484                .emit(&SdkEvent::ClaimedDeposits { claimed_deposits })
485                .await;
486        }
487        Ok(())
488    }
489
490    pub(super) async fn sync_lnurl_metadata(&self) -> Result<(), SdkError> {
491        let Some(lnurl_server_client) = self.lnurl_server_client.clone() else {
492            return Ok(());
493        };
494
495        let cache = ObjectCacheRepository::new(Arc::clone(&self.storage));
496        let mut updated_after = cache.fetch_lnurl_metadata_updated_after().await?;
497
498        loop {
499            debug!("Syncing lnurl metadata from updated_after {updated_after}");
500            let metadata = lnurl_server_client
501                .list_metadata(&ListMetadataRequest {
502                    offset: None,
503                    limit: Some(SYNC_PAGING_LIMIT),
504                    updated_after: Some(updated_after),
505                })
506                .await?;
507
508            if metadata.metadata.is_empty() {
509                debug!("No more lnurl metadata on offset {updated_after}");
510                break;
511            }
512
513            let len = u32::try_from(metadata.metadata.len())?;
514            let last_updated_at = metadata.metadata.last().map(|m| m.updated_at);
515            self.storage
516                .set_lnurl_metadata(metadata.metadata.into_iter().map(From::from).collect())
517                .await?;
518
519            debug!(
520                "Synchronized {} lnurl metadata at updated_after {updated_after}",
521                len
522            );
523            updated_after = last_updated_at.unwrap_or(updated_after);
524            cache
525                .save_lnurl_metadata_updated_after(updated_after)
526                .await?;
527
528            let _ = self.lnurl_preimage_trigger.send(());
529            if len < SYNC_PAGING_LIMIT {
530                // No more invoices to fetch
531                break;
532            }
533        }
534
535        Ok(())
536    }
537
538    pub(super) async fn claim_utxo(
539        &self,
540        detailed_utxo: &DetailedUtxo,
541        max_claim_fee: Option<MaxFee>,
542    ) -> Result<spark_wallet::WalletTransfer, SdkError> {
543        info!(
544            "Fetching static deposit claim quote for deposit tx {}:{} and amount: {}",
545            detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value
546        );
547        let quote = self
548            .spark_wallet
549            .fetch_static_deposit_claim_quote(detailed_utxo.tx.clone(), Some(detailed_utxo.vout))
550            .await?;
551
552        let spark_requested_fee_sats = detailed_utxo.value.saturating_sub(quote.credit_amount_sats);
553
554        let spark_requested_fee_rate = spark_requested_fee_sats.div_ceil(CLAIM_TX_SIZE_VBYTES);
555
556        let Some(max_deposit_claim_fee) = max_claim_fee else {
557            return Err(SdkError::MaxDepositClaimFeeExceeded {
558                tx: detailed_utxo.txid.to_string(),
559                vout: detailed_utxo.vout,
560                max_fee: None,
561                required_fee_sats: spark_requested_fee_sats,
562                required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
563            });
564        };
565        let max_fee = max_deposit_claim_fee
566            .to_fee(self.chain_service.as_ref())
567            .await?;
568        let max_fee_sats = max_fee.to_sats(CLAIM_TX_SIZE_VBYTES);
569        info!(
570            "User max fee: {} spark requested fee: {}",
571            max_fee_sats, spark_requested_fee_sats
572        );
573        if spark_requested_fee_sats > max_fee_sats {
574            return Err(SdkError::MaxDepositClaimFeeExceeded {
575                tx: detailed_utxo.txid.to_string(),
576                vout: detailed_utxo.vout,
577                max_fee: Some(max_fee),
578                required_fee_sats: spark_requested_fee_sats,
579                required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
580            });
581        }
582
583        info!(
584            "Claiming static deposit for utxo {}:{}",
585            detailed_utxo.txid, detailed_utxo.vout
586        );
587        let transfer = self.spark_wallet.claim_static_deposit(quote).await?;
588        info!(
589            "Claimed static deposit transfer for utxo {}:{}, value {}",
590            detailed_utxo.txid, detailed_utxo.vout, transfer.total_value_sat,
591        );
592        Ok(transfer)
593    }
594}
595
596#[cfg_attr(feature = "uniffi", uniffi::export(async_runtime = "tokio"))]
597#[allow(clippy::needless_pass_by_value)]
598impl BreezSdk {
599    /// Synchronizes the wallet with the Spark network
600    #[allow(unused_variables)]
601    pub async fn sync_wallet(
602        &self,
603        request: SyncWalletRequest,
604    ) -> Result<SyncWalletResponse, SdkError> {
605        // Use the coordinator to coalesce duplicate sync requests
606        self.sync_coordinator
607            .trigger_sync_and_wait(super::SyncType::Full, true)
608            .await?;
609        Ok(SyncWalletResponse {})
610    }
611}