diff --git a/Cargo.lock b/Cargo.lock
index 3e6ad08cc8..3ccc5bee00 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -121,11 +121,28 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "arrow-flight"
+version = "4.0.0-SNAPSHOT"
+source = "git+https://github.com/apache/arrow.git?rev=a7ae73be24f6653c2da8d0d37dd9686711eea974#a7ae73be24f6653c2da8d0d37dd9686711eea974"
+dependencies = [
+ "arrow",
+ "bytes",
+ "futures",
+ "proc-macro2",
+ "prost",
+ "prost-derive",
+ "tokio",
+ "tonic",
+ "tonic-build",
+]
+
 [[package]]
 name = "arrow_deps"
 version = "0.1.0"
 dependencies = [
  "arrow",
+ "arrow-flight",
  "datafusion",
  "parquet",
 ]
diff --git a/arrow_deps/Cargo.toml b/arrow_deps/Cargo.toml
index ab598e3523..1b9a8a8c6e 100644
--- a/arrow_deps/Cargo.toml
+++ b/arrow_deps/Cargo.toml
@@ -14,6 +14,7 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx
 # The version can be found here: https://github.com/apache/arrow/commit/a7ae73be24f6653c2da8d0d37dd9686711eea974
 #
 arrow = { git = "https://github.com/apache/arrow.git", rev = "a7ae73be24f6653c2da8d0d37dd9686711eea974" , features = ["simd"] }
+arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "a7ae73be24f6653c2da8d0d37dd9686711eea974" }
 datafusion = { git = "https://github.com/apache/arrow.git", rev = "a7ae73be24f6653c2da8d0d37dd9686711eea974" }
 # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
 # and we're not currently using it anyway
diff --git a/arrow_deps/src/lib.rs b/arrow_deps/src/lib.rs
index 9e575f0718..05fffedcb3 100644
--- a/arrow_deps/src/lib.rs
+++ b/arrow_deps/src/lib.rs
@@ -5,6 +5,7 @@
 // export arrow, parquet, and datafusion publically so we can have a single
 // reference in cargo
 pub use arrow;
+pub use arrow_flight;
 pub use datafusion;
 pub use parquet;
 
diff --git a/arrow_deps/src/test_util.rs b/arrow_deps/src/test_util.rs
index ed7fcfe2fd..11c17507f4 100644
--- a/arrow_deps/src/test_util.rs
+++ b/arrow_deps/src/test_util.rs
@@ -14,7 +14,8 @@ use crate::arrow::compute::kernels::sort::{lexsort, SortColumn, SortOptions};
 
 #[macro_export]
 macro_rules! assert_table_eq {
     ($EXPECTED_LINES: expr, $CHUNKS: expr) => {
-        let expected_lines: Vec<String> = $EXPECTED_LINES.iter().map(|&s| s.into()).collect();
+        let expected_lines: Vec<String> =
+            $EXPECTED_LINES.into_iter().map(|s| s.to_string()).collect();
 
         let formatted = arrow_deps::arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
diff --git a/server/src/db.rs b/server/src/db.rs
index 08b03e1cf8..8d813f6c63 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -452,14 +452,14 @@ mod tests {
             "+-----+------+",
         ];
         let batches = run_query(&db, "select * from cpu").await;
-        assert_table_eq!(expected, &batches);
+        assert_table_eq!(&expected, &batches);
 
         // And expect that we still get the same thing when data is rolled over again
         let chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
         assert_eq!(chunk.id(), 1);
 
         let batches = run_query(&db, "select * from cpu").await;
-        assert_table_eq!(expected, &batches);
+        assert_table_eq!(&expected, &batches);
     }
 
     #[tokio::test]
@@ -495,7 +495,7 @@ mod tests {
             "+-----+------+",
         ];
         let batches = run_query(&db, "select * from cpu").await;
-        assert_table_eq!(expected, &batches);
+        assert_table_eq!(&expected, &batches);
 
         // now, drop the mutable buffer chunk and results should still be the same
         db.drop_mutable_buffer_chunk(partition_key, mb_chunk.id())
@@ -506,7 +506,7 @@
         assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]);
 
         let batches = run_query(&db, "select * from cpu").await;
-        assert_table_eq!(expected, &batches);
+        assert_table_eq!(&expected, &batches);
 
         // drop, the chunk from the read buffer
         db.drop_read_buffer_chunk(partition_key, mb_chunk.id())
diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs
index b7c5849ec1..7ca66ea5c8 100644
--- a/src/influxdb_ioxd.rs
+++ b/src/influxdb_ioxd.rs
@@ -5,6 +5,7 @@ use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::sync::Arc;
 
+mod flight;
 pub mod http_routes;
 pub mod rpc;
 
@@ -66,7 +67,7 @@
     ServingHttp { source: hyper::Error },
 
     #[snafu(display("Error serving RPC: {}", source))]
-    ServingRPC { source: self::rpc::service::Error },
+    ServingRPC { source: self::rpc::Error },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -134,7 +135,7 @@ pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result
         .await
         .context(StartListeningGrpc { grpc_bind_addr })?;
 
-    let grpc_server = self::rpc::service::make_server(socket, app_server.clone());
+    let grpc_server = self::rpc::make_server(socket, app_server.clone());
 
     info!(bind_address=?grpc_bind_addr, "gRPC server listening");
diff --git a/src/influxdb_ioxd/flight.rs b/src/influxdb_ioxd/flight.rs
new file mode 100644
index 0000000000..ce77862a87
--- /dev/null
+++ b/src/influxdb_ioxd/flight.rs
@@ -0,0 +1,226 @@
+use super::rpc::GrpcService;
+
+use arrow_deps::{
+    arrow,
+    arrow_flight::{
+        self, flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
+        FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult,
+        SchemaResult, Ticket,
+    },
+    datafusion::physical_plan::collect,
+};
+use futures::Stream;
+use query::{frontend::sql::SQLQueryPlanner, DatabaseStore};
+use serde::Deserialize;
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::pin::Pin;
+use tonic::{Request, Response, Streaming};
+use tracing::error;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))]
+    InvalidTicket {
+        source: std::string::FromUtf8Error,
+        ticket: Vec<u8>,
+    },
+    #[snafu(display("Invalid query, could not parse '{}': {}", query, source))]
+    InvalidQuery {
+        query: String,
+        source: serde_json::Error,
+    },
+
+    #[snafu(display("Database {} not found", database_name))]
+    DatabaseNotFound { database_name: String },
+
+    #[snafu(display(
+        "Internal error reading points from database {}: {}",
+        database_name,
+        source
+    ))]
+    Query {
+        database_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
+    #[snafu(display("Error planning query {}: {}", query, source))]
+    PlanningSQLQuery {
+        query: String,
+        source: query::frontend::sql::Error,
+    },
+}
+
+impl From<Error> for tonic::Status {
+    /// Converts a result from the business logic into the appropriate tonic
+    /// status
+    fn from(err: Error) -> Self {
+        error!("Error handling Flight gRPC request: {}", err);
+        err.to_status()
+    }
+}
+
+impl Error {
+    /// Converts a result from the business logic into the appropriate tonic
+    /// status
+    fn to_status(&self) -> tonic::Status {
+        use tonic::Status;
+        match &self {
+            Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()),
+            Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()),
+            Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()),
+            Self::Query { .. } => Status::internal(self.to_string()),
+            Self::PlanningSQLQuery { .. } => Status::invalid_argument(self.to_string()),
+        }
+    }
+}
+
+type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + Sync + 'static>>;
+
+#[derive(Deserialize, Debug)]
+/// Body of the `Ticket` serialized and sent to the do_get endpoint; this should
+/// be shared with the read API probably...
+struct ReadInfo {
+    database_name: String,
+    sql_query: String,
+}
+
+#[tonic::async_trait]
+impl<T> FlightService for GrpcService<T>
+where
+    T: DatabaseStore + 'static,
+{
+    type HandshakeStream = TonicStream<HandshakeResponse>;
+    type ListFlightsStream = TonicStream<FlightInfo>;
+    type DoGetStream = TonicStream<FlightData>;
+    type DoPutStream = TonicStream<PutResult>;
+    type DoActionStream = TonicStream<arrow_flight::Result>;
+    type ListActionsStream = TonicStream<ActionType>;
+    type DoExchangeStream = TonicStream<FlightData>;
+
+    async fn get_schema(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<SchemaResult>, tonic::Status> {
+        Err(tonic::Status::unimplemented("Not yet implemented"))
+    }
+
+    // TODO: Stream results back directly by using `execute` instead of `collect`
+    // https://docs.rs/datafusion/3.0.0/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.execute
+    async fn do_get(
+        &self,
+        request: Request<Ticket>,
+    ) -> Result<Response<Self::DoGetStream>, tonic::Status> {
+        let ticket = request.into_inner();
+        let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket {
+            ticket: ticket.ticket,
+        })?;
+
+        let read_info: ReadInfo =
+            serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?;
+
+        let db = self
+            .db_store
+            .db(&read_info.database_name)
+            .await
+            .context(DatabaseNotFound {
+                database_name: &read_info.database_name,
+            })?;
+
+        let planner = SQLQueryPlanner::default();
+        let executor = self.db_store.executor();
+
+        let physical_plan = planner
+            .query(&*db, &read_info.sql_query, &executor)
+            .await
+            .context(PlanningSQLQuery {
+                query: &read_info.sql_query,
+            })?;
+
+        // execute the query
+        let results = collect(physical_plan.clone())
+            .await
+            .map_err(|e| Box::new(e) as _)
+            .context(Query {
+                database_name: &read_info.database_name,
+            })?;
+        if results.is_empty() {
+            return Err(tonic::Status::internal("There were no results from ticket"));
+        }
+
+        let options = arrow::ipc::writer::IpcWriteOptions::default();
+        let schema = physical_plan.schema();
+        let schema_flight_data =
+            arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options);
+
+        let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)];
+
+        let mut batches: Vec<Result<FlightData, tonic::Status>> = results
+            .iter()
+            .flat_map(|batch| {
+                let (flight_dictionaries, flight_batch) =
+                    arrow_flight::utils::flight_data_from_arrow_batch(batch, &options);
+                flight_dictionaries
+                    .into_iter()
+                    .chain(std::iter::once(flight_batch))
+                    .map(Ok)
+            })
+            .collect();
+
+        // append batch vector to schema vector, so that the first message sent is the
+        // schema
+        flights.append(&mut batches);
+
+        let output = futures::stream::iter(flights);
+
+        Ok(Response::new(Box::pin(output) as Self::DoGetStream))
+    }
+
+    async fn handshake(
+        &self,
+        _request: Request<Streaming<HandshakeRequest>>,
+    ) -> Result<Response<Self::HandshakeStream>, tonic::Status> {
+        Err(tonic::Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn list_flights(
+        &self,
+        _request: Request<Criteria>,
+    ) -> Result<Response<Self::ListFlightsStream>, tonic::Status> {
+        Err(tonic::Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn get_flight_info(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<FlightInfo>, tonic::Status> {
+        Err(tonic::Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn do_put(
+        &self,
+        _request: Request<Streaming<FlightData>>,
+    ) -> Result<Response<Self::DoPutStream>, tonic::Status> {
+        Err(tonic::Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn do_action(
+        &self,
+        _request: Request<Action>,
+    ) -> Result<Response<Self::DoActionStream>, tonic::Status> {
+        Err(tonic::Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn list_actions(
+        &self,
+        _request: Request<Empty>,
+    ) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
+        Err(tonic::Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn do_exchange(
+        &self,
+        _request: Request<Streaming<FlightData>>,
+    ) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
+        Err(tonic::Status::unimplemented("Not yet implemented"))
+    }
+}
diff --git a/src/influxdb_ioxd/rpc.rs b/src/influxdb_ioxd/rpc.rs
index 223e345c29..7bcea2eb70 100644
--- a/src/influxdb_ioxd/rpc.rs
+++ b/src/influxdb_ioxd/rpc.rs
@@ -13,3 +13,55 @@ pub mod expr;
 pub mod id;
 pub mod input;
 pub mod service;
+
+use arrow_deps::arrow_flight::flight_service_server::FlightServiceServer;
+use data_types::error::ErrorLogger;
+use generated_types::{i_ox_testing_server::IOxTestingServer, storage_server::StorageServer};
+use query::DatabaseStore;
+use snafu::{ResultExt, Snafu};
+use std::sync::Arc;
+use tokio::net::TcpListener;
+use tokio_stream::wrappers::TcpListenerStream;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("gRPC server error: {}", source))]
+    ServerError { source: tonic::transport::Error },
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+/// Instantiate a server listening on the specified address,
+/// implementing the IOx, Storage, and Flight gRPC interfaces on the
+/// underlying hyper server instance. Resolves when the server has
+/// shut down.
+pub async fn make_server<T>(socket: TcpListener, storage: Arc<T>) -> Result<()>
+where
+    T: DatabaseStore + 'static,
+{
+    let stream = TcpListenerStream::new(socket);
+
+    tonic::transport::Server::builder()
+        .add_service(IOxTestingServer::new(GrpcService::new(storage.clone())))
+        .add_service(StorageServer::new(GrpcService::new(storage.clone())))
+        .add_service(FlightServiceServer::new(GrpcService::new(storage)))
+        .serve_with_incoming(stream)
+        .await
+        .context(ServerError {})
+        .log_if_error("Running Tonic Server")
+}
+
+#[derive(Debug)]
+pub struct GrpcService<T> {
+    pub db_store: Arc<T>,
+}
+
+impl<T> GrpcService<T>
+where
+    T: DatabaseStore + 'static,
+{
+    /// Create a new GrpcService connected to `db_store`
+    pub fn new(db_store: Arc<T>) -> Self {
+        Self { db_store }
+    }
+}
diff --git a/src/influxdb_ioxd/rpc/service.rs b/src/influxdb_ioxd/rpc/service.rs
index 5d7d875911..1c8384d6f6 100644
--- a/src/influxdb_ioxd/rpc/service.rs
+++ b/src/influxdb_ioxd/rpc/service.rs
@@ -2,52 +2,41 @@
 //! implemented in terms of the `query::Database` and
 //! `query::DatabaseStore`
 
-use std::{collections::HashMap, sync::Arc};
-
-use generated_types::{
-    i_ox_testing_server::{IOxTesting, IOxTestingServer},
-    storage_server::{Storage, StorageServer},
-    CapabilitiesResponse, Capability, Int64ValuesResponse, MeasurementFieldsRequest,
-    MeasurementFieldsResponse, MeasurementNamesRequest, MeasurementTagKeysRequest,
-    MeasurementTagValuesRequest, Predicate, ReadFilterRequest, ReadGroupRequest, ReadResponse,
-    ReadSeriesCardinalityRequest, ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest,
-    TagValuesRequest, TestErrorRequest, TestErrorResponse, TimestampRange,
+use super::{
+    data::{
+        fieldlist_to_measurement_fields_response, series_set_item_to_read_response,
+        tag_keys_to_byte_vecs,
+    },
+    expr::{self, AddRPCNode, Loggable, SpecialTagKeys},
+    input::GrpcInputs,
+    GrpcService,
+};
+use data_types::{error::ErrorLogger, names::org_and_bucket_to_database, DatabaseName};
+use generated_types::{
+    i_ox_testing_server::IOxTesting, storage_server::Storage, CapabilitiesResponse, Capability,
+    Int64ValuesResponse, MeasurementFieldsRequest, MeasurementFieldsResponse,
+    MeasurementNamesRequest, MeasurementTagKeysRequest, MeasurementTagValuesRequest, Predicate,
+    ReadFilterRequest, ReadGroupRequest, ReadResponse, ReadSeriesCardinalityRequest,
+    ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest, TagValuesRequest,
+    TestErrorRequest, TestErrorResponse, TimestampRange,
 };
-
-use data_types::error::ErrorLogger;
-
-use query::group_by::GroupByAndAggregate;
-use query::{exec::fieldlist::FieldList, frontend::influxrpc::InfluxRPCPlanner};
-use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
-
-use super::expr::{self, AddRPCNode, Loggable, SpecialTagKeys};
-use super::input::GrpcInputs;
-use data_types::names::org_and_bucket_to_database;
-
-use data_types::DatabaseName;
-
 use query::{
+    exec::fieldlist::FieldList,
     exec::seriesset::{Error as SeriesSetError, SeriesSetItem},
+    frontend::influxrpc::InfluxRPCPlanner,
+    group_by::GroupByAndAggregate,
     predicate::PredicateBuilder,
     Database, DatabaseStore,
 };
-
 use snafu::{OptionExt, ResultExt, Snafu};
-
-use tokio::{net::TcpListener, sync::mpsc};
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
 use tonic::Status;
 use tracing::{error, info, warn};
 
-use super::data::{
-    fieldlist_to_measurement_fields_response, series_set_item_to_read_response,
-    tag_keys_to_byte_vecs,
-};
-
 #[derive(Debug, Snafu)]
 pub enum Error {
-    #[snafu(display("gRPC server error: {}", source))]
-    ServerError { source: tonic::transport::Error },
-
     #[snafu(display("Database not found: {}", db_name))]
     DatabaseNotFound { db_name: String },
 
@@ -183,7 +172,6 @@ impl Error {
     /// status
     fn to_status(&self) -> tonic::Status {
         match &self {
-            Self::ServerError { .. } => Status::internal(self.to_string()),
             Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()),
             Self::ListingTables { .. } => Status::internal(self.to_string()),
             Self::ListingColumns { .. } => {
@@ -215,21 +203,6 @@ impl Error {
     }
 }
 
-#[derive(Debug)]
-pub struct GrpcService<T> {
-    db_store: Arc<T>,
-}
-
-impl<T> GrpcService<T>
-where
-    T: DatabaseStore + 'static,
-{
-    /// Create a new GrpcService connected to `db_store`
-    pub fn new(db_store: Arc<T>) -> Self {
-        Self { db_store }
-    }
-}
-
 #[tonic::async_trait]
 /// Implements the protobuf defined IOx rpc service for a DatabaseStore
 impl<T> IOxTesting for GrpcService<T>
 where
@@ -1134,25 +1107,6 @@ where
     Ok(field_list)
 }
 
-/// Instantiate a server listening on the specified address
-/// implementing the IOx and Storage gRPC interfaces, the
-/// underlying hyper server instance. Resolves when the server has
-/// shutdown.
-pub async fn make_server<T>(socket: TcpListener, storage: Arc<T>) -> Result<()>
-where
-    T: DatabaseStore + 'static,
-{
-    let stream = TcpListenerStream::new(socket);
-
-    tonic::transport::Server::builder()
-        .add_service(IOxTestingServer::new(GrpcService::new(storage.clone())))
-        .add_service(StorageServer::new(GrpcService::new(storage.clone())))
-        .serve_with_incoming(stream)
-        .await
-        .context(ServerError {})
-        .log_if_error("Running Tonic Server")
-}
-
 #[cfg(test)]
 mod tests {
     use super::super::id::ID;
@@ -2606,7 +2560,7 @@
     println!("Starting InfluxDB IOx rpc test server on {:?}", bind_addr);
 
-    let server = make_server(socket, test_storage.clone());
+    let server = super::super::make_server(socket, test_storage.clone());
     tokio::task::spawn(server);
 
     let iox_client = connect_to_server::<IOxTestingClient>(bind_addr)
diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs
index 2d74e63894..350d1f90a2 100644
--- a/tests/end-to-end.rs
+++ b/tests/end-to-end.rs
@@ -62,6 +62,7 @@ async fn read_and_write_data() {
     read_api::test(&http_client, &scenario, sql_query, &expected_read_data).await;
     grpc_api::test(&mut storage_client, &scenario).await;
+    flight_api::test(&scenario, sql_query, &expected_read_data).await;
 }
 
 // These tests manage their own data
diff --git a/tests/end_to_end_cases/flight_api.rs b/tests/end_to_end_cases/flight_api.rs
new file mode 100644
index 0000000000..fb90a64386
--- /dev/null
+++ b/tests/end_to_end_cases/flight_api.rs
@@ -0,0 +1,70 @@
+use crate::{Scenario, GRPC_URL_BASE};
+use arrow_deps::{
+    arrow::{
+        datatypes::Schema,
+        ipc::{self, reader},
+    },
+    arrow_flight::{
+        flight_service_client::FlightServiceClient, utils::flight_data_to_arrow_batch, Ticket,
+    },
+    assert_table_eq,
+};
+use futures::prelude::*;
+use serde::Serialize;
+use std::{convert::TryFrom, sync::Arc};
+
+// TODO: this should be shared
+#[derive(Serialize, Debug)]
+struct ReadInfo {
+    database_name: String,
+    sql_query: String,
+}
+
+pub async fn test(scenario: &Scenario, sql_query: &str, expected_read_data: &[String]) {
+    let mut flight_client = FlightServiceClient::connect(GRPC_URL_BASE).await.unwrap();
+
+    let query = ReadInfo {
+        database_name: scenario.database_name().into(),
+        sql_query: sql_query.into(),
+    };
+
+    let t = Ticket {
+        ticket: serde_json::to_string(&query).unwrap().into(),
+    };
+    let mut response = flight_client.do_get(t).await.unwrap().into_inner();
+
+    let flight_data_schema = response.next().await.unwrap().unwrap();
+    let schema = Arc::new(Schema::try_from(&flight_data_schema).unwrap());
+
+    let mut dictionaries_by_field = vec![None; schema.fields().len()];
+
+    let mut batches = vec![];
+
+    while let Some(data) = response.next().await {
+        let mut data = data.unwrap();
+        let mut message =
+            ipc::root_as_message(&data.data_header[..]).expect("Error parsing first message");
+
+        while message.header_type() == ipc::MessageHeader::DictionaryBatch {
+            reader::read_dictionary(
+                &data.data_body,
+                message
+                    .header_as_dictionary_batch()
+                    .expect("Error parsing dictionary"),
+                &schema,
+                &mut dictionaries_by_field,
+            )
+            .expect("Error reading dictionary");
+
+            data = response.next().await.unwrap().ok().unwrap();
+            message = ipc::root_as_message(&data.data_header[..]).expect("Error parsing message");
+        }
+
+        batches.push(
+            flight_data_to_arrow_batch(&data, schema.clone(), &dictionaries_by_field)
+                .expect("Unable to convert flight data to Arrow batch"),
+        );
+    }
+
+    assert_table_eq!(expected_read_data, &batches);
+}
diff --git a/tests/end_to_end_cases/mod.rs b/tests/end_to_end_cases/mod.rs
index 3c81319465..a0e01d8bc2 100644
--- a/tests/end_to_end_cases/mod.rs
+++ b/tests/end_to_end_cases/mod.rs
@@ -1,2 +1,3 @@
+pub mod flight_api;
 pub mod grpc_api;
 pub mod read_api;
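---

Usage sketch (illustrative only, not part of the patch): the new Flight endpoint
accepts a `Ticket` whose payload is the JSON-serialized `ReadInfo` struct defined
in flight.rs above. A minimal standalone client could look like the following.
The endpoint address `http://localhost:8082` and the database name `mydb` are
placeholder assumptions, and dictionary batches are skipped for brevity; see
flight_api.rs above for the full handling.

    use arrow_deps::arrow::datatypes::Schema;
    use arrow_deps::arrow_flight::{
        flight_service_client::FlightServiceClient, utils::flight_data_to_arrow_batch, Ticket,
    };
    use futures::StreamExt;
    use std::{convert::TryFrom, sync::Arc};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder address; point this at a running IOx gRPC endpoint.
        let mut client = FlightServiceClient::connect("http://localhost:8082").await?;

        // The ticket body is the JSON form of the server's ReadInfo struct.
        let ticket = Ticket {
            ticket: r#"{"database_name": "mydb", "sql_query": "select * from cpu"}"#.into(),
        };

        let mut stream = client.do_get(ticket).await?.into_inner();

        // The first FlightData message carries the schema; the rest carry record batches.
        let schema_data = stream.next().await.expect("no schema message")?;
        let schema = Arc::new(Schema::try_from(&schema_data)?);

        // This sketch assumes no dictionary-encoded columns, so all slots stay None.
        let dictionaries_by_field = vec![None; schema.fields().len()];

        while let Some(data) = stream.next().await {
            let batch =
                flight_data_to_arrow_batch(&data?, schema.clone(), &dictionaries_by_field)?;
            println!("read a batch with {} rows", batch.num_rows());
        }

        Ok(())
    }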