breez_sdk_liquid/swapper/boltz/status_stream.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
use std::collections::HashSet;
use std::time::Duration;
use crate::swapper::{
boltz::BoltzSwapper, ProxyUrlFetcher, SubscriptionHandler, SwapperStatusStream,
};
use anyhow::Result;
use boltz_client::boltz::{
self,
tokio_tungstenite_wasm::{Message, WebSocketStream},
WsRequest, WsResponse,
};
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use log::{debug, error, info, warn};
use sdk_common::utils::Arc;
use tokio::sync::{broadcast, watch};
use tokio_with_wasm::alias as tokio;
impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
async fn send_subscription(
&self,
swap_id: String,
sender: &mut SplitSink<WebSocketStream, Message>,
) {
info!("Subscribing to status updates for swap ID {swap_id}");
let subscription = WsRequest::subscribe_swap_request(&swap_id);
match serde_json::to_string(&subscription) {
Ok(subscribe_json) => match sender.send(Message::Text(subscribe_json.into())).await {
Ok(_) => info!("Subscribed"),
Err(e) => error!("Failed to subscribe to {swap_id}: {e:?}"),
},
Err(e) => error!("Invalid subscription msg: {e:?}"),
}
}
}
impl<P: ProxyUrlFetcher> SwapperStatusStream for BoltzSwapper<P> {
fn start(
self: Arc<Self>,
callback: Box<dyn SubscriptionHandler>,
mut shutdown: watch::Receiver<()>,
) {
let keep_alive_ping_interval = Duration::from_secs(15);
let reconnect_delay = Duration::from_secs(2);
let swapper = Arc::clone(&self);
tokio::spawn(async move {
loop {
debug!("Start of ws stream loop");
let client = match swapper.get_boltz_client().await {
Ok(client) => client,
Err(e) => {
warn!("Failed to get swapper client: {e:?}");
tokio::time::sleep(reconnect_delay).await;
continue;
}
};
match client.inner.connect_ws().await {
Ok(ws_stream) => {
let (mut sender, mut receiver) = ws_stream.split();
let mut tracked_swap_ids: HashSet<String> = HashSet::new();
let mut subscription_stream = self.subscription_notifier.subscribe();
callback.subscribe_swaps().await;
let mut interval = tokio::time::interval(keep_alive_ping_interval);
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = shutdown.changed() => {
info!("Received shutdown signal, exiting Status Stream loop");
return;
},
_ = interval.tick() => {
match serde_json::to_string(&WsRequest::Ping) {
Ok(ping_msg) => {
match sender.send(Message::Text(ping_msg.into())).await {
Ok(_) => debug!("Sent keep-alive ping"),
Err(e) => warn!("Failed to send keep-alive ping: {e:?}"),
}
},
Err(e) => error!("Failed to serialize ping message: {e:?}"),
}
},
swap_res = subscription_stream.recv() => match swap_res {
Ok(swap_id) => {
if !tracked_swap_ids.contains(&swap_id) {
self.send_subscription(swap_id.clone(), &mut sender).await;
tracked_swap_ids.insert(swap_id.clone());
}
},
Err(e) => error!("Received error on subscription stream: {e:?}"),
},
maybe_next = receiver.next() => match maybe_next {
Some(msg) => match msg {
Ok(Message::Close(_)) => {
warn!("Received close msg, exiting socket loop");
tokio::time::sleep(reconnect_delay).await;
break;
},
Ok(Message::Text(payload)) => {
let payload = payload.as_str();
info!("Received text msg: {payload:?}");
match serde_json::from_str::<WsResponse>(payload) {
// Subscribing/unsubscribing confirmation
Ok(WsResponse::Subscribe { .. }) | Ok(WsResponse::Unsubscribe { .. }) => {}
// Status update(s)
Ok(WsResponse::Update(update)) => {
for update in update.args {
let _ = self.update_notifier.send(update);
}
}
// A response to one of our pings
Ok(WsResponse::Pong) => debug!("Received pong"),
// Either an invalid response, or an error related to subscription
Err(e) => error!("Failed to parse websocket response: {e:?} - response: {payload}"),
}
},
Ok(msg) => warn!("Unhandled msg: {msg:?}"),
Err(e) => {
error!("Received stream error: {e:?}");
let _ = sender.close().await;
break;
}
},
None => {
warn!("Received nothing from the stream");
let _ = sender.close().await;
tokio::time::sleep(reconnect_delay).await;
break;
},
}
}
}
}
Err(e) => {
warn!("Error connecting to stream: {e:?}");
tokio::time::sleep(reconnect_delay).await;
}
}
}
});
}
fn track_swap_id(&self, swap_id: &str) -> Result<()> {
let _ = self.subscription_notifier.send(swap_id.to_string());
Ok(())
}
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltz::SwapStatus> {
self.update_notifier.subscribe()
}
}