Merge branch 'main' into dependabot/cargo/ouroboros-0.11.1

pull/24376/head
kodiakhq[bot] 2021-09-27 13:24:10 +00:00 committed by GitHub
commit b9b9362bfa
3 changed files with 108 additions and 78 deletions


@@ -7,6 +7,7 @@ use data_types::{
chunk_metadata::{ChunkAddr, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkSummary},
partition_metadata::{PartitionAddr, PartitionSummary},
};
use hashbrown::HashMap;
use internal_types::schema::Schema;
use observability_deps::tracing::info;
use persistence_windows::{
@@ -14,11 +15,7 @@ use persistence_windows::{
};
use predicate::predicate::Predicate;
use snafu::{OptionExt, Snafu};
use std::{
collections::{btree_map::Entry, BTreeMap},
fmt::Display,
sync::Arc,
};
use std::{collections::BTreeMap, fmt::Display, sync::Arc};
use tracker::RwLock;
#[derive(Debug, Snafu)]
@@ -42,6 +39,76 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Provides ordered iteration of a collection of chunks
#[derive(Debug, Default)]
struct ChunkCollection {
/// The chunks that make up this partition, indexed by order and id.
///
/// This is the order in which chunks should be iterated and locks acquired
chunks: BTreeMap<(ChunkOrder, ChunkId), Arc<RwLock<CatalogChunk>>>,
/// Provides a lookup from `ChunkId` to the corresponding `ChunkOrder`
chunk_orders: HashMap<ChunkId, ChunkOrder>,
}
impl ChunkCollection {
/// Returns an iterator over the chunks in this collection
/// ordered by `ChunkOrder` and then `ChunkId`
fn iter(&self) -> impl Iterator<Item = (ChunkId, ChunkOrder, &Arc<RwLock<CatalogChunk>>)> + '_ {
self.chunks
.iter()
.map(|((order, id), chunk)| (*id, *order, chunk))
}
/// Returns an iterator over the chunks in this collection
/// ordered by `ChunkOrder` and then `ChunkId`
fn values(&self) -> impl Iterator<Item = &Arc<RwLock<CatalogChunk>>> + '_ {
self.chunks.values()
}
/// Gets a chunk by `ChunkId`
fn get(&self, id: ChunkId) -> Option<(&Arc<RwLock<CatalogChunk>>, ChunkOrder)> {
let order = *self.chunk_orders.get(&id)?;
let chunk = self.chunks.get(&(order, id)).unwrap();
Some((chunk, order))
}
/// Inserts a new chunk
///
/// # Panics
///
/// Panics if a chunk already exists with the given id
fn insert(
&mut self,
id: ChunkId,
order: ChunkOrder,
chunk: Arc<RwLock<CatalogChunk>>,
) -> &Arc<RwLock<CatalogChunk>> {
match self.chunk_orders.entry(id) {
hashbrown::hash_map::Entry::Occupied(_) => {
panic!("chunk already found with id: {}", id)
}
hashbrown::hash_map::Entry::Vacant(v) => v.insert(order),
};
match self.chunks.entry((order, id)) {
std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
std::collections::btree_map::Entry::Vacant(v) => v.insert(chunk),
}
}
/// Remove a chunk with the given ID, returning `None` if the chunk doesn't exist
fn remove(&mut self, id: ChunkId) -> Option<Arc<RwLock<CatalogChunk>>> {
let order = self.chunk_orders.remove(&id)?;
Some(self.chunks.remove(&(order, id)).unwrap())
}
/// Returns `true` if the collection contains no chunks
fn is_empty(&self) -> bool {
self.chunk_orders.is_empty()
}
}
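For readers skimming the diff: the new `ChunkCollection` keeps two indexes in sync. The `BTreeMap` is keyed by `(ChunkOrder, ChunkId)`, so iteration (and therefore lock acquisition) always happens in a deterministic order, while the `HashMap<ChunkId, ChunkOrder>` lets a chunk be found by id alone without scanning. The sketch below is a minimal standalone illustration of that pattern; the `Id`, `Order`, and `Chunk` types are stand-ins rather than the IOx types, and it uses `std::collections::HashMap` in place of `hashbrown`.

```rust
use std::collections::{BTreeMap, HashMap};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Id(u32); // stand-in for ChunkId

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Order(u32); // stand-in for ChunkOrder

#[derive(Debug)]
struct Chunk; // stand-in for Arc<RwLock<CatalogChunk>>

#[derive(Debug, Default)]
struct Collection {
    /// Primary index: iterated in (order, id) order, i.e. lock-acquisition order.
    chunks: BTreeMap<(Order, Id), Chunk>,
    /// Secondary index: recover a chunk's order from its id alone.
    orders: HashMap<Id, Order>,
}

impl Collection {
    /// Inserts a chunk, panicking on a duplicate id (mirrors the catalog invariant).
    fn insert(&mut self, id: Id, order: Order, chunk: Chunk) -> &Chunk {
        assert!(self.orders.insert(id, order).is_none(), "duplicate id");
        self.chunks.entry((order, id)).or_insert(chunk)
    }

    /// Looks a chunk up by id via the secondary index.
    fn get(&self, id: Id) -> Option<(&Chunk, Order)> {
        let order = *self.orders.get(&id)?;
        // The primary entry must exist whenever the secondary one does.
        Some((self.chunks.get(&(order, id)).unwrap(), order))
    }

    /// Iterates chunks ordered by (order, id).
    fn iter(&self) -> impl Iterator<Item = (Id, Order, &Chunk)> + '_ {
        self.chunks.iter().map(|((order, id), chunk)| (*id, *order, chunk))
    }
}

fn main() {
    let mut collection = Collection::default();
    collection.insert(Id(2), Order(1), Chunk);
    collection.insert(Id(1), Order(2), Chunk);

    // Yields (Id(2), Order(1)) before (Id(1), Order(2)): order wins over id.
    for (id, order, _chunk) in collection.iter() {
        println!("{id:?} at {order:?}");
    }
    assert!(collection.get(Id(1)).is_some());
}
```

Because the real `insert` checks the `hashbrown` entry for the id first and panics on a duplicate, the matching `BTreeMap` entry is guaranteed vacant, which is why the occupied arm there is `unreachable!()`.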
/// IOx Catalog Partition
///
/// A partition contains multiple Chunks for a given table
@@ -49,10 +116,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub struct Partition {
addr: PartitionAddr,
/// The chunks that make up this partition, indexed by id.
//
// Alongside the chunk we also store its order.
chunks: BTreeMap<ChunkId, (ChunkOrder, Arc<RwLock<CatalogChunk>>)>,
/// The chunks that make up this partition
chunks: ChunkCollection,
/// When this partition was created
created_at: DateTime<Utc>,
@@ -138,7 +203,7 @@ impl Partition {
&mut self,
chunk: mutable_buffer::chunk::MBChunk,
time_of_write: DateTime<Utc>,
) -> Arc<RwLock<CatalogChunk>> {
) -> &Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name().as_ref(), self.table_name());
let chunk_id = self.next_chunk_id();
@@ -154,17 +219,7 @@ impl Partition {
chunk_order,
);
let chunk = Arc::new(self.metrics.new_chunk_lock(chunk));
if self
.chunks
.insert(chunk_id, (chunk_order, Arc::clone(&chunk)))
.is_some()
{
// A fundamental invariant has been violated - abort
panic!("chunk already existed with id {}", chunk_id)
}
chunk
self.chunks.insert(chunk_id, chunk_order, chunk)
}
/// Create a new read buffer chunk.
@@ -178,7 +233,7 @@ impl Partition {
schema: Arc<Schema>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> (ChunkId, Arc<RwLock<CatalogChunk>>) {
) -> (ChunkId, &Arc<RwLock<CatalogChunk>>) {
let chunk_id = self.next_chunk_id();
assert!(
chunk_order < self.next_chunk_order,
@@ -201,14 +256,7 @@ impl Partition {
chunk_order,
)));
if self
.chunks
.insert(chunk_id, (chunk_order, Arc::clone(&chunk)))
.is_some()
{
// A fundamental invariant has been violated - abort
panic!("chunk already existed with id {}", chunk_id)
}
let chunk = self.chunks.insert(chunk_id, chunk_order, chunk);
(chunk_id, chunk)
}
@@ -227,7 +275,7 @@ impl Partition {
time_of_last_write: DateTime<Utc>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> Arc<RwLock<CatalogChunk>> {
) -> &Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name(), self.table_name());
let addr = ChunkAddr::new(&self.addr, chunk_id);
@@ -245,28 +293,23 @@ impl Partition {
)),
);
match self.chunks.entry(chunk_id) {
Entry::Vacant(vacant) => {
// only update internal state when we know that insertion is OK
self.next_chunk_id = self.next_chunk_id.max(chunk_id.next());
self.next_chunk_order = self.next_chunk_order.max(chunk_order.next());
let chunk = self.chunks.insert(chunk_id, chunk_order, chunk);
vacant.insert((chunk_order, Arc::clone(&chunk)));
chunk
}
Entry::Occupied(_) => panic!("chunk with id {} already exists", chunk_id),
}
// only update internal state when we know that insertion is OK
self.next_chunk_id = self.next_chunk_id.max(chunk_id.next());
self.next_chunk_order = self.next_chunk_order.max(chunk_order.next());
chunk
}
/// Drop the specified chunk
pub fn drop_chunk(&mut self, chunk_id: ChunkId) -> Result<Arc<RwLock<CatalogChunk>>> {
match self.chunks.entry(chunk_id) {
Entry::Vacant(_) => Err(Error::ChunkNotFound {
match self.chunks.get(chunk_id) {
None => Err(Error::ChunkNotFound {
chunk: ChunkAddr::new(&self.addr, chunk_id),
}),
Entry::Occupied(occupied) => {
Some((chunk, _)) => {
{
let (_order, chunk) = occupied.get();
let chunk = chunk.read();
if let Some(action) = chunk.lifecycle_action() {
if action.metadata() != &ChunkLifecycleAction::Dropping {
@@ -277,8 +320,7 @@ impl Partition {
}
}
}
let (_order, chunk) = occupied.remove();
Ok(chunk)
Ok(self.chunks.remove(chunk_id).unwrap())
}
}
}
@@ -286,54 +328,41 @@ impl Partition {
/// Drop the specified chunk even if it has an in-progress lifecycle action,
/// returning the dropped chunk
pub fn force_drop_chunk(&mut self, chunk_id: ChunkId) -> Result<Arc<RwLock<CatalogChunk>>> {
self.chunks
.remove(&chunk_id)
.map(|(_order, chunk)| chunk)
.context(ChunkNotFound {
chunk: ChunkAddr::new(&self.addr, chunk_id),
})
self.chunks.remove(chunk_id).context(ChunkNotFound {
chunk: ChunkAddr::new(&self.addr, chunk_id),
})
}
/// Return the first currently open chunk, if any
pub fn open_chunk(&self) -> Option<Arc<RwLock<CatalogChunk>>> {
self.chunks
.values()
.find(|(_order, chunk)| {
.find(|chunk| {
let chunk = chunk.read();
matches!(chunk.stage(), ChunkStage::Open { .. })
})
.cloned()
.map(|(_order, chunk)| chunk)
}
/// Return an immutable chunk reference and its order, looked up by chunk id.
pub fn chunk(&self, chunk_id: ChunkId) -> Option<(&Arc<RwLock<CatalogChunk>>, ChunkOrder)> {
self.chunks
.get(&chunk_id)
.map(|(order, chunk)| (chunk, *order))
self.chunks.get(chunk_id)
}
/// Return chunks in this partition.
///
/// Note that chunks are guaranteed ordered by chunk order and ID.
pub fn chunks(&self) -> Vec<&Arc<RwLock<CatalogChunk>>> {
self.keyed_chunks()
.into_iter()
.map(|(_id, _order, chunk)| chunk)
.collect()
/// Note that chunks are guaranteed ordered by chunk order and then ID.
pub fn chunks(&self) -> impl Iterator<Item = &Arc<RwLock<CatalogChunk>>> {
self.chunks.values()
}
/// Return chunks in this partition with their order and ids.
///
/// Note that chunks are guaranteed ordered by chunk order and ID.
pub fn keyed_chunks(&self) -> Vec<(ChunkId, ChunkOrder, &Arc<RwLock<CatalogChunk>>)> {
let mut chunks: Vec<_> = self
.chunks
.iter()
.map(|(id, (order, chunk))| (*id, *order, chunk))
.collect();
chunks.sort_by_key(|(id, order, _chunk)| (*order, *id));
chunks
pub fn keyed_chunks(
&self,
) -> impl Iterator<Item = (ChunkId, ChunkOrder, &Arc<RwLock<CatalogChunk>>)> + '_ {
self.chunks.iter()
}
/// Return a PartitionSummary for this partition. If the partition
@@ -346,14 +375,14 @@ impl Partition {
self.addr.partition_key.to_string(),
self.chunks
.values()
.map(|(_order, chunk)| chunk.read().table_summary().as_ref().clone()),
.map(|chunk| chunk.read().table_summary().as_ref().clone()),
))
}
}
/// Return chunk summaries for all chunks in this partition
pub fn chunk_summaries(&self) -> impl Iterator<Item = ChunkSummary> + '_ {
self.chunks().into_iter().map(|x| x.read().summary())
self.chunks.values().map(|x| x.read().summary())
}
/// Return reference to partition-specific metrics.

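The signature changes in the file above (returning `&Arc<RwLock<CatalogChunk>>` instead of an owned `Arc`) mean the partition now hands back a borrow into its own `ChunkCollection`. Callers that need an owned handle must `Arc::clone` it explicitly, which is exactly what the two files below do; callers that only read through the reference skip the refcount bump entirely. A hedged sketch of the caller-side pattern, using a stand-in `Partition` type rather than the real catalog API:

```rust
use std::sync::{Arc, RwLock};

// Stand-ins for CatalogChunk and the catalog Partition; not the real IOx types.
struct Chunk(&'static str);

struct Partition {
    chunks: Vec<Arc<RwLock<Chunk>>>,
}

impl Partition {
    // Mirrors the new signature: the Arc is borrowed from the partition,
    // not handed to the caller by value.
    fn create_chunk(&mut self, name: &'static str) -> &Arc<RwLock<Chunk>> {
        self.chunks.push(Arc::new(RwLock::new(Chunk(name))));
        self.chunks.last().unwrap()
    }
}

fn main() {
    let mut partition = Partition { chunks: vec![] };

    // The returned reference borrows `partition`; clone it to get an owned handle.
    let new_chunk = Arc::clone(partition.create_chunk("rub"));

    println!("created {}", new_chunk.read().unwrap().0);
}
```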

@@ -108,21 +108,22 @@ pub(crate) fn compact_chunks(
.expect("chunk has zero rows");
let rb_row_groups = rb_chunk.row_groups();
let (_id, new_chunk) = {
let new_chunk = {
let mut partition = partition.write();
for id in chunk_ids {
partition.force_drop_chunk(id).expect(
"There was a lifecycle action attached to this chunk, who deleted it?!",
);
}
partition.create_rub_chunk(
let (_, chunk) = partition.create_rub_chunk(
rb_chunk,
time_of_first_write,
time_of_last_write,
schema,
delete_predicates,
min_order,
)
);
Arc::clone(chunk)
};
let guard = new_chunk.read();

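One detail worth noting in the `compact_chunks` change above: `create_rub_chunk` is now called through the partition write guard, so the returned `&Arc` borrows from that guard and cannot leave the block; only the `Arc::clone(chunk)` escapes, and the guard is released before `new_chunk.read()` is taken. A minimal sketch of that lifetime constraint, using plain `std::sync::RwLock` in place of the tracker lock types:

```rust
use std::sync::{Arc, RwLock};

fn main() {
    // Stand-in for the locked partition: a list of chunk handles behind a lock.
    let partition: RwLock<Vec<Arc<String>>> = RwLock::new(vec![]);

    let new_chunk = {
        let mut guard = partition.write().unwrap();
        guard.push(Arc::new("new rub chunk".to_string()));
        // `last()` borrows from `guard`, so the reference cannot outlive this
        // block; cloning the Arc produces an owned handle that can.
        Arc::clone(guard.last().unwrap())
    };

    // The write guard has been released; the chunk handle is still usable.
    println!("{new_chunk}");
}
```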

@@ -164,7 +164,7 @@ where
);
let to_persist = LockableCatalogChunk {
db,
chunk: new_chunk,
chunk: Arc::clone(new_chunk),
id: new_chunk_id,
order: min_order,
};