Merge remote-tracking branch 'origin/main' into cn/google-list-with-delimiter

pull/24376/head
Carol (Nichols || Goulding) 2021-02-22 12:53:46 -05:00
commit a42103f436
43 changed files with 1456 additions and 914 deletions


@ -45,6 +45,12 @@ jobs:
with:
command: build
args: --workspace
# Ensure benches still build
- name: Build Benches
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --benches --no-run
test:
name: Test

Cargo.lock generated

@ -101,7 +101,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "arrow"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=e2d6c057684b587151afffe50f7eaef94533e017#e2d6c057684b587151afffe50f7eaef94533e017"
source = "git+https://github.com/apache/arrow.git?rev=ad4504e8e85eb8e5babe0f01ca8cf9947499fc40#ad4504e8e85eb8e5babe0f01ca8cf9947499fc40"
dependencies = [
"cfg_aliases",
"chrono",
@ -124,7 +124,7 @@ dependencies = [
[[package]]
name = "arrow-flight"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=e2d6c057684b587151afffe50f7eaef94533e017#e2d6c057684b587151afffe50f7eaef94533e017"
source = "git+https://github.com/apache/arrow.git?rev=ad4504e8e85eb8e5babe0f01ca8cf9947499fc40#ad4504e8e85eb8e5babe0f01ca8cf9947499fc40"
dependencies = [
"arrow",
"bytes",
@ -237,7 +237,7 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "azure_core"
version = "0.1.0"
source = "git+https://github.com/Azure/azure-sdk-for-rust.git?rev=5ecad7216e1f04c5ff41e7de4667f006664c8cca#5ecad7216e1f04c5ff41e7de4667f006664c8cca"
source = "git+https://github.com/Azure/azure-sdk-for-rust.git?rev=14ff9326bb1ba07f98733a548988eccd4532b945#14ff9326bb1ba07f98733a548988eccd4532b945"
dependencies = [
"RustyXML",
"async-trait",
@ -266,7 +266,7 @@ dependencies = [
[[package]]
name = "azure_storage"
version = "0.1.0"
source = "git+https://github.com/Azure/azure-sdk-for-rust.git?rev=5ecad7216e1f04c5ff41e7de4667f006664c8cca#5ecad7216e1f04c5ff41e7de4667f006664c8cca"
source = "git+https://github.com/Azure/azure-sdk-for-rust.git?rev=14ff9326bb1ba07f98733a548988eccd4532b945#14ff9326bb1ba07f98733a548988eccd4532b945"
dependencies = [
"RustyXML",
"azure_core",
@ -289,6 +289,7 @@ dependencies = [
"serde_derive",
"serde_json",
"smallvec",
"thiserror",
"time 0.2.25",
"url",
"uuid",
@ -848,7 +849,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=e2d6c057684b587151afffe50f7eaef94533e017#e2d6c057684b587151afffe50f7eaef94533e017"
source = "git+https://github.com/apache/arrow.git?rev=ad4504e8e85eb8e5babe0f01ca8cf9947499fc40#ad4504e8e85eb8e5babe0f01ca8cf9947499fc40"
dependencies = [
"ahash 0.7.0",
"arrow",
@ -1206,6 +1207,23 @@ dependencies = [
"once_cell",
]
[[package]]
name = "futures-test"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b30f48f6b9cd26d8739965d6e3345c511718884fb223795b80dc71d24a9ea9a"
dependencies = [
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
"once_cell",
"pin-project 1.0.5",
"pin-utils",
]
[[package]]
name = "futures-util"
version = "0.3.12"
@ -2134,6 +2152,7 @@ dependencies = [
"cloud-storage",
"dotenv",
"futures",
"futures-test",
"itertools 0.9.0",
"percent-encoding",
"reqwest",
@ -2282,7 +2301,7 @@ dependencies = [
[[package]]
name = "parquet"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=e2d6c057684b587151afffe50f7eaef94533e017#e2d6c057684b587151afffe50f7eaef94533e017"
source = "git+https://github.com/apache/arrow.git?rev=ad4504e8e85eb8e5babe0f01ca8cf9947499fc40#ad4504e8e85eb8e5babe0f01ca8cf9947499fc40"
dependencies = [
"arrow",
"base64 0.12.3",


@ -8,11 +8,11 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx
[dependencies] # In alphabetical order
# We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev
# The version can be found here: https://github.com/apache/arrow/commit/e2d6c057684b587151afffe50f7eaef94533e017
# The version can be found here: https://github.com/apache/arrow/commit/ad4504e8e85eb8e5babe0f01ca8cf9947499fc40
#
arrow = { git = "https://github.com/apache/arrow.git", rev = "e2d6c057684b587151afffe50f7eaef94533e017" , features = ["simd"] }
arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "e2d6c057684b587151afffe50f7eaef94533e017" }
datafusion = { git = "https://github.com/apache/arrow.git", rev = "e2d6c057684b587151afffe50f7eaef94533e017" }
arrow = { git = "https://github.com/apache/arrow.git", rev = "ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" , features = ["simd"] }
arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" }
datafusion = { git = "https://github.com/apache/arrow.git", rev = "ad4504e8e85eb8e5babe0f01ca8cf9947499fc40" }
# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
# and we're not currently using it anyway
parquet = { git = "https://github.com/apache/arrow.git", rev = "e2d6c057684b587151afffe50f7eaef94533e017", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
parquet = { git = "https://github.com/apache/arrow.git", rev = "ad4504e8e85eb8e5babe0f01ca8cf9947499fc40", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }

build.rs Normal file

@ -0,0 +1,14 @@
// Include the git commit hash, if any, in the `GIT_HASH` environment variable
// at build time
//
// https://stackoverflow.com/questions/43753491/include-git-commit-hash-as-string-into-rust-program
use std::process::Command;
fn main() {
let output = Command::new("git").args(&["rev-parse", "HEAD"]).output();
if let Ok(output) = output {
if let Ok(git_hash) = String::from_utf8(output.stdout) {
println!("cargo:rustc-env=GIT_HASH={}", git_hash);
}
}
}
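A small, hypothetical sketch of how a binary could read the value this build script exports; `option_env!` is used because `GIT_HASH` is only set when `git rev-parse` succeeds:

// Hypothetical consumer of the GIT_HASH exported by the build script above.
// option_env! evaluates to None when the variable was absent at compile time.
fn main() {
    let revision = option_env!("GIT_HASH").unwrap_or("unknown");
    // git rev-parse output ends with a newline, hence the trim()
    println!("built from revision {}", revision.trim());
}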


@ -1,4 +1,4 @@
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
/// A collection of columns to include in query results.
///
/// The `All` variant denotes that the caller wishes to include all table
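Purely illustrative (the enum below is a stand-in with guessed variants, not the project's definition): deriving `Copy` on a small enum that only borrows its data lets callers pass it by value to several calls without cloning:

// Hypothetical stand-in for a selection-style enum that derives Copy.
#[derive(Debug, Clone, Copy)]
enum ColumnSelection<'a> {
    All,
    Some(&'a [&'a str]),
}

fn column_count(sel: ColumnSelection<'_>, total: usize) -> usize {
    match sel {
        ColumnSelection::All => total,
        ColumnSelection::Some(cols) => cols.len(),
    }
}

fn main() {
    let cols = ["state", "temp"];
    let sel = ColumnSelection::Some(&cols[..]);
    // `sel` can be used twice because Copy types are duplicated, not moved.
    assert_eq!(column_count(sel, 5), 2);
    assert_eq!(column_count(sel, 5), 2);
}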

docs/env.example Normal file

@ -0,0 +1,44 @@
# This is an example .env file showing all of the environment variables that can
# be configured within the project. Copy this file to the top level directory with
# the name `.env`, then remove any `#` at the beginning of the lines of variables
# you'd like to set and change the values.
#
# The identifier for the server. Used for writing to object storage and as
# an identifier that is added to replicated writes, WAL segments and Chunks.
# Must be unique in a group of connected or semi-connected IOx servers.
# Must be a number that can be represented by a 32-bit unsigned integer.
# INFLUXDB_IOX_ID=1
#
# Where to store files on disk:
# INFLUXDB_IOX_DB_DIR=$HOME/.influxdb_iox
# TEST_INFLUXDB_IOX_DB_DIR=$HOME/.influxdb_iox
#
# Addresses for the server processes:
# INFLUXDB_IOX_BIND_ADDR=127.0.0.1:8080
# INFLUXDB_IOX_GRPC_BIND_ADDR=127.0.0.1:8082
#
# If using Amazon S3 as an object store:
# AWS_ACCESS_KEY_ID=access_key_value
# AWS_SECRET_ACCESS_KEY=secret_access_key_value
# AWS_DEFAULT_REGION=us-east-2
# AWS_S3_BUCKET_NAME=bucket-name
#
# If using Google Cloud Storage as an object store:
# GCS_BUCKET_NAME=bucket_name
# Set one of SERVICE_ACCOUNT or GOOGLE_APPLICATION_CREDENTIALS, either to the path of a file
# containing Google credential JSON or to the JSON directly.
# SERVICE_ACCOUNT=/path/to/auth/info.json
# GOOGLE_APPLICATION_CREDENTIALS={"project_id": ...}
#
# If using Microsoft Azure as an object store:
# The name you see when going to All Services > Storage accounts > [name]
# AZURE_STORAGE_ACCOUNT=
# The name of a container you've created in the storage account, under Blob Service > Containers
# AZURE_STORAGE_CONTAINER=
# In the Storage account's Settings > Access keys, one of the Key values
# AZURE_STORAGE_MASTER_KEY=
#
# To enable Jaeger tracing:
# OTEL_SERVICE_NAME="iox" # defaults to iox
# OTEL_EXPORTER_JAEGER_AGENT_HOST="jaeger.influxdata.net"
# OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"
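A short illustrative sketch (defaults made up here) of how a binary typically consumes such a `.env` file with the `dotenv` crate used elsewhere in this diff:

// Load .env if present, then read two of the variables documented above.
fn main() {
    dotenv::dotenv().ok(); // a missing .env file is not an error

    let writer_id: u32 = std::env::var("INFLUXDB_IOX_ID")
        .unwrap_or_else(|_| "1".to_string())
        .parse()
        .expect("INFLUXDB_IOX_ID must be a 32-bit unsigned integer");

    let bind_addr = std::env::var("INFLUXDB_IOX_BIND_ADDR")
        .unwrap_or_else(|_| "127.0.0.1:8080".to_string());

    println!("writer id {} listening on {}", writer_id, bind_addr);
}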


@ -2,7 +2,7 @@ use generated_types::wal;
use query::group_by::GroupByAndAggregate;
use query::group_by::WindowDuration;
use query::{
exec::{stringset::StringSet, FieldListPlan, SeriesSetPlan, SeriesSetPlans},
exec::{stringset::StringSet, SeriesSetPlan, SeriesSetPlans},
predicate::Predicate,
Database,
};
@ -213,15 +213,6 @@ impl Database for MutableBufferDb {
Ok(())
}
/// return all field names in this database, while applying optional
/// predicates
async fn field_column_names(&self, predicate: Predicate) -> Result<FieldListPlan, Self::Error> {
let mut filter = ChunkTableFilter::new(predicate);
let mut visitor = TableFieldPredVisitor::new();
self.accept(&mut filter, &mut visitor)?;
Ok(visitor.into_fieldlist_plan())
}
/// return all column values in this database, while applying optional
/// predicates
async fn column_values(
@ -275,7 +266,7 @@ impl Database for MutableBufferDb {
}
/// Return the partition keys for data in this DB
async fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
let partitions = self.partitions.read().expect("mutex poisoned");
let keys = partitions.keys().cloned().collect();
Ok(keys)
@ -283,7 +274,7 @@ impl Database for MutableBufferDb {
/// Return the list of chunks, in order of id, for the specified
/// partition_key
async fn chunks(&self, partition_key: &str) -> Vec<Arc<Chunk>> {
fn chunks(&self, partition_key: &str) -> Vec<Arc<Chunk>> {
let partition = self.get_partition(partition_key);
let partition = partition.read().expect("mutex poisoned");
partition.chunks()
@ -537,39 +528,6 @@ impl ChunkTableFilter {
}
}
/// return a plan that selects all values from field columns after
/// applying timestamp and other predicates
#[derive(Debug)]
struct TableFieldPredVisitor {
// As Each table can be spread across multiple Chunks, we
// collect all the relevant plans and Union them together.
plans: Vec<LogicalPlan>,
}
impl Visitor for TableFieldPredVisitor {
fn pre_visit_table(
&mut self,
table: &Table,
chunk: &Chunk,
filter: &mut ChunkTableFilter,
) -> Result<()> {
self.plans
.push(table.field_names_plan(filter.chunk_predicate(), chunk)?);
Ok(())
}
}
impl TableFieldPredVisitor {
fn new() -> Self {
let plans = Vec::new();
Self { plans }
}
fn into_fieldlist_plan(self) -> FieldListPlan {
FieldListPlan::Plans(self.plans)
}
}
/// return all values in the `column_name` column
/// in this database, while applying the timestamp range
///
@ -823,7 +781,6 @@ mod tests {
use super::*;
use data_types::selection::Selection;
use query::{
exec::fieldlist::{Field, FieldList},
exec::{
field::FieldIndexes,
seriesset::{Error as SeriesSetError, SeriesSet, SeriesSetItem},
@ -834,10 +791,7 @@ mod tests {
};
use arrow_deps::{
arrow::{
array::{Array, StringArray},
datatypes::DataType,
},
arrow::array::{Array, StringArray},
datafusion::prelude::*,
};
use influxdb_line_protocol::{parse_lines, ParsedLine};
@ -1272,155 +1226,6 @@ mod tests {
);
}
#[tokio::test]
async fn test_field_columns() -> Result {
// Ensure that the database queries are hooked up correctly
let db = MutableBufferDb::new("column_namedb");
let lp_data = vec![
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
"h2o,state=CA,city=Boston other_temp=72.4 350",
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
]
.join("\n");
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
write_lines(&db, &lines).await;
// write a new lp_line that is in a new day and thus a new partititon
let nanoseconds_per_day: i64 = 1_000_000_000 * 60 * 60 * 24;
let lp_data = vec![format!(
"h2o,state=MA,city=Boston temp=70.4,moisture=43.0 {}",
nanoseconds_per_day * 10
)]
.join("\n");
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
write_lines(&db, &lines).await;
// ensure there are 2 chunks
assert_eq!(db.len(), 2);
// setup to run the execution plan (
let executor = Executor::default();
let predicate = PredicateBuilder::default()
.table("NoSuchTable")
.add_expr(col("state").eq(lit("MA"))) // state=MA
.build();
// make sure table filtering works (no tables match)
let plan = db
.field_column_names(predicate)
.await
.expect("Created field_columns plan successfully");
let fieldlists = executor
.to_field_list(plan)
.await
.expect("Running fieldlist plan");
assert!(fieldlists.fields.is_empty());
// get only fields from h20 (but both chunks)
let predicate = PredicateBuilder::default()
.table("h2o")
.add_expr(col("state").eq(lit("MA"))) // state=MA
.build();
let plan = db
.field_column_names(predicate)
.await
.expect("Created field_columns plan successfully");
let actual = executor
.to_field_list(plan)
.await
.expect("Running fieldlist plan");
let expected = FieldList {
fields: vec![
Field {
name: "moisture".into(),
data_type: DataType::Float64,
last_timestamp: nanoseconds_per_day * 10,
},
Field {
name: "other_temp".into(),
data_type: DataType::Float64,
last_timestamp: 250,
},
Field {
name: "temp".into(),
data_type: DataType::Float64,
last_timestamp: nanoseconds_per_day * 10,
},
],
};
assert_eq!(
expected, actual,
"Expected:\n{:#?}\nActual:\n{:#?}",
expected, actual
);
Ok(())
}
#[tokio::test]
async fn test_field_columns_timestamp_predicate() -> Result {
// check the appropriate filters are applied in the datafusion plans
let db = MutableBufferDb::new("column_namedb");
let lp_data = vec![
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
"h2o,state=CA,city=Boston other_temp=72.4 350",
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
]
.join("\n");
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
write_lines(&db, &lines).await;
// setup to run the execution plan (
let executor = Executor::default();
let predicate = PredicateBuilder::default()
.table("h2o")
.timestamp_range(200, 300)
.add_expr(col("state").eq(lit("MA"))) // state=MA
.build();
let plan = db
.field_column_names(predicate)
.await
.expect("Created field_columns plan successfully");
let actual = executor
.to_field_list(plan)
.await
.expect("Running fieldlist plan");
// Should only have other_temp as a field
let expected = FieldList {
fields: vec![Field {
name: "other_temp".into(),
data_type: DataType::Float64,
last_timestamp: 250,
}],
};
assert_eq!(
expected, actual,
"Expected:\n{:#?}\nActual:\n{:#?}",
expected, actual
);
Ok(())
}
#[tokio::test]
async fn db_size() {
let db = MutableBufferDb::new("column_namedb");


@ -614,39 +614,6 @@ impl Table {
))
}
/// Creates a plan that produces an output table with rows that
/// match the predicate for all fields in the table.
///
/// The output looks like (field0, field1, ..., time)
///
/// The data is not sorted in any particular order
///
/// The created plan looks like:
///
/// Projection (select the field columns needed)
/// Filter(predicate) [optional]
/// InMemoryScan
pub fn field_names_plan(
&self,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> Result<LogicalPlan> {
// Scan and Filter
let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?;
// Selection
let select_exprs = self
.field_and_time_column_names(chunk_predicate, chunk)
.into_iter()
.map(|c| c.into_expr())
.collect::<Vec<_>>();
let plan_builder = plan_builder.project(&select_exprs).context(BuildingPlan)?;
// and finally create the plan
plan_builder.build().context(BuildingPlan)
}
// Returns (tag_columns, field_columns) vectors with the names of
// all tag and field columns, respectively, after any predicates
// have been applied. The vectors are sorted by lexically by name.
@ -690,42 +657,6 @@ impl Table {
Ok((tag_columns, field_columns))
}
// Returns (field_columns and time) in sorted order
fn field_and_time_column_names(
&self,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> ArcStringVec {
let mut field_columns = self
.columns
.iter()
.filter_map(|(column_id, column)| {
match column {
Column::Tag(_, _) => None, // skip tags
_ => {
if chunk_predicate.should_include_field(*column_id)
|| chunk_predicate.is_time_column(*column_id)
{
let column_name = chunk
.dictionary
.lookup_id(*column_id)
.expect("Find column name in dictionary");
Some(Arc::new(column_name.to_string()))
} else {
None
}
}
}
})
.collect::<Vec<_>>();
// Sort the field columns too so that the output always comes
// out in a predictable order
field_columns.sort();
field_columns
}
/// Returns the column selection for all the columns in this table, ordered
/// by table name
fn all_columns_selection<'a>(&self, chunk: &'a Chunk) -> Result<TableColSelection<'a>> {
@ -2091,47 +2022,6 @@ mod tests {
assert_eq!(expected, results, "expected output");
}
#[tokio::test]
async fn test_field_name_plan() {
let mut chunk = Chunk::new(42);
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
// Order this so field3 comes before field2
// (and thus the columns need to get reordered)
"h2o,tag1=foo,tag2=bar field1=70.6,field3=2 100",
"h2o,tag1=foo,tag2=bar field1=70.4,field2=\"ss\" 100",
"h2o,tag1=foo,tag2=bar field1=70.5,field2=\"ss\" 100",
"h2o,tag1=foo,tag2=bar field1=70.6,field4=true 1000",
];
write_lines_to_table(&mut table, dictionary, lp_lines);
let predicate = PredicateBuilder::default().timestamp_range(0, 200).build();
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let field_names_set_plan = table
.field_names_plan(&chunk_predicate, &chunk)
.expect("creating the field_name plan");
// run the created plan, ensuring the output is as expected
let results = run_plan(field_names_set_plan).await;
let expected = vec![
"+--------+--------+--------+--------+------+",
"| field1 | field2 | field3 | field4 | time |",
"+--------+--------+--------+--------+------+",
"| 70.6 | | 2 | | 100 |",
"| 70.4 | ss | | | 100 |",
"| 70.5 | ss | | | 100 |",
"+--------+--------+--------+--------+------+",
];
assert_eq!(expected, results, "expected output");
}
#[test]
fn test_reorder_prefix() {
assert_eq!(reorder_prefix_ok(&[], &[]), &[] as &[&str]);


@ -7,9 +7,9 @@ edition = "2018"
[dependencies] # In alphabetical order
async-trait = "0.1.42"
# Microsoft Azure Blob storage integration
# In order to support tokio 1.0 needed to pull in unreleased azure sdk
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "5ecad7216e1f04c5ff41e7de4667f006664c8cca" }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "5ecad7216e1f04c5ff41e7de4667f006664c8cca", default-features = false, features = ["table", "blob"] }
# In order to support tokio 1.0 and delimiters, needed to pull in unreleased azure sdk
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "14ff9326bb1ba07f98733a548988eccd4532b945" }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "14ff9326bb1ba07f98733a548988eccd4532b945", default-features = false, features = ["table", "blob", "queue"] }
bytes = "1.0"
chrono = "0.4"
# Google Cloud Storage integration
@ -26,11 +26,13 @@ rusoto_s3 = "0.46.0"
snafu = { version = "0.6.10", features = ["futures"] }
tokio = { version = "1.0", features = ["macros", "fs"] }
# Filesystem integration
tokio-util = "0.6.2"
tokio-util = { version = "0.6.3", features = [ "io" ] }
reqwest = "0.11"
# Filesystem integration
walkdir = "2"
tempfile = "3.1.0"
[dev-dependencies] # In alphabetical order
dotenv = "0.15.0"
tempfile = "3.1.0"
futures-test = "0.3.12"


@ -1,6 +1,7 @@
//! This module contains the IOx implementation for using S3 as the object
//! store.
use crate::{
buffer::slurp_stream_tempfile,
path::{cloud::CloudPath, DELIMITER},
ListResult, ObjectMeta, ObjectStoreApi,
};
@ -94,6 +95,9 @@ pub enum Error {
source: chrono::ParseError,
bucket: String,
},
#[snafu(display("Unable to buffer data into temporary file, Error: {}", source))]
UnableToBufferStream { source: std::io::Error },
}
/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
@ -120,11 +124,20 @@ impl ObjectStoreApi for AmazonS3 {
CloudPath::default()
}
async fn put<S>(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()>
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let bytes = ByteStream::new_with_size(bytes, length);
let bytes = match length {
Some(length) => ByteStream::new_with_size(bytes, length),
None => {
let bytes = slurp_stream_tempfile(bytes)
.await
.context(UnableToBufferStream)?;
let length = bytes.size();
ByteStream::new_with_size(bytes, length)
}
};
let put_request = rusoto_s3::PutObjectRequest {
bucket: self.bucket_name.clone(),
@ -582,7 +595,7 @@ mod tests {
.put(
&location,
futures::stream::once(async move { stream_data }),
data.len(),
Some(data.len()),
)
.await
.unwrap_err();
@ -618,7 +631,7 @@ mod tests {
.put(
&location,
futures::stream::once(async move { stream_data }),
data.len(),
Some(data.len()),
)
.await
.unwrap_err();
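To make the new `Option<usize>` length parameter concrete, a hedged sketch of an S3 upload when the payload size is not known up front; in that case the `put` above slurps the stream into a temporary file first so rusoto can still be given an exact size:

// Sketch only: `s3` is an AmazonS3 store and `location` a CloudPath built
// elsewhere; Result is this module's Result alias.
async fn upload_unknown_length(
    s3: &AmazonS3,
    location: &CloudPath,
    data: bytes::Bytes,
) -> Result<()> {
    let stream = futures::stream::once(async move { std::io::Result::Ok(data) });
    // No length hint: put() buffers the stream to a tempfile to learn its size.
    s3.put(location, stream, None).await
}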


@ -1,8 +1,11 @@
//! This module contains the IOx implementation for using Azure Blob storage as
//! the object store.
use crate::{path::cloud::CloudPath, ListResult, ObjectStoreApi};
use crate::{
path::{cloud::CloudPath, DELIMITER},
ListResult, ObjectMeta, ObjectStoreApi,
};
use async_trait::async_trait;
use azure_core::HttpClient;
use azure_core::prelude::*;
use azure_storage::{
clients::{
AsBlobClient, AsContainerClient, AsStorageClient, ContainerClient, StorageAccountClient,
@ -15,8 +18,8 @@ use futures::{
FutureExt, Stream, StreamExt, TryStreamExt,
};
use snafu::{ensure, ResultExt, Snafu};
use std::io;
use std::sync::Arc;
use std::{convert::TryInto, io};
/// A specialized `Result` for Azure object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -68,7 +71,7 @@ impl ObjectStoreApi for MicrosoftAzure {
CloudPath::default()
}
async fn put<S>(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()>
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
@ -79,17 +82,19 @@ impl ObjectStoreApi for MicrosoftAzure {
.await
.expect("Should have been able to collect streaming data");
ensure!(
temporary_non_streaming.len() == length,
DataDoesNotMatchLength {
actual: temporary_non_streaming.len(),
expected: length,
}
);
if let Some(length) = length {
ensure!(
temporary_non_streaming.len() == length,
DataDoesNotMatchLength {
actual: temporary_non_streaming.len(),
expected: length,
}
);
}
self.container_client
.as_blob_client(&location)
.put_block_blob(&temporary_non_streaming)
.put_block_blob(temporary_non_streaming)
.execute()
.await
.context(UnableToPutData {
@ -166,15 +171,15 @@ impl ObjectStoreApi for MicrosoftAzure {
Err(err) => return Some((Err(err), state)),
};
let next_state = if let Some(marker) = resp.incomplete_vector.next_marker() {
let next_state = if let Some(marker) = resp.next_marker {
ListState::HasMore(marker.as_str().to_string())
} else {
ListState::Done
};
let names = resp
.incomplete_vector
.vector
.blobs
.blobs
.into_iter()
.map(|blob| CloudPath::raw(blob.name))
.collect();
@ -184,8 +189,55 @@ impl ObjectStoreApi for MicrosoftAzure {
.boxed())
}
async fn list_with_delimiter(&self, _prefix: &Self::Path) -> Result<ListResult<Self::Path>> {
unimplemented!();
async fn list_with_delimiter(&self, prefix: &Self::Path) -> Result<ListResult<Self::Path>> {
let mut request = self.container_client.list_blobs();
let prefix = prefix.to_raw();
request = request.delimiter(Delimiter::new(DELIMITER));
request = request.prefix(&*prefix);
let resp = request.execute().await.context(UnableToListData)?;
let next_token = resp.next_marker.as_ref().map(|m| m.as_str().to_string());
let common_prefixes = resp
.blobs
.blob_prefix
.map(|prefixes| {
prefixes
.iter()
.map(|prefix| CloudPath::raw(&prefix.name))
.collect()
})
.unwrap_or_else(Vec::new);
let objects = resp
.blobs
.blobs
.into_iter()
.map(|blob| {
let location = CloudPath::raw(blob.name);
let last_modified = blob.properties.last_modified;
let size = blob
.properties
.content_length
.try_into()
.expect("unsupported size on this platform");
ObjectMeta {
location,
last_modified,
size,
}
})
.collect();
Ok(ListResult {
next_token,
common_prefixes,
objects,
})
}
}
@ -233,7 +285,7 @@ impl MicrosoftAzure {
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::put_get_delete_list;
use crate::tests::{list_with_delimiter, put_get_delete_list};
use std::env;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@ -245,39 +297,35 @@ mod tests {
() => {
dotenv::dotenv().ok();
let account = env::var("AZURE_STORAGE_ACCOUNT");
let container = env::var("AZURE_STORAGE_CONTAINER");
let required_vars = [
"AZURE_STORAGE_ACCOUNT",
"AZURE_STORAGE_CONTAINER",
"AZURE_STORAGE_MASTER_KEY",
];
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = std::env::var("TEST_INTEGRATION");
match (account.is_ok(), container.is_ok(), force.is_ok()) {
(false, false, true) => {
panic!(
"TEST_INTEGRATION is set, \
but AZURE_STROAGE_ACCOUNT and AZURE_STORAGE_CONTAINER are not"
)
}
(false, true, true) => {
panic!("TEST_INTEGRATION is set, but AZURE_STORAGE_ACCOUNT is not")
}
(true, false, true) => {
panic!("TEST_INTEGRATION is set, but AZURE_STORAGE_CONTAINER is not")
}
(false, false, false) => {
eprintln!(
"skipping integration test - set \
AZURE_STROAGE_ACCOUNT and AZURE_STORAGE_CONTAINER to run"
);
return Ok(());
}
(false, true, false) => {
eprintln!("skipping integration test - set AZURE_STORAGE_ACCOUNT to run");
return Ok(());
}
(true, false, false) => {
eprintln!("skipping integration test - set AZURE_STROAGE_CONTAINER to run");
return Ok(());
}
_ => {}
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
)
} else if force.is_err() && !unset_var_names.is_empty() {
eprintln!(
"skipping Azure integration test - set \
{} to run",
unset_var_names
);
return Ok(());
}
};
}
@ -291,6 +339,7 @@ mod tests {
let integration = MicrosoftAzure::new_from_env(container_name);
put_get_delete_list(&integration).await?;
list_with_delimiter(&integration).await?;
Ok(())
}
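For orientation, a hedged sketch of what a caller of the new Azure `list_with_delimiter` receives (store construction and prefix are assumed):

// Sketch: `store` is a MicrosoftAzure built via new_from_env, listing at the
// container root; Result is this module's Result alias.
async fn show_top_level(store: &MicrosoftAzure) -> Result<()> {
    let prefix = store.new_path();
    let result = store.list_with_delimiter(&prefix).await?;

    println!(
        "{} common prefixes, {} objects, continuation token: {:?}",
        result.common_prefixes.len(),
        result.objects.len(),
        result.next_token
    );
    for object in &result.objects {
        println!("{} bytes, last modified {}", object.size, object.last_modified);
    }
    Ok(())
}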

object_store/src/buffer.rs Normal file

@ -0,0 +1,130 @@
//! This module contains a `Stream` wrapper that fully consumes (slurps) a
//! `Stream` so it can compute its size, while saving it to a backing store for
//! later replay.
use bytes::Bytes;
use futures::{pin_mut, Stream, StreamExt};
use std::io::{Cursor, Result, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;
use tokio::io::{copy, AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite};
use tokio_util::io::{ReaderStream, StreamReader};
/// Returns a BufferedStream backed by a temporary file.
///
/// The temporary file will be deleted when the result stream
/// is dropped.
pub async fn slurp_stream_tempfile<S>(bytes: S) -> Result<BufferedStream<File>>
where
S: Stream<Item = Result<Bytes>> + Send + Sync,
{
let tmp = File::from_std(tempfile::tempfile()?);
BufferedStream::new(tmp, bytes).await
}
/// Returns a BufferedStream backed by an in-memory buffer.
#[allow(dead_code)]
pub async fn slurp_stream_memory<S>(bytes: S) -> Result<BufferedStream<Cursor<Vec<u8>>>>
where
S: Stream<Item = Result<Bytes>> + Send + Sync,
{
BufferedStream::new(Cursor::new(Vec::new()), bytes).await
}
// A stream fully buffered by a backing store.
pub struct BufferedStream<R>
where
R: AsyncRead + AsyncWrite + AsyncSeek + Unpin,
{
size: usize,
inner: ReaderStream<R>,
}
impl<R> BufferedStream<R>
where
R: AsyncRead + AsyncWrite + AsyncSeek + Unpin,
{
/// Consumes the bytes stream fully and writes its content into file.
/// It returns a Stream implementation that reads the same content from the
/// buffered file.
///
/// The granularity of stream "chunks" will not be preserved.
pub async fn new<S>(mut backing_store: R, bytes: S) -> Result<Self>
where
S: Stream<Item = Result<Bytes>> + Send + Sync,
{
pin_mut!(bytes);
let mut read = StreamReader::new(bytes);
let size = copy(&mut read, &mut backing_store).await? as usize;
backing_store.seek(SeekFrom::Start(0)).await?;
Ok(Self {
size,
inner: ReaderStream::new(backing_store),
})
}
pub fn size(&self) -> usize {
self.size
}
}
impl<R> Stream for BufferedStream<R>
where
R: AsyncRead + AsyncWrite + AsyncSeek + Unpin,
{
type Item = Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.size, Some(self.size()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::stream::{self, TryStreamExt};
use futures_test::stream::StreamTestExt;
fn test_data() -> impl Stream<Item = Result<Bytes>> + Send + Sync {
stream::iter(vec!["foo", "bar", "baz"])
.map(|i| Ok(Bytes::from(i)))
.interleave_pending()
}
async fn check_stream<R>(buf_stream: BufferedStream<R>) -> Result<()>
where
R: AsyncRead + AsyncWrite + AsyncSeek + Unpin,
{
assert_eq!(buf_stream.size(), 9);
assert_eq!(buf_stream.size_hint(), (9, Some(9)));
let content = buf_stream
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await?;
assert_eq!(content, "foobarbaz");
Ok(())
}
#[tokio::test]
async fn test_buffered_stream() -> Result<()> {
let backing_store = std::io::Cursor::new(Vec::new()); // in-memory buffer
check_stream(BufferedStream::new(backing_store, test_data()).await?).await
}
#[tokio::test]
async fn test_slurp_stream_tempfile() -> Result<()> {
check_stream(slurp_stream_tempfile(test_data()).await?).await
}
#[tokio::test]
async fn test_slurp_stream_memory() -> Result<()> {
check_stream(slurp_stream_memory(test_data()).await?).await
}
}
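A brief usage sketch (assuming `slurp_stream_tempfile` from this module is in scope): buffer a stream of unknown size, learn its exact length, then replay it to an API that needs both a stream and a size, which is how the S3 `put` path uses it:

use bytes::Bytes;
use futures::stream;

async fn buffered_roundtrip() -> std::io::Result<()> {
    let chunks: Vec<std::io::Result<Bytes>> =
        vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];

    let buffered = slurp_stream_tempfile(stream::iter(chunks)).await?;

    // The size was learned while slurping; `buffered` is itself a
    // Stream<Item = io::Result<Bytes>> that replays the same content.
    assert_eq!(buffered.size(), 11);
    Ok(())
}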


@ -76,7 +76,7 @@ impl ObjectStoreApi for File {
FilePath::default()
}
async fn put<S>(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()>
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
@ -86,13 +86,15 @@ impl ObjectStoreApi for File {
.await
.context(UnableToStreamDataIntoMemory)?;
ensure!(
content.len() == length,
DataDoesNotMatchLength {
actual: content.len(),
expected: length,
}
);
if let Some(length) = length {
ensure!(
content.len() == length,
DataDoesNotMatchLength {
actual: content.len(),
expected: length,
}
);
}
let path = self.path(location);
@ -290,7 +292,7 @@ mod tests {
let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
let mut location = integration.new_path();
location.set_file_name("junk");
let res = integration.put(&location, bytes, 0).await;
let res = integration.put(&location, bytes, Some(0)).await;
assert!(matches!(
res.err().unwrap(),
@ -317,7 +319,36 @@ mod tests {
.put(
&location,
futures::stream::once(async move { stream_data }),
data.len(),
Some(data.len()),
)
.await?;
let read_data = integration
.get(&location)
.await?
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await?;
assert_eq!(&*read_data, data);
Ok(())
}
#[tokio::test]
async fn unknown_length() -> Result<()> {
let root = TempDir::new()?;
let integration = File::new(root.path());
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());
let mut location = integration.new_path();
location.set_file_name("some_file");
integration
.put(
&location,
futures::stream::once(async move { stream_data }),
None,
)
.await?;


@ -84,7 +84,7 @@ impl ObjectStoreApi for GoogleCloudStorage {
CloudPath::default()
}
async fn put<S>(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()>
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
@ -95,13 +95,15 @@ impl ObjectStoreApi for GoogleCloudStorage {
.expect("Should have been able to collect streaming data")
.to_vec();
ensure!(
temporary_non_streaming.len() == length,
DataDoesNotMatchLength {
actual: temporary_non_streaming.len(),
expected: length,
}
);
if let Some(length) = length {
ensure!(
temporary_non_streaming.len() == length,
DataDoesNotMatchLength {
actual: temporary_non_streaming.len(),
expected: length,
}
);
}
let location = location.to_raw();
let location_copy = location.clone();
@ -418,7 +420,7 @@ mod test {
.put(
&location,
futures::stream::once(async move { stream_data }),
data.len(),
Some(data.len()),
)
.await
.unwrap_err();


@ -18,6 +18,7 @@
pub mod aws;
pub mod azure;
mod buffer;
pub mod disk;
pub mod gcp;
pub mod memory;
@ -54,7 +55,7 @@ pub trait ObjectStoreApi: Send + Sync + 'static {
&self,
location: &Self::Path,
bytes: S,
length: usize,
length: Option<usize>,
) -> Result<(), Self::Error>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static;
@ -130,7 +131,7 @@ impl ObjectStoreApi for ObjectStore {
}
}
async fn put<S>(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()>
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
@ -485,7 +486,7 @@ mod tests {
.put(
&location,
futures::stream::once(async move { stream_data }),
data.len(),
Some(data.len()),
)
.await?;
@ -551,7 +552,7 @@ mod tests {
.put(
f,
futures::stream::once(async move { stream_data }),
data.len(),
Some(data.len()),
)
.await
.unwrap();


@ -45,7 +45,7 @@ impl ObjectStoreApi for InMemory {
DirsAndFileName::default()
}
async fn put<S>(&self, location: &Self::Path, bytes: S, length: usize) -> Result<()>
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
@ -55,13 +55,15 @@ impl ObjectStoreApi for InMemory {
.await
.context(UnableToStreamDataIntoMemory)?;
ensure!(
content.len() == length,
DataDoesNotMatchLength {
actual: content.len(),
expected: length,
}
);
if let Some(length) = length {
ensure!(
content.len() == length,
DataDoesNotMatchLength {
actual: content.len(),
expected: length,
}
);
}
let content = content.freeze();
@ -201,7 +203,7 @@ mod tests {
let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
let mut location = integration.new_path();
location.set_file_name("junk");
let res = integration.put(&location, bytes, 0).await;
let res = integration.put(&location, bytes, Some(0)).await;
assert!(matches!(
res.err().unwrap(),
@ -213,4 +215,32 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn unknown_length() -> Result<()> {
let integration = InMemory::new();
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());
let mut location = integration.new_path();
location.set_file_name("some_file");
integration
.put(
&location,
futures::stream::once(async move { stream_data }),
None,
)
.await?;
let read_data = integration
.get(&location)
.await?
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await?;
assert_eq!(&*read_data, data);
Ok(())
}
}


@ -28,7 +28,7 @@ use tokio::sync::mpsc::{self, error::SendError};
use snafu::{ResultExt, Snafu};
use crate::plan::stringset::StringSetPlan;
use crate::plan::{fieldlist::FieldListPlan, stringset::StringSetPlan};
#[derive(Debug, Snafu)]
pub enum Error {
@ -170,14 +170,6 @@ impl From<Vec<SeriesSetPlan>> for SeriesSetPlans {
}
}
/// A plan that can be run to produce a sequence of FieldLists
/// DataFusion plans or a known set of results
#[derive(Debug)]
pub enum FieldListPlan {
Known(Result<FieldList>),
Plans(Vec<LogicalPlan>),
}
/// Handles executing plans, and marshalling the results into rust
/// native structures.
#[derive(Debug, Default)]
@ -295,46 +287,43 @@ impl Executor {
/// Executes `plan` and return the resulting FieldList
pub async fn to_field_list(&self, plan: FieldListPlan) -> Result<FieldList> {
match plan {
FieldListPlan::Known(res) => res,
FieldListPlan::Plans(plans) => {
// Run the plans in parallel
let handles = plans
.into_iter()
.map(|plan| {
let counters = Arc::clone(&self.counters);
let FieldListPlan { plans } = plan;
tokio::task::spawn(async move {
let ctx = IOxExecutionContext::new(counters);
let physical_plan = ctx
.prepare_plan(&plan)
.await
.context(DataFusionPhysicalPlanning)?;
// Run the plans in parallel
let handles = plans
.into_iter()
.map(|plan| {
let counters = Arc::clone(&self.counters);
// TODO: avoid this buffering
let fieldlist = ctx
.collect(physical_plan)
.await
.context(FieldListExectuon)?
.into_fieldlist()
.context(FieldListConversion);
tokio::task::spawn(async move {
let ctx = IOxExecutionContext::new(counters);
let physical_plan = ctx
.prepare_plan(&plan)
.await
.context(DataFusionPhysicalPlanning)?;
Ok(fieldlist)
})
})
.collect::<Vec<_>>();
// TODO: avoid this buffering
let fieldlist = ctx
.collect(physical_plan)
.await
.context(FieldListExectuon)?
.into_fieldlist()
.context(FieldListConversion);
// collect them all up and combine them
let mut results = Vec::new();
for join_handle in handles {
let fieldlist = join_handle.await.context(JoinError)???;
Ok(fieldlist)
})
})
.collect::<Vec<_>>();
results.push(fieldlist);
}
// collect them all up and combine them
let mut results = Vec::new();
for join_handle in handles {
let fieldlist = join_handle.await.context(JoinError)???;
results.into_fieldlist().context(FieldListConversion)
}
results.push(fieldlist);
}
results.into_fieldlist().context(FieldListConversion)
}
/// Run the plan and return a record batch reader for reading the results


@ -5,7 +5,7 @@ use std::{
use arrow_deps::datafusion::{
error::{DataFusionError, Result as DatafusionResult},
logical_plan::{Expr, ExpressionVisitor, LogicalPlanBuilder, Operator, Recursion},
logical_plan::{Expr, ExpressionVisitor, LogicalPlan, LogicalPlanBuilder, Operator, Recursion},
prelude::col,
};
use data_types::{
@ -17,7 +17,10 @@ use tracing::debug;
use crate::{
exec::{make_schema_pivot, stringset::StringSet},
plan::stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
plan::{
fieldlist::FieldListPlan,
stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
},
predicate::{Predicate, PredicateBuilder},
provider::ProviderBuilder,
util::schema_has_all_expr_columns,
@ -188,40 +191,10 @@ impl InfluxRPCPlanner {
// entirely using the metadata
let mut need_full_plans = BTreeMap::new();
let no_tables = StringSet::new();
let mut known_columns = BTreeSet::new();
for chunk in self.filtered_chunks(database, &predicate).await? {
// try and get the table names that have rows that match the predicate
let table_names = chunk
.table_names(&predicate, &no_tables)
.await
.map_err(|e| Box::new(e) as _)
.context(TableNamePlan)?;
debug!(table_names=?table_names, chunk_id = chunk.id(), "chunk tables");
let table_names = match table_names {
Some(table_names) => {
debug!("found table names with original predicate");
table_names
}
None => {
// couldn't find table names with predicate, get all chunk tables,
// fall back to filtering ourself
let table_name_predicate = if let Some(table_names) = &predicate.table_names {
PredicateBuilder::new().tables(table_names).build()
} else {
Predicate::default()
};
chunk
.table_names(&table_name_predicate, &no_tables)
.await
.map_err(|e| Box::new(e) as _)
.context(InternalTableNamePlanForDefault)?
// unwrap the Option
.context(InternalTableNameCannotGetPlanForDefault)?
}
};
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?;
for table_name in table_names {
debug!(
@ -296,6 +269,97 @@ impl InfluxRPCPlanner {
.context(CreatingStringSet)
}
/// Returns a plan that produces a list of columns and their
/// datatypes (as defined in the data written via `write_lines`),
/// and which have more than zero rows which pass the conditions
/// specified by `predicate`.
pub async fn field_columns<D>(
&self,
database: &D,
predicate: Predicate,
) -> Result<FieldListPlan>
where
D: Database + 'static,
{
debug!(predicate=?predicate, "planning field_columns");
// Algorithm is to run a "select field_cols from table where
// <predicate> type plan for each table in the chunks"
//
// The executor then figures out which columns have non-null
// values and stops the plan executing once it has them
// map table -> Vec<Arc<Chunk>>
let mut table_chunks = BTreeMap::new();
let chunks = self.filtered_chunks(database, &predicate).await?;
for chunk in chunks {
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?;
for table_name in table_names {
table_chunks
.entry(table_name)
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
}
}
let mut field_list_plan = FieldListPlan::new();
for (table_name, chunks) in table_chunks {
if let Some(plan) = self
.field_columns_plan(&table_name, &predicate, chunks)
.await?
{
field_list_plan = field_list_plan.append(plan);
}
}
Ok(field_list_plan)
}
/// Find all the table names in the specified chunk that pass the predicate
async fn chunk_table_names<C>(
&self,
chunk: &C,
predicate: &Predicate,
) -> Result<BTreeSet<String>>
where
C: PartitionChunk + 'static,
{
let no_tables = StringSet::new();
// try and get the table names that have rows that match the predicate
let table_names = chunk
.table_names(&predicate, &no_tables)
.await
.map_err(|e| Box::new(e) as _)
.context(TableNamePlan)?;
debug!(table_names=?table_names, chunk_id = chunk.id(), "chunk tables");
let table_names = match table_names {
Some(table_names) => {
debug!("found table names with original predicate");
table_names
}
None => {
// couldn't find table names with predicate, get all chunk tables,
// fall back to filtering ourself
let table_name_predicate = if let Some(table_names) = &predicate.table_names {
PredicateBuilder::new().tables(table_names).build()
} else {
Predicate::default()
};
chunk
.table_names(&table_name_predicate, &no_tables)
.await
.map_err(|e| Box::new(e) as _)
.context(InternalTableNamePlanForDefault)?
// unwrap the Option
.context(InternalTableNameCannotGetPlanForDefault)?
}
};
Ok(table_names)
}
/// removes any columns from Names that are not "Tag"s in the Influx Data
/// Model
fn restrict_to_tags(&self, schema: &Schema, names: BTreeSet<String>) -> BTreeSet<String> {
@ -313,9 +377,11 @@ impl InfluxRPCPlanner {
///
/// The created plan looks like:
///
/// ```text
/// Extension(PivotSchema)
/// Filter(predicate)
/// TableScan (of chunks)
/// ```
async fn tag_column_names_plan<C>(
&self,
table_name: &str,
@ -325,6 +391,123 @@ impl InfluxRPCPlanner {
where
C: PartitionChunk + 'static,
{
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks).await?;
let TableScanAndFilter {
plan_builder,
schema,
} = match scan_and_filter {
None => return Ok(None),
Some(t) => t,
};
// now, select only the tag columns
let select_exprs = schema
.iter()
.filter_map(|(influx_column_type, field)| {
if matches!(influx_column_type, Some(InfluxColumnType::Tag)) {
Some(col(field.name()))
} else {
None
}
})
.collect::<Vec<_>>();
let plan = plan_builder
.project(&select_exprs)
.context(BuildingPlan)?
.build()
.context(BuildingPlan)?;
// And finally pivot the plan
let plan = make_schema_pivot(plan);
debug!(table_name=table_name, plan=%plan.display_indent_schema(),
"created column_name plan for table");
Ok(Some(plan.into()))
}
/// Creates a DataFusion LogicalPlan that returns the timestamp
/// and all field columns for a specified table:
///
/// The output looks like (field0, field1, ..., time)
///
/// The data is not sorted in any particular order
///
/// returns `None` if the table contains no rows that would pass
/// the predicate.
///
/// The created plan looks like:
///
/// ```text
/// Projection (select the field columns needed)
/// Filter(predicate) [optional]
/// InMemoryScan
/// ```
async fn field_columns_plan<C>(
&self,
table_name: &str,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<LogicalPlan>>
where
C: PartitionChunk + 'static,
{
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks).await?;
let TableScanAndFilter {
plan_builder,
schema,
} = match scan_and_filter {
None => return Ok(None),
Some(t) => t,
};
// Selection of only fields and time
let select_exprs = schema
.iter()
.filter_map(|(influx_column_type, field)| match influx_column_type {
Some(InfluxColumnType::Field(_)) => Some(col(field.name())),
Some(InfluxColumnType::Timestamp) => Some(col(field.name())),
Some(_) => None,
None => None,
})
.collect::<Vec<_>>();
let plan = plan_builder
.project(&select_exprs)
.context(BuildingPlan)?
.build()
.context(BuildingPlan)?;
Ok(Some(plan))
}
/// Create a plan that scans the specified table, and applies any
/// filtering specified on the predicate, if any.
///
/// If the table can produce no rows based on predicate
/// evaluation, returns Ok(None)
///
/// The created plan looks like:
///
/// ```text
/// Filter(predicate) [optional]
/// InMemoryScan
/// ```
async fn scan_and_filter<C>(
&self,
table_name: &str,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<TableScanAndFilter>>
where
C: PartitionChunk + 'static,
{
// Scan all columns to begin with (datafusion projection
// pushdown optimization will prune out unneeded columns later)
let projection = None;
let selection = Selection::All;
// Prepare the scan of the table
let mut builder = ProviderBuilder::new(table_name);
for chunk in chunks {
@ -339,7 +522,7 @@ impl InfluxRPCPlanner {
);
let chunk_table_schema = chunk
.table_schema(table_name, Selection::All)
.table_schema(table_name, selection)
.await
.map_err(|e| Box::new(e) as _)
.context(GettingTableSchema {
@ -355,10 +538,6 @@ impl InfluxRPCPlanner {
let provider = builder.build().context(CreatingProvider { table_name })?;
let schema = provider.iox_schema();
// Scan all columns to begin with (datafusion projection
// pushdown optimization will prune out uneeded columns later)
let projection = None;
let mut plan_builder = LogicalPlanBuilder::scan(table_name, Arc::new(provider), projection)
.context(BuildingPlan)?;
@ -369,40 +548,25 @@ impl InfluxRPCPlanner {
// to evaluate the predicate (if not, it means no rows can
// match and thus we should skip this plan)
if !schema_has_all_expr_columns(&schema, &filter_expr) {
debug!(table_name=table_name, schema=?schema, filter_expr=?filter_expr, "Skipping table as schema doesn't have all filter_expr columns");
debug!(table_name=table_name,
schema=?schema,
filter_expr=?filter_expr,
"Skipping table as schema doesn't have all filter_expr columns");
return Ok(None);
}
// Assuming that if a table doesn't have all the columns
// in an expression it can't be true isn't correct for
// certain predicates (e.g. IS NOT NULL), so error out
// here until we have proper support for that
// here until we have proper support for that case
check_predicate_support(&filter_expr)?;
plan_builder = plan_builder.filter(filter_expr).context(BuildingPlan)?;
}
// now, select only the tag columns
let select_exprs = schema
.iter()
.filter_map(|(influx_column_type, field)| {
if matches!(influx_column_type, Some(InfluxColumnType::Tag)) {
Some(col(field.name()))
} else {
None
}
})
.collect::<Vec<_>>();
let plan_builder = plan_builder.project(&select_exprs).context(BuildingPlan)?;
let plan = plan_builder.build().context(BuildingPlan)?;
// And finally pivot the plan
let plan = make_schema_pivot(plan);
debug!(table_name=table_name, plan=%plan.display_indent_schema(),
"created column_name plan for table");
Ok(Some(plan.into()))
Ok(Some(TableScanAndFilter {
plan_builder,
schema,
}))
}
/// Returns a list of chunks across all partitions which may
@ -422,7 +586,6 @@ impl InfluxRPCPlanner {
let partition_keys = database
.partition_keys()
.await
.map_err(|e| Box::new(e) as _)
.context(ListingPartitions)?;
@ -430,7 +593,7 @@ impl InfluxRPCPlanner {
for key in partition_keys {
// TODO prune partitions somehow
let partition_chunks = database.chunks(&key).await;
let partition_chunks = database.chunks(&key);
for chunk in partition_chunks {
let could_pass_predicate = chunk
.could_pass_predicate(predicate)
@ -505,3 +668,10 @@ impl ExpressionVisitor for SupportVisitor {
}
}
}
struct TableScanAndFilter {
/// Represents plan that scans a table and applies optional filtering
plan_builder: LogicalPlanBuilder,
/// The IOx schema of the result
schema: Schema,
}
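Putting the new pieces together, a hedged sketch of how a query frontend might drive `field_columns` and then execute the resulting `FieldListPlan` (the surrounding handles are assumed):

// Sketch: `db` implements query::Database, `executor` is an exec::Executor.
async fn list_fields<D: Database + 'static>(
    planner: &InfluxRPCPlanner,
    executor: &Executor,
    db: &D,
    predicate: Predicate,
) -> Result<FieldList, Box<dyn std::error::Error>> {
    let plan = planner.field_columns(db, predicate).await?;
    let fields = executor.to_field_list(plan).await?;
    Ok(fields)
}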


@ -92,7 +92,6 @@ impl SQLQueryPlanner {
let partition_keys = database
.partition_keys()
.await
.map_err(|e| Box::new(e) as _)
.context(GettingDatabasePartition)?;
@ -103,7 +102,7 @@ impl SQLQueryPlanner {
let mut builder = ProviderBuilder::new(table_name);
for partition_key in &partition_keys {
for chunk in database.chunks(partition_key).await {
for chunk in database.chunks(partition_key) {
if chunk.has_table(table_name) {
let chunk_id = chunk.id();
let chunk_table_schema = chunk
@ -129,7 +128,7 @@ impl SQLQueryPlanner {
.context(CreatingTableProvider { table_name })?;
ctx.inner_mut()
.register_table(&table_name, Box::new(provider));
.register_table(&table_name, Arc::new(provider));
}
ctx.prepare_sql(query).await.context(Preparing)


@ -672,7 +672,7 @@ mod test {
)
.unwrap();
let mut ctx = ExecutionContext::new();
ctx.register_table("t", Box::new(provider));
ctx.register_table("t", Arc::new(provider));
let df = ctx.table("t").unwrap();
let df = df.aggregate(&[], &aggs).unwrap();


@ -11,7 +11,7 @@ use async_trait::async_trait;
use data_types::{
data::ReplicatedWrite, partition_metadata::TableSummary, schema::Schema, selection::Selection,
};
use exec::{stringset::StringSet, Executor, FieldListPlan, SeriesSetPlans};
use exec::{stringset::StringSet, Executor, SeriesSetPlans};
use plan::stringset::StringSetPlan;
use std::{fmt::Debug, sync::Arc};
@ -44,23 +44,17 @@ pub trait Database: Debug + Send + Sync {
async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error>;
/// Return the partition keys for data in this DB
async fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
/// Returns a covering set of chunks in the specified partition. A
/// covering set means that together the chunks make up a single
/// complete copy of the data being queried.
async fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>>;
fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>>;
// ----------
// The functions below are slated for removal (migration into a gRPC query
// frontend) ---------
/// Returns a plan that produces a list of column names in this
/// database which store fields (as defined in the data written
/// via `write_lines`), and which have at least one row which
/// matches the conditions listed on `predicate`.
async fn field_column_names(&self, predicate: Predicate) -> Result<FieldListPlan, Self::Error>;
/// Returns a plan which finds the distinct values in the
/// `column_name` column of this database which pass the
/// conditions specified by `predicate`.
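Since `partition_keys` and `chunks` are no longer `async`, call sites drop their `.await`; a minimal sketch of the new call shape (mirroring the planner changes elsewhere in this diff):

use std::sync::Arc;

// Sketch: collect every chunk across all partitions of a Database.
// Both accessors are now plain synchronous methods.
fn all_chunks<D: Database>(database: &D) -> Result<Vec<Arc<D::Chunk>>, D::Error> {
    let mut all = Vec::new();
    for key in database.partition_keys()? {
        all.extend(database.chunks(&key));
    }
    Ok(all)
}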


@ -1 +1,2 @@
pub mod fieldlist;
pub mod stringset;


@ -0,0 +1,20 @@
use arrow_deps::datafusion::logical_plan::LogicalPlan;
/// A plan which produces a logical set of Fields (e.g. InfluxDB
/// Fields with name, and data type, and last_timestamp).
#[derive(Debug, Default)]
pub struct FieldListPlan {
pub plans: Vec<LogicalPlan>,
}
impl FieldListPlan {
pub fn new() -> Self {
Self::default()
}
/// Append a new plan to this list of plans
pub fn append(mut self, plan: LogicalPlan) -> Self {
self.plans.push(plan);
self
}
}
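For example, the planner builds one of these by appending one DataFusion plan per matching table; a minimal sketch:

// Sketch: combine per-table LogicalPlans into a single FieldListPlan.
fn combine(per_table_plans: Vec<LogicalPlan>) -> FieldListPlan {
    let mut field_list_plan = FieldListPlan::new();
    for plan in per_table_plans {
        field_list_plan = field_list_plan.append(plan);
    }
    field_list_plan
}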


@ -1,13 +1,22 @@
//! This module provides a reference implementation of `query::DatabaseSource`
//! and `query::Database` for use in testing.
//!
//! AKA it is a Mock
use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
use arrow_deps::{
arrow::{
array::{ArrayRef, Int64Array, StringArray},
datatypes::DataType,
record_batch::RecordBatch,
},
datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream},
};
use crate::{exec::Executor, group_by::GroupByAndAggregate, plan::stringset::StringSetPlan};
use crate::{
exec::{
stringset::{StringSet, StringSetRef},
FieldListPlan, SeriesSetPlans,
SeriesSetPlans,
},
Database, DatabaseStore, PartitionChunk, Predicate,
};
@ -64,12 +73,6 @@ pub struct TestDatabase {
/// The last request for `query_series`
query_groups_request: Arc<Mutex<Option<QueryGroupsRequest>>>,
/// Responses to return on the next request to `field_column_values`
field_columns_value: Arc<Mutex<Option<FieldListPlan>>>,
/// The last request for `query_series`
field_columns_request: Arc<Mutex<Option<FieldColumnsRequest>>>,
}
/// Records the parameters passed to a column values request
@ -98,13 +101,6 @@ pub struct QueryGroupsRequest {
pub gby_agg: GroupByAndAggregate,
}
/// Records the parameters passed to a `field_columns` request
#[derive(Debug, PartialEq, Clone)]
pub struct FieldColumnsRequest {
/// Stringified '{:?}' version of the predicate
pub predicate: String,
}
#[derive(Snafu, Debug)]
pub enum TestError {
#[snafu(display("Test database error: {}", message))]
@ -233,21 +229,6 @@ impl TestDatabase {
.expect("mutex poisoned")
.take()
}
/// Set the FieldSet plan that will be returned
pub fn set_field_colum_names_values(&self, plan: FieldListPlan) {
*(Arc::clone(&self.field_columns_value)
.lock()
.expect("mutex poisoned")) = Some(plan);
}
/// Get the parameters from the last column name request
pub fn get_field_columns_request(&self) -> Option<FieldColumnsRequest> {
Arc::clone(&self.field_columns_request)
.lock()
.expect("mutex poisoned")
.take()
}
}
/// returns true if this line is within the range of the timestamp
@ -307,27 +288,6 @@ impl Database for TestDatabase {
Ok(())
}
async fn field_column_names(&self, predicate: Predicate) -> Result<FieldListPlan, Self::Error> {
// save the request
let predicate = predicate_to_test_string(&predicate);
let field_columns_request = Some(FieldColumnsRequest { predicate });
*Arc::clone(&self.field_columns_request)
.lock()
.expect("mutex poisoned") = field_columns_request;
// pull out the saved columns
Arc::clone(&self.field_columns_value)
.lock()
.expect("mutex poisoned")
.take()
// Turn None into an error
.context(General {
message: "No saved field_column_name in TestDatabase",
})
}
/// Return the mocked out column values, recording the request
async fn column_values(
&self,
@ -402,13 +362,13 @@ impl Database for TestDatabase {
}
/// Return the partition keys for data in this DB
async fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
let partitions = self.partitions.lock().expect("mutex poisoned");
let keys = partitions.keys().cloned().collect();
Ok(keys)
}
async fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>> {
fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>> {
let partitions = self.partitions.lock().expect("mutex poisoned");
if let Some(chunks) = partitions.get(partition_key) {
chunks.values().cloned().collect()
@ -428,6 +388,9 @@ pub struct TestChunk {
/// Column names: table_name -> Schema
table_schemas: BTreeMap<String, Schema>,
/// RecordBatches that are returned on each request
table_data: BTreeMap<String, Vec<Arc<RecordBatch>>>,
/// A saved error that is returned instead of actual results
saved_error: Option<String>,
}
@ -463,7 +426,7 @@ impl TestChunk {
/// Register a tag column with the test chunk
pub fn with_tag_column(
mut self,
self,
table_name: impl Into<String>,
column_name: impl Into<String>,
) -> Self {
@ -474,6 +437,43 @@ impl TestChunk {
// merge it in to any existing schema
let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap();
self.add_schema_to_table(table_name, new_column_schema)
}
/// Register a timestamp column with the test chunk
pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
let table_name = table_name.into();
// make a new schema with the specified column and
// merge it in to any existing schema
let new_column_schema = SchemaBuilder::new().timestamp().build().unwrap();
self.add_schema_to_table(table_name, new_column_schema)
}
/// Register an int field column with the test chunk
pub fn with_int_field_column(
self,
table_name: impl Into<String>,
column_name: impl Into<String>,
) -> Self {
let column_name = column_name.into();
// make a new schema with the specified column and
// merge it in to any existing schema
let new_column_schema = SchemaBuilder::new()
.field(&column_name, DataType::Int64)
.build()
.unwrap();
self.add_schema_to_table(table_name, new_column_schema)
}
fn add_schema_to_table(
mut self,
table_name: impl Into<String>,
new_column_schema: Schema,
) -> Self {
let table_name = table_name.into();
let mut merger = SchemaMerger::new().merge(new_column_schema).unwrap();
if let Some(existing_schema) = self.table_schemas.remove(&table_name) {
@ -496,6 +496,37 @@ impl TestChunk {
//.map(|v| v.clone())
.cloned()
}
/// Prepares this chunk to return a specific record batch with one
/// row of non-null data.
pub fn with_one_row_of_null_data(mut self, table_name: impl Into<String>) -> Self {
let table_name = table_name.into();
let schema = self
.table_schemas
.get(&table_name)
.expect("table must exist in TestChunk");
// create arrays
let columns = schema
.iter()
.map(|(_influxdb_column_type, field)| match field.data_type() {
DataType::Int64 => Arc::new(Int64Array::from(vec![1000])) as ArrayRef,
DataType::Utf8 => Arc::new(StringArray::from(vec!["MA"])) as ArrayRef,
_ => unimplemented!(
"Unimplemented data type for test database: {:?}",
field.data_type()
),
})
.collect::<Vec<_>>();
let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
self.table_data
.entry(table_name)
.or_default()
.push(Arc::new(batch));
self
}
}
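// --- Editor's note: illustrative usage sketch, not part of this diff. ---
// How a test can assemble a TestChunk with the builder methods above; this
// mirrors how the gRPC service tests later in this patch use it (imports
// omitted, table and column names arbitrary).
fn example_test_chunk() -> TestChunk {
    TestChunk::new(0)
        .with_time_column("TheMeasurement")
        .with_tag_column("TheMeasurement", "state")
        .with_int_field_column("TheMeasurement", "Field1")
        .with_one_row_of_null_data("TheMeasurement")
}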
#[async_trait]
@ -514,11 +545,21 @@ impl PartitionChunk for TestChunk {
async fn read_filter(
&self,
_table_name: &str,
_predicate: &Predicate,
table_name: &str,
predicate: &Predicate,
_selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, Self::Error> {
unimplemented!()
self.check_error()?;
// save the predicate
self.predicate
.lock()
.expect("mutex poisoned")
.replace(predicate.clone());
let batches = self.table_data.get(table_name).expect("Table had data");
let stream = SizedRecordBatchStream::new(batches[0].schema(), batches.clone());
Ok(Box::pin(stream))
}
async fn table_names(
@ -562,8 +603,8 @@ impl PartitionChunk for TestChunk {
})
}
fn has_table(&self, _table_name: &str) -> bool {
unimplemented!()
fn has_table(&self, table_name: &str) -> bool {
self.table_schemas.contains_key(table_name)
}
async fn column_names(


@ -273,6 +273,7 @@ impl Chunk {
&self,
table_name: &str,
predicate: &Predicate,
columns: Selection<'_>,
dst: BTreeSet<String>,
) -> BTreeSet<String> {
let chunk_data = self.chunk_data.read().unwrap();
@ -280,7 +281,7 @@ impl Chunk {
// TODO(edd): same potential contention as `table_names` but I'm ok
// with this for now.
match chunk_data.data.get(table_name) {
Some(table) => table.column_names(predicate, dst),
Some(table) => table.column_names(predicate, columns, dst),
None => dst,
}
}


@ -460,14 +460,17 @@ impl Database {
Ok(names)
}
/// Returns the distinct set of column names (tag keys) that satisfy the
/// provided predicate.
/// Returns the distinct set of column names that satisfy the provided
/// predicate. Columns can be limited via a selection, which means callers
/// that know they are only interested in certain columns can specify those
/// and reduce total execution time.
pub fn column_names(
&self,
partition_key: &str,
table_name: &str,
chunk_ids: &[u32],
predicate: Predicate,
only_columns: Selection<'_>,
) -> Result<Option<BTreeSet<String>>> {
let partition_data = self.data.read().unwrap();
@ -493,7 +496,7 @@ impl Database {
// the dst buffer is pushed into each chunk's `column_names`
// implementation ensuring that we short-circuit any tables where
// we have already determined column names.
chunk.column_names(table_name, &predicate, dst)
chunk.column_names(table_name, &predicate, only_columns, dst)
});
Ok(Some(names))
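// --- Editor's note: illustrative sketch, not part of this diff. ---
// With the new `only_columns` argument a caller that only cares about a few
// columns can skip the rest. A hypothetical call shape, reusing the partition,
// table and chunk ids from the tests below (imports omitted):
fn example_column_names(db: &Database) {
    // every column name present in chunk 22 of table "Utopia"
    let all = db
        .column_names("hour_1", "Utopia", &[22], Predicate::default(), Selection::All)
        .unwrap();
    // only check whether the "region" column has any non-null values
    let only_region = db
        .column_names(
            "hour_1",
            "Utopia",
            &[22],
            Predicate::default(),
            Selection::Some(&["region"]),
        )
        .unwrap();
    assert!(only_region.map_or(0, |s| s.len()) <= all.map_or(0, |s| s.len()));
}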
@ -783,7 +786,7 @@ mod test {
array::{
ArrayRef, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray, UInt64Array,
},
datatypes::DataType::{Boolean, Float64, Int64, UInt64},
datatypes::DataType::{Boolean, Float64, Int64, UInt64, Utf8},
};
use data_types::schema::builder::SchemaBuilder;
@ -1092,7 +1095,13 @@ mod test {
// Just query against the first chunk.
let result = db
.column_names("hour_1", "Utopia", &[22], Predicate::default())
.column_names(
"hour_1",
"Utopia",
&[22],
Predicate::default(),
Selection::All,
)
.unwrap();
assert_eq!(
@ -1100,14 +1109,26 @@ mod test {
Some(to_set(&["counter", "region", "sketchy_sensor", "time"]))
);
let result = db
.column_names("hour_1", "Utopia", &[40], Predicate::default())
.column_names(
"hour_1",
"Utopia",
&[40],
Predicate::default(),
Selection::All,
)
.unwrap();
assert_eq!(result, Some(to_set(&["active", "time"])));
// And now the union across all chunks.
let result = db
.column_names("hour_1", "Utopia", &[22, 40], Predicate::default())
.column_names(
"hour_1",
"Utopia",
&[22, 40],
Predicate::default(),
Selection::All,
)
.unwrap();
assert_eq!(
@ -1128,6 +1149,7 @@ mod test {
"Utopia",
&[22, 40],
Predicate::new(vec![BinaryExpr::from(("time", "=", 30_i64))]),
Selection::All,
)
.unwrap();
@ -1141,6 +1163,7 @@ mod test {
"Utopia",
&[22, 40],
Predicate::new(vec![BinaryExpr::from(("active", "=", true))]),
Selection::All,
)
.unwrap();
@ -1161,6 +1184,7 @@ mod test {
.non_null_field("counter", Float64)
.field("sketchy_sensor", Int64)
.non_null_field("active", Boolean)
.field("msg", Utf8)
.timestamp()
.build()
.unwrap();
@ -1171,6 +1195,11 @@ mod test {
Arc::new(Float64Array::from(vec![1.2, 300.3, 4500.3])),
Arc::new(Int64Array::from(vec![None, Some(33), Some(44)])),
Arc::new(BooleanArray::from(vec![true, false, false])),
Arc::new(StringArray::from(vec![
Some("message a"),
Some("message b"),
None,
])),
Arc::new(Int64Array::from(vec![i, 2 * i, 3 * i])),
];
@ -1212,6 +1241,11 @@ mod test {
&exp_sketchy_sensor_values,
);
assert_rb_column_equals(&first_row_group, "active", &exp_active_values);
assert_rb_column_equals(
&first_row_group,
"msg",
&Values::String(vec![Some("message a")]),
);
assert_rb_column_equals(&first_row_group, "time", &Values::I64(vec![100])); // first row from first record batch
let second_row_group = itr.next().unwrap();
@ -1314,6 +1348,7 @@ mod test {
.non_null_field("counter", UInt64)
.field("sketchy_sensor", UInt64)
.non_null_field("active", Boolean)
.non_null_field("msg", Utf8)
.timestamp()
.build()
.unwrap();
@ -1325,6 +1360,7 @@ mod test {
Arc::new(UInt64Array::from(vec![1000, 3000, 5000])),
Arc::new(UInt64Array::from(vec![Some(44), None, Some(55)])),
Arc::new(BooleanArray::from(vec![true, true, false])),
Arc::new(StringArray::from(vec![Some("msg a"), Some("msg b"), None])),
Arc::new(Int64Array::from(vec![i, 20 + i, 30 + i])),
];
@ -1363,6 +1399,7 @@ mod test {
("active", AggregateType::Count),
("active", AggregateType::Min),
("active", AggregateType::Max),
("msg", AggregateType::Max),
],
)
.unwrap();
@ -1381,6 +1418,7 @@ mod test {
assert_rb_column_equals(&result, "active_count", &Values::U64(vec![3]));
assert_rb_column_equals(&result, "active_min", &Values::Bool(vec![Some(false)]));
assert_rb_column_equals(&result, "active_max", &Values::Bool(vec![Some(true)]));
assert_rb_column_equals(&result, "msg_max", &Values::String(vec![Some("msg b")]));
//
// With group keys


@ -22,7 +22,10 @@ use arrow_deps::{
arrow, datafusion::logical_plan::Expr as DfExpr,
datafusion::scalar::ScalarValue as DFScalarValue,
};
use data_types::schema::{InfluxColumnType, Schema};
use data_types::{
schema::{InfluxColumnType, Schema},
selection::Selection,
};
/// The name used for a timestamp column.
pub const TIME_COLUMN_NAME: &str = data_types::TIME_COLUMN_NAME;
@ -46,7 +49,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A `RowGroup` is an immutable horizontal chunk of a single `Table`. By
/// definition it has the same schema as all the other read groups in the table.
/// definition it has the same schema as all the other row groups in the table.
/// All the columns within the `RowGroup` must have the same number of logical
/// rows.
pub struct RowGroup {
@ -135,7 +138,7 @@ impl RowGroup {
}
}
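// --- Editor's note: illustrative sketch, not part of this diff. ---
// Building a minimal RowGroup by hand, as the tests below do. Every column
// holds the same number of logical rows (3 here), which is also the row count
// passed to `RowGroup::new` (assumes a BTreeMap of column name to ColumnType,
// as used in those tests).
fn example_row_group() -> RowGroup {
    let mut columns = BTreeMap::new();
    columns.insert(
        "region".to_string(),
        ColumnType::Tag(Column::from(&["west", "east", "north"][..])),
    );
    columns.insert(
        "time".to_string(),
        ColumnType::Time(Column::from(&[100_i64, 200, 300][..])),
    );
    RowGroup::new(3, columns)
}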
/// The total estimated size in bytes of the read group
/// The total estimated size in bytes of the row group
pub fn size(&self) -> u64 {
let base_size = std::mem::size_of::<Self>()
+ self
@ -941,7 +944,12 @@ impl RowGroup {
///
/// If you are familiar with InfluxDB, this is essentially an implementation
/// of `SHOW TAG KEYS`.
pub fn column_names(&self, predicate: &Predicate, dst: &mut BTreeSet<String>) {
pub fn column_names(
&self,
predicate: &Predicate,
columns: Selection<'_>,
dst: &mut BTreeSet<String>,
) {
// Determine the set of columns in this row group that are not already
// present in `dst`, i.e., they haven't been identified in other row
// groups already.
@ -951,7 +959,16 @@ impl RowGroup {
.filter_map(|(name, &id)| match dst.contains(name) {
// N.B there is bool::then() but it's currently unstable.
true => None,
false => Some((name, &self.columns[id])),
false => match columns {
Selection::All => Some((name, &self.columns[id])),
Selection::Some(names) => {
if names.iter().any(|selection| name == selection) {
Some((name, &self.columns[id]))
} else {
None
}
}
},
})
.collect::<Vec<_>>();
@ -1019,6 +1036,9 @@ impl From<RecordBatch> for RowGroup {
arrow::datatypes::DataType::Boolean => {
Column::from(arrow::array::BooleanArray::from(arrow_column.data()))
}
arrow::datatypes::DataType::Utf8 => {
Column::from(arrow::array::StringArray::from(arrow_column.data()))
}
dt => unimplemented!(
"data type {:?} currently not supported for field columns",
dt
@ -1495,6 +1515,11 @@ impl MetaData {
self.columns_size += column_size;
}
// Returns meta information about the column.
fn column_meta(&self, name: ColumnName<'_>) -> &ColumnMeta {
self.columns.get(name).unwrap()
}
// Extract schema information for a set of columns.
fn schema_for_column_names(
&self,
@ -2193,13 +2218,13 @@ west,4
// columns
read_group_all_rows_all_rle(&row_group);
// test read group queries that group on fewer than five columns.
// test row group queries that group on fewer than five columns.
read_group_hash_u128_key(&row_group);
// test read group queries that use a vector-based group key.
// test row group queries that use a vector-based group key.
read_group_hash_vec_key(&row_group);
// test read group queries that only group on one column.
// test row group queries that only group on one column.
read_group_single_groupby_column(&row_group);
}
@ -2834,16 +2859,21 @@ west,host-d,11,9
&[Some("Thinking"), Some("of"), Some("a"), Some("place")][..],
));
columns.insert("track".to_string(), track);
let temp = ColumnType::Field(Column::from(
&[Some("hot"), Some("cold"), Some("cold"), Some("warm")][..],
));
columns.insert("temp".to_string(), temp);
let tc = ColumnType::Time(Column::from(&[100_i64, 200, 500, 600][..]));
columns.insert("time".to_string(), tc);
let row_group = RowGroup::new(4, columns);
// No predicate - just find a value in each column that matches.
let mut dst = BTreeSet::new();
row_group.column_names(&Predicate::default(), &mut dst);
row_group.column_names(&Predicate::default(), Selection::All, &mut dst);
assert_eq!(
dst,
vec!["region", "time", "track"]
vec!["region", "temp", "time", "track"]
.into_iter()
.map(|s| s.to_owned())
.collect()
@ -2853,6 +2883,7 @@ west,host-d,11,9
let mut dst = BTreeSet::new();
row_group.column_names(
&Predicate::new(vec![BinaryExpr::from(("region", "=", "east"))]),
Selection::All,
&mut dst,
);
assert!(dst.is_empty());
@ -2862,16 +2893,17 @@ west,host-d,11,9
let mut dst = BTreeSet::new();
let names = row_group.column_names(
&Predicate::new(vec![BinaryExpr::from(("track", "=", "place"))]),
Selection::All,
&mut dst,
);
// query matches one row.
//
// region, track, time
// NULL , place, 600
// region, temp, track, time
// NULL , warm, place, 600
//
assert_eq!(
dst,
vec!["track", "time"]
vec!["temp", "time", "track",]
.into_iter()
.map(|s| s.to_owned())
.collect()
@ -2883,16 +2915,35 @@ west,host-d,11,9
let rc = ColumnType::Tag(Column::from(&[Some("prod")][..]));
columns.insert("env".to_string(), rc);
let tc = ColumnType::Time(Column::from(&[100_i64][..]));
let temp = ColumnType::Field(Column::from(&[Some("hot")][..]));
columns.insert("temp".to_string(), temp);
columns.insert("time".to_string(), tc);
let row_group = RowGroup::new(1, columns);
row_group.column_names(&Predicate::default(), &mut dst);
row_group.column_names(&Predicate::default(), Selection::All, &mut dst);
assert_eq!(
dst,
vec!["env", "time", "track"]
vec!["env", "temp", "time", "track"]
.into_iter()
.map(|s| s.to_owned())
.collect()
);
// just tag keys
dst.clear();
row_group.column_names(&Predicate::default(), Selection::Some(&["env"]), &mut dst);
assert_eq!(
dst.iter().cloned().collect::<Vec<_>>(),
vec!["env".to_owned()],
);
// just field keys
dst.clear();
row_group.column_names(&Predicate::default(), Selection::Some(&["temp"]), &mut dst);
assert_eq!(
dst.iter().cloned().collect::<Vec<_>>(),
vec!["temp".to_owned()],
);
}
}


@ -13,7 +13,6 @@ use snafu::{ensure, Snafu};
use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup};
use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema};
use crate::value::{AggregateResult, Scalar, Value};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("cannot drop last row group in table; drop table"))]
@ -431,10 +430,12 @@ impl Table {
///
/// Optionally a predicate may be provided. In such a case only column names
/// will be returned belonging to columns whom have at least one non-null
/// value for any row satisfying the predicate.
/// value for any row satisfying the predicate. Finally, the caller can
/// specify a set of column names to limit execution to only those.
pub fn column_names(
&self,
predicate: &Predicate,
columns: Selection<'_>,
mut dst: BTreeSet<String>,
) -> BTreeSet<String> {
let table_data = self.table_data.read().unwrap();
@ -459,7 +460,7 @@ impl Table {
// lock.
let (_, row_groups) = self.filter_row_groups(predicate);
for row_group in row_groups {
row_group.column_names(predicate, &mut dst);
row_group.column_names(predicate, columns, &mut dst);
}
dst
@ -1296,6 +1297,7 @@ west,host-b,100
let rc = ColumnType::Tag(Column::from(&["west", "south", "north"][..]));
columns.insert("region".to_string(), rc);
let rg = RowGroup::new(3, columns);
let mut table = Table::new("cpu".to_owned(), rg);
@ -1306,6 +1308,7 @@ west,host-b,100
let rc = ColumnType::Tag(Column::from(vec![Some("north"), None, None].as_slice()));
columns.insert("region".to_string(), rc);
let rg = RowGroup::new(3, columns);
table.add_row_group(rg);
@ -1322,7 +1325,7 @@ west,host-b,100
// NULL, 400
let mut dst: BTreeSet<String> = BTreeSet::new();
dst = table.column_names(&Predicate::default(), dst);
dst = table.column_names(&Predicate::default(), Selection::All, dst);
assert_eq!(
dst.iter().cloned().collect::<Vec<_>>(),
@ -1330,7 +1333,7 @@ west,host-b,100
);
// re-run and get the same answer
dst = table.column_names(&Predicate::default(), dst);
dst = table.column_names(&Predicate::default(), Selection::All, dst);
assert_eq!(
dst.iter().cloned().collect::<Vec<_>>(),
vec!["region".to_owned(), "time".to_owned()],
@ -1340,6 +1343,7 @@ west,host-b,100
// region from previous results.
dst = table.column_names(
&Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]),
Selection::All,
dst,
);
assert_eq!(
@ -1350,6 +1354,7 @@ west,host-b,100
// wipe the destination buffer and region won't show up
dst = table.column_names(
&Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]),
Selection::All,
BTreeSet::new(),
);
assert_eq!(


@ -399,7 +399,7 @@ impl Segment {
.put(
&location,
futures::stream::once(async move { stream_data }),
len,
Some(len),
)
.await
{


@ -126,11 +126,10 @@ impl Db {
// Return a list of all chunks in the mutable_buffer (that can
// potentially be migrated into the read buffer or object store)
pub async fn mutable_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
pub fn mutable_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
let chunks = if let Some(mutable_buffer) = self.mutable_buffer.as_ref() {
mutable_buffer
.chunks(partition_key)
.await
.into_iter()
.map(DBChunk::new_mb)
.collect()
@ -141,7 +140,7 @@ impl Db {
}
/// List chunks that are currently in the read buffer
pub async fn read_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
pub fn read_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
self.read_buffer
.chunk_ids(partition_key)
.into_iter()
@ -247,14 +246,14 @@ impl Database for Db {
type Chunk = DBChunk;
/// Return a covering set of chunks for a particular partition
async fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>> {
fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>> {
// return a covering set of chunks. TODO include read buffer
// chunks and take them preferentially from the read buffer.
// returns a covering set of chunks -- aka take chunks from read buffer
// preferentially
let mutable_chunk_iter = self.mutable_buffer_chunks(partition_key).await.into_iter();
let mutable_chunk_iter = self.mutable_buffer_chunks(partition_key).into_iter();
let read_buffer_chunk_iter = self.read_buffer_chunks(partition_key).await.into_iter();
let read_buffer_chunk_iter = self.read_buffer_chunks(partition_key).into_iter();
let chunks: BTreeMap<_, _> = mutable_chunk_iter
.chain(read_buffer_chunk_iter)
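// --- Editor's note: illustrative sketch, not part of this diff. ---
// The "preferentially" above relies on BTreeMap's FromIterator behaviour: when
// two (id, chunk) pairs share a key, the pair yielded later in the chained
// iterator replaces the earlier one, so read buffer chunks (chained second)
// shadow mutable buffer chunks with the same id.
fn example_later_entries_win() {
    use std::collections::BTreeMap;
    let merged: BTreeMap<u32, &str> = vec![(0, "mutable buffer"), (1, "mutable buffer")]
        .into_iter()
        .chain(vec![(0, "read buffer")])
        .collect();
    assert_eq!(merged[&0], "read buffer"); // id 0 is taken from the read buffer
    assert_eq!(merged[&1], "mutable buffer");
}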
@ -277,18 +276,6 @@ impl Database for Db {
.context(MutableBufferWrite)
}
async fn field_column_names(
&self,
predicate: query::predicate::Predicate,
) -> Result<query::exec::FieldListPlan, Self::Error> {
self.mutable_buffer
.as_ref()
.context(DatabaseNotReadable)?
.field_column_names(predicate)
.await
.context(MutableBufferRead)
}
async fn column_values(
&self,
column_name: &str,
@ -327,12 +314,11 @@ impl Database for Db {
.context(MutableBufferRead)
}
async fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
self.mutable_buffer
.as_ref()
.context(DatabaseNotReadable)?
.partition_keys()
.await
.context(MutableBufferRead)
}
}
@ -392,7 +378,7 @@ mod tests {
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap();
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().await.unwrap());
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
assert_eq!(mb_chunk.id(), 0);
@ -448,8 +434,8 @@ mod tests {
// we should have chunks in both the mutable buffer and read buffer
// (Note the currently open chunk is not listed)
assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0, 1]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]);
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0, 1]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![0]);
// data should be readable
let expected = vec![
@ -468,8 +454,8 @@ mod tests {
.await
.unwrap();
assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![1]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![0]);
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![1]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![0]);
let batches = run_query(&db, "select * from cpu").await;
assert_table_eq!(&expected, &batches);
@ -479,7 +465,7 @@ mod tests {
.await
.unwrap();
assert_eq!(
read_buffer_chunk_ids(&db, partition_key).await,
read_buffer_chunk_ids(&db, partition_key),
vec![] as Vec<u32>
);
@ -499,9 +485,9 @@ mod tests {
writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap();
writer.write_lp_string(&db, "cpu bar=1 20").await.unwrap();
assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0]);
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0]);
assert_eq!(
read_buffer_chunk_ids(&db, partition_key).await,
read_buffer_chunk_ids(&db, partition_key),
vec![] as Vec<u32>
);
@ -519,8 +505,8 @@ mod tests {
writer.write_lp_string(&db, "cpu bar=1 40").await.unwrap();
assert_eq!(mutable_chunk_ids(&db, partition_key).await, vec![0, 1, 2]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key).await, vec![1]);
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0, 1, 2]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![1]);
}
// run a sql query against the database, returning the results as record batches
@ -533,10 +519,9 @@ mod tests {
collect(physical_plan).await.unwrap()
}
async fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec<u32> {
fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec<u32> {
let mut chunk_ids: Vec<u32> = db
.mutable_buffer_chunks(partition_key)
.await
.iter()
.map(|chunk| chunk.id())
.collect();
@ -544,10 +529,9 @@ mod tests {
chunk_ids
}
async fn read_buffer_chunk_ids(db: &Db, partition_key: &str) -> Vec<u32> {
fn read_buffer_chunk_ids(db: &Db, partition_key: &str) -> Vec<u32> {
let mut chunk_ids: Vec<u32> = db
.read_buffer_chunks(partition_key)
.await
.iter()
.map(|chunk| chunk.id())
.collect();


@ -246,7 +246,7 @@ impl PartitionChunk for DBChunk {
// Note Mutable buffer doesn't support predicate
// pushdown (other than pruning out the entire chunk
// via `might_pass_predicate`)
let schema: Schema = self.table_schema(table_name, selection.clone()).await?;
let schema: Schema = self.table_schema(table_name, selection).await?;
Ok(Box::pin(MutableBufferChunkStream::new(
Arc::clone(&chunk),
@ -339,7 +339,13 @@ impl PartitionChunk for DBChunk {
let chunk_ids = &[chunk_id];
let names = db
.column_names(partition_key, table_name, chunk_ids, rb_predicate)
.column_names(
partition_key,
table_name,
chunk_ids,
rb_predicate,
Selection::All,
)
.context(ReadBufferChunk { chunk_id })?;
Ok(names)


@ -216,7 +216,7 @@ impl<M: ConnectionManager> Server<M> {
.put(
&location,
futures::stream::once(async move { stream_data }),
len,
Some(len),
)
.await
.context(StoreError)?;
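// --- Editor's note: clarifying note, not part of this diff. ---
// `put` on the object store now takes the payload length as an `Option<usize>`,
// which is why the call sites in this patch wrap the known length in
// `Some(len)`. Presumably (an assumption, not shown in this patch) a caller
// streaming data of unknown size could pass `None` instead, e.g.:
//
//     .put(&location, futures::stream::once(async move { stream_data }), None)
//     .await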


@ -1,2 +1,3 @@
pub mod field_columns;
pub mod table_names;
pub mod tag_column_names;


@ -0,0 +1,166 @@
use arrow_deps::{
arrow::datatypes::DataType,
assert_table_eq,
datafusion::logical_plan::{col, lit},
};
use query::{
exec::{
fieldlist::{Field, FieldList},
Executor,
},
frontend::influxrpc::InfluxRPCPlanner,
predicate::PredicateBuilder,
};
use crate::query_tests::scenarios::*;
/// Creates and loads several database scenarios using the db_setup
/// function.
///
/// runs field_column_names(predicate) and compares it to the expected
/// output
macro_rules! run_field_columns_test_case {
($DB_SETUP:expr, $PREDICATE:expr, $EXPECTED_FIELDS:expr) => {
test_helpers::maybe_start_logging();
let predicate = $PREDICATE;
let expected_fields = $EXPECTED_FIELDS;
for scenario in $DB_SETUP.make().await {
let DBScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRPCPlanner::new();
let executor = Executor::new();
let plan = planner
.field_columns(&db, predicate.clone())
.await
.expect("built plan successfully");
let fields = executor
.to_field_list(plan)
.await
.expect("converted plan to strings successfully");
assert_eq!(
fields, expected_fields,
"Error in scenario '{}'\n\nexpected:\n{:#?}\nactual:\n{:#?}",
scenario_name, expected_fields, fields
);
}
};
}
#[tokio::test]
async fn test_field_columns_empty_database() {
let predicate = PredicateBuilder::default().build();
let expected_fields = FieldList::default();
run_field_columns_test_case!(NoData {}, predicate, expected_fields);
}
#[tokio::test]
async fn test_field_columns_no_predicate() {
let predicate = PredicateBuilder::default()
.table("NoSuchTable")
.add_expr(col("state").eq(lit("MA"))) // state=MA
.build();
let expected_fields = FieldList::default();
run_field_columns_test_case!(TwoMeasurementsManyFields {}, predicate, expected_fields);
}
#[tokio::test]
async fn test_field_columns_with_pred() {
// get only fields from h20 (but both chunks)
let predicate = PredicateBuilder::default()
.table("h2o")
.add_expr(col("state").eq(lit("MA"))) // state=MA
.build();
let expected_fields = FieldList {
fields: vec![
Field {
name: "moisture".into(),
data_type: DataType::Float64,
last_timestamp: 100000,
},
Field {
name: "other_temp".into(),
data_type: DataType::Float64,
last_timestamp: 250,
},
Field {
name: "temp".into(),
data_type: DataType::Float64,
last_timestamp: 100000,
},
],
};
run_field_columns_test_case!(TwoMeasurementsManyFields {}, predicate, expected_fields);
}
#[tokio::test]
async fn test_field_columns_with_ts_pred() {
let predicate = PredicateBuilder::default()
.table("h2o")
.timestamp_range(200, 300)
.add_expr(col("state").eq(lit("MA"))) // state=MA
.build();
let expected_fields = FieldList {
fields: vec![Field {
name: "other_temp".into(),
data_type: DataType::Float64,
last_timestamp: 250,
}],
};
run_field_columns_test_case!(TwoMeasurementsManyFields {}, predicate, expected_fields);
}
#[tokio::test]
async fn test_field_name_plan() {
test_helpers::maybe_start_logging();
// Tests that the ordering that comes out is reasonable
let scenarios = OneMeasurementManyFields {}.make().await;
for scenario in scenarios {
let predicate = PredicateBuilder::default().timestamp_range(0, 200).build();
let DBScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRPCPlanner::new();
let executor = Executor::new();
let plan = planner
.field_columns(&db, predicate.clone())
.await
.expect("built plan successfully");
let mut plans = plan.plans;
let plan = plans.pop().unwrap();
assert!(plans.is_empty()); // only one plan
// run the created plan directly, ensuring the output is as
// expected (specifically that the column ordering is correct)
let results = executor
.run_logical_plan(plan)
.await
.expect("ok running plan");
let expected = vec![
"+--------+--------+--------+--------+------+",
"| field1 | field2 | field3 | field4 | time |",
"+--------+--------+--------+--------+------+",
"| 70.6 | | 2 | | 100 |",
"| 70.4 | ss | | | 100 |",
"| 70.5 | ss | | | 100 |",
"+--------+--------+--------+--------+------+",
];
assert_table_eq!(expected, &results);
}
}


@ -17,7 +17,9 @@ macro_rules! run_table_names_test_case {
test_helpers::maybe_start_logging();
let predicate = $PREDICATE;
for scenario in $DB_SETUP.make().await {
let DBScenario { scenario_name, db } = scenario;
let DBScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRPCPlanner::new();


@ -13,7 +13,7 @@ use crate::query_tests::scenarios::*;
/// Creates and loads several database scenarios using the db_setup
/// function.
///
/// runs table_tag_column_names(predicate) and compares it to the expected
/// runs table_column_names(predicate) and compares it to the expected
/// output
macro_rules! run_tag_column_names_test_case {
($DB_SETUP:expr, $PREDICATE:expr, $EXPECTED_NAMES:expr) => {
@ -21,7 +21,9 @@ macro_rules! run_tag_column_names_test_case {
let predicate = $PREDICATE;
let expected_names = $EXPECTED_NAMES;
for scenario in $DB_SETUP.make().await {
let DBScenario { scenario_name, db } = scenario;
let DBScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRPCPlanner::new();


@ -36,8 +36,8 @@ impl DBSetup for NoData {
// listing partitions (which may create an entry in a map)
// in an empty database
let db = make_db();
assert_eq!(db.mutable_buffer_chunks(partition_key).await.len(), 1); // only open chunk
assert_eq!(db.read_buffer_chunks(partition_key).await.len(), 0);
assert_eq!(db.mutable_buffer_chunks(partition_key).len(), 1); // only open chunk
assert_eq!(db.read_buffer_chunks(partition_key).len(), 0);
let scenario2 = DBScenario {
scenario_name: "New, Empty Database after partitions are listed".into(),
db,
@ -55,9 +55,9 @@ impl DBSetup for NoData {
.await
.unwrap();
assert_eq!(db.mutable_buffer_chunks(partition_key).await.len(), 1);
assert_eq!(db.mutable_buffer_chunks(partition_key).len(), 1);
assert_eq!(db.read_buffer_chunks(partition_key).await.len(), 0); // only open chunk
assert_eq!(db.read_buffer_chunks(partition_key).len(), 0); // only open chunk
let scenario3 = DBScenario {
scenario_name: "Empty Database after drop chunk".into(),
@ -78,51 +78,7 @@ impl DBSetup for TwoMeasurements {
cpu,region=west user=21.0 150\n\
disk,region=east bytes=99i 200";
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).await.unwrap();
let scenario1 = DBScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
};
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
let scenario2 = DBScenario {
scenario_name: "Data in closed chunk of mutable buffer".into(),
db,
};
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
let scenario3 = DBScenario {
scenario_name: "Data in both read buffer and mutable buffer".into(),
db,
};
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
let scenario4 = DBScenario {
scenario_name: "Data in only buffer and not mutable buffer".into(),
db,
};
vec![scenario1, scenario2, scenario3, scenario4]
make_one_chunk_scenarios(partition_key, data).await
}
}
@ -138,71 +94,7 @@ impl DBSetup for MultiChunkSchemaMerge {
let data2 = "cpu,region=east,host=foo user=23.2 100\n\
cpu,region=west,host=bar user=21.0 250";
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
let scenario1 = DBScenario {
scenario_name: "Data in single open chunk of mutable buffer".into(),
db,
};
// spread across 2 mutable buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
let scenario2 = DBScenario {
scenario_name: "Data in open chunk and closed chunk of mutable buffer".into(),
db,
};
// spread across 1 mutable buffer, 1 read buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
let scenario3 = DBScenario {
scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer"
.into(),
db,
};
// in 2 read buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
db.load_chunk_to_read_buffer(partition_key, 1)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 1)
.await
.unwrap();
let scenario4 = DBScenario {
scenario_name: "Data in two read buffer chunks".into(),
db,
};
vec![scenario1, scenario2, scenario3, scenario4]
make_two_chunk_scenarios(partition_key, data1, data2).await
}
}
@ -212,78 +104,48 @@ pub struct TwoMeasurementsManyNulls {}
impl DBSetup for TwoMeasurementsManyNulls {
async fn make(&self) -> Vec<DBScenario> {
let partition_key = "1970-01-01T00";
let lp_data1 = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\
let data1 = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\
h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250\n\
o2,state=MA,city=Boston temp=50.4 200\n\
o2,state=CA temp=79.0 300\n";
let lp_data2 = "o2,state=NY temp=60.8 400\n\
let data2 = "o2,state=NY temp=60.8 400\n\
o2,state=NY,city=NYC temp=61.0 500\n\
o2,state=NY,city=NYC,borough=Brooklyn temp=61.0 600\n";
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, lp_data1).await.unwrap();
writer.write_lp_string(&db, lp_data2).await.unwrap();
let scenario1 = DBScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
};
make_two_chunk_scenarios(partition_key, data1, data2).await
}
}
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, lp_data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, lp_data2).await.unwrap();
let scenario2 = DBScenario {
scenario_name: "Data in one open chunk, one closed chunk of mutable buffer".into(),
db,
};
pub struct TwoMeasurementsManyFields {}
#[async_trait]
impl DBSetup for TwoMeasurementsManyFields {
async fn make(&self) -> Vec<DBScenario> {
let partition_key = "1970-01-01T00";
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, lp_data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, lp_data2).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
let scenario3 = DBScenario {
scenario_name: "One data chunk in read buffer, one chunk of mutable buffer".into(),
db,
};
let data1 = "h2o,state=MA,city=Boston temp=70.4 50\n\
h2o,state=MA,city=Boston other_temp=70.4 250\n\
h2o,state=CA,city=Boston other_temp=72.4 350\n\
o2,state=MA,city=Boston temp=53.4,reading=51 50\n\
o2,state=CA temp=79.0 300";
let data2 = "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000";
make_two_chunk_scenarios(partition_key, data1, data2).await
}
}
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, lp_data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, lp_data2).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
pub struct OneMeasurementManyFields {}
#[async_trait]
impl DBSetup for OneMeasurementManyFields {
async fn make(&self) -> Vec<DBScenario> {
let partition_key = "1970-01-01T00";
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
// Order this so field3 comes before field2
// (and thus the columns need to get reordered)
let data = "h2o,tag1=foo,tag2=bar field1=70.6,field3=2 100\n\
h2o,tag1=foo,tag2=bar field1=70.4,field2=\"ss\" 100\n\
h2o,tag1=foo,tag2=bar field1=70.5,field2=\"ss\" 100\n\
h2o,tag1=foo,tag2=bar field1=70.6,field4=true 1000";
db.load_chunk_to_read_buffer(partition_key, 1)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 1)
.await
.unwrap();
let scenario4 = DBScenario {
scenario_name: "Data in 2 read buffer chunks".into(),
db,
};
vec![scenario1, scenario2, scenario3, scenario4]
make_one_chunk_scenarios(partition_key, data).await
}
}
@ -314,3 +176,134 @@ impl DBSetup for EndToEndTest {
vec![scenario1]
}
}
/// This function loads one chunk of lp data into 4 different scenarios
///
/// Data in a single open mutable buffer chunk
/// Data in a single closed mutable buffer chunk
/// Data in both the read buffer and the mutable buffer
/// Data in the read buffer only (not in the mutable buffer)
async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DBScenario> {
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).await.unwrap();
let scenario1 = DBScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
};
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
let scenario2 = DBScenario {
scenario_name: "Data in closed chunk of mutable buffer".into(),
db,
};
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
let scenario3 = DBScenario {
scenario_name: "Data in both read buffer and mutable buffer".into(),
db,
};
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
let scenario4 = DBScenario {
scenario_name: "Data in only read buffer and not mutable buffer".into(),
db,
};
vec![scenario1, scenario2, scenario3, scenario4]
}
/// This function loads two chunks of lp data into 4 different scenarios
///
/// Data in single open mutable buffer chunk
/// Data in one open mutable buffer chunk, one closed mutable chunk
/// Data in one open mutable buffer chunk, one read buffer chunk
/// Data in two read buffer chunks
async fn make_two_chunk_scenarios(
partition_key: &str,
data1: &str,
data2: &str,
) -> Vec<DBScenario> {
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
let scenario1 = DBScenario {
scenario_name: "Data in single open chunk of mutable buffer".into(),
db,
};
// spread across 2 mutable buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
let scenario2 = DBScenario {
scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(),
db,
};
// spread across 1 mutable buffer, 1 read buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
let scenario3 = DBScenario {
scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(),
db,
};
// in 2 read buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
db.load_chunk_to_read_buffer(partition_key, 1)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 1)
.await
.unwrap();
let scenario4 = DBScenario {
scenario_name: "Data in two read buffer chunks".into(),
db,
};
vec![scenario1, scenario2, scenario3, scenario4]
}
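// --- Editor's note: illustrative sketch, not part of this diff. ---
// Future DBSetup implementations can reuse these helpers instead of spelling
// out all four chunk layouts by hand. A hypothetical setup (struct name and
// line protocol invented for illustration):
pub struct OneMeasurementExample {}
#[async_trait]
impl DBSetup for OneMeasurementExample {
    async fn make(&self) -> Vec<DBScenario> {
        let partition_key = "1970-01-01T00";
        let data = "cpu,region=west user=23.2 100";
        make_one_chunk_scenarios(partition_key, data).await
    }
}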


@ -16,7 +16,9 @@ macro_rules! run_sql_test_case {
test_helpers::maybe_start_logging();
let sql = $SQL.to_string();
for scenario in $DB_SETUP.make().await {
let DBScenario { scenario_name, db } = scenario;
let DBScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("SQL: '{:#?}'", sql);
let planner = SQLQueryPlanner::new();


@ -19,7 +19,9 @@ macro_rules! run_table_schema_test_case {
let expected_schema = $EXPECTED_SCHEMA;
for scenario in $DB_SETUP.make().await {
let DBScenario { scenario_name, db } = scenario;
let DBScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!(
"Getting schema for table '{}', selection {:?}",
@ -29,8 +31,8 @@ macro_rules! run_table_schema_test_case {
// Make sure at least one table has data
let mut chunks_with_table = 0;
for partition_key in db.partition_keys().await.unwrap() {
for chunk in db.chunks(&partition_key).await {
for partition_key in db.partition_keys().unwrap() {
for chunk in db.chunks(&partition_key) {
if chunk.has_table(table_name) {
chunks_with_table += 1;
let actual_schema = chunk


@ -194,7 +194,7 @@ where
.put(
&partition_meta_path,
futures::stream::once(async move { stream_data }),
len,
Some(len),
)
.await
.context(WritingToObjectStore)?;
@ -247,7 +247,7 @@ where
.put(
&file_name,
futures::stream::once(async move { stream_data }),
len,
Some(len),
)
.await
.context(WritingToObjectStore)
@ -393,7 +393,7 @@ mem,host=A,region=west used=45 1
let mut data_path = store.new_path();
data_path.push_dir("data");
let chunk = Arc::clone(&db.chunks("1970-01-01T00").await[0]);
let chunk = Arc::clone(&db.chunks("1970-01-01T00")[0]);
let snapshot = snapshot_chunk(
metadata_path.clone(),


@ -145,7 +145,8 @@ pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result
.serve(router_service);
info!(bind_address=?bind_addr, "HTTP server listening");
println!("InfluxDB IOx server ready");
let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN");
info!(git_hash, "InfluxDB IOx server ready");
// Wait for both the servers to complete
let (grpc_server, server) = futures::future::join(grpc_server, http_server).await;


@ -668,14 +668,13 @@ async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(
bucket: &info.bucket,
})?;
let partition_keys = db
.partition_keys()
.await
.map_err(|e| Box::new(e) as _)
.context(BucketByName {
org: &info.org,
bucket_name: &info.bucket,
})?;
let partition_keys =
db.partition_keys()
.map_err(|e| Box::new(e) as _)
.context(BucketByName {
org: &info.org,
bucket_name: &info.bucket,
})?;
let result = serde_json::to_string(&partition_keys).context(JsonGenerationError)?;


@ -1061,7 +1061,7 @@ async fn field_names_impl<T>(
rpc_predicate: Option<Predicate>,
) -> Result<FieldList>
where
T: DatabaseStore,
T: DatabaseStore + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
@ -1080,14 +1080,16 @@ where
.await
.context(DatabaseNotFound { db_name })?;
let executor = db_store.executor();
let planner = InfluxRPCPlanner::new();
let field_list_plan = db
.field_column_names(predicate)
let field_list_plan = planner
.field_columns(db.as_ref(), predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingFields { db_name })?;
let executor = db_store.executor();
let field_list = executor
.to_field_list(field_list_plan)
.await
@ -1102,17 +1104,11 @@ mod tests {
use super::super::id::ID;
use super::*;
use arrow_deps::{
arrow::datatypes::DataType,
datafusion::logical_plan::{col, lit, Expr},
};
use arrow_deps::datafusion::logical_plan::{col, lit, Expr};
use panic_logging::SendPanicsToTracing;
use query::{
exec::fieldlist::{Field, FieldList},
exec::FieldListPlan,
exec::SeriesSetPlans,
group_by::{Aggregate as QueryAggregate, WindowDuration as QueryWindowDuration},
test::FieldColumnsRequest,
test::QueryGroupsRequest,
test::TestDatabaseStore,
test::{ColumnValuesRequest, QuerySeriesRequest, TestChunk},
@ -1538,34 +1534,73 @@ mod tests {
actual_tag_values, tag_values,
"unexpected tag values while getting tag values for measurement names"
);
}
#[tokio::test]
async fn test_storage_rpc_tag_values_field() {
// Start a test gRPC server on a randomly allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1;
// Add a chunk with a field
let chunk = TestChunk::new(0)
.with_int_field_column("TheMeasurement", "Field1")
.with_time_column("TheMeasurement")
.with_tag_column("TheMeasurement", "state")
.with_one_row_of_null_data("TheMeasurement");
fixture
.test_storage
.db_or_create(&db_info.db_name)
.await
.unwrap()
.add_chunk("my_partition_key", Arc::new(chunk));
let source = Some(StorageClientWrapper::read_source(
db_info.org_id,
db_info.bucket_id,
partition_id,
));
// ---
// test tag_key = _field means listing all field names
// ---
let request = TagValuesRequest {
tags_source: source.clone(),
range: make_timestamp_range(1000, 1500),
predicate: None,
range: make_timestamp_range(0, 2000),
predicate: make_state_ma_predicate(),
tag_key: [255].into(),
};
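// --- Editor's note: clarifying note, not part of this diff. ---
// In the storage gRPC protocol the measurement and field pseudo-tags are byte
// encoded: a tag_key of [0] stands for `_measurement` and [255] for `_field`,
// so the request above asks for the set of field names rather than the values
// of an ordinary tag column.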
// Setup a single field name (Field1)
let fieldlist = FieldList {
fields: vec![Field {
name: "Field1".into(),
data_type: DataType::Utf8,
last_timestamp: 1000,
}],
};
let fieldlist_plan = FieldListPlan::Known(Ok(fieldlist));
test_db.set_field_colum_names_values(fieldlist_plan);
let expected_tag_values = vec!["Field1"];
let actual_tag_values = fixture.storage_client.tag_values(request).await.unwrap();
assert_eq!(
actual_tag_values, expected_tag_values,
"unexpected tag values while getting tag values for field names"
);
}
#[tokio::test]
async fn test_storage_rpc_tag_values_error() {
// Start a test gRPC server on a randomly allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1;
let test_db = fixture
.test_storage
.db_or_create(&db_info.db_name)
.await
.expect("creating test database");
let source = Some(StorageClientWrapper::read_source(
db_info.org_id,
db_info.bucket_id,
partition_id,
));
// ---
// test error
@ -2135,18 +2170,28 @@ mod tests {
}
#[tokio::test]
async fn test_measurement_fields() -> Result<(), tonic::Status> {
async fn test_measurement_fields() {
test_helpers::maybe_start_logging();
// Start a test gRPC server on a randomly allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1;
let test_db = fixture
// Add a chunk with a field
let chunk = TestChunk::new(0)
.with_int_field_column("TheMeasurement", "Field1")
.with_time_column("TheMeasurement")
.with_tag_column("TheMeasurement", "state")
.with_one_row_of_null_data("TheMeasurement");
fixture
.test_storage
.db_or_create(&db_info.db_name)
.await
.expect("creating test database");
.unwrap()
.add_chunk("my_partition_key", Arc::new(chunk));
let source = Some(StorageClientWrapper::read_source(
db_info.org_id,
@ -2157,37 +2202,45 @@ mod tests {
let request = MeasurementFieldsRequest {
source: source.clone(),
measurement: "TheMeasurement".into(),
range: make_timestamp_range(150, 200),
range: make_timestamp_range(0, 2000),
predicate: make_state_ma_predicate(),
};
let expected_request = FieldColumnsRequest {
predicate: "Predicate { table_names: TheMeasurement exprs: [#state Eq Utf8(\"MA\")] range: TimestampRange { start: 150, end: 200 }}".into()
};
let fieldlist = FieldList {
fields: vec![Field {
name: "Field1".into(),
data_type: DataType::Utf8,
last_timestamp: 1000,
}],
};
let fieldlist_plan = FieldListPlan::Known(Ok(fieldlist));
test_db.set_field_colum_names_values(fieldlist_plan);
let actual_fields = fixture.storage_client.measurement_fields(request).await?;
let expected_fields: Vec<String> = vec!["key: Field1, type: 3, timestamp: 1000".into()];
let actual_fields = fixture
.storage_client
.measurement_fields(request)
.await
.unwrap();
let expected_fields: Vec<String> = vec!["key: Field1, type: 1, timestamp: 1000".into()];
assert_eq!(
actual_fields, expected_fields,
"unexpected frames returned by measuremnt_fields"
);
assert_eq!(
test_db.get_field_columns_request(),
Some(expected_request),
"unexpected request to measurement-fields"
"unexpected frames returned by measurement_fields"
);
}
#[tokio::test]
async fn test_measurement_fields_error() {
// Start a test gRPC server on a randomly allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1;
let chunk = TestChunk::new(0).with_error("Sugar we are going down");
fixture
.test_storage
.db_or_create(&db_info.db_name)
.await
.unwrap()
.add_chunk("my_partition_key", Arc::new(chunk));
let source = Some(StorageClientWrapper::read_source(
db_info.org_id,
db_info.bucket_id,
partition_id,
));
// ---
// test error
@ -2202,21 +2255,7 @@ mod tests {
// Note we don't set the response on the test database, so we expect an error
let response = fixture.storage_client.measurement_fields(request).await;
assert!(response.is_err());
let response_string = format!("{:?}", response);
let expected_error = "No saved field_column_name in TestDatabase";
assert!(
response_string.contains(expected_error),
"'{}' did not contain expected content '{}'",
response_string,
expected_error
);
let expected_request = Some(FieldColumnsRequest {
predicate: "Predicate { table_names: TheMeasurement}".into(),
});
assert_eq!(test_db.get_field_columns_request(), expected_request);
Ok(())
assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down");
}
fn make_timestamp_range(start: i64, end: i64) -> Option<TimestampRange> {