1use core::fmt;
2use std::{
3 collections::BTreeMap,
4 sync::atomic::{AtomicBool, AtomicU64, Ordering},
5};
6
7use platform_utils::time::Instant;
8use serde::Serialize;
9use tokio::sync::{Mutex, RwLock};
10use tracing::info;
11use uuid::Uuid;
12
13use crate::{DepositInfo, LightningAddressInfo, Payment, sdk::RuntimeEvent};
14
/// Events delivered to [`EventListener`]s by the [`EventEmitter`].
///
/// The `Display` implementation renders the variant name followed by the
/// `Debug` form of its payload.
// NOTE(review): the size lint is presumably silenced because the payload-carrying
// variants are much larger than `Synced` — confirm before boxing anything.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum SdkEvent {
    /// Emitted by `EventEmitter::emit_synced` and
    /// `EventEmitter::notify_rtsync_failed` once their gating conditions are met.
    Synced,
    /// Carries a list of unclaimed deposits.
    UnclaimedDeposits {
        unclaimed_deposits: Vec<DepositInfo>,
    },
    /// Carries a list of claimed deposits.
    ClaimedDeposits {
        claimed_deposits: Vec<DepositInfo>,
    },
    /// Built by `SdkEvent::from_payment` for a payment whose status is `Completed`.
    PaymentSucceeded {
        payment: Payment,
    },
    /// Built by `SdkEvent::from_payment` for a payment whose status is `Pending`.
    PaymentPending {
        payment: Payment,
    },
    /// Built by `SdkEvent::from_payment` for a payment whose status is `Failed`.
    PaymentFailed {
        payment: Payment,
    },
    /// Wraps an [`OptimizationEvent`].
    Optimization {
        optimization_event: OptimizationEvent,
    },
    /// Carries the lightning address after a change; `None` presumably means the
    /// address was removed — TODO confirm against the producer.
    LightningAddressChanged {
        lightning_address: Option<LightningAddressInfo>,
    },
    /// Carries a list of newly seen deposits.
    NewDeposits {
        new_deposits: Vec<DepositInfo>,
    },
}
49
50impl SdkEvent {
51 pub(crate) fn from_payment(payment: Payment) -> Self {
52 match payment.status {
53 crate::PaymentStatus::Completed => SdkEvent::PaymentSucceeded { payment },
54 crate::PaymentStatus::Pending => SdkEvent::PaymentPending { payment },
55 crate::PaymentStatus::Failed => SdkEvent::PaymentFailed { payment },
56 }
57 }
58}
59
60impl fmt::Display for SdkEvent {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 match self {
63 SdkEvent::Synced => write!(f, "Synced"),
64 SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
65 write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
66 }
67 SdkEvent::ClaimedDeposits { claimed_deposits } => {
68 write!(f, "ClaimedDeposits: {claimed_deposits:?}")
69 }
70 SdkEvent::PaymentSucceeded { payment } => {
71 write!(f, "PaymentSucceeded: {payment:?}")
72 }
73 SdkEvent::PaymentPending { payment } => {
74 write!(f, "PaymentPending: {payment:?}")
75 }
76 SdkEvent::PaymentFailed { payment } => {
77 write!(f, "PaymentFailed: {payment:?}")
78 }
79 SdkEvent::Optimization {
80 optimization_event: event,
81 } => {
82 write!(f, "Optimization: {event:?}")
83 }
84 SdkEvent::LightningAddressChanged { lightning_address } => {
85 write!(f, "LightningAddressChanged: {lightning_address:?}")
86 }
87 SdkEvent::NewDeposits { new_deposits } => {
88 write!(f, "NewDeposits: {new_deposits:?}")
89 }
90 }
91 }
92}
93
/// Progress notifications carried by `SdkEvent::Optimization`.
///
/// NOTE(review): the producer of these events is not visible in this file; the
/// per-variant descriptions below are read off the names and should be
/// confirmed against it.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum OptimizationEvent {
    /// Presumably sent when an optimization run begins, with the number of
    /// rounds it plans to perform.
    Started { total_rounds: u32 },
    /// Presumably sent after each round of a run.
    RoundCompleted {
        current_round: u32,
        total_rounds: u32,
    },
    /// Presumably sent when a run finishes successfully.
    Completed,
    /// Presumably sent when a run is cancelled before finishing.
    Cancelled,
    /// Presumably sent when a run aborts; `error` carries a textual description.
    Failed { error: String },
    /// Presumably sent when no optimization was needed or performed.
    Skipped,
}
113
/// Describes which parts of the SDK state a sync pass touched.
///
/// Each boolean marks one component as synced. `storage_incoming` is `None`
/// when storage was not part of the pass and `Some(n)` when it was, with `n`
/// being a count that may be zero.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Default)]
pub struct InternalSyncedEvent {
    pub wallet: bool,
    pub wallet_state: bool,
    pub deposits: bool,
    pub lnurl_metadata: bool,
    pub storage_incoming: Option<u32>,
}

impl InternalSyncedEvent {
    /// `true` when at least one of the boolean component flags is set.
    fn any_flag(&self) -> bool {
        [
            self.wallet,
            self.wallet_state,
            self.deposits,
            self.lnurl_metadata,
        ]
        .contains(&true)
    }

    /// Returns `true` if any component was synced, counting a storage sync
    /// even when it reported zero incoming items.
    pub fn any(&self) -> bool {
        self.any_flag() || self.storage_incoming.is_some()
    }

    /// Like [`any`](Self::any), except that a storage sync only counts when
    /// its incoming count is greater than zero.
    pub fn any_non_zero(&self) -> bool {
        self.any_flag() || matches!(self.storage_incoming, Some(count) if count > 0)
    }

    /// Combines two events into a new one.
    ///
    /// Flags are OR-ed. Storage counts are added (saturating at `u32::MAX`)
    /// when both sides have one; otherwise whichever side has a count wins,
    /// and the result is `None` only when neither does.
    pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
        let storage_incoming = match (self.storage_incoming, other.storage_incoming) {
            (Some(mine), Some(theirs)) => Some(mine.saturating_add(theirs)),
            (mine, theirs) => mine.or(theirs),
        };

        Self {
            wallet: self.wallet || other.wallet,
            wallet_state: self.wallet_state || other.wallet_state,
            deposits: self.deposits || other.deposits,
            lnurl_metadata: self.lnurl_metadata || other.lnurl_metadata,
            storage_incoming,
        }
    }
}
156
/// Receiver of [`SdkEvent`]s dispatched by an `EventEmitter`.
///
/// Implementations must be `Send + Sync`. With the `uniffi` feature enabled
/// the trait is exported as a callback interface.
#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
#[macros::async_trait]
pub trait EventListener: Send + Sync {
    /// Called once for every event delivered to this listener. The emitter
    /// awaits the returned future before moving on to the next listener.
    async fn on_event(&self, event: SdkEvent);
}
164
/// Hook that `EventEmitter::emit` runs after internal listeners and before
/// external listeners.
///
/// Middleware is applied in the order it was added; each one receives the
/// output of the previous one.
#[macros::async_trait]
pub trait EventMiddleware: Send + Sync {
    /// Returns `Some(event)` to forward the (possibly replaced) event, or
    /// `None` to suppress it. Once an event is suppressed, later middleware
    /// and all external listeners are skipped for that emission.
    async fn process(&self, event: SdkEvent) -> Option<SdkEvent>;
}
177
/// Dispatches [`SdkEvent`]s to internal listeners, then through middleware,
/// then to external listeners, and gates the first `Synced` event.
pub struct EventEmitter {
    /// When `true`, the first `Synced` additionally waits for a storage sync
    /// (or for `notify_rtsync_failed`) on top of the wallet sync.
    has_real_time_sync: bool,
    /// Set by `notify_rtsync_failed`; lifts the storage-sync requirement above.
    rtsync_failed: AtomicBool,
    /// Monotonic counter used to build listener ids.
    listener_index: AtomicU64,
    /// Handlers invoked by `emit_runtime_event`, in insertion order.
    runtime_event_handlers: RwLock<Vec<Box<dyn RuntimeEventHandler>>>,
    /// Listeners that receive every event exactly as passed to `emit`.
    internal_listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
    /// Middleware chain applied between internal and external listeners.
    middleware: RwLock<Vec<Box<dyn EventMiddleware>>>,
    /// Listeners that receive the event only if it survives the middleware chain.
    external_listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
    /// `Some(accumulated)` until the first `Synced` has been emitted, `None`
    /// afterwards.
    synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
}
197
/// Crate-internal receiver of [`RuntimeEvent`]s sent through
/// `EventEmitter::emit_runtime_event`.
#[macros::async_trait]
pub(crate) trait RuntimeEventHandler: Send + Sync {
    /// Called once per runtime event; the emitter awaits the returned future
    /// before invoking the next handler.
    async fn handle(&self, event: RuntimeEvent);
}
202
203impl EventEmitter {
204 pub fn new(has_real_time_sync: bool) -> Self {
206 Self {
207 has_real_time_sync,
208 rtsync_failed: AtomicBool::new(false),
209 listener_index: AtomicU64::new(0),
210 runtime_event_handlers: RwLock::new(Vec::new()),
211 internal_listeners: RwLock::new(BTreeMap::new()),
212 middleware: RwLock::new(Vec::new()),
213 external_listeners: RwLock::new(BTreeMap::new()),
214 synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
215 }
216 }
217
218 pub async fn add_external_listener(&self, listener: Box<dyn EventListener>) -> String {
228 let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
229 let id = format!("listener_{}-{}", index, Uuid::new_v4());
230 let mut listeners = self.external_listeners.write().await;
231 listeners.insert(id.clone(), listener);
232 id
233 }
234
235 pub async fn remove_external_listener(&self, id: &str) -> bool {
245 let mut listeners = self.external_listeners.write().await;
246 listeners.remove(id).is_some()
247 }
248
249 pub async fn add_internal_listener(&self, listener: Box<dyn EventListener>) -> String {
254 let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
255 let id = format!("internal_{}-{}", index, Uuid::new_v4());
256 let mut listeners = self.internal_listeners.write().await;
257 listeners.insert(id.clone(), listener);
258 id
259 }
260
261 pub async fn remove_internal_listener(&self, id: &str) -> bool {
263 let mut listeners = self.internal_listeners.write().await;
264 listeners.remove(id).is_some()
265 }
266
267 pub async fn add_middleware(&self, middleware: Box<dyn EventMiddleware>) {
271 let mut mw = self.middleware.write().await;
272 mw.push(middleware);
273 }
274
275 pub(crate) async fn add_runtime_event_handler(&self, handler: Box<dyn RuntimeEventHandler>) {
276 let mut handlers = self.runtime_event_handlers.write().await;
277 handlers.push(handler);
278 }
279
280 pub(crate) async fn emit_runtime_event(&self, event: RuntimeEvent) {
281 let handlers = self.runtime_event_handlers.read().await;
282 for handler in handlers.iter() {
283 handler.handle(event.clone()).await;
284 }
285 }
286
287 pub async fn emit(&self, event: &SdkEvent) {
292 let start = Instant::now();
293 let event_label = format!("{event}");
294 let mut internal_total = std::time::Duration::ZERO;
295 let mut middleware_total = std::time::Duration::ZERO;
296 let mut external_total = std::time::Duration::ZERO;
297
298 let internal = self.internal_listeners.read().await;
300 let internal_count = internal.len();
301 for (id, listener) in internal.iter() {
302 let t = Instant::now();
303 listener.on_event(event.clone()).await;
304 let dt = t.elapsed();
305 internal_total = internal_total.saturating_add(dt);
306 info!("emit({event_label}) internal listener {id}: {dt:?}");
307 }
308 drop(internal);
309
310 let mut event = Some(event.clone());
312 let middleware = self.middleware.read().await;
313 let middleware_count = middleware.len();
314 for (i, mw) in middleware.iter().enumerate() {
315 if let Some(e) = event {
316 let t = Instant::now();
317 event = mw.process(e).await;
318 let dt = t.elapsed();
319 middleware_total = middleware_total.saturating_add(dt);
320 info!("emit({event_label}) middleware #{i}: {dt:?}");
321 } else {
322 break;
323 }
324 }
325 drop(middleware);
326
327 let mut external_count = 0;
329 if let Some(ref event) = event {
330 let listeners = self.external_listeners.read().await;
331 external_count = listeners.len();
332 for (id, listener) in listeners.iter() {
333 let t = Instant::now();
334 listener.on_event(event.clone()).await;
335 let dt = t.elapsed();
336 external_total = external_total.saturating_add(dt);
337 info!("emit({event_label}) external listener {id}: {dt:?}");
338 }
339 }
340
341 info!(
342 "emit({event_label}) completed in {:?} (internal[{}]={:?}, middleware[{}]={:?}, external[{}]={:?})",
343 start.elapsed(),
344 internal_count,
345 internal_total,
346 middleware_count,
347 middleware_total,
348 external_count,
349 external_total
350 );
351 }
352
353 pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
354 if !synced.any() {
355 return;
357 }
358
359 let mut mtx = self.synced_event_buffer.lock().await;
360
361 let is_first_event = if let Some(buffered) = &*mtx {
362 let merged = buffered.merge(synced);
363
364 if merged.wallet
367 && (!self.has_real_time_sync
368 || merged.storage_incoming.is_some()
369 || self.rtsync_failed.load(Ordering::Relaxed))
370 {
371 *mtx = None;
372 } else {
373 *mtx = Some(merged);
374 return;
375 }
376
377 true
378 } else {
379 false
380 };
381
382 drop(mtx);
383
384 if !is_first_event && !synced.any_non_zero() {
386 return;
387 }
388
389 self.emit(&SdkEvent::Synced).await;
391 }
392
393 pub async fn notify_rtsync_failed(&self) {
397 self.rtsync_failed.store(true, Ordering::Relaxed);
398
399 let mut mtx = self.synced_event_buffer.lock().await;
400 if let Some(buffered) = &*mtx
401 && buffered.wallet
402 {
403 *mtx = None;
404 drop(mtx);
405 self.emit(&SdkEvent::Synced).await;
406 }
407 }
408}
409
410impl Default for EventEmitter {
411 fn default() -> Self {
412 Self::new(false)
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    use macros::async_test_all;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Shared log of rendered events written by a `RecordingListener`.
    type EventLog = Arc<Mutex<Vec<String>>>;

    /// Listener that raises a flag as soon as any event reaches it.
    struct TestListener {
        received: Arc<AtomicBool>,
    }

    #[macros::async_trait]
    impl EventListener for TestListener {
        async fn on_event(&self, _event: SdkEvent) {
            self.received.store(true, Ordering::Relaxed);
        }
    }

    /// Listener that counts `Synced` events and ignores everything else.
    struct CountingListener {
        count: Arc<AtomicUsize>,
    }

    #[macros::async_trait]
    impl EventListener for CountingListener {
        async fn on_event(&self, event: SdkEvent) {
            if matches!(event, SdkEvent::Synced) {
                self.count.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Listener that records the `Display` form of every event it receives.
    struct RecordingListener {
        events: EventLog,
    }

    impl RecordingListener {
        fn new() -> (Self, EventLog) {
            let events = Arc::new(Mutex::new(Vec::new()));
            (
                Self {
                    events: events.clone(),
                },
                events,
            )
        }
    }

    #[macros::async_trait]
    impl EventListener for RecordingListener {
        async fn on_event(&self, event: SdkEvent) {
            self.events.lock().await.push(format!("{event}"));
        }
    }

    /// Drops `Synced`, forwards everything else unchanged.
    struct SuppressSyncedMiddleware;

    #[macros::async_trait]
    impl EventMiddleware for SuppressSyncedMiddleware {
        async fn process(&self, event: SdkEvent) -> Option<SdkEvent> {
            match event {
                SdkEvent::Synced => None,
                other => Some(other),
            }
        }
    }

    /// Rewrites `PaymentSucceeded` into `PaymentPending`.
    struct DowngradePaymentMiddleware;

    #[macros::async_trait]
    impl EventMiddleware for DowngradePaymentMiddleware {
        async fn process(&self, event: SdkEvent) -> Option<SdkEvent> {
            match event {
                SdkEvent::PaymentSucceeded { payment } => {
                    Some(SdkEvent::PaymentPending { payment })
                }
                other => Some(other),
            }
        }
    }

    /// Drops every event.
    struct SuppressAllMiddleware;

    #[macros::async_trait]
    impl EventMiddleware for SuppressAllMiddleware {
        async fn process(&self, _event: SdkEvent) -> Option<SdkEvent> {
            None
        }
    }

    fn test_payment() -> Payment {
        Payment {
            id: "test-id".to_string(),
            payment_type: crate::PaymentType::Receive,
            status: crate::PaymentStatus::Completed,
            amount: 1000,
            fees: 10,
            timestamp: 123_456,
            method: crate::PaymentMethod::Spark,
            details: None,
            conversion_details: None,
        }
    }

    /// A `PaymentSucceeded` event wrapping `test_payment()`.
    fn payment_succeeded() -> SdkEvent {
        SdkEvent::PaymentSucceeded {
            payment: test_payment(),
        }
    }

    /// Sync pass that only touched the wallet.
    fn wallet_only() -> InternalSyncedEvent {
        InternalSyncedEvent {
            wallet: true,
            ..InternalSyncedEvent::default()
        }
    }

    /// Sync pass that only touched storage, with `incoming` items.
    fn storage_only(incoming: u32) -> InternalSyncedEvent {
        InternalSyncedEvent {
            storage_incoming: Some(incoming),
            ..InternalSyncedEvent::default()
        }
    }

    /// Emitter with one external `TestListener`, plus that listener's flag.
    async fn emitter_with_flag(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicBool>) {
        let emitter = EventEmitter::new(has_real_time_sync);
        let received = Arc::new(AtomicBool::new(false));
        emitter
            .add_external_listener(Box::new(TestListener {
                received: received.clone(),
            }))
            .await;
        (emitter, received)
    }

    /// Emitter with one external `CountingListener`, plus that listener's counter.
    async fn emitter_with_counter(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicUsize>) {
        let emitter = EventEmitter::new(has_real_time_sync);
        let count = Arc::new(AtomicUsize::new(0));
        emitter
            .add_external_listener(Box::new(CountingListener {
                count: count.clone(),
            }))
            .await;
        (emitter, count)
    }

    /// Emitter (no real-time sync) with one external `RecordingListener`.
    async fn emitter_with_external_log() -> (EventEmitter, EventLog) {
        let emitter = EventEmitter::new(false);
        let (listener, events) = RecordingListener::new();
        emitter.add_external_listener(Box::new(listener)).await;
        (emitter, events)
    }

    #[async_test_all]
    async fn test_event_emission() {
        let (emitter, received) = emitter_with_flag(false).await;

        emitter.emit(&SdkEvent::Synced).await;

        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_remove_listener() {
        let emitter = EventEmitter::new(false);

        let received1 = Arc::new(AtomicBool::new(false));
        let received2 = Arc::new(AtomicBool::new(false));

        let id1 = emitter
            .add_external_listener(Box::new(TestListener {
                received: received1.clone(),
            }))
            .await;
        let id2 = emitter
            .add_external_listener(Box::new(TestListener {
                received: received2.clone(),
            }))
            .await;

        assert!(emitter.remove_external_listener(&id1).await);

        emitter.emit(&SdkEvent::Synced).await;

        // Only the listener that is still registered may have been called.
        assert!(!received1.load(Ordering::Relaxed));
        assert!(received2.load(Ordering::Relaxed));

        assert!(emitter.remove_external_listener(&id2).await);
        assert!(!emitter.remove_external_listener("non-existent-id").await);
    }

    #[async_test_all]
    async fn test_synced_event_only_emitted_with_wallet_sync() {
        let (emitter, received) = emitter_with_flag(false).await;

        // Everything except the wallet: must stay buffered.
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet_state: true,
                deposits: true,
                lnurl_metadata: true,
                ..InternalSyncedEvent::default()
            })
            .await;
        assert!(!received.load(Ordering::Relaxed));

        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                storage_incoming: Some(1),
                ..InternalSyncedEvent::default()
            })
            .await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
        let (emitter, received) = emitter_with_flag(true).await;

        emitter.emit_synced(&storage_only(0)).await;
        assert!(!received.load(Ordering::Relaxed));

        emitter.emit_synced(&wallet_only()).await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
     {
        let (emitter, received) = emitter_with_flag(true).await;

        emitter.emit_synced(&wallet_only()).await;
        assert!(!received.load(Ordering::Relaxed));

        emitter.emit_synced(&storage_only(0)).await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_rtsync_failed_emits_synced_on_wallet_alone() {
        let (emitter, received) = emitter_with_flag(true).await;

        emitter.emit_synced(&wallet_only()).await;
        assert!(!received.load(Ordering::Relaxed));

        emitter.notify_rtsync_failed().await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_rtsync_failed_before_wallet_sync_emits_on_wallet() {
        let (emitter, received) = emitter_with_flag(true).await;

        emitter.notify_rtsync_failed().await;
        assert!(!received.load(Ordering::Relaxed));

        emitter.emit_synced(&wallet_only()).await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_buffers_until_wallet_sync() {
        let (emitter, received) = emitter_with_flag(false).await;

        // None of these include the wallet, so nothing may be emitted yet.
        let buffered_only = [
            InternalSyncedEvent {
                wallet_state: true,
                ..InternalSyncedEvent::default()
            },
            InternalSyncedEvent {
                deposits: true,
                ..InternalSyncedEvent::default()
            },
            InternalSyncedEvent {
                lnurl_metadata: true,
                ..InternalSyncedEvent::default()
            },
            InternalSyncedEvent::default(),
        ];
        for (step, synced) in buffered_only.iter().enumerate() {
            emitter.emit_synced(synced).await;
            assert!(!received.load(Ordering::Relaxed), "step {step}");
        }

        emitter.emit_synced(&wallet_only()).await;
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_all_true() {
        let (emitter, received) = emitter_with_flag(false).await;

        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                wallet_state: true,
                deposits: true,
                lnurl_metadata: true,
                storage_incoming: Some(1),
            })
            .await;

        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_empty_does_not_emit() {
        let (emitter, received) = emitter_with_flag(false).await;

        emitter.emit_synced(&InternalSyncedEvent::default()).await;

        assert!(!received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_subsequent_syncs_after_wallet_emit_immediately() {
        let (emitter, count) = emitter_with_counter(true).await;

        // Each entry is (sync pass, expected number of `Synced` events so far).
        let steps = [
            // Wallet plus storage opens the gate.
            (
                InternalSyncedEvent {
                    wallet: true,
                    storage_incoming: Some(0),
                    ..InternalSyncedEvent::default()
                },
                1,
            ),
            (
                InternalSyncedEvent {
                    wallet_state: true,
                    ..InternalSyncedEvent::default()
                },
                2,
            ),
            (
                InternalSyncedEvent {
                    deposits: true,
                    ..InternalSyncedEvent::default()
                },
                3,
            ),
            (
                InternalSyncedEvent {
                    lnurl_metadata: true,
                    ..InternalSyncedEvent::default()
                },
                4,
            ),
            (storage_only(1), 5),
            // A storage sync with zero incoming items is not worth an event.
            (storage_only(0), 5),
            (wallet_only(), 6),
        ];

        for (step, (synced, expected)) in steps.iter().enumerate() {
            emitter.emit_synced(synced).await;
            assert_eq!(count.load(Ordering::Relaxed), *expected, "step {step}");
        }
    }

    #[async_test_all]
    async fn test_internal_listener_receives_events() {
        let emitter = EventEmitter::new(false);
        let (listener, events) = RecordingListener::new();

        emitter.add_internal_listener(Box::new(listener)).await;

        emitter.emit(&SdkEvent::Synced).await;

        let log = events.lock().await;
        assert_eq!(log.len(), 1);
        assert!(log[0].contains("Synced"));
    }

    #[async_test_all]
    async fn test_remove_internal_listener() {
        let emitter = EventEmitter::new(false);
        let (listener, events) = RecordingListener::new();

        let id = emitter.add_internal_listener(Box::new(listener)).await;

        assert!(emitter.remove_internal_listener(&id).await);
        assert!(!emitter.remove_internal_listener(&id).await);

        emitter.emit(&SdkEvent::Synced).await;

        assert!(events.lock().await.is_empty());
    }

    #[async_test_all]
    async fn test_middleware_suppresses_event_for_external() {
        let (emitter, ext_events) = emitter_with_external_log().await;
        emitter
            .add_middleware(Box::new(SuppressSyncedMiddleware))
            .await;

        emitter.emit(&SdkEvent::Synced).await;

        assert!(ext_events.lock().await.is_empty());
    }

    #[async_test_all]
    async fn test_middleware_transforms_event() {
        let (emitter, ext_events) = emitter_with_external_log().await;
        emitter
            .add_middleware(Box::new(DowngradePaymentMiddleware))
            .await;

        emitter.emit(&payment_succeeded()).await;

        let log = ext_events.lock().await;
        assert_eq!(log.len(), 1);
        assert!(log[0].contains("PaymentPending"));
    }

    #[async_test_all]
    async fn test_middleware_passthrough_unmatched_events() {
        let (emitter, ext_events) = emitter_with_external_log().await;
        emitter
            .add_middleware(Box::new(SuppressSyncedMiddleware))
            .await;

        emitter.emit(&SdkEvent::Synced).await;
        emitter.emit(&payment_succeeded()).await;

        let log = ext_events.lock().await;
        assert_eq!(log.len(), 1);
        assert!(log[0].contains("PaymentSucceeded"));
    }

    #[async_test_all]
    async fn test_middleware_chain_ordering() {
        let (emitter, ext_events) = emitter_with_external_log().await;

        emitter
            .add_middleware(Box::new(DowngradePaymentMiddleware))
            .await;
        emitter
            .add_middleware(Box::new(SuppressSyncedMiddleware))
            .await;

        emitter.emit(&payment_succeeded()).await;

        let log = ext_events.lock().await;
        assert_eq!(log.len(), 1);
        assert!(log[0].contains("PaymentPending"));
    }

    #[async_test_all]
    async fn test_suppress_all_middleware_stops_chain() {
        let (emitter, ext_events) = emitter_with_external_log().await;

        emitter
            .add_middleware(Box::new(SuppressAllMiddleware))
            .await;
        emitter
            .add_middleware(Box::new(DowngradePaymentMiddleware))
            .await;

        emitter.emit(&SdkEvent::Synced).await;
        emitter.emit(&payment_succeeded()).await;

        assert!(ext_events.lock().await.is_empty());
    }

    #[async_test_all]
    async fn test_three_phase_emit_flow() {
        let (emitter, ext_events) = emitter_with_external_log().await;
        let (int, int_events) = RecordingListener::new();
        emitter.add_internal_listener(Box::new(int)).await;
        emitter
            .add_middleware(Box::new(SuppressSyncedMiddleware))
            .await;

        // Suppressed for external listeners, still visible internally.
        emitter.emit(&SdkEvent::Synced).await;

        assert_eq!(int_events.lock().await.len(), 1);
        assert!(ext_events.lock().await.is_empty());

        // Not matched by the middleware, so both sides receive it.
        emitter.emit(&payment_succeeded()).await;

        assert_eq!(int_events.lock().await.len(), 2);
        assert_eq!(ext_events.lock().await.len(), 1);
    }

    #[async_test_all]
    async fn test_internal_sees_raw_event_external_sees_transformed() {
        let (emitter, ext_events) = emitter_with_external_log().await;
        let (int, int_events) = RecordingListener::new();
        emitter.add_internal_listener(Box::new(int)).await;
        emitter
            .add_middleware(Box::new(DowngradePaymentMiddleware))
            .await;

        emitter.emit(&payment_succeeded()).await;

        let int_log = int_events.lock().await;
        let ext_log = ext_events.lock().await;

        assert_eq!(int_log.len(), 1);
        assert!(int_log[0].contains("PaymentSucceeded"));

        assert_eq!(ext_log.len(), 1);
        assert!(ext_log[0].contains("PaymentPending"));
    }

    #[async_test_all]
    async fn test_no_listeners_no_middleware_does_not_panic() {
        let emitter = EventEmitter::new(false);
        emitter.emit(&SdkEvent::Synced).await;
    }

    #[async_test_all]
    async fn test_empty_event_does_not_emit_after_wallet_sync() {
        let (emitter, count) = emitter_with_counter(false).await;

        emitter.emit_synced(&wallet_only()).await;
        assert_eq!(count.load(Ordering::Relaxed), 1);

        // An empty pass is ignored even after the first `Synced`.
        emitter.emit_synced(&InternalSyncedEvent::default()).await;
        assert_eq!(count.load(Ordering::Relaxed), 1);

        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet_state: true,
                ..InternalSyncedEvent::default()
            })
            .await;
        assert_eq!(count.load(Ordering::Relaxed), 2);
    }
}