From 01a44899fd21048904a92a8eee8a0974cff8bb01 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Oct 2021 01:45:35 +0000 Subject: [PATCH 01/11] chore(deps): bump assert_cmd from 2.0.1 to 2.0.2 Bumps [assert_cmd](https://github.com/assert-rs/assert_cmd) from 2.0.1 to 2.0.2. - [Release notes](https://github.com/assert-rs/assert_cmd/releases) - [Changelog](https://github.com/assert-rs/assert_cmd/blob/master/CHANGELOG.md) - [Commits](https://github.com/assert-rs/assert_cmd/compare/v2.0.1...v2.0.2) --- updated-dependencies: - dependency-name: assert_cmd dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e04f881c9..6285704a07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,9 +182,9 @@ dependencies = [ [[package]] name = "assert_cmd" -version = "2.0.1" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b800c4403e8105d959595e1f88119e78bc12bc874c4336973658b648a746ba93" +checksum = "e996dc7940838b7ef1096b882e29ec30a3149a3a443cdc8dba19ed382eca1fe2" dependencies = [ "bstr", "doc-comment", diff --git a/Cargo.toml b/Cargo.toml index 19074244ed..79708a9175 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -172,7 +172,7 @@ parking_lot = "0.11.2" write_buffer = { path = "write_buffer" } # Crates.io dependencies, in alphabetical order -assert_cmd = "2.0.0" +assert_cmd = "2.0.2" flate2 = "1.0" hex = "0.4.2" predicates = "2.0.2" From 1327735537a991ab3f311e8058b22bc8bf5358ca Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Oct 2021 08:01:21 +0000 Subject: [PATCH 02/11] chore(deps): bump thiserror from 1.0.29 to 1.0.30 Bumps [thiserror](https://github.com/dtolnay/thiserror) from 1.0.29 to 1.0.30. - [Release notes](https://github.com/dtolnay/thiserror/releases) - [Commits](https://github.com/dtolnay/thiserror/compare/1.0.29...1.0.30) --- updated-dependencies: - dependency-name: thiserror dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- client_util/Cargo.toml | 2 +- generated_types/Cargo.toml | 2 +- grpc-router/Cargo.toml | 2 +- influxdb_iox_client/Cargo.toml | 2 +- trogging/Cargo.toml | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6285704a07..ff04f53de6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4179,18 +4179,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.29" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "602eca064b2d83369e2b2f34b09c70b605402801927c65c11071ac911d299b88" +checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.29" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c" +checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 79708a9175..0aba2fbf0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,7 +143,7 @@ serde_json = "1.0.67" serde_urlencoded = "0.7.0" snafu = "0.6.9" structopt = "0.3.23" -thiserror = "1.0.28" +thiserror = "1.0.30" tikv-jemalloc-ctl = { version = "0.4.0" } tokio = { version = "1.11", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] } tokio-stream = { version = "0.1.2", features = ["net"] } diff --git a/client_util/Cargo.toml b/client_util/Cargo.toml index c7b1f313ab..b61f69e75e 100644 --- a/client_util/Cargo.toml +++ b/client_util/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] http = "0.2.3" prost = "0.8" -thiserror = "1.0.28" +thiserror = "1.0.30" tonic = { version = "0.5.0" } tower = "0.4" diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index 5d51cbbfe4..51db90847d 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -13,7 +13,7 @@ pbjson-types = "0.1" prost = "0.8" regex = "1.4" serde = { version = "1.0", features = ["derive"] } -thiserror = "1.0.28" +thiserror = "1.0.30" tonic = "0.5" [dev-dependencies] diff --git a/grpc-router/Cargo.toml b/grpc-router/Cargo.toml index 1b9f7821f6..bab1334eee 100644 --- a/grpc-router/Cargo.toml +++ b/grpc-router/Cargo.toml @@ -12,7 +12,7 @@ observability_deps = { path = "../observability_deps" } paste = "1.0.5" prost = "0.8" prost-types = "0.8" -thiserror = "1.0.28" +thiserror = "1.0.30" tokio = { version = "1.11", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] } tokio-stream = { version = "0.1.2", features = ["net"] } tokio-util = { version = "0.6.3" } diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index 3474cb3510..ad5291306c 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -23,7 +23,7 @@ prost = "0.8" rand = "0.8.3" serde = "1.0.128" serde_json = { version = "1.0.67", optional = true } -thiserror = "1.0.28" +thiserror = "1.0.30" tonic = { version = "0.5.0" } [dev-dependencies] # In alphabetical order diff --git a/trogging/Cargo.toml b/trogging/Cargo.toml index b25997ad8e..489e255ac0 100644 --- a/trogging/Cargo.toml +++ b/trogging/Cargo.toml @@ -10,7 +10,7 @@ description = "IOx logging pipeline built upon tokio-tracing" [dependencies] logfmt = { path = "../logfmt" } observability_deps = { path = "../observability_deps" } -thiserror = "1.0.28" +thiserror = "1.0.30" 
tracing-log = "0.1" tracing-subscriber = "0.2" structopt = { version = "0.3.23", optional = true } From 3f921d43e60644834bc8cd7d976dc2aa92d7110a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Oct 2021 08:14:11 +0000 Subject: [PATCH 03/11] chore(deps): bump tracing-subscriber from 0.2.24 to 0.2.25 Bumps [tracing-subscriber](https://github.com/tokio-rs/tracing) from 0.2.24 to 0.2.25. - [Release notes](https://github.com/tokio-rs/tracing/releases) - [Commits](https://github.com/tokio-rs/tracing/compare/tracing-subscriber-0.2.24...tracing-subscriber-0.2.25) --- updated-dependencies: - dependency-name: tracing-subscriber dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- Cargo.lock | 4 ++-- iox_data_generator/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff04f53de6..5cb09499c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4595,9 +4595,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.24" +version = "0.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdd0568dbfe3baf7048b7908d2b32bca0d81cd56bec6d2a8f894b01d74f86be3" +checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71" dependencies = [ "ansi_term 0.12.1", "chrono", diff --git a/iox_data_generator/Cargo.toml b/iox_data_generator/Cargo.toml index 49abafcd81..750a045274 100644 --- a/iox_data_generator/Cargo.toml +++ b/iox_data_generator/Cargo.toml @@ -28,7 +28,7 @@ tokio = { version = "1.11", features = ["macros", "rt-multi-thread"] } toml = "0.5.6" tracing = "0.1" tracing-futures = "0.2.4" -tracing-subscriber = "0.2.24" +tracing-subscriber = "0.2.25" uuid = { version = "0.8.1", default_features = false } [dev-dependencies] From 71a9aa212065701828be2d407e025b486cb2f110 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Oct 2021 08:24:27 +0000 Subject: [PATCH 04/11] chore(deps): bump ouroboros from 0.12.0 to 0.13.0 Bumps [ouroboros](https://github.com/joshua-maros/ouroboros) from 0.12.0 to 0.13.0. - [Release notes](https://github.com/joshua-maros/ouroboros/releases) - [Commits](https://github.com/joshua-maros/ouroboros/commits) --- updated-dependencies: - dependency-name: ouroboros dependency-type: direct:production update-type: version-update:semver-minor ... 
Signed-off-by: dependabot[bot] --- Cargo.lock | 8 ++++---- entry/Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5cb09499c8..8a4acd251e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2589,9 +2589,9 @@ dependencies = [ [[package]] name = "ouroboros" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c711f35b4e881534535e7d943aa158ed673baf8e73b06bdd93b31703cf968cc3" +checksum = "f357ef82d1b4db66fbed0b8d542cbd3c22d0bf5b393b3c257b9ba4568e70c9c3" dependencies = [ "aliasable", "ouroboros_macro", @@ -2600,9 +2600,9 @@ dependencies = [ [[package]] name = "ouroboros_macro" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adbdedd935f91827ea19bb8b97c20dd5870221eac3e9d9a2e70367ecc813479d" +checksum = "44a0b52c2cbaef7dffa5fec1a43274afe8bd2a644fa9fc50a9ef4ff0269b1257" dependencies = [ "Inflector", "proc-macro-error", diff --git a/entry/Cargo.toml b/entry/Cargo.toml index 94d83b9724..3e1c559fb2 100644 --- a/entry/Cargo.toml +++ b/entry/Cargo.toml @@ -13,6 +13,6 @@ data_types = { path = "../data_types" } flatbuffers = "2" snafu = "0.6" influxdb_line_protocol = { path = "../influxdb_line_protocol" } -ouroboros = "0.12.0" +ouroboros = "0.13.0" internal_types = { path = "../internal_types" } generated_types = { path = "../generated_types" } From 42d4ad61e1aaad2f903cb1e75b451d724d84a5b7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Oct 2021 08:36:50 +0000 Subject: [PATCH 05/11] chore(deps): bump ahash from 0.7.4 to 0.7.5 Bumps [ahash](https://github.com/tkaitchuck/ahash) from 0.7.4 to 0.7.5. - [Release notes](https://github.com/tkaitchuck/ahash/releases) - [Commits](https://github.com/tkaitchuck/ahash/commits) --- updated-dependencies: - dependency-name: ahash dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- Cargo.lock | 4 ++-- arrow_util/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a4acd251e..3e8ad10161 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,9 +31,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "ahash" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98" +checksum = "991984e3fd003e7ba02eb724f87a0f997b78677c46c0e91f8424ad7394c9886a" dependencies = [ "getrandom 0.2.3", "once_cell", diff --git a/arrow_util/Cargo.toml b/arrow_util/Cargo.toml index d84076b063..34e9adcf81 100644 --- a/arrow_util/Cargo.toml +++ b/arrow_util/Cargo.toml @@ -8,7 +8,7 @@ description = "Apache Arrow utilities" [dependencies] arrow = { version = "5.5", features = ["prettyprint"] } -ahash = "0.7.2" +ahash = "0.7.5" num-traits = "0.2" snafu = "0.6" hashbrown = "0.11" From 7cf3cf252a4c361590cec201ef0dd515589f6865 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Oct 2021 09:05:33 +0000 Subject: [PATCH 06/11] chore(deps): bump tracing from 0.1.28 to 0.1.29 (#2788) Bumps [tracing](https://github.com/tokio-rs/tracing) from 0.1.28 to 0.1.29. 
- [Release notes](https://github.com/tokio-rs/tracing/releases) - [Commits](https://github.com/tokio-rs/tracing/compare/tracing-0.1.28...tracing-0.1.29) --- updated-dependencies: - dependency-name: tracing dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e8ad10161..cec27c9908 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4531,9 +4531,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.28" +version = "0.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8" +checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" dependencies = [ "cfg-if", "log", @@ -4544,9 +4544,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.16" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98863d0dd09fa59a1b79c6750ad80dbda6b75f4e71c437a6a1a8cb91a8bcbd77" +checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" dependencies = [ "proc-macro2", "quote", @@ -4555,9 +4555,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.20" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46125608c26121c81b0c6d693eab5a420e416da7e43c426d2e8f7df8da8a3acf" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" dependencies = [ "lazy_static", ] From 435a8aec91f1bef1ef76ebe1cc793047e3e7a787 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Oct 2021 09:14:15 +0000 Subject: [PATCH 07/11] chore(deps): bump reqwest from 0.11.4 to 0.11.5 (#2789) Bumps [reqwest](https://github.com/seanmonstar/reqwest) from 0.11.4 to 0.11.5. - [Release notes](https://github.com/seanmonstar/reqwest/releases) - [Changelog](https://github.com/seanmonstar/reqwest/blob/master/CHANGELOG.md) - [Commits](https://github.com/seanmonstar/reqwest/compare/v0.11.4...v0.11.5) --- updated-dependencies: - dependency-name: reqwest dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cec27c9908..75ee5a16b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3443,9 +3443,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.4" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22" +checksum = "51c732d463dd300362ffb44b7b125f299c23d2990411a4253824630ebc7467fb" dependencies = [ "base64 0.13.0", "bytes", @@ -4804,8 +4804,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" dependencies = [ "cfg-if", - "serde", - "serde_json", "wasm-bindgen-macro", ] From f35a49edd01d00b0513e125425f9118cb36dfaed Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 11 Oct 2021 10:23:00 +0100 Subject: [PATCH 08/11] refactor: move Sequence to data_types (#2780) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 1 - data_types/src/lib.rs | 1 + data_types/src/sequence.rs | 14 ++++++++++++++ entry/src/entry.rs | 18 ++---------------- persistence_windows/Cargo.toml | 1 - persistence_windows/src/persistence_windows.rs | 5 +++-- server/src/database.rs | 3 ++- server/src/db.rs | 3 ++- server/src/db/replay.rs | 6 ++++-- server/src/write_buffer.rs | 2 +- write_buffer/src/core.rs | 3 ++- write_buffer/src/kafka.rs | 6 ++++-- write_buffer/src/mock.rs | 3 ++- 13 files changed, 37 insertions(+), 29 deletions(-) create mode 100644 data_types/src/sequence.rs diff --git a/Cargo.lock b/Cargo.lock index 75ee5a16b9..0ab6573722 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2802,7 +2802,6 @@ version = "0.1.0" dependencies = [ "chrono", "data_types", - "entry", "internal_types", "observability_deps", "snafu", diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index beb03ce295..12a9d294ff 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -19,6 +19,7 @@ pub mod error; pub mod job; pub mod names; pub mod partition_metadata; +pub mod sequence; pub mod server_id; pub mod timestamp; pub mod write_summary; diff --git a/data_types/src/sequence.rs b/data_types/src/sequence.rs new file mode 100644 index 0000000000..923c92c601 --- /dev/null +++ b/data_types/src/sequence.rs @@ -0,0 +1,14 @@ +#[derive(Debug, Copy, Clone)] +pub struct Sequence { + pub id: u32, + pub number: u64, +} + +impl Sequence { + pub fn new(sequencer_id: u32, sequence_number: u64) -> Self { + Self { + id: sequencer_id, + number: sequence_number, + } + } +} diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 5c515f0a15..45683a1082 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -9,6 +9,8 @@ use ouroboros::self_referencing; use snafu::{OptionExt, ResultExt, Snafu}; use data_types::database_rules::{Error as DataError, Partitioner, ShardId, Sharder}; +use data_types::sequence::Sequence; +use data_types::write_summary::TimestampSummary; use generated_types::influxdata::pbdata::v1 as pb; use influxdb_line_protocol::{FieldValue, ParsedLine}; use internal_types::schema::{ @@ -17,7 +19,6 @@ use internal_types::schema::{ }; use crate::entry_fb; -use data_types::write_summary::TimestampSummary; #[derive(Debug, 
Snafu)] pub enum Error { @@ -1750,21 +1751,6 @@ pub struct SequencedEntry { sequence_and_producer_ts: Option<(Sequence, DateTime)>, } -#[derive(Debug, Copy, Clone)] -pub struct Sequence { - pub id: u32, - pub number: u64, -} - -impl Sequence { - pub fn new(sequencer_id: u32, sequence_number: u64) -> Self { - Self { - id: sequencer_id, - number: sequence_number, - } - } -} - impl SequencedEntry { pub fn new_from_sequence( sequence: Sequence, diff --git a/persistence_windows/Cargo.toml b/persistence_windows/Cargo.toml index 8384954bd6..8da9219ce0 100644 --- a/persistence_windows/Cargo.toml +++ b/persistence_windows/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [dependencies] chrono = "0.4" data_types = { path = "../data_types" } -entry = { path = "../entry" } internal_types = { path = "../internal_types" } observability_deps = { path = "../observability_deps" } snafu = "0.6.2" diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index 4a6f403a4d..cc236e4de5 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -9,8 +9,9 @@ use std::{ use chrono::{DateTime, Duration, Utc}; -use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary}; -use entry::Sequence; +use data_types::{ + partition_metadata::PartitionAddr, sequence::Sequence, write_summary::WriteSummary, +}; use internal_types::freezable::{Freezable, FreezeHandle}; use crate::min_max_sequence::MinMaxSequence; diff --git a/server/src/database.rs b/server/src/database.rs index 018a20b4c7..d0543d71fe 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1194,7 +1194,8 @@ mod tests { use data_types::database_rules::{ PartitionTemplate, TemplatePart, WriteBufferConnection, WriteBufferDirection, }; - use entry::{test_helpers::lp_to_entries, Sequence, SequencedEntry}; + use data_types::sequence::Sequence; + use entry::{test_helpers::lp_to_entries, SequencedEntry}; use object_store::ObjectStore; use std::{ convert::{TryFrom, TryInto}, diff --git a/server/src/db.rs b/server/src/db.rs index 830029da1c..95ccf07e71 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -24,10 +24,11 @@ use data_types::{ chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkSummary}, database_rules::DatabaseRules, partition_metadata::{PartitionSummary, TableSummary}, + sequence::Sequence, server_id::ServerId, }; use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider}; -use entry::{Entry, Sequence, SequencedEntry, TableBatch}; +use entry::{Entry, SequencedEntry, TableBatch}; use internal_types::schema::Schema; use iox_object_store::IoxObjectStore; use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index deb69fbd80..e5e8353237 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -5,7 +5,8 @@ use std::{ }; use chrono::Utc; -use entry::{Sequence, TableBatch}; +use data_types::sequence::Sequence; +use entry::TableBatch; use futures::TryStreamExt; use observability_deps::tracing::info; use persistence_windows::{ @@ -420,11 +421,12 @@ mod tests { use chrono::{DateTime, Utc}; use data_types::{ database_rules::{PartitionTemplate, Partitioner, TemplatePart}, + sequence::Sequence, server_id::ServerId, }; use entry::{ test_helpers::{lp_to_entries, lp_to_entry}, - Sequence, SequencedEntry, + SequencedEntry, }; use object_store::ObjectStore; use persistence_windows::{ diff --git 
a/server/src/write_buffer.rs b/server/src/write_buffer.rs index 914ea0f283..7f41afbbfb 100644 --- a/server/src/write_buffer.rs +++ b/server/src/write_buffer.rs @@ -201,8 +201,8 @@ mod tests { use ::test_helpers::assert_contains; use arrow_util::assert_batches_eq; use data_types::database_rules::{PartitionTemplate, TemplatePart}; + use data_types::sequence::Sequence; use entry::test_helpers::lp_to_entry; - use entry::Sequence; use persistence_windows::min_max_sequence::MinMaxSequence; use query::exec::ExecutionContextProvider; use query::frontend::sql::SqlQueryPlanner; diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 8dbabf628b..55e1da6e99 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -2,7 +2,8 @@ use std::fmt::Debug; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use entry::{Entry, Sequence, SequencedEntry}; +use data_types::sequence::Sequence; +use entry::{Entry, SequencedEntry}; use futures::{future::BoxFuture, stream::BoxStream}; /// Generic boxed error type that is used in this crate. diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index 8a55990e03..b5de77f1ae 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -8,8 +8,10 @@ use std::{ use async_trait::async_trait; use chrono::{DateTime, TimeZone, Utc}; -use data_types::{database_rules::WriteBufferCreationConfig, server_id::ServerId}; -use entry::{Entry, Sequence, SequencedEntry}; +use data_types::{ + database_rules::WriteBufferCreationConfig, sequence::Sequence, server_id::ServerId, +}; +use entry::{Entry, SequencedEntry}; use futures::{FutureExt, StreamExt}; use observability_deps::tracing::{debug, info}; use rdkafka::{ diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index a54130898f..9d96ab35b8 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -11,7 +11,8 @@ use futures::{stream, FutureExt, StreamExt}; use parking_lot::Mutex; use data_types::database_rules::WriteBufferCreationConfig; -use entry::{Entry, Sequence, SequencedEntry}; +use data_types::sequence::Sequence; +use entry::{Entry, SequencedEntry}; use crate::core::{ EntryStream, FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading, From afe34751e7458c44dd24d7ffafa56bf4c5250712 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 11 Oct 2021 10:45:08 +0100 Subject: [PATCH 09/11] refactor: split out schema crate (#2781) * refactor: split out schema crate * chore: fix doc --- Cargo.lock | 34 ++++++++++++------- Cargo.toml | 1 + entry/Cargo.toml | 2 +- entry/src/entry.rs | 2 +- internal_types/Cargo.toml | 5 --- internal_types/src/lib.rs | 2 -- mutable_buffer/Cargo.toml | 2 +- mutable_buffer/src/chunk.rs | 12 +++---- mutable_buffer/src/chunk/snapshot.rs | 11 +++--- mutable_buffer/src/column.rs | 2 +- packers/Cargo.toml | 2 +- packers/src/lib.rs | 2 +- packers/src/packers.rs | 2 +- parquet_file/Cargo.toml | 2 +- parquet_file/src/catalog/dump.rs | 2 +- parquet_file/src/chunk.rs | 10 ++---- parquet_file/src/metadata.rs | 8 ++--- parquet_file/src/storage.rs | 2 +- parquet_file/src/test_utils.rs | 6 ++-- persistence_windows/src/checkpoint.rs | 5 ++- predicate/Cargo.toml | 2 +- predicate/src/predicate.rs | 2 +- query/Cargo.toml | 2 +- query/src/exec/field.rs | 2 +- query/src/exec/fieldlist.rs | 4 +-- query/src/frontend.rs | 2 +- query/src/frontend/influxrpc.rs | 8 ++--- query/src/frontend/reorg.rs | 8 ++--- query/src/func/selectors.rs | 4 +-- query/src/func/window.rs | 
4 +-- query/src/lib.rs | 6 ++-- query/src/provider.rs | 4 +-- query/src/provider/deduplicate/algo.rs | 2 +- query/src/provider/overlap.rs | 2 +- query/src/provider/physical.rs | 3 +- query/src/statistics.rs | 4 +-- query/src/test.rs | 9 +++-- query/src/util.rs | 2 +- query_tests/Cargo.toml | 2 +- query_tests/src/table_schema.rs | 6 ++-- read_buffer/Cargo.toml | 2 +- read_buffer/benches/database.rs | 2 +- read_buffer/benches/read_filter.rs | 2 +- read_buffer/src/chunk.rs | 5 +-- read_buffer/src/lib.rs | 2 +- read_buffer/src/row_group.rs | 13 +++---- read_buffer/src/schema.rs | 8 ++--- read_buffer/src/table.rs | 2 +- schema/Cargo.toml | 16 +++++++++ .../src/schema => schema/src}/builder.rs | 4 +-- .../src/schema.rs => schema/src/lib.rs | 16 +++++---- .../src/schema => schema/src}/merge.rs | 6 ++-- {internal_types => schema}/src/selection.rs | 0 .../src/schema => schema/src}/sort.rs | 0 server/Cargo.toml | 1 + server/src/db.rs | 13 +++---- server/src/db/access.rs | 2 +- server/src/db/catalog.rs | 2 +- server/src/db/catalog/chunk.rs | 3 +- server/src/db/catalog/partition.rs | 2 +- server/src/db/catalog/table.rs | 4 +-- server/src/db/chunk.rs | 11 ++---- server/src/db/lifecycle.rs | 6 ++-- server/src/db/lifecycle/write.rs | 2 +- 64 files changed, 151 insertions(+), 165 deletions(-) create mode 100644 schema/Cargo.toml rename {internal_types/src/schema => schema/src}/builder.rs (98%) rename internal_types/src/schema.rs => schema/src/lib.rs (99%) rename {internal_types/src/schema => schema/src}/merge.rs (99%) rename {internal_types => schema}/src/selection.rs (100%) rename {internal_types/src/schema => schema/src}/sort.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 0ab6573722..1d587e37fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1014,8 +1014,8 @@ dependencies = [ "flatbuffers", "generated_types", "influxdb_line_protocol", - "internal_types", "ouroboros", + "schema", "snafu", ] @@ -1795,13 +1795,8 @@ checksum = "90c11140ffea82edce8dcd74137ce9324ec24b3cf0175fc9d7e29164da9915b8" name = "internal_types" version = "0.1.0" dependencies = [ - "arrow", - "arrow_util", "chrono", "futures", - "hashbrown 0.11.2", - "indexmap", - "itertools", "parking_lot", "snafu", "tokio", @@ -2209,10 +2204,10 @@ dependencies = [ "data_types", "entry", "hashbrown 0.11.2", - "internal_types", "metric", "observability_deps", "parking_lot", + "schema", "snafu", "test_helpers", "tokio", @@ -2618,10 +2613,10 @@ dependencies = [ "arrow", "criterion", "influxdb_tsm", - "internal_types", "observability_deps", "parquet", "rand", + "schema", "snafu", "test_helpers", ] @@ -2702,7 +2697,6 @@ dependencies = [ "datafusion_util", "futures", "generated_types", - "internal_types", "iox_object_store", "metric", "object_store", @@ -2714,6 +2708,7 @@ dependencies = [ "persistence_windows", "predicate", "prost", + "schema", "snafu", "tempfile", "test_helpers", @@ -2964,10 +2959,10 @@ dependencies = [ "datafusion 0.1.0", "datafusion_util", "generated_types", - "internal_types", "observability_deps", "ordered-float 2.8.0", "regex", + "schema", "serde_json", "snafu", "sqlparser", @@ -3153,13 +3148,13 @@ dependencies = [ "datafusion_util", "futures", "hashbrown 0.11.2", - "internal_types", "itertools", "libc", "observability_deps", "parking_lot", "predicate", "regex", + "schema", "snafu", "test_helpers", "tokio", @@ -3177,12 +3172,12 @@ dependencies = [ "chrono", "data_types", "datafusion 0.1.0", - "internal_types", "metric", "object_store", "once_cell", "predicate", "query", + "schema", "server", "snafu", "tempfile", @@ -3356,7 +3351,6 
@@ dependencies = [ "datafusion 0.1.0", "either", "hashbrown 0.11.2", - "internal_types", "itertools", "metric", "observability_deps", @@ -3365,6 +3359,7 @@ dependencies = [ "permutation", "rand", "rand_distr", + "schema", "snafu", "test_helpers", ] @@ -3685,6 +3680,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "schema" +version = "0.1.0" +dependencies = [ + "arrow", + "arrow_util", + "hashbrown 0.11.2", + "indexmap", + "itertools", + "snafu", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -3845,6 +3852,7 @@ dependencies = [ "rand", "rand_distr", "read_buffer", + "schema", "serde", "serde_json", "snafu", diff --git a/Cargo.toml b/Cargo.toml index 0aba2fbf0a..3f052c9c49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ members = [ "trace_http", "tracker", "trogging", + "schema", "grpc-router", "grpc-router/grpc-router-test-gen", "write_buffer", diff --git a/entry/Cargo.toml b/entry/Cargo.toml index 3e1c559fb2..f83bf163ef 100644 --- a/entry/Cargo.toml +++ b/entry/Cargo.toml @@ -14,5 +14,5 @@ flatbuffers = "2" snafu = "0.6" influxdb_line_protocol = { path = "../influxdb_line_protocol" } ouroboros = "0.13.0" -internal_types = { path = "../internal_types" } +schema = { path = "../schema" } generated_types = { path = "../generated_types" } diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 45683a1082..3088a2552d 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -13,7 +13,7 @@ use data_types::sequence::Sequence; use data_types::write_summary::TimestampSummary; use generated_types::influxdata::pbdata::v1 as pb; use influxdb_line_protocol::{FieldValue, ParsedLine}; -use internal_types::schema::{ +use schema::{ builder::{Error as SchemaBuilderError, SchemaBuilder}, IOxValueType, InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME, }; diff --git a/internal_types/Cargo.toml b/internal_types/Cargo.toml index 5af229e986..b96434c2b4 100644 --- a/internal_types/Cargo.toml +++ b/internal_types/Cargo.toml @@ -7,16 +7,11 @@ description = "InfluxDB IOx internal types, shared between IOx instances" readme = "README.md" [dependencies] -arrow = { version = "5.5", features = ["prettyprint"] } chrono = "0.4" -hashbrown = "0.11" -indexmap = "1.6" -itertools = "0.10.1" parking_lot = "0.11" snafu = "0.6" tokio = { version = "1.11", features = ["sync"] } [dev-dependencies] -arrow_util = { path = "../arrow_util" } futures = "0.3" tokio = { version = "1.11", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] } diff --git a/internal_types/src/lib.rs b/internal_types/src/lib.rs index 39a6b91472..abd15cee93 100644 --- a/internal_types/src/lib.rs +++ b/internal_types/src/lib.rs @@ -9,5 +9,3 @@ pub mod access; pub mod freezable; pub mod once; -pub mod schema; -pub mod selection; diff --git a/mutable_buffer/Cargo.toml b/mutable_buffer/Cargo.toml index 7bc206dc66..9d3c492c9a 100644 --- a/mutable_buffer/Cargo.toml +++ b/mutable_buffer/Cargo.toml @@ -21,7 +21,7 @@ chrono = "0.4" data_types = { path = "../data_types" } entry = { path = "../entry" } hashbrown = "0.11" -internal_types = { path = "../internal_types" } +schema = { path = "../schema" } metric = { path = "../metric" } observability_deps = { path = "../observability_deps" } parking_lot = "0.11.2" diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index c8bcf4087e..8f9f8930d4 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -6,11 +6,9 @@ use arrow::record_batch::RecordBatch; use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary}; 
use entry::TableBatch; use hashbrown::HashMap; -use internal_types::{ - schema::{builder::SchemaBuilder, InfluxColumnType, Schema}, - selection::Selection, -}; use parking_lot::Mutex; +use schema::selection::Selection; +use schema::{builder::SchemaBuilder, InfluxColumnType, Schema}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{collections::BTreeSet, sync::Arc}; @@ -35,9 +33,7 @@ pub enum Error { ArrowError { source: arrow::error::ArrowError }, #[snafu(display("Internal error converting schema: {}", source))] - InternalSchema { - source: internal_types::schema::builder::Error, - }, + InternalSchema { source: schema::builder::Error }, #[snafu(display("Column not found: {}", column))] ColumnNotFound { column: String }, @@ -435,7 +431,7 @@ mod tests { ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary, }; use entry::test_helpers::lp_to_entry; - use internal_types::schema::{InfluxColumnType, InfluxFieldType}; + use schema::{InfluxColumnType, InfluxFieldType}; use std::{convert::TryFrom, num::NonZeroU64, vec}; #[test] diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs index 4ec4dfff6d..5a695cd9d0 100644 --- a/mutable_buffer/src/chunk/snapshot.rs +++ b/mutable_buffer/src/chunk/snapshot.rs @@ -5,10 +5,9 @@ use data_types::{ partition_metadata::{Statistics, TableSummary}, timestamp::TimestampRange, }; -use internal_types::{ - schema::{Schema, TIME_COLUMN_NAME}, - selection::Selection, -}; +use schema::selection::Selection; +use schema::{Schema, TIME_COLUMN_NAME}; + use snafu::{ResultExt, Snafu}; use std::{collections::BTreeSet, sync::Arc}; @@ -18,9 +17,7 @@ pub enum Error { TableNotFound { table_name: String }, #[snafu(display("Failed to select columns: {}", source))] - SelectColumns { - source: internal_types::schema::Error, - }, + SelectColumns { source: schema::Error }, } pub type Result = std::result::Result; diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs index 9e374c547a..e1aaf1861b 100644 --- a/mutable_buffer/src/column.rs +++ b/mutable_buffer/src/column.rs @@ -16,7 +16,7 @@ use arrow_util::bitset::{iter_set_positions, BitSet}; use arrow_util::string::PackedStringArray; use data_types::partition_metadata::{IsNan, StatValues, Statistics}; use entry::Column as EntryColumn; -use internal_types::schema::{IOxValueType, InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE}; +use schema::{IOxValueType, InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE}; use crate::dictionary::{Dictionary, DID, INVALID_DID}; diff --git a/packers/Cargo.toml b/packers/Cargo.toml index 3f75e0f965..4888da3938 100644 --- a/packers/Cargo.toml +++ b/packers/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] # In alphabetical order arrow = { version = "5.5", features = ["prettyprint"] } influxdb_tsm = { path = "../influxdb_tsm" } -internal_types = { path = "../internal_types" } +schema = { path = "../schema" } snafu = "0.6.2" observability_deps = { path = "../observability_deps" } parquet = "5.5" diff --git a/packers/src/lib.rs b/packers/src/lib.rs index 85fa58ae38..e38d7145a1 100644 --- a/packers/src/lib.rs +++ b/packers/src/lib.rs @@ -11,10 +11,10 @@ pub mod packers; pub mod sorter; pub mod stats; +use schema::Schema; use snafu::Snafu; pub use crate::packers::{Packer, Packers}; -use internal_types::schema::Schema; pub use parquet::data_type::ByteArray; use std::borrow::Cow; diff --git a/packers/src/packers.rs b/packers/src/packers.rs index 8890b1959a..9d4feb920a 100644 --- a/packers/src/packers.rs +++ b/packers/src/packers.rs 
@@ -9,8 +9,8 @@ use core::iter::Iterator; use std::iter; use std::slice::Chunks; -use internal_types::schema::{InfluxColumnType, InfluxFieldType}; use parquet::data_type::ByteArray; +use schema::{InfluxColumnType, InfluxFieldType}; use std::default::Default; // NOTE: See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml index 94733e4b0f..5ea3a72f9c 100644 --- a/parquet_file/Cargo.toml +++ b/parquet_file/Cargo.toml @@ -14,7 +14,6 @@ datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } futures = "0.3.7" generated_types = { path = "../generated_types" } -internal_types = { path = "../internal_types" } iox_object_store = { path = "../iox_object_store" } metric = { path = "../metric" } object_store = { path = "../object_store" } @@ -27,6 +26,7 @@ persistence_windows = { path = "../persistence_windows" } predicate = { path = "../predicate" } prost = "0.8" snafu = "0.6" +schema = { path = "../schema" } tempfile = "3.1.0" thrift = "0.13" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } diff --git a/parquet_file/src/catalog/dump.rs b/parquet_file/src/catalog/dump.rs index 72f360041d..98d9a911b9 100644 --- a/parquet_file/src/catalog/dump.rs +++ b/parquet_file/src/catalog/dump.rs @@ -37,7 +37,7 @@ pub struct DumpOptions { /// error otherwise. pub show_iox_metadata: bool, - /// Show debug output of [`Schema`](internal_types::schema::Schema) if decoding succeeds, show the decoding + /// Show debug output of [`Schema`](schema::Schema) if decoding succeeds, show the decoding /// error otherwise. pub show_schema: bool, diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 4cd27279a6..d4ea91d2b1 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -4,12 +4,10 @@ use data_types::{ timestamp::TimestampRange, }; use datafusion::physical_plan::SendableRecordBatchStream; -use internal_types::{ - schema::{Schema, TIME_COLUMN_NAME}, - selection::Selection, -}; use iox_object_store::{IoxObjectStore, ParquetFilePath}; use predicate::predicate::Predicate; +use schema::selection::Selection; +use schema::{Schema, TIME_COLUMN_NAME}; use snafu::{ResultExt, Snafu}; use std::{collections::BTreeSet, mem, sync::Arc}; @@ -22,9 +20,7 @@ pub enum Error { ReadParquet { source: crate::storage::Error }, #[snafu(display("Failed to select columns: {}", source))] - SelectColumns { - source: internal_types::schema::Error, - }, + SelectColumns { source: schema::Error }, #[snafu( display("Cannot decode parquet metadata from {:?}: {}", path, source), diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 0cdf8ff94d..628dc6e14b 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -92,7 +92,6 @@ use data_types::{ partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}, }; use generated_types::influxdata::iox::catalog::v1 as proto; -use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema}; use parquet::{ arrow::parquet_to_arrow_schema, file::{ @@ -111,6 +110,7 @@ use persistence_windows::{ min_max_sequence::OptionalMinMaxSequence, }; use prost::Message; +use schema::{InfluxColumnType, InfluxFieldType, Schema}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{collections::BTreeMap, convert::TryInto, sync::Arc}; use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol}; @@ -207,9 +207,7 @@ pub enum Error { }, 
#[snafu(display("Cannot read IOx schema from arrow: {}", source))] - IoxFromArrowFailure { - source: internal_types::schema::Error, - }, + IoxFromArrowFailure { source: schema::Error }, #[snafu(display("Parquet metadata does not contain IOx metadata"))] IoxMetadataMissing {}, @@ -862,7 +860,7 @@ fn extract_iox_statistics( mod tests { use super::*; - use internal_types::schema::TIME_COLUMN_NAME; + use schema::TIME_COLUMN_NAME; use crate::test_utils::{ chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store, make_chunk, diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index dc70404a6d..3645c2f28a 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -13,7 +13,6 @@ use datafusion::{ }; use datafusion_util::AdapterStream; use futures::StreamExt; -use internal_types::selection::Selection; use iox_object_store::{IoxObjectStore, ParquetFilePath}; use observability_deps::tracing::debug; use parking_lot::Mutex; @@ -24,6 +23,7 @@ use parquet::{ file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone}, }; use predicate::predicate::Predicate; +use schema::selection::Selection; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{ io::{Cursor, Seek, SeekFrom, Write}, diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs index 67e5fd9c93..91da2ae474 100644 --- a/parquet_file/src/test_utils.rs +++ b/parquet_file/src/test_utils.rs @@ -21,10 +21,6 @@ use data_types::{ use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_util::MemoryStream; use futures::TryStreamExt; -use internal_types::{ - schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME}, - selection::Selection, -}; use iox_object_store::{IoxObjectStore, ParquetFilePath}; use object_store::ObjectStore; use parquet::{ @@ -35,6 +31,8 @@ use persistence_windows::{ checkpoint::{DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder}, min_max_sequence::OptionalMinMaxSequence, }; +use schema::selection::Selection; +use schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME}; use snafu::{ResultExt, Snafu}; use std::{collections::BTreeMap, num::NonZeroU32, sync::Arc}; diff --git a/persistence_windows/src/checkpoint.rs b/persistence_windows/src/checkpoint.rs index ac7a571054..e91887e3cf 100644 --- a/persistence_windows/src/checkpoint.rs +++ b/persistence_windows/src/checkpoint.rs @@ -319,9 +319,8 @@ pub type Result = std::result::Result; /// `max_persisted` timestamp ("flush timestamp"). /// /// The `min_persisted` timestamp is relative to the value in -/// [`TIME_COLUMN_NAME`](internal_types::schema::TIME_COLUMN_NAME). The -/// min/max sequence numbers are relative to their respective -/// sequencers. +/// the time column. The min/max sequence numbers are relative +/// to their respective sequencers. 
/// /// Since the sequence number is per-Entry, that it can be evaluated /// quickly during replay, while the timestamp must be checked for each diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index e3f8514293..c0f97a28a1 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -10,7 +10,7 @@ data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } generated_types = { path = "../generated_types" } -internal_types = { path = "../internal_types" } +schema = { path = "../schema" } observability_deps = { path = "../observability_deps" } ordered-float = "2" regex = "1" diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs index 1e54e76ba8..d7f3b624b3 100644 --- a/predicate/src/predicate.rs +++ b/predicate/src/predicate.rs @@ -15,8 +15,8 @@ use datafusion::{ optimizer::utils, }; use datafusion_util::{make_range_expr, AndExprBuilder}; -use internal_types::schema::TIME_COLUMN_NAME; use observability_deps::tracing::debug; +use schema::TIME_COLUMN_NAME; /// This `Predicate` represents the empty predicate (aka that /// evaluates to true for all rows). diff --git a/query/Cargo.toml b/query/Cargo.toml index 5b7c4f788a..e5756ac8fb 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -24,10 +24,10 @@ datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } futures = "0.3" hashbrown = "0.11" -internal_types = { path = "../internal_types" } observability_deps = { path = "../observability_deps" } parking_lot = "0.11.2" regex = "1" +schema = { path = "../schema" } snafu = "0.6.9" tokio = { version = "1.11", features = ["macros"] } tokio-stream = "0.1.2" diff --git a/query/src/exec/field.rs b/query/src/exec/field.rs index 7d795bd661..678b67e0e2 100644 --- a/query/src/exec/field.rs +++ b/query/src/exec/field.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use arrow::{self, datatypes::SchemaRef}; -use internal_types::schema::TIME_COLUMN_NAME; +use schema::TIME_COLUMN_NAME; use snafu::{ResultExt, Snafu}; #[derive(Debug, Snafu)] diff --git a/query/src/exec/fieldlist.rs b/query/src/exec/fieldlist.rs index 46142e9095..2b84a370aa 100644 --- a/query/src/exec/fieldlist.rs +++ b/query/src/exec/fieldlist.rs @@ -9,7 +9,7 @@ use arrow::{ datatypes::{DataType, SchemaRef}, record_batch::RecordBatch, }; -use internal_types::schema::TIME_COLUMN_NAME; +use schema::TIME_COLUMN_NAME; use snafu::{ensure, ResultExt, Snafu}; @@ -191,7 +191,7 @@ mod tests { array::{Int64Array, StringArray}, datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema}, }; - use internal_types::schema::TIME_DATA_TYPE; + use schema::TIME_DATA_TYPE; #[test] fn test_convert_single_batch() { diff --git a/query/src/frontend.rs b/query/src/frontend.rs index 5690215128..a0b8a8c2bf 100644 --- a/query/src/frontend.rs +++ b/query/src/frontend.rs @@ -12,7 +12,7 @@ mod test { ExecutionPlan, ExecutionPlanVisitor, }; use futures::StreamExt; - use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema}; + use schema::{merge::SchemaMerger, sort::SortKey, Schema}; use crate::{ exec::{split::StreamSplitExec, Executor, ExecutorType}, diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index f1fb57d6a7..8f1096afd2 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -15,12 +15,10 @@ use datafusion::{ use datafusion_util::AsExpr; use hashbrown::{HashMap, HashSet}; -use internal_types::{ - schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME}, - selection::Selection, 
-}; use observability_deps::tracing::{debug, trace}; use predicate::predicate::{Predicate, PredicateMatch}; +use schema::selection::Selection; +use schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use crate::{ @@ -1619,7 +1617,7 @@ pub fn schema_has_all_expr_columns(schema: &Schema, expr: &Expr) -> bool { #[cfg(test)] mod tests { - use internal_types::schema::builder::SchemaBuilder; + use schema::builder::SchemaBuilder; use super::*; diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index 1e783fc6c6..e4aeec6616 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -6,8 +6,8 @@ use datafusion::{ logical_plan::{col, Expr, LogicalPlan, LogicalPlanBuilder}, scalar::ScalarValue, }; -use internal_types::schema::{sort::SortKey, Schema, TIME_COLUMN_NAME}; use observability_deps::tracing::{debug, trace}; +use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME}; use crate::{ exec::make_stream_split, @@ -19,9 +19,7 @@ use snafu::{ResultExt, Snafu}; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Chunk schema not compatible for compact plan: {}", source))] - ChunkSchemaNotCompatible { - source: internal_types::schema::merge::Error, - }, + ChunkSchemaNotCompatible { source: schema::merge::Error }, #[snafu(display("Reorg planner got error building plan: {}", source))] BuildingPlan { @@ -254,7 +252,7 @@ struct ScanPlan { mod test { use arrow::compute::SortOptions; use arrow_util::assert_batches_eq; - use internal_types::schema::merge::SchemaMerger; + use schema::merge::SchemaMerger; use crate::{ exec::{Executor, ExecutorType}, diff --git a/query/src/func/selectors.rs b/query/src/func/selectors.rs index 5a194bbfed..fbe8c49742 100644 --- a/query/src/func/selectors.rs +++ b/query/src/func/selectors.rs @@ -36,7 +36,7 @@ use internal::{ I64LastSelector, I64MaxSelector, I64MinSelector, Utf8FirstSelector, Utf8LastSelector, Utf8MaxSelector, Utf8MinSelector, }; -use internal_types::schema::TIME_DATA_TYPE; +use schema::TIME_DATA_TYPE; /// Returns a DataFusion user defined aggregate function for computing /// one field of the first() selector function. 
@@ -313,7 +313,7 @@ mod test { util::pretty::pretty_format_batches, }; use datafusion::{datasource::MemTable, logical_plan::Expr, prelude::*}; - use internal_types::schema::TIME_DATA_TIMEZONE; + use schema::TIME_DATA_TIMEZONE; use super::*; diff --git a/query/src/func/window.rs b/query/src/func/window.rs index 34a01db158..2a4be43bbc 100644 --- a/query/src/func/window.rs +++ b/query/src/func/window.rs @@ -1,7 +1,7 @@ mod internal; pub use internal::{Duration, Window}; -use internal_types::schema::TIME_DATA_TYPE; +use schema::TIME_DATA_TYPE; use std::sync::Arc; @@ -94,7 +94,7 @@ pub fn make_window_bound_expr( #[cfg(test)] mod tests { use arrow::array::TimestampNanosecondArray; - use internal_types::schema::TIME_DATA_TIMEZONE; + use schema::TIME_DATA_TIMEZONE; use super::*; diff --git a/query/src/lib.rs b/query/src/lib.rs index f842af55c7..e803ab54f6 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -14,15 +14,13 @@ use data_types::{ }; use datafusion::physical_plan::SendableRecordBatchStream; use exec::stringset::StringSet; -use internal_types::{ - schema::{sort::SortKey, Schema, TIME_COLUMN_NAME}, - selection::Selection, -}; use observability_deps::tracing::{debug, trace}; use predicate::{ delete_predicate::DeletePredicate, predicate::{Predicate, PredicateMatch}, }; +use schema::selection::Selection; +use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME}; use hashbrown::HashMap; use std::{fmt::Debug, sync::Arc}; diff --git a/query/src/provider.rs b/query/src/provider.rs index b7b066f0f9..6da1e27797 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -18,9 +18,9 @@ use datafusion::{ ExecutionPlan, }, }; -use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema}; use observability_deps::tracing::{debug, trace}; use predicate::predicate::{Predicate, PredicateBuilder}; +use schema::{merge::SchemaMerger, sort::SortKey, Schema}; use crate::{ compute_sort_key, @@ -1007,7 +1007,7 @@ mod test { use arrow::datatypes::DataType; use arrow_util::assert_batches_eq; use datafusion::physical_plan::collect; - use internal_types::schema::{builder::SchemaBuilder, TIME_COLUMN_NAME}; + use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME}; use crate::{ test::{raw_data, TestChunk}, diff --git a/query/src/provider/deduplicate/algo.rs b/query/src/provider/deduplicate/algo.rs index 744a007969..9908574a27 100644 --- a/query/src/provider/deduplicate/algo.rs +++ b/query/src/provider/deduplicate/algo.rs @@ -381,7 +381,7 @@ impl RecordBatchDeduplicator { } /// Get column name out of the `expr`. TODO use -/// internal_types::schema::SortKey instead. +/// schema::SortKey instead. fn get_col_name(expr: &dyn PhysicalExpr) -> &str { expr.as_any() .downcast_ref::() diff --git a/query/src/provider/overlap.rs b/query/src/provider/overlap.rs index 0580414d4f..8996ad3512 100644 --- a/query/src/provider/overlap.rs +++ b/query/src/provider/overlap.rs @@ -5,7 +5,7 @@ //! 
writes (and thus are stored in separate rows) use data_types::partition_metadata::{ColumnSummary, StatOverlap, Statistics}; -use internal_types::schema::TIME_COLUMN_NAME; +use schema::TIME_COLUMN_NAME; use snafu::Snafu; use crate::QueryChunkMeta; diff --git a/query/src/provider/physical.rs b/query/src/provider/physical.rs index 4c07f31e2a..29bc26b0ca 100644 --- a/query/src/provider/physical.rs +++ b/query/src/provider/physical.rs @@ -11,7 +11,8 @@ use datafusion::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, }; -use internal_types::{schema::Schema, selection::Selection}; +use schema::selection::Selection; +use schema::Schema; use crate::QueryChunk; use predicate::predicate::Predicate; diff --git a/query/src/statistics.rs b/query/src/statistics.rs index f116a63beb..e16e540c1d 100644 --- a/query/src/statistics.rs +++ b/query/src/statistics.rs @@ -5,7 +5,7 @@ use datafusion::{ physical_plan::{ColumnStatistics, Statistics as DFStatistics}, scalar::ScalarValue, }; -use internal_types::schema::Schema; +use schema::Schema; /// Converts stats.min and an appropriate `ScalarValue` pub(crate) fn min_to_scalar(stats: &IOxStatistics) -> Option { @@ -88,7 +88,7 @@ mod test { use super::*; use data_types::partition_metadata::{InfluxDbType, StatValues}; - use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType}; + use schema::{builder::SchemaBuilder, InfluxFieldType}; #[test] fn convert() { diff --git a/query/src/test.rs b/query/src/test.rs index 8bce7dcb63..20c401cd17 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -20,14 +20,13 @@ use data_types::{ }; use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}; use futures::StreamExt; -use internal_types::schema::sort::SortKey; -use internal_types::{ - schema::{builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema}, - selection::Selection, -}; use observability_deps::tracing::debug; use parking_lot::Mutex; use predicate::delete_predicate::DeletePredicate; +use schema::selection::Selection; +use schema::{ + builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema, +}; use snafu::Snafu; use std::num::NonZeroU64; use std::{collections::BTreeMap, fmt, sync::Arc}; diff --git a/query/src/util.rs b/query/src/util.rs index 3c163d32ee..088abdce46 100644 --- a/query/src/util.rs +++ b/query/src/util.rs @@ -13,7 +13,7 @@ use datafusion::{ ExecutionPlan, PhysicalExpr, }, }; -use internal_types::schema::sort::SortKey; +use schema::sort::SortKey; /// Create a logical plan that produces the record batch pub fn make_scan_plan(batch: RecordBatch) -> std::result::Result { diff --git a/query_tests/Cargo.toml b/query_tests/Cargo.toml index ede2d4f83d..13589ec77b 100644 --- a/query_tests/Cargo.toml +++ b/query_tests/Cargo.toml @@ -21,9 +21,9 @@ server = { path = "../server" } arrow = { version = "5.5", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } data_types = { path = "../data_types" } -internal_types = { path = "../internal_types" } metric = { path = "../metric" } object_store = { path = "../object_store" } +schema = { path = "../schema" } snafu = "0.6.3" tempfile = "3.1.0" test_helpers = { path = "../test_helpers" } diff --git a/query_tests/src/table_schema.rs b/query_tests/src/table_schema.rs index a1be0d95ff..2d5175a188 100644 --- a/query_tests/src/table_schema.rs +++ b/query_tests/src/table_schema.rs @@ -1,12 +1,10 @@ //! 
Tests for the table_names implementation use arrow::datatypes::DataType; -use internal_types::{ - schema::{builder::SchemaBuilder, sort::SortKey, Schema, TIME_COLUMN_NAME}, - selection::Selection, -}; use predicate::predicate::PredicateBuilder; use query::{QueryChunk, QueryChunkMeta, QueryDatabase}; +use schema::selection::Selection; +use schema::{builder::SchemaBuilder, sort::SortKey, Schema, TIME_COLUMN_NAME}; use super::scenarios::*; diff --git a/read_buffer/Cargo.toml b/read_buffer/Cargo.toml index f246198686..955d06cdb4 100644 --- a/read_buffer/Cargo.toml +++ b/read_buffer/Cargo.toml @@ -19,13 +19,13 @@ data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } either = "1.6.1" hashbrown = "0.11" -internal_types = { path = "../internal_types" } itertools = "0.10.1" metric = { path = "../metric" } observability_deps = { path = "../observability_deps" } parking_lot = "0.11" permutation = "0.2.5" snafu = "0.6" +schema = { path = "../schema" } [dev-dependencies] # In alphabetical order criterion = "0.3.3" diff --git a/read_buffer/benches/database.rs b/read_buffer/benches/database.rs index 713e4a62c9..0e11225925 100644 --- a/read_buffer/benches/database.rs +++ b/read_buffer/benches/database.rs @@ -3,8 +3,8 @@ use arrow::{ record_batch::RecordBatch, }; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; -use internal_types::schema::builder::SchemaBuilder; use read_buffer::{BinaryExpr, ChunkMetrics, Predicate, RBChunk}; +use schema::builder::SchemaBuilder; use std::sync::Arc; const BASE_TIME: i64 = 1351700038292387000_i64; diff --git a/read_buffer/benches/read_filter.rs b/read_buffer/benches/read_filter.rs index 0bd3f9f20d..656ee72679 100644 --- a/read_buffer/benches/read_filter.rs +++ b/read_buffer/benches/read_filter.rs @@ -4,13 +4,13 @@ use rand::prelude::*; use rand::Rng; use rand_distr::{Distribution, Normal}; -use internal_types::selection::Selection; use packers::{sorter, Packers}; use read_buffer::{ benchmarks::{Column, ColumnType, RowGroup}, RBChunk, }; use read_buffer::{BinaryExpr, Predicate}; +use schema::selection::Selection; const ONE_MS: i64 = 1_000_000; diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index e0ed0286d4..94ac209ba4 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -6,9 +6,10 @@ use crate::{ }; use arrow::record_batch::RecordBatch; use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary}; -use internal_types::{schema::builder::Error as SchemaError, schema::Schema, selection::Selection}; use metric::{Attributes, CumulativeGauge, CumulativeRecorder, RecorderCollection}; use observability_deps::tracing::debug; +use schema::selection::Selection; +use schema::{builder::Error as SchemaError, Schema}; use snafu::{ResultExt, Snafu}; use std::{ collections::{BTreeMap, BTreeSet}, @@ -492,8 +493,8 @@ mod test { }, }; use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}; - use internal_types::schema::builder::SchemaBuilder; use metric::{MetricKind, Observation, ObservationSet, RawReporter}; + use schema::builder::SchemaBuilder; use std::{num::NonZeroU64, sync::Arc}; // helper to make the `add_remove_tables` test simpler to read. diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index 1a2f87a3eb..421802a8f1 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -9,9 +9,9 @@ mod table; mod value; // Identifiers that are exported as part of the public API. 
+pub use self::schema::*; pub use chunk::{Chunk as RBChunk, ChunkMetrics, Error}; pub use row_group::{BinaryExpr, Predicate}; -pub use schema::*; pub use table::ReadFilterResults; /// THIS MODULE SHOULD ONLY BE IMPORTED FOR BENCHMARKS. diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index d9c16dd153..a785e79b9e 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -17,6 +17,7 @@ use crate::schema::{AggregateType, LogicalDataType, ResultSchema}; use crate::value::{ AggregateVec, EncodedValues, OwnedValue, Scalar, Value, Values, ValuesIterator, }; +use ::schema::{selection::Selection, InfluxColumnType, Schema}; use arrow::{ array, array::ArrayRef, @@ -27,12 +28,10 @@ use datafusion::{ logical_plan::Expr as DfExpr, logical_plan::Operator as DFOperator, scalar::ScalarValue as DFScalarValue, }; -use internal_types::schema::{InfluxColumnType, Schema}; -use internal_types::selection::Selection; use std::num::NonZeroU64; /// The name used for a timestamp column. -pub const TIME_COLUMN_NAME: &str = internal_types::schema::TIME_COLUMN_NAME; +pub const TIME_COLUMN_NAME: &str = ::schema::TIME_COLUMN_NAME; #[derive(Debug, Snafu)] pub enum Error { @@ -40,9 +39,7 @@ pub enum Error { ArrowConversion { source: arrow::error::ArrowError }, #[snafu(display("schema conversion error: {}", source))] - SchemaConversion { - source: internal_types::schema::builder::Error, - }, + SchemaConversion { source: ::schema::builder::Error }, #[snafu(display("unsupported operation: {}", msg))] UnsupportedOperation { msg: String }, @@ -1787,7 +1784,7 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch { type Error = Error; fn try_from(result: ReadFilterResult<'_>) -> Result<Self, Self::Error> { - let schema = internal_types::schema::Schema::try_from(result.schema()) + let schema = ::schema::Schema::try_from(result.schema()) .map_err(|source| Error::SchemaConversion { source })?; let columns: Vec<ArrayRef> = result @@ -2263,7 +2260,7 @@ impl TryFrom<ReadAggregateResult<'_>> for RecordBatch { type Error = Error; fn try_from(mut result: ReadAggregateResult<'_>) -> Result<Self, Self::Error> { - let schema = internal_types::schema::Schema::try_from(result.schema()) + let schema = ::schema::Schema::try_from(result.schema()) .map_err(|source| Error::SchemaConversion { source })?; let arrow_schema: arrow::datatypes::SchemaRef = schema.into(); diff --git a/read_buffer/src/schema.rs b/read_buffer/src/schema.rs index e8d9791af9..1ea39cb3eb 100644 --- a/read_buffer/src/schema.rs +++ b/read_buffer/src/schema.rs @@ -1,6 +1,6 @@ use std::{convert::TryFrom, fmt::Display}; -use internal_types::schema::InfluxFieldType; +use schema::InfluxFieldType; /// A schema that is used to track the names and semantics of columns returned /// in results out of various operations on a row group.
@@ -96,11 +96,11 @@ impl Display for ResultSchema { } } -impl TryFrom<&ResultSchema> for internal_types::schema::Schema { - type Error = internal_types::schema::builder::Error; +impl TryFrom<&ResultSchema> for ::schema::Schema { + type Error = ::schema::builder::Error; fn try_from(rs: &ResultSchema) -> Result<Self, Self::Error> { - let mut builder = internal_types::schema::builder::SchemaBuilder::new(); + let mut builder = ::schema::builder::SchemaBuilder::new(); for (col_type, data_type) in &rs.select_columns { match col_type { ColumnType::Tag(name) => builder.tag(name.as_str()), diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index b0d98d8d50..6db494b7aa 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -7,8 +7,8 @@ use crate::{ }; use arrow::record_batch::RecordBatch; use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary}; -use internal_types::selection::Selection; use parking_lot::RwLock; +use schema::selection::Selection; use snafu::{ensure, Snafu}; use std::{ collections::{BTreeMap, BTreeSet}, diff --git a/schema/Cargo.toml b/schema/Cargo.toml new file mode 100644 index 0000000000..2cbe6a4d33 --- /dev/null +++ b/schema/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "schema" +version = "0.1.0" +authors = ["Andrew Lamb <andrew@nerdnetworks.org>"] +edition = "2018" +description = "IOx Schema definition" + +[dependencies] +arrow = { version = "5.5", features = ["prettyprint"] } +hashbrown = "0.11" +indexmap = "1.6" +itertools = "0.10.1" +snafu = "0.6" + +[dev-dependencies] +arrow_util = { path = "../arrow_util" } diff --git a/internal_types/src/schema/builder.rs b/schema/src/builder.rs similarity index 98% rename from internal_types/src/schema/builder.rs rename to schema/src/builder.rs index 8450dd2335..8afb4df3a9 100644 --- a/internal_types/src/schema/builder.rs +++ b/schema/src/builder.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField}; use snafu::{ResultExt, Snafu}; -use crate::schema::sort::SortKey; +use crate::sort::SortKey; use super::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME}; @@ -124,7 +124,7 @@ impl SchemaBuilder { /// Creates an Arrow schema with embedded metadata. /// All schema validation happens at this time. /// ``` - /// use internal_types::schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType}; + /// use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType}; /// /// let schema = SchemaBuilder::new() /// .tag("region") diff --git a/internal_types/src/schema.rs b/schema/src/lib.rs similarity index 99% rename from internal_types/src/schema.rs rename to schema/src/lib.rs index 2f82a0da7b..9de694c035 100644 --- a/internal_types/src/schema.rs +++ b/schema/src/lib.rs @@ -1,4 +1,4 @@ -//! This module contains the schema definiton for IOx +//!
This module contains the schema definition for IOx use std::{ cmp::Ordering, collections::HashMap, @@ -11,13 +11,12 @@ use arrow::datatypes::{ DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, }; +use hashbrown::HashSet; + +use selection::Selection; use snafu::{OptionExt, Snafu}; -use crate::{ - schema::sort::{ColumnSort, SortKey}, - selection::Selection, -}; -use hashbrown::HashSet; +use crate::sort::{ColumnSort, SortKey}; /// The name of the timestamp column in the InfluxDB datamodel pub const TIME_COLUMN_NAME: &str = "time"; @@ -41,6 +40,7 @@ pub fn TIME_DATA_TYPE() -> ArrowDataType { pub mod builder; pub mod merge; +pub mod selection; pub mod sort; /// Database schema creation / validation errors. @@ -787,11 +787,13 @@ macro_rules! assert_column_eq { #[cfg(test)] mod test { use arrow::compute::SortOptions; + use InfluxColumnType::*; use InfluxFieldType::*; + use crate::merge::SchemaMerger; + use super::{builder::SchemaBuilder, *}; - use crate::schema::merge::SchemaMerger; fn make_field( name: &str, diff --git a/internal_types/src/schema/merge.rs b/schema/src/merge.rs similarity index 99% rename from internal_types/src/schema/merge.rs rename to schema/src/merge.rs index eb45573fa1..6a649b091b 100644 --- a/internal_types/src/schema/merge.rs +++ b/schema/src/merge.rs @@ -3,7 +3,7 @@ use hashbrown::hash_map::RawEntryMut; use hashbrown::HashMap; use snafu::Snafu; -use crate::schema::sort::SortKey; +use crate::sort::SortKey; use super::{InfluxColumnType, Schema}; @@ -188,8 +188,8 @@ impl SchemaMerger { #[cfg(test)] mod tests { - use crate::schema::builder::SchemaBuilder; - use crate::schema::InfluxFieldType::Integer; + use crate::builder::SchemaBuilder; + use crate::InfluxFieldType::Integer; use super::*; diff --git a/internal_types/src/selection.rs b/schema/src/selection.rs similarity index 100% rename from internal_types/src/selection.rs rename to schema/src/selection.rs diff --git a/internal_types/src/schema/sort.rs b/schema/src/sort.rs similarity index 100% rename from internal_types/src/schema/sort.rs rename to schema/src/sort.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index 777b0cf71e..fe011fa570 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -43,6 +43,7 @@ query = { path = "../query" } rand = "0.8.3" rand_distr = "0.4.2" read_buffer = { path = "../read_buffer" } +schema = { path = "../schema" } serde = "1.0" serde_json = "1.0" snafu = "0.6" diff --git a/server/src/db.rs b/server/src/db.rs index 95ccf07e71..37dcee79bf 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -29,7 +29,6 @@ use data_types::{ }; use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider}; use entry::{Entry, SequencedEntry, TableBatch}; -use internal_types::schema::Schema; use iox_object_store::IoxObjectStore; use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; use observability_deps::tracing::{debug, error, info, warn}; @@ -45,6 +44,7 @@ use query::{ exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext}, QueryDatabase, }; +use schema::Schema; use trace::ctx::SpanContext; use write_buffer::core::{WriteBufferReading, WriteBufferWriting}; @@ -138,14 +138,10 @@ pub enum Error { TableBatchMissingTimes {}, #[snafu(display("Table batch has invalid schema: {}", source))] - TableBatchSchemaExtractError { - source: internal_types::schema::builder::Error, - }, + TableBatchSchemaExtractError { source: schema::builder::Error }, #[snafu(display("Table batch has 
mismatching schema: {}", source))] - TableBatchSchemaMergeError { - source: internal_types::schema::merge::Error, - }, + TableBatchSchemaMergeError { source: schema::merge::Error }, #[snafu(display( "Unable to flush partition at the moment {}:{}", @@ -1469,7 +1465,6 @@ mod tests { write_summary::TimestampSummary, }; use entry::test_helpers::lp_to_entry; - use internal_types::{schema::Schema, selection::Selection}; use iox_object_store::ParquetFilePath; use metric::{Attributes, CumulativeGauge, Metric, Observation}; use object_store::ObjectStore; @@ -1480,6 +1475,8 @@ mod tests { }; use persistence_windows::min_max_sequence::MinMaxSequence; use query::{QueryChunk, QueryDatabase}; + use schema::selection::Selection; + use schema::Schema; use write_buffer::mock::{ MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferSharedState, }; diff --git a/server/src/db/access.rs b/server/src/db/access.rs index 9e0da7427c..32180b157e 100644 --- a/server/src/db/access.rs +++ b/server/src/db/access.rs @@ -16,7 +16,6 @@ use datafusion::{ catalog::{catalog::CatalogProvider, schema::SchemaProvider}, datasource::TableProvider, }; -use internal_types::schema::Schema; use metric::{Attributes, Metric, U64Counter}; use observability_deps::tracing::debug; use predicate::predicate::{Predicate, PredicateBuilder}; @@ -24,6 +23,7 @@ use query::{ provider::{ChunkPruner, ProviderBuilder}, QueryChunk, QueryChunkMeta, DEFAULT_SCHEMA, }; +use schema::Schema; use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA}; use hashbrown::HashMap; diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index 5344689426..322c04d783 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -9,7 +9,7 @@ use hashbrown::{HashMap, HashSet}; use data_types::chunk_metadata::ChunkSummary; use data_types::chunk_metadata::DetailedChunkSummary; use data_types::partition_metadata::{PartitionAddr, PartitionSummary, TableSummary}; -use internal_types::schema::Schema; +use schema::Schema; use snafu::{OptionExt, Snafu}; use tracker::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 868e3c73ce..9bb04c29a5 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -11,12 +11,13 @@ use data_types::{ partition_metadata::TableSummary, write_summary::TimestampSummary, }; -use internal_types::{access::AccessRecorder, schema::Schema}; +use internal_types::access::AccessRecorder; use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk}; use observability_deps::tracing::debug; use parquet_file::chunk::ParquetChunk; use predicate::delete_predicate::DeletePredicate; use read_buffer::RBChunk; +use schema::Schema; use tracker::{TaskRegistration, TaskTracker}; use crate::db::catalog::metrics::{StorageRecorder, TimestampHistogram}; diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index a3cca98a44..2e323e2967 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -8,12 +8,12 @@ use data_types::{ partition_metadata::{PartitionAddr, PartitionSummary}, }; use hashbrown::HashMap; -use internal_types::schema::Schema; use observability_deps::tracing::info; use persistence_windows::{ min_max_sequence::OptionalMinMaxSequence, persistence_windows::PersistenceWindows, }; use predicate::delete_predicate::DeletePredicate; +use schema::Schema; use snafu::{OptionExt, Snafu}; use std::{collections::BTreeMap, fmt::Display, sync::Arc}; use 
tracker::RwLock; diff --git a/server/src/db/catalog/table.rs b/server/src/db/catalog/table.rs index f9967129b9..74b71f5756 100644 --- a/server/src/db/catalog/table.rs +++ b/server/src/db/catalog/table.rs @@ -2,7 +2,7 @@ use super::partition::Partition; use crate::db::catalog::metrics::TableMetrics; use data_types::partition_metadata::{PartitionAddr, PartitionSummary}; use hashbrown::HashMap; -use internal_types::schema::{ +use schema::{ builder::SchemaBuilder, merge::{Error as SchemaMergerError, SchemaMerger}, Schema, @@ -196,7 +196,7 @@ impl<'a> TableSchemaUpsertHandle<'a> { #[cfg(test)] mod tests { - use internal_types::schema::{InfluxColumnType, InfluxFieldType}; + use schema::{InfluxColumnType, InfluxFieldType}; use super::*; diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 6daeca7b59..cf41112f35 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -8,11 +8,7 @@ use data_types::{ }; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_util::MemoryStream; -use internal_types::{ - access::AccessRecorder, - schema::{sort::SortKey, Schema}, - selection::Selection, -}; +use internal_types::access::AccessRecorder; use iox_object_store::ParquetFilePath; use mutable_buffer::chunk::snapshot::ChunkSnapshot; use observability_deps::tracing::debug; @@ -24,6 +20,7 @@ use predicate::{ }; use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta}; use read_buffer::RBChunk; +use schema::{selection::Selection, sort::SortKey, Schema}; use snafu::{OptionExt, ResultExt, Snafu}; use std::{ collections::{BTreeMap, BTreeSet}, @@ -54,9 +51,7 @@ pub enum Error { }, #[snafu(display("Internal error restricting schema: {}", source))] - InternalSelectingSchema { - source: internal_types::schema::Error, - }, + InternalSelectingSchema { source: schema::Error }, #[snafu(display("Predicate conversion error: {}", source))] PredicateConversion { source: super::pred::Error }, diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 5c4177bc21..fbc5b90fac 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -14,10 +14,7 @@ use data_types::{ DatabaseName, }; use datafusion::physical_plan::SendableRecordBatchStream; -use internal_types::{ - access::AccessMetrics, - schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME}, -}; +use internal_types::access::AccessMetrics; use lifecycle::{ LifecycleChunk, LifecyclePartition, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk, LockablePartition, @@ -25,6 +22,7 @@ use lifecycle::{ use observability_deps::tracing::{info, trace}; use persistence_windows::persistence_windows::FlushHandle; use query::QueryChunkMeta; +use schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME}; use std::{ convert::TryInto, fmt::Display, diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs index 0b346c0e66..9b1213ebc6 100644 --- a/server/src/db/lifecycle/write.rs +++ b/server/src/db/lifecycle/write.rs @@ -13,7 +13,6 @@ use crate::db::{ use ::lifecycle::LifecycleWriteGuard; use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job}; -use internal_types::selection::Selection; use observability_deps::tracing::{debug, warn}; use parquet_file::{ catalog::interface::CatalogParquetInfo, @@ -27,6 +26,7 @@ use persistence_windows::{ }; use predicate::predicate::Predicate; use query::{QueryChunk, QueryChunkMeta}; +use schema::selection::Selection; use snafu::ResultExt; use std::{future::Future, sync::Arc}; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; From 
c4a26417641732a50ac49fac68b33b855a6cf387 Mon Sep 17 00:00:00 2001 From: Marco Neumann <marco@crepererum.net> Date: Mon, 11 Oct 2021 15:49:34 +0200 Subject: [PATCH 10/11] refactor: remove `time_closed` The "time closed" is a leftover from an old lifecycle system in which chunks moved through the stages (open=>closed=>persisted) without ever being merged. Now we have compaction as well as the split query for persistence, both of which can merge chunks, so a single "time closed" no longer makes sense. In practice it is `None` for many chunks, it is not persisted, and the current lifecycle policy does not use it. So let's just remove it. Closes #1846. --- data_types/src/chunk_metadata.rs | 4 --- .../influxdata/iox/management/v1/chunk.proto | 5 ++-- generated_types/src/chunk.rs | 8 ------ server/src/db.rs | 10 ------- server/src/db/catalog/chunk.rs | 17 ------------ server/src/db/system_tables/chunks.rs | 26 ++++++------------- server/src/db/system_tables/columns.rs | 3 --- tests/end_to_end_cases/management_api.rs | 8 +----- 8 files changed, 11 insertions(+), 70 deletions(-) diff --git a/data_types/src/chunk_metadata.rs b/data_types/src/chunk_metadata.rs index 7617c85f6b..2c56f603c7 100644 --- a/data_types/src/chunk_metadata.rs +++ b/data_types/src/chunk_metadata.rs @@ -160,10 +160,6 @@ pub struct ChunkSummary { /// into IOx. Note due to the compaction, etc... this may not be the chunk /// that data was originally written into pub time_of_last_write: DateTime<Utc>, - - /// Time at which this chunk was marked as closed. Note this is - /// not the same as the timestamps on the data itself - pub time_closed: Option<DateTime<Utc>>, } /// Represents metadata about the physical storage of a column in a chunk diff --git a/generated_types/protos/influxdata/iox/management/v1/chunk.proto b/generated_types/protos/influxdata/iox/management/v1/chunk.proto index 1cb9b69947..800ad3bce8 100644 --- a/generated_types/protos/influxdata/iox/management/v1/chunk.proto +++ b/generated_types/protos/influxdata/iox/management/v1/chunk.proto @@ -90,9 +90,8 @@ message Chunk { // that data was originally written into google.protobuf.Timestamp time_of_last_write = 6; - // Time at which this chunk was marked as closed. Note this is not - // the same as the timestamps on the data itself - google.protobuf.Timestamp time_closed = 7; + // Was `time_closed`. + reserved 7; // Order of this chunk relative to other overlapping chunks.
uint32 order = 13; diff --git a/generated_types/src/chunk.rs b/generated_types/src/chunk.rs index 1f1e3509c5..388aa9ceaa 100644 --- a/generated_types/src/chunk.rs +++ b/generated_types/src/chunk.rs @@ -25,7 +25,6 @@ impl From<ChunkSummary> for management::Chunk { time_of_last_access, time_of_first_write, time_of_last_write, - time_closed, order, } = summary; @@ -41,7 +40,6 @@ impl From<ChunkSummary> for management::Chunk { time_of_last_access: time_of_last_access.map(Into::into), time_of_first_write: Some(time_of_first_write.into()), time_of_last_write: Some(time_of_last_write.into()), - time_closed: time_closed.map(Into::into), order: order.get(), } } @@ -106,7 +104,6 @@ impl TryFrom<management::Chunk> for ChunkSummary { time_of_last_access, time_of_first_write, time_of_last_write, - time_closed, order, } = proto; @@ -123,7 +120,6 @@ impl TryFrom<management::Chunk> for ChunkSummary { time_of_last_access: timestamp(time_of_last_access, "time_of_last_access")?, time_of_first_write: required_timestamp(time_of_first_write, "time_of_first_write")?, time_of_last_write: required_timestamp(time_of_last_write, "time_of_last_write")?, - time_closed: timestamp(time_closed, "time_closed")?, order: ChunkOrder::new(order).ok_or_else(|| FieldViolation { field: "order".to_string(), description: "Order must be non-zero".to_string(), @@ -188,7 +184,6 @@ mod test { lifecycle_action: management::ChunkLifecycleAction::Compacting.into(), time_of_first_write: Some(now.into()), time_of_last_write: Some(now.into()), - time_closed: None, time_of_last_access: Some(pbjson_types::Timestamp { seconds: 50, nanos: 7, @@ -208,7 +203,6 @@ lifecycle_action: Some(ChunkLifecycleAction::Compacting), time_of_first_write: now, time_of_last_write: now, - time_closed: None, time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)), order: ChunkOrder::new(5).unwrap(), }; @@ -234,7 +228,6 @@ lifecycle_action: Some(ChunkLifecycleAction::Persisting), time_of_first_write: now, time_of_last_write: now, - time_closed: None, time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)), order: ChunkOrder::new(5).unwrap(), }; @@ -252,7 +245,6 @@ lifecycle_action: management::ChunkLifecycleAction::Persisting.into(), time_of_first_write: Some(now.into()), time_of_last_write: Some(now.into()), - time_closed: None, time_of_last_access: Some(pbjson_types::Timestamp { seconds: 12, nanos: 100_007, diff --git a/server/src/db.rs b/server/src/db.rs index 37dcee79bf..7baac7c489 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -2552,8 +2552,6 @@ mod tests { assert!(start < chunk.time_of_first_write()); assert!(chunk.time_of_first_write() < after_data_load); assert!(chunk.time_of_first_write() == chunk.time_of_last_write()); - assert!(after_data_load < chunk.time_closed().unwrap()); - assert!(chunk.time_closed().unwrap() < after_rollover); } #[tokio::test] @@ -2615,7 +2613,6 @@ mod tests { time_of_last_access: None, time_of_first_write: Utc.timestamp_nanos(1), time_of_last_write: Utc.timestamp_nanos(1), - time_closed: None, order: ChunkOrder::new(5).unwrap(), }]; @@ -2648,9 +2645,7 @@ mod tests { let t_second_write = Utc::now(); write_lp_with_time(&db, "cpu bar=2 2", t_second_write).await; - let t_close_before = Utc::now(); db.rollover_partition("cpu", "1970-01-01T00").await.unwrap(); - let t_close_after = Utc::now(); let mut chunk_summaries = db.chunk_summaries().unwrap(); let summary = &chunk_summaries[0]; assert_eq!(summary.time_of_first_write, t_first_write); assert_eq!(summary.time_of_last_write, t_second_write); -
assert!(t_close_before <= summary.time_closed.unwrap()); - assert!(summary.time_closed.unwrap() <= t_close_after); } fn assert_first_last_times_eq(chunk_summary: &ChunkSummary, expected: DateTime<Utc>) { @@ -2853,7 +2846,6 @@ mod tests { time_of_last_access: None, time_of_first_write: Utc.timestamp_nanos(1), time_of_last_write: Utc.timestamp_nanos(1), - time_closed: None, }, ChunkSummary { partition_key: Arc::from("1970-01-05T15"), @@ -2868,7 +2860,6 @@ mod tests { time_of_last_access: None, time_of_first_write: Utc.timestamp_nanos(1), time_of_last_write: Utc.timestamp_nanos(1), - time_closed: None, }, ChunkSummary { partition_key: Arc::from("1970-01-05T15"), @@ -2883,7 +2874,6 @@ mod tests { time_of_last_access: None, time_of_first_write: Utc.timestamp_nanos(1), time_of_last_write: Utc.timestamp_nanos(1), - time_closed: None, }, ]; diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 9bb04c29a5..dbcf3df784 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -220,10 +220,6 @@ pub struct CatalogChunk { /// that data was originally written into time_of_last_write: DateTime<Utc>, - /// Time at which this chunk was marked as closed. Note this is - /// not the same as the timestamps on the data itself - time_closed: Option<DateTime<Utc>>, - /// Order of this chunk relative to other overlapping chunks. order: ChunkOrder, } @@ -293,7 +289,6 @@ impl CatalogChunk { access_recorder: Default::default(), time_of_first_write: time_of_write, time_of_last_write: time_of_write, - time_closed: None, order, }; chunk.update_metrics(); @@ -331,7 +326,6 @@ impl CatalogChunk { access_recorder: Default::default(), time_of_first_write, time_of_last_write, - time_closed: None, order, }; chunk.update_metrics(); @@ -372,7 +366,6 @@ impl CatalogChunk { access_recorder: Default::default(), time_of_first_write, time_of_last_write, - time_closed: None, order, }; chunk.update_metrics(); @@ -422,10 +415,6 @@ self.time_of_last_write } - pub fn time_closed(&self) -> Option<DateTime<Utc>> { - self.time_closed - } - pub fn order(&self) -> ChunkOrder { self.order } @@ -590,7 +579,6 @@ time_of_last_access, time_of_first_write: self.time_of_first_write, time_of_last_write: self.time_of_last_write, - time_closed: self.time_closed, order: self.order, } } @@ -693,9 +681,6 @@ match &self.stage { ChunkStage::Open { mb_chunk, .. } => { debug!(%self.addr, row_count=mb_chunk.rows(), "freezing chunk"); - assert!(self.time_closed.is_none()); - - self.time_closed = Some(Utc::now()); let (s, _) = mb_chunk.snapshot(); // Cache table summary + schema @@ -946,10 +931,8 @@ mod tests { let mut chunk = make_open_chunk(); let registration = TaskRegistration::new(); - assert!(chunk.time_closed.is_none()); assert!(matches!(chunk.stage, ChunkStage::Open { .. })); chunk.set_compacting(&registration).unwrap(); - assert!(chunk.time_closed.is_some()); assert!(matches!(chunk.stage, ChunkStage::Frozen { ..
})); } diff --git a/server/src/db/system_tables/chunks.rs b/server/src/db/system_tables/chunks.rs index 33d5f30377..ed06e8f59e 100644 --- a/server/src/db/system_tables/chunks.rs +++ b/server/src/db/system_tables/chunks.rs @@ -49,8 +49,7 @@ fn chunk_summaries_schema() -> SchemaRef { Field::new("row_count", DataType::UInt64, false), Field::new("time_of_last_access", ts.clone(), true), Field::new("time_of_first_write", ts.clone(), false), - Field::new("time_of_last_write", ts.clone(), false), - Field::new("time_closed", ts, true), + Field::new("time_of_last_write", ts, false), Field::new("order", DataType::UInt32, false), ])) } @@ -112,11 +111,6 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result< .map(|c| c.time_of_last_write) .map(time_to_ts) .collect::<TimestampNanosecondArray>(); - let time_closed = chunks - .iter() - .map(|c| c.time_closed) - .map(optional_time_to_ts) - .collect::<TimestampNanosecondArray>(); let order = chunks .iter() .map(|c| Some(c.order.get())) .collect::<UInt32Array>(); @@ -136,7 +130,6 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result< Arc::new(time_of_last_access), Arc::new(time_of_first_write), Arc::new(time_of_last_write), - Arc::new(time_closed), Arc::new(order), ], ) @@ -164,7 +157,6 @@ mod tests { time_of_last_access: None, time_of_first_write: Utc.timestamp_nanos(10_000_000_000), time_of_last_write: Utc.timestamp_nanos(10_000_000_000), - time_closed: None, order: ChunkOrder::new(5).unwrap(), }, ChunkSummary { @@ -179,7 +171,6 @@ mod tests { time_of_last_access: Some(Utc.timestamp_nanos(754_000_000_000)), time_of_first_write: Utc.timestamp_nanos(80_000_000_000), time_of_last_write: Utc.timestamp_nanos(80_000_000_000), - time_closed: None, order: ChunkOrder::new(6).unwrap(), }, ChunkSummary { @@ -194,19 +185,18 @@ mod tests { time_of_last_access: Some(Utc.timestamp_nanos(5_000_000_000)), time_of_first_write: Utc.timestamp_nanos(100_000_000_000), time_of_last_write: Utc.timestamp_nanos(200_000_000_000), - time_closed: None, order: ChunkOrder::new(7).unwrap(), }, ]; let expected = vec![ - "+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+-------+", - "| id                                   | partition_key | table_name | storage           | lifecycle_action             | memory_bytes | object_store_bytes | row_count | time_of_last_access  | time_of_first_write  | time_of_last_write   | time_closed | order |", - "+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+-------+", - "| 00000000-0000-0000-0000-000000000000 | p1            | table1     | OpenMutableBuffer |                              | 23754        |                    | 11        |                      | 1970-01-01T00:00:10Z | 1970-01-01T00:00:10Z |             | 5     |", - "| 00000000-0000-0000-0000-000000000001 | p1            | table1     | OpenMutableBuffer | Persisting to Object Storage | 23455        |                    | 22        | 1970-01-01T00:12:34Z | 1970-01-01T00:01:20Z | 1970-01-01T00:01:20Z |             | 6     |", - "| 00000000-0000-0000-0000-000000000002 | p1            | table1     | ObjectStoreOnly   |                              | 1234         | 5678               | 33        | 1970-01-01T00:00:05Z | 1970-01-01T00:01:40Z | 1970-01-01T00:03:20Z |             | 7     |", - "+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+-------+", +
"+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------+", + "| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_last_access | time_of_first_write | time_of_last_write | order |", + "+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------+", + "| 00000000-0000-0000-0000-000000000000 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | | 1970-01-01T00:00:10Z | 1970-01-01T00:00:10Z | 5 |", + "| 00000000-0000-0000-0000-000000000001 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | 1970-01-01T00:12:34Z | 1970-01-01T00:01:20Z | 1970-01-01T00:01:20Z | 6 |", + "| 00000000-0000-0000-0000-000000000002 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01T00:00:05Z | 1970-01-01T00:01:40Z | 1970-01-01T00:03:20Z | 7 |", + "+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------+", ]; let schema = chunk_summaries_schema(); diff --git a/server/src/db/system_tables/columns.rs b/server/src/db/system_tables/columns.rs index c5ff9600bb..130d96b229 100644 --- a/server/src/db/system_tables/columns.rs +++ b/server/src/db/system_tables/columns.rs @@ -320,7 +320,6 @@ mod tests { time_of_last_access: None, time_of_first_write: Utc.timestamp_nanos(1), time_of_last_write: Utc.timestamp_nanos(2), - time_closed: None, order: ChunkOrder::new(5).unwrap(), }, columns: vec![ @@ -357,7 +356,6 @@ mod tests { time_of_last_access: None, time_of_first_write: Utc.timestamp_nanos(1), time_of_last_write: Utc.timestamp_nanos(2), - time_closed: None, order: ChunkOrder::new(6).unwrap(), }, columns: vec![ChunkColumnSummary { @@ -388,7 +386,6 @@ mod tests { time_of_last_access: None, time_of_first_write: Utc.timestamp_nanos(1), time_of_last_write: Utc.timestamp_nanos(2), - time_closed: None, order: ChunkOrder::new(5).unwrap(), }, columns: vec![ChunkColumnSummary { diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 69285a771a..603270f47d 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -511,9 +511,7 @@ async fn test_chunk_get() { // make sure there were timestamps prior to normalization assert!( - chunks[0].time_of_first_write.is_some() - && chunks[0].time_of_last_write.is_some() - && chunks[0].time_closed.is_none(), // chunk is not yet closed + chunks[0].time_of_first_write.is_some() && chunks[0].time_of_last_write.is_some(), // chunk is not yet closed "actual:{:#?}", chunks[0] ); @@ -535,7 +533,6 @@ async fn test_chunk_get() { time_of_last_access: None, time_of_first_write: None, time_of_last_write: None, - time_closed: None, order: 1, }, Chunk { @@ -550,7 +547,6 @@ async fn test_chunk_get() { time_of_last_access: None, time_of_first_write: None, time_of_last_write: None, - time_closed: None, order: 1, }, ]; @@ -722,7 +718,6 @@ async fn test_list_partition_chunks() { time_of_last_access: None, time_of_first_write: None, time_of_last_write: None, - 
time_closed: None, order: 1, }]; @@ -1069,7 +1064,6 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> { time_of_last_access: None, time_of_first_write: None, time_of_last_write: None, - time_closed: None, memory_bytes, object_store_bytes, order, From 24ae269b3a54afee0dbad248a800828a1e297b2d Mon Sep 17 00:00:00 2001 From: Marco Neumann <marco@crepererum.net> Date: Mon, 11 Oct 2021 10:03:19 +0200 Subject: [PATCH 11/11] refactor: cancel executor jobs on drop Our executor is not meant as a fire-and-forget system. Instead the submitter should always poll the result. Dropping the receiver side (aka the job handle) should cancel the job. --- Cargo.lock | 2 + query/Cargo.toml | 2 + query/src/exec/task.rs | 92 ++++++++++++++++++++++++++++++++++++++---- 3 files changed, 89 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d587e37fc..0f4bd72e42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3152,6 +3152,7 @@ dependencies = [ "libc", "observability_deps", "parking_lot", + "pin-project", "predicate", "regex", "schema", @@ -3159,6 +3160,7 @@ dependencies = [ "test_helpers", "tokio", "tokio-stream", + "tokio-util", "trace", ] diff --git a/query/Cargo.toml b/query/Cargo.toml index e5756ac8fb..8f13a8cc51 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -26,11 +26,13 @@ futures = "0.3" hashbrown = "0.11" observability_deps = { path = "../observability_deps" } parking_lot = "0.11.2" +pin-project = "1.0" regex = "1" schema = { path = "../schema" } snafu = "0.6.9" tokio = { version = "1.11", features = ["macros"] } tokio-stream = "0.1.2" +tokio-util = { version = "0.6.3" } trace = { path = "../trace" } predicate = { path = "../predicate" } diff --git a/query/src/exec/task.rs b/query/src/exec/task.rs index 9dcc859152..0342460b4b 100644 --- a/query/src/exec/task.rs +++ b/query/src/exec/task.rs @@ -2,20 +2,68 @@ //! intensive" workloads such as DataFusion plans use parking_lot::Mutex; +use pin_project::{pin_project, pinned_drop}; use std::{pin::Pin, sync::Arc}; use tokio::sync::oneshot::Receiver; +use tokio_util::sync::CancellationToken; use futures::Future; use observability_deps::tracing::warn; -/// The type of thing that the dedicated executor runs -type Task = Pin<Box<dyn Future<Output = ()> + Send>>; +/// Task that can be added to the executor-internal queue. +/// +/// Every task within the executor is represented by a [`Job`] that can be polled by the API user. +struct Task { + fut: Pin<Box<dyn Future<Output = ()> + Send>>, + cancel: CancellationToken, +} + +impl Task { + /// Run task. + /// + /// This runs the payload or cancels if the linked [`Job`] is dropped. + async fn run(self) { + tokio::select! { + _ = self.cancel.cancelled() => (), + _ = self.fut => (), + } + } +} /// The type of error that is returned from tasks in this module #[allow(dead_code)] pub type Error = tokio::sync::oneshot::error::RecvError; /// Job within the executor. /// /// Dropping the job will cancel its linked task.
+#[pin_project(PinnedDrop)] +pub struct Job<T> { + cancel: CancellationToken, + #[pin] + rx: Receiver<T>, +} + +impl<T> Future for Job<T> { + type Output = Result<T, Error>; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Self::Output> { + let this = self.project(); + this.rx.poll(cx) + } +} + +#[pinned_drop] +impl<T> PinnedDrop for Job<T> { + fn drop(self: Pin<&mut Self>) { + self.cancel.cancel(); + } +} + /// Runs futures (and any `tasks` that are `tokio::task::spawned` by /// them) on a separate tokio Executor #[derive(Clone)] @@ -86,7 +134,7 @@ impl DedicatedExecutor { let handle = join.read_owned().await; tokio::task::spawn(async move { - task.await; + task.run().await; std::mem::drop(handle); }); } @@ -111,30 +159,35 @@ impl DedicatedExecutor { /// /// Currently all tasks are added to the tokio executor /// immediately and compete for the threadpool's resources. - pub fn spawn<T>(&self, task: T) -> Receiver<T::Output> + pub fn spawn<T>(&self, task: T) -> Job<T::Output> where T: Future + Send + 'static, T::Output: Send + 'static, { let (tx, rx) = tokio::sync::oneshot::channel(); - let job = Box::pin(async move { + let fut = Box::pin(async move { let task_output = task.await; if tx.send(task_output).is_err() { warn!("Spawned task output ignored: receiver dropped") } }); + let cancel = CancellationToken::new(); + let task = Task { + fut, + cancel: cancel.clone(), + }; let mut state = self.state.lock(); if let Some(requests) = &mut state.requests { // would fail if someone has started shutdown - requests.send(job).ok(); + requests.send(task).ok(); } else { warn!("tried to schedule task on an executor that was shutdown"); } - rx + Job { rx, cancel } } /// signals shutdown of this executor and any Clones @@ -185,6 +238,7 @@ fn set_current_thread_priority(prio: i32) { mod tests { use super::*; use std::sync::{Arc, Barrier}; + use tokio::sync::Barrier as AsyncBarrier; #[cfg(unix)] fn get_current_thread_priority() -> i32 { @@ -352,9 +406,33 @@ mod tests { exec.join(); } + #[tokio::test] + async fn drop_receiver() { + let barrier1 = Arc::new(AsyncBarrier::new(2)); + let barrier2 = Arc::new(AsyncBarrier::new(2)); + + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + + let dedicated_task1 = exec.spawn(do_work_async(11, Arc::clone(&barrier1))); + let dedicated_task2 = exec.spawn(do_work_async(22, Arc::clone(&barrier2))); + + drop(dedicated_task1); + + barrier2.wait().await; + assert_eq!(dedicated_task2.await.unwrap(), 22); + + exec.join() + } + /// Wait for the barrier and then return `result` async fn do_work(result: usize, barrier: Arc<Barrier>) -> usize { barrier.wait(); result } + + /// Wait for the barrier and then return `result` + async fn do_work_async(result: usize, barrier: Arc<AsyncBarrier>) -> usize { + barrier.wait().await; + result + } }
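For readers who want to see PATCH 11's cancel-on-drop mechanism in isolation, the following is a minimal, self-contained sketch rather than IOx code. It assumes only the two dependencies the patch adds (tokio with the macros, rt-multi-thread, and time features, plus tokio-util for CancellationToken); all names in it are illustrative. It races a payload future against a cancellation token the same way Task::run does, with the explicit cancel() call standing in for the PinnedDrop impl on Job.

use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // One token per submitted task, shared between the Task and Job halves.
    let cancel = CancellationToken::new();
    let child = cancel.clone();

    // Like Task::run above: race the payload against cancellation.
    let worker = tokio::spawn(async move {
        tokio::select! {
            _ = child.cancelled() => println!("payload cancelled"),
            _ = tokio::time::sleep(Duration::from_secs(60)) => println!("payload finished"),
        }
    });

    // In the patch this fires inside Job's PinnedDrop impl when the handle is
    // dropped; calling it directly shows the same effect.
    cancel.cancel();
    worker.await.unwrap(); // prints "payload cancelled" almost immediately
}

The design choice worth noting is that cancellation is cooperative: tokio::select! simply stops polling the payload once the token fires, so the payload future is dropped at its next await point rather than being forcibly killed, and a Job that is polled to completion never observes the token at all.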