breez_sdk_spark/common/
sync_storage.rs

1use std::{collections::HashMap, sync::Arc};
2
3use serde::{Deserialize, Serialize};
4use thiserror::Error;
5
6/// Errors that can occur during storage operations
7#[derive(Debug, Error, Clone)]
8#[cfg_attr(feature = "uniffi", derive(uniffi::Error))]
9pub enum SyncStorageError {
10    #[error("Underline implementation error: {0}")]
11    Implementation(String),
12
13    /// Database initialization error
14    #[error("Failed to initialize database: {0}")]
15    InitializationError(String),
16
17    #[error("Failed to serialize/deserialize data: {0}")]
18    Serialization(String),
19}
20
21impl From<SyncStorageError> for breez_sdk_common::sync::storage::SyncStorageError {
22    fn from(value: SyncStorageError) -> Self {
23        match value {
24            SyncStorageError::Implementation(msg) => {
25                breez_sdk_common::sync::storage::SyncStorageError::Implementation(msg)
26            }
27            SyncStorageError::InitializationError(msg) => {
28                breez_sdk_common::sync::storage::SyncStorageError::InitializationError(msg)
29            }
30            SyncStorageError::Serialization(msg) => {
31                breez_sdk_common::sync::storage::SyncStorageError::Serialization(msg)
32            }
33        }
34    }
35}
36
37impl From<serde_json::Error> for SyncStorageError {
38    fn from(e: serde_json::Error) -> Self {
39        SyncStorageError::Serialization(e.to_string())
40    }
41}
42
43#[cfg_attr(feature = "uniffi", uniffi::export(with_foreign))]
44#[macros::async_trait]
45pub trait SyncStorage: Send + Sync {
46    async fn add_outgoing_change(
47        &self,
48        record: UnversionedRecordChange,
49    ) -> Result<u64, SyncStorageError>;
50    async fn complete_outgoing_sync(&self, record: Record) -> Result<(), SyncStorageError>;
51    async fn get_pending_outgoing_changes(
52        &self,
53        limit: u32,
54    ) -> Result<Vec<OutgoingChange>, SyncStorageError>;
55
56    /// Get the revision number of the last synchronized record
57    async fn get_last_revision(&self) -> Result<u64, SyncStorageError>;
58
59    /// Insert incoming records from remote sync
60    async fn insert_incoming_records(&self, records: Vec<Record>) -> Result<(), SyncStorageError>;
61
62    /// Delete an incoming record after it has been processed
63    async fn delete_incoming_record(&self, record: Record) -> Result<(), SyncStorageError>;
64
65    /// Update revision numbers of pending outgoing records to be higher than the given revision
66    async fn rebase_pending_outgoing_records(&self, revision: u64) -> Result<(), SyncStorageError>;
67
68    /// Get incoming records that need to be processed, up to the specified limit
69    async fn get_incoming_records(
70        &self,
71        limit: u32,
72    ) -> Result<Vec<IncomingChange>, SyncStorageError>;
73
74    /// Get the latest outgoing record if any exists
75    async fn get_latest_outgoing_change(&self) -> Result<Option<OutgoingChange>, SyncStorageError>;
76
77    /// Update the sync state record from an incoming record
78    async fn update_record_from_incoming(&self, record: Record) -> Result<(), SyncStorageError>;
79}
80
81pub(crate) struct SyncStorageWrapper {
82    pub inner: Arc<dyn SyncStorage>,
83}
84
85impl SyncStorageWrapper {
86    pub fn new(inner: Arc<dyn SyncStorage>) -> Self {
87        Self { inner }
88    }
89}
90
91#[macros::async_trait]
92impl breez_sdk_common::sync::storage::SyncStorage for SyncStorageWrapper {
93    async fn add_outgoing_change(
94        &self,
95        record: breez_sdk_common::sync::storage::UnversionedRecordChange,
96    ) -> Result<u64, breez_sdk_common::sync::storage::SyncStorageError> {
97        Ok(self.inner.add_outgoing_change(record.into()).await?)
98    }
99
100    async fn complete_outgoing_sync(
101        &self,
102        record: breez_sdk_common::sync::storage::Record,
103    ) -> Result<(), breez_sdk_common::sync::storage::SyncStorageError> {
104        Ok(self.inner.complete_outgoing_sync(record.into()).await?)
105    }
106
107    async fn get_pending_outgoing_changes(
108        &self,
109        limit: u32,
110    ) -> Result<
111        Vec<breez_sdk_common::sync::storage::OutgoingChange>,
112        breez_sdk_common::sync::storage::SyncStorageError,
113    > {
114        let changes = self.inner.get_pending_outgoing_changes(limit).await?;
115        Ok(changes.into_iter().map(From::from).collect())
116    }
117
118    async fn get_last_revision(
119        &self,
120    ) -> Result<u64, breez_sdk_common::sync::storage::SyncStorageError> {
121        Ok(self.inner.get_last_revision().await?)
122    }
123
124    async fn insert_incoming_records(
125        &self,
126        records: Vec<breez_sdk_common::sync::storage::Record>,
127    ) -> Result<(), breez_sdk_common::sync::storage::SyncStorageError> {
128        let recs: Vec<Record> = records.into_iter().map(From::from).collect();
129        Ok(self.inner.insert_incoming_records(recs).await?)
130    }
131
132    async fn delete_incoming_record(
133        &self,
134        record: breez_sdk_common::sync::storage::Record,
135    ) -> Result<(), breez_sdk_common::sync::storage::SyncStorageError> {
136        Ok(self.inner.delete_incoming_record(record.into()).await?)
137    }
138
139    async fn rebase_pending_outgoing_records(
140        &self,
141        revision: u64,
142    ) -> Result<(), breez_sdk_common::sync::storage::SyncStorageError> {
143        Ok(self.inner.rebase_pending_outgoing_records(revision).await?)
144    }
145
146    async fn get_incoming_records(
147        &self,
148        limit: u32,
149    ) -> Result<
150        Vec<breez_sdk_common::sync::storage::IncomingChange>,
151        breez_sdk_common::sync::storage::SyncStorageError,
152    > {
153        let changes = self.inner.get_incoming_records(limit).await?;
154        Ok(changes.into_iter().map(From::from).collect())
155    }
156
157    async fn get_latest_outgoing_change(
158        &self,
159    ) -> Result<
160        Option<breez_sdk_common::sync::storage::OutgoingChange>,
161        breez_sdk_common::sync::storage::SyncStorageError,
162    > {
163        let change = self.inner.get_latest_outgoing_change().await?;
164        Ok(change.map(From::from))
165    }
166
167    async fn update_record_from_incoming(
168        &self,
169        record: breez_sdk_common::sync::storage::Record,
170    ) -> Result<(), breez_sdk_common::sync::storage::SyncStorageError> {
171        Ok(self
172            .inner
173            .update_record_from_incoming(record.into())
174            .await?)
175    }
176}
177
178#[derive(Debug, Clone, Deserialize, Serialize)]
179#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
180pub struct RecordId {
181    pub r#type: String,
182    pub data_id: String,
183}
184
185impl RecordId {
186    pub fn new(r#type: String, data_id: String) -> Self {
187        RecordId { r#type, data_id }
188    }
189}
190
191impl From<breez_sdk_common::sync::storage::RecordId> for RecordId {
192    fn from(value: breez_sdk_common::sync::storage::RecordId) -> Self {
193        RecordId {
194            r#type: value.r#type,
195            data_id: value.data_id,
196        }
197    }
198}
199
200impl From<RecordId> for breez_sdk_common::sync::storage::RecordId {
201    fn from(value: RecordId) -> Self {
202        breez_sdk_common::sync::storage::RecordId {
203            r#type: value.r#type,
204            data_id: value.data_id,
205        }
206    }
207}
208
209#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
210#[derive(Clone, Debug)]
211pub struct IncomingChange {
212    pub new_state: Record,
213    pub old_state: Option<Record>,
214}
215
216impl From<breez_sdk_common::sync::storage::IncomingChange> for IncomingChange {
217    fn from(value: breez_sdk_common::sync::storage::IncomingChange) -> Self {
218        IncomingChange {
219            new_state: value.new_state.into(),
220            old_state: value.old_state.map(From::from),
221        }
222    }
223}
224
225impl From<IncomingChange> for breez_sdk_common::sync::storage::IncomingChange {
226    fn from(value: IncomingChange) -> Self {
227        breez_sdk_common::sync::storage::IncomingChange {
228            new_state: value.new_state.into(),
229            old_state: value.old_state.map(From::from),
230        }
231    }
232}
233
234#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
235#[derive(Clone, Debug)]
236pub struct OutgoingChange {
237    pub change: RecordChange,
238    pub parent: Option<Record>,
239}
240
241impl From<breez_sdk_common::sync::storage::OutgoingChange> for OutgoingChange {
242    fn from(value: breez_sdk_common::sync::storage::OutgoingChange) -> Self {
243        OutgoingChange {
244            change: value.change.into(),
245            parent: value.parent.map(From::from),
246        }
247    }
248}
249
250impl From<OutgoingChange> for breez_sdk_common::sync::storage::OutgoingChange {
251    fn from(value: OutgoingChange) -> Self {
252        breez_sdk_common::sync::storage::OutgoingChange {
253            change: value.change.into(),
254            parent: value.parent.map(From::from),
255        }
256    }
257}
258
259#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
260#[derive(Clone, Debug)]
261pub struct UnversionedRecordChange {
262    pub id: RecordId,
263    pub schema_version: String,
264    pub updated_fields: HashMap<String, String>,
265}
266
267impl From<breez_sdk_common::sync::storage::UnversionedRecordChange> for UnversionedRecordChange {
268    fn from(value: breez_sdk_common::sync::storage::UnversionedRecordChange) -> Self {
269        UnversionedRecordChange {
270            id: value.id.into(),
271            schema_version: value.schema_version,
272            updated_fields: value.updated_fields,
273        }
274    }
275}
276
277impl From<UnversionedRecordChange> for breez_sdk_common::sync::storage::UnversionedRecordChange {
278    fn from(value: UnversionedRecordChange) -> Self {
279        breez_sdk_common::sync::storage::UnversionedRecordChange {
280            id: value.id.into(),
281            schema_version: value.schema_version,
282            updated_fields: value.updated_fields,
283        }
284    }
285}
286
287#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
288#[derive(Clone, Debug)]
289pub struct RecordChange {
290    pub id: RecordId,
291    pub schema_version: String,
292    pub updated_fields: HashMap<String, String>,
293    pub revision: u64,
294}
295
296impl From<breez_sdk_common::sync::storage::RecordChange> for RecordChange {
297    fn from(value: breez_sdk_common::sync::storage::RecordChange) -> Self {
298        RecordChange {
299            id: value.id.into(),
300            schema_version: value.schema_version,
301            updated_fields: value.updated_fields,
302            revision: value.revision,
303        }
304    }
305}
306
307impl From<RecordChange> for breez_sdk_common::sync::storage::RecordChange {
308    fn from(value: RecordChange) -> Self {
309        breez_sdk_common::sync::storage::RecordChange {
310            id: value.id.into(),
311            schema_version: value.schema_version,
312            updated_fields: value.updated_fields,
313            revision: value.revision,
314        }
315    }
316}
317
318#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
319#[derive(Clone, Debug)]
320pub struct Record {
321    pub id: RecordId,
322    pub revision: u64,
323    pub schema_version: String,
324    pub data: HashMap<String, String>,
325}
326
327impl From<breez_sdk_common::sync::storage::Record> for Record {
328    fn from(value: breez_sdk_common::sync::storage::Record) -> Self {
329        Record {
330            id: value.id.into(),
331            revision: value.revision,
332            schema_version: value.schema_version,
333            data: value.data,
334        }
335    }
336}
337
338impl From<Record> for breez_sdk_common::sync::storage::Record {
339    fn from(value: Record) -> Self {
340        breez_sdk_common::sync::storage::Record {
341            id: value.id.into(),
342            revision: value.revision,
343            schema_version: value.schema_version,
344            data: value.data,
345        }
346    }
347}