breez_sdk_spark/
events.rs

1use core::fmt;
2use std::{
3    collections::BTreeMap,
4    sync::atomic::{AtomicBool, AtomicU64, Ordering},
5};
6
7use platform_utils::time::Instant;
8use serde::Serialize;
9use tokio::sync::{Mutex, RwLock};
10use tracing::debug;
11use uuid::Uuid;
12
13use crate::{DepositInfo, LightningAddressInfo, Payment};
14
/// Events emitted by the SDK
///
/// Values of this enum are what `EventEmitter::emit` hands to listeners and
/// middleware. Each listener receives its own clone.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum SdkEvent {
    /// Emitted when the wallet has been synchronized with the network
    Synced,
    /// Emitted when the SDK was unable to claim deposits
    UnclaimedDeposits {
        /// The deposits that could not be claimed.
        unclaimed_deposits: Vec<DepositInfo>,
    },
    /// Carries a list of claimed deposits.
    // NOTE(review): the emit site is not in this file; presumably sent after a
    // successful claim — confirm against the caller.
    ClaimedDeposits {
        claimed_deposits: Vec<DepositInfo>,
    },
    /// A payment whose status is `PaymentStatus::Completed`
    /// (this is the mapping used by `SdkEvent::from_payment`).
    PaymentSucceeded {
        payment: Payment,
    },
    /// A payment whose status is `PaymentStatus::Pending`
    /// (this is the mapping used by `SdkEvent::from_payment`).
    PaymentPending {
        payment: Payment,
    },
    /// A payment whose status is `PaymentStatus::Failed`
    /// (this is the mapping used by `SdkEvent::from_payment`).
    PaymentFailed {
        payment: Payment,
    },
    /// Wraps an `OptimizationEvent` progress notification.
    Optimization {
        // Named with `optimization` prefix to avoid collision with `event` keyword in C#
        optimization_event: OptimizationEvent,
    },
    /// Carries the current lightning address information.
    // NOTE(review): `None` presumably means the address was removed — confirm.
    LightningAddressChanged {
        lightning_address: Option<LightningAddressInfo>,
    },
    /// Carries a list of newly seen deposits.
    // NOTE(review): the emit site is not in this file — confirm when this fires
    // relative to `ClaimedDeposits` / `UnclaimedDeposits`.
    NewDeposits {
        new_deposits: Vec<DepositInfo>,
    },
}
49
50impl SdkEvent {
51    pub(crate) fn from_payment(payment: Payment) -> Self {
52        match payment.status {
53            crate::PaymentStatus::Completed => SdkEvent::PaymentSucceeded { payment },
54            crate::PaymentStatus::Pending => SdkEvent::PaymentPending { payment },
55            crate::PaymentStatus::Failed => SdkEvent::PaymentFailed { payment },
56        }
57    }
58}
59
60impl fmt::Display for SdkEvent {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        match self {
63            SdkEvent::Synced => write!(f, "Synced"),
64            SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
65                write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
66            }
67            SdkEvent::ClaimedDeposits { claimed_deposits } => {
68                write!(f, "ClaimedDeposits: {claimed_deposits:?}")
69            }
70            SdkEvent::PaymentSucceeded { payment } => {
71                write!(f, "PaymentSucceeded: {payment:?}")
72            }
73            SdkEvent::PaymentPending { payment } => {
74                write!(f, "PaymentPending: {payment:?}")
75            }
76            SdkEvent::PaymentFailed { payment } => {
77                write!(f, "PaymentFailed: {payment:?}")
78            }
79            SdkEvent::Optimization {
80                optimization_event: event,
81            } => {
82                write!(f, "Optimization: {event:?}")
83            }
84            SdkEvent::LightningAddressChanged { lightning_address } => {
85                write!(f, "LightningAddressChanged: {lightning_address:?}")
86            }
87            SdkEvent::NewDeposits { new_deposits } => {
88                write!(f, "NewDeposits: {new_deposits:?}")
89            }
90        }
91    }
92}
93
/// Progress notification for an optimization run, carried by
/// `SdkEvent::Optimization`.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum OptimizationEvent {
    /// Optimization has started with the given number of rounds.
    Started { total_rounds: u32 },
    /// A round has completed.
    RoundCompleted {
        // NOTE(review): confirm whether `current_round` is 0- or 1-based; the
        // producer is not in this file.
        current_round: u32,
        total_rounds: u32,
    },
    /// Optimization completed successfully.
    Completed,
    /// Optimization was cancelled.
    Cancelled,
    /// Optimization failed with an error.
    Failed { error: String },
    /// Optimization was skipped because leaves are already optimal.
    Skipped,
}
113
/// Internal description of what a sync pass covered.
///
/// `EventEmitter::emit_synced` consumes these to decide whether a public
/// `SdkEvent::Synced` should be emitted.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Default)]
pub struct InternalSyncedEvent {
    /// The wallet was synced. The first public `Synced` event is held back until
    /// this has been seen.
    pub wallet: bool,
    pub wallet_state: bool,
    pub deposits: bool,
    pub lnurl_metadata: bool,
    /// `None` when the pass did not involve storage; otherwise a count, where `0`
    /// is treated as "nothing new" by [`any_non_zero`](Self::any_non_zero).
    // NOTE(review): presumably the number of incoming real-time-sync records —
    // confirm against the producer.
    pub storage_incoming: Option<u32>,
}

impl InternalSyncedEvent {
    /// Returns `true` if at least one flag is set or `storage_incoming` is `Some`,
    /// including `Some(0)`.
    pub fn any(&self) -> bool {
        let flags = [
            self.wallet,
            self.wallet_state,
            self.deposits,
            self.lnurl_metadata,
        ];
        flags.contains(&true) || self.storage_incoming.is_some()
    }

    /// Like [`any`](Self::any), but a `storage_incoming` of `Some(0)` does not count.
    pub fn any_non_zero(&self) -> bool {
        let flags = [
            self.wallet,
            self.wallet_state,
            self.deposits,
            self.lnurl_metadata,
        ];
        flags.contains(&true) || matches!(self.storage_incoming, Some(count) if count > 0)
    }

    /// Combines two events into a new one.
    ///
    /// Boolean flags are OR-ed. `storage_incoming` counts are added with saturation
    /// when both sides have one; otherwise whichever side is `Some` wins.
    pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
        let storage_incoming = match (self.storage_incoming, other.storage_incoming) {
            (Some(mine), Some(theirs)) => Some(mine.saturating_add(theirs)),
            (mine, theirs) => mine.or(theirs),
        };

        Self {
            wallet: self.wallet | other.wallet,
            wallet_state: self.wallet_state | other.wallet_state,
            deposits: self.deposits | other.deposits,
            lnurl_metadata: self.lnurl_metadata | other.lnurl_metadata,
            storage_incoming,
        }
    }
}
156
/// Trait for event listeners
///
/// Listeners are registered on an `EventEmitter`, either as internal or as
/// external listeners. `EventEmitter::emit` awaits each listener's `on_event`
/// one after another, so a slow listener delays the listeners that follow it.
#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
#[macros::async_trait]
pub trait EventListener: Send + Sync {
    /// Called when an event occurs
    ///
    /// The listener receives its own clone of the event.
    async fn on_event(&self, event: SdkEvent);
}
164
/// Middleware that can intercept and transform events before they reach external listeners.
///
/// Middleware processes events in a chain, in the order in which it was added to
/// the `EventEmitter`. Each middleware receives the event from the previous one and can:
/// - Pass it through unchanged: `Some(event)`
/// - Transform it: `Some(modified_event)`
/// - Suppress it: `None`
///
/// Once a middleware returns `None`, the remaining middleware are not run and no
/// external listener is called for that event. Internal listeners are not affected:
/// they have already seen the raw event by the time the chain runs.
#[macros::async_trait]
pub trait EventMiddleware: Send + Sync {
    /// Process an event. Return `Some` to forward (possibly modified), `None` to suppress.
    async fn process(&self, event: SdkEvent) -> Option<SdkEvent>;
}
177
/// Event publisher that manages event listeners and middleware.
///
/// Events flow through three phases:
/// 1. Internal listeners see raw events (SDK components like `wait_for_payment`)
/// 2. Middleware chain can transform or suppress events
/// 3. External listeners see processed events (client event handlers)
pub struct EventEmitter {
    /// When `true`, the first `Synced` event additionally waits for a sync that
    /// reports `storage_incoming`, unless real-time sync has been marked as failed.
    has_real_time_sync: bool,
    /// Set by `notify_rtsync_failed`; lifts the `storage_incoming` requirement above.
    rtsync_failed: AtomicBool,
    /// Monotonic counter embedded in generated listener IDs (shared by internal
    /// and external listeners).
    listener_index: AtomicU64,
    /// Internal listeners see ALL events before middleware processing
    internal_listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
    /// Middleware chain that can transform/suppress events
    middleware: RwLock<Vec<Box<dyn EventMiddleware>>>,
    /// External listeners see events after middleware processing
    external_listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
    /// `Some(accumulated)` while the first `Synced` event has not been released yet;
    /// `None` afterwards, at which point syncs are no longer buffered.
    synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
}
196
197impl EventEmitter {
198    /// Create a new event emitter
199    pub fn new(has_real_time_sync: bool) -> Self {
200        Self {
201            has_real_time_sync,
202            rtsync_failed: AtomicBool::new(false),
203            listener_index: AtomicU64::new(0),
204            internal_listeners: RwLock::new(BTreeMap::new()),
205            middleware: RwLock::new(Vec::new()),
206            external_listeners: RwLock::new(BTreeMap::new()),
207            synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
208        }
209    }
210
211    /// Add an external listener to receive events
212    ///
213    /// # Arguments
214    ///
215    /// * `listener` - The listener to add
216    ///
217    /// # Returns
218    ///
219    /// A unique identifier for the listener, which can be used to remove it later
220    pub async fn add_external_listener(&self, listener: Box<dyn EventListener>) -> String {
221        let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
222        let id = format!("listener_{}-{}", index, Uuid::new_v4());
223        let mut listeners = self.external_listeners.write().await;
224        listeners.insert(id.clone(), listener);
225        id
226    }
227
228    /// Remove an external listener by its ID
229    ///
230    /// # Arguments
231    ///
232    /// * `id` - The ID returned from `add_listener`
233    ///
234    /// # Returns
235    ///
236    /// `true` if the listener was found and removed, `false` otherwise
237    pub async fn remove_external_listener(&self, id: &str) -> bool {
238        let mut listeners = self.external_listeners.write().await;
239        listeners.remove(id).is_some()
240    }
241
242    /// Add an internal listener that sees all raw events before middleware processing.
243    ///
244    /// Used by SDK components (e.g., `wait_for_payment`) that need to observe events
245    /// that middleware may suppress.
246    pub async fn add_internal_listener(&self, listener: Box<dyn EventListener>) -> String {
247        let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
248        let id = format!("internal_{}-{}", index, Uuid::new_v4());
249        let mut listeners = self.internal_listeners.write().await;
250        listeners.insert(id.clone(), listener);
251        id
252    }
253
254    /// Remove an internal listener by its ID
255    pub async fn remove_internal_listener(&self, id: &str) -> bool {
256        let mut listeners = self.internal_listeners.write().await;
257        listeners.remove(id).is_some()
258    }
259
260    /// Add middleware to the event processing chain.
261    ///
262    /// Middleware can transform or suppress events before they reach external listeners.
263    pub async fn add_middleware(&self, middleware: Box<dyn EventMiddleware>) {
264        let mut mw = self.middleware.write().await;
265        mw.push(middleware);
266    }
267
268    /// Emit an event through the three-phase pipeline:
269    /// 1. Internal listeners see the raw event
270    /// 2. Middleware chain can transform or suppress
271    /// 3. External listeners see the processed event
272    pub async fn emit(&self, event: &SdkEvent) {
273        let start = Instant::now();
274
275        // Phase 1: Internal listeners see raw event
276        let internal = self.internal_listeners.read().await;
277        for listener in internal.values() {
278            listener.on_event(event.clone()).await;
279        }
280        drop(internal);
281
282        // Phase 2: Middleware chain
283        let mut event = Some(event.clone());
284        let middleware = self.middleware.read().await;
285        for mw in middleware.iter() {
286            if let Some(e) = event {
287                event = mw.process(e).await;
288            } else {
289                break;
290            }
291        }
292        drop(middleware);
293
294        // Phase 3: External listeners see processed event
295        if let Some(ref event) = event {
296            let listeners = self.external_listeners.read().await;
297            for listener in listeners.values() {
298                listener.on_event(event.clone()).await;
299            }
300        }
301
302        debug!("emit() completed in {:?}", start.elapsed());
303    }
304
305    pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
306        if !synced.any() {
307            // Nothing to emit
308            return;
309        }
310
311        let mut mtx = self.synced_event_buffer.lock().await;
312
313        let is_first_event = if let Some(buffered) = &*mtx {
314            let merged = buffered.merge(synced);
315
316            // The first synced event emitted should at least have the wallet synced.
317            // Subsequent events might have only partial syncs.
318            if merged.wallet
319                && (!self.has_real_time_sync
320                    || merged.storage_incoming.is_some()
321                    || self.rtsync_failed.load(Ordering::Relaxed))
322            {
323                *mtx = None;
324            } else {
325                *mtx = Some(merged);
326                return;
327            }
328
329            true
330        } else {
331            false
332        };
333
334        drop(mtx);
335
336        // Only emit zero real-time syncs on the first event.
337        if !is_first_event && !synced.any_non_zero() {
338            return;
339        }
340
341        // Emit the merged event
342        self.emit(&SdkEvent::Synced).await;
343    }
344
345    /// Notify that real-time sync has failed. If the first synced event is still
346    /// buffered and the wallet has already synced, release it immediately instead
347    /// of waiting for a remote pull that may never arrive.
348    pub async fn notify_rtsync_failed(&self) {
349        self.rtsync_failed.store(true, Ordering::Relaxed);
350
351        let mut mtx = self.synced_event_buffer.lock().await;
352        if let Some(buffered) = &*mtx
353            && buffered.wallet
354        {
355            *mtx = None;
356            drop(mtx);
357            self.emit(&SdkEvent::Synced).await;
358        }
359    }
360}
361
362impl Default for EventEmitter {
363    fn default() -> Self {
364        Self::new(false)
365    }
366}
367
368#[cfg(test)]
369mod tests {
370    use super::*;
371    use std::sync::Arc;
372    use std::sync::atomic::{AtomicBool, Ordering};
373
374    use macros::async_test_all;
375
376    #[cfg(feature = "browser-tests")]
377    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
378
    /// Listener that only remembers whether it has been called at least once.
    struct TestListener {
        /// Shared with the test body; set to `true` on the first event.
        received: Arc<AtomicBool>,
    }

    #[macros::async_trait]
    impl EventListener for TestListener {
        // The event itself is ignored: any event flips the flag.
        async fn on_event(&self, _event: SdkEvent) {
            self.received.store(true, Ordering::Relaxed);
        }
    }
389
390    #[async_test_all]
391    async fn test_event_emission() {
392        let emitter = EventEmitter::new(false);
393        let received = Arc::new(AtomicBool::new(false));
394
395        // Create the listener with a shared reference to the atomic boolean
396        let listener = Box::new(TestListener {
397            received: received.clone(),
398        });
399
400        let _ = emitter.add_external_listener(listener).await;
401
402        let event = SdkEvent::Synced {};
403
404        emitter.emit(&event).await;
405
406        // Check if event was received using the shared reference
407        assert!(received.load(Ordering::Relaxed));
408    }
409
410    #[async_test_all]
411    async fn test_remove_listener() {
412        let emitter = EventEmitter::new(false);
413
414        // Create shared atomic booleans to track event reception
415        let received1 = Arc::new(AtomicBool::new(false));
416        let received2 = Arc::new(AtomicBool::new(false));
417
418        // Create listeners with their own shared references
419        let listener1 = Box::new(TestListener {
420            received: received1.clone(),
421        });
422
423        let listener2 = Box::new(TestListener {
424            received: received2.clone(),
425        });
426
427        let id1 = emitter.add_external_listener(listener1).await;
428        let id2 = emitter.add_external_listener(listener2).await;
429
430        // Remove the first listener
431        assert!(emitter.remove_external_listener(&id1).await);
432
433        // Emit an event
434        let event = SdkEvent::Synced {};
435        emitter.emit(&event).await;
436
437        // The first listener should not receive the event
438        assert!(!received1.load(Ordering::Relaxed));
439
440        // The second listener should receive the event
441        assert!(received2.load(Ordering::Relaxed));
442
443        // Remove the second listener
444        assert!(emitter.remove_external_listener(&id2).await);
445
446        // Try to remove a non-existent listener
447        assert!(!emitter.remove_external_listener("non-existent-id").await);
448    }
449
450    #[async_test_all]
451    async fn test_synced_event_only_emitted_with_wallet_sync() {
452        let emitter = EventEmitter::new(false);
453        let received = Arc::new(AtomicBool::new(false));
454
455        let listener = Box::new(TestListener {
456            received: received.clone(),
457        });
458
459        emitter.add_external_listener(listener).await;
460
461        // Emit synced event without wallet sync - should NOT emit Synced
462        emitter
463            .emit_synced(&InternalSyncedEvent {
464                wallet: false,
465                wallet_state: true,
466                deposits: true,
467                lnurl_metadata: true,
468                storage_incoming: None,
469            })
470            .await;
471
472        assert!(!received.load(Ordering::Relaxed));
473
474        // Emit synced event with wallet sync - should emit Synced
475        emitter
476            .emit_synced(&InternalSyncedEvent {
477                wallet: true,
478                wallet_state: false,
479                deposits: false,
480                lnurl_metadata: false,
481                storage_incoming: Some(1),
482            })
483            .await;
484
485        assert!(received.load(Ordering::Relaxed));
486    }
487
488    #[async_test_all]
489    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
490        let emitter = EventEmitter::new(true);
491        let received = Arc::new(AtomicBool::new(false));
492
493        let listener = Box::new(TestListener {
494            received: received.clone(),
495        });
496
497        emitter.add_external_listener(listener).await;
498
499        // Emit synced event with storage
500        emitter
501            .emit_synced(&InternalSyncedEvent {
502                wallet: false,
503                wallet_state: false,
504                deposits: false,
505                lnurl_metadata: false,
506                storage_incoming: Some(0),
507            })
508            .await;
509
510        assert!(!received.load(Ordering::Relaxed));
511
512        // Emit synced event with wallet sync - should emit Synced
513        emitter
514            .emit_synced(&InternalSyncedEvent {
515                wallet: true,
516                wallet_state: false,
517                deposits: false,
518                lnurl_metadata: false,
519                storage_incoming: None,
520            })
521            .await;
522
523        assert!(received.load(Ordering::Relaxed));
524    }
525
526    #[async_test_all]
527    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
528     {
529        let emitter = EventEmitter::new(true);
530        let received = Arc::new(AtomicBool::new(false));
531
532        let listener = Box::new(TestListener {
533            received: received.clone(),
534        });
535
536        emitter.add_external_listener(listener).await;
537
538        // Emit synced event with wallet sync
539        emitter
540            .emit_synced(&InternalSyncedEvent {
541                wallet: true,
542                wallet_state: false,
543                deposits: false,
544                lnurl_metadata: false,
545                storage_incoming: None,
546            })
547            .await;
548
549        assert!(!received.load(Ordering::Relaxed));
550
551        // Emit synced event with storage - should emit Synced
552        emitter
553            .emit_synced(&InternalSyncedEvent {
554                wallet: false,
555                wallet_state: false,
556                deposits: false,
557                lnurl_metadata: false,
558                storage_incoming: Some(0),
559            })
560            .await;
561
562        assert!(received.load(Ordering::Relaxed));
563    }
564
565    #[async_test_all]
566    async fn test_rtsync_failed_emits_synced_on_wallet_alone() {
567        let emitter = EventEmitter::new(true);
568        let received = Arc::new(AtomicBool::new(false));
569
570        let listener = Box::new(TestListener {
571            received: received.clone(),
572        });
573
574        emitter.add_external_listener(listener).await;
575
576        // Wallet synced but rtsync hasn't failed yet — should NOT emit
577        emitter
578            .emit_synced(&InternalSyncedEvent {
579                wallet: true,
580                wallet_state: false,
581                deposits: false,
582                lnurl_metadata: false,
583                storage_incoming: None,
584            })
585            .await;
586
587        assert!(!received.load(Ordering::Relaxed));
588
589        // rtsync fails — should immediately release the buffered event
590        emitter.notify_rtsync_failed().await;
591
592        assert!(received.load(Ordering::Relaxed));
593    }
594
595    #[async_test_all]
596    async fn test_rtsync_failed_before_wallet_sync_emits_on_wallet() {
597        let emitter = EventEmitter::new(true);
598        let received = Arc::new(AtomicBool::new(false));
599
600        let listener = Box::new(TestListener {
601            received: received.clone(),
602        });
603
604        emitter.add_external_listener(listener).await;
605
606        // rtsync fails before wallet syncs — nothing to release yet
607        emitter.notify_rtsync_failed().await;
608
609        assert!(!received.load(Ordering::Relaxed));
610
611        // Wallet syncs — should emit immediately (rtsync already marked failed)
612        emitter
613            .emit_synced(&InternalSyncedEvent {
614                wallet: true,
615                wallet_state: false,
616                deposits: false,
617                lnurl_metadata: false,
618                storage_incoming: None,
619            })
620            .await;
621
622        assert!(received.load(Ordering::Relaxed));
623    }
624
625    #[async_test_all]
626    async fn test_synced_event_buffers_until_wallet_sync() {
627        let emitter = EventEmitter::new(false);
628        let received = Arc::new(AtomicBool::new(false));
629
630        let listener = Box::new(TestListener {
631            received: received.clone(),
632        });
633
634        emitter.add_external_listener(listener).await;
635
636        // Emit multiple partial syncs without wallet sync
637        emitter
638            .emit_synced(&InternalSyncedEvent {
639                wallet: false,
640                wallet_state: true,
641                deposits: false,
642                lnurl_metadata: false,
643                storage_incoming: None,
644            })
645            .await;
646
647        assert!(!received.load(Ordering::Relaxed));
648
649        emitter
650            .emit_synced(&InternalSyncedEvent {
651                wallet: false,
652                wallet_state: false,
653                deposits: true,
654                lnurl_metadata: false,
655                storage_incoming: None,
656            })
657            .await;
658
659        assert!(!received.load(Ordering::Relaxed));
660
661        emitter
662            .emit_synced(&InternalSyncedEvent {
663                wallet: false,
664                wallet_state: false,
665                deposits: false,
666                lnurl_metadata: true,
667                storage_incoming: None,
668            })
669            .await;
670
671        assert!(!received.load(Ordering::Relaxed));
672
673        emitter
674            .emit_synced(&InternalSyncedEvent {
675                wallet: false,
676                wallet_state: false,
677                deposits: false,
678                lnurl_metadata: false,
679                storage_incoming: None,
680            })
681            .await;
682
683        assert!(!received.load(Ordering::Relaxed));
684
685        // Finally emit wallet sync - should emit Synced
686        emitter
687            .emit_synced(&InternalSyncedEvent {
688                wallet: true,
689                wallet_state: false,
690                deposits: false,
691                lnurl_metadata: false,
692                storage_incoming: None,
693            })
694            .await;
695
696        assert!(received.load(Ordering::Relaxed));
697    }
698
699    #[async_test_all]
700    async fn test_synced_event_all_true() {
701        let emitter = EventEmitter::new(false);
702        let received = Arc::new(AtomicBool::new(false));
703
704        let listener = Box::new(TestListener {
705            received: received.clone(),
706        });
707
708        emitter.add_external_listener(listener).await;
709
710        // Emit synced event with wallet and other components - should emit Synced
711        emitter
712            .emit_synced(&InternalSyncedEvent {
713                wallet: true,
714                wallet_state: true,
715                deposits: true,
716                lnurl_metadata: true,
717                storage_incoming: Some(1),
718            })
719            .await;
720
721        assert!(received.load(Ordering::Relaxed));
722    }
723
724    #[async_test_all]
725    async fn test_synced_event_empty_does_not_emit() {
726        let emitter = EventEmitter::new(false);
727        let received = Arc::new(AtomicBool::new(false));
728
729        let listener = Box::new(TestListener {
730            received: received.clone(),
731        });
732
733        emitter.add_external_listener(listener).await;
734
735        // Emit empty synced event - should NOT emit Synced
736        emitter
737            .emit_synced(&InternalSyncedEvent {
738                wallet: false,
739                wallet_state: false,
740                deposits: false,
741                lnurl_metadata: false,
742                storage_incoming: None,
743            })
744            .await;
745
746        assert!(!received.load(Ordering::Relaxed));
747    }
748
749    #[async_test_all]
750    async fn test_subsequent_syncs_after_wallet_emit_immediately() {
751        use std::sync::atomic::AtomicUsize;
752
753        struct CountingListener {
754            count: Arc<AtomicUsize>,
755        }
756
757        #[macros::async_trait]
758        impl EventListener for CountingListener {
759            async fn on_event(&self, event: SdkEvent) {
760                if matches!(event, SdkEvent::Synced) {
761                    self.count.fetch_add(1, Ordering::Relaxed);
762                }
763            }
764        }
765
766        let emitter = EventEmitter::new(true);
767        let count = Arc::new(AtomicUsize::new(0));
768
769        let listener = Box::new(CountingListener {
770            count: count.clone(),
771        });
772
773        emitter.add_external_listener(listener).await;
774
775        // First sync with wallet - should emit
776        emitter
777            .emit_synced(&InternalSyncedEvent {
778                wallet: true,
779                wallet_state: false,
780                deposits: false,
781                lnurl_metadata: false,
782                storage_incoming: Some(0),
783            })
784            .await;
785
786        assert_eq!(count.load(Ordering::Relaxed), 1);
787
788        // Subsequent partial sync without wallet - should emit (buffer cleared after first wallet sync)
789        emitter
790            .emit_synced(&InternalSyncedEvent {
791                wallet: false,
792                wallet_state: true,
793                deposits: false,
794                lnurl_metadata: false,
795                storage_incoming: None,
796            })
797            .await;
798
799        assert_eq!(count.load(Ordering::Relaxed), 2);
800
801        // Another partial sync - should emit
802        emitter
803            .emit_synced(&InternalSyncedEvent {
804                wallet: false,
805                wallet_state: false,
806                deposits: true,
807                lnurl_metadata: false,
808                storage_incoming: None,
809            })
810            .await;
811
812        assert_eq!(count.load(Ordering::Relaxed), 3);
813
814        emitter
815            .emit_synced(&InternalSyncedEvent {
816                wallet: false,
817                wallet_state: false,
818                deposits: false,
819                lnurl_metadata: true,
820                storage_incoming: None,
821            })
822            .await;
823
824        assert_eq!(count.load(Ordering::Relaxed), 4);
825
826        emitter
827            .emit_synced(&InternalSyncedEvent {
828                wallet: false,
829                wallet_state: false,
830                deposits: false,
831                lnurl_metadata: false,
832                storage_incoming: Some(1),
833            })
834            .await;
835
836        assert_eq!(count.load(Ordering::Relaxed), 5);
837
838        // storage_incoming with Some(0) - should NOT emit after first sync
839        emitter
840            .emit_synced(&InternalSyncedEvent {
841                wallet: false,
842                wallet_state: false,
843                deposits: false,
844                lnurl_metadata: false,
845                storage_incoming: Some(0),
846            })
847            .await;
848
849        assert_eq!(count.load(Ordering::Relaxed), 5);
850
851        emitter
852            .emit_synced(&InternalSyncedEvent {
853                wallet: true,
854                wallet_state: false,
855                deposits: false,
856                lnurl_metadata: false,
857                storage_incoming: None,
858            })
859            .await;
860
861        assert_eq!(count.load(Ordering::Relaxed), 6);
862    }
863
864    // ── Helpers for middleware / internal listener tests ──
865
866    /// Listener that records all received events
867    struct RecordingListener {
868        events: Arc<Mutex<Vec<String>>>,
869    }
870
871    impl RecordingListener {
872        fn new() -> (Self, Arc<Mutex<Vec<String>>>) {
873            let events = Arc::new(Mutex::new(Vec::new()));
874            (
875                Self {
876                    events: events.clone(),
877                },
878                events,
879            )
880        }
881    }
882
883    #[macros::async_trait]
884    impl EventListener for RecordingListener {
885        async fn on_event(&self, event: SdkEvent) {
886            self.events.lock().await.push(format!("{event}"));
887        }
888    }
889
890    /// Middleware that suppresses all Synced events
891    struct SuppressSyncedMiddleware;
892
893    #[macros::async_trait]
894    impl EventMiddleware for SuppressSyncedMiddleware {
895        async fn process(&self, event: SdkEvent) -> Option<SdkEvent> {
896            match event {
897                SdkEvent::Synced => None,
898                other => Some(other),
899            }
900        }
901    }
902
903    /// Middleware that replaces `PaymentSucceeded` with `PaymentPending`
904    struct DowngradePaymentMiddleware;
905
906    #[macros::async_trait]
907    impl EventMiddleware for DowngradePaymentMiddleware {
908        async fn process(&self, event: SdkEvent) -> Option<SdkEvent> {
909            match event {
910                SdkEvent::PaymentSucceeded { payment } => {
911                    Some(SdkEvent::PaymentPending { payment })
912                }
913                other => Some(other),
914            }
915        }
916    }
917
    /// Middleware that suppresses all events
    struct SuppressAllMiddleware;

    #[macros::async_trait]
    impl EventMiddleware for SuppressAllMiddleware {
        // Returning `None` unconditionally means nothing reaches external listeners.
        async fn process(&self, _event: SdkEvent) -> Option<SdkEvent> {
            None
        }
    }
927
928    fn test_payment() -> Payment {
929        Payment {
930            id: "test-id".to_string(),
931            payment_type: crate::PaymentType::Receive,
932            status: crate::PaymentStatus::Completed,
933            amount: 1000,
934            fees: 10,
935            timestamp: 123_456,
936            method: crate::PaymentMethod::Spark,
937            details: None,
938            conversion_details: None,
939        }
940    }
941
942    // ── Internal listener tests ──
943
944    #[async_test_all]
945    async fn test_internal_listener_receives_events() {
946        let emitter = EventEmitter::new(false);
947        let (listener, events) = RecordingListener::new();
948
949        emitter.add_internal_listener(Box::new(listener)).await;
950
951        emitter.emit(&SdkEvent::Synced).await;
952
953        let log = events.lock().await;
954        assert_eq!(log.len(), 1);
955        assert!(log[0].contains("Synced"));
956    }
957
958    #[async_test_all]
959    async fn test_remove_internal_listener() {
960        let emitter = EventEmitter::new(false);
961        let (listener, events) = RecordingListener::new();
962
963        let id = emitter.add_internal_listener(Box::new(listener)).await;
964
965        assert!(emitter.remove_internal_listener(&id).await);
966        assert!(!emitter.remove_internal_listener(&id).await);
967
968        emitter.emit(&SdkEvent::Synced).await;
969
970        assert!(events.lock().await.is_empty());
971    }
972
973    // ── Middleware tests ──
974
975    #[async_test_all]
976    async fn test_middleware_suppresses_event_for_external() {
977        let emitter = EventEmitter::new(false);
978        let (ext, ext_events) = RecordingListener::new();
979
980        emitter.add_external_listener(Box::new(ext)).await;
981        emitter
982            .add_middleware(Box::new(SuppressSyncedMiddleware))
983            .await;
984
985        emitter.emit(&SdkEvent::Synced).await;
986
987        // External should NOT see the suppressed event
988        assert!(ext_events.lock().await.is_empty());
989    }
990
991    #[async_test_all]
992    async fn test_middleware_transforms_event() {
993        let emitter = EventEmitter::new(false);
994        let (ext, ext_events) = RecordingListener::new();
995
996        emitter.add_external_listener(Box::new(ext)).await;
997        emitter
998            .add_middleware(Box::new(DowngradePaymentMiddleware))
999            .await;
1000
1001        let event = SdkEvent::PaymentSucceeded {
1002            payment: test_payment(),
1003        };
1004        emitter.emit(&event).await;
1005
1006        let log = ext_events.lock().await;
1007        assert_eq!(log.len(), 1);
1008        assert!(log[0].contains("PaymentPending"));
1009    }
1010
1011    #[async_test_all]
1012    async fn test_middleware_passthrough_unmatched_events() {
1013        let emitter = EventEmitter::new(false);
1014        let (ext, ext_events) = RecordingListener::new();
1015
1016        emitter.add_external_listener(Box::new(ext)).await;
1017        emitter
1018            .add_middleware(Box::new(SuppressSyncedMiddleware))
1019            .await;
1020
1021        // Synced is suppressed, PaymentSucceeded passes through
1022        emitter.emit(&SdkEvent::Synced).await;
1023        let event = SdkEvent::PaymentSucceeded {
1024            payment: test_payment(),
1025        };
1026        emitter.emit(&event).await;
1027
1028        let log = ext_events.lock().await;
1029        assert_eq!(log.len(), 1);
1030        assert!(log[0].contains("PaymentSucceeded"));
1031    }
1032
1033    #[async_test_all]
1034    async fn test_middleware_chain_ordering() {
1035        let emitter = EventEmitter::new(false);
1036        let (ext, ext_events) = RecordingListener::new();
1037
1038        emitter.add_external_listener(Box::new(ext)).await;
1039
1040        // First middleware transforms PaymentSucceeded → PaymentPending
1041        emitter
1042            .add_middleware(Box::new(DowngradePaymentMiddleware))
1043            .await;
1044        // Second middleware suppresses Synced (doesn't affect PaymentPending)
1045        emitter
1046            .add_middleware(Box::new(SuppressSyncedMiddleware))
1047            .await;
1048
1049        let event = SdkEvent::PaymentSucceeded {
1050            payment: test_payment(),
1051        };
1052        emitter.emit(&event).await;
1053
1054        let log = ext_events.lock().await;
1055        assert_eq!(log.len(), 1);
1056        assert!(log[0].contains("PaymentPending"));
1057    }
1058
1059    #[async_test_all]
1060    async fn test_suppress_all_middleware_stops_chain() {
1061        let emitter = EventEmitter::new(false);
1062        let (ext, ext_events) = RecordingListener::new();
1063
1064        emitter.add_external_listener(Box::new(ext)).await;
1065
1066        // SuppressAll first — nothing should reach the next middleware or external
1067        emitter
1068            .add_middleware(Box::new(SuppressAllMiddleware))
1069            .await;
1070        emitter
1071            .add_middleware(Box::new(DowngradePaymentMiddleware))
1072            .await;
1073
1074        emitter.emit(&SdkEvent::Synced).await;
1075        let event = SdkEvent::PaymentSucceeded {
1076            payment: test_payment(),
1077        };
1078        emitter.emit(&event).await;
1079
1080        assert!(ext_events.lock().await.is_empty());
1081    }
1082
1083    // ── Three-phase flow tests ──
1084
1085    #[async_test_all]
1086    async fn test_three_phase_emit_flow() {
1087        let emitter = EventEmitter::new(false);
1088        let (int, int_events) = RecordingListener::new();
1089        let (ext, ext_events) = RecordingListener::new();
1090
1091        emitter.add_internal_listener(Box::new(int)).await;
1092        emitter.add_external_listener(Box::new(ext)).await;
1093        emitter
1094            .add_middleware(Box::new(SuppressSyncedMiddleware))
1095            .await;
1096
1097        // Synced: internal sees it, middleware suppresses it, external doesn't
1098        emitter.emit(&SdkEvent::Synced).await;
1099
1100        assert_eq!(int_events.lock().await.len(), 1);
1101        assert!(ext_events.lock().await.is_empty());
1102
1103        // PaymentSucceeded: both see it (middleware passes it through)
1104        let event = SdkEvent::PaymentSucceeded {
1105            payment: test_payment(),
1106        };
1107        emitter.emit(&event).await;
1108
1109        assert_eq!(int_events.lock().await.len(), 2);
1110        assert_eq!(ext_events.lock().await.len(), 1);
1111    }
1112
1113    #[async_test_all]
1114    async fn test_internal_sees_raw_event_external_sees_transformed() {
1115        let emitter = EventEmitter::new(false);
1116        let (int, int_events) = RecordingListener::new();
1117        let (ext, ext_events) = RecordingListener::new();
1118
1119        emitter.add_internal_listener(Box::new(int)).await;
1120        emitter.add_external_listener(Box::new(ext)).await;
1121        emitter
1122            .add_middleware(Box::new(DowngradePaymentMiddleware))
1123            .await;
1124
1125        let event = SdkEvent::PaymentSucceeded {
1126            payment: test_payment(),
1127        };
1128        emitter.emit(&event).await;
1129
1130        let int_log = int_events.lock().await;
1131        let ext_log = ext_events.lock().await;
1132
1133        // Internal sees the original PaymentSucceeded
1134        assert_eq!(int_log.len(), 1);
1135        assert!(int_log[0].contains("PaymentSucceeded"));
1136
1137        // External sees the transformed PaymentPending
1138        assert_eq!(ext_log.len(), 1);
1139        assert!(ext_log[0].contains("PaymentPending"));
1140    }
1141
1142    #[async_test_all]
1143    async fn test_no_listeners_no_middleware_does_not_panic() {
1144        let emitter = EventEmitter::new(false);
1145        emitter.emit(&SdkEvent::Synced).await;
1146        // Should not panic
1147    }
1148
1149    #[async_test_all]
1150    async fn test_empty_event_does_not_emit_after_wallet_sync() {
1151        use std::sync::atomic::AtomicUsize;
1152
1153        struct CountingListener {
1154            count: Arc<AtomicUsize>,
1155        }
1156
1157        #[macros::async_trait]
1158        impl EventListener for CountingListener {
1159            async fn on_event(&self, event: SdkEvent) {
1160                if matches!(event, SdkEvent::Synced) {
1161                    self.count.fetch_add(1, Ordering::Relaxed);
1162                }
1163            }
1164        }
1165
1166        let emitter = EventEmitter::new(false);
1167        let count = Arc::new(AtomicUsize::new(0));
1168
1169        let listener = Box::new(CountingListener {
1170            count: count.clone(),
1171        });
1172
1173        emitter.add_external_listener(listener).await;
1174
1175        // First sync with wallet - should emit
1176        emitter
1177            .emit_synced(&InternalSyncedEvent {
1178                wallet: true,
1179                wallet_state: false,
1180                deposits: false,
1181                lnurl_metadata: false,
1182                storage_incoming: None,
1183            })
1184            .await;
1185
1186        assert_eq!(count.load(Ordering::Relaxed), 1);
1187
1188        // Empty sync after wallet sync - should NOT emit (all fields false)
1189        emitter
1190            .emit_synced(&InternalSyncedEvent {
1191                wallet: false,
1192                wallet_state: false,
1193                deposits: false,
1194                lnurl_metadata: false,
1195                storage_incoming: None,
1196            })
1197            .await;
1198
1199        assert_eq!(count.load(Ordering::Relaxed), 1); // Count should remain 1
1200
1201        // Another non-empty sync - should emit
1202        emitter
1203            .emit_synced(&InternalSyncedEvent {
1204                wallet: false,
1205                wallet_state: true,
1206                deposits: false,
1207                lnurl_metadata: false,
1208                storage_incoming: None,
1209            })
1210            .await;
1211
1212        assert_eq!(count.load(Ordering::Relaxed), 2); // Now count should be 2
1213    }
1214}