feat: initial work for querying data from parquet file in object store

pull/24376/head
Nga Tran 2021-04-13 13:47:10 -04:00
parent 55a77914b1
commit 4a6d6bd7ad
5 changed files with 146 additions and 5 deletions

1
Cargo.lock generated
View File

@ -2332,6 +2332,7 @@ dependencies = [
"bytes",
"data_types",
"futures",
"internal_types",
"object_store",
"parking_lot",
"snafu",

View File

@ -8,7 +8,8 @@ edition = "2018"
arrow_deps = { path = "../arrow_deps" }
bytes = "1.0"
data_types = { path = "../data_types" }
futures = "0.3"
futures = "0.3.7"
internal_types = {path = "../internal_types"}
object_store = {path = "../object_store"}
parking_lot = "0.11.1"
snafu = "0.6"

View File

@ -1,12 +1,37 @@
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::BTreeSet;
use crate::table::Table;
use data_types::partition_metadata::TableSummary;
use internal_types::{schema::Schema, selection::Selection};
use object_store::path::Path;
use tracker::{MemRegistry, MemTracker};
use std::mem;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error writing table '{}': {}", table_name, source))]
TableWrite {
table_name: String,
source: crate::table::Error,
},
#[snafu(display("Table Error in '{}': {}", table_name, source))]
NamedTableError {
table_name: String,
source: crate::table::Error,
},
#[snafu(display("Table '{}' not found in chunk {}", table_name, chunk_id))]
NamedTableNotFoundInChunk { table_name: String, chunk_id: u64 },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct Chunk {
/// Partition this chunk belongs to
@ -82,4 +107,20 @@ impl Chunk {
size + self.partition_key.len() + mem::size_of::<u32>() + mem::size_of::<Self>()
}
/// Return Schema for the specified table / columns
pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
let table = self
.tables
.iter()
.find(|t| t.has_table(table_name))
.context(NamedTableNotFoundInChunk {
table_name,
chunk_id: self.id(),
})?;
table
.schema(selection)
.context(NamedTableError { table_name })
}
}

View File

@ -1,7 +1,33 @@
use snafu::{ResultExt, Snafu};
use std::mem;
use data_types::partition_metadata::TableSummary;
use internal_types::{schema::{builder::SchemaBuilder, Schema}, selection::Selection};
use object_store::path::Path;
use std::mem;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error writing table '{}'", table_name))]
TableWrite {
table_name: String,
},
#[snafu(display("Table Error in '{}'", table_name))]
NamedTableError {
table_name: String,
},
#[snafu(display("Table '{}' not found in chunk {}", table_name, chunk_id))]
NamedTableNotFoundInChunk { table_name: String, chunk_id: u64 },
#[snafu(display("Internal error converting schema: {}", source))]
InternalSchema {
source: internal_types::schema::builder::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Table that belongs to a chunk persisted in a parquet file in object store
#[derive(Debug, Clone)]
@ -47,4 +73,70 @@ impl Table {
pub fn path(&self) -> Path {
self.object_store_path.clone()
}
/// Return all columns of this table
// pub fn all_columns_selection(&self) -> Result<TableColSelection<'a>> {
// // TODO
// let cols: Vec<ColSelection> = vec![];
// let selection = TableColSelection { cols };
// // sort so the columns always come out in a predictable name
// Ok(selection.sort_by_name())
// }
// /// Returns a column selection for just the specified columns
// fn specific_columns_selection<'a>(
// &self,
// columns: &'a [&'a str],
// ) -> Result<TableColSelection<'a>> {
// // TODO
// let cols: Vec<ColSelection> = vec![];
// Ok(TableColSelection { cols })
// }
pub fn schema(&self, selection: Selection<'_>) -> Result<Schema> {
let mut schema_builder = SchemaBuilder::new();
// // TODO: maybe just refactor MB's corresponding one
// for col in &selection.cols {
// let column_name = col.column_name;
// let column = self.column(col.column_id)?;
// schema_builder = match column {
// Column::String(_, _) => schema_builder.field(column_name, ArrowDataType::Utf8),
// Column::Tag(_, _) => schema_builder.tag(column_name),
// Column::F64(_, _) => schema_builder.field(column_name, ArrowDataType::Float64),
// Column::I64(_, _) => {
// if column_name == TIME_COLUMN_NAME {
// schema_builder.timestamp()
// } else {
// schema_builder.field(column_name, ArrowDataType::Int64)
// }
// }
// Column::U64(_, _) => schema_builder.field(column_name, ArrowDataType::UInt64),
// Column::Bool(_, _) => schema_builder.field(column_name, ArrowDataType::Boolean),
// };
// }
schema_builder.build().context(InternalSchema)
// translate chunk selection into name/indexes:
// let selection = match selection {
// Selection::All => self.all_columns_selection(),
// Selection::Some(cols) => self.specific_columns_selection(cols),
// }?;
// self.schema_impl(&selection)
}
// fn schema_impl(&self, selection: &TableColSelection<'_>) -> Result<Schema> {
// let mut schema_builder = SchemaBuilder::new();
// // TODO: maybe just refactor MB's corresponding one
// schema_builder.build().context(InternalSchema)
// }
}

View File

@ -28,6 +28,12 @@ pub enum Error {
chunk_id: u32,
},
#[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))]
ParquetFileChunkError {
source: parquet_file::chunk::Error,
chunk_id: u32,
},
#[snafu(display("Internal error restricting schema: {}", source))]
InternalSelectingSchema {
source: internal_types::schema::Error,
@ -212,9 +218,9 @@ impl PartitionChunk for DBChunk {
Ok(schema)
}
Self::ParquetFile { .. } => {
unimplemented!("parquet file not implemented for table schema")
}
Self::ParquetFile { chunk, .. } => chunk
.table_schema(table_name, selection)
.context(ParquetFileChunkError{ chunk_id: chunk.id() }),
}
}