breez_sdk_liquid/sync/
client.rs

1use anyhow::{anyhow, Error, Result};
2
3use log::debug;
4use sdk_common::grpc::transport::{GrpcClient, Transport};
5use tokio::sync::Mutex;
6use tonic::{
7    metadata::{errors::InvalidMetadataValue, Ascii, MetadataValue},
8    service::{interceptor::InterceptedService, Interceptor},
9    Request, Status, Streaming,
10};
11
12use super::model::{
13    syncer_client::SyncerClient as ProtoSyncerClient, ListChangesReply, ListChangesRequest,
14    ListenChangesRequest, Notification, SetRecordReply, SetRecordRequest,
15};
16
17#[sdk_macros::async_trait]
18pub(crate) trait SyncerClient: Send + Sync {
19    async fn connect(&self, connect_url: String) -> Result<()>;
20    async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply>;
21    async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply>;
22    async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>>;
23    async fn disconnect(&self) -> Result<()>;
24}
25
26pub(crate) struct BreezSyncerClient {
27    grpc_channel: Mutex<Option<Transport>>,
28    api_key: Option<String>,
29}
30
31impl BreezSyncerClient {
32    pub(crate) fn new(api_key: Option<String>) -> Self {
33        Self {
34            grpc_channel: Mutex::new(None),
35            api_key,
36        }
37    }
38
39    fn api_key_metadata(&self) -> Result<Option<MetadataValue<Ascii>>, Error> {
40        match &self.api_key {
41            Some(key) => Ok(Some(format!("Bearer {key}").parse().map_err(
42                |e: InvalidMetadataValue| {
43                    anyhow!(format!(
44                        "(Breez: {:?}) Failed parse API key: {e}",
45                        self.api_key
46                    ))
47                },
48            )?)),
49            _ => Ok(None),
50        }
51    }
52}
53
54impl BreezSyncerClient {
55    async fn get_client(
56        &self,
57    ) -> Result<ProtoSyncerClient<InterceptedService<Transport, ApiKeyInterceptor>>, Error> {
58        let Some(channel) = self.grpc_channel.lock().await.clone() else {
59            return Err(anyhow!("Cannot get sync client: not connected"));
60        };
61        let api_key_metadata = self.api_key_metadata()?;
62        Ok(ProtoSyncerClient::with_interceptor(
63            channel,
64            ApiKeyInterceptor { api_key_metadata },
65        ))
66    }
67}
68
69#[sdk_macros::async_trait]
70impl SyncerClient for BreezSyncerClient {
71    async fn connect(&self, connect_url: String) -> Result<()> {
72        let mut grpc_channel = self.grpc_channel.lock().await;
73        *grpc_channel = Some(GrpcClient::new(connect_url.clone())?.into_inner());
74        debug!("Successfully connected to {connect_url}");
75        Ok(())
76    }
77
78    async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply> {
79        Ok(self.get_client().await?.set_record(req).await?.into_inner())
80    }
81
82    async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply> {
83        Ok(self
84            .get_client()
85            .await?
86            .list_changes(req)
87            .await?
88            .into_inner())
89    }
90
91    async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>> {
92        Ok(self
93            .get_client()
94            .await?
95            .listen_changes(req)
96            .await?
97            .into_inner())
98    }
99
100    async fn disconnect(&self) -> Result<()> {
101        let mut channel = self.grpc_channel.lock().await;
102        *channel = None;
103        Ok(())
104    }
105}
106
107#[derive(Clone)]
108pub struct ApiKeyInterceptor {
109    api_key_metadata: Option<MetadataValue<Ascii>>,
110}
111
112impl Interceptor for ApiKeyInterceptor {
113    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
114        if let Some(api_key_metadata) = &self.api_key_metadata {
115            req.metadata_mut()
116                .insert("authorization", api_key_metadata.clone());
117        }
118        Ok(req)
119    }
120}