Merge branch 'main' into ntran/query_local_parquet

pull/24376/head
kodiakhq[bot] 2021-04-13 22:38:56 +00:00 committed by GitHub
commit 8e0ee48018
29 changed files with 331 additions and 1404 deletions

Cargo.lock generated

@ -111,7 +111,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "arrow"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=e69478a890b1e4eee49b540b69b2711d170a0433#e69478a890b1e4eee49b540b69b2711d170a0433"
source = "git+https://github.com/apache/arrow.git?rev=00a443629c00079ea03c0b9f415d74669d2759a7#00a443629c00079ea03c0b9f415d74669d2759a7"
dependencies = [
"cfg_aliases",
"chrono",
@ -134,7 +134,7 @@ dependencies = [
[[package]]
name = "arrow-flight"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=e69478a890b1e4eee49b540b69b2711d170a0433#e69478a890b1e4eee49b540b69b2711d170a0433"
source = "git+https://github.com/apache/arrow.git?rev=00a443629c00079ea03c0b9f415d74669d2759a7#00a443629c00079ea03c0b9f415d74669d2759a7"
dependencies = [
"arrow",
"bytes",
@ -429,9 +429,9 @@ dependencies = [
[[package]]
name = "cast"
version = "0.2.3"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0"
checksum = "cc38c385bfd7e444464011bb24820f40dd1c76bcdfa1b78611cb7c2e5cafab75"
dependencies = [
"rustc_version",
]
@ -488,9 +488,9 @@ dependencies = [
[[package]]
name = "clang-sys"
version = "1.1.1"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f54d78e30b388d4815220c8dd03fea5656b6c6d32adb59e89061552a102f8da1"
checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c"
dependencies = [
"glob",
"libc",
@ -662,9 +662,9 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
version = "0.5.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
@ -787,7 +787,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=e69478a890b1e4eee49b540b69b2711d170a0433#e69478a890b1e4eee49b540b69b2711d170a0433"
source = "git+https://github.com/apache/arrow.git?rev=00a443629c00079ea03c0b9f415d74669d2759a7#00a443629c00079ea03c0b9f415d74669d2759a7"
dependencies = [
"ahash 0.7.2",
"arrow",
@ -1044,9 +1044,9 @@ checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
[[package]]
name = "futures"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f55667319111d593ba876406af7c409c0ebb44dc4be6132a783ccf163ea14c1"
checksum = "a9d5813545e459ad3ca1bff9915e9ad7f1a47dc6a91b627ce321d5863b7dd253"
dependencies = [
"futures-channel",
"futures-core",
@ -1059,9 +1059,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c2dd2df839b57db9ab69c2c9d8f3e8c81984781937fe2807dc6dcf3b2ad2939"
checksum = "ce79c6a52a299137a6013061e0cf0e688fce5d7f1bc60125f520912fdb29ec25"
dependencies = [
"futures-core",
"futures-sink",
@ -1069,15 +1069,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94"
checksum = "098cd1c6dda6ca01650f1a37a794245eb73181d0d4d4e955e2f3c37db7af1815"
[[package]]
name = "futures-executor"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891a4b7b96d84d5940084b2a37632dd65deeae662c114ceaa2c879629c9c0ad1"
checksum = "10f6cb7042eda00f0049b1d2080aa4b93442997ee507eb3828e8bd7577f94c9d"
dependencies = [
"futures-core",
"futures-task",
@ -1086,15 +1086,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59"
checksum = "365a1a1fb30ea1c03a830fdb2158f5236833ac81fa0ad12fe35b29cddc35cb04"
[[package]]
name = "futures-macro"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea405816a5139fb39af82c2beb921d52143f556038378d6db21183a5c37fbfb7"
checksum = "668c6733a182cd7deb4f1de7ba3bf2120823835b3bcfbeacf7d2c4a773c1bb8b"
dependencies = [
"proc-macro-hack",
"proc-macro2",
@ -1104,21 +1104,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85754d98985841b7d4f5e8e6fbfa4a4ac847916893ec511a2917ccd8525b8bb3"
checksum = "5c5629433c555de3d82861a7a4e3794a4c40040390907cfbfd7143a92a426c23"
[[package]]
name = "futures-task"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa189ef211c15ee602667a6fcfe1c1fd9e07d42250d2156382820fba33c9df80"
checksum = "ba7aa51095076f3ba6d9a1f702f74bd05ec65f555d70d2033d55ba8d69f581bc"
[[package]]
name = "futures-test"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1fe5e51002528907757d5f1648101086f7197f792112db43ba23b06b09e6bce"
checksum = "e77baeade98824bc928c21b8ad39918b9d8a06745ebdb6e2c93fb7673fb7968d"
dependencies = [
"futures-core",
"futures-executor",
@ -1132,9 +1132,9 @@ dependencies = [
[[package]]
name = "futures-util"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1812c7ab8aedf8d6f2701a43e1243acdbcc2b36ab26e2ad421eb99ac963d96d1"
checksum = "3c144ad54d60f23927f0a6b6d816e4271278b64f005ad65e4e35291d2de9c025"
dependencies = [
"futures-channel",
"futures-core",
@ -2299,7 +2299,7 @@ dependencies = [
[[package]]
name = "parquet"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=e69478a890b1e4eee49b540b69b2711d170a0433#e69478a890b1e4eee49b540b69b2711d170a0433"
source = "git+https://github.com/apache/arrow.git?rev=00a443629c00079ea03c0b9f415d74669d2759a7#00a443629c00079ea03c0b9f415d74669d2759a7"
dependencies = [
"arrow",
"base64 0.12.3",
@ -2882,9 +2882,9 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.11.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf12057f289428dbf5c591c74bf10392e4a8003f993405a902f20117019022d4"
checksum = "2296f2fac53979e8ccbc4a1136b25dcefd37be9ed7e4a1f6b05a6029c84ff124"
dependencies = [
"base64 0.13.0",
"bytes",
@ -3119,9 +3119,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.6.0"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3042af939fca8c3453b7af0f1c66e533a15a86169e39de2657310ade8f98d3c"
checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
dependencies = [
"ring",
"untrusted",
@ -3759,9 +3759,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.4.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722"
checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5"
dependencies = [
"autocfg",
"bytes",
@ -3822,9 +3822,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.6.5"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5143d049e85af7fbc36f5454d990e62c2df705b3589f123b71f441b6b59f443f"
checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e"
dependencies = [
"bytes",
"futures-core",


@ -8,14 +8,14 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx
[dependencies] # In alphabetical order
# We are using a development version of arrow/parquet/datafusion and the dependencies are at the same rev
# The version can be found here: https://github.com/apache/arrow/commit/e69478a890b1e4eee49b540b69b2711d170a0433
# The version can be found here: https://github.com/apache/arrow/commit/00a443629c00079ea03c0b9f415d74669d2759a7
#
arrow = { git = "https://github.com/apache/arrow.git", rev = "e69478a890b1e4eee49b540b69b2711d170a0433" , features = ["simd"] }
arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "e69478a890b1e4eee49b540b69b2711d170a0433" }
arrow = { git = "https://github.com/apache/arrow.git", rev = "00a443629c00079ea03c0b9f415d74669d2759a7" , features = ["simd"] }
arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "00a443629c00079ea03c0b9f415d74669d2759a7" }
# Turn off optional datafusion features (function packages)
datafusion = { git = "https://github.com/apache/arrow.git", rev = "e69478a890b1e4eee49b540b69b2711d170a0433", default-features = false }
datafusion = { git = "https://github.com/apache/arrow.git", rev = "00a443629c00079ea03c0b9f415d74669d2759a7", default-features = false }
# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
# and we're not currently using it anyway
parquet = { git = "https://github.com/apache/arrow.git", rev = "e69478a890b1e4eee49b540b69b2711d170a0433", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
parquet = { git = "https://github.com/apache/arrow.git", rev = "00a443629c00079ea03c0b9f415d74669d2759a7", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }


@ -6,7 +6,7 @@ As discussed on https://github.com/influxdata/influxdb_iox/pull/221 and https://
1. Use only async I/O via `tokio` for socket communication. It is ok to use either blocking (e.g. `std::fs::File`) or async APIs (e.g. `tokio::fs::File`) for local File I/O.
2. All CPU bound tasks should be scheduled on the separate application level `thread_pool` not with `tokio::task::spawn` nor `tokio::task::spawn_blocking` nor a new threadpool.
2. All CPU bound tasks should be scheduled on the separate application level `thread_pool` (which can be another tokio executor but should be separate from the executor that handles I/O).
We will work, over time, to migrate the rest of the codebase to use these patterns.
@ -41,11 +41,11 @@ It is ok to use either blocking (e.g. `std::fs::File`) or async APIs for local
This cannot always be done (e.g. with a library such as the parquet writer, which is not `async`). In such cases, `tokio::task::spawn_blocking` should be used to perform the file I/O.
### All CPU heavy work should be done on the single app level worker pool, separate from the tokio runtime
### All CPU heavy work should be done on the single app level worker pool, separate from the tokio runtime handling IO
**What**: All CPU heavy work should be done on the single app level worker pool. We provide a `thread_pool` interface that interacts nicely with async tasks (e.g. that allows an async task to `await` for a CPU heavy task to complete).
**What**: All CPU heavy work should be done on the app level worker pool. We provide a `thread_pool` interface that interacts nicely with async tasks (e.g. that allows an async task to `await` for a CPU heavy task to complete).
**Rationale**: A single app level worker pool gives us a single place to control work priority, eventually, so that tasks such as compaction of large data files can have lower precedence than incoming queries. By using a different pool than the tokio runtime, with a limited number of threads, we avoid over-saturating the CPU with OS threads and thereby starving the limited number of tokio I/O threads. A separate, single app level pool also limits the number of underlying OS CPU threads which are spawned, even under heavy load, keeping thread context switching overhead low.
**Rationale**: A single app level worker pool gives us a single place to control work priority, eventually, so that tasks such as compaction of large data files can have lower precedence than incoming queries. By using a different pool than the main tokio runtime, with a limited number of threads, we avoid over-saturating the CPU with OS threads and thereby starving the limited number of tokio I/O threads. A separate, single app level pool also limits the number of underlying OS CPU threads which are spawned, even under heavy load, keeping thread context switching overhead low.
There will, of course, always be a judgment call to be made about where "CPU bound work" starts and "work acceptable for I/O processing" ends. A reasonable rule of thumb: if a job will *always* complete in less than 100ms, it is probably fine for an I/O thread. This number may be revised as we tune the system.
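To make the guideline concrete, here is a minimal sketch of the pattern being described: an async task hands a CPU-heavy computation to a separate worker and `await`s the result without blocking the tokio I/O runtime. The `spawn_cpu_heavy` helper and the bare OS thread are illustrative stand-ins, not the actual IOx `thread_pool` interface (which, per the change above, may itself be a second tokio executor).

```rust
use tokio::sync::oneshot;

/// Hypothetical stand-in for the app-level worker pool described above: it
/// runs a CPU-heavy closure on a dedicated OS thread (a real system would use
/// a bounded pool) and lets async callers await the result.
fn spawn_cpu_heavy<F, T>(job: F) -> oneshot::Receiver<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        // The expensive, blocking computation runs off the tokio I/O runtime.
        let result = job();
        // Ignore the error: the receiver may have given up waiting.
        let _ = tx.send(result);
    });
    rx
}

#[tokio::main]
async fn main() {
    // The async task stays responsive while the CPU-bound sum runs elsewhere.
    let pending = spawn_cpu_heavy(|| (0u64..10_000_000).sum::<u64>());
    let sum = pending.await.expect("worker thread dropped the sender");
    println!("sum = {sum}");
}
```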


@ -4,7 +4,12 @@ package influxdata.iox.write.v1;
service WriteService {
// write data into a specific Database
rpc Write(WriteRequest) returns (WriteResponse);
rpc Write(WriteRequest) returns (WriteResponse) {
option deprecated = true;
};
// write an entry into a Database
rpc WriteEntry(WriteEntryRequest) returns (WriteEntryResponse);
}
message WriteRequest {
@ -21,3 +26,17 @@ message WriteResponse {
// how many lines were parsed and written into the database
uint64 lines_written = 1;
}
message WriteEntryRequest {
// name of database into which to write
string db_name = 1;
// entry, in serialized flatbuffers [Entry] format
//
// [Entry](https://github.com/influxdata/influxdb_iox/blob/main/generated_types/protos/influxdata/iox/write/v1/entry.fbs)
bytes entry = 2;
}
message WriteEntryResponse {
}
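For orientation, here is a hedged sketch of what a client call to the new `WriteEntry` RPC might look like. It assumes tonic-style generated bindings (a `WriteServiceClient` with a `write_entry` method) under an illustrative module name `write_v1`; only the request fields `db_name` and `entry` come from the proto above, and producing the serialized flatbuffers `Entry` bytes is out of scope here.

```rust
// Hypothetical client sketch. `write_v1` stands for tonic-generated bindings
// for influxdata.iox.write.v1; the exact module path in the repository is an
// assumption made for illustration.
use write_v1::{write_service_client::WriteServiceClient, WriteEntryRequest};

async fn write_entry(
    addr: String,
    db_name: String,
    entry_bytes: Vec<u8>, // serialized flatbuffers Entry, produced elsewhere
) -> Result<(), Box<dyn std::error::Error>> {
    let mut client = WriteServiceClient::connect(addr).await?;
    let request = WriteEntryRequest {
        db_name,
        entry: entry_bytes,
    };
    // WriteEntryResponse carries no fields, so a successful call is the only
    // signal that the entry was accepted.
    client.write_entry(request).await?;
    Ok(())
}
```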


@ -5,20 +5,17 @@ use std::sync::Arc;
use snafu::{OptionExt, ResultExt, Snafu};
use arrow_deps::{arrow::record_batch::RecordBatch, datafusion::logical_plan::Expr};
use arrow_deps::arrow::record_batch::RecordBatch;
use data_types::{database_rules::WriterId, partition_metadata::TableSummary};
use internal_types::{
entry::{ClockValue, TableBatch},
schema::Schema,
selection::Selection,
};
use tracker::{MemRegistry, MemTracker};
use crate::chunk::snapshot::ChunkSnapshot;
use crate::{
column::Column,
dictionary::{Dictionary, Error as DictionaryError},
pred::{ChunkPredicate, ChunkPredicateBuilder},
dictionary::{Dictionary, Error as DictionaryError, DID},
table::Table,
};
use parking_lot::Mutex;
@ -39,57 +36,12 @@ pub enum Error {
source: crate::table::Error,
},
#[snafu(display("Error checking predicate in table {}: {}", table_id, source))]
PredicateCheck {
table_id: u32,
source: crate::table::Error,
},
#[snafu(display("Error checking predicate in table '{}': {}", table_name, source))]
NamedTablePredicateCheck {
table_name: String,
source: crate::table::Error,
},
#[snafu(display(
"Unsupported predicate when mutable buffer table names. Found a general expression: {:?}",
exprs
))]
PredicateNotYetSupported { exprs: Vec<Expr> },
#[snafu(display("Table ID {} not found in dictionary of chunk {}", table_id, chunk))]
TableIdNotFoundInDictionary {
table_id: u32,
chunk: u64,
source: DictionaryError,
},
#[snafu(display(
"Internal error: table {} not found in dictionary of chunk {}",
table_name,
chunk_id
))]
InternalTableNotFoundInDictionary { table_name: String, chunk_id: u32 },
#[snafu(display("Table {} not found in chunk {}", table, chunk))]
TableNotFoundInChunk { table: u32, chunk: u64 },
#[snafu(display("Table '{}' not found in chunk {}", table_name, chunk_id))]
NamedTableNotFoundInChunk { table_name: String, chunk_id: u64 },
#[snafu(display("Attempt to write table batch without a name"))]
TableWriteWithoutName,
#[snafu(display("Value ID {} not found in dictionary of chunk {}", value_id, chunk_id))]
InternalColumnValueIdNotFoundInDictionary {
value_id: u32,
chunk_id: u64,
source: DictionaryError,
},
TableNotFoundInChunk { table: DID, chunk: u64 },
#[snafu(display("Column ID {} not found in dictionary of chunk {}", column_id, chunk))]
ColumnIdNotFoundInDictionary {
column_id: u32,
column_id: DID,
chunk: u64,
source: DictionaryError,
},
@ -104,12 +56,6 @@ pub enum Error {
chunk_id: u64,
source: DictionaryError,
},
#[snafu(display(
"Column '{}' is not a string tag column and thus can not list values",
column_name
))]
UnsupportedColumnTypeForListingValues { column_name: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -117,16 +63,16 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct Chunk {
/// The id for this chunk
pub id: u32,
id: u32,
/// `dictionary` maps &str -> u32. The u32s are used in place of String or
/// `dictionary` maps &str -> DID. The DIDs are used in place of String or
/// str to avoid slow string operations. The same dictionary is used for
/// table names, tag names, tag values, and column names.
// TODO: intern string field values too?
pub dictionary: Dictionary,
dictionary: Dictionary,
/// map of the dictionary ID for the table name to the table
pub tables: HashMap<u32, Table>,
tables: HashMap<DID, Table>,
/// keep track of memory used by chunk
tracker: MemTracker,
@ -206,214 +152,6 @@ impl Chunk {
snapshot
}
/// Return all the names of the tables in this chunk that match
/// chunk predicate
pub fn table_names(&self, chunk_predicate: &ChunkPredicate) -> Result<Vec<&str>> {
// we don't support arbitrary expressions in chunk predicate yet
if !chunk_predicate.chunk_exprs.is_empty() {
return PredicateNotYetSupported {
exprs: chunk_predicate.chunk_exprs.clone(),
}
.fail();
}
self.tables
.iter()
.filter_map(|(&table_id, table)| {
// could match is good enough for this metadata query
match table.could_match_predicate(chunk_predicate) {
Ok(true) => Some(self.dictionary.lookup_id(table_id).context(
TableIdNotFoundInDictionary {
table_id,
chunk: self.id,
},
)),
Ok(false) => None,
Err(e) => Some(Err(e).context(PredicateCheck { table_id })),
}
})
.collect()
}
/// If the column names that match the predicate can be found
/// from the predicate entirely using metadata, return those
/// strings.
///
/// If the predicate cannot be evaluated entirely with
/// metadata, return `Ok(None)`.
pub fn column_names(
&self,
table_name: &str,
chunk_predicate: &ChunkPredicate,
selection: Selection<'_>,
) -> Result<Option<BTreeSet<String>>> {
// No support for general purpose expressions
if !chunk_predicate.chunk_exprs.is_empty() {
return Ok(None);
}
let table_name_id = self.table_name_id(table_name)?;
let mut chunk_column_ids = BTreeSet::new();
// Is this table in the chunk?
if let Some(table) = self.tables.get(&table_name_id) {
for (&column_id, column) in &table.columns {
let column_matches_predicate = table
.column_matches_predicate(&column, chunk_predicate)
.context(NamedTableError { table_name })?;
if column_matches_predicate {
chunk_column_ids.insert(column_id);
}
}
}
// Only return subset of these selection_cols if not all_cols
let mut all_cols = true;
let selection_cols = match selection {
Selection::All => &[""],
Selection::Some(cols) => {
all_cols = false;
cols
}
};
let mut column_names = BTreeSet::new();
for &column_id in &chunk_column_ids {
let column_name =
self.dictionary
.lookup_id(column_id)
.context(ColumnIdNotFoundInDictionary {
column_id,
chunk: self.id,
})?;
if !column_names.contains(column_name)
&& (all_cols || selection_cols.contains(&column_name))
{
// only use columns in selection_cols
column_names.insert(column_name.to_string());
}
}
Ok(Some(column_names))
}
/// Return the id of the table in the chunk's dictionary
fn table_name_id(&self, table_name: &str) -> Result<u32> {
self.dictionary
.id(table_name)
.context(InternalTableNotFoundInDictionary {
table_name,
chunk_id: self.id(),
})
}
/// Returns the strings of the specified Tag column that satisfy
/// the predicate, if they can be determined entirely using metadata.
///
/// If the predicate cannot be evaluated entirely with metadata,
/// return `Ok(None)`.
pub fn tag_column_values(
&self,
table_name: &str,
column_name: &str,
chunk_predicate: &ChunkPredicate,
) -> Result<Option<BTreeSet<String>>> {
// No support for general purpose expressions
if !chunk_predicate.chunk_exprs.is_empty() {
return Ok(None);
}
let chunk_id = self.id();
let table_name_id = self.table_name_id(table_name)?;
// Is this table even in the chunk?
let table = self
.tables
.get(&table_name_id)
.context(NamedTableNotFoundInChunk {
table_name,
chunk_id,
})?;
// See if we can rule out the table entirely based on metadata
let could_match = table
.could_match_predicate(chunk_predicate)
.context(NamedTablePredicateCheck { table_name })?;
if !could_match {
// No columns could match, return empty set
return Ok(Default::default());
}
let column_id =
self.dictionary
.lookup_value(column_name)
.context(ColumnNameNotFoundInDictionary {
column_name,
chunk_id,
})?;
let column = table
.column(column_id)
.context(NamedTableError { table_name })?;
if let Column::Tag(column, _) = column {
// if we have a timestamp predicate, find all values
// where the timestamp is within range. Otherwise take
// all values.
// Collect matching ids into BTreeSet to deduplicate on
// ids *before* looking up Strings
let column_value_ids: BTreeSet<u32> = match chunk_predicate.range {
None => {
// take all non-null values
column.iter().filter_map(|&s| s).collect()
}
Some(range) => {
// filter out all values that don't match the timestamp
let time_column = table
.column_i64(chunk_predicate.time_column_id)
.context(NamedTableError { table_name })?;
column
.iter()
.zip(time_column.iter())
.filter_map(|(&column_value_id, &timestamp_value)| {
if range.contains_opt(timestamp_value) {
column_value_id
} else {
None
}
})
.collect()
}
};
// convert all the (deduplicated) ids to Strings
let column_values = column_value_ids
.into_iter()
.map(|value_id| {
let value = self.dictionary.lookup_id(value_id).context(
InternalColumnValueIdNotFoundInDictionary { value_id, chunk_id },
)?;
Ok(value.to_string())
})
.collect::<Result<BTreeSet<String>>>()?;
Ok(Some(column_values))
} else {
UnsupportedColumnTypeForListingValues { column_name }.fail()
}
}
/// Return a builder suitable to create predicates for this Chunk
pub fn predicate_builder(&self) -> Result<ChunkPredicateBuilder<'_>, crate::pred::Error> {
ChunkPredicateBuilder::new(&self.dictionary)
}
/// returns true if there is no data in this chunk
pub fn is_empty(&self) -> bool {
self.tables.is_empty()
@ -435,7 +173,7 @@ impl Chunk {
if let Some(table) = self.table(table_name)? {
dst.push(
table
.to_arrow(&self, selection)
.to_arrow(&self.dictionary, selection)
.context(NamedTableError { table_name })?,
);
}
@ -454,7 +192,7 @@ impl Chunk {
TableSummary {
name: name.to_string(),
columns: table.stats(&self),
columns: table.stats(&self.dictionary),
}
})
.collect()
@ -474,21 +212,6 @@ impl Chunk {
Ok(table)
}
/// Return Schema for the specified table / columns
pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
let table = self
.table(table_name)?
// Option --> Result
.context(NamedTableNotFoundInChunk {
table_name,
chunk_id: self.id(),
})?;
table
.schema(self, selection)
.context(NamedTableError { table_name })
}
/// Return the approximate memory size of the chunk, in bytes including the
/// dictionary, tables, and their rows.
pub fn size(&self) -> usize {


@ -55,8 +55,8 @@ impl ChunkSnapshot {
pub fn new(chunk: &Chunk) -> Self {
let mut records: HashMap<String, TableSnapshot> = Default::default();
for (id, table) in &chunk.tables {
let schema = table.schema(chunk, Selection::All).unwrap();
let batch = table.to_arrow(chunk, Selection::All).unwrap();
let schema = table.schema(&chunk.dictionary, Selection::All).unwrap();
let batch = table.to_arrow(&chunk.dictionary, Selection::All).unwrap();
let name = chunk.dictionary.lookup_id(*id).unwrap();
let timestamp_range = chunk


@ -1,7 +1,6 @@
use snafu::Snafu;
use crate::dictionary::Dictionary;
use arrow_deps::arrow::datatypes::DataType as ArrowDataType;
use crate::dictionary::{Dictionary, DID};
use data_types::partition_metadata::StatValues;
use generated_types::entry::LogicalColumnType;
use internal_types::entry::TypedValuesIterator;
@ -37,7 +36,7 @@ pub enum Column {
U64(Vec<Option<u64>>, StatValues<u64>),
String(Vec<Option<String>>, StatValues<String>),
Bool(Vec<Option<bool>>, StatValues<bool>),
Tag(Vec<Option<u32>>, StatValues<String>),
Tag(Vec<Option<DID>>, StatValues<String>),
}
impl Column {
@ -320,10 +319,6 @@ impl Column {
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn type_description(&self) -> &'static str {
match self {
Self::F64(_, _) => "f64",
@ -335,56 +330,6 @@ impl Column {
}
}
/// Return the arrow DataType for this column
pub fn data_type(&self) -> ArrowDataType {
match self {
Self::F64(..) => ArrowDataType::Float64,
Self::I64(..) => ArrowDataType::Int64,
Self::U64(..) => ArrowDataType::UInt64,
Self::String(..) => ArrowDataType::Utf8,
Self::Bool(..) => ArrowDataType::Boolean,
Self::Tag(..) => ArrowDataType::Utf8,
}
}
// push_none_if_len_equal will add a None value to the end of the Vec of values
// if the length is equal to the passed in value. This is used to ensure
// columns are all the same length.
pub fn push_none_if_len_equal(&mut self, len: usize) {
match self {
Self::F64(v, _) => {
if v.len() == len {
v.push(None);
}
}
Self::I64(v, _) => {
if v.len() == len {
v.push(None);
}
}
Self::U64(v, _) => {
if v.len() == len {
v.push(None);
}
}
Self::String(v, _) => {
if v.len() == len {
v.push(None);
}
}
Self::Bool(v, _) => {
if v.len() == len {
v.push(None);
}
}
Self::Tag(v, _) => {
if v.len() == len {
v.push(None);
}
}
}
}
pub fn get_i64_stats(&self) -> Option<StatValues<i64>> {
match self {
Self::I64(_, values) => Some(values.clone()),
@ -392,50 +337,6 @@ impl Column {
}
}
/// Returns true if any rows are within the range [start,
/// end). Inclusive of `start`, exclusive of `end`
pub fn has_i64_range(&self, start: i64, end: i64) -> Result<bool> {
match self {
Self::I64(_, stats) => {
if stats.max < start || stats.min >= end {
Ok(false)
} else {
Ok(true)
}
}
_ => InternalTypeMismatchForTimePredicate {}.fail(),
}
}
/// Return true of this column's type is a Tag
pub fn is_tag(&self) -> bool {
matches!(self, Self::Tag(..))
}
/// Returns true if there exists at least one row index i where
/// self[i] is within the range [start, end). Inclusive of `start`,
/// exclusive of `end`, and where column[i] is non-null
pub fn has_non_null_i64_range<T>(
&self,
column: &[Option<T>],
start: i64,
end: i64,
) -> Result<bool> {
match self {
Self::I64(v, _) => {
for (index, val) in v.iter().enumerate() {
if let Some(val) = val {
if start <= *val && *val < end && column[index].is_some() {
return Ok(true);
}
}
}
Ok(false)
}
_ => InternalTypeMismatchForTimePredicate {}.fail(),
}
}
/// The approximate memory size of the data in the column. Note that
/// the space taken for the tag string values is represented in
/// the dictionary size in the chunk that holds the table that has this
@ -455,7 +356,7 @@ impl Column {
mem::size_of::<Option<bool>>() * v.len() + mem::size_of_val(&stats)
}
Self::Tag(v, stats) => {
mem::size_of::<Option<u32>>() * v.len() + mem::size_of_val(&stats)
mem::size_of::<Option<DID>>() * v.len() + mem::size_of_val(&stats)
}
Self::String(v, stats) => {
let string_bytes_size = v
@ -467,89 +368,3 @@ impl Column {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_has_i64_range() {
let mut stats = StatValues::new(1);
stats.update(2);
let col = Column::I64(vec![Some(1), None, Some(2)], stats.clone());
assert!(!col.has_i64_range(-1, 0).unwrap());
assert!(!col.has_i64_range(0, 1).unwrap());
assert!(col.has_i64_range(1, 2).unwrap());
assert!(col.has_i64_range(2, 3).unwrap());
assert!(!col.has_i64_range(3, 4).unwrap());
let col = Column::I64(vec![Some(2), None, Some(1)], stats);
assert!(!col.has_i64_range(-1, 0).unwrap());
assert!(!col.has_i64_range(0, 1).unwrap());
assert!(col.has_i64_range(1, 2).unwrap());
assert!(col.has_i64_range(2, 3).unwrap());
assert!(!col.has_i64_range(3, 4).unwrap());
}
#[test]
fn test_has_i64_range_does_not_panic() {
// providing the wrong column type should get an internal error, not a panic
let col = Column::F64(vec![Some(1.2)], StatValues::new(1.2));
let res = col.has_i64_range(-1, 0);
assert!(res.is_err());
let res_string = format!("{:?}", res);
let expected = "InternalTypeMismatchForTimePredicate";
assert!(
res_string.contains(expected),
"Did not find expected text '{}' in '{}'",
expected,
res_string
);
}
#[test]
fn test_has_non_null_i64_range_() {
let none_col: Vec<Option<u32>> = vec![None, None, None];
let some_col: Vec<Option<u32>> = vec![Some(0), Some(0), Some(0)];
let mut stats = StatValues::new(1);
stats.update(2);
let col = Column::I64(vec![Some(1), None, Some(2)], stats);
assert!(!col.has_non_null_i64_range(&some_col, -1, 0).unwrap());
assert!(!col.has_non_null_i64_range(&some_col, 0, 1).unwrap());
assert!(col.has_non_null_i64_range(&some_col, 1, 2).unwrap());
assert!(col.has_non_null_i64_range(&some_col, 2, 3).unwrap());
assert!(!col.has_non_null_i64_range(&some_col, 3, 4).unwrap());
assert!(!col.has_non_null_i64_range(&none_col, -1, 0).unwrap());
assert!(!col.has_non_null_i64_range(&none_col, 0, 1).unwrap());
assert!(!col.has_non_null_i64_range(&none_col, 1, 2).unwrap());
assert!(!col.has_non_null_i64_range(&none_col, 2, 3).unwrap());
assert!(!col.has_non_null_i64_range(&none_col, 3, 4).unwrap());
}
#[test]
fn column_size() {
let i64col = Column::I64(vec![Some(1), Some(1)], StatValues::new(1));
assert_eq!(40, i64col.size());
let f64col = Column::F64(vec![Some(1.1), Some(1.1), Some(1.1)], StatValues::new(1.1));
assert_eq!(56, f64col.size());
let boolcol = Column::Bool(vec![Some(true)], StatValues::new(true));
assert_eq!(9, boolcol.size());
let tagcol = Column::Tag(
vec![Some(1), Some(1), Some(1), Some(1)],
StatValues::new("foo".to_string()),
);
assert_eq!(40, tagcol.size());
let stringcol = Column::String(
vec![Some("foo".to_string()), Some("hello world".to_string())],
StatValues::new("foo".to_string()),
);
assert_eq!(70, stringcol.size());
}
}


@ -8,7 +8,7 @@ use string_interner::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Dictionary lookup error on id {}", id))]
DictionaryIdLookupError { id: u32 },
DictionaryIdLookupError { id: DID },
#[snafu(display("Dictionary lookup error for value {}", value))]
DictionaryValueLookupError { value: String },
@ -16,6 +16,30 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A "dictionary ID" (DID) is a compact numeric representation of an interned
/// string in the dictionary. The same string always maps to the same DID. DIDs can
/// be compared, hashed and cheaply copied around, just like small integers.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct DID(DefaultSymbol);
impl DID {
fn new(s: DefaultSymbol) -> Self {
Self(s)
}
}
impl From<DID> for DefaultSymbol {
fn from(id: DID) -> Self {
id.0
}
}
impl std::fmt::Display for DID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_usize())
}
}
#[derive(Debug, Clone)]
pub struct Dictionary {
interner: StringInterner<DefaultSymbol, StringBackend<DefaultSymbol>, DefaultHashBuilder>,
@ -39,43 +63,37 @@ impl Dictionary {
/// Returns the id corresponding to value, adding an entry for the
/// id if it is not yet present in the dictionary.
pub fn lookup_value_or_insert(&mut self, value: &str) -> u32 {
pub fn lookup_value_or_insert(&mut self, value: &str) -> DID {
self.id(value).unwrap_or_else(|| {
self.size += value.len();
self.size += std::mem::size_of::<u32>();
symbol_to_u32(self.interner.get_or_intern(value))
DID::new(self.interner.get_or_intern(value))
})
}
/// Returns the ID in self.dictionary that corresponds to `value`, if any.
/// Returns an error if no such value is found. Does not add the value
/// to the dictionary.
pub fn lookup_value(&self, value: &str) -> Result<u32> {
pub fn lookup_value(&self, value: &str) -> Result<DID> {
self.id(value).context(DictionaryValueLookupError { value })
}
/// Returns the ID in self.dictionary that corresponds to `value`,
/// if any. No error is returned to avoid an allocation when no value is
/// present
pub fn id(&self, value: &str) -> Option<u32> {
self.interner.get(value).map(symbol_to_u32)
pub fn id(&self, value: &str) -> Option<DID> {
self.interner.get(value).map(DID::new)
}
/// Returns the str in self.dictionary that corresponds to `id`,
/// if any. Returns an error if no such id is found
pub fn lookup_id(&self, id: u32) -> Result<&str> {
let symbol =
Symbol::try_from_usize(id as usize).expect("to be able to convert u32 to symbol");
pub fn lookup_id(&self, id: DID) -> Result<&str> {
self.interner
.resolve(symbol)
.resolve(id.into())
.context(DictionaryIdLookupError { id })
}
}
fn symbol_to_u32(sym: DefaultSymbol) -> u32 {
sym.to_usize() as u32
}
#[cfg(test)]
mod test {
use crate::dictionary::Dictionary;
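To show how the new `DID`-based dictionary API fits together, here is a minimal usage sketch written in the style of the crate's own tests. It uses only what this diff shows (`Dictionary::new`, `lookup_value_or_insert`, `id`, `lookup_id`); it is an illustration, not a test taken from the repository, and since the module is private it would have to live inside the mutable buffer crate itself.

```rust
#[cfg(test)]
mod did_usage_sketch {
    use crate::dictionary::Dictionary;

    #[test]
    fn interning_round_trips() {
        let mut dictionary = Dictionary::new();

        // Interning the same string twice yields the same DID.
        let city = dictionary.lookup_value_or_insert("city");
        assert_eq!(city, dictionary.lookup_value_or_insert("city"));

        // `id` returns None for strings that were never interned, so
        // metadata-only lookups can avoid allocating an error.
        assert!(dictionary.id("not_interned").is_none());

        // A DID resolves back to the original &str.
        assert_eq!("city", dictionary.lookup_id(city).unwrap());
    }
}
```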


@ -60,5 +60,4 @@
pub mod chunk;
mod column;
mod dictionary;
pub mod pred;
mod table;


@ -1,298 +0,0 @@
use std::collections::{BTreeSet, HashSet};
use crate::dictionary::{Dictionary, Error as DictionaryError};
use arrow_deps::{
datafusion::{
error::{DataFusionError, Result as DatafusionResult},
logical_plan::{Expr, ExpressionVisitor, Operator, Recursion},
optimizer::utils::expr_to_column_names,
},
util::{make_range_expr, AndExprBuilder},
};
use data_types::timestamp::TimestampRange;
use internal_types::schema::TIME_COLUMN_NAME;
//use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{ensure, ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error writing table '{}': {}", table_name, source))]
TableWrite {
table_name: String,
source: crate::table::Error,
},
#[snafu(display("Time Column was not not found in dictionary: {}", source))]
TimeColumnNotFound { source: DictionaryError },
#[snafu(display("Unsupported predicate. Mutable buffer does not support: {}", source))]
UnsupportedPredicate { source: DataFusionError },
#[snafu(display(
"Internal error visiting expressions in ChunkPredicateBuilder: {}",
source
))]
InternalVisitingExpressions { source: DataFusionError },
#[snafu(display("table_names has already been specified in ChunkPredicateBuilder"))]
TableNamesAlreadySet {},
#[snafu(display("field_names has already been specified in ChunkPredicateBuilder"))]
FieldNamesAlreadySet {},
#[snafu(display("range has already been specified in ChunkPredicateBuilder"))]
RangeAlreadySet {},
#[snafu(display("exprs has already been specified in ChunkPredicateBuilder"))]
ExprsAlreadySet {},
#[snafu(display("required_columns has already been specified in ChunkPredicateBuilder"))]
RequiredColumnsAlreadySet {},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Describes the result of translating a set of strings into
/// chunk specific ids
#[derive(Debug, PartialEq, Eq)]
pub enum ChunkIdSet {
/// At least one of the strings was not present in the chunks'
/// dictionary.
///
/// This is important when testing for the presence of all ids in
/// a set, as we know they can not all be present
AtLeastOneMissing,
/// All strings existed in this chunk's dictionary
Present(BTreeSet<u32>),
}
/// a 'Compiled' set of predicates / filters that can be evaluated on
/// this chunk (where strings have been translated to chunk
/// specific u32 ids)
#[derive(Debug, Default)]
pub struct ChunkPredicate {
/// If present, restrict the request to just those tables whose
/// names are in table_names. If present but empty, means there
/// was a predicate but no tables named that way exist in the
/// chunk (so no table can pass)
pub table_name_predicate: Option<BTreeSet<u32>>,
/// Optional column restriction. If present, further
/// restrict any field columns returned to only those named, and
/// skip tables entirely when querying metadata that do not have
/// *any* of the fields
pub field_name_predicate: Option<BTreeSet<u32>>,
/// General DataFusion expressions (arbitrary predicates) applied
/// as a filter using logical conjuction (aka are 'AND'ed
/// together). Only rows that evaluate to TRUE for all these
/// expressions should be returned.
///
/// TODO these exprs should eventually be removed (when they are
/// all handled one layer up in the query layer)
pub chunk_exprs: Vec<Expr>,
/// If Some, then the table must contain all columns specified
/// to pass the predicate
pub required_columns: Option<ChunkIdSet>,
/// The id of the "time" column in this chunk
pub time_column_id: u32,
/// Timestamp range: only rows within this range should be considered
pub range: Option<TimestampRange>,
}
impl ChunkPredicate {
/// Creates and adds a DataFusion predicate representing the
/// combination of predicate and timestamp.
pub fn filter_expr(&self) -> Option<Expr> {
// build up a list of expressions
let mut builder =
AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr());
for expr in &self.chunk_exprs {
builder = builder.append_expr(expr.clone());
}
builder.build()
}
/// For plans which select a subset of fields, returns true if
/// the field should be included in the results
pub fn should_include_field(&self, field_id: u32) -> bool {
match &self.field_name_predicate {
None => true,
Some(field_restriction) => field_restriction.contains(&field_id),
}
}
/// Return true if this column is the time column
pub fn is_time_column(&self, id: u32) -> bool {
self.time_column_id == id
}
/// Creates a DataFusion predicate for applying a timestamp range:
///
/// `range.start <= time and time < range.end`
fn make_timestamp_predicate_expr(&self) -> Option<Expr> {
self.range
.map(|range| make_range_expr(range.start, range.end, TIME_COLUMN_NAME))
}
}
/// Builds ChunkPredicates
#[derive(Debug)]
pub struct ChunkPredicateBuilder<'a> {
inner: ChunkPredicate,
dictionary: &'a Dictionary,
}
impl<'a> ChunkPredicateBuilder<'a> {
pub fn new(dictionary: &'a Dictionary) -> Result<Self> {
let time_column_id = dictionary
.lookup_value(TIME_COLUMN_NAME)
.context(TimeColumnNotFound)?;
let inner = ChunkPredicate {
time_column_id,
..Default::default()
};
Ok(Self { inner, dictionary })
}
/// Set table_name_predicate so only tables in `names` are returned
pub fn table_names(mut self, names: Option<&BTreeSet<String>>) -> Result<Self> {
ensure!(
self.inner.table_name_predicate.is_none(),
TableNamesAlreadySet
);
self.inner.table_name_predicate = self.compile_string_list(names);
Ok(self)
}
/// Set field_name_predicate so only tables in `names` are returned
pub fn field_names(mut self, names: Option<&BTreeSet<String>>) -> Result<Self> {
ensure!(
self.inner.field_name_predicate.is_none(),
FieldNamesAlreadySet
);
self.inner.field_name_predicate = self.compile_string_list(names);
Ok(self)
}
pub fn range(mut self, range: Option<TimestampRange>) -> Result<Self> {
ensure!(self.inner.range.is_none(), RangeAlreadySet);
self.inner.range = range;
Ok(self)
}
/// Set the general purpose predicates
pub fn exprs(mut self, chunk_exprs: Vec<Expr>) -> Result<Self> {
// In order to evaluate expressions in the table, all columns
// referenced in the expression must appear (I think, not sure
// about NOT, etc so panic if we see one of those);
let mut visitor = SupportVisitor {};
let mut predicate_columns: HashSet<String> = HashSet::new();
for expr in &chunk_exprs {
visitor = expr.accept(visitor).context(UnsupportedPredicate)?;
expr_to_column_names(&expr, &mut predicate_columns)
.context(InternalVisitingExpressions)?;
}
ensure!(self.inner.chunk_exprs.is_empty(), ExprsAlreadySet);
self.inner.chunk_exprs = chunk_exprs;
// if there are any column references in the expression, ensure they appear in
// any table
if !predicate_columns.is_empty() {
ensure!(
self.inner.required_columns.is_none(),
RequiredColumnsAlreadySet
);
self.inner.required_columns = Some(self.make_chunk_ids(predicate_columns.iter()));
}
Ok(self)
}
/// Return the created chunk predicate, consuming self
pub fn build(self) -> ChunkPredicate {
self.inner
}
/// Converts a Set of strings into a set of ids in terms of this
/// Chunk's dictionary.
///
/// If there are no matching Strings in the chunks dictionary,
/// those strings are ignored and a (potentially empty) set is
/// returned.
fn compile_string_list(&self, names: Option<&BTreeSet<String>>) -> Option<BTreeSet<u32>> {
names.map(|names| {
names
.iter()
.filter_map(|name| self.dictionary.id(name))
.collect::<BTreeSet<_>>()
})
}
/// Translate a bunch of strings into a set of ids from this
/// chunk's dictionary
pub fn make_chunk_ids<'b, I>(&self, predicate_columns: I) -> ChunkIdSet
where
I: Iterator<Item = &'b String>,
{
let mut symbols = BTreeSet::new();
for column_name in predicate_columns {
if let Some(column_id) = self.dictionary.id(column_name) {
symbols.insert(column_id);
} else {
return ChunkIdSet::AtLeastOneMissing;
}
}
ChunkIdSet::Present(symbols)
}
}
/// Used to figure out if we know how to deal with this kind of
/// predicate in the write buffer
struct SupportVisitor {}
impl ExpressionVisitor for SupportVisitor {
fn pre_visit(self, expr: &Expr) -> DatafusionResult<Recursion<Self>> {
match expr {
Expr::Literal(..) => Ok(Recursion::Continue(self)),
Expr::Column(..) => Ok(Recursion::Continue(self)),
Expr::BinaryExpr { op, .. } => {
match op {
Operator::Eq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq
| Operator::Plus
| Operator::Minus
| Operator::Multiply
| Operator::Divide
| Operator::And
| Operator::Or => Ok(Recursion::Continue(self)),
// Unsupported (need to think about ramifications)
Operator::NotEq | Operator::Modulus | Operator::Like | Operator::NotLike => {
Err(DataFusionError::NotImplemented(format!(
"Operator {:?} not yet supported in IOx MutableBuffer",
op
)))
}
}
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported expression in mutable_buffer database: {:?}",
expr
))),
}
}
}

View File

@ -1,15 +1,9 @@
use std::{
cmp,
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use std::{cmp, collections::BTreeMap, sync::Arc};
use crate::{
chunk::Chunk,
column,
column::Column,
dictionary::{Dictionary, Error as DictionaryError},
pred::{ChunkIdSet, ChunkPredicate},
dictionary::{Dictionary, Error as DictionaryError, DID},
};
use data_types::{
database_rules::WriterId,
@ -36,12 +30,8 @@ use arrow_deps::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Tag value ID {} not found in dictionary of chunk {}", value, chunk))]
TagValueIdNotFoundInDictionary {
value: u32,
chunk: u64,
source: DictionaryError,
},
#[snafu(display("Tag value ID {} not found in dictionary of chunk", value))]
TagValueIdNotFoundInDictionary { value: DID, source: DictionaryError },
#[snafu(display("Column error on column {}: {}", column, source))]
ColumnError {
@ -56,7 +46,7 @@ pub enum Error {
actual_column_type
))]
InternalColumnTypeMismatch {
column_id: u32,
column_id: DID,
expected_column_type: String,
actual_column_type: String,
},
@ -64,21 +54,12 @@ pub enum Error {
#[snafu(display("Internal error: unexpected aggregate request for None aggregate",))]
InternalUnexpectedNoneAggregate {},
#[snafu(display(
"Column name '{}' not found in dictionary of chunk {}",
column_name,
chunk
))]
ColumnNameNotFoundInDictionary { column_name: String, chunk: u64 },
#[snafu(display("Column name '{}' not found in dictionary of chunk", column_name,))]
ColumnNameNotFoundInDictionary { column_name: String },
#[snafu(display(
"Internal: Column id '{}' not found in dictionary of chunk {}",
column_id,
chunk
))]
#[snafu(display("Internal: Column id '{}' not found in dictionary", column_id,))]
ColumnIdNotFoundInDictionary {
column_id: u32,
chunk: u64,
column_id: DID,
source: DictionaryError,
},
@ -95,22 +76,22 @@ pub enum Error {
column_name,
column_id
))]
InternalNoColumnInIndex { column_name: String, column_id: u32 },
InternalNoColumnInIndex { column_name: String, column_id: DID },
#[snafu(display("Error creating column from wal for column {}: {}", column, source))]
CreatingFromWal {
column: u32,
column: DID,
source: crate::column::Error,
},
#[snafu(display("Error evaluating column predicate for column {}: {}", column, source))]
ColumnPredicateEvaluation {
column: u32,
column: DID,
source: crate::column::Error,
},
#[snafu(display("Row insert to table {} missing column name", table))]
ColumnNameNotInRow { table: u32 },
ColumnNameNotInRow { table: DID },
#[snafu(display(
"Group column '{}' not found in tag columns: {}",
@ -126,21 +107,21 @@ pub enum Error {
DuplicateGroupColumn { column_name: String },
#[snafu(display("Column {} not found in table {}", id, table_id))]
ColumnIdNotFound { id: u32, table_id: u32 },
ColumnIdNotFound { id: DID, table_id: DID },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
pub struct Table {
/// Name of the table as a u32 in the chunk dictionary
pub id: u32,
/// Name of the table as a DID in the chunk dictionary
pub id: DID,
/// Map of column id from the chunk dictionary to the column
pub columns: BTreeMap<u32, Column>,
pub columns: BTreeMap<DID, Column>,
}
impl Table {
pub fn new(id: u32) -> Self {
pub fn new(id: DID) -> Self {
Self {
id,
columns: BTreeMap::new(),
@ -163,28 +144,13 @@ impl Table {
}
/// Returns a reference to the specified column
pub(crate) fn column(&self, column_id: u32) -> Result<&Column> {
pub(crate) fn column(&self, column_id: DID) -> Result<&Column> {
self.columns.get(&column_id).context(ColumnIdNotFound {
id: column_id,
table_id: self.id,
})
}
/// Returns a reference to the specified column as a slice of
/// i64s. Errors if the type is not i64
pub fn column_i64(&self, column_id: u32) -> Result<&[Option<i64>]> {
let column = self.column(column_id)?;
match column {
Column::I64(vals, _) => Ok(vals),
_ => InternalColumnTypeMismatch {
column_id,
expected_column_type: "i64",
actual_column_type: column.type_description(),
}
.fail(),
}
}
/// Validates the schema of the passed in columns, then adds their values to
/// the associated columns in the table and updates summary statistics.
pub fn write_columns(
@ -282,17 +248,20 @@ impl Table {
/// Returns the column selection for all the columns in this table, ordered
/// by table name
fn all_columns_selection<'a>(&self, chunk: &'a Chunk) -> Result<TableColSelection<'a>> {
fn all_columns_selection<'a>(
&self,
dictionary: &'a Dictionary,
) -> Result<TableColSelection<'a>> {
let cols = self
.columns
.iter()
.map(|(column_id, _)| {
let column_name = chunk.dictionary.lookup_id(*column_id).context(
ColumnIdNotFoundInDictionary {
column_id: *column_id,
chunk: chunk.id,
},
)?;
let column_name =
dictionary
.lookup_id(*column_id)
.context(ColumnIdNotFoundInDictionary {
column_id: *column_id,
})?;
Ok(ColSelection {
column_name,
column_id: *column_id,
@ -309,45 +278,45 @@ impl Table {
/// Returns a column selection for just the specified columns
fn specific_columns_selection<'a>(
&self,
chunk: &'a Chunk,
dictionary: &'a Dictionary,
columns: &'a [&'a str],
) -> Result<TableColSelection<'a>> {
let cols =
columns
.iter()
.map(|&column_name| {
let column_id = chunk.dictionary.id(column_name).context(
ColumnNameNotFoundInDictionary {
column_name,
chunk: chunk.id,
},
)?;
let cols = columns
.iter()
.map(|&column_name| {
let column_id = dictionary
.id(column_name)
.context(ColumnNameNotFoundInDictionary { column_name })?;
Ok(ColSelection {
column_name,
column_id,
})
Ok(ColSelection {
column_name,
column_id,
})
.collect::<Result<_>>()?;
})
.collect::<Result<_>>()?;
Ok(TableColSelection { cols })
}
/// Converts this table to an arrow record batch.
pub fn to_arrow(&self, chunk: &Chunk, selection: Selection<'_>) -> Result<RecordBatch> {
pub fn to_arrow(
&self,
dictionary: &Dictionary,
selection: Selection<'_>,
) -> Result<RecordBatch> {
// translate chunk selection into name/indexes:
let selection = match selection {
Selection::All => self.all_columns_selection(chunk),
Selection::Some(cols) => self.specific_columns_selection(chunk, cols),
Selection::All => self.all_columns_selection(dictionary),
Selection::Some(cols) => self.specific_columns_selection(dictionary, cols),
}?;
self.to_arrow_impl(chunk, &selection)
self.to_arrow_impl(dictionary, &selection)
}
pub fn schema(&self, chunk: &Chunk, selection: Selection<'_>) -> Result<Schema> {
pub fn schema(&self, dictionary: &Dictionary, selection: Selection<'_>) -> Result<Schema> {
// translate chunk selection into name/indexes:
let selection = match selection {
Selection::All => self.all_columns_selection(chunk),
Selection::Some(cols) => self.specific_columns_selection(chunk, cols),
Selection::All => self.all_columns_selection(dictionary),
Selection::Some(cols) => self.specific_columns_selection(dictionary, cols),
}?;
self.schema_impl(&selection)
}
@ -384,7 +353,7 @@ impl Table {
/// requested columns with index are tuples of column_name, column_index
fn to_arrow_impl(
&self,
chunk: &Chunk,
dictionary: &Dictionary,
selection: &TableColSelection<'_>,
) -> Result<RecordBatch> {
let mut columns = Vec::with_capacity(selection.cols.len());
@ -413,12 +382,9 @@ impl Table {
match v {
None => builder.append_null(),
Some(value_id) => {
let tag_value = chunk.dictionary.lookup_id(*value_id).context(
TagValueIdNotFoundInDictionary {
value: *value_id,
chunk: chunk.id,
},
)?;
let tag_value = dictionary
.lookup_id(*value_id)
.context(TagValueIdNotFoundInDictionary { value: *value_id })?;
builder.append_value(tag_value)
}
}
@ -473,124 +439,11 @@ impl Table {
RecordBatch::try_new(schema, columns).context(ArrowError {})
}
/// returns true if any row in this table could possibly match the
/// predicate. true does not mean any rows will *actually* match,
/// just that the entire table can not be ruled out.
///
/// false means that no rows in this table could possibly match
pub fn could_match_predicate(&self, chunk_predicate: &ChunkPredicate) -> Result<bool> {
Ok(
self.matches_column_name_predicate(chunk_predicate.field_name_predicate.as_ref())
&& self.matches_table_name_predicate(chunk_predicate.table_name_predicate.as_ref())
&& self.matches_timestamp_predicate(chunk_predicate)?
&& self.has_columns(chunk_predicate.required_columns.as_ref()),
)
}
/// Returns true if the table contains any of the field columns
/// requested or there are no specific fields requested.
fn matches_column_name_predicate(&self, column_selection: Option<&BTreeSet<u32>>) -> bool {
match column_selection {
Some(column_selection) => {
for column_id in column_selection {
if let Some(column) = self.columns.get(column_id) {
if !column.is_tag() {
return true;
}
}
}
// selection only had tag columns
false
}
None => true, // no specific selection
}
}
fn matches_table_name_predicate(&self, table_name_predicate: Option<&BTreeSet<u32>>) -> bool {
match table_name_predicate {
Some(table_name_predicate) => table_name_predicate.contains(&self.id),
None => true, // no table predicate
}
}
/// returns true if there are any timestamps in this table that
/// fall within the timestamp range
fn matches_timestamp_predicate(&self, chunk_predicate: &ChunkPredicate) -> Result<bool> {
match &chunk_predicate.range {
None => Ok(true),
Some(range) => {
let time_column_id = chunk_predicate.time_column_id;
let time_column = self.column(time_column_id)?;
time_column.has_i64_range(range.start, range.end).context(
ColumnPredicateEvaluation {
column: time_column_id,
},
)
}
}
}
/// returns true if no columns are specified, or the table has all
/// columns specified
fn has_columns(&self, columns: Option<&ChunkIdSet>) -> bool {
if let Some(columns) = columns {
match columns {
ChunkIdSet::AtLeastOneMissing => return false,
ChunkIdSet::Present(symbols) => {
for symbol in symbols {
if !self.columns.contains_key(symbol) {
return false;
}
}
}
}
}
true
}
/// returns true if there are any rows in column that are non-null
/// and within the timestamp range specified by pred
pub(crate) fn column_matches_predicate(
&self,
column: &Column,
chunk_predicate: &ChunkPredicate,
) -> Result<bool> {
match column {
Column::F64(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::I64(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::U64(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::String(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::Bool(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::Tag(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
}
}
fn column_value_matches_predicate<T>(
&self,
column_value: &[Option<T>],
chunk_predicate: &ChunkPredicate,
) -> Result<bool> {
match chunk_predicate.range {
None => Ok(true),
Some(range) => {
let time_column_id = chunk_predicate.time_column_id;
let time_column = self.column(time_column_id)?;
time_column
.has_non_null_i64_range(column_value, range.start, range.end)
.context(ColumnPredicateEvaluation {
column: time_column_id,
})
}
}
}
pub fn stats(&self, chunk: &Chunk) -> Vec<ColumnSummary> {
pub fn stats(&self, dictionary: &Dictionary) -> Vec<ColumnSummary> {
self.columns
.iter()
.map(|(column_id, c)| {
let column_name = chunk
.dictionary
let column_name = dictionary
.lookup_id(*column_id)
.expect("column name in dictionary");
@ -615,7 +468,7 @@ impl Table {
struct ColSelection<'a> {
column_name: &'a str,
column_id: u32,
column_id: DID,
}
/// Represents a set of column_name, column_index pairs
@ -637,56 +490,10 @@ mod tests {
use internal_types::entry::test_helpers::lp_to_entry;
use super::*;
use tracker::MemRegistry;
#[test]
fn test_has_columns() {
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
"h2o,state=MA,city=Boston temp=72.4 250",
];
write_lines_to_table(&mut table, dictionary, lp_lines);
let state_symbol = dictionary.id("state").unwrap();
let new_symbol = dictionary.lookup_value_or_insert("not_a_columns");
assert!(table.has_columns(None));
let pred = ChunkIdSet::AtLeastOneMissing;
assert!(!table.has_columns(Some(&pred)));
let set = BTreeSet::<u32>::new();
let pred = ChunkIdSet::Present(set);
assert!(table.has_columns(Some(&pred)));
let mut set = BTreeSet::new();
set.insert(state_symbol);
let pred = ChunkIdSet::Present(set);
assert!(table.has_columns(Some(&pred)));
let mut set = BTreeSet::new();
set.insert(new_symbol);
let pred = ChunkIdSet::Present(set);
assert!(!table.has_columns(Some(&pred)));
let mut set = BTreeSet::new();
set.insert(state_symbol);
set.insert(new_symbol);
let pred = ChunkIdSet::Present(set);
assert!(!table.has_columns(Some(&pred)));
}
#[test]
fn table_size() {
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut dictionary = Dictionary::new();
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
@ -694,111 +501,31 @@ mod tests {
"h2o,state=MA,city=Boston temp=72.4 250",
];
write_lines_to_table(&mut table, dictionary, lp_lines.clone());
assert_eq!(128, table.size());
write_lines_to_table(&mut table, &mut dictionary, lp_lines.clone());
assert_eq!(112, table.size());
// doesn't double because of the stats overhead
write_lines_to_table(&mut table, dictionary, lp_lines.clone());
assert_eq!(224, table.size());
write_lines_to_table(&mut table, &mut dictionary, lp_lines.clone());
assert_eq!(192, table.size());
// now make sure it increased by the same amount minus stats overhead
write_lines_to_table(&mut table, dictionary, lp_lines);
assert_eq!(320, table.size());
}
#[test]
fn test_matches_table_name_predicate() {
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("h2o"));
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
"h2o,state=MA,city=Boston temp=72.4 250",
];
write_lines_to_table(&mut table, dictionary, lp_lines);
let h2o_symbol = dictionary.id("h2o").unwrap();
assert!(table.matches_table_name_predicate(None));
let set = BTreeSet::new();
assert!(!table.matches_table_name_predicate(Some(&set)));
let mut set = BTreeSet::new();
set.insert(h2o_symbol);
assert!(table.matches_table_name_predicate(Some(&set)));
// Some symbol that is not the same as h2o_symbol
assert_ne!(37377, h2o_symbol);
let mut set = BTreeSet::new();
set.insert(37377);
assert!(!table.matches_table_name_predicate(Some(&set)));
}
#[test]
fn test_matches_column_name_predicate() {
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("h2o"));
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4,awesomeness=1000 100",
"h2o,state=MA,city=Boston temp=72.4,awesomeness=2000 250",
];
write_lines_to_table(&mut table, dictionary, lp_lines);
let state_symbol = dictionary.id("state").unwrap();
let temp_symbol = dictionary.id("temp").unwrap();
let awesomeness_symbol = dictionary.id("awesomeness").unwrap();
assert!(table.matches_column_name_predicate(None));
let set = BTreeSet::new();
assert!(!table.matches_column_name_predicate(Some(&set)));
// tag columns should not count
let mut set = BTreeSet::new();
set.insert(state_symbol);
assert!(!table.matches_column_name_predicate(Some(&set)));
let mut set = BTreeSet::new();
set.insert(temp_symbol);
assert!(table.matches_column_name_predicate(Some(&set)));
let mut set = BTreeSet::new();
set.insert(temp_symbol);
set.insert(awesomeness_symbol);
assert!(table.matches_column_name_predicate(Some(&set)));
let mut set = BTreeSet::new();
set.insert(temp_symbol);
set.insert(awesomeness_symbol);
set.insert(1337); // some other symbol, but that is ok
assert!(table.matches_column_name_predicate(Some(&set)));
let mut set = BTreeSet::new();
set.insert(1337);
assert!(!table.matches_column_name_predicate(Some(&set)));
write_lines_to_table(&mut table, &mut dictionary, lp_lines);
assert_eq!(272, table.size());
}
#[test]
fn test_to_arrow_schema_all() {
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut dictionary = Dictionary::new();
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
"h2o,state=MA,city=Boston float_field=70.4,int_field=8i,uint_field=42u,bool_field=t,string_field=\"foo\" 100",
];
write_lines_to_table(&mut table, dictionary, lp_lines);
write_lines_to_table(&mut table, &mut dictionary, lp_lines);
let selection = Selection::All;
let actual_schema = table.schema(&chunk, selection).unwrap();
let actual_schema = table.schema(&dictionary, selection).unwrap();
let expected_schema = SchemaBuilder::new()
.field("bool_field", ArrowDataType::Boolean)
.tag("city")
@ -820,17 +547,15 @@ mod tests {
#[test]
fn test_to_arrow_schema_subset() {
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut dictionary = Dictionary::new();
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec!["h2o,state=MA,city=Boston float_field=70.4 100"];
write_lines_to_table(&mut table, dictionary, lp_lines);
write_lines_to_table(&mut table, &mut dictionary, lp_lines);
let selection = Selection::Some(&["float_field"]);
let actual_schema = table.schema(&chunk, selection).unwrap();
let actual_schema = table.schema(&dictionary, selection).unwrap();
let expected_schema = SchemaBuilder::new()
.field("float_field", ArrowDataType::Float64)
.build()
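The test updates above follow from `Table::schema` now taking the `Dictionary` directly rather than the owning `Chunk`. A minimal sketch of the new test-side setup, using only the helpers visible in this diff (`Dictionary::new`, `lookup_value_or_insert`, `write_lines_to_table`, `Selection`):

// Sketch only: the dictionary and table are built without a Chunk.
let mut dictionary = Dictionary::new();
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));

let lp_lines = vec!["h2o,state=MA,city=Boston float_field=70.4 100"];
write_lines_to_table(&mut table, &mut dictionary, lp_lines);

// Schema lookups now resolve column names through the dictionary.
let schema = table
    .schema(&dictionary, Selection::Some(&["float_field"]))
    .unwrap();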


@ -195,13 +195,13 @@ impl InfluxRPCPlanner {
/// Returns a plan that lists the names of tables in this
/// database that have at least one row that matches the
/// conditions listed on `predicate`
pub async fn table_names<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan>
pub fn table_names<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan>
where
D: Database + 'static,
{
let mut builder = StringSetPlanBuilder::new();
for chunk in self.filtered_chunks(database, &predicate).await? {
for chunk in self.filtered_chunks(database, &predicate)? {
let new_table_names = chunk
.table_names(&predicate, builder.known_strings())
.map_err(|e| Box::new(e) as _)
@ -227,7 +227,7 @@ impl InfluxRPCPlanner {
/// columns (as defined in the InfluxDB Data model) names in this
/// database that have more than zero rows which pass the
/// conditions specified by `predicate`.
pub async fn tag_keys<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan>
pub fn tag_keys<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan>
where
D: Database + 'static,
{
@ -246,9 +246,9 @@ impl InfluxRPCPlanner {
let mut need_full_plans = BTreeMap::new();
let mut known_columns = BTreeSet::new();
for chunk in self.filtered_chunks(database, &predicate).await? {
for chunk in self.filtered_chunks(database, &predicate)? {
// try and get the table names that have rows that match the predicate
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?;
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?;
for table_name in table_names {
debug!(
@ -308,7 +308,7 @@ impl InfluxRPCPlanner {
// were already known to have data (based on the contents of known_columns)
for (table_name, chunks) in need_full_plans.into_iter() {
let plan = self.tag_keys_plan(&table_name, &predicate, chunks).await?;
let plan = self.tag_keys_plan(&table_name, &predicate, chunks)?;
if let Some(plan) = plan {
builder = builder.append(plan)
@ -326,7 +326,7 @@ impl InfluxRPCPlanner {
/// Returns a plan which finds the distinct, non-null tag values
/// in the specified `tag_name` column of this database which pass
/// the conditions specified by `predicate`.
pub async fn tag_values<D>(
pub fn tag_values<D>(
&self,
database: &D,
tag_name: &str,
@ -351,8 +351,8 @@ impl InfluxRPCPlanner {
let mut need_full_plans = BTreeMap::new();
let mut known_values = BTreeSet::new();
for chunk in self.filtered_chunks(database, &predicate).await? {
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?;
for chunk in self.filtered_chunks(database, &predicate)? {
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?;
for table_name in table_names {
debug!(
@ -426,9 +426,7 @@ impl InfluxRPCPlanner {
// time in `known_columns`, and some tables in chunks that we
// need to run a plan to find what values pass the predicate.
for (table_name, chunks) in need_full_plans.into_iter() {
let scan_and_filter = self
.scan_and_filter(&table_name, &predicate, chunks)
.await?;
let scan_and_filter = self.scan_and_filter(&table_name, &predicate, chunks)?;
// if we have any data to scan, make a plan!
if let Some(TableScanAndFilter {
@ -471,11 +469,7 @@ impl InfluxRPCPlanner {
/// datatypes (as defined in the data written via `write_lines`),
/// and which have more than zero rows which pass the conditions
/// specified by `predicate`.
pub async fn field_columns<D>(
&self,
database: &D,
predicate: Predicate,
) -> Result<FieldListPlan>
pub fn field_columns<D>(&self, database: &D, predicate: Predicate) -> Result<FieldListPlan>
where
D: Database + 'static,
{
@ -488,15 +482,12 @@ impl InfluxRPCPlanner {
// values and stops the plan executing once it has them
// map table -> Vec<Arc<Chunk>>
let chunks = self.filtered_chunks(database, &predicate).await?;
let table_chunks = self.group_chunks_by_table(&predicate, chunks).await?;
let chunks = self.filtered_chunks(database, &predicate)?;
let table_chunks = self.group_chunks_by_table(&predicate, chunks)?;
let mut field_list_plan = FieldListPlan::new();
for (table_name, chunks) in table_chunks {
if let Some(plan) = self
.field_columns_plan(&table_name, &predicate, chunks)
.await?
{
if let Some(plan) = self.field_columns_plan(&table_name, &predicate, chunks)? {
field_list_plan = field_list_plan.append(plan);
}
}
@ -523,7 +514,7 @@ impl InfluxRPCPlanner {
/// rows for a particular series (groups where all tags are the
/// same) occur together in the plan
pub async fn read_filter<D>(&self, database: &D, predicate: Predicate) -> Result<SeriesSetPlans>
pub fn read_filter<D>(&self, database: &D, predicate: Predicate) -> Result<SeriesSetPlans>
where
D: Database + 'static,
{
@ -531,17 +522,15 @@ impl InfluxRPCPlanner {
// group tables by chunk, pruning if possible
// key is table name, values are chunks
let chunks = self.filtered_chunks(database, &predicate).await?;
let table_chunks = self.group_chunks_by_table(&predicate, chunks).await?;
let chunks = self.filtered_chunks(database, &predicate)?;
let table_chunks = self.group_chunks_by_table(&predicate, chunks)?;
// now, build up plans for each table
let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks {
let prefix_columns: Option<&[&str]> = None;
let ss_plan = self
.read_filter_plan(table_name, prefix_columns, &predicate, chunks)
.await?;
let ss_plan = self.read_filter_plan(table_name, prefix_columns, &predicate, chunks)?;
// If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan {
ss_plans.push(ss_plan);
@ -555,7 +544,7 @@ impl InfluxRPCPlanner {
/// with rows grouped by an aggregate function. Note that we still
/// group by all tags (so group within series) and the
/// group_columns define the order of the result
pub async fn read_group<D>(
pub fn read_group<D>(
&self,
database: &D,
predicate: Predicate,
@ -568,8 +557,8 @@ impl InfluxRPCPlanner {
debug!(predicate=?predicate, agg=?agg, "planning read_group");
// group tables by chunk, pruning if possible
let chunks = self.filtered_chunks(database, &predicate).await?;
let table_chunks = self.group_chunks_by_table(&predicate, chunks).await?;
let chunks = self.filtered_chunks(database, &predicate)?;
let table_chunks = self.group_chunks_by_table(&predicate, chunks)?;
let num_prefix_tag_group_columns = group_columns.len();
// now, build up plans for each table
@ -577,13 +566,9 @@ impl InfluxRPCPlanner {
for (table_name, chunks) in table_chunks {
let ss_plan = match agg {
Aggregate::None => {
self.read_filter_plan(table_name, Some(group_columns), &predicate, chunks)
.await?
}
_ => {
self.read_group_plan(table_name, &predicate, agg, group_columns, chunks)
.await?
self.read_filter_plan(table_name, Some(group_columns), &predicate, chunks)?
}
_ => self.read_group_plan(table_name, &predicate, agg, group_columns, chunks)?,
};
// If we have to do real work, add it to the list of plans
@ -598,7 +583,7 @@ impl InfluxRPCPlanner {
/// Creates a GroupedSeriesSet plan that produces an output table with rows
/// that are grouped by window definitions
pub async fn read_window_aggregate<D>(
pub fn read_window_aggregate<D>(
&self,
database: &D,
predicate: Predicate,
@ -612,15 +597,14 @@ impl InfluxRPCPlanner {
debug!(predicate=?predicate, "planning read_window_aggregate");
// group tables by chunk, pruning if possible
let chunks = self.filtered_chunks(database, &predicate).await?;
let table_chunks = self.group_chunks_by_table(&predicate, chunks).await?;
let chunks = self.filtered_chunks(database, &predicate)?;
let table_chunks = self.group_chunks_by_table(&predicate, chunks)?;
// now, build up plans for each table
let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks {
let ss_plan = self
.read_window_aggregate_plan(table_name, &predicate, agg, &every, &offset, chunks)
.await?;
.read_window_aggregate_plan(table_name, &predicate, agg, &every, &offset, chunks)?;
// If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan {
ss_plans.push(ss_plan);
@ -631,7 +615,7 @@ impl InfluxRPCPlanner {
}
/// Creates a map of table_name --> Chunks that have that table
async fn group_chunks_by_table<C>(
fn group_chunks_by_table<C>(
&self,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
@ -641,7 +625,7 @@ impl InfluxRPCPlanner {
{
let mut table_chunks = BTreeMap::new();
for chunk in chunks {
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?;
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?;
for table_name in table_names {
table_chunks
.entry(table_name)
@ -653,11 +637,7 @@ impl InfluxRPCPlanner {
}
/// Find all the table names in the specified chunk that pass the predicate
async fn chunk_table_names<C>(
&self,
chunk: &C,
predicate: &Predicate,
) -> Result<BTreeSet<String>>
fn chunk_table_names<C>(&self, chunk: &C, predicate: &Predicate) -> Result<BTreeSet<String>>
where
C: PartitionChunk + 'static,
{
@ -705,7 +685,7 @@ impl InfluxRPCPlanner {
/// Filter(predicate)
/// TableScan (of chunks)
/// ```
async fn tag_keys_plan<C>(
fn tag_keys_plan<C>(
&self,
table_name: &str,
predicate: &Predicate,
@ -714,7 +694,7 @@ impl InfluxRPCPlanner {
where
C: PartitionChunk + 'static,
{
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks).await?;
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
@ -767,7 +747,7 @@ impl InfluxRPCPlanner {
/// Filter(predicate) [optional]
/// Scan
/// ```
async fn field_columns_plan<C>(
fn field_columns_plan<C>(
&self,
table_name: &str,
predicate: &Predicate,
@ -776,7 +756,7 @@ impl InfluxRPCPlanner {
where
C: PartitionChunk + 'static,
{
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks).await?;
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
schema,
@ -817,7 +797,7 @@ impl InfluxRPCPlanner {
/// Order by (tag_columns, timestamp_column)
/// Filter(predicate)
/// Scan
async fn read_filter_plan<C>(
fn read_filter_plan<C>(
&self,
table_name: impl Into<String>,
prefix_columns: Option<&[impl AsRef<str>]>,
@ -828,7 +808,7 @@ impl InfluxRPCPlanner {
C: PartitionChunk + 'static,
{
let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks).await?;
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
@ -937,7 +917,7 @@ impl InfluxRPCPlanner {
/// GroupBy(gby cols, aggs, time cols)
/// Filter(predicate)
/// Scan
pub async fn read_group_plan<C>(
pub fn read_group_plan<C>(
&self,
table_name: impl Into<String>,
predicate: &Predicate,
@ -949,7 +929,7 @@ impl InfluxRPCPlanner {
C: PartitionChunk + 'static,
{
let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks).await?;
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
@ -1027,7 +1007,7 @@ impl InfluxRPCPlanner {
/// GroupBy(gby: tag columns, window_function; agg: aggregate(field)
/// Filter(predicate)
/// Scan
pub async fn read_window_aggregate_plan<C>(
pub fn read_window_aggregate_plan<C>(
&self,
table_name: impl Into<String>,
predicate: &Predicate,
@ -1040,7 +1020,7 @@ impl InfluxRPCPlanner {
C: PartitionChunk + 'static,
{
let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks).await?;
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
@ -1114,7 +1094,7 @@ impl InfluxRPCPlanner {
/// Filter(predicate) [optional]
/// Scan
/// ```
async fn scan_and_filter<C>(
fn scan_and_filter<C>(
&self,
table_name: &str,
predicate: &Predicate,
@ -1190,7 +1170,7 @@ impl InfluxRPCPlanner {
/// Returns a list of chunks across all partitions which may
/// contain data that pass the predicate
async fn filtered_chunks<D>(
fn filtered_chunks<D>(
&self,
database: &D,
predicate: &Predicate,
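The planner methods above lose their `async` qualifiers, so call sites no longer `.await` plan construction; only plan execution remains asynchronous. A before/after sketch, assuming a constructed `InfluxRPCPlanner`, a `Database` impl `db`, a `predicate`, and the executor used by the call sites later in this diff:

// before: let plan = planner.table_names(&db, predicate).await?;
let plan = planner.table_names(&db, predicate)?;

// Executing the resulting plan still happens on the async executor.
let names = executor.to_string_set(plan).await?;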


@ -681,6 +681,10 @@ impl Db {
)
.context(SequencedEntryError)?;
if self.rules.read().wal_buffer_config.is_some() {
todo!("route to the Write Buffer. TODO: carols10cents #1157")
}
self.store_sequenced_entry(sequenced_entry)
}


@ -3,7 +3,6 @@
use std::convert::TryFrom;
use mutable_buffer::{chunk::Chunk, pred::ChunkPredicate};
use query::predicate::Predicate;
use snafu::Snafu;
@ -11,15 +10,6 @@ use snafu::Snafu;
pub enum Error {
#[snafu(display("Error translating predicate: {}", msg))]
ReadBufferPredicate { msg: String, pred: Predicate },
#[snafu(display("Error building predicate for mutable buffer: {}", source))]
MutableBufferPredicate { source: mutable_buffer::pred::Error },
}
impl From<mutable_buffer::pred::Error> for Error {
fn from(source: mutable_buffer::pred::Error) -> Self {
Self::MutableBufferPredicate { source }
}
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -52,25 +42,6 @@ pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result<read_buffer::Pr
}
}
/// Converts a [`query::Predicate`] into [`ChunkPredicate`],
/// suitable for evaluating on the MutableBuffer.
pub fn to_mutable_buffer_predicate(
chunk: impl AsRef<Chunk>,
predicate: &Predicate,
) -> Result<ChunkPredicate> {
let predicate = chunk
.as_ref()
.predicate_builder()?
.table_names(predicate.table_names.as_ref())?
.field_names(predicate.field_columns.as_ref())?
.range(predicate.range)?
// it would be nice to avoid cloning all the exprs here.
.exprs(predicate.exprs.clone())?
.build();
Ok(predicate)
}
#[cfg(test)]
pub mod test {
use super::*;
@ -196,7 +167,6 @@ pub mod test {
Error::ReadBufferPredicate { msg, pred: _ } => {
assert_eq!(msg, exp.to_owned());
}
_ => panic!("Unexpected error type"),
}
}
}
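With the mutable-buffer conversion removed, this module's only remaining translation is to the read buffer. A minimal sketch of the surviving entry point, assuming a `query::predicate::Predicate` named `predicate` is in scope:

// Sketch only: signature as shown above; failures surface as
// Error::ReadBufferPredicate, carrying the offending predicate.
let rb_predicate = to_read_buffer_predicate(&predicate)
    .expect("predicate should be expressible in the read buffer");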


@ -1,15 +1,9 @@
//! Adapter streams for different Chunk types that implement the interface
//! needed by DataFusion
use arrow_deps::{
arrow::{
datatypes::SchemaRef,
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
},
arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch},
datafusion::physical_plan::RecordBatchStream,
};
use internal_types::selection::Selection;
use mutable_buffer::chunk::Chunk as MBChunk;
use read_buffer::ReadFilterResults;
use std::{
@ -17,100 +11,6 @@ use std::{
task::{Context, Poll},
};
use snafu::{ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Error getting data for table '{}' chunk {}: {}",
table_name,
chunk_id,
source
))]
GettingTableData {
table_name: String,
chunk_id: u32,
source: mutable_buffer::chunk::Error,
},
}
/// Adapter which will produce record batches from a mutable buffer
/// chunk on demand
pub(crate) struct MutableBufferChunkStream {
/// Requested output schema (includes selection)
schema: SchemaRef,
chunk: Arc<MBChunk>,
table_name: Arc<String>,
/// Vector of record batches to send in reverse order (send data[len-1]
/// next) Is None until the first call to poll_next
data: Option<Vec<RecordBatch>>,
}
impl MutableBufferChunkStream {
#[allow(dead_code)]
pub fn new(chunk: Arc<MBChunk>, schema: SchemaRef, table_name: impl Into<String>) -> Self {
Self {
chunk,
schema,
table_name: Arc::new(table_name.into()),
data: None,
}
}
// gets the next batch, as needed
fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
if self.data.is_none() {
// Want all the columns in the schema. Note we don't
// use `Selection::All` here because the mutable buffer chunk would interpret it
// as "all columns in the table in that chunk" rather than
// all columns this query needs
let selected_cols = self
.schema
.fields()
.iter()
.map(|f| f.name() as &str)
.collect::<Vec<_>>();
let selection = Selection::Some(&selected_cols);
let mut data = Vec::new();
self.chunk
.table_to_arrow(&mut data, self.table_name.as_ref(), selection)
.context(GettingTableData {
table_name: self.table_name.as_ref(),
chunk_id: self.chunk.id(),
})
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
// reverse the array so we can pop off the back
data.reverse();
self.data = Some(data);
}
// self.data was set to Some above
Ok(self.data.as_mut().unwrap().pop())
}
}
impl RecordBatchStream for MutableBufferChunkStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
impl futures::Stream for MutableBufferChunkStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.next_batch().transpose())
}
// TODO is there a useful size_hint to pass?
}
/// Adapter which will take a ReadFilterResults and make it an async stream
pub struct ReadFilterResultsStream {
read_results: ReadFilterResults,


@ -67,6 +67,7 @@
clippy::clone_on_ref_ptr
)]
use std::convert::TryInto;
use std::sync::Arc;
use async_trait::async_trait;
@ -150,6 +151,10 @@ pub enum Error {
WalError { source: buffer::Error },
#[snafu(display("error converting line protocol to flatbuffers: {}", source))]
LineConversion { source: entry::Error },
#[snafu(display("error decoding entry flatbuffers: {}", source))]
DecodingEntry {
source: flatbuffers::InvalidFlatbuffer,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -368,6 +373,19 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
pub async fn write_entry(&self, db_name: &str, entry_bytes: Vec<u8>) -> Result<()> {
self.require_id()?;
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let db = self
.config
.db(&db_name)
.context(DatabaseNotFound { db_name: &*db_name })?;
let entry = entry_bytes.try_into().context(DecodingEntry)?;
self.handle_write_entry(&db, entry).await
}
pub async fn handle_write_entry(&self, db: &Db, entry: Entry) -> Result<()> {
db.store_entry(entry)
.map_err(|e| Error::UnknownDatabaseError {
@ -674,7 +692,7 @@ mod tests {
use tokio_util::sync::CancellationToken;
use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect};
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use data_types::database_rules::{PartitionTemplate, TemplatePart, NO_SHARD_CONFIG};
use influxdb_line_protocol::parse_lines;
use object_store::{memory::InMemory, path::ObjectStorePath};
use query::{frontend::sql::SQLQueryPlanner, Database};
@ -867,6 +885,55 @@ mod tests {
assert_table_eq!(expected, &batches);
}
#[tokio::test]
async fn write_entry_local() {
let manager = TestConnectionManager::new();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let server = Server::new(manager, store);
server.set_id(NonZeroU32::new(1).unwrap()).unwrap();
let name = DatabaseName::new("foo".to_string()).unwrap();
server
.create_database(
DatabaseRules::new(name),
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.unwrap();
let db_name = DatabaseName::new("foo").unwrap();
let db = server.db(&db_name).unwrap();
let line = "cpu bar=1 10";
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
let sharded_entries = lines_to_sharded_entries(&lines, NO_SHARD_CONFIG, &*db.rules.read())
.expect("sharded entries");
let entry = &sharded_entries[0].entry;
server
.write_entry("foo", entry.data().into())
.await
.expect("write entry");
let planner = SQLQueryPlanner::default();
let executor = server.executor();
let physical_plan = planner
.query(db, "select * from cpu", executor.as_ref())
.await
.unwrap();
let batches = collect(physical_plan).await.unwrap();
let expected = vec![
"+-----+------+",
"| bar | time |",
"+-----+------+",
"| 1 | 10 |",
"+-----+------+",
];
assert_table_eq!(expected, &batches);
}
#[tokio::test]
async fn close_chunk() {
test_helpers::maybe_start_logging();


@ -35,7 +35,6 @@ macro_rules! run_field_columns_test_case {
let plan = planner
.field_columns(&db, predicate.clone())
.await
.expect("built plan successfully");
let fields = executor
.to_field_list(plan)
@ -137,7 +136,6 @@ async fn test_field_name_plan() {
let plan = planner
.field_columns(&db, predicate.clone())
.await
.expect("built plan successfully");
let mut plans = plan.plans;


@ -50,7 +50,6 @@ macro_rules! run_read_filter_test_case {
let plan = planner
.read_filter(&db, predicate.clone())
.await
.expect("built plan successfully");
let string_results = run_series_set_plan(executor, plan).await;


@ -30,7 +30,6 @@ macro_rules! run_read_group_test_case {
let plans = planner
.read_group(&db, predicate.clone(), agg, &group_columns)
.await
.expect("built plan successfully");
let plans = plans.into_inner();


@ -34,7 +34,6 @@ macro_rules! run_read_window_aggregate_test_case {
let plans = planner
.read_window_aggregate(&db, predicate.clone(), agg, every.clone(), offset.clone())
.await
.expect("built plan successfully");
let plans = plans.into_inner();


@ -27,7 +27,6 @@ macro_rules! run_table_names_test_case {
let plan = planner
.table_names(&db, predicate.clone())
.await
.expect("built plan successfully");
let names = executor
.to_string_set(plan)


@ -31,7 +31,6 @@ macro_rules! run_tag_keys_test_case {
let plan = planner
.tag_keys(&db, predicate.clone())
.await
.expect("built plan successfully");
let names = executor
.to_string_set(plan)


@ -29,7 +29,6 @@ macro_rules! run_tag_values_test_case {
let plan = planner
.tag_values(&db, &tag_name, predicate.clone())
.await
.expect("built plan successfully");
let names = executor
.to_string_set(plan)
@ -239,7 +238,7 @@ async fn list_tag_values_field_col() {
// Test: temp is a field, not a tag
let tag_name = "temp";
let plan_result = planner.tag_values(&db, &tag_name, predicate.clone()).await;
let plan_result = planner.tag_values(&db, &tag_name, predicate.clone());
assert_eq!(
plan_result.unwrap_err().to_string(),


@ -278,7 +278,7 @@ async fn sql_select_from_system_tables() {
"+----+---------------+-------------------+-----------------+",
"| id | partition_key | storage | estimated_bytes |",
"+----+---------------+-------------------+-----------------+",
"| 0 | 1970-01-01T00 | OpenMutableBuffer | 493 |",
"| 0 | 1970-01-01T00 | OpenMutableBuffer | 453 |",
"+----+---------------+-------------------+-----------------+",
];
run_sql_test_case!(


@ -23,6 +23,11 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
description: source.to_string(),
}
.into(),
Error::DecodingEntry { source } => FieldViolation {
field: "entry".into(),
description: source.to_string(),
}
.into(),
error => {
error!(?error, "Unexpected error");
InternalError {}.into()


@ -714,7 +714,6 @@ where
let plan = planner
.table_names(db.as_ref(), predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTables { db_name })?;
let executor = db_store.executor();
@ -765,7 +764,6 @@ where
let tag_key_plan = planner
.tag_keys(db.as_ref(), predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingColumns {
db_name: db_name.as_str(),
@ -825,7 +823,6 @@ where
let tag_value_plan = planner
.tag_values(db.as_ref(), tag_name, predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTagValues { db_name, tag_name })?;
@ -882,7 +879,6 @@ where
let series_plan = planner
.read_filter(db.as_ref(), predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(PlanningFilteringSeries { db_name })?;
@ -968,14 +964,10 @@ where
let grouped_series_set_plan = match gby_agg {
GroupByAndAggregate::Columns { agg, group_columns } => {
planner
.read_group(db.as_ref(), predicate, agg, &group_columns)
.await
planner.read_group(db.as_ref(), predicate, agg, &group_columns)
}
GroupByAndAggregate::Window { agg, every, offset } => {
planner
.read_window_aggregate(db.as_ref(), predicate, agg, every, offset)
.await
planner.read_window_aggregate(db.as_ref(), predicate, agg, every, offset)
}
};
let grouped_series_set_plan = grouped_series_set_plan
@ -1039,7 +1031,6 @@ where
let field_list_plan = planner
.field_columns(db.as_ref(), predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingFields { db_name })?;


@ -47,6 +47,23 @@ where
let lines_written = lp_line_count as u64;
Ok(Response::new(WriteResponse { lines_written }))
}
async fn write_entry(
&self,
request: tonic::Request<WriteEntryRequest>,
) -> Result<tonic::Response<WriteEntryResponse>, tonic::Status> {
let request = request.into_inner();
if request.entry.is_empty() {
return Err(FieldViolation::required("entry").into());
}
self.server
.write_entry(&request.db_name, request.entry)
.await
.map_err(default_server_error_handler)?;
Ok(Response::new(WriteEntryResponse {}))
}
}
/// Instantiate the write service
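For reference, the shape of the request the new handler consumes, using the field names it reads above; a hedged sketch, since the full generated `WriteEntryRequest` message is not shown in this diff:

// Sketch only: db_name and entry are the fields the handler uses; any other
// generated fields are assumed to take their defaults.
let request = WriteEntryRequest {
    db_name: "foo".to_string(),
    entry: entry_bytes, // raw entry flatbuffer bytes; must be non-empty
    ..Default::default()
};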


@ -277,7 +277,7 @@ async fn test_chunk_get() {
partition_key: "cpu".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 145,
estimated_bytes: 137,
time_of_first_write: None,
time_of_last_write: None,
time_closing: None,
@ -286,7 +286,7 @@ async fn test_chunk_get() {
partition_key: "disk".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 107,
estimated_bytes: 103,
time_of_first_write: None,
time_of_last_write: None,
time_closing: None,
@ -452,7 +452,7 @@ async fn test_list_partition_chunks() {
partition_key: "cpu".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 145,
estimated_bytes: 137,
time_of_first_write: None,
time_of_last_write: None,
time_closing: None,


@ -191,7 +191,7 @@ async fn test_get_chunks() {
.and(predicate::str::contains(
r#""storage": "OpenMutableBuffer","#,
))
.and(predicate::str::contains(r#""estimated_bytes": 145"#))
.and(predicate::str::contains(r#""estimated_bytes": 137"#))
// Check for a non empty timestamp such as
// "time_of_first_write": "2021-03-30T17:11:10.723866Z",
.and(predicate::str::contains(r#""time_of_first_write": "20"#));