1use core::fmt;
2use std::{
3 collections::BTreeMap,
4 sync::atomic::{AtomicBool, AtomicU64, Ordering},
5};
6
7use serde::Serialize;
8use tokio::sync::{Mutex, RwLock};
9use uuid::Uuid;
10
11use crate::{DepositInfo, LightningAddressInfo, Payment};
12
/// Events delivered to registered [`EventListener`]s through `EventEmitter::emit`.
///
/// The enum is serializable and, when the `uniffi` feature is enabled, exported
/// as a uniffi enum. Payload types (`DepositInfo`, `Payment`,
/// `LightningAddressInfo`) are declared elsewhere in the crate.
// The payload sizes differ a lot between variants (e.g. `Synced` vs. a full
// `Payment`), hence the `large_enum_variant` allowance.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum SdkEvent {
    /// Payload-free notification emitted by `EventEmitter::emit_synced` and
    /// `EventEmitter::notify_rtsync_failed`.
    Synced,
    /// Carries a list of deposits.
    // NOTE(review): presumably deposits that were detected but could not be
    // claimed yet - confirm against the code that emits this variant.
    UnclaimedDeposits {
        unclaimed_deposits: Vec<DepositInfo>,
    },
    /// Carries a list of deposits.
    // NOTE(review): presumably deposits that were successfully claimed -
    // confirm against the code that emits this variant.
    ClaimedDeposits {
        claimed_deposits: Vec<DepositInfo>,
    },
    /// Built by `SdkEvent::from_payment` for a payment whose status is
    /// `PaymentStatus::Completed`.
    PaymentSucceeded {
        payment: Payment,
    },
    /// Built by `SdkEvent::from_payment` for a payment whose status is
    /// `PaymentStatus::Pending`.
    PaymentPending {
        payment: Payment,
    },
    /// Built by `SdkEvent::from_payment` for a payment whose status is
    /// `PaymentStatus::Failed`.
    PaymentFailed {
        payment: Payment,
    },
    /// Wraps an [`OptimizationEvent`] describing the progress of an optimization run.
    Optimization {
        optimization_event: OptimizationEvent,
    },
    /// Carries the current lightning address, if any.
    // NOTE(review): `None` presumably means the address was removed - confirm.
    LightningAddressChanged {
        lightning_address: Option<LightningAddressInfo>,
    },
}
44
45impl SdkEvent {
46 pub(crate) fn from_payment(payment: Payment) -> Self {
47 match payment.status {
48 crate::PaymentStatus::Completed => SdkEvent::PaymentSucceeded { payment },
49 crate::PaymentStatus::Pending => SdkEvent::PaymentPending { payment },
50 crate::PaymentStatus::Failed => SdkEvent::PaymentFailed { payment },
51 }
52 }
53}
54
55impl fmt::Display for SdkEvent {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 match self {
58 SdkEvent::Synced => write!(f, "Synced"),
59 SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
60 write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
61 }
62 SdkEvent::ClaimedDeposits { claimed_deposits } => {
63 write!(f, "ClaimedDeposits: {claimed_deposits:?}")
64 }
65 SdkEvent::PaymentSucceeded { payment } => {
66 write!(f, "PaymentSucceeded: {payment:?}")
67 }
68 SdkEvent::PaymentPending { payment } => {
69 write!(f, "PaymentPending: {payment:?}")
70 }
71 SdkEvent::PaymentFailed { payment } => {
72 write!(f, "PaymentFailed: {payment:?}")
73 }
74 SdkEvent::Optimization {
75 optimization_event: event,
76 } => {
77 write!(f, "Optimization: {event:?}")
78 }
79 SdkEvent::LightningAddressChanged { lightning_address } => {
80 write!(f, "LightningAddressChanged: {lightning_address:?}")
81 }
82 }
83 }
84}
85
/// Progress notifications for an optimization run, carried by
/// `SdkEvent::Optimization`.
///
/// Serializable and, when the `uniffi` feature is enabled, exported as a
/// uniffi enum.
// NOTE(review): the per-variant descriptions below are derived from the variant
// and field names only; the code that emits them is not in this file - confirm.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum OptimizationEvent {
    /// The run started; `total_rounds` is presumably the number of rounds planned.
    Started { total_rounds: u32 },
    /// A round finished; reports the round counter together with the total.
    // NOTE(review): whether `current_round` is 0- or 1-based is not visible here.
    RoundCompleted {
        current_round: u32,
        total_rounds: u32,
    },
    /// The run finished.
    Completed,
    /// The run was cancelled.
    Cancelled,
    /// The run failed; `error` holds a textual description.
    Failed { error: String },
    /// The run was skipped.
    Skipped,
}
105
/// Internal record of which parts of the SDK state were covered by a sync pass.
///
/// `EventEmitter::emit_synced` consumes these records to decide when the public
/// `SdkEvent::Synced` event is emitted.
// One independent flag per synced component; a state enum would not fit because
// any combination of flags can be set at once.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Default)]
pub struct InternalSyncedEvent {
    /// Wallet component flag; the first `Synced` event is gated on it.
    pub wallet: bool,
    /// Wallet-state component flag.
    pub wallet_state: bool,
    /// Deposits component flag.
    pub deposits: bool,
    /// LNURL metadata component flag.
    pub lnurl_metadata: bool,
    /// `None` when storage was not part of the pass, otherwise a counter
    /// reported by it (`Some(0)` is a valid value).
    // NOTE(review): presumably the number of incoming storage records - confirm.
    pub storage_incoming: Option<u32>,
}

impl InternalSyncedEvent {
    /// Returns `true` when at least one flag is set or a storage result is
    /// present, even if that result is `Some(0)`.
    pub fn any(&self) -> bool {
        let flags = [
            self.wallet,
            self.wallet_state,
            self.deposits,
            self.lnurl_metadata,
        ];
        flags.contains(&true) || self.storage_incoming.is_some()
    }

    /// Like [`InternalSyncedEvent::any`], but a storage result only counts when
    /// it is strictly greater than zero.
    pub fn any_non_zero(&self) -> bool {
        let flags = [
            self.wallet,
            self.wallet_state,
            self.deposits,
            self.lnurl_metadata,
        ];
        flags.contains(&true) || matches!(self.storage_incoming, Some(count) if count > 0)
    }

    /// Combines two records into a new one.
    ///
    /// Flags are OR-ed. Storage counters are added (saturating at `u32::MAX`)
    /// when both sides have one; otherwise whichever side has a value wins, and
    /// the result is `None` only when both are `None`.
    pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
        let storage_incoming = match (self.storage_incoming, other.storage_incoming) {
            (Some(mine), Some(theirs)) => Some(mine.saturating_add(theirs)),
            (mine, theirs) => mine.or(theirs),
        };

        Self {
            wallet: self.wallet || other.wallet,
            wallet_state: self.wallet_state || other.wallet_state,
            deposits: self.deposits || other.deposits,
            lnurl_metadata: self.lnurl_metadata || other.lnurl_metadata,
            storage_incoming,
        }
    }
}
148
/// Callback interface for receiving [`SdkEvent`]s.
///
/// Implementations are registered with `EventEmitter::add_listener`. With the
/// `uniffi` feature enabled the trait is exported as a uniffi callback
/// interface.
#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
#[macros::async_trait]
pub trait EventListener: Send + Sync {
    /// Called once per emitted event with the listener's own clone of it.
    ///
    /// `EventEmitter::emit` awaits each listener in turn, so a slow
    /// implementation delays delivery to the listeners that follow it.
    async fn on_event(&self, event: SdkEvent);
}
156
/// Registry of [`EventListener`]s plus the gating state for the first
/// `SdkEvent::Synced` event.
pub struct EventEmitter {
    /// When `true`, the first `Synced` event additionally waits for a storage
    /// result (`storage_incoming` being `Some`) or for `rtsync_failed` to be set.
    has_real_time_sync: bool,
    /// Set by `notify_rtsync_failed`; lifts the storage requirement above.
    rtsync_failed: AtomicBool,
    /// Monotonic counter used as the numeric part of generated listener ids.
    listener_index: AtomicU64,
    /// Registered listeners keyed by the id returned from `add_listener`.
    listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
    /// `Some(accumulated)` until the first `Synced` event has been emitted,
    /// `None` afterwards.
    synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
}
165
166impl EventEmitter {
167 pub fn new(has_real_time_sync: bool) -> Self {
169 Self {
170 has_real_time_sync,
171 rtsync_failed: AtomicBool::new(false),
172 listener_index: AtomicU64::new(0),
173 listeners: RwLock::new(BTreeMap::new()),
174 synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
175 }
176 }
177
178 pub async fn add_listener(&self, listener: Box<dyn EventListener>) -> String {
188 let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
189 let id = format!("listener_{}-{}", index, Uuid::new_v4());
190 let mut listeners = self.listeners.write().await;
191 listeners.insert(id.clone(), listener);
192 id
193 }
194
195 pub async fn remove_listener(&self, id: &str) -> bool {
205 let mut listeners = self.listeners.write().await;
206 listeners.remove(id).is_some()
207 }
208
209 pub async fn emit(&self, event: &SdkEvent) {
211 let listeners = self.listeners.read().await;
213
214 for listener in listeners.values() {
216 listener.on_event(event.clone()).await;
217 }
218 }
219
220 pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
221 if !synced.any() {
222 return;
224 }
225
226 let mut mtx = self.synced_event_buffer.lock().await;
227
228 let is_first_event = if let Some(buffered) = &*mtx {
229 let merged = buffered.merge(synced);
230
231 if merged.wallet
234 && (!self.has_real_time_sync
235 || merged.storage_incoming.is_some()
236 || self.rtsync_failed.load(Ordering::Relaxed))
237 {
238 *mtx = None;
239 } else {
240 *mtx = Some(merged);
241 return;
242 }
243
244 true
245 } else {
246 false
247 };
248
249 drop(mtx);
250
251 if !is_first_event && !synced.any_non_zero() {
253 return;
254 }
255
256 self.emit(&SdkEvent::Synced).await;
258 }
259
260 pub async fn notify_rtsync_failed(&self) {
264 self.rtsync_failed.store(true, Ordering::Relaxed);
265
266 let mut mtx = self.synced_event_buffer.lock().await;
267 if let Some(buffered) = &*mtx
268 && buffered.wallet
269 {
270 *mtx = None;
271 drop(mtx);
272 self.emit(&SdkEvent::Synced).await;
273 }
274 }
275}
276
277impl Default for EventEmitter {
278 fn default() -> Self {
279 Self::new(false)
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    use macros::async_test_all;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Listener that raises a flag as soon as it receives any event.
    struct TestListener {
        received: Arc<AtomicBool>,
    }

    #[macros::async_trait]
    impl EventListener for TestListener {
        async fn on_event(&self, _event: SdkEvent) {
            self.received.store(true, Ordering::Relaxed);
        }
    }

    /// Listener that counts the `SdkEvent::Synced` events it receives.
    struct CountingListener {
        count: Arc<AtomicUsize>,
    }

    #[macros::async_trait]
    impl EventListener for CountingListener {
        async fn on_event(&self, event: SdkEvent) {
            if matches!(event, SdkEvent::Synced) {
                self.count.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Builds an emitter with one `TestListener` attached and returns it
    /// together with the listener's flag.
    async fn flagged_emitter(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicBool>) {
        let emitter = EventEmitter::new(has_real_time_sync);
        let received = Arc::new(AtomicBool::new(false));

        emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&received),
            }))
            .await;

        (emitter, received)
    }

    /// Builds an emitter with one `CountingListener` attached and returns it
    /// together with the listener's counter.
    async fn counting_emitter(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicUsize>) {
        let emitter = EventEmitter::new(has_real_time_sync);
        let count = Arc::new(AtomicUsize::new(0));

        emitter
            .add_listener(Box::new(CountingListener {
                count: Arc::clone(&count),
            }))
            .await;

        (emitter, count)
    }

    /// A sync result that only reports the wallet component.
    fn wallet_only() -> InternalSyncedEvent {
        InternalSyncedEvent {
            wallet: true,
            ..Default::default()
        }
    }

    /// A sync result that only reports a storage counter.
    fn storage_only(incoming: u32) -> InternalSyncedEvent {
        InternalSyncedEvent {
            storage_incoming: Some(incoming),
            ..Default::default()
        }
    }

    #[async_test_all]
    async fn test_event_emission() {
        let (emitter, received) = flagged_emitter(false).await;

        emitter.emit(&SdkEvent::Synced).await;

        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_remove_listener() {
        let emitter = EventEmitter::new(false);

        let first_flag = Arc::new(AtomicBool::new(false));
        let second_flag = Arc::new(AtomicBool::new(false));

        let first_id = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&first_flag),
            }))
            .await;
        let second_id = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&second_flag),
            }))
            .await;

        // Drop the first listener before anything is emitted.
        assert!(emitter.remove_listener(&first_id).await);

        emitter.emit(&SdkEvent::Synced).await;

        // Only the listener that is still registered sees the event.
        assert!(!first_flag.load(Ordering::Relaxed));
        assert!(second_flag.load(Ordering::Relaxed));

        assert!(emitter.remove_listener(&second_id).await);

        // Unknown ids are reported as "nothing removed".
        assert!(!emitter.remove_listener("non-existent-id").await);
    }

    #[async_test_all]
    async fn test_synced_event_only_emitted_with_wallet_sync() {
        let (emitter, received) = flagged_emitter(false).await;

        // Everything but the wallet: must stay buffered.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet_state: true,
                deposits: true,
                lnurl_metadata: true,
                ..Default::default()
            })
            .await;
        assert!(!received.load(Ordering::Relaxed));

        // The wallet sync opens the gate.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                storage_incoming: Some(1),
                ..Default::default()
            })
            .await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
        let (emitter, received) = flagged_emitter(true).await;

        // Storage first: not enough without the wallet.
        emitter.emit_synced(&storage_only(0)).await;
        assert!(!received.load(Ordering::Relaxed));

        // Wallet second: both requirements are now met.
        emitter.emit_synced(&wallet_only()).await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
    {
        let (emitter, received) = flagged_emitter(true).await;

        // Wallet first: still waiting for storage.
        emitter.emit_synced(&wallet_only()).await;
        assert!(!received.load(Ordering::Relaxed));

        // Storage second: both requirements are now met.
        emitter.emit_synced(&storage_only(0)).await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_rtsync_failed_emits_synced_on_wallet_alone() {
        let (emitter, received) = flagged_emitter(true).await;

        // Wallet synced, storage still outstanding.
        emitter.emit_synced(&wallet_only()).await;
        assert!(!received.load(Ordering::Relaxed));

        // The failure notification releases the buffered wallet sync.
        emitter.notify_rtsync_failed().await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_rtsync_failed_before_wallet_sync_emits_on_wallet() {
        let (emitter, received) = flagged_emitter(true).await;

        // Failure reported before any wallet sync: nothing to release yet.
        emitter.notify_rtsync_failed().await;
        assert!(!received.load(Ordering::Relaxed));

        // The wallet sync alone is now sufficient.
        emitter.emit_synced(&wallet_only()).await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_buffers_until_wallet_sync() {
        let (emitter, received) = flagged_emitter(false).await;

        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet_state: true,
                ..Default::default()
            })
            .await;
        assert!(!received.load(Ordering::Relaxed));

        emitter
            .emit_synced(&InternalSyncedEvent {
                deposits: true,
                ..Default::default()
            })
            .await;
        assert!(!received.load(Ordering::Relaxed));

        emitter
            .emit_synced(&InternalSyncedEvent {
                lnurl_metadata: true,
                ..Default::default()
            })
            .await;
        assert!(!received.load(Ordering::Relaxed));

        // A completely empty result changes nothing either.
        emitter.emit_synced(&InternalSyncedEvent::default()).await;
        assert!(!received.load(Ordering::Relaxed));

        emitter.emit_synced(&wallet_only()).await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_all_true() {
        let (emitter, received) = flagged_emitter(false).await;

        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                wallet_state: true,
                deposits: true,
                lnurl_metadata: true,
                storage_incoming: Some(1),
            })
            .await;

        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_empty_does_not_emit() {
        let (emitter, received) = flagged_emitter(false).await;

        emitter.emit_synced(&InternalSyncedEvent::default()).await;

        assert!(!received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_subsequent_syncs_after_wallet_emit_immediately() {
        let (emitter, count) = counting_emitter(true).await;

        // Wallet + storage in one result: first `Synced`.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                storage_incoming: Some(0),
                ..Default::default()
            })
            .await;
        assert_eq!(count.load(Ordering::Relaxed), 1);

        // From here on every non-trivial result emits on its own.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet_state: true,
                ..Default::default()
            })
            .await;
        assert_eq!(count.load(Ordering::Relaxed), 2);

        emitter
            .emit_synced(&InternalSyncedEvent {
                deposits: true,
                ..Default::default()
            })
            .await;
        assert_eq!(count.load(Ordering::Relaxed), 3);

        emitter
            .emit_synced(&InternalSyncedEvent {
                lnurl_metadata: true,
                ..Default::default()
            })
            .await;
        assert_eq!(count.load(Ordering::Relaxed), 4);

        emitter.emit_synced(&storage_only(1)).await;
        assert_eq!(count.load(Ordering::Relaxed), 5);

        // A zero storage counter is not worth an event.
        emitter.emit_synced(&storage_only(0)).await;
        assert_eq!(count.load(Ordering::Relaxed), 5);

        emitter.emit_synced(&wallet_only()).await;
        assert_eq!(count.load(Ordering::Relaxed), 6);
    }

    #[async_test_all]
    async fn test_empty_event_does_not_emit_after_wallet_sync() {
        let (emitter, count) = counting_emitter(false).await;

        emitter.emit_synced(&wallet_only()).await;
        assert_eq!(count.load(Ordering::Relaxed), 1);

        // An empty result after the first `Synced` is ignored.
        emitter.emit_synced(&InternalSyncedEvent::default()).await;
        assert_eq!(count.load(Ordering::Relaxed), 1);

        // A non-empty one emits again.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet_state: true,
                ..Default::default()
            })
            .await;
        assert_eq!(count.load(Ordering::Relaxed), 2);
    }
}