style(ingester): fmt imports & long strings

Rewrite the imports into a consistent order (std, external, crate) and
merge all crate-level imports into one use statement.
pull/24376/head
Dom Dwyer 2022-09-14 14:12:36 +02:00
parent 074722eb3e
commit ee8cdb48af
16 changed files with 175 additions and 141 deletions
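Two conventions run through every hunk below. For imports, the target layout is three groups separated by blank lines (std, then external crates, then crate-local paths), with all crate-level imports merged into a single use statement; test modules follow the same rule, so use super::*; and use crate::{...} drop to the final group. This matches what rustfmt's unstable group_imports = "StdExternalCrate" and imports_granularity = "Crate" options produce, though it is an assumption that the commit was generated with those options, since no tooling is named. A minimal before/after sketch on a hypothetical module, reusing names that appear in the hunks below:

    // Before: groups interleaved, crate-level imports split across statements.
    use snafu::{ResultExt, Snafu};
    use std::sync::Arc;
    use crate::data::partition::PersistingBatch;
    use crate::query::QueryableBatch;

    // After: std first, then external crates, then one merged crate-level use.
    use std::sync::Arc;

    use snafu::{ResultExt, Snafu};

    use crate::{data::partition::PersistingBatch, query::QueryableBatch};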

View File

@@ -1,5 +1,7 @@
//! This module is responsible for compacting Ingester's data
use std::sync::Arc;
use data_types::{CompactionLevel, NamespaceId, PartitionInfo};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_query::{
@@ -11,7 +13,6 @@ use iox_time::TimeProvider;
use parquet_file::metadata::IoxMetadata;
use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use crate::{data::partition::PersistingBatch, query::QueryableBatch};
@@ -29,7 +30,13 @@ pub enum Error {
#[snafu(display("Error while executing Ingester's compaction"))]
ExecutePlan { source: DataFusionError },
#[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))]
#[snafu(display(
"Error while building delete predicate from start time, {}, stop time, {}, and serialized \
predicate, {}",
min,
max,
predicate
))]
DeletePredicate {
source: predicate::delete_predicate::Error,
min: String,
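The hunk above shows the second convention: over-long string literals are rewrapped with a trailing backslash. Inside a Rust string literal, a backslash at the end of a line elides the newline and the next line's leading whitespace, so the wrapped source still evaluates to the original single-line message. A quick sketch of the mechanism (hypothetical literals, not taken from this diff):

    fn main() {
        // Both literals evaluate to the same single-line &str: the trailing
        // backslash swallows the newline and the indentation that follows it.
        let unwrapped = "Error while building delete predicate, min: 1, max: 2";
        let wrapped = "Error while building delete predicate, \
                       min: 1, max: 2";
        assert_eq!(unwrapped, wrapped);
    }

The same property is what lets the #[should_panic(expected = ...)] string further down be split across lines while still matching the runtime panic message exactly.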
@@ -170,6 +177,13 @@ pub async fn compact(
#[cfg(test)]
mod tests {
use arrow_util::assert_batches_eq;
use data_types::{Partition, PartitionId, ShardId, TableId};
use iox_time::SystemProvider;
use mutable_batch_lp::lines_to_batches;
use schema::selection::Selection;
use uuid::Uuid;
use super::*;
use crate::test_util::{
create_batches_with_influxtype, create_batches_with_influxtype_different_cardinality,
@@ -181,12 +195,6 @@ mod tests {
create_one_row_record_batch_with_influxtype, create_tombstone, make_meta,
make_persisting_batch, make_queryable_batch, make_queryable_batch_with_deletes,
};
use arrow_util::assert_batches_eq;
use data_types::{Partition, PartitionId, ShardId, TableId};
use iox_time::SystemProvider;
use mutable_batch_lp::lines_to_batches;
use schema::selection::Selection;
use uuid::Uuid;
// this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782
// where if sending in a single row it would compact into an output of two batches, one of

View File

@@ -1,17 +1,12 @@
//! Data for the lifecycle of the Ingester
use crate::{
compact::{compact_persisting_batch, CompactedStream},
lifecycle::LifecycleHandle,
};
use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use arrow::{error::ArrowError, record_batch::RecordBatch};
use arrow_util::optimize::{optimize_record_batch, optimize_schema};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{
DeletePredicate, PartitionId, SequenceNumber,
ShardId, ShardIndex, Timestamp,
};
use data_types::{PartitionId, SequenceNumber, ShardId, ShardIndex};
use datafusion::physical_plan::SendableRecordBatchStream;
use dml::DmlOperation;
use futures::{Stream, StreamExt};
@@ -19,21 +14,17 @@ use iox_catalog::interface::{get_table_schema_by_id, Catalog};
use iox_query::exec::Executor;
use iox_time::SystemProvider;
use metric::{Attributes, Metric, U64Histogram, U64HistogramOptions};
use object_store::DynObjectStore;
use observability_deps::tracing::{debug, warn};
use parquet_file::storage::ParquetStorage;
use schema::selection::Selection;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::{BTreeMap},
pin::Pin,
sync::Arc,
};
use uuid::Uuid;
use snafu::{OptionExt, Snafu};
use write_summary::ShardProgress;
use crate::{
compact::{compact_persisting_batch, CompactedStream},
lifecycle::LifecycleHandle,
};
pub mod namespace;
pub mod partition;
mod query_dedup;
@@ -553,17 +544,17 @@ pub enum FlatIngesterQueryResponse {
#[cfg(test)]
mod tests {
use super::*;
use crate::{
data::namespace::NamespaceData,
lifecycle::{LifecycleConfig, LifecycleManager},
use std::{
ops::DerefMut,
task::{Context, Poll},
time::Duration,
};
use arrow::datatypes::SchemaRef;
use assert_matches::assert_matches;
use data_types::{
ColumnId, ColumnSet, CompactionLevel, NamespaceSchema, NonEmptyString, ParquetFileParams,
Sequence, TimestampRange,
ColumnId, ColumnSet, CompactionLevel, DeletePredicate, NamespaceSchema, NonEmptyString,
ParquetFileParams, Sequence, Timestamp, TimestampRange,
};
use datafusion::physical_plan::RecordBatchStream;
use dml::{DmlDelete, DmlMeta, DmlWrite};
@@ -573,10 +564,13 @@ mod tests {
use metric::{MetricObserver, Observation};
use mutable_batch_lp::{lines_to_batches, test_helpers::lp_to_mutable_batch};
use object_store::memory::InMemory;
use std::{
ops::DerefMut,
task::{Context, Poll},
time::Duration,
use schema::selection::Selection;
use uuid::Uuid;
use super::*;
use crate::{
data::namespace::NamespaceData,
lifecycle::{LifecycleConfig, LifecycleManager},
};
#[tokio::test]

View File

@@ -1,15 +1,7 @@
//! Ingest handler
use crate::{
data::{shard::ShardData, IngesterData, IngesterQueryResponse},
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
poison::PoisonCabinet,
querier_handler::prepare_data_to_querier,
stream_handler::{
sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation,
PeriodicWatermarkFetcher, SequencedStreamHandler,
},
};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::{Shard, ShardIndex, TopicMetadata};
@@ -26,7 +18,6 @@ use metric::{DurationHistogram, Metric, U64Counter};
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tokio::{
sync::{Semaphore, TryAcquireError},
task::{JoinError, JoinHandle},
@@ -35,6 +26,17 @@ use tokio_util::sync::CancellationToken;
use write_buffer::core::WriteBufferReading;
use write_summary::ShardProgress;
use crate::{
data::{shard::ShardData, IngesterData, IngesterQueryResponse},
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
poison::PoisonCabinet,
querier_handler::prepare_data_to_querier,
stream_handler::{
sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation,
PeriodicWatermarkFetcher, SequencedStreamHandler,
},
};
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
@@ -382,9 +384,8 @@ impl<T> Drop for IngestHandlerImpl<T> {
#[cfg(test)]
mod tests {
use crate::data::partition::SnapshotBatch;
use std::{num::NonZeroU32, ops::DerefMut};
use super::*;
use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
@@ -392,10 +393,12 @@ mod tests {
use metric::{Attributes, Metric, U64Counter, U64Gauge};
use mutable_batch_lp::lines_to_batches;
use object_store::memory::InMemory;
use std::{num::NonZeroU32, ops::DerefMut};
use test_helpers::maybe_start_logging;
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use super::*;
use crate::data::partition::SnapshotBatch;
#[tokio::test]
async fn read_from_write_buffer_write_to_mutable_buffer() {
let ingester = TestIngester::new().await;
@@ -765,8 +768,7 @@ mod tests {
verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| {
if first_batch.min_sequence_number == SequenceNumber::new(1) {
panic!(
"initialization did a seek to the beginning rather than \
the min_unpersisted"
"initialization did a seek to the beginning rather than the min_unpersisted"
);
}
})

View File

@@ -1,7 +1,8 @@
use std::sync::Arc;
use data_types::PartitionId;
use iox_time::TimeProvider;
use parking_lot::Mutex;
use std::sync::Arc;
use tracker::{
AbstractTaskRegistry, TaskRegistration, TaskRegistry, TaskRegistryWithHistory,
TaskRegistryWithMetrics, TaskTracker,

View File

@@ -5,20 +5,22 @@
//! some absolute number and individual Parquet files that get persisted below some number. It
//! is expected that they may be above or below the absolute thresholds.
use crate::{
data::Persister,
job::{Job, JobRegistry},
poison::{PoisonCabinet, PoisonPill},
};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use data_types::{PartitionId, SequenceNumber, ShardId};
use iox_time::{Time, TimeProvider};
use metric::{Metric, U64Counter};
use observability_deps::tracing::{error, info, warn};
use parking_lot::Mutex;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use tracker::TrackedFutureExt;
use crate::{
data::Persister,
job::{Job, JobRegistry},
poison::{PoisonCabinet, PoisonPill},
};
/// API suitable for ingester tasks to query and update the [`LifecycleManager`] state.
pub trait LifecycleHandle: Send + Sync + 'static {
/// Logs bytes written into a partition so that it can be tracked for the manager to
@@ -566,13 +568,15 @@ pub(crate) async fn run_lifecycle_manager<P: Persister>(
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeSet;
use async_trait::async_trait;
use iox_time::MockProvider;
use metric::{Attributes, Registry};
use std::collections::BTreeSet;
use tokio::sync::Barrier;
use super::*;
#[derive(Default)]
struct TestPersister {
persist_called: Mutex<BTreeSet<PartitionId>>,

View File

@@ -1,12 +1,13 @@
use data_types::ShardIndex;
use futures::Future;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use pin_project::pin_project;
use std::{
sync::Arc,
task::{Poll, Waker},
};
use data_types::ShardIndex;
use futures::Future;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use pin_project::pin_project;
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(dead_code)]
pub enum PoisonPill {

View File

@@ -1,12 +1,7 @@
//! Handle all requests from Querier
use crate::{
data::{
partition::UnpersistedPartitionData, IngesterData, IngesterQueryPartition,
IngesterQueryResponse,
},
query::QueryableBatch,
};
use std::sync::Arc;
use arrow::error::ArrowError;
use datafusion::{
error::DataFusionError, logical_plan::LogicalPlanBuilder,
@@ -22,7 +17,14 @@ use observability_deps::tracing::debug;
use predicate::Predicate;
use schema::selection::Selection;
use snafu::{ensure, ResultExt, Snafu};
use std::sync::Arc;
use crate::{
data::{
partition::UnpersistedPartitionData, IngesterData, IngesterQueryPartition,
IngesterQueryResponse,
},
query::QueryableBatch,
};
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
@@ -285,6 +287,13 @@ pub(crate) async fn query(
#[cfg(test)]
mod tests {
use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use assert_matches::assert_matches;
use datafusion::logical_plan::{col, lit};
use futures::TryStreamExt;
use predicate::Predicate;
use super::*;
use crate::{
data::FlatIngesterQueryResponse,
@@ -294,12 +303,6 @@ mod tests {
make_queryable_batch_with_deletes, DataLocation, TEST_NAMESPACE, TEST_TABLE,
},
};
use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use assert_matches::assert_matches;
use datafusion::logical_plan::{col, lit};
use futures::TryStreamExt;
use predicate::Predicate;
#[tokio::test]
async fn test_query() {

View File

@@ -1,5 +1,7 @@
//! Module to handle query on Ingester's data
use std::{any::Any, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::util::ensure_schema;
use data_types::{
@@ -22,7 +24,6 @@ use predicate::{
};
use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use std::{any::Any, sync::Arc};
use crate::data::partition::SnapshotBatch;
@@ -259,8 +260,6 @@ impl QueryChunk for QueryableBatch {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::create_tombstone;
use arrow::{
array::{
ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray,
@@ -270,6 +269,9 @@ mod tests {
};
use data_types::{DeleteExpr, Op, Scalar, TimestampRange};
use super::*;
use crate::test_util::create_tombstone;
#[tokio::test]
async fn test_merge_batch_schema() {
// Merge schema of the batches

View File

@@ -1,10 +1,9 @@
//! Ingester server entrypoint.
use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};
use self::{grpc::GrpcDelegate, http::HttpDelegate};
use crate::handler::IngestHandler;
use std::fmt::Debug;
pub mod grpc;
pub mod http;

View File

@@ -1,9 +1,14 @@
//! gRPC service implementations for `ingester`.
use crate::{
data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
handler::IngestHandler,
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
};
use arrow::error::ArrowError;
use arrow_flight::{
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
@@ -20,18 +25,15 @@ use observability_deps::tracing::{debug, info, warn};
use pin_project::pin_project;
use prost::Message;
use snafu::{ResultExt, Snafu};
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
};
use tonic::{Request, Response, Streaming};
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
use crate::{
data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
handler::IngestHandler,
};
/// This type is responsible for managing all gRPC services exposed by
/// `ingester`.
#[derive(Debug, Default)]
@@ -465,9 +467,8 @@ mod tests {
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::selection::Selection;
use crate::data::partition::PartitionStatus;
use super::*;
use crate::data::partition::PartitionStatus;
#[tokio::test]
async fn test_get_stream_empty() {

View File

@@ -1,10 +1,12 @@
//! HTTP service implementations for `ingester`.
use crate::handler::IngestHandler;
use hyper::{Body, Request, Response, StatusCode};
use std::sync::Arc;
use hyper::{Body, Request, Response, StatusCode};
use thiserror::Error;
use crate::handler::IngestHandler;
/// Errors returned by the `router` HTTP request handler.
#[derive(Debug, Error, Copy, Clone)]
pub enum Error {

View File

@@ -1,15 +1,17 @@
use super::DmlSink;
use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
use std::{fmt::Debug, time::Duration};
use data_types::{SequenceNumber, ShardIndex};
use dml::DmlOperation;
use futures::{pin_mut, FutureExt, StreamExt};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationCounter, DurationGauge, U64Counter};
use observability_deps::tracing::*;
use std::{fmt::Debug, time::Duration};
use tokio_util::sync::CancellationToken;
use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};
use super::DmlSink;
use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
/// When the [`LifecycleManager`] indicates that ingest should be paused because
/// of memory pressure, the shard will loop, sleeping this long between
/// calls to [`LifecycleHandle::can_resume_ingest()`] with the manager if it
@@ -89,10 +91,13 @@ impl<I, O> SequencedStreamHandler<I, O> {
skip_to_oldest_available: bool,
) -> Self {
// TTBR
let time_to_be_readable = metrics.register_metric::<DurationGauge>(
"ingester_ttbr",
"duration of time between producer writing to consumer putting into queryable cache",
).recorder(metric_attrs(shard_index, &topic_name, None, false));
let time_to_be_readable = metrics
.register_metric::<DurationGauge>(
"ingester_ttbr",
"duration of time between producer writing to consumer putting into queryable \
cache",
)
.recorder(metric_attrs(shard_index, &topic_name, None, false));
// Lifecycle-driven ingest pause duration
let pause_duration = metrics
@@ -461,11 +466,8 @@ fn metric_attrs(
#[cfg(test)]
mod tests {
use super::*;
use crate::{
lifecycle::{LifecycleConfig, LifecycleManager},
stream_handler::mock_sink::MockDmlSink,
};
use std::sync::Arc;
use assert_matches::assert_matches;
use async_trait::async_trait;
use data_types::{DeletePredicate, Sequence, TimestampRange};
@@ -475,12 +477,17 @@ mod tests {
use metric::Metric;
use mutable_batch_lp::lines_to_batches;
use once_cell::sync::Lazy;
use std::sync::Arc;
use test_helpers::timeout::FutureTimeout;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use write_buffer::core::WriteBufferError;
use super::*;
use crate::{
lifecycle::{LifecycleConfig, LifecycleManager},
stream_handler::mock_sink::MockDmlSink,
};
static TEST_TIME: Lazy<Time> = Lazy::new(|| SystemProvider::default().now());
static TEST_SHARD_INDEX: ShardIndex = ShardIndex::new(42);
static TEST_TOPIC_NAME: &str = "topic_name";
@@ -967,8 +974,8 @@ mod tests {
// An abnormal end to the stream causes a panic, rather than a silent stream reader exit.
#[tokio::test]
#[should_panic(
expected = "shard index ShardIndex(42) stream for topic topic_name ended without \
graceful shutdown"
expected = "shard index ShardIndex(42) stream for topic topic_name ended without graceful \
shutdown"
)]
async fn test_early_stream_end_panic() {
let metrics = Arc::new(metric::Registry::default());

View File

@@ -1,7 +1,3 @@
use super::sink_instrumentation::WatermarkFetcher;
use data_types::ShardIndex;
use metric::U64Counter;
use observability_deps::tracing::*;
use std::{
sync::{
atomic::{AtomicI64, Ordering},
@@ -9,9 +5,15 @@ use std::{
},
time::{Duration, Instant},
};
use data_types::ShardIndex;
use metric::U64Counter;
use observability_deps::tracing::*;
use tokio::task::JoinHandle;
use write_buffer::core::WriteBufferReading;
use super::sink_instrumentation::WatermarkFetcher;
/// Periodically fetch and cache the maximum known write buffer offset
/// (watermark) from the write buffer for a given shard.
///

View File

@@ -1,11 +1,13 @@
//! Compatibility layer providing a [`DmlSink`] impl for [`IngesterData`].
use super::DmlSink;
use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl};
use std::sync::Arc;
use async_trait::async_trait;
use data_types::ShardId;
use dml::DmlOperation;
use std::sync::Arc;
use super::DmlSink;
use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl};
/// Provides a [`DmlSink`] implementation for a [`IngesterData`] instance.
#[derive(Debug)]

View File

@@ -1,14 +1,16 @@
//! Instrumentation for [`DmlSink`] implementations.
use super::DmlSink;
use std::fmt::Debug;
use async_trait::async_trait;
use data_types::ShardIndex;
use dml::DmlOperation;
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationHistogram, U64Counter, U64Gauge};
use std::fmt::Debug;
use trace::span::{SpanExt, SpanRecorder};
use super::DmlSink;
/// A [`WatermarkFetcher`] abstracts a source of the write buffer high watermark
/// (max known offset).
///
@@ -100,10 +102,13 @@
"Last consumed sequence number (e.g. Kafka offset)",
)
.recorder(attr.clone());
let write_buffer_sequence_number_lag = metrics.register_metric::<U64Gauge>(
"ingester_write_buffer_sequence_number_lag",
"The difference between the the last sequence number available (e.g. Kafka offset) and (= minus) last consumed sequence number",
).recorder(attr.clone());
let write_buffer_sequence_number_lag = metrics
.register_metric::<U64Gauge>(
"ingester_write_buffer_sequence_number_lag",
"The difference between the the last sequence number available (e.g. Kafka \
offset) and (= minus) last consumed sequence number",
)
.recorder(attr.clone());
let write_buffer_last_ingest_ts = metrics
.register_metric::<U64Gauge>(
"ingester_write_buffer_last_ingest_ts",
@@ -240,12 +245,11 @@ mod tests {
use once_cell::sync::Lazy;
use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};
use super::*;
use crate::stream_handler::{
mock_sink::MockDmlSink, mock_watermark_fetcher::MockWatermarkFetcher,
};
use super::*;
/// The shard index the [`SinkInstrumentation`] under test is configured to
/// be observing for.
const SHARD_INDEX: ShardIndex = ShardIndex::new(42);

View File

@@ -2,16 +2,8 @@
#![allow(missing_docs)]
use crate::{
data::{
namespace::NamespaceData,
partition::{PartitionData, PersistingBatch, SnapshotBatch},
shard::ShardData,
table::TableData,
IngesterData,
},
query::QueryableBatch,
};
use std::{collections::BTreeMap, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bitflags::bitflags;
@@ -25,9 +17,19 @@ use iox_time::{SystemProvider, Time, TimeProvider};
use object_store::memory::InMemory;
use parquet_file::metadata::IoxMetadata;
use schema::sort::SortKey;
use std::{collections::BTreeMap, sync::Arc};
use uuid::Uuid;
use crate::{
data::{
namespace::NamespaceData,
partition::{PartitionData, PersistingBatch, SnapshotBatch},
shard::ShardData,
table::TableData,
IngesterData,
},
query::QueryableBatch,
};
/// Create a persisting batch, some tombstones and corresponding metadata for them after compaction
pub async fn make_persisting_batch_with_meta() -> (Arc<PersistingBatch>, Vec<Tombstone>, IoxMetadata)
{