From d0903b11bb25d1d8f9ff06601c8ef0548e10beaa Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 May 2022 08:56:09 -0400 Subject: [PATCH] refactor: reduce test duplication in `querier/src/table/mod.rs` (#4698) * refactor: reduce test duplication in `querier/src/table/mod.rs` * fix: Apply suggestions from code review Co-authored-by: Jake Goulding * fix: Update querier/src/table/test_util.rs Co-authored-by: Jake Goulding * fix: use now_nanos() * refactor: Add TestQuerierTable * refactor: rename functions for explicitness Co-authored-by: Jake Goulding --- querier/src/table/mod.rs | 448 +++++++++++++-------------------- querier/src/table/test_util.rs | 71 +++--- 2 files changed, 220 insertions(+), 299 deletions(-) diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index d901f061bc..7a87077c1c 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -281,13 +281,12 @@ mod tests { use super::*; use crate::{ ingester::{test_util::MockIngesterConnection, IngesterPartition}, - table::test_util::{lp_to_record_batch, querier_table, IngesterPartitionBuilder}, + table::test_util::{querier_table, IngesterPartitionBuilder}, }; #[tokio::test] async fn test_parquet_chunks() { maybe_start_logging(); - let pred = Predicate::default(); let catalog = TestCatalog::new(); let ns = catalog.create_namespace("ns").await; @@ -314,73 +313,31 @@ mod tests { table1.create_column("foo", ColumnType::I64).await; table2.create_column("foo", ColumnType::I64).await; - let querier_table = querier_table(&catalog, &table1).await; + let querier_table = TestQuerierTable::new(&catalog, &table1).await; // no parquet files yet - assert!(querier_table.chunks(&pred).await.unwrap().is_empty()); + assert!(querier_table.chunks().await.unwrap().is_empty()); let file111 = partition11 - .create_parquet_file_with_min_max( - "table1 foo=1 11", - 1, - 2, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table1 foo=1 11", 1, 2, now_nanos(), now_nanos()) .await; let file112 = partition11 - .create_parquet_file_with_min_max( - "table1 foo=2 22", - 3, - 4, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table1 foo=2 22", 3, 4, now_nanos(), now_nanos()) .await; let file113 = partition11 - .create_parquet_file_with_min_max( - "table1 foo=3 33", - 5, - 6, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table1 foo=3 33", 5, 6, now_nanos(), now_nanos()) .await; let file114 = partition11 - .create_parquet_file_with_min_max( - "table1 foo=4 44", - 7, - 8, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table1 foo=4 44", 7, 8, now_nanos(), now_nanos()) .await; let file115 = partition11 - .create_parquet_file_with_min_max( - "table1 foo=5 55", - 9, - 10, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table1 foo=5 55", 9, 10, now_nanos(), now_nanos()) .await; let file121 = partition12 - .create_parquet_file_with_min_max( - "table1 foo=5 55", - 1, - 2, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table1 foo=5 55", 1, 2, now_nanos(), now_nanos()) .await; let _file211 = partition21 - .create_parquet_file_with_min_max( - "table2 foo=6 66", - 1, - 2, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table2 foo=6 66", 1, 2, now_nanos(), now_nanos()) .await; file111.flag_for_delete().await; @@ -397,14 +354,14 @@ mod tests { tombstone2.mark_processed(&file112).await; // As we have now made new parquet files, force a cache refresh - querier_table.clear_parquet_cache(); - querier_table.clear_tombstone_cache(); + querier_table.inner().clear_parquet_cache(); + querier_table.inner().clear_tombstone_cache(); // now we have some files // this contains all files except for: // - file111: marked for delete // - file221: wrong table - let mut chunks = querier_table.chunks(&pred).await.unwrap(); + let mut chunks = querier_table.chunks().await.unwrap(); chunks.sort_by_key(|c| c.id()); assert_eq!(chunks.len(), 5); @@ -446,14 +403,13 @@ mod tests { #[tokio::test] async fn test_compactor_collision() { maybe_start_logging(); - let pred = Predicate::default(); let catalog = TestCatalog::new(); let ns = catalog.create_namespace("ns").await; let table = ns.create_table("table").await; let sequencer = ns.create_sequencer(1).await; let partition = table.with_sequencer(&sequencer).create_partition("k").await; - table.create_column("foo", ColumnType::I64).await; + let schema = make_schema(&table).await; // create a parquet file that cannot be processed by the querier: // @@ -478,45 +434,24 @@ mod tests { // row-level sequence numbers. partition - .create_parquet_file_with_min_max( - "table foo=1 11", - 1, - 2, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table foo=1 11", 1, 2, now_nanos(), now_nanos()) .await; - let querier_table = querier_table(&catalog, &table).await; + let builder = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition); + let ingester_partition = + builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(1))); - querier_table - .ingester_connection - .as_any() - .downcast_ref::() - .unwrap() - .next_response(Ok(vec![IngesterPartition::try_new( - Arc::from("ingester"), - ChunkId::new(), - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition.partition.id, - sequencer.sequencer.id, - Arc::new(SchemaBuilder::new().build().unwrap()), - Some(SequenceNumber::new(1)), - None, - Arc::new(None), - vec![], - ) - .unwrap()])); + let querier_table = TestQuerierTable::new(&catalog, &table) + .await + .with_ingester_partition(ingester_partition); - let err = querier_table.chunks(&pred).await.unwrap_err(); + let err = querier_table.chunks().await.unwrap_err(); assert_matches!(err, Error::StateFusion { .. }); } #[tokio::test] async fn test_state_reconcile() { maybe_start_logging(); - let pred = Predicate::default(); let catalog = TestCatalog::new(); let ns = catalog.create_namespace("ns").await; @@ -533,46 +468,22 @@ mod tests { // kept because max sequence number <= 2 let file1 = partition1 - .create_parquet_file_with_min_max( - "table foo=1 11", - 1, - 2, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table foo=1 11", 1, 2, now_nanos(), now_nanos()) .await; // pruned because min sequence number > 2 partition1 - .create_parquet_file_with_min_max( - "table foo=2 22", - 3, - 3, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table foo=2 22", 3, 3, now_nanos(), now_nanos()) .await; // kept because max sequence number <= 3 let file2 = partition2 - .create_parquet_file_with_min_max( - "table foo=1 11", - 1, - 3, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table foo=1 11", 1, 3, now_nanos(), now_nanos()) .await; // pruned because min sequence number > 3 partition2 - .create_parquet_file_with_min_max( - "table foo=2 22", - 4, - 4, - now().timestamp_nanos(), - now().timestamp_nanos(), - ) + .create_parquet_file_with_min_max("table foo=2 22", 4, 4, now_nanos(), now_nanos()) .await; // partition1: kept because sequence number <= 10 @@ -596,65 +507,45 @@ mod tests { .create_tombstone(12, 1, 100, "foo=3") .await; - let querier_table = querier_table(&catalog, &table).await; - - let ingester_chunk_id1 = ChunkId::new_test(u128::MAX - 1); - let ingester_chunk_id2 = ChunkId::new_test(u128::MAX); - querier_table - .ingester_connection - .as_any() - .downcast_ref::() - .unwrap() - .next_response(Ok(vec![ - // this chunk is kept - IngesterPartition::try_new( - Arc::from("ingester"), - ingester_chunk_id1, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition1.partition.id, - sequencer.sequencer.id, - Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Integer) - .timestamp() - .build() - .unwrap(), - ), - // parquet max persisted sequence number - Some(SequenceNumber::new(2)), - // tombstone max persisted sequence number - Some(SequenceNumber::new(10)), - Arc::new(None), - vec![lp_to_record_batch("table foo=3i 33")], - ) + let schema = Arc::new( + SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Integer) + .timestamp() + .build() .unwrap(), - // this chunk is filtered out because it has no record batches but the reconciling still takes place - IngesterPartition::try_new( - Arc::from("ingester"), - ingester_chunk_id2, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition2.partition.id, - sequencer.sequencer.id, - Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Integer) - .timestamp() - .build() - .unwrap(), + ); + + let ingester_chunk_id1 = u128::MAX - 1; + + let builder1 = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition1); + let builder2 = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition2); + + let querier_table = TestQuerierTable::new(&catalog, &table) + .await + .with_ingester_partition( + // this chunk is kept + builder1 + .with_ingester_chunk_id(ingester_chunk_id1) + .with_lp(["table foo=3i 33"]) + .build( + // parquet max persisted sequence number + Some(SequenceNumber::new(2)), + // tombstone max persisted sequence number + Some(SequenceNumber::new(10)), ), + ) + .with_ingester_partition( + // this chunk is filtered out because it has no record batches but the reconciling still takes place + builder2.with_ingester_chunk_id(u128::MAX).build( // parquet max persisted sequence number Some(SequenceNumber::new(3)), // tombstone max persisted sequence number Some(SequenceNumber::new(11)), - Arc::new(None), - vec![], - ) - .unwrap(), - ])); + ), + ); + + let mut chunks = querier_table.chunks().await.unwrap(); - let mut chunks = querier_table.chunks(&pred).await.unwrap(); chunks.sort_by_key(|c| c.id()); // three chunks (two parquet files and one for the in-mem ingester data) @@ -669,7 +560,7 @@ mod tests { chunks[1].id(), ChunkId::new_test(file2.parquet_file.id.get() as u128), ); - assert_eq!(chunks[2].id(), ingester_chunk_id1); + assert_eq!(chunks[2].id(), ChunkId::new_test(ingester_chunk_id1)); // check delete predicates // parquet chunks have predicate attached @@ -682,7 +573,6 @@ mod tests { #[tokio::test] async fn test_ingester_overlap_detection() { maybe_start_logging(); - let pred = Predicate::default(); let catalog = TestCatalog::new(); let ns = catalog.create_namespace("ns").await; @@ -697,12 +587,6 @@ mod tests { .create_partition("k2") .await; - let querier_table = querier_table(&catalog, &table).await; - - let ingester_chunk_id1 = ChunkId::new_test(1); - let ingester_chunk_id2 = ChunkId::new_test(2); - let ingester_chunk_id3 = ChunkId::new_test(3); - let schema = Arc::new( SchemaBuilder::new() .influx_field("foo", InfluxFieldType::Integer) @@ -711,63 +595,45 @@ mod tests { .unwrap(), ); - querier_table - .ingester_connection - .as_any() - .downcast_ref::() - .unwrap() - .next_response(Ok(vec![ - IngesterPartition::try_new( - Arc::from("ingester1"), - ingester_chunk_id1, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition1.partition.id, - sequencer.sequencer.id, - Arc::clone(&schema), - // parquet max persisted sequence number - None, - // tombstone max persisted sequence number - None, - Arc::new(None), - vec![lp_to_record_batch("table foo=1i 1")], - ) - .unwrap(), - IngesterPartition::try_new( - Arc::from("ingester1"), - ingester_chunk_id2, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition2.partition.id, - sequencer.sequencer.id, - Arc::clone(&schema), - // parquet max persisted sequence number - None, - // tombstone max persisted sequence number - None, - Arc::new(None), - vec![lp_to_record_batch("table foo=2i 2")], - ) - .unwrap(), - IngesterPartition::try_new( - Arc::from("ingester2"), - ingester_chunk_id3, - Arc::from(ns.namespace.name.clone()), - Arc::from(table.table.name.clone()), - partition1.partition.id, - sequencer.sequencer.id, - Arc::clone(&schema), - // parquet max persisted sequence number - None, - // tombstone max persisted sequence number - None, - Arc::new(None), - vec![lp_to_record_batch("table foo=3i 3")], - ) - .unwrap(), - ])); + let builder1 = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition1); + let builder2 = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition2); + + let querier_table = TestQuerierTable::new(&catalog, &table) + .await + .with_ingester_partition( + builder1 + .clone() + .with_ingester_chunk_id(1) + .with_lp(vec!["table foo=1i 1"]) + .build( + // parquet max persisted sequence number + None, // tombstone max persisted sequence number + None, + ), + ) + .with_ingester_partition( + builder2 + .with_ingester_chunk_id(2) + .with_lp(vec!["table foo=2i 2"]) + .build( + // parquet max persisted sequence number + None, // tombstone max persisted sequence number + None, + ), + ) + .with_ingester_partition( + builder1 + .with_ingester_chunk_id(3) + .with_lp(vec!["table foo=3i 3"]) + .build( + // parquet max persisted sequence number + None, // tombstone max persisted sequence number + None, + ), + ); + + let err = querier_table.chunks().await.unwrap_err(); - let err = querier_table.chunks(&pred).await.unwrap_err(); assert_matches!(err, Error::IngestersOverlap { .. }); } @@ -780,32 +646,34 @@ mod tests { let sequencer = ns.create_sequencer(1).await; let partition = table.with_sequencer(&sequencer).create_partition("k").await; let schema = make_schema(&table).await; - let now = now().timestamp_nanos(); - let querier_table = querier_table(&catalog, &table).await; - let builder = - IngesterPartitionBuilder::new(ns, table, schema, sequencer, Arc::clone(&partition)); + let builder = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition) + .with_lp(["table foo=1i 1"]); // Parquet file between with max sequence number 2 partition - .create_parquet_file_with_min_max("table1 foo=1 11", 1, 2, now, now) + .create_parquet_file_with_min_max("table1 foo=1 11", 1, 2, now_nanos(), now_nanos()) .await; let ingester_partition = builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(2))); + let querier_table = TestQuerierTable::new(&catalog, &table) + .await + .with_ingester_partition(ingester_partition); + // Expect 2 chunks: one for ingester, and one from parquet file - let chunks = chunks_with_ingester_partition(&querier_table, &ingester_partition).await; + let chunks = querier_table.chunks().await.unwrap(); assert_eq!(chunks.len(), 2); // Now, make a second chunk with max sequence number 3 partition - .create_parquet_file_with_min_max("table1 foo=1 22", 2, 3, now, now) + .create_parquet_file_with_min_max("table1 foo=1 22", 2, 3, now_nanos(), now_nanos()) .await; // With the same ingester response, still expect 2 chunks: one // for ingester, and one from parquet file - let chunks = chunks_with_ingester_partition(&querier_table, &ingester_partition).await; + let chunks = querier_table.chunks().await.unwrap(); assert_eq!(chunks.len(), 2); // update the ingester response to return a new max parquet @@ -813,8 +681,12 @@ mod tests { let ingester_partition = builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(3))); + let querier_table = querier_table + .clear_ingester_partitions() + .with_ingester_partition(ingester_partition); + // expect the second file is found, resulting in three chunks - let chunks = chunks_with_ingester_partition(&querier_table, &ingester_partition).await; + let chunks = querier_table.chunks().await.unwrap(); assert_eq!(chunks.len(), 3); } @@ -827,20 +699,15 @@ mod tests { let sequencer = ns.create_sequencer(1).await; let partition = table.with_sequencer(&sequencer).create_partition("k").await; let schema = make_schema(&table).await; - let now = now().timestamp_nanos(); + // Expect 1 chunk with with one delete predicate + let querier_table = TestQuerierTable::new(&catalog, &table).await; - let querier_table = querier_table(&catalog, &table).await; - let builder = IngesterPartitionBuilder::new( - ns, - Arc::clone(&table), - schema, - Arc::clone(&sequencer), - Arc::clone(&partition), - ); + let builder = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition) + .with_lp(["table foo=1i 1"]); // parquet file with max sequence number 1 partition - .create_parquet_file_with_min_max("table1 foo=1 11", 1, 1, now, now) + .create_parquet_file_with_min_max("table1 foo=1 11", 1, 1, now_nanos(), now_nanos()) .await; // tombstone with max sequence number 2 @@ -854,9 +721,9 @@ mod tests { let ingester_partition = builder.build(max_parquet_sequence_number, max_tombstone_sequence_number); - // Expect 1 chunk with with one delete predicate - let deletes = - num_deletes(chunks_with_ingester_partition(&querier_table, &ingester_partition).await); + let querier_table = querier_table.with_ingester_partition(ingester_partition); + + let deletes = num_deletes(querier_table.chunks().await.unwrap()); assert_eq!(&deletes, &[1, 0]); // Now, make a second tombstone with max sequence number 3 @@ -867,18 +734,19 @@ mod tests { // With the same ingester response, still expect 1 delete // (because cache is not cleared) - let deletes = - num_deletes(chunks_with_ingester_partition(&querier_table, &ingester_partition).await); + let deletes = num_deletes(querier_table.chunks().await.unwrap()); assert_eq!(&deletes, &[1, 0]); // update the ingester response to return a new max delete sequence number let max_tombstone_sequence_number = Some(SequenceNumber::new(3)); let ingester_partition = builder.build(max_parquet_sequence_number, max_tombstone_sequence_number); + let querier_table = querier_table + .clear_ingester_partitions() + .with_ingester_partition(ingester_partition); // second tombstone should be found - let deletes = - num_deletes(chunks_with_ingester_partition(&querier_table, &ingester_partition).await); + let deletes = num_deletes(querier_table.chunks().await.unwrap()); assert_eq!(&deletes, &[2, 0]); } @@ -895,20 +763,55 @@ mod tests { ) } - /// Invokes querier_table.chunks modeling the ingester sending `next_response` - async fn chunks_with_ingester_partition( - querier_table: &QuerierTable, - ingester_partition: &IngesterPartition, - ) -> Vec> { - let pred = Predicate::default(); - querier_table - .ingester_connection - .as_any() - .downcast_ref::() - .unwrap() - .next_response(Ok(vec![ingester_partition.clone()])); + /// A `QuerierTable` and some number of `IngesterPartitions` that + /// are fed to the ingester connection on the next call to + /// `chunks()` + struct TestQuerierTable { + // The underling table + querier_table: QuerierTable, + /// Ingester partitions + ingester_partitions: Vec, + } - querier_table.chunks(&pred).await.unwrap() + impl TestQuerierTable { + /// Create a new wrapped [`QuerierTable`] + async fn new(catalog: &Arc, table: &Arc) -> Self { + Self { + querier_table: querier_table(catalog, table).await, + ingester_partitions: vec![], + } + } + + /// Return a reference to the inner table + fn inner(&self) -> &QuerierTable { + &self.querier_table + } + + /// add the `ingester_partition` to the ingester response processed by the table + fn with_ingester_partition(mut self, ingester_partition: IngesterPartition) -> Self { + self.ingester_partitions.push(ingester_partition); + self + } + + /// Clears ingester partitions for next response from ingester + fn clear_ingester_partitions(mut self) -> Self { + self.ingester_partitions.clear(); + self + } + + /// Invokes querier_table.chunks modeling the ingester sending the partitions in this table + async fn chunks(&self) -> Result>> { + let pred = Predicate::default(); + + self.querier_table + .ingester_connection + .as_any() + .downcast_ref::() + .unwrap() + .next_response(Ok(self.ingester_partitions.clone())); + + self.querier_table.chunks(&pred).await + } } /// returns the number of deletes in each chunk @@ -919,4 +822,9 @@ mod tests { .map(|chunk| chunk.delete_predicates().len()) .collect() } + + /// returns the value of now as nanoseconds since the epoch + fn now_nanos() -> i64 { + now().timestamp_nanos() + } } diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs index c8be1db0eb..af19f171b7 100644 --- a/querier/src/table/test_util.rs +++ b/querier/src/table/test_util.rs @@ -68,35 +68,48 @@ pub(crate) struct IngesterPartitionBuilder { partition_sort_key: Arc>, /// Data returned from the partition, in line protocol format - lp: String, + lp: Vec, } impl IngesterPartitionBuilder { pub(crate) fn new( - ns: Arc, - table: Arc, - schema: Arc, - sequencer: Arc, - partition: Arc, + ns: &Arc, + table: &Arc, + schema: &Arc, + sequencer: &Arc, + partition: &Arc, ) -> Self { Self { - ns, - table, - schema, - sequencer, - partition, + ns: Arc::clone(ns), + table: Arc::clone(table), + schema: Arc::clone(schema), + sequencer: Arc::clone(sequencer), + partition: Arc::clone(partition), ingester_name: Arc::from("ingester1"), partition_sort_key: Arc::new(None), ingester_chunk_id: 1, - lp: "table foo=1i 1".to_string(), + lp: Vec::new(), } } + /// set the partition chunk id to use when creating partitons + pub(crate) fn with_ingester_chunk_id(mut self, ingester_chunk_id: u128) -> Self { + self.ingester_chunk_id = ingester_chunk_id; + self + } + + /// Set the line protocol that will be present in this partition + /// with an interator of `AsRef`s + pub(crate) fn with_lp(mut self, lp: impl IntoIterator>) -> Self { + self.lp = lp.into_iter().map(|s| s.as_ref().to_string()).collect(); + self + } + /// Create a ingester partition with the specified max parquet sequence number pub(crate) fn build_with_max_parquet_sequence_number( &self, parquet_max_sequence_number: Option, - ) -> Arc { + ) -> IngesterPartition { let tombstone_max_sequence_number = None; self.build(parquet_max_sequence_number, tombstone_max_sequence_number) @@ -107,22 +120,22 @@ impl IngesterPartitionBuilder { &self, parquet_max_sequence_number: Option, tombstone_max_sequence_number: Option, - ) -> Arc { - Arc::new( - IngesterPartition::try_new( - Arc::clone(&self.ingester_name), - ChunkId::new_test(self.ingester_chunk_id), - Arc::from(self.ns.namespace.name.as_str()), - Arc::from(self.table.table.name.as_str()), - self.partition.partition.id, - self.sequencer.sequencer.id, - Arc::clone(&self.schema), - parquet_max_sequence_number, - tombstone_max_sequence_number, - Arc::clone(&self.partition_sort_key), - vec![lp_to_record_batch(&self.lp)], - ) - .unwrap(), + ) -> IngesterPartition { + let data = self.lp.iter().map(|lp| lp_to_record_batch(lp)).collect(); + + IngesterPartition::try_new( + Arc::clone(&self.ingester_name), + ChunkId::new_test(self.ingester_chunk_id), + Arc::from(self.ns.namespace.name.as_str()), + Arc::from(self.table.table.name.as_str()), + self.partition.partition.id, + self.sequencer.sequencer.id, + Arc::clone(&self.schema), + parquet_max_sequence_number, + tombstone_max_sequence_number, + Arc::clone(&self.partition_sort_key), + data, ) + .unwrap() } }