breez_sdk_liquid/sync/
client.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use anyhow::{anyhow, Error, Result};

use log::debug;
use maybe_sync::{MaybeSend, MaybeSync};
use sdk_common::grpc::transport::{GrpcClient, Transport};
use tokio::sync::Mutex;
use tonic::{
    metadata::{errors::InvalidMetadataValue, Ascii, MetadataValue},
    service::{interceptor::InterceptedService, Interceptor},
    Request, Status, Streaming,
};

use super::model::{
    syncer_client::SyncerClient as ProtoSyncerClient, ListChangesReply, ListChangesRequest,
    ListenChangesRequest, Notification, SetRecordReply, SetRecordRequest,
};

#[sdk_macros::async_trait]
pub(crate) trait SyncerClient: MaybeSend + MaybeSync {
    async fn connect(&self, connect_url: String) -> Result<()>;
    async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply>;
    async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply>;
    async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>>;
    async fn disconnect(&self) -> Result<()>;
}

pub(crate) struct BreezSyncerClient {
    grpc_channel: Mutex<Option<Transport>>,
    api_key: Option<String>,
}

impl BreezSyncerClient {
    pub(crate) fn new(api_key: Option<String>) -> Self {
        Self {
            grpc_channel: Mutex::new(None),
            api_key,
        }
    }

    fn api_key_metadata(&self) -> Result<Option<MetadataValue<Ascii>>, Error> {
        match &self.api_key {
            Some(key) => Ok(Some(format!("Bearer {key}").parse().map_err(
                |e: InvalidMetadataValue| {
                    anyhow!(format!(
                        "(Breez: {:?}) Failed parse API key: {e}",
                        self.api_key
                    ))
                },
            )?)),
            _ => Ok(None),
        }
    }
}

impl BreezSyncerClient {
    async fn get_client(
        &self,
    ) -> Result<ProtoSyncerClient<InterceptedService<Transport, ApiKeyInterceptor>>, Error> {
        let Some(channel) = self.grpc_channel.lock().await.clone() else {
            return Err(anyhow!("Cannot get sync client: not connected"));
        };
        let api_key_metadata = self.api_key_metadata()?;
        Ok(ProtoSyncerClient::with_interceptor(
            channel,
            ApiKeyInterceptor { api_key_metadata },
        ))
    }
}

#[sdk_macros::async_trait]
impl SyncerClient for BreezSyncerClient {
    async fn connect(&self, connect_url: String) -> Result<()> {
        let mut grpc_channel = self.grpc_channel.lock().await;
        *grpc_channel = Some(GrpcClient::new(connect_url.clone())?.into_inner());
        debug!("Successfully connected to {connect_url}");
        Ok(())
    }

    async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply> {
        Ok(self.get_client().await?.set_record(req).await?.into_inner())
    }

    async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply> {
        Ok(self
            .get_client()
            .await?
            .list_changes(req)
            .await?
            .into_inner())
    }

    async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>> {
        Ok(self
            .get_client()
            .await?
            .listen_changes(req)
            .await?
            .into_inner())
    }

    async fn disconnect(&self) -> Result<()> {
        let mut channel = self.grpc_channel.lock().await;
        *channel = None;
        Ok(())
    }
}

#[derive(Clone)]
pub struct ApiKeyInterceptor {
    api_key_metadata: Option<MetadataValue<Ascii>>,
}

impl Interceptor for ApiKeyInterceptor {
    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
        if let Some(api_key_metadata) = &self.api_key_metadata {
            req.metadata_mut()
                .insert("authorization", api_key_metadata.clone());
        }
        Ok(req)
    }
}