chore: merge main to branch

pull/24376/head
Nga Tran 2021-10-11 10:47:10 -04:00
commit fbf5539336
88 changed files with 232 additions and 301 deletions
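Most of the churn below is mechanical fallout from three changes picked up from main: schema types move out of the internal_types crate into a new top-level schema crate, the Sequence struct moves from entry into data_types::sequence, and the time_closed field is dropped from ChunkSummary (along with its protobuf field and system-table column). A rough sketch of the import rewrite the downstream crates make, with old paths as comments (paths taken from the diffs below; illustrative only, it will not compile outside the IOx workspace):

// before: use internal_types::schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME};
// before: use internal_types::selection::Selection;
// before: use entry::Sequence;
use schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME};
use schema::selection::Selection;
use data_types::sequence::Sequence;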

Cargo.lock (generated)

@ -31,9 +31,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.7.4"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98"
checksum = "991984e3fd003e7ba02eb724f87a0f997b78677c46c0e91f8424ad7394c9886a"
dependencies = [
"getrandom 0.2.3",
"once_cell",
@ -182,9 +182,9 @@ dependencies = [
[[package]]
name = "assert_cmd"
version = "2.0.1"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b800c4403e8105d959595e1f88119e78bc12bc874c4336973658b648a746ba93"
checksum = "e996dc7940838b7ef1096b882e29ec30a3149a3a443cdc8dba19ed382eca1fe2"
dependencies = [
"bstr",
"doc-comment",
@ -1014,8 +1014,8 @@ dependencies = [
"flatbuffers",
"generated_types",
"influxdb_line_protocol",
"internal_types",
"ouroboros",
"schema",
"snafu",
]
@ -1795,13 +1795,8 @@ checksum = "90c11140ffea82edce8dcd74137ce9324ec24b3cf0175fc9d7e29164da9915b8"
name = "internal_types"
version = "0.1.0"
dependencies = [
"arrow",
"arrow_util",
"chrono",
"futures",
"hashbrown 0.11.2",
"indexmap",
"itertools",
"parking_lot",
"snafu",
"tokio",
@ -2209,10 +2204,10 @@ dependencies = [
"data_types",
"entry",
"hashbrown 0.11.2",
"internal_types",
"metric",
"observability_deps",
"parking_lot",
"schema",
"snafu",
"test_helpers",
"tokio",
@ -2589,9 +2584,9 @@ dependencies = [
[[package]]
name = "ouroboros"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c711f35b4e881534535e7d943aa158ed673baf8e73b06bdd93b31703cf968cc3"
checksum = "f357ef82d1b4db66fbed0b8d542cbd3c22d0bf5b393b3c257b9ba4568e70c9c3"
dependencies = [
"aliasable",
"ouroboros_macro",
@ -2600,9 +2595,9 @@ dependencies = [
[[package]]
name = "ouroboros_macro"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adbdedd935f91827ea19bb8b97c20dd5870221eac3e9d9a2e70367ecc813479d"
checksum = "44a0b52c2cbaef7dffa5fec1a43274afe8bd2a644fa9fc50a9ef4ff0269b1257"
dependencies = [
"Inflector",
"proc-macro-error",
@ -2618,10 +2613,10 @@ dependencies = [
"arrow",
"criterion",
"influxdb_tsm",
"internal_types",
"observability_deps",
"parquet",
"rand",
"schema",
"snafu",
"test_helpers",
]
@ -2702,7 +2697,6 @@ dependencies = [
"datafusion_util",
"futures",
"generated_types",
"internal_types",
"iox_object_store",
"metric",
"object_store",
@ -2714,6 +2708,7 @@ dependencies = [
"persistence_windows",
"predicate",
"prost",
"schema",
"snafu",
"tempfile",
"test_helpers",
@ -2802,7 +2797,6 @@ version = "0.1.0"
dependencies = [
"chrono",
"data_types",
"entry",
"internal_types",
"observability_deps",
"snafu",
@ -2965,11 +2959,10 @@ dependencies = [
"datafusion 0.1.0",
"datafusion_util",
"generated_types",
"hashbrown 0.11.2",
"internal_types",
"observability_deps",
"ordered-float 2.8.0",
"regex",
"schema",
"serde_json",
"snafu",
"sqlparser",
@ -3155,13 +3148,13 @@ dependencies = [
"datafusion_util",
"futures",
"hashbrown 0.11.2",
"internal_types",
"itertools",
"libc",
"observability_deps",
"parking_lot",
"predicate",
"regex",
"schema",
"snafu",
"test_helpers",
"tokio",
@ -3179,12 +3172,12 @@ dependencies = [
"chrono",
"data_types",
"datafusion 0.1.0",
"internal_types",
"metric",
"object_store",
"once_cell",
"predicate",
"query",
"schema",
"server",
"snafu",
"tempfile",
@ -3358,7 +3351,6 @@ dependencies = [
"datafusion 0.1.0",
"either",
"hashbrown 0.11.2",
"internal_types",
"itertools",
"metric",
"observability_deps",
@ -3367,6 +3359,7 @@ dependencies = [
"permutation",
"rand",
"rand_distr",
"schema",
"snafu",
"test_helpers",
]
@ -3444,9 +3437,9 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.11.4"
version = "0.11.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22"
checksum = "51c732d463dd300362ffb44b7b125f299c23d2990411a4253824630ebc7467fb"
dependencies = [
"base64 0.13.0",
"bytes",
@ -3687,6 +3680,18 @@ dependencies = [
"winapi",
]
[[package]]
name = "schema"
version = "0.1.0"
dependencies = [
"arrow",
"arrow_util",
"hashbrown 0.11.2",
"indexmap",
"itertools",
"snafu",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -3847,6 +3852,7 @@ dependencies = [
"rand",
"rand_distr",
"read_buffer",
"schema",
"serde",
"serde_json",
"snafu",
@ -4180,18 +4186,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.29"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "602eca064b2d83369e2b2f34b09c70b605402801927c65c11071ac911d299b88"
checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.29"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c"
checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b"
dependencies = [
"proc-macro2",
"quote",
@ -4532,9 +4538,9 @@ dependencies = [
[[package]]
name = "tracing"
version = "0.1.28"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8"
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
dependencies = [
"cfg-if",
"log",
@ -4545,9 +4551,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.16"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98863d0dd09fa59a1b79c6750ad80dbda6b75f4e71c437a6a1a8cb91a8bcbd77"
checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e"
dependencies = [
"proc-macro2",
"quote",
@ -4556,9 +4562,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.20"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46125608c26121c81b0c6d693eab5a420e416da7e43c426d2e8f7df8da8a3acf"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
dependencies = [
"lazy_static",
]
@ -4596,9 +4602,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.2.24"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdd0568dbfe3baf7048b7908d2b32bca0d81cd56bec6d2a8f894b01d74f86be3"
checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71"
dependencies = [
"ansi_term 0.12.1",
"chrono",
@ -4805,8 +4811,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
dependencies = [
"cfg-if",
"serde",
"serde_json",
"wasm-bindgen-macro",
]


@ -69,6 +69,7 @@ members = [
"trace_http",
"tracker",
"trogging",
"schema",
"grpc-router",
"grpc-router/grpc-router-test-gen",
"write_buffer",
@ -143,7 +144,7 @@ serde_json = "1.0.67"
serde_urlencoded = "0.7.0"
snafu = "0.6.9"
structopt = "0.3.23"
thiserror = "1.0.28"
thiserror = "1.0.30"
tikv-jemalloc-ctl = { version = "0.4.0" }
tokio = { version = "1.11", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] }
tokio-stream = { version = "0.1.2", features = ["net"] }
@ -172,7 +173,7 @@ parking_lot = "0.11.2"
write_buffer = { path = "write_buffer" }
# Crates.io dependencies, in alphabetical order
assert_cmd = "2.0.0"
assert_cmd = "2.0.2"
flate2 = "1.0"
hex = "0.4.2"
predicates = "2.0.2"


@ -8,7 +8,7 @@ description = "Apache Arrow utilities"
[dependencies]
arrow = { version = "5.5", features = ["prettyprint"] }
ahash = "0.7.2"
ahash = "0.7.5"
num-traits = "0.2"
snafu = "0.6"
hashbrown = "0.11"


@ -8,7 +8,7 @@ edition = "2018"
[dependencies]
http = "0.2.3"
prost = "0.8"
thiserror = "1.0.28"
thiserror = "1.0.30"
tonic = { version = "0.5.0" }
tower = "0.4"


@ -160,10 +160,6 @@ pub struct ChunkSummary {
/// into IOx. Note due to the compaction, etc... this may not be the chunk
/// that data was originally written into
pub time_of_last_write: DateTime<Utc>,
/// Time at which this chunk was marked as closed. Note this is
/// not the same as the timestamps on the data itself
pub time_closed: Option<DateTime<Utc>>,
}
/// Represents metadata about the physical storage of a column in a chunk


@ -19,6 +19,7 @@ pub mod error;
pub mod job;
pub mod names;
pub mod partition_metadata;
pub mod sequence;
pub mod server_id;
pub mod timestamp;
pub mod write_summary;


@ -0,0 +1,14 @@
#[derive(Debug, Copy, Clone)]
pub struct Sequence {
pub id: u32,
pub number: u64,
}
impl Sequence {
pub fn new(sequencer_id: u32, sequence_number: u64) -> Self {
Self {
id: sequencer_id,
number: sequence_number,
}
}
}
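A minimal, self-contained sketch of using the Sequence type added above (the struct and constructor are repeated from the new file so the snippet compiles on its own; the main wrapper and the concrete values are illustrative only):

#[derive(Debug, Copy, Clone)]
pub struct Sequence {
    pub id: u32,
    pub number: u64,
}

impl Sequence {
    pub fn new(sequencer_id: u32, sequence_number: u64) -> Self {
        Self {
            id: sequencer_id,
            number: sequence_number,
        }
    }
}

fn main() {
    // A position in a write buffer: sequencer 1, sequence number 42.
    let seq = Sequence::new(1, 42);
    assert_eq!(seq.id, 1);
    assert_eq!(seq.number, 42);
}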


@ -13,6 +13,6 @@ data_types = { path = "../data_types" }
flatbuffers = "2"
snafu = "0.6"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ouroboros = "0.12.0"
internal_types = { path = "../internal_types" }
ouroboros = "0.13.0"
schema = { path = "../schema" }
generated_types = { path = "../generated_types" }


@ -9,15 +9,16 @@ use ouroboros::self_referencing;
use snafu::{OptionExt, ResultExt, Snafu};
use data_types::database_rules::{Error as DataError, Partitioner, ShardId, Sharder};
use data_types::sequence::Sequence;
use data_types::write_summary::TimestampSummary;
use generated_types::influxdata::pbdata::v1 as pb;
use influxdb_line_protocol::{FieldValue, ParsedLine};
use internal_types::schema::{
use schema::{
builder::{Error as SchemaBuilderError, SchemaBuilder},
IOxValueType, InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME,
};
use crate::entry_fb;
use data_types::write_summary::TimestampSummary;
#[derive(Debug, Snafu)]
pub enum Error {
@ -1750,21 +1751,6 @@ pub struct SequencedEntry {
sequence_and_producer_ts: Option<(Sequence, DateTime<Utc>)>,
}
#[derive(Debug, Copy, Clone)]
pub struct Sequence {
pub id: u32,
pub number: u64,
}
impl Sequence {
pub fn new(sequencer_id: u32, sequence_number: u64) -> Self {
Self {
id: sequencer_id,
number: sequence_number,
}
}
}
impl SequencedEntry {
pub fn new_from_sequence(
sequence: Sequence,


@ -13,7 +13,7 @@ pbjson-types = "0.1"
prost = "0.8"
regex = "1.4"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0.28"
thiserror = "1.0.30"
tonic = "0.5"
[dev-dependencies]


@ -90,9 +90,8 @@ message Chunk {
// that data was originally written into
google.protobuf.Timestamp time_of_last_write = 6;
// Time at which this chunk was marked as closed. Note this is not
// the same as the timestamps on the data itself
google.protobuf.Timestamp time_closed = 7;
// Was `time_closed`.
reserved 7;
// Order of this chunk relative to other overlapping chunks.
uint32 order = 13;


@ -25,7 +25,6 @@ impl From<ChunkSummary> for management::Chunk {
time_of_last_access,
time_of_first_write,
time_of_last_write,
time_closed,
order,
} = summary;
@ -41,7 +40,6 @@ impl From<ChunkSummary> for management::Chunk {
time_of_last_access: time_of_last_access.map(Into::into),
time_of_first_write: Some(time_of_first_write.into()),
time_of_last_write: Some(time_of_last_write.into()),
time_closed: time_closed.map(Into::into),
order: order.get(),
}
}
@ -106,7 +104,6 @@ impl TryFrom<management::Chunk> for ChunkSummary {
time_of_last_access,
time_of_first_write,
time_of_last_write,
time_closed,
order,
} = proto;
@ -123,7 +120,6 @@ impl TryFrom<management::Chunk> for ChunkSummary {
time_of_last_access: timestamp(time_of_last_access, "time_of_last_access")?,
time_of_first_write: required_timestamp(time_of_first_write, "time_of_first_write")?,
time_of_last_write: required_timestamp(time_of_last_write, "time_of_last_write")?,
time_closed: timestamp(time_closed, "time_closed")?,
order: ChunkOrder::new(order).ok_or_else(|| FieldViolation {
field: "order".to_string(),
description: "Order must be non-zero".to_string(),
@ -188,7 +184,6 @@ mod test {
lifecycle_action: management::ChunkLifecycleAction::Compacting.into(),
time_of_first_write: Some(now.into()),
time_of_last_write: Some(now.into()),
time_closed: None,
time_of_last_access: Some(pbjson_types::Timestamp {
seconds: 50,
nanos: 7,
@ -208,7 +203,6 @@ mod test {
lifecycle_action: Some(ChunkLifecycleAction::Compacting),
time_of_first_write: now,
time_of_last_write: now,
time_closed: None,
time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)),
order: ChunkOrder::new(5).unwrap(),
};
@ -234,7 +228,6 @@ mod test {
lifecycle_action: Some(ChunkLifecycleAction::Persisting),
time_of_first_write: now,
time_of_last_write: now,
time_closed: None,
time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)),
order: ChunkOrder::new(5).unwrap(),
};
@ -252,7 +245,6 @@ mod test {
lifecycle_action: management::ChunkLifecycleAction::Persisting.into(),
time_of_first_write: Some(now.into()),
time_of_last_write: Some(now.into()),
time_closed: None,
time_of_last_access: Some(pbjson_types::Timestamp {
seconds: 12,
nanos: 100_007,


@ -12,7 +12,7 @@ observability_deps = { path = "../observability_deps" }
paste = "1.0.5"
prost = "0.8"
prost-types = "0.8"
thiserror = "1.0.28"
thiserror = "1.0.30"
tokio = { version = "1.11", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] }
tokio-stream = { version = "0.1.2", features = ["net"] }
tokio-util = { version = "0.6.3" }


@ -23,7 +23,7 @@ prost = "0.8"
rand = "0.8.3"
serde = "1.0.128"
serde_json = { version = "1.0.67", optional = true }
thiserror = "1.0.28"
thiserror = "1.0.30"
tonic = { version = "0.5.0" }
[dev-dependencies] # In alphabetical order


@ -7,16 +7,11 @@ description = "InfluxDB IOx internal types, shared between IOx instances"
readme = "README.md"
[dependencies]
arrow = { version = "5.5", features = ["prettyprint"] }
chrono = "0.4"
hashbrown = "0.11"
indexmap = "1.6"
itertools = "0.10.1"
parking_lot = "0.11"
snafu = "0.6"
tokio = { version = "1.11", features = ["sync"] }
[dev-dependencies]
arrow_util = { path = "../arrow_util" }
futures = "0.3"
tokio = { version = "1.11", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }


@ -9,5 +9,3 @@
pub mod access;
pub mod freezable;
pub mod once;
pub mod schema;
pub mod selection;


@ -28,7 +28,7 @@ tokio = { version = "1.11", features = ["macros", "rt-multi-thread"] }
toml = "0.5.6"
tracing = "0.1"
tracing-futures = "0.2.4"
tracing-subscriber = "0.2.24"
tracing-subscriber = "0.2.25"
uuid = { version = "0.8.1", default_features = false }
[dev-dependencies]


@ -21,7 +21,7 @@ chrono = "0.4"
data_types = { path = "../data_types" }
entry = { path = "../entry" }
hashbrown = "0.11"
internal_types = { path = "../internal_types" }
schema = { path = "../schema" }
metric = { path = "../metric" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11.2"


@ -6,11 +6,9 @@ use arrow::record_batch::RecordBatch;
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary};
use entry::TableBatch;
use hashbrown::HashMap;
use internal_types::{
schema::{builder::SchemaBuilder, InfluxColumnType, Schema},
selection::Selection,
};
use parking_lot::Mutex;
use schema::selection::Selection;
use schema::{builder::SchemaBuilder, InfluxColumnType, Schema};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{collections::BTreeSet, sync::Arc};
@ -35,9 +33,7 @@ pub enum Error {
ArrowError { source: arrow::error::ArrowError },
#[snafu(display("Internal error converting schema: {}", source))]
InternalSchema {
source: internal_types::schema::builder::Error,
},
InternalSchema { source: schema::builder::Error },
#[snafu(display("Column not found: {}", column))]
ColumnNotFound { column: String },
@ -435,7 +431,7 @@ mod tests {
ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary,
};
use entry::test_helpers::lp_to_entry;
use internal_types::schema::{InfluxColumnType, InfluxFieldType};
use schema::{InfluxColumnType, InfluxFieldType};
use std::{convert::TryFrom, num::NonZeroU64, vec};
#[test]


@ -5,10 +5,9 @@ use data_types::{
partition_metadata::{Statistics, TableSummary},
timestamp::TimestampRange,
};
use internal_types::{
schema::{Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use schema::selection::Selection;
use schema::{Schema, TIME_COLUMN_NAME};
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, sync::Arc};
@ -18,9 +17,7 @@ pub enum Error {
TableNotFound { table_name: String },
#[snafu(display("Failed to select columns: {}", source))]
SelectColumns {
source: internal_types::schema::Error,
},
SelectColumns { source: schema::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;


@ -16,7 +16,7 @@ use arrow_util::bitset::{iter_set_positions, BitSet};
use arrow_util::string::PackedStringArray;
use data_types::partition_metadata::{IsNan, StatValues, Statistics};
use entry::Column as EntryColumn;
use internal_types::schema::{IOxValueType, InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE};
use schema::{IOxValueType, InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE};
use crate::dictionary::{Dictionary, DID, INVALID_DID};


@ -7,7 +7,7 @@ edition = "2018"
[dependencies] # In alphabetical order
arrow = { version = "5.5", features = ["prettyprint"] }
influxdb_tsm = { path = "../influxdb_tsm" }
internal_types = { path = "../internal_types" }
schema = { path = "../schema" }
snafu = "0.6.2"
observability_deps = { path = "../observability_deps" }
parquet = "5.5"


@ -11,10 +11,10 @@ pub mod packers;
pub mod sorter;
pub mod stats;
use schema::Schema;
use snafu::Snafu;
pub use crate::packers::{Packer, Packers};
use internal_types::schema::Schema;
pub use parquet::data_type::ByteArray;
use std::borrow::Cow;


@ -9,8 +9,8 @@ use core::iter::Iterator;
use std::iter;
use std::slice::Chunks;
use internal_types::schema::{InfluxColumnType, InfluxFieldType};
use parquet::data_type::ByteArray;
use schema::{InfluxColumnType, InfluxFieldType};
use std::default::Default;
// NOTE: See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html


@ -14,7 +14,6 @@ datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3.7"
generated_types = { path = "../generated_types" }
internal_types = { path = "../internal_types" }
iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
@ -27,6 +26,7 @@ persistence_windows = { path = "../persistence_windows" }
predicate = { path = "../predicate" }
prost = "0.8"
snafu = "0.6"
schema = { path = "../schema" }
tempfile = "3.1.0"
thrift = "0.13"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }


@ -37,7 +37,7 @@ pub struct DumpOptions {
/// error otherwise.
pub show_iox_metadata: bool,
/// Show debug output of [`Schema`](internal_types::schema::Schema) if decoding succeeds, show the decoding
/// Show debug output of [`Schema`](schema::Schema) if decoding succeeds, show the decoding
/// error otherwise.
pub show_schema: bool,


@ -4,12 +4,10 @@ use data_types::{
timestamp::TimestampRange,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::{
schema::{Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use predicate::predicate::Predicate;
use schema::selection::Selection;
use schema::{Schema, TIME_COLUMN_NAME};
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, mem, sync::Arc};
@ -22,9 +20,7 @@ pub enum Error {
ReadParquet { source: crate::storage::Error },
#[snafu(display("Failed to select columns: {}", source))]
SelectColumns {
source: internal_types::schema::Error,
},
SelectColumns { source: schema::Error },
#[snafu(
display("Cannot decode parquet metadata from {:?}: {}", path, source),


@ -92,7 +92,6 @@ use data_types::{
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
};
use generated_types::influxdata::iox::catalog::v1 as proto;
use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
use parquet::{
arrow::parquet_to_arrow_schema,
file::{
@ -111,6 +110,7 @@ use persistence_windows::{
min_max_sequence::OptionalMinMaxSequence,
};
use prost::Message;
use schema::{InfluxColumnType, InfluxFieldType, Schema};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{collections::BTreeMap, convert::TryInto, sync::Arc};
use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
@ -207,9 +207,7 @@ pub enum Error {
},
#[snafu(display("Cannot read IOx schema from arrow: {}", source))]
IoxFromArrowFailure {
source: internal_types::schema::Error,
},
IoxFromArrowFailure { source: schema::Error },
#[snafu(display("Parquet metadata does not contain IOx metadata"))]
IoxMetadataMissing {},
@ -862,7 +860,7 @@ fn extract_iox_statistics(
mod tests {
use super::*;
use internal_types::schema::TIME_COLUMN_NAME;
use schema::TIME_COLUMN_NAME;
use crate::test_utils::{
chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store, make_chunk,


@ -13,7 +13,6 @@ use datafusion::{
};
use datafusion_util::AdapterStream;
use futures::StreamExt;
use internal_types::selection::Selection;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
@ -24,6 +23,7 @@ use parquet::{
file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone},
};
use predicate::predicate::Predicate;
use schema::selection::Selection;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{
io::{Cursor, Seek, SeekFrom, Write},


@ -21,10 +21,6 @@ use data_types::{
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
use futures::TryStreamExt;
use internal_types::{
schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::ObjectStore;
use parquet::{
@ -35,6 +31,8 @@ use persistence_windows::{
checkpoint::{DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder},
min_max_sequence::OptionalMinMaxSequence,
};
use schema::selection::Selection;
use schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME};
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeMap, num::NonZeroU32, sync::Arc};


@ -6,7 +6,6 @@ edition = "2018"
[dependencies]
chrono = "0.4"
data_types = { path = "../data_types" }
entry = { path = "../entry" }
internal_types = { path = "../internal_types" }
observability_deps = { path = "../observability_deps" }
snafu = "0.6.2"


@ -319,9 +319,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// `max_persisted` timestamp ("flush timestamp").
///
/// The `min_persisted` timestamp is relative to the value in
/// [`TIME_COLUMN_NAME`](internal_types::schema::TIME_COLUMN_NAME). The
/// min/max sequence numbers are relative to their respective
/// sequencers.
/// the time column. The min/max sequence numbers are relative
/// to their respective sequencers.
///
/// Since the sequence number is per-Entry, that it can be evaluated
/// quickly during replay, while the timestamp must be checked for each


@ -9,8 +9,9 @@ use std::{
use chrono::{DateTime, Duration, Utc};
use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary};
use entry::Sequence;
use data_types::{
partition_metadata::PartitionAddr, sequence::Sequence, write_summary::WriteSummary,
};
use internal_types::freezable::{Freezable, FreezeHandle};
use crate::min_max_sequence::MinMaxSequence;


@ -10,8 +10,7 @@ data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
generated_types = { path = "../generated_types" }
hashbrown = "0.11"
internal_types = { path = "../internal_types" }
schema = { path = "../schema" }
observability_deps = { path = "../observability_deps" }
ordered-float = "2"
regex = "1"


@ -3,7 +3,7 @@ use std::{collections::BTreeSet, convert::TryInto};
use chrono::DateTime;
use data_types::timestamp::TimestampRange;
use datafusion::logical_plan::{lit, Column, Expr, Operator};
use internal_types::schema::TIME_COLUMN_NAME;
use schema::TIME_COLUMN_NAME;
use snafu::{ResultExt, Snafu};
use sqlparser::{
ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value},


@ -15,8 +15,8 @@ use datafusion::{
optimizer::utils,
};
use datafusion_util::{make_range_expr, AndExprBuilder};
use internal_types::schema::TIME_COLUMN_NAME;
use observability_deps::tracing::debug;
use schema::TIME_COLUMN_NAME;
/// This `Predicate` represents the empty predicate (aka that
/// evaluates to true for all rows).


@ -24,10 +24,10 @@ datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3"
hashbrown = "0.11"
internal_types = { path = "../internal_types" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11.2"
regex = "1"
schema = { path = "../schema" }
snafu = "0.6.9"
tokio = { version = "1.11", features = ["macros"] }
tokio-stream = "0.1.2"


@ -1,7 +1,7 @@
use std::sync::Arc;
use arrow::{self, datatypes::SchemaRef};
use internal_types::schema::TIME_COLUMN_NAME;
use schema::TIME_COLUMN_NAME;
use snafu::{ResultExt, Snafu};
#[derive(Debug, Snafu)]


@ -9,7 +9,7 @@ use arrow::{
datatypes::{DataType, SchemaRef},
record_batch::RecordBatch,
};
use internal_types::schema::TIME_COLUMN_NAME;
use schema::TIME_COLUMN_NAME;
use snafu::{ensure, ResultExt, Snafu};
@ -191,7 +191,7 @@ mod tests {
array::{Int64Array, StringArray},
datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema},
};
use internal_types::schema::TIME_DATA_TYPE;
use schema::TIME_DATA_TYPE;
#[test]
fn test_convert_single_batch() {


@ -12,7 +12,7 @@ mod test {
ExecutionPlan, ExecutionPlanVisitor,
};
use futures::StreamExt;
use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema};
use schema::{merge::SchemaMerger, sort::SortKey, Schema};
use crate::{
exec::{split::StreamSplitExec, Executor, ExecutorType},


@ -15,12 +15,10 @@ use datafusion::{
use datafusion_util::AsExpr;
use hashbrown::{HashMap, HashSet};
use internal_types::{
schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use observability_deps::tracing::{debug, trace};
use predicate::predicate::{Predicate, PredicateMatch};
use schema::selection::Selection;
use schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use crate::{
@ -1619,7 +1617,7 @@ pub fn schema_has_all_expr_columns(schema: &Schema, expr: &Expr) -> bool {
#[cfg(test)]
mod tests {
use internal_types::schema::builder::SchemaBuilder;
use schema::builder::SchemaBuilder;
use super::*;


@ -6,8 +6,8 @@ use datafusion::{
logical_plan::{col, Expr, LogicalPlan, LogicalPlanBuilder},
scalar::ScalarValue,
};
use internal_types::schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
use observability_deps::tracing::{debug, trace};
use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
use crate::{
exec::make_stream_split,
@ -19,9 +19,7 @@ use snafu::{ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Chunk schema not compatible for compact plan: {}", source))]
ChunkSchemaNotCompatible {
source: internal_types::schema::merge::Error,
},
ChunkSchemaNotCompatible { source: schema::merge::Error },
#[snafu(display("Reorg planner got error building plan: {}", source))]
BuildingPlan {
@ -254,7 +252,7 @@ struct ScanPlan<C: QueryChunk + 'static> {
mod test {
use arrow::compute::SortOptions;
use arrow_util::assert_batches_eq;
use internal_types::schema::merge::SchemaMerger;
use schema::merge::SchemaMerger;
use crate::{
exec::{Executor, ExecutorType},


@ -36,7 +36,7 @@ use internal::{
I64LastSelector, I64MaxSelector, I64MinSelector, Utf8FirstSelector, Utf8LastSelector,
Utf8MaxSelector, Utf8MinSelector,
};
use internal_types::schema::TIME_DATA_TYPE;
use schema::TIME_DATA_TYPE;
/// Returns a DataFusion user defined aggregate function for computing
/// one field of the first() selector function.
@ -313,7 +313,7 @@ mod test {
util::pretty::pretty_format_batches,
};
use datafusion::{datasource::MemTable, logical_plan::Expr, prelude::*};
use internal_types::schema::TIME_DATA_TIMEZONE;
use schema::TIME_DATA_TIMEZONE;
use super::*;


@ -1,7 +1,7 @@
mod internal;
pub use internal::{Duration, Window};
use internal_types::schema::TIME_DATA_TYPE;
use schema::TIME_DATA_TYPE;
use std::sync::Arc;
@ -94,7 +94,7 @@ pub fn make_window_bound_expr(
#[cfg(test)]
mod tests {
use arrow::array::TimestampNanosecondArray;
use internal_types::schema::TIME_DATA_TIMEZONE;
use schema::TIME_DATA_TIMEZONE;
use super::*;


@ -14,15 +14,13 @@ use data_types::{
};
use datafusion::physical_plan::SendableRecordBatchStream;
use exec::stringset::StringSet;
use internal_types::{
schema::{sort::SortKey, Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use observability_deps::tracing::{debug, trace};
use predicate::{
delete_predicate::DeletePredicate,
predicate::{Predicate, PredicateMatch},
};
use schema::selection::Selection;
use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
use hashbrown::HashMap;
use std::{collections::BTreeSet, fmt::Debug, iter::FromIterator, sync::Arc};


@ -18,9 +18,9 @@ use datafusion::{
ExecutionPlan,
},
};
use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema};
use observability_deps::tracing::{debug, trace};
use predicate::predicate::{Predicate, PredicateBuilder};
use schema::{merge::SchemaMerger, sort::SortKey, Schema};
use crate::{
compute_sort_key,
@ -1048,7 +1048,7 @@ mod test {
use arrow::datatypes::DataType;
use arrow_util::assert_batches_eq;
use datafusion::physical_plan::collect;
use internal_types::schema::{builder::SchemaBuilder, TIME_COLUMN_NAME};
use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME};
use crate::{
test::{raw_data, TestChunk},


@ -381,7 +381,7 @@ impl RecordBatchDeduplicator {
}
/// Get column name out of the `expr`. TODO use
/// internal_types::schema::SortKey instead.
/// schema::SortKey instead.
fn get_col_name(expr: &dyn PhysicalExpr) -> &str {
expr.as_any()
.downcast_ref::<datafusion::physical_plan::expressions::Column>()


@ -5,7 +5,7 @@
//! writes (and thus are stored in separate rows)
use data_types::partition_metadata::{ColumnSummary, StatOverlap, Statistics};
use internal_types::schema::TIME_COLUMN_NAME;
use schema::TIME_COLUMN_NAME;
use snafu::Snafu;
use crate::QueryChunkMeta;


@ -11,7 +11,8 @@ use datafusion::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
},
};
use internal_types::{schema::Schema, selection::Selection};
use schema::selection::Selection;
use schema::Schema;
use crate::QueryChunk;
use predicate::predicate::Predicate;


@ -5,7 +5,7 @@ use datafusion::{
physical_plan::{ColumnStatistics, Statistics as DFStatistics},
scalar::ScalarValue,
};
use internal_types::schema::Schema;
use schema::Schema;
/// Converts stats.min and an appropriate `ScalarValue`
pub(crate) fn min_to_scalar(stats: &IOxStatistics) -> Option<ScalarValue> {
@ -88,7 +88,7 @@ mod test {
use super::*;
use data_types::partition_metadata::{InfluxDbType, StatValues};
use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType};
use schema::{builder::SchemaBuilder, InfluxFieldType};
#[test]
fn convert() {


@ -20,14 +20,13 @@ use data_types::{
};
use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};
use futures::StreamExt;
use internal_types::schema::sort::SortKey;
use internal_types::{
schema::{builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema},
selection::Selection,
};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use predicate::delete_predicate::DeletePredicate;
use schema::selection::Selection;
use schema::{
builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema,
};
use snafu::Snafu;
use std::num::NonZeroU64;
use std::{collections::BTreeMap, fmt, sync::Arc};


@ -13,8 +13,8 @@ use datafusion::{
ExecutionPlan, PhysicalExpr,
},
};
use internal_types::schema::sort::SortKey;
use observability_deps::tracing::trace;
use schema::sort::SortKey;
/// Create a logical plan that produces the record batch
pub fn make_scan_plan(batch: RecordBatch) -> std::result::Result<LogicalPlan, DataFusionError> {


@ -21,9 +21,9 @@ server = { path = "../server" }
arrow = { version = "5.5", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
data_types = { path = "../data_types" }
internal_types = { path = "../internal_types" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
schema = { path = "../schema" }
snafu = "0.6.3"
tempfile = "3.1.0"
test_helpers = { path = "../test_helpers" }


@ -1,12 +1,10 @@
//! Tests for the table_names implementation
use arrow::datatypes::DataType;
use internal_types::{
schema::{builder::SchemaBuilder, sort::SortKey, Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use predicate::predicate::PredicateBuilder;
use query::{QueryChunk, QueryChunkMeta, QueryDatabase};
use schema::selection::Selection;
use schema::{builder::SchemaBuilder, sort::SortKey, Schema, TIME_COLUMN_NAME};
use super::scenarios::*;


@ -19,13 +19,13 @@ data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
either = "1.6.1"
hashbrown = "0.11"
internal_types = { path = "../internal_types" }
itertools = "0.10.1"
metric = { path = "../metric" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11"
permutation = "0.2.5"
snafu = "0.6"
schema = { path = "../schema" }
[dev-dependencies] # In alphabetical order
criterion = "0.3.3"


@ -3,8 +3,8 @@ use arrow::{
record_batch::RecordBatch,
};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use internal_types::schema::builder::SchemaBuilder;
use read_buffer::{BinaryExpr, ChunkMetrics, Predicate, RBChunk};
use schema::builder::SchemaBuilder;
use std::sync::Arc;
const BASE_TIME: i64 = 1351700038292387000_i64;


@ -4,13 +4,13 @@ use rand::prelude::*;
use rand::Rng;
use rand_distr::{Distribution, Normal};
use internal_types::selection::Selection;
use packers::{sorter, Packers};
use read_buffer::{
benchmarks::{Column, ColumnType, RowGroup},
RBChunk,
};
use read_buffer::{BinaryExpr, Predicate};
use schema::selection::Selection;
const ONE_MS: i64 = 1_000_000;


@ -6,9 +6,10 @@ use crate::{
};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use internal_types::{schema::builder::Error as SchemaError, schema::Schema, selection::Selection};
use metric::{Attributes, CumulativeGauge, CumulativeRecorder, RecorderCollection};
use observability_deps::tracing::debug;
use schema::selection::Selection;
use schema::{builder::Error as SchemaError, Schema};
use snafu::{ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
@ -492,8 +493,8 @@ mod test {
},
};
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
use internal_types::schema::builder::SchemaBuilder;
use metric::{MetricKind, Observation, ObservationSet, RawReporter};
use schema::builder::SchemaBuilder;
use std::{num::NonZeroU64, sync::Arc};
// helper to make the `add_remove_tables` test simpler to read.


@ -9,9 +9,9 @@ mod table;
mod value;
// Identifiers that are exported as part of the public API.
pub use self::schema::*;
pub use chunk::{Chunk as RBChunk, ChunkMetrics, Error};
pub use row_group::{BinaryExpr, Predicate};
pub use schema::*;
pub use table::ReadFilterResults;
/// THIS MODULE SHOULD ONLY BE IMPORTED FOR BENCHMARKS.


@ -17,6 +17,7 @@ use crate::schema::{AggregateType, LogicalDataType, ResultSchema};
use crate::value::{
AggregateVec, EncodedValues, OwnedValue, Scalar, Value, Values, ValuesIterator,
};
use ::schema::{selection::Selection, InfluxColumnType, Schema};
use arrow::{
array,
array::ArrayRef,
@ -27,12 +28,10 @@ use datafusion::{
logical_plan::Expr as DfExpr, logical_plan::Operator as DFOperator,
scalar::ScalarValue as DFScalarValue,
};
use internal_types::schema::{InfluxColumnType, Schema};
use internal_types::selection::Selection;
use std::num::NonZeroU64;
/// The name used for a timestamp column.
pub const TIME_COLUMN_NAME: &str = internal_types::schema::TIME_COLUMN_NAME;
pub const TIME_COLUMN_NAME: &str = ::schema::TIME_COLUMN_NAME;
#[derive(Debug, Snafu)]
pub enum Error {
@ -40,9 +39,7 @@ pub enum Error {
ArrowConversion { source: arrow::error::ArrowError },
#[snafu(display("schema conversion error: {}", source))]
SchemaConversion {
source: internal_types::schema::builder::Error,
},
SchemaConversion { source: ::schema::builder::Error },
#[snafu(display("unsupported operation: {}", msg))]
UnsupportedOperation { msg: String },
@ -1787,7 +1784,7 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
type Error = Error;
fn try_from(result: ReadFilterResult<'_>) -> Result<Self, Self::Error> {
let schema = internal_types::schema::Schema::try_from(result.schema())
let schema = ::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaConversion { source })?;
let columns: Vec<ArrayRef> = result
@ -2263,7 +2260,7 @@ impl TryFrom<ReadAggregateResult<'_>> for RecordBatch {
type Error = Error;
fn try_from(mut result: ReadAggregateResult<'_>) -> Result<Self, Self::Error> {
let schema = internal_types::schema::Schema::try_from(result.schema())
let schema = ::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaConversion { source })?;
let arrow_schema: arrow::datatypes::SchemaRef = schema.into();


@ -1,6 +1,6 @@
use std::{convert::TryFrom, fmt::Display};
use internal_types::schema::InfluxFieldType;
use schema::InfluxFieldType;
/// A schema that is used to track the names and semantics of columns returned
/// in results out of various operations on a row group.
@ -96,11 +96,11 @@ impl Display for ResultSchema {
}
}
impl TryFrom<&ResultSchema> for internal_types::schema::Schema {
type Error = internal_types::schema::builder::Error;
impl TryFrom<&ResultSchema> for ::schema::Schema {
type Error = ::schema::builder::Error;
fn try_from(rs: &ResultSchema) -> Result<Self, Self::Error> {
let mut builder = internal_types::schema::builder::SchemaBuilder::new();
let mut builder = ::schema::builder::SchemaBuilder::new();
for (col_type, data_type) in &rs.select_columns {
match col_type {
ColumnType::Tag(name) => builder.tag(name.as_str()),


@ -7,8 +7,8 @@ use crate::{
};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use internal_types::selection::Selection;
use parking_lot::RwLock;
use schema::selection::Selection;
use snafu::{ensure, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},

schema/Cargo.toml (new file)

@ -0,0 +1,16 @@
[package]
name = "schema"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
description = "IOx Schema definition"
[dependencies]
arrow = { version = "5.5", features = ["prettyprint"] }
hashbrown = "0.11"
indexmap = "1.6"
itertools = "0.10.1"
snafu = "0.6"
[dev-dependencies]
arrow_util = { path = "../arrow_util" }


@ -3,7 +3,7 @@ use std::convert::TryInto;
use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField};
use snafu::{ResultExt, Snafu};
use crate::schema::sort::SortKey;
use crate::sort::SortKey;
use super::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME};
@ -124,7 +124,7 @@ impl SchemaBuilder {
/// Creates an Arrow schema with embedded metadata.
/// All schema validation happens at this time.
/// ```
/// use internal_types::schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
/// use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
///
/// let schema = SchemaBuilder::new()
/// .tag("region")


@ -1,4 +1,4 @@
//! This module contains the schema definiton for IOx
//! This module contains the schema definition for IOx
use std::{
cmp::Ordering,
collections::HashMap,
@ -11,13 +11,12 @@ use arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef, TimeUnit,
};
use hashbrown::HashSet;
use selection::Selection;
use snafu::{OptionExt, Snafu};
use crate::{
schema::sort::{ColumnSort, SortKey},
selection::Selection,
};
use hashbrown::HashSet;
use crate::sort::{ColumnSort, SortKey};
/// The name of the timestamp column in the InfluxDB datamodel
pub const TIME_COLUMN_NAME: &str = "time";
@ -41,6 +40,7 @@ pub fn TIME_DATA_TYPE() -> ArrowDataType {
pub mod builder;
pub mod merge;
pub mod selection;
pub mod sort;
/// Database schema creation / validation errors.
@ -787,11 +787,13 @@ macro_rules! assert_column_eq {
#[cfg(test)]
mod test {
use arrow::compute::SortOptions;
use InfluxColumnType::*;
use InfluxFieldType::*;
use crate::merge::SchemaMerger;
use super::{builder::SchemaBuilder, *};
use crate::schema::merge::SchemaMerger;
fn make_field(
name: &str,


@ -3,7 +3,7 @@ use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use snafu::Snafu;
use crate::schema::sort::SortKey;
use crate::sort::SortKey;
use super::{InfluxColumnType, Schema};
@ -188,8 +188,8 @@ impl SchemaMerger {
#[cfg(test)]
mod tests {
use crate::schema::builder::SchemaBuilder;
use crate::schema::InfluxFieldType::Integer;
use crate::builder::SchemaBuilder;
use crate::InfluxFieldType::Integer;
use super::*;


@ -43,6 +43,7 @@ query = { path = "../query" }
rand = "0.8.3"
rand_distr = "0.4.2"
read_buffer = { path = "../read_buffer" }
schema = { path = "../schema" }
serde = "1.0"
serde_json = "1.0"
snafu = "0.6"


@ -1194,7 +1194,8 @@ mod tests {
use data_types::database_rules::{
PartitionTemplate, TemplatePart, WriteBufferConnection, WriteBufferDirection,
};
use entry::{test_helpers::lp_to_entries, Sequence, SequencedEntry};
use data_types::sequence::Sequence;
use entry::{test_helpers::lp_to_entries, SequencedEntry};
use object_store::ObjectStore;
use std::{
convert::{TryFrom, TryInto},


@ -24,11 +24,11 @@ use data_types::{
chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkSummary},
database_rules::DatabaseRules,
partition_metadata::{PartitionSummary, TableSummary},
sequence::Sequence,
server_id::ServerId,
};
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use entry::{Entry, Sequence, SequencedEntry, TableBatch};
use internal_types::schema::Schema;
use entry::{Entry, SequencedEntry, TableBatch};
use iox_object_store::IoxObjectStore;
use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use observability_deps::tracing::{debug, error, info, warn};
@ -44,6 +44,7 @@ use query::{
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
QueryDatabase,
};
use schema::Schema;
use trace::ctx::SpanContext;
use write_buffer::core::{WriteBufferReading, WriteBufferWriting};
@ -137,14 +138,10 @@ pub enum Error {
TableBatchMissingTimes {},
#[snafu(display("Table batch has invalid schema: {}", source))]
TableBatchSchemaExtractError {
source: internal_types::schema::builder::Error,
},
TableBatchSchemaExtractError { source: schema::builder::Error },
#[snafu(display("Table batch has mismatching schema: {}", source))]
TableBatchSchemaMergeError {
source: internal_types::schema::merge::Error,
},
TableBatchSchemaMergeError { source: schema::merge::Error },
#[snafu(display(
"Unable to flush partition at the moment {}:{}",
@ -1468,7 +1465,6 @@ mod tests {
write_summary::TimestampSummary,
};
use entry::test_helpers::lp_to_entry;
use internal_types::{schema::Schema, selection::Selection};
use iox_object_store::ParquetFilePath;
use metric::{Attributes, CumulativeGauge, Metric, Observation};
use object_store::ObjectStore;
@ -1479,6 +1475,8 @@ mod tests {
};
use persistence_windows::min_max_sequence::MinMaxSequence;
use query::{QueryChunk, QueryDatabase};
use schema::selection::Selection;
use schema::Schema;
use write_buffer::mock::{
MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
};
@ -2554,8 +2552,6 @@ mod tests {
assert!(start < chunk.time_of_first_write());
assert!(chunk.time_of_first_write() < after_data_load);
assert!(chunk.time_of_first_write() == chunk.time_of_last_write());
assert!(after_data_load < chunk.time_closed().unwrap());
assert!(chunk.time_closed().unwrap() < after_rollover);
}
#[tokio::test]
@ -2617,7 +2613,6 @@ mod tests {
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
order: ChunkOrder::new(5).unwrap(),
}];
@ -2650,9 +2645,7 @@ mod tests {
let t_second_write = Utc::now();
write_lp_with_time(&db, "cpu bar=2 2", t_second_write).await;
let t_close_before = Utc::now();
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
let t_close_after = Utc::now();
let mut chunk_summaries = db.chunk_summaries().unwrap();
@ -2661,8 +2654,6 @@ mod tests {
let summary = &chunk_summaries[0];
assert_eq!(summary.time_of_first_write, t_first_write);
assert_eq!(summary.time_of_last_write, t_second_write);
assert!(t_close_before <= summary.time_closed.unwrap());
assert!(summary.time_closed.unwrap() <= t_close_after);
}
fn assert_first_last_times_eq(chunk_summary: &ChunkSummary, expected: DateTime<Utc>) {
@ -2855,7 +2846,6 @@ mod tests {
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
},
ChunkSummary {
partition_key: Arc::from("1970-01-05T15"),
@ -2870,7 +2860,6 @@ mod tests {
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
},
ChunkSummary {
partition_key: Arc::from("1970-01-05T15"),
@ -2885,7 +2874,6 @@ mod tests {
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
},
];


@ -16,7 +16,6 @@ use datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
datasource::TableProvider,
};
use internal_types::schema::Schema;
use metric::{Attributes, Metric, U64Counter};
use observability_deps::tracing::debug;
use predicate::predicate::{Predicate, PredicateBuilder};
@ -24,6 +23,7 @@ use query::{
provider::{ChunkPruner, ProviderBuilder},
QueryChunk, QueryChunkMeta, DEFAULT_SCHEMA,
};
use schema::Schema;
use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
use hashbrown::HashMap;


@ -9,7 +9,7 @@ use hashbrown::{HashMap, HashSet};
use data_types::chunk_metadata::ChunkSummary;
use data_types::chunk_metadata::DetailedChunkSummary;
use data_types::partition_metadata::{PartitionAddr, PartitionSummary, TableSummary};
use internal_types::schema::Schema;
use schema::Schema;
use snafu::{OptionExt, Snafu};
use tracker::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};


@ -11,12 +11,13 @@ use data_types::{
partition_metadata::TableSummary,
write_summary::TimestampSummary,
};
use internal_types::{access::AccessRecorder, schema::Schema};
use internal_types::access::AccessRecorder;
use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk};
use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk;
use predicate::delete_predicate::DeletePredicate;
use read_buffer::RBChunk;
use schema::Schema;
use tracker::{TaskRegistration, TaskTracker};
use crate::db::catalog::metrics::{StorageRecorder, TimestampHistogram};
@ -219,10 +220,6 @@ pub struct CatalogChunk {
/// that data was originally written into
time_of_last_write: DateTime<Utc>,
/// Time at which this chunk was marked as closed. Note this is
/// not the same as the timestamps on the data itself
time_closed: Option<DateTime<Utc>>,
/// Order of this chunk relative to other overlapping chunks.
order: ChunkOrder,
}
@ -292,7 +289,6 @@ impl CatalogChunk {
access_recorder: Default::default(),
time_of_first_write: time_of_write,
time_of_last_write: time_of_write,
time_closed: None,
order,
};
chunk.update_metrics();
@ -330,7 +326,6 @@ impl CatalogChunk {
access_recorder: Default::default(),
time_of_first_write,
time_of_last_write,
time_closed: None,
order,
};
chunk.update_metrics();
@ -371,7 +366,6 @@ impl CatalogChunk {
access_recorder: Default::default(),
time_of_first_write,
time_of_last_write,
time_closed: None,
order,
};
chunk.update_metrics();
@ -421,10 +415,6 @@ impl CatalogChunk {
self.time_of_last_write
}
pub fn time_closed(&self) -> Option<DateTime<Utc>> {
self.time_closed
}
pub fn order(&self) -> ChunkOrder {
self.order
}
@ -589,7 +579,6 @@ impl CatalogChunk {
time_of_last_access,
time_of_first_write: self.time_of_first_write,
time_of_last_write: self.time_of_last_write,
time_closed: self.time_closed,
order: self.order,
}
}
@ -692,9 +681,6 @@ impl CatalogChunk {
match &self.stage {
ChunkStage::Open { mb_chunk, .. } => {
debug!(%self.addr, row_count=mb_chunk.rows(), "freezing chunk");
assert!(self.time_closed.is_none());
self.time_closed = Some(Utc::now());
let (s, _) = mb_chunk.snapshot();
// Cache table summary + schema
@ -945,10 +931,8 @@ mod tests {
let mut chunk = make_open_chunk();
let registration = TaskRegistration::new();
assert!(chunk.time_closed.is_none());
assert!(matches!(chunk.stage, ChunkStage::Open { .. }));
chunk.set_compacting(&registration).unwrap();
assert!(chunk.time_closed.is_some());
assert!(matches!(chunk.stage, ChunkStage::Frozen { .. }));
}


@ -8,12 +8,12 @@ use data_types::{
partition_metadata::{PartitionAddr, PartitionSummary},
};
use hashbrown::HashMap;
use internal_types::schema::Schema;
use observability_deps::tracing::info;
use persistence_windows::{
min_max_sequence::OptionalMinMaxSequence, persistence_windows::PersistenceWindows,
};
use predicate::delete_predicate::DeletePredicate;
use schema::Schema;
use snafu::{OptionExt, Snafu};
use std::{collections::BTreeMap, fmt::Display, sync::Arc};
use tracker::RwLock;


@ -2,7 +2,7 @@ use super::partition::Partition;
use crate::db::catalog::metrics::TableMetrics;
use data_types::partition_metadata::{PartitionAddr, PartitionSummary};
use hashbrown::HashMap;
use internal_types::schema::{
use schema::{
builder::SchemaBuilder,
merge::{Error as SchemaMergerError, SchemaMerger},
Schema,
@ -196,7 +196,7 @@ impl<'a> TableSchemaUpsertHandle<'a> {
#[cfg(test)]
mod tests {
use internal_types::schema::{InfluxColumnType, InfluxFieldType};
use schema::{InfluxColumnType, InfluxFieldType};
use super::*;


@ -8,11 +8,7 @@ use data_types::{
};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
use internal_types::{
access::AccessRecorder,
schema::{sort::SortKey, Schema},
selection::Selection,
};
use internal_types::access::AccessRecorder;
use iox_object_store::ParquetFilePath;
use mutable_buffer::chunk::snapshot::ChunkSnapshot;
use observability_deps::tracing::debug;
@ -24,6 +20,7 @@ use predicate::{
};
use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
use read_buffer::RBChunk;
use schema::{selection::Selection, sort::SortKey, Schema};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
@ -54,9 +51,7 @@ pub enum Error {
},
#[snafu(display("Internal error restricting schema: {}", source))]
InternalSelectingSchema {
source: internal_types::schema::Error,
},
InternalSelectingSchema { source: schema::Error },
#[snafu(display("Predicate conversion error: {}", source))]
PredicateConversion { source: super::pred::Error },


@ -14,10 +14,7 @@ use data_types::{
DatabaseName,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::{
access::AccessMetrics,
schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME},
};
use internal_types::access::AccessMetrics;
use lifecycle::{
LifecycleChunk, LifecyclePartition, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
LockablePartition,
@ -25,6 +22,7 @@ use lifecycle::{
use observability_deps::tracing::{info, trace};
use persistence_windows::persistence_windows::FlushHandle;
use query::QueryChunkMeta;
use schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME};
use std::{
convert::TryInto,
fmt::Display,

View File

@ -13,7 +13,6 @@ use crate::db::{
use ::lifecycle::LifecycleWriteGuard;
use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job};
use internal_types::selection::Selection;
use observability_deps::tracing::{debug, warn};
use parquet_file::{
catalog::interface::CatalogParquetInfo,
@ -27,6 +26,7 @@ use persistence_windows::{
};
use predicate::predicate::Predicate;
use query::{QueryChunk, QueryChunkMeta};
use schema::selection::Selection;
use snafu::ResultExt;
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};


@ -5,7 +5,8 @@ use std::{
};
use chrono::Utc;
use entry::{Sequence, TableBatch};
use data_types::sequence::Sequence;
use entry::TableBatch;
use futures::TryStreamExt;
use observability_deps::tracing::info;
use persistence_windows::{
@ -420,11 +421,12 @@ mod tests {
use chrono::{DateTime, Utc};
use data_types::{
database_rules::{PartitionTemplate, Partitioner, TemplatePart},
sequence::Sequence,
server_id::ServerId,
};
use entry::{
test_helpers::{lp_to_entries, lp_to_entry},
Sequence, SequencedEntry,
SequencedEntry,
};
use object_store::ObjectStore;
use persistence_windows::{


@ -49,8 +49,7 @@ fn chunk_summaries_schema() -> SchemaRef {
Field::new("row_count", DataType::UInt64, false),
Field::new("time_of_last_access", ts.clone(), true),
Field::new("time_of_first_write", ts.clone(), false),
Field::new("time_of_last_write", ts.clone(), false),
Field::new("time_closed", ts, true),
Field::new("time_of_last_write", ts, false),
Field::new("order", DataType::UInt32, false),
]))
}
@ -112,11 +111,6 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
.map(|c| c.time_of_last_write)
.map(time_to_ts)
.collect::<TimestampNanosecondArray>();
let time_closed = chunks
.iter()
.map(|c| c.time_closed)
.map(optional_time_to_ts)
.collect::<TimestampNanosecondArray>();
let order = chunks
.iter()
.map(|c| Some(c.order.get()))
@ -136,7 +130,6 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
Arc::new(time_of_last_access),
Arc::new(time_of_first_write),
Arc::new(time_of_last_write),
Arc::new(time_closed),
Arc::new(order),
],
)
@ -164,7 +157,6 @@ mod tests {
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(10_000_000_000),
time_of_last_write: Utc.timestamp_nanos(10_000_000_000),
time_closed: None,
order: ChunkOrder::new(5).unwrap(),
},
ChunkSummary {
@ -179,7 +171,6 @@ mod tests {
time_of_last_access: Some(Utc.timestamp_nanos(754_000_000_000)),
time_of_first_write: Utc.timestamp_nanos(80_000_000_000),
time_of_last_write: Utc.timestamp_nanos(80_000_000_000),
time_closed: None,
order: ChunkOrder::new(6).unwrap(),
},
ChunkSummary {
@ -194,19 +185,18 @@ mod tests {
time_of_last_access: Some(Utc.timestamp_nanos(5_000_000_000)),
time_of_first_write: Utc.timestamp_nanos(100_000_000_000),
time_of_last_write: Utc.timestamp_nanos(200_000_000_000),
time_closed: None,
order: ChunkOrder::new(7).unwrap(),
},
];
let expected = vec![
"+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+-------+",
"| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_last_access | time_of_first_write | time_of_last_write | time_closed | order |",
"+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+-------+",
"| 00000000-0000-0000-0000-000000000000 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | | 1970-01-01T00:00:10Z | 1970-01-01T00:00:10Z | | 5 |",
"| 00000000-0000-0000-0000-000000000001 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | 1970-01-01T00:12:34Z | 1970-01-01T00:01:20Z | 1970-01-01T00:01:20Z | | 6 |",
"| 00000000-0000-0000-0000-000000000002 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01T00:00:05Z | 1970-01-01T00:01:40Z | 1970-01-01T00:03:20Z | | 7 |",
"+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+-------+",
"+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------+",
"| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_last_access | time_of_first_write | time_of_last_write | order |",
"+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------+",
"| 00000000-0000-0000-0000-000000000000 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | | 1970-01-01T00:00:10Z | 1970-01-01T00:00:10Z | 5 |",
"| 00000000-0000-0000-0000-000000000001 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | 1970-01-01T00:12:34Z | 1970-01-01T00:01:20Z | 1970-01-01T00:01:20Z | 6 |",
"| 00000000-0000-0000-0000-000000000002 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01T00:00:05Z | 1970-01-01T00:01:40Z | 1970-01-01T00:03:20Z | 7 |",
"+--------------------------------------+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------+",
];
let schema = chunk_summaries_schema();


@ -320,7 +320,6 @@ mod tests {
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(2),
time_closed: None,
order: ChunkOrder::new(5).unwrap(),
},
columns: vec![
@ -357,7 +356,6 @@ mod tests {
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(2),
time_closed: None,
order: ChunkOrder::new(6).unwrap(),
},
columns: vec![ChunkColumnSummary {
@ -388,7 +386,6 @@ mod tests {
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(2),
time_closed: None,
order: ChunkOrder::new(5).unwrap(),
},
columns: vec![ChunkColumnSummary {


@ -201,8 +201,8 @@ mod tests {
use ::test_helpers::assert_contains;
use arrow_util::assert_batches_eq;
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use data_types::sequence::Sequence;
use entry::test_helpers::lp_to_entry;
use entry::Sequence;
use persistence_windows::min_max_sequence::MinMaxSequence;
use query::exec::ExecutionContextProvider;
use query::frontend::sql::SqlQueryPlanner;


@ -511,9 +511,7 @@ async fn test_chunk_get() {
// make sure there were timestamps prior to normalization
assert!(
chunks[0].time_of_first_write.is_some()
&& chunks[0].time_of_last_write.is_some()
&& chunks[0].time_closed.is_none(), // chunk is not yet closed
chunks[0].time_of_first_write.is_some() && chunks[0].time_of_last_write.is_some(), // chunk is not yet closed
"actual:{:#?}",
chunks[0]
);
@ -535,7 +533,6 @@ async fn test_chunk_get() {
time_of_last_access: None,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
order: 1,
},
Chunk {
@ -550,7 +547,6 @@ async fn test_chunk_get() {
time_of_last_access: None,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
order: 1,
},
];
@ -722,7 +718,6 @@ async fn test_list_partition_chunks() {
time_of_last_access: None,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
order: 1,
}];
@ -1069,7 +1064,6 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
time_of_last_access: None,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
memory_bytes,
object_store_bytes,
order,


@ -10,7 +10,7 @@ description = "IOx logging pipeline built upon tokio-tracing"
[dependencies]
logfmt = { path = "../logfmt" }
observability_deps = { path = "../observability_deps" }
thiserror = "1.0.28"
thiserror = "1.0.30"
tracing-log = "0.1"
tracing-subscriber = "0.2"
structopt = { version = "0.3.23", optional = true }


@ -2,7 +2,8 @@ use std::fmt::Debug;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use entry::{Entry, Sequence, SequencedEntry};
use data_types::sequence::Sequence;
use entry::{Entry, SequencedEntry};
use futures::{future::BoxFuture, stream::BoxStream};
/// Generic boxed error type that is used in this crate.


@ -8,8 +8,10 @@ use std::{
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use data_types::{database_rules::WriteBufferCreationConfig, server_id::ServerId};
use entry::{Entry, Sequence, SequencedEntry};
use data_types::{
database_rules::WriteBufferCreationConfig, sequence::Sequence, server_id::ServerId,
};
use entry::{Entry, SequencedEntry};
use futures::{FutureExt, StreamExt};
use observability_deps::tracing::{debug, info};
use rdkafka::{


@ -11,7 +11,8 @@ use futures::{stream, FutureExt, StreamExt};
use parking_lot::Mutex;
use data_types::database_rules::WriteBufferCreationConfig;
use entry::{Entry, Sequence, SequencedEntry};
use data_types::sequence::Sequence;
use entry::{Entry, SequencedEntry};
use crate::core::{
EntryStream, FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,