Merge branch 'main' into cn/only-read-server
commit
04b4eeb6ec
|
|
@ -283,9 +283,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.61"
|
||||
version = "0.3.62"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7a905d892734eea339e896738c14b9afce22b5318f64b951e70bf3844419b01"
|
||||
checksum = "091bcdf2da9950f96aa522681ce805e6857f6ca8df73833d35736ab2dc78e152"
|
||||
dependencies = [
|
||||
"addr2line",
|
||||
"cc",
|
||||
|
|
@ -1429,9 +1429,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "http-body"
|
||||
version = "0.4.3"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5"
|
||||
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"http",
|
||||
|
|
@ -1458,9 +1458,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
|
|||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "0.14.13"
|
||||
version = "0.14.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593"
|
||||
checksum = "2b91bb1f221b6ea1f1e4371216b70f40748774c2fb5971b450c07773fb92d26b"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
|
|
@ -1931,9 +1931,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.104"
|
||||
version = "0.2.105"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce"
|
||||
checksum = "869d572136620d55835903746bcb5cdc54cb2851fd0aeec53220b4bb65ef3013"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
|
|
@ -2035,9 +2035,9 @@ checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
|
|||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.0.1"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
|
||||
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
|
||||
dependencies = [
|
||||
"regex-automata",
|
||||
]
|
||||
|
|
@ -2219,6 +2219,7 @@ dependencies = [
|
|||
"bytes",
|
||||
"criterion",
|
||||
"flate2",
|
||||
"hashbrown",
|
||||
"influxdb_line_protocol",
|
||||
"mutable_batch",
|
||||
"schema",
|
||||
|
|
@ -2504,9 +2505,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.26.2"
|
||||
version = "0.27.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "39f37e50073ccad23b6d09bcb5b263f4e76d3bb6038e4a3c08e52162ffa8abc2"
|
||||
checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
|
@ -4518,9 +4519,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.9"
|
||||
version = "0.4.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d15a6b60cdff0cb039d81d3b37f8bc3d7e53dca09069aae3ef2502ca4834fe30"
|
||||
checksum = "c00e500fff5fa1131c866b246041a6bf96da9c965f8fe4128cb1421f23e93c00"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
|
|
@ -4658,12 +4659,11 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.2.25"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71"
|
||||
checksum = "5cf865b5ddc38e503a29c41c4843e616a73028ae18c637bc3eb2afaef4909c84"
|
||||
dependencies = [
|
||||
"ansi_term 0.12.1",
|
||||
"chrono",
|
||||
"lazy_static",
|
||||
"matchers",
|
||||
"regex",
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "influxdb_iox"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
default-run = "influxdb_iox"
|
||||
readme = "README.md"
|
||||
|
||||
|
|
@ -94,7 +94,7 @@ data_types = { path = "data_types" }
|
|||
entry = { path = "entry" }
|
||||
generated_types = { path = "generated_types" }
|
||||
|
||||
influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] }
|
||||
influxdb_iox_client = { path = "influxdb_iox_client", features = ["flight", "format"] }
|
||||
influxdb_line_protocol = { path = "influxdb_line_protocol" }
|
||||
internal_types = { path = "internal_types" }
|
||||
iox_object_store = { path = "iox_object_store" }
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "arrow_util"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Apache Arrow utilities"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -29,6 +29,12 @@ impl BitSet {
|
|||
bitset
|
||||
}
|
||||
|
||||
/// Reserve space for `count` further bits
|
||||
pub fn reserve(&mut self, count: usize) {
|
||||
let new_buf_len = (self.len + count + 7) >> 3;
|
||||
self.buffer.reserve(new_buf_len);
|
||||
}
|
||||
|
||||
/// Appends `count` unset bits
|
||||
pub fn append_unset(&mut self, count: usize) {
|
||||
self.len += count;
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ name = "client_util"
|
|||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
description = "Shared code for IOx clients"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
http = "0.2.3"
|
||||
|
|
@ -13,4 +13,4 @@ tonic = { version = "0.5.0" }
|
|||
tower = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.11", features = ["macros"] }
|
||||
tokio = { version = "1.11", features = ["macros"] }
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ name = "data_types"
|
|||
version = "0.1.0"
|
||||
authors = ["pauldix <paul@pauldix.net>"]
|
||||
description = "InfluxDB IOx data_types, shared between IOx instances and IOx clients"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
readme = "README.md"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "datafusion"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Re-exports datafusion at a specific version"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "datafusion_util"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Datafusion utilities"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "entry"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "The entry format used by the write buffer"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "generated_types"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
bytes = "1.0"
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "grpc-router"
|
||||
version = "0.1.0"
|
||||
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
bytes = "1.0"
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "grpc-router-test-gen"
|
||||
version = "0.1.0"
|
||||
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Protobuf used in test for the grpc-router crate; needs to be in a separate crate because of linter limitations"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "influxdb2_client"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
bytes = "1.0"
|
||||
|
|
|
|||
|
|
@ -2,11 +2,11 @@
|
|||
name = "influxdb_iox_client"
|
||||
version = "0.1.0"
|
||||
authors = ["Dom Dwyer <dom@itsallbroken.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[features]
|
||||
flight = ["arrow", "arrow-flight", "arrow_util", "serde/derive", "serde_json", "futures-util"]
|
||||
format = ["arrow"]
|
||||
format = ["arrow", "arrow_util"]
|
||||
|
||||
[dependencies]
|
||||
# Workspace dependencies, in alphabetical order
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "influxdb_line_protocol"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
nom = "7"
|
||||
|
|
@ -11,4 +11,4 @@ snafu = "0.6.2"
|
|||
observability_deps = { path = "../observability_deps" }
|
||||
|
||||
[dev-dependencies] # In alphabetical order
|
||||
test_helpers = { path = "../test_helpers" }
|
||||
test_helpers = { path = "../test_helpers" }
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "influxdb_storage_client"
|
||||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
client_util = { path = "../client_util" }
|
||||
|
|
@ -11,4 +11,4 @@ prost = "0.8"
|
|||
tonic = { version = "0.5.0" }
|
||||
futures-util = { version = "0.3.1" }
|
||||
|
||||
[dev-dependencies]
|
||||
[dev-dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "influxdb_tsm"
|
||||
version = "0.1.0"
|
||||
authors = ["Edd Robinson <me@edd.io>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
integer-encoding = "3.0.2"
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "internal_types"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "InfluxDB IOx internal types, shared between IOx instances"
|
||||
readme = "README.md"
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "iox_data_generator"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
default-run = "iox_data_generator"
|
||||
|
||||
[dependencies]
|
||||
|
|
@ -26,7 +26,7 @@ snafu = "0.6.8"
|
|||
tokio = { version = "1.11", features = ["macros", "rt-multi-thread"] }
|
||||
toml = "0.5.6"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.2.25"
|
||||
tracing-subscriber = "0.3.0"
|
||||
uuid = { version = "0.8.1", default_features = false }
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "iox_object_store"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "IOx-specific semantics wrapping the general-purpose object store crate"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "lifecycle"
|
||||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Implements the IOx data lifecycle"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -3,13 +3,14 @@ name = "logfmt"
|
|||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
description = "tracing_subscriber layer for writing out logfmt formatted events"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
tracing-subscriber = "0.2"
|
||||
tracing-subscriber = "0.3"
|
||||
|
||||
[dev-dependencies] # In alphabetical order
|
||||
once_cell = { version = "1.4.0", features = ["parking_lot"] }
|
||||
parking_lot = "0.11.2"
|
||||
regex = "1.4.3"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
|
|
|||
|
|
@ -19,12 +19,18 @@ use tracing_subscriber::{fmt::MakeWriter, layer::Context, registry::LookupSpan,
|
|||
/// looked very small and did not (obviously) work with the tracing subscriber
|
||||
///
|
||||
/// [logfmt]: https://brandur.org/logfmt
|
||||
pub struct LogFmtLayer<W: MakeWriter> {
|
||||
pub struct LogFmtLayer<W>
|
||||
where
|
||||
W: for<'writer> MakeWriter<'writer>,
|
||||
{
|
||||
writer: W,
|
||||
display_target: bool,
|
||||
}
|
||||
|
||||
impl<W: MakeWriter> LogFmtLayer<W> {
|
||||
impl<W> LogFmtLayer<W>
|
||||
where
|
||||
W: for<'writer> MakeWriter<'writer>,
|
||||
{
|
||||
/// Create a new logfmt Layer to pass into tracing_subscriber
|
||||
///
|
||||
/// Note this layer simply formats and writes to the specified writer. It
|
||||
|
|
@ -68,7 +74,7 @@ impl<W: MakeWriter> LogFmtLayer<W> {
|
|||
|
||||
impl<S, W> Layer<S> for LogFmtLayer<W>
|
||||
where
|
||||
W: MakeWriter + 'static,
|
||||
W: for<'writer> MakeWriter<'writer> + 'static,
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
fn register_callsite(
|
||||
|
|
@ -78,7 +84,7 @@ where
|
|||
Interest::always()
|
||||
}
|
||||
|
||||
fn new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
|
||||
fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
|
||||
let writer = self.writer.make_writer();
|
||||
let metadata = ctx.metadata(id).expect("span should have metadata");
|
||||
let mut p = FieldPrinter::new(writer, metadata.level(), self.display_target);
|
||||
|
|
|
|||
|
|
@ -363,7 +363,7 @@ impl std::io::Write for CapturedWriter {
|
|||
}
|
||||
}
|
||||
|
||||
impl MakeWriter for CapturedWriter {
|
||||
impl MakeWriter<'_> for CapturedWriter {
|
||||
type Writer = Self;
|
||||
|
||||
fn make_writer(&self) -> Self::Writer {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "metric"
|
||||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "metric_exporters"
|
||||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "mutable_batch"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "A mutable arrow RecordBatch"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -1,10 +1,9 @@
|
|||
//! A [`Column`] stores the rows for a given column name
|
||||
|
||||
use std::fmt::Formatter;
|
||||
use std::iter::Enumerate;
|
||||
use std::iter::{Enumerate, Zip};
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
use std::{convert::TryInto, iter::Zip};
|
||||
|
||||
use arrow::error::ArrowError;
|
||||
use arrow::{
|
||||
|
|
|
|||
|
|
@ -175,6 +175,20 @@ impl MutableBatch {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Extend this [`MutableBatch`] with `ranges` rows from `other`
|
||||
pub fn extend_from_ranges(
|
||||
&mut self,
|
||||
other: &Self,
|
||||
ranges: &[Range<usize>],
|
||||
) -> writer::Result<()> {
|
||||
let to_insert = ranges.iter().map(|x| x.end - x.start).sum();
|
||||
|
||||
let mut writer = writer::Writer::new(self, to_insert);
|
||||
writer.write_batch_ranges(other, ranges)?;
|
||||
writer.commit();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a reference to the specified column
|
||||
pub(crate) fn column(&self, column: &str) -> Result<&Column> {
|
||||
let idx = self
|
||||
|
|
|
|||
|
|
@ -499,86 +499,105 @@ impl<'a> Writer<'a> {
|
|||
src: &MutableBatch,
|
||||
range: Range<usize>,
|
||||
) -> Result<()> {
|
||||
if range.start == 0 && range.end == src.row_count {
|
||||
self.write_batch_ranges(src, &[range])
|
||||
}
|
||||
|
||||
/// Write the rows identified by `ranges` to the provided MutableBatch
|
||||
pub(crate) fn write_batch_ranges(
|
||||
&mut self,
|
||||
src: &MutableBatch,
|
||||
ranges: &[Range<usize>],
|
||||
) -> Result<()> {
|
||||
let to_insert = self.to_insert;
|
||||
|
||||
if to_insert == src.row_count {
|
||||
return self.write_batch(src);
|
||||
}
|
||||
|
||||
assert_eq!(range.end - range.start, self.to_insert);
|
||||
for (src_col_name, src_col_idx) in &src.column_names {
|
||||
let src_col = &src.columns[*src_col_idx];
|
||||
let (dst_col_idx, dst_col) = self.column_mut(src_col_name, src_col.influx_type)?;
|
||||
let stats = match (&mut dst_col.data, &src_col.data) {
|
||||
(ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => {
|
||||
dst_data.extend_from_slice(&src_data[range.clone()]);
|
||||
Statistics::F64(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
|
||||
&src_data[x]
|
||||
}))
|
||||
}
|
||||
(ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => {
|
||||
dst_data.extend_from_slice(&src_data[range.clone()]);
|
||||
Statistics::I64(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
|
||||
&src_data[x]
|
||||
}))
|
||||
}
|
||||
(ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => {
|
||||
dst_data.extend_from_slice(&src_data[range.clone()]);
|
||||
Statistics::U64(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
|
||||
&src_data[x]
|
||||
}))
|
||||
}
|
||||
(ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => Statistics::F64(
|
||||
write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data),
|
||||
),
|
||||
(ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => Statistics::I64(
|
||||
write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data),
|
||||
),
|
||||
(ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => Statistics::U64(
|
||||
write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data),
|
||||
),
|
||||
(ColumnData::Bool(dst_data, _), ColumnData::Bool(src_data, _)) => {
|
||||
dst_data.extend_from_range(src_data, range.clone());
|
||||
Statistics::Bool(compute_bool_stats(
|
||||
src_col.valid.bytes(),
|
||||
range.clone(),
|
||||
src_data,
|
||||
))
|
||||
dst_data.reserve(to_insert);
|
||||
let mut stats = StatValues::new_empty();
|
||||
for range in ranges {
|
||||
dst_data.extend_from_range(src_data, range.clone());
|
||||
compute_bool_stats(
|
||||
src_col.valid.bytes(),
|
||||
range.clone(),
|
||||
src_data,
|
||||
&mut stats,
|
||||
)
|
||||
}
|
||||
Statistics::Bool(stats)
|
||||
}
|
||||
(ColumnData::String(dst_data, _), ColumnData::String(src_data, _)) => {
|
||||
dst_data.extend_from_range(src_data, range.clone());
|
||||
Statistics::String(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
|
||||
src_data.get(x).unwrap()
|
||||
}))
|
||||
let mut stats = StatValues::new_empty();
|
||||
for range in ranges {
|
||||
dst_data.extend_from_range(src_data, range.clone());
|
||||
compute_stats(src_col.valid.bytes(), range.clone(), &mut stats, |x| {
|
||||
src_data.get(x).unwrap()
|
||||
})
|
||||
}
|
||||
Statistics::String(stats)
|
||||
}
|
||||
(
|
||||
ColumnData::Tag(dst_data, dst_dict, _),
|
||||
ColumnData::Tag(src_data, src_dict, _),
|
||||
) => {
|
||||
dst_data.reserve(to_insert);
|
||||
|
||||
let mut mapping: Vec<_> = vec![None; src_dict.values().len()];
|
||||
|
||||
let mut stats = StatValues::new_empty();
|
||||
dst_data.extend(src_data[range.clone()].iter().map(|src_id| match *src_id {
|
||||
INVALID_DID => {
|
||||
stats.update_for_nulls(1);
|
||||
INVALID_DID
|
||||
}
|
||||
_ => {
|
||||
let maybe_did = &mut mapping[*src_id as usize];
|
||||
match maybe_did {
|
||||
Some(did) => {
|
||||
stats.total_count += 1;
|
||||
*did
|
||||
for range in ranges {
|
||||
dst_data.extend(src_data[range.clone()].iter().map(
|
||||
|src_id| match *src_id {
|
||||
INVALID_DID => {
|
||||
stats.update_for_nulls(1);
|
||||
INVALID_DID
|
||||
}
|
||||
None => {
|
||||
let value = src_dict.lookup_id(*src_id).unwrap();
|
||||
stats.update(value);
|
||||
_ => {
|
||||
let maybe_did = &mut mapping[*src_id as usize];
|
||||
match maybe_did {
|
||||
Some(did) => {
|
||||
stats.total_count += 1;
|
||||
*did
|
||||
}
|
||||
None => {
|
||||
let value = src_dict.lookup_id(*src_id).unwrap();
|
||||
stats.update(value);
|
||||
|
||||
let did = dst_dict.lookup_value_or_insert(value);
|
||||
*maybe_did = Some(did);
|
||||
did
|
||||
let did = dst_dict.lookup_value_or_insert(value);
|
||||
*maybe_did = Some(did);
|
||||
did
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
Statistics::String(stats)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
dst_col
|
||||
.valid
|
||||
.extend_from_range(&src_col.valid, range.clone());
|
||||
dst_col.valid.reserve(to_insert);
|
||||
for range in ranges {
|
||||
dst_col
|
||||
.valid
|
||||
.extend_from_range(&src_col.valid, range.clone());
|
||||
}
|
||||
|
||||
self.statistics.push((dst_col_idx, stats));
|
||||
}
|
||||
|
|
@ -707,12 +726,16 @@ fn append_valid_mask(column: &mut Column, valid_mask: Option<&[u8]>, to_insert:
|
|||
}
|
||||
}
|
||||
|
||||
fn compute_bool_stats(valid: &[u8], range: Range<usize>, col_data: &BitSet) -> StatValues<bool> {
|
||||
fn compute_bool_stats(
|
||||
valid: &[u8],
|
||||
range: Range<usize>,
|
||||
col_data: &BitSet,
|
||||
stats: &mut StatValues<bool>,
|
||||
) {
|
||||
// There are likely faster ways to do this
|
||||
let indexes =
|
||||
iter_set_positions_with_offset(valid, range.start).take_while(|idx| *idx < range.end);
|
||||
|
||||
let mut stats = StatValues::new_empty();
|
||||
for index in indexes {
|
||||
let value = col_data.get(index);
|
||||
stats.update(&value)
|
||||
|
|
@ -720,11 +743,33 @@ fn compute_bool_stats(valid: &[u8], range: Range<usize>, col_data: &BitSet) -> S
|
|||
|
||||
let count = range.end - range.start;
|
||||
stats.update_for_nulls(count as u64 - stats.total_count);
|
||||
}
|
||||
|
||||
fn write_slice<T>(
|
||||
to_insert: usize,
|
||||
ranges: &[Range<usize>],
|
||||
valid: &[u8],
|
||||
src_data: &[T],
|
||||
dst_data: &mut Vec<T>,
|
||||
) -> StatValues<T>
|
||||
where
|
||||
T: Clone + PartialOrd + IsNan,
|
||||
{
|
||||
dst_data.reserve(to_insert);
|
||||
let mut stats = StatValues::new_empty();
|
||||
for range in ranges {
|
||||
dst_data.extend_from_slice(&src_data[range.clone()]);
|
||||
compute_stats(valid, range.clone(), &mut stats, |x| &src_data[x]);
|
||||
}
|
||||
stats
|
||||
}
|
||||
|
||||
fn compute_stats<'a, T, U, F>(valid: &[u8], range: Range<usize>, accessor: F) -> StatValues<T>
|
||||
where
|
||||
fn compute_stats<'a, T, U, F>(
|
||||
valid: &[u8],
|
||||
range: Range<usize>,
|
||||
stats: &mut StatValues<T>,
|
||||
accessor: F,
|
||||
) where
|
||||
U: 'a + ToOwned<Owned = T> + PartialOrd + ?Sized + IsNan,
|
||||
F: Fn(usize) -> &'a U,
|
||||
T: std::borrow::Borrow<U>,
|
||||
|
|
@ -733,14 +778,12 @@ where
|
|||
.take_while(|idx| *idx < range.end)
|
||||
.map(accessor);
|
||||
|
||||
let mut stats = StatValues::new_empty();
|
||||
for value in values {
|
||||
stats.update(value)
|
||||
}
|
||||
|
||||
let count = range.end - range.start;
|
||||
stats.update_for_nulls(count as u64 - stats.total_count);
|
||||
stats
|
||||
}
|
||||
|
||||
impl<'a> Drop for Writer<'a> {
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
[package]
|
||||
name = "mutable_batch_lp"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Conversion logic for line protocol -> MutableBatch"
|
||||
|
||||
[dependencies]
|
||||
hashbrown = "0.11"
|
||||
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
|
||||
mutable_batch = { path = "../mutable_batch" }
|
||||
schema = { path = "../schema" }
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use bytes::Bytes;
|
|||
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
|
||||
use flate2::read::GzDecoder;
|
||||
|
||||
use mutable_batch_lp::lines_to_batch;
|
||||
use mutable_batch_lp::lines_to_batches;
|
||||
|
||||
fn generate_lp_bytes() -> Bytes {
|
||||
let raw = include_bytes!("../../tests/fixtures/lineproto/read_filter.lp.gz");
|
||||
|
|
@ -23,7 +23,9 @@ pub fn write_lp(c: &mut Criterion) {
|
|||
group.bench_function(BenchmarkId::from_parameter(count), |b| {
|
||||
b.iter(|| {
|
||||
for _ in 0..*count {
|
||||
lines_to_batch(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap();
|
||||
let batches =
|
||||
lines_to_batches(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap();
|
||||
assert_eq!(batches.len(), 1);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@
|
|||
clippy::clone_on_ref_ptr
|
||||
)]
|
||||
|
||||
use hashbrown::HashMap;
|
||||
use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine};
|
||||
use mutable_batch::writer::Writer;
|
||||
use mutable_batch::MutableBatch;
|
||||
|
|
@ -36,18 +37,26 @@ pub enum Error {
|
|||
/// Result type for line protocol conversion
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// Converts the provided lines of line protocol to a [`MutableBatch`]
|
||||
pub fn lines_to_batch(lines: &str, default_time: i64) -> Result<MutableBatch> {
|
||||
let mut batch = MutableBatch::new();
|
||||
/// Converts the provided lines of line protocol to a set of [`MutableBatch`]
|
||||
/// keyed by measurement name
|
||||
pub fn lines_to_batches(lines: &str, default_time: i64) -> Result<HashMap<String, MutableBatch>> {
|
||||
let mut batches = HashMap::new();
|
||||
for (line_idx, maybe_line) in parse_lines(lines).enumerate() {
|
||||
let line = maybe_line.context(LineProtocol { line: line_idx + 1 })?;
|
||||
let measurement = line.series.measurement.as_str();
|
||||
|
||||
let (_, batch) = batches
|
||||
.raw_entry_mut()
|
||||
.from_key(measurement)
|
||||
.or_insert_with(|| (measurement.to_string(), MutableBatch::new()));
|
||||
|
||||
// TODO: Reuse writer
|
||||
let mut writer = Writer::new(&mut batch, 1);
|
||||
let mut writer = Writer::new(batch, 1);
|
||||
write_line(&mut writer, line, default_time).context(Write { line: line_idx + 1 })?;
|
||||
writer.commit();
|
||||
}
|
||||
Ok(batch)
|
||||
|
||||
Ok(batches)
|
||||
}
|
||||
|
||||
fn write_line(
|
||||
|
|
@ -95,10 +104,14 @@ mod tests {
|
|||
fn test_basic() {
|
||||
let lp = r#"cpu,tag1=v1,tag2=v2 val=2i 0
|
||||
cpu,tag1=v4,tag2=v1 val=2i 0
|
||||
mem,tag1=v2 ival=3i 0
|
||||
cpu,tag2=v2 val=3i 1
|
||||
cpu,tag1=v1,tag2=v2 fval=2.0"#;
|
||||
cpu,tag1=v1,tag2=v2 fval=2.0
|
||||
mem,tag1=v5 ival=2i 1
|
||||
"#;
|
||||
|
||||
let batch = lines_to_batch(lp, 5).unwrap();
|
||||
let batch = lines_to_batches(lp, 5).unwrap();
|
||||
assert_eq!(batch.len(), 2);
|
||||
|
||||
assert_batches_eq!(
|
||||
&[
|
||||
|
|
@ -111,7 +124,19 @@ mod tests {
|
|||
"| 2 | v1 | v2 | 1970-01-01T00:00:00.000000005Z | |",
|
||||
"+------+------+------+--------------------------------+-----+",
|
||||
],
|
||||
&[batch.to_arrow(Selection::All).unwrap()]
|
||||
&[batch["cpu"].to_arrow(Selection::All).unwrap()]
|
||||
);
|
||||
|
||||
assert_batches_eq!(
|
||||
&[
|
||||
"+------+------+--------------------------------+",
|
||||
"| ival | tag1 | time |",
|
||||
"+------+------+--------------------------------+",
|
||||
"| 3 | v2 | 1970-01-01T00:00:00Z |",
|
||||
"| 2 | v5 | 1970-01-01T00:00:00.000000001Z |",
|
||||
"+------+------+--------------------------------+",
|
||||
],
|
||||
&[batch["mem"].to_arrow(Selection::All).unwrap()]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "mutable_batch_pb"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Conversion logic for binary write protocol <-> MutableBatch"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "mutable_buffer"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
arrow = { version = "6.0", features = ["prettyprint"] }
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "object_store"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
async-trait = "0.1.42"
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "observability_deps"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Observability ecosystem dependencies for InfluxDB IOx, to ensure consistent versions and unified updates"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "packers"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
arrow = { version = "6.0", features = ["prettyprint"] }
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "panic_logging"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "parquet_catalog"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
arrow = { version = "6.0", features = ["prettyprint"] }
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "parquet_file"
|
||||
version = "0.1.0"
|
||||
authors = ["Nga Tran <nga-tran@live.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
arrow = { version = "6.0", features = ["prettyprint"] }
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "persistence_windows"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
data_types = { path = "../data_types" }
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "predicate"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
arrow = { version = "6.0", features = ["prettyprint"] }
|
||||
|
|
|
|||
|
|
@ -31,8 +31,9 @@ pub const EMPTY_PREDICATE: Predicate = Predicate {
|
|||
#[derive(Debug, Clone, Copy)]
|
||||
/// The result of evaluating a predicate on a set of rows
|
||||
pub enum PredicateMatch {
|
||||
/// There is at least one row that matches the predicate
|
||||
AtLeastOne,
|
||||
/// There is at least one row that matches the predicate that has
|
||||
/// at least one non null value in each field of the predicate
|
||||
AtLeastOneNonNullField,
|
||||
|
||||
/// There are exactly zero rows that match the predicate
|
||||
Zero,
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "query"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "IOx Query Interface and Executor"
|
||||
|
||||
# This crate is designed to be independent of the rest of the IOx
|
||||
|
|
|
|||
|
|
@ -22,20 +22,17 @@ use futures::TryStreamExt;
|
|||
use observability_deps::tracing::{debug, trace};
|
||||
use trace::{ctx::SpanContext, span::SpanRecorder};
|
||||
|
||||
use crate::{
|
||||
exec::{
|
||||
fieldlist::{FieldList, IntoFieldList},
|
||||
non_null_checker::NonNullCheckerExec,
|
||||
query_tracing::TracedStream,
|
||||
schema_pivot::{SchemaPivotExec, SchemaPivotNode},
|
||||
seriesset::{
|
||||
converter::{GroupGenerator, SeriesSetConverter},
|
||||
series::Series,
|
||||
},
|
||||
split::StreamSplitExec,
|
||||
stringset::{IntoStringSet, StringSetRef},
|
||||
use crate::exec::{
|
||||
fieldlist::{FieldList, IntoFieldList},
|
||||
non_null_checker::NonNullCheckerExec,
|
||||
query_tracing::TracedStream,
|
||||
schema_pivot::{SchemaPivotExec, SchemaPivotNode},
|
||||
seriesset::{
|
||||
converter::{GroupGenerator, SeriesSetConverter},
|
||||
series::Series,
|
||||
},
|
||||
plan::stringset::TableNamePlanBuilder,
|
||||
split::StreamSplitExec,
|
||||
stringset::{IntoStringSet, StringSetRef},
|
||||
};
|
||||
|
||||
use crate::plan::{
|
||||
|
|
@ -489,25 +486,6 @@ impl IOxExecutionContext {
|
|||
}
|
||||
}
|
||||
|
||||
/// Executes table_plans and, if returns some rows, add that table into the return list
|
||||
/// Tables discovered from meta data won't need any plan
|
||||
pub async fn to_table_names(&self, builder: TableNamePlanBuilder) -> Result<StringSetRef> {
|
||||
let ctx = self.child_ctx("to_table_names");
|
||||
|
||||
// first get all meta data tables
|
||||
let mut tables = builder.meta_data_table_names().clone();
|
||||
|
||||
// Now run each plan and if it returns data, add it table in
|
||||
let table_plans = builder.table_plans();
|
||||
for (table, plan) in table_plans {
|
||||
if !ctx.run_logical_plan(plan).await?.is_empty() {
|
||||
tables.insert(table.clone());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Arc::new(tables))
|
||||
}
|
||||
|
||||
/// Run the plan and return a record batch reader for reading the results
|
||||
pub async fn run_logical_plan(&self, plan: LogicalPlan) -> Result<Vec<RecordBatch>> {
|
||||
self.run_logical_plans(vec![plan]).await
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ use schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME};
|
|||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
|
||||
use crate::{
|
||||
exec::{field::FieldColumns, make_schema_pivot},
|
||||
exec::{field::FieldColumns, make_non_null_checker, make_schema_pivot},
|
||||
func::{
|
||||
selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput},
|
||||
window::make_window_bound_expr,
|
||||
|
|
@ -35,9 +35,7 @@ use crate::{
|
|||
plan::{
|
||||
fieldlist::FieldListPlan,
|
||||
seriesset::{SeriesSetPlan, SeriesSetPlans},
|
||||
stringset::{
|
||||
Error as StringSetError, StringSetPlan, StringSetPlanBuilder, TableNamePlanBuilder,
|
||||
},
|
||||
stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
|
||||
},
|
||||
provider::ProviderBuilder,
|
||||
QueryChunk, QueryChunkMeta, QueryDatabase,
|
||||
|
|
@ -229,11 +227,13 @@ impl InfluxRpcPlanner {
|
|||
/// . A set of plans of tables of either
|
||||
/// . chunks with deleted data or
|
||||
/// . chunks without deleted data but cannot be decided from meta data
|
||||
pub fn table_names<D>(&self, database: &D, predicate: Predicate) -> Result<TableNamePlanBuilder>
|
||||
pub fn table_names<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan>
|
||||
where
|
||||
D: QueryDatabase + 'static,
|
||||
{
|
||||
let mut builder = TableNamePlanBuilder::new();
|
||||
debug!(predicate=?predicate, "planning table_names");
|
||||
|
||||
let mut builder = StringSetPlanBuilder::new();
|
||||
let mut normalizer = PredicateNormalizer::new(predicate);
|
||||
|
||||
// Mapping between table and chunks that need full plan
|
||||
|
|
@ -244,9 +244,11 @@ impl InfluxRpcPlanner {
|
|||
for chunk in database.chunks(normalizer.unnormalized()) {
|
||||
let table_name = chunk.table_name();
|
||||
let schema = chunk.schema();
|
||||
trace!(chunk_id=%chunk.id(), table_name, "Considering table");
|
||||
|
||||
// Table is already in the returned table list, no longer needs to discover it from other chunks
|
||||
if builder.contains_meta_data_table(table_name.to_string()) {
|
||||
if builder.contains(table_name) {
|
||||
trace!("already seen");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -258,10 +260,11 @@ impl InfluxRpcPlanner {
|
|||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
} else {
|
||||
// See if we can have enough info from the chunk's meta data to answer
|
||||
// that this table participates in the request
|
||||
// See if we have enough info only from the chunk's
|
||||
// meta data to know if the table has data that
|
||||
// matches the predicate
|
||||
let predicate = normalizer.normalized(table_name, schema);
|
||||
//
|
||||
|
||||
// Try and apply the predicate using only metadata
|
||||
let pred_result = chunk
|
||||
.apply_predicate_to_metadata(&predicate)
|
||||
|
|
@ -269,38 +272,38 @@ impl InfluxRpcPlanner {
|
|||
.context(CheckingChunkPredicate {
|
||||
chunk_id: chunk.id(),
|
||||
})?;
|
||||
//
|
||||
|
||||
match pred_result {
|
||||
PredicateMatch::AtLeastOne => {
|
||||
PredicateMatch::AtLeastOneNonNullField => {
|
||||
trace!("Metadata predicate: table matches");
|
||||
// Meta data of the table covers predicates of the request
|
||||
builder.append_meta_data_table(table_name.to_string());
|
||||
builder.append_string(table_name);
|
||||
}
|
||||
PredicateMatch::Unknown => {
|
||||
trace!("Metadata predicate: unknown match");
|
||||
// We cannot answer from meta data alone whether the predicate matches, so do a full plan
|
||||
full_plan_table_chunks
|
||||
.entry(table_name.to_string())
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
}
|
||||
PredicateMatch::Zero => {} // this chunk's table does not participate in the request
|
||||
PredicateMatch::Zero => {
|
||||
trace!("Metadata predicate: zero rows match");
|
||||
} // this chunk's table does not participate in the request
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove items from full_plan_table_chunks whose tables are already in the returned list
|
||||
let meta_data_tables = builder.meta_data_table_names();
|
||||
for table in meta_data_tables {
|
||||
full_plan_table_chunks.remove(&table);
|
||||
// remove items from full_plan_table_chunks whose tables are
|
||||
// already in the returned list
|
||||
for table in builder.known_strings_iter() {
|
||||
trace!(%table, "Table is known to have matches, skipping plan");
|
||||
full_plan_table_chunks.remove(table);
|
||||
if full_plan_table_chunks.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// No full plans needed
|
||||
if full_plan_table_chunks.is_empty() {
|
||||
return Ok(builder);
|
||||
}
|
||||
|
||||
// Now build plans for full-plan tables
|
||||
for (table_name, chunks) in full_plan_table_chunks {
|
||||
let schema = database.table_schema(&table_name).context(TableRemoved {
|
||||
|
|
@ -309,11 +312,11 @@ impl InfluxRpcPlanner {
|
|||
if let Some(plan) =
|
||||
self.table_name_plan(&table_name, schema, &mut normalizer, chunks)?
|
||||
{
|
||||
builder.append_plans(table_name, plan);
|
||||
builder = builder.append_other(plan.into());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(builder)
|
||||
builder.build().context(CreatingStringSet)
|
||||
}
|
||||
|
||||
/// Returns a set of plans that produces the names of "tag"
|
||||
|
|
@ -427,14 +430,14 @@ impl InfluxRpcPlanner {
|
|||
let plan = self.tag_keys_plan(&table_name, schema, &mut normalizer, chunks)?;
|
||||
|
||||
if let Some(plan) = plan {
|
||||
builder = builder.append(plan)
|
||||
builder = builder.append_other(plan)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add the known columns we could find from metadata only
|
||||
builder
|
||||
.append(known_columns.into())
|
||||
.append_other(known_columns.into())
|
||||
.build()
|
||||
.context(CreatingStringSet)
|
||||
}
|
||||
|
|
@ -596,13 +599,13 @@ impl InfluxRpcPlanner {
|
|||
.build()
|
||||
.context(BuildingPlan)?;
|
||||
|
||||
builder = builder.append(plan.into());
|
||||
builder = builder.append_other(plan.into());
|
||||
}
|
||||
}
|
||||
|
||||
// add the known values we could find from metadata only
|
||||
builder
|
||||
.append(known_values.into())
|
||||
.append_other(known_values.into())
|
||||
.build()
|
||||
.context(CreatingStringSet)
|
||||
}
|
||||
|
|
@ -830,19 +833,13 @@ impl InfluxRpcPlanner {
|
|||
chunk_id: chunk.id(),
|
||||
})?;
|
||||
|
||||
match pred_result {
|
||||
PredicateMatch::AtLeastOne |
|
||||
if !matches!(pred_result, PredicateMatch::Zero) {
|
||||
// have to include chunk as we can't rule it out
|
||||
PredicateMatch::Unknown => {
|
||||
let table_name = chunk.table_name().to_string();
|
||||
table_chunks
|
||||
.entry(table_name)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
}
|
||||
// Skip chunk here based on metadata
|
||||
PredicateMatch::Zero => {
|
||||
}
|
||||
let table_name = chunk.table_name().to_string();
|
||||
table_chunks
|
||||
.entry(table_name)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
}
|
||||
}
|
||||
Ok(table_chunks)
|
||||
|
|
@ -960,11 +957,11 @@ impl InfluxRpcPlanner {
|
|||
Ok(Some(plan))
|
||||
}
|
||||
|
||||
/// Creates a DataFusion LogicalPlan that returns the timestamp
|
||||
/// for a specified table:
|
||||
/// Creates a DataFusion LogicalPlan that returns the values in
|
||||
/// the fields for a specified table:
|
||||
///
|
||||
/// The output looks like (time)
|
||||
/// The time column is chosen because it must be included in all tables
|
||||
/// The output produces the table name as a single string if any
|
||||
/// non null values are passed in.
|
||||
///
|
||||
/// The data is not sorted in any particular order
|
||||
///
|
||||
|
|
@ -974,13 +971,11 @@ impl InfluxRpcPlanner {
|
|||
/// The created plan looks like:
|
||||
///
|
||||
/// ```text
|
||||
/// Projection (select time)
|
||||
/// NonNullChecker
|
||||
/// Projection (select fields)
|
||||
/// Filter(predicate) [optional]
|
||||
/// Scan
|
||||
/// ```
|
||||
// TODO: for optimization in the future, build `select count(*)` plan instead,
|
||||
// ,but if we do this, we also need to change the way we handle output
|
||||
// of the function invoking this because it will always return a number
|
||||
fn table_name_plan<C>(
|
||||
&self,
|
||||
table_name: &str,
|
||||
|
|
@ -991,6 +986,7 @@ impl InfluxRpcPlanner {
|
|||
where
|
||||
C: QueryChunk + 'static,
|
||||
{
|
||||
debug!(%table_name, "Creating table_name full plan");
|
||||
let scan_and_filter = self.scan_and_filter(table_name, schema, normalizer, chunks)?;
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
|
@ -1000,15 +996,11 @@ impl InfluxRpcPlanner {
|
|||
Some(t) => t,
|
||||
};
|
||||
|
||||
// Selection of only time
|
||||
let select_exprs = schema
|
||||
.iter()
|
||||
.filter_map(|(influx_column_type, field)| match influx_column_type {
|
||||
Some(InfluxColumnType::Timestamp) => Some(col(field.name())),
|
||||
Some(_) => None,
|
||||
None => None,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
// Select only fields requested
|
||||
let predicate = normalizer.normalized(table_name, Arc::clone(&schema));
|
||||
let select_exprs: Vec<_> = filtered_fields_iter(&schema, &predicate)
|
||||
.map(|field| col(field.name().as_str()))
|
||||
.collect();
|
||||
|
||||
let plan = plan_builder
|
||||
.project(select_exprs)
|
||||
|
|
@ -1016,6 +1008,9 @@ impl InfluxRpcPlanner {
|
|||
.build()
|
||||
.context(BuildingPlan)?;
|
||||
|
||||
// Add the final node that outputs the table name if any non null values are passed in
|
||||
let plan = make_non_null_checker(table_name, plan);
|
||||
|
||||
Ok(Some(plan))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use std::{collections::BTreeMap, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_util::util::str_iter_to_batch;
|
||||
use datafusion::logical_plan::LogicalPlan;
|
||||
|
|
@ -97,7 +97,7 @@ impl StringSetPlanBuilder {
|
|||
|
||||
/// Append the strings from the passed plan into ourselves if possible, or
|
||||
/// passes on the plan
|
||||
pub fn append(mut self, other: StringSetPlan) -> Self {
|
||||
pub fn append_other(mut self, other: StringSetPlan) -> Self {
|
||||
match other {
|
||||
StringSetPlan::Known(ssref) => match Arc::try_unwrap(ssref) {
|
||||
Ok(mut ss) => {
|
||||
|
|
@ -117,6 +117,23 @@ impl StringSetPlanBuilder {
|
|||
self
|
||||
}
|
||||
|
||||
/// Return true if we know already that `s` is contained in the
|
||||
/// StringSet. Note that if `contains()` returns false, `s` may be
|
||||
/// in the stringset after execution.
|
||||
pub fn contains(&self, s: impl AsRef<str>) -> bool {
|
||||
self.strings.contains(s.as_ref())
|
||||
}
|
||||
|
||||
/// Append a single string to the known set of strings in this builder
|
||||
pub fn append_string(&mut self, s: impl Into<String>) {
|
||||
self.strings.insert(s.into());
|
||||
}
|
||||
|
||||
/// returns an iterator over the currently known strings in this builder
|
||||
pub fn known_strings_iter(&self) -> impl Iterator<Item = &String> {
|
||||
self.strings.iter()
|
||||
}
|
||||
|
||||
/// Create a StringSetPlan that produces the deduplicated (union)
|
||||
/// of all plans `append`ed to this builder.
|
||||
pub fn build(self) -> Result<StringSetPlan> {
|
||||
|
|
@ -143,39 +160,6 @@ impl StringSetPlanBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct TableNamePlanBuilder {
|
||||
/// Known tables achieved from meta data
|
||||
meta_data_tables: StringSet,
|
||||
/// Other tables and their general plans
|
||||
plans: BTreeMap<String, LogicalPlan>,
|
||||
}
|
||||
|
||||
impl TableNamePlanBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
pub fn append_meta_data_table(&mut self, table: String) {
|
||||
self.meta_data_tables.insert(table);
|
||||
}
|
||||
|
||||
pub fn append_plans(&mut self, table_name: String, plan: LogicalPlan) {
|
||||
self.plans.insert(table_name, plan);
|
||||
}
|
||||
|
||||
pub fn contains_meta_data_table(&self, table: String) -> bool {
|
||||
self.meta_data_tables.contains(&table)
|
||||
}
|
||||
|
||||
pub fn meta_data_table_names(&self) -> StringSet {
|
||||
self.meta_data_tables.clone()
|
||||
}
|
||||
|
||||
pub fn table_plans(&self) -> BTreeMap<String, LogicalPlan> {
|
||||
self.plans.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::exec::{Executor, ExecutorType};
|
||||
|
|
@ -196,8 +180,8 @@ mod tests {
|
|||
#[test]
|
||||
fn test_builder_strings_only() {
|
||||
let plan = StringSetPlanBuilder::new()
|
||||
.append(to_string_set(&["foo", "bar"]).into())
|
||||
.append(to_string_set(&["bar", "baz"]).into())
|
||||
.append_other(to_string_set(&["foo", "bar"]).into())
|
||||
.append_other(to_string_set(&["bar", "baz"]).into())
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -228,9 +212,9 @@ mod tests {
|
|||
|
||||
// when a df plan is appended the whole plan should be different
|
||||
let plan = StringSetPlanBuilder::new()
|
||||
.append(to_string_set(&["foo", "bar"]).into())
|
||||
.append(vec![df_plan].into())
|
||||
.append(to_string_set(&["baz"]).into())
|
||||
.append_other(to_string_set(&["foo", "bar"]).into())
|
||||
.append_other(vec![df_plan].into())
|
||||
.append_other(to_string_set(&["baz"]).into())
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "query_tests"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Tests of the query engine against different database configurations"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
|
|
|||
|
|
@ -3,9 +3,9 @@ name = "generate"
|
|||
description = "Creates rust #tests for files in .sql"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
|
||||
# Note this is a standalone binary and not part of the overall workspace
|
||||
[workspace]
|
||||
[workspace]
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
//! Tests for the Influx gRPC queries
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
use predicate::predicate::{Predicate, PredicateBuilder, EMPTY_PREDICATE};
|
||||
use query::{
|
||||
exec::stringset::{IntoStringSet, StringSetRef},
|
||||
|
|
@ -24,11 +25,12 @@ where
|
|||
let planner = InfluxRpcPlanner::new();
|
||||
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
|
||||
|
||||
let builder = planner
|
||||
let plan = planner
|
||||
.table_names(db.as_ref(), predicate.clone())
|
||||
.expect("built plan successfully");
|
||||
|
||||
let names = ctx
|
||||
.to_table_names(builder)
|
||||
.to_string_set(plan)
|
||||
.await
|
||||
.expect("converted plan to strings successfully");
|
||||
|
||||
|
|
@ -53,6 +55,49 @@ async fn list_table_names_no_data_pred() {
|
|||
run_table_names_test_case(TwoMeasurements {}, EMPTY_PREDICATE, vec!["cpu", "disk"]).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_table_names_no_data_passes() {
|
||||
// no rows pass this predicate
|
||||
run_table_names_test_case(
|
||||
TwoMeasurementsManyFields {},
|
||||
tsp(10000000, 20000000),
|
||||
vec![],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_table_names_no_non_null_data_passes() {
|
||||
// only a single row with a null field passes this predicate (expect no table names)
|
||||
let predicate = PredicateBuilder::default()
|
||||
.table("o2")
|
||||
// only get last row of o2 (timestamp = 300)
|
||||
.timestamp_range(200, 400)
|
||||
// model predicate like _field='reading' which last row does not have
|
||||
.field_columns(vec!["reading"])
|
||||
.build();
|
||||
|
||||
run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_table_names_no_non_null_general_data_passes() {
|
||||
// only a single row with a null field passes this predicate
|
||||
// (expect no table names) -- has a general purpose predicate to
|
||||
// force a generic plan
|
||||
let predicate = PredicateBuilder::default()
|
||||
.table("o2")
|
||||
// only get last row of o2 (timestamp = 300)
|
||||
.timestamp_range(200, 400)
|
||||
// model predicate like _field='reading' which last row does not have
|
||||
.field_columns(vec!["reading"])
|
||||
// (state = CA) OR (temp > 50)
|
||||
.add_expr(col("state").eq(lit("CA")).or(col("temp").gt(lit(50))))
|
||||
.build();
|
||||
|
||||
run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_table_names_no_data_pred_with_delete() {
|
||||
run_table_names_test_case(
|
||||
|
|
|
|||
|
|
@ -103,11 +103,11 @@ async fn chunk_pruning_influxrpc() {
|
|||
|
||||
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
|
||||
|
||||
let builder = InfluxRpcPlanner::new()
|
||||
let plan = InfluxRpcPlanner::new()
|
||||
.table_names(db.as_ref(), predicate)
|
||||
.unwrap();
|
||||
|
||||
let result = ctx.to_table_names(builder).await.unwrap();
|
||||
let result = ctx.to_string_set(plan).await.unwrap();
|
||||
|
||||
assert_eq!(&expected, result.as_ref());
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "read_buffer"
|
||||
version = "0.1.0"
|
||||
authors = ["Edd Robinson <me@edd.io>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
# Note this crate is designed to be standalone, and should not depend
|
||||
# on the IOx Query Engine. The rationale is:
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
# Unstable features not yet supported on stable Rust
|
||||
#wrap_comments = true
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "schema"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "IOx Schema definition"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "server"
|
||||
version = "0.1.0"
|
||||
authors = ["pauldix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
arrow = { version = "6.0", features = ["prettyprint"] }
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ use object_store::ObjectStore;
|
|||
use observability_deps::tracing::info;
|
||||
use query::exec::Executor;
|
||||
use time::TimeProvider;
|
||||
use trace::TraceCollector;
|
||||
use write_buffer::config::WriteBufferConfigFactory;
|
||||
|
||||
use crate::JobRegistry;
|
||||
|
|
@ -18,13 +19,18 @@ pub struct ApplicationState {
|
|||
job_registry: Arc<JobRegistry>,
|
||||
metric_registry: Arc<metric::Registry>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
}
|
||||
|
||||
impl ApplicationState {
|
||||
/// Creates a new `ApplicationState`
|
||||
///
|
||||
/// Uses number of CPUs in the system if num_worker_threads is not set
|
||||
pub fn new(object_store: Arc<ObjectStore>, num_worker_threads: Option<usize>) -> Self {
|
||||
pub fn new(
|
||||
object_store: Arc<ObjectStore>,
|
||||
num_worker_threads: Option<usize>,
|
||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
) -> Self {
|
||||
let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
|
||||
info!(%num_threads, "using specified number of threads per thread pool");
|
||||
|
||||
|
|
@ -45,6 +51,7 @@ impl ApplicationState {
|
|||
job_registry,
|
||||
metric_registry,
|
||||
time_provider,
|
||||
trace_collector,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -68,6 +75,10 @@ impl ApplicationState {
|
|||
&self.time_provider
|
||||
}
|
||||
|
||||
/// Returns the optional trace collector held by this `ApplicationState`
/// (`None` when no collector is set).
pub fn trace_collector(&self) -> &Option<Arc<dyn TraceCollector>> {
|
||||
&self.trace_collector
|
||||
}
|
||||
|
||||
/// Returns the shared `Executor` held by this `ApplicationState`.
pub fn executor(&self) -> &Arc<Executor> {
|
||||
&self.executor
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,7 +30,6 @@ use std::{future::Future, sync::Arc, time::Duration};
|
|||
use tokio::{sync::Notify, task::JoinError};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use trace::ctx::SpanContext;
|
||||
use trace::{RingBufferTraceCollector, TraceCollector};
|
||||
use uuid::Uuid;
|
||||
|
||||
const INIT_BACKOFF: Duration = Duration::from_secs(1);
|
||||
|
|
@ -1312,10 +1311,8 @@ impl DatabaseStateCatalogLoaded {
|
|||
) -> Result<DatabaseStateInitialized, InitError> {
|
||||
let db = Arc::clone(&self.db);
|
||||
|
||||
// TODO: use proper trace collector
|
||||
let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
|
||||
|
||||
let rules = self.provided_rules.rules();
|
||||
let trace_collector = shared.application.trace_collector();
|
||||
let write_buffer_factory = shared.application.write_buffer_factory();
|
||||
let write_buffer_consumer = match rules.write_buffer_connection.as_ref() {
|
||||
Some(connection) if matches!(connection.direction, WriteBufferDirection::Read) => {
|
||||
|
|
@ -1323,7 +1320,7 @@ impl DatabaseStateCatalogLoaded {
|
|||
.new_config_read(
|
||||
shared.config.server_id,
|
||||
shared.config.name.as_str(),
|
||||
Some(&trace_collector),
|
||||
trace_collector.as_ref(),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
|
|
@ -1375,13 +1372,14 @@ impl DatabaseStateInitialized {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::test_utils::make_application;
|
||||
|
||||
use super::*;
|
||||
use data_types::database_rules::{
|
||||
PartitionTemplate, TemplatePart, WriteBufferConnection, WriteBufferDirection,
|
||||
};
|
||||
use data_types::sequence::Sequence;
|
||||
use entry::{test_helpers::lp_to_entries, SequencedEntry};
|
||||
use object_store::ObjectStore;
|
||||
use std::{
|
||||
convert::{TryFrom, TryInto},
|
||||
num::NonZeroU32,
|
||||
|
|
@ -1393,10 +1391,7 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn database_shutdown_waits_for_jobs() {
|
||||
let application = Arc::new(ApplicationState::new(
|
||||
Arc::new(ObjectStore::new_in_memory()),
|
||||
None,
|
||||
));
|
||||
let application = make_application();
|
||||
|
||||
let database = Database::new(
|
||||
Arc::clone(&application),
|
||||
|
|
@ -1454,10 +1449,7 @@ mod tests {
|
|||
|
||||
async fn initialized_database() -> (Arc<ApplicationState>, Database) {
|
||||
let server_id = ServerId::try_from(1).unwrap();
|
||||
let application = Arc::new(ApplicationState::new(
|
||||
Arc::new(ObjectStore::new_in_memory()),
|
||||
None,
|
||||
));
|
||||
let application = make_application();
|
||||
|
||||
let db_name = DatabaseName::new("test").unwrap();
|
||||
let uuid = Uuid::new_v4();
|
||||
|
|
@ -1594,10 +1586,7 @@ mod tests {
|
|||
));
|
||||
|
||||
// setup application
|
||||
let application = Arc::new(ApplicationState::new(
|
||||
Arc::new(ObjectStore::new_in_memory()),
|
||||
None,
|
||||
));
|
||||
let application = make_application();
|
||||
application
|
||||
.write_buffer_factory()
|
||||
.register_mock("my_mock".to_string(), state.clone());
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ use predicate::{
|
|||
};
|
||||
use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
|
||||
use read_buffer::RBChunk;
|
||||
use schema::InfluxColumnType;
|
||||
use schema::{selection::Selection, sort::SortKey, Schema};
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::{
|
||||
|
|
@ -237,6 +238,31 @@ impl DbChunk {
|
|||
debug!(?rub_preds, "RUB delete predicates");
|
||||
Ok(rub_preds)
|
||||
}
|
||||
|
||||
/// Return true if any of the fields called for in the `predicate`
|
||||
/// contain at least 1 null value. Returns false ONLY if all
|
||||
/// fields that pass `predicate` are entirely non null
|
||||
fn fields_have_nulls(&self, predicate: &Predicate) -> bool {
|
||||
self.meta.schema.iter().any(|(influx_column_type, field)| {
|
||||
if matches!(influx_column_type, Some(InfluxColumnType::Field(_)))
|
||||
&& predicate.should_include_field(field.name())
|
||||
{
|
||||
match self.meta.table_summary.column(field.name()) {
|
||||
Some(column_summary) => {
|
||||
// only if this is false can we return false
|
||||
column_summary.null_count() > 0
|
||||
}
|
||||
None => {
|
||||
// don't know the stats for this column, so assume there can be nulls
|
||||
true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// not a field column
|
||||
false
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl QueryChunk for DbChunk {
|
||||
|
|
@ -264,23 +290,12 @@ impl QueryChunk for DbChunk {
|
|||
return Ok(PredicateMatch::Zero);
|
||||
}
|
||||
|
||||
// TODO apply predicate pruning here...
|
||||
|
||||
let pred_result = match &self.state {
|
||||
State::MutableBuffer { chunk, .. } => {
|
||||
if predicate.has_exprs() {
|
||||
// TODO: Support more predicates
|
||||
if predicate.has_exprs() || chunk.has_timerange(&predicate.range) {
|
||||
// TODO some more work to figure out if we
|
||||
// definitely have / do not have a result
|
||||
PredicateMatch::Unknown
|
||||
} else if chunk.has_timerange(&predicate.range) {
|
||||
// Note: this isn't precise / correct: if the
|
||||
// chunk has the timerange, some other part of the
|
||||
// predicate may rule out the rows, and thus
|
||||
// without further work this clause should return
|
||||
// "Unknown" rather than falsely claiming that
|
||||
// there is at least one row:
|
||||
//
|
||||
// https://github.com/influxdata/influxdb_iox/issues/1590
|
||||
PredicateMatch::AtLeastOne
|
||||
} else {
|
||||
PredicateMatch::Zero
|
||||
}
|
||||
|
|
@ -305,19 +320,21 @@ impl QueryChunk for DbChunk {
|
|||
// on meta-data only. This should be possible without needing to
|
||||
// know the execution engine the chunk is held in.
|
||||
if chunk.satisfies_predicate(&rb_predicate) {
|
||||
PredicateMatch::AtLeastOne
|
||||
// if any of the fields referred to in the
|
||||
// predicate has nulls, don't know without more
|
||||
// work if the rows that matched had non null values
|
||||
if self.fields_have_nulls(predicate) {
|
||||
PredicateMatch::Unknown
|
||||
} else {
|
||||
PredicateMatch::AtLeastOneNonNullField
|
||||
}
|
||||
} else {
|
||||
PredicateMatch::Zero
|
||||
}
|
||||
}
|
||||
State::ParquetFile { chunk, .. } => {
|
||||
if predicate.has_exprs() {
|
||||
// TODO: Support more predicates
|
||||
if predicate.has_exprs() || chunk.has_timerange(predicate.range.as_ref()) {
|
||||
PredicateMatch::Unknown
|
||||
} else if chunk.has_timerange(predicate.range.as_ref()) {
|
||||
// As above, this should really be "Unknown" rather than AtLeastOne
|
||||
// for precision / correctness.
|
||||
PredicateMatch::AtLeastOne
|
||||
} else {
|
||||
PredicateMatch::Zero
|
||||
}
|
||||
|
|
|
|||
|
|
@ -408,14 +408,11 @@ impl SequenceNumberSection {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
use crate::{
|
||||
lifecycle::LifecycleWorker,
|
||||
utils::{TestDb, TestDbBuilder},
|
||||
write_buffer::WriteBufferConsumer,
|
||||
};
|
||||
|
||||
use arrow_util::assert_batches_eq;
|
||||
use data_types::{
|
||||
database_rules::{PartitionTemplate, Partitioner, TemplatePart},
|
||||
|
|
@ -432,16 +429,18 @@ mod tests {
|
|||
min_max_sequence::OptionalMinMaxSequence,
|
||||
};
|
||||
use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner};
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use test_helpers::{assert_contains, assert_not_contains, tracing::TracingCapture};
|
||||
use time::{Time, TimeProvider};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
|
||||
|
||||
use crate::lifecycle::LifecycleWorker;
|
||||
use crate::utils::TestDb;
|
||||
use crate::write_buffer::WriteBufferConsumer;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestSequencedEntry {
|
||||
sequencer_id: u32,
|
||||
|
|
@ -572,15 +571,17 @@ mod tests {
|
|||
let write_buffer_state =
|
||||
MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers);
|
||||
|
||||
let (mut test_db, mut shutdown, mut join_handle) = Self::create_test_db(
|
||||
let test_db_builder = Self::create_test_db_builder(
|
||||
Arc::clone(&object_store),
|
||||
server_id,
|
||||
db_name,
|
||||
partition_template.clone(),
|
||||
self.catalog_transactions_until_checkpoint,
|
||||
Arc::<time::MockProvider>::clone(&time),
|
||||
)
|
||||
.await;
|
||||
);
|
||||
|
||||
let (mut test_db, mut shutdown, mut join_handle) =
|
||||
Self::create_test_db(&test_db_builder).await;
|
||||
|
||||
let mut lifecycle = LifecycleWorker::new(Arc::clone(&test_db.db));
|
||||
|
||||
|
|
@ -620,15 +621,8 @@ mod tests {
|
|||
drop(test_db);
|
||||
|
||||
// then create new one
|
||||
let (test_db_tmp, shutdown_tmp, join_handle_tmp) = Self::create_test_db(
|
||||
Arc::clone(&object_store),
|
||||
server_id,
|
||||
db_name,
|
||||
partition_template.clone(),
|
||||
self.catalog_transactions_until_checkpoint,
|
||||
Arc::<time::MockProvider>::clone(&time),
|
||||
)
|
||||
.await;
|
||||
let (test_db_tmp, shutdown_tmp, join_handle_tmp) =
|
||||
Self::create_test_db(&test_db_builder).await;
|
||||
test_db = test_db_tmp;
|
||||
shutdown = shutdown_tmp;
|
||||
join_handle = join_handle_tmp;
|
||||
|
|
@ -759,14 +753,29 @@ mod tests {
|
|||
}
|
||||
|
||||
async fn create_test_db(
|
||||
builder: &TestDbBuilder,
|
||||
) -> (TestDb, CancellationToken, JoinHandle<()>) {
|
||||
let test_db = builder.build().await;
|
||||
// start background worker
|
||||
|
||||
let shutdown: CancellationToken = Default::default();
|
||||
let shutdown_captured = shutdown.clone();
|
||||
let db_captured = Arc::clone(&test_db.db);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
|
||||
|
||||
(test_db, shutdown, join_handle)
|
||||
}
|
||||
|
||||
fn create_test_db_builder(
|
||||
object_store: Arc<ObjectStore>,
|
||||
server_id: ServerId,
|
||||
db_name: &'static str,
|
||||
partition_template: PartitionTemplate,
|
||||
catalog_transactions_until_checkpoint: NonZeroU64,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
) -> (TestDb, CancellationToken, JoinHandle<()>) {
|
||||
let test_db = TestDb::builder()
|
||||
) -> TestDbBuilder {
|
||||
TestDb::builder()
|
||||
.object_store(object_store)
|
||||
.server_id(server_id)
|
||||
.lifecycle_rules(data_types::database_rules::LifecycleRules {
|
||||
|
|
@ -779,17 +788,6 @@ mod tests {
|
|||
.partition_template(partition_template)
|
||||
.time_provider(time_provider)
|
||||
.db_name(db_name)
|
||||
.build()
|
||||
.await;
|
||||
|
||||
// start background worker
|
||||
let shutdown: CancellationToken = Default::default();
|
||||
let shutdown_captured = shutdown.clone();
|
||||
let db_captured = Arc::clone(&test_db.db);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
|
||||
|
||||
(test_db, shutdown, join_handle)
|
||||
}
|
||||
|
||||
/// Evaluates given checks.
|
||||
|
|
|
|||
|
|
@ -876,6 +876,9 @@ where
|
|||
// immediately to the client and abort all other outstanding requests.
|
||||
futures_util::future::try_join_all(sharded_entries.into_iter().map(
|
||||
|sharded_entry| async {
|
||||
// capture entire entry in closure
|
||||
let sharded_entry = sharded_entry;
|
||||
|
||||
let sink = match &rules.routing_rules {
|
||||
Some(RoutingRules::ShardConfig(shard_config)) => {
|
||||
let id = sharded_entry.shard_id.expect("sharded entry");
|
||||
|
|
@ -1346,6 +1349,7 @@ pub mod test_utils {
|
|||
Arc::new(ApplicationState::new(
|
||||
Arc::new(ObjectStore::new_in_memory()),
|
||||
None,
|
||||
None,
|
||||
))
|
||||
}
|
||||
|
||||
|
|
@ -2078,7 +2082,7 @@ mod tests {
|
|||
async fn init_error_generic() {
|
||||
// use an object store that will hopefully fail to read
|
||||
let store = Arc::new(ObjectStore::new_failing_store().unwrap());
|
||||
let application = Arc::new(ApplicationState::new(store, None));
|
||||
let application = Arc::new(ApplicationState::new(store, None, None));
|
||||
let server = make_server(application);
|
||||
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ use tokio_util::sync::CancellationToken;
|
|||
|
||||
use entry::SequencedEntry;
|
||||
use observability_deps::tracing::{debug, error, info, warn};
|
||||
use trace::span::SpanRecorder;
|
||||
use write_buffer::core::{FetchHighWatermark, WriteBufferError, WriteBufferReading};
|
||||
|
||||
use crate::Db;
|
||||
|
|
@ -151,12 +152,20 @@ async fn stream_in_sequenced_entries<'a>(
|
|||
// store entry
|
||||
let mut logged_hard_limit = false;
|
||||
loop {
|
||||
let mut span_recorder = SpanRecorder::new(
|
||||
sequenced_entry
|
||||
.span_context()
|
||||
.map(|parent| parent.child("IOx write buffer")),
|
||||
);
|
||||
|
||||
match db.store_sequenced_entry(
|
||||
Arc::clone(&sequenced_entry),
|
||||
crate::db::filter_table_batch_keep_all,
|
||||
) {
|
||||
Ok(_) => {
|
||||
metrics.success();
|
||||
span_recorder.ok("stored entry");
|
||||
|
||||
break;
|
||||
}
|
||||
Err(crate::db::Error::HardLimitReached {}) => {
|
||||
|
|
@ -169,6 +178,8 @@ async fn stream_in_sequenced_entries<'a>(
|
|||
);
|
||||
logged_hard_limit = true;
|
||||
}
|
||||
span_recorder.error("hard limit reached");
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
continue;
|
||||
}
|
||||
|
|
@ -179,6 +190,7 @@ async fn stream_in_sequenced_entries<'a>(
|
|||
sequencer_id,
|
||||
"Error storing SequencedEntry from write buffer in database"
|
||||
);
|
||||
span_recorder.error("cannot store entry");
|
||||
|
||||
// no retry
|
||||
break;
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "server_benchmarks"
|
||||
version = "0.1.0"
|
||||
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Server related bechmarks, grouped into their own crate to minimize build dev build times"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
|
|
|||
|
|
@ -79,7 +79,10 @@ async fn wait_for_signal() {
|
|||
let _ = tokio::signal::ctrl_c().await;
|
||||
}
|
||||
|
||||
async fn make_application(config: &Config) -> Result<Arc<ApplicationState>> {
|
||||
async fn make_application(
|
||||
config: &Config,
|
||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
) -> Result<Arc<ApplicationState>> {
|
||||
warn_about_inmem_store(&config.object_store_config);
|
||||
let object_store =
|
||||
ObjectStore::try_from(&config.object_store_config).context(ObjectStoreParsing)?;
|
||||
|
|
@ -91,6 +94,7 @@ async fn make_application(config: &Config) -> Result<Arc<ApplicationState>> {
|
|||
Ok(Arc::new(ApplicationState::new(
|
||||
object_storage,
|
||||
config.num_worker_threads,
|
||||
trace_collector,
|
||||
)))
|
||||
}
|
||||
|
||||
|
|
@ -178,7 +182,11 @@ pub async fn main(config: Config) -> Result<()> {
|
|||
let f = SendPanicsToTracing::new();
|
||||
std::mem::forget(f);
|
||||
|
||||
let application = make_application(&config).await?;
|
||||
let async_exporter = config.tracing_config.build().context(Tracing)?;
|
||||
let trace_collector = async_exporter
|
||||
.clone()
|
||||
.map(|x| -> Arc<dyn TraceCollector> { x });
|
||||
let application = make_application(&config, trace_collector).await?;
|
||||
|
||||
// Register jemalloc metrics
|
||||
application
|
||||
|
|
@ -189,17 +197,12 @@ pub async fn main(config: Config) -> Result<()> {
|
|||
|
||||
let grpc_listener = grpc_listener(config.grpc_bind_address).await?;
|
||||
let http_listener = http_listener(config.http_bind_address).await?;
|
||||
let async_exporter = config.tracing_config.build().context(Tracing)?;
|
||||
let trace_collector = async_exporter
|
||||
.clone()
|
||||
.map(|x| -> Arc<dyn TraceCollector> { x });
|
||||
|
||||
let r = serve(
|
||||
config,
|
||||
application,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
trace_collector,
|
||||
app_server,
|
||||
)
|
||||
.await;
|
||||
|
|
@ -241,7 +244,6 @@ async fn serve(
|
|||
application: Arc<ApplicationState>,
|
||||
grpc_listener: tokio::net::TcpListener,
|
||||
http_listener: AddrIncoming,
|
||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
app_server: Arc<AppServer<ConnectionManager>>,
|
||||
) -> Result<()> {
|
||||
// Construct a token to trigger shutdown of API services
|
||||
|
|
@ -262,7 +264,6 @@ async fn serve(
|
|||
Arc::clone(&application),
|
||||
Arc::clone(&app_server),
|
||||
trace_header_parser.clone(),
|
||||
trace_collector.clone(),
|
||||
frontend_shutdown.clone(),
|
||||
config.initial_serving_state.into(),
|
||||
)
|
||||
|
|
@ -279,7 +280,6 @@ async fn serve(
|
|||
frontend_shutdown.clone(),
|
||||
max_http_request_size,
|
||||
trace_header_parser,
|
||||
trace_collector,
|
||||
)
|
||||
.fuse();
|
||||
info!("HTTP server listening");
|
||||
|
|
@ -381,7 +381,7 @@ mod tests {
|
|||
use super::*;
|
||||
use ::http::{header::HeaderName, HeaderValue};
|
||||
use data_types::{database_rules::DatabaseRules, DatabaseName};
|
||||
use influxdb_iox_client::connection::Connection;
|
||||
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
use std::{convert::TryInto, num::NonZeroU64};
|
||||
use structopt::StructOpt;
|
||||
|
|
@ -412,16 +412,9 @@ mod tests {
|
|||
let grpc_listener = grpc_listener(config.grpc_bind_address).await.unwrap();
|
||||
let http_listener = http_listener(config.grpc_bind_address).await.unwrap();
|
||||
|
||||
serve(
|
||||
config,
|
||||
application,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
None,
|
||||
server,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
serve(config, application, grpc_listener, http_listener, server)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -430,7 +423,7 @@ mod tests {
|
|||
|
||||
// Create a server and wait for it to initialize
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
|
|
@ -458,7 +451,7 @@ mod tests {
|
|||
async fn test_server_shutdown_uninit() {
|
||||
// Create a server but don't set a server id
|
||||
let config = test_config(None);
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
|
||||
let serve_fut = test_serve(config, application, Arc::clone(&server)).fuse();
|
||||
|
|
@ -489,7 +482,7 @@ mod tests {
|
|||
async fn test_server_panic() {
|
||||
// Create a server and wait for it to initialize
|
||||
let config = test_config(Some(999999999));
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
|
|
@ -516,7 +509,7 @@ mod tests {
|
|||
async fn test_database_panic() {
|
||||
// Create a server and wait for it to initialize
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
|
|
@ -597,7 +590,9 @@ mod tests {
|
|||
JoinHandle<Result<()>>,
|
||||
) {
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let application = make_application(&config, Some(Arc::<T>::clone(collector)))
|
||||
.await
|
||||
.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
|
|
@ -611,7 +606,6 @@ mod tests {
|
|||
application,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
Some(Arc::<T>::clone(collector)),
|
||||
Arc::clone(&server),
|
||||
);
|
||||
|
||||
|
|
@ -690,6 +684,11 @@ mod tests {
|
|||
join.await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
/// Ensure that query is fully executed.
|
||||
async fn consume_query(mut query: PerformQuery) {
|
||||
while query.next().await.unwrap().is_some() {}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_tracing() {
|
||||
let collector = Arc::new(RingBufferTraceCollector::new(100));
|
||||
|
|
@ -721,10 +720,13 @@ mod tests {
|
|||
.unwrap();
|
||||
|
||||
let mut flight = influxdb_iox_client::flight::Client::new(conn.clone());
|
||||
flight
|
||||
.perform_query(db_info.db_name(), "select * from cpu;")
|
||||
.await
|
||||
.unwrap();
|
||||
consume_query(
|
||||
flight
|
||||
.perform_query(db_info.db_name(), "select * from cpu;")
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
|
||||
flight
|
||||
.perform_query("nonexistent", "select * from cpu;")
|
||||
|
|
|
|||
|
|
@ -52,7 +52,6 @@ use std::{
|
|||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tower::Layer;
|
||||
use trace::TraceCollector;
|
||||
use trace_http::tower::TraceLayer;
|
||||
|
||||
/// Constants used in API error codes.
|
||||
|
|
@ -865,12 +864,12 @@ pub async fn serve<M>(
|
|||
shutdown: CancellationToken,
|
||||
max_request_size: usize,
|
||||
trace_header_parser: TraceHeaderParser,
|
||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
) -> Result<(), hyper::Error>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
let metric_registry = Arc::clone(application.metric_registry());
|
||||
let trace_collector = application.trace_collector().clone();
|
||||
|
||||
let trace_layer = TraceLayer::new(trace_header_parser, metric_registry, trace_collector, false);
|
||||
let lp_metrics = Arc::new(LineProtocolMetrics::new(
|
||||
|
|
@ -924,6 +923,7 @@ mod tests {
|
|||
Arc::new(ApplicationState::new(
|
||||
Arc::new(ObjectStore::new_in_memory()),
|
||||
None,
|
||||
None,
|
||||
))
|
||||
}
|
||||
|
||||
|
|
@ -939,7 +939,7 @@ mod tests {
|
|||
async fn test_health() {
|
||||
let application = make_application();
|
||||
let app_server = make_server(Arc::clone(&application));
|
||||
let server_url = test_server(application, Arc::clone(&app_server), None);
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
||||
let client = Client::new();
|
||||
let response = client.get(&format!("{}/health", server_url)).send().await;
|
||||
|
|
@ -958,7 +958,7 @@ mod tests {
|
|||
.register_metric("my_metric", "description");
|
||||
|
||||
let app_server = make_server(Arc::clone(&application));
|
||||
let server_url = test_server(application, Arc::clone(&app_server), None);
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
||||
metric.recorder(&[("tag", "value")]).inc(20);
|
||||
|
||||
|
|
@ -998,15 +998,15 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_tracing() {
|
||||
let application = make_application();
|
||||
let app_server = make_server(Arc::clone(&application));
|
||||
let trace_collector = Arc::new(RingBufferTraceCollector::new(5));
|
||||
|
||||
let server_url = test_server(
|
||||
application,
|
||||
Arc::clone(&app_server),
|
||||
let application = Arc::new(ApplicationState::new(
|
||||
Arc::new(ObjectStore::new_in_memory()),
|
||||
None,
|
||||
Some(Arc::<RingBufferTraceCollector>::clone(&trace_collector)),
|
||||
);
|
||||
));
|
||||
let app_server = make_server(Arc::clone(&application));
|
||||
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
||||
let client = Client::new();
|
||||
let response = client
|
||||
|
|
@ -1036,7 +1036,7 @@ mod tests {
|
|||
.create_database(make_rules("MyOrg_MyBucket"))
|
||||
.await
|
||||
.unwrap();
|
||||
let server_url = test_server(application, Arc::clone(&app_server), None);
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
||||
let client = Client::new();
|
||||
|
||||
|
|
@ -1083,7 +1083,7 @@ mod tests {
|
|||
.create_database(make_rules("MyOrg_MyBucket"))
|
||||
.await
|
||||
.unwrap();
|
||||
let server_url = test_server(application, Arc::clone(&app_server), None);
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
||||
// Set up client
|
||||
let client = Client::new();
|
||||
|
|
@ -1209,7 +1209,7 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
let server_url = test_server(application, Arc::clone(&app_server), None);
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
||||
let client = Client::new();
|
||||
|
||||
|
|
@ -1399,7 +1399,7 @@ mod tests {
|
|||
.create_database(make_rules("MyOrg_MyBucket"))
|
||||
.await
|
||||
.unwrap();
|
||||
let server_url = test_server(application, Arc::clone(&app_server), None);
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
||||
let client = Client::new();
|
||||
|
||||
|
|
@ -1693,7 +1693,6 @@ mod tests {
|
|||
fn test_server(
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<AppServer<ConnectionManagerImpl>>,
|
||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
) -> String {
|
||||
// NB: specify port 0 to let the OS pick the port.
|
||||
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
|
|
@ -1710,7 +1709,6 @@ mod tests {
|
|||
CancellationToken::new(),
|
||||
TEST_MAX_REQUEST_SIZE,
|
||||
trace_header_parser,
|
||||
trace_collector,
|
||||
));
|
||||
println!("Started server at {}", server_url);
|
||||
server_url
|
||||
|
|
@ -1734,7 +1732,7 @@ mod tests {
|
|||
.create_database(make_rules("MyOrg_MyBucket"))
|
||||
.await
|
||||
.unwrap();
|
||||
let server_url = test_server(application, Arc::clone(&app_server), None);
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
||||
(app_server, server_url)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,11 +7,7 @@ use query::{
|
|||
exec::IOxExecutionContext,
|
||||
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
|
||||
group_by::{Aggregate, WindowDuration},
|
||||
plan::{
|
||||
fieldlist::FieldListPlan,
|
||||
seriesset::SeriesSetPlans,
|
||||
stringset::{StringSetPlan, TableNamePlanBuilder},
|
||||
},
|
||||
plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan},
|
||||
QueryDatabase,
|
||||
};
|
||||
|
||||
|
|
@ -54,7 +50,7 @@ impl Planner {
|
|||
&self,
|
||||
database: Arc<D>,
|
||||
predicate: Predicate,
|
||||
) -> Result<TableNamePlanBuilder>
|
||||
) -> Result<StringSetPlan>
|
||||
where
|
||||
D: QueryDatabase + 'static,
|
||||
{
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ use trace_http::ctx::TraceHeaderParser;
|
|||
|
||||
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
|
||||
use server::{connection::ConnectionManager, ApplicationState, Server};
|
||||
use trace::TraceCollector;
|
||||
|
||||
pub mod error;
|
||||
mod flight;
|
||||
|
|
@ -90,7 +89,6 @@ pub async fn serve<M>(
|
|||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<M>>,
|
||||
trace_header_parser: TraceHeaderParser,
|
||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
shutdown: CancellationToken,
|
||||
serving_readiness: ServingReadiness,
|
||||
) -> Result<()>
|
||||
|
|
@ -109,7 +107,7 @@ where
|
|||
let mut builder = builder.layer(trace_http::tower::TraceLayer::new(
|
||||
trace_header_parser,
|
||||
Arc::clone(application.metric_registry()),
|
||||
trace_collector,
|
||||
application.trace_collector().clone(),
|
||||
true,
|
||||
));
|
||||
|
||||
|
|
|
|||
|
|
@ -724,14 +724,14 @@ where
|
|||
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
||||
let builder = Planner::new(&ctx)
|
||||
let plan = Planner::new(&ctx)
|
||||
.table_names(db, predicate)
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(ListingTables { db_name })?;
|
||||
|
||||
let table_names = ctx
|
||||
.to_table_names(builder)
|
||||
.to_string_set(plan)
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(ListingTables { db_name })?;
|
||||
|
|
@ -1116,11 +1116,11 @@ mod tests {
|
|||
|
||||
let chunk0 = TestChunk::new("h2o")
|
||||
.with_id(0)
|
||||
.with_predicate_match(PredicateMatch::AtLeastOne);
|
||||
.with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
|
||||
|
||||
let chunk1 = TestChunk::new("o2")
|
||||
.with_id(1)
|
||||
.with_predicate_match(PredicateMatch::AtLeastOne);
|
||||
.with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
|
@ -1474,7 +1474,8 @@ mod tests {
|
|||
tag_key: [0].into(),
|
||||
};
|
||||
|
||||
let chunk = TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOne);
|
||||
let chunk =
|
||||
TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
|
|
|||
|
|
@ -2,11 +2,11 @@
|
|||
name = "test_helpers"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Dix <paul@pauldix.net>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies] # In alphabetical order
|
||||
dotenv = "0.15.0"
|
||||
parking_lot = "0.11.2"
|
||||
tempfile = "3.1.0"
|
||||
tracing-subscriber = "0.2"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
|
|
|
|||
|
|
@ -292,7 +292,7 @@ struct TestServer {
|
|||
}
|
||||
|
||||
// Options for creating test servers
|
||||
#[derive(Default, Debug)]
|
||||
#[derive(Default, Debug, Clone)]
|
||||
pub struct TestConfig {
|
||||
/// Additional environment variables
|
||||
env: Vec<(String, String)>,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
use crate::{
|
||||
common::server_fixture::ServerFixture,
|
||||
common::{
|
||||
server_fixture::{ServerFixture, TestConfig},
|
||||
udp_listener::UdpCapture,
|
||||
},
|
||||
end_to_end_cases::scenario::{rand_name, DatabaseBuilder},
|
||||
};
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
|
|
@ -17,6 +20,7 @@ use rdkafka::{
|
|||
ClientConfig, Message, Offset, TopicPartitionList,
|
||||
};
|
||||
use std::convert::TryFrom;
|
||||
use tempfile::TempDir;
|
||||
use test_helpers::assert_contains;
|
||||
use write_buffer::{kafka::test_utils::kafka_sequencer_options, maybe_skip_kafka_integration};
|
||||
|
||||
|
|
@ -325,3 +329,89 @@ async fn test_create_database_missing_write_buffer_sequencers() {
|
|||
&err
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_cross_write_buffer_tracing() {
|
||||
let write_buffer_dir = TempDir::new().unwrap();
|
||||
|
||||
// setup tracing
|
||||
let udp_capture = UdpCapture::new().await;
|
||||
let test_config = TestConfig::new()
|
||||
.with_env("TRACES_EXPORTER", "jaeger")
|
||||
.with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
|
||||
.with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
|
||||
.with_client_header("jaeger-debug-id", "some-debug-id");
|
||||
|
||||
// we need to use two servers but the same DB name here because the Kafka topic is named after the DB name
|
||||
let db_name = rand_name();
|
||||
|
||||
// create producer server
|
||||
let server_write = ServerFixture::create_single_use_with_config(test_config.clone()).await;
|
||||
server_write
|
||||
.management_client()
|
||||
.update_server_id(1)
|
||||
.await
|
||||
.unwrap();
|
||||
server_write.wait_server_initialized().await;
|
||||
let conn_write = WriteBufferConnection {
|
||||
direction: WriteBufferDirection::Write.into(),
|
||||
r#type: "file".to_string(),
|
||||
connection: write_buffer_dir.path().display().to_string(),
|
||||
creation_config: Some(WriteBufferCreationConfig {
|
||||
n_sequencers: 1,
|
||||
options: kafka_sequencer_options(),
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
DatabaseBuilder::new(db_name.clone())
|
||||
.write_buffer(conn_write.clone())
|
||||
.build(server_write.grpc_channel())
|
||||
.await;
|
||||
|
||||
// create consumer DB
|
||||
let server_read = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
server_read
|
||||
.management_client()
|
||||
.update_server_id(2)
|
||||
.await
|
||||
.unwrap();
|
||||
server_read.wait_server_initialized().await;
|
||||
let conn_read = WriteBufferConnection {
|
||||
direction: WriteBufferDirection::Read.into(),
|
||||
..conn_write
|
||||
};
|
||||
DatabaseBuilder::new(db_name.clone())
|
||||
.write_buffer(conn_read)
|
||||
.build(server_read.grpc_channel())
|
||||
.await;
|
||||
|
||||
// write some points
|
||||
let mut write_client = server_write.write_client();
|
||||
let lp_lines = [
|
||||
"cpu,region=west user=23.2 100",
|
||||
"cpu,region=west user=21.0 150",
|
||||
"disk,region=east bytes=99i 200",
|
||||
];
|
||||
let num_lines_written = write_client
|
||||
.write(&db_name, lp_lines.join("\n"))
|
||||
.await
|
||||
.expect("cannot write");
|
||||
assert_eq!(num_lines_written, 3);
|
||||
|
||||
// "shallow" packet inspection and verify the UDP server got
|
||||
// something that had some expected results (maybe we could
|
||||
// eventually verify the payload here too)
|
||||
udp_capture
|
||||
.wait_for(|m| m.to_string().contains("IOx write buffer"))
|
||||
.await;
|
||||
udp_capture
|
||||
.wait_for(|m| m.to_string().contains("stored entry"))
|
||||
.await;
|
||||
|
||||
// // debugging assistance
|
||||
// tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||
// println!("Traces received (1):\n\n{:#?}", udp_capture.messages());
|
||||
|
||||
// wait for the UDP server to shutdown
|
||||
udp_capture.stop().await
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "time"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Time functionality for IOx"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "trace"
|
||||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Distributed tracing support within IOx"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "trace_exporters"
|
||||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Additional tracing exporters for IOx"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "trace_http"
|
||||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Distributed tracing support for HTTP services"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "tracker"
|
||||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "Utilities for tracking resource utilisation within IOx"
|
||||
|
||||
[dependencies]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
name = "trogging"
|
||||
version = "0.1.0"
|
||||
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
description = "IOx logging pipeline built upon tokio-tracing"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
|
@ -12,7 +12,7 @@ logfmt = { path = "../logfmt" }
|
|||
observability_deps = { path = "../observability_deps" }
|
||||
thiserror = "1.0.30"
|
||||
tracing-log = "0.1"
|
||||
tracing-subscriber = "0.2"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||
structopt = { version = "0.3.25", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ impl LoggingConfig {
|
|||
|
||||
pub fn with_builder<W>(&self, builder: Builder<W>) -> Builder<BoxMakeWriter>
|
||||
where
|
||||
W: MakeWriter + Send + Sync + Clone + 'static,
|
||||
W: for<'writer> MakeWriter<'writer> + Send + Sync + Clone + 'static,
|
||||
{
|
||||
builder
|
||||
.with_log_filter(&self.log_filter)
|
||||
|
|
@ -129,7 +129,7 @@ pub trait LoggingConfigBuilderExt {
|
|||
|
||||
impl<W> LoggingConfigBuilderExt for Builder<W>
|
||||
where
|
||||
W: MakeWriter + Send + Sync + Clone + 'static,
|
||||
W: for<'writer> MakeWriter<'writer> + Send + Sync + Clone + 'static,
|
||||
{
|
||||
fn with_logging_config(self, config: &LoggingConfig) -> Builder<BoxMakeWriter> {
|
||||
config.with_builder(self)
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ impl Builder {
|
|||
impl<W> Builder<W> {
|
||||
pub fn with_writer<W2>(self, make_writer: W2) -> Builder<W2>
|
||||
where
|
||||
W2: MakeWriter + Send + Sync + 'static,
|
||||
W2: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
|
||||
{
|
||||
Builder::<W2> {
|
||||
make_writer,
|
||||
|
|
@ -103,7 +103,7 @@ impl<W> Builder<W> {
|
|||
// This needs to be a separate impl block because they place different bounds on the type parameters.
|
||||
impl<W> Builder<W>
|
||||
where
|
||||
W: MakeWriter + Send + Sync + 'static,
|
||||
W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
|
||||
{
|
||||
pub const DEFAULT_LOG_FILTER: &'static str = "warn";
|
||||
|
||||
|
|
@ -277,17 +277,30 @@ impl Drop for TroggingGuard {
|
|||
|
||||
fn make_writer<M>(m: M) -> BoxMakeWriter
|
||||
where
|
||||
M: MakeWriter + Send + Sync + 'static,
|
||||
M: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
|
||||
{
|
||||
fmt::writer::BoxMakeWriter::new(move || {
|
||||
std::io::LineWriter::with_capacity(
|
||||
MAX_LINE_LENGTH,
|
||||
LimitedWriter(MAX_LINE_LENGTH, m.make_writer()),
|
||||
)
|
||||
BoxMakeWriter::new(MakeWriterHelper {
|
||||
inner: BoxMakeWriter::new(m),
|
||||
})
|
||||
}
|
||||
|
||||
struct MakeWriterHelper {
|
||||
inner: BoxMakeWriter,
|
||||
}
|
||||
|
||||
impl<'a> MakeWriter<'a> for MakeWriterHelper {
|
||||
type Writer = Box<dyn Write + 'a>;
|
||||
|
||||
fn make_writer(&'a self) -> Self::Writer {
|
||||
Box::new(std::io::LineWriter::with_capacity(
|
||||
MAX_LINE_LENGTH,
|
||||
LimitedWriter(MAX_LINE_LENGTH, self.inner.make_writer()),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
struct LimitedWriter<W: Write>(usize, W);
|
||||
|
||||
impl<W: Write> Write for LimitedWriter<W> {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
if buf.is_empty() {
|
||||
|
|
@ -341,7 +354,7 @@ pub mod test_util {
|
|||
}
|
||||
}
|
||||
|
||||
impl MakeWriter for TestWriter {
|
||||
impl MakeWriter<'_> for TestWriter {
|
||||
type Writer = SynchronizedWriter<Vec<u8>>;
|
||||
|
||||
fn make_writer(&self) -> Self::Writer {
|
||||
|
|
@ -356,9 +369,9 @@ pub mod test_util {
|
|||
/// Removes non-determinism by removing timestamps from the log lines.
|
||||
/// It supports the built-in tracing timestamp format and the logfmt timestamps.
|
||||
pub fn without_timestamps(&self) -> String {
|
||||
// logfmt or fmt::layer() time format
|
||||
// logfmt (e.g. `time=12345`) or fmt::layer() (e.g. `2021-10-25T13:48:50.555258`) time format
|
||||
let timestamp = regex::Regex::new(
|
||||
r"(?m)( ?time=[0-9]+|^([A-Z][a-z]{2}) \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} *)",
|
||||
r"(?m)( ?time=[0-9]+|^(\d{4})-\d{1,2}-\d{1,2}T\d{2}:\d{2}:\d{2}.\d+Z *)",
|
||||
)
|
||||
.unwrap();
|
||||
timestamp.replace_all(&self.to_string(), "").to_string()
|
||||
|
|
@ -379,7 +392,7 @@ pub mod test_util {
|
|||
/// the logging macros invoked by the function.
|
||||
pub fn log_test<W, F>(builder: Builder<W>, f: F) -> Captured
|
||||
where
|
||||
W: MakeWriter + Send + Sync + 'static,
|
||||
W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
|
||||
F: Fn(),
|
||||
{
|
||||
let (writer, output) = TestWriter::new();
|
||||
|
|
@ -401,7 +414,7 @@ pub mod test_util {
|
|||
/// and returns the captured output.
|
||||
pub fn simple_test<W>(builder: Builder<W>) -> Captured
|
||||
where
|
||||
W: MakeWriter + Send + Sync + 'static,
|
||||
W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
|
||||
{
|
||||
log_test(builder, || {
|
||||
error!("foo");
|
||||
|
|
@ -598,7 +611,8 @@ ERROR foo
|
|||
#[test]
|
||||
fn line_buffering() {
|
||||
let (test_writer, captured) = TestWriter::new();
|
||||
let mut writer = make_writer(test_writer).make_writer();
|
||||
let mw = make_writer(test_writer);
|
||||
let mut writer = mw.make_writer();
|
||||
writer.write_all("foo".as_bytes()).unwrap();
|
||||
// wasn't flushed yet because there was no newline yet
|
||||
assert_eq!(captured.to_string(), "");
|
||||
|
|
@ -611,7 +625,8 @@ ERROR foo
|
|||
|
||||
// another case when the line buffer flushes even before a newline is when the internal buffer limit
|
||||
let (test_writer, captured) = TestWriter::new();
|
||||
let mut writer = make_writer(test_writer).make_writer();
|
||||
let mw = make_writer(test_writer);
|
||||
let mut writer = mw.make_writer();
|
||||
let long = std::iter::repeat(b'X')
|
||||
.take(MAX_LINE_LENGTH)
|
||||
.collect::<Vec<u8>>();
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "write_buffer"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
|
|
|
|||
Loading…
Reference in New Issue