diff --git a/Cargo.lock b/Cargo.lock
index f96426c444..661f94a1d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -747,11 +747,7 @@ dependencies = [
 name = "data_types"
 version = "0.1.0"
 dependencies = [
- "arrow_deps",
  "chrono",
- "crc32fast",
- "criterion",
- "flatbuffers 0.6.1",
  "generated_types",
  "influxdb_line_protocol",
  "percent-encoding",
@@ -1429,6 +1425,7 @@ dependencies = [
  "influxdb_line_protocol",
  "influxdb_tsm",
  "ingest",
+ "internal_types",
  "logfmt",
  "mem_qe",
  "mutable_buffer",
@@ -1516,10 +1513,10 @@ name = "ingest"
 version = "0.1.0"
 dependencies = [
  "arrow_deps",
- "data_types",
  "flate2",
  "influxdb_line_protocol",
  "influxdb_tsm",
+ "internal_types",
  "packers",
  "parking_lot",
  "snafu",
@@ -1542,6 +1539,22 @@ version = "1.1.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f"

+[[package]]
+name = "internal_types"
+version = "0.1.0"
+dependencies = [
+ "arrow_deps",
+ "chrono",
+ "crc32fast",
+ "criterion",
+ "data_types",
+ "flatbuffers 0.6.1",
+ "generated_types",
+ "influxdb_line_protocol",
+ "snafu",
+ "tracing",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.3.0"
@@ -1833,6 +1846,7 @@ dependencies = [
  "flatbuffers 0.6.1",
  "generated_types",
  "influxdb_line_protocol",
+ "internal_types",
  "snafu",
  "string-interner",
  "test_helpers",
@@ -2151,9 +2165,9 @@ name = "packers"
 version = "0.1.0"
 dependencies = [
  "arrow_deps",
- "data_types",
  "human_format",
  "influxdb_tsm",
+ "internal_types",
  "rand 0.8.3",
  "snafu",
  "test_helpers",
@@ -2507,6 +2521,7 @@ dependencies = [
  "data_types",
  "futures",
  "influxdb_line_protocol",
+ "internal_types",
  "parking_lot",
  "snafu",
  "sqlparser",
@@ -2654,9 +2669,9 @@ dependencies = [
  "arrow_deps",
  "criterion",
  "croaring",
- "data_types",
  "either",
  "hashbrown",
+ "internal_types",
  "itertools 0.9.0",
  "packers",
  "permutation",
@@ -3125,6 +3140,7 @@ dependencies = [
  "generated_types",
  "hashbrown",
  "influxdb_line_protocol",
+ "internal_types",
  "mutable_buffer",
  "object_store",
  "parking_lot",
diff --git a/Cargo.toml b/Cargo.toml
index 5b70053332..89dc2b5d29 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,6 +4,7 @@ version = "0.1.0"
 authors = ["Paul Dix "]
 edition = "2018"
 default-run = "influxdb_iox"
+readme = "README.md"

 [workspace] # In alphabetical order
 members = [
@@ -16,6 +17,7 @@ members = [
     "influxdb_tsm",
     "influxdb2_client",
     "ingest",
+    "internal_types",
     "logfmt",
     "mem_qe",
     "mutable_buffer",
@@ -43,6 +45,7 @@ generated_types = { path = "generated_types" }
 influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] }
 influxdb_line_protocol = { path = "influxdb_line_protocol" }
 influxdb_tsm = { path = "influxdb_tsm" }
+internal_types = { path = "internal_types" }
 ingest = { path = "ingest" }
 logfmt = { path = "logfmt" }
 mem_qe = { path = "mem_qe" }
diff --git a/benches/line_protocol_to_parquet.rs b/benches/line_protocol_to_parquet.rs
index bc416d6980..93160bf438 100644
--- a/benches/line_protocol_to_parquet.rs
+++ b/benches/line_protocol_to_parquet.rs
@@ -1,5 +1,4 @@
 use criterion::{criterion_group, criterion_main, Criterion, Throughput};
-use data_types::schema::Schema;
 use influxdb_line_protocol::parse_lines;
 use ingest::{
     parquet::{
@@ -8,6 +7,7 @@ use ingest::{
     },
     ConversionSettings, LineProtocolConverter,
 };
+use internal_types::schema::Schema;
 use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource};
 use std::time::Duration;
diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml
index f30aba1fc5..8c99f52135 100644
--- a/data_types/Cargo.toml
+++ b/data_types/Cargo.toml
@@ -2,13 +2,12 @@
 name = "data_types"
 version = "0.1.0"
 authors = ["pauldix "]
+description = "InfluxDB IOx data_types, shared between IOx instances and IOx clients"
 edition = "2018"
+readme = "README.md"

 [dependencies] # In alphabetical order
-arrow_deps = { path = "../arrow_deps" }
 chrono = { version = "0.4", features = ["serde"] }
-crc32fast = "1.2.0"
-flatbuffers = "0.6" # TODO: Update to 0.8
 generated_types = { path = "../generated_types" }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
 percent-encoding = "2.1.0"
@@ -21,9 +20,4 @@ tonic = { version = "0.4.0" }
 tracing = "0.1"

 [dev-dependencies] # In alphabetical order
-criterion = "0.3"
 test_helpers = { path = "../test_helpers" }
-
-[[bench]]
-name = "benchmark"
-harness = false
diff --git a/data_types/README.md b/data_types/README.md
new file mode 100644
index 0000000000..2911ec8088
--- /dev/null
+++ b/data_types/README.md
@@ -0,0 +1,5 @@
+# Data Types
+
+This crate contains types that are designed for external consumption (in `influxdb_iox_client` and other "client" facing uses).
+
+*Client facing* in this case means exposed via management API or CLI and where changing the structs may require additional coordination / organization with clients.
diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index 574feb3c27..1d0333bf34 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -11,26 +11,14 @@
 )]

 pub use database_name::*;
-pub use schema::TIME_COLUMN_NAME;
-
-/// The name of the column containing table names returned by a call to
-/// `table_names`.
-pub const TABLE_NAMES_COLUMN_NAME: &str = "table";
-
-/// The name of the column containing column names returned by a call to
-/// `column_names`.
-pub const COLUMN_NAMES_COLUMN_NAME: &str = "column";

 pub mod chunk;
-pub mod data;
 pub mod database_rules;
 pub mod error;
 pub mod http;
 pub mod job;
 pub mod names;
 pub mod partition_metadata;
-pub mod schema;
-pub mod selection;
 pub mod timestamp;
 pub mod wal;
diff --git a/ingest/Cargo.toml b/ingest/Cargo.toml
index b43ce27c94..743a283a24 100644
--- a/ingest/Cargo.toml
+++ b/ingest/Cargo.toml
@@ -6,9 +6,9 @@ edition = "2018"

 [dependencies] # In alphabetical order
 arrow_deps = { path = "../arrow_deps" }
-data_types = { path = "../data_types" }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
 influxdb_tsm = { path = "../influxdb_tsm" }
+internal_types = { path = "../internal_types" }
 packers = { path = "../packers" }
 snafu = "0.6.2"
 tracing = "0.1"
diff --git a/ingest/src/lib.rs b/ingest/src/lib.rs
index 637bdf7c58..a07b7187d5 100644
--- a/ingest/src/lib.rs
+++ b/ingest/src/lib.rs
@@ -11,16 +11,15 @@
     clippy::clone_on_ref_ptr
 )]

-use data_types::{
-    schema::{builder::InfluxSchemaBuilder, InfluxFieldType, Schema},
-    TIME_COLUMN_NAME,
-};
 use influxdb_line_protocol::{FieldValue, ParsedLine};
 use influxdb_tsm::{
     mapper::{ColumnData, MeasurementTable, TSMMeasurementMapper},
     reader::{BlockDecoder, TSMBlockReader, TSMIndexReader},
     BlockType, TSMError,
 };
+use internal_types::schema::{
+    builder::InfluxSchemaBuilder, InfluxFieldType, Schema, TIME_COLUMN_NAME,
+};
 use packers::{
     ByteArray, Error as TableError, IOxTableWriter, IOxTableWriterSource, Packer, Packers,
 };
@@ -75,7 +74,7 @@ pub enum Error {
     #[snafu(display(r#"Error building schema: {}"#, source))]
     BuildingSchema {
-        source: data_types::schema::builder::Error,
+        source: internal_types::schema::builder::Error,
     },

     #[snafu(display(r#"Error writing to TableWriter: {}"#, source))]
@@ -96,8 +95,8 @@ pub enum Error {
     CouldNotFindColumn,
 }

-impl From<data_types::schema::builder::Error> for Error {
-    fn from(source: data_types::schema::builder::Error) -> Self {
+impl From<internal_types::schema::builder::Error> for Error {
+    fn from(source: internal_types::schema::builder::Error) -> Self {
         Self::BuildingSchema { source }
     }
 }
@@ -820,7 +819,8 @@ impl TSMFileConverter {
         mut block_reader: impl BlockDecoder,
         m: &mut MeasurementTable,
     ) -> Result<(Schema, Vec<Packers>), Error> {
-        let mut builder = data_types::schema::builder::SchemaBuilder::new().measurement(&m.name);
+        let mut builder =
+            internal_types::schema::builder::SchemaBuilder::new().measurement(&m.name);
         let mut packed_columns: Vec<Packers> = Vec::new();

         let mut tks = Vec::new();
@@ -1099,11 +1099,11 @@ impl std::fmt::Debug for TSMFileConverter {

 #[cfg(test)]
 mod tests {
     use super::*;
-    use data_types::{assert_column_eq, schema::InfluxColumnType};
     use influxdb_tsm::{
         reader::{BlockData, MockBlockDecoder},
         Block,
     };
+    use internal_types::{assert_column_eq, schema::InfluxColumnType};
     use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource, Packers};
     use test_helpers::approximately_equal;
diff --git a/ingest/src/parquet/writer.rs b/ingest/src/parquet/writer.rs
index cfbfaa3f65..ab5046fd7c 100644
--- a/ingest/src/parquet/writer.rs
+++ b/ingest/src/parquet/writer.rs
@@ -9,7 +9,7 @@ use arrow_deps::parquet::{
     },
     schema::types::{ColumnPath, Type},
 };
-use data_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
+use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
 use parquet::file::writer::ParquetWriter;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{
@@ -97,7 +97,7 @@ where
     ///
     /// ```
     /// # use std::fs;
-    /// # use data_types::schema::{builder::SchemaBuilder, InfluxFieldType};
+    /// # use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType};
     /// # use packers::IOxTableWriter;
     /// # use packers::{Packer, Packers};
     /// # use ingest::parquet::writer::{IOxParquetTableWriter, CompressionLevel};
@@ -505,7 +505,7 @@ fn create_writer_props(
 #[cfg(test)]
 mod tests {
-    use data_types::schema::builder::SchemaBuilder;
+    use internal_types::schema::builder::SchemaBuilder;

     use super::*;
diff --git a/ingest/tests/read_write.rs b/ingest/tests/read_write.rs
index 9c4f0aa78e..5ecf815291 100644
--- a/ingest/tests/read_write.rs
+++ b/ingest/tests/read_write.rs
@@ -1,5 +1,5 @@
-use data_types::schema::{builder::SchemaBuilder, InfluxFieldType};
 use ingest::parquet::writer::{CompressionLevel, IOxParquetTableWriter};
+use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType};
 use packers::{IOxTableWriter, Packer, Packers};

 use arrow_deps::parquet::data_type::ByteArray;
diff --git a/internal_types/Cargo.toml b/internal_types/Cargo.toml
new file mode 100644
index 0000000000..799f27a2d1
--- /dev/null
+++ b/internal_types/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "internal_types"
+version = "0.1.0"
+authors = ["Andrew Lamb "]
+edition = "2018"
+description = "InfluxDB IOx internal types, shared between IOx instances"
+readme = "README.md"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+arrow_deps = { path = "../arrow_deps" }
+crc32fast = "1.2.0"
+chrono = { version = "0.4", features = ["serde"] }
+data_types = { path = "../data_types" }
+flatbuffers = "0.6" # TODO: Update to 0.8
+generated_types = { path = "../generated_types" }
+influxdb_line_protocol = { path = "../influxdb_line_protocol" }
+snafu = "0.6"
+tracing = "0.1"
+
+[dev-dependencies] # In alphabetical order
+criterion = "0.3"
+
+[[bench]]
+name = "benchmark"
+harness = false
diff --git a/internal_types/README.md b/internal_types/README.md
new file mode 100644
index 0000000000..5ee9876ad8
--- /dev/null
+++ b/internal_types/README.md
@@ -0,0 +1,7 @@
+# Internal Types
+
+This crate contains InfluxDB IOx "internal" types which are shared
+across crates and internally between IOx instances, but not exposed
+externally to clients.
+
+*Internal* in this case means that changing the structs is designed not to require additional coordination / organization with clients.
diff --git a/data_types/benches/benchmark.rs b/internal_types/benches/benchmark.rs
similarity index 99%
rename from data_types/benches/benchmark.rs
rename to internal_types/benches/benchmark.rs
index 9914d41e57..7ec4db8a32 100644
--- a/data_types/benches/benchmark.rs
+++ b/internal_types/benches/benchmark.rs
@@ -1,9 +1,9 @@
 use criterion::measurement::WallTime;
 use criterion::{criterion_group, criterion_main, Bencher, BenchmarkId, Criterion, Throughput};
-use data_types::data::{lines_to_replicated_write as lines_to_rw, ReplicatedWrite};
 use data_types::database_rules::{DatabaseRules, PartitionTemplate, TemplatePart};
 use generated_types::wal as wb;
 use influxdb_line_protocol::{parse_lines, ParsedLine};
+use internal_types::data::{lines_to_replicated_write as lines_to_rw, ReplicatedWrite};
 use std::collections::{BTreeMap, BTreeSet};
 use std::fmt;
 use std::time::Duration;
diff --git a/data_types/src/data.rs b/internal_types/src/data.rs
similarity index 99%
rename from data_types/src/data.rs
rename to internal_types/src/data.rs
index 20bd1c66b1..d7457d0f2e 100644
--- a/data_types/src/data.rs
+++ b/internal_types/src/data.rs
@@ -1,8 +1,8 @@
 //! This module contains helper methods for constructing replicated writes
 //! based on `DatabaseRules`.

-use crate::database_rules::Partitioner;
-use crate::TIME_COLUMN_NAME;
+use crate::schema::TIME_COLUMN_NAME;
+use data_types::database_rules::Partitioner;
 use generated_types::wal as wb;
 use influxdb_line_protocol::{FieldValue, ParsedLine};
diff --git a/internal_types/src/lib.rs b/internal_types/src/lib.rs
new file mode 100644
index 0000000000..ecaf7f6243
--- /dev/null
+++ b/internal_types/src/lib.rs
@@ -0,0 +1,11 @@
+#![deny(rust_2018_idioms)]
+#![warn(
+    missing_debug_implementations,
+    clippy::explicit_iter_loop,
+    clippy::use_self,
+    clippy::clone_on_ref_ptr
+)]
+
+pub mod data;
+pub mod schema;
+pub mod selection;
diff --git a/data_types/src/schema.rs b/internal_types/src/schema.rs
similarity index 100%
rename from data_types/src/schema.rs
rename to internal_types/src/schema.rs
diff --git a/data_types/src/schema/builder.rs b/internal_types/src/schema/builder.rs
similarity index 99%
rename from data_types/src/schema/builder.rs
rename to internal_types/src/schema/builder.rs
index 8a04f044c5..8746d3ec93 100644
--- a/data_types/src/schema/builder.rs
+++ b/internal_types/src/schema/builder.rs
@@ -140,7 +140,7 @@ impl SchemaBuilder {
     /// schema validation happens at this time.
     /// ```
-    /// use data_types::schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
+    /// use internal_types::schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
     ///
     /// let schema = SchemaBuilder::new()
     ///     .tag("region")
diff --git a/data_types/src/selection.rs b/internal_types/src/selection.rs
similarity index 100%
rename from data_types/src/selection.rs
rename to internal_types/src/selection.rs
diff --git a/mutable_buffer/Cargo.toml b/mutable_buffer/Cargo.toml
index 638a62c2f9..efd29c249a 100644
--- a/mutable_buffer/Cargo.toml
+++ b/mutable_buffer/Cargo.toml
@@ -20,6 +20,7 @@ chrono = "0.4"
 data_types = { path = "../data_types" }
 flatbuffers = "0.6" # TODO: Update to 0.8
 generated_types = { path = "../generated_types" }
+internal_types = { path = "../internal_types" }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
 snafu = "0.6.2"
 string-interner = "0.12.2"
diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs
index 20713c8c01..6f4ebc1a27 100644
--- a/mutable_buffer/src/chunk.rs
+++ b/mutable_buffer/src/chunk.rs
@@ -9,7 +9,8 @@ use chrono::{DateTime, Utc};
 use generated_types::wal as wb;
 use std::collections::{BTreeSet, HashMap};

-use data_types::{partition_metadata::TableSummary, schema::Schema, selection::Selection};
+use data_types::partition_metadata::TableSummary;
+use internal_types::{schema::Schema, selection::Selection};

 use crate::{
     column::Column,
diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs
index d10c6e2c78..20ceafc5d3 100644
--- a/mutable_buffer/src/column.rs
+++ b/mutable_buffer/src/column.rs
@@ -2,9 +2,9 @@ use generated_types::wal as wb;
 use snafu::Snafu;

 use crate::dictionary::Dictionary;
-use data_types::{data::type_description, partition_metadata::StatValues};
-
 use arrow_deps::arrow::datatypes::DataType as ArrowDataType;
+use data_types::partition_metadata::StatValues;
+use internal_types::data::type_description;

 use std::mem;
diff --git a/mutable_buffer/src/database.rs b/mutable_buffer/src/database.rs
index 1e095f52ff..51fe91e796 100644
--- a/mutable_buffer/src/database.rs
+++ b/mutable_buffer/src/database.rs
@@ -1,8 +1,6 @@
-use data_types::{
-    data::ReplicatedWrite,
-    database_rules::{PartitionSort, PartitionSortRules},
-};
+use data_types::database_rules::{PartitionSort, PartitionSortRules};
 use generated_types::wal;
+use internal_types::data::ReplicatedWrite;

 use crate::{chunk::Chunk, partition::Partition};
@@ -249,12 +247,10 @@
 mod tests {
     use super::*;
     use chrono::{DateTime, Utc};
-    use data_types::{
-        data::lines_to_replicated_write, database_rules::Partitioner, selection::Selection,
-    };
+    use data_types::database_rules::{Order, Partitioner};
+    use internal_types::{data::lines_to_replicated_write, selection::Selection};

     use arrow_deps::arrow::array::{Array, StringArray};
-    use data_types::database_rules::Order;
     use influxdb_line_protocol::{parse_lines, ParsedLine};

     type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
diff --git a/mutable_buffer/src/partition.rs b/mutable_buffer/src/partition.rs
index f050199cae..92797c92ba 100644
--- a/mutable_buffer/src/partition.rs
+++ b/mutable_buffer/src/partition.rs
@@ -302,10 +302,8 @@ impl<'a> Iterator for ChunkIter<'a> {
 mod tests {
     use super::*;
     use chrono::Utc;
-    use data_types::{
-        data::split_lines_into_write_entry_partitions, partition_metadata::PartitionSummary,
-        selection::Selection,
-    };
+    use data_types::partition_metadata::PartitionSummary;
+    use internal_types::{data::split_lines_into_write_entry_partitions, selection::Selection};

     use arrow_deps::{
         arrow::record_batch::RecordBatch, assert_table_eq, test_util::sort_record_batch,
diff --git a/mutable_buffer/src/pred.rs b/mutable_buffer/src/pred.rs
index 869840e302..24a7db3074 100644
--- a/mutable_buffer/src/pred.rs
+++ b/mutable_buffer/src/pred.rs
@@ -10,7 +10,8 @@ use arrow_deps::{
     },
     util::{make_range_expr, AndExprBuilder},
 };
-use data_types::{timestamp::TimestampRange, TIME_COLUMN_NAME};
+use data_types::timestamp::TimestampRange;
+use internal_types::schema::TIME_COLUMN_NAME;

 //use snafu::{OptionExt, ResultExt, Snafu};
 use snafu::{ensure, ResultExt, Snafu};
diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs
index 43d275ab6f..e10d76d09f 100644
--- a/mutable_buffer/src/table.rs
+++ b/mutable_buffer/src/table.rs
@@ -12,12 +12,12 @@ use crate::{
     dictionary::{Dictionary, Error as DictionaryError},
     pred::{ChunkIdSet, ChunkPredicate},
 };
-use data_types::{
-    partition_metadata::{ColumnSummary, Statistics},
-    schema::{builder::SchemaBuilder, Schema},
+use data_types::partition_metadata::{ColumnSummary, Statistics};
+use internal_types::{
+    schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME},
     selection::Selection,
-    TIME_COLUMN_NAME,
 };
+
 use snafu::{OptionExt, ResultExt, Snafu};

 use arrow_deps::{
@@ -84,7 +84,7 @@ pub enum Error {
     #[snafu(display("Internal error converting schema: {}", source))]
     InternalSchema {
-        source: data_types::schema::builder::Error,
+        source: internal_types::schema::builder::Error,
     },

     #[snafu(display(
@@ -597,8 +597,8 @@ impl<'a> TableColSelection<'a> {
 #[cfg(test)]
 mod tests {
-    use data_types::data::split_lines_into_write_entry_partitions;
     use influxdb_line_protocol::{parse_lines, ParsedLine};
+    use internal_types::data::split_lines_into_write_entry_partitions;

     use super::*;
diff --git a/packers/Cargo.toml b/packers/Cargo.toml
index fa9b92124c..627ac781b6 100644
--- a/packers/Cargo.toml
+++ b/packers/Cargo.toml
@@ -6,9 +6,9 @@ edition = "2018"

 [dependencies] # In alphabetical order
 arrow_deps = { path = "../arrow_deps" }
-data_types = { path = "../data_types" }
 human_format = "1.0.3"
 influxdb_tsm = { path = "../influxdb_tsm" }
+internal_types = { path = "../internal_types" }
 snafu = "0.6.2"
 tracing = "0.1"
diff --git a/packers/src/lib.rs b/packers/src/lib.rs
index e3197acf80..5a82ec9bdb 100644
--- a/packers/src/lib.rs
+++ b/packers/src/lib.rs
@@ -15,7 +15,7 @@ use snafu::Snafu;
 pub use crate::packers::{Packer, Packers};
 pub use arrow_deps::parquet::data_type::ByteArray;
-use data_types::schema::Schema;
+use internal_types::schema::Schema;

 use std::borrow::Cow;
diff --git a/packers/src/packers.rs b/packers/src/packers.rs
index 59616cdf18..64d9767943 100644
--- a/packers/src/packers.rs
+++ b/packers/src/packers.rs
@@ -10,7 +10,7 @@ use std::iter;
 use std::slice::Chunks;

 use arrow_deps::parquet::data_type::ByteArray;
-use data_types::schema::{InfluxColumnType, InfluxFieldType};
+use internal_types::schema::{InfluxColumnType, InfluxFieldType};
 use std::default::Default;

 // NOTE: See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
diff --git a/query/Cargo.toml b/query/Cargo.toml
index 2273117036..2276bf995d 100644
--- a/query/Cargo.toml
+++ b/query/Cargo.toml
@@ -21,6 +21,7 @@ croaring = "0.4.5"
 data_types = { path = "../data_types" }
 futures = "0.3.7"
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
+internal_types = { path = "../internal_types" }
 parking_lot = "0.11.1"
 snafu = "0.6.2"
 sqlparser = "0.8.0"
diff --git a/query/src/exec/field.rs b/query/src/exec/field.rs
index a9ef1154eb..b21f42d757 100644
--- a/query/src/exec/field.rs
+++ b/query/src/exec/field.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;

 use arrow_deps::arrow::{self, datatypes::SchemaRef};
-use data_types::TIME_COLUMN_NAME;
+use internal_types::schema::TIME_COLUMN_NAME;
 use snafu::{ResultExt, Snafu};

 #[derive(Debug, Snafu)]
diff --git a/query/src/exec/fieldlist.rs b/query/src/exec/fieldlist.rs
index e0b846aa0c..4c30928e77 100644
--- a/query/src/exec/fieldlist.rs
+++ b/query/src/exec/fieldlist.rs
@@ -9,7 +9,7 @@ use arrow_deps::arrow::{
     datatypes::{DataType, SchemaRef},
     record_batch::RecordBatch,
 };
-use data_types::TIME_COLUMN_NAME;
+use internal_types::schema::TIME_COLUMN_NAME;
 use snafu::{ensure, ResultExt, Snafu};
diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs
index 86853b4a94..590233ae8b 100644
--- a/query/src/frontend/influxrpc.rs
+++ b/query/src/frontend/influxrpc.rs
@@ -14,10 +14,9 @@ use arrow_deps::{
     },
     util::IntoExpr,
 };
-use data_types::{
-    schema::{InfluxColumnType, Schema},
+use internal_types::{
+    schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME},
     selection::Selection,
-    TIME_COLUMN_NAME,
 };
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use tracing::debug;
diff --git a/query/src/frontend/sql.rs b/query/src/frontend/sql.rs
index ce305c21a0..f30788236b 100644
--- a/query/src/frontend/sql.rs
+++ b/query/src/frontend/sql.rs
@@ -4,7 +4,7 @@ use snafu::{ResultExt, Snafu};
 use crate::{exec::Executor, provider::ProviderBuilder, Database, PartitionChunk};
 use arrow_deps::datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
-use data_types::selection::Selection;
+use internal_types::selection::Selection;

 #[derive(Debug, Snafu)]
 pub enum Error {
diff --git a/query/src/lib.rs b/query/src/lib.rs
index b1a3a53b74..446f2297f8 100644
--- a/query/src/lib.rs
+++ b/query/src/lib.rs
@@ -8,11 +8,9 @@
 use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
 use async_trait::async_trait;
-use data_types::{
-    chunk::ChunkSummary, data::ReplicatedWrite, partition_metadata::TableSummary, schema::Schema,
-    selection::Selection,
-};
+use data_types::{chunk::ChunkSummary, partition_metadata::TableSummary};
 use exec::{stringset::StringSet, Executor};
+use internal_types::{data::ReplicatedWrite, schema::Schema, selection::Selection};

 use std::{fmt::Debug, sync::Arc};
diff --git a/query/src/plan/stringset.rs b/query/src/plan/stringset.rs
index e2b9a60962..73cc850854 100644
--- a/query/src/plan/stringset.rs
+++ b/query/src/plan/stringset.rs
@@ -1,7 +1,10 @@
 use std::sync::Arc;

 use arrow_deps::{datafusion::logical_plan::LogicalPlan, util::str_iter_to_batch};
-use data_types::TABLE_NAMES_COLUMN_NAME;
+
+/// The name of the column containing table names returned by a call to
+/// `table_names`.
+const TABLE_NAMES_COLUMN_NAME: &str = "table";

 use crate::{
     exec::stringset::{StringSet, StringSetRef},
diff --git a/query/src/predicate.rs b/query/src/predicate.rs
index e2b6759ab5..d02b8dec2a 100644
--- a/query/src/predicate.rs
+++ b/query/src/predicate.rs
@@ -9,7 +9,8 @@ use arrow_deps::{
     datafusion::logical_plan::Expr,
     util::{make_range_expr, AndExprBuilder},
 };
-use data_types::{timestamp::TimestampRange, TIME_COLUMN_NAME};
+use data_types::timestamp::TimestampRange;
+use internal_types::schema::TIME_COLUMN_NAME;

 /// This `Predicate` represents the empty predicate (aka that
 /// evaluates to true for all rows).
diff --git a/query/src/provider.rs b/query/src/provider.rs
index df91b53081..5a6e8065e6 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -14,7 +14,7 @@ use arrow_deps::{
         physical_plan::ExecutionPlan,
     },
 };
-use data_types::schema::{builder::SchemaMerger, Schema};
+use internal_types::schema::{builder::SchemaMerger, Schema};

 use crate::{predicate::Predicate, util::project_schema, PartitionChunk};
@@ -29,7 +29,7 @@ pub enum Error {
     #[snafu(display("Chunk schema not compatible for table '{}': {}", table_name, source))]
     ChunkSchemaNotCompatible {
         table_name: String,
-        source: data_types::schema::builder::Error,
+        source: internal_types::schema::builder::Error,
     },

     #[snafu(display(
@@ -39,7 +39,7 @@ pub enum Error {
     ))]
     InternalNoChunks {
         table_name: String,
-        source: data_types::schema::builder::Error,
+        source: internal_types::schema::builder::Error,
     },

     #[snafu(display("Internal error: No rows found in table '{}'", table_name))]
diff --git a/query/src/provider/physical.rs b/query/src/provider/physical.rs
index c17e961450..3342711d7a 100644
--- a/query/src/provider/physical.rs
+++ b/query/src/provider/physical.rs
@@ -9,7 +9,7 @@ use arrow_deps::{
         physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream},
     },
 };
-use data_types::{schema::Schema, selection::Selection};
+use internal_types::{schema::Schema, selection::Selection};

 use crate::{predicate::Predicate, PartitionChunk};
diff --git a/query/src/test.rs b/query/src/test.rs
index 8117dae1df..f46740ac18 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -18,16 +18,16 @@ use crate::{
     Database, DatabaseStore, PartitionChunk, Predicate,
 };

-use data_types::{
+use data_types::database_rules::{DatabaseRules, PartitionTemplate, TemplatePart};
+use influxdb_line_protocol::{parse_lines, ParsedLine};
+use internal_types::{
     data::{lines_to_replicated_write, ReplicatedWrite},
-    database_rules::{DatabaseRules, PartitionTemplate, TemplatePart},
     schema::{
         builder::{SchemaBuilder, SchemaMerger},
         Schema,
     },
     selection::Selection,
 };
-use influxdb_line_protocol::{parse_lines, ParsedLine};

 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
diff --git a/query/src/util.rs b/query/src/util.rs
index 19244482d0..53bd71cc78 100644
--- a/query/src/util.rs
+++ b/query/src/util.rs
@@ -11,7 +11,7 @@ use arrow_deps::{
         optimizer::utils::expr_to_column_names,
     },
 };
-use data_types::schema::Schema;
+use internal_types::schema::Schema;

 /// Create a logical plan that produces the record batch
 pub fn make_scan_plan(batch: RecordBatch) -> std::result::Result<LogicalPlan, DataFusionError> {
@@ -57,7 +57,7 @@ pub fn schema_has_all_expr_columns(schema: &Schema, expr: &Expr) -> bool {
 #[cfg(test)]
 mod tests {
     use arrow_deps::datafusion::prelude::*;
-    use data_types::schema::builder::SchemaBuilder;
+    use internal_types::schema::builder::SchemaBuilder;

     use super::*;
diff --git a/read_buffer/Cargo.toml b/read_buffer/Cargo.toml
index de9018c428..61d37b35b0 100644
--- a/read_buffer/Cargo.toml
+++ b/read_buffer/Cargo.toml
@@ -13,9 +13,9 @@ edition = "2018"
 [dependencies] # In alphabetical order
 arrow_deps = { path = "../arrow_deps" }
 croaring = "0.4.5"
-data_types = { path = "../data_types" }
 either = "1.6.1"
 hashbrown = "0.9.1"
+internal_types = { path = "../internal_types" }
 itertools = "0.9.0"
 packers = { path = "../packers" }
 permutation = "0.2.5"
diff --git a/read_buffer/benches/database.rs b/read_buffer/benches/database.rs
index 1bef6a222c..975e550b1f 100644
--- a/read_buffer/benches/database.rs
+++ b/read_buffer/benches/database.rs
@@ -6,7 +6,7 @@ use arrow_deps::arrow::{
     array::{ArrayRef, Int64Array, StringArray},
     record_batch::RecordBatch,
 };
-use data_types::schema::builder::SchemaBuilder;
+use internal_types::schema::builder::SchemaBuilder;
 use read_buffer::{BinaryExpr, Database, Predicate};

 const BASE_TIME: i64 = 1351700038292387000_i64;
diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs
index 30bccdb8eb..506b0d3338 100644
--- a/read_buffer/src/chunk.rs
+++ b/read_buffer/src/chunk.rs
@@ -3,7 +3,7 @@ use std::{
     sync::RwLock,
 };

-use data_types::selection::Selection;
+use internal_types::selection::Selection;
 use snafu::{ResultExt, Snafu};

 use crate::row_group::RowGroup;
diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs
index b0df18b8e5..af21e2d92a 100644
--- a/read_buffer/src/lib.rs
+++ b/read_buffer/src/lib.rs
@@ -16,7 +16,7 @@ use std::{
 };

 use arrow_deps::arrow::record_batch::RecordBatch;
-use data_types::{
+use internal_types::{
     schema::{builder::SchemaMerger, Schema},
     selection::Selection,
 };
@@ -40,7 +40,7 @@ pub enum Error {
     // TODO add more context / helpful error here
     #[snafu(display("Error building unioned read buffer schema for chunks: {}", source))]
     BuildingSchema {
-        source: data_types::schema::builder::Error,
+        source: internal_types::schema::builder::Error,
     },

     #[snafu(display("partition key does not exist: {}", key))]
@@ -842,7 +842,7 @@ mod test {
         },
         datatypes::DataType::{Boolean, Float64, Int64, UInt64, Utf8},
     };
-    use data_types::schema::builder::SchemaBuilder;
+    use internal_types::schema::builder::SchemaBuilder;

     use crate::value::Values;
diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs
index 07b94fddea..d057eed6c2 100644
--- a/read_buffer/src/row_group.rs
+++ b/read_buffer/src/row_group.rs
@@ -22,13 +22,11 @@ use arrow_deps::{
     arrow, datafusion::logical_plan::Expr as DfExpr,
     datafusion::scalar::ScalarValue as DFScalarValue,
 };
-use data_types::{
-    schema::{InfluxColumnType, Schema},
-    selection::Selection,
-};
+use internal_types::schema::{InfluxColumnType, Schema};
+use internal_types::selection::Selection;

 /// The name used for a timestamp column.
-pub const TIME_COLUMN_NAME: &str = data_types::TIME_COLUMN_NAME;
+pub const TIME_COLUMN_NAME: &str = internal_types::schema::TIME_COLUMN_NAME;

 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -39,7 +37,7 @@ pub enum Error {
     #[snafu(display("schema conversion error: {}", source))]
     SchemaError {
-        source: data_types::schema::builder::Error,
+        source: internal_types::schema::builder::Error,
     },

     #[snafu(display("unsupported operation: {}", msg))]
@@ -1638,7 +1636,7 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
     type Error = Error;

     fn try_from(result: ReadFilterResult<'_>) -> Result<Self, Self::Error> {
-        let schema = data_types::schema::Schema::try_from(result.schema())
+        let schema = internal_types::schema::Schema::try_from(result.schema())
             .map_err(|source| Error::SchemaError { source })?;

         let arrow_schema: arrow_deps::arrow::datatypes::SchemaRef = schema.into();
@@ -1871,7 +1869,7 @@ impl TryFrom<ReadAggregateResult<'_>> for RecordBatch {
     type Error = Error;

     fn try_from(result: ReadAggregateResult<'_>) -> Result<Self, Self::Error> {
-        let schema = data_types::schema::Schema::try_from(result.schema())
+        let schema = internal_types::schema::Schema::try_from(result.schema())
             .map_err(|source| Error::SchemaError { source })?;

         let arrow_schema: arrow_deps::arrow::datatypes::SchemaRef = schema.into();
diff --git a/read_buffer/src/schema.rs b/read_buffer/src/schema.rs
index 895a1b6ee7..9d9828ff2a 100644
--- a/read_buffer/src/schema.rs
+++ b/read_buffer/src/schema.rs
@@ -1,7 +1,7 @@
 use std::{convert::TryFrom, fmt::Display};

 use arrow_deps::arrow;
-use data_types::schema::InfluxFieldType;
+use internal_types::schema::InfluxFieldType;

 /// A schema that is used to track the names and semantics of columns returned
 /// in results out of various operations on a row group.
@@ -96,11 +96,11 @@ impl Display for ResultSchema {
     }
 }

-impl TryFrom<&ResultSchema> for data_types::schema::Schema {
-    type Error = data_types::schema::builder::Error;
+impl TryFrom<&ResultSchema> for internal_types::schema::Schema {
+    type Error = internal_types::schema::builder::Error;

     fn try_from(rs: &ResultSchema) -> Result<Self, Self::Error> {
-        let mut builder = data_types::schema::builder::SchemaBuilder::new();
+        let mut builder = internal_types::schema::builder::SchemaBuilder::new();
         for (col_type, data_type) in &rs.select_columns {
             match col_type {
                 ColumnType::Tag(name) => builder = builder.tag(name.as_str()),
diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs
index e80fae23fe..f11c56e1b6 100644
--- a/read_buffer/src/table.rs
+++ b/read_buffer/src/table.rs
@@ -7,7 +7,7 @@ use std::{
 };

 use arrow_deps::arrow::record_batch::RecordBatch;
-use data_types::selection::Selection;
+use internal_types::selection::Selection;
 use snafu::{ensure, Snafu};

 use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup};
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 8d26d6b735..257af8cd02 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -16,6 +16,7 @@ futures = "0.3.7"
 generated_types = { path = "../generated_types" }
 hashbrown = "0.9.1"
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
+internal_types = { path = "../internal_types" }
 mutable_buffer = { path = "../mutable_buffer" }
 object_store = { path = "../object_store" }
 parking_lot = "0.11.1"
diff --git a/server/src/buffer.rs b/server/src/buffer.rs
index 7046cc1642..8448130ec2 100644
--- a/server/src/buffer.rs
+++ b/server/src/buffer.rs
@@ -1,11 +1,11 @@
 //! This module contains code for managing the WAL buffer

 use data_types::{
-    data::ReplicatedWrite,
     database_rules::{WalBufferRollover, WriterId},
     DatabaseName,
 };
 use generated_types::wal;
+use internal_types::data::ReplicatedWrite;
 use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};

 use std::{
@@ -567,8 +567,9 @@ fn database_object_store_path(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use data_types::{data::lines_to_replicated_write, database_rules::DatabaseRules};
+    use data_types::database_rules::DatabaseRules;
     use influxdb_line_protocol::parse_lines;
+    use internal_types::data::lines_to_replicated_write;
     use object_store::memory::InMemory;

     #[test]
diff --git a/server/src/db.rs b/server/src/db.rs
index f557f97520..a3de6a84e3 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -10,9 +10,8 @@ use std::{
 };

 use async_trait::async_trait;
-use data_types::{
-    chunk::ChunkSummary, data::ReplicatedWrite, database_rules::DatabaseRules, selection::Selection,
-};
+use data_types::{chunk::ChunkSummary, database_rules::DatabaseRules};
+use internal_types::{data::ReplicatedWrite, selection::Selection};
 use mutable_buffer::MutableBufferDb;
 use parking_lot::Mutex;
 use query::{Database, PartitionChunk};
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index 9713c8d83f..be1cf8747d 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -1,9 +1,6 @@
 use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
-use data_types::{
-    chunk::{ChunkStorage, ChunkSummary},
-    schema::Schema,
-    selection::Selection,
-};
+use data_types::chunk::{ChunkStorage, ChunkSummary};
+use internal_types::{schema::Schema, selection::Selection};
 use mutable_buffer::chunk::Chunk as MBChunk;
 use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk};
 use read_buffer::Database as ReadBufferDb;
@@ -33,7 +30,9 @@ pub enum Error {
     },

     #[snafu(display("Internal error restricting schema: {}", source))]
-    InternalSelectingSchema { source: data_types::schema::Error },
+    InternalSelectingSchema {
+        source: internal_types::schema::Error,
+    },

     #[snafu(display("Predicate conversion error: {}", source))]
     PredicateConversion { source: super::pred::Error },
diff --git a/server/src/db/streams.rs b/server/src/db/streams.rs
index c255bed513..5ea7d8cd17 100644
--- a/server/src/db/streams.rs
+++ b/server/src/db/streams.rs
@@ -8,7 +8,7 @@ use arrow_deps::{
     },
     datafusion::physical_plan::RecordBatchStream,
 };
-use data_types::selection::Selection;
+use internal_types::selection::Selection;
 use mutable_buffer::chunk::Chunk as MBChunk;
 use read_buffer::ReadFilterResults;
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 197bc72659..69e5dfa3b6 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -80,11 +80,12 @@ use snafu::{OptionExt, ResultExt, Snafu};
 use tracing::{debug, error, info};

 use data_types::{
-    data::{lines_to_replicated_write, ReplicatedWrite},
     database_rules::{DatabaseRules, WriterId},
     job::Job,
     {DatabaseName, DatabaseNameError},
 };
+use internal_types::data::{lines_to_replicated_write, ReplicatedWrite};
+
 use influxdb_line_protocol::ParsedLine;
 use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
 use query::{exec::Executor, DatabaseStore};
diff --git a/server/src/query_tests/table_schema.rs b/server/src/query_tests/table_schema.rs
index 9aceb80aa5..c3b49df0ae 100644
--- a/server/src/query_tests/table_schema.rs
+++ b/server/src/query_tests/table_schema.rs
@@ -1,7 +1,7 @@
 //! Tests for the table_names implementation

 use arrow_deps::arrow::datatypes::DataType;
-use data_types::{schema::builder::SchemaBuilder, selection::Selection};
+use internal_types::{schema::builder::SchemaBuilder, selection::Selection};
 use query::{Database, PartitionChunk};

 use super::scenarios::*;
diff --git a/server/src/snapshot.rs b/server/src/snapshot.rs
index 403435fdfa..de64050e16 100644
--- a/server/src/snapshot.rs
+++ b/server/src/snapshot.rs
@@ -5,10 +5,8 @@ use arrow_deps::{
     datafusion::physical_plan::SendableRecordBatchStream,
     parquet::{self, arrow::ArrowWriter, file::writer::TryClone},
 };
-use data_types::{
-    partition_metadata::{PartitionSummary, TableSummary},
-    selection::Selection,
-};
+use data_types::partition_metadata::{PartitionSummary, TableSummary};
+use internal_types::selection::Selection;
 use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
 use query::{predicate::EMPTY_PREDICATE, PartitionChunk};
diff --git a/src/commands/convert.rs b/src/commands/convert.rs
index d04b5ff00f..0b64ae8f88 100644
--- a/src/commands/convert.rs
+++ b/src/commands/convert.rs
@@ -1,9 +1,9 @@
-use data_types::schema::Schema;
 use influxdb_line_protocol::parse_lines;
 use ingest::{
     parquet::writer::{CompressionLevel, Error as ParquetWriterError, IOxParquetTableWriter},
     ConversionSettings, Error as IngestError, LineProtocolConverter, TSMFileConverter,
 };
+use internal_types::schema::Schema;
 use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource};
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{