1use crate::{
2 breez_services::BackupFailedData,
3 error::SdkResult,
4 persist::db::{HookEvent, SqliteStorage},
5 BreezEvent, Config,
6};
7
8use anyhow::{anyhow, Result};
9use ecies::symmetric::{sym_decrypt, sym_encrypt};
10use miniz_oxide::{deflate::compress_to_vec, inflate::decompress_to_vec_with_limit};
11use std::{
12 fs::{self, File},
13 io::{Read, Write},
14 path::Path,
15 sync::Arc,
16 time::{SystemTime, UNIX_EPOCH},
17};
18use tempfile::tempdir_in;
19use tokio::{
20 runtime::Builder,
21 sync::{
22 broadcast::{self, error::RecvError},
23 mpsc::{self, Sender},
24 watch, Mutex,
25 },
26};
27
28#[derive(Clone)]
29pub(crate) struct BackupRequest {
30 force: bool,
31 on_complete: Option<mpsc::Sender<Result<()>>>,
32}
33
34unsafe impl Send for BackupRequest {}
35unsafe impl Sync for BackupRequest {}
36
37impl BackupRequest {
38 pub(crate) fn new(force: bool) -> Self {
39 Self {
40 force,
41 on_complete: None,
42 }
43 }
44
45 pub(crate) fn with(on_complete: Sender<Result<()>>, force: bool) -> Self {
46 Self {
47 force,
48 on_complete: Some(on_complete),
49 }
50 }
51}
52
/// A snapshot of the remote backup: a monotonically increasing generation
/// number paired with the raw backup payload bytes.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct BackupState {
    // Remote version/generation counter, used for optimistic-concurrency pushes.
    pub generation: u64,
    // Backup payload. Encrypted+compressed on the wire; decrypted and
    // decompressed after `BackupWorker::pull`.
    pub data: Vec<u8>,
}
63
/// Abstraction over the remote storage used for backups.
#[tonic::async_trait]
pub trait BackupTransport: Send + Sync {
    /// Fetches the latest remote backup state, or `None` if no backup exists.
    async fn pull(&self) -> SdkResult<Option<BackupState>>;
    /// Pushes `data` on top of the given remote `version` (optimistic
    /// concurrency); returns the new remote version on success.
    async fn push(&self, version: Option<u64>, data: Vec<u8>) -> SdkResult<u64>;
}
70
/// Watches for local changes (and explicit requests) and drives backup syncs
/// through a background worker thread.
pub(crate) struct BackupWatcher {
    pub(crate) config: Config,
    // Sender half of the backup-request channel; populated once `start` runs.
    backup_request_sender: Mutex<Option<mpsc::Sender<BackupRequest>>>,
    // Remote transport the worker pushes to / pulls from.
    inner: Arc<dyn BackupTransport>,
    persister: Arc<SqliteStorage>,
    encryption_key: Vec<u8>,
    // Fallback key used when decryption with `encryption_key` fails.
    legacy_encryption_key: Vec<u8>,
    // Broadcasts backup lifecycle events (started/succeeded/failed).
    events_notifier: broadcast::Sender<BreezEvent>,
}
80
impl BackupWatcher {
    /// Creates a watcher; the events channel is created eagerly so callers can
    /// subscribe before `start` is invoked.
    pub(crate) fn new(
        config: Config,
        inner: Arc<dyn BackupTransport>,
        persister: Arc<SqliteStorage>,
        encryption_key: Vec<u8>,
        legacy_encryption_key: Vec<u8>,
    ) -> Self {
        let (events_notifier, _) = broadcast::channel::<BreezEvent>(100);

        Self {
            config,
            backup_request_sender: Mutex::new(None),
            inner,
            persister,
            encryption_key,
            legacy_encryption_key,
            events_notifier,
        }
    }

    // Stores the sender half so `request_backup` can forward requests to the
    // background loop.
    async fn set_request_sender(&self, sender: mpsc::Sender<BackupRequest>) {
        let mut backup_request_sender = self.backup_request_sender.lock().await;
        *backup_request_sender = Some(sender);
    }

    /// Spawns the background sync loop on a dedicated OS thread running its
    /// own current-thread tokio runtime. The loop reacts to three sources:
    /// explicit backup requests, persister hook events (local db inserts),
    /// and the `quit_receiver` shutdown signal.
    pub(crate) async fn start(&self, mut quit_receiver: watch::Receiver<()>) -> Result<()> {
        let worker = BackupWorker::new(
            self.config.working_dir.clone(),
            self.inner.clone(),
            self.persister.clone(),
            self.encryption_key.clone(),
            self.legacy_encryption_key.clone(),
            self.events_notifier.clone(),
        );

        let mut hooks_subscription = self.persister.subscribe_hooks();
        let (backup_request_sender, mut backup_request_receiver) =
            mpsc::channel::<BackupRequest>(100);
        self.set_request_sender(backup_request_sender.clone()).await;

        // Dedicated thread + current-thread runtime so sync work never blocks
        // the caller's runtime.
        let rt = Builder::new_current_thread().enable_all().build()?;
        std::thread::spawn(move || {
            rt.block_on(async move {
                loop {
                    tokio::select! {

                        // Explicit backup request: run a sync and, if the
                        // requester attached a callback, report the result.
                        req = backup_request_receiver.recv() => {
                            match req {
                                Some(req) => {
                                    match worker.sync(req.force).await {
                                        Ok(_) => {
                                            if let Some(callback) = req.on_complete {
                                                _ = callback.send(Ok(())).await;
                                            }
                                        }
                                        Err(e) => {
                                            error!("Sync worker returned with error {e}");
                                            if let Some(callback) = req.on_complete {
                                                _ = callback.send(Err(e)).await;
                                            }
                                        }
                                    };
                                }
                                // Channel closed: the watcher is gone, stop the loop.
                                None => {
                                    return
                                }
                            }
                        }

                        // Local db hook: an insert into sync_requests means
                        // there are local changes to back up.
                        event = hooks_subscription.recv() => {
                            match event {
                                Ok(HookEvent::Insert{table}) => {
                                    if table == "sync_requests"{
                                        // Small debounce so bursts of inserts
                                        // coalesce into one sync.
                                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                                        if let Err(e) = worker.sync(false).await {
                                            error!("Sync worker returned with error {e}");
                                        }
                                    }
                                }
                                // We lagged behind the broadcast; events were
                                // dropped, so sync once to catch up.
                                Err(RecvError::Lagged(_)) => {
                                    if let Err(e) = worker.sync(false).await {
                                        error!("Sync worker returned with error {e}");
                                    }
                                }
                                // Subscription closed: stop the loop.
                                Err(_) => {
                                    return
                                }
                            }
                        },
                        // Shutdown signal.
                        _ = quit_receiver.changed() => {
                            return
                        }
                    }
                }
            });
        });

        Ok(())
    }

    /// Subscribes to backup lifecycle events.
    pub(crate) fn subscribe_events(&self) -> broadcast::Receiver<BreezEvent> {
        self.events_notifier.subscribe()
    }

    /// Forwards a backup request to the background loop. Fails if `start` has
    /// not been called yet or the loop has exited.
    // NOTE(review): the mutex guard is held across the `.send().await` below.
    // That is allowed for a tokio Mutex, but the guard could be dropped right
    // after cloning the sender — consider narrowing the scope.
    pub(crate) async fn request_backup(&self, req: BackupRequest) -> Result<()> {
        let request_handler = self.backup_request_sender.lock().await;
        let h = request_handler.clone();
        h.ok_or_else(|| anyhow!("No backup request handler found"))?
            .send(req)
            .await
            .map_err(|_| anyhow!("Failed to send backup request, the channel is likely closed"))
    }
}
202
/// Performs the actual backup work: snapshotting the local db, encrypting,
/// compressing, and exchanging state with the remote transport.
#[derive(Clone)]
struct BackupWorker {
    working_dir_path: String,
    inner: Arc<dyn BackupTransport>,
    persister: Arc<SqliteStorage>,
    encryption_key: Vec<u8>,
    // Fallback decryption key for backups made with the old key.
    legacy_encryption_key: Vec<u8>,
    events_notifier: broadcast::Sender<BreezEvent>,
}
213
impl BackupWorker {
    pub(crate) fn new(
        working_dir_path: String,
        inner: Arc<dyn BackupTransport>,
        persister: Arc<SqliteStorage>,
        encryption_key: Vec<u8>,
        legacy_encryption_key: Vec<u8>,
        events_notifier: broadcast::Sender<BreezEvent>,
    ) -> Self {
        Self {
            working_dir_path,
            inner,
            persister,
            encryption_key,
            legacy_encryption_key,
            events_notifier,
        }
    }

    // Broadcasts a backup event; send errors (no subscribers) are ignored.
    async fn notify(&self, e: BreezEvent) -> Result<()> {
        _ = self.events_notifier.send(e);
        Ok(())
    }

    /// Runs one sync cycle. Skips entirely when nothing is queued and `force`
    /// is false. Always removes the temporary sync directory afterwards and
    /// emits a BackupSucceeded/BackupFailed event.
    async fn sync(&self, force: bool) -> Result<()> {
        let last_sync_request_id = self.persister.get_last_sync_request()?.unwrap_or_default();
        // No pending sync requests and not forced: nothing to do.
        if !force && last_sync_request_id == 0 {
            return Ok(());
        }
        let sync_dir = self.sync_dir()?;
        let notify_res = match self
            .sync_internal(sync_dir.clone(), last_sync_request_id)
            .await
        {
            Ok(_) => {
                info!("backup sync completed successfully");
                self.notify(BreezEvent::BackupSucceeded).await
            }
            Err(e) => {
                error!("backup sync failed {}", e);
                self.notify(BreezEvent::BackupFailed {
                    details: BackupFailedData {
                        error: e.to_string(),
                    },
                })
                .await
            }
        };
        // Clean up the scratch directory regardless of the sync outcome.
        fs::remove_dir_all(Path::new(sync_dir.as_str()))?;

        match notify_res {
            Ok(r) => Ok(r),
            Err(e) => {
                error!("failed to notify backup event {}", e);
                Err(e)
            }
        }
    }

    /// Snapshots the local db and pushes it optimistically with the last known
    /// remote version. On push failure (e.g. version conflict) it falls back
    /// to pull-merge-push via `sync_remote_and_push`. On success it records
    /// the new version/data, prunes processed sync requests, and stamps the
    /// backup time.
    async fn sync_internal(&self, sync_dir: String, mut last_sync_request_id: u64) -> Result<()> {
        let last_version = self.persister.get_last_sync_version()?;

        self.notify(BreezEvent::BackupStarted).await?;

        // Snapshot the local storage into a temp file inside the sync dir.
        let local_storage_file = tempfile::NamedTempFile::new_in(sync_dir.clone())?;
        self.persister.backup(local_storage_file.path())?;
        debug!(
            "syncing storge, last_version = {:?}, file = {:?}",
            last_version,
            local_storage_file.path()
        );

        let mut f = File::open(local_storage_file.path())?;
        let mut data = vec![];
        f.read_to_end(&mut data)?;

        // First attempt: push on top of the version we already know about.
        let optimistic_sync = self.push(last_version, data.clone()).await;

        let sync_result = match optimistic_sync {
            Ok((new_version, data)) => {
                info!("Optimistic sync succeeded, new version = {new_version}");
                Ok((new_version, data))
            }
            Err(e) => {
                // Remote likely moved on; pull, merge, then push again.
                info!("Optimistic sync failed, trying to sync remote changes {e}");
                self.sync_remote_and_push(sync_dir, data, &mut last_sync_request_id)
                    .await
            }
        };

        match sync_result {
            Ok((new_version, new_data)) => {
                let now = SystemTime::now();
                self.persister
                    .set_last_sync_version(new_version, &new_data)?;
                // Everything up to this id is now reflected remotely.
                self.persister
                    .delete_sync_requests_up_to(last_sync_request_id)?;
                self.persister
                    .set_last_backup_time(now.duration_since(UNIX_EPOCH).unwrap().as_secs())?;
                info!("Sync succeeded");
                Ok(())
            }
            Err(e) => {
                error!("Sync failed: {}", e);
                Err(e)
            }
        }
    }

    /// Pulls the remote backup, merges it with local state in both directions,
    /// and pushes the merged result on top of the remote generation. When no
    /// remote state exists, simply pushes `local_data`.
    async fn sync_remote_and_push(
        &self,
        sync_dir: String,
        local_data: Vec<u8>,
        last_sync_request_id: &mut u64,
    ) -> Result<(u64, Vec<u8>)> {
        let remote_state = self.pull().await?;
        let tmp_dir = tempdir_in(sync_dir)?;
        let remote_storage_path = tmp_dir.path();
        let mut remote_storage_file = File::create(remote_storage_path.join("sync_storage.sql"))?;
        info!("remote_storage_path = {remote_storage_path:?}");
        match remote_state {
            Some(state) => {
                // Materialize the remote backup as a sqlite file we can open.
                remote_storage_file.write_all(&state.data[..])?;
                remote_storage_file.flush()?;
                let remote_storage = SqliteStorage::new(
                    remote_storage_path
                        .as_os_str()
                        .to_str()
                        .unwrap()
                        .to_string(),
                );

                // Two-way merge: remote -> local, then local -> remote copy.
                self.persister
                    .import_remote_changes(&remote_storage, true)?;
                remote_storage.import_remote_changes(self.persister.as_ref(), false)?;
                // The merge may have queued new sync requests; refresh the
                // high-water mark so the caller prunes the right range.
                *last_sync_request_id = self.persister.get_last_sync_request()?.unwrap_or_default();

                // Re-read the merged file and push it on top of the remote
                // generation we pulled.
                let mut hex = vec![];
                remote_storage_file =
                    File::open(Path::new(remote_storage_path).join("sync_storage.sql"))?;
                remote_storage_file.read_to_end(&mut hex)?;

                let result = self.push(Some(state.generation), hex).await?;
                Ok(result)
            }

            None => {
                debug!("No remote state, pushing local changes");
                self.push(None, local_data).await
            }
        }
    }

    /// Pulls and decodes the remote backup: decrypts with the current key
    /// (falling back to the legacy key), then decompresses with a 4MB limit.
    /// Returns `Ok(None)` when there is no remote state or when decompression
    /// fails; decryption failure with both keys is an error.
    async fn pull(&self) -> Result<Option<BackupState>> {
        let state = self.inner.pull().await?;
        match state {
            Some(state) => {
                let mut decrypted =
                    sym_decrypt(self.encryption_key.as_slice(), state.data.as_slice());
                if decrypted.is_none() {
                    warn!("Failed to decrypt backup with new key, trying legacy key");
                    decrypted =
                        sym_decrypt(self.legacy_encryption_key.as_slice(), state.data.as_slice());
                }
                let decrypted_data = decrypted.ok_or(anyhow!("Failed to decrypt backup"))?;
                match decompress_to_vec_with_limit(&decrypted_data, 4000000) {
                    Ok(decompressed) => Ok(Some(BackupState {
                        generation: state.generation,
                        data: decompressed,
                    })),
                    Err(e) => {
                        // Treat an undecompressable backup as absent rather
                        // than failing the whole sync.
                        error!("Failed to decompress backup: {e}");
                        Ok(None)
                    }
                }
            }
            None => Ok(None),
        }
    }

    /// Compresses (level 10) and encrypts `data`, then pushes it on top of
    /// `version`. Returns the new remote version together with the encrypted
    /// payload that was stored.
    async fn push(&self, version: Option<u64>, data: Vec<u8>) -> Result<(u64, Vec<u8>)> {
        let compressed_data = compress_to_vec(&data, 10);
        info!(
            "Pushing compressed data with size = {}",
            compressed_data.len()
        );
        let encrypted_data =
            sym_encrypt(self.encryption_key.as_slice(), compressed_data.as_slice())
                .ok_or(anyhow!("Failed to encrypt backup"))?;
        let version = self.inner.push(version, encrypted_data.clone()).await?;
        Ok((version, encrypted_data))
    }

    // Returns `<working_dir>/sync`, creating the directory if needed.
    fn sync_dir(&self) -> Result<String> {
        let working_dir = Path::new(self.working_dir_path.as_str());
        let buf = working_dir.join("sync");
        let sync_path = buf.to_str();
        let path_str = match sync_path {
            Some(sync_path) => {
                if !Path::new(sync_path).exists() {
                    fs::create_dir_all(sync_path)?;
                }
                Ok(sync_path)
            }
            // `to_str` returns None for non-UTF-8 paths.
            None => Err(anyhow!("Failed to create sync directory")),
        }?;
        Ok(path_str.into())
    }
}
444
#[cfg(test)]
mod tests {
    use crate::test_utils::get_test_ofp_48h;
    use crate::ListSwapsRequest;
    use crate::{
        backup::BackupRequest,
        persist::db::SqliteStorage,
        test_utils::{create_test_config, create_test_persister, MockBackupTransport},
        BreezEvent, SwapInfo,
    };
    use std::{sync::Arc, vec};
    use tokio::sync::{broadcast::Receiver, watch};
    use tokio::{
        spawn,
        time::{Duration, Instant},
    };

    use super::BackupWatcher;

    // Creates a started watcher over a mock transport, returning the shutdown
    // sender, the watcher, and the transport for push/pull assertions.
    async fn create_test_backup_watcher(
    ) -> (watch::Sender<()>, BackupWatcher, Arc<MockBackupTransport>) {
        let config = create_test_config();
        let persister = Arc::new(create_test_persister(config.clone()));
        persister.init().unwrap();
        let transport = Arc::new(MockBackupTransport::new());
        let watcher = BackupWatcher::new(
            config,
            transport.clone(),
            persister,
            vec![0; 32],
            vec![0; 32],
        );
        let (quit_sender, receiver) = watch::channel(());
        watcher.start(receiver).await.unwrap();
        // Give the background loop time to come up before tests drive it.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        (quit_sender, watcher, transport)
    }

    // Collects events until `expected_events.len()` arrived, then asserts the
    // event sequence and the transport's push/pull counters. Panics if events
    // do not arrive before the ~20s timeout tick.
    async fn test_expected_backup_events(
        mut subscription: Receiver<BreezEvent>,
        transport: Arc<MockBackupTransport>,
        expected_events: Vec<BreezEvent>,
        expected_pushed: u32,
        expected_pulls: u32,
    ) {
        // First interval tick fires ~20s out and acts as the timeout.
        let start = Instant::now() + Duration::from_millis(20000);
        let mut interval = tokio::time::interval_at(start, Duration::from_secs(3));
        let mut events = vec![];
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    panic!("Timed out waiting for events");
                }
                r = subscription.recv() => {
                    match r {
                        Ok(event) => {
                            events.push(event);
                            if events.len() == expected_events.len() {
                                assert_eq!(events, expected_events);
                                assert_eq!(transport.pulled(), expected_pulls);
                                assert_eq!(transport.pushed(), expected_pushed);
                                return;
                            }
                        }
                        Err(e) => {
                            panic!("Failed to receive event: {}", e);
                        }
                    }
                }
            }
        }
    }

    // The watcher starts and shuts down cleanly.
    #[tokio::test]
    async fn test_start() {
        let (quit_sender, _, _) = create_test_backup_watcher().await;
        quit_sender.send(()).unwrap();
        quit_sender.closed().await;
    }

    // Two forced backups both succeed on the optimistic push path: 2 pushes,
    // 0 pulls.
    #[tokio::test]
    async fn test_optimistic() {
        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
        let subscription = watcher.subscribe_events();
        let expected_events = vec![
            BreezEvent::BackupStarted,
            BreezEvent::BackupSucceeded,
            BreezEvent::BackupStarted,
            BreezEvent::BackupSucceeded,
        ];

        let task_subscription = watcher.subscribe_events();
        tokio::spawn(async move {
            watcher
                .request_backup(BackupRequest::new(true))
                .await
                .unwrap();
            wait_for_backup_success(task_subscription).await;
            watcher
                .request_backup(BackupRequest::new(true))
                .await
                .unwrap();
        });
        test_expected_backup_events(subscription, transport, expected_events, 2, 0).await;
        _ = quit_sender.send(());
        quit_sender.closed().await;
    }

    // Forcing a stale local version (10) makes the optimistic push fail, so
    // the second backup falls back to pull-merge-push: 3 pushes, 1 pull.
    #[tokio::test]
    async fn test_remote_not_exist() {
        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
        let subscription = watcher.subscribe_events();
        let expected_events = vec![
            BreezEvent::BackupStarted,
            BreezEvent::BackupSucceeded,
            BreezEvent::BackupStarted,
            BreezEvent::BackupSucceeded,
        ];

        let persister = watcher.persister.clone();
        let task_subscription = watcher.subscribe_events();
        tokio::spawn(async move {
            let subscription = task_subscription.resubscribe();
            watcher
                .request_backup(BackupRequest::new(true))
                .await
                .unwrap();
            wait_for_backup_success(subscription).await;
            persister.set_last_sync_version(10, &vec![]).unwrap();
            watcher
                .request_backup(BackupRequest::new(true))
                .await
                .unwrap();
        });
        test_expected_backup_events(subscription, transport, expected_events, 3, 1).await;
        _ = quit_sender.send(());
        quit_sender.closed().await;
    }

    // Same stale-version scenario as above: conflict triggers one pull and an
    // extra push of the merged state.
    #[tokio::test]
    async fn test_local_newer_than_remote() {
        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
        let subscription = watcher.subscribe_events();
        let expected_events = vec![
            BreezEvent::BackupStarted,
            BreezEvent::BackupSucceeded,
            BreezEvent::BackupStarted,
            BreezEvent::BackupSucceeded,
        ];

        let persister = watcher.persister.clone();
        let task_subscription = watcher.subscribe_events();
        tokio::spawn(async move {
            let subscription = task_subscription.resubscribe();
            watcher
                .request_backup(BackupRequest::new(true))
                .await
                .unwrap();
            wait_for_backup_success(subscription).await;
            persister.set_last_sync_version(10, &vec![]).unwrap();
            watcher
                .request_backup(BackupRequest::new(true))
                .await
                .unwrap();
        });
        test_expected_backup_events(subscription, transport, expected_events, 3, 1).await;
        _ = quit_sender.send(());
        quit_sender.closed().await;
    }

    // Three successful backups record three entries in the version history.
    #[tokio::test]
    async fn test_versions_history() {
        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
        let subscription = watcher.subscribe_events();
        let expected_events = vec![
            BreezEvent::BackupStarted,
            BreezEvent::BackupSucceeded,
            BreezEvent::BackupStarted,
            BreezEvent::BackupSucceeded,
            BreezEvent::BackupStarted,
            BreezEvent::BackupSucceeded,
        ];

        let task_subscription = watcher.subscribe_events();
        let persister = watcher.persister.clone();
        tokio::spawn(async move {
            for _ in 0..3 {
                let subscription = task_subscription.resubscribe();
                watcher
                    .request_backup(BackupRequest::new(true))
                    .await
                    .unwrap();
                wait_for_backup_success(subscription).await;
            }
        });
        test_expected_backup_events(subscription, transport, expected_events, 3, 0).await;
        let history = persister.sync_versions_history().unwrap();
        assert_eq!(history.len(), 3);
        _ = quit_sender.send(());
        quit_sender.closed().await;
    }

    // Version history is capped: 30 backups keep only the latest 20 entries.
    #[tokio::test]
    async fn test_limit_versions_history() {
        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
        let subscription = watcher.subscribe_events();
        let mut expected_events = vec![];
        for _ in 0..30 {
            expected_events.push(BreezEvent::BackupStarted);
            expected_events.push(BreezEvent::BackupSucceeded);
        }

        let task_subscription = watcher.subscribe_events();
        let persister = watcher.persister.clone();
        tokio::spawn(async move {
            for _ in 0..30 {
                let subscription = task_subscription.resubscribe();
                watcher
                    .request_backup(BackupRequest::new(true))
                    .await
                    .unwrap();
                wait_for_backup_success(subscription).await;
            }
        });
        test_expected_backup_events(subscription, transport, expected_events, 30, 0).await;
        let history = persister.sync_versions_history().unwrap();
        assert_eq!(history.len(), 20);
        _ = quit_sender.send(());
        quit_sender.closed().await;
    }

    // Inserting a swap triggers a sync via the sync_requests hook, with no
    // explicit backup request.
    #[tokio::test]
    async fn test_sync_triggers() {
        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
        let subscription = watcher.subscribe_events();

        let mut expected_events = vec![];
        for _ in 0..1 {
            expected_events.push(BreezEvent::BackupStarted);
            expected_events.push(BreezEvent::BackupSucceeded);
        }

        let persister = watcher.persister.clone();
        spawn(async move {
            populate_sync_table(persister.clone());
        });

        test_expected_backup_events(subscription, transport, expected_events, 1, 0).await;
        let history = watcher.persister.sync_versions_history().unwrap();
        assert_eq!(history.len(), 1);
        _ = quit_sender.send(());
        quit_sender.closed().await;
    }

    // A hook-triggered sync followed by a forced conflicting backup: the
    // merged state must still contain the inserted swap after the local sync
    // tables were cleared.
    #[tokio::test]
    async fn test_trigger_during_sync() {
        let (quit_sender, watcher, transport) = create_test_backup_watcher().await;
        let persister = watcher.persister.clone();

        let mut expected_events = vec![];
        for _ in 0..2 {
            expected_events.push(BreezEvent::BackupStarted);
            expected_events.push(BreezEvent::BackupSucceeded);
        }

        let main_subscription = watcher.subscribe_events();
        let task_subscription = watcher.subscribe_events();
        let task_subscription1 = task_subscription.resubscribe();
        let cloned_persister = watcher.persister.clone();
        tokio::spawn(async move {
            populate_sync_table(persister.clone());
            wait_for_backup_success(task_subscription1).await;
            let task_subscription2 = task_subscription.resubscribe();

            // Force a version conflict and wipe the local sync tables so the
            // swap must be recovered from the remote backup on merge.
            persister.set_last_sync_version(10, &vec![]).unwrap();
            persister
                .get_connection()
                .unwrap()
                .execute(
                    "delete from sync.swaps; delete from sync.reverse_swaps; delete from sync.payment_external_info;",
                    [],
                )
                .unwrap();
            watcher
                .request_backup(BackupRequest::new(true))
                .await
                .unwrap();
            wait_for_backup_success(task_subscription2).await;
        });
        test_expected_backup_events(main_subscription, transport, expected_events, 3, 1).await;
        let swaps = cloned_persister
            .list_swaps(ListSwapsRequest::default())
            .unwrap();
        assert!(swaps.len() == 1);
        _ = quit_sender.send(());
        quit_sender.closed().await;
    }

    // Inserts a fixed swap record; the insert hook on sync_requests is what
    // wakes the backup loop.
    fn populate_sync_table(persister: Arc<SqliteStorage>) {
        let tested_swap_info = SwapInfo {
            bitcoin_address: String::from("1"),
            created_at: 0,
            lock_height: 100,
            payment_hash: vec![1],
            preimage: vec![2],
            private_key: vec![3],
            public_key: vec![4],
            swapper_public_key: vec![5],
            script: vec![5],
            bolt11: None,
            paid_msat: 0,
            unconfirmed_sats: 0,
            confirmed_sats: 0,
            total_incoming_txs: 0,
            status: crate::models::SwapStatus::Initial,
            refund_tx_ids: Vec::new(),
            unconfirmed_tx_ids: Vec::new(),
            confirmed_tx_ids: Vec::new(),
            min_allowed_deposit: 0,
            max_allowed_deposit: 100,
            max_swapper_payable: 200,
            last_redeem_error: None,
            channel_opening_fees: Some(get_test_ofp_48h(1, 1).into()),
            confirmed_at: Some(555),
        };
        persister.insert_swap(tested_swap_info).unwrap();
    }

    // Blocks until a BackupSucceeded event arrives (or the channel closes).
    async fn wait_for_backup_success(mut subscription: Receiver<BreezEvent>) {
        while let Ok(event) = subscription.recv().await {
            if event == BreezEvent::BackupSucceeded {
                return;
            }
        }
    }
}