breez_sdk_core/persist/
sync.rs

1use crate::ReverseSwapStatus;
2
3use super::{db::SqliteStorage, error::PersistResult};
4use rusqlite::{named_params, Row, Transaction, TransactionBehavior};
5use std::path::Path;
6
/// One row of the `sync_versions` table, as returned by `sync_versions_history`.
#[allow(dead_code)]
pub(crate) struct SyncVersion {
    /// Value of the row's `created_at` column, read as text.
    pub created_at: String,
    /// Value of the row's `last_version` column.
    pub last_version: u64,
    /// Value of the row's `data` column, read as raw bytes.
    pub data: Vec<u8>,
}
13
14impl SqliteStorage {
15    pub(crate) fn backup<P: AsRef<Path>>(&self, dst_path: P) -> PersistResult<()> {
16        Ok(self.get_connection()?.backup(
17            rusqlite::DatabaseName::Attached("sync"),
18            dst_path,
19            None,
20        )?)
21    }
22
23    pub(crate) fn get_last_sync_version(&self) -> PersistResult<Option<u64>> {
24        Ok(self.get_connection()?.query_row(
25            "SELECT max(last_version) FROM sync_versions",
26            [],
27            |row| row.get::<usize, Option<u64>>(0),
28        )?)
29    }
30
31    pub(crate) fn set_last_sync_version(
32        &self,
33        last_version: u64,
34        data: &Vec<u8>,
35    ) -> PersistResult<()> {
36        let con = self.get_connection()?;
37
38        // make sure we have no more than 20 history entries
39        con.execute("Delete from sync_versions where last_version not in (select last_version from sync_versions order by created_at desc limit 19);", [])?;
40        con.execute(
41            "INSERT OR REPLACE INTO sync_versions (last_version, data) VALUES (?1, ?2);",
42            (last_version, data),
43        )?;
44
45        Ok(())
46    }
47
48    #[allow(dead_code)]
49    pub(crate) fn sync_versions_history(&self) -> PersistResult<Vec<SyncVersion>> {
50        let con = self.get_connection()?;
51        let mut stmt = con.prepare(
52            "SELECT created_at, last_version, data FROM sync_versions ORDER BY created_at DESC;",
53        )?;
54
55        let vec: Vec<SyncVersion> = stmt
56            .query_map([], |row| self.sql_row_to_sync_version(row))?
57            .map(|i| i.unwrap())
58            .collect();
59
60        Ok(vec)
61    }
62
63    fn sql_row_to_sync_version(&self, row: &Row) -> PersistResult<SyncVersion, rusqlite::Error> {
64        let version = SyncVersion {
65            created_at: row.get(0)?,
66            last_version: row.get(1)?,
67            data: row.get(2)?,
68        };
69
70        Ok(version)
71    }
72
73    pub fn get_last_sync_request(&self) -> PersistResult<Option<u64>> {
74        let res: rusqlite::Result<Option<u64>> =
75            self.get_connection()?
76                .query_row("SELECT max(id) FROM sync.sync_requests", [], |row| {
77                    row.get::<usize, Option<u64>>(0)
78                });
79        Ok(res?)
80    }
81
82    pub(crate) fn delete_sync_requests_up_to(&self, request_id: u64) -> PersistResult<()> {
83        self.get_connection()?.execute(
84            "DELETE FROM sync.sync_requests where id <= ?1",
85            [request_id],
86        )?;
87        Ok(())
88    }
89
90    pub(crate) fn import_remote_changes(
91        &self,
92        remote_storage: &SqliteStorage,
93        to_local: bool,
94    ) -> PersistResult<()> {
95        let sync_data_file = remote_storage.sync_db_path();
96        match SqliteStorage::migrate_sync_db(sync_data_file.clone()) {
97            Ok(_) => {}
98            Err(e) => {
99                log::error!("Failed to migrate sync db, probably local db is older than remote, skipping migration: {}", e);
100            }
101        }
102
103        let mut con = self.get_connection()?;
104        let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
105        tx.execute("ATTACH DATABASE ? AS remote_sync;", [sync_data_file])?;
106
107        if to_local {
108            tx.execute(
109                "
110            INSERT OR IGNORE INTO swaps_info (bitcoin_address, unconfirmed_tx_ids, confirmed_tx_ids)
111                SELECT
112                    bitcoin_address, '[]', '[]'
113                FROM remote_sync.swaps;",
114                [],
115            )?;
116
117            tx.execute(
118                "
119            INSERT OR IGNORE INTO reverse_swaps_info (id, status)
120                SELECT
121                    id, :status
122                FROM remote_sync.reverse_swaps;",
123                named_params! {
124                    ":status": serde_json::to_value(ReverseSwapStatus::Initial)?
125                },
126            )?;
127        }
128
129        // sync remote swaps table
130        tx.execute(
131            "
132        INSERT INTO sync.swaps 
133          SELECT
134           bitcoin_address,
135           created_at,
136           lock_height,
137           payment_hash,
138           preimage,
139           private_key,
140           public_key,
141           swapper_public_key,
142           script,
143           min_allowed_deposit,
144           max_allowed_deposit,
145           max_swapper_payable
146          FROM remote_sync.swaps
147          WHERE bitcoin_address NOT IN (SELECT bitcoin_address FROM sync.swaps);",
148            [],
149        )?;
150
151        // sync remote swap_refunds table
152        tx.execute(
153            "
154        INSERT OR IGNORE INTO sync.swap_refunds
155         SELECT
156          bitcoin_address,
157          refund_tx_id
158         FROM remote_sync.swap_refunds;",
159            [],
160        )?;
161
162        // sync remote payments_external_info table
163        tx.execute(
164            "
165             INSERT into sync.payments_external_info
166             SELECT
167              payment_id,
168              lnurl_success_action,
169              ln_address,
170              lnurl_metadata,
171              lnurl_withdraw_endpoint,
172              attempted_amount_msat,
173              attempted_error,
174              lnurl_pay_domain,
175              lnurl_pay_comment
176             FROM remote_sync.payments_external_info
177             WHERE payment_id NOT IN (SELECT payment_id FROM sync.payments_external_info);",
178            [],
179        )?;
180
181        // sync remote payments_metadata table
182        tx.execute(
183            "
184             INSERT OR REPLACE INTO sync.payments_metadata
185             SELECT
186              remote_sync.payments_metadata.payment_id,
187              remote_sync.payments_metadata.metadata,
188              remote_sync.payments_metadata.updated_at
189             FROM remote_sync.payments_metadata
190             LEFT JOIN sync.payments_metadata 
191             ON sync.payments_metadata.payment_id = remote_sync.payments_metadata.payment_id
192             WHERE
193              remote_sync.payments_metadata.updated_at > sync.payments_metadata.updated_at;",
194            [],
195        )?;
196
197        // sync remote reverse_swaps table
198        tx.execute(
199            "
200        INSERT into sync.reverse_swaps
201        SELECT
202         id,
203         created_at_block_height,
204         preimage,
205         private_key,
206         claim_pubkey,
207         timeout_block_height,
208         invoice,
209         onchain_amount_sat,
210         sat_per_vbyte,
211         receive_amount_sat,
212         redeem_script
213        FROM remote_sync.reverse_swaps
214        WHERE id NOT IN (SELECT id FROM sync.reverse_swaps);",
215            [],
216        )?;
217
218        // sync remote swap_refunds table
219        tx.execute(
220            "
221        INSERT INTO sync.open_channel_payment_info
222         SELECT
223          payment_hash,
224          payer_amount_msat,
225          open_channel_bolt11
226         FROM remote_sync.open_channel_payment_info
227         WHERE payment_hash NOT IN (SELECT payment_hash FROM sync.open_channel_payment_info);",
228            [],
229        )?;
230
231        // Sync remote swaps_fees table, which contains dynamic fees used in swaps
232        // created_at is used to settle conflicts, since we assume small variations in the client local times
233        Self::sync_swaps_fees_local(&tx)?;
234
235        tx.commit()?;
236        con.execute("DETACH DATABASE remote_sync", [])?;
237
238        Ok(())
239    }
240
241    /// Insert or update to local db all rows that have created_at larger than in the local
242    fn sync_swaps_fees_local(tx: &Transaction) -> PersistResult<()> {
243        // The WHERE clause covers both possible scenarios for the swaps_fees table:
244        // - Local DB doesn't have a row matching a remote DB row with the same swap address
245        //   - checked via `sync.swaps_fees.created_at IS NULL`
246        //   - `created_at` is NOT NULL in the schema, so matching this means finding an address for which no local DB row exists
247        // - Local and remote DBs have a row for the same swap address and remote crated_at > local created_at
248        tx.execute(
249            "
250        INSERT OR REPLACE INTO sync.swaps_fees
251         SELECT
252          remote_sync.swaps_fees.bitcoin_address as bitcoin_address,
253          remote_sync.swaps_fees.created_at as created_at,
254          remote_sync.swaps_fees.channel_opening_fees as channel_opening_fees
255         FROM remote_sync.swaps_fees
256          LEFT JOIN sync.swaps_fees ON sync.swaps_fees.bitcoin_address = remote_sync.swaps_fees.bitcoin_address
257         WHERE
258          sync.swaps_fees.created_at IS NULL OR remote_sync.swaps_fees.created_at > sync.swaps_fees.created_at
259         ;",
260            [],
261        )?;
262
263        Ok(())
264    }
265}
266
267#[cfg(test)]
268mod tests {
269    use anyhow::anyhow;
270    use rand::random;
271    use std::time::Duration;
272
273    use crate::persist::db::SqliteStorage;
274    use crate::persist::error::PersistResult;
275    use crate::persist::test_utils;
276    use crate::test_utils::{get_test_ofp_48h, rand_string, rand_vec_u8};
277    use crate::{ListSwapsRequest, SwapInfo};
278
279    #[test]
280    fn test_sync() -> PersistResult<()> {
281        let local_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
282        local_storage.init()?;
283
284        let local_swap_info = create_test_swap_info();
285        local_storage.insert_swap(local_swap_info.clone())?;
286
287        let mut remote_swap_info = local_swap_info;
288        remote_swap_info.bitcoin_address = "2".into();
289        remote_swap_info.script = vec![6];
290        remote_swap_info.swapper_public_key = vec![6];
291        remote_swap_info.public_key = vec![6];
292        remote_swap_info.preimage = vec![6];
293        remote_swap_info.payment_hash = vec![6];
294        remote_swap_info.private_key = vec![6];
295
296        let remote_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
297        remote_storage.init()?;
298        remote_storage.insert_swap(remote_swap_info)?;
299
300        remote_storage.insert_open_channel_payment_info("123", 100000, "")?;
301
302        remote_storage.import_remote_changes(&local_storage, false)?;
303        local_storage.import_remote_changes(&remote_storage, true)?;
304
305        let mut local_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
306        local_swaps.sort_by(|s1, s2| s1.bitcoin_address.cmp(&s2.bitcoin_address));
307
308        let mut remote_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
309        remote_swaps.sort_by(|s1, s2| s1.bitcoin_address.cmp(&s2.bitcoin_address));
310
311        assert_eq!(local_swaps, remote_swaps);
312        assert_eq!(local_swaps.len(), 2);
313
314        local_storage.set_last_sync_version(10, &vec![])?;
315        let version = local_storage.get_last_sync_version()?.unwrap();
316        assert_eq!(version, 10);
317
318        Ok(())
319    }
320
321    #[tokio::test]
322    async fn test_sync_swaps_update_swap_fees() -> PersistResult<()> {
323        let local_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
324        local_storage.init()?;
325
326        // Swap is created with initial dynamic fee
327        let local_swap_info = create_test_swap_info();
328        local_storage.insert_swap(local_swap_info.clone())?;
329
330        // Sleep to cause a change in created_at
331        tokio::time::sleep(Duration::from_secs(2)).await;
332
333        // Swap address is re-used later with different (newer) dynamic fee
334        let new_fees: crate::OpeningFeeParams = get_test_ofp_48h(10, 10).into();
335        local_storage.update_swap_fees(local_swap_info.bitcoin_address, new_fees.clone())?;
336
337        let local_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
338        assert_eq!(local_swaps.len(), 1);
339        assert_eq!(
340            local_swaps
341                .first()
342                .ok_or_else(|| anyhow!("No element found"))?
343                .channel_opening_fees,
344            Some(new_fees)
345        );
346
347        Ok(())
348    }
349
350    #[tokio::test]
351    async fn test_sync_swaps_fees_local_vs_remote() -> PersistResult<()> {
352        let local_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
353        local_storage.init()?;
354
355        let remote_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
356        remote_storage.init()?;
357
358        // Created locally: Swaps L1, L2, L3
359        let pre_sync_l1 = create_test_swap_info();
360        let pre_sync_l2 = create_test_swap_info();
361        let pre_sync_l3 = create_test_swap_info();
362
363        // Create remote swaps R1 and R2 (clones of L1 and L2, but with different fees and created_at)
364        let mut pre_sync_r1 = pre_sync_l1.clone();
365        pre_sync_r1.channel_opening_fees = Some(get_test_ofp_48h(10, 10).into());
366        let mut pre_sync_r2 = pre_sync_l2.clone();
367        pre_sync_r2.channel_opening_fees = Some(get_test_ofp_48h(15, 15).into());
368
369        // Swaps are inserted 1 second apart from each other, because the swaps_fees rows include a local timestamp on insertion
370        // Swaps are inserted in this order:
371        // - Remote swap R1
372        // - Local swap L1  (created_at +1s)
373        // - Local swap L2  (created_at +1s)
374        // - Local swap L3  (created_at +1s)
375        // - Remote swap R2 (created_at +1s)
376        remote_storage.insert_swap(pre_sync_r1.clone())?; // R1
377        tokio::time::sleep(Duration::from_secs(1)).await;
378        local_storage.insert_swap(pre_sync_l1.clone())?; // L1
379        tokio::time::sleep(Duration::from_secs(1)).await;
380        local_storage.insert_swap(pre_sync_l2.clone())?; // L2
381        tokio::time::sleep(Duration::from_secs(1)).await;
382        local_storage.insert_swap(pre_sync_l3.clone())?; // L3
383        tokio::time::sleep(Duration::from_secs(1)).await;
384        remote_storage.insert_swap(pre_sync_r2.clone())?; // R2
385
386        // The swap fees created_at are in this order: R1 < L1 < L2 < L3 < R2
387
388        // As a result:
389        // - R1 fee created_at < L1 fee created_at => sync should NOT replace the local version
390        // - R2 fee created_at > L2 fee created_at => sync should replace the local version
391        // - R3 should be created (mirror of L3) because it doesn't exist on remote
392
393        let local_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
394        let remote_swaps = remote_storage.list_swaps(ListSwapsRequest::default())?;
395        assert_eq!(local_swaps.len(), 3);
396        assert_eq!(remote_swaps.len(), 2); // Before the sync, only 2 swaps in remote
397
398        // Update local DB based on remote (sync)
399        remote_storage.import_remote_changes(&local_storage, false)?;
400        local_storage.import_remote_changes(&remote_storage, true)?;
401
402        let local_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
403        let remote_swaps = remote_storage.list_swaps(ListSwapsRequest::default())?;
404        assert_eq!(local_swaps.len(), 3);
405        assert_eq!(remote_swaps.len(), 3); // After the sync, all 3 swaps in remote
406
407        let post_sync_l1 = local_swaps
408            .iter()
409            .find(|s| s.bitcoin_address == pre_sync_l1.bitcoin_address)
410            .ok_or_else(|| anyhow!("L1 swaps_fees row lost from local DB after sync"))?;
411        let post_sync_l2 = local_swaps
412            .iter()
413            .find(|s| s.bitcoin_address == pre_sync_l2.bitcoin_address)
414            .ok_or_else(|| anyhow!("L2 swaps_fees row lost from local DB after sync"))?;
415        let post_sync_l3 = local_swaps
416            .iter()
417            .find(|s| s.bitcoin_address == pre_sync_l3.bitcoin_address)
418            .ok_or_else(|| anyhow!("L3 swaps_fees row lost from local DB after sync"))?;
419        let post_sync_r1 = remote_swaps
420            .iter()
421            .find(|s| s.bitcoin_address == pre_sync_r1.bitcoin_address)
422            .ok_or_else(|| anyhow!("R1 swaps_fees row lost from remote DB after sync"))?;
423        let post_sync_r2 = remote_swaps
424            .iter()
425            .find(|s| s.bitcoin_address == pre_sync_r2.bitcoin_address)
426            .ok_or_else(|| anyhow!("R2 swaps_fees row lost from remote DB after sync"))?;
427        let post_sync_r3 = remote_swaps
428            .iter()
429            .find(|s| s.bitcoin_address == pre_sync_l3.bitcoin_address)
430            .ok_or_else(|| anyhow!("No R3 swap info found in remote DB after sync"))?;
431
432        // L1 fees were NOT updated to R1 fees (R1 fees created_at < L1 fees created_at)
433        assert_ne!(post_sync_l1, &pre_sync_r1);
434        // L1 fees remain as they were before the sync
435        assert_eq!(post_sync_l1, &pre_sync_l1);
436        // L1 and R1 are in sync (L1 overwrote R1)
437        assert_eq!(post_sync_l1, post_sync_r1);
438        assert_ne!(post_sync_r1, &pre_sync_r1);
439
440        // L2 fees were replaced by the R2 fees (R2 fees created_at > L2 fees created_at)
441        assert_eq!(post_sync_l2, post_sync_r2);
442        assert_ne!(post_sync_l2, &pre_sync_l2);
443
444        // L3 and R3 are in sync (there was no R3 before the sync, now R3 = L3)
445        assert_eq!(post_sync_l3, post_sync_r3);
446        assert_eq!(post_sync_l3, &pre_sync_l3);
447
448        Ok(())
449    }
450
451    fn create_test_swap_info() -> SwapInfo {
452        SwapInfo {
453            bitcoin_address: rand_string(10),
454            created_at: 10,
455            lock_height: random(),
456            payment_hash: rand_vec_u8(10),
457            preimage: rand_vec_u8(10),
458            private_key: rand_vec_u8(10),
459            public_key: rand_vec_u8(10),
460            swapper_public_key: rand_vec_u8(10),
461            script: rand_vec_u8(10),
462            bolt11: None,
463            paid_msat: 0,
464            unconfirmed_sats: 0,
465            confirmed_sats: 0,
466            total_incoming_txs: 0,
467            status: crate::models::SwapStatus::Initial,
468            refund_tx_ids: Vec::new(),
469            unconfirmed_tx_ids: Vec::new(),
470            confirmed_tx_ids: Vec::new(),
471            min_allowed_deposit: 0,
472            max_allowed_deposit: 100,
473            max_swapper_payable: 200,
474            last_redeem_error: None,
475            channel_opening_fees: Some(get_test_ofp_48h(random(), random()).into()),
476            confirmed_at: None,
477        }
478    }
479}