From 11be746dc0f274a33e767aabfa61dbca81c2e7e8 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 27 Sep 2022 12:05:02 +0200 Subject: [PATCH] refactor: internalise ShardData init Move the initialisation of ShardData (an internal ingester data structure) into the ingester itself. Previously callers would initialise the ingester state, and pass it into the IngesterData constructor. --- ingester/src/data.rs | 66 +++++++++++-------------------- ingester/src/handler.rs | 12 +----- ingester/src/test_util.rs | 18 ++------- query_tests/src/scenarios/util.rs | 16 ++------ 4 files changed, 30 insertions(+), 82 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 0e300b04e6..1bdac05160 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -98,14 +98,17 @@ pub struct IngesterData { impl IngesterData { /// Create new instance. - pub fn new( + pub fn new( object_store: Arc, catalog: Arc, - shards: BTreeMap, + shards: T, exec: Arc, backoff_config: BackoffConfig, metrics: Arc, - ) -> Self { + ) -> Self + where + T: IntoIterator, + { let persisted_file_size_bytes = metrics.register_metric_with_options( "ingester_persisted_file_size_bytes", "Size of files persisted by the ingester", @@ -121,6 +124,11 @@ impl IngesterData { }, ); + let shards = shards + .into_iter() + .map(|(id, index)| (id, ShardData::new(index, id, Arc::clone(&metrics)))) + .collect(); + Self { store: ParquetStorage::new(object_store), catalog, @@ -645,19 +653,12 @@ mod tests { .await .unwrap(); - let mut shards = BTreeMap::new(); - let shard_index = ShardIndex::new(0); - shards.insert( - shard1.id, - ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), - ); - let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [(shard1.id, shard_index)], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), @@ -732,18 +733,13 @@ mod tests { .create_or_get(&topic, shard_index) .await .unwrap(); - let mut shards = BTreeMap::new(); - shards.insert( - shard1.id, - ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), - ); let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [(shard1.id, shard1.shard_index)], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), @@ -837,22 +833,16 @@ mod tests { .create_or_get(&topic, shard_index) .await .unwrap(); - let mut shards = BTreeMap::new(); - shards.insert( - shard1.id, - ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), - ); - shards.insert( - shard2.id, - ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), - ); let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [ + (shard1.id, shard1.shard_index), + (shard2.id, shard2.shard_index), + ], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), @@ -1093,22 +1083,16 @@ mod tests { .create_or_get(&topic, shard_index) .await .unwrap(); - let mut shards = BTreeMap::new(); - shards.insert( - shard1.id, - ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), - ); - shards.insert( - shard2.id, - ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), - ); let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [ + (shard1.id, shard1.shard_index), + (shard2.id, shard2.shard_index), + ], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), @@ -1392,20 +1376,14 @@ mod tests { .create_or_get(&topic, shard_index) .await .unwrap(); - - let mut shards = BTreeMap::new(); let shard_index = ShardIndex::new(0); - shards.insert( - shard1.id, - ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), - ); let object_store: Arc = Arc::new(InMemory::new()); let data = Arc::new(IngesterData::new( Arc::clone(&object_store), Arc::clone(&catalog), - shards, + [(shard1.id, shard_index)], Arc::new(Executor::new(1)), BackoffConfig::default(), Arc::clone(&metrics), diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 7e0d7f5a2b..36e00867ee 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -27,7 +27,7 @@ use write_buffer::core::WriteBufferReading; use write_summary::ShardProgress; use crate::{ - data::{shard::ShardData, IngesterData, IngesterQueryResponse}, + data::{IngesterData, IngesterQueryResponse}, lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager}, poison::PoisonCabinet, querier_handler::prepare_data_to_querier, @@ -135,18 +135,10 @@ impl IngestHandlerImpl { skip_to_oldest_available: bool, max_requests: usize, ) -> Result { - // build the initial ingester data state - let mut shards = BTreeMap::new(); - for s in shard_states.values() { - shards.insert( - s.id, - ShardData::new(s.shard_index, s.id, Arc::clone(&metric_registry)), - ); - } let data = Arc::new(IngesterData::new( object_store, catalog, - shards, + shard_states.clone().into_iter().map(|(idx, s)| (s.id, idx)), exec, BackoffConfig::default(), Arc::clone(&metric_registry), diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index f4ad282a46..169c8a3ba0 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -2,7 +2,7 @@ #![allow(missing_docs)] -use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_eq; @@ -25,7 +25,6 @@ use uuid::Uuid; use crate::{ data::{ partition::{PersistingBatch, SnapshotBatch}, - shard::ShardData, IngesterData, }, lifecycle::{LifecycleConfig, LifecycleHandle, LifecycleManager}, @@ -696,16 +695,11 @@ pub async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> Inge // Make data for one shard and two tables let shard_index = ShardIndex::new(1); let shard_id = populate_catalog(&*catalog).await; - let mut shards = BTreeMap::new(); - shards.insert( - shard_id, - ShardData::new(shard_index, shard_id, Arc::clone(&metrics)), - ); let ingester = IngesterData::new( object_store, catalog, - shards, + [(shard_id, shard_index)], exec, backoff::BackoffConfig::default(), metrics, @@ -768,16 +762,10 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa let shard_index = ShardIndex::new(0); let shard_id = populate_catalog(&*catalog).await; - let mut shards = BTreeMap::new(); - shards.insert( - shard_id, - ShardData::new(shard_index, shard_id, Arc::clone(&metrics)), - ); - let ingester = IngesterData::new( object_store, catalog, - shards, + [(shard_id, shard_index)], exec, backoff::BackoffConfig::default(), metrics, diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 2c5bef0504..3348a3a6cf 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -14,9 +14,7 @@ use generated_types::{ }; use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError}; use ingester::{ - data::{ - shard::ShardData, FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister, - }, + data::{FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister}, lifecycle::LifecycleHandle, querier_handler::prepare_data_to_querier, }; @@ -35,7 +33,7 @@ use schema::selection::Selection; use sharder::JumpHash; use std::{ cmp::Ordering, - collections::{BTreeMap, HashMap, HashSet}, + collections::{HashMap, HashSet}, fmt::Display, fmt::Write, sync::Arc, @@ -693,18 +691,10 @@ impl MockIngester { let ns = catalog.create_namespace("test_db").await; let shard = ns.create_shard(1).await; - let shards = BTreeMap::from([( - shard.shard.id, - ShardData::new( - shard.shard.shard_index, - shard.shard.id, - catalog.metric_registry(), - ), - )]); let ingester_data = Arc::new(IngesterData::new( catalog.object_store(), catalog.catalog(), - shards, + [(shard.shard.id, shard.shard.shard_index)], catalog.exec(), BackoffConfig::default(), catalog.metric_registry(),