From 5e94aee44a41ac3639783713706cd1ed9480710d Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 15 Mar 2023 15:08:19 +0100 Subject: [PATCH 01/15] feat(wal): reference tracker metrics Adds three metrics to expose the internal state of the WAL file reference tracker. These metrics are mainly useful to identify why WAL files are not being deleted, if any. --- ingester2/src/wal/reference_tracker.rs | 207 +++++++++++++++++++++++-- 1 file changed, 194 insertions(+), 13 deletions(-) diff --git a/ingester2/src/wal/reference_tracker.rs b/ingester2/src/wal/reference_tracker.rs index 339d4a9c43..8b7564fa96 100644 --- a/ingester2/src/wal/reference_tracker.rs +++ b/ingester2/src/wal/reference_tracker.rs @@ -9,6 +9,7 @@ use data_types::{ SequenceNumber, }; use hashbrown::HashMap; +use metric::U64Gauge; use observability_deps::tracing::{debug, info, warn}; use tokio::{ select, @@ -112,7 +113,7 @@ impl WalReferenceHandle { /// The returned [`WalReferenceActor`] SHOULD be /// [`WalReferenceActor::run()`] before the handle is used to avoid /// potential deadlocks. - pub(crate) fn new(wal: T) -> (Self, WalReferenceActor) + pub(crate) fn new(wal: T, metrics: &metric::Registry) -> (Self, WalReferenceActor) where T: WalFileDeleter, { @@ -120,14 +121,7 @@ impl WalReferenceHandle { let (persist_tx, persist_rx) = mpsc::channel(50); let (unbuffered_tx, unbuffered_rx) = mpsc::channel(50); - let actor = WalReferenceActor { - wal, - persisted: SequenceNumberSet::default(), - wal_files: HashMap::with_capacity(3), - file_rx, - persist_rx, - unbuffered_rx, - }; + let actor = WalReferenceActor::new(wal, file_rx, persist_rx, unbuffered_rx, metrics); ( Self { @@ -206,15 +200,72 @@ pub(crate) struct WalReferenceActor> { /// Invariant: sets in this map are always non-empty. wal_files: HashMap, + /// Channels for input from the [`WalReferenceHandle`]. file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, persist_rx: mpsc::Receiver>, unbuffered_rx: mpsc::Receiver, + + /// A metric tracking the number of rotated WAL files being reference + /// tracked. + num_files: U64Gauge, + /// The minimum [`SegmentId`] in `wal_files`, the set of old (rotated out) + /// files that will eventually be deleted. + /// + /// If this value never changes over the lifetime of an ingester, it is an + /// indication of a reference leak bug, causing a WAL file to never be + /// deleted. + min_id: U64Gauge, + /// The number of references to unpersisted operations remaining in the old + /// (rotated out) WAL files, decreasing as persistence completes, and + /// increasing as non-empty WAL files are rotated into `wal_files`. + referenced_ops: U64Gauge, } impl WalReferenceActor where T: WalFileDeleter, { + fn new( + wal: T, + file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, + persist_rx: mpsc::Receiver>, + unbuffered_rx: mpsc::Receiver, + metrics: &metric::Registry, + ) -> Self { + let num_files = metrics + .register_metric::( + "ingester_wal_inactive_file_count", + "number of WAL files that are not being actively wrote to, but contain unpersisted data" + ) + .recorder(&[]); + + let min_id = metrics + .register_metric::( + "ingester_wal_inactive_min_id", + "the segment ID of the oldest inactive wal file", + ) + .recorder(&[]); + + let referenced_ops = metrics + .register_metric::( + "ingester_wal_inactive_file_op_reference_count", + "the number of unpersisted operations referenced in inactive WAL files", + ) + .recorder(&[]); + + Self { + wal, + persisted: SequenceNumberSet::default(), + wal_files: HashMap::with_capacity(3), + file_rx, + persist_rx, + unbuffered_rx, + num_files, + min_id, + referenced_ops, + } + } + /// Execute the actor task. /// /// This task exits once the sender side of the input channels have been @@ -235,11 +286,50 @@ where Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await, else => break } + + // After each action is processed, update the metrics. + self.update_metrics(); } debug!("stopping wal reference counter task"); } + /// Update the metrics to match the internal state. + fn update_metrics(&self) { + let num_files = self.wal_files.len(); + + // Build a set of (id, set_len) tuples for debug logging. + let id_lens = self + .wal_files + .iter() + .map(|(id, set)| (*id, set.len())) + .collect::>(); + + // Emit a log for debugging purposes, showing the current state. + debug!( + num_files, + files=?id_lens, + persisted_set_len=self.persisted.len(), + "updated reference state" + ); + + // Reduce (id, set_len) tuples to the min ID and sum of the set lengths, + // defaulting to 0 for the length and u64::MAX for the ID if the file + // set is empty. + let (min_id, referenced_ops) = + id_lens + .into_iter() + .fold((u64::MAX, 0), |(id_min, len_sum), e| { + assert!(e.1 > 0); // Invariant: sets in file map are never empty + (id_min.min(e.0.get()), len_sum + e.1) + }); + + // And update the various exported metrics. + self.num_files.set(num_files as _); + self.min_id.set(min_id); + self.referenced_ops.set(referenced_ops); + } + /// Track a newly rotated WAL segment, with the given [`SegmentId`] and /// containing the operations specified in [`SequenceNumberSet`]. /// @@ -409,6 +499,7 @@ mod tests { use assert_matches::assert_matches; use data_types::{NamespaceId, PartitionId, TableId}; use futures::Future; + use metric::assert_counter; use parking_lot::Mutex; use test_helpers::timeout::FutureTimeout; use tokio::sync::Notify; @@ -477,8 +568,9 @@ mod tests { async fn test_rotate_persist_delete() { const SEGMENT_ID: SegmentId = SegmentId::new(42); + let metrics = metric::Registry::default(); let wal = Arc::new(MockWalDeleter::default()); - let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal)); + let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics); let actor_task = tokio::spawn(actor.run()); @@ -511,6 +603,20 @@ mod tests { // Validate the correct ID was deleted assert_matches!(wal.calls().as_slice(), &[v] if v == SEGMENT_ID); + assert_counter!( + metrics, + U64Gauge, + "ingester_wal_inactive_file_count", + value = 0, + ); + + assert_counter!( + metrics, + U64Gauge, + "ingester_wal_inactive_file_op_reference_count", + value = 0, + ); + // Assert clean shutdown behaviour. drop(handle); actor_task @@ -535,8 +641,9 @@ mod tests { const SEGMENT_ID_1: SegmentId = SegmentId::new(42); const SEGMENT_ID_2: SegmentId = SegmentId::new(24); + let metrics = metric::Registry::default(); let wal = Arc::new(MockWalDeleter::default()); - let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal)); + let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics); let actor_task = tokio::spawn(actor.run()); @@ -598,8 +705,9 @@ mod tests { async fn test_empty_file_set() { const SEGMENT_ID: SegmentId = SegmentId::new(42); + let metrics = metric::Registry::default(); let wal = Arc::new(MockWalDeleter::default()); - let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal)); + let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics); let actor_task = tokio::spawn(actor.run()); @@ -625,8 +733,9 @@ mod tests { #[tokio::test] #[should_panic(expected = "duplicate segment ID")] async fn test_duplicate_segment_ids() { + let metrics = metric::Registry::default(); let wal = Arc::new(MockWalDeleter::default()); - let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal)); + let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics); // Enqueuing a notification before the actor is running should succeed // because of the channel buffer capacity. @@ -643,4 +752,76 @@ mod tests { // This should panic after processing the second file. actor.run().with_timeout_panic(Duration::from_secs(5)).await; } + + /// Enqueue two segment files, enqueue persist notifications for the second + /// file and wait for it to be deleted to synchronise the state (so it's not + /// a racy test). + /// + /// Then assert the metric values for the known state. + #[tokio::test] + async fn test_metrics() { + const SEGMENT_ID_1: SegmentId = SegmentId::new(42); + const SEGMENT_ID_2: SegmentId = SegmentId::new(24); + + let metrics = metric::Registry::default(); + let wal = Arc::new(MockWalDeleter::default()); + let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics); + + let actor_task = tokio::spawn(actor.run()); + + // Add a file with 4 references + handle + .enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3, 4, 5])) + .await; + + // Reduce the reference count for file 1 (leaving 3 references) + handle.enqueue_persist_notification(new_note([1, 2])).await; + + // Enqueue the second file. + handle + .enqueue_rotated_file(SEGMENT_ID_2, new_set([6])) + .await; + + // Release the references to file 2 + let waker = wal.waker(); + handle.enqueue_persist_notification(new_note([6])).await; + + waker.await; + + // + // At this point, the actor has deleted the second file, which means it + // has already processed the first enqueued file, and the first persist + // notification that relates to the first file. + // + // A non-racy assert can now be made against this known state. + // + + assert_counter!( + metrics, + U64Gauge, + "ingester_wal_inactive_file_count", + value = 1, + ); + + assert_counter!( + metrics, + U64Gauge, + "ingester_wal_inactive_file_op_reference_count", + value = 3, // 5 initial, reduced by 2 via persist notification + ); + + assert_counter!( + metrics, + U64Gauge, + "ingester_wal_inactive_min_id", + value = SEGMENT_ID_1.get(), + ); + + // Assert clean shutdown behaviour. + drop(handle); + actor_task + .with_timeout_panic(Duration::from_secs(5)) + .await + .expect("actor task should stop cleanly") + } } From 393de6980e59bb0596963b22d07dd790a47401e6 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 15 Mar 2023 19:55:33 +0100 Subject: [PATCH 02/15] feat: debug-log errors during chunk extraction (#7223) Helps debugging while working on #6098 . --- .../physical_optimizer/chunk_extraction.rs | 70 ++++++++++++++----- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/iox_query/src/physical_optimizer/chunk_extraction.rs b/iox_query/src/physical_optimizer/chunk_extraction.rs index 898cd7ab8c..071e3ae040 100644 --- a/iox_query/src/physical_optimizer/chunk_extraction.rs +++ b/iox_query/src/physical_optimizer/chunk_extraction.rs @@ -1,9 +1,13 @@ use std::sync::Arc; -use datafusion::physical_plan::{ - empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan, - ExecutionPlan, ExecutionPlanVisitor, +use datafusion::{ + error::DataFusionError, + physical_plan::{ + empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan, + ExecutionPlan, ExecutionPlanVisitor, + }, }; +use observability_deps::tracing::debug; use schema::Schema; use crate::{ @@ -22,7 +26,13 @@ use crate::{ /// [`chunks_to_physical_nodes`]: crate::provider::chunks_to_physical_nodes pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec>)> { let mut visitor = ExtractChunksVisitor::default(); - visit_execution_plan(plan, &mut visitor).ok()?; + if let Err(e) = visit_execution_plan(plan, &mut visitor) { + debug!( + %e, + "cannot extract chunks", + ); + return None; + } visitor.schema.map(|schema| (schema, visitor.chunks)) } @@ -33,16 +43,22 @@ struct ExtractChunksVisitor { } impl ExtractChunksVisitor { - fn add_chunk(&mut self, chunk: Arc) -> Result<(), ()> { + fn add_chunk(&mut self, chunk: Arc) { self.chunks.push(chunk); - Ok(()) } - fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), ()> { - let schema = Schema::try_from(exec.schema()).map_err(|_| ())?; + fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), DataFusionError> { + let schema = Schema::try_from(exec.schema()).map_err(|e| { + DataFusionError::Context( + "Schema recovery".to_owned(), + Box::new(DataFusionError::External(Box::new(e))), + ) + })?; if let Some(existing) = &self.schema { if existing != &schema { - return Err(()); + return Err(DataFusionError::External( + String::from("Different schema").into(), + )); } } else { self.schema = Some(schema); @@ -52,19 +68,27 @@ impl ExtractChunksVisitor { } impl ExecutionPlanVisitor for ExtractChunksVisitor { - type Error = (); + type Error = DataFusionError; fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { let plan_any = plan.as_any(); if let Some(record_batches_exec) = plan_any.downcast_ref::() { - self.add_schema_from_exec(record_batches_exec)?; + self.add_schema_from_exec(record_batches_exec) + .map_err(|e| { + DataFusionError::Context( + "add schema from RecordBatchesExec".to_owned(), + Box::new(e), + ) + })?; for chunk in record_batches_exec.chunks() { - self.add_chunk(Arc::clone(chunk))?; + self.add_chunk(Arc::clone(chunk)); } } else if let Some(parquet_exec) = plan_any.downcast_ref::() { - self.add_schema_from_exec(parquet_exec)?; + self.add_schema_from_exec(parquet_exec).map_err(|e| { + DataFusionError::Context("add schema from ParquetExec".to_owned(), Box::new(e)) + })?; for group in &parquet_exec.base_config().file_groups { for file in group { @@ -72,22 +96,32 @@ impl ExecutionPlanVisitor for ExtractChunksVisitor { .extensions .as_ref() .and_then(|any| any.downcast_ref::()) - .ok_or(())?; - self.add_chunk(Arc::clone(&ext.0))?; + .ok_or_else(|| { + DataFusionError::External( + String::from("PartitionedFileExt not found").into(), + ) + })?; + self.add_chunk(Arc::clone(&ext.0)); } } } else if let Some(empty_exec) = plan_any.downcast_ref::() { // should not produce dummy data if empty_exec.produce_one_row() { - return Err(()); + return Err(DataFusionError::External( + String::from("EmptyExec produces row").into(), + )); } - self.add_schema_from_exec(empty_exec)?; + self.add_schema_from_exec(empty_exec).map_err(|e| { + DataFusionError::Context("add schema from EmptyExec".to_owned(), Box::new(e)) + })?; } else if plan_any.downcast_ref::().is_some() { // continue visiting } else { // unsupported node - return Err(()); + return Err(DataFusionError::External( + String::from("Unsupported node").into(), + )); } Ok(true) From 6d6fd8f66392e9b527611f6a9d9c551572ee3b3a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Mar 2023 22:52:59 +0100 Subject: [PATCH 03/15] feat(flightsql): implement basic `CommandGetCatalogs` support (#7212) * refactor: reduce redundancy in test * chore: implement basic get_catalog support * fix: clippy --- flightsql/src/cmd.rs | 17 ++- flightsql/src/planner.rs | 82 ++++++++---- .../tests/end_to_end_cases/flightsql.rs | 123 ++++++++++++------ influxdb_iox/tests/jdbc_client/Main.java | 32 +++-- influxdb_iox_client/src/client/flightsql.rs | 18 ++- iox_query/src/exec/context.rs | 19 +-- 6 files changed, 205 insertions(+), 86 deletions(-) diff --git a/flightsql/src/cmd.rs b/flightsql/src/cmd.rs index 2344cba42b..6b2ccc7af2 100644 --- a/flightsql/src/cmd.rs +++ b/flightsql/src/cmd.rs @@ -4,7 +4,7 @@ use std::fmt::Display; use arrow_flight::sql::{ ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any, - CommandPreparedStatementQuery, CommandStatementQuery, + CommandGetCatalogs, CommandPreparedStatementQuery, CommandStatementQuery, }; use bytes::Bytes; use prost::Message; @@ -14,7 +14,7 @@ use crate::error::*; /// Represents a prepared statement "handle". IOx passes all state /// required to run the prepared statement back and forth to the -/// client so any querier instance can run it +/// client, so any querier instance can run it #[derive(Debug, Clone, PartialEq)] pub struct PreparedStatementHandle { /// The raw SQL query text @@ -57,12 +57,17 @@ impl From for Bytes { } } -/// Decoded / validated FlightSQL command messages +/// Decoded / validated FlightSQL command messages +/// +/// Handles encoding/decoding prost::Any messages back +/// and forth to native Rust types #[derive(Debug, Clone, PartialEq)] pub enum FlightSQLCommand { CommandStatementQuery(String), /// Run a prepared statement CommandPreparedStatementQuery(PreparedStatementHandle), + /// Get a list of the available catalogs + CommandGetCatalogs(), /// Create a prepared statement ActionCreatePreparedStatementRequest(String), /// Close a prepared statement @@ -74,6 +79,7 @@ impl Display for FlightSQLCommand { match self { Self::CommandStatementQuery(q) => write!(f, "CommandStatementQuery{q}"), Self::CommandPreparedStatementQuery(h) => write!(f, "CommandPreparedStatementQuery{h}"), + Self::CommandGetCatalogs() => write!(f, "CommandGetCatalogs"), Self::ActionCreatePreparedStatementRequest(q) => { write!(f, "ActionCreatePreparedStatementRequest{q}") } @@ -100,6 +106,10 @@ impl FlightSQLCommand { let handle = PreparedStatementHandle::try_decode(prepared_statement_handle)?; Ok(Self::CommandPreparedStatementQuery(handle)) + } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { + let CommandGetCatalogs {} = decoded_cmd; + + Ok(Self::CommandGetCatalogs()) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { let ActionCreatePreparedStatementRequest { query } = decoded_cmd; @@ -131,6 +141,7 @@ impl FlightSQLCommand { prepared_statement_handle, }) } + FlightSQLCommand::CommandGetCatalogs() => Any::pack(&CommandGetCatalogs {}), FlightSQLCommand::ActionCreatePreparedStatementRequest(query) => { Any::pack(&ActionCreatePreparedStatementRequest { query }) } diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index 6779cc3931..b7ca27ba93 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -1,13 +1,13 @@ //! FlightSQL handling use std::sync::Arc; -use arrow::{error::ArrowError, ipc::writer::IpcWriteOptions}; +use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions}; use arrow_flight::{ sql::{ActionCreatePreparedStatementResult, Any}, IpcMessage, SchemaAsIpc, }; use bytes::Bytes; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan}; use iox_query::{exec::IOxSessionContext, QueryNamespace}; use observability_deps::tracing::debug; use prost::Message; @@ -35,12 +35,17 @@ impl FlightSQLPlanner { match cmd { FlightSQLCommand::CommandStatementQuery(query) => { - Self::get_schema_for_query(&query, ctx).await + get_schema_for_query(&query, ctx).await } FlightSQLCommand::CommandPreparedStatementQuery(handle) => { - Self::get_schema_for_query(handle.query(), ctx).await + get_schema_for_query(handle.query(), ctx).await } - _ => ProtocolSnafu { + FlightSQLCommand::CommandGetCatalogs() => { + let plan = plan_get_catalogs(ctx).await?; + get_schema_for_plan(plan) + } + FlightSQLCommand::ActionCreatePreparedStatementRequest(_) + | FlightSQLCommand::ActionClosePreparedStatementRequest(_) => ProtocolSnafu { cmd: format!("{cmd:?}"), method: "GetFlightInfo", } @@ -48,24 +53,6 @@ impl FlightSQLPlanner { } } - /// Return the schema for the specified query - /// - /// returns: IPC encoded (schema_bytes) for this query - async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result { - // gather real schema, but only - let logical_plan = ctx.plan_sql(query).await?; - let schema = arrow::datatypes::Schema::from(logical_plan.schema().as_ref()); - let options = IpcWriteOptions::default(); - - // encode the schema into the correct form - let message: Result = - SchemaAsIpc::new(&schema, &options).try_into(); - - let IpcMessage(schema) = message?; - - Ok(schema) - } - /// Returns a plan that computes results requested in msg pub async fn do_get( namespace_name: impl Into, @@ -86,7 +73,14 @@ impl FlightSQLPlanner { debug!(%query, "Planning FlightSQL prepared query"); Ok(ctx.prepare_sql(query).await?) } - _ => ProtocolSnafu { + FlightSQLCommand::CommandGetCatalogs() => { + debug!("Planning GetCatalogs query"); + let plan = plan_get_catalogs(ctx).await?; + Ok(ctx.create_physical_plan(&plan).await?) + } + + FlightSQLCommand::ActionClosePreparedStatementRequest(_) + | FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => ProtocolSnafu { cmd: format!("{cmd:?}"), method: "DoGet", } @@ -114,7 +108,7 @@ impl FlightSQLPlanner { // see https://github.com/apache/arrow-datafusion/pull/4701 let parameter_schema = vec![]; - let dataset_schema = Self::get_schema_for_query(&query, ctx).await?; + let dataset_schema = get_schema_for_query(&query, ctx).await?; let handle = PreparedStatementHandle::new(query); let result = ActionCreatePreparedStatementResult { @@ -141,3 +135,41 @@ impl FlightSQLPlanner { } } } + +/// Return the schema for the specified query +/// +/// returns: IPC encoded (schema_bytes) for this query +async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result { + get_schema_for_plan(ctx.plan_sql(query).await?) +} + +/// Return the schema for the specified logical plan +/// +/// returns: IPC encoded (schema_bytes) for this query +fn get_schema_for_plan(logical_plan: LogicalPlan) -> Result { + // gather real schema, but only + let schema = Schema::from(logical_plan.schema().as_ref()); + encode_schema(&schema) +} + +/// Encodes the schema IPC encoded (schema_bytes) +fn encode_schema(schema: &Schema) -> Result { + let options = IpcWriteOptions::default(); + + // encode the schema into the correct form + let message: Result = SchemaAsIpc::new(schema, &options).try_into(); + + let IpcMessage(schema) = message?; + + Ok(schema) +} + +/// Return a `LogicalPlan` for GetCatalogs +/// +/// In the future this could be made more efficient by building the +/// response directly from the IOx catalog rather than running an +/// entire DataFusion plan. +async fn plan_get_catalogs(ctx: &IOxSessionContext) -> Result { + let query = "SELECT DISTINCT table_catalog AS catalog_name FROM information_schema.tables ORDER BY table_catalog"; + Ok(ctx.plan_sql(query).await?) +} diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index a0411ccb64..17e57379d8 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -1,5 +1,7 @@ use std::path::PathBuf; +use arrow::record_batch::RecordBatch; +use arrow_flight::decode::FlightRecordBatchStream; use arrow_util::assert_batches_sorted_eq; use assert_cmd::Command; use datafusion::common::assert_contains; @@ -37,22 +39,10 @@ async fn flightsql_adhoc_query() { "+------+------+--------------------------------+-----+", ]; - let connection = state.cluster().querier().querier_grpc_connection(); - let (channel, _headers) = connection.into_grpc_connection().into_parts(); + let mut client = flightsql_client(state.cluster()); - let mut client = FlightSqlClient::new(channel); - - // Add namespace to client headers until it is fully supported by FlightSQL - let namespace = state.cluster().namespace(); - client.add_header("iox-namespace-name", namespace).unwrap(); - - let batches: Vec<_> = client - .query(sql) - .await - .expect("ran SQL query") - .try_collect() - .await - .expect("got batches"); + let stream = client.query(sql).await.unwrap(); + let batches = collect_stream(stream).await; assert_batches_sorted_eq!(&expected, &batches); } @@ -84,14 +74,7 @@ async fn flightsql_adhoc_query_error() { async move { let sql = String::from("select * from incorrect_table"); - let connection = state.cluster().querier().querier_grpc_connection(); - let (channel, _headers) = connection.into_grpc_connection().into_parts(); - - let mut client = FlightSqlClient::new(channel); - - // Add namespace to client headers until it is fully supported by FlightSQL - let namespace = state.cluster().namespace(); - client.add_header("iox-namespace-name", namespace).unwrap(); + let mut client = flightsql_client(state.cluster()); let err = client.query(sql).await.unwrap_err(); @@ -138,24 +121,54 @@ async fn flightsql_prepared_query() { "+------+------+--------------------------------+-----+", ]; - let connection = state.cluster().querier().querier_grpc_connection(); - let (channel, _headers) = connection.into_grpc_connection().into_parts(); - - let mut client = FlightSqlClient::new(channel); - - // Add namespace to client headers until it is fully supported by FlightSQL - let namespace = state.cluster().namespace(); - client.add_header("iox-namespace-name", namespace).unwrap(); + let mut client = flightsql_client(state.cluster()); let handle = client.prepare(sql).await.unwrap(); + let stream = client.execute(handle).await.unwrap(); - let batches: Vec<_> = client - .execute(handle) - .await - .expect("ran SQL query") - .try_collect() - .await - .expect("got batches"); + let batches = collect_stream(stream).await; + + assert_batches_sorted_eq!(&expected, &batches); + } + .boxed() + })), + ], + ) + .run() + .await +} + +#[tokio::test] +async fn flightsql_get_catalogs() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(format!( + "{table_name},tag1=A,tag2=B val=42i 123456\n\ + {table_name},tag1=A,tag2=C val=43i 123457" + )), + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let expected = vec![ + "+--------------+", + "| catalog_name |", + "+--------------+", + "| public |", + "+--------------+", + ]; + + let mut client = flightsql_client(state.cluster()); + + let stream = client.get_catalogs().await.unwrap(); + let batches = collect_stream(stream).await; assert_batches_sorted_eq!(&expected, &batches); } @@ -243,6 +256,22 @@ async fn flightsql_jdbc() { .success() .stdout(predicate::str::contains("Running Prepared SQL Query")) .stdout(predicate::str::contains("A, B")); + + // CommandGetCatalogs output + let expected_catalogs = "**************\n\ + Catalogs:\n\ + **************\n\ + TABLE_CAT\n\ + ------------\n\ + public"; + + // Validate metadata: jdbc_client metadata + Command::from_std(std::process::Command::new(&path)) + .arg(&jdbc_url) + .arg("metadata") + .assert() + .success() + .stdout(predicate::str::contains(expected_catalogs)); } .boxed() })), @@ -251,3 +280,21 @@ async fn flightsql_jdbc() { .run() .await } + +/// Return a [`FlightSqlClient`] configured for use +fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient { + let connection = cluster.querier().querier_grpc_connection(); + let (channel, _headers) = connection.into_grpc_connection().into_parts(); + + let mut client = FlightSqlClient::new(channel); + + // Add namespace to client headers until it is fully supported by FlightSQL + let namespace = cluster.namespace(); + client.add_header("iox-namespace-name", namespace).unwrap(); + + client +} + +async fn collect_stream(stream: FlightRecordBatchStream) -> Vec { + stream.try_collect().await.expect("collecting batches") +} diff --git a/influxdb_iox/tests/jdbc_client/Main.java b/influxdb_iox/tests/jdbc_client/Main.java index 6cca662e58..4b1e16f61a 100644 --- a/influxdb_iox/tests/jdbc_client/Main.java +++ b/influxdb_iox/tests/jdbc_client/Main.java @@ -19,7 +19,7 @@ public class Main { System.err.println("# Run specified prepared query without parameters"); System.err.println("jdbc_client prepared_query "); System.err.println("# Run metadata tests"); - System.err.println("jdbc_client props"); + System.err.println("jdbc_client metadata"); System.exit(1); } @@ -63,9 +63,9 @@ public class Main { } break; - case "props": + case "metadata": { - run_props(url); + run_metadata(url); } break; @@ -115,17 +115,27 @@ public class Main { print_result_set(rs); } - static void run_props(String url) throws SQLException { + static void run_metadata(String url) throws SQLException { Connection conn = connect(url); System.out.println(conn.getCatalog()); DatabaseMetaData md = conn.getMetaData(); - System.out.println("isReadOnly: " + md.isReadOnly()); - System.out.println("getSearchStringEscape: " + md.getSearchStringEscape()); - System.out.println("getDriverVersion: " + md.getDriverVersion()); - System.out.println("getDatabaseProductVersion: " + md.getDatabaseProductVersion()); - System.out.println("getJDBCMajorVersion: " + md.getJDBCMajorVersion()); - System.out.println("getJDBCMinorVersion: " + md.getJDBCMinorVersion()); - System.out.println("getDriverName: " + md.getDriverName()); + // Note yet implemented + // (see https://github.com/influxdata/influxdb_iox/issues/7210 ) + + System.out.println("**************"); + System.out.println("Catalogs:"); + System.out.println("**************"); + ResultSet rs = md.getCatalogs(); + print_result_set(rs); + + //System.out.println("isReadOnly: " + md.isReadOnly()); + //System.out.println("getSearchStringEscape: " + md.getSearchStringEscape()); + //System.out.println("getDriverVersion: " + md.getDriverVersion()); + //System.out.println("getDatabaseProductVersion: " + md.getDatabaseProductVersion()); + //System.out.println("getJDBCMajorVersion: " + md.getJDBCMajorVersion()); + //System.out.println("getJDBCMinorVersion: " + md.getJDBCMinorVersion()); + //System.out.println("getDriverName: " + md.getDriverName()); + } // Print out the ResultSet in a whitespace delimited form diff --git a/influxdb_iox_client/src/client/flightsql.rs b/influxdb_iox_client/src/client/flightsql.rs index 8136357135..116c365dd0 100644 --- a/influxdb_iox_client/src/client/flightsql.rs +++ b/influxdb_iox_client/src/client/flightsql.rs @@ -29,7 +29,7 @@ use arrow_flight::{ error::{FlightError, Result}, sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt, + CommandGetCatalogs, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt, }, Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket, }; @@ -124,6 +124,22 @@ impl FlightSqlClient { self.do_get_with_cmd(msg.as_any()).await } + /// List the catalogs on this server using a [`CommandGetCatalogs`] message. + /// + /// This implementation does not support alternate endpoints + /// + /// [`CommandGetCatalogs`]: https://github.com/apache/arrow/blob/3a6fc1f9eedd41df2d8ffbcbdfbdab911ff6d82e/format/FlightSql.proto#L1125-L1140 + pub async fn get_catalogs(&mut self) -> Result { + let msg = CommandGetCatalogs {}; + self.do_get_with_cmd(msg.as_any()).await + } + + /// Implements the canonical interaction for most FlightSQL messages: + /// + /// 1. Call `GetFlightInfo` with the provided message, and get a + /// [`FlightInfo`] and embedded ticket. + /// + /// 2. Call `DoGet` with the provided ticket. async fn do_get_with_cmd( &mut self, cmd: arrow_flight::sql::Any, diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 7793113465..1e8bbdbf82 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -329,6 +329,15 @@ impl IOxSessionContext { pub async fn prepare_sql(&self, sql: &str) -> Result> { let logical_plan = self.plan_sql(sql).await?; + let ctx = self.child_ctx("prepare_sql"); + ctx.create_physical_plan(&logical_plan).await + } + + /// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution + pub async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + ) -> Result> { // Make nicer erorrs for unsupported SQL // (By default datafusion returns Internal Error) match &logical_plan { @@ -353,15 +362,9 @@ impl IOxSessionContext { _ => (), } - let ctx = self.child_ctx("prepare_sql"); - ctx.create_physical_plan(&logical_plan).await - } - - /// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution - pub async fn create_physical_plan(&self, plan: &LogicalPlan) -> Result> { let mut ctx = self.child_ctx("create_physical_plan"); - debug!(text=%plan.display_indent_schema(), "create_physical_plan: initial plan"); - let physical_plan = ctx.inner.state().create_physical_plan(plan).await?; + debug!(text=%logical_plan.display_indent_schema(), "create_physical_plan: initial plan"); + let physical_plan = ctx.inner.state().create_physical_plan(logical_plan).await?; ctx.recorder.event("physical plan"); debug!(text=%displayable(physical_plan.as_ref()).indent(), "create_physical_plan: plan to run"); From 3a31f41c2c9b49ca6cd46dd32dff0927d4c3ac71 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 16 Mar 2023 09:45:14 +0100 Subject: [PATCH 04/15] refactor: use arrow schema in `chunks_to_physical_nodes` (#7217) We don't need a validated IOx schema in this method. This will simplify some work on #6098. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../physical_optimizer/chunk_extraction.rs | 10 +++++-- .../src/physical_optimizer/combine_chunks.rs | 4 +-- .../dedup/dedup_null_columns.rs | 5 ++-- .../dedup/dedup_sort_order.rs | 5 ++-- .../dedup/partition_split.rs | 3 ++- .../physical_optimizer/dedup/remove_dedup.rs | 3 ++- .../src/physical_optimizer/dedup/test_util.rs | 2 +- .../physical_optimizer/dedup/time_split.rs | 3 ++- iox_query/src/provider.rs | 8 +++--- iox_query/src/provider/physical.rs | 27 +++++++++---------- 10 files changed, 40 insertions(+), 30 deletions(-) diff --git a/iox_query/src/physical_optimizer/chunk_extraction.rs b/iox_query/src/physical_optimizer/chunk_extraction.rs index 071e3ae040..f5f615bda2 100644 --- a/iox_query/src/physical_optimizer/chunk_extraction.rs +++ b/iox_query/src/physical_optimizer/chunk_extraction.rs @@ -215,7 +215,7 @@ mod tests { #[test] fn test_stop_at_other_node_types() { let chunk1 = chunk(1); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = chunks_to_physical_nodes( &schema, None, @@ -259,7 +259,13 @@ mod tests { #[track_caller] fn assert_roundtrip(schema: Schema, chunks: Vec>) { - let plan = chunks_to_physical_nodes(&schema, None, chunks.clone(), Predicate::default(), 2); + let plan = chunks_to_physical_nodes( + &schema.as_arrow(), + None, + chunks.clone(), + Predicate::default(), + 2, + ); let (schema2, chunks2) = extract_chunks(plan.as_ref()).expect("data found"); assert_eq!(schema, schema2); assert_eq!(chunk_ids(&chunks), chunk_ids(&chunks2)); diff --git a/iox_query/src/physical_optimizer/combine_chunks.rs b/iox_query/src/physical_optimizer/combine_chunks.rs index ac898e4c0f..78cd14602a 100644 --- a/iox_query/src/physical_optimizer/combine_chunks.rs +++ b/iox_query/src/physical_optimizer/combine_chunks.rs @@ -36,7 +36,7 @@ impl PhysicalOptimizerRule for CombineChunks { plan.transform_up(&|plan| { if let Some((iox_schema, chunks)) = extract_chunks(plan.as_ref()) { return Ok(Some(chunks_to_physical_nodes( - &iox_schema, + &iox_schema.as_arrow(), None, chunks, Predicate::new(), @@ -72,7 +72,7 @@ mod tests { let chunk3 = TestChunk::new("table").with_id(3); let chunk4 = TestChunk::new("table").with_id(4).with_dummy_parquet_file(); let chunk5 = TestChunk::new("table").with_id(5).with_dummy_parquet_file(); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = Arc::new(UnionExec::new(vec![ chunks_to_physical_nodes( &schema, diff --git a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs index 8efcf43e5b..f9790cb22c 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs @@ -64,15 +64,16 @@ impl PhysicalOptimizerRule for DedupNullColumns { } let sort_key = sort_key_builder.build(); + let arrow_schema = schema.as_arrow(); let child = chunks_to_physical_nodes( - &schema, + &arrow_schema, (!sort_key.is_empty()).then_some(&sort_key), chunks, Predicate::new(), config.execution.target_partitions, ); - let sort_exprs = arrow_sort_key_exprs(&sort_key, schema.as_arrow().as_ref()); + let sort_exprs = arrow_sort_key_exprs(&sort_key, &arrow_schema); return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs)))); } diff --git a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs index 57a5d2d840..650f9dc141 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs @@ -124,15 +124,16 @@ impl PhysicalOptimizerRule for DedupSortOrder { } let quorum_sort_key = quorum_sort_key_builder.build(); + let arrow_schema = schema.as_arrow(); let child = chunks_to_physical_nodes( - &schema, + &arrow_schema, (!quorum_sort_key.is_empty()).then_some(&quorum_sort_key), chunks, Predicate::new(), config.execution.target_partitions, ); - let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, schema.as_arrow().as_ref()); + let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &arrow_schema); return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs)))); } diff --git a/iox_query/src/physical_optimizer/dedup/partition_split.rs b/iox_query/src/physical_optimizer/dedup/partition_split.rs index 0d21fc5113..066f97febb 100644 --- a/iox_query/src/physical_optimizer/dedup/partition_split.rs +++ b/iox_query/src/physical_optimizer/dedup/partition_split.rs @@ -76,13 +76,14 @@ impl PhysicalOptimizerRule for PartitionSplit { let mut chunks_by_partition = chunks_by_partition.into_iter().collect::>(); chunks_by_partition.sort_by_key(|(p_id, _chunks)| *p_id); + let arrow_schema = schema.as_arrow(); let out = UnionExec::new( chunks_by_partition .into_iter() .map(|(_p_id, chunks)| { Arc::new(DeduplicateExec::new( chunks_to_physical_nodes( - &schema, + &arrow_schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs index edda8ac6b9..1aae525dcf 100644 --- a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs +++ b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs @@ -35,8 +35,9 @@ impl PhysicalOptimizerRule for RemoveDedup { }; if (chunks.len() < 2) && chunks.iter().all(|c| !c.may_contain_pk_duplicates()) { + let arrow_schema = schema.as_arrow(); return Ok(Some(chunks_to_physical_nodes( - &schema, + &arrow_schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/test_util.rs b/iox_query/src/physical_optimizer/dedup/test_util.rs index 19bb3e508a..25fff9131a 100644 --- a/iox_query/src/physical_optimizer/dedup/test_util.rs +++ b/iox_query/src/physical_optimizer/dedup/test_util.rs @@ -16,7 +16,7 @@ pub fn dedup_plan(schema: Schema, chunks: Vec) -> Arc>>(); - let plan = chunks_to_physical_nodes(&schema, None, chunks, Predicate::new(), 2); + let plan = chunks_to_physical_nodes(&schema.as_arrow(), None, chunks, Predicate::new(), 2); let sort_key = schema::sort::SortKey::from_columns(schema.primary_key()); let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema.as_arrow()); diff --git a/iox_query/src/physical_optimizer/dedup/time_split.rs b/iox_query/src/physical_optimizer/dedup/time_split.rs index bed6b825cd..a54715f34e 100644 --- a/iox_query/src/physical_optimizer/dedup/time_split.rs +++ b/iox_query/src/physical_optimizer/dedup/time_split.rs @@ -63,13 +63,14 @@ impl PhysicalOptimizerRule for TimeSplit { return Ok(None); } + let arrow_schema = schema.as_arrow(); let out = UnionExec::new( groups .into_iter() .map(|chunks| { Arc::new(DeduplicateExec::new( chunks_to_physical_nodes( - &schema, + &arrow_schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs index 0683dea5fd..e75e619dab 100644 --- a/iox_query/src/provider.rs +++ b/iox_query/src/provider.rs @@ -1089,7 +1089,7 @@ impl Deduplicater { // Create the bottom node RecordBatchesExec for this chunk let mut input = chunks_to_physical_nodes( - &input_schema, + &input_schema.as_arrow(), output_sort_key, vec![Arc::clone(&chunk)], predicate, @@ -1267,7 +1267,7 @@ impl Deduplicater { debug!("Build one scan RecordBatchesExec for all non duplicated chunks even if empty"); plans.push(chunks_to_physical_nodes( - output_schema, + &output_schema.as_arrow(), output_sort_key, chunks.into_no_duplicates(deduplication), predicate, @@ -1452,7 +1452,7 @@ mod test { // IOx scan operator let input = chunks_to_physical_nodes( - chunk.schema(), + &chunk.schema().as_arrow(), None, vec![Arc::clone(&chunk)], Predicate::default(), @@ -1540,7 +1540,7 @@ mod test { // IOx scan operator let input = chunks_to_physical_nodes( - chunk.schema(), + &chunk.schema().as_arrow(), None, vec![Arc::clone(&chunk)], Predicate::default(), diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs index 1a7bf3a2e2..d9e73bb038 100644 --- a/iox_query/src/provider/physical.rs +++ b/iox_query/src/provider/physical.rs @@ -4,6 +4,7 @@ use crate::{ provider::record_batch_exec::RecordBatchesExec, util::arrow_sort_key_exprs, QueryChunk, QueryChunkData, }; +use arrow::datatypes::SchemaRef; use datafusion::{ datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}, physical_expr::execution_props::ExecutionProps, @@ -135,14 +136,14 @@ fn combine_sort_key( /// pushdown ([`RecordBatchesExec`] has NO builtin filter function). Delete predicates are NOT applied at all. The /// caller is responsible for wrapping the output node into appropriate filter nodes. pub fn chunks_to_physical_nodes( - iox_schema: &Schema, + schema: &SchemaRef, output_sort_key: Option<&SortKey>, chunks: Vec>, predicate: Predicate, target_partitions: usize, ) -> Arc { if chunks.is_empty() { - return Arc::new(EmptyExec::new(false, iox_schema.as_arrow())); + return Arc::new(EmptyExec::new(false, Arc::clone(schema))); } let mut record_batch_chunks: Vec> = vec![]; @@ -177,7 +178,7 @@ pub fn chunks_to_physical_nodes( if !record_batch_chunks.is_empty() { output_nodes.push(Arc::new(RecordBatchesExec::new( record_batch_chunks, - iox_schema.as_arrow(), + Arc::clone(schema), ))); } let mut parquet_chunks: Vec<_> = parquet_chunks.into_iter().collect(); @@ -202,14 +203,12 @@ pub fn chunks_to_physical_nodes( ); // Tell datafusion about the sort key, if any - let file_schema = iox_schema.as_arrow(); - let output_ordering = - sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, &file_schema)); + let output_ordering = sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, schema)); let props = ExecutionProps::new(); let filter_expr = predicate.filter_expr() .and_then(|filter_expr| { - match create_physical_expr_from_schema(&props, &filter_expr, &file_schema) { + match create_physical_expr_from_schema(&props, &filter_expr, schema) { Ok(f) => Some(f), Err(e) => { warn!(%e, ?filter_expr, "Error creating physical filter expression, can not push down"); @@ -220,7 +219,7 @@ pub fn chunks_to_physical_nodes( let base_config = FileScanConfig { object_store_url, - file_schema, + file_schema: Arc::clone(schema), file_groups, statistics: Statistics::default(), projection: None, @@ -361,7 +360,7 @@ mod tests { #[test] fn test_chunks_to_physical_nodes_empty() { - let schema = TestChunk::new("table").schema().clone(); + let schema = TestChunk::new("table").schema().as_arrow(); let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), 2); insta::assert_yaml_snapshot!( format_execution_plan(&plan), @@ -375,7 +374,7 @@ mod tests { #[test] fn test_chunks_to_physical_nodes_recordbatch() { let chunk = TestChunk::new("table"); - let schema = chunk.schema().clone(); + let schema = chunk.schema().as_arrow(); let plan = chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2); insta::assert_yaml_snapshot!( @@ -391,7 +390,7 @@ mod tests { #[test] fn test_chunks_to_physical_nodes_parquet_one_file() { let chunk = TestChunk::new("table").with_dummy_parquet_file(); - let schema = chunk.schema().clone(); + let schema = chunk.schema().as_arrow(); let plan = chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2); insta::assert_yaml_snapshot!( @@ -409,7 +408,7 @@ mod tests { let chunk1 = TestChunk::new("table").with_id(0).with_dummy_parquet_file(); let chunk2 = TestChunk::new("table").with_id(1).with_dummy_parquet_file(); let chunk3 = TestChunk::new("table").with_id(2).with_dummy_parquet_file(); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = chunks_to_physical_nodes( &schema, None, @@ -435,7 +434,7 @@ mod tests { let chunk2 = TestChunk::new("table") .with_id(1) .with_dummy_parquet_file_and_store("iox2://"); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = chunks_to_physical_nodes( &schema, None, @@ -458,7 +457,7 @@ mod tests { fn test_chunks_to_physical_nodes_mixed() { let chunk1 = TestChunk::new("table").with_dummy_parquet_file(); let chunk2 = TestChunk::new("table"); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = chunks_to_physical_nodes( &schema, None, From 2dde0658c6677897445fa175161eab1fb0007248 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 16 Mar 2023 08:52:27 +0000 Subject: [PATCH 05/15] chore(deps): Bump assert_cmd from 2.0.8 to 2.0.9 (#7228) * chore(deps): Bump assert_cmd from 2.0.8 to 2.0.9 Bumps [assert_cmd](https://github.com/assert-rs/assert_cmd) from 2.0.8 to 2.0.9. - [Release notes](https://github.com/assert-rs/assert_cmd/releases) - [Changelog](https://github.com/assert-rs/assert_cmd/blob/master/CHANGELOG.md) - [Commits](https://github.com/assert-rs/assert_cmd/compare/v2.0.8...v2.0.9) --- updated-dependencies: - dependency-name: assert_cmd dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * chore: Run cargo hakari tasks --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: CircleCI[bot] Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 34 +++++++++++++++++++++++------- influxdb_iox/Cargo.toml | 2 +- test_helpers_end_to_end/Cargo.toml | 2 +- workspace-hack/Cargo.toml | 1 - 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0a661f7db..1174c9ee1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstyle" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80c697cc33851b02ab0c26b2e8a211684fbe627ff1cc506131f35026dd7686dd" + [[package]] name = "anyhow" version = "1.0.69" @@ -360,13 +366,14 @@ dependencies = [ [[package]] name = "assert_cmd" -version = "2.0.8" +version = "2.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9834fcc22e0874394a010230586367d4a3e9f11b560f469262678547e1d2575e" +checksum = "c0dcbed38184f9219183fcf38beb4cdbf5df7163a6d7cd227c6ac89b7966d6fe" dependencies = [ + "anstyle", "bstr", "doc-comment", - "predicates", + "predicates 3.0.1", "predicates-core", "predicates-tree", "wait-timeout", @@ -2539,7 +2546,7 @@ dependencies = [ "parquet_file", "parquet_to_line_protocol", "predicate", - "predicates", + "predicates 2.1.5", "prost", "rustyline", "schema", @@ -4333,10 +4340,22 @@ dependencies = [ ] [[package]] -name = "predicates-core" -version = "1.0.5" +name = "predicates" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72f883590242d3c6fc5bf50299011695fa6590c2c70eac95ee1bdb9a733ad1a2" +checksum = "1ba7d6ead3e3966038f68caa9fc1f860185d95a793180bbcfe0d0da47b3961ed" +dependencies = [ + "anstyle", + "difflib", + "itertools", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" [[package]] name = "predicates-tree" @@ -6722,7 +6741,6 @@ dependencies = [ "parquet", "petgraph", "phf_shared", - "predicates", "prost", "prost-types", "rand", diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 70bbcf028a..951ff8e142 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -83,7 +83,7 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] # In alphabetical order arrow_util = { path = "../arrow_util" } -assert_cmd = "2.0.8" +assert_cmd = "2.0.9" async-trait = "0.1" predicate = { path = "../predicate" } predicates = "2.1.0" diff --git a/test_helpers_end_to_end/Cargo.toml b/test_helpers_end_to_end/Cargo.toml index b1200f8cf9..160ba99f44 100644 --- a/test_helpers_end_to_end/Cargo.toml +++ b/test_helpers_end_to_end/Cargo.toml @@ -9,7 +9,7 @@ license.workspace = true arrow = { workspace = true, features = ["prettyprint"] } arrow-flight = { workspace = true } arrow_util = { path = "../arrow_util" } -assert_cmd = "2.0.8" +assert_cmd = "2.0.9" bytes = "1.4" data_types = { path = "../data_types" } dml = { path = "../dml" } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 41b653fc9f..519226d2d3 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -63,7 +63,6 @@ parking_lot = { version = "0.12", features = ["arc_lock"] } parquet = { version = "34", features = ["async", "experimental"] } petgraph = { version = "0.6" } phf_shared = { version = "0.11" } -predicates = { version = "2" } prost = { version = "0.11" } prost-types = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] } From f128539f9822eb7d5d15f2a3c88ec4811220df1e Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 16 Mar 2023 09:59:48 +0100 Subject: [PATCH 06/15] feat: more projection pushdown (#7218) * feat: proj->proj pushdown For #6098. * feat: proj->SortPreservingMergeExec pushdown For #6098. --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../physical_optimizer/projection_pushdown.rs | 175 +++++++++++++++++- 1 file changed, 173 insertions(+), 2 deletions(-) diff --git a/iox_query/src/physical_optimizer/projection_pushdown.rs b/iox_query/src/physical_optimizer/projection_pushdown.rs index 86da9bff2b..afa5a1e3c0 100644 --- a/iox_query/src/physical_optimizer/projection_pushdown.rs +++ b/iox_query/src/physical_optimizer/projection_pushdown.rs @@ -19,7 +19,7 @@ use datafusion::{ filter::FilterExec, projection::ProjectionExec, rewrite::TreeNodeRewritable, - sorts::sort::SortExec, + sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, union::UnionExec, ExecutionPlan, PhysicalExpr, }, @@ -32,10 +32,11 @@ use crate::provider::{DeduplicateExec, RecordBatchesExec}; pub struct ProjectionPushdown; impl PhysicalOptimizerRule for ProjectionPushdown { + #[allow(clippy::only_used_in_recursion)] fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + config: &ConfigOptions, ) -> Result> { plan.transform_down(&|plan| { let plan_any = plan.as_any(); @@ -149,6 +150,47 @@ impl PhysicalOptimizerRule for ProjectionPushdown { }, )?; + return Ok(Some(plan)); + } else if let Some(child_sort) = child_any.downcast_ref::() + { + let sort_required_cols = child_sort + .expr() + .iter() + .map(|expr| collect_columns(&expr.expr)) + .collect::>(); + let sort_required_cols = sort_required_cols + .iter() + .flat_map(|cols| cols.iter()) + .map(|col| col.name()) + .collect::>(); + + let plan = wrap_user_into_projections( + &sort_required_cols, + &column_names, + Arc::clone(child_sort.input()), + |plan| { + Ok(Arc::new(SortPreservingMergeExec::new( + reassign_sort_exprs_columns(child_sort.expr(), &plan.schema())?, + plan, + ))) + }, + )?; + + return Ok(Some(plan)); + } else if let Some(child_proj) = child_any.downcast_ref::() { + let expr = column_indices + .iter() + .map(|idx| child_proj.expr()[*idx].clone()) + .collect(); + let plan = Arc::new(ProjectionExec::try_new( + expr, + Arc::clone(child_proj.input()), + )?); + + // need to call `optimize` directly on the plan, because otherwise we would continue with the child + // and miss the optimization of that particular new ProjectionExec + let plan = self.optimize(plan, config)?; + return Ok(Some(plan)); } else if let Some(child_dedup) = child_any.downcast_ref::() { let dedup_required_cols = child_dedup.sort_columns(); @@ -793,6 +835,135 @@ mod tests { ); } + // since `SortPreservingMergeExec` and `FilterExec` both use `wrap_user_into_projections`, we only test one variant for `SortPreservingMergeExec` + #[test] + fn test_sortpreservingmerge_projection_split() { + let schema = schema(); + let plan = Arc::new( + ProjectionExec::try_new( + vec![(expr_col("tag1", &schema), String::from("tag1"))], + Arc::new(SortPreservingMergeExec::new( + vec![PhysicalSortExpr { + expr: expr_col("tag2", &schema), + options: SortOptions { + descending: true, + ..Default::default() + }, + }], + Arc::new(TestExec::new(schema)), + )), + ) + .unwrap(), + ); + let opt = ProjectionPushdown::default(); + insta::assert_yaml_snapshot!( + OptimizationTest::new(plan, opt), + @r###" + --- + input: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " SortPreservingMergeExec: [tag2@1 DESC]" + - " Test" + output: + Ok: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " SortPreservingMergeExec: [tag2@1 DESC]" + - " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]" + - " Test" + "### + ); + } + + #[test] + fn test_nested_proj_inner_is_impure() { + let schema = schema(); + let plan = Arc::new(EmptyExec::new(false, schema)); + let plan = Arc::new( + ProjectionExec::try_new( + vec![ + ( + Arc::new(Literal::new(ScalarValue::from("foo"))), + String::from("tag1"), + ), + ( + Arc::new(Literal::new(ScalarValue::from("bar"))), + String::from("tag2"), + ), + ], + plan, + ) + .unwrap(), + ); + let plan = Arc::new( + ProjectionExec::try_new( + vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))], + plan, + ) + .unwrap(), + ); + let opt = ProjectionPushdown::default(); + insta::assert_yaml_snapshot!( + OptimizationTest::new(plan, opt), + @r###" + --- + input: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " ProjectionExec: expr=[foo as tag1, bar as tag2]" + - " EmptyExec: produce_one_row=false" + output: + Ok: + - " ProjectionExec: expr=[foo as tag1]" + - " EmptyExec: produce_one_row=false" + "### + ); + } + + #[test] + fn test_nested_proj_inner_is_pure() { + let schema = schema(); + let plan = Arc::new(EmptyExec::new(false, schema)); + let plan = Arc::new( + ProjectionExec::try_new( + vec![ + (expr_col("tag1", &plan.schema()), String::from("tag1")), + (expr_col("tag2", &plan.schema()), String::from("tag2")), + ], + plan, + ) + .unwrap(), + ); + let plan = Arc::new( + ProjectionExec::try_new( + vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))], + plan, + ) + .unwrap(), + ); + let opt = ProjectionPushdown::default(); + let test = OptimizationTest::new(plan, opt); + insta::assert_yaml_snapshot!( + test, + @r###" + --- + input: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]" + - " EmptyExec: produce_one_row=false" + output: + Ok: + - " EmptyExec: produce_one_row=false" + "### + ); + let empty_exec = test + .output_plan() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let expected_schema = Schema::new(vec![Field::new("tag1", DataType::Utf8, true)]); + assert_eq!(empty_exec.schema().as_ref(), &expected_schema); + } + // since `DeduplicateExec` and `FilterExec` both use `wrap_user_into_projections`, we only test a few variants for `DeduplicateExec` #[test] fn test_dedup_projection_split1() { From 6bfae7bb5265b0f5d2547c604ec4b7fbad459a5d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 16 Mar 2023 09:23:02 +0000 Subject: [PATCH 07/15] chore(deps): Bump predicates from 2.1.5 to 3.0.1 (#7229) * chore(deps): Bump predicates from 2.1.5 to 3.0.1 Bumps [predicates](https://github.com/assert-rs/predicates-rs) from 2.1.5 to 3.0.1. - [Release notes](https://github.com/assert-rs/predicates-rs/releases) - [Changelog](https://github.com/assert-rs/predicates-rs/blob/master/CHANGELOG.md) - [Commits](https://github.com/assert-rs/predicates-rs/compare/v2.1.5...v3.0.1) --- updated-dependencies: - dependency-name: predicates dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] * chore: Run cargo hakari tasks --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: CircleCI[bot] --- Cargo.lock | 22 ++++++---------------- influxdb_iox/Cargo.toml | 2 +- workspace-hack/Cargo.toml | 1 + 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1174c9ee1f..6fed5019e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -373,7 +373,7 @@ dependencies = [ "anstyle", "bstr", "doc-comment", - "predicates 3.0.1", + "predicates", "predicates-core", "predicates-tree", "wait-timeout", @@ -2546,7 +2546,7 @@ dependencies = [ "parquet_file", "parquet_to_line_protocol", "predicate", - "predicates 2.1.5", + "predicates", "prost", "rustyline", "schema", @@ -4325,20 +4325,6 @@ dependencies = [ "workspace-hack", ] -[[package]] -name = "predicates" -version = "2.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" -dependencies = [ - "difflib", - "float-cmp", - "itertools", - "normalize-line-endings", - "predicates-core", - "regex", -] - [[package]] name = "predicates" version = "3.0.1" @@ -4347,8 +4333,11 @@ checksum = "1ba7d6ead3e3966038f68caa9fc1f860185d95a793180bbcfe0d0da47b3961ed" dependencies = [ "anstyle", "difflib", + "float-cmp", "itertools", + "normalize-line-endings", "predicates-core", + "regex", ] [[package]] @@ -6741,6 +6730,7 @@ dependencies = [ "parquet", "petgraph", "phf_shared", + "predicates", "prost", "prost-types", "rand", diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 951ff8e142..a4da9dcc81 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -86,7 +86,7 @@ arrow_util = { path = "../arrow_util" } assert_cmd = "2.0.9" async-trait = "0.1" predicate = { path = "../predicate" } -predicates = "2.1.0" +predicates = "3.0.1" serde = "1.0.156" test_helpers = { path = "../test_helpers", features = ["future_timeout"] } test_helpers_end_to_end = { path = "../test_helpers_end_to_end" } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 519226d2d3..5475f89b69 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -63,6 +63,7 @@ parking_lot = { version = "0.12", features = ["arc_lock"] } parquet = { version = "34", features = ["async", "experimental"] } petgraph = { version = "0.6" } phf_shared = { version = "0.11" } +predicates = { version = "3" } prost = { version = "0.11" } prost-types = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] } From c819d4f865d2deaaa4a84637fcbf8397aa92dc4e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Mar 2023 11:13:51 +0100 Subject: [PATCH 08/15] feat(flightsql): Implement `GetDbSchemas` metadata API (#7213) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 1 + flightsql/src/cmd.rs | 72 +++++---- flightsql/src/planner.rs | 101 ++++++++++++- influxdb_iox/Cargo.toml | 1 + .../tests/end_to_end_cases/flightsql.rs | 139 +++++++++++++++++- influxdb_iox/tests/jdbc_client/Main.java | 8 +- influxdb_iox_client/src/client/flightsql.rs | 36 ++++- service_grpc_flight/src/request.rs | 5 +- 8 files changed, 321 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6fed5019e8..c4b3a75342 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2520,6 +2520,7 @@ dependencies = [ "influxdb_iox_client", "influxdb_storage_client", "influxrpc_parser", + "insta", "iox_catalog", "iox_query", "iox_time", diff --git a/flightsql/src/cmd.rs b/flightsql/src/cmd.rs index 6b2ccc7af2..782dfbc256 100644 --- a/flightsql/src/cmd.rs +++ b/flightsql/src/cmd.rs @@ -4,7 +4,7 @@ use std::fmt::Display; use arrow_flight::sql::{ ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any, - CommandGetCatalogs, CommandPreparedStatementQuery, CommandStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandPreparedStatementQuery, CommandStatementQuery, }; use bytes::Bytes; use prost::Message; @@ -63,13 +63,16 @@ impl From for Bytes { /// and forth to native Rust types #[derive(Debug, Clone, PartialEq)] pub enum FlightSQLCommand { - CommandStatementQuery(String), - /// Run a prepared statement + CommandStatementQuery(CommandStatementQuery), + /// Run a prepared statement. CommandPreparedStatementQuery(PreparedStatementHandle), - /// Get a list of the available catalogs - CommandGetCatalogs(), + /// Get a list of the available catalogs. See [`CommandGetCatalogs`] for details. + CommandGetCatalogs(CommandGetCatalogs), + /// Get a list of the available schemas. See [`CommandGetDbSchemas`] + /// for details and how to interpret the parameters. + CommandGetDbSchemas(CommandGetDbSchemas), /// Create a prepared statement - ActionCreatePreparedStatementRequest(String), + ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest), /// Close a prepared statement ActionClosePreparedStatementRequest(PreparedStatementHandle), } @@ -77,11 +80,29 @@ pub enum FlightSQLCommand { impl Display for FlightSQLCommand { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::CommandStatementQuery(q) => write!(f, "CommandStatementQuery{q}"), + Self::CommandStatementQuery(CommandStatementQuery { query }) => { + write!(f, "CommandStatementQuery{query}") + } Self::CommandPreparedStatementQuery(h) => write!(f, "CommandPreparedStatementQuery{h}"), - Self::CommandGetCatalogs() => write!(f, "CommandGetCatalogs"), - Self::ActionCreatePreparedStatementRequest(q) => { - write!(f, "ActionCreatePreparedStatementRequest{q}") + Self::CommandGetCatalogs(CommandGetCatalogs {}) => write!(f, "CommandGetCatalogs"), + Self::CommandGetDbSchemas(CommandGetDbSchemas { + catalog, + db_schema_filter_pattern, + }) => { + write!( + f, + "CommandGetCatalogs(catalog={}, db_schema_filter_pattern={}", + catalog.as_ref().map(|c| c.as_str()).unwrap_or(""), + db_schema_filter_pattern + .as_ref() + .map(|c| c.as_str()) + .unwrap_or("") + ) + } + Self::ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest { + query, + }) => { + write!(f, "ActionCreatePreparedStatementRequest{query}") } Self::ActionClosePreparedStatementRequest(h) => { write!(f, "ActionClosePreparedStatementRequest{h}") @@ -97,25 +118,24 @@ impl FlightSQLCommand { let msg: Any = Message::decode(msg)?; if let Some(decoded_cmd) = Any::unpack::(&msg)? { - let CommandStatementQuery { query } = decoded_cmd; - Ok(Self::CommandStatementQuery(query)) + Ok(Self::CommandStatementQuery(decoded_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { let CommandPreparedStatementQuery { prepared_statement_handle, } = decoded_cmd; - + // Decode to IOx specific structure let handle = PreparedStatementHandle::try_decode(prepared_statement_handle)?; Ok(Self::CommandPreparedStatementQuery(handle)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { - let CommandGetCatalogs {} = decoded_cmd; - - Ok(Self::CommandGetCatalogs()) + Ok(Self::CommandGetCatalogs(decoded_cmd)) + } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { + Ok(Self::CommandGetDbSchemas(decoded_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { - let ActionCreatePreparedStatementRequest { query } = decoded_cmd; - Ok(Self::ActionCreatePreparedStatementRequest(query)) + Ok(Self::ActionCreatePreparedStatementRequest(decoded_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { + // Decode to IOx specific structure let ActionClosePreparedStatementRequest { prepared_statement_handle, } = decoded_cmd; @@ -132,19 +152,17 @@ impl FlightSQLCommand { // Encode the command as a flightsql message (bytes) pub fn try_encode(self) -> Result { let msg = match self { - FlightSQLCommand::CommandStatementQuery(query) => { - Any::pack(&CommandStatementQuery { query }) - } + FlightSQLCommand::CommandStatementQuery(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandPreparedStatementQuery(handle) => { let prepared_statement_handle = handle.encode(); - Any::pack(&CommandPreparedStatementQuery { + let cmd = CommandPreparedStatementQuery { prepared_statement_handle, - }) - } - FlightSQLCommand::CommandGetCatalogs() => Any::pack(&CommandGetCatalogs {}), - FlightSQLCommand::ActionCreatePreparedStatementRequest(query) => { - Any::pack(&ActionCreatePreparedStatementRequest { query }) + }; + Any::pack(&cmd) } + FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd), + FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd), + FlightSQLCommand::ActionCreatePreparedStatementRequest(cmd) => Any::pack(&cmd), FlightSQLCommand::ActionClosePreparedStatementRequest(handle) => { let prepared_statement_handle = handle.encode(); Any::pack(&ActionClosePreparedStatementRequest { diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index b7ca27ba93..f64aba91e4 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -3,11 +3,14 @@ use std::sync::Arc; use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions}; use arrow_flight::{ - sql::{ActionCreatePreparedStatementResult, Any}, + sql::{ + ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, + CommandGetCatalogs, CommandGetDbSchemas, CommandStatementQuery, + }, IpcMessage, SchemaAsIpc, }; use bytes::Bytes; -use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan}; +use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan, scalar::ScalarValue}; use iox_query::{exec::IOxSessionContext, QueryNamespace}; use observability_deps::tracing::debug; use prost::Message; @@ -34,16 +37,23 @@ impl FlightSQLPlanner { debug!(%namespace_name, %cmd, "Handling flightsql get_flight_info"); match cmd { - FlightSQLCommand::CommandStatementQuery(query) => { + FlightSQLCommand::CommandStatementQuery(CommandStatementQuery { query }) => { get_schema_for_query(&query, ctx).await } FlightSQLCommand::CommandPreparedStatementQuery(handle) => { get_schema_for_query(handle.query(), ctx).await } - FlightSQLCommand::CommandGetCatalogs() => { + FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {}) => { let plan = plan_get_catalogs(ctx).await?; get_schema_for_plan(plan) } + FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { + catalog, + db_schema_filter_pattern, + }) => { + let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; + get_schema_for_plan(plan) + } FlightSQLCommand::ActionCreatePreparedStatementRequest(_) | FlightSQLCommand::ActionClosePreparedStatementRequest(_) => ProtocolSnafu { cmd: format!("{cmd:?}"), @@ -64,7 +74,7 @@ impl FlightSQLPlanner { debug!(%namespace_name, %cmd, "Handling flightsql do_get"); match cmd { - FlightSQLCommand::CommandStatementQuery(query) => { + FlightSQLCommand::CommandStatementQuery(CommandStatementQuery { query }) => { debug!(%query, "Planning FlightSQL query"); Ok(ctx.prepare_sql(&query).await?) } @@ -73,12 +83,23 @@ impl FlightSQLPlanner { debug!(%query, "Planning FlightSQL prepared query"); Ok(ctx.prepare_sql(query).await?) } - FlightSQLCommand::CommandGetCatalogs() => { + FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {}) => { debug!("Planning GetCatalogs query"); let plan = plan_get_catalogs(ctx).await?; Ok(ctx.create_physical_plan(&plan).await?) } - + FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { + catalog, + db_schema_filter_pattern, + }) => { + debug!( + ?catalog, + ?db_schema_filter_pattern, + "Planning GetDbSchemas query" + ); + let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; + Ok(ctx.create_physical_plan(&plan).await?) + } FlightSQLCommand::ActionClosePreparedStatementRequest(_) | FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => ProtocolSnafu { cmd: format!("{cmd:?}"), @@ -101,7 +122,9 @@ impl FlightSQLPlanner { debug!(%namespace_name, %cmd, "Handling flightsql do_action"); match cmd { - FlightSQLCommand::ActionCreatePreparedStatementRequest(query) => { + FlightSQLCommand::ActionCreatePreparedStatementRequest( + ActionCreatePreparedStatementRequest { query }, + ) => { debug!(%query, "Creating prepared statement"); // todo run the planner here and actually figure out parameter schemas @@ -173,3 +196,65 @@ async fn plan_get_catalogs(ctx: &IOxSessionContext) -> Result { let query = "SELECT DISTINCT table_catalog AS catalog_name FROM information_schema.tables ORDER BY table_catalog"; Ok(ctx.plan_sql(query).await?) } + +/// Return a `LogicalPlan` for GetDbSchemas +/// +/// # Parameters +/// +/// Definition from +/// +/// catalog: Specifies the Catalog to search for the tables. +/// An empty string retrieves those without a catalog. +/// If omitted the catalog name should not be used to narrow the search. +/// +/// db_schema_filter_pattern: Specifies a filter pattern for schemas to search for. +/// When no db_schema_filter_pattern is provided, the pattern will not be used to narrow the search. +/// In the pattern string, two special characters can be used to denote matching rules: +/// - "%" means to match any substring with 0 or more characters. +/// - "_" means to match any one character. +/// +async fn plan_get_db_schemas( + ctx: &IOxSessionContext, + catalog: Option, + db_schema_filter_pattern: Option, +) -> Result { + let (query, params) = match (catalog, db_schema_filter_pattern) { + (Some(catalog), Some(db_schema_filter_pattern)) => ( + "PREPARE my_plan(VARCHAR, VARCHAR) AS \ + SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \ + FROM information_schema.tables \ + WHERE table_catalog like $1 AND table_schema like $2 \ + ORDER BY table_catalog, table_schema", + vec![ + ScalarValue::Utf8(Some(catalog)), + ScalarValue::Utf8(Some(db_schema_filter_pattern)), + ], + ), + (None, Some(db_schema_filter_pattern)) => ( + "PREPARE my_plan(VARCHAR) AS \ + SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \ + FROM information_schema.tables \ + WHERE table_schema like $1 \ + ORDER BY table_catalog, table_schema", + vec![ScalarValue::Utf8(Some(db_schema_filter_pattern))], + ), + (Some(catalog), None) => ( + "PREPARE my_plan(VARCHAR) AS \ + SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \ + FROM information_schema.tables \ + WHERE table_catalog like $1 \ + ORDER BY table_catalog, table_schema", + vec![ScalarValue::Utf8(Some(catalog))], + ), + (None, None) => ( + "SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \ + FROM information_schema.tables \ + ORDER BY table_catalog, table_schema", + vec![], + ), + }; + + let plan = ctx.plan_sql(query).await?; + debug!(?plan, "Prepared plan is"); + Ok(plan.with_param_values(params)?) +} diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index a4da9dcc81..317279a1d8 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -90,6 +90,7 @@ predicates = "3.0.1" serde = "1.0.156" test_helpers = { path = "../test_helpers", features = ["future_timeout"] } test_helpers_end_to_end = { path = "../test_helpers_end_to_end" } +insta = { version = "1", features = ["yaml"] } [features] default = ["jemalloc_replacing_malloc"] diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index 17e57379d8..2c248a4caf 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use arrow::record_batch::RecordBatch; use arrow_flight::decode::FlightRecordBatchStream; -use arrow_util::assert_batches_sorted_eq; +use arrow_util::{assert_batches_sorted_eq, test_util::batches_to_sorted_lines}; use assert_cmd::Command; use datafusion::common::assert_contains; use futures::{FutureExt, TryStreamExt}; @@ -180,6 +180,131 @@ async fn flightsql_get_catalogs() { .await } +#[tokio::test] +async fn flightsql_get_db_schemas() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(format!( + "{table_name},tag1=A,tag2=B val=42i 123456\n\ + {table_name},tag1=A,tag2=C val=43i 123457" + )), + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + struct TestCase { + catalog: Option<&'static str>, + db_schema_filter_pattern: Option<&'static str>, + } + let cases = [ + TestCase { + catalog: None, + db_schema_filter_pattern: None, + }, + TestCase { + // pub <> public + catalog: Some("pub"), + db_schema_filter_pattern: None, + }, + TestCase { + // pub% should match all + catalog: Some("pub%"), + db_schema_filter_pattern: None, + }, + TestCase { + catalog: None, + db_schema_filter_pattern: Some("%for%"), + }, + TestCase { + catalog: Some("public"), + db_schema_filter_pattern: Some("iox"), + }, + ]; + + let mut client = flightsql_client(state.cluster()); + + let mut output = vec![]; + for case in cases { + let TestCase { + catalog, + db_schema_filter_pattern, + } = case; + output.push(format!("catalog:{catalog:?}")); + output.push(format!( + "db_schema_filter_pattern:{db_schema_filter_pattern:?}" + )); + output.push("*********************".into()); + + let stream = client + .get_db_schemas(catalog, db_schema_filter_pattern) + .await + .unwrap(); + let batches = collect_stream(stream).await; + output.extend(batches_to_sorted_lines(&batches)) + } + insta::assert_yaml_snapshot!( + output, + @r###" + --- + - "catalog:None" + - "db_schema_filter_pattern:None" + - "*********************" + - +--------------+--------------------+ + - "| catalog_name | db_schema_name |" + - +--------------+--------------------+ + - "| public | information_schema |" + - "| public | iox |" + - "| public | system |" + - +--------------+--------------------+ + - "catalog:Some(\"pub\")" + - "db_schema_filter_pattern:None" + - "*********************" + - ++ + - ++ + - "catalog:Some(\"pub%\")" + - "db_schema_filter_pattern:None" + - "*********************" + - +--------------+--------------------+ + - "| catalog_name | db_schema_name |" + - +--------------+--------------------+ + - "| public | information_schema |" + - "| public | iox |" + - "| public | system |" + - +--------------+--------------------+ + - "catalog:None" + - "db_schema_filter_pattern:Some(\"%for%\")" + - "*********************" + - +--------------+--------------------+ + - "| catalog_name | db_schema_name |" + - +--------------+--------------------+ + - "| public | information_schema |" + - +--------------+--------------------+ + - "catalog:Some(\"public\")" + - "db_schema_filter_pattern:Some(\"iox\")" + - "*********************" + - +--------------+----------------+ + - "| catalog_name | db_schema_name |" + - +--------------+----------------+ + - "| public | iox |" + - +--------------+----------------+ + "### + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + #[tokio::test] /// Runs the `jdbc_client` program against IOx to verify JDBC via FlightSQL is working /// @@ -265,13 +390,23 @@ async fn flightsql_jdbc() { ------------\n\ public"; + let expected_schemas = "**************\n\ + Schemas:\n\ + **************\n\ + TABLE_SCHEM, TABLE_CATALOG\n\ + ------------\n\ + information_schema, public\n\ + iox, public\n\ + system, public"; + // Validate metadata: jdbc_client metadata Command::from_std(std::process::Command::new(&path)) .arg(&jdbc_url) .arg("metadata") .assert() .success() - .stdout(predicate::str::contains(expected_catalogs)); + .stdout(predicate::str::contains(expected_catalogs)) + .stdout(predicate::str::contains(expected_schemas)); } .boxed() })), diff --git a/influxdb_iox/tests/jdbc_client/Main.java b/influxdb_iox/tests/jdbc_client/Main.java index 4b1e16f61a..f3c9282b94 100644 --- a/influxdb_iox/tests/jdbc_client/Main.java +++ b/influxdb_iox/tests/jdbc_client/Main.java @@ -125,8 +125,12 @@ public class Main { System.out.println("**************"); System.out.println("Catalogs:"); System.out.println("**************"); - ResultSet rs = md.getCatalogs(); - print_result_set(rs); + print_result_set(md.getCatalogs()); + + System.out.println("**************"); + System.out.println("Schemas:"); + System.out.println("**************"); + print_result_set(md.getSchemas()); //System.out.println("isReadOnly: " + md.isReadOnly()); //System.out.println("getSearchStringEscape: " + md.getSearchStringEscape()); diff --git a/influxdb_iox_client/src/client/flightsql.rs b/influxdb_iox_client/src/client/flightsql.rs index 116c365dd0..90c4809076 100644 --- a/influxdb_iox_client/src/client/flightsql.rs +++ b/influxdb_iox_client/src/client/flightsql.rs @@ -29,7 +29,8 @@ use arrow_flight::{ error::{FlightError, Result}, sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandGetCatalogs, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt, + CommandGetCatalogs, CommandGetDbSchemas, CommandPreparedStatementQuery, + CommandStatementQuery, ProstMessageExt, }, Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket, }; @@ -134,13 +135,44 @@ impl FlightSqlClient { self.do_get_with_cmd(msg.as_any()).await } + /// List the schemas on this server + /// + /// # Parameters + /// + /// Definition from + /// + /// catalog: Specifies the Catalog to search for the tables. + /// An empty string retrieves those without a catalog. + /// If omitted the catalog name should not be used to narrow the search. + /// + /// db_schema_filter_pattern: Specifies a filter pattern for schemas to search for. + /// When no db_schema_filter_pattern is provided, the pattern will not be used to narrow the search. + /// In the pattern string, two special characters can be used to denote matching rules: + /// - "%" means to match any substring with 0 or more characters. + /// - "_" means to match any one character. + /// + /// This implementation does not support alternate endpoints + pub async fn get_db_schemas( + &mut self, + catalog: Option + Send>, + db_schema_filter_pattern: Option + Send>, + ) -> Result { + let msg = CommandGetDbSchemas { + catalog: catalog.map(|s| s.into()), + db_schema_filter_pattern: db_schema_filter_pattern.map(|s| s.into()), + }; + self.do_get_with_cmd(msg.as_any()).await + } + /// Implements the canonical interaction for most FlightSQL messages: /// /// 1. Call `GetFlightInfo` with the provided message, and get a /// [`FlightInfo`] and embedded ticket. /// /// 2. Call `DoGet` with the provided ticket. - async fn do_get_with_cmd( + /// + /// TODO: example calling with GetDbSchemas + pub async fn do_get_with_cmd( &mut self, cmd: arrow_flight::sql::Any, ) -> Result { diff --git a/service_grpc_flight/src/request.rs b/service_grpc_flight/src/request.rs index c0f284e27c..cd9e6a8964 100644 --- a/service_grpc_flight/src/request.rs +++ b/service_grpc_flight/src/request.rs @@ -216,6 +216,7 @@ impl IoxGetRequest { #[cfg(test)] mod tests { + use arrow_flight::sql::CommandStatementQuery; use assert_matches::assert_matches; use generated_types::influxdata::iox::querier::v1::read_info::QueryType; @@ -385,7 +386,9 @@ mod tests { #[test] fn round_trip_flightsql() { - let cmd = FlightSQLCommand::CommandStatementQuery("select * from foo".into()); + let cmd = FlightSQLCommand::CommandStatementQuery(CommandStatementQuery { + query: "select * from foo".into(), + }); let request = IoxGetRequest { namespace_name: "foo_blarg".into(), From 43d174b891f5c9472b3604e58ea262619edfc933 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 16 Mar 2023 11:55:01 +0100 Subject: [PATCH 09/15] refactor: remove Clone derive on CompletedPersist This type is exposed as an Arc-wrapped (reference counted) shared item to avoid cloning the underlying data, and as such this type should not be cloned directly. --- ingester2/src/persist/completion_observer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingester2/src/persist/completion_observer.rs b/ingester2/src/persist/completion_observer.rs index 762df271cc..46fa9e6a44 100644 --- a/ingester2/src/persist/completion_observer.rs +++ b/ingester2/src/persist/completion_observer.rs @@ -22,7 +22,7 @@ pub(crate) trait PersistCompletionObserver: Send + Sync + Debug { } /// A set of details describing the persisted data. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct CompletedPersist { /// The catalog identifiers for the persisted partition. namespace_id: NamespaceId, From d2874f5c028eff78edd5c3f3286579d02768bad4 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 16 Mar 2023 14:14:08 +0100 Subject: [PATCH 10/15] test: fix chunk boundaries for retention query tests (#7235) See code comment, found while working on #6098. --- .../tests/query_tests2/cases/in/retention.sql.expected | 10 +++++----- influxdb_iox/tests/query_tests2/setups.rs | 7 ++++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/influxdb_iox/tests/query_tests2/cases/in/retention.sql.expected b/influxdb_iox/tests/query_tests2/cases/in/retention.sql.expected index 77e2622229..404a5c634c 100644 --- a/influxdb_iox/tests/query_tests2/cases/in/retention.sql.expected +++ b/influxdb_iox/tests/query_tests2/cases/in/retention.sql.expected @@ -3,9 +3,9 @@ +------+------+----------------------+ | host | load | time | +------+------+----------------------+ -| a | 1.0 | 2022-01-01T01:00:00Z | -| b | 2.0 | 2022-01-01T01:00:00Z | -| bb | 21.0 | 2022-01-01T01:00:00Z | +| a | 1.0 | 2022-01-01T11:00:00Z | +| b | 2.0 | 2022-01-01T11:00:00Z | +| bb | 21.0 | 2022-01-01T11:00:00Z | +------+------+----------------------+ -- SQL: EXPLAIN SELECT * FROM cpu order by host, load, time; -- Results After Normalizing UUIDs @@ -31,8 +31,8 @@ +------+------+----------------------+ | host | load | time | +------+------+----------------------+ -| a | 1.0 | 2022-01-01T01:00:00Z | -| bb | 21.0 | 2022-01-01T01:00:00Z | +| a | 1.0 | 2022-01-01T11:00:00Z | +| bb | 21.0 | 2022-01-01T11:00:00Z | +------+------+----------------------+ -- SQL: EXPLAIN SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time; -- Results After Normalizing UUIDs diff --git a/influxdb_iox/tests/query_tests2/setups.rs b/influxdb_iox/tests/query_tests2/setups.rs index b3b174bdaf..1a29efa5be 100644 --- a/influxdb_iox/tests/query_tests2/setups.rs +++ b/influxdb_iox/tests/query_tests2/setups.rs @@ -1291,7 +1291,12 @@ impl RetentionSetup { let retention_period_1_hour_ns = 3600 * 1_000_000_000; // Data is relative to this particular time stamp - let cutoff = Time::from_rfc3339("2022-01-01T00:00:00+00:00") + // + // Use a cutoff date that is NOT at the start of the partition so that `lp_partially_inside` only spans a single + // partition, not two. This is important because otherwise this will result in two chunks / files, not one. + // However a partial inside/outside chunk is important for the query tests so that we can proof that it is not + // sufficient to prune the chunks solely on statistics but that there needs to be an actual row-wise filter. + let cutoff = Time::from_rfc3339("2022-01-01T10:00:00+00:00") .unwrap() .timestamp_nanos(); // Timestamp 1 hour later than the cutoff, so the data will be retained for 1 hour From 78c8429caad0f25cf638d9ab9d6f8112c0dabc9b Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 16 Mar 2023 14:28:16 +0100 Subject: [PATCH 11/15] chore: split reference_tracker module into submods Breaks the reference_tracker module into sub-modules - this keeps the code a little tidier. Soon I'll add more code, so this one file was getting really big! --- ingester2/src/wal/reference_tracker/actor.rs | 340 ++++++++++++++++ .../handle.rs} | 363 +----------------- ingester2/src/wal/reference_tracker/mod.rs | 8 + .../src/wal/reference_tracker/wal_deleter.rs | 20 + 4 files changed, 379 insertions(+), 352 deletions(-) create mode 100644 ingester2/src/wal/reference_tracker/actor.rs rename ingester2/src/wal/{reference_tracker.rs => reference_tracker/handle.rs} (58%) create mode 100644 ingester2/src/wal/reference_tracker/mod.rs create mode 100644 ingester2/src/wal/reference_tracker/wal_deleter.rs diff --git a/ingester2/src/wal/reference_tracker/actor.rs b/ingester2/src/wal/reference_tracker/actor.rs new file mode 100644 index 0000000000..ca92f1da55 --- /dev/null +++ b/ingester2/src/wal/reference_tracker/actor.rs @@ -0,0 +1,340 @@ +use std::sync::Arc; + +use data_types::{ + sequence_number_set::{self, SequenceNumberSet}, + SequenceNumber, +}; +use hashbrown::HashMap; +use metric::U64Gauge; +use observability_deps::tracing::{debug, info}; +use tokio::{select, sync::mpsc}; +use wal::SegmentId; + +use crate::{ + persist::completion_observer::CompletedPersist, wal::reference_tracker::WalFileDeleter, +}; + +/// A WAL file reference-count tracker. +/// +/// See [`WalReferenceHandle`]. +/// +/// [`WalReferenceHandle`]: super::WalReferenceHandle +#[derive(Debug)] +pub(crate) struct WalReferenceActor> { + wal: T, + + /// The set of IDs of persisted data that do not yet appear in + /// `wal_files`, the set of WAL files rotated out of active use. This is + /// an intermediate buffer necessary to tolerate out-of-order persist + /// notifications w.r.t file notifications. + /// + /// IDs that appear in this set are most likely part of the active WAL + /// segment file and should be reconciled when it rotates. + persisted: SequenceNumberSet, + + /// The set of closed WAL segment files, and the set of unpersisted + /// [`SequenceNumber`] they contain. + /// + /// These [`SequenceNumberSet`] are slowly drained / have IDs removed in + /// response to persisted data notifications. Once the set is of length 0, + /// the file can be deleted as all the entries the file contains has been + /// persisted. + /// + /// Invariant: sets in this map are always non-empty. + wal_files: HashMap, + + /// Channels for input from the [`WalReferenceHandle`]. + /// + /// [`WalReferenceHandle`]: super::WalReferenceHandle + file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, + persist_rx: mpsc::Receiver>, + unbuffered_rx: mpsc::Receiver, + + /// A metric tracking the number of rotated WAL files being reference + /// tracked. + num_files: U64Gauge, + /// The minimum [`SegmentId`] in `wal_files`, the set of old (rotated out) + /// files that will eventually be deleted. + /// + /// If this value never changes over the lifetime of an ingester, it is an + /// indication of a reference leak bug, causing a WAL file to never be + /// deleted. + min_id: U64Gauge, + /// The number of references to unpersisted operations remaining in the old + /// (rotated out) WAL files, decreasing as persistence completes, and + /// increasing as non-empty WAL files are rotated into `wal_files`. + referenced_ops: U64Gauge, +} + +impl WalReferenceActor +where + T: WalFileDeleter, +{ + pub(super) fn new( + wal: T, + file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, + persist_rx: mpsc::Receiver>, + unbuffered_rx: mpsc::Receiver, + metrics: &metric::Registry, + ) -> Self { + let num_files = metrics + .register_metric::( + "ingester_wal_inactive_file_count", + "number of WAL files that are not being actively wrote to, but contain unpersisted data" + ) + .recorder(&[]); + + let min_id = metrics + .register_metric::( + "ingester_wal_inactive_min_id", + "the segment ID of the oldest inactive wal file", + ) + .recorder(&[]); + + let referenced_ops = metrics + .register_metric::( + "ingester_wal_inactive_file_op_reference_count", + "the number of unpersisted operations referenced in inactive WAL files", + ) + .recorder(&[]); + + Self { + wal, + persisted: SequenceNumberSet::default(), + wal_files: HashMap::with_capacity(3), + file_rx, + persist_rx, + unbuffered_rx, + num_files, + min_id, + referenced_ops, + } + } + + /// Execute the actor task. + /// + /// This task exits once the sender side of the input channels have been + /// dropped. + pub(crate) async fn run(mut self) { + loop { + select! { + // Prefer polling the channels in the specified order. + // + // By consuming file_rx first, there's a greater chance that + // subsequent persist/ignore events can be applied directly to + // the file sets, rather than having to wait in the intermediate + // "persisted" set, reducing memory utilisation. + biased; + + Some((id, f)) = self.file_rx.recv() => self.handle_new_file(id, f).await, + Some(p) = self.persist_rx.recv() => self.handle_persisted(p).await, + Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await, + else => break + } + + // After each action is processed, update the metrics. + self.update_metrics(); + } + + debug!("stopping wal reference counter task"); + } + + /// Update the metrics to match the internal state. + fn update_metrics(&self) { + let num_files = self.wal_files.len(); + + // Build a set of (id, set_len) tuples for debug logging. + let id_lens = self + .wal_files + .iter() + .map(|(id, set)| (*id, set.len())) + .collect::>(); + + // Emit a log for debugging purposes, showing the current state. + debug!( + num_files, + files=?id_lens, + persisted_set_len=self.persisted.len(), + "updated reference state" + ); + + // Reduce (id, set_len) tuples to the min ID and sum of the set lengths, + // defaulting to 0 for the length and u64::MAX for the ID if the file + // set is empty. + let (min_id, referenced_ops) = + id_lens + .into_iter() + .fold((u64::MAX, 0), |(id_min, len_sum), e| { + assert!(e.1 > 0); // Invariant: sets in file map are never empty + (id_min.min(e.0.get()), len_sum + e.1) + }); + + // And update the various exported metrics. + self.num_files.set(num_files as _); + self.min_id.set(min_id); + self.referenced_ops.set(referenced_ops); + } + + /// Track a newly rotated WAL segment, with the given [`SegmentId`] and + /// containing the operations specified in [`SequenceNumberSet`]. + /// + /// This method tolerates an empty `set`. + async fn handle_new_file(&mut self, segment_id: SegmentId, mut set: SequenceNumberSet) { + debug!( + %segment_id, + sequence_number_set = ?set, + "notified of new segment file" + ); + + // Clear the overlap between the "persisted" set, and this new file from + // both. + let n = clear_intersection(&mut self.persisted, &mut set); + if n > 0 { + debug!(n, "released previously persisted IDs"); + } + + // If the file set is now completely empty, it can be immediately + // deleted. + if set.is_empty() { + debug!(n, "immediately dropping empty segment file"); + return delete_file(&self.wal, segment_id).await; + } + + // Otherwise, retain this file for later persist notifications. + // + // Run-optimise the bitmap to minimise memory utilisation of this set. + // This is a relatively fast operation, and the file sets are expected + // to be highly suitable for RLE compression due to the monotonic + // sequence number assignments. + set.run_optimise(); + + // Insert the file set into the files being tracked + assert!(!set.is_empty()); // Invariant: sets in file map are never empty + assert!( + self.wal_files.insert(segment_id, set).is_none(), + "duplicate segment ID" + ); + } + + /// Process a persistence completion notification, decreasing the reference + /// counts against tracked WAL files, and holding any remaining IDs (in the + /// untracked active WAL segment) in a temporary "persisted" buffer. + async fn handle_persisted(&mut self, note: Arc) { + debug!( + namespace_id = %note.namespace_id(), + table_id = %note.table_id(), + partition_id = %note.partition_id(), + sequence_number_set = ?note.sequence_numbers(), + "notified of persisted data" + ); + + self.remove(note.owned_sequence_numbers()).await; + } + + /// Handle a write that has been added to the WAL, but that did not complete + /// / buffer. + /// + /// Because the write was added to the WAL, its ID will be part of the WAL + /// file's [`SequenceNumberSet`], but because the write was not buffered, it + /// will never be persisted and therefore the WAL set will always have an + /// outstanding reference unless it is accounted for here. + async fn handle_unbuffered(&mut self, id: SequenceNumber) { + debug!(sequence_number = id.get(), "notified of unbuffered write"); + + // Delegate to the same code as persisted by presenting this ID as a set + // - the same behaviour is required. + let mut set = SequenceNumberSet::with_capacity(1); + set.add(id); + + self.remove(set).await; + } + + /// Remove the intersection of `set` from all the sets in `self` (file sets, + /// and the untracked / "persisted" buffer set). + /// + /// Deletes all WAL files that are no longer referenced / have unpersisted + /// entries. + async fn remove(&mut self, mut set: SequenceNumberSet) { + // First remove this set from the "persisted" / file-less set. + let n = clear_intersection(&mut set, &mut self.persisted); + if n > 0 { + debug!(n, "released previously persisted IDs"); + } + + if set.is_empty() { + debug!(n, "fully matched previously persisted IDs"); + return; + } + + // And then walk the WAL file sets. + let mut remove_ids = Vec::with_capacity(0); + for (id, file_set) in self.wal_files.iter_mut() { + // Invariant: files in the file set always have at least 1 reference + assert!(!file_set.is_empty()); + + // Early exit the loop if possible. + if set.is_empty() { + break; + } + + // Clear the intersection of both sets. + let n = clear_intersection(&mut set, file_set); + if n == 0 { + continue; + } + + debug!(n, segment_id=%id, "matched file IDs"); + + // At least 1 element was removed from the file set, it may now be + // empty. + if file_set.is_empty() { + remove_ids.push(*id); + } + } + + // Union whatever IDs remain with the file-less persisted set. + if !set.is_empty() { + debug!(n = set.len(), "retaining file-less IDs"); + self.persisted.add_set(&set); + } + + // And delete any newly empty files + for id in remove_ids { + let file_set = self + .wal_files + .remove(&id) + .expect("id was obtained during iter"); + + // Invariant: the file being removed always has no references. + assert!(file_set.is_empty()); + + delete_file(&self.wal, id).await + } + } +} + +/// Remove the intersection of `a` and `b`, from both `a` and `b`, and return +/// the cardinality of the intersection. +fn clear_intersection(a: &mut SequenceNumberSet, b: &mut SequenceNumberSet) -> usize { + let intersection = sequence_number_set::intersect(a, b); + + a.remove_set(&intersection); + b.remove_set(&intersection); + + intersection.len() as _ +} + +/// Delete the specified WAL segment from `wal`, and log it at info. +async fn delete_file(wal: &T, id: SegmentId) +where + T: WalFileDeleter, +{ + info!( + %id, + "deleted fully-persisted wal segment" + ); + + wal.delete_file(id).await +} + +// Tests in actor.rs diff --git a/ingester2/src/wal/reference_tracker.rs b/ingester2/src/wal/reference_tracker/handle.rs similarity index 58% rename from ingester2/src/wal/reference_tracker.rs rename to ingester2/src/wal/reference_tracker/handle.rs index 8b7564fa96..36ce3dbbbc 100644 --- a/ingester2/src/wal/reference_tracker.rs +++ b/ingester2/src/wal/reference_tracker/handle.rs @@ -1,39 +1,15 @@ -//! A WAL file reference tracker, responsible for deleting files that contain -//! entirely persisted data. - use std::{fmt::Debug, sync::Arc}; -use async_trait::async_trait; -use data_types::{ - sequence_number_set::{self, SequenceNumberSet}, - SequenceNumber, -}; -use hashbrown::HashMap; -use metric::U64Gauge; -use observability_deps::tracing::{debug, info, warn}; -use tokio::{ - select, - sync::mpsc::{self, error::TrySendError}, -}; +use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber}; +use observability_deps::tracing::warn; +use tokio::sync::mpsc::{self, error::TrySendError}; use wal::SegmentId; -use crate::persist::completion_observer::CompletedPersist; +use crate::{ + persist::completion_observer::CompletedPersist, wal::reference_tracker::WalFileDeleter, +}; -/// An abstraction defining the ability of an implementer to delete WAL segment -/// files by ID. -#[async_trait] -pub(crate) trait WalFileDeleter: Debug + Send + Sync + 'static { - /// Delete the WAL segment with the specified [`SegmentId`], or panic if - /// deletion fails. - async fn delete_file(&self, id: SegmentId); -} - -#[async_trait] -impl WalFileDeleter for Arc { - async fn delete_file(&self, id: SegmentId) { - self.delete(id).await.expect("failed to drop wal segment"); - } -} +use super::WalReferenceActor; /// A WAL file reference-count tracker handle. /// @@ -173,333 +149,16 @@ impl WalReferenceHandle { } } -/// A WAL file reference-count tracker. -/// -/// See [`WalReferenceHandle`]. -#[derive(Debug)] -pub(crate) struct WalReferenceActor> { - wal: T, - - /// The set of IDs of persisted data that do not yet appear in - /// `wal_files`, the set of WAL files rotated out of active use. This is - /// an intermediate buffer necessary to tolerate out-of-order persist - /// notifications w.r.t file notifications. - /// - /// IDs that appear in this set are most likely part of the active WAL - /// segment file and should be reconciled when it rotates. - persisted: SequenceNumberSet, - - /// The set of closed WAL segment files, and the set of unpersisted - /// [`SequenceNumber`] they contain. - /// - /// These [`SequenceNumberSet`] are slowly drained / have IDs removed in - /// response to persisted data notifications. Once the set is of length 0, - /// the file can be deleted as all the entries the file contains has been - /// persisted. - /// - /// Invariant: sets in this map are always non-empty. - wal_files: HashMap, - - /// Channels for input from the [`WalReferenceHandle`]. - file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, - persist_rx: mpsc::Receiver>, - unbuffered_rx: mpsc::Receiver, - - /// A metric tracking the number of rotated WAL files being reference - /// tracked. - num_files: U64Gauge, - /// The minimum [`SegmentId`] in `wal_files`, the set of old (rotated out) - /// files that will eventually be deleted. - /// - /// If this value never changes over the lifetime of an ingester, it is an - /// indication of a reference leak bug, causing a WAL file to never be - /// deleted. - min_id: U64Gauge, - /// The number of references to unpersisted operations remaining in the old - /// (rotated out) WAL files, decreasing as persistence completes, and - /// increasing as non-empty WAL files are rotated into `wal_files`. - referenced_ops: U64Gauge, -} - -impl WalReferenceActor -where - T: WalFileDeleter, -{ - fn new( - wal: T, - file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, - persist_rx: mpsc::Receiver>, - unbuffered_rx: mpsc::Receiver, - metrics: &metric::Registry, - ) -> Self { - let num_files = metrics - .register_metric::( - "ingester_wal_inactive_file_count", - "number of WAL files that are not being actively wrote to, but contain unpersisted data" - ) - .recorder(&[]); - - let min_id = metrics - .register_metric::( - "ingester_wal_inactive_min_id", - "the segment ID of the oldest inactive wal file", - ) - .recorder(&[]); - - let referenced_ops = metrics - .register_metric::( - "ingester_wal_inactive_file_op_reference_count", - "the number of unpersisted operations referenced in inactive WAL files", - ) - .recorder(&[]); - - Self { - wal, - persisted: SequenceNumberSet::default(), - wal_files: HashMap::with_capacity(3), - file_rx, - persist_rx, - unbuffered_rx, - num_files, - min_id, - referenced_ops, - } - } - - /// Execute the actor task. - /// - /// This task exits once the sender side of the input channels have been - /// dropped. - pub(crate) async fn run(mut self) { - loop { - select! { - // Prefer polling the channels in the specified order. - // - // By consuming file_rx first, there's a greater chance that - // subsequent persist/ignore events can be applied directly to - // the file sets, rather than having to wait in the intermediate - // "persisted" set, reducing memory utilisation. - biased; - - Some((id, f)) = self.file_rx.recv() => self.handle_new_file(id, f).await, - Some(p) = self.persist_rx.recv() => self.handle_persisted(p).await, - Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await, - else => break - } - - // After each action is processed, update the metrics. - self.update_metrics(); - } - - debug!("stopping wal reference counter task"); - } - - /// Update the metrics to match the internal state. - fn update_metrics(&self) { - let num_files = self.wal_files.len(); - - // Build a set of (id, set_len) tuples for debug logging. - let id_lens = self - .wal_files - .iter() - .map(|(id, set)| (*id, set.len())) - .collect::>(); - - // Emit a log for debugging purposes, showing the current state. - debug!( - num_files, - files=?id_lens, - persisted_set_len=self.persisted.len(), - "updated reference state" - ); - - // Reduce (id, set_len) tuples to the min ID and sum of the set lengths, - // defaulting to 0 for the length and u64::MAX for the ID if the file - // set is empty. - let (min_id, referenced_ops) = - id_lens - .into_iter() - .fold((u64::MAX, 0), |(id_min, len_sum), e| { - assert!(e.1 > 0); // Invariant: sets in file map are never empty - (id_min.min(e.0.get()), len_sum + e.1) - }); - - // And update the various exported metrics. - self.num_files.set(num_files as _); - self.min_id.set(min_id); - self.referenced_ops.set(referenced_ops); - } - - /// Track a newly rotated WAL segment, with the given [`SegmentId`] and - /// containing the operations specified in [`SequenceNumberSet`]. - /// - /// This method tolerates an empty `set`. - async fn handle_new_file(&mut self, segment_id: SegmentId, mut set: SequenceNumberSet) { - debug!( - %segment_id, - sequence_number_set = ?set, - "notified of new segment file" - ); - - // Clear the overlap between the "persisted" set, and this new file from - // both. - let n = clear_intersection(&mut self.persisted, &mut set); - if n > 0 { - debug!(n, "released previously persisted IDs"); - } - - // If the file set is now completely empty, it can be immediately - // deleted. - if set.is_empty() { - debug!(n, "immediately dropping empty segment file"); - return delete_file(&self.wal, segment_id).await; - } - - // Otherwise, retain this file for later persist notifications. - // - // Run-optimise the bitmap to minimise memory utilisation of this set. - // This is a relatively fast operation, and the file sets are expected - // to be highly suitable for RLE compression due to the monotonic - // sequence number assignments. - set.run_optimise(); - - // Insert the file set into the files being tracked - assert!(!set.is_empty()); // Invariant: sets in file map are never empty - assert!( - self.wal_files.insert(segment_id, set).is_none(), - "duplicate segment ID" - ); - } - - /// Process a persistence completion notification, decreasing the reference - /// counts against tracked WAL files, and holding any remaining IDs (in the - /// untracked active WAL segment) in a temporary "persisted" buffer. - async fn handle_persisted(&mut self, note: Arc) { - debug!( - namespace_id = %note.namespace_id(), - table_id = %note.table_id(), - partition_id = %note.partition_id(), - sequence_number_set = ?note.sequence_numbers(), - "notified of persisted data" - ); - - self.remove(note.owned_sequence_numbers()).await; - } - - /// Handle a write that has been added to the WAL, but that did not complete - /// / buffer. - /// - /// Because the write was added to the WAL, its ID will be part of the WAL - /// file's [`SequenceNumberSet`], but because the write was not buffered, it - /// will never be persisted and therefore the WAL set will always have an - /// outstanding reference unless it is accounted for here. - async fn handle_unbuffered(&mut self, id: SequenceNumber) { - debug!(sequence_number = id.get(), "notified of unbuffered write"); - - // Delegate to the same code as persisted by presenting this ID as a set - // - the same behaviour is required. - let mut set = SequenceNumberSet::with_capacity(1); - set.add(id); - - self.remove(set).await; - } - - /// Remove the intersection of `set` from all the sets in `self` (file sets, - /// and the untracked / "persisted" buffer set). - /// - /// Deletes all WAL files that are no longer referenced / have unpersisted - /// entries. - async fn remove(&mut self, mut set: SequenceNumberSet) { - // First remove this set from the "persisted" / file-less set. - let n = clear_intersection(&mut set, &mut self.persisted); - if n > 0 { - debug!(n, "released previously persisted IDs"); - } - - if set.is_empty() { - debug!(n, "fully matched previously persisted IDs"); - return; - } - - // And then walk the WAL file sets. - let mut remove_ids = Vec::with_capacity(0); - for (id, file_set) in self.wal_files.iter_mut() { - // Invariant: files in the file set always have at least 1 reference - assert!(!file_set.is_empty()); - - // Early exit the loop if possible. - if set.is_empty() { - break; - } - - // Clear the intersection of both sets. - let n = clear_intersection(&mut set, file_set); - if n == 0 { - continue; - } - - debug!(n, segment_id=%id, "matched file IDs"); - - // At least 1 element was removed from the file set, it may now be - // empty. - if file_set.is_empty() { - remove_ids.push(*id); - } - } - - // Union whatever IDs remain with the file-less persisted set. - if !set.is_empty() { - debug!(n = set.len(), "retaining file-less IDs"); - self.persisted.add_set(&set); - } - - // And delete any newly empty files - for id in remove_ids { - let file_set = self - .wal_files - .remove(&id) - .expect("id was obtained during iter"); - - // Invariant: the file being removed always has no references. - assert!(file_set.is_empty()); - - delete_file(&self.wal, id).await - } - } -} - -/// Remove the intersection of `a` and `b`, from both `a` and `b`, and return -/// the cardinality of the intersection. -fn clear_intersection(a: &mut SequenceNumberSet, b: &mut SequenceNumberSet) -> usize { - let intersection = sequence_number_set::intersect(a, b); - - a.remove_set(&intersection); - b.remove_set(&intersection); - - intersection.len() as _ -} - -/// Delete the specified WAL segment from `wal`, and log it at info. -async fn delete_file(wal: &T, id: SegmentId) -where - T: WalFileDeleter, -{ - info!( - %id, - "deleted fully-persisted wal segment" - ); - - wal.delete_file(id).await -} - #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; + + use std::time::Duration; use assert_matches::assert_matches; + use async_trait::async_trait; use data_types::{NamespaceId, PartitionId, TableId}; use futures::Future; - use metric::assert_counter; + use metric::{assert_counter, U64Gauge}; use parking_lot::Mutex; use test_helpers::timeout::FutureTimeout; use tokio::sync::Notify; diff --git a/ingester2/src/wal/reference_tracker/mod.rs b/ingester2/src/wal/reference_tracker/mod.rs new file mode 100644 index 0000000000..c45f3c80fe --- /dev/null +++ b/ingester2/src/wal/reference_tracker/mod.rs @@ -0,0 +1,8 @@ +mod actor; +mod handle; +mod wal_deleter; + +pub(crate) use actor::*; +#[allow(unused_imports)] // Used by docs - will be used by code soon. +pub(crate) use handle::*; +pub(crate) use wal_deleter::*; diff --git a/ingester2/src/wal/reference_tracker/wal_deleter.rs b/ingester2/src/wal/reference_tracker/wal_deleter.rs new file mode 100644 index 0000000000..dd5ce2402a --- /dev/null +++ b/ingester2/src/wal/reference_tracker/wal_deleter.rs @@ -0,0 +1,20 @@ +use std::{fmt::Debug, sync::Arc}; + +use async_trait::async_trait; +use wal::SegmentId; + +/// An abstraction defining the ability of an implementer to delete WAL segment +/// files by ID. +#[async_trait] +pub(crate) trait WalFileDeleter: Debug + Send + Sync + 'static { + /// Delete the WAL segment with the specified [`SegmentId`], or panic if + /// deletion fails. + async fn delete_file(&self, id: SegmentId); +} + +#[async_trait] +impl WalFileDeleter for Arc { + async fn delete_file(&self, id: SegmentId) { + self.delete(id).await.expect("failed to drop wal segment"); + } +} From 45d23f76523774b0b30f27e291ff6eef9c9a3f85 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 16 Mar 2023 15:19:52 +0100 Subject: [PATCH 12/15] refactor: `extract_chunks` return arrow schema (#7231) Similar to #7217 there is no need to convert the arrow schema to an IOx schema. This also makes it easier to handle the chunk order column in #6098. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../physical_optimizer/chunk_extraction.rs | 43 ++++++++----------- .../src/physical_optimizer/combine_chunks.rs | 4 +- .../dedup/dedup_null_columns.rs | 7 ++- .../dedup/dedup_sort_order.rs | 5 +-- .../dedup/partition_split.rs | 3 +- .../physical_optimizer/dedup/remove_dedup.rs | 3 +- .../physical_optimizer/dedup/time_split.rs | 3 +- 7 files changed, 28 insertions(+), 40 deletions(-) diff --git a/iox_query/src/physical_optimizer/chunk_extraction.rs b/iox_query/src/physical_optimizer/chunk_extraction.rs index f5f615bda2..87c72f7d01 100644 --- a/iox_query/src/physical_optimizer/chunk_extraction.rs +++ b/iox_query/src/physical_optimizer/chunk_extraction.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use arrow::datatypes::SchemaRef; use datafusion::{ error::DataFusionError, physical_plan::{ @@ -8,7 +9,6 @@ use datafusion::{ }, }; use observability_deps::tracing::debug; -use schema::Schema; use crate::{ provider::{PartitionedFileExt, RecordBatchesExec}, @@ -24,7 +24,7 @@ use crate::{ /// additional nodes (like de-duplication, filtering, projection) then NO data will be returned. /// /// [`chunks_to_physical_nodes`]: crate::provider::chunks_to_physical_nodes -pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec>)> { +pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(SchemaRef, Vec>)> { let mut visitor = ExtractChunksVisitor::default(); if let Err(e) = visit_execution_plan(plan, &mut visitor) { debug!( @@ -39,7 +39,7 @@ pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec>, - schema: Option, + schema: Option, } impl ExtractChunksVisitor { @@ -48,12 +48,7 @@ impl ExtractChunksVisitor { } fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), DataFusionError> { - let schema = Schema::try_from(exec.schema()).map_err(|e| { - DataFusionError::Context( - "Schema recovery".to_owned(), - Box::new(DataFusionError::External(Box::new(e))), - ) - })?; + let schema = exec.schema(); if let Some(existing) = &self.schema { if existing != &schema { return Err(DataFusionError::External( @@ -146,20 +141,20 @@ mod tests { #[test] fn test_roundtrip_empty() { - let schema = chunk(1).schema().clone(); + let schema = chunk(1).schema().as_arrow(); assert_roundtrip(schema, vec![]); } #[test] fn test_roundtrip_single_record_batch() { let chunk1 = chunk(1); - assert_roundtrip(chunk1.schema().clone(), vec![Arc::new(chunk1)]); + assert_roundtrip(chunk1.schema().as_arrow(), vec![Arc::new(chunk1)]); } #[test] fn test_roundtrip_single_parquet() { let chunk1 = chunk(1).with_dummy_parquet_file(); - assert_roundtrip(chunk1.schema().clone(), vec![Arc::new(chunk1)]); + assert_roundtrip(chunk1.schema().as_arrow(), vec![Arc::new(chunk1)]); } #[test] @@ -170,7 +165,7 @@ mod tests { let chunk4 = chunk(4); let chunk5 = chunk(5); assert_roundtrip( - chunk1.schema().clone(), + chunk1.schema().as_arrow(), vec![ Arc::new(chunk1), Arc::new(chunk2), @@ -208,8 +203,10 @@ mod tests { DataType::Float64, true, )])); - let plan = EmptyExec::new(false, schema); - assert!(extract_chunks(&plan).is_none()); + let plan = EmptyExec::new(false, Arc::clone(&schema)); + let (schema2, chunks) = extract_chunks(&plan).unwrap(); + assert_eq!(schema, schema2); + assert!(chunks.is_empty()); } #[test] @@ -240,7 +237,8 @@ mod tests { .unwrap() .merge(&schema_ext) .unwrap() - .build(); + .build() + .as_arrow(); assert_roundtrip(schema, vec![Arc::new(chunk)]); } @@ -253,19 +251,14 @@ mod tests { .unwrap() .merge(&schema_ext) .unwrap() - .build(); + .build() + .as_arrow(); assert_roundtrip(schema, vec![Arc::new(chunk)]); } #[track_caller] - fn assert_roundtrip(schema: Schema, chunks: Vec>) { - let plan = chunks_to_physical_nodes( - &schema.as_arrow(), - None, - chunks.clone(), - Predicate::default(), - 2, - ); + fn assert_roundtrip(schema: SchemaRef, chunks: Vec>) { + let plan = chunks_to_physical_nodes(&schema, None, chunks.clone(), Predicate::default(), 2); let (schema2, chunks2) = extract_chunks(plan.as_ref()).expect("data found"); assert_eq!(schema, schema2); assert_eq!(chunk_ids(&chunks), chunk_ids(&chunks2)); diff --git a/iox_query/src/physical_optimizer/combine_chunks.rs b/iox_query/src/physical_optimizer/combine_chunks.rs index 78cd14602a..9347c4834f 100644 --- a/iox_query/src/physical_optimizer/combine_chunks.rs +++ b/iox_query/src/physical_optimizer/combine_chunks.rs @@ -34,9 +34,9 @@ impl PhysicalOptimizerRule for CombineChunks { config: &ConfigOptions, ) -> Result> { plan.transform_up(&|plan| { - if let Some((iox_schema, chunks)) = extract_chunks(plan.as_ref()) { + if let Some((schema, chunks)) = extract_chunks(plan.as_ref()) { return Ok(Some(chunks_to_physical_nodes( - &iox_schema.as_arrow(), + &schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs index f9790cb22c..c5e808e52d 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs @@ -44,7 +44,7 @@ impl PhysicalOptimizerRule for DedupNullColumns { return Ok(None); }; - let pk_cols = schema.primary_key().into_iter().collect::>(); + let pk_cols = dedup_exec.sort_columns(); let mut used_pk_cols = HashSet::new(); for chunk in &chunks { @@ -64,16 +64,15 @@ impl PhysicalOptimizerRule for DedupNullColumns { } let sort_key = sort_key_builder.build(); - let arrow_schema = schema.as_arrow(); let child = chunks_to_physical_nodes( - &arrow_schema, + &schema, (!sort_key.is_empty()).then_some(&sort_key), chunks, Predicate::new(), config.execution.target_partitions, ); - let sort_exprs = arrow_sort_key_exprs(&sort_key, &arrow_schema); + let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema); return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs)))); } diff --git a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs index 650f9dc141..de42c0c90f 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs @@ -124,16 +124,15 @@ impl PhysicalOptimizerRule for DedupSortOrder { } let quorum_sort_key = quorum_sort_key_builder.build(); - let arrow_schema = schema.as_arrow(); let child = chunks_to_physical_nodes( - &arrow_schema, + &schema, (!quorum_sort_key.is_empty()).then_some(&quorum_sort_key), chunks, Predicate::new(), config.execution.target_partitions, ); - let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &arrow_schema); + let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &schema); return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs)))); } diff --git a/iox_query/src/physical_optimizer/dedup/partition_split.rs b/iox_query/src/physical_optimizer/dedup/partition_split.rs index 066f97febb..0d21fc5113 100644 --- a/iox_query/src/physical_optimizer/dedup/partition_split.rs +++ b/iox_query/src/physical_optimizer/dedup/partition_split.rs @@ -76,14 +76,13 @@ impl PhysicalOptimizerRule for PartitionSplit { let mut chunks_by_partition = chunks_by_partition.into_iter().collect::>(); chunks_by_partition.sort_by_key(|(p_id, _chunks)| *p_id); - let arrow_schema = schema.as_arrow(); let out = UnionExec::new( chunks_by_partition .into_iter() .map(|(_p_id, chunks)| { Arc::new(DeduplicateExec::new( chunks_to_physical_nodes( - &arrow_schema, + &schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs index 1aae525dcf..edda8ac6b9 100644 --- a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs +++ b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs @@ -35,9 +35,8 @@ impl PhysicalOptimizerRule for RemoveDedup { }; if (chunks.len() < 2) && chunks.iter().all(|c| !c.may_contain_pk_duplicates()) { - let arrow_schema = schema.as_arrow(); return Ok(Some(chunks_to_physical_nodes( - &arrow_schema, + &schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/time_split.rs b/iox_query/src/physical_optimizer/dedup/time_split.rs index a54715f34e..bed6b825cd 100644 --- a/iox_query/src/physical_optimizer/dedup/time_split.rs +++ b/iox_query/src/physical_optimizer/dedup/time_split.rs @@ -63,14 +63,13 @@ impl PhysicalOptimizerRule for TimeSplit { return Ok(None); } - let arrow_schema = schema.as_arrow(); let out = UnionExec::new( groups .into_iter() .map(|chunks| { Arc::new(DeduplicateExec::new( chunks_to_physical_nodes( - &arrow_schema, + &schema, None, chunks, Predicate::new(), From 0c36c60d66487b0ed47d4d34988be76ed43b2d5a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Mar 2023 15:27:21 +0100 Subject: [PATCH 13/15] refactor(flightsql): simplfy parameterized statement generation (#7232) * docs: Add doc link to command enum * refactor(flightsql): simplfy parameterized statement generation --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- flightsql/src/cmd.rs | 3 +++ flightsql/src/planner.rs | 49 ++++++++++++---------------------------- 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/flightsql/src/cmd.rs b/flightsql/src/cmd.rs index 782dfbc256..283799587a 100644 --- a/flightsql/src/cmd.rs +++ b/flightsql/src/cmd.rs @@ -61,6 +61,9 @@ impl From for Bytes { /// /// Handles encoding/decoding prost::Any messages back /// and forth to native Rust types +/// +/// TODO use / contribute upstream arrow-flight implementation, when ready: +/// #[derive(Debug, Clone, PartialEq)] pub enum FlightSQLCommand { CommandStatementQuery(CommandStatementQuery), diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index f64aba91e4..5cb3cd8313 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -218,41 +218,20 @@ async fn plan_get_db_schemas( catalog: Option, db_schema_filter_pattern: Option, ) -> Result { - let (query, params) = match (catalog, db_schema_filter_pattern) { - (Some(catalog), Some(db_schema_filter_pattern)) => ( - "PREPARE my_plan(VARCHAR, VARCHAR) AS \ - SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \ - FROM information_schema.tables \ - WHERE table_catalog like $1 AND table_schema like $2 \ - ORDER BY table_catalog, table_schema", - vec![ - ScalarValue::Utf8(Some(catalog)), - ScalarValue::Utf8(Some(db_schema_filter_pattern)), - ], - ), - (None, Some(db_schema_filter_pattern)) => ( - "PREPARE my_plan(VARCHAR) AS \ - SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \ - FROM information_schema.tables \ - WHERE table_schema like $1 \ - ORDER BY table_catalog, table_schema", - vec![ScalarValue::Utf8(Some(db_schema_filter_pattern))], - ), - (Some(catalog), None) => ( - "PREPARE my_plan(VARCHAR) AS \ - SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \ - FROM information_schema.tables \ - WHERE table_catalog like $1 \ - ORDER BY table_catalog, table_schema", - vec![ScalarValue::Utf8(Some(catalog))], - ), - (None, None) => ( - "SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \ - FROM information_schema.tables \ - ORDER BY table_catalog, table_schema", - vec![], - ), - }; + // use '%' to match anything if filters are not specified + let catalog = catalog.unwrap_or_else(|| String::from("%")); + let db_schema_filter_pattern = db_schema_filter_pattern.unwrap_or_else(|| String::from("%")); + + let query = "PREPARE my_plan(VARCHAR, VARCHAR) AS \ + SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \ + FROM information_schema.tables \ + WHERE table_catalog like $1 AND table_schema like $2 \ + ORDER BY table_catalog, table_schema"; + + let params = vec![ + ScalarValue::Utf8(Some(catalog)), + ScalarValue::Utf8(Some(db_schema_filter_pattern)), + ]; let plan = ctx.plan_sql(query).await?; debug!(?plan, "Prepared plan is"); From 7dfaa05e8a071df11f01e01386b0a48a92fbb65d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Mar 2023 15:34:40 +0100 Subject: [PATCH 14/15] chore: Update datafusion again (#7208) * chore: update datafusion again * fix: update test * fix: use table_reference * fix: clean up import * chore: Run cargo hakari tasks --------- Co-authored-by: CircleCI[bot] Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 18 +++++++++--------- Cargo.toml | 4 ++-- influxdb_iox/tests/query_tests2/sql_errors.rs | 3 +-- iox_query/src/frontend/common.rs | 7 +++++-- .../src/physical_optimizer/combine_chunks.rs | 2 +- .../dedup/dedup_null_columns.rs | 2 +- .../dedup/dedup_sort_order.rs | 2 +- .../dedup/partition_split.rs | 2 +- .../physical_optimizer/dedup/remove_dedup.rs | 2 +- .../src/physical_optimizer/dedup/time_split.rs | 2 +- .../physical_optimizer/predicate_pushdown.rs | 2 +- .../physical_optimizer/projection_pushdown.rs | 2 +- .../physical_optimizer/sort/redundant_sort.rs | 2 +- .../physical_optimizer/sort/sort_pushdown.rs | 2 +- .../physical_optimizer/union/nested_union.rs | 2 +- .../src/physical_optimizer/union/one_union.rs | 2 +- iox_query_influxql/src/plan/planner.rs | 4 +++- workspace-hack/Cargo.toml | 6 +++--- 18 files changed, 35 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4b3a75342..f9cc41467f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1415,7 +1415,7 @@ dependencies = [ [[package]] name = "datafusion" version = "20.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777" dependencies = [ "ahash 0.8.3", "arrow", @@ -1462,7 +1462,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "20.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777" dependencies = [ "arrow", "chrono", @@ -1475,7 +1475,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "20.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777" dependencies = [ "dashmap", "datafusion-common", @@ -1492,7 +1492,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "20.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777" dependencies = [ "ahash 0.8.3", "arrow", @@ -1503,7 +1503,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "20.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777" dependencies = [ "arrow", "async-trait", @@ -1520,7 +1520,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "20.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777" dependencies = [ "ahash 0.8.3", "arrow", @@ -1550,7 +1550,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "20.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777" dependencies = [ "arrow", "chrono", @@ -1566,7 +1566,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "20.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777" dependencies = [ "arrow", "datafusion-common", @@ -1577,7 +1577,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "20.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777" dependencies = [ "arrow-schema", "datafusion-common", diff --git a/Cargo.toml b/Cargo.toml index 93e61af688..6ec0f42d89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,8 +118,8 @@ license = "MIT OR Apache-2.0" [workspace.dependencies] arrow = { version = "34.0.0" } arrow-flight = { version = "34.0.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="612eb1d0ce338af7980fa906df8796eb47c4be44", default-features = false } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="612eb1d0ce338af7980fa906df8796eb47c4be44" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="4afd67a0e496e1834ad6184629f28e60f66b2777", default-features = false } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="4afd67a0e496e1834ad6184629f28e60f66b2777" } hashbrown = { version = "0.13.2" } parquet = { version = "34.0.0" } diff --git a/influxdb_iox/tests/query_tests2/sql_errors.rs b/influxdb_iox/tests/query_tests2/sql_errors.rs index 68122ea269..58b6e12029 100644 --- a/influxdb_iox/tests/query_tests2/sql_errors.rs +++ b/influxdb_iox/tests/query_tests2/sql_errors.rs @@ -10,8 +10,7 @@ async fn schema_merge_nonexistent_column() { setup_name: "MultiChunkSchemaMerge", sql: "SELECT * from cpu where foo = 8", expected_error_code: tonic::Code::InvalidArgument, - expected_message: "Error while planning query: Schema error: No field named 'foo'. \ - Valid fields are 'cpu'.'host', 'cpu'.'region', 'cpu'.'system', 'cpu'.'time', 'cpu'.'user'.", + expected_message: r#"Error while planning query: Schema error: No field named "foo". Valid fields are "cpu"."host", "cpu"."region", "cpu"."system", "cpu"."time", "cpu"."user"."#, } .run() .await; diff --git a/iox_query/src/frontend/common.rs b/iox_query/src/frontend/common.rs index f870b23304..c1b1b4fbe6 100644 --- a/iox_query/src/frontend/common.rs +++ b/iox_query/src/frontend/common.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use datafusion::{ + catalog::TableReference, datasource::provider_as_source, logical_expr::{expr_rewriter::ExprRewritable, LogicalPlanBuilder}, }; @@ -183,8 +184,10 @@ impl<'a> ScanPlanBuilder<'a> { // later if possible) let projection = None; - let mut plan_builder = LogicalPlanBuilder::scan(table_name.as_ref(), source, projection) - .context(BuildingPlanSnafu)?; + // Do not parse the tablename as a SQL identifer, but use as is + let table_ref = TableReference::bare(table_name.to_string()); + let mut plan_builder = + LogicalPlanBuilder::scan(table_ref, source, projection).context(BuildingPlanSnafu)?; // Use a filter node to add general predicates + timestamp // range, if any diff --git a/iox_query/src/physical_optimizer/combine_chunks.rs b/iox_query/src/physical_optimizer/combine_chunks.rs index 9347c4834f..ca92f817f9 100644 --- a/iox_query/src/physical_optimizer/combine_chunks.rs +++ b/iox_query/src/physical_optimizer/combine_chunks.rs @@ -4,7 +4,7 @@ use datafusion::{ config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{rewrite::TreeNodeRewritable, ExecutionPlan}, + physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan}, }; use predicate::Predicate; diff --git a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs index c5e808e52d..1c1c8e7112 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs @@ -4,7 +4,7 @@ use datafusion::{ config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{rewrite::TreeNodeRewritable, ExecutionPlan}, + physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan}, }; use predicate::Predicate; use schema::{sort::SortKeyBuilder, TIME_COLUMN_NAME}; diff --git a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs index de42c0c90f..8724061178 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs @@ -5,7 +5,7 @@ use datafusion::{ config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{rewrite::TreeNodeRewritable, ExecutionPlan}, + physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan}, }; use indexmap::IndexSet; use predicate::Predicate; diff --git a/iox_query/src/physical_optimizer/dedup/partition_split.rs b/iox_query/src/physical_optimizer/dedup/partition_split.rs index 0d21fc5113..84eb2af75a 100644 --- a/iox_query/src/physical_optimizer/dedup/partition_split.rs +++ b/iox_query/src/physical_optimizer/dedup/partition_split.rs @@ -5,7 +5,7 @@ use datafusion::{ config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{rewrite::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, + physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, }; use hashbrown::HashMap; use observability_deps::tracing::warn; diff --git a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs index edda8ac6b9..9a1f18572a 100644 --- a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs +++ b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs @@ -4,7 +4,7 @@ use datafusion::{ config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{rewrite::TreeNodeRewritable, ExecutionPlan}, + physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan}, }; use predicate::Predicate; diff --git a/iox_query/src/physical_optimizer/dedup/time_split.rs b/iox_query/src/physical_optimizer/dedup/time_split.rs index bed6b825cd..c3df94cabb 100644 --- a/iox_query/src/physical_optimizer/dedup/time_split.rs +++ b/iox_query/src/physical_optimizer/dedup/time_split.rs @@ -4,7 +4,7 @@ use datafusion::{ config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{rewrite::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, + physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, }; use observability_deps::tracing::warn; use predicate::Predicate; diff --git a/iox_query/src/physical_optimizer/predicate_pushdown.rs b/iox_query/src/physical_optimizer/predicate_pushdown.rs index 2d95b52ce4..f86ffe67e9 100644 --- a/iox_query/src/physical_optimizer/predicate_pushdown.rs +++ b/iox_query/src/physical_optimizer/predicate_pushdown.rs @@ -15,7 +15,7 @@ use datafusion::{ expressions::{BinaryExpr, Column}, file_format::ParquetExec, filter::FilterExec, - rewrite::TreeNodeRewritable, + tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan, PhysicalExpr, }, diff --git a/iox_query/src/physical_optimizer/projection_pushdown.rs b/iox_query/src/physical_optimizer/projection_pushdown.rs index afa5a1e3c0..3e3cd839a0 100644 --- a/iox_query/src/physical_optimizer/projection_pushdown.rs +++ b/iox_query/src/physical_optimizer/projection_pushdown.rs @@ -18,8 +18,8 @@ use datafusion::{ file_format::{FileScanConfig, ParquetExec}, filter::FilterExec, projection::ProjectionExec, - rewrite::TreeNodeRewritable, sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, + tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan, PhysicalExpr, }, diff --git a/iox_query/src/physical_optimizer/sort/redundant_sort.rs b/iox_query/src/physical_optimizer/sort/redundant_sort.rs index e33ef12bed..01eb7a5578 100644 --- a/iox_query/src/physical_optimizer/sort/redundant_sort.rs +++ b/iox_query/src/physical_optimizer/sort/redundant_sort.rs @@ -4,7 +4,7 @@ use datafusion::{ config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{rewrite::TreeNodeRewritable, sorts::sort::SortExec, ExecutionPlan}, + physical_plan::{sorts::sort::SortExec, tree_node::TreeNodeRewritable, ExecutionPlan}, }; /// Removes [`SortExec`] if it is no longer needed. diff --git a/iox_query/src/physical_optimizer/sort/sort_pushdown.rs b/iox_query/src/physical_optimizer/sort/sort_pushdown.rs index 4afc32d83d..f1fef8328d 100644 --- a/iox_query/src/physical_optimizer/sort/sort_pushdown.rs +++ b/iox_query/src/physical_optimizer/sort/sort_pushdown.rs @@ -5,7 +5,7 @@ use datafusion::{ error::Result, physical_optimizer::PhysicalOptimizerRule, physical_plan::{ - rewrite::TreeNodeRewritable, sorts::sort::SortExec, union::UnionExec, ExecutionPlan, + sorts::sort::SortExec, tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan, }, }; diff --git a/iox_query/src/physical_optimizer/union/nested_union.rs b/iox_query/src/physical_optimizer/union/nested_union.rs index ac3335250e..97961217fa 100644 --- a/iox_query/src/physical_optimizer/union/nested_union.rs +++ b/iox_query/src/physical_optimizer/union/nested_union.rs @@ -4,7 +4,7 @@ use datafusion::{ config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{rewrite::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, + physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, }; /// Optimizer that replaces nested [`UnionExec`]s with a single level. diff --git a/iox_query/src/physical_optimizer/union/one_union.rs b/iox_query/src/physical_optimizer/union/one_union.rs index 356b85fb08..fc52130704 100644 --- a/iox_query/src/physical_optimizer/union/one_union.rs +++ b/iox_query/src/physical_optimizer/union/one_union.rs @@ -4,7 +4,7 @@ use datafusion::{ config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{rewrite::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, + physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, }; /// Optimizer that replaces [`UnionExec`] with a single child node w/ the child note itself. diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index 1ab406f9f5..54a6ed8e6f 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -4,6 +4,7 @@ use crate::plan::rewriter::rewrite_statement; use crate::plan::util::{binary_operator_to_df_operator, Schemas}; use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_to_data_type}; use arrow::datatypes::DataType; +use datafusion::catalog::TableReference; use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, ToDFSchema}; use datafusion::logical_expr::expr_rewriter::{normalize_col, ExprRewritable, ExprRewriter}; use datafusion::logical_expr::logical_plan::builder::project; @@ -725,8 +726,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> { /// by the [`rewrite_statement`] function. fn create_table_ref(&self, table_name: String) -> Result> { Ok(if let Ok(source) = self.s.get_table_provider(&table_name) { + let table_ref = TableReference::bare(table_name.to_string()); Some(project( - LogicalPlanBuilder::scan(&table_name, source, None)?.build()?, + LogicalPlanBuilder::scan(table_ref, source, None)?.build()?, iter::once(lit_dict(&table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME)), )?) } else { diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 5475f89b69..63a37dc635 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -29,9 +29,9 @@ bytes = { version = "1" } chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] } crossbeam-utils = { version = "0.8" } crypto-common = { version = "0.1", default-features = false, features = ["std"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "612eb1d0ce338af7980fa906df8796eb47c4be44" } -datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "612eb1d0ce338af7980fa906df8796eb47c4be44", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } -datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "612eb1d0ce338af7980fa906df8796eb47c4be44", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4afd67a0e496e1834ad6184629f28e60f66b2777" } +datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4afd67a0e496e1834ad6184629f28e60f66b2777", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4afd67a0e496e1834ad6184629f28e60f66b2777", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } digest = { version = "0.10", features = ["mac", "std"] } either = { version = "1" } fixedbitset = { version = "0.4" } From 3f0073807fbedb10eae9c533d67f0b907c418bcd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Mar 2023 15:41:49 +0100 Subject: [PATCH 15/15] chore(flightsql): `insta`ize flightsql end to end tests (#7236) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../tests/end_to_end_cases/flightsql.rs | 68 ++++++++++--------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index 2c248a4caf..31c1d6f62b 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use arrow::record_batch::RecordBatch; use arrow_flight::decode::FlightRecordBatchStream; -use arrow_util::{assert_batches_sorted_eq, test_util::batches_to_sorted_lines}; +use arrow_util::test_util::batches_to_sorted_lines; use assert_cmd::Command; use datafusion::common::assert_contains; use futures::{FutureExt, TryStreamExt}; @@ -30,21 +30,22 @@ async fn flightsql_adhoc_query() { Step::Custom(Box::new(move |state: &mut StepTestState| { async move { let sql = format!("select * from {table_name}"); - let expected = vec![ - "+------+------+--------------------------------+-----+", - "| tag1 | tag2 | time | val |", - "+------+------+--------------------------------+-----+", - "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", - "| A | C | 1970-01-01T00:00:00.000123457Z | 43 |", - "+------+------+--------------------------------+-----+", - ]; - let mut client = flightsql_client(state.cluster()); let stream = client.query(sql).await.unwrap(); let batches = collect_stream(stream).await; - - assert_batches_sorted_eq!(&expected, &batches); + insta::assert_yaml_snapshot!( + batches_to_sorted_lines(&batches), + @r###" + --- + - +------+------+--------------------------------+-----+ + - "| tag1 | tag2 | time | val |" + - +------+------+--------------------------------+-----+ + - "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |" + - "| A | C | 1970-01-01T00:00:00.000123457Z | 43 |" + - +------+------+--------------------------------+-----+ + "### + ); } .boxed() })), @@ -112,23 +113,24 @@ async fn flightsql_prepared_query() { Step::Custom(Box::new(move |state: &mut StepTestState| { async move { let sql = format!("select * from {table_name}"); - let expected = vec![ - "+------+------+--------------------------------+-----+", - "| tag1 | tag2 | time | val |", - "+------+------+--------------------------------+-----+", - "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", - "| A | C | 1970-01-01T00:00:00.000123457Z | 43 |", - "+------+------+--------------------------------+-----+", - ]; - let mut client = flightsql_client(state.cluster()); let handle = client.prepare(sql).await.unwrap(); let stream = client.execute(handle).await.unwrap(); let batches = collect_stream(stream).await; - - assert_batches_sorted_eq!(&expected, &batches); + insta::assert_yaml_snapshot!( + batches_to_sorted_lines(&batches), + @r###" + --- + - +------+------+--------------------------------+-----+ + - "| tag1 | tag2 | time | val |" + - +------+------+--------------------------------+-----+ + - "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |" + - "| A | C | 1970-01-01T00:00:00.000123457Z | 43 |" + - +------+------+--------------------------------+-----+ + "### + ); } .boxed() })), @@ -157,20 +159,22 @@ async fn flightsql_get_catalogs() { )), Step::Custom(Box::new(move |state: &mut StepTestState| { async move { - let expected = vec![ - "+--------------+", - "| catalog_name |", - "+--------------+", - "| public |", - "+--------------+", - ]; - let mut client = flightsql_client(state.cluster()); let stream = client.get_catalogs().await.unwrap(); let batches = collect_stream(stream).await; - assert_batches_sorted_eq!(&expected, &batches); + insta::assert_yaml_snapshot!( + batches_to_sorted_lines(&batches), + @r###" + --- + - +--------------+ + - "| catalog_name |" + - +--------------+ + - "| public |" + - +--------------+ + "### + ); } .boxed() })),