1use crate::models::*;
2use std::collections::HashMap;
3
4use super::{db::SqliteStorage, error::PersistResult};
5use std::str::FromStr;
6
7impl SqliteStorage {
8 pub(crate) fn update_channels(&self, fetched_channels: &[Channel]) -> PersistResult<()> {
15 let channels_before_update = self
17 .list_channels()?
18 .into_iter()
19 .map(|c| (c.funding_txid.clone(), c))
20 .collect::<HashMap<_, _>>();
21
22 let new_channels: Vec<Channel> = fetched_channels
24 .iter()
25 .map(|c| {
26 let persisted_channel = channels_before_update.get(&c.funding_txid);
27 let mut cloned_channel = c.clone();
28 if let Some(unwrapped_channel) = persisted_channel {
29 cloned_channel.closed_at = unwrapped_channel.closed_at;
30 cloned_channel
31 .closing_txid
32 .clone_from(&unwrapped_channel.closing_txid);
33 }
34 cloned_channel
35 })
36 .collect();
37
38 for c in new_channels.iter().cloned() {
40 self.insert_or_update_channel(c)?
41 }
42
43 let funding_txs: Vec<String> = new_channels
44 .iter()
45 .cloned()
46 .map(|c| format!("'{}'", c.funding_txid))
47 .collect();
48
49 self.get_connection()?.execute(
51 format!(
52 "
53 UPDATE channels
54 SET
55 state=?1
56 where funding_txid not in ({})
57 ",
58 funding_txs.join(",")
59 )
60 .as_str(),
61 (ChannelState::Closed.to_string(),),
62 )?;
63
64 Ok(())
65 }
66
67 pub(crate) fn list_channels(&self) -> PersistResult<Vec<Channel>> {
68 let con = self.get_connection()?;
69 let mut stmt = con.prepare(
70 "
71 SELECT
72 funding_txid,
73 short_channel_id,
74 state,
75 spendable_msat,
76 local_balance_msat,
77 receivable_msat,
78 closed_at,
79 funding_outnum,
80 alias_local,
81 alias_remote,
82 closing_txid
83 FROM channels
84 ",
85 )?;
86 let channels: Vec<Channel> = stmt
87 .query_map([], |row| {
88 let state_str: String = row.get(2)?;
89 Ok(Channel {
90 funding_txid: row.get(0)?,
91 short_channel_id: row.get(1)?,
92 state: ChannelState::from_str(state_str.as_str())
93 .unwrap_or(ChannelState::Closed),
94 spendable_msat: row.get(3)?,
95 local_balance_msat: row.get(4)?,
96 receivable_msat: row.get(5)?,
97 closed_at: row.get(6)?,
98 funding_outnum: row.get(7)?,
99 alias_local: row.get(8)?,
100 alias_remote: row.get(9)?,
101 closing_txid: row.get(10)?,
102 htlcs: Vec::new(),
103 })
104 })?
105 .map(|i| i.unwrap())
106 .collect();
107
108 Ok(channels)
109 }
110
111 pub(crate) fn insert_or_update_channel(&self, c: Channel) -> PersistResult<()> {
112 self.get_connection()?.execute(
113 "INSERT OR REPLACE INTO channels (
114 funding_txid,
115 short_channel_id,
116 state,
117 spendable_msat,
118 local_balance_msat,
119 receivable_msat,
120 closed_at,
121 funding_outnum,
122 alias_local,
123 alias_remote,
124 closing_txid
125 )
126 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10, ?11)
127 ",
128 (
129 c.funding_txid,
130 c.short_channel_id,
131 c.state.to_string(),
132 c.spendable_msat,
133 c.local_balance_msat,
134 c.receivable_msat,
135 match c.state {
136 ChannelState::Opened | ChannelState::PendingOpen => None,
137 _ => c.closed_at,
138 },
139 c.funding_outnum,
140 c.alias_local,
141 c.alias_remote,
142 c.closing_txid,
143 ),
144 )?;
145 Ok(())
146 }
147}
148
/// Syncing two open channels stores them unchanged, and syncing the same set a second
/// time leaves the stored channels identical.
#[test]
fn test_simple_sync_channels() {
    use crate::persist::test_utils;

    let storage = SqliteStorage::new(test_utils::create_test_sql_dir());
    storage.init().unwrap();

    let first = Channel {
        funding_txid: "123".to_string(),
        short_channel_id: Some("10x11x12".to_string()),
        state: ChannelState::Opened,
        spendable_msat: 100,
        local_balance_msat: 100,
        receivable_msat: 1000,
        closed_at: None,
        funding_outnum: None,
        alias_local: None,
        alias_remote: None,
        closing_txid: None,
        htlcs: Vec::new(),
    };
    // Same shape as the first channel; only identity and balances differ.
    let second = Channel {
        funding_txid: "456".to_string(),
        short_channel_id: Some("13x14x15".to_string()),
        spendable_msat: 200,
        local_balance_msat: 200,
        receivable_msat: 2000,
        ..first.clone()
    };
    let channels = vec![first, second];

    // Run the sync twice: the second pass exercises the update path.
    for _ in 0..2 {
        storage.update_channels(&channels).unwrap();
        let stored = storage.list_channels().unwrap();
        assert_eq!(channels, stored);
    }
}
195
/// Channels without a `short_channel_id` round-trip as `None`, and a later sync that
/// provides the id updates the stored rows.
#[test]
fn test_sync_none_short_channel_id() {
    use crate::persist::test_utils;

    let storage = SqliteStorage::new(test_utils::create_test_sql_dir());
    storage.init().unwrap();

    let opened = Channel {
        funding_txid: "123".to_string(),
        short_channel_id: None,
        state: ChannelState::Opened,
        spendable_msat: 100,
        local_balance_msat: 100,
        receivable_msat: 1000,
        closed_at: None,
        funding_outnum: None,
        alias_local: None,
        alias_remote: None,
        closing_txid: None,
        htlcs: Vec::new(),
    };
    let closed = Channel {
        funding_txid: "456".to_string(),
        state: ChannelState::Closed,
        spendable_msat: 200,
        local_balance_msat: 200,
        receivable_msat: 2000,
        ..opened.clone()
    };
    let mut channels = vec![opened, closed];

    // First sync: no short channel ids are known yet.
    storage.update_channels(&channels).unwrap();
    let stored = storage.list_channels().unwrap();
    assert_eq!(channels, stored);
    assert!(stored[0].short_channel_id.is_none());
    assert!(stored[1].short_channel_id.is_none());

    // Second sync: the same channels now carry short channel ids.
    channels[0].short_channel_id = Some("10x11x12".to_string());
    channels[1].short_channel_id = Some("13x14x15".to_string());

    storage.update_channels(&channels).unwrap();
    let stored = storage.list_channels().unwrap();
    assert_eq!(channels, stored);
    assert!(stored[0].short_channel_id.is_some());
    assert!(stored[1].short_channel_id.is_some());
}
252
/// Closing information survives repeated syncs, and channels missing from a sync are
/// kept in storage rather than removed.
#[test]
fn test_sync_closed_channels() {
    use crate::persist::test_utils;

    let storage = SqliteStorage::new(test_utils::create_test_sql_dir());
    storage.init().unwrap();

    let opened = Channel {
        funding_txid: "123".to_string(),
        short_channel_id: Some("10x11x12".to_string()),
        state: ChannelState::Opened,
        spendable_msat: 100,
        local_balance_msat: 100,
        receivable_msat: 1000,
        closed_at: None,
        funding_outnum: None,
        alias_local: None,
        alias_remote: None,
        closing_txid: None,
        htlcs: Vec::new(),
    };
    let closed = Channel {
        funding_txid: "456".to_string(),
        short_channel_id: Some("13x14x15".to_string()),
        state: ChannelState::Closed,
        spendable_msat: 200,
        local_balance_msat: 200,
        receivable_msat: 2000,
        closed_at: Some(1),
        closing_txid: Some("a".into()),
        ..opened.clone()
    };
    let channels = vec![opened, closed];

    // Initial sync: both channels are stored, the closed one keeps its closing data.
    storage.update_channels(&channels).unwrap();
    let stored = storage.list_channels().unwrap();
    assert_eq!(2, stored.len());
    assert_eq!(channels[0], stored[0]);
    assert!(stored[1].closed_at.is_some());
    assert!(stored[1].closing_txid.is_some());

    // Syncing the same set again leaves the open channel untouched.
    storage.update_channels(&channels).unwrap();
    let stored = storage.list_channels().unwrap();
    assert_eq!(channels[0], stored[0]);

    // Syncing an empty set must keep both rows in storage.
    storage.update_channels(&[]).unwrap();
    let stored = storage.list_channels().unwrap();
    // NOTE: only the length of `expected` is compared below; its field values are not.
    let expected = vec![
        Channel {
            state: ChannelState::Closed,
            ..channels[0].clone()
        },
        Channel {
            closed_at: None,
            closing_txid: None,
            ..channels[1].clone()
        },
    ];
    assert_eq!(expected.len(), stored.len());
    assert!(stored[0].closed_at.is_none());
    assert!(stored[0].closing_txid.is_none());
    assert!(stored[1].closed_at.is_some());
    assert!(stored[1].closing_txid.is_some());

    // Bringing the channels back does not create duplicate rows.
    storage.update_channels(&channels).unwrap();
    let stored = storage.list_channels().unwrap();
    assert_eq!(channels.len(), stored.len());
}