feat: Add "Chunks" to the Mutable Buffer (#596)

* refactor: Update docs, remove unused field

* refactor: rename partition -> chunk

* feat: Introduce new partition, which is a holder for Chunks

* refactor: Remove use of wal from mutable database

* refactor: cleanups, remove last direct use of chunks

* fix: delete old benchmarks

* fix: clippy sacrifice

* docs: tidy up comments

* refactor: remove unused error types

* chore: remove commented out tests
Andrew Lamb 2020-12-28 07:10:25 -05:00 committed by GitHub
parent c4952fc58d
commit 5fa77c32cc
16 changed files with 1115 additions and 1623 deletions

Cargo.lock (generated)

@ -1804,7 +1804,6 @@ dependencies = [
"test_helpers",
"tokio",
"tracing",
"wal",
]
[[package]]


@ -11,7 +11,6 @@ data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
query = { path = "../query" }
wal = { path = "../wal" }
test_helpers = { path = "../test_helpers" }
async-trait = "0.1"
@ -26,7 +25,3 @@ tracing = "0.1"
[dev-dependencies]
test_helpers = { path = "../test_helpers" }
criterion = "0.3"
[[bench]]
name = "benchmark"
harness = false


@ -1,114 +0,0 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use influxdb_line_protocol as line_parser;
use mutable_buffer::{restore_partitions_from_wal, MutableBufferDb};
use query::TSDatabase;
use wal::{Entry, WalBuilder};
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
const DAY_1_NS: i64 = 1_600_000_000_000_000_000;
const DAY_2_NS: i64 = 1_500_000_000_000_000_000;
const DAY_3_NS: i64 = 1_400_000_000_000_000_000;
const DAYS_NS: &[i64] = &[DAY_1_NS, DAY_2_NS, DAY_3_NS];
fn benchmark_restore_single_entry_single_partition(common_create_entries: &mut Criterion) {
let (entries, line_count) =
generate_single_entry_single_partition().expect("Unable to create benchmarking entries");
assert_eq!(entries.len(), 1);
assert_eq!(line_count, 1000);
let mut group = common_create_entries.benchmark_group("wal-restoration");
group.throughput(Throughput::Elements(line_count as u64));
group.bench_function("restore_single_entry_single_partition", |b| {
b.iter(|| {
let entries = entries.clone().into_iter().map(Ok);
let (partitions, _stats) = restore_partitions_from_wal(entries).unwrap();
assert_eq!(partitions.len(), 1);
})
});
group.finish();
}
#[tokio::main]
async fn generate_single_entry_single_partition() -> Result<(Vec<Entry>, usize)> {
common_create_entries(|add_entry| {
add_entry(create_line_protocol(1000, DAY_1_NS));
})
.await
}
fn benchmark_restore_multiple_entry_multiple_partition(common_create_entries: &mut Criterion) {
let (entries, line_count) = generate_multiple_entry_multiple_partition()
.expect("Unable to create benchmarking entries");
assert_eq!(entries.len(), 12);
assert_eq!(line_count, 12000);
let mut group = common_create_entries.benchmark_group("wal-restoration");
group.throughput(Throughput::Elements(line_count as u64));
group.bench_function("restore_multiple_entry_multiple_partition", |b| {
b.iter(|| {
let entries = entries.clone().into_iter().map(Ok);
let (partitions, _stats) = restore_partitions_from_wal(entries).unwrap();
assert_eq!(partitions.len(), 3);
})
});
group.finish();
}
#[tokio::main]
async fn generate_multiple_entry_multiple_partition() -> Result<(Vec<Entry>, usize)> {
common_create_entries(|add_entry| {
for &day_ns in DAYS_NS {
for _ in 0..4 {
add_entry(create_line_protocol(1000, day_ns))
}
}
})
.await
}
async fn common_create_entries(
mut f: impl FnMut(&mut dyn FnMut(String)),
) -> Result<(Vec<Entry>, usize)> {
let tmp_dir = test_helpers::tmp_dir()?;
let mut wal_dir = tmp_dir.as_ref().to_owned();
let db = MutableBufferDb::try_with_wal("mydb", &mut wal_dir).await?;
let mut lp_entries = Vec::new();
f(&mut |entry| lp_entries.push(entry));
let mut total_lines = 0;
for lp_entry in lp_entries {
let lines: Vec<_> = line_parser::parse_lines(&lp_entry).collect::<Result<_, _>>()?;
db.write_lines(&lines).await?;
total_lines += lines.len();
}
let wal_builder = WalBuilder::new(wal_dir);
let entries = wal_builder.entries()?.collect::<Result<_, _>>()?;
Ok((entries, total_lines))
}
fn create_line_protocol(entries: usize, timestamp_ns: i64) -> String {
use std::fmt::Write;
let mut s = String::new();
for _ in 0..entries {
writeln!(
s,
"processes,host=my.awesome.computer.example.com blocked=0i,zombies=0i,stopped=0i,running=42i,sleeping=999i,total=1041i,unknown=0i,idle=0i {timestamp}",
timestamp = timestamp_ns,
).unwrap();
}
s
}
criterion_group!(
benches,
benchmark_restore_single_entry_single_partition,
benchmark_restore_multiple_entry_multiple_partition,
);
criterion_main!(benches);

mutable_buffer/src/chunk.rs (new file)

@ -0,0 +1,465 @@
//! Represents a Chunk of data (a collection of tables and their data within
//! some chunk) in the mutable store.
use arrow_deps::{
arrow::record_batch::RecordBatch,
datafusion::{
logical_plan::Expr, logical_plan::Operator, optimizer::utils::expr_to_column_names,
prelude::*,
},
};
use generated_types::wal as wb;
use std::collections::{BTreeSet, HashMap, HashSet};
use data_types::{partition_metadata::Table as TableStats, TIME_COLUMN_NAME};
use query::{
predicate::{Predicate, TimestampRange},
util::{visit_expression, AndExprBuilder, ExpressionVisitor},
};
use crate::dictionary::{Dictionary, Error as DictionaryError};
use crate::table::Table;
use snafu::{OptionExt, ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error writing table '{}': {}", table_name, source))]
TableWrite {
table_name: String,
source: crate::table::Error,
},
#[snafu(display("Table Error in '{}': {}", table_name, source))]
NamedTableError {
table_name: String,
source: crate::table::Error,
},
#[snafu(display("Table name {} not found in dictionary of chunk {}", table, chunk))]
TableNameNotFoundInDictionary {
table: String,
chunk: String,
source: crate::dictionary::Error,
},
#[snafu(display("Table ID {} not found in dictionary of chunk {}", table, chunk))]
TableIdNotFoundInDictionary {
table: u32,
chunk: String,
source: DictionaryError,
},
#[snafu(display("Table {} not found in chunk {}", table, chunk))]
TableNotFoundInChunk { table: u32, chunk: String },
#[snafu(display("Attempt to write table batch without a name"))]
TableWriteWithoutName,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct Chunk {
/// Chunk key for all rows in this chunk
pub key: String,
/// `dictionary` maps &str -> u32. The u32s are used in place of String or
/// str to avoid slow string operations. The same dictionary is used for
/// table names, tag names, tag values, and column names.
// TODO: intern string field values too?
pub dictionary: Dictionary,
/// map of the dictionary ID for the table name to the table
pub tables: HashMap<u32, Table>,
}
/// Describes the result of translating a set of strings into
/// chunk specific ids
#[derive(Debug, PartialEq, Eq)]
pub enum ChunkIdSet {
/// At least one of the strings was not present in the chunk's
/// dictionary.
///
/// This is important when testing for the presence of all ids in
/// a set, as we know they can not all be present
AtLeastOneMissing,
/// All strings existed in this chunk's dictionary
Present(BTreeSet<u32>),
}
/// a 'Compiled' set of predicates / filters that can be evaluated on
/// this chunk (where strings have been translated to chunk
/// specific u32 ids)
#[derive(Debug)]
pub struct ChunkPredicate {
/// If present, restrict the request to just those tables whose
/// names are in table_names. If present but empty, means there
/// was a predicate but no tables named that way exist in the
/// chunk (so no table can pass)
pub table_name_predicate: Option<BTreeSet<u32>>,
/// Optional column restriction. If present, further
/// restrict any field columns returned to only those named, and
/// skip tables entirely when querying metadata that do not have
/// *any* of the fields
pub field_name_predicate: Option<BTreeSet<u32>>,
/// General DataFusion expressions (arbitrary predicates) applied
/// as a filter using logical conjunction (aka are 'AND'ed
/// together). Only rows that evaluate to TRUE for all these
/// expressions should be returned.
pub chunk_exprs: Vec<Expr>,
/// If Some, then the table must contain all columns specified
/// to pass the predicate
pub required_columns: Option<ChunkIdSet>,
/// The id of the "time" column in this chunk
pub time_column_id: u32,
/// Timestamp range: only rows within this range should be considered
pub range: Option<TimestampRange>,
}
impl ChunkPredicate {
/// Creates and adds a DataFusion predicate representing the
/// combination of predicate and timestamp.
pub fn filter_expr(&self) -> Option<Expr> {
// build up a list of expressions
let mut builder =
AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr());
for expr in &self.chunk_exprs {
builder = builder.append_expr(expr.clone());
}
builder.build()
}
/// For plans which select a subset of fields, returns true if
/// the field should be included in the results
pub fn should_include_field(&self, field_id: u32) -> bool {
match &self.field_name_predicate {
None => true,
Some(field_restriction) => field_restriction.contains(&field_id),
}
}
/// Return true if this column is the time column
pub fn is_time_column(&self, id: u32) -> bool {
self.time_column_id == id
}
/// Creates a DataFusion predicate for applying a timestamp range:
///
/// range.start <= time and time < range.end
fn make_timestamp_predicate_expr(&self) -> Option<Expr> {
self.range.map(|range| make_range_expr(&range))
}
}
/// Creates an expression like:
/// range.low <= time && time < range.high
fn make_range_expr(range: &TimestampRange) -> Expr {
let ts_low = lit(range.start).lt_eq(col(TIME_COLUMN_NAME));
let ts_high = col(TIME_COLUMN_NAME).lt(lit(range.end));
ts_low.and(ts_high)
}
impl Chunk {
pub fn new(key: impl Into<String>) -> Self {
Self {
key: key.into(),
dictionary: Dictionary::new(),
tables: HashMap::new(),
}
}
pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> {
if let Some(table_batches) = entry.table_batches() {
for batch in table_batches {
self.write_table_batch(&batch)?;
}
}
Ok(())
}
fn write_table_batch(&mut self, batch: &wb::TableWriteBatch<'_>) -> Result<()> {
let table_name = batch.name().context(TableWriteWithoutName)?;
let table_id = self.dictionary.lookup_value_or_insert(table_name);
let table = self
.tables
.entry(table_id)
.or_insert_with(|| Table::new(table_id));
if let Some(rows) = batch.rows() {
table
.append_rows(&mut self.dictionary, &rows)
.context(TableWrite { table_name })?;
}
Ok(())
}
/// Translates `predicate` into per-chunk ids that can be
/// directly evaluated against tables in this chunk
pub fn compile_predicate(&self, predicate: &Predicate) -> Result<ChunkPredicate> {
let table_name_predicate = self.compile_string_list(predicate.table_names.as_ref());
let field_restriction = self.compile_string_list(predicate.field_columns.as_ref());
let time_column_id = self
.dictionary
.lookup_value(TIME_COLUMN_NAME)
.expect("time is in the chunk dictionary");
let range = predicate.range;
// it would be nice to avoid cloning all the exprs here.
let chunk_exprs = predicate.exprs.clone();
// In order to evaluate expressions in the table, all columns
// referenced in the expression must appear (I think, not sure
// about NOT, etc so panic if we see one of those);
let mut visitor = SupportVisitor {};
let mut predicate_columns: HashSet<String> = HashSet::new();
for expr in &chunk_exprs {
visit_expression(expr, &mut visitor);
expr_to_column_names(&expr, &mut predicate_columns).unwrap();
}
// if there are any column references in the expression, ensure they appear in
// any table
let required_columns = if predicate_columns.is_empty() {
None
} else {
Some(self.make_chunk_ids(predicate_columns.iter()))
};
Ok(ChunkPredicate {
table_name_predicate,
field_name_predicate: field_restriction,
chunk_exprs,
required_columns,
time_column_id,
range,
})
}
/// Converts a potential set of strings into a set of ids in terms
/// of this dictionary. If there are no matching Strings in the
/// chunk's dictionary, those strings are ignored and a
/// (potentially empty) set is returned.
fn compile_string_list(&self, names: Option<&BTreeSet<String>>) -> Option<BTreeSet<u32>> {
names.map(|names| {
names
.iter()
.filter_map(|name| self.dictionary.id(name))
.collect::<BTreeSet<_>>()
})
}
/// Adds the ids of any columns in additional_required_columns to the
/// required columns of predicate
pub fn add_required_columns_to_predicate(
&self,
additional_required_columns: &HashSet<String>,
predicate: &mut ChunkPredicate,
) {
for column_name in additional_required_columns {
// Once we know we have missing columns, there is no need to try
// and figure out if any additional columns are needed
if Some(ChunkIdSet::AtLeastOneMissing) == predicate.required_columns {
return;
}
let column_id = self.dictionary.id(column_name);
// Update the required column list
predicate.required_columns = Some(match predicate.required_columns.take() {
None => {
if let Some(column_id) = column_id {
let mut symbols = BTreeSet::new();
symbols.insert(column_id);
ChunkIdSet::Present(symbols)
} else {
ChunkIdSet::AtLeastOneMissing
}
}
Some(ChunkIdSet::Present(mut symbols)) => {
if let Some(column_id) = column_id {
symbols.insert(column_id);
ChunkIdSet::Present(symbols)
} else {
ChunkIdSet::AtLeastOneMissing
}
}
Some(ChunkIdSet::AtLeastOneMissing) => {
unreachable!("Covered by case above while adding required columns to predicate")
}
});
}
}
/// Returns true if data with chunk key `key` should be
/// written to this chunk
pub fn should_write(&self, key: &str) -> bool {
self.key.starts_with(key)
}
/// Convert the table specified in this chunk into some number of
/// record batches, appended to dst
pub fn table_to_arrow(
&self,
dst: &mut Vec<RecordBatch>,
table_name: &str,
columns: &[&str],
) -> Result<()> {
let table = self.table(table_name)?;
dst.push(
table
.to_arrow(&self, columns)
.context(NamedTableError { table_name })?,
);
Ok(())
}
/// Returns a vec of the summary statistics of the tables in this chunk
pub fn table_stats(&self) -> Result<Vec<TableStats>> {
let mut stats = Vec::with_capacity(self.tables.len());
for (id, table) in &self.tables {
let name = self
.dictionary
.lookup_id(*id)
.context(TableIdNotFoundInDictionary {
table: *id,
chunk: &self.key,
})?;
let columns = table.stats();
stats.push(TableStats {
name: name.to_string(),
columns,
});
}
Ok(stats)
}
fn table(&self, table_name: &str) -> Result<&Table> {
let table_id =
self.dictionary
.lookup_value(table_name)
.context(TableNameNotFoundInDictionary {
table: table_name,
chunk: &self.key,
})?;
let table = self.tables.get(&table_id).context(TableNotFoundInChunk {
table: table_id,
chunk: &self.key,
})?;
Ok(table)
}
/// Translate a bunch of strings into a set of ids relative to this
/// chunk
pub fn make_chunk_ids<'a, I>(&self, predicate_columns: I) -> ChunkIdSet
where
I: Iterator<Item = &'a String>,
{
let mut symbols = BTreeSet::new();
for column_name in predicate_columns {
if let Some(column_id) = self.dictionary.id(column_name) {
symbols.insert(column_id);
} else {
return ChunkIdSet::AtLeastOneMissing;
}
}
ChunkIdSet::Present(symbols)
}
}
impl query::PartitionChunk for Chunk {
type Error = Error;
fn key(&self) -> &str {
&self.key
}
fn table_stats(&self) -> Result<Vec<TableStats>, Self::Error> {
self.table_stats()
}
fn table_to_arrow(
&self,
dst: &mut Vec<RecordBatch>,
table_name: &str,
columns: &[&str],
) -> Result<(), Self::Error> {
self.table_to_arrow(dst, table_name, columns)
}
}
/// Used to figure out if we know how to deal with this kind of
/// predicate in the write buffer
struct SupportVisitor {}
impl ExpressionVisitor for SupportVisitor {
fn pre_visit(&mut self, expr: &Expr) {
match expr {
Expr::Literal(..) => {}
Expr::Column(..) => {}
Expr::BinaryExpr { op, .. } => {
match op {
Operator::Eq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq
| Operator::Plus
| Operator::Minus
| Operator::Multiply
| Operator::Divide
| Operator::And
| Operator::Or => {}
// Unsupported (need to think about ramifications)
Operator::NotEq | Operator::Modulus | Operator::Like | Operator::NotLike => {
panic!("Unsupported binary operator in expression: {:?}", expr)
}
}
}
_ => panic!(
"Unsupported expression in mutable_buffer database: {:?}",
expr
),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_make_range_expr() {
// Test that the generated predicate is correct
let range = TimestampRange::new(101, 202);
let ts_predicate_expr = make_range_expr(&range);
let expected_string = "Int64(101) LtEq #time And #time Lt Int64(202)";
let actual_string = format!("{:?}", ts_predicate_expr);
assert_eq!(actual_string, expected_string);
}
}
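
The per-chunk dictionary and ChunkIdSet above are the core of how a chunk decides whether a predicate can possibly match it. The following standalone sketch is illustrative only (the Dictionary and IdSet types here are simplified stand-ins, not the crate's definitions, though the method names mirror the ones used in chunk.rs): names are interned to u32 ids per chunk, and translating a predicate's column names reports AtLeastOneMissing so the whole chunk can be ruled out.

use std::collections::{BTreeSet, HashMap};

/// Simplified stand-in for mutable_buffer's Dictionary.
#[derive(Default)]
struct Dictionary {
    ids: HashMap<String, u32>,
}

impl Dictionary {
    fn lookup_value_or_insert(&mut self, value: &str) -> u32 {
        let next = self.ids.len() as u32;
        *self.ids.entry(value.to_string()).or_insert(next)
    }
    fn id(&self, value: &str) -> Option<u32> {
        self.ids.get(value).copied()
    }
}

/// Simplified stand-in for ChunkIdSet.
#[derive(Debug, PartialEq)]
enum IdSet {
    /// At least one name was never interned in this chunk, so a predicate
    /// requiring all of them can never match here.
    AtLeastOneMissing,
    Present(BTreeSet<u32>),
}

/// Mirrors Chunk::make_chunk_ids: translate names into per-chunk ids.
fn make_ids<'a>(dict: &Dictionary, names: impl Iterator<Item = &'a str>) -> IdSet {
    let mut ids = BTreeSet::new();
    for name in names {
        match dict.id(name) {
            Some(id) => {
                ids.insert(id);
            }
            None => return IdSet::AtLeastOneMissing,
        }
    }
    IdSet::Present(ids)
}

fn main() {
    let mut dict = Dictionary::default();
    let host = dict.lookup_value_or_insert("host");
    let time = dict.lookup_value_or_insert("time");

    // All referenced columns are interned in this chunk.
    assert_eq!(
        make_ids(&dict, ["host", "time"].into_iter()),
        IdSet::Present([host, time].into_iter().collect())
    );
    // A column the chunk has never seen rules the chunk out entirely.
    assert_eq!(
        make_ids(&dict, ["region"].into_iter()),
        IdSet::AtLeastOneMissing
    );
}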


@ -27,7 +27,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
/// Stores the actual data for columns in a partition along with summary
/// Stores the actual data for columns in a chunk along with summary
/// statistics
pub enum Column {
F64(Vec<Option<f64>>, Statistics<f64>),

File diff suppressed because it is too large.


@ -1,4 +1,52 @@
//! Contains an in memory write buffer that stores incoming data, durably.
//! Contains an in memory mutable buffer that stores incoming data in
//! a structure that is designed to be quickly appended to as well as queried
//!
//! The mutable buffer is structured in this way:
//!
//! ┌───────────────────────────────────────────────┐
//! │ │
//! │ ┌────────────────┐ │
//! │ │ Database │ │
//! │ └────────────────┘ │
//! │ │ one partition per │
//! │ │ partition_key │
//! │ ▼ │
//! │ ┌────────────────┐ │
//! │ │ Partition │ │
//! │ └────────────────┘ │
//! │ │ one mutable Chunk │
//! │ │ 0 or more immutable │
//! │ ▼ Chunks │
//! │ ┌────────────────┐ │
//! │ │ Chunk │ │
//! │ └────────────────┘ │
//! │ │ multiple Tables (measurements) │
//! │ ▼ │
//! │ ┌────────────────┐ │
//! │ │ Table │ │
//! │ └────────────────┘ │
//! │ │ multiple Columns │
//! │ ▼ │
//! │ ┌────────────────┐ │
//! │ │ Column │ │
//! │ └────────────────┘ │
//! │ MutableBuffer │
//! │ │
//! └───────────────────────────────────────────────┘
//!
//! Each row of data is routed into a particular partition based on
//! column values in that row. The partition's active (mutable) chunk is updated
//! with the new data.
//!
//! The currently active chunk in a partition can be rolled over. When
//! this happens, the chunk becomes immutable (read-only) and stops taking
//! writes. Any new writes to the same partition will create
//! a new active chunk.
//!
//! Note: Strings in the mutable buffer are dictionary encoded (via
//! string interning) to reduce memory usage. This dictionary encoding
//! is done on a per-Chunk basis, so that as soon as the chunk becomes
//! "immutable" the corresponding dictionary also becomes immutable
#![deny(rust_2018_idioms)]
#![warn(
@ -8,15 +56,15 @@
clippy::use_self
)]
pub mod chunk;
mod column;
mod database;
mod dictionary;
pub mod partition;
mod partition;
mod store;
mod table;
// Allow restore partitions to be used outside of this crate (for
// Allow restore chunks to be used outside of this crate (for
// benchmarking)
pub use crate::database::MutableBufferDb;
pub use crate::partition::restore_partitions_from_wal;
pub use crate::store::MutableBufferDatabases;
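
To make the layout in the module docs concrete, here is a minimal, self-contained sketch of the routing they describe. The type and field names below are illustrative stand-ins, not the crate's actual Database/Partition/Chunk definitions: there is one Partition per partition key, and every write lands in that partition's single active (mutable) chunk.

use std::collections::BTreeMap;

#[derive(Default)]
struct Chunk {
    rows: Vec<String>, // stand-in for the real Tables/Columns
}

struct Partition {
    key: String,
    active: Chunk,
    immutable: Vec<Chunk>,
}

#[derive(Default)]
struct Database {
    partitions: BTreeMap<String, Partition>,
}

impl Database {
    fn write(&mut self, partition_key: &str, line: &str) {
        let partition = self
            .partitions
            .entry(partition_key.to_string())
            .or_insert_with(|| Partition {
                key: partition_key.to_string(),
                active: Chunk::default(),
                immutable: Vec::new(),
            });
        // All new writes go to the partition's single mutable chunk.
        partition.active.rows.push(line.to_string());
    }
}

fn main() {
    let mut db = Database::default();
    db.write("2020-12-28", "cpu,host=a usage=0.5 1609113600000000000");
    db.write("2020-12-29", "cpu,host=a usage=0.6 1609200000000000000");
    assert_eq!(db.partitions.len(), 2);
    assert_eq!(db.partitions["2020-12-28"].active.rows.len(), 1);
    // `key` and `immutable` exist only to mirror the described layout.
    assert_eq!(db.partitions["2020-12-28"].key, "2020-12-28");
    assert!(db.partitions["2020-12-28"].immutable.is_empty());
}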


@ -1,551 +1,115 @@
use arrow_deps::{
arrow::record_batch::RecordBatch,
datafusion::{
logical_plan::Expr, logical_plan::Operator, optimizer::utils::expr_to_column_names,
prelude::*,
},
};
//! Holds one or more Chunks.
use generated_types::wal as wb;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use wal::{Entry as WalEntry, Result as WalResult};
use std::sync::Arc;
use data_types::{partition_metadata::Table as TableStats, TIME_COLUMN_NAME};
use query::{
predicate::{Predicate, TimestampRange},
util::{visit_expression, AndExprBuilder, ExpressionVisitor},
};
use crate::chunk::{Chunk, Error as ChunkError};
use crate::dictionary::{Dictionary, Error as DictionaryError};
use crate::table::Table;
use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Could not read WAL entry: {}", source))]
WalEntryRead { source: wal::Error },
#[snafu(display("Partition {} not found", partition))]
PartitionNotFound { partition: String },
#[snafu(display(
"Column name {} not found in dictionary of partition {}",
column,
partition
"Error writing to active chunk of partition with key '{}': {}",
partition_key,
source
))]
ColumnNameNotFoundInDictionary {
column: String,
partition: String,
source: crate::dictionary::Error,
WritingChunkData {
partition_key: String,
source: ChunkError,
},
#[snafu(display("Error writing table '{}': {}", table_name, source))]
TableWrite {
table_name: String,
source: crate::table::Error,
},
#[snafu(display("Table Error in '{}': {}", table_name, source))]
NamedTableError {
table_name: String,
source: crate::table::Error,
},
#[snafu(display(
"Table name {} not found in dictionary of partition {}",
table,
partition
))]
TableNameNotFoundInDictionary {
table: String,
partition: String,
source: crate::dictionary::Error,
},
#[snafu(display(
"Table ID {} not found in dictionary of partition {}",
table,
partition
))]
TableIdNotFoundInDictionary {
table: u32,
partition: String,
source: DictionaryError,
},
#[snafu(display("Table {} not found in partition {}", table, partition))]
TableNotFoundInPartition { table: u32, partition: String },
#[snafu(display("Attempt to write table batch without a name"))]
TableWriteWithoutName,
#[snafu(display("Error restoring WAL entry, missing partition key"))]
MissingPartitionKey,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct Partition {
pub key: String,
/// The partition key that is shared by all Chunks in this Partition
key: String,
/// `dictionary` maps &str -> u32. The u32s are used in place of String or
/// str to avoid slow string operations. The same dictionary is used for
/// table names, tag names, tag values, and column names.
// TODO: intern string field values too?
pub dictionary: Dictionary,
/// The active mutable Chunk; All new writes go to this chunk
mutable_chunk: Chunk,
/// map of the dictionary ID for the table name to the table
pub tables: HashMap<u32, Table>,
pub is_open: bool,
}
/// Describes the result of translating a set of strings into
/// partition specific ids
#[derive(Debug, PartialEq, Eq)]
pub enum PartitionIdSet {
/// At least one of the strings was not present in the partitions'
/// dictionary.
///
/// This is important when testing for the presence of all ids in
/// a set, as we know they can not all be present
AtLeastOneMissing,
/// All strings existed in this partition's dictionary
Present(BTreeSet<u32>),
}
/// a 'Compiled' set of predicates / filters that can be evaluated on
/// this partition (where strings have been translated to partition
/// specific u32 ids)
#[derive(Debug)]
pub struct PartitionPredicate {
/// If present, restrict the request to just those tables whose
/// names are in table_names. If present but empty, means there
/// was a predicate but no tables named that way exist in the
/// partition (so no table can pass)
pub table_name_predicate: Option<BTreeSet<u32>>,
/// Optional column restriction. If present, further
/// restrict any field columns returned to only those named, and
/// skip tables entirely when querying metadata that do not have
/// *any* of the fields
pub field_name_predicate: Option<BTreeSet<u32>>,
/// General DataFusion expressions (arbitrary predicates) applied
/// as a filter using logical conjuction (aka are 'AND'ed
/// together). Only rows that evaluate to TRUE for all these
/// expressions should be returned.
pub partition_exprs: Vec<Expr>,
/// If Some, then the table must contain all columns specified
/// to pass the predicate
pub required_columns: Option<PartitionIdSet>,
/// The id of the "time" column in this partition
pub time_column_id: u32,
/// Timestamp range: only rows within this range should be considered
pub range: Option<TimestampRange>,
}
impl PartitionPredicate {
/// Creates and adds a datafuson predicate representing the
/// combination of predicate and timestamp.
pub fn filter_expr(&self) -> Option<Expr> {
// build up a list of expressions
let mut builder =
AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr());
for expr in &self.partition_exprs {
builder = builder.append_expr(expr.clone());
}
builder.build()
}
/// For plans which select a subset of fields, returns true if
/// the field should be included in the results
pub fn should_include_field(&self, field_id: u32) -> bool {
match &self.field_name_predicate {
None => true,
Some(field_restriction) => field_restriction.contains(&field_id),
}
}
/// Return true if this column is the time column
pub fn is_time_column(&self, id: u32) -> bool {
self.time_column_id == id
}
/// Creates a DataFusion predicate for appliying a timestamp range:
///
/// range.start <= time and time < range.end`
fn make_timestamp_predicate_expr(&self) -> Option<Expr> {
self.range.map(|range| make_range_expr(&range))
}
}
/// Creates expression like:
/// range.low <= time && time < range.high
fn make_range_expr(range: &TimestampRange) -> Expr {
let ts_low = lit(range.start).lt_eq(col(TIME_COLUMN_NAME));
let ts_high = col(TIME_COLUMN_NAME).lt(lit(range.end));
ts_low.and(ts_high)
/// Immutable chunks which can no longer be written to
immutable_chunks: Vec<Arc<Chunk>>,
}
impl Partition {
pub fn new(key: impl Into<String>) -> Self {
let key: String = key.into();
let mutable_chunk = Chunk::new(&key);
Self {
key: key.into(),
dictionary: Dictionary::new(),
tables: HashMap::new(),
is_open: true,
key,
mutable_chunk,
immutable_chunks: Vec::new(),
}
}
/// write data to the active mutable chunk
pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> {
if let Some(table_batches) = entry.table_batches() {
for batch in table_batches {
self.write_table_batch(&batch)?;
}
}
Ok(())
assert_eq!(
entry
.partition_key()
.expect("partition key should be present"),
self.key
);
self.mutable_chunk
.write_entry(entry)
.with_context(|| WritingChunkData {
partition_key: entry.partition_key().unwrap(),
})
}
fn write_table_batch(&mut self, batch: &wb::TableWriteBatch<'_>) -> Result<()> {
let table_name = batch.name().context(TableWriteWithoutName)?;
let table_id = self.dictionary.lookup_value_or_insert(table_name);
let table = self
.tables
.entry(table_id)
.or_insert_with(|| Table::new(table_id));
if let Some(rows) = batch.rows() {
table
.append_rows(&mut self.dictionary, &rows)
.context(TableWrite { table_name })?;
}
Ok(())
/// roll over the active chunk into the immutable_chunks,
/// returning the most recently active chunk. Any new writes to
/// this partition will go to a new chunk
pub fn rollover_chunk(&mut self) -> Arc<Chunk> {
todo!();
}
/// Translates `predicate` into per-partition ids that can be
/// directly evaluated against tables in this partition
pub fn compile_predicate(&self, predicate: &Predicate) -> Result<PartitionPredicate> {
let table_name_predicate = self.compile_string_list(predicate.table_names.as_ref());
let field_restriction = self.compile_string_list(predicate.field_columns.as_ref());
let time_column_id = self
.dictionary
.lookup_value(TIME_COLUMN_NAME)
.expect("time is in the partition dictionary");
let range = predicate.range;
// it would be nice to avoid cloning all the exprs here.
let partition_exprs = predicate.exprs.clone();
// In order to evaluate expressions in the table, all columns
// referenced in the expression must appear (I think, not sure
// about NOT, etc so panic if we see one of those);
let mut visitor = SupportVisitor {};
let mut predicate_columns: HashSet<String> = HashSet::new();
for expr in &partition_exprs {
visit_expression(expr, &mut visitor);
expr_to_column_names(&expr, &mut predicate_columns).unwrap();
}
// if there are any column references in the expression, ensure they appear in
// any table
let required_columns = if predicate_columns.is_empty() {
None
} else {
Some(self.make_partition_ids(predicate_columns.iter()))
};
Ok(PartitionPredicate {
table_name_predicate,
field_name_predicate: field_restriction,
partition_exprs,
required_columns,
time_column_id,
range,
})
}
/// Converts a potential set of strings into a set of ids in terms
/// of this dictionary. If there are no matching Strings in the
/// partitions dictionary, those strings are ignored and a
/// (potentially empty) set is returned.
fn compile_string_list(&self, names: Option<&BTreeSet<String>>) -> Option<BTreeSet<u32>> {
names.map(|names| {
names
.iter()
.filter_map(|name| self.dictionary.id(name))
.collect::<BTreeSet<_>>()
})
}
/// Adds the ids of any columns in additional_required_columns to the
/// required columns of predicate
pub fn add_required_columns_to_predicate(
&self,
additional_required_columns: &HashSet<String>,
predicate: &mut PartitionPredicate,
) {
for column_name in additional_required_columns {
// Once know we have missing columns, no need to try
// and figure out if these any additional columns are needed
if Some(PartitionIdSet::AtLeastOneMissing) == predicate.required_columns {
return;
}
let column_id = self.dictionary.id(column_name);
// Update the required colunm list
predicate.required_columns = Some(match predicate.required_columns.take() {
None => {
if let Some(column_id) = column_id {
let mut symbols = BTreeSet::new();
symbols.insert(column_id);
PartitionIdSet::Present(symbols)
} else {
PartitionIdSet::AtLeastOneMissing
}
}
Some(PartitionIdSet::Present(mut symbols)) => {
if let Some(column_id) = column_id {
symbols.insert(column_id);
PartitionIdSet::Present(symbols)
} else {
PartitionIdSet::AtLeastOneMissing
}
}
Some(PartitionIdSet::AtLeastOneMissing) => {
unreachable!("Covered by case above while adding required columns to predicate")
}
});
}
}
/// returns true if data with partition key `key` should be
/// written to this partition,
pub fn should_write(&self, key: &str) -> bool {
self.key.starts_with(key) && self.is_open
}
/// Convert the table specified in this partition into an arrow record batch
pub fn table_to_arrow(&self, table_name: &str, columns: &[&str]) -> Result<RecordBatch> {
let table = self.table(table_name)?;
table
.to_arrow(&self, columns)
.context(NamedTableError { table_name })
}
/// Returns a vec of the summary statistics of the tables in this partition
pub fn table_stats(&self) -> Result<Vec<TableStats>> {
let mut stats = Vec::with_capacity(self.tables.len());
for (id, table) in &self.tables {
let name = self
.dictionary
.lookup_id(*id)
.context(TableIdNotFoundInDictionary {
table: *id,
partition: &self.key,
})?;
let columns = table.stats();
stats.push(TableStats {
name: name.to_string(),
columns,
});
}
Ok(stats)
}
fn table(&self, table_name: &str) -> Result<&Table> {
let table_id =
self.dictionary
.lookup_value(table_name)
.context(TableNameNotFoundInDictionary {
table: table_name,
partition: &self.key,
})?;
let table = self
.tables
.get(&table_id)
.context(TableNotFoundInPartition {
table: table_id,
partition: &self.key,
})?;
Ok(table)
}
/// Translate a bunch of strings into a set of ids relative to this
/// partition
pub fn make_partition_ids<'a, I>(&self, predicate_columns: I) -> PartitionIdSet
where
I: Iterator<Item = &'a String>,
{
let mut symbols = BTreeSet::new();
for column_name in predicate_columns {
if let Some(column_id) = self.dictionary.id(column_name) {
symbols.insert(column_id);
} else {
return PartitionIdSet::AtLeastOneMissing;
}
}
PartitionIdSet::Present(symbols)
}
}
impl query::PartitionChunk for Partition {
type Error = Error;
fn key(&self) -> &str {
pub fn key(&self) -> &str {
&self.key
}
fn table_stats(&self) -> Result<Vec<TableStats>, Self::Error> {
self.table_stats()
}
fn table_to_arrow(
&self,
table_name: &str,
columns: &[&str],
) -> Result<RecordBatch, Self::Error> {
self.table_to_arrow(table_name, columns)
/// Return an iterator over each Chunk in this partition
pub fn iter(&self) -> ChunkIter<'_> {
ChunkIter::new(self)
}
}
/// Used to figure out if we know how to deal with this kind of
/// predicate in the write buffer
struct SupportVisitor {}
pub struct ChunkIter<'a> {
partition: &'a Partition,
visited_mutable: bool,
next_immutable_index: usize,
}
impl ExpressionVisitor for SupportVisitor {
fn pre_visit(&mut self, expr: &Expr) {
match expr {
Expr::Literal(..) => {}
Expr::Column(..) => {}
Expr::BinaryExpr { op, .. } => {
match op {
Operator::Eq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq
| Operator::Plus
| Operator::Minus
| Operator::Multiply
| Operator::Divide
| Operator::And
| Operator::Or => {}
// Unsupported (need to think about ramifications)
Operator::NotEq | Operator::Modulus | Operator::Like | Operator::NotLike => {
panic!("Unsupported binary operator in expression: {:?}", expr)
}
}
}
_ => panic!(
"Unsupported expression in mutable_buffer database: {:?}",
expr
),
impl<'a> ChunkIter<'a> {
fn new(partition: &'a Partition) -> Self {
Self {
partition,
visited_mutable: false,
next_immutable_index: 0,
}
}
}
#[derive(Default, Debug)]
pub struct RestorationStats {
pub row_count: usize,
pub tables: BTreeSet<String>,
}
/// Iterates over chunks in a partition
impl<'a> Iterator for ChunkIter<'a> {
type Item = &'a Chunk;
/// Given a set of WAL entries, restore them into a set of Partitions.
pub fn restore_partitions_from_wal(
wal_entries: impl Iterator<Item = WalResult<WalEntry>>,
) -> Result<(Vec<Partition>, RestorationStats)> {
let mut stats = RestorationStats::default();
fn next(&mut self) -> Option<Self::Item> {
let partition = self.partition;
let mut partitions = BTreeMap::new();
for wal_entry in wal_entries {
let wal_entry = wal_entry.context(WalEntryRead)?;
let bytes = wal_entry.as_data();
let batch = flatbuffers::get_root::<wb::WriteBufferBatch<'_>>(&bytes);
if let Some(entries) = batch.entries() {
for entry in entries {
let partition_key = entry.partition_key().context(MissingPartitionKey)?;
if !partitions.contains_key(partition_key) {
partitions.insert(
partition_key.to_string(),
Partition::new(partition_key.to_string()),
);
}
let partition = partitions
.get_mut(partition_key)
.context(PartitionNotFound {
partition: partition_key,
})?;
partition.write_entry(&entry)?;
}
if !self.visited_mutable {
let chunk = &partition.mutable_chunk;
self.visited_mutable = true;
Some(chunk)
} else if self.next_immutable_index < partition.immutable_chunks.len() {
let chunk = &self.partition.immutable_chunks[self.next_immutable_index];
self.next_immutable_index += 1;
Some(chunk)
} else {
None
}
}
let partitions = partitions
.into_iter()
.map(|(_, p)| p)
.collect::<Vec<Partition>>();
// compute the stats
for p in &partitions {
for (id, table) in &p.tables {
let name = p
.dictionary
.lookup_id(*id)
.expect("table id wasn't inserted into dictionary on restore");
if !stats.tables.contains(name) {
stats.tables.insert(name.to_string());
}
stats.row_count += table.row_count();
}
}
Ok((partitions, stats))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_make_range_expr() {
// Test that the generated predicate is correct
let range = TimestampRange::new(101, 202);
let ts_predicate_expr = make_range_expr(&range);
let expected_string = "Int64(101) LtEq #time And #time Lt Int64(202)";
let actual_string = format!("{:?}", ts_predicate_expr);
assert_eq!(actual_string, expected_string);
}
}
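
A small self-contained sketch of the two behaviours this new Partition introduces, using simplified stand-in types rather than the crate's: freezing the active chunk into the immutable list, and iterating the active chunk first and the frozen chunks afterwards, as ChunkIter does. Note that rollover_chunk itself is still todo!() in this commit, so the rollover body below is only the intended shape, not the crate's implementation.

use std::sync::Arc;

#[derive(Default)]
struct Chunk {
    rows: usize, // stand-in for the real tables/columns
}

#[derive(Default)]
struct Partition {
    active: Chunk,
    immutable: Vec<Arc<Chunk>>,
}

impl Partition {
    fn write(&mut self, n: usize) {
        // All writes land in the single mutable chunk.
        self.active.rows += n;
    }

    // Freeze the active chunk, keep it in the immutable list, and start a
    // fresh active chunk; the frozen chunk is returned to the caller.
    fn rollover(&mut self) -> Arc<Chunk> {
        let frozen = Arc::new(std::mem::take(&mut self.active));
        self.immutable.push(Arc::clone(&frozen));
        frozen
    }

    // Active chunk first, then the immutable chunks, mirroring ChunkIter.
    fn chunks(&self) -> impl Iterator<Item = &Chunk> + '_ {
        std::iter::once(&self.active).chain(self.immutable.iter().map(|c| &**c))
    }
}

fn main() {
    let mut p = Partition::default();
    p.write(10);
    let frozen = p.rollover();
    assert_eq!(frozen.rows, 10);
    p.write(5); // goes to the new active chunk
    let counts: Vec<usize> = p.chunks().map(|c| c.rows).collect();
    assert_eq!(counts, vec![5, 10]);
}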


@ -1,9 +1,9 @@
use async_trait::async_trait;
use query::DatabaseStore;
use snafu::{ResultExt, Snafu};
use snafu::Snafu;
use tokio::sync::RwLock;
use std::{fs, sync::Arc};
use std::sync::Arc;
use std::{collections::BTreeMap, path::PathBuf};
@ -16,60 +16,16 @@ pub enum Error {
dir: PathBuf,
source: std::io::Error,
},
#[snafu(display("Error from database: {}", source))]
DatabaseError { source: crate::database::Error },
#[snafu(display("Error reading metadata: {}", source))]
ReadMetadataError { source: std::io::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct MutableBufferDatabases {
databases: RwLock<BTreeMap<String, Arc<MutableBufferDb>>>,
base_dir: PathBuf,
}
impl MutableBufferDatabases {
pub fn new(base_dir: impl Into<PathBuf>) -> Self {
Self {
databases: RwLock::new(BTreeMap::new()),
base_dir: base_dir.into(),
}
}
/// wal_dirs will traverse the directories from the service base directory
/// and return the directories that contain WALs for databases, which
/// can be used to restore those DBs.
pub fn wal_dirs(&self) -> Result<Vec<PathBuf>> {
let entries = fs::read_dir(&self.base_dir).context(ReadError {
dir: &self.base_dir,
})?;
let mut dirs = vec![];
for entry in entries {
let entry = entry.context(ReadError {
dir: &self.base_dir,
})?;
let meta = entry.metadata().context(ReadMetadataError {})?;
if meta.is_dir() {
if let Some(p) = entry.path().iter().last() {
if let Some(s) = p.to_str() {
if !s.starts_with('.') {
dirs.push(entry.path());
};
}
};
};
}
Ok(dirs)
}
pub async fn add_db(&self, db: MutableBufferDb) {
let mut databases = self.databases.write().await;
databases.insert(db.name.clone(), Arc::new(db));
@ -106,9 +62,7 @@ impl DatabaseStore for MutableBufferDatabases {
return Ok(db.clone());
}
let db = MutableBufferDb::try_with_wal(name, &mut self.base_dir.clone())
.await
.context(DatabaseError)?;
let db = MutableBufferDb::new(name);
let db = Arc::new(db);
databases.insert(name.to_string(), db.clone());
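
The get-or-create path above now builds a database with MutableBufferDb::new(name) instead of try_with_wal. Below is a simplified, synchronous sketch of that pattern: std's RwLock stands in for the tokio::sync::RwLock the store actually uses, and Db is an illustrative stand-in for MutableBufferDb. The lookup happens under a read lock first, and the write lock is only taken to insert on a miss.

use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

struct Db {
    name: String,
}

#[derive(Default)]
struct Databases {
    databases: RwLock<BTreeMap<String, Arc<Db>>>,
}

impl Databases {
    fn db_or_create(&self, name: &str) -> Arc<Db> {
        // Fast path: the database already exists.
        {
            let databases = self.databases.read().unwrap();
            if let Some(db) = databases.get(name) {
                return Arc::clone(db);
            }
        } // read guard dropped before the write lock is taken

        // Slow path: insert under the write lock, going through entry() in
        // case another caller inserted between the two lock acquisitions.
        let mut databases = self.databases.write().unwrap();
        Arc::clone(
            databases
                .entry(name.to_string())
                .or_insert_with(|| Arc::new(Db { name: name.to_string() })),
        )
    }
}

fn main() {
    let store = Databases::default();
    let a = store.db_or_create("mydb");
    let b = store.db_or_create("mydb");
    assert!(Arc::ptr_eq(&a, &b)); // the second call returns the same instance
    assert_eq!(a.name, "mydb");
}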


@ -10,11 +10,11 @@ use tracing::debug;
use std::{collections::BTreeSet, collections::HashMap, sync::Arc};
use crate::{
chunk::ChunkIdSet,
chunk::{Chunk, ChunkPredicate},
column,
column::Column,
dictionary::{Dictionary, Error as DictionaryError},
partition::PartitionIdSet,
partition::{Partition, PartitionPredicate},
};
use data_types::{partition_metadata::Column as ColumnStats, TIME_COLUMN_NAME};
use snafu::{OptionExt, ResultExt, Snafu};
@ -35,39 +35,13 @@ use arrow_deps::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Table {} not found", table))]
TableNotFound { table: String },
#[snafu(display(
"Column {} said it was type {} but extracting a value of that type failed",
column,
expected
))]
WalValueTypeMismatch { column: String, expected: String },
#[snafu(display(
"Tag value ID {} not found in dictionary of partition {}",
value,
partition
))]
#[snafu(display("Tag value ID {} not found in dictionary of chunk {}", value, chunk))]
TagValueIdNotFoundInDictionary {
value: u32,
partition: String,
chunk: String,
source: DictionaryError,
},
#[snafu(display(
"Column type mismatch for column {}: can't insert {} into column with type {}",
column,
inserted_value_type,
existing_column_type
))]
ColumnTypeMismatch {
column: String,
existing_column_type: String,
inserted_value_type: String,
},
#[snafu(display("Column error on column {}: {}", column, source))]
ColumnError {
column: String,
@ -93,39 +67,27 @@ pub enum Error {
InternalAggregateNotSelector { agg: Aggregate },
#[snafu(display(
"Column name '{}' not found in dictionary of partition {}",
"Column name '{}' not found in dictionary of chunk {}",
column_name,
partition
chunk
))]
ColumnNameNotFoundInDictionary {
column_name: String,
partition: String,
chunk: String,
source: DictionaryError,
},
#[snafu(display(
"Internal: Column id '{}' not found in dictionary of partition {}",
"Internal: Column id '{}' not found in dictionary of chunk {}",
column_id,
partition
chunk
))]
ColumnIdNotFoundInDictionary {
column_id: u32,
partition: String,
chunk: String,
source: DictionaryError,
},
#[snafu(display(
"Schema mismatch: for column {}: can't insert {} into column with type {}",
column,
inserted_value_type,
existing_column_type
))]
SchemaMismatch {
column: u32,
existing_column_type: String,
inserted_value_type: String,
},
#[snafu(display("Error building plan: {}", source))]
BuildingPlan {
source: datafusion::error::DataFusionError,
@ -134,12 +96,6 @@ pub enum Error {
#[snafu(display("arrow conversion error: {}", source))]
ArrowError { source: arrow::error::ArrowError },
#[snafu(display("Schema mismatch: for column {}: {}", column, source))]
InternalSchemaMismatch {
column: u32,
source: crate::column::Error,
},
#[snafu(display(
"No index entry found for column {} with id {}",
column_name,
@ -177,20 +133,15 @@ pub enum Error {
#[snafu(display("Duplicate group column '{}'", column_name))]
DuplicateGroupColumn { column_name: String },
#[snafu(display("Internal error converting schema to DFSchema: {}", source))]
InternalConvertingSchema {
source: datafusion::error::DataFusionError,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct Table {
/// Name of the table as a u32 in the partition dictionary
/// Name of the table as a u32 in the chunk dictionary
pub id: u32,
/// Maps column name (as a u32 in the partition dictionary) to an index in
/// Maps column name (as a u32 in the chunk dictionary) to an index in
/// self.columns
pub column_id_to_index: HashMap<u32, usize>,
@ -297,9 +248,9 @@ impl Table {
/// combination of predicate and timestamp. Returns the builder
fn add_datafusion_predicate(
plan_builder: LogicalPlanBuilder,
partition_predicate: &PartitionPredicate,
chunk_predicate: &ChunkPredicate,
) -> Result<LogicalPlanBuilder> {
match partition_predicate.filter_expr() {
match chunk_predicate.filter_expr() {
Some(df_predicate) => plan_builder.filter(df_predicate).context(BuildingPlan),
None => Ok(plan_builder),
}
@ -316,12 +267,12 @@ impl Table {
/// InMemoryScan
pub fn tag_column_names_plan(
&self,
partition_predicate: &PartitionPredicate,
partition: &Partition,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> Result<LogicalPlan> {
let need_time_column = partition_predicate.range.is_some();
let need_time_column = chunk_predicate.range.is_some();
let time_column_id = partition_predicate.time_column_id;
let time_column_id = chunk_predicate.time_column_id;
// figure out the tag columns
let requested_columns_with_index = self
@ -338,7 +289,7 @@ impl Table {
if need_column {
// the id came out of our map, so it should always be valid
let column_name = partition.dictionary.lookup_id(column_id).unwrap();
let column_name = chunk.dictionary.lookup_id(column_id).unwrap();
Some((column_name, column_index))
} else {
None
@ -347,7 +298,7 @@ impl Table {
.collect::<Vec<_>>();
// TODO avoid materializing here
let data = self.to_arrow_impl(partition, &requested_columns_with_index)?;
let data = self.to_arrow_impl(chunk, &requested_columns_with_index)?;
let schema = data.schema();
@ -356,7 +307,7 @@ impl Table {
let plan_builder = LogicalPlanBuilder::scan_memory(vec![vec![data]], schema, projection)
.context(BuildingPlan)?;
let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?;
let plan_builder = Self::add_datafusion_predicate(plan_builder, chunk_predicate)?;
// add optional selection to remove time column
let plan_builder = if !need_time_column {
@ -384,7 +335,7 @@ impl Table {
debug!(
"Created column_name plan for table '{}':\n{}",
partition.dictionary.lookup_id(self.id).unwrap(),
chunk.dictionary.lookup_id(self.id).unwrap(),
plan.display_indent_schema()
);
@ -402,11 +353,11 @@ impl Table {
pub fn tag_values_plan(
&self,
column_name: &str,
partition_predicate: &PartitionPredicate,
partition: &Partition,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> Result<LogicalPlan> {
// Scan and Filter
let plan_builder = self.scan_with_predicates(partition_predicate, partition)?;
let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?;
let select_exprs = vec![col(column_name)];
@ -430,10 +381,10 @@ impl Table {
/// same) occur together in the plan
pub fn series_set_plan(
&self,
partition_predicate: &PartitionPredicate,
partition: &Partition,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> Result<SeriesSetPlan> {
self.series_set_plan_impl(partition_predicate, None, partition)
self.series_set_plan_impl(chunk_predicate, None, chunk)
}
/// Creates the plans for computing series set, ensuring that
@ -447,12 +398,12 @@ impl Table {
/// InMemoryScan
pub fn series_set_plan_impl(
&self,
partition_predicate: &PartitionPredicate,
chunk_predicate: &ChunkPredicate,
prefix_columns: Option<&[String]>,
partition: &Partition,
chunk: &Chunk,
) -> Result<SeriesSetPlan> {
let (mut tag_columns, field_columns) =
self.tag_and_field_column_names(partition_predicate, partition)?;
self.tag_and_field_column_names(chunk_predicate, chunk)?;
// reorder tag_columns to have the prefix columns, if requested
if let Some(prefix_columns) = prefix_columns {
@ -462,7 +413,7 @@ impl Table {
// TODO avoid materializing all the columns here (ideally we
// would use a data source and then let DataFusion prune out
// column references during its optimizer phase).
let plan_builder = self.scan_with_predicates(partition_predicate, partition)?;
let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?;
let mut sort_exprs = Vec::new();
sort_exprs.extend(tag_columns.iter().map(|c| c.into_sort_expr()));
@ -483,7 +434,7 @@ impl Table {
let plan = plan_builder.build().context(BuildingPlan)?;
Ok(SeriesSetPlan::new_from_shared_timestamp(
self.table_name(partition),
self.table_name(chunk),
plan,
tag_columns,
field_columns,
@ -491,18 +442,18 @@ impl Table {
}
/// Returns a LogicalPlanBuilder which scans all columns in this
/// Table and has applied any predicates in `partition_predicate`
/// Table and has applied any predicates in `chunk_predicate`
///
/// Filter(predicate)
/// InMemoryScan
fn scan_with_predicates(
&self,
partition_predicate: &PartitionPredicate,
partition: &Partition,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> Result<LogicalPlanBuilder> {
// TODO avoid materializing all the columns here (ideally
// DataFusion can prune some of them out)
let data = self.all_to_arrow(partition)?;
let data = self.all_to_arrow(chunk)?;
let schema = data.schema();
@ -513,13 +464,13 @@ impl Table {
.context(BuildingPlan)?;
// Filtering
Self::add_datafusion_predicate(plan_builder, partition_predicate)
Self::add_datafusion_predicate(plan_builder, chunk_predicate)
}
/// Look up this table's name as a string
fn table_name(&self, partition: &Partition) -> Arc<String> {
fn table_name(&self, chunk: &Chunk) -> Arc<String> {
// I wonder if all this string creation will be too slow?
let table_name = partition
let table_name = chunk
.dictionary
.lookup_id(self.id)
.expect("looking up table name in dictionary")
@ -533,17 +484,17 @@ impl Table {
/// series_set_plan_impl for more details.
pub fn grouped_series_set_plan(
&self,
partition_predicate: &PartitionPredicate,
chunk_predicate: &ChunkPredicate,
agg: Aggregate,
group_columns: &[String],
partition: &Partition,
chunk: &Chunk,
) -> Result<SeriesSetPlan> {
let num_prefix_tag_group_columns = group_columns.len();
let plan = if let Aggregate::None = agg {
self.series_set_plan_impl(partition_predicate, Some(&group_columns), partition)?
self.series_set_plan_impl(chunk_predicate, Some(&group_columns), chunk)?
} else {
self.aggregate_series_set_plan(partition_predicate, agg, group_columns, partition)?
self.aggregate_series_set_plan(chunk_predicate, agg, group_columns, chunk)?
};
Ok(plan.grouped(num_prefix_tag_group_columns))
@ -589,20 +540,20 @@ impl Table {
/// InMemoryScan
pub fn aggregate_series_set_plan(
&self,
partition_predicate: &PartitionPredicate,
chunk_predicate: &ChunkPredicate,
agg: Aggregate,
group_columns: &[String],
partition: &Partition,
chunk: &Chunk,
) -> Result<SeriesSetPlan> {
let (tag_columns, field_columns) =
self.tag_and_field_column_names(partition_predicate, partition)?;
self.tag_and_field_column_names(chunk_predicate, chunk)?;
// order the tag columns so that the group keys come first (we will group and
// order in the same order)
let tag_columns = reorder_prefix(group_columns, tag_columns)?;
// Scan and Filter
let plan_builder = self.scan_with_predicates(partition_predicate, partition)?;
let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?;
// Group by all tag columns
let group_exprs = tag_columns
@ -614,7 +565,7 @@ impl Table {
agg_exprs,
field_columns,
} = AggExprs::new(agg, field_columns, |col_name| {
let index = self.column_index(partition, col_name)?;
let index = self.column_index(chunk, col_name)?;
Ok(self.columns[index].data_type())
})?;
@ -633,7 +584,7 @@ impl Table {
let plan = plan_builder.build().context(BuildingPlan)?;
Ok(SeriesSetPlan::new(
self.table_name(partition),
self.table_name(chunk),
plan,
tag_columns,
field_columns,
@ -670,17 +621,17 @@ impl Table {
/// InMemoryScan
pub fn window_grouped_series_set_plan(
&self,
partition_predicate: &PartitionPredicate,
chunk_predicate: &ChunkPredicate,
agg: Aggregate,
every: &WindowDuration,
offset: &WindowDuration,
partition: &Partition,
chunk: &Chunk,
) -> Result<SeriesSetPlan> {
let (tag_columns, field_columns) =
self.tag_and_field_column_names(partition_predicate, partition)?;
self.tag_and_field_column_names(chunk_predicate, chunk)?;
// Scan and Filter
let plan_builder = self.scan_with_predicates(partition_predicate, partition)?;
let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?;
// Group by all tag columns and the window bounds
let mut group_exprs = tag_columns
@ -714,7 +665,7 @@ impl Table {
let plan = plan_builder.build().context(BuildingPlan)?;
Ok(SeriesSetPlan::new_from_shared_timestamp(
self.table_name(partition),
self.table_name(chunk),
plan,
tag_columns,
field_columns,
@ -735,15 +686,15 @@ impl Table {
/// InMemoryScan
pub fn field_names_plan(
&self,
partition_predicate: &PartitionPredicate,
partition: &Partition,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> Result<LogicalPlan> {
// Scan and Filter
let plan_builder = self.scan_with_predicates(partition_predicate, partition)?;
let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?;
// Selection
let select_exprs = self
.field_and_time_column_names(partition_predicate, partition)
.field_and_time_column_names(chunk_predicate, chunk)
.into_iter()
.map(|c| c.into_expr())
.collect::<Vec<_>>();
@ -759,14 +710,14 @@ impl Table {
// have been applied. The vectors are sorted lexically by name.
fn tag_and_field_column_names(
&self,
partition_predicate: &PartitionPredicate,
partition: &Partition,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> Result<(ArcStringVec, ArcStringVec)> {
let mut tag_columns = Vec::with_capacity(self.column_id_to_index.len());
let mut field_columns = Vec::with_capacity(self.column_id_to_index.len());
for (&column_id, &column_index) in &self.column_id_to_index {
let column_name = partition
let column_name = chunk
.dictionary
.lookup_id(column_id)
.expect("Find column name in dictionary");
@ -777,7 +728,7 @@ impl Table {
match self.columns[column_index] {
Column::Tag(_, _) => tag_columns.push(column_name),
_ => {
if partition_predicate.should_include_field(column_id) {
if chunk_predicate.should_include_field(column_id) {
field_columns.push(column_name)
}
}
@ -800,8 +751,8 @@ impl Table {
// Returns (field_columns and time) in sorted order
fn field_and_time_column_names(
&self,
partition_predicate: &PartitionPredicate,
partition: &Partition,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> ArcStringVec {
let mut field_columns = self
.column_id_to_index
@ -810,10 +761,10 @@ impl Table {
match self.columns[column_index] {
Column::Tag(_, _) => None, // skip tags
_ => {
if partition_predicate.should_include_field(column_id)
|| partition_predicate.is_time_column(column_id)
if chunk_predicate.should_include_field(column_id)
|| chunk_predicate.is_time_column(column_id)
{
let column_name = partition
let column_name = chunk
.dictionary
.lookup_id(column_id)
.expect("Find column name in dictionary");
@ -834,28 +785,26 @@ impl Table {
}
/// Converts this table to an arrow record batch.
pub fn to_arrow(
&self,
partition: &Partition,
requested_columns: &[&str],
) -> Result<RecordBatch> {
pub fn to_arrow(&self, chunk: &Chunk, requested_columns: &[&str]) -> Result<RecordBatch> {
// if requested columns is empty, retrieve all columns in the table
if requested_columns.is_empty() {
self.all_to_arrow(partition)
self.all_to_arrow(chunk)
} else {
let columns_with_index = self.column_names_with_index(partition, requested_columns)?;
let columns_with_index = self.column_names_with_index(chunk, requested_columns)?;
self.to_arrow_impl(partition, &columns_with_index)
self.to_arrow_impl(chunk, &columns_with_index)
}
}
fn column_index(&self, partition: &Partition, column_name: &str) -> Result<usize> {
let column_id = partition.dictionary.lookup_value(column_name).context(
ColumnNameNotFoundInDictionary {
column_name,
partition: &partition.key,
},
)?;
fn column_index(&self, chunk: &Chunk, column_name: &str) -> Result<usize> {
let column_id =
chunk
.dictionary
.lookup_value(column_name)
.context(ColumnNameNotFoundInDictionary {
column_name,
chunk: &chunk.key,
})?;
self.column_id_to_index
.get(&column_id)
@ -869,13 +818,13 @@ impl Table {
/// Returns (name, index) pairs for all named columns
fn column_names_with_index<'a>(
&self,
partition: &Partition,
chunk: &Chunk,
columns: &[&'a str],
) -> Result<Vec<(&'a str, usize)>> {
columns
.iter()
.map(|&column_name| {
let column_index = self.column_index(partition, column_name)?;
let column_index = self.column_index(chunk, column_name)?;
Ok((column_name, column_index))
})
@ -883,15 +832,15 @@ impl Table {
}
/// Convert all columns to an arrow record batch
pub fn all_to_arrow(&self, partition: &Partition) -> Result<RecordBatch> {
pub fn all_to_arrow(&self, chunk: &Chunk) -> Result<RecordBatch> {
let mut requested_columns_with_index = self
.column_id_to_index
.iter()
.map(|(&column_id, &column_index)| {
let column_name = partition.dictionary.lookup_id(column_id).context(
let column_name = chunk.dictionary.lookup_id(column_id).context(
ColumnIdNotFoundInDictionary {
column_id,
partition: &partition.key,
chunk: &chunk.key,
},
)?;
Ok((column_name, column_index))
@ -900,7 +849,7 @@ impl Table {
requested_columns_with_index.sort_by(|(a, _), (b, _)| a.cmp(b));
self.to_arrow_impl(partition, &requested_columns_with_index)
self.to_arrow_impl(chunk, &requested_columns_with_index)
}
/// Converts this table to an arrow record batch,
@ -908,7 +857,7 @@ impl Table {
/// requested columns with index are tuples of column_name, column_index
pub fn to_arrow_impl(
&self,
partition: &Partition,
chunk: &Chunk,
requested_columns_with_index: &[(&str, usize)],
) -> Result<RecordBatch> {
let mut fields = Vec::with_capacity(requested_columns_with_index.len());
@ -938,10 +887,10 @@ impl Table {
match v {
None => builder.append_null(),
Some(value_id) => {
let tag_value = partition.dictionary.lookup_id(*value_id).context(
let tag_value = chunk.dictionary.lookup_id(*value_id).context(
TagValueIdNotFoundInDictionary {
value: *value_id,
partition: &partition.key,
chunk: &chunk.key,
},
)?;
builder.append_value(tag_value)
@ -997,14 +946,12 @@ impl Table {
/// just that the entire table can not be ruled out.
///
/// false means that no rows in this table could possibly match
pub fn could_match_predicate(&self, partition_predicate: &PartitionPredicate) -> Result<bool> {
pub fn could_match_predicate(&self, chunk_predicate: &ChunkPredicate) -> Result<bool> {
Ok(
self.matches_column_name_predicate(partition_predicate.field_name_predicate.as_ref())
&& self.matches_table_name_predicate(
partition_predicate.table_name_predicate.as_ref(),
)
&& self.matches_timestamp_predicate(partition_predicate)?
&& self.has_columns(partition_predicate.required_columns.as_ref()),
self.matches_column_name_predicate(chunk_predicate.field_name_predicate.as_ref())
&& self.matches_table_name_predicate(chunk_predicate.table_name_predicate.as_ref())
&& self.matches_timestamp_predicate(chunk_predicate)?
&& self.has_columns(chunk_predicate.required_columns.as_ref()),
)
}
@ -1032,14 +979,11 @@ impl Table {
/// returns true if there are any timestamps in this table that
/// fall within the timestamp range
fn matches_timestamp_predicate(
&self,
partition_predicate: &PartitionPredicate,
) -> Result<bool> {
match &partition_predicate.range {
fn matches_timestamp_predicate(&self, chunk_predicate: &ChunkPredicate) -> Result<bool> {
match &chunk_predicate.range {
None => Ok(true),
Some(range) => {
let time_column_id = partition_predicate.time_column_id;
let time_column_id = chunk_predicate.time_column_id;
let time_column = self.column(time_column_id)?;
time_column.has_i64_range(range.start, range.end).context(
ColumnPredicateEvaluation {
@ -1052,11 +996,11 @@ impl Table {
/// returns true if no columns are specified, or the table has all
/// columns specified
fn has_columns(&self, columns: Option<&PartitionIdSet>) -> bool {
fn has_columns(&self, columns: Option<&ChunkIdSet>) -> bool {
if let Some(columns) = columns {
match columns {
PartitionIdSet::AtLeastOneMissing => return false,
PartitionIdSet::Present(symbols) => {
ChunkIdSet::AtLeastOneMissing => return false,
ChunkIdSet::Present(symbols) => {
for symbol in symbols {
if !self.column_id_to_index.contains_key(symbol) {
return false;
@ -1073,12 +1017,12 @@ impl Table {
pub fn column_matches_predicate<T>(
&self,
column: &[Option<T>],
partition_predicate: &PartitionPredicate,
chunk_predicate: &ChunkPredicate,
) -> Result<bool> {
match partition_predicate.range {
match chunk_predicate.range {
None => Ok(true),
Some(range) => {
let time_column_id = partition_predicate.time_column_id;
let time_column_id = chunk_predicate.time_column_id;
let time_column = self.column(time_column_id)?;
time_column
.has_non_null_i64_range(column, range.start, range.end)
@ -1329,8 +1273,8 @@ mod tests {
#[test]
fn test_has_columns() {
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
@ -1345,34 +1289,34 @@ mod tests {
assert!(table.has_columns(None));
let pred = PartitionIdSet::AtLeastOneMissing;
let pred = ChunkIdSet::AtLeastOneMissing;
assert!(!table.has_columns(Some(&pred)));
let set = BTreeSet::<u32>::new();
let pred = PartitionIdSet::Present(set);
let pred = ChunkIdSet::Present(set);
assert!(table.has_columns(Some(&pred)));
let mut set = BTreeSet::new();
set.insert(state_symbol);
let pred = PartitionIdSet::Present(set);
let pred = ChunkIdSet::Present(set);
assert!(table.has_columns(Some(&pred)));
let mut set = BTreeSet::new();
set.insert(new_symbol);
let pred = PartitionIdSet::Present(set);
let pred = ChunkIdSet::Present(set);
assert!(!table.has_columns(Some(&pred)));
let mut set = BTreeSet::new();
set.insert(state_symbol);
set.insert(new_symbol);
let pred = PartitionIdSet::Present(set);
let pred = ChunkIdSet::Present(set);
assert!(!table.has_columns(Some(&pred)));
}
#[test]
fn test_matches_table_name_predicate() {
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("h2o"));
let lp_lines = vec![
@ -1401,8 +1345,8 @@ mod tests {
#[test]
fn test_matches_column_name_predicate() {
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("h2o"));
let lp_lines = vec![
@ -1447,8 +1391,8 @@ mod tests {
#[tokio::test]
async fn test_series_set_plan() {
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
@ -1461,9 +1405,9 @@ mod tests {
write_lines_to_table(&mut table, dictionary, lp_lines);
let predicate = PredicateBuilder::default().build();
let partition_predicate = partition.compile_predicate(&predicate).unwrap();
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let series_set_plan = table
.series_set_plan(&partition_predicate, &partition)
.series_set_plan(&chunk_predicate, &chunk)
.expect("creating the series set plan");
assert_eq!(series_set_plan.table_name.as_ref(), "table_name");
@ -1494,8 +1438,8 @@ mod tests {
// test that the columns and rows come out in the right order (tags then
// timestamp)
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
@ -1509,9 +1453,9 @@ mod tests {
write_lines_to_table(&mut table, dictionary, lp_lines);
let predicate = PredicateBuilder::default().build();
let partition_predicate = partition.compile_predicate(&predicate).unwrap();
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let series_set_plan = table
.series_set_plan(&partition_predicate, &partition)
.series_set_plan(&chunk_predicate, &chunk)
.expect("creating the series set plan");
assert_eq!(series_set_plan.table_name.as_ref(), "table_name");
@ -1543,8 +1487,8 @@ mod tests {
async fn test_series_set_plan_filter() {
// test that filters are applied reasonably
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
@ -1561,10 +1505,10 @@ mod tests {
.timestamp_range(190, 210)
.build();
let partition_predicate = partition.compile_predicate(&predicate).unwrap();
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let series_set_plan = table
.series_set_plan(&partition_predicate, &partition)
.series_set_plan(&chunk_predicate, &chunk)
.expect("creating the series set plan");
assert_eq!(series_set_plan.table_name.as_ref(), "table_name");
@ -1932,8 +1876,8 @@ mod tests {
#[tokio::test]
async fn test_grouped_window_series_set_plan_nanoseconds() {
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
@ -1961,14 +1905,14 @@ mod tests {
.add_expr(col("city").eq(lit("Boston")).or(col("city").eq(lit("LA"))))
.timestamp_range(100, 450)
.build();
let partition_predicate = partition.compile_predicate(&predicate).unwrap();
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let agg = Aggregate::Mean;
let every = WindowDuration::from_nanoseconds(200);
let offset = WindowDuration::from_nanoseconds(0);
let plan = table
.window_grouped_series_set_plan(&partition_predicate, agg, &every, &offset, &partition)
.window_grouped_series_set_plan(&chunk_predicate, agg, &every, &offset, &chunk)
.expect("creating the grouped_series set plan");
assert_eq!(plan.tag_columns, *str_vec_to_arc_vec(&["city", "state"]));
@ -1996,8 +1940,8 @@ mod tests {
#[tokio::test]
async fn test_grouped_window_series_set_plan_months() {
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
@ -2010,14 +1954,14 @@ mod tests {
write_lines_to_table(&mut table, dictionary, lp_lines);
let predicate = PredicateBuilder::default().build();
let partition_predicate = partition.compile_predicate(&predicate).unwrap();
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let agg = Aggregate::Mean;
let every = WindowDuration::from_months(1, false);
let offset = WindowDuration::from_months(0, false);
let plan = table
.window_grouped_series_set_plan(&partition_predicate, agg, &every, &offset, &partition)
.window_grouped_series_set_plan(&chunk_predicate, agg, &every, &offset, &chunk)
.expect("creating the grouped_series set plan");
assert_eq!(plan.tag_columns, *str_vec_to_arc_vec(&["city", "state"]));
@ -2041,8 +1985,8 @@ mod tests {
#[tokio::test]
async fn test_field_name_plan() {
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
@ -2058,10 +2002,10 @@ mod tests {
let predicate = PredicateBuilder::default().timestamp_range(0, 200).build();
let partition_predicate = partition.compile_predicate(&predicate).unwrap();
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let field_names_set_plan = table
.field_names_plan(&partition_predicate, &partition)
.field_names_plan(&chunk_predicate, &chunk)
.expect("creating the field_name plan");
// run the created plan, ensuring the output is as expected
@ -2189,7 +2133,7 @@ mod tests {
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
let data = split_lines_into_write_entry_partitions(partition_key_func, &lines);
let data = split_lines_into_write_entry_partitions(chunk_key_func, &lines);
let batch = flatbuffers::get_root::<wb::WriteBufferBatch<'_>>(&data);
let entries = batch.entries().expect("at least one entry");
@ -2205,25 +2149,25 @@ mod tests {
}
}
fn partition_key_func(_: &ParsedLine<'_>) -> String {
String::from("the_partition_key")
fn chunk_key_func(_: &ParsedLine<'_>) -> String {
String::from("the_chunk_key")
}
/// Pre-loaded Table for use in tests
struct TableFixture {
partition: Partition,
chunk: Chunk,
table: Table,
}
impl TableFixture {
/// Create a Table with the specified lines loaded
fn new(lp_lines: Vec<&str>) -> Self {
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut chunk = Chunk::new("dummy_chunk_key");
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
write_lines_to_table(&mut table, dictionary, lp_lines);
Self { partition, table }
Self { chunk, table }
}
/// create a series set plan from the predicate and aggregates
@ -2234,13 +2178,13 @@ mod tests {
agg: Aggregate,
group_columns: &[&str],
) -> Vec<String> {
let partition_predicate = self.partition.compile_predicate(&predicate).unwrap();
let chunk_predicate = self.chunk.compile_predicate(&predicate).unwrap();
let group_columns: Vec<_> = group_columns.iter().map(|s| String::from(*s)).collect();
let grouped_series_set_plan = self
.table
.grouped_series_set_plan(&partition_predicate, agg, &group_columns, &self.partition)
.grouped_series_set_plan(&chunk_predicate, agg, &group_columns, &self.chunk)
.expect("creating the grouped_series set plan");
// ensure the group prefix got to the right place

View File

@ -46,7 +46,7 @@ use self::predicate::{Predicate, TimestampRange};
#[async_trait]
pub trait TSDatabase: Debug + Send + Sync {
/// the type of partition that is stored by this database
type Partition: Send + Sync + 'static + PartitionChunk;
type Chunk: Send + Sync + 'static + PartitionChunk;
type Error: std::error::Error + Send + Sync + 'static;
/// writes parsed lines into this database
@ -109,7 +109,7 @@ pub trait TSDatabase: Debug + Send + Sync {
#[async_trait]
pub trait SQLDatabase: Debug + Send + Sync {
/// the type of partition that is stored by this database
type Partition: Send + Sync + 'static + PartitionChunk;
type Chunk: Send + Sync + 'static + PartitionChunk;
type Error: std::error::Error + Send + Sync + 'static;
/// Execute the specified query and return arrow record batches with the
@ -132,12 +132,6 @@ pub trait SQLDatabase: Debug + Send + Sync {
&self,
partition_key: &str,
) -> Result<Vec<String>, Self::Error>;
/// Removes the partition from the database and returns it
async fn remove_partition(
&self,
partition_key: &str,
) -> Result<Arc<Self::Partition>, Self::Error>;
}
/// Collection of data that shares the same partition key
@ -150,12 +144,13 @@ pub trait PartitionChunk: Debug + Send + Sync {
/// returns the partition metadata stats for every table in the partition
fn table_stats(&self) -> Result<Vec<TableStats>, Self::Error>;
/// converts the table to an Arrow RecordBatch
/// converts the table to an Arrow RecordBatch and writes it to dst
fn table_to_arrow(
&self,
dst: &mut Vec<RecordBatch>,
table_name: &str,
columns: &[&str],
) -> Result<RecordBatch, Self::Error>;
) -> Result<(), Self::Error>;
}
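
For reference, a minimal caller-side sketch of the new signature, which appends batches into a caller-owned buffer instead of returning a single RecordBatch (the chunk value, table name, and empty column list are illustrative only):

    // Collect the Arrow output for one table into a caller-owned buffer.
    let mut batches: Vec<RecordBatch> = Vec::new();
    chunk.table_to_arrow(&mut batches, "cpu", &[])?;
    // The caller decides how to combine the batches; the snapshot code later
    // in this change feeds them all to a single Parquet writer.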
#[async_trait]

View File

@ -62,6 +62,9 @@ pub struct Predicate {
/// Optional timestamp range: only rows within this range are included in
/// results. Other rows are excluded
pub range: Option<TimestampRange>,
/// Optional partition key filter
pub partition_key: Option<String>,
}
impl Predicate {
@ -157,6 +160,16 @@ impl PredicateBuilder {
self
}
/// Set the partition key restriction
pub fn partition_key(mut self, partition_key: impl Into<String>) -> Self {
assert!(
self.inner.partition_key.is_none(),
"multiple partition key predicates not suported"
);
self.inner.partition_key = Some(partition_key.into());
self
}
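
A minimal usage sketch of the new builder method (the key string and time range are illustrative only, not taken from this change):

    // Restrict a query to a single partition in addition to a time range.
    let predicate = PredicateBuilder::default()
        .timestamp_range(100, 450)
        .partition_key("2020-12-01T00")
        .build();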
/// Create a predicate, consuming this builder
pub fn build(self) -> Predicate {
self.inner

View File

@ -221,6 +221,7 @@ fn predicate_to_test_string(predicate: &Predicate) -> String {
field_columns,
exprs,
range,
partition_key,
} = predicate;
let mut result = String::new();
@ -242,13 +243,17 @@ fn predicate_to_test_string(predicate: &Predicate) -> String {
write!(result, " range: {:?}", range).unwrap();
}
if let Some(partition_key) = partition_key {
write!(result, " partition_key: {:?}", partition_key).unwrap();
}
write!(result, "}}").unwrap();
result
}
#[async_trait]
impl TSDatabase for TestDatabase {
type Partition = TestPartition;
type Chunk = TestChunk;
type Error = TestError;
/// Writes parsed lines into this database
@ -402,7 +407,7 @@ impl TSDatabase for TestDatabase {
#[async_trait]
impl SQLDatabase for TestDatabase {
type Partition = TestPartition;
type Chunk = TestChunk;
type Error = TestError;
/// Execute the specified query and return arrow record batches with the
@ -424,13 +429,6 @@ impl SQLDatabase for TestDatabase {
unimplemented!("table_names_for_partition not yet implemented for test database");
}
async fn remove_partition(
&self,
_partition_key: &str,
) -> Result<Arc<Self::Partition>, Self::Error> {
unimplemented!()
}
/// Fetch the specified table names and columns as Arrow
/// RecordBatches. Columns are returned in the order specified.
async fn table_to_arrow(
@ -443,9 +441,9 @@ impl SQLDatabase for TestDatabase {
}
#[derive(Debug)]
pub struct TestPartition {}
pub struct TestChunk {}
impl PartitionChunk for TestPartition {
impl PartitionChunk for TestChunk {
type Error = TestError;
fn key(&self) -> &str {
@ -458,9 +456,10 @@ impl PartitionChunk for TestPartition {
fn table_to_arrow(
&self,
_dst: &mut Vec<RecordBatch>,
_table_name: &str,
_columns: &[&str],
) -> Result<RecordBatch, Self::Error> {
) -> Result<(), Self::Error> {
unimplemented!()
}
}

View File

@ -1,4 +1,4 @@
//! This module contains code for snapshotting a database partition to Parquet
//! This module contains code for snapshotting a database chunk to Parquet
//! files in object storage.
use arrow_deps::{
arrow::record_batch::RecordBatch,
@ -144,15 +144,14 @@ where
async fn run(&self, notify: Option<oneshot::Sender<()>>) -> Result<()> {
while let Some((pos, table_name)) = self.next_table() {
let batch = self
.partition
.table_to_arrow(table_name, &[])
let mut batches = Vec::new();
self.partition
.table_to_arrow(&mut batches, table_name, &[])
.map_err(|e| Box::new(e) as _)
.context(PartitionError)?;
let file_name = format!("{}/{}.parquet", &self.data_path, table_name);
self.write_batch(batch, &file_name).await?;
self.write_batches(batches, &file_name).await?;
self.mark_table_finished(pos);
if self.should_stop() {
@ -186,12 +185,14 @@ where
Ok(())
}
async fn write_batch(&self, batch: RecordBatch, file_name: &str) -> Result<()> {
async fn write_batches(&self, batches: Vec<RecordBatch>, file_name: &str) -> Result<()> {
let mem_writer = MemWriter::default();
{
let mut writer = ArrowWriter::try_new(mem_writer.clone(), batch.schema(), None)
let mut writer = ArrowWriter::try_new(mem_writer.clone(), batches[0].schema(), None)
.context(OpeningParquetWriter)?;
writer.write(&batch).context(WritingParquetToMemory)?;
for batch in batches.into_iter() {
writer.write(&batch).context(WritingParquetToMemory)?;
}
writer.close().context(ClosingParquetWriter)?;
} // drop the reference to the MemWriter that the SerializedFileWriter has
@ -234,7 +235,7 @@ pub struct Status {
error: Option<Error>,
}
pub fn snapshot_partition<T>(
pub fn snapshot_chunk<T>(
metadata_path: impl Into<String>,
data_path: impl Into<String>,
store: Arc<ObjectStore>,
@ -325,7 +326,7 @@ mod tests {
use data_types::database_rules::DatabaseRules;
use futures::TryStreamExt;
use influxdb_line_protocol::parse_lines;
use mutable_buffer::partition::Partition as PartitionWB;
use mutable_buffer::chunk::Chunk as ChunkWB;
use object_store::InMemory;
#[tokio::test]
@ -339,23 +340,23 @@ mem,host=A,region=west used=45 1
let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();
let write = lines_to_replicated_write(1, 1, &lines, &DatabaseRules::default());
let mut partition = PartitionWB::new("testaroo");
let mut chunk = ChunkWB::new("testaroo");
for e in write.write_buffer_batch().unwrap().entries().unwrap() {
partition.write_entry(&e).unwrap();
chunk.write_entry(&e).unwrap();
}
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let partition = Arc::new(partition);
let chunk = Arc::new(chunk);
let (tx, rx) = tokio::sync::oneshot::channel();
let metadata_path = "/meta";
let data_path = "/data";
let snapshot = snapshot_partition(
let snapshot = snapshot_chunk(
metadata_path,
data_path,
store.clone(),
partition.clone(),
chunk.clone(),
Some(tx),
)
.unwrap();
@ -393,16 +394,16 @@ mem,host=A,region=west used=45 1
];
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let partition = Arc::new(PartitionWB::new("testaroo"));
let chunk = Arc::new(ChunkWB::new("testaroo"));
let metadata_path = "/meta".to_string();
let data_path = "/data".to_string();
let snapshot = Snapshot::new(
partition.key.clone(),
chunk.key.clone(),
metadata_path,
data_path,
store,
partition,
chunk,
tables,
);

View File

@ -497,7 +497,7 @@ async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(
struct SnapshotInfo {
org: String,
bucket: String,
partition: String,
chunk: String,
}
#[tracing::instrument(level = "debug")]
@ -542,13 +542,13 @@ async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static
})?;
let metadata_path = format!("{}/meta", &db_name);
let data_path = format!("{}/data/{}", &db_name, &snapshot.partition);
let partition = db.remove_partition(&snapshot.partition).await.unwrap();
let snapshot = server::snapshot::snapshot_partition(
let data_path = format!("{}/data/{}", &db_name, &snapshot.chunk);
let partition = db.rollover_partition(&snapshot.chunk).await.unwrap();
let snapshot = server::snapshot::snapshot_chunk(
metadata_path,
data_path,
server.store.clone(),
Arc::new(partition),
partition,
None,
)
.unwrap();

View File

@ -8,7 +8,11 @@
//! # wal
//!
//! This crate provides a WAL tailored for InfluxDB IOx `Partition`s
//! This crate provides a local-disk based WAL tailored for InfluxDB
//! IOx `Partition`s.
//!
//! It is not currently connected to anything, but the intent is to
//! give IOx better durability when running in standalone mode.
//!
//! Work remaining:
//!