fix: Move timestamp types to data_types2

pull/24376/head
Carol (Nichols || Goulding) 2022-05-05 10:41:42 -04:00
parent 236edb9181
commit 2ef44f2024
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
12 changed files with 196 additions and 41 deletions

1
Cargo.lock generated
View File

@ -4363,7 +4363,6 @@ dependencies = [
"arrow_util",
"async-trait",
"backoff 0.1.0",
"data_types",
"data_types2",
"datafusion 0.1.0",
"dml",

View File

@ -1,11 +1,8 @@
//! Queryable Compactor Data
use std::sync::Arc;
use data_types::timestamp::TimestampMinMax;
use data_types2::{
ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
Timestamp, Tombstone,
Timestamp, TimestampMinMax, Tombstone,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use observability_deps::tracing::trace;
@ -17,6 +14,7 @@ use query::{
};
use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]

View File

@ -27,8 +27,6 @@ use std::{
};
use uuid::Uuid;
pub use data_types::timestamp::{TimestampMinMax, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME};
/// Unique ID for a `Namespace`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
@ -1939,6 +1937,105 @@ impl Sequence {
}
}
/// Minimum time that can be represented.
///
/// 1677-09-21 00:12:43.145224194 +0000 UTC
///
/// The two lowest minimum integers are used as sentinel values. The
/// minimum value needs to be used as a value lower than any other value for
/// comparisons and another separate value is needed to act as a sentinel
/// default value that is unusable by the user, but usable internally.
/// Because these two values need to be used for a special purpose, we do
/// not allow users to write points at these two times.
///
/// Source: [influxdb](https://github.com/influxdata/influxdb/blob/540bb66e1381a48a6d1ede4fc3e49c75a7d9f4af/models/time.go#L12-L34)
pub const MIN_NANO_TIME: i64 = i64::MIN + 2;

/// Maximum time that can be represented.
///
/// 2262-04-11 23:47:16.854775806 +0000 UTC
///
/// The highest time represented by a nanosecond needs to be used for an
/// exclusive range in the shard group, so the maximum time needs to be one
/// less than the possible maximum number of nanoseconds representable by an
/// int64 so that we don't lose a point at that one time.
///
/// Source: [influxdb](https://github.com/influxdata/influxdb/blob/540bb66e1381a48a6d1ede4fc3e49c75a7d9f4af/models/time.go#L12-L34)
pub const MAX_NANO_TIME: i64 = i64::MAX - 1;
/// Specifies a continuous range of nanosecond timestamps. Timestamp
/// predicates are so common and critical to performance of timeseries
/// databases in general, and IOx in particular, that they are handled
/// specially
///
/// Timestamp ranges are defined such that a value `v` is within the
/// range iff:
///
/// ```text
/// range.start <= v < range.end
/// ```
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Copy, Debug, Hash)]
pub struct TimestampRange {
    /// Start defines the inclusive lower bound. Minimum value is [MIN_NANO_TIME]
    start: i64,
    /// End defines the *exclusive* upper bound (`v < end`). Maximum value is [MAX_NANO_TIME]
    end: i64,
}
impl TimestampRange {
    /// Create a new TimestampRange, clamping `start` up to [MIN_NANO_TIME]
    /// and `end` down to [MAX_NANO_TIME].
    pub fn new(start: i64, end: i64) -> Self {
        debug_assert!(end >= start);
        Self {
            start: start.max(MIN_NANO_TIME),
            end: end.min(MAX_NANO_TIME),
        }
    }

    /// Returns true if this range contains the value v,
    /// i.e. `start <= v < end`.
    #[inline]
    pub fn contains(&self, v: i64) -> bool {
        // Half-open interval check, expressed via the std Range type.
        (self.start..self.end).contains(&v)
    }

    /// Return the timestamp range's (exclusive) end.
    pub fn end(&self) -> i64 {
        self.end
    }

    /// Return the timestamp range's (inclusive) start.
    pub fn start(&self) -> i64 {
        self.start
    }
}
/// Specifies a min/max timestamp value.
///
/// Note this differs subtly (but critically) from a
/// `TimestampRange` as the minimum and maximum values are included
#[derive(Clone, Debug, Copy)]
pub struct TimestampMinMax {
    /// The minimum timestamp value (inclusive)
    pub min: i64,
    /// The maximum timestamp value (inclusive)
    pub max: i64,
}
impl TimestampMinMax {
    /// Create a new TimestampMinMax.
    ///
    /// # Panics
    /// Panics if `min > max`.
    pub fn new(min: i64, max: i64) -> Self {
        assert!(min <= max, "expected min ({}) <= max ({})", min, max);
        Self { min, max }
    }

    /// Returns true if any of the values between min / max
    /// (inclusive) are contained within the specified timestamp range
    #[inline]
    pub fn overlaps(&self, range: TimestampRange) -> bool {
        // Overlap happens when either endpoint falls inside `range`,
        // or when [min, max] completely spans it.
        let spans_range = self.min <= range.start && self.max >= range.end;
        range.contains(self.min) || range.contains(self.max) || spans_range
    }
}
#[cfg(test)]
mod tests {
use super::*;
@ -2882,4 +2979,80 @@ mod tests {
assert_eq!(stat.max, None);
assert_eq!(stat.total_count, 1);
}
#[test]
fn test_timestamp_nano_min_max() {
    // Ranges built from the nano sentinels and from raw i64 extremes
    // must clamp to the same effective bounds.
    for (name, range) in &[
        (
            "MIN/MAX Nanos",
            TimestampRange::new(MIN_NANO_TIME, MAX_NANO_TIME),
        ),
        ("MIN/MAX i64", TimestampRange::new(i64::MIN, i64::MAX)),
    ] {
        println!("case: {}", name);

        // Inclusive lower bound, exclusive upper bound.
        assert!(range.contains(MIN_NANO_TIME));
        assert!(range.contains(MIN_NANO_TIME + 1));
        assert!(range.contains(MAX_NANO_TIME - 1));

        // Everything outside the clamped bounds is excluded.
        assert!(!range.contains(i64::MIN));
        assert!(!range.contains(MAX_NANO_TIME));
        assert!(!range.contains(i64::MAX));
    }
}
#[test]
fn test_timestamp_i64_min_max_offset() {
    // Bounds strictly inside the sentinel range are not clamped.
    let range = TimestampRange::new(MIN_NANO_TIME + 1, MAX_NANO_TIME - 1);

    // Values inside [start, end) are contained.
    assert!(range.contains(MIN_NANO_TIME + 1));
    assert!(range.contains(MAX_NANO_TIME - 2));

    // Values at or beyond the bounds are not.
    assert!(!range.contains(i64::MIN));
    assert!(!range.contains(MIN_NANO_TIME));
    assert!(!range.contains(MAX_NANO_TIME - 1));
    assert!(!range.contains(MAX_NANO_TIME));
    assert!(!range.contains(i64::MAX));
}
#[test]
fn test_timestamp_range_contains() {
    let range = TimestampRange::new(100, 200);

    // Table of (value, expected containment): start is inclusive,
    // end is exclusive.
    for &(v, expected) in &[
        (99, false),
        (100, true),
        (101, true),
        (199, true),
        (200, false),
        (201, false),
    ] {
        assert_eq!(range.contains(v), expected);
    }
}
#[test]
fn test_timestamp_range_overlaps() {
    let range = TimestampRange::new(100, 200);

    // (min, max) pairs whose inclusive interval intersects [100, 200).
    let overlapping = [
        (0, 100),
        (0, 101),
        (0, 200),
        (0, 201),
        (0, 300),
        (100, 101),
        (100, 200),
        (100, 201),
        (101, 101),
        (101, 200),
        (101, 201),
    ];
    // (min, max) pairs entirely outside the range.
    let disjoint = [(0, 99), (200, 200), (200, 201), (201, 300)];

    for &(min, max) in &overlapping {
        assert!(TimestampMinMax::new(min, max).overlaps(range));
    }
    for &(min, max) in &disjoint {
        assert!(!TimestampMinMax::new(min, max).overlaps(range));
    }
}
#[test]
#[should_panic(expected = "expected min (2) <= max (1)")]
fn test_timestamp_min_max_invalid() {
    // Constructing with min > max must panic with the documented message.
    TimestampMinMax::new(2, 1);
}
}

View File

@ -487,8 +487,8 @@ mod tests {
use data_types::{
consistent_hasher::ConsistentHasher,
router::{HashRing, Matcher, MatcherToShard},
timestamp::TimestampRange,
};
use data_types2::TimestampRange;
use mutable_batch_lp::lines_to_batches;
use regex::Regex;

View File

@ -3,10 +3,9 @@
use crate::data::{QueryableBatch, SnapshotBatch};
use arrow::record_batch::RecordBatch;
use arrow_util::util::merge_record_batches;
use data_types::timestamp::TimestampMinMax;
use data_types2::{
ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
Tombstone,
TimestampMinMax, Tombstone,
};
use datafusion::{
logical_plan::ExprRewritable,

View File

@ -2,10 +2,9 @@
use crate::cache::CatalogCache;
use arrow::record_batch::RecordBatch;
use data_types::timestamp::TimestampMinMax;
use data_types2::{
ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId,
ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId,
ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId, TimestampMinMax,
};
use futures::StreamExt;
use iox_catalog::interface::Catalog;

View File

@ -1,5 +1,4 @@
use std::sync::Arc;
use crate::chunk::{ChunkStorage, QuerierChunk};
use data_types2::{
ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
};
@ -8,8 +7,7 @@ use predicate::PredicateMatch;
use query::{QueryChunk, QueryChunkError, QueryChunkMeta};
use schema::{sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use crate::chunk::{ChunkStorage, QuerierChunk};
use std::sync::Arc;
#[derive(Debug, Snafu)]
pub enum Error {

View File

@ -1,10 +1,13 @@
use self::{
flight_client::{Error as FlightClientError, FlightClient, FlightClientImpl, FlightError},
test_util::MockIngesterConnection,
};
use crate::cache::CatalogCache;
use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
use async_trait::async_trait;
use data_types::timestamp::TimestampMinMax;
use data_types2::{
ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, PartitionId, SequenceNumber,
SequencerId, StatValues, Statistics, TableSummary,
SequencerId, StatValues, Statistics, TableSummary, TimestampMinMax,
};
use datafusion_util::MemoryStream;
use futures::{stream::FuturesUnordered, TryStreamExt};
@ -20,11 +23,6 @@ use schema::{selection::Selection, sort::SortKey, InfluxColumnType, InfluxFieldT
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{any::Any, collections::HashMap, sync::Arc};
use self::{
flight_client::{Error as FlightClientError, FlightClient, FlightClientImpl, FlightError},
test_util::MockIngesterConnection,
};
pub(crate) mod flight_client;
pub(crate) mod test_util;

View File

@ -5,13 +5,10 @@ authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2021"
description = "Tests of the query engine against different database configurations"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow = { version = "13", features = ["prettyprint"] }
async-trait = "0.1"
backoff = { path = "../backoff" }
data_types = { path = "../data_types" }
data_types2 = { path = "../data_types2" }
datafusion = { path = "../datafusion" }
dml = { path = "../dml" }

View File

@ -1,15 +1,13 @@
use crate::scenarios::*;
use arrow::datatypes::DataType;
use data_types::timestamp::{MAX_NANO_TIME, MIN_NANO_TIME};
use data_types2::{MAX_NANO_TIME, MIN_NANO_TIME};
use datafusion::logical_plan::{col, lit};
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use predicate::{rpc_predicate::InfluxRpcPredicate, PredicateBuilder};
use query::{
exec::fieldlist::{Field, FieldList},
frontend::influxrpc::InfluxRpcPlanner,
};
use crate::scenarios::*;
/// Creates and loads several database scenarios using the db_setup
/// function.
///

View File

@ -1,15 +1,13 @@
//! Tests for the Influx gRPC queries
use data_types::timestamp::{MAX_NANO_TIME, MIN_NANO_TIME};
use crate::scenarios::*;
use data_types2::{MAX_NANO_TIME, MIN_NANO_TIME};
use datafusion::logical_plan::{col, lit};
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use predicate::{rpc_predicate::InfluxRpcPredicate, PredicateBuilder};
use query::{
exec::stringset::{IntoStringSet, StringSetRef},
frontend::influxrpc::InfluxRpcPlanner,
};
use crate::scenarios::*;
/// runs table_names(predicate) and compares it to the expected
/// output
async fn run_table_names_test_case<D>(

View File

@ -1,14 +1,12 @@
use data_types::timestamp::{MAX_NANO_TIME, MIN_NANO_TIME};
use crate::scenarios::*;
use data_types2::{MAX_NANO_TIME, MIN_NANO_TIME};
use datafusion::logical_plan::{col, lit};
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use predicate::{rpc_predicate::InfluxRpcPredicate, PredicateBuilder};
use query::{
exec::stringset::{IntoStringSet, StringSetRef},
frontend::influxrpc::InfluxRpcPlanner,
};
use crate::scenarios::*;
/// Creates and loads several database scenarios using the db_setup
/// function.
///