Merge pull request #712 from influxdata/jg/flight-server

Carol (Nichols || Goulding) 2021-02-03 10:25:51 -05:00 committed by GitHub
commit 4b524dbb35
12 changed files with 402 additions and 77 deletions

Cargo.lock

@@ -121,11 +121,28 @@ dependencies = [
"serde_json",
]
[[package]]
name = "arrow-flight"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=a7ae73be24f6653c2da8d0d37dd9686711eea974#a7ae73be24f6653c2da8d0d37dd9686711eea974"
dependencies = [
"arrow",
"bytes",
"futures",
"proc-macro2",
"prost",
"prost-derive",
"tokio",
"tonic",
"tonic-build",
]
[[package]]
name = "arrow_deps"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"datafusion",
"parquet",
]


@@ -14,6 +14,7 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx"
# The version can be found here: https://github.com/apache/arrow/commit/a7ae73be24f6653c2da8d0d37dd9686711eea974
#
arrow = { git = "https://github.com/apache/arrow.git", rev = "a7ae73be24f6653c2da8d0d37dd9686711eea974" , features = ["simd"] }
arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "a7ae73be24f6653c2da8d0d37dd9686711eea974" }
datafusion = { git = "https://github.com/apache/arrow.git", rev = "a7ae73be24f6653c2da8d0d37dd9686711eea974" }
# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
# and we're not currently using it anyway


@@ -5,6 +5,7 @@
// export arrow, arrow-flight, parquet, and datafusion publicly so we can
// have a single reference in cargo
pub use arrow;
pub use arrow_flight;
pub use datafusion;
pub use parquet;
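
With `arrow_flight` re-exported next to `arrow`, `datafusion`, and `parquet`, downstream crates keep a single pinned reference to the Arrow revision. A minimal sketch of how a consumer crate picks up the re-exports (illustrative only; the paths shown all appear elsewhere in this diff):

```rust
// Everything Arrow-flavored comes in through arrow_deps, so the git
// revision pinned in arrow_deps/Cargo.toml is the single source of
// truth for the whole workspace.
use arrow_deps::{
    arrow::record_batch::RecordBatch,
    arrow_flight::FlightData,
    datafusion::physical_plan::collect,
};
```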


@@ -14,7 +14,8 @@ use crate::arrow::compute::kernels::sort::{lexsort, SortColumn, SortOptions};
#[macro_export]
macro_rules! assert_table_eq {
($EXPECTED_LINES: expr, $CHUNKS: expr) => {
let expected_lines: Vec<String> = $EXPECTED_LINES.iter().map(|&s| s.into()).collect();
let expected_lines: Vec<String> =
$EXPECTED_LINES.into_iter().map(|s| s.to_string()).collect();
let formatted = arrow_deps::arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
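
The reworked macro takes the expected lines by reference (`into_iter()` over a borrowed collection yields references, and `to_string()` absorbs the extra indirection), so callers now pass `&expected` and the vector is no longer moved into the assertion. A hedged sketch of the resulting usage (the table contents are placeholders):

```rust
let expected = vec![
    "+-----+------+",
    "| cpu | user |",
    "+-----+------+",
];
// Borrowed rather than consumed: the same expected lines can back
// several assertions, e.g. before and after a partition rollover.
assert_table_eq!(&expected, &batches_before);
assert_table_eq!(&expected, &batches_after);
```

This is exactly the pattern the `db.rs` test changes below switch to.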


@@ -452,14 +452,14 @@ mod tests {
"+-----+------+",
];
let batches = run_query(&db, "select * from cpu").await;
assert_table_eq!(expected, &batches);
assert_table_eq!(&expected, &batches);
// And expect that we still get the same thing when data is rolled over again
let chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
assert_eq!(chunk.id(), 1);
let batches = run_query(&db, "select * from cpu").await;
assert_table_eq!(expected, &batches);
assert_table_eq!(&expected, &batches);
}
#[tokio::test]
@@ -495,7 +495,7 @@ mod tests {
"+-----+------+",
];
let batches = run_query(&db, "select * from cpu").await;
assert_table_eq!(expected, &batches);
assert_table_eq!(&expected, &batches);
// now, drop the mutable buffer chunk and results should still be the same
db.drop_mutable_buffer_chunk(partition_key, mb_chunk.id())
@@ -506,7 +506,7 @@ mod tests {
assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]);
let batches = run_query(&db, "select * from cpu").await;
assert_table_eq!(expected, &batches);
assert_table_eq!(&expected, &batches);
// drop, the chunk from the read buffer
db.drop_read_buffer_chunk(partition_key, mb_chunk.id())


@@ -5,6 +5,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
mod flight;
pub mod http_routes;
pub mod rpc;
@@ -66,7 +67,7 @@ pub enum Error {
ServingHttp { source: hyper::Error },
#[snafu(display("Error serving RPC: {}", source))]
ServingRPC { source: self::rpc::service::Error },
ServingRPC { source: self::rpc::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -134,7 +135,7 @@ pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result
.await
.context(StartListeningGrpc { grpc_bind_addr })?;
let grpc_server = self::rpc::service::make_server(socket, app_server.clone());
let grpc_server = self::rpc::make_server(socket, app_server.clone());
info!(bind_address=?grpc_bind_addr, "gRPC server listening");

src/influxdb_ioxd/flight.rs (new file)

@@ -0,0 +1,226 @@
use super::rpc::GrpcService;
use arrow_deps::{
arrow,
arrow_flight::{
self, flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult,
SchemaResult, Ticket,
},
datafusion::physical_plan::collect,
};
use futures::Stream;
use query::{frontend::sql::SQLQueryPlanner, DatabaseStore};
use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
use std::pin::Pin;
use tonic::{Request, Response, Streaming};
use tracing::error;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))]
InvalidTicket {
source: std::string::FromUtf8Error,
ticket: Vec<u8>,
},
#[snafu(display("Invalid query, could not parse '{}': {}", query, source))]
InvalidQuery {
query: String,
source: serde_json::Error,
},
#[snafu(display("Database {} not found", database_name))]
DatabaseNotFound { database_name: String },
#[snafu(display(
"Internal error reading points from database {}: {}",
database_name,
source
))]
Query {
database_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Error planning query {}: {}", query, source))]
PlanningSQLQuery {
query: String,
source: query::frontend::sql::Error,
},
}
impl From<Error> for tonic::Status {
/// Converts an error from the business logic into the appropriate tonic
/// status
fn from(err: Error) -> Self {
error!("Error handling Flight gRPC request: {}", err);
err.to_status()
}
}
impl Error {
/// Converts an error from the business logic into the appropriate tonic
/// status
fn to_status(&self) -> tonic::Status {
use tonic::Status;
match &self {
Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()),
Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()),
Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()),
Self::Query { .. } => Status::internal(self.to_string()),
Self::PlanningSQLQuery { .. } => Status::invalid_argument(self.to_string()),
}
}
}
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + Sync + 'static>>;
#[derive(Deserialize, Debug)]
/// Body of the `Ticket` serialized and sent to the do_get endpoint; this
/// should probably be shared with the read API eventually
struct ReadInfo {
database_name: String,
sql_query: String,
}
#[tonic::async_trait]
impl<T> FlightService for GrpcService<T>
where
T: DatabaseStore + 'static,
{
type HandshakeStream = TonicStream<HandshakeResponse>;
type ListFlightsStream = TonicStream<FlightInfo>;
type DoGetStream = TonicStream<FlightData>;
type DoPutStream = TonicStream<PutResult>;
type DoActionStream = TonicStream<arrow_flight::Result>;
type ListActionsStream = TonicStream<ActionType>;
type DoExchangeStream = TonicStream<FlightData>;
async fn get_schema(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
// TODO: Stream results back directly by using `execute` instead of `collect`
// https://docs.rs/datafusion/3.0.0/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.execute
async fn do_get(
&self,
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, tonic::Status> {
let ticket = request.into_inner();
let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket {
ticket: ticket.ticket,
})?;
let read_info: ReadInfo =
serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?;
let db = self
.db_store
.db(&read_info.database_name)
.await
.context(DatabaseNotFound {
database_name: &read_info.database_name,
})?;
let planner = SQLQueryPlanner::default();
let executor = self.db_store.executor();
let physical_plan = planner
.query(&*db, &read_info.sql_query, &executor)
.await
.context(PlanningSQLQuery {
query: &read_info.sql_query,
})?;
// execute the query
let results = collect(physical_plan.clone())
.await
.map_err(|e| Box::new(e) as _)
.context(Query {
database_name: &read_info.database_name,
})?;
if results.is_empty() {
return Err(tonic::Status::internal("There were no results from ticket"));
}
let options = arrow::ipc::writer::IpcWriteOptions::default();
let schema = physical_plan.schema();
let schema_flight_data =
arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options);
let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)];
let mut batches: Vec<Result<FlightData, tonic::Status>> = results
.iter()
.flat_map(|batch| {
let (flight_dictionaries, flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(batch, &options);
flight_dictionaries
.into_iter()
.chain(std::iter::once(flight_batch))
.map(Ok)
})
.collect();
// append batch vector to schema vector, so that the first message sent is the
// schema
flights.append(&mut batches);
let output = futures::stream::iter(flights);
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}
async fn handshake(
&self,
_request: Request<Streaming<HandshakeRequest>>,
) -> Result<Response<Self::HandshakeStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn list_flights(
&self,
_request: Request<Criteria>,
) -> Result<Response<Self::ListFlightsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn get_flight_info(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_put(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_action(
&self,
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn list_actions(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_exchange(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
}
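
The `Ticket` consumed by `do_get` above is just the JSON serialization of `ReadInfo`. A minimal client-side sketch, assuming the `ticket` field is the raw byte payload (the database name and query are placeholders; the integration test at the end of this diff builds the same thing with serde):

```rust
use arrow_deps::arrow_flight::Ticket;

// JSON body matching the server-side ReadInfo fields.
let ticket = Ticket {
    ticket: br#"{"database_name":"mydb","sql_query":"select * from cpu"}"#
        .to_vec(),
};
// A malformed payload comes back as InvalidTicket / InvalidQuery,
// which to_status() maps to gRPC invalid_argument.
```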


@@ -13,3 +13,55 @@ pub mod expr;
pub mod id;
pub mod input;
pub mod service;
use arrow_deps::arrow_flight::flight_service_server::FlightServiceServer;
use data_types::error::ErrorLogger;
use generated_types::{i_ox_testing_server::IOxTestingServer, storage_server::StorageServer};
use query::DatabaseStore;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("gRPC server error: {}", source))]
ServerError { source: tonic::transport::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Instantiate a server listening on the specified address,
/// implementing the IOx, Storage, and Flight gRPC interfaces on the
/// underlying hyper server instance. Resolves when the server has
/// shut down.
pub async fn make_server<T>(socket: TcpListener, storage: Arc<T>) -> Result<()>
where
T: DatabaseStore + 'static,
{
let stream = TcpListenerStream::new(socket);
tonic::transport::Server::builder()
.add_service(IOxTestingServer::new(GrpcService::new(storage.clone())))
.add_service(StorageServer::new(GrpcService::new(storage.clone())))
.add_service(FlightServiceServer::new(GrpcService::new(storage)))
.serve_with_incoming(stream)
.await
.context(ServerError {})
.log_if_error("Running Tonic Server")
}
#[derive(Debug)]
pub struct GrpcService<T: DatabaseStore> {
pub db_store: Arc<T>,
}
impl<T> GrpcService<T>
where
T: DatabaseStore + 'static,
{
/// Create a new GrpcService connected to `db_store`
pub fn new(db_store: Arc<T>) -> Self {
Self { db_store }
}
}
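
Since `make_server` now registers the testing, storage, and flight services on one tonic builder, callers spawn a single future for all three. A sketch following the test pattern further down in this diff (`bind_addr` and `test_storage` are placeholders):

```rust
// Bind a listener and drive the combined gRPC server on a task;
// error handling elided for brevity.
let socket = tokio::net::TcpListener::bind(bind_addr).await.unwrap();
let server = make_server(socket, test_storage.clone());
tokio::task::spawn(server);
```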


@@ -2,52 +2,41 @@
//! implemented in terms of the `query::Database` and
//! `query::DatabaseStore`
use std::{collections::HashMap, sync::Arc};
use generated_types::{
i_ox_testing_server::{IOxTesting, IOxTestingServer},
storage_server::{Storage, StorageServer},
CapabilitiesResponse, Capability, Int64ValuesResponse, MeasurementFieldsRequest,
MeasurementFieldsResponse, MeasurementNamesRequest, MeasurementTagKeysRequest,
MeasurementTagValuesRequest, Predicate, ReadFilterRequest, ReadGroupRequest, ReadResponse,
ReadSeriesCardinalityRequest, ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest,
TagValuesRequest, TestErrorRequest, TestErrorResponse, TimestampRange,
use super::{
data::{
fieldlist_to_measurement_fields_response, series_set_item_to_read_response,
tag_keys_to_byte_vecs,
},
expr::{self, AddRPCNode, Loggable, SpecialTagKeys},
input::GrpcInputs,
GrpcService,
};
use data_types::{error::ErrorLogger, names::org_and_bucket_to_database, DatabaseName};
use generated_types::{
i_ox_testing_server::IOxTesting, storage_server::Storage, CapabilitiesResponse, Capability,
Int64ValuesResponse, MeasurementFieldsRequest, MeasurementFieldsResponse,
MeasurementNamesRequest, MeasurementTagKeysRequest, MeasurementTagValuesRequest, Predicate,
ReadFilterRequest, ReadGroupRequest, ReadResponse, ReadSeriesCardinalityRequest,
ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest, TagValuesRequest,
TestErrorRequest, TestErrorResponse, TimestampRange,
};
use data_types::error::ErrorLogger;
use query::group_by::GroupByAndAggregate;
use query::{exec::fieldlist::FieldList, frontend::influxrpc::InfluxRPCPlanner};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use super::expr::{self, AddRPCNode, Loggable, SpecialTagKeys};
use super::input::GrpcInputs;
use data_types::names::org_and_bucket_to_database;
use data_types::DatabaseName;
use query::{
exec::fieldlist::FieldList,
exec::seriesset::{Error as SeriesSetError, SeriesSetItem},
frontend::influxrpc::InfluxRPCPlanner,
group_by::GroupByAndAggregate,
predicate::PredicateBuilder,
Database, DatabaseStore,
};
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::{net::TcpListener, sync::mpsc};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{error, info, warn};
use super::data::{
fieldlist_to_measurement_fields_response, series_set_item_to_read_response,
tag_keys_to_byte_vecs,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("gRPC server error: {}", source))]
ServerError { source: tonic::transport::Error },
#[snafu(display("Database not found: {}", db_name))]
DatabaseNotFound { db_name: String },
@@ -183,7 +172,6 @@ impl Error {
/// status
fn to_status(&self) -> tonic::Status {
match &self {
Self::ServerError { .. } => Status::internal(self.to_string()),
Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()),
Self::ListingTables { .. } => Status::internal(self.to_string()),
Self::ListingColumns { .. } => {
@@ -215,21 +203,6 @@ impl Error {
}
}
#[derive(Debug)]
pub struct GrpcService<T: DatabaseStore> {
db_store: Arc<T>,
}
impl<T> GrpcService<T>
where
T: DatabaseStore + 'static,
{
/// Create a new GrpcService connected to `db_store`
pub fn new(db_store: Arc<T>) -> Self {
Self { db_store }
}
}
#[tonic::async_trait]
/// Implements the protobuf defined IOx rpc service for a DatabaseStore
impl<T> IOxTesting for GrpcService<T>
@@ -1134,25 +1107,6 @@ where
Ok(field_list)
}
/// Instantiate a server listening on the specified address,
/// implementing the IOx and Storage gRPC interfaces on the
/// underlying hyper server instance. Resolves when the server has
/// shut down.
pub async fn make_server<T>(socket: TcpListener, storage: Arc<T>) -> Result<()>
where
T: DatabaseStore + 'static,
{
let stream = TcpListenerStream::new(socket);
tonic::transport::Server::builder()
.add_service(IOxTestingServer::new(GrpcService::new(storage.clone())))
.add_service(StorageServer::new(GrpcService::new(storage.clone())))
.serve_with_incoming(stream)
.await
.context(ServerError {})
.log_if_error("Running Tonic Server")
}
#[cfg(test)]
mod tests {
use super::super::id::ID;
@@ -2606,7 +2560,7 @@ mod tests {
println!("Starting InfluxDB IOx rpc test server on {:?}", bind_addr);
let server = make_server(socket, test_storage.clone());
let server = super::super::make_server(socket, test_storage.clone());
tokio::task::spawn(server);
let iox_client = connect_to_server::<IOxTestingClient>(bind_addr)


@@ -62,6 +62,7 @@ async fn read_and_write_data() {
read_api::test(&http_client, &scenario, sql_query, &expected_read_data).await;
grpc_api::test(&mut storage_client, &scenario).await;
flight_api::test(&scenario, sql_query, &expected_read_data).await;
}
// These tests manage their own data


@@ -0,0 +1,70 @@
use crate::{Scenario, GRPC_URL_BASE};
use arrow_deps::{
arrow::{
datatypes::Schema,
ipc::{self, reader},
},
arrow_flight::{
flight_service_client::FlightServiceClient, utils::flight_data_to_arrow_batch, Ticket,
},
assert_table_eq,
};
use futures::prelude::*;
use serde::Serialize;
use std::{convert::TryFrom, sync::Arc};
// TODO: this should be shared
#[derive(Serialize, Debug)]
struct ReadInfo {
database_name: String,
sql_query: String,
}
pub async fn test(scenario: &Scenario, sql_query: &str, expected_read_data: &[String]) {
let mut flight_client = FlightServiceClient::connect(GRPC_URL_BASE).await.unwrap();
let query = ReadInfo {
database_name: scenario.database_name().into(),
sql_query: sql_query.into(),
};
let t = Ticket {
ticket: serde_json::to_string(&query).unwrap().into(),
};
let mut response = flight_client.do_get(t).await.unwrap().into_inner();
let flight_data_schema = response.next().await.unwrap().unwrap();
let schema = Arc::new(Schema::try_from(&flight_data_schema).unwrap());
let mut dictionaries_by_field = vec![None; schema.fields().len()];
let mut batches = vec![];
while let Some(data) = response.next().await {
let mut data = data.unwrap();
let mut message =
ipc::root_as_message(&data.data_header[..]).expect("Error parsing first message");
while message.header_type() == ipc::MessageHeader::DictionaryBatch {
reader::read_dictionary(
&data.data_body,
message
.header_as_dictionary_batch()
.expect("Error parsing dictionary"),
&schema,
&mut dictionaries_by_field,
)
.expect("Error reading dictionary");
data = response.next().await.unwrap().ok().unwrap();
message = ipc::root_as_message(&data.data_header[..]).expect("Error parsing message");
}
batches.push(
flight_data_to_arrow_batch(&data, schema.clone(), &dictionaries_by_field)
.expect("Unable to convert flight data to Arrow batch"),
);
}
assert_table_eq!(expected_read_data, &batches);
}


@@ -1,2 +1,3 @@
pub mod flight_api;
pub mod grpc_api;
pub mod read_api;