diff --git a/Cargo.lock b/Cargo.lock
index 86cbc76e3f..067862ffce 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -92,9 +92,9 @@ dependencies = [
 
 [[package]]
 name = "anyhow"
-version = "1.0.38"
+version = "1.0.39"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "afddf7f520a80dbf76e6f50a35bca42a2331ef227a28b3b6dc5c2e2338d114b1"
+checksum = "81cddc5f91628367664cc7c69714ff08deee8a3efc54623011c772544d7b2767"
 
 [[package]]
 name = "arrayref"
@@ -111,7 +111,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
 [[package]]
 name = "arrow"
 version = "4.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=6208a79739d0228ecc566fa8436ee61068452212#6208a79739d0228ecc566fa8436ee61068452212"
+source = "git+https://github.com/apache/arrow.git?rev=9262a5d83b6037d0c39668310b1875e32f1b7e83#9262a5d83b6037d0c39668310b1875e32f1b7e83"
 dependencies = [
  "cfg_aliases",
  "chrono",
@@ -134,7 +134,7 @@ dependencies = [
 [[package]]
 name = "arrow-flight"
 version = "4.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=6208a79739d0228ecc566fa8436ee61068452212#6208a79739d0228ecc566fa8436ee61068452212"
+source = "git+https://github.com/apache/arrow.git?rev=9262a5d83b6037d0c39668310b1875e32f1b7e83#9262a5d83b6037d0c39668310b1875e32f1b7e83"
 dependencies = [
  "arrow",
  "bytes",
@@ -518,9 +518,9 @@ dependencies = [
 
 [[package]]
 name = "cloud-storage"
-version = "0.8.3"
+version = "0.8.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a27c92803d7c48c97d828f468d0bb3069f21a2531ccc8361486a7e05ba9518ec"
+checksum = "3a536949c1551eb281033e5cc44ea9b5a9b162adb6a57e50eb1413b94d660882"
 dependencies = [
  "base64 0.13.0",
  "bytes",
@@ -781,7 +781,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "4.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=6208a79739d0228ecc566fa8436ee61068452212#6208a79739d0228ecc566fa8436ee61068452212"
+source = "git+https://github.com/apache/arrow.git?rev=9262a5d83b6037d0c39668310b1875e32f1b7e83#9262a5d83b6037d0c39668310b1875e32f1b7e83"
 dependencies = [
  "ahash 0.7.2",
  "arrow",
@@ -789,7 +789,7 @@ dependencies = [
  "chrono",
  "clap",
  "futures",
- "hashbrown",
+ "hashbrown 0.11.2",
  "log",
  "num_cpus",
  "ordered-float 2.1.1",
@@ -1206,9 +1206,9 @@ dependencies = [
 
 [[package]]
 name = "h2"
-version = "0.3.1"
+version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d832b01df74254fe364568d6ddc294443f61cbec82816b60904303af87efae78"
+checksum = "fc018e188373e2777d0ef2467ebff62a08e66c3f5857b23c8fbec3018210dc00"
 dependencies = [
  "bytes",
  "fnv",
@@ -1238,6 +1238,15 @@ dependencies = [
  "ahash 0.4.7",
 ]
 
+[[package]]
+name = "hashbrown"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
+dependencies = [
+ "ahash 0.7.2",
+]
+
 [[package]]
 name = "heck"
 version = "0.3.2"
@@ -1285,12 +1294,13 @@ dependencies = [
 
 [[package]]
 name = "http-body"
-version = "0.4.0"
+version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994"
+checksum = "5dfb77c123b4e2f72a2069aeae0b4b4949cc7e966df277813fc16347e7549737"
 dependencies = [
  "bytes",
  "http",
+ "pin-project-lite",
 ]
 
 [[package]]
@@ -1389,7 +1399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3"
 dependencies = [
  "autocfg",
- "hashbrown",
+ "hashbrown 0.9.1",
 ]
 
 [[package]]
@@ -1606,9 +1616,9 @@ dependencies = [
 
 [[package]]
 name = "js-sys"
-version = "0.3.48"
+version = "0.3.49"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc9f84f9b115ce7843d60706df1422a916680bfdfcbdb0447c5614ff9d7e4d78"
+checksum = "dc15e39392125075f60c95ba416f5381ff6c3a948ff02ab12464715adf56c821"
 dependencies = [
  "wasm-bindgen",
 ]
@@ -1654,9 +1664,9 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.89"
+version = "0.2.91"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "538c092e5586f4cdd7dd8078c4a79220e3e168880218124dcbce860f0ea938c6"
+checksum = "8916b1f6ca17130ec6568feccee27c156ad12037880833a3b842a823236502e7"
 
 [[package]]
 name = "libloading"
@@ -1799,9 +1809,9 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "0.7.10"
+version = "0.7.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2182a122f3b7f3f5329cb1972cee089ba2459a0a80a56935e6e674f096f8d839"
+checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956"
 dependencies = [
  "libc",
  "log",
@@ -1812,11 +1822,10 @@ dependencies = [
 
 [[package]]
 name = "miow"
-version = "0.3.6"
+version = "0.3.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
+checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
 dependencies = [
- "socket2",
  "winapi",
 ]
 
@@ -2242,7 +2251,7 @@ dependencies = [
 [[package]]
 name = "parquet"
 version = "4.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=6208a79739d0228ecc566fa8436ee61068452212#6208a79739d0228ecc566fa8436ee61068452212"
+source = "git+https://github.com/apache/arrow.git?rev=9262a5d83b6037d0c39668310b1875e32f1b7e83#9262a5d83b6037d0c39668310b1875e32f1b7e83"
 dependencies = [
  "arrow",
  "base64 0.12.3",
@@ -2703,7 +2712,7 @@ dependencies = [
  "criterion",
  "croaring",
  "either",
- "hashbrown",
+ "hashbrown 0.9.1",
  "internal_types",
  "itertools 0.9.0",
  "packers",
@@ -3072,9 +3081,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
 
 [[package]]
 name = "serde"
-version = "1.0.124"
+version = "1.0.125"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd761ff957cb2a45fbb9ab3da6512de9de55872866160b23c25f1a841e99d29f"
+checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171"
 dependencies = [
  "serde_derive",
 ]
@@ -3103,9 +3112,9 @@ dependencies = [
 
 [[package]]
 name = "serde_derive"
-version = "1.0.124"
+version = "1.0.125"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1800f7693e94e186f5e25a28291ae1570da908aff7d97a095dec1e56ff99069b"
+checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3172,7 +3181,7 @@ dependencies = [
  "flatbuffers",
  "futures",
  "generated_types",
- "hashbrown",
+ "hashbrown 0.9.1",
  "influxdb_line_protocol",
  "internal_types",
  "mutable_buffer",
@@ -3324,9 +3333,9 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
 
 [[package]]
 name = "standback"
-version = "0.2.15"
+version = "0.2.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2beb4d1860a61f571530b3f855a1b538d0200f7871c63331ecd6f17b1f014f8"
+checksum = "e113fb6f3de07a243d434a56ec6f186dfd51cb08448239fe7bcae73f87ff28ff"
 dependencies = [
  "version_check",
 ]
@@ -3393,7 +3402,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "383196d1876517ee6f9f0864d1fc1070331b803335d3c6daaa04bbcccd823c08"
 dependencies = [
  "cfg-if 1.0.0",
- "hashbrown",
+ "hashbrown 0.9.1",
  "serde",
 ]
 
@@ -3636,9 +3645,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
 
 [[package]]
 name = "tokio"
-version = "1.3.0"
+version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8d56477f6ed99e10225f38f9f75f872f29b8b8bd8c0b946f63345bb144e9eeda"
+checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722"
 dependencies = [
  "autocfg",
  "bytes",
@@ -3688,9 +3697,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-stream"
-version = "0.1.4"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c535f53c0cfa1acace62995a8994fc9cc1f12d202420da96ff306ee24d576469"
+checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
 dependencies = [
  "futures-core",
  "pin-project-lite",
@@ -3699,9 +3708,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-util"
-version = "0.6.4"
+version = "0.6.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec31e5cc6b46e653cf57762f36f71d5e6386391d88a72fd6db4508f8f676fb29"
+checksum = "5143d049e85af7fbc36f5454d990e62c2df705b3589f123b71f441b6b59f443f"
 dependencies = [
  "bytes",
  "futures-core",
@@ -3713,9 +3722,9 @@ dependencies = [
 
 [[package]]
 name = "tonic"
-version = "0.4.0"
+version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3ba8f479158947373b6df40cf48f4779bb25c99ca3c661bd95e0ab1963ad8b0e"
+checksum = "91491e5f15431f2189ec8c1f9dcbadac949450399c22c912ceae9570eb472f61"
 dependencies = [
  "async-stream",
  "async-trait",
@@ -3742,9 +3751,9 @@ dependencies = [
 
 [[package]]
 name = "tonic-build"
-version = "0.4.0"
+version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c1e8546fd40d56d28089835c0a81bb396848103b00f888aea42d46eb5974df07"
+checksum = "e2e09854abff4c0716059219e155ab0539aecbfc26a40214897b062653adb6ba"
 dependencies = [
  "proc-macro2",
  "prost-build",
@@ -4030,9 +4039,9 @@ dependencies = [
 
 [[package]]
 name = "walkdir"
-version = "2.3.1"
+version = "2.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "777182bc735b6424e1a57516d35ed72cb8019d85c8c9bf536dccb3445c1a2f7d"
+checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
 dependencies = [
  "same-file",
  "winapi",
@@ -4063,9 +4072,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
 
 [[package]]
 name = "wasm-bindgen"
-version = "0.2.71"
+version = "0.2.72"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ee1280240b7c461d6a0071313e08f34a60b0365f14260362e5a2b17d1d31aa7"
+checksum = "8fe8f61dba8e5d645a4d8132dc7a0a66861ed5e1045d2c0ed940fab33bac0fbe"
 dependencies = [
  "cfg-if 1.0.0",
  "serde",
@@ -4075,9 +4084,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-backend"
-version = "0.2.71"
+version = "0.2.72"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b7d8b6942b8bb3a9b0e73fc79b98095a27de6fa247615e59d096754a3bc2aa8"
+checksum = "046ceba58ff062da072c7cb4ba5b22a37f00a302483f7e2a6cdc18fedbdc1fd3"
 dependencies = [
  "bumpalo",
  "lazy_static",
@@ -4090,9 +4099,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-futures"
-version = "0.4.21"
+version = "0.4.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8e67a5806118af01f0d9045915676b22aaebecf4178ae7021bc171dab0b897ab"
+checksum = "73157efb9af26fb564bb59a009afd1c7c334a44db171d280690d0c3faaec3468"
 dependencies = [
  "cfg-if 1.0.0",
  "js-sys",
@@ -4102,9 +4111,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-macro"
-version = "0.2.71"
+version = "0.2.72"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e5ac38da8ef716661f0f36c0d8320b89028efe10c7c0afde65baffb496ce0d3b"
+checksum = "0ef9aa01d36cda046f797c57959ff5f3c615c9cc63997a8d545831ec7976819b"
 dependencies = [
  "quote",
  "wasm-bindgen-macro-support",
@@ -4112,9 +4121,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-macro-support"
-version = "0.2.71"
+version = "0.2.72"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc053ec74d454df287b9374ee8abb36ffd5acb95ba87da3ba5b7d3fe20eb401e"
+checksum = "96eb45c1b2ee33545a813a92dbb53856418bf7eb54ab34f7f7ff1448a5b3735d"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -4125,15 +4134,15 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-shared"
-version = "0.2.71"
+version = "0.2.72"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d6f8ec44822dd71f5f221a5847fb34acd9060535c1211b70a05844c0f6383b1"
+checksum = "b7148f4696fb4960a346eaa60bbfb42a1ac4ebba21f750f75fc1375b098d5ffa"
 
 [[package]]
 name = "web-sys"
-version = "0.3.48"
+version = "0.3.49"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec600b26223b2948cedfde2a0aa6756dcf1fef616f43d7b3097aaf53a6c4d92b"
+checksum = "59fe19d70f5dacc03f6e46777213facae5ac3801575d56ca6cbd4c93dcd12310"
 dependencies = [
  "js-sys",
  "wasm-bindgen",
diff --git a/arrow_deps/Cargo.toml b/arrow_deps/Cargo.toml
index ed63129754..78000cb915 100644
--- a/arrow_deps/Cargo.toml
+++ b/arrow_deps/Cargo.toml
@@ -8,14 +8,14 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx
 [dependencies] # In alphabetical order
 # We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev
-# The version can be found here: https://github.com/apache/arrow/commit/6208a79739d0228ecc566fa8436ee61068452212
+# The version can be found here: https://github.com/apache/arrow/commit/9262a5d83b6037d0c39668310b1875e32f1b7e83
 #
-arrow = { git = "https://github.com/apache/arrow.git", rev = "6208a79739d0228ecc566fa8436ee61068452212" , features = ["simd"] }
-arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "6208a79739d0228ecc566fa8436ee61068452212" }
+arrow = { git = "https://github.com/apache/arrow.git", rev = "9262a5d83b6037d0c39668310b1875e32f1b7e83" , features = ["simd"] }
+arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "9262a5d83b6037d0c39668310b1875e32f1b7e83" }
 
 # Turn off optional datafusion features (function packages)
-datafusion = { git = "https://github.com/apache/arrow.git", rev = "6208a79739d0228ecc566fa8436ee61068452212", default-features = false }
+datafusion = { git = "https://github.com/apache/arrow.git", rev = "9262a5d83b6037d0c39668310b1875e32f1b7e83", default-features = false }
 
 # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
 # and we're not currently using it anyway
-parquet = { git = "https://github.com/apache/arrow.git", rev = "6208a79739d0228ecc566fa8436ee61068452212", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
+parquet = { git = "https://github.com/apache/arrow.git", rev = "9262a5d83b6037d0c39668310b1875e32f1b7e83", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs
index 00e12f28bf..c4c90c1965 100644
--- a/data_types/src/database_rules.rs
+++ b/data_types/src/database_rules.rs
@@ -2,7 +2,6 @@ use std::convert::{TryFrom, TryInto};
 
 use chrono::{DateTime, TimeZone, Utc};
 use regex::Regex;
-use serde::{Deserialize, Serialize};
 use snafu::Snafu;
 
 use generated_types::google::protobuf::Empty;
@@ -22,27 +21,33 @@ pub enum Error {
         source_module: &'static str,
         source: Box,
     },
+
+    #[snafu(context(false))]
+    ProstDecodeError { source: prost::DecodeError },
+
+    #[snafu(context(false))]
+    ProstEncodeError { source: prost::EncodeError },
+
+    #[snafu(context(false))]
+    FieldViolation { source: FieldViolation },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 /// DatabaseRules contains the rules for replicating data, sending data to
 /// subscribers, and querying data for a single database.
-#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
+#[derive(Debug, Default, Eq, PartialEq, Clone)]
 pub struct DatabaseRules {
     /// The unencoded name of the database. This gets put in by the create
     /// database call, so an empty default is fine.
-    #[serde(default)]
     pub name: String, // TODO: Use DatabaseName here
 
     /// Template that generates a partition key for each row inserted into the
     /// db
-    #[serde(default)]
     pub partition_template: PartitionTemplate,
 
     /// When set this will buffer WAL writes in memory based on the
     /// configuration.
-    #[serde(default)]
     pub wal_buffer_config: Option<WalBufferConfig>,
 
     /// Unless explicitly disabled by setting this to None (or null in JSON),
@@ -50,7 +55,6 @@ pub struct DatabaseRules {
     /// called the Mutable Buffer. It is optimized to receive writes so they
     /// can be batched together later to the Read Buffer or to Parquet files
     /// in object storage.
-    #[serde(default = "MutableBufferConfig::default_option")]
     pub mutable_buffer_config: Option<MutableBufferConfig>,
 
     /// An optional config to split writes into different "shards". A shard
@@ -60,7 +64,6 @@ pub struct DatabaseRules {
     /// based on table name and assign to 1 of 10 shards. Within each
     /// shard you would have partitions, which would likely be based off time.
     /// This makes it possible to horizontally scale out writes.
-    #[serde(default)]
     pub shard_config: Option<ShardConfig>,
 }
 
@@ -81,6 +84,18 @@ impl DatabaseRules {
     }
 }
 
+impl DatabaseRules {
+    pub fn decode(bytes: prost::bytes::Bytes) -> Result<Self> {
+        let message: management::DatabaseRules = prost::Message::decode(bytes)?;
+        Ok(message.try_into()?)
+    }
+
+    pub fn encode(self, bytes: &mut prost::bytes::BytesMut) -> Result<()> {
+        let encoded: management::DatabaseRules = self.into();
+        Ok(prost::Message::encode(&encoded, bytes)?)
+    }
+}
+
 /// Generates a partition key based on the line and the default time.
 pub trait Partitioner {
     fn partition_key(
@@ -137,7 +152,7 @@ impl TryFrom for DatabaseRules {
 /// MutableBufferConfig defines the configuration for the in-memory database
 /// that is hot for writes as they arrive. Operators can define rules for
 /// evicting data once the mutable buffer passes a set memory threshold.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub struct MutableBufferConfig {
     /// The size the mutable buffer should be limited to. Once the buffer gets
     /// to this size it will drop partitions in the given order. If unable
@@ -244,7 +259,7 @@ impl TryFrom for MutableBufferConfig {
 ///     sort: PartitionSort::CreatedAtTime,
 /// };
 /// ```
-#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Default, Eq, PartialEq, Clone)]
 pub struct PartitionSortRules {
     /// Sort partitions by this order. Last will be dropped.
     pub order: Order,
@@ -278,7 +293,7 @@ impl TryFrom for Partitio
 }
 
 /// What to sort the partition by.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub enum PartitionSort {
     /// The last time the partition received a write.
     LastWriteTime,
@@ -351,7 +366,7 @@ impl TryFrom for
 }
 
 /// The sort order.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub enum Order {
     Asc,
     Desc,
@@ -385,7 +400,7 @@ impl TryFrom for Order {
 }
 
 /// Use columns of this type.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub enum ColumnType {
     I64,
     U64,
@@ -422,7 +437,7 @@ impl TryFrom for ColumnType {
 }
 
 /// Use either the min or max summary statistic.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub enum ColumnValue {
     Min,
     Max,
@@ -454,7 +469,7 @@ impl TryFrom for ColumnValue {
 /// WalBufferConfig defines the configuration for buffering data from the WAL in
 /// memory. This buffer is used for asynchronous replication and to collect
 /// segments before sending them to object storage.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub struct WalBufferConfig {
     /// The size the WAL buffer should be limited to. Once the buffer gets to
     /// this size it will drop old segments to remain below this size, but
@@ -528,7 +543,7 @@ impl TryFrom for WalBufferConfig {
 /// WalBufferRollover defines the behavior of what should happen if a write
 /// comes in that would cause the buffer to exceed its max size AND the oldest
 /// segment can't be dropped because it has not yet been persisted.
-#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
+#[derive(Debug, Clone, Eq, PartialEq, Copy)]
 pub enum WalBufferRollover {
     /// Drop the old segment even though it hasn't been persisted. This part of
     /// the WAL will be lost on this server.
@@ -573,7 +588,7 @@ impl TryFrom for WalBufferRollover {
 ///
 /// The key is constructed in order of the template parts; thus ordering changes
 /// what partition key is generated.
-#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
+#[derive(Debug, Default, Eq, PartialEq, Clone)]
 pub struct PartitionTemplate {
     pub parts: Vec<TemplatePart>,
 }
@@ -627,7 +642,7 @@ impl TryFrom for PartitionTemplate {
 
 /// `TemplatePart` specifies what part of a row should be used to compute this
 /// part of a partition key.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub enum TemplatePart {
     /// The name of a table
     Table,
@@ -647,7 +662,7 @@ pub enum TemplatePart {
 
 /// `RegexCapture` is for pulling parts of a string column into the partition
 /// key.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub struct RegexCapture {
     column: String,
     regex: String,
@@ -662,7 +677,7 @@ pub struct RegexCapture {
 /// For example, a time format of "%Y-%m-%d %H:%M:%S" will produce
 /// partition key parts such as "2021-03-14 12:25:21" and
 /// "2021-04-14 12:24:21"
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub struct StrftimeColumn {
     column: String,
     format: String,
@@ -737,7 +752,7 @@ impl TryFrom for TemplatePart {
 /// based on table name and assign to 1 of 10 shards. Within each
 /// shard you would have partitions, which would likely be based off time.
 /// This makes it possible to horizontally scale out writes.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub struct ShardConfig {
     /// An optional matcher. If there is a match, the route will be evaluated to
     /// the given targets, otherwise the hash ring will be evaluated. This is
@@ -758,7 +773,7 @@ pub struct ShardConfig {
 
 /// Maps a matcher with specific target group. If the line/row matches
 /// it should be sent to the group.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
+#[derive(Debug, Eq, PartialEq, Clone, Default)]
 pub struct MatcherToTargets {
     pub matcher: Matcher,
     pub target: NodeGroup,
@@ -769,7 +784,7 @@ pub type NodeGroup = Vec;
 
 /// HashRing is a rule for creating a hash key for a row and mapping that to
 /// an individual node on a ring.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub struct HashRing {
     /// If true the table name will be included in the hash key
     pub table_name: bool,
@@ -781,10 +796,9 @@ pub struct HashRing {
 
 /// A matcher is used to match routing rules or subscriptions on a row-by-row
 /// (or line) basis.
-#[derive(Debug, Serialize, Deserialize, Clone, Default)]
+#[derive(Debug, Clone, Default)]
 pub struct Matcher {
     /// if provided, match if the table name matches against the regex
-    #[serde(with = "serde_regex")]
     pub table_name_regex: Option<Regex>,
     // paul: what should we use for predicate matching here against a single row/line?
     pub predicate: Option<String>,
diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs
index 590233ae8b..c08a827733 100644
--- a/query/src/frontend/influxrpc.rs
+++ b/query/src/frontend/influxrpc.rs
@@ -454,7 +454,7 @@ impl InfluxRPCPlanner {
         //      Filter(predicate)
         //        InMemoryScan
         let plan = plan_builder
-            .project(&select_exprs)
+            .project(select_exprs.clone())
            .context(BuildingPlan)?
            .filter(tag_name_is_not_null)
            .context(BuildingPlan)?
@@ -744,7 +744,7 @@ impl InfluxRPCPlanner {
             .collect::<Vec<_>>();
 
         let plan = plan_builder
-            .project(&select_exprs)
+            .project(select_exprs)
             .context(BuildingPlan)?
             .build()
             .context(BuildingPlan)?;
@@ -804,7 +804,7 @@ impl InfluxRPCPlanner {
             .collect::<Vec<_>>();
 
         let plan = plan_builder
-            .project(&select_exprs)
+            .project(select_exprs)
             .context(BuildingPlan)?
             .build()
             .context(BuildingPlan)?;
@@ -865,7 +865,7 @@ impl InfluxRPCPlanner {
 
         // Order by
         let plan_builder = plan_builder
-            .sort(&tags_and_timestamp)
+            .sort(tags_and_timestamp)
             .context(BuildingPlan)?;
 
         // Select away anything that isn't in the influx data model
@@ -877,7 +877,7 @@ impl InfluxRPCPlanner {
             .collect();
 
         let plan_builder = plan_builder
-            .project(&tags_fields_and_timestamps)
+            .project(tags_fields_and_timestamps)
             .context(BuildingPlan)?;
 
         let plan = plan_builder.build().context(BuildingPlan)?;
@@ -993,9 +993,9 @@ impl InfluxRPCPlanner {
             .collect::<Vec<_>>();
 
         let plan_builder = plan_builder
-            .aggregate(&group_exprs, &agg_exprs)
+            .aggregate(group_exprs, agg_exprs)
             .context(BuildingPlan)?
-            .sort(&sort_exprs)
+            .sort(sort_exprs)
             .context(BuildingPlan)?;
 
         // and finally create the plan
@@ -1079,9 +1079,9 @@ impl InfluxRPCPlanner {
             .collect::<Vec<_>>();
 
         let plan_builder = plan_builder
-            .aggregate(&group_exprs, &agg_exprs)
+            .aggregate(group_exprs, agg_exprs)
             .context(BuildingPlan)?
-            .sort(&sort_exprs)
+            .sort(sort_exprs)
             .context(BuildingPlan)?;
 
         // and finally create the plan
diff --git a/query/src/frontend/sql.rs b/query/src/frontend/sql.rs
index f30788236b..76d9251cbe 100644
--- a/query/src/frontend/sql.rs
+++ b/query/src/frontend/sql.rs
@@ -63,6 +63,16 @@ pub enum Error {
         table_name: String,
         source: crate::provider::Error,
     },
+
+    #[snafu(display(
+        "Error registering table provider for table '{}': {}",
+        table_name,
+        source
+    ))]
+    RegisteringTableProvider {
+        table_name: String,
+        source: DataFusionError,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -128,7 +138,8 @@ impl SQLQueryPlanner {
                 .context(CreatingTableProvider { table_name })?;
 
             ctx.inner_mut()
-                .register_table(&table_name, Arc::new(provider));
+                .register_table(table_name.as_str(), Arc::new(provider))
+                .context(RegisteringTableProvider { table_name })?;
         }
 
         ctx.prepare_sql(query).await.context(Preparing)
diff --git a/query/src/func/selectors.rs b/query/src/func/selectors.rs
index 484f5e5843..c12ddb3f5e 100644
--- a/query/src/func/selectors.rs
+++ b/query/src/func/selectors.rs
@@ -672,10 +672,10 @@ mod test {
         )
         .unwrap();
         let mut ctx = ExecutionContext::new();
-        ctx.register_table("t", Arc::new(provider));
+        ctx.register_table("t", Arc::new(provider)).unwrap();
 
         let df = ctx.table("t").unwrap();
-        let df = df.aggregate(&[], &aggs).unwrap();
+        let df = df.aggregate(vec![], aggs).unwrap();
 
         // execute the query
         let record_batches = df.collect().await.unwrap();
diff --git a/read_buffer/src/column/encoding/bool.rs b/read_buffer/src/column/encoding/bool.rs
index fdba7048a5..e346cedc84 100644
--- a/read_buffer/src/column/encoding/bool.rs
+++ b/read_buffer/src/column/encoding/bool.rs
@@ -343,7 +343,7 @@ mod test {
     #[test]
     fn size() {
         let v = Bool::from(vec![None, None, Some(true), Some(false)].as_slice());
-        assert_eq!(v.size(), 240);
+        assert_eq!(v.size(), 464);
     }
 
     #[test]
diff --git a/read_buffer/src/column/encoding/fixed_null.rs b/read_buffer/src/column/encoding/fixed_null.rs
index 0be758cfa7..889f15e4a2 100644
--- a/read_buffer/src/column/encoding/fixed_null.rs
+++ b/read_buffer/src/column/encoding/fixed_null.rs
@@ -599,7 +599,7 @@ mod test {
     #[test]
     fn size() {
         let v = FixedNull::::from(vec![None, None, Some(100), Some(2222)].as_slice());
-        assert_eq!(v.size(), 240);
+        assert_eq!(v.size(), 344);
     }
 
     #[test]
diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs
index 2a4f112bb0..0d755d2ece 100644
--- a/read_buffer/src/row_group.rs
+++ b/read_buffer/src/row_group.rs
@@ -1092,27 +1092,27 @@ impl From for RowGroup {
             Some(InfluxColumnType::Tag) => {
                 assert_eq!(arrow_column.data_type(), &arrow::datatypes::DataType::Utf8);
                 let column_data =
-                    Column::from(arrow::array::StringArray::from(arrow_column.data()));
+                    Column::from(arrow::array::StringArray::from(arrow_column.data().clone()));
 
                 columns.insert(col_name.to_owned(), ColumnType::Tag(column_data));
             }
             Some(InfluxColumnType::Field(_)) => {
                 let column_data = match arrow_column.data_type() {
-                    arrow::datatypes::DataType::Int64 => {
-                        Column::from(arrow::array::Int64Array::from(arrow_column.data()))
-                    }
-                    arrow::datatypes::DataType::Float64 => {
-                        Column::from(arrow::array::Float64Array::from(arrow_column.data()))
-                    }
-                    arrow::datatypes::DataType::UInt64 => {
-                        Column::from(arrow::array::UInt64Array::from(arrow_column.data()))
-                    }
-                    arrow::datatypes::DataType::Boolean => {
-                        Column::from(arrow::array::BooleanArray::from(arrow_column.data()))
-                    }
-                    arrow::datatypes::DataType::Utf8 => {
-                        Column::from(arrow::array::StringArray::from(arrow_column.data()))
-                    }
+                    arrow::datatypes::DataType::Int64 => Column::from(
+                        arrow::array::Int64Array::from(arrow_column.data().clone()),
+                    ),
+                    arrow::datatypes::DataType::Float64 => Column::from(
+                        arrow::array::Float64Array::from(arrow_column.data().clone()),
+                    ),
+                    arrow::datatypes::DataType::UInt64 => Column::from(
+                        arrow::array::UInt64Array::from(arrow_column.data().clone()),
+                    ),
+                    arrow::datatypes::DataType::Boolean => Column::from(
+                        arrow::array::BooleanArray::from(arrow_column.data().clone()),
+                    ),
+                    arrow::datatypes::DataType::Utf8 => Column::from(
+                        arrow::array::StringArray::from(arrow_column.data().clone()),
+                    ),
                     dt => unimplemented!(
                         "data type {:?} currently not supported for field columns",
                         dt
@@ -1125,7 +1125,7 @@ impl From for RowGroup {
                 assert_eq!(col_name, TIME_COLUMN_NAME);
 
                 let column_data =
-                    Column::from(arrow::array::Int64Array::from(arrow_column.data()));
+                    Column::from(arrow::array::Int64Array::from(arrow_column.data().clone()));
 
                 columns.insert(col_name.to_owned(), ColumnType::Time(column_data));
             }
diff --git a/server/src/config.rs b/server/src/config.rs
index 4082203faa..503a1e90cc 100644
--- a/server/src/config.rs
+++ b/server/src/config.rs
@@ -17,7 +17,7 @@ use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn, Instrument};
 
-pub(crate) const DB_RULES_FILE_NAME: &str = "rules.json";
+pub(crate) const DB_RULES_FILE_NAME: &str = "rules.pb";
 
 /// The Config tracks the configuration of databases and their rules along
 /// with host groups for replication. It is used as an in-memory structure
It is used as an in-memory structure @@ -318,7 +318,7 @@ mod test { let mut expected_path = base_path; expected_path.push_dir("foo"); - expected_path.set_file_name("rules.json"); + expected_path.set_file_name("rules.pb"); assert_eq!(rules_path, expected_path); } diff --git a/server/src/lib.rs b/server/src/lib.rs index 82f67b3b91..f744997058 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -73,7 +73,7 @@ use std::sync::{ }; use async_trait::async_trait; -use bytes::Bytes; +use bytes::BytesMut; use futures::stream::TryStreamExt; use parking_lot::Mutex; use snafu::{OptionExt, ResultExt, Snafu}; @@ -139,7 +139,9 @@ pub enum Error { #[snafu(display("unable to use server until id is set"))] IdNotSet, #[snafu(display("error serializing configuration {}", source))] - ErrorSerializing { source: serde_json::Error }, + ErrorSerializing { + source: data_types::database_rules::Error, + }, #[snafu(display("error deserializing configuration {}", source))] ErrorDeserializing { source: serde_json::Error }, #[snafu(display("store error: {}", source))] @@ -239,13 +241,19 @@ impl Server { let db_reservation = self.config.create_db(db_name, rules)?; - let data = - Bytes::from(serde_json::to_vec(&db_reservation.db.rules).context(ErrorSerializing)?); + let mut data = BytesMut::new(); + db_reservation + .db + .rules + .clone() + .encode(&mut data) + .context(ErrorSerializing)?; + let len = data.len(); let location = object_store_path_for_database_config(&self.root_path()?, &db_reservation.name); - let stream_data = std::io::Result::Ok(data); + let stream_data = std::io::Result::Ok(data.freeze()); self.store .put( &location, @@ -305,9 +313,9 @@ impl Server { res = get_store_bytes(&path, &store).await; } - let res = res.unwrap(); + let res = res.unwrap().freeze(); - match serde_json::from_slice::(&res) { + match DatabaseRules::decode(res) { Err(e) => { error!("error parsing database config {:?} from store: {}", path, e) } @@ -722,7 +730,7 @@ mod tests { let mut rules_path = server.store.new_path(); rules_path.push_all_dirs(&["1", name]); - rules_path.set_file_name("rules.json"); + rules_path.set_file_name("rules.pb"); let read_data = server .store @@ -732,10 +740,10 @@ mod tests { .map_ok(|b| bytes::BytesMut::from(&b[..])) .try_concat() .await - .unwrap(); + .unwrap() + .freeze(); - let read_data = std::str::from_utf8(&*read_data).unwrap(); - let read_rules = serde_json::from_str::(read_data).unwrap(); + let read_rules = DatabaseRules::decode(read_data).unwrap(); assert_eq!(rules, read_rules); diff --git a/server/src/tracker.rs b/server/src/tracker.rs index 007f98e511..dcbdef2737 100644 --- a/server/src/tracker.rs +++ b/server/src/tracker.rs @@ -551,15 +551,11 @@ mod tests { let assert_fuzzy = |actual: usize, expected: std::time::Duration| { // Number of milliseconds of toleration - let epsilon = Duration::from_millis(10).as_nanos() as usize; + let epsilon = Duration::from_millis(25).as_nanos() as usize; let expected = expected.as_nanos() as usize; - assert!( - actual > expected.saturating_sub(epsilon), - "Expected {} got {}", - expected, - actual - ); + // std::thread::sleep is guaranteed to take at least as long as requested + assert!(actual > expected, "Expected {} got {}", expected, actual); assert!( actual < expected.saturating_add(epsilon), "Expected {} got {}",