1use core::fmt;
2use std::{
3 collections::BTreeMap,
4 sync::atomic::{AtomicU64, Ordering},
5};
6
7use serde::Serialize;
8use tokio::sync::{Mutex, RwLock};
9use uuid::Uuid;
10
11use crate::{DepositInfo, Payment};
12
13#[allow(clippy::large_enum_variant)]
15#[derive(Debug, Clone, Serialize)]
16#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
17pub enum SdkEvent {
18 Synced,
20 UnclaimedDeposits {
22 unclaimed_deposits: Vec<DepositInfo>,
23 },
24 ClaimedDeposits {
25 claimed_deposits: Vec<DepositInfo>,
26 },
27 PaymentSucceeded {
28 payment: Payment,
29 },
30 PaymentPending {
31 payment: Payment,
32 },
33 PaymentFailed {
34 payment: Payment,
35 },
36 Optimization {
37 optimization_event: OptimizationEvent,
39 },
40}
41
42impl SdkEvent {
43 pub(crate) fn from_payment(payment: Payment) -> Self {
44 match payment.status {
45 crate::PaymentStatus::Completed => SdkEvent::PaymentSucceeded { payment },
46 crate::PaymentStatus::Pending => SdkEvent::PaymentPending { payment },
47 crate::PaymentStatus::Failed => SdkEvent::PaymentFailed { payment },
48 }
49 }
50}
51
52impl fmt::Display for SdkEvent {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 match self {
55 SdkEvent::Synced => write!(f, "Synced"),
56 SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
57 write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
58 }
59 SdkEvent::ClaimedDeposits { claimed_deposits } => {
60 write!(f, "ClaimedDeposits: {claimed_deposits:?}")
61 }
62 SdkEvent::PaymentSucceeded { payment } => {
63 write!(f, "PaymentSucceeded: {payment:?}")
64 }
65 SdkEvent::PaymentPending { payment } => {
66 write!(f, "PaymentPending: {payment:?}")
67 }
68 SdkEvent::PaymentFailed { payment } => {
69 write!(f, "PaymentFailed: {payment:?}")
70 }
71 SdkEvent::Optimization {
72 optimization_event: event,
73 } => {
74 write!(f, "Optimization: {event:?}")
75 }
76 }
77 }
78}
79
80#[derive(Debug, Clone, Serialize)]
81#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
82pub enum OptimizationEvent {
83 Started { total_rounds: u32 },
85 RoundCompleted {
87 current_round: u32,
88 total_rounds: u32,
89 },
90 Completed,
92 Cancelled,
94 Failed { error: String },
96 Skipped,
98}
99
/// Internal record of what a sync reported, consumed by
/// `EventEmitter::emit_synced`.
///
/// NOTE(review): the producers of this struct are not in this file; which
/// subsystem sets which field is inferred from the field names only.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Default)]
pub struct InternalSyncedEvent {
    pub wallet: bool,
    pub wallet_state: bool,
    pub deposits: bool,
    pub lnurl_metadata: bool,
    /// `None` means nothing was reported; `Some(0)` counts as "reported" for
    /// [`InternalSyncedEvent::any`] but not for
    /// [`InternalSyncedEvent::any_non_zero`].
    pub storage_incoming: Option<u32>,
}

impl InternalSyncedEvent {
    /// True when at least one of the four boolean flags is set.
    fn any_flag(&self) -> bool {
        self.wallet || self.wallet_state || self.deposits || self.lnurl_metadata
    }

    /// Returns `true` if any flag is set or `storage_incoming` holds a value,
    /// including `Some(0)`.
    pub fn any(&self) -> bool {
        self.any_flag() || self.storage_incoming.is_some()
    }

    /// Like [`InternalSyncedEvent::any`], but `storage_incoming` only counts
    /// when it holds a value greater than zero.
    pub fn any_non_zero(&self) -> bool {
        self.any_flag() || matches!(self.storage_incoming, Some(count) if count > 0)
    }

    /// Combines two records into a new one without modifying either input.
    ///
    /// Boolean flags are OR-ed. `storage_incoming` values are added with
    /// saturation when both sides have one; otherwise whichever side has a
    /// value wins, and the result is `None` only if both are `None`.
    pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
        let storage_incoming = match (self.storage_incoming, other.storage_incoming) {
            (Some(mine), Some(theirs)) => Some(mine.saturating_add(theirs)),
            (mine, theirs) => mine.or(theirs),
        };

        Self {
            wallet: self.wallet || other.wallet,
            wallet_state: self.wallet_state || other.wallet_state,
            deposits: self.deposits || other.deposits,
            lnurl_metadata: self.lnurl_metadata || other.lnurl_metadata,
            storage_incoming,
        }
    }
}
142
143#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
145#[macros::async_trait]
146pub trait EventListener: Send + Sync {
147 async fn on_event(&self, event: SdkEvent);
149}
150
151pub struct EventEmitter {
153 has_real_time_sync: bool,
154 listener_index: AtomicU64,
155 listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
156 synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
157}
158
159impl EventEmitter {
160 pub fn new(has_real_time_sync: bool) -> Self {
162 Self {
163 has_real_time_sync,
164 listener_index: AtomicU64::new(0),
165 listeners: RwLock::new(BTreeMap::new()),
166 synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
167 }
168 }
169
170 pub async fn add_listener(&self, listener: Box<dyn EventListener>) -> String {
180 let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
181 let id = format!("listener_{}-{}", index, Uuid::new_v4());
182 let mut listeners = self.listeners.write().await;
183 listeners.insert(id.clone(), listener);
184 id
185 }
186
187 pub async fn remove_listener(&self, id: &str) -> bool {
197 let mut listeners = self.listeners.write().await;
198 listeners.remove(id).is_some()
199 }
200
201 pub async fn emit(&self, event: &SdkEvent) {
203 let listeners = self.listeners.read().await;
205
206 for listener in listeners.values() {
208 listener.on_event(event.clone()).await;
209 }
210 }
211
212 pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
213 if !synced.any() {
214 return;
216 }
217
218 let mut mtx = self.synced_event_buffer.lock().await;
219
220 let is_first_event = if let Some(buffered) = &*mtx {
221 let merged = buffered.merge(synced);
222
223 if merged.wallet && (!self.has_real_time_sync || merged.storage_incoming.is_some()) {
226 *mtx = None;
227 } else {
228 *mtx = Some(merged);
229 return;
230 }
231
232 true
233 } else {
234 false
235 };
236
237 drop(mtx);
238
239 if !is_first_event && !synced.any_non_zero() {
241 return;
242 }
243
244 self.emit(&SdkEvent::Synced).await;
246 }
247}
248
249impl Default for EventEmitter {
250 fn default() -> Self {
251 Self::new(false)
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    use macros::async_test_all;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Listener that raises a flag as soon as any event arrives.
    struct TestListener {
        received: Arc<AtomicBool>,
    }

    #[macros::async_trait]
    impl EventListener for TestListener {
        async fn on_event(&self, _event: SdkEvent) {
            self.received.store(true, Ordering::Relaxed);
        }
    }

    /// Listener that counts `SdkEvent::Synced` deliveries and ignores all
    /// other events.
    struct CountingListener {
        count: Arc<AtomicUsize>,
    }

    #[macros::async_trait]
    impl EventListener for CountingListener {
        async fn on_event(&self, event: SdkEvent) {
            if matches!(event, SdkEvent::Synced) {
                self.count.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Emitter with a single `TestListener` attached, plus that listener's flag.
    async fn flagged_emitter(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicBool>) {
        let emitter = EventEmitter::new(has_real_time_sync);
        let received = Arc::new(AtomicBool::new(false));
        emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&received),
            }))
            .await;
        (emitter, received)
    }

    /// Emitter with a single `CountingListener` attached, plus its counter.
    async fn counted_emitter(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicUsize>) {
        let emitter = EventEmitter::new(has_real_time_sync);
        let count = Arc::new(AtomicUsize::new(0));
        emitter
            .add_listener(Box::new(CountingListener {
                count: Arc::clone(&count),
            }))
            .await;
        (emitter, count)
    }

    /// Reads a listener flag.
    fn seen(flag: &AtomicBool) -> bool {
        flag.load(Ordering::Relaxed)
    }

    #[async_test_all]
    async fn test_event_emission() {
        let (emitter, received) = flagged_emitter(false).await;

        emitter.emit(&SdkEvent::Synced).await;

        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_remove_listener() {
        let emitter = EventEmitter::new(false);
        let first_flag = Arc::new(AtomicBool::new(false));
        let second_flag = Arc::new(AtomicBool::new(false));

        let first_id = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&first_flag),
            }))
            .await;
        let second_id = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&second_flag),
            }))
            .await;

        // Drop the first listener before anything is emitted.
        assert!(emitter.remove_listener(&first_id).await);

        emitter.emit(&SdkEvent::Synced).await;

        // Only the listener that is still registered may have been called.
        assert!(!seen(&first_flag));
        assert!(seen(&second_flag));

        assert!(emitter.remove_listener(&second_id).await);

        // Unknown ids are reported as "nothing removed".
        assert!(!emitter.remove_listener("non-existent-id").await);
    }

    #[async_test_all]
    async fn test_synced_event_only_emitted_with_wallet_sync() {
        let (emitter, received) = flagged_emitter(false).await;

        // Everything except the wallet: must stay buffered.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet_state: true,
                deposits: true,
                lnurl_metadata: true,
                ..Default::default()
            })
            .await;
        assert!(!seen(&received));

        // Wallet arrives: the buffered state is released.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                storage_incoming: Some(1),
                ..Default::default()
            })
            .await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
        let (emitter, received) = flagged_emitter(true).await;

        // Storage first: the wallet is still missing.
        emitter
            .emit_synced(&InternalSyncedEvent {
                storage_incoming: Some(0),
                ..Default::default()
            })
            .await;
        assert!(!seen(&received));

        // Wallet second: both conditions are now met.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                ..Default::default()
            })
            .await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
    {
        let (emitter, received) = flagged_emitter(true).await;

        // Wallet first: storage is still missing.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                ..Default::default()
            })
            .await;
        assert!(!seen(&received));

        // Storage second: both conditions are now met.
        emitter
            .emit_synced(&InternalSyncedEvent {
                storage_incoming: Some(0),
                ..Default::default()
            })
            .await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_synced_event_buffers_until_wallet_sync() {
        let (emitter, received) = flagged_emitter(false).await;

        // None of these reports includes the wallet, so nothing may be emitted.
        let buffered_reports = [
            InternalSyncedEvent {
                wallet_state: true,
                ..Default::default()
            },
            InternalSyncedEvent {
                deposits: true,
                ..Default::default()
            },
            InternalSyncedEvent {
                lnurl_metadata: true,
                ..Default::default()
            },
            InternalSyncedEvent::default(),
        ];
        for (step, report) in buffered_reports.iter().enumerate() {
            emitter.emit_synced(report).await;
            assert!(!seen(&received), "unexpected emission at step {step}");
        }

        // The wallet report releases the buffer.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                ..Default::default()
            })
            .await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_synced_event_all_true() {
        let (emitter, received) = flagged_emitter(false).await;

        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                wallet_state: true,
                deposits: true,
                lnurl_metadata: true,
                storage_incoming: Some(1),
            })
            .await;

        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_synced_event_empty_does_not_emit() {
        let (emitter, received) = flagged_emitter(false).await;

        emitter.emit_synced(&InternalSyncedEvent::default()).await;

        assert!(!seen(&received));
    }

    #[async_test_all]
    async fn test_subsequent_syncs_after_wallet_emit_immediately() {
        let (emitter, count) = counted_emitter(true).await;

        // Each entry is a report and the total `Synced` count expected after it.
        let steps = [
            // First emission: wallet plus storage release the buffer.
            (
                InternalSyncedEvent {
                    wallet: true,
                    storage_incoming: Some(0),
                    ..Default::default()
                },
                1_usize,
            ),
            // From here on every non-empty report emits immediately.
            (
                InternalSyncedEvent {
                    wallet_state: true,
                    ..Default::default()
                },
                2,
            ),
            (
                InternalSyncedEvent {
                    deposits: true,
                    ..Default::default()
                },
                3,
            ),
            (
                InternalSyncedEvent {
                    lnurl_metadata: true,
                    ..Default::default()
                },
                4,
            ),
            (
                InternalSyncedEvent {
                    storage_incoming: Some(1),
                    ..Default::default()
                },
                5,
            ),
            // A storage report with zero incoming records is suppressed.
            (
                InternalSyncedEvent {
                    storage_incoming: Some(0),
                    ..Default::default()
                },
                5,
            ),
            (
                InternalSyncedEvent {
                    wallet: true,
                    ..Default::default()
                },
                6,
            ),
        ];

        for (step, (report, expected)) in steps.iter().enumerate() {
            emitter.emit_synced(report).await;
            assert_eq!(
                count.load(Ordering::Relaxed),
                *expected,
                "unexpected Synced count after step {step}"
            );
        }
    }

    #[async_test_all]
    async fn test_empty_event_does_not_emit_after_wallet_sync() {
        let (emitter, count) = counted_emitter(false).await;

        // The wallet report triggers the first emission.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                ..Default::default()
            })
            .await;
        assert_eq!(count.load(Ordering::Relaxed), 1);

        // A completely empty report is ignored.
        emitter.emit_synced(&InternalSyncedEvent::default()).await;
        assert_eq!(count.load(Ordering::Relaxed), 1);

        // A non-empty report emits again.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet_state: true,
                ..Default::default()
            })
            .await;
        assert_eq!(count.load(Ordering::Relaxed), 2);
    }
}