// breez_sdk_core/persist/db.rs

1use super::{
2    error::PersistResult,
3    migrations::{current_migrations, current_sync_migrations},
4};
5use anyhow::Result;
6use rusqlite::{
7    hooks::Action,
8    types::{FromSql, FromSqlError, ToSqlOutput},
9    Connection, ToSql,
10};
11use rusqlite_migration::{Migrations, M};
12use tokio::sync::broadcast;
13
/// Notification describing a change made to the persistent storage.
///
/// Listeners obtain a receiver for these events and are notified whenever a matching
/// modification is observed on a database connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum HookEvent {
    /// A row was inserted into `table`.
    Insert { table: String },
}
21
22pub(crate) struct SqliteStorage {
23    /// Local DB. Exists only on this instance of the SDK.
24    main_db_file: String,
25    /// Sync DB. Gets synchronized across the different instances that connect to the same wallet.
26    sync_db_file: String,
27    /// Dispatch DB hook events.
28    events_publisher: broadcast::Sender<HookEvent>,
29}
30
31impl SqliteStorage {
32    pub fn new(working_dir: String) -> SqliteStorage {
33        let main_db_file = format!("{}/storage.sql", working_dir);
34        let sync_db_file = format!("{}/sync_storage.sql", working_dir);
35        let (events_publisher, _) = broadcast::channel::<HookEvent>(100);
36
37        SqliteStorage {
38            main_db_file,
39            sync_db_file,
40            events_publisher,
41        }
42    }
43
44    pub(crate) fn subscribe_hooks(&self) -> broadcast::Receiver<HookEvent> {
45        self.events_publisher.subscribe()
46    }
47
48    pub(crate) fn init(&self) -> PersistResult<()> {
49        self.migrate_main_db()?;
50        Self::migrate_sync_db(self.sync_db_file.clone())?;
51        Ok(())
52    }
53
54    pub(crate) fn migrate_sync_db(sync_db_path: String) -> PersistResult<()> {
55        let mut sync_con = Connection::open(sync_db_path)?;
56        let sync_migrations =
57            Migrations::new(current_sync_migrations().into_iter().map(M::up).collect());
58        sync_migrations.to_latest(&mut sync_con)?;
59        Ok(())
60    }
61
62    fn migrate_main_db(&self) -> PersistResult<()> {
63        let migrations = Migrations::new(current_migrations().into_iter().map(M::up).collect());
64        let mut conn = self.get_connection()?;
65        migrations.to_latest(&mut conn)?;
66        Ok(())
67    }
68
69    pub(crate) fn get_connection(&self) -> PersistResult<Connection> {
70        let con = Connection::open(self.main_db_file.clone())?;
71        let sql = "ATTACH DATABASE ? AS sync;";
72        con.execute(sql, [self.sync_db_file.clone()])?;
73        // We want to notify any subscribers with hook events.
74        let events_publisher = self.events_publisher.clone();
75        con.update_hook(Some(move |action, db: &str, t: &str, _| {
76            if action == Action::SQLITE_INSERT && db == "sync" {
77                _ = events_publisher.send(HookEvent::Insert { table: t.into() });
78            }
79        }));
80        Ok(con)
81    }
82
83    pub(crate) fn sync_db_path(&self) -> String {
84        self.sync_db_file.clone()
85    }
86}
87
/// Newtype around a list of strings, used to carry it through SQLite parameters and
/// columns. The `ToSql`/`FromSql` impls in this file encode it as a JSON string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StringArray(pub Vec<String>);
89
90impl FromSql for StringArray {
91    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
92        let res: Result<Vec<String>, FromSqlError> =
93            serde_json::from_str(value.as_str()?).map_err(|_| FromSqlError::InvalidType);
94        Ok(StringArray(res?))
95    }
96}
97
98impl ToSql for StringArray {
99    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
100        let res = serde_json::to_string(&self.0).map_err(|_| FromSqlError::InvalidType);
101        Ok(ToSqlOutput::from(res?))
102    }
103}