chore: revert teaching compactor to use sort_key_ids (#8574)
parent d029265696
commit 2eb74ddb87
@@ -1,46 +0,0 @@
-use std::{fmt::Display, sync::Arc};
-
-use async_trait::async_trait;
-use backoff::{Backoff, BackoffConfig};
-use data_types::{Column, TableId};
-use iox_catalog::interface::Catalog;
-
-use super::ColumnsSource;
-
-#[derive(Debug)]
-pub struct CatalogColumnsSource {
-    backoff_config: BackoffConfig,
-    catalog: Arc<dyn Catalog>,
-}
-
-impl CatalogColumnsSource {
-    pub fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
-        Self {
-            backoff_config,
-            catalog,
-        }
-    }
-}
-
-impl Display for CatalogColumnsSource {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "catalog")
-    }
-}
-
-#[async_trait]
-impl ColumnsSource for CatalogColumnsSource {
-    async fn fetch(&self, table: TableId) -> Vec<Column> {
-        Backoff::new(&self.backoff_config)
-            .retry_all_errors("table_of_given_table_id", || async {
-                self.catalog
-                    .repositories()
-                    .await
-                    .columns()
-                    .list_by_table_id(table)
-                    .await
-            })
-            .await
-            .expect("retry forever")
-    }
-}
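For context, a minimal sketch of how the deleted CatalogColumnsSource was used: it wraps the catalog's column listing in an unbounded retry, so callers can treat fetch as infallible. The helper function below and the use of BackoffConfig::default() are illustrative assumptions, not taken from the diff; the import paths follow the compactor module layout shown above.

    use std::sync::Arc;

    use backoff::BackoffConfig;
    use data_types::{Column, TableId};
    use iox_catalog::interface::Catalog;

    use crate::components::columns_source::{catalog::CatalogColumnsSource, ColumnsSource};

    // Hypothetical helper: build the source from a catalog handle and fetch the columns
    // of one table, retrying on catalog errors until the call succeeds.
    async fn columns_for_table(catalog: Arc<dyn Catalog>, table_id: TableId) -> Vec<Column> {
        let source = CatalogColumnsSource::new(BackoffConfig::default(), catalog);
        source.fetch(table_id).await
    }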
@@ -1,71 +0,0 @@
-use std::{collections::HashMap, fmt::Display};
-
-use async_trait::async_trait;
-use data_types::{Column, TableId};
-
-use super::ColumnsSource;
-
-#[derive(Debug)]
-pub struct MockColumnsSource {
-    tables: HashMap<TableId, Vec<Column>>,
-}
-
-impl MockColumnsSource {
-    #[allow(dead_code)] // not used anywhere
-    pub fn new(tables: HashMap<TableId, Vec<Column>>) -> Self {
-        Self { tables }
-    }
-}
-
-impl Display for MockColumnsSource {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "mock")
-    }
-}
-
-#[async_trait]
-impl ColumnsSource for MockColumnsSource {
-    async fn fetch(&self, table: TableId) -> Vec<Column> {
-        self.tables.get(&table).cloned().unwrap_or_default()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use data_types::ColumnType;
-    use iox_tests::{ColumnBuilder, TableBuilder};
-
-    use super::*;
-
-    #[test]
-    fn test_display() {
-        assert_eq!(
-            MockColumnsSource::new(HashMap::default()).to_string(),
-            "mock",
-        )
-    }
-
-    #[tokio::test]
-    async fn test_fetch() {
-        // t_1 has one column and t_2 has no column
-        let t1 = TableBuilder::new(1).with_name("table1").build();
-        let t1_c1 = ColumnBuilder::new(1, t1.id.get())
-            .with_name("time")
-            .with_column_type(ColumnType::Time)
-            .build();
-        let t2 = TableBuilder::new(2).with_name("table2").build();
-
-        let tables = HashMap::from([(t1.id, vec![t1_c1.clone()]), (t2.id, vec![])]);
-        let source = MockColumnsSource::new(tables);
-
-        // different tables
-        assert_eq!(source.fetch(t1.id).await, vec![t1_c1.clone()],);
-        assert_eq!(source.fetch(t2.id).await, vec![]);
-
-        // fetching does not drain
-        assert_eq!(source.fetch(t1.id).await, vec![t1_c1],);
-
-        // unknown table => empty result
-        assert_eq!(source.fetch(TableId::new(3)).await, vec![]);
-    }
-}
@@ -1,15 +0,0 @@
-use std::fmt::{Debug, Display};
-
-use async_trait::async_trait;
-use data_types::{Column, TableId};
-
-pub mod catalog;
-pub mod mock;
-
-#[async_trait]
-pub trait ColumnsSource: Debug + Display + Send + Sync {
-    /// Get Columns for a given table
-    ///
-    /// This method performs retries.
-    async fn fetch(&self, table: TableId) -> Vec<Column>;
-}
@@ -12,7 +12,6 @@ use crate::{config::Config, error::ErrorKind, object_store::ignore_writes::Ignor
 
 use super::{
     changed_files_filter::logging::LoggingChangedFiles,
-    columns_source::catalog::CatalogColumnsSource,
     commit::CommitToScheduler,
     compaction_job_done_sink::{
         error_kind::ErrorKindCompactionJobDoneSinkWrapper,
@@ -193,7 +192,6 @@ fn make_compaction_job_stream(
 
 fn make_partition_info_source(config: &Config) -> Arc<dyn PartitionInfoSource> {
     Arc::new(SubSourcePartitionInfoSource::new(
-        CatalogColumnsSource::new(config.backoff_config.clone(), Arc::clone(&config.catalog)),
         LoggingPartitionSourceWrapper::new(MetricsPartitionSourceWrapper::new(
             CatalogPartitionSource::new(config.backoff_config.clone(), Arc::clone(&config.catalog)),
             &config.metric_registry,
@@ -12,7 +12,6 @@ use self::{
 };
 
 pub mod changed_files_filter;
-pub mod columns_source;
pub(crate) mod commit;
 pub mod compaction_job_done_sink;
 pub mod compaction_job_stream;
@@ -2,12 +2,11 @@ use std::{fmt::Display, sync::Arc};
 
 use async_trait::async_trait;
 use data_types::PartitionId;
-use schema::sort::SortKey;
 
 use crate::{
     components::{
-        columns_source::ColumnsSource, namespaces_source::NamespacesSource,
-        partition_source::PartitionSource, tables_source::TablesSource,
+        namespaces_source::NamespacesSource, partition_source::PartitionSource,
+        tables_source::TablesSource,
     },
     error::DynError,
     partition_info::PartitionInfo,
@@ -16,34 +15,25 @@ use crate::{
 use super::PartitionInfoSource;
 
 #[derive(Debug)]
-pub struct SubSourcePartitionInfoSource<C, P, T, N>
+pub struct SubSourcePartitionInfoSource<P, T, N>
 where
-    C: ColumnsSource,
     P: PartitionSource,
     T: TablesSource,
     N: NamespacesSource,
 {
-    columns_source: C,
     partition_source: P,
     tables_source: T,
     namespaces_source: N,
 }
 
-impl<C, P, T, N> SubSourcePartitionInfoSource<C, P, T, N>
+impl<P, T, N> SubSourcePartitionInfoSource<P, T, N>
 where
-    C: ColumnsSource,
     P: PartitionSource,
     T: TablesSource,
     N: NamespacesSource,
 {
-    pub fn new(
-        columns_source: C,
-        partition_source: P,
-        tables_source: T,
-        namespaces_source: N,
-    ) -> Self {
+    pub fn new(partition_source: P, tables_source: T, namespaces_source: N) -> Self {
         Self {
-            columns_source,
             partition_source,
             tables_source,
             namespaces_source,
@@ -51,9 +41,8 @@ where
     }
 }
 
-impl<C, P, T, N> Display for SubSourcePartitionInfoSource<C, P, T, N>
+impl<P, T, N> Display for SubSourcePartitionInfoSource<P, T, N>
 where
-    C: ColumnsSource,
     P: PartitionSource,
     T: TablesSource,
     N: NamespacesSource,
@@ -68,9 +57,8 @@ where
 }
 
 #[async_trait]
-impl<C, P, T, N> PartitionInfoSource for SubSourcePartitionInfoSource<C, P, T, N>
+impl<P, T, N> PartitionInfoSource for SubSourcePartitionInfoSource<P, T, N>
 where
-    C: ColumnsSource,
     P: PartitionSource,
     T: TablesSource,
     N: NamespacesSource,
@@ -108,43 +96,6 @@ where
             .get(&table.name)
             .ok_or_else::<DynError, _>(|| String::from("Cannot find table schema").into())?;
 
-        // fetch table columns to get column names for the partition's sort_key_ids
-        let columns = self.columns_source.fetch(table.id).await;
-
-        // sort_key_ids of the partition
-        let sort_key_ids = partition.sort_key_ids_none_if_empty();
-        // sort_key of the partition. This will be removed but until then, use it to validate the
-        // sort_key computed by mapping sort_key_ids to column names
-        let p_sort_key = partition.sort_key();
-
-        // convert column ids to column names
-        let sort_key = sort_key_ids.as_ref().map(|ids| {
-            let names = ids
-                .iter()
-                .map(|id| {
-                    columns
-                        .iter()
-                        .find(|c| c.id == *id)
-                        .map(|c| c.name.clone())
-                        .ok_or_else::<DynError, _>(|| {
-                            format!(
-                                "Cannot find column with id {} for table {}",
-                                id.get(),
-                                table.name
-                            )
-                            .into()
-                        })
-                })
-                .collect::<Result<Vec<_>, _>>()
-                .expect("Cannot find column names for sort key ids");
-
-            SortKey::from_columns(names.iter().map(|s| &**s))
-        });
-
-        // This is here to catch bugs if any while mapping sort_key_ids to column names
-        // This wil be removed once sort_key is removed from partition
-        assert_eq!(sort_key, p_sort_key);
-
         Ok(Arc::new(PartitionInfo {
             partition_id,
             partition_hash_id: partition.hash_id().cloned(),
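The block removed here is the heart of the revert: it mapped the partition's sort_key_ids back to column names, rebuilt a SortKey, and asserted it equals partition.sort_key(). A condensed sketch of that mapping follows; it assumes a plain slice of ColumnIds instead of the SortedColumnSet used above, and returns None instead of erroring when an id is unknown.

    use data_types::{Column, ColumnId};
    use schema::sort::SortKey;

    // Resolve each sort-key column id against the table's columns; if every id is found,
    // build a SortKey from the column names in the same order.
    fn sort_key_from_ids(ids: &[ColumnId], columns: &[Column]) -> Option<SortKey> {
        let names: Option<Vec<String>> = ids
            .iter()
            .map(|id| columns.iter().find(|c| c.id == *id).map(|c| c.name.clone()))
            .collect();
        names.map(|names| SortKey::from_columns(names.iter().map(|s| s.as_str())))
    }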
@@ -152,7 +103,7 @@ where
             namespace_name: namespace.name,
             table: Arc::new(table),
             table_schema: Arc::new(table_schema.clone()),
-            sort_key,
+            sort_key: partition.sort_key(),
             partition_key: partition.partition_key,
         }))
     }
@@ -738,7 +738,7 @@ async fn random_backfill_empty_partition() {
 - "L0 "
 - "L0.190[0,355] 1.02us 76mb|------------L0.190------------| "
 - "L0.193[356,668] 1.02us 79mb |----------L0.193----------| "
-- "L0.194[669,986] 1.02us 67mb |----------L0.194----------| "
+- "L0.195[669,986] 1.02us 67mb |----------L0.195----------| "
 - "L0.191[42,355] 1.04us 71mb |----------L0.191----------| "
 - "**** 3 Output Files (parquet_file_id not yet assigned), 292mb total:"
 - "L1 "
@@ -746,11 +746,11 @@ async fn random_backfill_empty_partition() {
 - "L1.?[339,676] 1.04us 100mb |------------L1.?------------| "
 - "L1.?[677,986] 1.04us 92mb |-----------L1.?-----------| "
 - "Committing partition 1:"
-- " Soft Deleting 4 files: L0.190, L0.191, L0.193, L0.194"
+- " Soft Deleting 4 files: L0.190, L0.191, L0.193, L0.195"
 - " Creating 3 files"
 - "**** Simulation run 59, type=split(ReduceOverlap)(split_times=[676]). 1 Input Files, 60mb total:"
 - "L0, all files 60mb "
-- "L0.195[669,986] 1.05us |-----------------------------------------L0.195-----------------------------------------|"
+- "L0.196[669,986] 1.05us |-----------------------------------------L0.196-----------------------------------------|"
 - "**** 2 Output Files (parquet_file_id not yet assigned), 60mb total:"
 - "L0 "
 - "L0.?[669,676] 1.05us 2mb |L0.?| "
@@ -763,11 +763,11 @@ async fn random_backfill_empty_partition() {
 - "L0.?[42,338] 1.05us 38mb |---------------------------------------L0.?----------------------------------------| "
 - "L0.?[339,355] 1.05us 2mb |L0.?|"
 - "Committing partition 1:"
-- " Soft Deleting 2 files: L0.192, L0.195"
+- " Soft Deleting 2 files: L0.192, L0.196"
 - " Creating 4 files"
 - "**** Simulation run 61, type=split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))(split_times=[528]). 2 Input Files, 179mb total:"
 - "L0 "
-- "L0.196[356,668] 1.04us 79mb |-------------------------------------L0.196--------------------------------------| "
+- "L0.194[356,668] 1.04us 79mb |-------------------------------------L0.194--------------------------------------| "
 - "L1 "
 - "L1.199[339,676] 1.04us 100mb|-----------------------------------------L1.199-----------------------------------------|"
 - "**** 2 Output Files (parquet_file_id not yet assigned), 179mb total:"
@@ -775,7 +775,7 @@ async fn random_backfill_empty_partition() {
 - "L1.?[339,528] 1.04us 100mb|----------------------L1.?----------------------| "
 - "L1.?[529,676] 1.04us 78mb |----------------L1.?-----------------| "
 - "Committing partition 1:"
-- " Soft Deleting 2 files: L0.196, L1.199"
+- " Soft Deleting 2 files: L0.194, L1.199"
 - " Creating 2 files"
 - "**** Simulation run 62, type=split(ReduceOverlap)(split_times=[528]). 1 Input Files, 38mb total:"
 - "L0, all files 38mb "
File diff suppressed because it is too large
@@ -527,21 +527,6 @@ impl Partition {
     pub fn sort_key_ids(&self) -> Option<&SortedColumnSet> {
         self.sort_key_ids.as_ref()
     }
-
-    // todo: resue the same function in https://github.com/influxdata/influxdb_iox/pull/8556/
-    /// The sort_key_ids if present and not empty
-    pub fn sort_key_ids_none_if_empty(&self) -> Option<SortedColumnSet> {
-        match self.sort_key_ids.as_ref() {
-            None => None,
-            Some(sort_key_ids) => {
-                if sort_key_ids.is_empty() {
-                    None
-                } else {
-                    Some(sort_key_ids.clone())
-                }
-            }
-        }
-    }
 }
 
 #[cfg(test)]
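The removed sort_key_ids_none_if_empty helper only collapsed an empty sort-key-id set to None. An equivalent free-function sketch, assuming SortedColumnSet is exported from data_types and is Clone with an is_empty method (as the removed code relies on):

    use data_types::SortedColumnSet;

    // Treat an empty sort_key_ids set the same as a missing one.
    fn none_if_empty(ids: Option<&SortedColumnSet>) -> Option<SortedColumnSet> {
        ids.filter(|ids| !ids.is_empty()).cloned()
    }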
@@ -1,7 +1,6 @@
 use data_types::{
-    Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, NamespaceId, ParquetFile,
-    ParquetFileId, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
-    Timestamp, TransitionPartitionId,
+    ColumnSet, CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, Partition, PartitionId,
+    PartitionKey, SkippedCompaction, Table, TableId, Timestamp, TransitionPartitionId,
 };
 use uuid::Uuid;
 
@@ -112,51 +111,6 @@ impl From<ParquetFile> for ParquetFileBuilder {
     }
 }
 
-#[derive(Debug)]
-/// Build [`Column`]s for testing
-pub struct ColumnBuilder {
-    column: Column,
-}
-
-impl ColumnBuilder {
-    /// Create a builder to create a column with `table_id` `id`
-    pub fn new(id: i64, table_id: i64) -> Self {
-        Self {
-            column: Column {
-                id: ColumnId::new(id),
-                table_id: TableId::new(table_id),
-                name: "column".to_string(),
-                column_type: ColumnType::Tag,
-            },
-        }
-    }
-
-    /// Set the column name
-    pub fn with_name(self, name: &str) -> Self {
-        Self {
-            column: Column {
-                name: name.to_string(),
-                ..self.column
-            },
-        }
-    }
-
-    /// Set column type
-    pub fn with_column_type(self, column_type: ColumnType) -> Self {
-        Self {
-            column: Column {
-                column_type,
-                ..self.column
-            },
-        }
-    }
-
-    /// Create the table
-    pub fn build(self) -> Column {
-        self.column
-    }
-}
-
 #[derive(Debug)]
 /// Build [`Table`]s for testing
 pub struct TableBuilder {
@@ -25,9 +25,7 @@ pub use catalog::{
 };
 
 mod builders;
-pub use builders::{
-    ColumnBuilder, ParquetFileBuilder, PartitionBuilder, SkippedCompactionBuilder, TableBuilder,
-};
+pub use builders::{ParquetFileBuilder, PartitionBuilder, SkippedCompactionBuilder, TableBuilder};
 
 /// Create a partition identifier from an int (which gets used as the table ID) and a partition key
 /// with the string "arbitrary". Most useful in cases where there isn't any actual catalog