From 43ada68f371bda0e862de86e4a8de48f7ada1ded Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 1 Mar 2022 13:44:14 +0000 Subject: [PATCH 1/8] chore: reset release codegen-units to default (#3883) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c420fca0f4..91a676ab90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,7 +93,7 @@ exclude = [ # This profile optimizes for runtime performance and small binary size at the expense of longer # build times. It's most suitable for final release builds. [profile.release] -codegen-units = 1 +codegen-units = 16 debug = true lto = "thin" From e5c45aeab69d0694db2a25bdfde6d0a897f64e98 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 1 Mar 2022 14:33:08 +0000 Subject: [PATCH 2/8] refactor: generic top-level flight implementation (#3882) This allows us to implement flight for the NG querier by just implementing a few traits and reuse all the existing glue code and optimizations (like dictionary handling). --- influxdb_iox/src/influxdb_ioxd/rpc.rs | 1 + influxdb_iox/src/influxdb_ioxd/rpc/flight.rs | 587 ++++++++++++++++++ .../server_type/database/rpc/flight.rs | 577 +---------------- 3 files changed, 600 insertions(+), 565 deletions(-) create mode 100644 influxdb_iox/src/influxdb_ioxd/rpc/flight.rs diff --git a/influxdb_iox/src/influxdb_ioxd/rpc.rs b/influxdb_iox/src/influxdb_ioxd/rpc.rs index f5da906824..e73b2e25ca 100644 --- a/influxdb_iox/src/influxdb_ioxd/rpc.rs +++ b/influxdb_iox/src/influxdb_ioxd/rpc.rs @@ -11,6 +11,7 @@ use crate::influxdb_ioxd::{ serving_readiness::ServingReadiness, }; +pub(crate) mod flight; pub(crate) mod testing; /// Returns the name of the gRPC service S. diff --git a/influxdb_iox/src/influxdb_ioxd/rpc/flight.rs b/influxdb_iox/src/influxdb_ioxd/rpc/flight.rs new file mode 100644 index 0000000000..fcb8c0e204 --- /dev/null +++ b/influxdb_iox/src/influxdb_ioxd/rpc/flight.rs @@ -0,0 +1,587 @@ +//! Implements the native gRPC IOx query API using Arrow Flight +use std::fmt::Debug; +use std::task::Poll; +use std::{pin::Pin, sync::Arc}; + +use arrow::{ + array::{make_array, ArrayRef, MutableArrayData}, + datatypes::{DataType, Field, Schema, SchemaRef}, + error::ArrowError, + record_batch::RecordBatch, +}; +use arrow_flight::{ + flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer}, + Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, + HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, +}; +use datafusion::physical_plan::ExecutionPlan; +use futures::{SinkExt, Stream, StreamExt}; +use pin_project::{pin_project, pinned_drop}; +use query::QueryDatabase; +use serde::Deserialize; +use snafu::{ResultExt, Snafu}; +use tokio::task::JoinHandle; +use tonic::{Request, Response, Streaming}; + +use data_types::{DatabaseName, DatabaseNameError}; +use observability_deps::tracing::{info, warn}; +use query::exec::{ExecutionContextProvider, IOxExecutionContext}; + +use crate::influxdb_ioxd::planner::Planner; + +#[allow(clippy::enum_variant_names)] +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Invalid ticket. 
Error: {:?} Ticket: {:?}", source, ticket))] + InvalidTicket { + source: std::string::FromUtf8Error, + ticket: Vec, + }, + #[snafu(display("Invalid query, could not parse '{}': {}", query, source))] + InvalidQuery { + query: String, + source: serde_json::Error, + }, + + #[snafu(display("Database {} not found", database_name))] + DatabaseNotFound { database_name: String }, + + #[snafu(display( + "Internal error reading points from database {}: {}", + database_name, + source + ))] + Query { + database_name: String, + source: Box, + }, + + #[snafu(display("Invalid database name: {}", source))] + InvalidDatabaseName { source: DatabaseNameError }, + + #[snafu(display("Invalid RecordBatch: {}", source))] + InvalidRecordBatch { source: ArrowError }, + + #[snafu(display("Failed to hydrate dictionary: {}", source))] + DictionaryError { source: ArrowError }, + + #[snafu(display("Error while planning query: {}", source))] + Planning { + source: crate::influxdb_ioxd::planner::Error, + }, +} +pub type Result = std::result::Result; + +impl From for tonic::Status { + /// Converts a result from the business logic into the appropriate tonic + /// status + fn from(err: Error) -> Self { + // An explicit match on the Error enum will ensure appropriate + // logging is handled for any new error variants. + let msg = "Error handling Flight gRPC request"; + match err { + Error::DatabaseNotFound { .. } + | Error::InvalidTicket { .. } + | Error::InvalidQuery { .. } + // TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development + | Error::InvalidDatabaseName { .. } => info!(?err, msg), + Error::Query { .. } => info!(?err, msg), + Error::DictionaryError { .. } + | Error::InvalidRecordBatch { .. } + | Error::Planning { .. } => warn!(?err, msg), + } + err.to_status() + } +} + +impl Error { + /// Converts a result from the business logic into the appropriate tonic + /// status + fn to_status(&self) -> tonic::Status { + use tonic::Status; + match &self { + Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()), + Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()), + Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()), + Self::Query { .. } => Status::internal(self.to_string()), + Self::InvalidDatabaseName { .. } => Status::invalid_argument(self.to_string()), + Self::InvalidRecordBatch { .. } => Status::internal(self.to_string()), + Self::Planning { .. } => Status::invalid_argument(self.to_string()), + Self::DictionaryError { .. } => Status::internal(self.to_string()), + } + } +} + +type TonicStream = Pin> + Send + Sync + 'static>>; + +#[derive(Deserialize, Debug)] +/// Body of the `Ticket` serialized and sent to the do_get endpoint; this should +/// be shared with the read API probably... 
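+///
+/// For illustration, a ticket for this endpoint carries JSON shaped like the struct
+/// below (the values here are made up; `do_get` parses it with `serde_json`):
+///
+/// ```json
+/// {"database_name": "mydb", "sql_query": "SELECT * FROM cpu"}
+/// ```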
+struct ReadInfo { + database_name: String, + sql_query: String, +} + +pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static { + type Db: ExecutionContextProvider + QueryDatabase; + + fn db(&self, db_name: &DatabaseName<'_>) -> std::result::Result, tonic::Status>; +} + +/// Concrete implementation of the gRPC Arrow Flight Service API +#[derive(Debug)] +struct FlightService +where + S: QueryDatabaseProvider, +{ + server: Arc, +} + +pub fn make_server(server: Arc) -> FlightServer +where + S: QueryDatabaseProvider, +{ + FlightServer::new(FlightService { server }) +} + +#[tonic::async_trait] +impl Flight for FlightService +where + S: QueryDatabaseProvider, +{ + type HandshakeStream = TonicStream; + type ListFlightsStream = TonicStream; + type DoGetStream = TonicStream; + type DoPutStream = TonicStream; + type DoActionStream = TonicStream; + type ListActionsStream = TonicStream; + type DoExchangeStream = TonicStream; + + async fn get_schema( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn do_get( + &self, + request: Request, + ) -> Result, tonic::Status> { + let span_ctx = request.extensions().get().cloned(); + let ticket = request.into_inner(); + let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicketSnafu { + ticket: ticket.ticket, + })?; + + let read_info: ReadInfo = + serde_json::from_str(&json_str).context(InvalidQuerySnafu { query: &json_str })?; + + let database = + DatabaseName::new(&read_info.database_name).context(InvalidDatabaseNameSnafu)?; + + let db = self.server.db(&database)?; + + let _query_completed_token = db.record_query("sql", Box::new(read_info.sql_query.clone())); + + let ctx = db.new_query_context(span_ctx); + + let physical_plan = Planner::new(&ctx) + .sql(&read_info.sql_query) + .await + .context(PlanningSnafu)?; + + let output = GetStream::new(ctx, physical_plan, read_info.database_name).await?; + + Ok(Response::new(Box::pin(output) as Self::DoGetStream)) + } + + async fn handshake( + &self, + request: Request>, + ) -> Result, tonic::Status> { + let request = request.into_inner().message().await?.unwrap(); + let response = HandshakeResponse { + protocol_version: request.protocol_version, + payload: request.payload, + }; + let output = futures::stream::iter(std::iter::once(Ok(response))); + Ok(Response::new(Box::pin(output) as Self::HandshakeStream)) + } + + async fn list_flights( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn get_flight_info( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn do_put( + &self, + _request: Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn do_action( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn list_actions( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } + + async fn do_exchange( + &self, + _request: Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented")) + } +} + +#[pin_project(PinnedDrop)] +struct GetStream { + #[pin] + rx: futures::channel::mpsc::Receiver>, + join_handle: JoinHandle<()>, + done: bool, +} + +impl GetStream { + async fn new( + ctx: 
IOxExecutionContext, + physical_plan: Arc, + database_name: String, + ) -> Result { + // setup channel + let (mut tx, rx) = futures::channel::mpsc::channel::>(1); + + // get schema + let schema = Arc::new(optimize_schema(&physical_plan.schema())); + + // setup stream + let options = arrow::ipc::writer::IpcWriteOptions::default(); + let schema_flight_data = SchemaAsIpc::new(&schema, &options).into(); + let mut stream_record_batches = ctx + .execute_stream(Arc::clone(&physical_plan)) + .await + .map_err(|e| Box::new(e) as _) + .context(QuerySnafu { + database_name: &database_name, + })?; + + let join_handle = tokio::spawn(async move { + if tx.send(Ok(schema_flight_data)).await.is_err() { + // receiver gone + return; + } + + while let Some(batch_or_err) = stream_record_batches.next().await { + match batch_or_err { + Ok(batch) => { + match optimize_record_batch(&batch, Arc::clone(&schema)) { + Ok(batch) => { + let (flight_dictionaries, flight_batch) = + arrow_flight::utils::flight_data_from_arrow_batch( + &batch, &options, + ); + + for dict in flight_dictionaries { + if tx.send(Ok(dict)).await.is_err() { + // receiver is gone + return; + } + } + + if tx.send(Ok(flight_batch)).await.is_err() { + // receiver is gone + return; + } + } + Err(e) => { + // failure sending here is OK because we're cutting the stream anyways + tx.send(Err(e.into())).await.ok(); + + // end stream + return; + } + } + } + Err(e) => { + // failure sending here is OK because we're cutting the stream anyways + tx.send(Err(Error::Query { + database_name: database_name.clone(), + source: Box::new(e), + } + .into())) + .await + .ok(); + + // end stream + return; + } + } + } + }); + + Ok(Self { + rx, + join_handle, + done: false, + }) + } +} + +#[pinned_drop] +impl PinnedDrop for GetStream { + fn drop(self: Pin<&mut Self>) { + self.join_handle.abort(); + } +} + +impl Stream for GetStream { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + if *this.done { + Poll::Ready(None) + } else { + match this.rx.poll_next(cx) { + Poll::Ready(None) => { + *this.done = true; + Poll::Ready(None) + } + e @ Poll::Ready(Some(Err(_))) => { + *this.done = true; + e + } + other => other, + } + } + } +} + +/// Some batches are small slices of the underlying arrays. +/// At this stage we only know the number of rows in the record batch +/// and the sizes in bytes of the backing buffers of the column arrays. +/// There is no straight-forward relationship between these two quantities, +/// since some columns can host variable length data such as strings. +/// +/// However we can apply a quick&dirty heuristic: +/// if the backing buffer is two orders of magnitudes bigger +/// than the number of rows in the result set, we assume +/// that deep-copying the record batch is cheaper than the and transfer costs. +/// +/// Possible improvements: take the type of the columns into consideration +/// and perhaps sample a few element sizes (taking care of not doing more work +/// than to always copying the results in the first place). +/// +/// Or we just fix this upstream in +/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array +/// into a smaller buffer while we have to copy stuff around anyway. 
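+///
+/// As a worked (made-up) example of the heuristic: a batch sliced down to 10 rows
+/// whose largest column still owns a 100_000 byte backing buffer trips the
+/// `max_buf_len > batch.num_rows() * 100` check (100_000 > 1_000) and is deep-copied,
+/// while a 10 row batch backed by a 500 byte buffer is passed through as-is.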
+/// +/// See rationale and discussions about future improvements on +/// +fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result { + let max_buf_len = batch + .columns() + .iter() + .map(|a| a.get_array_memory_size()) + .max() + .unwrap_or_default(); + + let columns: Result, _> = batch + .columns() + .iter() + .map(|column| { + if matches!(column.data_type(), DataType::Dictionary(_, _)) { + hydrate_dictionary(column) + } else if max_buf_len > batch.num_rows() * 100 { + Ok(deep_clone_array(column)) + } else { + Ok(Arc::clone(column)) + } + }) + .collect(); + + RecordBatch::try_new(schema, columns?).context(InvalidRecordBatchSnafu) +} + +fn deep_clone_array(array: &ArrayRef) -> ArrayRef { + let mut mutable = MutableArrayData::new(vec![array.data()], false, 0); + mutable.extend(0, 0, array.len()); + + make_array(mutable.freeze()) +} + +/// Convert dictionary types to underlying types +/// See hydrate_dictionary for more information +fn optimize_schema(schema: &Schema) -> Schema { + let fields = schema + .fields() + .iter() + .map(|field| match field.data_type() { + DataType::Dictionary(_, value_type) => Field::new( + field.name(), + value_type.as_ref().clone(), + field.is_nullable(), + ), + _ => field.clone(), + }) + .collect(); + + Schema::new(fields) +} + +/// Hydrates a dictionary to its underlying type +/// +/// An IPC response, streaming or otherwise, defines its schema up front +/// which defines the mapping from dictionary IDs. It then sends these +/// dictionaries over the wire. +/// +/// This requires identifying the different dictionaries in use, assigning +/// them IDs, and sending new dictionaries, delta or otherwise, when needed +/// +/// This is tracked by #1318 +/// +/// For now we just hydrate the dictionaries to their underlying type +fn hydrate_dictionary(array: &ArrayRef) -> Result { + match array.data_type() { + DataType::Dictionary(_, value) => { + arrow::compute::cast(array, value).context(DictionarySnafu) + } + _ => unreachable!("not a dictionary"), + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::array::StringArray; + use arrow::{ + array::{DictionaryArray, UInt32Array}, + datatypes::{DataType, Int32Type}, + }; + use arrow_flight::utils::flight_data_to_arrow_batch; + + use datafusion::physical_plan::limit::truncate_batch; + + use super::*; + + #[test] + fn test_deep_clone_array() { + let mut builder = UInt32Array::builder(1000); + builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap(); + let array: ArrayRef = Arc::new(builder.finish()); + assert_eq!(array.len(), 6); + + let sliced = array.slice(0, 2); + assert_eq!(sliced.len(), 2); + + let deep_cloned = deep_clone_array(&sliced); + assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size()); + } + + #[test] + fn test_encode_flight_data() { + let options = arrow::ipc::writer::IpcWriteOptions::default(); + let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); + + let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)]) + .expect("cannot create record batch"); + let schema = batch.schema(); + + let (_, baseline_flight_batch) = + arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options); + + let big_batch = truncate_batch(&batch, batch.num_rows() - 1); + let optimized_big_batch = + optimize_record_batch(&big_batch, Arc::clone(&schema)).expect("failed to optimize"); + let (_, optimized_big_flight_batch) = + arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options); + + assert_eq!( + 
baseline_flight_batch.data_body.len(), + optimized_big_flight_batch.data_body.len() + ); + + let small_batch = truncate_batch(&batch, 1); + let optimized_small_batch = + optimize_record_batch(&small_batch, Arc::clone(&schema)).expect("failed to optimize"); + let (_, optimized_small_flight_batch) = + arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options); + + assert!( + baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len() + ); + } + + #[test] + fn test_encode_flight_data_dictionary() { + let options = arrow::ipc::writer::IpcWriteOptions::default(); + + let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); + let c2: DictionaryArray = vec![ + Some("foo"), + Some("bar"), + None, + Some("fiz"), + None, + Some("foo"), + ] + .into_iter() + .collect(); + + let batch = + RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef), ("b", Arc::new(c2))]) + .expect("cannot create record batch"); + + let original_schema = batch.schema(); + let optimized_schema = Arc::new(optimize_schema(&original_schema)); + + let optimized_batch = optimize_record_batch(&batch, Arc::clone(&optimized_schema)).unwrap(); + + let (_, flight_data) = + arrow_flight::utils::flight_data_from_arrow_batch(&optimized_batch, &options); + + let batch = + flight_data_to_arrow_batch(&flight_data, Arc::clone(&optimized_schema), &[None, None]) + .unwrap(); + + // Should hydrate string dictionary for transport + assert_eq!(optimized_schema.field(1).data_type(), &DataType::Utf8); + let array = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + let expected = StringArray::from(vec![ + Some("foo"), + Some("bar"), + None, + Some("fiz"), + None, + Some("foo"), + ]); + assert_eq!(array, &expected) + } +} diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs index e6a9f528a7..204e9f0498 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs @@ -1,577 +1,24 @@ -//! 
Implements the native gRPC IOx query API using Arrow Flight -use std::fmt::Debug; -use std::task::Poll; -use std::{pin::Pin, sync::Arc}; +use std::sync::Arc; -use arrow::{ - array::{make_array, ArrayRef, MutableArrayData}, - datatypes::{DataType, Field, Schema, SchemaRef}, - error::ArrowError, - record_batch::RecordBatch, +use arrow_flight::flight_service_server::{ + FlightService as Flight, FlightServiceServer as FlightServer, }; -use arrow_flight::{ - flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer}, - Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, - HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, -}; -use datafusion::physical_plan::ExecutionPlan; -use futures::{SinkExt, Stream, StreamExt}; -use pin_project::{pin_project, pinned_drop}; -use query::QueryDatabase; -use serde::Deserialize; -use snafu::{ResultExt, Snafu}; -use tokio::task::JoinHandle; -use tonic::{Request, Response, Streaming}; - -use data_types::{DatabaseName, DatabaseNameError}; -use observability_deps::tracing::{info, warn}; -use query::exec::{ExecutionContextProvider, IOxExecutionContext}; +use data_types::DatabaseName; +use db::Db; use server::Server; +use crate::influxdb_ioxd::rpc::flight::{make_server as make_server_inner, QueryDatabaseProvider}; + use super::error::default_server_error_handler; -use crate::influxdb_ioxd::planner::Planner; -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))] - InvalidTicket { - source: std::string::FromUtf8Error, - ticket: Vec, - }, - #[snafu(display("Invalid query, could not parse '{}': {}", query, source))] - InvalidQuery { - query: String, - source: serde_json::Error, - }, +impl QueryDatabaseProvider for Server { + type Db = Db; - #[snafu(display("Database {} not found", database_name))] - DatabaseNotFound { database_name: String }, - - #[snafu(display( - "Internal error reading points from database {}: {}", - database_name, - source - ))] - Query { - database_name: String, - source: Box, - }, - - #[snafu(display("Invalid database name: {}", source))] - InvalidDatabaseName { source: DatabaseNameError }, - - #[snafu(display("Invalid RecordBatch: {}", source))] - InvalidRecordBatch { source: ArrowError }, - - #[snafu(display("Failed to hydrate dictionary: {}", source))] - DictionaryError { source: ArrowError }, - - #[snafu(display("Error while planning query: {}", source))] - Planning { - source: crate::influxdb_ioxd::planner::Error, - }, -} -pub type Result = std::result::Result; - -impl From for tonic::Status { - /// Converts a result from the business logic into the appropriate tonic - /// status - fn from(err: Error) -> Self { - // An explicit match on the Error enum will ensure appropriate - // logging is handled for any new error variants. - let msg = "Error handling Flight gRPC request"; - match err { - Error::DatabaseNotFound { .. } - | Error::InvalidTicket { .. } - | Error::InvalidQuery { .. } - // TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development - | Error::InvalidDatabaseName { .. } => info!(?err, msg), - Error::Query { .. } => info!(?err, msg), - Error::DictionaryError { .. } - | Error::InvalidRecordBatch { .. } - | Error::Planning { .. 
} => warn!(?err, msg), - } - err.to_status() + fn db(&self, db_name: &DatabaseName<'_>) -> Result, tonic::Status> { + self.db(db_name).map_err(default_server_error_handler) } } -impl Error { - /// Converts a result from the business logic into the appropriate tonic - /// status - fn to_status(&self) -> tonic::Status { - use tonic::Status; - match &self { - Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()), - Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()), - Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()), - Self::Query { .. } => Status::internal(self.to_string()), - Self::InvalidDatabaseName { .. } => Status::invalid_argument(self.to_string()), - Self::InvalidRecordBatch { .. } => Status::internal(self.to_string()), - Self::Planning { .. } => Status::invalid_argument(self.to_string()), - Self::DictionaryError { .. } => Status::internal(self.to_string()), - } - } -} - -type TonicStream = Pin> + Send + Sync + 'static>>; - -#[derive(Deserialize, Debug)] -/// Body of the `Ticket` serialized and sent to the do_get endpoint; this should -/// be shared with the read API probably... -struct ReadInfo { - database_name: String, - sql_query: String, -} - -/// Concrete implementation of the gRPC Arrow Flight Service API -#[derive(Debug)] -struct FlightService { - server: Arc, -} - pub fn make_server(server: Arc) -> FlightServer { - FlightServer::new(FlightService { server }) -} - -#[tonic::async_trait] -impl Flight for FlightService { - type HandshakeStream = TonicStream; - type ListFlightsStream = TonicStream; - type DoGetStream = TonicStream; - type DoPutStream = TonicStream; - type DoActionStream = TonicStream; - type ListActionsStream = TonicStream; - type DoExchangeStream = TonicStream; - - async fn get_schema( - &self, - _request: Request, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - async fn do_get( - &self, - request: Request, - ) -> Result, tonic::Status> { - let span_ctx = request.extensions().get().cloned(); - let ticket = request.into_inner(); - let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicketSnafu { - ticket: ticket.ticket, - })?; - - let read_info: ReadInfo = - serde_json::from_str(&json_str).context(InvalidQuerySnafu { query: &json_str })?; - - let database = - DatabaseName::new(&read_info.database_name).context(InvalidDatabaseNameSnafu)?; - - let db = self - .server - .db(&database) - .map_err(default_server_error_handler)?; - - let _query_completed_token = db.record_query("sql", Box::new(read_info.sql_query.clone())); - - let ctx = db.new_query_context(span_ctx); - - let physical_plan = Planner::new(&ctx) - .sql(&read_info.sql_query) - .await - .context(PlanningSnafu)?; - - let output = GetStream::new(ctx, physical_plan, read_info.database_name).await?; - - Ok(Response::new(Box::pin(output) as Self::DoGetStream)) - } - - async fn handshake( - &self, - request: Request>, - ) -> Result, tonic::Status> { - let request = request.into_inner().message().await?.unwrap(); - let response = HandshakeResponse { - protocol_version: request.protocol_version, - payload: request.payload, - }; - let output = futures::stream::iter(std::iter::once(Ok(response))); - Ok(Response::new(Box::pin(output) as Self::HandshakeStream)) - } - - async fn list_flights( - &self, - _request: Request, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - async fn get_flight_info( - &self, - _request: Request, - ) -> Result, 
tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - async fn do_put( - &self, - _request: Request>, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - async fn do_action( - &self, - _request: Request, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - async fn list_actions( - &self, - _request: Request, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - async fn do_exchange( - &self, - _request: Request>, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } -} - -#[pin_project(PinnedDrop)] -struct GetStream { - #[pin] - rx: futures::channel::mpsc::Receiver>, - join_handle: JoinHandle<()>, - done: bool, -} - -impl GetStream { - async fn new( - ctx: IOxExecutionContext, - physical_plan: Arc, - database_name: String, - ) -> Result { - // setup channel - let (mut tx, rx) = futures::channel::mpsc::channel::>(1); - - // get schema - let schema = Arc::new(optimize_schema(&physical_plan.schema())); - - // setup stream - let options = arrow::ipc::writer::IpcWriteOptions::default(); - let schema_flight_data = SchemaAsIpc::new(&schema, &options).into(); - let mut stream_record_batches = ctx - .execute_stream(Arc::clone(&physical_plan)) - .await - .map_err(|e| Box::new(e) as _) - .context(QuerySnafu { - database_name: &database_name, - })?; - - let join_handle = tokio::spawn(async move { - if tx.send(Ok(schema_flight_data)).await.is_err() { - // receiver gone - return; - } - - while let Some(batch_or_err) = stream_record_batches.next().await { - match batch_or_err { - Ok(batch) => { - match optimize_record_batch(&batch, Arc::clone(&schema)) { - Ok(batch) => { - let (flight_dictionaries, flight_batch) = - arrow_flight::utils::flight_data_from_arrow_batch( - &batch, &options, - ); - - for dict in flight_dictionaries { - if tx.send(Ok(dict)).await.is_err() { - // receiver is gone - return; - } - } - - if tx.send(Ok(flight_batch)).await.is_err() { - // receiver is gone - return; - } - } - Err(e) => { - // failure sending here is OK because we're cutting the stream anyways - tx.send(Err(e.into())).await.ok(); - - // end stream - return; - } - } - } - Err(e) => { - // failure sending here is OK because we're cutting the stream anyways - tx.send(Err(Error::Query { - database_name: database_name.clone(), - source: Box::new(e), - } - .into())) - .await - .ok(); - - // end stream - return; - } - } - } - }); - - Ok(Self { - rx, - join_handle, - done: false, - }) - } -} - -#[pinned_drop] -impl PinnedDrop for GetStream { - fn drop(self: Pin<&mut Self>) { - self.join_handle.abort(); - } -} - -impl Stream for GetStream { - type Item = Result; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let this = self.project(); - if *this.done { - Poll::Ready(None) - } else { - match this.rx.poll_next(cx) { - Poll::Ready(None) => { - *this.done = true; - Poll::Ready(None) - } - e @ Poll::Ready(Some(Err(_))) => { - *this.done = true; - e - } - other => other, - } - } - } -} - -/// Some batches are small slices of the underlying arrays. -/// At this stage we only know the number of rows in the record batch -/// and the sizes in bytes of the backing buffers of the column arrays. -/// There is no straight-forward relationship between these two quantities, -/// since some columns can host variable length data such as strings. 
-/// -/// However we can apply a quick&dirty heuristic: -/// if the backing buffer is two orders of magnitudes bigger -/// than the number of rows in the result set, we assume -/// that deep-copying the record batch is cheaper than the and transfer costs. -/// -/// Possible improvements: take the type of the columns into consideration -/// and perhaps sample a few element sizes (taking care of not doing more work -/// than to always copying the results in the first place). -/// -/// Or we just fix this upstream in -/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array -/// into a smaller buffer while we have to copy stuff around anyway. -/// -/// See rationale and discussions about future improvements on -/// -fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result { - let max_buf_len = batch - .columns() - .iter() - .map(|a| a.get_array_memory_size()) - .max() - .unwrap_or_default(); - - let columns: Result, _> = batch - .columns() - .iter() - .map(|column| { - if matches!(column.data_type(), DataType::Dictionary(_, _)) { - hydrate_dictionary(column) - } else if max_buf_len > batch.num_rows() * 100 { - Ok(deep_clone_array(column)) - } else { - Ok(Arc::clone(column)) - } - }) - .collect(); - - RecordBatch::try_new(schema, columns?).context(InvalidRecordBatchSnafu) -} - -fn deep_clone_array(array: &ArrayRef) -> ArrayRef { - let mut mutable = MutableArrayData::new(vec![array.data()], false, 0); - mutable.extend(0, 0, array.len()); - - make_array(mutable.freeze()) -} - -/// Convert dictionary types to underlying types -/// See hydrate_dictionary for more information -fn optimize_schema(schema: &Schema) -> Schema { - let fields = schema - .fields() - .iter() - .map(|field| match field.data_type() { - DataType::Dictionary(_, value_type) => Field::new( - field.name(), - value_type.as_ref().clone(), - field.is_nullable(), - ), - _ => field.clone(), - }) - .collect(); - - Schema::new(fields) -} - -/// Hydrates a dictionary to its underlying type -/// -/// An IPC response, streaming or otherwise, defines its schema up front -/// which defines the mapping from dictionary IDs. It then sends these -/// dictionaries over the wire. 
-/// -/// This requires identifying the different dictionaries in use, assigning -/// them IDs, and sending new dictionaries, delta or otherwise, when needed -/// -/// This is tracked by #1318 -/// -/// For now we just hydrate the dictionaries to their underlying type -fn hydrate_dictionary(array: &ArrayRef) -> Result { - match array.data_type() { - DataType::Dictionary(_, value) => { - arrow::compute::cast(array, value).context(DictionarySnafu) - } - _ => unreachable!("not a dictionary"), - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow::array::StringArray; - use arrow::{ - array::{DictionaryArray, UInt32Array}, - datatypes::{DataType, Int32Type}, - }; - use arrow_flight::utils::flight_data_to_arrow_batch; - - use datafusion::physical_plan::limit::truncate_batch; - - use super::*; - - #[test] - fn test_deep_clone_array() { - let mut builder = UInt32Array::builder(1000); - builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap(); - let array: ArrayRef = Arc::new(builder.finish()); - assert_eq!(array.len(), 6); - - let sliced = array.slice(0, 2); - assert_eq!(sliced.len(), 2); - - let deep_cloned = deep_clone_array(&sliced); - assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size()); - } - - #[test] - fn test_encode_flight_data() { - let options = arrow::ipc::writer::IpcWriteOptions::default(); - let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); - - let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)]) - .expect("cannot create record batch"); - let schema = batch.schema(); - - let (_, baseline_flight_batch) = - arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options); - - let big_batch = truncate_batch(&batch, batch.num_rows() - 1); - let optimized_big_batch = - optimize_record_batch(&big_batch, Arc::clone(&schema)).expect("failed to optimize"); - let (_, optimized_big_flight_batch) = - arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options); - - assert_eq!( - baseline_flight_batch.data_body.len(), - optimized_big_flight_batch.data_body.len() - ); - - let small_batch = truncate_batch(&batch, 1); - let optimized_small_batch = - optimize_record_batch(&small_batch, Arc::clone(&schema)).expect("failed to optimize"); - let (_, optimized_small_flight_batch) = - arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options); - - assert!( - baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len() - ); - } - - #[test] - fn test_encode_flight_data_dictionary() { - let options = arrow::ipc::writer::IpcWriteOptions::default(); - - let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); - let c2: DictionaryArray = vec![ - Some("foo"), - Some("bar"), - None, - Some("fiz"), - None, - Some("foo"), - ] - .into_iter() - .collect(); - - let batch = - RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef), ("b", Arc::new(c2))]) - .expect("cannot create record batch"); - - let original_schema = batch.schema(); - let optimized_schema = Arc::new(optimize_schema(&original_schema)); - - let optimized_batch = optimize_record_batch(&batch, Arc::clone(&optimized_schema)).unwrap(); - - let (_, flight_data) = - arrow_flight::utils::flight_data_from_arrow_batch(&optimized_batch, &options); - - let batch = - flight_data_to_arrow_batch(&flight_data, Arc::clone(&optimized_schema), &[None, None]) - .unwrap(); - - // Should hydrate string dictionary for transport - assert_eq!(optimized_schema.field(1).data_type(), &DataType::Utf8); - let array = batch - 
.column(1) - .as_any() - .downcast_ref::() - .unwrap(); - - let expected = StringArray::from(vec![ - Some("foo"), - Some("bar"), - None, - Some("fiz"), - None, - Some("foo"), - ]); - assert_eq!(array, &expected) - } + make_server_inner(server) } From 4a56fcdcabafe93ce6cd68454c7f3a39134e966b Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Tue, 1 Mar 2022 15:42:55 +0100 Subject: [PATCH 3/8] fix: Use bigger executor for test job (#3885) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .circleci/config.yml | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5f0a141d31..f91c6ab5d1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -195,7 +195,7 @@ jobs: - image: postgres environment: POSTGRES_HOST_AUTH_METHOD: trust - resource_class: xlarge # use of a smaller executor tends crashes on link + resource_class: 2xlarge # use of a smaller executor tends crashes on link environment: # Disable incremental compilation to avoid overhead. We are not preserving these files anyway. CARGO_INCREMENTAL: "0" @@ -247,7 +247,7 @@ jobs: test_heappy: docker: - image: quay.io/influxdb/rust:ci - resource_class: xlarge # use of a smaller executor tends crashes on link + resource_class: xlarge # use of a smaller executor tends crashes on link environment: # Disable incremental compilation to avoid overhead. We are not preserving these files anyway. CARGO_INCREMENTAL: "0" @@ -326,7 +326,7 @@ jobs: build_dev: docker: - image: quay.io/influxdb/rust:ci - resource_class: xlarge # use of a smaller executor tends crashes on link + resource_class: xlarge # use of a smaller executor tends crashes on link environment: # Disable incremental compilation to avoid overhead. We are not preserving these files anyway. CARGO_INCREMENTAL: "0" @@ -403,7 +403,7 @@ jobs: # kinda small node) machine: image: ubuntu-2004:202111-01 - resource_class: xlarge # use of a smaller executor tends crashes on link + resource_class: xlarge # use of a smaller executor tends crashes on link environment: # Disable incremental compilation to avoid overhead. We are not preserving these files anyway. CARGO_INCREMENTAL: "0" @@ -495,7 +495,6 @@ jobs: docker build -t quay.io/influxdb/rust:$COMMIT_SHA -t quay.io/influxdb/rust:ci -f docker/Dockerfile.ci --build-arg RUST_VERSION=$RUST_VERSION . docker push --all-tags quay.io/influxdb/rust - parameters: ci_image: description: "Trigger build of CI image" @@ -530,7 +529,7 @@ workflows: filters: branches: only: main - requires: # Only do a release build if all tests have passed + requires: # Only do a release build if all tests have passed - fmt - lint - cargo_audit From ace4af1b66ad0dde129b0961547214230b537a64 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 1 Mar 2022 15:25:31 +0000 Subject: [PATCH 4/8] feat: `DedicatedExecutor` async `join` and job `detach`. 
(#3835)

* feat: detach dedicated exec jobs

* feat: async `DedicatedExecutor::join`

Now `DedicatedExecutor` follows the system we use for other server
components:

- `shutdown`: a quick sync call that signals the shutdown but doesn't
  drop
- `join`: async awaits until the executor has finished shutdown
- `drop`: warn but still try to shut down

* test: improve `detach_receiver` test

Co-authored-by: Andrew Lamb
---
 executor/src/lib.rs                           | 190 +++++++++++++++---
 .../influxdb_ioxd/server_type/database/mod.rs |   2 +-
 ingester/src/data.rs                          |   2 +
 ingester/src/handler.rs                       |   5 +
 ingester/src/querier_handler.rs               |   6 +
 query/src/exec.rs                             |  34 +++-
 query/src/frontend.rs                         |   2 +
 query/src/frontend/influxrpc.rs               |   2 +
 query/src/frontend/reorg.rs                   |   6 +
 query/src/plan/stringset.rs                   |   2 +
 server/src/application.rs                     |   4 +-
 11 files changed, 219 insertions(+), 36 deletions(-)

diff --git a/executor/src/lib.rs b/executor/src/lib.rs
index d2997c7a2f..c23dbd8ebd 100644
--- a/executor/src/lib.rs
+++ b/executor/src/lib.rs
@@ -14,10 +14,13 @@ use parking_lot::Mutex;
 use pin_project::{pin_project, pinned_drop};
 
 use std::{pin::Pin, sync::Arc};
-use tokio::sync::oneshot::Receiver;
+use tokio::sync::oneshot::{error::RecvError, Receiver};
 use tokio_util::sync::CancellationToken;
 
-use futures::Future;
+use futures::{
+    future::{BoxFuture, Shared},
+    Future, FutureExt, TryFutureExt,
+};
 
 use observability_deps::tracing::warn;
 
@@ -55,10 +58,21 @@ pub type Error = tokio::sync::oneshot::error::RecvError;
 #[derive(Debug)]
 pub struct Job<T> {
     cancel: CancellationToken,
+    detached: bool,
     #[pin]
     rx: Receiver<T>,
 }
 
+impl<T> Job<T> {
+    /// Detaches the job so that dropping it does not cancel it.
+    ///
+    /// You must ensure that this task eventually finishes, otherwise [`DedicatedExecutor::join`] may never return!
+    pub fn detach(mut self) {
+        // cannot destructure `Self` because we implement `Drop`, so we use a flag instead to prevent cancellation.
+        self.detached = true;
+    }
+}
+
 impl<T> Future for Job<T> {
     type Output = Result<T, Error>;
 
@@ -74,7 +88,9 @@ impl<T> Future for Job<T> {
 #[pinned_drop]
 impl<T> PinnedDrop for Job<T> {
     fn drop(self: Pin<&mut Self>) {
-        self.cancel.cancel();
+        if !self.detached {
+            self.cancel.cancel();
+        }
     }
 }
 
@@ -90,13 +106,37 @@ pub struct DedicatedExecutor {
 struct State {
     /// Channel for requests -- the dedicated executor takes requests
     /// from here and runs them.
+    ///
+    /// This is `None` if we triggered shutdown.
     requests: Option<std::sync::mpsc::Sender<Task>>,
 
-    /// The thread that is doing the work
-    thread: Option<std::thread::JoinHandle<()>>,
+    /// Receiver side indicating that shutdown is complete.
+    completed_shutdown: Shared<BoxFuture<'static, Result<(), Arc<RecvError>>>>,
 
     /// Task counter (uses Arc strong count).
     task_refs: Arc<()>,
+
+    /// The inner thread that can be used to join during drop.
+    thread: Option<std::thread::JoinHandle<()>>,
 }
 
+// IMPORTANT: Implement `Drop` for `State`, NOT for `DedicatedExecutor`, because the executor can be cloned and clones
+// share their inner state.
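+//
+// Put differently: cloning `DedicatedExecutor` merely hands out another handle to the same shared
+// `State`, so this `drop` runs exactly once, when the last handle goes away. That is the only
+// point at which it is safe to warn about a missing `shutdown()` and to join the worker thread.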
+impl Drop for State { + fn drop(&mut self) { + if self.requests.is_some() { + warn!("DedicatedExecutor dropped without calling shutdown()"); + self.requests = None; + } + + // do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575 + if !std::thread::panicking() && self.completed_shutdown.clone().now_or_never().is_none() { + warn!("DedicatedExecutor dropped without waiting for worker termination",); + } + + // join thread but don't care about the results + self.thread.take().expect("not dropped yet").join().ok(); + } } /// The default worker priority (value passed to `libc::setpriority`); @@ -129,7 +169,8 @@ impl DedicatedExecutor { pub fn new(thread_name: &str, num_threads: usize) -> Self { let thread_name = thread_name.to_string(); - let (tx, rx) = std::sync::mpsc::channel::(); + let (tx_tasks, rx_tasks) = std::sync::mpsc::channel::(); + let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel(); let thread = std::thread::spawn(move || { let runtime = tokio::runtime::Builder::new_multi_thread() @@ -146,7 +187,7 @@ impl DedicatedExecutor { // We therefore use a RwLock to wait for tasks to complete let join = Arc::new(tokio::sync::RwLock::new(())); - while let Ok(task) = rx.recv() { + while let Ok(task) = rx_tasks.recv() { let join = Arc::clone(&join); let handle = join.read_owned().await; @@ -158,13 +199,17 @@ impl DedicatedExecutor { // Wait for all tasks to finish join.write().await; + + // signal shutdown, but it's OK if the other side is gone + tx_shutdown.send(()).ok(); }) }); let state = State { - requests: Some(tx), - thread: Some(thread), + requests: Some(tx_tasks), task_refs: Arc::new(()), + completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(), + thread: Some(thread), }; Self { @@ -205,7 +250,11 @@ impl DedicatedExecutor { warn!("tried to schedule task on an executor that was shutdown"); } - Job { rx, cancel } + Job { + rx, + cancel, + detached: false, + } } /// Number of currently active tasks. @@ -231,20 +280,23 @@ impl DedicatedExecutor { /// Only the first all to `join` will actually wait for the /// executing thread to complete. All other calls to join will /// complete immediately. - pub fn join(&self) { + /// + /// # Panic / Drop + /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call + /// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see + /// . 
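+    ///
+    /// A minimal usage sketch (illustrative only; the name and thread count are made up):
+    ///
+    /// ```ignore
+    /// let exec = DedicatedExecutor::new("my_executor", 2);
+    /// // ... submit work via exec.spawn(...) ...
+    /// exec.shutdown();   // optional: `join` also triggers shutdown itself
+    /// exec.join().await; // wait until the worker thread has terminated
+    /// ```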
+ pub async fn join(&self) { self.shutdown(); - // take the thread out when mutex is held - let thread = { - let mut state = self.state.lock(); - state.thread.take() + // get handle mutex is held + let handle = { + let state = self.state.lock(); + state.completed_shutdown.clone() }; // wait for completion while not holding the mutex to avoid // deadlocks - if let Some(thread) = thread { - thread.join().ok(); - } + handle.await.expect("Thread died?") } } @@ -296,6 +348,8 @@ mod tests { // should be able to get the result assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; } #[tokio::test] @@ -306,6 +360,40 @@ mod tests { let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier))); barrier.wait(); assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn drop_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + + drop(exec.clone()); + + let task = exec.spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + #[should_panic(expected = "foo")] + async fn just_panic() { + struct S(DedicatedExecutor); + + impl Drop for S { + fn drop(&mut self) { + self.0.join().now_or_never(); + } + } + + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let _s = S(exec); + + // this must not lead to a double-panic and SIGILL + panic!("foo") } #[tokio::test] @@ -324,7 +412,7 @@ mod tests { assert_eq!(dedicated_task1.await.unwrap(), 11); assert_eq!(dedicated_task2.await.unwrap(), 42); - exec.join(); + exec.join().await; } #[tokio::test] @@ -334,6 +422,8 @@ mod tests { let dedicated_task = exec.spawn(async move { get_current_thread_priority() }); assert_eq!(dedicated_task.await.unwrap(), WORKER_PRIORITY); + + exec.join().await; } #[tokio::test] @@ -356,6 +446,8 @@ mod tests { // Validate the inner task ran to completion (aka it did not panic) assert_eq!(dedicated_task.await.unwrap(), 25); + + exec.join().await; } #[tokio::test] @@ -371,6 +463,8 @@ mod tests { // should not be able to get the result dedicated_task.await.unwrap_err(); + + exec.join().await; } #[tokio::test] @@ -390,6 +484,8 @@ mod tests { // task should complete successfully assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; } #[tokio::test] @@ -402,6 +498,8 @@ mod tests { // task should complete, but return an error dedicated_task.await.unwrap_err(); + + exec.join().await; } #[tokio::test] @@ -409,20 +507,30 @@ mod tests { let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); // shutdown the clone (but not the exec) - exec.clone().join(); + exec.clone().join().await; // Simulate trying to submit tasks once executor has shutdown let dedicated_task = exec.spawn(async { 11 }); // task should complete, but return an error dedicated_task.await.unwrap_err(); + + exec.join().await; } #[tokio::test] async fn executor_join() { let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); // test it doesn't hang - exec.join() + exec.join().await; + } + + #[tokio::test] + async fn executor_join2() { + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + // test it doesn't hang + exec.join().await; + exec.join().await; } #[tokio::test] @@ -430,9 +538,9 @@ mod tests { async fn executor_clone_join() { let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); // test it doesn't hang - exec.clone().join(); - exec.clone().join(); - exec.join(); + 
exec.clone().join().await; + exec.clone().join().await; + exec.join().await; } #[tokio::test] @@ -463,7 +571,39 @@ mod tests { wait_for_tasks(&exec, 0).await; assert_eq!(exec.tasks(), 0); - exec.join() + exec.join().await; + } + + #[tokio::test] + async fn detach_receiver() { + // create empty executor + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + assert_eq!(exec.tasks(), 0); + + // create first task + // `detach()` consumes the task but doesn't abort the task (in contrast to `drop`). We'll proof the that the + // task is still running by linking it to a 2nd task using a barrier with size 3 (two tasks plus the main thread). + let barrier = Arc::new(AsyncBarrier::new(3)); + let dedicated_task = exec.spawn(do_work_async(11, Arc::clone(&barrier))); + dedicated_task.detach(); + assert_eq!(exec.tasks(), 1); + + // create second task + let dedicated_task = exec.spawn(do_work_async(22, Arc::clone(&barrier))); + assert_eq!(exec.tasks(), 2); + + // wait a bit just to make sure that our tasks doesn't get dropped + tokio::time::sleep(Duration::from_millis(10)).await; + assert_eq!(exec.tasks(), 2); + + // tasks should be unblocked because they both wait on the same barrier + // unblock tasks + barrier.wait().await; + wait_for_tasks(&exec, 0).await; + let result = dedicated_task.await.unwrap(); + assert_eq!(result, 22); + + exec.join().await; } /// Wait for the barrier and then return `result` diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs index b2b3a231c6..06d4c48d16 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs @@ -99,7 +99,7 @@ impl ServerType for DatabaseServerType { info!("server completed shutting down"); - self.application.join(); + self.application.join().await; info!("shared application state completed shutting down"); } diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 01db3b0c7d..f1b53134e9 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1893,6 +1893,8 @@ mod tests { assert_batches_sorted_eq!(&expected, &[data]); assert_eq!(p.inner.read().snapshots[0].min_sequencer_number.get(), 8); assert_eq!(p.inner.read().snapshots[0].max_sequencer_number.get(), 9); + + exec.join().await; } #[tokio::test] diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 96f9ace182..27218d3fb8 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -229,10 +229,13 @@ impl IngestHandler for IngestHandlerImpl { panic!("Background worker '{name}' exited early!"); } } + + self.data.exec.join().await; } fn shutdown(&self) { self.shutdown.cancel(); + self.data.exec.shutdown(); } } @@ -251,6 +254,8 @@ impl Drop for IngestHandlerImpl { ); } } + + // `self.data.exec` implements `Drop`, so we don't need to do anything } } diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index 44e80330fd..ec5bf6e2ed 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -299,6 +299,8 @@ mod tests { "+-----------+------+-----------------------------+", ]; assert_batches_eq!(&expected, &output_batches); + + exc.join().await; } #[tokio::test] @@ -334,6 +336,8 @@ mod tests { "+------+-----------------------------+", ]; assert_batches_eq!(&expected, &output_batches); + + exc.join().await; } #[tokio::test] @@ -364,6 +368,8 @@ mod tests { // verify data: return nothing because the selected row already deleted let expected = vec!["++", 
"++"]; assert_batches_eq!(&expected, &output_batches); + + exc.join().await; } #[tokio::test] diff --git a/query/src/exec.rs b/query/src/exec.rs index 5f9d64f605..d51fbb05e3 100644 --- a/query/src/exec.rs +++ b/query/src/exec.rs @@ -121,23 +121,25 @@ impl Executor { } } + /// Initializes shutdown. + pub fn shutdown(&self) { + self.query_exec.shutdown(); + self.reorg_exec.shutdown(); + } + /// Stops all subsequent task executions, and waits for the worker /// thread to complete. Note this will shutdown all created contexts. /// /// Only the first all to `join` will actually wait for the /// executing thread to complete. All other calls to join will /// complete immediately. - pub fn join(&self) { - self.query_exec.join(); - self.reorg_exec.join(); + pub async fn join(&self) { + self.query_exec.join().await; + self.reorg_exec.join().await; } } -impl Drop for Executor { - fn drop(&mut self) { - self.join(); - } -} +// No need to implement `Drop` because this is done by DedicatedExecutor already /// Create a SchemaPivot node which an arbitrary input like /// ColA | ColB | ColC @@ -265,6 +267,8 @@ mod tests { let ctx = exec.new_context(ExecutorType::Query); let result_strings = ctx.to_string_set(plan).await.unwrap(); assert_eq!(result_strings, expected_strings); + + exec.join().await; } #[tokio::test] @@ -279,6 +283,8 @@ mod tests { let results = ctx.to_string_set(plan).await.unwrap(); assert_eq!(results, StringSetRef::new(StringSet::new())); + + exec.join().await; } #[tokio::test] @@ -295,6 +301,8 @@ mod tests { let results = ctx.to_string_set(plan).await.unwrap(); assert_eq!(results, to_set(&["foo", "bar", "baz"])); + + exec.join().await; } #[tokio::test] @@ -315,6 +323,8 @@ mod tests { let results = ctx.to_string_set(plan).await.unwrap(); assert_eq!(results, to_set(&["foo", "bar", "baz"])); + + exec.join().await; } #[tokio::test] @@ -339,6 +349,8 @@ mod tests { let results = ctx.to_string_set(plan).await.unwrap(); assert_eq!(results, to_set(&["foo", "bar", "baz"])); + + exec.join().await; } #[tokio::test] @@ -370,6 +382,8 @@ mod tests { expected_error, actual_error, ); + + exec.join().await; } #[tokio::test] @@ -397,6 +411,8 @@ mod tests { expected_error, actual_error ); + + exec.join().await; } #[tokio::test] @@ -418,6 +434,8 @@ mod tests { let results = ctx.to_string_set(plan).await.expect("Executed plan"); assert_eq!(results, to_set(&["f1", "f2"])); + + exec.join().await; } /// return a set for testing diff --git a/query/src/frontend.rs b/query/src/frontend.rs index 6976a21baa..d33e015664 100644 --- a/query/src/frontend.rs +++ b/query/src/frontend.rs @@ -111,6 +111,8 @@ mod test { .unwrap(); assert_extracted_metrics!(extracted, 8); + + executor.join().await; } // Extracted baseline metrics for the specified operator diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index eca12af39d..793bc48004 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -1948,5 +1948,7 @@ mod tests { "\nActual: {:?}\nExpected: {:?}", actual_predicate, expected_predicate ); + + executor.join().await; } } diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index 2f4a21fd0e..c9000a0156 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -376,6 +376,8 @@ mod test { ]; assert_batches_eq!(&expected, &batches); + + executor.join().await; } #[tokio::test] @@ -425,6 +427,8 @@ mod test { ]; assert_batches_eq!(&expected, &batches); + + executor.join().await; } #[tokio::test] @@ -489,5 +493,7 @@ mod test { ]; 
assert_batches_eq!(&expected, &batches1); + + executor.join().await; } } diff --git a/query/src/plan/stringset.rs b/query/src/plan/stringset.rs index c836d213c6..35c764daf8 100644 --- a/query/src/plan/stringset.rs +++ b/query/src/plan/stringset.rs @@ -225,6 +225,8 @@ mod tests { let ctx = exec.new_context(ExecutorType::Query); let ss = ctx.to_string_set(plan).await.unwrap(); assert_eq!(ss, expected_ss); + + exec.join().await; } fn to_string_set(v: &[&str]) -> StringSet { diff --git a/server/src/application.rs b/server/src/application.rs index d5381d46d7..5f8710650d 100644 --- a/server/src/application.rs +++ b/server/src/application.rs @@ -98,7 +98,7 @@ impl ApplicationState { &self.executor } - pub fn join(&self) { - self.executor.join() + pub async fn join(&self) { + self.executor.join().await; } } From 01f4d3ed3bdcf2f3c6d8ba8bb9e656e64fb251f3 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 1 Mar 2022 15:46:02 +0000 Subject: [PATCH 5/8] fix: check arrow instead of storage gRPC health for SQL REPL (#3886) There is no reason a query pod should support the storage API. Note that some features like the observer mode or `show databases;` still need the management API. We'll probably need to fix that for NG at some point. --- influxdb_iox/src/commands/sql.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb_iox/src/commands/sql.rs b/influxdb_iox/src/commands/sql.rs index 051b04f6de..8a1a5a78c4 100644 --- a/influxdb_iox/src/commands/sql.rs +++ b/influxdb_iox/src/commands/sql.rs @@ -59,7 +59,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { async fn check_health(connection: Connection) -> Result<()> { let response = health::Client::new(connection) - .check_storage() + .check_arrow() .await .context(ClientSnafu)?; From 63bea75ee1d7ab5c567ac79c0108598801ddeb83 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 1 Mar 2022 16:35:39 +0000 Subject: [PATCH 6/8] feat: split release build and deploy (#3879) * feat: split release build and deploy * fix: only build_release on main * chore: fix build_perf CI * fix: deploy_release checkout * fix: update deploy_release image * fix: deploy_release credentials * fix: add COMMIT_SHA to env Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .circleci/config.yml | 61 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index f91c6ab5d1..2bb8221762 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -425,14 +425,12 @@ jobs: COMMIT_SHA="$(git rev-parse --short HEAD)" RUST_VERSION="$(sed -E -ne 's/channel = "(.*)"/\1/p' rust-toolchain.toml)" - BRANCH="$(echo "$CIRCLE_BRANCH" | tr '[:upper:]' '[:lower:]' | sed 's/[^a-z0-9]/_/g')" docker buildx build \ --build-arg RUST_VERSION="$RUST_VERSION" \ --build-arg RUSTFLAGS="-C target-feature=+avx2 -C link-arg=-fuse-ld=lld" \ --progress plain \ --tag quay.io/influxdb/iox:"$COMMIT_SHA" \ - --tag quay.io/influxdb/iox:"$BRANCH" \ . docker buildx build \ --build-arg FEATURES="" \ @@ -441,7 +439,6 @@ jobs: --build-arg RUSTFLAGS="-C target-feature=+avx2 -C link-arg=-fuse-ld=lld" \ --progress plain \ --tag quay.io/influxdb/iox_data_generator:"$COMMIT_SHA" \ - --tag quay.io/influxdb/iox_data_generator:"$BRANCH" \ . 
             docker buildx build \
               --build-arg FEATURES="" \
@@ -450,24 +447,52 @@ jobs:
               --build-arg RUST_VERSION="$RUST_VERSION" \
               --build-arg RUSTFLAGS="-C target-feature=+avx2 -C link-arg=-fuse-ld=lld" \
               --progress plain \
               --tag quay.io/influxdb/iox_gitops_adapter:"$COMMIT_SHA" \
-              --tag quay.io/influxdb/iox_gitops_adapter:"$BRANCH" \
               .
 
             docker run -it --rm quay.io/influxdb/iox:$COMMIT_SHA debug print-cpu
 
-            docker push --all-tags quay.io/influxdb/iox
-            docker push --all-tags quay.io/influxdb/iox_data_generator
-            docker push --all-tags quay.io/influxdb/iox_gitops_adapter
-
-            echo "export COMMIT_SHA=${COMMIT_SHA}" >> $BASH_ENV
+            docker push quay.io/influxdb/iox:"$COMMIT_SHA"
+            docker push quay.io/influxdb/iox_data_generator:"$COMMIT_SHA"
+            docker push quay.io/influxdb/iox_gitops_adapter:"$COMMIT_SHA"
           # linking might take a while and doesn't produce CLI output
-          no_output_timeout: 20m
+          no_output_timeout: 30m
+      - cache_save
+
+  deploy_release:
+    docker:
+      - image: cimg/base:2021.04
+    steps:
+      - setup_remote_docker:
+          version: 19.03.13
+          docker_layer_caching: true
+      - checkout
+      - run: |
+          echo "$QUAY_INFLUXDB_IOX_PASS" | docker login quay.io --username $QUAY_INFLUXDB_IOX_USER --password-stdin
+      - run:
+          name: Update docker branch tags
+          command: |
+            COMMIT_SHA="$(git rev-parse --short HEAD)"
+            BRANCH="$(echo "$CIRCLE_BRANCH" | tr '[:upper:]' '[:lower:]' | sed 's/[^a-z0-9]/_/g')"
+
+            docker pull quay.io/influxdb/iox:"$COMMIT_SHA"
+            docker pull quay.io/influxdb/iox_data_generator:"$COMMIT_SHA"
+            docker pull quay.io/influxdb/iox_gitops_adapter:"$COMMIT_SHA"
+
+            docker tag quay.io/influxdb/iox:"$COMMIT_SHA" quay.io/influxdb/iox:"$BRANCH"
+            docker tag quay.io/influxdb/iox_data_generator:"$COMMIT_SHA" quay.io/influxdb/iox_data_generator:"$BRANCH"
+            docker tag quay.io/influxdb/iox_gitops_adapter:"$COMMIT_SHA" quay.io/influxdb/iox_gitops_adapter:"$BRANCH"
+
+            docker push quay.io/influxdb/iox:"$BRANCH"
+            docker push quay.io/influxdb/iox_data_generator:"$BRANCH"
+            docker push quay.io/influxdb/iox_gitops_adapter:"$BRANCH"
+
+            echo "export COMMIT_SHA=${COMMIT_SHA}" >> $BASH_ENV
       - run:
           name: Deploy tags
           command: |
             echo "$QUAY_PASS" | docker login quay.io --username $QUAY_USER --password-stdin
             ./.circleci/get-deploy-tags.sh "${COMMIT_SHA}"
-      - cache_save
+
 
 # Prepare the CI image used for other tasks.
 #
@@ -512,7 +537,9 @@ workflows:
   # CI for all pull requests.
   ci:
     when:
-      not: << pipeline.parameters.ci_image >>
+      and:
+        - not: << pipeline.parameters.ci_image >>
+        - not: << pipeline.parameters.build_perf >>
     jobs:
       - fmt
       - lint
@@ -529,7 +556,11 @@
         filters:
           branches:
             only: main
-          requires: # Only do a release build if all tests have passed
+      - deploy_release:
+          filters:
+            branches:
+              only: main
+          requires: # Only deploy if all tests have passed
            - fmt
            - lint
            - cargo_audit
@@ -539,6 +570,7 @@
            - test_heappy
            - test_perf
            - build_dev
+           - build_release
            - doc
 
   # Manual build of CI image
@@ -563,6 +595,9 @@
     when: << pipeline.parameters.build_perf >>
     jobs:
       - build_release
+      - deploy_release:
+          requires:
+            - build_release
 
   # Nightly rebuild of the build container
   ci_image_nightly:

From 143311f63f9c68c7d0f3019ccc3215bc8e396b0d Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Date: Tue, 1 Mar 2022 17:08:57 +0000
Subject: [PATCH 7/8] feat: additional write buffer logging (#3805) (#3887)

* feat: additional write buffer logging (#3805)

* fix: assertion direction
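The crux of the aggregator change is downgrading a fatal `assert!` into an
error-level log plus a recoverable error. A sketch of the pattern, mirroring
the aggregator.rs hunk below (field types and surrounding signature
simplified):

    // Instead of panicking when a tag is out of range, log the full
    // context (now including the database name) and surface an ordinary
    // error to the caller.
    if self.tag_to_record.len() <= tag.0 {
        error!(
            "tag {} out of range (database_name: {}, tag_to_record: {:?}, offsets: {:?})",
            tag.0, self.database_name, self.tag_to_record, input
        );
        return Err("internal aggregator error: invalid tag".into());
    }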
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
---
 Cargo.lock                           |  2 +-
 write_buffer/Cargo.toml              |  3 ++-
 write_buffer/src/kafka/aggregator.rs | 28 +++++++++++++++++-----------
 3 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 72c45f80a7..4e5bda9f7d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4240,7 +4240,7 @@ dependencies = [
 [[package]]
 name = "rskafka"
 version = "0.2.0"
-source = "git+https://github.com/influxdata/rskafka.git?rev=3af1939fd47f8680d40074dc3fd2e2a4a0da6b8c#3af1939fd47f8680d40074dc3fd2e2a4a0da6b8c"
+source = "git+https://github.com/influxdata/rskafka.git?rev=f7eef8560ac871e056887a62b7014582835cba78#f7eef8560ac871e056887a62b7014582835cba78"
 dependencies = [
  "async-trait",
  "bytes",
diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml
index a239a4fce1..cf78e5795d 100644
--- a/write_buffer/Cargo.toml
+++ b/write_buffer/Cargo.toml
@@ -21,7 +21,8 @@ observability_deps = { path = "../observability_deps" }
 parking_lot = "0.12"
 pin-project = "1.0"
 prost = "0.9"
-rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="3af1939fd47f8680d40074dc3fd2e2a4a0da6b8c", default-features = false, features = ["compression-snappy"] }
+# TODO: Temporary additional logging (#3805)
+rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="f7eef8560ac871e056887a62b7014582835cba78", default-features = false, features = ["compression-snappy"] }
 schema = { path = "../schema" }
 time = { path = "../time" }
 tokio = { version = "1.17", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
diff --git a/write_buffer/src/kafka/aggregator.rs b/write_buffer/src/kafka/aggregator.rs
index 9df73e0cc0..379997df7b 100644
--- a/write_buffer/src/kafka/aggregator.rs
+++ b/write_buffer/src/kafka/aggregator.rs
@@ -4,7 +4,7 @@ use data_types::sequence::Sequence;
 use dml::{DmlMeta, DmlOperation, DmlWrite};
 use hashbrown::{hash_map::Entry, HashMap};
 use mutable_batch::MutableBatch;
-use observability_deps::tracing::{info, warn};
+use observability_deps::tracing::{error, info, warn};
 use rskafka::{
     client::producer::aggregator::{self, Aggregator, StatusDeaggregator, TryPush},
     record::Record,
@@ -240,7 +240,7 @@ pub struct DmlAggregator {
     collector: Option<Arc<dyn TraceCollector>>,
 
     /// Database name.
-    database_name: String,
+    database_name: Arc<str>,
 
     /// Maximum batch size in bytes.
     max_size: usize,
@@ -258,14 +258,14 @@ impl DmlAggregator {
     pub fn new(
         collector: Option<Arc<dyn TraceCollector>>,
-        database_name: String,
+        database_name: impl Into<Arc<str>>,
         max_size: usize,
         sequencer_id: u32,
         time_provider: Arc<dyn TimeProvider>,
     ) -> Self {
         Self {
             collector,
-            database_name,
+            database_name: database_name.into(),
             max_size,
             sequencer_id,
             state: DmlAggregatorState::default(),
@@ -418,6 +418,7 @@ impl Aggregator for DmlAggregator {
                 records,
                 Deaggregator {
                     sequencer_id: self.sequencer_id,
+                    database_name: Arc::clone(&self.database_name),
                     metadata,
                     tag_to_record: state.tag_to_record,
                 },
@@ -484,6 +485,9 @@ pub struct Deaggregator {
     /// Sequencer ID.
     sequencer_id: u32,
 
+    /// Database name
+    database_name: Arc<str>,
+
     /// Metadata for every record.
     ///
     /// This is NOT per-tag, use `tag_to_record` to map tags to records first.
@@ -504,13 +508,15 @@ impl StatusDeaggregator for Deaggregator {
         tag: Self::Tag,
     ) -> Result {
         assert_eq!(input.len(), self.tag_to_record.len(), "invalid offsets");
-        assert!(
-            self.tag_to_record.len() > tag.0,
-            "tag {} out of range (tag_to_record: {:?}, offsets: {:?})",
-            tag.0,
-            self.tag_to_record,
-            input
-        );
+
+        if self.tag_to_record.len() <= tag.0 {
+            error!(
+                "tag {} out of range (database_name: {}, tag_to_record: {:?}, offsets: {:?})",
+                tag.0, self.database_name, self.tag_to_record, input
+            );
+            // TODO: Temporary non-fatal assertion to reduce log spam (#3805)
+            return Err("internal aggregator error: invalid tag".into());
+        }
 
         let record = self.tag_to_record[tag.0];

From 39e39f92a8477979fc5b53179dcaa5da5f2a78fc Mon Sep 17 00:00:00 2001
From: Marko Mikulicic
Date: Tue, 1 Mar 2022 18:29:02 +0100
Subject: [PATCH 8/8] fix: remove rskafka from valid options for
 INFLUXDB_IOX_WRITE_BUFFER_TYPE (#3890)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
---
 influxdb_iox/src/clap_blocks/write_buffer.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/influxdb_iox/src/clap_blocks/write_buffer.rs b/influxdb_iox/src/clap_blocks/write_buffer.rs
index 6e14b8e94c..e11b8623f8 100644
--- a/influxdb_iox/src/clap_blocks/write_buffer.rs
+++ b/influxdb_iox/src/clap_blocks/write_buffer.rs
@@ -11,7 +11,7 @@ use write_buffer::{
 pub struct WriteBufferConfig {
     /// The type of write buffer to use.
     ///
-    /// Valid options are: file, kafka, rskafka
+    /// Valid options are: file, kafka
     #[clap(long = "--write-buffer", env = "INFLUXDB_IOX_WRITE_BUFFER_TYPE")]
     pub(crate) type_: String,