breez_sdk_spark/
events.rs

1use core::fmt;
2use std::{
3    collections::BTreeMap,
4    sync::atomic::{AtomicBool, AtomicU64, Ordering},
5};
6
7use serde::Serialize;
8use tokio::sync::{Mutex, RwLock};
9use uuid::Uuid;
10
11use crate::{DepositInfo, LightningAddressInfo, Payment};
12
/// Events emitted by the SDK
///
/// Every registered [`EventListener`] receives its own clone of each event
/// that is published through [`EventEmitter::emit`].
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum SdkEvent {
    /// Emitted when the wallet has been synchronized with the network
    Synced,
    /// Emitted when the SDK was unable to claim deposits
    UnclaimedDeposits {
        unclaimed_deposits: Vec<DepositInfo>,
    },
    /// Carries the deposits that were claimed.
    /// NOTE(review): presumably the counterpart of `UnclaimedDeposits` for
    /// successful claims - confirm against the emitting code.
    ClaimedDeposits {
        claimed_deposits: Vec<DepositInfo>,
    },
    /// Produced by `SdkEvent::from_payment` for a payment whose status is `Completed`
    PaymentSucceeded {
        payment: Payment,
    },
    /// Produced by `SdkEvent::from_payment` for a payment whose status is `Pending`
    PaymentPending {
        payment: Payment,
    },
    /// Produced by `SdkEvent::from_payment` for a payment whose status is `Failed`
    PaymentFailed {
        payment: Payment,
    },
    /// Wraps an [`OptimizationEvent`] describing the progress of an optimization run
    Optimization {
        // Named with `optimization` prefix to avoid collision with `event` keyword in C#
        optimization_event: OptimizationEvent,
    },
    /// Carries the lightning address after a change.
    /// NOTE(review): `None` presumably means no address is registered any more - confirm.
    LightningAddressChanged {
        lightning_address: Option<LightningAddressInfo>,
    },
}
44
45impl SdkEvent {
46    pub(crate) fn from_payment(payment: Payment) -> Self {
47        match payment.status {
48            crate::PaymentStatus::Completed => SdkEvent::PaymentSucceeded { payment },
49            crate::PaymentStatus::Pending => SdkEvent::PaymentPending { payment },
50            crate::PaymentStatus::Failed => SdkEvent::PaymentFailed { payment },
51        }
52    }
53}
54
55impl fmt::Display for SdkEvent {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        match self {
58            SdkEvent::Synced => write!(f, "Synced"),
59            SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
60                write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
61            }
62            SdkEvent::ClaimedDeposits { claimed_deposits } => {
63                write!(f, "ClaimedDeposits: {claimed_deposits:?}")
64            }
65            SdkEvent::PaymentSucceeded { payment } => {
66                write!(f, "PaymentSucceeded: {payment:?}")
67            }
68            SdkEvent::PaymentPending { payment } => {
69                write!(f, "PaymentPending: {payment:?}")
70            }
71            SdkEvent::PaymentFailed { payment } => {
72                write!(f, "PaymentFailed: {payment:?}")
73            }
74            SdkEvent::Optimization {
75                optimization_event: event,
76            } => {
77                write!(f, "Optimization: {event:?}")
78            }
79            SdkEvent::LightningAddressChanged { lightning_address } => {
80                write!(f, "LightningAddressChanged: {lightning_address:?}")
81            }
82        }
83    }
84}
85
/// Progress and outcome notifications for an optimization run.
///
/// Delivered to listeners wrapped in [`SdkEvent::Optimization`].
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum OptimizationEvent {
    /// Optimization has started with the given number of rounds.
    Started { total_rounds: u32 },
    /// A round has completed.
    RoundCompleted {
        current_round: u32,
        total_rounds: u32,
    },
    /// Optimization completed successfully.
    Completed,
    /// Optimization was cancelled.
    Cancelled,
    /// Optimization failed with an error.
    Failed { error: String },
    /// Optimization was skipped because leaves are already optimal.
    Skipped,
}
105
/// Internal summary of which parts of a sync pass produced a result.
///
/// Each `bool` marks one component as synced. `storage_incoming` is `Some`
/// when a storage sync reported a count, which may be zero.
/// NOTE(review): presumably the number of records received from the real-time
/// sync storage - confirm against the producer.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Default)]
pub struct InternalSyncedEvent {
    pub wallet: bool,
    pub wallet_state: bool,
    pub deposits: bool,
    pub lnurl_metadata: bool,
    pub storage_incoming: Option<u32>,
}

impl InternalSyncedEvent {
    /// `true` when at least one of the boolean component flags is set.
    fn any_flag(&self) -> bool {
        [
            self.wallet,
            self.wallet_state,
            self.deposits,
            self.lnurl_metadata,
        ]
        .contains(&true)
    }

    /// Returns `true` if any component flag is set or a storage count is
    /// present, even when that count is zero.
    pub fn any(&self) -> bool {
        self.any_flag() || self.storage_incoming.is_some()
    }

    /// Like [`any`](Self::any), but a storage count only qualifies when it is
    /// strictly greater than zero.
    pub fn any_non_zero(&self) -> bool {
        self.any_flag() || matches!(self.storage_incoming, Some(count) if count > 0)
    }

    /// Combines two events into a new one: flags are OR-ed together and
    /// storage counts are summed (saturating at `u32::MAX`). When only one
    /// side has a count, that count is kept unchanged.
    pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
        let storage_incoming = match (self.storage_incoming, other.storage_incoming) {
            (Some(mine), Some(theirs)) => Some(mine.saturating_add(theirs)),
            (mine, theirs) => mine.or(theirs),
        };

        Self {
            wallet: self.wallet || other.wallet,
            wallet_state: self.wallet_state || other.wallet_state,
            deposits: self.deposits || other.deposits,
            lnurl_metadata: self.lnurl_metadata || other.lnurl_metadata,
            storage_incoming,
        }
    }
}
148
/// Trait for event listeners
///
/// Implementations are stored as `Box<dyn EventListener>` inside
/// [`EventEmitter`] and must therefore be `Send + Sync`.
#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
#[macros::async_trait]
pub trait EventListener: Send + Sync {
    /// Called when an event occurs
    ///
    /// The listener receives its own clone of the event. `EventEmitter::emit`
    /// awaits this call before moving on to the next listener.
    async fn on_event(&self, event: SdkEvent);
}
156
/// Event publisher that manages event listeners
///
/// Besides fanning events out to listeners, it holds back the first
/// `SdkEvent::Synced` until the wallet has synced (see `emit_synced`).
pub struct EventEmitter {
    /// When `true`, the first `Synced` event additionally waits for a storage
    /// sync result or for `rtsync_failed` to be set.
    has_real_time_sync: bool,
    /// Set by `notify_rtsync_failed`; lifts the storage requirement above.
    rtsync_failed: AtomicBool,
    /// Monotonic counter used as part of generated listener ids.
    listener_index: AtomicU64,
    /// Registered listeners keyed by the id returned from `add_listener`.
    listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
    /// `Some` while the first `Synced` event has not been emitted yet (partial
    /// syncs are merged into it); `None` once it has been released.
    synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
}
165
166impl EventEmitter {
167    /// Create a new event emitter
168    pub fn new(has_real_time_sync: bool) -> Self {
169        Self {
170            has_real_time_sync,
171            rtsync_failed: AtomicBool::new(false),
172            listener_index: AtomicU64::new(0),
173            listeners: RwLock::new(BTreeMap::new()),
174            synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
175        }
176    }
177
178    /// Add a listener to receive events
179    ///
180    /// # Arguments
181    ///
182    /// * `listener` - The listener to add
183    ///
184    /// # Returns
185    ///
186    /// A unique identifier for the listener, which can be used to remove it later
187    pub async fn add_listener(&self, listener: Box<dyn EventListener>) -> String {
188        let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
189        let id = format!("listener_{}-{}", index, Uuid::new_v4());
190        let mut listeners = self.listeners.write().await;
191        listeners.insert(id.clone(), listener);
192        id
193    }
194
195    /// Remove a listener by its ID
196    ///
197    /// # Arguments
198    ///
199    /// * `id` - The ID returned from `add_listener`
200    ///
201    /// # Returns
202    ///
203    /// `true` if the listener was found and removed, `false` otherwise
204    pub async fn remove_listener(&self, id: &str) -> bool {
205        let mut listeners = self.listeners.write().await;
206        listeners.remove(id).is_some()
207    }
208
209    /// Emit an event to all registered listeners
210    pub async fn emit(&self, event: &SdkEvent) {
211        // Get a read lock on the listeners
212        let listeners = self.listeners.read().await;
213
214        // Emit the event to each listener
215        for listener in listeners.values() {
216            listener.on_event(event.clone()).await;
217        }
218    }
219
220    pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
221        if !synced.any() {
222            // Nothing to emit
223            return;
224        }
225
226        let mut mtx = self.synced_event_buffer.lock().await;
227
228        let is_first_event = if let Some(buffered) = &*mtx {
229            let merged = buffered.merge(synced);
230
231            // The first synced event emitted should at least have the wallet synced.
232            // Subsequent events might have only partial syncs.
233            if merged.wallet
234                && (!self.has_real_time_sync
235                    || merged.storage_incoming.is_some()
236                    || self.rtsync_failed.load(Ordering::Relaxed))
237            {
238                *mtx = None;
239            } else {
240                *mtx = Some(merged);
241                return;
242            }
243
244            true
245        } else {
246            false
247        };
248
249        drop(mtx);
250
251        // Only emit zero real-time syncs on the first event.
252        if !is_first_event && !synced.any_non_zero() {
253            return;
254        }
255
256        // Emit the merged event
257        self.emit(&SdkEvent::Synced).await;
258    }
259
260    /// Notify that real-time sync has failed. If the first synced event is still
261    /// buffered and the wallet has already synced, release it immediately instead
262    /// of waiting for a remote pull that may never arrive.
263    pub async fn notify_rtsync_failed(&self) {
264        self.rtsync_failed.store(true, Ordering::Relaxed);
265
266        let mut mtx = self.synced_event_buffer.lock().await;
267        if let Some(buffered) = &*mtx
268            && buffered.wallet
269        {
270            *mtx = None;
271            drop(mtx);
272            self.emit(&SdkEvent::Synced).await;
273        }
274    }
275}
276
277impl Default for EventEmitter {
278    fn default() -> Self {
279        Self::new(false)
280    }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    use macros::async_test_all;

    #[cfg(feature = "browser-tests")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Listener that raises a shared flag as soon as any event arrives.
    struct TestListener {
        received: Arc<AtomicBool>,
    }

    #[macros::async_trait]
    impl EventListener for TestListener {
        async fn on_event(&self, _event: SdkEvent) {
            self.received.store(true, Ordering::Relaxed);
        }
    }

    /// Listener that counts how many `Synced` events it has seen.
    struct CountingListener {
        count: Arc<AtomicUsize>,
    }

    #[macros::async_trait]
    impl EventListener for CountingListener {
        async fn on_event(&self, event: SdkEvent) {
            if matches!(event, SdkEvent::Synced) {
                self.count.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Registers a `TestListener` on `emitter` and hands back its flag.
    async fn attach_flag(emitter: &EventEmitter) -> Arc<AtomicBool> {
        let received = Arc::new(AtomicBool::new(false));
        let listener = Box::new(TestListener {
            received: Arc::clone(&received),
        });
        emitter.add_listener(listener).await;
        received
    }

    /// Registers a `CountingListener` on `emitter` and hands back its counter.
    async fn attach_counter(emitter: &EventEmitter) -> Arc<AtomicUsize> {
        let count = Arc::new(AtomicUsize::new(0));
        let listener = Box::new(CountingListener {
            count: Arc::clone(&count),
        });
        emitter.add_listener(listener).await;
        count
    }

    /// Reads a listener flag.
    fn seen(flag: &AtomicBool) -> bool {
        flag.load(Ordering::Relaxed)
    }

    /// Sync result in which only the wallet is marked as synced.
    fn wallet_only() -> InternalSyncedEvent {
        InternalSyncedEvent {
            wallet: true,
            ..Default::default()
        }
    }

    /// Sync result that carries only a storage count.
    fn storage_only(incoming: u32) -> InternalSyncedEvent {
        InternalSyncedEvent {
            storage_incoming: Some(incoming),
            ..Default::default()
        }
    }

    /// Sync result in which only `wallet_state` is set.
    fn wallet_state_only() -> InternalSyncedEvent {
        InternalSyncedEvent {
            wallet_state: true,
            ..Default::default()
        }
    }

    /// Sync result in which only `deposits` is set.
    fn deposits_only() -> InternalSyncedEvent {
        InternalSyncedEvent {
            deposits: true,
            ..Default::default()
        }
    }

    /// Sync result in which only `lnurl_metadata` is set.
    fn lnurl_metadata_only() -> InternalSyncedEvent {
        InternalSyncedEvent {
            lnurl_metadata: true,
            ..Default::default()
        }
    }

    #[async_test_all]
    async fn test_event_emission() {
        let emitter = EventEmitter::new(false);
        let received = attach_flag(&emitter).await;

        emitter.emit(&SdkEvent::Synced).await;

        // The listener must have observed the event through the shared flag.
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_remove_listener() {
        let emitter = EventEmitter::new(false);

        // One flag per listener so deliveries can be told apart.
        let first_flag = Arc::new(AtomicBool::new(false));
        let second_flag = Arc::new(AtomicBool::new(false));

        let first_id = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&first_flag),
            }))
            .await;
        let second_id = emitter
            .add_listener(Box::new(TestListener {
                received: Arc::clone(&second_flag),
            }))
            .await;

        // Drop the first listener before anything is emitted.
        assert!(emitter.remove_listener(&first_id).await);

        emitter.emit(&SdkEvent::Synced).await;

        // Only the listener that is still registered gets the event.
        assert!(!seen(&first_flag));
        assert!(seen(&second_flag));

        // Removing the remaining listener succeeds.
        assert!(emitter.remove_listener(&second_id).await);

        // Unknown ids are reported as not found.
        assert!(!emitter.remove_listener("non-existent-id").await);
    }

    #[async_test_all]
    async fn test_synced_event_only_emitted_with_wallet_sync() {
        let emitter = EventEmitter::new(false);
        let received = attach_flag(&emitter).await;

        // Everything but the wallet synced - Synced must be held back.
        let without_wallet = InternalSyncedEvent {
            wallet_state: true,
            deposits: true,
            lnurl_metadata: true,
            ..Default::default()
        };
        emitter.emit_synced(&without_wallet).await;
        assert!(!seen(&received));

        // Wallet synced - Synced is released.
        let with_wallet = InternalSyncedEvent {
            wallet: true,
            storage_incoming: Some(1),
            ..Default::default()
        };
        emitter.emit_synced(&with_wallet).await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
        let emitter = EventEmitter::new(true);
        let received = attach_flag(&emitter).await;

        // Storage result alone is not enough.
        emitter.emit_synced(&storage_only(0)).await;
        assert!(!seen(&received));

        // Wallet sync completes the pair - Synced is released.
        emitter.emit_synced(&wallet_only()).await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
     {
        let emitter = EventEmitter::new(true);
        let received = attach_flag(&emitter).await;

        // Wallet sync alone is not enough when real-time sync is enabled.
        emitter.emit_synced(&wallet_only()).await;
        assert!(!seen(&received));

        // Storage result completes the pair - Synced is released.
        emitter.emit_synced(&storage_only(0)).await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_rtsync_failed_emits_synced_on_wallet_alone() {
        let emitter = EventEmitter::new(true);
        let received = attach_flag(&emitter).await;

        // Wallet synced while rtsync has not failed yet - held back.
        emitter.emit_synced(&wallet_only()).await;
        assert!(!seen(&received));

        // The rtsync failure releases the buffered event immediately.
        emitter.notify_rtsync_failed().await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_rtsync_failed_before_wallet_sync_emits_on_wallet() {
        let emitter = EventEmitter::new(true);
        let received = attach_flag(&emitter).await;

        // rtsync fails before the wallet has synced - nothing to release yet.
        emitter.notify_rtsync_failed().await;
        assert!(!seen(&received));

        // Wallet sync now goes out at once because the failure is already recorded.
        emitter.emit_synced(&wallet_only()).await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_synced_event_buffers_until_wallet_sync() {
        let emitter = EventEmitter::new(false);
        let received = attach_flag(&emitter).await;

        // A series of partial syncs without the wallet - none may emit.
        let partial_syncs = [
            wallet_state_only(),
            deposits_only(),
            lnurl_metadata_only(),
            InternalSyncedEvent::default(),
        ];
        for partial in &partial_syncs {
            emitter.emit_synced(partial).await;
            assert!(!seen(&received));
        }

        // The wallet sync finally releases Synced.
        emitter.emit_synced(&wallet_only()).await;
        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_synced_event_all_true() {
        let emitter = EventEmitter::new(false);
        let received = attach_flag(&emitter).await;

        // Wallet plus every other component in one go - Synced is emitted.
        let everything = InternalSyncedEvent {
            wallet: true,
            wallet_state: true,
            deposits: true,
            lnurl_metadata: true,
            storage_incoming: Some(1),
        };
        emitter.emit_synced(&everything).await;

        assert!(seen(&received));
    }

    #[async_test_all]
    async fn test_synced_event_empty_does_not_emit() {
        let emitter = EventEmitter::new(false);
        let received = attach_flag(&emitter).await;

        // An empty result must never produce Synced.
        emitter.emit_synced(&InternalSyncedEvent::default()).await;

        assert!(!seen(&received));
    }

    #[async_test_all]
    async fn test_subsequent_syncs_after_wallet_emit_immediately() {
        let emitter = EventEmitter::new(true);
        let count = attach_counter(&emitter).await;

        // Each step is a sync result paired with the total number of Synced
        // events expected once it has been processed.
        let steps = [
            // First sync with wallet and storage - emits.
            (
                InternalSyncedEvent {
                    wallet: true,
                    storage_incoming: Some(0),
                    ..Default::default()
                },
                1,
            ),
            // Partial syncs emit directly once the buffer has been cleared.
            (wallet_state_only(), 2),
            (deposits_only(), 3),
            (lnurl_metadata_only(), 4),
            (storage_only(1), 5),
            // A zero storage count does NOT emit after the first sync.
            (storage_only(0), 5),
            (wallet_only(), 6),
        ];

        for (event, expected_total) in &steps {
            emitter.emit_synced(event).await;
            assert_eq!(count.load(Ordering::Relaxed), *expected_total);
        }
    }

    #[async_test_all]
    async fn test_empty_event_does_not_emit_after_wallet_sync() {
        let emitter = EventEmitter::new(false);
        let count = attach_counter(&emitter).await;

        // First sync with wallet - emits.
        emitter.emit_synced(&wallet_only()).await;
        assert_eq!(count.load(Ordering::Relaxed), 1);

        // Empty result afterwards - the count stays at 1.
        emitter.emit_synced(&InternalSyncedEvent::default()).await;
        assert_eq!(count.load(Ordering::Relaxed), 1);

        // A non-empty partial result emits again - the count becomes 2.
        emitter.emit_synced(&wallet_state_only()).await;
        assert_eq!(count.load(Ordering::Relaxed), 2);
    }
}