diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 9ce7369217..a04455a1b9 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -45,6 +45,12 @@ jobs: with: command: build args: --workspace + # Ensure benches still build + - name: Build Benches + uses: actions-rs/cargo@v1 + with: + command: test + args: --workspace --benches --no-run test: name: Test diff --git a/Cargo.lock b/Cargo.lock index ae6e644ed8..64e4fc9451 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,7 +101,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=e2d6c057684b587151afffe50f7eaef94533e017#e2d6c057684b587151afffe50f7eaef94533e017" +source = "git+https://github.com/apache/arrow.git?rev=ad4504e8e85eb8e5babe0f01ca8cf9947499fc40#ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" dependencies = [ "cfg_aliases", "chrono", @@ -124,7 +124,7 @@ dependencies = [ [[package]] name = "arrow-flight" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=e2d6c057684b587151afffe50f7eaef94533e017#e2d6c057684b587151afffe50f7eaef94533e017" +source = "git+https://github.com/apache/arrow.git?rev=ad4504e8e85eb8e5babe0f01ca8cf9947499fc40#ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" dependencies = [ "arrow", "bytes", @@ -237,7 +237,7 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] name = "azure_core" version = "0.1.0" -source = "git+https://github.com/Azure/azure-sdk-for-rust.git?rev=5ecad7216e1f04c5ff41e7de4667f006664c8cca#5ecad7216e1f04c5ff41e7de4667f006664c8cca" +source = "git+https://github.com/Azure/azure-sdk-for-rust.git?rev=14ff9326bb1ba07f98733a548988eccd4532b945#14ff9326bb1ba07f98733a548988eccd4532b945" dependencies = [ "RustyXML", "async-trait", @@ -266,7 +266,7 @@ dependencies = [ [[package]] name = "azure_storage" version = "0.1.0" -source = "git+https://github.com/Azure/azure-sdk-for-rust.git?rev=5ecad7216e1f04c5ff41e7de4667f006664c8cca#5ecad7216e1f04c5ff41e7de4667f006664c8cca" +source = "git+https://github.com/Azure/azure-sdk-for-rust.git?rev=14ff9326bb1ba07f98733a548988eccd4532b945#14ff9326bb1ba07f98733a548988eccd4532b945" dependencies = [ "RustyXML", "azure_core", @@ -289,6 +289,7 @@ dependencies = [ "serde_derive", "serde_json", "smallvec", + "thiserror", "time 0.2.25", "url", "uuid", @@ -848,7 +849,7 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=e2d6c057684b587151afffe50f7eaef94533e017#e2d6c057684b587151afffe50f7eaef94533e017" +source = "git+https://github.com/apache/arrow.git?rev=ad4504e8e85eb8e5babe0f01ca8cf9947499fc40#ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" dependencies = [ "ahash 0.7.0", "arrow", @@ -1206,6 +1207,23 @@ dependencies = [ "once_cell", ] +[[package]] +name = "futures-test" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b30f48f6b9cd26d8739965d6e3345c511718884fb223795b80dc71d24a9ea9a" +dependencies = [ + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", + "once_cell", + "pin-project 1.0.5", + "pin-utils", +] + [[package]] name = "futures-util" version = "0.3.12" @@ -2134,6 +2152,7 @@ dependencies = [ "cloud-storage", "dotenv", "futures", + "futures-test", "itertools 0.9.0", "percent-encoding", "reqwest", @@ -2282,7 +2301,7 @@ dependencies = [ [[package]] name = "parquet" 
version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=e2d6c057684b587151afffe50f7eaef94533e017#e2d6c057684b587151afffe50f7eaef94533e017" +source = "git+https://github.com/apache/arrow.git?rev=ad4504e8e85eb8e5babe0f01ca8cf9947499fc40#ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" dependencies = [ "arrow", "base64 0.12.3", diff --git a/arrow_deps/Cargo.toml b/arrow_deps/Cargo.toml index af84d36501..d2e9d9719d 100644 --- a/arrow_deps/Cargo.toml +++ b/arrow_deps/Cargo.toml @@ -8,11 +8,11 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx [dependencies] # In alphabetical order # We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev -# The version can be found here: https://github.com/apache/arrow/commit/e2d6c057684b587151afffe50f7eaef94533e017 +# The version can be found here: https://github.com/apache/arrow/commit/ad4504e8e85eb8e5babe0f01ca8cf9947499fc40 # -arrow = { git = "https://github.com/apache/arrow.git", rev = "e2d6c057684b587151afffe50f7eaef94533e017" , features = ["simd"] } -arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "e2d6c057684b587151afffe50f7eaef94533e017" } -datafusion = { git = "https://github.com/apache/arrow.git", rev = "e2d6c057684b587151afffe50f7eaef94533e017" } +arrow = { git = "https://github.com/apache/arrow.git", rev = "ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" , features = ["simd"] } +arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" } +datafusion = { git = "https://github.com/apache/arrow.git", rev = "ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" } # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time # and we're not currently using it anyway -parquet = { git = "https://github.com/apache/arrow.git", rev = "e2d6c057684b587151afffe50f7eaef94533e017", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] } +parquet = { git = "https://github.com/apache/arrow.git", rev = "ad4504e8e85eb8e5babe0f01ca8cf9947499fc40", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] } diff --git a/build.rs b/build.rs new file mode 100644 index 0000000000..5126e2f655 --- /dev/null +++ b/build.rs @@ -0,0 +1,14 @@ +// Include the GIT_HASH, if any, in `GIT_HASH` environment variable at build +// time +// +// https://stackoverflow.com/questions/43753491/include-git-commit-hash-as-string-into-rust-program +use std::process::Command; +fn main() { + let output = Command::new("git").args(&["rev-parse", "HEAD"]).output(); + + if let Ok(output) = output { + if let Ok(git_hash) = String::from_utf8(output.stdout) { + println!("cargo:rustc-env=GIT_HASH={}", git_hash); + } + } +} diff --git a/data_types/src/selection.rs b/data_types/src/selection.rs index bc5ee81cfd..34604d86d7 100644 --- a/data_types/src/selection.rs +++ b/data_types/src/selection.rs @@ -1,4 +1,4 @@ -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] /// A collection of columns to include in query results. /// /// The `All` variant denotes that the caller wishes to include all table diff --git a/docs/env.example b/docs/env.example new file mode 100644 index 0000000000..8154233886 --- /dev/null +++ b/docs/env.example @@ -0,0 +1,44 @@ +# This is an example .env file showing all of the environment variables that can +# be configured within the project. 
Copy this file to the top level directory with +# the name `.env`, then remove any `#` at the beginning of the lines of variables +# you'd like to set and change the values. +# +# The identifier for the server. Used for writing to object storage and as +# an identifier that is added to replicated writes, WAL segments and Chunks. +# Must be unique in a group of connected or semi-connected IOx servers. +# Must be a number that can be represented by a 32-bit unsigned integer. +# INFLUXDB_IOX_ID=1 +# +# Where to store files on disk: +# INFLUXDB_IOX_DB_DIR=$HOME/.influxdb_iox +# TEST_INFLUXDB_IOX_DB_DIR=$HOME/.influxdb_iox +# +# Addresses for the server processes: +# INFLUXDB_IOX_BIND_ADDR=127.0.0.1:8080 +# INFLUXDB_IOX_GRPC_BIND_ADDR=127.0.0.1:8082 +# +# If using Amazon S3 as an object store: +# AWS_ACCESS_KEY_ID=access_key_value +# AWS_SECRET_ACCESS_KEY=secret_access_key_value +# AWS_DEFAULT_REGION=us-east-2 +# AWS_S3_BUCKET_NAME=bucket-name +# +# If using Google Cloud Storage as an object store: +# GCS_BUCKET_NAME=bucket_name +# Set one of SERVICE_ACCOUNT or GOOGLE_APPLICATION_CREDENTIALS, either to a path of a filename +# containing Google credential JSON or to the JSON directly. +# SERVICE_ACCOUNT=/path/to/auth/info.json +# GOOGLE_APPLICATION_CREDENTIALS={"project_id": ...} +# +# If using Microsoft Azure as an object store: +# The name you see when going to All Services > Storage accounts > [name] +# AZURE_STORAGE_ACCOUNT= +# The name of a container you've created in the storage account, under Blob Service > Containers +# AZURE_STORAGE_CONTAINER= +# In the Storage account's Settings > Access keys, one of the Key values +# AZURE_STORAGE_MASTER_KEY= +# +# To enable Jaeger tracing: +# OTEL_SERVICE_NAME="iox" # defaults to iox +# OTEL_EXPORTER_JAEGER_AGENT_HOST="jaeger.influxdata.net" +# OTEL_EXPORTER_JAEGER_AGENT_PORT="6831" \ No newline at end of file diff --git a/mutable_buffer/src/database.rs b/mutable_buffer/src/database.rs index 2f6d7126f0..ffc01f731c 100644 --- a/mutable_buffer/src/database.rs +++ b/mutable_buffer/src/database.rs @@ -2,7 +2,7 @@ use generated_types::wal; use query::group_by::GroupByAndAggregate; use query::group_by::WindowDuration; use query::{ - exec::{stringset::StringSet, FieldListPlan, SeriesSetPlan, SeriesSetPlans}, + exec::{stringset::StringSet, SeriesSetPlan, SeriesSetPlans}, predicate::Predicate, Database, }; @@ -213,15 +213,6 @@ impl Database for MutableBufferDb { Ok(()) } - /// return all field names in this database, while applying optional - /// predicates - async fn field_column_names(&self, predicate: Predicate) -> Result { - let mut filter = ChunkTableFilter::new(predicate); - let mut visitor = TableFieldPredVisitor::new(); - self.accept(&mut filter, &mut visitor)?; - Ok(visitor.into_fieldlist_plan()) - } - /// return all column values in this database, while applying optional /// predicates async fn column_values( @@ -275,7 +266,7 @@ impl Database for MutableBufferDb { } /// Return the partition keys for data in this DB - async fn partition_keys(&self) -> Result, Self::Error> { + fn partition_keys(&self) -> Result, Self::Error> { let partitions = self.partitions.read().expect("mutex poisoned"); let keys = partitions.keys().cloned().collect(); Ok(keys) @@ -283,7 +274,7 @@ impl Database for MutableBufferDb { /// Return the list of chunks, in order of id, for the specified /// partition_key - async fn chunks(&self, partition_key: &str) -> Vec> { + fn chunks(&self, partition_key: &str) -> Vec> { let partition = self.get_partition(partition_key); let 
partition = partition.read().expect("mutex poisoned"); partition.chunks() @@ -537,39 +528,6 @@ impl ChunkTableFilter { } } -/// return a plan that selects all values from field columns after -/// applying timestamp and other predicates -#[derive(Debug)] -struct TableFieldPredVisitor { - // As Each table can be spread across multiple Chunks, we - // collect all the relevant plans and Union them together. - plans: Vec, -} - -impl Visitor for TableFieldPredVisitor { - fn pre_visit_table( - &mut self, - table: &Table, - chunk: &Chunk, - filter: &mut ChunkTableFilter, - ) -> Result<()> { - self.plans - .push(table.field_names_plan(filter.chunk_predicate(), chunk)?); - Ok(()) - } -} - -impl TableFieldPredVisitor { - fn new() -> Self { - let plans = Vec::new(); - Self { plans } - } - - fn into_fieldlist_plan(self) -> FieldListPlan { - FieldListPlan::Plans(self.plans) - } -} - /// return all values in the `column_name` column /// in this database, while applying the timestamp range /// @@ -823,7 +781,6 @@ mod tests { use super::*; use data_types::selection::Selection; use query::{ - exec::fieldlist::{Field, FieldList}, exec::{ field::FieldIndexes, seriesset::{Error as SeriesSetError, SeriesSet, SeriesSetItem}, @@ -834,10 +791,7 @@ mod tests { }; use arrow_deps::{ - arrow::{ - array::{Array, StringArray}, - datatypes::DataType, - }, + arrow::array::{Array, StringArray}, datafusion::prelude::*, }; use influxdb_line_protocol::{parse_lines, ParsedLine}; @@ -1272,155 +1226,6 @@ mod tests { ); } - #[tokio::test] - async fn test_field_columns() -> Result { - // Ensure that the database queries are hooked up correctly - - let db = MutableBufferDb::new("column_namedb"); - - let lp_data = vec![ - "h2o,state=MA,city=Boston temp=70.4 50", - "h2o,state=MA,city=Boston other_temp=70.4 250", - "h2o,state=CA,city=Boston other_temp=72.4 350", - "o2,state=MA,city=Boston temp=53.4,reading=51 50", - ] - .join("\n"); - - let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); - write_lines(&db, &lines).await; - - // write a new lp_line that is in a new day and thus a new partititon - let nanoseconds_per_day: i64 = 1_000_000_000 * 60 * 60 * 24; - - let lp_data = vec![format!( - "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 {}", - nanoseconds_per_day * 10 - )] - .join("\n"); - let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); - write_lines(&db, &lines).await; - - // ensure there are 2 chunks - assert_eq!(db.len(), 2); - - // setup to run the execution plan ( - let executor = Executor::default(); - - let predicate = PredicateBuilder::default() - .table("NoSuchTable") - .add_expr(col("state").eq(lit("MA"))) // state=MA - .build(); - - // make sure table filtering works (no tables match) - let plan = db - .field_column_names(predicate) - .await - .expect("Created field_columns plan successfully"); - - let fieldlists = executor - .to_field_list(plan) - .await - .expect("Running fieldlist plan"); - assert!(fieldlists.fields.is_empty()); - - // get only fields from h20 (but both chunks) - let predicate = PredicateBuilder::default() - .table("h2o") - .add_expr(col("state").eq(lit("MA"))) // state=MA - .build(); - - let plan = db - .field_column_names(predicate) - .await - .expect("Created field_columns plan successfully"); - - let actual = executor - .to_field_list(plan) - .await - .expect("Running fieldlist plan"); - - let expected = FieldList { - fields: vec![ - Field { - name: "moisture".into(), - data_type: DataType::Float64, - last_timestamp: nanoseconds_per_day * 10, - }, - 
Field { - name: "other_temp".into(), - data_type: DataType::Float64, - last_timestamp: 250, - }, - Field { - name: "temp".into(), - data_type: DataType::Float64, - last_timestamp: nanoseconds_per_day * 10, - }, - ], - }; - - assert_eq!( - expected, actual, - "Expected:\n{:#?}\nActual:\n{:#?}", - expected, actual - ); - - Ok(()) - } - - #[tokio::test] - async fn test_field_columns_timestamp_predicate() -> Result { - // check the appropriate filters are applied in the datafusion plans - let db = MutableBufferDb::new("column_namedb"); - - let lp_data = vec![ - "h2o,state=MA,city=Boston temp=70.4 50", - "h2o,state=MA,city=Boston other_temp=70.4 250", - "h2o,state=CA,city=Boston other_temp=72.4 350", - "o2,state=MA,city=Boston temp=53.4,reading=51 50", - ] - .join("\n"); - - let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); - write_lines(&db, &lines).await; - - // setup to run the execution plan ( - let executor = Executor::default(); - - let predicate = PredicateBuilder::default() - .table("h2o") - .timestamp_range(200, 300) - .add_expr(col("state").eq(lit("MA"))) // state=MA - .build(); - - let plan = db - .field_column_names(predicate) - .await - .expect("Created field_columns plan successfully"); - - let actual = executor - .to_field_list(plan) - .await - .expect("Running fieldlist plan"); - - // Should only have other_temp as a field - let expected = FieldList { - fields: vec![Field { - name: "other_temp".into(), - data_type: DataType::Float64, - last_timestamp: 250, - }], - }; - - assert_eq!( - expected, actual, - "Expected:\n{:#?}\nActual:\n{:#?}", - expected, actual - ); - - Ok(()) - } - #[tokio::test] async fn db_size() { let db = MutableBufferDb::new("column_namedb"); diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index ff648de30b..769f2b9bd9 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -614,39 +614,6 @@ impl Table { )) } - /// Creates a plan that produces an output table with rows that - /// match the predicate for all fields in the table. - /// - /// The output looks like (field0, field1, ..., time) - /// - /// The data is not sorted in any particular order - /// - /// The created plan looks like: - /// - /// Projection (select the field columns needed) - /// Filter(predicate) [optional] - /// InMemoryScan - pub fn field_names_plan( - &self, - chunk_predicate: &ChunkPredicate, - chunk: &Chunk, - ) -> Result { - // Scan and Filter - let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?; - - // Selection - let select_exprs = self - .field_and_time_column_names(chunk_predicate, chunk) - .into_iter() - .map(|c| c.into_expr()) - .collect::>(); - - let plan_builder = plan_builder.project(&select_exprs).context(BuildingPlan)?; - - // and finally create the plan - plan_builder.build().context(BuildingPlan) - } - // Returns (tag_columns, field_columns) vectors with the names of // all tag and field columns, respectively, after any predicates // have been applied. The vectors are sorted by lexically by name. 
@@ -690,42 +657,6 @@ impl Table { Ok((tag_columns, field_columns)) } - // Returns (field_columns and time) in sorted order - fn field_and_time_column_names( - &self, - chunk_predicate: &ChunkPredicate, - chunk: &Chunk, - ) -> ArcStringVec { - let mut field_columns = self - .columns - .iter() - .filter_map(|(column_id, column)| { - match column { - Column::Tag(_, _) => None, // skip tags - _ => { - if chunk_predicate.should_include_field(*column_id) - || chunk_predicate.is_time_column(*column_id) - { - let column_name = chunk - .dictionary - .lookup_id(*column_id) - .expect("Find column name in dictionary"); - Some(Arc::new(column_name.to_string())) - } else { - None - } - } - } - }) - .collect::>(); - - // Sort the field columns too so that the output always comes - // out in a predictable order - field_columns.sort(); - - field_columns - } - /// Returns the column selection for all the columns in this table, orderd /// by table name fn all_columns_selection<'a>(&self, chunk: &'a Chunk) -> Result> { @@ -2091,47 +2022,6 @@ mod tests { assert_eq!(expected, results, "expected output"); } - #[tokio::test] - async fn test_field_name_plan() { - let mut chunk = Chunk::new(42); - let dictionary = &mut chunk.dictionary; - let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); - - let lp_lines = vec![ - // Order this so field3 comes before field2 - // (and thus the columns need to get reordered) - "h2o,tag1=foo,tag2=bar field1=70.6,field3=2 100", - "h2o,tag1=foo,tag2=bar field1=70.4,field2=\"ss\" 100", - "h2o,tag1=foo,tag2=bar field1=70.5,field2=\"ss\" 100", - "h2o,tag1=foo,tag2=bar field1=70.6,field4=true 1000", - ]; - - write_lines_to_table(&mut table, dictionary, lp_lines); - - let predicate = PredicateBuilder::default().timestamp_range(0, 200).build(); - - let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); - - let field_names_set_plan = table - .field_names_plan(&chunk_predicate, &chunk) - .expect("creating the field_name plan"); - - // run the created plan, ensuring the output is as expected - let results = run_plan(field_names_set_plan).await; - - let expected = vec![ - "+--------+--------+--------+--------+------+", - "| field1 | field2 | field3 | field4 | time |", - "+--------+--------+--------+--------+------+", - "| 70.6 | | 2 | | 100 |", - "| 70.4 | ss | | | 100 |", - "| 70.5 | ss | | | 100 |", - "+--------+--------+--------+--------+------+", - ]; - - assert_eq!(expected, results, "expected output"); - } - #[test] fn test_reorder_prefix() { assert_eq!(reorder_prefix_ok(&[], &[]), &[] as &[&str]); diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 10cf95c8b0..a677886989 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -7,9 +7,9 @@ edition = "2018" [dependencies] # In alphabetical order async-trait = "0.1.42" # Microsoft Azure Blob storage integration -# In order to support tokio 1.0 needed to pull in unreleased azure sdk -azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "5ecad7216e1f04c5ff41e7de4667f006664c8cca" } -azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "5ecad7216e1f04c5ff41e7de4667f006664c8cca", default-features = false, features = ["table", "blob"] } +# In order to support tokio 1.0 and delimiters, needed to pull in unreleased azure sdk +azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "14ff9326bb1ba07f98733a548988eccd4532b945" } +azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = 
"14ff9326bb1ba07f98733a548988eccd4532b945", default-features = false, features = ["table", "blob", "queue"] } bytes = "1.0" chrono = "0.4" # Google Cloud Storage integration @@ -26,11 +26,13 @@ rusoto_s3 = "0.46.0" snafu = { version = "0.6.10", features = ["futures"] } tokio = { version = "1.0", features = ["macros", "fs"] } # Filesystem integration -tokio-util = "0.6.2" +tokio-util = { version = "0.6.3", features = [ "io" ] } reqwest = "0.11" # Filesystem integration walkdir = "2" +tempfile = "3.1.0" [dev-dependencies] # In alphabetical order dotenv = "0.15.0" tempfile = "3.1.0" +futures-test = "0.3.12" diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs index a0b27d3fa5..8d8326a6a7 100644 --- a/object_store/src/aws.rs +++ b/object_store/src/aws.rs @@ -1,6 +1,7 @@ //! This module contains the IOx implementation for using S3 as the object //! store. use crate::{ + buffer::slurp_stream_tempfile, path::{cloud::CloudPath, DELIMITER}, ListResult, ObjectMeta, ObjectStoreApi, }; @@ -94,6 +95,9 @@ pub enum Error { source: chrono::ParseError, bucket: String, }, + + #[snafu(display("Unable to buffer data into temporary file, Error: {}", source))] + UnableToBufferStream { source: std::io::Error }, } /// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/). @@ -120,11 +124,20 @@ impl ObjectStoreApi for AmazonS3 { CloudPath::default() } - async fn put(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()> + async fn put(&self, location: &Self::Path, bytes: S, length: Option) -> Result<()> where S: Stream> + Send + Sync + 'static, { - let bytes = ByteStream::new_with_size(bytes, length); + let bytes = match length { + Some(length) => ByteStream::new_with_size(bytes, length), + None => { + let bytes = slurp_stream_tempfile(bytes) + .await + .context(UnableToBufferStream)?; + let length = bytes.size(); + ByteStream::new_with_size(bytes, length) + } + }; let put_request = rusoto_s3::PutObjectRequest { bucket: self.bucket_name.clone(), @@ -582,7 +595,7 @@ mod tests { .put( &location, futures::stream::once(async move { stream_data }), - data.len(), + Some(data.len()), ) .await .unwrap_err(); @@ -618,7 +631,7 @@ mod tests { .put( &location, futures::stream::once(async move { stream_data }), - data.len(), + Some(data.len()), ) .await .unwrap_err(); diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs index bc3ec7f1e9..1455de4e4b 100644 --- a/object_store/src/azure.rs +++ b/object_store/src/azure.rs @@ -1,8 +1,11 @@ //! This module contains the IOx implementation for using Azure Blob storage as //! the object store. 
-use crate::{path::cloud::CloudPath, ListResult, ObjectStoreApi}; +use crate::{ + path::{cloud::CloudPath, DELIMITER}, + ListResult, ObjectMeta, ObjectStoreApi, +}; use async_trait::async_trait; -use azure_core::HttpClient; +use azure_core::prelude::*; use azure_storage::{ clients::{ AsBlobClient, AsContainerClient, AsStorageClient, ContainerClient, StorageAccountClient, @@ -15,8 +18,8 @@ use futures::{ FutureExt, Stream, StreamExt, TryStreamExt, }; use snafu::{ensure, ResultExt, Snafu}; -use std::io; use std::sync::Arc; +use std::{convert::TryInto, io}; /// A specialized `Result` for Azure object store-related errors pub type Result = std::result::Result; @@ -68,7 +71,7 @@ impl ObjectStoreApi for MicrosoftAzure { CloudPath::default() } - async fn put(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()> + async fn put(&self, location: &Self::Path, bytes: S, length: Option) -> Result<()> where S: Stream> + Send + Sync + 'static, { @@ -79,17 +82,19 @@ impl ObjectStoreApi for MicrosoftAzure { .await .expect("Should have been able to collect streaming data"); - ensure!( - temporary_non_streaming.len() == length, - DataDoesNotMatchLength { - actual: temporary_non_streaming.len(), - expected: length, - } - ); + if let Some(length) = length { + ensure!( + temporary_non_streaming.len() == length, + DataDoesNotMatchLength { + actual: temporary_non_streaming.len(), + expected: length, + } + ); + } self.container_client .as_blob_client(&location) - .put_block_blob(&temporary_non_streaming) + .put_block_blob(temporary_non_streaming) .execute() .await .context(UnableToPutData { @@ -166,15 +171,15 @@ impl ObjectStoreApi for MicrosoftAzure { Err(err) => return Some((Err(err), state)), }; - let next_state = if let Some(marker) = resp.incomplete_vector.next_marker() { + let next_state = if let Some(marker) = resp.next_marker { ListState::HasMore(marker.as_str().to_string()) } else { ListState::Done }; let names = resp - .incomplete_vector - .vector + .blobs + .blobs .into_iter() .map(|blob| CloudPath::raw(blob.name)) .collect(); @@ -184,8 +189,55 @@ impl ObjectStoreApi for MicrosoftAzure { .boxed()) } - async fn list_with_delimiter(&self, _prefix: &Self::Path) -> Result> { - unimplemented!(); + async fn list_with_delimiter(&self, prefix: &Self::Path) -> Result> { + let mut request = self.container_client.list_blobs(); + + let prefix = prefix.to_raw(); + + request = request.delimiter(Delimiter::new(DELIMITER)); + request = request.prefix(&*prefix); + + let resp = request.execute().await.context(UnableToListData)?; + + let next_token = resp.next_marker.as_ref().map(|m| m.as_str().to_string()); + + let common_prefixes = resp + .blobs + .blob_prefix + .map(|prefixes| { + prefixes + .iter() + .map(|prefix| CloudPath::raw(&prefix.name)) + .collect() + }) + .unwrap_or_else(Vec::new); + + let objects = resp + .blobs + .blobs + .into_iter() + .map(|blob| { + let location = CloudPath::raw(blob.name); + let last_modified = blob.properties.last_modified; + let size = blob + .properties + .content_length + .try_into() + .expect("unsupported size on this platform"); + + ObjectMeta { + location, + last_modified, + size, + } + }) + .collect(); + + Ok(ListResult { + next_token, + common_prefixes, + objects, + }) } } @@ -233,7 +285,7 @@ impl MicrosoftAzure { #[cfg(test)] mod tests { use super::*; - use crate::tests::put_get_delete_list; + use crate::tests::{list_with_delimiter, put_get_delete_list}; use std::env; type Error = Box; @@ -245,39 +297,35 @@ mod tests { () => { dotenv::dotenv().ok(); - let account 
= env::var("AZURE_STORAGE_ACCOUNT"); - let container = env::var("AZURE_STORAGE_CONTAINER"); + let required_vars = [ + "AZURE_STORAGE_ACCOUNT", + "AZURE_STORAGE_CONTAINER", + "AZURE_STORAGE_MASTER_KEY", + ]; + let unset_vars: Vec<_> = required_vars + .iter() + .filter_map(|&name| match env::var(name) { + Ok(_) => None, + Err(_) => Some(name), + }) + .collect(); + let unset_var_names = unset_vars.join(", "); + let force = std::env::var("TEST_INTEGRATION"); - match (account.is_ok(), container.is_ok(), force.is_ok()) { - (false, false, true) => { - panic!( - "TEST_INTEGRATION is set, \ - but AZURE_STROAGE_ACCOUNT and AZURE_STORAGE_CONTAINER are not" - ) - } - (false, true, true) => { - panic!("TEST_INTEGRATION is set, but AZURE_STORAGE_ACCOUNT is not") - } - (true, false, true) => { - panic!("TEST_INTEGRATION is set, but AZURE_STORAGE_CONTAINER is not") - } - (false, false, false) => { - eprintln!( - "skipping integration test - set \ - AZURE_STROAGE_ACCOUNT and AZURE_STORAGE_CONTAINER to run" - ); - return Ok(()); - } - (false, true, false) => { - eprintln!("skipping integration test - set AZURE_STORAGE_ACCOUNT to run"); - return Ok(()); - } - (true, false, false) => { - eprintln!("skipping integration test - set AZURE_STROAGE_CONTAINER to run"); - return Ok(()); - } - _ => {} + if force.is_ok() && !unset_var_names.is_empty() { + panic!( + "TEST_INTEGRATION is set, \ + but variable(s) {} need to be set", + unset_var_names + ) + } else if force.is_err() && !unset_var_names.is_empty() { + eprintln!( + "skipping Azure integration test - set \ + {} to run", + unset_var_names + ); + return Ok(()); } }; } @@ -291,6 +339,7 @@ mod tests { let integration = MicrosoftAzure::new_from_env(container_name); put_get_delete_list(&integration).await?; + list_with_delimiter(&integration).await?; Ok(()) } diff --git a/object_store/src/buffer.rs b/object_store/src/buffer.rs new file mode 100644 index 0000000000..10d32483b4 --- /dev/null +++ b/object_store/src/buffer.rs @@ -0,0 +1,130 @@ +//! This module contains a `Stream` wrapper that fully consumes (slurps) a +//! `Stream` so it can compute its size, while saving it to a backing store for +//! later replay. +use bytes::Bytes; +use futures::{pin_mut, Stream, StreamExt}; +use std::io::{Cursor, Result, SeekFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::fs::File; +use tokio::io::{copy, AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}; +use tokio_util::io::{ReaderStream, StreamReader}; + +/// Returns a BufferedStream backed by a temporary file. +/// +/// The temporary file will be deleted when the result stream +/// is dropped. +pub async fn slurp_stream_tempfile(bytes: S) -> Result> +where + S: Stream> + Send + Sync, +{ + let tmp = File::from_std(tempfile::tempfile()?); + BufferedStream::new(tmp, bytes).await +} + +/// Returns a BufferedStream backed by a in-memory buffer. +#[allow(dead_code)] +pub async fn slurp_stream_memory(bytes: S) -> Result>>> +where + S: Stream> + Send + Sync, +{ + BufferedStream::new(Cursor::new(Vec::new()), bytes).await +} + +// A stream fully buffered by a backing store.. +pub struct BufferedStream +where + R: AsyncRead + AsyncWrite + AsyncSeek + Unpin, +{ + size: usize, + inner: ReaderStream, +} + +impl BufferedStream +where + R: AsyncRead + AsyncWrite + AsyncSeek + Unpin, +{ + /// Consumes the bytes stream fully and writes its content into file. + /// It returns a Stream implementation that reads the same content from the + /// buffered file. + /// + /// The granularity of stream "chunks" will not be preserved. 
+    pub async fn new<S>(mut backing_store: R, bytes: S) -> Result<Self>
+    where
+        S: Stream<Item = Result<Bytes>> + Send + Sync,
+    {
+        pin_mut!(bytes);
+        let mut read = StreamReader::new(bytes);
+        let size = copy(&mut read, &mut backing_store).await? as usize;
+        backing_store.seek(SeekFrom::Start(0)).await?;
+
+        Ok(Self {
+            size,
+            inner: ReaderStream::new(backing_store),
+        })
+    }
+
+    pub fn size(&self) -> usize {
+        self.size
+    }
+}
+
+impl<R> Stream for BufferedStream<R>
+where
+    R: AsyncRead + AsyncWrite + AsyncSeek + Unpin,
+{
+    type Item = Result<Bytes>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.size, Some(self.size()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::stream::{self, TryStreamExt};
+    use futures_test::stream::StreamTestExt;
+
+    fn test_data() -> impl Stream<Item = Result<Bytes>> + Send + Sync {
+        stream::iter(vec!["foo", "bar", "baz"])
+            .map(|i| Ok(Bytes::from(i)))
+            .interleave_pending()
+    }
+
+    async fn check_stream<R>(buf_stream: BufferedStream<R>) -> Result<()>
+    where
+        R: AsyncRead + AsyncWrite + AsyncSeek + Unpin,
+    {
+        assert_eq!(buf_stream.size(), 9);
+        assert_eq!(buf_stream.size_hint(), (9, Some(9)));
+
+        let content = buf_stream
+            .map_ok(|b| bytes::BytesMut::from(&b[..]))
+            .try_concat()
+            .await?;
+
+        assert_eq!(content, "foobarbaz");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_buffered_stream() -> Result<()> {
+        let backing_store = std::io::Cursor::new(Vec::new()); // in-memory buffer
+        check_stream(BufferedStream::new(backing_store, test_data()).await?).await
+    }
+
+    #[tokio::test]
+    async fn test_slurp_stream_tempfile() -> Result<()> {
+        check_stream(slurp_stream_tempfile(test_data()).await?).await
+    }
+
+    #[tokio::test]
+    async fn test_slurp_stream_memory() -> Result<()> {
+        check_stream(slurp_stream_memory(test_data()).await?).await
+    }
+}
diff --git a/object_store/src/disk.rs b/object_store/src/disk.rs
index 5eece641da..be80a37daa 100644
--- a/object_store/src/disk.rs
+++ b/object_store/src/disk.rs
@@ -76,7 +76,7 @@ impl ObjectStoreApi for File {
         FilePath::default()
     }
 
-    async fn put<S>(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()>
+    async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
     where
         S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
     {
@@ -86,13 +86,15 @@ impl ObjectStoreApi for File {
             .await
             .context(UnableToStreamDataIntoMemory)?;
 
-        ensure!(
-            content.len() == length,
-            DataDoesNotMatchLength {
-                actual: content.len(),
-                expected: length,
-            }
-        );
+        if let Some(length) = length {
+            ensure!(
+                content.len() == length,
+                DataDoesNotMatchLength {
+                    actual: content.len(),
+                    expected: length,
+                }
+            );
+        }
 
         let path = self.path(location);
 
@@ -290,7 +292,7 @@ mod tests {
         let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
         let mut location = integration.new_path();
        location.set_file_name("junk");
-        let res = integration.put(&location, bytes, 0).await;
+        let res = integration.put(&location, bytes, Some(0)).await;
 
         assert!(matches!(
             res.err().unwrap(),
@@ -317,7 +319,36 @@ mod tests {
             .put(
                 &location,
                 futures::stream::once(async move { stream_data }),
-                data.len(),
+                Some(data.len()),
+            )
+            .await?;
+
+        let read_data = integration
+            .get(&location)
+            .await?
+ .map_ok(|b| bytes::BytesMut::from(&b[..])) + .try_concat() + .await?; + assert_eq!(&*read_data, data); + + Ok(()) + } + + #[tokio::test] + async fn unknown_length() -> Result<()> { + let root = TempDir::new()?; + let integration = File::new(root.path()); + + let data = Bytes::from("arbitrary data"); + let stream_data = std::io::Result::Ok(data.clone()); + + let mut location = integration.new_path(); + location.set_file_name("some_file"); + integration + .put( + &location, + futures::stream::once(async move { stream_data }), + None, ) .await?; diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs index 9abd46656d..ac60170508 100644 --- a/object_store/src/gcp.rs +++ b/object_store/src/gcp.rs @@ -84,7 +84,7 @@ impl ObjectStoreApi for GoogleCloudStorage { CloudPath::default() } - async fn put(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()> + async fn put(&self, location: &Self::Path, bytes: S, length: Option) -> Result<()> where S: Stream> + Send + Sync + 'static, { @@ -95,13 +95,15 @@ impl ObjectStoreApi for GoogleCloudStorage { .expect("Should have been able to collect streaming data") .to_vec(); - ensure!( - temporary_non_streaming.len() == length, - DataDoesNotMatchLength { - actual: temporary_non_streaming.len(), - expected: length, - } - ); + if let Some(length) = length { + ensure!( + temporary_non_streaming.len() == length, + DataDoesNotMatchLength { + actual: temporary_non_streaming.len(), + expected: length, + } + ); + } let location = location.to_raw(); let location_copy = location.clone(); @@ -418,7 +420,7 @@ mod test { .put( &location, futures::stream::once(async move { stream_data }), - data.len(), + Some(data.len()), ) .await .unwrap_err(); diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 81607bb219..f283e5aea1 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -18,6 +18,7 @@ pub mod aws; pub mod azure; +mod buffer; pub mod disk; pub mod gcp; pub mod memory; @@ -54,7 +55,7 @@ pub trait ObjectStoreApi: Send + Sync + 'static { &self, location: &Self::Path, bytes: S, - length: usize, + length: Option, ) -> Result<(), Self::Error> where S: Stream> + Send + Sync + 'static; @@ -130,7 +131,7 @@ impl ObjectStoreApi for ObjectStore { } } - async fn put(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()> + async fn put(&self, location: &Self::Path, bytes: S, length: Option) -> Result<()> where S: Stream> + Send + Sync + 'static, { @@ -485,7 +486,7 @@ mod tests { .put( &location, futures::stream::once(async move { stream_data }), - data.len(), + Some(data.len()), ) .await?; @@ -551,7 +552,7 @@ mod tests { .put( f, futures::stream::once(async move { stream_data }), - data.len(), + Some(data.len()), ) .await .unwrap(); diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 0650712786..c72deadebf 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -45,7 +45,7 @@ impl ObjectStoreApi for InMemory { DirsAndFileName::default() } - async fn put(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()> + async fn put(&self, location: &Self::Path, bytes: S, length: Option) -> Result<()> where S: Stream> + Send + Sync + 'static, { @@ -55,13 +55,15 @@ impl ObjectStoreApi for InMemory { .await .context(UnableToStreamDataIntoMemory)?; - ensure!( - content.len() == length, - DataDoesNotMatchLength { - actual: content.len(), - expected: length, - } - ); + if let Some(length) = length { + ensure!( + content.len() == length, + DataDoesNotMatchLength { + actual: 
content.len(), + expected: length, + } + ); + } let content = content.freeze(); @@ -201,7 +203,7 @@ mod tests { let bytes = stream::once(async { Ok(Bytes::from("hello world")) }); let mut location = integration.new_path(); location.set_file_name("junk"); - let res = integration.put(&location, bytes, 0).await; + let res = integration.put(&location, bytes, Some(0)).await; assert!(matches!( res.err().unwrap(), @@ -213,4 +215,32 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn unknown_length() -> Result<()> { + let integration = InMemory::new(); + + let data = Bytes::from("arbitrary data"); + let stream_data = std::io::Result::Ok(data.clone()); + + let mut location = integration.new_path(); + location.set_file_name("some_file"); + integration + .put( + &location, + futures::stream::once(async move { stream_data }), + None, + ) + .await?; + + let read_data = integration + .get(&location) + .await? + .map_ok(|b| bytes::BytesMut::from(&b[..])) + .try_concat() + .await?; + assert_eq!(&*read_data, data); + + Ok(()) + } } diff --git a/query/src/exec.rs b/query/src/exec.rs index 4c5cf176ad..404503b294 100644 --- a/query/src/exec.rs +++ b/query/src/exec.rs @@ -28,7 +28,7 @@ use tokio::sync::mpsc::{self, error::SendError}; use snafu::{ResultExt, Snafu}; -use crate::plan::stringset::StringSetPlan; +use crate::plan::{fieldlist::FieldListPlan, stringset::StringSetPlan}; #[derive(Debug, Snafu)] pub enum Error { @@ -170,14 +170,6 @@ impl From> for SeriesSetPlans { } } -/// A plan that can be run to produce a sequence of FieldLists -/// DataFusion plans or a known set of results -#[derive(Debug)] -pub enum FieldListPlan { - Known(Result), - Plans(Vec), -} - /// Handles executing plans, and marshalling the results into rust /// native structures. #[derive(Debug, Default)] @@ -295,46 +287,43 @@ impl Executor { /// Executes `plan` and return the resulting FieldList pub async fn to_field_list(&self, plan: FieldListPlan) -> Result { - match plan { - FieldListPlan::Known(res) => res, - FieldListPlan::Plans(plans) => { - // Run the plans in parallel - let handles = plans - .into_iter() - .map(|plan| { - let counters = Arc::clone(&self.counters); + let FieldListPlan { plans } = plan; - tokio::task::spawn(async move { - let ctx = IOxExecutionContext::new(counters); - let physical_plan = ctx - .prepare_plan(&plan) - .await - .context(DataFusionPhysicalPlanning)?; + // Run the plans in parallel + let handles = plans + .into_iter() + .map(|plan| { + let counters = Arc::clone(&self.counters); - // TODO: avoid this buffering - let fieldlist = ctx - .collect(physical_plan) - .await - .context(FieldListExectuon)? - .into_fieldlist() - .context(FieldListConversion); + tokio::task::spawn(async move { + let ctx = IOxExecutionContext::new(counters); + let physical_plan = ctx + .prepare_plan(&plan) + .await + .context(DataFusionPhysicalPlanning)?; - Ok(fieldlist) - }) - }) - .collect::>(); + // TODO: avoid this buffering + let fieldlist = ctx + .collect(physical_plan) + .await + .context(FieldListExectuon)? 
+ .into_fieldlist() + .context(FieldListConversion); - // collect them all up and combine them - let mut results = Vec::new(); - for join_handle in handles { - let fieldlist = join_handle.await.context(JoinError)???; + Ok(fieldlist) + }) + }) + .collect::>(); - results.push(fieldlist); - } + // collect them all up and combine them + let mut results = Vec::new(); + for join_handle in handles { + let fieldlist = join_handle.await.context(JoinError)???; - results.into_fieldlist().context(FieldListConversion) - } + results.push(fieldlist); } + + results.into_fieldlist().context(FieldListConversion) } /// Run the plan and return a record batch reader for reading the results diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 4b7e985c64..43f3ebf9b7 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -5,7 +5,7 @@ use std::{ use arrow_deps::datafusion::{ error::{DataFusionError, Result as DatafusionResult}, - logical_plan::{Expr, ExpressionVisitor, LogicalPlanBuilder, Operator, Recursion}, + logical_plan::{Expr, ExpressionVisitor, LogicalPlan, LogicalPlanBuilder, Operator, Recursion}, prelude::col, }; use data_types::{ @@ -17,7 +17,10 @@ use tracing::debug; use crate::{ exec::{make_schema_pivot, stringset::StringSet}, - plan::stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder}, + plan::{ + fieldlist::FieldListPlan, + stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder}, + }, predicate::{Predicate, PredicateBuilder}, provider::ProviderBuilder, util::schema_has_all_expr_columns, @@ -188,40 +191,10 @@ impl InfluxRPCPlanner { // entirely using the metadata let mut need_full_plans = BTreeMap::new(); - let no_tables = StringSet::new(); let mut known_columns = BTreeSet::new(); for chunk in self.filtered_chunks(database, &predicate).await? { // try and get the table names that have rows that match the predicate - let table_names = chunk - .table_names(&predicate, &no_tables) - .await - .map_err(|e| Box::new(e) as _) - .context(TableNamePlan)?; - - debug!(table_names=?table_names, chunk_id = chunk.id(), "chunk tables"); - - let table_names = match table_names { - Some(table_names) => { - debug!("found table names with original predicate"); - table_names - } - None => { - // couldn't find table names with predicate, get all chunk tables, - // fall back to filtering ourself - let table_name_predicate = if let Some(table_names) = &predicate.table_names { - PredicateBuilder::new().tables(table_names).build() - } else { - Predicate::default() - }; - chunk - .table_names(&table_name_predicate, &no_tables) - .await - .map_err(|e| Box::new(e) as _) - .context(InternalTableNamePlanForDefault)? - // unwrap the Option - .context(InternalTableNameCannotGetPlanForDefault)? - } - }; + let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?; for table_name in table_names { debug!( @@ -296,6 +269,97 @@ impl InfluxRPCPlanner { .context(CreatingStringSet) } + /// Returns a plan that produces a list of columns and their + /// datatypes (as defined in the data written via `write_lines`), + /// and which have more than zero rows which pass the conditions + /// specified by `predicate`. 
+ pub async fn field_columns( + &self, + database: &D, + predicate: Predicate, + ) -> Result + where + D: Database + 'static, + { + debug!(predicate=?predicate, "planning field_columns"); + + // Algorithm is to run a "select field_cols from table where + // type plan for each table in the chunks" + // + // The executor then figures out which columns have non-null + // values and stops the plan executing once it has them + + // map table -> Vec> + let mut table_chunks = BTreeMap::new(); + let chunks = self.filtered_chunks(database, &predicate).await?; + for chunk in chunks { + let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?; + for table_name in table_names { + table_chunks + .entry(table_name) + .or_insert_with(Vec::new) + .push(Arc::clone(&chunk)); + } + } + + let mut field_list_plan = FieldListPlan::new(); + for (table_name, chunks) in table_chunks { + if let Some(plan) = self + .field_columns_plan(&table_name, &predicate, chunks) + .await? + { + field_list_plan = field_list_plan.append(plan); + } + } + + Ok(field_list_plan) + } + + /// Find all the table names in the specified chunk that pass the predicate + async fn chunk_table_names( + &self, + chunk: &C, + predicate: &Predicate, + ) -> Result> + where + C: PartitionChunk + 'static, + { + let no_tables = StringSet::new(); + + // try and get the table names that have rows that match the predicate + let table_names = chunk + .table_names(&predicate, &no_tables) + .await + .map_err(|e| Box::new(e) as _) + .context(TableNamePlan)?; + + debug!(table_names=?table_names, chunk_id = chunk.id(), "chunk tables"); + + let table_names = match table_names { + Some(table_names) => { + debug!("found table names with original predicate"); + table_names + } + None => { + // couldn't find table names with predicate, get all chunk tables, + // fall back to filtering ourself + let table_name_predicate = if let Some(table_names) = &predicate.table_names { + PredicateBuilder::new().tables(table_names).build() + } else { + Predicate::default() + }; + chunk + .table_names(&table_name_predicate, &no_tables) + .await + .map_err(|e| Box::new(e) as _) + .context(InternalTableNamePlanForDefault)? + // unwrap the Option + .context(InternalTableNameCannotGetPlanForDefault)? + } + }; + Ok(table_names) + } + /// removes any columns from Names that are not "Tag"s in the Influx Data /// Model fn restrict_to_tags(&self, schema: &Schema, names: BTreeSet) -> BTreeSet { @@ -313,9 +377,11 @@ impl InfluxRPCPlanner { /// /// The created plan looks like: /// + /// ```text /// Extension(PivotSchema) /// Filter(predicate) /// TableScan (of chunks) + /// ``` async fn tag_column_names_plan( &self, table_name: &str, @@ -325,6 +391,123 @@ impl InfluxRPCPlanner { where C: PartitionChunk + 'static, { + let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks).await?; + + let TableScanAndFilter { + plan_builder, + schema, + } = match scan_and_filter { + None => return Ok(None), + Some(t) => t, + }; + + // now, select only the tag columns + let select_exprs = schema + .iter() + .filter_map(|(influx_column_type, field)| { + if matches!(influx_column_type, Some(InfluxColumnType::Tag)) { + Some(col(field.name())) + } else { + None + } + }) + .collect::>(); + + let plan = plan_builder + .project(&select_exprs) + .context(BuildingPlan)? 
+ .build() + .context(BuildingPlan)?; + + // And finally pivot the plan + let plan = make_schema_pivot(plan); + debug!(table_name=table_name, plan=%plan.display_indent_schema(), + "created column_name plan for table"); + + Ok(Some(plan.into())) + } + + /// Creates a DataFusion LogicalPlan that returns the timestamp + /// and all field columns for a specified table: + /// + /// The output looks like (field0, field1, ..., time) + /// + /// The data is not sorted in any particular order + /// + /// returns `None` if the table contains no rows that would pass + /// the predicate. + /// + /// The created plan looks like: + /// + /// ```text + /// Projection (select the field columns needed) + /// Filter(predicate) [optional] + /// InMemoryScan + /// ``` + async fn field_columns_plan( + &self, + table_name: &str, + predicate: &Predicate, + chunks: Vec>, + ) -> Result> + where + C: PartitionChunk + 'static, + { + let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks).await?; + let TableScanAndFilter { + plan_builder, + schema, + } = match scan_and_filter { + None => return Ok(None), + Some(t) => t, + }; + + // Selection of only fields and time + let select_exprs = schema + .iter() + .filter_map(|(influx_column_type, field)| match influx_column_type { + Some(InfluxColumnType::Field(_)) => Some(col(field.name())), + Some(InfluxColumnType::Timestamp) => Some(col(field.name())), + Some(_) => None, + None => None, + }) + .collect::>(); + + let plan = plan_builder + .project(&select_exprs) + .context(BuildingPlan)? + .build() + .context(BuildingPlan)?; + + Ok(Some(plan)) + } + + /// Create a plan that scans the specified table, and applies any + /// filtering specified on the predicate, if any. + /// + /// If the table can produce no rows based on predicate + /// evaluation, returns Ok(None) + /// + /// The created plan looks like: + /// + /// ```text + /// Filter(predicate) [optional] + /// InMemoryScan + /// ``` + async fn scan_and_filter( + &self, + table_name: &str, + predicate: &Predicate, + chunks: Vec>, + ) -> Result> + where + C: PartitionChunk + 'static, + { + // Scan all columns to begin with (datafusion projection + // pushdown optimization will prune out uneeded columns later) + let projection = None; + let selection = Selection::All; + // Prepare the scan of the table let mut builder = ProviderBuilder::new(table_name); for chunk in chunks { @@ -339,7 +522,7 @@ impl InfluxRPCPlanner { ); let chunk_table_schema = chunk - .table_schema(table_name, Selection::All) + .table_schema(table_name, selection) .await .map_err(|e| Box::new(e) as _) .context(GettingTableSchema { @@ -355,10 +538,6 @@ impl InfluxRPCPlanner { let provider = builder.build().context(CreatingProvider { table_name })?; let schema = provider.iox_schema(); - // Scan all columns to begin with (datafusion projection - // pushdown optimization will prune out uneeded columns later) - let projection = None; - let mut plan_builder = LogicalPlanBuilder::scan(table_name, Arc::new(provider), projection) .context(BuildingPlan)?; @@ -369,40 +548,25 @@ impl InfluxRPCPlanner { // to evaluate the predicate (if not, it means no rows can // match and thus we should skip this plan) if !schema_has_all_expr_columns(&schema, &filter_expr) { - debug!(table_name=table_name, schema=?schema, filter_expr=?filter_expr, "Skipping table as schema doesn't have all filter_expr columns"); + debug!(table_name=table_name, + schema=?schema, + filter_expr=?filter_expr, + "Skipping table as schema doesn't have all filter_expr columns"); return 
Ok(None); } // Assuming that if a table doesn't have all the columns // in an expression it can't be true isn't correct for // certain predicates (e.g. IS NOT NULL), so error out - // here until we have proper support for that + // here until we have proper support for that case check_predicate_support(&filter_expr)?; plan_builder = plan_builder.filter(filter_expr).context(BuildingPlan)?; } - // now, select only the tag columns - let select_exprs = schema - .iter() - .filter_map(|(influx_column_type, field)| { - if matches!(influx_column_type, Some(InfluxColumnType::Tag)) { - Some(col(field.name())) - } else { - None - } - }) - .collect::>(); - - let plan_builder = plan_builder.project(&select_exprs).context(BuildingPlan)?; - - let plan = plan_builder.build().context(BuildingPlan)?; - - // And finally pivot the plan - let plan = make_schema_pivot(plan); - debug!(table_name=table_name, plan=%plan.display_indent_schema(), - "created column_name plan for table"); - - Ok(Some(plan.into())) + Ok(Some(TableScanAndFilter { + plan_builder, + schema, + })) } /// Returns a list of chunks across all partitions which may @@ -422,7 +586,6 @@ impl InfluxRPCPlanner { let partition_keys = database .partition_keys() - .await .map_err(|e| Box::new(e) as _) .context(ListingPartitions)?; @@ -430,7 +593,7 @@ impl InfluxRPCPlanner { for key in partition_keys { // TODO prune partitions somehow - let partition_chunks = database.chunks(&key).await; + let partition_chunks = database.chunks(&key); for chunk in partition_chunks { let could_pass_predicate = chunk .could_pass_predicate(predicate) @@ -505,3 +668,10 @@ impl ExpressionVisitor for SupportVisitor { } } } + +struct TableScanAndFilter { + /// Represents plan that scans a table and applies optional filtering + plan_builder: LogicalPlanBuilder, + /// The IOx schema of the result + schema: Schema, +} diff --git a/query/src/frontend/sql.rs b/query/src/frontend/sql.rs index 2324875755..ce305c21a0 100644 --- a/query/src/frontend/sql.rs +++ b/query/src/frontend/sql.rs @@ -92,7 +92,6 @@ impl SQLQueryPlanner { let partition_keys = database .partition_keys() - .await .map_err(|e| Box::new(e) as _) .context(GettingDatabasePartition)?; @@ -103,7 +102,7 @@ impl SQLQueryPlanner { let mut builder = ProviderBuilder::new(table_name); for partition_key in &partition_keys { - for chunk in database.chunks(partition_key).await { + for chunk in database.chunks(partition_key) { if chunk.has_table(table_name) { let chunk_id = chunk.id(); let chunk_table_schema = chunk @@ -129,7 +128,7 @@ impl SQLQueryPlanner { .context(CreatingTableProvider { table_name })?; ctx.inner_mut() - .register_table(&table_name, Box::new(provider)); + .register_table(&table_name, Arc::new(provider)); } ctx.prepare_sql(query).await.context(Preparing) diff --git a/query/src/func/selectors.rs b/query/src/func/selectors.rs index 0d22bde0ba..484f5e5843 100644 --- a/query/src/func/selectors.rs +++ b/query/src/func/selectors.rs @@ -672,7 +672,7 @@ mod test { ) .unwrap(); let mut ctx = ExecutionContext::new(); - ctx.register_table("t", Box::new(provider)); + ctx.register_table("t", Arc::new(provider)); let df = ctx.table("t").unwrap(); let df = df.aggregate(&[], &aggs).unwrap(); diff --git a/query/src/lib.rs b/query/src/lib.rs index 48a09817c9..95308bc8ca 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use data_types::{ data::ReplicatedWrite, partition_metadata::TableSummary, schema::Schema, selection::Selection, }; -use exec::{stringset::StringSet, Executor, 
FieldListPlan, SeriesSetPlans};
+use exec::{stringset::StringSet, Executor, SeriesSetPlans};
 use plan::stringset::StringSetPlan;
 
 use std::{fmt::Debug, sync::Arc};
@@ -44,23 +44,17 @@ pub trait Database: Debug + Send + Sync {
     async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error>;
 
     /// Return the partition keys for data in this DB
-    async fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
+    fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
 
     /// Returns a covering set of chunks in the specified partition. A
     /// covering set means that together the chunks make up a single
     /// complete copy of the data being queried.
-    async fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>>;
+    fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>>;
 
     // ----------
     // The functions below are slated for removal (migration into a gRPC query
     // frontend) ---------
 
-    /// Returns a plan that produces a list of column names in this
-    /// database which store fields (as defined in the data written
-    /// via `write_lines`), and which have at least one row which
-    /// matches the conditions listed on `predicate`.
-    async fn field_column_names(&self, predicate: Predicate) -> Result<FieldListPlan, Self::Error>;
-
     /// Returns a plan which finds the distinct values in the
     /// `column_name` column of this database which pass the
     /// conditions specified by `predicate`.
diff --git a/query/src/plan.rs b/query/src/plan.rs
index d2d466080e..b7baba6ffb 100644
--- a/query/src/plan.rs
+++ b/query/src/plan.rs
@@ -1 +1,2 @@
+pub mod fieldlist;
 pub mod stringset;
diff --git a/query/src/plan/fieldlist.rs b/query/src/plan/fieldlist.rs
new file mode 100644
index 0000000000..5e24a00c4e
--- /dev/null
+++ b/query/src/plan/fieldlist.rs
@@ -0,0 +1,20 @@
+use arrow_deps::datafusion::logical_plan::LogicalPlan;
+
+/// A plan which produces a logical set of Fields (e.g. InfluxDB
+/// Fields with name, and data type, and last_timestamp).
+#[derive(Debug, Default)]
+pub struct FieldListPlan {
+    pub plans: Vec<LogicalPlan>,
+}
+
+impl FieldListPlan {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Append a new plan to this list of plans
+    pub fn append(mut self, plan: LogicalPlan) -> Self {
+        self.plans.push(plan);
+        self
+    }
+}
diff --git a/query/src/test.rs b/query/src/test.rs
index 186d85d115..ee1f29afb7 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -1,13 +1,22 @@
 //! This module provides a reference implementaton of `query::DatabaseSource`
 //! and `query::Database` for use in testing.
+//!
+//!
AKA it is a Mock -use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream; +use arrow_deps::{ + arrow::{ + array::{ArrayRef, Int64Array, StringArray}, + datatypes::DataType, + record_batch::RecordBatch, + }, + datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}, +}; use crate::{exec::Executor, group_by::GroupByAndAggregate, plan::stringset::StringSetPlan}; use crate::{ exec::{ stringset::{StringSet, StringSetRef}, - FieldListPlan, SeriesSetPlans, + SeriesSetPlans, }, Database, DatabaseStore, PartitionChunk, Predicate, }; @@ -64,12 +73,6 @@ pub struct TestDatabase { /// The last request for `query_series` query_groups_request: Arc>>, - - /// Responses to return on the next request to `field_column_values` - field_columns_value: Arc>>, - - /// The last request for `query_series` - field_columns_request: Arc>>, } /// Records the parameters passed to a column values request @@ -98,13 +101,6 @@ pub struct QueryGroupsRequest { pub gby_agg: GroupByAndAggregate, } -/// Records the parameters passed to a `field_columns` request -#[derive(Debug, PartialEq, Clone)] -pub struct FieldColumnsRequest { - /// Stringified '{:?}' version of the predicate - pub predicate: String, -} - #[derive(Snafu, Debug)] pub enum TestError { #[snafu(display("Test database error: {}", message))] @@ -233,21 +229,6 @@ impl TestDatabase { .expect("mutex poisoned") .take() } - - /// Set the FieldSet plan that will be returned - pub fn set_field_colum_names_values(&self, plan: FieldListPlan) { - *(Arc::clone(&self.field_columns_value) - .lock() - .expect("mutex poisoned")) = Some(plan); - } - - /// Get the parameters from the last column name request - pub fn get_field_columns_request(&self) -> Option { - Arc::clone(&self.field_columns_request) - .lock() - .expect("mutex poisoned") - .take() - } } /// returns true if this line is within the range of the timestamp @@ -307,27 +288,6 @@ impl Database for TestDatabase { Ok(()) } - async fn field_column_names(&self, predicate: Predicate) -> Result { - // save the request - let predicate = predicate_to_test_string(&predicate); - - let field_columns_request = Some(FieldColumnsRequest { predicate }); - - *Arc::clone(&self.field_columns_request) - .lock() - .expect("mutex poisoned") = field_columns_request; - - // pull out the saved columns - Arc::clone(&self.field_columns_value) - .lock() - .expect("mutex poisoned") - .take() - // Turn None into an error - .context(General { - message: "No saved field_column_name in TestDatabase", - }) - } - /// Return the mocked out column values, recording the request async fn column_values( &self, @@ -402,13 +362,13 @@ impl Database for TestDatabase { } /// Return the partition keys for data in this DB - async fn partition_keys(&self) -> Result, Self::Error> { + fn partition_keys(&self) -> Result, Self::Error> { let partitions = self.partitions.lock().expect("mutex poisoned"); let keys = partitions.keys().cloned().collect(); Ok(keys) } - async fn chunks(&self, partition_key: &str) -> Vec> { + fn chunks(&self, partition_key: &str) -> Vec> { let partitions = self.partitions.lock().expect("mutex poisoned"); if let Some(chunks) = partitions.get(partition_key) { chunks.values().cloned().collect() @@ -428,6 +388,9 @@ pub struct TestChunk { /// Column names: table_name -> Schema table_schemas: BTreeMap, + /// RecordBatches that are returned on each request + table_data: BTreeMap>>, + /// A saved error that is returned instead of actual results saved_error: Option, } @@ -463,7 +426,7 @@ impl TestChunk { /// 
Register an tag column with the test chunk pub fn with_tag_column( - mut self, + self, table_name: impl Into, column_name: impl Into, ) -> Self { @@ -474,6 +437,43 @@ impl TestChunk { // merge it in to any existing schema let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap(); + self.add_schema_to_table(table_name, new_column_schema) + } + + /// Register a timetamp column with the test chunk + pub fn with_time_column(self, table_name: impl Into) -> Self { + let table_name = table_name.into(); + + // make a new schema with the specified column and + // merge it in to any existing schema + let new_column_schema = SchemaBuilder::new().timestamp().build().unwrap(); + + self.add_schema_to_table(table_name, new_column_schema) + } + + /// Register an int field column with the test chunk + pub fn with_int_field_column( + self, + table_name: impl Into, + column_name: impl Into, + ) -> Self { + let column_name = column_name.into(); + + // make a new schema with the specified column and + // merge it in to any existing schema + let new_column_schema = SchemaBuilder::new() + .field(&column_name, DataType::Int64) + .build() + .unwrap(); + self.add_schema_to_table(table_name, new_column_schema) + } + + fn add_schema_to_table( + mut self, + table_name: impl Into, + new_column_schema: Schema, + ) -> Self { + let table_name = table_name.into(); let mut merger = SchemaMerger::new().merge(new_column_schema).unwrap(); if let Some(existing_schema) = self.table_schemas.remove(&table_name) { @@ -496,6 +496,37 @@ impl TestChunk { //.map(|v| v.clone()) .cloned() } + + /// Prepares this chunk to return a specific record batch with one + /// row of non null data. + pub fn with_one_row_of_null_data(mut self, table_name: impl Into) -> Self { + let table_name = table_name.into(); + let schema = self + .table_schemas + .get(&table_name) + .expect("table must exist in TestChunk"); + + // create arays + let columns = schema + .iter() + .map(|(_influxdb_column_type, field)| match field.data_type() { + DataType::Int64 => Arc::new(Int64Array::from(vec![1000])) as ArrayRef, + DataType::Utf8 => Arc::new(StringArray::from(vec!["MA"])) as ArrayRef, + _ => unimplemented!( + "Unimplemented data type for test database: {:?}", + field.data_type() + ), + }) + .collect::>(); + + let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch"); + + self.table_data + .entry(table_name) + .or_default() + .push(Arc::new(batch)); + self + } } #[async_trait] @@ -514,11 +545,21 @@ impl PartitionChunk for TestChunk { async fn read_filter( &self, - _table_name: &str, - _predicate: &Predicate, + table_name: &str, + predicate: &Predicate, _selection: Selection<'_>, ) -> Result { - unimplemented!() + self.check_error()?; + + // save the predicate + self.predicate + .lock() + .expect("mutex poisoned") + .replace(predicate.clone()); + + let batches = self.table_data.get(table_name).expect("Table had data"); + let stream = SizedRecordBatchStream::new(batches[0].schema(), batches.clone()); + Ok(Box::pin(stream)) } async fn table_names( @@ -562,8 +603,8 @@ impl PartitionChunk for TestChunk { }) } - fn has_table(&self, _table_name: &str) -> bool { - unimplemented!() + fn has_table(&self, table_name: &str) -> bool { + self.table_schemas.contains_key(table_name) } async fn column_names( diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 9068e35107..3ff8399f40 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -273,6 +273,7 @@ impl Chunk { &self, table_name: &str, 
predicate: &Predicate, + columns: Selection<'_>, dst: BTreeSet, ) -> BTreeSet { let chunk_data = self.chunk_data.read().unwrap(); @@ -280,7 +281,7 @@ impl Chunk { // TODO(edd): same potential contention as `table_names` but I'm ok // with this for now. match chunk_data.data.get(table_name) { - Some(table) => table.column_names(predicate, dst), + Some(table) => table.column_names(predicate, columns, dst), None => dst, } } diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index 24bcf80218..7a453fee4d 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -460,14 +460,17 @@ impl Database { Ok(names) } - /// Returns the distinct set of column names (tag keys) that satisfy the - /// provided predicate. + /// Returns the distinct set of column names that satisfy the provided + /// predicate. Columns can be limited via a selection, which means callers + /// that know they are only interested in certain columns can specify those + /// and reduce total execution time. pub fn column_names( &self, partition_key: &str, table_name: &str, chunk_ids: &[u32], predicate: Predicate, + only_columns: Selection<'_>, ) -> Result>> { let partition_data = self.data.read().unwrap(); @@ -493,7 +496,7 @@ impl Database { // the dst buffer is pushed into each chunk's `column_names` // implementation ensuring that we short-circuit any tables where // we have already determined column names. - chunk.column_names(table_name, &predicate, dst) + chunk.column_names(table_name, &predicate, only_columns, dst) }); Ok(Some(names)) @@ -783,7 +786,7 @@ mod test { array::{ ArrayRef, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray, UInt64Array, }, - datatypes::DataType::{Boolean, Float64, Int64, UInt64}, + datatypes::DataType::{Boolean, Float64, Int64, UInt64, Utf8}, }; use data_types::schema::builder::SchemaBuilder; @@ -1092,7 +1095,13 @@ mod test { // Just query against the first chunk. let result = db - .column_names("hour_1", "Utopia", &[22], Predicate::default()) + .column_names( + "hour_1", + "Utopia", + &[22], + Predicate::default(), + Selection::All, + ) .unwrap(); assert_eq!( @@ -1100,14 +1109,26 @@ mod test { Some(to_set(&["counter", "region", "sketchy_sensor", "time"])) ); let result = db - .column_names("hour_1", "Utopia", &[40], Predicate::default()) + .column_names( + "hour_1", + "Utopia", + &[40], + Predicate::default(), + Selection::All, + ) .unwrap(); assert_eq!(result, Some(to_set(&["active", "time"]))); // And now the union across all chunks. 
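
Aside: the new `only_columns: Selection<'_>` argument is what lets a caller skip columns it does not care about. A minimal usage sketch, assuming `db`, `Predicate`, and `Selection` are in scope exactly as in the surrounding tests (the partition key, table name, and chunk ids are illustrative):

```rust
// Every column with at least one non-null value in a row matching the predicate:
let all_columns = db
    .column_names(
        "hour_1",
        "Utopia",
        &[22, 40],
        Predicate::default(),
        Selection::All,
    )
    .unwrap();

// Only consider the named columns; everything else is skipped entirely, which
// is the "reduce total execution time" case described in the doc comment.
let just_region = db
    .column_names(
        "hour_1",
        "Utopia",
        &[22, 40],
        Predicate::default(),
        Selection::Some(&["region"]),
    )
    .unwrap();
```

`Selection::All` preserves the old behaviour, which is why the existing call sites in the tests below only needed the extra argument.
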
let result = db - .column_names("hour_1", "Utopia", &[22, 40], Predicate::default()) + .column_names( + "hour_1", + "Utopia", + &[22, 40], + Predicate::default(), + Selection::All, + ) .unwrap(); assert_eq!( @@ -1128,6 +1149,7 @@ mod test { "Utopia", &[22, 40], Predicate::new(vec![BinaryExpr::from(("time", "=", 30_i64))]), + Selection::All, ) .unwrap(); @@ -1141,6 +1163,7 @@ mod test { "Utopia", &[22, 40], Predicate::new(vec![BinaryExpr::from(("active", "=", true))]), + Selection::All, ) .unwrap(); @@ -1161,6 +1184,7 @@ mod test { .non_null_field("counter", Float64) .field("sketchy_sensor", Int64) .non_null_field("active", Boolean) + .field("msg", Utf8) .timestamp() .build() .unwrap(); @@ -1171,6 +1195,11 @@ mod test { Arc::new(Float64Array::from(vec![1.2, 300.3, 4500.3])), Arc::new(Int64Array::from(vec![None, Some(33), Some(44)])), Arc::new(BooleanArray::from(vec![true, false, false])), + Arc::new(StringArray::from(vec![ + Some("message a"), + Some("message b"), + None, + ])), Arc::new(Int64Array::from(vec![i, 2 * i, 3 * i])), ]; @@ -1212,6 +1241,11 @@ mod test { &exp_sketchy_sensor_values, ); assert_rb_column_equals(&first_row_group, "active", &exp_active_values); + assert_rb_column_equals( + &first_row_group, + "msg", + &Values::String(vec![Some("message a")]), + ); assert_rb_column_equals(&first_row_group, "time", &Values::I64(vec![100])); // first row from first record batch let second_row_group = itr.next().unwrap(); @@ -1314,6 +1348,7 @@ mod test { .non_null_field("counter", UInt64) .field("sketchy_sensor", UInt64) .non_null_field("active", Boolean) + .non_null_field("msg", Utf8) .timestamp() .build() .unwrap(); @@ -1325,6 +1360,7 @@ mod test { Arc::new(UInt64Array::from(vec![1000, 3000, 5000])), Arc::new(UInt64Array::from(vec![Some(44), None, Some(55)])), Arc::new(BooleanArray::from(vec![true, true, false])), + Arc::new(StringArray::from(vec![Some("msg a"), Some("msg b"), None])), Arc::new(Int64Array::from(vec![i, 20 + i, 30 + i])), ]; @@ -1363,6 +1399,7 @@ mod test { ("active", AggregateType::Count), ("active", AggregateType::Min), ("active", AggregateType::Max), + ("msg", AggregateType::Max), ], ) .unwrap(); @@ -1381,6 +1418,7 @@ mod test { assert_rb_column_equals(&result, "active_count", &Values::U64(vec![3])); assert_rb_column_equals(&result, "active_min", &Values::Bool(vec![Some(false)])); assert_rb_column_equals(&result, "active_max", &Values::Bool(vec![Some(true)])); + assert_rb_column_equals(&result, "msg_max", &Values::String(vec![Some("msg b")])); // // With group keys diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index a454261ec6..1ef1d0cfae 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -22,7 +22,10 @@ use arrow_deps::{ arrow, datafusion::logical_plan::Expr as DfExpr, datafusion::scalar::ScalarValue as DFScalarValue, }; -use data_types::schema::{InfluxColumnType, Schema}; +use data_types::{ + schema::{InfluxColumnType, Schema}, + selection::Selection, +}; /// The name used for a timestamp column. pub const TIME_COLUMN_NAME: &str = data_types::TIME_COLUMN_NAME; @@ -46,7 +49,7 @@ pub enum Error { pub type Result = std::result::Result; /// A `RowGroup` is an immutable horizontal chunk of a single `Table`. By -/// definition it has the same schema as all the other read groups in the table. +/// definition it has the same schema as all the other row groups in the table. /// All the columns within the `RowGroup` must have the same number of logical /// rows. 
pub struct RowGroup { @@ -135,7 +138,7 @@ impl RowGroup { } } - /// The total estimated size in bytes of the read group + /// The total estimated size in bytes of the row group pub fn size(&self) -> u64 { let base_size = std::mem::size_of::() + self @@ -941,7 +944,12 @@ impl RowGroup { /// /// If you are familiar with InfluxDB, this is essentially an implementation /// of `SHOW TAG KEYS`. - pub fn column_names(&self, predicate: &Predicate, dst: &mut BTreeSet) { + pub fn column_names( + &self, + predicate: &Predicate, + columns: Selection<'_>, + dst: &mut BTreeSet, + ) { // Determine the set of columns in this row group that are not already // present in `dst`, i.e., they haven't been identified in other row // groups already. @@ -951,7 +959,16 @@ impl RowGroup { .filter_map(|(name, &id)| match dst.contains(name) { // N.B there is bool::then() but it's currently unstable. true => None, - false => Some((name, &self.columns[id])), + false => match columns { + Selection::All => Some((name, &self.columns[id])), + Selection::Some(names) => { + if names.iter().any(|selection| name == selection) { + Some((name, &self.columns[id])) + } else { + None + } + } + }, }) .collect::>(); @@ -1019,6 +1036,9 @@ impl From for RowGroup { arrow::datatypes::DataType::Boolean => { Column::from(arrow::array::BooleanArray::from(arrow_column.data())) } + arrow::datatypes::DataType::Utf8 => { + Column::from(arrow::array::StringArray::from(arrow_column.data())) + } dt => unimplemented!( "data type {:?} currently not supported for field columns", dt @@ -1495,6 +1515,11 @@ impl MetaData { self.columns_size += column_size; } + // Returns meta information about the column. + fn column_meta(&self, name: ColumnName<'_>) -> &ColumnMeta { + self.columns.get(name).unwrap() + } + // Extract schema information for a set of columns. fn schema_for_column_names( &self, @@ -2193,13 +2218,13 @@ west,4 // columns read_group_all_rows_all_rle(&row_group); - // test read group queries that group on fewer than five columns. + // test row group queries that group on fewer than five columns. read_group_hash_u128_key(&row_group); - // test read group queries that use a vector-based group key. + // test row group queries that use a vector-based group key. read_group_hash_vec_key(&row_group); - // test read group queries that only group on one column. + // test row group queries that only group on one column. read_group_single_groupby_column(&row_group); } @@ -2834,16 +2859,21 @@ west,host-d,11,9 &[Some("Thinking"), Some("of"), Some("a"), Some("place")][..], )); columns.insert("track".to_string(), track); + let temp = ColumnType::Field(Column::from( + &[Some("hot"), Some("cold"), Some("cold"), Some("warm")][..], + )); + columns.insert("temp".to_string(), temp); + let tc = ColumnType::Time(Column::from(&[100_i64, 200, 500, 600][..])); columns.insert("time".to_string(), tc); let row_group = RowGroup::new(4, columns); // No predicate - just find a value in each column that matches. 
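
Aside: the filter added inside `RowGroup::column_names` is a two-part test: ignore any column already present in `dst`, then apply the optional selection. A self-contained sketch of just that rule; the `Selection` enum is re-declared locally only so the snippet compiles on its own (the real type lives in `data_types::selection`):

```rust
use std::collections::BTreeSet;

#[derive(Clone, Copy)]
enum Selection<'a> {
    All,
    Some(&'a [&'a str]),
}

fn is_candidate(name: &str, dst: &BTreeSet<String>, columns: Selection<'_>) -> bool {
    // Already answered by an earlier row group: short-circuit.
    if dst.contains(name) {
        return false;
    }
    // Otherwise the column only qualifies if the caller asked for it.
    match columns {
        Selection::All => true,
        Selection::Some(names) => names.iter().any(|selection| *selection == name),
    }
}

fn main() {
    let mut dst = BTreeSet::new();
    dst.insert("time".to_string());

    assert!(!is_candidate("time", &dst, Selection::All)); // already found
    assert!(is_candidate("region", &dst, Selection::All));
    assert!(!is_candidate("region", &dst, Selection::Some(&["temp"])));
    assert!(is_candidate("temp", &dst, Selection::Some(&["temp"])));
}
```

(The real implementation expresses this with `filter_map`; as its own comment notes, `bool::then()` would read more tersely but was still considered unstable at the time.)
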
let mut dst = BTreeSet::new(); - row_group.column_names(&Predicate::default(), &mut dst); + row_group.column_names(&Predicate::default(), Selection::All, &mut dst); assert_eq!( dst, - vec!["region", "time", "track"] + vec!["region", "temp", "time", "track"] .into_iter() .map(|s| s.to_owned()) .collect() @@ -2853,6 +2883,7 @@ west,host-d,11,9 let mut dst = BTreeSet::new(); row_group.column_names( &Predicate::new(vec![BinaryExpr::from(("region", "=", "east"))]), + Selection::All, &mut dst, ); assert!(dst.is_empty()); @@ -2862,16 +2893,17 @@ west,host-d,11,9 let mut dst = BTreeSet::new(); let names = row_group.column_names( &Predicate::new(vec![BinaryExpr::from(("track", "=", "place"))]), + Selection::All, &mut dst, ); // query matches one row. // - // region, track, time - // NULL , place, 600 + // region, temp, track, time + // NULL , warm, place, 600 // assert_eq!( dst, - vec!["track", "time"] + vec!["temp", "time", "track",] .into_iter() .map(|s| s.to_owned()) .collect() @@ -2883,16 +2915,35 @@ west,host-d,11,9 let rc = ColumnType::Tag(Column::from(&[Some("prod")][..])); columns.insert("env".to_string(), rc); let tc = ColumnType::Time(Column::from(&[100_i64][..])); + let temp = ColumnType::Field(Column::from(&[Some("hot")][..])); + columns.insert("temp".to_string(), temp); + columns.insert("time".to_string(), tc); let row_group = RowGroup::new(1, columns); - row_group.column_names(&Predicate::default(), &mut dst); + row_group.column_names(&Predicate::default(), Selection::All, &mut dst); assert_eq!( dst, - vec!["env", "time", "track"] + vec!["env", "temp", "time", "track"] .into_iter() .map(|s| s.to_owned()) .collect() ); + + // just tag keys + dst.clear(); + row_group.column_names(&Predicate::default(), Selection::Some(&["env"]), &mut dst); + assert_eq!( + dst.iter().cloned().collect::>(), + vec!["env".to_owned()], + ); + + // just field keys + dst.clear(); + row_group.column_names(&Predicate::default(), Selection::Some(&["temp"]), &mut dst); + assert_eq!( + dst.iter().cloned().collect::>(), + vec!["temp".to_owned()], + ); } } diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index a1c95982bd..937f49e8d4 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -13,7 +13,6 @@ use snafu::{ensure, Snafu}; use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup}; use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema}; use crate::value::{AggregateResult, Scalar, Value}; - #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("cannot drop last row group in table; drop table"))] @@ -431,10 +430,12 @@ impl Table { /// /// Optionally a predicate may be provided. In such a case only column names /// will be returned belonging to columns whom have at least one non-null - /// value for any row satisfying the predicate. + /// value for any row satisfying the predicate. Finally, the caller can + /// specify a set of column names to limit execution to only those. pub fn column_names( &self, predicate: &Predicate, + columns: Selection<'_>, mut dst: BTreeSet, ) -> BTreeSet { let table_data = self.table_data.read().unwrap(); @@ -459,7 +460,7 @@ impl Table { // lock. 
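
Aside: as the comments around this code point out, `dst` is deliberately threaded through every row group (and, one level up, through every chunk) so that columns already found are never re-examined. A self-contained sketch of that accumulation pattern, with the per-row-group scan reduced to a stub (which columns "qualify" is hard-coded here; the real code checks the predicate against the column data):

```rust
use std::collections::BTreeSet;

// Stand-in for one row group's contribution: it only looks at columns that are
// not already in `dst` (the short-circuit) and inserts whatever qualifies.
fn row_group_column_names(found_here: &[&str], dst: &mut BTreeSet<String>) {
    for name in found_here {
        if !dst.contains(*name) {
            dst.insert((*name).to_string());
        }
    }
}

// Sketch of the `Table::column_names` shape: thread one growing set through
// every row group that survives predicate-based filtering.
fn table_column_names(row_groups: &[Vec<&str>], mut dst: BTreeSet<String>) -> BTreeSet<String> {
    for rg in row_groups {
        row_group_column_names(rg, &mut dst);
    }
    dst
}

fn main() {
    let row_groups = vec![vec!["region", "time"], vec!["region", "temp"]];
    let dst = table_column_names(&row_groups, BTreeSet::new());
    // "region" is only resolved once even though both row groups contain it.
    assert_eq!(
        dst.into_iter().collect::<Vec<_>>(),
        vec!["region", "temp", "time"]
    );
}
```
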
let (_, row_groups) = self.filter_row_groups(predicate); for row_group in row_groups { - row_group.column_names(predicate, &mut dst); + row_group.column_names(predicate, columns, &mut dst); } dst @@ -1296,6 +1297,7 @@ west,host-b,100 let rc = ColumnType::Tag(Column::from(&["west", "south", "north"][..])); columns.insert("region".to_string(), rc); + let rg = RowGroup::new(3, columns); let mut table = Table::new("cpu".to_owned(), rg); @@ -1306,6 +1308,7 @@ west,host-b,100 let rc = ColumnType::Tag(Column::from(vec![Some("north"), None, None].as_slice())); columns.insert("region".to_string(), rc); + let rg = RowGroup::new(3, columns); table.add_row_group(rg); @@ -1322,7 +1325,7 @@ west,host-b,100 // NULL, 400 let mut dst: BTreeSet = BTreeSet::new(); - dst = table.column_names(&Predicate::default(), dst); + dst = table.column_names(&Predicate::default(), Selection::All, dst); assert_eq!( dst.iter().cloned().collect::>(), @@ -1330,7 +1333,7 @@ west,host-b,100 ); // re-run and get the same answer - dst = table.column_names(&Predicate::default(), dst); + dst = table.column_names(&Predicate::default(), Selection::All, dst); assert_eq!( dst.iter().cloned().collect::>(), vec!["region".to_owned(), "time".to_owned()], @@ -1340,6 +1343,7 @@ west,host-b,100 // region from previous results. dst = table.column_names( &Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]), + Selection::All, dst, ); assert_eq!( @@ -1350,6 +1354,7 @@ west,host-b,100 // wipe the destination buffer and region won't show up dst = table.column_names( &Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]), + Selection::All, BTreeSet::new(), ); assert_eq!( diff --git a/server/src/buffer.rs b/server/src/buffer.rs index 66684a49ee..2977cbcef7 100644 --- a/server/src/buffer.rs +++ b/server/src/buffer.rs @@ -399,7 +399,7 @@ impl Segment { .put( &location, futures::stream::once(async move { stream_data }), - len, + Some(len), ) .await { diff --git a/server/src/db.rs b/server/src/db.rs index 94eb32366d..95fb707c00 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -126,11 +126,10 @@ impl Db { // Return a list of all chunks in the mutable_buffer (that can // potentially be migrated into the read buffer or object store) - pub async fn mutable_buffer_chunks(&self, partition_key: &str) -> Vec> { + pub fn mutable_buffer_chunks(&self, partition_key: &str) -> Vec> { let chunks = if let Some(mutable_buffer) = self.mutable_buffer.as_ref() { mutable_buffer .chunks(partition_key) - .await .into_iter() .map(DBChunk::new_mb) .collect() @@ -141,7 +140,7 @@ impl Db { } /// List chunks that are currently in the read buffer - pub async fn read_buffer_chunks(&self, partition_key: &str) -> Vec> { + pub fn read_buffer_chunks(&self, partition_key: &str) -> Vec> { self.read_buffer .chunk_ids(partition_key) .into_iter() @@ -247,14 +246,14 @@ impl Database for Db { type Chunk = DBChunk; /// Return a covering set of chunks for a particular partition - async fn chunks(&self, partition_key: &str) -> Vec> { + fn chunks(&self, partition_key: &str) -> Vec> { // return a coverting set of chunks. TODO include read buffer // chunks and take them preferentially from the read buffer. 
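
Aside: the covering-set merge in `Db::chunks` leans on a property of `collect()` into a map: when two entries share a key, the later one wins. Because the read buffer iterator is chained after the mutable buffer iterator, a read buffer chunk replaces a mutable buffer chunk with the same id. A self-contained sketch of that behaviour, assuming the map is keyed by chunk id as the surrounding comment implies (ids and labels are made up):

```rust
use std::collections::BTreeMap;

fn main() {
    // Chunk ids paired with a label saying where the chunk lives.
    let mutable = vec![
        (0u32, "mutable buffer chunk 0"),
        (1, "mutable buffer chunk 1"),
        (2, "mutable buffer chunk 2"),
    ];
    let read_buffer = vec![(0, "read buffer chunk 0"), (1, "read buffer chunk 1")];

    // Later entries overwrite earlier ones on key collision, so the read
    // buffer copies of chunks 0 and 1 are the ones that survive.
    let chunks: BTreeMap<u32, &str> = mutable.into_iter().chain(read_buffer).collect();

    assert_eq!(
        chunks.into_values().collect::<Vec<_>>(),
        vec![
            "read buffer chunk 0",
            "read buffer chunk 1",
            "mutable buffer chunk 2"
        ]
    );
}
```
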
// returns a coverting set of chunks -- aka take chunks from read buffer // preferentially - let mutable_chunk_iter = self.mutable_buffer_chunks(partition_key).await.into_iter(); + let mutable_chunk_iter = self.mutable_buffer_chunks(partition_key).into_iter(); - let read_buffer_chunk_iter = self.read_buffer_chunks(partition_key).await.into_iter(); + let read_buffer_chunk_iter = self.read_buffer_chunks(partition_key).into_iter(); let chunks: BTreeMap<_, _> = mutable_chunk_iter .chain(read_buffer_chunk_iter) @@ -277,18 +276,6 @@ impl Database for Db { .context(MutableBufferWrite) } - async fn field_column_names( - &self, - predicate: query::predicate::Predicate, - ) -> Result { - self.mutable_buffer - .as_ref() - .context(DatabaseNotReadable)? - .field_column_names(predicate) - .await - .context(MutableBufferRead) - } - async fn column_values( &self, column_name: &str, @@ -327,12 +314,11 @@ impl Database for Db { .context(MutableBufferRead) } - async fn partition_keys(&self) -> Result, Self::Error> { + fn partition_keys(&self) -> Result, Self::Error> { self.mutable_buffer .as_ref() .context(DatabaseNotReadable)? .partition_keys() - .await .context(MutableBufferRead) } } @@ -392,7 +378,7 @@ mod tests { let db = make_db(); let mut writer = TestLPWriter::default(); writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap(); - assert_eq!(vec!["1970-01-01T00"], db.partition_keys().await.unwrap()); + assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap()); let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap(); assert_eq!(mb_chunk.id(), 0); @@ -448,8 +434,8 @@ mod tests { // we should have chunks in both the mutable buffer and read buffer // (Note the currently open chunk is not listed) - assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0, 1]); - assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]); + assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0, 1]); + assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![0]); // data should be readable let expected = vec![ @@ -468,8 +454,8 @@ mod tests { .await .unwrap(); - assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![1]); - assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]); + assert_eq!(mutable_chunk_ids(&db, partition_key), vec![1]); + assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![0]); let batches = run_query(&db, "select * from cpu").await; assert_table_eq!(&expected, &batches); @@ -479,7 +465,7 @@ mod tests { .await .unwrap(); assert_eq!( - read_buffer_chunk_ids(&db, partition_key).await, + read_buffer_chunk_ids(&db, partition_key), vec![] as Vec ); @@ -499,9 +485,9 @@ mod tests { writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap(); writer.write_lp_string(&db, "cpu bar=1 20").await.unwrap(); - assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0]); + assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0]); assert_eq!( - read_buffer_chunk_ids(&db, partition_key).await, + read_buffer_chunk_ids(&db, partition_key), vec![] as Vec ); @@ -519,8 +505,8 @@ mod tests { writer.write_lp_string(&db, "cpu bar=1 40").await.unwrap(); - assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0, 1, 2]); - assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![1]); + assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0, 1, 2]); + assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![1]); } // run a sql query against the database, returning the results as record batches @@ -533,10 +519,9 @@ mod tests { 
collect(physical_plan).await.unwrap() } - async fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec { + fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec { let mut chunk_ids: Vec = db .mutable_buffer_chunks(partition_key) - .await .iter() .map(|chunk| chunk.id()) .collect(); @@ -544,10 +529,9 @@ mod tests { chunk_ids } - async fn read_buffer_chunk_ids(db: &Db, partition_key: &str) -> Vec { + fn read_buffer_chunk_ids(db: &Db, partition_key: &str) -> Vec { let mut chunk_ids: Vec = db .read_buffer_chunks(partition_key) - .await .iter() .map(|chunk| chunk.id()) .collect(); diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 0dd0604901..1d60170ad5 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -246,7 +246,7 @@ impl PartitionChunk for DBChunk { // Note Mutable buffer doesn't support predicate // pushdown (other than pruning out the entire chunk // via `might_pass_predicate) - let schema: Schema = self.table_schema(table_name, selection.clone()).await?; + let schema: Schema = self.table_schema(table_name, selection).await?; Ok(Box::pin(MutableBufferChunkStream::new( Arc::clone(&chunk), @@ -339,7 +339,13 @@ impl PartitionChunk for DBChunk { let chunk_ids = &[chunk_id]; let names = db - .column_names(partition_key, table_name, chunk_ids, rb_predicate) + .column_names( + partition_key, + table_name, + chunk_ids, + rb_predicate, + Selection::All, + ) .context(ReadBufferChunk { chunk_id })?; Ok(names) diff --git a/server/src/lib.rs b/server/src/lib.rs index f98a365340..fae8304579 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -216,7 +216,7 @@ impl Server { .put( &location, futures::stream::once(async move { stream_data }), - len, + Some(len), ) .await .context(StoreError)?; diff --git a/server/src/query_tests/influxrpc.rs b/server/src/query_tests/influxrpc.rs index 5def191239..8f211af558 100644 --- a/server/src/query_tests/influxrpc.rs +++ b/server/src/query_tests/influxrpc.rs @@ -1,2 +1,3 @@ +pub mod field_columns; pub mod table_names; pub mod tag_column_names; diff --git a/server/src/query_tests/influxrpc/field_columns.rs b/server/src/query_tests/influxrpc/field_columns.rs new file mode 100644 index 0000000000..dc6f27df34 --- /dev/null +++ b/server/src/query_tests/influxrpc/field_columns.rs @@ -0,0 +1,166 @@ +use arrow_deps::{ + arrow::datatypes::DataType, + assert_table_eq, + datafusion::logical_plan::{col, lit}, +}; +use query::{ + exec::{ + fieldlist::{Field, FieldList}, + Executor, + }, + frontend::influxrpc::InfluxRPCPlanner, + predicate::PredicateBuilder, +}; + +use crate::query_tests::scenarios::*; + +/// Creates and loads several database scenarios using the db_setup +/// function. +/// +/// runs field_column_names(predicate) and compares it to the expected +/// output +macro_rules! run_field_columns_test_case { + ($DB_SETUP:expr, $PREDICATE:expr, $EXPECTED_FIELDS:expr) => { + test_helpers::maybe_start_logging(); + let predicate = $PREDICATE; + let expected_fields = $EXPECTED_FIELDS; + for scenario in $DB_SETUP.make().await { + let DBScenario { + scenario_name, db, .. 
+ } = scenario; + println!("Running scenario '{}'", scenario_name); + println!("Predicate: '{:#?}'", predicate); + let planner = InfluxRPCPlanner::new(); + let executor = Executor::new(); + + let plan = planner + .field_columns(&db, predicate.clone()) + .await + .expect("built plan successfully"); + let fields = executor + .to_field_list(plan) + .await + .expect("converted plan to strings successfully"); + + assert_eq!( + fields, expected_fields, + "Error in scenario '{}'\n\nexpected:\n{:#?}\nactual:\n{:#?}", + scenario_name, expected_fields, fields + ); + } + }; +} + +#[tokio::test] +async fn test_field_columns_empty_database() { + let predicate = PredicateBuilder::default().build(); + let expected_fields = FieldList::default(); + run_field_columns_test_case!(NoData {}, predicate, expected_fields); +} + +#[tokio::test] +async fn test_field_columns_no_predicate() { + let predicate = PredicateBuilder::default() + .table("NoSuchTable") + .add_expr(col("state").eq(lit("MA"))) // state=MA + .build(); + let expected_fields = FieldList::default(); + run_field_columns_test_case!(TwoMeasurementsManyFields {}, predicate, expected_fields); +} + +#[tokio::test] +async fn test_field_columns_with_pred() { + // get only fields from h20 (but both chunks) + let predicate = PredicateBuilder::default() + .table("h2o") + .add_expr(col("state").eq(lit("MA"))) // state=MA + .build(); + + let expected_fields = FieldList { + fields: vec![ + Field { + name: "moisture".into(), + data_type: DataType::Float64, + last_timestamp: 100000, + }, + Field { + name: "other_temp".into(), + data_type: DataType::Float64, + last_timestamp: 250, + }, + Field { + name: "temp".into(), + data_type: DataType::Float64, + last_timestamp: 100000, + }, + ], + }; + + run_field_columns_test_case!(TwoMeasurementsManyFields {}, predicate, expected_fields); +} + +#[tokio::test] +async fn test_field_columns_with_ts_pred() { + let predicate = PredicateBuilder::default() + .table("h2o") + .timestamp_range(200, 300) + .add_expr(col("state").eq(lit("MA"))) // state=MA + .build(); + + let expected_fields = FieldList { + fields: vec![Field { + name: "other_temp".into(), + data_type: DataType::Float64, + last_timestamp: 250, + }], + }; + + run_field_columns_test_case!(TwoMeasurementsManyFields {}, predicate, expected_fields); +} + +#[tokio::test] +async fn test_field_name_plan() { + test_helpers::maybe_start_logging(); + // Tests that the ordering that comes out is reasonable + let scenarios = OneMeasurementManyFields {}.make().await; + + for scenario in scenarios { + let predicate = PredicateBuilder::default().timestamp_range(0, 200).build(); + + let DBScenario { + scenario_name, db, .. 
+ } = scenario; + println!("Running scenario '{}'", scenario_name); + println!("Predicate: '{:#?}'", predicate); + let planner = InfluxRPCPlanner::new(); + let executor = Executor::new(); + + let plan = planner + .field_columns(&db, predicate.clone()) + .await + .expect("built plan successfully"); + + let mut plans = plan.plans; + let plan = plans.pop().unwrap(); + assert!(plans.is_empty()); // only one plan + + // run the created plan directly, ensuring the output is as + // expected (specifically that the column ordering is correct) + let results = executor + .run_logical_plan(plan) + .await + .expect("ok running plan"); + + let expected = vec![ + "+--------+--------+--------+--------+------+", + "| field1 | field2 | field3 | field4 | time |", + "+--------+--------+--------+--------+------+", + "| 70.6 | | 2 | | 100 |", + "| 70.4 | ss | | | 100 |", + "| 70.5 | ss | | | 100 |", + "+--------+--------+--------+--------+------+", + ]; + + assert_table_eq!(expected, &results); + } +} diff --git a/server/src/query_tests/influxrpc/table_names.rs b/server/src/query_tests/influxrpc/table_names.rs index 23f4674ded..9b686b38df 100644 --- a/server/src/query_tests/influxrpc/table_names.rs +++ b/server/src/query_tests/influxrpc/table_names.rs @@ -17,7 +17,9 @@ macro_rules! run_table_names_test_case { test_helpers::maybe_start_logging(); let predicate = $PREDICATE; for scenario in $DB_SETUP.make().await { - let DBScenario { scenario_name, db } = scenario; + let DBScenario { + scenario_name, db, .. + } = scenario; println!("Running scenario '{}'", scenario_name); println!("Predicate: '{:#?}'", predicate); let planner = InfluxRPCPlanner::new(); diff --git a/server/src/query_tests/influxrpc/tag_column_names.rs b/server/src/query_tests/influxrpc/tag_column_names.rs index c94a14f800..2d471a2b86 100644 --- a/server/src/query_tests/influxrpc/tag_column_names.rs +++ b/server/src/query_tests/influxrpc/tag_column_names.rs @@ -13,7 +13,7 @@ use crate::query_tests::scenarios::*; /// Creates and loads several database scenarios using the db_setup /// function. /// -/// runs table_tag_column_names(predicate) and compares it to the expected +/// runs table_column_names(predicate) and compares it to the expected /// output macro_rules! run_tag_column_names_test_case { ($DB_SETUP:expr, $PREDICATE:expr, $EXPECTED_NAMES:expr) => { @@ -21,7 +21,9 @@ macro_rules! run_tag_column_names_test_case { let predicate = $PREDICATE; let expected_names = $EXPECTED_NAMES; for scenario in $DB_SETUP.make().await { - let DBScenario { scenario_name, db } = scenario; + let DBScenario { + scenario_name, db, .. 
+ } = scenario; println!("Running scenario '{}'", scenario_name); println!("Predicate: '{:#?}'", predicate); let planner = InfluxRPCPlanner::new(); diff --git a/server/src/query_tests/scenarios.rs b/server/src/query_tests/scenarios.rs index 01bb961565..40bc8d4e09 100644 --- a/server/src/query_tests/scenarios.rs +++ b/server/src/query_tests/scenarios.rs @@ -36,8 +36,8 @@ impl DBSetup for NoData { // listing partitions (which may create an entry in a map) // in an empty database let db = make_db(); - assert_eq!(db.mutable_buffer_chunks(partition_key).await.len(), 1); // only open chunk - assert_eq!(db.read_buffer_chunks(partition_key).await.len(), 0); + assert_eq!(db.mutable_buffer_chunks(partition_key).len(), 1); // only open chunk + assert_eq!(db.read_buffer_chunks(partition_key).len(), 0); let scenario2 = DBScenario { scenario_name: "New, Empty Database after partitions are listed".into(), db, @@ -55,9 +55,9 @@ impl DBSetup for NoData { .await .unwrap(); - assert_eq!(db.mutable_buffer_chunks(partition_key).await.len(), 1); + assert_eq!(db.mutable_buffer_chunks(partition_key).len(), 1); - assert_eq!(db.read_buffer_chunks(partition_key).await.len(), 0); // only open chunk + assert_eq!(db.read_buffer_chunks(partition_key).len(), 0); // only open chunk let scenario3 = DBScenario { scenario_name: "Empty Database after drop chunk".into(), @@ -78,51 +78,7 @@ impl DBSetup for TwoMeasurements { cpu,region=west user=21.0 150\n\ disk,region=east bytes=99i 200"; - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, data).await.unwrap(); - let scenario1 = DBScenario { - scenario_name: "Data in open chunk of mutable buffer".into(), - db, - }; - - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, data).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - let scenario2 = DBScenario { - scenario_name: "Data in closed chunk of mutable buffer".into(), - db, - }; - - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, data).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - db.load_chunk_to_read_buffer(partition_key, 0) - .await - .unwrap(); - let scenario3 = DBScenario { - scenario_name: "Data in both read buffer and mutable buffer".into(), - db, - }; - - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, data).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - db.load_chunk_to_read_buffer(partition_key, 0) - .await - .unwrap(); - db.drop_mutable_buffer_chunk(partition_key, 0) - .await - .unwrap(); - let scenario4 = DBScenario { - scenario_name: "Data in only buffer and not mutable buffer".into(), - db, - }; - - vec![scenario1, scenario2, scenario3, scenario4] + make_one_chunk_scenarios(partition_key, data).await } } @@ -138,71 +94,7 @@ impl DBSetup for MultiChunkSchemaMerge { let data2 = "cpu,region=east,host=foo user=23.2 100\n\ cpu,region=west,host=bar user=21.0 250"; - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, data1).await.unwrap(); - writer.write_lp_string(&db, data2).await.unwrap(); - let scenario1 = DBScenario { - scenario_name: "Data in single open chunk of mutable buffer".into(), - db, - }; - - // spread across 2 mutable buffer chunks - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, data1).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - 
writer.write_lp_string(&db, data2).await.unwrap(); - let scenario2 = DBScenario { - scenario_name: "Data in open chunk and closed chunk of mutable buffer".into(), - db, - }; - - // spread across 1 mutable buffer, 1 read buffer chunks - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, data1).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - db.load_chunk_to_read_buffer(partition_key, 0) - .await - .unwrap(); - db.drop_mutable_buffer_chunk(partition_key, 0) - .await - .unwrap(); - writer.write_lp_string(&db, data2).await.unwrap(); - let scenario3 = DBScenario { - scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer" - .into(), - db, - }; - - // in 2 read buffer chunks - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, data1).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - writer.write_lp_string(&db, data2).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - - db.load_chunk_to_read_buffer(partition_key, 0) - .await - .unwrap(); - db.drop_mutable_buffer_chunk(partition_key, 0) - .await - .unwrap(); - - db.load_chunk_to_read_buffer(partition_key, 1) - .await - .unwrap(); - db.drop_mutable_buffer_chunk(partition_key, 1) - .await - .unwrap(); - let scenario4 = DBScenario { - scenario_name: "Data in two read buffer chunks".into(), - db, - }; - - vec![scenario1, scenario2, scenario3, scenario4] + make_two_chunk_scenarios(partition_key, data1, data2).await } } @@ -212,78 +104,48 @@ pub struct TwoMeasurementsManyNulls {} impl DBSetup for TwoMeasurementsManyNulls { async fn make(&self) -> Vec { let partition_key = "1970-01-01T00"; - - let lp_data1 = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\ + let data1 = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\ h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250\n\ o2,state=MA,city=Boston temp=50.4 200\n\ o2,state=CA temp=79.0 300\n"; - let lp_data2 = "o2,state=NY temp=60.8 400\n\ + let data2 = "o2,state=NY temp=60.8 400\n\ o2,state=NY,city=NYC temp=61.0 500\n\ o2,state=NY,city=NYC,borough=Brooklyn temp=61.0 600\n"; - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, lp_data1).await.unwrap(); - writer.write_lp_string(&db, lp_data2).await.unwrap(); - let scenario1 = DBScenario { - scenario_name: "Data in open chunk of mutable buffer".into(), - db, - }; + make_two_chunk_scenarios(partition_key, data1, data2).await + } +} - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, lp_data1).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - writer.write_lp_string(&db, lp_data2).await.unwrap(); - let scenario2 = DBScenario { - scenario_name: "Data in one open chunk, one closed chunk of mutable buffer".into(), - db, - }; +pub struct TwoMeasurementsManyFields {} +#[async_trait] +impl DBSetup for TwoMeasurementsManyFields { + async fn make(&self) -> Vec { + let partition_key = "1970-01-01T00"; - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, lp_data1).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - writer.write_lp_string(&db, lp_data2).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - db.load_chunk_to_read_buffer(partition_key, 0) - .await - .unwrap(); - db.drop_mutable_buffer_chunk(partition_key, 0) - .await - .unwrap(); - let scenario3 = DBScenario { - scenario_name: "One data 
chunk in read buffer, one chunk of mutable buffer".into(), - db, - }; + let data1 = "h2o,state=MA,city=Boston temp=70.4 50\n\ + h2o,state=MA,city=Boston other_temp=70.4 250\n\ + h2o,state=CA,city=Boston other_temp=72.4 350\n\ + o2,state=MA,city=Boston temp=53.4,reading=51 50\n\ + o2,state=CA temp=79.0 300"; + let data2 = "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000"; + make_two_chunk_scenarios(partition_key, data1, data2).await + } +} - let db = make_db(); - let mut writer = TestLPWriter::default(); - writer.write_lp_string(&db, lp_data1).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); - writer.write_lp_string(&db, lp_data2).await.unwrap(); - db.rollover_partition(partition_key).await.unwrap(); +pub struct OneMeasurementManyFields {} +#[async_trait] +impl DBSetup for OneMeasurementManyFields { + async fn make(&self) -> Vec { + let partition_key = "1970-01-01T00"; - db.load_chunk_to_read_buffer(partition_key, 0) - .await - .unwrap(); - db.drop_mutable_buffer_chunk(partition_key, 0) - .await - .unwrap(); + // Order this so field3 comes before field2 + // (and thus the columns need to get reordered) + let data = "h2o,tag1=foo,tag2=bar field1=70.6,field3=2 100\n\ + h2o,tag1=foo,tag2=bar field1=70.4,field2=\"ss\" 100\n\ + h2o,tag1=foo,tag2=bar field1=70.5,field2=\"ss\" 100\n\ + h2o,tag1=foo,tag2=bar field1=70.6,field4=true 1000"; - db.load_chunk_to_read_buffer(partition_key, 1) - .await - .unwrap(); - db.drop_mutable_buffer_chunk(partition_key, 1) - .await - .unwrap(); - - let scenario4 = DBScenario { - scenario_name: "Data in 2 read buffer chunks".into(), - db, - }; - - vec![scenario1, scenario2, scenario3, scenario4] + make_one_chunk_scenarios(partition_key, data).await } } @@ -314,3 +176,134 @@ impl DBSetup for EndToEndTest { vec![scenario1] } } + +/// This function loads two chunks of lp data into 4 different scenarios +/// +/// Data in single open mutable buffer chunk +/// Data in single closed mutable buffer chunk, one closed mutable chunk +/// Data in both read buffer and mutable buffer chunk +/// Data in one only read buffer chunk +async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec { + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, data).await.unwrap(); + let scenario1 = DBScenario { + scenario_name: "Data in open chunk of mutable buffer".into(), + db, + }; + + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, data).await.unwrap(); + db.rollover_partition(partition_key).await.unwrap(); + let scenario2 = DBScenario { + scenario_name: "Data in closed chunk of mutable buffer".into(), + db, + }; + + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, data).await.unwrap(); + db.rollover_partition(partition_key).await.unwrap(); + db.load_chunk_to_read_buffer(partition_key, 0) + .await + .unwrap(); + let scenario3 = DBScenario { + scenario_name: "Data in both read buffer and mutable buffer".into(), + db, + }; + + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, data).await.unwrap(); + db.rollover_partition(partition_key).await.unwrap(); + db.load_chunk_to_read_buffer(partition_key, 0) + .await + .unwrap(); + db.drop_mutable_buffer_chunk(partition_key, 0) + .await + .unwrap(); + let scenario4 = DBScenario { + scenario_name: "Data in only read buffer and not mutable buffer".into(), + db, + }; + + vec![scenario1, scenario2, scenario3, scenario4] +} + +/// This 
function loads two chunks of lp data into 4 different scenarios +/// +/// Data in single open mutable buffer chunk +/// Data in one open mutable buffer chunk, one closed mutable chunk +/// Data in one open mutable buffer chunk, one read buffer chunk +/// Data in one two read buffer chunks, +async fn make_two_chunk_scenarios( + partition_key: &str, + data1: &str, + data2: &str, +) -> Vec { + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, data1).await.unwrap(); + writer.write_lp_string(&db, data2).await.unwrap(); + let scenario1 = DBScenario { + scenario_name: "Data in single open chunk of mutable buffer".into(), + db, + }; + + // spread across 2 mutable buffer chunks + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, data1).await.unwrap(); + db.rollover_partition(partition_key).await.unwrap(); + writer.write_lp_string(&db, data2).await.unwrap(); + let scenario2 = DBScenario { + scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(), + db, + }; + + // spread across 1 mutable buffer, 1 read buffer chunks + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, data1).await.unwrap(); + db.rollover_partition(partition_key).await.unwrap(); + db.load_chunk_to_read_buffer(partition_key, 0) + .await + .unwrap(); + db.drop_mutable_buffer_chunk(partition_key, 0) + .await + .unwrap(); + writer.write_lp_string(&db, data2).await.unwrap(); + let scenario3 = DBScenario { + scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(), + db, + }; + + // in 2 read buffer chunks + let db = make_db(); + let mut writer = TestLPWriter::default(); + writer.write_lp_string(&db, data1).await.unwrap(); + db.rollover_partition(partition_key).await.unwrap(); + writer.write_lp_string(&db, data2).await.unwrap(); + db.rollover_partition(partition_key).await.unwrap(); + + db.load_chunk_to_read_buffer(partition_key, 0) + .await + .unwrap(); + db.drop_mutable_buffer_chunk(partition_key, 0) + .await + .unwrap(); + + db.load_chunk_to_read_buffer(partition_key, 1) + .await + .unwrap(); + db.drop_mutable_buffer_chunk(partition_key, 1) + .await + .unwrap(); + let scenario4 = DBScenario { + scenario_name: "Data in two read buffer chunks".into(), + db, + }; + + vec![scenario1, scenario2, scenario3, scenario4] +} diff --git a/server/src/query_tests/sql.rs b/server/src/query_tests/sql.rs index 388444521c..727853dca4 100644 --- a/server/src/query_tests/sql.rs +++ b/server/src/query_tests/sql.rs @@ -16,7 +16,9 @@ macro_rules! run_sql_test_case { test_helpers::maybe_start_logging(); let sql = $SQL.to_string(); for scenario in $DB_SETUP.make().await { - let DBScenario { scenario_name, db } = scenario; + let DBScenario { + scenario_name, db, .. + } = scenario; println!("Running scenario '{}'", scenario_name); println!("SQL: '{:#?}'", sql); let planner = SQLQueryPlanner::new(); diff --git a/server/src/query_tests/table_schema.rs b/server/src/query_tests/table_schema.rs index 156fda56ec..9aceb80aa5 100644 --- a/server/src/query_tests/table_schema.rs +++ b/server/src/query_tests/table_schema.rs @@ -19,7 +19,9 @@ macro_rules! run_table_schema_test_case { let expected_schema = $EXPECTED_SCHEMA; for scenario in $DB_SETUP.make().await { - let DBScenario { scenario_name, db } = scenario; + let DBScenario { + scenario_name, db, .. 
+ } = scenario; println!("Running scenario '{}'", scenario_name); println!( "Getting schema for table '{}', selection {:?}", @@ -29,8 +31,8 @@ macro_rules! run_table_schema_test_case { // Make sure at least one table has data let mut chunks_with_table = 0; - for partition_key in db.partition_keys().await.unwrap() { - for chunk in db.chunks(&partition_key).await { + for partition_key in db.partition_keys().unwrap() { + for chunk in db.chunks(&partition_key) { if chunk.has_table(table_name) { chunks_with_table += 1; let actual_schema = chunk diff --git a/server/src/snapshot.rs b/server/src/snapshot.rs index 7037bd02b3..6833863d05 100644 --- a/server/src/snapshot.rs +++ b/server/src/snapshot.rs @@ -194,7 +194,7 @@ where .put( &partition_meta_path, futures::stream::once(async move { stream_data }), - len, + Some(len), ) .await .context(WritingToObjectStore)?; @@ -247,7 +247,7 @@ where .put( &file_name, futures::stream::once(async move { stream_data }), - len, + Some(len), ) .await .context(WritingToObjectStore) @@ -393,7 +393,7 @@ mem,host=A,region=west used=45 1 let mut data_path = store.new_path(); data_path.push_dir("data"); - let chunk = Arc::clone(&db.chunks("1970-01-01T00").await[0]); + let chunk = Arc::clone(&db.chunks("1970-01-01T00")[0]); let snapshot = snapshot_chunk( metadata_path.clone(), diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index c8aebef03f..fa8a0cab2b 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -145,7 +145,8 @@ pub async fn main(logging_level: LoggingLevel, config: Option) -> Result .serve(router_service); info!(bind_address=?bind_addr, "HTTP server listening"); - println!("InfluxDB IOx server ready"); + let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN"); + info!(git_hash, "InfluxDB IOx server ready"); // Wait for both the servers to complete let (grpc_server, server) = futures::future::join(grpc_server, http_server).await; diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index bf96f6bb6b..31ba21756d 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -668,14 +668,13 @@ async fn list_partitions( bucket: &info.bucket, })?; - let partition_keys = db - .partition_keys() - .await - .map_err(|e| Box::new(e) as _) - .context(BucketByName { - org: &info.org, - bucket_name: &info.bucket, - })?; + let partition_keys = + db.partition_keys() + .map_err(|e| Box::new(e) as _) + .context(BucketByName { + org: &info.org, + bucket_name: &info.bucket, + })?; let result = serde_json::to_string(&partition_keys).context(JsonGenerationError)?; diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs index d49ccdd944..65c448a799 100644 --- a/src/influxdb_ioxd/rpc/storage/service.rs +++ b/src/influxdb_ioxd/rpc/storage/service.rs @@ -1061,7 +1061,7 @@ async fn field_names_impl( rpc_predicate: Option, ) -> Result where - T: DatabaseStore, + T: DatabaseStore + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); @@ -1080,14 +1080,16 @@ where .await .context(DatabaseNotFound { db_name })?; - let executor = db_store.executor(); + let planner = InfluxRPCPlanner::new(); - let field_list_plan = db - .field_column_names(predicate) + let field_list_plan = planner + .field_columns(db.as_ref(), predicate) .await .map_err(|e| Box::new(e) as _) .context(ListingFields { db_name })?; + let executor = db_store.executor(); + let field_list = executor .to_field_list(field_list_plan) .await @@ -1102,17 +1104,11 @@ mod tests { use super::super::id::ID; use super::*; - use 
arrow_deps::{ - arrow::datatypes::DataType, - datafusion::logical_plan::{col, lit, Expr}, - }; + use arrow_deps::datafusion::logical_plan::{col, lit, Expr}; use panic_logging::SendPanicsToTracing; use query::{ - exec::fieldlist::{Field, FieldList}, - exec::FieldListPlan, exec::SeriesSetPlans, group_by::{Aggregate as QueryAggregate, WindowDuration as QueryWindowDuration}, - test::FieldColumnsRequest, test::QueryGroupsRequest, test::TestDatabaseStore, test::{ColumnValuesRequest, QuerySeriesRequest, TestChunk}, @@ -1538,34 +1534,73 @@ mod tests { actual_tag_values, tag_values, "unexpected tag values while getting tag values for measurement names" ); + } + + #[tokio::test] + async fn test_storage_rpc_tag_values_field() { + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); + + let db_info = OrgAndBucket::new(123, 456); + let partition_id = 1; + + // Add a chunk with a field + let chunk = TestChunk::new(0) + .with_int_field_column("TheMeasurement", "Field1") + .with_time_column("TheMeasurement") + .with_tag_column("TheMeasurement", "state") + .with_one_row_of_null_data("TheMeasurement"); + + fixture + .test_storage + .db_or_create(&db_info.db_name) + .await + .unwrap() + .add_chunk("my_partition_key", Arc::new(chunk)); + + let source = Some(StorageClientWrapper::read_source( + db_info.org_id, + db_info.bucket_id, + partition_id, + )); // --- // test tag_key = _field means listing all field names // --- let request = TagValuesRequest { tags_source: source.clone(), - range: make_timestamp_range(1000, 1500), - predicate: None, + range: make_timestamp_range(0, 2000), + predicate: make_state_ma_predicate(), tag_key: [255].into(), }; - // Setup a single field name (Field1) - let fieldlist = FieldList { - fields: vec![Field { - name: "Field1".into(), - data_type: DataType::Utf8, - last_timestamp: 1000, - }], - }; - let fieldlist_plan = FieldListPlan::Known(Ok(fieldlist)); - test_db.set_field_colum_names_values(fieldlist_plan); - let expected_tag_values = vec!["Field1"]; let actual_tag_values = fixture.storage_client.tag_values(request).await.unwrap(); assert_eq!( actual_tag_values, expected_tag_values, "unexpected tag values while getting tag values for field names" ); + } + + #[tokio::test] + async fn test_storage_rpc_tag_values_error() { + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); + + let db_info = OrgAndBucket::new(123, 456); + let partition_id = 1; + + let test_db = fixture + .test_storage + .db_or_create(&db_info.db_name) + .await + .expect("creating test database"); + + let source = Some(StorageClientWrapper::read_source( + db_info.org_id, + db_info.bucket_id, + partition_id, + )); // --- // test error @@ -2135,18 +2170,28 @@ mod tests { } #[tokio::test] - async fn test_measurement_fields() -> Result<(), tonic::Status> { + async fn test_measurement_fields() { + test_helpers::maybe_start_logging(); + // Start a test gRPC server on a randomally allocated port let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; - let test_db = fixture + // Add a chunk with a field + let chunk = TestChunk::new(0) + .with_int_field_column("TheMeasurement", "Field1") + .with_time_column("TheMeasurement") + .with_tag_column("TheMeasurement", "state") + .with_one_row_of_null_data("TheMeasurement"); + + fixture .test_storage .db_or_create(&db_info.db_name) 
.await - .expect("creating test database"); + .unwrap() + .add_chunk("my_partition_key", Arc::new(chunk)); let source = Some(StorageClientWrapper::read_source( db_info.org_id, @@ -2157,37 +2202,45 @@ mod tests { let request = MeasurementFieldsRequest { source: source.clone(), measurement: "TheMeasurement".into(), - range: make_timestamp_range(150, 200), + range: make_timestamp_range(0, 2000), predicate: make_state_ma_predicate(), }; - let expected_request = FieldColumnsRequest { - predicate: "Predicate { table_names: TheMeasurement exprs: [#state Eq Utf8(\"MA\")] range: TimestampRange { start: 150, end: 200 }}".into() - }; - - let fieldlist = FieldList { - fields: vec![Field { - name: "Field1".into(), - data_type: DataType::Utf8, - last_timestamp: 1000, - }], - }; - - let fieldlist_plan = FieldListPlan::Known(Ok(fieldlist)); - test_db.set_field_colum_names_values(fieldlist_plan); - - let actual_fields = fixture.storage_client.measurement_fields(request).await?; - let expected_fields: Vec = vec!["key: Field1, type: 3, timestamp: 1000".into()]; + let actual_fields = fixture + .storage_client + .measurement_fields(request) + .await + .unwrap(); + let expected_fields: Vec = vec!["key: Field1, type: 1, timestamp: 1000".into()]; assert_eq!( actual_fields, expected_fields, - "unexpected frames returned by measuremnt_fields" - ); - assert_eq!( - test_db.get_field_columns_request(), - Some(expected_request), - "unexpected request to measurement-fields" + "unexpected frames returned by measurement_fields" ); + } + + #[tokio::test] + async fn test_measurement_fields_error() { + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); + + let db_info = OrgAndBucket::new(123, 456); + let partition_id = 1; + + let chunk = TestChunk::new(0).with_error("Sugar we are going down"); + + fixture + .test_storage + .db_or_create(&db_info.db_name) + .await + .unwrap() + .add_chunk("my_partition_key", Arc::new(chunk)); + + let source = Some(StorageClientWrapper::read_source( + db_info.org_id, + db_info.bucket_id, + partition_id, + )); // --- // test error @@ -2202,21 +2255,7 @@ mod tests { // Note we don't set the response on the test database, so we expect an error let response = fixture.storage_client.measurement_fields(request).await; assert!(response.is_err()); - let response_string = format!("{:?}", response); - let expected_error = "No saved field_column_name in TestDatabase"; - assert!( - response_string.contains(expected_error), - "'{}' did not contain expected content '{}'", - response_string, - expected_error - ); - - let expected_request = Some(FieldColumnsRequest { - predicate: "Predicate { table_names: TheMeasurement}".into(), - }); - assert_eq!(test_db.get_field_columns_request(), expected_request); - - Ok(()) + assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down"); } fn make_timestamp_range(start: i64, end: i64) -> Option {
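
Closing note: with `Database::field_column_names` removed, `field_names_impl` in the storage gRPC service now goes through the query frontend instead of the database trait. A condensed sketch of that flow, with the snafu error mapping from the diff trimmed down to `?`, and `db`, `db_store`, and `predicate` assumed to be in scope as in the hunk above:

```rust
// Plan: build one DataFusion logical plan per table that may hold matching fields.
let planner = InfluxRPCPlanner::new();
let field_list_plan = planner.field_columns(db.as_ref(), predicate).await?;

// Execute: run the plans and collapse the results into a FieldList of
// (name, data type, last timestamp) entries for the RPC response.
let executor = db_store.executor();
let field_list = executor.to_field_list(field_list_plan).await?;
```
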