From 7c9a21632be89fd773fdf58e9d990fb9419fe9be Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Jul 2021 14:10:02 -0400 Subject: [PATCH] refactor: Organize uses --- data_types/src/chunk_metadata.rs | 3 +- generated_types/src/chunk.rs | 12 ++++--- lifecycle/src/lib.rs | 13 +++---- lifecycle/src/policy.rs | 48 +++++++++++++------------- server/src/db.rs | 26 +++++++------- server/src/db/catalog/chunk.rs | 10 +++--- server/src/db/chunk.rs | 2 +- server/src/db/lifecycle.rs | 40 ++++++++++----------- server/src/db/lifecycle/persist.rs | 15 ++++---- server/src/db/system_tables.rs | 14 +++----- server/src/db/system_tables/chunks.rs | 30 ++++++++-------- server/src/db/system_tables/columns.rs | 36 +++++++++---------- 12 files changed, 120 insertions(+), 129 deletions(-) diff --git a/data_types/src/chunk_metadata.rs b/data_types/src/chunk_metadata.rs index 32dd14be26..7e4c19623d 100644 --- a/data_types/src/chunk_metadata.rs +++ b/data_types/src/chunk_metadata.rs @@ -1,9 +1,8 @@ //! Module contains a representation of chunk metadata -use std::sync::Arc; - use crate::partition_metadata::PartitionAddr; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use std::sync::Arc; /// Address of the chunk within the catalog #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] diff --git a/generated_types/src/chunk.rs b/generated_types/src/chunk.rs index ab02c89572..05700d2449 100644 --- a/generated_types/src/chunk.rs +++ b/generated_types/src/chunk.rs @@ -1,8 +1,12 @@ -use crate::google::{FieldViolation, FromFieldOpt}; -use crate::influxdata::iox::management::v1 as management; +use crate::{ + google::{FieldViolation, FromFieldOpt}, + influxdata::iox::management::v1 as management, +}; use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage, ChunkSummary}; -use std::convert::{TryFrom, TryInto}; -use std::sync::Arc; +use std::{ + convert::{TryFrom, TryInto}, + sync::Arc, +}; /// Conversion code to management API chunk structure impl From for management::Chunk { diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs index c8ccd92bee..3a76aaf48a 100644 --- a/lifecycle/src/lib.rs +++ b/lifecycle/src/lib.rs @@ -9,18 +9,19 @@ )] use chrono::{DateTime, Utc}; - -use data_types::chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage}; -use data_types::database_rules::LifecycleRules; -use data_types::DatabaseName; -pub use guard::*; +use data_types::{ + chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage}, + database_rules::LifecycleRules, + DatabaseName, +}; use internal_types::access::AccessMetrics; -pub use policy::*; use std::time::Instant; use tracker::TaskTracker; mod guard; +pub use guard::*; mod policy; +pub use policy::*; /// A trait that encapsulates the database logic that is automated by `LifecyclePolicy` pub trait LifecycleDb { diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index 73c4760587..ffa56e6ea2 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -1,21 +1,22 @@ -use std::convert::TryInto; -use std::fmt::Debug; -use std::time::{Duration, Instant}; - -use chrono::{DateTime, Utc}; -use data_types::DatabaseName; -use futures::future::BoxFuture; - -use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage}; -use data_types::database_rules::LifecycleRules; -use observability_deps::tracing::{debug, info, trace, warn}; -use tracker::TaskTracker; - use crate::{ LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition, PersistHandle, }; +use chrono::{DateTime, Utc}; +use data_types::{ + chunk_metadata::{ChunkLifecycleAction, ChunkStorage}, + database_rules::LifecycleRules, + DatabaseName, +}; +use futures::future::BoxFuture; use internal_types::access::AccessMetrics; +use observability_deps::tracing::{debug, info, trace, warn}; +use std::{ + convert::TryInto, + fmt::Debug, + time::{Duration, Instant}, +}; +use tracker::TaskTracker; /// Number of seconds to wait before retying a failed lifecycle action pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10); @@ -654,21 +655,20 @@ fn sort_free_candidates

(candidates: &mut Vec>) { #[cfg(test)] mod tests { - use std::cmp::max; - use std::collections::BTreeMap; - use std::convert::Infallible; - use std::num::{NonZeroU32, NonZeroUsize}; - use std::sync::Arc; - - use data_types::chunk_metadata::{ChunkAddr, ChunkStorage}; - use tracker::{RwLock, TaskId, TaskRegistration, TaskRegistry}; - + use super::*; use crate::{ ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk, LockablePartition, PersistHandle, }; - - use super::*; + use data_types::chunk_metadata::{ChunkAddr, ChunkStorage}; + use std::{ + cmp::max, + collections::BTreeMap, + convert::Infallible, + num::{NonZeroU32, NonZeroUsize}, + sync::Arc, + }; + use tracker::{RwLock, TaskId, TaskRegistration, TaskRegistry}; #[derive(Debug, Eq, PartialEq)] enum MoverEvents { diff --git a/server/src/db.rs b/server/src/db.rs index f40f619b4a..a0124ee779 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1,15 +1,17 @@ //! This module contains the main IOx Database object which has the //! instances of the mutable buffer, read buffer, and object store -use crate::db::catalog::chunk::ChunkStage; -use crate::db::catalog::table::TableSchemaUpsertHandle; pub(crate) use crate::db::chunk::DbChunk; -use crate::db::lifecycle::ArcDb; use crate::{ db::{ access::QueryCatalogAccess, - catalog::{chunk::CatalogChunk, partition::Partition, Catalog, TableNameFilter}, - lifecycle::{LockableCatalogChunk, LockableCatalogPartition}, + catalog::{ + chunk::{CatalogChunk, ChunkStage}, + partition::Partition, + table::TableSchemaUpsertHandle, + Catalog, TableNameFilter, + }, + lifecycle::{ArcDb, LockableCatalogChunk, LockableCatalogPartition}, }, JobRegistry, }; @@ -31,13 +33,11 @@ use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; use object_store::{path::parsed::DirsAndFileName, ObjectStore}; use observability_deps::tracing::{debug, error, info}; use parking_lot::RwLock; -use parquet_file::catalog::CatalogParquetInfo; use parquet_file::{ - catalog::{CheckpointData, PreservedCatalog}, + catalog::{CatalogParquetInfo, CheckpointData, PreservedCatalog}, cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files}, }; -use persistence_windows::checkpoint::ReplayPlan; -use persistence_windows::persistence_windows::PersistenceWindows; +use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows}; use query::{exec::Executor, predicate::Predicate, QueryDatabase}; use rand_distr::{Distribution, Poisson}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; @@ -50,8 +50,10 @@ use std::{ }, time::{Duration, Instant}, }; -use write_buffer::config::WriteBufferConfig; -use write_buffer::core::{FetchHighWatermark, WriteBufferError}; +use write_buffer::{ + config::WriteBufferConfig, + core::{FetchHighWatermark, WriteBufferError}, +}; pub mod access; pub mod catalog; @@ -1331,11 +1333,11 @@ mod tests { use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use bytes::Bytes; use chrono::DateTime; - use data_types::write_summary::TimestampSummary; use data_types::{ chunk_metadata::ChunkStorage, database_rules::{LifecycleRules, PartitionTemplate, TemplatePart}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, + write_summary::TimestampSummary, }; use entry::{test_helpers::lp_to_entry, Sequence}; use futures::{stream, StreamExt, TryStreamExt}; diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 16bf3274a0..c1966fa4e3 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -1,16 +1,15 @@ use crate::db::catalog::metrics::{StorageGauge, TimestampHistogram}; use chrono::{DateTime, Utc}; -use data_types::instant::to_approximate_datetime; -use data_types::write_summary::TimestampSummary; use data_types::{ chunk_metadata::{ ChunkAddr, ChunkColumnSummary, ChunkLifecycleAction, ChunkStorage, ChunkSummary, DetailedChunkSummary, }, + instant::to_approximate_datetime, partition_metadata::TableSummary, + write_summary::TimestampSummary, }; -use internal_types::access::AccessRecorder; -use internal_types::schema::Schema; +use internal_types::{access::AccessRecorder, schema::Schema}; use metrics::{Counter, Histogram, KeyValue}; use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk}; use observability_deps::tracing::debug; @@ -969,6 +968,7 @@ impl CatalogChunk { #[cfg(test)] mod tests { + use super::*; use entry::test_helpers::lp_to_entry; use mutable_buffer::chunk::{ChunkMetrics as MBChunkMetrics, MBChunk}; use parquet_file::{ @@ -976,8 +976,6 @@ mod tests { test_utils::{make_chunk as make_parquet_chunk_with_store, make_object_store}, }; - use super::*; - #[test] fn test_new_open() { let addr = chunk_addr(); diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 3a7f3b70d7..32f3b559cb 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -5,8 +5,8 @@ use chrono::{DateTime, Utc}; use data_types::partition_metadata; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_util::MemoryStream; -use internal_types::access::AccessRecorder; use internal_types::{ + access::AccessRecorder, schema::{sort::SortKey, Schema}, selection::Selection, }; diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index c0fdc545b3..970f1361e6 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -1,20 +1,23 @@ -use std::fmt::Display; -use std::sync::Arc; -use std::time::Instant; - -use chrono::{DateTime, TimeZone, Utc}; - +use super::DbChunk; +use crate::{ + db::catalog::{chunk::CatalogChunk, partition::Partition}, + Db, +}; use ::lifecycle::LifecycleDb; -use data_types::chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage}; -use data_types::database_rules::LifecycleRules; -use data_types::error::ErrorLogger; -use data_types::job::Job; -use data_types::partition_metadata::Statistics; -use data_types::DatabaseName; +use chrono::{DateTime, TimeZone, Utc}; +use data_types::{ + chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage}, + database_rules::LifecycleRules, + error::ErrorLogger, + job::Job, + partition_metadata::Statistics, + DatabaseName, +}; use datafusion::physical_plan::SendableRecordBatchStream; -use internal_types::access::AccessMetrics; -use internal_types::schema::merge::SchemaMerger; -use internal_types::schema::{Schema, TIME_COLUMN_NAME}; +use internal_types::{ + access::AccessMetrics, + schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME}, +}; use lifecycle::{ LifecycleChunk, LifecyclePartition, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk, LockablePartition, @@ -22,12 +25,9 @@ use lifecycle::{ use observability_deps::tracing::{info, trace}; use persistence_windows::persistence_windows::FlushHandle; use query::QueryChunkMeta; +use std::{fmt::Display, sync::Arc, time::Instant}; use tracker::{RwLock, TaskTracker}; -use crate::db::catalog::chunk::CatalogChunk; -use crate::db::catalog::partition::Partition; -use crate::Db; - pub(crate) use compact::compact_chunks; pub(crate) use drop::drop_chunk; pub(crate) use error::{Error, Result}; @@ -35,8 +35,6 @@ pub(crate) use move_chunk::move_chunk_to_read_buffer; pub(crate) use persist::persist_chunks; pub(crate) use unload::unload_read_buffer_chunk; -use super::DbChunk; - mod compact; mod drop; mod error; diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index fa9eaf5ff1..c5679e7f0d 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -168,19 +168,16 @@ pub fn persist_chunks( #[cfg(test)] mod tests { - use std::num::{NonZeroU32, NonZeroU64}; - use std::time::Instant; - + use super::*; + use crate::{db::test_helpers::write_lp, utils::TestDb}; use chrono::{TimeZone, Utc}; - use data_types::database_rules::LifecycleRules; use lifecycle::{LockableChunk, LockablePartition}; use query::QueryDatabase; - - use crate::db::test_helpers::write_lp; - use crate::utils::TestDb; - - use super::*; + use std::{ + num::{NonZeroU32, NonZeroU64}, + time::Instant, + }; #[tokio::test] async fn test_flush_overlapping() { diff --git a/server/src/db/system_tables.rs b/server/src/db/system_tables.rs index bcc474e230..0050df1c3a 100644 --- a/server/src/db/system_tables.rs +++ b/server/src/db/system_tables.rs @@ -7,26 +7,21 @@ //! //! For example `SELECT * FROM system.chunks` -use std::any::Any; -use std::sync::Arc; - +use super::catalog::Catalog; +use crate::JobRegistry; use arrow::{ datatypes::{Field, Schema, SchemaRef}, error::Result, record_batch::RecordBatch, }; use chrono::{DateTime, Utc}; - use datafusion::{ catalog::schema::SchemaProvider, datasource::{datasource::Statistics, TableProvider}, error::{DataFusionError, Result as DataFusionResult}, physical_plan::{memory::MemoryExec, ExecutionPlan}, }; - -use crate::JobRegistry; - -use super::catalog::Catalog; +use std::{any::Any, sync::Arc}; mod chunks; mod columns; @@ -205,11 +200,10 @@ fn scan_batch( #[cfg(test)] mod tests { + use super::*; use arrow::array::{ArrayRef, UInt64Array}; use arrow_util::assert_batches_eq; - use super::*; - fn seq_array(start: u64, end: u64) -> ArrayRef { Arc::new(UInt64Array::from_iter_values(start..end)) } diff --git a/server/src/db/system_tables/chunks.rs b/server/src/db/system_tables/chunks.rs index a635c659a1..be863f24b5 100644 --- a/server/src/db/system_tables/chunks.rs +++ b/server/src/db/system_tables/chunks.rs @@ -1,16 +1,16 @@ +use crate::db::{ + catalog::Catalog, + system_tables::{time_to_ts, IoxSystemTable}, +}; +use arrow::{ + array::{StringArray, TimestampNanosecondArray, UInt32Array, UInt64Array}, + datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}, + error::Result, + record_batch::RecordBatch, +}; +use data_types::{chunk_metadata::ChunkSummary, error::ErrorLogger}; use std::sync::Arc; -use arrow::array::{StringArray, TimestampNanosecondArray, UInt32Array, UInt64Array}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; -use arrow::error::Result; -use arrow::record_batch::RecordBatch; - -use data_types::chunk_metadata::ChunkSummary; -use data_types::error::ErrorLogger; - -use crate::db::catalog::Catalog; -use crate::db::system_tables::{time_to_ts, IoxSystemTable}; - /// Implementation of system.chunks table #[derive(Debug)] pub(super) struct ChunksTable { @@ -128,12 +128,10 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result< #[cfg(test)] mod tests { - use chrono::{TimeZone, Utc}; - - use arrow_util::assert_batches_eq; - use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage}; - use super::*; + use arrow_util::assert_batches_eq; + use chrono::{TimeZone, Utc}; + use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage}; #[test] fn test_from_chunk_summaries() { diff --git a/server/src/db/system_tables/columns.rs b/server/src/db/system_tables/columns.rs index c4682bde22..f4db734bf2 100644 --- a/server/src/db/system_tables/columns.rs +++ b/server/src/db/system_tables/columns.rs @@ -1,17 +1,16 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use arrow::array::{ArrayRef, StringBuilder, UInt32Builder, UInt64Builder}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use arrow::error::Result; -use arrow::record_batch::RecordBatch; - -use data_types::chunk_metadata::DetailedChunkSummary; -use data_types::error::ErrorLogger; -use data_types::partition_metadata::{PartitionSummary, TableSummary}; - -use crate::db::catalog::Catalog; -use crate::db::system_tables::IoxSystemTable; +use crate::db::{catalog::Catalog, system_tables::IoxSystemTable}; +use arrow::{ + array::{ArrayRef, StringBuilder, UInt32Builder, UInt64Builder}, + datatypes::{DataType, Field, Schema, SchemaRef}, + error::Result, + record_batch::RecordBatch, +}; +use data_types::{ + chunk_metadata::DetailedChunkSummary, + error::ErrorLogger, + partition_metadata::{PartitionSummary, TableSummary}, +}; +use std::{collections::HashMap, sync::Arc}; /// Implementation of `system.columns` system table #[derive(Debug)] @@ -217,11 +216,12 @@ fn assemble_chunk_columns( #[cfg(test)] mod tests { - use arrow_util::assert_batches_eq; - use data_types::chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary}; - use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}; - use super::*; + use arrow_util::assert_batches_eq; + use data_types::{ + chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary}, + partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}, + }; #[test] fn test_from_partition_summaries() {