breez_sdk_liquid/sync/
client.rs

1use anyhow::{anyhow, Error, Result};
2
3use log::debug;
4use maybe_sync::{MaybeSend, MaybeSync};
5use sdk_common::grpc::transport::{GrpcClient, Transport};
6use tokio::sync::Mutex;
7use tonic::{
8    metadata::{errors::InvalidMetadataValue, Ascii, MetadataValue},
9    service::{interceptor::InterceptedService, Interceptor},
10    Request, Status, Streaming,
11};
12
13use super::model::{
14    syncer_client::SyncerClient as ProtoSyncerClient, ListChangesReply, ListChangesRequest,
15    ListenChangesRequest, Notification, SetRecordReply, SetRecordRequest,
16};
17
18#[sdk_macros::async_trait]
19pub(crate) trait SyncerClient: MaybeSend + MaybeSync {
20    async fn connect(&self, connect_url: String) -> Result<()>;
21    async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply>;
22    async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply>;
23    async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>>;
24    async fn disconnect(&self) -> Result<()>;
25}
26
27pub(crate) struct BreezSyncerClient {
28    grpc_channel: Mutex<Option<Transport>>,
29    api_key: Option<String>,
30}
31
32impl BreezSyncerClient {
33    pub(crate) fn new(api_key: Option<String>) -> Self {
34        Self {
35            grpc_channel: Mutex::new(None),
36            api_key,
37        }
38    }
39
40    fn api_key_metadata(&self) -> Result<Option<MetadataValue<Ascii>>, Error> {
41        match &self.api_key {
42            Some(key) => Ok(Some(format!("Bearer {key}").parse().map_err(
43                |e: InvalidMetadataValue| {
44                    anyhow!(format!(
45                        "(Breez: {:?}) Failed parse API key: {e}",
46                        self.api_key
47                    ))
48                },
49            )?)),
50            _ => Ok(None),
51        }
52    }
53}
54
55impl BreezSyncerClient {
56    async fn get_client(
57        &self,
58    ) -> Result<ProtoSyncerClient<InterceptedService<Transport, ApiKeyInterceptor>>, Error> {
59        let Some(channel) = self.grpc_channel.lock().await.clone() else {
60            return Err(anyhow!("Cannot get sync client: not connected"));
61        };
62        let api_key_metadata = self.api_key_metadata()?;
63        Ok(ProtoSyncerClient::with_interceptor(
64            channel,
65            ApiKeyInterceptor { api_key_metadata },
66        ))
67    }
68}
69
70#[sdk_macros::async_trait]
71impl SyncerClient for BreezSyncerClient {
72    async fn connect(&self, connect_url: String) -> Result<()> {
73        let mut grpc_channel = self.grpc_channel.lock().await;
74        *grpc_channel = Some(GrpcClient::new(connect_url.clone())?.into_inner());
75        debug!("Successfully connected to {connect_url}");
76        Ok(())
77    }
78
79    async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply> {
80        Ok(self.get_client().await?.set_record(req).await?.into_inner())
81    }
82
83    async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply> {
84        Ok(self
85            .get_client()
86            .await?
87            .list_changes(req)
88            .await?
89            .into_inner())
90    }
91
92    async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>> {
93        Ok(self
94            .get_client()
95            .await?
96            .listen_changes(req)
97            .await?
98            .into_inner())
99    }
100
101    async fn disconnect(&self) -> Result<()> {
102        let mut channel = self.grpc_channel.lock().await;
103        *channel = None;
104        Ok(())
105    }
106}
107
108#[derive(Clone)]
109pub struct ApiKeyInterceptor {
110    api_key_metadata: Option<MetadataValue<Ascii>>,
111}
112
113impl Interceptor for ApiKeyInterceptor {
114    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
115        if let Some(api_key_metadata) = &self.api_key_metadata {
116            req.metadata_mut()
117                .insert("authorization", api_key_metadata.clone());
118        }
119        Ok(req)
120    }
121}