diff --git a/Cargo.lock b/Cargo.lock index 5c519f64dc..dabbf52f49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2332,6 +2332,7 @@ dependencies = [ "bytes", "data_types", "futures", + "internal_types", "object_store", "parking_lot", "snafu", diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml index 2750677be4..965056fb07 100644 --- a/parquet_file/Cargo.toml +++ b/parquet_file/Cargo.toml @@ -8,7 +8,8 @@ edition = "2018" arrow_deps = { path = "../arrow_deps" } bytes = "1.0" data_types = { path = "../data_types" } -futures = "0.3" +futures = "0.3.7" +internal_types = {path = "../internal_types"} object_store = {path = "../object_store"} parking_lot = "0.11.1" snafu = "0.6" diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 8f971924b7..d352cfd223 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -1,12 +1,37 @@ + +use snafu::{OptionExt, ResultExt, Snafu}; use std::collections::BTreeSet; use crate::table::Table; use data_types::partition_metadata::TableSummary; +use internal_types::{schema::Schema, selection::Selection}; use object_store::path::Path; use tracker::{MemRegistry, MemTracker}; + + use std::mem; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Error writing table '{}': {}", table_name, source))] + TableWrite { + table_name: String, + source: crate::table::Error, + }, + + #[snafu(display("Table Error in '{}': {}", table_name, source))] + NamedTableError { + table_name: String, + source: crate::table::Error, + }, + + #[snafu(display("Table '{}' not found in chunk {}", table_name, chunk_id))] + NamedTableNotFoundInChunk { table_name: String, chunk_id: u64 }, +} + +pub type Result = std::result::Result; + #[derive(Debug)] pub struct Chunk { /// Partition this chunk belongs to @@ -82,4 +107,20 @@ impl Chunk { size + self.partition_key.len() + mem::size_of::() + mem::size_of::() } + + /// Return Schema for the specified table / columns + pub fn table_schema(&self, table_name: &str, selection: 
Selection<'_>) -> Result { + let table = self + .tables + .iter() + .find(|t| t.has_table(table_name)) + .context(NamedTableNotFoundInChunk { + table_name, + chunk_id: self.id(), + })?; + + table + .schema(selection) + .context(NamedTableError { table_name }) + } } diff --git a/parquet_file/src/table.rs b/parquet_file/src/table.rs index 4f2776e8ea..f642c637ec 100644 --- a/parquet_file/src/table.rs +++ b/parquet_file/src/table.rs @@ -1,7 +1,33 @@ +use snafu::{ResultExt, Snafu}; +use std::mem; + use data_types::partition_metadata::TableSummary; +use internal_types::{schema::{builder::SchemaBuilder, Schema}, selection::Selection}; use object_store::path::Path; -use std::mem; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Error writing table '{}'", table_name))] + TableWrite { + table_name: String, + }, + + #[snafu(display("Table Error in '{}'", table_name))] + NamedTableError { + table_name: String, + }, + + #[snafu(display("Table '{}' not found in chunk {}", table_name, chunk_id))] + NamedTableNotFoundInChunk { table_name: String, chunk_id: u64 }, + + #[snafu(display("Internal error converting schema: {}", source))] + InternalSchema { + source: internal_types::schema::builder::Error, + }, +} + +pub type Result = std::result::Result; + /// Table that belongs to a chunk persisted in a parquet file in object store #[derive(Debug, Clone)] @@ -47,4 +73,70 @@ impl Table { pub fn path(&self) -> Path { self.object_store_path.clone() } + + + // Return all columns of this table + // pub fn all_columns_selection(&self) -> Result> { + // // TODO + // let cols: Vec = vec![]; + // let selection = TableColSelection { cols }; + + // // sort so the columns always come out in a predictable order + // Ok(selection.sort_by_name()) + // } + + // /// Returns a column selection for just the specified columns + // fn specific_columns_selection<'a>( + // &self, + // columns: &'a [&'a str], + // ) -> Result> { + // // TODO + // let cols: Vec = vec![]; + + // 
Ok(TableColSelection { cols }) + // } + + pub fn schema(&self, selection: Selection<'_>) -> Result { + + let mut schema_builder = SchemaBuilder::new(); + + // // TODO: maybe just refactor MB's corresponding one + // for col in &selection.cols { + // let column_name = col.column_name; + // let column = self.column(col.column_id)?; + + // schema_builder = match column { + // Column::String(_, _) => schema_builder.field(column_name, ArrowDataType::Utf8), + // Column::Tag(_, _) => schema_builder.tag(column_name), + // Column::F64(_, _) => schema_builder.field(column_name, ArrowDataType::Float64), + // Column::I64(_, _) => { + // if column_name == TIME_COLUMN_NAME { + // schema_builder.timestamp() + // } else { + // schema_builder.field(column_name, ArrowDataType::Int64) + // } + // } + // Column::U64(_, _) => schema_builder.field(column_name, ArrowDataType::UInt64), + // Column::Bool(_, _) => schema_builder.field(column_name, ArrowDataType::Boolean), + // }; + // } + + schema_builder.build().context(InternalSchema) + + + // translate chunk selection into name/indexes: + // let selection = match selection { + // Selection::All => self.all_columns_selection(), + // Selection::Some(cols) => self.specific_columns_selection(cols), + // }?; + // self.schema_impl(&selection) + } + + // fn schema_impl(&self, selection: &TableColSelection<'_>) -> Result { + // let mut schema_builder = SchemaBuilder::new(); + + // // TODO: maybe just refactor MB's corresponding one + + // schema_builder.build().context(InternalSchema) + // } } diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 99898df8f6..f442360045 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -28,6 +28,12 @@ pub enum Error { chunk_id: u32, }, + #[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))] + ParquetFileChunkError { + source: parquet_file::chunk::Error, + chunk_id: u32, + }, + #[snafu(display("Internal error restricting schema: {}", source))] 
InternalSelectingSchema { source: internal_types::schema::Error, @@ -212,9 +218,9 @@ impl PartitionChunk for DBChunk { Ok(schema) } - Self::ParquetFile { .. } => { - unimplemented!("parquet file not implemented for table schema") - } + Self::ParquetFile { chunk, .. } => chunk + .table_schema(table_name, selection) + .context(ParquetFileChunkError{ chunk_id: chunk.id() }), } }