breez_sdk_spark/
events.rs

1use core::fmt;
2use std::{
3    collections::BTreeMap,
4    sync::atomic::{AtomicU64, Ordering},
5};
6
7use serde::Serialize;
8use tokio::sync::{Mutex, RwLock};
9use uuid::Uuid;
10
11use crate::{DepositInfo, Payment};
12
13/// Events emitted by the SDK
14#[allow(clippy::large_enum_variant)]
15#[derive(Debug, Clone, Serialize)]
16#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
17pub enum SdkEvent {
18    /// Emitted when the wallet has been synchronized with the network
19    Synced,
20    /// Emitted when the SDK was unable to claim deposits
21    UnclaimedDeposits {
22        unclaimed_deposits: Vec<DepositInfo>,
23    },
24    ClaimedDeposits {
25        claimed_deposits: Vec<DepositInfo>,
26    },
27    PaymentSucceeded {
28        payment: Payment,
29    },
30    PaymentPending {
31        payment: Payment,
32    },
33    PaymentFailed {
34        payment: Payment,
35    },
36}
37
38impl fmt::Display for SdkEvent {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        match self {
41            SdkEvent::Synced => write!(f, "Synced"),
42            SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
43                write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
44            }
45            SdkEvent::ClaimedDeposits { claimed_deposits } => {
46                write!(f, "ClaimedDeposits: {claimed_deposits:?}")
47            }
48            SdkEvent::PaymentSucceeded { payment } => {
49                write!(f, "PaymentSucceeded: {payment:?}")
50            }
51            SdkEvent::PaymentPending { payment } => {
52                write!(f, "PaymentPending: {payment:?}")
53            }
54            SdkEvent::PaymentFailed { payment } => {
55                write!(f, "PaymentFailed: {payment:?}")
56            }
57        }
58    }
59}
60
/// Internal record of which parts of the SDK state a sync pass covered.
///
/// The boolean fields flag individual components. `storage_incoming` is `Some` when a
/// storage sync was reported (presumably holding the number of incoming records —
/// confirm against the producers, which are outside this file).
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Default)]
pub struct InternalSyncedEvent {
    pub wallet: bool,
    pub wallet_state: bool,
    pub deposits: bool,
    pub lnurl_metadata: bool,
    pub storage_incoming: Option<u32>,
}

impl InternalSyncedEvent {
    /// Returns `true` if any component flag is set or a storage sync was reported,
    /// even one with a count of zero.
    pub fn any(&self) -> bool {
        let flags = [
            self.wallet,
            self.wallet_state,
            self.deposits,
            self.lnurl_metadata,
        ];
        flags.iter().any(|&set| set) || self.storage_incoming.is_some()
    }

    /// Like [`Self::any`], except that a storage sync only counts when its value is
    /// greater than zero.
    pub fn any_non_zero(&self) -> bool {
        let flags = [
            self.wallet,
            self.wallet_state,
            self.deposits,
            self.lnurl_metadata,
        ];
        flags.iter().any(|&set| set) || matches!(self.storage_incoming, Some(count) if count > 0)
    }

    /// Combines two events into a new one without modifying either input.
    ///
    /// Flags are OR-ed together. Storage counts are added with saturation when both
    /// sides carry one; otherwise whichever side has a value (if any) is kept.
    pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
        let storage_incoming = match (self.storage_incoming, other.storage_incoming) {
            (Some(ours), Some(theirs)) => Some(ours.saturating_add(theirs)),
            (ours, theirs) => ours.or(theirs),
        };

        Self {
            wallet: self.wallet || other.wallet,
            wallet_state: self.wallet_state || other.wallet_state,
            deposits: self.deposits || other.deposits,
            lnurl_metadata: self.lnurl_metadata || other.lnurl_metadata,
            storage_incoming,
        }
    }
}
103
/// Trait for event listeners
///
/// Implementors must be `Send + Sync`. When the `uniffi` feature is enabled the trait
/// is also exported as a UniFFI callback interface.
#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
#[macros::async_trait]
pub trait EventListener: Send + Sync {
    /// Called when an event occurs
    ///
    /// # Arguments
    ///
    /// * `event` - The event being delivered; the listener receives its own owned copy
    async fn on_event(&self, event: SdkEvent);
}
111
112/// Event publisher that manages event listeners
113pub struct EventEmitter {
114    has_real_time_sync: bool,
115    listener_index: AtomicU64,
116    listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
117    synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
118}
119
120impl EventEmitter {
121    /// Create a new event emitter
122    pub fn new(has_real_time_sync: bool) -> Self {
123        Self {
124            has_real_time_sync,
125            listener_index: AtomicU64::new(0),
126            listeners: RwLock::new(BTreeMap::new()),
127            synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
128        }
129    }
130
131    /// Add a listener to receive events
132    ///
133    /// # Arguments
134    ///
135    /// * `listener` - The listener to add
136    ///
137    /// # Returns
138    ///
139    /// A unique identifier for the listener, which can be used to remove it later
140    pub async fn add_listener(&self, listener: Box<dyn EventListener>) -> String {
141        let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
142        let id = format!("listener_{}-{}", index, Uuid::new_v4());
143        let mut listeners = self.listeners.write().await;
144        listeners.insert(id.clone(), listener);
145        id
146    }
147
148    /// Remove a listener by its ID
149    ///
150    /// # Arguments
151    ///
152    /// * `id` - The ID returned from `add_listener`
153    ///
154    /// # Returns
155    ///
156    /// `true` if the listener was found and removed, `false` otherwise
157    pub async fn remove_listener(&self, id: &str) -> bool {
158        let mut listeners = self.listeners.write().await;
159        listeners.remove(id).is_some()
160    }
161
162    /// Emit an event to all registered listeners
163    pub async fn emit(&self, event: &SdkEvent) {
164        // Get a read lock on the listeners
165        let listeners = self.listeners.read().await;
166
167        // Emit the event to each listener
168        for listener in listeners.values() {
169            listener.on_event(event.clone()).await;
170        }
171    }
172
173    pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
174        if !synced.any() {
175            // Nothing to emit
176            return;
177        }
178
179        let mut mtx = self.synced_event_buffer.lock().await;
180
181        let is_first_event = if let Some(buffered) = &*mtx {
182            let merged = buffered.merge(synced);
183
184            // The first synced event emitted should at least have the wallet synced.
185            // Subsequent events might have only partial syncs.
186            if merged.wallet && (!self.has_real_time_sync || merged.storage_incoming.is_some()) {
187                *mtx = None;
188            } else {
189                *mtx = Some(merged);
190                return;
191            }
192
193            true
194        } else {
195            false
196        };
197
198        drop(mtx);
199
200        // Only emit zero real-time syncs on the first event.
201        if !is_first_event && !synced.any_non_zero() {
202            return;
203        }
204
205        // Emit the merged event
206        self.emit(&SdkEvent::Synced).await;
207    }
208}
209
210impl Default for EventEmitter {
211    fn default() -> Self {
212        Self::new(false)
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    use macros::async_test_all;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Listener that latches a shared flag as soon as any event is delivered.
    struct TestListener {
        received: Arc<AtomicBool>,
    }

    #[macros::async_trait]
    impl EventListener for TestListener {
        async fn on_event(&self, _event: SdkEvent) {
            self.received.store(true, Ordering::Relaxed);
        }
    }

    /// Listener that counts how many `Synced` events were delivered.
    struct CountingListener {
        count: Arc<AtomicUsize>,
    }

    #[macros::async_trait]
    impl EventListener for CountingListener {
        async fn on_event(&self, event: SdkEvent) {
            if matches!(event, SdkEvent::Synced) {
                self.count.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Builds an emitter with one `TestListener` and returns it with the listener's flag.
    async fn emitter_with_flag(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicBool>) {
        let flag = Arc::new(AtomicBool::new(false));
        let emitter = EventEmitter::new(has_real_time_sync);
        emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&flag),
            }))
            .await;
        (emitter, flag)
    }

    /// Builds an emitter with one `CountingListener` and returns it with the counter.
    async fn emitter_with_counter(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicUsize>) {
        let counter = Arc::new(AtomicUsize::new(0));
        let emitter = EventEmitter::new(has_real_time_sync);
        emitter
            .add_listener(Box::new(CountingListener {
                count: Arc::clone(&counter),
            }))
            .await;
        (emitter, counter)
    }

    /// Progress report that only carries a wallet sync.
    fn wallet_only() -> InternalSyncedEvent {
        InternalSyncedEvent {
            wallet: true,
            ..Default::default()
        }
    }

    /// Progress report that only carries a storage sync with the given count.
    fn storage_only(incoming: u32) -> InternalSyncedEvent {
        InternalSyncedEvent {
            storage_incoming: Some(incoming),
            ..Default::default()
        }
    }

    #[async_test_all]
    async fn test_event_emission() {
        let (emitter, flag) = emitter_with_flag(false).await;

        emitter.emit(&SdkEvent::Synced).await;

        // The listener shares the flag, so delivery is observable here
        assert!(flag.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_remove_listener() {
        let emitter = EventEmitter::new(false);

        // One flag per listener to tell the deliveries apart
        let first_flag = Arc::new(AtomicBool::new(false));
        let second_flag = Arc::new(AtomicBool::new(false));

        let first_id = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&first_flag),
            }))
            .await;
        let second_id = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&second_flag),
            }))
            .await;

        // Drop the first listener before anything is emitted
        assert!(emitter.remove_listener(&first_id).await);

        emitter.emit(&SdkEvent::Synced).await;

        // Only the listener that is still registered sees the event
        assert!(!first_flag.load(Ordering::Relaxed));
        assert!(second_flag.load(Ordering::Relaxed));

        // Removing the remaining listener succeeds
        assert!(emitter.remove_listener(&second_id).await);

        // Unknown IDs are reported as not found
        assert!(!emitter.remove_listener("non-existent-id").await);
    }

    #[async_test_all]
    async fn test_synced_event_only_emitted_with_wallet_sync() {
        let (emitter, flag) = emitter_with_flag(false).await;

        // Everything except the wallet is synced - should NOT emit Synced
        let without_wallet = InternalSyncedEvent {
            wallet_state: true,
            deposits: true,
            lnurl_metadata: true,
            ..Default::default()
        };
        emitter.emit_synced(&without_wallet).await;
        assert!(!flag.load(Ordering::Relaxed));

        // Wallet sync arrives - should emit Synced
        let with_wallet = InternalSyncedEvent {
            wallet: true,
            storage_incoming: Some(1),
            ..Default::default()
        };
        emitter.emit_synced(&with_wallet).await;
        assert!(flag.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
        let (emitter, flag) = emitter_with_flag(true).await;

        // Storage sync alone is not enough
        emitter.emit_synced(&storage_only(0)).await;
        assert!(!flag.load(Ordering::Relaxed));

        // Wallet sync completes the pair - should emit Synced
        emitter.emit_synced(&wallet_only()).await;
        assert!(flag.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
     {
        let (emitter, flag) = emitter_with_flag(true).await;

        // Wallet sync alone is not enough
        emitter.emit_synced(&wallet_only()).await;
        assert!(!flag.load(Ordering::Relaxed));

        // Storage sync completes the pair - should emit Synced
        emitter.emit_synced(&storage_only(0)).await;
        assert!(flag.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_buffers_until_wallet_sync() {
        let (emitter, flag) = emitter_with_flag(false).await;

        // A series of partial syncs without the wallet, ending with an empty one
        let partial_syncs = [
            InternalSyncedEvent {
                wallet_state: true,
                ..Default::default()
            },
            InternalSyncedEvent {
                deposits: true,
                ..Default::default()
            },
            InternalSyncedEvent {
                lnurl_metadata: true,
                ..Default::default()
            },
            InternalSyncedEvent::default(),
        ];
        for partial in &partial_syncs {
            emitter.emit_synced(partial).await;
            assert!(!flag.load(Ordering::Relaxed));
        }

        // Finally the wallet sync - should emit Synced
        emitter.emit_synced(&wallet_only()).await;
        assert!(flag.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_all_true() {
        let (emitter, flag) = emitter_with_flag(false).await;

        // Wallet and every other component at once - should emit Synced
        let everything = InternalSyncedEvent {
            wallet: true,
            wallet_state: true,
            deposits: true,
            lnurl_metadata: true,
            storage_incoming: Some(1),
        };
        emitter.emit_synced(&everything).await;

        assert!(flag.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_empty_does_not_emit() {
        let (emitter, flag) = emitter_with_flag(false).await;

        // Empty report - should NOT emit Synced
        emitter.emit_synced(&InternalSyncedEvent::default()).await;

        assert!(!flag.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_subsequent_syncs_after_wallet_emit_immediately() {
        let (emitter, count) = emitter_with_counter(true).await;

        // Each step is a report paired with the total number of Synced events expected
        // once it has been processed.
        let steps = [
            // First sync with wallet and storage - should emit
            (
                InternalSyncedEvent {
                    wallet: true,
                    storage_incoming: Some(0),
                    ..Default::default()
                },
                1,
            ),
            // Partial syncs without wallet - should emit (buffer cleared after first sync)
            (
                InternalSyncedEvent {
                    wallet_state: true,
                    ..Default::default()
                },
                2,
            ),
            (
                InternalSyncedEvent {
                    deposits: true,
                    ..Default::default()
                },
                3,
            ),
            (
                InternalSyncedEvent {
                    lnurl_metadata: true,
                    ..Default::default()
                },
                4,
            ),
            (storage_only(1), 5),
            // storage_incoming with Some(0) - should NOT emit after first sync
            (storage_only(0), 5),
            (wallet_only(), 6),
        ];

        for (report, expected) in &steps {
            emitter.emit_synced(report).await;
            assert_eq!(count.load(Ordering::Relaxed), *expected);
        }
    }

    #[async_test_all]
    async fn test_empty_event_does_not_emit_after_wallet_sync() {
        let (emitter, count) = emitter_with_counter(false).await;

        let steps = [
            // First sync with wallet - should emit
            (wallet_only(), 1),
            // Empty sync after wallet sync - should NOT emit, count stays at 1
            (InternalSyncedEvent::default(), 1),
            // Another non-empty sync - should emit, count becomes 2
            (
                InternalSyncedEvent {
                    wallet_state: true,
                    ..Default::default()
                },
                2,
            ),
        ];

        for (report, expected) in &steps {
            emitter.emit_synced(report).await;
            assert_eq!(count.load(Ordering::Relaxed), *expected);
        }
    }
}