breez_sdk_core/
backup.rs

1use crate::{
2    breez_services::BackupFailedData,
3    error::SdkResult,
4    persist::db::{HookEvent, SqliteStorage},
5    BreezEvent, Config,
6};
7
8use anyhow::{anyhow, Result};
9use ecies::symmetric::{sym_decrypt, sym_encrypt};
10use miniz_oxide::{deflate::compress_to_vec, inflate::decompress_to_vec_with_limit};
11use std::{
12    fs::{self, File},
13    io::{Read, Write},
14    path::Path,
15    sync::Arc,
16    time::{SystemTime, UNIX_EPOCH},
17};
18use tempfile::tempdir_in;
19use tokio::{
20    runtime::Builder,
21    sync::{
22        broadcast::{self, error::RecvError},
23        mpsc::{self, Sender},
24        watch, Mutex,
25    },
26};
27
28#[derive(Clone)]
29pub(crate) struct BackupRequest {
30    force: bool,
31    on_complete: Option<mpsc::Sender<Result<()>>>,
32}
33
34unsafe impl Send for BackupRequest {}
35unsafe impl Sync for BackupRequest {}
36
37impl BackupRequest {
38    pub(crate) fn new(force: bool) -> Self {
39        Self {
40            force,
41            on_complete: None,
42        }
43    }
44
45    pub(crate) fn with(on_complete: Sender<Result<()>>, force: bool) -> Self {
46        Self {
47            force,
48            on_complete: Some(on_complete),
49        }
50    }
51}
52
/// BackupState is the sdk state that requires syncing between multiple apps.
/// It is just a blob of data marked with a specific version (generation).
/// The generation signals for the local state if the remote state is newer,
/// where in that case the local state should be updated with the remote state prior to pushing
/// any local changes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackupState {
    /// Version of the state.
    pub generation: u64,
    /// The state itself, as an opaque blob of bytes.
    pub data: Vec<u8>,
}
63
64/// BackupTransport is the interface for syncing the sdk state between multiple apps.
65#[tonic::async_trait]
66pub trait BackupTransport: Send + Sync {
67    async fn pull(&self) -> SdkResult<Option<BackupState>>;
68    async fn push(&self, version: Option<u64>, data: Vec<u8>) -> SdkResult<u64>;
69}
70
71pub(crate) struct BackupWatcher {
72    pub(crate) config: Config,
73    backup_request_sender: Mutex<Option<mpsc::Sender<BackupRequest>>>,
74    inner: Arc<dyn BackupTransport>,
75    persister: Arc<SqliteStorage>,
76    encryption_key: Vec<u8>,
77    legacy_encryption_key: Vec<u8>,
78    events_notifier: broadcast::Sender<BreezEvent>,
79}
80
81/// watches for sync requests and syncs the sdk state when a request is detected.
82impl BackupWatcher {
83    pub(crate) fn new(
84        config: Config,
85        inner: Arc<dyn BackupTransport>,
86        persister: Arc<SqliteStorage>,
87        encryption_key: Vec<u8>,
88        legacy_encryption_key: Vec<u8>,
89    ) -> Self {
90        let (events_notifier, _) = broadcast::channel::<BreezEvent>(100);
91
92        Self {
93            config,
94            backup_request_sender: Mutex::new(None),
95            inner,
96            persister,
97            encryption_key,
98            legacy_encryption_key,
99            events_notifier,
100        }
101    }
102
103    async fn set_request_sender(&self, sender: mpsc::Sender<BackupRequest>) {
104        let mut backup_request_sender = self.backup_request_sender.lock().await;
105        *backup_request_sender = Some(sender);
106    }
107
108    pub(crate) async fn start(&self, mut quit_receiver: watch::Receiver<()>) -> Result<()> {
109        let worker = BackupWorker::new(
110            self.config.working_dir.clone(),
111            self.inner.clone(),
112            self.persister.clone(),
113            self.encryption_key.clone(),
114            self.legacy_encryption_key.clone(),
115            self.events_notifier.clone(),
116        );
117
118        let mut hooks_subscription = self.persister.subscribe_hooks();
119        let (backup_request_sender, mut backup_request_receiver) =
120            mpsc::channel::<BackupRequest>(100);
121        self.set_request_sender(backup_request_sender.clone()).await;
122
123        let rt = Builder::new_current_thread().enable_all().build()?;
124        std::thread::spawn(move || {
125            rt.block_on(async move {
126                loop {
127                    tokio::select! {
128
129                     // We listen to manual backup requests from the user
130                     req = backup_request_receiver.recv() => {
131                      match req {
132                       Some(req) => {
133                        match worker.sync(req.force).await {
134                         Ok(_) => {
135                          if let Some(callback) = req.on_complete {
136                           _ = callback.send(Ok(())).await;
137                          }
138                         }
139                         Err(e) => {
140                          error!("Sync worker returned with error {e}");
141                          if let Some(callback) = req.on_complete {
142                           _ = callback.send(Err(e)).await;
143                          }
144                         }
145                        };
146                       }
147                       None => {
148                        return
149                       }
150                      }
151                     }
152
153                      // We spin the backup worker on every new entry to the sync_requests table.
154                      event = hooks_subscription.recv() => {
155                        match event {
156                            Ok(HookEvent::Insert{table}) => {
157                             if table == "sync_requests"{
158                              // we do want to wait a bit to allow for multiple sync requests to be inserted
159                              tokio::time::sleep(std::time::Duration::from_millis(100)).await;
160                              if let Err(e) = worker.sync(false).await {
161                               error!("Sync worker returned with error {e}");
162                              }
163                             }
164                            }
165                            // If we are lagging we want to trigger sync
166                            Err(RecvError::Lagged(_)) => {
167                             if let Err(e) = worker.sync(false).await {
168                              error!("Sync worker returned with error {e}");
169                             }
170                            }
171                            // If the channel is closed we exit
172                            Err(_) => {
173                             return
174                            }
175                        }
176                      },
177                      // We also want to exit if we receive a quit signal
178                      _ = quit_receiver.changed() => {
179                        return
180                      }
181                    }
182                }
183            });
184        });
185
186        Ok(())
187    }
188
189    pub(crate) fn subscribe_events(&self) -> broadcast::Receiver<BreezEvent> {
190        self.events_notifier.subscribe()
191    }
192
193    pub(crate) async fn request_backup(&self, req: BackupRequest) -> Result<()> {
194        let request_handler = self.backup_request_sender.lock().await;
195        let h = request_handler.clone();
196        h.ok_or_else(|| anyhow!("No backup request handler found"))?
197            .send(req)
198            .await
199            .map_err(|_| anyhow!("Failed to send backup request, the channel is likely closed"))
200    }
201}
202
203/// BackupWorker is a worker that bidirectionally syncs the sdk state.
204#[derive(Clone)]
205struct BackupWorker {
206    working_dir_path: String,
207    inner: Arc<dyn BackupTransport>,
208    persister: Arc<SqliteStorage>,
209    encryption_key: Vec<u8>,
210    legacy_encryption_key: Vec<u8>,
211    events_notifier: broadcast::Sender<BreezEvent>,
212}
213
214impl BackupWorker {
215    pub(crate) fn new(
216        working_dir_path: String,
217        inner: Arc<dyn BackupTransport>,
218        persister: Arc<SqliteStorage>,
219        encryption_key: Vec<u8>,
220        legacy_encryption_key: Vec<u8>,
221        events_notifier: broadcast::Sender<BreezEvent>,
222    ) -> Self {
223        Self {
224            working_dir_path,
225            inner,
226            persister,
227            encryption_key,
228            legacy_encryption_key,
229            events_notifier,
230        }
231    }
232
233    async fn notify(&self, e: BreezEvent) -> Result<()> {
234        // we don't care for errors here as this happens if
235        // there ar eno subscribers, just ignoring them.
236        _ = self.events_notifier.send(e);
237        Ok(())
238    }
239
240    async fn sync(&self, force: bool) -> Result<()> {
241        let last_sync_request_id = self.persister.get_last_sync_request()?.unwrap_or_default();
242        // In case we don't  have any sync requests the worker can exit
243        if !force && last_sync_request_id == 0 {
244            return Ok(());
245        }
246        let sync_dir = self.sync_dir()?;
247        let notify_res = match self
248            .sync_internal(sync_dir.clone(), last_sync_request_id)
249            .await
250        {
251            Ok(_) => {
252                info!("backup sync completed successfully");
253                self.notify(BreezEvent::BackupSucceeded).await
254            }
255            Err(e) => {
256                error!("backup sync failed {}", e);
257                self.notify(BreezEvent::BackupFailed {
258                    details: BackupFailedData {
259                        error: e.to_string(),
260                    },
261                })
262                .await
263            }
264        };
265        fs::remove_dir_all(Path::new(sync_dir.as_str()))?;
266
267        match notify_res {
268            Ok(r) => Ok(r),
269            Err(e) => {
270                error!("failed to notify backup event {}", e);
271                Err(e)
272            }
273        }
274    }
275
276    /// Syncs the sdk state with the remote state.
277    /// The process is done in 3 steps:
278    /// 1. Try to push the local state to the remote state using the current version (optimistic).
279    /// 2. If the push fails, sync the remote changes into the local changes including the remote newer version.
280    /// 3. Try to push the local state again with the new version.
281    async fn sync_internal(&self, sync_dir: String, mut last_sync_request_id: u64) -> Result<()> {
282        // get the last local sync version and the last sync request id
283        let last_version = self.persister.get_last_sync_version()?;
284
285        self.notify(BreezEvent::BackupStarted).await?;
286
287        // Backup the local sdk state
288        let local_storage_file = tempfile::NamedTempFile::new_in(sync_dir.clone())?;
289        self.persister.backup(local_storage_file.path())?;
290        debug!(
291            "syncing storge, last_version = {:?}, file = {:?}",
292            last_version,
293            local_storage_file.path()
294        );
295
296        // read the backed up local data
297        let mut f = File::open(local_storage_file.path())?;
298        let mut data = vec![];
299        f.read_to_end(&mut data)?;
300
301        // Try to push with the current version, if no one else has pushed then we will succeed
302        let optimistic_sync = self.push(last_version, data.clone()).await;
303
304        let sync_result = match optimistic_sync {
305            Ok((new_version, data)) => {
306                info!("Optimistic sync succeeded, new version = {new_version}");
307                Ok((new_version, data))
308            }
309            Err(e) => {
310                info!("Optimistic sync failed, trying to sync remote changes {e}");
311                // We need to sync remote changes and then retry the push
312                self.sync_remote_and_push(sync_dir, data, &mut last_sync_request_id)
313                    .await
314            }
315        };
316
317        // In case we succeeded to push the local changes, we need to:
318        // 1. Delete the sync requests so.
319        // 2. Update the last sync version.
320        match sync_result {
321            Ok((new_version, new_data)) => {
322                let now = SystemTime::now();
323                self.persister
324                    .set_last_sync_version(new_version, &new_data)?;
325                self.persister
326                    .delete_sync_requests_up_to(last_sync_request_id)?;
327                self.persister
328                    .set_last_backup_time(now.duration_since(UNIX_EPOCH).unwrap().as_secs())?;
329                info!("Sync succeeded");
330                Ok(())
331            }
332            Err(e) => {
333                error!("Sync failed: {}", e);
334                Err(e)
335            }
336        }
337    }
338
339    /// Syncs the remote changes into the local changes and then tries to push the local changes again.    
340    async fn sync_remote_and_push(
341        &self,
342        sync_dir: String,
343        local_data: Vec<u8>,
344        last_sync_request_id: &mut u64,
345    ) -> Result<(u64, Vec<u8>)> {
346        let remote_state = self.pull().await?;
347        let tmp_dir = tempdir_in(sync_dir)?;
348        let remote_storage_path = tmp_dir.path();
349        let mut remote_storage_file = File::create(remote_storage_path.join("sync_storage.sql"))?;
350        info!("remote_storage_path = {remote_storage_path:?}");
351        match remote_state {
352            Some(state) => {
353                // Write the remote state to a file
354                remote_storage_file.write_all(&state.data[..])?;
355                remote_storage_file.flush()?;
356                let remote_storage = SqliteStorage::new(
357                    remote_storage_path
358                        .as_os_str()
359                        .to_str()
360                        .unwrap()
361                        .to_string(),
362                );
363
364                // Bidirectionaly sync the local and remote changes
365                self.persister
366                    .import_remote_changes(&remote_storage, true)?;
367                remote_storage.import_remote_changes(self.persister.as_ref(), false)?;
368                *last_sync_request_id = self.persister.get_last_sync_request()?.unwrap_or_default();
369
370                let mut hex = vec![];
371                remote_storage_file =
372                    File::open(Path::new(remote_storage_path).join("sync_storage.sql"))?;
373                remote_storage_file.read_to_end(&mut hex)?;
374
375                // Push the local changes again
376                let result = self.push(Some(state.generation), hex).await?;
377                Ok(result)
378            }
379
380            // In case there is no remote state, we can just push the local changes
381            None => {
382                debug!("No remote state, pushing local changes");
383                self.push(None, local_data).await
384            }
385        }
386    }
387
388    async fn pull(&self) -> Result<Option<BackupState>> {
389        let state = self.inner.pull().await?;
390        match state {
391            Some(state) => {
392                let mut decrypted =
393                    sym_decrypt(self.encryption_key.as_slice(), state.data.as_slice());
394                if decrypted.is_none() {
395                    warn!("Failed to decrypt backup with new key, trying legacy key");
396                    decrypted =
397                        sym_decrypt(self.legacy_encryption_key.as_slice(), state.data.as_slice());
398                }
399                let decrypted_data = decrypted.ok_or(anyhow!("Failed to decrypt backup"))?;
400                match decompress_to_vec_with_limit(&decrypted_data, 4000000) {
401                    Ok(decompressed) => Ok(Some(BackupState {
402                        generation: state.generation,
403                        data: decompressed,
404                    })),
405                    Err(e) => {
406                        error!("Failed to decompress backup: {e}");
407                        Ok(None)
408                    }
409                }
410            }
411            None => Ok(None),
412        }
413    }
414
415    async fn push(&self, version: Option<u64>, data: Vec<u8>) -> Result<(u64, Vec<u8>)> {
416        let compressed_data = compress_to_vec(&data, 10);
417        info!(
418            "Pushing compressed data with size = {}",
419            compressed_data.len()
420        );
421        let encrypted_data =
422            sym_encrypt(self.encryption_key.as_slice(), compressed_data.as_slice())
423                .ok_or(anyhow!("Failed to encrypt backup"))?;
424        let version = self.inner.push(version, encrypted_data.clone()).await?;
425        Ok((version, encrypted_data))
426    }
427
428    fn sync_dir(&self) -> Result<String> {
429        let working_dir = Path::new(self.working_dir_path.as_str());
430        let buf = working_dir.join("sync");
431        let sync_path = buf.to_str();
432        let path_str = match sync_path {
433            Some(sync_path) => {
434                if !Path::new(sync_path).exists() {
435                    fs::create_dir_all(sync_path)?;
436                }
437                Ok(sync_path)
438            }
439            None => Err(anyhow!("Failed to create sync directory")),
440        }?;
441        Ok(path_str.into())
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use crate::test_utils::get_test_ofp_48h;
448    use crate::ListSwapsRequest;
449    use crate::{
450        backup::BackupRequest,
451        persist::db::SqliteStorage,
452        test_utils::{create_test_config, create_test_persister, MockBackupTransport},
453        BreezEvent, SwapInfo,
454    };
455    use std::{sync::Arc, vec};
456    use tokio::sync::{broadcast::Receiver, watch};
457    use tokio::{
458        spawn,
459        time::{Duration, Instant},
460    };
461
462    use super::BackupWatcher;
463
464    async fn create_test_backup_watcher(
465    ) -> (watch::Sender<()>, BackupWatcher, Arc<MockBackupTransport>) {
466        let config = create_test_config();
467        let persister = Arc::new(create_test_persister(config.clone()));
468        persister.init().unwrap();
469        let transport = Arc::new(MockBackupTransport::new());
470        let watcher = BackupWatcher::new(
471            config,
472            transport.clone(),
473            persister,
474            vec![0; 32],
475            vec![0; 32],
476        );
477        let (quit_sender, receiver) = watch::channel(());
478        watcher.start(receiver).await.unwrap();
479        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
480        (quit_sender, watcher, transport)
481    }
482
483    async fn test_expected_backup_events(
484        mut subscription: Receiver<BreezEvent>,
485        transport: Arc<MockBackupTransport>,
486        expected_events: Vec<BreezEvent>,
487        expected_pushed: u32,
488        expected_pulls: u32,
489    ) {
490        let start = Instant::now() + Duration::from_millis(20000);
491        let mut interval = tokio::time::interval_at(start, Duration::from_secs(3));
492        let mut events = vec![];
493        loop {
494            tokio::select! {
495                  _ = interval.tick() => {
496                    panic!("Timed out waiting for events");
497                 }
498                 r = subscription.recv() => {
499                    match r {
500                        Ok(event) => {
501                            events.push(event);
502                            if events.len() == expected_events.len() {
503                                assert_eq!(events, expected_events);
504                                assert_eq!(transport.pulled(), expected_pulls);
505                                assert_eq!(transport.pushed(), expected_pushed);
506                                return;
507                            }
508                        }
509                        Err(e) => {
510                            panic!("Failed to receive event: {}", e);
511                        }
512                    }
513                }
514            }
515        }
516    }
517
518    // Test start and drop
519    #[tokio::test]
520    async fn test_start() {
521        let (quit_sender, _, _) = create_test_backup_watcher().await;
522        quit_sender.send(()).unwrap();
523        quit_sender.closed().await;
524    }
525
526    // Test two optimistic backups in a row
527    #[tokio::test]
528    async fn test_optimistic() {
529        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
530        let subscription = watcher.subscribe_events();
531        let expected_events = vec![
532            BreezEvent::BackupStarted,
533            BreezEvent::BackupSucceeded,
534            BreezEvent::BackupStarted,
535            BreezEvent::BackupSucceeded,
536        ];
537
538        let task_subscription = watcher.subscribe_events();
539        //let cloned_watcher = watcher.clone();
540        //let request_handler = watcher.request_handler().unwrap();
541        tokio::spawn(async move {
542            watcher
543                .request_backup(BackupRequest::new(true))
544                .await
545                .unwrap();
546            wait_for_backup_success(task_subscription).await;
547            watcher
548                .request_backup(BackupRequest::new(true))
549                .await
550                .unwrap();
551        });
552        test_expected_backup_events(subscription, transport, expected_events, 2, 0).await;
553        _ = quit_sender.send(());
554        quit_sender.closed().await;
555    }
556
557    // Test case when remote backup is not available and we only push the local backup.
558    #[tokio::test]
559    async fn test_remote_not_exist() {
560        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
561        let subscription = watcher.subscribe_events();
562        let expected_events = vec![
563            BreezEvent::BackupStarted,
564            BreezEvent::BackupSucceeded,
565            BreezEvent::BackupStarted,
566            BreezEvent::BackupSucceeded,
567        ];
568
569        let persister = watcher.persister.clone();
570        let task_subscription = watcher.subscribe_events();
571        tokio::spawn(async move {
572            let subscription = task_subscription.resubscribe();
573            watcher
574                .request_backup(BackupRequest::new(true))
575                .await
576                .unwrap();
577            wait_for_backup_success(subscription).await;
578            persister.set_last_sync_version(10, &vec![]).unwrap();
579            watcher
580                .request_backup(BackupRequest::new(true))
581                .await
582                .unwrap();
583        });
584        test_expected_backup_events(subscription, transport, expected_events, 3, 1).await;
585        _ = quit_sender.send(());
586        quit_sender.closed().await;
587    }
588
589    // Test case when remote backup is older than local backup so we need to pull it first.
590    #[tokio::test]
591    async fn test_local_newer_than_remote() {
592        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
593        let subscription = watcher.subscribe_events();
594        let expected_events = vec![
595            BreezEvent::BackupStarted,
596            BreezEvent::BackupSucceeded,
597            BreezEvent::BackupStarted,
598            BreezEvent::BackupSucceeded,
599        ];
600
601        let persister = watcher.persister.clone();
602        let task_subscription = watcher.subscribe_events();
603        tokio::spawn(async move {
604            let subscription = task_subscription.resubscribe();
605            watcher
606                .request_backup(BackupRequest::new(true))
607                .await
608                .unwrap();
609            wait_for_backup_success(subscription).await;
610            persister.set_last_sync_version(10, &vec![]).unwrap();
611            watcher
612                .request_backup(BackupRequest::new(true))
613                .await
614                .unwrap();
615        });
616        test_expected_backup_events(subscription, transport, expected_events, 3, 1).await;
617        _ = quit_sender.send(());
618        quit_sender.closed().await;
619    }
620
621    // Test versions history table is pupulated correctly
622    #[tokio::test]
623    async fn test_versions_history() {
624        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
625        let subscription = watcher.subscribe_events();
626        let expected_events = vec![
627            BreezEvent::BackupStarted,
628            BreezEvent::BackupSucceeded,
629            BreezEvent::BackupStarted,
630            BreezEvent::BackupSucceeded,
631            BreezEvent::BackupStarted,
632            BreezEvent::BackupSucceeded,
633        ];
634
635        let task_subscription = watcher.subscribe_events();
636        let persister = watcher.persister.clone();
637        tokio::spawn(async move {
638            for _ in 0..3 {
639                let subscription = task_subscription.resubscribe();
640                watcher
641                    .request_backup(BackupRequest::new(true))
642                    .await
643                    .unwrap();
644                wait_for_backup_success(subscription).await;
645            }
646        });
647        test_expected_backup_events(subscription, transport, expected_events, 3, 0).await;
648        let history = persister.sync_versions_history().unwrap();
649        assert_eq!(history.len(), 3);
650        _ = quit_sender.send(());
651        quit_sender.closed().await;
652    }
653
654    // Test versions history table is not bypassing the limit
655    #[tokio::test]
656    async fn test_limit_versions_history() {
657        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
658        let subscription = watcher.subscribe_events();
659        let mut expected_events = vec![];
660        for _ in 0..30 {
661            expected_events.push(BreezEvent::BackupStarted);
662            expected_events.push(BreezEvent::BackupSucceeded);
663        }
664
665        let task_subscription = watcher.subscribe_events();
666        let persister = watcher.persister.clone();
667        tokio::spawn(async move {
668            for _ in 0..30 {
669                let subscription = task_subscription.resubscribe();
670                watcher
671                    .request_backup(BackupRequest::new(true))
672                    .await
673                    .unwrap();
674                wait_for_backup_success(subscription).await;
675            }
676        });
677        test_expected_backup_events(subscription, transport, expected_events, 30, 0).await;
678        let history = persister.sync_versions_history().unwrap();
679        assert_eq!(history.len(), 20);
680        _ = quit_sender.send(());
681        quit_sender.closed().await;
682    }
683
684    // Test that the actualy triggers cause sync and we only sync once
685    #[tokio::test]
686    async fn test_sync_triggers() {
687        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
688        let subscription = watcher.subscribe_events();
689
690        let mut expected_events = vec![];
691        for _ in 0..1 {
692            expected_events.push(BreezEvent::BackupStarted);
693            expected_events.push(BreezEvent::BackupSucceeded);
694        }
695
696        let persister = watcher.persister.clone();
697        spawn(async move {
698            // Add some data to the sync database to trigger sync
699            populate_sync_table(persister.clone());
700        });
701
702        test_expected_backup_events(subscription, transport, expected_events, 1, 0).await;
703        let history = watcher.persister.sync_versions_history().unwrap();
704        assert_eq!(history.len(), 1);
705        _ = quit_sender.send(());
706        quit_sender.closed().await;
707    }
708
709    // Test that we only sync once if we have multiple sync requests
710    // and the data is synced.
711    // Steps:
712    // 1. Popoulate sync table - that should trigger sync
713    // 2. Add some delay for the sync to complete.
714    // 3. Delete all local data and change the local version to simulate conflict.
715    // 4. Add sync request - that should trigger sync.
716    // 5. Check that remote changes were populated locally and we synced exactly twice.
717    #[tokio::test]
718    async fn test_trigger_during_sync() {
719        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
720        let persister = watcher.persister.clone();
721
722        let mut expected_events = vec![];
723        for _ in 0..2 {
724            expected_events.push(BreezEvent::BackupStarted);
725            expected_events.push(BreezEvent::BackupSucceeded);
726        }
727
728        let main_subscription = watcher.subscribe_events();
729        let task_subscription = watcher.subscribe_events();
730        let task_subscription1 = task_subscription.resubscribe();
731        let cloned_persister = watcher.persister.clone();
732        tokio::spawn(async move {
733            // Add some data to the sync database and wait for backup to complete.
734            populate_sync_table(persister.clone());
735            wait_for_backup_success(task_subscription1).await;
736            // Add sync request - that should trigger sync that handle a conflict.
737            let task_subscription2 = task_subscription.resubscribe();
738
739            persister.set_last_sync_version(10, &vec![]).unwrap();
740            // Remove the data frmo the sql database and change the sync version to cause conflict.
741            persister
742                    .get_connection()
743                    .unwrap()
744                    .execute(
745                        "delete from sync.swaps; delete from sync.reverse_swaps; delete from sync.payment_external_info;",
746                        [],
747                    )
748                    .unwrap();
749            watcher
750                .request_backup(BackupRequest::new(true))
751                .await
752                .unwrap();
753            wait_for_backup_success(task_subscription2).await;
754        });
755        test_expected_backup_events(main_subscription, transport, expected_events, 3, 1).await;
756        let swaps = cloned_persister
757            .list_swaps(ListSwapsRequest::default())
758            .unwrap();
759        assert!(swaps.len() == 1);
760        _ = quit_sender.send(());
761        quit_sender.closed().await;
762    }
763
764    fn populate_sync_table(persister: Arc<SqliteStorage>) {
765        let tested_swap_info = SwapInfo {
766            bitcoin_address: String::from("1"),
767            created_at: 0,
768            lock_height: 100,
769            payment_hash: vec![1],
770            preimage: vec![2],
771            private_key: vec![3],
772            public_key: vec![4],
773            swapper_public_key: vec![5],
774            script: vec![5],
775            bolt11: None,
776            paid_msat: 0,
777            unconfirmed_sats: 0,
778            confirmed_sats: 0,
779            total_incoming_txs: 0,
780            status: crate::models::SwapStatus::Initial,
781            refund_tx_ids: Vec::new(),
782            unconfirmed_tx_ids: Vec::new(),
783            confirmed_tx_ids: Vec::new(),
784            min_allowed_deposit: 0,
785            max_allowed_deposit: 100,
786            max_swapper_payable: 200,
787            last_redeem_error: None,
788            channel_opening_fees: Some(get_test_ofp_48h(1, 1).into()),
789            confirmed_at: Some(555),
790        };
791        persister.insert_swap(tested_swap_info).unwrap();
792    }
793
794    async fn wait_for_backup_success(mut subscription: Receiver<BreezEvent>) {
795        while let Ok(event) = subscription.recv().await {
796            if event == BreezEvent::BackupSucceeded {
797                return;
798            }
799        }
800    }
801}