diff --git a/Cargo.lock b/Cargo.lock index 9f2da24fe7..7a6f549bea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1804,7 +1804,6 @@ dependencies = [ "test_helpers", "tokio", "tracing", - "wal", ] [[package]] diff --git a/mutable_buffer/Cargo.toml b/mutable_buffer/Cargo.toml index 26c5641489..821e40714b 100644 --- a/mutable_buffer/Cargo.toml +++ b/mutable_buffer/Cargo.toml @@ -11,7 +11,6 @@ data_types = { path = "../data_types" } generated_types = { path = "../generated_types" } influxdb_line_protocol = { path = "../influxdb_line_protocol" } query = { path = "../query" } -wal = { path = "../wal" } test_helpers = { path = "../test_helpers" } async-trait = "0.1" @@ -26,7 +25,3 @@ tracing = "0.1" [dev-dependencies] test_helpers = { path = "../test_helpers" } criterion = "0.3" - -[[bench]] -name = "benchmark" -harness = false diff --git a/mutable_buffer/benches/benchmark.rs b/mutable_buffer/benches/benchmark.rs deleted file mode 100644 index 97545d9faa..0000000000 --- a/mutable_buffer/benches/benchmark.rs +++ /dev/null @@ -1,114 +0,0 @@ -use criterion::{criterion_group, criterion_main, Criterion, Throughput}; -use influxdb_line_protocol as line_parser; -use mutable_buffer::{restore_partitions_from_wal, MutableBufferDb}; -use query::TSDatabase; -use wal::{Entry, WalBuilder}; - -type Error = Box; -type Result = std::result::Result; - -const DAY_1_NS: i64 = 1_600_000_000_000_000_000; -const DAY_2_NS: i64 = 1_500_000_000_000_000_000; -const DAY_3_NS: i64 = 1_400_000_000_000_000_000; -const DAYS_NS: &[i64] = &[DAY_1_NS, DAY_2_NS, DAY_3_NS]; - -fn benchmark_restore_single_entry_single_partition(common_create_entries: &mut Criterion) { - let (entries, line_count) = - generate_single_entry_single_partition().expect("Unable to create benchmarking entries"); - assert_eq!(entries.len(), 1); - assert_eq!(line_count, 1000); - - let mut group = common_create_entries.benchmark_group("wal-restoration"); - group.throughput(Throughput::Elements(line_count as u64)); - group.bench_function("restore_single_entry_single_partition", |b| { - b.iter(|| { - let entries = entries.clone().into_iter().map(Ok); - let (partitions, _stats) = restore_partitions_from_wal(entries).unwrap(); - assert_eq!(partitions.len(), 1); - }) - }); - group.finish(); -} - -#[tokio::main] -async fn generate_single_entry_single_partition() -> Result<(Vec, usize)> { - common_create_entries(|add_entry| { - add_entry(create_line_protocol(1000, DAY_1_NS)); - }) - .await -} - -fn benchmark_restore_multiple_entry_multiple_partition(common_create_entries: &mut Criterion) { - let (entries, line_count) = generate_multiple_entry_multiple_partition() - .expect("Unable to create benchmarking entries"); - assert_eq!(entries.len(), 12); - assert_eq!(line_count, 12000); - - let mut group = common_create_entries.benchmark_group("wal-restoration"); - group.throughput(Throughput::Elements(line_count as u64)); - group.bench_function("restore_multiple_entry_multiple_partition", |b| { - b.iter(|| { - let entries = entries.clone().into_iter().map(Ok); - let (partitions, _stats) = restore_partitions_from_wal(entries).unwrap(); - assert_eq!(partitions.len(), 3); - }) - }); - group.finish(); -} - -#[tokio::main] -async fn generate_multiple_entry_multiple_partition() -> Result<(Vec, usize)> { - common_create_entries(|add_entry| { - for &day_ns in DAYS_NS { - for _ in 0..4 { - add_entry(create_line_protocol(1000, day_ns)) - } - } - }) - .await -} - -async fn common_create_entries( - mut f: impl FnMut(&mut dyn FnMut(String)), -) -> Result<(Vec, usize)> { - let tmp_dir = test_helpers::tmp_dir()?; - let mut wal_dir = tmp_dir.as_ref().to_owned(); - let db = MutableBufferDb::try_with_wal("mydb", &mut wal_dir).await?; - - let mut lp_entries = Vec::new(); - f(&mut |entry| lp_entries.push(entry)); - - let mut total_lines = 0; - for lp_entry in lp_entries { - let lines: Vec<_> = line_parser::parse_lines(&lp_entry).collect::>()?; - db.write_lines(&lines).await?; - total_lines += lines.len(); - } - - let wal_builder = WalBuilder::new(wal_dir); - let entries = wal_builder.entries()?.collect::>()?; - - Ok((entries, total_lines)) -} - -fn create_line_protocol(entries: usize, timestamp_ns: i64) -> String { - use std::fmt::Write; - - let mut s = String::new(); - for _ in 0..entries { - writeln!( - s, - "processes,host=my.awesome.computer.example.com blocked=0i,zombies=0i,stopped=0i,running=42i,sleeping=999i,total=1041i,unknown=0i,idle=0i {timestamp}", - timestamp = timestamp_ns, - ).unwrap(); - } - s -} - -criterion_group!( - benches, - benchmark_restore_single_entry_single_partition, - benchmark_restore_multiple_entry_multiple_partition, -); - -criterion_main!(benches); diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs new file mode 100644 index 0000000000..745644def1 --- /dev/null +++ b/mutable_buffer/src/chunk.rs @@ -0,0 +1,465 @@ +//! Represents a Chunk of data (a collection of tables and their data within +//! some chunk) in the mutable store. +use arrow_deps::{ + arrow::record_batch::RecordBatch, + datafusion::{ + logical_plan::Expr, logical_plan::Operator, optimizer::utils::expr_to_column_names, + prelude::*, + }, +}; +use generated_types::wal as wb; +use std::collections::{BTreeSet, HashMap, HashSet}; + +use data_types::{partition_metadata::Table as TableStats, TIME_COLUMN_NAME}; +use query::{ + predicate::{Predicate, TimestampRange}, + util::{visit_expression, AndExprBuilder, ExpressionVisitor}, +}; + +use crate::dictionary::{Dictionary, Error as DictionaryError}; +use crate::table::Table; + +use snafu::{OptionExt, ResultExt, Snafu}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Error writing table '{}': {}", table_name, source))] + TableWrite { + table_name: String, + source: crate::table::Error, + }, + + #[snafu(display("Table Error in '{}': {}", table_name, source))] + NamedTableError { + table_name: String, + source: crate::table::Error, + }, + + #[snafu(display("Table name {} not found in dictionary of chunk {}", table, chunk))] + TableNameNotFoundInDictionary { + table: String, + chunk: String, + source: crate::dictionary::Error, + }, + + #[snafu(display("Table ID {} not found in dictionary of chunk {}", table, chunk))] + TableIdNotFoundInDictionary { + table: u32, + chunk: String, + source: DictionaryError, + }, + + #[snafu(display("Table {} not found in chunk {}", table, chunk))] + TableNotFoundInChunk { table: u32, chunk: String }, + + #[snafu(display("Attempt to write table batch without a name"))] + TableWriteWithoutName, +} + +pub type Result = std::result::Result; + +#[derive(Debug)] +pub struct Chunk { + /// Chunk key for all rows in this the chunk + pub key: String, + + /// `dictionary` maps &str -> u32. The u32s are used in place of String or + /// str to avoid slow string operations. The same dictionary is used for + /// table names, tag names, tag values, and column names. + // TODO: intern string field values too? + pub dictionary: Dictionary, + + /// map of the dictionary ID for the table name to the table + pub tables: HashMap, +} + +/// Describes the result of translating a set of strings into +/// chunk specific ids +#[derive(Debug, PartialEq, Eq)] +pub enum ChunkIdSet { + /// At least one of the strings was not present in the chunks' + /// dictionary. + /// + /// This is important when testing for the presence of all ids in + /// a set, as we know they can not all be present + AtLeastOneMissing, + + /// All strings existed in this chunk's dictionary + Present(BTreeSet), +} + +/// a 'Compiled' set of predicates / filters that can be evaluated on +/// this chunk (where strings have been translated to chunk +/// specific u32 ids) +#[derive(Debug)] +pub struct ChunkPredicate { + /// If present, restrict the request to just those tables whose + /// names are in table_names. If present but empty, means there + /// was a predicate but no tables named that way exist in the + /// chunk (so no table can pass) + pub table_name_predicate: Option>, + + /// Optional column restriction. If present, further + /// restrict any field columns returned to only those named, and + /// skip tables entirely when querying metadata that do not have + /// *any* of the fields + pub field_name_predicate: Option>, + + /// General DataFusion expressions (arbitrary predicates) applied + /// as a filter using logical conjuction (aka are 'AND'ed + /// together). Only rows that evaluate to TRUE for all these + /// expressions should be returned. + pub chunk_exprs: Vec, + + /// If Some, then the table must contain all columns specified + /// to pass the predicate + pub required_columns: Option, + + /// The id of the "time" column in this chunk + pub time_column_id: u32, + + /// Timestamp range: only rows within this range should be considered + pub range: Option, +} + +impl ChunkPredicate { + /// Creates and adds a datafuson predicate representing the + /// combination of predicate and timestamp. + pub fn filter_expr(&self) -> Option { + // build up a list of expressions + let mut builder = + AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr()); + + for expr in &self.chunk_exprs { + builder = builder.append_expr(expr.clone()); + } + + builder.build() + } + + /// For plans which select a subset of fields, returns true if + /// the field should be included in the results + pub fn should_include_field(&self, field_id: u32) -> bool { + match &self.field_name_predicate { + None => true, + Some(field_restriction) => field_restriction.contains(&field_id), + } + } + + /// Return true if this column is the time column + pub fn is_time_column(&self, id: u32) -> bool { + self.time_column_id == id + } + + /// Creates a DataFusion predicate for appliying a timestamp range: + /// + /// range.start <= time and time < range.end` + fn make_timestamp_predicate_expr(&self) -> Option { + self.range.map(|range| make_range_expr(&range)) + } +} + +/// Creates expression like: +/// range.low <= time && time < range.high +fn make_range_expr(range: &TimestampRange) -> Expr { + let ts_low = lit(range.start).lt_eq(col(TIME_COLUMN_NAME)); + let ts_high = col(TIME_COLUMN_NAME).lt(lit(range.end)); + + ts_low.and(ts_high) +} + +impl Chunk { + pub fn new(key: impl Into) -> Self { + Self { + key: key.into(), + dictionary: Dictionary::new(), + tables: HashMap::new(), + } + } + + pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> { + if let Some(table_batches) = entry.table_batches() { + for batch in table_batches { + self.write_table_batch(&batch)?; + } + } + + Ok(()) + } + + fn write_table_batch(&mut self, batch: &wb::TableWriteBatch<'_>) -> Result<()> { + let table_name = batch.name().context(TableWriteWithoutName)?; + let table_id = self.dictionary.lookup_value_or_insert(table_name); + + let table = self + .tables + .entry(table_id) + .or_insert_with(|| Table::new(table_id)); + + if let Some(rows) = batch.rows() { + table + .append_rows(&mut self.dictionary, &rows) + .context(TableWrite { table_name })?; + } + + Ok(()) + } + + /// Translates `predicate` into per-chunk ids that can be + /// directly evaluated against tables in this chunk + pub fn compile_predicate(&self, predicate: &Predicate) -> Result { + let table_name_predicate = self.compile_string_list(predicate.table_names.as_ref()); + + let field_restriction = self.compile_string_list(predicate.field_columns.as_ref()); + + let time_column_id = self + .dictionary + .lookup_value(TIME_COLUMN_NAME) + .expect("time is in the chunk dictionary"); + + let range = predicate.range; + + // it would be nice to avoid cloning all the exprs here. + let chunk_exprs = predicate.exprs.clone(); + + // In order to evaluate expressions in the table, all columns + // referenced in the expression must appear (I think, not sure + // about NOT, etc so panic if we see one of those); + let mut visitor = SupportVisitor {}; + let mut predicate_columns: HashSet = HashSet::new(); + for expr in &chunk_exprs { + visit_expression(expr, &mut visitor); + expr_to_column_names(&expr, &mut predicate_columns).unwrap(); + } + + // if there are any column references in the expression, ensure they appear in + // any table + let required_columns = if predicate_columns.is_empty() { + None + } else { + Some(self.make_chunk_ids(predicate_columns.iter())) + }; + + Ok(ChunkPredicate { + table_name_predicate, + field_name_predicate: field_restriction, + chunk_exprs, + required_columns, + time_column_id, + range, + }) + } + + /// Converts a potential set of strings into a set of ids in terms + /// of this dictionary. If there are no matching Strings in the + /// chunks dictionary, those strings are ignored and a + /// (potentially empty) set is returned. + fn compile_string_list(&self, names: Option<&BTreeSet>) -> Option> { + names.map(|names| { + names + .iter() + .filter_map(|name| self.dictionary.id(name)) + .collect::>() + }) + } + + /// Adds the ids of any columns in additional_required_columns to the + /// required columns of predicate + pub fn add_required_columns_to_predicate( + &self, + additional_required_columns: &HashSet, + predicate: &mut ChunkPredicate, + ) { + for column_name in additional_required_columns { + // Once know we have missing columns, no need to try + // and figure out if these any additional columns are needed + if Some(ChunkIdSet::AtLeastOneMissing) == predicate.required_columns { + return; + } + + let column_id = self.dictionary.id(column_name); + + // Update the required colunm list + predicate.required_columns = Some(match predicate.required_columns.take() { + None => { + if let Some(column_id) = column_id { + let mut symbols = BTreeSet::new(); + symbols.insert(column_id); + ChunkIdSet::Present(symbols) + } else { + ChunkIdSet::AtLeastOneMissing + } + } + Some(ChunkIdSet::Present(mut symbols)) => { + if let Some(column_id) = column_id { + symbols.insert(column_id); + ChunkIdSet::Present(symbols) + } else { + ChunkIdSet::AtLeastOneMissing + } + } + Some(ChunkIdSet::AtLeastOneMissing) => { + unreachable!("Covered by case above while adding required columns to predicate") + } + }); + } + } + + /// returns true if data with chunk key `key` should be + /// written to this chunk, + pub fn should_write(&self, key: &str) -> bool { + self.key.starts_with(key) + } + + /// Convert the table specified in this chunk into some number of + /// record batches, appended to dst + pub fn table_to_arrow( + &self, + dst: &mut Vec, + table_name: &str, + columns: &[&str], + ) -> Result<()> { + let table = self.table(table_name)?; + + dst.push( + table + .to_arrow(&self, columns) + .context(NamedTableError { table_name })?, + ); + Ok(()) + } + + /// Returns a vec of the summary statistics of the tables in this chunk + pub fn table_stats(&self) -> Result> { + let mut stats = Vec::with_capacity(self.tables.len()); + + for (id, table) in &self.tables { + let name = self + .dictionary + .lookup_id(*id) + .context(TableIdNotFoundInDictionary { + table: *id, + chunk: &self.key, + })?; + + let columns = table.stats(); + + stats.push(TableStats { + name: name.to_string(), + columns, + }); + } + + Ok(stats) + } + + fn table(&self, table_name: &str) -> Result<&Table> { + let table_id = + self.dictionary + .lookup_value(table_name) + .context(TableNameNotFoundInDictionary { + table: table_name, + chunk: &self.key, + })?; + + let table = self.tables.get(&table_id).context(TableNotFoundInChunk { + table: table_id, + chunk: &self.key, + })?; + + Ok(table) + } + + /// Translate a bunch of strings into a set of ids relative to this + /// chunk + pub fn make_chunk_ids<'a, I>(&self, predicate_columns: I) -> ChunkIdSet + where + I: Iterator, + { + let mut symbols = BTreeSet::new(); + for column_name in predicate_columns { + if let Some(column_id) = self.dictionary.id(column_name) { + symbols.insert(column_id); + } else { + return ChunkIdSet::AtLeastOneMissing; + } + } + + ChunkIdSet::Present(symbols) + } +} + +impl query::PartitionChunk for Chunk { + type Error = Error; + + fn key(&self) -> &str { + &self.key + } + + fn table_stats(&self) -> Result, Self::Error> { + self.table_stats() + } + + fn table_to_arrow( + &self, + dst: &mut Vec, + table_name: &str, + columns: &[&str], + ) -> Result<(), Self::Error> { + self.table_to_arrow(dst, table_name, columns) + } +} + +/// Used to figure out if we know how to deal with this kind of +/// predicate in the write buffer +struct SupportVisitor {} + +impl ExpressionVisitor for SupportVisitor { + fn pre_visit(&mut self, expr: &Expr) { + match expr { + Expr::Literal(..) => {} + Expr::Column(..) => {} + Expr::BinaryExpr { op, .. } => { + match op { + Operator::Eq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq + | Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::And + | Operator::Or => {} + // Unsupported (need to think about ramifications) + Operator::NotEq | Operator::Modulus | Operator::Like | Operator::NotLike => { + panic!("Unsupported binary operator in expression: {:?}", expr) + } + } + } + _ => panic!( + "Unsupported expression in mutable_buffer database: {:?}", + expr + ), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_make_range_expr() { + // Test that the generated predicate is correct + + let range = TimestampRange::new(101, 202); + + let ts_predicate_expr = make_range_expr(&range); + let expected_string = "Int64(101) LtEq #time And #time Lt Int64(202)"; + let actual_string = format!("{:?}", ts_predicate_expr); + + assert_eq!(actual_string, expected_string); + } +} diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs index 13c8580f62..291c5b873b 100644 --- a/mutable_buffer/src/column.rs +++ b/mutable_buffer/src/column.rs @@ -27,7 +27,7 @@ pub enum Error { pub type Result = std::result::Result; #[derive(Debug)] -/// Stores the actual data for columns in a partition along with summary +/// Stores the actual data for columns in a chunk along with summary /// statistics pub enum Column { F64(Vec>, Statistics), diff --git a/mutable_buffer/src/database.rs b/mutable_buffer/src/database.rs index 1f21b424e3..2b29e9b5bc 100644 --- a/mutable_buffer/src/database.rs +++ b/mutable_buffer/src/database.rs @@ -1,6 +1,5 @@ use generated_types::wal as wb; use influxdb_line_protocol::ParsedLine; -use query::group_by::Aggregate; use query::group_by::GroupByAndAggregate; use query::group_by::WindowDuration; use query::{ @@ -8,25 +7,19 @@ use query::{ predicate::Predicate, SQLDatabase, TSDatabase, }; -use wal::{ - writer::{start_wal_sync_task, Error as WalWriterError, WalDetails}, - WalBuilder, -}; +use query::{group_by::Aggregate, predicate::PredicateBuilder}; use crate::column::Column; -use crate::partition::Partition; -use crate::{partition::PartitionPredicate, table::Table}; - -use std::io::ErrorKind; -use std::path::PathBuf; -use std::sync::Arc; -use std::{ - collections::{BTreeSet, HashSet}, - path::Path, +use crate::table::Table; +use crate::{ + chunk::{Chunk, ChunkPredicate}, + partition::Partition, }; +use std::collections::{BTreeSet, HashMap, HashSet}; +use std::sync::Arc; + use arrow_deps::{ - arrow, arrow::{datatypes::Schema as ArrowSchema, record_batch::RecordBatch}, datafusion::{ datasource::MemTable, error::DataFusionError, execution::context::ExecutionContext, @@ -36,124 +29,54 @@ use arrow_deps::{ use data_types::data::{split_lines_into_write_entry_partitions, ReplicatedWrite}; use crate::dictionary::Error as DictionaryError; -use crate::partition::restore_partitions_from_wal; use async_trait::async_trait; use chrono::{offset::TimeZone, Utc}; -use snafu::{OptionExt, ResultExt, Snafu}; +use snafu::{ResultExt, Snafu}; use sqlparser::{ ast::{SetExpr, Statement, TableFactor}, dialect::GenericDialect, parser::Parser, }; use tokio::sync::RwLock; -use tracing::info; #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Dir {:?} invalid for DB", dir))] - OpenDb { dir: PathBuf }, - - #[snafu(display("Error opening WAL for database {}: {}", database, source))] - OpeningWal { - database: String, - source: WalWriterError, - }, - - #[snafu(display("Error writing to WAL for database {}: {}", database, source))] - WritingWal { - database: String, - source: WalWriterError, - }, - - #[snafu(display("Error opening WAL for database {}: {}", database, source))] - LoadingWal { - database: String, - source: wal::Error, - }, - - #[snafu(display("Error recovering WAL for database {}: {}", database, source))] - WalRecoverError { - database: String, - source: crate::partition::Error, - }, - - #[snafu(display("Error recovering WAL for partition {} on table {}", partition, table))] - WalPartitionError { partition: String, table: String }, - - #[snafu(display("Error recovering write from WAL, column id {} not found", column_id))] - WalColumnError { column_id: u16 }, - - #[snafu(display("Error creating db dir for {}: {}", database, err))] - CreatingWalDir { - database: String, - err: std::io::Error, - }, - - #[snafu(display("Database {} doesn't exist", database))] - DatabaseNotFound { database: String }, - - #[snafu(display("Partition {} is full", partition))] - PartitionFull { partition: String }, - #[snafu(display("Error in {}: {}", source_module, source))] PassThrough { source_module: &'static str, source: Box, }, - #[snafu(display( - "Table name {} not found in dictionary of partition {}", - table, - partition - ))] + #[snafu(display("Table name {} not found in dictionary of chunk {}", table, chunk))] TableNameNotFoundInDictionary { table: String, - partition: String, + chunk: String, source: DictionaryError, }, #[snafu(display( - "Table ID {} not found in dictionary of partition {}", - table, - partition - ))] - TableIdNotFoundInDictionary { - table: u32, - partition: String, - source: DictionaryError, - }, - - #[snafu(display( - "Column name {} not found in dictionary of partition {}", + "Column name {} not found in dictionary of chunk {}", column_name, - partition + chunk ))] ColumnNameNotFoundInDictionary { column_name: String, - partition: String, + chunk: String, source: DictionaryError, }, - #[snafu(display( - "Column ID {} not found in dictionary of partition {}", - column_id, - partition - ))] + #[snafu(display("Column ID {} not found in dictionary of chunk {}", column_id, chunk))] ColumnIdNotFoundInDictionary { column_id: u32, - partition: String, + chunk: String, source: DictionaryError, }, - #[snafu(display( - "Value ID {} not found in dictionary of partition {}", - value_id, - partition - ))] + #[snafu(display("Value ID {} not found in dictionary of chunk {}", value_id, chunk))] ColumnValueIdNotFoundInDictionary { value_id: u32, - partition: String, + chunk: String, source: DictionaryError, }, @@ -163,18 +86,6 @@ pub enum Error { ))] UnsupportedColumnTypeForListingValues { column_name: String }, - #[snafu(display("Table {} not found in partition {}", table, partition))] - TableNotFoundInPartition { table: u32, partition: String }, - - #[snafu(display("Internal Error: Column {} not found", column))] - InternalColumnNotFound { column: u32 }, - - #[snafu(display("Unexpected insert error"))] - InsertError, - - #[snafu(display("arrow conversion error: {}", source))] - ArrowError { source: arrow::error::ArrowError }, - #[snafu(display("id conversion error"))] IdConversionError { source: std::num::TryFromIntError }, @@ -196,26 +107,8 @@ pub enum Error { statement: Box, }, - #[snafu(display("query error {} on query {}", message, query))] - GenericQueryError { message: String, query: String }, - #[snafu(display("replicated write from writer {} missing payload", writer))] MissingPayload { writer: u32 }, - - #[snafu(display("partition {} not found", partition_key))] - PartitionNotFound { partition_key: String }, - - #[snafu(display( - "error converting partition table to arrow on partition {} with table {}: {}", - partition_key, - table_name, - source - ))] - PartitionTableToArrowError { - partition_key: String, - table_name: String, - source: crate::partition::Error, - }, } impl From for Error { @@ -227,6 +120,15 @@ impl From for Error { } } +impl From for Error { + fn from(e: crate::chunk::Error) -> Self { + Self::PassThrough { + source_module: "Chunk", + source: Box::new(e), + } + } +} + impl From for Error { fn from(e: crate::partition::Error) -> Self { Self::PassThrough { @@ -239,11 +141,13 @@ impl From for Error { pub type Result = std::result::Result; #[derive(Debug, Default)] +/// This implements the mutable buffer. See the module doc comments +/// for more details. pub struct MutableBufferDb { pub name: String, - // TODO: partitions need to be wrapped in an Arc if they're going to be used without this lock - partitions: RwLock>, - wal_details: Option, + + /// Maps partition keys to partitions which hold the actual data + partitions: RwLock>>>, } impl MutableBufferDb { @@ -255,99 +159,17 @@ impl MutableBufferDb { } } - /// Create a new DB that will create and use the Write Ahead Log - /// (WAL) directory `wal_dir` - pub async fn try_with_wal(name: impl Into, wal_dir: &mut PathBuf) -> Result { - let name = name.into(); - wal_dir.push(&name); - if let Err(e) = std::fs::create_dir(wal_dir.clone()) { - match e.kind() { - ErrorKind::AlreadyExists => (), - _ => { - return CreatingWalDir { - database: name, - err: e, - } - .fail() - } - } - } - let wal_builder = WalBuilder::new(wal_dir.clone()); - let wal_details = start_wal_sync_task(wal_builder) - .await - .context(OpeningWal { database: &name })?; - wal_details - .write_metadata() - .await - .context(OpeningWal { database: &name })?; - - Ok(Self { - name, - wal_details: Some(wal_details), - ..Default::default() - }) - } - - /// Create a new DB and initially restore pre-existing data in the - /// Write Ahead Log (WAL) directory `wal_dir` - pub async fn restore_from_wal(wal_dir: &Path) -> Result { - let now = std::time::Instant::now(); - let name = wal_dir - .iter() - .last() - .with_context(|| OpenDb { dir: &wal_dir })? - .to_str() - .with_context(|| OpenDb { dir: &wal_dir })? - .to_string(); - - let wal_builder = WalBuilder::new(wal_dir); - let wal_details = start_wal_sync_task(wal_builder.clone()) - .await - .context(OpeningWal { database: &name })?; - - // TODO: check wal metadata format - let entries = wal_builder - .entries() - .context(LoadingWal { database: &name })?; - - let (partitions, stats) = - restore_partitions_from_wal(entries).context(WalRecoverError { database: &name })?; - - let elapsed = now.elapsed(); - info!( - "{} database loaded {} rows in {:?} in {} tables", - &name, - stats.row_count, - elapsed, - stats.tables.len(), - ); - - info!("{} database partition count: {}", &name, partitions.len(),); - - Ok(Self { - name, - partitions: RwLock::new(partitions), - wal_details: Some(wal_details), - }) - } - + /// Directs the writes from batch into the appropriate partitions async fn write_entries_to_partitions(&self, batch: &wb::WriteBufferBatch<'_>) -> Result<()> { if let Some(entries) = batch.entries() { - let mut partitions = self.partitions.write().await; - for entry in entries { let key = entry .partition_key() .expect("partition key should have been inserted"); - match partitions.iter_mut().find(|p| p.should_write(key)) { - Some(p) => p.write_entry(&entry)?, - None => { - let mut p = Partition::new(key); - p.write_entry(&entry)?; - partitions.push(p) - } - } + let partition = self.get_partition(key).await; + let mut partition = partition.write().await; + partition.write_entry(&entry)? } } @@ -355,47 +177,38 @@ impl MutableBufferDb { } async fn table_to_arrow(&self, table_name: &str, columns: &[&str]) -> Result> { - let partitions = self.partitions.read().await; - - let batches = partitions - .iter() - .map(|p| p.table_to_arrow(table_name, columns)) - .collect::, crate::partition::Error>>()?; + let mut batches = Vec::new(); + for partition in self.partition_snapshot().await.into_iter() { + let partition = partition.read().await; + for chunk in partition.iter() { + chunk.table_to_arrow(&mut batches, table_name, columns)? + } + } Ok(batches) } - pub async fn remove_partition(&self, partition_key: &str) -> Result { - let mut partitions = self.partitions.write().await; - let pos = partitions - .iter() - .position(|p| p.key == partition_key) - .context(PartitionNotFound { partition_key })?; - - Ok(partitions.remove(pos)) + pub async fn rollover_partition(&self, partition_key: &str) -> Result> { + let partition = self.get_partition(partition_key).await; + let mut partition = partition.write().await; + Ok(partition.rollover_chunk()) } } #[async_trait] impl TSDatabase for MutableBufferDb { - type Partition = crate::partition::Partition; + type Chunk = crate::chunk::Chunk; type Error = Error; // TODO: writes lines creates a column named "time" for the timestamp data. If // we keep this we need to validate that no tag or field has the same // name. async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> { - let data = split_lines_into_write_entry_partitions(partition_key, lines); + let data = split_lines_into_write_entry_partitions(compute_partition_key, lines); let batch = flatbuffers::get_root::>(&data); self.write_entries_to_partitions(&batch).await?; - if let Some(wal) = &self.wal_details { - wal.write_and_sync(data).await.context(WritingWal { - database: &self.name, - })?; - } - Ok(()) } @@ -410,66 +223,28 @@ impl TSDatabase for MutableBufferDb { } }; - if let Some(wal) = &self.wal_details { - // TODO(paul): refactor this so we're not cloning. Although replicated writes - // shouldn't be using a WAL and how the WAL is used at all is - // likely to have a larger refactor soon. - wal.write_and_sync(write.data.clone()) - .await - .context(WritingWal { - database: &self.name, - })?; - } - Ok(()) } async fn table_names(&self, predicate: Predicate) -> Result { - // TODO: Cache this information to avoid creating this each time - let partitions = self.partitions.read().await; - - let mut table_names: BTreeSet = BTreeSet::new(); - for partition in partitions.iter() { - let partition_predicate = partition.compile_predicate(&predicate)?; - // this doesn't seem to make any sense - assert!( - partition_predicate.field_name_predicate.is_none(), - "Column selection for table names not supported" - ); - - // It might make sense to ask for all table names that - // have rows that pass a general purpose predicate. I am - // not sure if it is needed now, so panic - assert!( - partition_predicate.partition_exprs.is_empty(), - "General partition exprs on table name list are not supported" - ); - - for (table_name_symbol, table) in &partition.tables { - if table.could_match_predicate(&partition_predicate)? { - let table_name = partition.dictionary.lookup_id(*table_name_symbol).unwrap(); - - if !table_names.contains(table_name) { - table_names.insert(table_name.to_string()); - } - } - } - } - Ok(table_names.into()) + let mut filter = ChunkTableFilter::new(predicate); + let mut visitor = TableNameVisitor::new(); + self.accept(&mut filter, &mut visitor).await?; + Ok(visitor.into_inner().into()) } // return all column names in this database, while applying optional predicates async fn tag_column_names(&self, predicate: Predicate) -> Result { let has_exprs = predicate.has_exprs(); - let mut filter = PartitionTableFilter::new(predicate); + let mut filter = ChunkTableFilter::new(predicate); if has_exprs { let mut visitor = NamePredVisitor::new(); - self.visit_tables(&mut filter, &mut visitor).await?; + self.accept(&mut filter, &mut visitor).await?; Ok(visitor.plans.into()) } else { let mut visitor = NameVisitor::new(); - self.visit_tables(&mut filter, &mut visitor).await?; + self.accept(&mut filter, &mut visitor).await?; Ok(visitor.column_names.into()) } } @@ -477,9 +252,9 @@ impl TSDatabase for MutableBufferDb { /// return all field names in this database, while applying optional /// predicates async fn field_column_names(&self, predicate: Predicate) -> Result { - let mut filter = PartitionTableFilter::new(predicate); + let mut filter = ChunkTableFilter::new(predicate); let mut visitor = TableFieldPredVisitor::new(); - self.visit_tables(&mut filter, &mut visitor).await?; + self.accept(&mut filter, &mut visitor).await?; Ok(visitor.into_fieldlist_plan()) } @@ -491,23 +266,23 @@ impl TSDatabase for MutableBufferDb { predicate: Predicate, ) -> Result { let has_exprs = predicate.has_exprs(); - let mut filter = PartitionTableFilter::new(predicate); + let mut filter = ChunkTableFilter::new(predicate); if has_exprs { let mut visitor = ValuePredVisitor::new(column_name); - self.visit_tables(&mut filter, &mut visitor).await?; + self.accept(&mut filter, &mut visitor).await?; Ok(visitor.plans.into()) } else { let mut visitor = ValueVisitor::new(column_name); - self.visit_tables(&mut filter, &mut visitor).await?; + self.accept(&mut filter, &mut visitor).await?; Ok(visitor.column_values.into()) } } async fn query_series(&self, predicate: Predicate) -> Result { - let mut filter = PartitionTableFilter::new(predicate); + let mut filter = ChunkTableFilter::new(predicate); let mut visitor = SeriesVisitor::new(); - self.visit_tables(&mut filter, &mut visitor).await?; + self.accept(&mut filter, &mut visitor).await?; Ok(visitor.plans.into()) } @@ -516,7 +291,7 @@ impl TSDatabase for MutableBufferDb { predicate: Predicate, gby_agg: GroupByAndAggregate, ) -> Result { - let mut filter = PartitionTableFilter::new(predicate); + let mut filter = ChunkTableFilter::new(predicate); match gby_agg { GroupByAndAggregate::Columns { agg, group_columns } => { @@ -524,12 +299,12 @@ impl TSDatabase for MutableBufferDb { // can skip tables without those tags) let mut filter = filter.add_required_columns(&group_columns); let mut visitor = GroupsVisitor::new(agg, group_columns); - self.visit_tables(&mut filter, &mut visitor).await?; + self.accept(&mut filter, &mut visitor).await?; Ok(visitor.plans.into()) } GroupByAndAggregate::Window { agg, every, offset } => { let mut visitor = WindowGroupsVisitor::new(agg, every, offset); - self.visit_tables(&mut filter, &mut visitor).await?; + self.accept(&mut filter, &mut visitor).await?; Ok(visitor.plans.into()) } } @@ -538,7 +313,7 @@ impl TSDatabase for MutableBufferDb { #[async_trait] impl SQLDatabase for MutableBufferDb { - type Partition = Partition; + type Chunk = Chunk; type Error = Error; async fn query(&self, query: &str) -> Result, Self::Error> { @@ -607,89 +382,81 @@ impl SQLDatabase for MutableBufferDb { /// Return the partition keys for data in this DB async fn partition_keys(&self) -> Result, Self::Error> { let partitions = self.partitions.read().await; - let keys = partitions.iter().map(|p| p.key.clone()).collect(); - + let keys = partitions.keys().cloned().collect(); Ok(keys) } - /// Return the table names that are in a given partition key + /// Return all table names that are in a given partition key async fn table_names_for_partition( &self, partition_key: &str, ) -> Result, Self::Error> { - let partitions = self.partitions.read().await; - let partition = partitions - .iter() - .find(|p| p.key == partition_key) - .context(PartitionNotFound { partition_key })?; - - let mut tables = Vec::with_capacity(partition.tables.len()); - - for id in partition.tables.keys() { - let name = - partition - .dictionary - .lookup_id(*id) - .context(TableIdNotFoundInDictionary { - table: *id, - partition: &partition.key, - })?; - - tables.push(name.to_string()); - } - - Ok(tables) - } - - async fn remove_partition(&self, partition_key: &str) -> Result> { - let mut partitions = self.partitions.write().await; - let pos = partitions - .iter() - .position(|p| p.key == partition_key) - .context(PartitionNotFound { partition_key })?; - - Ok(Arc::new(partitions.remove(pos))) + let predicate = PredicateBuilder::default() + .partition_key(partition_key) + .build(); + let mut filter = ChunkTableFilter::new(predicate); + let mut visitor = TableNameVisitor::new(); + self.accept(&mut filter, &mut visitor).await?; + let names = visitor.into_inner().into_iter().collect(); + Ok(names) } } /// This trait is used to implement a "Visitor" pattern for Database /// which can be used to define logic that shares a common Depth First -/// Search (DFS) traversal of the Database --> Partition --> Table --> +/// Search (DFS) traversal of the Database --> Chunk --> Table --> /// Column datastructure heirarchy. /// /// Specifically, if we had a database like the following: /// /// YesterdayPartition -/// CPU Table1 -/// Col1 -/// TodayPartition -/// CPU Table2 -/// Col2 +/// Chunk1 +/// CPU Table1 +/// Col1 +/// Chunk2 +/// CPU Table1 +/// Col2 +/// TodayPartition +/// Chunk3 +/// CPU Table3 +/// Col3 /// /// Then the methods would be invoked in the following order /// /// visitor.pre_visit_partition(YesterdayPartition) +/// visitor.pre_visit_chunk(Chunk1) /// visitor.pre_visit_table(CPU Table1) /// visitor.visit_column(Col1) /// visitor.post_visit_table(CPU Table1) -/// visitor.post_visit_partition(YesterdayPartition) -/// visitor.pre_visit_partition(TodayPartition) +/// visitor.post_visit_chunk(Chunk1) +/// visitor.pre_visit_chunk(Chunk2) /// visitor.pre_visit_table(CPU Table2) /// visitor.visit_column(Col2) /// visitor.post_visit_table(CPU Table2) -/// visitor.post_visit_partition(TodayPartition) +/// visitor.post_visit_chunk(Chunk2) +/// visitor.pre_visit_partition(TodayPartition) +/// visitor.pre_visit_chunk(Chunk3) +/// visitor.pre_visit_table(CPU Table3) +/// visitor.visit_column(Col3) +/// visitor.post_visit_table(CPU Table3) +/// visitor.post_visit_chunk(Chunk3) trait Visitor { - // called once before any column in a partition is visisted + // called once before any chunk in a partition is visisted fn pre_visit_partition(&mut self, _partition: &Partition) -> Result<()> { Ok(()) } + // called once before any column in a chunk is visisted + fn pre_visit_chunk(&mut self, _chunk: &Chunk) -> Result<()> { + Ok(()) + } + // called once before any column in a Table is visited fn pre_visit_table( &mut self, _table: &Table, - _partition: &Partition, - _filter: &mut PartitionTableFilter, + _chunk: &Chunk, + _filter: &mut ChunkTableFilter, ) -> Result<()> { Ok(()) } @@ -700,18 +467,18 @@ trait Visitor { _table: &Table, _column_id: u32, _column: &Column, - _filter: &mut PartitionTableFilter, + _filter: &mut ChunkTableFilter, ) -> Result<()> { Ok(()) } // called once after all columns in a Table are visited - fn post_visit_table(&mut self, _table: &Table, _partition: &Partition) -> Result<()> { + fn post_visit_table(&mut self, _table: &Table, _chunk: &Chunk) -> Result<()> { Ok(()) } - // called once after all columns in a partition is visited - fn post_visit_partition(&mut self, _partition: &Partition) -> Result<()> { + // called once after all columns in a chunk is visited + fn post_visit_chunk(&mut self, _chunk: &Chunk) -> Result<()> { Ok(()) } } @@ -727,56 +494,89 @@ impl MutableBufferDb { self.partitions.read().await.is_empty() } + /// Retrieve (or create) the partition for the specified partition key + async fn get_partition(&self, partition_key: &str) -> Arc> { + // until we think this code is likely to be a contention hot + // spot, simply use a write lock even when often a read lock + // would do. + let mut partitions = self.partitions.write().await; + + if let Some(partition) = partitions.get(partition_key) { + partition.clone() + } else { + let partition = Arc::new(RwLock::new(Partition::new(partition_key))); + partitions.insert(partition_key.to_string(), partition.clone()); + partition + } + } + + /// get a snapshot of all the current partitions -- useful so that + /// while doing stuff with one partition we don't prevent creating + /// new partitions + /// + /// Note that since we don't hold the lock on self.partitions + /// after this returns, new partitions can be added, and some + /// partitions in the snapshot could be dropped from the overall + /// database + async fn partition_snapshot(&self) -> Vec>> { + let partitions = self.partitions.read().await; + partitions.values().cloned().collect() + } + /// Traverse this database's tables, calling the relevant /// functions, in order, of `visitor`, as described on the Visitor /// trait. /// /// Skips visiting any table or columns of `filter.should_visit_table` /// returns false - async fn visit_tables( + async fn accept( &self, - filter: &mut PartitionTableFilter, + filter: &mut ChunkTableFilter, visitor: &mut V, ) -> Result<()> { - let partitions = self.partitions.read().await; + for partition in self.partition_snapshot().await.into_iter() { + let partition = partition.read().await; - for partition in partitions.iter() { - visitor.pre_visit_partition(partition)?; - filter.pre_visit_partition(partition)?; + if filter.should_visit_partition(&partition)? { + for chunk in partition.iter() { + visitor.pre_visit_chunk(chunk)?; + filter.pre_visit_chunk(chunk)?; - for table in partition.tables.values() { - if filter.should_visit_table(table)? { - visitor.pre_visit_table(table, partition, filter)?; + for table in chunk.tables.values() { + if filter.should_visit_table(table)? { + visitor.pre_visit_table(table, chunk, filter)?; - for (column_id, column_index) in &table.column_id_to_index { - visitor.visit_column( - table, - *column_id, - &table.columns[*column_index], - filter, - )? + for (column_id, column_index) in &table.column_id_to_index { + visitor.visit_column( + table, + *column_id, + &table.columns[*column_index], + filter, + )? + } + + visitor.post_visit_table(table, chunk)?; + } } - - visitor.post_visit_table(table, partition)?; + visitor.post_visit_chunk(chunk)?; } } - visitor.post_visit_partition(partition)?; - } // next partition + } // next chunk Ok(()) } } -/// Common logic for processing and filtering tables in the write buffer +/// Common logic for processing and filtering tables in the mutable buffer /// -/// Note that since each partition has its own dictionary, mappings -/// between Strings --> we cache the String->id mappings per partition +/// Note that since each chunk has its own dictionary, mappings +/// between Strings --> we cache the String->id mappings per chunk /// /// b) the table doesn't have a column range that overlaps the /// predicate values, e.g., if you have env = "us-west" and a /// table's env column has the range ["eu-south", "us-north"]. #[derive(Debug)] -struct PartitionTableFilter { +struct ChunkTableFilter { predicate: Predicate, /// If specififed, only tables with all specified columns will be @@ -786,16 +586,16 @@ struct PartitionTableFilter { additional_required_columns: Option>, /// A 'compiled' version of the predicate to evaluate on tables / - /// columns in a particular partition during the walk - partition_predicate: Option, + /// columns in a particular chunk during the walk + chunk_predicate: Option, } -impl PartitionTableFilter { +impl ChunkTableFilter { fn new(predicate: Predicate) -> Self { Self { predicate, additional_required_columns: None, - partition_predicate: None, + chunk_predicate: None, } } @@ -817,34 +617,42 @@ impl PartitionTableFilter { self } - /// Called when each partition gets visited. Since ids are + /// Called when each chunk gets visited. Since ids are /// specific to each partitition, the predicates much get /// translated each time. - fn pre_visit_partition(&mut self, partition: &Partition) -> Result<()> { - let mut partition_predicate = partition.compile_predicate(&self.predicate)?; + fn pre_visit_chunk(&mut self, chunk: &Chunk) -> Result<()> { + let mut chunk_predicate = chunk.compile_predicate(&self.predicate)?; // add any additional column needs if let Some(additional_required_columns) = &self.additional_required_columns { - partition.add_required_columns_to_predicate( + chunk.add_required_columns_to_predicate( additional_required_columns, - &mut partition_predicate, + &mut chunk_predicate, ); } - self.partition_predicate = Some(partition_predicate); + self.chunk_predicate = Some(chunk_predicate); Ok(()) } /// If returns false, skips visiting _table and all its columns fn should_visit_table(&mut self, table: &Table) -> Result { - Ok(table.could_match_predicate(self.partition_predicate())?) + Ok(table.could_match_predicate(self.chunk_predicate())?) } - pub fn partition_predicate(&self) -> &PartitionPredicate { - self.partition_predicate + /// If returns false, skips visiting partition + fn should_visit_partition(&mut self, partition: &Partition) -> Result { + match &self.predicate.partition_key { + Some(partition_key) => Ok(partition.key() == partition_key), + None => Ok(true), + } + } + + pub fn chunk_predicate(&self) -> &ChunkPredicate { + self.chunk_predicate .as_ref() - .expect("Visited partition to compile predicate") + .expect("Visited chunk to compile predicate") } } @@ -852,14 +660,14 @@ impl PartitionTableFilter { /// timestamp range (has no general purpose predicates) struct NameVisitor { column_names: StringSet, - partition_column_ids: BTreeSet, + chunk_column_ids: BTreeSet, } impl NameVisitor { fn new() -> Self { Self { column_names: StringSet::new(), - partition_column_ids: BTreeSet::new(), + chunk_column_ids: BTreeSet::new(), } } } @@ -870,30 +678,32 @@ impl Visitor for NameVisitor { table: &Table, column_id: u32, column: &Column, - filter: &mut PartitionTableFilter, + filter: &mut ChunkTableFilter, ) -> Result<()> { if let Column::Tag(column, _) = column { - if table.column_matches_predicate(column, filter.partition_predicate())? { - self.partition_column_ids.insert(column_id); + if table.column_matches_predicate(column, filter.chunk_predicate())? { + self.chunk_column_ids.insert(column_id); } } Ok(()) } - fn pre_visit_partition(&mut self, _partition: &Partition) -> Result<()> { - self.partition_column_ids.clear(); + fn pre_visit_chunk(&mut self, _chunk: &Chunk) -> Result<()> { + self.chunk_column_ids.clear(); Ok(()) } - fn post_visit_partition(&mut self, partition: &Partition) -> Result<()> { - // convert all the partition's column_ids to Strings - for &column_id in &self.partition_column_ids { - let column_name = partition.dictionary.lookup_id(column_id).context( - ColumnIdNotFoundInDictionary { - column_id, - partition: &partition.key, - }, - )?; + fn post_visit_chunk(&mut self, chunk: &Chunk) -> Result<()> { + // convert all the chunk's column_ids to Strings + for &column_id in &self.chunk_column_ids { + let column_name = + chunk + .dictionary + .lookup_id(column_id) + .context(ColumnIdNotFoundInDictionary { + column_id, + chunk: &chunk.key, + })?; if !self.column_names.contains(column_name) { self.column_names.insert(column_name.to_string()); @@ -903,6 +713,41 @@ impl Visitor for NameVisitor { } } +/// Return all table names in this database, while applying a +/// general purpose predicates +struct TableNameVisitor { + table_names: BTreeSet, +} + +impl TableNameVisitor { + fn new() -> Self { + Self { + table_names: BTreeSet::new(), + } + } + fn into_inner(self) -> BTreeSet { + let Self { table_names } = self; + table_names + } +} + +impl Visitor for TableNameVisitor { + fn pre_visit_table( + &mut self, + table: &Table, + chunk: &Chunk, + _filter: &mut ChunkTableFilter, + ) -> Result<()> { + // If the table has rows that could match the filter, add it + // the table name should always have an encoded value in the dictionary + let table_name = chunk.dictionary.lookup_id(table.id).unwrap(); + if !self.table_names.contains(table_name) { + self.table_names.insert(table_name.to_string()); + } + Ok(()) + } +} + /// Return all column names in this database, while applying a /// general purpose predicates struct NamePredVisitor { @@ -919,11 +764,11 @@ impl Visitor for NamePredVisitor { fn pre_visit_table( &mut self, table: &Table, - partition: &Partition, - filter: &mut PartitionTableFilter, + chunk: &Chunk, + filter: &mut ChunkTableFilter, ) -> Result<()> { self.plans - .push(table.tag_column_names_plan(filter.partition_predicate(), partition)?); + .push(table.tag_column_names_plan(filter.chunk_predicate(), chunk)?); Ok(()) } } @@ -932,7 +777,7 @@ impl Visitor for NamePredVisitor { /// applying timestamp and other predicates #[derive(Debug)] struct TableFieldPredVisitor { - // As Each table can be spread across multiple Partitions, we + // As Each table can be spread across multiple Chunks, we // collect all the relevant plans and Union them together. plans: Vec, } @@ -941,11 +786,11 @@ impl Visitor for TableFieldPredVisitor { fn pre_visit_table( &mut self, table: &Table, - partition: &Partition, - filter: &mut PartitionTableFilter, + chunk: &Chunk, + filter: &mut ChunkTableFilter, ) -> Result<()> { self.plans - .push(table.field_names_plan(filter.partition_predicate(), partition)?); + .push(table.field_names_plan(filter.chunk_predicate(), chunk)?); Ok(()) } } @@ -965,13 +810,13 @@ impl TableFieldPredVisitor { /// in this database, while applying the timestamp range /// /// Potential optimizations: Run this in parallel (in different -/// futures) for each partition / table, rather than a single one +/// futures) for each chunk / table, rather than a single one /// -- but that will require building up parallel hash tables. struct ValueVisitor<'a> { column_name: &'a str, // what column id we are looking for column_id: Option, - partition_value_ids: BTreeSet, + chunk_value_ids: BTreeSet, column_values: StringSet, } @@ -981,24 +826,21 @@ impl<'a> ValueVisitor<'a> { column_name, column_id: None, column_values: StringSet::new(), - partition_value_ids: BTreeSet::new(), + chunk_value_ids: BTreeSet::new(), } } } impl<'a> Visitor for ValueVisitor<'a> { - fn pre_visit_partition(&mut self, partition: &Partition) -> Result<()> { - self.partition_value_ids.clear(); + fn pre_visit_chunk(&mut self, chunk: &Chunk) -> Result<()> { + self.chunk_value_ids.clear(); - self.column_id = Some( - partition - .dictionary - .lookup_value(self.column_name) - .context(ColumnNameNotFoundInDictionary { - column_name: self.column_name, - partition: &partition.key, - })?, - ); + self.column_id = Some(chunk.dictionary.lookup_value(self.column_name).context( + ColumnNameNotFoundInDictionary { + column_name: self.column_name, + chunk: &chunk.key, + }, + )?); Ok(()) } @@ -1008,7 +850,7 @@ impl<'a> Visitor for ValueVisitor<'a> { table: &Table, column_id: u32, column: &Column, - filter: &mut PartitionTableFilter, + filter: &mut ChunkTableFilter, ) -> Result<()> { if Some(column_id) != self.column_id { return Ok(()); @@ -1019,17 +861,17 @@ impl<'a> Visitor for ValueVisitor<'a> { // if we have a timestamp prediate, find all values // where the timestamp is within range. Otherwise take // all values. - let partition_predicate = filter.partition_predicate(); - match partition_predicate.range { + let chunk_predicate = filter.chunk_predicate(); + match chunk_predicate.range { None => { // take all non-null values column.iter().filter_map(|&s| s).for_each(|value_id| { - self.partition_value_ids.insert(value_id); + self.chunk_value_ids.insert(value_id); }); } Some(range) => { // filter out all values that don't match the timestmap - let time_column = table.column_i64(partition_predicate.time_column_id)?; + let time_column = table.column_i64(chunk_predicate.time_column_id)?; column .iter() @@ -1042,7 +884,7 @@ impl<'a> Visitor for ValueVisitor<'a> { } }) .for_each(|value_id| { - self.partition_value_ids.insert(value_id); + self.chunk_value_ids.insert(value_id); }); } } @@ -1055,13 +897,13 @@ impl<'a> Visitor for ValueVisitor<'a> { } } - fn post_visit_partition(&mut self, partition: &Partition) -> Result<()> { - // convert all the partition's column_ids to Strings - for &value_id in &self.partition_value_ids { - let value = partition.dictionary.lookup_id(value_id).context( + fn post_visit_chunk(&mut self, chunk: &Chunk) -> Result<()> { + // convert all the chunk's column_ids to Strings + for &value_id in &self.chunk_value_ids { + let value = chunk.dictionary.lookup_id(value_id).context( ColumnValueIdNotFoundInDictionary { value_id, - partition: &partition.key, + chunk: &chunk.key, }, )?; @@ -1095,15 +937,15 @@ impl<'a> Visitor for ValuePredVisitor<'a> { fn pre_visit_table( &mut self, table: &Table, - partition: &Partition, - filter: &mut PartitionTableFilter, + chunk: &Chunk, + filter: &mut ChunkTableFilter, ) -> Result<()> { // skip table entirely if there are no rows that fall in the timestamp - if table.could_match_predicate(filter.partition_predicate())? { + if table.could_match_predicate(filter.chunk_predicate())? { self.plans.push(table.tag_values_plan( self.column_name, - filter.partition_predicate(), - partition, + filter.chunk_predicate(), + chunk, )?); } Ok(()) @@ -1126,11 +968,11 @@ impl Visitor for SeriesVisitor { fn pre_visit_table( &mut self, table: &Table, - partition: &Partition, - filter: &mut PartitionTableFilter, + chunk: &Chunk, + filter: &mut ChunkTableFilter, ) -> Result<()> { self.plans - .push(table.series_set_plan(filter.partition_predicate(), partition)?); + .push(table.series_set_plan(filter.chunk_predicate(), chunk)?); Ok(()) } @@ -1158,14 +1000,14 @@ impl Visitor for GroupsVisitor { fn pre_visit_table( &mut self, table: &Table, - partition: &Partition, - filter: &mut PartitionTableFilter, + chunk: &Chunk, + filter: &mut ChunkTableFilter, ) -> Result<()> { self.plans.push(table.grouped_series_set_plan( - filter.partition_predicate(), + filter.chunk_predicate(), self.agg, &self.group_columns, - partition, + chunk, )?); Ok(()) @@ -1197,26 +1039,27 @@ impl Visitor for WindowGroupsVisitor { fn pre_visit_table( &mut self, table: &Table, - partition: &Partition, - filter: &mut PartitionTableFilter, + chunk: &Chunk, + filter: &mut ChunkTableFilter, ) -> Result<()> { self.plans.push(table.window_grouped_series_set_plan( - filter.partition_predicate(), + filter.chunk_predicate(), self.agg, &self.every, &self.offset, - partition, + chunk, )?); Ok(()) } } -// partition_key returns the partition key for the given line. The key will be -// the prefix of a partition name (multiple partitions can exist for each key). -// It uses the user defined partitioning rules to construct this key -pub fn partition_key(line: &ParsedLine<'_>) -> String { - // TODO - wire this up to use partitioning rules, for now just partition by day +// compute_partition_key returns the partititon key for the given +// line. The key will be the prefix of a chunk name (multiple chunks +// can exist for each key). It uses the user defined chunking rules +// to construct this key +pub fn compute_partition_key(line: &ParsedLine<'_>) -> String { + // TODO - wire this up to use chunking rules, for now just chunk by day let ts = line.timestamp.unwrap(); let dt = Utc.timestamp_nanos(ts); dt.format("%Y-%m-%dT%H").to_string() @@ -1232,21 +1075,21 @@ struct ArrowTable { mod tests { use super::*; use arrow_deps::datafusion::prelude::*; - use query::exec::{field::FieldIndexes, seriesset::SeriesSetItem}; - use query::{ exec::fieldlist::{Field, FieldList}, exec::{ - seriesset::{Error as SeriesSetError, SeriesSet}, + field::FieldIndexes, + seriesset::{Error as SeriesSetError, SeriesSet, SeriesSetItem}, Executor, }, predicate::PredicateBuilder, TSDatabase, }; - use arrow::{ + use arrow_deps::arrow::{ array::{Array, StringArray}, datatypes::DataType, + record_batch::RecordBatch, util::pretty::pretty_format_batches, }; use influxdb_line_protocol::parse_lines; @@ -1264,8 +1107,8 @@ mod tests { // output. The default failure message uses Debug formatting, which prints // newlines as `\n`. This prints the pretty_format_batches using Display so // it's easier to read the tables. - fn assert_table_eq(table: &str, partitions: &[arrow::record_batch::RecordBatch]) { - let res = pretty_format_batches(partitions).unwrap(); + fn assert_table_eq(table: &str, chunks: &[RecordBatch]) { + let res = pretty_format_batches(chunks).unwrap(); assert_eq!(table, res, "\n\nleft:\n\n{}\nright:\n\n{}", table, res); } @@ -1282,9 +1125,7 @@ mod tests { #[tokio::test] async fn list_table_names() -> Result { - let mut dir = test_helpers::tmp_dir()?.into_path(); - - let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?; + let db = MutableBufferDb::new("mydb"); // no tables initially assert_eq!( @@ -1310,9 +1151,7 @@ mod tests { #[tokio::test] async fn list_table_names_timestamps() -> Result { - let mut dir = test_helpers::tmp_dir()?.into_path(); - - let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?; + let db = MutableBufferDb::new("mydb"); // write two different tables at the following times: // cpu: 100 and 150 @@ -1346,9 +1185,7 @@ mod tests { #[tokio::test] async fn missing_tags_are_null() -> Result { - let mut dir = test_helpers::tmp_dir()?.into_path(); - - let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?; + let db = MutableBufferDb::new("mydb"); // Note the `region` tag is introduced in the second line, so // the values in prior rows for the region column are @@ -1363,8 +1200,8 @@ mod tests { .collect(); db.write_lines(&lines).await?; - let partitions = db.table_to_arrow("cpu", &["region", "core"]).await?; - let columns = partitions[0].columns(); + let chunks = db.table_to_arrow("cpu", &["region", "core"]).await?; + let columns = chunks[0].columns(); assert_eq!( 2, @@ -1398,89 +1235,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn write_data_and_recover() -> Result { - let mut dir = test_helpers::tmp_dir()?.into_path(); - - let expected_cpu_table = r#"+--------+------+------+-------+-------------+------+------+---------+-----------+ -| region | host | user | other | str | b | time | new_tag | new_field | -+--------+------+------+-------+-------------+------+------+---------+-----------+ -| west | A | 23.2 | 1 | some string | true | 10 | | | -| west | B | 23.1 | | | | 15 | | | -| | A | | | | | 20 | foo | 15.1 | -+--------+------+------+-------+-------------+------+------+---------+-----------+ -"#; - let expected_mem_table = r#"+--------+------+-------+------+ -| region | host | val | time | -+--------+------+-------+------+ -| east | C | 23432 | 10 | -+--------+------+-------+------+ -"#; - let expected_disk_table = r#"+--------+------+----------+--------------+------+ -| region | host | bytes | used_percent | time | -+--------+------+----------+--------------+------+ -| west | A | 23432323 | 76.2 | 10 | -+--------+------+----------+--------------+------+ -"#; - - let cpu_columns = &[ - "region", - "host", - "user", - "other", - "str", - "b", - "time", - "new_tag", - "new_field", - ]; - let mem_columns = &["region", "host", "val", "time"]; - let disk_columns = &["region", "host", "bytes", "used_percent", "time"]; - - { - let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?; - let lines: Vec<_> = parse_lines("cpu,region=west,host=A user=23.2,other=1i,str=\"some string\",b=true 10\ndisk,region=west,host=A bytes=23432323i,used_percent=76.2 10").map(|l| l.unwrap()).collect(); - db.write_lines(&lines).await?; - let lines: Vec<_> = parse_lines("cpu,region=west,host=B user=23.1 15") - .map(|l| l.unwrap()) - .collect(); - db.write_lines(&lines).await?; - let lines: Vec<_> = parse_lines("cpu,host=A,new_tag=foo new_field=15.1 20") - .map(|l| l.unwrap()) - .collect(); - db.write_lines(&lines).await?; - let lines: Vec<_> = parse_lines("mem,region=east,host=C val=23432 10") - .map(|l| l.unwrap()) - .collect(); - db.write_lines(&lines).await?; - - let partitions = db.table_to_arrow("cpu", cpu_columns).await?; - assert_table_eq(expected_cpu_table, &partitions); - - let partitions = db.table_to_arrow("mem", mem_columns).await?; - assert_table_eq(expected_mem_table, &partitions); - - let partitions = db.table_to_arrow("disk", disk_columns).await?; - assert_table_eq(expected_disk_table, &partitions); - } - - // check that it recovers from the wal - { - let db = MutableBufferDb::restore_from_wal(&dir).await?; - - let partitions = db.table_to_arrow("cpu", cpu_columns).await?; - assert_table_eq(expected_cpu_table, &partitions); - - let partitions = db.table_to_arrow("mem", mem_columns).await?; - assert_table_eq(expected_mem_table, &partitions); - - let partitions = db.table_to_arrow("disk", disk_columns).await?; - assert_table_eq(expected_disk_table, &partitions); - } - - Ok(()) - } - #[tokio::test] async fn write_and_query() -> Result { let db = MutableBufferDb::new("foo"); @@ -1504,124 +1258,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn recover_partial_entries() -> Result { - let mut dir = test_helpers::tmp_dir()?.into_path(); - - let expected_cpu_table = r#"+--------+------+------+-------+-------------+------+------+---------+-----------+ -| region | host | user | other | str | b | time | new_tag | new_field | -+--------+------+------+-------+-------------+------+------+---------+-----------+ -| west | A | 23.2 | 1 | some string | true | 10 | | | -| west | B | 23.1 | | | | 15 | | | -| | A | | | | | 20 | foo | 15.1 | -+--------+------+------+-------+-------------+------+------+---------+-----------+ -"#; - - let expected_mem_table = r#"+--------+------+-------+------+ -| region | host | val | time | -+--------+------+-------+------+ -| east | C | 23432 | 10 | -+--------+------+-------+------+ -"#; - let expected_disk_table = r#"+--------+------+----------+--------------+------+ -| region | host | bytes | used_percent | time | -+--------+------+----------+--------------+------+ -| west | A | 23432323 | 76.2 | 10 | -+--------+------+----------+--------------+------+ -"#; - - let cpu_columns = &[ - "region", - "host", - "user", - "other", - "str", - "b", - "time", - "new_tag", - "new_field", - ]; - let mem_columns = &["region", "host", "val", "time"]; - let disk_columns = &["region", "host", "bytes", "used_percent", "time"]; - { - let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?; - let lines: Vec<_> = parse_lines("cpu,region=west,host=A user=23.2,other=1i,str=\"some string\",b=true 10\ndisk,region=west,host=A bytes=23432323i,used_percent=76.2 10").map(|l| l.unwrap()).collect(); - db.write_lines(&lines).await?; - let lines: Vec<_> = parse_lines("cpu,region=west,host=B user=23.1 15") - .map(|l| l.unwrap()) - .collect(); - db.write_lines(&lines).await?; - let lines: Vec<_> = parse_lines("cpu,host=A,new_tag=foo new_field=15.1 20") - .map(|l| l.unwrap()) - .collect(); - db.write_lines(&lines).await?; - let lines: Vec<_> = parse_lines("mem,region=east,host=C val=23432 10") - .map(|l| l.unwrap()) - .collect(); - db.write_lines(&lines).await?; - - let partitions = db.table_to_arrow("cpu", cpu_columns).await?; - assert_table_eq(expected_cpu_table, &partitions); - - let partitions = db.table_to_arrow("mem", mem_columns).await?; - assert_table_eq(expected_mem_table, &partitions); - - let partitions = db.table_to_arrow("disk", disk_columns).await?; - assert_table_eq(expected_disk_table, &partitions); - } - - // check that it can recover from the last 2 self-describing entries of the wal - { - let name = dir.iter().last().unwrap().to_str().unwrap().to_string(); - - let wal_builder = WalBuilder::new(&dir); - - let wal_entries = wal_builder - .entries() - .context(LoadingWal { database: &name })?; - - // Skip the first 2 entries in the wal; only restore from the last 2 - let wal_entries = wal_entries.skip(2); - - let (partitions, _stats) = restore_partitions_from_wal(wal_entries)?; - - let db = MutableBufferDb { - name, - partitions: RwLock::new(partitions), - wal_details: None, - }; - - // some cpu - let smaller_cpu_table = r#"+------+---------+-----------+------+ -| host | new_tag | new_field | time | -+------+---------+-----------+------+ -| A | foo | 15.1 | 20 | -+------+---------+-----------+------+ -"#; - let smaller_cpu_columns = &["host", "new_tag", "new_field", "time"]; - let partitions = db.table_to_arrow("cpu", smaller_cpu_columns).await?; - assert_table_eq(smaller_cpu_table, &partitions); - - // all of mem - let partitions = db.table_to_arrow("mem", mem_columns).await?; - assert_table_eq(expected_mem_table, &partitions); - - // no disk - let nonexistent_table = db.table_to_arrow("disk", disk_columns).await; - assert!(nonexistent_table.is_err()); - let actual_message = format!("{:?}", nonexistent_table); - let expected_message = "TableNameNotFoundInDictionary"; - assert!( - actual_message.contains(expected_message), - "Did not find '{}' in '{}'", - expected_message, - actual_message - ); - } - - Ok(()) - } - #[tokio::test] async fn db_partition_key() -> Result { let partition_keys: Vec<_> = parse_lines( @@ -1629,7 +1265,7 @@ mod tests { cpu user=23.2 1600107710000000000 disk bytes=23432323i 1600136510000000000", ) - .map(|line| partition_key(&line.unwrap())) + .map(|line| compute_partition_key(&line.unwrap())) .collect(); assert_eq!(partition_keys, vec!["2020-09-14T18", "2020-09-15T02"]); @@ -1639,8 +1275,7 @@ disk bytes=23432323i 1600136510000000000", #[tokio::test] async fn list_column_names() -> Result { - let mut dir = test_helpers::tmp_dir()?.into_path(); - let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?; + let db = MutableBufferDb::new("column_namedb"); let lp_data = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\ h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250\n\ @@ -1767,8 +1402,7 @@ disk bytes=23432323i 1600136510000000000", async fn list_column_names_predicate() -> Result { // Demonstration test to show column names with predicate working - let mut dir = test_helpers::tmp_dir()?.into_path(); - let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?; + let db = MutableBufferDb::new("column_namedb"); let lp_data = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\ h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250\n\ @@ -1801,8 +1435,7 @@ disk bytes=23432323i 1600136510000000000", #[tokio::test] async fn list_column_values() -> Result { - let mut dir = test_helpers::tmp_dir()?.into_path(); - let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?; + let db = MutableBufferDb::new("column_namedb"); let lp_data = "h2o,state=CA,city=LA temp=70.4 100\n\ h2o,state=MA,city=Boston temp=72.4 250\n\ @@ -1957,8 +1590,7 @@ disk bytes=23432323i 1600136510000000000", // This test checks that everything is wired together // correctly. There are more detailed tests in table.rs that // test the generated queries. - let mut dir = test_helpers::tmp_dir()?.into_path(); - let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?; + let db = MutableBufferDb::new("column_namedb"); let mut lp_lines = vec![ "h2o,state=MA,city=Boston temp=70.4 100", // to row 2 @@ -2034,8 +1666,7 @@ disk bytes=23432323i 1600136510000000000", #[tokio::test] async fn test_query_series_filter() -> Result { // check the appropriate filters are applied in the datafusion plans - let mut dir = test_helpers::tmp_dir()?.into_path(); - let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?; + let db = MutableBufferDb::new("column_namedb"); let lp_lines = vec![ "h2o,state=MA,city=Boston temp=70.4 100", @@ -2084,8 +1715,7 @@ disk bytes=23432323i 1600136510000000000", #[tokio::test] async fn test_query_series_pred_refers_to_column_not_in_table() -> Result { - let mut dir = test_helpers::tmp_dir()?.into_path(); - let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?; + let db = MutableBufferDb::new("column_namedb"); let lp_lines = vec![ "h2o,state=MA,city=Boston temp=70.4 100", @@ -2144,10 +1774,7 @@ disk bytes=23432323i 1600136510000000000", expected = "Unsupported binary operator in expression: #state NotEq Utf8(\"MA\")" )] async fn test_query_series_pred_neq() { - let mut dir = test_helpers::tmp_dir().unwrap().into_path(); - let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir) - .await - .unwrap(); + let db = MutableBufferDb::new("column_namedb"); let lp_lines = vec![ "h2o,state=MA,city=Boston temp=70.4 100", @@ -2171,8 +1798,7 @@ disk bytes=23432323i 1600136510000000000", async fn test_field_columns() -> Result { // Ensure that the database queries are hooked up correctly - let mut dir = test_helpers::tmp_dir()?.into_path(); - let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?; + let db = MutableBufferDb::new("column_namedb"); let lp_data = vec![ "h2o,state=MA,city=Boston temp=70.4 50", @@ -2185,7 +1811,7 @@ disk bytes=23432323i 1600136510000000000", let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); db.write_lines(&lines).await?; - // write a new lp_line that is in a new day and thus a new partition + // write a new lp_line that is in a new day and thus a new chunk let nanoseconds_per_day: i64 = 1_000_000_000 * 60 * 60 * 24; let lp_data = vec![format!( @@ -2196,7 +1822,7 @@ disk bytes=23432323i 1600136510000000000", let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); db.write_lines(&lines).await?; - // ensure there are 2 partitions + // ensure there are 2 chunks assert_eq!(db.len().await, 2); // setup to run the execution plan ( @@ -2219,7 +1845,7 @@ disk bytes=23432323i 1600136510000000000", .expect("Running fieldlist plan"); assert!(fieldlists.fields.is_empty()); - // get only fields from h20 (but both partitions) + // get only fields from h20 (but both chunks) let predicate = PredicateBuilder::default() .table("h2o") .add_expr(col("state").eq(lit("MA"))) // state=MA @@ -2267,8 +1893,7 @@ disk bytes=23432323i 1600136510000000000", #[tokio::test] async fn test_field_columns_timestamp_predicate() -> Result { // check the appropriate filters are applied in the datafusion plans - let mut dir = test_helpers::tmp_dir()?.into_path(); - let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?; + let db = MutableBufferDb::new("column_namedb"); let lp_data = vec![ "h2o,state=MA,city=Boston temp=70.4 50", diff --git a/mutable_buffer/src/lib.rs b/mutable_buffer/src/lib.rs index e022fa0881..34a3d505fe 100644 --- a/mutable_buffer/src/lib.rs +++ b/mutable_buffer/src/lib.rs @@ -1,4 +1,52 @@ -//! Contains an in memory write buffer that stores incoming data, durably. +//! Contains an in memory mutable buffer that stores incoming data in +//! a structure that is designed to be quickly appended to as well as queried +//! +//! The mutable buffer is structured in this way: +//! +//! ┌───────────────────────────────────────────────┐ +//! │ │ +//! │ ┌────────────────┐ │ +//! │ │ Database │ │ +//! │ └────────────────┘ │ +//! │ │ one partition per │ +//! │ │ partition_key │ +//! │ ▼ │ +//! │ ┌────────────────┐ │ +//! │ │ Partition │ │ +//! │ └────────────────┘ │ +//! │ │ one mutable Chunk │ +//! │ │ 0 or more immutable │ +//! │ ▼ Chunks │ +//! │ ┌────────────────┐ │ +//! │ │ Chunk │ │ +//! │ └────────────────┘ │ +//! │ │ multiple Tables (measurements) │ +//! │ ▼ │ +//! │ ┌────────────────┐ │ +//! │ │ Table │ │ +//! │ └────────────────┘ │ +//! │ │ multiple Colums │ +//! │ ▼ │ +//! │ ┌────────────────┐ │ +//! │ │ Column │ │ +//! │ └────────────────┘ │ +//! │ MutableBuffer │ +//! │ │ +//! └───────────────────────────────────────────────┘ +//! +//! Each row of data is routed into a particular partitions based on +//! column values in that row. The partition's active (mutable) chunk is updated +//! with the new data. +//! +//! The currently active chunk in a partition can be rolled over. When +//! this happens, the chunk becomes immutable (read-only) and stops taking +//! writes. Any new writes to the same partition will create a +//! a new active chunk. +//! +//! Note: Strings in the mutable buffer are dictionary encoded (via +//! string interning) to reduce memory usage. This dictionary encoding +//! is done on a per-Chunk basis, so that as soon as the chunk becomes +//! "immutable" the corresponding dictionary also becomes immutable #![deny(rust_2018_idioms)] #![warn( @@ -8,15 +56,15 @@ clippy::use_self )] +pub mod chunk; mod column; mod database; mod dictionary; -pub mod partition; +mod partition; mod store; mod table; -// Allow restore partitions to be used outside of this crate (for +// Allow restore chunks to be used outside of this crate (for // benchmarking) pub use crate::database::MutableBufferDb; -pub use crate::partition::restore_partitions_from_wal; pub use crate::store::MutableBufferDatabases; diff --git a/mutable_buffer/src/partition.rs b/mutable_buffer/src/partition.rs index d8335ea0a4..d240c037b8 100644 --- a/mutable_buffer/src/partition.rs +++ b/mutable_buffer/src/partition.rs @@ -1,551 +1,115 @@ -use arrow_deps::{ - arrow::record_batch::RecordBatch, - datafusion::{ - logical_plan::Expr, logical_plan::Operator, optimizer::utils::expr_to_column_names, - prelude::*, - }, -}; +//! Holds one or more Chunks. + use generated_types::wal as wb; -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; -use wal::{Entry as WalEntry, Result as WalResult}; +use std::sync::Arc; -use data_types::{partition_metadata::Table as TableStats, TIME_COLUMN_NAME}; -use query::{ - predicate::{Predicate, TimestampRange}, - util::{visit_expression, AndExprBuilder, ExpressionVisitor}, -}; +use crate::chunk::{Chunk, Error as ChunkError}; -use crate::dictionary::{Dictionary, Error as DictionaryError}; -use crate::table::Table; - -use snafu::{OptionExt, ResultExt, Snafu}; +use snafu::{ResultExt, Snafu}; #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Could not read WAL entry: {}", source))] - WalEntryRead { source: wal::Error }, - - #[snafu(display("Partition {} not found", partition))] - PartitionNotFound { partition: String }, - #[snafu(display( - "Column name {} not found in dictionary of partition {}", - column, - partition + "Error writing to active chunk of partition with key '{}': {}", + partition_key, + source ))] - ColumnNameNotFoundInDictionary { - column: String, - partition: String, - source: crate::dictionary::Error, + WritingChunkData { + partition_key: String, + source: ChunkError, }, - - #[snafu(display("Error writing table '{}': {}", table_name, source))] - TableWrite { - table_name: String, - source: crate::table::Error, - }, - - #[snafu(display("Table Error in '{}': {}", table_name, source))] - NamedTableError { - table_name: String, - source: crate::table::Error, - }, - - #[snafu(display( - "Table name {} not found in dictionary of partition {}", - table, - partition - ))] - TableNameNotFoundInDictionary { - table: String, - partition: String, - source: crate::dictionary::Error, - }, - - #[snafu(display( - "Table ID {} not found in dictionary of partition {}", - table, - partition - ))] - TableIdNotFoundInDictionary { - table: u32, - partition: String, - source: DictionaryError, - }, - - #[snafu(display("Table {} not found in partition {}", table, partition))] - TableNotFoundInPartition { table: u32, partition: String }, - - #[snafu(display("Attempt to write table batch without a name"))] - TableWriteWithoutName, - - #[snafu(display("Error restoring WAL entry, missing partition key"))] - MissingPartitionKey, } pub type Result = std::result::Result; #[derive(Debug)] pub struct Partition { - pub key: String, + /// The partition key that is shared by all Chunks in this Partition + key: String, - /// `dictionary` maps &str -> u32. The u32s are used in place of String or - /// str to avoid slow string operations. The same dictionary is used for - /// table names, tag names, tag values, and column names. - // TODO: intern string field values too? - pub dictionary: Dictionary, + /// The active mutable Chunk; All new writes go to this chunk + mutable_chunk: Chunk, - /// map of the dictionary ID for the table name to the table - pub tables: HashMap, - - pub is_open: bool, -} - -/// Describes the result of translating a set of strings into -/// partition specific ids -#[derive(Debug, PartialEq, Eq)] -pub enum PartitionIdSet { - /// At least one of the strings was not present in the partitions' - /// dictionary. - /// - /// This is important when testing for the presence of all ids in - /// a set, as we know they can not all be present - AtLeastOneMissing, - - /// All strings existed in this partition's dictionary - Present(BTreeSet), -} - -/// a 'Compiled' set of predicates / filters that can be evaluated on -/// this partition (where strings have been translated to partition -/// specific u32 ids) -#[derive(Debug)] -pub struct PartitionPredicate { - /// If present, restrict the request to just those tables whose - /// names are in table_names. If present but empty, means there - /// was a predicate but no tables named that way exist in the - /// partition (so no table can pass) - pub table_name_predicate: Option>, - - /// Optional column restriction. If present, further - /// restrict any field columns returned to only those named, and - /// skip tables entirely when querying metadata that do not have - /// *any* of the fields - pub field_name_predicate: Option>, - - /// General DataFusion expressions (arbitrary predicates) applied - /// as a filter using logical conjuction (aka are 'AND'ed - /// together). Only rows that evaluate to TRUE for all these - /// expressions should be returned. - pub partition_exprs: Vec, - - /// If Some, then the table must contain all columns specified - /// to pass the predicate - pub required_columns: Option, - - /// The id of the "time" column in this partition - pub time_column_id: u32, - - /// Timestamp range: only rows within this range should be considered - pub range: Option, -} - -impl PartitionPredicate { - /// Creates and adds a datafuson predicate representing the - /// combination of predicate and timestamp. - pub fn filter_expr(&self) -> Option { - // build up a list of expressions - let mut builder = - AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr()); - - for expr in &self.partition_exprs { - builder = builder.append_expr(expr.clone()); - } - - builder.build() - } - - /// For plans which select a subset of fields, returns true if - /// the field should be included in the results - pub fn should_include_field(&self, field_id: u32) -> bool { - match &self.field_name_predicate { - None => true, - Some(field_restriction) => field_restriction.contains(&field_id), - } - } - - /// Return true if this column is the time column - pub fn is_time_column(&self, id: u32) -> bool { - self.time_column_id == id - } - - /// Creates a DataFusion predicate for appliying a timestamp range: - /// - /// range.start <= time and time < range.end` - fn make_timestamp_predicate_expr(&self) -> Option { - self.range.map(|range| make_range_expr(&range)) - } -} - -/// Creates expression like: -/// range.low <= time && time < range.high -fn make_range_expr(range: &TimestampRange) -> Expr { - let ts_low = lit(range.start).lt_eq(col(TIME_COLUMN_NAME)); - let ts_high = col(TIME_COLUMN_NAME).lt(lit(range.end)); - - ts_low.and(ts_high) + /// Immutable chunks which can no longer be written + immutable_chunks: Vec>, } impl Partition { pub fn new(key: impl Into) -> Self { + let key: String = key.into(); + let mutable_chunk = Chunk::new(&key); Self { - key: key.into(), - dictionary: Dictionary::new(), - tables: HashMap::new(), - is_open: true, + key, + mutable_chunk, + immutable_chunks: Vec::new(), } } + /// write data to the active mutable chunk pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> { - if let Some(table_batches) = entry.table_batches() { - for batch in table_batches { - self.write_table_batch(&batch)?; - } - } - - Ok(()) + assert_eq!( + entry + .partition_key() + .expect("partition key should be present"), + self.key + ); + self.mutable_chunk + .write_entry(entry) + .with_context(|| WritingChunkData { + partition_key: entry.partition_key().unwrap(), + }) } - fn write_table_batch(&mut self, batch: &wb::TableWriteBatch<'_>) -> Result<()> { - let table_name = batch.name().context(TableWriteWithoutName)?; - let table_id = self.dictionary.lookup_value_or_insert(table_name); - - let table = self - .tables - .entry(table_id) - .or_insert_with(|| Table::new(table_id)); - - if let Some(rows) = batch.rows() { - table - .append_rows(&mut self.dictionary, &rows) - .context(TableWrite { table_name })?; - } - - Ok(()) + /// roll over the active chunk into the immutable_chunks, + /// returning the most recently active chunk. Any new writes to + /// this partition will go to a new chunk + pub fn rollover_chunk(&mut self) -> Arc { + todo!(); } - /// Translates `predicate` into per-partition ids that can be - /// directly evaluated against tables in this partition - pub fn compile_predicate(&self, predicate: &Predicate) -> Result { - let table_name_predicate = self.compile_string_list(predicate.table_names.as_ref()); - - let field_restriction = self.compile_string_list(predicate.field_columns.as_ref()); - - let time_column_id = self - .dictionary - .lookup_value(TIME_COLUMN_NAME) - .expect("time is in the partition dictionary"); - - let range = predicate.range; - - // it would be nice to avoid cloning all the exprs here. - let partition_exprs = predicate.exprs.clone(); - - // In order to evaluate expressions in the table, all columns - // referenced in the expression must appear (I think, not sure - // about NOT, etc so panic if we see one of those); - let mut visitor = SupportVisitor {}; - let mut predicate_columns: HashSet = HashSet::new(); - for expr in &partition_exprs { - visit_expression(expr, &mut visitor); - expr_to_column_names(&expr, &mut predicate_columns).unwrap(); - } - - // if there are any column references in the expression, ensure they appear in - // any table - let required_columns = if predicate_columns.is_empty() { - None - } else { - Some(self.make_partition_ids(predicate_columns.iter())) - }; - - Ok(PartitionPredicate { - table_name_predicate, - field_name_predicate: field_restriction, - partition_exprs, - required_columns, - time_column_id, - range, - }) - } - - /// Converts a potential set of strings into a set of ids in terms - /// of this dictionary. If there are no matching Strings in the - /// partitions dictionary, those strings are ignored and a - /// (potentially empty) set is returned. - fn compile_string_list(&self, names: Option<&BTreeSet>) -> Option> { - names.map(|names| { - names - .iter() - .filter_map(|name| self.dictionary.id(name)) - .collect::>() - }) - } - - /// Adds the ids of any columns in additional_required_columns to the - /// required columns of predicate - pub fn add_required_columns_to_predicate( - &self, - additional_required_columns: &HashSet, - predicate: &mut PartitionPredicate, - ) { - for column_name in additional_required_columns { - // Once know we have missing columns, no need to try - // and figure out if these any additional columns are needed - if Some(PartitionIdSet::AtLeastOneMissing) == predicate.required_columns { - return; - } - - let column_id = self.dictionary.id(column_name); - - // Update the required colunm list - predicate.required_columns = Some(match predicate.required_columns.take() { - None => { - if let Some(column_id) = column_id { - let mut symbols = BTreeSet::new(); - symbols.insert(column_id); - PartitionIdSet::Present(symbols) - } else { - PartitionIdSet::AtLeastOneMissing - } - } - Some(PartitionIdSet::Present(mut symbols)) => { - if let Some(column_id) = column_id { - symbols.insert(column_id); - PartitionIdSet::Present(symbols) - } else { - PartitionIdSet::AtLeastOneMissing - } - } - Some(PartitionIdSet::AtLeastOneMissing) => { - unreachable!("Covered by case above while adding required columns to predicate") - } - }); - } - } - - /// returns true if data with partition key `key` should be - /// written to this partition, - pub fn should_write(&self, key: &str) -> bool { - self.key.starts_with(key) && self.is_open - } - - /// Convert the table specified in this partition into an arrow record batch - pub fn table_to_arrow(&self, table_name: &str, columns: &[&str]) -> Result { - let table = self.table(table_name)?; - - table - .to_arrow(&self, columns) - .context(NamedTableError { table_name }) - } - - /// Returns a vec of the summary statistics of the tables in this partition - pub fn table_stats(&self) -> Result> { - let mut stats = Vec::with_capacity(self.tables.len()); - - for (id, table) in &self.tables { - let name = self - .dictionary - .lookup_id(*id) - .context(TableIdNotFoundInDictionary { - table: *id, - partition: &self.key, - })?; - - let columns = table.stats(); - - stats.push(TableStats { - name: name.to_string(), - columns, - }); - } - - Ok(stats) - } - - fn table(&self, table_name: &str) -> Result<&Table> { - let table_id = - self.dictionary - .lookup_value(table_name) - .context(TableNameNotFoundInDictionary { - table: table_name, - partition: &self.key, - })?; - - let table = self - .tables - .get(&table_id) - .context(TableNotFoundInPartition { - table: table_id, - partition: &self.key, - })?; - - Ok(table) - } - - /// Translate a bunch of strings into a set of ids relative to this - /// partition - pub fn make_partition_ids<'a, I>(&self, predicate_columns: I) -> PartitionIdSet - where - I: Iterator, - { - let mut symbols = BTreeSet::new(); - for column_name in predicate_columns { - if let Some(column_id) = self.dictionary.id(column_name) { - symbols.insert(column_id); - } else { - return PartitionIdSet::AtLeastOneMissing; - } - } - - PartitionIdSet::Present(symbols) - } -} - -impl query::PartitionChunk for Partition { - type Error = Error; - - fn key(&self) -> &str { + pub fn key(&self) -> &str { &self.key } - fn table_stats(&self) -> Result, Self::Error> { - self.table_stats() - } - - fn table_to_arrow( - &self, - table_name: &str, - columns: &[&str], - ) -> Result { - self.table_to_arrow(table_name, columns) + /// Return an iterator over each Chunk in this partition + pub fn iter(&self) -> ChunkIter<'_> { + ChunkIter::new(self) } } -/// Used to figure out if we know how to deal with this kind of -/// predicate in the write buffer -struct SupportVisitor {} +pub struct ChunkIter<'a> { + partition: &'a Partition, + visited_mutable: bool, + next_immutable_index: usize, +} -impl ExpressionVisitor for SupportVisitor { - fn pre_visit(&mut self, expr: &Expr) { - match expr { - Expr::Literal(..) => {} - Expr::Column(..) => {} - Expr::BinaryExpr { op, .. } => { - match op { - Operator::Eq - | Operator::Lt - | Operator::LtEq - | Operator::Gt - | Operator::GtEq - | Operator::Plus - | Operator::Minus - | Operator::Multiply - | Operator::Divide - | Operator::And - | Operator::Or => {} - // Unsupported (need to think about ramifications) - Operator::NotEq | Operator::Modulus | Operator::Like | Operator::NotLike => { - panic!("Unsupported binary operator in expression: {:?}", expr) - } - } - } - _ => panic!( - "Unsupported expression in mutable_buffer database: {:?}", - expr - ), +impl<'a> ChunkIter<'a> { + fn new(partition: &'a Partition) -> Self { + Self { + partition, + visited_mutable: false, + next_immutable_index: 0, } } } -#[derive(Default, Debug)] -pub struct RestorationStats { - pub row_count: usize, - pub tables: BTreeSet, -} +/// Iterates over chunks in a partition +impl<'a> Iterator for ChunkIter<'a> { + type Item = &'a Chunk; -/// Given a set of WAL entries, restore them into a set of Partitions. -pub fn restore_partitions_from_wal( - wal_entries: impl Iterator>, -) -> Result<(Vec, RestorationStats)> { - let mut stats = RestorationStats::default(); + fn next(&mut self) -> Option { + let partition = self.partition; - let mut partitions = BTreeMap::new(); - - for wal_entry in wal_entries { - let wal_entry = wal_entry.context(WalEntryRead)?; - let bytes = wal_entry.as_data(); - - let batch = flatbuffers::get_root::>(&bytes); - - if let Some(entries) = batch.entries() { - for entry in entries { - let partition_key = entry.partition_key().context(MissingPartitionKey)?; - - if !partitions.contains_key(partition_key) { - partitions.insert( - partition_key.to_string(), - Partition::new(partition_key.to_string()), - ); - } - - let partition = partitions - .get_mut(partition_key) - .context(PartitionNotFound { - partition: partition_key, - })?; - - partition.write_entry(&entry)?; - } + if !self.visited_mutable { + let chunk = &partition.mutable_chunk; + self.visited_mutable = true; + Some(chunk) + } else if self.next_immutable_index < partition.immutable_chunks.len() { + let chunk = &self.partition.immutable_chunks[self.next_immutable_index]; + self.next_immutable_index += 1; + Some(chunk) + } else { + None } } - let partitions = partitions - .into_iter() - .map(|(_, p)| p) - .collect::>(); - - // compute the stats - for p in &partitions { - for (id, table) in &p.tables { - let name = p - .dictionary - .lookup_id(*id) - .expect("table id wasn't inserted into dictionary on restore"); - if !stats.tables.contains(name) { - stats.tables.insert(name.to_string()); - } - - stats.row_count += table.row_count(); - } - } - - Ok((partitions, stats)) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_make_range_expr() { - // Test that the generated predicate is correct - - let range = TimestampRange::new(101, 202); - - let ts_predicate_expr = make_range_expr(&range); - let expected_string = "Int64(101) LtEq #time And #time Lt Int64(202)"; - let actual_string = format!("{:?}", ts_predicate_expr); - - assert_eq!(actual_string, expected_string); - } } diff --git a/mutable_buffer/src/store.rs b/mutable_buffer/src/store.rs index 6693b5ea6b..a5e9b9ed40 100644 --- a/mutable_buffer/src/store.rs +++ b/mutable_buffer/src/store.rs @@ -1,9 +1,9 @@ use async_trait::async_trait; use query::DatabaseStore; -use snafu::{ResultExt, Snafu}; +use snafu::Snafu; use tokio::sync::RwLock; -use std::{fs, sync::Arc}; +use std::sync::Arc; use std::{collections::BTreeMap, path::PathBuf}; @@ -16,60 +16,16 @@ pub enum Error { dir: PathBuf, source: std::io::Error, }, - - #[snafu(display("Error from database: {}", source))] - DatabaseError { source: crate::database::Error }, - - #[snafu(display("Error reading metadata: {}", source))] - ReadMetadataError { source: std::io::Error }, } pub type Result = std::result::Result; -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MutableBufferDatabases { databases: RwLock>>, - base_dir: PathBuf, } impl MutableBufferDatabases { - pub fn new(base_dir: impl Into) -> Self { - Self { - databases: RwLock::new(BTreeMap::new()), - base_dir: base_dir.into(), - } - } - - /// wal_dirs will traverse the directories from the service base directory - /// and return the directories that contain WALs for databases, which - /// can be used to restore those DBs. - pub fn wal_dirs(&self) -> Result> { - let entries = fs::read_dir(&self.base_dir).context(ReadError { - dir: &self.base_dir, - })?; - - let mut dirs = vec![]; - - for entry in entries { - let entry = entry.context(ReadError { - dir: &self.base_dir, - })?; - - let meta = entry.metadata().context(ReadMetadataError {})?; - if meta.is_dir() { - if let Some(p) = entry.path().iter().last() { - if let Some(s) = p.to_str() { - if !s.starts_with('.') { - dirs.push(entry.path()); - }; - } - }; - }; - } - - Ok(dirs) - } - pub async fn add_db(&self, db: MutableBufferDb) { let mut databases = self.databases.write().await; databases.insert(db.name.clone(), Arc::new(db)); @@ -106,9 +62,7 @@ impl DatabaseStore for MutableBufferDatabases { return Ok(db.clone()); } - let db = MutableBufferDb::try_with_wal(name, &mut self.base_dir.clone()) - .await - .context(DatabaseError)?; + let db = MutableBufferDb::new(name); let db = Arc::new(db); databases.insert(name.to_string(), db.clone()); diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index 5a80a22a56..b3eb6c5cff 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -10,11 +10,11 @@ use tracing::debug; use std::{collections::BTreeSet, collections::HashMap, sync::Arc}; use crate::{ + chunk::ChunkIdSet, + chunk::{Chunk, ChunkPredicate}, column, column::Column, dictionary::{Dictionary, Error as DictionaryError}, - partition::PartitionIdSet, - partition::{Partition, PartitionPredicate}, }; use data_types::{partition_metadata::Column as ColumnStats, TIME_COLUMN_NAME}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -35,39 +35,13 @@ use arrow_deps::{ #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Table {} not found", table))] - TableNotFound { table: String }, - - #[snafu(display( - "Column {} said it was type {} but extracting a value of that type failed", - column, - expected - ))] - WalValueTypeMismatch { column: String, expected: String }, - - #[snafu(display( - "Tag value ID {} not found in dictionary of partition {}", - value, - partition - ))] + #[snafu(display("Tag value ID {} not found in dictionary of chunk {}", value, chunk))] TagValueIdNotFoundInDictionary { value: u32, - partition: String, + chunk: String, source: DictionaryError, }, - #[snafu(display( - "Column type mismatch for column {}: can't insert {} into column with type {}", - column, - inserted_value_type, - existing_column_type - ))] - ColumnTypeMismatch { - column: String, - existing_column_type: String, - inserted_value_type: String, - }, - #[snafu(display("Column error on column {}: {}", column, source))] ColumnError { column: String, @@ -93,39 +67,27 @@ pub enum Error { InternalAggregateNotSelector { agg: Aggregate }, #[snafu(display( - "Column name '{}' not found in dictionary of partition {}", + "Column name '{}' not found in dictionary of chunk {}", column_name, - partition + chunk ))] ColumnNameNotFoundInDictionary { column_name: String, - partition: String, + chunk: String, source: DictionaryError, }, #[snafu(display( - "Internal: Column id '{}' not found in dictionary of partition {}", + "Internal: Column id '{}' not found in dictionary of chunk {}", column_id, - partition + chunk ))] ColumnIdNotFoundInDictionary { column_id: u32, - partition: String, + chunk: String, source: DictionaryError, }, - #[snafu(display( - "Schema mismatch: for column {}: can't insert {} into column with type {}", - column, - inserted_value_type, - existing_column_type - ))] - SchemaMismatch { - column: u32, - existing_column_type: String, - inserted_value_type: String, - }, - #[snafu(display("Error building plan: {}", source))] BuildingPlan { source: datafusion::error::DataFusionError, @@ -134,12 +96,6 @@ pub enum Error { #[snafu(display("arrow conversion error: {}", source))] ArrowError { source: arrow::error::ArrowError }, - #[snafu(display("Schema mismatch: for column {}: {}", column, source))] - InternalSchemaMismatch { - column: u32, - source: crate::column::Error, - }, - #[snafu(display( "No index entry found for column {} with id {}", column_name, @@ -177,20 +133,15 @@ pub enum Error { #[snafu(display("Duplicate group column '{}'", column_name))] DuplicateGroupColumn { column_name: String }, - - #[snafu(display("Internal error converting schema to DFSchema: {}", source))] - InternalConvertingSchema { - source: datafusion::error::DataFusionError, - }, } pub type Result = std::result::Result; #[derive(Debug)] pub struct Table { - /// Name of the table as a u32 in the partition dictionary + /// Name of the table as a u32 in the chunk dictionary pub id: u32, - /// Maps column name (as a u32 in the partition dictionary) to an index in + /// Maps column name (as a u32 in the chunk dictionary) to an index in /// self.columns pub column_id_to_index: HashMap, @@ -297,9 +248,9 @@ impl Table { /// combination of predicate and timestamp. Returns the builder fn add_datafusion_predicate( plan_builder: LogicalPlanBuilder, - partition_predicate: &PartitionPredicate, + chunk_predicate: &ChunkPredicate, ) -> Result { - match partition_predicate.filter_expr() { + match chunk_predicate.filter_expr() { Some(df_predicate) => plan_builder.filter(df_predicate).context(BuildingPlan), None => Ok(plan_builder), } @@ -316,12 +267,12 @@ impl Table { /// InMemoryScan pub fn tag_column_names_plan( &self, - partition_predicate: &PartitionPredicate, - partition: &Partition, + chunk_predicate: &ChunkPredicate, + chunk: &Chunk, ) -> Result { - let need_time_column = partition_predicate.range.is_some(); + let need_time_column = chunk_predicate.range.is_some(); - let time_column_id = partition_predicate.time_column_id; + let time_column_id = chunk_predicate.time_column_id; // figure out the tag columns let requested_columns_with_index = self @@ -338,7 +289,7 @@ impl Table { if need_column { // the id came out of our map, so it should always be valid - let column_name = partition.dictionary.lookup_id(column_id).unwrap(); + let column_name = chunk.dictionary.lookup_id(column_id).unwrap(); Some((column_name, column_index)) } else { None @@ -347,7 +298,7 @@ impl Table { .collect::>(); // TODO avoid materializing here - let data = self.to_arrow_impl(partition, &requested_columns_with_index)?; + let data = self.to_arrow_impl(chunk, &requested_columns_with_index)?; let schema = data.schema(); @@ -356,7 +307,7 @@ impl Table { let plan_builder = LogicalPlanBuilder::scan_memory(vec![vec![data]], schema, projection) .context(BuildingPlan)?; - let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?; + let plan_builder = Self::add_datafusion_predicate(plan_builder, chunk_predicate)?; // add optional selection to remove time column let plan_builder = if !need_time_column { @@ -384,7 +335,7 @@ impl Table { debug!( "Created column_name plan for table '{}':\n{}", - partition.dictionary.lookup_id(self.id).unwrap(), + chunk.dictionary.lookup_id(self.id).unwrap(), plan.display_indent_schema() ); @@ -402,11 +353,11 @@ impl Table { pub fn tag_values_plan( &self, column_name: &str, - partition_predicate: &PartitionPredicate, - partition: &Partition, + chunk_predicate: &ChunkPredicate, + chunk: &Chunk, ) -> Result { // Scan and Filter - let plan_builder = self.scan_with_predicates(partition_predicate, partition)?; + let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?; let select_exprs = vec![col(column_name)]; @@ -430,10 +381,10 @@ impl Table { /// same) occur together in the plan pub fn series_set_plan( &self, - partition_predicate: &PartitionPredicate, - partition: &Partition, + chunk_predicate: &ChunkPredicate, + chunk: &Chunk, ) -> Result { - self.series_set_plan_impl(partition_predicate, None, partition) + self.series_set_plan_impl(chunk_predicate, None, chunk) } /// Creates the plans for computing series set, ensuring that @@ -447,12 +398,12 @@ impl Table { /// InMemoryScan pub fn series_set_plan_impl( &self, - partition_predicate: &PartitionPredicate, + chunk_predicate: &ChunkPredicate, prefix_columns: Option<&[String]>, - partition: &Partition, + chunk: &Chunk, ) -> Result { let (mut tag_columns, field_columns) = - self.tag_and_field_column_names(partition_predicate, partition)?; + self.tag_and_field_column_names(chunk_predicate, chunk)?; // reorder tag_columns to have the prefix columns, if requested if let Some(prefix_columns) = prefix_columns { @@ -462,7 +413,7 @@ impl Table { // TODO avoid materializing all the columns here (ideally we // would use a data source and then let DataFusion prune out // column references during its optimizer phase). - let plan_builder = self.scan_with_predicates(partition_predicate, partition)?; + let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?; let mut sort_exprs = Vec::new(); sort_exprs.extend(tag_columns.iter().map(|c| c.into_sort_expr())); @@ -483,7 +434,7 @@ impl Table { let plan = plan_builder.build().context(BuildingPlan)?; Ok(SeriesSetPlan::new_from_shared_timestamp( - self.table_name(partition), + self.table_name(chunk), plan, tag_columns, field_columns, @@ -491,18 +442,18 @@ impl Table { } /// Returns a LogialPlannBuilder which scans all columns in this - /// Table and has applied any predicates in `partition_predicate` + /// Table and has applied any predicates in `chunk_predicate` /// /// Filter(predicate) /// InMemoryScan fn scan_with_predicates( &self, - partition_predicate: &PartitionPredicate, - partition: &Partition, + chunk_predicate: &ChunkPredicate, + chunk: &Chunk, ) -> Result { // TODO avoid materializing all the columns here (ideally // DataFusion can prune some of them out) - let data = self.all_to_arrow(partition)?; + let data = self.all_to_arrow(chunk)?; let schema = data.schema(); @@ -513,13 +464,13 @@ impl Table { .context(BuildingPlan)?; // Filtering - Self::add_datafusion_predicate(plan_builder, partition_predicate) + Self::add_datafusion_predicate(plan_builder, chunk_predicate) } /// Look up this table's name as a string - fn table_name(&self, partition: &Partition) -> Arc { + fn table_name(&self, chunk: &Chunk) -> Arc { // I wonder if all this string creation will be too slow? - let table_name = partition + let table_name = chunk .dictionary .lookup_id(self.id) .expect("looking up table name in dictionary") @@ -533,17 +484,17 @@ impl Table { /// series_set_plan_impl for more details. pub fn grouped_series_set_plan( &self, - partition_predicate: &PartitionPredicate, + chunk_predicate: &ChunkPredicate, agg: Aggregate, group_columns: &[String], - partition: &Partition, + chunk: &Chunk, ) -> Result { let num_prefix_tag_group_columns = group_columns.len(); let plan = if let Aggregate::None = agg { - self.series_set_plan_impl(partition_predicate, Some(&group_columns), partition)? + self.series_set_plan_impl(chunk_predicate, Some(&group_columns), chunk)? } else { - self.aggregate_series_set_plan(partition_predicate, agg, group_columns, partition)? + self.aggregate_series_set_plan(chunk_predicate, agg, group_columns, chunk)? }; Ok(plan.grouped(num_prefix_tag_group_columns)) @@ -589,20 +540,20 @@ impl Table { /// InMemoryScan pub fn aggregate_series_set_plan( &self, - partition_predicate: &PartitionPredicate, + chunk_predicate: &ChunkPredicate, agg: Aggregate, group_columns: &[String], - partition: &Partition, + chunk: &Chunk, ) -> Result { let (tag_columns, field_columns) = - self.tag_and_field_column_names(partition_predicate, partition)?; + self.tag_and_field_column_names(chunk_predicate, chunk)?; // order the tag columns so that the group keys come first (we will group and // order in the same order) let tag_columns = reorder_prefix(group_columns, tag_columns)?; // Scan and Filter - let plan_builder = self.scan_with_predicates(partition_predicate, partition)?; + let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?; // Group by all tag columns let group_exprs = tag_columns @@ -614,7 +565,7 @@ impl Table { agg_exprs, field_columns, } = AggExprs::new(agg, field_columns, |col_name| { - let index = self.column_index(partition, col_name)?; + let index = self.column_index(chunk, col_name)?; Ok(self.columns[index].data_type()) })?; @@ -633,7 +584,7 @@ impl Table { let plan = plan_builder.build().context(BuildingPlan)?; Ok(SeriesSetPlan::new( - self.table_name(partition), + self.table_name(chunk), plan, tag_columns, field_columns, @@ -670,17 +621,17 @@ impl Table { /// InMemoryScan pub fn window_grouped_series_set_plan( &self, - partition_predicate: &PartitionPredicate, + chunk_predicate: &ChunkPredicate, agg: Aggregate, every: &WindowDuration, offset: &WindowDuration, - partition: &Partition, + chunk: &Chunk, ) -> Result { let (tag_columns, field_columns) = - self.tag_and_field_column_names(partition_predicate, partition)?; + self.tag_and_field_column_names(chunk_predicate, chunk)?; // Scan and Filter - let plan_builder = self.scan_with_predicates(partition_predicate, partition)?; + let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?; // Group by all tag columns and the window bounds let mut group_exprs = tag_columns @@ -714,7 +665,7 @@ impl Table { let plan = plan_builder.build().context(BuildingPlan)?; Ok(SeriesSetPlan::new_from_shared_timestamp( - self.table_name(partition), + self.table_name(chunk), plan, tag_columns, field_columns, @@ -735,15 +686,15 @@ impl Table { /// InMemoryScan pub fn field_names_plan( &self, - partition_predicate: &PartitionPredicate, - partition: &Partition, + chunk_predicate: &ChunkPredicate, + chunk: &Chunk, ) -> Result { // Scan and Filter - let plan_builder = self.scan_with_predicates(partition_predicate, partition)?; + let plan_builder = self.scan_with_predicates(chunk_predicate, chunk)?; // Selection let select_exprs = self - .field_and_time_column_names(partition_predicate, partition) + .field_and_time_column_names(chunk_predicate, chunk) .into_iter() .map(|c| c.into_expr()) .collect::>(); @@ -759,14 +710,14 @@ impl Table { // have been applied. The vectors are sorted by lexically by name. fn tag_and_field_column_names( &self, - partition_predicate: &PartitionPredicate, - partition: &Partition, + chunk_predicate: &ChunkPredicate, + chunk: &Chunk, ) -> Result<(ArcStringVec, ArcStringVec)> { let mut tag_columns = Vec::with_capacity(self.column_id_to_index.len()); let mut field_columns = Vec::with_capacity(self.column_id_to_index.len()); for (&column_id, &column_index) in &self.column_id_to_index { - let column_name = partition + let column_name = chunk .dictionary .lookup_id(column_id) .expect("Find column name in dictionary"); @@ -777,7 +728,7 @@ impl Table { match self.columns[column_index] { Column::Tag(_, _) => tag_columns.push(column_name), _ => { - if partition_predicate.should_include_field(column_id) { + if chunk_predicate.should_include_field(column_id) { field_columns.push(column_name) } } @@ -800,8 +751,8 @@ impl Table { // Returns (field_columns and time) in sorted order fn field_and_time_column_names( &self, - partition_predicate: &PartitionPredicate, - partition: &Partition, + chunk_predicate: &ChunkPredicate, + chunk: &Chunk, ) -> ArcStringVec { let mut field_columns = self .column_id_to_index @@ -810,10 +761,10 @@ impl Table { match self.columns[column_index] { Column::Tag(_, _) => None, // skip tags _ => { - if partition_predicate.should_include_field(column_id) - || partition_predicate.is_time_column(column_id) + if chunk_predicate.should_include_field(column_id) + || chunk_predicate.is_time_column(column_id) { - let column_name = partition + let column_name = chunk .dictionary .lookup_id(column_id) .expect("Find column name in dictionary"); @@ -834,28 +785,26 @@ impl Table { } /// Converts this table to an arrow record batch. - pub fn to_arrow( - &self, - partition: &Partition, - requested_columns: &[&str], - ) -> Result { + pub fn to_arrow(&self, chunk: &Chunk, requested_columns: &[&str]) -> Result { // if requested columns is empty, retrieve all columns in the table if requested_columns.is_empty() { - self.all_to_arrow(partition) + self.all_to_arrow(chunk) } else { - let columns_with_index = self.column_names_with_index(partition, requested_columns)?; + let columns_with_index = self.column_names_with_index(chunk, requested_columns)?; - self.to_arrow_impl(partition, &columns_with_index) + self.to_arrow_impl(chunk, &columns_with_index) } } - fn column_index(&self, partition: &Partition, column_name: &str) -> Result { - let column_id = partition.dictionary.lookup_value(column_name).context( - ColumnNameNotFoundInDictionary { - column_name, - partition: &partition.key, - }, - )?; + fn column_index(&self, chunk: &Chunk, column_name: &str) -> Result { + let column_id = + chunk + .dictionary + .lookup_value(column_name) + .context(ColumnNameNotFoundInDictionary { + column_name, + chunk: &chunk.key, + })?; self.column_id_to_index .get(&column_id) @@ -869,13 +818,13 @@ impl Table { /// Returns (name, index) pairs for all named columns fn column_names_with_index<'a>( &self, - partition: &Partition, + chunk: &Chunk, columns: &[&'a str], ) -> Result> { columns .iter() .map(|&column_name| { - let column_index = self.column_index(partition, column_name)?; + let column_index = self.column_index(chunk, column_name)?; Ok((column_name, column_index)) }) @@ -883,15 +832,15 @@ impl Table { } /// Convert all columns to an arrow record batch - pub fn all_to_arrow(&self, partition: &Partition) -> Result { + pub fn all_to_arrow(&self, chunk: &Chunk) -> Result { let mut requested_columns_with_index = self .column_id_to_index .iter() .map(|(&column_id, &column_index)| { - let column_name = partition.dictionary.lookup_id(column_id).context( + let column_name = chunk.dictionary.lookup_id(column_id).context( ColumnIdNotFoundInDictionary { column_id, - partition: &partition.key, + chunk: &chunk.key, }, )?; Ok((column_name, column_index)) @@ -900,7 +849,7 @@ impl Table { requested_columns_with_index.sort_by(|(a, _), (b, _)| a.cmp(b)); - self.to_arrow_impl(partition, &requested_columns_with_index) + self.to_arrow_impl(chunk, &requested_columns_with_index) } /// Converts this table to an arrow record batch, @@ -908,7 +857,7 @@ impl Table { /// requested columns with index are tuples of column_name, column_index pub fn to_arrow_impl( &self, - partition: &Partition, + chunk: &Chunk, requested_columns_with_index: &[(&str, usize)], ) -> Result { let mut fields = Vec::with_capacity(requested_columns_with_index.len()); @@ -938,10 +887,10 @@ impl Table { match v { None => builder.append_null(), Some(value_id) => { - let tag_value = partition.dictionary.lookup_id(*value_id).context( + let tag_value = chunk.dictionary.lookup_id(*value_id).context( TagValueIdNotFoundInDictionary { value: *value_id, - partition: &partition.key, + chunk: &chunk.key, }, )?; builder.append_value(tag_value) @@ -997,14 +946,12 @@ impl Table { /// just that the entire table can not be ruled out. /// /// false means that no rows in this table could possibly match - pub fn could_match_predicate(&self, partition_predicate: &PartitionPredicate) -> Result { + pub fn could_match_predicate(&self, chunk_predicate: &ChunkPredicate) -> Result { Ok( - self.matches_column_name_predicate(partition_predicate.field_name_predicate.as_ref()) - && self.matches_table_name_predicate( - partition_predicate.table_name_predicate.as_ref(), - ) - && self.matches_timestamp_predicate(partition_predicate)? - && self.has_columns(partition_predicate.required_columns.as_ref()), + self.matches_column_name_predicate(chunk_predicate.field_name_predicate.as_ref()) + && self.matches_table_name_predicate(chunk_predicate.table_name_predicate.as_ref()) + && self.matches_timestamp_predicate(chunk_predicate)? + && self.has_columns(chunk_predicate.required_columns.as_ref()), ) } @@ -1032,14 +979,11 @@ impl Table { /// returns true if there are any timestamps in this table that /// fall within the timestamp range - fn matches_timestamp_predicate( - &self, - partition_predicate: &PartitionPredicate, - ) -> Result { - match &partition_predicate.range { + fn matches_timestamp_predicate(&self, chunk_predicate: &ChunkPredicate) -> Result { + match &chunk_predicate.range { None => Ok(true), Some(range) => { - let time_column_id = partition_predicate.time_column_id; + let time_column_id = chunk_predicate.time_column_id; let time_column = self.column(time_column_id)?; time_column.has_i64_range(range.start, range.end).context( ColumnPredicateEvaluation { @@ -1052,11 +996,11 @@ impl Table { /// returns true if no columns are specified, or the table has all /// columns specified - fn has_columns(&self, columns: Option<&PartitionIdSet>) -> bool { + fn has_columns(&self, columns: Option<&ChunkIdSet>) -> bool { if let Some(columns) = columns { match columns { - PartitionIdSet::AtLeastOneMissing => return false, - PartitionIdSet::Present(symbols) => { + ChunkIdSet::AtLeastOneMissing => return false, + ChunkIdSet::Present(symbols) => { for symbol in symbols { if !self.column_id_to_index.contains_key(symbol) { return false; @@ -1073,12 +1017,12 @@ impl Table { pub fn column_matches_predicate( &self, column: &[Option], - partition_predicate: &PartitionPredicate, + chunk_predicate: &ChunkPredicate, ) -> Result { - match partition_predicate.range { + match chunk_predicate.range { None => Ok(true), Some(range) => { - let time_column_id = partition_predicate.time_column_id; + let time_column_id = chunk_predicate.time_column_id; let time_column = self.column(time_column_id)?; time_column .has_non_null_i64_range(column, range.start, range.end) @@ -1329,8 +1273,8 @@ mod tests { #[test] fn test_has_columns() { - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec![ @@ -1345,34 +1289,34 @@ mod tests { assert!(table.has_columns(None)); - let pred = PartitionIdSet::AtLeastOneMissing; + let pred = ChunkIdSet::AtLeastOneMissing; assert!(!table.has_columns(Some(&pred))); let set = BTreeSet::::new(); - let pred = PartitionIdSet::Present(set); + let pred = ChunkIdSet::Present(set); assert!(table.has_columns(Some(&pred))); let mut set = BTreeSet::new(); set.insert(state_symbol); - let pred = PartitionIdSet::Present(set); + let pred = ChunkIdSet::Present(set); assert!(table.has_columns(Some(&pred))); let mut set = BTreeSet::new(); set.insert(new_symbol); - let pred = PartitionIdSet::Present(set); + let pred = ChunkIdSet::Present(set); assert!(!table.has_columns(Some(&pred))); let mut set = BTreeSet::new(); set.insert(state_symbol); set.insert(new_symbol); - let pred = PartitionIdSet::Present(set); + let pred = ChunkIdSet::Present(set); assert!(!table.has_columns(Some(&pred))); } #[test] fn test_matches_table_name_predicate() { - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("h2o")); let lp_lines = vec![ @@ -1401,8 +1345,8 @@ mod tests { #[test] fn test_matches_column_name_predicate() { - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("h2o")); let lp_lines = vec![ @@ -1447,8 +1391,8 @@ mod tests { #[tokio::test] async fn test_series_set_plan() { - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec![ @@ -1461,9 +1405,9 @@ mod tests { write_lines_to_table(&mut table, dictionary, lp_lines); let predicate = PredicateBuilder::default().build(); - let partition_predicate = partition.compile_predicate(&predicate).unwrap(); + let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); let series_set_plan = table - .series_set_plan(&partition_predicate, &partition) + .series_set_plan(&chunk_predicate, &chunk) .expect("creating the series set plan"); assert_eq!(series_set_plan.table_name.as_ref(), "table_name"); @@ -1494,8 +1438,8 @@ mod tests { // test that the columns and rows come out in the right order (tags then // timestamp) - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec![ @@ -1509,9 +1453,9 @@ mod tests { write_lines_to_table(&mut table, dictionary, lp_lines); let predicate = PredicateBuilder::default().build(); - let partition_predicate = partition.compile_predicate(&predicate).unwrap(); + let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); let series_set_plan = table - .series_set_plan(&partition_predicate, &partition) + .series_set_plan(&chunk_predicate, &chunk) .expect("creating the series set plan"); assert_eq!(series_set_plan.table_name.as_ref(), "table_name"); @@ -1543,8 +1487,8 @@ mod tests { async fn test_series_set_plan_filter() { // test that filters are applied reasonably - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec![ @@ -1561,10 +1505,10 @@ mod tests { .timestamp_range(190, 210) .build(); - let partition_predicate = partition.compile_predicate(&predicate).unwrap(); + let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); let series_set_plan = table - .series_set_plan(&partition_predicate, &partition) + .series_set_plan(&chunk_predicate, &chunk) .expect("creating the series set plan"); assert_eq!(series_set_plan.table_name.as_ref(), "table_name"); @@ -1932,8 +1876,8 @@ mod tests { #[tokio::test] async fn test_grouped_window_series_set_plan_nanoseconds() { - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec![ @@ -1961,14 +1905,14 @@ mod tests { .add_expr(col("city").eq(lit("Boston")).or(col("city").eq(lit("LA")))) .timestamp_range(100, 450) .build(); - let partition_predicate = partition.compile_predicate(&predicate).unwrap(); + let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); let agg = Aggregate::Mean; let every = WindowDuration::from_nanoseconds(200); let offset = WindowDuration::from_nanoseconds(0); let plan = table - .window_grouped_series_set_plan(&partition_predicate, agg, &every, &offset, &partition) + .window_grouped_series_set_plan(&chunk_predicate, agg, &every, &offset, &chunk) .expect("creating the grouped_series set plan"); assert_eq!(plan.tag_columns, *str_vec_to_arc_vec(&["city", "state"])); @@ -1996,8 +1940,8 @@ mod tests { #[tokio::test] async fn test_grouped_window_series_set_plan_months() { - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec![ @@ -2010,14 +1954,14 @@ mod tests { write_lines_to_table(&mut table, dictionary, lp_lines); let predicate = PredicateBuilder::default().build(); - let partition_predicate = partition.compile_predicate(&predicate).unwrap(); + let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); let agg = Aggregate::Mean; let every = WindowDuration::from_months(1, false); let offset = WindowDuration::from_months(0, false); let plan = table - .window_grouped_series_set_plan(&partition_predicate, agg, &every, &offset, &partition) + .window_grouped_series_set_plan(&chunk_predicate, agg, &every, &offset, &chunk) .expect("creating the grouped_series set plan"); assert_eq!(plan.tag_columns, *str_vec_to_arc_vec(&["city", "state"])); @@ -2041,8 +1985,8 @@ mod tests { #[tokio::test] async fn test_field_name_plan() { - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); let lp_lines = vec![ @@ -2058,10 +2002,10 @@ mod tests { let predicate = PredicateBuilder::default().timestamp_range(0, 200).build(); - let partition_predicate = partition.compile_predicate(&predicate).unwrap(); + let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); let field_names_set_plan = table - .field_names_plan(&partition_predicate, &partition) + .field_names_plan(&chunk_predicate, &chunk) .expect("creating the field_name plan"); // run the created plan, ensuring the output is as expected @@ -2189,7 +2133,7 @@ mod tests { let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); - let data = split_lines_into_write_entry_partitions(partition_key_func, &lines); + let data = split_lines_into_write_entry_partitions(chunk_key_func, &lines); let batch = flatbuffers::get_root::>(&data); let entries = batch.entries().expect("at least one entry"); @@ -2205,25 +2149,25 @@ mod tests { } } - fn partition_key_func(_: &ParsedLine<'_>) -> String { - String::from("the_partition_key") + fn chunk_key_func(_: &ParsedLine<'_>) -> String { + String::from("the_chunk_key") } /// Pre-loaded Table for use in tests struct TableFixture { - partition: Partition, + chunk: Chunk, table: Table, } impl TableFixture { /// Create an Table with the specified lines loaded fn new(lp_lines: Vec<&str>) -> Self { - let mut partition = Partition::new("dummy_partition_key"); - let dictionary = &mut partition.dictionary; + let mut chunk = Chunk::new("dummy_chunk_key"); + let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); write_lines_to_table(&mut table, dictionary, lp_lines); - Self { partition, table } + Self { chunk, table } } /// create a series set plan from the predicate ane aggregates @@ -2234,13 +2178,13 @@ mod tests { agg: Aggregate, group_columns: &[&str], ) -> Vec { - let partition_predicate = self.partition.compile_predicate(&predicate).unwrap(); + let chunk_predicate = self.chunk.compile_predicate(&predicate).unwrap(); let group_columns: Vec<_> = group_columns.iter().map(|s| String::from(*s)).collect(); let grouped_series_set_plan = self .table - .grouped_series_set_plan(&partition_predicate, agg, &group_columns, &self.partition) + .grouped_series_set_plan(&chunk_predicate, agg, &group_columns, &self.chunk) .expect("creating the grouped_series set plan"); // ensure the group prefix got to the right place diff --git a/query/src/lib.rs b/query/src/lib.rs index 2944a32177..cd78b927ce 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -46,7 +46,7 @@ use self::predicate::{Predicate, TimestampRange}; #[async_trait] pub trait TSDatabase: Debug + Send + Sync { /// the type of partition that is stored by this database - type Partition: Send + Sync + 'static + PartitionChunk; + type Chunk: Send + Sync + 'static + PartitionChunk; type Error: std::error::Error + Send + Sync + 'static; /// writes parsed lines into this database @@ -109,7 +109,7 @@ pub trait TSDatabase: Debug + Send + Sync { #[async_trait] pub trait SQLDatabase: Debug + Send + Sync { /// the type of partition that is stored by this database - type Partition: Send + Sync + 'static + PartitionChunk; + type Chunk: Send + Sync + 'static + PartitionChunk; type Error: std::error::Error + Send + Sync + 'static; /// Execute the specified query and return arrow record batches with the @@ -132,12 +132,6 @@ pub trait SQLDatabase: Debug + Send + Sync { &self, partition_key: &str, ) -> Result, Self::Error>; - - /// Removes the partition from the database and returns it - async fn remove_partition( - &self, - partition_key: &str, - ) -> Result, Self::Error>; } /// Collection of data that shares the same partition key @@ -150,12 +144,13 @@ pub trait PartitionChunk: Debug + Send + Sync { /// returns the partition metadata stats for every table in the partition fn table_stats(&self) -> Result, Self::Error>; - /// converts the table to an Arrow RecordBatch + /// converts the table to an Arrow RecordBatch and writes to dst fn table_to_arrow( &self, + dst: &mut Vec, table_name: &str, columns: &[&str], - ) -> Result; + ) -> Result<(), Self::Error>; } #[async_trait] diff --git a/query/src/predicate.rs b/query/src/predicate.rs index e80c16ac1c..5de9f04976 100644 --- a/query/src/predicate.rs +++ b/query/src/predicate.rs @@ -62,6 +62,9 @@ pub struct Predicate { /// Optional timestamp range: only rows within this range are included in /// results. Other rows are excluded pub range: Option, + + /// Optional partition key filter + pub partition_key: Option, } impl Predicate { @@ -157,6 +160,16 @@ impl PredicateBuilder { self } + /// Set the partition key restriction + pub fn partition_key(mut self, partition_key: impl Into) -> Self { + assert!( + self.inner.partition_key.is_none(), + "multiple partition key predicates not suported" + ); + self.inner.partition_key = Some(partition_key.into()); + self + } + /// Create a predicate, consuming this builder pub fn build(self) -> Predicate { self.inner diff --git a/query/src/test.rs b/query/src/test.rs index b6ee97abac..d7072cec50 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -221,6 +221,7 @@ fn predicate_to_test_string(predicate: &Predicate) -> String { field_columns, exprs, range, + partition_key, } = predicate; let mut result = String::new(); @@ -242,13 +243,17 @@ fn predicate_to_test_string(predicate: &Predicate) -> String { write!(result, " range: {:?}", range).unwrap(); } + if let Some(partition_key) = partition_key { + write!(result, " partition_key: {:?}", partition_key).unwrap(); + } + write!(result, "}}").unwrap(); result } #[async_trait] impl TSDatabase for TestDatabase { - type Partition = TestPartition; + type Chunk = TestChunk; type Error = TestError; /// Writes parsed lines into this database @@ -402,7 +407,7 @@ impl TSDatabase for TestDatabase { #[async_trait] impl SQLDatabase for TestDatabase { - type Partition = TestPartition; + type Chunk = TestChunk; type Error = TestError; /// Execute the specified query and return arrow record batches with the @@ -424,13 +429,6 @@ impl SQLDatabase for TestDatabase { unimplemented!("table_names_for_partition not yet implemented for test database"); } - async fn remove_partition( - &self, - _partition_key: &str, - ) -> Result, Self::Error> { - unimplemented!() - } - /// Fetch the specified table names and columns as Arrow /// RecordBatches. Columns are returned in the order specified. async fn table_to_arrow( @@ -443,9 +441,9 @@ impl SQLDatabase for TestDatabase { } #[derive(Debug)] -pub struct TestPartition {} +pub struct TestChunk {} -impl PartitionChunk for TestPartition { +impl PartitionChunk for TestChunk { type Error = TestError; fn key(&self) -> &str { @@ -458,9 +456,10 @@ impl PartitionChunk for TestPartition { fn table_to_arrow( &self, + _dst: &mut Vec, _table_name: &str, _columns: &[&str], - ) -> Result { + ) -> Result<(), Self::Error> { unimplemented!() } } diff --git a/server/src/snapshot.rs b/server/src/snapshot.rs index aa2697a4c4..23ea544f49 100644 --- a/server/src/snapshot.rs +++ b/server/src/snapshot.rs @@ -1,4 +1,4 @@ -//! This module contains code for snapshotting a database partition to Parquet +//! This module contains code for snapshotting a database chunk to Parquet //! files in object storage. use arrow_deps::{ arrow::record_batch::RecordBatch, @@ -144,15 +144,14 @@ where async fn run(&self, notify: Option>) -> Result<()> { while let Some((pos, table_name)) = self.next_table() { - let batch = self - .partition - .table_to_arrow(table_name, &[]) + let mut batches = Vec::new(); + self.partition + .table_to_arrow(&mut batches, table_name, &[]) .map_err(|e| Box::new(e) as _) .context(PartitionError)?; let file_name = format!("{}/{}.parquet", &self.data_path, table_name); - - self.write_batch(batch, &file_name).await?; + self.write_batches(batches, &file_name).await?; self.mark_table_finished(pos); if self.should_stop() { @@ -186,12 +185,14 @@ where Ok(()) } - async fn write_batch(&self, batch: RecordBatch, file_name: &str) -> Result<()> { + async fn write_batches(&self, batches: Vec, file_name: &str) -> Result<()> { let mem_writer = MemWriter::default(); { - let mut writer = ArrowWriter::try_new(mem_writer.clone(), batch.schema(), None) + let mut writer = ArrowWriter::try_new(mem_writer.clone(), batches[0].schema(), None) .context(OpeningParquetWriter)?; - writer.write(&batch).context(WritingParquetToMemory)?; + for batch in batches.into_iter() { + writer.write(&batch).context(WritingParquetToMemory)?; + } writer.close().context(ClosingParquetWriter)?; } // drop the reference to the MemWriter that the SerializedFileWriter has @@ -234,7 +235,7 @@ pub struct Status { error: Option, } -pub fn snapshot_partition( +pub fn snapshot_chunk( metadata_path: impl Into, data_path: impl Into, store: Arc, @@ -325,7 +326,7 @@ mod tests { use data_types::database_rules::DatabaseRules; use futures::TryStreamExt; use influxdb_line_protocol::parse_lines; - use mutable_buffer::partition::Partition as PartitionWB; + use mutable_buffer::chunk::Chunk as ChunkWB; use object_store::InMemory; #[tokio::test] @@ -339,23 +340,23 @@ mem,host=A,region=west used=45 1 let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect(); let write = lines_to_replicated_write(1, 1, &lines, &DatabaseRules::default()); - let mut partition = PartitionWB::new("testaroo"); + let mut chunk = ChunkWB::new("testaroo"); for e in write.write_buffer_batch().unwrap().entries().unwrap() { - partition.write_entry(&e).unwrap(); + chunk.write_entry(&e).unwrap(); } let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); - let partition = Arc::new(partition); + let chunk = Arc::new(chunk); let (tx, rx) = tokio::sync::oneshot::channel(); let metadata_path = "/meta"; let data_path = "/data"; - let snapshot = snapshot_partition( + let snapshot = snapshot_chunk( metadata_path, data_path, store.clone(), - partition.clone(), + chunk.clone(), Some(tx), ) .unwrap(); @@ -393,16 +394,16 @@ mem,host=A,region=west used=45 1 ]; let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); - let partition = Arc::new(PartitionWB::new("testaroo")); + let chunk = Arc::new(ChunkWB::new("testaroo")); let metadata_path = "/meta".to_string(); let data_path = "/data".to_string(); let snapshot = Snapshot::new( - partition.key.clone(), + chunk.key.clone(), metadata_path, data_path, store, - partition, + chunk, tables, ); diff --git a/src/server/http_routes.rs b/src/server/http_routes.rs index ba118467ef..e84187a8cb 100644 --- a/src/server/http_routes.rs +++ b/src/server/http_routes.rs @@ -497,7 +497,7 @@ async fn list_partitions( struct SnapshotInfo { org: String, bucket: String, - partition: String, + chunk: String, } #[tracing::instrument(level = "debug")] @@ -542,13 +542,13 @@ async fn snapshot_partition