refactor: introduce PartitionAddr (#2010)

pull/24376/head
Raphael Taylor-Davies 2021-07-15 11:01:33 +01:00 committed by GitHub
parent 6182c0974b
commit 3e0d1eb560
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 57 deletions

View File

@ -1,6 +1,7 @@
//! Module contains a representation of chunk metadata
use std::sync::Arc;
use crate::partition_metadata::PartitionAddr;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
@ -20,6 +21,17 @@ pub struct ChunkAddr {
pub chunk_id: u32,
}
impl ChunkAddr {
pub fn new(partition: &PartitionAddr, chunk_id: u32) -> Self {
Self {
db_name: Arc::clone(&partition.db_name),
table_name: Arc::clone(&partition.table_name),
partition_key: Arc::clone(&partition.partition_key),
chunk_id,
}
}
}
impl std::fmt::Display for ChunkAddr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(

View File

@ -8,8 +8,32 @@ use std::{
iter::FromIterator,
mem,
num::NonZeroU64,
sync::Arc,
};
/// Address of the chunk within the catalog
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartitionAddr {
/// Database name
pub db_name: Arc<str>,
/// What table does the chunk belong to?
pub table_name: Arc<str>,
/// What partition does the chunk belong to?
pub partition_key: Arc<str>,
}
impl std::fmt::Display for PartitionAddr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Partition('{}':'{}':'{}')",
self.db_name, self.table_name, self.partition_key
)
}
}
/// Describes the aggregated (across all chunks) summary
/// statistics for each column in a partition
#[derive(Debug, Deserialize, Serialize, PartialEq)]

View File

@ -5,7 +5,7 @@ use crate::db::catalog::metrics::PartitionMetrics;
use chrono::{DateTime, Utc};
use data_types::{
chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkSummary},
partition_metadata::PartitionSummary,
partition_metadata::{PartitionAddr, PartitionSummary},
};
use internal_types::schema::Schema;
use observability_deps::tracing::info;
@ -41,14 +41,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A partition contains multiple Chunks for a given table
#[derive(Debug)]
pub struct Partition {
/// Database name
db_name: Arc<str>,
/// The partition key
partition_key: Arc<str>,
/// The table name
table_name: Arc<str>,
addr: PartitionAddr,
/// The chunks that make up this partition, indexed by id
chunks: BTreeMap<u32, Arc<RwLock<CatalogChunk>>>,
@ -75,17 +68,10 @@ impl Partition {
///
/// This function is not pub because `Partition`s should be created using the interfaces on
/// [`Catalog`](crate::db::catalog::Catalog) and not instantiated directly.
pub(super) fn new(
db_name: Arc<str>,
partition_key: Arc<str>,
table_name: Arc<str>,
metrics: PartitionMetrics,
) -> Self {
pub(super) fn new(addr: PartitionAddr, metrics: PartitionMetrics) -> Self {
let now = Utc::now();
Self {
db_name,
partition_key,
table_name,
addr,
chunks: Default::default(),
created_at: now,
last_write_at: now,
@ -95,19 +81,24 @@ impl Partition {
}
}
/// Return the address of this Partition
pub fn addr(&self) -> &PartitionAddr {
&self.addr
}
/// Return the db name of this Partition
pub fn db_name(&self) -> &str {
&self.db_name
&self.addr.db_name
}
/// Return the partition_key of this Partition
pub fn key(&self) -> &str {
&self.partition_key
&self.addr.partition_key
}
/// Return the table name of this partition
pub fn table_name(&self) -> &str {
&self.table_name
&self.addr.table_name
}
/// Update the last write time to now
@ -135,18 +126,13 @@ impl Partition {
&mut self,
chunk: mutable_buffer::chunk::MBChunk,
) -> Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name().as_ref(), self.table_name.as_ref());
assert_eq!(chunk.table_name().as_ref(), self.table_name());
let chunk_id = self.next_chunk_id;
assert_ne!(self.next_chunk_id, u32::MAX, "Chunk ID Overflow");
self.next_chunk_id += 1;
let addr = ChunkAddr {
db_name: Arc::clone(&self.db_name),
table_name: Arc::clone(&self.table_name),
partition_key: Arc::clone(&self.partition_key),
chunk_id,
};
let addr = ChunkAddr::new(&self.addr, chunk_id);
let chunk = CatalogChunk::new_open(addr, chunk, self.metrics.new_chunk_metrics());
let chunk = Arc::new(self.metrics.new_chunk_lock(chunk));
@ -169,12 +155,7 @@ impl Partition {
assert_ne!(self.next_chunk_id, u32::MAX, "Chunk ID Overflow");
self.next_chunk_id += 1;
let addr = ChunkAddr {
db_name: Arc::clone(&self.db_name),
table_name: Arc::clone(&self.table_name),
partition_key: Arc::clone(&self.partition_key),
chunk_id,
};
let addr = ChunkAddr::new(&self.addr, chunk_id);
info!(%addr, row_count=chunk.rows(), "inserting RUB chunk to catalog");
let chunk = Arc::new(self.metrics.new_chunk_lock(CatalogChunk::new_rub_chunk(
@ -201,14 +182,9 @@ impl Partition {
chunk_id: u32,
chunk: Arc<parquet_file::chunk::ParquetChunk>,
) -> Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name(), self.table_name.as_ref());
assert_eq!(chunk.table_name(), self.table_name());
let addr = ChunkAddr {
db_name: Arc::clone(&self.db_name),
table_name: Arc::clone(&self.table_name),
partition_key: Arc::clone(&self.partition_key),
chunk_id,
};
let addr = ChunkAddr::new(&self.addr, chunk_id);
let chunk = Arc::new(
self.metrics
@ -230,12 +206,7 @@ impl Partition {
pub fn drop_chunk(&mut self, chunk_id: u32) -> Result<Arc<RwLock<CatalogChunk>>> {
match self.chunks.entry(chunk_id) {
Entry::Vacant(_) => Err(Error::ChunkNotFound {
chunk: ChunkAddr {
db_name: Arc::clone(&self.db_name),
table_name: Arc::clone(&self.table_name),
partition_key: Arc::clone(&self.partition_key),
chunk_id,
},
chunk: ChunkAddr::new(&self.addr, chunk_id),
}),
Entry::Occupied(occupied) => {
{
@ -286,7 +257,7 @@ impl Partition {
/// Return a PartitionSummary for this partition
pub fn summary(&self) -> PartitionSummary {
PartitionSummary::from_table_summaries(
self.partition_key.to_string(),
self.addr.partition_key.to_string(),
self.chunks
.values()
.map(|x| x.read().table_summary().as_ref().clone()),
@ -317,10 +288,6 @@ impl Partition {
impl Display for Partition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Partition {}:{}:{}",
self.db_name, self.table_name, self.partition_key
)
self.addr.fmt(f)
}
}

View File

@ -1,6 +1,6 @@
use super::partition::Partition;
use crate::db::catalog::metrics::TableMetrics;
use data_types::partition_metadata::PartitionSummary;
use data_types::partition_metadata::{PartitionAddr, PartitionSummary};
use hashbrown::HashMap;
use internal_types::schema::{
builder::SchemaBuilder,
@ -78,9 +78,11 @@ impl Table {
let partition_key = Arc::from(partition_key.as_ref());
let partition_metrics = metrics.new_partition_metrics();
let partition = Partition::new(
Arc::clone(&db_name),
Arc::clone(&partition_key),
Arc::clone(&table_name),
PartitionAddr {
db_name: Arc::clone(db_name),
table_name: Arc::clone(table_name),
partition_key: Arc::clone(&partition_key),
},
partition_metrics,
);
let partition = Arc::new(metrics.new_partition_lock(partition));