refactor: internalise ShardData init

Move the initialisation of ShardData (an internal ingester data
structure) into the ingester itself.

Previously, callers would construct the per-shard ShardData state themselves
and pass it into the IngesterData constructor. Callers now pass an iterator of
(ShardId, ShardIndex) pairs, and IngesterData builds the ShardData internally.
pull/24376/head
Dom Dwyer 2022-09-27 12:05:02 +02:00
parent 8139d01d08
commit 11be746dc0
4 changed files with 30 additions and 82 deletions

View File

@ -98,14 +98,17 @@ pub struct IngesterData {
impl IngesterData {
/// Create new instance.
pub fn new(
pub fn new<T>(
object_store: Arc<DynObjectStore>,
catalog: Arc<dyn Catalog>,
shards: BTreeMap<ShardId, ShardData>,
shards: T,
exec: Arc<Executor>,
backoff_config: BackoffConfig,
metrics: Arc<metric::Registry>,
) -> Self {
) -> Self
where
T: IntoIterator<Item = (ShardId, ShardIndex)>,
{
let persisted_file_size_bytes = metrics.register_metric_with_options(
"ingester_persisted_file_size_bytes",
"Size of files persisted by the ingester",
@ -121,6 +124,11 @@ impl IngesterData {
},
);
let shards = shards
.into_iter()
.map(|(id, index)| (id, ShardData::new(index, id, Arc::clone(&metrics))))
.collect();
Self {
store: ParquetStorage::new(object_store),
catalog,
@ -645,19 +653,12 @@ mod tests {
.await
.unwrap();
let mut shards = BTreeMap::new();
let shard_index = ShardIndex::new(0);
shards.insert(
shard1.id,
ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
let data = Arc::new(IngesterData::new(
Arc::clone(&object_store),
Arc::clone(&catalog),
shards,
[(shard1.id, shard_index)],
Arc::new(Executor::new(1)),
BackoffConfig::default(),
Arc::clone(&metrics),
@ -732,18 +733,13 @@ mod tests {
.create_or_get(&topic, shard_index)
.await
.unwrap();
let mut shards = BTreeMap::new();
shards.insert(
shard1.id,
ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
let data = Arc::new(IngesterData::new(
Arc::clone(&object_store),
Arc::clone(&catalog),
shards,
[(shard1.id, shard1.shard_index)],
Arc::new(Executor::new(1)),
BackoffConfig::default(),
Arc::clone(&metrics),
@ -837,22 +833,16 @@ mod tests {
.create_or_get(&topic, shard_index)
.await
.unwrap();
let mut shards = BTreeMap::new();
shards.insert(
shard1.id,
ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)),
);
shards.insert(
shard2.id,
ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
let data = Arc::new(IngesterData::new(
Arc::clone(&object_store),
Arc::clone(&catalog),
shards,
[
(shard1.id, shard1.shard_index),
(shard2.id, shard2.shard_index),
],
Arc::new(Executor::new(1)),
BackoffConfig::default(),
Arc::clone(&metrics),
@ -1093,22 +1083,16 @@ mod tests {
.create_or_get(&topic, shard_index)
.await
.unwrap();
let mut shards = BTreeMap::new();
shards.insert(
shard1.id,
ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)),
);
shards.insert(
shard2.id,
ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
let data = Arc::new(IngesterData::new(
Arc::clone(&object_store),
Arc::clone(&catalog),
shards,
[
(shard1.id, shard1.shard_index),
(shard2.id, shard2.shard_index),
],
Arc::new(Executor::new(1)),
BackoffConfig::default(),
Arc::clone(&metrics),
@ -1392,20 +1376,14 @@ mod tests {
.create_or_get(&topic, shard_index)
.await
.unwrap();
let mut shards = BTreeMap::new();
let shard_index = ShardIndex::new(0);
shards.insert(
shard1.id,
ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)),
);
let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
let data = Arc::new(IngesterData::new(
Arc::clone(&object_store),
Arc::clone(&catalog),
shards,
[(shard1.id, shard_index)],
Arc::new(Executor::new(1)),
BackoffConfig::default(),
Arc::clone(&metrics),

View File

@ -27,7 +27,7 @@ use write_buffer::core::WriteBufferReading;
use write_summary::ShardProgress;
use crate::{
data::{shard::ShardData, IngesterData, IngesterQueryResponse},
data::{IngesterData, IngesterQueryResponse},
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
poison::PoisonCabinet,
querier_handler::prepare_data_to_querier,
@ -135,18 +135,10 @@ impl IngestHandlerImpl {
skip_to_oldest_available: bool,
max_requests: usize,
) -> Result<Self> {
// build the initial ingester data state
let mut shards = BTreeMap::new();
for s in shard_states.values() {
shards.insert(
s.id,
ShardData::new(s.shard_index, s.id, Arc::clone(&metric_registry)),
);
}
let data = Arc::new(IngesterData::new(
object_store,
catalog,
shards,
shard_states.clone().into_iter().map(|(idx, s)| (s.id, idx)),
exec,
BackoffConfig::default(),
Arc::clone(&metric_registry),

View File

@ -2,7 +2,7 @@
#![allow(missing_docs)]
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
@ -25,7 +25,6 @@ use uuid::Uuid;
use crate::{
data::{
partition::{PersistingBatch, SnapshotBatch},
shard::ShardData,
IngesterData,
},
lifecycle::{LifecycleConfig, LifecycleHandle, LifecycleManager},
@ -696,16 +695,11 @@ pub async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> Inge
// Make data for one shard and two tables
let shard_index = ShardIndex::new(1);
let shard_id = populate_catalog(&*catalog).await;
let mut shards = BTreeMap::new();
shards.insert(
shard_id,
ShardData::new(shard_index, shard_id, Arc::clone(&metrics)),
);
let ingester = IngesterData::new(
object_store,
catalog,
shards,
[(shard_id, shard_index)],
exec,
backoff::BackoffConfig::default(),
metrics,
@ -768,16 +762,10 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa
let shard_index = ShardIndex::new(0);
let shard_id = populate_catalog(&*catalog).await;
let mut shards = BTreeMap::new();
shards.insert(
shard_id,
ShardData::new(shard_index, shard_id, Arc::clone(&metrics)),
);
let ingester = IngesterData::new(
object_store,
catalog,
shards,
[(shard_id, shard_index)],
exec,
backoff::BackoffConfig::default(),
metrics,

View File

@ -14,9 +14,7 @@ use generated_types::{
};
use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError};
use ingester::{
data::{
shard::ShardData, FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister,
},
data::{FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister},
lifecycle::LifecycleHandle,
querier_handler::prepare_data_to_querier,
};
@ -35,7 +33,7 @@ use schema::selection::Selection;
use sharder::JumpHash;
use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap, HashSet},
collections::{HashMap, HashSet},
fmt::Display,
fmt::Write,
sync::Arc,
@ -693,18 +691,10 @@ impl MockIngester {
let ns = catalog.create_namespace("test_db").await;
let shard = ns.create_shard(1).await;
let shards = BTreeMap::from([(
shard.shard.id,
ShardData::new(
shard.shard.shard_index,
shard.shard.id,
catalog.metric_registry(),
),
)]);
let ingester_data = Arc::new(IngesterData::new(
catalog.object_store(),
catalog.catalog(),
shards,
[(shard.shard.id, shard.shard.shard_index)],
catalog.exec(),
BackoffConfig::default(),
catalog.metric_registry(),