feat: support concurrent access to Chunk

pull/24376/head
Edd Robinson 2021-01-28 22:38:27 +00:00
parent 30b90943bc
commit bc08c6b404
3 changed files with 223 additions and 111 deletions

View File

@ -1,4 +1,7 @@
use std::collections::{btree_map::Entry, BTreeMap, BTreeSet};
use std::{
collections::{btree_map::Entry, BTreeMap, BTreeSet},
sync::RwLock,
};
use data_types::selection::Selection;
@ -11,29 +14,66 @@ use crate::Error;
type TableName = String;
// Tie data and meta-data together so that they can be wrapped in an `RwLock`.
struct TableData {
size: u64, // Size in bytes of all row-group data across all tables.
rows: u64, // Total number of rows across all tables.
// Total number of row groups across all tables in the chunk.
row_groups: usize,
// The set of tables within this chunk. Each table is identified by a
// measurement name. Whilst tables mostly contain immutable row-group data,
// they need to be mutable because they can have immutable data added or
// removed, causing changes to their meta-data.
data: BTreeMap<TableName, RwLock<Table>>,
}
/// A `Chunk` comprises a collection of `Tables` where every table must have a
/// unique identifier (name).
pub struct Chunk {
// The unique identifier for this chunk.
id: u32,
// Metadata about the tables within this chunk.
meta: MetaData,
// The set of tables within this chunk. Each table is identified by a
// measurement name.
tables: BTreeMap<TableName, Table>,
// A chunk's data is held in a collection of mutable tables and
// mutable meta data (`TableData`).
//
// Concurrent access to the `TableData` is managed via an `RwLock`, which is
// taken in the following circumstances:
//
// * A lock is needed when updating a table with a new row group. It is held as long as it
// takes to update the table and update the chunk's meta-data. This is not long.
//
// * A lock is needed when removing an entire table. It is held as long as it takes to
// remove the table from the `TableData`'s map, and re-construct new meta-data. This is
// not long.
//
// * A read lock is needed for all read operations over chunk data (tables). However, the
// read lock is only taken for as long as it takes to determine which table data is needed
// to perform the read, shallow-clone that data (via Rcs), and construct an iterator for
// executing that operation. Once the iterator is returned to the caller, the lock is
// freed. Therefore, read execution against the chunk is mostly lock-free.
//
// TODO(edd): `table_names` is currently one exception to execution that is mostly
// lock-free. At the moment the read-lock is held for the duration of the
// call. Whilst this execution will probably be in the order of micro-seconds
// I plan to improve this situation in due course.
chunk_data: RwLock<TableData>,
}
impl Chunk {
pub fn new(id: u32, table: Table) -> Self {
let mut p = Self {
Self {
id,
meta: MetaData::new(&table),
tables: BTreeMap::new(),
};
p.tables.insert(table.name().to_owned(), table);
p
chunk_data: RwLock::new(TableData {
size: table.size(),
rows: table.rows(),
row_groups: table.row_groups(),
data: vec![(table.name().to_owned(), RwLock::new(table))]
.into_iter()
.collect(),
}),
}
}
/// The chunk's ID.
@ -43,77 +83,104 @@ impl Chunk {
/// The total size in bytes of all row groups in all tables in this chunk.
pub fn size(&self) -> u64 {
self.meta.size
self.chunk_data.read().unwrap().size
}
/// The total number of rows in all row groups in all tables in this chunk.
pub fn rows(&self) -> u64 {
self.meta.rows
self.chunk_data.read().unwrap().rows
}
/// The total number of row groups in all tables in this chunk.
pub fn row_groups(&self) -> usize {
self.meta.row_groups
self.chunk_data.read().unwrap().row_groups
}
/// The total number of tables in this chunk.
pub fn tables(&self) -> usize {
self.tables.len()
self.chunk_data.read().unwrap().data.len()
}
/// Returns true if the chunk contains data for this table.
pub fn has_table(&self, table_name: &str) -> bool {
self.tables.contains_key(table_name)
self.chunk_data
.read()
.unwrap()
.data
.contains_key(table_name)
}
/// Returns true if there are no tables under this chunk.
pub fn is_empty(&self) -> bool {
self.tables() == 0
self.chunk_data.read().unwrap().data.len() == 0
}
/// Add a row_group to a table in the chunk, updating all Chunk meta data.
///
/// This operation locks the chunk for the duration of the call.
pub fn upsert_table(&mut self, table_name: impl Into<String>, row_group: RowGroup) {
// update meta data
self.meta.update(&row_group);
let table_name = table_name.into();
let mut chunk_data = self.chunk_data.write().unwrap();
match self.tables.entry(table_name.clone()) {
Entry::Occupied(mut e) => {
let table = e.get_mut();
table.add_row_group(row_group);
// update the meta-data for this chunk with contents of row group.
chunk_data.size += row_group.size();
chunk_data.rows += row_group.rows() as u64;
chunk_data.row_groups += 1;
match chunk_data.data.entry(table_name.clone()) {
Entry::Occupied(table_entry) => {
let table = table_entry.get();
// Lock the table: even though we have locked the chunk, there
// may, by design, still be in-flight queries against the table.
table.write().unwrap().add_row_group(row_group);
}
Entry::Vacant(e) => {
e.insert(Table::new(table_name, row_group));
Entry::Vacant(table_entry) => {
// add a new table to this chunk.
table_entry.insert(RwLock::new(Table::new(table_name, row_group)));
}
};
}
/// Removes the table specified by `name` along with all of its contained
/// data. Data may not be freed until all concurrent read operations against
/// the specified table have finished.
///
/// Dropping a table that does not exist is effectively a no-op.
pub fn drop_table(&mut self, name: &str) {
    let mut chunk_data = self.chunk_data.write().unwrap();

    // Remove table and update chunk meta-data if table exists.
    if let Some(table) = chunk_data.data.remove(name) {
        // Acquire the table's read lock once and reuse the guard for all
        // three statistics, rather than locking/unlocking per call.
        let table = table.read().unwrap();
        chunk_data.size -= table.size();
        chunk_data.rows -= table.rows();
        chunk_data.row_groups -= table.row_groups();
    }
}
/// Returns an iterator of lazily executed `read_filter` operations on the
/// provided table for the specified column selections.
///
/// Results may be filtered by conjunctive predicates.
///
/// `None` indicates that the table was not found on the chunk, whilst a
/// `ReadFilterResults` value that immediately yields `None` indicates that
/// there were no matching results.
///
/// TODO(edd): Alternatively we could assert the caller must have done
/// appropriate pruning and that the table should always exist, meaning we
/// can blow up here and not need to return an option.
/// Results may be filtered by conjunctive predicates. Returns an error if
/// the specified table does not exist.
pub fn read_filter(
&self,
table_name: &str,
predicate: &Predicate,
select_columns: &Selection<'_>,
) -> Result<table::ReadFilterResults, Error> {
// Lookup table by name and dispatch execution.
match self.tables.get(table_name) {
Some(table) => Ok(table.read_filter(select_columns, predicate)),
None => crate::TableNotFound {
table_name: table_name.to_owned(),
// Get reference to table from chunk if it exists.
let tables = self.chunk_data.read().unwrap();
let table = match tables.data.get(table_name) {
Some(table) => table.read().unwrap(),
None => {
return crate::TableNotFound {
table_name: table_name.to_owned(),
}
.fail()
}
.fail(),
}
};
Ok(table.read_filter(select_columns, predicate))
}
/// Returns an iterable collection of data in group columns and aggregate
@ -132,9 +199,17 @@ impl Chunk {
aggregates: &[(ColumnName<'_>, AggregateType)],
) -> Option<table::ReadAggregateResults> {
// Lookup table by name and dispatch execution.
self.tables
self.chunk_data
.read()
.unwrap()
.data
.get(table_name)
.map(|table| table.read_aggregate(predicate, group_columns, aggregates))
.map(|table| {
table
.read()
.unwrap()
.read_aggregate(predicate, group_columns, aggregates)
})
}
//
@ -150,25 +225,36 @@ impl Chunk {
pub fn table_names(
&self,
predicate: &Predicate,
skip_table_names: &BTreeSet<&String>,
) -> BTreeSet<&String> {
skip_table_names: &BTreeSet<String>,
) -> BTreeSet<String> {
if predicate.is_empty() {
return self
.tables
.chunk_data
.read()
.unwrap()
.data
.keys()
.filter(|&name| !skip_table_names.contains(name))
.cloned()
.collect::<BTreeSet<_>>();
}
self.tables
// TODO(edd): potential contention. The read lock is held on the chunk
// for the duration of determining whether each of its tables satisfies
// the predicate. This may be expensive in pathological cases; it can be
// improved by releasing the lock before doing the execution.
self.chunk_data
.read()
.unwrap()
.data
.iter()
.filter_map(|(name, table)| {
if skip_table_names.contains(name) {
return None;
}
match table.satisfies_predicate(predicate) {
true => Some(name),
match table.read().unwrap().satisfies_predicate(predicate) {
true => Some(name.to_owned()),
false => None,
}
})
@ -212,54 +298,6 @@ impl Chunk {
}
}
// `Chunk` metadata that is used to track statistics about the chunk and
// whether it could contain data necessary to execute a query.
struct MetaData {
size: u64, // size in bytes of the chunk
rows: u64, // Total number of rows across all tables
row_groups: usize, // Total number of row groups across all tables in the chunk.
// The total time range of *all* data (across all tables) within this
// chunk.
//
// This would only be None if the chunk contained only tables that had
// no time-stamp column or the values were all NULL.
time_range: Option<(i64, i64)>,
}
impl MetaData {
pub fn new(table: &Table) -> Self {
Self {
size: table.size(),
rows: table.rows(),
time_range: table.time_range(),
row_groups: 1,
}
}
/// Updates the meta data associated with the `Chunk` based on the provided
/// row_group
pub fn update(&mut self, table_data: &RowGroup) {
self.size += table_data.size();
self.rows += table_data.rows() as u64;
self.row_groups += 1;
let (them_min, them_max) = table_data.time_range();
self.time_range = Some(match self.time_range {
Some((this_min, this_max)) => (them_min.min(this_min), them_max.max(this_max)),
None => (them_min, them_max),
})
}
// invalidate should be called when a table is removed. All meta data must
// be determined by asking each table in the chunk for its meta data.
pub fn invalidate(&mut self) {
// Update size, rows, time_range by linearly scanning all tables.
todo!()
}
}
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
@ -268,18 +306,73 @@ mod test {
use crate::row_group::{ColumnType, RowGroup};
use crate::{column::Column, BinaryExpr};
// Verifies that the chunk's rolled-up meta-data (rows, row groups, table
// count) stays consistent as row groups are upserted and tables dropped.
#[test]
fn add_remove_tables() {
// Create a Chunk from a Table.
let columns = vec![(
"time".to_owned(),
ColumnType::create_time(&[1_i64, 2, 3, 4, 5, 6][..]),
)]
.into_iter()
.collect::<BTreeMap<_, _>>();
let rg = RowGroup::new(6, columns);
let table = Table::new("table_1", rg);
let mut chunk = Chunk::new(22, table);
assert_eq!(chunk.rows(), 6);
assert_eq!(chunk.row_groups(), 1);
assert_eq!(chunk.tables(), 1);
// Add a row group to the same table in the Chunk.
let columns = vec![("time", ColumnType::create_time(&[-2_i64, 2, 8][..]))]
.into_iter()
.map(|(k, v)| (k.to_owned(), v))
.collect::<BTreeMap<_, _>>();
let rg = RowGroup::new(3, columns);
chunk.upsert_table("table_1", rg);
assert_eq!(chunk.rows(), 9);
assert_eq!(chunk.row_groups(), 2);
assert_eq!(chunk.tables(), 1);
// Add a row group to another table in the Chunk.
let columns = vec![("time", ColumnType::create_time(&[-3_i64, 2][..]))]
.into_iter()
.map(|(k, v)| (k.to_owned(), v))
.collect::<BTreeMap<_, _>>();
let rg = RowGroup::new(2, columns);
chunk.upsert_table("table_2", rg);
assert_eq!(chunk.rows(), 11);
assert_eq!(chunk.row_groups(), 3);
assert_eq!(chunk.tables(), 2);
// Drop table_1; only table_2's two rows / one row group remain.
chunk.drop_table("table_1");
assert_eq!(chunk.rows(), 2);
assert_eq!(chunk.row_groups(), 1);
assert_eq!(chunk.tables(), 1);
// Drop table_2 - the chunk is now empty.
chunk.drop_table("table_2");
assert_eq!(chunk.rows(), 0);
assert_eq!(chunk.row_groups(), 0);
assert_eq!(chunk.tables(), 0);
// Drop table_2 again - dropping a missing table is a no-op.
chunk.drop_table("table_2");
assert_eq!(chunk.rows(), 0);
assert_eq!(chunk.row_groups(), 0);
assert_eq!(chunk.tables(), 0);
}
#[test]
fn table_names() {
let columns = vec![
(
"time",
ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..])),
),
("time", ColumnType::create_time(&[1_i64, 2, 3, 4, 5, 6][..])),
(
"region",
ColumnType::Tag(Column::from(
&["west", "west", "east", "west", "south", "north"][..],
)),
ColumnType::create_tag(&["west", "west", "east", "west", "south", "north"][..]),
),
]
.into_iter()
@ -302,7 +395,10 @@ mod test {
// All table names returned if no predicate and not in skip list
let table_names = chunk.table_names(
&Predicate::default(),
&["table_2".to_owned()].iter().collect::<BTreeSet<&String>>(),
&["table_2".to_owned()]
.iter()
.cloned()
.collect::<BTreeSet<String>>(),
);
assert_eq!(
table_names
@ -315,7 +411,10 @@ mod test {
// Table name not returned if it is in skip list
let table_names = chunk.table_names(
&Predicate::default(),
&["table_1".to_owned()].iter().collect::<BTreeSet<&String>>(),
&["table_1".to_owned()]
.iter()
.cloned()
.collect::<BTreeSet<String>>(),
);
assert!(table_names.is_empty());
@ -347,9 +446,7 @@ mod test {
),
(
"region",
ColumnType::Tag(Column::from(
&["west", "west", "east", "west", "south", "north"][..],
)),
ColumnType::create_tag(&["west", "west", "east", "west", "south", "north"][..]),
),
]
.into_iter()

View File

@ -1270,6 +1270,16 @@ impl ColumnType {
ColumnType::Time(c) => c.size(),
}
}
/// Helper function to construct a `Tag` column from a slice of `&str`
/// values.
pub fn create_tag(values: &[&str]) -> Self {
Self::Tag(Column::from(values))
}
/// Helper function to construct a `Time` column from a slice of `i64`
/// timestamp values.
pub fn create_time(values: &[i64]) -> Self {
Self::Time(Column::from(values))
}
}
#[derive(Debug, Clone)]

View File

@ -110,6 +110,11 @@ impl Table {
self.table_data.read().unwrap().meta.size
}
/// Returns the total number of row groups in this table.
pub fn row_groups(&self) -> usize {
self.table_data.read().unwrap().data.len()
}
/// The number of rows in this table.
pub fn rows(&self) -> u64 {
self.table_data.read().unwrap().meta.rows