From b8a80869d40f51df7d374aff6ede598b99ef8317 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Thu, 26 Jan 2023 05:52:47 -0500 Subject: [PATCH 1/3] feat: introduce a new way of max_sequence_number for ingester, compactor and querier (#6692) * feat: introduce a new way of max_sequence_number for ingester, compactor and querier * chore: cleanup * feat: new column max_l0_created_at to order files for deduplication * chore: cleanup * chore: debug info for changing cpu.parquet * fix: update test parquet file Co-authored-by: Marco Neumann --- compactor/src/compact.rs | 1 + compactor/src/garbage_collector.rs | 3 ++ compactor/src/parquet_file_combining.rs | 4 +- compactor/src/parquet_file_filtering.rs | 1 + compactor2/src/compactor_tests.rs | 33 ++++++++++++++++ .../src/components/df_planner/query_chunk.rs | 2 +- .../components/parquet_file_sink/dedicated.rs | 14 ++++++- .../components/parquet_file_sink/logging.rs | 4 +- .../src/components/parquet_file_sink/mock.rs | 13 +++++-- .../src/components/parquet_file_sink/mod.rs | 2 + .../parquet_file_sink/object_store.rs | 4 +- compactor2/src/driver.rs | 12 +++++- compactor2/src/test_util.rs | 36 ++++++++++-------- data_types/src/lib.rs | 12 ++++++ garbage_collector/src/objectstore/checker.rs | 1 + .../iox/catalog/v1/parquet_file.proto | 2 + .../iox/ingester/v1/parquet_metadata.proto | 3 ++ influxdb_iox/src/commands/remote/partition.rs | 3 ++ influxdb_iox/tests/end_to_end_cases/cli.rs | 4 +- ingester/src/buffer_tree/namespace.rs | 1 + ingester/src/data.rs | 6 ++- ingester2/src/persist/mod.rs | 13 +++++-- ingester2/src/persist/worker.rs | 4 +- ...0230125103737_create_max_l0_crrated_at.sql | 3 ++ iox_catalog/src/interface.rs | 15 ++++++++ iox_catalog/src/postgres.rs | 22 ++++++----- iox_tests/src/util.rs | 13 +++++++ iox_time/src/lib.rs | 18 +++++++++ parquet_file/src/metadata.rs | 18 ++++++++- parquet_file/src/serialize.rs | 1 + parquet_file/src/storage.rs | 1 + parquet_file/tests/metadata.rs | 5 +++ querier/src/cache/parquet_file.rs | 4 +- querier/src/namespace/mod.rs | 4 +- querier/src/parquet/creation.rs | 2 +- service_grpc_catalog/src/lib.rs | 2 + service_grpc_object_store/src/lib.rs | 1 + test_fixtures/cpu.parquet | Bin 25424 -> 1967 bytes 38 files changed, 238 insertions(+), 49 deletions(-) create mode 100644 iox_catalog/migrations/20230125103737_create_max_l0_crrated_at.sql diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index bd4e1d82d3..c7177da52b 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -884,6 +884,7 @@ pub mod tests { compaction_level: CompactionLevel::Initial, // level of file of new writes created_at: time_9_hour_ago, // create cold files by default column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: time_9_hour_ago, }; // Note: The order of the test cases below is important and should not be changed diff --git a/compactor/src/garbage_collector.rs b/compactor/src/garbage_collector.rs index f1094922ce..3110c70550 100644 --- a/compactor/src/garbage_collector.rs +++ b/compactor/src/garbage_collector.rs @@ -185,6 +185,7 @@ mod tests { created_at: Timestamp::new(1), compaction_level: CompactionLevel::Initial, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let parquet_file = txn .parquet_files() @@ -265,6 +266,7 @@ mod tests { created_at: Timestamp::new(1), compaction_level: CompactionLevel::Initial, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), };
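Every `ParquetFileParams` and `ParquetFile` in this patch gains a `max_l0_created_at` field: for an L0 file it equals the file's own `created_at`, while for compacted L1/L2 output it is the maximum over all input files, computed in `compactor2/src/driver.rs` further below. A standalone sketch of that fold (the helper name is ours, not the patch's):

```rust
use data_types::{ParquetFile, Timestamp};

// Sketch of the fold the compactor driver performs over a compaction branch:
// the output file inherits the newest max_l0_created_at among its inputs.
fn max_l0_created_at(branch: &[ParquetFile]) -> Timestamp {
    branch
        .iter()
        .map(|f| f.max_l0_created_at)
        .max()
        .expect("branch contains at least one file")
}
```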
let parquet_file = txn .parquet_files() @@ -349,6 +351,7 @@ mod tests { created_at: Timestamp::new(1), compaction_level: CompactionLevel::Initial, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let parquet_file = txn .parquet_files() diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index e0574699e5..48574b508e 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -650,9 +650,10 @@ impl CompactPlan { .context(ExecuteCompactPlanSnafu)?; trace!(partition = i, "built result stream for partition"); + let time_now = time_provider.now(); let meta = IoxMetadata { object_store_id: Uuid::new_v4(), - creation_timestamp: time_provider.now(), + creation_timestamp: time_now, shard_id: partition.shard_id(), namespace_id: partition.namespace_id(), namespace_name: partition.namespace.name.clone().into(), @@ -663,6 +664,7 @@ impl CompactPlan { max_sequence_number, compaction_level: target_level, sort_key: Some(sort_key.clone()), + max_l0_created_at: time_now, }; debug!( diff --git a/compactor/src/parquet_file_filtering.rs b/compactor/src/parquet_file_filtering.rs index 1b2f5c2096..dd7ec162c5 100644 --- a/compactor/src/parquet_file_filtering.rs +++ b/compactor/src/parquet_file_filtering.rs @@ -1836,6 +1836,7 @@ mod tests { compaction_level, created_at: Timestamp::new(12), column_set: ColumnSet::new(std::iter::empty()), + max_l0_created_at: Timestamp::new(12), }; // Estimated arrow bytes for one file with a tag, a time and 11 rows = 1176 CompactorParquetFile::new(f, ESTIMATED_STREAM_BYTES, (file_size_bytes * 2) as u64) diff --git a/compactor2/src/compactor_tests.rs b/compactor2/src/compactor_tests.rs index 0eb348d1fa..48eaed8ab0 100644 --- a/compactor2/src/compactor_tests.rs +++ b/compactor2/src/compactor_tests.rs @@ -56,6 +56,29 @@ mod tests { (6, CompactionLevel::Initial), ] ); + // verify ID and max_l0_created_at + let time_provider = Arc::clone(&setup.config.time_provider); + + let time_1_minute_future = time_provider.minutes_into_future(1).timestamp_nanos(); + let time_2_minutes_future = time_provider.minutes_into_future(2).timestamp_nanos(); + let time_3_minutes_future = time_provider.minutes_into_future(3).timestamp_nanos(); + let time_5_minutes_future = time_provider.minutes_into_future(5).timestamp_nanos(); + + let files_and_max_l0_created_ats: Vec<_> = files + .iter() + .map(|f| (f.id.get(), f.max_l0_created_at.get())) + .collect(); + assert_eq!( + files_and_max_l0_created_ats, + vec![ + (1, time_1_minute_future), + (2, time_2_minutes_future), + (3, time_5_minutes_future), + (4, time_3_minutes_future), + (5, time_5_minutes_future), + (6, time_2_minutes_future), + ] + ); // compact run_compact(&setup).await; @@ -77,6 +100,16 @@ mod tests { (8, CompactionLevel::FileNonOverlapped), ] ); + // verify ID and max_l0_created_at + let files_and_max_l0_created_ats: Vec<_> = files + .iter() + .map(|f| (f.id.get(), f.max_l0_created_at.get())) + .collect(); + // both files have max_l0_created time_5_minutes_future which is the max of all L0 input's max_l0_created_at + assert_eq!( + files_and_max_l0_created_ats, + vec![(7, time_5_minutes_future), (8, time_5_minutes_future),] + ); // verify the content of files // Compacted smaller file with the later data diff --git a/compactor2/src/components/df_planner/query_chunk.rs b/compactor2/src/components/df_planner/query_chunk.rs index c01050ceb3..32ca0de9d8 100644 --- 
a/compactor2/src/components/df_planner/query_chunk.rs +++ b/compactor2/src/components/df_planner/query_chunk.rs @@ -193,7 +193,7 @@ impl QueryChunk for QueryableParquetChunk { // . We can only compact different sets of files of the same partition concurrently into the same target_level. // We can use the following rules to set order of the chunk of its (compaction_level, target_level) as follows: // . compaction_level < target_level : the order is `created_at` - // . compaction_level == target_level : order is `0` to make sure it is in the front of the ordered list. + // . compaction_level == target_level : order is 0 to make sure it is in the front of the ordered list. // This means that the chunk of `compaction_level == target_level` will be in arbitrary order and will be // fine as long as they are in front of the chunks of `compaction_level < target_level` diff --git a/compactor2/src/components/parquet_file_sink/dedicated.rs b/compactor2/src/components/parquet_file_sink/dedicated.rs index 39dae96eae..85d5e177c6 100644 --- a/compactor2/src/components/parquet_file_sink/dedicated.rs +++ b/compactor2/src/components/parquet_file_sink/dedicated.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFileParams}; use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; use iox_query::exec::{Executor, ExecutorType}; +use iox_time::Time; use crate::partition_info::PartitionInfo; @@ -49,11 +50,16 @@ where stream: SendableRecordBatchStream, partition: Arc, level: CompactionLevel, + max_l0_created_at: Time, ) -> Result, DataFusionError> { let inner = Arc::clone(&self.inner); self.exec .executor(ExecutorType::Reorg) - .spawn(async move { inner.store(stream, partition, level).await }) + .spawn(async move { + inner + .store(stream, partition, level, max_l0_created_at) + .await + }) .await .map_err(|e| DataFusionError::External(e.into()))? 
} @@ -92,7 +98,11 @@ mod tests { )); let partition = partition_info(); let level = CompactionLevel::FileNonOverlapped; - let err = sink.store(stream, partition, level).await.unwrap_err(); + let max_l0_created_at = Time::from_timestamp_nanos(0); + let err = sink + .store(stream, partition, level, max_l0_created_at) + .await + .unwrap_err(); assert_eq!(err.to_string(), "External error: foo",); } } diff --git a/compactor2/src/components/parquet_file_sink/logging.rs b/compactor2/src/components/parquet_file_sink/logging.rs index 3590569caf..c7b63f1112 100644 --- a/compactor2/src/components/parquet_file_sink/logging.rs +++ b/compactor2/src/components/parquet_file_sink/logging.rs @@ -3,6 +3,7 @@ use std::{fmt::Display, sync::Arc}; use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFileParams}; use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; +use iox_time::Time; use observability_deps::tracing::{info, warn}; use crate::partition_info::PartitionInfo; @@ -45,10 +46,11 @@ where stream: SendableRecordBatchStream, partition: Arc, level: CompactionLevel, + max_l0_created_at: Time, ) -> Result, DataFusionError> { let res = self .inner - .store(stream, Arc::clone(&partition), level) + .store(stream, Arc::clone(&partition), level, max_l0_created_at) .await; match &res { Ok(Some(f)) => { diff --git a/compactor2/src/components/parquet_file_sink/mock.rs b/compactor2/src/components/parquet_file_sink/mock.rs index 3c1fb4795e..9cc9ddd324 100644 --- a/compactor2/src/components/parquet_file_sink/mock.rs +++ b/compactor2/src/components/parquet_file_sink/mock.rs @@ -13,6 +13,7 @@ use datafusion::{ physical_plan::SendableRecordBatchStream, }; use futures::TryStreamExt; +use iox_time::Time; use uuid::Uuid; use crate::partition_info::PartitionInfo; @@ -57,6 +58,7 @@ impl ParquetFileSink for MockParquetFileSink { stream: SendableRecordBatchStream, partition: Arc, level: CompactionLevel, + max_l0_created_at: Time, ) -> Result, DataFusionError> { let schema = stream.schema(); let batches: Vec<_> = stream.try_collect().await?; @@ -76,6 +78,7 @@ impl ParquetFileSink for MockParquetFileSink { compaction_level: level, created_at: Timestamp::new(1), column_set: ColumnSet::new(vec![]), + max_l0_created_at: max_l0_created_at.into(), }); guard.push(StoredFile { batches, @@ -118,13 +121,14 @@ mod tests { .as_arrow(); let partition = partition_info(); let level = CompactionLevel::FileNonOverlapped; + let max_l0_created_at = Time::from_timestamp_nanos(1); let stream = Box::pin(RecordBatchStreamAdapter::new( Arc::clone(&schema), futures::stream::empty(), )); assert_eq!( - sink.store(stream, Arc::clone(&partition), level) + sink.store(stream, Arc::clone(&partition), level, max_l0_created_at) .await .unwrap(), None, @@ -137,7 +141,7 @@ mod tests { futures::stream::once(async move { Ok(record_batch_captured) }), )); assert_eq!( - sink.store(stream, Arc::clone(&partition), level) + sink.store(stream, Arc::clone(&partition), level, max_l0_created_at) .await .unwrap(), None, @@ -154,7 +158,7 @@ mod tests { futures::stream::once(async move { Ok(record_batch_captured) }), )); assert_eq!( - sink.store(stream, Arc::clone(&partition), level) + sink.store(stream, Arc::clone(&partition), level, max_l0_created_at) .await .unwrap(), Some(ParquetFileParams { @@ -170,7 +174,8 @@ mod tests { row_count: 1, compaction_level: CompactionLevel::FileNonOverlapped, created_at: Timestamp::new(1), - column_set: ColumnSet::new([]) + column_set: ColumnSet::new([]), + max_l0_created_at: 
max_l0_created_at.into(), }), ); diff --git a/compactor2/src/components/parquet_file_sink/mod.rs b/compactor2/src/components/parquet_file_sink/mod.rs index 88c960bf82..7a1238a0a4 100644 --- a/compactor2/src/components/parquet_file_sink/mod.rs +++ b/compactor2/src/components/parquet_file_sink/mod.rs @@ -6,6 +6,7 @@ use std::{ use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFileParams}; use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; +use iox_time::Time; use crate::partition_info::PartitionInfo; @@ -24,5 +25,6 @@ pub trait ParquetFileSink: Debug + Display + Send + Sync { stream: SendableRecordBatchStream, partition: Arc, level: CompactionLevel, + max_l0_created_at: Time, ) -> Result, DataFusionError>; } diff --git a/compactor2/src/components/parquet_file_sink/object_store.rs b/compactor2/src/components/parquet_file_sink/object_store.rs index b20a9cf524..e3f602a1a9 100644 --- a/compactor2/src/components/parquet_file_sink/object_store.rs +++ b/compactor2/src/components/parquet_file_sink/object_store.rs @@ -3,7 +3,7 @@ use std::{fmt::Display, sync::Arc}; use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFileParams, SequenceNumber, ShardId}; use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; -use iox_time::TimeProvider; +use iox_time::{Time, TimeProvider}; use parquet_file::{ metadata::IoxMetadata, serialize::CodecError, @@ -52,6 +52,7 @@ impl ParquetFileSink for ObjectStoreParquetFileSink { stream: SendableRecordBatchStream, partition: Arc, level: CompactionLevel, + max_l0_created_at: Time, ) -> Result, DataFusionError> { let meta = IoxMetadata { object_store_id: Uuid::new_v4(), @@ -66,6 +67,7 @@ impl ParquetFileSink for ObjectStoreParquetFileSink { max_sequence_number: SequenceNumber::new(MAX_SEQUENCE_NUMBER), compaction_level: level, sort_key: partition.sort_key.clone(), + max_l0_created_at, }; // Stream the record batches from the compaction exec, serialize diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 18309f509b..c18291d863 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -3,6 +3,7 @@ use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration}; use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId}; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt}; +use iox_time::Time; use parquet_file::ParquetFilePath; use tracker::InstrumentedAsyncSemaphore; @@ -115,6 +116,13 @@ async fn try_compact_partition( while let Some(branch) = branches.pop() { let delete_ids = branch.iter().map(|f| f.id).collect::>(); + // compute max_l0_created_at + let max_l0_created_at = branch + .iter() + .map(|f| f.max_l0_created_at) + .max() + .expect("max_l0_created_at should have value"); + // stage files let input_paths: Vec = branch.iter().map(|f| f.into()).collect(); let input_uuids_inpad = scratchpad_ctx.load_to_scratchpad(&input_paths).await; @@ -154,6 +162,7 @@ async fn try_compact_partition( streams, Arc::clone(partition_info), target_level, + max_l0_created_at.into(), Arc::clone(&components), ); @@ -242,6 +251,7 @@ fn stream_into_file_sink( streams: Vec, partition_info: Arc, target_level: CompactionLevel, + max_l0_created_at: Time, components: Arc, ) -> impl Future, Error>> { streams @@ -252,7 +262,7 @@ fn stream_into_file_sink( async move { components .parquet_file_sink - .store(stream, partition_info, target_level) + .store(stream, 
partition_info, target_level, max_l0_created_at) .await } }) diff --git a/compactor2/src/test_util.rs b/compactor2/src/test_util.rs index 12323bccc2..b76ab55979 100644 --- a/compactor2/src/test_util.rs +++ b/compactor2/src/test_util.rs @@ -8,7 +8,7 @@ use data_types::{ }; use datafusion::arrow::record_batch::RecordBatch; use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable}; -use iox_time::{SystemProvider, TimeProvider}; +use iox_time::TimeProvider; use parquet_file::storage::{ParquetStorage, StorageId}; use schema::sort::SortKey; use uuid::Uuid; @@ -42,6 +42,7 @@ impl ParquetFileBuilder { compaction_level: CompactionLevel::FileNonOverlapped, created_at: Timestamp::new(0), column_set: ColumnSet::new(vec![]), + max_l0_created_at: Timestamp::new(0), }, } } @@ -298,14 +299,13 @@ impl TestSetup { partition_key: partition.partition.partition_key.clone(), }); + let time_provider = Arc::::clone(&catalog.time_provider); let mut parquet_files = vec![]; if with_files { - let time = SystemProvider::new(); - let time_16_minutes_ago = time.minutes_ago(16); - let time_5_minutes_ago = time.minutes_ago(5); - let time_2_minutes_ago = time.minutes_ago(2); - let time_1_minute_ago = time.minutes_ago(1); - let time_now = time.now(); + let time_1_minute_future = time_provider.minutes_into_future(1); + let time_2_minutes_future = time_provider.minutes_into_future(2); + let time_3_minutes_future = time_provider.minutes_into_future(3); + let time_5_minutes_future = time_provider.minutes_into_future(5); // L1 file let lp = vec![ @@ -315,7 +315,8 @@ impl TestSetup { .join("\n"); let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp) - .with_creation_time(time_1_minute_ago) + .with_creation_time(time_3_minutes_future) + .with_max_l0_created_at(time_1_minute_future) .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction let level_1_file_1_minute_ago = partition.create_parquet_file(builder).await.into(); @@ -329,7 +330,8 @@ impl TestSetup { .join("\n"); let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp) - .with_creation_time(time_16_minutes_ago) + .with_creation_time(time_2_minutes_future) + .with_max_l0_created_at(time_2_minutes_future) .with_compaction_level(CompactionLevel::Initial); let level_0_file_16_minutes_ago = partition.create_parquet_file(builder).await.into(); @@ -342,7 +344,8 @@ impl TestSetup { .join("\n"); let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp) - .with_creation_time(time_5_minutes_ago) + .with_creation_time(time_5_minutes_future) + .with_max_l0_created_at(time_5_minutes_future) .with_compaction_level(CompactionLevel::Initial); let level_0_file_5_minutes_ago = partition.create_parquet_file(builder).await.into(); @@ -356,9 +359,10 @@ impl TestSetup { .join("\n"); let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp) - .with_creation_time(time_1_minute_ago) + .with_creation_time(time_5_minutes_future) + .with_max_l0_created_at(time_3_minutes_future) .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction - let level_1_file_1_minute_ago_with_duplicates = + let level_1_file_1_minute_ago_with_duplicates: ParquetFile = partition.create_parquet_file(builder).await.into(); // L0 file @@ -367,7 +371,8 @@ impl TestSetup { .with_line_protocol(&lp) .with_min_time(0) .with_max_time(36000) - .with_creation_time(time_now) + .with_creation_time(time_5_minutes_future) + .with_max_l0_created_at(time_5_minutes_future) // Will put the group size between "small" and 
"large" .with_size_override(50 * 1024 * 1024) .with_compaction_level(CompactionLevel::Initial); @@ -383,7 +388,8 @@ impl TestSetup { .with_line_protocol(&lp) .with_min_time(36001) .with_max_time(136000) - .with_creation_time(time_2_minutes_ago) + .with_creation_time(time_2_minutes_future) + .with_max_l0_created_at(time_2_minutes_future) // Will put the group size two multiples over "large" .with_size_override(180 * 1024 * 1024) .with_compaction_level(CompactionLevel::Initial); @@ -411,7 +417,7 @@ impl TestSetup { Arc::new(object_store::memory::InMemory::new()), StorageId::from("scratchpad"), ), - time_provider: Arc::::clone(&catalog.time_provider), + time_provider, exec: Arc::clone(&catalog.exec), backoff_config: BackoffConfig::default(), partition_concurrency: NonZeroUsize::new(1).unwrap(), diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 12ed34583e..836e748c3b 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -360,6 +360,12 @@ impl From for Timestamp { } } +impl From for iox_time::Time { + fn from(time: Timestamp) -> iox_time::Time { + iox_time::Time::from_timestamp_nanos(time.get()) + } +} + impl Add for Timestamp { type Output = Self; @@ -1105,6 +1111,8 @@ pub struct ParquetFile { /// The columns that are present in the table-wide schema are sorted according to the partition /// sort key. The occur in the parquet file according to this order. pub column_set: ColumnSet, + /// the max of created_at of all L0 files needed for file/chunk ordering for deduplication + pub max_l0_created_at: Timestamp, } impl ParquetFile { @@ -1128,6 +1136,7 @@ impl ParquetFile { compaction_level: params.compaction_level, created_at: params.created_at, column_set: params.column_set, + max_l0_created_at: params.max_l0_created_at, } } @@ -1167,6 +1176,8 @@ pub struct ParquetFileParams { pub created_at: Timestamp, /// columns in this file. pub column_set: ColumnSet, + /// the max of created_at of all L0 files + pub max_l0_created_at: Timestamp, } impl From for ParquetFileParams { @@ -1185,6 +1196,7 @@ impl From for ParquetFileParams { compaction_level: value.compaction_level, created_at: value.created_at, column_set: value.column_set, + max_l0_created_at: value.max_l0_created_at, } } } diff --git a/garbage_collector/src/objectstore/checker.rs b/garbage_collector/src/objectstore/checker.rs index f24da3cd00..be4dd7e020 100644 --- a/garbage_collector/src/objectstore/checker.rs +++ b/garbage_collector/src/objectstore/checker.rs @@ -171,6 +171,7 @@ mod tests { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let parquet_file = repos diff --git a/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto b/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto index 89f2e7ac51..6e0198d134 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto @@ -38,4 +38,6 @@ message ParquetFile { int64 created_at = 15; // Set of columns within this parquet file. 
repeated int64 column_set = 16; + // max creation timestamp of all L0s this parquet file is compacted to + int64 max_l0_created_at = 18; } diff --git a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto index 9b65456b15..dbda394cf6 100644 --- a/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto +++ b/generated_types/protos/influxdata/iox/ingester/v1/parquet_metadata.proto @@ -52,6 +52,9 @@ message IoxMetadata { // the compaction level of the file int32 compaction_level = 16; + + // max creation time of all L0 files this file is compacted to + google.protobuf.Timestamp max_l0_created_at = 18; } // Sort key of a chunk. diff --git a/influxdb_iox/src/commands/remote/partition.rs b/influxdb_iox/src/commands/remote/partition.rs index ddf9097fe7..f2d3398b7e 100644 --- a/influxdb_iox/src/commands/remote/partition.rs +++ b/influxdb_iox/src/commands/remote/partition.rs @@ -365,6 +365,7 @@ async fn load_parquet_files( .expect("compaction level should be valid"), created_at: Timestamp::new(p.created_at), column_set: ColumnSet::new(p.column_set.into_iter().map(ColumnId::new)), + max_l0_created_at: Timestamp::new(p.max_l0_created_at), }; repos.parquet_files().create(params).await? @@ -579,6 +580,7 @@ mod tests { compaction_level: CompactionLevel::Initial as i32, created_at: created_at.get(), column_set: vec![1, 2], + max_l0_created_at: created_at.get(), }], ) .await @@ -603,6 +605,7 @@ mod tests { compaction_level: CompactionLevel::Initial, created_at, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: created_at, }]; assert_eq!(expected, files); } diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index 4492ed4dbe..712f21db79 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -405,14 +405,14 @@ async fn write_and_query() { .arg("../test_fixtures/lineproto/air_and_water.lp") // gzipped line protocol ('m0') .arg("../test_fixtures/lineproto/read_filter.lp.gz") - // iox formatted parquet ('cpu' measurement) + // iox formatted parquet ('cpu' measurement) .arg("../test_fixtures/cpu.parquet") .assert() .success() // this number is the total size of // uncompressed line protocol stored in all // three files - .stdout(predicate::str::contains("1137058 Bytes OK")); + .stdout(predicate::str::contains("889317 Bytes OK")); } .boxed() })), diff --git a/ingester/src/buffer_tree/namespace.rs b/ingester/src/buffer_tree/namespace.rs index b2865a27b5..b8d96c8b35 100644 --- a/ingester/src/buffer_tree/namespace.rs +++ b/ingester/src/buffer_tree/namespace.rs @@ -479,6 +479,7 @@ mod tests { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; repos .parquet_files() diff --git a/ingester/src/data.rs b/ingester/src/data.rs index ef0f1f9182..42b0bfc60f 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -450,9 +450,10 @@ impl Persister for IngesterData { let object_store_id = Uuid::new_v4(); // Construct the metadata for this parquet file. 
+ let time_now = SystemProvider::new().now(); let iox_metadata = IoxMetadata { object_store_id, - creation_timestamp: SystemProvider::new().now(), + creation_timestamp: time_now, shard_id, namespace_id, namespace_name: Arc::clone(&*namespace.namespace_name().get().await), @@ -463,6 +464,7 @@ impl Persister for IngesterData { max_sequence_number: batch_sequence_number_range.inclusive_max().unwrap(), compaction_level: CompactionLevel::Initial, sort_key: Some(data_sort_key), + max_l0_created_at: time_now, }; // Save the compacted data to a parquet file in object storage. @@ -1120,7 +1122,7 @@ mod tests { // different, the file may change slightly from time to time // // https://github.com/influxdata/influxdb_iox/issues/5434 - let expected_size = 1252; + let expected_size = 1265; let allowable_delta = 10; let size_delta = (pf.file_size_bytes - expected_size).abs(); assert!( diff --git a/ingester2/src/persist/mod.rs b/ingester2/src/persist/mod.rs index 824e21fc30..d636098bea 100644 --- a/ingester2/src/persist/mod.rs +++ b/ingester2/src/persist/mod.rs @@ -13,7 +13,7 @@ mod tests { use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; - use data_types::{CompactionLevel, ParquetFile, PartitionKey, ShardId}; + use data_types::{CompactionLevel, ParquetFile, PartitionKey, ShardId, SequenceNumber}; use dml::DmlOperation; use futures::TryStreamExt; use iox_catalog::{ @@ -259,18 +259,21 @@ mod tests { row_count, compaction_level, file_size_bytes, + created_at, + max_l0_created_at, .. }] => { + assert_eq!(created_at.get(), max_l0_created_at.get()); + assert_eq!(got_namespace_id, namespace_id); assert_eq!(got_table_id, table_id); assert_eq!(got_partition_id, partition_id); + assert_eq!(max_sequence_number, SequenceNumber::new(0)); assert_eq!(row_count, 1); assert_eq!(compaction_level, CompactionLevel::Initial); - assert_eq!(max_sequence_number.get(), 0); // Unused, dummy value - (object_store_id, file_size_bytes) } ); @@ -406,9 +409,13 @@ mod tests { row_count, compaction_level, file_size_bytes, + created_at, + max_l0_created_at, .. }] => { + assert_eq!(created_at.get(), max_l0_created_at.get()); + assert_eq!(got_namespace_id, namespace_id); assert_eq!(got_table_id, table_id); assert_eq!(got_partition_id, partition_id); diff --git a/ingester2/src/persist/worker.rs b/ingester2/src/persist/worker.rs index c32d2dcc8d..8f9f2ca1df 100644 --- a/ingester2/src/persist/worker.rs +++ b/ingester2/src/persist/worker.rs @@ -253,9 +253,10 @@ where ); // Construct the metadata for this parquet file. + let time_now = SystemProvider::new().now(); let iox_metadata = IoxMetadata { object_store_id, - creation_timestamp: SystemProvider::new().now(), + creation_timestamp: time_now, shard_id: ctx.transition_shard_id(), namespace_id: ctx.namespace_id(), namespace_name: Arc::clone(&*ctx.namespace_name().get().await), @@ -266,6 +267,7 @@ where max_sequence_number: SequenceNumber::new(0), // TODO: not ordered! compaction_level: CompactionLevel::Initial, sort_key: Some(data_sort_key), + max_l0_created_at: time_now, }; // Save the compacted data to a parquet file in object storage. 
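The ingester code above stamps `created_at` and `max_l0_created_at` with the same `time_now`, which is exactly the invariant the persist tests assert. Downstream, the querier change later in this patch uses the new column to order chunks for deduplication. A minimal sketch of that rule (`chunk_order` is a hypothetical helper; the real logic is inline in `querier/src/parquet/creation.rs`):

```rust
use data_types::{ChunkOrder, ParquetFile};

// With the rpc_write path enabled, chunks are ordered by max_l0_created_at;
// the legacy path still orders by max_sequence_number.
fn chunk_order(file: &ParquetFile, rpc_write: bool) -> ChunkOrder {
    if rpc_write {
        ChunkOrder::new(file.max_l0_created_at.get())
    } else {
        ChunkOrder::new(file.max_sequence_number.get())
    }
}
```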
diff --git a/iox_catalog/migrations/20230125103737_create_max_l0_crrated_at.sql b/iox_catalog/migrations/20230125103737_create_max_l0_crrated_at.sql new file mode 100644 index 0000000000..4bddbd14d9 --- /dev/null +++ b/iox_catalog/migrations/20230125103737_create_max_l0_crrated_at.sql @@ -0,0 +1,3 @@ +ALTER TABLE + IF EXISTS parquet_file + ADD COLUMN max_l0_created_at BIGINT NOT NULL DEFAULT 0; \ No newline at end of file diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 1749891df4..65d04e04a8 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -2114,6 +2114,7 @@ pub(crate) mod test_helpers { let min_time = Timestamp::new(10); let max_time = Timestamp::new(20); let max_sequence_number = SequenceNumber::new(140); + let max_l0_created_at = Timestamp::new(0); let parquet_file_params = ParquetFileParams { shard_id: shard.id, @@ -2129,6 +2130,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at, }; let parquet_file = repos .parquet_files() @@ -2345,6 +2347,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let parquet_file = repos .parquet_files() @@ -2863,6 +2866,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let parquet_file = repos @@ -3007,6 +3011,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let parquet_file = repos .parquet_files() @@ -3198,6 +3203,7 @@ pub(crate) mod test_helpers { .await .unwrap(); + let time_now = Timestamp::from(catalog.time_provider().now()); let time_five_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(5)); let time_8_hours_ago = Timestamp::from(catalog.time_provider().hours_ago(8)); let time_38_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(38)); @@ -3270,6 +3276,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: time_38_hour_ago, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: time_now, }; let delete_l0_file = repos .parquet_files() @@ -3903,6 +3910,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: time_three_hour_ago, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: time_now, }; // create a deleted L0 file that was created 3 hours ago @@ -4319,6 +4327,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: time_now, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: time_now, }; let delete_l0_file = repos .parquet_files() @@ -4716,6 +4725,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::FileNonOverlapped, created_at: time_now, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: time_now, }; let delete_l1_file = repos .parquet_files() @@ -4932,6 +4942,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), 
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let parquet_file = repos @@ -5053,6 +5064,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let parquet_file = repos .parquet_files() @@ -5179,6 +5191,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let p1 = repos .parquet_files() @@ -5364,6 +5377,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let p1_n1 = repos .parquet_files() @@ -5499,6 +5513,7 @@ pub(crate) mod test_helpers { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(1), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(1), }; let p1_n2 = repos .parquet_files() diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 34b95316db..5fc103b9f9 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1689,6 +1689,7 @@ impl ParquetFileRepo for PostgresTxn { compaction_level, created_at, column_set, + max_l0_created_at, } = parquet_file_params; let rec = sqlx::query_as::<_, ParquetFile>( @@ -1696,8 +1697,8 @@ impl ParquetFileRepo for PostgresTxn { INSERT INTO parquet_file ( shard_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, file_size_bytes, - row_count, compaction_level, created_at, namespace_id, column_set ) -VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 ) + row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at ) +VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 ) RETURNING *; "#, ) @@ -1714,6 +1715,7 @@ RETURNING *; .bind(created_at) // $11 .bind(namespace_id) // $12 .bind(column_set) // $13 + .bind(max_l0_created_at) // $14 .fetch_one(&mut self.inner) .await .map_err(|e| { @@ -1776,7 +1778,7 @@ RETURNING *; r#" SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set + row_count, compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file WHERE shard_id = $1 AND max_sequence_number > $2 @@ -1802,7 +1804,8 @@ SELECT parquet_file.id, parquet_file.shard_id, parquet_file.namespace_id, parquet_file.table_id, parquet_file.partition_id, parquet_file.object_store_id, parquet_file.max_sequence_number, parquet_file.min_time, parquet_file.max_time, parquet_file.to_delete, parquet_file.file_size_bytes, - parquet_file.row_count, parquet_file.compaction_level, parquet_file.created_at, parquet_file.column_set + parquet_file.row_count, parquet_file.compaction_level, parquet_file.created_at, parquet_file.column_set, + parquet_file.max_l0_created_at FROM parquet_file INNER JOIN table_name on table_name.id = parquet_file.table_id WHERE table_name.namespace_id = $1 @@ -1822,7 +1825,7 @@ WHERE table_name.namespace_id = $1 r#" SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, 
file_size_bytes, - row_count, compaction_level, created_at, column_set + row_count, compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file WHERE table_id = $1 AND to_delete IS NULL; "#, @@ -1882,7 +1885,7 @@ RETURNING id; r#" SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set + row_count, compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file WHERE parquet_file.shard_id = $1 AND parquet_file.compaction_level = $2 @@ -1909,7 +1912,7 @@ WHERE parquet_file.shard_id = $1 r#" SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set + row_count, compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file WHERE parquet_file.shard_id = $1 AND parquet_file.table_id = $2 @@ -2115,7 +2118,7 @@ LIMIT $4; r#" SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set + row_count, compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file WHERE parquet_file.partition_id = $1 AND parquet_file.to_delete IS NULL; @@ -2250,7 +2253,7 @@ WHERE table_id = $1 r#" SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set + row_count, compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file WHERE object_store_id = $1; "#, @@ -3148,6 +3151,7 @@ mod tests { compaction_level: CompactionLevel::Initial, // level of file of new writes created_at: time_now, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: time_now, }; let f1 = postgres .repositories() diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index cdcae89a6a..e3d5576d47 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -619,6 +619,7 @@ impl TestPartition { to_delete, object_store_id, row_count, + max_l0_created_at, } = builder; let record_batch = record_batch.expect("A record batch is required"); @@ -653,6 +654,7 @@ impl TestPartition { max_sequence_number, compaction_level: CompactionLevel::Initial, sort_key: Some(sort_key.clone()), + max_l0_created_at: Time::from_timestamp_nanos(max_l0_created_at), }; let real_file_size_bytes = create_parquet_file( ParquetStorage::new( @@ -678,6 +680,7 @@ impl TestPartition { to_delete, object_store_id: Some(object_store_id), row_count: None, // will be computed from the record batch again + max_l0_created_at, }; let result = self.create_parquet_file_catalog_record(builder).await; @@ -704,6 +707,7 @@ impl TestPartition { to_delete, object_store_id, row_count, + max_l0_created_at, .. 
} = builder; @@ -744,6 +748,7 @@ impl TestPartition { created_at: Timestamp::new(creation_time), compaction_level, column_set, + max_l0_created_at: Timestamp::new(max_l0_created_at), }; let mut repos = self.catalog.catalog.repositories().await; @@ -789,6 +794,7 @@ pub struct TestParquetFileBuilder { to_delete: bool, object_store_id: Option, row_count: Option, + max_l0_created_at: i64, } impl Default for TestParquetFileBuilder { @@ -807,6 +813,7 @@ impl Default for TestParquetFileBuilder { to_delete: false, object_store_id: None, row_count: None, + max_l0_created_at: 1, } } } @@ -863,6 +870,12 @@ impl TestParquetFileBuilder { self } + /// specify max creation time of all L0 this file was created from + pub fn with_max_l0_created_at(mut self, time: iox_time::Time) -> Self { + self.max_l0_created_at = time.timestamp_nanos(); + self + } + /// Specify the compaction level for the parquet file metadata. pub fn with_compaction_level(mut self, compaction_level: CompactionLevel) -> Self { self.compaction_level = compaction_level; diff --git a/iox_time/src/lib.rs b/iox_time/src/lib.rs index 003f224764..55520b0dde 100644 --- a/iox_time/src/lib.rs +++ b/iox_time/src/lib.rs @@ -193,6 +193,12 @@ pub trait TimeProvider: std::fmt::Debug + Send + Sync + 'static { /// Sleep until given time. fn sleep_until(&self, t: Time) -> Pin + Send + 'static>>; + /// Return a time that is the specified number of minutes in the future relative to this + /// provider's `now`. + fn minutes_into_future(&self, minutes: u64) -> Time { + self.now() + Duration::from_secs(60 * minutes) + } + /// Return a time that is the specified number of minutes in the past relative to this /// provider's `now`. fn minutes_ago(&self, minutes_ago: u64) -> Time { @@ -611,6 +617,18 @@ mod test { assert_eq!(min_ago.to_rfc3339(), ago); } + #[test] + fn test_minutes_into_future() { + let now = "2022-07-07T00:00:00+00:00"; + let future = "2022-07-07T00:10:00+00:00"; + + let provider = MockProvider::new(Time::from_rfc3339(now).unwrap()); + + let min_future = provider.minutes_into_future(10); + assert_eq!(min_future, Time::from_timestamp_nanos(1657152600000000000)); + assert_eq!(min_future.to_rfc3339(), future); + } + #[test] fn test_hours_ago() { let now = "2022-07-07T00:00:00+00:00"; diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index bd02b7bbbb..942a00fdce 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -297,6 +297,12 @@ pub struct IoxMetadata { /// Sort key of this chunk pub sort_key: Option, + + /// Max timestamp of creation timestamp of L0 files + /// If this metadata is for an L0 file, this value will be the same as the `creation_timestamp` + /// If this metadata is for an L1/L2 file, this value will be the max of all L0 files + /// that are compacted into this file + pub max_l0_created_at: Time, } impl IoxMetadata { @@ -341,6 +347,7 @@ impl IoxMetadata { max_sequence_number: self.max_sequence_number.get(), sort_key, compaction_level: self.compaction_level as i32, + max_l0_created_at: Some(self.max_l0_created_at.date_time().into()), }; let mut buf = Vec::new(); @@ -359,6 +366,8 @@ impl IoxMetadata { // extract creation timestamp let creation_timestamp = decode_timestamp_from_field(proto_msg.creation_timestamp, "creation_timestamp")?; + let max_l0_created_at = + decode_timestamp_from_field(proto_msg.max_l0_created_at, "max_l0_created_at")?; // extract strings let namespace_name = Arc::from(proto_msg.namespace_name.as_ref()); @@ -395,6 +404,7 @@ impl IoxMetadata { compaction_level: 
proto_msg.compaction_level, }, )?, + max_l0_created_at, }) } @@ -416,6 +426,7 @@ impl IoxMetadata { max_sequence_number: SequenceNumber::new(1), compaction_level: CompactionLevel::Initial, sort_key: None, + max_l0_created_at: Time::from_timestamp_nanos(creation_timestamp_ns), } } @@ -503,6 +514,7 @@ impl IoxMetadata { row_count: row_count.try_into().expect("row count overflows i64"), created_at: Timestamp::from(self.creation_timestamp), column_set: ColumnSet::new(columns), + max_l0_created_at: Timestamp::from(self.max_l0_created_at), } } @@ -1001,9 +1013,11 @@ mod tests { let sort_key = SortKeyBuilder::new().with_col("sort_col").build(); + let create_time = Time::from_timestamp(3234, 0).unwrap(); + let iox_metadata = IoxMetadata { object_store_id, - creation_timestamp: Time::from_timestamp(3234, 0).unwrap(), + creation_timestamp: create_time, namespace_id: NamespaceId::new(2), namespace_name: Arc::from("hi"), shard_id: ShardId::new(1), @@ -1014,6 +1028,7 @@ mod tests { max_sequence_number: SequenceNumber::new(6), compaction_level: CompactionLevel::Initial, sort_key: Some(sort_key), + max_l0_created_at: create_time, }; let proto = iox_metadata.to_protobuf().unwrap(); @@ -1038,6 +1053,7 @@ mod tests { max_sequence_number: SequenceNumber::new(11), compaction_level: CompactionLevel::FileNonOverlapped, sort_key: None, + max_l0_created_at: Time::from_timestamp_nanos(42), }; let array = StringArray::from_iter([Some("bananas")]); diff --git a/parquet_file/src/serialize.rs b/parquet_file/src/serialize.rs index 51831cfa9e..9b076166a8 100644 --- a/parquet_file/src/serialize.rs +++ b/parquet_file/src/serialize.rs @@ -219,6 +219,7 @@ mod tests { max_sequence_number: SequenceNumber::new(11), compaction_level: CompactionLevel::FileNonOverlapped, sort_key: None, + max_l0_created_at: Time::from_timestamp_nanos(42), }; let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap(); diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 51f15b821a..8211f2c145 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -577,6 +577,7 @@ mod tests { max_sequence_number: SequenceNumber::new(11), compaction_level: CompactionLevel::FileNonOverlapped, sort_key: None, + max_l0_created_at: Time::from_timestamp_nanos(42), } } diff --git a/parquet_file/tests/metadata.rs b/parquet_file/tests/metadata.rs index b0b15ca5df..3038cce364 100644 --- a/parquet_file/tests/metadata.rs +++ b/parquet_file/tests/metadata.rs @@ -62,6 +62,7 @@ async fn test_decoded_iox_metadata() { max_sequence_number: SequenceNumber::new(11), compaction_level: CompactionLevel::FileNonOverlapped, sort_key: None, + max_l0_created_at: Time::from_timestamp_nanos(42), }; let mut schema_builder = SchemaBuilder::new(); @@ -203,6 +204,7 @@ async fn test_empty_parquet_file_panic() { max_sequence_number: SequenceNumber::new(11), compaction_level: CompactionLevel::FileNonOverlapped, sort_key: None, + max_l0_created_at: Time::from_timestamp_nanos(42), }; let batch = RecordBatch::try_from_iter(data).unwrap(); @@ -297,6 +299,7 @@ async fn test_decoded_many_columns_with_null_cols_iox_metadata() { max_sequence_number: SequenceNumber::new(11), compaction_level: CompactionLevel::FileNonOverlapped, sort_key: Some(sort_key), + max_l0_created_at: Time::from_timestamp_nanos(42), }; let mut schema_builder = SchemaBuilder::new(); @@ -385,6 +388,7 @@ async fn test_derive_parquet_file_params() { max_sequence_number: SequenceNumber::new(11), compaction_level: CompactionLevel::FileNonOverlapped, sort_key: None, + 
max_l0_created_at: Time::from_timestamp_nanos(1234), }; // Build a schema that contains the IOx metadata, ensuring it is correctly @@ -433,6 +437,7 @@ async fn test_derive_parquet_file_params() { assert_eq!(catalog_data.row_count, 3); assert_eq!(catalog_data.min_time, Timestamp::new(1646917692000000000)); assert_eq!(catalog_data.max_time, Timestamp::new(1653311292000000000)); + assert_eq!(catalog_data.max_l0_created_at, Timestamp::new(1234)); } fn to_string_array(strs: &[&str]) -> ArrayRef { diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs index 6bd8743b54..79d31b4995 100644 --- a/querier/src/cache/parquet_file.rs +++ b/querier/src/cache/parquet_file.rs @@ -391,8 +391,8 @@ mod tests { partition.create_parquet_file(builder).await; let table_id = table.table.id; - let single_file_size = 224; - let two_file_size = 408; + let single_file_size = 232; + let two_file_size = 424; assert!(single_file_size < two_file_size); let cache = make_cache(&catalog); diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index b49450d846..c984527c15 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -105,10 +105,10 @@ impl QuerierNamespace { exec: Arc, ingester_connection: Option>, sharder: Arc>>, - rpc_write: bool, + write_rpc: bool, ) -> Self { let time_provider = catalog_cache.time_provider(); - let chunk_adapter = Arc::new(ChunkAdapter::new(catalog_cache, metric_registry, rpc_write)); + let chunk_adapter = Arc::new(ChunkAdapter::new(catalog_cache, metric_registry, write_rpc)); let query_log = Arc::new(QueryLog::new(10, time_provider)); let prune_metrics = Arc::new(PruneMetrics::new(&chunk_adapter.metric_registry())); diff --git a/querier/src/parquet/creation.rs b/querier/src/parquet/creation.rs index ac680b5663..5068577365 100644 --- a/querier/src/parquet/creation.rs +++ b/querier/src/parquet/creation.rs @@ -212,7 +212,7 @@ impl ChunkAdapter { let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _)); let order = if self.rpc_write { - ChunkOrder::new(parquet_file.created_at.get()) + ChunkOrder::new(parquet_file.max_l0_created_at.get()) } else { ChunkOrder::new(parquet_file.max_sequence_number.get()) }; diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs index d8a2e44aac..7b3302e1a8 100644 --- a/service_grpc_catalog/src/lib.rs +++ b/service_grpc_catalog/src/lib.rs @@ -182,6 +182,7 @@ fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile { compaction_level: p.compaction_level as i32, created_at: p.created_at.get(), column_set: p.column_set.iter().map(|id| id.get()).collect(), + max_l0_created_at: p.max_l0_created_at.get(), } } @@ -257,6 +258,7 @@ mod tests { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(2343), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(2343), }; let p2params = ParquetFileParams { object_store_id: Uuid::new_v4(), diff --git a/service_grpc_object_store/src/lib.rs b/service_grpc_object_store/src/lib.rs index 9ce251047c..89e1a69e34 100644 --- a/service_grpc_object_store/src/lib.rs +++ b/service_grpc_object_store/src/lib.rs @@ -154,6 +154,7 @@ mod tests { compaction_level: CompactionLevel::Initial, created_at: Timestamp::new(2343), column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), + max_l0_created_at: Timestamp::new(2343), }; p1 = repos.parquet_files().create(p1params).await.unwrap(); diff --git a/test_fixtures/cpu.parquet b/test_fixtures/cpu.parquet index 
86cae861b61b39c3730b4038e82d7b8a9d77edf6..0638da329728e8b6e2729308a9c39916a1e6fa84 100644 GIT binary patch literal 1967 zcmcIl&1)N15P#aps$+C;Lf-2x=#UgFT8Q;oDRD>(Z}qj3t&f#s>w`kO+TBRjhgXr- zUe$;G1v#eI96~9j^xB?#=rM;9dgxzKeC(yrnI~CsViKD|2XezFLfsPm;?26B_{bf|Q#MT1aR!^UiL-e>(wkcG$jFG8&d4;T zZsgE*FEn-M(h&J_$a}|G={Zw3E(ZERVlSoesTBU(7iwOf_DYrCu?5mw`|;cFln-HG zKV|2(uzKc^m)f%Z8@LMUU+lr}9*k_!_c!c?FZ?PHeg<1yMxBpWZb-^}vkO6q!Uz%O z%jZFs$e;m2 zgBy}~bQ@$RgdjZ6w{~XnZxC5r$MEUR&_low!Xzc6JPd#D3tjo)vY*!IE%tnsMo9QK z|LeH_`Gv4w*Qm^1uF@Ai68`0b(0^Gl`*V#l_Sbd#Ibly%m_(Gja@r( zMto={@?I;k=c#=s+e)f&ozJ#2{i0Xy7u|Ap;FeF+bk1|MOt;cEwY?VConoe2j~NhK z;DsXB3%gvuQ0Fru2YJ0-Z9JH7TmF3|iZs(RoO2yMq;1YUp6T)unm_^x21lI_YWJd1 zPe8Ni+2uLTcZ9!%XP{lKc6lB(wahUw=2OuQR70kE$jiu(G6!?H;03G)DL@Jmbrw42 z;asbaJB_`$Nwk72n~NStqtHB%Fm_Ng8e5-M`{RBmF^hK^smGQ!^xBV{v{e|o?Petg zw++p~3|t*|lHGK5R5|Ww`);QX7{uz!d0?)K^N4z?sCPDxs_J4sEYBa+;C?;qRdqVa zN~|oPSPw65;s09T#aTnOFg&)iPP&!Ynp%zcQ8QUjnT0**tvzlxW&@b##L`OR&dB6B z%=Z=^{4u+!4!K(78s{yPIgb}P^a+=l!!z_FIuR@@IM{M|(Zu`=AhB042`T1-eS(B{ zf%$+0LuEb=?g#MmtZ2p8yYRgkzX*j#1|;zNsJ(k%`9t)-cV6%lSs5FXCr7rsIho!* YwkOjA=kRti9*IR_?!6CT21{vJlA-KB*cXtiJ-CY6%cX!uh&-=aaTKik) ztbOhC=j`1-`s(hPuBYm$s=KTeg_T)>u)tzyU^$d5kQ@lW1^{TAnBM@BCeVO+Xe0mt z0Je8wbhdLgw1)lrhJ_W5g_Vt+gA?|z(?>4uf1dN814vPZ5!hg~ggTSQyUqO&Nf8iX z#59ND-s!8S0s%l(peY0J5*>&Mga^VCYQjPY^MH{4lRSj_-zzIY0bn3Em*`M%_5gb* zduV$Ydsurodw6>UdqjIAdt`eQdsKTgdvtpYdrTneJ5B&R<~tA!gb)xF?-Ctv7Y6Sc zntB$B8u7nd1R||8P*)ZR{cp0;K>^4hE6N#yMeOXX7!@7u7`d6uos3zTYz&#eb~a!` zCub&G6K9AJxIpTE7lQ16`wr-T|Cj$MIp9Aehe-dQlEIqWIXV9=S~WD@EHw2hEcGr7 zq2~Xd@c*Z^Asga{0L+D#1=|1Hmhhtj;8zWVyLHXbNC(Q)MyDFY-}jWqO;Vi@c#sww z9;LGEbT4a-oSj+GYj+5rYf`(8?8QO`P zSPEvXZvdJ!+rmW}C1&LX?{$m8omW(Llx7aXLSg?iNpTXNp$|q57Tv%Q2;*ci#yvt)Fc3D2v{K6jm z|UKM$Swq|g4Bqh{AQllHhW>^rhf=Dq^lK)Qyoa==w(njz`2dE)Jgxge14m^5;|AB2*$Q^IKhEyhEB6ty@}z zXgHyo>imt#l3J$$S&FWf1tK21fma~RN<7V1=&aHYm>{3*kU_NfY6Me{W(JAz8yLlr zGY)ncjo9B$}IQJ(43g^VTHvN<4N+M)yoL|X_wt^y6is3@E>*Su(8>HzxoDKQz{G;Gm zFpYfRl_ds$Hr|V>Tj*zBMs>w!_W1kp5^MA=HJC`|rCB9hwJdK3pJYbz&a+L36Jg2H zd-ow0zP$7M9^j?X1ANCysMI~S70rVB2a&^Z9^GC!L2PF0o7DO|CH=L%s+58XP0bSz znS0V`X{9fUP*7NC(=}NVZF~?UPzy*GBWY+;;9dmG^-HZ}W`a%q3NN8QiE*6{9@&;8 zO;Coadhn=^q`(m~BXgGmU;0o{6ua{AhQ+1dajYZuUFtS}BTy99OTo1!E| zi-rzs@g^lHML`?FO5*`>$fV~}y8o)$9b`5^O8L1zC<&9YF8;r zIJ>v8?vFBJ#rP}~qsb~y%^p+w!4?soY)NCxotAF$EU9+~1ul9?X;aFqUB=*q+Q*>s4ZWqm&vu$^UGnp(?YdO5)zW9Sa&i6636Ox1@`w?u&e zhu0&ablZ9HkU=`R#k!;Y#-Ydr=Th4IlQ%57u+TsSnPvKvY(?REC=ELQE!d9Df{SM=cZ=ci< zG%U1j-SN&Mbmvz5zPY-^oj!8w@aK}7nywyPQPLsnQ+3idoTyw(O9sTod6Y{HLg93F zh&SmUWZU87!A|k9yE9=rVPC?|xHnD?j43o;pK_M}spQ^}N--Hc&zZf155&`qx9gaz zyV;$&ZgPzy_{EBJup~xHFRo5aq{B4?#EalP4nyVRKn8557%^!|_U3PDjdQc4Stt?8 zz{4NFqW`Q*b%;^0Ce&-RkDb_nb9&8AzTVXHjEKFShJ?(RrlO#F#EWUKs zppG!#DECrfpk5Btsy2puVm;Axwzc^sMlF^+kMD}!>2rxN}3BMpGAB$Pjzt3;$(mzEC**X`z zq8KyWUv3kw?QeG$_M}OEFUQZ9dU=i_B+W<8`z2!Q(4sGG7UPZkw45C1i6~=VR&461 zuyG?CfL4nVNSvPTnq~ia1G@C2<9xjI%HI z^CS5k#~XdUz1$a#4yF!TrX!}qS&}zr7!EwLPsD1vH*%p+GO0<{z%GWGj8QW>WCN@u zYhho}g+1f)C5a7V<$F?*MvBCfspbZIy zUff^&F(?srkn)~xL`yAQBedEPB6d=1UNY|E2JdY-9HzNrVx7OjfC9CZvpBw*V^~BD zr^eW^?hxiIaSLE=6=K40jsD2sMA5BQA&q(NWJZsqJvb)Sv*7?fr3BQ;VQV)|slii; zA&+)Sob;B}_)0b*4?CU!=HJH={*{>%%v zlD{3c&>AOdF2Fc$tTAr+2p-_xKykRn*%Aynt~I3?sbI8Q4D? 
[GIT binary patch for test_fixtures/cpu.parquet omitted: base85 literal data, 25424 -> 1967 bytes]

From c100737a81d4c0b13e01fd254de3c61d4be77fa8 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 26 Jan 2023 06:27:07 -0500
Subject: [PATCH 2/3] chore: Do not send dictionary encoded data to clients

---
 .../tests/end_to_end_cases/querier.rs        | 61 ++++++++++++++++++-
 influxdb_iox_client/src/client/flight/mod.rs | 16 ++---
 service_grpc_flight/src/lib.rs               | 29 ++++++++-
 3 files changed, 94 insertions(+), 12 deletions(-)
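Background for this patch: IOx produces tag columns as dictionary encoded strings, a layout that some Flight clients (notably the golang ones) cannot decode. The following standalone Rust sketch is not taken from this repository; it is written against the arrow crate, and the literal values and main wrapper are illustrative assumptions. It shows the Dictionary data type involved and how a cast to the value type removes it, which is conceptually what the server-side change below does:

// Standalone sketch; assumes the arrow crate. Values are illustrative.
use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Int32Type};

fn main() {
    // A low-cardinality tag column as IOx encodes it:
    // Int32 keys pointing into Utf8 values.
    let tags: DictionaryArray<Int32Type> =
        vec!["cpu0", "cpu1", "cpu0"].into_iter().collect();
    let tags: ArrayRef = Arc::new(tags);
    assert_eq!(
        tags.data_type(),
        &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
    );

    // Hydrating to the plain value type is a cast; conceptually this is what
    // the server now does before encoding record batches for Flight clients.
    let hydrated = cast(&tags, &DataType::Utf8).unwrap();
    assert_eq!(hydrated.data_type(), &DataType::Utf8);
}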
diff --git a/influxdb_iox/tests/end_to_end_cases/querier.rs b/influxdb_iox/tests/end_to_end_cases/querier.rs
index fb145b97f0..8bb38bfbff 100644
--- a/influxdb_iox/tests/end_to_end_cases/querier.rs
+++ b/influxdb_iox/tests/end_to_end_cases/querier.rs
@@ -3,12 +3,18 @@ mod multi_ingester;
 
 use std::time::Duration;
 
+use arrow::datatypes::{DataType, SchemaRef};
+use arrow_flight::{
+    decode::{DecodedFlightData, DecodedPayload},
+    error::FlightError,
+};
 use arrow_util::assert_batches_sorted_eq;
 use assert_cmd::{assert::Assert, Command};
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use generated_types::{
     aggregate::AggregateType, read_group_request::Group, read_response::frame::Data,
 };
+use influxdb_iox_client::flight::IOxRecordBatchStream;
 use predicates::prelude::*;
 use test_helpers::assert_contains;
 use test_helpers_end_to_end::{
@@ -92,7 +98,7 @@ mod with_kafka {
                         let mut client =
                             influxdb_iox_client::flight::Client::new(querier_connection);
 
-                        let result_stream = client.sql(namespace.into(), sql).await.unwrap();
+                        let result_stream = client.sql(namespace, &sql).await.unwrap();
 
                         let mut flight_stream = result_stream.into_inner();
 
@@ -103,6 +109,15 @@
                         // otherwise other clients may complain
                         // https://github.com/influxdata/influxdb_iox/pull/6668
                         assert!(flight_stream.got_schema());
+
+                        // run the query again and ensure there are no dictionaries
+                        let result_stream = client.sql(namespace, sql).await.unwrap();
+                        verify_schema(result_stream).await;
+
+                        // run a query that does return results and ensure there are no dictionaries
+                        let sql = format!("select * from {table_name}");
+                        let result_stream = client.sql(namespace, sql).await.unwrap();
+                        verify_schema(result_stream).await;
                     }
                     .boxed()
                 })),
@@ -1043,7 +1058,7 @@ mod kafkaless_rpc_write {
                         let mut client =
                             influxdb_iox_client::flight::Client::new(querier_connection);
 
-                        let result_stream = client.sql(namespace.into(), sql).await.unwrap();
+                        let result_stream = client.sql(namespace, &sql).await.unwrap();
 
                         let mut flight_stream = result_stream.into_inner();
 
@@ -1054,6 +1069,15 @@
                         // otherwise other clients may complain
                         // https://github.com/influxdata/influxdb_iox/pull/6668
                         assert!(flight_stream.got_schema());
+
+                        // run the query again and ensure there are no dictionaries
+                        let result_stream = client.sql(namespace, sql).await.unwrap();
+                        verify_schema(result_stream).await;
+
+                        // run a query that does return results and ensure there are no dictionaries
+                        let sql = format!("select * from {table_name}");
+                        let result_stream = client.sql(namespace, sql).await.unwrap();
+                        verify_schema(result_stream).await;
                     }
                     .boxed()
                 })),
@@ -1175,3 +1199,36 @@
         StepTest::new(&mut cluster, steps).run().await
     }
 }
+
+/// Some clients, such as the golang ones, cannot decode
+/// dictionary encoded Flight data. This function asserts that all
+/// schemas received in the stream are unpacked
+pub(crate) async fn verify_schema(stream: IOxRecordBatchStream) {
+    let flight_stream = stream.into_inner().into_inner();
+
+    let decoded_data: Result<Vec<DecodedFlightData>, FlightError> =
+        flight_stream.try_collect().await;
+
+    // no errors
+    let decoded_data = decoded_data.unwrap();
+
+    // the schema should not have any dictionary encoded batches in it
+    // as go clients can't deal with this
+    for DecodedFlightData { inner: _, payload } in decoded_data {
+        match payload {
+            DecodedPayload::None => {}
+            DecodedPayload::Schema(s) => assert_no_dictionaries(s),
+            DecodedPayload::RecordBatch(b) => assert_no_dictionaries(b.schema()),
+        }
+    }
+}
+
+fn assert_no_dictionaries(schema: SchemaRef) {
+    for field in schema.fields() {
+        let dt = field.data_type();
+        assert!(
+            !matches!(dt, DataType::Dictionary(_, _)),
+            "Found unexpected dictionary in schema: {schema:#?}"
+        );
+    }
+}
diff --git a/influxdb_iox_client/src/client/flight/mod.rs b/influxdb_iox_client/src/client/flight/mod.rs
index 557b28fa8b..f7c6cc45be 100644
--- a/influxdb_iox_client/src/client/flight/mod.rs
+++ b/influxdb_iox_client/src/client/flight/mod.rs
@@ -173,12 +173,12 @@ impl Client {
     /// a struct that can stream Arrow [`RecordBatch`] results.
     pub async fn sql(
         &mut self,
-        namespace_name: String,
-        sql_query: String,
+        namespace_name: impl Into<String> + Send,
+        sql_query: impl Into<String> + Send,
     ) -> Result<IOxRecordBatchStream, Error> {
         let request = ReadInfo {
-            namespace_name,
-            sql_query,
+            namespace_name: namespace_name.into(),
+            sql_query: sql_query.into(),
             query_type: QueryType::Sql.into(),
             flightsql_command: vec![],
         };
@@ -190,12 +190,12 @@ impl Client {
     /// a struct that can stream Arrow [`RecordBatch`] results.
     pub async fn influxql(
         &mut self,
-        namespace_name: String,
-        influxql_query: String,
+        namespace_name: impl Into<String> + Send,
+        influxql_query: impl Into<String> + Send,
     ) -> Result<IOxRecordBatchStream, Error> {
         let request = ReadInfo {
-            namespace_name,
-            sql_query: influxql_query,
+            namespace_name: namespace_name.into(),
+            sql_query: influxql_query.into(),
             query_type: QueryType::InfluxQl.into(),
             flightsql_command: vec![],
         };
diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs
index e7c84f2952..3087865aab 100644
--- a/service_grpc_flight/src/lib.rs
+++ b/service_grpc_flight/src/lib.rs
@@ -4,7 +4,9 @@
 mod request;
 
 use arrow::{
-    datatypes::SchemaRef, error::ArrowError, ipc::writer::IpcWriteOptions,
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    error::ArrowError,
+    ipc::writer::IpcWriteOptions,
     record_batch::RecordBatch,
 };
 use arrow_flight::{
@@ -732,7 +734,7 @@ impl IOxFlightDataEncoderBuilder {
     fn new(schema: SchemaRef) -> Self {
         Self {
             inner: FlightDataEncoderBuilder::new(),
-            schema,
+            schema: prepare_schema_for_flight(schema),
         }
     }
 
@@ -788,6 +790,29 @@
+/// Prepare an arrow Schema for transport over the Arrow Flight protocol
+///
+/// Convert dictionary types to underlying types
+///
+/// See hydrate_dictionary for more information
+fn prepare_schema_for_flight(schema: SchemaRef) -> SchemaRef {
+    let fields = schema
+        .fields()
+        .iter()
+        .map(|field| match field.data_type() {
+            DataType::Dictionary(_, value_type) => Field::new(
+                field.name(),
+                value_type.as_ref().clone(),
+                field.is_nullable(),
+            )
+            .with_metadata(field.metadata().clone()),
+            _ => field.clone(),
+        })
+        .collect();
+
+    Arc::new(Schema::new(fields))
+}
+
 impl Stream for GetStream {
     type Item = Result<FlightData, tonic::Status>;
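The prepare_schema_for_flight doc comment above refers to hydrate_dictionary, whose hunk is not part of this excerpt. Below is a minimal sketch of what such a batch-level counterpart could look like, assuming arrow's cast kernel; the function name, error handling, and the choice to rebuild the schema from the cast columns are assumptions rather than the repository's actual code, and field metadata handling is omitted for brevity:

// Minimal sketch only; the real helper in service_grpc_flight may differ.
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Cast every dictionary column of a batch to its value type, so the data
// lines up with a schema rewritten by prepare_schema_for_flight.
fn hydrate_dictionaries(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let columns = batch
        .columns()
        .iter()
        .map(|col| match col.data_type() {
            DataType::Dictionary(_, value_type) => cast(col, value_type),
            _ => Ok(Arc::clone(col)),
        })
        .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;

    // Rebuild the schema from the hydrated columns, keeping the names and
    // nullability of the input batch.
    let fields = batch
        .schema()
        .fields()
        .iter()
        .zip(columns.iter())
        .map(|(f, c)| Field::new(f.name(), c.data_type().clone(), f.is_nullable()))
        .collect::<Vec<_>>();

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}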
From 6a0429584a73e693f9b171d192d81b4af55a2279 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 26 Jan 2023 06:59:34 -0500
Subject: [PATCH 3/3] fix: update doc example

---
 influxdb_iox_client/src/client/flight/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/influxdb_iox_client/src/client/flight/mod.rs b/influxdb_iox_client/src/client/flight/mod.rs
index f7c6cc45be..fabdf7b96b 100644
--- a/influxdb_iox_client/src/client/flight/mod.rs
+++ b/influxdb_iox_client/src/client/flight/mod.rs
@@ -125,7 +125,7 @@ impl From for Error {
 ///
 /// // results is a stream of RecordBatches
 /// let query_results = client
-///     .sql("my_namespace".into(), "select * from cpu_load".into())
+///     .sql("my_namespace", "select * from cpu_load")
 ///     .await
 ///     .expect("query request should work");
 ///
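Taken together with PATCH 2, call sites simplify: since the parameters are now impl Into<String> + Send, both &str and owned String are accepted without .into(). A hedged usage sketch follows; the connection URL, namespace, and table name are placeholders, and the Builder setup mirrors the crate's existing doc example rather than being prescribed by this patch:

// Usage sketch; assumes a running IOx querier at the placeholder address.
use influxdb_iox_client::{connection::Builder, flight::Client};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder address; point this at a real querier gRPC endpoint.
    let connection = Builder::default()
        .build("http://127.0.0.1:8082")
        .await?;
    let mut client = Client::new(connection);

    // &str arguments work directly now, with no `.into()` noise.
    let _stream = client.sql("my_namespace", "select * from cpu_load").await?;

    // Owned Strings work too, e.g. when the query is built at runtime.
    let query = format!("select * from {}", "cpu_load");
    let _stream = client.sql(String::from("my_namespace"), query).await?;

    Ok(())
}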