Merge pull request #1409 from influxdata/crepererum/issue1379

feat: implement parquet metadata handling
kodiakhq[bot] 2021-05-05 13:30:23 +00:00 committed by GitHub
commit dab086821c
11 changed files with 1191 additions and 71 deletions

Cargo.lock (generated)

@@ -156,7 +156,9 @@ dependencies = [
"arrow",
"arrow-flight",
"datafusion",
"futures",
"parquet",
"parquet-format",
]
[[package]]
@@ -2412,6 +2414,7 @@ dependencies = [
"prost-types",
"query",
"snafu",
"thrift",
"tokio",
"tokio-stream",
"tracker",


@@ -13,6 +13,11 @@ arrow-flight = { git = "https://github.com/apache/arrow-rs", rev = "ed00e4d4a160
# Turn off optional datafusion features (function packages)
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f43dc444eb510f0ff170adecb5a23afb39c489a8", default-features = false }
futures = "0.3"
# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
# and we're not currently using it anyway
parquet = { git = "https://github.com/apache/arrow-rs", rev = "ed00e4d4a160cd5182bfafb81fee2240ec005014", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
# Keep in sync with the parquet version used above!
parquet-format = "2.6.1"


@@ -11,6 +11,7 @@ pub use arrow;
pub use arrow_flight;
pub use datafusion;
pub use parquet;
pub use parquet_format;
pub mod util;
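These re-exports are the entry point the rest of this PR uses for parquet metadata handling. As a hedged sketch that is not part of this diff (the file name `measurements.parquet` is made up), footer metadata can be read through the re-exported crate like so:

```rust
use std::fs::File;

// Re-export path added by this PR; the pinned parquet crate lives under arrow_deps.
use arrow_deps::parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical local parquet file, used only for illustration.
    let file = File::open("measurements.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // The footer metadata describes the schema, row groups, and key/value pairs.
    let metadata = reader.metadata();
    println!("row groups: {}", metadata.num_row_groups());
    println!("total rows: {}", metadata.file_metadata().num_rows());
    Ok(())
}
```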


@@ -1,15 +1,20 @@
//! Utility functions for working with arrow
use std::iter::FromIterator;
use std::sync::Arc;
use std::{
iter::FromIterator,
sync::Arc,
task::{Context, Poll},
};
use arrow::{
array::{ArrayRef, StringArray},
error::ArrowError,
datatypes::SchemaRef,
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
use datafusion::{
logical_plan::{binary_expr, col, lit, Expr, Operator},
physical_plan::RecordBatchStream,
scalar::ScalarValue,
};
@@ -118,6 +123,56 @@ impl AndExprBuilder {
}
}
/// A RecordBatchStream created from in-memory RecordBatches.
#[derive(Debug)]
pub struct MemoryStream {
schema: SchemaRef,
batches: Vec<RecordBatch>,
}
impl MemoryStream {
/// Create new stream.
///
/// Must pass at least one record batch!
pub fn new(batches: Vec<RecordBatch>) -> Self {
assert!(!batches.is_empty(), "must pass at least one record batch");
Self {
schema: batches[0].schema(),
batches,
}
}
/// Create new stream with provided schema.
pub fn new_with_schema(batches: Vec<RecordBatch>, schema: SchemaRef) -> Self {
Self { schema, batches }
}
}
impl RecordBatchStream for MemoryStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
impl futures::Stream for MemoryStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.batches.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(self.batches.remove(0))))
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.batches.len(), Some(self.batches.len()))
}
}
#[cfg(test)]
mod tests {
use super::*;
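A hedged usage sketch for the new MemoryStream, not part of this diff; the schema, column name, and values are made up:

```rust
use std::sync::Arc;

use arrow_deps::arrow::{
    array::{ArrayRef, Int64Array},
    datatypes::{DataType, Field, Schema},
    record_batch::RecordBatch,
};
use arrow_deps::util::MemoryStream;
use futures::StreamExt;

async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    // Build a one-column batch; schema and values are illustrative only.
    let schema = Arc::new(Schema::new(vec![Field::new("time", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // MemoryStream implements futures::Stream and yields each batch exactly once.
    let mut stream = MemoryStream::new(vec![batch]);
    while let Some(batch) = stream.next().await {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```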


@@ -18,6 +18,7 @@ prost = "0.7"
prost-types = "0.7"
query = { path = "../query" }
snafu = "0.6"
thrift = "0.13"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio-stream = "0.1"
tracker = { path = "../tracker" }


@@ -133,20 +133,25 @@ impl Chunk {
/// Return Schema for the specified table / columns
pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
let table = self
.tables
.iter()
.find(|t| t.has_table(table_name))
.context(NamedTableNotFoundInChunk {
table_name,
chunk_id: self.id(),
})?;
let table = self.find_table(table_name)?;
table
.schema(selection)
.context(NamedTableError { table_name })
}
/// Return object store path for specified table
pub fn table_path(&self, table_name: &str) -> Result<Path> {
let table = self.find_table(table_name)?;
Ok(table.path())
}
/// Return timestamp range for the specified table
pub fn timestamp_range(&self, table_name: &str) -> Result<Option<TimestampRange>> {
let table = self.find_table(table_name)?;
Ok(table.timestamp_range())
}
// Return names of all tables in this chunk whose timestamp range overlaps with the given one
pub fn table_names(
&self,
@@ -168,14 +173,7 @@ impl Chunk {
table_name: &str,
selection: Selection<'_>,
) -> Option<BTreeSet<String>> {
let table = self
.tables
.iter()
.find(|t| t.has_table(table_name))
.context(NamedTableNotFoundInChunk {
table_name,
chunk_id: self.id(),
});
let table = self.find_table(table_name);
match table {
Ok(table) => table.column_names(selection),
@@ -190,14 +188,7 @@ impl Chunk {
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream> {
let table = self
.tables
.iter()
.find(|t| t.has_table(table_name))
.context(NamedTableNotFoundInChunk {
table_name,
chunk_id: self.id(),
})?;
let table = self.find_table(table_name)?;
table
.read_filter(predicate, selection)
@@ -211,4 +202,14 @@ impl Chunk {
pub fn rows(&self) -> usize {
self.tables.iter().map(|t| t.rows()).sum()
}
fn find_table(&self, table_name: &str) -> Result<&Table> {
self.tables
.iter()
.find(|t| t.has_table(table_name))
.context(NamedTableNotFoundInChunk {
table_name,
chunk_id: self.id(),
})
}
}
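A hedged sketch of the two new accessors in use; it assumes it sits inside this module (so Chunk and Result are in scope), and the table name "cpu" is made up:

```rust
// Hypothetical helper, for illustration only.
fn describe_table(chunk: &Chunk) -> Result<()> {
    let path = chunk.table_path("cpu")?;
    let range = chunk.timestamp_range("cpu")?;
    println!("table stored at {:?}, time range {:?}", path, range);
    Ok(())
}
```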


@@ -9,5 +9,6 @@
pub mod catalog;
pub mod chunk;
pub mod metadata;
pub mod storage;
pub mod table;

parquet_file/src/metadata.rs (new file, 1090 lines; diff suppressed because it is too large)
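The new metadata.rs itself is not shown. As a very rough, hedged illustration of what the parquet-format and thrift dependencies enable (this is not the contents of metadata.rs), a parquet FileMetaData thrift struct can be round-tripped through the compact protocol:

```rust
use parquet_format::FileMetaData;
use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};

/// Encode a parquet-format FileMetaData to thrift compact-protocol bytes.
fn encode_metadata(md: &FileMetaData) -> thrift::Result<Vec<u8>> {
    let mut buffer = Vec::new();
    {
        let mut protocol = TCompactOutputProtocol::new(&mut buffer);
        md.write_to_out_protocol(&mut protocol)?;
        protocol.flush()?;
    }
    Ok(buffer)
}

/// Decode thrift compact-protocol bytes back into a FileMetaData.
fn decode_metadata(bytes: &[u8]) -> thrift::Result<FileMetaData> {
    let mut protocol = TCompactInputProtocol::new(bytes);
    FileMetaData::read_from_in_protocol(&mut protocol)
}
```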


@@ -96,6 +96,11 @@ impl Table {
})
}
/// Return timestamp range of this table
pub fn timestamp_range(&self) -> Option<TimestampRange> {
self.timestamp_range
}
// Check if 2 time ranges overlap
pub fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
match (self.timestamp_range, timestamp_range) {
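The body of matches_predicate is cut off above. For reference, a generic overlap check for half-open time ranges looks like the hedged, standalone sketch below (this is not the actual implementation):

```rust
/// Two half-open ranges [start, end) overlap iff each one starts before the other ends.
fn ranges_overlap(a_start: i64, a_end: i64, b_start: i64, b_end: i64) -> bool {
    a_start < b_end && b_start < a_end
}
```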


@@ -1,4 +1,4 @@
use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
use arrow_deps::{datafusion::physical_plan::SendableRecordBatchStream, util::MemoryStream};
use internal_types::{schema::Schema, selection::Selection};
use mutable_buffer::chunk::snapshot::ChunkSnapshot;
use object_store::path::Path;
@ -13,10 +13,7 @@ use std::{
sync::Arc,
};
use super::{
pred::to_read_buffer_predicate,
streams::{MemoryStream, ReadFilterResultsStream},
};
use super::{pred::to_read_buffer_predicate, streams::ReadFilterResultsStream};
#[derive(Debug, Snafu)]
pub enum Error {
@@ -284,7 +281,7 @@ impl PartitionChunk for DbChunk {
.read_filter(table_name, selection)
.context(MutableBufferChunk)?;
Ok(Box::pin(MemoryStream::new(batch)))
Ok(Box::pin(MemoryStream::new(vec![batch])))
}
Self::ReadBuffer { chunk, .. } => {
// Error converting to a rb_predicate needs to fail


@@ -44,42 +44,3 @@ impl futures::Stream for ReadFilterResultsStream {
// TODO is there a useful size_hint to pass?
}
/// A RecordBatchStream created from a single RecordBatch
///
/// Unfortunately datafusion's MemoryStream is crate-local
#[derive(Debug)]
pub(crate) struct MemoryStream {
schema: SchemaRef,
batch: Option<RecordBatch>,
}
impl MemoryStream {
pub fn new(batch: RecordBatch) -> Self {
Self {
schema: batch.schema(),
batch: Some(batch),
}
}
}
impl RecordBatchStream for MemoryStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
impl futures::Stream for MemoryStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.batch.take().map(Ok))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(1, Some(1))
}
}