From a5e09366b027d5d40cac938856061583f5ce3723 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Fri, 22 Jan 2021 15:11:08 -0500 Subject: [PATCH 01/10] feat: Export arrow-flight from arrow-deps --- Cargo.lock | 17 +++++++++++++++++ arrow_deps/Cargo.toml | 1 + arrow_deps/src/lib.rs | 1 + 3 files changed, 19 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 3e6ad08cc8..3ccc5bee00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -121,11 +121,28 @@ dependencies = [ "serde_json", ] +[[package]] +name = "arrow-flight" +version = "4.0.0-SNAPSHOT" +source = "git+https://github.com/apache/arrow.git?rev=a7ae73be24f6653c2da8d0d37dd9686711eea974#a7ae73be24f6653c2da8d0d37dd9686711eea974" +dependencies = [ + "arrow", + "bytes", + "futures", + "proc-macro2", + "prost", + "prost-derive", + "tokio", + "tonic", + "tonic-build", +] + [[package]] name = "arrow_deps" version = "0.1.0" dependencies = [ "arrow", + "arrow-flight", "datafusion", "parquet", ] diff --git a/arrow_deps/Cargo.toml b/arrow_deps/Cargo.toml index ab598e3523..1b9a8a8c6e 100644 --- a/arrow_deps/Cargo.toml +++ b/arrow_deps/Cargo.toml @@ -14,6 +14,7 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx # The version can be found here: https://github.com/apache/arrow/commit/a7ae73be24f6653c2da8d0d37dd9686711eea974 # arrow = { git = "https://github.com/apache/arrow.git", rev = "a7ae73be24f6653c2da8d0d37dd9686711eea974" , features = ["simd"] } +arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "a7ae73be24f6653c2da8d0d37dd9686711eea974" } datafusion = { git = "https://github.com/apache/arrow.git", rev = "a7ae73be24f6653c2da8d0d37dd9686711eea974" } # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time # and we're not currently using it anyway diff --git a/arrow_deps/src/lib.rs b/arrow_deps/src/lib.rs index 9e575f0718..05fffedcb3 100644 --- a/arrow_deps/src/lib.rs +++ b/arrow_deps/src/lib.rs @@ -5,6 +5,7 @@ // export arrow, parquet, and datafusion publically so we can have a single // reference in cargo pub use arrow; +pub use arrow_flight; pub use datafusion; pub use parquet; From 95d5f759dfb1364b9efa48e0380ad5627f448596 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 29 Jan 2021 14:16:10 -0500 Subject: [PATCH 02/10] chore: Organize use imports --- src/influxdb_ioxd/rpc/service.rs | 36 +++++++++++++------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/src/influxdb_ioxd/rpc/service.rs b/src/influxdb_ioxd/rpc/service.rs index 5d7d875911..b72187a5a8 100644 --- a/src/influxdb_ioxd/rpc/service.rs +++ b/src/influxdb_ioxd/rpc/service.rs @@ -2,8 +2,15 @@ //! implemented in terms of the `query::Database` and //! `query::DatabaseStore` -use std::{collections::HashMap, sync::Arc}; - +use super::{ + data::{ + fieldlist_to_measurement_fields_response, series_set_item_to_read_response, + tag_keys_to_byte_vecs, + }, + expr::{self, AddRPCNode, Loggable, SpecialTagKeys}, + input::GrpcInputs, +}; +use data_types::{error::ErrorLogger, names::org_and_bucket_to_database, DatabaseName}; use generated_types::{ i_ox_testing_server::{IOxTesting, IOxTestingServer}, storage_server::{Storage, StorageServer}, @@ -13,36 +20,21 @@ use generated_types::{ ReadSeriesCardinalityRequest, ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest, TagValuesRequest, TestErrorRequest, TestErrorResponse, TimestampRange, }; - -use data_types::error::ErrorLogger; - -use query::group_by::GroupByAndAggregate; -use query::{exec::fieldlist::FieldList, frontend::influxrpc::InfluxRPCPlanner}; -use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; - -use super::expr::{self, AddRPCNode, Loggable, SpecialTagKeys}; -use super::input::GrpcInputs; -use data_types::names::org_and_bucket_to_database; - -use data_types::DatabaseName; - use query::{ + exec::fieldlist::FieldList, exec::seriesset::{Error as SeriesSetError, SeriesSetItem}, + frontend::influxrpc::InfluxRPCPlanner, + group_by::GroupByAndAggregate, predicate::PredicateBuilder, Database, DatabaseStore, }; - use snafu::{OptionExt, ResultExt, Snafu}; - +use std::{collections::HashMap, sync::Arc}; use tokio::{net::TcpListener, sync::mpsc}; +use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use tonic::Status; use tracing::{error, info, warn}; -use super::data::{ - fieldlist_to_measurement_fields_response, series_set_item_to_read_response, - tag_keys_to_byte_vecs, -}; - #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("gRPC server error: {}", source))] From 39338b195b36aa6cac6f9b0807b07b41af0eb7c6 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Fri, 22 Jan 2021 11:14:40 -0500 Subject: [PATCH 03/10] feat: Basic scaffolding of a flight server do_get --- src/influxdb_ioxd/rpc/service.rs | 199 ++++++++++++++++++++++++++- tests/end-to-end.rs | 1 + tests/end_to_end_cases/flight_api.rs | 72 ++++++++++ tests/end_to_end_cases/mod.rs | 1 + 4 files changed, 269 insertions(+), 4 deletions(-) create mode 100644 tests/end_to_end_cases/flight_api.rs diff --git a/src/influxdb_ioxd/rpc/service.rs b/src/influxdb_ioxd/rpc/service.rs index b72187a5a8..4c01f8695b 100644 --- a/src/influxdb_ioxd/rpc/service.rs +++ b/src/influxdb_ioxd/rpc/service.rs @@ -10,7 +10,22 @@ use super::{ expr::{self, AddRPCNode, Loggable, SpecialTagKeys}, input::GrpcInputs, }; -use data_types::{error::ErrorLogger, names::org_and_bucket_to_database, DatabaseName}; +use arrow_deps::{ + arrow, + arrow_flight::{ + self, + flight_service_server::{FlightService, FlightServiceServer}, + Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, + HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, + }, + datafusion::physical_plan::collect, +}; +use data_types::{ + error::ErrorLogger, + names::{org_and_bucket_to_database, OrgBucketMappingError}, + DatabaseName, +}; +use futures::Stream; use generated_types::{ i_ox_testing_server::{IOxTesting, IOxTestingServer}, storage_server::{Storage, StorageServer}, @@ -23,16 +38,17 @@ use generated_types::{ use query::{ exec::fieldlist::FieldList, exec::seriesset::{Error as SeriesSetError, SeriesSetItem}, - frontend::influxrpc::InfluxRPCPlanner, + frontend::{influxrpc::InfluxRPCPlanner, sql::SQLQueryPlanner}, group_by::GroupByAndAggregate, predicate::PredicateBuilder, Database, DatabaseStore, }; +use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, pin::Pin, sync::Arc}; use tokio::{net::TcpListener, sync::mpsc}; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; -use tonic::Status; +use tonic::{Request, Response, Status, Streaming}; use tracing::{error, info, warn}; #[derive(Debug, Snafu)] @@ -157,6 +173,30 @@ pub enum Error { #[snafu(display("Operation not yet implemented: {}", operation))] NotYetImplemented { operation: String }, + + #[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))] + InvalidTicket { + source: std::string::FromUtf8Error, + ticket: Vec, + }, + + #[snafu(display("Invalid query, could not parse '{}': {}", query, source))] + InvalidQuery { + query: String, + source: serde_json::Error, + }, + + #[snafu(display("Internal error mapping org & bucket: {}", source))] + BucketMappingError { source: OrgBucketMappingError }, + + #[snafu(display("Bucket {} not found in org {}", bucket, org))] + BucketNotFound { org: String, bucket: String }, + + #[snafu(display("Internal error reading points from database {}: {}", db_name, source))] + Query { + db_name: String, + source: Box, + }, } pub type Result = std::result::Result; @@ -203,6 +243,11 @@ impl Error { Self::SendingResults { .. } => Status::internal(self.to_string()), Self::InternalHintsFieldNotSupported { .. } => Status::internal(self.to_string()), Self::NotYetImplemented { .. } => Status::internal(self.to_string()), + Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()), + Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()), + Self::BucketMappingError { .. } => Status::internal(self.to_string()), + Self::BucketNotFound { .. } => Status::not_found(self.to_string()), + Self::Query { .. } => Status::internal(self.to_string()), } } } @@ -746,6 +791,151 @@ where } } +type TonicStream = Pin> + Send + Sync + 'static>>; + +#[derive(Deserialize, Debug)] +/// Body of the `Ticket` serialized and sent to the do_get endpoint; this should +/// be shared with the read API probably... +struct ReadInfo { + org: String, + bucket: String, + sql_query: String, +} + +#[tonic::async_trait] +impl FlightService for GrpcService +where + T: DatabaseStore + 'static, +{ + type HandshakeStream = TonicStream; + type ListFlightsStream = TonicStream; + type DoGetStream = TonicStream; + type DoPutStream = TonicStream; + type DoActionStream = TonicStream; + type ListActionsStream = TonicStream; + type DoExchangeStream = TonicStream; + + async fn get_schema( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn do_get( + &self, + request: Request, + ) -> Result, Status> { + let ticket = request.into_inner(); + let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket { + ticket: ticket.ticket, + })?; + + let read_info: ReadInfo = + serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?; + + let db_name = org_and_bucket_to_database(&read_info.org, &read_info.bucket) + .context(BucketMappingError)?; + let db = self.db_store.db(&db_name).await.context(BucketNotFound { + org: read_info.org.clone(), + bucket: read_info.bucket.clone(), + })?; + + let planner = SQLQueryPlanner::default(); + let executor = self.db_store.executor(); + + let physical_plan = planner + .query(&*db, &read_info.sql_query, &executor) + .await + .unwrap(); + + // execute the query + let results = collect(physical_plan.clone()) + .await + .map_err(|e| Box::new(e) as _) + .context(Query { db_name })?; + if results.is_empty() { + return Err(Status::internal("There were no results from ticket")); + } + + let options = arrow::ipc::writer::IpcWriteOptions::default(); + let schema = physical_plan.schema(); + let schema_flight_data = + arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options); + + let mut flights: Vec> = vec![Ok(schema_flight_data)]; + + let mut batches: Vec> = results + .iter() + .flat_map(|batch| { + let (flight_dictionaries, flight_batch) = + arrow_flight::utils::flight_data_from_arrow_batch(batch, &options); + flight_dictionaries + .into_iter() + .chain(std::iter::once(flight_batch)) + .map(Ok) + }) + .collect(); + + // append batch vector to schema vector, so that the first message sent is the + // schema + flights.append(&mut batches); + + let output = futures::stream::iter(flights); + + Ok(Response::new(Box::pin(output) as Self::DoGetStream)) + } + + async fn handshake( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn list_flights( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn get_flight_info( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn do_put( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn do_action( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn list_actions( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn do_exchange( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } +} + trait SetRange { /// sets the timestamp range to range, if present fn set_range(self, range: Option) -> Self; @@ -1139,6 +1329,7 @@ where tonic::transport::Server::builder() .add_service(IOxTestingServer::new(GrpcService::new(storage.clone()))) .add_service(StorageServer::new(GrpcService::new(storage.clone()))) + .add_service(FlightServiceServer::new(GrpcService::new(storage))) .serve_with_incoming(stream) .await .context(ServerError {}) diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index 2d74e63894..350d1f90a2 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -62,6 +62,7 @@ async fn read_and_write_data() { read_api::test(&http_client, &scenario, sql_query, &expected_read_data).await; grpc_api::test(&mut storage_client, &scenario).await; + flight_api::test(&scenario, sql_query, &expected_read_data).await; } // These tests manage their own data diff --git a/tests/end_to_end_cases/flight_api.rs b/tests/end_to_end_cases/flight_api.rs new file mode 100644 index 0000000000..b55d19a74b --- /dev/null +++ b/tests/end_to_end_cases/flight_api.rs @@ -0,0 +1,72 @@ +use crate::{Scenario, GRPC_URL_BASE}; +use arrow_deps::{ + arrow::{ + datatypes::Schema, + ipc::{self, reader}, + }, + arrow_flight::{ + flight_service_client::FlightServiceClient, utils::flight_data_to_arrow_batch, Ticket, + }, + assert_table_eq, +}; +use futures::prelude::*; +use serde::Serialize; +use std::{convert::TryFrom, sync::Arc}; + +// TODO: this should be shared +#[derive(Serialize, Debug)] +struct ReadInfo { + org: String, + bucket: String, + sql_query: String, +} + +pub async fn test(scenario: &Scenario, sql_query: &str, expected_read_data: &[String]) { + let mut flight_client = FlightServiceClient::connect(GRPC_URL_BASE).await.unwrap(); + + let query = ReadInfo { + org: scenario.org_id_str().into(), + bucket: scenario.bucket_id_str().into(), + sql_query: sql_query.into(), + }; + + let t = Ticket { + ticket: serde_json::to_string(&query).unwrap().into(), + }; + let mut response = flight_client.do_get(t).await.unwrap().into_inner(); + + let flight_data_schema = response.next().await.unwrap().unwrap(); + let schema = Arc::new(Schema::try_from(&flight_data_schema).unwrap()); + + let mut dictionaries_by_field = vec![None; schema.fields().len()]; + + let mut batches = vec![]; + + while let Some(data) = response.next().await { + let mut data = data.unwrap(); + let mut message = + ipc::root_as_message(&data.data_header[..]).expect("Error parsing first message"); + + while message.header_type() == ipc::MessageHeader::DictionaryBatch { + reader::read_dictionary( + &data.data_body, + message + .header_as_dictionary_batch() + .expect("Error parsing dictionary"), + &schema, + &mut dictionaries_by_field, + ) + .expect("Error reading dictionary"); + + data = response.next().await.unwrap().ok().unwrap(); + message = ipc::root_as_message(&data.data_header[..]).expect("Error parsing message"); + } + + batches.push( + flight_data_to_arrow_batch(&data, schema.clone(), &dictionaries_by_field) + .expect("Unable to convert flight data to Arrow batch"), + ); + } + + assert_table_eq!(expected_read_data, &batches); +} diff --git a/tests/end_to_end_cases/mod.rs b/tests/end_to_end_cases/mod.rs index 3c81319465..a0e01d8bc2 100644 --- a/tests/end_to_end_cases/mod.rs +++ b/tests/end_to_end_cases/mod.rs @@ -1,2 +1,3 @@ +pub mod flight_api; pub mod grpc_api; pub mod read_api; From 8975b1cbf5fe8fd7cd28b1c018906dca84c0621e Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 29 Jan 2021 15:58:12 -0500 Subject: [PATCH 04/10] fix: into_iter in the assert_table_eq macro This makes it so the macro can take `&[String]` or `&[&str]`. --- arrow_deps/src/test_util.rs | 3 ++- server/src/db.rs | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/arrow_deps/src/test_util.rs b/arrow_deps/src/test_util.rs index ed7fcfe2fd..11c17507f4 100644 --- a/arrow_deps/src/test_util.rs +++ b/arrow_deps/src/test_util.rs @@ -14,7 +14,8 @@ use crate::arrow::compute::kernels::sort::{lexsort, SortColumn, SortOptions}; #[macro_export] macro_rules! assert_table_eq { ($EXPECTED_LINES: expr, $CHUNKS: expr) => { - let expected_lines: Vec = $EXPECTED_LINES.iter().map(|&s| s.into()).collect(); + let expected_lines: Vec = + $EXPECTED_LINES.into_iter().map(|s| s.to_string()).collect(); let formatted = arrow_deps::arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap(); diff --git a/server/src/db.rs b/server/src/db.rs index 08b03e1cf8..8d813f6c63 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -452,14 +452,14 @@ mod tests { "+-----+------+", ]; let batches = run_query(&db, "select * from cpu").await; - assert_table_eq!(expected, &batches); + assert_table_eq!(&expected, &batches); // And expect that we still get the same thing when data is rolled over again let chunk = db.rollover_partition("1970-01-01T00").await.unwrap(); assert_eq!(chunk.id(), 1); let batches = run_query(&db, "select * from cpu").await; - assert_table_eq!(expected, &batches); + assert_table_eq!(&expected, &batches); } #[tokio::test] @@ -495,7 +495,7 @@ mod tests { "+-----+------+", ]; let batches = run_query(&db, "select * from cpu").await; - assert_table_eq!(expected, &batches); + assert_table_eq!(&expected, &batches); // now, drop the mutable buffer chunk and results should still be the same db.drop_mutable_buffer_chunk(partition_key, mb_chunk.id()) @@ -506,7 +506,7 @@ mod tests { assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]); let batches = run_query(&db, "select * from cpu").await; - assert_table_eq!(expected, &batches); + assert_table_eq!(&expected, &batches); // drop, the chunk from the read buffer db.drop_read_buffer_chunk(partition_key, mb_chunk.id()) From 9ff2ee99d83f6555eecd6c5759f6e5d2d6a2c768 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 1 Feb 2021 11:43:00 -0500 Subject: [PATCH 05/10] refactor: Move gRPC make_server up a module --- src/influxdb_ioxd.rs | 4 +-- src/influxdb_ioxd/rpc.rs | 41 ++++++++++++++++++++++++++ src/influxdb_ioxd/rpc/service.rs | 50 ++++++++------------------------ 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index b7c5849ec1..38a6aea325 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -66,7 +66,7 @@ pub enum Error { ServingHttp { source: hyper::Error }, #[snafu(display("Error serving RPC: {}", source))] - ServingRPC { source: self::rpc::service::Error }, + ServingRPC { source: self::rpc::Error }, } pub type Result = std::result::Result; @@ -134,7 +134,7 @@ pub async fn main(logging_level: LoggingLevel, config: Option) -> Result .await .context(StartListeningGrpc { grpc_bind_addr })?; - let grpc_server = self::rpc::service::make_server(socket, app_server.clone()); + let grpc_server = self::rpc::make_server(socket, app_server.clone()); info!(bind_address=?grpc_bind_addr, "gRPC server listening"); diff --git a/src/influxdb_ioxd/rpc.rs b/src/influxdb_ioxd/rpc.rs index 223e345c29..1774a4f9a3 100644 --- a/src/influxdb_ioxd/rpc.rs +++ b/src/influxdb_ioxd/rpc.rs @@ -13,3 +13,44 @@ pub mod expr; pub mod id; pub mod input; pub mod service; + +use arrow_deps::arrow_flight::flight_service_server::FlightServiceServer; +use data_types::error::ErrorLogger; +use generated_types::{i_ox_testing_server::IOxTestingServer, storage_server::StorageServer}; +use query::DatabaseStore; +use snafu::{ResultExt, Snafu}; +use std::sync::Arc; +use tokio::net::TcpListener; +use tokio_stream::wrappers::TcpListenerStream; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("gRPC server error: {}", source))] + ServerError { source: tonic::transport::Error }, +} + +pub type Result = std::result::Result; + +/// Instantiate a server listening on the specified address +/// implementing the IOx, Storage, and Flight gRPC interfaces, the +/// underlying hyper server instance. Resolves when the server has +/// shutdown. +pub async fn make_server(socket: TcpListener, storage: Arc) -> Result<()> +where + T: DatabaseStore + 'static, +{ + let stream = TcpListenerStream::new(socket); + + tonic::transport::Server::builder() + .add_service(IOxTestingServer::new(service::GrpcService::new( + storage.clone(), + ))) + .add_service(StorageServer::new(service::GrpcService::new( + storage.clone(), + ))) + .add_service(FlightServiceServer::new(service::GrpcService::new(storage))) + .serve_with_incoming(stream) + .await + .context(ServerError {}) + .log_if_error("Running Tonic Server") +} diff --git a/src/influxdb_ioxd/rpc/service.rs b/src/influxdb_ioxd/rpc/service.rs index 4c01f8695b..5a60e63370 100644 --- a/src/influxdb_ioxd/rpc/service.rs +++ b/src/influxdb_ioxd/rpc/service.rs @@ -13,10 +13,9 @@ use super::{ use arrow_deps::{ arrow, arrow_flight::{ - self, - flight_service_server::{FlightService, FlightServiceServer}, - Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, - HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, + self, flight_service_server::FlightService, Action, ActionType, Criteria, Empty, + FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, + SchemaResult, Ticket, }, datafusion::physical_plan::collect, }; @@ -27,13 +26,12 @@ use data_types::{ }; use futures::Stream; use generated_types::{ - i_ox_testing_server::{IOxTesting, IOxTestingServer}, - storage_server::{Storage, StorageServer}, - CapabilitiesResponse, Capability, Int64ValuesResponse, MeasurementFieldsRequest, - MeasurementFieldsResponse, MeasurementNamesRequest, MeasurementTagKeysRequest, - MeasurementTagValuesRequest, Predicate, ReadFilterRequest, ReadGroupRequest, ReadResponse, - ReadSeriesCardinalityRequest, ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest, - TagValuesRequest, TestErrorRequest, TestErrorResponse, TimestampRange, + i_ox_testing_server::IOxTesting, storage_server::Storage, CapabilitiesResponse, Capability, + Int64ValuesResponse, MeasurementFieldsRequest, MeasurementFieldsResponse, + MeasurementNamesRequest, MeasurementTagKeysRequest, MeasurementTagValuesRequest, Predicate, + ReadFilterRequest, ReadGroupRequest, ReadResponse, ReadSeriesCardinalityRequest, + ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest, TagValuesRequest, + TestErrorRequest, TestErrorResponse, TimestampRange, }; use query::{ exec::fieldlist::FieldList, @@ -46,16 +44,13 @@ use query::{ use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; use std::{collections::HashMap, pin::Pin, sync::Arc}; -use tokio::{net::TcpListener, sync::mpsc}; -use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status, Streaming}; use tracing::{error, info, warn}; #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("gRPC server error: {}", source))] - ServerError { source: tonic::transport::Error }, - #[snafu(display("Database not found: {}", db_name))] DatabaseNotFound { db_name: String }, @@ -215,7 +210,6 @@ impl Error { /// status fn to_status(&self) -> tonic::Status { match &self { - Self::ServerError { .. } => Status::internal(self.to_string()), Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()), Self::ListingTables { .. } => Status::internal(self.to_string()), Self::ListingColumns { .. } => { @@ -1316,26 +1310,6 @@ where Ok(field_list) } -/// Instantiate a server listening on the specified address -/// implementing the IOx and Storage gRPC interfaces, the -/// underlying hyper server instance. Resolves when the server has -/// shutdown. -pub async fn make_server(socket: TcpListener, storage: Arc) -> Result<()> -where - T: DatabaseStore + 'static, -{ - let stream = TcpListenerStream::new(socket); - - tonic::transport::Server::builder() - .add_service(IOxTestingServer::new(GrpcService::new(storage.clone()))) - .add_service(StorageServer::new(GrpcService::new(storage.clone()))) - .add_service(FlightServiceServer::new(GrpcService::new(storage))) - .serve_with_incoming(stream) - .await - .context(ServerError {}) - .log_if_error("Running Tonic Server") -} - #[cfg(test)] mod tests { use super::super::id::ID; @@ -2789,7 +2763,7 @@ mod tests { println!("Starting InfluxDB IOx rpc test server on {:?}", bind_addr); - let server = make_server(socket, test_storage.clone()); + let server = super::super::make_server(socket, test_storage.clone()); tokio::task::spawn(server); let iox_client = connect_to_server::(bind_addr) From 51d18e6ac75a4e9670c885db779027b4d3247b93 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 1 Feb 2021 13:16:03 -0500 Subject: [PATCH 06/10] refactor: Move GrpcService definition up a module --- src/influxdb_ioxd/rpc.rs | 25 ++++++++++++++++++------- src/influxdb_ioxd/rpc/service.rs | 16 +--------------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/src/influxdb_ioxd/rpc.rs b/src/influxdb_ioxd/rpc.rs index 1774a4f9a3..f7e5af7b27 100644 --- a/src/influxdb_ioxd/rpc.rs +++ b/src/influxdb_ioxd/rpc.rs @@ -42,15 +42,26 @@ where let stream = TcpListenerStream::new(socket); tonic::transport::Server::builder() - .add_service(IOxTestingServer::new(service::GrpcService::new( - storage.clone(), - ))) - .add_service(StorageServer::new(service::GrpcService::new( - storage.clone(), - ))) - .add_service(FlightServiceServer::new(service::GrpcService::new(storage))) + .add_service(IOxTestingServer::new(GrpcService::new(storage.clone()))) + .add_service(StorageServer::new(GrpcService::new(storage.clone()))) + .add_service(FlightServiceServer::new(GrpcService::new(storage))) .serve_with_incoming(stream) .await .context(ServerError {}) .log_if_error("Running Tonic Server") } + +#[derive(Debug)] +pub struct GrpcService { + db_store: Arc, +} + +impl GrpcService +where + T: DatabaseStore + 'static, +{ + /// Create a new GrpcService connected to `db_store` + pub fn new(db_store: Arc) -> Self { + Self { db_store } + } +} diff --git a/src/influxdb_ioxd/rpc/service.rs b/src/influxdb_ioxd/rpc/service.rs index 5a60e63370..3b308947bc 100644 --- a/src/influxdb_ioxd/rpc/service.rs +++ b/src/influxdb_ioxd/rpc/service.rs @@ -9,6 +9,7 @@ use super::{ }, expr::{self, AddRPCNode, Loggable, SpecialTagKeys}, input::GrpcInputs, + GrpcService, }; use arrow_deps::{ arrow, @@ -246,21 +247,6 @@ impl Error { } } -#[derive(Debug)] -pub struct GrpcService { - db_store: Arc, -} - -impl GrpcService -where - T: DatabaseStore + 'static, -{ - /// Create a new GrpcService connected to `db_store` - pub fn new(db_store: Arc) -> Self { - Self { db_store } - } -} - #[tonic::async_trait] /// Implements the protobuf defined IOx rpc service for a DatabaseStore impl IOxTesting for GrpcService From 19673657c2862b6ef8687f805b92f2a8c5d5c43b Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 1 Feb 2021 13:40:14 -0500 Subject: [PATCH 07/10] refactor: Extract the Flight server into its own module --- src/influxdb_ioxd.rs | 1 + src/influxdb_ioxd/flight.rs | 214 +++++++++++++++++++++++++++++++ src/influxdb_ioxd/rpc.rs | 2 +- src/influxdb_ioxd/rpc/service.rs | 197 +--------------------------- 4 files changed, 220 insertions(+), 194 deletions(-) create mode 100644 src/influxdb_ioxd/flight.rs diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 38a6aea325..7ca66ea5c8 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +mod flight; pub mod http_routes; pub mod rpc; diff --git a/src/influxdb_ioxd/flight.rs b/src/influxdb_ioxd/flight.rs new file mode 100644 index 0000000000..726ee1deb0 --- /dev/null +++ b/src/influxdb_ioxd/flight.rs @@ -0,0 +1,214 @@ +use super::rpc::GrpcService; + +use arrow_deps::{ + arrow, + arrow_flight::{ + self, flight_service_server::FlightService, Action, ActionType, Criteria, Empty, + FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, + SchemaResult, Ticket, + }, + datafusion::physical_plan::collect, +}; +use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError}; +use futures::Stream; +use query::{frontend::sql::SQLQueryPlanner, DatabaseStore}; +use serde::Deserialize; +use snafu::{OptionExt, ResultExt, Snafu}; +use std::pin::Pin; +use tonic::{Request, Response, Streaming}; +use tracing::error; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))] + InvalidTicket { + source: std::string::FromUtf8Error, + ticket: Vec, + }, + #[snafu(display("Invalid query, could not parse '{}': {}", query, source))] + InvalidQuery { + query: String, + source: serde_json::Error, + }, + + #[snafu(display("Internal error mapping org & bucket: {}", source))] + BucketMappingError { source: OrgBucketMappingError }, + + #[snafu(display("Bucket {} not found in org {}", bucket, org))] + BucketNotFound { org: String, bucket: String }, + + #[snafu(display("Internal error reading points from database {}: {}", db_name, source))] + Query { + db_name: String, + source: Box, + }, +} + +impl From for tonic::Status { + /// Converts a result from the business logic into the appropriate tonic + /// status + fn from(err: Error) -> Self { + error!("Error handling Flight gRPC request: {}", err); + err.to_status() + } +} + +impl Error { + /// Converts a result from the business logic into the appropriate tonic + /// status + fn to_status(&self) -> tonic::Status { + use tonic::Status; + match &self { + Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()), + Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()), + Self::BucketMappingError { .. } => Status::internal(self.to_string()), + Self::BucketNotFound { .. } => Status::not_found(self.to_string()), + Self::Query { .. } => Status::internal(self.to_string()), + } + } +} + +type TonicStream = Pin> + Send + Sync + 'static>>; + +#[derive(Deserialize, Debug)] +/// Body of the `Ticket` serialized and sent to the do_get endpoint; this should +/// be shared with the read API probably... +struct ReadInfo { + org: String, + bucket: String, + sql_query: String, +} + +#[tonic::async_trait] +impl FlightService for GrpcService +where + T: DatabaseStore + 'static, +{ + type HandshakeStream = TonicStream; + type ListFlightsStream = TonicStream; + type DoGetStream = TonicStream; + type DoPutStream = TonicStream; + type DoActionStream = TonicStream; + type ListActionsStream = TonicStream; + type DoExchangeStream = TonicStream; + + async fn get_schema( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn do_get( + &self, + request: Request, + ) -> Result, tonic::Status> { + let ticket = request.into_inner(); + let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket { + ticket: ticket.ticket, + })?; + + let read_info: ReadInfo = + serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?; + + let db_name = org_and_bucket_to_database(&read_info.org, &read_info.bucket) + .context(BucketMappingError)?; + let db = self.db_store.db(&db_name).await.context(BucketNotFound { + org: read_info.org.clone(), + bucket: read_info.bucket.clone(), + })?; + + let planner = SQLQueryPlanner::default(); + let executor = self.db_store.executor(); + + let physical_plan = planner + .query(&*db, &read_info.sql_query, &executor) + .await + .unwrap(); + + // execute the query + let results = collect(physical_plan.clone()) + .await + .map_err(|e| Box::new(e) as _) + .context(Query { db_name })?; + if results.is_empty() { + return Err(tonic::Status::internal("There were no results from ticket")); + } + + let options = arrow::ipc::writer::IpcWriteOptions::default(); + let schema = physical_plan.schema(); + let schema_flight_data = + arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options); + + let mut flights: Vec> = vec![Ok(schema_flight_data)]; + + let mut batches: Vec> = results + .iter() + .flat_map(|batch| { + let (flight_dictionaries, flight_batch) = + arrow_flight::utils::flight_data_from_arrow_batch(batch, &options); + flight_dictionaries + .into_iter() + .chain(std::iter::once(flight_batch)) + .map(Ok) + }) + .collect(); + + // append batch vector to schema vector, so that the first message sent is the + // schema + flights.append(&mut batches); + + let output = futures::stream::iter(flights); + + Ok(Response::new(Box::pin(output) as Self::DoGetStream)) + } + + async fn handshake( + &self, + _request: Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn list_flights( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn get_flight_info( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn do_put( + &self, + _request: Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn do_action( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn list_actions( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn do_exchange( + &self, + _request: Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } +} diff --git a/src/influxdb_ioxd/rpc.rs b/src/influxdb_ioxd/rpc.rs index f7e5af7b27..7bcea2eb70 100644 --- a/src/influxdb_ioxd/rpc.rs +++ b/src/influxdb_ioxd/rpc.rs @@ -53,7 +53,7 @@ where #[derive(Debug)] pub struct GrpcService { - db_store: Arc, + pub db_store: Arc, } impl GrpcService diff --git a/src/influxdb_ioxd/rpc/service.rs b/src/influxdb_ioxd/rpc/service.rs index 3b308947bc..1c8384d6f6 100644 --- a/src/influxdb_ioxd/rpc/service.rs +++ b/src/influxdb_ioxd/rpc/service.rs @@ -11,21 +11,7 @@ use super::{ input::GrpcInputs, GrpcService, }; -use arrow_deps::{ - arrow, - arrow_flight::{ - self, flight_service_server::FlightService, Action, ActionType, Criteria, Empty, - FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, - SchemaResult, Ticket, - }, - datafusion::physical_plan::collect, -}; -use data_types::{ - error::ErrorLogger, - names::{org_and_bucket_to_database, OrgBucketMappingError}, - DatabaseName, -}; -use futures::Stream; +use data_types::{error::ErrorLogger, names::org_and_bucket_to_database, DatabaseName}; use generated_types::{ i_ox_testing_server::IOxTesting, storage_server::Storage, CapabilitiesResponse, Capability, Int64ValuesResponse, MeasurementFieldsRequest, MeasurementFieldsResponse, @@ -37,17 +23,16 @@ use generated_types::{ use query::{ exec::fieldlist::FieldList, exec::seriesset::{Error as SeriesSetError, SeriesSetItem}, - frontend::{influxrpc::InfluxRPCPlanner, sql::SQLQueryPlanner}, + frontend::influxrpc::InfluxRPCPlanner, group_by::GroupByAndAggregate, predicate::PredicateBuilder, Database, DatabaseStore, }; -use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; -use std::{collections::HashMap, pin::Pin, sync::Arc}; +use std::{collections::HashMap, sync::Arc}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status, Streaming}; +use tonic::Status; use tracing::{error, info, warn}; #[derive(Debug, Snafu)] @@ -169,30 +154,6 @@ pub enum Error { #[snafu(display("Operation not yet implemented: {}", operation))] NotYetImplemented { operation: String }, - - #[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))] - InvalidTicket { - source: std::string::FromUtf8Error, - ticket: Vec, - }, - - #[snafu(display("Invalid query, could not parse '{}': {}", query, source))] - InvalidQuery { - query: String, - source: serde_json::Error, - }, - - #[snafu(display("Internal error mapping org & bucket: {}", source))] - BucketMappingError { source: OrgBucketMappingError }, - - #[snafu(display("Bucket {} not found in org {}", bucket, org))] - BucketNotFound { org: String, bucket: String }, - - #[snafu(display("Internal error reading points from database {}: {}", db_name, source))] - Query { - db_name: String, - source: Box, - }, } pub type Result = std::result::Result; @@ -238,11 +199,6 @@ impl Error { Self::SendingResults { .. } => Status::internal(self.to_string()), Self::InternalHintsFieldNotSupported { .. } => Status::internal(self.to_string()), Self::NotYetImplemented { .. } => Status::internal(self.to_string()), - Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()), - Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()), - Self::BucketMappingError { .. } => Status::internal(self.to_string()), - Self::BucketNotFound { .. } => Status::not_found(self.to_string()), - Self::Query { .. } => Status::internal(self.to_string()), } } } @@ -771,151 +727,6 @@ where } } -type TonicStream = Pin> + Send + Sync + 'static>>; - -#[derive(Deserialize, Debug)] -/// Body of the `Ticket` serialized and sent to the do_get endpoint; this should -/// be shared with the read API probably... -struct ReadInfo { - org: String, - bucket: String, - sql_query: String, -} - -#[tonic::async_trait] -impl FlightService for GrpcService -where - T: DatabaseStore + 'static, -{ - type HandshakeStream = TonicStream; - type ListFlightsStream = TonicStream; - type DoGetStream = TonicStream; - type DoPutStream = TonicStream; - type DoActionStream = TonicStream; - type ListActionsStream = TonicStream; - type DoExchangeStream = TonicStream; - - async fn get_schema( - &self, - _request: Request, - ) -> Result, Status> { - Err(Status::unimplemented("Not yet implemented")) - } - - async fn do_get( - &self, - request: Request, - ) -> Result, Status> { - let ticket = request.into_inner(); - let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket { - ticket: ticket.ticket, - })?; - - let read_info: ReadInfo = - serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?; - - let db_name = org_and_bucket_to_database(&read_info.org, &read_info.bucket) - .context(BucketMappingError)?; - let db = self.db_store.db(&db_name).await.context(BucketNotFound { - org: read_info.org.clone(), - bucket: read_info.bucket.clone(), - })?; - - let planner = SQLQueryPlanner::default(); - let executor = self.db_store.executor(); - - let physical_plan = planner - .query(&*db, &read_info.sql_query, &executor) - .await - .unwrap(); - - // execute the query - let results = collect(physical_plan.clone()) - .await - .map_err(|e| Box::new(e) as _) - .context(Query { db_name })?; - if results.is_empty() { - return Err(Status::internal("There were no results from ticket")); - } - - let options = arrow::ipc::writer::IpcWriteOptions::default(); - let schema = physical_plan.schema(); - let schema_flight_data = - arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options); - - let mut flights: Vec> = vec![Ok(schema_flight_data)]; - - let mut batches: Vec> = results - .iter() - .flat_map(|batch| { - let (flight_dictionaries, flight_batch) = - arrow_flight::utils::flight_data_from_arrow_batch(batch, &options); - flight_dictionaries - .into_iter() - .chain(std::iter::once(flight_batch)) - .map(Ok) - }) - .collect(); - - // append batch vector to schema vector, so that the first message sent is the - // schema - flights.append(&mut batches); - - let output = futures::stream::iter(flights); - - Ok(Response::new(Box::pin(output) as Self::DoGetStream)) - } - - async fn handshake( - &self, - _request: Request>, - ) -> Result, Status> { - Err(Status::unimplemented("Not yet implemented")) - } - - async fn list_flights( - &self, - _request: Request, - ) -> Result, Status> { - Err(Status::unimplemented("Not yet implemented")) - } - - async fn get_flight_info( - &self, - _request: Request, - ) -> Result, Status> { - Err(Status::unimplemented("Not yet implemented")) - } - - async fn do_put( - &self, - _request: Request>, - ) -> Result, Status> { - Err(Status::unimplemented("Not yet implemented")) - } - - async fn do_action( - &self, - _request: Request, - ) -> Result, Status> { - Err(Status::unimplemented("Not yet implemented")) - } - - async fn list_actions( - &self, - _request: Request, - ) -> Result, Status> { - Err(Status::unimplemented("Not yet implemented")) - } - - async fn do_exchange( - &self, - _request: Request>, - ) -> Result, Status> { - Err(Status::unimplemented("Not yet implemented")) - } -} - trait SetRange { /// sets the timestamp range to range, if present fn set_range(self, range: Option) -> Self; From b0a3ac1a65b0dd42ac8630fbdcb9ecacf74cfe2f Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 1 Feb 2021 14:05:57 -0500 Subject: [PATCH 08/10] docs: Leave a TODO note --- src/influxdb_ioxd/flight.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/influxdb_ioxd/flight.rs b/src/influxdb_ioxd/flight.rs index 726ee1deb0..9f2799dc71 100644 --- a/src/influxdb_ioxd/flight.rs +++ b/src/influxdb_ioxd/flight.rs @@ -99,6 +99,8 @@ where Err(tonic::Status::unimplemented("Not yet implemented")) } + // TODO: Stream results back directly by using `execute` instead of `collect` + // https://docs.rs/datafusion/3.0.0/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.execute async fn do_get( &self, request: Request, From fc091041c5991452a74b05d68ab444c68235ad3b Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 1 Feb 2021 14:11:57 -0500 Subject: [PATCH 09/10] fix: Propagate query planning error instead of unwrapping --- src/influxdb_ioxd/flight.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/influxdb_ioxd/flight.rs b/src/influxdb_ioxd/flight.rs index 9f2799dc71..6f3a7701af 100644 --- a/src/influxdb_ioxd/flight.rs +++ b/src/influxdb_ioxd/flight.rs @@ -42,6 +42,12 @@ pub enum Error { db_name: String, source: Box, }, + + #[snafu(display("Error planning query {}: {}", query, source))] + PlanningSQLQuery { + query: String, + source: query::frontend::sql::Error, + }, } impl From for tonic::Status { @@ -64,6 +70,7 @@ impl Error { Self::BucketMappingError { .. } => Status::internal(self.to_string()), Self::BucketNotFound { .. } => Status::not_found(self.to_string()), Self::Query { .. } => Status::internal(self.to_string()), + Self::PlanningSQLQuery { .. } => Status::invalid_argument(self.to_string()), } } } @@ -126,7 +133,9 @@ where let physical_plan = planner .query(&*db, &read_info.sql_query, &executor) .await - .unwrap(); + .context(PlanningSQLQuery { + query: &read_info.sql_query, + })?; // execute the query let results = collect(physical_plan.clone()) From 1f1ebefd7a5071aaf5a72f9c0cda7989151d1e96 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 3 Feb 2021 09:56:11 -0500 Subject: [PATCH 10/10] fix: Switch to database name instead of org/bucket for the Flight API --- src/influxdb_ioxd/flight.rs | 39 ++++++++++++++-------------- tests/end_to_end_cases/flight_api.rs | 6 ++--- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/src/influxdb_ioxd/flight.rs b/src/influxdb_ioxd/flight.rs index 6f3a7701af..ce77862a87 100644 --- a/src/influxdb_ioxd/flight.rs +++ b/src/influxdb_ioxd/flight.rs @@ -9,7 +9,6 @@ use arrow_deps::{ }, datafusion::physical_plan::collect, }; -use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError}; use futures::Stream; use query::{frontend::sql::SQLQueryPlanner, DatabaseStore}; use serde::Deserialize; @@ -31,15 +30,16 @@ pub enum Error { source: serde_json::Error, }, - #[snafu(display("Internal error mapping org & bucket: {}", source))] - BucketMappingError { source: OrgBucketMappingError }, + #[snafu(display("Database {} not found", database_name))] + DatabaseNotFound { database_name: String }, - #[snafu(display("Bucket {} not found in org {}", bucket, org))] - BucketNotFound { org: String, bucket: String }, - - #[snafu(display("Internal error reading points from database {}: {}", db_name, source))] + #[snafu(display( + "Internal error reading points from database {}: {}", + database_name, + source + ))] Query { - db_name: String, + database_name: String, source: Box, }, @@ -67,8 +67,7 @@ impl Error { match &self { Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()), Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()), - Self::BucketMappingError { .. } => Status::internal(self.to_string()), - Self::BucketNotFound { .. } => Status::not_found(self.to_string()), + Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()), Self::Query { .. } => Status::internal(self.to_string()), Self::PlanningSQLQuery { .. } => Status::invalid_argument(self.to_string()), } @@ -81,8 +80,7 @@ type TonicStream = Pin> + Send /// Body of the `Ticket` serialized and sent to the do_get endpoint; this should /// be shared with the read API probably... struct ReadInfo { - org: String, - bucket: String, + database_name: String, sql_query: String, } @@ -120,12 +118,13 @@ where let read_info: ReadInfo = serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?; - let db_name = org_and_bucket_to_database(&read_info.org, &read_info.bucket) - .context(BucketMappingError)?; - let db = self.db_store.db(&db_name).await.context(BucketNotFound { - org: read_info.org.clone(), - bucket: read_info.bucket.clone(), - })?; + let db = self + .db_store + .db(&read_info.database_name) + .await + .context(DatabaseNotFound { + database_name: &read_info.database_name, + })?; let planner = SQLQueryPlanner::default(); let executor = self.db_store.executor(); @@ -141,7 +140,9 @@ where let results = collect(physical_plan.clone()) .await .map_err(|e| Box::new(e) as _) - .context(Query { db_name })?; + .context(Query { + database_name: &read_info.database_name, + })?; if results.is_empty() { return Err(tonic::Status::internal("There were no results from ticket")); } diff --git a/tests/end_to_end_cases/flight_api.rs b/tests/end_to_end_cases/flight_api.rs index b55d19a74b..fb90a64386 100644 --- a/tests/end_to_end_cases/flight_api.rs +++ b/tests/end_to_end_cases/flight_api.rs @@ -16,8 +16,7 @@ use std::{convert::TryFrom, sync::Arc}; // TODO: this should be shared #[derive(Serialize, Debug)] struct ReadInfo { - org: String, - bucket: String, + database_name: String, sql_query: String, } @@ -25,8 +24,7 @@ pub async fn test(scenario: &Scenario, sql_query: &str, expected_read_data: &[St let mut flight_client = FlightServiceClient::connect(GRPC_URL_BASE).await.unwrap(); let query = ReadInfo { - org: scenario.org_id_str().into(), - bucket: scenario.bucket_id_str().into(), + database_name: scenario.database_name().into(), sql_query: sql_query.into(), };