diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs
index 47bd902540..2a85b93c03 100644
--- a/ingester/src/compact.rs
+++ b/ingester/src/compact.rs
@@ -1,5 +1,7 @@
 //! This module is responsible for compacting Ingester's data
 
+use std::sync::Arc;
+
 use data_types::{CompactionLevel, NamespaceId, PartitionInfo};
 use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
 use iox_query::{
@@ -11,7 +13,6 @@ use iox_time::TimeProvider;
 use parquet_file::metadata::IoxMetadata;
 use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
 use snafu::{ResultExt, Snafu};
-use std::sync::Arc;
 
 use crate::{data::partition::PersistingBatch, query::QueryableBatch};
 
@@ -29,7 +30,13 @@ pub enum Error {
     #[snafu(display("Error while executing Ingester's compaction"))]
     ExecutePlan { source: DataFusionError },
 
-    #[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))]
+    #[snafu(display(
+        "Error while building delete predicate from start time, {}, stop time, {}, and serialized \
+         predicate, {}",
+        min,
+        max,
+        predicate
+    ))]
     DeletePredicate {
         source: predicate::delete_predicate::Error,
         min: String,
@@ -170,6 +177,13 @@ pub async fn compact(
 
 #[cfg(test)]
 mod tests {
+    use arrow_util::assert_batches_eq;
+    use data_types::{Partition, PartitionId, ShardId, TableId};
+    use iox_time::SystemProvider;
+    use mutable_batch_lp::lines_to_batches;
+    use schema::selection::Selection;
+    use uuid::Uuid;
+
     use super::*;
     use crate::test_util::{
         create_batches_with_influxtype, create_batches_with_influxtype_different_cardinality,
@@ -181,12 +195,6 @@ mod tests {
         create_one_row_record_batch_with_influxtype, create_tombstone, make_meta,
         make_persisting_batch, make_queryable_batch, make_queryable_batch_with_deletes,
     };
-    use arrow_util::assert_batches_eq;
-    use data_types::{Partition, PartitionId, ShardId, TableId};
-    use iox_time::SystemProvider;
-    use mutable_batch_lp::lines_to_batches;
-    use schema::selection::Selection;
-    use uuid::Uuid;
 
     // this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782
     // where if sending in a single row it would compact into an output of two batches, one of
diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index cbae688025..0c29b371c6 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -1,17 +1,12 @@
 //! Data for the lifecycle of the Ingester
 
-use crate::{
-    compact::{compact_persisting_batch, CompactedStream},
-    lifecycle::LifecycleHandle,
-};
+use std::{collections::BTreeMap, pin::Pin, sync::Arc};
+
 use arrow::{error::ArrowError, record_batch::RecordBatch};
 use arrow_util::optimize::{optimize_record_batch, optimize_schema};
 use async_trait::async_trait;
 use backoff::{Backoff, BackoffConfig};
-use data_types::{
-    DeletePredicate, PartitionId, SequenceNumber,
-    ShardId, ShardIndex, Timestamp,
-};
+use data_types::{PartitionId, SequenceNumber, ShardId, ShardIndex};
 use datafusion::physical_plan::SendableRecordBatchStream;
 use dml::DmlOperation;
 use futures::{Stream, StreamExt};
@@ -19,21 +14,17 @@ use iox_catalog::interface::{get_table_schema_by_id, Catalog};
 use iox_query::exec::Executor;
 use iox_time::SystemProvider;
 use metric::{Attributes, Metric, U64Histogram, U64HistogramOptions};
-
 use object_store::DynObjectStore;
 use observability_deps::tracing::{debug, warn};
-
 use parquet_file::storage::ParquetStorage;
-use schema::selection::Selection;
-use snafu::{OptionExt, ResultExt, Snafu};
-use std::{
-    collections::{BTreeMap},
-    pin::Pin,
-    sync::Arc,
-};
-use uuid::Uuid;
+use snafu::{OptionExt, Snafu};
 use write_summary::ShardProgress;
 
+use crate::{
+    compact::{compact_persisting_batch, CompactedStream},
+    lifecycle::LifecycleHandle,
+};
+
 pub mod namespace;
 pub mod partition;
 mod query_dedup;
@@ -553,17 +544,17 @@ pub enum FlatIngesterQueryResponse {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::{
-        data::namespace::NamespaceData,
-        lifecycle::{LifecycleConfig, LifecycleManager},
+    use std::{
+        ops::DerefMut,
+        task::{Context, Poll},
+        time::Duration,
     };
+
     use arrow::datatypes::SchemaRef;
-
     use assert_matches::assert_matches;
     use data_types::{
-        ColumnId, ColumnSet, CompactionLevel, NamespaceSchema, NonEmptyString, ParquetFileParams,
-        Sequence, TimestampRange,
+        ColumnId, ColumnSet, CompactionLevel, DeletePredicate, NamespaceSchema, NonEmptyString,
+        ParquetFileParams, Sequence, Timestamp, TimestampRange,
     };
     use datafusion::physical_plan::RecordBatchStream;
     use dml::{DmlDelete, DmlMeta, DmlWrite};
@@ -573,10 +564,13 @@ mod tests {
     use metric::{MetricObserver, Observation};
     use mutable_batch_lp::{lines_to_batches, test_helpers::lp_to_mutable_batch};
     use object_store::memory::InMemory;
-    use std::{
-        ops::DerefMut,
-        task::{Context, Poll},
-        time::Duration,
+    use schema::selection::Selection;
+    use uuid::Uuid;
+
+    use super::*;
+    use crate::{
+        data::namespace::NamespaceData,
+        lifecycle::{LifecycleConfig, LifecycleManager},
     };
 
     #[tokio::test]
diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs
index 9018d4fe09..a8a7308150 100644
--- a/ingester/src/handler.rs
+++ b/ingester/src/handler.rs
@@ -1,15 +1,7 @@
 //! Ingest handler
 
-use crate::{
-    data::{shard::ShardData, IngesterData, IngesterQueryResponse},
-    lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
-    poison::PoisonCabinet,
-    querier_handler::prepare_data_to_querier,
-    stream_handler::{
-        sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation,
-        PeriodicWatermarkFetcher, SequencedStreamHandler,
-    },
-};
+use std::{collections::BTreeMap, sync::Arc, time::Duration};
+
 use async_trait::async_trait;
 use backoff::BackoffConfig;
 use data_types::{Shard, ShardIndex, TopicMetadata};
@@ -26,7 +18,6 @@ use metric::{DurationHistogram, Metric, U64Counter};
 use object_store::DynObjectStore;
 use observability_deps::tracing::*;
 use snafu::{ResultExt, Snafu};
-use std::{collections::BTreeMap, sync::Arc, time::Duration};
 use tokio::{
     sync::{Semaphore, TryAcquireError},
     task::{JoinError, JoinHandle},
@@ -35,6 +26,17 @@ use tokio_util::sync::CancellationToken;
 use write_buffer::core::WriteBufferReading;
 use write_summary::ShardProgress;
 
+use crate::{
+    data::{shard::ShardData, IngesterData, IngesterQueryResponse},
+    lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
+    poison::PoisonCabinet,
+    querier_handler::prepare_data_to_querier,
+    stream_handler::{
+        sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation,
+        PeriodicWatermarkFetcher, SequencedStreamHandler,
+    },
+};
+
 #[derive(Debug, Snafu)]
 #[allow(missing_copy_implementations, missing_docs)]
 pub enum Error {
@@ -382,9 +384,8 @@ impl Drop for IngestHandlerImpl {
 
 #[cfg(test)]
 mod tests {
-    use crate::data::partition::SnapshotBatch;
+    use std::{num::NonZeroU32, ops::DerefMut};
 
-    use super::*;
     use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber};
     use dml::{DmlMeta, DmlWrite};
     use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
@@ -392,10 +393,12 @@ mod tests {
     use metric::{Attributes, Metric, U64Counter, U64Gauge};
     use mutable_batch_lp::lines_to_batches;
     use object_store::memory::InMemory;
-    use std::{num::NonZeroU32, ops::DerefMut};
     use test_helpers::maybe_start_logging;
     use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
 
+    use super::*;
+    use crate::data::partition::SnapshotBatch;
+
     #[tokio::test]
     async fn read_from_write_buffer_write_to_mutable_buffer() {
         let ingester = TestIngester::new().await;
@@ -765,8 +768,7 @@ mod tests {
         verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| {
             if first_batch.min_sequence_number == SequenceNumber::new(1) {
                 panic!(
-                    "initialization did a seek to the beginning rather than \
-                    the min_unpersisted"
+                    "initialization did a seek to the beginning rather than the min_unpersisted"
                 );
             }
         })
diff --git a/ingester/src/job.rs b/ingester/src/job.rs
index 6b20226f23..ffec98838b 100644
--- a/ingester/src/job.rs
+++ b/ingester/src/job.rs
@@ -1,7 +1,8 @@
+use std::sync::Arc;
+
 use data_types::PartitionId;
 use iox_time::TimeProvider;
 use parking_lot::Mutex;
-use std::sync::Arc;
 use tracker::{
     AbstractTaskRegistry, TaskRegistration, TaskRegistry, TaskRegistryWithHistory,
     TaskRegistryWithMetrics, TaskTracker,
diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs
index 75338b5e02..63a274cc4e 100644
--- a/ingester/src/lifecycle.rs
+++ b/ingester/src/lifecycle.rs
@@ -5,20 +5,22 @@
 //! some absolute number and individual Parquet files that get persisted below some number. It
 //! is expected that they may be above or below the absolute thresholds.
 
-use crate::{
-    data::Persister,
-    job::{Job, JobRegistry},
-    poison::{PoisonCabinet, PoisonPill},
-};
+use std::{collections::BTreeMap, sync::Arc, time::Duration};
+
 use data_types::{PartitionId, SequenceNumber, ShardId};
 use iox_time::{Time, TimeProvider};
 use metric::{Metric, U64Counter};
 use observability_deps::tracing::{error, info, warn};
 use parking_lot::Mutex;
-use std::{collections::BTreeMap, sync::Arc, time::Duration};
 use tokio_util::sync::CancellationToken;
 use tracker::TrackedFutureExt;
 
+use crate::{
+    data::Persister,
+    job::{Job, JobRegistry},
+    poison::{PoisonCabinet, PoisonPill},
+};
+
 /// API suitable for ingester tasks to query and update the [`LifecycleManager`] state.
 pub trait LifecycleHandle: Send + Sync + 'static {
     /// Logs bytes written into a partition so that it can be tracked for the manager to
@@ -566,13 +568,15 @@ pub(crate) async fn run_lifecycle_manager(
 
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use std::collections::BTreeSet;
+
     use async_trait::async_trait;
     use iox_time::MockProvider;
     use metric::{Attributes, Registry};
-    use std::collections::BTreeSet;
     use tokio::sync::Barrier;
 
+    use super::*;
+
     #[derive(Default)]
     struct TestPersister {
         persist_called: Mutex<BTreeSet<PartitionId>>,
diff --git a/ingester/src/poison.rs b/ingester/src/poison.rs
index 5180e07150..9fdd2733d2 100644
--- a/ingester/src/poison.rs
+++ b/ingester/src/poison.rs
@@ -1,12 +1,13 @@
-use data_types::ShardIndex;
-use futures::Future;
-use parking_lot::{RwLock, RwLockUpgradableReadGuard};
-use pin_project::pin_project;
 use std::{
     sync::Arc,
     task::{Poll, Waker},
 };
 
+use data_types::ShardIndex;
+use futures::Future;
+use parking_lot::{RwLock, RwLockUpgradableReadGuard};
+use pin_project::pin_project;
+
 #[derive(Debug, Clone, PartialEq, Eq)]
 #[allow(dead_code)]
 pub enum PoisonPill {
diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs
index dda3af9ac1..c8e9d37129 100644
--- a/ingester/src/querier_handler.rs
+++ b/ingester/src/querier_handler.rs
@@ -1,12 +1,7 @@
 //! Handle all requests from Querier
 
-use crate::{
-    data::{
-        partition::UnpersistedPartitionData, IngesterData, IngesterQueryPartition,
-        IngesterQueryResponse,
-    },
-    query::QueryableBatch,
-};
+use std::sync::Arc;
+
 use arrow::error::ArrowError;
 use datafusion::{
     error::DataFusionError, logical_plan::LogicalPlanBuilder,
@@ -22,7 +17,14 @@ use observability_deps::tracing::debug;
 use predicate::Predicate;
 use schema::selection::Selection;
 use snafu::{ensure, ResultExt, Snafu};
-use std::sync::Arc;
+
+use crate::{
+    data::{
+        partition::UnpersistedPartitionData, IngesterData, IngesterQueryPartition,
+        IngesterQueryResponse,
+    },
+    query::QueryableBatch,
+};
 
 #[derive(Debug, Snafu)]
 #[allow(missing_copy_implementations, missing_docs)]
 pub enum Error {
@@ -285,6 +287,13 @@ pub(crate) async fn query(
 
 #[cfg(test)]
 mod tests {
+    use arrow::record_batch::RecordBatch;
+    use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
+    use assert_matches::assert_matches;
+    use datafusion::logical_plan::{col, lit};
+    use futures::TryStreamExt;
+    use predicate::Predicate;
+
     use super::*;
     use crate::{
         data::FlatIngesterQueryResponse,
@@ -294,12 +303,6 @@ mod tests {
             make_queryable_batch_with_deletes, DataLocation, TEST_NAMESPACE, TEST_TABLE,
         },
     };
-    use arrow::record_batch::RecordBatch;
-    use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
-    use assert_matches::assert_matches;
-    use datafusion::logical_plan::{col, lit};
-    use futures::TryStreamExt;
-    use predicate::Predicate;
 
     #[tokio::test]
     async fn test_query() {
diff --git a/ingester/src/query.rs b/ingester/src/query.rs
index 905051a1bf..ff735d96ca 100644
--- a/ingester/src/query.rs
+++ b/ingester/src/query.rs
@@ -1,5 +1,7 @@
 //! Module to handle query on Ingester's data
 
+use std::{any::Any, sync::Arc};
+
 use arrow::record_batch::RecordBatch;
 use arrow_util::util::ensure_schema;
 use data_types::{
@@ -22,7 +24,6 @@ use predicate::{
 };
 use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
 use snafu::{ResultExt, Snafu};
-use std::{any::Any, sync::Arc};
 
 use crate::data::partition::SnapshotBatch;
 
@@ -259,8 +260,6 @@ impl QueryChunk for QueryableBatch {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::test_util::create_tombstone;
     use arrow::{
         array::{
             ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray,
@@ -270,6 +269,9 @@ mod tests {
     };
     use data_types::{DeleteExpr, Op, Scalar, TimestampRange};
 
+    use super::*;
+    use crate::test_util::create_tombstone;
+
     #[tokio::test]
     async fn test_merge_batch_schema() {
         // Merge schema of the batches
diff --git a/ingester/src/server.rs b/ingester/src/server.rs
index 387619043f..c05395881c 100644
--- a/ingester/src/server.rs
+++ b/ingester/src/server.rs
@@ -1,10 +1,9 @@
 //! Ingester server entrypoint.
 
-use std::sync::Arc;
+use std::{fmt::Debug, sync::Arc};
 
 use self::{grpc::GrpcDelegate, http::HttpDelegate};
 use crate::handler::IngestHandler;
-use std::fmt::Debug;
 
 pub mod grpc;
 pub mod http;
diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs
index bcf8301268..4f06a93a46 100644
--- a/ingester/src/server/grpc.rs
+++ b/ingester/src/server/grpc.rs
@@ -1,9 +1,14 @@
 //! gRPC service implementations for `ingester`.
 
-use crate::{
-    data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
-    handler::IngestHandler,
+use std::{
+    pin::Pin,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+    task::Poll,
 };
+
 use arrow::error::ArrowError;
 use arrow_flight::{
     flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
@@ -20,18 +25,15 @@ use observability_deps::tracing::{debug, info, warn};
 use pin_project::pin_project;
 use prost::Message;
 use snafu::{ResultExt, Snafu};
-use std::{
-    pin::Pin,
-    sync::{
-        atomic::{AtomicU64, Ordering},
-        Arc,
-    },
-    task::Poll,
-};
 use tonic::{Request, Response, Streaming};
 use trace::ctx::SpanContext;
 use write_summary::WriteSummary;
 
+use crate::{
+    data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
+    handler::IngestHandler,
+};
+
 /// This type is responsible for managing all gRPC services exposed by
 /// `ingester`.
 #[derive(Debug, Default)]
@@ -465,9 +467,8 @@ mod tests {
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
     use schema::selection::Selection;
 
-    use crate::data::partition::PartitionStatus;
-
-    use super::*;
+    use super::*;
+    use crate::data::partition::PartitionStatus;
 
     #[tokio::test]
     async fn test_get_stream_empty() {
diff --git a/ingester/src/server/http.rs b/ingester/src/server/http.rs
index 0630365e9c..ea48a8d6b2 100644
--- a/ingester/src/server/http.rs
+++ b/ingester/src/server/http.rs
@@ -1,10 +1,12 @@
 //! HTTP service implementations for `ingester`.
 
-use crate::handler::IngestHandler;
-use hyper::{Body, Request, Response, StatusCode};
 use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
 use thiserror::Error;
 
+use crate::handler::IngestHandler;
+
 /// Errors returned by the `router` HTTP request handler.
 #[derive(Debug, Error, Copy, Clone)]
 pub enum Error {
diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs
index 40e34992e7..c998c49bcb 100644
--- a/ingester/src/stream_handler/handler.rs
+++ b/ingester/src/stream_handler/handler.rs
@@ -1,15 +1,17 @@
-use super::DmlSink;
-use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
+use std::{fmt::Debug, time::Duration};
+
 use data_types::{SequenceNumber, ShardIndex};
 use dml::DmlOperation;
 use futures::{pin_mut, FutureExt, StreamExt};
 use iox_time::{SystemProvider, TimeProvider};
 use metric::{Attributes, DurationCounter, DurationGauge, U64Counter};
 use observability_deps::tracing::*;
-use std::{fmt::Debug, time::Duration};
 use tokio_util::sync::CancellationToken;
 use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};
 
+use super::DmlSink;
+use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
+
 /// When the [`LifecycleManager`] indicates that ingest should be paused because
 /// of memory pressure, the shard will loop, sleeping this long between
 /// calls to [`LifecycleHandle::can_resume_ingest()`] with the manager if it
@@ -89,10 +91,13 @@ impl SequencedStreamHandler {
         skip_to_oldest_available: bool,
     ) -> Self {
         // TTBR
-        let time_to_be_readable = metrics.register_metric::<DurationGauge>(
-            "ingester_ttbr",
-            "duration of time between producer writing to consumer putting into queryable cache",
-        ).recorder(metric_attrs(shard_index, &topic_name, None, false));
+        let time_to_be_readable = metrics
+            .register_metric::<DurationGauge>(
+                "ingester_ttbr",
+                "duration of time between producer writing to consumer putting into queryable \
+                 cache",
+            )
+            .recorder(metric_attrs(shard_index, &topic_name, None, false));
 
         // Lifecycle-driven ingest pause duration
         let pause_duration = metrics
@@ -461,11 +466,8 @@ fn metric_attrs(
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::{
-        lifecycle::{LifecycleConfig, LifecycleManager},
-        stream_handler::mock_sink::MockDmlSink,
-    };
+    use std::sync::Arc;
+
     use assert_matches::assert_matches;
     use async_trait::async_trait;
     use data_types::{DeletePredicate, Sequence, TimestampRange};
@@ -475,12 +477,17 @@ mod tests {
     use metric::Metric;
     use mutable_batch_lp::lines_to_batches;
     use once_cell::sync::Lazy;
-    use std::sync::Arc;
     use test_helpers::timeout::FutureTimeout;
     use tokio::sync::{mpsc, oneshot};
     use tokio_stream::wrappers::ReceiverStream;
     use write_buffer::core::WriteBufferError;
 
+    use super::*;
+    use crate::{
+        lifecycle::{LifecycleConfig, LifecycleManager},
+        stream_handler::mock_sink::MockDmlSink,
+    };
+
     static TEST_TIME: Lazy