fix: Undo the addition of a TableInfo type; store partition_template on TableSchema
parent 596673d515
commit cc41216382
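Summary of the change: the short-lived `TableInfo` wrapper is deleted and `TableSchema` carries the partition template directly, so `NamespaceSchema::tables` maps names to `TableSchema` again. A minimal compiling sketch of the resulting shape — the `TableId`, `PartitionTemplate`, and `ColumnsByName` definitions below are stand-in stubs, not the real `data_types` types:

use std::{collections::BTreeMap, sync::Arc};

// Stand-ins so the sketch compiles on its own; not the real catalog types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableId(i64);
#[derive(Debug, PartialEq, Eq)]
pub struct PartitionTemplate;
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ColumnsByName(BTreeMap<String, i64>);

/// After this commit, the partition template lives on TableSchema itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableSchema {
    pub id: TableId,
    pub partition_template: Option<Arc<PartitionTemplate>>,
    pub columns: ColumnsByName,
}

fn main() {
    let schema = TableSchema {
        id: TableId(1),
        partition_template: None,
        columns: ColumnsByName::default(),
    };
    assert!(schema.partition_template.is_none());
}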
@@ -49,9 +49,7 @@ impl NamespacesSource for MockNamespacesSource {
 mod tests {
     use std::collections::BTreeMap;

-    use data_types::{
-        Column, ColumnId, ColumnType, ColumnsByName, TableId, TableInfo, TableSchema,
-    };
+    use data_types::{Column, ColumnId, ColumnType, ColumnsByName, TableId, TableSchema};

     use super::*;
@@ -130,54 +128,50 @@ mod tests {
         let tables = BTreeMap::from([
             (
                 "table1".to_string(),
-                TableInfo {
-                    schema: TableSchema {
-                        id: TableId::new(1),
-                        columns: ColumnsByName::new(&[
-                            Column {
-                                name: "col1".to_string(),
-                                id: ColumnId::new(1),
-                                column_type: ColumnType::I64,
-                                table_id: TableId::new(1),
-                            },
-                            Column {
-                                name: "col2".to_string(),
-                                id: ColumnId::new(2),
-                                column_type: ColumnType::String,
-                                table_id: TableId::new(1),
-                            },
-                        ]),
-                    },
+                TableSchema {
+                    id: TableId::new(1),
+                    partition_template: None,
+                    columns: ColumnsByName::new(&[
+                        Column {
+                            name: "col1".to_string(),
+                            id: ColumnId::new(1),
+                            column_type: ColumnType::I64,
+                            table_id: TableId::new(1),
+                        },
+                        Column {
+                            name: "col2".to_string(),
+                            id: ColumnId::new(2),
+                            column_type: ColumnType::String,
+                            table_id: TableId::new(1),
+                        },
+                    ]),
                 },
             ),
             (
                 "table2".to_string(),
-                TableInfo {
-                    schema: TableSchema {
-                        id: TableId::new(2),
-                        columns: ColumnsByName::new(&[
-                            Column {
-                                name: "col1".to_string(),
-                                id: ColumnId::new(3),
-                                column_type: ColumnType::I64,
-                                table_id: TableId::new(2),
-                            },
-                            Column {
-                                name: "col2".to_string(),
-                                id: ColumnId::new(4),
-                                column_type: ColumnType::String,
-                                table_id: TableId::new(2),
-                            },
-                            Column {
-                                name: "col3".to_string(),
-                                id: ColumnId::new(5),
-                                column_type: ColumnType::F64,
-                                table_id: TableId::new(2),
-                            },
-                        ]),
-                    },
+                TableSchema {
+                    id: TableId::new(2),
+                    partition_template: None,
+                    columns: ColumnsByName::new(&[
+                        Column {
+                            name: "col1".to_string(),
+                            id: ColumnId::new(3),
+                            column_type: ColumnType::I64,
+                            table_id: TableId::new(2),
+                        },
+                        Column {
+                            name: "col2".to_string(),
+                            id: ColumnId::new(4),
+                            column_type: ColumnType::String,
+                            table_id: TableId::new(2),
+                        },
+                        Column {
+                            name: "col3".to_string(),
+                            id: ColumnId::new(5),
+                            column_type: ColumnType::F64,
+                            table_id: TableId::new(2),
+                        },
+                    ]),
                 },
             ),
         ]);
@@ -91,17 +91,17 @@ where
         .await
         .ok_or_else::<DynError, _>(|| String::from("Cannot find namespace schema").into())?;

-    let table_info = namespace_schema
+    let table_schema = namespace_schema
         .tables
         .get(&table.name)
-        .ok_or_else::<DynError, _>(|| String::from("Cannot find table info").into())?;
+        .ok_or_else::<DynError, _>(|| String::from("Cannot find table schema").into())?;

     Ok(Arc::new(PartitionInfo {
         partition_id,
         namespace_id: table.namespace_id,
         namespace_name: namespace.name,
         table: Arc::new(table),
-        table_schema: Arc::new(table_info.schema.clone()),
+        table_schema: Arc::new(table_schema.clone()),
         sort_key: partition.sort_key(),
         partition_key: partition.partition_key,
     }))
@@ -27,10 +27,7 @@ impl PartitionInfoBuilder {
                 namespace_id,
                 name: String::from("table"),
             }),
-            table_schema: Arc::new(TableSchema {
-                id: table_id,
-                columns: ColumnsByName::new(&[]),
-            }),
+            table_schema: Arc::new(TableSchema::new(table_id)),
             sort_key: None,
             partition_key: PartitionKey::from("key"),
         },
@@ -54,6 +51,7 @@ impl PartitionInfoBuilder {

         let table_schema = Arc::new(TableSchema {
             id: self.inner.table.id,
+            partition_template: None,
             columns: ColumnsByName::new(&columns),
         });
         self.inner.table_schema = table_schema;
@@ -330,7 +330,7 @@ pub struct NamespaceSchema {
     /// the namespace id
     pub id: NamespaceId,
     /// the tables in the namespace by name
-    pub tables: BTreeMap<String, TableInfo>,
+    pub tables: BTreeMap<String, TableSchema>,
     /// the number of columns per table this namespace allows
     pub max_columns_per_table: usize,
     /// The maximum number of tables permitted in this namespace.
@@ -390,41 +390,6 @@ pub struct Table {
     pub name: String,
 }

-/// Useful table information to cache, including the table's partition template if any, and the
-/// table's columns.
-#[derive(Debug, Clone, Eq, PartialEq)]
-pub struct TableInfo {
-    /// This table's schema
-    pub schema: TableSchema,
-    /// This table's partition template
-    pub partition_template: Option<Arc<PartitionTemplate>>,
-}
-
-impl TableInfo {
-    /// This table's ID
-    pub fn id(&self) -> TableId {
-        self.schema.id
-    }
-
-    /// Estimated Size in bytes including `self`.
-    pub fn size(&self) -> usize {
-        size_of_val(self) + size_of_val(&self.partition_template) + self.schema.size()
-    }
-}
-
-impl From<&Table> for TableInfo {
-    fn from(table: &Table) -> Self {
-        let &Table { id, .. } = table;
-
-        Self {
-            schema: TableSchema::new(id),
-
-            // TODO: Store and retrieve PartitionTemplate from the database
-            partition_template: None,
-        }
-    }
-}
-
 /// Column definitions for a table indexed by their name
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub struct ColumnsByName(BTreeMap<String, ColumnSchema>);
@@ -518,15 +483,30 @@ impl TryFrom<ColumnsByName> for Schema {
 pub struct TableSchema {
     /// the table id
     pub id: TableId,

+    /// the table's partition template
+    pub partition_template: Option<Arc<PartitionTemplate>>,
+
     /// the table's columns by their name
     pub columns: ColumnsByName,
 }

 impl TableSchema {
-    /// Initialize new `TableSchema`
+    /// Initialize new `TableSchema` with the given `TableId`.
     pub fn new(id: TableId) -> Self {
         Self {
             id,
+            partition_template: None,
             columns: ColumnsByName::new(&[]),
         }
     }
+
+    /// Initialize new `TableSchema` from the information in the given `Table`.
+    pub fn new_empty_from(table: &Table) -> Self {
+        Self {
+            id: table.id,
+            // TODO: Store and retrieve PartitionTemplate from the database
+            partition_template: None,
+            columns: ColumnsByName::new(&[]),
+        }
+    }
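For orientation, the two constructors differ only in where the id comes from; a self-contained sketch under that assumption, with `Table` reduced to a hypothetical id-only stub:

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TableId(i64);
struct Table { id: TableId }
#[derive(Debug, Default, PartialEq, Eq)]
struct ColumnsByName;

#[derive(Debug, PartialEq, Eq)]
struct TableSchema {
    id: TableId,
    partition_template: Option<()>, // Option<Arc<PartitionTemplate>> in the real type
    columns: ColumnsByName,
}

impl TableSchema {
    fn new(id: TableId) -> Self {
        Self { id, partition_template: None, columns: ColumnsByName }
    }

    // Mirrors new_empty_from(&Table): an empty schema whose id is taken from the table.
    fn new_empty_from(table: &Table) -> Self {
        Self::new(table.id)
    }
}

fn main() {
    let table = Table { id: TableId(7) };
    assert_eq!(TableSchema::new(table.id), TableSchema::new_empty_from(&table));
}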
@@ -3028,10 +3008,12 @@ mod tests {
     fn test_table_schema_size() {
         let schema1 = TableSchema {
             id: TableId::new(1),
+            partition_template: None,
             columns: ColumnsByName::new(&[]),
         };
         let schema2 = TableSchema {
             id: TableId::new(2),
+            partition_template: None,
             columns: ColumnsByName::new(&[Column {
                 id: ColumnId::new(1),
                 table_id: TableId::new(2),
@@ -3056,11 +3038,9 @@ mod tests {
             id: NamespaceId::new(1),
             tables: BTreeMap::from([(
                 String::from("foo"),
-                TableInfo {
-                    schema: TableSchema {
-                        id: TableId::new(1),
-                        columns: ColumnsByName::new(&[]),
-                    },
+                TableSchema {
+                    id: TableId::new(1),
+                    columns: ColumnsByName::new(&[]),
+                    partition_template: None,
                 },
             )]),
@@ -2,7 +2,7 @@ use crate::{AggregateTSMMeasurement, AggregateTSMSchema};
 use chrono::{format::StrftimeItems, offset::FixedOffset, DateTime, Duration};
 use data_types::{
     ColumnType, Namespace, NamespaceName, NamespaceSchema, OrgBucketMappingError, Partition,
-    PartitionKey, TableInfo,
+    PartitionKey, TableSchema,
 };
 use iox_catalog::interface::{
     get_schema_by_name, CasFailure, Catalog, RepoCollection, SoftDeletedRows,
@@ -126,12 +126,12 @@ where
                 .tables()
                 .create_or_get(measurement_name, iox_schema.id)
                 .await
-                .map(|t| TableInfo::from(&t))?;
+                .map(|t| TableSchema::new_empty_from(&t))?;
             let time_col = repos
                 .columns()
-                .create_or_get("time", table.id(), ColumnType::Time)
+                .create_or_get("time", table.id, ColumnType::Time)
                 .await?;
-            table.schema.add_column(&time_col);
+            table.add_column(&time_col);
             table
         }
     };
@@ -140,7 +140,7 @@ where
     // fields and tags are both columns; tag is a special type of column.
     // check that the schema has all these columns or update accordingly.
     for tag in measurement.tags.values() {
-        match table.schema.columns.get(tag.name.as_str()) {
+        match table.columns.get(tag.name.as_str()) {
             Some(c) if c.is_tag() => {
                 // nothing to do, all good
             }
@@ -178,7 +178,7 @@ where
                 field.name, field_type, e,
             ))
         })?);
-        match table.schema.columns.get(field.name.as_str()) {
+        match table.columns.get(field.name.as_str()) {
             Some(c) if c.matches_type(influx_column_type) => {
                 // nothing to do, all good
             }
@@ -210,7 +210,7 @@ where
         // figure it's okay.
         repos
             .columns()
-            .create_or_get_many_unchecked(table.id(), column_batch)
+            .create_or_get_many_unchecked(table.id, column_batch)
             .await?;
     }
     // create a partition for every day in the date range.
@@ -223,7 +223,7 @@ where
         // gets matched as `None` in the code below
         let partition = repos
             .partitions()
-            .create_or_get(partition_key, table.id())
+            .create_or_get(partition_key, table.id)
             .await
             .map_err(UpdateCatalogError::CatalogError)?;
         // get the sort key from the partition, if it exists. create it or update it as
@@ -384,10 +384,10 @@ mod tests {
             .expect("got schema");
         assert_eq!(iox_schema.tables.len(), 1);
         let table = iox_schema.tables.get("cpu").expect("got table");
-        assert_eq!(table.schema.column_count(), 3); // one tag & one field, plus time
-        let tag = table.schema.columns.get("host").expect("got tag");
+        assert_eq!(table.column_count(), 3); // one tag & one field, plus time
+        let tag = table.columns.get("host").expect("got tag");
         assert!(tag.is_tag());
-        let field = table.schema.columns.get("usage").expect("got field");
+        let field = table.columns.get("usage").expect("got field");
         assert_eq!(
             field.column_type,
             InfluxColumnType::Field(InfluxFieldType::Float)
@@ -395,7 +395,7 @@ mod tests {
         // check that the partitions were created and the sort keys are correct
         let partitions = repos
             .partitions()
-            .list_by_table_id(table.id())
+            .list_by_table_id(table.id)
             .await
             .expect("got partitions");
         // number of days in the date range of the schema
@@ -435,26 +435,26 @@ mod tests {
             .tables()
             .create_or_get("weather", namespace.id)
             .await
-            .map(|t| TableInfo::from(&t))
+            .map(|t| TableSchema::new_empty_from(&t))
             .expect("table created");
         let time_col = txn
             .columns()
-            .create_or_get("time", table.id(), ColumnType::Time)
+            .create_or_get("time", table.id, ColumnType::Time)
             .await
             .expect("column created");
-        table.schema.add_column(&time_col);
+        table.add_column(&time_col);
         let location_col = txn
             .columns()
-            .create_or_get("city", table.id(), ColumnType::Tag)
+            .create_or_get("city", table.id, ColumnType::Tag)
             .await
             .expect("column created");
         let temperature_col = txn
             .columns()
-            .create_or_get("temperature", table.id(), ColumnType::F64)
+            .create_or_get("temperature", table.id, ColumnType::F64)
             .await
             .expect("column created");
-        table.schema.add_column(&location_col);
-        table.schema.add_column(&temperature_col);
+        table.add_column(&location_col);
+        table.add_column(&temperature_col);
         txn.commit().await.unwrap();

         // merge with aggregate schema that has some overlap
@@ -491,17 +491,17 @@ mod tests {
             .expect("got schema");
         assert_eq!(iox_schema.tables.len(), 1);
         let table = iox_schema.tables.get("weather").expect("got table");
-        assert_eq!(table.schema.column_count(), 5); // two tags, two fields, plus time
-        let tag1 = table.schema.columns.get("city").expect("got tag");
+        assert_eq!(table.column_count(), 5); // two tags, two fields, plus time
+        let tag1 = table.columns.get("city").expect("got tag");
         assert!(tag1.is_tag());
-        let tag2 = table.schema.columns.get("country").expect("got tag");
+        let tag2 = table.columns.get("country").expect("got tag");
         assert!(tag2.is_tag());
-        let field1 = table.schema.columns.get("temperature").expect("got field");
+        let field1 = table.columns.get("temperature").expect("got field");
         assert_eq!(
             field1.column_type,
             InfluxColumnType::Field(InfluxFieldType::Float)
         );
-        let field2 = table.schema.columns.get("humidity").expect("got field");
+        let field2 = table.columns.get("humidity").expect("got field");
         assert_eq!(
             field2.column_type,
             InfluxColumnType::Field(InfluxFieldType::Float)
|
|||
.tables()
|
||||
.create_or_get("weather", namespace.id)
|
||||
.await
|
||||
.map(|t| TableInfo::from(&t))
|
||||
.map(|t| TableSchema::new_empty_from(&t))
|
||||
.expect("table created");
|
||||
let time_col = txn
|
||||
.columns()
|
||||
.create_or_get("time", table.id(), ColumnType::Time)
|
||||
.create_or_get("time", table.id, ColumnType::Time)
|
||||
.await
|
||||
.expect("column created");
|
||||
table.schema.add_column(&time_col);
|
||||
table.add_column(&time_col);
|
||||
let temperature_col = txn
|
||||
.columns()
|
||||
.create_or_get("temperature", table.id(), ColumnType::F64)
|
||||
.create_or_get("temperature", table.id, ColumnType::F64)
|
||||
.await
|
||||
.expect("column created");
|
||||
table.schema.add_column(&temperature_col);
|
||||
table.add_column(&temperature_col);
|
||||
txn.commit().await.unwrap();
|
||||
|
||||
// merge with aggregate schema that has some issue that will trip a catalog error
|
||||
|
@@ -592,20 +592,20 @@ mod tests {
             .tables()
             .create_or_get("weather", namespace.id)
             .await
-            .map(|t| TableInfo::from(&t))
+            .map(|t| TableSchema::new_empty_from(&t))
            .expect("table created");
         let time_col = txn
             .columns()
-            .create_or_get("time", table.id(), ColumnType::Time)
+            .create_or_get("time", table.id, ColumnType::Time)
             .await
             .expect("column created");
-        table.schema.add_column(&time_col);
+        table.add_column(&time_col);
         let temperature_col = txn
             .columns()
-            .create_or_get("temperature", table.id(), ColumnType::F64)
+            .create_or_get("temperature", table.id, ColumnType::F64)
             .await
             .expect("column created");
-        table.schema.add_column(&temperature_col);
+        table.add_column(&temperature_col);
         txn.commit().await.unwrap();

         // merge with aggregate schema that has some issue that will trip a catalog error
@@ -4,7 +4,7 @@ use async_trait::async_trait;
 use data_types::{
     Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceSchema,
     ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
-    SkippedCompaction, Table, TableId, TableInfo, Timestamp,
+    SkippedCompaction, Table, TableId, TableSchema, Timestamp,
 };
 use iox_time::TimeProvider;
 use snafu::{OptionExt, Snafu};
@@ -590,18 +590,18 @@ where

     let mut namespace = NamespaceSchema::new_empty_from(&namespace);

-    let mut table_id_to_info = BTreeMap::new();
+    let mut table_id_to_schema = BTreeMap::new();
     for t in tables {
-        let table_info = TableInfo::from(&t);
-        table_id_to_info.insert(t.id, (t.name, table_info));
+        let table_schema = TableSchema::new_empty_from(&t);
+        table_id_to_schema.insert(t.id, (t.name, table_schema));
     }

     for c in columns {
-        let (_, t) = table_id_to_info.get_mut(&c.table_id).unwrap();
-        t.schema.add_column(&c);
+        let (_, t) = table_id_to_schema.get_mut(&c.table_id).unwrap();
+        t.add_column(&c);
     }

-    for (_, (table_name, schema)) in table_id_to_info {
+    for (_, (table_name, schema)) in table_id_to_schema {
         namespace.tables.insert(table_name, schema);
     }
@@ -684,23 +684,23 @@ pub async fn list_schemas(
         });

     // A set of tables within a single namespace.
-    type NamespaceTables = BTreeMap<String, TableInfo>;
+    type NamespaceTables = BTreeMap<String, TableSchema>;

     let mut joined = HashMap::<NamespaceId, NamespaceTables>::default();
     for column in columns {
         // Resolve the table this column references
         let table = tables.get(&column.table_id).expect("no table for column");

-        let table_info = joined
+        let table_schema = joined
             // Find or create a record in the joined <NamespaceId, Tables> map
             // for this namespace ID.
             .entry(table.namespace_id)
             .or_default()
             // Fetch the schema record for this table, or create an empty one.
             .entry(table.name.clone())
-            .or_insert_with(|| TableInfo::from(table));
+            .or_insert_with(|| TableSchema::new_empty_from(table));

-        table_info.schema.add_column(&column);
+        table_schema.add_column(&column);
     }

     // The table map is no longer needed - immediately reclaim the memory.
@@ -14,7 +14,7 @@
 )]

 use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result};
-use data_types::{ColumnType, NamespaceSchema, TableInfo};
+use data_types::{ColumnType, NamespaceSchema, TableSchema};
 use mutable_batch::MutableBatch;
 use std::{borrow::Cow, collections::HashMap};
 use thiserror::Error;
@@ -118,15 +118,15 @@ where
                 .tables()
                 .create_or_get(table_name, schema.id)
                 .await
-                .map(|t| TableInfo::from(&t))?;
+                .map(|t| TableSchema::new_empty_from(&t))?;

             // Always add a time column to all new tables.
             let time_col = repos
                 .columns()
-                .create_or_get(TIME_COLUMN, table.id(), ColumnType::Time)
+                .create_or_get(TIME_COLUMN, table.id, ColumnType::Time)
                 .await?;

-            table.schema.add_column(&time_col);
+            table.add_column(&time_col);

             assert!(schema
                 .to_mut()
@@ -152,7 +152,7 @@ where
         // If it does, validate it. If it does not exist, create it and insert
         // it into the cached schema.

-        match table.schema.columns.get(name.as_str()) {
+        match table.columns.get(name.as_str()) {
             Some(existing) if existing.matches_type(col.influx_type()) => {
                 // No action is needed as the column matches the existing column
                 // schema.
@@ -182,10 +182,10 @@ where
     if !column_batch.is_empty() {
         repos
             .columns()
-            .create_or_get_many_unchecked(table.id(), column_batch)
+            .create_or_get_many_unchecked(table.id, column_batch)
             .await?
             .into_iter()
-            .for_each(|c| table.to_mut().schema.add_column(&c));
+            .for_each(|c| table.to_mut().add_column(&c));
     }

     if let Cow::Owned(table) = table {
@@ -288,9 +288,8 @@ mod tests {
         let actual_tables: BTreeMap<String, BTreeMap<String, ColumnType>> = schema
             .tables
             .iter()
-            .map(|(table, table_info)| {
-                let desired_cols = table_info
-                    .schema
+            .map(|(table, table_schema)| {
+                let desired_cols = table_schema
                     .columns
                     .iter()
                     .map(|(column, column_schema)| (column.clone(), column_schema.column_type))
@@ -346,6 +346,7 @@ impl TestTable {
     pub async fn catalog_schema(&self) -> TableSchema {
         TableSchema {
             id: self.table.id,
+            partition_template: None,
             columns: self.catalog_columns().await,
         }
     }
@@ -13,7 +13,7 @@ use cache_system::{
     loader::{metrics::MetricsLoader, FunctionLoader},
     resource_consumption::FunctionEstimator,
 };
-use data_types::{ColumnId, NamespaceId, NamespaceSchema, TableId, TableInfo};
+use data_types::{ColumnId, NamespaceId, NamespaceSchema, TableId, TableSchema};
 use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
 use iox_time::TimeProvider;
 use schema::Schema;
@@ -238,19 +238,17 @@ impl CachedTable {
     }
 }

-impl From<TableInfo> for CachedTable {
-    fn from(table: TableInfo) -> Self {
+impl From<TableSchema> for CachedTable {
+    fn from(table: TableSchema) -> Self {
         let mut column_id_map: HashMap<ColumnId, Arc<str>> = table
-            .schema
             .column_id_map()
             .iter()
             .map(|(&c_id, &name)| (c_id, Arc::from(name)))
             .collect();
         column_id_map.shrink_to_fit();

-        let id = table.id();
+        let id = table.id;
         let schema: Schema = table
-            .schema
             .columns
             .try_into()
             .expect("Catalog table schema broken");
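The conversion now starts from `TableSchema` itself, so the intermediate `.schema` projection disappears; a stub-typed sketch of the same shape (field names approximate, not the crate's API):

use std::{collections::HashMap, sync::Arc};

// Stub shapes only; the real CachedTable and TableSchema carry more fields.
struct TableSchema {
    id: i64,
    column_id_map: HashMap<i64, &'static str>,
}

struct CachedTable {
    id: i64,
    column_id_map: HashMap<i64, Arc<str>>,
}

impl From<TableSchema> for CachedTable {
    fn from(table: TableSchema) -> Self {
        // Fields are read straight off the schema; the `.schema` hop is gone.
        let column_id_map = table
            .column_id_map
            .iter()
            .map(|(&id, &name)| (id, Arc::<str>::from(name)))
            .collect();
        Self { id: table.id, column_id_map }
    }
}

fn main() {
    let schema = TableSchema { id: 1, column_id_map: HashMap::from([(1, "time")]) };
    let cached = CachedTable::from(schema);
    assert_eq!(cached.id, 1);
    assert_eq!(cached.column_id_map.len(), 1);
}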
@@ -33,7 +33,7 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
         .await
         .unwrap();
     let table_info = catalog_schema.tables.remove(&table.table.name).unwrap();
-    let schema = Schema::try_from(table_info.schema.columns).unwrap();
+    let schema = Schema::try_from(table_info.columns).unwrap();

     let namespace_name = Arc::from(table.namespace.namespace.name.as_str());
@@ -304,7 +304,7 @@ where
         .into_iter()
         .map(|(name, data)| {
             let table = latest_schema.tables.get(&name).unwrap();
-            let id = table.id();
+            let id = table.id;
             let table_partition_template = table.partition_template.clone();

             (id, (name, table_partition_template, data))
@@ -372,7 +372,7 @@ fn validate_schema_limits(
     for (table_name, batch) in batches {
         // Get the column set for this table from the schema.
         let mut existing_columns = match schema.tables.get(table_name) {
-            Some(v) => v.schema.column_names(),
+            Some(v) => v.column_names(),
             None if batch.columns().len() > schema.max_columns_per_table => {
                 // The table does not exist, therefore all the columns in this
                 // write must be created - there's no need to perform a set
@@ -774,7 +774,6 @@ mod tests {
         let table = ns.tables.get(table).expect("table should exist in cache");
         assert_eq!(
             table
-                .schema
                 .columns
                 .get(col)
                 .expect("column not cached")
@@ -59,7 +59,7 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
             new_columns: schema
                 .tables
                 .values()
-                .map(|v| v.schema.column_count())
+                .map(|v| v.column_count())
                 .sum::<usize>(),
             new_tables: schema.tables.len(),
             did_update: false,
@@ -100,12 +100,12 @@ fn merge_schema_additive(
     // to 0 as the schemas become fully populated, leaving the common path free
     // of overhead.
     for (old_table_name, old_table) in &old_ns.tables {
-        old_column_count += old_table.schema.column_count();
+        old_column_count += old_table.column_count();
         match new_ns.tables.get_mut(old_table_name) {
             Some(new_table) => {
-                for (column_name, column) in old_table.schema.columns.iter() {
-                    if !new_table.schema.contains_column_name(column_name) {
-                        new_table.schema.add_column_schema(column_name, column);
+                for (column_name, column) in old_table.columns.iter() {
+                    if !new_table.contains_column_name(column_name) {
+                        new_table.add_column_schema(column_name, column);
                     }
                 }
             }
@@ -125,7 +125,7 @@ fn merge_schema_additive(
         new_columns: new_ns
             .tables
             .values()
-            .map(|v| v.schema.column_count())
+            .map(|v| v.column_count())
             .sum::<usize>()
             - old_column_count,
         did_update: true,
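The rule these hunks preserve is unchanged by the refactor: any column the old cached schema knows about but the new one lacks is copied over. A toy illustration with plain maps standing in for `TableSchema`:

use std::collections::BTreeMap;

// Column sets keyed by name, standing in for ColumnsByName.
fn merge_additive(old: &BTreeMap<String, i64>, new: &mut BTreeMap<String, i64>) {
    for (name, col) in old {
        // Only add columns the new schema doesn't already know about.
        new.entry(name.clone()).or_insert(*col);
    }
}

fn main() {
    let old = BTreeMap::from([("time".to_string(), 1), ("city".to_string(), 2)]);
    let mut new = BTreeMap::from([("time".to_string(), 1), ("temp".to_string(), 3)]);
    merge_additive(&old, &mut new);
    assert_eq!(new.len(), 3); // "city" was merged in additively
}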
@@ -139,7 +139,7 @@ mod tests {

     use assert_matches::assert_matches;
     use data_types::{
-        Column, ColumnId, ColumnSchema, ColumnType, ColumnsByName, NamespaceId, TableId, TableInfo,
+        Column, ColumnId, ColumnSchema, ColumnType, ColumnsByName, NamespaceId, TableId,
         TableSchema,
     };
     use proptest::{prelude::*, prop_compose, proptest};
@@ -225,25 +225,16 @@ mod tests {
         // These MUST always be different
         assert_ne!(first_write_table_schema, second_write_table_schema);

-        let first_write_table_info = TableInfo {
-            schema: first_write_table_schema,
-            partition_template: None,
-        };
-        let second_write_table_info = TableInfo {
-            schema: second_write_table_schema,
-            partition_template: None,
-        };
-
         let schema_update_1 = NamespaceSchema {
             id: NamespaceId::new(42),
-            tables: BTreeMap::from([(String::from(table_name), first_write_table_info)]),
+            tables: BTreeMap::from([(String::from(table_name), first_write_table_schema)]),
             max_columns_per_table: 50,
             max_tables: 24,
             retention_period_ns: None,
             partition_template: None,
         };
         let schema_update_2 = NamespaceSchema {
-            tables: BTreeMap::from([(String::from(table_name), second_write_table_info)]),
+            tables: BTreeMap::from([(String::from(table_name), second_write_table_schema)]),
             ..schema_update_1.clone()
         };
@@ -251,10 +242,6 @@ mod tests {
         let mut want_table_schema = TableSchema::new(table_id);
         want_table_schema.add_column(&column_1);
         want_table_schema.add_column(&column_2);
-        let want_table_schema = TableInfo {
-            schema: want_table_schema,
-            partition_template: None,
-        };
         NamespaceSchema {
             tables: BTreeMap::from([(String::from(table_name), want_table_schema)]),
             ..schema_update_1.clone()
@@ -311,10 +298,6 @@ mod tests {
             name: "column_a".to_string(),
             column_type: ColumnType::String,
         });
-        let table_1 = TableInfo {
-            schema: table_1,
-            partition_template: None,
-        };
         let mut table_2 = TableSchema::new(TableId::new(2));
         table_2.add_column(&Column {
             id: ColumnId::new(2),
@@ -322,10 +305,6 @@ mod tests {
             name: "column_b".to_string(),
             column_type: ColumnType::String,
         });
-        let table_2 = TableInfo {
-            schema: table_2,
-            partition_template: None,
-        };
         let mut table_3 = TableSchema::new(TableId::new(3));
         table_3.add_column(&Column {
             id: ColumnId::new(3),
@@ -333,10 +312,6 @@ mod tests {
             name: "column_c".to_string(),
             column_type: ColumnType::String,
         });
-        let table_3 = TableInfo {
-            schema: table_3,
-            partition_template: None,
-        };

         let schema_update_1 = NamespaceSchema {
             id: NamespaceId::new(42),
@@ -415,7 +390,7 @@ mod tests {
     }

     prop_compose! {
-        /// Generate an arbitrary TableInfo with up to 10 columns.
+        /// Generate an arbitrary TableSchema with up to 10 columns.
         fn arbitrary_table_schema()(
             id in any::<i64>(),
             columns in proptest::collection::btree_map(
@@ -423,11 +398,12 @@ mod tests {
             arbitrary_column_schema(),
             (0, 10) // Set size range
         ),
-    ) -> TableInfo {
+    ) -> TableSchema {
         let columns = ColumnsByName::from(columns);
-        TableInfo {
-            schema: TableSchema { id: TableId::new(id), columns },
-            partition_template: None,
+        TableSchema {
+            id: TableId::new(id),
+            partition_template: None,
+            columns,
         }
     }
 }
@@ -463,7 +439,6 @@ mod tests {
             .flat_map(|(table_name, col_set)| {
                 // Build a set of tuples in the form (table_name, column_name)
                 col_set
-                    .schema
                     .columns
                     .names()
                     .into_iter()
@@ -127,7 +127,7 @@ where
 mod tests {
     use assert_matches::assert_matches;
     use data_types::{
-        Column, ColumnId, ColumnType, ColumnsByName, NamespaceId, TableId, TableInfo, TableSchema,
+        Column, ColumnId, ColumnType, ColumnsByName, NamespaceId, TableId, TableSchema,
     };
     use metric::{Attributes, MetricObserver, Observation};
@@ -153,12 +153,10 @@ mod tests {

                 (
                     i.to_string(),
-                    TableInfo {
-                        schema: TableSchema {
-                            id: TableId::new(i as _),
-                            columns: ColumnsByName::new(&columns),
-                        },
+                    TableSchema {
+                        id: TableId::new(i as _),
+                        partition_template: None,
+                        columns: ColumnsByName::new(&columns),
                     },
                 )
             })
@@ -55,9 +55,8 @@ fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaResponse
             (
                 name.clone(),
                 TableSchema {
-                    id: t.id().get(),
+                    id: t.id.get(),
                     columns: t
-                        .schema
                         .columns
                         .iter()
                         .map(|(name, c)| {