refactor(ingester): limit visibility

Marks many internal data structures as non-pub.

Many items remain pub because they're used across tests or "peeked" at
from multiple callers, but this limits the scope of false sharing in the
future.
pull/24376/head
Dom Dwyer 2022-09-27 12:39:59 +02:00
parent 11be746dc0
commit b873297fad
18 changed files with 119 additions and 156 deletions
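
For readers less familiar with Rust's scoped visibility, the following sketch (illustrative only; the module and field names are made up and are not code from this commit) shows how the pub(crate) and pub(super) levels applied throughout the diff differ in reach:

// A minimal sketch of the visibility levels used throughout this diff.
mod data {
    pub(crate) mod partition {
        pub(crate) struct BufferState {
            // `pub(crate)`: readable anywhere inside this crate, but not by
            // downstream crates that depend on it.
            pub(crate) partition_id: i64,
            // `pub(super)`: readable only inside the parent module (`data`)
            // and its descendants.
            pub(super) buffered_rows: usize,
        }

        pub(crate) fn new_state(partition_id: i64) -> BufferState {
            BufferState {
                partition_id,
                buffered_rows: 0,
            }
        }
    }

    // `data` is the parent of `partition`, so the `pub(super)` field is
    // reachable from here.
    pub(crate) fn describe(state: &partition::BufferState) -> String {
        format!(
            "partition {} has {} buffered rows",
            state.partition_id, state.buffered_rows
        )
    }
}

fn main() {
    let state = data::partition::new_state(42);
    // Crate-root code can use the `pub(crate)` items freely...
    println!("{}", data::describe(&state));
    println!("partition id: {}", state.partition_id);
    // ...but `state.buffered_rows` would not compile here: the crate root is
    // not inside `data`, so the `pub(super)` field is out of reach.
}

Narrowing items this way lets the compiler enforce which modules may reach into a type's internals, which is what bounds the future "false sharing" the commit message refers to.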


@@ -52,20 +52,20 @@ pub enum Error {
 }
 
 /// A specialized `Error` for Ingester's Compact errors
-pub type Result<T, E = Error> = std::result::Result<T, E>;
+pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
 
 /// Result of calling [`compact_persisting_batch`]
-pub struct CompactedStream {
+pub(crate) struct CompactedStream {
     /// A stream of compacted, deduplicated
     /// [`RecordBatch`](arrow::record_batch::RecordBatch)es
-    pub stream: SendableRecordBatchStream,
+    pub(crate) stream: SendableRecordBatchStream,
     /// Metadata for `stream`
-    pub iox_metadata: IoxMetadata,
+    pub(crate) iox_metadata: IoxMetadata,
     /// An updated [`SortKey`], if any. If returned, the compaction
     /// required extending the partition's [`SortKey`] (typically
    /// because new columns were in this parquet file that were not in
     /// previous files).
-    pub sort_key_update: Option<SortKey>,
+    pub(crate) sort_key_update: Option<SortKey>,
 }
 
 impl std::fmt::Debug for CompactedStream {
@@ -80,7 +80,7 @@ impl std::fmt::Debug for CompactedStream {
 /// Compact a given persisting batch into a [`CompactedStream`] or
 /// `None` if there is no data to compact.
-pub async fn compact_persisting_batch(
+pub(crate) async fn compact_persisting_batch(
     time_provider: Arc<dyn TimeProvider>,
     executor: &Executor,
     namespace_id: i64,
@@ -127,14 +127,14 @@ pub async fn compact_persisting_batch(
     let (_min_seq, max_seq) = batch.data.min_max_sequence_numbers();
 
     let iox_metadata = IoxMetadata {
-        object_store_id: batch.object_store_id,
+        object_store_id: batch.object_store_id(),
         creation_timestamp: time_provider.now(),
-        shard_id: batch.shard_id,
+        shard_id: batch.shard_id(),
         namespace_id: NamespaceId::new(namespace_id),
         namespace_name: Arc::from(namespace_name.as_str()),
-        table_id: batch.table_id,
+        table_id: batch.table_id(),
         table_name: Arc::from(table_name.as_str()),
-        partition_id: batch.partition_id,
+        partition_id: batch.partition_id(),
         partition_key: partition_key.clone(),
         max_sequence_number: max_seq,
         compaction_level: CompactionLevel::Initial,
@@ -149,7 +149,7 @@ pub async fn compact_persisting_batch(
 }
 
 /// Compact a given Queryable Batch
-pub async fn compact(
+pub(crate) async fn compact(
     executor: &Executor,
     data: Arc<QueryableBatch>,
     sort_key: SortKey,


@@ -145,7 +145,7 @@ impl IngesterData {
     }
 
     /// Get shard data for specific shard.
-    #[allow(dead_code)] // Used in tests
+    #[cfg(test)]
     pub(crate) fn shard(&self, shard_id: ShardId) -> Option<&ShardData> {
         self.shards.get(&shard_id)
     }
@@ -178,7 +178,7 @@ impl IngesterData {
     /// Return the ingestion progress for the specified shards
     /// Returns an empty `ShardProgress` for any shards that this ingester doesn't know about.
-    pub(crate) async fn progresses(
+    pub(super) async fn progresses(
         &self,
         shard_indexes: Vec<ShardIndex>,
     ) -> BTreeMap<ShardIndex, ShardProgress> {
@@ -571,7 +571,7 @@ impl IngesterQueryResponse {
 }
 
 /// Flattened version of [`IngesterQueryResponse`].
-pub type FlatIngesterQueryResponseStream =
+pub(crate) type FlatIngesterQueryResponseStream =
     Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>;
 
 /// Element within the flat wire protocol.


@@ -81,7 +81,11 @@ pub(crate) struct NamespaceData {
 impl NamespaceData {
     /// Initialize new tables with default partition template of daily
-    pub fn new(namespace_id: NamespaceId, shard_id: ShardId, metrics: &metric::Registry) -> Self {
+    pub(super) fn new(
+        namespace_id: NamespaceId,
+        shard_id: ShardId,
+        metrics: &metric::Registry,
+    ) -> Self {
         let table_count = metrics
             .register_metric::<U64Counter>(
                 "ingester_tables_total",
@@ -271,7 +275,7 @@ impl NamespaceData {
     /// Walks down the table and partition and clears the persisting batch. The sequence number is
     /// the max_sequence_number for the persisted parquet file, which should be kept in the table
     /// data buffer.
-    pub(crate) async fn mark_persisted(
+    pub(super) async fn mark_persisted(
         &self,
         table_name: &str,
         partition_key: &PartitionKey,
@@ -288,7 +292,7 @@ impl NamespaceData {
     }
 
     /// Return progress from this Namespace
-    pub(crate) async fn progress(&self) -> ShardProgress {
+    pub(super) async fn progress(&self) -> ShardProgress {
         let tables: Vec<_> = self.tables.read().values().map(Arc::clone).collect();
 
         // Consolidate progtress across partitions.


@@ -20,10 +20,10 @@ pub mod resolver;
 /// Read only copy of the unpersisted data for a partition in the ingester for a specific partition.
 #[derive(Debug)]
 pub(crate) struct UnpersistedPartitionData {
-    pub partition_id: PartitionId,
-    pub non_persisted: Vec<Arc<SnapshotBatch>>,
-    pub persisting: Option<QueryableBatch>,
-    pub partition_status: PartitionStatus,
+    pub(crate) partition_id: PartitionId,
+    pub(crate) non_persisted: Vec<Arc<SnapshotBatch>>,
+    pub(crate) persisting: Option<QueryableBatch>,
+    pub(crate) partition_status: PartitionStatus,
 }
 
 /// Status of a partition that has unpersisted data.
@@ -43,7 +43,7 @@ pub struct PartitionStatus {
 /// PersistingBatch contains all needed info and data for creating
 /// a parquet file for given set of SnapshotBatches
 #[derive(Debug, PartialEq, Clone)]
-pub struct PersistingBatch {
+pub(crate) struct PersistingBatch {
     /// Shard id of the data
     pub(crate) shard_id: ShardId,
@@ -60,9 +60,27 @@ pub struct PersistingBatch {
     pub(crate) data: Arc<QueryableBatch>,
 }
 
+impl PersistingBatch {
+    pub(crate) fn object_store_id(&self) -> Uuid {
+        self.object_store_id
+    }
+
+    pub(crate) fn shard_id(&self) -> ShardId {
+        self.shard_id
+    }
+
+    pub(crate) fn table_id(&self) -> TableId {
+        self.table_id
+    }
+
+    pub(crate) fn partition_id(&self) -> PartitionId {
+        self.partition_id
+    }
+}
+
 /// SnapshotBatch contains data of many contiguous BufferBatches
 #[derive(Debug, PartialEq)]
-pub struct SnapshotBatch {
+pub(crate) struct SnapshotBatch {
     /// Min sequence number of its combined BufferBatches
     pub(crate) min_sequence_number: SequenceNumber,
     /// Max sequence number of its combined BufferBatches
@@ -73,7 +91,10 @@ pub struct SnapshotBatch {
 impl SnapshotBatch {
     /// Return only data of the given columns
-    pub fn scan(&self, selection: Selection<'_>) -> Result<Option<Arc<RecordBatch>>, super::Error> {
+    pub(crate) fn scan(
+        &self,
+        selection: Selection<'_>,
+    ) -> Result<Option<Arc<RecordBatch>>, super::Error> {
         Ok(match selection {
             Selection::All => Some(Arc::clone(&self.data)),
             Selection::Some(columns) => {
@@ -129,7 +150,7 @@ pub(crate) struct PartitionData {
 impl PartitionData {
     /// Initialize a new partition data buffer
-    pub fn new(
+    pub(crate) fn new(
         id: PartitionId,
         shard_id: ShardId,
         table_id: TableId,


@@ -251,13 +251,13 @@ impl DataBuffer {
 /// BufferBatch is a MutableBatch with its ingesting order, sequence_number, that helps the
 /// ingester keep the batches of data in their ingesting order
 #[derive(Debug)]
-pub struct BufferBatch {
+pub(crate) struct BufferBatch {
     /// Sequence number of the first write in this batch
     pub(crate) min_sequence_number: SequenceNumber,
     /// Sequence number of the last write in this batch
-    pub(crate) max_sequence_number: SequenceNumber,
+    pub(super) max_sequence_number: SequenceNumber,
     /// Ingesting data
-    pub(crate) data: MutableBatch,
+    pub(super) data: MutableBatch,
 }
 
 impl BufferBatch {


@@ -39,7 +39,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Query a given Queryable Batch, applying selection and filters as appropriate
 /// Return stream of record batches
-pub async fn query(
+pub(crate) async fn query(
     executor: &Executor,
     data: Arc<QueryableBatch>,
 ) -> Result<SendableRecordBatchStream> {


@@ -19,7 +19,7 @@ use crate::lifecycle::LifecycleHandle;
 /// Data of a Shard
 #[derive(Debug)]
-pub struct ShardData {
+pub(crate) struct ShardData {
     /// The shard index for this shard
     shard_index: ShardIndex,
     /// The catalog ID for this shard.
@@ -34,7 +34,11 @@ pub struct ShardData {
 impl ShardData {
     /// Initialise a new [`ShardData`] that emits metrics to `metrics`.
-    pub fn new(shard_index: ShardIndex, shard_id: ShardId, metrics: Arc<metric::Registry>) -> Self {
+    pub(super) fn new(
+        shard_index: ShardIndex,
+        shard_id: ShardId,
+        metrics: Arc<metric::Registry>,
+    ) -> Self {
         let namespace_count = metrics
             .register_metric::<U64Counter>(
                 "ingester_namespaces_total",
@@ -115,7 +119,7 @@ impl ShardData {
     }
 
     /// Return the progress of this shard
-    pub(crate) async fn progress(&self) -> ShardProgress {
+    pub(super) async fn progress(&self) -> ShardProgress {
         let namespaces: Vec<_> = self.namespaces.read().values().map(Arc::clone).collect();
 
         let mut progress = ShardProgress::new();


@@ -45,7 +45,7 @@ impl TableData {
     /// The partition provider is used to instantiate a [`PartitionData`]
     /// instance when this [`TableData`] instance observes an op for a partition
     /// for the first time.
-    pub fn new(
+    pub(super) fn new(
         table_id: TableId,
         table_name: &str,
         shard_id: ShardId,
@@ -179,7 +179,7 @@ impl TableData {
     }
 
     /// Return progress from this Table
-    pub(crate) fn progress(&self) -> ShardProgress {
+    pub(super) fn progress(&self) -> ShardProgress {
         let progress = ShardProgress::new();
         let progress = match self.parquet_max_sequence_number() {
             Some(n) => progress.with_persisted(n),
@@ -194,7 +194,7 @@ impl TableData {
     }
 
     #[cfg(test)]
-    pub(crate) fn table_id(&self) -> TableId {
+    pub(super) fn table_id(&self) -> TableId {
         self.table_id
     }
 }


@@ -32,8 +32,8 @@ use crate::{
     poison::PoisonCabinet,
     querier_handler::prepare_data_to_querier,
     stream_handler::{
-        sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation,
-        PeriodicWatermarkFetcher, SequencedStreamHandler,
+        handler::SequencedStreamHandler, sink_adaptor::IngestSinkAdaptor,
+        sink_instrumentation::SinkInstrumentation, PeriodicWatermarkFetcher,
     },
 };


@@ -13,16 +13,16 @@
     clippy::clone_on_ref_ptr
 )]
 
-pub mod compact;
+pub(crate) mod compact;
 pub mod data;
 pub mod handler;
 mod job;
 pub mod lifecycle;
 mod poison;
 pub mod querier_handler;
-pub mod query;
+pub(crate) mod query;
 pub mod server;
-pub mod stream_handler;
+pub(crate) mod stream_handler;
 #[cfg(test)]
-pub mod test_util;
+pub(crate) mod test_util;


@@ -21,7 +21,7 @@ use crate::{
     poison::{PoisonCabinet, PoisonPill},
 };
 
-/// API suitable for ingester tasks to query and update the [`LifecycleManager`] state.
+/// API suitable for ingester tasks to query and update the server lifecycle state.
 pub trait LifecycleHandle: Send + Sync + 'static {
     /// Logs bytes written into a partition so that it can be tracked for the manager to
     /// trigger persistence. Returns true if the ingester should pause consuming from the
@@ -46,7 +46,7 @@ pub trait LifecycleHandle: Send + Sync + 'static {
 /// This handle presents an API suitable for ingester tasks to query and update
 /// the [`LifecycleManager`] state.
 #[derive(Debug, Clone)]
-pub struct LifecycleHandleImpl {
+pub(crate) struct LifecycleHandleImpl {
     time_provider: Arc<dyn TimeProvider>,
 
     config: Arc<LifecycleConfig>,
@@ -113,7 +113,7 @@ impl LifecycleHandle for LifecycleHandleImpl {
 /// A [`LifecycleManager`] MUST be driven by an external actor periodically
 /// calling [`LifecycleManager::maybe_persist()`].
 #[derive(Debug)]
-pub struct LifecycleManager {
+pub(crate) struct LifecycleManager {
     config: Arc<LifecycleConfig>,
     time_provider: Arc<dyn TimeProvider>,
     job_registry: Arc<JobRegistry>,
@@ -280,7 +280,7 @@ impl LifecycleManager {
     }
 
     /// Acquire a shareable [`LifecycleHandle`] for this manager instance.
-    pub fn handle(&self) -> LifecycleHandleImpl {
+    pub(super) fn handle(&self) -> LifecycleHandleImpl {
         LifecycleHandleImpl {
             time_provider: Arc::clone(&self.time_provider),
             config: Arc::clone(&self.config),


@@ -49,7 +49,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Queryable data used for both query and persistence
 #[derive(Debug, PartialEq, Clone)]
-pub struct QueryableBatch {
+pub(crate) struct QueryableBatch {
     /// data
     pub(crate) data: Vec<Arc<SnapshotBatch>>,
@@ -65,7 +65,7 @@ pub struct QueryableBatch {
 impl QueryableBatch {
     /// Initilaize a QueryableBatch
-    pub fn new(
+    pub(crate) fn new(
         table_name: Arc<str>,
         partition_id: PartitionId,
         data: Vec<Arc<SnapshotBatch>>,
@@ -81,19 +81,19 @@ impl QueryableBatch {
     }
 
     /// Add snapshots to this batch
-    pub fn with_data(mut self, mut data: Vec<Arc<SnapshotBatch>>) -> Self {
+    pub(crate) fn with_data(mut self, mut data: Vec<Arc<SnapshotBatch>>) -> Self {
         self.data.append(&mut data);
         self
     }
 
     /// Add more tombstones
-    pub fn add_tombstones(&mut self, deletes: &[Tombstone]) {
+    pub(crate) fn add_tombstones(&mut self, deletes: &[Tombstone]) {
         let delete_predicates = tombstones_to_delete_predicates_iter(deletes);
         self.delete_predicates.extend(delete_predicates);
     }
 
     /// return min and max of all the snapshots
-    pub fn min_max_sequence_numbers(&self) -> (SequenceNumber, SequenceNumber) {
+    pub(crate) fn min_max_sequence_numbers(&self) -> (SequenceNumber, SequenceNumber) {
         let min = self
             .data
             .first()
@@ -112,7 +112,7 @@ impl QueryableBatch {
     }
 
     /// return true if it has no data
-    pub fn is_empty(&self) -> bool {
+    pub(crate) fn is_empty(&self) -> bool {
         self.data.is_empty()
     }
 }


@@ -1,3 +1,5 @@
+//! A handler of streamed ops from a write buffer.
+
 use std::{fmt::Debug, time::Duration};
 
 use data_types::{SequenceNumber, ShardIndex};
@@ -32,7 +34,7 @@ const INGEST_POLL_INTERVAL: Duration = Duration::from_millis(100);
 /// [`LifecycleManager`]: crate::lifecycle::LifecycleManager
 /// [`LifecycleHandle::can_resume_ingest()`]: crate::lifecycle::LifecycleHandle::can_resume_ingest()
 #[derive(Debug)]
-pub struct SequencedStreamHandler<I, O, T = SystemProvider> {
+pub(crate) struct SequencedStreamHandler<I, O, T = SystemProvider> {
     /// Creator/manager of the stream of DML ops
     write_buffer_stream_handler: I,
@@ -80,7 +82,7 @@ impl<I, O> SequencedStreamHandler<I, O> {
     /// `stream` once [`SequencedStreamHandler::run()`] is called, and
     /// gracefully stops when `shutdown` is cancelled.
     #[allow(clippy::too_many_arguments)]
-    pub fn new(
+    pub(crate) fn new(
         write_buffer_stream_handler: I,
         current_sequence_number: SequenceNumber,
         sink: O,


@@ -11,12 +11,13 @@
 //! limited by the [`LifecycleManager`] it is initialised by, pausing until
 //! [`LifecycleHandle::can_resume_ingest()`] returns true.
 //!
+//! [`SequencedStreamHandler`]: handler::SequencedStreamHandler
 //! [`DmlOperation`]: dml::DmlOperation
 //! [`WriteBufferReading`]: write_buffer::core::WriteBufferReading
 //! [`LifecycleManager`]: crate::lifecycle::LifecycleManager
 //! [`LifecycleHandle::can_resume_ingest()`]: crate::lifecycle::LifecycleHandle::can_resume_ingest()
 
-mod handler;
+pub mod handler;
 mod periodic_watermark_fetcher;
 mod sink;
@@ -27,6 +28,5 @@ pub mod mock_watermark_fetcher;
 pub mod sink_adaptor;
 pub mod sink_instrumentation;
 
-pub use handler::*;
 pub use periodic_watermark_fetcher::*;
 pub use sink::*;


@@ -32,7 +32,7 @@ pub struct PeriodicWatermarkFetcher {
 impl PeriodicWatermarkFetcher {
     /// Instantiate a new [`PeriodicWatermarkFetcher`] that polls `write_buffer`
     /// every `interval` period for the maximum offset for `shard_index`.
-    pub fn new(
+    pub(crate) fn new(
         write_buffer: Arc<dyn WriteBufferReading>,
         shard_index: ShardIndex,
         interval: Duration,
@@ -61,7 +61,7 @@ impl PeriodicWatermarkFetcher {
     ///
     /// If the watermark value has yet to be observed (i.e. due to continuous
     /// fetch errors) `None` is returned.
-    pub fn cached_watermark(&self) -> Option<i64> {
+    pub(crate) fn cached_watermark(&self) -> Option<i64> {
         match self.last_watermark.load(Ordering::Relaxed) {
             // A value of 0 means "never observed a watermark".
             0 => None,


@@ -11,7 +11,7 @@ use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl};
 /// Provides a [`DmlSink`] implementation for a [`IngesterData`] instance.
 #[derive(Debug)]
-pub struct IngestSinkAdaptor {
+pub(crate) struct IngestSinkAdaptor {
     ingest_data: Arc<IngesterData>,
     lifecycle_handle: LifecycleHandleImpl,
     shard_id: ShardId,
@@ -20,7 +20,7 @@ pub struct IngestSinkAdaptor {
 impl IngestSinkAdaptor {
     /// Wrap an [`IngesterData`] in an adaptor layer to provide a [`DmlSink`]
     /// implementation.
-    pub fn new(
+    pub(crate) fn new(
         ingest_data: Arc<IngesterData>,
         lifecycle_handle: LifecycleHandleImpl,
         shard_id: ShardId,


@@ -17,7 +17,7 @@ use super::DmlSink;
 /// # Caching
 ///
 /// Implementations may cache the watermark and return inaccurate values.
-pub trait WatermarkFetcher: Debug + Send + Sync {
+pub(crate) trait WatermarkFetcher: Debug + Send + Sync {
     /// Return a watermark if available.
     fn watermark(&self) -> Option<i64>;
 }
@@ -37,7 +37,7 @@ pub trait WatermarkFetcher: Debug + Send + Sync {
 /// instance is running on, and the routers. If either of these clocks are
 /// incorrect/skewed/drifting the metrics emitted may be incorrect.
 #[derive(Debug)]
-pub struct SinkInstrumentation<F, T, P = SystemProvider> {
+pub(crate) struct SinkInstrumentation<F, T, P = SystemProvider> {
     /// The [`DmlSink`] impl this layer decorates.
     ///
     /// All ops this impl is called with are passed into `inner` for processing.
@@ -78,7 +78,7 @@ where
     /// The current high watermark is read from `watermark_fetcher` and used to
     /// derive some metric values (such as lag). This impl is tolerant of
     /// cached/stale watermark values being returned by `watermark_fetcher`.
-    pub fn new(
+    pub(crate) fn new(
         inner: T,
         watermark_fetcher: F,
         topic_name: String,


@@ -1,6 +1,7 @@
 //! Test setups and data for ingester crate
 #![allow(missing_docs)]
+#![cfg(test)]
 
 use std::{sync::Arc, time::Duration};
@@ -14,7 +15,7 @@ use data_types::{
 use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
 use iox_catalog::{interface::Catalog, mem::MemCatalog};
 use iox_query::test::{raw_data, TestChunk};
-use iox_time::{SystemProvider, Time, TimeProvider};
+use iox_time::{SystemProvider, Time};
 use mutable_batch_lp::lines_to_batches;
 use object_store::memory::InMemory;
 use parquet_file::metadata::IoxMetadata;
@@ -31,77 +32,8 @@ use crate::{
     query::QueryableBatch,
 };
 
-/// Create a persisting batch, some tombstones and corresponding metadata for them after compaction
-pub async fn make_persisting_batch_with_meta() -> (Arc<PersistingBatch>, Vec<Tombstone>, IoxMetadata)
-{
-    // record batches of input data
-    let batches = create_batches_with_influxtype_different_columns_different_order().await;
-
-    // tombstones
-    let tombstones = vec![
-        create_tombstone(
-            1,
-            1,
-            1,
-            100,                          // delete's seq_number
-            0,                            // min time of data to get deleted
-            200000,                       // max time of data to get deleted
-            "tag2=CT and field_int=1000", // delete predicate
-        ),
-        create_tombstone(
-            1, 1, 1, 101, // delete's seq_number
-            0,            // min time of data to get deleted
-            200000,       // max time of data to get deleted
-            "tag1!=MT",   // delete predicate
-        ),
-    ];
-
-    // IDs set to the persisting batch and its compacted metadata
-    let uuid = Uuid::new_v4();
-    let namespace_name = "test_namespace";
-    let partition_key = "test_partition_key";
-    let table_name = "test_table";
-    let shard_id = 1;
-    let seq_num_start: i64 = 1;
-    let seq_num_end: i64 = seq_num_start + 1; // 2 batches
-    let namespace_id = 1;
-    let table_id = 1;
-    let partition_id = 1;
-
-    // make the persisting batch
-    let persisting_batch = make_persisting_batch(
-        shard_id,
-        seq_num_start,
-        table_id,
-        table_name,
-        partition_id,
-        uuid,
-        batches,
-        tombstones.clone(),
-    );
-
-    // make metadata
-    let time_provider = Arc::new(SystemProvider::new());
-    let meta = make_meta(
-        uuid,
-        time_provider.now(),
-        shard_id,
-        namespace_id,
-        namespace_name,
-        table_id,
-        table_name,
-        partition_id,
-        partition_key,
-        seq_num_end,
-        CompactionLevel::Initial,
-        Some(SortKey::from_columns(vec!["tag1", "tag2", "time"])),
-    );
-
-    (persisting_batch, tombstones, meta)
-}
 /// Create tombstone for testing
-pub fn create_tombstone(
+pub(crate) fn create_tombstone(
     id: i64,
     table_id: i64,
     shard_id: i64,
@@ -122,7 +54,7 @@ pub fn create_tombstone(
 }
 
 #[allow(clippy::too_many_arguments)]
-pub fn make_meta(
+pub(crate) fn make_meta(
     object_store_id: Uuid,
     creation_timestamp: Time,
     shard_id: i64,
@@ -153,7 +85,7 @@ pub fn make_meta(
 }
 
 #[allow(clippy::too_many_arguments)]
-pub fn make_persisting_batch(
+pub(crate) fn make_persisting_batch(
     shard_id: i64,
     seq_num_start: i64,
     table_id: i64,
@@ -170,7 +102,6 @@ pub fn make_persisting_batch(
         batches,
         tombstones,
     );
-
     Arc::new(PersistingBatch {
         shard_id: ShardId::new(shard_id),
         table_id: TableId::new(table_id),
@@ -180,7 +111,7 @@ pub fn make_persisting_batch(
     })
 }
 
-pub fn make_queryable_batch(
+pub(crate) fn make_queryable_batch(
     table_name: &str,
     partition_id: i64,
     seq_num_start: i64,
@@ -189,7 +120,7 @@ pub fn make_queryable_batch(
     make_queryable_batch_with_deletes(table_name, partition_id, seq_num_start, batches, vec![])
 }
 
-pub fn make_queryable_batch_with_deletes(
+pub(crate) fn make_queryable_batch_with_deletes(
     table_name: &str,
     partition_id: i64,
     seq_num_start: i64,
@@ -213,7 +144,7 @@ pub fn make_queryable_batch_with_deletes(
     ))
 }
 
-pub fn make_snapshot_batch(
+pub(crate) fn make_snapshot_batch(
     batch: Arc<RecordBatch>,
     min: SequenceNumber,
     max: SequenceNumber,
@@ -225,7 +156,7 @@ pub fn make_snapshot_batch(
     }
 }
 
-pub async fn create_one_row_record_batch_with_influxtype() -> Vec<Arc<RecordBatch>> {
+pub(crate) async fn create_one_row_record_batch_with_influxtype() -> Vec<Arc<RecordBatch>> {
     let chunk1 = Arc::new(
         TestChunk::new("t")
             .with_id(1)
@@ -253,7 +184,8 @@ pub async fn create_one_row_record_batch_with_influxtype() -> Vec<Arc<RecordBatc
     batches
 }
 
-pub async fn create_one_record_batch_with_influxtype_no_duplicates() -> Vec<Arc<RecordBatch>> {
+pub(crate) async fn create_one_record_batch_with_influxtype_no_duplicates() -> Vec<Arc<RecordBatch>>
+{
     let chunk1 = Arc::new(
         TestChunk::new("t")
             .with_id(1)
@@ -283,7 +215,7 @@ pub async fn create_one_record_batch_with_influxtype_no_duplicates() -> Vec<Arc<
     batches
 }
 
-pub async fn create_one_record_batch_with_influxtype_duplicates() -> Vec<Arc<RecordBatch>> {
+pub(crate) async fn create_one_record_batch_with_influxtype_duplicates() -> Vec<Arc<RecordBatch>> {
     let chunk1 = Arc::new(
         TestChunk::new("t")
             .with_id(1)
@@ -321,7 +253,7 @@ pub async fn create_one_record_batch_with_influxtype_duplicates() -> Vec<Arc<Rec
 }
 
 /// RecordBatches with knowledge of influx metadata
-pub async fn create_batches_with_influxtype() -> Vec<Arc<RecordBatch>> {
+pub(crate) async fn create_batches_with_influxtype() -> Vec<Arc<RecordBatch>> {
     // Use the available TestChunk to create chunks and then convert them to raw RecordBatches
     let mut batches = vec![];
@@ -407,7 +339,7 @@ pub async fn create_batches_with_influxtype() -> Vec<Arc<RecordBatch>> {
 }
 
 /// RecordBatches with knowledge of influx metadata
-pub async fn create_batches_with_influxtype_different_columns() -> Vec<Arc<RecordBatch>> {
+pub(crate) async fn create_batches_with_influxtype_different_columns() -> Vec<Arc<RecordBatch>> {
     // Use the available TestChunk to create chunks and then convert them to raw RecordBatches
     let mut batches = vec![];
@@ -471,7 +403,7 @@ pub async fn create_batches_with_influxtype_different_columns() -> Vec<Arc<Recor
 }
 
 /// RecordBatches with knowledge of influx metadata
-pub async fn create_batches_with_influxtype_different_columns_different_order(
+pub(crate) async fn create_batches_with_influxtype_different_columns_different_order(
 ) -> Vec<Arc<RecordBatch>> {
     // Use the available TestChunk to create chunks and then convert them to raw RecordBatches
     let mut batches = vec![];
@@ -535,7 +467,8 @@ pub async fn create_batches_with_influxtype_different_columns_different_order(
 }
 
 /// Has 2 tag columns; tag1 has a lower cardinality (3) than tag3 (4)
-pub async fn create_batches_with_influxtype_different_cardinality() -> Vec<Arc<RecordBatch>> {
+pub(crate) async fn create_batches_with_influxtype_different_cardinality() -> Vec<Arc<RecordBatch>>
+{
     // Use the available TestChunk to create chunks and then convert them to raw RecordBatches
     let mut batches = vec![];
@@ -589,7 +522,8 @@ pub async fn create_batches_with_influxtype_different_cardinality() -> Vec<Arc<R
 }
 
 /// RecordBatches with knowledge of influx metadata
-pub async fn create_batches_with_influxtype_same_columns_different_type() -> Vec<Arc<RecordBatch>> {
+pub(crate) async fn create_batches_with_influxtype_same_columns_different_type(
+) -> Vec<Arc<RecordBatch>> {
     // Use the available TestChunk to create chunks and then convert them to raw RecordBatches
     let mut batches = vec![];
@@ -641,12 +575,10 @@ pub async fn create_batches_with_influxtype_same_columns_different_type() -> Vec
     batches
 }
 
-pub const TEST_NAMESPACE: &str = "test_namespace";
-pub const TEST_NAMESPACE_EMPTY: &str = "test_namespace_empty";
-pub const TEST_TABLE: &str = "test_table";
-pub const TEST_TABLE_EMPTY: &str = "test_table_empty";
-pub const TEST_PARTITION_1: &str = "test+partition_1";
-pub const TEST_PARTITION_2: &str = "test+partition_2";
+pub(crate) const TEST_NAMESPACE: &str = "test_namespace";
+pub(crate) const TEST_TABLE: &str = "test_table";
+pub(crate) const TEST_PARTITION_1: &str = "test+partition_1";
+pub(crate) const TEST_PARTITION_2: &str = "test+partition_2";
 
 bitflags! {
     /// Make the same in-memory data but data are split between:
@@ -660,7 +592,7 @@ bitflags! {
     /// . snapshot + persisting
     /// . buffer + snapshot + persisting
     /// . If the second partittion exists, it only has data in its buffer
-    pub struct DataLocation: u8 {
+    pub(crate) struct DataLocation: u8 {
         const BUFFER = 0b001;
         const SNAPSHOT = 0b010;
         const PERSISTING = 0b100;
@@ -673,7 +605,7 @@ bitflags! {
 /// This function produces one scenario but with the parameter combination (2*7),
 /// you will be able to produce 14 scenarios by calling it in 2 loops
-pub async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
+pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
     // Whatever data because they won't be used in the tests
     let metrics: Arc<metric::Registry> = Default::default();
     let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
@@ -739,7 +671,7 @@ pub async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> Inge
     ingester
 }
 
-pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
+pub(crate) async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
    // Whatever data because they won't be used in the tests
     let metrics: Arc<metric::Registry> = Default::default();
     let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));