breez_sdk_core/persist/
db.rs1use super::{
2 error::PersistResult,
3 migrations::{current_migrations, current_sync_migrations},
4};
5use anyhow::Result;
6use rusqlite::{
7 hooks::Action,
8 types::{FromSql, FromSqlError, ToSqlOutput},
9 Connection, ToSql,
10};
11use rusqlite_migration::{Migrations, M};
12use tokio::sync::broadcast;
13
/// Notification broadcast to hook subscribers when the SQLite update hook
/// installed on a connection reports a change of interest.
#[derive(Clone, Debug)]
pub(crate) enum HookEvent {
    /// A row was inserted.
    Insert {
        /// Name of the table that received the new row.
        table: String,
    },
}
21
22pub(crate) struct SqliteStorage {
23 main_db_file: String,
25 sync_db_file: String,
27 events_publisher: broadcast::Sender<HookEvent>,
29}
30
31impl SqliteStorage {
32 pub fn new(working_dir: String) -> SqliteStorage {
33 let main_db_file = format!("{}/storage.sql", working_dir);
34 let sync_db_file = format!("{}/sync_storage.sql", working_dir);
35 let (events_publisher, _) = broadcast::channel::<HookEvent>(100);
36
37 SqliteStorage {
38 main_db_file,
39 sync_db_file,
40 events_publisher,
41 }
42 }
43
44 pub(crate) fn subscribe_hooks(&self) -> broadcast::Receiver<HookEvent> {
45 self.events_publisher.subscribe()
46 }
47
48 pub(crate) fn init(&self) -> PersistResult<()> {
49 self.migrate_main_db()?;
50 Self::migrate_sync_db(self.sync_db_file.clone())?;
51 Ok(())
52 }
53
54 pub(crate) fn migrate_sync_db(sync_db_path: String) -> PersistResult<()> {
55 let mut sync_con = Connection::open(sync_db_path)?;
56 let sync_migrations =
57 Migrations::new(current_sync_migrations().into_iter().map(M::up).collect());
58 sync_migrations.to_latest(&mut sync_con)?;
59 Ok(())
60 }
61
62 fn migrate_main_db(&self) -> PersistResult<()> {
63 let migrations = Migrations::new(current_migrations().into_iter().map(M::up).collect());
64 let mut conn = self.get_connection()?;
65 migrations.to_latest(&mut conn)?;
66 Ok(())
67 }
68
69 pub(crate) fn get_connection(&self) -> PersistResult<Connection> {
70 let con = Connection::open(self.main_db_file.clone())?;
71 let sql = "ATTACH DATABASE ? AS sync;";
72 con.execute(sql, [self.sync_db_file.clone()])?;
73 let events_publisher = self.events_publisher.clone();
75 con.update_hook(Some(move |action, db: &str, t: &str, _| {
76 if action == Action::SQLITE_INSERT && db == "sync" {
77 _ = events_publisher.send(HookEvent::Insert { table: t.into() });
78 }
79 }));
80 Ok(con)
81 }
82
83 pub(crate) fn sync_db_path(&self) -> String {
84 self.sync_db_file.clone()
85 }
86}
87
/// Newtype around a list of strings, used as the Rust-side type for such a
/// list when it is read from or written to a single SQLite column.
pub(crate) struct StringArray(pub Vec<String>);
89
90impl FromSql for StringArray {
91 fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
92 let res: Result<Vec<String>, FromSqlError> =
93 serde_json::from_str(value.as_str()?).map_err(|_| FromSqlError::InvalidType);
94 Ok(StringArray(res?))
95 }
96}
97
98impl ToSql for StringArray {
99 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
100 let res = serde_json::to_string(&self.0).map_err(|_| FromSqlError::InvalidType);
101 Ok(ToSqlOutput::from(res?))
102 }
103}