From 564238ad8c3608c2351435b056e0d37c7a8d8532 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 12 Aug 2021 09:45:52 -0400 Subject: [PATCH] refactor: Organize uses --- parquet_file/src/catalog.rs | 34 +++++++++++----------- server/src/database.rs | 42 +++++++++++++++------------ server/src/lib.rs | 58 +++++++++++++++++-------------------- 3 files changed, 67 insertions(+), 67 deletions(-) diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index d19e67c92f..ec892f4ed1 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -1,16 +1,4 @@ //! Catalog preservation and transaction handling. -use std::{ - collections::{ - hash_map::Entry::{Occupied, Vacant}, - HashMap, - }, - convert::TryInto, - fmt::{Debug, Display}, - num::TryFromIntError, - str::FromStr, - sync::Arc, -}; - use crate::metadata::IoxParquetMetaData; use bytes::Bytes; use chrono::{DateTime, Utc}; @@ -25,6 +13,17 @@ use observability_deps::tracing::{info, warn}; use parking_lot::RwLock; use prost::{DecodeError, EncodeError, Message}; use snafu::{OptionExt, ResultExt, Snafu}; +use std::{ + collections::{ + hash_map::Entry::{Occupied, Vacant}, + HashMap, + }, + convert::TryInto, + fmt::{Debug, Display}, + num::TryFromIntError, + str::FromStr, + sync::Arc, +}; use tokio::sync::{Semaphore, SemaphorePermit}; use uuid::Uuid; @@ -1563,14 +1562,15 @@ pub mod test_helpers { #[cfg(test)] mod tests { + use super::{ + test_helpers::{ + assert_catalog_state_implementation, break_catalog_with_weird_version, TestCatalogState, + }, + *, + }; use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata}; use object_store::parsed_path; - use super::test_helpers::{ - assert_catalog_state_implementation, break_catalog_with_weird_version, TestCatalogState, - }; - use super::*; - #[tokio::test] async fn test_create_empty() { let iox_object_store = make_iox_object_store(); diff --git a/server/src/database.rs b/server/src/database.rs index 97c36fd9a8..51e6b31eea 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1,31 +1,35 @@ -use std::future::Future; -use std::sync::Arc; -use std::time::Duration; - -use data_types::database_state::DatabaseStateCode; -use data_types::server_id::ServerId; -use data_types::{database_rules::DatabaseRules, DatabaseName}; -use futures::future::{BoxFuture, Shared}; -use futures::{FutureExt, TryFutureExt}; +use crate::{ + db::{ + load::{create_preserved_catalog, load_or_create_preserved_catalog}, + DatabaseToCommit, + }, + ApplicationState, Db, DB_RULES_FILE_NAME, +}; +use bytes::BytesMut; +use data_types::{ + database_rules::DatabaseRules, database_state::DatabaseStateCode, server_id::ServerId, + DatabaseName, +}; +use futures::{ + future::{BoxFuture, Shared}, + FutureExt, TryFutureExt, +}; use generated_types::database_rules::encode_database_rules; use internal_types::freezable::Freezable; use iox_object_store::IoxObjectStore; -use object_store::path::{ObjectStorePath, Path}; +use object_store::{ + path::{ObjectStorePath, Path}, + ObjectStore, ObjectStoreApi, +}; use observability_deps::tracing::{error, info, warn}; use parking_lot::RwLock; +use parquet_file::catalog::PreservedCatalog; use persistence_windows::checkpoint::ReplayPlan; use snafu::{ResultExt, Snafu}; -use tokio::sync::Notify; -use tokio::task::JoinError; +use std::{future::Future, sync::Arc, time::Duration}; +use tokio::{sync::Notify, task::JoinError}; use tokio_util::sync::CancellationToken; -use crate::db::load::{create_preserved_catalog, load_or_create_preserved_catalog}; -use crate::db::DatabaseToCommit; -use crate::{ApplicationState, Db, DB_RULES_FILE_NAME}; -use bytes::BytesMut; -use object_store::{ObjectStore, ObjectStoreApi}; -use parquet_file::catalog::PreservedCatalog; - const INIT_BACKOFF: Duration = Duration::from_secs(1); #[derive(Debug, Snafu)] diff --git a/server/src/lib.rs b/server/src/lib.rs index 8224aa712d..4e091ccda5 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -68,13 +68,10 @@ clippy::future_not_send )] -use std::sync::Arc; - use async_trait::async_trait; -use data_types::database_rules::ShardConfig; -use data_types::error::ErrorLogger; use data_types::{ - database_rules::{DatabaseRules, NodeGroup, RoutingRules, ShardId, Sink}, + database_rules::{DatabaseRules, NodeGroup, RoutingRules, ShardConfig, ShardId, Sink}, + error::ErrorLogger, job::Job, server_id::ServerId, {DatabaseName, DatabaseNameError}, @@ -88,26 +85,26 @@ use influxdb_line_protocol::ParsedLine; use internal_types::freezable::Freezable; use lifecycle::LockableChunk; use metrics::{KeyValue, MetricObserverBuilder}; -use object_store::{ObjectStore, ObjectStoreApi}; +use object_store::{ + path::{parsed::DirsAndFileName, ObjectStorePath, Path}, + ObjectStore, ObjectStoreApi, +}; use observability_deps::tracing::{error, info, warn}; use parking_lot::RwLock; use query::exec::Executor; use rand::seq::SliceRandom; use resolver::Resolver; use snafu::{OptionExt, ResultExt, Snafu}; -use tokio::task::JoinError; +use std::sync::Arc; +use tokio::{sync::Notify, task::JoinError}; use tokio_util::sync::CancellationToken; use tracker::{TaskTracker, TrackedFutureExt}; pub use application::ApplicationState; pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer}; - pub use db::Db; pub use job::JobRegistry; -use object_store::path::parsed::DirsAndFileName; -use object_store::path::{ObjectStorePath, Path}; pub use resolver::{GrpcConnectionString, RemoteTemplate}; -use tokio::sync::Notify; mod application; mod connection; @@ -1274,6 +1271,25 @@ fn database_store_prefix( #[cfg(test)] mod tests { + use super::*; + use arrow::record_batch::RecordBatch; + use arrow_util::assert_batches_eq; + use bytes::Bytes; + use connection::test_helpers::{TestConnectionManager, TestRemoteServer}; + use data_types::{ + chunk_metadata::ChunkAddr, + database_rules::{ + HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG, + }, + }; + use futures::TryStreamExt; + use generated_types::database_rules::decode_database_rules; + use influxdb_line_protocol::parse_lines; + use iox_object_store::IoxObjectStore; + use metrics::TestMetricRegistry; + use object_store::{path::ObjectStorePath, ObjectStore}; + use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog}; + use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase}; use std::{ convert::{Infallible, TryFrom}, sync::{ @@ -1282,28 +1298,8 @@ mod tests { }, time::{Duration, Instant}, }; - - use arrow::record_batch::RecordBatch; - use bytes::Bytes; - use futures::TryStreamExt; - - use arrow_util::assert_batches_eq; - use connection::test_helpers::{TestConnectionManager, TestRemoteServer}; - use data_types::chunk_metadata::ChunkAddr; - use data_types::database_rules::{ - HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG, - }; - use generated_types::database_rules::decode_database_rules; - use influxdb_line_protocol::parse_lines; - use iox_object_store::IoxObjectStore; - use metrics::TestMetricRegistry; - use object_store::{path::ObjectStorePath, ObjectStore}; - use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog}; - use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase}; use test_helpers::assert_contains; - use super::*; - const ARBITRARY_DEFAULT_TIME: i64 = 456; fn make_application() -> Arc {