diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 0e300b04e6..1bdac05160 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -98,14 +98,17 @@ pub struct IngesterData { impl IngesterData { /// Create new instance. - pub fn new( + pub fn new( object_store: Arc, catalog: Arc, - shards: BTreeMap, + shards: T, exec: Arc, backoff_config: BackoffConfig, metrics: Arc, - ) -> Self { + ) -> Self + where + T: IntoIterator, + { let persisted_file_size_bytes = metrics.register_metric_with_options( "ingester_persisted_file_size_bytes", "Size of files persisted by the ingester", @@ -121,6 +124,11 @@ impl IngesterData { }, ); + let shards = shards + .into_iter() + .map(|(id, index)| (id, ShardData::new(index, id, Arc::clone(&metrics)))) + .collect(); + Self { store: ParquetStorage::new(object_store), catalog, @@ -645,19 +653,12 @@ mod tests { .await .unwrap(); - let mut shards = BTreeMap::new(); - let shard_index = ShardIndex::new(0); - shards.insert( - shard1.id, - ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), - ); - let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [(shard1.id, shard_index)], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), @@ -732,18 +733,13 @@ mod tests { .create_or_get(&topic, shard_index) .await .unwrap(); - let mut shards = BTreeMap::new(); - shards.insert( - shard1.id, - ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), - ); let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [(shard1.id, shard1.shard_index)], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), @@ -837,22 +833,16 @@ mod tests { .create_or_get(&topic, shard_index) .await .unwrap(); - let mut shards = BTreeMap::new(); - shards.insert( - shard1.id, - ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), - ); - shards.insert( - shard2.id, - ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), - ); let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [ + (shard1.id, shard1.shard_index), + (shard2.id, shard2.shard_index), + ], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), @@ -1093,22 +1083,16 @@ mod tests { .create_or_get(&topic, shard_index) .await .unwrap(); - let mut shards = BTreeMap::new(); - shards.insert( - shard1.id, - ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), - ); - shards.insert( - shard2.id, - ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), - ); let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [ + (shard1.id, shard1.shard_index), + (shard2.id, shard2.shard_index), + ], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), @@ -1392,20 +1376,14 @@ mod tests { .create_or_get(&topic, shard_index) .await .unwrap(); - - let mut shards = BTreeMap::new(); let shard_index = ShardIndex::new(0); - shards.insert( - shard1.id, - ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), - ); let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [(shard1.id, shard_index)], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 7e0d7f5a2b..36e00867ee 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -27,7 +27,7 @@ use write_buffer::core::WriteBufferReading; use write_summary::ShardProgress; use crate::{ - data::{shard::ShardData, IngesterData, IngesterQueryResponse}, + data::{IngesterData, IngesterQueryResponse}, lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager}, poison::PoisonCabinet, querier_handler::prepare_data_to_querier, @@ -135,18 +135,10 @@ impl IngestHandlerImpl { skip_to_oldest_available: bool, max_requests: usize, ) -> Result { - // build the initial ingester data state - let mut shards = BTreeMap::new(); - for s in shard_states.values() { - shards.insert( - s.id, - ShardData::new(s.shard_index, s.id, Arc::clone(&metric_registry)), - ); - } let data = Arc::new(IngesterData::new( object_store, catalog, - shards, + shard_states.clone().into_iter().map(|(idx, s)| (s.id, idx)), exec, BackoffConfig::default(), Arc::clone(&metric_registry), diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index f4ad282a46..169c8a3ba0 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -2,7 +2,7 @@ #![allow(missing_docs)] -use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_eq; @@ -25,7 +25,6 @@ use uuid::Uuid; use crate::{ data::{ partition::{PersistingBatch, SnapshotBatch}, - shard::ShardData, IngesterData, }, lifecycle::{LifecycleConfig, LifecycleHandle, LifecycleManager}, @@ -696,16 +695,11 @@ pub async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> Inge // Make data for one shard and two tables let shard_index = ShardIndex::new(1); let shard_id = populate_catalog(&*catalog).await; - let mut shards = BTreeMap::new(); - shards.insert( - shard_id, - ShardData::new(shard_index, shard_id, Arc::clone(&metrics)), - ); let ingester = IngesterData::new( object_store, catalog, - shards, + [(shard_id, shard_index)], exec, backoff::BackoffConfig::default(), metrics, @@ -768,16 +762,10 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa let shard_index = ShardIndex::new(0); let shard_id = populate_catalog(&*catalog).await; - let mut shards = BTreeMap::new(); - shards.insert( - shard_id, - ShardData::new(shard_index, shard_id, Arc::clone(&metrics)), - ); - let ingester = IngesterData::new( object_store, catalog, - shards, + [(shard_id, shard_index)], exec, backoff::BackoffConfig::default(), metrics, diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 2c5bef0504..3348a3a6cf 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -14,9 +14,7 @@ use generated_types::{ }; use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError}; use ingester::{ - data::{ - shard::ShardData, FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister, - }, + data::{FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister}, lifecycle::LifecycleHandle, querier_handler::prepare_data_to_querier, }; @@ -35,7 +33,7 @@ use schema::selection::Selection; use sharder::JumpHash; use std::{ cmp::Ordering, - collections::{BTreeMap, HashMap, HashSet}, + collections::{HashMap, HashSet}, fmt::Display, fmt::Write, sync::Arc, @@ -693,18 +691,10 @@ impl MockIngester { let ns = catalog.create_namespace("test_db").await; let shard = ns.create_shard(1).await; - let shards = BTreeMap::from([( - shard.shard.id, - ShardData::new( - shard.shard.shard_index, - shard.shard.id, - catalog.metric_registry(), - ), - )]); let ingester_data = Arc::new(IngesterData::new( catalog.object_store(), catalog.catalog(), - shards, + [(shard.shard.id, shard.shard.shard_index)], catalog.exec(), BackoffConfig::default(), catalog.metric_registry(),