refactor: Organize uses

pull/24376/head
Carol (Nichols || Goulding) 2021-07-26 14:10:02 -04:00
parent 7a389fc237
commit 7c9a21632b
12 changed files with 120 additions and 129 deletions

View File

@ -1,9 +1,8 @@
//! Module contains a representation of chunk metadata
use std::sync::Arc;
use crate::partition_metadata::PartitionAddr;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Address of the chunk within the catalog
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]

View File

@ -1,8 +1,12 @@
use crate::google::{FieldViolation, FromFieldOpt};
use crate::influxdata::iox::management::v1 as management;
use crate::{
google::{FieldViolation, FromFieldOpt},
influxdata::iox::management::v1 as management,
};
use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage, ChunkSummary};
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
/// Conversion code to management API chunk structure
impl From<ChunkSummary> for management::Chunk {

View File

@ -9,18 +9,19 @@
)]
use chrono::{DateTime, Utc};
use data_types::chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage};
use data_types::database_rules::LifecycleRules;
use data_types::DatabaseName;
pub use guard::*;
use data_types::{
chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage},
database_rules::LifecycleRules,
DatabaseName,
};
use internal_types::access::AccessMetrics;
pub use policy::*;
use std::time::Instant;
use tracker::TaskTracker;
mod guard;
pub use guard::*;
mod policy;
pub use policy::*;
/// A trait that encapsulates the database logic that is automated by `LifecyclePolicy`
pub trait LifecycleDb {

View File

@ -1,21 +1,22 @@
use std::convert::TryInto;
use std::fmt::Debug;
use std::time::{Duration, Instant};
use chrono::{DateTime, Utc};
use data_types::DatabaseName;
use futures::future::BoxFuture;
use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage};
use data_types::database_rules::LifecycleRules;
use observability_deps::tracing::{debug, info, trace, warn};
use tracker::TaskTracker;
use crate::{
LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition,
PersistHandle,
};
use chrono::{DateTime, Utc};
use data_types::{
chunk_metadata::{ChunkLifecycleAction, ChunkStorage},
database_rules::LifecycleRules,
DatabaseName,
};
use futures::future::BoxFuture;
use internal_types::access::AccessMetrics;
use observability_deps::tracing::{debug, info, trace, warn};
use std::{
convert::TryInto,
fmt::Debug,
time::{Duration, Instant},
};
use tracker::TaskTracker;
/// Number of seconds to wait before retying a failed lifecycle action
pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
@ -654,21 +655,20 @@ fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<'_, P>>) {
#[cfg(test)]
mod tests {
use std::cmp::max;
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::num::{NonZeroU32, NonZeroUsize};
use std::sync::Arc;
use data_types::chunk_metadata::{ChunkAddr, ChunkStorage};
use tracker::{RwLock, TaskId, TaskRegistration, TaskRegistry};
use super::*;
use crate::{
ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
LockablePartition, PersistHandle,
};
use super::*;
use data_types::chunk_metadata::{ChunkAddr, ChunkStorage};
use std::{
cmp::max,
collections::BTreeMap,
convert::Infallible,
num::{NonZeroU32, NonZeroUsize},
sync::Arc,
};
use tracker::{RwLock, TaskId, TaskRegistration, TaskRegistry};
#[derive(Debug, Eq, PartialEq)]
enum MoverEvents {

View File

@ -1,15 +1,17 @@
//! This module contains the main IOx Database object which has the
//! instances of the mutable buffer, read buffer, and object store
use crate::db::catalog::chunk::ChunkStage;
use crate::db::catalog::table::TableSchemaUpsertHandle;
pub(crate) use crate::db::chunk::DbChunk;
use crate::db::lifecycle::ArcDb;
use crate::{
db::{
access::QueryCatalogAccess,
catalog::{chunk::CatalogChunk, partition::Partition, Catalog, TableNameFilter},
lifecycle::{LockableCatalogChunk, LockableCatalogPartition},
catalog::{
chunk::{CatalogChunk, ChunkStage},
partition::Partition,
table::TableSchemaUpsertHandle,
Catalog, TableNameFilter,
},
lifecycle::{ArcDb, LockableCatalogChunk, LockableCatalogPartition},
},
JobRegistry,
};
@ -31,13 +33,11 @@ use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use observability_deps::tracing::{debug, error, info};
use parking_lot::RwLock;
use parquet_file::catalog::CatalogParquetInfo;
use parquet_file::{
catalog::{CheckpointData, PreservedCatalog},
catalog::{CatalogParquetInfo, CheckpointData, PreservedCatalog},
cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
};
use persistence_windows::checkpoint::ReplayPlan;
use persistence_windows::persistence_windows::PersistenceWindows;
use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
use query::{exec::Executor, predicate::Predicate, QueryDatabase};
use rand_distr::{Distribution, Poisson};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
@ -50,8 +50,10 @@ use std::{
},
time::{Duration, Instant},
};
use write_buffer::config::WriteBufferConfig;
use write_buffer::core::{FetchHighWatermark, WriteBufferError};
use write_buffer::{
config::WriteBufferConfig,
core::{FetchHighWatermark, WriteBufferError},
};
pub mod access;
pub mod catalog;
@ -1331,11 +1333,11 @@ mod tests {
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use bytes::Bytes;
use chrono::DateTime;
use data_types::write_summary::TimestampSummary;
use data_types::{
chunk_metadata::ChunkStorage,
database_rules::{LifecycleRules, PartitionTemplate, TemplatePart},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
write_summary::TimestampSummary,
};
use entry::{test_helpers::lp_to_entry, Sequence};
use futures::{stream, StreamExt, TryStreamExt};

View File

@ -1,16 +1,15 @@
use crate::db::catalog::metrics::{StorageGauge, TimestampHistogram};
use chrono::{DateTime, Utc};
use data_types::instant::to_approximate_datetime;
use data_types::write_summary::TimestampSummary;
use data_types::{
chunk_metadata::{
ChunkAddr, ChunkColumnSummary, ChunkLifecycleAction, ChunkStorage, ChunkSummary,
DetailedChunkSummary,
},
instant::to_approximate_datetime,
partition_metadata::TableSummary,
write_summary::TimestampSummary,
};
use internal_types::access::AccessRecorder;
use internal_types::schema::Schema;
use internal_types::{access::AccessRecorder, schema::Schema};
use metrics::{Counter, Histogram, KeyValue};
use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk};
use observability_deps::tracing::debug;
@ -969,6 +968,7 @@ impl CatalogChunk {
#[cfg(test)]
mod tests {
use super::*;
use entry::test_helpers::lp_to_entry;
use mutable_buffer::chunk::{ChunkMetrics as MBChunkMetrics, MBChunk};
use parquet_file::{
@ -976,8 +976,6 @@ mod tests {
test_utils::{make_chunk as make_parquet_chunk_with_store, make_object_store},
};
use super::*;
#[test]
fn test_new_open() {
let addr = chunk_addr();

View File

@ -5,8 +5,8 @@ use chrono::{DateTime, Utc};
use data_types::partition_metadata;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
use internal_types::access::AccessRecorder;
use internal_types::{
access::AccessRecorder,
schema::{sort::SortKey, Schema},
selection::Selection,
};

View File

@ -1,20 +1,23 @@
use std::fmt::Display;
use std::sync::Arc;
use std::time::Instant;
use chrono::{DateTime, TimeZone, Utc};
use super::DbChunk;
use crate::{
db::catalog::{chunk::CatalogChunk, partition::Partition},
Db,
};
use ::lifecycle::LifecycleDb;
use data_types::chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage};
use data_types::database_rules::LifecycleRules;
use data_types::error::ErrorLogger;
use data_types::job::Job;
use data_types::partition_metadata::Statistics;
use data_types::DatabaseName;
use chrono::{DateTime, TimeZone, Utc};
use data_types::{
chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage},
database_rules::LifecycleRules,
error::ErrorLogger,
job::Job,
partition_metadata::Statistics,
DatabaseName,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::access::AccessMetrics;
use internal_types::schema::merge::SchemaMerger;
use internal_types::schema::{Schema, TIME_COLUMN_NAME};
use internal_types::{
access::AccessMetrics,
schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME},
};
use lifecycle::{
LifecycleChunk, LifecyclePartition, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
LockablePartition,
@ -22,12 +25,9 @@ use lifecycle::{
use observability_deps::tracing::{info, trace};
use persistence_windows::persistence_windows::FlushHandle;
use query::QueryChunkMeta;
use std::{fmt::Display, sync::Arc, time::Instant};
use tracker::{RwLock, TaskTracker};
use crate::db::catalog::chunk::CatalogChunk;
use crate::db::catalog::partition::Partition;
use crate::Db;
pub(crate) use compact::compact_chunks;
pub(crate) use drop::drop_chunk;
pub(crate) use error::{Error, Result};
@ -35,8 +35,6 @@ pub(crate) use move_chunk::move_chunk_to_read_buffer;
pub(crate) use persist::persist_chunks;
pub(crate) use unload::unload_read_buffer_chunk;
use super::DbChunk;
mod compact;
mod drop;
mod error;

View File

@ -168,19 +168,16 @@ pub fn persist_chunks(
#[cfg(test)]
mod tests {
use std::num::{NonZeroU32, NonZeroU64};
use std::time::Instant;
use super::*;
use crate::{db::test_helpers::write_lp, utils::TestDb};
use chrono::{TimeZone, Utc};
use data_types::database_rules::LifecycleRules;
use lifecycle::{LockableChunk, LockablePartition};
use query::QueryDatabase;
use crate::db::test_helpers::write_lp;
use crate::utils::TestDb;
use super::*;
use std::{
num::{NonZeroU32, NonZeroU64},
time::Instant,
};
#[tokio::test]
async fn test_flush_overlapping() {

View File

@ -7,26 +7,21 @@
//!
//! For example `SELECT * FROM system.chunks`
use std::any::Any;
use std::sync::Arc;
use super::catalog::Catalog;
use crate::JobRegistry;
use arrow::{
datatypes::{Field, Schema, SchemaRef},
error::Result,
record_batch::RecordBatch,
};
use chrono::{DateTime, Utc};
use datafusion::{
catalog::schema::SchemaProvider,
datasource::{datasource::Statistics, TableProvider},
error::{DataFusionError, Result as DataFusionResult},
physical_plan::{memory::MemoryExec, ExecutionPlan},
};
use crate::JobRegistry;
use super::catalog::Catalog;
use std::{any::Any, sync::Arc};
mod chunks;
mod columns;
@ -205,11 +200,10 @@ fn scan_batch(
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{ArrayRef, UInt64Array};
use arrow_util::assert_batches_eq;
use super::*;
fn seq_array(start: u64, end: u64) -> ArrayRef {
Arc::new(UInt64Array::from_iter_values(start..end))
}

View File

@ -1,16 +1,16 @@
use crate::db::{
catalog::Catalog,
system_tables::{time_to_ts, IoxSystemTable},
};
use arrow::{
array::{StringArray, TimestampNanosecondArray, UInt32Array, UInt64Array},
datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
error::Result,
record_batch::RecordBatch,
};
use data_types::{chunk_metadata::ChunkSummary, error::ErrorLogger};
use std::sync::Arc;
use arrow::array::{StringArray, TimestampNanosecondArray, UInt32Array, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use data_types::chunk_metadata::ChunkSummary;
use data_types::error::ErrorLogger;
use crate::db::catalog::Catalog;
use crate::db::system_tables::{time_to_ts, IoxSystemTable};
/// Implementation of system.chunks table
#[derive(Debug)]
pub(super) struct ChunksTable {
@ -128,12 +128,10 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
use arrow_util::assert_batches_eq;
use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage};
use super::*;
use arrow_util::assert_batches_eq;
use chrono::{TimeZone, Utc};
use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage};
#[test]
fn test_from_chunk_summaries() {

View File

@ -1,17 +1,16 @@
use std::collections::HashMap;
use std::sync::Arc;
use arrow::array::{ArrayRef, StringBuilder, UInt32Builder, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use data_types::chunk_metadata::DetailedChunkSummary;
use data_types::error::ErrorLogger;
use data_types::partition_metadata::{PartitionSummary, TableSummary};
use crate::db::catalog::Catalog;
use crate::db::system_tables::IoxSystemTable;
use crate::db::{catalog::Catalog, system_tables::IoxSystemTable};
use arrow::{
array::{ArrayRef, StringBuilder, UInt32Builder, UInt64Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
error::Result,
record_batch::RecordBatch,
};
use data_types::{
chunk_metadata::DetailedChunkSummary,
error::ErrorLogger,
partition_metadata::{PartitionSummary, TableSummary},
};
use std::{collections::HashMap, sync::Arc};
/// Implementation of `system.columns` system table
#[derive(Debug)]
@ -217,11 +216,12 @@ fn assemble_chunk_columns(
#[cfg(test)]
mod tests {
use arrow_util::assert_batches_eq;
use data_types::chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary};
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
use super::*;
use arrow_util::assert_batches_eq;
use data_types::{
chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
};
#[test]
fn test_from_partition_summaries() {