breez_sdk_liquid/sync/
client.rs1use anyhow::{anyhow, Error, Result};
2
3use log::debug;
4use sdk_common::grpc::transport::{GrpcClient, Transport};
5use tokio::sync::Mutex;
6use tonic::{
7 metadata::{errors::InvalidMetadataValue, Ascii, MetadataValue},
8 service::{interceptor::InterceptedService, Interceptor},
9 Request, Status, Streaming,
10};
11
12use super::model::{
13 syncer_client::SyncerClient as ProtoSyncerClient, ListChangesReply, ListChangesRequest,
14 ListenChangesRequest, Notification, SetRecordReply, SetRecordRequest,
15};
16
17#[sdk_macros::async_trait]
18pub(crate) trait SyncerClient: Send + Sync {
19 async fn connect(&self, connect_url: String) -> Result<()>;
20 async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply>;
21 async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply>;
22 async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>>;
23 async fn disconnect(&self) -> Result<()>;
24}
25
26pub(crate) struct BreezSyncerClient {
27 grpc_channel: Mutex<Option<Transport>>,
28 api_key: Option<String>,
29}
30
31impl BreezSyncerClient {
32 pub(crate) fn new(api_key: Option<String>) -> Self {
33 Self {
34 grpc_channel: Mutex::new(None),
35 api_key,
36 }
37 }
38
39 fn api_key_metadata(&self) -> Result<Option<MetadataValue<Ascii>>, Error> {
40 match &self.api_key {
41 Some(key) => Ok(Some(format!("Bearer {key}").parse().map_err(
42 |e: InvalidMetadataValue| {
43 anyhow!(format!(
44 "(Breez: {:?}) Failed parse API key: {e}",
45 self.api_key
46 ))
47 },
48 )?)),
49 _ => Ok(None),
50 }
51 }
52}
53
54impl BreezSyncerClient {
55 async fn get_client(
56 &self,
57 ) -> Result<ProtoSyncerClient<InterceptedService<Transport, ApiKeyInterceptor>>, Error> {
58 let Some(channel) = self.grpc_channel.lock().await.clone() else {
59 return Err(anyhow!("Cannot get sync client: not connected"));
60 };
61 let api_key_metadata = self.api_key_metadata()?;
62 Ok(ProtoSyncerClient::with_interceptor(
63 channel,
64 ApiKeyInterceptor { api_key_metadata },
65 ))
66 }
67}
68
69#[sdk_macros::async_trait]
70impl SyncerClient for BreezSyncerClient {
71 async fn connect(&self, connect_url: String) -> Result<()> {
72 let mut grpc_channel = self.grpc_channel.lock().await;
73 *grpc_channel = Some(GrpcClient::new(connect_url.clone())?.into_inner());
74 debug!("Successfully connected to {connect_url}");
75 Ok(())
76 }
77
78 async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply> {
79 Ok(self.get_client().await?.set_record(req).await?.into_inner())
80 }
81
82 async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply> {
83 Ok(self
84 .get_client()
85 .await?
86 .list_changes(req)
87 .await?
88 .into_inner())
89 }
90
91 async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>> {
92 Ok(self
93 .get_client()
94 .await?
95 .listen_changes(req)
96 .await?
97 .into_inner())
98 }
99
100 async fn disconnect(&self) -> Result<()> {
101 let mut channel = self.grpc_channel.lock().await;
102 *channel = None;
103 Ok(())
104 }
105}
106
107#[derive(Clone)]
108pub struct ApiKeyInterceptor {
109 api_key_metadata: Option<MetadataValue<Ascii>>,
110}
111
112impl Interceptor for ApiKeyInterceptor {
113 fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
114 if let Some(api_key_metadata) = &self.api_key_metadata {
115 req.metadata_mut()
116 .insert("authorization", api_key_metadata.clone());
117 }
118 Ok(req)
119 }
120}