1use platform_utils::time::{Duration, Instant, SystemTime};
2use platform_utils::tokio;
3use spark_wallet::WalletEvent;
4use std::sync::Arc;
5use tokio::sync::watch;
6use tracing::{Instrument, debug, error, info, trace, warn};
7
8use crate::{
9 DepositInfo, InputType, MaxFee, PaymentDetails, PaymentType,
10 error::SdkError,
11 events::{InternalSyncedEvent, SdkEvent},
12 lnurl::ListMetadataRequest,
13 models::{Payment, SyncWalletRequest, SyncWalletResponse},
14 persist::{ObjectCacheRepository, UpdateDepositPayload},
15 sync::SparkSyncService,
16 utils::{
17 deposit_chain_syncer::{DepositChainSyncer, TxOutput},
18 payments::get_payment_and_emit_event,
19 run_with_shutdown,
20 utxo_fetcher::DetailedUtxo,
21 },
22};
23
24use super::{
25 BreezSdk, CLAIM_TX_SIZE_VBYTES, SYNC_PAGING_LIMIT, SyncRequest, SyncType,
26 helpers::{BalanceWatcher, update_balances},
27 parse_input,
28};
29
30impl BreezSdk {
31 pub(super) fn periodic_sync(&self, initial_synced_sender: watch::Sender<bool>) {
32 let sdk = self.clone();
33 let mut shutdown_receiver = sdk.shutdown_sender.subscribe();
34 let mut subscription = sdk.spark_wallet.subscribe_events();
35 let sync_coordinator = sdk.sync_coordinator.clone();
36 let mut sync_trigger_receiver = sdk.sync_coordinator.subscribe();
37 let mut last_sync_time = SystemTime::now();
38
39 let sync_interval = u64::from(self.config.sync_interval_secs);
40 let span = tracing::Span::current();
41 tokio::spawn(async move {
42 let balance_watcher =
43 BalanceWatcher::new(sdk.spark_wallet.clone(), sdk.storage.clone());
44 let balance_watcher_id = sdk.add_event_listener(Box::new(balance_watcher)).await;
45 loop {
46 tokio::select! {
47 _ = shutdown_receiver.changed() => {
48 if !sdk.remove_event_listener(&balance_watcher_id).await {
49 error!("Failed to remove balance watcher listener");
50 }
51 info!("Deposit tracking loop shutdown signal received");
52 return;
53 }
54 event = subscription.recv() => {
55 match event {
56 Ok(event) => {
57 info!("Received event: {event}");
58 trace!("Received event: {:?}", event);
59 sdk.handle_wallet_event(event).await;
60 }
61 Err(e) => {
62 error!("Failed to receive event: {e:?}");
63 }
64 }
65 }
66 sync_type_res = sync_trigger_receiver.recv() => {
67 let Ok(sync_request) = sync_type_res else {
68 continue;
69 };
70 info!("Sync trigger changed: {:?}", &sync_request);
71 let cloned_sdk = sdk.clone();
72 let initial_synced_sender = initial_synced_sender.clone();
73 if let Some(true) = Box::pin(run_with_shutdown(shutdown_receiver.clone(), "Sync trigger changed", async move {
74 if let Err(e) = cloned_sdk.sync_wallet_internal(&sync_request).await {
75 error!("Failed to sync wallet: {e:?}");
76 let () = sync_request.reply(Some(e)).await;
77 return false;
78 }
79 let () = sync_request.reply(None).await;
81 if sync_request.sync_type.contains(SyncType::Full) {
83 if let Err(e) = initial_synced_sender.send(true) {
84 error!("Failed to send initial synced signal: {e:?}");
85 }
86 return true;
87 }
88
89 false
90 })).await {
91 last_sync_time = SystemTime::now();
92 }
93 }
94 () = tokio::time::sleep(Duration::from_secs(10)) => {
96 let now = SystemTime::now();
97 if let Ok(elapsed) = now.duration_since(last_sync_time) && elapsed.as_secs() >= sync_interval {
98 sync_coordinator.trigger_sync_no_wait(SyncType::Full, false).await;
99 }
100 }
101 }
102 }
103 }.instrument(span));
104 }
105
106 pub(super) async fn handle_wallet_event(&self, event: WalletEvent) {
107 match event {
108 WalletEvent::DepositConfirmed(_) => {
109 info!("Deposit confirmed");
110 }
111 WalletEvent::StreamConnected => {
112 info!("Stream connected");
113 }
114 WalletEvent::StreamDisconnected => {
115 info!("Stream disconnected");
116 }
117 WalletEvent::Synced => {
118 info!("Synced");
119 self.sync_coordinator
120 .trigger_sync_no_wait(super::SyncType::Full, true)
121 .await;
122 }
123 WalletEvent::TransferClaimed(transfer) => {
124 info!("Transfer claimed");
125 if let Ok(mut payment) = Payment::try_from(transfer) {
126 if let Err(e) = self.storage.insert_payment(payment.clone()).await {
128 error!("Failed to insert succeeded payment: {e:?}");
129 }
130
131 self.sync_single_lnurl_metadata(&mut payment).await;
134
135 if let Err(e) =
138 update_balances(self.spark_wallet.clone(), self.storage.clone()).await
139 {
140 error!("Failed to update balances before PaymentSucceeded event: {e:?}");
141 }
142
143 get_payment_and_emit_event(&self.storage, &self.event_emitter, payment).await;
145 }
146 self.sync_coordinator
147 .trigger_sync_no_wait(super::SyncType::WalletState, true)
148 .await;
149 }
150 WalletEvent::TransferClaimStarting(transfer) => {
151 info!("Transfer claim starting");
152 if let Ok(mut payment) = Payment::try_from(transfer) {
153 if let Err(e) = self.storage.insert_payment(payment.clone()).await {
155 error!("Failed to insert pending payment: {e:?}");
156 }
157
158 self.sync_single_lnurl_metadata(&mut payment).await;
160
161 get_payment_and_emit_event(&self.storage, &self.event_emitter, payment).await;
163 }
164 self.sync_coordinator
165 .trigger_sync_no_wait(super::SyncType::WalletState, true)
166 .await;
167 }
168 WalletEvent::Optimization(event) => {
169 info!("Optimization event: {:?}", event);
170 }
171 }
172 }
173
174 pub(super) async fn sync_single_lnurl_metadata(&self, payment: &mut Payment) {
175 if payment.payment_type != PaymentType::Receive {
176 return;
177 }
178
179 let Some(PaymentDetails::Lightning {
180 invoice,
181 lnurl_receive_metadata,
182 ..
183 }) = &mut payment.details
184 else {
185 return;
186 };
187
188 if lnurl_receive_metadata.is_some() {
189 return;
191 }
192
193 let Ok(input) = parse_input(invoice, None).await else {
194 error!(
195 "Failed to parse invoice for lnurl metadata sync: {}",
196 invoice
197 );
198 return;
199 };
200
201 let InputType::Bolt11Invoice(details) = input else {
202 error!(
203 "Input is not a Bolt11 invoice for lnurl metadata sync: {}",
204 invoice
205 );
206 return;
207 };
208
209 if details.description_hash.is_none() {
211 return;
212 }
213
214 if let Ok(db_payment) = self.storage.get_payment_by_id(payment.id.clone()).await
219 && let Some(PaymentDetails::Lightning {
220 lnurl_receive_metadata: db_lnurl_receive_metadata @ Some(_),
221 ..
222 }) = db_payment.details
223 {
224 *lnurl_receive_metadata = db_lnurl_receive_metadata;
225 return;
226 }
227
228 if let Err(e) = self.sync_lnurl_metadata().await {
232 error!("Failed to sync lnurl metadata for invoice {invoice}: {e}");
233 return;
234 }
235
236 let db_payment = match self.storage.get_payment_by_id(payment.id.clone()).await {
237 Ok(p) => p,
238 Err(e) => {
239 debug!("Payment not found in storage for invoice {}: {e}", invoice);
240 return;
241 }
242 };
243
244 let Some(PaymentDetails::Lightning {
245 lnurl_receive_metadata: db_lnurl_receive_metadata,
246 ..
247 }) = db_payment.details
248 else {
249 debug!(
250 "No lnurl receive metadata in storage for invoice {}",
251 invoice
252 );
253 return;
254 };
255 *lnurl_receive_metadata = db_lnurl_receive_metadata;
256 }
257
258 #[allow(clippy::too_many_lines)]
259 pub(super) async fn sync_wallet_internal(&self, request: &SyncRequest) -> Result<(), SdkError> {
260 let cache = ObjectCacheRepository::new(self.storage.clone());
261 let sync_interval_secs = u64::from(self.config.sync_interval_secs);
262
263 let now = SystemTime::now()
264 .duration_since(SystemTime::UNIX_EPOCH)
265 .map(|d| d.as_secs())
266 .unwrap_or(0);
267
268 if !request.force
270 && let Some(last) = cache.get_last_sync_time().await?
271 && now.saturating_sub(last) < sync_interval_secs
272 {
273 debug!("sync_wallet_internal: Synced recently, skipping");
274 return Ok(());
275 }
276
277 if request.sync_type.contains(SyncType::Full)
279 && let Err(e) = cache.set_last_sync_time(now).await
280 {
281 error!("sync_wallet_internal: Failed to update last sync time: {e:?}");
282 }
283
284 let start_time = Instant::now();
285
286 let sync_wallet = async {
287 let wallet_synced = if request.sync_type.contains(SyncType::Wallet) {
288 debug!("sync_wallet_internal: Starting Wallet sync");
289 let wallet_start = Instant::now();
290 match self.spark_wallet.sync().await {
291 Ok(()) => {
292 debug!(
293 "sync_wallet_internal: Wallet sync completed in {:?}",
294 wallet_start.elapsed()
295 );
296 true
297 }
298 Err(e) => {
299 error!(
300 "sync_wallet_internal: Spark wallet sync failed in {:?}: {e:?}",
301 wallet_start.elapsed()
302 );
303 false
304 }
305 }
306 } else {
307 trace!("sync_wallet_internal: Skipping Wallet sync");
308 false
309 };
310
311 let wallet_state_synced = if request.sync_type.contains(SyncType::WalletState) {
312 debug!("sync_wallet_internal: Starting WalletState sync");
313 let wallet_state_start = Instant::now();
314 match self.sync_wallet_state_to_storage().await {
315 Ok(()) => {
316 debug!(
317 "sync_wallet_internal: WalletState sync completed in {:?}",
318 wallet_state_start.elapsed()
319 );
320 true
321 }
322 Err(e) => {
323 error!(
324 "sync_wallet_internal: Failed to sync wallet state to storage in {:?}: {e:?}",
325 wallet_state_start.elapsed()
326 );
327 false
328 }
329 }
330 } else {
331 trace!("sync_wallet_internal: Skipping WalletState sync");
332 false
333 };
334
335 (wallet_synced, wallet_state_synced)
336 };
337
338 let sync_lnurl = async {
339 if request.sync_type.contains(SyncType::LnurlMetadata) {
340 debug!("sync_wallet_internal: Starting LnurlMetadata sync");
341 let lnurl_start = Instant::now();
342 match self.sync_lnurl_metadata().await {
343 Ok(()) => {
344 debug!(
345 "sync_wallet_internal: LnurlMetadata sync completed in {:?}",
346 lnurl_start.elapsed()
347 );
348 true
349 }
350 Err(e) => {
351 error!(
352 "sync_wallet_internal: Failed to sync lnurl metadata in {:?}: {e:?}",
353 lnurl_start.elapsed()
354 );
355 false
356 }
357 }
358 } else {
359 trace!("sync_wallet_internal: Skipping LnurlMetadata sync");
360 false
361 }
362 };
363
364 let sync_deposits = async {
365 if request.sync_type.contains(SyncType::Deposits) {
366 debug!("sync_wallet_internal: Starting Deposits sync");
367 let deposits_start = Instant::now();
368 match self.check_and_claim_static_deposits().await {
369 Ok(()) => {
370 debug!(
371 "sync_wallet_internal: Deposits sync completed in {:?}",
372 deposits_start.elapsed()
373 );
374 true
375 }
376 Err(e) => {
377 error!(
378 "sync_wallet_internal: Failed to check and claim static deposits in {:?}: {e:?}",
379 deposits_start.elapsed()
380 );
381 false
382 }
383 }
384 } else {
385 trace!("sync_wallet_internal: Skipping Deposits sync");
386 false
387 }
388 };
389
390 let ((wallet, wallet_state), lnurl_metadata, deposits) =
391 tokio::join!(sync_wallet, sync_lnurl, sync_deposits);
392
393 let elapsed = start_time.elapsed();
394 let event = InternalSyncedEvent {
395 wallet,
396 wallet_state,
397 lnurl_metadata,
398 deposits,
399 storage_incoming: None,
400 };
401 info!("sync_wallet_internal: Wallet sync completed in {elapsed:?}: {event:?}");
402 self.event_emitter.emit_synced(&event).await;
403 Ok(())
404 }
405
406 pub(super) async fn sync_wallet_state_to_storage(&self) -> Result<(), SdkError> {
408 update_balances(self.spark_wallet.clone(), self.storage.clone()).await?;
409
410 let initial_sync_complete = *self.initial_synced_watcher.borrow();
411 let sync_service = SparkSyncService::new(
412 self.spark_wallet.clone(),
413 self.storage.clone(),
414 self.event_emitter.clone(),
415 );
416 sync_service.sync_payments(initial_sync_complete).await?;
417
418 Ok(())
419 }
420
421 pub(super) async fn check_and_claim_static_deposits(&self) -> Result<(), SdkError> {
422 self.ensure_spark_private_mode_initialized().await?;
423 let existing_deposits = self.storage.list_deposits().await?;
424 let existing_keys: std::collections::HashSet<TxOutput> = existing_deposits
425 .iter()
426 .map(|d| TxOutput {
427 txid: d.txid.clone(),
428 vout: d.vout,
429 })
430 .collect();
431
432 let all_utxos = DepositChainSyncer::new(
433 self.chain_service.clone(),
434 self.storage.clone(),
435 self.spark_wallet.clone(),
436 )
437 .sync()
438 .await?;
439
440 let new_deposits: Vec<DepositInfo> = all_utxos
442 .iter()
443 .filter(|(u, _)| {
444 !existing_keys.contains(&TxOutput {
445 txid: u.txid.to_string(),
446 vout: u.vout,
447 })
448 })
449 .map(|(u, is_mature)| u.clone().into_deposit_info(*is_mature))
450 .collect();
451 if !new_deposits.is_empty() {
452 self.event_emitter
453 .emit(&SdkEvent::NewDeposits { new_deposits })
454 .await;
455 }
456
457 let to_claim: Vec<_> = all_utxos
459 .into_iter()
460 .filter(|(_, is_mature)| *is_mature)
461 .map(|(u, _)| u)
462 .collect();
463
464 let mut claimed_deposits: Vec<DepositInfo> = Vec::new();
465 let mut unclaimed_deposits: Vec<DepositInfo> = Vec::new();
466 for detailed_utxo in to_claim {
467 match self
468 .claim_utxo(&detailed_utxo, self.config.max_deposit_claim_fee.clone())
469 .await
470 {
471 Ok(_) => {
472 info!("Claimed utxo {}:{}", detailed_utxo.txid, detailed_utxo.vout);
473 self.storage
474 .delete_deposit(detailed_utxo.txid.to_string(), detailed_utxo.vout)
475 .await?;
476 claimed_deposits.push(detailed_utxo.into_deposit_info(true));
477 }
478 Err(e) => {
479 warn!(
480 "Failed to claim utxo {}:{}: {e}",
481 detailed_utxo.txid, detailed_utxo.vout
482 );
483 self.storage
484 .update_deposit(
485 detailed_utxo.txid.to_string(),
486 detailed_utxo.vout,
487 UpdateDepositPayload::ClaimError {
488 error: e.clone().into(),
489 },
490 )
491 .await?;
492 let mut unclaimed_deposit = detailed_utxo.into_deposit_info(true);
493 unclaimed_deposit.claim_error = Some(e.into());
494 unclaimed_deposits.push(unclaimed_deposit);
495 }
496 }
497 }
498
499 info!("background claim completed, unclaimed deposits: {unclaimed_deposits:?}");
500
501 if !unclaimed_deposits.is_empty() {
502 self.event_emitter
503 .emit(&SdkEvent::UnclaimedDeposits { unclaimed_deposits })
504 .await;
505 }
506 if !claimed_deposits.is_empty() {
507 self.event_emitter
508 .emit(&SdkEvent::ClaimedDeposits { claimed_deposits })
509 .await;
510 }
511 Ok(())
512 }
513
514 pub(super) async fn sync_lnurl_metadata(&self) -> Result<(), SdkError> {
515 let Some(lnurl_server_client) = self.lnurl_server_client.clone() else {
516 return Ok(());
517 };
518
519 let cache = ObjectCacheRepository::new(Arc::clone(&self.storage));
520 let mut updated_after = cache.fetch_lnurl_metadata_updated_after().await?;
521
522 loop {
523 debug!("Syncing lnurl metadata from updated_after {updated_after}");
524 let metadata = lnurl_server_client
525 .list_metadata(&ListMetadataRequest {
526 offset: None,
527 limit: Some(SYNC_PAGING_LIMIT),
528 updated_after: Some(updated_after),
529 })
530 .await?;
531
532 if metadata.metadata.is_empty() {
533 debug!("No more lnurl metadata on offset {updated_after}");
534 break;
535 }
536
537 let len = u32::try_from(metadata.metadata.len())?;
538 let last_updated_at = metadata.metadata.last().map(|m| m.updated_at);
539 self.storage
540 .set_lnurl_metadata(metadata.metadata.into_iter().map(From::from).collect())
541 .await?;
542
543 debug!(
544 "Synchronized {} lnurl metadata at updated_after {updated_after}",
545 len
546 );
547 updated_after = last_updated_at.unwrap_or(updated_after);
548 cache
549 .save_lnurl_metadata_updated_after(updated_after)
550 .await?;
551
552 if len < SYNC_PAGING_LIMIT {
553 break;
555 }
556 }
557
558 Ok(())
559 }
560
561 pub(super) async fn claim_utxo(
562 &self,
563 detailed_utxo: &DetailedUtxo,
564 max_claim_fee: Option<MaxFee>,
565 ) -> Result<spark_wallet::WalletTransfer, SdkError> {
566 info!(
567 "Fetching static deposit claim quote for deposit tx {}:{} and amount: {}",
568 detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value
569 );
570 let quote = self
571 .spark_wallet
572 .fetch_static_deposit_claim_quote(detailed_utxo.tx.clone(), Some(detailed_utxo.vout))
573 .await?;
574
575 let spark_requested_fee_sats = detailed_utxo.value.saturating_sub(quote.credit_amount_sats);
576
577 let spark_requested_fee_rate = spark_requested_fee_sats.div_ceil(CLAIM_TX_SIZE_VBYTES);
578
579 let Some(max_deposit_claim_fee) = max_claim_fee else {
580 return Err(SdkError::MaxDepositClaimFeeExceeded {
581 tx: detailed_utxo.txid.to_string(),
582 vout: detailed_utxo.vout,
583 max_fee: None,
584 required_fee_sats: spark_requested_fee_sats,
585 required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
586 });
587 };
588 let max_fee = max_deposit_claim_fee
589 .to_fee(self.chain_service.as_ref())
590 .await?;
591 let max_fee_sats = max_fee.to_sats(CLAIM_TX_SIZE_VBYTES);
592 info!(
593 "User max fee: {} spark requested fee: {}",
594 max_fee_sats, spark_requested_fee_sats
595 );
596 if spark_requested_fee_sats > max_fee_sats {
597 return Err(SdkError::MaxDepositClaimFeeExceeded {
598 tx: detailed_utxo.txid.to_string(),
599 vout: detailed_utxo.vout,
600 max_fee: Some(max_fee),
601 required_fee_sats: spark_requested_fee_sats,
602 required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
603 });
604 }
605
606 info!(
607 "Claiming static deposit for utxo {}:{}",
608 detailed_utxo.txid, detailed_utxo.vout
609 );
610 let transfer = self.spark_wallet.claim_static_deposit(quote).await?;
611 info!(
612 "Claimed static deposit transfer for utxo {}:{}, value {}",
613 detailed_utxo.txid, detailed_utxo.vout, transfer.total_value_sat,
614 );
615 Ok(transfer)
616 }
617}
618
619#[cfg_attr(feature = "uniffi", uniffi::export(async_runtime = "tokio"))]
620#[allow(clippy::needless_pass_by_value)]
621impl BreezSdk {
622 #[allow(unused_variables)]
624 pub async fn sync_wallet(
625 &self,
626 request: SyncWalletRequest,
627 ) -> Result<SyncWalletResponse, SdkError> {
628 self.sync_coordinator
630 .trigger_sync_and_wait(super::SyncType::Full, true)
631 .await?;
632 Ok(SyncWalletResponse {})
633 }
634}