refactor: Organize uses

pull/24376/head
Carol (Nichols || Goulding) 2021-07-12 21:49:48 -04:00
parent 7371b0aabf
commit 0a724878e6
7 changed files with 71 additions and 99 deletions

View File

@ -1,19 +1,16 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use super::MBChunk;
use arrow::record_batch::RecordBatch;
use snafu::{ResultExt, Snafu};
use data_types::partition_metadata::TableSummary;
use data_types::timestamp::TimestampRange;
use data_types::{
error::ErrorLogger,
partition_metadata::{ColumnSummary, Statistics},
partition_metadata::{ColumnSummary, Statistics, TableSummary},
timestamp::TimestampRange,
};
use internal_types::schema::{Schema, TIME_COLUMN_NAME};
use internal_types::selection::Selection;
use super::MBChunk;
use internal_types::{
schema::{Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, sync::Arc};
#[derive(Debug, Snafu)]
pub enum Error {

View File

@ -1,13 +1,11 @@
use std::sync::Arc;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use arrow::{
array::{ArrayRef, Int64Array, StringArray},
record_batch::RecordBatch,
};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use internal_types::schema::builder::SchemaBuilder;
use read_buffer::{BinaryExpr, ChunkMetrics, Predicate, RBChunk};
use std::sync::Arc;
const BASE_TIME: i64 = 1351700038292387000_i64;
const ONE_MS: i64 = 1_000_000;

View File

@ -1,22 +1,20 @@
use crate::{
column::Statistics,
row_group::{ColumnName, Predicate, RowGroup},
schema::{AggregateType, ResultSchema},
table::{self, Table},
};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use internal_types::{schema::builder::Error as SchemaError, schema::Schema, selection::Selection};
use metrics::{Gauge, KeyValue};
use observability_deps::tracing::info;
use snafu::{ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
convert::TryFrom,
};
use metrics::{Gauge, KeyValue};
use snafu::{ResultExt, Snafu};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use internal_types::{schema::builder::Error as SchemaError, schema::Schema, selection::Selection};
use observability_deps::tracing::info;
use crate::row_group::{ColumnName, Predicate};
use crate::schema::{AggregateType, ResultSchema};
use crate::table;
use crate::table::Table;
use crate::{column::Statistics, row_group::RowGroup};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("unsupported operation: {}", msg))]
@ -435,27 +433,25 @@ impl ChunkMetrics {
#[cfg(test)]
mod test {
use std::sync::Arc;
use arrow::{
array::{
ArrayRef, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray,
TimestampNanosecondArray, UInt64Array,
},
datatypes::DataType::{Boolean, Float64, Int64, UInt64, Utf8},
};
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
use internal_types::schema::builder::SchemaBuilder;
use super::*;
use crate::BinaryExpr;
use crate::{
row_group::{ColumnType, RowGroup},
value::Values,
BinaryExpr,
};
use arrow::array::DictionaryArray;
use arrow::datatypes::Int32Type;
use std::num::NonZeroU64;
use arrow::{
array::{
ArrayRef, BinaryArray, BooleanArray, DictionaryArray, Float64Array, Int64Array,
StringArray, TimestampNanosecondArray, UInt64Array,
},
datatypes::{
DataType::{Boolean, Float64, Int64, UInt64, Utf8},
Int32Type,
},
};
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
use internal_types::schema::builder::SchemaBuilder;
use std::{num::NonZeroU64, sync::Arc};
// helper to make the `add_remove_tables` test simpler to read.
fn gen_recordbatch() -> RecordBatch {

View File

@ -1,3 +1,14 @@
use crate::{
column,
row_group::{self, ColumnName, Predicate, RowGroup},
schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema},
value::{OwnedValue, Scalar, Value},
};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use internal_types::selection::Selection;
use parking_lot::RwLock;
use snafu::{ensure, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
convert::TryInto,
@ -5,20 +16,6 @@ use std::{
sync::Arc,
};
use parking_lot::RwLock;
use snafu::{ensure, Snafu};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use internal_types::selection::Selection;
use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema};
use crate::value::{OwnedValue, Scalar, Value};
use crate::{
column,
row_group::{self, ColumnName, Predicate, RowGroup},
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("cannot drop last row group in table; drop table"))]

View File

@ -1,10 +1,6 @@
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
use super::{
catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
};
use snafu::{OptionExt, ResultExt, Snafu};
use data_types::partition_metadata;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
@ -24,9 +20,10 @@ use query::{
QueryChunk, QueryChunkMeta,
};
use read_buffer::RBChunk;
use super::{
catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
#[derive(Debug, Snafu)]

View File

@ -1,24 +1,18 @@
//! This module contains the code to compact chunks together
use std::future::Future;
use std::sync::Arc;
use super::{error::Result, merge_schemas, LockableCatalogChunk, LockableCatalogPartition};
use crate::db::{
catalog::{chunk::CatalogChunk, partition::Partition},
lifecycle::{collect_rub, new_rub_chunk},
DbChunk,
};
use data_types::job::Job;
use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::info;
use query::exec::ExecutorType;
use query::frontend::reorg::ReorgPlanner;
use query::{compute_sort_key, QueryChunkMeta};
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use crate::db::catalog::chunk::CatalogChunk;
use crate::db::catalog::partition::Partition;
use crate::db::DbChunk;
use super::merge_schemas;
use super::{error::Result, LockableCatalogChunk, LockableCatalogPartition};
use crate::db::lifecycle::{collect_rub, new_rub_chunk};
/// Compact the provided chunks into a single chunk,
/// returning the newly created chunk
///
@ -116,8 +110,7 @@ pub(crate) fn compact_chunks(
#[cfg(test)]
mod tests {
use super::*;
use crate::db::test_helpers::write_lp;
use crate::utils::make_db;
use crate::{db::test_helpers::write_lp, utils::make_db};
use data_types::chunk_metadata::ChunkStorage;
use lifecycle::{LockableChunk, LockablePartition};
use query::QueryDatabase;

View File

@ -1,25 +1,19 @@
//! This module contains the code that splits and persist chunks
use std::future::Future;
use std::sync::Arc;
use super::{LockableCatalogChunk, LockableCatalogPartition, Result};
use crate::db::{
catalog::{chunk::CatalogChunk, partition::Partition},
lifecycle::{collect_rub, merge_schemas, new_rub_chunk, write::write_chunk_to_object_store},
DbChunk,
};
use data_types::job::Job;
use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition};
use observability_deps::tracing::info;
use persistence_windows::persistence_windows::FlushHandle;
use query::exec::ExecutorType;
use query::frontend::reorg::ReorgPlanner;
use query::{compute_sort_key, QueryChunkMeta};
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use crate::db::catalog::chunk::CatalogChunk;
use crate::db::catalog::partition::Partition;
use crate::db::lifecycle::write::write_chunk_to_object_store;
use crate::db::lifecycle::{collect_rub, merge_schemas, new_rub_chunk};
use crate::db::DbChunk;
use super::{LockableCatalogChunk, LockableCatalogPartition, Result};
/// Split and then persist the provided chunks
///
/// TODO: Replace low-level locks with transaction object