From c6edccbad8e3eba7f4b945b6a2dfbba952117bd8 Mon Sep 17 00:00:00 2001 From: Praveen Kumar <pkumar@influxdata.com> Date: Fri, 8 Nov 2024 15:03:16 +0000 Subject: [PATCH] feat: move all `get_by_id` methods to take reference in schema --- influxdb3_catalog/src/catalog.rs | 30 +++++++++---------- .../src/system_tables/last_caches.rs | 2 +- .../src/system_tables/parquet_files.rs | 2 +- influxdb3_write/src/last_cache/mod.rs | 12 ++++---- .../src/last_cache/table_function.rs | 2 +- influxdb3_write/src/write_buffer/mod.rs | 10 +++---- .../src/write_buffer/queryable_buffer.rs | 14 ++++----- 7 files changed, 36 insertions(+), 36 deletions(-) diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index c6c3354209..c6270a60a2 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -170,16 +170,16 @@ impl Catalog { self.inner.read().db_map.get_by_right(db_name).copied() } - pub fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>> { - self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone) + pub fn db_id_to_name(&self, db_id: &DbId) -> Option<Arc<str>> { + self.inner.read().db_map.get_by_left(db_id).map(Arc::clone) } pub fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> { self.db_schema_and_id(db_name).map(|(_, schema)| schema) } - pub fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> { - self.inner.read().databases.get(&db_id).cloned() + pub fn db_schema_by_id(&self, db_id: &DbId) -> Option<Arc<DatabaseSchema>> { + self.inner.read().databases.get(db_id).cloned() } pub fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> { @@ -582,9 +582,9 @@ impl DatabaseSchema { .map(|(_, schema)| schema.clone()) } - pub fn table_schema_by_id(&self, table_id: TableId) -> Option<Schema> { + pub fn table_schema_by_id(&self, table_id: &TableId) -> Option<Schema> { self.tables - .get(&table_id) + .get(table_id) .map(|table| table.influx_schema()) .cloned() } @@ -611,8 +611,8 @@ impl DatabaseSchema { .and_then(|table_id| self.tables.get(table_id).cloned()) } - pub fn table_definition_by_id(&self, table_id: TableId) -> Option<Arc<TableDefinition>> { - self.tables.get(&table_id).cloned() + pub fn table_definition_by_id(&self, table_id: &TableId) -> Option<Arc<TableDefinition>> { + self.tables.get(table_id).cloned() } pub fn table_definition_and_id( @@ -636,8 +636,8 @@ impl DatabaseSchema { .collect() } - pub fn table_exists(&self, table_id: TableId) -> bool { - self.tables.contains_key(&table_id) + pub fn table_exists(&self, table_id: &TableId) -> bool { + self.tables.contains_key(table_id) } pub fn tables(&self) -> impl Iterator<Item = Arc<TableDefinition>> + use<'_> { @@ -648,8 +648,8 @@ impl DatabaseSchema { self.table_map.get_by_right(&table_name.into()).copied() } - pub fn table_id_to_name(&self, table_id: TableId) -> Option<Arc<str>> { - self.table_map.get_by_left(&table_id).map(Arc::clone) + pub fn table_id_to_name(&self, table_id: &TableId) -> Option<Arc<str>> { + self.table_map.get_by_left(table_id).map(Arc::clone) } } @@ -959,10 +959,10 @@ impl TableDefinition { .expect("Column exists in mapping") } - pub fn column_id_to_name_unchecked(&self, id: ColumnId) -> Arc<str> { + pub fn column_id_to_name_unchecked(&self, id: &ColumnId) -> Arc<str> { Arc::clone( self.column_map - .get_by_left(&id) + .get_by_left(id) .expect("Column exists in mapping"), ) } @@ -1254,7 +1254,7 @@ mod tests { let table = database.tables.get_mut(&TableId::from(0)).unwrap(); println!("table: {table:#?}"); assert_eq!(table.column_map.len(), 1); - assert_eq!(table.column_id_to_name_unchecked(0.into()), "test".into()); + assert_eq!(table.column_id_to_name_unchecked(&0.into()), "test".into()); Arc::make_mut(table) .add_columns(vec![( diff --git a/influxdb3_server/src/system_tables/last_caches.rs b/influxdb3_server/src/system_tables/last_caches.rs index adf3ffb955..45d2f7f621 100644 --- a/influxdb3_server/src/system_tables/last_caches.rs +++ b/influxdb3_server/src/system_tables/last_caches.rs @@ -107,7 +107,7 @@ fn from_last_cache_definitions( for cache_defn in cache_defns { let table_defn = db_schema - .table_definition_by_id(cache_defn.table_id) + .table_definition_by_id(&cache_defn.table_id) .expect("table should exist for last cache"); table_name_arr.append_value(&cache_defn.table); diff --git a/influxdb3_server/src/system_tables/parquet_files.rs b/influxdb3_server/src/system_tables/parquet_files.rs index df6b9d257b..d64c852156 100644 --- a/influxdb3_server/src/system_tables/parquet_files.rs +++ b/influxdb3_server/src/system_tables/parquet_files.rs @@ -95,7 +95,7 @@ impl IoxSystemTable for ParquetFilesTable { self.db_id, self.buffer .catalog() - .db_schema_by_id(self.db_id) + .db_schema_by_id(&self.db_id) .expect("db exists") .table_name_to_id(table_name.as_str()) .expect("table exists"), diff --git a/influxdb3_write/src/last_cache/mod.rs b/influxdb3_write/src/last_cache/mod.rs index 910c6a73df..afb6db7108 100644 --- a/influxdb3_write/src/last_cache/mod.rs +++ b/influxdb3_write/src/last_cache/mod.rs @@ -226,9 +226,9 @@ impl LastCacheProvider { .flat_map(|(table_id, table_map)| { let table_name = self .catalog - .db_schema_by_id(db) + .db_schema_by_id(&db) .expect("db exists") - .table_id_to_name(*table_id) + .table_id_to_name(table_id) .expect("table exists"); table_map.iter().map(move |(lc_name, lc)| { lc.to_definition(*table_id, table_name.as_ref(), Arc::clone(lc_name)) @@ -505,13 +505,13 @@ impl LastCacheProvider { if db_cache.is_empty() { continue; } - let Some(db_schema) = self.catalog.db_schema_by_id(batch.database_id) + let Some(db_schema) = self.catalog.db_schema_by_id(&batch.database_id) else { continue; }; for (table_id, table_chunks) in &batch.table_chunks { if let Some(table_cache) = db_cache.get_mut(table_id) { - let Some(table_def) = db_schema.table_definition_by_id(*table_id) + let Some(table_def) = db_schema.table_definition_by_id(table_id) else { continue; }; @@ -552,8 +552,8 @@ impl LastCacheProvider { ) -> Option<Result<Vec<RecordBatch>, ArrowError>> { let table_def = self .catalog - .db_schema_by_id(db_id) - .and_then(|db| db.table_definition_by_id(table_id)) + .db_schema_by_id(&db_id) + .and_then(|db| db.table_definition_by_id(&table_id)) .expect("valid db and table ids to get table definition"); self.cache_map diff --git a/influxdb3_write/src/last_cache/table_function.rs b/influxdb3_write/src/last_cache/table_function.rs index 37d04a641f..509bffb473 100644 --- a/influxdb3_write/src/last_cache/table_function.rs +++ b/influxdb3_write/src/last_cache/table_function.rs @@ -100,7 +100,7 @@ impl TableFunctionImpl for LastCacheFunction { let Some(table_def) = self .provider .catalog - .db_schema_by_id(self.db_id) + .db_schema_by_id(&self.db_id) .expect("db exists") .table_definition(table_name.as_str()) else { diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index e3ab18e149..e1b5fed8a6 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -466,10 +466,10 @@ impl LastCacheManager for WriteBufferImpl { let cache_name = cache_name.map(Into::into); let catalog = self.catalog(); let db_schema = catalog - .db_schema_by_id(db_id) + .db_schema_by_id(&db_id) .ok_or(Error::DbDoesNotExist)?; let table_def = db_schema - .table_definition_by_id(table_id) + .table_definition_by_id(&table_id) .ok_or(Error::TableDoesNotExist)?; if let Some(info) = self.last_cache.create_cache(CreateCacheArguments { @@ -503,7 +503,7 @@ impl LastCacheManager for WriteBufferImpl { cache_name: &str, ) -> crate::Result<(), self::Error> { let catalog = self.catalog(); - let db_schema = catalog.db_schema_by_id(db_id).expect("db should exist"); + let db_schema = catalog.db_schema_by_id(&db_id).expect("db should exist"); self.last_cache.delete_cache(db_id, tbl_id, cache_name)?; catalog.delete_last_cache(db_id, tbl_id, cache_name); @@ -516,7 +516,7 @@ impl LastCacheManager for WriteBufferImpl { database_name: Arc::clone(&db_schema.name), ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete { table_id: tbl_id, - table_name: db_schema.table_id_to_name(tbl_id).expect("table exists"), + table_name: db_schema.table_id_to_name(&tbl_id).expect("table exists"), name: cache_name.into(), })], })]) @@ -569,7 +569,7 @@ mod tests { .unwrap() .convert_lines_to_buffer(Gen1Duration::new_5m()); - let db = catalog.db_schema_by_id(DbId::from(0)).unwrap(); + let db = catalog.db_schema_by_id(&DbId::from(0)).unwrap(); assert_eq!(db.tables.len(), 2); // cpu table diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 4eedead338..a70aab58e6 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -147,17 +147,17 @@ impl QueryableBuffer { let mut persisting_chunks = vec![]; let catalog = Arc::clone(&buffer.catalog); for (database_id, table_map) in buffer.db_to_table.iter_mut() { - let db_schema = catalog.db_schema_by_id(*database_id).expect("db exists"); + let db_schema = catalog.db_schema_by_id(database_id).expect("db exists"); for (table_id, table_buffer) in table_map.iter_mut() { let table_def = db_schema - .table_definition_by_id(*table_id) + .table_definition_by_id(table_id) .expect("table exists"); let snapshot_chunks = table_buffer.snapshot(table_def, snapshot_details.end_time_marker); for chunk in snapshot_chunks { let table_name = - db_schema.table_id_to_name(*table_id).expect("table exists"); + db_schema.table_id_to_name(table_id).expect("table exists"); let persist_job = PersistJob { database_id: *database_id, table_id: *table_id, @@ -372,14 +372,14 @@ impl BufferState { let db_schema = self .catalog - .db_schema_by_id(catalog_batch.database_id) + .db_schema_by_id(&catalog_batch.database_id) .expect("database should exist"); for op in catalog_batch.ops { match op { CatalogOp::CreateLastCache(definition) => { let table_def = db_schema - .table_definition_by_id(definition.table_id) + .table_definition_by_id(&definition.table_id) .expect("table should exist"); last_cache_provider.create_cache_from_definition( db_schema.id, @@ -408,14 +408,14 @@ impl BufferState { fn add_write_batch(&mut self, write_batch: WriteBatch) { let db_schema = self .catalog - .db_schema_by_id(write_batch.database_id) + .db_schema_by_id(&write_batch.database_id) .expect("database should exist"); let database_buffer = self.db_to_table.entry(write_batch.database_id).or_default(); for (table_id, table_chunks) in write_batch.table_chunks { let table_buffer = database_buffer.entry(table_id).or_insert_with(|| { let table_def = db_schema - .table_definition_by_id(table_id) + .table_definition_by_id(&table_id) .expect("table should exist"); // TODO: can we have the primary key stored on the table definition (we already have // the series key, so that doesn't seem like too much of a stretch).