From 369c2237f68f56d9dd01674f7f75ccfdeca8a943 Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Thu, 10 Jun 2021 23:26:25 +0200 Subject: [PATCH 1/8] fix: Expose jaeger knobs and default max-packet-size to something that works everywhere `--traces-exporter-jaeger-max-packet-size` is important also when you run the jaeger collector on "localhost" by running `docker run jaegertracing/all-in-one ....` which on mac doesn't really work on the real localhost but has a few hops between tunneling interfaces, so you'd get mysteriously dropped packets that can easily drive you to doubt your own sanity on an otherwise calm Thursday evening. --- src/commands/run.rs | 26 ++++++++++++++++++++++++++ src/commands/tracing.rs | 22 +++++++++++++++++----- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/src/commands/run.rs b/src/commands/run.rs index 1a07df842b..3ff95c3df3 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -245,6 +245,32 @@ pub struct Config { )] pub traces_exporter_jaeger_agent_port: NonZeroU16, + /// Tracing: Jaeger service name. + /// + /// Only used if `--traces-exporter` is "jaeger". + #[structopt( + long = "--traces-exporter-jaeger-service-name", + env = "TRACES_EXPORTER_JAEGER_SERVICE_NAME", + default_value = "iox" + )] + pub traces_exporter_jaeger_service_name: String, + + /// Tracing: Jaeger max UDP packet size + /// + /// Default to 1300, which is a safe MTU. + /// + /// You can increase it to 65000 if the target is a jaeger collector + /// on localhost. If so, the batching exporter will be enabled for + /// extra efficiency. Otherwise an UDP packet will be sent for each exported span. + /// + /// Only used if `--traces-exporter` is "jaeger". + #[structopt( + long = "--traces-exporter-jaeger-max-packet-size", + env = "TRACES_EXPORTER_JAEGER_MAX_PACKET_SIZE", + default_value = "1300" + )] + pub traces_exporter_jaeger_max_packet_size: usize, + /// The identifier for the server. /// /// Used for writing to object storage and as an identifier that is added to diff --git a/src/commands/tracing.rs b/src/commands/tracing.rs index f5976d9333..43a4b1835f 100644 --- a/src/commands/tracing.rs +++ b/src/commands/tracing.rs @@ -160,13 +160,25 @@ fn construct_opentelemetry_tracer(config: &crate::commands::run::Config) -> Opti config.traces_exporter_jaeger_agent_port ); opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); - Some( - opentelemetry_jaeger::new_pipeline() + Some({ + let builder = opentelemetry_jaeger::new_pipeline() .with_trace_config(trace_config) .with_agent_endpoint(agent_endpoint) - .install_batch(opentelemetry::runtime::Tokio) - .unwrap(), - ) + .with_service_name(&config.traces_exporter_jaeger_service_name) + .with_max_packet_size(config.traces_exporter_jaeger_max_packet_size); + + // Batching is hard to tune because the max batch size + // is not currently exposed as a tunable from the trace config, and even then + // it's defined in terms of max number of spans, and not their size in bytes. + // Thus we enable batching only when the MTU size is 65000 which is the value suggested + // by jaeger when exporting to localhost. 
+ if config.traces_exporter_jaeger_max_packet_size >= 65_000 { + builder.install_batch(opentelemetry::runtime::Tokio) + } else { + builder.install_simple() + } + .unwrap() + }) } TracesExporter::Otlp => { From f8a518bbed9554cd597137eed7f75048aa1cf43f Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 11 Jun 2021 11:51:51 +0200 Subject: [PATCH 2/8] refactor: inline `Table` into `parquet_file::chunk::Chunk` Note that the resulting size estimations are different because we were double-counting `Table`. `mem::size_of::()` is recursive for non-boxed types since the child will be part of the parent structure. Issue: #1295. --- parquet_file/src/chunk.rs | 148 +++++++++++++++++++-------- parquet_file/src/lib.rs | 1 - parquet_file/src/metadata.rs | 8 +- parquet_file/src/table.rs | 177 --------------------------------- parquet_file/src/test_utils.rs | 4 +- server/src/db.rs | 16 +-- server/src/db/chunk.rs | 2 +- 7 files changed, 119 insertions(+), 237 deletions(-) delete mode 100644 parquet_file/src/table.rs diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 2d31bc203d..cdbd76063d 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -1,10 +1,16 @@ use snafu::{ResultExt, Snafu}; use std::{collections::BTreeSet, sync::Arc}; -use crate::table::Table; -use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange}; +use crate::storage::Storage; +use data_types::{ + partition_metadata::{Statistics, TableSummary}, + timestamp::TimestampRange, +}; use datafusion::physical_plan::SendableRecordBatchStream; -use internal_types::{schema::Schema, selection::Selection}; +use internal_types::{ + schema::{Schema, TIME_COLUMN_NAME}, + selection::Selection, +}; use object_store::{path::Path, ObjectStore}; use query::predicate::Predicate; @@ -13,25 +19,15 @@ use std::mem; #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Error writing table '{}': {}", table_name, source))] - TableWrite { - table_name: String, - source: crate::table::Error, - }, - - #[snafu(display("Table Error in '{}': {}", table_name, source))] - NamedTableError { - table_name: String, - source: crate::table::Error, - }, - #[snafu(display("Table '{}' not found in chunk", table_name))] NamedTableNotFoundInChunk { table_name: String }, - #[snafu(display("Error read parquet file for table '{}'", table_name,))] - ReadParquet { - table_name: String, - source: crate::table::Error, + #[snafu(display("Failed to read parquet: {}", source))] + ReadParquet { source: crate::storage::Error }, + + #[snafu(display("Failed to select columns: {}", source))] + SelectColumns { + source: internal_types::schema::Error, }, } @@ -64,8 +60,23 @@ pub struct Chunk { /// Partition this chunk belongs to partition_key: String, - /// The table in chunk - table: Table, + /// Meta data of the table + table_summary: Arc, + + /// Schema that goes with this table's parquet file + schema: Arc, + + /// Timestamp range of this table's parquet file + /// (extracted from TableSummary) + timestamp_range: Option, + + /// Object store of the above relative path to open and read the file + object_store: Arc, + + /// Path in the object store. 
Format: + /// //data///.parquet + object_store_path: Path, metrics: ChunkMetrics, } @@ -79,11 +90,15 @@ impl Chunk { schema: Schema, metrics: ChunkMetrics, ) -> Self { - let table = Table::new(table_summary, file_location, store, schema); + let timestamp_range = extract_range(&table_summary); let mut chunk = Self { partition_key: part_key.into(), - table, + table_summary: Arc::new(table_summary), + schema: Arc::new(schema), + timestamp_range, + object_store: store, + object_store_path: file_location, metrics, }; @@ -97,64 +112,109 @@ impl Chunk { } /// Return object store path for this chunk - pub fn table_path(&self) -> Path { - self.table.path() + pub fn path(&self) -> Path { + self.object_store_path.clone() } /// Returns the summary statistics for this chunk pub fn table_summary(&self) -> &Arc { - self.table.table_summary() + &self.table_summary } /// Returns the name of the table this chunk holds pub fn table_name(&self) -> &str { - self.table.name() + &self.table_summary.name } /// Return the approximate memory size of the chunk, in bytes including the /// dictionary, tables, and their rows. pub fn size(&self) -> usize { - self.table.size() + self.partition_key.len() + mem::size_of::() + mem::size_of::() + + self.partition_key.len() + + self.table_summary.size() + + mem::size_of_val(&self.schema.as_ref()) + + mem::size_of_val(&self.object_store_path) } - /// Return possibly restricted Schema for the table in this chunk - pub fn table_schema(&self, selection: Selection<'_>) -> Result { - self.table.schema(selection).context(NamedTableError { - table_name: self.table_name(), + /// Return possibly restricted Schema for this chunk + pub fn schema(&self, selection: Selection<'_>) -> Result { + Ok(match selection { + Selection::All => self.schema.as_ref().clone(), + Selection::Some(columns) => { + let columns = self.schema.select(columns).context(SelectColumns)?; + self.schema.project(&columns) + } }) } /// Infallably return the full schema (for all columns) for this chunk pub fn full_schema(&self) -> Arc { - self.table.full_schema() + Arc::clone(&self.schema) } - // Return true if the table in this chunk contains values within the time range + // Return true if this chunk contains values within the time range pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool { - self.table.matches_predicate(timestamp_range) + match (self.timestamp_range, timestamp_range) { + (Some(a), Some(b)) => !a.disjoint(b), + (None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */ + // the predicate + (_, None) => true, + } } // Return the columns names that belong to the given column // selection pub fn column_names(&self, selection: Selection<'_>) -> Option> { - self.table.column_names(selection) + let fields = self.schema.inner().fields().iter(); + + Some(match selection { + Selection::Some(cols) => fields + .filter_map(|x| { + if cols.contains(&x.name().as_str()) { + Some(x.name().clone()) + } else { + None + } + }) + .collect(), + Selection::All => fields.map(|x| x.name().clone()).collect(), + }) } - /// Return stream of data read from parquet file of the given table + /// Return stream of data read from parquet file pub fn read_filter( &self, predicate: &Predicate, selection: Selection<'_>, ) -> Result { - self.table - .read_filter(predicate, selection) - .context(ReadParquet { - table_name: self.table_name(), - }) + Storage::read_filter( + predicate, + selection, + Arc::clone(&self.schema.as_arrow()), + self.object_store_path.clone(), + 
Arc::clone(&self.object_store), + ) + .context(ReadParquet) } - /// The total number of rows in all row groups in all tables in this chunk. + /// The total number of rows in all row groups in this chunk. pub fn rows(&self) -> usize { - self.table.rows() + // All columns have the same rows, so return get row count of the first column + self.table_summary.columns[0].count() as usize } } + +/// Extracts min/max values of the timestamp column, from the TableSummary, if possible +fn extract_range(table_summary: &TableSummary) -> Option { + table_summary + .column(TIME_COLUMN_NAME) + .map(|c| { + if let Statistics::I64(s) = &c.stats { + if let (Some(min), Some(max)) = (s.min, s.max) { + return Some(TimestampRange::new(min, max)); + } + } + None + }) + .flatten() +} diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index 347ac158f8..21eda9f41f 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -14,7 +14,6 @@ pub mod cleanup; pub mod metadata; pub mod rebuild; pub mod storage; -pub mod table; pub mod test_utils; mod storage_testing; diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 2646ff7109..51dda77692 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -551,7 +551,7 @@ mod tests { // step 1: read back schema let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap(); - let schema_expected = chunk.table_schema(Selection::All).unwrap(); + let schema_expected = chunk.schema(Selection::All).unwrap(); assert_eq!(schema_actual, schema_expected); // step 2: read back statistics @@ -574,7 +574,7 @@ mod tests { // step 1: read back schema let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap(); - let schema_expected = chunk.table_schema(Selection::All).unwrap(); + let schema_expected = chunk.schema(Selection::All).unwrap(); assert_eq!(schema_actual, schema_expected); // step 2: read back statistics @@ -595,7 +595,7 @@ mod tests { // step 1: read back schema let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap(); - let schema_expected = chunk.table_schema(Selection::All).unwrap(); + let schema_expected = chunk.schema(Selection::All).unwrap(); assert_eq!(schema_actual, schema_expected); // step 2: reading back statistics fails @@ -618,7 +618,7 @@ mod tests { // step 1: read back schema let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap(); - let schema_expected = chunk.table_schema(Selection::All).unwrap(); + let schema_expected = chunk.schema(Selection::All).unwrap(); assert_eq!(schema_actual, schema_expected); // step 2: reading back statistics fails diff --git a/parquet_file/src/table.rs b/parquet_file/src/table.rs deleted file mode 100644 index 28276a951b..0000000000 --- a/parquet_file/src/table.rs +++ /dev/null @@ -1,177 +0,0 @@ -use snafu::{ResultExt, Snafu}; -use std::{collections::BTreeSet, mem, sync::Arc}; - -use crate::storage::{self, Storage}; -use data_types::{ - partition_metadata::{Statistics, TableSummary}, - timestamp::TimestampRange, -}; -use datafusion::physical_plan::SendableRecordBatchStream; -use internal_types::{ - schema::{Schema, TIME_COLUMN_NAME}, - selection::Selection, -}; -use object_store::{path::Path, ObjectStore}; -use query::predicate::Predicate; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Failed to select columns: {}", source))] - SelectColumns { - source: internal_types::schema::Error, - }, - - #[snafu(display("Failed to read parquet: {}", source))] - ReadParquet 
{ source: storage::Error }, -} - -pub type Result = std::result::Result; - -/// Table that belongs to a chunk persisted in a parquet file in object store -#[derive(Debug, Clone)] -pub struct Table { - /// Meta data of the table - table_summary: Arc, - - /// Path in the object store. Format: - /// //data///.parquet - object_store_path: Path, - - /// Object store of the above relative path to open and read the file - object_store: Arc, - - /// Schema that goes with this table's parquet file - table_schema: Arc, - - /// Timestamp range of this table's parquet file - /// (extracted from TableSummary) - timestamp_range: Option, -} - -impl Table { - pub fn new( - table_summary: TableSummary, - path: Path, - store: Arc, - schema: Schema, - ) -> Self { - let timestamp_range = extract_range(&table_summary); - - Self { - table_summary: Arc::new(table_summary), - object_store_path: path, - object_store: store, - table_schema: Arc::new(schema), - timestamp_range, - } - } - - pub fn table_summary(&self) -> &Arc { - &self.table_summary - } - - pub fn has_table(&self, table_name: &str) -> bool { - self.table_summary.has_table(table_name) - } - - /// Return the approximate memory size of the table - pub fn size(&self) -> usize { - mem::size_of::() - + self.table_summary.size() - + mem::size_of_val(&self.object_store_path) - + mem::size_of_val(&self.table_schema.as_ref()) - } - - /// Return name of this table - pub fn name(&self) -> &str { - &self.table_summary.name - } - - /// Return the object store path of this table - pub fn path(&self) -> Path { - self.object_store_path.clone() - } - - /// Return schema of this table for specified selection columns - pub fn schema(&self, selection: Selection<'_>) -> Result { - Ok(match selection { - Selection::All => self.table_schema.as_ref().clone(), - Selection::Some(columns) => { - let columns = self.table_schema.select(columns).context(SelectColumns)?; - self.table_schema.project(&columns) - } - }) - } - - /// Infallably return the full schema (for all columns) for this chunk - pub fn full_schema(&self) -> Arc { - Arc::clone(&self.table_schema) - } - - // Check if 2 time ranges overlap - pub fn matches_predicate(&self, timestamp_range: Option<&TimestampRange>) -> bool { - match (self.timestamp_range, timestamp_range) { - (Some(a), Some(b)) => !a.disjoint(b), - (None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */ - // the predicate - (_, None) => true, - } - } - - // Return columns names of this table that belong to the given column selection - pub fn column_names(&self, selection: Selection<'_>) -> Option> { - let fields = self.table_schema.inner().fields().iter(); - - Some(match selection { - Selection::Some(cols) => fields - .filter_map(|x| { - if cols.contains(&x.name().as_str()) { - Some(x.name().clone()) - } else { - None - } - }) - .collect(), - Selection::All => fields.map(|x| x.name().clone()).collect(), - }) - } - - /// Return stream of data read from parquet file for given predicate and - /// column selection - pub fn read_filter( - &self, - predicate: &Predicate, - selection: Selection<'_>, - ) -> Result { - Storage::read_filter( - predicate, - selection, - Arc::clone(&self.table_schema.as_arrow()), - self.object_store_path.clone(), - Arc::clone(&self.object_store), - ) - .context(ReadParquet) - } - - /// The number of rows of this table - pub fn rows(&self) -> usize { - // All columns have the same rows, so return get row count of the first column - self.table_summary.columns[0].count() as usize - } -} - -/// 
Extracts min/max values of the timestamp column, from the TableSummary, if possible -fn extract_range(table_summary: &TableSummary) -> Option { - table_summary - .column(TIME_COLUMN_NAME) - .map(|c| { - if let Statistics::I64(s) = &c.stats { - if let (Some(min), Some(max)) = (s.min, s.max) { - return Some(TimestampRange::new(min, max)); - } - } - None - }) - .flatten() -} diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs index d3c1431249..2548b68e2d 100644 --- a/parquet_file/src/test_utils.rs +++ b/parquet_file/src/test_utils.rs @@ -66,7 +66,7 @@ pub async fn load_parquet_from_store_for_chunk( chunk: &Chunk, store: Arc, ) -> Result<(String, Vec)> { - let path = chunk.table_path(); + let path = chunk.path(); let table_name = chunk.table_name().to_string(); Ok(( table_name, @@ -584,7 +584,7 @@ pub async fn make_metadata( .await .unwrap(); ( - chunk.table_path(), + chunk.path(), read_parquet_metadata_from_file(parquet_data).unwrap(), ) } diff --git a/server/src/db.rs b/server/src/db.rs index 479159588a..8130477f0a 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1461,7 +1461,7 @@ mod tests { .eq(1.0) .unwrap(); - let expected_parquet_size = 759; + let expected_parquet_size = 647; catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1616).unwrap(); // now also in OS catalog_chunk_size_bytes_metric_eq( @@ -1817,7 +1817,7 @@ mod tests { ("svr_id", "10"), ]) .histogram() - .sample_sum_eq(2375.0) + .sample_sum_eq(2263.0) .unwrap(); // it should be the same chunk! @@ -1925,7 +1925,7 @@ mod tests { ("svr_id", "10"), ]) .histogram() - .sample_sum_eq(2375.0) + .sample_sum_eq(2263.0) .unwrap(); // Unload RB chunk but keep it in OS @@ -1953,7 +1953,7 @@ mod tests { ("svr_id", "10"), ]) .histogram() - .sample_sum_eq(759.0) + .sample_sum_eq(647.0) .unwrap(); // Verify data written to the parquet file in object store @@ -2342,7 +2342,7 @@ mod tests { Arc::from("cpu"), 0, ChunkStorage::ReadBufferAndObjectStore, - 2373, // size of RB and OS chunks + 2261, // size of RB and OS chunks 1, ), ChunkSummary::new_without_timestamps( @@ -2402,7 +2402,7 @@ mod tests { .memory() .parquet() .get_total(), - 759 + 647 ); } @@ -2864,7 +2864,7 @@ mod tests { let chunk = db.chunk(table_name, partition_key, *chunk_id).unwrap(); let chunk = chunk.read(); if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { - paths_expected.push(parquet.table_path().display()); + paths_expected.push(parquet.path().display()); } else { panic!("Wrong chunk state."); } @@ -2944,7 +2944,7 @@ mod tests { let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap(); let chunk = chunk.read(); if let ChunkStage::Persisted { parquet, .. 
} = chunk.stage() { - paths_keep.push(parquet.table_path()); + paths_keep.push(parquet.path()); } else { panic!("Wrong chunk state."); } diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 014af96ff8..a933e1b6fa 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -195,7 +195,7 @@ impl DbChunk { /// persisted, if any pub fn object_store_path(&self) -> Option { match &self.state { - State::ParquetFile { chunk } => Some(chunk.table_path()), + State::ParquetFile { chunk } => Some(chunk.path()), _ => None, } } From 0cbe74dbde5b7044ed7cff8ff44a1da339135021 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Jun 2021 06:55:40 -0400 Subject: [PATCH 3/8] fix: persistence to parquet by swapping order of arguments (#1687) * fix: fix order of arguments * test: for persistence --- server/src/db/lifecycle.rs | 4 +- tests/end_to_end_cases/mod.rs | 1 + tests/end_to_end_cases/persistence.rs | 65 +++++++++++++++++++++++++++ tests/end_to_end_cases/scenario.rs | 37 +++++++++++++++ 4 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 tests/end_to_end_cases/persistence.rs diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 629b5221b0..d23f9dda55 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -326,8 +326,8 @@ impl ChunkMover for LifecycleManager { fn write_to_object_store( &mut self, - partition_key: String, table_name: String, + partition_key: String, chunk_id: u32, ) -> TaskTracker { info!(%partition_key, %chunk_id, "write chunk to object store"); @@ -338,7 +338,7 @@ impl ChunkMover for LifecycleManager { tracker } - fn drop_chunk(&mut self, partition_key: String, table_name: String, chunk_id: u32) { + fn drop_chunk(&mut self, table_name: String, partition_key: String, chunk_id: u32) { info!(%partition_key, %chunk_id, "dropping chunk"); let _ = self .db diff --git a/tests/end_to_end_cases/mod.rs b/tests/end_to_end_cases/mod.rs index 44a933247a..8e3c3216af 100644 --- a/tests/end_to_end_cases/mod.rs +++ b/tests/end_to_end_cases/mod.rs @@ -4,6 +4,7 @@ pub mod management_api; pub mod management_cli; pub mod operations_api; pub mod operations_cli; +mod persistence; pub mod preservation; pub mod read_api; pub mod read_cli; diff --git a/tests/end_to_end_cases/persistence.rs b/tests/end_to_end_cases/persistence.rs new file mode 100644 index 0000000000..cc35af4937 --- /dev/null +++ b/tests/end_to_end_cases/persistence.rs @@ -0,0 +1,65 @@ +use std::{ + convert::TryInto, + time::{Duration, Instant}, +}; + +use data_types::chunk_metadata::ChunkSummary; + +use crate::common::server_fixture::ServerFixture; + +use super::scenario::{create_quickly_persisting_database, rand_name}; + +#[tokio::test] +async fn test_persistence() { + let fixture = ServerFixture::create_shared().await; + let mut write_client = fixture.write_client(); + let mut management_client = fixture.management_client(); + + let db_name = rand_name(); + create_quickly_persisting_database(&db_name, fixture.grpc_channel()).await; + + // Stream in a write that should exceed the limit + let lp_lines: Vec<_> = (0..1_000) + .map(|i| format!("data,tag1=val{} x={} {}", i, i * 10, i)) + .collect(); + + let num_lines_written = write_client + .write(&db_name, lp_lines.join("\n")) + .await + .expect("successful write"); + assert_eq!(num_lines_written, 1000); + + // wait for the chunk to be written to object store + let deadline = Instant::now() + Duration::from_secs(5); + let mut chunks = vec![]; + loop { + assert!( + Instant::now() < deadline, + "Chunk did not persist 
in time. Chunks were: {:#?}", + chunks + ); + + chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + + let storage_string = chunks + .iter() + .map(|c| { + let c: ChunkSummary = c.clone().try_into().unwrap(); + format!("{:?}", c.storage) + }) + .collect::>() + .join(","); + + // Found a persisted chunk, all good + if storage_string == "ReadBufferAndObjectStore" { + return; + } + + // keep looking + println!("Current chunk storage: {:#?}", storage_string); + tokio::time::sleep(Duration::from_millis(200)).await + } +} diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs index 7b602786ad..360832f5a1 100644 --- a/tests/end_to_end_cases/scenario.rs +++ b/tests/end_to_end_cases/scenario.rs @@ -316,6 +316,43 @@ pub async fn create_readable_database( .expect("create database failed"); } +/// given a channel to talk with the management api, create a new +/// database with the specified name that will aggressively try and +/// persist all data quickly +pub async fn create_quickly_persisting_database( + db_name: impl Into, + channel: tonic::transport::Channel, +) { + let db_name = db_name.into(); + + let mut management_client = influxdb_iox_client::management::Client::new(channel); + let rules = DatabaseRules { + name: db_name.clone(), + partition_template: Some(PartitionTemplate { + parts: vec![partition_template::Part { + part: Some(partition_template::part::Part::Time( + "%Y-%m-%d %H:00:00".into(), + )), + }], + }), + lifecycle_rules: Some(LifecycleRules { + mutable_linger_seconds: 1, + mutable_size_threshold: 100, + buffer_size_soft: 1024 * 1024, + buffer_size_hard: 1024 * 1024, + persist: true, + ..Default::default() + }), + ..Default::default() + }; + + management_client + .create_database(rules.clone()) + .await + .expect("create database failed"); + println!("Created quickly persisting database {}", db_name); +} + /// given a channel to talk with the managment api, create a new /// database with no mutable buffer configured, no partitioning rules pub async fn create_unreadable_database( From fb639ee54fc0187e73a460c22c14b60961ee4365 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 11 Jun 2021 07:06:08 -0400 Subject: [PATCH 4/8] feat: add UnionExec on top of the scan activities --- query/src/provider.rs | 279 +++++++++++++++++++++++++++++++++++++++--- query/src/test.rs | 259 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 512 insertions(+), 26 deletions(-) diff --git a/query/src/provider.rs b/query/src/provider.rs index 2c0f93ea42..c81d6a3c2c 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -246,6 +246,7 @@ impl TableProvider for ChunkTableProvider { scan_schema, chunks, predicate, + false )?; Ok(plan) @@ -317,6 +318,11 @@ impl Deduplicater { /// ┌───────────────────────┐ │ /// │SortPreservingMergeExec│ │ /// └───────────────────────┘ │ + /// ▲ │ + /// │ │ + /// ┌───────────────────────┐ │ + /// │ UnionExec │ │ + /// └───────────────────────┘ │ /// ▲ | /// │ | /// ┌───────────┴───────────┐ │ @@ -340,18 +346,22 @@ impl Deduplicater { schema: ArrowSchemaRef, chunks: Vec>, predicate: Predicate, + for_testing: bool, // TODO: remove this parameter when #1682 and #1683 are done ) -> Result> { // find overlapped chunks and put them into the right group self.split_overlapped_chunks(chunks.to_vec())?; + // TODO: remove this parameter when #1682 and #1683 are done // TEMP until the rest of this module's code is complete: // merge all plans into the same - self.no_duplicates_chunks - .append(&mut 
self.in_chunk_duplicates_chunks); - for mut group in &mut self.overlapped_chunks_set { - self.no_duplicates_chunks.append(&mut group); + if for_testing { + self.no_duplicates_chunks + .append(&mut self.in_chunk_duplicates_chunks); + for mut group in &mut self.overlapped_chunks_set { + self.no_duplicates_chunks.append(&mut group); + } + self.overlapped_chunks_set.clear(); } - self.overlapped_chunks_set.clear(); // Building plans let mut plans = vec![]; @@ -396,16 +406,16 @@ impl Deduplicater { } } - let final_plan = plans.remove(0); - - // TODO - // There are still plan, add UnionExec - if !plans.is_empty() { - // final_plan = union_plan - panic!("Unexpected error: There should be only one output for scan plan, but there were: {:#?}", plans); + match plans.len() { + // No plan generated. Something must go wrong + // Even if the chunks are empty, IOxReadFilterNode is still created + 0 => panic!("Internal error generating deduplicate plan"), + // Only one plan, no need to add union node + // Return the plan itself + 1 => Ok(plans.remove(0)), + // Has many plans and need to union them + _ => Ok(Arc::new(UnionExec::new(plans))), } - - Ok(final_plan) } /// discover overlaps and split them into three groups: @@ -888,9 +898,8 @@ mod test { Predicate::default(), ); let batch = collect(sort_plan.unwrap()).await.unwrap(); - // data is not sorted on primary key(tag1, tag2, time) - - // NOTE: When the full deduplication is done, the duplciates will be removed from this output + // data is sorted on primary key(tag1, tag2, time) + // NOTE: When the full deduplication is done, the duplicates will be removed from this output let expected = vec![ "+-----------+------+------+-------------------------------+", "| field_int | tag1 | tag2 | time |", @@ -910,6 +919,242 @@ mod test { assert_batches_eq!(&expected, &batch); } + #[tokio::test] + async fn scan_plan_with_one_chunk_no_duplicates() { + // Test no duplicate at all + let chunk = Arc::new( + TestChunk::new(1) + .with_time_column_with_stats("t", "5", "7000") + .with_tag_column_with_stats("t", "tag1", "AL", "MT") + .with_int_field_column("t", "field_int") + .with_five_rows_of_data("t"), + ); + + // Datafusion schema of the chunk + let schema = chunk.table_schema(Selection::All).unwrap().as_arrow(); + + let mut deduplicator = Deduplicater::new(); + let plan = deduplicator.build_scan_plan( + Arc::from("t"), + schema, + vec![Arc::clone(&chunk)], + Predicate::default(), + true + ); + let batch = collect(plan.unwrap()).await.unwrap(); + // No duplicates so no sort at all. 
The data will stay in their original order + let expected = vec![ + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 5 | MT | 1970-01-01 00:00:00.000005 |", + "+-----------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + } + + #[tokio::test] + async fn scan_plan_with_one_chunk_with_duplicates() { + // Test one chunk with duplicate within + let chunk = Arc::new( + TestChunk::new(1) + .with_time_column_with_stats("t", "5", "7000") + .with_tag_column_with_stats("t", "tag1", "AL", "MT") + .with_int_field_column("t", "field_int") + .with_may_contain_pk_duplicates(true) + .with_ten_rows_of_data_some_duplicates("t"), + ); + + // Datafusion schema of the chunk + let schema = chunk.table_schema(Selection::All).unwrap().as_arrow(); + + let mut deduplicator = Deduplicater::new(); + let plan = deduplicator.build_scan_plan( + Arc::from("t"), + schema, + vec![Arc::clone(&chunk)], + Predicate::default(), + true + ); + let batch = collect(plan.unwrap()).await.unwrap(); + // Data must be sorted and duplicates removed + // TODO: it is just sorted for now. When https://github.com/influxdata/influxdb_iox/issues/1646 + // is done, duplicates will be removed + let expected = vec![ + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 10 | AL | 1970-01-01 00:00:00.000000050 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 70 | CT | 1970-01-01 00:00:00.000000500 |", + "| 5 | MT | 1970-01-01 00:00:00.000000005 |", + "| 30 | MT | 1970-01-01 00:00:00.000000005 |", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 1000 | MT | 1970-01-01 00:00:00.000002 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "| 20 | MT | 1970-01-01 00:00:00.000007 |", + "+-----------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + } + + #[tokio::test] + async fn scan_plan_with_two_overlapped_chunks_with_duplicates() { + // test overlapped chunks + let chunk1 = Arc::new( + TestChunk::new(1) + .with_time_column_with_stats("t", "5", "7000") + .with_tag_column_with_stats("t", "tag1", "AL", "MT") + .with_int_field_column("t", "field_int") + .with_ten_rows_of_data_some_duplicates("t"), + ); + + let chunk2 = Arc::new( + TestChunk::new(1) + .with_time_column_with_stats("t", "5", "7000") + .with_tag_column_with_stats("t", "tag1", "AL", "MT") + .with_int_field_column("t", "field_int") + .with_five_rows_of_data("t"), + ); + + // Datafusion schema of the chunk + let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow(); + + let mut deduplicator = Deduplicater::new(); + let plan = deduplicator.build_scan_plan( + Arc::from("t"), + schema, + vec![Arc::clone(&chunk1), Arc::clone(&chunk2)], + Predicate::default(), + true + ); + let batch = collect(plan.unwrap()).await.unwrap(); + // 2 overlapped chunks will be sort merged and dupplicates removed + // Data must be sorted and duplicates removed + // TODO: it is just sorted for now. 
When https://github.com/influxdata/influxdb_iox/issues/1646 + // is done, duplicates will be removed + let expected = vec![ + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 10 | AL | 1970-01-01 00:00:00.000000050 |", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 70 | CT | 1970-01-01 00:00:00.000000500 |", + "| 5 | MT | 1970-01-01 00:00:00.000000005 |", + "| 30 | MT | 1970-01-01 00:00:00.000000005 |", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 1000 | MT | 1970-01-01 00:00:00.000002 |", + "| 5 | MT | 1970-01-01 00:00:00.000005 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "| 20 | MT | 1970-01-01 00:00:00.000007 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "+-----------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + } + + #[tokio::test] + async fn scan_plan_with_four_chunks() { + // This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within + let chunk1 = Arc::new( + TestChunk::new(1) + .with_time_column_with_stats("t", "5", "7000") + .with_tag_column_with_stats("t", "tag1", "AL", "MT") + .with_int_field_column("t", "field_int") + .with_ten_rows_of_data_some_duplicates("t"), + ); + + // chunk2 overlaps with chunk 1 + let chunk2 = Arc::new( + TestChunk::new(1) + .with_time_column_with_stats("t", "5", "7000") + .with_tag_column_with_stats("t", "tag1", "AL", "MT") + .with_int_field_column("t", "field_int") + .with_five_rows_of_data("t"), + ); + + // chunk3 no overlap, no duplicates within + let chunk3 = Arc::new( + TestChunk::new(1) + .with_time_column_with_stats("t", "8000", "20000") + .with_tag_column_with_stats("t", "tag1", "UT", "WA") + .with_int_field_column("t", "field_int") + .with_three_rows_of_data("t"), + ); + + // chunk3 no overlap, duplicates within + let chunk4 = Arc::new( + TestChunk::new(1) + .with_time_column_with_stats("t", "28000", "220000") + .with_tag_column_with_stats("t", "tag1", "UT", "WA") + .with_int_field_column("t", "field_int") + .with_may_contain_pk_duplicates(true) + .with_four_rows_of_data("t"), + ); + + // Datafusion schema of the chunk + let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow(); + + let mut deduplicator = Deduplicater::new(); + let plan = deduplicator.build_scan_plan( + Arc::from("t"), + schema, + vec![ + Arc::clone(&chunk1), + Arc::clone(&chunk2), + Arc::clone(&chunk3), + Arc::clone(&chunk4), + ], + Predicate::default(), + true + ); + let batch = collect(plan.unwrap()).await.unwrap(); + // Final data will be partially sorted and duplicates removed. Detailed: + // . chunk1 and chunk2 will be sorted merged and deduplicated (rows 8-32) + // . chunk3 will stay in its original (rows 1-3) + // . chunk4 will be sorted and deduplicated (rows 4-7) + // TODO: data is only partially sorted for now. 
The deduplicated will happen when When https://github.com/influxdata/influxdb_iox/issues/1646 + // is done + let expected = vec![ + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 1000 | WA | 1970-01-01 00:00:00.000008 |", + "| 10 | VT | 1970-01-01 00:00:00.000010 |", + "| 70 | UT | 1970-01-01 00:00:00.000020 |", + "| 70 | UT | 1970-01-01 00:00:00.000020 |", + "| 10 | VT | 1970-01-01 00:00:00.000010 |", + "| 50 | VT | 1970-01-01 00:00:00.000010 |", + "| 1000 | WA | 1970-01-01 00:00:00.000008 |", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 10 | AL | 1970-01-01 00:00:00.000000050 |", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 70 | CT | 1970-01-01 00:00:00.000000500 |", + "| 5 | MT | 1970-01-01 00:00:00.000000005 |", + "| 30 | MT | 1970-01-01 00:00:00.000000005 |", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 1000 | MT | 1970-01-01 00:00:00.000002 |", + "| 5 | MT | 1970-01-01 00:00:00.000005 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "| 20 | MT | 1970-01-01 00:00:00.000007 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "+-----------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + } + fn chunk_ids(group: &[Arc]) -> String { let ids = group.iter().map(|c| c.id().to_string()).collect::>(); ids.join(", ") diff --git a/query/src/test.rs b/query/src/test.rs index 59a8183aab..9ff255418a 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -21,7 +21,9 @@ use crate::{ use crate::{exec::Executor, pruning::Prunable}; use internal_types::{ - schema::{builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema}, + schema::{ + builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema, TIME_COLUMN_NAME, + }, selection::Selection, }; @@ -244,6 +246,36 @@ impl TestChunk { self.add_schema_to_table(table_name, new_column_schema) } + /// Register a timetamp column with the test chunk + pub fn with_time_column_with_stats( + self, + table_name: impl Into, + min: &str, + max: &str, + ) -> Self { + let table_name = table_name.into(); + + let mut new_self = self.with_time_column(&table_name); + + // Now, find the appropriate column summary and update the stats + let column_summary: &mut ColumnSummary = new_self + .table_summary + .as_mut() + .expect("had table summary") + .columns + .iter_mut() + .find(|c| c.name == TIME_COLUMN_NAME) + .expect("had column"); + + column_summary.stats = Statistics::String(StatValues { + min: Some(min.to_string()), + max: Some(max.to_string()), + ..Default::default() + }); + + new_self + } + /// Register an int field column with the test chunk pub fn with_int_field_column( self, @@ -367,19 +399,146 @@ impl TestChunk { self } - /// Prepares this chunk to return a specific record batch with five - /// rows of non null data that look like + /// Prepares this chunk to return a specific record batch with three + /// rows of non null data that look like, no duplicates within /// "+------+------+-----------+-------------------------------+", /// "| tag1 | tag2 | field_int | time |", /// "+------+------+-----------+-------------------------------+", - /// "| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |", - /// "| MT | MT | 10 | 1970-01-01 00:00:00.000007 |", - /// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |", - /// "| AL | AL | 
100 | 1970-01-01 00:00:00.000000050 |", - /// "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |", + /// "| WA | SC | 1000 | 1970-01-01 00:00:00.000008 |", + /// "| VT | NC | 10 | 1970-01-01 00:00:00.000010 |", + /// "| UT | RI | 70 | 1970-01-01 00:00:00.000020 |", /// "+------+------+-----------+-------------------------------+", + /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(8000, 20000) + pub fn with_three_rows_of_data(mut self, _table_name: impl Into) -> Self { + let schema = self + .table_schema + .as_ref() + .expect("table must exist in TestChunk"); + + // create arrays + let columns = schema + .iter() + .map(|(_influxdb_column_type, field)| match field.data_type() { + DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70])) as ArrayRef, + DataType::Utf8 => match field.name().as_str() { + "tag1" => Arc::new(StringArray::from(vec!["WA", "VT", "UT"])) as ArrayRef, + "tag2" => Arc::new(StringArray::from(vec!["SC", "NC", "RI"])) as ArrayRef, + _ => Arc::new(StringArray::from(vec!["TX", "PR", "OR"])) as ArrayRef, + }, + DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new( + TimestampNanosecondArray::from_vec(vec![8000, 10000, 20000], None), + ) as ArrayRef, + DataType::Dictionary(key, value) + if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => + { + match field.name().as_str() { + "tag1" => Arc::new( + vec!["WA", "VT", "UT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + "tag2" => Arc::new( + vec!["SC", "NC", "RI"] + .into_iter() + .collect::>(), + ) as ArrayRef, + _ => Arc::new( + vec!["TX", "PR", "OR"] + .into_iter() + .collect::>(), + ) as ArrayRef, + } + } + _ => unimplemented!( + "Unimplemented data type for test database: {:?}", + field.data_type() + ), + }) + .collect::>(); + + let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch"); + + self.table_data.push(Arc::new(batch)); + self + } + + /// Prepares this chunk to return a specific record batch with three + /// rows of non null data that look like, duplicates within + /// "+------+------+-----------+-------------------------------+", + /// "| tag1 | tag2 | field_int | time |", + /// "+------+------+-----------+-------------------------------+", + /// "| WA | SC | 1000 | 1970-01-01 00:00:00.000028 |", + /// "| VT | NC | 10 | 1970-01-01 00:00:00.000210 |", (1) + /// "| UT | RI | 70 | 1970-01-01 00:00:00.000220 |", + /// "| VT | NC | 50 | 1970-01-01 00:00:00.000210 |", // duplicate of (1) + /// "+------+------+-----------+-------------------------------+", + /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(28000, 220000) + pub fn with_four_rows_of_data(mut self, _table_name: impl Into) -> Self { + let schema = self + .table_schema + .as_ref() + .expect("table must exist in TestChunk"); + + // create arrays + let columns = schema + .iter() + .map(|(_influxdb_column_type, field)| match field.data_type() { + DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70, 50])) as ArrayRef, + DataType::Utf8 => match field.name().as_str() { + "tag1" => Arc::new(StringArray::from(vec!["WA", "VT", "UT", "VT"])) as ArrayRef, + "tag2" => Arc::new(StringArray::from(vec!["SC", "NC", "RI", "NC"])) as ArrayRef, + _ => Arc::new(StringArray::from(vec!["TX", "PR", "OR", "AL"])) as ArrayRef, + }, + DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new( + TimestampNanosecondArray::from_vec(vec![8000, 10000, 20000, 10000], None), + ) as ArrayRef, + DataType::Dictionary(key, value) + if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => + { + 
match field.name().as_str() { + "tag1" => Arc::new( + vec!["WA", "VT", "UT", "VT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + "tag2" => Arc::new( + vec!["SC", "NC", "RI", "NC"] + .into_iter() + .collect::>(), + ) as ArrayRef, + _ => Arc::new( + vec!["TX", "PR", "OR", "AL"] + .into_iter() + .collect::>(), + ) as ArrayRef, + } + } + _ => unimplemented!( + "Unimplemented data type for test database: {:?}", + field.data_type() + ), + }) + .collect::>(); + + let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch"); + + self.table_data.push(Arc::new(batch)); + self + } + + /// Prepares this chunk to return a specific record batch with five + /// rows of non null data that look like, no duplicates within + /// "+------+------+-----------+-------------------------------+", + /// "| tag1 | tag2 | field_int | time |", + /// "+------+------+-----------+-------------------------------+", + /// "| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |", + /// "| MT | AL | 10 | 1970-01-01 00:00:00.000007 |", + /// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |", + /// "| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |", + /// "| MT | AL | 5 | 1970-01-01 00:00:00.000005 |", + /// "+------+------+-----------+-------------------------------+", + /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000) pub fn with_five_rows_of_data(mut self, _table_name: impl Into) -> Self { - //let table_name = table_name.into(); let schema = self .table_schema .as_ref() @@ -439,6 +598,88 @@ impl TestChunk { self } + /// Prepares this chunk to return a specific record batch with ten + /// rows of non null data that look like, duplicates within + /// "+------+------+-----------+-------------------------------+", + /// "| tag1 | tag2 | field_int | time |", + /// "+------+------+-----------+-------------------------------+", + /// "| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |", + /// "| MT | AL | 10 | 1970-01-01 00:00:00.000007 |", (1) + /// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |", + /// "| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |", (2) + /// "| MT | AL | 5 | 1970-01-01 00:00:00.000005 |", (3) + /// "| MT | CT | 1000 | 1970-01-01 00:00:00.000002 |", + /// "| MT | AL | 20 | 1970-01-01 00:00:00.000007 |", // Duplicate with (1) + /// "| CT | CT | 70 | 1970-01-01 00:00:00.000000500 |", + /// "| AL | MA | 10 | 1970-01-01 00:00:00.000000050 |", // Duplicate with (2) + /// "| MT | AL | 30 | 1970-01-01 00:00:00.000005 |", // Duplicate with (3) + /// "+------+------+-----------+-------------------------------+", + /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000) + pub fn with_ten_rows_of_data_some_duplicates(mut self, _table_name: impl Into) -> Self { + //let table_name = table_name.into(); + let schema = self + .table_schema + .as_ref() + .expect("table must exist in TestChunk"); + + // create arrays + let columns = schema + .iter() + .map(|(_influxdb_column_type, field)| match field.data_type() { + DataType::Int64 => Arc::new(Int64Array::from(vec![ + 1000, 10, 70, 100, 5, 1000, 20, 70, 10, 30, + ])) as ArrayRef, + DataType::Utf8 => match field.name().as_str() { + "tag1" => Arc::new(StringArray::from(vec![ + "MT", "MT", "CT", "AL", "MT", "MT", "MT", "CT", "AL", "MT", + ])) as ArrayRef, + "tag2" => Arc::new(StringArray::from(vec![ + "CT", "AL", "CT", "MA", "AL", "CT", "AL", "CT", "MA", "AL", + ])) as ArrayRef, + _ => Arc::new(StringArray::from(vec![ + "CT", "MT", "AL", "AL", "MT", "CT", "MT", "AL", "AL", "MT", + ])) as ArrayRef, + }, + 
DataType::Timestamp(TimeUnit::Nanosecond, _) => { + Arc::new(TimestampNanosecondArray::from_vec( + vec![1000, 7000, 100, 50, 5, 2000, 7000, 500, 50, 5], + None, + )) as ArrayRef + } + DataType::Dictionary(key, value) + if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => + { + match field.name().as_str() { + "tag1" => Arc::new( + vec!["MT", "MT", "CT", "AL", "MT", "MT", "MT", "CT", "AL", "MT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + "tag2" => Arc::new( + vec!["CT", "AL", "CT", "MA", "AL", "CT", "AL", "CT", "MA", "AL"] + .into_iter() + .collect::>(), + ) as ArrayRef, + _ => Arc::new( + vec!["CT", "MT", "AL", "AL", "MT", "CT", "MT", "AL", "AL", "MT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + } + } + _ => unimplemented!( + "Unimplemented data type for test database: {:?}", + field.data_type() + ), + }) + .collect::>(); + + let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch"); + + self.table_data.push(Arc::new(batch)); + self + } + /// Returns all columns of the table pub fn all_column_names(&self) -> Option { let column_names = self.table_schema.as_ref().map(|schema| { From ea9edef71651aaf19190dde3fb9ef866c36dd8ce Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 11 Jun 2021 07:18:33 -0400 Subject: [PATCH 5/8] fix: testing option --- query/src/provider.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/query/src/provider.rs b/query/src/provider.rs index c81d6a3c2c..12ebc0cec2 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -246,7 +246,7 @@ impl TableProvider for ChunkTableProvider { scan_schema, chunks, predicate, - false + false, )?; Ok(plan) @@ -354,7 +354,7 @@ impl Deduplicater { // TODO: remove this parameter when #1682 and #1683 are done // TEMP until the rest of this module's code is complete: // merge all plans into the same - if for_testing { + if !for_testing { self.no_duplicates_chunks .append(&mut self.in_chunk_duplicates_chunks); for mut group in &mut self.overlapped_chunks_set { @@ -440,7 +440,7 @@ impl Deduplicater { Ok(()) } - /// Return true if all chunks are neither overlap nor has duplicates in itself + /// Return true if all chunks neither overlap nor have duplicates in itself fn no_duplicates(&self) -> bool { self.overlapped_chunks_set.is_empty() && self.in_chunk_duplicates_chunks.is_empty() } @@ -939,7 +939,7 @@ mod test { schema, vec![Arc::clone(&chunk)], Predicate::default(), - true + true, ); let batch = collect(plan.unwrap()).await.unwrap(); // No duplicates so no sort at all. The data will stay in their original order @@ -978,7 +978,7 @@ mod test { schema, vec![Arc::clone(&chunk)], Predicate::default(), - true + true, ); let batch = collect(plan.unwrap()).await.unwrap(); // Data must be sorted and duplicates removed @@ -1031,7 +1031,7 @@ mod test { schema, vec![Arc::clone(&chunk1), Arc::clone(&chunk2)], Predicate::default(), - true + true, ); let batch = collect(plan.unwrap()).await.unwrap(); // 2 overlapped chunks will be sort merged and dupplicates removed @@ -1115,14 +1115,14 @@ mod test { Arc::clone(&chunk4), ], Predicate::default(), - true + true, ); let batch = collect(plan.unwrap()).await.unwrap(); // Final data will be partially sorted and duplicates removed. Detailed: // . chunk1 and chunk2 will be sorted merged and deduplicated (rows 8-32) // . chunk3 will stay in its original (rows 1-3) // . chunk4 will be sorted and deduplicated (rows 4-7) - // TODO: data is only partially sorted for now. 
The deduplicated will happen when When https://github.com/influxdata/influxdb_iox/issues/1646 + // TODO: data is only partially sorted for now. The deduplication will happen when When https://github.com/influxdata/influxdb_iox/issues/1646 // is done let expected = vec![ "+-----------+------+-------------------------------+", From e34d157f28e0828e6c802cea39ba4cfc8539c2a6 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 11 Jun 2021 07:30:49 -0400 Subject: [PATCH 6/8] fix: comments --- query/src/provider.rs | 4 +--- query/src/test.rs | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/query/src/provider.rs b/query/src/provider.rs index 12ebc0cec2..c9d6ceb2cb 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -351,7 +351,6 @@ impl Deduplicater { // find overlapped chunks and put them into the right group self.split_overlapped_chunks(chunks.to_vec())?; - // TODO: remove this parameter when #1682 and #1683 are done // TEMP until the rest of this module's code is complete: // merge all plans into the same if !for_testing { @@ -1034,8 +1033,7 @@ mod test { true, ); let batch = collect(plan.unwrap()).await.unwrap(); - // 2 overlapped chunks will be sort merged and dupplicates removed - // Data must be sorted and duplicates removed + // Two overlapped chunks will be sort merged with dupplicates removed // TODO: it is just sorted for now. When https://github.com/influxdata/influxdb_iox/issues/1646 // is done, duplicates will be removed let expected = vec![ diff --git a/query/src/test.rs b/query/src/test.rs index 9ff255418a..ca2ff6dd89 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -235,7 +235,7 @@ impl TestChunk { new_self } - /// Register a timetamp column with the test chunk + /// Register a timestamp column with the test chunk pub fn with_time_column(self, table_name: impl Into) -> Self { let table_name = table_name.into(); @@ -246,7 +246,7 @@ impl TestChunk { self.add_schema_to_table(table_name, new_column_schema) } - /// Register a timetamp column with the test chunk + /// Register a timestamp column with the test chunk pub fn with_time_column_with_stats( self, table_name: impl Into, @@ -462,7 +462,7 @@ impl TestChunk { self } - /// Prepares this chunk to return a specific record batch with three + /// Prepares this chunk to return a specific record batch with four /// rows of non null data that look like, duplicates within /// "+------+------+-----------+-------------------------------+", /// "| tag1 | tag2 | field_int | time |", From 4224b693d9a8d60e8dfa3a11bed0cd1e45c16eab Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Jun 2021 07:33:14 -0400 Subject: [PATCH 7/8] refactor: combine preservation.rs and persistence.rs (#1692) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- tests/end_to_end_cases/mod.rs | 1 - tests/end_to_end_cases/persistence.rs | 246 +++++++++++++++++++++---- tests/end_to_end_cases/preservation.rs | 162 ---------------- 3 files changed, 209 insertions(+), 200 deletions(-) delete mode 100644 tests/end_to_end_cases/preservation.rs diff --git a/tests/end_to_end_cases/mod.rs b/tests/end_to_end_cases/mod.rs index 8e3c3216af..8f87a95693 100644 --- a/tests/end_to_end_cases/mod.rs +++ b/tests/end_to_end_cases/mod.rs @@ -5,7 +5,6 @@ pub mod management_cli; pub mod operations_api; pub mod operations_cli; mod persistence; -pub mod preservation; pub mod read_api; pub mod read_cli; pub mod scenario; diff --git a/tests/end_to_end_cases/persistence.rs b/tests/end_to_end_cases/persistence.rs index 
cc35af4937..fa3f1595dd 100644 --- a/tests/end_to_end_cases/persistence.rs +++ b/tests/end_to_end_cases/persistence.rs @@ -1,19 +1,18 @@ -use std::{ - convert::TryInto, - time::{Duration, Instant}, +use arrow_util::assert_batches_eq; +use data_types::chunk_metadata::{ChunkStorage, ChunkSummary}; +//use generated_types::influxdata::iox::management::v1::*; +use influxdb_iox_client::operations; + +use super::scenario::{ + collect_query, create_quickly_persisting_database, create_readable_database, rand_name, }; - -use data_types::chunk_metadata::ChunkSummary; - use crate::common::server_fixture::ServerFixture; - -use super::scenario::{create_quickly_persisting_database, rand_name}; +use std::convert::TryInto; #[tokio::test] -async fn test_persistence() { +async fn test_chunk_is_persisted_automatically() { let fixture = ServerFixture::create_shared().await; let mut write_client = fixture.write_client(); - let mut management_client = fixture.management_client(); let db_name = rand_name(); create_quickly_persisting_database(&db_name, fixture.grpc_channel()).await; @@ -29,37 +28,210 @@ async fn test_persistence() { .expect("successful write"); assert_eq!(num_lines_written, 1000); - // wait for the chunk to be written to object store - let deadline = Instant::now() + Duration::from_secs(5); - let mut chunks = vec![]; + wait_for_chunk( + &fixture, + &db_name, + ChunkStorage::ReadBufferAndObjectStore, + std::time::Duration::from_secs(5), + ) + .await; +} + +#[tokio::test] +async fn test_query_chunk_after_restart() { + // fixtures + let fixture = ServerFixture::create_single_use().await; + let server_id = 42; + let db_name = rand_name(); + + // set server ID + let mut management_client = fixture.management_client(); + management_client + .update_server_id(server_id) + .await + .expect("set ID failed"); + fixture.wait_server_initialized().await; + + // create DB and a RB chunk + create_readable_database(&db_name, fixture.grpc_channel()).await; + let chunk_id = create_readbuffer_chunk(&fixture, &db_name).await; + + // enable persistance + let mut rules = management_client.get_database(&db_name).await.unwrap(); + rules.lifecycle_rules = Some({ + let mut lifecycle_rules = rules.lifecycle_rules.unwrap(); + lifecycle_rules.persist = true; + lifecycle_rules + }); + management_client.update_database(rules).await.unwrap(); + + // wait for persistence + wait_for_persisted_chunk( + &fixture, + &db_name, + chunk_id, + std::time::Duration::from_secs(10), + ) + .await; + + // check before restart + assert_chunk_query_works(&fixture, &db_name).await; + + // restart server + let fixture = fixture.restart_server().await; + fixture.wait_server_initialized().await; + + // query data after restart + assert_chunk_query_works(&fixture, &db_name).await; +} + +/// Create a closed read buffer chunk and return its id +async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32 { + use influxdb_iox_client::management::generated_types::operation_metadata::Job; + + let mut management_client = fixture.management_client(); + let mut write_client = fixture.write_client(); + let mut operations_client = fixture.operations_client(); + + let partition_key = "cpu"; + let table_name = "cpu"; + let lp_lines = vec!["cpu,region=west user=23.2 100"]; + + write_client + .write(db_name, lp_lines.join("\n")) + .await + .expect("write succeded"); + + let chunks = list_chunks(fixture, db_name).await; + + assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks); + let chunk_id = chunks[0].id; + assert_eq!(chunks[0].storage, 
ChunkStorage::OpenMutableBuffer); + + // Move the chunk to read buffer + let operation = management_client + .close_partition_chunk(db_name, table_name, partition_key, 0) + .await + .expect("new partition chunk"); + + println!("Operation response is {:?}", operation); + let operation_id = operation.id(); + + let meta = operations::ClientOperation::try_new(operation) + .unwrap() + .metadata(); + + // ensure we got a legit job description back + if let Some(Job::CloseChunk(close_chunk)) = meta.job { + assert_eq!(close_chunk.db_name, db_name); + assert_eq!(close_chunk.partition_key, partition_key); + assert_eq!(close_chunk.chunk_id, 0); + } else { + panic!("unexpected job returned") + }; + + // wait for the job to be done + operations_client + .wait_operation(operation_id, Some(std::time::Duration::from_secs(1))) + .await + .expect("failed to wait operation"); + + // And now the chunk should be good + let mut chunks = list_chunks(fixture, db_name).await; + chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id)); + + assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks); + assert_eq!(chunks[0].id, chunk_id); + assert_eq!(chunks[0].storage, ChunkStorage::ReadBuffer); + + chunk_id +} + +// Wait for the specified chunk to be persisted to object store +async fn wait_for_persisted_chunk( + fixture: &ServerFixture, + db_name: &str, + chunk_id: u32, + wait_time: std::time::Duration, +) { + let t_start = std::time::Instant::now(); + loop { - assert!( - Instant::now() < deadline, - "Chunk did not persist in time. Chunks were: {:#?}", - chunks - ); + let chunks = list_chunks(fixture, db_name).await; - chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - - let storage_string = chunks - .iter() - .map(|c| { - let c: ChunkSummary = c.clone().try_into().unwrap(); - format!("{:?}", c.storage) - }) - .collect::>() - .join(","); - - // Found a persisted chunk, all good - if storage_string == "ReadBufferAndObjectStore" { + let chunk = chunks.iter().find(|chunk| chunk.id == chunk_id).unwrap(); + if (chunk.storage == ChunkStorage::ReadBufferAndObjectStore) + || (chunk.storage == ChunkStorage::ObjectStoreOnly) + { return; } - // keep looking - println!("Current chunk storage: {:#?}", storage_string); - tokio::time::sleep(Duration::from_millis(200)).await + assert!(t_start.elapsed() < wait_time); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } + +// Wait for at least one chunk to be in the specified storage state +async fn wait_for_chunk( + fixture: &ServerFixture, + db_name: &str, + desired_storage: ChunkStorage, + wait_time: std::time::Duration, +) { + let t_start = std::time::Instant::now(); + + loop { + let chunks = list_chunks(fixture, db_name).await; + + if chunks.iter().any(|chunk| chunk.storage == desired_storage) { + return; + } + + // Log the current status of the chunks + for chunk in &chunks { + println!( + "{:?}: chunk {} partition {} storage:{:?}", + (t_start.elapsed()), + chunk.id, + chunk.partition_key, + chunk.storage + ); + } + + assert!( + t_start.elapsed() < wait_time, + "Could not find chunk in desired state {:?} within {:?}. 
Chunks were: {:#?}", + desired_storage, + wait_time, + chunks + ); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } +} + +async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) { + let mut client = fixture.flight_client(); + let sql_query = "select region, user, time from cpu"; + + let query_results = client.perform_query(db_name, sql_query).await.unwrap(); + + let batches = collect_query(query_results).await; + let expected_read_data = vec![ + "+--------+------+-------------------------------+", + "| region | user | time |", + "+--------+------+-------------------------------+", + "| west | 23.2 | 1970-01-01 00:00:00.000000100 |", + "+--------+------+-------------------------------+", + ]; + + assert_batches_eq!(expected_read_data, &batches); +} + +/// Gets the list of ChunkSummaries from the server +async fn list_chunks(fixture: &ServerFixture, db_name: &str) -> Vec { + let mut management_client = fixture.management_client(); + let chunks = management_client.list_chunks(db_name).await.unwrap(); + + chunks.into_iter().map(|c| c.try_into().unwrap()).collect() +} diff --git a/tests/end_to_end_cases/preservation.rs b/tests/end_to_end_cases/preservation.rs deleted file mode 100644 index 35fb96f5ce..0000000000 --- a/tests/end_to_end_cases/preservation.rs +++ /dev/null @@ -1,162 +0,0 @@ -use arrow_util::assert_batches_eq; -use generated_types::influxdata::iox::management::v1::*; -use influxdb_iox_client::operations; - -use super::scenario::{collect_query, create_readable_database, rand_name}; -use crate::common::server_fixture::ServerFixture; - -#[tokio::test] -async fn test_query_chunk_after_restart() { - // fixtures - let fixture = ServerFixture::create_single_use().await; - let server_id = 42; - let db_name = rand_name(); - - // set server ID - let mut management_client = fixture.management_client(); - management_client - .update_server_id(server_id) - .await - .expect("set ID failed"); - fixture.wait_server_initialized().await; - - // create DB and a RB chunk - create_readable_database(&db_name, fixture.grpc_channel()).await; - let chunk_id = create_readbuffer_chunk(&fixture, &db_name).await; - - // enable persistance - let mut rules = management_client.get_database(&db_name).await.unwrap(); - rules.lifecycle_rules = Some({ - let mut lifecycle_rules = rules.lifecycle_rules.unwrap(); - lifecycle_rules.persist = true; - lifecycle_rules - }); - management_client.update_database(rules).await.unwrap(); - - // wait for persistence - wait_for_persisted_chunk( - &fixture, - &db_name, - chunk_id, - std::time::Duration::from_secs(10), - ) - .await; - - // check before restart - assert_chunk_query_works(&fixture, &db_name).await; - - // restart server - let fixture = fixture.restart_server().await; - fixture.wait_server_initialized().await; - - // query data after restart - assert_chunk_query_works(&fixture, &db_name).await; -} - -async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32 { - use influxdb_iox_client::management::generated_types::operation_metadata::Job; - - let mut management_client = fixture.management_client(); - let mut write_client = fixture.write_client(); - let mut operations_client = fixture.operations_client(); - - let partition_key = "cpu"; - let table_name = "cpu"; - let lp_lines = vec!["cpu,region=west user=23.2 100"]; - - write_client - .write(db_name, lp_lines.join("\n")) - .await - .expect("write succeded"); - - let chunks = management_client - .list_chunks(db_name) - .await - .expect("listing chunks"); - - 
assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks); - let chunk_id = chunks[0].id; - assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer as i32); - - // Move the chunk to read buffer - let operation = management_client - .close_partition_chunk(db_name, table_name, partition_key, 0) - .await - .expect("new partition chunk"); - - println!("Operation response is {:?}", operation); - let operation_id = operation.id(); - - let meta = operations::ClientOperation::try_new(operation) - .unwrap() - .metadata(); - - // ensure we got a legit job description back - if let Some(Job::CloseChunk(close_chunk)) = meta.job { - assert_eq!(close_chunk.db_name, db_name); - assert_eq!(close_chunk.partition_key, partition_key); - assert_eq!(close_chunk.chunk_id, 0); - } else { - panic!("unexpected job returned") - }; - - // wait for the job to be done - operations_client - .wait_operation(operation_id, Some(std::time::Duration::from_secs(1))) - .await - .expect("failed to wait operation"); - - // And now the chunk should be good - let mut chunks = management_client - .list_chunks(db_name) - .await - .expect("listing chunks"); - chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id)); - - assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks); - assert_eq!(chunks[0].id, chunk_id); - assert_eq!(chunks[0].storage, ChunkStorage::ReadBuffer as i32); - - chunk_id -} - -async fn wait_for_persisted_chunk( - fixture: &ServerFixture, - db_name: &str, - chunk_id: u32, - wait_time: std::time::Duration, -) { - let t_start = std::time::Instant::now(); - - loop { - let mut management_client = fixture.management_client(); - let chunks = management_client.list_chunks(db_name).await.unwrap(); - let chunk = chunks.iter().find(|chunk| chunk.id == chunk_id).unwrap(); - if (chunk.storage == ChunkStorage::ReadBufferAndObjectStore as i32) - || (chunk.storage == ChunkStorage::ObjectStoreOnly as i32) - { - return; - } - - assert!(t_start.elapsed() < wait_time); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } -} - -async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) { - let mut client = fixture.flight_client(); - let sql_query = "select region, user, time from cpu"; - - let query_results = client.perform_query(db_name, sql_query).await.unwrap(); - - let batches = collect_query(query_results).await; - let expected_read_data = vec![ - "+--------+------+-------------------------------+", - "| region | user | time |", - "+--------+------+-------------------------------+", - "| west | 23.2 | 1970-01-01 00:00:00.000000100 |", - "+--------+------+-------------------------------+", - ]; - - assert_batches_eq!(expected_read_data, &batches); -} From 7dd0416960cc59ca74fd07d5b5d96eb23112adaf Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 11 Jun 2021 09:43:39 -0400 Subject: [PATCH 8/8] refactor: address review comments --- query/src/provider.rs | 16 ++++++++-------- query/src/test.rs | 10 +++++----- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/query/src/provider.rs b/query/src/provider.rs index c9d6ceb2cb..6604021be1 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -923,7 +923,7 @@ mod test { // Test no duplicate at all let chunk = Arc::new( TestChunk::new(1) - .with_time_column_with_stats("t", "5", "7000") + .with_time_column_with_stats("t", 5, 7000) .with_tag_column_with_stats("t", "tag1", "AL", "MT") .with_int_field_column("t", "field_int") .with_five_rows_of_data("t"), @@ -961,7 +961,7 @@ mod test { // Test one chunk with duplicate within let chunk = Arc::new( 
TestChunk::new(1) - .with_time_column_with_stats("t", "5", "7000") + .with_time_column_with_stats("t", 5, 7000) .with_tag_column_with_stats("t", "tag1", "AL", "MT") .with_int_field_column("t", "field_int") .with_may_contain_pk_duplicates(true) @@ -1007,7 +1007,7 @@ mod test { // test overlapped chunks let chunk1 = Arc::new( TestChunk::new(1) - .with_time_column_with_stats("t", "5", "7000") + .with_time_column_with_stats("t", 5, 7000) .with_tag_column_with_stats("t", "tag1", "AL", "MT") .with_int_field_column("t", "field_int") .with_ten_rows_of_data_some_duplicates("t"), @@ -1015,7 +1015,7 @@ mod test { let chunk2 = Arc::new( TestChunk::new(1) - .with_time_column_with_stats("t", "5", "7000") + .with_time_column_with_stats("t", 5, 7000) .with_tag_column_with_stats("t", "tag1", "AL", "MT") .with_int_field_column("t", "field_int") .with_five_rows_of_data("t"), @@ -1065,7 +1065,7 @@ mod test { // This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within let chunk1 = Arc::new( TestChunk::new(1) - .with_time_column_with_stats("t", "5", "7000") + .with_time_column_with_stats("t", 5, 7000) .with_tag_column_with_stats("t", "tag1", "AL", "MT") .with_int_field_column("t", "field_int") .with_ten_rows_of_data_some_duplicates("t"), @@ -1074,7 +1074,7 @@ mod test { // chunk2 overlaps with chunk 1 let chunk2 = Arc::new( TestChunk::new(1) - .with_time_column_with_stats("t", "5", "7000") + .with_time_column_with_stats("t", 5, 7000) .with_tag_column_with_stats("t", "tag1", "AL", "MT") .with_int_field_column("t", "field_int") .with_five_rows_of_data("t"), @@ -1083,7 +1083,7 @@ mod test { // chunk3 no overlap, no duplicates within let chunk3 = Arc::new( TestChunk::new(1) - .with_time_column_with_stats("t", "8000", "20000") + .with_time_column_with_stats("t", 8000, 20000) .with_tag_column_with_stats("t", "tag1", "UT", "WA") .with_int_field_column("t", "field_int") .with_three_rows_of_data("t"), @@ -1092,7 +1092,7 @@ mod test { // chunk3 no overlap, duplicates within let chunk4 = Arc::new( TestChunk::new(1) - .with_time_column_with_stats("t", "28000", "220000") + .with_time_column_with_stats("t", 28000, 220000) .with_tag_column_with_stats("t", "tag1", "UT", "WA") .with_int_field_column("t", "field_int") .with_may_contain_pk_duplicates(true) diff --git a/query/src/test.rs b/query/src/test.rs index ca2ff6dd89..07f5f80757 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -250,8 +250,8 @@ impl TestChunk { pub fn with_time_column_with_stats( self, table_name: impl Into, - min: &str, - max: &str, + min: i64, + max: i64, ) -> Self { let table_name = table_name.into(); @@ -267,9 +267,9 @@ impl TestChunk { .find(|c| c.name == TIME_COLUMN_NAME) .expect("had column"); - column_summary.stats = Statistics::String(StatValues { - min: Some(min.to_string()), - max: Some(max.to_string()), + column_summary.stats = Statistics::I64(StatValues { + min: Some(min), + max: Some(max), ..Default::default() });
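// ---------------------------------------------------------------------------
// Editor's sketch (not part of the patch series): wait_for_chunk and
// wait_for_persisted_chunk in persistence.rs above share one pattern -- poll a
// condition, assert a deadline has not passed, sleep briefly, repeat. This is a
// minimal, self-contained sketch of that pattern; `wait_until`, `check`, and
// `what` are hypothetical names introduced here for illustration only.
use std::future::Future;
use std::time::{Duration, Instant};

async fn wait_until<F, Fut>(mut check: F, wait_time: Duration, what: &str)
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    let t_start = Instant::now();
    loop {
        // Re-evaluate the condition on every iteration (e.g. re-list the chunks).
        if check().await {
            return;
        }
        // Fail the test rather than hang forever.
        assert!(
            t_start.elapsed() < wait_time,
            "timed out after {:?} waiting for {}",
            wait_time,
            what
        );
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

// Hypothetical usage, mirroring the helpers above:
// wait_until(|| async { /* list chunks, inspect chunk.storage */ true },
//            Duration::from_secs(5), "chunk to persist").await;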
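// ---------------------------------------------------------------------------
// Editor's sketch (not part of the patch series): several comments in this series
// say that overlapped chunks will be "sort merged with duplicates removed" once
// https://github.com/influxdata/influxdb_iox/issues/1646 is done. A toy version of
// that idea on an illustrative row type -- `Row` and `sort_and_dedup` are made-up
// names, not the real IOx types.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    tag1: String,
    time: i64,
    field_int: i64,
}

fn sort_and_dedup(mut rows: Vec<Row>) -> Vec<Row> {
    // Sort on the primary key (tag columns plus time)...
    rows.sort_by(|a, b| (a.tag1.as_str(), a.time).cmp(&(b.tag1.as_str(), b.time)));
    // ...then collapse runs of rows with the same key. Vec::dedup_by keeps the first
    // row of each run; which duplicate the real deduplication keeps is decided by
    // IOx itself and is not modelled here.
    rows.dedup_by(|a, b| a.tag1 == b.tag1 && a.time == b.time);
    rows
}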
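// ---------------------------------------------------------------------------
// Editor's sketch (not part of the patch series): the final commit switches the
// TestChunk time statistics from string min/max to i64 min/max (Statistics::I64 /
// StatValues). One consequence is that range comparisons become numeric; as strings,
// "28000" sorts before "5", so lexicographic ordering gives the wrong answer for
// time ranges. `TimeRange` and `overlaps` below are invented names for illustration.
#[derive(Debug, Clone, Copy)]
struct TimeRange {
    min: i64,
    max: i64,
}

fn overlaps(a: TimeRange, b: TimeRange) -> bool {
    a.min <= b.max && b.min <= a.max
}

fn main() {
    // Mirrors the chunks in the tests above: [5, 7000] and [8000, 20000] do not
    // overlap, while two [5, 7000] ranges do.
    assert!(!overlaps(
        TimeRange { min: 5, max: 7000 },
        TimeRange { min: 8000, max: 20000 }
    ));
    assert!(overlaps(
        TimeRange { min: 5, max: 7000 },
        TimeRange { min: 5, max: 7000 }
    ));
    // As strings, "28000" < "5" lexicographically, which is why numeric stats matter.
    assert!("28000" < "5");
}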
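// ---------------------------------------------------------------------------
// Editor's sketch (not part of the patch series): persistence.rs now funnels every
// chunk listing through list_chunks(), converting the wire representation into the
// typed ChunkSummary once, so callers compare ChunkStorage variants directly rather
// than casting with `as i32` as the deleted preservation.rs did. A generic sketch of
// that boundary-conversion idea; ProtoChunk, Storage, and Summary are placeholders,
// not the real generated gRPC types, and the discriminant values are assumptions.
use std::convert::TryFrom;

#[derive(Debug)]
struct ProtoChunk {
    id: u32,
    storage: i32, // enums arrive as raw integers on the wire
}

#[derive(Debug, PartialEq)]
enum Storage {
    OpenMutableBuffer,
    ReadBuffer,
}

#[derive(Debug)]
struct Summary {
    id: u32,
    storage: Storage,
}

impl TryFrom<ProtoChunk> for Summary {
    type Error = String;

    fn try_from(c: ProtoChunk) -> Result<Self, Self::Error> {
        let storage = match c.storage {
            0 => Storage::OpenMutableBuffer,
            1 => Storage::ReadBuffer,
            other => return Err(format!("unknown storage variant: {}", other)),
        };
        Ok(Summary { id: c.id, storage })
    }
}

// Callers then write `summary.storage == Storage::ReadBuffer` instead of comparing
// against `Storage::ReadBuffer as i32`, keeping the cast at the API boundary.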