feat: function to get partition candidates from partition table (#6519)

* feat: function to get partition candidates from partition table

* chore: cleanup

* fix: make new_file_at the same value as created_at

* chore: cleanup

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Nga Tran 2023-01-06 11:20:45 -05:00 committed by GitHub
parent c87e1c8fe2
commit b856edf826
9 changed files with 297 additions and 3 deletions


@ -904,6 +904,9 @@ pub struct Partition {
///
/// If [`None`] no data has been persisted for this partition.
pub persisted_sequence_number: Option<SequenceNumber>,
/// The time at which the newest file of the partition was created
pub new_file_at: Option<Timestamp>,
}
impl Partition {
@ -922,6 +925,7 @@ impl Partition {
pub struct PartitionParam {
/// the partition
pub partition_id: PartitionId,
// Remove this shard_id: https://github.com/influxdata/influxdb_iox/issues/6518
/// the partition's shard
pub shard_id: ShardId,
/// the partition's namespace

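As a minimal sketch (not part of this diff) of what the new field enables, a caller can treat a partition as a compaction candidate when its newest file postdates some cutoff; `has_recent_file` is a hypothetical helper, and the comparison mirrors the MemTxn filter added later in this commit:

// Hypothetical helper, assuming `Partition` and `Timestamp` from data_types.
// `new_file_at == None` means no file has ever been created for the partition.
fn has_recent_file(partition: &Partition, cutoff: Timestamp) -> bool {
    partition.new_file_at.map_or(false, |created| created > cutoff)
}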

@ -1269,6 +1269,7 @@ mod tests {
persisted_sequence_number: None,
partition_key: PartitionKey::from("2022-06-21"),
sort_key: Vec::new(),
new_file_at: None,
};
let sort_key = get_sort_key(&partition, &m).1.unwrap();
let sort_key = sort_key.to_columns().collect::<Vec<_>>();
@ -1317,6 +1318,7 @@ mod tests {
partition_key: PartitionKey::from("2022-06-21"),
// N.B. sort key is already what it will be computed to; here we're testing the `adjust_sort_key_columns` code path
sort_key: vec!["host".to_string(), "arch".to_string(), "time".to_string()],
new_file_at: None,
};
// ensure sort key is unchanged
let _maybe_updated_sk = get_sort_key(&partition, &m).1;
@ -1364,6 +1366,7 @@ mod tests {
partition_key: PartitionKey::from("2022-06-21"),
// N.B. is missing host so will need updating
sort_key: vec!["arch".to_string(), "time".to_string()],
new_file_at: None,
};
let sort_key = get_sort_key(&partition, &m).1.unwrap();
let sort_key = sort_key.to_columns().collect::<Vec<_>>();
@ -1413,6 +1416,7 @@ mod tests {
partition_key: PartitionKey::from("2022-06-21"),
// N.B. is missing arch so will need updating
sort_key: vec!["host".to_string(), "time".to_string()],
new_file_at: None,
};
let sort_key = get_sort_key(&partition, &m).1.unwrap();
let sort_key = sort_key.to_columns().collect::<Vec<_>>();


@ -315,6 +315,7 @@ mod tests {
partition_key: stored_partition_key.clone(),
sort_key: vec!["dos".to_string(), "bananas".to_string()],
persisted_sequence_number: Default::default(),
new_file_at: Default::default(),
};
let cache = new_cache(inner, [partition]);
@ -374,6 +375,7 @@ mod tests {
partition_key: PARTITION_KEY.into(),
sort_key: Default::default(),
persisted_sequence_number: Default::default(),
new_file_at: Default::default(),
};
let cache = new_cache(inner, [partition]);
@ -418,6 +420,7 @@ mod tests {
partition_key: PARTITION_KEY.into(),
sort_key: Default::default(),
persisted_sequence_number: Default::default(),
new_file_at: Default::default(),
};
let cache = new_cache(inner, [partition]);
@ -462,6 +465,7 @@ mod tests {
partition_key: PARTITION_KEY.into(),
sort_key: Default::default(),
persisted_sequence_number: Default::default(),
new_file_at: Default::default(),
};
let cache = new_cache(inner, [partition]);


@ -301,6 +301,7 @@ mod tests {
partition_key: stored_partition_key.clone(),
sort_key: vec!["dos".to_string(), "bananas".to_string()],
persisted_sequence_number: Default::default(),
new_file_at: Default::default(),
};
let cache = new_cache(inner, [partition]);
@ -365,6 +366,7 @@ mod tests {
partition_key: PARTITION_KEY.into(),
sort_key: Default::default(),
persisted_sequence_number: Default::default(),
new_file_at: Default::default(),
};
let cache = new_cache(inner, [partition]);
@ -413,6 +415,7 @@ mod tests {
partition_key: PARTITION_KEY.into(),
sort_key: Default::default(),
persisted_sequence_number: Default::default(),
new_file_at: Default::default(),
};
let cache = new_cache(inner, [partition]);


@ -7,7 +7,7 @@ RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $$
BEGIN
UPDATE partition SET new_file_at = EXTRACT(EPOCH FROM now() ) * 1000000000 WHERE id = NEW.partition_id;
UPDATE partition SET new_file_at = NEW.created_at WHERE id = NEW.partition_id;
RETURN NEW;
END;


@ -512,6 +512,14 @@ pub trait PartitionRepo: Send + Sync {
/// Return the N most recently created partitions for the specified shards.
async fn most_recent_n(&mut self, n: usize, shards: &[ShardId]) -> Result<Vec<Partition>>;
/// Select partitions for cold/warm/hot compaction.
/// These are partitions with files created recently (i.e., after the specified time_in_the_past).
/// These files include all levels of compaction files.
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
) -> Result<Vec<PartitionParam>>;
}
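A minimal usage sketch of the new repo method, assuming a `catalog: Arc<dyn Catalog>` handle and the current wall-clock time in nanoseconds as `now_ns`; everything other than `partitions_with_recent_created_files` itself is illustrative:

// Candidates are partitions whose newest file was created within the
// last 4 hours; `Timestamp` wraps nanoseconds since the epoch, matching
// the catalog's created_at values.
let time_in_the_past = Timestamp::new(now_ns - 4 * 60 * 60 * 1_000_000_000);

let mut repos = catalog.repositories().await;
let candidates: Vec<PartitionParam> = repos
    .partitions()
    .partitions_with_recent_created_files(time_in_the_past)
    .await?;

// Each PartitionParam carries partition_id, table_id, namespace_id, and the
// shard_id that is slated for removal (issue 6518).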
/// Functions for working with tombstones in the catalog
@ -625,6 +633,7 @@ pub trait ParquetFileRepo: Send + Sync {
max_time: Timestamp,
) -> Result<Vec<ParquetFile>>;
// Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518
/// Select partitions for cold/warm/hot compaction.
/// These are partitions with files created recently (i.e., after the specified time_in_the_past).
/// These files include all levels of compaction files and both non-deleted and soft-deleted files.
@ -633,6 +642,7 @@ pub trait ParquetFileRepo: Send + Sync {
time_in_the_past: Timestamp,
) -> Result<Vec<PartitionId>>;
// Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518
/// List the most recent highest throughput partition for a given shard, if specified
async fn recent_highest_throughput_partitions(
&mut self,
@ -642,6 +652,7 @@ pub trait ParquetFileRepo: Send + Sync {
num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
// Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518
/// List the partitions for a given shard that have at least the given count of L1 files no
/// bigger than the provided size threshold. Limits the number of partitions returned to
/// num_partitions, for performance.
@ -653,6 +664,7 @@ pub trait ParquetFileRepo: Send + Sync {
num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
// Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518
/// List partitions with the most level 0 + level 1 files created earlier than
/// `older_than_num_hours` hours ago for a given shard (if specified). In other words, "cold"
/// partitions that need compaction.
@ -942,6 +954,7 @@ pub(crate) mod test_helpers {
pub(crate) async fn test_catalog(catalog: Arc<dyn Catalog>) {
test_setup(Arc::clone(&catalog)).await;
test_partitions_with_recent_created_files(Arc::clone(&catalog)).await;
test_most_cold_files_partitions(Arc::clone(&catalog)).await;
test_topic(Arc::clone(&catalog)).await;
test_query_pool(Arc::clone(&catalog)).await;
@ -955,7 +968,6 @@ pub(crate) mod test_helpers {
test_parquet_file(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
test_partitions_with_recent_created_files(Arc::clone(&catalog)).await;
test_recent_highest_throughput_partitions(Arc::clone(&catalog)).await;
test_partitions_with_small_l1_file_count(Arc::clone(&catalog)).await;
test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
@ -1144,6 +1156,28 @@ pub(crate) mod test_helpers {
.update_retention_period(namespace4_name, None)
.await
.expect("namespace should be updateable");
// remove namespace to keep it from affecting later tests
repos
.namespaces()
.delete("test_namespace")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("test_namespace2")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("test_namespace3")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("test_namespace4")
.await
.expect("delete namespace should succeed");
}
async fn test_table(catalog: Arc<dyn Catalog>) {
@ -1272,6 +1306,17 @@ pub(crate) mod test_helpers {
namespace_id: _
}
));
repos
.namespaces()
.delete("namespace_table_test")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("two")
.await
.expect("delete namespace should succeed");
}
async fn test_column(catalog: Arc<dyn Catalog>) {
@ -1420,6 +1465,12 @@ pub(crate) mod test_helpers {
let mut table3_column_names: Vec<_> = table3_columns.iter().map(|c| &c.name).collect();
table3_column_names.sort();
assert_eq!(table3_column_names, vec!["apples", "oranges"]);
repos
.namespaces()
.delete("namespace_column_test")
.await
.expect("delete namespace should succeed");
}
async fn test_shards(catalog: Arc<dyn Catalog>) {
@ -1806,6 +1857,17 @@ pub(crate) mod test_helpers {
.await
.expect("should list most recent");
assert_eq!(recent, recent2);
repos
.namespaces()
.delete("namespace_partition_test2")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("namespace_partition_test")
.await
.expect("delete namespace should succeed");
}
async fn test_tombstone(catalog: Arc<dyn Catalog>) {
@ -1969,6 +2031,17 @@ pub(crate) mod test_helpers {
assert_eq!(ts, t4.clone()); // still there
let ts = repos.tombstones().get_by_id(t5.id).await.unwrap().unwrap();
assert_eq!(ts, t5.clone()); // still there
repos
.namespaces()
.delete("namespace_tombstone_test2")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("namespace_tombstone_test")
.await
.expect("delete namespace should succeed");
}
async fn test_tombstones_by_parquet_file(catalog: Arc<dyn Catalog>) {
@ -2188,6 +2261,12 @@ pub(crate) mod test_helpers {
"\ntombstones: {:#?}\nexpected: {:#?}\nparquet_file: {:#?}",
tombstones, expected, parquet_file
);
repos
.namespaces()
.delete("namespace_tombstones_by_parquet_file_test")
.await
.expect("delete namespace should succeed");
}
async fn test_parquet_file(catalog: Arc<dyn Catalog>) {
@ -2820,6 +2899,13 @@ pub(crate) mod test_helpers {
"\nlevel 0: {:#?}\nexpected: {:#?}",
level_0, expected,
);
// drop the namespace to avoid the data created in this test from affecting other tests
repos
.namespaces()
.delete("namespace_parquet_file_compaction_level_0_test")
.await
.expect("delete namespace should succeed");
}
async fn test_parquet_file_compaction_level_1(catalog: Arc<dyn Catalog>) {
@ -3047,6 +3133,13 @@ pub(crate) mod test_helpers {
"\nlevel 1: {:#?}\nexpected: {:#?}",
level_1, expected,
);
// drop the namespace to avoid the data created in this test from affecting other tests
repos
.namespaces()
.delete("namespace_parquet_file_compaction_level_1_test")
.await
.expect("delete namespace should succeed");
}
async fn test_most_cold_files_partitions(catalog: Arc<dyn Catalog>) {
@ -3690,6 +3783,14 @@ pub(crate) mod test_helpers {
assert_eq!(partitions[2].partition_id, partition_3.id);
// then should be partition_1 with 1 file: 1 L0
assert_eq!(partitions[3].partition_id, partition_1.id);
// drop the namespace to avoid the data created in this test from affecting other tests
repos
.namespaces()
.delete("test_most_level_0_files_partitions")
.await
.expect("delete namespace should succeed");
repos.abort().await.unwrap();
}
@ -3740,6 +3841,13 @@ pub(crate) mod test_helpers {
.await
.unwrap();
assert!(partitions.is_empty());
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert!(partitions.is_empty());
// -----------------
// PARTITION one
@ -3755,6 +3863,13 @@ pub(crate) mod test_helpers {
.await
.unwrap();
assert!(partitions.is_empty());
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert!(partitions.is_empty());
// create files for partition one
@ -3792,6 +3907,13 @@ pub(crate) mod test_helpers {
.unwrap();
// still empty because the file was not recently created
assert!(partitions.is_empty());
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert!(partitions.is_empty());
// create a deleted L0 file that was created 1 hour ago which is recently
let l0_one_hour_ago_file_params = ParquetFileParams {
@ -3812,6 +3934,14 @@ pub(crate) mod test_helpers {
// partition one should be returned
assert_eq!(partitions.len(), 1);
assert!(partitions.contains(&partition1.id));
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// -----------------
// PARTITION two
@ -3829,6 +3959,14 @@ pub(crate) mod test_helpers {
// should return partition one only
assert_eq!(partitions.len(), 1);
assert!(partitions.contains(&partition1.id));
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// Add a L0 file created non-recently (5 hours ago)
let l0_five_hour_ago_file_params = ParquetFileParams {
@ -3850,6 +3988,14 @@ pub(crate) mod test_helpers {
// should still return partition one only
assert_eq!(partitions.len(), 1);
assert!(partitions.contains(&partition1.id));
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// Add a L1 created recently (just now)
let l1_file_params = ParquetFileParams {
@ -3873,6 +4019,18 @@ pub(crate) mod test_helpers {
assert_eq!(partitions.len(), 2);
assert!(partitions.contains(&partition1.id));
assert!(partitions.contains(&partition2.id));
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
let mut partitions = partitions;
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// -----------------
// PARTITION three
@ -3891,6 +4049,18 @@ pub(crate) mod test_helpers {
assert_eq!(partitions.len(), 2);
assert!(partitions.contains(&partition1.id));
assert!(partitions.contains(&partition2.id));
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
let mut partitions = partitions;
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// add an L0 file created recently (one hour ago)
let l0_one_hour_ago_file_params = ParquetFileParams {
@ -3914,8 +4084,21 @@ pub(crate) mod test_helpers {
assert!(partitions.contains(&partition1.id));
assert!(partitions.contains(&partition2.id));
assert!(partitions.contains(&partition3.id));
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 3);
// sort by partition id
let mut partitions = partitions;
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
assert_eq!(partitions[2].partition_id, partition3.id);
// drop the namespace to avoid the data created in this test from affecting other tests
repos
.namespaces()
.delete("test_partitions_with_recent_created_files")
@ -4342,6 +4525,14 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition.id);
// Uncomment this after https://github.com/influxdata/influxdb_iox/issues/6517 is fixed
// // remove namespace to keep it from affecting later tests
// repos
// .namespaces()
// .delete("test_recent_highest_throughput_partitions")
// .await
// .expect("delete namespace should succeed");
}
async fn test_partitions_with_small_l1_file_count(catalog: Arc<dyn Catalog>) {
@ -4539,6 +4730,7 @@ pub(crate) mod test_helpers {
)
.await
.unwrap();
// BUG: https://github.com/influxdata/influxdb_iox/issues/6517
assert_eq!(partitions.len(), 1);
// must be the partition with 2 files
assert_eq!(partitions[0].partition_id, another_partition.id);
@ -4579,6 +4771,13 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition.id);
// remove namespace to keep it from affecting later tests
repos
.namespaces()
.delete("test_partitions_with_small_l1_file_count")
.await
.expect("delete namespace should succeed");
}
async fn test_list_by_partiton_not_to_delete(catalog: Arc<dyn Catalog>) {
@ -4700,6 +4899,13 @@ pub(crate) mod test_helpers {
assert_eq!(files.len(), 2);
assert_matches!(files.iter().find(|f| f.id == parquet_file.id), Some(_));
assert_matches!(files.iter().find(|f| f.id == level1_file.id), Some(_));
// remove namespace to keep it from affecting later tests
repos
.namespaces()
.delete("namespace_parquet_file_test_list_by_partiton_not_to_delete")
.await
.expect("delete namespace should succeed");
}
async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
@ -4828,6 +5034,13 @@ pub(crate) mod test_helpers {
"\nlevel 1: {:#?}\nexpected: {:#?}",
level_1, expected,
);
// remove namespace to keep it from affecting later tests
repos
.namespaces()
.delete("namespace_update_to_compaction_level_1_test")
.await
.expect("delete namespace should succeed");
}
async fn test_processed_tombstones(catalog: Arc<dyn Catalog>) {
@ -5006,6 +5219,13 @@ pub(crate) mod test_helpers {
.await
.unwrap();
assert_eq!(count, 0);
// remove namespace to keep it from affecting later tests
repos
.namespaces()
.delete("namespace_processed_tombstone_test")
.await
.expect("delete namespace should succeed");
}
async fn test_delete_namespace(catalog: Arc<dyn Catalog>) {


@ -812,6 +812,7 @@ impl PartitionRepo for MemTxn {
partition_key: key,
sort_key: vec![],
persisted_sequence_number: None,
new_file_at: None,
};
stage.partitions.push(p);
stage.partitions.last().unwrap()
@ -985,6 +986,37 @@ impl PartitionRepo for MemTxn {
.cloned()
.collect())
}
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
) -> Result<Vec<PartitionParam>> {
let stage = self.stage();
let partitions: Vec<_> = stage
.partitions
.iter()
.filter(|p| p.new_file_at > Some(time_in_the_past))
.map(|p| {
// get the namespace_id of this partition
let namespace_id = stage
.tables
.iter()
.find(|t| t.id == p.table_id)
.map(|t| t.namespace_id)
.unwrap_or(NamespaceId::new(1));
PartitionParam {
partition_id: p.id,
table_id: p.table_id,
shard_id: ShardId::new(1), // this is unused and will be removed when we remove shard_id
namespace_id,
}
})
.collect();
Ok(partitions)
}
}
#[async_trait]
@ -1163,6 +1195,14 @@ impl ParquetFileRepo for MemTxn {
};
stage.parquet_files.push(parquet_file);
// Update the new_file_at field of its partition to the time of created_at
let partition = stage
.partitions
.iter_mut()
.find(|p| p.id == partition_id)
.ok_or(Error::PartitionNotFound { id: partition_id })?;
partition.new_file_at = Some(created_at);
Ok(stage.parquet_files.last().unwrap().clone())
}


@ -252,6 +252,7 @@ decorate!(
"partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;
"partition_update_persisted_sequence_number" = update_persisted_sequence_number(&mut self, partition_id: PartitionId, sequence_number: SequenceNumber) -> Result<()>;
"partition_most_recent_n" = most_recent_n(&mut self, n: usize, shards: &[ShardId]) -> Result<Vec<Partition>>;
"partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp) -> Result<Vec<PartitionParam>>;
]
);


@ -1413,6 +1413,24 @@ WHERE id = $2;
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
) -> Result<Vec<PartitionParam>> {
sqlx::query_as(
r#"
SELECT p.id as partition_id, p.table_id, t.namespace_id, p.shard_id
FROM partition p, table_name t
WHERE p.new_file_at > $1
AND p.table_id = t.id;
"#,
)
.bind(time_in_the_past) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
#[async_trait]