1use macros::async_trait;
2use rusqlite::{
3 Connection, ToSql, params,
4 types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
5};
6use rusqlite_migration::{M, Migrations};
7use std::path::{Path, PathBuf};
8
9use crate::{
10 DepositInfo, LnurlPayInfo, PaymentDetails, PaymentMethod,
11 error::DepositClaimError,
12 models::{PaymentStatus, PaymentType},
13 persist::{PaymentMetadata, UpdateDepositPayload},
14};
15
16use super::{Payment, Storage, StorageError};
17
/// File name of the SQLite database, joined onto the storage directory to form the full path.
const DEFAULT_DB_FILENAME: &str = "storage.sql";
/// Storage backend that persists its data in a SQLite database file.
pub struct SqliteStorage {
    /// Directory that contains the database file.
    db_dir: PathBuf,
}
23
24impl SqliteStorage {
25 pub fn new(path: &Path) -> Result<Self, StorageError> {
35 let storage = Self {
36 db_dir: path.to_path_buf(),
37 };
38
39 #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
40 std::fs::create_dir_all(path)
41 .map_err(|e| StorageError::InitializationError(e.to_string()))?;
42
43 storage.migrate()?;
44 Ok(storage)
45 }
46
47 pub(crate) fn get_connection(&self) -> Result<Connection, StorageError> {
48 Ok(Connection::open(self.get_db_path())?)
49 }
50
51 fn get_db_path(&self) -> PathBuf {
52 self.db_dir.join(DEFAULT_DB_FILENAME)
53 }
54
55 fn migrate(&self) -> Result<(), StorageError> {
56 let migrations =
57 Migrations::new(Self::current_migrations().into_iter().map(M::up).collect());
58 let mut conn = self.get_connection()?;
59 migrations.to_latest(&mut conn)?;
60 Ok(())
61 }
62
63 pub(crate) fn current_migrations() -> Vec<&'static str> {
64 vec![
65 "CREATE TABLE IF NOT EXISTS payments (
66 id TEXT PRIMARY KEY,
67 payment_type TEXT NOT NULL,
68 status TEXT NOT NULL,
69 amount INTEGER NOT NULL,
70 fees INTEGER NOT NULL,
71 timestamp INTEGER NOT NULL,
72 details TEXT,
73 method TEXT
74 );",
75 "CREATE TABLE IF NOT EXISTS settings (
76 key TEXT PRIMARY KEY,
77 value TEXT NOT NULL
78 );",
79 "CREATE TABLE IF NOT EXISTS unclaimed_deposits (
80 txid TEXT NOT NULL,
81 vout INTEGER NOT NULL,
82 amount_sats INTEGER,
83 claim_error TEXT,
84 refund_tx TEXT,
85 refund_tx_id TEXT,
86 PRIMARY KEY (txid, vout)
87 );",
88 "CREATE TABLE IF NOT EXISTS payment_metadata (
89 payment_id TEXT PRIMARY KEY,
90 lnurl_pay_info TEXT);",
91 "CREATE TABLE IF NOT EXISTS deposit_refunds (
92 deposit_tx_id TEXT NOT NULL,
93 deposit_vout INTEGER NOT NULL,
94 refund_tx TEXT NOT NULL,
95 refund_tx_id TEXT NOT NULL,
96 PRIMARY KEY (deposit_tx_id, deposit_vout)
97 );",
98 ]
99 }
100}
101
102impl From<rusqlite::Error> for StorageError {
103 fn from(value: rusqlite::Error) -> Self {
104 StorageError::Implementation(value.to_string())
105 }
106}
107
108impl From<rusqlite_migration::Error> for StorageError {
109 fn from(value: rusqlite_migration::Error) -> Self {
110 StorageError::Implementation(value.to_string())
111 }
112}
113
114#[async_trait]
115impl Storage for SqliteStorage {
116 async fn list_payments(
117 &self,
118 offset: Option<u32>,
119 limit: Option<u32>,
120 ) -> Result<Vec<Payment>, StorageError> {
121 let connection = self.get_connection()?;
122
123 let query = format!(
124 "SELECT p.id, p.payment_type, p.status, p.amount, p.fees, p.timestamp, p.details, p.method, pm.lnurl_pay_info
125 FROM payments p
126 LEFT JOIN payment_metadata pm ON p.id = pm.payment_id
127 ORDER BY p.timestamp DESC
128 LIMIT {} OFFSET {}",
129 limit.unwrap_or(u32::MAX),
130 offset.unwrap_or(0)
131 );
132
133 let mut stmt = connection.prepare(&query)?;
134
135 let payment_iter = stmt.query_map(params![], |row| {
136 let mut details: Option<PaymentDetails> = row.get(6)?;
137 if let Some(PaymentDetails::Lightning { lnurl_pay_info, .. }) = &mut details {
138 *lnurl_pay_info = row.get(8)?;
139 }
140
141 Ok(Payment {
142 id: row.get(0)?,
143 payment_type: PaymentType::from(row.get::<_, String>(1)?.as_str()),
144 status: PaymentStatus::from(row.get::<_, String>(2)?.as_str()),
145 amount: row.get(3)?,
146 fees: row.get(4)?,
147 timestamp: row.get(5)?,
148 details,
149 method: row.get(7)?,
150 })
151 })?;
152
153 let mut payments = Vec::new();
154 for payment in payment_iter {
155 payments.push(payment?);
156 }
157
158 Ok(payments)
159 }
160
161 async fn insert_payment(&self, payment: Payment) -> Result<(), StorageError> {
162 let connection = self.get_connection()?;
163
164 connection.execute(
165 "INSERT OR REPLACE INTO payments (id, payment_type, status, amount, fees, timestamp, details, method)
166 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
167 params![
168 payment.id,
169 payment.payment_type.to_string(),
170 payment.status.to_string(),
171 payment.amount,
172 payment.fees,
173 payment.timestamp,
174 payment.details,
175 payment.method,
176 ],
177 )?;
178
179 Ok(())
180 }
181
182 async fn set_payment_metadata(
183 &self,
184 payment_id: String,
185 metadata: PaymentMetadata,
186 ) -> Result<(), StorageError> {
187 let connection = self.get_connection()?;
188
189 connection.execute(
190 "INSERT OR REPLACE INTO payment_metadata (payment_id, lnurl_pay_info) VALUES (?, ?)",
191 params![payment_id, metadata.lnurl_pay_info],
192 )?;
193
194 Ok(())
195 }
196
197 async fn set_cached_item(&self, key: String, value: String) -> Result<(), StorageError> {
198 let connection = self.get_connection()?;
199
200 connection.execute(
201 "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
202 params![key, value],
203 )?;
204
205 Ok(())
206 }
207
208 async fn get_cached_item(&self, key: String) -> Result<Option<String>, StorageError> {
209 let connection = self.get_connection()?;
210
211 let mut stmt = connection.prepare("SELECT value FROM settings WHERE key = ?")?;
212
213 let result = stmt.query_row(params![key], |row| {
214 let value_str: String = row.get(0)?;
215 Ok(value_str)
216 });
217
218 match result {
219 Ok(value) => Ok(Some(value)),
220 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
221 Err(e) => Err(e.into()),
222 }
223 }
224
225 async fn get_payment_by_id(&self, id: String) -> Result<Payment, StorageError> {
226 let connection = self.get_connection()?;
227
228 let mut stmt = connection.prepare(
229 "SELECT id, payment_type, status, amount, fees, timestamp, details, method, pm.lnurl_pay_info FROM payments LEFT JOIN payment_metadata pm ON payments.id = pm.payment_id WHERE payments.id = ?",
230 )?;
231
232 let result = stmt.query_row(params![id], |row| {
233 let mut details: Option<PaymentDetails> = row.get(6)?;
234 if let Some(PaymentDetails::Lightning { lnurl_pay_info, .. }) = &mut details {
235 *lnurl_pay_info = row.get(8)?;
236 }
237
238 Ok(Payment {
239 id: row.get(0)?,
240 payment_type: PaymentType::from(row.get::<_, String>(1)?.as_str()),
241 status: PaymentStatus::from(row.get::<_, String>(2)?.as_str()),
242 amount: row.get(3)?,
243 fees: row.get(4)?,
244 timestamp: row.get(5)?,
245 details,
246 method: row.get(7)?,
247 })
248 });
249 result.map_err(StorageError::from)
250 }
251
252 async fn add_deposit(
253 &self,
254 txid: String,
255 vout: u32,
256 amount_sats: u64,
257 ) -> Result<(), StorageError> {
258 let connection = self.get_connection()?;
259 connection.execute(
260 "INSERT OR IGNORE INTO unclaimed_deposits (txid, vout, amount_sats)
261 VALUES (?, ?, ?)",
262 params![txid, vout, amount_sats,],
263 )?;
264 Ok(())
265 }
266
267 async fn delete_deposit(&self, txid: String, vout: u32) -> Result<(), StorageError> {
268 let connection = self.get_connection()?;
269 connection.execute(
270 "DELETE FROM unclaimed_deposits WHERE txid = ? AND vout = ?",
271 params![txid, vout],
272 )?;
273 Ok(())
274 }
275
276 async fn list_deposits(&self) -> Result<Vec<DepositInfo>, StorageError> {
277 let connection = self.get_connection()?;
278 let mut stmt =
279 connection.prepare("SELECT txid, vout, amount_sats, claim_error, refund_tx, refund_tx_id FROM unclaimed_deposits")?;
280 let rows = stmt.query_map(params![], |row| {
281 Ok(DepositInfo {
282 txid: row.get(0)?,
283 vout: row.get(1)?,
284 amount_sats: row.get(2)?,
285 claim_error: row.get(3)?,
286 refund_tx: row.get(4)?,
287 refund_tx_id: row.get(5)?,
288 })
289 })?;
290 let mut deposits = Vec::new();
291 for row in rows {
292 deposits.push(row?);
293 }
294 Ok(deposits)
295 }
296
297 async fn update_deposit(
298 &self,
299 txid: String,
300 vout: u32,
301 payload: UpdateDepositPayload,
302 ) -> Result<(), StorageError> {
303 let connection = self.get_connection()?;
304 match payload {
305 UpdateDepositPayload::ClaimError { error } => {
306 connection.execute(
307 "UPDATE unclaimed_deposits SET claim_error = ? WHERE txid = ? AND vout = ?",
308 params![error, txid, vout],
309 )?;
310 }
311 UpdateDepositPayload::Refund {
312 refund_txid,
313 refund_tx,
314 } => {
315 connection.execute(
316 "UPDATE unclaimed_deposits SET refund_tx = ?, refund_tx_id = ? WHERE txid = ? AND vout = ?",
317 params![refund_tx, refund_txid, txid, vout],
318 )?;
319 }
320 }
321 Ok(())
322 }
323}
324
325impl ToSql for PaymentDetails {
326 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
327 let json = serde_json::to_string(self)
328 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
329 Ok(rusqlite::types::ToSqlOutput::from(json))
330 }
331}
332
333impl FromSql for PaymentDetails {
334 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
335 match value {
336 ValueRef::Text(i) => {
337 let s = std::str::from_utf8(i).map_err(FromSqlError::other)?;
338 let payment_details: PaymentDetails =
339 serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
340 Ok(payment_details)
341 }
342 _ => Err(FromSqlError::InvalidType),
343 }
344 }
345}
346
347impl ToSql for PaymentMethod {
348 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
349 let json = serde_json::to_string(self)
350 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
351 Ok(rusqlite::types::ToSqlOutput::from(json))
352 }
353}
354
355impl FromSql for PaymentMethod {
356 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
357 match value {
358 ValueRef::Text(i) => {
359 let s = std::str::from_utf8(i).map_err(FromSqlError::other)?;
360 let payment_method: PaymentMethod =
361 serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
362 Ok(payment_method)
363 }
364 _ => Err(FromSqlError::InvalidType),
365 }
366 }
367}
368
369impl ToSql for DepositClaimError {
370 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
371 let json = serde_json::to_string(self)
372 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
373 Ok(rusqlite::types::ToSqlOutput::from(json))
374 }
375}
376
377impl ToSql for LnurlPayInfo {
378 fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
379 let json = serde_json::to_string(self)
380 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
381 Ok(rusqlite::types::ToSqlOutput::from(json))
382 }
383}
384
385impl FromSql for DepositClaimError {
386 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
387 match value {
388 ValueRef::Text(i) => {
389 let s = std::str::from_utf8(i).map_err(FromSqlError::other)?;
390 let deposit_claim_error: DepositClaimError =
391 serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
392 Ok(deposit_claim_error)
393 }
394 _ => Err(FromSqlError::InvalidType),
395 }
396 }
397}
398
399impl FromSql for LnurlPayInfo {
400 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
401 match value {
402 ValueRef::Text(i) => {
403 let s = std::str::from_utf8(i).map_err(FromSqlError::other)?;
404 let lnurl_pay_info: LnurlPayInfo =
405 serde_json::from_str(s).map_err(|_| FromSqlError::InvalidType)?;
406 Ok(lnurl_pay_info)
407 }
408 _ => Err(FromSqlError::InvalidType),
409 }
410 }
411}
412
#[cfg(test)]
mod tests {
    use crate::SqliteStorage;

    /// Creates a storage inside a fresh temporary directory named after
    /// `prefix`.
    ///
    /// The directory guard is returned alongside the storage so the caller
    /// keeps the directory alive for the duration of the test.
    fn storage_in_temp_dir(prefix: &str) -> (tempdir::TempDir, SqliteStorage) {
        let dir = tempdir::TempDir::new(prefix).unwrap();
        let storage = SqliteStorage::new(dir.path()).unwrap();
        (dir, storage)
    }

    // Each test runs one of the shared storage test suites against the
    // SQLite implementation.

    #[tokio::test]
    async fn test_sqlite_storage() {
        let (_dir, storage) = storage_in_temp_dir("sqlite_storage");

        crate::persist::tests::test_sqlite_storage(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_unclaimed_deposits_crud() {
        let (_dir, storage) = storage_in_temp_dir("sqlite_storage_deposits");

        crate::persist::tests::test_unclaimed_deposits_crud(Box::new(storage)).await;
    }

    #[tokio::test]
    async fn test_deposit_refunds() {
        let (_dir, storage) = storage_in_temp_dir("sqlite_storage_refund_tx");

        crate::persist::tests::test_deposit_refunds(Box::new(storage)).await;
    }
}