refactor: make `SchemaMerger` self-consuming
The error handling in `merge` was incomplete, aka it could leave the merger in a half-modified state in case of an error. That's generally a bad idea and can lead to ugly bugs. Also the "builder" pattern that is used here usually consumes itself (and provides a clone impl), so it is easier to reason about modifications. So this commit just changes it to self-consuming builder. A nice side effect of the new pattern is also that it is build-time checked and does not contain a runtime assert any longer.pull/24376/head
parent
4f5fe62428
commit
4172d7946c
|
@ -79,14 +79,12 @@ fn nullable_to_str(nullability: bool) -> &'static str {
|
|||
///
|
||||
/// 2. The measurement names must be consistent: one or both can be
|
||||
/// `None`, or they can both be `Some(name`)
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct SchemaMerger {
|
||||
/// Maps column names to their definition
|
||||
fields: HashMap<String, (Field, Option<InfluxColumnType>)>,
|
||||
/// The measurement name if any
|
||||
measurement: Option<String>,
|
||||
/// If the builder has been consumed
|
||||
finished: bool,
|
||||
}
|
||||
|
||||
impl SchemaMerger {
|
||||
|
@ -96,7 +94,7 @@ impl SchemaMerger {
|
|||
|
||||
/// Appends the schema to the merged schema being built,
|
||||
/// validating that no columns are added.
|
||||
pub fn merge(&mut self, other: &Schema) -> Result<&mut Self> {
|
||||
pub fn merge(mut self, other: &Schema) -> Result<Self> {
|
||||
// Verify measurement name is compatible
|
||||
match (self.measurement.as_ref(), other.measurement()) {
|
||||
(Some(existing_measurement), Some(new_measurement)) => {
|
||||
|
@ -170,17 +168,14 @@ impl SchemaMerger {
|
|||
}
|
||||
|
||||
/// Returns the schema that was built, the columns are always sorted in lexicographic order
|
||||
pub fn build(&mut self) -> Schema {
|
||||
pub fn build(self) -> Schema {
|
||||
self.build_with_sort_key(&Default::default())
|
||||
}
|
||||
|
||||
/// Returns the schema that was built, the columns are always sorted in lexicographic order
|
||||
///
|
||||
/// Additionally specifies a sort key for the data
|
||||
pub fn build_with_sort_key(&mut self, sort_key: &SortKey<'_>) -> Schema {
|
||||
assert!(!self.finished, "build called multiple times");
|
||||
self.finished = true;
|
||||
|
||||
pub fn build_with_sort_key(mut self, sort_key: &SortKey<'_>) -> Schema {
|
||||
Schema::new_from_parts(
|
||||
self.measurement.take(),
|
||||
self.fields.drain().map(|x| x.1),
|
||||
|
|
|
@ -121,7 +121,10 @@ impl<C: QueryChunk> ProviderBuilder<C> {
|
|||
pub fn add_chunk(&mut self, chunk: Arc<C>) -> Result<&mut Self> {
|
||||
let chunk_table_schema = chunk.schema();
|
||||
|
||||
self.schema_merger
|
||||
// TODO: avoid this clone call by making ProviderBuilder a self-consuming builder
|
||||
self.schema_merger = self
|
||||
.schema_merger
|
||||
.clone()
|
||||
.merge(&chunk_table_schema.as_ref())
|
||||
.context(ChunkSchemaNotCompatible {
|
||||
table_name: self.table_name.as_ref(),
|
||||
|
@ -159,7 +162,8 @@ impl<C: QueryChunk> ProviderBuilder<C> {
|
|||
assert!(!self.finished, "build called multiple times");
|
||||
self.finished = true;
|
||||
|
||||
let iox_schema = self.schema_merger.build();
|
||||
// TODO: avoid this clone call by making ProviderBuilder a self-consuming builder
|
||||
let iox_schema = self.schema_merger.clone().build();
|
||||
|
||||
// if the table was reported to exist, it should not be empty
|
||||
if self.chunks.is_empty() {
|
||||
|
@ -744,7 +748,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
|
|||
let chunk_schema = chunk.schema();
|
||||
let chunk_pk = chunk_schema.primary_key();
|
||||
let chunk_pk_schema = chunk_schema.select_by_names(&chunk_pk).unwrap();
|
||||
pk_schema_merger.merge(&chunk_pk_schema).unwrap();
|
||||
pk_schema_merger = pk_schema_merger.merge(&chunk_pk_schema).unwrap();
|
||||
}
|
||||
let pk_schema = pk_schema_merger.build();
|
||||
Arc::new(pk_schema)
|
||||
|
|
|
@ -337,10 +337,10 @@ impl TestChunk {
|
|||
};
|
||||
|
||||
let mut merger = SchemaMerger::new();
|
||||
merger.merge(&new_column_schema).unwrap();
|
||||
merger = merger.merge(&new_column_schema).unwrap();
|
||||
|
||||
if let Some(existing_schema) = self.table_schema.as_ref() {
|
||||
merger
|
||||
merger = merger
|
||||
.merge(existing_schema)
|
||||
.expect("merging was successful");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue