1use platform_utils::time::{Instant, SystemTime};
2use platform_utils::tokio;
3use std::sync::Arc;
4use tracing::{debug, error, info, trace, warn};
5
6use super::{
7 BreezSdk, CLAIM_TX_SIZE_VBYTES, SYNC_PAGING_LIMIT, SyncType, helpers::update_balances,
8 parse_input,
9};
10use crate::{
11 DepositInfo, InputType, MaxFee, PaymentDetails, PaymentType,
12 error::SdkError,
13 events::{InternalSyncedEvent, SdkEvent},
14 lnurl::ListMetadataRequest,
15 models::{Payment, SyncWalletRequest, SyncWalletResponse},
16 persist::{ObjectCacheRepository, UpdateDepositPayload},
17 sync::SparkSyncService,
18 utils::{
19 deposit_chain_syncer::{DepositChainSyncer, TxOutput},
20 utxo_fetcher::DetailedUtxo,
21 },
22};
23
24impl BreezSdk {
25 pub(in crate::sdk) async fn sync_single_lnurl_metadata(&self, payment: &mut Payment) {
26 if payment.payment_type != PaymentType::Receive {
27 return;
28 }
29
30 let Some(PaymentDetails::Lightning {
31 invoice,
32 lnurl_receive_metadata,
33 ..
34 }) = &mut payment.details
35 else {
36 return;
37 };
38
39 if lnurl_receive_metadata.is_some() {
40 return;
42 }
43
44 let Ok(input) = parse_input(invoice, None).await else {
45 error!(
46 "Failed to parse invoice for lnurl metadata sync: {}",
47 invoice
48 );
49 return;
50 };
51
52 let InputType::Bolt11Invoice(details) = input else {
53 error!(
54 "Input is not a Bolt11 invoice for lnurl metadata sync: {}",
55 invoice
56 );
57 return;
58 };
59
60 if details.description_hash.is_none() {
62 return;
63 }
64
65 if let Ok(db_payment) = self.storage.get_payment_by_id(payment.id.clone()).await
70 && let Some(PaymentDetails::Lightning {
71 lnurl_receive_metadata: db_lnurl_receive_metadata @ Some(_),
72 ..
73 }) = db_payment.details
74 {
75 *lnurl_receive_metadata = db_lnurl_receive_metadata;
76 return;
77 }
78
79 if let Err(e) = self.sync_lnurl_metadata().await {
83 error!("Failed to sync lnurl metadata for invoice {invoice}: {e}");
84 return;
85 }
86
87 let db_payment = match self.storage.get_payment_by_id(payment.id.clone()).await {
88 Ok(p) => p,
89 Err(e) => {
90 debug!("Payment not found in storage for invoice {}: {e}", invoice);
91 return;
92 }
93 };
94
95 let Some(PaymentDetails::Lightning {
96 lnurl_receive_metadata: db_lnurl_receive_metadata,
97 ..
98 }) = db_payment.details
99 else {
100 debug!(
101 "No lnurl receive metadata in storage for invoice {}",
102 invoice
103 );
104 return;
105 };
106 *lnurl_receive_metadata = db_lnurl_receive_metadata;
107 }
108
109 #[allow(clippy::too_many_lines)]
110 pub(super) async fn sync_wallet_internal(
111 &self,
112 sync_type: SyncType,
113 force: bool,
114 ) -> Result<(), SdkError> {
115 let cache = ObjectCacheRepository::new(self.storage.clone());
116 let sync_interval_secs = u64::from(self.config.sync_interval_secs);
117
118 let now = SystemTime::now()
119 .duration_since(SystemTime::UNIX_EPOCH)
120 .map_or(0, |d| d.as_secs());
121
122 if !force
124 && let Some(last) = cache.get_last_sync_time().await?
125 && now.saturating_sub(last) < sync_interval_secs
126 {
127 debug!("sync_wallet_internal: Synced recently, skipping");
128 self.event_emitter.emit(&SdkEvent::Synced).await;
132 return Ok(());
133 }
134
135 if sync_type.contains(SyncType::Full)
137 && let Err(e) = cache.set_last_sync_time(now).await
138 {
139 error!("sync_wallet_internal: Failed to update last sync time: {e:?}");
140 }
141
142 let start_time = Instant::now();
143
144 let sync_wallet = async {
145 let wallet_synced = if sync_type.contains(SyncType::Wallet) {
146 debug!("sync_wallet_internal: Starting Wallet sync");
147 let wallet_start = Instant::now();
148 match self.spark_wallet.sync().await {
149 Ok(()) => {
150 debug!(
151 "sync_wallet_internal: Wallet sync completed in {:?}",
152 wallet_start.elapsed()
153 );
154 true
155 }
156 Err(e) => {
157 error!(
158 "sync_wallet_internal: Spark wallet sync failed in {:?}: {e:?}",
159 wallet_start.elapsed()
160 );
161 false
162 }
163 }
164 } else {
165 trace!("sync_wallet_internal: Skipping Wallet sync");
166 false
167 };
168
169 let wallet_state_synced = if sync_type.contains(SyncType::WalletState) {
170 debug!("sync_wallet_internal: Starting WalletState sync");
171 let wallet_state_start = Instant::now();
172 match self.sync_wallet_state_to_storage().await {
173 Ok(()) => {
174 debug!(
175 "sync_wallet_internal: WalletState sync completed in {:?}",
176 wallet_state_start.elapsed()
177 );
178 true
179 }
180 Err(e) => {
181 error!(
182 "sync_wallet_internal: Failed to sync wallet state to storage in {:?}: {e:?}",
183 wallet_state_start.elapsed()
184 );
185 false
186 }
187 }
188 } else {
189 trace!("sync_wallet_internal: Skipping WalletState sync");
190 false
191 };
192
193 (wallet_synced, wallet_state_synced)
194 };
195
196 let sync_lnurl = async {
197 if sync_type.contains(SyncType::LnurlMetadata) {
198 debug!("sync_wallet_internal: Starting LnurlMetadata sync");
199 let lnurl_start = Instant::now();
200 match self.sync_lnurl_metadata().await {
201 Ok(()) => {
202 debug!(
203 "sync_wallet_internal: LnurlMetadata sync completed in {:?}",
204 lnurl_start.elapsed()
205 );
206 true
207 }
208 Err(e) => {
209 error!(
210 "sync_wallet_internal: Failed to sync lnurl metadata in {:?}: {e:?}",
211 lnurl_start.elapsed()
212 );
213 false
214 }
215 }
216 } else {
217 trace!("sync_wallet_internal: Skipping LnurlMetadata sync");
218 false
219 }
220 };
221
222 let sync_deposits = async {
223 if sync_type.contains(SyncType::Deposits) {
224 debug!("sync_wallet_internal: Starting Deposits sync");
225 let deposits_start = Instant::now();
226 match self.check_and_claim_static_deposits().await {
227 Ok(()) => {
228 debug!(
229 "sync_wallet_internal: Deposits sync completed in {:?}",
230 deposits_start.elapsed()
231 );
232 true
233 }
234 Err(e) => {
235 error!(
236 "sync_wallet_internal: Failed to check and claim static deposits in {:?}: {e:?}",
237 deposits_start.elapsed()
238 );
239 false
240 }
241 }
242 } else {
243 trace!("sync_wallet_internal: Skipping Deposits sync");
244 false
245 }
246 };
247
248 let ((wallet, wallet_state), lnurl_metadata, deposits) =
249 tokio::join!(sync_wallet, sync_lnurl, sync_deposits);
250
251 let elapsed = start_time.elapsed();
252 let event = InternalSyncedEvent {
253 wallet,
254 wallet_state,
255 lnurl_metadata,
256 deposits,
257 storage_incoming: None,
258 };
259 info!("sync_wallet_internal: Wallet sync completed in {elapsed:?}: {event:?}");
260 self.event_emitter.emit_synced(&event).await;
261 Ok(())
262 }
263
264 pub(super) async fn sync_wallet_state_to_storage(&self) -> Result<(), SdkError> {
266 update_balances(self.spark_wallet.clone(), self.storage.clone()).await?;
267
268 let initial_sync_complete = *self.initial_synced_watcher.borrow();
269 let sync_service = SparkSyncService::new(
270 self.spark_wallet.clone(),
271 self.storage.clone(),
272 self.event_emitter.clone(),
273 );
274 sync_service.sync_payments(initial_sync_complete).await?;
275
276 Ok(())
277 }
278
279 pub(super) async fn check_and_claim_static_deposits(&self) -> Result<(), SdkError> {
280 self.maybe_ensure_spark_private_mode_initialized().await?;
281 let existing_deposits = self.storage.list_deposits().await?;
282 let existing_keys: std::collections::HashSet<TxOutput> = existing_deposits
283 .iter()
284 .map(|d| TxOutput {
285 txid: d.txid.clone(),
286 vout: d.vout,
287 })
288 .collect();
289
290 let all_utxos = DepositChainSyncer::new(
291 self.chain_service.clone(),
292 self.storage.clone(),
293 self.spark_wallet.clone(),
294 )
295 .sync()
296 .await?;
297
298 let new_deposits: Vec<DepositInfo> = all_utxos
300 .iter()
301 .filter(|(u, _)| {
302 !existing_keys.contains(&TxOutput {
303 txid: u.txid.to_string(),
304 vout: u.vout,
305 })
306 })
307 .map(|(u, is_mature)| u.clone().into_deposit_info(*is_mature))
308 .collect();
309 if !new_deposits.is_empty() {
310 self.event_emitter
311 .emit(&SdkEvent::NewDeposits { new_deposits })
312 .await;
313 }
314
315 let to_claim: Vec<_> = all_utxos
317 .into_iter()
318 .filter(|(_, is_mature)| *is_mature)
319 .map(|(u, _)| u)
320 .collect();
321
322 let mut claimed_deposits: Vec<DepositInfo> = Vec::new();
323 let mut unclaimed_deposits: Vec<DepositInfo> = Vec::new();
324 for detailed_utxo in to_claim {
325 match self
326 .claim_utxo(&detailed_utxo, self.config.max_deposit_claim_fee.clone())
327 .await
328 {
329 Ok(_) => {
330 info!("Claimed utxo {}:{}", detailed_utxo.txid, detailed_utxo.vout);
331 self.storage
332 .delete_deposit(detailed_utxo.txid.to_string(), detailed_utxo.vout)
333 .await?;
334 claimed_deposits.push(detailed_utxo.into_deposit_info(true));
335 }
336 Err(e) => {
337 warn!(
338 "Failed to claim utxo {}:{}: {e}",
339 detailed_utxo.txid, detailed_utxo.vout
340 );
341 self.storage
342 .update_deposit(
343 detailed_utxo.txid.to_string(),
344 detailed_utxo.vout,
345 UpdateDepositPayload::ClaimError {
346 error: e.clone().into(),
347 },
348 )
349 .await?;
350 let mut unclaimed_deposit = detailed_utxo.into_deposit_info(true);
351 unclaimed_deposit.claim_error = Some(e.into());
352 unclaimed_deposits.push(unclaimed_deposit);
353 }
354 }
355 }
356
357 info!("background claim completed, unclaimed deposits: {unclaimed_deposits:?}");
358
359 if !unclaimed_deposits.is_empty() {
360 self.event_emitter
361 .emit(&SdkEvent::UnclaimedDeposits { unclaimed_deposits })
362 .await;
363 }
364 if !claimed_deposits.is_empty() {
365 self.event_emitter
366 .emit(&SdkEvent::ClaimedDeposits { claimed_deposits })
367 .await;
368 }
369 Ok(())
370 }
371
372 pub(super) async fn sync_lnurl_metadata(&self) -> Result<(), SdkError> {
373 let Some(lnurl_server_client) = self.lnurl_server_client.clone() else {
374 return Ok(());
375 };
376
377 let cache = ObjectCacheRepository::new(Arc::clone(&self.storage));
378 let mut updated_after = cache.fetch_lnurl_metadata_updated_after().await?;
379
380 loop {
381 debug!("Syncing lnurl metadata from updated_after {updated_after}");
382 let metadata = lnurl_server_client
383 .list_metadata(&ListMetadataRequest {
384 offset: None,
385 limit: Some(SYNC_PAGING_LIMIT),
386 updated_after: Some(updated_after),
387 })
388 .await?;
389
390 if metadata.metadata.is_empty() {
391 debug!("No more lnurl metadata on offset {updated_after}");
392 break;
393 }
394
395 let len = u32::try_from(metadata.metadata.len())?;
396 let last_updated_at = metadata.metadata.last().map(|m| m.updated_at);
397 self.storage
398 .set_lnurl_metadata(metadata.metadata.into_iter().map(From::from).collect())
399 .await?;
400
401 debug!(
402 "Synchronized {} lnurl metadata at updated_after {updated_after}",
403 len
404 );
405 updated_after = last_updated_at.unwrap_or(updated_after);
406 cache
407 .save_lnurl_metadata_updated_after(updated_after)
408 .await?;
409
410 if len < SYNC_PAGING_LIMIT {
411 break;
413 }
414 }
415
416 Ok(())
417 }
418
419 pub(super) async fn claim_utxo(
422 &self,
423 detailed_utxo: &DetailedUtxo,
424 max_claim_fee: Option<MaxFee>,
425 ) -> Result<String, SdkError> {
426 info!(
427 "Fetching static deposit claim quote for deposit tx {}:{} and amount: {}",
428 detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value
429 );
430 let quote = self
431 .spark_wallet
432 .fetch_static_deposit_claim_quote(detailed_utxo.tx.clone(), Some(detailed_utxo.vout))
433 .await?;
434
435 let spark_requested_fee_sats = detailed_utxo.value.saturating_sub(quote.credit_amount_sats);
436
437 let spark_requested_fee_rate = spark_requested_fee_sats.div_ceil(CLAIM_TX_SIZE_VBYTES);
438
439 let Some(max_deposit_claim_fee) = max_claim_fee else {
440 return Err(SdkError::MaxDepositClaimFeeExceeded {
441 tx: detailed_utxo.txid.to_string(),
442 vout: detailed_utxo.vout,
443 max_fee: None,
444 required_fee_sats: spark_requested_fee_sats,
445 required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
446 });
447 };
448 let max_fee = max_deposit_claim_fee
449 .to_fee(self.chain_service.as_ref())
450 .await?;
451 let max_fee_sats = max_fee.to_sats(CLAIM_TX_SIZE_VBYTES);
452 info!(
453 "User max fee: {} spark requested fee: {}",
454 max_fee_sats, spark_requested_fee_sats
455 );
456 if spark_requested_fee_sats > max_fee_sats {
457 return Err(SdkError::MaxDepositClaimFeeExceeded {
458 tx: detailed_utxo.txid.to_string(),
459 vout: detailed_utxo.vout,
460 max_fee: Some(max_fee),
461 required_fee_sats: spark_requested_fee_sats,
462 required_fee_rate_sat_per_vbyte: spark_requested_fee_rate,
463 });
464 }
465
466 info!(
467 "Claiming static deposit for utxo {}:{}",
468 detailed_utxo.txid, detailed_utxo.vout
469 );
470 let credit_amount_sats = quote.credit_amount_sats;
471 let transfer_id = self.spark_wallet.claim_static_deposit(quote).await?;
472 info!(
473 "Claimed static deposit for utxo {}:{} (deposit value {}, credit {}), transfer {transfer_id}",
474 detailed_utxo.txid, detailed_utxo.vout, detailed_utxo.value, credit_amount_sats,
475 );
476 Ok(transfer_id)
477 }
478}
479
480#[cfg_attr(feature = "uniffi", uniffi::export(async_runtime = "tokio"))]
481#[allow(clippy::needless_pass_by_value)]
482impl BreezSdk {
483 #[allow(unused_variables)]
485 pub async fn sync_wallet(
486 &self,
487 request: SyncWalletRequest,
488 ) -> Result<SyncWalletResponse, SdkError> {
489 self.runtime
490 .run_user_sync(self, super::SyncType::Full, true)
491 .await?;
492 Ok(SyncWalletResponse {})
493 }
494}