diff --git a/Cargo.lock b/Cargo.lock index 3173fce4b3..185c34f342 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4363,7 +4363,6 @@ dependencies = [ "arrow_util", "async-trait", "backoff 0.1.0", - "data_types", "data_types2", "datafusion 0.1.0", "dml", diff --git a/compactor/src/query.rs b/compactor/src/query.rs index fe179c17d4..f770474a3a 100644 --- a/compactor/src/query.rs +++ b/compactor/src/query.rs @@ -1,11 +1,8 @@ //! Queryable Compactor Data -use std::sync::Arc; - -use data_types::timestamp::TimestampMinMax; use data_types2::{ ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, - Timestamp, Tombstone, + Timestamp, TimestampMinMax, Tombstone, }; use datafusion::physical_plan::SendableRecordBatchStream; use observability_deps::tracing::trace; @@ -17,6 +14,7 @@ use query::{ }; use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema}; use snafu::{ResultExt, Snafu}; +use std::sync::Arc; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index c3369c8624..2e97d0ae52 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -27,8 +27,6 @@ use std::{ }; use uuid::Uuid; -pub use data_types::timestamp::{TimestampMinMax, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME}; - /// Unique ID for a `Namespace` #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] #[sqlx(transparent)] @@ -1939,6 +1937,105 @@ impl Sequence { } } +/// minimum time that can be represented. +/// +/// 1677-09-21 00:12:43.145224194 +0000 UTC +/// +/// The two lowest minimum integers are used as sentinel values. The +/// minimum value needs to be used as a value lower than any other value for +/// comparisons and another separate value is needed to act as a sentinel +/// default value that is unusable by the user, but usable internally. 
+/// Because these two values need to be used for a special purpose, we do +/// not allow users to write points at these two times. +/// +/// Source: [influxdb](https://github.com/influxdata/influxdb/blob/540bb66e1381a48a6d1ede4fc3e49c75a7d9f4af/models/time.go#L12-L34) +pub const MIN_NANO_TIME: i64 = i64::MIN + 2; + +/// maximum time that can be represented. +/// +/// 2262-04-11 23:47:16.854775806 +0000 UTC +/// +/// The highest time represented by a nanosecond needs to be used for an +/// exclusive range in the shard group, so the maximum time needs to be one +/// less than the possible maximum number of nanoseconds representable by an +/// int64 so that we don't lose a point at that one time. +/// Source: [influxdb](https://github.com/influxdata/influxdb/blob/540bb66e1381a48a6d1ede4fc3e49c75a7d9f4af/models/time.go#L12-L34) +pub const MAX_NANO_TIME: i64 = i64::MAX - 1; + +/// Specifies a continuous range of nanosecond timestamps. Timestamp +/// predicates are so common and critical to performance of timeseries +/// databases in general, and IOx in particular, that they are handled +/// specially +/// +/// Timestamp ranges are defined such that a value `v` is within the +/// range iff: +/// +/// ```text +/// range.start <= v < range.end +/// ``` +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Copy, Debug, Hash)] +pub struct TimestampRange { + /// Start defines the inclusive lower bound. Minimum value is [MIN_NANO_TIME] + start: i64, + /// End defines the exclusive upper bound. Maximum value is [MAX_NANO_TIME] + end: i64, +} + +impl TimestampRange { + /// Create a new TimestampRange. Clamps to MIN_NANO_TIME/MAX_NANO_TIME. 
+ pub fn new(start: i64, end: i64) -> Self { + debug_assert!(end >= start); + let start = start.max(MIN_NANO_TIME); + let end = end.min(MAX_NANO_TIME); + Self { start, end } + } + + #[inline] + /// Returns true if this range contains the value v + pub fn contains(&self, v: i64) -> bool { + self.start <= v && v < self.end + } + + /// Return the timestamp range's end. + pub fn end(&self) -> i64 { + self.end + } + + /// Return the timestamp range's start. + pub fn start(&self) -> i64 { + self.start + } +} + +/// Specifies a min/max timestamp value. +/// +/// Note this differs subtly (but critically) from a +/// `TimestampRange` as the minimum and maximum values are included +#[derive(Clone, Debug, Copy)] +pub struct TimestampMinMax { + /// The minimum timestamp value + pub min: i64, + /// the maximum timestamp value + pub max: i64, +} + +impl TimestampMinMax { + /// Create a new TimestampMinMax. Panics if min > max. + pub fn new(min: i64, max: i64) -> Self { + assert!(min <= max, "expected min ({}) <= max ({})", min, max); + Self { min, max } + } + + #[inline] + /// Returns true if any of the values between min / max + /// (inclusive) are contained within the specified timestamp range + pub fn overlaps(&self, range: TimestampRange) -> bool { + range.contains(self.min) + || range.contains(self.max) + || (self.min <= range.start && self.max >= range.end) + } +} + #[cfg(test)] mod tests { use super::*; @@ -2882,4 +2979,80 @@ mod tests { assert_eq!(stat.max, None); assert_eq!(stat.total_count, 1); } + + #[test] + fn test_timestamp_nano_min_max() { + let cases = vec![ + ( + "MIN/MAX Nanos", + TimestampRange::new(MIN_NANO_TIME, MAX_NANO_TIME), + ), + ("MIN/MAX i64", TimestampRange::new(i64::MIN, i64::MAX)), + ]; + + for (name, range) in cases { + println!("case: {}", name); + assert!(!range.contains(i64::MIN)); + assert!(range.contains(MIN_NANO_TIME)); + assert!(range.contains(MIN_NANO_TIME + 1)); + assert!(range.contains(MAX_NANO_TIME - 1)); + 
assert!(!range.contains(MAX_NANO_TIME)); + assert!(!range.contains(i64::MAX)); + } + } + + #[test] + fn test_timestamp_i64_min_max_offset() { + let range = TimestampRange::new(MIN_NANO_TIME + 1, MAX_NANO_TIME - 1); + + assert!(!range.contains(i64::MIN)); + assert!(!range.contains(MIN_NANO_TIME)); + assert!(range.contains(MIN_NANO_TIME + 1)); + assert!(range.contains(MAX_NANO_TIME - 2)); + assert!(!range.contains(MAX_NANO_TIME - 1)); + assert!(!range.contains(MAX_NANO_TIME)); + assert!(!range.contains(i64::MAX)); + } + + #[test] + fn test_timestamp_range_contains() { + let range = TimestampRange::new(100, 200); + assert!(!range.contains(99)); + assert!(range.contains(100)); + assert!(range.contains(101)); + assert!(range.contains(199)); + assert!(!range.contains(200)); + assert!(!range.contains(201)); + } + + #[test] + fn test_timestamp_range_overlaps() { + let range = TimestampRange::new(100, 200); + assert!(!TimestampMinMax::new(0, 99).overlaps(range)); + assert!(TimestampMinMax::new(0, 100).overlaps(range)); + assert!(TimestampMinMax::new(0, 101).overlaps(range)); + + assert!(TimestampMinMax::new(0, 200).overlaps(range)); + assert!(TimestampMinMax::new(0, 201).overlaps(range)); + assert!(TimestampMinMax::new(0, 300).overlaps(range)); + + assert!(TimestampMinMax::new(100, 101).overlaps(range)); + assert!(TimestampMinMax::new(100, 200).overlaps(range)); + assert!(TimestampMinMax::new(100, 201).overlaps(range)); + + assert!(TimestampMinMax::new(101, 101).overlaps(range)); + assert!(TimestampMinMax::new(101, 200).overlaps(range)); + assert!(TimestampMinMax::new(101, 201).overlaps(range)); + + assert!(!TimestampMinMax::new(200, 200).overlaps(range)); + assert!(!TimestampMinMax::new(200, 201).overlaps(range)); + + assert!(!TimestampMinMax::new(201, 300).overlaps(range)); + } + + #[test] + #[should_panic(expected = "expected min (2) <= max (1)")] + fn test_timestamp_min_max_invalid() { + TimestampMinMax::new(2, 1); + } } diff --git a/dml/src/lib.rs b/dml/src/lib.rs 
index 2eb0d96fd4..c7750fdfcf 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -487,8 +487,8 @@ mod tests { use data_types::{ consistent_hasher::ConsistentHasher, router::{HashRing, Matcher, MatcherToShard}, - timestamp::TimestampRange, }; + use data_types2::TimestampRange; use mutable_batch_lp::lines_to_batches; use regex::Regex; diff --git a/ingester/src/query.rs b/ingester/src/query.rs index 6d3a3b5ff4..e0de111ae9 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -3,10 +3,9 @@ use crate::data::{QueryableBatch, SnapshotBatch}; use arrow::record_batch::RecordBatch; use arrow_util::util::merge_record_batches; -use data_types::timestamp::TimestampMinMax; use data_types2::{ ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, - Tombstone, + TimestampMinMax, Tombstone, }; use datafusion::{ logical_plan::ExprRewritable, diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index f649ff7cce..cd502ab42d 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -2,10 +2,9 @@ use crate::cache::CatalogCache; use arrow::record_batch::RecordBatch; -use data_types::timestamp::TimestampMinMax; use data_types2::{ ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId, - ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId, + ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId, TimestampMinMax, }; use futures::StreamExt; use iox_catalog::interface::Catalog; diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs index 8d6d17da0a..9596ad230f 100644 --- a/querier/src/chunk/query_access.rs +++ b/querier/src/chunk/query_access.rs @@ -1,5 +1,4 @@ -use std::sync::Arc; - +use crate::chunk::{ChunkStorage, QuerierChunk}; use data_types2::{ ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax, }; @@ -8,8 +7,7 @@ use predicate::PredicateMatch; use query::{QueryChunk, QueryChunkError, 
QueryChunkMeta}; use schema::{sort::SortKey, Schema}; use snafu::{ResultExt, Snafu}; - -use crate::chunk::{ChunkStorage, QuerierChunk}; +use std::sync::Arc; #[derive(Debug, Snafu)] pub enum Error { diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 529d1a6105..9f814b53e5 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -1,10 +1,13 @@ +use self::{ + flight_client::{Error as FlightClientError, FlightClient, FlightClientImpl, FlightError}, + test_util::MockIngesterConnection, +}; use crate::cache::CatalogCache; use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch}; use async_trait::async_trait; -use data_types::timestamp::TimestampMinMax; use data_types2::{ ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, PartitionId, SequenceNumber, - SequencerId, StatValues, Statistics, TableSummary, + SequencerId, StatValues, Statistics, TableSummary, TimestampMinMax, }; use datafusion_util::MemoryStream; use futures::{stream::FuturesUnordered, TryStreamExt}; @@ -20,11 +23,6 @@ use schema::{selection::Selection, sort::SortKey, InfluxColumnType, InfluxFieldT use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{any::Any, collections::HashMap, sync::Arc}; -use self::{ - flight_client::{Error as FlightClientError, FlightClient, FlightClientImpl, FlightError}, - test_util::MockIngesterConnection, -}; - pub(crate) mod flight_client; pub(crate) mod test_util; diff --git a/query_tests/Cargo.toml b/query_tests/Cargo.toml index 56397021ab..9850de37b0 100644 --- a/query_tests/Cargo.toml +++ b/query_tests/Cargo.toml @@ -5,13 +5,10 @@ authors = ["Andrew Lamb "] edition = "2021" description = "Tests of the query engine against different database configurations" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] arrow = { version = "13", features = ["prettyprint"] } async-trait = "0.1" backoff = { path = "../backoff" } -data_types = { 
path = "../data_types" } data_types2 = { path = "../data_types2" } datafusion = { path = "../datafusion" } dml = { path = "../dml" } diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs index 6ae01e351c..237d94875c 100644 --- a/query_tests/src/influxrpc/field_columns.rs +++ b/query_tests/src/influxrpc/field_columns.rs @@ -1,15 +1,13 @@ +use crate::scenarios::*; use arrow::datatypes::DataType; -use data_types::timestamp::{MAX_NANO_TIME, MIN_NANO_TIME}; +use data_types2::{MAX_NANO_TIME, MIN_NANO_TIME}; use datafusion::logical_plan::{col, lit}; -use predicate::rpc_predicate::InfluxRpcPredicate; -use predicate::PredicateBuilder; +use predicate::{rpc_predicate::InfluxRpcPredicate, PredicateBuilder}; use query::{ exec::fieldlist::{Field, FieldList}, frontend::influxrpc::InfluxRpcPlanner, }; -use crate::scenarios::*; - /// Creates and loads several database scenarios using the db_setup /// function. /// diff --git a/query_tests/src/influxrpc/table_names.rs b/query_tests/src/influxrpc/table_names.rs index 63f10232c0..03b5dedd61 100644 --- a/query_tests/src/influxrpc/table_names.rs +++ b/query_tests/src/influxrpc/table_names.rs @@ -1,15 +1,13 @@ //! 
Tests for the Influx gRPC queries -use data_types::timestamp::{MAX_NANO_TIME, MIN_NANO_TIME}; +use crate::scenarios::*; +use data_types2::{MAX_NANO_TIME, MIN_NANO_TIME}; use datafusion::logical_plan::{col, lit}; -use predicate::rpc_predicate::InfluxRpcPredicate; -use predicate::PredicateBuilder; +use predicate::{rpc_predicate::InfluxRpcPredicate, PredicateBuilder}; use query::{ exec::stringset::{IntoStringSet, StringSetRef}, frontend::influxrpc::InfluxRpcPlanner, }; -use crate::scenarios::*; - /// runs table_names(predicate) and compares it to the expected /// output async fn run_table_names_test_case( diff --git a/query_tests/src/influxrpc/tag_keys.rs b/query_tests/src/influxrpc/tag_keys.rs index 68e11c0daa..0288063ab6 100644 --- a/query_tests/src/influxrpc/tag_keys.rs +++ b/query_tests/src/influxrpc/tag_keys.rs @@ -1,14 +1,12 @@ -use data_types::timestamp::{MAX_NANO_TIME, MIN_NANO_TIME}; +use crate::scenarios::*; +use data_types2::{MAX_NANO_TIME, MIN_NANO_TIME}; use datafusion::logical_plan::{col, lit}; -use predicate::rpc_predicate::InfluxRpcPredicate; -use predicate::PredicateBuilder; +use predicate::{rpc_predicate::InfluxRpcPredicate, PredicateBuilder}; use query::{ exec::stringset::{IntoStringSet, StringSetRef}, frontend::influxrpc::InfluxRpcPlanner, }; -use crate::scenarios::*; - /// Creates and loads several database scenarios using the db_setup /// function. ///