1use core::fmt;
2use std::{
3 collections::BTreeMap,
4 sync::atomic::{AtomicU64, Ordering},
5};
6
7use serde::Serialize;
8use tokio::sync::{Mutex, RwLock};
9use uuid::Uuid;
10
11use crate::{DepositInfo, Payment};
12
13#[allow(clippy::large_enum_variant)]
15#[derive(Debug, Clone, Serialize)]
16#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
17pub enum SdkEvent {
18 Synced,
20 UnclaimedDeposits {
22 unclaimed_deposits: Vec<DepositInfo>,
23 },
24 ClaimedDeposits {
25 claimed_deposits: Vec<DepositInfo>,
26 },
27 PaymentSucceeded {
28 payment: Payment,
29 },
30 PaymentPending {
31 payment: Payment,
32 },
33 PaymentFailed {
34 payment: Payment,
35 },
36}
37
38impl fmt::Display for SdkEvent {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 match self {
41 SdkEvent::Synced => write!(f, "Synced"),
42 SdkEvent::UnclaimedDeposits { unclaimed_deposits } => {
43 write!(f, "UnclaimedDeposits: {unclaimed_deposits:?}")
44 }
45 SdkEvent::ClaimedDeposits { claimed_deposits } => {
46 write!(f, "ClaimedDeposits: {claimed_deposits:?}")
47 }
48 SdkEvent::PaymentSucceeded { payment } => {
49 write!(f, "PaymentSucceeded: {payment:?}")
50 }
51 SdkEvent::PaymentPending { payment } => {
52 write!(f, "PaymentPending: {payment:?}")
53 }
54 SdkEvent::PaymentFailed { payment } => {
55 write!(f, "PaymentFailed: {payment:?}")
56 }
57 }
58 }
59}
60
61#[allow(clippy::struct_excessive_bools)]
62#[derive(Debug, Default)]
63pub struct InternalSyncedEvent {
64 pub wallet: bool,
65 pub wallet_state: bool,
66 pub deposits: bool,
67 pub lnurl_metadata: bool,
68 pub storage_incoming: Option<u32>,
69}
70
71impl InternalSyncedEvent {
72 pub fn any(&self) -> bool {
73 self.wallet
74 || self.wallet_state
75 || self.deposits
76 || self.lnurl_metadata
77 || self.storage_incoming.is_some()
78 }
79
80 pub fn any_non_zero(&self) -> bool {
81 self.wallet
82 || self.wallet_state
83 || self.deposits
84 || self.lnurl_metadata
85 || self.storage_incoming.is_some_and(|v| v > 0)
86 }
87
88 pub fn merge(&self, other: &InternalSyncedEvent) -> Self {
89 Self {
90 wallet: self.wallet || other.wallet,
91 wallet_state: self.wallet_state || other.wallet_state,
92 deposits: self.deposits || other.deposits,
93 lnurl_metadata: self.lnurl_metadata || other.lnurl_metadata,
94 storage_incoming: self
95 .storage_incoming
96 .zip(other.storage_incoming)
97 .map(|(a, b)| a.saturating_add(b))
98 .or(self.storage_incoming)
99 .or(other.storage_incoming),
100 }
101 }
102}
103
104#[cfg_attr(feature = "uniffi", uniffi::export(callback_interface))]
106#[macros::async_trait]
107pub trait EventListener: Send + Sync {
108 async fn on_event(&self, event: SdkEvent);
110}
111
112pub struct EventEmitter {
114 has_real_time_sync: bool,
115 listener_index: AtomicU64,
116 listeners: RwLock<BTreeMap<String, Box<dyn EventListener>>>,
117 synced_event_buffer: Mutex<Option<InternalSyncedEvent>>,
118}
119
120impl EventEmitter {
121 pub fn new(has_real_time_sync: bool) -> Self {
123 Self {
124 has_real_time_sync,
125 listener_index: AtomicU64::new(0),
126 listeners: RwLock::new(BTreeMap::new()),
127 synced_event_buffer: Mutex::new(Some(InternalSyncedEvent::default())),
128 }
129 }
130
131 pub async fn add_listener(&self, listener: Box<dyn EventListener>) -> String {
141 let index = self.listener_index.fetch_add(1, Ordering::Relaxed);
142 let id = format!("listener_{}-{}", index, Uuid::new_v4());
143 let mut listeners = self.listeners.write().await;
144 listeners.insert(id.clone(), listener);
145 id
146 }
147
148 pub async fn remove_listener(&self, id: &str) -> bool {
158 let mut listeners = self.listeners.write().await;
159 listeners.remove(id).is_some()
160 }
161
162 pub async fn emit(&self, event: &SdkEvent) {
164 let listeners = self.listeners.read().await;
166
167 for listener in listeners.values() {
169 listener.on_event(event.clone()).await;
170 }
171 }
172
173 pub async fn emit_synced(&self, synced: &InternalSyncedEvent) {
174 if !synced.any() {
175 return;
177 }
178
179 let mut mtx = self.synced_event_buffer.lock().await;
180
181 let is_first_event = if let Some(buffered) = &*mtx {
182 let merged = buffered.merge(synced);
183
184 if merged.wallet && (!self.has_real_time_sync || merged.storage_incoming.is_some()) {
187 *mtx = None;
188 } else {
189 *mtx = Some(merged);
190 return;
191 }
192
193 true
194 } else {
195 false
196 };
197
198 drop(mtx);
199
200 if !is_first_event && !synced.any_non_zero() {
202 return;
203 }
204
205 self.emit(&SdkEvent::Synced).await;
207 }
208}
209
210impl Default for EventEmitter {
211 fn default() -> Self {
212 Self::new(false)
213 }
214}
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219 use std::sync::Arc;
220 use std::sync::atomic::{AtomicBool, Ordering};
221
222 use macros::async_test_all;
223
224 #[cfg(feature = "browser-tests")]
225 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
226
227 struct TestListener {
228 received: Arc<AtomicBool>,
229 }
230
231 #[macros::async_trait]
232 impl EventListener for TestListener {
233 async fn on_event(&self, _event: SdkEvent) {
234 self.received.store(true, Ordering::Relaxed);
235 }
236 }
237
238 #[async_test_all]
239 async fn test_event_emission() {
240 let emitter = EventEmitter::new(false);
241 let received = Arc::new(AtomicBool::new(false));
242
243 let listener = Box::new(TestListener {
245 received: received.clone(),
246 });
247
248 let _ = emitter.add_listener(listener).await;
249
250 let event = SdkEvent::Synced {};
251
252 emitter.emit(&event).await;
253
254 assert!(received.load(Ordering::Relaxed));
256 }
257
258 #[async_test_all]
259 async fn test_remove_listener() {
260 let emitter = EventEmitter::new(false);
261
262 let received1 = Arc::new(AtomicBool::new(false));
264 let received2 = Arc::new(AtomicBool::new(false));
265
266 let listener1 = Box::new(TestListener {
268 received: received1.clone(),
269 });
270
271 let listener2 = Box::new(TestListener {
272 received: received2.clone(),
273 });
274
275 let id1 = emitter.add_listener(listener1).await;
276 let id2 = emitter.add_listener(listener2).await;
277
278 assert!(emitter.remove_listener(&id1).await);
280
281 let event = SdkEvent::Synced {};
283 emitter.emit(&event).await;
284
285 assert!(!received1.load(Ordering::Relaxed));
287
288 assert!(received2.load(Ordering::Relaxed));
290
291 assert!(emitter.remove_listener(&id2).await);
293
294 assert!(!emitter.remove_listener("non-existent-id").await);
296 }
297
298 #[async_test_all]
299 async fn test_synced_event_only_emitted_with_wallet_sync() {
300 let emitter = EventEmitter::new(false);
301 let received = Arc::new(AtomicBool::new(false));
302
303 let listener = Box::new(TestListener {
304 received: received.clone(),
305 });
306
307 emitter.add_listener(listener).await;
308
309 emitter
311 .emit_synced(&InternalSyncedEvent {
312 wallet: false,
313 wallet_state: true,
314 deposits: true,
315 lnurl_metadata: true,
316 storage_incoming: None,
317 })
318 .await;
319
320 assert!(!received.load(Ordering::Relaxed));
321
322 emitter
324 .emit_synced(&InternalSyncedEvent {
325 wallet: true,
326 wallet_state: false,
327 deposits: false,
328 lnurl_metadata: false,
329 storage_incoming: Some(1),
330 })
331 .await;
332
333 assert!(received.load(Ordering::Relaxed));
334 }
335
336 #[async_test_all]
337 async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync() {
338 let emitter = EventEmitter::new(true);
339 let received = Arc::new(AtomicBool::new(false));
340
341 let listener = Box::new(TestListener {
342 received: received.clone(),
343 });
344
345 emitter.add_listener(listener).await;
346
347 emitter
349 .emit_synced(&InternalSyncedEvent {
350 wallet: false,
351 wallet_state: false,
352 deposits: false,
353 lnurl_metadata: false,
354 storage_incoming: Some(0),
355 })
356 .await;
357
358 assert!(!received.load(Ordering::Relaxed));
359
360 emitter
362 .emit_synced(&InternalSyncedEvent {
363 wallet: true,
364 wallet_state: false,
365 deposits: false,
366 lnurl_metadata: false,
367 storage_incoming: None,
368 })
369 .await;
370
371 assert!(received.load(Ordering::Relaxed));
372 }
373
374 #[async_test_all]
375 async fn test_has_real_time_sync_synced_event_only_emitted_with_wallet_and_storage_sync_reverse()
376 {
377 let emitter = EventEmitter::new(true);
378 let received = Arc::new(AtomicBool::new(false));
379
380 let listener = Box::new(TestListener {
381 received: received.clone(),
382 });
383
384 emitter.add_listener(listener).await;
385
386 emitter
388 .emit_synced(&InternalSyncedEvent {
389 wallet: true,
390 wallet_state: false,
391 deposits: false,
392 lnurl_metadata: false,
393 storage_incoming: None,
394 })
395 .await;
396
397 assert!(!received.load(Ordering::Relaxed));
398
399 emitter
401 .emit_synced(&InternalSyncedEvent {
402 wallet: false,
403 wallet_state: false,
404 deposits: false,
405 lnurl_metadata: false,
406 storage_incoming: Some(0),
407 })
408 .await;
409
410 assert!(received.load(Ordering::Relaxed));
411 }
412
413 #[async_test_all]
414 async fn test_synced_event_buffers_until_wallet_sync() {
415 let emitter = EventEmitter::new(false);
416 let received = Arc::new(AtomicBool::new(false));
417
418 let listener = Box::new(TestListener {
419 received: received.clone(),
420 });
421
422 emitter.add_listener(listener).await;
423
424 emitter
426 .emit_synced(&InternalSyncedEvent {
427 wallet: false,
428 wallet_state: true,
429 deposits: false,
430 lnurl_metadata: false,
431 storage_incoming: None,
432 })
433 .await;
434
435 assert!(!received.load(Ordering::Relaxed));
436
437 emitter
438 .emit_synced(&InternalSyncedEvent {
439 wallet: false,
440 wallet_state: false,
441 deposits: true,
442 lnurl_metadata: false,
443 storage_incoming: None,
444 })
445 .await;
446
447 assert!(!received.load(Ordering::Relaxed));
448
449 emitter
450 .emit_synced(&InternalSyncedEvent {
451 wallet: false,
452 wallet_state: false,
453 deposits: false,
454 lnurl_metadata: true,
455 storage_incoming: None,
456 })
457 .await;
458
459 assert!(!received.load(Ordering::Relaxed));
460
461 emitter
462 .emit_synced(&InternalSyncedEvent {
463 wallet: false,
464 wallet_state: false,
465 deposits: false,
466 lnurl_metadata: false,
467 storage_incoming: None,
468 })
469 .await;
470
471 assert!(!received.load(Ordering::Relaxed));
472
473 emitter
475 .emit_synced(&InternalSyncedEvent {
476 wallet: true,
477 wallet_state: false,
478 deposits: false,
479 lnurl_metadata: false,
480 storage_incoming: None,
481 })
482 .await;
483
484 assert!(received.load(Ordering::Relaxed));
485 }
486
487 #[async_test_all]
488 async fn test_synced_event_all_true() {
489 let emitter = EventEmitter::new(false);
490 let received = Arc::new(AtomicBool::new(false));
491
492 let listener = Box::new(TestListener {
493 received: received.clone(),
494 });
495
496 emitter.add_listener(listener).await;
497
498 emitter
500 .emit_synced(&InternalSyncedEvent {
501 wallet: true,
502 wallet_state: true,
503 deposits: true,
504 lnurl_metadata: true,
505 storage_incoming: Some(1),
506 })
507 .await;
508
509 assert!(received.load(Ordering::Relaxed));
510 }
511
512 #[async_test_all]
513 async fn test_synced_event_empty_does_not_emit() {
514 let emitter = EventEmitter::new(false);
515 let received = Arc::new(AtomicBool::new(false));
516
517 let listener = Box::new(TestListener {
518 received: received.clone(),
519 });
520
521 emitter.add_listener(listener).await;
522
523 emitter
525 .emit_synced(&InternalSyncedEvent {
526 wallet: false,
527 wallet_state: false,
528 deposits: false,
529 lnurl_metadata: false,
530 storage_incoming: None,
531 })
532 .await;
533
534 assert!(!received.load(Ordering::Relaxed));
535 }
536
537 #[async_test_all]
538 async fn test_subsequent_syncs_after_wallet_emit_immediately() {
539 use std::sync::atomic::AtomicUsize;
540
541 struct CountingListener {
542 count: Arc<AtomicUsize>,
543 }
544
545 #[macros::async_trait]
546 impl EventListener for CountingListener {
547 async fn on_event(&self, event: SdkEvent) {
548 if matches!(event, SdkEvent::Synced) {
549 self.count.fetch_add(1, Ordering::Relaxed);
550 }
551 }
552 }
553
554 let emitter = EventEmitter::new(true);
555 let count = Arc::new(AtomicUsize::new(0));
556
557 let listener = Box::new(CountingListener {
558 count: count.clone(),
559 });
560
561 emitter.add_listener(listener).await;
562
563 emitter
565 .emit_synced(&InternalSyncedEvent {
566 wallet: true,
567 wallet_state: false,
568 deposits: false,
569 lnurl_metadata: false,
570 storage_incoming: Some(0),
571 })
572 .await;
573
574 assert_eq!(count.load(Ordering::Relaxed), 1);
575
576 emitter
578 .emit_synced(&InternalSyncedEvent {
579 wallet: false,
580 wallet_state: true,
581 deposits: false,
582 lnurl_metadata: false,
583 storage_incoming: None,
584 })
585 .await;
586
587 assert_eq!(count.load(Ordering::Relaxed), 2);
588
589 emitter
591 .emit_synced(&InternalSyncedEvent {
592 wallet: false,
593 wallet_state: false,
594 deposits: true,
595 lnurl_metadata: false,
596 storage_incoming: None,
597 })
598 .await;
599
600 assert_eq!(count.load(Ordering::Relaxed), 3);
601
602 emitter
603 .emit_synced(&InternalSyncedEvent {
604 wallet: false,
605 wallet_state: false,
606 deposits: false,
607 lnurl_metadata: true,
608 storage_incoming: None,
609 })
610 .await;
611
612 assert_eq!(count.load(Ordering::Relaxed), 4);
613
614 emitter
615 .emit_synced(&InternalSyncedEvent {
616 wallet: false,
617 wallet_state: false,
618 deposits: false,
619 lnurl_metadata: false,
620 storage_incoming: Some(1),
621 })
622 .await;
623
624 assert_eq!(count.load(Ordering::Relaxed), 5);
625
626 emitter
628 .emit_synced(&InternalSyncedEvent {
629 wallet: false,
630 wallet_state: false,
631 deposits: false,
632 lnurl_metadata: false,
633 storage_incoming: Some(0),
634 })
635 .await;
636
637 assert_eq!(count.load(Ordering::Relaxed), 5);
638
639 emitter
640 .emit_synced(&InternalSyncedEvent {
641 wallet: true,
642 wallet_state: false,
643 deposits: false,
644 lnurl_metadata: false,
645 storage_incoming: None,
646 })
647 .await;
648
649 assert_eq!(count.load(Ordering::Relaxed), 6);
650 }
651
652 #[async_test_all]
653 async fn test_empty_event_does_not_emit_after_wallet_sync() {
654 use std::sync::atomic::AtomicUsize;
655
656 struct CountingListener {
657 count: Arc<AtomicUsize>,
658 }
659
660 #[macros::async_trait]
661 impl EventListener for CountingListener {
662 async fn on_event(&self, event: SdkEvent) {
663 if matches!(event, SdkEvent::Synced) {
664 self.count.fetch_add(1, Ordering::Relaxed);
665 }
666 }
667 }
668
669 let emitter = EventEmitter::new(false);
670 let count = Arc::new(AtomicUsize::new(0));
671
672 let listener = Box::new(CountingListener {
673 count: count.clone(),
674 });
675
676 emitter.add_listener(listener).await;
677
678 emitter
680 .emit_synced(&InternalSyncedEvent {
681 wallet: true,
682 wallet_state: false,
683 deposits: false,
684 lnurl_metadata: false,
685 storage_incoming: None,
686 })
687 .await;
688
689 assert_eq!(count.load(Ordering::Relaxed), 1);
690
691 emitter
693 .emit_synced(&InternalSyncedEvent {
694 wallet: false,
695 wallet_state: false,
696 deposits: false,
697 lnurl_metadata: false,
698 storage_incoming: None,
699 })
700 .await;
701
702 assert_eq!(count.load(Ordering::Relaxed), 1); emitter
706 .emit_synced(&InternalSyncedEvent {
707 wallet: false,
708 wallet_state: true,
709 deposits: false,
710 lnurl_metadata: false,
711 storage_incoming: None,
712 })
713 .await;
714
715 assert_eq!(count.load(Ordering::Relaxed), 2); }
717}