From 518df742dfd710e3cc4231140d36bcccc124fa2b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Apr 2021 14:05:03 -0400 Subject: [PATCH 1/7] chore: update arrow deps (#1195) --- Cargo.lock | 76 +++++++++++++++++++++---------------------- arrow_deps/Cargo.toml | 10 +++--- 2 files changed, 43 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c519f64dc..6802e86d24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,7 +111,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=e69478a890b1e4eee49b540b69b2711d170a0433#e69478a890b1e4eee49b540b69b2711d170a0433" +source = "git+https://github.com/apache/arrow.git?rev=00a443629c00079ea03c0b9f415d74669d2759a7#00a443629c00079ea03c0b9f415d74669d2759a7" dependencies = [ "cfg_aliases", "chrono", @@ -134,7 +134,7 @@ dependencies = [ [[package]] name = "arrow-flight" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=e69478a890b1e4eee49b540b69b2711d170a0433#e69478a890b1e4eee49b540b69b2711d170a0433" +source = "git+https://github.com/apache/arrow.git?rev=00a443629c00079ea03c0b9f415d74669d2759a7#00a443629c00079ea03c0b9f415d74669d2759a7" dependencies = [ "arrow", "bytes", @@ -429,9 +429,9 @@ dependencies = [ [[package]] name = "cast" -version = "0.2.3" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0" +checksum = "cc38c385bfd7e444464011bb24820f40dd1c76bcdfa1b78611cb7c2e5cafab75" dependencies = [ "rustc_version", ] @@ -488,9 +488,9 @@ dependencies = [ [[package]] name = "clang-sys" -version = "1.1.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f54d78e30b388d4815220c8dd03fea5656b6c6d32adb59e89061552a102f8da1" +checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c" dependencies = [ "glob", "libc", @@ -662,9 +662,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" dependencies = [ "cfg-if 1.0.0", "crossbeam-utils", @@ -787,7 +787,7 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=e69478a890b1e4eee49b540b69b2711d170a0433#e69478a890b1e4eee49b540b69b2711d170a0433" +source = "git+https://github.com/apache/arrow.git?rev=00a443629c00079ea03c0b9f415d74669d2759a7#00a443629c00079ea03c0b9f415d74669d2759a7" dependencies = [ "ahash 0.7.2", "arrow", @@ -1044,9 +1044,9 @@ checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" [[package]] name = "futures" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f55667319111d593ba876406af7c409c0ebb44dc4be6132a783ccf163ea14c1" +checksum = "a9d5813545e459ad3ca1bff9915e9ad7f1a47dc6a91b627ce321d5863b7dd253" dependencies = [ "futures-channel", "futures-core", @@ -1059,9 +1059,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c2dd2df839b57db9ab69c2c9d8f3e8c81984781937fe2807dc6dcf3b2ad2939" +checksum 
= "ce79c6a52a299137a6013061e0cf0e688fce5d7f1bc60125f520912fdb29ec25" dependencies = [ "futures-core", "futures-sink", @@ -1069,15 +1069,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94" +checksum = "098cd1c6dda6ca01650f1a37a794245eb73181d0d4d4e955e2f3c37db7af1815" [[package]] name = "futures-executor" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891a4b7b96d84d5940084b2a37632dd65deeae662c114ceaa2c879629c9c0ad1" +checksum = "10f6cb7042eda00f0049b1d2080aa4b93442997ee507eb3828e8bd7577f94c9d" dependencies = [ "futures-core", "futures-task", @@ -1086,15 +1086,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59" +checksum = "365a1a1fb30ea1c03a830fdb2158f5236833ac81fa0ad12fe35b29cddc35cb04" [[package]] name = "futures-macro" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea405816a5139fb39af82c2beb921d52143f556038378d6db21183a5c37fbfb7" +checksum = "668c6733a182cd7deb4f1de7ba3bf2120823835b3bcfbeacf7d2c4a773c1bb8b" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -1104,21 +1104,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85754d98985841b7d4f5e8e6fbfa4a4ac847916893ec511a2917ccd8525b8bb3" +checksum = "5c5629433c555de3d82861a7a4e3794a4c40040390907cfbfd7143a92a426c23" [[package]] name = "futures-task" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa189ef211c15ee602667a6fcfe1c1fd9e07d42250d2156382820fba33c9df80" +checksum = "ba7aa51095076f3ba6d9a1f702f74bd05ec65f555d70d2033d55ba8d69f581bc" [[package]] name = "futures-test" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1fe5e51002528907757d5f1648101086f7197f792112db43ba23b06b09e6bce" +checksum = "e77baeade98824bc928c21b8ad39918b9d8a06745ebdb6e2c93fb7673fb7968d" dependencies = [ "futures-core", "futures-executor", @@ -1132,9 +1132,9 @@ dependencies = [ [[package]] name = "futures-util" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1812c7ab8aedf8d6f2701a43e1243acdbcc2b36ab26e2ad421eb99ac963d96d1" +checksum = "3c144ad54d60f23927f0a6b6d816e4271278b64f005ad65e4e35291d2de9c025" dependencies = [ "futures-channel", "futures-core", @@ -2299,7 +2299,7 @@ dependencies = [ [[package]] name = "parquet" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=e69478a890b1e4eee49b540b69b2711d170a0433#e69478a890b1e4eee49b540b69b2711d170a0433" +source = "git+https://github.com/apache/arrow.git?rev=00a443629c00079ea03c0b9f415d74669d2759a7#00a443629c00079ea03c0b9f415d74669d2759a7" dependencies = [ "arrow", "base64 0.12.3", @@ -2881,9 +2881,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf12057f289428dbf5c591c74bf10392e4a8003f993405a902f20117019022d4" +checksum = 
"2296f2fac53979e8ccbc4a1136b25dcefd37be9ed7e4a1f6b05a6029c84ff124" dependencies = [ "base64 0.13.0", "bytes", @@ -3118,9 +3118,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "sct" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3042af939fca8c3453b7af0f1c66e533a15a86169e39de2657310ade8f98d3c" +checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" dependencies = [ "ring", "untrusted", @@ -3758,9 +3758,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722" +checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" dependencies = [ "autocfg", "bytes", @@ -3821,9 +3821,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5143d049e85af7fbc36f5454d990e62c2df705b3589f123b71f441b6b59f443f" +checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e" dependencies = [ "bytes", "futures-core", diff --git a/arrow_deps/Cargo.toml b/arrow_deps/Cargo.toml index ba8263f2d6..5466195a9f 100644 --- a/arrow_deps/Cargo.toml +++ b/arrow_deps/Cargo.toml @@ -8,14 +8,14 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx [dependencies] # In alphabetical order # We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev -# The version can be found here: https://github.com/apache/arrow/commit/e69478a890b1e4eee49b540b69b2711d170a0433 +# The version can be found here: https://github.com/apache/arrow/commit/00a443629c00079ea03c0b9f415d74669d2759a7 # -arrow = { git = "https://github.com/apache/arrow.git", rev = "e69478a890b1e4eee49b540b69b2711d170a0433" , features = ["simd"] } -arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "e69478a890b1e4eee49b540b69b2711d170a0433" } +arrow = { git = "https://github.com/apache/arrow.git", rev = "00a443629c00079ea03c0b9f415d74669d2759a7" , features = ["simd"] } +arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "00a443629c00079ea03c0b9f415d74669d2759a7" } # Turn off optional datafusion features (function packages) -datafusion = { git = "https://github.com/apache/arrow.git", rev = "e69478a890b1e4eee49b540b69b2711d170a0433", default-features = false } +datafusion = { git = "https://github.com/apache/arrow.git", rev = "00a443629c00079ea03c0b9f415d74669d2759a7", default-features = false } # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time # and we're not currently using it anyway -parquet = { git = "https://github.com/apache/arrow.git", rev = "e69478a890b1e4eee49b540b69b2711d170a0433", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] } +parquet = { git = "https://github.com/apache/arrow.git", rev = "00a443629c00079ea03c0b9f415d74669d2759a7", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] } From 61cd745ab67a083f7a647fd1a27d62a95e7c80ec Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 13 Apr 2021 20:09:36 +0100 Subject: [PATCH 2/7] refactor: remove mutable buffer predicate logic 
(#1186) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- mutable_buffer/src/chunk.rs | 279 +----------------------------------- mutable_buffer/src/lib.rs | 1 - mutable_buffer/src/table.rs | 240 +------------------------------ server/src/db/pred.rs | 30 ---- server/src/db/streams.rs | 102 +------------ 5 files changed, 3 insertions(+), 649 deletions(-) diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index 0c8bfccadf..4359a8c276 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -5,20 +5,17 @@ use std::sync::Arc; use snafu::{OptionExt, ResultExt, Snafu}; -use arrow_deps::{arrow::record_batch::RecordBatch, datafusion::logical_plan::Expr}; +use arrow_deps::arrow::record_batch::RecordBatch; use data_types::{database_rules::WriterId, partition_metadata::TableSummary}; use internal_types::{ entry::{ClockValue, TableBatch}, - schema::Schema, selection::Selection, }; use tracker::{MemRegistry, MemTracker}; use crate::chunk::snapshot::ChunkSnapshot; use crate::{ - column::Column, dictionary::{Dictionary, Error as DictionaryError}, - pred::{ChunkPredicate, ChunkPredicateBuilder}, table::Table, }; use parking_lot::Mutex; @@ -39,54 +36,9 @@ pub enum Error { source: crate::table::Error, }, - #[snafu(display("Error checking predicate in table {}: {}", table_id, source))] - PredicateCheck { - table_id: u32, - source: crate::table::Error, - }, - - #[snafu(display("Error checking predicate in table '{}': {}", table_name, source))] - NamedTablePredicateCheck { - table_name: String, - source: crate::table::Error, - }, - - #[snafu(display( - "Unsupported predicate when mutable buffer table names. Found a general expression: {:?}", - exprs - ))] - PredicateNotYetSupported { exprs: Vec }, - - #[snafu(display("Table ID {} not found in dictionary of chunk {}", table_id, chunk))] - TableIdNotFoundInDictionary { - table_id: u32, - chunk: u64, - source: DictionaryError, - }, - - #[snafu(display( - "Internal error: table {} not found in dictionary of chunk {}", - table_name, - chunk_id - ))] - InternalTableNotFoundInDictionary { table_name: String, chunk_id: u32 }, - #[snafu(display("Table {} not found in chunk {}", table, chunk))] TableNotFoundInChunk { table: u32, chunk: u64 }, - #[snafu(display("Table '{}' not found in chunk {}", table_name, chunk_id))] - NamedTableNotFoundInChunk { table_name: String, chunk_id: u64 }, - - #[snafu(display("Attempt to write table batch without a name"))] - TableWriteWithoutName, - - #[snafu(display("Value ID {} not found in dictionary of chunk {}", value_id, chunk_id))] - InternalColumnValueIdNotFoundInDictionary { - value_id: u32, - chunk_id: u64, - source: DictionaryError, - }, - #[snafu(display("Column ID {} not found in dictionary of chunk {}", column_id, chunk))] ColumnIdNotFoundInDictionary { column_id: u32, @@ -104,12 +56,6 @@ pub enum Error { chunk_id: u64, source: DictionaryError, }, - - #[snafu(display( - "Column '{}' is not a string tag column and thus can not list values", - column_name - ))] - UnsupportedColumnTypeForListingValues { column_name: String }, } pub type Result = std::result::Result; @@ -206,214 +152,6 @@ impl Chunk { snapshot } - /// Return all the names of the tables names in this chunk that match - /// chunk predicate - pub fn table_names(&self, chunk_predicate: &ChunkPredicate) -> Result> { - // we don't support arbitrary expressions in chunk predicate yet - if !chunk_predicate.chunk_exprs.is_empty() { - return PredicateNotYetSupported { - exprs: 
chunk_predicate.chunk_exprs.clone(), - } - .fail(); - } - - self.tables - .iter() - .filter_map(|(&table_id, table)| { - // could match is good enough for this metadata query - match table.could_match_predicate(chunk_predicate) { - Ok(true) => Some(self.dictionary.lookup_id(table_id).context( - TableIdNotFoundInDictionary { - table_id, - chunk: self.id, - }, - )), - Ok(false) => None, - Err(e) => Some(Err(e).context(PredicateCheck { table_id })), - } - }) - .collect() - } - - /// If the column names that match the predicate can be found - /// from the predicate entirely using metadata, return those - /// strings. - /// - /// If the predicate cannot be evaluated entirely with - /// metadata, return `Ok(None)`. - pub fn column_names( - &self, - table_name: &str, - chunk_predicate: &ChunkPredicate, - selection: Selection<'_>, - ) -> Result>> { - // No support for general purpose expressions - if !chunk_predicate.chunk_exprs.is_empty() { - return Ok(None); - } - - let table_name_id = self.table_name_id(table_name)?; - - let mut chunk_column_ids = BTreeSet::new(); - - // Is this table in the chunk? - if let Some(table) = self.tables.get(&table_name_id) { - for (&column_id, column) in &table.columns { - let column_matches_predicate = table - .column_matches_predicate(&column, chunk_predicate) - .context(NamedTableError { table_name })?; - - if column_matches_predicate { - chunk_column_ids.insert(column_id); - } - } - } - - // Only return subset of these selection_cols if not all_cols - let mut all_cols = true; - let selection_cols = match selection { - Selection::All => &[""], - Selection::Some(cols) => { - all_cols = false; - cols - } - }; - - let mut column_names = BTreeSet::new(); - for &column_id in &chunk_column_ids { - let column_name = - self.dictionary - .lookup_id(column_id) - .context(ColumnIdNotFoundInDictionary { - column_id, - chunk: self.id, - })?; - - if !column_names.contains(column_name) - && (all_cols || selection_cols.contains(&column_name)) - { - // only use columns in selection_cols - column_names.insert(column_name.to_string()); - } - } - - Ok(Some(column_names)) - } - - /// Return the id of the table in the chunk's dictionary - fn table_name_id(&self, table_name: &str) -> Result { - self.dictionary - .id(table_name) - .context(InternalTableNotFoundInDictionary { - table_name, - chunk_id: self.id(), - }) - } - - /// Returns the strings of the specified Tag column that satisfy - /// the predicate, if they can be determined entirely using metadata. - /// - /// If the predicate cannot be evaluated entirely with metadata, - /// return `Ok(None)`. - pub fn tag_column_values( - &self, - table_name: &str, - column_name: &str, - chunk_predicate: &ChunkPredicate, - ) -> Result>> { - // No support for general purpose expressions - if !chunk_predicate.chunk_exprs.is_empty() { - return Ok(None); - } - let chunk_id = self.id(); - - let table_name_id = self.table_name_id(table_name)?; - - // Is this table even in the chunk? 
- let table = self - .tables - .get(&table_name_id) - .context(NamedTableNotFoundInChunk { - table_name, - chunk_id, - })?; - - // See if we can rule out the table entire on metadata - let could_match = table - .could_match_predicate(chunk_predicate) - .context(NamedTablePredicateCheck { table_name })?; - - if !could_match { - // No columns could match, return empty set - return Ok(Default::default()); - } - - let column_id = - self.dictionary - .lookup_value(column_name) - .context(ColumnNameNotFoundInDictionary { - column_name, - chunk_id, - })?; - - let column = table - .column(column_id) - .context(NamedTableError { table_name })?; - - if let Column::Tag(column, _) = column { - // if we have a timestamp predicate, find all values - // where the timestamp is within range. Otherwise take - // all values. - - // Collect matching ids into BTreeSet to deduplicate on - // ids *before* looking up Strings - let column_value_ids: BTreeSet = match chunk_predicate.range { - None => { - // take all non-null values - column.iter().filter_map(|&s| s).collect() - } - Some(range) => { - // filter out all values that don't match the timestmap - let time_column = table - .column_i64(chunk_predicate.time_column_id) - .context(NamedTableError { table_name })?; - - column - .iter() - .zip(time_column.iter()) - .filter_map(|(&column_value_id, ×tamp_value)| { - if range.contains_opt(timestamp_value) { - column_value_id - } else { - None - } - }) - .collect() - } - }; - - // convert all the (deduplicated) ids to Strings - let column_values = column_value_ids - .into_iter() - .map(|value_id| { - let value = self.dictionary.lookup_id(value_id).context( - InternalColumnValueIdNotFoundInDictionary { value_id, chunk_id }, - )?; - Ok(value.to_string()) - }) - .collect::>>()?; - - Ok(Some(column_values)) - } else { - UnsupportedColumnTypeForListingValues { column_name }.fail() - } - } - - /// Return a builder suitable to create predicates for this Chunk - pub fn predicate_builder(&self) -> Result, crate::pred::Error> { - ChunkPredicateBuilder::new(&self.dictionary) - } - /// returns true if there is no data in this chunk pub fn is_empty(&self) -> bool { self.tables.is_empty() @@ -474,21 +212,6 @@ impl Chunk { Ok(table) } - /// Return Schema for the specified table / columns - pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result { - let table = self - .table(table_name)? - // Option --> Result - .context(NamedTableNotFoundInChunk { - table_name, - chunk_id: self.id(), - })?; - - table - .schema(self, selection) - .context(NamedTableError { table_name }) - } - /// Return the approximate memory size of the chunk, in bytes including the /// dictionary, tables, and their rows. 
pub fn size(&self) -> usize { diff --git a/mutable_buffer/src/lib.rs b/mutable_buffer/src/lib.rs index d2e8b64af1..0fd90cc61d 100644 --- a/mutable_buffer/src/lib.rs +++ b/mutable_buffer/src/lib.rs @@ -60,5 +60,4 @@ pub mod chunk; mod column; mod dictionary; -pub mod pred; mod table; diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index e522eeadba..5439f9f054 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -1,15 +1,10 @@ -use std::{ - cmp, - collections::{BTreeMap, BTreeSet}, - sync::Arc, -}; +use std::{cmp, collections::BTreeMap, sync::Arc}; use crate::{ chunk::Chunk, column, column::Column, dictionary::{Dictionary, Error as DictionaryError}, - pred::{ChunkIdSet, ChunkPredicate}, }; use data_types::{ database_rules::WriterId, @@ -473,118 +468,6 @@ impl Table { RecordBatch::try_new(schema, columns).context(ArrowError {}) } - /// returns true if any row in this table could possible match the - /// predicate. true does not mean any rows will *actually* match, - /// just that the entire table can not be ruled out. - /// - /// false means that no rows in this table could possibly match - pub fn could_match_predicate(&self, chunk_predicate: &ChunkPredicate) -> Result { - Ok( - self.matches_column_name_predicate(chunk_predicate.field_name_predicate.as_ref()) - && self.matches_table_name_predicate(chunk_predicate.table_name_predicate.as_ref()) - && self.matches_timestamp_predicate(chunk_predicate)? - && self.has_columns(chunk_predicate.required_columns.as_ref()), - ) - } - - /// Returns true if the table contains any of the field columns - /// requested or there are no specific fields requested. - fn matches_column_name_predicate(&self, column_selection: Option<&BTreeSet>) -> bool { - match column_selection { - Some(column_selection) => { - for column_id in column_selection { - if let Some(column) = self.columns.get(column_id) { - if !column.is_tag() { - return true; - } - } - } - - // selection only had tag columns - false - } - None => true, // no specific selection - } - } - - fn matches_table_name_predicate(&self, table_name_predicate: Option<&BTreeSet>) -> bool { - match table_name_predicate { - Some(table_name_predicate) => table_name_predicate.contains(&self.id), - None => true, // no table predicate - } - } - - /// returns true if there are any timestamps in this table that - /// fall within the timestamp range - fn matches_timestamp_predicate(&self, chunk_predicate: &ChunkPredicate) -> Result { - match &chunk_predicate.range { - None => Ok(true), - Some(range) => { - let time_column_id = chunk_predicate.time_column_id; - let time_column = self.column(time_column_id)?; - time_column.has_i64_range(range.start, range.end).context( - ColumnPredicateEvaluation { - column: time_column_id, - }, - ) - } - } - } - - /// returns true if no columns are specified, or the table has all - /// columns specified - fn has_columns(&self, columns: Option<&ChunkIdSet>) -> bool { - if let Some(columns) = columns { - match columns { - ChunkIdSet::AtLeastOneMissing => return false, - ChunkIdSet::Present(symbols) => { - for symbol in symbols { - if !self.columns.contains_key(symbol) { - return false; - } - } - } - } - } - true - } - - /// returns true if there are any rows in column that are non-null - /// and within the timestamp range specified by pred - pub(crate) fn column_matches_predicate( - &self, - column: &Column, - chunk_predicate: &ChunkPredicate, - ) -> Result { - match column { - Column::F64(v, _) => self.column_value_matches_predicate(v, 
chunk_predicate), - Column::I64(v, _) => self.column_value_matches_predicate(v, chunk_predicate), - Column::U64(v, _) => self.column_value_matches_predicate(v, chunk_predicate), - Column::String(v, _) => self.column_value_matches_predicate(v, chunk_predicate), - Column::Bool(v, _) => self.column_value_matches_predicate(v, chunk_predicate), - Column::Tag(v, _) => self.column_value_matches_predicate(v, chunk_predicate), - } - } - - fn column_value_matches_predicate( - &self, - column_value: &[Option], - chunk_predicate: &ChunkPredicate, - ) -> Result { - match chunk_predicate.range { - None => Ok(true), - Some(range) => { - let time_column_id = chunk_predicate.time_column_id; - let time_column = self.column(time_column_id)?; - time_column - .has_non_null_i64_range(column_value, range.start, range.end) - .context(ColumnPredicateEvaluation { - column: time_column_id, - }) - } - } - } - pub fn stats(&self, chunk: &Chunk) -> Vec { self.columns .iter() @@ -639,49 +522,6 @@ mod tests { use super::*; use tracker::MemRegistry; - #[test] - fn test_has_columns() { - let registry = Arc::new(MemRegistry::new()); - let mut chunk = Chunk::new(42, registry.as_ref()); - let dictionary = &mut chunk.dictionary; - let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); - - let lp_lines = vec![ - "h2o,state=MA,city=Boston temp=70.4 100", - "h2o,state=MA,city=Boston temp=72.4 250", - ]; - - write_lines_to_table(&mut table, dictionary, lp_lines); - - let state_symbol = dictionary.id("state").unwrap(); - let new_symbol = dictionary.lookup_value_or_insert("not_a_columns"); - - assert!(table.has_columns(None)); - - let pred = ChunkIdSet::AtLeastOneMissing; - assert!(!table.has_columns(Some(&pred))); - - let set = BTreeSet::::new(); - let pred = ChunkIdSet::Present(set); - assert!(table.has_columns(Some(&pred))); - - let mut set = BTreeSet::new(); - set.insert(state_symbol); - let pred = ChunkIdSet::Present(set); - assert!(table.has_columns(Some(&pred))); - - let mut set = BTreeSet::new(); - set.insert(new_symbol); - let pred = ChunkIdSet::Present(set); - assert!(!table.has_columns(Some(&pred))); - - let mut set = BTreeSet::new(); - set.insert(state_symbol); - set.insert(new_symbol); - let pred = ChunkIdSet::Present(set); - assert!(!table.has_columns(Some(&pred))); - } - #[test] fn table_size() { let registry = Arc::new(MemRegistry::new()); @@ -706,84 +546,6 @@ mod tests { assert_eq!(320, table.size()); } - #[test] - fn test_matches_table_name_predicate() { - let registry = Arc::new(MemRegistry::new()); - let mut chunk = Chunk::new(42, registry.as_ref()); - let dictionary = &mut chunk.dictionary; - let mut table = Table::new(dictionary.lookup_value_or_insert("h2o")); - - let lp_lines = vec![ - "h2o,state=MA,city=Boston temp=70.4 100", - "h2o,state=MA,city=Boston temp=72.4 250", - ]; - write_lines_to_table(&mut table, dictionary, lp_lines); - - let h2o_symbol = dictionary.id("h2o").unwrap(); - - assert!(table.matches_table_name_predicate(None)); - - let set = BTreeSet::new(); - assert!(!table.matches_table_name_predicate(Some(&set))); - - let mut set = BTreeSet::new(); - set.insert(h2o_symbol); - assert!(table.matches_table_name_predicate(Some(&set))); - - // Some symbol that is not the same as h2o_symbol - assert_ne!(37377, h2o_symbol); - let mut set = BTreeSet::new(); - set.insert(37377); - assert!(!table.matches_table_name_predicate(Some(&set))); - } - - #[test] - fn test_matches_column_name_predicate() { - let registry = Arc::new(MemRegistry::new()); - let mut chunk = Chunk::new(42, 
registry.as_ref()); - let dictionary = &mut chunk.dictionary; - let mut table = Table::new(dictionary.lookup_value_or_insert("h2o")); - - let lp_lines = vec![ - "h2o,state=MA,city=Boston temp=70.4,awesomeness=1000 100", - "h2o,state=MA,city=Boston temp=72.4,awesomeness=2000 250", - ]; - write_lines_to_table(&mut table, dictionary, lp_lines); - - let state_symbol = dictionary.id("state").unwrap(); - let temp_symbol = dictionary.id("temp").unwrap(); - let awesomeness_symbol = dictionary.id("awesomeness").unwrap(); - - assert!(table.matches_column_name_predicate(None)); - - let set = BTreeSet::new(); - assert!(!table.matches_column_name_predicate(Some(&set))); - - // tag columns should not count - let mut set = BTreeSet::new(); - set.insert(state_symbol); - assert!(!table.matches_column_name_predicate(Some(&set))); - - let mut set = BTreeSet::new(); - set.insert(temp_symbol); - assert!(table.matches_column_name_predicate(Some(&set))); - - let mut set = BTreeSet::new(); - set.insert(temp_symbol); - set.insert(awesomeness_symbol); - assert!(table.matches_column_name_predicate(Some(&set))); - - let mut set = BTreeSet::new(); - set.insert(temp_symbol); - set.insert(awesomeness_symbol); - set.insert(1337); // some other symbol, but that is ok - assert!(table.matches_column_name_predicate(Some(&set))); - - let mut set = BTreeSet::new(); - set.insert(1337); - assert!(!table.matches_column_name_predicate(Some(&set))); - } - #[test] fn test_to_arrow_schema_all() { let registry = Arc::new(MemRegistry::new()); diff --git a/server/src/db/pred.rs b/server/src/db/pred.rs index 02276d0d31..ec8f469f80 100644 --- a/server/src/db/pred.rs +++ b/server/src/db/pred.rs @@ -3,7 +3,6 @@ use std::convert::TryFrom; -use mutable_buffer::{chunk::Chunk, pred::ChunkPredicate}; use query::predicate::Predicate; use snafu::Snafu; @@ -11,15 +10,6 @@ use snafu::Snafu; pub enum Error { #[snafu(display("Error translating predicate: {}", msg))] ReadBufferPredicate { msg: String, pred: Predicate }, - - #[snafu(display("Error building predicate for mutable buffer: {}", source))] - MutableBufferPredicate { source: mutable_buffer::pred::Error }, -} - -impl From for Error { - fn from(source: mutable_buffer::pred::Error) -> Self { - Self::MutableBufferPredicate { source } - } } pub type Result = std::result::Result; @@ -52,25 +42,6 @@ pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result, - predicate: &Predicate, -) -> Result { - let predicate = chunk - .as_ref() - .predicate_builder()? - .table_names(predicate.table_names.as_ref())? - .field_names(predicate.field_columns.as_ref())? - .range(predicate.range)? - // it would be nice to avoid cloning all the exprs here. - .exprs(predicate.exprs.clone())? - .build(); - - Ok(predicate) -} - #[cfg(test)] pub mod test { use super::*; @@ -196,7 +167,6 @@ pub mod test { Error::ReadBufferPredicate { msg, pred: _ } => { assert_eq!(msg, exp.to_owned()); } - _ => panic!("Unexpected error type"), } } } diff --git a/server/src/db/streams.rs b/server/src/db/streams.rs index b421c018a2..8aa5243296 100644 --- a/server/src/db/streams.rs +++ b/server/src/db/streams.rs @@ -1,15 +1,9 @@ //! Adapter streams for different Chunk types that implement the interface //! 
needed by DataFusion use arrow_deps::{ - arrow::{ - datatypes::SchemaRef, - error::{ArrowError, Result as ArrowResult}, - record_batch::RecordBatch, - }, + arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch}, datafusion::physical_plan::RecordBatchStream, }; -use internal_types::selection::Selection; -use mutable_buffer::chunk::Chunk as MBChunk; use read_buffer::ReadFilterResults; use std::{ @@ -17,100 +11,6 @@ use std::{ task::{Context, Poll}, }; -use snafu::{ResultExt, Snafu}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display( - "Error getting data for table '{}' chunk {}: {}", - table_name, - chunk_id, - source - ))] - GettingTableData { - table_name: String, - chunk_id: u32, - source: mutable_buffer::chunk::Error, - }, -} - -/// Adapter which will produce record batches from a mutable buffer -/// chunk on demand -pub(crate) struct MutableBufferChunkStream { - /// Requested output schema (includes selection) - schema: SchemaRef, - chunk: Arc, - table_name: Arc, - - /// Vector of record batches to send in reverse order (send data[len-1] - /// next) Is None until the first call to poll_next - data: Option>, -} - -impl MutableBufferChunkStream { - #[allow(dead_code)] - pub fn new(chunk: Arc, schema: SchemaRef, table_name: impl Into) -> Self { - Self { - chunk, - schema, - table_name: Arc::new(table_name.into()), - data: None, - } - } - - // gets the next batch, as needed - fn next_batch(&mut self) -> ArrowResult> { - if self.data.is_none() { - // Want all the columns in the schema. Note we don't - // use `Selection::All` here because the mutable buffer chunk would interpret it - // as "all columns in the table in that chunk" rather than - // all columns this query needs - let selected_cols = self - .schema - .fields() - .iter() - .map(|f| f.name() as &str) - .collect::>(); - let selection = Selection::Some(&selected_cols); - - let mut data = Vec::new(); - self.chunk - .table_to_arrow(&mut data, self.table_name.as_ref(), selection) - .context(GettingTableData { - table_name: self.table_name.as_ref(), - chunk_id: self.chunk.id(), - }) - .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; - - // reverse the array so we can pop off the back - data.reverse(); - self.data = Some(data); - } - - // self.data was set to Some above - Ok(self.data.as_mut().unwrap().pop()) - } -} - -impl RecordBatchStream for MutableBufferChunkStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} - -impl futures::Stream for MutableBufferChunkStream { - type Item = ArrowResult; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(self.next_batch().transpose()) - } - - // TODO is there a useful size_hint to pass? 
-} - /// Adapter which will take a ReadFilterResults and make it an async stream pub struct ReadFilterResultsStream { read_results: ReadFilterResults, From 7ef490694c7de3aadb5d2fbca7f1244f4230eb29 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 13 Apr 2021 21:59:41 +0100 Subject: [PATCH 3/7] refactor: reduce module coupling in mutable buffer (#1199) * refactor: reduce module coupling in mutable buffer * refactor: tweak visibility * chore: formatting --- mutable_buffer/src/chunk.rs | 10 +- mutable_buffer/src/chunk/snapshot.rs | 4 +- mutable_buffer/src/column.rs | 185 ----------------- mutable_buffer/src/pred.rs | 298 --------------------------- mutable_buffer/src/table.rs | 145 +++++-------- 5 files changed, 61 insertions(+), 581 deletions(-) delete mode 100644 mutable_buffer/src/pred.rs diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index 4359a8c276..b2711cb7f1 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -63,16 +63,16 @@ pub type Result = std::result::Result; #[derive(Debug)] pub struct Chunk { /// The id for this chunk - pub id: u32, + id: u32, /// `dictionary` maps &str -> u32. The u32s are used in place of String or /// str to avoid slow string operations. The same dictionary is used for /// table names, tag names, tag values, and column names. // TODO: intern string field values too? - pub dictionary: Dictionary, + dictionary: Dictionary, /// map of the dictionary ID for the table name to the table - pub tables: HashMap, + tables: HashMap, /// keep track of memory used by chunk tracker: MemTracker, @@ -173,7 +173,7 @@ impl Chunk { if let Some(table) = self.table(table_name)? { dst.push( table - .to_arrow(&self, selection) + .to_arrow(&self.dictionary, selection) .context(NamedTableError { table_name })?, ); } @@ -192,7 +192,7 @@ impl Chunk { TableSummary { name: name.to_string(), - columns: table.stats(&self), + columns: table.stats(&self.dictionary), } }) .collect() diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs index 901b8c5640..4f6e1f3c15 100644 --- a/mutable_buffer/src/chunk/snapshot.rs +++ b/mutable_buffer/src/chunk/snapshot.rs @@ -55,8 +55,8 @@ impl ChunkSnapshot { pub fn new(chunk: &Chunk) -> Self { let mut records: HashMap = Default::default(); for (id, table) in &chunk.tables { - let schema = table.schema(chunk, Selection::All).unwrap(); - let batch = table.to_arrow(chunk, Selection::All).unwrap(); + let schema = table.schema(&chunk.dictionary, Selection::All).unwrap(); + let batch = table.to_arrow(&chunk.dictionary, Selection::All).unwrap(); let name = chunk.dictionary.lookup_id(*id).unwrap(); let timestamp_range = chunk diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs index 7d822aa103..dfe9617d2b 100644 --- a/mutable_buffer/src/column.rs +++ b/mutable_buffer/src/column.rs @@ -1,7 +1,6 @@ use snafu::Snafu; use crate::dictionary::Dictionary; -use arrow_deps::arrow::datatypes::DataType as ArrowDataType; use data_types::partition_metadata::StatValues; use generated_types::entry::LogicalColumnType; use internal_types::entry::TypedValuesIterator; @@ -320,10 +319,6 @@ impl Column { } } - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - pub fn type_description(&self) -> &'static str { match self { Self::F64(_, _) => "f64", @@ -335,56 +330,6 @@ impl Column { } } - /// Return the arrow DataType for this column - pub fn data_type(&self) -> ArrowDataType { - match self { - Self::F64(..) 
=> ArrowDataType::Float64, - Self::I64(..) => ArrowDataType::Int64, - Self::U64(..) => ArrowDataType::UInt64, - Self::String(..) => ArrowDataType::Utf8, - Self::Bool(..) => ArrowDataType::Boolean, - Self::Tag(..) => ArrowDataType::Utf8, - } - } - - // push_none_if_len_equal will add a None value to the end of the Vec of values - // if the length is equal to the passed in value. This is used to ensure - // columns are all the same length. - pub fn push_none_if_len_equal(&mut self, len: usize) { - match self { - Self::F64(v, _) => { - if v.len() == len { - v.push(None); - } - } - Self::I64(v, _) => { - if v.len() == len { - v.push(None); - } - } - Self::U64(v, _) => { - if v.len() == len { - v.push(None); - } - } - Self::String(v, _) => { - if v.len() == len { - v.push(None); - } - } - Self::Bool(v, _) => { - if v.len() == len { - v.push(None); - } - } - Self::Tag(v, _) => { - if v.len() == len { - v.push(None); - } - } - } - } - pub fn get_i64_stats(&self) -> Option> { match self { Self::I64(_, values) => Some(values.clone()), @@ -392,50 +337,6 @@ impl Column { } } - /// Returns true if any rows are within the range [min_value, - /// max_value). Inclusive of `start`, exclusive of `end` - pub fn has_i64_range(&self, start: i64, end: i64) -> Result { - match self { - Self::I64(_, stats) => { - if stats.max < start || stats.min >= end { - Ok(false) - } else { - Ok(true) - } - } - _ => InternalTypeMismatchForTimePredicate {}.fail(), - } - } - - /// Return true of this column's type is a Tag - pub fn is_tag(&self) -> bool { - matches!(self, Self::Tag(..)) - } - - /// Returns true if there exists at least one row idx where this - /// self[i] is within the range [min_value, max_value). Inclusive - /// of `start`, exclusive of `end` and where col[i] is non null - pub fn has_non_null_i64_range( - &self, - column: &[Option], - start: i64, - end: i64, - ) -> Result { - match self { - Self::I64(v, _) => { - for (index, val) in v.iter().enumerate() { - if let Some(val) = val { - if start <= *val && *val < end && column[index].is_some() { - return Ok(true); - } - } - } - Ok(false) - } - _ => InternalTypeMismatchForTimePredicate {}.fail(), - } - } - /// The approximate memory size of the data in the column. 
Note that /// the space taken for the tag string values is represented in /// the dictionary size in the chunk that holds the table that has this @@ -467,89 +368,3 @@ impl Column { } } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_has_i64_range() { - let mut stats = StatValues::new(1); - stats.update(2); - let col = Column::I64(vec![Some(1), None, Some(2)], stats.clone()); - assert!(!col.has_i64_range(-1, 0).unwrap()); - assert!(!col.has_i64_range(0, 1).unwrap()); - assert!(col.has_i64_range(1, 2).unwrap()); - assert!(col.has_i64_range(2, 3).unwrap()); - assert!(!col.has_i64_range(3, 4).unwrap()); - - let col = Column::I64(vec![Some(2), None, Some(1)], stats); - assert!(!col.has_i64_range(-1, 0).unwrap()); - assert!(!col.has_i64_range(0, 1).unwrap()); - assert!(col.has_i64_range(1, 2).unwrap()); - assert!(col.has_i64_range(2, 3).unwrap()); - assert!(!col.has_i64_range(3, 4).unwrap()); - } - - #[test] - fn test_has_i64_range_does_not_panic() { - // providing the wrong column type should get an internal error, not a panic - let col = Column::F64(vec![Some(1.2)], StatValues::new(1.2)); - let res = col.has_i64_range(-1, 0); - assert!(res.is_err()); - let res_string = format!("{:?}", res); - let expected = "InternalTypeMismatchForTimePredicate"; - assert!( - res_string.contains(expected), - "Did not find expected text '{}' in '{}'", - expected, - res_string - ); - } - - #[test] - fn test_has_non_null_i64_range_() { - let none_col: Vec> = vec![None, None, None]; - let some_col: Vec> = vec![Some(0), Some(0), Some(0)]; - - let mut stats = StatValues::new(1); - stats.update(2); - let col = Column::I64(vec![Some(1), None, Some(2)], stats); - - assert!(!col.has_non_null_i64_range(&some_col, -1, 0).unwrap()); - assert!(!col.has_non_null_i64_range(&some_col, 0, 1).unwrap()); - assert!(col.has_non_null_i64_range(&some_col, 1, 2).unwrap()); - assert!(col.has_non_null_i64_range(&some_col, 2, 3).unwrap()); - assert!(!col.has_non_null_i64_range(&some_col, 3, 4).unwrap()); - - assert!(!col.has_non_null_i64_range(&none_col, -1, 0).unwrap()); - assert!(!col.has_non_null_i64_range(&none_col, 0, 1).unwrap()); - assert!(!col.has_non_null_i64_range(&none_col, 1, 2).unwrap()); - assert!(!col.has_non_null_i64_range(&none_col, 2, 3).unwrap()); - assert!(!col.has_non_null_i64_range(&none_col, 3, 4).unwrap()); - } - - #[test] - fn column_size() { - let i64col = Column::I64(vec![Some(1), Some(1)], StatValues::new(1)); - assert_eq!(40, i64col.size()); - - let f64col = Column::F64(vec![Some(1.1), Some(1.1), Some(1.1)], StatValues::new(1.1)); - assert_eq!(56, f64col.size()); - - let boolcol = Column::Bool(vec![Some(true)], StatValues::new(true)); - assert_eq!(9, boolcol.size()); - - let tagcol = Column::Tag( - vec![Some(1), Some(1), Some(1), Some(1)], - StatValues::new("foo".to_string()), - ); - assert_eq!(40, tagcol.size()); - - let stringcol = Column::String( - vec![Some("foo".to_string()), Some("hello world".to_string())], - StatValues::new("foo".to_string()), - ); - assert_eq!(70, stringcol.size()); - } -} diff --git a/mutable_buffer/src/pred.rs b/mutable_buffer/src/pred.rs deleted file mode 100644 index 24a7db3074..0000000000 --- a/mutable_buffer/src/pred.rs +++ /dev/null @@ -1,298 +0,0 @@ -use std::collections::{BTreeSet, HashSet}; - -use crate::dictionary::{Dictionary, Error as DictionaryError}; - -use arrow_deps::{ - datafusion::{ - error::{DataFusionError, Result as DatafusionResult}, - logical_plan::{Expr, ExpressionVisitor, Operator, Recursion}, - optimizer::utils::expr_to_column_names, - 
}, - util::{make_range_expr, AndExprBuilder}, -}; -use data_types::timestamp::TimestampRange; -use internal_types::schema::TIME_COLUMN_NAME; - -//use snafu::{OptionExt, ResultExt, Snafu}; -use snafu::{ensure, ResultExt, Snafu}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Error writing table '{}': {}", table_name, source))] - TableWrite { - table_name: String, - source: crate::table::Error, - }, - - #[snafu(display("Time Column was not not found in dictionary: {}", source))] - TimeColumnNotFound { source: DictionaryError }, - - #[snafu(display("Unsupported predicate. Mutable buffer does not support: {}", source))] - UnsupportedPredicate { source: DataFusionError }, - - #[snafu(display( - "Internal error visiting expressions in ChunkPredicateBuilder: {}", - source - ))] - InternalVisitingExpressions { source: DataFusionError }, - - #[snafu(display("table_names has already been specified in ChunkPredicateBuilder"))] - TableNamesAlreadySet {}, - - #[snafu(display("field_names has already been specified in ChunkPredicateBuilder"))] - FieldNamesAlreadySet {}, - - #[snafu(display("range has already been specified in ChunkPredicateBuilder"))] - RangeAlreadySet {}, - - #[snafu(display("exprs has already been specified in ChunkPredicateBuilder"))] - ExprsAlreadySet {}, - - #[snafu(display("required_columns has already been specified in ChunkPredicateBuilder"))] - RequiredColumnsAlreadySet {}, -} - -pub type Result = std::result::Result; - -/// Describes the result of translating a set of strings into -/// chunk specific ids -#[derive(Debug, PartialEq, Eq)] -pub enum ChunkIdSet { - /// At least one of the strings was not present in the chunks' - /// dictionary. - /// - /// This is important when testing for the presence of all ids in - /// a set, as we know they can not all be present - AtLeastOneMissing, - - /// All strings existed in this chunk's dictionary - Present(BTreeSet), -} - -/// a 'Compiled' set of predicates / filters that can be evaluated on -/// this chunk (where strings have been translated to chunk -/// specific u32 ids) -#[derive(Debug, Default)] -pub struct ChunkPredicate { - /// If present, restrict the request to just those tables whose - /// names are in table_names. If present but empty, means there - /// was a predicate but no tables named that way exist in the - /// chunk (so no table can pass) - pub table_name_predicate: Option>, - - /// Optional column restriction. If present, further - /// restrict any field columns returned to only those named, and - /// skip tables entirely when querying metadata that do not have - /// *any* of the fields - pub field_name_predicate: Option>, - - /// General DataFusion expressions (arbitrary predicates) applied - /// as a filter using logical conjuction (aka are 'AND'ed - /// together). Only rows that evaluate to TRUE for all these - /// expressions should be returned. - /// - /// TODO these exprs should eventually be removed (when they are - /// all handled one layer up in the query layer) - pub chunk_exprs: Vec, - - /// If Some, then the table must contain all columns specified - /// to pass the predicate - pub required_columns: Option, - - /// The id of the "time" column in this chunk - pub time_column_id: u32, - - /// Timestamp range: only rows within this range should be considered - pub range: Option, -} - -impl ChunkPredicate { - /// Creates and adds a datafuson predicate representing the - /// combination of predicate and timestamp. 
- pub fn filter_expr(&self) -> Option { - // build up a list of expressions - let mut builder = - AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr()); - - for expr in &self.chunk_exprs { - builder = builder.append_expr(expr.clone()); - } - - builder.build() - } - - /// For plans which select a subset of fields, returns true if - /// the field should be included in the results - pub fn should_include_field(&self, field_id: u32) -> bool { - match &self.field_name_predicate { - None => true, - Some(field_restriction) => field_restriction.contains(&field_id), - } - } - - /// Return true if this column is the time column - pub fn is_time_column(&self, id: u32) -> bool { - self.time_column_id == id - } - - /// Creates a DataFusion predicate for appliying a timestamp range: - /// - /// range.start <= time and time < range.end` - fn make_timestamp_predicate_expr(&self) -> Option { - self.range - .map(|range| make_range_expr(range.start, range.end, TIME_COLUMN_NAME)) - } -} - -/// Builds ChunkPredicates -#[derive(Debug)] -pub struct ChunkPredicateBuilder<'a> { - inner: ChunkPredicate, - dictionary: &'a Dictionary, -} - -impl<'a> ChunkPredicateBuilder<'a> { - pub fn new(dictionary: &'a Dictionary) -> Result { - let time_column_id = dictionary - .lookup_value(TIME_COLUMN_NAME) - .context(TimeColumnNotFound)?; - - let inner = ChunkPredicate { - time_column_id, - ..Default::default() - }; - - Ok(Self { inner, dictionary }) - } - - /// Set table_name_predicate so only tables in `names` are returned - pub fn table_names(mut self, names: Option<&BTreeSet>) -> Result { - ensure!( - self.inner.table_name_predicate.is_none(), - TableNamesAlreadySet - ); - self.inner.table_name_predicate = self.compile_string_list(names); - Ok(self) - } - - /// Set field_name_predicate so only tables in `names` are returned - pub fn field_names(mut self, names: Option<&BTreeSet>) -> Result { - ensure!( - self.inner.field_name_predicate.is_none(), - FieldNamesAlreadySet - ); - self.inner.field_name_predicate = self.compile_string_list(names); - Ok(self) - } - - pub fn range(mut self, range: Option) -> Result { - ensure!(self.inner.range.is_none(), RangeAlreadySet); - self.inner.range = range; - Ok(self) - } - - /// Set the general purpose predicates - pub fn exprs(mut self, chunk_exprs: Vec) -> Result { - // In order to evaluate expressions in the table, all columns - // referenced in the expression must appear (I think, not sure - // about NOT, etc so panic if we see one of those); - let mut visitor = SupportVisitor {}; - let mut predicate_columns: HashSet = HashSet::new(); - for expr in &chunk_exprs { - visitor = expr.accept(visitor).context(UnsupportedPredicate)?; - expr_to_column_names(&expr, &mut predicate_columns) - .context(InternalVisitingExpressions)?; - } - - ensure!(self.inner.chunk_exprs.is_empty(), ExprsAlreadySet); - self.inner.chunk_exprs = chunk_exprs; - - // if there are any column references in the expression, ensure they appear in - // any table - if !predicate_columns.is_empty() { - ensure!( - self.inner.required_columns.is_none(), - RequiredColumnsAlreadySet - ); - self.inner.required_columns = Some(self.make_chunk_ids(predicate_columns.iter())); - } - Ok(self) - } - - /// Return the created chunk predicate, consuming self - pub fn build(self) -> ChunkPredicate { - self.inner - } - - /// Converts a Set of strings into a set of ids in terms of this - /// Chunk's dictionary. 
- /// - /// If there are no matching Strings in the chunks dictionary, - /// those strings are ignored and a (potentially empty) set is - /// returned. - fn compile_string_list(&self, names: Option<&BTreeSet>) -> Option> { - names.map(|names| { - names - .iter() - .filter_map(|name| self.dictionary.id(name)) - .collect::>() - }) - } - - /// Translate a bunch of strings into a set of ids from the dictionarythis - /// chunk - pub fn make_chunk_ids<'b, I>(&self, predicate_columns: I) -> ChunkIdSet - where - I: Iterator, - { - let mut symbols = BTreeSet::new(); - for column_name in predicate_columns { - if let Some(column_id) = self.dictionary.id(column_name) { - symbols.insert(column_id); - } else { - return ChunkIdSet::AtLeastOneMissing; - } - } - - ChunkIdSet::Present(symbols) - } -} - -/// Used to figure out if we know how to deal with this kind of -/// predicate in the write buffer -struct SupportVisitor {} - -impl ExpressionVisitor for SupportVisitor { - fn pre_visit(self, expr: &Expr) -> DatafusionResult> { - match expr { - Expr::Literal(..) => Ok(Recursion::Continue(self)), - Expr::Column(..) => Ok(Recursion::Continue(self)), - Expr::BinaryExpr { op, .. } => { - match op { - Operator::Eq - | Operator::Lt - | Operator::LtEq - | Operator::Gt - | Operator::GtEq - | Operator::Plus - | Operator::Minus - | Operator::Multiply - | Operator::Divide - | Operator::And - | Operator::Or => Ok(Recursion::Continue(self)), - // Unsupported (need to think about ramifications) - Operator::NotEq | Operator::Modulus | Operator::Like | Operator::NotLike => { - Err(DataFusionError::NotImplemented(format!( - "Operator {:?} not yet supported in IOx MutableBuffer", - op - ))) - } - } - } - _ => Err(DataFusionError::NotImplemented(format!( - "Unsupported expression in mutable_buffer database: {:?}", - expr - ))), - } - } -} diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index 5439f9f054..687b5dcffa 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -1,7 +1,6 @@ use std::{cmp, collections::BTreeMap, sync::Arc}; use crate::{ - chunk::Chunk, column, column::Column, dictionary::{Dictionary, Error as DictionaryError}, @@ -31,12 +30,8 @@ use arrow_deps::{ #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Tag value ID {} not found in dictionary of chunk {}", value, chunk))] - TagValueIdNotFoundInDictionary { - value: u32, - chunk: u64, - source: DictionaryError, - }, + #[snafu(display("Tag value ID {} not found in dictionary of chunk", value))] + TagValueIdNotFoundInDictionary { value: u32, source: DictionaryError }, #[snafu(display("Column error on column {}: {}", column, source))] ColumnError { @@ -59,21 +54,12 @@ pub enum Error { #[snafu(display("Internal error: unexpected aggregate request for None aggregate",))] InternalUnexpectedNoneAggregate {}, - #[snafu(display( - "Column name '{}' not found in dictionary of chunk {}", - column_name, - chunk - ))] - ColumnNameNotFoundInDictionary { column_name: String, chunk: u64 }, + #[snafu(display("Column name '{}' not found in dictionary of chunk", column_name,))] + ColumnNameNotFoundInDictionary { column_name: String }, - #[snafu(display( - "Internal: Column id '{}' not found in dictionary of chunk {}", - column_id, - chunk - ))] + #[snafu(display("Internal: Column id '{}' not found in dictionary", column_id,))] ColumnIdNotFoundInDictionary { column_id: u32, - chunk: u64, source: DictionaryError, }, @@ -165,21 +151,6 @@ impl Table { }) } - /// Returns a reference to the specified column as a slice of - /// 
i64s. Errors if the type is not i64 - pub fn column_i64(&self, column_id: u32) -> Result<&[Option]> { - let column = self.column(column_id)?; - match column { - Column::I64(vals, _) => Ok(vals), - _ => InternalColumnTypeMismatch { - column_id, - expected_column_type: "i64", - actual_column_type: column.type_description(), - } - .fail(), - } - } - /// Validates the schema of the passed in columns, then adds their values to /// the associated columns in the table and updates summary statistics. pub fn write_columns( @@ -277,17 +248,20 @@ impl Table { /// Returns the column selection for all the columns in this table, orderd /// by table name - fn all_columns_selection<'a>(&self, chunk: &'a Chunk) -> Result> { + fn all_columns_selection<'a>( + &self, + dictionary: &'a Dictionary, + ) -> Result> { let cols = self .columns .iter() .map(|(column_id, _)| { - let column_name = chunk.dictionary.lookup_id(*column_id).context( - ColumnIdNotFoundInDictionary { - column_id: *column_id, - chunk: chunk.id, - }, - )?; + let column_name = + dictionary + .lookup_id(*column_id) + .context(ColumnIdNotFoundInDictionary { + column_id: *column_id, + })?; Ok(ColSelection { column_name, column_id: *column_id, @@ -304,45 +278,45 @@ impl Table { /// Returns a column selection for just the specified columns fn specific_columns_selection<'a>( &self, - chunk: &'a Chunk, + dictionary: &'a Dictionary, columns: &'a [&'a str], ) -> Result> { - let cols = - columns - .iter() - .map(|&column_name| { - let column_id = chunk.dictionary.id(column_name).context( - ColumnNameNotFoundInDictionary { - column_name, - chunk: chunk.id, - }, - )?; + let cols = columns + .iter() + .map(|&column_name| { + let column_id = dictionary + .id(column_name) + .context(ColumnNameNotFoundInDictionary { column_name })?; - Ok(ColSelection { - column_name, - column_id, - }) + Ok(ColSelection { + column_name, + column_id, }) - .collect::>()?; + }) + .collect::>()?; Ok(TableColSelection { cols }) } /// Converts this table to an arrow record batch. 
- pub fn to_arrow(&self, chunk: &Chunk, selection: Selection<'_>) -> Result { + pub fn to_arrow( + &self, + dictionary: &Dictionary, + selection: Selection<'_>, + ) -> Result { // translate chunk selection into name/indexes: let selection = match selection { - Selection::All => self.all_columns_selection(chunk), - Selection::Some(cols) => self.specific_columns_selection(chunk, cols), + Selection::All => self.all_columns_selection(dictionary), + Selection::Some(cols) => self.specific_columns_selection(dictionary, cols), }?; - self.to_arrow_impl(chunk, &selection) + self.to_arrow_impl(dictionary, &selection) } - pub fn schema(&self, chunk: &Chunk, selection: Selection<'_>) -> Result { + pub fn schema(&self, dictionary: &Dictionary, selection: Selection<'_>) -> Result { // translate chunk selection into name/indexes: let selection = match selection { - Selection::All => self.all_columns_selection(chunk), - Selection::Some(cols) => self.specific_columns_selection(chunk, cols), + Selection::All => self.all_columns_selection(dictionary), + Selection::Some(cols) => self.specific_columns_selection(dictionary, cols), }?; self.schema_impl(&selection) } @@ -379,7 +353,7 @@ impl Table { /// requested columns with index are tuples of column_name, column_index fn to_arrow_impl( &self, - chunk: &Chunk, + dictionary: &Dictionary, selection: &TableColSelection<'_>, ) -> Result { let mut columns = Vec::with_capacity(selection.cols.len()); @@ -408,12 +382,9 @@ impl Table { match v { None => builder.append_null(), Some(value_id) => { - let tag_value = chunk.dictionary.lookup_id(*value_id).context( - TagValueIdNotFoundInDictionary { - value: *value_id, - chunk: chunk.id, - }, - )?; + let tag_value = dictionary + .lookup_id(*value_id) + .context(TagValueIdNotFoundInDictionary { value: *value_id })?; builder.append_value(tag_value) } } @@ -468,12 +439,11 @@ impl Table { RecordBatch::try_new(schema, columns).context(ArrowError {}) } - pub fn stats(&self, chunk: &Chunk) -> Vec { + pub fn stats(&self, dictionary: &Dictionary) -> Vec { self.columns .iter() .map(|(column_id, c)| { - let column_name = chunk - .dictionary + let column_name = dictionary .lookup_id(*column_id) .expect("column name in dictionary"); @@ -520,13 +490,10 @@ mod tests { use internal_types::entry::test_helpers::lp_to_entry; use super::*; - use tracker::MemRegistry; #[test] fn table_size() { - let registry = Arc::new(MemRegistry::new()); - let mut chunk = Chunk::new(42, registry.as_ref()); - let dictionary = &mut chunk.dictionary; + let mut dictionary = Dictionary::new(); let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec![ @@ -534,33 +501,31 @@ mod tests { "h2o,state=MA,city=Boston temp=72.4 250", ]; - write_lines_to_table(&mut table, dictionary, lp_lines.clone()); + write_lines_to_table(&mut table, &mut dictionary, lp_lines.clone()); assert_eq!(128, table.size()); // doesn't double because of the stats overhead - write_lines_to_table(&mut table, dictionary, lp_lines.clone()); + write_lines_to_table(&mut table, &mut dictionary, lp_lines.clone()); assert_eq!(224, table.size()); // now make sure it increased by the same amount minus stats overhead - write_lines_to_table(&mut table, dictionary, lp_lines); + write_lines_to_table(&mut table, &mut dictionary, lp_lines); assert_eq!(320, table.size()); } #[test] fn test_to_arrow_schema_all() { - let registry = Arc::new(MemRegistry::new()); - let mut chunk = Chunk::new(42, registry.as_ref()); - let dictionary = &mut chunk.dictionary; + let mut dictionary = 
Dictionary::new(); let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec![ "h2o,state=MA,city=Boston float_field=70.4,int_field=8i,uint_field=42u,bool_field=t,string_field=\"foo\" 100", ]; - write_lines_to_table(&mut table, dictionary, lp_lines); + write_lines_to_table(&mut table, &mut dictionary, lp_lines); let selection = Selection::All; - let actual_schema = table.schema(&chunk, selection).unwrap(); + let actual_schema = table.schema(&dictionary, selection).unwrap(); let expected_schema = SchemaBuilder::new() .field("bool_field", ArrowDataType::Boolean) .tag("city") @@ -582,17 +547,15 @@ mod tests { #[test] fn test_to_arrow_schema_subset() { - let registry = Arc::new(MemRegistry::new()); - let mut chunk = Chunk::new(42, registry.as_ref()); - let dictionary = &mut chunk.dictionary; + let mut dictionary = Dictionary::new(); let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec!["h2o,state=MA,city=Boston float_field=70.4 100"]; - write_lines_to_table(&mut table, dictionary, lp_lines); + write_lines_to_table(&mut table, &mut dictionary, lp_lines); let selection = Selection::Some(&["float_field"]); - let actual_schema = table.schema(&chunk, selection).unwrap(); + let actual_schema = table.schema(&dictionary, selection).unwrap(); let expected_schema = SchemaBuilder::new() .field("float_field", ArrowDataType::Float64) .build() From 59c35e5a4fdabc240fd12ee0cd107a0e53de64aa Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Tue, 13 Apr 2021 16:27:52 +0200 Subject: [PATCH 4/7] feat: Implement WriteBuffer gRPC --- .../influxdata/iox/write/v1/service.proto | 21 +++++- server/src/db.rs | 4 ++ server/src/lib.rs | 69 ++++++++++++++++++- src/influxdb_ioxd/rpc/error.rs | 5 ++ src/influxdb_ioxd/rpc/write.rs | 17 +++++ 5 files changed, 114 insertions(+), 2 deletions(-) diff --git a/generated_types/protos/influxdata/iox/write/v1/service.proto b/generated_types/protos/influxdata/iox/write/v1/service.proto index c0fa0a9cbf..17e3a03a96 100644 --- a/generated_types/protos/influxdata/iox/write/v1/service.proto +++ b/generated_types/protos/influxdata/iox/write/v1/service.proto @@ -4,7 +4,12 @@ package influxdata.iox.write.v1; service WriteService { // write data into a specific Database - rpc Write(WriteRequest) returns (WriteResponse); + rpc Write(WriteRequest) returns (WriteResponse) { + option deprecated = true; + }; + + // write an entry into a Database + rpc WriteEntry(WriteEntryRequest) returns (WriteEntryResponse); } message WriteRequest { @@ -21,3 +26,17 @@ message WriteResponse { // how many lines were parsed and written into the database uint64 lines_written = 1; } + + +message WriteEntryRequest { + // name of database into which to write + string db_name = 1; + + // entry, in serialized flatbuffers [Entry] format + // + // [Entry](https://github.com/influxdata/influxdb_iox/blob/main/generated_types/protos/influxdata/iox/write/v1/entry.fbs) + bytes entry = 2; +} + +message WriteEntryResponse { +} diff --git a/server/src/db.rs b/server/src/db.rs index 94d60de241..c66fff5df1 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -642,6 +642,10 @@ impl Db { ) .context(SequencedEntryError)?; + if self.rules.read().wal_buffer_config.is_some() { + todo!("route to the Write Buffer. 
TODO: carols10cents #1157") + } + self.store_sequenced_entry(sequenced_entry) } diff --git a/server/src/lib.rs b/server/src/lib.rs index fa25b149db..5d6ce4dca5 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -67,6 +67,7 @@ clippy::clone_on_ref_ptr )] +use std::convert::TryInto; use std::sync::Arc; use async_trait::async_trait; @@ -150,6 +151,10 @@ pub enum Error { WalError { source: buffer::Error }, #[snafu(display("error converting line protocol to flatbuffers: {}", source))] LineConversion { source: entry::Error }, + #[snafu(display("error decoding entry flatbuffers: {}", source))] + DecodingEntry { + source: flatbuffers::InvalidFlatbuffer, + }, } pub type Result = std::result::Result; @@ -368,6 +373,19 @@ impl Server { Ok(()) } + pub async fn write_entry(&self, db_name: &str, entry_bytes: Vec) -> Result<()> { + self.require_id()?; + + let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; + let db = self + .config + .db(&db_name) + .context(DatabaseNotFound { db_name: &*db_name })?; + + let entry = entry_bytes.try_into().context(DecodingEntry)?; + self.handle_write_entry(&db, entry).await + } + pub async fn handle_write_entry(&self, db: &Db, entry: Entry) -> Result<()> { db.store_entry(entry) .map_err(|e| Error::UnknownDatabaseError { @@ -674,7 +692,7 @@ mod tests { use tokio_util::sync::CancellationToken; use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect}; - use data_types::database_rules::{PartitionTemplate, TemplatePart}; + use data_types::database_rules::{PartitionTemplate, TemplatePart, NO_SHARD_CONFIG}; use influxdb_line_protocol::parse_lines; use object_store::{memory::InMemory, path::ObjectStorePath}; use query::{frontend::sql::SQLQueryPlanner, Database}; @@ -867,6 +885,55 @@ mod tests { assert_table_eq!(expected, &batches); } + #[tokio::test] + async fn write_entry_local() { + let manager = TestConnectionManager::new(); + let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); + let server = Server::new(manager, store); + server.set_id(NonZeroU32::new(1).unwrap()).unwrap(); + + let name = DatabaseName::new("foo".to_string()).unwrap(); + server + .create_database( + DatabaseRules::new(name), + server.require_id().unwrap(), + Arc::clone(&server.store), + ) + .await + .unwrap(); + + let db_name = DatabaseName::new("foo").unwrap(); + let db = server.db(&db_name).unwrap(); + + let line = "cpu bar=1 10"; + let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect(); + let sharded_entries = lines_to_sharded_entries(&lines, NO_SHARD_CONFIG, &*db.rules.read()) + .expect("sharded entries"); + + let entry = &sharded_entries[0].entry; + server + .write_entry("foo", entry.data().into()) + .await + .expect("write entry"); + + let planner = SQLQueryPlanner::default(); + let executor = server.executor(); + let physical_plan = planner + .query(db, "select * from cpu", executor.as_ref()) + .await + .unwrap(); + + let batches = collect(physical_plan).await.unwrap(); + let expected = vec![ + "+-----+------+", + "| bar | time |", + "+-----+------+", + "| 1 | 10 |", + "+-----+------+", + ]; + assert_table_eq!(expected, &batches); + } + #[tokio::test] async fn close_chunk() { test_helpers::maybe_start_logging(); diff --git a/src/influxdb_ioxd/rpc/error.rs b/src/influxdb_ioxd/rpc/error.rs index 20bdf63966..bd78779564 100644 --- a/src/influxdb_ioxd/rpc/error.rs +++ b/src/influxdb_ioxd/rpc/error.rs @@ -23,6 +23,11 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status { description: source.to_string(), } .into(), + 
Error::DecodingEntry { source } => FieldViolation { + field: "entry".into(), + description: source.to_string(), + } + .into(), error => { error!(?error, "Unexpected error"); InternalError {}.into() diff --git a/src/influxdb_ioxd/rpc/write.rs b/src/influxdb_ioxd/rpc/write.rs index ff715620b7..a67e492dc5 100644 --- a/src/influxdb_ioxd/rpc/write.rs +++ b/src/influxdb_ioxd/rpc/write.rs @@ -47,6 +47,23 @@ where let lines_written = lp_line_count as u64; Ok(Response::new(WriteResponse { lines_written })) } + + async fn write_entry( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + if request.entry.is_empty() { + return Err(FieldViolation::required("entry").into()); + } + + self.server + .write_entry(&request.db_name, request.entry) + .await + .map_err(default_server_error_handler)?; + + Ok(Response::new(WriteEntryResponse {})) + } } /// Instantiate the write service From 6d0467277b4df9a40222e85244cdeeb5f8f02e05 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Apr 2021 17:56:36 -0400 Subject: [PATCH 5/7] docs: update multi core design doc (#1197) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- docs/multi_core_tasks.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/multi_core_tasks.md b/docs/multi_core_tasks.md index 57f3e0d148..605fada859 100644 --- a/docs/multi_core_tasks.md +++ b/docs/multi_core_tasks.md @@ -6,7 +6,7 @@ As discussed on https://github.com/influxdata/influxdb_iox/pull/221 and https:// 1. Use only async I/O via `tokio` for socket communication. It is ok to use either blocking (e.g. `std::fs::File`) or async APIs (e.g. `tokio::fs::File`) for local File I/O. -2. All CPU bound tasks should be scheduled on the separate application level `thread_pool` not with `tokio::task::spawn` nor `tokio::task::spawn_blocking` nor a new threadpool. +2. All CPU bound tasks should be scheduled on the separate application level `thread_pool` (which can be another tokio executor but should be separate from the executor that handles I/O). We will work, over time, to migrate the rest of the codebase to use these patterns. @@ -41,11 +41,11 @@ It is ok to use either blocking (e.g. `std::fs::File`) or async APIs for local This can not always be done (e.g. with a library such as parquet writer which is not `async`). In such cases, using `tokio::task::spawn_blocking` should be used to perform the file I/O. -### All CPU heavy work should be done on the single app level worker pool, separate from the tokio runtime +### All CPU heavy work should be done on the single app level worker pool, separate from the tokio runtime handling IO -**What**: All CPU heavy work should be done on the single app level worker pool. We provide a `thread_pool` interface that interacts nicely with async tasks (e.g. that allows an async task to `await` for a CPU heavy task to complete). +**What**: All CPU heavy work should be done on the app level worker pool. We provide a `thread_pool` interface that interacts nicely with async tasks (e.g. that allows an async task to `await` for a CPU heavy task to complete). -**Rationale**: A single app level worker pool gives us a single place to control work priority, eventually, so that tasks such as compaction of large data files can have lower precedence than incoming queries. 
By using a different pool than the tokio runtime, with a limited number of threads, we avoid over-saturating the CPU with OS threads and thereby starving the limited number tokio I/O threads. A separate, single app level pool also limits the number of underlying OS CPU threads which are spawned, even under heavy load, keeping thread context switching overhead low. +**Rationale**: A single app level worker pool gives us a single place to control work priority, eventually, so that tasks such as compaction of large data files can have lower precedence than incoming queries. By using a different pool than the main tokio runtime, with a limited number of threads, we avoid over-saturating the CPU with OS threads and thereby starving the limited number tokio I/O threads. A separate, single app level pool also limits the number of underlying OS CPU threads which are spawned, even under heavy load, keeping thread context switching overhead low. There will, of course, always be a judgment call to be made of where "CPU bound work" starts and "work acceptable for I/O processing" ends. A reasonable rule of thumb is if a job will *always* be completed in less than 100ms then that is probably fine for an I/O thread). This number may be revised as we tune the system. From 150ed4e1d9864bd548a2274f780d24c9494905f6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Apr 2021 18:17:19 -0400 Subject: [PATCH 6/7] refactor: Remove async from `InfluxRPCPlanner` (#1200) * refactor: Remove async from InfluxRPCPlanner * fix: make it compile Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- query/src/frontend/influxrpc.rs | 104 +++++++----------- .../query_tests/influxrpc/field_columns.rs | 2 - .../src/query_tests/influxrpc/read_filter.rs | 1 - .../src/query_tests/influxrpc/read_group.rs | 1 - .../influxrpc/read_window_aggregate.rs | 1 - .../src/query_tests/influxrpc/table_names.rs | 1 - server/src/query_tests/influxrpc/tag_keys.rs | 1 - .../src/query_tests/influxrpc/tag_values.rs | 3 +- src/influxdb_ioxd/rpc/storage/service.rs | 13 +-- 9 files changed, 45 insertions(+), 82 deletions(-) diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 01aaa61d0f..cf6132ab9d 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -195,13 +195,13 @@ impl InfluxRPCPlanner { /// Returns a plan that lists the names of tables in this /// database that have at least one row that matches the /// conditions listed on `predicate` - pub async fn table_names(&self, database: &D, predicate: Predicate) -> Result + pub fn table_names(&self, database: &D, predicate: Predicate) -> Result where D: Database + 'static, { let mut builder = StringSetPlanBuilder::new(); - for chunk in self.filtered_chunks(database, &predicate).await? { + for chunk in self.filtered_chunks(database, &predicate)? { let new_table_names = chunk .table_names(&predicate, builder.known_strings()) .map_err(|e| Box::new(e) as _) @@ -227,7 +227,7 @@ impl InfluxRPCPlanner { /// columns (as defined in the InfluxDB Data model) names in this /// database that have more than zero rows which pass the /// conditions specified by `predicate`. 
- pub async fn tag_keys(&self, database: &D, predicate: Predicate) -> Result + pub fn tag_keys(&self, database: &D, predicate: Predicate) -> Result where D: Database + 'static, { @@ -246,9 +246,9 @@ impl InfluxRPCPlanner { let mut need_full_plans = BTreeMap::new(); let mut known_columns = BTreeSet::new(); - for chunk in self.filtered_chunks(database, &predicate).await? { + for chunk in self.filtered_chunks(database, &predicate)? { // try and get the table names that have rows that match the predicate - let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?; + let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?; for table_name in table_names { debug!( @@ -308,7 +308,7 @@ impl InfluxRPCPlanner { // were already known to have data (based on the contents of known_columns) for (table_name, chunks) in need_full_plans.into_iter() { - let plan = self.tag_keys_plan(&table_name, &predicate, chunks).await?; + let plan = self.tag_keys_plan(&table_name, &predicate, chunks)?; if let Some(plan) = plan { builder = builder.append(plan) @@ -326,7 +326,7 @@ impl InfluxRPCPlanner { /// Returns a plan which finds the distinct, non-null tag values /// in the specified `tag_name` column of this database which pass /// the conditions specified by `predicate`. - pub async fn tag_values( + pub fn tag_values( &self, database: &D, tag_name: &str, @@ -351,8 +351,8 @@ impl InfluxRPCPlanner { let mut need_full_plans = BTreeMap::new(); let mut known_values = BTreeSet::new(); - for chunk in self.filtered_chunks(database, &predicate).await? { - let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?; + for chunk in self.filtered_chunks(database, &predicate)? { + let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?; for table_name in table_names { debug!( @@ -426,9 +426,7 @@ impl InfluxRPCPlanner { // time in `known_columns`, and some tables in chunks that we // need to run a plan to find what values pass the predicate. for (table_name, chunks) in need_full_plans.into_iter() { - let scan_and_filter = self - .scan_and_filter(&table_name, &predicate, chunks) - .await?; + let scan_and_filter = self.scan_and_filter(&table_name, &predicate, chunks)?; // if we have any data to scan, make a plan! if let Some(TableScanAndFilter { @@ -471,11 +469,7 @@ impl InfluxRPCPlanner { /// datatypes (as defined in the data written via `write_lines`), /// and which have more than zero rows which pass the conditions /// specified by `predicate`. - pub async fn field_columns( - &self, - database: &D, - predicate: Predicate, - ) -> Result + pub fn field_columns(&self, database: &D, predicate: Predicate) -> Result where D: Database + 'static, { @@ -488,15 +482,12 @@ impl InfluxRPCPlanner { // values and stops the plan executing once it has them // map table -> Vec> - let chunks = self.filtered_chunks(database, &predicate).await?; - let table_chunks = self.group_chunks_by_table(&predicate, chunks).await?; + let chunks = self.filtered_chunks(database, &predicate)?; + let table_chunks = self.group_chunks_by_table(&predicate, chunks)?; let mut field_list_plan = FieldListPlan::new(); for (table_name, chunks) in table_chunks { - if let Some(plan) = self - .field_columns_plan(&table_name, &predicate, chunks) - .await? - { + if let Some(plan) = self.field_columns_plan(&table_name, &predicate, chunks)? 
{ field_list_plan = field_list_plan.append(plan); } } @@ -523,7 +514,7 @@ impl InfluxRPCPlanner { /// rows for a particular series (groups where all tags are the /// same) occur together in the plan - pub async fn read_filter(&self, database: &D, predicate: Predicate) -> Result + pub fn read_filter(&self, database: &D, predicate: Predicate) -> Result where D: Database + 'static, { @@ -531,17 +522,15 @@ impl InfluxRPCPlanner { // group tables by chunk, pruning if possible // key is table name, values are chunks - let chunks = self.filtered_chunks(database, &predicate).await?; - let table_chunks = self.group_chunks_by_table(&predicate, chunks).await?; + let chunks = self.filtered_chunks(database, &predicate)?; + let table_chunks = self.group_chunks_by_table(&predicate, chunks)?; // now, build up plans for each table let mut ss_plans = Vec::with_capacity(table_chunks.len()); for (table_name, chunks) in table_chunks { let prefix_columns: Option<&[&str]> = None; - let ss_plan = self - .read_filter_plan(table_name, prefix_columns, &predicate, chunks) - .await?; + let ss_plan = self.read_filter_plan(table_name, prefix_columns, &predicate, chunks)?; // If we have to do real work, add it to the list of plans if let Some(ss_plan) = ss_plan { ss_plans.push(ss_plan); @@ -555,7 +544,7 @@ impl InfluxRPCPlanner { /// with rows grouped by an aggregate function. Note that we still /// group by all tags (so group within series) and the /// group_columns define the order of the result - pub async fn read_group( + pub fn read_group( &self, database: &D, predicate: Predicate, @@ -568,8 +557,8 @@ impl InfluxRPCPlanner { debug!(predicate=?predicate, agg=?agg, "planning read_group"); // group tables by chunk, pruning if possible - let chunks = self.filtered_chunks(database, &predicate).await?; - let table_chunks = self.group_chunks_by_table(&predicate, chunks).await?; + let chunks = self.filtered_chunks(database, &predicate)?; + let table_chunks = self.group_chunks_by_table(&predicate, chunks)?; let num_prefix_tag_group_columns = group_columns.len(); // now, build up plans for each table @@ -577,13 +566,9 @@ impl InfluxRPCPlanner { for (table_name, chunks) in table_chunks { let ss_plan = match agg { Aggregate::None => { - self.read_filter_plan(table_name, Some(group_columns), &predicate, chunks) - .await? - } - _ => { - self.read_group_plan(table_name, &predicate, agg, group_columns, chunks) - .await? + self.read_filter_plan(table_name, Some(group_columns), &predicate, chunks)? 
} + _ => self.read_group_plan(table_name, &predicate, agg, group_columns, chunks)?, }; // If we have to do real work, add it to the list of plans @@ -598,7 +583,7 @@ impl InfluxRPCPlanner { /// Creates a GroupedSeriesSet plan that produces an output table with rows /// that are grouped by window defintions - pub async fn read_window_aggregate( + pub fn read_window_aggregate( &self, database: &D, predicate: Predicate, @@ -612,15 +597,14 @@ impl InfluxRPCPlanner { debug!(predicate=?predicate, "planning read_window_aggregate"); // group tables by chunk, pruning if possible - let chunks = self.filtered_chunks(database, &predicate).await?; - let table_chunks = self.group_chunks_by_table(&predicate, chunks).await?; + let chunks = self.filtered_chunks(database, &predicate)?; + let table_chunks = self.group_chunks_by_table(&predicate, chunks)?; // now, build up plans for each table let mut ss_plans = Vec::with_capacity(table_chunks.len()); for (table_name, chunks) in table_chunks { let ss_plan = self - .read_window_aggregate_plan(table_name, &predicate, agg, &every, &offset, chunks) - .await?; + .read_window_aggregate_plan(table_name, &predicate, agg, &every, &offset, chunks)?; // If we have to do real work, add it to the list of plans if let Some(ss_plan) = ss_plan { ss_plans.push(ss_plan); @@ -631,7 +615,7 @@ impl InfluxRPCPlanner { } /// Creates a map of table_name --> Chunks that have that table - async fn group_chunks_by_table( + fn group_chunks_by_table( &self, predicate: &Predicate, chunks: Vec>, @@ -641,7 +625,7 @@ impl InfluxRPCPlanner { { let mut table_chunks = BTreeMap::new(); for chunk in chunks { - let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?; + let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?; for table_name in table_names { table_chunks .entry(table_name) @@ -653,11 +637,7 @@ impl InfluxRPCPlanner { } /// Find all the table names in the specified chunk that pass the predicate - async fn chunk_table_names( - &self, - chunk: &C, - predicate: &Predicate, - ) -> Result> + fn chunk_table_names(&self, chunk: &C, predicate: &Predicate) -> Result> where C: PartitionChunk + 'static, { @@ -705,7 +685,7 @@ impl InfluxRPCPlanner { /// Filter(predicate) /// TableScan (of chunks) /// ``` - async fn tag_keys_plan( + fn tag_keys_plan( &self, table_name: &str, predicate: &Predicate, @@ -714,7 +694,7 @@ impl InfluxRPCPlanner { where C: PartitionChunk + 'static, { - let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks).await?; + let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; let TableScanAndFilter { plan_builder, @@ -767,7 +747,7 @@ impl InfluxRPCPlanner { /// Filter(predicate) [optional] /// Scan /// ``` - async fn field_columns_plan( + fn field_columns_plan( &self, table_name: &str, predicate: &Predicate, @@ -776,7 +756,7 @@ impl InfluxRPCPlanner { where C: PartitionChunk + 'static, { - let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks).await?; + let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; let TableScanAndFilter { plan_builder, schema, @@ -817,7 +797,7 @@ impl InfluxRPCPlanner { /// Order by (tag_columns, timestamp_column) /// Filter(predicate) /// Scan - async fn read_filter_plan( + fn read_filter_plan( &self, table_name: impl Into, prefix_columns: Option<&[impl AsRef]>, @@ -828,7 +808,7 @@ impl InfluxRPCPlanner { C: PartitionChunk + 'static, { let table_name = table_name.into(); - let scan_and_filter = self.scan_and_filter(&table_name, 
predicate, chunks).await?; + let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?; let TableScanAndFilter { plan_builder, @@ -937,7 +917,7 @@ impl InfluxRPCPlanner { /// GroupBy(gby cols, aggs, time cols) /// Filter(predicate) /// Scan - pub async fn read_group_plan( + pub fn read_group_plan( &self, table_name: impl Into, predicate: &Predicate, @@ -949,7 +929,7 @@ impl InfluxRPCPlanner { C: PartitionChunk + 'static, { let table_name = table_name.into(); - let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks).await?; + let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?; let TableScanAndFilter { plan_builder, @@ -1027,7 +1007,7 @@ impl InfluxRPCPlanner { /// GroupBy(gby: tag columns, window_function; agg: aggregate(field) /// Filter(predicate) /// Scan - pub async fn read_window_aggregate_plan( + pub fn read_window_aggregate_plan( &self, table_name: impl Into, predicate: &Predicate, @@ -1040,7 +1020,7 @@ impl InfluxRPCPlanner { C: PartitionChunk + 'static, { let table_name = table_name.into(); - let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks).await?; + let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?; let TableScanAndFilter { plan_builder, @@ -1114,7 +1094,7 @@ impl InfluxRPCPlanner { /// Filter(predicate) [optional] /// Scan /// ``` - async fn scan_and_filter( + fn scan_and_filter( &self, table_name: &str, predicate: &Predicate, @@ -1190,7 +1170,7 @@ impl InfluxRPCPlanner { /// Returns a list of chunks across all partitions which may /// contain data that pass the predicate - async fn filtered_chunks( + fn filtered_chunks( &self, database: &D, predicate: &Predicate, diff --git a/server/src/query_tests/influxrpc/field_columns.rs b/server/src/query_tests/influxrpc/field_columns.rs index dc6f27df34..f0751375b6 100644 --- a/server/src/query_tests/influxrpc/field_columns.rs +++ b/server/src/query_tests/influxrpc/field_columns.rs @@ -35,7 +35,6 @@ macro_rules! run_field_columns_test_case { let plan = planner .field_columns(&db, predicate.clone()) - .await .expect("built plan successfully"); let fields = executor .to_field_list(plan) @@ -137,7 +136,6 @@ async fn test_field_name_plan() { let plan = planner .field_columns(&db, predicate.clone()) - .await .expect("built plan successfully"); let mut plans = plan.plans; diff --git a/server/src/query_tests/influxrpc/read_filter.rs b/server/src/query_tests/influxrpc/read_filter.rs index b3831d39a4..b11b10f3e8 100644 --- a/server/src/query_tests/influxrpc/read_filter.rs +++ b/server/src/query_tests/influxrpc/read_filter.rs @@ -50,7 +50,6 @@ macro_rules! run_read_filter_test_case { let plan = planner .read_filter(&db, predicate.clone()) - .await .expect("built plan successfully"); let string_results = run_series_set_plan(executor, plan).await; diff --git a/server/src/query_tests/influxrpc/read_group.rs b/server/src/query_tests/influxrpc/read_group.rs index a614180183..aac4f408ee 100644 --- a/server/src/query_tests/influxrpc/read_group.rs +++ b/server/src/query_tests/influxrpc/read_group.rs @@ -30,7 +30,6 @@ macro_rules! 
run_read_group_test_case { let plans = planner .read_group(&db, predicate.clone(), agg, &group_columns) - .await .expect("built plan successfully"); let plans = plans.into_inner(); diff --git a/server/src/query_tests/influxrpc/read_window_aggregate.rs b/server/src/query_tests/influxrpc/read_window_aggregate.rs index 8d3b728176..42e64d219d 100644 --- a/server/src/query_tests/influxrpc/read_window_aggregate.rs +++ b/server/src/query_tests/influxrpc/read_window_aggregate.rs @@ -34,7 +34,6 @@ macro_rules! run_read_window_aggregate_test_case { let plans = planner .read_window_aggregate(&db, predicate.clone(), agg, every.clone(), offset.clone()) - .await .expect("built plan successfully"); let plans = plans.into_inner(); diff --git a/server/src/query_tests/influxrpc/table_names.rs b/server/src/query_tests/influxrpc/table_names.rs index 7ef7d041ef..da106479f8 100644 --- a/server/src/query_tests/influxrpc/table_names.rs +++ b/server/src/query_tests/influxrpc/table_names.rs @@ -27,7 +27,6 @@ macro_rules! run_table_names_test_case { let plan = planner .table_names(&db, predicate.clone()) - .await .expect("built plan successfully"); let names = executor .to_string_set(plan) diff --git a/server/src/query_tests/influxrpc/tag_keys.rs b/server/src/query_tests/influxrpc/tag_keys.rs index 39c940bf87..30e93f3117 100644 --- a/server/src/query_tests/influxrpc/tag_keys.rs +++ b/server/src/query_tests/influxrpc/tag_keys.rs @@ -31,7 +31,6 @@ macro_rules! run_tag_keys_test_case { let plan = planner .tag_keys(&db, predicate.clone()) - .await .expect("built plan successfully"); let names = executor .to_string_set(plan) diff --git a/server/src/query_tests/influxrpc/tag_values.rs b/server/src/query_tests/influxrpc/tag_values.rs index 484dc41ab3..09a08ec163 100644 --- a/server/src/query_tests/influxrpc/tag_values.rs +++ b/server/src/query_tests/influxrpc/tag_values.rs @@ -29,7 +29,6 @@ macro_rules! 
run_tag_values_test_case { let plan = planner .tag_values(&db, &tag_name, predicate.clone()) - .await .expect("built plan successfully"); let names = executor .to_string_set(plan) @@ -239,7 +238,7 @@ async fn list_tag_values_field_col() { // Test: temp is a field, not a tag let tag_name = "temp"; - let plan_result = planner.tag_values(&db, &tag_name, predicate.clone()).await; + let plan_result = planner.tag_values(&db, &tag_name, predicate.clone()); assert_eq!( plan_result.unwrap_err().to_string(), diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs index 94ea311961..bf317869dd 100644 --- a/src/influxdb_ioxd/rpc/storage/service.rs +++ b/src/influxdb_ioxd/rpc/storage/service.rs @@ -714,7 +714,6 @@ where let plan = planner .table_names(db.as_ref(), predicate) - .await .map_err(|e| Box::new(e) as _) .context(ListingTables { db_name })?; let executor = db_store.executor(); @@ -765,7 +764,6 @@ where let tag_key_plan = planner .tag_keys(db.as_ref(), predicate) - .await .map_err(|e| Box::new(e) as _) .context(ListingColumns { db_name: db_name.as_str(), @@ -825,7 +823,6 @@ where let tag_value_plan = planner .tag_values(db.as_ref(), tag_name, predicate) - .await .map_err(|e| Box::new(e) as _) .context(ListingTagValues { db_name, tag_name })?; @@ -882,7 +879,6 @@ where let series_plan = planner .read_filter(db.as_ref(), predicate) - .await .map_err(|e| Box::new(e) as _) .context(PlanningFilteringSeries { db_name })?; @@ -968,14 +964,10 @@ where let grouped_series_set_plan = match gby_agg { GroupByAndAggregate::Columns { agg, group_columns } => { - planner - .read_group(db.as_ref(), predicate, agg, &group_columns) - .await + planner.read_group(db.as_ref(), predicate, agg, &group_columns) } GroupByAndAggregate::Window { agg, every, offset } => { - planner - .read_window_aggregate(db.as_ref(), predicate, agg, every, offset) - .await + planner.read_window_aggregate(db.as_ref(), predicate, agg, every, offset) } }; let grouped_series_set_plan = grouped_series_set_plan @@ -1039,7 +1031,6 @@ where let field_list_plan = planner .field_columns(db.as_ref(), predicate) - .await .map_err(|e| Box::new(e) as _) .context(ListingFields { db_name })?; From c9cbc7485759dfd2e630b1252e0f91c707595b62 Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Tue, 13 Apr 2021 15:10:39 +0200 Subject: [PATCH 7/7] feat: Use a DID newtype instead of u32 for dictionary ids Rationale --------- We use `u32` throughout the codebase to reference for interned dictionary strings. 
We also use `u32` for other reasons and it would be nice to get some help from the compiler to avoid mixing them up --- mutable_buffer/src/chunk.rs | 10 +++--- mutable_buffer/src/column.rs | 6 ++-- mutable_buffer/src/dictionary.rs | 46 ++++++++++++++++-------- mutable_buffer/src/table.rs | 36 +++++++++---------- server/src/query_tests/sql.rs | 2 +- tests/end_to_end_cases/management_api.rs | 6 ++-- tests/end_to_end_cases/management_cli.rs | 2 +- 7 files changed, 63 insertions(+), 45 deletions(-) diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index b2711cb7f1..d7cc928123 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -15,7 +15,7 @@ use tracker::{MemRegistry, MemTracker}; use crate::chunk::snapshot::ChunkSnapshot; use crate::{ - dictionary::{Dictionary, Error as DictionaryError}, + dictionary::{Dictionary, Error as DictionaryError, DID}, table::Table, }; use parking_lot::Mutex; @@ -37,11 +37,11 @@ pub enum Error { }, #[snafu(display("Table {} not found in chunk {}", table, chunk))] - TableNotFoundInChunk { table: u32, chunk: u64 }, + TableNotFoundInChunk { table: DID, chunk: u64 }, #[snafu(display("Column ID {} not found in dictionary of chunk {}", column_id, chunk))] ColumnIdNotFoundInDictionary { - column_id: u32, + column_id: DID, chunk: u64, source: DictionaryError, }, @@ -65,14 +65,14 @@ pub struct Chunk { /// The id for this chunk id: u32, - /// `dictionary` maps &str -> u32. The u32s are used in place of String or + /// `dictionary` maps &str -> DID. The DIDs are used in place of String or /// str to avoid slow string operations. The same dictionary is used for /// table names, tag names, tag values, and column names. // TODO: intern string field values too? dictionary: Dictionary, /// map of the dictionary ID for the table name to the table - tables: HashMap, + tables: HashMap, /// keep track of memory used by chunk tracker: MemTracker, diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs index dfe9617d2b..df6f088235 100644 --- a/mutable_buffer/src/column.rs +++ b/mutable_buffer/src/column.rs @@ -1,6 +1,6 @@ use snafu::Snafu; -use crate::dictionary::Dictionary; +use crate::dictionary::{Dictionary, DID}; use data_types::partition_metadata::StatValues; use generated_types::entry::LogicalColumnType; use internal_types::entry::TypedValuesIterator; @@ -36,7 +36,7 @@ pub enum Column { U64(Vec>, StatValues), String(Vec>, StatValues), Bool(Vec>, StatValues), - Tag(Vec>, StatValues), + Tag(Vec>, StatValues), } impl Column { @@ -356,7 +356,7 @@ impl Column { mem::size_of::>() * v.len() + mem::size_of_val(&stats) } Self::Tag(v, stats) => { - mem::size_of::>() * v.len() + mem::size_of_val(&stats) + mem::size_of::>() * v.len() + mem::size_of_val(&stats) } Self::String(v, stats) => { let string_bytes_size = v diff --git a/mutable_buffer/src/dictionary.rs b/mutable_buffer/src/dictionary.rs index bd5f68fab6..b0920d5c5d 100644 --- a/mutable_buffer/src/dictionary.rs +++ b/mutable_buffer/src/dictionary.rs @@ -8,7 +8,7 @@ use string_interner::{ #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Dictionary lookup error on id {}", id))] - DictionaryIdLookupError { id: u32 }, + DictionaryIdLookupError { id: DID }, #[snafu(display("Dictionary lookup error for value {}", value))] DictionaryValueLookupError { value: String }, @@ -16,6 +16,30 @@ pub enum Error { pub type Result = std::result::Result; +/// A "dictionary ID" (DID) is a compact numeric representation of an interned +/// string in the dictionary. 
The same string always maps to the same DID. DIDs can +/// be compared, hashed and cheaply copied around, just like small integers. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct DID(DefaultSymbol); + +impl DID { + fn new(s: DefaultSymbol) -> Self { + Self(s) + } +} + +impl From<DID> for DefaultSymbol { + fn from(id: DID) -> Self { + id.0 + } +} + +impl std::fmt::Display for DID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.to_usize()) + } +} + #[derive(Debug, Clone)] pub struct Dictionary { interner: StringInterner, DefaultHashBuilder>, @@ -39,43 +63,37 @@ impl Dictionary { /// Returns the id corresponding to value, adding an entry for the /// id if it is not yet present in the dictionary. - pub fn lookup_value_or_insert(&mut self, value: &str) -> u32 { + pub fn lookup_value_or_insert(&mut self, value: &str) -> DID { self.id(value).unwrap_or_else(|| { self.size += value.len(); self.size += std::mem::size_of::<u32>(); - symbol_to_u32(self.interner.get_or_intern(value)) + DID::new(self.interner.get_or_intern(value)) }) } /// Returns the ID in self.dictionary that corresponds to `value`, if any. /// Returns an error if no such value is found. Does not add the value /// to the dictionary. - pub fn lookup_value(&self, value: &str) -> Result<u32> { + pub fn lookup_value(&self, value: &str) -> Result<DID> { self.id(value).context(DictionaryValueLookupError { value }) } /// Returns the ID in self.dictionary that corresponds to `value`, /// if any. No error is returned to avoid an allocation when no value is /// present - pub fn id(&self, value: &str) -> Option<u32> { - self.interner.get(value).map(symbol_to_u32) + pub fn id(&self, value: &str) -> Option<DID> { + self.interner.get(value).map(DID::new) } /// Returns the str in self.dictionary that corresponds to `id`, /// if any.
Returns an error if no such id is found - pub fn lookup_id(&self, id: u32) -> Result<&str> { - let symbol = - Symbol::try_from_usize(id as usize).expect("to be able to convert u32 to symbol"); + pub fn lookup_id(&self, id: DID) -> Result<&str> { self.interner - .resolve(symbol) + .resolve(id.into()) .context(DictionaryIdLookupError { id }) } } -fn symbol_to_u32(sym: DefaultSymbol) -> u32 { - sym.to_usize() as u32 -} - #[cfg(test)] mod test { use crate::dictionary::Dictionary; diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index 687b5dcffa..3ca97766b5 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -3,7 +3,7 @@ use std::{cmp, collections::BTreeMap, sync::Arc}; use crate::{ column, column::Column, - dictionary::{Dictionary, Error as DictionaryError}, + dictionary::{Dictionary, Error as DictionaryError, DID}, }; use data_types::{ database_rules::WriterId, @@ -31,7 +31,7 @@ use arrow_deps::{ #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Tag value ID {} not found in dictionary of chunk", value))] - TagValueIdNotFoundInDictionary { value: u32, source: DictionaryError }, + TagValueIdNotFoundInDictionary { value: DID, source: DictionaryError }, #[snafu(display("Column error on column {}: {}", column, source))] ColumnError { @@ -46,7 +46,7 @@ pub enum Error { actual_column_type ))] InternalColumnTypeMismatch { - column_id: u32, + column_id: DID, expected_column_type: String, actual_column_type: String, }, @@ -59,7 +59,7 @@ pub enum Error { #[snafu(display("Internal: Column id '{}' not found in dictionary", column_id,))] ColumnIdNotFoundInDictionary { - column_id: u32, + column_id: DID, source: DictionaryError, }, @@ -76,22 +76,22 @@ pub enum Error { column_name, column_id ))] - InternalNoColumnInIndex { column_name: String, column_id: u32 }, + InternalNoColumnInIndex { column_name: String, column_id: DID }, #[snafu(display("Error creating column from wal for column {}: {}", column, source))] CreatingFromWal { - column: u32, + column: DID, source: crate::column::Error, }, #[snafu(display("Error evaluating column predicate for column {}: {}", column, source))] ColumnPredicateEvaluation { - column: u32, + column: DID, source: crate::column::Error, }, #[snafu(display("Row insert to table {} missing column name", table))] - ColumnNameNotInRow { table: u32 }, + ColumnNameNotInRow { table: DID }, #[snafu(display( "Group column '{}' not found in tag columns: {}", @@ -107,21 +107,21 @@ pub enum Error { DuplicateGroupColumn { column_name: String }, #[snafu(display("Column {} not found in table {}", id, table_id))] - ColumnIdNotFound { id: u32, table_id: u32 }, + ColumnIdNotFound { id: DID, table_id: DID }, } pub type Result = std::result::Result; #[derive(Debug, Clone)] pub struct Table { - /// Name of the table as a u32 in the chunk dictionary - pub id: u32, + /// Name of the table as a DID in the chunk dictionary + pub id: DID, /// Map of column id from the chunk dictionary to the column - pub columns: BTreeMap, + pub columns: BTreeMap, } impl Table { - pub fn new(id: u32) -> Self { + pub fn new(id: DID) -> Self { Self { id, columns: BTreeMap::new(), @@ -144,7 +144,7 @@ impl Table { } /// Returns a reference to the specified column - pub(crate) fn column(&self, column_id: u32) -> Result<&Column> { + pub(crate) fn column(&self, column_id: DID) -> Result<&Column> { self.columns.get(&column_id).context(ColumnIdNotFound { id: column_id, table_id: self.id, @@ -468,7 +468,7 @@ impl Table { struct ColSelection<'a> { column_name: &'a str, - 
column_id: u32, + column_id: DID, } /// Represets a set of column_name, column_index pairs @@ -502,15 +502,15 @@ mod tests { ]; write_lines_to_table(&mut table, &mut dictionary, lp_lines.clone()); - assert_eq!(128, table.size()); + assert_eq!(112, table.size()); // doesn't double because of the stats overhead write_lines_to_table(&mut table, &mut dictionary, lp_lines.clone()); - assert_eq!(224, table.size()); + assert_eq!(192, table.size()); // now make sure it increased by the same amount minus stats overhead write_lines_to_table(&mut table, &mut dictionary, lp_lines); - assert_eq!(320, table.size()); + assert_eq!(272, table.size()); } #[test] diff --git a/server/src/query_tests/sql.rs b/server/src/query_tests/sql.rs index dfe3dc8f5e..eaad832418 100644 --- a/server/src/query_tests/sql.rs +++ b/server/src/query_tests/sql.rs @@ -278,7 +278,7 @@ async fn sql_select_from_system_tables() { "+----+---------------+-------------------+-----------------+", "| id | partition_key | storage | estimated_bytes |", "+----+---------------+-------------------+-----------------+", - "| 0 | 1970-01-01T00 | OpenMutableBuffer | 493 |", + "| 0 | 1970-01-01T00 | OpenMutableBuffer | 453 |", "+----+---------------+-------------------+-----------------+", ]; run_sql_test_case!( diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 0ea2b48b14..6a9b699342 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -277,7 +277,7 @@ async fn test_chunk_get() { partition_key: "cpu".into(), id: 0, storage: ChunkStorage::OpenMutableBuffer as i32, - estimated_bytes: 145, + estimated_bytes: 137, time_of_first_write: None, time_of_last_write: None, time_closing: None, @@ -286,7 +286,7 @@ async fn test_chunk_get() { partition_key: "disk".into(), id: 0, storage: ChunkStorage::OpenMutableBuffer as i32, - estimated_bytes: 107, + estimated_bytes: 103, time_of_first_write: None, time_of_last_write: None, time_closing: None, @@ -452,7 +452,7 @@ async fn test_list_partition_chunks() { partition_key: "cpu".into(), id: 0, storage: ChunkStorage::OpenMutableBuffer as i32, - estimated_bytes: 145, + estimated_bytes: 137, time_of_first_write: None, time_of_last_write: None, time_closing: None, diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index 973bf98bc2..765ddb0132 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -191,7 +191,7 @@ async fn test_get_chunks() { .and(predicate::str::contains( r#""storage": "OpenMutableBuffer","#, )) - .and(predicate::str::contains(r#""estimated_bytes": 145"#)) + .and(predicate::str::contains(r#""estimated_bytes": 137"#)) // Check for a non empty timestamp such as // "time_of_first_write": "2021-03-30T17:11:10.723866Z", .and(predicate::str::contains(r#""time_of_first_write": "20"#));
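
As an aside on the final patch above: the value of the `DID` newtype is easiest to see in a tiny, self-contained example. The sketch below is illustrative only and makes simplifying assumptions — `Dict` and `Did` are hypothetical stand-ins backed by a plain `HashMap` instead of the `string_interner`-based `Dictionary` in `mutable_buffer/src/dictionary.rs` — but it shows the same pattern: interned strings are handed out as a dedicated id type, so the compiler rejects accidental mixing with ordinary integers.

use std::collections::HashMap;

// Simplified stand-in for the `DID` newtype: an opaque, Copy-able handle that
// can be compared and hashed, but cannot be confused with an arbitrary u32.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Did(u32);

// Minimal interner: each distinct string is stored once and gets exactly one Did.
#[derive(Default)]
struct Dict {
    by_value: HashMap<String, Did>,
    by_id: Vec<String>,
}

impl Dict {
    // Return the id for `value`, interning it on first use.
    fn lookup_value_or_insert(&mut self, value: &str) -> Did {
        if let Some(id) = self.by_value.get(value) {
            return *id;
        }
        let id = Did(self.by_id.len() as u32);
        self.by_id.push(value.to_string());
        self.by_value.insert(value.to_string(), id);
        id
    }

    // Resolve an id back to the interned string, if it exists.
    fn lookup_id(&self, id: Did) -> Option<&str> {
        self.by_id.get(id.0 as usize).map(String::as_str)
    }
}

fn main() {
    let mut dict = Dict::default();
    let table = dict.lookup_value_or_insert("h2o");
    let tag = dict.lookup_value_or_insert("state");

    // Interning is idempotent: the same string always yields the same id.
    assert_eq!(table, dict.lookup_value_or_insert("h2o"));
    assert_ne!(table, tag);
    assert_eq!(dict.lookup_id(table), Some("h2o"));

    // Unlike a bare u32, a Did cannot silently flow into unrelated integer
    // parameters: `let n: u32 = table;` would fail to compile.
    println!("table = {:?}, tag = {:?}", table, tag);
}

The last comment is the point of the patch: once dictionary ids are a distinct type, passing a table id where a tag value id (or any other plain integer) is expected becomes a compile-time error rather than a silent logic bug.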