1use platform_utils::tokio;
2use spark_wallet::WalletEvent;
3use std::sync::Arc;
4use tokio::sync::watch;
5use tracing::{Instrument, debug, error, info, trace, warn};
6use web_time::{Duration, Instant, SystemTime};
7
8use crate::{
9 DepositInfo, InputType, MaxFee, PaymentDetails, PaymentType,
10 error::SdkError,
11 events::{InternalSyncedEvent, SdkEvent},
12 lnurl::ListMetadataRequest,
13 models::{Payment, SyncWalletRequest, SyncWalletResponse},
14 persist::{ObjectCacheRepository, UpdateDepositPayload},
15 sync::SparkSyncService,
16 utils::{
17 deposit_chain_syncer::DepositChainSyncer, payments::get_payment_and_emit_event,
18 run_with_shutdown, utxo_fetcher::DetailedUtxo,
19 },
20};
21
22use super::{
23 BreezSdk, CLAIM_TX_SIZE_VBYTES, SYNC_PAGING_LIMIT, SyncRequest, SyncType,
24 helpers::{BalanceWatcher, update_balances},
25 parse_input,
26};
27
28impl BreezSdk {
29 pub(super) fn periodic_sync(&self, initial_synced_sender: watch::Sender<bool>) {
30 let sdk = self.clone();
31 let mut shutdown_receiver = sdk.shutdown_sender.subscribe();
32 let mut subscription = sdk.spark_wallet.subscribe_events();
33 let sync_coordinator = sdk.sync_coordinator.clone();
34 let mut sync_trigger_receiver = sdk.sync_coordinator.subscribe();
35 let mut last_sync_time = SystemTime::now();
36
37 let sync_interval = u64::from(self.config.sync_interval_secs);
38 let span = tracing::Span::current();
39 tokio::spawn(async move {
40 let balance_watcher =
41 BalanceWatcher::new(sdk.spark_wallet.clone(), sdk.storage.clone());
42 let balance_watcher_id = sdk.add_event_listener(Box::new(balance_watcher)).await;
43 loop {
44 tokio::select! {
45 _ = shutdown_receiver.changed() => {
46 if !sdk.remove_event_listener(&balance_watcher_id).await {
47 error!("Failed to remove balance watcher listener");
48 }
49 info!("Deposit tracking loop shutdown signal received");
50 return;
51 }
52 event = subscription.recv() => {
53 match event {
54 Ok(event) => {
55 info!("Received event: {event}");
56 trace!("Received event: {:?}", event);
57 sdk.handle_wallet_event(event).await;
58 }
59 Err(e) => {
60 error!("Failed to receive event: {e:?}");
61 }
62 }
63 }
64 sync_type_res = sync_trigger_receiver.recv() => {
65 let Ok(sync_request) = sync_type_res else {
66 continue;
67 };
68 info!("Sync trigger changed: {:?}", &sync_request);
69 let cloned_sdk = sdk.clone();
70 let initial_synced_sender = initial_synced_sender.clone();
71 if let Some(true) = Box::pin(run_with_shutdown(shutdown_receiver.clone(), "Sync trigger changed", async move {
72 if let Err(e) = cloned_sdk.sync_wallet_internal(&sync_request).await {
73 error!("Failed to sync wallet: {e:?}");
74 let () = sync_request.reply(Some(e)).await;
75 return false;
76 }
77 let () = sync_request.reply(None).await;
79 if sync_request.sync_type.contains(SyncType::Full) {
81 if let Err(e) = initial_synced_sender.send(true) {
82 error!("Failed to send initial synced signal: {e:?}");
83 }
84 return true;
85 }
86
87 false
88 })).await {
89 last_sync_time = SystemTime::now();
90 }
91 }
92 () = tokio::time::sleep(Duration::from_secs(10)) => {
94 let now = SystemTime::now();
95 if let Ok(elapsed) = now.duration_since(last_sync_time) && elapsed.as_secs() >= sync_interval {
96 sync_coordinator.trigger_sync_no_wait(SyncType::Full, false).await;
97 }
98 }
99 }
100 }
101 }.instrument(span));
102 }
103
104 pub(super) async fn handle_wallet_event(&self, event: WalletEvent) {
105 match event {
106 WalletEvent::DepositConfirmed(_) => {
107 info!("Deposit confirmed");
108 }
109 WalletEvent::StreamConnected => {
110 info!("Stream connected");
111 }
112 WalletEvent::StreamDisconnected => {
113 info!("Stream disconnected");
114 }
115 WalletEvent::Synced => {
116 info!("Synced");
117 self.sync_coordinator
118 .trigger_sync_no_wait(super::SyncType::Full, true)
119 .await;
120 }
121 WalletEvent::TransferClaimed(transfer) => {
122 info!("Transfer claimed");
123 if let Ok(mut payment) = Payment::try_from(transfer) {
124 if let Err(e) = self.storage.insert_payment(payment.clone()).await {
126 error!("Failed to insert succeeded payment: {e:?}");
127 }
128
129 self.sync_single_lnurl_metadata(&mut payment).await;
132
133 let _ = self.lnurl_preimage_trigger.send(());
138
139 if let Err(e) =
142 update_balances(self.spark_wallet.clone(), self.storage.clone()).await
143 {
144 error!("Failed to update balances before PaymentSucceeded event: {e:?}");
145 }
146
147 get_payment_and_emit_event(&self.storage, &self.event_emitter, payment).await;
149 }
150 self.sync_coordinator
151 .trigger_sync_no_wait(super::SyncType::WalletState, true)
152 .await;
153 }
154 WalletEvent::TransferClaimStarting(transfer) => {
155 info!("Transfer claim starting");
156 if let Ok(mut payment) = Payment::try_from(transfer) {
157 if let Err(e) = self.storage.insert_payment(payment.clone()).await {
159 error!("Failed to insert pending payment: {e:?}");
160 }
161
162 self.sync_single_lnurl_metadata(&mut payment).await;
164
165 get_payment_and_emit_event(&self.storage, &self.event_emitter, payment).await;
167 }
168 self.sync_coordinator
169 .trigger_sync_no_wait(super::SyncType::WalletState, true)
170 .await;
171 }
172 WalletEvent::Optimization(event) => {
173 info!("Optimization event: {:?}", event);
174 }
175 }
176 }
177
178 pub(super) async fn sync_single_lnurl_metadata(&self, payment: &mut Payment) {
179 if payment.payment_type != PaymentType::Receive {
180 return;
181 }
182
183 let Some(PaymentDetails::Lightning {
184 invoice,
185 lnurl_receive_metadata,
186 ..
187 }) = &mut payment.details
188 else {
189 return;
190 };
191
192 if lnurl_receive_metadata.is_some() {
193 return;
195 }
196
197 let Ok(input) = parse_input(invoice, None).await else {
198 error!(
199 "Failed to parse invoice for lnurl metadata sync: {}",
200 invoice
201 );
202 return;
203 };
204
205 let InputType::Bolt11Invoice(details) = input else {
206 error!(
207 "Input is not a Bolt11 invoice for lnurl metadata sync: {}",
208 invoice
209 );
210 return;
211 };
212
213 if details.description_hash.is_none() {
215 return;
216 }
217
218 if let Ok(db_payment) = self.storage.get_payment_by_id(payment.id.clone()).await
223 && let Some(PaymentDetails::Lightning {
224 lnurl_receive_metadata: db_lnurl_receive_metadata @ Some(_),
225 ..
226 }) = db_payment.details
227 {
228 *lnurl_receive_metadata = db_lnurl_receive_metadata;
229 return;
230 }
231
232 if let Err(e) = self.sync_lnurl_metadata().await {
236 error!("Failed to sync lnurl metadata for invoice {invoice}: {e}");
237 return;
238 }
239
240 let db_payment = match self.storage.get_payment_by_id(payment.id.clone()).await {
241 Ok(p) => p,
242 Err(e) => {
243 debug!("Payment not found in storage for invoice {}: {e}", invoice);
244 return;
245 }
246 };
247
248 let Some(PaymentDetails::Lightning {
249 lnurl_receive_metadata: db_lnurl_receive_metadata,
250 ..
251 }) = db_payment.details
252 else {
253 debug!(
254 "No lnurl receive metadata in storage for invoice {}",
255 invoice
256 );
257 return;
258 };
259 *lnurl_receive_metadata = db_lnurl_receive_metadata;
260 }
261
262 #[allow(clippy::too_many_lines)]
263 pub(super) async fn sync_wallet_internal(&self, request: &SyncRequest) -> Result<(), SdkError> {
264 let cache = ObjectCacheRepository::new(self.storage.clone());
265 let sync_interval_secs = u64::from(self.config.sync_interval_secs);
266
267 let now = SystemTime::now()
268 .duration_since(SystemTime::UNIX_EPOCH)
269 .map(|d| d.as_secs())
270 .unwrap_or(0);
271
272 if !request.force
274 && let Some(last) = cache.get_last_sync_time().await?
275 && now.saturating_sub(last) < sync_interval_secs
276 {
277 debug!("sync_wallet_internal: Synced recently, skipping");
278 return Ok(());
279 }
280
281 if request.sync_type.contains(SyncType::Full)
283 && let Err(e) = cache.set_last_sync_time(now).await
284 {
285 error!("sync_wallet_internal: Failed to update last sync time: {e:?}");
286 }
287
288 let start_time = Instant::now();
289
290 let sync_wallet = async {
291 let wallet_synced = if request.sync_type.contains(SyncType::Wallet) {
292 debug!("sync_wallet_internal: Starting Wallet sync");
293 let wallet_start = Instant::now();
294 match self.spark_wallet.sync().await {
295 Ok(()) => {
296 debug!(
297 "sync_wallet_internal: Wallet sync completed in {:?}",
298 wallet_start.elapsed()
299 );
300 true
301 }
302 Err(e) => {
303 error!(
304 "sync_wallet_internal: Spark wallet sync failed in {:?}: {e:?}",
305 wallet_start.elapsed()
306 );
307 false
308 }
309 }
310 } else {
311 trace!("sync_wallet_internal: Skipping Wallet sync");
312 false
313 };
314
315 let wallet_state_synced = if request.sync_type.contains(SyncType::WalletState) {
316 debug!("sync_wallet_internal: Starting WalletState sync");
317 let wallet_state_start = Instant::now();
318 match self.sync_wallet_state_to_storage().await {
319 Ok(()) => {
320 debug!(
321 "sync_wallet_internal: WalletState sync completed in {:?}",
322 wallet_state_start.elapsed()
323 );
324 true
325 }
326 Err(e) => {
327 error!(
328 "sync_wallet_internal: Failed to sync wallet state to storage in {:?}: {e:?}",
329 wallet_state_start.elapsed()
330 );
331 false
332 }
333 }
334 } else {
335 trace!("sync_wallet_internal: Skipping WalletState sync");
336 false
337 };
338
339 (wallet_synced, wallet_state_synced)
340 };
341
342 let sync_lnurl = async {
343 if request.sync_type.contains(SyncType::LnurlMetadata) {
344 debug!("sync_wallet_internal: Starting LnurlMetadata sync");
345 let lnurl_start = Instant::now();
346 match self.sync_lnurl_metadata().await {
347 Ok(()) => {
348 debug!(
349 "sync_wallet_internal: LnurlMetadata sync completed in {:?}",
350 lnurl_start.elapsed()
351 );
352 true
353 }
354 Err(e) => {
355 error!(
356 "sync_wallet_internal: Failed to sync lnurl metadata in {:?}: {e:?}",
357 lnurl_start.elapsed()
358 );
359 false
360 }
361 }
362 } else {
363 trace!("sync_wallet_internal: Skipping LnurlMetadata sync");
364 false
365 }
366 };
367
368 let sync_deposits = async {
369 if request.sync_type.contains(SyncType::Deposits) {
370 debug!("sync_wallet_internal: Starting Deposits sync");
371 let deposits_start = Instant::now();
372 match self.check_and_claim_static_deposits().await {
373 Ok(()) => {
374 debug!(
375 "sync_wallet_internal: Deposits sync completed in {:?}",
376 deposits_start.elapsed()
377 );
378 true
379 }
380 Err(e) => {
381 error!(
382 "sync_wallet_internal: Failed to check and claim static deposits in {:?}: {e:?}",
383 deposits_start.elapsed()
384 );
385 false
386 }
387 }
388 } else {
389 trace!("sync_wallet_internal: Skipping Deposits sync");
390 false
391 }
392 };
393
394 let ((wallet, wallet_state), lnurl_metadata, deposits) =
395 tokio::join!(sync_wallet, sync_lnurl, sync_deposits);
396
397 if wallet_state && let Some(stable_balance) = &self.stable_balance {
399 stable_balance.trigger_auto_convert();
400 }
401
402 let elapsed = start_time.elapsed();
403 let event = InternalSyncedEvent {
404 wallet,
405 wallet_state,
406 lnurl_metadata,
407 deposits,
408 storage_incoming: None,
409 };
410 info!("sync_wallet_internal: Wallet sync completed in {elapsed:?}: {event:?}");
411 self.event_emitter.emit_synced(&event).await;
412 Ok(())
413 }
414
415 pub(super) async fn sync_wallet_state_to_storage(&self) -> Result<(), SdkError> {
417 update_balances(self.spark_wallet.clone(), self.storage.clone()).await?;
418
419 let initial_sync_complete = *self.initial_synced_watcher.borrow();
420 let sync_service = SparkSyncService::new(
421 self.spark_wallet.clone(),
422 self.storage.clone(),
423 self.event_emitter.clone(),
424 );
425 sync_service.sync_payments(initial_sync_complete).await?;
426
427 Ok(())
428 }
429
430 pub(super) async fn check_and_claim_static_deposits(&self) -> Result<(), SdkError> {
431 self.ensure_spark_private_mode_initialized().await?;
432 let to_claim = DepositChainSyncer::new(
433 self.chain_service.clone(),
434 self.storage.clone(),
435 self.spark_wallet.clone(),
436 )
437 .sync()
438 .await?;
439
440 let mut claimed_deposits: Vec<DepositInfo> = Vec::new();
441 let mut unclaimed_deposits: Vec<DepositInfo> = Vec::new();
442 for detailed_utxo in to_claim {
443 match self
444 .claim_utxo(&detailed_utxo, self.config.max_deposit_claim_fee.clone())
445 .await
446 {
447 Ok(_) => {
448 info!("Claimed utxo {}:{}", detailed_utxo.txid, detailed_utxo.vout);
449 self.storage
450 .delete_deposit(detailed_utxo.txid.to_string(), detailed_utxo.vout)
451 .await?;
452 claimed_deposits.push(detailed_utxo.into());
453 }
454 Err(e) => {
455 warn!(
456 "Failed to claim utxo {}:{}: {e}",
457 detailed_utxo.txid, detailed_utxo.vout
458 );
459 self.storage
460 .update_deposit(
461 detailed_utxo.txid.to_string(),
462 detailed_utxo.vout,
463 UpdateDepositPayload::ClaimError {
464 error: e.clone().into(),
465 },
466 )
467 .await?;
468 let mut unclaimed_deposit: DepositInfo = detailed_utxo.clone().into();
469 unclaimed_deposit.claim_error = Some(e.into());
470 unclaimed_deposits.push(unclaimed_deposit);
471 }
472 }
473 }
474
475 info!("background claim completed, unclaimed deposits: {unclaimed_deposits:?}");
476
477 if !unclaimed_deposits.is_empty() {
478 self.event_emitter
479 .emit(&SdkEvent::UnclaimedDeposits { unclaimed_deposits })
480 .await;
481 }
482 if !claimed_deposits.is_empty() {
483 self.event_emitter
484 .emit(&SdkEvent::ClaimedDeposits { claimed_deposits })
485 .await;
486 }
487 Ok(())
488 }
489
490 pub(super) async fn sync_lnurl_metadata(&self) -> Result<(), SdkError> {
491 let Some(lnurl_server_client) = self.lnurl_server_client.clone() else {
492 return Ok(());
493 };
494
495 let cache = ObjectCacheRepository::new(Arc::clone(&self.storage));
496 let mut updated_after = cache.fetch_lnurl_metadata_updated_after().await?;
497
498 loop {
499 debug!("Syncing lnurl metadata from updated_after {updated_after}");
500 let metadata = lnurl_server_client
501 .list_metadata(&ListMetadataRequest {
502 offset: None,
503 limit: Some(SYNC_PAGING_LIMIT),
504 updated_after: Some(updated_after),
505 })
506 .await?;
507
508 if metadata.metadata.is_empty() {
509 debug!("No more lnurl metadata on offset {updated_after}");
510 break;
511 }
512
513 let len = u32::try_from(metadata.metadata.len())?;
514 let last_updated_at = metadata.metadata.last().map(|m| m.updated_at);
515 self.storage
516 .set_lnurl_metadata(metadata.metadata.into_iter().map(From::from).collect())
517 .await?;
518
519 debug!(
520 "Synchronized {} lnurl metadata at updated_after {updated_after}",
521 len
522 );
523 updated_after = last_updated_at.unwrap_or(updated_after);
524 cache
525 .save_lnurl_metadata_updated_after(updated_after)
526 .await?;
527
528 let _ = self.lnurl_preimage_trigger.send(());
529 if len < SYNC_PAGING_LIMIT {
530 break;
532 }
533 }
534
535 Ok(())
536 }
537
538 pub(super) async fn claim_utxo(
539 &self,
540 detailed_utxo: &DetailedUtxo,
541 max_claim_fee: Option<MaxFee>,
542 ) -> Result<spark_wallet::WalletTransfer, SdkError> {
543 info!(
544 "Fetching static deposit claim quote for deposit tx {}:{} and amount: {}",
545 detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value
546 );
547 let quote = self
548 .spark_wallet
549 .fetch_static_deposit_claim_quote(detailed_utxo.tx.clone(), Some(detailed_utxo.vout))
550 .await?;
551
552 let spark_requested_fee_sats = detailed_utxo.value.saturating_sub(quote.credit_amount_sats);
553
554 let spark_requested_fee_rate = spark_requested_fee_sats.div_ceil(CLAIM_TX_SIZE_VBYTES);
555
556 let Some(max_deposit_claim_fee) = max_claim_fee else {
557 return Err(SdkError::MaxDepositClaimFeeExceeded {
558 tx: detailed_utxo.txid.to_string(),
559 vout: detailed_utxo.vout,
560 max_fee: None,
561 required_fee_sats: spark_requested_fee_sats,
562 required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
563 });
564 };
565 let max_fee = max_deposit_claim_fee
566 .to_fee(self.chain_service.as_ref())
567 .await?;
568 let max_fee_sats = max_fee.to_sats(CLAIM_TX_SIZE_VBYTES);
569 info!(
570 "User max fee: {} spark requested fee: {}",
571 max_fee_sats, spark_requested_fee_sats
572 );
573 if spark_requested_fee_sats > max_fee_sats {
574 return Err(SdkError::MaxDepositClaimFeeExceeded {
575 tx: detailed_utxo.txid.to_string(),
576 vout: detailed_utxo.vout,
577 max_fee: Some(max_fee),
578 required_fee_sats: spark_requested_fee_sats,
579 required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
580 });
581 }
582
583 info!(
584 "Claiming static deposit for utxo {}:{}",
585 detailed_utxo.txid, detailed_utxo.vout
586 );
587 let transfer = self.spark_wallet.claim_static_deposit(quote).await?;
588 info!(
589 "Claimed static deposit transfer for utxo {}:{}, value {}",
590 detailed_utxo.txid, detailed_utxo.vout, transfer.total_value_sat,
591 );
592 Ok(transfer)
593 }
594}
595
596#[cfg_attr(feature = "uniffi", uniffi::export(async_runtime = "tokio"))]
597#[allow(clippy::needless_pass_by_value)]
598impl BreezSdk {
599 #[allow(unused_variables)]
601 pub async fn sync_wallet(
602 &self,
603 request: SyncWalletRequest,
604 ) -> Result<SyncWalletResponse, SdkError> {
605 self.sync_coordinator
607 .trigger_sync_and_wait(super::SyncType::Full, true)
608 .await?;
609 Ok(SyncWalletResponse {})
610 }
611}