diff --git a/Cargo.lock b/Cargo.lock index 12a2d2173b..407e91a546 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,9 +283,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.61" +version = "0.3.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7a905d892734eea339e896738c14b9afce22b5318f64b951e70bf3844419b01" +checksum = "091bcdf2da9950f96aa522681ce805e6857f6ca8df73833d35736ab2dc78e152" dependencies = [ "addr2line", "cc", @@ -1429,9 +1429,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" dependencies = [ "bytes", "http", @@ -1458,9 +1458,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.13" +version = "0.14.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593" +checksum = "2b91bb1f221b6ea1f1e4371216b70f40748774c2fb5971b450c07773fb92d26b" dependencies = [ "bytes", "futures-channel", @@ -1931,9 +1931,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.104" +version = "0.2.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce" +checksum = "869d572136620d55835903746bcb5cdc54cb2851fd0aeec53220b4bb65ef3013" [[package]] name = "libloading" @@ -2035,9 +2035,9 @@ checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" [[package]] name = "matchers" -version = "0.0.1" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" dependencies = [ "regex-automata", ] @@ -2219,6 +2219,7 @@ dependencies = [ "bytes", "criterion", "flate2", + "hashbrown", "influxdb_line_protocol", "mutable_batch", "schema", @@ -2504,9 +2505,9 @@ dependencies = [ [[package]] name = "object" -version = "0.26.2" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39f37e50073ccad23b6d09bcb5b263f4e76d3bb6038e4a3c08e52162ffa8abc2" +checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9" dependencies = [ "memchr", ] @@ -4518,9 +4519,9 @@ dependencies = [ [[package]] name = "tower" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d15a6b60cdff0cb039d81d3b37f8bc3d7e53dca09069aae3ef2502ca4834fe30" +checksum = "c00e500fff5fa1131c866b246041a6bf96da9c965f8fe4128cb1421f23e93c00" dependencies = [ "futures-core", "futures-util", @@ -4658,12 +4659,11 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.25" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71" +checksum = "5cf865b5ddc38e503a29c41c4843e616a73028ae18c637bc3eb2afaef4909c84" dependencies = [ "ansi_term 0.12.1", - "chrono", "lazy_static", "matchers", "regex", diff --git a/Cargo.toml b/Cargo.toml index 2351e44efc..514415522e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "influxdb_iox" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" default-run = "influxdb_iox" readme = "README.md" @@ -94,7 +94,7 @@ data_types = { path = "data_types" } entry = { path = "entry" } generated_types = { path = "generated_types" } -influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] } +influxdb_iox_client = { path = "influxdb_iox_client", features = ["flight", "format"] } influxdb_line_protocol = { path = "influxdb_line_protocol" } internal_types = { path = "internal_types" } iox_object_store = { path = "iox_object_store" } diff --git a/arrow_util/Cargo.toml b/arrow_util/Cargo.toml index 0aa118f64b..e6682746a2 100644 --- a/arrow_util/Cargo.toml +++ b/arrow_util/Cargo.toml @@ -2,7 +2,7 @@ name = "arrow_util" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" description = "Apache Arrow utilities" [dependencies] diff --git a/arrow_util/src/bitset.rs b/arrow_util/src/bitset.rs index 70c212b529..08b4559c4a 100644 --- a/arrow_util/src/bitset.rs +++ b/arrow_util/src/bitset.rs @@ -29,6 +29,12 @@ impl BitSet { bitset } + /// Reserve space for `count` further bits + pub fn reserve(&mut self, count: usize) { + let new_buf_len = (self.len + count + 7) >> 3; + self.buffer.reserve(new_buf_len); + } + /// Appends `count` unset bits pub fn append_unset(&mut self, count: usize) { self.len += count; diff --git a/client_util/Cargo.toml b/client_util/Cargo.toml index b61f69e75e..af6f4a1b16 100644 --- a/client_util/Cargo.toml +++ b/client_util/Cargo.toml @@ -3,7 +3,7 @@ name = "client_util" version = "0.1.0" authors = ["Raphael Taylor-Davies "] description = "Shared code for IOx clients" -edition = "2018" +edition = "2021" [dependencies] http = "0.2.3" @@ -13,4 +13,4 @@ tonic = { version = "0.5.0" } tower = "0.4" [dev-dependencies] -tokio = { version = "1.11", features = ["macros"] } \ No newline at end of file +tokio = { version = "1.11", features = ["macros"] } diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml index 0a729fbc25..eee2b2c9ff 100644 --- a/data_types/Cargo.toml +++ b/data_types/Cargo.toml @@ -3,7 +3,7 @@ name = "data_types" version = "0.1.0" authors = ["pauldix "] description = "InfluxDB IOx data_types, shared between IOx instances and IOx clients" -edition = "2018" +edition = "2021" readme = "README.md" [dependencies] # In alphabetical order diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 66af425cea..7326fa3728 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -2,7 +2,7 @@ name = "datafusion" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" description = "Re-exports datafusion at a specific version" [dependencies] diff --git a/datafusion_util/Cargo.toml b/datafusion_util/Cargo.toml index 2a924f472a..4d3288bb71 100644 --- a/datafusion_util/Cargo.toml +++ b/datafusion_util/Cargo.toml @@ -2,7 +2,7 @@ name = "datafusion_util" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" description = "Datafusion utilities" [dependencies] diff --git a/entry/Cargo.toml b/entry/Cargo.toml index 459f6a59ae..6450bbda2f 100644 --- a/entry/Cargo.toml +++ b/entry/Cargo.toml @@ -2,7 +2,7 @@ name = "entry" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" description = "The entry format used by the write buffer" [dependencies] diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index 48400b3859..2bc44c8970 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -2,7 +2,7 @@ name = "generated_types" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order bytes = "1.0" diff --git a/grpc-router/Cargo.toml b/grpc-router/Cargo.toml index 9cfc2102bd..4d832b9b73 100644 --- a/grpc-router/Cargo.toml +++ b/grpc-router/Cargo.toml @@ -2,7 +2,7 @@ name = "grpc-router" version = "0.1.0" authors = ["Marko Mikulicic "] -edition = "2018" +edition = "2021" [dependencies] bytes = "1.0" diff --git a/grpc-router/grpc-router-test-gen/Cargo.toml b/grpc-router/grpc-router-test-gen/Cargo.toml index d3546ba504..f23a1591c1 100644 --- a/grpc-router/grpc-router-test-gen/Cargo.toml +++ b/grpc-router/grpc-router-test-gen/Cargo.toml @@ -2,7 +2,7 @@ name = "grpc-router-test-gen" version = "0.1.0" authors = ["Marko Mikulicic "] -edition = "2018" +edition = "2021" description = "Protobuf used in test for the grpc-router crate; need to be in a separate create because of linter limitations" [dependencies] diff --git a/influxdb2_client/Cargo.toml b/influxdb2_client/Cargo.toml index a475cf5989..0824082817 100644 --- a/influxdb2_client/Cargo.toml +++ b/influxdb2_client/Cargo.toml @@ -2,7 +2,7 @@ name = "influxdb2_client" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order bytes = "1.0" diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index e43f0cbe54..e4a5d4dd18 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -2,11 +2,11 @@ name = "influxdb_iox_client" version = "0.1.0" authors = ["Dom Dwyer "] -edition = "2018" +edition = "2021" [features] flight = ["arrow", "arrow-flight", "arrow_util", "serde/derive", "serde_json", "futures-util"] -format = ["arrow"] +format = ["arrow", "arrow_util"] [dependencies] # Workspace dependencies, in alphabetical order diff --git a/influxdb_line_protocol/Cargo.toml b/influxdb_line_protocol/Cargo.toml index b2a194c86b..096cd3f997 100644 --- a/influxdb_line_protocol/Cargo.toml +++ b/influxdb_line_protocol/Cargo.toml @@ -2,7 +2,7 @@ name = "influxdb_line_protocol" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order nom = "7" @@ -11,4 +11,4 @@ snafu = "0.6.2" observability_deps = { path = "../observability_deps" } [dev-dependencies] # In alphabetical order -test_helpers = { path = "../test_helpers" } \ No newline at end of file +test_helpers = { path = "../test_helpers" } diff --git a/influxdb_storage_client/Cargo.toml b/influxdb_storage_client/Cargo.toml index 77e949c8b8..ad0742e840 100644 --- a/influxdb_storage_client/Cargo.toml +++ b/influxdb_storage_client/Cargo.toml @@ -2,7 +2,7 @@ name = "influxdb_storage_client" version = "0.1.0" authors = ["Raphael Taylor-Davies "] -edition = "2018" +edition = "2021" [dependencies] client_util = { path = "../client_util" } @@ -11,4 +11,4 @@ prost = "0.8" tonic = { version = "0.5.0" } futures-util = { version = "0.3.1" } -[dev-dependencies] \ No newline at end of file +[dev-dependencies] diff --git a/influxdb_tsm/Cargo.toml b/influxdb_tsm/Cargo.toml index 200cb8a50f..79e64c3676 100644 --- a/influxdb_tsm/Cargo.toml +++ b/influxdb_tsm/Cargo.toml @@ -2,7 +2,7 @@ name = "influxdb_tsm" version = "0.1.0" authors = ["Edd Robinson "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order integer-encoding = "3.0.2" diff --git a/internal_types/Cargo.toml b/internal_types/Cargo.toml index 9d44a6c55e..a4a010e16a 100644 --- a/internal_types/Cargo.toml +++ b/internal_types/Cargo.toml @@ -2,7 +2,7 @@ name = "internal_types" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" description = "InfluxDB IOx internal types, shared between IOx instances" readme = "README.md" diff --git a/iox_data_generator/Cargo.toml b/iox_data_generator/Cargo.toml index 393ac1a3b1..ba0d13e1cd 100644 --- a/iox_data_generator/Cargo.toml +++ b/iox_data_generator/Cargo.toml @@ -2,7 +2,7 @@ name = "iox_data_generator" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" default-run = "iox_data_generator" [dependencies] @@ -26,7 +26,7 @@ snafu = "0.6.8" tokio = { version = "1.11", features = ["macros", "rt-multi-thread"] } toml = "0.5.6" tracing = "0.1" -tracing-subscriber = "0.2.25" +tracing-subscriber = "0.3.0" uuid = { version = "0.8.1", default_features = false } [dev-dependencies] diff --git a/iox_object_store/Cargo.toml b/iox_object_store/Cargo.toml index d98b82d208..de6a42e660 100644 --- a/iox_object_store/Cargo.toml +++ b/iox_object_store/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "iox_object_store" version = "0.1.0" -edition = "2018" +edition = "2021" description = "IOx-specific semantics wrapping the general-purpose object store crate" [dependencies] diff --git a/lifecycle/Cargo.toml b/lifecycle/Cargo.toml index 8c827855ef..0f61c025e7 100644 --- a/lifecycle/Cargo.toml +++ b/lifecycle/Cargo.toml @@ -2,7 +2,7 @@ name = "lifecycle" version = "0.1.0" authors = ["Raphael Taylor-Davies "] -edition = "2018" +edition = "2021" description = "Implements the IOx data lifecycle" [dependencies] diff --git a/logfmt/Cargo.toml b/logfmt/Cargo.toml index 7118e276ce..3c50fcfb47 100644 --- a/logfmt/Cargo.toml +++ b/logfmt/Cargo.toml @@ -3,13 +3,14 @@ name = "logfmt" version = "0.1.0" authors = ["Andrew Lamb "] description = "tracing_subscriber layer for writing out logfmt formatted events" -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order observability_deps = { path = "../observability_deps" } -tracing-subscriber = "0.2" +tracing-subscriber = "0.3" [dev-dependencies] # In alphabetical order once_cell = { version = "1.4.0", features = ["parking_lot"] } parking_lot = "0.11.2" regex = "1.4.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/logfmt/src/lib.rs b/logfmt/src/lib.rs index ee95907761..4b1de785f6 100644 --- a/logfmt/src/lib.rs +++ b/logfmt/src/lib.rs @@ -19,12 +19,18 @@ use tracing_subscriber::{fmt::MakeWriter, layer::Context, registry::LookupSpan, /// looked very small and did not (obviously) work with the tracing subscriber /// /// [logfmt]: https://brandur.org/logfmt -pub struct LogFmtLayer { +pub struct LogFmtLayer +where + W: for<'writer> MakeWriter<'writer>, +{ writer: W, display_target: bool, } -impl LogFmtLayer { +impl LogFmtLayer +where + W: for<'writer> MakeWriter<'writer>, +{ /// Create a new logfmt Layer to pass into tracing_subscriber /// /// Note this layer simply formats and writes to the specified writer. It @@ -68,7 +74,7 @@ impl LogFmtLayer { impl Layer for LogFmtLayer where - W: MakeWriter + 'static, + W: for<'writer> MakeWriter<'writer> + 'static, S: Subscriber + for<'a> LookupSpan<'a>, { fn register_callsite( @@ -78,7 +84,7 @@ where Interest::always() } - fn new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { let writer = self.writer.make_writer(); let metadata = ctx.metadata(id).expect("span should have metadata"); let mut p = FieldPrinter::new(writer, metadata.level(), self.display_target); diff --git a/logfmt/tests/logging.rs b/logfmt/tests/logging.rs index d4f6d04d9c..44d556e384 100644 --- a/logfmt/tests/logging.rs +++ b/logfmt/tests/logging.rs @@ -363,7 +363,7 @@ impl std::io::Write for CapturedWriter { } } -impl MakeWriter for CapturedWriter { +impl MakeWriter<'_> for CapturedWriter { type Writer = Self; fn make_writer(&self) -> Self::Writer { diff --git a/metric/Cargo.toml b/metric/Cargo.toml index 787d25d2f7..f0e24ca25e 100644 --- a/metric/Cargo.toml +++ b/metric/Cargo.toml @@ -2,7 +2,7 @@ name = "metric" version = "0.1.0" authors = ["Raphael Taylor-Davies "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order diff --git a/metric_exporters/Cargo.toml b/metric_exporters/Cargo.toml index 7f9020fe2b..8af1f1cade 100644 --- a/metric_exporters/Cargo.toml +++ b/metric_exporters/Cargo.toml @@ -2,7 +2,7 @@ name = "metric_exporters" version = "0.1.0" authors = ["Raphael Taylor-Davies "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order diff --git a/mutable_batch/Cargo.toml b/mutable_batch/Cargo.toml index ee412b5557..a050d9711d 100644 --- a/mutable_batch/Cargo.toml +++ b/mutable_batch/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mutable_batch" version = "0.1.0" -edition = "2018" +edition = "2021" description = "A mutable arrow RecordBatch" [dependencies] diff --git a/mutable_batch/src/column.rs b/mutable_batch/src/column.rs index b881fa36c1..842818026a 100644 --- a/mutable_batch/src/column.rs +++ b/mutable_batch/src/column.rs @@ -1,10 +1,9 @@ //! A [`Column`] stores the rows for a given column name use std::fmt::Formatter; -use std::iter::Enumerate; +use std::iter::{Enumerate, Zip}; use std::mem; use std::sync::Arc; -use std::{convert::TryInto, iter::Zip}; use arrow::error::ArrowError; use arrow::{ diff --git a/mutable_batch/src/lib.rs b/mutable_batch/src/lib.rs index fc8d945215..c70a2cde09 100644 --- a/mutable_batch/src/lib.rs +++ b/mutable_batch/src/lib.rs @@ -175,6 +175,20 @@ impl MutableBatch { Ok(()) } + /// Extend this [`MutableBatch`] with `ranges` rows from `other` + pub fn extend_from_ranges( + &mut self, + other: &Self, + ranges: &[Range], + ) -> writer::Result<()> { + let to_insert = ranges.iter().map(|x| x.end - x.start).sum(); + + let mut writer = writer::Writer::new(self, to_insert); + writer.write_batch_ranges(other, ranges)?; + writer.commit(); + Ok(()) + } + /// Returns a reference to the specified column pub(crate) fn column(&self, column: &str) -> Result<&Column> { let idx = self diff --git a/mutable_batch/src/writer.rs b/mutable_batch/src/writer.rs index 9ed436080e..a330f1bf98 100644 --- a/mutable_batch/src/writer.rs +++ b/mutable_batch/src/writer.rs @@ -499,86 +499,105 @@ impl<'a> Writer<'a> { src: &MutableBatch, range: Range, ) -> Result<()> { - if range.start == 0 && range.end == src.row_count { + self.write_batch_ranges(src, &[range]) + } + + /// Write the rows identified by `ranges` to the provided MutableBatch + pub(crate) fn write_batch_ranges( + &mut self, + src: &MutableBatch, + ranges: &[Range], + ) -> Result<()> { + let to_insert = self.to_insert; + + if to_insert == src.row_count { return self.write_batch(src); } - assert_eq!(range.end - range.start, self.to_insert); for (src_col_name, src_col_idx) in &src.column_names { let src_col = &src.columns[*src_col_idx]; let (dst_col_idx, dst_col) = self.column_mut(src_col_name, src_col.influx_type)?; let stats = match (&mut dst_col.data, &src_col.data) { - (ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => { - dst_data.extend_from_slice(&src_data[range.clone()]); - Statistics::F64(compute_stats(src_col.valid.bytes(), range.clone(), |x| { - &src_data[x] - })) - } - (ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => { - dst_data.extend_from_slice(&src_data[range.clone()]); - Statistics::I64(compute_stats(src_col.valid.bytes(), range.clone(), |x| { - &src_data[x] - })) - } - (ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => { - dst_data.extend_from_slice(&src_data[range.clone()]); - Statistics::U64(compute_stats(src_col.valid.bytes(), range.clone(), |x| { - &src_data[x] - })) - } + (ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => Statistics::F64( + write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data), + ), + (ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => Statistics::I64( + write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data), + ), + (ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => Statistics::U64( + write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data), + ), (ColumnData::Bool(dst_data, _), ColumnData::Bool(src_data, _)) => { - dst_data.extend_from_range(src_data, range.clone()); - Statistics::Bool(compute_bool_stats( - src_col.valid.bytes(), - range.clone(), - src_data, - )) + dst_data.reserve(to_insert); + let mut stats = StatValues::new_empty(); + for range in ranges { + dst_data.extend_from_range(src_data, range.clone()); + compute_bool_stats( + src_col.valid.bytes(), + range.clone(), + src_data, + &mut stats, + ) + } + Statistics::Bool(stats) } (ColumnData::String(dst_data, _), ColumnData::String(src_data, _)) => { - dst_data.extend_from_range(src_data, range.clone()); - Statistics::String(compute_stats(src_col.valid.bytes(), range.clone(), |x| { - src_data.get(x).unwrap() - })) + let mut stats = StatValues::new_empty(); + for range in ranges { + dst_data.extend_from_range(src_data, range.clone()); + compute_stats(src_col.valid.bytes(), range.clone(), &mut stats, |x| { + src_data.get(x).unwrap() + }) + } + Statistics::String(stats) } ( ColumnData::Tag(dst_data, dst_dict, _), ColumnData::Tag(src_data, src_dict, _), ) => { + dst_data.reserve(to_insert); + let mut mapping: Vec<_> = vec![None; src_dict.values().len()]; - let mut stats = StatValues::new_empty(); - dst_data.extend(src_data[range.clone()].iter().map(|src_id| match *src_id { - INVALID_DID => { - stats.update_for_nulls(1); - INVALID_DID - } - _ => { - let maybe_did = &mut mapping[*src_id as usize]; - match maybe_did { - Some(did) => { - stats.total_count += 1; - *did + for range in ranges { + dst_data.extend(src_data[range.clone()].iter().map( + |src_id| match *src_id { + INVALID_DID => { + stats.update_for_nulls(1); + INVALID_DID } - None => { - let value = src_dict.lookup_id(*src_id).unwrap(); - stats.update(value); + _ => { + let maybe_did = &mut mapping[*src_id as usize]; + match maybe_did { + Some(did) => { + stats.total_count += 1; + *did + } + None => { + let value = src_dict.lookup_id(*src_id).unwrap(); + stats.update(value); - let did = dst_dict.lookup_value_or_insert(value); - *maybe_did = Some(did); - did + let did = dst_dict.lookup_value_or_insert(value); + *maybe_did = Some(did); + did + } + } } - } - } - })); + }, + )); + } Statistics::String(stats) } _ => unreachable!(), }; - dst_col - .valid - .extend_from_range(&src_col.valid, range.clone()); + dst_col.valid.reserve(to_insert); + for range in ranges { + dst_col + .valid + .extend_from_range(&src_col.valid, range.clone()); + } self.statistics.push((dst_col_idx, stats)); } @@ -707,12 +726,16 @@ fn append_valid_mask(column: &mut Column, valid_mask: Option<&[u8]>, to_insert: } } -fn compute_bool_stats(valid: &[u8], range: Range, col_data: &BitSet) -> StatValues { +fn compute_bool_stats( + valid: &[u8], + range: Range, + col_data: &BitSet, + stats: &mut StatValues, +) { // There are likely faster ways to do this let indexes = iter_set_positions_with_offset(valid, range.start).take_while(|idx| *idx < range.end); - let mut stats = StatValues::new_empty(); for index in indexes { let value = col_data.get(index); stats.update(&value) @@ -720,11 +743,33 @@ fn compute_bool_stats(valid: &[u8], range: Range, col_data: &BitSet) -> S let count = range.end - range.start; stats.update_for_nulls(count as u64 - stats.total_count); +} + +fn write_slice( + to_insert: usize, + ranges: &[Range], + valid: &[u8], + src_data: &[T], + dst_data: &mut Vec, +) -> StatValues +where + T: Clone + PartialOrd + IsNan, +{ + dst_data.reserve(to_insert); + let mut stats = StatValues::new_empty(); + for range in ranges { + dst_data.extend_from_slice(&src_data[range.clone()]); + compute_stats(valid, range.clone(), &mut stats, |x| &src_data[x]); + } stats } -fn compute_stats<'a, T, U, F>(valid: &[u8], range: Range, accessor: F) -> StatValues -where +fn compute_stats<'a, T, U, F>( + valid: &[u8], + range: Range, + stats: &mut StatValues, + accessor: F, +) where U: 'a + ToOwned + PartialOrd + ?Sized + IsNan, F: Fn(usize) -> &'a U, T: std::borrow::Borrow, @@ -733,14 +778,12 @@ where .take_while(|idx| *idx < range.end) .map(accessor); - let mut stats = StatValues::new_empty(); for value in values { stats.update(value) } let count = range.end - range.start; stats.update_for_nulls(count as u64 - stats.total_count); - stats } impl<'a> Drop for Writer<'a> { diff --git a/mutable_batch_lp/Cargo.toml b/mutable_batch_lp/Cargo.toml index cc1dd86458..bc36496c3c 100644 --- a/mutable_batch_lp/Cargo.toml +++ b/mutable_batch_lp/Cargo.toml @@ -1,10 +1,11 @@ [package] name = "mutable_batch_lp" version = "0.1.0" -edition = "2018" +edition = "2021" description = "Conversion logic for line protocol -> MutableBatch" [dependencies] +hashbrown = "0.11" influxdb_line_protocol = { path = "../influxdb_line_protocol" } mutable_batch = { path = "../mutable_batch" } schema = { path = "../schema" } diff --git a/mutable_batch_lp/benches/write_lp.rs b/mutable_batch_lp/benches/write_lp.rs index 5bd7d5d1a3..0a52daf739 100644 --- a/mutable_batch_lp/benches/write_lp.rs +++ b/mutable_batch_lp/benches/write_lp.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use flate2::read::GzDecoder; -use mutable_batch_lp::lines_to_batch; +use mutable_batch_lp::lines_to_batches; fn generate_lp_bytes() -> Bytes { let raw = include_bytes!("../../tests/fixtures/lineproto/read_filter.lp.gz"); @@ -23,7 +23,9 @@ pub fn write_lp(c: &mut Criterion) { group.bench_function(BenchmarkId::from_parameter(count), |b| { b.iter(|| { for _ in 0..*count { - lines_to_batch(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap(); + let batches = + lines_to_batches(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap(); + assert_eq!(batches.len(), 1); } }); }); diff --git a/mutable_batch_lp/src/lib.rs b/mutable_batch_lp/src/lib.rs index 0a5f7267a4..2f2446e808 100644 --- a/mutable_batch_lp/src/lib.rs +++ b/mutable_batch_lp/src/lib.rs @@ -11,6 +11,7 @@ clippy::clone_on_ref_ptr )] +use hashbrown::HashMap; use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine}; use mutable_batch::writer::Writer; use mutable_batch::MutableBatch; @@ -36,18 +37,26 @@ pub enum Error { /// Result type for line protocol conversion pub type Result = std::result::Result; -/// Converts the provided lines of line protocol to a [`MutableBatch`] -pub fn lines_to_batch(lines: &str, default_time: i64) -> Result { - let mut batch = MutableBatch::new(); +/// Converts the provided lines of line protocol to a set of [`MutableBatch`] +/// keyed by measurement name +pub fn lines_to_batches(lines: &str, default_time: i64) -> Result> { + let mut batches = HashMap::new(); for (line_idx, maybe_line) in parse_lines(lines).enumerate() { let line = maybe_line.context(LineProtocol { line: line_idx + 1 })?; + let measurement = line.series.measurement.as_str(); + + let (_, batch) = batches + .raw_entry_mut() + .from_key(measurement) + .or_insert_with(|| (measurement.to_string(), MutableBatch::new())); // TODO: Reuse writer - let mut writer = Writer::new(&mut batch, 1); + let mut writer = Writer::new(batch, 1); write_line(&mut writer, line, default_time).context(Write { line: line_idx + 1 })?; writer.commit(); } - Ok(batch) + + Ok(batches) } fn write_line( @@ -95,10 +104,14 @@ mod tests { fn test_basic() { let lp = r#"cpu,tag1=v1,tag2=v2 val=2i 0 cpu,tag1=v4,tag2=v1 val=2i 0 + mem,tag1=v2 ival=3i 0 cpu,tag2=v2 val=3i 1 - cpu,tag1=v1,tag2=v2 fval=2.0"#; + cpu,tag1=v1,tag2=v2 fval=2.0 + mem,tag1=v5 ival=2i 1 + "#; - let batch = lines_to_batch(lp, 5).unwrap(); + let batch = lines_to_batches(lp, 5).unwrap(); + assert_eq!(batch.len(), 2); assert_batches_eq!( &[ @@ -111,7 +124,19 @@ mod tests { "| 2 | v1 | v2 | 1970-01-01T00:00:00.000000005Z | |", "+------+------+------+--------------------------------+-----+", ], - &[batch.to_arrow(Selection::All).unwrap()] + &[batch["cpu"].to_arrow(Selection::All).unwrap()] + ); + + assert_batches_eq!( + &[ + "+------+------+--------------------------------+", + "| ival | tag1 | time |", + "+------+------+--------------------------------+", + "| 3 | v2 | 1970-01-01T00:00:00Z |", + "| 2 | v5 | 1970-01-01T00:00:00.000000001Z |", + "+------+------+--------------------------------+", + ], + &[batch["mem"].to_arrow(Selection::All).unwrap()] ); } } diff --git a/mutable_batch_pb/Cargo.toml b/mutable_batch_pb/Cargo.toml index 0d55b58948..af9b89a5e8 100644 --- a/mutable_batch_pb/Cargo.toml +++ b/mutable_batch_pb/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mutable_batch_pb" version = "0.1.0" -edition = "2018" +edition = "2021" description = "Conversion logic for binary write protocol <-> MutableBatch" [dependencies] diff --git a/mutable_buffer/Cargo.toml b/mutable_buffer/Cargo.toml index f6fc45a408..c34e433cb5 100644 --- a/mutable_buffer/Cargo.toml +++ b/mutable_buffer/Cargo.toml @@ -2,7 +2,7 @@ name = "mutable_buffer" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order arrow = { version = "6.0", features = ["prettyprint"] } diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index a38d56c065..1538f61094 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -2,7 +2,7 @@ name = "object_store" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order async-trait = "0.1.42" diff --git a/observability_deps/Cargo.toml b/observability_deps/Cargo.toml index 4b66765661..01afbce882 100644 --- a/observability_deps/Cargo.toml +++ b/observability_deps/Cargo.toml @@ -2,7 +2,7 @@ name = "observability_deps" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" description = "Observability ecosystem dependencies for InfluxDB IOx, to ensure consistent versions and unified updates" [dependencies] # In alphabetical order diff --git a/packers/Cargo.toml b/packers/Cargo.toml index 2e5580911e..a6f0dc0e60 100644 --- a/packers/Cargo.toml +++ b/packers/Cargo.toml @@ -2,7 +2,7 @@ name = "packers" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order arrow = { version = "6.0", features = ["prettyprint"] } diff --git a/panic_logging/Cargo.toml b/panic_logging/Cargo.toml index 451ddbda33..d46a6b986a 100644 --- a/panic_logging/Cargo.toml +++ b/panic_logging/Cargo.toml @@ -2,7 +2,7 @@ name = "panic_logging" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order observability_deps = { path = "../observability_deps" } diff --git a/parquet_catalog/Cargo.toml b/parquet_catalog/Cargo.toml index 13f64ec10f..1b47f8063f 100644 --- a/parquet_catalog/Cargo.toml +++ b/parquet_catalog/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "parquet_catalog" version = "0.1.0" -edition = "2018" +edition = "2021" [dependencies] arrow = { version = "6.0", features = ["prettyprint"] } diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml index b5038cc9ad..cb7edf049f 100644 --- a/parquet_file/Cargo.toml +++ b/parquet_file/Cargo.toml @@ -2,7 +2,7 @@ name = "parquet_file" version = "0.1.0" authors = ["Nga Tran "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order arrow = { version = "6.0", features = ["prettyprint"] } diff --git a/persistence_windows/Cargo.toml b/persistence_windows/Cargo.toml index 98ce242dac..a068b524be 100644 --- a/persistence_windows/Cargo.toml +++ b/persistence_windows/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "persistence_windows" version = "0.1.0" -edition = "2018" +edition = "2021" [dependencies] data_types = { path = "../data_types" } diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index a7f44f6653..843a7ce3a6 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "predicate" version = "0.1.0" -edition = "2018" +edition = "2021" [dependencies] arrow = { version = "6.0", features = ["prettyprint"] } diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs index b0d6dcf910..5f78a73384 100644 --- a/predicate/src/predicate.rs +++ b/predicate/src/predicate.rs @@ -31,8 +31,9 @@ pub const EMPTY_PREDICATE: Predicate = Predicate { #[derive(Debug, Clone, Copy)] /// The result of evaluating a predicate on a set of rows pub enum PredicateMatch { - /// There is at least one row that matches the predicate - AtLeastOne, + /// There is at least one row that matches the predicate that has + /// at least one non null value in each field of the predicate + AtLeastOneNonNullField, /// There are exactly zero rows that match the predicate Zero, diff --git a/query/Cargo.toml b/query/Cargo.toml index 766731c189..8d402d4677 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -2,7 +2,7 @@ name = "query" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" description = "IOx Query Interface and Executor" # This crate is designed to be independent of the rest of the IOx diff --git a/query/src/exec/context.rs b/query/src/exec/context.rs index 44bf13455f..7fef949a8d 100644 --- a/query/src/exec/context.rs +++ b/query/src/exec/context.rs @@ -22,20 +22,17 @@ use futures::TryStreamExt; use observability_deps::tracing::{debug, trace}; use trace::{ctx::SpanContext, span::SpanRecorder}; -use crate::{ - exec::{ - fieldlist::{FieldList, IntoFieldList}, - non_null_checker::NonNullCheckerExec, - query_tracing::TracedStream, - schema_pivot::{SchemaPivotExec, SchemaPivotNode}, - seriesset::{ - converter::{GroupGenerator, SeriesSetConverter}, - series::Series, - }, - split::StreamSplitExec, - stringset::{IntoStringSet, StringSetRef}, +use crate::exec::{ + fieldlist::{FieldList, IntoFieldList}, + non_null_checker::NonNullCheckerExec, + query_tracing::TracedStream, + schema_pivot::{SchemaPivotExec, SchemaPivotNode}, + seriesset::{ + converter::{GroupGenerator, SeriesSetConverter}, + series::Series, }, - plan::stringset::TableNamePlanBuilder, + split::StreamSplitExec, + stringset::{IntoStringSet, StringSetRef}, }; use crate::plan::{ @@ -489,25 +486,6 @@ impl IOxExecutionContext { } } - /// Executes table_plans and, if returns some rows, add that table into the return list - /// Tables discovered from meta data won't need any plan - pub async fn to_table_names(&self, builder: TableNamePlanBuilder) -> Result { - let ctx = self.child_ctx("to_table_names"); - - // first get all meta data tables - let mut tables = builder.meta_data_table_names().clone(); - - // Now run each plan and if it returns data, add it table in - let table_plans = builder.table_plans(); - for (table, plan) in table_plans { - if !ctx.run_logical_plan(plan).await?.is_empty() { - tables.insert(table.clone()); - } - } - - Ok(Arc::new(tables)) - } - /// Run the plan and return a record batch reader for reading the results pub async fn run_logical_plan(&self, plan: LogicalPlan) -> Result> { self.run_logical_plans(vec![plan]).await diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 84f3bddbc1..c5bcd7ac19 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -26,7 +26,7 @@ use schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use crate::{ - exec::{field::FieldColumns, make_schema_pivot}, + exec::{field::FieldColumns, make_non_null_checker, make_schema_pivot}, func::{ selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput}, window::make_window_bound_expr, @@ -35,9 +35,7 @@ use crate::{ plan::{ fieldlist::FieldListPlan, seriesset::{SeriesSetPlan, SeriesSetPlans}, - stringset::{ - Error as StringSetError, StringSetPlan, StringSetPlanBuilder, TableNamePlanBuilder, - }, + stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder}, }, provider::ProviderBuilder, QueryChunk, QueryChunkMeta, QueryDatabase, @@ -229,11 +227,13 @@ impl InfluxRpcPlanner { /// . A set of plans of tables of either /// . chunks with deleted data or /// . chunks without deleted data but cannot be decided from meta data - pub fn table_names(&self, database: &D, predicate: Predicate) -> Result + pub fn table_names(&self, database: &D, predicate: Predicate) -> Result where D: QueryDatabase + 'static, { - let mut builder = TableNamePlanBuilder::new(); + debug!(predicate=?predicate, "planning table_names"); + + let mut builder = StringSetPlanBuilder::new(); let mut normalizer = PredicateNormalizer::new(predicate); // Mapping between table and chunks that need full plan @@ -244,9 +244,11 @@ impl InfluxRpcPlanner { for chunk in database.chunks(normalizer.unnormalized()) { let table_name = chunk.table_name(); let schema = chunk.schema(); + trace!(chunk_id=%chunk.id(), table_name, "Considering table"); // Table is already in the returned table list, no longer needs to discover it from other chunks - if builder.contains_meta_data_table(table_name.to_string()) { + if builder.contains(table_name) { + trace!("already seen"); continue; } @@ -258,10 +260,11 @@ impl InfluxRpcPlanner { .or_insert_with(Vec::new) .push(Arc::clone(&chunk)); } else { - // See if we can have enough info from the chunk's meta data to answer - // that this table participates in the request + // See if we have enough info only from the chunk's + // meta data to know if the table has data that + // matches the predicate let predicate = normalizer.normalized(table_name, schema); - // + // Try and apply the predicate using only metadata let pred_result = chunk .apply_predicate_to_metadata(&predicate) @@ -269,38 +272,38 @@ impl InfluxRpcPlanner { .context(CheckingChunkPredicate { chunk_id: chunk.id(), })?; - // + match pred_result { - PredicateMatch::AtLeastOne => { + PredicateMatch::AtLeastOneNonNullField => { + trace!("Metadata predicate: table matches"); // Meta data of the table covers predicates of the request - builder.append_meta_data_table(table_name.to_string()); + builder.append_string(table_name); } PredicateMatch::Unknown => { + trace!("Metadata predicate: unknown match"); // We cannot match the predicate to get answer from meta data, let do full plan full_plan_table_chunks .entry(table_name.to_string()) .or_insert_with(Vec::new) .push(Arc::clone(&chunk)); } - PredicateMatch::Zero => {} // this chunk's table does not participate in the request + PredicateMatch::Zero => { + trace!("Metadata predicate: zero rows match"); + } // this chunk's table does not participate in the request } } } - // remove items from full_plan_table_chunks whose tables are already in the returned list - let meta_data_tables = builder.meta_data_table_names(); - for table in meta_data_tables { - full_plan_table_chunks.remove(&table); + // remove items from full_plan_table_chunks whose tables are + // already in the returned list + for table in builder.known_strings_iter() { + trace!(%table, "Table is known to have matches, skipping plan"); + full_plan_table_chunks.remove(table); if full_plan_table_chunks.is_empty() { break; } } - // No full plans needed - if full_plan_table_chunks.is_empty() { - return Ok(builder); - } - // Now build plans for full-plan tables for (table_name, chunks) in full_plan_table_chunks { let schema = database.table_schema(&table_name).context(TableRemoved { @@ -309,11 +312,11 @@ impl InfluxRpcPlanner { if let Some(plan) = self.table_name_plan(&table_name, schema, &mut normalizer, chunks)? { - builder.append_plans(table_name, plan); + builder = builder.append_other(plan.into()); } } - Ok(builder) + builder.build().context(CreatingStringSet) } /// Returns a set of plans that produces the names of "tag" @@ -427,14 +430,14 @@ impl InfluxRpcPlanner { let plan = self.tag_keys_plan(&table_name, schema, &mut normalizer, chunks)?; if let Some(plan) = plan { - builder = builder.append(plan) + builder = builder.append_other(plan) } } } // add the known columns we could find from metadata only builder - .append(known_columns.into()) + .append_other(known_columns.into()) .build() .context(CreatingStringSet) } @@ -596,13 +599,13 @@ impl InfluxRpcPlanner { .build() .context(BuildingPlan)?; - builder = builder.append(plan.into()); + builder = builder.append_other(plan.into()); } } // add the known values we could find from metadata only builder - .append(known_values.into()) + .append_other(known_values.into()) .build() .context(CreatingStringSet) } @@ -830,19 +833,13 @@ impl InfluxRpcPlanner { chunk_id: chunk.id(), })?; - match pred_result { - PredicateMatch::AtLeastOne | + if !matches!(pred_result, PredicateMatch::Zero) { // have to include chunk as we can't rule it out - PredicateMatch::Unknown => { - let table_name = chunk.table_name().to_string(); - table_chunks - .entry(table_name) - .or_insert_with(Vec::new) - .push(Arc::clone(&chunk)); - } - // Skip chunk here based on metadata - PredicateMatch::Zero => { - } + let table_name = chunk.table_name().to_string(); + table_chunks + .entry(table_name) + .or_insert_with(Vec::new) + .push(Arc::clone(&chunk)); } } Ok(table_chunks) @@ -960,11 +957,11 @@ impl InfluxRpcPlanner { Ok(Some(plan)) } - /// Creates a DataFusion LogicalPlan that returns the timestamp - /// for a specified table: + /// Creates a DataFusion LogicalPlan that returns the values in + /// the fields for a specified table: /// - /// The output looks like (time) - /// The time column is chosen because it must be included in all tables + /// The output produces the table name as a single string if any + /// non null values are passed in. /// /// The data is not sorted in any particular order /// @@ -974,13 +971,11 @@ impl InfluxRpcPlanner { /// The created plan looks like: /// /// ```text - /// Projection (select time) + /// NonNullChecker + /// Projection (select fields) /// Filter(predicate) [optional] /// Scan /// ``` - // TODO: for optimization in the future, build `select count(*)` plan instead, - // ,but if we do this, we also need to change the way we handle output - // of the function invoking this because it will always return a number fn table_name_plan( &self, table_name: &str, @@ -991,6 +986,7 @@ impl InfluxRpcPlanner { where C: QueryChunk + 'static, { + debug!(%table_name, "Creating table_name full plan"); let scan_and_filter = self.scan_and_filter(table_name, schema, normalizer, chunks)?; let TableScanAndFilter { plan_builder, @@ -1000,15 +996,11 @@ impl InfluxRpcPlanner { Some(t) => t, }; - // Selection of only time - let select_exprs = schema - .iter() - .filter_map(|(influx_column_type, field)| match influx_column_type { - Some(InfluxColumnType::Timestamp) => Some(col(field.name())), - Some(_) => None, - None => None, - }) - .collect::>(); + // Select only fields requested + let predicate = normalizer.normalized(table_name, Arc::clone(&schema)); + let select_exprs: Vec<_> = filtered_fields_iter(&schema, &predicate) + .map(|field| col(field.name().as_str())) + .collect(); let plan = plan_builder .project(select_exprs) @@ -1016,6 +1008,9 @@ impl InfluxRpcPlanner { .build() .context(BuildingPlan)?; + // Add the final node that outputs the table name or not, depending + let plan = make_non_null_checker(table_name, plan); + Ok(Some(plan)) } diff --git a/query/src/plan/stringset.rs b/query/src/plan/stringset.rs index 627d19e544..b8246281a3 100644 --- a/query/src/plan/stringset.rs +++ b/query/src/plan/stringset.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::sync::Arc; use arrow_util::util::str_iter_to_batch; use datafusion::logical_plan::LogicalPlan; @@ -97,7 +97,7 @@ impl StringSetPlanBuilder { /// Append the strings from the passed plan into ourselves if possible, or /// passes on the plan - pub fn append(mut self, other: StringSetPlan) -> Self { + pub fn append_other(mut self, other: StringSetPlan) -> Self { match other { StringSetPlan::Known(ssref) => match Arc::try_unwrap(ssref) { Ok(mut ss) => { @@ -117,6 +117,23 @@ impl StringSetPlanBuilder { self } + /// Return true if we know already that `s` is contained in the + /// StringSet. Note that if `contains()` returns false, `s` may be + /// in the stringset after execution. + pub fn contains(&self, s: impl AsRef) -> bool { + self.strings.contains(s.as_ref()) + } + + /// Append a single string to the known set of strings in this builder + pub fn append_string(&mut self, s: impl Into) { + self.strings.insert(s.into()); + } + + /// returns an iterator over the currently known strings in this builder + pub fn known_strings_iter(&self) -> impl Iterator { + self.strings.iter() + } + /// Create a StringSetPlan that produces the deduplicated (union) /// of all plans `append`ed to this builder. pub fn build(self) -> Result { @@ -143,39 +160,6 @@ impl StringSetPlanBuilder { } } -#[derive(Debug, Default)] -pub struct TableNamePlanBuilder { - /// Known tables achieved from meta data - meta_data_tables: StringSet, - /// Other tables and their general plans - plans: BTreeMap, -} - -impl TableNamePlanBuilder { - pub fn new() -> Self { - Self::default() - } - pub fn append_meta_data_table(&mut self, table: String) { - self.meta_data_tables.insert(table); - } - - pub fn append_plans(&mut self, table_name: String, plan: LogicalPlan) { - self.plans.insert(table_name, plan); - } - - pub fn contains_meta_data_table(&self, table: String) -> bool { - self.meta_data_tables.contains(&table) - } - - pub fn meta_data_table_names(&self) -> StringSet { - self.meta_data_tables.clone() - } - - pub fn table_plans(&self) -> BTreeMap { - self.plans.clone() - } -} - #[cfg(test)] mod tests { use crate::exec::{Executor, ExecutorType}; @@ -196,8 +180,8 @@ mod tests { #[test] fn test_builder_strings_only() { let plan = StringSetPlanBuilder::new() - .append(to_string_set(&["foo", "bar"]).into()) - .append(to_string_set(&["bar", "baz"]).into()) + .append_other(to_string_set(&["foo", "bar"]).into()) + .append_other(to_string_set(&["bar", "baz"]).into()) .build() .unwrap(); @@ -228,9 +212,9 @@ mod tests { // when a df plan is appended the whole plan should be different let plan = StringSetPlanBuilder::new() - .append(to_string_set(&["foo", "bar"]).into()) - .append(vec![df_plan].into()) - .append(to_string_set(&["baz"]).into()) + .append_other(to_string_set(&["foo", "bar"]).into()) + .append_other(vec![df_plan].into()) + .append_other(to_string_set(&["baz"]).into()) .build() .unwrap(); diff --git a/query_tests/Cargo.toml b/query_tests/Cargo.toml index 1b386903d5..bb69fc3fc1 100644 --- a/query_tests/Cargo.toml +++ b/query_tests/Cargo.toml @@ -2,7 +2,7 @@ name = "query_tests" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" description = "Tests of the query engine against different database configurations" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/query_tests/generate/Cargo.toml b/query_tests/generate/Cargo.toml index bbe46ba52b..2d78f367fe 100644 --- a/query_tests/generate/Cargo.toml +++ b/query_tests/generate/Cargo.toml @@ -3,9 +3,9 @@ name = "generate" description = "Creates rust #tests for files in .sql" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order # Note this is a standalone binary and not part of the overall workspace -[workspace] \ No newline at end of file +[workspace] diff --git a/query_tests/src/influxrpc/table_names.rs b/query_tests/src/influxrpc/table_names.rs index d44dcc1db4..0f881f2ce6 100644 --- a/query_tests/src/influxrpc/table_names.rs +++ b/query_tests/src/influxrpc/table_names.rs @@ -1,4 +1,5 @@ //! Tests for the Influx gRPC queries +use datafusion::logical_plan::{col, lit}; use predicate::predicate::{Predicate, PredicateBuilder, EMPTY_PREDICATE}; use query::{ exec::stringset::{IntoStringSet, StringSetRef}, @@ -24,11 +25,12 @@ where let planner = InfluxRpcPlanner::new(); let ctx = db.executor().new_context(query::exec::ExecutorType::Query); - let builder = planner + let plan = planner .table_names(db.as_ref(), predicate.clone()) .expect("built plan successfully"); + let names = ctx - .to_table_names(builder) + .to_string_set(plan) .await .expect("converted plan to strings successfully"); @@ -53,6 +55,49 @@ async fn list_table_names_no_data_pred() { run_table_names_test_case(TwoMeasurements {}, EMPTY_PREDICATE, vec!["cpu", "disk"]).await; } +#[tokio::test] +async fn list_table_names_no_data_passes() { + // no rows pass this predicate + run_table_names_test_case( + TwoMeasurementsManyFields {}, + tsp(10000000, 20000000), + vec![], + ) + .await; +} + +#[tokio::test] +async fn list_table_names_no_non_null_data_passes() { + // only a single row with a null field passes this predicate (expect no table names) + let predicate = PredicateBuilder::default() + .table("o2") + // only get last row of o2 (timestamp = 300) + .timestamp_range(200, 400) + // model predicate like _field='reading' which last row does not have + .field_columns(vec!["reading"]) + .build(); + + run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await; +} + +#[tokio::test] +async fn list_table_names_no_non_null_general_data_passes() { + // only a single row with a null field passes this predicate + // (expect no table names) -- has a general purpose predicate to + // force a generic plan + let predicate = PredicateBuilder::default() + .table("o2") + // only get last row of o2 (timestamp = 300) + .timestamp_range(200, 400) + // model predicate like _field='reading' which last row does not have + .field_columns(vec!["reading"]) + // (state = CA) OR (temp > 50) + .add_expr(col("state").eq(lit("CA")).or(col("temp").gt(lit(50)))) + .build(); + + run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await; +} + #[tokio::test] async fn list_table_names_no_data_pred_with_delete() { run_table_names_test_case( diff --git a/query_tests/src/pruning.rs b/query_tests/src/pruning.rs index 09b93b8ea9..011c6d6f51 100644 --- a/query_tests/src/pruning.rs +++ b/query_tests/src/pruning.rs @@ -103,11 +103,11 @@ async fn chunk_pruning_influxrpc() { let ctx = db.executor().new_context(query::exec::ExecutorType::Query); - let builder = InfluxRpcPlanner::new() + let plan = InfluxRpcPlanner::new() .table_names(db.as_ref(), predicate) .unwrap(); - let result = ctx.to_table_names(builder).await.unwrap(); + let result = ctx.to_string_set(plan).await.unwrap(); assert_eq!(&expected, result.as_ref()); diff --git a/read_buffer/Cargo.toml b/read_buffer/Cargo.toml index 88877f782d..707189e83b 100644 --- a/read_buffer/Cargo.toml +++ b/read_buffer/Cargo.toml @@ -2,7 +2,7 @@ name = "read_buffer" version = "0.1.0" authors = ["Edd Robinson "] -edition = "2018" +edition = "2021" # Note this crate is designed to be standalone, and should not depend # on the IOx Query Engine. The rationale is: diff --git a/rustfmt.toml b/rustfmt.toml index 4edcabf961..7b060483be 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,4 +1,4 @@ -edition = "2018" +edition = "2021" # Unstable features not yet supported on stable Rust #wrap_comments = true diff --git a/schema/Cargo.toml b/schema/Cargo.toml index 2a6d3b085a..231d10bf80 100644 --- a/schema/Cargo.toml +++ b/schema/Cargo.toml @@ -2,7 +2,7 @@ name = "schema" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" description = "IOx Schema definition" [dependencies] diff --git a/server/Cargo.toml b/server/Cargo.toml index eb53ec8a8a..fa2d104de3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -2,7 +2,7 @@ name = "server" version = "0.1.0" authors = ["pauldix "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order arrow = { version = "6.0", features = ["prettyprint"] } diff --git a/server/src/application.rs b/server/src/application.rs index 79e74eb203..83e5eaad7b 100644 --- a/server/src/application.rs +++ b/server/src/application.rs @@ -4,6 +4,7 @@ use object_store::ObjectStore; use observability_deps::tracing::info; use query::exec::Executor; use time::TimeProvider; +use trace::TraceCollector; use write_buffer::config::WriteBufferConfigFactory; use crate::JobRegistry; @@ -18,13 +19,18 @@ pub struct ApplicationState { job_registry: Arc, metric_registry: Arc, time_provider: Arc, + trace_collector: Option>, } impl ApplicationState { /// Creates a new `ApplicationState` /// /// Uses number of CPUs in the system if num_worker_threads is not set - pub fn new(object_store: Arc, num_worker_threads: Option) -> Self { + pub fn new( + object_store: Arc, + num_worker_threads: Option, + trace_collector: Option>, + ) -> Self { let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get); info!(%num_threads, "using specified number of threads per thread pool"); @@ -45,6 +51,7 @@ impl ApplicationState { job_registry, metric_registry, time_provider, + trace_collector, } } @@ -68,6 +75,10 @@ impl ApplicationState { &self.time_provider } + pub fn trace_collector(&self) -> &Option> { + &self.trace_collector + } + pub fn executor(&self) -> &Arc { &self.executor } diff --git a/server/src/database.rs b/server/src/database.rs index 546786ceca..2bca56b868 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -30,7 +30,6 @@ use std::{future::Future, sync::Arc, time::Duration}; use tokio::{sync::Notify, task::JoinError}; use tokio_util::sync::CancellationToken; use trace::ctx::SpanContext; -use trace::{RingBufferTraceCollector, TraceCollector}; use uuid::Uuid; const INIT_BACKOFF: Duration = Duration::from_secs(1); @@ -1312,10 +1311,8 @@ impl DatabaseStateCatalogLoaded { ) -> Result { let db = Arc::clone(&self.db); - // TODO: use proper trace collector - let trace_collector: Arc = Arc::new(RingBufferTraceCollector::new(5)); - let rules = self.provided_rules.rules(); + let trace_collector = shared.application.trace_collector(); let write_buffer_factory = shared.application.write_buffer_factory(); let write_buffer_consumer = match rules.write_buffer_connection.as_ref() { Some(connection) if matches!(connection.direction, WriteBufferDirection::Read) => { @@ -1323,7 +1320,7 @@ impl DatabaseStateCatalogLoaded { .new_config_read( shared.config.server_id, shared.config.name.as_str(), - Some(&trace_collector), + trace_collector.as_ref(), connection, ) .await @@ -1375,13 +1372,14 @@ impl DatabaseStateInitialized { #[cfg(test)] mod tests { + use crate::test_utils::make_application; + use super::*; use data_types::database_rules::{ PartitionTemplate, TemplatePart, WriteBufferConnection, WriteBufferDirection, }; use data_types::sequence::Sequence; use entry::{test_helpers::lp_to_entries, SequencedEntry}; - use object_store::ObjectStore; use std::{ convert::{TryFrom, TryInto}, num::NonZeroU32, @@ -1393,10 +1391,7 @@ mod tests { #[tokio::test] async fn database_shutdown_waits_for_jobs() { - let application = Arc::new(ApplicationState::new( - Arc::new(ObjectStore::new_in_memory()), - None, - )); + let application = make_application(); let database = Database::new( Arc::clone(&application), @@ -1454,10 +1449,7 @@ mod tests { async fn initialized_database() -> (Arc, Database) { let server_id = ServerId::try_from(1).unwrap(); - let application = Arc::new(ApplicationState::new( - Arc::new(ObjectStore::new_in_memory()), - None, - )); + let application = make_application(); let db_name = DatabaseName::new("test").unwrap(); let uuid = Uuid::new_v4(); @@ -1594,10 +1586,7 @@ mod tests { )); // setup application - let application = Arc::new(ApplicationState::new( - Arc::new(ObjectStore::new_in_memory()), - None, - )); + let application = make_application(); application .write_buffer_factory() .register_mock("my_mock".to_string(), state.clone()); diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 046652cd2b..d1797ea45a 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -20,6 +20,7 @@ use predicate::{ }; use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta}; use read_buffer::RBChunk; +use schema::InfluxColumnType; use schema::{selection::Selection, sort::SortKey, Schema}; use snafu::{OptionExt, ResultExt, Snafu}; use std::{ @@ -237,6 +238,31 @@ impl DbChunk { debug!(?rub_preds, "RUB delete predicates"); Ok(rub_preds) } + + /// Return true if any of the fields called for in the `predicate` + /// contain at least 1 null value. Returns false ONLY if all + /// fields that pass `predicate` are entirely non null + fn fields_have_nulls(&self, predicate: &Predicate) -> bool { + self.meta.schema.iter().any(|(influx_column_type, field)| { + if matches!(influx_column_type, Some(InfluxColumnType::Field(_))) + && predicate.should_include_field(field.name()) + { + match self.meta.table_summary.column(field.name()) { + Some(column_summary) => { + // only if this is false can we return false + column_summary.null_count() > 0 + } + None => { + // don't know the stats for this column, so assume there can be nulls + true + } + } + } else { + // not a field column + false + } + }) + } } impl QueryChunk for DbChunk { @@ -264,23 +290,12 @@ impl QueryChunk for DbChunk { return Ok(PredicateMatch::Zero); } - // TODO apply predicate pruning here... - let pred_result = match &self.state { State::MutableBuffer { chunk, .. } => { - if predicate.has_exprs() { - // TODO: Support more predicates + if predicate.has_exprs() || chunk.has_timerange(&predicate.range) { + // TODO some more work to figure out if we + // definite have / do not have result PredicateMatch::Unknown - } else if chunk.has_timerange(&predicate.range) { - // Note: this isn't precise / correct: if the - // chunk has the timerange, some other part of the - // predicate may rule out the rows, and thus - // without further work this clause should return - // "Unknown" rather than falsely claiming that - // there is at least one row: - // - // https://github.com/influxdata/influxdb_iox/issues/1590 - PredicateMatch::AtLeastOne } else { PredicateMatch::Zero } @@ -305,19 +320,21 @@ impl QueryChunk for DbChunk { // on meta-data only. This should be possible without needing to // know the execution engine the chunk is held in. if chunk.satisfies_predicate(&rb_predicate) { - PredicateMatch::AtLeastOne + // if any of the fields referred to in the + // predicate has nulls, don't know without more + // work if the rows that matched had non null values + if self.fields_have_nulls(predicate) { + PredicateMatch::Unknown + } else { + PredicateMatch::AtLeastOneNonNullField + } } else { PredicateMatch::Zero } } State::ParquetFile { chunk, .. } => { - if predicate.has_exprs() { - // TODO: Support more predicates + if predicate.has_exprs() || chunk.has_timerange(predicate.range.as_ref()) { PredicateMatch::Unknown - } else if chunk.has_timerange(predicate.range.as_ref()) { - // As above, this should really be "Unknown" rather than AtLeastOne - // for precision / correctness. - PredicateMatch::AtLeastOne } else { PredicateMatch::Zero } diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index b23278a734..1bd5c216ec 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -408,14 +408,11 @@ impl SequenceNumberSection { #[cfg(test)] mod tests { use super::*; - - use std::{ - convert::TryFrom, - num::{NonZeroU32, NonZeroU64, NonZeroUsize}, - sync::Arc, - time::{Duration, Instant}, + use crate::{ + lifecycle::LifecycleWorker, + utils::{TestDb, TestDbBuilder}, + write_buffer::WriteBufferConsumer, }; - use arrow_util::assert_batches_eq; use data_types::{ database_rules::{PartitionTemplate, Partitioner, TemplatePart}, @@ -432,16 +429,18 @@ mod tests { min_max_sequence::OptionalMinMaxSequence, }; use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner}; + use std::{ + convert::TryFrom, + num::{NonZeroU32, NonZeroU64, NonZeroUsize}, + sync::Arc, + time::{Duration, Instant}, + }; use test_helpers::{assert_contains, assert_not_contains, tracing::TracingCapture}; use time::{Time, TimeProvider}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use write_buffer::mock::{MockBufferForReading, MockBufferSharedState}; - use crate::lifecycle::LifecycleWorker; - use crate::utils::TestDb; - use crate::write_buffer::WriteBufferConsumer; - #[derive(Debug)] struct TestSequencedEntry { sequencer_id: u32, @@ -572,15 +571,17 @@ mod tests { let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers); - let (mut test_db, mut shutdown, mut join_handle) = Self::create_test_db( + let test_db_builder = Self::create_test_db_builder( Arc::clone(&object_store), server_id, db_name, partition_template.clone(), self.catalog_transactions_until_checkpoint, Arc::::clone(&time), - ) - .await; + ); + + let (mut test_db, mut shutdown, mut join_handle) = + Self::create_test_db(&test_db_builder).await; let mut lifecycle = LifecycleWorker::new(Arc::clone(&test_db.db)); @@ -620,15 +621,8 @@ mod tests { drop(test_db); // then create new one - let (test_db_tmp, shutdown_tmp, join_handle_tmp) = Self::create_test_db( - Arc::clone(&object_store), - server_id, - db_name, - partition_template.clone(), - self.catalog_transactions_until_checkpoint, - Arc::::clone(&time), - ) - .await; + let (test_db_tmp, shutdown_tmp, join_handle_tmp) = + Self::create_test_db(&test_db_builder).await; test_db = test_db_tmp; shutdown = shutdown_tmp; join_handle = join_handle_tmp; @@ -759,14 +753,29 @@ mod tests { } async fn create_test_db( + builder: &TestDbBuilder, + ) -> (TestDb, CancellationToken, JoinHandle<()>) { + let test_db = builder.build().await; + // start background worker + + let shutdown: CancellationToken = Default::default(); + let shutdown_captured = shutdown.clone(); + let db_captured = Arc::clone(&test_db.db); + let join_handle = + tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); + + (test_db, shutdown, join_handle) + } + + fn create_test_db_builder( object_store: Arc, server_id: ServerId, db_name: &'static str, partition_template: PartitionTemplate, catalog_transactions_until_checkpoint: NonZeroU64, time_provider: Arc, - ) -> (TestDb, CancellationToken, JoinHandle<()>) { - let test_db = TestDb::builder() + ) -> TestDbBuilder { + TestDb::builder() .object_store(object_store) .server_id(server_id) .lifecycle_rules(data_types::database_rules::LifecycleRules { @@ -779,17 +788,6 @@ mod tests { .partition_template(partition_template) .time_provider(time_provider) .db_name(db_name) - .build() - .await; - - // start background worker - let shutdown: CancellationToken = Default::default(); - let shutdown_captured = shutdown.clone(); - let db_captured = Arc::clone(&test_db.db); - let join_handle = - tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); - - (test_db, shutdown, join_handle) } /// Evaluates given checks. diff --git a/server/src/lib.rs b/server/src/lib.rs index 4dccae1a1d..c3a26547f7 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -876,6 +876,9 @@ where // immediately to the client and abort all other outstanding requests. futures_util::future::try_join_all(sharded_entries.into_iter().map( |sharded_entry| async { + // capture entire entry in closure + let sharded_entry = sharded_entry; + let sink = match &rules.routing_rules { Some(RoutingRules::ShardConfig(shard_config)) => { let id = sharded_entry.shard_id.expect("sharded entry"); @@ -1346,6 +1349,7 @@ pub mod test_utils { Arc::new(ApplicationState::new( Arc::new(ObjectStore::new_in_memory()), None, + None, )) } @@ -2078,7 +2082,7 @@ mod tests { async fn init_error_generic() { // use an object store that will hopefully fail to read let store = Arc::new(ObjectStore::new_failing_store().unwrap()); - let application = Arc::new(ApplicationState::new(store, None)); + let application = Arc::new(ApplicationState::new(store, None, None)); let server = make_server(application); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); diff --git a/server/src/write_buffer.rs b/server/src/write_buffer.rs index c00cf172d5..4078f75a0a 100644 --- a/server/src/write_buffer.rs +++ b/server/src/write_buffer.rs @@ -10,6 +10,7 @@ use tokio_util::sync::CancellationToken; use entry::SequencedEntry; use observability_deps::tracing::{debug, error, info, warn}; +use trace::span::SpanRecorder; use write_buffer::core::{FetchHighWatermark, WriteBufferError, WriteBufferReading}; use crate::Db; @@ -151,12 +152,20 @@ async fn stream_in_sequenced_entries<'a>( // store entry let mut logged_hard_limit = false; loop { + let mut span_recorder = SpanRecorder::new( + sequenced_entry + .span_context() + .map(|parent| parent.child("IOx write buffer")), + ); + match db.store_sequenced_entry( Arc::clone(&sequenced_entry), crate::db::filter_table_batch_keep_all, ) { Ok(_) => { metrics.success(); + span_recorder.ok("stored entry"); + break; } Err(crate::db::Error::HardLimitReached {}) => { @@ -169,6 +178,8 @@ async fn stream_in_sequenced_entries<'a>( ); logged_hard_limit = true; } + span_recorder.error("hard limit reached"); + tokio::time::sleep(Duration::from_millis(100)).await; continue; } @@ -179,6 +190,7 @@ async fn stream_in_sequenced_entries<'a>( sequencer_id, "Error storing SequencedEntry from write buffer in database" ); + span_recorder.error("cannot store entry"); // no retry break; diff --git a/server_benchmarks/Cargo.toml b/server_benchmarks/Cargo.toml index d90c79527d..401c6898bc 100644 --- a/server_benchmarks/Cargo.toml +++ b/server_benchmarks/Cargo.toml @@ -2,7 +2,7 @@ name = "server_benchmarks" version = "0.1.0" authors = ["Andrew Lamb "] -edition = "2018" +edition = "2021" description = "Server related bechmarks, grouped into their own crate to minimize build dev build times" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 5063edf6ae..c8f16e3e51 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -79,7 +79,10 @@ async fn wait_for_signal() { let _ = tokio::signal::ctrl_c().await; } -async fn make_application(config: &Config) -> Result> { +async fn make_application( + config: &Config, + trace_collector: Option>, +) -> Result> { warn_about_inmem_store(&config.object_store_config); let object_store = ObjectStore::try_from(&config.object_store_config).context(ObjectStoreParsing)?; @@ -91,6 +94,7 @@ async fn make_application(config: &Config) -> Result> { Ok(Arc::new(ApplicationState::new( object_storage, config.num_worker_threads, + trace_collector, ))) } @@ -178,7 +182,11 @@ pub async fn main(config: Config) -> Result<()> { let f = SendPanicsToTracing::new(); std::mem::forget(f); - let application = make_application(&config).await?; + let async_exporter = config.tracing_config.build().context(Tracing)?; + let trace_collector = async_exporter + .clone() + .map(|x| -> Arc { x }); + let application = make_application(&config, trace_collector).await?; // Register jemalloc metrics application @@ -189,17 +197,12 @@ pub async fn main(config: Config) -> Result<()> { let grpc_listener = grpc_listener(config.grpc_bind_address).await?; let http_listener = http_listener(config.http_bind_address).await?; - let async_exporter = config.tracing_config.build().context(Tracing)?; - let trace_collector = async_exporter - .clone() - .map(|x| -> Arc { x }); let r = serve( config, application, grpc_listener, http_listener, - trace_collector, app_server, ) .await; @@ -241,7 +244,6 @@ async fn serve( application: Arc, grpc_listener: tokio::net::TcpListener, http_listener: AddrIncoming, - trace_collector: Option>, app_server: Arc>, ) -> Result<()> { // Construct a token to trigger shutdown of API services @@ -262,7 +264,6 @@ async fn serve( Arc::clone(&application), Arc::clone(&app_server), trace_header_parser.clone(), - trace_collector.clone(), frontend_shutdown.clone(), config.initial_serving_state.into(), ) @@ -279,7 +280,6 @@ async fn serve( frontend_shutdown.clone(), max_http_request_size, trace_header_parser, - trace_collector, ) .fuse(); info!("HTTP server listening"); @@ -381,7 +381,7 @@ mod tests { use super::*; use ::http::{header::HeaderName, HeaderValue}; use data_types::{database_rules::DatabaseRules, DatabaseName}; - use influxdb_iox_client::connection::Connection; + use influxdb_iox_client::{connection::Connection, flight::PerformQuery}; use server::rules::ProvidedDatabaseRules; use std::{convert::TryInto, num::NonZeroU64}; use structopt::StructOpt; @@ -412,16 +412,9 @@ mod tests { let grpc_listener = grpc_listener(config.grpc_bind_address).await.unwrap(); let http_listener = http_listener(config.grpc_bind_address).await.unwrap(); - serve( - config, - application, - grpc_listener, - http_listener, - None, - server, - ) - .await - .unwrap() + serve(config, application, grpc_listener, http_listener, server) + .await + .unwrap() } #[tokio::test] @@ -430,7 +423,7 @@ mod tests { // Create a server and wait for it to initialize let config = test_config(Some(23)); - let application = make_application(&config).await.unwrap(); + let application = make_application(&config, None).await.unwrap(); let server = make_server(Arc::clone(&application), &config); server.wait_for_init().await.unwrap(); @@ -458,7 +451,7 @@ mod tests { async fn test_server_shutdown_uninit() { // Create a server but don't set a server id let config = test_config(None); - let application = make_application(&config).await.unwrap(); + let application = make_application(&config, None).await.unwrap(); let server = make_server(Arc::clone(&application), &config); let serve_fut = test_serve(config, application, Arc::clone(&server)).fuse(); @@ -489,7 +482,7 @@ mod tests { async fn test_server_panic() { // Create a server and wait for it to initialize let config = test_config(Some(999999999)); - let application = make_application(&config).await.unwrap(); + let application = make_application(&config, None).await.unwrap(); let server = make_server(Arc::clone(&application), &config); server.wait_for_init().await.unwrap(); @@ -516,7 +509,7 @@ mod tests { async fn test_database_panic() { // Create a server and wait for it to initialize let config = test_config(Some(23)); - let application = make_application(&config).await.unwrap(); + let application = make_application(&config, None).await.unwrap(); let server = make_server(Arc::clone(&application), &config); server.wait_for_init().await.unwrap(); @@ -597,7 +590,9 @@ mod tests { JoinHandle>, ) { let config = test_config(Some(23)); - let application = make_application(&config).await.unwrap(); + let application = make_application(&config, Some(Arc::::clone(collector))) + .await + .unwrap(); let server = make_server(Arc::clone(&application), &config); server.wait_for_init().await.unwrap(); @@ -611,7 +606,6 @@ mod tests { application, grpc_listener, http_listener, - Some(Arc::::clone(collector)), Arc::clone(&server), ); @@ -690,6 +684,11 @@ mod tests { join.await.unwrap().unwrap(); } + /// Ensure that query is fully executed. + async fn consume_query(mut query: PerformQuery) { + while query.next().await.unwrap().is_some() {} + } + #[tokio::test] async fn test_query_tracing() { let collector = Arc::new(RingBufferTraceCollector::new(100)); @@ -721,10 +720,13 @@ mod tests { .unwrap(); let mut flight = influxdb_iox_client::flight::Client::new(conn.clone()); - flight - .perform_query(db_info.db_name(), "select * from cpu;") - .await - .unwrap(); + consume_query( + flight + .perform_query(db_info.db_name(), "select * from cpu;") + .await + .unwrap(), + ) + .await; flight .perform_query("nonexistent", "select * from cpu;") diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index d35a84a7c0..e8ece0a4e1 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -52,7 +52,6 @@ use std::{ }; use tokio_util::sync::CancellationToken; use tower::Layer; -use trace::TraceCollector; use trace_http::tower::TraceLayer; /// Constants used in API error codes. @@ -865,12 +864,12 @@ pub async fn serve( shutdown: CancellationToken, max_request_size: usize, trace_header_parser: TraceHeaderParser, - trace_collector: Option>, ) -> Result<(), hyper::Error> where M: ConnectionManager + Send + Sync + Debug + 'static, { let metric_registry = Arc::clone(application.metric_registry()); + let trace_collector = application.trace_collector().clone(); let trace_layer = TraceLayer::new(trace_header_parser, metric_registry, trace_collector, false); let lp_metrics = Arc::new(LineProtocolMetrics::new( @@ -924,6 +923,7 @@ mod tests { Arc::new(ApplicationState::new( Arc::new(ObjectStore::new_in_memory()), None, + None, )) } @@ -939,7 +939,7 @@ mod tests { async fn test_health() { let application = make_application(); let app_server = make_server(Arc::clone(&application)); - let server_url = test_server(application, Arc::clone(&app_server), None); + let server_url = test_server(application, Arc::clone(&app_server)); let client = Client::new(); let response = client.get(&format!("{}/health", server_url)).send().await; @@ -958,7 +958,7 @@ mod tests { .register_metric("my_metric", "description"); let app_server = make_server(Arc::clone(&application)); - let server_url = test_server(application, Arc::clone(&app_server), None); + let server_url = test_server(application, Arc::clone(&app_server)); metric.recorder(&[("tag", "value")]).inc(20); @@ -998,15 +998,15 @@ mod tests { #[tokio::test] async fn test_tracing() { - let application = make_application(); - let app_server = make_server(Arc::clone(&application)); let trace_collector = Arc::new(RingBufferTraceCollector::new(5)); - - let server_url = test_server( - application, - Arc::clone(&app_server), + let application = Arc::new(ApplicationState::new( + Arc::new(ObjectStore::new_in_memory()), + None, Some(Arc::::clone(&trace_collector)), - ); + )); + let app_server = make_server(Arc::clone(&application)); + + let server_url = test_server(application, Arc::clone(&app_server)); let client = Client::new(); let response = client @@ -1036,7 +1036,7 @@ mod tests { .create_database(make_rules("MyOrg_MyBucket")) .await .unwrap(); - let server_url = test_server(application, Arc::clone(&app_server), None); + let server_url = test_server(application, Arc::clone(&app_server)); let client = Client::new(); @@ -1083,7 +1083,7 @@ mod tests { .create_database(make_rules("MyOrg_MyBucket")) .await .unwrap(); - let server_url = test_server(application, Arc::clone(&app_server), None); + let server_url = test_server(application, Arc::clone(&app_server)); // Set up client let client = Client::new(); @@ -1209,7 +1209,7 @@ mod tests { .await .unwrap(); - let server_url = test_server(application, Arc::clone(&app_server), None); + let server_url = test_server(application, Arc::clone(&app_server)); let client = Client::new(); @@ -1399,7 +1399,7 @@ mod tests { .create_database(make_rules("MyOrg_MyBucket")) .await .unwrap(); - let server_url = test_server(application, Arc::clone(&app_server), None); + let server_url = test_server(application, Arc::clone(&app_server)); let client = Client::new(); @@ -1693,7 +1693,6 @@ mod tests { fn test_server( application: Arc, server: Arc>, - trace_collector: Option>, ) -> String { // NB: specify port 0 to let the OS pick the port. let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); @@ -1710,7 +1709,6 @@ mod tests { CancellationToken::new(), TEST_MAX_REQUEST_SIZE, trace_header_parser, - trace_collector, )); println!("Started server at {}", server_url); server_url @@ -1734,7 +1732,7 @@ mod tests { .create_database(make_rules("MyOrg_MyBucket")) .await .unwrap(); - let server_url = test_server(application, Arc::clone(&app_server), None); + let server_url = test_server(application, Arc::clone(&app_server)); (app_server, server_url) } diff --git a/src/influxdb_ioxd/planner.rs b/src/influxdb_ioxd/planner.rs index dc04bc77c7..c238baa299 100644 --- a/src/influxdb_ioxd/planner.rs +++ b/src/influxdb_ioxd/planner.rs @@ -7,11 +7,7 @@ use query::{ exec::IOxExecutionContext, frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner}, group_by::{Aggregate, WindowDuration}, - plan::{ - fieldlist::FieldListPlan, - seriesset::SeriesSetPlans, - stringset::{StringSetPlan, TableNamePlanBuilder}, - }, + plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan}, QueryDatabase, }; @@ -54,7 +50,7 @@ impl Planner { &self, database: Arc, predicate: Predicate, - ) -> Result + ) -> Result where D: QueryDatabase + 'static, { diff --git a/src/influxdb_ioxd/rpc.rs b/src/influxdb_ioxd/rpc.rs index 226341fd85..857f1fface 100644 --- a/src/influxdb_ioxd/rpc.rs +++ b/src/influxdb_ioxd/rpc.rs @@ -11,7 +11,6 @@ use trace_http::ctx::TraceHeaderParser; use crate::influxdb_ioxd::serving_readiness::ServingReadiness; use server::{connection::ConnectionManager, ApplicationState, Server}; -use trace::TraceCollector; pub mod error; mod flight; @@ -90,7 +89,6 @@ pub async fn serve( application: Arc, server: Arc>, trace_header_parser: TraceHeaderParser, - trace_collector: Option>, shutdown: CancellationToken, serving_readiness: ServingReadiness, ) -> Result<()> @@ -109,7 +107,7 @@ where let mut builder = builder.layer(trace_http::tower::TraceLayer::new( trace_header_parser, Arc::clone(application.metric_registry()), - trace_collector, + application.trace_collector().clone(), true, )); diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs index 0a445c7efa..4374466835 100644 --- a/src/influxdb_ioxd/rpc/storage/service.rs +++ b/src/influxdb_ioxd/rpc/storage/service.rs @@ -724,14 +724,14 @@ where let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; let ctx = db.new_query_context(span_ctx); - let builder = Planner::new(&ctx) + let plan = Planner::new(&ctx) .table_names(db, predicate) .await .map_err(|e| Box::new(e) as _) .context(ListingTables { db_name })?; let table_names = ctx - .to_table_names(builder) + .to_string_set(plan) .await .map_err(|e| Box::new(e) as _) .context(ListingTables { db_name })?; @@ -1116,11 +1116,11 @@ mod tests { let chunk0 = TestChunk::new("h2o") .with_id(0) - .with_predicate_match(PredicateMatch::AtLeastOne); + .with_predicate_match(PredicateMatch::AtLeastOneNonNullField); let chunk1 = TestChunk::new("o2") .with_id(1) - .with_predicate_match(PredicateMatch::AtLeastOne); + .with_predicate_match(PredicateMatch::AtLeastOneNonNullField); fixture .test_storage @@ -1474,7 +1474,8 @@ mod tests { tag_key: [0].into(), }; - let chunk = TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOne); + let chunk = + TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOneNonNullField); fixture .test_storage diff --git a/test_helpers/Cargo.toml b/test_helpers/Cargo.toml index 4ccfa8f678..ddfcc04610 100644 --- a/test_helpers/Cargo.toml +++ b/test_helpers/Cargo.toml @@ -2,11 +2,11 @@ name = "test_helpers" version = "0.1.0" authors = ["Paul Dix "] -edition = "2018" +edition = "2021" [dependencies] # In alphabetical order dotenv = "0.15.0" parking_lot = "0.11.2" tempfile = "3.1.0" -tracing-subscriber = "0.2" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } observability_deps = { path = "../observability_deps" } diff --git a/tests/common/server_fixture.rs b/tests/common/server_fixture.rs index bc713382ad..cc059f3ccb 100644 --- a/tests/common/server_fixture.rs +++ b/tests/common/server_fixture.rs @@ -292,7 +292,7 @@ struct TestServer { } // Options for creating test servers -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct TestConfig { /// Additional environment variables env: Vec<(String, String)>, diff --git a/tests/end_to_end_cases/write_buffer.rs b/tests/end_to_end_cases/write_buffer.rs index 16dd15ea70..17e2bbd631 100644 --- a/tests/end_to_end_cases/write_buffer.rs +++ b/tests/end_to_end_cases/write_buffer.rs @@ -1,5 +1,8 @@ use crate::{ - common::server_fixture::ServerFixture, + common::{ + server_fixture::{ServerFixture, TestConfig}, + udp_listener::UdpCapture, + }, end_to_end_cases::scenario::{rand_name, DatabaseBuilder}, }; use arrow_util::assert_batches_sorted_eq; @@ -17,6 +20,7 @@ use rdkafka::{ ClientConfig, Message, Offset, TopicPartitionList, }; use std::convert::TryFrom; +use tempfile::TempDir; use test_helpers::assert_contains; use write_buffer::{kafka::test_utils::kafka_sequencer_options, maybe_skip_kafka_integration}; @@ -325,3 +329,89 @@ async fn test_create_database_missing_write_buffer_sequencers() { &err ); } + +#[tokio::test] +pub async fn test_cross_write_buffer_tracing() { + let write_buffer_dir = TempDir::new().unwrap(); + + // setup tracing + let udp_capture = UdpCapture::new().await; + let test_config = TestConfig::new() + .with_env("TRACES_EXPORTER", "jaeger") + .with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip()) + .with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port()) + .with_client_header("jaeger-debug-id", "some-debug-id"); + + // we need to use two servers but the same DB name here because the Kafka topic is named after the DB name + let db_name = rand_name(); + + // create producer server + let server_write = ServerFixture::create_single_use_with_config(test_config.clone()).await; + server_write + .management_client() + .update_server_id(1) + .await + .unwrap(); + server_write.wait_server_initialized().await; + let conn_write = WriteBufferConnection { + direction: WriteBufferDirection::Write.into(), + r#type: "file".to_string(), + connection: write_buffer_dir.path().display().to_string(), + creation_config: Some(WriteBufferCreationConfig { + n_sequencers: 1, + options: kafka_sequencer_options(), + }), + ..Default::default() + }; + DatabaseBuilder::new(db_name.clone()) + .write_buffer(conn_write.clone()) + .build(server_write.grpc_channel()) + .await; + + // create consumer DB + let server_read = ServerFixture::create_single_use_with_config(test_config).await; + server_read + .management_client() + .update_server_id(2) + .await + .unwrap(); + server_read.wait_server_initialized().await; + let conn_read = WriteBufferConnection { + direction: WriteBufferDirection::Read.into(), + ..conn_write + }; + DatabaseBuilder::new(db_name.clone()) + .write_buffer(conn_read) + .build(server_read.grpc_channel()) + .await; + + // write some points + let mut write_client = server_write.write_client(); + let lp_lines = [ + "cpu,region=west user=23.2 100", + "cpu,region=west user=21.0 150", + "disk,region=east bytes=99i 200", + ]; + let num_lines_written = write_client + .write(&db_name, lp_lines.join("\n")) + .await + .expect("cannot write"); + assert_eq!(num_lines_written, 3); + + // "shallow" packet inspection and verify the UDP server got + // something that had some expected results (maybe we could + // eventually verify the payload here too) + udp_capture + .wait_for(|m| m.to_string().contains("IOx write buffer")) + .await; + udp_capture + .wait_for(|m| m.to_string().contains("stored entry")) + .await; + + // // debugging assistance + // tokio::time::sleep(std::time::Duration::from_secs(10)).await; + // println!("Traces received (1):\n\n{:#?}", udp_capture.messages()); + + // wait for the UDP server to shutdown + udp_capture.stop().await +} diff --git a/time/Cargo.toml b/time/Cargo.toml index 0557fda4c0..f1b4c9075b 100644 --- a/time/Cargo.toml +++ b/time/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "time" version = "0.1.0" -edition = "2018" +edition = "2021" description = "Time functionality for IOx" [dependencies] diff --git a/trace/Cargo.toml b/trace/Cargo.toml index 1e1d0f2527..c5173ac459 100644 --- a/trace/Cargo.toml +++ b/trace/Cargo.toml @@ -2,7 +2,7 @@ name = "trace" version = "0.1.0" authors = ["Raphael Taylor-Davies "] -edition = "2018" +edition = "2021" description = "Distributed tracing support within IOx" [dependencies] diff --git a/trace_exporters/Cargo.toml b/trace_exporters/Cargo.toml index c9e3d07f60..1f91219680 100644 --- a/trace_exporters/Cargo.toml +++ b/trace_exporters/Cargo.toml @@ -2,7 +2,7 @@ name = "trace_exporters" version = "0.1.0" authors = ["Raphael Taylor-Davies "] -edition = "2018" +edition = "2021" description = "Additional tracing exporters for IOx" [dependencies] diff --git a/trace_http/Cargo.toml b/trace_http/Cargo.toml index 74ef6b9ef6..311a636f93 100644 --- a/trace_http/Cargo.toml +++ b/trace_http/Cargo.toml @@ -2,7 +2,7 @@ name = "trace_http" version = "0.1.0" authors = ["Raphael Taylor-Davies "] -edition = "2018" +edition = "2021" description = "Distributed tracing support for HTTP services" [dependencies] diff --git a/tracker/Cargo.toml b/tracker/Cargo.toml index 66ab6b6f48..3a71bcd1fc 100644 --- a/tracker/Cargo.toml +++ b/tracker/Cargo.toml @@ -2,7 +2,7 @@ name = "tracker" version = "0.1.0" authors = ["Raphael Taylor-Davies "] -edition = "2018" +edition = "2021" description = "Utilities for tracking resource utilisation within IOx" [dependencies] diff --git a/trogging/Cargo.toml b/trogging/Cargo.toml index 0647192fab..c44c0bc164 100644 --- a/trogging/Cargo.toml +++ b/trogging/Cargo.toml @@ -2,7 +2,7 @@ name = "trogging" version = "0.1.0" authors = ["Marko Mikulicic "] -edition = "2018" +edition = "2021" description = "IOx logging pipeline built upon tokio-tracing" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -12,7 +12,7 @@ logfmt = { path = "../logfmt" } observability_deps = { path = "../observability_deps" } thiserror = "1.0.30" tracing-log = "0.1" -tracing-subscriber = "0.2" +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } structopt = { version = "0.3.25", optional = true } [dev-dependencies] diff --git a/trogging/src/cli.rs b/trogging/src/cli.rs index c0538d14f9..e1166fd30c 100644 --- a/trogging/src/cli.rs +++ b/trogging/src/cli.rs @@ -105,7 +105,7 @@ impl LoggingConfig { pub fn with_builder(&self, builder: Builder) -> Builder where - W: MakeWriter + Send + Sync + Clone + 'static, + W: for<'writer> MakeWriter<'writer> + Send + Sync + Clone + 'static, { builder .with_log_filter(&self.log_filter) @@ -129,7 +129,7 @@ pub trait LoggingConfigBuilderExt { impl LoggingConfigBuilderExt for Builder where - W: MakeWriter + Send + Sync + Clone + 'static, + W: for<'writer> MakeWriter<'writer> + Send + Sync + Clone + 'static, { fn with_logging_config(self, config: &LoggingConfig) -> Builder { config.with_builder(self) diff --git a/trogging/src/lib.rs b/trogging/src/lib.rs index 95f1f32abf..dc3c7bde92 100644 --- a/trogging/src/lib.rs +++ b/trogging/src/lib.rs @@ -86,7 +86,7 @@ impl Builder { impl Builder { pub fn with_writer(self, make_writer: W2) -> Builder where - W2: MakeWriter + Send + Sync + 'static, + W2: for<'writer> MakeWriter<'writer> + Send + Sync + 'static, { Builder:: { make_writer, @@ -103,7 +103,7 @@ impl Builder { // This needs to be a separate impl block because they place different bounds on the type parameters. impl Builder where - W: MakeWriter + Send + Sync + 'static, + W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static, { pub const DEFAULT_LOG_FILTER: &'static str = "warn"; @@ -277,17 +277,30 @@ impl Drop for TroggingGuard { fn make_writer(m: M) -> BoxMakeWriter where - M: MakeWriter + Send + Sync + 'static, + M: for<'writer> MakeWriter<'writer> + Send + Sync + 'static, { - fmt::writer::BoxMakeWriter::new(move || { - std::io::LineWriter::with_capacity( - MAX_LINE_LENGTH, - LimitedWriter(MAX_LINE_LENGTH, m.make_writer()), - ) + BoxMakeWriter::new(MakeWriterHelper { + inner: BoxMakeWriter::new(m), }) } +struct MakeWriterHelper { + inner: BoxMakeWriter, +} + +impl<'a> MakeWriter<'a> for MakeWriterHelper { + type Writer = Box; + + fn make_writer(&'a self) -> Self::Writer { + Box::new(std::io::LineWriter::with_capacity( + MAX_LINE_LENGTH, + LimitedWriter(MAX_LINE_LENGTH, self.inner.make_writer()), + )) + } +} + struct LimitedWriter(usize, W); + impl Write for LimitedWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { if buf.is_empty() { @@ -341,7 +354,7 @@ pub mod test_util { } } - impl MakeWriter for TestWriter { + impl MakeWriter<'_> for TestWriter { type Writer = SynchronizedWriter>; fn make_writer(&self) -> Self::Writer { @@ -356,9 +369,9 @@ pub mod test_util { /// Removes non-determinism by removing timestamps from the log lines. /// It supports the built-in tracing timestamp format and the logfmt timestamps. pub fn without_timestamps(&self) -> String { - // logfmt or fmt::layer() time format + // logfmt (e.g. `time=12345`) or fmt::layer() (e.g. `2021-10-25T13:48:50.555258`) time format let timestamp = regex::Regex::new( - r"(?m)( ?time=[0-9]+|^([A-Z][a-z]{2}) \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} *)", + r"(?m)( ?time=[0-9]+|^(\d{4})-\d{1,2}-\d{1,2}T\d{2}:\d{2}:\d{2}.\d+Z *)", ) .unwrap(); timestamp.replace_all(&self.to_string(), "").to_string() @@ -379,7 +392,7 @@ pub mod test_util { /// the logging macros invoked by the function. pub fn log_test(builder: Builder, f: F) -> Captured where - W: MakeWriter + Send + Sync + 'static, + W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static, F: Fn(), { let (writer, output) = TestWriter::new(); @@ -401,7 +414,7 @@ pub mod test_util { /// and returns the captured output. pub fn simple_test(builder: Builder) -> Captured where - W: MakeWriter + Send + Sync + 'static, + W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static, { log_test(builder, || { error!("foo"); @@ -598,7 +611,8 @@ ERROR foo #[test] fn line_buffering() { let (test_writer, captured) = TestWriter::new(); - let mut writer = make_writer(test_writer).make_writer(); + let mw = make_writer(test_writer); + let mut writer = mw.make_writer(); writer.write_all("foo".as_bytes()).unwrap(); // wasn't flushed yet because there was no newline yet assert_eq!(captured.to_string(), ""); @@ -611,7 +625,8 @@ ERROR foo // another case when the line buffer flushes even before a newline is when the internal buffer limit let (test_writer, captured) = TestWriter::new(); - let mut writer = make_writer(test_writer).make_writer(); + let mw = make_writer(test_writer); + let mut writer = mw.make_writer(); let long = std::iter::repeat(b'X') .take(MAX_LINE_LENGTH) .collect::>(); diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index a6fbda7608..2afe4a73b4 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "write_buffer" version = "0.1.0" -edition = "2018" +edition = "2021" [dependencies] async-trait = "0.1"