fix: Move partition metadata types to data_types2

pull/24376/head
Carol (Nichols || Goulding) 2022-05-05 09:49:34 -04:00
parent afdff2b1db
commit d2671355c3
22 changed files with 1220 additions and 113 deletions
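
Most of the 22 files change only their import paths: statistics types such as `StatValues`, `Statistics`, `ColumnSummary`, `TableSummary`, `InfluxDbType`, `IsNan`, and `StatOverlap` move out of the `data_types::partition_metadata` module and are exported from the root of the `data_types2` crate. A minimal sketch of the migration as seen from a caller (the `summarize` helper is hypothetical, and the `StatValues::new_empty`/`update` calls assume that API survives the move unchanged; the diffs below only show imports):

```rust
// Before this commit:
//     use data_types::partition_metadata::{StatValues, Statistics};
// After it, the same types are exported from the crate root:
use data_types2::{StatValues, Statistics};

// Hypothetical caller: fold a column's values into min/max/count stats.
fn summarize(values: &[i64]) -> Statistics {
    let mut stats = StatValues::new_empty();
    for v in values {
        stats.update(v);
    }
    Statistics::I64(stats)
}
```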

Cargo.lock (generated)

@@ -1174,6 +1174,7 @@ version = "0.1.0"
 dependencies = [
  "data_types",
  "influxdb_line_protocol",
+ "observability_deps",
  "ordered-float 3.0.0",
  "percent-encoding",
  "schema",
@@ -4522,6 +4523,7 @@ dependencies = [
  "criterion",
  "croaring",
  "data_types",
+ "data_types2",
  "datafusion 0.1.0",
  "either",
  "hashbrown 0.12.0",


@@ -7,6 +7,7 @@ description = "Shared data types in the IOx NG architecture"
 [dependencies]
 data_types = { path = "../data_types" }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
+observability_deps = { path = "../observability_deps" }
 ordered-float = "3"
 percent-encoding = "2.1.0"
 schema = { path = "../schema" }

(File diff suppressed because it is too large)


@@ -12,11 +12,10 @@
 )]

 use data_types::{
-    partition_metadata::{StatValues, Statistics},
     router::{ShardConfig, ShardId},
     sequence::Sequence,
 };
-use data_types2::{DeletePredicate, NonEmptyString};
+use data_types2::{DeletePredicate, NonEmptyString, StatValues, Statistics};
 use hashbrown::HashMap;
 use iox_time::Time;
 use mutable_batch::MutableBatch;


@@ -1,23 +1,18 @@
 //! A [`Column`] stores the rows for a given column name
-use std::fmt::Formatter;
-use std::mem;
-use std::sync::Arc;
-use arrow::error::ArrowError;
 use arrow::{
     array::{
         ArrayDataBuilder, ArrayRef, BooleanArray, Float64Array, Int64Array,
         TimestampNanosecondArray, UInt64Array,
     },
     datatypes::DataType,
+    error::ArrowError,
 };
-use snafu::{ResultExt, Snafu};
-use arrow_util::bitset::BitSet;
-use arrow_util::string::PackedStringArray;
-use data_types::partition_metadata::{StatValues, Statistics};
+use arrow_util::{bitset::BitSet, string::PackedStringArray};
+use data_types2::{StatValues, Statistics};
 use schema::{InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE};
+use snafu::{ResultExt, Snafu};
+use std::{fmt::Formatter, mem, sync::Arc};

 /// A "dictionary ID" (DID) is a compact numeric representation of an interned
 /// string in the dictionary. The same string always maps the same DID.


@@ -3,7 +3,7 @@
 use crate::column::{Column, ColumnData, INVALID_DID};
 use crate::MutableBatch;
 use arrow_util::bitset::{iter_set_positions, iter_set_positions_with_offset, BitSet};
-use data_types::partition_metadata::{IsNan, StatValues, Statistics};
+use data_types2::{IsNan, StatValues, Statistics};
 use schema::{InfluxColumnType, InfluxFieldType};
 use snafu::Snafu;
 use std::num::NonZeroU64;


@@ -1,10 +1,8 @@
 use arrow_util::assert_batches_eq;
-use data_types::partition_metadata::{StatValues, Statistics};
-use mutable_batch::writer::Writer;
-use mutable_batch::MutableBatch;
+use data_types2::{StatValues, Statistics};
+use mutable_batch::{writer::Writer, MutableBatch};
 use schema::selection::Selection;
-use std::collections::BTreeMap;
-use std::num::NonZeroU64;
+use std::{collections::BTreeMap, num::NonZeroU64};

 #[test]
 fn test_extend() {


@@ -1,10 +1,8 @@
 use arrow_util::assert_batches_eq;
-use data_types::partition_metadata::{StatValues, Statistics};
-use mutable_batch::writer::Writer;
-use mutable_batch::MutableBatch;
+use data_types2::{StatValues, Statistics};
+use mutable_batch::{writer::Writer, MutableBatch};
 use schema::selection::Selection;
-use std::collections::BTreeMap;
-use std::num::NonZeroU64;
+use std::{collections::BTreeMap, num::NonZeroU64};

 #[test]
 fn test_extend_range() {


@@ -1,6 +1,6 @@
 use arrow_util::assert_batches_eq;
-use data_types::partition_metadata::{StatValues, Statistics};
 use data_types::write_summary::TimestampSummary;
+use data_types2::{StatValues, Statistics};
 use mutable_batch::writer::Writer;
 use mutable_batch::MutableBatch;
 use schema::selection::Selection;


@@ -1,3 +1,16 @@
+use arrow::{
+    array::{
+        ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, TimestampNanosecondArray,
+        UInt64Array,
+    },
+    record_batch::RecordBatch,
+};
+use arrow_util::bitset::BitSet;
+use data_types2::{IsNan, PartitionTemplate, StatValues, Statistics, TemplatePart};
+use hashbrown::HashSet;
+use mutable_batch::{writer::Writer, MutableBatch, PartitionWrite, WritePayload};
+use rand::prelude::*;
+use schema::selection::Selection;
 /// A fuzz test of the [`mutable_batch::Writer`] interface:
 ///
 /// - column writes - `write_i64`, `write_tag`, etc...
@@ -5,25 +18,7 @@
 /// - batch writes with ranges - `write_batch_ranges`
 ///
 /// Verifies that the rows and statistics are as expected after a number of interleaved writes
-use std::collections::BTreeMap;
-use std::num::NonZeroU64;
-use std::ops::Range;
-use std::sync::Arc;
-use arrow::array::{
-    ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, TimestampNanosecondArray,
-    UInt64Array,
-};
-use arrow::record_batch::RecordBatch;
-use hashbrown::HashSet;
-use rand::prelude::*;
-use arrow_util::bitset::BitSet;
-use data_types::partition_metadata::{IsNan, StatValues, Statistics};
-use data_types2::{PartitionTemplate, TemplatePart};
-use mutable_batch::writer::Writer;
-use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
-use schema::selection::Selection;
+use std::{collections::BTreeMap, num::NonZeroU64, ops::Range, sync::Arc};

 fn make_rng() -> StdRng {
     let seed = rand::rngs::OsRng::default().next_u64();


@@ -3,17 +3,14 @@ use crate::{
     storage::Storage,
     ParquetFilePath,
 };
-use data_types::{
-    partition_metadata::{Statistics, TableSummary},
-    timestamp::{TimestampMinMax, TimestampRange},
-};
-use data_types2::{ParquetFile, ParquetFileWithMetadata};
+use data_types2::{
+    ParquetFile, ParquetFileWithMetadata, Statistics, TableSummary, TimestampMinMax, TimestampRange,
+};
 use datafusion::physical_plan::SendableRecordBatchStream;
 use object_store::DynObjectStore;
 use observability_deps::tracing::*;
 use predicate::Predicate;
-use schema::selection::Selection;
-use schema::{Schema, TIME_COLUMN_NAME};
+use schema::{selection::Selection, Schema, TIME_COLUMN_NAME};
 use snafu::{ResultExt, Snafu};
 use std::{collections::BTreeSet, mem, sync::Arc};


@@ -86,9 +86,9 @@
 //! [Apache Parquet]: https://parquet.apache.org/
 //! [Apache Thrift]: https://thrift.apache.org/
 //! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
-use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
 use data_types2::{
-    NamespaceId, ParquetFileParams, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp,
+    ColumnSummary, InfluxDbType, NamespaceId, ParquetFileParams, PartitionId, SequenceNumber,
+    SequencerId, StatValues, Statistics, TableId, Timestamp,
 };
 use generated_types::influxdata::iox::ingester::v1 as proto;
 use iox_time::Time;
@@ -106,8 +106,10 @@ use parquet::{
     schema::types::SchemaDescriptor as ParquetSchemaDescriptor,
 };
 use prost::Message;
-use schema::sort::{SortKey, SortKeyBuilder};
-use schema::{InfluxColumnType, InfluxFieldType, Schema};
+use schema::{
+    sort::{SortKey, SortKeyBuilder},
+    InfluxColumnType, InfluxFieldType, Schema,
+};
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{convert::TryInto, sync::Arc};
 use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};


@@ -1,7 +1,8 @@
 use std::sync::Arc;

-use data_types::timestamp::TimestampMinMax;
-use data_types2::{ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
+use data_types2::{
+    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
+};
 use observability_deps::tracing::debug;
 use predicate::PredicateMatch;
 use query::{QueryChunk, QueryChunkError, QueryChunkMeta};


@@ -1,21 +1,18 @@
-//! Contains the algorithm to determine which chunks may contain
-//! "duplicate" primary keys (that is where data with the same
-//! combination of "tag" columns and timestamp in the InfluxDB
-//! DataModel have been written in via multiple distinct line protocol
-//! writes (and thus are stored in separate rows)
-use data_types::{
-    partition_metadata::{ColumnSummary, StatOverlap, Statistics},
-    timestamp::TimestampMinMax,
-};
-use data_types2::{DeletePredicate, ParquetFileWithMetadata, PartitionId, TableSummary};
+//! Contains the algorithm to determine which chunks may contain "duplicate" primary keys (that is
+//! where data with the same combination of "tag" columns and timestamp in the InfluxDB DataModel
+//! have been written in via multiple distinct line protocol writes (and thus are stored in
+//! separate rows)
+use crate::{QueryChunk, QueryChunkMeta};
+use data_types2::{
+    ColumnSummary, DeletePredicate, ParquetFileWithMetadata, PartitionId, StatOverlap, Statistics,
+    TableSummary, TimestampMinMax,
+};
 use observability_deps::tracing::debug;
 use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
 use snafu::Snafu;
 use std::{cmp::Ordering, sync::Arc};
-use crate::{QueryChunk, QueryChunkMeta};

 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display(
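
The rewrapped doc comment above describes the check these imports feed: two chunks can only hold duplicate primary keys if the min/max statistics of their key columns intersect. A hedged illustration of that range test, written for this page against the public `StatValues` fields and the three-valued `StatOverlap` seen in the imports (the crate's own helper may differ):

```rust
use data_types2::{StatOverlap, StatValues};

// Hypothetical helper: ranges [a.min, a.max] and [b.min, b.max] overlap
// unless one lies entirely before the other; absent statistics force the
// conservative `Unknown` answer.
fn ranges_overlap<T: PartialOrd>(a: &StatValues<T>, b: &StatValues<T>) -> StatOverlap {
    match (&a.min, &a.max, &b.min, &b.max) {
        (Some(a_min), Some(a_max), Some(b_min), Some(b_max)) => {
            if a_max < b_min || b_max < a_min {
                StatOverlap::Zero // disjoint ranges: no shared keys possible
            } else {
                StatOverlap::NonZero
            }
        }
        _ => StatOverlap::Unknown, // missing stats: must assume overlap
    }
}
```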


@@ -1,9 +1,9 @@
 //! Implementation of a DataFusion PhysicalPlan node across partition chunks
-use std::{fmt, sync::Arc};
+use super::adapter::SchemaAdapterStream;
+use crate::{exec::IOxSessionContext, QueryChunk};
 use arrow::datatypes::SchemaRef;
-use data_types::partition_metadata::TableSummary;
+use data_types2::TableSummary;
 use datafusion::{
     error::DataFusionError,
     execution::context::TaskContext,
@@ -14,13 +14,9 @@ use datafusion::{
     },
 };
 use observability_deps::tracing::debug;
-use schema::selection::Selection;
-use schema::Schema;
-use crate::{exec::IOxSessionContext, QueryChunk};
 use predicate::Predicate;
+use schema::{selection::Selection, Schema};
-use super::adapter::SchemaAdapterStream;
+use std::{fmt, sync::Arc};

 /// Implements the DataFusion physical plan interface
 #[derive(Debug)]


@@ -1,23 +1,22 @@
 //! Implementation of statistics based pruning
-use std::sync::Arc;
-use arrow::array::{
-    ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray, UInt64Array,
-};
-use arrow::datatypes::{DataType, Int32Type, TimeUnit};
-use data_types::partition_metadata::{StatValues, Statistics};
+use crate::QueryChunk;
+use arrow::{
+    array::{
+        ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray, UInt64Array,
+    },
+    datatypes::{DataType, Int32Type, TimeUnit},
+};
+use data_types2::{StatValues, Statistics};
 use datafusion::{
     logical_plan::Column,
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
 };
 use observability_deps::tracing::{debug, trace};
 use predicate::Predicate;
-use schema::Schema;
-use crate::QueryChunk;
 use query_functions::group_by::Aggregate;
+use schema::Schema;
+use std::sync::Arc;

 /// Something that cares to be notified when pruning of chunks occurs
 pub trait PruningObserver {


@@ -1,8 +1,6 @@
 //! Code to translate IOx statistics to DataFusion statistics
-use data_types::partition_metadata::{
-    ColumnSummary, InfluxDbType, Statistics as IOxStatistics, TableSummary,
-};
+use data_types2::{ColumnSummary, InfluxDbType, Statistics as IOxStatistics, TableSummary};
 use datafusion::{
     physical_plan::{ColumnStatistics, Statistics as DFStatistics},
     scalar::ScalarValue,
@@ -107,11 +105,10 @@ fn df_from_iox_col(col: &ColumnSummary) -> ColumnStatistics {
 #[cfg(test)]
 mod test {
-    use std::num::NonZeroU64;
     use super::*;
-    use data_types::partition_metadata::{InfluxDbType, StatValues};
+    use data_types2::{InfluxDbType, StatValues};
     use schema::{builder::SchemaBuilder, InfluxFieldType};
+    use std::num::NonZeroU64;

     macro_rules! assert_nice_eq {
         ($actual:ident, $expected:ident) => {


@@ -283,7 +283,7 @@ impl TestChunk {
         Self {
             table_name,
             schema: Arc::new(SchemaBuilder::new().build().unwrap()),
-            table_summary: TableSummary::new(),
+            table_summary: TableSummary::default(),
             id: ChunkId::new_test(0),
             may_contain_pk_duplicates: Default::default(),
             predicates: Default::default(),
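
The one non-import change here: the test fixture now builds its summary with `TableSummary::default()` rather than `TableSummary::new()`, suggesting the moved type derives `Default` in place of a hand-written constructor. A sketch of what that implies (the field layout is an assumption; this diff shows only the call site):

```rust
use data_types2::ColumnSummary;

// Assumed shape: deriving Default replaces a hand-written `new()`.
#[derive(Debug, Default)]
pub struct TableSummary {
    pub columns: Vec<ColumnSummary>,
}

fn empty_summary() -> TableSummary {
    // What the updated call site relies on: an empty summary, no columns yet.
    TableSummary::default()
}
```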


@@ -15,6 +15,7 @@ arrow = { version = "13", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 croaring = "0.6"
 data_types = { path = "../data_types" }
+data_types2 = { path = "../data_types2" }
 datafusion = { path = "../datafusion" }
 either = "1.6.1"
 hashbrown = "0.12"


@@ -5,11 +5,10 @@ use crate::{
     table::{self, Table},
 };
 use arrow::{error::ArrowError, record_batch::RecordBatch};
-use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
+use data_types::chunk_metadata::ChunkColumnSummary;
+use data_types2::TableSummary;
 use observability_deps::tracing::debug;
-use schema::selection::Selection;
-use schema::{builder::Error as SchemaError, Schema};
+use schema::{builder::Error as SchemaError, selection::Selection, Schema};
 use snafu::{ResultExt, Snafu};
 use std::{
     collections::{BTreeMap, BTreeSet},
@@ -527,11 +526,10 @@ mod test {
             Int32Type,
         },
     };
-    use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
+    use data_types2::{ColumnSummary, InfluxDbType, StatValues, Statistics};
     use metric::{Attributes, MetricKind, Observation, ObservationSet, RawReporter};
     use schema::builder::SchemaBuilder;
-    use std::iter::FromIterator;
-    use std::{num::NonZeroU64, sync::Arc};
+    use std::{iter::FromIterator, num::NonZeroU64, sync::Arc};

     // helper to make the `add_remove_tables` test simpler to read.
     fn gen_recordbatch() -> RecordBatch {


@@ -241,8 +241,8 @@ impl ColumnType {
         }
     }

-    pub fn as_influxdb_type(&self) -> Option<data_types::partition_metadata::InfluxDbType> {
-        use data_types::partition_metadata::InfluxDbType;
+    pub fn as_influxdb_type(&self) -> Option<data_types2::InfluxDbType> {
+        use data_types2::InfluxDbType;
         match self {
             Self::Tag(_) => Some(InfluxDbType::Tag),
             Self::Field(_) => Some(InfluxDbType::Field),


@@ -6,7 +6,8 @@ use crate::{
     BinaryExpr,
 };
 use arrow::record_batch::RecordBatch;
-use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
+use data_types::chunk_metadata::ChunkColumnSummary;
+use data_types2::TableSummary;
 use parking_lot::RwLock;
 use schema::selection::Selection;
 use snafu::{ensure, Snafu};
@@ -782,7 +783,7 @@ impl MetaData {
     }

     pub fn to_summary(&self) -> TableSummary {
-        use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics};
+        use data_types2::{ColumnSummary, StatValues, Statistics};
         let columns = self
             .columns
             .iter()
@@ -850,8 +851,8 @@ impl MetaData {
 fn make_null_stats(
     total_count: u64,
     logical_data_type: &LogicalDataType,
-) -> data_types::partition_metadata::Statistics {
-    use data_types::partition_metadata::{StatValues, Statistics};
+) -> data_types2::Statistics {
+    use data_types2::{StatValues, Statistics};
     use LogicalDataType::*;

     match logical_data_type {
@@ -1103,9 +1104,6 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> {
 #[cfg(test)]
 mod test {
-    use arrow::array::BooleanArray;
-    use data_types::partition_metadata::{StatValues, Statistics};
     use super::*;
     use crate::{
         column::Column,
@@ -1113,6 +1111,8 @@ mod test {
         schema::{self, LogicalDataType},
         value::{AggregateVec, OwnedValue, Scalar},
     };
+    use arrow::array::BooleanArray;
+    use data_types2::{StatValues, Statistics};

     #[test]
     fn meta_data_update_with() {
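
For context, `make_null_stats` (whose signature changes above) manufactures all-null column statistics for a given logical type. A plausible body under the new imports, assuming a `StatValues::new_all_null(total_count, distinct_count)` constructor exists after the move and mirroring `LogicalDataType` locally so the sketch is self-contained:

```rust
use data_types2::{StatValues, Statistics};

// Mirrored here for the sketch; the real enum lives in the read buffer.
enum LogicalDataType {
    Integer,
    Unsigned,
    Float,
    String,
    Boolean,
}

fn make_null_stats(total_count: u64, logical_data_type: &LogicalDataType) -> Statistics {
    use LogicalDataType::*;
    match logical_data_type {
        Integer => Statistics::I64(StatValues::new_all_null(total_count, None)),
        Unsigned => Statistics::U64(StatValues::new_all_null(total_count, None)),
        Float => Statistics::F64(StatValues::new_all_null(total_count, None)),
        // An all-null string column has at most one distinct value: NULL.
        String => Statistics::String(StatValues::new_all_null(total_count, Some(1))),
        Boolean => Statistics::Bool(StatValues::new_all_null(total_count, None)),
    }
}
```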