refactor: Organize uses

pull/24376/head
Carol (Nichols || Goulding) 2021-08-12 09:45:52 -04:00
parent ae6b0e669b
commit 564238ad8c
3 changed files with 67 additions and 67 deletions

View File

@ -1,16 +1,4 @@
//! Catalog preservation and transaction handling. //! Catalog preservation and transaction handling.
use std::{
collections::{
hash_map::Entry::{Occupied, Vacant},
HashMap,
},
convert::TryInto,
fmt::{Debug, Display},
num::TryFromIntError,
str::FromStr,
sync::Arc,
};
use crate::metadata::IoxParquetMetaData; use crate::metadata::IoxParquetMetaData;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
@ -25,6 +13,17 @@ use observability_deps::tracing::{info, warn};
use parking_lot::RwLock; use parking_lot::RwLock;
use prost::{DecodeError, EncodeError, Message}; use prost::{DecodeError, EncodeError, Message};
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::{
hash_map::Entry::{Occupied, Vacant},
HashMap,
},
convert::TryInto,
fmt::{Debug, Display},
num::TryFromIntError,
str::FromStr,
sync::Arc,
};
use tokio::sync::{Semaphore, SemaphorePermit}; use tokio::sync::{Semaphore, SemaphorePermit};
use uuid::Uuid; use uuid::Uuid;
@ -1563,14 +1562,15 @@ pub mod test_helpers {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{
test_helpers::{
assert_catalog_state_implementation, break_catalog_with_weird_version, TestCatalogState,
},
*,
};
use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata}; use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata};
use object_store::parsed_path; use object_store::parsed_path;
use super::test_helpers::{
assert_catalog_state_implementation, break_catalog_with_weird_version, TestCatalogState,
};
use super::*;
#[tokio::test] #[tokio::test]
async fn test_create_empty() { async fn test_create_empty() {
let iox_object_store = make_iox_object_store(); let iox_object_store = make_iox_object_store();

View File

@ -1,31 +1,35 @@
use std::future::Future; use crate::{
use std::sync::Arc; db::{
use std::time::Duration; load::{create_preserved_catalog, load_or_create_preserved_catalog},
DatabaseToCommit,
use data_types::database_state::DatabaseStateCode; },
use data_types::server_id::ServerId; ApplicationState, Db, DB_RULES_FILE_NAME,
use data_types::{database_rules::DatabaseRules, DatabaseName}; };
use futures::future::{BoxFuture, Shared}; use bytes::BytesMut;
use futures::{FutureExt, TryFutureExt}; use data_types::{
database_rules::DatabaseRules, database_state::DatabaseStateCode, server_id::ServerId,
DatabaseName,
};
use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
};
use generated_types::database_rules::encode_database_rules; use generated_types::database_rules::encode_database_rules;
use internal_types::freezable::Freezable; use internal_types::freezable::Freezable;
use iox_object_store::IoxObjectStore; use iox_object_store::IoxObjectStore;
use object_store::path::{ObjectStorePath, Path}; use object_store::{
path::{ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
};
use observability_deps::tracing::{error, info, warn}; use observability_deps::tracing::{error, info, warn};
use parking_lot::RwLock; use parking_lot::RwLock;
use parquet_file::catalog::PreservedCatalog;
use persistence_windows::checkpoint::ReplayPlan; use persistence_windows::checkpoint::ReplayPlan;
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
use tokio::sync::Notify; use std::{future::Future, sync::Arc, time::Duration};
use tokio::task::JoinError; use tokio::{sync::Notify, task::JoinError};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::db::load::{create_preserved_catalog, load_or_create_preserved_catalog};
use crate::db::DatabaseToCommit;
use crate::{ApplicationState, Db, DB_RULES_FILE_NAME};
use bytes::BytesMut;
use object_store::{ObjectStore, ObjectStoreApi};
use parquet_file::catalog::PreservedCatalog;
const INIT_BACKOFF: Duration = Duration::from_secs(1); const INIT_BACKOFF: Duration = Duration::from_secs(1);
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]

View File

@ -68,13 +68,10 @@
clippy::future_not_send clippy::future_not_send
)] )]
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use data_types::database_rules::ShardConfig;
use data_types::error::ErrorLogger;
use data_types::{ use data_types::{
database_rules::{DatabaseRules, NodeGroup, RoutingRules, ShardId, Sink}, database_rules::{DatabaseRules, NodeGroup, RoutingRules, ShardConfig, ShardId, Sink},
error::ErrorLogger,
job::Job, job::Job,
server_id::ServerId, server_id::ServerId,
{DatabaseName, DatabaseNameError}, {DatabaseName, DatabaseNameError},
@ -88,26 +85,26 @@ use influxdb_line_protocol::ParsedLine;
use internal_types::freezable::Freezable; use internal_types::freezable::Freezable;
use lifecycle::LockableChunk; use lifecycle::LockableChunk;
use metrics::{KeyValue, MetricObserverBuilder}; use metrics::{KeyValue, MetricObserverBuilder};
use object_store::{ObjectStore, ObjectStoreApi}; use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
};
use observability_deps::tracing::{error, info, warn}; use observability_deps::tracing::{error, info, warn};
use parking_lot::RwLock; use parking_lot::RwLock;
use query::exec::Executor; use query::exec::Executor;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use resolver::Resolver; use resolver::Resolver;
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
use tokio::task::JoinError; use std::sync::Arc;
use tokio::{sync::Notify, task::JoinError};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracker::{TaskTracker, TrackedFutureExt}; use tracker::{TaskTracker, TrackedFutureExt};
pub use application::ApplicationState; pub use application::ApplicationState;
pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer}; pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer};
pub use db::Db; pub use db::Db;
pub use job::JobRegistry; pub use job::JobRegistry;
use object_store::path::parsed::DirsAndFileName;
use object_store::path::{ObjectStorePath, Path};
pub use resolver::{GrpcConnectionString, RemoteTemplate}; pub use resolver::{GrpcConnectionString, RemoteTemplate};
use tokio::sync::Notify;
mod application; mod application;
mod connection; mod connection;
@ -1274,6 +1271,25 @@ fn database_store_prefix(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bytes::Bytes;
use connection::test_helpers::{TestConnectionManager, TestRemoteServer};
use data_types::{
chunk_metadata::ChunkAddr,
database_rules::{
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG,
},
};
use futures::TryStreamExt;
use generated_types::database_rules::decode_database_rules;
use influxdb_line_protocol::parse_lines;
use iox_object_store::IoxObjectStore;
use metrics::TestMetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore};
use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
use std::{ use std::{
convert::{Infallible, TryFrom}, convert::{Infallible, TryFrom},
sync::{ sync::{
@ -1282,28 +1298,8 @@ mod tests {
}, },
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use futures::TryStreamExt;
use arrow_util::assert_batches_eq;
use connection::test_helpers::{TestConnectionManager, TestRemoteServer};
use data_types::chunk_metadata::ChunkAddr;
use data_types::database_rules::{
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG,
};
use generated_types::database_rules::decode_database_rules;
use influxdb_line_protocol::parse_lines;
use iox_object_store::IoxObjectStore;
use metrics::TestMetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore};
use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
use test_helpers::assert_contains; use test_helpers::assert_contains;
use super::*;
const ARBITRARY_DEFAULT_TIME: i64 = 456; const ARBITRARY_DEFAULT_TIME: i64 = 456;
fn make_application() -> Arc<ApplicationState> { fn make_application() -> Arc<ApplicationState> {