breez_sdk_liquid/sync/
client.rs1use anyhow::{anyhow, Error, Result};
2
3use log::debug;
4use maybe_sync::{MaybeSend, MaybeSync};
5use sdk_common::grpc::transport::{GrpcClient, Transport};
6use tokio::sync::Mutex;
7use tonic::{
8 metadata::{errors::InvalidMetadataValue, Ascii, MetadataValue},
9 service::{interceptor::InterceptedService, Interceptor},
10 Request, Status, Streaming,
11};
12
13use super::model::{
14 syncer_client::SyncerClient as ProtoSyncerClient, ListChangesReply, ListChangesRequest,
15 ListenChangesRequest, Notification, SetRecordReply, SetRecordRequest,
16};
17
18#[sdk_macros::async_trait]
19pub(crate) trait SyncerClient: MaybeSend + MaybeSync {
20 async fn connect(&self, connect_url: String) -> Result<()>;
21 async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply>;
22 async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply>;
23 async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>>;
24 async fn disconnect(&self) -> Result<()>;
25}
26
27pub(crate) struct BreezSyncerClient {
28 grpc_channel: Mutex<Option<Transport>>,
29 api_key: Option<String>,
30}
31
32impl BreezSyncerClient {
33 pub(crate) fn new(api_key: Option<String>) -> Self {
34 Self {
35 grpc_channel: Mutex::new(None),
36 api_key,
37 }
38 }
39
40 fn api_key_metadata(&self) -> Result<Option<MetadataValue<Ascii>>, Error> {
41 match &self.api_key {
42 Some(key) => Ok(Some(format!("Bearer {key}").parse().map_err(
43 |e: InvalidMetadataValue| {
44 anyhow!(format!(
45 "(Breez: {:?}) Failed parse API key: {e}",
46 self.api_key
47 ))
48 },
49 )?)),
50 _ => Ok(None),
51 }
52 }
53}
54
55impl BreezSyncerClient {
56 async fn get_client(
57 &self,
58 ) -> Result<ProtoSyncerClient<InterceptedService<Transport, ApiKeyInterceptor>>, Error> {
59 let Some(channel) = self.grpc_channel.lock().await.clone() else {
60 return Err(anyhow!("Cannot get sync client: not connected"));
61 };
62 let api_key_metadata = self.api_key_metadata()?;
63 Ok(ProtoSyncerClient::with_interceptor(
64 channel,
65 ApiKeyInterceptor { api_key_metadata },
66 ))
67 }
68}
69
70#[sdk_macros::async_trait]
71impl SyncerClient for BreezSyncerClient {
72 async fn connect(&self, connect_url: String) -> Result<()> {
73 let mut grpc_channel = self.grpc_channel.lock().await;
74 *grpc_channel = Some(GrpcClient::new(connect_url.clone())?.into_inner());
75 debug!("Successfully connected to {connect_url}");
76 Ok(())
77 }
78
79 async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply> {
80 Ok(self.get_client().await?.set_record(req).await?.into_inner())
81 }
82
83 async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply> {
84 Ok(self
85 .get_client()
86 .await?
87 .list_changes(req)
88 .await?
89 .into_inner())
90 }
91
92 async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>> {
93 Ok(self
94 .get_client()
95 .await?
96 .listen_changes(req)
97 .await?
98 .into_inner())
99 }
100
101 async fn disconnect(&self) -> Result<()> {
102 let mut channel = self.grpc_channel.lock().await;
103 *channel = None;
104 Ok(())
105 }
106}
107
108#[derive(Clone)]
109pub struct ApiKeyInterceptor {
110 api_key_metadata: Option<MetadataValue<Ascii>>,
111}
112
113impl Interceptor for ApiKeyInterceptor {
114 fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
115 if let Some(api_key_metadata) = &self.api_key_metadata {
116 req.metadata_mut()
117 .insert("authorization", api_key_metadata.clone());
118 }
119 Ok(req)
120 }
121}