From a633964f2b3b0d5ffc4b9b12f93cb23e8f98f583 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 6 Feb 2023 15:14:12 +0100 Subject: [PATCH 01/11] feat(catalog): return max table limit in schema The maximum number of tables is part of the Namespace, which is already loaded in its entirety. This commit copies the value into the NamespaceSchema, making it available for the router to utilise. --- compactor2/src/test_util.rs | 1 + data_types/src/lib.rs | 6 ++++++ ingester/src/data.rs | 3 ++- ingester/tests/common/mod.rs | 1 + iox_catalog/src/interface.rs | 3 +++ iox_catalog/src/lib.rs | 1 + router/src/namespace_cache/memory.rs | 2 ++ router/src/namespace_cache/metrics.rs | 1 + router/src/namespace_cache/sharded_cache.rs | 1 + router/src/namespace_resolver.rs | 1 + router/src/namespace_resolver/ns_autocreation.rs | 1 + 11 files changed, 20 insertions(+), 1 deletion(-) diff --git a/compactor2/src/test_util.rs b/compactor2/src/test_util.rs index a6afc54063..7b675b9178 100644 --- a/compactor2/src/test_util.rs +++ b/compactor2/src/test_util.rs @@ -220,6 +220,7 @@ impl NamespaceBuilder { query_pool_id, tables, max_columns_per_table: 10, + max_tables: 42, retention_period_ns: None, }, }, diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 1dc5d810ac..d5ff391953 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -499,6 +499,8 @@ pub struct NamespaceSchema { pub tables: BTreeMap, /// the number of columns per table this namespace allows pub max_columns_per_table: usize, + /// The maximum number of tables permitted in this namespace. + pub max_tables: usize, /// The retention period in ns. /// None represents infinite duration (i.e. never drop data). pub retention_period_ns: Option, @@ -511,6 +513,7 @@ impl NamespaceSchema { topic_id: TopicId, query_pool_id: QueryPoolId, max_columns_per_table: i32, + max_tables: i32, retention_period_ns: Option, ) -> Self { Self { @@ -519,6 +522,7 @@ impl NamespaceSchema { topic_id, query_pool_id, max_columns_per_table: max_columns_per_table as usize, + max_tables: max_tables as usize, retention_period_ns, } } @@ -3479,6 +3483,7 @@ mod tests { query_pool_id: QueryPoolId::new(3), tables: BTreeMap::from([]), max_columns_per_table: 4, + max_tables: 42, retention_period_ns: None, }; let schema2 = NamespaceSchema { @@ -3487,6 +3492,7 @@ mod tests { query_pool_id: QueryPoolId::new(3), tables: BTreeMap::from([(String::from("foo"), TableSchema::new(TableId::new(1)))]), max_columns_per_table: 4, + max_tables: 42, retention_period_ns: None, }; assert!(schema1.size() < schema2.size()); diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 7b5d17ad71..e6949b28af 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -752,7 +752,8 @@ mod tests { .await .unwrap(); - let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100, None); + let schema = + NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100, 42, None); let shard_index = ShardIndex::new(0); let shard1 = repos diff --git a/ingester/tests/common/mod.rs b/ingester/tests/common/mod.rs index fc71a495e5..3ea960282d 100644 --- a/ingester/tests/common/mod.rs +++ b/ingester/tests/common/mod.rs @@ -197,6 +197,7 @@ impl TestContext { self.topic_id, self.query_id, iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, + iox_catalog::DEFAULT_MAX_TABLES, retention_period_ns, ), ) diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 992f9feaba..9e00ae8b51 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -792,6 +792,7 @@ 
where namespace.topic_id, namespace.query_pool_id, namespace.max_columns_per_table, + namespace.max_tables, namespace.retention_period_ns, ); @@ -929,6 +930,7 @@ pub async fn list_schemas( v.topic_id, v.query_pool_id, v.max_columns_per_table, + v.max_tables, v.retention_period_ns, ); ns.tables = joined.remove(&v.id)?; @@ -5970,6 +5972,7 @@ pub(crate) mod test_helpers { topic.id, pool.id, namespace.max_columns_per_table, + namespace.max_tables, namespace.retention_period_ns, ); diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index c9a29a503b..21cc2030ab 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -281,6 +281,7 @@ mod tests { namespace.topic_id, namespace.query_pool_id, namespace.max_columns_per_table, + namespace.max_tables, namespace.retention_period_ns, ); diff --git a/router/src/namespace_cache/memory.rs b/router/src/namespace_cache/memory.rs index 3140a0bb19..1baeed6a4d 100644 --- a/router/src/namespace_cache/memory.rs +++ b/router/src/namespace_cache/memory.rs @@ -46,6 +46,7 @@ mod tests { query_pool_id: QueryPoolId::new(1234), tables: Default::default(), max_columns_per_table: 50, + max_tables: 24, retention_period_ns: Some(876), }; assert!(cache.put_schema(ns.clone(), schema1.clone()).is_none()); @@ -57,6 +58,7 @@ mod tests { query_pool_id: QueryPoolId::new(2), tables: Default::default(), max_columns_per_table: 10, + max_tables: 42, retention_period_ns: Some(876), }; diff --git a/router/src/namespace_cache/metrics.rs b/router/src/namespace_cache/metrics.rs index e08978a592..015a8a34ec 100644 --- a/router/src/namespace_cache/metrics.rs +++ b/router/src/namespace_cache/metrics.rs @@ -199,6 +199,7 @@ mod tests { query_pool_id: QueryPoolId::new(1234), tables, max_columns_per_table: 100, + max_tables: 42, retention_period_ns: None, } } diff --git a/router/src/namespace_cache/sharded_cache.rs b/router/src/namespace_cache/sharded_cache.rs index efdb80dde6..8125e7aa92 100644 --- a/router/src/namespace_cache/sharded_cache.rs +++ b/router/src/namespace_cache/sharded_cache.rs @@ -65,6 +65,7 @@ mod tests { query_pool_id: QueryPoolId::new(1), tables: Default::default(), max_columns_per_table: 7, + max_tables: 42, retention_period_ns: None, } } diff --git a/router/src/namespace_resolver.rs b/router/src/namespace_resolver.rs index ac3158e3ca..9b0e6cd716 100644 --- a/router/src/namespace_resolver.rs +++ b/router/src/namespace_resolver.rs @@ -122,6 +122,7 @@ mod tests { query_pool_id: QueryPoolId::new(3), tables: Default::default(), max_columns_per_table: 4, + max_tables: 42, retention_period_ns: None, }, ); diff --git a/router/src/namespace_resolver/ns_autocreation.rs b/router/src/namespace_resolver/ns_autocreation.rs index feced6178c..feed257471 100644 --- a/router/src/namespace_resolver/ns_autocreation.rs +++ b/router/src/namespace_resolver/ns_autocreation.rs @@ -176,6 +176,7 @@ mod tests { query_pool_id: QueryPoolId::new(3), tables: Default::default(), max_columns_per_table: 4, + max_tables: 42, retention_period_ns: None, }, ); From a1764ee7cb2be0412f13f59473f6bc54786f5222 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 6 Feb 2023 15:34:57 +0100 Subject: [PATCH 02/11] refactor: ExactSizeIterator for columns iter Adds ExactSizeIterator bounds to the MutableBatch::column() iter, allowing O(1) length discovery / pre-allocation optimisations for container collection. 
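As an illustration only (not part of this patch), the kind of caller-side pre-allocation the bound enables looks roughly like the sketch below; the helper name and the `(&String, &Column)` item type are assumed here, not taken from the diff:

    use mutable_batch::MutableBatch;

    // Sketch: with the ExactSizeIterator bound, callers can size the
    // destination container up front instead of growing it while iterating.
    fn collect_column_names(batch: &MutableBatch) -> Vec<String> {
        let columns = batch.columns();
        // len() is O(1) thanks to ExactSizeIterator.
        let mut names = Vec::with_capacity(columns.len());
        for (name, _column) in columns {
            names.push(name.clone());
        }
        names
    }

Nothing about the iteration itself changes; the bound only exposes len() to callers.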
--- mutable_batch/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mutable_batch/src/lib.rs b/mutable_batch/src/lib.rs index db574a6e14..89bb55d70a 100644 --- a/mutable_batch/src/lib.rs +++ b/mutable_batch/src/lib.rs @@ -131,7 +131,7 @@ impl MutableBatch { } /// Returns an iterator over the columns in this batch in no particular order - pub fn columns(&self) -> impl Iterator + '_ { + pub fn columns(&self) -> impl Iterator + ExactSizeIterator + '_ { self.column_names .iter() .map(move |(name, idx)| (name, &self.columns[*idx])) @@ -269,6 +269,7 @@ mod tests { let batch = batches.get("cpu").unwrap(); assert_eq!(batch.size_data(), 128); + assert_eq!(batch.columns().len(), 5); let batches = lines_to_batches( "cpu,t1=hellomore,t2=world f1=1.1,f2=1i 1234\ncpu,t1=h,t2=w f1=2.2,f2=2i 1234", @@ -277,6 +278,7 @@ mod tests { .unwrap(); let batch = batches.get("cpu").unwrap(); assert_eq!(batch.size_data(), 138); + assert_eq!(batch.columns().len(), 5); } #[test] @@ -289,5 +291,6 @@ mod tests { let batch = batches.get("cpu").unwrap(); assert_eq!(batch.size_data(), 124); + assert_eq!(batch.columns().len(), 5); } } From dfa4ab2585f773aeea254083771f13a5ccef6d2b Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 6 Feb 2023 15:35:14 +0100 Subject: [PATCH 03/11] perf(router): fast-path column limit for new table When validating column limits for new tables, skip the column set generation and union operations against the empty existing column set. --- router/src/dml_handlers/schema_validation.rs | 29 ++++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs index b10643a0f1..84d58f23b0 100644 --- a/router/src/dml_handlers/schema_validation.rs +++ b/router/src/dml_handlers/schema_validation.rs @@ -349,11 +349,30 @@ fn validate_column_limits( schema: &NamespaceSchema, ) -> Result<(), OverColumnLimit> { for (table_name, batch) in batches { - let mut existing_columns = schema - .tables - .get(table_name) - .map(|t| t.column_names()) - .unwrap_or_default(); + // Get the column set for this table from the schema. + let mut existing_columns = match schema.tables.get(table_name).map(|t| t.column_names()) { + Some(v) => v, + None if batch.columns().len() > schema.max_columns_per_table => { + // If there are no existing columns, all the columns in + // write must be created - there's no need to perform a set + // union to discover the distinct column count. + return Err(OverColumnLimit { + table_name: table_name.into(), + merged_column_count: batch.columns().len(), + existing_column_count: 0, + max_columns_per_table: schema.max_columns_per_table, + }); + } + None => { + // All the columns in this write are new, and they are less than + // the maximum permitted number of columns. + continue; + } + }; + + // The union of existing columns and new columns in this write must be + // calculated to derive the total distinct column count for this table + // after this write applied. let existing_column_count = existing_columns.len(); let merged_column_count = { From 114bafe9a1c662ca96dc403d56b6019003f925b4 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 6 Feb 2023 16:50:01 +0100 Subject: [PATCH 04/11] perf(router): cached table limit enforcement Use the namespace schema cache in the router to enforce the per-namespace table limit (service protection limit), adding O(1) overhead to the existing column limit evaluation logic. 
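In outline, the cached check added below (validate_schema_limits in the diff) boils down to the following simplified sketch; the free function and its exact shape are illustrative only:

    use std::collections::HashMap;

    use data_types::NamespaceSchema;
    use mutable_batch::MutableBatch;

    // Sketch: count how many tables in the write are absent from the cached
    // schema, and reject only if accepting them would push the namespace past
    // max_tables. Tables that already exist always remain writable.
    fn would_exceed_table_limit(
        batches: &HashMap<String, MutableBatch>,
        schema: &NamespaceSchema,
    ) -> bool {
        let new_tables = batches
            .keys()
            .filter(|name| !schema.tables.contains_key(name.as_str()))
            .count();
        new_tables > 0 && schema.tables.len() + new_tables > schema.max_tables
    }

The real implementation folds this into the existing per-table column checks and reports the offending counts in the returned error.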
Prior to this commit, each request that would breach the table limit would be (potentially partially) applied to the catalog and return an error. Every subsequent request creating a new table continued to cause a catalog query, unnecessarily adding load proportional to request counts. After this commit, catalog requests are only sent when the router instance can determine (to the best of its ability, see below) that the request will not cause the namespace to exceed the table limit. Because this uses cached schemas, the actual set of tables may have changed - this will cause inconsistent enforcement and spurious errors in the same way it currently does for the column limit. For more details (and to track a resolution) see: https://github.com/influxdata/influxdb_iox/issues/5957 --- iox_tests/src/util.rs | 10 + router/src/dml_handlers/schema_validation.rs | 295 ++++++++++++++++--- 2 files changed, 258 insertions(+), 47 deletions(-) diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index e3d5576d47..f2ef1e5eb2 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -356,6 +356,16 @@ impl TestNamespace { .await .unwrap(); } + + /// Set the number of tables allowed in this namespace. + pub async fn update_table_limit(&self, new_max: i32) { + let mut repos = self.catalog.catalog.repositories().await; + repos + .namespaces() + .update_table_limit(&self.namespace.name, new_max) + .await + .unwrap(); + } } /// A test shard with its namespace in the catalog diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs index 84d58f23b0..b2699909d9 100644 --- a/router/src/dml_handlers/schema_validation.rs +++ b/router/src/dml_handlers/schema_validation.rs @@ -207,14 +207,41 @@ where } }; - validate_column_limits(&batches, &schema).map_err(|e| { - warn!( - %namespace, - %namespace_id, - error=%e, - "service protection limit reached" - ); - self.service_limit_hit_columns.inc(1); + validate_schema_limits(&batches, &schema).map_err(|e| { + match &e { + CachedServiceProtectionLimit::Column { + table_name, + existing_column_count, + merged_column_count, + max_columns_per_table, + } => { + warn!( + %table_name, + %existing_column_count, + %merged_column_count, + %max_columns_per_table, + %namespace, + %namespace_id, + "service protection limit reached (columns)" + ); + self.service_limit_hit_columns.inc(1); + } + CachedServiceProtectionLimit::Table { + existing_table_count, + merged_table_count, + table_count_limit, + } => { + warn!( + %existing_table_count, + %merged_table_count, + %table_count_limit, + %namespace, + %namespace_id, + "service protection limit reached (tables)" + ); + self.service_limit_hit_tables.inc(1); + } + } SchemaError::ServiceLimit(Box::new(e)) })?; @@ -328,35 +355,69 @@ where } } +/// An error returned by schema limit evaluation against a cached +/// [`NamespaceSchema`]. #[derive(Debug, Error)] -#[error( - "couldn't create columns in table `{table_name}`; table contains \ {existing_column_count} existing columns, applying this write would result \ in {merged_column_count} columns, limit is {max_columns_per_table}" -)] -struct OverColumnLimit { - table_name: String, - // Number of columns already in the table.
- existing_column_count: usize, - // Number of resultant columns after merging the write with existing columns. - merged_column_count: usize, - // The configured limit. - max_columns_per_table: usize, + )] + Column { + /// The table that exceeds the column limit. + table_name: String, + /// Number of columns already in the table. + existing_column_count: usize, + /// Number of resultant columns after merging the write with existing + /// columns. + merged_column_count: usize, + /// The configured limit. + max_columns_per_table: usize, + }, + + /// The number of table would exceed the table limit cached in the + /// [`NamespaceSchema`]. + #[error( + "couldn't create new table; namespace contains {existing_table_count} \ + existing tables, applying this write would result in \ + {merged_table_count} columns, limit is {table_count_limit}" + )] + Table { + /// Number of tables already in the namespace. + existing_table_count: usize, + /// Number of resultant tables after merging the write with existing + /// tables. + merged_table_count: usize, + /// The configured limit. + table_count_limit: usize, + }, } -fn validate_column_limits( +/// Evaluate the number of columns/tables that would result if `batches` was +/// applied to `schema`, and ensure the column/table count does not exceed the +/// maximum permitted amount cached in the [`NamespaceSchema`]. +fn validate_schema_limits( batches: &HashMap, schema: &NamespaceSchema, -) -> Result<(), OverColumnLimit> { +) -> Result<(), CachedServiceProtectionLimit> { + // Maintain a counter tracking the number of tables in `batches` that do not + // exist in `schema`. + // + // This number of tables would be newly created when accepting the write. + let mut new_tables = 0; + for (table_name, batch) in batches { // Get the column set for this table from the schema. - let mut existing_columns = match schema.tables.get(table_name).map(|t| t.column_names()) { - Some(v) => v, + let mut existing_columns = match schema.tables.get(table_name) { + Some(v) => v.column_names(), None if batch.columns().len() > schema.max_columns_per_table => { - // If there are no existing columns, all the columns in + // The table does not exist, therefore all the columns in this // write must be created - there's no need to perform a set // union to discover the distinct column count. - return Err(OverColumnLimit { + return Err(CachedServiceProtectionLimit::Column { table_name: table_name.into(), merged_column_count: batch.columns().len(), existing_column_count: 0, @@ -364,8 +425,29 @@ fn validate_column_limits( }); } None => { - // All the columns in this write are new, and they are less than - // the maximum permitted number of columns. + // The table must be created. + new_tables += 1; + + // At least one new table will be created, ensure this does not + // exceed the configured maximum. + // + // Enforcing the check here ensures table limits are validated + // only when new tables are being created - this ensures + // existing tables do not become unusable if the limit is + // lowered, or because multiple writes were concurrently + // submitted to multiple router instances, exceeding the schema + // limit by some degree (eventual enforcement). 
+ let merged_table_count = schema.tables.len() + new_tables; + if merged_table_count > schema.max_tables { + return Err(CachedServiceProtectionLimit::Table { + existing_table_count: schema.tables.len(), + merged_table_count, + table_count_limit: schema.max_tables, + }); + } + + // Therefore all the columns in this write are new, and they are + // less than the maximum permitted number of columns. continue; } }; @@ -380,13 +462,14 @@ fn validate_column_limits( existing_columns.len() }; - // If the table is currently over the column limit but this write only includes existing - // columns and doesn't exceed the limit more, this is allowed. + // If the table is currently over the column limit but this write only + // includes existing columns and doesn't exceed the limit more, this is + // allowed. let columns_were_added_in_this_batch = merged_column_count > existing_column_count; let column_limit_exceeded = merged_column_count > schema.max_columns_per_table; if columns_were_added_in_this_batch && column_limit_exceeded { - return Err(OverColumnLimit { + return Err(CachedServiceProtectionLimit::Column { table_name: table_name.into(), merged_column_count, existing_column_count, @@ -412,7 +495,7 @@ mod tests { static NAMESPACE: Lazy> = Lazy::new(|| "bananas".try_into().unwrap()); #[tokio::test] - async fn validate_limits() { + async fn test_validate_column_limits() { let (catalog, namespace) = test_setup().await; namespace.update_column_limit(3).await; @@ -422,12 +505,12 @@ mod tests { let schema = namespace.schema().await; // Columns under the limit is ok let batches = lp_to_writes("nonexistent val=42i 123456"); - assert!(validate_column_limits(&batches, &schema).is_ok()); + assert!(validate_schema_limits(&batches, &schema).is_ok()); // Columns over the limit is an error let batches = lp_to_writes("nonexistent,tag1=A,tag2=B val=42i 123456"); assert_matches!( - validate_column_limits(&batches, &schema), - Err(OverColumnLimit { + validate_schema_limits(&batches, &schema), + Err(CachedServiceProtectionLimit::Column { table_name: _, existing_column_count: 0, merged_column_count: 4, @@ -442,12 +525,12 @@ mod tests { let schema = namespace.schema().await; // Columns under the limit is ok let batches = lp_to_writes("no_columns_in_schema val=42i 123456"); - assert!(validate_column_limits(&batches, &schema).is_ok()); + assert!(validate_schema_limits(&batches, &schema).is_ok()); // Columns over the limit is an error let batches = lp_to_writes("no_columns_in_schema,tag1=A,tag2=B val=42i 123456"); assert_matches!( - validate_column_limits(&batches, &schema), - Err(OverColumnLimit { + validate_schema_limits(&batches, &schema), + Err(CachedServiceProtectionLimit::Column { table_name: _, existing_column_count: 0, merged_column_count: 4, @@ -463,15 +546,15 @@ mod tests { let schema = namespace.schema().await; // Columns already existing is ok let batches = lp_to_writes("i_got_columns i_got_music=42i 123456"); - assert!(validate_column_limits(&batches, &schema).is_ok()); + assert!(validate_schema_limits(&batches, &schema).is_ok()); // Adding columns under the limit is ok let batches = lp_to_writes("i_got_columns,tag1=A i_got_music=42i 123456"); - assert!(validate_column_limits(&batches, &schema).is_ok()); + assert!(validate_schema_limits(&batches, &schema).is_ok()); // Adding columns over the limit is an error let batches = lp_to_writes("i_got_columns,tag1=A,tag2=B i_got_music=42i 123456"); assert_matches!( - validate_column_limits(&batches, &schema), - Err(OverColumnLimit { + validate_schema_limits(&batches, 
&schema), + Err(CachedServiceProtectionLimit::Column { table_name: _, existing_column_count: 1, merged_column_count: 4, @@ -491,12 +574,12 @@ mod tests { let schema = namespace.schema().await; // Columns already existing is allowed let batches = lp_to_writes("bananas greatness=42i 123456"); - assert!(validate_column_limits(&batches, &schema).is_ok()); + assert!(validate_schema_limits(&batches, &schema).is_ok()); // Adding columns over the limit is an error let batches = lp_to_writes("bananas i_got_music=42i 123456"); assert_matches!( - validate_column_limits(&batches, &schema), - Err(OverColumnLimit { + validate_schema_limits(&batches, &schema), + Err(CachedServiceProtectionLimit::Column { table_name: _, existing_column_count: 3, merged_column_count: 4, @@ -549,12 +632,12 @@ mod tests { // Columns already existing is allowed let batches = lp_to_writes("dragonfruit val=42i 123456"); - assert!(validate_column_limits(&batches, &schema).is_ok()); + assert!(validate_schema_limits(&batches, &schema).is_ok()); // Adding more columns over the limit is an error let batches = lp_to_writes("dragonfruit i_got_music=42i 123456"); assert_matches!( - validate_column_limits(&batches, &schema), - Err(OverColumnLimit { + validate_schema_limits(&batches, &schema), + Err(CachedServiceProtectionLimit::Column { table_name: _, existing_column_count: 4, merged_column_count: 5, @@ -564,6 +647,124 @@ mod tests { } } + #[tokio::test] + async fn test_validate_table_limits() { + let (_catalog, namespace) = test_setup().await; + + namespace.update_table_limit(2).await; + + // Creating a table in an empty namespace is OK + { + let schema = namespace.schema().await; + let batches = lp_to_writes("nonexistent val=42i 123456"); + assert!(validate_schema_limits(&batches, &schema).is_ok()); + } + + // Creating two tables (the limit) is OK + { + let schema = namespace.schema().await; + let batches = lp_to_writes("nonexistent val=42i 123456\nbananas val=2 42"); + assert!(validate_schema_limits(&batches, &schema).is_ok()); + } + + // Creating three tables (above the limit) fails + { + let schema = namespace.schema().await; + let batches = + lp_to_writes("nonexistent val=42i 123456\nbananas val=2 42\nplatanos val=2 42"); + assert_matches!( + validate_schema_limits(&batches, &schema), + Err(CachedServiceProtectionLimit::Table { + existing_table_count: 0, + merged_table_count: 3, + table_count_limit: 2 + }) + ); + } + + // Create a table to cover non-empty namespaces + namespace.create_table("bananas").await; + + // Adding a second table is OK + { + let schema = namespace.schema().await; + let batches = lp_to_writes("bananas val=2 42\nplatanos val=2 42"); + assert!(validate_schema_limits(&batches, &schema).is_ok()); + } + + // Adding a third table is rejected OK + { + let schema = namespace.schema().await; + let batches = lp_to_writes("bananas val=2 42\nplatanos val=2 42\nnope v=2 42"); + assert_matches!( + validate_schema_limits(&batches, &schema), + Err(CachedServiceProtectionLimit::Table { + existing_table_count: 1, + merged_table_count: 3, + table_count_limit: 2 + }) + ); + } + + // Create another table and reduce the table limit to be less than the + // current number of tables. + // + // Multiple router instances can race to populate the catalog with new + // tables/columns, therefore all existing tables MUST be accepted to + // ensure deterministic enforcement once all caches have converged. 
+ namespace.create_table("platanos").await; + namespace.update_table_limit(1).await; + + // The existing tables are accepted, even though this single write + // exceeds the new table limit. + { + let schema = namespace.schema().await; + assert_eq!(schema.tables.len(), 2); + assert_eq!(schema.max_tables, 1); + + let batches = lp_to_writes("bananas val=2 42\nplatanos val=2 42"); + assert_matches!(validate_schema_limits(&batches, &schema), Ok(())); + } + + // A new table is always rejected. + { + let schema = namespace.schema().await; + let batches = lp_to_writes("bananas val=2 42\nplatanos val=2 42\nnope v=1 42"); + assert_matches!( + validate_schema_limits(&batches, &schema), + Err(CachedServiceProtectionLimit::Table { + existing_table_count: 2, + merged_table_count: 3, + table_count_limit: 1, + }) + ); + } + { + let schema = namespace.schema().await; + let batches = lp_to_writes("bananas val=2 42\nnope v=1 42"); + assert_matches!( + validate_schema_limits(&batches, &schema), + Err(CachedServiceProtectionLimit::Table { + existing_table_count: 2, + merged_table_count: 3, + table_count_limit: 1, + }) + ); + } + { + let schema = namespace.schema().await; + let batches = lp_to_writes("nope v=1 42"); + assert_matches!( + validate_schema_limits(&batches, &schema), + Err(CachedServiceProtectionLimit::Table { + existing_table_count: 2, + merged_table_count: 3, + table_count_limit: 1, + }) + ); + } + } + // Parse `lp` into a table-keyed MutableBatch map. fn lp_to_writes(lp: &str) -> HashMap { let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42) From 3881e1173450605bb2540aa63be20524b0471108 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 6 Feb 2023 17:43:37 +0100 Subject: [PATCH 05/11] test(router): service limit error messages Assert the user-facing service limit error messages. 
--- router/src/server/http.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/router/src/server/http.rs b/router/src/server/http.rs index 281a4bdd42..222d4815cc 100644 --- a/router/src/server/http.rs +++ b/router/src/server/http.rs @@ -560,7 +560,10 @@ where mod tests { use super::*; use crate::{ - dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}, + dml_handlers::{ + mock::{MockDmlHandler, MockDmlHandlerCall}, + CachedServiceProtectionLimit, + }, namespace_resolver::{mock::MockNamespaceResolver, NamespaceCreationError}, }; use assert_matches::assert_matches; @@ -1527,5 +1530,26 @@ mod tests { RequestLimit, "this service is overloaded, please try again later", ), + + ( + DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(Box::new(CachedServiceProtectionLimit::Column { + table_name: "bananas".to_string(), + existing_column_count: 42, + merged_column_count: 4242, + max_columns_per_table: 24, + })))), + "dml handler error: service limit reached: couldn't create columns in table `bananas`; table contains 42 \ + existing columns, applying this write would result in 4242 columns, limit is 24", + ), + + ( + DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(Box::new(CachedServiceProtectionLimit::Table { + existing_table_count: 42, + merged_table_count: 4242, + table_count_limit: 24, + })))), + "dml handler error: service limit reached: couldn't create new table; namespace contains 42 existing \ + tables, applying this write would result in 4242 columns, limit is 24", + ), } } From 3f2eb54bcee89063e29a7a22fa6bdbdf188f81bc Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 6 Feb 2023 17:55:09 +0100 Subject: [PATCH 06/11] test(router): catalog service limit errors Assert the service limit error messages from the catalog. --- router/src/server/http.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/router/src/server/http.rs b/router/src/server/http.rs index 222d4815cc..c09e9b4354 100644 --- a/router/src/server/http.rs +++ b/router/src/server/http.rs @@ -567,7 +567,7 @@ mod tests { namespace_resolver::{mock::MockNamespaceResolver, NamespaceCreationError}, }; use assert_matches::assert_matches; - use data_types::{NamespaceId, NamespaceNameError}; + use data_types::{NamespaceId, NamespaceNameError, TableId}; use flate2::{write::GzEncoder, Compression}; use hyper::header::HeaderValue; use metric::{Attributes, Metric}; @@ -1551,5 +1551,22 @@ mod tests { "dml handler error: service limit reached: couldn't create new table; namespace contains 42 existing \ tables, applying this write would result in 4242 columns, limit is 24", ), + + ( + DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(Box::new(iox_catalog::interface::Error::ColumnCreateLimitError { + column_name: "bananas".to_string(), + table_id: TableId::new(42), + })))), + "dml handler error: service limit reached: couldn't create column bananas in table 42; limit reached on \ + namespace", + ), + + ( + DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(Box::new(iox_catalog::interface::Error::TableCreateLimitError { + table_name: "bananas".to_string(), + namespace_id: NamespaceId::new(42), + })))), + "dml handler error: service limit reached: couldn't create table bananas; limit reached on namespace 42", + ), } } From bf6ab7fd8811b6c9a30248dd0dd9321e5fba5033 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 6 Feb 2023 18:03:26 +0100 Subject: [PATCH 07/11] fix: error message typo columns -> tables for table limit error. 
--- router/src/dml_handlers/schema_validation.rs | 2 +- router/src/server/http.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs index b2699909d9..2345e6e732 100644 --- a/router/src/dml_handlers/schema_validation.rs +++ b/router/src/dml_handlers/schema_validation.rs @@ -383,7 +383,7 @@ pub enum CachedServiceProtectionLimit { #[error( "couldn't create new table; namespace contains {existing_table_count} \ existing tables, applying this write would result in \ - {merged_table_count} columns, limit is {table_count_limit}" + {merged_table_count} tables, limit is {table_count_limit}" )] Table { /// Number of tables already in the namespace. diff --git a/router/src/server/http.rs b/router/src/server/http.rs index c09e9b4354..f4813b7402 100644 --- a/router/src/server/http.rs +++ b/router/src/server/http.rs @@ -1549,7 +1549,7 @@ mod tests { table_count_limit: 24, })))), "dml handler error: service limit reached: couldn't create new table; namespace contains 42 existing \ - tables, applying this write would result in 4242 columns, limit is 24", + tables, applying this write would result in 4242 tables, limit is 24", ), ( From 65e7ff779c72f60485fb6e6903b7d0b062f4f561 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 7 Feb 2023 11:49:38 +0100 Subject: [PATCH 08/11] feat: verified test output display for compactor (#6859) * feat: verified test output display for compactor * fix: demacrofy assert_parquet_files * fix: demacrofiy assert_parquet_files_split * refactor: compute min/max without sentinels * fix: clippy --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../target_level_target_level_split.rs | 112 ++++- compactor2/src/test_util.rs | 3 + compactor2/src/test_util/display.rs | 387 ++++++++++++++++++ 3 files changed, 493 insertions(+), 9 deletions(-) create mode 100644 compactor2/src/test_util/display.rs diff --git a/compactor2/src/components/files_split/target_level_target_level_split.rs b/compactor2/src/components/files_split/target_level_target_level_split.rs index c9b5c0bd61..49d4716c2f 100644 --- a/compactor2/src/components/files_split/target_level_target_level_split.rs +++ b/compactor2/src/components/files_split/target_level_target_level_split.rs @@ -36,7 +36,8 @@ impl FilesSplit for TargetLevelTargetLevelSplit { mod tests { use crate::test_util::{ - create_l0_files, create_l1_files, create_l2_files, create_overlapped_files, + assert_parquet_files, assert_parquet_files_split, create_l0_files, create_l1_files, + create_l2_files, create_overlapped_files, }; use super::*; @@ -62,7 +63,13 @@ mod tests { #[test] fn test_apply_partial_empty_files_l0() { let files = create_l0_files(1); - assert_eq!(files.len(), 3); + let expected = vec![ + "L0 ", + "L0.2[650,750] |-----L0.2------| ", + "L0.1[450,620] |------------L0.1------------| ", + "L0.3[800,900] |-----L0.3------| ", + ]; + assert_parquet_files(expected, &files); let split = TargetLevelTargetLevelSplit::new(); let (lower, higher) = split.apply(files.clone(), CompactionLevel::Initial); @@ -81,13 +88,19 @@ mod tests { #[test] fn test_apply_partial_empty_files_l1() { let files = create_l1_files(1); - assert_eq!(files.len(), 3); + let expected = vec![ + "L1 ", + "L1.13[600,700] |-----L1.13-----| ", + "L1.12[400,500] |-----L1.12-----| ", + "L1.11[250,350] |-----L1.11-----| ", + ]; + assert_parquet_files(expected, &files); let split = TargetLevelTargetLevelSplit::new(); let (lower, higher) = 
split.apply(files.clone(), CompactionLevel::Initial); assert_eq!(lower.len(), 0); assert_eq!(higher.len(), 3); - // + let (lower, higher) = split.apply(files.clone(), CompactionLevel::FileNonOverlapped); assert_eq!(lower.len(), 3); assert_eq!(higher.len(), 0); @@ -100,7 +113,12 @@ mod tests { #[test] fn test_apply_partial_empty_files_l2() { let files = create_l2_files(); - assert_eq!(files.len(), 2); + let expected = vec![ + "L2 ", + "L2.21[0,100] |---------L2.21----------| ", + "L2.22[200,300] |---------L2.22----------| ", + ]; + assert_parquet_files(expected, &files); let split = TargetLevelTargetLevelSplit::new(); let (lower, higher) = split.apply(files.clone(), CompactionLevel::Initial); @@ -120,10 +138,41 @@ mod tests { fn test_apply_target_level_0() { // Test target level Initial let files = create_overlapped_files(); - assert_eq!(files.len(), 8); + let expected = vec![ + "L0 ", + "L0.2[650,750]@1 |-L0.2-| ", + "L0.1[450,620]@1 |----L0.1-----| ", + "L0.3[800,900]@100 |-L0.3-| ", + "L1 ", + "L1.13[600,700]@100 |L1.13-| ", + "L1.12[400,500]@1 |L1.12-| ", + "L1.11[250,350]@1 |L1.11-| ", + "L2 ", + "L2.21[0,100]@1 |L2.21-| ", + "L2.22[200,300]@1 |L2.22-| ", + ]; + assert_parquet_files(expected, &files); let split = TargetLevelTargetLevelSplit::new(); let (lower, higher) = split.apply(files, CompactionLevel::Initial); + + let expected = vec![ + "left", + "L0 ", + "L0.2[650,750]@1 |-----L0.2------| ", + "L0.1[450,620]@1 |------------L0.1------------| ", + "L0.3[800,900]@100 |-----L0.3------| ", + "right", + "L1 ", + "L1.13[600,700]@100 |--L1.13--| ", + "L1.12[400,500]@1 |--L1.12--| ", + "L1.11[250,350]@1 |--L1.11--| ", + "L2 ", + "L2.21[0,100]@1 |--L2.21--| ", + "L2.22[200,300]@1 |--L2.22--| ", + ]; + assert_parquet_files_split(expected, &lower, &higher); + // verify number of files assert_eq!(lower.len(), 3); assert_eq!(higher.len(), 5); @@ -141,10 +190,41 @@ mod tests { fn test_apply_target_level_l1() { // Test target level is FileNonOverlapped let files = create_overlapped_files(); - assert_eq!(files.len(), 8); + let expected = vec![ + "L0 ", + "L0.2[650,750]@1 |-L0.2-| ", + "L0.1[450,620]@1 |----L0.1-----| ", + "L0.3[800,900]@100 |-L0.3-| ", + "L1 ", + "L1.13[600,700]@100 |L1.13-| ", + "L1.12[400,500]@1 |L1.12-| ", + "L1.11[250,350]@1 |L1.11-| ", + "L2 ", + "L2.21[0,100]@1 |L2.21-| ", + "L2.22[200,300]@1 |L2.22-| ", + ]; + assert_parquet_files(expected, &files); let split = TargetLevelTargetLevelSplit::new(); let (lower, higher) = split.apply(files, CompactionLevel::FileNonOverlapped); + + let expected = vec![ + "left", + "L0 ", + "L0.2[650,750]@1 |---L0.2---| ", + "L0.1[450,620]@1 |-------L0.1-------| ", + "L0.3[800,900]@100 |---L0.3---| ", + "L1 ", + "L1.13[600,700]@100 |--L1.13---| ", + "L1.12[400,500]@1 |--L1.12---| ", + "L1.11[250,350]@1 |--L1.11---| ", + "right", + "L2 ", + "L2.21[0,100] |---------L2.21----------| ", + "L2.22[200,300] |---------L2.22----------| ", + ]; + assert_parquet_files_split(expected, &lower, &higher); + // verify number of files assert_eq!(lower.len(), 6); assert_eq!(higher.len(), 2); @@ -162,11 +242,25 @@ mod tests { fn test_apply_taget_level_l2() { // Test target level is Final let files = create_overlapped_files(); - assert_eq!(files.len(), 8); + let expected = vec![ + "L0 ", + "L0.2[650,750]@1 |-L0.2-| ", + "L0.1[450,620]@1 |----L0.1-----| ", + "L0.3[800,900]@100 |-L0.3-| ", + "L1 ", + "L1.13[600,700]@100 |L1.13-| ", + "L1.12[400,500]@1 |L1.12-| ", + "L1.11[250,350]@1 |L1.11-| ", + "L2 ", + "L2.21[0,100]@1 |L2.21-| ", + "L2.22[200,300]@1 |L2.22-| ", + 
]; + assert_parquet_files(expected, &files); let split = TargetLevelTargetLevelSplit::new(); let (lower, higher) = split.apply(files, CompactionLevel::Final); - // verify number of files + + // verify number of files (nothing in higher) assert_eq!(lower.len(), 8); assert_eq!(higher.len(), 0); // verify compaction level of files diff --git a/compactor2/src/test_util.rs b/compactor2/src/test_util.rs index 1be928c6ff..c52a15f401 100644 --- a/compactor2/src/test_util.rs +++ b/compactor2/src/test_util.rs @@ -1,3 +1,6 @@ +mod display; +pub(crate) use display::{assert_parquet_files, assert_parquet_files_split}; + use std::{ collections::{BTreeMap, HashSet}, future::Future, diff --git a/compactor2/src/test_util/display.rs b/compactor2/src/test_util/display.rs new file mode 100644 index 0000000000..cb1d859e21 --- /dev/null +++ b/compactor2/src/test_util/display.rs @@ -0,0 +1,387 @@ +use std::collections::BTreeMap; + +use data_types::{CompactionLevel, ParquetFile}; + +/// Compares the a vec of strs with the output of a set of parquet +/// files. See docs on [`ParquetFileFormatter`] for example +/// expected output. +/// +/// Designed so that failure output can be directly copy/pasted +/// into the test code as expected results. +/// +/// Expects to be called about like this: +/// assert_parquet_files!(expected_lines: &[&str], &files) +#[track_caller] +pub fn assert_parquet_files<'a>( + expected_lines: impl IntoIterator, + files: &[ParquetFile], +) { + let expected_lines: Vec = expected_lines.into_iter().map(|s| s.to_string()).collect(); + + let actual_lines = readable_list_of_files(None, files); + + assert_eq!( + expected_lines, actual_lines, + "\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n", + ); +} + +/// Compares the a vec of strs with the output of a set of parquet +/// files. This is used to compare the results of splitting files into +/// two groups. See docs on [`ParquetFileFormatter`] for example +/// expected output. +/// +/// Designed so that failure output can be directly copy/pasted +/// into the test code as expected results. +/// +/// Expects to be called about like this: +/// assert_parquet_files_split!(expected_lines: &[&str], &files1, &files2) +#[track_caller] +pub fn assert_parquet_files_split<'a>( + expected_lines: impl IntoIterator, + files1: &[ParquetFile], + files2: &[ParquetFile], +) { + let expected_lines: Vec = expected_lines.into_iter().map(|s| s.to_string()).collect(); + + let actual_lines_one = readable_list_of_files(Some("left".into()), files1); + + let actual_lines_two = readable_list_of_files(Some("right".into()), files2); + + let actual_lines: Vec<_> = actual_lines_one + .into_iter() + .chain(actual_lines_two.into_iter()) + .collect(); + + assert_eq!( + expected_lines, actual_lines, + "\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n", + ); +} + +/// default width for printing +const DEFAULT_WIDTH: usize = 80; + +/// default width for header +const DEFAULT_HEADING_WIDTH: usize = 20; + +/// This function returns a visual representation of the list of +/// parquet files arranged so they are lined up horizontally based on +/// their relative time range. +/// +/// See docs on [`ParquetFileFormatter`] +/// for examples. 
+pub fn readable_list_of_files<'a>( + title: Option, + files: impl IntoIterator, +) -> Vec { + let mut output = vec![]; + if let Some(title) = title { + output.push(title); + } + + let files: Vec<_> = files.into_iter().collect(); + if files.is_empty() { + return output; + } + + let formatter = ParquetFileFormatter::new(&files); + + // split up the files into groups by levels (compaction levels) + let mut files_by_level = BTreeMap::new(); + for file in &files { + let existing_files = files_by_level + .entry(file.compaction_level) + .or_insert_with(Vec::new); + existing_files.push(file); + } + + for (level, files) in files_by_level { + output.push(formatter.format_level(&level)); + for file in files { + output.push(formatter.format_file(file)) + } + } + + output +} + +/// Formats a parquet files as a single line of text, with widths +/// normalized based on their min/max times and lined up horizontally +/// based on their relative time range. +/// +/// Each file has this format: +/// +/// ```text +/// L.[min_time,max_time]@file_size_bytes +/// ``` +/// +/// Example +/// +/// ```text +/// L0 +/// L0.1[100,200]@1 |----------L0.1----------| +/// L0.2[300,400]@1 |----------L0.2----------| +/// L0.11[150,350]@44 |-----------------------L0.11-----------------------| +/// ``` +#[derive(Debug, Default)] +struct ParquetFileFormatter { + /// should the size of the files be shown (if they are different) + show_size: bool, + /// width in characater + row_heading_chars: usize, + /// width, in characters, of the entire min/max timerange + width_chars: usize, + /// how many ns are given a single character's width + ns_per_char: f64, + /// what is the lowest time range in any file + min_time: i64, + /// what is the largest time in any file? + max_time: i64, +} + +#[derive(Debug)] +/// helper to track if there are multiple file sizes in a set of parquet files +enum FileSizeSeen { + None, + One(i64), + Many, +} + +impl FileSizeSeen { + fn observe(self, file_size_bytes: i64) -> Self { + match self { + Self::None => Self::One(file_size_bytes), + // same file size? 
+ Self::One(sz) if sz == file_size_bytes => Self::One(sz), + // different file size or already seen difference + Self::One(_) | Self::Many => Self::Many, + } + } +} + +impl ParquetFileFormatter { + /// calculates display parameters for formatting a set of files + fn new(files: &[&ParquetFile]) -> Self { + let row_heading_chars = DEFAULT_HEADING_WIDTH; + let width_chars = DEFAULT_WIDTH; + + let min_time = files + .iter() + .map(|f| f.min_time.get()) + .min() + .expect("at least one file"); + let max_time = files + .iter() + .map(|f| f.max_time.get()) + .max() + .expect("at least one file"); + let file_size_seen = files + .iter() + .fold(FileSizeSeen::None, |file_size_seen, file| { + file_size_seen.observe(file.file_size_bytes) + }); + + // show the size if there are multiple sizes + let show_size = matches!(file_size_seen, FileSizeSeen::Many); + + let time_range = max_time - min_time; + + let ns_per_char = (time_range as f64) / (width_chars as f64); + + Self { + show_size, + width_chars, + ns_per_char, + min_time, + max_time, + row_heading_chars, + } + } + + /// return how many characters of `self.width_chars` would be consumed by `range` ns + fn time_range_to_chars(&self, time_range: i64) -> usize { + // avoid divide by zero + if self.ns_per_char > 0.0 { + (time_range as f64 / self.ns_per_char) as usize + } else if time_range > 0 { + self.width_chars + } else { + 0 + } + } + + fn format_level(&self, level: &CompactionLevel) -> String { + format!( + "{:width$}", + display_level(level), + width = self.width_chars + self.row_heading_chars + ) + } + + /// Formats a single parquet file into a string of `width_chars` + /// characters, which tries to visually depict the timge range of + /// the file using the width. See docs on [`ParquetFileFormatter`] + /// for examples. 
+ fn format_file(&self, file: &ParquetFile) -> String { + // use try_into to force conversion to usize + let time_width = (file.max_time - file.min_time).get(); + + // special case "zero" width times + let field_width = if self.min_time == self.max_time { + self.width_chars + } else { + self.time_range_to_chars(time_width) + } + // account for starting and ending '|' + .saturating_sub(2); + + // Get compact display of the file, like 'L0.1' + // add |--- ---| formatting (based on field width) + let file_string = format!("|{:-^width$}|", display_file_id(file), width = field_width); + let row_heading = display_format(file, self.show_size); + + // special case "zero" width times + if self.min_time == self.max_time { + return format!( + "{row_heading:width1$}{file_string:^width2$}", + width1 = self.row_heading_chars, + width2 = self.width_chars, + ); + } + + // otherwise, figure out whitespace padding at start and back + // based on the relative start time of the file + // assume time from 0 + let prefix_time_range = file.min_time.get().saturating_sub(self.min_time); + let prefix_padding = " ".repeat(self.time_range_to_chars(prefix_time_range)); + + // pad the rest with whitespace + let postfix_padding_len = self + .width_chars + .saturating_sub(file_string.len()) + .saturating_sub(prefix_padding.len()); + let postfix_padding = " ".repeat(postfix_padding_len); + + format!( + "{row_heading:width$}{prefix_padding}{file_string}{postfix_padding}", + width = self.row_heading_chars + ) + } +} + +fn display_level(compaction_level: &CompactionLevel) -> &'static str { + match compaction_level { + CompactionLevel::Initial => "L0", + CompactionLevel::FileNonOverlapped => "L1", + CompactionLevel::Final => "L2", + } +} + +/// Display like 'L0.1' with file level and id +fn display_file_id(file: &ParquetFile) -> String { + let level = display_level(&file.compaction_level); + let id = file.id; + format!("{level}.{id}") +} + +/// Compact display of level, id min/max time and optional size. 
+/// +/// Example +/// +/// ```text +/// L0.1[100,200]@1 +/// ``` +fn display_format(file: &ParquetFile, show_size: bool) -> String { + let file_id = display_file_id(file); + let min_time = file.min_time.get(); // display as i64 + let max_time = file.max_time.get(); // display as i64 + let sz = file.file_size_bytes; + if show_size { + format!("{file_id}[{min_time},{max_time}]@{sz}") + } else { + format!("{file_id}[{min_time},{max_time}]") + } +} + +#[cfg(test)] +mod test { + use crate::test_util::ParquetFileBuilder; + + use super::*; + + #[test] + fn display_builder() { + let files = vec![ + ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(), + ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Initial) + .build(), + ]; + + let expected = vec![ + "L0 ", + "L0.1[0,0] |-------------------------------------L0.1-------------------------------------|", + "L0.2[0,0] |-------------------------------------L0.2-------------------------------------|", + ]; + + assert_parquet_files(expected, &files); + } + + #[test] + fn display_builder_multi_levels_with_size() { + let files = vec![ + ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(), + ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Initial) + .build(), + ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::Final) + .with_file_size_bytes(42) + .build(), + ]; + + let expected = vec![ + "L0 ", + "L0.1[0,0]@1 |-------------------------------------L0.1-------------------------------------|", + "L0.2[0,0]@1 |-------------------------------------L0.2-------------------------------------|", + "L2 ", + "L2.3[0,0]@42 |-------------------------------------L2.3-------------------------------------|", + ]; + + assert_parquet_files(expected, &files); + } + + #[test] + fn display_builder_size_time_ranges() { + let files = vec![ + ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .with_time_range(100, 200) + .build(), + ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Initial) + .with_time_range(300, 400) + .build(), + // overlapping file + ParquetFileBuilder::new(11) + .with_compaction_level(CompactionLevel::Initial) + .with_time_range(150, 350) + .with_file_size_bytes(44) + .build(), + ]; + + let expected = vec![ + "L0 ", + "L0.1[100,200]@1 |----------L0.1----------| ", + "L0.2[300,400]@1 |----------L0.2----------| ", + "L0.11[150,350]@44 |-----------------------L0.11-----------------------| ", + ]; + + assert_parquet_files(expected, &files); + } +} From 3a4364b8f8be29eff699455b19793b71a9e9421a Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 7 Feb 2023 12:01:33 +0100 Subject: [PATCH 09/11] fix: partition-done metrics/logs (#6882) Ignore partitions that where throttled or filtered due to the "not unique" combo. This is in line w/ the "partitions source", so the metric for "partition in" and "partition out" line up. 
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- compactor2/src/components/hardcoded.rs | 42 +++++++++++++------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 63c9c73b7d..45b79c329e 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -133,26 +133,6 @@ pub fn hardcoded_components(config: &Config) -> Arc { Arc::clone(&config.catalog), )) }; - let partition_done_sink = - LoggingPartitionDoneSinkWrapper::new(MetricsPartitionDoneSinkWrapper::new( - ErrorKindPartitionDoneSinkWrapper::new( - partition_done_sink, - ErrorKind::variants() - .iter() - .filter(|kind| { - // use explicit match statement so we never forget to add new variants - match kind { - ErrorKind::OutOfMemory | ErrorKind::Timeout | ErrorKind::Unknown => { - true - } - ErrorKind::ObjectStore => false, - } - }) - .copied() - .collect(), - ), - &config.metric_registry, - )); let commit: Arc = if config.shadow_mode { Arc::new(MockCommit::new()) @@ -219,7 +199,27 @@ pub fn hardcoded_components(config: &Config) -> Arc { &config.metric_registry, ), )), - partition_done_sink: Arc::new(partition_done_sink), + partition_done_sink: Arc::new(LoggingPartitionDoneSinkWrapper::new( + MetricsPartitionDoneSinkWrapper::new( + ErrorKindPartitionDoneSinkWrapper::new( + partition_done_sink, + ErrorKind::variants() + .iter() + .filter(|kind| { + // use explicit match statement so we never forget to add new variants + match kind { + ErrorKind::OutOfMemory + | ErrorKind::Timeout + | ErrorKind::Unknown => true, + ErrorKind::ObjectStore => false, + } + }) + .copied() + .collect(), + ), + &config.metric_registry, + ), + )), commit: Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new( commit, &config.metric_registry, From e179887edef7a3f47a95ee1cf52ec7659a6e1d12 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 7 Feb 2023 13:07:45 +0100 Subject: [PATCH 10/11] fix: metric buckets (#6883) --- compactor2/src/components/commit/metrics.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/compactor2/src/components/commit/metrics.rs b/compactor2/src/components/commit/metrics.rs index 36f1209b0e..88c8cd1a30 100644 --- a/compactor2/src/components/commit/metrics.rs +++ b/compactor2/src/components/commit/metrics.rs @@ -111,13 +111,13 @@ where registry, METRIC_NAME_JOB_FILES, "Number of files committed by the compactor, per job", - HistogramType::Bytes, + HistogramType::Files, ), job_bytes: Histogram::new( registry, METRIC_NAME_JOB_BYTES, "Number of bytes committed by the compactor, per job", - HistogramType::Files, + HistogramType::Bytes, ), job_rows: Histogram::new( registry, From 997cca67a30ed2efb3747c051d5ddf85d175a9b1 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 7 Feb 2023 14:52:17 +0100 Subject: [PATCH 11/11] fix: `--compaction-process-once` should exit (#6886) - do not wait for a non-empty partition result (this doesn't make sense if we are not running endlessly) - modify entry point to allow the compactor to exit on its own (this is normally not allowed for other server types) --- compactor2/src/compactor.rs | 4 +--- compactor2/src/compactor_tests.rs | 7 ++----- compactor2/src/components/hardcoded.rs | 17 ++++++++++++----- influxdb_iox/src/commands/run/compactor2.rs | 12 +++++++++++- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/compactor2/src/compactor.rs b/compactor2/src/compactor.rs index 
bdaaf7cc39..0e7365da1c 100644 --- a/compactor2/src/compactor.rs +++ b/compactor2/src/compactor.rs @@ -58,10 +58,8 @@ impl Compactor2 { _ = async { compact(config.partition_concurrency, config.partition_timeout, Arc::clone(&job_semaphore), &components).await; - // the main entry point does not allow servers to shut down themselves, so we just wait forever info!("comapctor done"); - futures::future::pending::<()>().await; - } => unreachable!(), + } => {} } }); let worker = shared_handle(worker); diff --git a/compactor2/src/compactor_tests.rs b/compactor2/src/compactor_tests.rs index 45aa2eb16a..a0bcbb99fb 100644 --- a/compactor2/src/compactor_tests.rs +++ b/compactor2/src/compactor_tests.rs @@ -13,7 +13,7 @@ mod tests { }, config::AlgoVersion, driver::compact, - test_util::{list_object_store, AssertFutureExt, TestSetup}, + test_util::{list_object_store, TestSetup}, }; #[tokio::test] @@ -27,10 +27,7 @@ mod tests { assert!(files.is_empty()); // compact - // This wil wait for files forever. - let fut = run_compact(&setup); - tokio::pin!(fut); - fut.assert_pending().await; + run_compact(&setup).await; // verify catalog is still empty let files = setup.list_by_table_not_to_delete().await; diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 45b79c329e..a1f30c8d6c 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -162,14 +162,21 @@ pub fn hardcoded_components(config: &Config) -> Arc { // Note: Place "not empty" wrapper at the very last so that the logging and metric wrapper work even when there // is not data. - let partitions_source = NotEmptyPartitionsSourceWrapper::new( + let partitions_source = LoggingPartitionsSourceWrapper::new(MetricsPartitionsSourceWrapper::new( RandomizeOrderPartitionsSourcesWrapper::new(partitions_source, 1234), &config.metric_registry, - )), - Duration::from_secs(5), - Arc::clone(&config.time_provider), - ); + )); + let partitions_source: Arc = if config.process_once { + // do not wrap into the "not empty" filter because we do NOT wanna throttle in this case but just exit early + Arc::new(partitions_source) + } else { + Arc::new(NotEmptyPartitionsSourceWrapper::new( + partitions_source, + Duration::from_secs(5), + Arc::clone(&config.time_provider), + )) + }; let partition_stream: Arc = if config.process_once { Arc::new(OncePartititionStream::new(partitions_source)) diff --git a/influxdb_iox/src/commands/run/compactor2.rs b/influxdb_iox/src/commands/run/compactor2.rs index c3ce923f46..c1171b33f0 100644 --- a/influxdb_iox/src/commands/run/compactor2.rs +++ b/influxdb_iox/src/commands/run/compactor2.rs @@ -112,6 +112,7 @@ pub async fn command(config: Config) -> Result<(), Error> { })); let time_provider = Arc::new(SystemProvider::new()); + let process_once = config.compactor_config.process_once; let server_type = create_compactor2_server_type( &common_state, Arc::clone(&metric_registry), @@ -127,5 +128,14 @@ pub async fn command(config: Config) -> Result<(), Error> { info!("starting compactor"); let services = vec![Service::create(server_type, common_state.run_config())]; - Ok(main::main(common_state, services, metric_registry).await?) + + let res = main::main(common_state, services, metric_registry).await; + match res { + Ok(()) => Ok(()), + // compactor2 is allowed to shut itself down + Err(main::Error::Wrapper { + source: _source @ ioxd_common::Error::LostServer, + }) if process_once => Ok(()), + Err(e) => Err(e.into()), + } }