1use spark_wallet::WalletEvent;
2use std::sync::Arc;
3use tokio::sync::watch;
4use tokio_with_wasm::alias as tokio;
5use tracing::{Instrument, debug, error, info, trace, warn};
6use web_time::{Duration, Instant, SystemTime};
7
8use crate::{
9 DepositInfo, InputType, MaxFee, PaymentDetails, PaymentType,
10 error::SdkError,
11 events::{InternalSyncedEvent, SdkEvent},
12 lnurl::ListMetadataRequest,
13 models::{Payment, SyncWalletRequest, SyncWalletResponse},
14 persist::{ObjectCacheRepository, UpdateDepositPayload},
15 sync::SparkSyncService,
16 utils::{
17 deposit_chain_syncer::DepositChainSyncer, run_with_shutdown, utxo_fetcher::DetailedUtxo,
18 },
19};
20
21use super::{
22 BreezSdk, CLAIM_TX_SIZE_VBYTES, SYNC_PAGING_LIMIT, SyncRequest, SyncType,
23 helpers::{BalanceWatcher, update_balances},
24 parse_input,
25};
26
27impl BreezSdk {
28 pub(super) fn periodic_sync(&self, initial_synced_sender: watch::Sender<bool>) {
29 let sdk = self.clone();
30 let mut shutdown_receiver = sdk.shutdown_sender.subscribe();
31 let mut subscription = sdk.spark_wallet.subscribe_events();
32 let sync_coordinator = sdk.sync_coordinator.clone();
33 let mut sync_trigger_receiver = sdk.sync_coordinator.subscribe();
34 let mut last_sync_time = SystemTime::now();
35
36 let sync_interval = u64::from(self.config.sync_interval_secs);
37 let span = tracing::Span::current();
38 tokio::spawn(async move {
39 let balance_watcher =
40 BalanceWatcher::new(sdk.spark_wallet.clone(), sdk.storage.clone());
41 let balance_watcher_id = sdk.add_event_listener(Box::new(balance_watcher)).await;
42 loop {
43 tokio::select! {
44 _ = shutdown_receiver.changed() => {
45 if !sdk.remove_event_listener(&balance_watcher_id).await {
46 error!("Failed to remove balance watcher listener");
47 }
48 info!("Deposit tracking loop shutdown signal received");
49 return;
50 }
51 event = subscription.recv() => {
52 match event {
53 Ok(event) => {
54 info!("Received event: {event}");
55 trace!("Received event: {:?}", event);
56 sdk.handle_wallet_event(event).await;
57 }
58 Err(e) => {
59 error!("Failed to receive event: {e:?}");
60 }
61 }
62 }
63 sync_type_res = sync_trigger_receiver.recv() => {
64 let Ok(sync_request) = sync_type_res else {
65 continue;
66 };
67 info!("Sync trigger changed: {:?}", &sync_request);
68 let cloned_sdk = sdk.clone();
69 let initial_synced_sender = initial_synced_sender.clone();
70 if let Some(true) = Box::pin(run_with_shutdown(shutdown_receiver.clone(), "Sync trigger changed", async move {
71 if let Err(e) = cloned_sdk.sync_wallet_internal(&sync_request).await {
72 error!("Failed to sync wallet: {e:?}");
73 let () = sync_request.reply(Some(e)).await;
74 return false;
75 }
76 let () = sync_request.reply(None).await;
78 if sync_request.sync_type.contains(SyncType::Full) {
80 if let Err(e) = initial_synced_sender.send(true) {
81 error!("Failed to send initial synced signal: {e:?}");
82 }
83 return true;
84 }
85
86 false
87 })).await {
88 last_sync_time = SystemTime::now();
89 }
90 }
91 () = tokio::time::sleep(Duration::from_secs(10)) => {
93 let now = SystemTime::now();
94 if let Ok(elapsed) = now.duration_since(last_sync_time) && elapsed.as_secs() >= sync_interval {
95 sync_coordinator.trigger_sync_no_wait(SyncType::Full, false).await;
96 }
97 }
98 }
99 }
100 }.instrument(span));
101 }
102
103 pub(super) async fn handle_wallet_event(&self, event: WalletEvent) {
104 match event {
105 WalletEvent::DepositConfirmed(_) => {
106 info!("Deposit confirmed");
107 }
108 WalletEvent::StreamConnected => {
109 info!("Stream connected");
110 }
111 WalletEvent::StreamDisconnected => {
112 info!("Stream disconnected");
113 }
114 WalletEvent::Synced => {
115 info!("Synced");
116 self.sync_coordinator
117 .trigger_sync_no_wait(super::SyncType::Full, true)
118 .await;
119 }
120 WalletEvent::TransferClaimed(transfer) => {
121 info!("Transfer claimed");
122 if let Ok(mut payment) = Payment::try_from(transfer) {
123 if let Err(e) = self.storage.insert_payment(payment.clone()).await {
125 error!("Failed to insert succeeded payment: {e:?}");
126 }
127
128 self.sync_single_lnurl_metadata(&mut payment).await;
131
132 let _ = self.lnurl_preimage_trigger.send(());
137
138 self.event_emitter
139 .emit(&SdkEvent::PaymentSucceeded { payment })
140 .await;
141 }
142 self.sync_coordinator
143 .trigger_sync_no_wait(super::SyncType::WalletState, true)
144 .await;
145 }
146 WalletEvent::TransferClaimStarting(transfer) => {
147 info!("Transfer claim starting");
148 if let Ok(mut payment) = Payment::try_from(transfer) {
149 if let Err(e) = self.storage.insert_payment(payment.clone()).await {
151 error!("Failed to insert pending payment: {e:?}");
152 }
153
154 self.sync_single_lnurl_metadata(&mut payment).await;
156
157 self.event_emitter
158 .emit(&SdkEvent::PaymentPending { payment })
159 .await;
160 }
161 self.sync_coordinator
162 .trigger_sync_no_wait(super::SyncType::WalletState, true)
163 .await;
164 }
165 WalletEvent::Optimization(event) => {
166 info!("Optimization event: {:?}", event);
167 }
168 }
169 }
170
171 pub(super) async fn sync_single_lnurl_metadata(&self, payment: &mut Payment) {
172 if payment.payment_type != PaymentType::Receive {
173 return;
174 }
175
176 let Some(PaymentDetails::Lightning {
177 invoice,
178 lnurl_receive_metadata,
179 ..
180 }) = &mut payment.details
181 else {
182 return;
183 };
184
185 if lnurl_receive_metadata.is_some() {
186 return;
188 }
189
190 let Ok(input) = parse_input(invoice, None).await else {
191 error!(
192 "Failed to parse invoice for lnurl metadata sync: {}",
193 invoice
194 );
195 return;
196 };
197
198 let InputType::Bolt11Invoice(details) = input else {
199 error!(
200 "Input is not a Bolt11 invoice for lnurl metadata sync: {}",
201 invoice
202 );
203 return;
204 };
205
206 if details.description_hash.is_none() {
208 return;
209 }
210
211 if let Ok(db_payment) = self.storage.get_payment_by_id(payment.id.clone()).await
216 && let Some(PaymentDetails::Lightning {
217 lnurl_receive_metadata: db_lnurl_receive_metadata @ Some(_),
218 ..
219 }) = db_payment.details
220 {
221 *lnurl_receive_metadata = db_lnurl_receive_metadata;
222 return;
223 }
224
225 if let Err(e) = self.sync_lnurl_metadata().await {
229 error!("Failed to sync lnurl metadata for invoice {invoice}: {e}");
230 return;
231 }
232
233 let db_payment = match self.storage.get_payment_by_id(payment.id.clone()).await {
234 Ok(p) => p,
235 Err(e) => {
236 debug!("Payment not found in storage for invoice {}: {e}", invoice);
237 return;
238 }
239 };
240
241 let Some(PaymentDetails::Lightning {
242 lnurl_receive_metadata: db_lnurl_receive_metadata,
243 ..
244 }) = db_payment.details
245 else {
246 debug!(
247 "No lnurl receive metadata in storage for invoice {}",
248 invoice
249 );
250 return;
251 };
252 *lnurl_receive_metadata = db_lnurl_receive_metadata;
253 }
254
255 #[allow(clippy::too_many_lines)]
256 pub(super) async fn sync_wallet_internal(&self, request: &SyncRequest) -> Result<(), SdkError> {
257 let cache = ObjectCacheRepository::new(self.storage.clone());
258 let sync_interval_secs = u64::from(self.config.sync_interval_secs);
259
260 let now = SystemTime::now()
261 .duration_since(SystemTime::UNIX_EPOCH)
262 .map(|d| d.as_secs())
263 .unwrap_or(0);
264
265 if !request.force
267 && let Some(last) = cache.get_last_sync_time().await?
268 && now.saturating_sub(last) < sync_interval_secs
269 {
270 debug!("sync_wallet_internal: Synced recently, skipping");
271 return Ok(());
272 }
273
274 if request.sync_type.contains(SyncType::Full)
276 && let Err(e) = cache.set_last_sync_time(now).await
277 {
278 error!("sync_wallet_internal: Failed to update last sync time: {e:?}");
279 }
280
281 let start_time = Instant::now();
282
283 let sync_wallet = async {
284 let wallet_synced = if request.sync_type.contains(SyncType::Wallet) {
285 debug!("sync_wallet_internal: Starting Wallet sync");
286 let wallet_start = Instant::now();
287 match self.spark_wallet.sync().await {
288 Ok(()) => {
289 debug!(
290 "sync_wallet_internal: Wallet sync completed in {:?}",
291 wallet_start.elapsed()
292 );
293 true
294 }
295 Err(e) => {
296 error!(
297 "sync_wallet_internal: Spark wallet sync failed in {:?}: {e:?}",
298 wallet_start.elapsed()
299 );
300 false
301 }
302 }
303 } else {
304 trace!("sync_wallet_internal: Skipping Wallet sync");
305 false
306 };
307
308 let wallet_state_synced = if request.sync_type.contains(SyncType::WalletState) {
309 debug!("sync_wallet_internal: Starting WalletState sync");
310 let wallet_state_start = Instant::now();
311 match self.sync_wallet_state_to_storage().await {
312 Ok(()) => {
313 debug!(
314 "sync_wallet_internal: WalletState sync completed in {:?}",
315 wallet_state_start.elapsed()
316 );
317 true
318 }
319 Err(e) => {
320 error!(
321 "sync_wallet_internal: Failed to sync wallet state to storage in {:?}: {e:?}",
322 wallet_state_start.elapsed()
323 );
324 false
325 }
326 }
327 } else {
328 trace!("sync_wallet_internal: Skipping WalletState sync");
329 false
330 };
331
332 (wallet_synced, wallet_state_synced)
333 };
334
335 let sync_lnurl = async {
336 if request.sync_type.contains(SyncType::LnurlMetadata) {
337 debug!("sync_wallet_internal: Starting LnurlMetadata sync");
338 let lnurl_start = Instant::now();
339 match self.sync_lnurl_metadata().await {
340 Ok(()) => {
341 debug!(
342 "sync_wallet_internal: LnurlMetadata sync completed in {:?}",
343 lnurl_start.elapsed()
344 );
345 true
346 }
347 Err(e) => {
348 error!(
349 "sync_wallet_internal: Failed to sync lnurl metadata in {:?}: {e:?}",
350 lnurl_start.elapsed()
351 );
352 false
353 }
354 }
355 } else {
356 trace!("sync_wallet_internal: Skipping LnurlMetadata sync");
357 false
358 }
359 };
360
361 let sync_deposits = async {
362 if request.sync_type.contains(SyncType::Deposits) {
363 debug!("sync_wallet_internal: Starting Deposits sync");
364 let deposits_start = Instant::now();
365 match self.check_and_claim_static_deposits().await {
366 Ok(()) => {
367 debug!(
368 "sync_wallet_internal: Deposits sync completed in {:?}",
369 deposits_start.elapsed()
370 );
371 true
372 }
373 Err(e) => {
374 error!(
375 "sync_wallet_internal: Failed to check and claim static deposits in {:?}: {e:?}",
376 deposits_start.elapsed()
377 );
378 false
379 }
380 }
381 } else {
382 trace!("sync_wallet_internal: Skipping Deposits sync");
383 false
384 }
385 };
386
387 let ((wallet, wallet_state), lnurl_metadata, deposits) =
388 tokio::join!(sync_wallet, sync_lnurl, sync_deposits);
389
390 if wallet_state && let Some(stable_balance) = &self.stable_balance {
392 stable_balance.trigger_auto_convert();
393 }
394
395 let elapsed = start_time.elapsed();
396 let event = InternalSyncedEvent {
397 wallet,
398 wallet_state,
399 lnurl_metadata,
400 deposits,
401 storage_incoming: None,
402 };
403 info!("sync_wallet_internal: Wallet sync completed in {elapsed:?}: {event:?}");
404 self.event_emitter.emit_synced(&event).await;
405 Ok(())
406 }
407
408 pub(super) async fn sync_wallet_state_to_storage(&self) -> Result<(), SdkError> {
410 update_balances(self.spark_wallet.clone(), self.storage.clone()).await?;
411
412 let initial_sync_complete = *self.initial_synced_watcher.borrow();
413 let sync_service = SparkSyncService::new(
414 self.spark_wallet.clone(),
415 self.storage.clone(),
416 self.event_emitter.clone(),
417 );
418 sync_service.sync_payments(initial_sync_complete).await?;
419
420 Ok(())
421 }
422
423 pub(super) async fn check_and_claim_static_deposits(&self) -> Result<(), SdkError> {
424 self.ensure_spark_private_mode_initialized().await?;
425 let to_claim = DepositChainSyncer::new(
426 self.chain_service.clone(),
427 self.storage.clone(),
428 self.spark_wallet.clone(),
429 )
430 .sync()
431 .await?;
432
433 let mut claimed_deposits: Vec<DepositInfo> = Vec::new();
434 let mut unclaimed_deposits: Vec<DepositInfo> = Vec::new();
435 for detailed_utxo in to_claim {
436 match self
437 .claim_utxo(&detailed_utxo, self.config.max_deposit_claim_fee.clone())
438 .await
439 {
440 Ok(_) => {
441 info!("Claimed utxo {}:{}", detailed_utxo.txid, detailed_utxo.vout);
442 self.storage
443 .delete_deposit(detailed_utxo.txid.to_string(), detailed_utxo.vout)
444 .await?;
445 claimed_deposits.push(detailed_utxo.into());
446 }
447 Err(e) => {
448 warn!(
449 "Failed to claim utxo {}:{}: {e}",
450 detailed_utxo.txid, detailed_utxo.vout
451 );
452 self.storage
453 .update_deposit(
454 detailed_utxo.txid.to_string(),
455 detailed_utxo.vout,
456 UpdateDepositPayload::ClaimError {
457 error: e.clone().into(),
458 },
459 )
460 .await?;
461 let mut unclaimed_deposit: DepositInfo = detailed_utxo.clone().into();
462 unclaimed_deposit.claim_error = Some(e.into());
463 unclaimed_deposits.push(unclaimed_deposit);
464 }
465 }
466 }
467
468 info!("background claim completed, unclaimed deposits: {unclaimed_deposits:?}");
469
470 if !unclaimed_deposits.is_empty() {
471 self.event_emitter
472 .emit(&SdkEvent::UnclaimedDeposits { unclaimed_deposits })
473 .await;
474 }
475 if !claimed_deposits.is_empty() {
476 self.event_emitter
477 .emit(&SdkEvent::ClaimedDeposits { claimed_deposits })
478 .await;
479 }
480 Ok(())
481 }
482
483 pub(super) async fn sync_lnurl_metadata(&self) -> Result<(), SdkError> {
484 let Some(lnurl_server_client) = self.lnurl_server_client.clone() else {
485 return Ok(());
486 };
487
488 let cache = ObjectCacheRepository::new(Arc::clone(&self.storage));
489 let mut updated_after = cache.fetch_lnurl_metadata_updated_after().await?;
490
491 loop {
492 debug!("Syncing lnurl metadata from updated_after {updated_after}");
493 let metadata = lnurl_server_client
494 .list_metadata(&ListMetadataRequest {
495 offset: None,
496 limit: Some(SYNC_PAGING_LIMIT),
497 updated_after: Some(updated_after),
498 })
499 .await?;
500
501 if metadata.metadata.is_empty() {
502 debug!("No more lnurl metadata on offset {updated_after}");
503 break;
504 }
505
506 let len = u32::try_from(metadata.metadata.len())?;
507 let last_updated_at = metadata.metadata.last().map(|m| m.updated_at);
508 self.storage
509 .set_lnurl_metadata(metadata.metadata.into_iter().map(From::from).collect())
510 .await?;
511
512 debug!(
513 "Synchronized {} lnurl metadata at updated_after {updated_after}",
514 len
515 );
516 updated_after = last_updated_at.unwrap_or(updated_after);
517 cache
518 .save_lnurl_metadata_updated_after(updated_after)
519 .await?;
520
521 let _ = self.lnurl_preimage_trigger.send(());
522 if len < SYNC_PAGING_LIMIT {
523 break;
525 }
526 }
527
528 Ok(())
529 }
530
531 pub(super) async fn claim_utxo(
532 &self,
533 detailed_utxo: &DetailedUtxo,
534 max_claim_fee: Option<MaxFee>,
535 ) -> Result<spark_wallet::WalletTransfer, SdkError> {
536 info!(
537 "Fetching static deposit claim quote for deposit tx {}:{} and amount: {}",
538 detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value
539 );
540 let quote = self
541 .spark_wallet
542 .fetch_static_deposit_claim_quote(detailed_utxo.tx.clone(), Some(detailed_utxo.vout))
543 .await?;
544
545 let spark_requested_fee_sats = detailed_utxo.value.saturating_sub(quote.credit_amount_sats);
546
547 let spark_requested_fee_rate = spark_requested_fee_sats.div_ceil(CLAIM_TX_SIZE_VBYTES);
548
549 let Some(max_deposit_claim_fee) = max_claim_fee else {
550 return Err(SdkError::MaxDepositClaimFeeExceeded {
551 tx: detailed_utxo.txid.to_string(),
552 vout: detailed_utxo.vout,
553 max_fee: None,
554 required_fee_sats: spark_requested_fee_sats,
555 required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
556 });
557 };
558 let max_fee = max_deposit_claim_fee
559 .to_fee(self.chain_service.as_ref())
560 .await?;
561 let max_fee_sats = max_fee.to_sats(CLAIM_TX_SIZE_VBYTES);
562 info!(
563 "User max fee: {} spark requested fee: {}",
564 max_fee_sats, spark_requested_fee_sats
565 );
566 if spark_requested_fee_sats > max_fee_sats {
567 return Err(SdkError::MaxDepositClaimFeeExceeded {
568 tx: detailed_utxo.txid.to_string(),
569 vout: detailed_utxo.vout,
570 max_fee: Some(max_fee),
571 required_fee_sats: spark_requested_fee_sats,
572 required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
573 });
574 }
575
576 info!(
577 "Claiming static deposit for utxo {}:{}",
578 detailed_utxo.txid, detailed_utxo.vout
579 );
580 let transfer = self.spark_wallet.claim_static_deposit(quote).await?;
581 info!(
582 "Claimed static deposit transfer for utxo {}:{}, value {}",
583 detailed_utxo.txid, detailed_utxo.vout, transfer.total_value_sat,
584 );
585 Ok(transfer)
586 }
587}
588
589#[cfg_attr(feature = "uniffi", uniffi::export(async_runtime = "tokio"))]
590#[allow(clippy::needless_pass_by_value)]
591impl BreezSdk {
592 #[allow(unused_variables)]
594 pub async fn sync_wallet(
595 &self,
596 request: SyncWalletRequest,
597 ) -> Result<SyncWalletResponse, SdkError> {
598 self.sync_coordinator
600 .trigger_sync_and_wait(super::SyncType::Full, true)
601 .await?;
602 Ok(SyncWalletResponse {})
603 }
604}