breez_sdk_spark/
events.rs

1use core::fmt;
2use std::{
3    collections::BTreeMap,
4    sync::atomic::{AtomicU64, Ordering},
5};
6
7use serde::Serialize;
8use tokio::sync::{Mutex, RwLock};
9use uuid::Uuid;
10
11use crate::{DepositInfo, Payment};
12
/// Events emitted by the SDK
///
/// Listeners receive these through [`EventListener::on_event`]. The `Display`
/// implementation prints the variant name followed by the `Debug` form of its payload.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum SdkEvent {
    /// Emitted when the wallet has been synchronized with the network
    Synced,
    /// Emitted when the SDK was unable to claim deposits
    UnclaimedDeposits {
        unclaimed_deposits: Vec<DepositInfo>,
    },
    /// Carries the deposits reported as claimed.
    ///
    /// Presumably the success counterpart of `UnclaimedDeposits`; the emission
    /// site is outside this file, so confirm against the caller.
    ClaimedDeposits {
        claimed_deposits: Vec<DepositInfo>,
    },
    /// Produced by `SdkEvent::from_payment` for a payment whose status is `Completed`
    PaymentSucceeded {
        payment: Payment,
    },
    /// Produced by `SdkEvent::from_payment` for a payment whose status is `Pending`
    PaymentPending {
        payment: Payment,
    },
    /// Produced by `SdkEvent::from_payment` for a payment whose status is `Failed`
    PaymentFailed {
        payment: Payment,
    },
    /// Wraps an [`OptimizationEvent`] describing the state of an optimization run
    Optimization {
        // Named with `optimization` prefix to avoid collision with `event` keyword in C#
        optimization_event: OptimizationEvent,
    },
}
41
42impl SdkEvent {
43    pub(crate) fn from_payment(payment: Payment) -> Self {
44        match payment.status {
45            crate::PaymentStatus::Completed => SdkEvent::PaymentSucceeded { payment },
46            crate::PaymentStatus::Pending => SdkEvent::PaymentPending { payment },
47            crate::PaymentStatus::Failed => SdkEvent::PaymentFailed { payment },
48        }
49    }
50}
51
52impl fmt::Display for SdkEvent {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        match self {
55            SdkEvent::Synced => write!(f, "Synced"),
56            SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
57                write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
58            }
59            SdkEvent::ClaimedDeposits { claimed_deposits } => {
60                write!(f, "ClaimedDeposits: {claimed_deposits:?}")
61            }
62            SdkEvent::PaymentSucceeded { payment } => {
63                write!(f, "PaymentSucceeded: {payment:?}")
64            }
65            SdkEvent::PaymentPending { payment } => {
66                write!(f, "PaymentPending: {payment:?}")
67            }
68            SdkEvent::PaymentFailed { payment } => {
69                write!(f, "PaymentFailed: {payment:?}")
70            }
71            SdkEvent::Optimization {
72                optimization_event: event,
73            } => {
74                write!(f, "Optimization: {event:?}")
75            }
76        }
77    }
78}
79
/// State changes of an optimization run, delivered to listeners wrapped in
/// `SdkEvent::Optimization`.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum OptimizationEvent {
    /// Optimization has started with the given number of rounds.
    Started { total_rounds: u32 },
    /// A round has completed.
    RoundCompleted {
        // NOTE(review): whether `current_round` is zero- or one-based is not visible
        // in this file — confirm against the code that emits this variant.
        current_round: u32,
        total_rounds: u32,
    },
    /// Optimization completed successfully.
    Completed,
    /// Optimization was cancelled.
    Cancelled,
    /// Optimization failed with an error.
    Failed { error: String },
    /// Optimization was skipped because leaves are already optimal.
    Skipped,
}
99
/// Internal description of what a sync pass touched.
///
/// `EventEmitter::emit_synced` consumes these reports and decides whether a public
/// `SdkEvent::Synced` should be emitted. The default value reports nothing.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Default)]
pub struct InternalSyncedEvent {
    /// Wallet component flag; the first `Synced` event is held back until it has been seen.
    pub wallet: bool,
    /// Wallet-state component flag.
    pub wallet_state: bool,
    /// Deposits component flag.
    pub deposits: bool,
    /// LNURL metadata component flag.
    pub lnurl_metadata: bool,
    /// `None` when storage was not part of the report; otherwise a count that is
    /// summed (saturating) on merge. Presumably the number of incoming records
    /// from real-time storage sync — confirm with the producer.
    pub storage_incoming: Option<u32>,
}

impl InternalSyncedEvent {
    /// Returns `true` when at least one of the four boolean component flags is set.
    fn any_flag(&self) -> bool {
        [
            self.wallet,
            self.wallet_state,
            self.deposits,
            self.lnurl_metadata,
        ]
        .iter()
        .any(|&set| set)
    }

    /// Returns `true` when the report mentions anything at all: any flag set, or
    /// a storage report present (even one carrying `Some(0)`).
    pub fn any(&self) -> bool {
        self.any_flag() || self.storage_incoming.is_some()
    }

    /// Like [`Self::any`], but a storage report only counts when its value is
    /// greater than zero.
    pub fn any_non_zero(&self) -> bool {
        self.any_flag() || self.storage_incoming.is_some_and(|count| count != 0)
    }

    /// Combines two reports into a new one without modifying either input.
    ///
    /// Flags are OR-ed. Storage counts are added with saturation when both sides
    /// have one; otherwise whichever side has a value is kept.
    pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
        let storage_incoming = match (self.storage_incoming, other.storage_incoming) {
            (Some(ours), Some(theirs)) => Some(ours.saturating_add(theirs)),
            (ours, theirs) => ours.or(theirs),
        };

        Self {
            wallet: self.wallet || other.wallet,
            wallet_state: self.wallet_state || other.wallet_state,
            deposits: self.deposits || other.deposits,
            lnurl_metadata: self.lnurl_metadata || other.lnurl_metadata,
            storage_incoming,
        }
    }
}
142
/// Trait for event listeners
///
/// Implementations are stored boxed inside `EventEmitter` and must be `Send + Sync`.
/// `EventEmitter::emit` awaits each listener in turn, so a slow `on_event`
/// delays delivery to the listeners that come after it.
#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
#[macros::async_trait]
pub trait EventListener: Send + Sync {
    /// Called when an event occurs
    ///
    /// Each listener receives its own clone of the emitted event.
    async fn on_event(&self, event: SdkEvent);
}
150
/// Event publisher that manages event listeners
///
/// Besides fanning events out to listeners, it gates the first `SdkEvent::Synced`
/// event: partial sync reports are accumulated until the wallet (and, with real-time
/// sync, storage) has been reported.
pub struct EventEmitter {
    // When true, the first `Synced` event also requires a `storage_incoming` report.
    has_real_time_sync: bool,
    // Counter supplying the numeric part of generated listener ids.
    listener_index: AtomicU64,
    // Registered listeners keyed by the id returned from `add_listener`.
    listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
    // `Some(accumulated reports)` until the first `Synced` event is emitted, `None` afterwards.
    synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
}
158
159impl EventEmitter {
160    /// Create a new event emitter
161    pub fn new(has_real_time_sync: bool) -> Self {
162        Self {
163            has_real_time_sync,
164            listener_index: AtomicU64::new(0),
165            listeners: RwLock::new(BTreeMap::new()),
166            synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
167        }
168    }
169
170    /// Add a listener to receive events
171    ///
172    /// # Arguments
173    ///
174    /// * `listener` - The listener to add
175    ///
176    /// # Returns
177    ///
178    /// A unique identifier for the listener, which can be used to remove it later
179    pub async fn add_listener(&self, listener: Box<dyn EventListener>) -> String {
180        let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
181        let id = format!("listener_{}-{}", index, Uuid::new_v4());
182        let mut listeners = self.listeners.write().await;
183        listeners.insert(id.clone(), listener);
184        id
185    }
186
187    /// Remove a listener by its ID
188    ///
189    /// # Arguments
190    ///
191    /// * `id` - The ID returned from `add_listener`
192    ///
193    /// # Returns
194    ///
195    /// `true` if the listener was found and removed, `false` otherwise
196    pub async fn remove_listener(&self, id: &str) -> bool {
197        let mut listeners = self.listeners.write().await;
198        listeners.remove(id).is_some()
199    }
200
201    /// Emit an event to all registered listeners
202    pub async fn emit(&self, event: &SdkEvent) {
203        // Get a read lock on the listeners
204        let listeners = self.listeners.read().await;
205
206        // Emit the event to each listener
207        for listener in listeners.values() {
208            listener.on_event(event.clone()).await;
209        }
210    }
211
212    pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
213        if !synced.any() {
214            // Nothing to emit
215            return;
216        }
217
218        let mut mtx = self.synced_event_buffer.lock().await;
219
220        let is_first_event = if let Some(buffered) = &*mtx {
221            let merged = buffered.merge(synced);
222
223            // The first synced event emitted should at least have the wallet synced.
224            // Subsequent events might have only partial syncs.
225            if merged.wallet && (!self.has_real_time_sync || merged.storage_incoming.is_some()) {
226                *mtx = None;
227            } else {
228                *mtx = Some(merged);
229                return;
230            }
231
232            true
233        } else {
234            false
235        };
236
237        drop(mtx);
238
239        // Only emit zero real-time syncs on the first event.
240        if !is_first_event && !synced.any_non_zero() {
241            return;
242        }
243
244        // Emit the merged event
245        self.emit(&SdkEvent::Synced).await;
246    }
247}
248
249impl Default for EventEmitter {
250    fn default() -> Self {
251        Self::new(false)
252    }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    use macros::async_test_all;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Listener that raises a shared flag as soon as any event is delivered.
    struct TestListener {
        received: Arc<AtomicBool>,
    }

    #[macros::async_trait]
    impl EventListener for TestListener {
        async fn on_event(&self, _event: SdkEvent) {
            self.received.store(true, Ordering::Relaxed);
        }
    }

    /// Listener that counts `SdkEvent::Synced` deliveries and ignores everything else.
    struct CountingListener {
        count: Arc<AtomicUsize>,
    }

    #[macros::async_trait]
    impl EventListener for CountingListener {
        async fn on_event(&self, event: SdkEvent) {
            if matches!(event, SdkEvent::Synced) {
                self.count.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Builds an emitter with one `TestListener` attached and returns the flag it raises.
    async fn emitter_with_flag(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicBool>) {
        let emitter = EventEmitter::new(has_real_time_sync);
        let received = Arc::new(AtomicBool::new(false));

        emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&received),
            }))
            .await;

        (emitter, received)
    }

    /// Builds an emitter with one `CountingListener` attached and returns its counter.
    async fn emitter_with_counter(has_real_time_sync: bool) -> (EventEmitter, Arc<AtomicUsize>) {
        let emitter = EventEmitter::new(has_real_time_sync);
        let count = Arc::new(AtomicUsize::new(0));

        emitter
            .add_listener(Box::new(CountingListener {
                count: Arc::clone(&count),
            }))
            .await;

        (emitter, count)
    }

    #[async_test_all]
    async fn test_event_emission() {
        let (emitter, received) = emitter_with_flag(false).await;

        emitter.emit(&SdkEvent::Synced).await;

        // The listener's shared flag must have been raised by the emit above
        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_remove_listener() {
        let emitter = EventEmitter::new(false);

        // One flag per listener so deliveries can be told apart
        let received1 = Arc::new(AtomicBool::new(false));
        let received2 = Arc::new(AtomicBool::new(false));

        let id1 = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&received1),
            }))
            .await;
        let id2 = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&received2),
            }))
            .await;

        // Remove the first listener
        assert!(emitter.remove_listener(&id1).await);

        // Emit an event
        emitter.emit(&SdkEvent::Synced).await;

        // The first listener should not receive the event
        assert!(!received1.load(Ordering::Relaxed));

        // The second listener should receive the event
        assert!(received2.load(Ordering::Relaxed));

        // Remove the second listener
        assert!(emitter.remove_listener(&id2).await);

        // Try to remove a non-existent listener
        assert!(!emitter.remove_listener("non-existent-id").await);
    }

    #[async_test_all]
    async fn test_synced_event_only_emitted_with_wallet_sync() {
        let (emitter, received) = emitter_with_flag(false).await;

        // Emit synced event without wallet sync - should NOT emit Synced
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet_state: true,
                deposits: true,
                lnurl_metadata: true,
                ..InternalSyncedEvent::default()
            })
            .await;

        assert!(!received.load(Ordering::Relaxed));

        // Emit synced event with wallet sync - should emit Synced
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                storage_incoming: Some(1),
                ..InternalSyncedEvent::default()
            })
            .await;

        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
        let (emitter, received) = emitter_with_flag(true).await;

        // Emit synced event with storage
        emitter
            .emit_synced(&InternalSyncedEvent {
                storage_incoming: Some(0),
                ..InternalSyncedEvent::default()
            })
            .await;

        assert!(!received.load(Ordering::Relaxed));

        // Emit synced event with wallet sync - should emit Synced
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                ..InternalSyncedEvent::default()
            })
            .await;

        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
     {
        let (emitter, received) = emitter_with_flag(true).await;

        // Emit synced event with wallet sync
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                ..InternalSyncedEvent::default()
            })
            .await;

        assert!(!received.load(Ordering::Relaxed));

        // Emit synced event with storage - should emit Synced
        emitter
            .emit_synced(&InternalSyncedEvent {
                storage_incoming: Some(0),
                ..InternalSyncedEvent::default()
            })
            .await;

        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_buffers_until_wallet_sync() {
        let (emitter, received) = emitter_with_flag(false).await;

        // Emit multiple partial syncs (and one empty one) without wallet sync
        let partial_syncs = [
            InternalSyncedEvent {
                wallet_state: true,
                ..InternalSyncedEvent::default()
            },
            InternalSyncedEvent {
                deposits: true,
                ..InternalSyncedEvent::default()
            },
            InternalSyncedEvent {
                lnurl_metadata: true,
                ..InternalSyncedEvent::default()
            },
            InternalSyncedEvent::default(),
        ];

        for partial in &partial_syncs {
            emitter.emit_synced(partial).await;
            assert!(!received.load(Ordering::Relaxed));
        }

        // Finally emit wallet sync - should emit Synced
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                ..InternalSyncedEvent::default()
            })
            .await;

        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_all_true() {
        let (emitter, received) = emitter_with_flag(false).await;

        // Emit synced event with wallet and other components - should emit Synced
        emitter
            .emit_synced(&InternalSyncedEvent {
                wallet: true,
                wallet_state: true,
                deposits: true,
                lnurl_metadata: true,
                storage_incoming: Some(1),
            })
            .await;

        assert!(received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_synced_event_empty_does_not_emit() {
        let (emitter, received) = emitter_with_flag(false).await;

        // Emit empty synced event - should NOT emit Synced
        emitter.emit_synced(&InternalSyncedEvent::default()).await;

        assert!(!received.load(Ordering::Relaxed));
    }

    #[async_test_all]
    async fn test_subsequent_syncs_after_wallet_emit_immediately() {
        let (emitter, count) = emitter_with_counter(true).await;

        // Each step is a report plus the total number of Synced events expected after it
        let steps: [(InternalSyncedEvent, usize); 7] = [
            // First sync with wallet - should emit
            (
                InternalSyncedEvent {
                    wallet: true,
                    storage_incoming: Some(0),
                    ..InternalSyncedEvent::default()
                },
                1,
            ),
            // Subsequent partial sync without wallet - should emit (buffer cleared after
            // first wallet sync)
            (
                InternalSyncedEvent {
                    wallet_state: true,
                    ..InternalSyncedEvent::default()
                },
                2,
            ),
            // Another partial sync - should emit
            (
                InternalSyncedEvent {
                    deposits: true,
                    ..InternalSyncedEvent::default()
                },
                3,
            ),
            (
                InternalSyncedEvent {
                    lnurl_metadata: true,
                    ..InternalSyncedEvent::default()
                },
                4,
            ),
            (
                InternalSyncedEvent {
                    storage_incoming: Some(1),
                    ..InternalSyncedEvent::default()
                },
                5,
            ),
            // storage_incoming with Some(0) - should NOT emit after first sync
            (
                InternalSyncedEvent {
                    storage_incoming: Some(0),
                    ..InternalSyncedEvent::default()
                },
                5,
            ),
            (
                InternalSyncedEvent {
                    wallet: true,
                    ..InternalSyncedEvent::default()
                },
                6,
            ),
        ];

        for (step, (synced, expected)) in steps.iter().enumerate() {
            emitter.emit_synced(synced).await;
            assert_eq!(
                count.load(Ordering::Relaxed),
                *expected,
                "unexpected Synced count after step {step}"
            );
        }
    }

    #[async_test_all]
    async fn test_empty_event_does_not_emit_after_wallet_sync() {
        let (emitter, count) = emitter_with_counter(false).await;

        // Each step is a report plus the total number of Synced events expected after it
        let steps: [(InternalSyncedEvent, usize); 3] = [
            // First sync with wallet - should emit
            (
                InternalSyncedEvent {
                    wallet: true,
                    ..InternalSyncedEvent::default()
                },
                1,
            ),
            // Empty sync after wallet sync - should NOT emit (all fields false)
            (InternalSyncedEvent::default(), 1),
            // Another non-empty sync - should emit
            (
                InternalSyncedEvent {
                    wallet_state: true,
                    ..InternalSyncedEvent::default()
                },
                2,
            ),
        ];

        for (step, (synced, expected)) in steps.iter().enumerate() {
            emitter.emit_synced(synced).await;
            assert_eq!(
                count.load(Ordering::Relaxed),
                *expected,
                "unexpected Synced count after step {step}"
            );
        }
    }
}