Merge pull request #4662 from influxdata/dom/meta-remove-row-count

refactor: do not embed row count & min/max timestamps in IOxMetadata

commit 5239417925

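In short, IoxMetadata no longer carries row_count, time_of_first_write, or time_of_last_write; those values now live only in the Parquet metadata written with the file. The sketch below is illustrative only (the helper function and its signature are assumptions), but the calls on the two metadata types are the ones this diff introduces or keeps.

// Hedged sketch, not code from this commit: `describe_file` and its signature
// are assumptions made for illustration; the metadata types and the methods
// called on them appear in the diff below.
use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};

fn describe_file(meta: &IoxMetadata, parquet_meta: &IoxParquetMetaData, file_size: usize) {
    // The row count now comes from the Parquet metadata itself...
    let decoded = parquet_meta.decode().expect("invalid IOx metadata");
    println!("rows written: {}", decoded.row_count());

    // ...and the catalog entry, including the min/max of the `time` column,
    // is derived from it by `IoxMetadata::to_parquet_file`.
    let params = meta.to_parquet_file(file_size, parquet_meta);
    println!("time range: {:?} ..= {:?}", params.min_time, params.max_time);
}
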
@@ -21,10 +21,9 @@ use iox_query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
     provider::overlap::group_potential_duplicates,
-    util::compute_timenanosecond_min_max,
     QueryChunk,
 };
-use iox_time::{Time, TimeProvider};
+use iox_time::TimeProvider;
 use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
 use observability_deps::tracing::{debug, info, trace, warn};
 use parquet_file::{

@@ -746,16 +745,8 @@ impl Compactor {
             );
-
             let row_count: usize = output_batches.iter().map(|b| b.num_rows()).sum();
-            let row_count = row_count.try_into().context(RowCountTypeConversionSnafu)?;
-
             debug!("got {} rows from stream {}", row_count, i);
             if row_count == 0 {
                 continue;
             }
-
-            // Compute min and max of the `time` column
-            let (min_time, max_time) =
-                compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?;
-
             let meta = IoxMetadata {
                 object_store_id: Uuid::new_v4(),

@@ -767,11 +758,8 @@ impl Compactor {
                 table_name: Arc::<str>::clone(&iox_metadata.table_name),
                 partition_id: iox_metadata.partition_id,
                 partition_key: Arc::<str>::clone(&iox_metadata.partition_key),
-                time_of_first_write: Time::from_timestamp_nanos(min_time),
-                time_of_last_write: Time::from_timestamp_nanos(max_time),
                 min_sequence_number,
                 max_sequence_number,
-                row_count,
                 compaction_level: 1, // compacted result file always have level 1
                 sort_key: Some(sort_key.clone()),
             };

@@ -2406,11 +2394,8 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: compactor.time_provider.now(),
-            time_of_last_write: compactor.time_provider.now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 3,
             compaction_level: 1, // level of compacted data is always 1
             sort_key: Some(SortKey::from_columns(["tag1", "time"])),
         };

@@ -2554,11 +2539,8 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: compactor.time_provider.now(),
-            time_of_last_write: compactor.time_provider.now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 3,
             compaction_level: 1, // file level of compacted file is always 1
             sort_key: None,
         };

@@ -7,6 +7,13 @@ import "google/protobuf/timestamp.proto";
 // IOx-specific metadata that will be serialized into the file-level key-value Parquet metadata
 // under a single key.
 message IoxMetadata {
+  // Removed as the Parquet metadata itself contains the row count & min/max
+  // timestamps, and specifying them here creates a dependency that prevents
+  // streaming serialisation (needing to know the number of rows before you can
+  // serialise your parquet file with this metadata structure within it)
+  reserved 10, 11, 14;
+  reserved "row_count", "time_of_first_write", "time_of_last_write";
+
   // Object store ID. Used in the parquet filename. 16 bytes in big-endian order.
   bytes object_store_id = 1;

@@ -34,21 +41,12 @@ message IoxMetadata {
   // Partition key of the partition that holds this parquet file.
   string partition_key = 9;
 
-  // Wallclock timestamp of when the first data in this file was received by IOx.
-  google.protobuf.Timestamp time_of_first_write = 10;
-
-  // Wallclock timestamp of when the last data in this file was received by IOx.
-  google.protobuf.Timestamp time_of_last_write = 11;
-
   // The minimum sequence number from a sequencer in this parquet file.
   int64 min_sequence_number = 12;
 
   // The maximum sequence number from a sequencer in this parquet file.
   int64 max_sequence_number = 13;
 
-  // Number of rows of data in this file.
-  int64 row_count = 14;
-
   // The sort key of this chunk
   SortKey sort_key = 15;
 

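The streaming-serialisation point in the comment above is easiest to see at a call site: none of the remaining IoxMetadata fields depend on the data, so the metadata can be built before the record batch stream is consumed. A hedged sketch, closely mirroring the test added at the end of this diff (the wrapper function and its signature are illustrative assumptions):

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use object_store::DynObjectStore;
use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};

/// Illustrative only: persist one batch using metadata that was fully known
/// before any data was read.
async fn persist_batch(meta: IoxMetadata, batch: RecordBatch) {
    let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
    let storage = ParquetStorage::new(object_store);

    // The row count and the min/max of the `time` column end up in the Parquet
    // metadata written alongside the data, not in `meta`.
    let stream = futures::stream::iter([Ok(batch)]);
    let (_iox_parquet_meta, _file_size) = storage
        .upload(stream, &meta)
        .await
        .expect("failed to serialise & persist record batch");
}
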
@@ -8,10 +8,9 @@ use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
 use iox_query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
-    util::compute_timenanosecond_min_max,
     QueryChunk, QueryChunkMeta,
 };
-use iox_time::{Time, TimeProvider};
+use iox_time::TimeProvider;
 use parquet_file::metadata::IoxMetadata;
 use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
 use snafu::{ResultExt, Snafu};

@@ -105,13 +104,6 @@ pub async fn compact_persisting_batch(
         .filter(|b| b.num_rows() != 0)
         .collect();
 
-    let row_count: usize = output_batches.iter().map(|b| b.num_rows()).sum();
-    let row_count = row_count.try_into().context(RowCountTypeConversionSnafu)?;
-
-    // Compute min and max of the `time` column
-    let (min_time, max_time) =
-        compute_timenanosecond_min_max(&output_batches).context(MinMaxSnafu)?;
-
     // Compute min and max sequence numbers
     let (min_seq, max_seq) = batch.data.min_max_sequence_numbers();
 

@@ -125,11 +117,8 @@ pub async fn compact_persisting_batch(
         table_name: Arc::from(table_name.as_str()),
         partition_id: batch.partition_id,
         partition_key: Arc::from(partition_key.as_str()),
-        time_of_first_write: Time::from_timestamp_nanos(min_time),
-        time_of_last_write: Time::from_timestamp_nanos(max_time),
         min_sequence_number: min_seq,
         max_sequence_number: max_seq,
-        row_count,
         compaction_level: INITIAL_COMPACTION_LEVEL,
         sort_key: Some(metadata_sort_key),
     };

@@ -333,11 +322,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            8000,
-            20000,
             seq_num_start,
             seq_num_end,
-            3,
             INITIAL_COMPACTION_LEVEL,
             Some(SortKey::from_columns(["tag1", "time"])),
         );

@@ -428,11 +414,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
-            4,
             INITIAL_COMPACTION_LEVEL,
             // Sort key should now be set
             Some(SortKey::from_columns(["tag1", "tag3", "time"])),

@@ -526,11 +509,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
-            4,
             INITIAL_COMPACTION_LEVEL,
             // The sort key in the metadata should be the same as specified (that is, not
             // recomputed)

@@ -624,11 +604,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
-            4,
             INITIAL_COMPACTION_LEVEL,
             // The sort key in the metadata should be updated to include the new column just before
             // the time column

@@ -725,11 +702,8 @@ mod tests {
             table_name,
             partition_id,
             partition_key,
-            28000,
-            220000,
             seq_num_start,
             seq_num_end,
-            4,
             INITIAL_COMPACTION_LEVEL,
             // The sort key in the metadata should only contain the columns in this file
             Some(SortKey::from_columns(["tag3", "tag1", "time"])),

@@ -73,11 +73,8 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: now(),
-            time_of_last_write: now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 0,
             compaction_level: INITIAL_COMPACTION_LEVEL,
             sort_key: None,
         };

@@ -107,11 +104,8 @@ mod tests {
             table_name: "temperature".into(),
             partition_id: PartitionId::new(4),
             partition_key: "somehour".into(),
-            time_of_first_write: now(),
-            time_of_last_write: now(),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 3,
             compaction_level: INITIAL_COMPACTION_LEVEL,
             sort_key: None,
         };

@@ -64,8 +64,6 @@ pub async fn make_persisting_batch_with_meta() -> (Arc<PersistingBatch>, Vec<Tom
     let namespace_id = 1;
     let table_id = 1;
     let partition_id = 1;
-    let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
-    let row_count = row_count.try_into().unwrap();
 
     // make the persisting batch
     let persisting_batch = make_persisting_batch(

@@ -91,11 +89,8 @@ pub async fn make_persisting_batch_with_meta() -> (Arc<PersistingBatch>, Vec<Tom
         table_name,
         partition_id,
         partition_key,
-        5,
-        7000,
         seq_num_start,
         seq_num_end,
-        row_count,
         INITIAL_COMPACTION_LEVEL,
         Some(SortKey::from_columns(vec!["tag1", "tag2", "time"])),
     );

@@ -135,11 +130,8 @@ pub fn make_meta(
     table_name: &str,
     partition_id: i64,
     partition_key: &str,
-    min_time: i64,
-    max_time: i64,
     min_sequence_number: i64,
     max_sequence_number: i64,
-    row_count: i64,
     compaction_level: i16,
     sort_key: Option<SortKey>,
 ) -> IoxMetadata {

@@ -153,11 +145,8 @@ pub fn make_meta(
         table_name: Arc::from(table_name),
         partition_id: PartitionId::new(partition_id),
         partition_key: Arc::from(partition_key),
-        time_of_first_write: Time::from_timestamp_nanos(min_time),
-        time_of_last_write: Time::from_timestamp_nanos(max_time),
         min_sequence_number: SequenceNumber::new(min_sequence_number),
         max_sequence_number: SequenceNumber::new(max_sequence_number),
-        row_count,
         compaction_level,
         sort_key,
     }

@@ -517,11 +517,8 @@ impl TestPartition {
             table_name: self.table.table.name.clone().into(),
             partition_id: self.partition.id,
             partition_key: self.partition.partition_key.clone().into(),
-            time_of_first_write: Time::from_timestamp_nanos(min_time),
-            time_of_last_write: Time::from_timestamp_nanos(max_time),
             min_sequence_number,
             max_sequence_number,
-            row_count: row_count as i64,
             compaction_level: INITIAL_COMPACTION_LEVEL,
             sort_key: Some(sort_key.clone()),
         };

@@ -108,7 +108,7 @@ use parquet::{
 use prost::Message;
 use schema::{
     sort::{SortKey, SortKeyBuilder},
-    InfluxColumnType, InfluxFieldType, Schema,
+    InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME,
 };
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{convert::TryInto, sync::Arc};

@@ -265,23 +265,12 @@ pub struct IoxMetadata {
     /// partition key of the data
     pub partition_key: Arc<str>,
 
-    /// Time of the first write of the data
-    /// This is also the min value of the column `time`
-    pub time_of_first_write: Time,
-
-    /// Time of the last write of the data
-    /// This is also the max value of the column `time`
-    pub time_of_last_write: Time,
-
     /// sequence number of the first write
     pub min_sequence_number: SequenceNumber,
 
     /// sequence number of the last write
     pub max_sequence_number: SequenceNumber,
 
-    /// number of rows of data
-    pub row_count: i64,
-
     /// the compaction level of the file
     pub compaction_level: i16,
 

@@ -313,11 +302,8 @@ impl IoxMetadata {
             table_name: self.table_name.to_string(),
             partition_id: self.partition_id.get(),
             partition_key: self.partition_key.to_string(),
-            time_of_first_write: Some(self.time_of_first_write.date_time().into()),
-            time_of_last_write: Some(self.time_of_last_write.date_time().into()),
             min_sequence_number: self.min_sequence_number.get(),
             max_sequence_number: self.max_sequence_number.get(),
-            row_count: self.row_count,
             sort_key,
             compaction_level: self.compaction_level as i32,
         };

@@ -338,12 +324,6 @@ impl IoxMetadata {
         // extract creation timestamp
         let creation_timestamp =
             decode_timestamp_from_field(proto_msg.creation_timestamp, "creation_timestamp")?;
-        // extract time of first write
-        let time_of_first_write =
-            decode_timestamp_from_field(proto_msg.time_of_first_write, "time_of_first_write")?;
-        // extract time of last write
-        let time_of_last_write =
-            decode_timestamp_from_field(proto_msg.time_of_last_write, "time_of_last_write")?;
 
         // extract strings
         let namespace_name = Arc::from(proto_msg.namespace_name.as_ref());

@@ -373,11 +353,8 @@ impl IoxMetadata {
             table_name,
             partition_id: PartitionId::new(proto_msg.partition_id),
             partition_key,
-            time_of_first_write,
-            time_of_last_write,
             min_sequence_number: SequenceNumber::new(proto_msg.min_sequence_number),
             max_sequence_number: SequenceNumber::new(proto_msg.max_sequence_number),
-            row_count: proto_msg.row_count,
             sort_key,
             compaction_level: proto_msg.compaction_level as i16,
         })

@@ -389,11 +366,52 @@ impl IoxMetadata {
     }
 
     /// Create a corresponding iox catalog's ParquetFile
+    ///
+    /// # Panics
+    ///
+    /// This method panics if the [`IoxParquetMetaData`] structure does not
+    /// contain valid metadata bytes, has no readable schema, or has no field
+    /// statistics.
+    ///
+    /// A [`RecordBatch`] serialised without the embedded metadata found in the
+    /// IOx [`Schema`] type will cause a statistic resolution failure due to
+    /// lack of the IOx field type metadata for the time column. Batches
+    /// produced through the IOx write path always include this
+    /// metadata.
+    ///
+    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
     pub fn to_parquet_file(
         &self,
         file_size_bytes: usize,
         metadata: &IoxParquetMetaData,
     ) -> ParquetFileParams {
+        let decoded = metadata.decode().expect("invalid IOx metadata");
+        let row_count = decoded.row_count();
+
+        // Derive the min/max timestamp from the Parquet column statistics.
+        let schema = decoded
+            .read_schema()
+            .expect("failed to read encoded schema");
+        let time_summary = decoded
+            .read_statistics(&*schema)
+            .expect("invalid statistics")
+            .into_iter()
+            .find(|v| v.name == TIME_COLUMN_NAME)
+            .expect("no time column in metadata statistics");
+
+        // Sanity check the type of this column before using the values.
+        assert_eq!(time_summary.influxdb_type, Some(InfluxDbType::Timestamp));
+
+        // Extract the min/max timestamps.
+        let (min_time, max_time) = match time_summary.stats {
+            Statistics::I64(stats) => {
+                let min = Timestamp::new(stats.min.expect("no min time statistic"));
+                let max = Timestamp::new(stats.max.expect("no max time statistic"));
+                (min, max)
+            }
+            _ => panic!("unexpected physical type for timestamp column"),
+        };
+
         ParquetFileParams {
             sequencer_id: self.sequencer_id,
             namespace_id: self.namespace_id,

@@ -402,12 +420,12 @@ impl IoxMetadata {
             object_store_id: self.object_store_id,
             min_sequence_number: self.min_sequence_number,
             max_sequence_number: self.max_sequence_number,
-            min_time: Timestamp::new(self.time_of_first_write.timestamp_nanos()),
-            max_time: Timestamp::new(self.time_of_last_write.timestamp_nanos()),
+            min_time,
+            max_time,
             file_size_bytes: file_size_bytes as i64,
             parquet_metadata: metadata.thrift_bytes().to_vec(),
-            row_count: self.row_count,
             compaction_level: self.compaction_level,
+            row_count: row_count.try_into().expect("row count overflows i64"),
             created_at: Timestamp::new(self.creation_timestamp.timestamp_nanos()),
         }
     }

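As the to_parquet_file doc comment above notes, the statistics can only be resolved when the Arrow schema carries the IOx field-type metadata. A hedged sketch of building such a batch, mirroring the test added at the end of this diff (the helper function itself is an illustrative assumption):

use arrow::{array::ArrayRef, record_batch::RecordBatch};
use schema::{builder::SchemaBuilder, InfluxFieldType};

/// Illustrative only: build a RecordBatch whose schema embeds the IOx
/// field-type metadata, so `to_parquet_file` can resolve statistics for the
/// `time` column instead of panicking.
fn batch_with_iox_schema(columns: Vec<ArrayRef>) -> RecordBatch {
    let schema = SchemaBuilder::new()
        .influx_field("some_field", InfluxFieldType::String)
        .timestamp()
        .build()
        .expect("could not create schema")
        .as_arrow();

    // Column order must match the schema above: the field column first, then `time`.
    RecordBatch::try_new(schema, columns).expect("failed to build record batch")
}
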
@@ -874,11 +892,8 @@ mod tests {
             table_name: Arc::from("weather"),
             partition_id: PartitionId::new(4),
             partition_key: Arc::from("part"),
-            time_of_first_write: Time::from_timestamp(3234, 0),
-            time_of_last_write: Time::from_timestamp(3234, 3456),
             min_sequence_number: SequenceNumber::new(5),
             max_sequence_number: SequenceNumber::new(6),
-            row_count: 3,
             compaction_level: 0,
             sort_key: Some(sort_key),
         };

@@ -902,11 +917,8 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
-            row_count: 1000,
             compaction_level: 1,
             sort_key: None,
         };

@@ -176,11 +176,8 @@ mod tests {
             table_name: "platanos".into(),
             partition_id: PartitionId::new(4),
             partition_key: "potato".into(),
-            time_of_first_write: Time::from_timestamp_nanos(4242),
-            time_of_last_write: Time::from_timestamp_nanos(424242),
             min_sequence_number: SequenceNumber::new(10),
             max_sequence_number: SequenceNumber::new(11),
-            row_count: 1000,
             compaction_level: 1,
             sort_key: None,
         };

|
@ -290,11 +290,8 @@ mod tests {
|
|||
table_name: "platanos".into(),
|
||||
partition_id: PartitionId::new(4),
|
||||
partition_key: "potato".into(),
|
||||
time_of_first_write: Time::from_timestamp_nanos(4242),
|
||||
time_of_last_write: Time::from_timestamp_nanos(424242),
|
||||
min_sequence_number: SequenceNumber::new(10),
|
||||
max_sequence_number: SequenceNumber::new(11),
|
||||
row_count: 1000,
|
||||
compaction_level: 1,
|
||||
sort_key: None,
|
||||
};
|
||||
|
|
|
@@ -0,0 +1,174 @@
+use std::sync::Arc;
+
+use arrow::{
+    array::{ArrayRef, StringBuilder, TimestampNanosecondBuilder},
+    record_batch::RecordBatch,
+};
+use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp};
+use iox_time::Time;
+use object_store::DynObjectStore;
+use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
+use schema::{builder::SchemaBuilder, InfluxFieldType, TIME_COLUMN_NAME};
+
+#[tokio::test]
+async fn test_decoded_iox_metadata() {
+    // A representative IOx data sample (with a time column, an invariant upheld
+    // in the IOx write path)
+    let data = [
+        (
+            TIME_COLUMN_NAME,
+            to_timestamp_array(&[
+                // NOTE: not ordered to ensure min/max is derived, not head/tail
+                1646917692000000000,
+                1653311292000000000,
+                1647695292000000000,
+            ]),
+        ),
+        (
+            "some_field",
+            to_string_array(&["bananas", "platanos", "manzana"]),
+        ),
+    ];
+
+    // And the metadata the batch would be encoded with if it came through the
+    // IOx write path.
+    let meta = IoxMetadata {
+        object_store_id: Default::default(),
+        creation_timestamp: Time::from_timestamp_nanos(42),
+        namespace_id: NamespaceId::new(1),
+        namespace_name: "bananas".into(),
+        sequencer_id: SequencerId::new(2),
+        table_id: TableId::new(3),
+        table_name: "platanos".into(),
+        partition_id: PartitionId::new(4),
+        partition_key: "potato".into(),
+        min_sequence_number: SequenceNumber::new(10),
+        max_sequence_number: SequenceNumber::new(11),
+        compaction_level: 1,
+        sort_key: None,
+    };
+
+    let batch = RecordBatch::try_from_iter(data).unwrap();
+    let stream = futures::stream::iter([Ok(batch.clone())]);
+
+    let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
+    let storage = ParquetStorage::new(object_store);
+
+    let (iox_parquet_meta, file_size) = storage
+        .upload(stream, &meta)
+        .await
+        .expect("failed to serialise & persist record batch");
+
+    // Sanity check - can't assert the actual value.
+    assert!(file_size > 0);
+
+    // Decode the IOx metadata embedded in the parquet file metadata.
+    let decoded = iox_parquet_meta
+        .decode()
+        .expect("failed to decode parquet file metadata");
+
+    // And verify the metadata matches the expected values.
+    assert_eq!(
+        decoded.row_count(),
+        3,
+        "row count statistics does not match input row count"
+    );
+
+    let got = decoded
+        .read_iox_metadata_new()
+        .expect("failed to deserialise embedded IOx metadata");
+    assert_eq!(
+        got, meta,
+        "embedded metadata does not match original metadata"
+    );
+}
+
+#[tokio::test]
+async fn test_derive_parquet_file_params() {
+    // A representative IOx data sample (with a time column, an invariant upheld
+    // in the IOx write path)
+    let data = vec![
+        to_string_array(&["bananas", "platanos", "manzana"]),
+        to_timestamp_array(&[
+            // NOTE: not ordered to ensure min/max extracted, not head/tail
+            1646917692000000000,
+            1653311292000000000,
+            1647695292000000000,
+        ]),
+    ];
+
+    // And the metadata the batch would be encoded with if it came through the
+    // IOx write path.
+    let meta = IoxMetadata {
+        object_store_id: Default::default(),
+        creation_timestamp: Time::from_timestamp_nanos(1234),
+        namespace_id: NamespaceId::new(1),
+        namespace_name: "bananas".into(),
+        sequencer_id: SequencerId::new(2),
+        table_id: TableId::new(3),
+        table_name: "platanos".into(),
+        partition_id: PartitionId::new(4),
+        partition_key: "potato".into(),
+        min_sequence_number: SequenceNumber::new(10),
+        max_sequence_number: SequenceNumber::new(11),
+        compaction_level: 1,
+        sort_key: None,
+    };
+
+    // Build a schema that contains the IOx metadata, ensuring it is correctly
+    // populated in the final parquet file's metadata.
+    let schema = SchemaBuilder::new()
+        .influx_field("some_field", InfluxFieldType::String)
+        .timestamp()
+        .build()
+        .expect("could not create schema")
+        .as_arrow();
+
+    let batch = RecordBatch::try_new(schema, data).unwrap();
+    let stream = futures::stream::iter([Ok(batch.clone())]);
+
+    let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
+    let storage = ParquetStorage::new(object_store);
+
+    let (iox_parquet_meta, file_size) = storage
+        .upload(stream, &meta)
+        .await
+        .expect("failed to serialise & persist record batch");
+
+    // Use the IoxParquetMetaData and original IoxMetadata to derive a
+    // ParquetFileParams.
+    let catalog_data = meta.to_parquet_file(file_size, &iox_parquet_meta);
+
+    // And verify the resulting statistics used in the catalog.
+    //
+    // NOTE: thrift-encoded metadata not checked
+    assert_eq!(catalog_data.sequencer_id, meta.sequencer_id);
+    assert_eq!(catalog_data.namespace_id, meta.namespace_id);
+    assert_eq!(catalog_data.table_id, meta.table_id);
+    assert_eq!(catalog_data.partition_id, meta.partition_id);
+    assert_eq!(catalog_data.object_store_id, meta.object_store_id);
+    assert_eq!(catalog_data.min_sequence_number, meta.min_sequence_number);
+    assert_eq!(catalog_data.max_sequence_number, meta.max_sequence_number);
+    assert_eq!(catalog_data.file_size_bytes, file_size as i64);
+    assert_eq!(catalog_data.compaction_level, meta.compaction_level);
+    assert_eq!(catalog_data.created_at, Timestamp::new(1234));
+    assert_eq!(catalog_data.row_count, 3);
+    assert_eq!(catalog_data.min_time, Timestamp::new(1646917692000000000));
+    assert_eq!(catalog_data.max_time, Timestamp::new(1653311292000000000));
+}
+
+fn to_string_array(strs: &[&str]) -> ArrayRef {
+    let mut builder = StringBuilder::new(strs.len());
+    for s in strs {
+        builder.append_value(s).expect("appending string");
+    }
+    Arc::new(builder.finish())
+}
+
+fn to_timestamp_array(timestamps: &[i64]) -> ArrayRef {
+    let mut builder = TimestampNanosecondBuilder::new(timestamps.len());
+    builder
+        .append_slice(timestamps)
+        .expect("failed to append timestamp values");
+    Arc::new(builder.finish())
+}