1use core::fmt;
2use std::{
3 collections::BTreeMap,
4 sync::atomic::{AtomicBool, AtomicU64, Ordering},
5};
6
7use platform_utils::time::Instant;
8use serde::Serialize;
9use tokio::sync::{Mutex, RwLock};
10use tracing::debug;
11use uuid::Uuid;
12
13use crate::{DepositInfo, LightningAddressInfo, Payment};
14
15#[allow(clippy::large_enum_variant)]
17#[derive(Debug, Clone, Serialize)]
18#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
19pub enum SdkEvent {
20 Synced,
22 UnclaimedDeposits {
24 unclaimed_deposits: Vec<DepositInfo>,
25 },
26 ClaimedDeposits {
27 claimed_deposits: Vec<DepositInfo>,
28 },
29 PaymentSucceeded {
30 payment: Payment,
31 },
32 PaymentPending {
33 payment: Payment,
34 },
35 PaymentFailed {
36 payment: Payment,
37 },
38 Optimization {
39 optimization_event: OptimizationEvent,
41 },
42 LightningAddressChanged {
43 lightning_address: Option<LightningAddressInfo>,
44 },
45 NewDeposits {
46 new_deposits: Vec<DepositInfo>,
47 },
48}
49
50impl SdkEvent {
51 pub(crate) fn from_payment(payment: Payment) -> Self {
52 match payment.status {
53 crate::PaymentStatus::Completed => SdkEvent::PaymentSucceeded { payment },
54 crate::PaymentStatus::Pending => SdkEvent::PaymentPending { payment },
55 crate::PaymentStatus::Failed => SdkEvent::PaymentFailed { payment },
56 }
57 }
58}
59
60impl fmt::Display for SdkEvent {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 match self {
63 SdkEvent::Synced => write!(f, "Synced"),
64 SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
65 write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
66 }
67 SdkEvent::ClaimedDeposits { claimed_deposits } => {
68 write!(f, "ClaimedDeposits: {claimed_deposits:?}")
69 }
70 SdkEvent::PaymentSucceeded { payment } => {
71 write!(f, "PaymentSucceeded: {payment:?}")
72 }
73 SdkEvent::PaymentPending { payment } => {
74 write!(f, "PaymentPending: {payment:?}")
75 }
76 SdkEvent::PaymentFailed { payment } => {
77 write!(f, "PaymentFailed: {payment:?}")
78 }
79 SdkEvent::Optimization {
80 optimization_event: event,
81 } => {
82 write!(f, "Optimization: {event:?}")
83 }
84 SdkEvent::LightningAddressChanged { lightning_address } => {
85 write!(f, "LightningAddressChanged: {lightning_address:?}")
86 }
87 SdkEvent::NewDeposits { new_deposits } => {
88 write!(f, "NewDeposits: {new_deposits:?}")
89 }
90 }
91 }
92}
93
94#[derive(Debug, Clone, Serialize)]
95#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
96pub enum OptimizationEvent {
97 Started { total_rounds: u32 },
99 RoundCompleted {
101 current_round: u32,
102 total_rounds: u32,
103 },
104 Completed,
106 Cancelled,
108 Failed { error: String },
110 Skipped,
112}
113
114#[allow(clippy::struct_excessive_bools)]
115#[derive(Debug, Default)]
116pub struct InternalSyncedEvent {
117 pub wallet: bool,
118 pub wallet_state: bool,
119 pub deposits: bool,
120 pub lnurl_metadata: bool,
121 pub storage_incoming: Option<u32>,
122}
123
124impl InternalSyncedEvent {
125 pub fn any(&self) -> bool {
126 self.wallet
127 || self.wallet_state
128 || self.deposits
129 || self.lnurl_metadata
130 || self.storage_incoming.is_some()
131 }
132
133 pub fn any_non_zero(&self) -> bool {
134 self.wallet
135 || self.wallet_state
136 || self.deposits
137 || self.lnurl_metadata
138 || self.storage_incoming.is_some_and(|v| v > 0)
139 }
140
141 pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
142 Self {
143 wallet: self.wallet || other.wallet,
144 wallet_state: self.wallet_state || other.wallet_state,
145 deposits: self.deposits || other.deposits,
146 lnurl_metadata: self.lnurl_metadata || other.lnurl_metadata,
147 storage_incoming: self
148 .storage_incoming
149 .zip(other.storage_incoming)
150 .map(|(a, b)| a.saturating_add(b))
151 .or(self.storage_incoming)
152 .or(other.storage_incoming),
153 }
154 }
155}
156
157#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
159#[macros::async_trait]
160pub trait EventListener: Send + Sync {
161 async fn on_event(&self, event: SdkEvent);
163}
164
165#[macros::async_trait]
173pub trait EventMiddleware: Send + Sync {
174 async fn process(&self, event: SdkEvent) -> Option<SdkEvent>;
176}
177
178pub struct EventEmitter {
185 has_real_time_sync: bool,
186 rtsync_failed: AtomicBool,
187 listener_index: AtomicU64,
188 internal_listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
190 middleware: RwLock<Vec<Box<dyn EventMiddleware>>>,
192 external_listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
194 synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
195}
196
197impl EventEmitter {
198 pub fn new(has_real_time_sync: bool) -> Self {
200 Self {
201 has_real_time_sync,
202 rtsync_failed: AtomicBool::new(false),
203 listener_index: AtomicU64::new(0),
204 internal_listeners: RwLock::new(BTreeMap::new()),
205 middleware: RwLock::new(Vec::new()),
206 external_listeners: RwLock::new(BTreeMap::new()),
207 synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
208 }
209 }
210
211 pub async fn add_external_listener(&self, listener: Box<dyn EventListener>) -> String {
221 let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
222 let id = format!("listener_{}-{}", index, Uuid::new_v4());
223 let mut listeners = self.external_listeners.write().await;
224 listeners.insert(id.clone(), listener);
225 id
226 }
227
228 pub async fn remove_external_listener(&self, id: &str) -> bool {
238 let mut listeners = self.external_listeners.write().await;
239 listeners.remove(id).is_some()
240 }
241
242 pub async fn add_internal_listener(&self, listener: Box<dyn EventListener>) -> String {
247 let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
248 let id = format!("internal_{}-{}", index, Uuid::new_v4());
249 let mut listeners = self.internal_listeners.write().await;
250 listeners.insert(id.clone(), listener);
251 id
252 }
253
254 pub async fn remove_internal_listener(&self, id: &str) -> bool {
256 let mut listeners = self.internal_listeners.write().await;
257 listeners.remove(id).is_some()
258 }
259
260 pub async fn add_middleware(&self, middleware: Box<dyn EventMiddleware>) {
264 let mut mw = self.middleware.write().await;
265 mw.push(middleware);
266 }
267
268 pub async fn emit(&self, event: &SdkEvent) {
273 let start = Instant::now();
274
275 let internal = self.internal_listeners.read().await;
277 for listener in internal.values() {
278 listener.on_event(event.clone()).await;
279 }
280 drop(internal);
281
282 let mut event = Some(event.clone());
284 let middleware = self.middleware.read().await;
285 for mw in middleware.iter() {
286 if let Some(e) = event {
287 event = mw.process(e).await;
288 } else {
289 break;
290 }
291 }
292 drop(middleware);
293
294 if let Some(ref event) = event {
296 let listeners = self.external_listeners.read().await;
297 for listener in listeners.values() {
298 listener.on_event(event.clone()).await;
299 }
300 }
301
302 debug!("emit() completed in {:?}", start.elapsed());
303 }
304
305 pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
306 if !synced.any() {
307 return;
309 }
310
311 let mut mtx = self.synced_event_buffer.lock().await;
312
313 let is_first_event = if let Some(buffered) = &*mtx {
314 let merged = buffered.merge(synced);
315
316 if merged.wallet
319 && (!self.has_real_time_sync
320 || merged.storage_incoming.is_some()
321 || self.rtsync_failed.load(Ordering::Relaxed))
322 {
323 *mtx = None;
324 } else {
325 *mtx = Some(merged);
326 return;
327 }
328
329 true
330 } else {
331 false
332 };
333
334 drop(mtx);
335
336 if !is_first_event && !synced.any_non_zero() {
338 return;
339 }
340
341 self.emit(&SdkEvent::Synced).await;
343 }
344
345 pub async fn notify_rtsync_failed(&self) {
349 self.rtsync_failed.store(true, Ordering::Relaxed);
350
351 let mut mtx = self.synced_event_buffer.lock().await;
352 if let Some(buffered) = &*mtx
353 && buffered.wallet
354 {
355 *mtx = None;
356 drop(mtx);
357 self.emit(&SdkEvent::Synced).await;
358 }
359 }
360}
361
362impl Default for EventEmitter {
363 fn default() -> Self {
364 Self::new(false)
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use super::*;
371 use std::sync::Arc;
372 use std::sync::atomic::{AtomicBool, Ordering};
373
374 use macros::async_test_all;
375
376 #[cfg(feature = "browser-tests")]
377 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
378
379 struct TestListener {
380 received: Arc<AtomicBool>,
381 }
382
383 #[macros::async_trait]
384 impl EventListener for TestListener {
385 async fn on_event(&self, _event: SdkEvent) {
386 self.received.store(true, Ordering::Relaxed);
387 }
388 }
389
390 #[async_test_all]
391 async fn test_event_emission() {
392 let emitter = EventEmitter::new(false);
393 let received = Arc::new(AtomicBool::new(false));
394
395 let listener = Box::new(TestListener {
397 received: received.clone(),
398 });
399
400 let _ = emitter.add_external_listener(listener).await;
401
402 let event = SdkEvent::Synced {};
403
404 emitter.emit(&event).await;
405
406 assert!(received.load(Ordering::Relaxed));
408 }
409
410 #[async_test_all]
411 async fn test_remove_listener() {
412 let emitter = EventEmitter::new(false);
413
414 let received1 = Arc::new(AtomicBool::new(false));
416 let received2 = Arc::new(AtomicBool::new(false));
417
418 let listener1 = Box::new(TestListener {
420 received: received1.clone(),
421 });
422
423 let listener2 = Box::new(TestListener {
424 received: received2.clone(),
425 });
426
427 let id1 = emitter.add_external_listener(listener1).await;
428 let id2 = emitter.add_external_listener(listener2).await;
429
430 assert!(emitter.remove_external_listener(&id1).await);
432
433 let event = SdkEvent::Synced {};
435 emitter.emit(&event).await;
436
437 assert!(!received1.load(Ordering::Relaxed));
439
440 assert!(received2.load(Ordering::Relaxed));
442
443 assert!(emitter.remove_external_listener(&id2).await);
445
446 assert!(!emitter.remove_external_listener("non-existent-id").await);
448 }
449
450 #[async_test_all]
451 async fn test_synced_event_only_emitted_with_wallet_sync() {
452 let emitter = EventEmitter::new(false);
453 let received = Arc::new(AtomicBool::new(false));
454
455 let listener = Box::new(TestListener {
456 received: received.clone(),
457 });
458
459 emitter.add_external_listener(listener).await;
460
461 emitter
463 .emit_synced(&InternalSyncedEvent {
464 wallet: false,
465 wallet_state: true,
466 deposits: true,
467 lnurl_metadata: true,
468 storage_incoming: None,
469 })
470 .await;
471
472 assert!(!received.load(Ordering::Relaxed));
473
474 emitter
476 .emit_synced(&InternalSyncedEvent {
477 wallet: true,
478 wallet_state: false,
479 deposits: false,
480 lnurl_metadata: false,
481 storage_incoming: Some(1),
482 })
483 .await;
484
485 assert!(received.load(Ordering::Relaxed));
486 }
487
488 #[async_test_all]
489 async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
490 let emitter = EventEmitter::new(true);
491 let received = Arc::new(AtomicBool::new(false));
492
493 let listener = Box::new(TestListener {
494 received: received.clone(),
495 });
496
497 emitter.add_external_listener(listener).await;
498
499 emitter
501 .emit_synced(&InternalSyncedEvent {
502 wallet: false,
503 wallet_state: false,
504 deposits: false,
505 lnurl_metadata: false,
506 storage_incoming: Some(0),
507 })
508 .await;
509
510 assert!(!received.load(Ordering::Relaxed));
511
512 emitter
514 .emit_synced(&InternalSyncedEvent {
515 wallet: true,
516 wallet_state: false,
517 deposits: false,
518 lnurl_metadata: false,
519 storage_incoming: None,
520 })
521 .await;
522
523 assert!(received.load(Ordering::Relaxed));
524 }
525
526 #[async_test_all]
527 async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
528 {
529 let emitter = EventEmitter::new(true);
530 let received = Arc::new(AtomicBool::new(false));
531
532 let listener = Box::new(TestListener {
533 received: received.clone(),
534 });
535
536 emitter.add_external_listener(listener).await;
537
538 emitter
540 .emit_synced(&InternalSyncedEvent {
541 wallet: true,
542 wallet_state: false,
543 deposits: false,
544 lnurl_metadata: false,
545 storage_incoming: None,
546 })
547 .await;
548
549 assert!(!received.load(Ordering::Relaxed));
550
551 emitter
553 .emit_synced(&InternalSyncedEvent {
554 wallet: false,
555 wallet_state: false,
556 deposits: false,
557 lnurl_metadata: false,
558 storage_incoming: Some(0),
559 })
560 .await;
561
562 assert!(received.load(Ordering::Relaxed));
563 }
564
565 #[async_test_all]
566 async fn test_rtsync_failed_emits_synced_on_wallet_alone() {
567 let emitter = EventEmitter::new(true);
568 let received = Arc::new(AtomicBool::new(false));
569
570 let listener = Box::new(TestListener {
571 received: received.clone(),
572 });
573
574 emitter.add_external_listener(listener).await;
575
576 emitter
578 .emit_synced(&InternalSyncedEvent {
579 wallet: true,
580 wallet_state: false,
581 deposits: false,
582 lnurl_metadata: false,
583 storage_incoming: None,
584 })
585 .await;
586
587 assert!(!received.load(Ordering::Relaxed));
588
589 emitter.notify_rtsync_failed().await;
591
592 assert!(received.load(Ordering::Relaxed));
593 }
594
595 #[async_test_all]
596 async fn test_rtsync_failed_before_wallet_sync_emits_on_wallet() {
597 let emitter = EventEmitter::new(true);
598 let received = Arc::new(AtomicBool::new(false));
599
600 let listener = Box::new(TestListener {
601 received: received.clone(),
602 });
603
604 emitter.add_external_listener(listener).await;
605
606 emitter.notify_rtsync_failed().await;
608
609 assert!(!received.load(Ordering::Relaxed));
610
611 emitter
613 .emit_synced(&InternalSyncedEvent {
614 wallet: true,
615 wallet_state: false,
616 deposits: false,
617 lnurl_metadata: false,
618 storage_incoming: None,
619 })
620 .await;
621
622 assert!(received.load(Ordering::Relaxed));
623 }
624
625 #[async_test_all]
626 async fn test_synced_event_buffers_until_wallet_sync() {
627 let emitter = EventEmitter::new(false);
628 let received = Arc::new(AtomicBool::new(false));
629
630 let listener = Box::new(TestListener {
631 received: received.clone(),
632 });
633
634 emitter.add_external_listener(listener).await;
635
636 emitter
638 .emit_synced(&InternalSyncedEvent {
639 wallet: false,
640 wallet_state: true,
641 deposits: false,
642 lnurl_metadata: false,
643 storage_incoming: None,
644 })
645 .await;
646
647 assert!(!received.load(Ordering::Relaxed));
648
649 emitter
650 .emit_synced(&InternalSyncedEvent {
651 wallet: false,
652 wallet_state: false,
653 deposits: true,
654 lnurl_metadata: false,
655 storage_incoming: None,
656 })
657 .await;
658
659 assert!(!received.load(Ordering::Relaxed));
660
661 emitter
662 .emit_synced(&InternalSyncedEvent {
663 wallet: false,
664 wallet_state: false,
665 deposits: false,
666 lnurl_metadata: true,
667 storage_incoming: None,
668 })
669 .await;
670
671 assert!(!received.load(Ordering::Relaxed));
672
673 emitter
674 .emit_synced(&InternalSyncedEvent {
675 wallet: false,
676 wallet_state: false,
677 deposits: false,
678 lnurl_metadata: false,
679 storage_incoming: None,
680 })
681 .await;
682
683 assert!(!received.load(Ordering::Relaxed));
684
685 emitter
687 .emit_synced(&InternalSyncedEvent {
688 wallet: true,
689 wallet_state: false,
690 deposits: false,
691 lnurl_metadata: false,
692 storage_incoming: None,
693 })
694 .await;
695
696 assert!(received.load(Ordering::Relaxed));
697 }
698
699 #[async_test_all]
700 async fn test_synced_event_all_true() {
701 let emitter = EventEmitter::new(false);
702 let received = Arc::new(AtomicBool::new(false));
703
704 let listener = Box::new(TestListener {
705 received: received.clone(),
706 });
707
708 emitter.add_external_listener(listener).await;
709
710 emitter
712 .emit_synced(&InternalSyncedEvent {
713 wallet: true,
714 wallet_state: true,
715 deposits: true,
716 lnurl_metadata: true,
717 storage_incoming: Some(1),
718 })
719 .await;
720
721 assert!(received.load(Ordering::Relaxed));
722 }
723
724 #[async_test_all]
725 async fn test_synced_event_empty_does_not_emit() {
726 let emitter = EventEmitter::new(false);
727 let received = Arc::new(AtomicBool::new(false));
728
729 let listener = Box::new(TestListener {
730 received: received.clone(),
731 });
732
733 emitter.add_external_listener(listener).await;
734
735 emitter
737 .emit_synced(&InternalSyncedEvent {
738 wallet: false,
739 wallet_state: false,
740 deposits: false,
741 lnurl_metadata: false,
742 storage_incoming: None,
743 })
744 .await;
745
746 assert!(!received.load(Ordering::Relaxed));
747 }
748
749 #[async_test_all]
750 async fn test_subsequent_syncs_after_wallet_emit_immediately() {
751 use std::sync::atomic::AtomicUsize;
752
753 struct CountingListener {
754 count: Arc<AtomicUsize>,
755 }
756
757 #[macros::async_trait]
758 impl EventListener for CountingListener {
759 async fn on_event(&self, event: SdkEvent) {
760 if matches!(event, SdkEvent::Synced) {
761 self.count.fetch_add(1, Ordering::Relaxed);
762 }
763 }
764 }
765
766 let emitter = EventEmitter::new(true);
767 let count = Arc::new(AtomicUsize::new(0));
768
769 let listener = Box::new(CountingListener {
770 count: count.clone(),
771 });
772
773 emitter.add_external_listener(listener).await;
774
775 emitter
777 .emit_synced(&InternalSyncedEvent {
778 wallet: true,
779 wallet_state: false,
780 deposits: false,
781 lnurl_metadata: false,
782 storage_incoming: Some(0),
783 })
784 .await;
785
786 assert_eq!(count.load(Ordering::Relaxed), 1);
787
788 emitter
790 .emit_synced(&InternalSyncedEvent {
791 wallet: false,
792 wallet_state: true,
793 deposits: false,
794 lnurl_metadata: false,
795 storage_incoming: None,
796 })
797 .await;
798
799 assert_eq!(count.load(Ordering::Relaxed), 2);
800
801 emitter
803 .emit_synced(&InternalSyncedEvent {
804 wallet: false,
805 wallet_state: false,
806 deposits: true,
807 lnurl_metadata: false,
808 storage_incoming: None,
809 })
810 .await;
811
812 assert_eq!(count.load(Ordering::Relaxed), 3);
813
814 emitter
815 .emit_synced(&InternalSyncedEvent {
816 wallet: false,
817 wallet_state: false,
818 deposits: false,
819 lnurl_metadata: true,
820 storage_incoming: None,
821 })
822 .await;
823
824 assert_eq!(count.load(Ordering::Relaxed), 4);
825
826 emitter
827 .emit_synced(&InternalSyncedEvent {
828 wallet: false,
829 wallet_state: false,
830 deposits: false,
831 lnurl_metadata: false,
832 storage_incoming: Some(1),
833 })
834 .await;
835
836 assert_eq!(count.load(Ordering::Relaxed), 5);
837
838 emitter
840 .emit_synced(&InternalSyncedEvent {
841 wallet: false,
842 wallet_state: false,
843 deposits: false,
844 lnurl_metadata: false,
845 storage_incoming: Some(0),
846 })
847 .await;
848
849 assert_eq!(count.load(Ordering::Relaxed), 5);
850
851 emitter
852 .emit_synced(&InternalSyncedEvent {
853 wallet: true,
854 wallet_state: false,
855 deposits: false,
856 lnurl_metadata: false,
857 storage_incoming: None,
858 })
859 .await;
860
861 assert_eq!(count.load(Ordering::Relaxed), 6);
862 }
863
864 struct RecordingListener {
868 events: Arc<Mutex<Vec<String>>>,
869 }
870
871 impl RecordingListener {
872 fn new() -> (Self, Arc<Mutex<Vec<String>>>) {
873 let events = Arc::new(Mutex::new(Vec::new()));
874 (
875 Self {
876 events: events.clone(),
877 },
878 events,
879 )
880 }
881 }
882
883 #[macros::async_trait]
884 impl EventListener for RecordingListener {
885 async fn on_event(&self, event: SdkEvent) {
886 self.events.lock().await.push(format!("{event}"));
887 }
888 }
889
890 struct SuppressSyncedMiddleware;
892
893 #[macros::async_trait]
894 impl EventMiddleware for SuppressSyncedMiddleware {
895 async fn process(&self, event: SdkEvent) -> Option<SdkEvent> {
896 match event {
897 SdkEvent::Synced => None,
898 other => Some(other),
899 }
900 }
901 }
902
903 struct DowngradePaymentMiddleware;
905
906 #[macros::async_trait]
907 impl EventMiddleware for DowngradePaymentMiddleware {
908 async fn process(&self, event: SdkEvent) -> Option<SdkEvent> {
909 match event {
910 SdkEvent::PaymentSucceeded { payment } => {
911 Some(SdkEvent::PaymentPending { payment })
912 }
913 other => Some(other),
914 }
915 }
916 }
917
918 struct SuppressAllMiddleware;
920
921 #[macros::async_trait]
922 impl EventMiddleware for SuppressAllMiddleware {
923 async fn process(&self, _event: SdkEvent) -> Option<SdkEvent> {
924 None
925 }
926 }
927
928 fn test_payment() -> Payment {
929 Payment {
930 id: "test-id".to_string(),
931 payment_type: crate::PaymentType::Receive,
932 status: crate::PaymentStatus::Completed,
933 amount: 1000,
934 fees: 10,
935 timestamp: 123_456,
936 method: crate::PaymentMethod::Spark,
937 details: None,
938 conversion_details: None,
939 }
940 }
941
942 #[async_test_all]
945 async fn test_internal_listener_receives_events() {
946 let emitter = EventEmitter::new(false);
947 let (listener, events) = RecordingListener::new();
948
949 emitter.add_internal_listener(Box::new(listener)).await;
950
951 emitter.emit(&SdkEvent::Synced).await;
952
953 let log = events.lock().await;
954 assert_eq!(log.len(), 1);
955 assert!(log[0].contains("Synced"));
956 }
957
958 #[async_test_all]
959 async fn test_remove_internal_listener() {
960 let emitter = EventEmitter::new(false);
961 let (listener, events) = RecordingListener::new();
962
963 let id = emitter.add_internal_listener(Box::new(listener)).await;
964
965 assert!(emitter.remove_internal_listener(&id).await);
966 assert!(!emitter.remove_internal_listener(&id).await);
967
968 emitter.emit(&SdkEvent::Synced).await;
969
970 assert!(events.lock().await.is_empty());
971 }
972
973 #[async_test_all]
976 async fn test_middleware_suppresses_event_for_external() {
977 let emitter = EventEmitter::new(false);
978 let (ext, ext_events) = RecordingListener::new();
979
980 emitter.add_external_listener(Box::new(ext)).await;
981 emitter
982 .add_middleware(Box::new(SuppressSyncedMiddleware))
983 .await;
984
985 emitter.emit(&SdkEvent::Synced).await;
986
987 assert!(ext_events.lock().await.is_empty());
989 }
990
991 #[async_test_all]
992 async fn test_middleware_transforms_event() {
993 let emitter = EventEmitter::new(false);
994 let (ext, ext_events) = RecordingListener::new();
995
996 emitter.add_external_listener(Box::new(ext)).await;
997 emitter
998 .add_middleware(Box::new(DowngradePaymentMiddleware))
999 .await;
1000
1001 let event = SdkEvent::PaymentSucceeded {
1002 payment: test_payment(),
1003 };
1004 emitter.emit(&event).await;
1005
1006 let log = ext_events.lock().await;
1007 assert_eq!(log.len(), 1);
1008 assert!(log[0].contains("PaymentPending"));
1009 }
1010
1011 #[async_test_all]
1012 async fn test_middleware_passthrough_unmatched_events() {
1013 let emitter = EventEmitter::new(false);
1014 let (ext, ext_events) = RecordingListener::new();
1015
1016 emitter.add_external_listener(Box::new(ext)).await;
1017 emitter
1018 .add_middleware(Box::new(SuppressSyncedMiddleware))
1019 .await;
1020
1021 emitter.emit(&SdkEvent::Synced).await;
1023 let event = SdkEvent::PaymentSucceeded {
1024 payment: test_payment(),
1025 };
1026 emitter.emit(&event).await;
1027
1028 let log = ext_events.lock().await;
1029 assert_eq!(log.len(), 1);
1030 assert!(log[0].contains("PaymentSucceeded"));
1031 }
1032
1033 #[async_test_all]
1034 async fn test_middleware_chain_ordering() {
1035 let emitter = EventEmitter::new(false);
1036 let (ext, ext_events) = RecordingListener::new();
1037
1038 emitter.add_external_listener(Box::new(ext)).await;
1039
1040 emitter
1042 .add_middleware(Box::new(DowngradePaymentMiddleware))
1043 .await;
1044 emitter
1046 .add_middleware(Box::new(SuppressSyncedMiddleware))
1047 .await;
1048
1049 let event = SdkEvent::PaymentSucceeded {
1050 payment: test_payment(),
1051 };
1052 emitter.emit(&event).await;
1053
1054 let log = ext_events.lock().await;
1055 assert_eq!(log.len(), 1);
1056 assert!(log[0].contains("PaymentPending"));
1057 }
1058
1059 #[async_test_all]
1060 async fn test_suppress_all_middleware_stops_chain() {
1061 let emitter = EventEmitter::new(false);
1062 let (ext, ext_events) = RecordingListener::new();
1063
1064 emitter.add_external_listener(Box::new(ext)).await;
1065
1066 emitter
1068 .add_middleware(Box::new(SuppressAllMiddleware))
1069 .await;
1070 emitter
1071 .add_middleware(Box::new(DowngradePaymentMiddleware))
1072 .await;
1073
1074 emitter.emit(&SdkEvent::Synced).await;
1075 let event = SdkEvent::PaymentSucceeded {
1076 payment: test_payment(),
1077 };
1078 emitter.emit(&event).await;
1079
1080 assert!(ext_events.lock().await.is_empty());
1081 }
1082
1083 #[async_test_all]
1086 async fn test_three_phase_emit_flow() {
1087 let emitter = EventEmitter::new(false);
1088 let (int, int_events) = RecordingListener::new();
1089 let (ext, ext_events) = RecordingListener::new();
1090
1091 emitter.add_internal_listener(Box::new(int)).await;
1092 emitter.add_external_listener(Box::new(ext)).await;
1093 emitter
1094 .add_middleware(Box::new(SuppressSyncedMiddleware))
1095 .await;
1096
1097 emitter.emit(&SdkEvent::Synced).await;
1099
1100 assert_eq!(int_events.lock().await.len(), 1);
1101 assert!(ext_events.lock().await.is_empty());
1102
1103 let event = SdkEvent::PaymentSucceeded {
1105 payment: test_payment(),
1106 };
1107 emitter.emit(&event).await;
1108
1109 assert_eq!(int_events.lock().await.len(), 2);
1110 assert_eq!(ext_events.lock().await.len(), 1);
1111 }
1112
1113 #[async_test_all]
1114 async fn test_internal_sees_raw_event_external_sees_transformed() {
1115 let emitter = EventEmitter::new(false);
1116 let (int, int_events) = RecordingListener::new();
1117 let (ext, ext_events) = RecordingListener::new();
1118
1119 emitter.add_internal_listener(Box::new(int)).await;
1120 emitter.add_external_listener(Box::new(ext)).await;
1121 emitter
1122 .add_middleware(Box::new(DowngradePaymentMiddleware))
1123 .await;
1124
1125 let event = SdkEvent::PaymentSucceeded {
1126 payment: test_payment(),
1127 };
1128 emitter.emit(&event).await;
1129
1130 let int_log = int_events.lock().await;
1131 let ext_log = ext_events.lock().await;
1132
1133 assert_eq!(int_log.len(), 1);
1135 assert!(int_log[0].contains("PaymentSucceeded"));
1136
1137 assert_eq!(ext_log.len(), 1);
1139 assert!(ext_log[0].contains("PaymentPending"));
1140 }
1141
1142 #[async_test_all]
1143 async fn test_no_listeners_no_middleware_does_not_panic() {
1144 let emitter = EventEmitter::new(false);
1145 emitter.emit(&SdkEvent::Synced).await;
1146 }
1148
1149 #[async_test_all]
1150 async fn test_empty_event_does_not_emit_after_wallet_sync() {
1151 use std::sync::atomic::AtomicUsize;
1152
1153 struct CountingListener {
1154 count: Arc<AtomicUsize>,
1155 }
1156
1157 #[macros::async_trait]
1158 impl EventListener for CountingListener {
1159 async fn on_event(&self, event: SdkEvent) {
1160 if matches!(event, SdkEvent::Synced) {
1161 self.count.fetch_add(1, Ordering::Relaxed);
1162 }
1163 }
1164 }
1165
1166 let emitter = EventEmitter::new(false);
1167 let count = Arc::new(AtomicUsize::new(0));
1168
1169 let listener = Box::new(CountingListener {
1170 count: count.clone(),
1171 });
1172
1173 emitter.add_external_listener(listener).await;
1174
1175 emitter
1177 .emit_synced(&InternalSyncedEvent {
1178 wallet: true,
1179 wallet_state: false,
1180 deposits: false,
1181 lnurl_metadata: false,
1182 storage_incoming: None,
1183 })
1184 .await;
1185
1186 assert_eq!(count.load(Ordering::Relaxed), 1);
1187
1188 emitter
1190 .emit_synced(&InternalSyncedEvent {
1191 wallet: false,
1192 wallet_state: false,
1193 deposits: false,
1194 lnurl_metadata: false,
1195 storage_incoming: None,
1196 })
1197 .await;
1198
1199 assert_eq!(count.load(Ordering::Relaxed), 1); emitter
1203 .emit_synced(&InternalSyncedEvent {
1204 wallet: false,
1205 wallet_state: true,
1206 deposits: false,
1207 lnurl_metadata: false,
1208 storage_incoming: None,
1209 })
1210 .await;
1211
1212 assert_eq!(count.load(Ordering::Relaxed), 2); }
1214}