feat: Add 2 main functions, table_schema and table_names, for Parquet Chunk to lay a foundation for querying it

pull/24376/head
Nga Tran 2021-04-13 18:23:55 -04:00
parent c4e56493d8
commit 05bf28ce85
5 changed files with 129 additions and 104 deletions


@@ -1,15 +1,12 @@
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::BTreeSet;
use crate::table::Table;
use data_types::partition_metadata::TableSummary;
use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange};
use internal_types::{schema::Schema, selection::Selection};
use object_store::path::Path;
use tracker::{MemRegistry, MemTracker};
use std::mem;
#[derive(Debug, Snafu)]
@@ -80,8 +77,15 @@ impl Chunk {
}
/// Add a chunk's table and its summary
pub fn add_table(&mut self, table_summary: TableSummary, file_location: Path) {
self.tables.push(Table::new(table_summary, file_location));
pub fn add_table(
&mut self,
table_summary: TableSummary,
file_location: Path,
schema: Schema,
range: Option<TimestampRange>,
) {
self.tables
.push(Table::new(table_summary, file_location, schema, range));
}
/// Return true if this chunk includes the given table
@@ -108,8 +112,8 @@ impl Chunk {
size + self.partition_key.len() + mem::size_of::<u32>() + mem::size_of::<Self>()
}
/// Return Schema for the specified table / columns
pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
let table = self
.tables
.iter()
@@ -123,4 +127,17 @@ impl Chunk {
.schema(selection)
.context(NamedTableError { table_name })
}
/// Return the names of all tables in this chunk that match the given timestamp range
pub fn table_names(
&self,
timestamp_range: Option<TimestampRange>,
) -> impl Iterator<Item = String> + '_ {
self.tables.iter().flat_map(move |t| {
if t.matches_predicate(&timestamp_range) {
Some(t.name())
} else {
None
}
})
}
}
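Taken together, the two new methods let a caller enumerate a chunk's tables and fetch their schemas. A minimal usage sketch, assuming a `Chunk` already populated via `add_table` (the surrounding plumbing and the `100..200` range are illustrative, not part of this diff):

```rust
use data_types::timestamp::TimestampRange;
use internal_types::selection::Selection;

// `chunk` was populated via add_table(table_summary, file_location, schema, range)
fn describe(chunk: &Chunk) {
    // Only tables whose timestamp range overlaps 100..200 are yielded;
    // passing None instead would yield every table.
    let range = Some(TimestampRange::new(100, 200));
    for table_name in chunk.table_names(range) {
        // Selection::All clones the stored schema; Selection::Some(&["host"])
        // would first project it down to the named columns.
        match chunk.table_schema(&table_name, Selection::All) {
            Ok(_schema) => println!("table {} has a readable schema", table_name),
            Err(e) => eprintln!("schema error for {}: {}", table_name, e),
        }
    }
}
```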


@@ -1,34 +1,20 @@
use snafu::{ResultExt, Snafu};
use std::mem;
use data_types::partition_metadata::TableSummary;
use internal_types::{schema::{builder::SchemaBuilder, Schema}, selection::Selection};
use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange};
use internal_types::{schema::Schema, selection::Selection};
use object_store::path::Path;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error writing table '{}'", table_name))]
TableWrite {
table_name: String,
},
#[snafu(display("Table Error in '{}'", table_name))]
NamedTableError {
table_name: String,
},
#[snafu(display("Table '{}' not found in chunk {}", table_name, chunk_id))]
NamedTableNotFoundInChunk { table_name: String, chunk_id: u64 },
#[snafu(display("Internal error converting schema: {}", source))]
InternalSchema {
source: internal_types::schema::builder::Error,
#[snafu(display("Failed to select columns: {}", source))]
SelectColumns {
source: internal_types::schema::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Table that belongs to a chunk persisted in a parquet file in object store
#[derive(Debug, Clone)]
pub struct Table {
@@ -39,13 +25,26 @@ pub struct Table {
/// <writer id>/<database>/data/<partition key>/<chunk
/// id>/<tablename>.parquet
object_store_path: Path,
/// Schema that goes with this table's parquet file
table_schema: Schema,
/// Timestamp range of this table's parquet file
timestamp_range: Option<TimestampRange>,
}
impl Table {
pub fn new(meta: TableSummary, path: Path) -> Self {
pub fn new(
meta: TableSummary,
path: Path,
schema: Schema,
range: Option<TimestampRange>,
) -> Self {
Self {
table_summary: meta,
object_store_path: path,
table_schema: schema,
timestamp_range: range,
}
}
@@ -62,6 +61,7 @@ impl Table {
mem::size_of::<Self>()
+ self.table_summary.size()
+ mem::size_of_val(&self.object_store_path)
+ mem::size_of_val(&self.table_schema)
}
/// Return name of this table
@@ -74,69 +74,23 @@ impl Table {
self.object_store_path.clone()
}
/// Return all columns of this table
// pub fn all_columns_selection(&self) -> Result<TableColSelection<'a>> {
// // TODO
// let cols: Vec<ColSelection> = vec![];
// let selection = TableColSelection { cols };
// // sort so the columns always come out in a predictable order
// Ok(selection.sort_by_name())
// }
// /// Returns a column selection for just the specified columns
// fn specific_columns_selection<'a>(
// &self,
// columns: &'a [&'a str],
// ) -> Result<TableColSelection<'a>> {
// // TODO
// let cols: Vec<ColSelection> = vec![];
// Ok(TableColSelection { cols })
// }
/// Return the schema of this table for the specified column selection
pub fn schema(&self, selection: Selection<'_>) -> Result<Schema> {
let mut schema_builder = SchemaBuilder::new();
// // TODO: maybe just refactor MB's corresponding one
// for col in &selection.cols {
// let column_name = col.column_name;
// let column = self.column(col.column_id)?;
// schema_builder = match column {
// Column::String(_, _) => schema_builder.field(column_name, ArrowDataType::Utf8),
// Column::Tag(_, _) => schema_builder.tag(column_name),
// Column::F64(_, _) => schema_builder.field(column_name, ArrowDataType::Float64),
// Column::I64(_, _) => {
// if column_name == TIME_COLUMN_NAME {
// schema_builder.timestamp()
// } else {
// schema_builder.field(column_name, ArrowDataType::Int64)
// }
// }
// Column::U64(_, _) => schema_builder.field(column_name, ArrowDataType::UInt64),
// Column::Bool(_, _) => schema_builder.field(column_name, ArrowDataType::Boolean),
// };
// }
schema_builder.build().context(InternalSchema)
// translate chunk selection into name/indexes:
// let selection = match selection {
// Selection::All => self.all_columns_selection(),
// Selection::Some(cols) => self.specific_columns_selection(cols),
// }?;
// self.schema_impl(&selection)
Ok(match selection {
Selection::All => self.table_schema.clone(),
Selection::Some(columns) => {
let columns = self.table_schema.select(columns).context(SelectColumns)?;
self.table_schema.project(&columns)
}
})
}
// fn schema_impl(&self, selection: &TableColSelection<'_>) -> Result<Schema> {
// let mut schema_builder = SchemaBuilder::new();
// // TODO: maybe just refactor MB's corresponding one
// schema_builder.build().context(InternalSchema)
// }
/// Return true if this table's timestamp range overlaps the given predicate range
pub fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
    match (self.timestamp_range, timestamp_range) {
        (Some(a), Some(b)) => !a.disjoint(b),
        // If this table doesn't have a time column it can't match the predicate
        (None, Some(_)) => false,
        (_, None) => true,
}
}
}
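Since matches_predicate drives table_names, its truth table is worth restating. A standalone sketch with plain (start, end) pairs standing in for TimestampRange and its disjoint() method (the half-open interpretation of ranges is an assumption here):

```rust
/// Standalone model of matches_predicate: half-open (start, end) pairs
/// stand in for TimestampRange.
fn matches(table_range: Option<(i64, i64)>, predicate: Option<(i64, i64)>) -> bool {
    match (table_range, predicate) {
        // Two ranges overlap unless one ends at or before the other starts.
        (Some((ts, te)), Some((ps, pe))) => !(te <= ps || pe <= ts),
        // A table without a time column can never satisfy a time predicate.
        (None, Some(_)) => false,
        // With no time predicate, every table matches.
        (_, None) => true,
    }
}

fn main() {
    assert!(matches(Some((0, 10)), Some((5, 15)))); // overlapping ranges
    assert!(!matches(Some((0, 10)), Some((10, 20)))); // disjoint (half-open)
    assert!(!matches(None, Some((0, 10)))); // no time column
    assert!(matches(None, None)); // no predicate at all
}
```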


@@ -316,6 +316,19 @@ impl Chunk {
Ok(table.read_filter(&select_columns, &predicate))
}
/// Returns timestamp range for specified table
pub fn read_time_range(&self, table_name: &str) -> Result<Option<(i64, i64)>> {
// read lock on chunk.
let chunk_data = self.chunk_data.read().unwrap();
let table = chunk_data
.data
.get(table_name)
.context(TableNotFound { table_name })?;
Ok(table.time_range())
}
/// Returns an iterable collection of data in group columns and aggregate
/// columns, optionally filtered by the provided predicate. Results are
/// merged across all row groups within the returned table.
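A caller usually wants a TimestampRange rather than the raw pair. A sketch of that conversion with Option combinators, equivalent to the match the db.rs change below writes out (rb_chunk and table_name assumed in scope):

```rust
// Convert the raw (min, max) pair into an Option<TimestampRange>,
// dropping degenerate pairs where start >= end.
let time_range: Option<(i64, i64)> = rb_chunk.read_time_range(table_name)?;
let table_time_range = time_range
    .filter(|(start, end)| start < end)
    .map(|(start, end)| TimestampRange::new(start, end));
```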


@@ -3,6 +3,7 @@
use std::any::Any;
use std::{
convert::TryInto,
num::NonZeroU32,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
@@ -15,15 +16,19 @@ use observability_deps::tracing::{debug, info};
use parking_lot::{Mutex, RwLock};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use arrow_deps::datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
physical_plan::SendableRecordBatchStream,
use arrow_deps::{
arrow::datatypes::SchemaRef as ArrowSchemaRef,
datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
physical_plan::SendableRecordBatchStream,
},
};
use catalog::{chunk::ChunkState, Catalog};
pub(crate) use chunk::DBChunk;
use data_types::{
chunk::ChunkSummary, database_rules::DatabaseRules, partition_metadata::PartitionSummary,
timestamp::TimestampRange,
};
use internal_types::selection::Selection;
use object_store::ObjectStore;
@@ -115,6 +120,18 @@ pub enum Error {
chunk_id: u32,
},
#[snafu(display("Read Buffer Schema Error in chunk {}: {}", chunk_id, source))]
ReadBufferChunkSchemaError {
source: read_buffer::Error,
chunk_id: u32,
},
#[snafu(display("Read Buffer Timestamp Error in chunk {}: {}", chunk_id, source))]
ReadBufferChunkTimestampError {
chunk_id: u32,
source: read_buffer::Error,
},
#[snafu(display("Error writing to object store: {}", source))]
WritingToObjectStore {
source: parquet_file::storage::Error,
@@ -135,6 +152,11 @@ pub enum Error {
#[snafu(display("Error building sequenced entry: {}", source))]
SequencedEntryError { source: entry::Error },
#[snafu(display("Error building sequenced entry: {}", source))]
SchemaConversion {
source: internal_types::schema::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -487,12 +509,16 @@ impl Db {
let read_results = rb_chunk
.read_filter(stats.name.as_str(), predicate, Selection::All)
.context(ReadBufferChunkError { chunk_id })?;
let schema = rb_chunk
let arrow_schema: ArrowSchemaRef = rb_chunk
.read_filter_table_schema(stats.name.as_str(), Selection::All)
.context(ReadBufferChunkError { chunk_id })?
.context(ReadBufferChunkSchemaError { chunk_id })?
.into();
let stream: SendableRecordBatchStream =
Box::pin(streams::ReadFilterResultsStream::new(read_results, schema));
let time_range = rb_chunk
.read_time_range(stats.name.as_str())
.context(ReadBufferChunkTimestampError { chunk_id })?;
let stream: SendableRecordBatchStream = Box::pin(
streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)),
);
// Write this table data into the object store
let path = storage
@@ -506,7 +532,20 @@ impl Db {
.context(WritingToObjectStore)?;
// Now add the saved info into the parquet_chunk
parquet_chunk.add_table(stats, path);
let schema = Arc::clone(&arrow_schema)
.try_into()
.context(SchemaConversion)?;
let table_time_range = match time_range {
None => None,
Some((start, end)) => {
if start < end {
Some(TimestampRange::new(start, end))
} else {
None
}
}
};
parquet_chunk.add_table(stats, path, schema, table_time_range);
}
// Relock the chunk again (nothing else should have been able
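One consequence of the start < end guard above: a degenerate pair is stored as timestamp_range == None, so matches_predicate later excludes that table from any time-filtered table_names call while unfiltered calls still return it. A small sketch with hypothetical values:

```rust
// A degenerate pair collapses to None...
let time_range = Some((10_i64, 10_i64));
let table_time_range = match time_range {
    Some((start, end)) if start < end => Some(TimestampRange::new(start, end)),
    _ => None,
};
assert!(table_time_range.is_none());
// ...so such a table matches only predicate-free queries:
// matches_predicate(&Some(range)) == false, matches_predicate(&None) == true.
```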


@@ -171,9 +171,7 @@ impl PartitionChunk for DBChunk {
chunk.table_names(&rb_predicate, &BTreeSet::new())
}
Self::ParquetFile { .. } => {
unimplemented!("parquet file not implemented for scan_data")
}
Self::ParquetFile { chunk, .. } => chunk.table_names(predicate.range).collect(),
};
// Prune out tables that should not be
@@ -218,9 +216,13 @@ impl PartitionChunk for DBChunk {
Ok(schema)
}
Self::ParquetFile { chunk, .. } => chunk
.table_schema(table_name, selection)
.context(ParquetFileChunkError{ chunk_id: chunk.id() }),
Self::ParquetFile { chunk, .. } => {
chunk
.table_schema(table_name, selection)
.context(ParquetFileChunkError {
chunk_id: chunk.id(),
})
}
}
}
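Note that on the ParquetFile path only the predicate's time range reaches the chunk. A sketch of the new arm in isolation (collecting into a BTreeSet to match the read buffer arm above; predicate.range is assumed to be an Option<TimestampRange>):

```rust
use std::collections::BTreeSet;

// Only predicate.range is evaluated here; any other predicate
// expressions are ignored on this code path.
let names: BTreeSet<String> = chunk.table_names(predicate.range).collect();
```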