From 9c4266c503a0a54801ddbc700f4553334d61901d Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 11 Nov 2022 10:21:06 -0500 Subject: [PATCH 1/2] refactor: first step to remove unused retention_duration (#6113) * refactor: first step to remove unused retention_duration * refactor: remove retenion_duration from update catalog Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- compactor/src/compact.rs | 7 +- compactor/src/garbage_collector.rs | 6 +- garbage_collector/src/objectstore/checker.rs | 2 +- .../aggregate_tsm_schema/update_catalog.rs | 75 ++----------------- influxdb_iox/src/commands/import/schema.rs | 1 - influxdb_iox/src/commands/remote/partition.rs | 4 +- influxdb_iox/tests/end_to_end_cases/cli.rs | 4 +- ingester/src/data.rs | 12 +-- .../src/data/partition/resolver/catalog.rs | 7 +- ingester/src/handler.rs | 2 +- ingester/src/test_util.rs | 7 +- ingester/tests/common/mod.rs | 7 +- iox_catalog/src/interface.rs | 47 ++++-------- iox_catalog/src/lib.rs | 4 +- iox_catalog/src/mem.rs | 3 +- iox_catalog/src/metrics.rs | 2 +- iox_catalog/src/postgres.rs | 26 +++---- iox_tests/src/util.rs | 2 +- ioxd_router/src/lib.rs | 3 +- router/src/dml_handlers/ns_autocreation.rs | 1 - router/src/namespace_resolver.rs | 7 +- .../src/namespace_resolver/ns_autocreation.rs | 12 +-- router/tests/http.rs | 7 -- service_grpc_catalog/src/lib.rs | 4 +- service_grpc_object_store/src/lib.rs | 2 +- service_grpc_schema/src/lib.rs | 2 +- 26 files changed, 63 insertions(+), 193 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 88d4cabd7e..f77d6c7972 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -711,12 +711,7 @@ pub mod tests { let pool = txn.query_pools().create_or_get("foo").await.unwrap(); let namespace = txn .namespaces() - .create( - "namespace_hot_partitions_to_compact", - "inf", - topic.id, - pool.id, - ) + .create("namespace_hot_partitions_to_compact", topic.id, pool.id) .await .unwrap(); let table = txn diff --git a/compactor/src/garbage_collector.rs b/compactor/src/garbage_collector.rs index 5da081b328..cb863c51ef 100644 --- a/compactor/src/garbage_collector.rs +++ b/compactor/src/garbage_collector.rs @@ -149,7 +149,7 @@ mod tests { let pool = txn.query_pools().create_or_get("foo").await.unwrap(); let namespace = txn .namespaces() - .create("gc_leave_undeleted_files_alone", "inf", topic.id, pool.id) + .create("gc_leave_undeleted_files_alone", topic.id, pool.id) .await .unwrap(); let table = txn @@ -229,7 +229,7 @@ mod tests { let pool = txn.query_pools().create_or_get("foo").await.unwrap(); let namespace = txn .namespaces() - .create("gc_leave_too_new_files_alone", "inf", topic.id, pool.id) + .create("gc_leave_too_new_files_alone", topic.id, pool.id) .await .unwrap(); let table = txn @@ -313,7 +313,7 @@ mod tests { let pool = txn.query_pools().create_or_get("foo").await.unwrap(); let namespace = txn .namespaces() - .create("gc_remove_old_enough_files", "inf", topic.id, pool.id) + .create("gc_remove_old_enough_files", topic.id, pool.id) .await .unwrap(); let table = txn diff --git a/garbage_collector/src/objectstore/checker.rs b/garbage_collector/src/objectstore/checker.rs index 6f02f62155..892b1cc832 100644 --- a/garbage_collector/src/objectstore/checker.rs +++ b/garbage_collector/src/objectstore/checker.rs @@ -138,7 +138,7 @@ mod tests { let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_parquet_file_test", "inf", topic.id, pool.id) + 
.create("namespace_parquet_file_test", topic.id, pool.id) .await .unwrap(); let table = repos diff --git a/import/src/aggregate_tsm_schema/update_catalog.rs b/import/src/aggregate_tsm_schema/update_catalog.rs index a9d681fda3..0b9e3ec2c4 100644 --- a/import/src/aggregate_tsm_schema/update_catalog.rs +++ b/import/src/aggregate_tsm_schema/update_catalog.rs @@ -55,7 +55,6 @@ pub async fn update_iox_catalog<'a>( merged_tsm_schema: &'a AggregateTSMSchema, topic: &'a str, query_pool_name: Option<&'a str>, - retention: Option<&'a str>, catalog: Arc, connection: Connection, ) -> Result<(), UpdateCatalogError> { @@ -67,8 +66,8 @@ pub async fn update_iox_catalog<'a>( Ok(iox_schema) => iox_schema, Err(iox_catalog::interface::Error::NamespaceNotFoundByName { .. }) => { // Namespace has to be created; ensure the user provided the required parameters - let (query_pool_name, retention) = match (query_pool_name, retention) { - (Some(query_pool_name), Some(retention)) => (query_pool_name, retention), + let query_pool_name = match query_pool_name { + Some(query_pool_name) => query_pool_name, _ => { return Err(UpdateCatalogError::NamespaceCreationError("in order to create the namespace you must provide query_pool_name and retention args".to_string())); } @@ -78,7 +77,6 @@ pub async fn update_iox_catalog<'a>( get_topic_id_and_query_id(repos.deref_mut(), topic, query_pool_name).await?; let _namespace = create_namespace( namespace_name.as_str(), - retention, topic_id, query_id, repos.deref_mut(), @@ -138,7 +136,6 @@ where async fn create_namespace( name: &str, - retention: &str, topic_id: TopicId, query_id: QueryPoolId, repos: &mut R, @@ -146,11 +143,7 @@ async fn create_namespace( where R: RepoCollection + ?Sized, { - match repos - .namespaces() - .create(name, retention, topic_id, query_id) - .await - { + match repos.namespaces().create(name, topic_id, query_id).await { Ok(ns) => Ok(ns), Err(iox_catalog::interface::Error::NameExists { .. }) => { // presumably it got created in the meantime? 
@@ -532,7 +525,6 @@ mod tests { &agg_schema, "iox-shared", Some("iox-shared"), - Some("inf"), Arc::clone(&catalog), connection, ) @@ -598,7 +590,7 @@ mod tests { // create namespace, table and columns for weather measurement let namespace = txn .namespaces() - .create("1234_5678", "inf", TopicId::new(1), QueryPoolId::new(1)) + .create("1234_5678", TopicId::new(1), QueryPoolId::new(1)) .await .expect("namespace created"); let mut table = txn @@ -652,7 +644,6 @@ mod tests { &agg_schema, "iox-shared", Some("iox-shared"), - Some("inf"), Arc::clone(&catalog), connection, ) @@ -703,7 +694,7 @@ mod tests { // create namespace, table and columns for weather measurement let namespace = txn .namespaces() - .create("1234_5678", "inf", TopicId::new(1), QueryPoolId::new(1)) + .create("1234_5678", TopicId::new(1), QueryPoolId::new(1)) .await .expect("namespace created"); let mut table = txn @@ -750,7 +741,6 @@ mod tests { &agg_schema, "iox-shared", Some("iox-shared"), - Some("inf"), Arc::clone(&catalog), connection, ) @@ -784,7 +774,7 @@ mod tests { // create namespace, table and columns for weather measurement let namespace = txn .namespaces() - .create("1234_5678", "inf", TopicId::new(1), QueryPoolId::new(1)) + .create("1234_5678", TopicId::new(1), QueryPoolId::new(1)) .await .expect("namespace created"); let mut table = txn @@ -830,7 +820,6 @@ mod tests { &agg_schema, "iox-shared", Some("iox-shared"), - Some("inf"), Arc::clone(&catalog), connection, ) @@ -883,57 +872,6 @@ mod tests { &agg_schema, "iox-shared", None, - Some("inf"), - Arc::clone(&catalog), - connection, - ) - .await - .expect_err("should fail namespace creation"); - assert_matches!(err, UpdateCatalogError::NamespaceCreationError(_)); - } - - #[tokio::test] - async fn needs_creating_but_missing_retention() { - // init a test catalog stack - let metrics = Arc::new(metric::Registry::default()); - let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); - catalog - .repositories() - .await - .topics() - .create_or_get("iox-shared") - .await - .expect("topic created"); - let (connection, _join_handle, _requests) = create_test_shard_service(MapToShardResponse { - shard_id: 0, - shard_index: 0, - }) - .await; - - let json = r#" - { - "org_id": "1234", - "bucket_id": "5678", - "measurements": { - "cpu": { - "tags": [ - { "name": "host", "values": ["server", "desktop"] } - ], - "fields": [ - { "name": "usage", "types": ["Float"] } - ], - "earliest_time": "2022-01-01T00:00:00.00Z", - "latest_time": "2022-07-07T06:00:00.00Z" - } - } - } - "#; - let agg_schema: AggregateTSMSchema = json.try_into().unwrap(); - let err = update_iox_catalog( - &agg_schema, - "iox-shared", - Some("iox-shared"), - None, Arc::clone(&catalog), connection, ) @@ -992,7 +930,6 @@ mod tests { &agg_schema, "iox-shared", Some("iox-shared"), - Some("inf"), Arc::clone(&catalog), connection, ) diff --git a/influxdb_iox/src/commands/import/schema.rs b/influxdb_iox/src/commands/import/schema.rs index 6ad16c38c8..8cbaf886c7 100644 --- a/influxdb_iox/src/commands/import/schema.rs +++ b/influxdb_iox/src/commands/import/schema.rs @@ -183,7 +183,6 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Schem &merged_tsm_schema, merge_config.write_buffer_config.topic(), merge_config.query_pool_name.as_deref(), - merge_config.retention.as_deref(), Arc::clone(&catalog), connection.clone(), ) diff --git a/influxdb_iox/src/commands/remote/partition.rs b/influxdb_iox/src/commands/remote/partition.rs index 431764eba4..3b3b1a1f43 100644 --- 
a/influxdb_iox/src/commands/remote/partition.rs +++ b/influxdb_iox/src/commands/remote/partition.rs @@ -246,7 +246,7 @@ async fn load_schema( let namespace = match repos .namespaces() - .create(namespace, "inf", topic.id, query_pool.id) + .create(namespace, topic.id, query_pool.id) .await { Ok(n) => n, @@ -527,7 +527,7 @@ mod tests { .unwrap(); namespace = repos .namespaces() - .create("load_parquet_files", "", topic.id, query_pool.id) + .create("load_parquet_files", topic.id, query_pool.id) .await .unwrap(); table = repos diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index 7bc2760db4..88c3d076d0 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -607,7 +607,7 @@ async fn namespace_retention() { .arg("retention") .arg("--retention-hours") .arg(retention_period_hours.to_string()) - .arg(&namespace) + .arg(namespace) .assert() .success() .stdout( @@ -638,7 +638,7 @@ async fn namespace_retention() { .arg("retention") .arg("--retention-hours") .arg(retention_period_hours.to_string()) - .arg(&namespace) + .arg(namespace) .assert() .success() .stdout( diff --git a/ingester/src/data.rs b/ingester/src/data.rs index b05036cefd..0cb9c2d66b 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -634,7 +634,7 @@ mod tests { let shard_index = ShardIndex::new(0); let namespace = repos .namespaces() - .create("foo", "inf", topic.id, query_pool.id) + .create("foo", topic.id, query_pool.id) .await .unwrap(); let shard1 = repos @@ -735,7 +735,7 @@ mod tests { let shard_index = ShardIndex::new(0); let namespace = repos .namespaces() - .create("foo", "inf", topic.id, query_pool.id) + .create("foo", topic.id, query_pool.id) .await .unwrap(); let shard1 = repos @@ -838,7 +838,7 @@ mod tests { let shard_index = ShardIndex::new(0); let namespace = repos .namespaces() - .create("foo", "inf", topic.id, query_pool.id) + .create("foo", topic.id, query_pool.id) .await .unwrap(); let shard1 = repos @@ -1118,7 +1118,7 @@ mod tests { let shard_index = ShardIndex::new(0); let namespace = repos .namespaces() - .create("foo", "inf", topic.id, query_pool.id) + .create("foo", topic.id, query_pool.id) .await .unwrap(); let shard1 = repos @@ -1260,7 +1260,7 @@ mod tests { let shard_index = ShardIndex::new(0); let namespace = repos .namespaces() - .create("foo", "inf", topic.id, query_pool.id) + .create("foo", topic.id, query_pool.id) .await .unwrap(); let shard = repos @@ -1440,7 +1440,7 @@ mod tests { let shard_index = ShardIndex::new(0); let namespace = repos .namespaces() - .create("foo", "inf", topic.id, query_pool.id) + .create("foo", topic.id, query_pool.id) .await .unwrap(); let shard1 = repos diff --git a/ingester/src/data/partition/resolver/catalog.rs b/ingester/src/data/partition/resolver/catalog.rs index ef34b6e681..583cc2ed08 100644 --- a/ingester/src/data/partition/resolver/catalog.rs +++ b/ingester/src/data/partition/resolver/catalog.rs @@ -111,12 +111,7 @@ mod tests { let q = repos.query_pools().create_or_get("platanos").await.unwrap(); let ns = repos .namespaces() - .create( - TABLE_NAME, - iox_catalog::INFINITE_RETENTION_POLICY, - t.id, - q.id, - ) + .create(TABLE_NAME, t.id, q.id) .await .unwrap(); diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 9a99bc1d65..c7f3c10535 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -508,7 +508,7 @@ mod tests { let shard_index = ShardIndex::new(0); let namespace = txn .namespaces() - .create("foo", "inf", topic.id, 
query_pool.id) + .create("foo", topic.id, query_pool.id) .await .unwrap(); let mut shard = txn diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 13a584e387..7a37a3d09d 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -730,12 +730,7 @@ pub(crate) async fn populate_catalog( let query_pool = c.query_pools().create_or_get("query-pool").await.unwrap(); let ns_id = c .namespaces() - .create( - namespace, - iox_catalog::INFINITE_RETENTION_POLICY, - topic.id, - query_pool.id, - ) + .create(namespace, topic.id, query_pool.id) .await .unwrap() .id; diff --git a/ingester/tests/common/mod.rs b/ingester/tests/common/mod.rs index 2ca166a864..65fb2377ab 100644 --- a/ingester/tests/common/mod.rs +++ b/ingester/tests/common/mod.rs @@ -181,12 +181,7 @@ impl TestContext { .repositories() .await .namespaces() - .create( - name, - iox_catalog::INFINITE_RETENTION_POLICY, - self.topic_id, - self.query_id, - ) + .create(name, self.topic_id, self.query_id) .await .expect("failed to create test namespace"); diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 1ec546a321..e81ba8bd14 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -290,7 +290,6 @@ pub trait NamespaceRepo: Send + Sync { async fn create( &mut self, name: &str, - retention_duration: &str, topic_id: TopicId, query_pool_id: QueryPoolId, ) -> Result; @@ -977,7 +976,7 @@ pub(crate) mod test_helpers { let namespace_name = "test_namespace"; let namespace = repos .namespaces() - .create(namespace_name, "inf", topic.id, pool.id) + .create(namespace_name, topic.id, pool.id) .await .unwrap(); assert!(namespace.id > NamespaceId::new(0)); @@ -992,7 +991,7 @@ pub(crate) mod test_helpers { let conflict = repos .namespaces() - .create(namespace_name, "inf", topic.id, pool.id) + .create(namespace_name, topic.id, pool.id) .await; assert!(matches!( conflict.unwrap_err(), @@ -1032,7 +1031,7 @@ pub(crate) mod test_helpers { let namespace2_name = "test_namespace2"; let namespace2 = repos .namespaces() - .create(namespace2_name, "inf", topic.id, pool.id) + .create(namespace2_name, topic.id, pool.id) .await .unwrap(); let mut namespaces = repos.namespaces().list().await.unwrap(); @@ -1081,7 +1080,7 @@ pub(crate) mod test_helpers { let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_table_test", "inf", topic.id, pool.id) + .create("namespace_table_test", topic.id, pool.id) .await .unwrap(); @@ -1118,7 +1117,7 @@ pub(crate) mod test_helpers { // test we can create a table of the same name in a different namespace let namespace2 = repos .namespaces() - .create("two", "inf", topic.id, pool.id) + .create("two", topic.id, pool.id) .await .unwrap(); assert_ne!(namespace, namespace2); @@ -1209,7 +1208,7 @@ pub(crate) mod test_helpers { let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_column_test", "inf", topic.id, pool.id) + .create("namespace_column_test", topic.id, pool.id) .await .unwrap(); let table = repos @@ -1420,7 +1419,7 @@ pub(crate) mod test_helpers { let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_partition_test", "inf", topic.id, pool.id) + .create("namespace_partition_test", topic.id, pool.id) .await .unwrap(); let table = repos @@ -1498,7 +1497,7 @@ pub(crate) mod test_helpers { // test list_by_namespace let namespace2 = repos .namespaces() - 
.create("namespace_partition_test2", "inf", topic.id, pool.id) + .create("namespace_partition_test2", topic.id, pool.id) .await .unwrap(); let table2 = repos @@ -1695,7 +1694,7 @@ pub(crate) mod test_helpers { let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_tombstone_test", "inf", topic.id, pool.id) + .create("namespace_tombstone_test", topic.id, pool.id) .await .unwrap(); let table = repos @@ -1779,7 +1778,7 @@ pub(crate) mod test_helpers { // test list_by_namespace let namespace2 = repos .namespaces() - .create("namespace_tombstone_test2", "inf", topic.id, pool.id) + .create("namespace_tombstone_test2", topic.id, pool.id) .await .unwrap(); let table2 = repos @@ -1860,7 +1859,6 @@ pub(crate) mod test_helpers { .namespaces() .create( "namespace_tombstones_by_parquet_file_test", - "inf", topic.id, pool.id, ) @@ -2077,7 +2075,7 @@ pub(crate) mod test_helpers { let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_parquet_file_test", "inf", topic.id, pool.id) + .create("namespace_parquet_file_test", topic.id, pool.id) .await .unwrap(); let table = repos @@ -2232,7 +2230,7 @@ pub(crate) mod test_helpers { // test list_by_namespace_not_to_delete let namespace2 = repos .namespaces() - .create("namespace_parquet_file_test1", "inf", topic.id, pool.id) + .create("namespace_parquet_file_test1", topic.id, pool.id) .await .unwrap(); let table2 = repos @@ -2583,7 +2581,6 @@ pub(crate) mod test_helpers { .namespaces() .create( "namespace_parquet_file_compaction_level_0_test", - "inf", topic.id, pool.id, ) @@ -2701,7 +2698,6 @@ pub(crate) mod test_helpers { .namespaces() .create( "namespace_parquet_file_compaction_level_1_test", - "inf", topic.id, pool.id, ) @@ -2922,12 +2918,7 @@ pub(crate) mod test_helpers { .unwrap(); let namespace = repos .namespaces() - .create( - "test_most_level_0_files_partitions", - "inf", - topic.id, - pool.id, - ) + .create("test_most_level_0_files_partitions", topic.id, pool.id) .await .unwrap(); let table = repos @@ -3381,7 +3372,6 @@ pub(crate) mod test_helpers { .namespaces() .create( "test_recent_highest_throughput_partitions", - "inf", topic.id, pool.id, ) @@ -3672,7 +3662,6 @@ pub(crate) mod test_helpers { .namespaces() .create( "namespace_parquet_file_test_list_by_partiton_not_to_delete", - "inf", topic.id, pool.id, ) @@ -3785,7 +3774,6 @@ pub(crate) mod test_helpers { .namespaces() .create( "namespace_update_to_compaction_level_1_test", - "inf", topic.id, pool.id, ) @@ -3907,12 +3895,7 @@ pub(crate) mod test_helpers { let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create( - "namespace_processed_tombstone_test", - "inf", - topic.id, - pool.id, - ) + .create("namespace_processed_tombstone_test", topic.id, pool.id) .await .unwrap(); let table = repos @@ -4152,7 +4135,7 @@ pub(crate) mod test_helpers { let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create(namespace_name, "inf", topic.id, pool.id) + .create(namespace_name, topic.id, pool.id) .await; let namespace = match namespace { diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 5abb907b0e..f0692b37d6 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -36,8 +36,6 @@ pub const DEFAULT_MAX_COLUMNS_PER_TABLE: i32 = 200; pub const DEFAULT_RETENTION_PERIOD: Option = None; /// A string value representing an infinite retention 
policy. -pub const INFINITE_RETENTION_POLICY: &str = "inf"; - pub mod interface; pub mod mem; pub mod metrics; @@ -272,7 +270,7 @@ mod tests { let namespace = txn .namespaces() - .create(NAMESPACE_NAME, "inf", topic.id, query_pool.id) + .create(NAMESPACE_NAME, topic.id, query_pool.id) .await .unwrap(); diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 7f1d3702a9..390c9293a1 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -285,7 +285,6 @@ impl NamespaceRepo for MemTxn { async fn create( &mut self, name: &str, - retention_duration: &str, topic_id: TopicId, query_pool_id: QueryPoolId, ) -> Result { @@ -302,7 +301,7 @@ impl NamespaceRepo for MemTxn { name: name.to_string(), topic_id, query_pool_id, - retention_duration: Some(retention_duration.to_string()), + retention_duration: Some("inf".to_string()), // temporary until the retention_duration field is removed from the catalog table in the next PR max_tables: DEFAULT_MAX_TABLES, max_columns_per_table: DEFAULT_MAX_COLUMNS_PER_TABLE, retention_period_ns: DEFAULT_RETENTION_PERIOD, diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 515b841f90..477de03041 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -193,7 +193,7 @@ decorate!( decorate!( impl_trait = NamespaceRepo, methods = [ - "namespace_create" = create(&mut self, name: &str, retention_duration: &str, topic_id: TopicId, query_pool_id: QueryPoolId) -> Result; + "namespace_create" = create(&mut self, name: &str, topic_id: TopicId, query_pool_id: QueryPoolId) -> Result; "namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_hours: i64) -> Result; "namespace_list" = list(&mut self) -> Result>; "namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 1e0bac2a77..993f355179 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -589,21 +589,19 @@ impl NamespaceRepo for PostgresTxn { async fn create( &mut self, name: &str, - retention_duration: &str, topic_id: TopicId, query_pool_id: QueryPoolId, ) -> Result { let rec = sqlx::query_as::<_, Namespace>( r#" -INSERT INTO namespace ( name, retention_duration, topic_id, query_pool_id ) -VALUES ( $1, $2, $3, $4 ) -RETURNING *; - "#, + INSERT INTO namespace ( name, topic_id, query_pool_id ) + VALUES ( $1, $2, $3 ) + RETURNING *; + "#, ) .bind(name) // $1 - .bind(retention_duration) // $2 - .bind(topic_id) // $3 - .bind(query_pool_id) // $4 + .bind(topic_id) // $2 + .bind(query_pool_id) // $3 .fetch_one(&mut self.inner) .await .map_err(|e| { @@ -2355,7 +2353,7 @@ mod tests { .repositories() .await .namespaces() - .create("ns", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id) + .create("ns", kafka.id, query.id) .await .expect("namespace create failed") .id; @@ -2431,7 +2429,7 @@ mod tests { .repositories() .await .namespaces() - .create("ns2", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id) + .create("ns2", kafka.id, query.id) .await .expect("namespace create failed") .id; @@ -2514,7 +2512,7 @@ mod tests { .repositories() .await .namespaces() - .create("ns4", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id) + .create("ns4", kafka.id, query.id) .await .expect("namespace create failed") .id; @@ -2573,7 +2571,7 @@ mod tests { .repositories() .await .namespaces() - .create("ns3", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id) + .create("ns3", kafka.id, query.id) .await .expect("namespace create 
failed") .id; @@ -2734,7 +2732,7 @@ mod tests { .repositories() .await .namespaces() - .create("ns4", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id) + .create("ns4", kafka.id, query.id) .await .expect("namespace create failed") .id; @@ -2916,7 +2914,7 @@ mod tests { .repositories() .await .namespaces() - .create("ns4", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id) + .create("ns4", kafka.id, query.id) .await .expect("namespace create failed") .id; diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 6f0cc61ef9..cfde31ff0e 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -151,7 +151,7 @@ impl TestCatalog { let query_pool = repos.query_pools().create_or_get("pool").await.unwrap(); let namespace = repos .namespaces() - .create(name, "1y", topic.id, query_pool.id) + .create(name, topic.id, query_pool.id) .await .unwrap(); diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index 5b2c6a83a0..4d64cdaead 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -258,7 +258,6 @@ pub async fn create_router_server_type( Arc::clone(&catalog), topic_id, query_id, - iox_catalog::INFINITE_RETENTION_POLICY.to_owned(), ); // //////////////////////////////////////////////////////////////////////////// @@ -412,7 +411,7 @@ mod tests { let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("test_ns", "inf", topic.id, pool.id) + .create("test_ns", topic.id, pool.id) .await .unwrap(); diff --git a/router/src/dml_handlers/ns_autocreation.rs b/router/src/dml_handlers/ns_autocreation.rs index 7e69ecd028..186baf0f3b 100644 --- a/router/src/dml_handlers/ns_autocreation.rs +++ b/router/src/dml_handlers/ns_autocreation.rs @@ -224,7 +224,6 @@ mod tests { Namespace { id: NamespaceId::new(1), name: ns.to_string(), - retention_duration: Some("inf".to_owned()), topic_id: TopicId::new(42), query_pool_id: QueryPoolId::new(42), max_tables: iox_catalog::DEFAULT_MAX_TABLES, diff --git a/router/src/namespace_resolver.rs b/router/src/namespace_resolver.rs index 6ca2e2344f..d5e862677e 100644 --- a/router/src/namespace_resolver.rs +++ b/router/src/namespace_resolver.rs @@ -167,12 +167,7 @@ mod tests { let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap(); repos .namespaces() - .create( - &ns, - iox_catalog::INFINITE_RETENTION_POLICY, - topic.id, - query_pool.id, - ) + .create(&ns, topic.id, query_pool.id) .await .expect("failed to setup catalog state"); } diff --git a/router/src/namespace_resolver/ns_autocreation.rs b/router/src/namespace_resolver/ns_autocreation.rs index d6314c51ea..eaa67442d5 100644 --- a/router/src/namespace_resolver/ns_autocreation.rs +++ b/router/src/namespace_resolver/ns_autocreation.rs @@ -30,7 +30,6 @@ pub struct NamespaceAutocreation { topic_id: TopicId, query_id: QueryPoolId, - retention: String, } impl NamespaceAutocreation { @@ -48,7 +47,6 @@ impl NamespaceAutocreation { catalog: Arc, topic_id: TopicId, query_id: QueryPoolId, - retention: String, ) -> Self { Self { inner, @@ -56,7 +54,6 @@ impl NamespaceAutocreation { catalog, topic_id, query_id, - retention, } } } @@ -80,12 +77,7 @@ where match repos .namespaces() - .create( - namespace.as_str(), - &self.retention, - self.topic_id, - self.query_id, - ) + .create(namespace.as_str(), self.topic_id, self.query_id) .await { Ok(_) => { @@ -148,7 +140,6 @@ mod tests { Arc::clone(&catalog), TopicId::new(42), QueryPoolId::new(42), - "inf".to_owned(), ); // Drive the code under test @@ -186,7 +177,6 @@ mod tests { 
Arc::clone(&catalog), TopicId::new(42), QueryPoolId::new(42), - "inf".to_owned(), ); let created_id = creator diff --git a/router/tests/http.rs b/router/tests/http.rs index acee28853f..a825d2dbc6 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -116,7 +116,6 @@ impl TestContext { Arc::clone(&catalog), TopicId::new(TEST_TOPIC_ID), QueryPoolId::new(TEST_QUERY_POOL_ID), - iox_catalog::INFINITE_RETENTION_POLICY.to_owned(), ); let delegate = HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics); @@ -193,10 +192,6 @@ async fn test_write_ok() { .expect("query should succeed") .expect("namespace not found"); assert_eq!(ns.name, "bananas_test"); - assert_eq!( - ns.retention_duration.as_deref(), - Some(iox_catalog::INFINITE_RETENTION_POLICY) - ); assert_eq!(ns.topic_id, TopicId::new(TEST_TOPIC_ID)); assert_eq!(ns.query_pool_id, QueryPoolId::new(TEST_QUERY_POOL_ID)); @@ -357,7 +352,6 @@ async fn test_write_propagate_ids() { .namespaces() .create( "bananas_test", - iox_catalog::INFINITE_RETENTION_POLICY, TopicId::new(TEST_TOPIC_ID), QueryPoolId::new(TEST_QUERY_POOL_ID), ) @@ -432,7 +426,6 @@ async fn test_delete_propagate_ids() { .namespaces() .create( "bananas_test", - iox_catalog::INFINITE_RETENTION_POLICY, TopicId::new(TEST_TOPIC_ID), QueryPoolId::new(TEST_QUERY_POOL_ID), ) diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs index ac7bdc3216..568b11d8d5 100644 --- a/service_grpc_catalog/src/lib.rs +++ b/service_grpc_catalog/src/lib.rs @@ -194,7 +194,7 @@ mod tests { .unwrap(); let namespace = repos .namespaces() - .create("catalog_partition_test", "inf", topic.id, pool.id) + .create("catalog_partition_test", topic.id, pool.id) .await .unwrap(); let table = repos @@ -271,7 +271,7 @@ mod tests { .unwrap(); let namespace = repos .namespaces() - .create("catalog_partition_test", "inf", topic.id, pool.id) + .create("catalog_partition_test", topic.id, pool.id) .await .unwrap(); let table = repos diff --git a/service_grpc_object_store/src/lib.rs b/service_grpc_object_store/src/lib.rs index f5f515bf35..d0fa5c6c05 100644 --- a/service_grpc_object_store/src/lib.rs +++ b/service_grpc_object_store/src/lib.rs @@ -127,7 +127,7 @@ mod tests { .unwrap(); let namespace = repos .namespaces() - .create("catalog_partition_test", "inf", topic.id, pool.id) + .create("catalog_partition_test", topic.id, pool.id) .await .unwrap(); let table = repos diff --git a/service_grpc_schema/src/lib.rs b/service_grpc_schema/src/lib.rs index 23ef242e03..8e5199c5b9 100644 --- a/service_grpc_schema/src/lib.rs +++ b/service_grpc_schema/src/lib.rs @@ -123,7 +123,7 @@ mod tests { let pool = repos.query_pools().create_or_get("franz").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_schema_test", "inf", topic.id, pool.id) + .create("namespace_schema_test", topic.id, pool.id) .await .unwrap(); let table = repos From ea2559a6404e1fc22dfe87085f4eeaa96566f140 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 11 Nov 2022 11:21:45 -0500 Subject: [PATCH 2/2] chore: update SQL syntax after adding the new column to_delete into table partition (#6123) --- docs/compactor.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/compactor.md b/docs/compactor.md index f0cc46a331..03fd751d4e 100644 --- a/docs/compactor.md +++ b/docs/compactor.md @@ -202,19 +202,19 @@ WHERE partition.id = skipped_compactions.partition_id and partition.shard_id = s ORDER BY shard_index, table_id, partition_key, skipped_at; -- Number of files per 
level for top 50 partitions with most files of a specified day -SELECT s.shard_index, pf.table_id, partition_id, partition_key, - count(case when to_delete is null then 1 end) total_not_deleted, - count(case when compaction_level=0 and to_delete is null then 1 end) num_l0, - count(case when compaction_level=1 and to_delete is null then 1 end) num_l1, - count(case when compaction_level=2 and to_delete is null then 1 end) num_l2 , - count(case when compaction_level=0 and to_delete is not null then 1 end) deleted_num_l0, - count(case when compaction_level=1 and to_delete is not null then 1 end) deleted_num_l1, - count(case when compaction_level=2 and to_delete is not null then 1 end) deleted_num_l2 +SELECT s.shard_index, pf.table_id, pf.partition_id, p.partition_key, + count(case when pf.to_delete is null then 1 end) total_not_deleted, + count(case when pf.compaction_level=0 and pf.to_delete is null then 1 end) num_l0, + count(case when pf.compaction_level=1 and pf.to_delete is null then 1 end) num_l1, + count(case when pf.compaction_level=2 and pf.to_delete is null then 1 end) num_l2 , + count(case when pf.compaction_level=0 and pf.to_delete is not null then 1 end) deleted_num_l0, + count(case when pf.compaction_level=1 and pf.to_delete is not null then 1 end) deleted_num_l1, + count(case when pf.compaction_level=2 and pf.to_delete is not null then 1 end) deleted_num_l2 FROM parquet_file pf, partition p, shard s WHERE pf.partition_id = p.id AND pf.shard_id = s.id - AND partition_key = '2022-10-11' -GROUP BY s.shard_index, pf.table_id, partition_id, partition_key -ORDER BY count(case when to_delete is null then 1 end) DESC + AND p.partition_key = '2022-10-11' +GROUP BY s.shard_index, pf.table_id, pf.partition_id, p.partition_key +ORDER BY count(case when pf.to_delete is null then 1 end) DESC LIMIT 50; -- Partitions with level-0 files ingested within the last 4 hours
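A note on why the second patch qualifies `to_delete`: per its commit message, the `partition` table gained its own `to_delete` column, so an unqualified `to_delete` in a query that joins `parquet_file` and `partition` no longer resolves and PostgreSQL rejects it as ambiguous. The sketch below is an illustration of that failure mode and the fix, assuming only that both tables carry the column as the commit message states; it is not taken from the patch itself.

```sql
-- Before the partition table had to_delete, the bare reference resolved to
-- parquet_file.to_delete; with the column on both joined tables PostgreSQL
-- rejects it with an ambiguous-column-reference error:
SELECT count(*)
FROM parquet_file pf, partition p
WHERE pf.partition_id = p.id
  AND to_delete IS NULL;

-- Qualifying the reference with the intended alias restores the old meaning:
SELECT count(*)
FROM parquet_file pf, partition p
WHERE pf.partition_id = p.id
  AND pf.to_delete IS NULL;
```

The updated documentation queries apply the same aliasing (`pf.`, `p.`) to the other selected and grouped columns as well, which keeps them unambiguous if further columns are ever duplicated across the two tables.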