Merge pull request #2528 from influxdata/crepererum/compress_parquet_metadata

feat: compress encoded parquet metadata
kodiakhq[bot] authored 2021-09-20 07:51:24 +00:00, committed by GitHub
commit fa5c884aab
12 changed files with 155 additions and 135 deletions

Cargo.lock (generated)

@ -2814,6 +2814,7 @@ dependencies = [
"datafusion_util",
"futures",
"generated_types",
"google_types",
"internal_types",
"iox_object_store",
"metric",
@ -2833,6 +2834,7 @@ dependencies = [
"tokio-stream",
"tracker",
"uuid",
"zstd",
]
[[package]]


@ -65,7 +65,11 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
.compile_well_known_types()
.disable_comments(&[".google"])
.extern_path(".google.protobuf", "::google_types::protobuf")
.bytes(&[".influxdata.iox.catalog.v1.AddParquet.metadata"]);
.bytes(&[".influxdata.iox.catalog.v1.AddParquet.metadata"])
.btree_map(&[
".influxdata.iox.catalog.v1.DatabaseCheckpoint.sequencer_numbers",
".influxdata.iox.catalog.v1.PartitionCheckpoint.sequencer_numbers",
]);
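// Editor's sketch, not part of this diff: prost's btree_map option makes the
// generated map fields use BTreeMap instead of HashMap, so key order (and
// therefore the serialized byte stream) is deterministic across runs. The u64
// value type below is a simplified stand-in for OptionalMinMaxSequence.
fn deterministic_key_order() {
    use std::collections::BTreeMap;

    let mut sequencer_numbers: BTreeMap<u32, u64> = BTreeMap::new();
    sequencer_numbers.insert(2, 20);
    sequencer_numbers.insert(1, 10);

    // BTreeMap always iterates in ascending key order, regardless of insertion order
    let keys: Vec<u32> = sequencer_numbers.keys().copied().collect();
    assert_eq!(keys, vec![1, 2]);
}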
let descriptor_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
tonic_build::configure()


@ -30,11 +30,12 @@ message AddParquet {
// [Apache Parquet] metadata encoded using [Apache Thrift].
//
// The metadata is encoded using the [Thrift Compact Protocol].
// The metadata is encoded using the [Thrift Compact Protocol] and compressed using [Zstandard].
//
// [Apache Parquet]: https://parquet.apache.org/
// [Apache Thrift]: https://thrift.apache.org/
// [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
// [Zstandard]: http://facebook.github.io/zstd/
bytes metadata = 2;
}
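A minimal sketch (not part of this change) of how a reader of the catalog could recover the Parquet file metadata from the metadata field, mirroring the decode path added later in this diff; error handling is collapsed into a boxed error for brevity:

use thrift::protocol::TCompactInputProtocol;

fn read_add_parquet_metadata(
    encoded: &[u8],
) -> Result<parquet_format::FileMetaData, Box<dyn std::error::Error>> {
    // step 1: undo the Zstandard compression introduced by this change
    let thrift_bytes = zstd::decode_all(encoded)?;

    // step 2: read the Thrift Compact Protocol payload
    let mut protocol = TCompactInputProtocol::new(&thrift_bytes[..]);
    let file_metadata = parquet_format::FileMetaData::read_from_in_protocol(&mut protocol)?;
    Ok(file_metadata)
}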


@ -1,13 +1,7 @@
syntax = "proto3";
package influxdata.iox.catalog.v1;
// Timestamp similar to the Google version but w/ fixed-sized integers.
//
// This is helpful for deterministic outputs.
message FixedSizeTimestamp {
sfixed64 seconds = 1;
sfixed32 nanos = 2;
}
import "google/protobuf/timestamp.proto";
// IOx-specific metadata that will be serialized into the file-level key-value Parquet metadata under a single key.
message IoxMetadata {
@ -15,7 +9,7 @@ message IoxMetadata {
uint32 version = 1;
// Timestamp when this file was created.
FixedSizeTimestamp creation_timestamp = 2;
google.protobuf.Timestamp creation_timestamp = 2;
// Table that holds this parquet file.
string table_name = 3;
@ -33,10 +27,10 @@ message IoxMetadata {
DatabaseCheckpoint database_checkpoint = 7;
// Wallclock timestamp of when the first data in this file was received by IOx.
FixedSizeTimestamp time_of_first_write = 8;
google.protobuf.Timestamp time_of_first_write = 8;
// Wallclock timestamp of when the last data in this file was received by IOx.
FixedSizeTimestamp time_of_last_write = 9;
google.protobuf.Timestamp time_of_last_write = 9;
// Order of this chunk relative to other overlapping chunks.
uint32 chunk_order = 10;
@ -52,7 +46,7 @@ message PartitionCheckpoint {
map<uint32, OptionalMinMaxSequence> sequencer_numbers = 1;
// Minimum unpersisted timestamp.
FixedSizeTimestamp min_unpersisted_timestamp = 2;
google.protobuf.Timestamp min_unpersisted_timestamp = 2;
}
// Record of the playback state for the whole database.


@ -14,6 +14,7 @@ datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3.7"
generated_types = { path = "../generated_types" }
google_types = { path = "../google_types" }
internal_types = { path = "../internal_types" }
iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" }
@ -32,6 +33,7 @@ tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"
tokio-stream = "0.1"
tracker = { path = "../tracker" }
uuid = { version = "0.8", features = ["serde", "v4"] }
zstd = "0.9"
[dev-dependencies]
arrow_util = { path = "../arrow_util" }


@ -37,7 +37,7 @@ pub use crate::catalog::internals::proto_parse::Error as ProtoParseError;
/// Current version for serialized transactions.
///
/// For breaking changes, this will change.
pub const TRANSACTION_VERSION: u32 = 13;
pub const TRANSACTION_VERSION: u32 = 14;
#[derive(Debug, Snafu)]
pub enum Error {


@ -272,7 +272,7 @@ File {
is_checkpoint: false,
proto: Ok(
Transaction {
version: 13,
version: 14,
actions: [],
revision_counter: 0,
uuid: "00000000-0000-0000-0000-000000000000",
@ -297,7 +297,7 @@ File {
is_checkpoint: false,
proto: Ok(
Transaction {
version: 13,
version: 14,
actions: [
Action {
action: Some(
@ -313,7 +313,7 @@ File {
},
),
file_size_bytes: 33,
metadata: b"metadata omitted (1742 bytes)",
metadata: b"metadata omitted (930 bytes)",
},
),
),
@ -396,7 +396,7 @@ File {
is_checkpoint: false,
proto: Ok(
Transaction {
version: 13,
version: 14,
actions: [],
revision_counter: 0,
uuid: "00000000-0000-0000-0000-000000000000",
@ -421,7 +421,7 @@ File {
is_checkpoint: false,
proto: Ok(
Transaction {
version: 13,
version: 14,
actions: [
Action {
action: Some(
@ -437,7 +437,7 @@ File {
},
),
file_size_bytes: 33,
metadata: b"metadata omitted (1742 bytes)",
metadata: b"metadata omitted (930 bytes)",
},
),
),


@ -86,7 +86,7 @@
//! [Apache Parquet]: https://parquet.apache.org/
//! [Apache Thrift]: https://thrift.apache.org/
//! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
use chrono::{DateTime, NaiveDateTime, Utc};
use chrono::{DateTime, Utc};
use data_types::{
chunk_metadata::ChunkOrder,
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
@ -121,7 +121,7 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputPro
///
/// **Important: When changing this structure, consider bumping the
/// [catalog transaction version](crate::catalog::core::TRANSACTION_VERSION)!**
pub const METADATA_VERSION: u32 = 6;
pub const METADATA_VERSION: u32 = 7;
/// File-level metadata key to store the IOx-specific data.
///
@ -231,6 +231,12 @@ pub enum Error {
expected
))]
IoxMetadataVersionMismatch { actual: u32, expected: Vec<u32> },
#[snafu(display("Cannot encode ZSTD message for parquet metadata: {}", source))]
ZstdEncodeFailure { source: std::io::Error },
#[snafu(display("Cannot decode ZSTD message for parquet metadata: {}", source))]
ZstdDecodeFailure { source: std::io::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -391,9 +397,9 @@ impl IoxMetadata {
)
})
.collect(),
min_unpersisted_timestamp: Some(encode_timestamp(
self.partition_checkpoint.min_unpersisted_timestamp(),
)),
min_unpersisted_timestamp: Some(
self.partition_checkpoint.min_unpersisted_timestamp().into(),
),
};
let proto_database_checkpoint = proto::DatabaseCheckpoint {
@ -416,9 +422,9 @@ impl IoxMetadata {
let proto_msg = proto::IoxMetadata {
version: METADATA_VERSION,
creation_timestamp: Some(encode_timestamp(self.creation_timestamp)),
time_of_first_write: Some(encode_timestamp(self.time_of_first_write)),
time_of_last_write: Some(encode_timestamp(self.time_of_last_write)),
creation_timestamp: Some(self.creation_timestamp.into()),
time_of_first_write: Some(self.time_of_first_write.into()),
time_of_last_write: Some(self.time_of_last_write.into()),
table_name: self.table_name.to_string(),
partition_key: self.partition_key.to_string(),
chunk_id: self.chunk_id,
@ -434,35 +440,21 @@ impl IoxMetadata {
}
}
fn encode_timestamp(ts: DateTime<Utc>) -> proto::FixedSizeTimestamp {
proto::FixedSizeTimestamp {
seconds: ts.timestamp(),
nanos: ts.timestamp_subsec_nanos() as i32,
}
}
fn decode_timestamp(ts: proto::FixedSizeTimestamp) -> Result<DateTime<Utc>> {
let dt = NaiveDateTime::from_timestamp(
ts.seconds,
ts.nanos
.try_into()
.map_err(|e| Box::new(e) as _)
.context(IoxMetadataBroken)?,
);
Ok(chrono::DateTime::<Utc>::from_utc(dt, Utc))
}
fn decode_timestamp_from_field(
value: Option<proto::FixedSizeTimestamp>,
value: Option<google_types::protobuf::Timestamp>,
field: &'static str,
) -> Result<DateTime<Utc>> {
decode_timestamp(value.context(IoxMetadataFieldMissing { field })?)
value
.context(IoxMetadataFieldMissing { field })?
.try_into()
.map_err(|e| Box::new(e) as _)
.context(IoxMetadataBroken)
}
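A minimal round-trip sketch (not part of this diff) of the timestamp handling above, assuming only the From / TryFrom conversions between chrono::DateTime<Utc> and google_types::protobuf::Timestamp that the encode and decode paths use via .into() and .try_into():

use std::convert::TryInto;
use chrono::{TimeZone, Utc};

fn timestamp_round_trip() {
    let created = Utc.timestamp(11, 22);
    // chrono -> protobuf (assumed From impl, as used by to_protobuf above)
    let proto_ts: google_types::protobuf::Timestamp = created.into();
    // protobuf -> chrono (assumed TryFrom impl, as used by decode_timestamp_from_field)
    let decoded: chrono::DateTime<Utc> = proto_ts
        .try_into()
        .expect("timestamp representable as chrono::DateTime<Utc>");
    assert_eq!(created, decoded);
}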
/// Parquet metadata with IOx-specific wrapper.
#[derive(Debug, Clone)]
pub struct IoxParquetMetaData {
/// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded bytes.
/// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded, and [Zstandard]-compressed bytes.
///
/// This can be used to store metadata separate from the related payload data. The usage of [Apache Thrift] allows the
/// same stability guarantees as the usage of an ordinary [Apache Parquet] file. To encode a thrift message into bytes
@ -471,6 +463,7 @@ pub struct IoxParquetMetaData {
/// [Apache Parquet]: https://parquet.apache.org/
/// [Apache Thrift]: https://thrift.apache.org/
/// [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
/// [Zstandard]: http://facebook.github.io/zstd/
data: Vec<u8>,
}
@ -490,7 +483,7 @@ impl IoxParquetMetaData {
Self { data }
}
/// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded bytes.
/// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded, and [Zstandard]-compressed bytes.
///
/// This can be used to store metadata separate from the related payload data. The usage of [Apache Thrift] allows the
/// same stability guarantees as the usage of an ordinary [Apache Parquet] file. To encode a thrift message into bytes
@ -499,11 +492,12 @@ impl IoxParquetMetaData {
/// [Apache Parquet]: https://parquet.apache.org/
/// [Apache Thrift]: https://thrift.apache.org/
/// [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
/// [Zstandard]: http://facebook.github.io/zstd/
pub fn thrift_bytes(&self) -> &[u8] {
self.data.as_ref()
}
/// Encode [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded bytes.
/// Encode [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded, and [Zstandard]-compressed bytes.
///
/// This can be used to store metadata separate from the related payload data. The usage of [Apache Thrift] allows the
/// same stability guarantees as the usage of an ordinary [Apache Parquet] file. To encode a thrift message into bytes
@ -512,6 +506,7 @@ impl IoxParquetMetaData {
/// [Apache Parquet]: https://parquet.apache.org/
/// [Apache Thrift]: https://thrift.apache.org/
/// [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
/// [Zstandard]: http://facebook.github.io/zstd/
fn parquet_md_to_thrift(parquet_md: ParquetMetaData) -> Result<Vec<u8>> {
// step 1: assemble a thrift-compatible struct
use parquet::schema::types::to_thrift as schema_to_thrift;
@ -548,22 +543,30 @@ impl IoxParquetMetaData {
protocol.flush().context(ThriftWriteFailure {})?;
}
// step 3: compress data
// Note: level 0 is the zstd-provided default
let buffer = zstd::encode_all(&buffer[..], 0).context(ZstdEncodeFailure)?;
Ok(buffer)
}
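For reference, a minimal sketch (not part of this diff) of the new compression step in isolation: encode_all and decode_all are inverses, and level 0 selects zstd's built-in default compression level:

fn zstd_round_trip(thrift_bytes: &[u8]) -> std::io::Result<()> {
    // level 0 is the zstd-provided default, matching the encode step above
    let compressed = zstd::encode_all(thrift_bytes, 0)?;
    let restored = zstd::decode_all(&compressed[..])?;
    assert_eq!(restored, thrift_bytes);
    Ok(())
}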
/// Decode [Apache Parquet] metadata from [Apache Thrift]-encoded bytes.
/// Decode [Apache Parquet] metadata from [Apache Thrift]-encoded, and [Zstandard]-compressed bytes.
///
/// [Apache Parquet]: https://parquet.apache.org/
/// [Apache Thrift]: https://thrift.apache.org/
/// [Zstandard]: http://facebook.github.io/zstd/
pub fn decode(&self) -> Result<DecodedIoxParquetMetaData> {
// step 1: load thrift data from byte stream
// step 1: decompress
let data = zstd::decode_all(&self.data[..]).context(ZstdDecodeFailure)?;
// step 2: load thrift data from byte stream
let thrift_file_metadata = {
let mut protocol = TCompactInputProtocol::new(&self.data[..]);
let mut protocol = TCompactInputProtocol::new(&data[..]);
parquet_format::FileMetaData::read_from_in_protocol(&mut protocol)
.context(ThriftReadFailure {})?
};
// step 2: convert thrift to in-mem structs
// step 3: convert thrift to in-mem structs
use parquet::schema::types::from_thrift as schema_from_thrift;
let schema =
@ -853,7 +856,6 @@ mod tests {
use super::*;
use internal_types::schema::TIME_COLUMN_NAME;
use persistence_windows::checkpoint::PersistCheckpointBuilder;
use crate::test_utils::{
chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store, make_chunk,
@ -1095,42 +1097,6 @@ mod tests {
);
}
#[test]
fn test_iox_metadata_to_protobuf_deterministic_size() {
// checks that different timestamps do NOT alter the size of the serialized metadata
let table_name = Arc::from("table1");
let partition_key = Arc::from("part1");
// try multiple time to provoke an error
for _ in 0..100 {
// build checkpoints
let min_unpersisted_timestamp = Utc::now();
let partition_checkpoint = PartitionCheckpoint::new(
Arc::clone(&table_name),
Arc::clone(&partition_key),
Default::default(),
min_unpersisted_timestamp,
);
let builder = PersistCheckpointBuilder::new(partition_checkpoint);
let (partition_checkpoint, database_checkpoint) = builder.build();
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: Arc::clone(&table_name),
partition_key: Arc::clone(&partition_key),
chunk_id: 1337,
partition_checkpoint,
database_checkpoint,
time_of_first_write: Utc::now(),
time_of_last_write: Utc::now(),
chunk_order: ChunkOrder::new(5),
};
let proto_bytes = metadata.to_protobuf().unwrap();
assert_eq!(proto_bytes.len(), 90);
}
}
#[tokio::test]
async fn test_parquet_metadata_size() {
// setup: preserve chunk to object store
@ -1146,6 +1112,6 @@ mod tests {
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
assert_eq!(parquet_metadata.size(), 11939);
assert_eq!(parquet_metadata.size(), 3716);
}
}


@ -612,6 +612,21 @@ impl Db {
partition_key: &str,
now: Instant,
) -> Result<Arc<DbChunk>> {
self.persist_partition_with_timestamp(table_name, partition_key, now, Utc::now)
.await
}
/// Internal use only for testing.
async fn persist_partition_with_timestamp<F>(
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
now: Instant,
f_parquet_creation_timestamp: F,
) -> Result<Arc<DbChunk>>
where
F: Fn() -> DateTime<Utc> + Send,
{
// Use explicit scope to ensure the async generator doesn't
// assume the locks have to possibly live across the `await`
let fut = {
@ -664,8 +679,13 @@ impl Db {
}
);
let (_, fut) = lifecycle::persist_chunks(partition, chunks, flush_handle)
.context(LifecycleError)?;
let (_, fut) = lifecycle::persist_chunks(
partition,
chunks,
flush_handle,
f_parquet_creation_timestamp,
)
.context(LifecycleError)?;
fut
};
@ -1577,7 +1597,8 @@ mod tests {
let db = Arc::clone(&test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10").await;
let t1_write = Utc.timestamp(11, 22);
write_lp_with_time(db.as_ref(), "cpu bar=1 10", t1_write).await;
let registry = test_db.metric_registry.as_ref();
@ -1593,10 +1614,14 @@ mod tests {
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 700);
// write into same chunk again.
write_lp(db.as_ref(), "cpu bar=2 20").await;
write_lp(db.as_ref(), "cpu bar=3 30").await;
write_lp(db.as_ref(), "cpu bar=4 40").await;
write_lp(db.as_ref(), "cpu bar=5 50").await;
let t2_write = t1_write + chrono::Duration::seconds(1);
write_lp_with_time(db.as_ref(), "cpu bar=2 20", t2_write).await;
let t3_write = t2_write + chrono::Duration::seconds(1);
write_lp_with_time(db.as_ref(), "cpu bar=3 30", t3_write).await;
let t4_write = t3_write + chrono::Duration::seconds(1);
write_lp_with_time(db.as_ref(), "cpu bar=4 40", t4_write).await;
let t5_write = t4_write + chrono::Duration::seconds(1);
write_lp_with_time(db.as_ref(), "cpu bar=5 50", t5_write).await;
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 764);
@ -1638,16 +1663,18 @@ mod tests {
let expected_read_buffer_size = 1706;
catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size);
db.persist_partition(
let t6_write = t5_write + chrono::Duration::seconds(1);
db.persist_partition_with_timestamp(
"cpu",
"1970-01-01T00",
Instant::now() + Duration::from_secs(1),
|| t6_write,
)
.await
.unwrap();
// A chunk is now in the object store and still in read buffer
let expected_parquet_size = 1551;
let expected_parquet_size = 1243;
catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size);
// now also in OS
catalog_chunk_size_bytes_metric_eq(registry, "object_store", expected_parquet_size);
@ -2041,8 +2068,10 @@ mod tests {
let db = test_db.db;
// Write some line protocols in Mutable buffer of the DB
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
let t1_write = Utc.timestamp(11, 22);
write_lp_with_time(db.as_ref(), "cpu bar=1 10", t1_write).await;
let t2_write = t1_write + chrono::Duration::seconds(1);
write_lp_with_time(db.as_ref(), "cpu bar=2 20", t2_write).await;
//Now mark the MB chunk close
let partition_key = "1970-01-01T00";
@ -2057,11 +2086,13 @@ mod tests {
.await
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
let t3_persist = t2_write + chrono::Duration::seconds(1);
let pq_chunk = db
.persist_partition(
.persist_partition_with_timestamp(
"cpu",
partition_key,
Instant::now() + Duration::from_secs(1),
|| t3_persist,
)
.await
.unwrap();
@ -2071,7 +2102,7 @@ mod tests {
// Read buffer + Parquet chunk size
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700);
catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1551);
catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1242);
// while MB and RB chunk are identical, the PQ chunk is a new one (split off)
assert_eq!(mb_chunk.id(), rb_chunk.id());
@ -2137,8 +2168,10 @@ mod tests {
let db = test_db.db;
// Write some line protocols in Mutable buffer of the DB
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
let t1_write = Utc.timestamp(11, 22);
write_lp_with_time(db.as_ref(), "cpu bar=1 10", t1_write).await;
let t2_write = t1_write + chrono::Duration::seconds(1);
write_lp_with_time(db.as_ref(), "cpu bar=2 20", t2_write).await;
// Now mark the MB chunk close
let partition_key = "1970-01-01T00";
@ -2153,11 +2186,13 @@ mod tests {
.await
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
let t3_persist = t2_write + chrono::Duration::seconds(1);
let pq_chunk = db
.persist_partition(
.persist_partition_with_timestamp(
"cpu",
partition_key,
Instant::now() + Duration::from_secs(1),
|| t3_persist,
)
.await
.unwrap();
@ -2180,7 +2215,7 @@ mod tests {
// Read buffer + Parquet chunk size
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700);
catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1551);
catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1242);
// Unload RB chunk but keep it in OS
let pq_chunk = db
@ -2202,7 +2237,7 @@ mod tests {
// Parquet chunk size only
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 0);
catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1551);
catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1242);
// Verify data written to the parquet file in object store
//
@ -2542,16 +2577,16 @@ mod tests {
// get three chunks: one open, one closed in mb and one close in rb
// In open chunk, will end up in rb/os
let t_write1 = Utc::now();
write_lp_with_time(&db, "cpu bar=1 1", t_write1).await;
let t1_write = Utc.timestamp(11, 22);
write_lp_with_time(&db, "cpu bar=1 1", t1_write).await;
// Move open chunk to closed
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
// New open chunk in mb
// This point will end up in rb/os
let t_write2 = Utc::now();
write_lp_with_time(&db, "cpu bar=1,baz=2 2", t_write2).await;
let t2_write = t1_write + chrono::Duration::seconds(1);
write_lp_with_time(&db, "cpu bar=1,baz=2 2", t2_write).await;
// Check first/last write times on the chunks at this point
let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
@ -2560,15 +2595,15 @@ mod tests {
// Each chunk has one write, so both chunks should have first write == last write
let closed_mb_t3 = chunk_summaries[0].clone();
assert_eq!(closed_mb_t3.storage, ChunkStorage::ClosedMutableBuffer);
assert_first_last_times_eq(&closed_mb_t3, t_write1);
assert_first_last_times_eq(&closed_mb_t3, t1_write);
let open_mb_t3 = chunk_summaries[1].clone();
assert_eq!(open_mb_t3.storage, ChunkStorage::OpenMutableBuffer);
assert_first_last_times_eq(&open_mb_t3, t_write2);
assert_first_last_times_eq(&open_mb_t3, t2_write);
assert_chunks_times_ordered(&closed_mb_t3, &open_mb_t3);
// This point makes a new open mb chunk and will end up in the closed mb chunk
let t_write3 = Utc::now();
write_lp_with_time(&db, "cpu bar=1,baz=2,frob=3 400000000000000", t_write3).await;
let t3_write = t2_write + chrono::Duration::seconds(1);
write_lp_with_time(&db, "cpu bar=1,baz=2,frob=3 400000000000000", t3_write).await;
// Check first/last write times on the chunks at this point
let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
@ -2610,10 +2645,12 @@ mod tests {
assert_chunks_times_eq(&other_open_mb_t5, &other_open_mb_t4);
// Persist rb to parquet os
db.persist_partition(
let t4_persist = t3_write + chrono::Duration::seconds(1);
db.persist_partition_with_timestamp(
"cpu",
"1970-01-01T00",
Instant::now() + Duration::from_secs(1),
|| t4_persist,
)
.await
.unwrap();
@ -2654,8 +2691,8 @@ mod tests {
// New open chunk in mb
// This point will stay in this open mb chunk
let t_write4 = Utc::now();
write_lp_with_time(&db, "cpu bar=1,baz=3,blargh=3 400000000000000", t_write4).await;
let t5_write = t4_persist + chrono::Duration::seconds(1);
write_lp_with_time(&db, "cpu bar=1,baz=3,blargh=3 400000000000000", t5_write).await;
// Check first/last write times on the chunks at this point
let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
@ -2673,7 +2710,7 @@ mod tests {
// times should be the same
let open_mb_t8 = chunk_summaries[2].clone();
assert_eq!(open_mb_t8.storage, ChunkStorage::OpenMutableBuffer);
assert_first_last_times_eq(&open_mb_t8, t_write4);
assert_first_last_times_eq(&open_mb_t8, t5_write);
let lifecycle_action = None;
@ -2684,8 +2721,8 @@ mod tests {
id: 2,
storage: ChunkStorage::ReadBufferAndObjectStore,
lifecycle_action,
memory_bytes: 4557, // size of RB and OS chunks
object_store_bytes: 1577, // size of parquet file
memory_bytes: 4085, // size of RB and OS chunks
object_store_bytes: 1533, // size of parquet file
row_count: 2,
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
@ -2739,7 +2776,7 @@ mod tests {
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 1303);
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2550);
assert_eq!(db.catalog.metrics().memory().object_store(), 2007);
assert_eq!(db.catalog.metrics().memory().object_store(), 1535);
}
#[tokio::test]


@ -225,7 +225,7 @@ impl LockablePartition for LockableCatalogPartition {
handle: Self::PersistHandle,
) -> Result<TaskTracker<Job>, Self::Error> {
info!(table=%partition.table_name(), partition=%partition.partition_key(), "persisting chunks");
let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0)?;
let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0, Utc::now)?;
let _ = tokio::spawn(async move { fut.await.log_if_error("persisting chunks") });
Ok(tracker)
}


@ -19,14 +19,18 @@ use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
/// Split and then persist the provided chunks
///
/// TODO: Replace low-level locks with transaction object
pub fn persist_chunks(
pub fn persist_chunks<F>(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>>,
flush_handle: FlushHandle,
f_parquet_creation_timestamp: F,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> {
)>
where
F: Fn() -> DateTime<Utc> + Send,
{
assert!(
!chunks.is_empty(),
"must provide at least 1 chunk to persist"
@ -164,7 +168,13 @@ pub fn persist_chunks(
};
let to_persist = to_persist.write();
write_chunk_to_object_store(partition_write, to_persist, flush_handle)?.1
write_chunk_to_object_store(
partition_write,
to_persist,
flush_handle,
f_parquet_creation_timestamp,
)?
.1
};
// Wait for write operation to complete
@ -239,7 +249,7 @@ mod tests {
assert_eq!(handle.timestamp(), Utc.timestamp_nanos(10));
let chunks: Vec<_> = chunks.map(|x| x.upgrade()).collect();
persist_chunks(partition, chunks, handle)
persist_chunks(partition, chunks, handle, Utc::now)
.unwrap()
.1
.await


@ -12,7 +12,7 @@ use crate::db::{
use ::lifecycle::LifecycleWriteGuard;
use chrono::Utc;
use chrono::{DateTime, Utc};
use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job};
use internal_types::selection::Selection;
use observability_deps::tracing::{debug, warn};
@ -40,14 +40,18 @@ use super::{
///
/// Returns a future registered with the tracker registry, and the corresponding tracker
/// The caller can either spawn this future to tokio, or block directly on it
pub(super) fn write_chunk_to_object_store(
pub(super) fn write_chunk_to_object_store<F>(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
flush_handle: FlushHandle,
f_parquet_creation_timestamp: F,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> {
)>
where
F: Fn() -> DateTime<Utc> + Send,
{
let db = Arc::clone(&chunk.data().db);
let addr = chunk.addr().clone();
let table_name = Arc::clone(&addr.table_name);
@ -119,7 +123,7 @@ pub(super) fn write_chunk_to_object_store(
// IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
// between creation and the transaction commit.
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
creation_timestamp: f_parquet_creation_timestamp(),
table_name: Arc::clone(&table_name),
partition_key: Arc::clone(&partition_key),
chunk_id: addr.chunk_id,