Skip to main content

breez_sdk_spark/
events.rs

1use core::fmt;
2use std::{
3    collections::BTreeMap,
4    sync::atomic::{AtomicBool, AtomicU64, Ordering},
5};
6
7use platform_utils::time::Instant;
8use serde::Serialize;
9use tokio::sync::{Mutex, RwLock};
10use tracing::info;
11use uuid::Uuid;
12
13use crate::{DepositInfo, LightningAddressInfo, Payment, sdk::RuntimeEvent};
14
/// Events emitted by the SDK
///
/// Every variant is rendered by the `Display` impl as the variant name followed by the
/// `Debug` form of its payload (if any).
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum SdkEvent {
    /// Emitted when the wallet has been synchronized with the network
    Synced,
    /// Emitted when the SDK was unable to claim deposits
    UnclaimedDeposits {
        unclaimed_deposits: Vec<DepositInfo>,
    },
    /// Carries a list of claimed deposits.
    // NOTE(review): presumably emitted after deposits were claimed successfully — confirm
    // against the emitting code, which is not part of this file.
    ClaimedDeposits {
        claimed_deposits: Vec<DepositInfo>,
    },
    /// Produced by `SdkEvent::from_payment` for a payment whose status is `Completed`.
    PaymentSucceeded {
        payment: Payment,
    },
    /// Produced by `SdkEvent::from_payment` for a payment whose status is `Pending`.
    PaymentPending {
        payment: Payment,
    },
    /// Produced by `SdkEvent::from_payment` for a payment whose status is `Failed`.
    PaymentFailed {
        payment: Payment,
    },
    /// Wraps an [`OptimizationEvent`] progress notification.
    Optimization {
        // Named with `optimization` prefix to avoid collision with `event` keyword in C#
        optimization_event: OptimizationEvent,
    },
    /// Carries the current lightning address, or `None`.
    // NOTE(review): `None` presumably means the address was removed — confirm with the emitter.
    LightningAddressChanged {
        lightning_address: Option<LightningAddressInfo>,
    },
    /// Carries a list of newly seen deposits.
    // NOTE(review): exact trigger is not visible in this file — confirm with the emitter.
    NewDeposits {
        new_deposits: Vec<DepositInfo>,
    },
}
49
50impl SdkEvent {
51    pub(crate) fn from_payment(payment: Payment) -> Self {
52        match payment.status {
53            crate::PaymentStatus::Completed => SdkEvent::PaymentSucceeded { payment },
54            crate::PaymentStatus::Pending => SdkEvent::PaymentPending { payment },
55            crate::PaymentStatus::Failed => SdkEvent::PaymentFailed { payment },
56        }
57    }
58}
59
60impl fmt::Display for SdkEvent {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        match self {
63            SdkEvent::Synced => write!(f, "Synced"),
64            SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
65                write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
66            }
67            SdkEvent::ClaimedDeposits { claimed_deposits } => {
68                write!(f, "ClaimedDeposits: {claimed_deposits:?}")
69            }
70            SdkEvent::PaymentSucceeded { payment } => {
71                write!(f, "PaymentSucceeded: {payment:?}")
72            }
73            SdkEvent::PaymentPending { payment } => {
74                write!(f, "PaymentPending: {payment:?}")
75            }
76            SdkEvent::PaymentFailed { payment } => {
77                write!(f, "PaymentFailed: {payment:?}")
78            }
79            SdkEvent::Optimization {
80                optimization_event: event,
81            } => {
82                write!(f, "Optimization: {event:?}")
83            }
84            SdkEvent::LightningAddressChanged { lightning_address } => {
85                write!(f, "LightningAddressChanged: {lightning_address:?}")
86            }
87            SdkEvent::NewDeposits { new_deposits } => {
88                write!(f, "NewDeposits: {new_deposits:?}")
89            }
90        }
91    }
92}
93
/// Progress notifications for an optimization run, delivered to listeners wrapped in
/// `SdkEvent::Optimization`.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum OptimizationEvent {
    /// Optimization has started with the given number of rounds.
    Started { total_rounds: u32 },
    /// A round has completed.
    // NOTE(review): whether `current_round` is zero- or one-based is not visible here — confirm.
    RoundCompleted {
        current_round: u32,
        total_rounds: u32,
    },
    /// Optimization completed successfully.
    Completed,
    /// Optimization was cancelled.
    Cancelled,
    /// Optimization failed with an error.
    Failed { error: String },
    /// Optimization was skipped because leaves are already optimal.
    Skipped,
}
113
/// Summary of what a single sync pass touched.
///
/// The four booleans are independent flags. `storage_incoming` is optional: `None` and
/// `Some(0)` are treated differently by [`InternalSyncedEvent::any`] and
/// [`InternalSyncedEvent::any_non_zero`].
// NOTE(review): `storage_incoming` presumably counts records pulled from remote storage —
// confirm with the code that fills it in.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Default)]
pub struct InternalSyncedEvent {
    pub wallet: bool,
    pub wallet_state: bool,
    pub deposits: bool,
    pub lnurl_metadata: bool,
    pub storage_incoming: Option<u32>,
}

impl InternalSyncedEvent {
    /// True when at least one of the four boolean flags is set.
    fn any_flag_set(&self) -> bool {
        self.wallet || self.wallet_state || self.deposits || self.lnurl_metadata
    }

    /// Returns `true` when any flag is set or `storage_incoming` holds a value,
    /// including `Some(0)`.
    pub fn any(&self) -> bool {
        self.any_flag_set() || self.storage_incoming.is_some()
    }

    /// Like [`Self::any`], except that `storage_incoming` only counts when it is
    /// strictly greater than zero.
    pub fn any_non_zero(&self) -> bool {
        self.any_flag_set() || matches!(self.storage_incoming, Some(count) if count > 0)
    }

    /// Combines two events into a new one.
    ///
    /// Flags are OR-ed together. When both sides carry a `storage_incoming` value the
    /// two are added (saturating at `u32::MAX`); otherwise whichever side has a value
    /// wins, and the result is `None` only when both are `None`.
    pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
        let storage_incoming = match (self.storage_incoming, other.storage_incoming) {
            (Some(mine), Some(theirs)) => Some(mine.saturating_add(theirs)),
            (mine, theirs) => mine.or(theirs),
        };

        Self {
            wallet: self.wallet || other.wallet,
            wallet_state: self.wallet_state || other.wallet_state,
            deposits: self.deposits || other.deposits,
            lnurl_metadata: self.lnurl_metadata || other.lnurl_metadata,
            storage_incoming,
        }
    }
}
156
/// Trait for event listeners
///
/// Listeners are registered with `EventEmitter` as boxed trait objects and receive their
/// own clone of every event delivered to them.
#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
#[macros::async_trait]
pub trait EventListener: Send + Sync {
    /// Called when an event occurs
    ///
    /// `EventEmitter::emit` awaits each listener in turn, so a slow implementation delays
    /// delivery to the listeners that follow it.
    async fn on_event(&self, event: SdkEvent);
}
164
/// Middleware that can intercept and transform events before they reach external listeners.
///
/// Middleware processes events in a chain. Each middleware receives the event from the
/// previous one and can:
/// - Pass it through unchanged: `Some(event)`
/// - Transform it: `Some(modified_event)`
/// - Suppress it: `None`
///
/// Once one middleware suppresses an event, `EventEmitter::emit` stops walking the chain:
/// later middleware are not invoked and external listeners are not notified. Internal
/// listeners have already seen the raw event by then.
#[macros::async_trait]
pub trait EventMiddleware: Send + Sync {
    /// Process an event. Return `Some` to forward (possibly modified), `None` to suppress.
    async fn process(&self, event: SdkEvent) -> Option<SdkEvent>;
}
177
/// Event publisher that manages event listeners and middleware.
///
/// Events flow through three phases:
/// 1. Internal listeners see raw events (SDK components like `wait_for_payment`)
/// 2. Middleware chain can transform or suppress events
/// 3. External listeners see processed events (client event handlers)
pub struct EventEmitter {
    /// When `true`, the first `Synced` event is additionally held back until a sync reports
    /// a `storage_incoming` value or `rtsync_failed` is set (see `emit_synced`).
    has_real_time_sync: bool,
    /// Set to `true` by `notify_rtsync_failed`; read by `emit_synced`.
    rtsync_failed: AtomicBool,
    /// Monotonic counter embedded in the ids handed out for internal and external listeners.
    listener_index: AtomicU64,
    /// Handlers invoked, in registration order, by `emit_runtime_event`.
    runtime_event_handlers: RwLock<Vec<Box<dyn RuntimeEventHandler>>>,
    /// Internal listeners see ALL events before middleware processing
    internal_listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
    /// Middleware chain that can transform/suppress events
    middleware: RwLock<Vec<Box<dyn EventMiddleware>>>,
    /// External listeners see events after middleware processing
    external_listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
    /// `Some` while the first `Synced` event has not been emitted yet, accumulating the
    /// merged sync flags seen so far; `None` once that first event has been released.
    synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
}
197
/// Crate-internal handler for `RuntimeEvent`s.
///
/// Handlers are registered through `EventEmitter::add_runtime_event_handler` and are
/// awaited one after another by `EventEmitter::emit_runtime_event`.
#[macros::async_trait]
pub(crate) trait RuntimeEventHandler: Send + Sync {
    /// Called with a clone of every runtime event that is emitted.
    async fn handle(&self, event: RuntimeEvent);
}
202
203impl EventEmitter {
204    /// Create a new event emitter
205    pub fn new(has_real_time_sync: bool) -> Self {
206        Self {
207            has_real_time_sync,
208            rtsync_failed: AtomicBool::new(false),
209            listener_index: AtomicU64::new(0),
210            runtime_event_handlers: RwLock::new(Vec::new()),
211            internal_listeners: RwLock::new(BTreeMap::new()),
212            middleware: RwLock::new(Vec::new()),
213            external_listeners: RwLock::new(BTreeMap::new()),
214            synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
215        }
216    }
217
218    /// Add an external listener to receive events
219    ///
220    /// # Arguments
221    ///
222    /// * `listener` - The listener to add
223    ///
224    /// # Returns
225    ///
226    /// A unique identifier for the listener, which can be used to remove it later
227    pub async fn add_external_listener(&self, listener: Box<dyn EventListener>) -> String {
228        let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
229        let id = format!("listener_{}-{}", index, Uuid::new_v4());
230        let mut listeners = self.external_listeners.write().await;
231        listeners.insert(id.clone(), listener);
232        id
233    }
234
235    /// Remove an external listener by its ID
236    ///
237    /// # Arguments
238    ///
239    /// * `id` - The ID returned from `add_listener`
240    ///
241    /// # Returns
242    ///
243    /// `true` if the listener was found and removed, `false` otherwise
244    pub async fn remove_external_listener(&self, id: &str) -> bool {
245        let mut listeners = self.external_listeners.write().await;
246        listeners.remove(id).is_some()
247    }
248
249    /// Add an internal listener that sees all raw events before middleware processing.
250    ///
251    /// Used by SDK components (e.g., `wait_for_payment`) that need to observe events
252    /// that middleware may suppress.
253    pub async fn add_internal_listener(&self, listener: Box<dyn EventListener>) -> String {
254        let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
255        let id = format!("internal_{}-{}", index, Uuid::new_v4());
256        let mut listeners = self.internal_listeners.write().await;
257        listeners.insert(id.clone(), listener);
258        id
259    }
260
261    /// Remove an internal listener by its ID
262    pub async fn remove_internal_listener(&self, id: &str) -> bool {
263        let mut listeners = self.internal_listeners.write().await;
264        listeners.remove(id).is_some()
265    }
266
267    /// Add middleware to the event processing chain.
268    ///
269    /// Middleware can transform or suppress events before they reach external listeners.
270    pub async fn add_middleware(&self, middleware: Box<dyn EventMiddleware>) {
271        let mut mw = self.middleware.write().await;
272        mw.push(middleware);
273    }
274
275    pub(crate) async fn add_runtime_event_handler(&self, handler: Box<dyn RuntimeEventHandler>) {
276        let mut handlers = self.runtime_event_handlers.write().await;
277        handlers.push(handler);
278    }
279
280    pub(crate) async fn emit_runtime_event(&self, event: RuntimeEvent) {
281        let handlers = self.runtime_event_handlers.read().await;
282        for handler in handlers.iter() {
283            handler.handle(event.clone()).await;
284        }
285    }
286
287    /// Emit an event through the three-phase pipeline:
288    /// 1. Internal listeners see the raw event
289    /// 2. Middleware chain can transform or suppress
290    /// 3. External listeners see the processed event
291    pub async fn emit(&self, event: &SdkEvent) {
292        let start = Instant::now();
293        let event_label = format!("{event}");
294        let mut internal_total = std::time::Duration::ZERO;
295        let mut middleware_total = std::time::Duration::ZERO;
296        let mut external_total = std::time::Duration::ZERO;
297
298        // Phase 1: Internal listeners see raw event
299        let internal = self.internal_listeners.read().await;
300        let internal_count = internal.len();
301        for (id, listener) in internal.iter() {
302            let t = Instant::now();
303            listener.on_event(event.clone()).await;
304            let dt = t.elapsed();
305            internal_total = internal_total.saturating_add(dt);
306            info!("emit({event_label}) internal listener {id}: {dt:?}");
307        }
308        drop(internal);
309
310        // Phase 2: Middleware chain
311        let mut event = Some(event.clone());
312        let middleware = self.middleware.read().await;
313        let middleware_count = middleware.len();
314        for (i, mw) in middleware.iter().enumerate() {
315            if let Some(e) = event {
316                let t = Instant::now();
317                event = mw.process(e).await;
318                let dt = t.elapsed();
319                middleware_total = middleware_total.saturating_add(dt);
320                info!("emit({event_label}) middleware #{i}: {dt:?}");
321            } else {
322                break;
323            }
324        }
325        drop(middleware);
326
327        // Phase 3: External listeners see processed event
328        let mut external_count = 0;
329        if let Some(ref event) = event {
330            let listeners = self.external_listeners.read().await;
331            external_count = listeners.len();
332            for (id, listener) in listeners.iter() {
333                let t = Instant::now();
334                listener.on_event(event.clone()).await;
335                let dt = t.elapsed();
336                external_total = external_total.saturating_add(dt);
337                info!("emit({event_label}) external listener {id}: {dt:?}");
338            }
339        }
340
341        info!(
342            "emit({event_label}) completed in {:?} (internal[{}]={:?}, middleware[{}]={:?}, external[{}]={:?})",
343            start.elapsed(),
344            internal_count,
345            internal_total,
346            middleware_count,
347            middleware_total,
348            external_count,
349            external_total
350        );
351    }
352
    /// Translates an internal sync report into at most one `SdkEvent::Synced`.
    ///
    /// Until the first `Synced` event has gone out, reports are merged into
    /// `synced_event_buffer` and nothing is emitted. The first event is released once the
    /// merged report has `wallet` set and, when `has_real_time_sync` is enabled, either a
    /// `storage_incoming` value has been reported or `rtsync_failed` is set.
    ///
    /// After that first event, each report emits `Synced` on its own as long as
    /// `any_non_zero()` holds for it; a report carrying only `storage_incoming: Some(0)`
    /// is dropped. Reports for which `any()` is `false` are always ignored.
    pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
        if !synced.any() {
            // Nothing to emit
            return;
        }

        let mut mtx = self.synced_event_buffer.lock().await;

        // `Some` in the buffer means the first `Synced` event is still pending.
        let is_first_event = if let Some(buffered) = &*mtx {
            let merged = buffered.merge(synced);

            // The first synced event emitted should at least have the wallet synced.
            // Subsequent events might have only partial syncs.
            if merged.wallet
                && (!self.has_real_time_sync
                    || merged.storage_incoming.is_some()
                    || self.rtsync_failed.load(Ordering::Relaxed))
            {
                // Conditions met: clear the buffer so later reports skip the buffering path.
                *mtx = None;
            } else {
                // Not ready yet: remember what has been seen so far and wait for more.
                *mtx = Some(merged);
                return;
            }

            true
        } else {
            false
        };

        // Release the lock before notifying listeners.
        drop(mtx);

        // Only emit zero real-time syncs on the first event.
        if !is_first_event && !synced.any_non_zero() {
            return;
        }

        // `Synced` carries no payload; the merged flags only decide whether to emit.
        self.emit(&SdkEvent::Synced).await;
    }
392
393    /// Notify that real-time sync has failed. If the first synced event is still
394    /// buffered and the wallet has already synced, release it immediately instead
395    /// of waiting for a remote pull that may never arrive.
396    pub async fn notify_rtsync_failed(&self) {
397        self.rtsync_failed.store(true, Ordering::Relaxed);
398
399        let mut mtx = self.synced_event_buffer.lock().await;
400        if let Some(buffered) = &*mtx
401            && buffered.wallet
402        {
403            *mtx = None;
404            drop(mtx);
405            self.emit(&SdkEvent::Synced).await;
406        }
407    }
408}
409
410impl Default for EventEmitter {
411    fn default() -> Self {
412        Self::new(false)
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419    use std::sync::Arc;
420    use std::sync::atomic::{AtomicBool, Ordering};
421
422    use macros::async_test_all;
423
424    #[cfg(feature = "browser-tests")]
425    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
426
    /// Test listener that sets a shared flag as soon as it receives any event.
    struct TestListener {
        /// Shared with the test body, which reads it to check whether an event arrived.
        received: Arc<AtomicBool>,
    }

    #[macros::async_trait]
    impl EventListener for TestListener {
        /// Ignores the event's content and just marks the flag.
        async fn on_event(&self, _event: SdkEvent) {
            self.received.store(true, Ordering::Relaxed);
        }
    }
437
438    #[async_test_all]
439    async fn test_event_emission() {
440        let emitter = EventEmitter::new(false);
441        let received = Arc::new(AtomicBool::new(false));
442
443        // Create the listener with a shared reference to the atomic boolean
444        let listener = Box::new(TestListener {
445            received: received.clone(),
446        });
447
448        let _ = emitter.add_external_listener(listener).await;
449
450        let event = SdkEvent::Synced {};
451
452        emitter.emit(&event).await;
453
454        // Check if event was received using the shared reference
455        assert!(received.load(Ordering::Relaxed));
456    }
457
458    #[async_test_all]
459    async fn test_remove_listener() {
460        let emitter = EventEmitter::new(false);
461
462        // Create shared atomic booleans to track event reception
463        let received1 = Arc::new(AtomicBool::new(false));
464        let received2 = Arc::new(AtomicBool::new(false));
465
466        // Create listeners with their own shared references
467        let listener1 = Box::new(TestListener {
468            received: received1.clone(),
469        });
470
471        let listener2 = Box::new(TestListener {
472            received: received2.clone(),
473        });
474
475        let id1 = emitter.add_external_listener(listener1).await;
476        let id2 = emitter.add_external_listener(listener2).await;
477
478        // Remove the first listener
479        assert!(emitter.remove_external_listener(&id1).await);
480
481        // Emit an event
482        let event = SdkEvent::Synced {};
483        emitter.emit(&event).await;
484
485        // The first listener should not receive the event
486        assert!(!received1.load(Ordering::Relaxed));
487
488        // The second listener should receive the event
489        assert!(received2.load(Ordering::Relaxed));
490
491        // Remove the second listener
492        assert!(emitter.remove_external_listener(&id2).await);
493
494        // Try to remove a non-existent listener
495        assert!(!emitter.remove_external_listener("non-existent-id").await);
496    }
497
498    #[async_test_all]
499    async fn test_synced_event_only_emitted_with_wallet_sync() {
500        let emitter = EventEmitter::new(false);
501        let received = Arc::new(AtomicBool::new(false));
502
503        let listener = Box::new(TestListener {
504            received: received.clone(),
505        });
506
507        emitter.add_external_listener(listener).await;
508
509        // Emit synced event without wallet sync - should NOT emit Synced
510        emitter
511            .emit_synced(&InternalSyncedEvent {
512                wallet: false,
513                wallet_state: true,
514                deposits: true,
515                lnurl_metadata: true,
516                storage_incoming: None,
517            })
518            .await;
519
520        assert!(!received.load(Ordering::Relaxed));
521
522        // Emit synced event with wallet sync - should emit Synced
523        emitter
524            .emit_synced(&InternalSyncedEvent {
525                wallet: true,
526                wallet_state: false,
527                deposits: false,
528                lnurl_metadata: false,
529                storage_incoming: Some(1),
530            })
531            .await;
532
533        assert!(received.load(Ordering::Relaxed));
534    }
535
536    #[async_test_all]
537    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
538        let emitter = EventEmitter::new(true);
539        let received = Arc::new(AtomicBool::new(false));
540
541        let listener = Box::new(TestListener {
542            received: received.clone(),
543        });
544
545        emitter.add_external_listener(listener).await;
546
547        // Emit synced event with storage
548        emitter
549            .emit_synced(&InternalSyncedEvent {
550                wallet: false,
551                wallet_state: false,
552                deposits: false,
553                lnurl_metadata: false,
554                storage_incoming: Some(0),
555            })
556            .await;
557
558        assert!(!received.load(Ordering::Relaxed));
559
560        // Emit synced event with wallet sync - should emit Synced
561        emitter
562            .emit_synced(&InternalSyncedEvent {
563                wallet: true,
564                wallet_state: false,
565                deposits: false,
566                lnurl_metadata: false,
567                storage_incoming: None,
568            })
569            .await;
570
571        assert!(received.load(Ordering::Relaxed));
572    }
573
574    #[async_test_all]
575    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
576     {
577        let emitter = EventEmitter::new(true);
578        let received = Arc::new(AtomicBool::new(false));
579
580        let listener = Box::new(TestListener {
581            received: received.clone(),
582        });
583
584        emitter.add_external_listener(listener).await;
585
586        // Emit synced event with wallet sync
587        emitter
588            .emit_synced(&InternalSyncedEvent {
589                wallet: true,
590                wallet_state: false,
591                deposits: false,
592                lnurl_metadata: false,
593                storage_incoming: None,
594            })
595            .await;
596
597        assert!(!received.load(Ordering::Relaxed));
598
599        // Emit synced event with storage - should emit Synced
600        emitter
601            .emit_synced(&InternalSyncedEvent {
602                wallet: false,
603                wallet_state: false,
604                deposits: false,
605                lnurl_metadata: false,
606                storage_incoming: Some(0),
607            })
608            .await;
609
610        assert!(received.load(Ordering::Relaxed));
611    }
612
613    #[async_test_all]
614    async fn test_rtsync_failed_emits_synced_on_wallet_alone() {
615        let emitter = EventEmitter::new(true);
616        let received = Arc::new(AtomicBool::new(false));
617
618        let listener = Box::new(TestListener {
619            received: received.clone(),
620        });
621
622        emitter.add_external_listener(listener).await;
623
624        // Wallet synced but rtsync hasn't failed yet — should NOT emit
625        emitter
626            .emit_synced(&InternalSyncedEvent {
627                wallet: true,
628                wallet_state: false,
629                deposits: false,
630                lnurl_metadata: false,
631                storage_incoming: None,
632            })
633            .await;
634
635        assert!(!received.load(Ordering::Relaxed));
636
637        // rtsync fails — should immediately release the buffered event
638        emitter.notify_rtsync_failed().await;
639
640        assert!(received.load(Ordering::Relaxed));
641    }
642
643    #[async_test_all]
644    async fn test_rtsync_failed_before_wallet_sync_emits_on_wallet() {
645        let emitter = EventEmitter::new(true);
646        let received = Arc::new(AtomicBool::new(false));
647
648        let listener = Box::new(TestListener {
649            received: received.clone(),
650        });
651
652        emitter.add_external_listener(listener).await;
653
654        // rtsync fails before wallet syncs — nothing to release yet
655        emitter.notify_rtsync_failed().await;
656
657        assert!(!received.load(Ordering::Relaxed));
658
659        // Wallet syncs — should emit immediately (rtsync already marked failed)
660        emitter
661            .emit_synced(&InternalSyncedEvent {
662                wallet: true,
663                wallet_state: false,
664                deposits: false,
665                lnurl_metadata: false,
666                storage_incoming: None,
667            })
668            .await;
669
670        assert!(received.load(Ordering::Relaxed));
671    }
672
673    #[async_test_all]
674    async fn test_synced_event_buffers_until_wallet_sync() {
675        let emitter = EventEmitter::new(false);
676        let received = Arc::new(AtomicBool::new(false));
677
678        let listener = Box::new(TestListener {
679            received: received.clone(),
680        });
681
682        emitter.add_external_listener(listener).await;
683
684        // Emit multiple partial syncs without wallet sync
685        emitter
686            .emit_synced(&InternalSyncedEvent {
687                wallet: false,
688                wallet_state: true,
689                deposits: false,
690                lnurl_metadata: false,
691                storage_incoming: None,
692            })
693            .await;
694
695        assert!(!received.load(Ordering::Relaxed));
696
697        emitter
698            .emit_synced(&InternalSyncedEvent {
699                wallet: false,
700                wallet_state: false,
701                deposits: true,
702                lnurl_metadata: false,
703                storage_incoming: None,
704            })
705            .await;
706
707        assert!(!received.load(Ordering::Relaxed));
708
709        emitter
710            .emit_synced(&InternalSyncedEvent {
711                wallet: false,
712                wallet_state: false,
713                deposits: false,
714                lnurl_metadata: true,
715                storage_incoming: None,
716            })
717            .await;
718
719        assert!(!received.load(Ordering::Relaxed));
720
721        emitter
722            .emit_synced(&InternalSyncedEvent {
723                wallet: false,
724                wallet_state: false,
725                deposits: false,
726                lnurl_metadata: false,
727                storage_incoming: None,
728            })
729            .await;
730
731        assert!(!received.load(Ordering::Relaxed));
732
733        // Finally emit wallet sync - should emit Synced
734        emitter
735            .emit_synced(&InternalSyncedEvent {
736                wallet: true,
737                wallet_state: false,
738                deposits: false,
739                lnurl_metadata: false,
740                storage_incoming: None,
741            })
742            .await;
743
744        assert!(received.load(Ordering::Relaxed));
745    }
746
747    #[async_test_all]
748    async fn test_synced_event_all_true() {
749        let emitter = EventEmitter::new(false);
750        let received = Arc::new(AtomicBool::new(false));
751
752        let listener = Box::new(TestListener {
753            received: received.clone(),
754        });
755
756        emitter.add_external_listener(listener).await;
757
758        // Emit synced event with wallet and other components - should emit Synced
759        emitter
760            .emit_synced(&InternalSyncedEvent {
761                wallet: true,
762                wallet_state: true,
763                deposits: true,
764                lnurl_metadata: true,
765                storage_incoming: Some(1),
766            })
767            .await;
768
769        assert!(received.load(Ordering::Relaxed));
770    }
771
772    #[async_test_all]
773    async fn test_synced_event_empty_does_not_emit() {
774        let emitter = EventEmitter::new(false);
775        let received = Arc::new(AtomicBool::new(false));
776
777        let listener = Box::new(TestListener {
778            received: received.clone(),
779        });
780
781        emitter.add_external_listener(listener).await;
782
783        // Emit empty synced event - should NOT emit Synced
784        emitter
785            .emit_synced(&InternalSyncedEvent {
786                wallet: false,
787                wallet_state: false,
788                deposits: false,
789                lnurl_metadata: false,
790                storage_incoming: None,
791            })
792            .await;
793
794        assert!(!received.load(Ordering::Relaxed));
795    }
796
797    #[async_test_all]
798    async fn test_subsequent_syncs_after_wallet_emit_immediately() {
799        use std::sync::atomic::AtomicUsize;
800
801        struct CountingListener {
802            count: Arc<AtomicUsize>,
803        }
804
805        #[macros::async_trait]
806        impl EventListener for CountingListener {
807            async fn on_event(&self, event: SdkEvent) {
808                if matches!(event, SdkEvent::Synced) {
809                    self.count.fetch_add(1, Ordering::Relaxed);
810                }
811            }
812        }
813
814        let emitter = EventEmitter::new(true);
815        let count = Arc::new(AtomicUsize::new(0));
816
817        let listener = Box::new(CountingListener {
818            count: count.clone(),
819        });
820
821        emitter.add_external_listener(listener).await;
822
823        // First sync with wallet - should emit
824        emitter
825            .emit_synced(&InternalSyncedEvent {
826                wallet: true,
827                wallet_state: false,
828                deposits: false,
829                lnurl_metadata: false,
830                storage_incoming: Some(0),
831            })
832            .await;
833
834        assert_eq!(count.load(Ordering::Relaxed), 1);
835
836        // Subsequent partial sync without wallet - should emit (buffer cleared after first wallet sync)
837        emitter
838            .emit_synced(&InternalSyncedEvent {
839                wallet: false,
840                wallet_state: true,
841                deposits: false,
842                lnurl_metadata: false,
843                storage_incoming: None,
844            })
845            .await;
846
847        assert_eq!(count.load(Ordering::Relaxed), 2);
848
849        // Another partial sync - should emit
850        emitter
851            .emit_synced(&InternalSyncedEvent {
852                wallet: false,
853                wallet_state: false,
854                deposits: true,
855                lnurl_metadata: false,
856                storage_incoming: None,
857            })
858            .await;
859
860        assert_eq!(count.load(Ordering::Relaxed), 3);
861
862        emitter
863            .emit_synced(&InternalSyncedEvent {
864                wallet: false,
865                wallet_state: false,
866                deposits: false,
867                lnurl_metadata: true,
868                storage_incoming: None,
869            })
870            .await;
871
872        assert_eq!(count.load(Ordering::Relaxed), 4);
873
874        emitter
875            .emit_synced(&InternalSyncedEvent {
876                wallet: false,
877                wallet_state: false,
878                deposits: false,
879                lnurl_metadata: false,
880                storage_incoming: Some(1),
881            })
882            .await;
883
884        assert_eq!(count.load(Ordering::Relaxed), 5);
885
886        // storage_incoming with Some(0) - should NOT emit after first sync
887        emitter
888            .emit_synced(&InternalSyncedEvent {
889                wallet: false,
890                wallet_state: false,
891                deposits: false,
892                lnurl_metadata: false,
893                storage_incoming: Some(0),
894            })
895            .await;
896
897        assert_eq!(count.load(Ordering::Relaxed), 5);
898
899        emitter
900            .emit_synced(&InternalSyncedEvent {
901                wallet: true,
902                wallet_state: false,
903                deposits: false,
904                lnurl_metadata: false,
905                storage_incoming: None,
906            })
907            .await;
908
909        assert_eq!(count.load(Ordering::Relaxed), 6);
910    }
911
912    // ── Helpers for middleware / internal listener tests ──
913
914    /// Listener that records all received events
915    struct RecordingListener {
916        events: Arc<Mutex<Vec<String>>>,
917    }
918
919    impl RecordingListener {
920        fn new() -> (Self, Arc<Mutex<Vec<String>>>) {
921            let events = Arc::new(Mutex::new(Vec::new()));
922            (
923                Self {
924                    events: events.clone(),
925                },
926                events,
927            )
928        }
929    }
930
931    #[macros::async_trait]
932    impl EventListener for RecordingListener {
933        async fn on_event(&self, event: SdkEvent) {
934            self.events.lock().await.push(format!("{event}"));
935        }
936    }
937
938    /// Middleware that suppresses all Synced events
939    struct SuppressSyncedMiddleware;
940
941    #[macros::async_trait]
942    impl EventMiddleware for SuppressSyncedMiddleware {
943        async fn process(&self, event: SdkEvent) -> Option<SdkEvent> {
944            match event {
945                SdkEvent::Synced => None,
946                other => Some(other),
947            }
948        }
949    }
950
951    /// Middleware that replaces `PaymentSucceeded` with `PaymentPending`
952    struct DowngradePaymentMiddleware;
953
954    #[macros::async_trait]
955    impl EventMiddleware for DowngradePaymentMiddleware {
956        async fn process(&self, event: SdkEvent) -> Option<SdkEvent> {
957            match event {
958                SdkEvent::PaymentSucceeded { payment } => {
959                    Some(SdkEvent::PaymentPending { payment })
960                }
961                other => Some(other),
962            }
963        }
964    }
965
966    /// Middleware that suppresses all events
967    struct SuppressAllMiddleware;
968
969    #[macros::async_trait]
970    impl EventMiddleware for SuppressAllMiddleware {
971        async fn process(&self, _event: SdkEvent) -> Option<SdkEvent> {
972            None
973        }
974    }
975
976    fn test_payment() -> Payment {
977        Payment {
978            id: "test-id".to_string(),
979            payment_type: crate::PaymentType::Receive,
980            status: crate::PaymentStatus::Completed,
981            amount: 1000,
982            fees: 10,
983            timestamp: 123_456,
984            method: crate::PaymentMethod::Spark,
985            details: None,
986            conversion_details: None,
987        }
988    }
989
990    // ── Internal listener tests ──
991
992    #[async_test_all]
993    async fn test_internal_listener_receives_events() {
994        let emitter = EventEmitter::new(false);
995        let (listener, events) = RecordingListener::new();
996
997        emitter.add_internal_listener(Box::new(listener)).await;
998
999        emitter.emit(&SdkEvent::Synced).await;
1000
1001        let log = events.lock().await;
1002        assert_eq!(log.len(), 1);
1003        assert!(log[0].contains("Synced"));
1004    }
1005
1006    #[async_test_all]
1007    async fn test_remove_internal_listener() {
1008        let emitter = EventEmitter::new(false);
1009        let (listener, events) = RecordingListener::new();
1010
1011        let id = emitter.add_internal_listener(Box::new(listener)).await;
1012
1013        assert!(emitter.remove_internal_listener(&id).await);
1014        assert!(!emitter.remove_internal_listener(&id).await);
1015
1016        emitter.emit(&SdkEvent::Synced).await;
1017
1018        assert!(events.lock().await.is_empty());
1019    }
1020
1021    // ── Middleware tests ──
1022
1023    #[async_test_all]
1024    async fn test_middleware_suppresses_event_for_external() {
1025        let emitter = EventEmitter::new(false);
1026        let (ext, ext_events) = RecordingListener::new();
1027
1028        emitter.add_external_listener(Box::new(ext)).await;
1029        emitter
1030            .add_middleware(Box::new(SuppressSyncedMiddleware))
1031            .await;
1032
1033        emitter.emit(&SdkEvent::Synced).await;
1034
1035        // External should NOT see the suppressed event
1036        assert!(ext_events.lock().await.is_empty());
1037    }
1038
1039    #[async_test_all]
1040    async fn test_middleware_transforms_event() {
1041        let emitter = EventEmitter::new(false);
1042        let (ext, ext_events) = RecordingListener::new();
1043
1044        emitter.add_external_listener(Box::new(ext)).await;
1045        emitter
1046            .add_middleware(Box::new(DowngradePaymentMiddleware))
1047            .await;
1048
1049        let event = SdkEvent::PaymentSucceeded {
1050            payment: test_payment(),
1051        };
1052        emitter.emit(&event).await;
1053
1054        let log = ext_events.lock().await;
1055        assert_eq!(log.len(), 1);
1056        assert!(log[0].contains("PaymentPending"));
1057    }
1058
1059    #[async_test_all]
1060    async fn test_middleware_passthrough_unmatched_events() {
1061        let emitter = EventEmitter::new(false);
1062        let (ext, ext_events) = RecordingListener::new();
1063
1064        emitter.add_external_listener(Box::new(ext)).await;
1065        emitter
1066            .add_middleware(Box::new(SuppressSyncedMiddleware))
1067            .await;
1068
1069        // Synced is suppressed, PaymentSucceeded passes through
1070        emitter.emit(&SdkEvent::Synced).await;
1071        let event = SdkEvent::PaymentSucceeded {
1072            payment: test_payment(),
1073        };
1074        emitter.emit(&event).await;
1075
1076        let log = ext_events.lock().await;
1077        assert_eq!(log.len(), 1);
1078        assert!(log[0].contains("PaymentSucceeded"));
1079    }
1080
1081    #[async_test_all]
1082    async fn test_middleware_chain_ordering() {
1083        let emitter = EventEmitter::new(false);
1084        let (ext, ext_events) = RecordingListener::new();
1085
1086        emitter.add_external_listener(Box::new(ext)).await;
1087
1088        // First middleware transforms PaymentSucceeded → PaymentPending
1089        emitter
1090            .add_middleware(Box::new(DowngradePaymentMiddleware))
1091            .await;
1092        // Second middleware suppresses Synced (doesn't affect PaymentPending)
1093        emitter
1094            .add_middleware(Box::new(SuppressSyncedMiddleware))
1095            .await;
1096
1097        let event = SdkEvent::PaymentSucceeded {
1098            payment: test_payment(),
1099        };
1100        emitter.emit(&event).await;
1101
1102        let log = ext_events.lock().await;
1103        assert_eq!(log.len(), 1);
1104        assert!(log[0].contains("PaymentPending"));
1105    }
1106
1107    #[async_test_all]
1108    async fn test_suppress_all_middleware_stops_chain() {
1109        let emitter = EventEmitter::new(false);
1110        let (ext, ext_events) = RecordingListener::new();
1111
1112        emitter.add_external_listener(Box::new(ext)).await;
1113
1114        // SuppressAll first — nothing should reach the next middleware or external
1115        emitter
1116            .add_middleware(Box::new(SuppressAllMiddleware))
1117            .await;
1118        emitter
1119            .add_middleware(Box::new(DowngradePaymentMiddleware))
1120            .await;
1121
1122        emitter.emit(&SdkEvent::Synced).await;
1123        let event = SdkEvent::PaymentSucceeded {
1124            payment: test_payment(),
1125        };
1126        emitter.emit(&event).await;
1127
1128        assert!(ext_events.lock().await.is_empty());
1129    }
1130
1131    // ── Three-phase flow tests ──
1132
1133    #[async_test_all]
1134    async fn test_three_phase_emit_flow() {
1135        let emitter = EventEmitter::new(false);
1136        let (int, int_events) = RecordingListener::new();
1137        let (ext, ext_events) = RecordingListener::new();
1138
1139        emitter.add_internal_listener(Box::new(int)).await;
1140        emitter.add_external_listener(Box::new(ext)).await;
1141        emitter
1142            .add_middleware(Box::new(SuppressSyncedMiddleware))
1143            .await;
1144
1145        // Synced: internal sees it, middleware suppresses it, external doesn't
1146        emitter.emit(&SdkEvent::Synced).await;
1147
1148        assert_eq!(int_events.lock().await.len(), 1);
1149        assert!(ext_events.lock().await.is_empty());
1150
1151        // PaymentSucceeded: both see it (middleware passes it through)
1152        let event = SdkEvent::PaymentSucceeded {
1153            payment: test_payment(),
1154        };
1155        emitter.emit(&event).await;
1156
1157        assert_eq!(int_events.lock().await.len(), 2);
1158        assert_eq!(ext_events.lock().await.len(), 1);
1159    }
1160
1161    #[async_test_all]
1162    async fn test_internal_sees_raw_event_external_sees_transformed() {
1163        let emitter = EventEmitter::new(false);
1164        let (int, int_events) = RecordingListener::new();
1165        let (ext, ext_events) = RecordingListener::new();
1166
1167        emitter.add_internal_listener(Box::new(int)).await;
1168        emitter.add_external_listener(Box::new(ext)).await;
1169        emitter
1170            .add_middleware(Box::new(DowngradePaymentMiddleware))
1171            .await;
1172
1173        let event = SdkEvent::PaymentSucceeded {
1174            payment: test_payment(),
1175        };
1176        emitter.emit(&event).await;
1177
1178        let int_log = int_events.lock().await;
1179        let ext_log = ext_events.lock().await;
1180
1181        // Internal sees the original PaymentSucceeded
1182        assert_eq!(int_log.len(), 1);
1183        assert!(int_log[0].contains("PaymentSucceeded"));
1184
1185        // External sees the transformed PaymentPending
1186        assert_eq!(ext_log.len(), 1);
1187        assert!(ext_log[0].contains("PaymentPending"));
1188    }
1189
1190    #[async_test_all]
1191    async fn test_no_listeners_no_middleware_does_not_panic() {
1192        let emitter = EventEmitter::new(false);
1193        emitter.emit(&SdkEvent::Synced).await;
1194        // Should not panic
1195    }
1196
1197    #[async_test_all]
1198    async fn test_empty_event_does_not_emit_after_wallet_sync() {
1199        use std::sync::atomic::AtomicUsize;
1200
1201        struct CountingListener {
1202            count: Arc<AtomicUsize>,
1203        }
1204
1205        #[macros::async_trait]
1206        impl EventListener for CountingListener {
1207            async fn on_event(&self, event: SdkEvent) {
1208                if matches!(event, SdkEvent::Synced) {
1209                    self.count.fetch_add(1, Ordering::Relaxed);
1210                }
1211            }
1212        }
1213
1214        let emitter = EventEmitter::new(false);
1215        let count = Arc::new(AtomicUsize::new(0));
1216
1217        let listener = Box::new(CountingListener {
1218            count: count.clone(),
1219        });
1220
1221        emitter.add_external_listener(listener).await;
1222
1223        // First sync with wallet - should emit
1224        emitter
1225            .emit_synced(&InternalSyncedEvent {
1226                wallet: true,
1227                wallet_state: false,
1228                deposits: false,
1229                lnurl_metadata: false,
1230                storage_incoming: None,
1231            })
1232            .await;
1233
1234        assert_eq!(count.load(Ordering::Relaxed), 1);
1235
1236        // Empty sync after wallet sync - should NOT emit (all fields false)
1237        emitter
1238            .emit_synced(&InternalSyncedEvent {
1239                wallet: false,
1240                wallet_state: false,
1241                deposits: false,
1242                lnurl_metadata: false,
1243                storage_incoming: None,
1244            })
1245            .await;
1246
1247        assert_eq!(count.load(Ordering::Relaxed), 1); // Count should remain 1
1248
1249        // Another non-empty sync - should emit
1250        emitter
1251            .emit_synced(&InternalSyncedEvent {
1252                wallet: false,
1253                wallet_state: true,
1254                deposits: false,
1255                lnurl_metadata: false,
1256                storage_incoming: None,
1257            })
1258            .await;
1259
1260        assert_eq!(count.load(Ordering::Relaxed), 2); // Now count should be 2
1261    }
1262}