Merge pull request #4641 from influxdata/dom/parquet-store

refactor: parquet store
pull/24376/head
Authored by kodiakhq[bot] on 2022-05-19 12:58:44 +00:00, committed by GitHub
commit 0c21693826
22 changed files with 159 additions and 189 deletions
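
At a glance, this refactor swaps the bare `Arc<DynObjectStore>` handles threaded through the compactor, ingester, and querier for the new `parquet_file::storage::ParquetStorage` type (the old `Storage` struct, renamed, with its reader turned into a method). The sketch below condenses the write path as it looks after the change, based on the compactor and ingester hunks in this diff; the wrapper function is hypothetical, it skips the metadata and file-size bookkeeping of the real `persist` functions, and it assumes both storage calls surface `parquet_file::storage::Error`.

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use object_store::DynObjectStore;
use parquet_file::{
    metadata::IoxMetadata,
    storage::{Error as StorageError, ParquetStorage},
    ParquetFilePath,
};

/// Hypothetical condensation of the post-refactor persist path.
async fn persist_sketch(
    object_store: Arc<DynObjectStore>,
    metadata: &IoxMetadata,
    record_batches: Vec<RecordBatch>,
) -> Result<(), StorageError> {
    // Wrap the object store once; `ParquetStorage` derives `Clone`, so callers
    // now pass `store.clone()` where they previously cloned the `Arc`.
    let store = ParquetStorage::new(object_store);

    let schema = record_batches
        .first()
        .expect("caller checks for empty input")
        .schema();

    // Encode the batches plus IOx metadata into parquet bytes. The real code
    // also extracts `IoxParquetMetaData` and the file size here; that part is
    // elided in this sketch.
    let data = store
        .parquet_bytes(record_batches, schema, metadata)
        .await?;

    // Write the bytes under the canonical parquet file path, replacing the
    // previous direct `object_store.put(&path, bytes)` call.
    let path = ParquetFilePath::new(
        metadata.namespace_id,
        metadata.table_id,
        metadata.sequencer_id,
        metadata.partition_id,
        metadata.object_store_id,
    );
    store.to_object_store(data, &path).await?;

    Ok(())
}
```

The same `ParquetStorage` value is then handed to the compactor, ingester, and querier constructors in the hunks below, in place of the previous `Arc::clone(&object_store)` arguments.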

Cargo.lock (generated)

@ -2565,6 +2565,7 @@ dependencies = [
"ioxd_common",
"metric",
"object_store",
"parquet_file",
"thiserror",
"trace",
"workspace-hack",
@ -2606,6 +2607,7 @@ dependencies = [
"ioxd_common",
"metric",
"object_store",
"parquet_file",
"querier",
"service_grpc_flight",
"service_grpc_influxrpc",
@ -4230,6 +4232,7 @@ dependencies = [
"mutable_batch",
"mutable_batch_lp",
"once_cell",
"parquet_file",
"predicate",
"pretty_assertions",
"querier",


@ -10,7 +10,6 @@ use crate::{
};
use arrow::record_batch::RecordBatch;
use backoff::{Backoff, BackoffConfig};
use bytes::Bytes;
use data_types::{
ParquetFile, ParquetFileId, ParquetFileWithMetadata, PartitionId, SequencerId, TableId,
Timestamp, Tombstone, TombstoneId,
@ -26,10 +25,10 @@ use iox_query::{
};
use iox_time::{Time, TimeProvider};
use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
use object_store::DynObjectStore;
use observability_deps::tracing::{debug, info, trace, warn};
use parquet_file::{
metadata::{IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
ParquetFilePath,
};
use schema::sort::SortKey;
@ -141,7 +140,9 @@ pub enum Error {
},
#[snafu(display("Error writing to object store: {}", source))]
WritingToObjectStore { source: object_store::Error },
WritingToObjectStore {
source: parquet_file::storage::Error,
},
#[snafu(display("Error updating catalog {}", source))]
Update {
@ -189,7 +190,7 @@ pub struct Compactor {
/// Sequencers assigned to this compactor
sequencers: Vec<SequencerId>,
/// Object store for reading and persistence of parquet files
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
/// The global catalog for schema, parquet files and tombstones
catalog: Arc<dyn Catalog>,
@ -234,7 +235,7 @@ impl Compactor {
pub fn new(
sequencers: Vec<SequencerId>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
backoff_config: BackoffConfig,
@ -282,7 +283,7 @@ impl Compactor {
Self {
sequencers,
catalog,
object_store,
store,
exec,
time_provider,
backoff_config,
@ -485,7 +486,7 @@ impl Compactor {
info!("persisting file {}", meta.object_store_id);
let file_size_and_md = Backoff::new(&self.backoff_config)
.retry_all_errors("persist to object store", || {
Self::persist(&meta, data.clone(), Arc::clone(&self.object_store))
Self::persist(&meta, data.clone(), self.store.clone())
})
.await
.expect("retry forever");
@ -654,7 +655,7 @@ impl Compactor {
.iter()
.map(|f| {
f.to_queryable_parquet_chunk(
Arc::clone(&self.object_store),
self.store.clone(),
iox_metadata.table_name.to_string(),
iox_metadata.sort_key.clone(),
partition_sort_key.clone(),
@ -793,7 +794,7 @@ impl Compactor {
async fn persist(
metadata: &IoxMetadata,
record_batches: Vec<RecordBatch>,
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
) -> Result<Option<(usize, IoxParquetMetaData)>> {
if record_batches.is_empty() {
return Ok(None);
@ -804,7 +805,7 @@ impl Compactor {
.expect("record_batches.is_empty was just checked")
.schema();
let data = parquet_file::storage::Storage::new(Arc::clone(&object_store))
let data = store
.parquet_bytes(record_batches, schema, metadata)
.await
.context(ConvertingToBytesSnafu)?;
@ -821,7 +822,6 @@ impl Compactor {
let data = Arc::try_unwrap(data).expect("dangling reference to data");
let file_size = data.len();
let bytes = Bytes::from(data);
let path = ParquetFilePath::new(
metadata.namespace_id,
@ -830,10 +830,9 @@ impl Compactor {
metadata.partition_id,
metadata.object_store_id,
);
let path = path.object_store_path();
object_store
.put(&path, bytes)
store
.to_object_store(data, &path)
.await
.context(WritingToObjectStoreSnafu)?;
@ -1169,7 +1168,7 @@ mod tests {
let compactor = Compactor::new(
vec![sequencer.sequencer.id],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
@ -1211,7 +1210,7 @@ mod tests {
catalog.metric_registry(),
usize::MAX,
)),
catalog.object_store(),
ParquetStorage::new(catalog.object_store()),
catalog.metric_registry(),
catalog.time_provider(),
);
@ -1295,7 +1294,7 @@ mod tests {
let compactor = Compactor::new(
vec![sequencer.sequencer.id],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
@ -1421,7 +1420,7 @@ mod tests {
catalog.metric_registry(),
usize::MAX,
)),
catalog.object_store(),
ParquetStorage::new(catalog.object_store()),
catalog.metric_registry(),
catalog.time_provider(),
);
@ -1485,7 +1484,7 @@ mod tests {
let compactor = Compactor::new(
vec![sequencer.sequencer.id],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
@ -1587,7 +1586,7 @@ mod tests {
let compactor = Compactor::new(
vec![sequencer.sequencer.id],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
@ -1696,7 +1695,7 @@ mod tests {
let compactor = Compactor::new(
vec![sequencer.sequencer.id],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
@ -1850,13 +1849,13 @@ mod tests {
};
let pc1 = pt1.to_queryable_parquet_chunk(
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
table.table.name.clone(),
partition.partition.sort_key(),
partition.partition.sort_key(),
);
let pc2 = pt2.to_queryable_parquet_chunk(
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
table.table.name.clone(),
partition.partition.sort_key(),
partition.partition.sort_key(),
@ -2216,7 +2215,7 @@ mod tests {
let compactor = Compactor::new(
vec![],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
@ -2421,7 +2420,7 @@ mod tests {
let compactor = Compactor::new(
vec![],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
@ -2485,12 +2484,13 @@ mod tests {
Compactor::persist(
&compacted_data.meta,
compacted_data.data,
Arc::clone(&compactor.object_store),
compactor.store.clone(),
)
.await
.unwrap();
let list = compactor.object_store.list(None).await.unwrap();
let object_store = catalog.object_store();
let list = object_store.list(None).await.unwrap();
let object_store_files: Vec<_> = list.try_collect().await.unwrap();
assert_eq!(object_store_files.len(), 1);
}
@ -2502,7 +2502,7 @@ mod tests {
let compactor = Compactor::new(
vec![],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
@ -2832,7 +2832,7 @@ mod tests {
let compactor = Compactor::new(
vec![sequencer.id],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
@ -2906,7 +2906,7 @@ mod tests {
let compactor = Compactor::new(
vec![sequencer.sequencer.id],
Arc::clone(&catalog.catalog),
Arc::clone(&catalog.object_store),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),


@ -10,8 +10,8 @@ use futures::{
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use iox_time::TimeProvider;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use parquet_file::storage::ParquetStorage;
use std::sync::Arc;
use thiserror::Error;
use tokio::task::{JoinError, JoinHandle};
@ -63,7 +63,7 @@ impl CompactorHandlerImpl {
pub fn new(
sequencers: Vec<SequencerId>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
registry: Arc<metric::Registry>,
@ -72,7 +72,7 @@ impl CompactorHandlerImpl {
let compactor_data = Arc::new(Compactor::new(
sequencers,
catalog,
object_store,
store,
exec,
time_provider,
BackoffConfig::default(),


@ -5,11 +5,11 @@ use arrow::record_batch::RecordBatch;
use data_types::{
ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Timestamp, Tombstone, TombstoneId,
};
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use parquet_file::{
chunk::{new_parquet_chunk, ChunkMetrics, DecodedParquetFile},
chunk::{ChunkMetrics, DecodedParquetFile, ParquetChunk},
metadata::{IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
};
use schema::sort::SortKey;
use std::{
@ -87,17 +87,17 @@ impl ParquetFileWithTombstone {
/// Convert to a QueryableParquetChunk
pub fn to_queryable_parquet_chunk(
&self,
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
table_name: String,
sort_key: Option<SortKey>,
partition_sort_key: Option<SortKey>,
) -> QueryableParquetChunk {
let decoded_parquet_file = DecodedParquetFile::new((*self.data).clone());
let parquet_chunk = new_parquet_chunk(
let parquet_chunk = ParquetChunk::new(
&decoded_parquet_file,
ChunkMetrics::new_unregistered(), // TODO: need to add metrics
object_store,
store,
);
trace!(


@ -24,6 +24,7 @@ use mutable_batch::MutableBatch;
use object_store::DynObjectStore;
use observability_deps::tracing::{debug, warn};
use parking_lot::RwLock;
use parquet_file::storage::ParquetStorage;
use predicate::Predicate;
use schema::{selection::Selection, Schema};
use snafu::{OptionExt, ResultExt, Snafu};
@ -111,7 +112,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct IngesterData {
/// Object store for persistence of parquet files
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
/// The global catalog for schema, parquet files and tombstones
catalog: Arc<dyn Catalog>,
@ -142,7 +143,7 @@ impl IngesterData {
backoff_config: BackoffConfig,
) -> Self {
Self {
object_store,
store: ParquetStorage::new(object_store),
catalog,
sequencers,
partitioner,
@ -302,11 +303,7 @@ impl Persister for IngesterData {
// save the compacted data to a parquet file in object storage
let file_size_and_md = Backoff::new(&self.backoff_config)
.retry_all_errors("persist to object store", || {
persist(
&iox_meta,
record_batches.to_vec(),
Arc::clone(&self.object_store),
)
persist(&iox_meta, record_batches.to_vec(), self.store.clone())
})
.await
.expect("retry forever");


@ -1,10 +1,9 @@
//! Persist compacted data to parquet files in object storage
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use object_store::DynObjectStore;
use parquet_file::{
metadata::{IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
ParquetFilePath,
};
use snafu::{ResultExt, Snafu};
@ -19,7 +18,9 @@ pub enum Error {
},
#[snafu(display("Error writing to object store: {}", source))]
WritingToObjectStore { source: object_store::Error },
WritingToObjectStore {
source: parquet_file::storage::Error,
},
}
/// A specialized `Error` for Ingester's persistence errors
@ -31,7 +32,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub async fn persist(
metadata: &IoxMetadata,
record_batches: Vec<RecordBatch>,
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
) -> Result<Option<(usize, IoxParquetMetaData)>> {
if record_batches.is_empty() {
return Ok(None);
@ -41,7 +42,7 @@ pub async fn persist(
.expect("record_batches.is_empty was just checked")
.schema();
let data = parquet_file::storage::Storage::new(Arc::clone(&object_store))
let data = store
.parquet_bytes(record_batches, schema, metadata)
.await
.context(ConvertingToBytesSnafu)?;
@ -58,7 +59,6 @@ pub async fn persist(
let data = Arc::try_unwrap(data).expect("dangling reference to data");
let file_size = data.len();
let bytes = Bytes::from(data);
let path = ParquetFilePath::new(
metadata.namespace_id,
@ -68,10 +68,8 @@ pub async fn persist(
metadata.object_store_id,
);
let path = path.object_store_path();
object_store
.put(&path, bytes)
store
.to_object_store(data, &path)
.await
.context(WritingToObjectStoreSnafu)?;
@ -86,7 +84,7 @@ mod tests {
use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
use iox_query::test::{raw_data, TestChunk};
use iox_time::Time;
use object_store::memory::InMemory;
use object_store::{memory::InMemory, DynObjectStore};
use std::sync::Arc;
use uuid::Uuid;
@ -120,9 +118,13 @@ mod tests {
};
let object_store = object_store();
persist(&metadata, vec![], Arc::clone(&object_store))
.await
.unwrap();
persist(
&metadata,
vec![],
ParquetStorage::new(Arc::clone(&object_store)),
)
.await
.unwrap();
let mut list = object_store.list(None).await.unwrap();
assert!(list.next().await.is_none());
@ -162,9 +164,13 @@ mod tests {
let object_store = object_store();
persist(&metadata, batches, Arc::clone(&object_store))
.await
.unwrap();
persist(
&metadata,
batches,
ParquetStorage::new(Arc::clone(&object_store)),
)
.await
.unwrap();
let list = object_store.list(None).await.unwrap();
let obj_store_paths: Vec<_> = list.try_collect().await.unwrap();


@ -4,7 +4,6 @@ use arrow::{
compute::{lexsort, SortColumn, SortOptions},
record_batch::RecordBatch,
};
use bytes::Bytes;
use data_types::{
Column, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileId,
ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId, QueryPool, SequenceNumber,
@ -21,6 +20,7 @@ use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use object_store::{memory::InMemory, DynObjectStore};
use parquet_file::{
metadata::{IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
ParquetFilePath,
};
use schema::{
@ -530,7 +530,7 @@ impl TestPartition {
sort_key: Some(sort_key.clone()),
};
let (parquet_metadata_bin, real_file_size_bytes) = create_parquet_file(
Arc::clone(&self.catalog.object_store),
ParquetStorage::new(Arc::clone(&self.catalog.object_store)),
&metadata,
record_batch,
)
@ -646,13 +646,13 @@ async fn update_catalog_sort_key_if_needed(
/// Create parquet file and return thrift-encoded and zstd-compressed parquet metadata as well as the file size.
async fn create_parquet_file(
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
metadata: &IoxMetadata,
record_batch: RecordBatch,
) -> (Vec<u8>, usize) {
let schema = record_batch.schema();
let data = parquet_file::storage::Storage::new(Arc::clone(&object_store))
let data = store
.parquet_bytes(vec![record_batch], schema, metadata)
.await
.unwrap();
@ -664,7 +664,6 @@ async fn create_parquet_file(
let data = Arc::try_unwrap(data).expect("dangling reference to data");
let file_size = data.len();
let bytes = Bytes::from(data);
let path = ParquetFilePath::new(
metadata.namespace_id,
@ -673,9 +672,8 @@ async fn create_parquet_file(
metadata.partition_id,
metadata.object_store_id,
);
let path = path.object_store_path();
object_store.put(&path, bytes).await.unwrap();
store.to_object_store(data, &path).await.unwrap();
(parquet_md, file_size)
}


@ -23,3 +23,4 @@ async-trait = "0.1"
hyper = "0.14"
thiserror = "1.0.31"
workspace-hack = { path = "../workspace-hack"}
parquet_file = { version = "0.1.0", path = "../parquet_file" }


@ -19,6 +19,7 @@ use ioxd_common::{
};
use metric::Registry;
use object_store::DynObjectStore;
use parquet_file::storage::ParquetStorage;
use std::{
fmt::{Debug, Display},
sync::Arc,
@ -160,6 +161,8 @@ pub async fn create_compactor_server_type(
}
txn.commit().await?;
let parquet_store = ParquetStorage::new(object_store);
let compactor_config = compactor::handler::CompactorConfig::new(
compactor_config.split_percentage,
compactor_config.max_concurrent_compaction_size_bytes,
@ -168,7 +171,7 @@ pub async fn create_compactor_server_type(
let compactor_handler = Arc::new(CompactorHandlerImpl::new(
sequencers,
catalog,
object_store,
parquet_store,
exec,
time_provider,
Arc::clone(&metric_registry),


@ -27,6 +27,7 @@ hyper = "0.14"
tokio = { version = "1.18", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tonic = "0.7"
workspace-hack = { path = "../workspace-hack"}
parquet_file = { version = "0.1.0", path = "../parquet_file" }
[dev-dependencies]


@ -10,6 +10,7 @@ use iox_query::exec::Executor;
use iox_time::TimeProvider;
use metric::Registry;
use object_store::DynObjectStore;
use parquet_file::storage::ParquetStorage;
use querier::{
create_ingester_connection, QuerierCatalogCache, QuerierDatabase, QuerierHandler,
QuerierHandlerImpl, QuerierServer,
@ -161,7 +162,7 @@ pub async fn create_querier_server_type(args: QuerierServerTypeArgs<'_>) -> Arc<
let database = Arc::new(QuerierDatabase::new(
catalog_cache,
Arc::clone(&args.metric_registry),
args.object_store,
ParquetStorage::new(args.object_store),
args.exec,
ingester_connection,
));


@ -55,6 +55,7 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl
mod tests {
use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService;
use iox_tests::util::TestCatalog;
use parquet_file::storage::ParquetStorage;
use querier::{create_ingester_connection_for_testing, QuerierCatalogCache};
use super::*;
@ -72,7 +73,7 @@ mod tests {
let db = Arc::new(QuerierDatabase::new(
catalog_cache,
catalog.metric_registry(),
catalog.object_store(),
ParquetStorage::new(catalog.object_store()),
catalog.exec(),
create_ingester_connection_for_testing(),
));
@ -99,7 +100,7 @@ mod tests {
let db = Arc::new(QuerierDatabase::new(
catalog_cache,
catalog.metric_registry(),
catalog.object_store(),
ParquetStorage::new(catalog.object_store()),
catalog.exec(),
create_ingester_connection_for_testing(),
));


@ -1,13 +1,12 @@
use crate::{
metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData},
storage::Storage,
storage::ParquetStorage,
ParquetFilePath,
};
use data_types::{
ParquetFile, ParquetFileWithMetadata, Statistics, TableSummary, TimestampMinMax, TimestampRange,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use predicate::Predicate;
use schema::{selection::Selection, Schema, TIME_COLUMN_NAME};
@ -88,7 +87,7 @@ pub struct ParquetChunk {
timestamp_min_max: Option<TimestampMinMax>,
/// Persists the parquet file within a database's relative path
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
/// Path in the database's object store.
path: ParquetFilePath,
@ -107,61 +106,41 @@ pub struct ParquetChunk {
}
impl ParquetChunk {
/// Creates new chunk from given parquet metadata.
/// Create parquet chunk.
pub fn new(
path: &ParquetFilePath,
object_store: Arc<DynObjectStore>,
file_size_bytes: usize,
parquet_metadata: Arc<IoxParquetMetaData>,
decoded_parquet_file: &DecodedParquetFile,
metrics: ChunkMetrics,
) -> Result<Self> {
let decoded = parquet_metadata
store: ParquetStorage,
) -> Self {
let iox_metadata = &decoded_parquet_file.iox_metadata;
let path = ParquetFilePath::new(
iox_metadata.namespace_id,
iox_metadata.table_id,
iox_metadata.sequencer_id,
iox_metadata.partition_id,
iox_metadata.object_store_id,
);
let decoded = decoded_parquet_file
.parquet_metadata
.as_ref()
.decode()
.context(MetadataDecodeFailedSnafu { path })?;
let schema = decoded
.read_schema()
.context(SchemaReadFailedSnafu { path })?;
let columns = decoded
.read_statistics(&schema)
.context(StatisticsReadFailedSnafu { path })?;
.unwrap();
let schema = decoded.read_schema().unwrap();
let columns = decoded.read_statistics(&schema).unwrap();
let table_summary = TableSummary { columns };
let rows = decoded.row_count();
Ok(Self::new_from_parts(
Arc::new(table_summary),
schema,
path,
object_store,
file_size_bytes,
parquet_metadata,
rows,
metrics,
))
}
/// Creates a new chunk from given parts w/o parsing anything from the provided parquet
/// metadata.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_from_parts(
table_summary: Arc<TableSummary>,
schema: Arc<Schema>,
path: &ParquetFilePath,
object_store: Arc<DynObjectStore>,
file_size_bytes: usize,
parquet_metadata: Arc<IoxParquetMetaData>,
rows: usize,
metrics: ChunkMetrics,
) -> Self {
let timestamp_min_max = extract_range(&table_summary);
let file_size_bytes = decoded_parquet_file.parquet_file.file_size_bytes as usize;
Self {
table_summary,
table_summary: Arc::new(table_summary),
schema,
timestamp_min_max,
path: path.into(),
object_store,
store,
path,
file_size_bytes,
parquet_metadata,
parquet_metadata: Arc::clone(&decoded_parquet_file.parquet_metadata),
rows,
metrics,
}
@ -231,14 +210,14 @@ impl ParquetChunk {
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream> {
trace!(path=?self.path, "fetching parquet data for filtered read");
Storage::read_filter(
predicate,
selection,
Arc::clone(&self.schema.as_arrow()),
self.path,
Arc::clone(&self.object_store),
)
.context(ReadParquetSnafu)
self.store
.read_filter(
predicate,
selection,
Arc::clone(&self.schema.as_arrow()),
self.path,
)
.context(ReadParquetSnafu)
}
/// The total number of rows in all row groups in this chunk.
@ -299,31 +278,3 @@ impl DecodedParquetFile {
}
}
}
/// Create parquet chunk.
pub fn new_parquet_chunk(
decoded_parquet_file: &DecodedParquetFile,
metrics: ChunkMetrics,
object_store: Arc<DynObjectStore>,
) -> ParquetChunk {
let iox_metadata = &decoded_parquet_file.iox_metadata;
let path = ParquetFilePath::new(
iox_metadata.namespace_id,
iox_metadata.table_id,
iox_metadata.sequencer_id,
iox_metadata.partition_id,
iox_metadata.object_store_id,
);
let parquet_file = &decoded_parquet_file.parquet_file;
let file_size_bytes = parquet_file.file_size_bytes as usize;
ParquetChunk::new(
&path,
object_store,
file_size_bytes,
Arc::clone(&decoded_parquet_file.parquet_metadata),
metrics,
)
.expect("cannot create chunk")
}
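
Within `parquet_file` itself, the free function `new_parquet_chunk` and the `new_from_parts` constructor are folded into `ParquetChunk::new`, which now derives the object-store path and file size from the `DecodedParquetFile` and takes a `ParquetStorage` handle. A minimal sketch of the new call shape, mirroring the updated call sites in the compactor and querier (the wrapper function is hypothetical):

```rust
use parquet_file::{
    chunk::{ChunkMetrics, DecodedParquetFile, ParquetChunk},
    storage::ParquetStorage,
};

/// Hypothetical wrapper mirroring `to_queryable_parquet_chunk` and the querier
/// adapter: the chunk is built infallibly from a decoded catalog record plus a
/// storage handle, with no separate path, size, or metadata arguments.
fn chunk_from_decoded(decoded: &DecodedParquetFile, store: ParquetStorage) -> ParquetChunk {
    ParquetChunk::new(
        decoded,
        // Metrics registration is skipped here, as at the compactor call site.
        ChunkMetrics::new_unregistered(),
        store,
    )
}
```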


@ -85,11 +85,11 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
pub struct Storage {
pub struct ParquetStorage {
object_store: Arc<DynObjectStore>,
}
impl Storage {
impl ParquetStorage {
pub fn new(object_store: Arc<DynObjectStore>) -> Self {
Self { object_store }
}
@ -227,11 +227,11 @@ impl Storage {
}
pub fn read_filter(
&self,
_predicate: &Predicate,
selection: Selection<'_>,
schema: SchemaRef,
path: ParquetFilePath,
object_store: Arc<DynObjectStore>,
) -> Result<SendableRecordBatchStream> {
// Indices of columns in the schema needed to read
let projection: Vec<usize> = Self::column_indices(selection, Arc::clone(&schema));
@ -249,17 +249,20 @@ impl Storage {
// Run async dance here to make sure any error returned
// `download_and_scan_parquet` is sent back to the reader and
// not silently ignored
tokio::task::spawn_blocking(move || {
let download_result =
Self::download_and_scan_parquet(projection, path, object_store, tx.clone());
tokio::task::spawn_blocking({
let object_store = Arc::clone(&self.object_store);
move || {
let download_result =
Self::download_and_scan_parquet(projection, path, object_store, tx.clone());
// If there was an error returned from download_and_scan_parquet send it back to the receiver.
if let Err(e) = download_result {
warn!(error=%e, "Parquet download & scan failed");
let e = ArrowError::ExternalError(Box::new(e));
if let Err(e) = tx.blocking_send(ArrowResult::Err(e)) {
// if no one is listening, there is no one else to hear our screams
debug!(%e, "Error sending result of download function. Receiver is closed.");
// If there was an error returned from download_and_scan_parquet send it back to the receiver.
if let Err(e) = download_result {
warn!(error=%e, "Parquet download & scan failed");
let e = ArrowError::ExternalError(Box::new(e));
if let Err(e) = tx.blocking_send(ArrowResult::Err(e)) {
// if no one is listening, there is no one else to hear our screams
debug!(%e, "Error sending result of download function. Receiver is closed.");
}
}
}
});
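
On the read side, `Storage` is renamed to `ParquetStorage` and `read_filter` becomes a `&self` method, so callers stop passing the object store explicitly; the method clones its own `Arc<DynObjectStore>` into the `spawn_blocking` closure instead. A sketch of the new call, assuming the module's `Error` type and the usual `Selection::All` full-column selection (the wrapper itself is hypothetical):

```rust
use arrow::datatypes::SchemaRef;
use datafusion::physical_plan::SendableRecordBatchStream;
use parquet_file::{storage::ParquetStorage, ParquetFilePath};
use predicate::Predicate;
use schema::selection::Selection;

/// Hypothetical wrapper around the renamed API: reading a parquet file goes
/// through a `ParquetStorage` method rather than an associated function that
/// took the object store as an extra argument.
fn read_all_columns(
    store: &ParquetStorage,
    predicate: &Predicate, // currently ignored by the reader (`_predicate`)
    schema: SchemaRef,
    path: ParquetFilePath,
) -> Result<SendableRecordBatchStream, parquet_file::storage::Error> {
    store.read_filter(predicate, Selection::All, schema, path)
}
```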


@ -10,9 +10,9 @@ use futures::StreamExt;
use iox_catalog::interface::Catalog;
use iox_query::{exec::IOxSessionContext, QueryChunk};
use iox_time::TimeProvider;
use object_store::DynObjectStore;
use parquet_file::chunk::{
new_parquet_chunk, ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk,
use parquet_file::{
chunk::{ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk},
storage::ParquetStorage,
};
use schema::{selection::Selection, sort::SortKey};
use std::sync::Arc;
@ -172,7 +172,7 @@ pub struct ParquetChunkAdapter {
catalog_cache: Arc<CatalogCache>,
/// Object store.
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
/// Metric registry.
metric_registry: Arc<metric::Registry>,
@ -186,13 +186,13 @@ impl ParquetChunkAdapter {
/// Create new adapter with empty cache.
pub fn new(
catalog_cache: Arc<CatalogCache>,
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
metric_registry: Arc<metric::Registry>,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
Self {
catalog_cache,
object_store,
store,
metric_registry,
time_provider,
}
@ -217,10 +217,10 @@ impl ParquetChunkAdapter {
) -> Option<ParquetChunk> {
let metrics = ParquetChunkMetrics::new(self.metric_registry.as_ref());
Some(new_parquet_chunk(
Some(ParquetChunk::new(
decoded_parquet_file,
metrics,
Arc::clone(&self.object_store),
self.store.clone(),
))
}
@ -307,7 +307,7 @@ pub mod tests {
catalog.metric_registry(),
usize::MAX,
)),
catalog.object_store(),
ParquetStorage::new(catalog.object_store()),
catalog.metric_registry(),
catalog.time_provider(),
);


@ -8,7 +8,7 @@ use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::Namespace;
use iox_query::exec::Executor;
use object_store::DynObjectStore;
use parquet_file::storage::ParquetStorage;
use service_common::QueryDatabaseProvider;
use std::sync::Arc;
@ -59,13 +59,13 @@ impl QuerierDatabase {
pub fn new(
catalog_cache: Arc<CatalogCache>,
metric_registry: Arc<metric::Registry>,
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
exec: Arc<Executor>,
ingester_connection: Arc<dyn IngesterConnection>,
) -> Self {
let chunk_adapter = Arc::new(ParquetChunkAdapter::new(
Arc::clone(&catalog_cache),
Arc::clone(&object_store),
store,
Arc::clone(&metric_registry),
catalog_cache.time_provider(),
));
@ -144,7 +144,7 @@ mod tests {
let db = QuerierDatabase::new(
catalog_cache,
catalog.metric_registry(),
catalog.object_store(),
ParquetStorage::new(catalog.object_store()),
catalog.exec(),
create_ingester_connection_for_testing(),
);


@ -134,6 +134,7 @@ mod tests {
use iox_query::exec::Executor;
use iox_time::{MockProvider, Time};
use object_store::memory::InMemory;
use parquet_file::storage::ParquetStorage;
use crate::{cache::CatalogCache, create_ingester_connection_for_testing};
@ -176,7 +177,7 @@ mod tests {
let database = Arc::new(QuerierDatabase::new(
catalog_cache,
metric_registry,
object_store,
ParquetStorage::new(object_store),
exec,
create_ingester_connection_for_testing(),
));


@ -7,7 +7,7 @@ use crate::{
use backoff::BackoffConfig;
use data_types::{NamespaceId, NamespaceSchema};
use iox_query::exec::Executor;
use object_store::DynObjectStore;
use parquet_file::storage::ParquetStorage;
use schema::Schema;
use std::{collections::HashMap, sync::Arc};
@ -94,7 +94,7 @@ impl QuerierNamespace {
#[allow(clippy::too_many_arguments)]
pub fn new_testing(
catalog_cache: Arc<CatalogCache>,
object_store: Arc<DynObjectStore>,
store: ParquetStorage,
metric_registry: Arc<metric::Registry>,
name: Arc<str>,
schema: Arc<NamespaceSchema>,
@ -104,7 +104,7 @@ impl QuerierNamespace {
let time_provider = catalog_cache.time_provider();
let chunk_adapter = Arc::new(ParquetChunkAdapter::new(
catalog_cache,
object_store,
store,
metric_registry,
Arc::clone(&time_provider),
));


@ -2,6 +2,7 @@ use std::sync::Arc;
use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::TestNamespace;
use parquet_file::storage::ParquetStorage;
use crate::{create_ingester_connection_for_testing, QuerierCatalogCache};
@ -24,7 +25,7 @@ pub async fn querier_namespace(ns: &Arc<TestNamespace>) -> QuerierNamespace {
));
QuerierNamespace::new_testing(
catalog_cache,
ns.catalog.object_store(),
ParquetStorage::new(ns.catalog.object_store()),
ns.catalog.metric_registry(),
ns.namespace.name.clone().into(),
schema,


@ -3,6 +3,7 @@ use std::sync::Arc;
use backoff::BackoffConfig;
use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::{TestCatalog, TestTable};
use parquet_file::storage::ParquetStorage;
use schema::Schema;
use crate::{
@ -21,7 +22,7 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
));
let chunk_adapter = Arc::new(ParquetChunkAdapter::new(
catalog_cache,
catalog.object_store(),
ParquetStorage::new(catalog.object_store()),
catalog.metric_registry(),
catalog.time_provider(),
));


@ -28,6 +28,7 @@ querier = { path = "../querier" }
iox_query = { path = "../iox_query" }
pretty_assertions = "1.2.1"
workspace-hack = { path = "../workspace-hack"}
parquet_file = { version = "0.1.0", path = "../parquet_file" }
[dev-dependencies]
arrow = { version = "14.0.0", features = ["prettyprint"] }


@ -25,6 +25,7 @@ use iox_tests::util::{TestCatalog, TestNamespace, TestSequencer};
use itertools::Itertools;
use mutable_batch::MutableBatch;
use mutable_batch_lp::LinesConverter;
use parquet_file::storage::ParquetStorage;
use querier::{
IngesterConnectionImpl, IngesterFlightClient, IngesterFlightClientError,
IngesterFlightClientQueryData, QuerierCatalogCache, QuerierNamespace,
@ -824,7 +825,7 @@ impl MockIngester {
Arc::new(QuerierNamespace::new_testing(
catalog_cache,
catalog.object_store(),
ParquetStorage::new(catalog.object_store()),
catalog.metric_registry(),
ns.namespace.name.clone().into(),
schema,