refactor: Remove first/last write times from ParquetFile chunks
parent 11b7755325
commit af7866a638
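
For orientation before the hunks: the refactor swaps the summary type cached by ParquetChunk from TableSummaryAndTimes to TableSummary, moving the first/last write times off the parquet chunk. A minimal sketch of that shape change, inferred from the field names visible in the hunks below; the field types and the ColumnSummary stand-in are assumptions, not the real data_types definitions:

use chrono::{DateTime, Utc};

// Stand-in for data_types::partition_metadata::ColumnSummary (assumed, simplified).
struct ColumnSummary;

// Before: the summary stored by ParquetChunk also carried the write times.
struct TableSummaryAndTimes {
    name: String,
    columns: Vec<ColumnSummary>,
    time_of_first_write: DateTime<Utc>,
    time_of_last_write: DateTime<Utc>,
}

// After: ParquetChunk stores a plain TableSummary; callers of
// CatalogChunk::new_object_store_only now pass the write times explicitly.
struct TableSummary {
    name: String,
    columns: Vec<ColumnSummary>,
}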

@@ -3,7 +3,7 @@ use crate::{
     storage::Storage,
 };
 use data_types::{
-    partition_metadata::{Statistics, TableSummaryAndTimes},
+    partition_metadata::{Statistics, TableSummary},
     timestamp::TimestampRange,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -85,7 +85,7 @@ pub struct ParquetChunk {
     partition_key: Arc<str>,
 
     /// Meta data of the table
-    table_summary: Arc<TableSummaryAndTimes>,
+    table_summary: Arc<TableSummary>,
 
     /// Schema that goes with this table's parquet file
     schema: Arc<Schema>,
@@ -128,8 +128,6 @@ impl ParquetChunk {
 
         let IoxMetadata {
             table_name,
-            time_of_first_write,
-            time_of_last_write,
             partition_key,
             ..
         } = iox_md;
@@ -142,11 +140,9 @@ impl ParquetChunk {
             .context(StatisticsReadFailed {
                 path: &file_location,
             })?;
-        let table_summary = TableSummaryAndTimes {
+        let table_summary = TableSummary {
             name: table_name.to_string(),
             columns,
-            time_of_first_write,
-            time_of_last_write,
         };
 
         Ok(Self::new_from_parts(
@@ -166,7 +162,7 @@ impl ParquetChunk {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new_from_parts(
         partition_key: Arc<str>,
-        table_summary: Arc<TableSummaryAndTimes>,
+        table_summary: Arc<TableSummary>,
         schema: Arc<Schema>,
         file_location: Path,
         store: Arc<ObjectStore>,
@@ -200,7 +196,7 @@ impl ParquetChunk {
     }
 
     /// Returns the summary statistics for this chunk
-    pub fn table_summary(&self) -> &Arc<TableSummaryAndTimes> {
+    pub fn table_summary(&self) -> &Arc<TableSummary> {
         &self.table_summary
     }
 
@@ -286,7 +282,7 @@ impl ParquetChunk {
 }
 
 /// Extracts min/max values of the timestamp column, from the TableSummary, if possible
-fn extract_range(table_summary: &TableSummaryAndTimes) -> Option<TimestampRange> {
+fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
     table_summary
         .column(TIME_COLUMN_NAME)
         .map(|c| {
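
The extract_range change above is only a parameter-type change: the timestamp range is still derived from the time column's statistics, not from the removed write times. A self-contained sketch of that idea, using simplified stand-ins (TIME_COLUMN_NAME, Statistics, TimestampRange and the column lookup below are assumptions, not the real IOx definitions):

// Simplified stand-ins for the real data_types structures.
const TIME_COLUMN_NAME: &str = "time";

struct TimestampRange { start: i64, end: i64 }

enum Statistics {
    I64 { min: Option<i64>, max: Option<i64> },
    F64 { min: Option<f64>, max: Option<f64> },
}

struct ColumnSummary { name: String, stats: Statistics }

struct TableSummary { columns: Vec<ColumnSummary> }

impl TableSummary {
    fn column(&self, name: &str) -> Option<&ColumnSummary> {
        self.columns.iter().find(|c| c.name == name)
    }
}

// Derive the chunk's timestamp range from the time column's min/max statistics.
fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
    table_summary
        .column(TIME_COLUMN_NAME)
        .and_then(|c| match c.stats {
            Statistics::I64 { min: Some(start), max: Some(end) } => {
                Some(TimestampRange { start, end })
            }
            _ => None,
        })
}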

@@ -14,9 +14,7 @@ use arrow::{
 use chrono::{TimeZone, Utc};
 use data_types::{
     chunk_metadata::ChunkAddr,
-    partition_metadata::{
-        ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummaryAndTimes,
-    },
+    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
     server_id::ServerId,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -132,11 +130,9 @@ pub async fn make_chunk_given_record_batch(
     let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
     let storage = Storage::new(Arc::clone(&store), server_id);
 
-    let table_summary = TableSummaryAndTimes {
+    let table_summary = TableSummary {
         name: addr.table_name.to_string(),
         columns: column_summaries,
-        time_of_first_write: Utc.timestamp(30, 40),
-        time_of_last_write: Utc.timestamp(50, 60),
     };
     let stream: SendableRecordBatchStream = if record_batches.is_empty() {
         Box::pin(MemoryStream::new_with_schema(

@@ -1939,7 +1939,7 @@ mod tests {
             .eq(1.0)
             .unwrap();
 
-        let expected_parquet_size = 703;
+        let expected_parquet_size = 679;
         catalog_chunk_size_bytes_metric_eq(
             &test_db.metric_registry,
             "read_buffer",
@@ -2421,7 +2421,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2316.0)
+            .sample_sum_eq(2292.0)
             .unwrap();
 
         // while MB and RB chunk are identical, the PQ chunk is a new one (split off)
@@ -2541,7 +2541,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2316.0)
+            .sample_sum_eq(2292.0)
             .unwrap();
 
         // Unload RB chunk but keep it in OS
@@ -2571,7 +2571,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(703.0)
+            .sample_sum_eq(679.0)
             .unwrap();
 
         // Verify data written to the parquet file in object store
@@ -3179,7 +3179,7 @@ mod tests {
                 id: 2,
                 storage: ChunkStorage::ReadBufferAndObjectStore,
                 lifecycle_action,
-                memory_bytes: 3308, // size of RB and OS chunks
+                memory_bytes: 3284, // size of RB and OS chunks
                 object_store_bytes: 1523, // size of parquet file
                 row_count: 2,
                 time_of_last_access: None,
@@ -3231,7 +3231,7 @@ mod tests {
 
         assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87);
        assert_eq!(db.catalog.metrics().memory().read_buffer(), 2410);
-        assert_eq!(db.catalog.metrics().memory().object_store(), 898);
+        assert_eq!(db.catalog.metrics().memory().object_store(), 874);
     }
 
     #[tokio::test]
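
A note on the updated test expectations (an inference, not stated in the commit): every adjusted figure drops by exactly 24 bytes (703 to 679, 2316 to 2292, 3308 to 3284, 898 to 874), which lines up with the cached summary losing two chrono DateTime<Utc> values of 12 bytes each. A quick check of that size assumption, valid for chrono 0.4:

use chrono::{DateTime, Utc};

fn main() {
    // With chrono 0.4, DateTime<Utc> is 12 bytes, so dropping two of them
    // accounts for the uniform 24-byte reduction in the expected sizes above.
    let one = std::mem::size_of::<DateTime<Utc>>();
    assert_eq!(one, 12);
    println!("two DateTime<Utc> values occupy {} bytes", 2 * one);
}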

@@ -343,23 +343,15 @@ impl CatalogChunk {
     pub(super) fn new_object_store_only(
         addr: ChunkAddr,
         chunk: Arc<parquet_file::chunk::ParquetChunk>,
+        time_of_first_write: DateTime<Utc>,
+        time_of_last_write: DateTime<Utc>,
         metrics: ChunkMetrics,
     ) -> Self {
         assert_eq!(chunk.table_name(), addr.table_name.as_ref());
 
-        let summary = chunk.table_summary();
-        let time_of_first_write = summary.time_of_first_write;
-        let time_of_last_write = summary.time_of_last_write;
-
-        // this is temporary
-        let table_summary = TableSummary {
-            name: summary.name.clone(),
-            columns: summary.columns.clone(),
-        };
-
         // Cache table summary + schema
         let meta = Arc::new(ChunkMetadata {
-            table_summary: Arc::new(table_summary),
+            table_summary: Arc::clone(chunk.table_summary()),
             schema: chunk.schema(),
         });
 
@@ -1163,12 +1155,16 @@ mod tests {
     async fn make_persisted_chunk() -> CatalogChunk {
         let addr = chunk_addr();
+        let now = Utc::now();
 
         // assemble ParquetChunk
         let parquet_chunk = make_parquet_chunk(addr.clone()).await;
 
         CatalogChunk::new_object_store_only(
             addr,
             Arc::new(parquet_chunk),
+            now,
+            now,
             ChunkMetrics::new_unregistered(),
         )
     }
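
One side effect of the new_object_store_only change above: the cached ChunkMetadata now shares the chunk's existing summary via Arc::clone instead of rebuilding a TableSummary from cloned name and columns, so no per-column data is copied. A generic illustration of the difference (not IOx code):

use std::sync::Arc;

struct Summary {
    name: String,
    columns: Vec<String>,
}

fn main() {
    let shared = Arc::new(Summary {
        name: "table".to_string(),
        columns: vec!["time".to_string(), "value".to_string()],
    });

    // Deep copy: allocates a new Summary and clones the name plus every column entry.
    let _rebuilt = Arc::new(Summary {
        name: shared.name.clone(),
        columns: shared.columns.clone(),
    });

    // Shared: only bumps the reference count; both handles point at the same Summary.
    let aliased = Arc::clone(&shared);
    assert!(Arc::ptr_eq(&shared, &aliased));
}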

@@ -201,6 +201,8 @@ impl Partition {
         &mut self,
         chunk_id: u32,
         chunk: Arc<parquet_file::chunk::ParquetChunk>,
+        time_of_first_write: DateTime<Utc>,
+        time_of_last_write: DateTime<Utc>,
     ) -> Arc<RwLock<CatalogChunk>> {
         assert_eq!(chunk.table_name(), self.table_name());
 
@@ -211,6 +213,8 @@ impl Partition {
             .new_chunk_lock(CatalogChunk::new_object_store_only(
                 addr,
                 chunk,
+                time_of_first_write,
+                time_of_last_write,
                 self.metrics.new_chunk_metrics(),
             )),
         );

@@ -229,7 +229,12 @@ impl CatalogState for Loader {
         let schema_handle = TableSchemaUpsertHandle::new(&table_schema, &parquet_chunk.schema())
             .map_err(|e| Box::new(e) as _)
             .context(SchemaError { path: info.path })?;
-        partition.insert_object_store_only_chunk(iox_md.chunk_id, parquet_chunk);
+        partition.insert_object_store_only_chunk(
+            iox_md.chunk_id,
+            parquet_chunk,
+            iox_md.time_of_first_write,
+            iox_md.time_of_last_write,
+        );
         schema_handle.commit();
 
         Ok(())
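
With this change the write times flow from the parquet file's IoxMetadata into the catalog rather than living on the chunk's summary. A partial sketch of the IoxMetadata fields this diff touches, inferred from the destructuring in the ParquetChunk hunks and the Loader call above; the field types are assumptions, and the ".." in the destructuring pattern implies further fields not shown:

use chrono::{DateTime, Utc};

// Partial, assumed shape; only these field names appear in the diff.
struct IoxMetadata {
    table_name: String,
    partition_key: String,
    chunk_id: u32,
    time_of_first_write: DateTime<Utc>,
    time_of_last_write: DateTime<Utc>,
}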