feat: move all `get_by_id` methods to take reference in schema
parent
54cc9796a2
commit
c6edccbad8
|
@ -170,16 +170,16 @@ impl Catalog {
|
|||
self.inner.read().db_map.get_by_right(db_name).copied()
|
||||
}
|
||||
|
||||
pub fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>> {
|
||||
self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone)
|
||||
pub fn db_id_to_name(&self, db_id: &DbId) -> Option<Arc<str>> {
|
||||
self.inner.read().db_map.get_by_left(db_id).map(Arc::clone)
|
||||
}
|
||||
|
||||
pub fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> {
|
||||
self.db_schema_and_id(db_name).map(|(_, schema)| schema)
|
||||
}
|
||||
|
||||
pub fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> {
|
||||
self.inner.read().databases.get(&db_id).cloned()
|
||||
pub fn db_schema_by_id(&self, db_id: &DbId) -> Option<Arc<DatabaseSchema>> {
|
||||
self.inner.read().databases.get(db_id).cloned()
|
||||
}
|
||||
|
||||
pub fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
|
||||
|
@ -582,9 +582,9 @@ impl DatabaseSchema {
|
|||
.map(|(_, schema)| schema.clone())
|
||||
}
|
||||
|
||||
pub fn table_schema_by_id(&self, table_id: TableId) -> Option<Schema> {
|
||||
pub fn table_schema_by_id(&self, table_id: &TableId) -> Option<Schema> {
|
||||
self.tables
|
||||
.get(&table_id)
|
||||
.get(table_id)
|
||||
.map(|table| table.influx_schema())
|
||||
.cloned()
|
||||
}
|
||||
|
@ -611,8 +611,8 @@ impl DatabaseSchema {
|
|||
.and_then(|table_id| self.tables.get(table_id).cloned())
|
||||
}
|
||||
|
||||
pub fn table_definition_by_id(&self, table_id: TableId) -> Option<Arc<TableDefinition>> {
|
||||
self.tables.get(&table_id).cloned()
|
||||
pub fn table_definition_by_id(&self, table_id: &TableId) -> Option<Arc<TableDefinition>> {
|
||||
self.tables.get(table_id).cloned()
|
||||
}
|
||||
|
||||
pub fn table_definition_and_id(
|
||||
|
@ -636,8 +636,8 @@ impl DatabaseSchema {
|
|||
.collect()
|
||||
}
|
||||
|
||||
pub fn table_exists(&self, table_id: TableId) -> bool {
|
||||
self.tables.contains_key(&table_id)
|
||||
pub fn table_exists(&self, table_id: &TableId) -> bool {
|
||||
self.tables.contains_key(table_id)
|
||||
}
|
||||
|
||||
pub fn tables(&self) -> impl Iterator<Item = Arc<TableDefinition>> + use<'_> {
|
||||
|
@ -648,8 +648,8 @@ impl DatabaseSchema {
|
|||
self.table_map.get_by_right(&table_name.into()).copied()
|
||||
}
|
||||
|
||||
pub fn table_id_to_name(&self, table_id: TableId) -> Option<Arc<str>> {
|
||||
self.table_map.get_by_left(&table_id).map(Arc::clone)
|
||||
pub fn table_id_to_name(&self, table_id: &TableId) -> Option<Arc<str>> {
|
||||
self.table_map.get_by_left(table_id).map(Arc::clone)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -959,10 +959,10 @@ impl TableDefinition {
|
|||
.expect("Column exists in mapping")
|
||||
}
|
||||
|
||||
pub fn column_id_to_name_unchecked(&self, id: ColumnId) -> Arc<str> {
|
||||
pub fn column_id_to_name_unchecked(&self, id: &ColumnId) -> Arc<str> {
|
||||
Arc::clone(
|
||||
self.column_map
|
||||
.get_by_left(&id)
|
||||
.get_by_left(id)
|
||||
.expect("Column exists in mapping"),
|
||||
)
|
||||
}
|
||||
|
@ -1254,7 +1254,7 @@ mod tests {
|
|||
let table = database.tables.get_mut(&TableId::from(0)).unwrap();
|
||||
println!("table: {table:#?}");
|
||||
assert_eq!(table.column_map.len(), 1);
|
||||
assert_eq!(table.column_id_to_name_unchecked(0.into()), "test".into());
|
||||
assert_eq!(table.column_id_to_name_unchecked(&0.into()), "test".into());
|
||||
|
||||
Arc::make_mut(table)
|
||||
.add_columns(vec![(
|
||||
|
|
|
@ -107,7 +107,7 @@ fn from_last_cache_definitions(
|
|||
|
||||
for cache_defn in cache_defns {
|
||||
let table_defn = db_schema
|
||||
.table_definition_by_id(cache_defn.table_id)
|
||||
.table_definition_by_id(&cache_defn.table_id)
|
||||
.expect("table should exist for last cache");
|
||||
|
||||
table_name_arr.append_value(&cache_defn.table);
|
||||
|
|
|
@ -95,7 +95,7 @@ impl IoxSystemTable for ParquetFilesTable {
|
|||
self.db_id,
|
||||
self.buffer
|
||||
.catalog()
|
||||
.db_schema_by_id(self.db_id)
|
||||
.db_schema_by_id(&self.db_id)
|
||||
.expect("db exists")
|
||||
.table_name_to_id(table_name.as_str())
|
||||
.expect("table exists"),
|
||||
|
|
|
@ -226,9 +226,9 @@ impl LastCacheProvider {
|
|||
.flat_map(|(table_id, table_map)| {
|
||||
let table_name = self
|
||||
.catalog
|
||||
.db_schema_by_id(db)
|
||||
.db_schema_by_id(&db)
|
||||
.expect("db exists")
|
||||
.table_id_to_name(*table_id)
|
||||
.table_id_to_name(table_id)
|
||||
.expect("table exists");
|
||||
table_map.iter().map(move |(lc_name, lc)| {
|
||||
lc.to_definition(*table_id, table_name.as_ref(), Arc::clone(lc_name))
|
||||
|
@ -505,13 +505,13 @@ impl LastCacheProvider {
|
|||
if db_cache.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let Some(db_schema) = self.catalog.db_schema_by_id(batch.database_id)
|
||||
let Some(db_schema) = self.catalog.db_schema_by_id(&batch.database_id)
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
for (table_id, table_chunks) in &batch.table_chunks {
|
||||
if let Some(table_cache) = db_cache.get_mut(table_id) {
|
||||
let Some(table_def) = db_schema.table_definition_by_id(*table_id)
|
||||
let Some(table_def) = db_schema.table_definition_by_id(table_id)
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
@ -552,8 +552,8 @@ impl LastCacheProvider {
|
|||
) -> Option<Result<Vec<RecordBatch>, ArrowError>> {
|
||||
let table_def = self
|
||||
.catalog
|
||||
.db_schema_by_id(db_id)
|
||||
.and_then(|db| db.table_definition_by_id(table_id))
|
||||
.db_schema_by_id(&db_id)
|
||||
.and_then(|db| db.table_definition_by_id(&table_id))
|
||||
.expect("valid db and table ids to get table definition");
|
||||
|
||||
self.cache_map
|
||||
|
|
|
@ -100,7 +100,7 @@ impl TableFunctionImpl for LastCacheFunction {
|
|||
let Some(table_def) = self
|
||||
.provider
|
||||
.catalog
|
||||
.db_schema_by_id(self.db_id)
|
||||
.db_schema_by_id(&self.db_id)
|
||||
.expect("db exists")
|
||||
.table_definition(table_name.as_str())
|
||||
else {
|
||||
|
|
|
@ -466,10 +466,10 @@ impl LastCacheManager for WriteBufferImpl {
|
|||
let cache_name = cache_name.map(Into::into);
|
||||
let catalog = self.catalog();
|
||||
let db_schema = catalog
|
||||
.db_schema_by_id(db_id)
|
||||
.db_schema_by_id(&db_id)
|
||||
.ok_or(Error::DbDoesNotExist)?;
|
||||
let table_def = db_schema
|
||||
.table_definition_by_id(table_id)
|
||||
.table_definition_by_id(&table_id)
|
||||
.ok_or(Error::TableDoesNotExist)?;
|
||||
|
||||
if let Some(info) = self.last_cache.create_cache(CreateCacheArguments {
|
||||
|
@ -503,7 +503,7 @@ impl LastCacheManager for WriteBufferImpl {
|
|||
cache_name: &str,
|
||||
) -> crate::Result<(), self::Error> {
|
||||
let catalog = self.catalog();
|
||||
let db_schema = catalog.db_schema_by_id(db_id).expect("db should exist");
|
||||
let db_schema = catalog.db_schema_by_id(&db_id).expect("db should exist");
|
||||
self.last_cache.delete_cache(db_id, tbl_id, cache_name)?;
|
||||
catalog.delete_last_cache(db_id, tbl_id, cache_name);
|
||||
|
||||
|
@ -516,7 +516,7 @@ impl LastCacheManager for WriteBufferImpl {
|
|||
database_name: Arc::clone(&db_schema.name),
|
||||
ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete {
|
||||
table_id: tbl_id,
|
||||
table_name: db_schema.table_id_to_name(tbl_id).expect("table exists"),
|
||||
table_name: db_schema.table_id_to_name(&tbl_id).expect("table exists"),
|
||||
name: cache_name.into(),
|
||||
})],
|
||||
})])
|
||||
|
@ -569,7 +569,7 @@ mod tests {
|
|||
.unwrap()
|
||||
.convert_lines_to_buffer(Gen1Duration::new_5m());
|
||||
|
||||
let db = catalog.db_schema_by_id(DbId::from(0)).unwrap();
|
||||
let db = catalog.db_schema_by_id(&DbId::from(0)).unwrap();
|
||||
|
||||
assert_eq!(db.tables.len(), 2);
|
||||
// cpu table
|
||||
|
|
|
@ -147,17 +147,17 @@ impl QueryableBuffer {
|
|||
let mut persisting_chunks = vec![];
|
||||
let catalog = Arc::clone(&buffer.catalog);
|
||||
for (database_id, table_map) in buffer.db_to_table.iter_mut() {
|
||||
let db_schema = catalog.db_schema_by_id(*database_id).expect("db exists");
|
||||
let db_schema = catalog.db_schema_by_id(database_id).expect("db exists");
|
||||
for (table_id, table_buffer) in table_map.iter_mut() {
|
||||
let table_def = db_schema
|
||||
.table_definition_by_id(*table_id)
|
||||
.table_definition_by_id(table_id)
|
||||
.expect("table exists");
|
||||
let snapshot_chunks =
|
||||
table_buffer.snapshot(table_def, snapshot_details.end_time_marker);
|
||||
|
||||
for chunk in snapshot_chunks {
|
||||
let table_name =
|
||||
db_schema.table_id_to_name(*table_id).expect("table exists");
|
||||
db_schema.table_id_to_name(table_id).expect("table exists");
|
||||
let persist_job = PersistJob {
|
||||
database_id: *database_id,
|
||||
table_id: *table_id,
|
||||
|
@ -372,14 +372,14 @@ impl BufferState {
|
|||
|
||||
let db_schema = self
|
||||
.catalog
|
||||
.db_schema_by_id(catalog_batch.database_id)
|
||||
.db_schema_by_id(&catalog_batch.database_id)
|
||||
.expect("database should exist");
|
||||
|
||||
for op in catalog_batch.ops {
|
||||
match op {
|
||||
CatalogOp::CreateLastCache(definition) => {
|
||||
let table_def = db_schema
|
||||
.table_definition_by_id(definition.table_id)
|
||||
.table_definition_by_id(&definition.table_id)
|
||||
.expect("table should exist");
|
||||
last_cache_provider.create_cache_from_definition(
|
||||
db_schema.id,
|
||||
|
@ -408,14 +408,14 @@ impl BufferState {
|
|||
fn add_write_batch(&mut self, write_batch: WriteBatch) {
|
||||
let db_schema = self
|
||||
.catalog
|
||||
.db_schema_by_id(write_batch.database_id)
|
||||
.db_schema_by_id(&write_batch.database_id)
|
||||
.expect("database should exist");
|
||||
let database_buffer = self.db_to_table.entry(write_batch.database_id).or_default();
|
||||
|
||||
for (table_id, table_chunks) in write_batch.table_chunks {
|
||||
let table_buffer = database_buffer.entry(table_id).or_insert_with(|| {
|
||||
let table_def = db_schema
|
||||
.table_definition_by_id(table_id)
|
||||
.table_definition_by_id(&table_id)
|
||||
.expect("table should exist");
|
||||
// TODO: can we have the primary key stored on the table definition (we already have
|
||||
// the series key, so that doesn't seem like too much of a stretch).
|
||||
|
|
Loading…
Reference in New Issue