feat: If namespace or table partition templates are specified, use those

pull/24376/head
Carol (Nichols || Goulding) 2023-04-27 13:20:15 -04:00 committed by Dom Dwyer
parent c1a8408572
commit 58d9c40ffd
36 changed files with 868 additions and 350 deletions


@ -174,20 +174,21 @@ fn to_queryable_parquet_chunk(
partition_info: &PartitionInfo,
store: ParquetStorage,
) -> QueryableParquetChunk {
let column_id_lookup = partition_info.table_schema.column_id_map();
let column_id_lookup = partition_info.table_info.column_id_map();
let selection: Vec<_> = file
.file
.column_set
.iter()
.flat_map(|id| column_id_lookup.get(id).copied())
.collect();
let table_schema: Schema = partition_info
.table_schema
let table_info: Schema = partition_info
.table_info
.as_ref()
.clone()
.schema()
.try_into()
.expect("table schema is broken");
let schema = table_schema
.expect("table info is broken");
let schema = table_info
.select_by_names(&selection)
.expect("schema in-sync");
let pk = schema.primary_key();


@ -49,7 +49,7 @@ impl NamespacesSource for MockNamespacesSource {
mod tests {
use std::collections::BTreeMap;
use data_types::{ColumnId, ColumnSchema, ColumnType, TableId, TableSchema};
use data_types::{ColumnId, ColumnSchema, ColumnType, TableId, TableInfo, TableSchema};
use super::*;
@ -128,7 +128,7 @@ mod tests {
let tables = BTreeMap::from([
(
"table1".to_string(),
TableSchema {
TableInfo::new(TableSchema {
id: TableId::new(1),
columns: BTreeMap::from([
(
@ -146,11 +146,11 @@ mod tests {
},
),
]),
},
}),
),
(
"table2".to_string(),
TableSchema {
TableInfo::new(TableSchema {
id: TableId::new(2),
columns: BTreeMap::from([
(
@ -175,7 +175,7 @@ mod tests {
},
),
]),
},
}),
),
]);
@ -196,6 +196,7 @@ mod tests {
max_columns_per_table: 10,
max_tables: 42,
retention_period_ns: None,
partition_template: None,
},
},
}


@ -80,8 +80,8 @@ impl ParquetFileSink for ObjectStoreParquetFileSink {
let parquet_file =
meta.to_parquet_file(partition.partition_id, file_size, &parquet_meta, |name| {
partition
.table_schema
.columns
.table_info
.columns()
.get(name)
.expect("unknown column")
.id


@ -91,7 +91,7 @@ where
.await
.ok_or_else::<DynError, _>(|| String::from("Cannot find namespace schema").into())?;
let table_schema = namespace_schema
let table_info = namespace_schema
.tables
.get(&table.name)
.ok_or_else::<DynError, _>(|| String::from("Cannot find table schema").into())?;
@ -101,7 +101,7 @@ where
namespace_id: table.namespace_id,
namespace_name: namespace.name,
table: Arc::new(table),
table_schema: Arc::new(table_schema.clone()),
table_info: Arc::new(table_info.clone()),
sort_key: partition.sort_key(),
partition_key: partition.partition_key,
}))


@ -2,7 +2,7 @@
use std::sync::Arc;
use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableSchema};
use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableInfo};
use schema::sort::SortKey;
/// Information about the Partition being compacted
@ -20,8 +20,8 @@ pub struct PartitionInfo {
/// Table.
pub table: Arc<Table>,
/// Table schema
pub table_schema: Arc<TableSchema>,
/// Table info
pub table_info: Arc<TableInfo>,
/// Sort key of the partition
pub sort_key: Option<SortKey>,
@ -33,6 +33,6 @@ pub struct PartitionInfo {
impl PartitionInfo {
/// Returns number of columns in the table
pub fn column_count(&self) -> usize {
self.table_schema.column_count()
self.table_info.column_count()
}
}
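
The practical effect for the compactor: `PartitionInfo` now carries the cached `TableInfo` rather than a bare `TableSchema`, so the partition template travels alongside the column data. A minimal sketch of reading through the renamed field, assuming `partition_info` is an already-populated `PartitionInfo`:

// Sketch only: `partition_info` is assumed to be a populated PartitionInfo.
let n_cols = partition_info.table_info.column_count();

// The template is an Option<Arc<PartitionTemplate>>, so cloning it only
// bumps a refcount; the template itself is never copied.
let template = partition_info.table_info.partition_template.clone();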


@ -2,7 +2,7 @@ use std::{collections::BTreeMap, sync::Arc};
use data_types::{
ColumnId, ColumnSchema, ColumnType, NamespaceId, PartitionId, PartitionKey, Table, TableId,
TableSchema,
TableInfo, TableSchema,
};
use crate::PartitionInfo;
@ -27,10 +27,10 @@ impl PartitionInfoBuilder {
namespace_id,
name: String::from("table"),
}),
table_schema: Arc::new(TableSchema {
table_info: Arc::new(TableInfo::new(TableSchema {
id: table_id,
columns: BTreeMap::new(),
}),
})),
sort_key: None,
partition_key: PartitionKey::from("key"),
},
@ -52,11 +52,11 @@ impl PartitionInfoBuilder {
columns.insert(i.to_string(), col);
}
let table_schema = Arc::new(TableSchema {
let table_info = Arc::new(TableInfo::new(TableSchema {
id: self.inner.table.id,
columns,
});
self.inner.table_schema = table_schema;
}));
self.inner.table_info = table_info;
self
}


@ -766,13 +766,13 @@ async fn random_backfill_empty_partition() {
- "L0 "
- "L0.?[357,658] 1.04us 4mb |----------------------------------------L0.?----------------------------------------| "
- "L0.?[659,670] 1.04us 144kb |L0.?|"
- "**** Simulation run 72, type=split(ReduceOverlap)(split_times=[329]). 1 Input Files, 2mb total:"
- "L0, all files 2mb "
- "L0.168[173,356] 1.04us |-----------------------------------------L0.168-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 2mb total:"
- "**** Simulation run 72, type=split(ReduceOverlap)(split_times=[329]). 1 Input Files, 3mb total:"
- "L0, all files 3mb "
- "L0.165[42,356] 1.04us |-----------------------------------------L0.165-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 3mb total:"
- "L0 "
- "L0.?[173,329] 1.04us 2mb |-----------------------------------L0.?-----------------------------------| "
- "L0.?[330,356] 1.04us 356kb |---L0.?---| "
- "L0.?[42,329] 1.04us 3mb |--------------------------------------L0.?--------------------------------------| "
- "L0.?[330,356] 1.04us 293kb |L0.?-| "
- "**** Simulation run 73, type=split(ReduceOverlap)(split_times=[658]). 1 Input Files, 4mb total:"
- "L0, all files 4mb "
- "L0.169[357,670] 1.04us |-----------------------------------------L0.169-----------------------------------------|"
@ -822,13 +822,13 @@ async fn random_backfill_empty_partition() {
- "L0 "
- "L0.?[173,329] 1.05us 2mb |-----------------------------------L0.?-----------------------------------| "
- "L0.?[330,356] 1.05us 356kb |---L0.?---| "
- "**** Simulation run 80, type=split(ReduceOverlap)(split_times=[329]). 1 Input Files, 3mb total:"
- "L0, all files 3mb "
- "L0.165[42,356] 1.04us |-----------------------------------------L0.165-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 3mb total:"
- "**** Simulation run 80, type=split(ReduceOverlap)(split_times=[658]). 1 Input Files, 4mb total:"
- "L0, all files 4mb "
- "L0.180[357,670] 1.05us |-----------------------------------------L0.180-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 4mb total:"
- "L0 "
- "L0.?[42,329] 1.04us 3mb |--------------------------------------L0.?--------------------------------------| "
- "L0.?[330,356] 1.04us 293kb |L0.?-| "
- "L0.?[357,658] 1.05us 4mb |----------------------------------------L0.?----------------------------------------| "
- "L0.?[659,670] 1.05us 158kb |L0.?|"
- "**** Simulation run 81, type=split(ReduceOverlap)(split_times=[658]). 1 Input Files, 3mb total:"
- "L0, all files 3mb "
- "L0.166[357,670] 1.04us |-----------------------------------------L0.166-----------------------------------------|"
@ -836,13 +836,13 @@ async fn random_backfill_empty_partition() {
- "L0 "
- "L0.?[357,658] 1.04us 3mb |----------------------------------------L0.?----------------------------------------| "
- "L0.?[659,670] 1.04us 130kb |L0.?|"
- "**** Simulation run 82, type=split(ReduceOverlap)(split_times=[658]). 1 Input Files, 4mb total:"
- "L0, all files 4mb "
- "L0.180[357,670] 1.05us |-----------------------------------------L0.180-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 4mb total:"
- "**** Simulation run 82, type=split(ReduceOverlap)(split_times=[329]). 1 Input Files, 2mb total:"
- "L0, all files 2mb "
- "L0.168[173,356] 1.04us |-----------------------------------------L0.168-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 2mb total:"
- "L0 "
- "L0.?[357,658] 1.05us 4mb |----------------------------------------L0.?----------------------------------------| "
- "L0.?[659,670] 1.05us 158kb |L0.?|"
- "L0.?[173,329] 1.04us 2mb |-----------------------------------L0.?-----------------------------------| "
- "L0.?[330,356] 1.04us 356kb |---L0.?---| "
- "**** Simulation run 83, type=split(ReduceOverlap)(split_times=[329]). 1 Input Files, 5mb total:"
- "L0, all files 5mb "
- "L0.182[50,356] 1.05us |-----------------------------------------L0.182-----------------------------------------|"
@ -1012,7 +1012,7 @@ async fn random_backfill_empty_partition() {
- "L0.?[967,986] 1.05us 218kb |L0.?|"
- "**** Simulation run 105, type=split(ReduceOverlap)(split_times=[648]). 1 Input Files, 4mb total:"
- "L0, all files 4mb "
- "L0.255[357,658] 1.05us |-----------------------------------------L0.255-----------------------------------------|"
- "L0.251[357,658] 1.05us |-----------------------------------------L0.251-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 4mb total:"
- "L0 "
- "L0.?[357,648] 1.05us 4mb |----------------------------------------L0.?-----------------------------------------| "
@ -1039,7 +1039,7 @@ async fn random_backfill_empty_partition() {
- "L0.?[671,966] 1.05us 3mb |---------------------------------------L0.?---------------------------------------| "
- "L0.?[967,986] 1.05us 218kb |L0.?|"
- "Committing partition 1:"
- " Soft Deleting 20 files: L0.145, L0.156, L0.167, L0.178, L0.189, L0.199, L0.205, L0.209, L0.213, L0.219, L0.223, L0.227, L0.233, L0.237, L0.243, L0.247, L0.253, L0.255, L0.261, L0.265"
- " Soft Deleting 20 files: L0.145, L0.156, L0.167, L0.178, L0.189, L0.199, L0.205, L0.209, L0.213, L0.219, L0.223, L0.227, L0.233, L0.237, L0.243, L0.247, L0.251, L0.253, L0.261, L0.265"
- " Creating 40 files"
- "**** Simulation run 109, type=split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))(split_times=[263]). 2 Input Files, 103mb total:"
- "L0 "
@ -1132,14 +1132,14 @@ async fn random_backfill_empty_partition() {
- "L0.?[264,329] 1.04us 790kb |--------L0.?---------| "
- "**** Simulation run 121, type=split(ReduceOverlap)(split_times=[263]). 1 Input Files, 3mb total:"
- "L0, all files 3mb "
- "L0.251[42,329] 1.04us |-----------------------------------------L0.251-----------------------------------------|"
- "L0.235[42,329] 1.04us |-----------------------------------------L0.235-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 3mb total:"
- "L0 "
- "L0.?[42,263] 1.04us 2mb |-------------------------------L0.?--------------------------------| "
- "L0.?[264,329] 1.04us 716kb |-------L0.?-------| "
- "**** Simulation run 122, type=split(ReduceOverlap)(split_times=[263]). 1 Input Files, 2mb total:"
- "L0, all files 2mb "
- "L0.235[173,329] 1.04us |-----------------------------------------L0.235-----------------------------------------|"
- "L0.255[173,329] 1.04us |-----------------------------------------L0.255-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 2mb total:"
- "L0 "
- "L0.?[173,263] 1.04us 1mb |----------------------L0.?-----------------------| "
@ -1194,7 +1194,7 @@ async fn random_backfill_empty_partition() {
- "L0.?[42,263] 1.05us 2mb |-------------------------------L0.?--------------------------------| "
- "L0.?[264,329] 1.05us 716kb |-------L0.?-------| "
- "Committing partition 1:"
- " Soft Deleting 20 files: L0.197, L0.201, L0.203, L0.207, L0.211, L0.215, L0.217, L0.221, L0.225, L0.229, L0.231, L0.235, L0.239, L0.241, L0.245, L0.249, L0.251, L0.257, L0.259, L0.263"
- " Soft Deleting 20 files: L0.197, L0.201, L0.203, L0.207, L0.211, L0.215, L0.217, L0.221, L0.225, L0.229, L0.231, L0.235, L0.239, L0.241, L0.245, L0.249, L0.255, L0.257, L0.259, L0.263"
- " Creating 40 files"
- "**** Simulation run 130, type=split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))(split_times=[570, 876]). 9 Input Files, 230mb total:"
- "L0 "
@ -2057,7 +2057,7 @@ async fn random_backfill_empty_partition() {
- "L0.334[42,263] 1.04us 2mb|------L0.334-------| "
- "L0.460[264,295] 1.04us 341kb |L0.460| "
- "L0.461[296,329] 1.04us 374kb |L0.461| "
- "L0.252[330,356] 1.04us 293kb |L0.252| "
- "L0.236[330,356] 1.04us 293kb |L0.236| "
- "L0.389[357,570] 1.04us 2mb |------L0.389------| "
- "L0.525[571,583] 1.04us 132kb |L0.525| "
- "L0.526[584,590] 1.04us 77kb |L0.526| "
@ -2071,7 +2071,7 @@ async fn random_backfill_empty_partition() {
- "L0.336[173,263] 1.04us 1mb |L0.336| "
- "L0.464[264,295] 1.04us 415kb |L0.464| "
- "L0.465[296,329] 1.04us 455kb |L0.465| "
- "L0.236[330,356] 1.04us 356kb |L0.236| "
- "L0.256[330,356] 1.04us 356kb |L0.256| "
- "L0.393[357,570] 1.04us 3mb |------L0.393------| "
- "L0.529[571,583] 1.04us 160kb |L0.529| "
- "L0.530[584,590] 1.04us 93kb |L0.530| "
@ -2125,7 +2125,7 @@ async fn random_backfill_empty_partition() {
- "L0.544[584,590] 1.05us 93kb |L0.544| "
- "L0.479[591,648] 1.05us 774kb |L0.479| "
- "L0.303[649,658] 1.05us 132kb |L0.303| "
- "L0.256[659,670] 1.05us 158kb |L0.256| "
- "L0.252[659,670] 1.05us 158kb |L0.252| "
- "L0.545[671,870] 1.05us 3mb |-----L0.545-----| "
- "L0.546[871,876] 1.05us 80kb |L0.546| "
- "L0.410[877,950] 1.05us 982kb |L0.410| "
@ -3646,13 +3646,13 @@ async fn random_backfill_over_l2s() {
- "L0 "
- "L0.?[295,334] 1.03us 430kb|-------------------------L0.?--------------------------| "
- "L0.?[335,356] 1.03us 243kb |------------L0.?------------| "
- "**** Simulation run 143, type=split(ReduceOverlap)(split_times=[626]). 1 Input Files, 677kb total:"
- "L0, all files 677kb "
- "L0.328[592,629] 1.03us |-----------------------------------------L0.328-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 677kb total:"
- "**** Simulation run 143, type=split(ReduceOverlap)(split_times=[626]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.324[592,670] 1.03us |-----------------------------------------L0.324-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[592,626] 1.03us 622kb|--------------------------------------L0.?--------------------------------------| "
- "L0.?[627,629] 1.03us 55kb |L0.?|"
- "L0.?[592,626] 1.03us 455kb|----------------L0.?-----------------| "
- "L0.?[627,670] 1.03us 589kb |---------------------L0.?----------------------| "
- "**** Simulation run 144, type=split(ReduceOverlap)(split_times=[334]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.283[295,356] 1.03us |-----------------------------------------L0.283-----------------------------------------|"
@ -3702,13 +3702,13 @@ async fn random_backfill_over_l2s() {
- "L0 "
- "L0.?[295,334] 1.04us 522kb|-------------------------L0.?--------------------------| "
- "L0.?[335,356] 1.04us 295kb |------------L0.?------------| "
- "**** Simulation run 151, type=split(ReduceOverlap)(split_times=[626]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.324[592,670] 1.03us |-----------------------------------------L0.324-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "**** Simulation run 151, type=split(ReduceOverlap)(split_times=[626]). 1 Input Files, 677kb total:"
- "L0, all files 677kb "
- "L0.342[592,629] 1.04us |-----------------------------------------L0.342-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 677kb total:"
- "L0 "
- "L0.?[592,626] 1.03us 455kb|----------------L0.?-----------------| "
- "L0.?[627,670] 1.03us 589kb |---------------------L0.?----------------------| "
- "L0.?[592,626] 1.04us 622kb|--------------------------------------L0.?--------------------------------------| "
- "L0.?[627,629] 1.04us 55kb |L0.?|"
- "**** Simulation run 152, type=split(ReduceOverlap)(split_times=[334]). 1 Input Files, 817kb total:"
- "L0, all files 817kb "
- "L0.281[295,356] 1.03us |-----------------------------------------L0.281-----------------------------------------|"
@ -3718,11 +3718,11 @@ async fn random_backfill_over_l2s() {
- "L0.?[335,356] 1.03us 295kb |------------L0.?------------| "
- "**** Simulation run 153, type=split(ReduceOverlap)(split_times=[626]). 1 Input Files, 677kb total:"
- "L0, all files 677kb "
- "L0.342[592,629] 1.04us |-----------------------------------------L0.342-----------------------------------------|"
- "L0.328[592,629] 1.03us |-----------------------------------------L0.328-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 677kb total:"
- "L0 "
- "L0.?[592,626] 1.04us 622kb|--------------------------------------L0.?--------------------------------------| "
- "L0.?[627,629] 1.04us 55kb |L0.?|"
- "L0.?[592,626] 1.03us 622kb|--------------------------------------L0.?--------------------------------------| "
- "L0.?[627,629] 1.03us 55kb |L0.?|"
- "**** Simulation run 154, type=split(ReduceOverlap)(split_times=[334]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.291[295,356] 1.04us |-----------------------------------------L0.291-----------------------------------------|"
@ -3910,7 +3910,7 @@ async fn random_backfill_over_l2s() {
- "L0.?[904,986] 1.03us 918kb |----------------------------------L0.?-----------------------------------| "
- "**** Simulation run 178, type=split(ReduceOverlap)(split_times=[619]). 1 Input Files, 455kb total:"
- "L0, all files 455kb "
- "L0.404[592,626] 1.03us |-----------------------------------------L0.404-----------------------------------------|"
- "L0.388[592,626] 1.03us |-----------------------------------------L0.388-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 455kb total:"
- "L0 "
- "L0.?[592,619] 1.03us 362kb|--------------------------------L0.?---------------------------------| "
@ -3924,7 +3924,7 @@ async fn random_backfill_over_l2s() {
- "L0.?[904,950] 1.03us 636kb |------------------------------L0.?------------------------------| "
- "**** Simulation run 180, type=split(ReduceOverlap)(split_times=[619]). 1 Input Files, 622kb total:"
- "L0, all files 622kb "
- "L0.388[592,626] 1.03us |-----------------------------------------L0.388-----------------------------------------|"
- "L0.408[592,626] 1.03us |-----------------------------------------L0.408-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 622kb total:"
- "L0 "
- "L0.?[592,619] 1.03us 494kb|--------------------------------L0.?---------------------------------| "
@ -3973,7 +3973,7 @@ async fn random_backfill_over_l2s() {
- "L0.?[904,950] 1.04us 636kb |------------------------------L0.?------------------------------| "
- "**** Simulation run 187, type=split(ReduceOverlap)(split_times=[619]). 1 Input Files, 622kb total:"
- "L0, all files 622kb "
- "L0.408[592,626] 1.04us |-----------------------------------------L0.408-----------------------------------------|"
- "L0.404[592,626] 1.04us |-----------------------------------------L0.404-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 622kb total:"
- "L0 "
- "L0.?[592,619] 1.04us 494kb|--------------------------------L0.?---------------------------------| "
@ -4253,7 +4253,7 @@ async fn random_backfill_over_l2s() {
- "L0.323[358,591] 1.03us 3mb |-------L0.323-------| "
- "L0.459[592,619] 1.03us 362kb |L0.459| "
- "L0.460[620,626] 1.03us 94kb |L0.460| "
- "L0.405[627,670] 1.03us 589kb |L0.405| "
- "L0.389[627,670] 1.03us 589kb |L0.389| "
- "L0.218[671,672] 1.03us 13kb |L0.218| "
- "L0.325[673,887] 1.03us 3mb |------L0.325------| "
- "L0.461[888,903] 1.03us 203kb |L0.461| "
@ -4266,7 +4266,7 @@ async fn random_backfill_over_l2s() {
- "L0.327[358,591] 1.03us 4mb |-------L0.327-------| "
- "L0.463[592,619] 1.03us 494kb |L0.463| "
- "L0.464[620,626] 1.03us 128kb |L0.464| "
- "L0.389[627,629] 1.03us 55kb |L0.389| "
- "L0.409[627,629] 1.03us 55kb |L0.409| "
- "L0.521[76,275] 1.04us 2mb |-----L0.521-----| "
- "L0.522[276,294] 1.04us 227kb |L0.522| "
- "L0.394[295,334] 1.04us 474kb |L0.394| "
@ -4314,7 +4314,7 @@ async fn random_backfill_over_l2s() {
- "L0.341[358,591] 1.04us 4mb |-------L0.341-------| "
- "L0.477[592,619] 1.04us 494kb |L0.477| "
- "L0.478[620,626] 1.04us 128kb |L0.478| "
- "L0.409[627,629] 1.04us 55kb |L0.409| "
- "L0.405[627,629] 1.04us 55kb |L0.405| "
- "L0.529[76,275] 1.04us 2mb |-----L0.529-----| "
- "L0.530[276,294] 1.04us 227kb |L0.530| "
- "L0.414[295,334] 1.04us 474kb |L0.414| "


@ -865,11 +865,11 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L0.?[171444,200000] 6ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 52, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.73[171443,200000] 9ns |-----------------------------------------L0.73------------------------------------------|"
- "L0.59[171443,200000] 7ns |-----------------------------------------L0.59------------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171443,171443] 9ns 0b|L0.?| "
- "L0.?[171444,200000] 9ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "L0.?[171443,171443] 7ns 0b|L0.?| "
- "L0.?[171444,200000] 7ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 53, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.80[171443,200000] 10ns|-----------------------------------------L0.80------------------------------------------|"
@ -879,18 +879,18 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L0.?[171444,200000] 10ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 54, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.59[171443,200000] 7ns |-----------------------------------------L0.59------------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171443,171443] 7ns 0b|L0.?| "
- "L0.?[171444,200000] 7ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 55, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.66[171443,200000] 8ns |-----------------------------------------L0.66------------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171443,171443] 8ns 0b|L0.?| "
- "L0.?[171444,200000] 8ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 55, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.73[171443,200000] 9ns |-----------------------------------------L0.73------------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171443,171443] 9ns 0b|L0.?| "
- "L0.?[171444,200000] 9ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "Committing partition 1:"
- " Soft Deleting 27 files: L0.42, L0.44, L0.45, L0.49, L0.51, L0.52, L0.56, L0.58, L0.59, L0.63, L0.65, L0.66, L0.70, L0.72, L0.73, L0.77, L0.79, L0.80, L0.99, L0.103, L0.107, L0.111, L0.115, L0.119, L1.121, L1.122, L1.123"
- " Creating 55 files"
@ -1227,7 +1227,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L0.?[156351,160867] 7ns 208kb |--------L0.?--------| "
- "**** Simulation run 97, type=split(ReduceOverlap)(split_times=[198370]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.176[171444,200000] 7ns|-----------------------------------------L0.176-----------------------------------------|"
- "L0.172[171444,200000] 7ns|-----------------------------------------L0.172-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171444,198370] 7ns 1mb|---------------------------------------L0.?---------------------------------------| "
@ -1241,7 +1241,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L0.?[156351,160867] 8ns 208kb |--------L0.?--------| "
- "**** Simulation run 99, type=split(ReduceOverlap)(split_times=[198370]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.178[171444,200000] 8ns|-----------------------------------------L0.178-----------------------------------------|"
- "L0.176[171444,200000] 8ns|-----------------------------------------L0.176-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171444,198370] 8ns 1mb|---------------------------------------L0.?---------------------------------------| "
@ -1255,7 +1255,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L0.?[156351,160867] 9ns 208kb |--------L0.?--------| "
- "**** Simulation run 101, type=split(ReduceOverlap)(split_times=[198370]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.172[171444,200000] 9ns|-----------------------------------------L0.172-----------------------------------------|"
- "L0.178[171444,200000] 9ns|-----------------------------------------L0.178-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171444,198370] 9ns 1mb|---------------------------------------L0.?---------------------------------------| "
@ -1743,7 +1743,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "**** Simulation run 156, type=split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))(split_times=[170977]). 8 Input Files, 20mb total:"
- "L0 "
- "L0.190[160868,171442] 7ns 488kb |----L0.190----| "
- "L0.175[171443,171443] 7ns 0b |L0.175| "
- "L0.171[171443,171443] 7ns 0b |L0.171| "
- "L0.309[171444,185000] 7ns 625kb |------L0.309------| "
- "L0.310[185001,198370] 7ns 617kb |------L0.310------| "
- "L0.264[198371,200000] 7ns 75kb |L0.264|"
@ -1756,7 +1756,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L1.?[140564,170977] 7ns 10mb|--------------------L1.?--------------------| "
- "L1.?[170978,200000] 7ns 10mb |------------------L1.?-------------------| "
- "Committing partition 1:"
- " Soft Deleting 8 files: L0.175, L0.190, L0.264, L1.302, L1.306, L0.309, L0.310, L1.356"
- " Soft Deleting 8 files: L0.171, L0.190, L0.264, L1.302, L1.306, L0.309, L0.310, L1.356"
- " Creating 2 files"
- "**** Simulation run 157, type=split(ReduceOverlap)(split_times=[170977]). 1 Input Files, 488kb total:"
- "L0, all files 488kb "
@ -1924,7 +1924,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L0.266[156351,160867] 8ns 208kb |L0.266| "
- "L0.387[160868,170977] 8ns 466kb |---L0.387----| "
- "L0.388[170978,171442] 8ns 21kb |L0.388| "
- "L0.177[171443,171443] 8ns 0b |L0.177| "
- "L0.175[171443,171443] 8ns 0b |L0.175| "
- "L0.313[171444,185000] 8ns 625kb |------L0.313------| "
- "L0.314[185001,198370] 8ns 617kb |------L0.314------| "
- "L0.268[198371,200000] 8ns 75kb |L0.268|"
@ -1937,7 +1937,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L1.?[167315,194064] 8ns 10mb |-----------------L1.?-----------------| "
- "L1.?[194065,200000] 8ns 2mb |-L1.?-| "
- "Committing partition 1:"
- " Soft Deleting 13 files: L0.159, L0.177, L0.266, L0.268, L0.311, L0.312, L0.313, L0.314, L0.376, L1.385, L1.386, L0.387, L0.388"
- " Soft Deleting 13 files: L0.159, L0.175, L0.266, L0.268, L0.311, L0.312, L0.313, L0.314, L0.376, L1.385, L1.386, L0.387, L0.388"
- " Creating 3 files"
- "**** Simulation run 173, type=split(ReduceOverlap)(split_times=[167314]). 1 Input Files, 466kb total:"
- "L0, all files 466kb "
@ -2118,7 +2118,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L0 "
- "L0.423[167315,170977] 9ns 169kb|-L0.423-| "
- "L0.390[170978,171442] 9ns 21kb |L0.390| "
- "L0.171[171443,171443] 9ns 0b |L0.171| "
- "L0.177[171443,171443] 9ns 0b |L0.177| "
- "L0.317[171444,185000] 9ns 625kb |--------------L0.317---------------| "
- "L0.424[185001,194064] 9ns 418kb |--------L0.424--------| "
- "L0.425[194065,198370] 9ns 199kb |-L0.425--| "
@ -2131,7 +2131,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
- "L1.?[167315,191189] 9ns 10mb|-----------------------------L1.?------------------------------| "
- "L1.?[191190,200000] 9ns 4mb |---------L1.?---------| "
- "Committing partition 1:"
- " Soft Deleting 9 files: L0.171, L0.272, L0.317, L0.390, L1.420, L1.421, L0.423, L0.424, L0.425"
- " Soft Deleting 9 files: L0.177, L0.272, L0.317, L0.390, L1.420, L1.421, L0.423, L0.424, L0.425"
- " Creating 2 files"
- "**** Simulation run 189, type=split(ReduceOverlap)(split_times=[191189]). 1 Input Files, 418kb total:"
- "L0, all files 418kb "
@ -2819,11 +2819,11 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L0.?[171444,200000] 6ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 52, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.73[171443,200000] 9ns |-----------------------------------------L0.73------------------------------------------|"
- "L0.59[171443,200000] 7ns |-----------------------------------------L0.59------------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171443,171443] 9ns 0b|L0.?| "
- "L0.?[171444,200000] 9ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "L0.?[171443,171443] 7ns 0b|L0.?| "
- "L0.?[171444,200000] 7ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 53, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.80[171443,200000] 10ns|-----------------------------------------L0.80------------------------------------------|"
@ -2833,18 +2833,18 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L0.?[171444,200000] 10ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 54, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.59[171443,200000] 7ns |-----------------------------------------L0.59------------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171443,171443] 7ns 0b|L0.?| "
- "L0.?[171444,200000] 7ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 55, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.66[171443,200000] 8ns |-----------------------------------------L0.66------------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171443,171443] 8ns 0b|L0.?| "
- "L0.?[171444,200000] 8ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "**** Simulation run 55, type=split(HighL0OverlapTotalBacklog)(split_times=[171443]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.73[171443,200000] 9ns |-----------------------------------------L0.73------------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171443,171443] 9ns 0b|L0.?| "
- "L0.?[171444,200000] 9ns 1mb|-----------------------------------------L0.?------------------------------------------| "
- "Committing partition 1:"
- " Soft Deleting 27 files: L0.42, L0.44, L0.45, L0.49, L0.51, L0.52, L0.56, L0.58, L0.59, L0.63, L0.65, L0.66, L0.70, L0.72, L0.73, L0.77, L0.79, L0.80, L0.99, L0.103, L0.107, L0.111, L0.115, L0.119, L1.121, L1.122, L1.123"
- " Creating 55 files"
@ -3181,7 +3181,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L0.?[156351,160867] 7ns 208kb |--------L0.?--------| "
- "**** Simulation run 97, type=split(ReduceOverlap)(split_times=[198370]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.176[171444,200000] 7ns|-----------------------------------------L0.176-----------------------------------------|"
- "L0.172[171444,200000] 7ns|-----------------------------------------L0.172-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171444,198370] 7ns 1mb|---------------------------------------L0.?---------------------------------------| "
@ -3195,7 +3195,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L0.?[156351,160867] 8ns 208kb |--------L0.?--------| "
- "**** Simulation run 99, type=split(ReduceOverlap)(split_times=[198370]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.178[171444,200000] 8ns|-----------------------------------------L0.178-----------------------------------------|"
- "L0.176[171444,200000] 8ns|-----------------------------------------L0.176-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171444,198370] 8ns 1mb|---------------------------------------L0.?---------------------------------------| "
@ -3209,7 +3209,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L0.?[156351,160867] 9ns 208kb |--------L0.?--------| "
- "**** Simulation run 101, type=split(ReduceOverlap)(split_times=[198370]). 1 Input Files, 1mb total:"
- "L0, all files 1mb "
- "L0.172[171444,200000] 9ns|-----------------------------------------L0.172-----------------------------------------|"
- "L0.178[171444,200000] 9ns|-----------------------------------------L0.178-----------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 1mb total:"
- "L0 "
- "L0.?[171444,198370] 9ns 1mb|---------------------------------------L0.?---------------------------------------| "
@ -3697,7 +3697,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "**** Simulation run 156, type=split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))(split_times=[170977]). 8 Input Files, 20mb total:"
- "L0 "
- "L0.190[160868,171442] 7ns 488kb |----L0.190----| "
- "L0.175[171443,171443] 7ns 0b |L0.175| "
- "L0.171[171443,171443] 7ns 0b |L0.171| "
- "L0.309[171444,185000] 7ns 625kb |------L0.309------| "
- "L0.310[185001,198370] 7ns 617kb |------L0.310------| "
- "L0.264[198371,200000] 7ns 75kb |L0.264|"
@ -3710,7 +3710,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L1.?[140564,170977] 7ns 10mb|--------------------L1.?--------------------| "
- "L1.?[170978,200000] 7ns 10mb |------------------L1.?-------------------| "
- "Committing partition 1:"
- " Soft Deleting 8 files: L0.175, L0.190, L0.264, L1.302, L1.306, L0.309, L0.310, L1.356"
- " Soft Deleting 8 files: L0.171, L0.190, L0.264, L1.302, L1.306, L0.309, L0.310, L1.356"
- " Creating 2 files"
- "**** Simulation run 157, type=split(ReduceOverlap)(split_times=[170977]). 1 Input Files, 488kb total:"
- "L0, all files 488kb "
@ -3878,7 +3878,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L0.266[156351,160867] 8ns 208kb |L0.266| "
- "L0.387[160868,170977] 8ns 466kb |---L0.387----| "
- "L0.388[170978,171442] 8ns 21kb |L0.388| "
- "L0.177[171443,171443] 8ns 0b |L0.177| "
- "L0.175[171443,171443] 8ns 0b |L0.175| "
- "L0.313[171444,185000] 8ns 625kb |------L0.313------| "
- "L0.314[185001,198370] 8ns 617kb |------L0.314------| "
- "L0.268[198371,200000] 8ns 75kb |L0.268|"
@ -3891,7 +3891,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L1.?[167315,194064] 8ns 10mb |-----------------L1.?-----------------| "
- "L1.?[194065,200000] 8ns 2mb |-L1.?-| "
- "Committing partition 1:"
- " Soft Deleting 13 files: L0.159, L0.177, L0.266, L0.268, L0.311, L0.312, L0.313, L0.314, L0.376, L1.385, L1.386, L0.387, L0.388"
- " Soft Deleting 13 files: L0.159, L0.175, L0.266, L0.268, L0.311, L0.312, L0.313, L0.314, L0.376, L1.385, L1.386, L0.387, L0.388"
- " Creating 3 files"
- "**** Simulation run 173, type=split(ReduceOverlap)(split_times=[167314]). 1 Input Files, 466kb total:"
- "L0, all files 466kb "
@ -4072,7 +4072,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L0 "
- "L0.423[167315,170977] 9ns 169kb|-L0.423-| "
- "L0.390[170978,171442] 9ns 21kb |L0.390| "
- "L0.171[171443,171443] 9ns 0b |L0.171| "
- "L0.177[171443,171443] 9ns 0b |L0.177| "
- "L0.317[171444,185000] 9ns 625kb |--------------L0.317---------------| "
- "L0.424[185001,194064] 9ns 418kb |--------L0.424--------| "
- "L0.425[194065,198370] 9ns 199kb |-L0.425--| "
@ -4085,7 +4085,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
- "L1.?[167315,191189] 9ns 10mb|-----------------------------L1.?------------------------------| "
- "L1.?[191190,200000] 9ns 4mb |---------L1.?---------| "
- "Committing partition 1:"
- " Soft Deleting 9 files: L0.171, L0.272, L0.317, L0.390, L1.420, L1.421, L0.423, L0.424, L0.425"
- " Soft Deleting 9 files: L0.177, L0.272, L0.317, L0.390, L1.420, L1.421, L0.423, L0.424, L0.425"
- " Creating 2 files"
- "**** Simulation run 189, type=split(ReduceOverlap)(split_times=[191189]). 1 Input Files, 418kb total:"
- "L0, all files 418kb "


@ -37,7 +37,7 @@ use compactor::{
config::{CompactionType, Config, PartitionsSourceConfig},
hardcoded_components, Components, PanicDataFusionPlanner, PartitionInfo,
};
use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId};
use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId, TableInfo};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_util::config::register_iox_object_store;
use futures::TryStreamExt;
@ -575,7 +575,7 @@ impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
namespace_id: self.ns.namespace.id,
namespace_name: self.ns.namespace.name.clone(),
table: Arc::new(self.table.table.clone()),
table_schema: Arc::new(self.table.catalog_schema().await),
table_info: Arc::new(TableInfo::from(&self.table.table)),
sort_key: self.partition.partition.sort_key(),
partition_key: self.partition.partition.partition_key.clone(),
});


@ -330,7 +330,7 @@ pub struct NamespaceSchema {
/// the namespace id
pub id: NamespaceId,
/// the tables in the namespace by name
pub tables: BTreeMap<String, TableSchema>,
pub tables: BTreeMap<String, TableInfo>,
/// the number of columns per table this namespace allows
pub max_columns_per_table: usize,
/// The maximum number of tables permitted in this namespace.
@ -338,25 +338,34 @@ pub struct NamespaceSchema {
/// The retention period in ns.
/// None represents infinite duration (i.e. never drop data).
pub retention_period_ns: Option<i64>,
/// The optionally-specified partition template to use for writes in this namespace.
pub partition_template: Option<Arc<PartitionTemplate>>,
}
impl NamespaceSchema {
/// Create a new `NamespaceSchema`
pub fn new(
id: NamespaceId,
max_columns_per_table: i32,
max_tables: i32,
retention_period_ns: Option<i64>,
) -> Self {
impl From<&Namespace> for NamespaceSchema {
fn from(namespace: &Namespace) -> Self {
let &Namespace {
id,
retention_period_ns,
max_tables,
max_columns_per_table,
..
} = namespace;
Self {
id,
tables: BTreeMap::new(),
max_columns_per_table: max_columns_per_table as usize,
max_tables: max_tables as usize,
retention_period_ns,
// TODO: Store and retrieve PartitionTemplate from the database
partition_template: None,
}
}
}
impl NamespaceSchema {
/// Estimated Size in bytes including `self`.
pub fn size(&self) -> usize {
std::mem::size_of_val(self)
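
The four-argument constructor is gone in favor of a `From<&Namespace>` conversion that reads the limits straight off the catalog row. A sketch of the call-site change, assuming `namespace` is a catalog `Namespace`:

// Before (removed above):
// let schema = NamespaceSchema::new(
//     namespace.id,
//     namespace.max_columns_per_table,
//     namespace.max_tables,
//     namespace.retention_period_ns,
// );

// After: one conversion; the new field stays None until templates are
// persisted (see the TODO in the impl above).
let schema = NamespaceSchema::from(&namespace);
assert!(schema.partition_template.is_none());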
@ -379,6 +388,108 @@ pub struct Table {
pub name: String,
}
/// Useful table information to cache, including the table's partition template if any, and the
/// table's columns.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TableInfo {
table_schema: TableSchema,
/// This table's partition template
pub partition_template: Option<Arc<PartitionTemplate>>,
}
impl TableInfo {
/// Create new table info with the given table schema and no partition template specified.
pub fn new(table_schema: TableSchema) -> Self {
Self {
table_schema,
partition_template: None,
}
}
/// This table's ID
pub fn id(&self) -> TableId {
self.table_schema.id
}
/// This table's schema
pub fn schema(&self) -> &TableSchema {
&self.table_schema
}
/// This table's columns
pub fn columns(&self) -> &BTreeMap<String, ColumnSchema> {
&self.table_schema.columns
}
/// Mutable access to this table's columns
pub fn columns_mut(&mut self) -> &mut BTreeMap<String, ColumnSchema> {
&mut self.table_schema.columns
}
/// Add `col` to this table schema.
///
/// # Panics
///
/// This method panics if a column of the same name already exists in
/// `self`.
pub fn add_column(&mut self, col: &Column) {
let old = self
.table_schema
.columns
.insert(col.name.clone(), ColumnSchema::from(col));
assert!(old.is_none());
}
/// Estimated Size in bytes including `self`.
pub fn size(&self) -> usize {
size_of_val(self)
+ size_of_val(&self.partition_template)
+ self
.table_schema
.columns
.iter()
.map(|(k, v)| size_of_val(k) + k.capacity() + size_of_val(v))
.sum::<usize>()
}
/// Create `ID->name` map for columns.
pub fn column_id_map(&self) -> HashMap<ColumnId, &str> {
self.table_schema
.columns
.iter()
.map(|(name, c)| (c.id, name.as_str()))
.collect()
}
/// Return the set of column names for this table. Used in combination with a write operation's
/// column names to determine whether a write would exceed the max allowed columns.
pub fn column_names(&self) -> BTreeSet<&str> {
self.table_schema
.columns
.keys()
.map(|name| name.as_str())
.collect()
}
/// Return number of columns of the table
pub fn column_count(&self) -> usize {
self.table_schema.columns.len()
}
}
impl From<&Table> for TableInfo {
fn from(table: &Table) -> Self {
let &Table { id, .. } = table;
Self {
table_schema: TableSchema::new(id),
// TODO: Store and retrieve PartitionTemplate from the database
partition_template: None,
}
}
}
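
Taken together, `TableInfo` is a thin wrapper that keeps the schema private and exposes it through accessors. A minimal sketch of the new surface; `table_id`, `name`, and `column_schema` are hypothetical values:

use std::collections::BTreeMap;

let mut info = TableInfo::new(TableSchema {
    id: table_id,               // hypothetical TableId
    columns: BTreeMap::new(),
});
assert_eq!(info.column_count(), 0);
assert!(info.partition_template.is_none());

// Columns are reached through accessors instead of a public field:
info.columns_mut().insert(name, column_schema); // hypothetical values
let by_id = info.column_id_map();  // HashMap<ColumnId, &str>
let names = info.column_names();   // BTreeSet<&str>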
/// Column definitions for a table
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TableSchema {
@ -599,10 +710,10 @@ impl From<ColumnType> for InfluxColumnType {
}
}
impl TryFrom<TableSchema> for Schema {
impl TryFrom<&TableSchema> for Schema {
type Error = schema::builder::Error;
fn try_from(value: TableSchema) -> Result<Self, Self::Error> {
fn try_from(value: &TableSchema) -> Result<Self, Self::Error> {
let mut builder = SchemaBuilder::new();
for (column_name, column_schema) in &value.columns {
@ -614,6 +725,14 @@ impl TryFrom<TableSchema> for Schema {
}
}
impl TryFrom<TableSchema> for Schema {
type Error = schema::builder::Error;
fn try_from(value: TableSchema) -> Result<Self, Self::Error> {
Self::try_from(&value)
}
}
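
Because the borrowing impl does the work and the owned impl merely delegates, a cached schema no longer has to be cloned to build a `Schema`. Sketch, assuming `table_info: &TableInfo`:

// Borrows the cached TableSchema; no clone of the column map.
let schema: Schema = table_info
    .schema()
    .try_into()
    .expect("table info is broken");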
impl PartialEq<InfluxColumnType> for ColumnType {
fn eq(&self, got: &InfluxColumnType) -> bool {
match self {
@ -2918,13 +3037,24 @@ mod tests {
max_columns_per_table: 4,
max_tables: 42,
retention_period_ns: None,
partition_template: None,
};
let schema2 = NamespaceSchema {
id: NamespaceId::new(1),
tables: BTreeMap::from([(String::from("foo"), TableSchema::new(TableId::new(1)))]),
tables: BTreeMap::from([(
String::from("foo"),
TableInfo {
table_schema: TableSchema {
id: TableId::new(1),
columns: BTreeMap::new(),
},
partition_template: None,
},
)]),
max_columns_per_table: 4,
max_tables: 42,
retention_period_ns: None,
partition_template: None,
};
assert!(schema1.size() < schema2.size());
}


@ -2,7 +2,7 @@ use crate::{AggregateTSMMeasurement, AggregateTSMSchema};
use chrono::{format::StrftimeItems, offset::FixedOffset, DateTime, Duration};
use data_types::{
ColumnType, Namespace, NamespaceName, NamespaceSchema, OrgBucketMappingError, Partition,
PartitionKey, TableSchema,
PartitionKey, TableInfo,
};
use iox_catalog::interface::{
get_schema_by_name, CasFailure, Catalog, RepoCollection, SoftDeletedRows,
@ -126,10 +126,10 @@ where
.tables()
.create_or_get(measurement_name, iox_schema.id)
.await
.map(|t| TableSchema::new(t.id))?;
.map(|t| TableInfo::from(&t))?;
let time_col = repos
.columns()
.create_or_get("time", table.id, ColumnType::Time)
.create_or_get("time", table.id(), ColumnType::Time)
.await?;
table.add_column(&time_col);
table
@ -140,7 +140,7 @@ where
// fields and tags are both columns; tag is a special type of column.
// check that the schema has all these columns or update accordingly.
for tag in measurement.tags.values() {
match table.columns.get(tag.name.as_str()) {
match table.columns().get(tag.name.as_str()) {
Some(c) if c.is_tag() => {
// nothing to do, all good
}
@ -178,7 +178,7 @@ where
field.name, field_type, e,
))
})?);
match table.columns.get(field.name.as_str()) {
match table.columns().get(field.name.as_str()) {
Some(c) if c.matches_type(influx_column_type) => {
// nothing to do, all good
}
@ -210,7 +210,7 @@ where
// figure it's okay.
repos
.columns()
.create_or_get_many_unchecked(table.id, column_batch)
.create_or_get_many_unchecked(table.id(), column_batch)
.await?;
}
// create a partition for every day in the date range.
@ -223,7 +223,7 @@ where
// gets matched as `None` in the code below
let partition = repos
.partitions()
.create_or_get(partition_key, table.id)
.create_or_get(partition_key, table.id())
.await
.map_err(UpdateCatalogError::CatalogError)?;
// get the sort key from the partition, if it exists. create it or update it as
@ -384,10 +384,10 @@ mod tests {
.expect("got schema");
assert_eq!(iox_schema.tables.len(), 1);
let table = iox_schema.tables.get("cpu").expect("got table");
assert_eq!(table.columns.len(), 3); // one tag & one field, plus time
let tag = table.columns.get("host").expect("got tag");
assert_eq!(table.columns().len(), 3); // one tag & one field, plus time
let tag = table.columns().get("host").expect("got tag");
assert!(tag.is_tag());
let field = table.columns.get("usage").expect("got field");
let field = table.columns().get("usage").expect("got field");
assert_eq!(
field.column_type,
InfluxColumnType::Field(InfluxFieldType::Float)
@ -395,7 +395,7 @@ mod tests {
// check that the partitions were created and the sort keys are correct
let partitions = repos
.partitions()
.list_by_table_id(table.id)
.list_by_table_id(table.id())
.await
.expect("got partitions");
// number of days in the date range of the schema
@ -435,22 +435,22 @@ mod tests {
.tables()
.create_or_get("weather", namespace.id)
.await
.map(|t| TableSchema::new(t.id))
.map(|t| TableInfo::from(&t))
.expect("table created");
let time_col = txn
.columns()
.create_or_get("time", table.id, ColumnType::Time)
.create_or_get("time", table.id(), ColumnType::Time)
.await
.expect("column created");
table.add_column(&time_col);
let location_col = txn
.columns()
.create_or_get("city", table.id, ColumnType::Tag)
.create_or_get("city", table.id(), ColumnType::Tag)
.await
.expect("column created");
let temperature_col = txn
.columns()
.create_or_get("temperature", table.id, ColumnType::F64)
.create_or_get("temperature", table.id(), ColumnType::F64)
.await
.expect("column created");
table.add_column(&location_col);
@ -491,17 +491,17 @@ mod tests {
.expect("got schema");
assert_eq!(iox_schema.tables.len(), 1);
let table = iox_schema.tables.get("weather").expect("got table");
assert_eq!(table.columns.len(), 5); // two tags, two fields, plus time
let tag1 = table.columns.get("city").expect("got tag");
assert_eq!(table.columns().len(), 5); // two tags, two fields, plus time
let tag1 = table.columns().get("city").expect("got tag");
assert!(tag1.is_tag());
let tag2 = table.columns.get("country").expect("got tag");
let tag2 = table.columns().get("country").expect("got tag");
assert!(tag2.is_tag());
let field1 = table.columns.get("temperature").expect("got field");
let field1 = table.columns().get("temperature").expect("got field");
assert_eq!(
field1.column_type,
InfluxColumnType::Field(InfluxFieldType::Float)
);
let field2 = table.columns.get("humidity").expect("got field");
let field2 = table.columns().get("humidity").expect("got field");
assert_eq!(
field2.column_type,
InfluxColumnType::Field(InfluxFieldType::Float)
@ -527,17 +527,17 @@ mod tests {
.tables()
.create_or_get("weather", namespace.id)
.await
.map(|t| TableSchema::new(t.id))
.map(|t| TableInfo::from(&t))
.expect("table created");
let time_col = txn
.columns()
.create_or_get("time", table.id, ColumnType::Time)
.create_or_get("time", table.id(), ColumnType::Time)
.await
.expect("column created");
table.add_column(&time_col);
let temperature_col = txn
.columns()
.create_or_get("temperature", table.id, ColumnType::F64)
.create_or_get("temperature", table.id(), ColumnType::F64)
.await
.expect("column created");
table.add_column(&temperature_col);
@ -592,17 +592,17 @@ mod tests {
.tables()
.create_or_get("weather", namespace.id)
.await
.map(|t| TableSchema::new(t.id))
.map(|t| TableInfo::from(&t))
.expect("table created");
let time_col = txn
.columns()
.create_or_get("time", table.id, ColumnType::Time)
.create_or_get("time", table.id(), ColumnType::Time)
.await
.expect("column created");
table.add_column(&time_col);
let temperature_col = txn
.columns()
.create_or_get("temperature", table.id, ColumnType::F64)
.create_or_get("temperature", table.id(), ColumnType::F64)
.await
.expect("column created");
table.add_column(&temperature_col);
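
These test setups repeat one migration pattern: wrap the freshly created catalog table in a `TableInfo`, then reach the id through the accessor when registering columns. Consolidated sketch, assuming `repos` is a `RepoCollection` and `namespace` already exists:

let mut table = repos
    .tables()
    .create_or_get("weather", namespace.id)
    .await
    .map(|t| TableInfo::from(&t))?;

let time_col = repos
    .columns()
    .create_or_get("time", table.id(), ColumnType::Time)
    .await?;
table.add_column(&time_col);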


@ -215,12 +215,14 @@ where
self.namespaces
.insert(
ns.id,
NamespaceSchema::new(
ns.id,
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
iox_catalog::DEFAULT_MAX_TABLES,
NamespaceSchema {
id: ns.id,
tables: Default::default(),
max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE as usize,
max_tables: iox_catalog::DEFAULT_MAX_TABLES as usize,
retention_period_ns,
),
partition_template: None,
},
)
.is_none(),
"namespace must not be duplicated"


@ -4,7 +4,7 @@ use async_trait::async_trait;
use data_types::{
Column, ColumnSchema, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceSchema,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
SkippedCompaction, Table, TableId, TableSchema, Timestamp,
SkippedCompaction, Table, TableId, TableInfo, TableSchema, Timestamp,
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
@ -588,21 +588,17 @@ where
let columns = repos.columns().list_by_namespace_id(namespace.id).await?;
let tables = repos.tables().list_by_namespace_id(namespace.id).await?;
let mut namespace = NamespaceSchema::new(
namespace.id,
namespace.max_columns_per_table,
namespace.max_tables,
namespace.retention_period_ns,
);
let mut namespace = NamespaceSchema::from(&namespace);
let mut table_id_to_schema = BTreeMap::new();
let mut table_id_to_info = BTreeMap::new();
for t in tables {
table_id_to_schema.insert(t.id, (t.name, TableSchema::new(t.id)));
let table_info = TableInfo::from(&t);
table_id_to_info.insert(t.id, (t.name, table_info));
}
for c in columns {
let (_, t) = table_id_to_schema.get_mut(&c.table_id).unwrap();
t.columns.insert(
let (_, t) = table_id_to_info.get_mut(&c.table_id).unwrap();
t.columns_mut().insert(
c.name,
ColumnSchema {
id: c.id,
@ -611,7 +607,7 @@ where
);
}
for (_, (table_name, schema)) in table_id_to_schema {
for (_, (table_name, schema)) in table_id_to_info {
namespace.tables.insert(table_name, schema);
}
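
Once assembled, every entry in `NamespaceSchema::tables` is a `TableInfo`, so consumers read columns and the template through its accessors. A small sketch of walking the result, assuming `ns` came from `get_schema_by_name` above:

fn describe(ns: &NamespaceSchema) {
    for (name, info) in &ns.tables {
        println!(
            "table {name}: {} columns, template: {:?}",
            info.column_count(),
            info.partition_template,
        );
    }
}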
@ -705,23 +701,23 @@ pub async fn list_schemas(
});
// A set of tables within a single namespace.
type NamespaceTables = BTreeMap<String, TableSchema>;
type NamespaceTables = BTreeMap<String, TableInfo>;
let mut joined = HashMap::<NamespaceId, NamespaceTables>::default();
for column in columns {
// Resolve the table this column references
let table = tables.get(&column.table_id).expect("no table for column");
let table_schema = joined
let table_info = joined
// Find or create a record in the joined <NamespaceId, Tables> map
// for this namespace ID.
.entry(table.namespace_id)
.or_default()
// Fetch the schema record for this table, or create an empty one.
.entry(table.name.clone())
.or_insert_with(|| TableSchema::new(column.table_id));
.or_insert_with(|| TableInfo::from(table));
table_schema.add_column(&column);
table_info.add_column(&column);
}
// The table map is no longer needed - immediately reclaim the memory.
@ -739,12 +735,8 @@ pub async fn list_schemas(
// The catalog call explicitly asked for no soft deleted records.
assert!(v.deleted_at.is_none());
let mut ns = NamespaceSchema::new(
v.id,
v.max_columns_per_table,
v.max_tables,
v.retention_period_ns,
);
let mut ns = NamespaceSchema::from(&v);
ns.tables = joined.remove(&v.id)?;
Some((v, ns))
});
@ -3049,12 +3041,7 @@ pub(crate) mod test_helpers {
let batches = mutable_batch_lp::lines_to_batches(lines, 42).unwrap();
let batches = batches.iter().map(|(table, batch)| (table.as_str(), batch));
let ns = NamespaceSchema::new(
namespace.id,
namespace.max_columns_per_table,
namespace.max_tables,
namespace.retention_period_ns,
);
let ns = NamespaceSchema::from(&namespace);
let schema = validate_or_insert_schema(batches, &ns, repos)
.await


@ -14,7 +14,7 @@
)]
use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result};
use data_types::{ColumnType, NamespaceSchema, TableSchema};
use data_types::{ColumnType, NamespaceSchema, TableInfo};
use mutable_batch::MutableBatch;
use std::{borrow::Cow, collections::HashMap};
use thiserror::Error;
@ -118,12 +118,12 @@ where
.tables()
.create_or_get(table_name, schema.id)
.await
.map(|t| TableSchema::new(t.id))?;
.map(|t| TableInfo::from(&t))?;
// Always add a time column to all new tables.
let time_col = repos
.columns()
.create_or_get(TIME_COLUMN, table.id, ColumnType::Time)
.create_or_get(TIME_COLUMN, table.id(), ColumnType::Time)
.await?;
table.add_column(&time_col);
@ -152,7 +152,7 @@ where
// If it does, validate it. If it does not exist, create it and insert
// it into the cached schema.
match table.columns.get(name.as_str()) {
match table.columns().get(name.as_str()) {
Some(existing) if existing.matches_type(col.influx_type()) => {
// No action is needed as the column matches the existing column
// schema.
@ -182,7 +182,7 @@ where
if !column_batch.is_empty() {
repos
.columns()
.create_or_get_many_unchecked(table.id, column_batch)
.create_or_get_many_unchecked(table.id(), column_batch)
.await?
.into_iter()
.for_each(|c| table.to_mut().add_column(&c));
@ -242,12 +242,7 @@ mod tests {
.await
.unwrap();
let schema = NamespaceSchema::new(
namespace.id,
namespace.max_columns_per_table,
namespace.max_tables,
namespace.retention_period_ns,
);
let schema = NamespaceSchema::from(&namespace);
// Apply all the lp literals as individual writes, feeding
// the result of one validation into the next to drive
@ -295,7 +290,7 @@ mod tests {
.iter()
.map(|(table, table_schema)| {
let desired_cols = table_schema
.columns
.columns()
.iter()
.map(|(column, column_schema)| (column.clone(), column_schema.column_type))
.collect::<BTreeMap<_, _>>();


@ -13,7 +13,7 @@ use cache_system::{
loader::{metrics::MetricsLoader, FunctionLoader},
resource_consumption::FunctionEstimator,
};
use data_types::{ColumnId, NamespaceId, NamespaceSchema, TableId, TableSchema};
use data_types::{ColumnId, NamespaceId, NamespaceSchema, TableId, TableInfo};
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use iox_time::TimeProvider;
use schema::Schema;
@ -238,17 +238,20 @@ impl CachedTable {
}
}
impl From<TableSchema> for CachedTable {
fn from(table: TableSchema) -> Self {
impl From<TableInfo> for CachedTable {
fn from(table: TableInfo) -> Self {
let mut column_id_map: HashMap<ColumnId, Arc<str>> = table
.columns
.columns()
.iter()
.map(|(name, c)| (c.id, Arc::from(name.clone())))
.collect();
column_id_map.shrink_to_fit();
let id = table.id;
let schema: Schema = table.try_into().expect("Catalog table schema broken");
let id = table.id();
let schema: Schema = table
.schema()
.try_into()
.expect("Catalog table schema broken");
let mut column_id_map_rev: HashMap<Arc<str>, ColumnId> = column_id_map
.iter()


@ -32,8 +32,8 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
)
.await
.unwrap();
let schema = catalog_schema.tables.remove(&table.table.name).unwrap();
let schema = Schema::try_from(schema).unwrap();
let table_info = catalog_schema.tables.remove(&table.table.name).unwrap();
let schema = Schema::try_from(table_info.schema()).unwrap();
let namespace_name = Arc::from(table.namespace.namespace.name.as_str());

View File

@ -52,7 +52,13 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
for i in 0..65_000 {
let write = lp_to_writes(format!("{}{}", i + 10_000_000, generate_lp(1, 1)).as_str());
let _ = runtime().block_on(validator.write(&NAMESPACE, NamespaceId::new(42), write, None));
let _ = runtime().block_on(validator.write(
&NAMESPACE,
NamespaceId::new(42),
None,
write,
None,
));
}
let write = lp_to_writes(&generate_lp(tables, columns_per_table));
@ -64,7 +70,7 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
group.bench_function(format!("{tables}x{columns_per_table}"), |b| {
b.to_async(runtime()).iter_batched(
|| write.clone(),
|write| validator.write(&NAMESPACE, NamespaceId::new(42), write, None),
|write| validator.write(&NAMESPACE, NamespaceId::new(42), None, write, None),
BatchSize::SmallInput,
);
});

View File

@ -1,5 +1,6 @@
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use std::sync::Arc;
use trace::ctx::SpanContext;
use super::{DmlError, DmlHandler};
@ -59,17 +60,30 @@ where
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
namespace_partition_template: Option<Arc<PartitionTemplate>>,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
let output = self
.first
.write(namespace, namespace_id, input, span_ctx.clone())
.write(
namespace,
namespace_id,
namespace_partition_template.clone(),
input,
span_ctx.clone(),
)
.await
.map_err(Into::into)?;
self.second
.write(namespace, namespace_id, output, span_ctx)
.write(
namespace,
namespace_id,
namespace_partition_template,
output,
span_ctx,
)
.await
.map_err(Into::into)
}

View File

@ -1,7 +1,7 @@
use std::{fmt::Debug, marker::PhantomData};
use std::{fmt::Debug, marker::PhantomData, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use futures::{stream::FuturesUnordered, TryStreamExt};
use trace::ctx::SpanContext;
@ -50,6 +50,7 @@ where
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
namespace_partition_template: Option<Arc<PartitionTemplate>>,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -58,9 +59,16 @@ where
.map(|v| {
let namespace = namespace.clone();
let span_ctx = span_ctx.clone();
let namespace_partition_template = namespace_partition_template.clone();
async move {
self.inner
.write(&namespace, namespace_id, v, span_ctx)
.write(
&namespace,
namespace_id,
namespace_partition_template,
v,
span_ctx,
)
.await
}
})
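The repeated `namespace_partition_template.clone()` above is cheap: the template travels as an `Option<Arc<_>>`, so each per-write clone only bumps a reference count. A small sketch of the same fan-out pattern (tokio is an assumption here, used only to stand in for the async machinery):

use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Stand-in template: an Arc'd list of template parts.
    let template = Some(Arc::new(vec!["%Y".to_string()]));

    let tasks: Vec<_> = (0..4)
        .map(|i| {
            // Clone per concurrent write: a refcount bump, not a deep copy.
            let template = template.clone();
            tokio::spawn(async move {
                format!("write {i} sees {} part(s)", template.map(|t| t.len()).unwrap_or(0))
            })
        })
        .collect();

    for task in tasks {
        println!("{}", task.await.unwrap());
    }
}
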

View File

@ -1,7 +1,8 @@
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
use std::sync::Arc;
use trace::{ctx::SpanContext, span::SpanRecorder};
use super::DmlHandler;
@ -53,6 +54,7 @@ where
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
namespace_partition_template: Option<Arc<PartitionTemplate>>,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -64,7 +66,13 @@ where
let res = self
.inner
.write(namespace, namespace_id, input, span_ctx)
.write(
namespace,
namespace_id,
namespace_partition_template,
input,
span_ctx,
)
.await;
// Avoid exploding if time goes backwards - simply drop the measurement
@ -148,7 +156,7 @@ mod tests {
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &metrics, handler);
decorator
.write(&ns, NamespaceId::new(42), (), Some(span))
.write(&ns, NamespaceId::new(42), None, (), Some(span))
.await
.expect("inner handler configured to succeed");
@ -171,7 +179,7 @@ mod tests {
let decorator = InstrumentationDecorator::new(HANDLER_NAME, &metrics, handler);
let err = decorator
.write(&ns, NamespaceId::new(42), (), Some(span))
.write(&ns, NamespaceId::new(42), None, (), Some(span))
.await
.expect_err("inner handler configured to fail");

View File

@ -1,7 +1,7 @@
use std::{collections::VecDeque, fmt::Debug};
use std::{collections::VecDeque, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use parking_lot::Mutex;
use trace::ctx::SpanContext;
@ -14,6 +14,7 @@ pub enum MockDmlHandlerCall<W> {
Write {
namespace: String,
namespace_id: NamespaceId,
namespace_partition_template: Option<Arc<PartitionTemplate>>,
write_input: W,
},
}
@ -87,6 +88,7 @@ where
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
namespace_partition_template: Option<Arc<PartitionTemplate>>,
write_input: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -95,6 +97,7 @@ where
MockDmlHandlerCall::Write {
namespace: namespace.into(),
namespace_id,
namespace_partition_template,
write_input,
},
write_return

View File

@ -1,9 +1,9 @@
//! A NOP implementation of [`DmlHandler`].
use std::{fmt::Debug, marker::PhantomData};
use std::{fmt::Debug, marker::PhantomData, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use observability_deps::tracing::*;
use trace::ctx::SpanContext;
@ -32,6 +32,7 @@ where
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
_namespace_partition_template: Option<Arc<PartitionTemplate>>,
batches: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {

View File

@ -3,6 +3,7 @@ use data_types::{NamespaceId, NamespaceName, PartitionKey, PartitionTemplate, Ta
use hashbrown::HashMap;
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
use observability_deps::tracing::*;
use std::sync::Arc;
use thiserror::Error;
use trace::ctx::SpanContext;
@ -48,14 +49,16 @@ impl<T> Partitioned<T> {
/// occurs during partitioning.
#[derive(Debug)]
pub struct Partitioner {
partition_template: PartitionTemplate,
partition_template: Arc<PartitionTemplate>,
}
impl Partitioner {
/// Initialise a new [`Partitioner`], splitting writes according to the
/// specified [`PartitionTemplate`].
pub fn new(partition_template: PartitionTemplate) -> Self {
Self { partition_template }
Self {
partition_template: Arc::new(partition_template),
}
}
}
@ -63,14 +66,15 @@ impl Partitioner {
impl DmlHandler for Partitioner {
type WriteError = PartitionError;
type WriteInput = HashMap<TableId, (String, MutableBatch)>;
type WriteOutput = Vec<Partitioned<Self::WriteInput>>;
type WriteInput = HashMap<TableId, (String, Option<Arc<PartitionTemplate>>, MutableBatch)>;
type WriteOutput = Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>;
/// Partition the per-table [`MutableBatch`].
async fn write(
&self,
_namespace: &NamespaceName<'static>,
_namespace_id: NamespaceId,
namespace_partition_template: Option<Arc<PartitionTemplate>>,
batch: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -78,11 +82,20 @@ impl DmlHandler for Partitioner {
let mut partitions: HashMap<PartitionKey, HashMap<_, (String, MutableBatch)>> =
HashMap::default();
for (table_id, (table_name, batch)) in batch {
for (table_id, (table_name, table_partition_template, batch)) in batch {
// Partition the table batch according to the configured partition
// template and write it into the partition-keyed map.
// If the table has a partition template, use that. Otherwise, if the namespace has a
// partition template, use that. If neither the table nor the namespace has a template,
// use the partitioner's template.
let partition_template = table_partition_template
.as_ref()
.or(namespace_partition_template.as_ref())
.unwrap_or(&self.partition_template);
for (partition_key, partition_payload) in
PartitionWrite::partition(&batch, &self.partition_template)
PartitionWrite::partition(&batch, partition_template)
{
let partition = partitions.entry(partition_key).or_default();
let table_batch = partition
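Aside: the fallback order implemented above is a plain `Option` chain, table template first, then namespace template, then the partitioner's default. A minimal self-contained sketch of the same rule, using a simplified stand-in for `PartitionTemplate` (the field here is illustrative, not the crate's real API):

use std::sync::Arc;

// Simplified stand-in for data_types::PartitionTemplate.
struct PartitionTemplate {
    parts: Vec<String>,
}

// Resolve which template applies to one table batch: a table-level template
// overrides the namespace-level one, which overrides the default.
fn resolve<'a>(
    table: Option<&'a Arc<PartitionTemplate>>,
    namespace: Option<&'a Arc<PartitionTemplate>>,
    default: &'a Arc<PartitionTemplate>,
) -> &'a Arc<PartitionTemplate> {
    table.or(namespace).unwrap_or(default)
}

fn main() {
    let default = Arc::new(PartitionTemplate { parts: vec!["%Y-%m-%d".into()] });
    let ns = Some(Arc::new(PartitionTemplate { parts: vec!["%Y".into()] }));

    // No table-level template: the namespace template wins.
    assert_eq!(resolve(None, ns.as_ref(), &default).parts, vec!["%Y".to_string()]);
    // Neither table nor namespace template: the partitioner's default applies.
    assert_eq!(resolve(None, None, &default).parts, vec!["%Y-%m-%d".to_string()]);
}
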
@ -107,17 +120,21 @@ impl DmlHandler for Partitioner {
mod tests {
use assert_matches::assert_matches;
use data_types::TemplatePart;
use super::*;
// Parse `lp` into a table-keyed MutableBatch map.
pub(crate) fn lp_to_writes(lp: &str) -> HashMap<TableId, (String, MutableBatch)> {
pub(crate) fn lp_to_writes(
lp: &str,
) -> HashMap<TableId, (String, Option<Arc<PartitionTemplate>>, MutableBatch)> {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
.expect("failed to build test writes from LP");
writes
.into_iter()
.enumerate()
.map(|(i, (name, data))| (TableId::new(i as _), (name, data)))
.map(|(i, (name, data))| (TableId::new(i as _), (name, None, data)))
.collect()
}
@ -142,7 +159,13 @@ mod tests {
let writes = lp_to_writes($lp);
let handler_ret = partitioner.write(&ns, NamespaceId::new(42), writes, None).await;
let handler_ret = partitioner.write(
&ns,
NamespaceId::new(42),
None,
writes,
None
).await;
assert_matches!(handler_ret, $($want_handler_ret)+);
// Check the partition -> table mapping.
@ -182,7 +205,9 @@ mod tests {
#[allow(unused_mut)]
let mut want_writes: HashMap<PartitionKey, _> = Default::default();
$(
let mut want: Vec<String> = $want_tables.into_iter().map(|t| t.to_string()).collect();
let mut want: Vec<String> = $want_tables.into_iter()
.map(|t| t.to_string())
.collect();
want.sort();
want_writes.insert(PartitionKey::from($partition_key), want);
)*
@ -282,4 +307,250 @@ mod tests {
],
want_handler_ret = Ok(_)
);
#[tokio::test]
async fn test_write_namespace_partition_template() {
let partitioner = Partitioner::new(PartitionTemplate::default());
let ns = NamespaceName::new("bananas").expect("valid db name");
let namespace_partition_template = Some(Arc::new(PartitionTemplate {
parts: vec![
TemplatePart::TimeFormat("%Y".to_string()),
TemplatePart::Column("tag1".to_string()),
TemplatePart::Column("nonanas".to_string()),
],
}));
let writes = lp_to_writes(
"
bananas,tag1=A,tag2=C val=42i 1\n\
platanos,tag1=B,tag2=C value=42i 1465839830100400200\n\
platanos,tag1=A,tag2=D value=42i 1\n\
bananas,tag1=B,tag2=D value=42i 1465839830100400200\n\
bananas,tag1=A,tag2=D value=42i 1465839830100400200\n\
",
);
let handler_ret = partitioner
.write(
&ns,
NamespaceId::new(42),
namespace_partition_template,
writes,
None,
)
.await;
// Check the partition -> table mapping.
let got = handler_ret
.unwrap_or_default()
.into_iter()
.map(|partition| {
// Extract the table names in this partition
let mut tables = partition
.payload
.values()
.map(|v| v.0.clone())
.collect::<Vec<String>>();
tables.sort();
(partition.key, tables)
})
.collect::<HashMap<_, _>>();
let expected = HashMap::from([
(
PartitionKey::from("2016-tag1_B-nonanas"),
vec!["bananas".into(), "platanos".into()],
),
(
PartitionKey::from("1970-tag1_A-nonanas"),
vec!["bananas".into(), "platanos".into()],
),
(
PartitionKey::from("2016-tag1_A-nonanas"),
vec!["bananas".into()],
),
]);
pretty_assertions::assert_eq!(expected, got);
}
#[tokio::test]
async fn test_write_namespace_and_table_partition_template() {
let partitioner = Partitioner::new(PartitionTemplate::default());
let ns = NamespaceName::new("bananas").expect("valid db name");
// Specify this, but the table partition template will take precedence for bananas.
let namespace_partition_template = Some(Arc::new(PartitionTemplate {
parts: vec![
TemplatePart::TimeFormat("%Y".to_string()),
TemplatePart::Column("tag1".to_string()),
TemplatePart::Column("nonanas".to_string()),
],
}));
let bananas_table_template = Some(Arc::new(PartitionTemplate {
parts: vec![
TemplatePart::Column("oranges".to_string()),
TemplatePart::TimeFormat("%Y-%m".to_string()),
TemplatePart::Column("tag2".to_string()),
],
}));
let lp = "
bananas,tag1=A,tag2=C val=42i 1\n\
platanos,tag1=B,tag2=C value=42i 1465839830100400200\n\
platanos,tag1=A,tag2=D value=42i 1\n\
bananas,tag1=B,tag2=D value=42i 1465839830100400200\n\
bananas,tag1=A,tag2=D value=42i 1465839830100400200\n\
";
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
.expect("failed to build test writes from LP");
let writes = writes
.into_iter()
.enumerate()
.map(|(i, (name, data))| {
let table_partition_template = match name.as_str() {
"bananas" => bananas_table_template.clone(),
_ => None,
};
(TableId::new(i as _), (name, table_partition_template, data))
})
.collect();
let handler_ret = partitioner
.write(
&ns,
NamespaceId::new(42),
namespace_partition_template,
writes,
None,
)
.await;
// Check the partition -> table mapping.
let got = handler_ret
.unwrap_or_default()
.into_iter()
.map(|partition| {
// Extract the table names in this partition
let mut tables = partition
.payload
.values()
.map(|v| v.0.clone())
.collect::<Vec<String>>();
tables.sort();
(partition.key, tables)
})
.collect::<HashMap<_, _>>();
let expected = HashMap::from([
(
PartitionKey::from("oranges-1970-01-tag2_C"),
vec!["bananas".into()],
),
(
PartitionKey::from("oranges-2016-06-tag2_D"),
vec!["bananas".into()],
),
(
PartitionKey::from("1970-tag1_A-nonanas"),
vec!["platanos".into()],
),
(
PartitionKey::from("2016-tag1_B-nonanas"),
vec!["platanos".into()],
),
]);
pretty_assertions::assert_eq!(expected, got);
}
#[tokio::test]
async fn test_write_only_table_partition_template() {
let partitioner = Partitioner::new(PartitionTemplate::default());
let ns = NamespaceName::new("bananas").expect("valid db name");
// No namespace partition template means the platanos table will fall back to the default.
let namespace_partition_template = None;
let bananas_table_template = Some(Arc::new(PartitionTemplate {
parts: vec![
TemplatePart::Column("oranges".to_string()),
TemplatePart::TimeFormat("%Y-%m".to_string()),
TemplatePart::Column("tag2".to_string()),
],
}));
let lp = "
bananas,tag1=A,tag2=C val=42i 1\n\
platanos,tag1=B,tag2=C value=42i 1465839830100400200\n\
platanos,tag1=A,tag2=D value=42i 1\n\
bananas,tag1=B,tag2=D value=42i 1465839830100400200\n\
bananas,tag1=A,tag2=D value=42i 1465839830100400200\n\
";
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
.expect("failed to build test writes from LP");
let writes = writes
.into_iter()
.enumerate()
.map(|(i, (name, data))| {
let table_partition_template = match name.as_str() {
"bananas" => bananas_table_template.clone(),
_ => None,
};
(TableId::new(i as _), (name, table_partition_template, data))
})
.collect();
let handler_ret = partitioner
.write(
&ns,
NamespaceId::new(42),
namespace_partition_template,
writes,
None,
)
.await;
// Check the partition -> table mapping.
let got = handler_ret
.unwrap_or_default()
.into_iter()
.map(|partition| {
// Extract the table names in this partition
let mut tables = partition
.payload
.values()
.map(|v| v.0.clone())
.collect::<Vec<String>>();
tables.sort();
(partition.key, tables)
})
.collect::<HashMap<_, _>>();
let expected = HashMap::from([
(
PartitionKey::from("oranges-1970-01-tag2_C"),
vec!["bananas".into()],
),
(
PartitionKey::from("oranges-2016-06-tag2_D"),
vec!["bananas".into()],
),
(PartitionKey::from("1970-01-01"), vec!["platanos".into()]),
(PartitionKey::from("2016-06-13"), vec!["platanos".into()]),
]);
pretty_assertions::assert_eq!(expected, got);
}
}
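The expected keys in these tests suggest the formatting rules: a `TimeFormat` part renders the row's timestamp, and a `Column` part renders `name_value` when the row carries that tag and the bare column name when it does not. A toy renderer reproducing those shapes (an approximation only; the real formatting lives in the `mutable_batch` partitioning code, and the timestamp handling here is deliberately crude):

// Toy partition-key renderer matching the shapes asserted above.
enum TemplatePart {
    TimeFormat(String),
    Column(String),
}

fn render_key(parts: &[TemplatePart], year: i32, tags: &[(&str, &str)]) -> String {
    parts
        .iter()
        .map(|p| match p {
            // Stand-in for real strftime-style formatting of the row time.
            TemplatePart::TimeFormat(_) => year.to_string(),
            // "name_value" if the row has the tag, bare "name" otherwise.
            TemplatePart::Column(name) => match tags.iter().find(|(n, _)| *n == name.as_str()) {
                Some((n, v)) => format!("{n}_{v}"),
                None => name.clone(),
            },
        })
        .collect::<Vec<_>>()
        .join("-")
}

fn main() {
    let parts = vec![
        TemplatePart::TimeFormat("%Y".to_string()),
        TemplatePart::Column("tag1".to_string()),
        TemplatePart::Column("nonanas".to_string()),
    ];
    // Mirrors the "2016-tag1_B-nonanas" key asserted in the tests above.
    assert_eq!(render_key(&parts, 2016, &[("tag1", "B")]), "2016-tag1_B-nonanas");
}
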

View File

@ -1,9 +1,10 @@
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use hashbrown::HashMap;
use iox_time::{SystemProvider, TimeProvider};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use std::sync::Arc;
use thiserror::Error;
use trace::ctx::SpanContext;
@ -62,6 +63,7 @@ where
&self,
namespace: &NamespaceName<'static>,
_namespace_id: NamespaceId,
_namespace_partition_template: Option<Arc<PartitionTemplate>>,
batch: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -129,7 +131,7 @@ mod tests {
let writes = lp_to_writes(&line);
let result = handler
.write(&NAMESPACE, NamespaceId::new(42), writes, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes, None)
.await;
// no error means the time is inside the retention period
@ -155,7 +157,7 @@ mod tests {
let writes = lp_to_writes(&line);
let result = handler
.write(&NAMESPACE, NamespaceId::new(42), writes, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes, None)
.await;
// error means the time is outside the retention period
@ -191,7 +193,7 @@ mod tests {
let writes = lp_to_writes(&lp);
let result = handler
.write(&NAMESPACE, NamespaceId::new(42), writes, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes, None)
.await;
// error means the time is outside the retention period
@ -227,7 +229,7 @@ mod tests {
let writes = lp_to_writes(&lp);
let result = handler
.write(&NAMESPACE, NamespaceId::new(42), writes, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes, None)
.await;
// error means the time is outside the retention period

View File

@ -16,7 +16,7 @@ use self::{
use super::{DmlHandler, Partitioned};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName, TableId};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate, TableId};
use dml::{DmlMeta, DmlWrite};
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use hashbrown::HashMap;
@ -177,6 +177,7 @@ where
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
_namespace_partition_template: Option<Arc<PartitionTemplate>>,
writes: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, RpcWriteError> {
@ -385,6 +386,7 @@ mod tests {
.write(
&NamespaceName::new(NAMESPACE_NAME).unwrap(),
NAMESPACE_ID,
None,
input,
None,
)
@ -419,6 +421,7 @@ mod tests {
.write(
&NamespaceName::new(NAMESPACE_NAME).unwrap(),
NAMESPACE_ID,
None,
input,
None,
)
@ -481,6 +484,7 @@ mod tests {
.write(
&NamespaceName::new(NAMESPACE_NAME).unwrap(),
NAMESPACE_ID,
None,
input,
None,
)
@ -549,6 +553,7 @@ mod tests {
.write(
&NamespaceName::new(NAMESPACE_NAME).unwrap(),
NAMESPACE_ID,
None,
input,
None,
)

View File

@ -1,7 +1,7 @@
use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName, NamespaceSchema, TableId};
use data_types::{NamespaceId, NamespaceName, NamespaceSchema, PartitionTemplate, TableId};
use hashbrown::HashMap;
use iox_catalog::{
interface::{Catalog, Error as CatalogError},
@ -148,8 +148,8 @@ where
// Accepts a map of TableName -> MutableBatch
type WriteInput = HashMap<String, MutableBatch>;
// And returns a map of TableId -> (TableName, MutableBatch)
type WriteOutput = HashMap<TableId, (String, MutableBatch)>;
// And returns a map of TableId -> (TableName, OptionalTablePartitionTemplate, MutableBatch)
type WriteOutput = HashMap<TableId, (String, Option<Arc<PartitionTemplate>>, MutableBatch)>;
/// Validate the schema of all the writes in `batches`.
///
@ -170,6 +170,7 @@ where
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
_namespace_partition_template: Option<Arc<PartitionTemplate>>,
batches: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
@ -297,14 +298,16 @@ where
}
};
// Map the "TableName -> Data" into "TableId -> (TableName, Data)" for
// downstream handlers.
// Map the "TableName -> Data" into "TableId -> (TableName, OptionalTablePartitionTemplate,
// Data)" for downstream handlers.
let batches = batches
.into_iter()
.map(|(name, data)| {
let id = latest_schema.tables.get(&name).unwrap().id;
let table = latest_schema.tables.get(&name).unwrap();
let id = table.id();
let table_partition_template = table.partition_template.clone();
(id, (name, data))
(id, (name, table_partition_template, data))
})
.collect();
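In isolation, the re-keying step above looks like the following hedged sketch (stand-in types only): each validated table batch is keyed by its catalog ID and carries the table's optional template downstream to the partitioner.

use std::{collections::HashMap, sync::Arc};

// Illustrative stand-ins for the catalog types referenced above.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TableId(i64);
struct PartitionTemplate { parts: Vec<String> }
struct MutableBatch; // write payload placeholder

struct TableInfo {
    id: TableId,
    partition_template: Option<Arc<PartitionTemplate>>,
}

// Re-key "name -> batch" by table ID, carrying each table's optional
// template along so the partitioner downstream can apply it per table.
fn rekey(
    batches: HashMap<String, MutableBatch>,
    schema_tables: &HashMap<String, TableInfo>,
) -> HashMap<TableId, (String, Option<Arc<PartitionTemplate>>, MutableBatch)> {
    batches
        .into_iter()
        .map(|(name, data)| {
            let table = schema_tables
                .get(&name)
                .expect("write was validated against this schema");
            (table.id, (name, table.partition_template.clone(), data))
        })
        .collect()
}
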
@ -564,12 +567,12 @@ mod tests {
// namespace schema gets cached
let writes1_valid = lp_to_writes("dragonfruit val=42i 123456");
handler1
.write(&NAMESPACE, NamespaceId::new(42), writes1_valid, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes1_valid, None)
.await
.expect("request should succeed");
let writes2_valid = lp_to_writes("dragonfruit val=43i 123457");
handler2
.write(&NAMESPACE, NamespaceId::new(42), writes2_valid, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes2_valid, None)
.await
.expect("request should succeed");
@ -577,12 +580,24 @@ mod tests {
// putting the table over the limit
let writes1_add_column = lp_to_writes("dragonfruit,tag1=A val=42i 123456");
handler1
.write(&NAMESPACE, NamespaceId::new(42), writes1_add_column, None)
.write(
&NAMESPACE,
NamespaceId::new(42),
None,
writes1_add_column,
None,
)
.await
.expect("request should succeed");
let writes2_add_column = lp_to_writes("dragonfruit,tag2=B val=43i 123457");
handler2
.write(&NAMESPACE, NamespaceId::new(42), writes2_add_column, None)
.write(
&NAMESPACE,
NamespaceId::new(42),
None,
writes2_add_column,
None,
)
.await
.expect("request should succeed");
@ -759,7 +774,7 @@ mod tests {
let table = ns.tables.get(table).expect("table should exist in cache");
assert_eq!(
table
.columns
.columns()
.get(col)
.expect("column not cached")
.column_type,
@ -779,7 +794,7 @@ mod tests {
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
let got = handler
.write(&NAMESPACE, NamespaceId::new(42), writes, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes, None)
.await
.expect("request should succeed");
@ -790,7 +805,7 @@ mod tests {
assert_cache(&handler, "bananas", "time", ColumnType::Time).await;
// Validate the table ID mapping.
let (name, _data) = got.get(&want_id).expect("table not in output");
let (name, _partition_template, _data) = got.get(&want_id).expect("table not in output");
assert_eq!(name, "bananas");
}
@ -804,7 +819,7 @@ mod tests {
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
let err = handler
.write(&ns, NamespaceId::new(42), writes, None)
.write(&ns, NamespaceId::new(42), None, writes, None)
.await
.expect_err("request should fail");
@ -823,7 +838,7 @@ mod tests {
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64
let got = handler
.write(&NAMESPACE, NamespaceId::new(42), writes.clone(), None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes.clone(), None)
.await
.expect("request should succeed");
assert_eq!(writes.len(), got.len());
@ -831,7 +846,7 @@ mod tests {
// Second write attempts to violate it causing an error
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42.0 123456"); // val=float
let err = handler
.write(&NAMESPACE, NamespaceId::new(42), writes, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes, None)
.await
.expect_err("request should fail");
@ -857,7 +872,7 @@ mod tests {
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
let got = handler
.write(&NAMESPACE, NamespaceId::new(42), writes.clone(), None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes.clone(), None)
.await
.expect("request should succeed");
assert_eq!(writes.len(), got.len());
@ -875,7 +890,7 @@ mod tests {
// Second write attempts to violate limits, causing an error
let writes = lp_to_writes("bananas2,tag1=A,tag2=B val=42i 123456");
let err = handler
.write(&NAMESPACE, NamespaceId::new(42), writes, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes, None)
.await
.expect_err("request should fail");
@ -892,7 +907,7 @@ mod tests {
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
let got = handler
.write(&NAMESPACE, NamespaceId::new(42), writes.clone(), None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes.clone(), None)
.await
.expect("request should succeed");
assert_eq!(writes.len(), got.len());
@ -904,7 +919,7 @@ mod tests {
// Second write attempts to violate limits, causing an error
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456");
let err = handler
.write(&NAMESPACE, NamespaceId::new(42), writes, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes, None)
.await
.expect_err("request should fail");
@ -931,7 +946,7 @@ mod tests {
// First write attempts to add columns over the limit, causing an error
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456");
let err = handler
.write(&NAMESPACE, NamespaceId::new(42), writes, None)
.write(&NAMESPACE, NamespaceId::new(42), None, writes, None)
.await
.expect_err("request should fail");

View File

@ -2,7 +2,7 @@ use super::{
partitioner::PartitionError, retention_validation::RetentionError, RpcWriteError, SchemaError,
};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use std::{error::Error, fmt::Debug, sync::Arc};
use thiserror::Error;
use trace::ctx::SpanContext;
@ -62,6 +62,7 @@ pub trait DmlHandler: Debug + Send + Sync {
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
namespace_partition_template: Option<Arc<PartitionTemplate>>,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError>;
@ -80,11 +81,18 @@ where
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
namespace_partition_template: Option<Arc<PartitionTemplate>>,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
(**self)
.write(namespace, namespace_id, input, span_ctx)
.write(
namespace,
namespace_id,
namespace_partition_template,
input,
span_ctx,
)
.await
}
}

View File

@ -59,7 +59,7 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
new_columns: schema
.tables
.values()
.map(|v| v.columns.len())
.map(|v| v.columns().len())
.sum::<usize>(),
new_tables: schema.tables.len(),
did_update: false,
@ -100,12 +100,14 @@ fn merge_schema_additive(
// to 0 as the schemas become fully populated, leaving the common path free
// of overhead.
for (old_table_name, old_table) in &old_ns.tables {
old_column_count += old_table.columns.len();
old_column_count += old_table.columns().len();
match new_ns.tables.get_mut(old_table_name) {
Some(new_table) => {
for (column_name, column) in &old_table.columns {
if !new_table.columns.contains_key(column_name) {
new_table.columns.insert(column_name.to_owned(), *column);
for (column_name, column) in old_table.columns() {
if !new_table.columns().contains_key(column_name) {
new_table
.columns_mut()
.insert(column_name.to_owned(), *column);
}
}
}
@ -125,7 +127,7 @@ fn merge_schema_additive(
new_columns: new_ns
.tables
.values()
.map(|v| v.columns.len())
.map(|v| v.columns().len())
.sum::<usize>()
- old_column_count,
did_update: true,
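The additive merge above, reduced to its core: columns present in the old snapshot but missing from the new one are carried over, so the cached schema only ever grows. A minimal sketch with plain integers standing in for `ColumnSchema`:

use std::collections::BTreeMap;

// Carry over any column the new snapshot is missing; returns how many
// columns were carried so the caller can account for them in its stats.
fn merge_columns(old: &BTreeMap<String, i64>, new: &mut BTreeMap<String, i64>) -> usize {
    let mut carried = 0;
    for (name, col) in old {
        if !new.contains_key(name) {
            new.insert(name.clone(), *col);
            carried += 1;
        }
    }
    carried
}

fn main() {
    let old = BTreeMap::from([("tag1".to_string(), 1), ("time".to_string(), 2)]);
    let mut new = BTreeMap::from([("time".to_string(), 2), ("val".to_string(), 3)]);
    assert_eq!(merge_columns(&old, &mut new), 1); // "tag1" carried over
    assert_eq!(new.len(), 3);
}
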
@ -139,7 +141,7 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{
Column, ColumnId, ColumnSchema, ColumnType, NamespaceId, TableId, TableSchema,
Column, ColumnId, ColumnSchema, ColumnType, NamespaceId, TableId, TableInfo, TableSchema,
};
use proptest::{prelude::*, prop_compose, proptest};
@ -165,6 +167,7 @@ mod tests {
max_columns_per_table: 50,
max_tables: 24,
retention_period_ns: Some(876),
partition_template: None,
};
assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), (new, s) => {
assert_eq!(*new, schema1);
@ -181,6 +184,7 @@ mod tests {
max_columns_per_table: 10,
max_tables: 42,
retention_period_ns: Some(876),
partition_template: None,
};
assert_matches!(cache.put_schema(ns.clone(), schema2.clone()), (new, s) => {
@ -219,27 +223,33 @@ mod tests {
let mut second_write_table_schema = TableSchema::new(table_id);
second_write_table_schema.add_column(&column_2);
assert_ne!(first_write_table_schema, second_write_table_schema); // These MUST always be different
// These MUST always be different
assert_ne!(first_write_table_schema, second_write_table_schema);
let first_write_table_info = TableInfo::new(first_write_table_schema);
let second_write_table_info = TableInfo::new(second_write_table_schema);
let schema_update_1 = NamespaceSchema {
id: NamespaceId::new(42),
tables: BTreeMap::from([(String::from(table_name), first_write_table_schema)]),
tables: BTreeMap::from([(String::from(table_name), first_write_table_info)]),
max_columns_per_table: 50,
max_tables: 24,
retention_period_ns: None,
partition_template: None,
};
let schema_update_2 = NamespaceSchema {
tables: BTreeMap::from([(String::from(table_name), second_write_table_schema)]),
..schema_update_1
tables: BTreeMap::from([(String::from(table_name), second_write_table_info)]),
..schema_update_1.clone()
};
let want_namespace_schema = {
let mut want_table_schema = TableSchema::new(table_id);
want_table_schema.add_column(&column_1);
want_table_schema.add_column(&column_2);
let want_table_schema = TableInfo::new(want_table_schema);
NamespaceSchema {
tables: BTreeMap::from([(String::from(table_name), want_table_schema)]),
..schema_update_1
..schema_update_1.clone()
}
};
@ -252,10 +262,16 @@ mod tests {
}
);
assert_matches!(cache.put_schema(ns.clone(), schema_update_1.clone()), (new_schema, new_stats) => {
assert_eq!(*new_schema, schema_update_1);
assert_eq!(new_stats, ChangeStats{ new_tables: 1, new_columns: 1, did_update: false});
});
assert_matches!(
cache.put_schema(ns.clone(), schema_update_1.clone()),
(new_schema, new_stats) => {
assert_eq!(*new_schema, schema_update_1);
assert_eq!(
new_stats,
ChangeStats { new_tables: 1, new_columns: 1, did_update: false }
);
}
);
assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => {
assert_eq!(*new_schema, want_namespace_schema);
assert_eq!(new_stats, ChangeStats{ new_tables: 0, new_columns: 1, did_update: true});
@ -287,6 +303,7 @@ mod tests {
name: "column_a".to_string(),
column_type: ColumnType::String,
});
let table_1 = TableInfo::new(table_1);
let mut table_2 = TableSchema::new(TableId::new(2));
table_2.add_column(&Column {
id: ColumnId::new(2),
@ -294,6 +311,7 @@ mod tests {
name: "column_b".to_string(),
column_type: ColumnType::String,
});
let table_2 = TableInfo::new(table_2);
let mut table_3 = TableSchema::new(TableId::new(3));
table_3.add_column(&Column {
id: ColumnId::new(3),
@ -301,6 +319,7 @@ mod tests {
name: "column_c".to_string(),
column_type: ColumnType::String,
});
let table_3 = TableInfo::new(table_3);
let schema_update_1 = NamespaceSchema {
id: NamespaceId::new(42),
@ -311,13 +330,14 @@ mod tests {
max_columns_per_table: 50,
max_tables: 24,
retention_period_ns: None,
partition_template: None,
};
let schema_update_2 = NamespaceSchema {
tables: BTreeMap::from([
(String::from("table_1"), table_1.to_owned()),
(String::from("table_3"), table_3.to_owned()),
]),
..schema_update_1
..schema_update_1.clone()
};
let want_namespace_schema = NamespaceSchema {
@ -326,7 +346,7 @@ mod tests {
(String::from("table_2"), table_2),
(String::from("table_3"), table_3),
]),
..schema_update_1
..schema_update_1.clone()
};
// Set up the cache and ensure there are no entries for the namespace.
@ -338,10 +358,16 @@ mod tests {
}
);
assert_matches!(cache.put_schema(ns.clone(), schema_update_1.clone()), (new_schema, new_stats) => {
assert_eq!(*new_schema, schema_update_1);
assert_eq!(new_stats, ChangeStats{ new_tables: 2, new_columns: 2, did_update: false});
});
assert_matches!(
cache.put_schema(ns.clone(), schema_update_1.clone()),
(new_schema, new_stats) => {
assert_eq!(*new_schema, schema_update_1);
assert_eq!(
new_stats,
ChangeStats { new_tables: 2, new_columns: 2, did_update: false }
);
}
);
assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => {
assert_eq!(*new_schema, want_namespace_schema);
assert_eq!(new_stats, ChangeStats{ new_tables: 1, new_columns: 1, did_update: true});
@ -372,7 +398,7 @@ mod tests {
}
prop_compose! {
/// Generate an arbitrary TableSchema with up to 10 columns.
/// Generate an arbitrary TableInfo with up to 10 columns.
fn arbitrary_table_schema()(
id in any::<i64>(),
columns in proptest::collection::btree_map(
@ -380,9 +406,9 @@ mod tests {
arbitrary_column_schema(),
(0, 10) // Set size range
),
) -> TableSchema {
) -> TableInfo {
let columns = columns.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
TableSchema { id: TableId::new(id), columns }
TableInfo::new(TableSchema { id: TableId::new(id), columns })
}
}
@ -404,6 +430,7 @@ mod tests {
max_columns_per_table,
max_tables,
retention_period_ns,
partition_template: None,
}
}
}
@ -416,7 +443,7 @@ mod tests {
.flat_map(|(table_name, col_set)| {
// Build a set of tuples in the form (table_name, column_name)
col_set
.columns
.columns()
.keys()
.map(|col_name| (table_name.to_string(), col_name.to_string()))
})

View File

@ -128,7 +128,9 @@ mod tests {
use std::collections::BTreeMap;
use assert_matches::assert_matches;
use data_types::{ColumnId, ColumnSchema, ColumnType, NamespaceId, TableId, TableSchema};
use data_types::{
ColumnId, ColumnSchema, ColumnType, NamespaceId, TableId, TableInfo, TableSchema,
};
use metric::{Attributes, MetricObserver, Observation};
use super::*;
@ -156,10 +158,10 @@ mod tests {
(
i.to_string(),
TableSchema {
TableInfo::new(TableSchema {
id: TableId::new(i as _),
columns,
},
}),
)
})
.collect();
@ -170,6 +172,7 @@ mod tests {
max_columns_per_table: 100,
max_tables: 42,
retention_period_ns: None,
partition_template: None,
}
}

View File

@ -118,12 +118,14 @@ mod tests {
assert_matches!(cache.get_schema(&ns).await, Err(_));
// Place a schema in the cache for that name
let schema1 = NamespaceSchema::new(
NamespaceId::new(1),
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
iox_catalog::DEFAULT_MAX_TABLES,
iox_catalog::DEFAULT_RETENTION_PERIOD,
);
let schema1 = NamespaceSchema {
id: NamespaceId::new(1),
tables: Default::default(),
max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE as usize,
max_tables: iox_catalog::DEFAULT_MAX_TABLES as usize,
retention_period_ns: iox_catalog::DEFAULT_RETENTION_PERIOD,
partition_template: None,
};
assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), (result, _) => {
assert_eq!(*result, schema1);
});
@ -152,12 +154,15 @@ mod tests {
assert_matches!(cache.get_schema(&ns).await, Err(_));
// Place a schema in the catalog for that name
let schema1 = NamespaceSchema::new(
NamespaceId::new(1),
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
iox_catalog::DEFAULT_MAX_TABLES,
iox_catalog::DEFAULT_RETENTION_PERIOD,
);
let schema1 = NamespaceSchema {
id: NamespaceId::new(1),
tables: Default::default(),
max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE as usize,
max_tables: iox_catalog::DEFAULT_MAX_TABLES as usize,
retention_period_ns: iox_catalog::DEFAULT_RETENTION_PERIOD,
partition_template: None,
};
assert_matches!(
catalog
.repositories()

View File

@ -74,6 +74,7 @@ mod tests {
max_columns_per_table: 7,
max_tables: 42,
retention_period_ns: None,
partition_template: None,
}
}

View File

@ -1,8 +1,9 @@
//! A trait to abstract resolving a [`NamespaceName`] to [`NamespaceId`], and a
//! collection of composable implementations.
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use observability_deps::tracing::*;
use std::sync::Arc;
use thiserror::Error;
use crate::namespace_cache::NamespaceCache;
@ -26,11 +27,11 @@ pub enum Error {
/// An abstract resolver of [`NamespaceName`] to [`NamespaceId`].
#[async_trait]
pub trait NamespaceResolver: std::fmt::Debug + Send + Sync {
/// Return the [`NamespaceId`] for the given [`NamespaceName`].
async fn get_namespace_id(
/// Return the [`NamespaceId`] and [`PartitionTemplate`] for the given [`NamespaceName`].
async fn get_namespace_info(
&self,
namespace: &NamespaceName<'static>,
) -> Result<NamespaceId, Error>;
) -> Result<(NamespaceId, Option<Arc<PartitionTemplate>>), Error>;
}
/// An implementation of [`NamespaceResolver`] that resolves the [`NamespaceId`]
@ -53,14 +54,13 @@ impl<C> NamespaceResolver for NamespaceSchemaResolver<C>
where
C: NamespaceCache<ReadError = iox_catalog::interface::Error>,
{
async fn get_namespace_id(
async fn get_namespace_info(
&self,
namespace: &NamespaceName<'static>,
) -> Result<NamespaceId, Error> {
// Load the namespace schema from the cache, falling back to pulling it
// from the global catalog (if it exists).
) -> Result<(NamespaceId, Option<Arc<PartitionTemplate>>), Error> {
// Load the namespace schema from the cache.
match self.cache.get_schema(namespace).await {
Ok(v) => Ok(v.id),
Ok(v) => Ok((v.id, v.partition_template.clone())),
Err(e) => return Err(Error::Lookup(e)),
}
}
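In isolation, the cache-hit path above just projects two fields out of the cached schema. A hedged sketch with hypothetical stand-in types:

use std::sync::Arc;

// Hypothetical, simplified stand-ins for the data_types structs.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct NamespaceId(i64);
struct PartitionTemplate { parts: Vec<String> }
struct NamespaceSchema {
    id: NamespaceId,
    partition_template: Option<Arc<PartitionTemplate>>,
}

// Project the two fields the write path needs from a cache hit. Cloning the
// Option clones only the Arc pointer, not the template itself.
fn project(schema: &NamespaceSchema) -> (NamespaceId, Option<Arc<PartitionTemplate>>) {
    (schema.id, schema.partition_template.clone())
}
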
@ -100,6 +100,7 @@ mod tests {
max_columns_per_table: 4,
max_tables: 42,
retention_period_ns: None,
partition_template: None,
},
);
@ -107,7 +108,7 @@ mod tests {
// Drive the code under test
resolver
.get_namespace_id(&ns)
.get_namespace_info(&ns)
.await
.expect("lookup should succeed");
@ -151,7 +152,7 @@ mod tests {
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
resolver
.get_namespace_id(&ns)
.get_namespace_info(&ns)
.await
.expect("lookup should succeed");
@ -188,7 +189,7 @@ mod tests {
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
let err = resolver
.get_namespace_id(&ns)
.get_namespace_info(&ns)
.await
.expect_err("lookup should succeed");
assert_matches!(
@ -214,7 +215,7 @@ mod tests {
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
let err = resolver
.get_namespace_id(&ns)
.get_namespace_info(&ns)
.await
.expect_err("lookup should error");

View File

@ -2,10 +2,10 @@
#![allow(missing_docs)]
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use parking_lot::Mutex;
use super::NamespaceResolver;
@ -31,15 +31,17 @@ impl MockNamespaceResolver {
#[async_trait]
impl NamespaceResolver for MockNamespaceResolver {
/// Return the [`NamespaceId`] and any [`PartitionTemplate`] for the given [`NamespaceName`].
async fn get_namespace_id(
async fn get_namespace_info(
&self,
namespace: &NamespaceName<'static>,
) -> Result<NamespaceId, super::Error> {
Ok(*self.map.lock().get(namespace).ok_or(super::Error::Lookup(
iox_catalog::interface::Error::NamespaceNotFoundByName {
name: namespace.to_string(),
},
))?)
) -> Result<(NamespaceId, Option<Arc<PartitionTemplate>>), super::Error> {
Ok((
*self.map.lock().get(namespace).ok_or(super::Error::Lookup(
iox_catalog::interface::Error::NamespaceNotFoundByName {
name: namespace.to_string(),
},
))?,
None,
))
}
}

View File

@ -1,7 +1,7 @@
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use data_types::{NamespaceId, NamespaceName, PartitionTemplate};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::*;
use thiserror::Error;
@ -78,10 +78,10 @@ where
{
/// Force the creation of `namespace` if it does not already exist in the
/// cache, before passing the request through to the inner delegate.
async fn get_namespace_id(
async fn get_namespace_info(
&self,
namespace: &NamespaceName<'static>,
) -> Result<NamespaceId, super::Error> {
) -> Result<(NamespaceId, Option<Arc<PartitionTemplate>>), super::Error> {
if self.cache.get_schema(namespace).await.is_err() {
trace!(%namespace, "namespace not found in cache");
@ -90,7 +90,7 @@ where
// The namespace is not cached, but may exist in the
// catalog. Delegate discovery down to the inner handler,
// and map the lookup error to a reject error.
match self.inner.get_namespace_id(namespace).await {
match self.inner.get_namespace_info(namespace).await {
Ok(v) => return Ok(v),
Err(super::Error::Lookup(
iox_catalog::interface::Error::NamespaceNotFoundByName { .. },
@ -128,7 +128,7 @@ where
}
}
self.inner.get_namespace_id(namespace).await
self.inner.get_namespace_info(namespace).await
}
}
@ -171,6 +171,7 @@ mod tests {
max_columns_per_table: 4,
max_tables: 42,
retention_period_ns: None,
partition_template: None,
},
);
@ -182,11 +183,11 @@ mod tests {
);
// Drive the code under test
let got = creator
.get_namespace_id(&ns)
let (got_id, _got_partition_template) = creator
.get_namespace_info(&ns)
.await
.expect("handler should succeed");
assert_eq!(got, NAMESPACE_ID);
assert_eq!(got_id, NAMESPACE_ID);
// The cache hit should mean the catalog SHOULD NOT see a create request
// for the namespace.
@ -221,8 +222,8 @@ mod tests {
MissingNamespaceAction::AutoCreate(TEST_RETENTION_PERIOD_NS),
);
let created_id = creator
.get_namespace_id(&ns)
let (created_id, _created_partition_template) = creator
.get_namespace_info(&ns)
.await
.expect("handler should succeed");
@ -270,7 +271,7 @@ mod tests {
// It should not autocreate because we specified "rejection" behaviour, above
assert_matches!(
creator.get_namespace_id(&ns).await,
creator.get_namespace_info(&ns).await,
Err(crate::namespace_resolver::Error::Create(
NamespaceCreationError::Reject(_ns)
))
@ -307,7 +308,7 @@ mod tests {
);
let created_id = creator
.get_namespace_id(&ns)
.get_namespace_info(&ns)
.await
.expect("handler should succeed");
@ -321,7 +322,7 @@ mod tests {
// It should not autocreate because we specified "rejection" behaviour, above
let id = creator
.get_namespace_id(&ns)
.get_namespace_info(&ns)
.await
.expect("should allow existing namespace from catalog");
assert_eq!(created_id, id);

View File

@ -363,13 +363,19 @@ where
);
// Retrieve the namespace ID for this namespace.
let namespace_id = self
let (namespace_id, namespace_partition_template) = self
.namespace_resolver
.get_namespace_id(&write_info.namespace)
.get_namespace_info(&write_info.namespace)
.await?;
self.dml_handler
.write(&write_info.namespace, namespace_id, batches, span_ctx)
.write(
&write_info.namespace,
namespace_id,
namespace_partition_template,
batches,
span_ctx,
)
.await
.map_err(Into::into)?;
@ -670,7 +676,9 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => {
want_dml_calls = [
MockDmlHandlerCall::Write { namespace, .. }
] => {
assert_eq!(namespace, NAMESPACE_NAME);
}
);
@ -681,7 +689,9 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 1647622847".as_bytes(),
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
want_dml_calls = [
MockDmlHandlerCall::Write { namespace, namespace_id, write_input, .. }
] => {
assert_eq!(namespace, NAMESPACE_NAME);
assert_eq!(*namespace_id, NAMESPACE_ID);
@ -697,7 +707,9 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 1647622847000".as_bytes(),
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
want_dml_calls = [
MockDmlHandlerCall::Write { namespace, namespace_id, write_input, .. }
] => {
assert_eq!(namespace, NAMESPACE_NAME);
assert_eq!(*namespace_id, NAMESPACE_ID);
@ -713,7 +725,9 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000".as_bytes(),
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
want_dml_calls = [
MockDmlHandlerCall::Write { namespace, namespace_id, write_input, .. }
] => {
assert_eq!(namespace, NAMESPACE_NAME);
assert_eq!(*namespace_id, NAMESPACE_ID);
@ -729,7 +743,9 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
want_dml_calls = [
MockDmlHandlerCall::Write { namespace, namespace_id, write_input, .. }
] => {
assert_eq!(namespace, NAMESPACE_NAME);
assert_eq!(*namespace_id, NAMESPACE_ID);
@ -853,7 +869,7 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Err(DmlError::NamespaceNotFound(NAMESPACE_NAME.to_string()))],
want_result = Err(Error::DmlHandler(DmlError::NamespaceNotFound(_))),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => {
want_dml_calls = [MockDmlHandlerCall::Write { namespace, .. }] => {
assert_eq!(namespace, NAMESPACE_NAME);
}
);
@ -864,7 +880,7 @@ mod tests {
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Err(DmlError::Internal("💣".into()))],
want_result = Err(Error::DmlHandler(DmlError::Internal(_))),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => {
want_dml_calls = [MockDmlHandlerCall::Write { namespace, .. }] => {
assert_eq!(namespace, NAMESPACE_NAME);
}
);
@ -875,7 +891,9 @@ mod tests {
body = "test field=1u 100\ntest field=2u 100".as_bytes(),
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
want_dml_calls = [
MockDmlHandlerCall::Write { namespace, namespace_id, write_input, .. }
] => {
assert_eq!(namespace, NAMESPACE_NAME);
assert_eq!(*namespace_id, NAMESPACE_ID);
let table = write_input.get("test").expect("table not in write");
@ -916,7 +934,7 @@ mod tests {
body = "whydo InputPower=300i,InputPower=300i".as_bytes(),
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => {
want_dml_calls = [MockDmlHandlerCall::Write { namespace, write_input, .. }] => {
assert_eq!(namespace, NAMESPACE_NAME);
let table = write_input.get("whydo").expect("table not in write");
let col = table.column("InputPower").expect("column missing");
@ -933,7 +951,7 @@ mod tests {
body = "whydo InputPower=300i,InputPower=42i".as_bytes(),
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => {
want_dml_calls = [MockDmlHandlerCall::Write { namespace, write_input, .. }] => {
assert_eq!(namespace, NAMESPACE_NAME);
let table = write_input.get("whydo").expect("table not in write");
let col = table.column("InputPower").expect("column missing");

View File

@ -55,9 +55,9 @@ fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaRespons
(
name.clone(),
TableSchema {
id: t.id.get(),
id: t.id().get(),
columns: t
.columns
.columns()
.iter()
.map(|(name, c)| {
(