Merge branch 'main' into ntran/1048

pull/24376/head
kodiakhq[bot] committed via GitHub on 2021-03-25 17:21:31 +00:00
commit c15a2251f5
12 changed files with 176 additions and 138 deletions

Cargo.lock (generated)

@ -92,9 +92,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.38"
version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afddf7f520a80dbf76e6f50a35bca42a2331ef227a28b3b6dc5c2e2338d114b1"
checksum = "81cddc5f91628367664cc7c69714ff08deee8a3efc54623011c772544d7b2767"
[[package]]
name = "arrayref"
@ -111,7 +111,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "arrow"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=6208a79739d0228ecc566fa8436ee61068452212#6208a79739d0228ecc566fa8436ee61068452212"
source = "git+https://github.com/apache/arrow.git?rev=9262a5d83b6037d0c39668310b1875e32f1b7e83#9262a5d83b6037d0c39668310b1875e32f1b7e83"
dependencies = [
"cfg_aliases",
"chrono",
@ -134,7 +134,7 @@ dependencies = [
[[package]]
name = "arrow-flight"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=6208a79739d0228ecc566fa8436ee61068452212#6208a79739d0228ecc566fa8436ee61068452212"
source = "git+https://github.com/apache/arrow.git?rev=9262a5d83b6037d0c39668310b1875e32f1b7e83#9262a5d83b6037d0c39668310b1875e32f1b7e83"
dependencies = [
"arrow",
"bytes",
@ -518,9 +518,9 @@ dependencies = [
[[package]]
name = "cloud-storage"
version = "0.8.3"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a27c92803d7c48c97d828f468d0bb3069f21a2531ccc8361486a7e05ba9518ec"
checksum = "3a536949c1551eb281033e5cc44ea9b5a9b162adb6a57e50eb1413b94d660882"
dependencies = [
"base64 0.13.0",
"bytes",
@ -781,7 +781,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=6208a79739d0228ecc566fa8436ee61068452212#6208a79739d0228ecc566fa8436ee61068452212"
source = "git+https://github.com/apache/arrow.git?rev=9262a5d83b6037d0c39668310b1875e32f1b7e83#9262a5d83b6037d0c39668310b1875e32f1b7e83"
dependencies = [
"ahash 0.7.2",
"arrow",
@ -789,7 +789,7 @@ dependencies = [
"chrono",
"clap",
"futures",
"hashbrown",
"hashbrown 0.11.2",
"log",
"num_cpus",
"ordered-float 2.1.1",
@ -1206,9 +1206,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d832b01df74254fe364568d6ddc294443f61cbec82816b60904303af87efae78"
checksum = "fc018e188373e2777d0ef2467ebff62a08e66c3f5857b23c8fbec3018210dc00"
dependencies = [
"bytes",
"fnv",
@ -1238,6 +1238,15 @@ dependencies = [
"ahash 0.4.7",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash 0.7.2",
]
[[package]]
name = "heck"
version = "0.3.2"
@ -1285,12 +1294,13 @@ dependencies = [
[[package]]
name = "http-body"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994"
checksum = "5dfb77c123b4e2f72a2069aeae0b4b4949cc7e966df277813fc16347e7549737"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
@ -1389,7 +1399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3"
dependencies = [
"autocfg",
"hashbrown",
"hashbrown 0.9.1",
]
[[package]]
@ -1606,9 +1616,9 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.48"
version = "0.3.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc9f84f9b115ce7843d60706df1422a916680bfdfcbdb0447c5614ff9d7e4d78"
checksum = "dc15e39392125075f60c95ba416f5381ff6c3a948ff02ab12464715adf56c821"
dependencies = [
"wasm-bindgen",
]
@ -1654,9 +1664,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.89"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "538c092e5586f4cdd7dd8078c4a79220e3e168880218124dcbce860f0ea938c6"
checksum = "8916b1f6ca17130ec6568feccee27c156ad12037880833a3b842a823236502e7"
[[package]]
name = "libloading"
@ -1799,9 +1809,9 @@ dependencies = [
[[package]]
name = "mio"
version = "0.7.10"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2182a122f3b7f3f5329cb1972cee089ba2459a0a80a56935e6e674f096f8d839"
checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956"
dependencies = [
"libc",
"log",
@ -1812,11 +1822,10 @@ dependencies = [
[[package]]
name = "miow"
version = "0.3.6"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"socket2",
"winapi",
]
@ -2242,7 +2251,7 @@ dependencies = [
[[package]]
name = "parquet"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=6208a79739d0228ecc566fa8436ee61068452212#6208a79739d0228ecc566fa8436ee61068452212"
source = "git+https://github.com/apache/arrow.git?rev=9262a5d83b6037d0c39668310b1875e32f1b7e83#9262a5d83b6037d0c39668310b1875e32f1b7e83"
dependencies = [
"arrow",
"base64 0.12.3",
@ -2703,7 +2712,7 @@ dependencies = [
"criterion",
"croaring",
"either",
"hashbrown",
"hashbrown 0.9.1",
"internal_types",
"itertools 0.9.0",
"packers",
@ -3072,9 +3081,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.124"
version = "1.0.125"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd761ff957cb2a45fbb9ab3da6512de9de55872866160b23c25f1a841e99d29f"
checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171"
dependencies = [
"serde_derive",
]
@ -3103,9 +3112,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.124"
version = "1.0.125"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1800f7693e94e186f5e25a28291ae1570da908aff7d97a095dec1e56ff99069b"
checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d"
dependencies = [
"proc-macro2",
"quote",
@ -3172,7 +3181,7 @@ dependencies = [
"flatbuffers",
"futures",
"generated_types",
"hashbrown",
"hashbrown 0.9.1",
"influxdb_line_protocol",
"internal_types",
"mutable_buffer",
@ -3324,9 +3333,9 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "standback"
version = "0.2.15"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2beb4d1860a61f571530b3f855a1b538d0200f7871c63331ecd6f17b1f014f8"
checksum = "e113fb6f3de07a243d434a56ec6f186dfd51cb08448239fe7bcae73f87ff28ff"
dependencies = [
"version_check",
]
@ -3393,7 +3402,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "383196d1876517ee6f9f0864d1fc1070331b803335d3c6daaa04bbcccd823c08"
dependencies = [
"cfg-if 1.0.0",
"hashbrown",
"hashbrown 0.9.1",
"serde",
]
@ -3636,9 +3645,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.3.0"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d56477f6ed99e10225f38f9f75f872f29b8b8bd8c0b946f63345bb144e9eeda"
checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722"
dependencies = [
"autocfg",
"bytes",
@ -3688,9 +3697,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c535f53c0cfa1acace62995a8994fc9cc1f12d202420da96ff306ee24d576469"
checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
dependencies = [
"futures-core",
"pin-project-lite",
@ -3699,9 +3708,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.6.4"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec31e5cc6b46e653cf57762f36f71d5e6386391d88a72fd6db4508f8f676fb29"
checksum = "5143d049e85af7fbc36f5454d990e62c2df705b3589f123b71f441b6b59f443f"
dependencies = [
"bytes",
"futures-core",
@ -3713,9 +3722,9 @@ dependencies = [
[[package]]
name = "tonic"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ba8f479158947373b6df40cf48f4779bb25c99ca3c661bd95e0ab1963ad8b0e"
checksum = "91491e5f15431f2189ec8c1f9dcbadac949450399c22c912ceae9570eb472f61"
dependencies = [
"async-stream",
"async-trait",
@ -3742,9 +3751,9 @@ dependencies = [
[[package]]
name = "tonic-build"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1e8546fd40d56d28089835c0a81bb396848103b00f888aea42d46eb5974df07"
checksum = "e2e09854abff4c0716059219e155ab0539aecbfc26a40214897b062653adb6ba"
dependencies = [
"proc-macro2",
"prost-build",
@ -4030,9 +4039,9 @@ dependencies = [
[[package]]
name = "walkdir"
version = "2.3.1"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "777182bc735b6424e1a57516d35ed72cb8019d85c8c9bf536dccb3445c1a2f7d"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi",
@ -4063,9 +4072,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasm-bindgen"
version = "0.2.71"
version = "0.2.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ee1280240b7c461d6a0071313e08f34a60b0365f14260362e5a2b17d1d31aa7"
checksum = "8fe8f61dba8e5d645a4d8132dc7a0a66861ed5e1045d2c0ed940fab33bac0fbe"
dependencies = [
"cfg-if 1.0.0",
"serde",
@ -4075,9 +4084,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.71"
version = "0.2.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b7d8b6942b8bb3a9b0e73fc79b98095a27de6fa247615e59d096754a3bc2aa8"
checksum = "046ceba58ff062da072c7cb4ba5b22a37f00a302483f7e2a6cdc18fedbdc1fd3"
dependencies = [
"bumpalo",
"lazy_static",
@ -4090,9 +4099,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.21"
version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e67a5806118af01f0d9045915676b22aaebecf4178ae7021bc171dab0b897ab"
checksum = "73157efb9af26fb564bb59a009afd1c7c334a44db171d280690d0c3faaec3468"
dependencies = [
"cfg-if 1.0.0",
"js-sys",
@ -4102,9 +4111,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.71"
version = "0.2.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ac38da8ef716661f0f36c0d8320b89028efe10c7c0afde65baffb496ce0d3b"
checksum = "0ef9aa01d36cda046f797c57959ff5f3c615c9cc63997a8d545831ec7976819b"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@ -4112,9 +4121,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.71"
version = "0.2.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc053ec74d454df287b9374ee8abb36ffd5acb95ba87da3ba5b7d3fe20eb401e"
checksum = "96eb45c1b2ee33545a813a92dbb53856418bf7eb54ab34f7f7ff1448a5b3735d"
dependencies = [
"proc-macro2",
"quote",
@ -4125,15 +4134,15 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.71"
version = "0.2.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d6f8ec44822dd71f5f221a5847fb34acd9060535c1211b70a05844c0f6383b1"
checksum = "b7148f4696fb4960a346eaa60bbfb42a1ac4ebba21f750f75fc1375b098d5ffa"
[[package]]
name = "web-sys"
version = "0.3.48"
version = "0.3.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec600b26223b2948cedfde2a0aa6756dcf1fef616f43d7b3097aaf53a6c4d92b"
checksum = "59fe19d70f5dacc03f6e46777213facae5ac3801575d56ca6cbd4c93dcd12310"
dependencies = [
"js-sys",
"wasm-bindgen",


@ -8,14 +8,14 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx
[dependencies] # In alphabetical order
# We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev
# The version can be found here: https://github.com/apache/arrow/commit/6208a79739d0228ecc566fa8436ee61068452212
# The version can be found here: https://github.com/apache/arrow/commit/9262a5d83b6037d0c39668310b1875e32f1b7e83
#
arrow = { git = "https://github.com/apache/arrow.git", rev = "6208a79739d0228ecc566fa8436ee61068452212" , features = ["simd"] }
arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "6208a79739d0228ecc566fa8436ee61068452212" }
arrow = { git = "https://github.com/apache/arrow.git", rev = "9262a5d83b6037d0c39668310b1875e32f1b7e83" , features = ["simd"] }
arrow-flight = { git = "https://github.com/apache/arrow.git", rev = "9262a5d83b6037d0c39668310b1875e32f1b7e83" }
# Turn off optional datafusion features (function packages)
datafusion = { git = "https://github.com/apache/arrow.git", rev = "6208a79739d0228ecc566fa8436ee61068452212", default-features = false }
datafusion = { git = "https://github.com/apache/arrow.git", rev = "9262a5d83b6037d0c39668310b1875e32f1b7e83", default-features = false }
# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
# and we're not currently using it anyway
parquet = { git = "https://github.com/apache/arrow.git", rev = "6208a79739d0228ecc566fa8436ee61068452212", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
parquet = { git = "https://github.com/apache/arrow.git", rev = "9262a5d83b6037d0c39668310b1875e32f1b7e83", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }


@ -2,7 +2,6 @@ use std::convert::{TryFrom, TryInto};
use chrono::{DateTime, TimeZone, Utc};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use generated_types::google::protobuf::Empty;
@ -22,27 +21,33 @@ pub enum Error {
source_module: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
#[snafu(context(false))]
ProstDecodeError { source: prost::DecodeError },
#[snafu(context(false))]
ProstEncodeError { source: prost::EncodeError },
#[snafu(context(false))]
FieldViolation { source: FieldViolation },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// DatabaseRules contains the rules for replicating data, sending data to
/// subscribers, and querying data for a single database.
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
#[derive(Debug, Default, Eq, PartialEq, Clone)]
pub struct DatabaseRules {
/// The unencoded name of the database. This gets put in by the create
/// database call, so an empty default is fine.
#[serde(default)]
pub name: String, // TODO: Use DatabaseName here
/// Template that generates a partition key for each row inserted into the
/// db
#[serde(default)]
pub partition_template: PartitionTemplate,
/// When set this will buffer WAL writes in memory based on the
/// configuration.
#[serde(default)]
pub wal_buffer_config: Option<WalBufferConfig>,
/// Unless explicitly disabled by setting this to None (or null in JSON),
@ -50,7 +55,6 @@ pub struct DatabaseRules {
/// called the Mutable Buffer. It is optimized to receive writes so they
/// can be batched together later to the Read Buffer or to Parquet files
/// in object storage.
#[serde(default = "MutableBufferConfig::default_option")]
pub mutable_buffer_config: Option<MutableBufferConfig>,
/// An optional config to split writes into different "shards". A shard
@ -60,7 +64,6 @@ pub struct DatabaseRules {
/// based on table name and assign to 1 of 10 shards. Within each
/// shard you would have partitions, which would likely be based off time.
/// This makes it possible to horizontally scale out writes.
#[serde(default)]
pub shard_config: Option<ShardConfig>,
}
@ -81,6 +84,18 @@ impl DatabaseRules {
}
}
impl DatabaseRules {
pub fn decode(bytes: prost::bytes::Bytes) -> Result<Self> {
let message: management::DatabaseRules = prost::Message::decode(bytes)?;
Ok(message.try_into()?)
}
pub fn encode(self, bytes: &mut prost::bytes::BytesMut) -> Result<()> {
let encoded: management::DatabaseRules = self.into();
Ok(prost::Message::encode(&encoded, bytes)?)
}
}
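
Not part of the change itself, but a minimal sketch of the protobuf round trip these new methods enable, assuming the conversions to and from management::DatabaseRules are lossless for these fields (the server test later in this diff relies on the same property):

#[test]
fn database_rules_protobuf_round_trip() {
    use bytes::BytesMut;

    // Hypothetical rules value; DatabaseRules derives Default per this diff.
    let rules = DatabaseRules {
        name: "mydb".to_string(),
        ..Default::default()
    };

    // encode() consumes self, so clone to keep the original for comparison.
    let mut buf = BytesMut::new();
    rules.clone().encode(&mut buf).expect("encode to protobuf");

    // decode() takes ownership of the frozen bytes.
    let decoded = DatabaseRules::decode(buf.freeze()).expect("decode from protobuf");
    assert_eq!(rules, decoded);
}
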
/// Generates a partition key based on the line and the default time.
pub trait Partitioner {
fn partition_key(
@ -137,7 +152,7 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
/// MutableBufferConfig defines the configuration for the in-memory database
/// that is hot for writes as they arrive. Operators can define rules for
/// evicting data once the mutable buffer passes a set memory threshold.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct MutableBufferConfig {
/// The size the mutable buffer should be limited to. Once the buffer gets
/// to this size it will drop partitions in the given order. If unable
@ -244,7 +259,7 @@ impl TryFrom<management::MutableBufferConfig> for MutableBufferConfig {
/// sort: PartitionSort::CreatedAtTime,
/// };
/// ```
#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Default, Eq, PartialEq, Clone)]
pub struct PartitionSortRules {
/// Sort partitions by this order. Last will be dropped.
pub order: Order,
@ -278,7 +293,7 @@ impl TryFrom<management::mutable_buffer_config::PartitionDropOrder> for Partitio
}
/// What to sort the partition by.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum PartitionSort {
/// The last time the partition received a write.
LastWriteTime,
@ -351,7 +366,7 @@ impl TryFrom<management::mutable_buffer_config::partition_drop_order::Sort> for
}
/// The sort order.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum Order {
Asc,
Desc,
@ -385,7 +400,7 @@ impl TryFrom<management::Order> for Order {
}
/// Use columns of this type.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum ColumnType {
I64,
U64,
@ -422,7 +437,7 @@ impl TryFrom<management::ColumnType> for ColumnType {
}
/// Use either the min or max summary statistic.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum ColumnValue {
Min,
Max,
@ -454,7 +469,7 @@ impl TryFrom<management::Aggregate> for ColumnValue {
/// WalBufferConfig defines the configuration for buffering data from the WAL in
/// memory. This buffer is used for asynchronous replication and to collect
/// segments before sending them to object storage.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct WalBufferConfig {
/// The size the WAL buffer should be limited to. Once the buffer gets to
/// this size it will drop old segments to remain below this size, but
@ -528,7 +543,7 @@ impl TryFrom<management::WalBufferConfig> for WalBufferConfig {
/// WalBufferRollover defines the behavior of what should happen if a write
/// comes in that would cause the buffer to exceed its max size AND the oldest
/// segment can't be dropped because it has not yet been persisted.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
#[derive(Debug, Clone, Eq, PartialEq, Copy)]
pub enum WalBufferRollover {
/// Drop the old segment even though it hasn't been persisted. This part of
/// the WAL will be lost on this server.
@ -573,7 +588,7 @@ impl TryFrom<management::wal_buffer_config::Rollover> for WalBufferRollover {
///
/// The key is constructed in order of the template parts; thus ordering changes
/// what partition key is generated.
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
#[derive(Debug, Default, Eq, PartialEq, Clone)]
pub struct PartitionTemplate {
pub parts: Vec<TemplatePart>,
}
@ -627,7 +642,7 @@ impl TryFrom<management::PartitionTemplate> for PartitionTemplate {
/// `TemplatePart` specifies what part of a row should be used to compute this
/// part of a partition key.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum TemplatePart {
/// The name of a table
Table,
@ -647,7 +662,7 @@ pub enum TemplatePart {
/// `RegexCapture` is for pulling parts of a string column into the partition
/// key.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct RegexCapture {
column: String,
regex: String,
@ -662,7 +677,7 @@ pub struct RegexCapture {
/// For example, a time format of "%Y-%m-%d %H:%M:%S" will produce
/// partition key parts such as "2021-03-14 12:25:21" and
/// "2021-04-14 12:24:21"
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct StrftimeColumn {
column: String,
format: String,
@ -737,7 +752,7 @@ impl TryFrom<management::partition_template::Part> for TemplatePart {
/// based on table name and assign to 1 of 10 shards. Within each
/// shard you would have partitions, which would likely be based off time.
/// This makes it possible to horizontally scale out writes.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ShardConfig {
/// An optional matcher. If there is a match, the route will be evaluated to
/// the given targets, otherwise the hash ring will be evaluated. This is
@ -758,7 +773,7 @@ pub struct ShardConfig {
/// Maps a matcher with specific target group. If the line/row matches
/// it should be sent to the group.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
#[derive(Debug, Eq, PartialEq, Clone, Default)]
pub struct MatcherToTargets {
pub matcher: Matcher,
pub target: NodeGroup,
@ -769,7 +784,7 @@ pub type NodeGroup = Vec<WriterId>;
/// HashRing is a rule for creating a hash key for a row and mapping that to
/// an individual node on a ring.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct HashRing {
/// If true the table name will be included in the hash key
pub table_name: bool,
@ -781,10 +796,9 @@ pub struct HashRing {
/// A matcher is used to match routing rules or subscriptions on a row-by-row
/// (or line) basis.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[derive(Debug, Clone, Default)]
pub struct Matcher {
/// if provided, match if the table name matches against the regex
#[serde(with = "serde_regex")]
pub table_name_regex: Option<Regex>,
// paul: what should we use for predicate matching here against a single row/line?
pub predicate: Option<String>,
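
For illustration only (not in the diff): a hypothetical matcher built from the two public fields shown above, reusing the Regex type already imported in this module.

fn example_matcher() -> Matcher {
    // Match rows whose table name starts with "cpu". Predicate matching is still an
    // open question (see the comment above), so it is left unset here.
    Matcher {
        table_name_regex: Some(Regex::new("^cpu").expect("valid regex")),
        predicate: None,
    }
}
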


@ -454,7 +454,7 @@ impl InfluxRPCPlanner {
// Filter(predicate)
// InMemoryScan
let plan = plan_builder
.project(&select_exprs)
.project(select_exprs.clone())
.context(BuildingPlan)?
.filter(tag_name_is_not_null)
.context(BuildingPlan)?
@ -744,7 +744,7 @@ impl InfluxRPCPlanner {
.collect::<Vec<_>>();
let plan = plan_builder
.project(&select_exprs)
.project(select_exprs)
.context(BuildingPlan)?
.build()
.context(BuildingPlan)?;
@ -804,7 +804,7 @@ impl InfluxRPCPlanner {
.collect::<Vec<_>>();
let plan = plan_builder
.project(&select_exprs)
.project(select_exprs)
.context(BuildingPlan)?
.build()
.context(BuildingPlan)?;
@ -865,7 +865,7 @@ impl InfluxRPCPlanner {
// Order by
let plan_builder = plan_builder
.sort(&tags_and_timestamp)
.sort(tags_and_timestamp)
.context(BuildingPlan)?;
// Select away anything that isn't in the influx data model
@ -877,7 +877,7 @@ impl InfluxRPCPlanner {
.collect();
let plan_builder = plan_builder
.project(&tags_fields_and_timestamps)
.project(tags_fields_and_timestamps)
.context(BuildingPlan)?;
let plan = plan_builder.build().context(BuildingPlan)?;
@ -993,9 +993,9 @@ impl InfluxRPCPlanner {
.collect::<Vec<_>>();
let plan_builder = plan_builder
.aggregate(&group_exprs, &agg_exprs)
.aggregate(group_exprs, agg_exprs)
.context(BuildingPlan)?
.sort(&sort_exprs)
.sort(sort_exprs)
.context(BuildingPlan)?;
// and finally create the plan
@ -1079,9 +1079,9 @@ impl InfluxRPCPlanner {
.collect::<Vec<_>>();
let plan_builder = plan_builder
.aggregate(&group_exprs, &agg_exprs)
.aggregate(group_exprs, agg_exprs)
.context(BuildingPlan)?
.sort(&sort_exprs)
.sort(sort_exprs)
.context(BuildingPlan)?;
// and finally create the plan


@ -63,6 +63,16 @@ pub enum Error {
table_name: String,
source: crate::provider::Error,
},
#[snafu(display(
"Error registering table provider for table '{}': {}",
table_name,
source
))]
RegisteringTableProvider {
table_name: String,
source: DataFusionError,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -128,7 +138,8 @@ impl SQLQueryPlanner {
.context(CreatingTableProvider { table_name })?;
ctx.inner_mut()
.register_table(&table_name, Arc::new(provider));
.register_table(table_name.as_str(), Arc::new(provider))
.context(RegisteringTableProvider { table_name })?;
}
ctx.prepare_sql(query).await.context(Preparing)


@ -672,10 +672,10 @@ mod test {
)
.unwrap();
let mut ctx = ExecutionContext::new();
ctx.register_table("t", Arc::new(provider));
ctx.register_table("t", Arc::new(provider)).unwrap();
let df = ctx.table("t").unwrap();
let df = df.aggregate(&[], &aggs).unwrap();
let df = df.aggregate(vec![], aggs).unwrap();
// execute the query
let record_batches = df.collect().await.unwrap();


@ -343,7 +343,7 @@ mod test {
#[test]
fn size() {
let v = Bool::from(vec![None, None, Some(true), Some(false)].as_slice());
assert_eq!(v.size(), 240);
assert_eq!(v.size(), 464);
}
#[test]


@ -599,7 +599,7 @@ mod test {
#[test]
fn size() {
let v = FixedNull::<UInt64Type>::from(vec![None, None, Some(100), Some(2222)].as_slice());
assert_eq!(v.size(), 240);
assert_eq!(v.size(), 344);
}
#[test]


@ -1092,27 +1092,27 @@ impl From<RecordBatch> for RowGroup {
Some(InfluxColumnType::Tag) => {
assert_eq!(arrow_column.data_type(), &arrow::datatypes::DataType::Utf8);
let column_data =
Column::from(arrow::array::StringArray::from(arrow_column.data()));
Column::from(arrow::array::StringArray::from(arrow_column.data().clone()));
columns.insert(col_name.to_owned(), ColumnType::Tag(column_data));
}
Some(InfluxColumnType::Field(_)) => {
let column_data = match arrow_column.data_type() {
arrow::datatypes::DataType::Int64 => {
Column::from(arrow::array::Int64Array::from(arrow_column.data()))
}
arrow::datatypes::DataType::Float64 => {
Column::from(arrow::array::Float64Array::from(arrow_column.data()))
}
arrow::datatypes::DataType::UInt64 => {
Column::from(arrow::array::UInt64Array::from(arrow_column.data()))
}
arrow::datatypes::DataType::Boolean => {
Column::from(arrow::array::BooleanArray::from(arrow_column.data()))
}
arrow::datatypes::DataType::Utf8 => {
Column::from(arrow::array::StringArray::from(arrow_column.data()))
}
arrow::datatypes::DataType::Int64 => Column::from(
arrow::array::Int64Array::from(arrow_column.data().clone()),
),
arrow::datatypes::DataType::Float64 => Column::from(
arrow::array::Float64Array::from(arrow_column.data().clone()),
),
arrow::datatypes::DataType::UInt64 => Column::from(
arrow::array::UInt64Array::from(arrow_column.data().clone()),
),
arrow::datatypes::DataType::Boolean => Column::from(
arrow::array::BooleanArray::from(arrow_column.data().clone()),
),
arrow::datatypes::DataType::Utf8 => Column::from(
arrow::array::StringArray::from(arrow_column.data().clone()),
),
dt => unimplemented!(
"data type {:?} currently not supported for field columns",
dt
@ -1125,7 +1125,7 @@ impl From<RecordBatch> for RowGroup {
assert_eq!(col_name, TIME_COLUMN_NAME);
let column_data =
Column::from(arrow::array::Int64Array::from(arrow_column.data()));
Column::from(arrow::array::Int64Array::from(arrow_column.data().clone()));
columns.insert(col_name.to_owned(), ColumnType::Time(column_data));
}


@ -17,7 +17,7 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn, Instrument};
pub(crate) const DB_RULES_FILE_NAME: &str = "rules.json";
pub(crate) const DB_RULES_FILE_NAME: &str = "rules.pb";
/// The Config tracks the configuration of databases and their rules along
/// with host groups for replication. It is used as an in-memory structure
@ -318,7 +318,7 @@ mod test {
let mut expected_path = base_path;
expected_path.push_dir("foo");
expected_path.set_file_name("rules.json");
expected_path.set_file_name("rules.pb");
assert_eq!(rules_path, expected_path);
}


@ -73,7 +73,7 @@ use std::sync::{
};
use async_trait::async_trait;
use bytes::Bytes;
use bytes::BytesMut;
use futures::stream::TryStreamExt;
use parking_lot::Mutex;
use snafu::{OptionExt, ResultExt, Snafu};
@ -139,7 +139,9 @@ pub enum Error {
#[snafu(display("unable to use server until id is set"))]
IdNotSet,
#[snafu(display("error serializing configuration {}", source))]
ErrorSerializing { source: serde_json::Error },
ErrorSerializing {
source: data_types::database_rules::Error,
},
#[snafu(display("error deserializing configuration {}", source))]
ErrorDeserializing { source: serde_json::Error },
#[snafu(display("store error: {}", source))]
@ -239,13 +241,19 @@ impl<M: ConnectionManager> Server<M> {
let db_reservation = self.config.create_db(db_name, rules)?;
let data =
Bytes::from(serde_json::to_vec(&db_reservation.db.rules).context(ErrorSerializing)?);
let mut data = BytesMut::new();
db_reservation
.db
.rules
.clone()
.encode(&mut data)
.context(ErrorSerializing)?;
let len = data.len();
let location =
object_store_path_for_database_config(&self.root_path()?, &db_reservation.name);
let stream_data = std::io::Result::Ok(data);
let stream_data = std::io::Result::Ok(data.freeze());
self.store
.put(
&location,
@ -305,9 +313,9 @@ impl<M: ConnectionManager> Server<M> {
res = get_store_bytes(&path, &store).await;
}
let res = res.unwrap();
let res = res.unwrap().freeze();
match serde_json::from_slice::<DatabaseRules>(&res) {
match DatabaseRules::decode(res) {
Err(e) => {
error!("error parsing database config {:?} from store: {}", path, e)
}
@ -722,7 +730,7 @@ mod tests {
let mut rules_path = server.store.new_path();
rules_path.push_all_dirs(&["1", name]);
rules_path.set_file_name("rules.json");
rules_path.set_file_name("rules.pb");
let read_data = server
.store
@ -732,10 +740,10 @@ mod tests {
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.unwrap();
.unwrap()
.freeze();
let read_data = std::str::from_utf8(&*read_data).unwrap();
let read_rules = serde_json::from_str::<DatabaseRules>(read_data).unwrap();
let read_rules = DatabaseRules::decode(read_data).unwrap();
assert_eq!(rules, read_rules);


@ -551,15 +551,11 @@ mod tests {
let assert_fuzzy = |actual: usize, expected: std::time::Duration| {
// Number of milliseconds of toleration
let epsilon = Duration::from_millis(10).as_nanos() as usize;
let epsilon = Duration::from_millis(25).as_nanos() as usize;
let expected = expected.as_nanos() as usize;
assert!(
actual > expected.saturating_sub(epsilon),
"Expected {} got {}",
expected,
actual
);
// std::thread::sleep is guaranteed to take at least as long as requested
assert!(actual > expected, "Expected {} got {}", expected, actual);
assert!(
actual < expected.saturating_add(epsilon),
"Expected {} got {}",