refactor: Move some types (not yet exposed to clients) into internal_types (#1015)

* refactor: Move some types (not yet exposed to clients) into internal_types

* docs: Add README.md explaining the rationale

* refactor: remove some stragglers

* fix: fix benches

* fix: Apply suggestions from code review

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

* fix: add clippy lints

* fix: fmt

* docs: Apply suggestions from code review

fix typos

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2021-03-19 12:27:57 -04:00 committed by GitHub
parent 7e6c6d67b4
commit 6e1795fda0
56 changed files with 177 additions and 130 deletions
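Most of the diff below is a mechanical rewrite of import paths from `data_types` to the new `internal_types` crate. A minimal before/after sketch of that pattern, assuming the workspace crates are on the dependency list (the concrete moves, `schema`, `selection`, and `data`, with `TIME_COLUMN_NAME` relocating under `schema`, are visible in the hunks that follow):

```rust
// Before this commit: internal-only types lived in the client-facing crate.
// use data_types::schema::{builder::SchemaBuilder, Schema};
// use data_types::selection::Selection;
// use data_types::TIME_COLUMN_NAME;

// After this commit: the same types are imported from internal_types,
// and TIME_COLUMN_NAME is reached through the schema module.
use internal_types::schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME};
use internal_types::selection::Selection;
```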

Cargo.lock (generated)

@@ -747,11 +747,7 @@ dependencies = [
name = "data_types"
version = "0.1.0"
dependencies = [
"arrow_deps",
"chrono",
"crc32fast",
"criterion",
"flatbuffers 0.6.1",
"generated_types",
"influxdb_line_protocol",
"percent-encoding",
@@ -1429,6 +1425,7 @@ dependencies = [
"influxdb_line_protocol",
"influxdb_tsm",
"ingest",
"internal_types",
"logfmt",
"mem_qe",
"mutable_buffer",
@@ -1516,10 +1513,10 @@ name = "ingest"
version = "0.1.0"
dependencies = [
"arrow_deps",
"data_types",
"flate2",
"influxdb_line_protocol",
"influxdb_tsm",
"internal_types",
"packers",
"parking_lot",
"snafu",
@@ -1542,6 +1539,22 @@ version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f"
[[package]]
name = "internal_types"
version = "0.1.0"
dependencies = [
"arrow_deps",
"chrono",
"crc32fast",
"criterion",
"data_types",
"flatbuffers 0.6.1",
"generated_types",
"influxdb_line_protocol",
"snafu",
"tracing",
]
[[package]]
name = "ipnet"
version = "2.3.0"
@@ -1833,6 +1846,7 @@ dependencies = [
"flatbuffers 0.6.1",
"generated_types",
"influxdb_line_protocol",
"internal_types",
"snafu",
"string-interner",
"test_helpers",
@@ -2151,9 +2165,9 @@ name = "packers"
version = "0.1.0"
dependencies = [
"arrow_deps",
"data_types",
"human_format",
"influxdb_tsm",
"internal_types",
"rand 0.8.3",
"snafu",
"test_helpers",
@@ -2507,6 +2521,7 @@ dependencies = [
"data_types",
"futures",
"influxdb_line_protocol",
"internal_types",
"parking_lot",
"snafu",
"sqlparser",
@@ -2654,9 +2669,9 @@ dependencies = [
"arrow_deps",
"criterion",
"croaring",
"data_types",
"either",
"hashbrown",
"internal_types",
"itertools 0.9.0",
"packers",
"permutation",
@@ -3125,6 +3140,7 @@ dependencies = [
"generated_types",
"hashbrown",
"influxdb_line_protocol",
"internal_types",
"mutable_buffer",
"object_store",
"parking_lot",

@@ -4,6 +4,7 @@ version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
default-run = "influxdb_iox"
readme = "README.md"
[workspace] # In alphabetical order
members = [
@@ -16,6 +17,7 @@ members = [
"influxdb_tsm",
"influxdb2_client",
"ingest",
"internal_types",
"logfmt",
"mem_qe",
"mutable_buffer",
@@ -43,6 +45,7 @@ generated_types = { path = "generated_types" }
influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] }
influxdb_line_protocol = { path = "influxdb_line_protocol" }
influxdb_tsm = { path = "influxdb_tsm" }
internal_types = { path = "internal_types" }
ingest = { path = "ingest" }
logfmt = { path = "logfmt" }
mem_qe = { path = "mem_qe" }

@@ -1,5 +1,4 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use data_types::schema::Schema;
use influxdb_line_protocol::parse_lines;
use ingest::{
parquet::{
@@ -8,6 +7,7 @@ use ingest::{
},
ConversionSettings, LineProtocolConverter,
};
use internal_types::schema::Schema;
use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource};
use std::time::Duration;

@@ -2,13 +2,12 @@
name = "data_types"
version = "0.1.0"
authors = ["pauldix <paul@pauldix.net>"]
description = "InfluxDB IOx data_types, shared between IOx instances and IOx clients"
edition = "2018"
readme = "README.md"
[dependencies] # In alphabetical order
arrow_deps = { path = "../arrow_deps" }
chrono = { version = "0.4", features = ["serde"] }
crc32fast = "1.2.0"
flatbuffers = "0.6" # TODO: Update to 0.8
generated_types = { path = "../generated_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
percent-encoding = "2.1.0"
@@ -21,9 +20,4 @@ tonic = { version = "0.4.0" }
tracing = "0.1"
[dev-dependencies] # In alphabetical order
criterion = "0.3"
test_helpers = { path = "../test_helpers" }
[[bench]]
name = "benchmark"
harness = false

data_types/README.md (new file)

@@ -0,0 +1,5 @@
# Data Types
This crate contains types that are designed for external consumption (in `influxdb_iox_client` and other client-facing uses).

*Client facing* in this case means exposed via the management API or CLI, where changing the structs may require additional coordination with clients.
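For example, a minimal sketch of client-facing usage (hypothetical; `apply_rules` is an invented helper, not part of this diff):

```rust
// Hypothetical client-side helper. `DatabaseRules` crosses the
// management API boundary, so its field layout is part of the contract
// with external code like this.
use data_types::database_rules::DatabaseRules;

fn apply_rules(rules: &DatabaseRules) {
    // e.g. serialize `rules` and send them to an IOx server
    println!("configuring database with rules: {:?}", rules);
}
```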

@@ -11,26 +11,14 @@
)]
pub use database_name::*;
pub use schema::TIME_COLUMN_NAME;
/// The name of the column containing table names returned by a call to
/// `table_names`.
pub const TABLE_NAMES_COLUMN_NAME: &str = "table";
/// The name of the column containing column names returned by a call to
/// `column_names`.
pub const COLUMN_NAMES_COLUMN_NAME: &str = "column";
pub mod chunk;
pub mod data;
pub mod database_rules;
pub mod error;
pub mod http;
pub mod job;
pub mod names;
pub mod partition_metadata;
pub mod schema;
pub mod selection;
pub mod timestamp;
pub mod wal;

@@ -6,9 +6,9 @@ edition = "2018"
[dependencies] # In alphabetical order
arrow_deps = { path = "../arrow_deps" }
data_types = { path = "../data_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
influxdb_tsm = { path = "../influxdb_tsm" }
internal_types = { path = "../internal_types" }
packers = { path = "../packers" }
snafu = "0.6.2"
tracing = "0.1"

@@ -11,16 +11,15 @@
clippy::clone_on_ref_ptr
)]
use data_types::{
schema::{builder::InfluxSchemaBuilder, InfluxFieldType, Schema},
TIME_COLUMN_NAME,
};
use influxdb_line_protocol::{FieldValue, ParsedLine};
use influxdb_tsm::{
mapper::{ColumnData, MeasurementTable, TSMMeasurementMapper},
reader::{BlockDecoder, TSMBlockReader, TSMIndexReader},
BlockType, TSMError,
};
use internal_types::schema::{
builder::InfluxSchemaBuilder, InfluxFieldType, Schema, TIME_COLUMN_NAME,
};
use packers::{
ByteArray, Error as TableError, IOxTableWriter, IOxTableWriterSource, Packer, Packers,
};
@@ -75,7 +74,7 @@ pub enum Error {
#[snafu(display(r#"Error building schema: {}"#, source))]
BuildingSchema {
source: data_types::schema::builder::Error,
source: internal_types::schema::builder::Error,
},
#[snafu(display(r#"Error writing to TableWriter: {}"#, source))]
@@ -96,8 +95,8 @@ pub enum Error {
CouldNotFindColumn,
}
impl From<data_types::schema::builder::Error> for Error {
fn from(source: data_types::schema::builder::Error) -> Self {
impl From<internal_types::schema::builder::Error> for Error {
fn from(source: internal_types::schema::builder::Error) -> Self {
Self::BuildingSchema { source }
}
}
@@ -820,7 +819,8 @@ impl TSMFileConverter {
mut block_reader: impl BlockDecoder,
m: &mut MeasurementTable,
) -> Result<(Schema, Vec<Packers>), Error> {
let mut builder = data_types::schema::builder::SchemaBuilder::new().measurement(&m.name);
let mut builder =
internal_types::schema::builder::SchemaBuilder::new().measurement(&m.name);
let mut packed_columns: Vec<Packers> = Vec::new();
let mut tks = Vec::new();
@@ -1099,11 +1099,11 @@ impl std::fmt::Debug for TSMFileConverter {
#[cfg(test)]
mod tests {
use super::*;
use data_types::{assert_column_eq, schema::InfluxColumnType};
use influxdb_tsm::{
reader::{BlockData, MockBlockDecoder},
Block,
};
use internal_types::{assert_column_eq, schema::InfluxColumnType};
use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource, Packers};
use test_helpers::approximately_equal;

@@ -9,7 +9,7 @@ use arrow_deps::parquet::{
},
schema::types::{ColumnPath, Type},
};
use data_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
use parquet::file::writer::ParquetWriter;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
@@ -97,7 +97,7 @@ where
///
/// ```
/// # use std::fs;
/// # use data_types::schema::{builder::SchemaBuilder, InfluxFieldType};
/// # use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType};
/// # use packers::IOxTableWriter;
/// # use packers::{Packer, Packers};
/// # use ingest::parquet::writer::{IOxParquetTableWriter, CompressionLevel};
@@ -505,7 +505,7 @@ fn create_writer_props(
#[cfg(test)]
mod tests {
use data_types::schema::builder::SchemaBuilder;
use internal_types::schema::builder::SchemaBuilder;
use super::*;

@@ -1,5 +1,5 @@
use data_types::schema::{builder::SchemaBuilder, InfluxFieldType};
use ingest::parquet::writer::{CompressionLevel, IOxParquetTableWriter};
use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType};
use packers::{IOxTableWriter, Packer, Packers};
use arrow_deps::parquet::data_type::ByteArray;

internal_types/Cargo.toml (new file)

@@ -0,0 +1,27 @@
[package]
name = "internal_types"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
description = "InfluxDB IOx internal types, shared between IOx instances"
readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow_deps = { path = "../arrow_deps" }
crc32fast = "1.2.0"
chrono = { version = "0.4", features = ["serde"] }
data_types = { path = "../data_types" }
flatbuffers = "0.6" # TODO: Update to 0.8
generated_types = { path = "../generated_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
snafu = "0.6"
tracing = "0.1"
[dev-dependencies] # In alphabetical order
criterion = "0.3"
[[bench]]
name = "benchmark"
harness = false

internal_types/README.md (new file)

@@ -0,0 +1,7 @@
# Internal Types
This crate contains InfluxDB IOx "internal" types, which are shared
across crates and between IOx instances but are not exposed externally
to clients.

*Internal* in this case means that changing these structs should not
require additional coordination with clients.
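As a sketch of the kind of API that now lives here, building a schema with the `SchemaBuilder` this commit moves into the crate (`.influx_field()`, `.timestamp()`, and `.build()` are assumptions based on the crate's doc examples; only `.measurement(...)` and `.tag("region")` appear verbatim in this diff):

```rust
use internal_types::schema::builder::SchemaBuilder;
use internal_types::schema::{InfluxFieldType, Schema};

// Build an IOx schema for a hypothetical `cpu` measurement.
fn cpu_schema() -> Schema {
    SchemaBuilder::new()
        .measurement("cpu")
        .tag("region")
        .influx_field("value", InfluxFieldType::Float)
        .timestamp()
        .build()
        .expect("valid schema")
}
```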

@@ -1,9 +1,9 @@
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, Bencher, BenchmarkId, Criterion, Throughput};
use data_types::data::{lines_to_replicated_write as lines_to_rw, ReplicatedWrite};
use data_types::database_rules::{DatabaseRules, PartitionTemplate, TemplatePart};
use generated_types::wal as wb;
use influxdb_line_protocol::{parse_lines, ParsedLine};
use internal_types::data::{lines_to_replicated_write as lines_to_rw, ReplicatedWrite};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::time::Duration;

@@ -1,8 +1,8 @@
//! This module contains helper methods for constructing replicated writes
//! based on `DatabaseRules`.
use crate::database_rules::Partitioner;
use crate::TIME_COLUMN_NAME;
use crate::schema::TIME_COLUMN_NAME;
use data_types::database_rules::Partitioner;
use generated_types::wal as wb;
use influxdb_line_protocol::{FieldValue, ParsedLine};

internal_types/src/lib.rs (new file)

@@ -0,0 +1,11 @@
#![deny(rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
pub mod data;
pub mod schema;
pub mod selection;

@@ -140,7 +140,7 @@ impl SchemaBuilder {
/// schema validation happens at this time.
/// ```
/// use data_types::schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
/// use internal_types::schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
///
/// let schema = SchemaBuilder::new()
/// .tag("region")

@@ -20,6 +20,7 @@ chrono = "0.4"
data_types = { path = "../data_types" }
flatbuffers = "0.6" # TODO: Update to 0.8
generated_types = { path = "../generated_types" }
internal_types = { path = "../internal_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
snafu = "0.6.2"
string-interner = "0.12.2"

@@ -9,7 +9,8 @@ use chrono::{DateTime, Utc};
use generated_types::wal as wb;
use std::collections::{BTreeSet, HashMap};
use data_types::{partition_metadata::TableSummary, schema::Schema, selection::Selection};
use data_types::partition_metadata::TableSummary;
use internal_types::{schema::Schema, selection::Selection};
use crate::{
column::Column,

@@ -2,9 +2,9 @@ use generated_types::wal as wb;
use snafu::Snafu;
use crate::dictionary::Dictionary;
use data_types::{data::type_description, partition_metadata::StatValues};
use arrow_deps::arrow::datatypes::DataType as ArrowDataType;
use data_types::partition_metadata::StatValues;
use internal_types::data::type_description;
use std::mem;

@@ -1,8 +1,6 @@
use data_types::{
data::ReplicatedWrite,
database_rules::{PartitionSort, PartitionSortRules},
};
use data_types::database_rules::{PartitionSort, PartitionSortRules};
use generated_types::wal;
use internal_types::data::ReplicatedWrite;
use crate::{chunk::Chunk, partition::Partition};
@@ -249,12 +247,10 @@ impl MutableBufferDb {
mod tests {
use super::*;
use chrono::{DateTime, Utc};
use data_types::{
data::lines_to_replicated_write, database_rules::Partitioner, selection::Selection,
};
use data_types::database_rules::{Order, Partitioner};
use internal_types::{data::lines_to_replicated_write, selection::Selection};
use arrow_deps::arrow::array::{Array, StringArray};
use data_types::database_rules::Order;
use influxdb_line_protocol::{parse_lines, ParsedLine};
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;

@@ -302,10 +302,8 @@ impl<'a> Iterator for ChunkIter<'a> {
mod tests {
use super::*;
use chrono::Utc;
use data_types::{
data::split_lines_into_write_entry_partitions, partition_metadata::PartitionSummary,
selection::Selection,
};
use data_types::partition_metadata::PartitionSummary;
use internal_types::{data::split_lines_into_write_entry_partitions, selection::Selection};
use arrow_deps::{
arrow::record_batch::RecordBatch, assert_table_eq, test_util::sort_record_batch,

@@ -10,7 +10,8 @@ use arrow_deps::{
},
util::{make_range_expr, AndExprBuilder},
};
use data_types::{timestamp::TimestampRange, TIME_COLUMN_NAME};
use data_types::timestamp::TimestampRange;
use internal_types::schema::TIME_COLUMN_NAME;
//use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{ensure, ResultExt, Snafu};

@@ -12,12 +12,12 @@ use crate::{
dictionary::{Dictionary, Error as DictionaryError},
pred::{ChunkIdSet, ChunkPredicate},
};
use data_types::{
partition_metadata::{ColumnSummary, Statistics},
schema::{builder::SchemaBuilder, Schema},
use data_types::partition_metadata::{ColumnSummary, Statistics};
use internal_types::{
schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME},
selection::Selection,
TIME_COLUMN_NAME,
};
use snafu::{OptionExt, ResultExt, Snafu};
use arrow_deps::{
@@ -84,7 +84,7 @@ pub enum Error {
#[snafu(display("Internal error converting schema: {}", source))]
InternalSchema {
source: data_types::schema::builder::Error,
source: internal_types::schema::builder::Error,
},
#[snafu(display(
@@ -597,8 +597,8 @@ impl<'a> TableColSelection<'a> {
#[cfg(test)]
mod tests {
use data_types::data::split_lines_into_write_entry_partitions;
use influxdb_line_protocol::{parse_lines, ParsedLine};
use internal_types::data::split_lines_into_write_entry_partitions;
use super::*;

@@ -6,9 +6,9 @@ edition = "2018"
[dependencies] # In alphabetical order
arrow_deps = { path = "../arrow_deps" }
data_types = { path = "../data_types" }
human_format = "1.0.3"
influxdb_tsm = { path = "../influxdb_tsm" }
internal_types = { path = "../internal_types" }
snafu = "0.6.2"
tracing = "0.1"

@@ -15,7 +15,7 @@ use snafu::Snafu;
pub use crate::packers::{Packer, Packers};
pub use arrow_deps::parquet::data_type::ByteArray;
use data_types::schema::Schema;
use internal_types::schema::Schema;
use std::borrow::Cow;

@@ -10,7 +10,7 @@ use std::iter;
use std::slice::Chunks;
use arrow_deps::parquet::data_type::ByteArray;
use data_types::schema::{InfluxColumnType, InfluxFieldType};
use internal_types::schema::{InfluxColumnType, InfluxFieldType};
use std::default::Default;
// NOTE: See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html

@@ -21,6 +21,7 @@ croaring = "0.4.5"
data_types = { path = "../data_types" }
futures = "0.3.7"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
internal_types = { path = "../internal_types" }
parking_lot = "0.11.1"
snafu = "0.6.2"
sqlparser = "0.8.0"

@@ -1,7 +1,7 @@
use std::sync::Arc;
use arrow_deps::arrow::{self, datatypes::SchemaRef};
use data_types::TIME_COLUMN_NAME;
use internal_types::schema::TIME_COLUMN_NAME;
use snafu::{ResultExt, Snafu};
#[derive(Debug, Snafu)]

@@ -9,7 +9,7 @@ use arrow_deps::arrow::{
datatypes::{DataType, SchemaRef},
record_batch::RecordBatch,
};
use data_types::TIME_COLUMN_NAME;
use internal_types::schema::TIME_COLUMN_NAME;
use snafu::{ensure, ResultExt, Snafu};

@@ -14,10 +14,9 @@ use arrow_deps::{
},
util::IntoExpr,
};
use data_types::{
schema::{InfluxColumnType, Schema},
use internal_types::{
schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME},
selection::Selection,
TIME_COLUMN_NAME,
};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use tracing::debug;

@@ -4,7 +4,7 @@ use snafu::{ResultExt, Snafu};
use crate::{exec::Executor, provider::ProviderBuilder, Database, PartitionChunk};
use arrow_deps::datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use data_types::selection::Selection;
use internal_types::selection::Selection;
#[derive(Debug, Snafu)]
pub enum Error {

@@ -8,11 +8,9 @@
use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
use async_trait::async_trait;
use data_types::{
chunk::ChunkSummary, data::ReplicatedWrite, partition_metadata::TableSummary, schema::Schema,
selection::Selection,
};
use data_types::{chunk::ChunkSummary, partition_metadata::TableSummary};
use exec::{stringset::StringSet, Executor};
use internal_types::{data::ReplicatedWrite, schema::Schema, selection::Selection};
use std::{fmt::Debug, sync::Arc};

@@ -1,7 +1,10 @@
use std::sync::Arc;
use arrow_deps::{datafusion::logical_plan::LogicalPlan, util::str_iter_to_batch};
use data_types::TABLE_NAMES_COLUMN_NAME;
/// The name of the column containing table names returned by a call to
/// `table_names`.
const TABLE_NAMES_COLUMN_NAME: &str = "table";
use crate::{
exec::stringset::{StringSet, StringSetRef},

@@ -9,7 +9,8 @@ use arrow_deps::{
datafusion::logical_plan::Expr,
util::{make_range_expr, AndExprBuilder},
};
use data_types::{timestamp::TimestampRange, TIME_COLUMN_NAME};
use data_types::timestamp::TimestampRange;
use internal_types::schema::TIME_COLUMN_NAME;
/// This `Predicate` represents the empty predicate (aka that
/// evaluates to true for all rows).

@@ -14,7 +14,7 @@ use arrow_deps::{
physical_plan::ExecutionPlan,
},
};
use data_types::schema::{builder::SchemaMerger, Schema};
use internal_types::schema::{builder::SchemaMerger, Schema};
use crate::{predicate::Predicate, util::project_schema, PartitionChunk};
@@ -29,7 +29,7 @@ pub enum Error {
#[snafu(display("Chunk schema not compatible for table '{}': {}", table_name, source))]
ChunkSchemaNotCompatible {
table_name: String,
source: data_types::schema::builder::Error,
source: internal_types::schema::builder::Error,
},
#[snafu(display(
@@ -39,7 +39,7 @@ ))]
))]
InternalNoChunks {
table_name: String,
source: data_types::schema::builder::Error,
source: internal_types::schema::builder::Error,
},
#[snafu(display("Internal error: No rows found in table '{}'", table_name))]

@@ -9,7 +9,7 @@ use arrow_deps::{
physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream},
},
};
use data_types::{schema::Schema, selection::Selection};
use internal_types::{schema::Schema, selection::Selection};
use crate::{predicate::Predicate, PartitionChunk};

@@ -18,16 +18,16 @@ use crate::{
Database, DatabaseStore, PartitionChunk, Predicate,
};
use data_types::{
use data_types::database_rules::{DatabaseRules, PartitionTemplate, TemplatePart};
use influxdb_line_protocol::{parse_lines, ParsedLine};
use internal_types::{
data::{lines_to_replicated_write, ReplicatedWrite},
database_rules::{DatabaseRules, PartitionTemplate, TemplatePart},
schema::{
builder::{SchemaBuilder, SchemaMerger},
Schema,
},
selection::Selection,
};
use influxdb_line_protocol::{parse_lines, ParsedLine};
use async_trait::async_trait;
use chrono::{DateTime, Utc};

@@ -11,7 +11,7 @@ use arrow_deps::{
optimizer::utils::expr_to_column_names,
},
};
use data_types::schema::Schema;
use internal_types::schema::Schema;
/// Create a logical plan that produces the record batch
pub fn make_scan_plan(batch: RecordBatch) -> std::result::Result<LogicalPlan, DataFusionError> {
@@ -57,7 +57,7 @@ pub fn schema_has_all_expr_columns(schema: &Schema, expr: &Expr) -> bool {
#[cfg(test)]
mod tests {
use arrow_deps::datafusion::prelude::*;
use data_types::schema::builder::SchemaBuilder;
use internal_types::schema::builder::SchemaBuilder;
use super::*;

@@ -13,9 +13,9 @@ edition = "2018"
[dependencies] # In alphabetical order
arrow_deps = { path = "../arrow_deps" }
croaring = "0.4.5"
data_types = { path = "../data_types" }
either = "1.6.1"
hashbrown = "0.9.1"
internal_types = { path = "../internal_types" }
itertools = "0.9.0"
packers = { path = "../packers" }
permutation = "0.2.5"

@@ -6,7 +6,7 @@ use arrow_deps::arrow::{
array::{ArrayRef, Int64Array, StringArray},
record_batch::RecordBatch,
};
use data_types::schema::builder::SchemaBuilder;
use internal_types::schema::builder::SchemaBuilder;
use read_buffer::{BinaryExpr, Database, Predicate};
const BASE_TIME: i64 = 1351700038292387000_i64;

@@ -3,7 +3,7 @@ use std::{
sync::RwLock,
};
use data_types::selection::Selection;
use internal_types::selection::Selection;
use snafu::{ResultExt, Snafu};
use crate::row_group::RowGroup;

@@ -16,7 +16,7 @@ use std::{
};
use arrow_deps::arrow::record_batch::RecordBatch;
use data_types::{
use internal_types::{
schema::{builder::SchemaMerger, Schema},
selection::Selection,
};
@@ -40,7 +40,7 @@ pub enum Error {
// TODO add more context / helpful error here
#[snafu(display("Error building unioned read buffer schema for chunks: {}", source))]
BuildingSchema {
source: data_types::schema::builder::Error,
source: internal_types::schema::builder::Error,
},
#[snafu(display("partition key does not exist: {}", key))]
@@ -842,7 +842,7 @@ mod test {
},
datatypes::DataType::{Boolean, Float64, Int64, UInt64, Utf8},
};
use data_types::schema::builder::SchemaBuilder;
use internal_types::schema::builder::SchemaBuilder;
use crate::value::Values;

@@ -22,13 +22,11 @@ use arrow_deps::{
arrow, datafusion::logical_plan::Expr as DfExpr,
datafusion::scalar::ScalarValue as DFScalarValue,
};
use data_types::{
schema::{InfluxColumnType, Schema},
selection::Selection,
};
use internal_types::schema::{InfluxColumnType, Schema};
use internal_types::selection::Selection;
/// The name used for a timestamp column.
pub const TIME_COLUMN_NAME: &str = data_types::TIME_COLUMN_NAME;
pub const TIME_COLUMN_NAME: &str = internal_types::schema::TIME_COLUMN_NAME;
#[derive(Debug, Snafu)]
pub enum Error {
@@ -39,7 +37,7 @@ pub enum Error {
#[snafu(display("schema conversion error: {}", source))]
SchemaError {
source: data_types::schema::builder::Error,
source: internal_types::schema::builder::Error,
},
#[snafu(display("unsupported operation: {}", msg))]
@@ -1638,7 +1636,7 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
type Error = Error;
fn try_from(result: ReadFilterResult<'_>) -> Result<Self, Self::Error> {
let schema = data_types::schema::Schema::try_from(result.schema())
let schema = internal_types::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaError { source })?;
let arrow_schema: arrow_deps::arrow::datatypes::SchemaRef = schema.into();
@@ -1871,7 +1869,7 @@ impl TryFrom<ReadAggregateResult<'_>> for RecordBatch {
type Error = Error;
fn try_from(result: ReadAggregateResult<'_>) -> Result<Self, Self::Error> {
let schema = data_types::schema::Schema::try_from(result.schema())
let schema = internal_types::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaError { source })?;
let arrow_schema: arrow_deps::arrow::datatypes::SchemaRef = schema.into();

@@ -1,7 +1,7 @@
use std::{convert::TryFrom, fmt::Display};
use arrow_deps::arrow;
use data_types::schema::InfluxFieldType;
use internal_types::schema::InfluxFieldType;
/// A schema that is used to track the names and semantics of columns returned
/// in results out of various operations on a row group.
@@ -96,11 +96,11 @@ impl Display for ResultSchema {
}
}
impl TryFrom<&ResultSchema> for data_types::schema::Schema {
type Error = data_types::schema::builder::Error;
impl TryFrom<&ResultSchema> for internal_types::schema::Schema {
type Error = internal_types::schema::builder::Error;
fn try_from(rs: &ResultSchema) -> Result<Self, Self::Error> {
let mut builder = data_types::schema::builder::SchemaBuilder::new();
let mut builder = internal_types::schema::builder::SchemaBuilder::new();
for (col_type, data_type) in &rs.select_columns {
match col_type {
ColumnType::Tag(name) => builder = builder.tag(name.as_str()),

@@ -7,7 +7,7 @@ use std::{
};
use arrow_deps::arrow::record_batch::RecordBatch;
use data_types::selection::Selection;
use internal_types::selection::Selection;
use snafu::{ensure, Snafu};
use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup};

@@ -16,6 +16,7 @@ futures = "0.3.7"
generated_types = { path = "../generated_types" }
hashbrown = "0.9.1"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
internal_types = { path = "../internal_types" }
mutable_buffer = { path = "../mutable_buffer" }
object_store = { path = "../object_store" }
parking_lot = "0.11.1"

@@ -1,11 +1,11 @@
//! This module contains code for managing the WAL buffer
use data_types::{
data::ReplicatedWrite,
database_rules::{WalBufferRollover, WriterId},
DatabaseName,
};
use generated_types::wal;
use internal_types::data::ReplicatedWrite;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use std::{
@@ -567,8 +567,9 @@ fn database_object_store_path(
#[cfg(test)]
mod tests {
use super::*;
use data_types::{data::lines_to_replicated_write, database_rules::DatabaseRules};
use data_types::database_rules::DatabaseRules;
use influxdb_line_protocol::parse_lines;
use internal_types::data::lines_to_replicated_write;
use object_store::memory::InMemory;
#[test]

@@ -10,9 +10,8 @@ use std::{
};
use async_trait::async_trait;
use data_types::{
chunk::ChunkSummary, data::ReplicatedWrite, database_rules::DatabaseRules, selection::Selection,
};
use data_types::{chunk::ChunkSummary, database_rules::DatabaseRules};
use internal_types::{data::ReplicatedWrite, selection::Selection};
use mutable_buffer::MutableBufferDb;
use parking_lot::Mutex;
use query::{Database, PartitionChunk};

@@ -1,9 +1,6 @@
use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
use data_types::{
chunk::{ChunkStorage, ChunkSummary},
schema::Schema,
selection::Selection,
};
use data_types::chunk::{ChunkStorage, ChunkSummary};
use internal_types::{schema::Schema, selection::Selection};
use mutable_buffer::chunk::Chunk as MBChunk;
use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk};
use read_buffer::Database as ReadBufferDb;
@@ -33,7 +30,9 @@ pub enum Error {
},
#[snafu(display("Internal error restricting schema: {}", source))]
InternalSelectingSchema { source: data_types::schema::Error },
InternalSelectingSchema {
source: internal_types::schema::Error,
},
#[snafu(display("Predicate conversion error: {}", source))]
PredicateConversion { source: super::pred::Error },

@@ -8,7 +8,7 @@ use arrow_deps::{
},
datafusion::physical_plan::RecordBatchStream,
};
use data_types::selection::Selection;
use internal_types::selection::Selection;
use mutable_buffer::chunk::Chunk as MBChunk;
use read_buffer::ReadFilterResults;

@@ -80,11 +80,12 @@ use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{debug, error, info};
use data_types::{
data::{lines_to_replicated_write, ReplicatedWrite},
database_rules::{DatabaseRules, WriterId},
job::Job,
{DatabaseName, DatabaseNameError},
};
use internal_types::data::{lines_to_replicated_write, ReplicatedWrite};
use influxdb_line_protocol::ParsedLine;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use query::{exec::Executor, DatabaseStore};

@@ -1,7 +1,7 @@
//! Tests for the table_names implementation
use arrow_deps::arrow::datatypes::DataType;
use data_types::{schema::builder::SchemaBuilder, selection::Selection};
use internal_types::{schema::builder::SchemaBuilder, selection::Selection};
use query::{Database, PartitionChunk};
use super::scenarios::*;

@@ -5,10 +5,8 @@ use arrow_deps::{
datafusion::physical_plan::SendableRecordBatchStream,
parquet::{self, arrow::ArrowWriter, file::writer::TryClone},
};
use data_types::{
partition_metadata::{PartitionSummary, TableSummary},
selection::Selection,
};
use data_types::partition_metadata::{PartitionSummary, TableSummary};
use internal_types::selection::Selection;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use query::{predicate::EMPTY_PREDICATE, PartitionChunk};

@@ -1,9 +1,9 @@
use data_types::schema::Schema;
use influxdb_line_protocol::parse_lines;
use ingest::{
parquet::writer::{CompressionLevel, Error as ParquetWriterError, IOxParquetTableWriter},
ConversionSettings, Error as IngestError, LineProtocolConverter, TSMFileConverter,
};
use internal_types::schema::Schema;
use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{