From 0a724878e672f9d2ad16f4770623d061c1430d5b Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Mon, 12 Jul 2021 21:49:48 -0400
Subject: [PATCH] refactor: Organize uses

---
 mutable_buffer/src/chunk/snapshot.rs | 21 +++++------
 read_buffer/benches/database.rs      |  6 +--
 read_buffer/src/chunk.rs             | 56 +++++++++++++---------------
 read_buffer/src/table.rs             | 25 ++++++-------
 server/src/db/chunk.rs               | 15 +++-----
 server/src/db/lifecycle/compact.rs   | 25 +++++--------
 server/src/db/lifecycle/persist.rs   | 22 ++++-------
 7 files changed, 71 insertions(+), 99 deletions(-)

diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs
index 03cbc647c4..a6395fe4dd 100644
--- a/mutable_buffer/src/chunk/snapshot.rs
+++ b/mutable_buffer/src/chunk/snapshot.rs
@@ -1,19 +1,16 @@
-use std::collections::BTreeSet;
-use std::sync::Arc;
-
+use super::MBChunk;
 use arrow::record_batch::RecordBatch;
-use snafu::{ResultExt, Snafu};
-
-use data_types::partition_metadata::TableSummary;
-use data_types::timestamp::TimestampRange;
 use data_types::{
     error::ErrorLogger,
-    partition_metadata::{ColumnSummary, Statistics},
+    partition_metadata::{ColumnSummary, Statistics, TableSummary},
+    timestamp::TimestampRange,
 };
-use internal_types::schema::{Schema, TIME_COLUMN_NAME};
-use internal_types::selection::Selection;
-
-use super::MBChunk;
+use internal_types::{
+    schema::{Schema, TIME_COLUMN_NAME},
+    selection::Selection,
+};
+use snafu::{ResultExt, Snafu};
+use std::{collections::BTreeSet, sync::Arc};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
diff --git a/read_buffer/benches/database.rs b/read_buffer/benches/database.rs
index 4e41d734bb..46b61cd07a 100644
--- a/read_buffer/benches/database.rs
+++ b/read_buffer/benches/database.rs
@@ -1,13 +1,11 @@
-use std::sync::Arc;
-
-use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
-
 use arrow::{
     array::{ArrayRef, Int64Array, StringArray},
     record_batch::RecordBatch,
 };
+use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
 use internal_types::schema::builder::SchemaBuilder;
 use read_buffer::{BinaryExpr, ChunkMetrics, Predicate, RBChunk};
+use std::sync::Arc;
 
 const BASE_TIME: i64 = 1351700038292387000_i64;
 const ONE_MS: i64 = 1_000_000;
diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs
index 18e3ed7e9e..9627235569 100644
--- a/read_buffer/src/chunk.rs
+++ b/read_buffer/src/chunk.rs
@@ -1,22 +1,20 @@
+use crate::{
+    column::Statistics,
+    row_group::{ColumnName, Predicate, RowGroup},
+    schema::{AggregateType, ResultSchema},
+    table::{self, Table},
+};
+use arrow::record_batch::RecordBatch;
+use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
+use internal_types::{schema::builder::Error as SchemaError, schema::Schema, selection::Selection};
+use metrics::{Gauge, KeyValue};
+use observability_deps::tracing::info;
+use snafu::{ResultExt, Snafu};
 use std::{
     collections::{BTreeMap, BTreeSet},
     convert::TryFrom,
 };
 
-use metrics::{Gauge, KeyValue};
-use snafu::{ResultExt, Snafu};
-
-use arrow::record_batch::RecordBatch;
-use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
-use internal_types::{schema::builder::Error as SchemaError, schema::Schema, selection::Selection};
-use observability_deps::tracing::info;
-
-use crate::row_group::{ColumnName, Predicate};
-use crate::schema::{AggregateType, ResultSchema};
-use crate::table;
-use crate::table::Table;
-use crate::{column::Statistics, row_group::RowGroup};
-
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("unsupported operation: {}", msg))]
@@ -435,27 +433,25 @@ impl ChunkMetrics {
 
 #[cfg(test)]
 mod test {
-    use std::sync::Arc;
-
-    use arrow::{
-        array::{
-            ArrayRef, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray,
-            TimestampNanosecondArray, UInt64Array,
-        },
-        datatypes::DataType::{Boolean, Float64, Int64, UInt64, Utf8},
-    };
-    use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
-    use internal_types::schema::builder::SchemaBuilder;
-
     use super::*;
-    use crate::BinaryExpr;
     use crate::{
         row_group::{ColumnType, RowGroup},
         value::Values,
+        BinaryExpr,
     };
-    use arrow::array::DictionaryArray;
-    use arrow::datatypes::Int32Type;
-    use std::num::NonZeroU64;
+    use arrow::{
+        array::{
+            ArrayRef, BinaryArray, BooleanArray, DictionaryArray, Float64Array, Int64Array,
+            StringArray, TimestampNanosecondArray, UInt64Array,
+        },
+        datatypes::{
+            DataType::{Boolean, Float64, Int64, UInt64, Utf8},
+            Int32Type,
+        },
+    };
+    use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
+    use internal_types::schema::builder::SchemaBuilder;
+    use std::{num::NonZeroU64, sync::Arc};
 
     // helper to make the `add_remove_tables` test simpler to read.
     fn gen_recordbatch() -> RecordBatch {
diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs
index 346c9bf70a..3e0eac2e7d 100644
--- a/read_buffer/src/table.rs
+++ b/read_buffer/src/table.rs
@@ -1,3 +1,14 @@
+use crate::{
+    column,
+    row_group::{self, ColumnName, Predicate, RowGroup},
+    schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema},
+    value::{OwnedValue, Scalar, Value},
+};
+use arrow::record_batch::RecordBatch;
+use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
+use internal_types::selection::Selection;
+use parking_lot::RwLock;
+use snafu::{ensure, Snafu};
 use std::{
     collections::{BTreeMap, BTreeSet},
     convert::TryInto,
@@ -5,20 +16,6 @@ use std::{
     sync::Arc,
 };
 
-use parking_lot::RwLock;
-use snafu::{ensure, Snafu};
-
-use arrow::record_batch::RecordBatch;
-use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
-use internal_types::selection::Selection;
-
-use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema};
-use crate::value::{OwnedValue, Scalar, Value};
-use crate::{
-    column,
-    row_group::{self, ColumnName, Predicate, RowGroup},
-};
-
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("cannot drop last row group in table; drop table"))]
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index 6ea5e4e21c..b31f9ef9a3 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -1,10 +1,6 @@
-use std::{
-    collections::{BTreeMap, BTreeSet},
-    sync::Arc,
+use super::{
+    catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
 };
-
-use snafu::{OptionExt, ResultExt, Snafu};
-
 use data_types::partition_metadata;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_util::MemoryStream;
@@ -24,9 +20,10 @@ use query::{
     QueryChunk, QueryChunkMeta,
 };
 use read_buffer::RBChunk;
-
-use super::{
-    catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    sync::Arc,
 };
 
 #[derive(Debug, Snafu)]
diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs
index a7b07d48de..15a02e11b0 100644
--- a/server/src/db/lifecycle/compact.rs
+++ b/server/src/db/lifecycle/compact.rs
@@ -1,24 +1,18 @@
 //! This module contains the code to compact chunks together
 
-use std::future::Future;
-use std::sync::Arc;
-
+use super::{error::Result, merge_schemas, LockableCatalogChunk, LockableCatalogPartition};
+use crate::db::{
+    catalog::{chunk::CatalogChunk, partition::Partition},
+    lifecycle::{collect_rub, new_rub_chunk},
+    DbChunk,
+};
 use data_types::job::Job;
 use lifecycle::LifecycleWriteGuard;
 use observability_deps::tracing::info;
-use query::exec::ExecutorType;
-use query::frontend::reorg::ReorgPlanner;
-use query::{compute_sort_key, QueryChunkMeta};
+use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
+use std::{future::Future, sync::Arc};
 use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
 
-use crate::db::catalog::chunk::CatalogChunk;
-use crate::db::catalog::partition::Partition;
-use crate::db::DbChunk;
-
-use super::merge_schemas;
-use super::{error::Result, LockableCatalogChunk, LockableCatalogPartition};
-use crate::db::lifecycle::{collect_rub, new_rub_chunk};
-
 /// Compact the provided chunks into a single chunk,
 /// returning the newly created chunk
 ///
@@ -116,8 +110,7 @@ pub(crate) fn compact_chunks(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::db::test_helpers::write_lp;
-    use crate::utils::make_db;
+    use crate::{db::test_helpers::write_lp, utils::make_db};
     use data_types::chunk_metadata::ChunkStorage;
     use lifecycle::{LockableChunk, LockablePartition};
     use query::QueryDatabase;
diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs
index 84c87488f4..37ae0f541e 100644
--- a/server/src/db/lifecycle/persist.rs
+++ b/server/src/db/lifecycle/persist.rs
@@ -1,25 +1,19 @@
 //! This module contains the code that splits and persist chunks
 
-use std::future::Future;
-use std::sync::Arc;
-
+use super::{LockableCatalogChunk, LockableCatalogPartition, Result};
+use crate::db::{
+    catalog::{chunk::CatalogChunk, partition::Partition},
+    lifecycle::{collect_rub, merge_schemas, new_rub_chunk, write::write_chunk_to_object_store},
+    DbChunk,
+};
 use data_types::job::Job;
 use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition};
 use observability_deps::tracing::info;
 use persistence_windows::persistence_windows::FlushHandle;
-use query::exec::ExecutorType;
-use query::frontend::reorg::ReorgPlanner;
-use query::{compute_sort_key, QueryChunkMeta};
+use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
+use std::{future::Future, sync::Arc};
 use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
 
-use crate::db::catalog::chunk::CatalogChunk;
-use crate::db::catalog::partition::Partition;
-use crate::db::lifecycle::write::write_chunk_to_object_store;
-use crate::db::lifecycle::{collect_rub, merge_schemas, new_rub_chunk};
-use crate::db::DbChunk;
-
-use super::{LockableCatalogChunk, LockableCatalogPartition, Result};
-
 /// Split and then persist the provided chunks
 ///
 /// TODO: Replace low-level locks with transaction object
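
Note on the convention this patch applies: every hunk makes the same
mechanical change. The scattered `use` statements at the top of each module
are collapsed into a single alphabetized list, and paths that share a root
(`crate::...`, `std::...`, `super::...`) are merged into one nested tree.
Below is a minimal, self-contained sketch of the resulting style; it is not
code from this repository, it uses only `std` so it compiles on its own, and
the names in it (`ids`, `names`, "chunk") are invented for illustration. The
grouping is the same shape that nightly rustfmt's unstable
`imports_granularity = "Crate"` option produces, though whether this patch
was generated with rustfmt or organized by hand is an assumption.

// Before (the style being removed): one path per `use` statement, split
// across several blank-line-separated blocks:
//
//     use std::collections::BTreeSet;
//     use std::sync::Arc;
//
//     use std::collections::BTreeMap;
//
// After (the style being introduced): one alphabetized, root-merged tree.
use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
};

fn main() {
    // The merged form is purely syntactic; every imported item behaves
    // exactly as it would under the single-line imports it replaces.
    let mut ids = BTreeSet::new();
    ids.insert(7_u32);

    let mut names: BTreeMap<u32, Arc<str>> = BTreeMap::new();
    names.insert(7, Arc::from("chunk"));

    for id in &ids {
        println!("{} -> {}", id, names[id]);
    }
}

One practical effect of the merged style: a module carries at most one `use`
tree per crate root, so adding an import later usually edits an existing line
instead of appending a new one, which keeps import blocks compact and less
prone to merge conflicts.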