feat: Thread time_of_{first,last}_write through Parquet metadata

Branch: pull/24376/head
Authored by Jake Goulding on 2021-07-09 15:40:36 -04:00; committed by Carol (Nichols || Goulding)
Parent: 9604ce7084
Commit: d928bc84e6
11 changed files with 172 additions and 45 deletions
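
Taken together, the changes below make the persist path stamp the chunk's first and last write times into the IoxMetadata embedded in each Parquet file, and make the read path decode those timestamps again and carry them on the chunk's TableSummaryAndTimes. The following is a minimal sketch of that encode/decode round trip, using hand-rolled stand-ins for the generated proto::FixedSizeTimestamp and proto::IoxMetadata types (it assumes only the chrono crate, which the real code already uses); it illustrates the contract, not the crate's actual implementation.

use chrono::{DateTime, TimeZone, Utc};

// Stand-in for proto::FixedSizeTimestamp: a fixed-width seconds + nanos pair.
#[derive(Clone, Copy)]
struct FixedSizeTimestamp {
    seconds: i64,
    nanos: i32,
}

// Stand-in for the two new optional fields on proto::IoxMetadata (fields 8 and 9).
struct IoxMetadataProto {
    time_of_first_write: Option<FixedSizeTimestamp>,
    time_of_last_write: Option<FixedSizeTimestamp>,
}

// Writer side: IoxMetadata::to_protobuf wraps each DateTime in Some(encode_timestamp(..)).
fn encode_timestamp(ts: DateTime<Utc>) -> FixedSizeTimestamp {
    FixedSizeTimestamp {
        seconds: ts.timestamp(),
        nanos: ts.timestamp_subsec_nanos() as i32,
    }
}

// Reader side: the new decode_timestamp_from_field helper treats a missing field as an
// error instead of silently defaulting.
fn decode_timestamp_from_field(
    value: Option<FixedSizeTimestamp>,
    field: &'static str,
) -> Result<DateTime<Utc>, String> {
    let ts = value.ok_or_else(|| format!("required field is missing: {}", field))?;
    Ok(Utc.timestamp(ts.seconds, ts.nanos as u32))
}

fn main() {
    let first = Utc.timestamp(30, 40);
    let last = Utc.timestamp(50, 60);
    let proto = IoxMetadataProto {
        time_of_first_write: Some(encode_timestamp(first)),
        time_of_last_write: Some(encode_timestamp(last)),
    };

    // A round trip through the metadata preserves both write times exactly.
    let decoded_first =
        decode_timestamp_from_field(proto.time_of_first_write, "time_of_first_write").unwrap();
    let decoded_last =
        decode_timestamp_from_field(proto.time_of_last_write, "time_of_last_write").unwrap();
    assert_eq!(decoded_first, first);
    assert_eq!(decoded_last, last);
}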

File 1 of 11

@@ -108,6 +108,23 @@ impl From<TableSummaryAndTimes> for TableSummary {
     }
 }
+impl TableSummaryAndTimes {
+    pub fn size(&self) -> usize {
+        // Total size of all ColumnSummaries that belong to this table which include
+        // column names and their stats
+        let size: usize = self.columns.iter().map(|c| c.size()).sum();
+        size
+            + self.name.len() // Add size of the table name
+            + mem::size_of::<Self>() // Add size of this struct that points to
+                                     // table, ColumnSummary, and times
+    }
+    /// Get the column summary by name.
+    pub fn column(&self, name: &str) -> Option<&ColumnSummary> {
+        self.columns.iter().find(|c| c.name == name)
+    }
+}
 /// Metadata and statistics information for a table. This can be
 /// either for the portion of a Table stored within a single chunk or
 /// aggregated across chunks.

File 2 of 11

@@ -31,6 +31,12 @@ message IoxMetadata {
   // Database checkpoint created at the time of the write.
   DatabaseCheckpoint database_checkpoint = 7;
+  // Timestamp when this file was first written.
+  FixedSizeTimestamp time_of_first_write = 8;
+  // Timestamp when this file was last written.
+  FixedSizeTimestamp time_of_last_write = 9;
 }
 // Partition checkpoint.

File 3 of 11

@@ -1,6 +1,9 @@
-use crate::{metadata::IoxParquetMetaData, storage::Storage};
+use crate::{
+    metadata::{IoxMetadata, IoxParquetMetaData},
+    storage::Storage,
+};
 use data_types::{
-    partition_metadata::{Statistics, TableSummary},
+    partition_metadata::{Statistics, TableSummaryAndTimes},
     timestamp::TimestampRange,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -82,7 +85,7 @@ pub struct ParquetChunk {
     partition_key: Arc<str>,
     /// Meta data of the table
-    table_summary: Arc<TableSummary>,
+    table_summary: Arc<TableSummaryAndTimes>,
     /// Schema that goes with this table's parquet file
     schema: Arc<Schema>,
@@ -122,6 +125,15 @@ impl ParquetChunk {
             .context(IoxMetadataReadFailed {
                 path: &file_location,
             })?;
+        let IoxMetadata {
+            table_name,
+            time_of_first_write,
+            time_of_last_write,
+            partition_key,
+            ..
+        } = iox_md;
         let schema = parquet_metadata.read_schema().context(SchemaReadFailed {
             path: &file_location,
         })?;
@@ -130,14 +142,15 @@
             .context(StatisticsReadFailed {
                 path: &file_location,
            })?;
-        let table_summary = TableSummary {
-            name: iox_md.table_name.to_string(),
+        let table_summary = TableSummaryAndTimes {
+            name: table_name.to_string(),
             columns,
+            time_of_first_write,
+            time_of_last_write,
         };
         Ok(Self::new_from_parts(
-            iox_md.partition_key,
+            partition_key,
             Arc::new(table_summary),
             schema,
             file_location,
@@ -148,11 +161,12 @@
         ))
     }
-    /// Creates a new chunk from given parts w/o parsing anything from the provided parquet metadata.
+    /// Creates a new chunk from given parts w/o parsing anything from the provided parquet
+    /// metadata.
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new_from_parts(
         partition_key: Arc<str>,
-        table_summary: Arc<TableSummary>,
+        table_summary: Arc<TableSummaryAndTimes>,
         schema: Arc<Schema>,
         file_location: Path,
         store: Arc<ObjectStore>,
@@ -186,7 +200,7 @@
     }
     /// Returns the summary statistics for this chunk
-    pub fn table_summary(&self) -> &Arc<TableSummary> {
+    pub fn table_summary(&self) -> &Arc<TableSummaryAndTimes> {
         &self.table_summary
     }
@@ -221,8 +235,7 @@
         }
     }
-    // Return the columns names that belong to the given column
-    // selection
+    // Return the columns names that belong to the given column selection
     pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
         let fields = self.schema.inner().fields().iter();
@@ -273,7 +286,7 @@
     }
 }
 /// Extracts min/max values of the timestamp column, from the TableSummary, if possible
-fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
+fn extract_range(table_summary: &TableSummaryAndTimes) -> Option<TimestampRange> {
     table_summary
         .column(TIME_COLUMN_NAME)
         .map(|c| {
File 4 of 11

@@ -252,6 +252,9 @@ pub struct IoxMetadata {
     /// Timestamp when this file was created.
     pub creation_timestamp: DateTime<Utc>,
+    pub time_of_first_write: DateTime<Utc>, // TODO: METADATA_VERSION?
+    pub time_of_last_write: DateTime<Utc>,
     /// Table that holds this parquet file.
     pub table_name: Arc<str>,
@@ -285,13 +288,14 @@ impl IoxMetadata {
         }
         // extract creation timestamp
-        let creation_timestamp: DateTime<Utc> = decode_timestamp(
-            proto_msg
-                .creation_timestamp
-                .context(IoxMetadataFieldMissing {
-                    field: "creation_timestamp",
-                })?,
-        )?;
+        let creation_timestamp =
+            decode_timestamp_from_field(proto_msg.creation_timestamp, "creation_timestamp")?;
+        // extract time of first write
+        let time_of_first_write =
+            decode_timestamp_from_field(proto_msg.time_of_first_write, "time_of_first_write")?;
+        // extract time of last write
+        let time_of_last_write =
+            decode_timestamp_from_field(proto_msg.time_of_last_write, "time_of_last_write")?;
         // extract strings
         let table_name = Arc::from(proto_msg.table_name.as_ref());
@@ -317,12 +321,9 @@
                 }
             })
             .collect::<Result<BTreeMap<u32, OptionalMinMaxSequence>>>()?;
-        let min_unpersisted_timestamp = decode_timestamp(
-            proto_partition_checkpoint
-                .min_unpersisted_timestamp
-                .context(IoxMetadataFieldMissing {
-                    field: "partition_checkpoint.min_unpersisted_timestamp",
-                })?,
+        let min_unpersisted_timestamp = decode_timestamp_from_field(
+            proto_partition_checkpoint.min_unpersisted_timestamp,
+            "partition_checkpoint.min_unpersisted_timestamp",
         )?;
         let partition_checkpoint = PartitionCheckpoint::new(
             Arc::clone(&table_name),
@@ -355,6 +356,8 @@
         Ok(Self {
             creation_timestamp,
+            time_of_first_write,
+            time_of_last_write,
             table_name,
             partition_key,
             chunk_id: proto_msg.chunk_id,
@@ -407,6 +410,8 @@
         let proto_msg = proto::IoxMetadata {
             version: METADATA_VERSION,
             creation_timestamp: Some(encode_timestamp(self.creation_timestamp)),
+            time_of_first_write: Some(encode_timestamp(self.time_of_first_write)),
+            time_of_last_write: Some(encode_timestamp(self.time_of_last_write)),
             table_name: self.table_name.to_string(),
             partition_key: self.partition_key.to_string(),
             chunk_id: self.chunk_id,
@@ -439,6 +444,13 @@ fn decode_timestamp(ts: proto::FixedSizeTimestamp) -> Result<DateTime<Utc>> {
     Ok(chrono::DateTime::<Utc>::from_utc(dt, Utc))
 }
+fn decode_timestamp_from_field(
+    value: Option<proto::FixedSizeTimestamp>,
+    field: &'static str,
+) -> Result<DateTime<Utc>> {
+    decode_timestamp(value.context(IoxMetadataFieldMissing { field })?)
+}
 /// Parquet metadata with IOx-specific wrapper.
 #[derive(Clone, Debug)]
 pub struct IoxParquetMetaData {
@@ -972,6 +984,8 @@ mod tests {
             chunk_id: 1337,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };
         let proto_bytes = metadata.to_protobuf().unwrap();
@@ -1020,10 +1034,12 @@
             chunk_id: 1337,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };
         let proto_bytes = metadata.to_protobuf().unwrap();
-        assert_eq!(proto_bytes.len(), 56);
+        assert_eq!(proto_bytes.len(), 88);
     }
 }
 }
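
The deterministic-size assertion above grows from 56 to 88 bytes: 32 extra bytes for the two new fields, i.e. roughly 16 encoded bytes apiece (field tag, length prefix, and a fixed-width seconds + nanos payload). That breakdown is an estimate based on the name FixedSizeTimestamp and the fixed 32-byte delta, not taken from the generated encoding itself.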

File 5 of 11

@@ -473,6 +473,8 @@ mod tests {
             chunk_id,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };
         let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
         let (path, file_size_bytes, metadata) = storage

File 6 of 11

@@ -473,6 +473,8 @@
             chunk_id: 1337,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };
         // create parquet file
@@ -545,6 +547,8 @@
             chunk_id,
             partition_checkpoint,
             database_checkpoint,
+            time_of_first_write: Utc::now(),
+            time_of_last_write: Utc::now(),
         };
         let (path, _file_size_bytes, _metadata) = storage

File 7 of 11

@@ -14,7 +14,9 @@ use arrow::{
 use chrono::{TimeZone, Utc};
 use data_types::{
     chunk_metadata::ChunkAddr,
-    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
+    partition_metadata::{
+        ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummaryAndTimes,
+    },
     server_id::ServerId,
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -130,8 +132,12 @@ pub async fn make_chunk_given_record_batch(
     let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
     let storage = Storage::new(Arc::clone(&store), server_id);
-    let mut table_summary = TableSummary::new(addr.table_name.to_string());
-    table_summary.columns = column_summaries;
+    let table_summary = TableSummaryAndTimes {
+        name: addr.table_name.to_string(),
+        columns: column_summaries,
+        time_of_first_write: Utc.timestamp(30, 40),
+        time_of_last_write: Utc.timestamp(50, 60),
+    };
     let stream: SendableRecordBatchStream = if record_batches.is_empty() {
         Box::pin(MemoryStream::new_with_schema(
             record_batches,
@@ -151,6 +157,8 @@
         chunk_id: addr.chunk_id,
         partition_checkpoint,
         database_checkpoint,
+        time_of_first_write: Utc.timestamp(30, 40),
+        time_of_last_write: Utc.timestamp(50, 60),
     };
     let (path, file_size_bytes, parquet_metadata) = storage
         .write_to_object_store(addr.clone(), stream, metadata)

File 8 of 11

@@ -2023,7 +2023,7 @@ mod tests {
             .eq(1.0)
             .unwrap();
-        let expected_parquet_size = 647;
+        let expected_parquet_size = 671;
         catalog_chunk_size_bytes_metric_eq(
             &test_db.metric_registry,
             "read_buffer",
@@ -2471,7 +2471,7 @@
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2284.0)
+            .sample_sum_eq(2308.0)
             .unwrap();
         // while MB and RB chunk are identical, the PQ chunk is a new one (split off)
@@ -2591,7 +2591,7 @@
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2284.0)
+            .sample_sum_eq(2308.0)
             .unwrap();
         // Unload RB chunk but keep it in OS
@@ -2621,7 +2621,7 @@
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(647.0)
+            .sample_sum_eq(671.0)
             .unwrap();
         // Verify data written to the parquet file in object store
@@ -3219,8 +3219,8 @@
                 2,
                 ChunkStorage::ReadBufferAndObjectStore,
                 lifecycle_action,
-                3260, // size of RB and OS chunks
-                1479, // size of parquet file
+                3284, // size of RB and OS chunks
+                1523, // size of parquet file
                 2,
             ),
             ChunkSummary::new_without_timestamps(
@@ -3248,14 +3248,15 @@
         for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
             assert_eq!(
                 expected_summary, actual_summary,
-                "\n\nexpected:\n{:#?}\n\nactual:\n{:#?}\n\n",
-                expected_summary, actual_summary
+                "\n\nexpected item:\n{:#?}\n\nactual item:\n{:#?}\n\n\
+                all expected:\n{:#?}\n\nall actual:\n{:#?}",
+                expected_summary, actual_summary, expected, chunk_summaries
             );
         }
         assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2446 + 87);
         assert_eq!(db.catalog.metrics().memory().read_buffer(), 2434);
-        assert_eq!(db.catalog.metrics().memory().object_store(), 826);
+        assert_eq!(db.catalog.metrics().memory().object_store(), 850);
     }
}
#[tokio::test]

File 9 of 11

@@ -336,9 +336,19 @@
     ) -> Self {
         assert_eq!(chunk.table_name(), addr.table_name.as_ref());
+        let summary = chunk.table_summary();
+        let first_write = summary.time_of_first_write;
+        let last_write = summary.time_of_last_write;
+        // this is temporary
+        let table_summary = TableSummary {
+            name: summary.name.clone(),
+            columns: summary.columns.clone(),
+        };
         // Cache table summary + schema
         let meta = Arc::new(ChunkMetadata {
-            table_summary: Arc::clone(chunk.table_summary()),
+            table_summary: Arc::new(table_summary),
             schema: chunk.schema(),
         });
@@ -354,8 +364,8 @@
             lifecycle_action: None,
             metrics,
             access_recorder: Default::default(),
-            time_of_first_write: None,
-            time_of_last_write: None,
+            time_of_first_write: Some(first_write),
+            time_of_last_write: Some(last_write),
             time_closed: None,
         };
         chunk.update_metrics();
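
Note the intermediate copy above: ChunkMetadata still caches a plain Arc<TableSummary>, while the chunk being adopted here now hands back a TableSummaryAndTimes. The two write times are lifted out first so the catalog chunk can be seeded with Some(first_write)/Some(last_write) instead of None, and the name and columns are cloned into a TableSummary for the cache, which is what the "// this is temporary" marker is flagging.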

File 10 of 11

@@ -207,7 +207,7 @@
         match &self.state {
             State::MutableBuffer { chunk } => chunk.table_summary().time_of_first_write,
             State::ReadBuffer { chunk, .. } => chunk.table_summary().time_of_first_write,
-            _ => unimplemented!(),
+            State::ParquetFile { chunk } => chunk.table_summary().time_of_first_write,
         }
     }
@@ -215,7 +215,7 @@
         match &self.state {
             State::MutableBuffer { chunk } => chunk.table_summary().time_of_last_write,
             State::ReadBuffer { chunk, .. } => chunk.table_summary().time_of_last_write,
-            _ => unimplemented!(),
+            State::ParquetFile { chunk } => chunk.table_summary().time_of_last_write,
         }
     }
 }
@@ -495,7 +495,10 @@ impl QueryChunkMeta for DbChunk {
 mod tests {
     use super::*;
     use crate::{
-        db::{catalog::chunk::CatalogChunk, test_helpers::write_lp},
+        db::{
+            catalog::chunk::{CatalogChunk, ChunkStage},
+            test_helpers::write_lp,
+        },
         utils::make_db,
     };
     use data_types::chunk_metadata::ChunkStorage;
@@ -588,7 +591,10 @@
     async fn parquet_records_access() {
         let db = make_db().await.db;
+        let before_creation = Utc::now();
         write_lp(&db, "cpu,tag=1 bar=1 1").await;
+        let after_creation = Utc::now();
         let id = db
             .persist_partition(
                 "cpu",
@@ -598,6 +604,7 @@
             .await
             .unwrap()
             .id;
+
         db.unload_read_buffer("cpu", "1970-01-01T00", id).unwrap();
         let chunks = db.catalog.chunks();
@@ -605,7 +612,45 @@
         let chunk = chunks.into_iter().next().unwrap();
         let chunk = chunk.read();
         assert_eq!(chunk.storage().1, ChunkStorage::ObjectStoreOnly);
+        let first_write = chunk.time_of_first_write().unwrap();
+        let last_write = chunk.time_of_last_write().unwrap();
+        assert_eq!(first_write, last_write);
+        assert!(before_creation < first_write);
+        assert!(last_write < after_creation);
         test_chunk_access(&chunk).await
     }
+    #[tokio::test]
+    async fn parquet_snapshot() {
+        let db = make_db().await.db;
+        let before_creation = Utc::now();
+        write_lp(&db, "cpu,tag=1 bar=1 1").await;
+        let after_creation = Utc::now();
+        write_lp(&db, "cpu,tag=2 bar=2 2").await;
+        let after_write = Utc::now();
+        db.persist_partition(
+            "cpu",
+            "1970-01-01T00",
+            Instant::now() + Duration::from_secs(10000),
+        )
+        .await
+        .unwrap();
+        let chunks = db.catalog.chunks();
+        assert_eq!(chunks.len(), 1);
+        let chunk = chunks.into_iter().next().unwrap();
+        let chunk = chunk.read();
+        assert!(matches!(chunk.stage(), ChunkStage::Persisted { .. }));
+        let snapshot = DbChunk::parquet_file_snapshot(&chunk);
+        let first_write = snapshot.time_of_first_write();
+        let last_write = snapshot.time_of_last_write();
+        assert!(before_creation < first_write);
+        assert!(first_write < after_creation);
+        assert!(first_write < last_write);
+        assert!(last_write < after_write);
+    }
 }

File 11 of 11

@@ -62,6 +62,9 @@ pub(super) fn write_chunk_to_object_store(
     chunk.set_writing_to_object_store(&registration)?;
     let db_chunk = DbChunk::snapshot(&*chunk);
+    let time_of_first_write = db_chunk.time_of_first_write();
+    let time_of_last_write = db_chunk.time_of_last_write();
     debug!(chunk=%chunk.addr(), "chunk marked WRITING , loading tables into object store");
     // Drop locks
@@ -118,6 +121,8 @@
         chunk_id: addr.chunk_id,
         partition_checkpoint,
         database_checkpoint,
+        time_of_first_write,
+        time_of_last_write,
     };
     let (path, file_size_bytes, parquet_metadata) = storage
         .write_to_object_store(addr, stream, metadata)