From 358a548a0a251a3269ea469bc4c47e930d5c11a3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 15 Sep 2020 15:30:57 -0400 Subject: [PATCH] feat: implement skeleton Storage gRPC interface (#290) --- src/server/write_buffer_rpc.rs | 202 ++++++++++++++++++++++++++++++--- 1 file changed, 187 insertions(+), 15 deletions(-) diff --git a/src/server/write_buffer_rpc.rs b/src/server/write_buffer_rpc.rs index 81fa227f62..8b28b4cd15 100644 --- a/src/server/write_buffer_rpc.rs +++ b/src/server/write_buffer_rpc.rs @@ -1,17 +1,29 @@ //! This module contains gRPC service implementatations for the WriteBuffer //! storage implementation -use std::{net::SocketAddr, sync::Arc}; +// Something in instrument is causing lint warnings about unused braces +#![allow(unused_braces)] + +// Something about how `tracing::instrument` works triggers a clippy +// warning about complex types +#![allow(clippy::type_complexity)] + +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use delorean::storage::DatabaseStore; use snafu::{ResultExt, Snafu}; use delorean::generated_types::{ delorean_server::{Delorean, DeloreanServer}, - CreateBucketRequest, CreateBucketResponse, DeleteBucketRequest, DeleteBucketResponse, - GetBucketsResponse, Organization, + storage_server::{Storage, StorageServer}, + CapabilitiesResponse, CreateBucketRequest, CreateBucketResponse, DeleteBucketRequest, + DeleteBucketResponse, GetBucketsResponse, MeasurementFieldsRequest, MeasurementFieldsResponse, + MeasurementNamesRequest, MeasurementTagKeysRequest, MeasurementTagValuesRequest, Organization, + ReadFilterRequest, ReadGroupRequest, ReadResponse, StringValuesResponse, TagKeysRequest, + TagValuesRequest, }; +use tokio::sync::mpsc; use tonic::Status; #[derive(Debug, Snafu)] @@ -34,7 +46,7 @@ impl GrpcService where T: DatabaseStore + 'static, { - /// Create a new GrpcServer connected to `db_store` + /// Create a new GrpcService connected to `db_store` pub fn new(db_store: Arc) -> Self { Self { db_store } } @@ -69,6 +81,108 @@ where } } +/// Implementes the protobuf defined Storage service for a DatabaseStore +#[tonic::async_trait] +impl Storage for GrpcService +where + T: DatabaseStore + 'static, +{ + type ReadFilterStream = mpsc::Receiver>; + + async fn read_filter( + &self, + _req: tonic::Request, + ) -> Result, Status> { + Err(Status::unimplemented("read_filter")) + } + + type ReadGroupStream = mpsc::Receiver>; + + #[tracing::instrument(level = "debug")] + async fn read_group( + &self, + req: tonic::Request, + ) -> Result, Status> { + Err(Status::unimplemented("read_group")) + } + + type TagKeysStream = mpsc::Receiver>; + + #[tracing::instrument(level = "debug")] + async fn tag_keys( + &self, + req: tonic::Request, + ) -> Result, Status> { + Err(Status::unimplemented("tag_keys")) + } + + type TagValuesStream = mpsc::Receiver>; + + #[tracing::instrument(level = "debug")] + async fn tag_values( + &self, + req: tonic::Request, + ) -> Result, Status> { + Err(Status::unimplemented("tag_values")) + } + + #[tracing::instrument(level = "debug")] + async fn capabilities( + &self, + req: tonic::Request<()>, + ) -> Result, Status> { + // Full list of go capabilities in + // idpe/storage/read/capabilities.go (aka window aggregate / + // pushdown) + // + // For now, do not claim to support any capabilities + let caps = CapabilitiesResponse { + caps: HashMap::new(), + }; + Ok(tonic::Response::new(caps)) + } + + type MeasurementNamesStream = mpsc::Receiver>; + + #[tracing::instrument(level = "debug")] + async fn measurement_names( + &self, + req: tonic::Request, + ) -> Result, Status> { + Err(Status::unimplemented("measurement_names")) + } + + type MeasurementTagKeysStream = mpsc::Receiver>; + + #[tracing::instrument(level = "debug")] + async fn measurement_tag_keys( + &self, + req: tonic::Request, + ) -> Result, Status> { + Err(Status::unimplemented("measurement_tag_keys")) + } + + type MeasurementTagValuesStream = mpsc::Receiver>; + + #[tracing::instrument(level = "debug")] + async fn measurement_tag_values( + &self, + req: tonic::Request, + ) -> Result, Status> { + Err(Status::unimplemented("measurement_tag_values")) + } + + type MeasurementFieldsStream = mpsc::Receiver>; + + #[tracing::instrument(level = "debug")] + async fn measurement_fields( + &self, + req: tonic::Request, + ) -> Result, Status> { + Err(Status::unimplemented("measurement_fields")) + } +} + /// Instantiate a server listening on the specified address /// implementing the Delorean and Storage gRPC interfaces, the /// underlying hyper server instance. Resolves when the server has @@ -79,7 +193,7 @@ where { tonic::transport::Server::builder() .add_service(DeloreanServer::new(GrpcService::new(storage.clone()))) - //.add_service(StorageServer::new(GrpcServer { app: state.clone() })) + .add_service(StorageServer::new(GrpcService::new(storage.clone()))) .serve(bind_addr) .await .context(ServerError {}) @@ -95,11 +209,19 @@ mod tests { }; use tonic::Code; - use delorean_generated_types::delorean_client::DeloreanClient; + use delorean_generated_types::{delorean_client, storage_client}; + + type DeloreanClient = delorean_client::DeloreanClient; + type StorageClient = storage_client::StorageClient; + + struct Clients { + delorean_client: DeloreanClient, + storage_client: StorageClient, + } #[tokio::test] async fn test_delorean_rpc_create() -> Result<()> { - let mut delorean_client = make_test_server().await.expect("Connecting to test server"); + let mut clients = make_test_server().await.expect("Connecting to test server"); let org = Organization { id: 1337, @@ -107,8 +229,8 @@ mod tests { buckets: Vec::new(), }; - // Test basic bucket listing - let res = delorean_client.get_buckets(org).await; + // Test response from delorean server + let res = clients.delorean_client.get_buckets(org).await; match res { Err(e) => { @@ -116,16 +238,31 @@ mod tests { assert_eq!(e.message(), "get_buckets"); } Ok(buckets) => { - assert!(false, "Unexpected success: {:?}", buckets); + assert!(false, "Unexpected delorean_client success: {:?}", buckets); + } + }; + + // Test response from storage server + let res = clients.storage_client.capabilities(()).await; + match res { + Err(e) => { + assert!(false, "Unexpected storage_client error: {:?}", e); + assert_eq!(e.message(), "get_buckets"); + } + Ok(caps) => { + let expected_caps = CapabilitiesResponse { + caps: HashMap::new(), + }; + + assert_eq!(*caps.get_ref(), expected_caps); } }; Ok(()) } - // Start up a test server, returning a client suitable for communication with it. - async fn make_test_server( - ) -> Result, tonic::transport::Error> { + // Start up a test rpc server, returning clients suitable for communication with it. + async fn make_test_server() -> Result { let test_storage = Arc::new(TestDatabaseStore::new()); // TODO: specify port 0 to let the OS pick the port (need to // figure out how to get access to the actual addr from tonic) @@ -136,14 +273,49 @@ mod tests { let server = make_server(bind_addr, test_storage.clone()); tokio::task::spawn(server); - // Now, loop and try to make a client connection for 5 seconds + let delorean_client = connect_to_server::(bind_addr).await?; + let storage_client = connect_to_server::(bind_addr).await?; + + Ok(Clients { + delorean_client, + storage_client, + }) + } + + /// Represents something that can make a connection to a server + #[tonic::async_trait] + trait NewClient: Sized + std::fmt::Debug { + async fn connect(addr: String) -> Result; + } + + #[tonic::async_trait] + impl NewClient for DeloreanClient { + async fn connect(addr: String) -> Result { + Self::connect(addr).await + } + } + + #[tonic::async_trait] + impl NewClient for StorageClient { + async fn connect(addr: String) -> Result { + Self::connect(addr).await + } + } + + /// loop and try to make a client connection for 5 seconds, + /// returning the result of the connection + async fn connect_to_server(bind_addr: SocketAddr) -> Result + where + T: NewClient, + { const MAX_RETRIES: u32 = 10; let mut retry_count = 0; loop { let mut interval = tokio::time::interval(Duration::from_millis(500)); - match DeloreanClient::connect(format!("http://{}", bind_addr)).await { + match T::connect(format!("http://{}", bind_addr)).await { Ok(client) => { + println!("Sucessfully connected to server. Client: {:?}", client); return Ok(client); } Err(e) => {