breez_sdk_core/persist/
channels.rs

1use crate::models::*;
2use std::collections::HashMap;
3
4use super::{db::SqliteStorage, error::PersistResult};
5use std::str::FromStr;
6
7impl SqliteStorage {
8    /// Expects a full list of (non-closed) channels.
9    ///
10    /// Any known channel that is missing from the list, will be marked as closed. When doing so, the
11    /// closing-related fields `closed_at` and `closing_txid` are not set, because doing so would require
12    /// a chain service lookup. Instead, they will be set on first lookup in
13    /// [BreezServices::closed_channel_to_transaction]
14    pub(crate) fn update_channels(&self, fetched_channels: &[Channel]) -> PersistResult<()> {
15        // create a hash map of the channels before the update
16        let channels_before_update = self
17            .list_channels()?
18            .into_iter()
19            .map(|c| (c.funding_txid.clone(), c))
20            .collect::<HashMap<_, _>>();
21
22        // merge the closed_at and closed_txid from the persisted channels into the fetched channels
23        let new_channels: Vec<Channel> = fetched_channels
24            .iter()
25            .map(|c| {
26                let persisted_channel = channels_before_update.get(&c.funding_txid);
27                let mut cloned_channel = c.clone();
28                if let Some(unwrapped_channel) = persisted_channel {
29                    cloned_channel.closed_at = unwrapped_channel.closed_at;
30                    cloned_channel
31                        .closing_txid
32                        .clone_from(&unwrapped_channel.closing_txid);
33                }
34                cloned_channel
35            })
36            .collect();
37
38        // insert all channels
39        for c in new_channels.iter().cloned() {
40            self.insert_or_update_channel(c)?
41        }
42
43        let funding_txs: Vec<String> = new_channels
44            .iter()
45            .cloned()
46            .map(|c| format!("'{}'", c.funding_txid))
47            .collect();
48
49        // Close channels not in the list provided
50        self.get_connection()?.execute(
51            format!(
52                "
53                 UPDATE channels 
54                 SET 
55                  state=?1
56                 where funding_txid not in ({})
57                ",
58                funding_txs.join(",")
59            )
60            .as_str(),
61            (ChannelState::Closed.to_string(),),
62        )?;
63
64        Ok(())
65    }
66
67    pub(crate) fn list_channels(&self) -> PersistResult<Vec<Channel>> {
68        let con = self.get_connection()?;
69        let mut stmt = con.prepare(
70            "
71               SELECT
72                funding_txid, 
73                short_channel_id,
74                state, 
75                spendable_msat,
76                local_balance_msat,
77                receivable_msat,
78                closed_at,
79                funding_outnum,
80                alias_local,
81                alias_remote,
82                closing_txid
83               FROM channels             
84             ",
85        )?;
86        let channels: Vec<Channel> = stmt
87            .query_map([], |row| {
88                let state_str: String = row.get(2)?;
89                Ok(Channel {
90                    funding_txid: row.get(0)?,
91                    short_channel_id: row.get(1)?,
92                    state: ChannelState::from_str(state_str.as_str())
93                        .unwrap_or(ChannelState::Closed),
94                    spendable_msat: row.get(3)?,
95                    local_balance_msat: row.get(4)?,
96                    receivable_msat: row.get(5)?,
97                    closed_at: row.get(6)?,
98                    funding_outnum: row.get(7)?,
99                    alias_local: row.get(8)?,
100                    alias_remote: row.get(9)?,
101                    closing_txid: row.get(10)?,
102                    htlcs: Vec::new(),
103                })
104            })?
105            .map(|i| i.unwrap())
106            .collect();
107
108        Ok(channels)
109    }
110
111    pub(crate) fn insert_or_update_channel(&self, c: Channel) -> PersistResult<()> {
112        self.get_connection()?.execute(
113            "INSERT OR REPLACE INTO channels (
114                   funding_txid, 
115                   short_channel_id,
116                   state,
117                   spendable_msat,
118                   local_balance_msat,
119                   receivable_msat,
120                   closed_at,
121                   funding_outnum,                   
122                   alias_local,
123                   alias_remote,
124                   closing_txid
125                  )
126                  VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10, ?11)
127               ",
128            (
129                c.funding_txid,
130                c.short_channel_id,
131                c.state.to_string(),
132                c.spendable_msat,
133                c.local_balance_msat,
134                c.receivable_msat,
135                match c.state {
136                    ChannelState::Opened | ChannelState::PendingOpen => None,
137                    _ => c.closed_at,
138                },
139                c.funding_outnum,
140                c.alias_local,
141                c.alias_remote,
142                c.closing_txid,
143            ),
144        )?;
145        Ok(())
146    }
147}
148
149#[test]
150fn test_simple_sync_channels() {
151    use crate::persist::test_utils;
152
153    let storage = SqliteStorage::new(test_utils::create_test_sql_dir());
154
155    storage.init().unwrap();
156    let channels = vec![
157        Channel {
158            funding_txid: "123".to_string(),
159            short_channel_id: Some("10x11x12".to_string()),
160            state: ChannelState::Opened,
161            spendable_msat: 100,
162            local_balance_msat: 100,
163            receivable_msat: 1000,
164            closed_at: None,
165            funding_outnum: None,
166            alias_local: None,
167            alias_remote: None,
168            closing_txid: None,
169            htlcs: Vec::new(),
170        },
171        Channel {
172            funding_txid: "456".to_string(),
173            short_channel_id: Some("13x14x15".to_string()),
174            state: ChannelState::Opened,
175            spendable_msat: 200,
176            local_balance_msat: 200,
177            receivable_msat: 2000,
178            closed_at: None,
179            funding_outnum: None,
180            alias_local: None,
181            alias_remote: None,
182            closing_txid: None,
183            htlcs: Vec::new(),
184        },
185    ];
186
187    storage.update_channels(&channels).unwrap();
188    let queried_channels = storage.list_channels().unwrap();
189    assert_eq!(channels, queried_channels);
190
191    storage.update_channels(&channels).unwrap();
192    let queried_channels = storage.list_channels().unwrap();
193    assert_eq!(channels, queried_channels);
194}
195
196#[test]
197fn test_sync_none_short_channel_id() {
198    use crate::persist::test_utils;
199
200    let storage = SqliteStorage::new(test_utils::create_test_sql_dir());
201
202    storage.init().unwrap();
203    let channels = vec![
204        Channel {
205            funding_txid: "123".to_string(),
206            short_channel_id: None,
207            state: ChannelState::Opened,
208            spendable_msat: 100,
209            local_balance_msat: 100,
210            receivable_msat: 1000,
211            closed_at: None,
212            funding_outnum: None,
213            alias_local: None,
214            alias_remote: None,
215            closing_txid: None,
216            htlcs: Vec::new(),
217        },
218        Channel {
219            funding_txid: "456".to_string(),
220            short_channel_id: None,
221            state: ChannelState::Closed,
222            spendable_msat: 200,
223            local_balance_msat: 200,
224            receivable_msat: 2000,
225            closed_at: None,
226            funding_outnum: None,
227            alias_local: None,
228            alias_remote: None,
229            closing_txid: None,
230            htlcs: Vec::new(),
231        },
232    ];
233
234    storage.update_channels(&channels).unwrap();
235    let queried_channels = storage.list_channels().unwrap();
236    assert_eq!(channels, queried_channels);
237    assert!(queried_channels[0].short_channel_id.is_none());
238    assert!(queried_channels[1].short_channel_id.is_none());
239
240    // The short_channel_id is set once the funding tx has been confirmed.
241    // Make sure the short_channel_id is set after this update.
242    let mut channels = channels.clone();
243    channels[0].short_channel_id = Some("10x11x12".to_string());
244    channels[1].short_channel_id = Some("13x14x15".to_string());
245
246    storage.update_channels(&channels).unwrap();
247    let queried_channels = storage.list_channels().unwrap();
248    assert_eq!(channels, queried_channels);
249    assert!(queried_channels[0].short_channel_id.is_some());
250    assert!(queried_channels[1].short_channel_id.is_some());
251}
252
253#[test]
254fn test_sync_closed_channels() {
255    use crate::persist::test_utils;
256
257    let storage = SqliteStorage::new(test_utils::create_test_sql_dir());
258
259    storage.init().unwrap();
260    let channels = vec![
261        Channel {
262            funding_txid: "123".to_string(),
263            short_channel_id: Some("10x11x12".to_string()),
264            state: ChannelState::Opened,
265            spendable_msat: 100,
266            local_balance_msat: 100,
267            receivable_msat: 1000,
268            closed_at: None,
269            funding_outnum: None,
270            alias_local: None,
271            alias_remote: None,
272            closing_txid: None,
273            htlcs: Vec::new(),
274        },
275        // Simulate closed channel that was persisted with closed_at and closing_txid
276        Channel {
277            funding_txid: "456".to_string(),
278            short_channel_id: Some("13x14x15".to_string()),
279            state: ChannelState::Closed,
280            spendable_msat: 200,
281            local_balance_msat: 200,
282            receivable_msat: 2000,
283            closed_at: Some(1),
284            funding_outnum: None,
285            alias_local: None,
286            alias_remote: None,
287            closing_txid: Some("a".into()),
288            htlcs: Vec::new(),
289        },
290    ];
291
292    storage.update_channels(&channels).unwrap();
293    let queried_channels = storage.list_channels().unwrap();
294    assert_eq!(2, queried_channels.len());
295    assert_eq!(channels[0], queried_channels[0]);
296    assert!(queried_channels[1].closed_at.is_some());
297    assert!(queried_channels[1].closing_txid.is_some());
298
299    storage.update_channels(&channels).unwrap();
300    let queried_channels = storage.list_channels().unwrap();
301    assert_eq!(channels[0], queried_channels[0]);
302
303    // test all channels were closed
304    storage.update_channels(&Vec::new()).unwrap();
305    let queried_channels = storage.list_channels().unwrap();
306    let expected = vec![
307        Channel {
308            funding_txid: "123".to_string(),
309            short_channel_id: Some("10x11x12".to_string()),
310            state: ChannelState::Closed,
311            spendable_msat: 100,
312            local_balance_msat: 100,
313            receivable_msat: 1000,
314            closed_at: None,
315            funding_outnum: None,
316            alias_local: None,
317            alias_remote: None,
318            closing_txid: None,
319            htlcs: Vec::new(),
320        },
321        Channel {
322            funding_txid: "456".to_string(),
323            short_channel_id: Some("13x14x15".to_string()),
324            state: ChannelState::Closed,
325            spendable_msat: 200,
326            local_balance_msat: 200,
327            receivable_msat: 2000,
328            closed_at: None,
329            funding_outnum: None,
330            alias_local: None,
331            alias_remote: None,
332            closing_txid: None,
333            htlcs: Vec::new(),
334        },
335    ];
336    assert_eq!(expected.len(), queried_channels.len());
337    // For channel inserted WITHOUT closed_at and closing_txid,
338    // the closing-related fields are empty on channel queried directly from DB
339    assert!(queried_channels[0].closed_at.is_none());
340    assert!(queried_channels[0].closing_txid.is_none());
341    // For channel inserted WITH closed_at and closing_txid (as for example after a chain service lookup),
342    // the closing-related fields are not empty on channel queried directly from DB
343    assert!(queried_channels[1].closed_at.is_some());
344    assert!(queried_channels[1].closing_txid.is_some());
345
346    // test dedup channels in db
347    storage.update_channels(&channels).unwrap();
348    let queried_channels = storage.list_channels().unwrap();
349    assert_eq!(channels.len(), queried_channels.len());
350}