From 1534ae9edfc44f569ef9840fd380e8301be6609c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 27 Sep 2021 12:59:27 +0100 Subject: [PATCH] refactor: store chunks in iteration order (#2634) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- server/src/db/catalog/partition.rs | 177 +++++++++++++++++------------ server/src/db/lifecycle/compact.rs | 7 +- server/src/db/lifecycle/persist.rs | 2 +- 3 files changed, 108 insertions(+), 78 deletions(-) diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index 3480cfbf71..a8bdbab4c8 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -7,6 +7,7 @@ use data_types::{ chunk_metadata::{ChunkAddr, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkSummary}, partition_metadata::{PartitionAddr, PartitionSummary}, }; +use hashbrown::HashMap; use internal_types::schema::Schema; use observability_deps::tracing::info; use persistence_windows::{ @@ -14,11 +15,7 @@ use persistence_windows::{ }; use predicate::predicate::Predicate; use snafu::{OptionExt, Snafu}; -use std::{ - collections::{btree_map::Entry, BTreeMap}, - fmt::Display, - sync::Arc, -}; +use std::{collections::BTreeMap, fmt::Display, sync::Arc}; use tracker::RwLock; #[derive(Debug, Snafu)] @@ -42,6 +39,76 @@ pub enum Error { pub type Result = std::result::Result; +/// Provides ordered iteration of a collection of chunks +#[derive(Debug, Default)] +struct ChunkCollection { + /// The chunks that make up this partition, indexed by order and id. + /// + /// This is the order that chunks should be iterated and locks acquired + chunks: BTreeMap<(ChunkOrder, ChunkId), Arc>>, + + /// Provides a lookup from `ChunkId` to the corresponding `ChunkOrder` + chunk_orders: HashMap, +} + +impl ChunkCollection { + /// Returns an iterator over the chunks in this collection + /// ordered by `ChunkOrder` and then `ChunkId` + fn iter(&self) -> impl Iterator>)> + '_ { + self.chunks + .iter() + .map(|((order, id), chunk)| (*id, *order, chunk)) + } + + /// Returns an iterator over the chunks in this collection + /// ordered by `ChunkOrder` and then `ChunkId` + fn values(&self) -> impl Iterator>> + '_ { + self.chunks.values() + } + + /// Gets a chunk by `ChunkId` + fn get(&self, id: ChunkId) -> Option<(&Arc>, ChunkOrder)> { + let order = *self.chunk_orders.get(&id)?; + let chunk = self.chunks.get(&(order, id)).unwrap(); + Some((chunk, order)) + } + + /// Inserts a new chunk + /// + /// # Panics + /// + /// Panics if a chunk already exists with the given id + fn insert( + &mut self, + id: ChunkId, + order: ChunkOrder, + chunk: Arc>, + ) -> &Arc> { + match self.chunk_orders.entry(id) { + hashbrown::hash_map::Entry::Occupied(_) => { + panic!("chunk already found with id: {}", id) + } + hashbrown::hash_map::Entry::Vacant(v) => v.insert(order), + }; + + match self.chunks.entry((order, id)) { + std::collections::btree_map::Entry::Occupied(_) => unreachable!(), + std::collections::btree_map::Entry::Vacant(v) => v.insert(chunk), + } + } + + /// Remove a chunk with the given ID, returns None if the chunk doesn't exist + fn remove(&mut self, id: ChunkId) -> Option>> { + let order = self.chunk_orders.remove(&id)?; + Some(self.chunks.remove(&(order, id)).unwrap()) + } + + /// Returns `true` if the collection contains no chunks + fn is_empty(&self) -> bool { + self.chunk_orders.is_empty() + } +} + /// IOx Catalog Partition /// /// A partition contains multiple Chunks for a given table @@ -49,10 +116,8 @@ pub type Result = std::result::Result; pub struct Partition { addr: PartitionAddr, - /// The chunks that make up this partition, indexed by id. - // - // Alongside the chunk we also store its order. - chunks: BTreeMap>)>, + /// The chunks that make up this partition + chunks: ChunkCollection, /// When this partition was created created_at: DateTime, @@ -138,7 +203,7 @@ impl Partition { &mut self, chunk: mutable_buffer::chunk::MBChunk, time_of_write: DateTime, - ) -> Arc> { + ) -> &Arc> { assert_eq!(chunk.table_name().as_ref(), self.table_name()); let chunk_id = self.next_chunk_id(); @@ -154,17 +219,7 @@ impl Partition { chunk_order, ); let chunk = Arc::new(self.metrics.new_chunk_lock(chunk)); - - if self - .chunks - .insert(chunk_id, (chunk_order, Arc::clone(&chunk))) - .is_some() - { - // A fundamental invariant has been violated - abort - panic!("chunk already existed with id {}", chunk_id) - } - - chunk + self.chunks.insert(chunk_id, chunk_order, chunk) } /// Create a new read buffer chunk. @@ -178,7 +233,7 @@ impl Partition { schema: Arc, delete_predicates: Vec>, chunk_order: ChunkOrder, - ) -> (ChunkId, Arc>) { + ) -> (ChunkId, &Arc>) { let chunk_id = self.next_chunk_id(); assert!( chunk_order < self.next_chunk_order, @@ -201,14 +256,7 @@ impl Partition { chunk_order, ))); - if self - .chunks - .insert(chunk_id, (chunk_order, Arc::clone(&chunk))) - .is_some() - { - // A fundamental invariant has been violated - abort - panic!("chunk already existed with id {}", chunk_id) - } + let chunk = self.chunks.insert(chunk_id, chunk_order, chunk); (chunk_id, chunk) } @@ -227,7 +275,7 @@ impl Partition { time_of_last_write: DateTime, delete_predicates: Vec>, chunk_order: ChunkOrder, - ) -> Arc> { + ) -> &Arc> { assert_eq!(chunk.table_name(), self.table_name()); let addr = ChunkAddr::new(&self.addr, chunk_id); @@ -245,28 +293,23 @@ impl Partition { )), ); - match self.chunks.entry(chunk_id) { - Entry::Vacant(vacant) => { - // only update internal state when we know that insertion is OK - self.next_chunk_id = self.next_chunk_id.max(chunk_id.next()); - self.next_chunk_order = self.next_chunk_order.max(chunk_order.next()); + let chunk = self.chunks.insert(chunk_id, chunk_order, chunk); - vacant.insert((chunk_order, Arc::clone(&chunk))); - chunk - } - Entry::Occupied(_) => panic!("chunk with id {} already exists", chunk_id), - } + // only update internal state when we know that insertion is OK + self.next_chunk_id = self.next_chunk_id.max(chunk_id.next()); + self.next_chunk_order = self.next_chunk_order.max(chunk_order.next()); + + chunk } /// Drop the specified chunk pub fn drop_chunk(&mut self, chunk_id: ChunkId) -> Result>> { - match self.chunks.entry(chunk_id) { - Entry::Vacant(_) => Err(Error::ChunkNotFound { + match self.chunks.get(chunk_id) { + None => Err(Error::ChunkNotFound { chunk: ChunkAddr::new(&self.addr, chunk_id), }), - Entry::Occupied(occupied) => { + Some((chunk, _)) => { { - let (_order, chunk) = occupied.get(); let chunk = chunk.read(); if let Some(action) = chunk.lifecycle_action() { if action.metadata() != &ChunkLifecycleAction::Dropping { @@ -277,8 +320,7 @@ impl Partition { } } } - let (_order, chunk) = occupied.remove(); - Ok(chunk) + Ok(self.chunks.remove(chunk_id).unwrap()) } } } @@ -286,54 +328,41 @@ impl Partition { /// Drop the specified chunk even if it has an in-progress lifecycle action /// returning the dropped chunk pub fn force_drop_chunk(&mut self, chunk_id: ChunkId) -> Result>> { - self.chunks - .remove(&chunk_id) - .map(|(_order, chunk)| chunk) - .context(ChunkNotFound { - chunk: ChunkAddr::new(&self.addr, chunk_id), - }) + self.chunks.remove(chunk_id).context(ChunkNotFound { + chunk: ChunkAddr::new(&self.addr, chunk_id), + }) } /// Return the first currently open chunk, if any pub fn open_chunk(&self) -> Option>> { self.chunks .values() - .find(|(_order, chunk)| { + .find(|chunk| { let chunk = chunk.read(); matches!(chunk.stage(), ChunkStage::Open { .. }) }) .cloned() - .map(|(_order, chunk)| chunk) } /// Return an immutable chunk and its order reference by chunk id. pub fn chunk(&self, chunk_id: ChunkId) -> Option<(&Arc>, ChunkOrder)> { - self.chunks - .get(&chunk_id) - .map(|(order, chunk)| (chunk, *order)) + self.chunks.get(chunk_id) } /// Return chunks in this partition. /// - /// Note that chunks are guaranteed ordered by chunk order and ID. - pub fn chunks(&self) -> Vec<&Arc>> { - self.keyed_chunks() - .into_iter() - .map(|(_id, _order, chunk)| chunk) - .collect() + /// Note that chunks are guaranteed ordered by chunk order and then ID. + pub fn chunks(&self) -> impl Iterator>> { + self.chunks.values() } /// Return chunks in this partition with their order and ids. /// /// Note that chunks are guaranteed ordered by chunk order and ID. - pub fn keyed_chunks(&self) -> Vec<(ChunkId, ChunkOrder, &Arc>)> { - let mut chunks: Vec<_> = self - .chunks - .iter() - .map(|(id, (order, chunk))| (*id, *order, chunk)) - .collect(); - chunks.sort_by_key(|(id, order, _chunk)| (*order, *id)); - chunks + pub fn keyed_chunks( + &self, + ) -> impl Iterator>)> + '_ { + self.chunks.iter() } /// Return a PartitionSummary for this partition. If the partition @@ -346,14 +375,14 @@ impl Partition { self.addr.partition_key.to_string(), self.chunks .values() - .map(|(_order, chunk)| chunk.read().table_summary().as_ref().clone()), + .map(|chunk| chunk.read().table_summary().as_ref().clone()), )) } } /// Return chunk summaries for all chunks in this partition pub fn chunk_summaries(&self) -> impl Iterator + '_ { - self.chunks().into_iter().map(|x| x.read().summary()) + self.chunks.values().map(|x| x.read().summary()) } /// Return reference to partition-specific metrics. diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index f0f8e12a67..a5bb5a21c0 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -108,21 +108,22 @@ pub(crate) fn compact_chunks( .expect("chunk has zero rows"); let rb_row_groups = rb_chunk.row_groups(); - let (_id, new_chunk) = { + let new_chunk = { let mut partition = partition.write(); for id in chunk_ids { partition.force_drop_chunk(id).expect( "There was a lifecycle action attached to this chunk, who deleted it?!", ); } - partition.create_rub_chunk( + let (_, chunk) = partition.create_rub_chunk( rb_chunk, time_of_first_write, time_of_last_write, schema, delete_predicates, min_order, - ) + ); + Arc::clone(chunk) }; let guard = new_chunk.read(); diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index 0530c4bed6..b9d6ef1815 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -164,7 +164,7 @@ where ); let to_persist = LockableCatalogChunk { db, - chunk: new_chunk, + chunk: Arc::clone(new_chunk), id: new_chunk_id, order: min_order, };