1use crate::ReverseSwapStatus;
2
3use super::{db::SqliteStorage, error::PersistResult};
4use rusqlite::{named_params, Row, Transaction, TransactionBehavior};
5use std::path::Path;
6
7#[allow(dead_code)]
8pub(crate) struct SyncVersion {
9 pub created_at: String,
10 pub last_version: u64,
11 pub data: Vec<u8>,
12}
13
14impl SqliteStorage {
15 pub(crate) fn backup<P: AsRef<Path>>(&self, dst_path: P) -> PersistResult<()> {
16 Ok(self.get_connection()?.backup(
17 rusqlite::DatabaseName::Attached("sync"),
18 dst_path,
19 None,
20 )?)
21 }
22
23 pub(crate) fn get_last_sync_version(&self) -> PersistResult<Option<u64>> {
24 Ok(self.get_connection()?.query_row(
25 "SELECT max(last_version) FROM sync_versions",
26 [],
27 |row| row.get::<usize, Option<u64>>(0),
28 )?)
29 }
30
31 pub(crate) fn set_last_sync_version(
32 &self,
33 last_version: u64,
34 data: &Vec<u8>,
35 ) -> PersistResult<()> {
36 let con = self.get_connection()?;
37
38 con.execute("Delete from sync_versions where last_version not in (select last_version from sync_versions order by created_at desc limit 19);", [])?;
40 con.execute(
41 "INSERT OR REPLACE INTO sync_versions (last_version, data) VALUES (?1, ?2);",
42 (last_version, data),
43 )?;
44
45 Ok(())
46 }
47
48 #[allow(dead_code)]
49 pub(crate) fn sync_versions_history(&self) -> PersistResult<Vec<SyncVersion>> {
50 let con = self.get_connection()?;
51 let mut stmt = con.prepare(
52 "SELECT created_at, last_version, data FROM sync_versions ORDER BY created_at DESC;",
53 )?;
54
55 let vec: Vec<SyncVersion> = stmt
56 .query_map([], |row| self.sql_row_to_sync_version(row))?
57 .map(|i| i.unwrap())
58 .collect();
59
60 Ok(vec)
61 }
62
63 fn sql_row_to_sync_version(&self, row: &Row) -> PersistResult<SyncVersion, rusqlite::Error> {
64 let version = SyncVersion {
65 created_at: row.get(0)?,
66 last_version: row.get(1)?,
67 data: row.get(2)?,
68 };
69
70 Ok(version)
71 }
72
73 pub fn get_last_sync_request(&self) -> PersistResult<Option<u64>> {
74 let res: rusqlite::Result<Option<u64>> =
75 self.get_connection()?
76 .query_row("SELECT max(id) FROM sync.sync_requests", [], |row| {
77 row.get::<usize, Option<u64>>(0)
78 });
79 Ok(res?)
80 }
81
82 pub(crate) fn delete_sync_requests_up_to(&self, request_id: u64) -> PersistResult<()> {
83 self.get_connection()?.execute(
84 "DELETE FROM sync.sync_requests where id <= ?1",
85 [request_id],
86 )?;
87 Ok(())
88 }
89
90 pub(crate) fn import_remote_changes(
91 &self,
92 remote_storage: &SqliteStorage,
93 to_local: bool,
94 ) -> PersistResult<()> {
95 let sync_data_file = remote_storage.sync_db_path();
96 match SqliteStorage::migrate_sync_db(sync_data_file.clone()) {
97 Ok(_) => {}
98 Err(e) => {
99 log::error!("Failed to migrate sync db, probably local db is older than remote, skipping migration: {}", e);
100 }
101 }
102
103 let mut con = self.get_connection()?;
104 let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
105 tx.execute("ATTACH DATABASE ? AS remote_sync;", [sync_data_file])?;
106
107 if to_local {
108 tx.execute(
109 "
110 INSERT OR IGNORE INTO swaps_info (bitcoin_address, unconfirmed_tx_ids, confirmed_tx_ids)
111 SELECT
112 bitcoin_address, '[]', '[]'
113 FROM remote_sync.swaps;",
114 [],
115 )?;
116
117 tx.execute(
118 "
119 INSERT OR IGNORE INTO reverse_swaps_info (id, status)
120 SELECT
121 id, :status
122 FROM remote_sync.reverse_swaps;",
123 named_params! {
124 ":status": serde_json::to_value(ReverseSwapStatus::Initial)?
125 },
126 )?;
127 }
128
129 tx.execute(
131 "
132 INSERT INTO sync.swaps
133 SELECT
134 bitcoin_address,
135 created_at,
136 lock_height,
137 payment_hash,
138 preimage,
139 private_key,
140 public_key,
141 swapper_public_key,
142 script,
143 min_allowed_deposit,
144 max_allowed_deposit,
145 max_swapper_payable
146 FROM remote_sync.swaps
147 WHERE bitcoin_address NOT IN (SELECT bitcoin_address FROM sync.swaps);",
148 [],
149 )?;
150
151 tx.execute(
153 "
154 INSERT OR IGNORE INTO sync.swap_refunds
155 SELECT
156 bitcoin_address,
157 refund_tx_id
158 FROM remote_sync.swap_refunds;",
159 [],
160 )?;
161
162 tx.execute(
164 "
165 INSERT into sync.payments_external_info
166 SELECT
167 payment_id,
168 lnurl_success_action,
169 ln_address,
170 lnurl_metadata,
171 lnurl_withdraw_endpoint,
172 attempted_amount_msat,
173 attempted_error,
174 lnurl_pay_domain,
175 lnurl_pay_comment
176 FROM remote_sync.payments_external_info
177 WHERE payment_id NOT IN (SELECT payment_id FROM sync.payments_external_info);",
178 [],
179 )?;
180
181 tx.execute(
183 "
184 INSERT OR REPLACE INTO sync.payments_metadata
185 SELECT
186 remote_sync.payments_metadata.payment_id,
187 remote_sync.payments_metadata.metadata,
188 remote_sync.payments_metadata.updated_at
189 FROM remote_sync.payments_metadata
190 LEFT JOIN sync.payments_metadata
191 ON sync.payments_metadata.payment_id = remote_sync.payments_metadata.payment_id
192 WHERE
193 remote_sync.payments_metadata.updated_at > sync.payments_metadata.updated_at;",
194 [],
195 )?;
196
197 tx.execute(
199 "
200 INSERT into sync.reverse_swaps
201 SELECT
202 id,
203 created_at_block_height,
204 preimage,
205 private_key,
206 claim_pubkey,
207 timeout_block_height,
208 invoice,
209 onchain_amount_sat,
210 sat_per_vbyte,
211 receive_amount_sat,
212 redeem_script
213 FROM remote_sync.reverse_swaps
214 WHERE id NOT IN (SELECT id FROM sync.reverse_swaps);",
215 [],
216 )?;
217
218 tx.execute(
220 "
221 INSERT INTO sync.open_channel_payment_info
222 SELECT
223 payment_hash,
224 payer_amount_msat,
225 open_channel_bolt11
226 FROM remote_sync.open_channel_payment_info
227 WHERE payment_hash NOT IN (SELECT payment_hash FROM sync.open_channel_payment_info);",
228 [],
229 )?;
230
231 Self::sync_swaps_fees_local(&tx)?;
234
235 tx.commit()?;
236 con.execute("DETACH DATABASE remote_sync", [])?;
237
238 Ok(())
239 }
240
241 fn sync_swaps_fees_local(tx: &Transaction) -> PersistResult<()> {
243 tx.execute(
249 "
250 INSERT OR REPLACE INTO sync.swaps_fees
251 SELECT
252 remote_sync.swaps_fees.bitcoin_address as bitcoin_address,
253 remote_sync.swaps_fees.created_at as created_at,
254 remote_sync.swaps_fees.channel_opening_fees as channel_opening_fees
255 FROM remote_sync.swaps_fees
256 LEFT JOIN sync.swaps_fees ON sync.swaps_fees.bitcoin_address = remote_sync.swaps_fees.bitcoin_address
257 WHERE
258 sync.swaps_fees.created_at IS NULL OR remote_sync.swaps_fees.created_at > sync.swaps_fees.created_at
259 ;",
260 [],
261 )?;
262
263 Ok(())
264 }
265}
266
267#[cfg(test)]
268mod tests {
269 use anyhow::anyhow;
270 use rand::random;
271 use std::time::Duration;
272
273 use crate::persist::db::SqliteStorage;
274 use crate::persist::error::PersistResult;
275 use crate::persist::test_utils;
276 use crate::test_utils::{get_test_ofp_48h, rand_string, rand_vec_u8};
277 use crate::{ListSwapsRequest, SwapInfo};
278
279 #[test]
280 fn test_sync() -> PersistResult<()> {
281 let local_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
282 local_storage.init()?;
283
284 let local_swap_info = create_test_swap_info();
285 local_storage.insert_swap(local_swap_info.clone())?;
286
287 let mut remote_swap_info = local_swap_info;
288 remote_swap_info.bitcoin_address = "2".into();
289 remote_swap_info.script = vec![6];
290 remote_swap_info.swapper_public_key = vec![6];
291 remote_swap_info.public_key = vec![6];
292 remote_swap_info.preimage = vec![6];
293 remote_swap_info.payment_hash = vec![6];
294 remote_swap_info.private_key = vec![6];
295
296 let remote_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
297 remote_storage.init()?;
298 remote_storage.insert_swap(remote_swap_info)?;
299
300 remote_storage.insert_open_channel_payment_info("123", 100000, "")?;
301
302 remote_storage.import_remote_changes(&local_storage, false)?;
303 local_storage.import_remote_changes(&remote_storage, true)?;
304
305 let mut local_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
306 local_swaps.sort_by(|s1, s2| s1.bitcoin_address.cmp(&s2.bitcoin_address));
307
308 let mut remote_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
309 remote_swaps.sort_by(|s1, s2| s1.bitcoin_address.cmp(&s2.bitcoin_address));
310
311 assert_eq!(local_swaps, remote_swaps);
312 assert_eq!(local_swaps.len(), 2);
313
314 local_storage.set_last_sync_version(10, &vec![])?;
315 let version = local_storage.get_last_sync_version()?.unwrap();
316 assert_eq!(version, 10);
317
318 Ok(())
319 }
320
321 #[tokio::test]
322 async fn test_sync_swaps_update_swap_fees() -> PersistResult<()> {
323 let local_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
324 local_storage.init()?;
325
326 let local_swap_info = create_test_swap_info();
328 local_storage.insert_swap(local_swap_info.clone())?;
329
330 tokio::time::sleep(Duration::from_secs(2)).await;
332
333 let new_fees: crate::OpeningFeeParams = get_test_ofp_48h(10, 10).into();
335 local_storage.update_swap_fees(local_swap_info.bitcoin_address, new_fees.clone())?;
336
337 let local_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
338 assert_eq!(local_swaps.len(), 1);
339 assert_eq!(
340 local_swaps
341 .first()
342 .ok_or_else(|| anyhow!("No element found"))?
343 .channel_opening_fees,
344 Some(new_fees)
345 );
346
347 Ok(())
348 }
349
350 #[tokio::test]
351 async fn test_sync_swaps_fees_local_vs_remote() -> PersistResult<()> {
352 let local_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
353 local_storage.init()?;
354
355 let remote_storage = SqliteStorage::new(test_utils::create_test_sql_dir());
356 remote_storage.init()?;
357
358 let pre_sync_l1 = create_test_swap_info();
360 let pre_sync_l2 = create_test_swap_info();
361 let pre_sync_l3 = create_test_swap_info();
362
363 let mut pre_sync_r1 = pre_sync_l1.clone();
365 pre_sync_r1.channel_opening_fees = Some(get_test_ofp_48h(10, 10).into());
366 let mut pre_sync_r2 = pre_sync_l2.clone();
367 pre_sync_r2.channel_opening_fees = Some(get_test_ofp_48h(15, 15).into());
368
369 remote_storage.insert_swap(pre_sync_r1.clone())?; tokio::time::sleep(Duration::from_secs(1)).await;
378 local_storage.insert_swap(pre_sync_l1.clone())?; tokio::time::sleep(Duration::from_secs(1)).await;
380 local_storage.insert_swap(pre_sync_l2.clone())?; tokio::time::sleep(Duration::from_secs(1)).await;
382 local_storage.insert_swap(pre_sync_l3.clone())?; tokio::time::sleep(Duration::from_secs(1)).await;
384 remote_storage.insert_swap(pre_sync_r2.clone())?; let local_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
394 let remote_swaps = remote_storage.list_swaps(ListSwapsRequest::default())?;
395 assert_eq!(local_swaps.len(), 3);
396 assert_eq!(remote_swaps.len(), 2); remote_storage.import_remote_changes(&local_storage, false)?;
400 local_storage.import_remote_changes(&remote_storage, true)?;
401
402 let local_swaps = local_storage.list_swaps(ListSwapsRequest::default())?;
403 let remote_swaps = remote_storage.list_swaps(ListSwapsRequest::default())?;
404 assert_eq!(local_swaps.len(), 3);
405 assert_eq!(remote_swaps.len(), 3); let post_sync_l1 = local_swaps
408 .iter()
409 .find(|s| s.bitcoin_address == pre_sync_l1.bitcoin_address)
410 .ok_or_else(|| anyhow!("L1 swaps_fees row lost from local DB after sync"))?;
411 let post_sync_l2 = local_swaps
412 .iter()
413 .find(|s| s.bitcoin_address == pre_sync_l2.bitcoin_address)
414 .ok_or_else(|| anyhow!("L2 swaps_fees row lost from local DB after sync"))?;
415 let post_sync_l3 = local_swaps
416 .iter()
417 .find(|s| s.bitcoin_address == pre_sync_l3.bitcoin_address)
418 .ok_or_else(|| anyhow!("L3 swaps_fees row lost from local DB after sync"))?;
419 let post_sync_r1 = remote_swaps
420 .iter()
421 .find(|s| s.bitcoin_address == pre_sync_r1.bitcoin_address)
422 .ok_or_else(|| anyhow!("R1 swaps_fees row lost from remote DB after sync"))?;
423 let post_sync_r2 = remote_swaps
424 .iter()
425 .find(|s| s.bitcoin_address == pre_sync_r2.bitcoin_address)
426 .ok_or_else(|| anyhow!("R2 swaps_fees row lost from remote DB after sync"))?;
427 let post_sync_r3 = remote_swaps
428 .iter()
429 .find(|s| s.bitcoin_address == pre_sync_l3.bitcoin_address)
430 .ok_or_else(|| anyhow!("No R3 swap info found in remote DB after sync"))?;
431
432 assert_ne!(post_sync_l1, &pre_sync_r1);
434 assert_eq!(post_sync_l1, &pre_sync_l1);
436 assert_eq!(post_sync_l1, post_sync_r1);
438 assert_ne!(post_sync_r1, &pre_sync_r1);
439
440 assert_eq!(post_sync_l2, post_sync_r2);
442 assert_ne!(post_sync_l2, &pre_sync_l2);
443
444 assert_eq!(post_sync_l3, post_sync_r3);
446 assert_eq!(post_sync_l3, &pre_sync_l3);
447
448 Ok(())
449 }
450
451 fn create_test_swap_info() -> SwapInfo {
452 SwapInfo {
453 bitcoin_address: rand_string(10),
454 created_at: 10,
455 lock_height: random(),
456 payment_hash: rand_vec_u8(10),
457 preimage: rand_vec_u8(10),
458 private_key: rand_vec_u8(10),
459 public_key: rand_vec_u8(10),
460 swapper_public_key: rand_vec_u8(10),
461 script: rand_vec_u8(10),
462 bolt11: None,
463 paid_msat: 0,
464 unconfirmed_sats: 0,
465 confirmed_sats: 0,
466 total_incoming_txs: 0,
467 status: crate::models::SwapStatus::Initial,
468 refund_tx_ids: Vec::new(),
469 unconfirmed_tx_ids: Vec::new(),
470 confirmed_tx_ids: Vec::new(),
471 min_allowed_deposit: 0,
472 max_allowed_deposit: 100,
473 max_swapper_payable: 200,
474 last_redeem_error: None,
475 channel_opening_fees: Some(get_test_ofp_48h(random(), random()).into()),
476 confirmed_at: None,
477 }
478 }
479}