refactor: remove IoxMetadata min & max timestamp

Removes the min/max timestamp fields from the IoxMetadata proto
structure embedded within a Parquet file's metadata.

These values are redundant, as they already exist within the Parquet
column statistics. Carrying them in IoxMetadata also precluded streaming
serialisation, because the min/max values had to be computed from all
record batches before the file could be serialised.
pull/24376/head
pull/24376/head
Dom Dwyer 2022-05-23 11:06:47 +01:00
parent a142a9eb57
commit 2e6c49be83
10 changed files with 55 additions and 94 deletions

View File

@@ -21,10 +21,9 @@ use iox_query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
     provider::overlap::group_potential_duplicates,
-    util::compute_timenanosecond_min_max,
     QueryChunk,
 };
-use iox_time::{Time, TimeProvider};
+use iox_time::TimeProvider;
 use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
 use observability_deps::tracing::{debug, info, trace, warn};
 use parquet_file::{
@@ -749,10 +748,6 @@ impl Compactor {
         debug!("got {} rows from stream {}", row_count, i);
 
-        // Compute min and max of the `time` column
-        let (min_time, max_time) =
-            compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?;
-
         let meta = IoxMetadata {
             object_store_id: Uuid::new_v4(),
             creation_timestamp: self.time_provider.now(),
@@ -763,8 +758,6 @@ impl Compactor {
             table_name: Arc::<str>::clone(&iox_metadata.table_name),
             partition_id: iox_metadata.partition_id,
             partition_key: Arc::<str>::clone(&iox_metadata.partition_key),
-            time_of_first_write: Time::from_timestamp_nanos(min_time),
-            time_of_last_write: Time::from_timestamp_nanos(max_time),
             min_sequence_number,
             max_sequence_number,
             compaction_level: 1, // compacted result file always have level 1
@@ -2401,8 +2394,6 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: compactor.time_provider.now(),
-            time_of_last_write: compactor.time_provider.now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
             compaction_level: 1, // level of compacted data is always 1
@@ -2548,8 +2539,6 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: compactor.time_provider.now(),
-            time_of_last_write: compactor.time_provider.now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
             compaction_level: 1, // file level of compacted file is always 1

View File

@@ -7,12 +7,12 @@ import "google/protobuf/timestamp.proto";
 // IOx-specific metadata that will be serialized into the file-level key-value Parquet metadata
 // under a single key.
 message IoxMetadata {
-  // Removed as the Parquet metadata itself contains the row count, and
-  // specifying it here creates a dependency that prevents streaming
-  // serialisation (needing to know the number of rows before you can serialise
-  // your parquet file with this metadata structure within it)
-  reserved 14;
-  reserved "row_count";
+  // Removed as the Parquet metadata itself contains the row count & min/max
+  // timestamps, and specifying them here creates a dependency that prevents
+  // streaming serialisation (needing to know the number of rows before you can
+  // serialise your parquet file with this metadata structure within it)
+  reserved 10, 11, 14;
+  reserved "row_count", "time_of_first_write", "time_of_last_write";
 
   // Object store ID. Used in the parquet filename. 16 bytes in big-endian order.
   bytes object_store_id = 1;
@@ -41,12 +41,6 @@ message IoxMetadata {
   // Partition key of the partition that holds this parquet file.
   string partition_key = 9;
 
-  // Wallclock timestamp of when the first data in this file was received by IOx.
-  google.protobuf.Timestamp time_of_first_write = 10;
-
-  // Wallclock timestamp of when the last data in this file was received by IOx.
-  google.protobuf.Timestamp time_of_last_write = 11;
-
   // The minimum sequence number from a sequencer in this parquet file.
   int64 min_sequence_number = 12;
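
Reserving the removed field numbers and names keeps decoders compatible
with existing files: protobuf decoders skip fields a message no longer
defines, so an old file whose embedded IoxMetadata still carries fields
10/11 decodes cleanly. A hedged sketch (prost-style; the generated
IoxMetadata type is assumed to be in scope):

    use prost::Message;

    // Hypothetical old-producer bytes: tag 0x52 = (10 << 3) | 2 is
    // field 10 (time_of_first_write) with an empty, length-delimited
    // google.protobuf.Timestamp payload.
    let old_bytes: &[u8] = &[0x52, 0x00];
    // The regenerated type no longer defines field 10; prost skips the
    // unknown field instead of returning an error.
    let msg = IoxMetadata::decode(old_bytes).expect("decode succeeds");
    // All remaining fields take their defaults.
    assert!(msg.object_store_id.is_empty());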

View File

@@ -8,10 +8,9 @@ use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
 use iox_query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
-    util::compute_timenanosecond_min_max,
     QueryChunk, QueryChunkMeta,
 };
-use iox_time::{Time, TimeProvider};
+use iox_time::TimeProvider;
 use parquet_file::metadata::IoxMetadata;
 use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
 use snafu::{ResultExt, Snafu};
@@ -105,10 +104,6 @@ pub async fn compact_persisting_batch(
         .filter(|b| b.num_rows() != 0)
         .collect();
 
-    // Compute min and max of the `time` column
-    let (min_time, max_time) =
-        compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?;
-
     // Compute min and max sequence numbers
     let (min_seq, max_seq) = batch.data.min_max_sequence_numbers();
@@ -122,8 +117,6 @@ pub async fn compact_persisting_batch(
         table_name: Arc::from(table_name.as_str()),
         partition_id: batch.partition_id,
         partition_key: Arc::from(partition_key.as_str()),
-        time_of_first_write: Time::from_timestamp_nanos(min_time),
-        time_of_last_write: Time::from_timestamp_nanos(max_time),
         min_sequence_number: min_seq,
         max_sequence_number: max_seq,
         compaction_level: INITIAL_COMPACTION_LEVEL,
@@ -329,8 +322,6 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            8000,
-            20000,
             seq_num_start,
             seq_num_end,
             INITIAL_COMPACTION_LEVEL,
@@ -423,8 +414,6 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
             INITIAL_COMPACTION_LEVEL,
@@ -520,8 +509,6 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
             INITIAL_COMPACTION_LEVEL,
@@ -617,8 +604,6 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
             INITIAL_COMPACTION_LEVEL,
@@ -717,8 +702,6 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
             INITIAL_COMPACTION_LEVEL,

View File

@@ -73,8 +73,6 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: now(),
-            time_of_last_write: now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
             compaction_level: INITIAL_COMPACTION_LEVEL,
@@ -106,8 +104,6 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: now(),
-            time_of_last_write: now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
             compaction_level: INITIAL_COMPACTION_LEVEL,

View File

@@ -89,8 +89,6 @@ pub async fn make_persisting_batch_with_meta() -> (Arc<PersistingBatch>, Vec<Tom
         table_name,
         partition_id,
         partition_key,
-        5,
-        7000,
         seq_num_start,
         seq_num_end,
         INITIAL_COMPACTION_LEVEL,
@@ -132,8 +130,6 @@ pub fn make_meta(
     table_name: &str,
     partition_id: i64,
     partition_key: &str,
-    min_time: i64,
-    max_time: i64,
     min_sequence_number: i64,
     max_sequence_number: i64,
     compaction_level: i16,
@@ -149,8 +145,6 @@ pub fn make_meta(
         table_name: Arc::from(table_name),
         partition_id: PartitionId::new(partition_id),
         partition_key: Arc::from(partition_key),
-        time_of_first_write: Time::from_timestamp_nanos(min_time),
-        time_of_last_write: Time::from_timestamp_nanos(max_time),
         min_sequence_number: SequenceNumber::new(min_sequence_number),
         max_sequence_number: SequenceNumber::new(max_sequence_number),
         compaction_level,

View File

@@ -517,8 +517,6 @@ impl TestPartition {
             table_name: self.table.table.name.clone().into(),
             partition_id: self.partition.id,
             partition_key: self.partition.partition_key.clone().into(),
-            time_of_first_write: Time::from_timestamp_nanos(min_time),
-            time_of_last_write: Time::from_timestamp_nanos(max_time),
             min_sequence_number,
             max_sequence_number,
             compaction_level: INITIAL_COMPACTION_LEVEL,

View File

@@ -108,7 +108,7 @@ use parquet::{
 use prost::Message;
 use schema::{
     sort::{SortKey, SortKeyBuilder},
-    InfluxColumnType, InfluxFieldType, Schema,
+    InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME,
 };
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{convert::TryInto, sync::Arc};
@@ -265,14 +265,6 @@ pub struct IoxMetadata {
     /// partition key of the data
     pub partition_key: Arc<str>,
 
-    /// Time of the first write of the data
-    /// This is also the min value of the column `time`
-    pub time_of_first_write: Time,
-
-    /// Time of the last write of the data
-    /// This is also the max value of the column `time`
-    pub time_of_last_write: Time,
-
     /// sequence number of the first write
     pub min_sequence_number: SequenceNumber,
@@ -310,8 +302,6 @@ impl IoxMetadata {
             table_name: self.table_name.to_string(),
             partition_id: self.partition_id.get(),
             partition_key: self.partition_key.to_string(),
-            time_of_first_write: Some(self.time_of_first_write.date_time().into()),
-            time_of_last_write: Some(self.time_of_last_write.date_time().into()),
             min_sequence_number: self.min_sequence_number.get(),
             max_sequence_number: self.max_sequence_number.get(),
             sort_key,
@@ -334,12 +324,6 @@ impl IoxMetadata {
         // extract creation timestamp
         let creation_timestamp =
             decode_timestamp_from_field(proto_msg.creation_timestamp, "creation_timestamp")?;
-        // extract time of first write
-        let time_of_first_write =
-            decode_timestamp_from_field(proto_msg.time_of_first_write, "time_of_first_write")?;
-        // extract time of last write
-        let time_of_last_write =
-            decode_timestamp_from_field(proto_msg.time_of_last_write, "time_of_last_write")?;
 
         // extract strings
         let namespace_name = Arc::from(proto_msg.namespace_name.as_ref());
@@ -369,8 +353,6 @@ impl IoxMetadata {
             table_name,
             partition_id: PartitionId::new(proto_msg.partition_id),
             partition_key,
-            time_of_first_write,
-            time_of_last_write,
             min_sequence_number: SequenceNumber::new(proto_msg.min_sequence_number),
             max_sequence_number: SequenceNumber::new(proto_msg.max_sequence_number),
             sort_key,
@@ -384,12 +366,52 @@ impl IoxMetadata {
     }
 
     /// Create a corresponding iox catalog's ParquetFile
+    ///
+    /// # Panics
+    ///
+    /// This method panics if the [`IoxParquetMetaData`] structure does not
+    /// contain valid metadata bytes, has no readable schema, or has no field
+    /// statistics.
+    ///
+    /// A [`RecordBatch`] serialised without the embedded metadata found in the
+    /// IOx [`Schema`] type will cause a statistic resolution failure due to
+    /// lack of the IOx field type metadata for the time column. Batches
+    /// produced through the IOx write path always include this metadata.
+    ///
+    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
     pub fn to_parquet_file(
         &self,
         file_size_bytes: usize,
         metadata: &IoxParquetMetaData,
     ) -> ParquetFileParams {
-        let row_count = metadata.decode().expect("invalid metadata").row_count();
+        let decoded = metadata.decode().expect("invalid IOx metadata");
+        let row_count = decoded.row_count();
+
+        // Derive the min/max timestamp from the Parquet column statistics.
+        let schema = decoded
+            .read_schema()
+            .expect("failed to read encoded schema");
+        let time_summary = decoded
+            .read_statistics(&*schema)
+            .expect("invalid statistics")
+            .into_iter()
+            .find(|v| v.name == TIME_COLUMN_NAME)
+            .expect("no time column in metadata statistics");
+
+        // Sanity check the type of this column before using the values.
+        assert_eq!(time_summary.influxdb_type, Some(InfluxDbType::Timestamp));
+
+        // Extract the min/max timestamps.
+        let (min_time, max_time) = match time_summary.stats {
+            Statistics::I64(stats) => {
+                let min = Timestamp::new(stats.min.expect("no min time statistic"));
+                let max = Timestamp::new(stats.max.expect("no max time statistic"));
+                (min, max)
+            }
+            _ => panic!("unexpected physical type for timestamp column"),
+        };
+
         ParquetFileParams {
             sequencer_id: self.sequencer_id,
             namespace_id: self.namespace_id,
@@ -398,8 +420,8 @@ impl IoxMetadata {
             object_store_id: self.object_store_id,
             min_sequence_number: self.min_sequence_number,
             max_sequence_number: self.max_sequence_number,
-            min_time: Timestamp::new(self.time_of_first_write.timestamp_nanos()),
-            max_time: Timestamp::new(self.time_of_last_write.timestamp_nanos()),
+            min_time,
+            max_time,
             file_size_bytes: file_size_bytes as i64,
             parquet_metadata: metadata.thrift_bytes().to_vec(),
             compaction_level: self.compaction_level,
@@ -870,8 +892,6 @@ mod tests {
             table_name: Arc::from("weather"),
             partition_id: PartitionId::new(4),
             partition_key: Arc::from("part"),
-            time_of_first_write: Time::from_timestamp(3234, 0),
-            time_of_last_write: Time::from_timestamp(3234, 3456),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
             compaction_level: 0,
@@ -897,8 +917,6 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
             compaction_level: 1,
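
With the change above, the catalog row is derived from the serialised
file's own metadata. A minimal usage sketch (the `meta`,
`file_size_bytes`, and `parquet_meta` bindings are hypothetical and
assumed to come from the serialisation step):

    // `meta` is the IoxMetadata written into the file; `parquet_meta`
    // is the IoxParquetMetaData captured after serialisation.
    let params = meta.to_parquet_file(file_size_bytes, &parquet_meta);
    // `params.min_time` / `params.max_time` now reflect the `time`
    // column statistics rather than caller-supplied IoxMetadata fields.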

View File

@@ -176,8 +176,6 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
             compaction_level: 1,

View File

@@ -290,8 +290,6 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
             compaction_level: 1,

View File

@@ -42,8 +42,6 @@ async fn test_decoded_iox_metadata() {
         table_name: "platanos".into(),
         partition_id: PartitionId::new(4),
         partition_key: "potato".into(),
-        time_of_first_write: Time::from_timestamp_nanos(4242),
-        time_of_last_write: Time::from_timestamp_nanos(424242),
         min_sequence_number: SequenceNumber::new(10),
         max_sequence_number: SequenceNumber::new(11),
         compaction_level: 1,
@@ -92,6 +90,7 @@ async fn test_derive_parquet_file_params() {
     let data = vec![
         to_string_array(&["bananas", "platanos", "manzana"]),
         to_timestamp_array(&[
+            // NOTE: not ordered to ensure min/max extracted, not head/tail
             1646917692000000000,
             1653311292000000000,
             1647695292000000000,
@@ -110,8 +109,6 @@ async fn test_derive_parquet_file_params() {
         table_name: "platanos".into(),
         partition_id: PartitionId::new(4),
         partition_key: "potato".into(),
-        time_of_first_write: Time::from_timestamp_nanos(4242),
-        time_of_last_write: Time::from_timestamp_nanos(424242),
         min_sequence_number: SequenceNumber::new(10),
         max_sequence_number: SequenceNumber::new(11),
         compaction_level: 1,
@@ -156,12 +153,8 @@ async fn test_derive_parquet_file_params() {
     assert_eq!(catalog_data.compaction_level, meta.compaction_level);
     assert_eq!(catalog_data.created_at, Timestamp::new(1234));
     assert_eq!(catalog_data.row_count, 3);
-    // NOTE: these DO NOT reflect the actual values! These values were not
-    // derived from the actual data, but instead trusted from the input
-    // IoxMetadata.
-    assert_eq!(catalog_data.min_time, Timestamp::new(4242));
-    assert_eq!(catalog_data.max_time, Timestamp::new(424242));
+    assert_eq!(catalog_data.min_time, Timestamp::new(1646917692000000000));
+    assert_eq!(catalog_data.max_time, Timestamp::new(1653311292000000000));
 }
 
 fn to_string_array(strs: &[&str]) -> ArrayRef {