breez_sdk_spark/sdk/
sync.rs

1use platform_utils::time::{Duration, Instant, SystemTime};
2use platform_utils::tokio;
3use spark_wallet::WalletEvent;
4use std::sync::Arc;
5use tokio::sync::watch;
6use tracing::{Instrument, debug, error, info, trace, warn};
7
8use crate::{
9    DepositInfo, InputType, MaxFee, PaymentDetails, PaymentType,
10    error::SdkError,
11    events::{InternalSyncedEvent, SdkEvent},
12    lnurl::ListMetadataRequest,
13    models::{Payment, SyncWalletRequest, SyncWalletResponse},
14    persist::{ObjectCacheRepository, UpdateDepositPayload},
15    sync::SparkSyncService,
16    utils::{
17        deposit_chain_syncer::{DepositChainSyncer, TxOutput},
18        payments::get_payment_and_emit_event,
19        run_with_shutdown,
20        utxo_fetcher::DetailedUtxo,
21    },
22};
23
24use super::{
25    BreezSdk, CLAIM_TX_SIZE_VBYTES, SYNC_PAGING_LIMIT, SyncRequest, SyncType,
26    helpers::{BalanceWatcher, update_balances},
27    parse_input,
28};
29
30impl BreezSdk {
31    pub(super) fn periodic_sync(&self, initial_synced_sender: watch::Sender<bool>) {
32        let sdk = self.clone();
33        let mut shutdown_receiver = sdk.shutdown_sender.subscribe();
34        let mut subscription = sdk.spark_wallet.subscribe_events();
35        let sync_coordinator = sdk.sync_coordinator.clone();
36        let mut sync_trigger_receiver = sdk.sync_coordinator.subscribe();
37        let mut last_sync_time = SystemTime::now();
38
39        let sync_interval = u64::from(self.config.sync_interval_secs);
40        let span = tracing::Span::current();
41        tokio::spawn(async move {
42            let balance_watcher =
43                BalanceWatcher::new(sdk.spark_wallet.clone(), sdk.storage.clone());
44            let balance_watcher_id = sdk.add_event_listener(Box::new(balance_watcher)).await;
45            loop {
46                tokio::select! {
47                    _ = shutdown_receiver.changed() => {
48                        if !sdk.remove_event_listener(&balance_watcher_id).await {
49                            error!("Failed to remove balance watcher listener");
50                        }
51                        info!("Deposit tracking loop shutdown signal received");
52                        return;
53                    }
54                    event = subscription.recv() => {
55                        match event {
56                            Ok(event) => {
57                                info!("Received event: {event}");
58                                trace!("Received event: {:?}", event);
59                                sdk.handle_wallet_event(event).await;
60                            }
61                            Err(e) => {
62                                error!("Failed to receive event: {e:?}");
63                            }
64                        }
65                    }
66                    sync_type_res = sync_trigger_receiver.recv() => {
67                        let Ok(sync_request) = sync_type_res else {
68                            continue;
69                        };
70                        info!("Sync trigger changed: {:?}", &sync_request);
71                        let cloned_sdk = sdk.clone();
72                        let initial_synced_sender = initial_synced_sender.clone();
73                        if let Some(true) = Box::pin(run_with_shutdown(shutdown_receiver.clone(), "Sync trigger changed", async move {
74                            if let Err(e) = cloned_sdk.sync_wallet_internal(&sync_request).await {
75                                error!("Failed to sync wallet: {e:?}");
76                                let () = sync_request.reply(Some(e)).await;
77                                return false;
78                            }
79                            // Notify that the requested sync is complete
80                            let () = sync_request.reply(None).await;
81                            // If this was a full sync, notify the initial synced watcher
82                            if sync_request.sync_type.contains(SyncType::Full) {
83                                if let Err(e) = initial_synced_sender.send(true) {
84                                    error!("Failed to send initial synced signal: {e:?}");
85                                }
86                                return true;
87                            }
88
89                            false
90                        })).await {
91                            last_sync_time = SystemTime::now();
92                        }
93                    }
94                    // Ensure we sync at least the configured interval
95                    () = tokio::time::sleep(Duration::from_secs(10)) => {
96                        let now = SystemTime::now();
97                        if let Ok(elapsed) = now.duration_since(last_sync_time) && elapsed.as_secs() >= sync_interval {
98                            sync_coordinator.trigger_sync_no_wait(SyncType::Full, false).await;
99                        }
100                    }
101                }
102            }
103        }.instrument(span));
104    }
105
106    pub(super) async fn handle_wallet_event(&self, event: WalletEvent) {
107        match event {
108            WalletEvent::DepositConfirmed(_) => {
109                info!("Deposit confirmed");
110            }
111            WalletEvent::StreamConnected => {
112                info!("Stream connected");
113            }
114            WalletEvent::StreamDisconnected => {
115                info!("Stream disconnected");
116            }
117            WalletEvent::Synced => {
118                info!("Synced");
119                self.sync_coordinator
120                    .trigger_sync_no_wait(super::SyncType::Full, true)
121                    .await;
122            }
123            WalletEvent::TransferClaimed(transfer) => {
124                info!("Transfer claimed");
125                if let Ok(mut payment) = Payment::try_from(transfer) {
126                    // Insert the payment into storage to make it immediately available for listing
127                    if let Err(e) = self.storage.insert_payment(payment.clone()).await {
128                        error!("Failed to insert succeeded payment: {e:?}");
129                    }
130
131                    // Ensure potential lnurl metadata is synced before emitting the event.
132                    // Note this is already synced at TransferClaimStarting, but it might not have completed yet, so that could race.
133                    self.sync_single_lnurl_metadata(&mut payment).await;
134
135                    // Update balance before emitting the event so that listeners can immediately
136                    // query the new balance.
137                    if let Err(e) =
138                        update_balances(self.spark_wallet.clone(), self.storage.clone()).await
139                    {
140                        error!("Failed to update balances before PaymentSucceeded event: {e:?}");
141                    }
142
143                    // Fetch the payment to include already stored metadata
144                    get_payment_and_emit_event(&self.storage, &self.event_emitter, payment).await;
145                }
146                self.sync_coordinator
147                    .trigger_sync_no_wait(super::SyncType::WalletState, true)
148                    .await;
149            }
150            WalletEvent::TransferClaimStarting(transfer) => {
151                info!("Transfer claim starting");
152                if let Ok(mut payment) = Payment::try_from(transfer) {
153                    // Insert the payment into storage to make it immediately available for listing
154                    if let Err(e) = self.storage.insert_payment(payment.clone()).await {
155                        error!("Failed to insert pending payment: {e:?}");
156                    }
157
158                    // Ensure potential lnurl metadata is synced before emitting the event
159                    self.sync_single_lnurl_metadata(&mut payment).await;
160
161                    // Fetch the payment to include already stored metadata
162                    get_payment_and_emit_event(&self.storage, &self.event_emitter, payment).await;
163                }
164                self.sync_coordinator
165                    .trigger_sync_no_wait(super::SyncType::WalletState, true)
166                    .await;
167            }
168            WalletEvent::Optimization(event) => {
169                info!("Optimization event: {:?}", event);
170            }
171        }
172    }
173
174    pub(super) async fn sync_single_lnurl_metadata(&self, payment: &mut Payment) {
175        if payment.payment_type != PaymentType::Receive {
176            return;
177        }
178
179        let Some(PaymentDetails::Lightning {
180            invoice,
181            lnurl_receive_metadata,
182            ..
183        }) = &mut payment.details
184        else {
185            return;
186        };
187
188        if lnurl_receive_metadata.is_some() {
189            // Already have lnurl metadata
190            return;
191        }
192
193        let Ok(input) = parse_input(invoice, None).await else {
194            error!(
195                "Failed to parse invoice for lnurl metadata sync: {}",
196                invoice
197            );
198            return;
199        };
200
201        let InputType::Bolt11Invoice(details) = input else {
202            error!(
203                "Input is not a Bolt11 invoice for lnurl metadata sync: {}",
204                invoice
205            );
206            return;
207        };
208
209        // If there is a description hash, we assume this is a lnurl payment.
210        if details.description_hash.is_none() {
211            return;
212        }
213
214        // Let's check whether the lnurl receive metadata was already synced, then return early.
215        // Important: Only return early if metadata is actually present (Some), otherwise we need
216        // to trigger a sync. This prevents a race condition where the payment is in storage but
217        // metadata sync from TransferClaimStarting hasn't completed yet.
218        if let Ok(db_payment) = self.storage.get_payment_by_id(payment.id.clone()).await
219            && let Some(PaymentDetails::Lightning {
220                lnurl_receive_metadata: db_lnurl_receive_metadata @ Some(_),
221                ..
222            }) = db_payment.details
223        {
224            *lnurl_receive_metadata = db_lnurl_receive_metadata;
225            return;
226        }
227
228        // Sync lnurl metadata directly instead of going through the sync trigger,
229        // because this function is called from the sync loop's event handler,
230        // which would deadlock waiting for itself to process the trigger.
231        if let Err(e) = self.sync_lnurl_metadata().await {
232            error!("Failed to sync lnurl metadata for invoice {invoice}: {e}");
233            return;
234        }
235
236        let db_payment = match self.storage.get_payment_by_id(payment.id.clone()).await {
237            Ok(p) => p,
238            Err(e) => {
239                debug!("Payment not found in storage for invoice {}: {e}", invoice);
240                return;
241            }
242        };
243
244        let Some(PaymentDetails::Lightning {
245            lnurl_receive_metadata: db_lnurl_receive_metadata,
246            ..
247        }) = db_payment.details
248        else {
249            debug!(
250                "No lnurl receive metadata in storage for invoice {}",
251                invoice
252            );
253            return;
254        };
255        *lnurl_receive_metadata = db_lnurl_receive_metadata;
256    }
257
258    #[allow(clippy::too_many_lines)]
259    pub(super) async fn sync_wallet_internal(&self, request: &SyncRequest) -> Result<(), SdkError> {
260        let cache = ObjectCacheRepository::new(self.storage.clone());
261        let sync_interval_secs = u64::from(self.config.sync_interval_secs);
262
263        let now = SystemTime::now()
264            .duration_since(SystemTime::UNIX_EPOCH)
265            .map(|d| d.as_secs())
266            .unwrap_or(0);
267
268        // Skip if we synced recently (unless forced).
269        if !request.force
270            && let Some(last) = cache.get_last_sync_time().await?
271            && now.saturating_sub(last) < sync_interval_secs
272        {
273            debug!("sync_wallet_internal: Synced recently, skipping");
274            return Ok(());
275        }
276
277        // Update last sync time if this is a full sync.
278        if request.sync_type.contains(SyncType::Full)
279            && let Err(e) = cache.set_last_sync_time(now).await
280        {
281            error!("sync_wallet_internal: Failed to update last sync time: {e:?}");
282        }
283
284        let start_time = Instant::now();
285
286        let sync_wallet = async {
287            let wallet_synced = if request.sync_type.contains(SyncType::Wallet) {
288                debug!("sync_wallet_internal: Starting Wallet sync");
289                let wallet_start = Instant::now();
290                match self.spark_wallet.sync().await {
291                    Ok(()) => {
292                        debug!(
293                            "sync_wallet_internal: Wallet sync completed in {:?}",
294                            wallet_start.elapsed()
295                        );
296                        true
297                    }
298                    Err(e) => {
299                        error!(
300                            "sync_wallet_internal: Spark wallet sync failed in {:?}: {e:?}",
301                            wallet_start.elapsed()
302                        );
303                        false
304                    }
305                }
306            } else {
307                trace!("sync_wallet_internal: Skipping Wallet sync");
308                false
309            };
310
311            let wallet_state_synced = if request.sync_type.contains(SyncType::WalletState) {
312                debug!("sync_wallet_internal: Starting WalletState sync");
313                let wallet_state_start = Instant::now();
314                match self.sync_wallet_state_to_storage().await {
315                    Ok(()) => {
316                        debug!(
317                            "sync_wallet_internal: WalletState sync completed in {:?}",
318                            wallet_state_start.elapsed()
319                        );
320                        true
321                    }
322                    Err(e) => {
323                        error!(
324                            "sync_wallet_internal: Failed to sync wallet state to storage in {:?}: {e:?}",
325                            wallet_state_start.elapsed()
326                        );
327                        false
328                    }
329                }
330            } else {
331                trace!("sync_wallet_internal: Skipping WalletState sync");
332                false
333            };
334
335            (wallet_synced, wallet_state_synced)
336        };
337
338        let sync_lnurl = async {
339            if request.sync_type.contains(SyncType::LnurlMetadata) {
340                debug!("sync_wallet_internal: Starting LnurlMetadata sync");
341                let lnurl_start = Instant::now();
342                match self.sync_lnurl_metadata().await {
343                    Ok(()) => {
344                        debug!(
345                            "sync_wallet_internal: LnurlMetadata sync completed in {:?}",
346                            lnurl_start.elapsed()
347                        );
348                        true
349                    }
350                    Err(e) => {
351                        error!(
352                            "sync_wallet_internal: Failed to sync lnurl metadata in {:?}: {e:?}",
353                            lnurl_start.elapsed()
354                        );
355                        false
356                    }
357                }
358            } else {
359                trace!("sync_wallet_internal: Skipping LnurlMetadata sync");
360                false
361            }
362        };
363
364        let sync_deposits = async {
365            if request.sync_type.contains(SyncType::Deposits) {
366                debug!("sync_wallet_internal: Starting Deposits sync");
367                let deposits_start = Instant::now();
368                match self.check_and_claim_static_deposits().await {
369                    Ok(()) => {
370                        debug!(
371                            "sync_wallet_internal: Deposits sync completed in {:?}",
372                            deposits_start.elapsed()
373                        );
374                        true
375                    }
376                    Err(e) => {
377                        error!(
378                            "sync_wallet_internal: Failed to check and claim static deposits in {:?}: {e:?}",
379                            deposits_start.elapsed()
380                        );
381                        false
382                    }
383                }
384            } else {
385                trace!("sync_wallet_internal: Skipping Deposits sync");
386                false
387            }
388        };
389
390        let ((wallet, wallet_state), lnurl_metadata, deposits) =
391            tokio::join!(sync_wallet, sync_lnurl, sync_deposits);
392
393        let elapsed = start_time.elapsed();
394        let event = InternalSyncedEvent {
395            wallet,
396            wallet_state,
397            lnurl_metadata,
398            deposits,
399            storage_incoming: None,
400        };
401        info!("sync_wallet_internal: Wallet sync completed in {elapsed:?}: {event:?}");
402        self.event_emitter.emit_synced(&event).await;
403        Ok(())
404    }
405
406    /// Synchronizes wallet state to persistent storage, making sure we have the latest balances and payments.
407    pub(super) async fn sync_wallet_state_to_storage(&self) -> Result<(), SdkError> {
408        update_balances(self.spark_wallet.clone(), self.storage.clone()).await?;
409
410        let initial_sync_complete = *self.initial_synced_watcher.borrow();
411        let sync_service = SparkSyncService::new(
412            self.spark_wallet.clone(),
413            self.storage.clone(),
414            self.event_emitter.clone(),
415        );
416        sync_service.sync_payments(initial_sync_complete).await?;
417
418        Ok(())
419    }
420
421    pub(super) async fn check_and_claim_static_deposits(&self) -> Result<(), SdkError> {
422        self.ensure_spark_private_mode_initialized().await?;
423        let existing_deposits = self.storage.list_deposits().await?;
424        let existing_keys: std::collections::HashSet<TxOutput> = existing_deposits
425            .iter()
426            .map(|d| TxOutput {
427                txid: d.txid.clone(),
428                vout: d.vout,
429            })
430            .collect();
431
432        let all_utxos = DepositChainSyncer::new(
433            self.chain_service.clone(),
434            self.storage.clone(),
435            self.spark_wallet.clone(),
436        )
437        .sync()
438        .await?;
439
440        // Emit NewDeposits for any deposits not previously known
441        let new_deposits: Vec<DepositInfo> = all_utxos
442            .iter()
443            .filter(|(u, _)| {
444                !existing_keys.contains(&TxOutput {
445                    txid: u.txid.to_string(),
446                    vout: u.vout,
447                })
448            })
449            .map(|(u, is_mature)| u.clone().into_deposit_info(*is_mature))
450            .collect();
451        if !new_deposits.is_empty() {
452            self.event_emitter
453                .emit(&SdkEvent::NewDeposits { new_deposits })
454                .await;
455        }
456
457        // Only claim UTXOs with sufficient confirmations
458        let to_claim: Vec<_> = all_utxos
459            .into_iter()
460            .filter(|(_, is_mature)| *is_mature)
461            .map(|(u, _)| u)
462            .collect();
463
464        let mut claimed_deposits: Vec<DepositInfo> = Vec::new();
465        let mut unclaimed_deposits: Vec<DepositInfo> = Vec::new();
466        for detailed_utxo in to_claim {
467            match self
468                .claim_utxo(&detailed_utxo, self.config.max_deposit_claim_fee.clone())
469                .await
470            {
471                Ok(_) => {
472                    info!("Claimed utxo {}:{}", detailed_utxo.txid, detailed_utxo.vout);
473                    self.storage
474                        .delete_deposit(detailed_utxo.txid.to_string(), detailed_utxo.vout)
475                        .await?;
476                    claimed_deposits.push(detailed_utxo.into_deposit_info(true));
477                }
478                Err(e) => {
479                    warn!(
480                        "Failed to claim utxo {}:{}: {e}",
481                        detailed_utxo.txid, detailed_utxo.vout
482                    );
483                    self.storage
484                        .update_deposit(
485                            detailed_utxo.txid.to_string(),
486                            detailed_utxo.vout,
487                            UpdateDepositPayload::ClaimError {
488                                error: e.clone().into(),
489                            },
490                        )
491                        .await?;
492                    let mut unclaimed_deposit = detailed_utxo.into_deposit_info(true);
493                    unclaimed_deposit.claim_error = Some(e.into());
494                    unclaimed_deposits.push(unclaimed_deposit);
495                }
496            }
497        }
498
499        info!("background claim completed, unclaimed deposits: {unclaimed_deposits:?}");
500
501        if !unclaimed_deposits.is_empty() {
502            self.event_emitter
503                .emit(&SdkEvent::UnclaimedDeposits { unclaimed_deposits })
504                .await;
505        }
506        if !claimed_deposits.is_empty() {
507            self.event_emitter
508                .emit(&SdkEvent::ClaimedDeposits { claimed_deposits })
509                .await;
510        }
511        Ok(())
512    }
513
514    pub(super) async fn sync_lnurl_metadata(&self) -> Result<(), SdkError> {
515        let Some(lnurl_server_client) = self.lnurl_server_client.clone() else {
516            return Ok(());
517        };
518
519        let cache = ObjectCacheRepository::new(Arc::clone(&self.storage));
520        let mut updated_after = cache.fetch_lnurl_metadata_updated_after().await?;
521
522        loop {
523            debug!("Syncing lnurl metadata from updated_after {updated_after}");
524            let metadata = lnurl_server_client
525                .list_metadata(&ListMetadataRequest {
526                    offset: None,
527                    limit: Some(SYNC_PAGING_LIMIT),
528                    updated_after: Some(updated_after),
529                })
530                .await?;
531
532            if metadata.metadata.is_empty() {
533                debug!("No more lnurl metadata on offset {updated_after}");
534                break;
535            }
536
537            let len = u32::try_from(metadata.metadata.len())?;
538            let last_updated_at = metadata.metadata.last().map(|m| m.updated_at);
539            self.storage
540                .set_lnurl_metadata(metadata.metadata.into_iter().map(From::from).collect())
541                .await?;
542
543            debug!(
544                "Synchronized {} lnurl metadata at updated_after {updated_after}",
545                len
546            );
547            updated_after = last_updated_at.unwrap_or(updated_after);
548            cache
549                .save_lnurl_metadata_updated_after(updated_after)
550                .await?;
551
552            if len < SYNC_PAGING_LIMIT {
553                // No more invoices to fetch
554                break;
555            }
556        }
557
558        Ok(())
559    }
560
561    pub(super) async fn claim_utxo(
562        &self,
563        detailed_utxo: &DetailedUtxo,
564        max_claim_fee: Option<MaxFee>,
565    ) -> Result<spark_wallet::WalletTransfer, SdkError> {
566        info!(
567            "Fetching static deposit claim quote for deposit tx {}:{} and amount: {}",
568            detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value
569        );
570        let quote = self
571            .spark_wallet
572            .fetch_static_deposit_claim_quote(detailed_utxo.tx.clone(), Some(detailed_utxo.vout))
573            .await?;
574
575        let spark_requested_fee_sats = detailed_utxo.value.saturating_sub(quote.credit_amount_sats);
576
577        let spark_requested_fee_rate = spark_requested_fee_sats.div_ceil(CLAIM_TX_SIZE_VBYTES);
578
579        let Some(max_deposit_claim_fee) = max_claim_fee else {
580            return Err(SdkError::MaxDepositClaimFeeExceeded {
581                tx: detailed_utxo.txid.to_string(),
582                vout: detailed_utxo.vout,
583                max_fee: None,
584                required_fee_sats: spark_requested_fee_sats,
585                required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
586            });
587        };
588        let max_fee = max_deposit_claim_fee
589            .to_fee(self.chain_service.as_ref())
590            .await?;
591        let max_fee_sats = max_fee.to_sats(CLAIM_TX_SIZE_VBYTES);
592        info!(
593            "User max fee: {} spark requested fee: {}",
594            max_fee_sats, spark_requested_fee_sats
595        );
596        if spark_requested_fee_sats > max_fee_sats {
597            return Err(SdkError::MaxDepositClaimFeeExceeded {
598                tx: detailed_utxo.txid.to_string(),
599                vout: detailed_utxo.vout,
600                max_fee: Some(max_fee),
601                required_fee_sats: spark_requested_fee_sats,
602                required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
603            });
604        }
605
606        info!(
607            "Claiming static deposit for utxo {}:{}",
608            detailed_utxo.txid, detailed_utxo.vout
609        );
610        let transfer = self.spark_wallet.claim_static_deposit(quote).await?;
611        info!(
612            "Claimed static deposit transfer for utxo {}:{}, value {}",
613            detailed_utxo.txid, detailed_utxo.vout, transfer.total_value_sat,
614        );
615        Ok(transfer)
616    }
617}
618
619#[cfg_attr(feature = "uniffi", uniffi::export(async_runtime = "tokio"))]
620#[allow(clippy::needless_pass_by_value)]
621impl BreezSdk {
622    /// Synchronizes the wallet with the Spark network
623    #[allow(unused_variables)]
624    pub async fn sync_wallet(
625        &self,
626        request: SyncWalletRequest,
627    ) -> Result<SyncWalletResponse, SdkError> {
628        // Use the coordinator to coalesce duplicate sync requests
629        self.sync_coordinator
630            .trigger_sync_and_wait(super::SyncType::Full, true)
631            .await?;
632        Ok(SyncWalletResponse {})
633    }
634}