diff --git a/clap_blocks/src/catalog_dsn.rs b/clap_blocks/src/catalog_dsn.rs index 956c85dee2..54fda61500 100644 --- a/clap_blocks/src/catalog_dsn.rs +++ b/clap_blocks/src/catalog_dsn.rs @@ -1,14 +1,13 @@ //! Catalog-DSN-related configs. use iox_catalog::sqlite::{SqliteCatalog, SqliteConnectionOptions}; use iox_catalog::{ - create_or_get_default_records, interface::Catalog, mem::MemCatalog, postgres::{PostgresCatalog, PostgresConnectionOptions}, }; use observability_deps::tracing::*; use snafu::{OptionExt, ResultExt, Snafu}; -use std::{ops::DerefMut, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -211,12 +210,6 @@ impl CatalogDsnConfig { } CatalogType::Memory => { let mem = MemCatalog::new(metrics); - - let mut txn = mem.repositories().await; - create_or_get_default_records(txn.deref_mut()) - .await - .context(CatalogSnafu)?; - Arc::new(mem) as Arc } CatalogType::Sqlite => { diff --git a/clap_blocks/src/router2.rs b/clap_blocks/src/router2.rs index 149a5219ce..5fd0506dd9 100644 --- a/clap_blocks/src/router2.rs +++ b/clap_blocks/src/router2.rs @@ -71,26 +71,6 @@ pub struct Router2Config { )] pub ingester_addresses: Vec, - /// Write buffer topic/database that should be used. - // This isn't really relevant to the RPC write path and will be removed eventually. - #[clap( - long = "write-buffer-topic", - env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC", - default_value = "iox-shared", - action - )] - pub topic: String, - - /// Query pool name to dispatch writes to. - // This isn't really relevant to the RPC write path and will be removed eventually. - #[clap( - long = "query-pool", - env = "INFLUXDB_IOX_QUERY_POOL_NAME", - default_value = "iox-shared", - action - )] - pub query_pool_name: String, - /// Retention period to use when auto-creating namespaces. /// For infinite retention, leave this unset and it will default to `None`. /// Setting it to zero will not make it infinite. 
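A minimal sketch (not part of the patch) of the caller-facing result of this change, assuming an `Arc<dyn Catalog>` handle named `catalog` and an illustrative namespace name:

use std::sync::Arc;
use iox_catalog::interface::Catalog;

async fn create_example(catalog: Arc<dyn Catalog>) -> Result<(), iox_catalog::interface::Error> {
    let mut repos = catalog.repositories().await;
    // Previously callers created or looked up topic and query-pool records and
    // threaded their IDs through every namespace creation:
    //   let topic = repos.topics().create_or_get("iox-shared").await?;
    //   let pool = repos.query_pools().create_or_get("iox-shared").await?;
    //   repos.namespaces().create("my_ns", None, topic.id, pool.id).await?;
    // Now the shared topic/query-pool bookkeeping stays inside the catalog, so a
    // namespace is created from just a name and an optional retention period
    // (None = infinite retention).
    let _ns = repos.namespaces().create("my_ns", None).await?;
    Ok(())
}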
diff --git a/compactor2/src/components/namespaces_source/mock.rs b/compactor2/src/components/namespaces_source/mock.rs index 5bebc24dc2..fab5243e5d 100644 --- a/compactor2/src/components/namespaces_source/mock.rs +++ b/compactor2/src/components/namespaces_source/mock.rs @@ -49,9 +49,7 @@ impl NamespacesSource for MockNamespacesSource { mod tests { use std::collections::BTreeMap; - use data_types::{ - ColumnId, ColumnSchema, ColumnType, QueryPoolId, TableId, TableSchema, TopicId, - }; + use data_types::{ColumnId, ColumnSchema, ColumnType, TableId, TableSchema}; use super::*; @@ -182,15 +180,11 @@ mod tests { ]); let id = NamespaceId::new(id); - let topic_id = TopicId::new(0); - let query_pool_id = QueryPoolId::new(0); Self { namespace: NamespaceWrapper { ns: Namespace { id, name: "ns".to_string(), - topic_id, - query_pool_id, max_tables: 10, max_columns_per_table: 10, retention_period_ns: None, @@ -198,8 +192,6 @@ mod tests { }, schema: NamespaceSchema { id, - topic_id, - query_pool_id, tables, max_columns_per_table: 10, max_tables: 42, diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 4bdd34d3dc..9adc3e8442 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -129,42 +129,6 @@ impl std::fmt::Display for NamespaceId { } } -/// Unique ID for a Topic, assigned by the catalog and used in [`TopicMetadata`] -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] -#[sqlx(transparent)] -pub struct TopicId(i64); - -#[allow(missing_docs)] -impl TopicId { - pub const fn new(v: i64) -> Self { - Self(v) - } - pub fn get(&self) -> i64 { - self.0 - } -} - -impl std::fmt::Display for TopicId { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -/// Unique ID for a `QueryPool` -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] -#[sqlx(transparent)] -pub struct QueryPoolId(i64); - -#[allow(missing_docs)] -impl QueryPoolId { - pub fn new(v: i64) -> Self { - Self(v) - } - pub fn get(&self) -> i64 { - self.0 - } -} - /// Unique ID for a `Table` #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] #[sqlx(transparent)] @@ -341,25 +305,6 @@ impl std::fmt::Display for ParquetFileId { } } -/// Data object for a topic. When Kafka is used as the write buffer, this is the Kafka topic name -/// plus a catalog-assigned ID. -#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)] -pub struct TopicMetadata { - /// The id of the topic - pub id: TopicId, - /// The unique name of the topic - pub name: String, -} - -/// Data object for a query pool -#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)] -pub struct QueryPool { - /// The id of the pool - pub id: QueryPoolId, - /// The unique name of the pool - pub name: String, -} - /// Data object for a namespace #[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)] pub struct Namespace { @@ -370,10 +315,6 @@ pub struct Namespace { #[sqlx(default)] /// The retention period in ns. None represents infinite duration (i.e. never drop data). 
pub retention_period_ns: Option, - /// The topic that writes to this namespace will land in - pub topic_id: TopicId, - /// The query pool assigned to answer queries for this namespace - pub query_pool_id: QueryPoolId, /// The maximum number of tables that can exist in this namespace pub max_tables: i32, /// The maximum number of columns per table in this namespace @@ -388,10 +329,6 @@ pub struct Namespace { pub struct NamespaceSchema { /// the namespace id pub id: NamespaceId, - /// the topic this namespace gets data written to - pub topic_id: TopicId, - /// the query pool assigned to answer queries for this namespace - pub query_pool_id: QueryPoolId, /// the tables in the namespace by name pub tables: BTreeMap, /// the number of columns per table this namespace allows @@ -407,8 +344,6 @@ impl NamespaceSchema { /// Create a new `NamespaceSchema` pub fn new( id: NamespaceId, - topic_id: TopicId, - query_pool_id: QueryPoolId, max_columns_per_table: i32, max_tables: i32, retention_period_ns: Option, @@ -416,8 +351,6 @@ impl NamespaceSchema { Self { id, tables: BTreeMap::new(), - topic_id, - query_pool_id, max_columns_per_table: max_columns_per_table as usize, max_tables: max_tables as usize, retention_period_ns, @@ -1171,35 +1104,6 @@ pub enum TemplatePart { /// partition key parts such as "2021-03-14 12:25:21" and /// "2021-04-14 12:24:21" TimeFormat(String), - /// Applies a regex to the value in a string column - RegexCapture(RegexCapture), - /// Applies a `strftime` pattern to some column other than "time" - StrftimeColumn(StrftimeColumn), -} - -/// `RegexCapture` is for pulling parts of a string column into the partition -/// key. -#[derive(Debug, Eq, PartialEq, Clone)] -#[allow(missing_docs)] -pub struct RegexCapture { - pub column: String, - pub regex: String, -} - -/// [`StrftimeColumn`] is used to create a time based partition key off some -/// column other than the builtin `time` column. -/// -/// The value of the named column is formatted using a `strftime` -/// style string. 
-/// -/// For example, a time format of "%Y-%m-%d %H:%M:%S" will produce -/// partition key parts such as "2021-03-14 12:25:21" and -/// "2021-04-14 12:24:21" -#[derive(Debug, Eq, PartialEq, Clone)] -#[allow(missing_docs)] -pub struct StrftimeColumn { - pub column: String, - pub format: String, } /// Represents a parsed delete predicate for evaluation by the InfluxDB IOx @@ -3006,8 +2910,6 @@ mod tests { fn test_namespace_schema_size() { let schema1 = NamespaceSchema { id: NamespaceId::new(1), - topic_id: TopicId::new(2), - query_pool_id: QueryPoolId::new(3), tables: BTreeMap::from([]), max_columns_per_table: 4, max_tables: 42, @@ -3015,8 +2917,6 @@ mod tests { }; let schema2 = NamespaceSchema { id: NamespaceId::new(1), - topic_id: TopicId::new(2), - query_pool_id: QueryPoolId::new(3), tables: BTreeMap::from([(String::from("foo"), TableSchema::new(TableId::new(1)))]), max_columns_per_table: 4, max_tables: 42, diff --git a/docs/cli.md b/docs/cli.md index f426f489cb..3ca67714ab 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -143,8 +143,6 @@ $ influxdb_iox namespace list $ influxdb_iox debug schema get 26f7e5a4b7be365b_917b97a92e883afc { "id": "1", - "topicId": "1", - "queryPoolId": "1", "tables": { "cpu": { "id": "5", diff --git a/docs/underground_guide.md b/docs/underground_guide.md index 0406902df6..1a0403ac5f 100644 --- a/docs/underground_guide.md +++ b/docs/underground_guide.md @@ -48,13 +48,6 @@ OBJECT_STORE=file \ DATABASE_DIRECTORY=~/data_dir \ LOG_FILTER=debug \ ./target/release/influxdb_iox catalog setup - -# initialize the topic -INFLUXDB_IOX_CATALOG_DSN=postgres://postgres@localhost:5432/postgres \ -OBJECT_STORE=file \ -DATABASE_DIRECTORY=~/data_dir \ -LOG_FILTER=debug \ -./target/release/influxdb_iox catalog topic update iox-shared ``` ## Inspecting Catalog state diff --git a/garbage_collector/src/objectstore/checker.rs b/garbage_collector/src/objectstore/checker.rs index d9b75d1188..2913fc458f 100644 --- a/garbage_collector/src/objectstore/checker.rs +++ b/garbage_collector/src/objectstore/checker.rs @@ -155,11 +155,9 @@ mod tests { let metric_registry = Arc::new(metric::Registry::new()); let catalog = Arc::new(MemCatalog::new(Arc::clone(&metric_registry))); let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_parquet_file_test", None, topic.id, pool.id) + .create("namespace_parquet_file_test", None) .await .unwrap(); let table = repos diff --git a/generated_types/protos/influxdata/iox/schema/v1/service.proto b/generated_types/protos/influxdata/iox/schema/v1/service.proto index 7aac5a4164..a028132295 100644 --- a/generated_types/protos/influxdata/iox/schema/v1/service.proto +++ b/generated_types/protos/influxdata/iox/schema/v1/service.proto @@ -20,13 +20,15 @@ message NamespaceSchema { // Renamed to topic_id reserved 2; reserved "kafka_topic_id"; + // Removed topic ID + reserved 5; + reserved "topic_id"; + // Removed query pool ID + reserved 3; + reserved "query_pool_id"; // Namespace ID int64 id = 1; - // Topic ID - int64 topic_id = 5; - // Query Pool ID - int64 query_pool_id = 3; // Map of Table Name -> Table Schema map tables = 4; } diff --git a/import/src/aggregate_tsm_schema/update_catalog.rs b/import/src/aggregate_tsm_schema/update_catalog.rs index 5f41c61204..bd19480fb4 100644 --- a/import/src/aggregate_tsm_schema/update_catalog.rs +++ b/import/src/aggregate_tsm_schema/update_catalog.rs @@ 
-2,7 +2,7 @@ use crate::{AggregateTSMMeasurement, AggregateTSMSchema}; use chrono::{format::StrftimeItems, offset::FixedOffset, DateTime, Duration}; use data_types::{ ColumnType, Namespace, NamespaceName, NamespaceSchema, OrgBucketMappingError, Partition, - PartitionKey, QueryPoolId, TableSchema, TopicId, + PartitionKey, TableSchema, }; use iox_catalog::interface::{ get_schema_by_name, CasFailure, Catalog, RepoCollection, SoftDeletedRows, @@ -25,9 +25,6 @@ pub enum UpdateCatalogError { #[error("Couldn't construct namespace from org and bucket: {0}")] InvalidOrgBucket(#[from] OrgBucketMappingError), - #[error("No topic named '{topic_name}' found in the catalog")] - TopicCatalogLookup { topic_name: String }, - #[error("No namespace named {0} in Catalog")] NamespaceNotFound(String), @@ -43,14 +40,10 @@ pub enum UpdateCatalogError { /// Given a merged schema, update the IOx catalog to either merge that schema into the existing one /// for the namespace, or create the namespace and schema using the merged schema. -/// Will error if the namespace needs to be created but the user hasn't explicitly set the query -/// pool name and retention setting, allowing the user to not provide them if they're not needed. -/// Would have done the same for `topic` but that comes from the shared clap block and isn't -/// an `Option`. -pub async fn update_iox_catalog<'a>( - merged_tsm_schema: &'a AggregateTSMSchema, - topic: &'a str, - query_pool_name: &'a str, +/// Will error if the namespace needs to be created but the user hasn't explicitly set the +/// retention setting, allowing the user to not provide it if it's not needed. +pub async fn update_iox_catalog( + merged_tsm_schema: &AggregateTSMSchema, catalog: Arc, ) -> Result<(), UpdateCatalogError> { let namespace_name = @@ -67,15 +60,7 @@ pub async fn update_iox_catalog<'a>( Ok(iox_schema) => iox_schema, Err(iox_catalog::interface::Error::NamespaceNotFoundByName { .. }) => { // create the namespace - let (topic_id, query_id) = - get_topic_id_and_query_id(repos.deref_mut(), topic, query_pool_name).await?; - let _namespace = create_namespace( - namespace_name.as_str(), - topic_id, - query_id, - repos.deref_mut(), - ) - .await?; + let _namespace = create_namespace(namespace_name.as_str(), repos.deref_mut()).await?; // fetch the newly-created schema (which will be empty except for the time column, // which won't impact the merge we're about to do) match get_schema_by_name( @@ -98,46 +83,11 @@ pub async fn update_iox_catalog<'a>( Ok(()) } -async fn get_topic_id_and_query_id<'a, R>( - repos: &mut R, - topic_name: &'a str, - query_pool_name: &'a str, -) -> Result<(TopicId, QueryPoolId), UpdateCatalogError> +async fn create_namespace(name: &str, repos: &mut R) -> Result where R: RepoCollection + ?Sized, { - let topic_id = repos - .topics() - .get_by_name(topic_name) - .await - .map_err(UpdateCatalogError::CatalogError)? 
- .map(|v| v.id) - .ok_or_else(|| UpdateCatalogError::TopicCatalogLookup { - topic_name: topic_name.to_string(), - })?; - let query_id = repos - .query_pools() - .create_or_get(query_pool_name) - .await - .map(|v| v.id) - .map_err(UpdateCatalogError::CatalogError)?; - Ok((topic_id, query_id)) -} - -async fn create_namespace( - name: &str, - topic_id: TopicId, - query_id: QueryPoolId, - repos: &mut R, -) -> Result -where - R: RepoCollection + ?Sized, -{ - match repos - .namespaces() - .create(name, None, topic_id, query_id) - .await - { + match repos.namespaces().create(name, None).await { Ok(ns) => Ok(ns), Err(iox_catalog::interface::Error::NameExists { .. }) => { // presumably it got created in the meantime? @@ -401,13 +351,6 @@ mod tests { // init a test catalog stack let metrics = Arc::new(metric::Registry::default()); let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); - catalog - .repositories() - .await - .topics() - .create_or_get("iox-shared") - .await - .expect("topic created"); let json = r#" { @@ -428,14 +371,9 @@ mod tests { } "#; let agg_schema: AggregateTSMSchema = json.try_into().unwrap(); - update_iox_catalog( - &agg_schema, - "iox-shared", - "iox-shared", - Arc::clone(&catalog), - ) - .await - .expect("schema update worked"); + update_iox_catalog(&agg_schema, Arc::clone(&catalog)) + .await + .expect("schema update worked"); let mut repos = catalog.repositories().await; let iox_schema = get_schema_by_name( "1234_5678", @@ -483,19 +421,13 @@ mod tests { // init a test catalog stack let metrics = Arc::new(metric::Registry::default()); let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); - // We need txn to go out of scope to release the lock before update_iox_catalog { let mut txn = catalog.repositories().await; - txn.topics() - .create_or_get("iox-shared") - .await - .expect("topic created"); - // create namespace, table and columns for weather measurement let namespace = txn .namespaces() - .create("1234_5678", None, TopicId::new(1), QueryPoolId::new(1)) + .create("1234_5678", None) .await .expect("namespace created"); let mut table = txn @@ -545,14 +477,9 @@ mod tests { } "#; let agg_schema: AggregateTSMSchema = json.try_into().unwrap(); - update_iox_catalog( - &agg_schema, - "iox-shared", - "iox-shared", - Arc::clone(&catalog), - ) - .await - .expect("schema update worked"); + update_iox_catalog(&agg_schema, Arc::clone(&catalog)) + .await + .expect("schema update worked"); let mut repos = catalog.repositories().await; let iox_schema = get_schema_by_name( "1234_5678", @@ -589,15 +516,10 @@ mod tests { // We need txn to go out of scope to release the lock before update_iox_catalog { let mut txn = catalog.repositories().await; - txn.topics() - .create_or_get("iox-shared") - .await - .expect("topic created"); - // create namespace, table and columns for weather measurement let namespace = txn .namespaces() - .create("1234_5678", None, TopicId::new(1), QueryPoolId::new(1)) + .create("1234_5678", None) .await .expect("namespace created"); let mut table = txn @@ -640,14 +562,9 @@ mod tests { } "#; let agg_schema: AggregateTSMSchema = json.try_into().unwrap(); - let err = update_iox_catalog( - &agg_schema, - "iox-shared", - "iox-shared", - Arc::clone(&catalog), - ) - .await - .expect_err("should fail catalog update"); + let err = update_iox_catalog(&agg_schema, Arc::clone(&catalog)) + .await + .expect_err("should fail catalog update"); assert_matches!(err, UpdateCatalogError::SchemaUpdateError(_)); assert!(err .to_string() @@ -663,15 +580,11 @@ mod 
tests { // We need txn to go out of scope to release the lock before update_iox_catalog { let mut txn = catalog.repositories().await; - txn.topics() - .create_or_get("iox-shared") - .await - .expect("topic created"); // create namespace, table and columns for weather measurement let namespace = txn .namespaces() - .create("1234_5678", None, TopicId::new(1), QueryPoolId::new(1)) + .create("1234_5678", None) .await .expect("namespace created"); let mut table = txn @@ -713,14 +626,9 @@ mod tests { } "#; let agg_schema: AggregateTSMSchema = json.try_into().unwrap(); - let err = update_iox_catalog( - &agg_schema, - "iox-shared", - "iox-shared", - Arc::clone(&catalog), - ) - .await - .expect_err("should fail catalog update"); + let err = update_iox_catalog(&agg_schema, Arc::clone(&catalog)) + .await + .expect_err("should fail catalog update"); assert_matches!(err, UpdateCatalogError::SchemaUpdateError(_)); assert!(err.to_string().ends_with( "a column with name temperature already exists in the schema with a different type" diff --git a/influxdb_iox/src/commands/catalog.rs b/influxdb_iox/src/commands/catalog.rs index 456d29c4c3..d6eabdf341 100644 --- a/influxdb_iox/src/commands/catalog.rs +++ b/influxdb_iox/src/commands/catalog.rs @@ -5,14 +5,9 @@ use thiserror::Error; use crate::process_info::setup_metric_registry; -mod topic; - #[allow(clippy::enum_variant_names)] #[derive(Debug, Error)] pub enum Error { - #[error("Error in topic subcommand: {0}")] - Topic(#[from] topic::Error), - #[error("Catalog error: {0}")] Catalog(#[from] iox_catalog::interface::Error), @@ -39,9 +34,6 @@ struct Setup { enum Command { /// Run database migrations Setup(Setup), - - /// Manage topic - Topic(topic::Config), } pub async fn command(config: Config) -> Result<(), Error> { @@ -52,9 +44,6 @@ pub async fn command(config: Config) -> Result<(), Error> { catalog.setup().await?; println!("OK"); } - Command::Topic(config) => { - topic::command(config).await?; - } } Ok(()) diff --git a/influxdb_iox/src/commands/catalog/topic.rs b/influxdb_iox/src/commands/catalog/topic.rs deleted file mode 100644 index 8783db3805..0000000000 --- a/influxdb_iox/src/commands/catalog/topic.rs +++ /dev/null @@ -1,54 +0,0 @@ -//! 
This module implements the `catalog topic` CLI subcommand - -use thiserror::Error; - -use clap_blocks::catalog_dsn::CatalogDsnConfig; - -use crate::process_info::setup_metric_registry; - -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Error)] -pub enum Error { - #[error("Error updating catalog: {0}")] - UpdateCatalogError(#[from] iox_catalog::interface::Error), - - #[error("Catalog DSN error: {0}")] - CatalogDsn(#[from] clap_blocks::catalog_dsn::Error), -} - -/// Manage IOx chunks -#[derive(Debug, clap::Parser)] -pub struct Config { - #[clap(subcommand)] - command: Command, -} - -/// Create or update a topic -#[derive(Debug, clap::Parser)] -struct Update { - #[clap(flatten)] - catalog_dsn: CatalogDsnConfig, - - /// The name of the topic - #[clap(action)] - db_name: String, -} - -/// All possible subcommands for topic -#[derive(Debug, clap::Parser)] -enum Command { - Update(Update), -} - -pub async fn command(config: Config) -> Result<(), Error> { - match config.command { - Command::Update(update) => { - let metrics = setup_metric_registry(); - let catalog = update.catalog_dsn.get_catalog("cli", metrics).await?; - let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get(&update.db_name).await?; - println!("{}", topic.id); - Ok(()) - } - } -} diff --git a/influxdb_iox/src/commands/import/schema.rs b/influxdb_iox/src/commands/import/schema.rs index 193ff96d89..1092dbbe05 100644 --- a/influxdb_iox/src/commands/import/schema.rs +++ b/influxdb_iox/src/commands/import/schema.rs @@ -84,26 +84,6 @@ pub struct MergeConfig { #[clap(flatten)] catalog_dsn: CatalogDsnConfig, - /// Write buffer topic/database that should be used. - // This isn't really relevant to the RPC write path and will be removed eventually. - #[clap( - long = "write-buffer-topic", - env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC", - default_value = "iox-shared", - action - )] - pub topic: String, - - /// Query pool name to dispatch writes to. - // This isn't really relevant to the RPC write path and will be removed eventually. 
- #[clap( - long = "query-pool", - env = "INFLUXDB_IOX_QUERY_POOL_NAME", - default_value = "iox-shared", - action - )] - pub query_pool_name: String, - #[clap(long)] /// Retention setting setting (used only if we need to create the namespace) retention: Option, @@ -192,13 +172,7 @@ pub async fn command(config: Config) -> Result<(), SchemaCommandError> { // given we have a valid aggregate TSM schema, fetch the schema for the namespace from // the IOx catalog, if it exists, and update it with our aggregate schema - update_iox_catalog( - &merged_tsm_schema, - &merge_config.topic, - &merge_config.query_pool_name, - Arc::clone(&catalog), - ) - .await?; + update_iox_catalog(&merged_tsm_schema, Arc::clone(&catalog)).await?; Ok(()) } diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index ca519aecf5..85d303424e 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -64,9 +64,6 @@ pub const DEFAULT_INGESTER_GRPC_BIND_ADDR: &str = "127.0.0.1:8083"; /// The default bind address for the Compactor gRPC pub const DEFAULT_COMPACTOR_GRPC_BIND_ADDR: &str = "127.0.0.1:8084"; -// If you want this level of control, should be instantiating the services individually -const QUERY_POOL_NAME: &str = "iox-shared"; - #[derive(Debug, Error)] pub enum Error { #[error("Run: {0}")] @@ -472,13 +469,11 @@ impl Config { let router_config = Router2Config { authz_address: authz_address.clone(), single_tenant_deployment, - query_pool_name: QUERY_POOL_NAME.to_string(), http_request_limit: 1_000, ingester_addresses: ingester_addresses.clone(), new_namespace_retention_hours: None, // infinite retention namespace_autocreation_enabled: true, partition_key_pattern: "%Y-%m-%d".to_string(), - topic: QUERY_POOL_NAME.to_string(), rpc_write_timeout_seconds: Duration::new(3, 0), rpc_write_replicas: None, rpc_write_max_outgoing_bytes: ingester_config.rpc_write_max_incoming_bytes, @@ -590,14 +585,6 @@ pub async fn command(config: Config) -> Result<()> { info!("running db migrations"); catalog.setup().await?; - // Create a topic - catalog - .repositories() - .await - .topics() - .create_or_get(QUERY_POOL_NAME) - .await?; - let object_store: Arc = make_object_store(router_run_config.object_store_config()) .map_err(Error::ObjectStoreParsing)?; diff --git a/ingester2/src/buffer_tree/partition/resolver/catalog.rs b/ingester2/src/buffer_tree/partition/resolver/catalog.rs index eb5b69160c..ff76eaf3ff 100644 --- a/ingester2/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester2/src/buffer_tree/partition/resolver/catalog.rs @@ -114,11 +114,9 @@ mod tests { let (namespace_id, table_id) = { let mut repos = catalog.repositories().await; - let t = repos.topics().create_or_get("platanos").await.unwrap(); - let q = repos.query_pools().create_or_get("platanos").await.unwrap(); let ns = repos .namespaces() - .create(TABLE_NAME, None, t.id, q.id) + .create(TABLE_NAME, None) .await .unwrap(); diff --git a/ingester2/src/test_util.rs b/ingester2/src/test_util.rs index 140acefcb0..6e710bd06a 100644 --- a/ingester2/src/test_util.rs +++ b/ingester2/src/test_util.rs @@ -283,14 +283,7 @@ pub(crate) async fn populate_catalog( table: &str, ) -> (NamespaceId, TableId) { let mut c = catalog.repositories().await; - let topic = c.topics().create_or_get("kafka-topic").await.unwrap(); - let query_pool = c.query_pools().create_or_get("query-pool").await.unwrap(); - let ns_id = c - .namespaces() - .create(namespace, None, topic.id, query_pool.id) - .await - 
.unwrap() - .id; + let ns_id = c.namespaces().create(namespace, None).await.unwrap().id; let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id; (ns_id, table_id) diff --git a/ingester2_test_ctx/src/lib.rs b/ingester2_test_ctx/src/lib.rs index 9fbb5e66a7..ad50f7edb9 100644 --- a/ingester2_test_ctx/src/lib.rs +++ b/ingester2_test_ctx/src/lib.rs @@ -17,8 +17,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use arrow::record_batch::RecordBatch; use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket}; use data_types::{ - Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, QueryPoolId, - SequenceNumber, TableId, TopicId, TopicMetadata, + Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, SequenceNumber, TableId, }; use dml::{DmlMeta, DmlWrite}; use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt}; @@ -43,9 +42,6 @@ use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tonic::Request; -/// The (legacy) topic name this ingester uses. -pub const TEST_TOPIC_NAME: &str = "banana-topics"; - /// The default max persist queue depth - configurable with /// [`TestContextBuilder::with_max_persist_queue_depth()`]. pub const DEFAULT_MAX_PERSIST_QUEUE_DEPTH: usize = 5; @@ -53,6 +49,11 @@ pub const DEFAULT_MAX_PERSIST_QUEUE_DEPTH: usize = 5; /// [`TestContextBuilder::with_persist_hot_partition_cost()`]. pub const DEFAULT_PERSIST_HOT_PARTITION_COST: usize = 20_000_000; +/// Construct a new [`TestContextBuilder`] to make a [`TestContext`] for an [`ingester2`] instance. +pub fn test_context() -> TestContextBuilder { + TestContextBuilder::default() +} + /// Configure and construct a [`TestContext`] containing an [`ingester2`] instance. #[derive(Debug)] pub struct TestContextBuilder { @@ -126,24 +127,6 @@ impl TestContextBuilder { let storage = ParquetStorage::new(object_store, parquet_file::storage::StorageId::from("iox")); - // Initialise a topic and query pool. 
- // - // Note that tests should set up their own namespace via - // ensure_namespace() - let topic: TopicMetadata; - let query_id: QueryPoolId; - // txn must go out of scope so the lock is released for `ingester2::new` - { - let mut txn = catalog.repositories().await; - topic = txn.topics().create_or_get(TEST_TOPIC_NAME).await.unwrap(); - query_id = txn - .query_pools() - .create_or_get("banana-query-pool") - .await - .unwrap() - .id; - } - // Settings so that the ingester will effectively never persist by itself, only on demand let wal_rotation_period = Duration::from_secs(1_000_000); @@ -175,8 +158,6 @@ impl TestContextBuilder { _dir: dir, catalog, _storage: storage, - query_id, - topic_id: topic.id, metrics, namespaces: Default::default(), } @@ -193,8 +174,6 @@ pub struct TestContext { shutdown_tx: oneshot::Sender, catalog: Arc, _storage: ParquetStorage, - query_id: QueryPoolId, - topic_id: TopicId, metrics: Arc, /// Once the last [`TempDir`] reference is dropped, the directory it @@ -228,7 +207,7 @@ where .repositories() .await .namespaces() - .create(name, None, self.topic_id, self.query_id) + .create(name, None) .await .expect("failed to create test namespace"); @@ -238,8 +217,6 @@ where ns.id, NamespaceSchema::new( ns.id, - self.topic_id, - self.query_id, iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, iox_catalog::DEFAULT_MAX_TABLES, retention_period_ns, diff --git a/iox_catalog/README.md b/iox_catalog/README.md index 38a6ea8b20..21388cf02a 100644 --- a/iox_catalog/README.md +++ b/iox_catalog/README.md @@ -27,7 +27,6 @@ You'll then need to create the database. You can do this via the sqlx command li cargo install sqlx-cli DATABASE_URL= sqlx database create cargo run -q -- catalog setup -cargo run -- catalog topic update iox-shared ``` This will set up the database based on the files in `./migrations` in this crate. SQLx also creates diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 5d422ef99e..0acddebf36 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -3,8 +3,8 @@ use async_trait::async_trait; use data_types::{ Column, ColumnSchema, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceSchema, - ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, - QueryPoolId, SkippedCompaction, Table, TableId, TableSchema, Timestamp, TopicId, TopicMetadata, + ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, + SkippedCompaction, Table, TableId, TableSchema, Timestamp, }; use iox_time::TimeProvider; use snafu::{OptionExt, Snafu}; @@ -218,12 +218,6 @@ pub trait Catalog: Send + Sync + Debug + Display { /// should and must not care how these are implemented. #[async_trait] pub trait RepoCollection: Send + Sync + Debug { - /// Repository for [topics](data_types::TopicMetadata). - fn topics(&mut self) -> &mut dyn TopicMetadataRepo; - - /// Repository for [query pools](data_types::QueryPool). - fn query_pools(&mut self) -> &mut dyn QueryPoolRepo; - /// Repository for [namespaces](data_types::Namespace). fn namespaces(&mut self) -> &mut dyn NamespaceRepo; @@ -240,36 +234,13 @@ pub trait RepoCollection: Send + Sync + Debug { fn parquet_files(&mut self) -> &mut dyn ParquetFileRepo; } -/// Functions for working with topics in the catalog. -#[async_trait] -pub trait TopicMetadataRepo: Send + Sync { - /// Creates the topic in the catalog or gets the existing record by name. 
- async fn create_or_get(&mut self, name: &str) -> Result; - - /// Gets the topic by its unique name - async fn get_by_name(&mut self, name: &str) -> Result>; -} - -/// Functions for working with query pools in the catalog. -#[async_trait] -pub trait QueryPoolRepo: Send + Sync { - /// Creates the query pool in the catalog or gets the existing record by name. - async fn create_or_get(&mut self, name: &str) -> Result; -} - /// Functions for working with namespaces in the catalog #[async_trait] pub trait NamespaceRepo: Send + Sync { /// Creates the namespace in the catalog. If one by the same name already exists, an /// error is returned. /// Specify `None` for `retention_period_ns` to get infinite retention. - async fn create( - &mut self, - name: &str, - retention_period_ns: Option, - topic_id: TopicId, - query_pool_id: QueryPoolId, - ) -> Result; + async fn create(&mut self, name: &str, retention_period_ns: Option) -> Result; /// Update retention period for a namespace async fn update_retention_period( @@ -562,8 +533,6 @@ where let mut namespace = NamespaceSchema::new( namespace.id, - namespace.topic_id, - namespace.query_pool_id, namespace.max_columns_per_table, namespace.max_tables, namespace.retention_period_ns, @@ -715,8 +684,6 @@ pub async fn list_schemas( let mut ns = NamespaceSchema::new( v.id, - v.topic_id, - v.query_pool_id, v.max_columns_per_table, v.max_tables, v.retention_period_ns, @@ -747,7 +714,6 @@ pub(crate) mod test_helpers { test_setup(clean_state().await).await; test_namespace_soft_deletion(clean_state().await).await; test_partitions_new_file_between(clean_state().await).await; - test_query_pool(clean_state().await).await; test_column(clean_state().await).await; test_partition(clean_state().await).await; test_parquet_file(clean_state().await).await; @@ -758,10 +724,6 @@ pub(crate) mod test_helpers { test_list_schemas_soft_deleted_rows(clean_state().await).await; test_delete_namespace(clean_state().await).await; - let catalog = clean_state().await; - test_topic(Arc::clone(&catalog)).await; - assert_metric_hit(&catalog.metrics(), "topic_create_or_get"); - let catalog = clean_state().await; test_namespace(Arc::clone(&catalog)).await; assert_metric_hit(&catalog.metrics(), "namespace_create"); @@ -788,41 +750,12 @@ pub(crate) mod test_helpers { catalog.setup().await.expect("second catalog setup"); } - async fn test_topic(catalog: Arc) { - let mut repos = catalog.repositories().await; - let topic_repo = repos.topics(); - - let k = topic_repo.create_or_get("foo").await.unwrap(); - assert!(k.id > TopicId::new(0)); - assert_eq!(k.name, "foo"); - let k2 = topic_repo.create_or_get("foo").await.unwrap(); - assert_eq!(k, k2); - let k3 = topic_repo.get_by_name("foo").await.unwrap().unwrap(); - assert_eq!(k3, k); - let k3 = topic_repo.get_by_name("asdf").await.unwrap(); - assert!(k3.is_none()); - } - - async fn test_query_pool(catalog: Arc) { - let mut repos = catalog.repositories().await; - let query_repo = repos.query_pools(); - - let q = query_repo.create_or_get("foo").await.unwrap(); - assert!(q.id > QueryPoolId::new(0)); - assert_eq!(q.name, "foo"); - let q2 = query_repo.create_or_get("foo").await.unwrap(); - assert_eq!(q, q2); - } - async fn test_namespace(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); - let namespace_name = "test_namespace"; let namespace = repos .namespaces() - .create(namespace_name, None, topic.id, pool.id) + 
.create(namespace_name, None) .await .unwrap(); assert!(namespace.id > NamespaceId::new(0)); @@ -835,10 +768,7 @@ pub(crate) mod test_helpers { DEFAULT_MAX_COLUMNS_PER_TABLE ); - let conflict = repos - .namespaces() - .create(namespace_name, None, topic.id, pool.id) - .await; + let conflict = repos.namespaces().create(namespace_name, None).await; assert!(matches!( conflict.unwrap_err(), Error::NameExists { name: _ } @@ -877,7 +807,7 @@ pub(crate) mod test_helpers { let namespace2_name = "test_namespace2"; let namespace2 = repos .namespaces() - .create(namespace2_name, None, topic.id, pool.id) + .create(namespace2_name, None) .await .unwrap(); let mut namespaces = repos @@ -926,7 +856,7 @@ pub(crate) mod test_helpers { let namespace3_name = "test_namespace3"; let namespace3 = repos .namespaces() - .create(namespace3_name, None, topic.id, pool.id) + .create(namespace3_name, None) .await .expect("namespace with NULL retention should be created"); assert!(namespace3.retention_period_ns.is_none()); @@ -935,12 +865,7 @@ pub(crate) mod test_helpers { let namespace4_name = "test_namespace4"; let namespace4 = repos .namespaces() - .create( - namespace4_name, - Some(NEW_RETENTION_PERIOD_NS), - topic.id, - pool.id, - ) + .create(namespace4_name, Some(NEW_RETENTION_PERIOD_NS)) .await .expect("namespace with 5-hour retention should be created"); assert_eq!( @@ -986,19 +911,9 @@ pub(crate) mod test_helpers { /// the expected rows for all three states of [`SoftDeletedRows`]. async fn test_namespace_soft_deletion(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); - let deleted_ns = repos - .namespaces() - .create("deleted-ns", None, topic.id, pool.id) - .await - .unwrap(); - let active_ns = repos - .namespaces() - .create("active-ns", None, topic.id, pool.id) - .await - .unwrap(); + let deleted_ns = repos.namespaces().create("deleted-ns", None).await.unwrap(); + let active_ns = repos.namespaces().create("active-ns", None).await.unwrap(); // Mark "deleted-ns" as soft-deleted. 
repos.namespaces().soft_delete("deleted-ns").await.unwrap(); @@ -1152,11 +1067,9 @@ pub(crate) mod test_helpers { async fn test_table(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_table_test", None, topic.id, pool.id) + .create("namespace_table_test", None) .await .unwrap(); @@ -1191,11 +1104,7 @@ pub(crate) mod test_helpers { assert_eq!(vec![t.clone()], tables); // test we can create a table of the same name in a different namespace - let namespace2 = repos - .namespaces() - .create("two", None, topic.id, pool.id) - .await - .unwrap(); + let namespace2 = repos.namespaces().create("two", None).await.unwrap(); assert_ne!(namespace, namespace2); let test_table = repos .tables() @@ -1291,11 +1200,9 @@ pub(crate) mod test_helpers { async fn test_column(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_column_test", None, topic.id, pool.id) + .create("namespace_column_test", None) .await .unwrap(); let table = repos @@ -1426,11 +1333,9 @@ pub(crate) mod test_helpers { async fn test_partition(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_partition_test", None, topic.id, pool.id) + .create("namespace_partition_test", None) .await .unwrap(); let table = repos @@ -1710,11 +1615,9 @@ pub(crate) mod test_helpers { /// tests many interactions with the catalog and parquet files. 
See the individual conditions herein async fn test_parquet_file(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_parquet_file_test", None, topic.id, pool.id) + .create("namespace_parquet_file_test", None) .await .unwrap(); let table = repos @@ -1899,7 +1802,7 @@ pub(crate) mod test_helpers { // test list_by_namespace_not_to_delete let namespace2 = repos .namespaces() - .create("namespace_parquet_file_test1", None, topic.id, pool.id) + .create("namespace_parquet_file_test1", None) .await .unwrap(); let table2 = repos @@ -2192,16 +2095,14 @@ pub(crate) mod test_helpers { async fn test_parquet_file_delete_broken(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace_1 = repos .namespaces() - .create("retention_broken_1", None, topic.id, pool.id) + .create("retention_broken_1", None) .await .unwrap(); let namespace_2 = repos .namespaces() - .create("retention_broken_2", Some(1), topic.id, pool.id) + .create("retention_broken_2", Some(1)) .await .unwrap(); let table_1 = repos @@ -2274,19 +2175,9 @@ pub(crate) mod test_helpers { async fn test_partitions_new_file_between(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos - .topics() - .create_or_get("new_file_between") - .await - .unwrap(); - let pool = repos - .query_pools() - .create_or_get("new_file_between") - .await - .unwrap(); let namespace = repos .namespaces() - .create("test_partitions_new_file_between", None, topic.id, pool.id) + .create("test_partitions_new_file_between", None) .await .unwrap(); let table = repos @@ -2650,15 +2541,11 @@ pub(crate) mod test_helpers { async fn test_list_by_partiton_not_to_delete(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() .create( "namespace_parquet_file_test_list_by_partiton_not_to_delete", None, - topic.id, - pool.id, ) .await .unwrap(); @@ -2764,16 +2651,9 @@ pub(crate) mod test_helpers { async fn test_update_to_compaction_level_1(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace = repos .namespaces() - .create( - "namespace_update_to_compaction_level_1_test", - None, - topic.id, - pool.id, - ) + .create("namespace_update_to_compaction_level_1_test", None) .await .unwrap(); let table = repos @@ -2857,11 +2737,9 @@ pub(crate) mod test_helpers { /// effective. async fn test_delete_namespace(catalog: Arc) { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); let namespace_1 = repos .namespaces() - .create("namespace_test_delete_namespace_1", None, topic.id, pool.id) + .create("namespace_test_delete_namespace_1", None) .await .unwrap(); let table_1 = repos @@ -2916,7 +2794,7 @@ pub(crate) mod test_helpers { // it, let's create another so we can ensure that doesn't get deleted. 
let namespace_2 = repos .namespaces() - .create("namespace_test_delete_namespace_2", None, topic.id, pool.id) + .create("namespace_test_delete_namespace_2", None) .await .unwrap(); let table_2 = repos @@ -3101,12 +2979,7 @@ pub(crate) mod test_helpers { where R: RepoCollection + ?Sized, { - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); - let namespace = repos - .namespaces() - .create(namespace_name, None, topic.id, pool.id) - .await; + let namespace = repos.namespaces().create(namespace_name, None).await; let namespace = match namespace { Ok(v) => v, @@ -3123,8 +2996,6 @@ pub(crate) mod test_helpers { let batches = batches.iter().map(|(table, batch)| (table.as_str(), batch)); let ns = NamespaceSchema::new( namespace.id, - topic.id, - pool.id, namespace.max_columns_per_table, namespace.max_tables, namespace.retention_period_ns, diff --git a/iox_catalog/src/kafkaless_transition.rs b/iox_catalog/src/kafkaless_transition.rs index 408848b794..fa89731d3c 100644 --- a/iox_catalog/src/kafkaless_transition.rs +++ b/iox_catalog/src/kafkaless_transition.rs @@ -1,11 +1,13 @@ -use data_types::TopicId; - /// Magic number to be used shard indices and shard ids in "kafkaless". pub(crate) const TRANSITION_SHARD_NUMBER: i32 = 1234; /// In kafkaless mode all new persisted data uses this shard id. pub(crate) const TRANSITION_SHARD_ID: ShardId = ShardId::new(TRANSITION_SHARD_NUMBER as i64); /// In kafkaless mode all new persisted data uses this shard index. pub(crate) const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(TRANSITION_SHARD_NUMBER); +pub(crate) const SHARED_TOPIC_NAME: &str = "iox-shared"; +pub(crate) const SHARED_TOPIC_ID: TopicId = TopicId::new(1); +pub(crate) const SHARED_QUERY_POOL_ID: QueryPoolId = QueryPoolId::new(1); +pub(crate) const SHARED_QUERY_POOL: &str = SHARED_TOPIC_NAME; /// Unique ID for a `Shard`, assigned by the catalog. Joins to other catalog tables to uniquely /// identify shards independently of the underlying write buffer implementation. @@ -67,3 +69,27 @@ pub(crate) struct Shard { /// and write buffer pub(crate) shard_index: ShardIndex, } + +/// Unique ID for a Topic, assigned by the catalog +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct TopicId(i64); + +#[allow(missing_docs)] +impl TopicId { + pub const fn new(v: i64) -> Self { + Self(v) + } +} + +/// Unique ID for a `QueryPool` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] +#[sqlx(transparent)] +pub struct QueryPoolId(i64); + +#[allow(missing_docs)] +impl QueryPoolId { + pub const fn new(v: i64) -> Self { + Self(v) + } +} diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 0cb84d47d8..30ab2c3a31 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -14,14 +14,11 @@ )] use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result}; -use data_types::{ColumnType, NamespaceSchema, QueryPool, TableSchema, TopicId, TopicMetadata}; +use data_types::{ColumnType, NamespaceSchema, TableSchema}; use mutable_batch::MutableBatch; use std::{borrow::Cow, collections::HashMap}; use thiserror::Error; -const SHARED_TOPIC_NAME: &str = "iox-shared"; -const SHARED_TOPIC_ID: TopicId = TopicId::new(1); -const SHARED_QUERY_POOL: &str = SHARED_TOPIC_NAME; const TIME_COLUMN: &str = "time"; /// Default per-namespace table count service protection limit. 
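A standalone illustration (not from this patch) of the `#[sqlx(transparent)]` newtype pattern used by the relocated `TopicId`/`QueryPoolId` wrappers above; `ExampleId` and the query text are invented for the sketch:

use sqlx::PgPool;

#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
#[sqlx(transparent)]
struct ExampleId(i64); // encodes and decodes exactly like the wrapped i64

async fn bind_example(pool: &PgPool) -> Result<(), sqlx::Error> {
    // The wrapper binds directly wherever a BIGINT parameter is expected, which
    // is how the shared topic and query-pool IDs are supplied to the namespace
    // INSERT in the postgres changes later in this diff.
    sqlx::query("SELECT $1::bigint")
        .bind(ExampleId(1))
        .execute(pool)
        .await?;
    Ok(())
}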
@@ -204,19 +201,6 @@ where Ok(()) } -/// Creates or gets records in the catalog for the shared topic and query pool for each of the -/// partitions. -/// -/// Used in tests and when creating an in-memory catalog. -pub async fn create_or_get_default_records( - txn: &mut dyn RepoCollection, -) -> Result<(TopicMetadata, QueryPool)> { - let topic = txn.topics().create_or_get(SHARED_TOPIC_NAME).await?; - let query_pool = txn.query_pools().create_or_get(SHARED_QUERY_POOL).await?; - - Ok((topic, query_pool)) -} - #[cfg(test)] mod tests { use std::{collections::BTreeMap, sync::Arc}; @@ -251,20 +235,15 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let repo = MemCatalog::new(metrics); let mut txn = repo.repositories().await; - let (topic, query_pool) = create_or_get_default_records( - txn.deref_mut() - ).await.unwrap(); let namespace = txn .namespaces() - .create(NAMESPACE_NAME, None, topic.id, query_pool.id) + .create(NAMESPACE_NAME, None) .await .unwrap(); let schema = NamespaceSchema::new( namespace.id, - namespace.topic_id, - namespace.query_pool_id, namespace.max_columns_per_table, namespace.max_tables, namespace.retention_period_ns, diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index a6ce9ee671..d42688db85 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -4,18 +4,18 @@ use crate::{ interface::{ CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo, - ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, SoftDeletedRows, - TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE, + ParquetFileRepo, PartitionRepo, RepoCollection, Result, SoftDeletedRows, TableRepo, + MAX_PARQUET_FILES_SELECTED_ONCE, }, - kafkaless_transition::{Shard, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX}, + kafkaless_transition::{Shard, SHARED_TOPIC_ID, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX}, metrics::MetricDecorator, - DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME, + DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, }; use async_trait::async_trait; use data_types::{ Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, - ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId, - SkippedCompaction, Table, TableId, Timestamp, TopicId, TopicMetadata, + ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, + Table, TableId, Timestamp, }; use iox_time::{SystemProvider, TimeProvider}; use snafu::ensure; @@ -62,8 +62,6 @@ impl std::fmt::Debug for MemCatalog { #[derive(Default, Debug, Clone)] struct MemCollections { - topics: Vec, - query_pools: Vec, namespaces: Vec, tables: Vec, columns: Vec, @@ -98,18 +96,10 @@ impl Catalog for MemCatalog { let mut guard = Arc::clone(&self.collections).lock_owned().await; let mut stage = guard.clone(); - // We need to manually insert the topic here so that we can create the transition shard - // below. - let topic = TopicMetadata { - id: SHARED_TOPIC_ID, - name: SHARED_TOPIC_NAME.to_string(), - }; - stage.topics.push(topic.clone()); - // The transition shard must exist and must have magic ID and INDEX. 
let shard = Shard { id: TRANSITION_SHARD_ID, - topic_id: topic.id, + topic_id: SHARED_TOPIC_ID, shard_index: TRANSITION_SHARD_INDEX, }; stage.shards.push(shard); @@ -139,14 +129,6 @@ impl Catalog for MemCatalog { #[async_trait] impl RepoCollection for MemTxn { - fn topics(&mut self) -> &mut dyn TopicMetadataRepo { - self - } - - fn query_pools(&mut self) -> &mut dyn QueryPoolRepo { - self - } - fn namespaces(&mut self) -> &mut dyn NamespaceRepo { self } @@ -168,64 +150,9 @@ impl RepoCollection for MemTxn { } } -#[async_trait] -impl TopicMetadataRepo for MemTxn { - async fn create_or_get(&mut self, name: &str) -> Result { - let stage = self.stage(); - - let topic = match stage.topics.iter().find(|t| t.name == name) { - Some(t) => t, - None => { - let topic = TopicMetadata { - id: TopicId::new(stage.topics.len() as i64 + 1), - name: name.to_string(), - }; - stage.topics.push(topic); - stage.topics.last().unwrap() - } - }; - - Ok(topic.clone()) - } - - async fn get_by_name(&mut self, name: &str) -> Result> { - let stage = self.stage(); - - let topic = stage.topics.iter().find(|t| t.name == name).cloned(); - Ok(topic) - } -} - -#[async_trait] -impl QueryPoolRepo for MemTxn { - async fn create_or_get(&mut self, name: &str) -> Result { - let stage = self.stage(); - - let pool = match stage.query_pools.iter().find(|t| t.name == name) { - Some(t) => t, - None => { - let pool = QueryPool { - id: QueryPoolId::new(stage.query_pools.len() as i64 + 1), - name: name.to_string(), - }; - stage.query_pools.push(pool); - stage.query_pools.last().unwrap() - } - }; - - Ok(pool.clone()) - } -} - #[async_trait] impl NamespaceRepo for MemTxn { - async fn create( - &mut self, - name: &str, - retention_period_ns: Option, - topic_id: TopicId, - query_pool_id: QueryPoolId, - ) -> Result { + async fn create(&mut self, name: &str, retention_period_ns: Option) -> Result { let stage = self.stage(); if stage.namespaces.iter().any(|n| n.name == name) { @@ -237,8 +164,6 @@ impl NamespaceRepo for MemTxn { let namespace = Namespace { id: NamespaceId::new(stage.namespaces.len() as i64 + 1), name: name.to_string(), - topic_id, - query_pool_id, max_tables: DEFAULT_MAX_TABLES, max_columns_per_table: DEFAULT_MAX_COLUMNS_PER_TABLE, retention_period_ns, diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 6efca45b94..09ce99bcbd 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -1,14 +1,14 @@ //! Metric instrumentation for catalog implementations. 
use crate::interface::{ - CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, QueryPoolRepo, - RepoCollection, Result, SoftDeletedRows, TableRepo, TopicMetadataRepo, + CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, RepoCollection, Result, + SoftDeletedRows, TableRepo, }; use async_trait::async_trait; use data_types::{ Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId, - ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId, - SkippedCompaction, Table, TableId, Timestamp, TopicId, TopicMetadata, + ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, + Timestamp, }; use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric}; @@ -41,24 +41,9 @@ impl MetricDecorator { impl RepoCollection for MetricDecorator where - T: TopicMetadataRepo - + QueryPoolRepo - + NamespaceRepo - + TableRepo - + ColumnRepo - + PartitionRepo - + ParquetFileRepo - + Debug, + T: NamespaceRepo + TableRepo + ColumnRepo + PartitionRepo + ParquetFileRepo + Debug, P: TimeProvider, { - fn topics(&mut self) -> &mut dyn TopicMetadataRepo { - self - } - - fn query_pools(&mut self) -> &mut dyn QueryPoolRepo { - self - } - fn namespaces(&mut self) -> &mut dyn NamespaceRepo { self } @@ -143,25 +128,10 @@ macro_rules! decorate { }; } -decorate!( - impl_trait = TopicMetadataRepo, - methods = [ - "topic_create_or_get" = create_or_get(&mut self, name: &str) -> Result; - "topic_get_by_name" = get_by_name(&mut self, name: &str) -> Result>; - ] -); - -decorate!( - impl_trait = QueryPoolRepo, - methods = [ - "query_create_or_get" = create_or_get(&mut self, name: &str) -> Result; - ] -); - decorate!( impl_trait = NamespaceRepo, methods = [ - "namespace_create" = create(&mut self, name: &str, retention_period_ns: Option, topic_id: TopicId, query_pool_id: QueryPoolId) -> Result; + "namespace_create" = create(&mut self, name: &str, retention_period_ns: Option) -> Result; "namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_period_ns: Option) -> Result; "namespace_list" = list(&mut self, deleted: SoftDeletedRows) -> Result>; "namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId, deleted: SoftDeletedRows) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index d2cc8ba195..5851ad3edf 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -3,18 +3,21 @@ use crate::{ interface::{ self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo, - ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, SoftDeletedRows, - TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE, + ParquetFileRepo, PartitionRepo, RepoCollection, Result, SoftDeletedRows, TableRepo, + MAX_PARQUET_FILES_SELECTED_ONCE, + }, + kafkaless_transition::{ + SHARED_QUERY_POOL, SHARED_QUERY_POOL_ID, SHARED_TOPIC_ID, SHARED_TOPIC_NAME, + TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, }, - kafkaless_transition::{TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX}, metrics::MetricDecorator, - DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME, + DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, }; use async_trait::async_trait; use data_types::{ Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId, - ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId, - SkippedCompaction, Table, 
TableId, Timestamp, TopicId, TopicMetadata, + ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, + Timestamp, }; use iox_time::{SystemProvider, TimeProvider}; use observability_deps::tracing::{debug, info, warn}; @@ -240,7 +243,8 @@ impl Catalog for PostgresCatalog { .await .map_err(|e| Error::Setup { source: e.into() })?; - // We need to manually insert the topic here so that we can create the transition shard below. + // We need to manually insert the topic here so that we can create the transition shard + // below. sqlx::query( r#" INSERT INTO topic (name) @@ -271,6 +275,21 @@ DO NOTHING; .await .map_err(|e| Error::Setup { source: e })?; + // We need to manually insert the query pool here so that we can create namespaces that + // reference it. + sqlx::query( + r#" +INSERT INTO query_pool (name) +VALUES ($1) +ON CONFLICT ON CONSTRAINT query_pool_name_unique +DO NOTHING; + "#, + ) + .bind(SHARED_QUERY_POOL) + .execute(&self.pool) + .await + .map_err(|e| Error::Setup { source: e })?; + Ok(()) } @@ -458,14 +477,6 @@ fn get_dsn_file_path(dsn: &str) -> Option { #[async_trait] impl RepoCollection for PostgresTxn { - fn topics(&mut self) -> &mut dyn TopicMetadataRepo { - self - } - - fn query_pools(&mut self) -> &mut dyn QueryPoolRepo { - self - } - fn namespaces(&mut self) -> &mut dyn NamespaceRepo { self } @@ -487,88 +498,19 @@ impl RepoCollection for PostgresTxn { } } -#[async_trait] -impl TopicMetadataRepo for PostgresTxn { - async fn create_or_get(&mut self, name: &str) -> Result { - let rec = sqlx::query_as::<_, TopicMetadata>( - r#" -INSERT INTO topic ( name ) -VALUES ( $1 ) -ON CONFLICT ON CONSTRAINT topic_name_unique -DO UPDATE SET name = topic.name -RETURNING *; - "#, - ) - .bind(name) // $1 - .fetch_one(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(rec) - } - - async fn get_by_name(&mut self, name: &str) -> Result> { - let rec = sqlx::query_as::<_, TopicMetadata>( - r#" -SELECT * -FROM topic -WHERE name = $1; - "#, - ) - .bind(name) // $1 - .fetch_one(&mut self.inner) - .await; - - if let Err(sqlx::Error::RowNotFound) = rec { - return Ok(None); - } - - let topic = rec.map_err(|e| Error::SqlxError { source: e })?; - - Ok(Some(topic)) - } -} - -#[async_trait] -impl QueryPoolRepo for PostgresTxn { - async fn create_or_get(&mut self, name: &str) -> Result { - let rec = sqlx::query_as::<_, QueryPool>( - r#" -INSERT INTO query_pool ( name ) -VALUES ( $1 ) -ON CONFLICT ON CONSTRAINT query_pool_name_unique -DO UPDATE SET name = query_pool.name -RETURNING *; - "#, - ) - .bind(name) // $1 - .fetch_one(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(rec) - } -} - #[async_trait] impl NamespaceRepo for PostgresTxn { - async fn create( - &mut self, - name: &str, - retention_period_ns: Option, - topic_id: TopicId, - query_pool_id: QueryPoolId, - ) -> Result { + async fn create(&mut self, name: &str, retention_period_ns: Option) -> Result { let rec = sqlx::query_as::<_, Namespace>( r#" INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables ) VALUES ( $1, $2, $3, $4, $5 ) - RETURNING *; + RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at; "#, ) .bind(name) // $1 - .bind(topic_id) // $2 - .bind(query_pool_id) // $3 + .bind(SHARED_TOPIC_ID) // $2 + .bind(SHARED_QUERY_POOL_ID) // $3 .bind(retention_period_ns) // $4 .bind(DEFAULT_MAX_TABLES); // $5 @@ -594,7 +536,11 @@ impl NamespaceRepo for PostgresTxn { async fn list(&mut 
self, deleted: SoftDeletedRows) -> Result> { let rec = sqlx::query_as::<_, Namespace>( format!( - r#"SELECT * FROM namespace WHERE {v};"#, + r#" +SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at +FROM namespace +WHERE {v}; + "#, v = deleted.as_sql_predicate() ) .as_str(), @@ -613,7 +559,11 @@ impl NamespaceRepo for PostgresTxn { ) -> Result> { let rec = sqlx::query_as::<_, Namespace>( format!( - r#"SELECT * FROM namespace WHERE id=$1 AND {v};"#, + r#" +SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at +FROM namespace +WHERE id=$1 AND {v}; + "#, v = deleted.as_sql_predicate() ) .as_str(), @@ -638,7 +588,11 @@ impl NamespaceRepo for PostgresTxn { ) -> Result> { let rec = sqlx::query_as::<_, Namespace>( format!( - r#"SELECT * FROM namespace WHERE name=$1 AND {v};"#, + r#" +SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at +FROM namespace +WHERE name=$1 AND {v}; + "#, v = deleted.as_sql_predicate() ) .as_str(), @@ -675,7 +629,7 @@ impl NamespaceRepo for PostgresTxn { UPDATE namespace SET max_tables = $1 WHERE name = $2 -RETURNING *; +RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at; "#, ) .bind(new_max) @@ -699,7 +653,7 @@ RETURNING *; UPDATE namespace SET max_columns_per_table = $1 WHERE name = $2 -RETURNING *; +RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at; "#, ) .bind(new_max) @@ -723,7 +677,12 @@ RETURNING *; retention_period_ns: Option, ) -> Result { let rec = sqlx::query_as::<_, Namespace>( - r#"UPDATE namespace SET retention_period_ns = $1 WHERE name = $2 RETURNING *;"#, + r#" +UPDATE namespace +SET retention_period_ns = $1 +WHERE name = $2 +RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at; + "#, ) .bind(retention_period_ns) // $1 .bind(name) // $2 @@ -1567,7 +1526,7 @@ INSERT INTO parquet_file ( shard_id, table_id, partition_id, object_store_id, min_time, max_time, file_size_bytes, row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at ) -VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 ) +VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 ) RETURNING id, table_id, partition_id, object_store_id, min_time, max_time, to_delete, file_size_bytes, @@ -1681,13 +1640,12 @@ fn is_fk_violation(e: &sqlx::Error) -> bool { #[cfg(test)] mod tests { use super::*; - use crate::create_or_get_default_records; use assert_matches::assert_matches; use data_types::{ColumnId, ColumnSet}; use metric::{Attributes, DurationHistogram, Metric}; use rand::Rng; use sqlx::migrate::MigrateDatabase; - use std::{env, io::Write, ops::DerefMut, sync::Arc, time::Instant}; + use std::{env, io::Write, sync::Arc, time::Instant}; use tempfile::NamedTempFile; // Helper macro to skip tests if TEST_INTEGRATION and TEST_INFLUXDB_IOX_CATALOG_DSN environment @@ -1864,18 +1822,13 @@ mod tests { maybe_skip_integration!(); let postgres = setup_db().await; - let postgres: Arc = Arc::new(postgres); - let mut txn = postgres.repositories().await; - let (kafka, query) = create_or_get_default_records(txn.deref_mut()) - .await - .expect("db init failed"); let namespace_id = postgres .repositories() .await .namespaces() - .create("ns4", None, kafka.id, query.id) + .create("ns4", None) .await .expect("namespace create failed") .id; @@ -2007,18 +1960,13 @@ mod tests { let postgres = setup_db().await; let metrics = Arc::clone(&postgres.metrics); - let postgres: Arc = 
Arc::new(postgres); - let mut txn = postgres.repositories().await; - let (kafka, query) = create_or_get_default_records(txn.deref_mut()) - .await - .expect("db init failed"); let namespace_id = postgres .repositories() .await .namespaces() - .create("ns4", None, kafka.id, query.id) + .create("ns4", None) .await .expect("namespace create failed") .id; @@ -2175,18 +2123,13 @@ mod tests { let postgres = setup_db().await; let pool = postgres.pool.clone(); - let postgres: Arc = Arc::new(postgres); - let mut txn = postgres.repositories().await; - let (kafka, query) = create_or_get_default_records(txn.deref_mut()) - .await - .expect("db init failed"); let namespace_id = postgres .repositories() .await .namespaces() - .create("ns4", None, kafka.id, query.id) + .create("ns4", None) .await .expect("namespace create failed") .id; diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index 8afb6276bb..63696e8336 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -3,18 +3,21 @@ use crate::{ interface::{ self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo, - ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, SoftDeletedRows, - TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE, + ParquetFileRepo, PartitionRepo, RepoCollection, Result, SoftDeletedRows, TableRepo, + MAX_PARQUET_FILES_SELECTED_ONCE, + }, + kafkaless_transition::{ + SHARED_QUERY_POOL, SHARED_QUERY_POOL_ID, SHARED_TOPIC_ID, SHARED_TOPIC_NAME, + TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, }, - kafkaless_transition::{TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX}, metrics::MetricDecorator, - DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME, + DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, }; use async_trait::async_trait; use data_types::{ Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, - ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId, - SkippedCompaction, Table, TableId, Timestamp, TopicId, TopicMetadata, + ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, + Table, TableId, Timestamp, }; use serde::{Deserialize, Serialize}; use std::collections::HashSet; @@ -165,7 +168,8 @@ impl Catalog for SqliteCatalog { .await .map_err(|e| Error::Setup { source: e.into() })?; - // We need to manually insert the topic here so that we can create the transition shard below. + // We need to manually insert the topic here so that we can create the transition shard + // below. sqlx::query( r#" INSERT INTO topic (name) @@ -195,6 +199,21 @@ DO NOTHING; .await .map_err(|e| Error::Setup { source: e })?; + // We need to manually insert the query pool here so that we can create namespaces that + // reference it. 
+ sqlx::query( + r#" +INSERT INTO query_pool (name) +VALUES ($1) +ON CONFLICT (name) +DO NOTHING; + "#, + ) + .bind(SHARED_QUERY_POOL) + .execute(&self.pool) + .await + .map_err(|e| Error::Setup { source: e })?; + Ok(()) } @@ -221,14 +240,6 @@ DO NOTHING; #[async_trait] impl RepoCollection for SqliteTxn { - fn topics(&mut self) -> &mut dyn TopicMetadataRepo { - self - } - - fn query_pools(&mut self) -> &mut dyn QueryPoolRepo { - self - } - fn namespaces(&mut self) -> &mut dyn NamespaceRepo { self } @@ -250,90 +261,21 @@ impl RepoCollection for SqliteTxn { } } -#[async_trait] -impl TopicMetadataRepo for SqliteTxn { - async fn create_or_get(&mut self, name: &str) -> Result { - let rec = sqlx::query_as::<_, TopicMetadata>( - r#" -INSERT INTO topic ( name ) -VALUES ( $1 ) -ON CONFLICT (name) -DO UPDATE SET name = topic.name -RETURNING *; - "#, - ) - .bind(name) // $1 - .fetch_one(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(rec) - } - - async fn get_by_name(&mut self, name: &str) -> Result> { - let rec = sqlx::query_as::<_, TopicMetadata>( - r#" -SELECT * -FROM topic -WHERE name = $1; - "#, - ) - .bind(name) // $1 - .fetch_one(self.inner.get_mut()) - .await; - - if let Err(sqlx::Error::RowNotFound) = rec { - return Ok(None); - } - - let topic = rec.map_err(|e| Error::SqlxError { source: e })?; - - Ok(Some(topic)) - } -} - -#[async_trait] -impl QueryPoolRepo for SqliteTxn { - async fn create_or_get(&mut self, name: &str) -> Result { - let rec = sqlx::query_as::<_, QueryPool>( - r#" -INSERT INTO query_pool ( name ) -VALUES ( $1 ) -ON CONFLICT (name) -DO UPDATE SET name = query_pool.name -RETURNING *; - "#, - ) - .bind(name) // $1 - .fetch_one(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(rec) - } -} - #[async_trait] impl NamespaceRepo for SqliteTxn { - async fn create( - &mut self, - name: &str, - retention_period_ns: Option, - topic_id: TopicId, - query_pool_id: QueryPoolId, - ) -> Result { + async fn create(&mut self, name: &str, retention_period_ns: Option) -> Result { let rec = sqlx::query_as::<_, Namespace>( r#" - INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables ) - VALUES ( $1, $2, $3, $4, $5 ) - RETURNING *; +INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables ) +VALUES ( $1, $2, $3, $4, $5 ) +RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at; "#, ) - .bind(name) // $1 - .bind(topic_id) // $2 - .bind(query_pool_id) // $3 - .bind(retention_period_ns) // $4 - .bind(DEFAULT_MAX_TABLES); // $5 + .bind(name) // $1 + .bind(SHARED_TOPIC_ID) // $2 + .bind(SHARED_QUERY_POOL_ID) // $3 + .bind(retention_period_ns) // $4 + .bind(DEFAULT_MAX_TABLES); // $5 let rec = rec.fetch_one(self.inner.get_mut()).await.map_err(|e| { if is_unique_violation(&e) { @@ -357,7 +299,11 @@ impl NamespaceRepo for SqliteTxn { async fn list(&mut self, deleted: SoftDeletedRows) -> Result> { let rec = sqlx::query_as::<_, Namespace>( format!( - r#"SELECT * FROM namespace WHERE {v};"#, + r#" +SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at +FROM namespace +WHERE {v}; + "#, v = deleted.as_sql_predicate() ) .as_str(), @@ -376,7 +322,11 @@ impl NamespaceRepo for SqliteTxn { ) -> Result> { let rec = sqlx::query_as::<_, Namespace>( format!( - r#"SELECT * FROM namespace WHERE id=$1 AND {v};"#, + r#" +SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at +FROM namespace +WHERE 
id=$1 AND {v}; + "#, v = deleted.as_sql_predicate() ) .as_str(), @@ -401,7 +351,11 @@ impl NamespaceRepo for SqliteTxn { ) -> Result> { let rec = sqlx::query_as::<_, Namespace>( format!( - r#"SELECT * FROM namespace WHERE name=$1 AND {v};"#, + r#" +SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at +FROM namespace +WHERE name=$1 AND {v}; + "#, v = deleted.as_sql_predicate() ) .as_str(), @@ -438,7 +392,7 @@ impl NamespaceRepo for SqliteTxn { UPDATE namespace SET max_tables = $1 WHERE name = $2 -RETURNING *; +RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at; "#, ) .bind(new_max) @@ -462,7 +416,7 @@ RETURNING *; UPDATE namespace SET max_columns_per_table = $1 WHERE name = $2 -RETURNING *; +RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at; "#, ) .bind(new_max) @@ -486,7 +440,12 @@ RETURNING *; retention_period_ns: Option, ) -> Result { let rec = sqlx::query_as::<_, Namespace>( - r#"UPDATE namespace SET retention_period_ns = $1 WHERE name = $2 RETURNING *;"#, + r#" +UPDATE namespace +SET retention_period_ns = $1 +WHERE name = $2 +RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at; + "#, ) .bind(retention_period_ns) // $1 .bind(name) // $2 @@ -1436,7 +1395,7 @@ INSERT INTO parquet_file ( shard_id, table_id, partition_id, object_store_id, min_time, max_time, file_size_bytes, row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at ) -VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 ) +VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 ) RETURNING id, table_id, partition_id, object_store_id, min_time, max_time, to_delete, file_size_bytes, @@ -1552,10 +1511,9 @@ fn is_unique_violation(e: &sqlx::Error) -> bool { #[cfg(test)] mod tests { use super::*; - use crate::create_or_get_default_records; use assert_matches::assert_matches; use metric::{Attributes, DurationHistogram, Metric}; - use std::{ops::DerefMut, sync::Arc}; + use std::sync::Arc; fn assert_metric_hit(metrics: &Registry, name: &'static str) { let histogram = metrics @@ -1596,16 +1554,12 @@ mod tests { let sqlite = setup_db().await; let sqlite: Arc = Arc::new(sqlite); - let mut txn = sqlite.repositories().await; - let (kafka, query) = create_or_get_default_records(txn.deref_mut()) - .await - .expect("db init failed"); let namespace_id = sqlite .repositories() .await .namespaces() - .create("ns4", None, kafka.id, query.id) + .create("ns4", None) .await .expect("namespace create failed") .id; @@ -1653,16 +1607,12 @@ mod tests { let metrics = Arc::clone(&sqlite.metrics); let sqlite: Arc = Arc::new(sqlite); - let mut txn = sqlite.repositories().await; - let (kafka, query) = create_or_get_default_records(txn.deref_mut()) - .await - .expect("db init failed"); let namespace_id = sqlite .repositories() .await .namespaces() - .create("ns4", None, kafka.id, query.id) + .create("ns4", None) .await .expect("namespace create failed") .id; @@ -1819,16 +1769,12 @@ mod tests { let pool = sqlite.pool.clone(); let sqlite: Arc = Arc::new(sqlite); - let mut txn = sqlite.repositories().await; - let (kafka, query) = create_or_get_default_records(txn.deref_mut()) - .await - .expect("db init failed"); let namespace_id = sqlite .repositories() .await .namespaces() - .create("ns4", None, kafka.id, query.id) + .create("ns4", None) .await .expect("namespace create failed") .id; diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index 
0d3cd9507e..6564abede0 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -6,8 +6,7 @@ use arrow::{ }; use data_types::{ Column, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceSchema, ParquetFile, - ParquetFileParams, Partition, PartitionId, QueryPool, Table, TableId, TableSchema, Timestamp, - TopicMetadata, + ParquetFileParams, Partition, PartitionId, Table, TableId, TableSchema, Timestamp, }; use datafusion::physical_plan::metrics::Count; use datafusion_util::MemoryStream; @@ -144,19 +143,14 @@ impl TestCatalog { retention_period_ns: Option, ) -> Arc { let mut repos = self.catalog.repositories().await; - - let topic = repos.topics().create_or_get("topic").await.unwrap(); - let query_pool = repos.query_pools().create_or_get("pool").await.unwrap(); let namespace = repos .namespaces() - .create(name, retention_period_ns, topic.id, query_pool.id) + .create(name, retention_period_ns) .await .unwrap(); Arc::new(TestNamespace { catalog: Arc::clone(self), - topic, - query_pool, namespace, }) } @@ -216,8 +210,6 @@ impl TestCatalog { #[allow(missing_docs)] pub struct TestNamespace { pub catalog: Arc, - pub topic: TopicMetadata, - pub query_pool: QueryPool, pub namespace: Namespace, } diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index fc4056b86c..992958f78f 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -64,9 +64,6 @@ pub enum Error { #[error("Catalog DSN error: {0}")] CatalogDsn(#[from] clap_blocks::catalog_dsn::Error), - #[error("No topic named '{topic_name}' found in the catalog")] - TopicCatalogLookup { topic_name: String }, - #[error("authz configuration error for '{addr}': '{source}'")] AuthzConfig { source: Box, @@ -273,46 +270,10 @@ pub async fn create_router2_server_type( // Initialise the Namespace ID lookup + cache let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache)); - //////////////////////////////////////////////////////////////////////////// - // - // THIS CODE IS FOR TESTING ONLY. - // - // The source of truth for the topics & query pools will be read from - // the DB, rather than CLI args for a prod deployment. - // - //////////////////////////////////////////////////////////////////////////// - // - // Look up the topic ID needed to populate namespace creation - // requests. - // - // This code / auto-creation is for architecture testing purposes only - a - // prod deployment would expect namespaces to be explicitly created and this - // layer would be removed. - let mut txn = catalog.repositories().await; - let topic_id = txn - .topics() - .get_by_name(&router_config.topic) - .await? - .map(|v| v.id) - .unwrap_or_else(|| panic!("no topic named {} in catalog", router_config.topic)); - let query_id = txn - .query_pools() - .create_or_get(&router_config.query_pool_name) - .await - .map(|v| v.id) - .unwrap_or_else(|e| { - panic!( - "failed to upsert query pool {} in catalog: {}", - router_config.query_pool_name, e - ) - }); - let namespace_resolver = NamespaceAutocreation::new( namespace_resolver, Arc::clone(&ns_cache), Arc::clone(&catalog), - topic_id, - query_id, { if router_config.namespace_autocreation_enabled { MissingNamespaceAction::AutoCreate( @@ -394,7 +355,7 @@ pub async fn create_router2_server_type( // Initialize the gRPC API delegate that creates the services relevant to the RPC // write router path and use it to create the relevant `RpcWriteRouterServer` and // `RpcWriteRouterServerType`. 
- let grpc = RpcWriteGrpcDelegate::new(catalog, object_store, topic_id, query_id); + let grpc = RpcWriteGrpcDelegate::new(catalog, object_store); let router_server = RpcWriteRouterServer::new(http, grpc, metrics, common_state.trace_collector()); @@ -434,13 +395,7 @@ mod tests { let catalog = Arc::new(MemCatalog::new(Default::default())); let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); - let namespace = repos - .namespaces() - .create("test_ns", None, topic.id, pool.id) - .await - .unwrap(); + let namespace = repos.namespaces().create("test_ns", None).await.unwrap(); let table = repos .tables() diff --git a/mutable_batch/src/payload/partition.rs b/mutable_batch/src/payload/partition.rs index e3fe61dcf2..5ed817019a 100644 --- a/mutable_batch/src/payload/partition.rs +++ b/mutable_batch/src/payload/partition.rs @@ -94,8 +94,6 @@ fn partition_keys<'a>( |col| Template::Column(col, name), ), TemplatePart::TimeFormat(fmt) => Template::TimeFormat(time, StrftimeItems::new(fmt)), - TemplatePart::RegexCapture(_) => unimplemented!(), - TemplatePart::StrftimeColumn(_) => unimplemented!(), }) .collect(); diff --git a/query_functions/src/selectors.rs b/query_functions/src/selectors.rs index a990a7b53b..2e01943e0d 100644 --- a/query_functions/src/selectors.rs +++ b/query_functions/src/selectors.rs @@ -112,10 +112,8 @@ use datafusion::{ /// Internal implementations of the selector functions mod internal; use internal::{ - BooleanFirstSelector, BooleanLastSelector, BooleanMaxSelector, BooleanMinSelector, - F64FirstSelector, F64LastSelector, F64MaxSelector, F64MinSelector, I64FirstSelector, - I64LastSelector, I64MaxSelector, I64MinSelector, U64FirstSelector, U64LastSelector, - U64MaxSelector, U64MinSelector, Utf8FirstSelector, Utf8LastSelector, Utf8MaxSelector, + BooleanMaxSelector, BooleanMinSelector, F64MaxSelector, F64MinSelector, FirstSelector, + I64MaxSelector, I64MinSelector, LastSelector, U64MaxSelector, U64MinSelector, Utf8MaxSelector, Utf8MinSelector, }; use schema::TIME_DATA_TYPE; @@ -255,40 +253,29 @@ impl FactoryBuilder { let accumulator: Box = match (selector_type, value_type) { // First - (SelectorType::First, DataType::Float64) => { - Box::new(SelectorAccumulator::::new()) + (SelectorType::First, value_type) => { + Box::new(SelectorAccumulator::new(FirstSelector::new(value_type)?)) } - (SelectorType::First, DataType::Int64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::First, DataType::UInt64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::First, DataType::Utf8) => Box::new(SelectorAccumulator::::new()), - (SelectorType::First, DataType::Boolean) => Box::new(SelectorAccumulator::::new( - )), // Last - (SelectorType::Last, DataType::Float64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::Last, DataType::Int64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::Last, DataType::UInt64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::Last, DataType::Utf8) => Box::new(SelectorAccumulator::::new()), - (SelectorType::Last, DataType::Boolean) => { - Box::new(SelectorAccumulator::::new()) - }, + (SelectorType::Last, data_type) => Box::new(SelectorAccumulator::new(LastSelector::new(data_type)?)), // Min - (SelectorType::Min, DataType::Float64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::Min, DataType::Int64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::Min, DataType::UInt64) => 
Box::new(SelectorAccumulator::::new()), - (SelectorType::Min, DataType::Utf8) => Box::new(SelectorAccumulator::::new()), + (SelectorType::Min, DataType::Float64) => Box::new(SelectorAccumulator::new(F64MinSelector::default())), + (SelectorType::Min, DataType::Int64) => Box::new(SelectorAccumulator::new(I64MinSelector::default())), + (SelectorType::Min, DataType::UInt64) => Box::new(SelectorAccumulator::new(U64MinSelector::default())), + (SelectorType::Min, DataType::Utf8) => Box::new(SelectorAccumulator::new(Utf8MinSelector::default())), (SelectorType::Min, DataType::Boolean) => { - Box::new(SelectorAccumulator::::new()) + Box::new(SelectorAccumulator::<>::new(BooleanMinSelector::default())) }, // Max - (SelectorType::Max, DataType::Float64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::Max, DataType::Int64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::Max, DataType::UInt64) => Box::new(SelectorAccumulator::::new()), - (SelectorType::Max, DataType::Utf8) => Box::new(SelectorAccumulator::::new()), + (SelectorType::Max, DataType::Float64) => Box::new(SelectorAccumulator::new(F64MaxSelector::default())), + (SelectorType::Max, DataType::Int64) => Box::new(SelectorAccumulator::new(I64MaxSelector::default())), + (SelectorType::Max, DataType::UInt64) => Box::new(SelectorAccumulator::new(U64MaxSelector::default())), + (SelectorType::Max, DataType::Utf8) => Box::new(SelectorAccumulator::new(Utf8MaxSelector::default())), (SelectorType::Max, DataType::Boolean) => { - Box::new(SelectorAccumulator::::new()) + Box::new(SelectorAccumulator::new(BooleanMaxSelector::default())) }, // Catch (selector_type, value_type) => return Err(DataFusionError::Internal(format!( @@ -303,7 +290,7 @@ impl FactoryBuilder { /// Implements the logic of the specific selector function (this is a /// cutdown version of the Accumulator DataFusion trait, to allow /// sharing between implementations) -trait Selector: Debug + Default + Send + Sync { +trait Selector: Debug + Send + Sync { /// return state in a form that DataFusion can store during execution fn datafusion_state(&self) -> DataFusionResult>; @@ -412,10 +399,8 @@ impl SelectorAccumulator where SELECTOR: Selector, { - pub fn new() -> Self { - Self { - selector: SELECTOR::default(), - } + pub fn new(selector: SELECTOR) -> Self { + Self { selector } } } diff --git a/query_functions/src/selectors/internal.rs b/query_functions/src/selectors/internal.rs index 94404ff26e..e55fe8a614 100644 --- a/query_functions/src/selectors/internal.rs +++ b/query_functions/src/selectors/internal.rs @@ -17,7 +17,7 @@ use arrow::{ max as array_max, max_boolean as array_max_boolean, max_string as array_max_string, min as array_min, min_boolean as array_min_boolean, min_string as array_min_string, }, - datatypes::{Field, Fields}, + datatypes::{DataType, Field, Fields}, }; use datafusion::{error::Result as DataFusionResult, scalar::ScalarValue}; @@ -116,219 +116,169 @@ fn make_scalar_struct(data_fields: Vec) -> ScalarValue { ScalarValue::Struct(Some(data_fields), Fields::from(fields)) } -macro_rules! 
make_first_selector { - ($STRUCTNAME:ident, $RUSTTYPE:ident, $ARRTYPE:ident, $MINFUNC:ident, $TO_SCALARVALUE: expr) => { - #[derive(Debug)] - pub struct $STRUCTNAME { - value: Option<$RUSTTYPE>, - time: Option, - } - - impl Default for $STRUCTNAME { - fn default() -> Self { - Self { - value: None, - time: None, - } - } - } - - impl Selector for $STRUCTNAME { - fn datafusion_state(&self) -> DataFusionResult> { - Ok(vec![ - $TO_SCALARVALUE(self.value.clone()), - ScalarValue::TimestampNanosecond(self.time, None), - ]) - } - - fn evaluate(&self) -> DataFusionResult { - Ok(make_scalar_struct(vec![ - $TO_SCALARVALUE(self.value.clone()), - ScalarValue::TimestampNanosecond(self.time, None), - ])) - } - - fn update_batch( - &mut self, - value_arr: &ArrayRef, - time_arr: &ArrayRef, - ) -> DataFusionResult<()> { - let value_arr = value_arr - .as_any() - .downcast_ref::<$ARRTYPE>() - // the input type arguments should be ensured by datafusion - .expect("First argument was value"); - - let time_arr = time_arr - .as_any() - .downcast_ref::() - // the input type arguments should be ensured by datafusion - .expect("Second argument was time"); - - // Only look for times where the array also has a non - // null value (the time array should have no nulls itself) - // - // For example, for the following input, the correct - // current min time is 200 (not 100) - // - // value | time - // -------------- - // NULL | 100 - // A | 200 - // B | 300 - // - // Note this could likely be faster if we used `ArrayData` APIs - let time_arr: TimestampNanosecondArray = time_arr - .iter() - .zip(value_arr.iter()) - .map(|(ts, value)| if value.is_some() { ts } else { None }) - .collect(); - - let cur_min_time = $MINFUNC(&time_arr); - - let need_update = match (&self.time, &cur_min_time) { - (Some(time), Some(cur_min_time)) => cur_min_time < time, - // No existing minimum, so update needed - (None, Some(_)) => true, - // No actual minimum time found, so no update needed - (_, None) => false, - }; - - if need_update { - let index = time_arr - .iter() - // arrow doesn't tell us what index had the - // minimum, so need to find it ourselves see also - // https://github.com/apache/arrow-datafusion/issues/600 - .enumerate() - .find(|(_, time)| cur_min_time == *time) - .map(|(idx, _)| idx) - .unwrap(); // value always exists - - self.time = cur_min_time; - self.value = if value_arr.is_null(index) { - None - } else { - Some(value_arr.value(index).to_owned()) - }; - } - - Ok(()) - } - - fn size(&self) -> usize { - // no nested types - std::mem::size_of_val(self) - } - } - }; +#[derive(Debug)] +pub struct FirstSelector { + value: ScalarValue, + time: Option, } -macro_rules! 
make_last_selector { - ($STRUCTNAME:ident, $RUSTTYPE:ident, $ARRTYPE:ident, $MAXFUNC:ident, $TO_SCALARVALUE: expr) => { - #[derive(Debug)] - pub struct $STRUCTNAME { - value: Option<$RUSTTYPE>, - time: Option, +impl FirstSelector { + pub fn new(data_type: &DataType) -> DataFusionResult { + Ok(Self { + value: ScalarValue::try_from(data_type)?, + time: None, + }) + } +} + +impl Selector for FirstSelector { + fn datafusion_state(&self) -> DataFusionResult> { + Ok(vec![ + self.value.clone(), + ScalarValue::TimestampNanosecond(self.time, None), + ]) + } + + fn evaluate(&self) -> DataFusionResult { + Ok(make_scalar_struct(vec![ + self.value.clone(), + ScalarValue::TimestampNanosecond(self.time, None), + ])) + } + + fn update_batch(&mut self, value_arr: &ArrayRef, time_arr: &ArrayRef) -> DataFusionResult<()> { + // Only look for times where the array also has a non + // null value (the time array should have no nulls itself) + // + // For example, for the following input, the correct + // current min time is 200 (not 100) + // + // value | time + // -------------- + // NULL | 100 + // A | 200 + // B | 300 + // + let time_arr = arrow::compute::nullif(time_arr, &arrow::compute::is_null(&value_arr)?)?; + + let time_arr = time_arr + .as_any() + .downcast_ref::() + // the input type arguments should be ensured by datafusion + .expect("Second argument was time"); + let cur_min_time = array_min(time_arr); + + let need_update = match (&self.time, &cur_min_time) { + (Some(time), Some(cur_min_time)) => cur_min_time < time, + // No existing minimum, so update needed + (None, Some(_)) => true, + // No actual minimum time found, so no update needed + (_, None) => false, + }; + + if need_update { + let index = time_arr + .iter() + // arrow doesn't tell us what index had the + // minimum, so need to find it ourselves see also + // https://github.com/apache/arrow-datafusion/issues/600 + .enumerate() + .find(|(_, time)| cur_min_time == *time) + .map(|(idx, _)| idx) + .unwrap(); // value always exists + + self.time = cur_min_time; + self.value = ScalarValue::try_from_array(&value_arr, index)?; } - impl Default for $STRUCTNAME { - fn default() -> Self { - Self { - value: None, - time: None, - } - } + Ok(()) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.value) + self.value.size() + } +} + +#[derive(Debug)] +pub struct LastSelector { + value: ScalarValue, + time: Option, +} + +impl LastSelector { + pub fn new(data_type: &DataType) -> DataFusionResult { + Ok(Self { + value: ScalarValue::try_from(data_type)?, + time: None, + }) + } +} + +impl Selector for LastSelector { + fn datafusion_state(&self) -> DataFusionResult> { + Ok(vec![ + self.value.clone(), + ScalarValue::TimestampNanosecond(self.time, None), + ]) + } + + fn evaluate(&self) -> DataFusionResult { + Ok(make_scalar_struct(vec![ + self.value.clone(), + ScalarValue::TimestampNanosecond(self.time, None), + ])) + } + + fn update_batch(&mut self, value_arr: &ArrayRef, time_arr: &ArrayRef) -> DataFusionResult<()> { + // Only look for times where the array also has a non + // null value (the time array should have no nulls itself) + // + // For example, for the following input, the correct + // current max time is 200 (not 300) + // + // value | time + // -------------- + // A | 100 + // B | 200 + // NULL | 300 + // + let time_arr = arrow::compute::nullif(time_arr, &arrow::compute::is_null(&value_arr)?)?; + + let time_arr = time_arr + .as_any() + .downcast_ref::() + // the input type arguments should be ensured by 
datafusion + .expect("Second argument was time"); + let cur_max_time = array_max(time_arr); + + let need_update = match (&self.time, &cur_max_time) { + (Some(time), Some(cur_max_time)) => time < cur_max_time, + // No existing maximum, so update needed + (None, Some(_)) => true, + // No actual maximum value found, so no update needed + (_, None) => false, + }; + + if need_update { + let index = time_arr + .iter() + // arrow doesn't tell us what index had the + // maximum, so need to find it ourselves + .enumerate() + .find(|(_, time)| cur_max_time == *time) + .map(|(idx, _)| idx) + .unwrap(); // value always exists + + self.time = cur_max_time; + self.value = ScalarValue::try_from_array(&value_arr, index)?; } - impl Selector for $STRUCTNAME { - fn datafusion_state(&self) -> DataFusionResult> { - Ok(vec![ - $TO_SCALARVALUE(self.value.clone()), - ScalarValue::TimestampNanosecond(self.time, None), - ]) - } + Ok(()) + } - fn evaluate(&self) -> DataFusionResult { - Ok(make_scalar_struct(vec![ - $TO_SCALARVALUE(self.value.clone()), - ScalarValue::TimestampNanosecond(self.time, None), - ])) - } - - fn update_batch( - &mut self, - value_arr: &ArrayRef, - time_arr: &ArrayRef, - ) -> DataFusionResult<()> { - let value_arr = value_arr - .as_any() - .downcast_ref::<$ARRTYPE>() - // the input type arguments should be ensured by datafusion - .expect("First argument was value"); - - let time_arr = time_arr - .as_any() - .downcast_ref::() - // the input type arguments should be ensured by datafusion - .expect("Second argument was time"); - - // Only look for times where the array also has a non - // null value (the time array should have no nulls itself) - // - // For example, for the following input, the correct - // current max time is 200 (not 300) - // - // value | time - // -------------- - // A | 100 - // B | 200 - // NULL | 300 - // - // Note this could likely be faster if we used `ArrayData` APIs - let time_arr: TimestampNanosecondArray = time_arr - .iter() - .zip(value_arr.iter()) - .map(|(ts, value)| if value.is_some() { ts } else { None }) - .collect(); - - let cur_max_time = $MAXFUNC(&time_arr); - - let need_update = match (&self.time, &cur_max_time) { - (Some(time), Some(cur_max_time)) => time < cur_max_time, - // No existing maximum, so update needed - (None, Some(_)) => true, - // No actual maximum value found, so no update needed - (_, None) => false, - }; - - if need_update { - let index = time_arr - .iter() - // arrow doesn't tell us what index had the - // maximum, so need to find it ourselves - .enumerate() - .find(|(_, time)| cur_max_time == *time) - .map(|(idx, _)| idx) - .unwrap(); // value always exists - - self.time = cur_max_time; - self.value = if value_arr.is_null(index) { - None - } else { - Some(value_arr.value(index).to_owned()) - }; - } - - Ok(()) - } - - fn size(&self) -> usize { - // no nested types - std::mem::size_of_val(self) - } - } - }; + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.value) + self.value.size() + } } /// Did we find a new min/max @@ -583,82 +533,6 @@ macro_rules! 
make_max_selector { }; } -// FIRST - -make_first_selector!( - F64FirstSelector, - f64, - Float64Array, - array_min, - ScalarValue::Float64 -); -make_first_selector!( - I64FirstSelector, - i64, - Int64Array, - array_min, - ScalarValue::Int64 -); -make_first_selector!( - U64FirstSelector, - u64, - UInt64Array, - array_min, - ScalarValue::UInt64 -); -make_first_selector!( - Utf8FirstSelector, - String, - StringArray, - array_min, - ScalarValue::Utf8 -); -make_first_selector!( - BooleanFirstSelector, - bool, - BooleanArray, - array_min, - ScalarValue::Boolean -); - -// LAST - -make_last_selector!( - F64LastSelector, - f64, - Float64Array, - array_max, - ScalarValue::Float64 -); -make_last_selector!( - I64LastSelector, - i64, - Int64Array, - array_max, - ScalarValue::Int64 -); -make_last_selector!( - U64LastSelector, - u64, - UInt64Array, - array_max, - ScalarValue::UInt64 -); -make_last_selector!( - Utf8LastSelector, - String, - StringArray, - array_max, - ScalarValue::Utf8 -); -make_last_selector!( - BooleanLastSelector, - bool, - BooleanArray, - array_max, - ScalarValue::Boolean -); - // MIN make_min_selector!( diff --git a/router/src/namespace_cache/memory.rs b/router/src/namespace_cache/memory.rs index bfe0b5c97a..8cdc04f942 100644 --- a/router/src/namespace_cache/memory.rs +++ b/router/src/namespace_cache/memory.rs @@ -139,8 +139,7 @@ mod tests { use assert_matches::assert_matches; use data_types::{ - Column, ColumnId, ColumnSchema, ColumnType, NamespaceId, QueryPoolId, TableId, TableSchema, - TopicId, + Column, ColumnId, ColumnSchema, ColumnType, NamespaceId, TableId, TableSchema, }; use proptest::{prelude::*, prop_compose, proptest}; @@ -162,8 +161,6 @@ mod tests { let schema1 = NamespaceSchema { id: TEST_NAMESPACE_ID, - topic_id: TopicId::new(24), - query_pool_id: QueryPoolId::new(1234), tables: Default::default(), max_columns_per_table: 50, max_tables: 24, @@ -180,8 +177,6 @@ mod tests { let schema2 = NamespaceSchema { id: TEST_NAMESPACE_ID, - topic_id: TopicId::new(2), - query_pool_id: QueryPoolId::new(2), tables: Default::default(), max_columns_per_table: 10, max_tables: 42, @@ -228,8 +223,6 @@ mod tests { let schema_update_1 = NamespaceSchema { id: NamespaceId::new(42), - topic_id: TopicId::new(76), - query_pool_id: QueryPoolId::new(64), tables: BTreeMap::from([(String::from(table_name), first_write_table_schema)]), max_columns_per_table: 50, max_tables: 24, @@ -311,8 +304,6 @@ mod tests { let schema_update_1 = NamespaceSchema { id: NamespaceId::new(42), - topic_id: TopicId::new(76), - query_pool_id: QueryPoolId::new(64), tables: BTreeMap::from([ (String::from("table_1"), table_1.to_owned()), (String::from("table_2"), table_2.to_owned()), @@ -410,8 +401,6 @@ mod tests { NamespaceSchema { id: TEST_NAMESPACE_ID, tables, - topic_id: TopicId::new(1), // Ignored - query_pool_id: QueryPoolId::new(1), // Ignored max_columns_per_table, max_tables, retention_period_ns, diff --git a/router/src/namespace_cache/metrics.rs b/router/src/namespace_cache/metrics.rs index 7273afc91d..d76815357c 100644 --- a/router/src/namespace_cache/metrics.rs +++ b/router/src/namespace_cache/metrics.rs @@ -128,9 +128,7 @@ mod tests { use std::collections::BTreeMap; use assert_matches::assert_matches; - use data_types::{ - ColumnId, ColumnSchema, ColumnType, NamespaceId, QueryPoolId, TableId, TableSchema, TopicId, - }; + use data_types::{ColumnId, ColumnSchema, ColumnType, NamespaceId, TableId, TableSchema}; use metric::{Attributes, MetricObserver, Observation}; use super::*; @@ -168,8 +166,6 @@ mod tests { 
NamespaceSchema { id: NamespaceId::new(42), - topic_id: TopicId::new(24), - query_pool_id: QueryPoolId::new(1234), tables, max_columns_per_table: 100, max_tables: 42, diff --git a/router/src/namespace_cache/read_through_cache.rs b/router/src/namespace_cache/read_through_cache.rs index 7691e8a1bc..1c600eed76 100644 --- a/router/src/namespace_cache/read_through_cache.rs +++ b/router/src/namespace_cache/read_through_cache.rs @@ -98,7 +98,7 @@ where #[cfg(test)] mod tests { use assert_matches::assert_matches; - use data_types::{NamespaceId, QueryPoolId, TopicId}; + use data_types::NamespaceId; use iox_catalog::mem::MemCatalog; use super::*; @@ -120,8 +120,6 @@ mod tests { // Place a schema in the cache for that name let schema1 = NamespaceSchema::new( NamespaceId::new(1), - TopicId::new(2), - QueryPoolId::new(3), iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, iox_catalog::DEFAULT_MAX_TABLES, iox_catalog::DEFAULT_RETENTION_PERIOD, @@ -156,8 +154,6 @@ mod tests { // Place a schema in the catalog for that name let schema1 = NamespaceSchema::new( NamespaceId::new(1), - TopicId::new(2), - QueryPoolId::new(3), iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, iox_catalog::DEFAULT_MAX_TABLES, iox_catalog::DEFAULT_RETENTION_PERIOD, @@ -167,12 +163,7 @@ mod tests { .repositories() .await .namespaces() - .create( - &ns, - iox_catalog::DEFAULT_RETENTION_PERIOD, - schema1.topic_id, - schema1.query_pool_id, - ) + .create(&ns, iox_catalog::DEFAULT_RETENTION_PERIOD,) .await, Ok(_) ); diff --git a/router/src/namespace_cache/sharded_cache.rs b/router/src/namespace_cache/sharded_cache.rs index c9ba7a70c7..3b91649343 100644 --- a/router/src/namespace_cache/sharded_cache.rs +++ b/router/src/namespace_cache/sharded_cache.rs @@ -51,7 +51,7 @@ mod tests { use std::{collections::HashMap, iter}; use assert_matches::assert_matches; - use data_types::{NamespaceId, QueryPoolId, TopicId}; + use data_types::NamespaceId; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use super::*; @@ -70,8 +70,6 @@ mod tests { fn schema_with_id(id: i64) -> NamespaceSchema { NamespaceSchema { id: NamespaceId::new(id), - topic_id: TopicId::new(1), - query_pool_id: QueryPoolId::new(1), tables: Default::default(), max_columns_per_table: 7, max_tables: 42, diff --git a/router/src/namespace_resolver.rs b/router/src/namespace_resolver.rs index b0caacb2a9..555489ab25 100644 --- a/router/src/namespace_resolver.rs +++ b/router/src/namespace_resolver.rs @@ -71,7 +71,7 @@ mod tests { use std::sync::Arc; use assert_matches::assert_matches; - use data_types::{NamespaceId, NamespaceSchema, QueryPoolId, TopicId}; + use data_types::{NamespaceId, NamespaceSchema}; use iox_catalog::{ interface::{Catalog, SoftDeletedRows}, mem::MemCatalog, @@ -96,8 +96,6 @@ mod tests { ns.clone(), NamespaceSchema { id: NamespaceId::new(42), - topic_id: TopicId::new(2), - query_pool_id: QueryPoolId::new(3), tables: Default::default(), max_columns_per_table: 4, max_tables: 42, @@ -143,11 +141,9 @@ mod tests { // Create the namespace in the catalog { let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("bananas").await.unwrap(); - let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap(); repos .namespaces() - .create(&ns, None, topic.id, query_pool.id) + .create(&ns, None) .await .expect("failed to setup catalog state"); } @@ -177,11 +173,9 @@ mod tests { // Create the namespace in the catalog and mark it as deleted { let mut repos = catalog.repositories().await; - let topic = 
repos.topics().create_or_get("bananas").await.unwrap(); - let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap(); repos .namespaces() - .create(&ns, None, topic.id, query_pool.id) + .create(&ns, None) .await .expect("failed to setup catalog state"); repos diff --git a/router/src/namespace_resolver/ns_autocreation.rs b/router/src/namespace_resolver/ns_autocreation.rs index a0514ee098..2ec498e5f2 100644 --- a/router/src/namespace_resolver/ns_autocreation.rs +++ b/router/src/namespace_resolver/ns_autocreation.rs @@ -1,7 +1,7 @@ use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceId, NamespaceName, QueryPoolId, TopicId}; +use data_types::{NamespaceId, NamespaceName}; use iox_catalog::interface::Catalog; use observability_deps::tracing::*; use thiserror::Error; @@ -43,8 +43,6 @@ pub struct NamespaceAutocreation { cache: C, catalog: Arc, - topic_id: TopicId, - query_id: QueryPoolId, action: MissingNamespaceAction, } @@ -53,7 +51,7 @@ impl NamespaceAutocreation { /// namespace exists in `catalog`. /// /// If the namespace does not exist, it is created with the specified - /// `topic_id`, `query_id` and `retention` policy. + /// `retention` policy. /// /// Namespaces are looked up in `cache`, skipping the creation request to /// the catalog if there's a hit. @@ -61,16 +59,12 @@ impl NamespaceAutocreation { inner: T, cache: C, catalog: Arc, - topic_id: TopicId, - query_id: QueryPoolId, action: MissingNamespaceAction, ) -> Self { Self { inner, cache, catalog, - topic_id, - query_id, action, } } @@ -113,12 +107,7 @@ where .repositories() .await .namespaces() - .create( - namespace.as_str(), - retention_period_ns, - self.topic_id, - self.query_id, - ) + .create(namespace.as_str(), retention_period_ns) .await { Ok(_) => { @@ -178,8 +167,6 @@ mod tests { ns.clone(), NamespaceSchema { id: NAMESPACE_ID, - topic_id: TopicId::new(2), - query_pool_id: QueryPoolId::new(3), tables: Default::default(), max_columns_per_table: 4, max_tables: 42, @@ -191,8 +178,6 @@ mod tests { MockNamespaceResolver::default().with_mapping(ns.clone(), NAMESPACE_ID), cache, Arc::clone(&catalog), - TopicId::new(42), - QueryPoolId::new(42), MissingNamespaceAction::AutoCreate(TEST_RETENTION_PERIOD_NS), ); @@ -233,8 +218,6 @@ mod tests { MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)), cache, Arc::clone(&catalog), - TopicId::new(42), - QueryPoolId::new(42), MissingNamespaceAction::AutoCreate(TEST_RETENTION_PERIOD_NS), ); @@ -259,8 +242,6 @@ mod tests { Namespace { id: NamespaceId::new(1), name: ns.to_string(), - topic_id: TopicId::new(42), - query_pool_id: QueryPoolId::new(42), max_tables: iox_catalog::DEFAULT_MAX_TABLES, max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, retention_period_ns: TEST_RETENTION_PERIOD_NS, @@ -284,8 +265,6 @@ mod tests { MockNamespaceResolver::default(), cache, Arc::clone(&catalog), - TopicId::new(42), - QueryPoolId::new(42), MissingNamespaceAction::Reject, ); @@ -324,8 +303,6 @@ mod tests { NamespaceSchemaResolver::new(Arc::clone(&cache)), Arc::clone(&cache), Arc::clone(&catalog), - TopicId::new(42), - QueryPoolId::new(42), MissingNamespaceAction::AutoCreate(TEST_RETENTION_PERIOD_NS), ); @@ -339,8 +316,6 @@ mod tests { NamespaceSchemaResolver::new(Arc::clone(&cache)), cache, Arc::clone(&catalog), - TopicId::new(42), - QueryPoolId::new(42), MissingNamespaceAction::Reject, ); diff --git a/router/src/server/grpc.rs b/router/src/server/grpc.rs index f00029c517..34ecae0a04 100644 --- 
a/router/src/server/grpc.rs +++ b/router/src/server/grpc.rs @@ -1,6 +1,5 @@ //! gRPC service implementations for `router`. -use data_types::{QueryPoolId, TopicId}; use generated_types::influxdata::iox::{catalog::v1::*, namespace::v1::*, object_store::v1::*}; use iox_catalog::interface::Catalog; use object_store::DynObjectStore; @@ -15,25 +14,14 @@ use std::sync::Arc; pub struct RpcWriteGrpcDelegate { catalog: Arc, object_store: Arc, - - // Temporary values during kafka -> kafkaless transition. - topic_id: TopicId, - query_id: QueryPoolId, } impl RpcWriteGrpcDelegate { /// Create a new gRPC handler - pub fn new( - catalog: Arc, - object_store: Arc, - topic_id: TopicId, - query_id: QueryPoolId, - ) -> Self { + pub fn new(catalog: Arc, object_store: Arc) -> Self { Self { catalog, object_store, - topic_id, - query_id, } } @@ -62,10 +50,6 @@ impl RpcWriteGrpcDelegate { /// /// [`NamespaceService`]: generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService. pub fn namespace_service(&self) -> impl namespace_service_server::NamespaceService { - NamespaceService::new( - Arc::clone(&self.catalog), - Some(self.topic_id), - Some(self.query_id), - ) + NamespaceService::new(Arc::clone(&self.catalog)) } } diff --git a/router/tests/common/mod.rs b/router/tests/common/mod.rs index d755b2e7d3..bea90ebaad 100644 --- a/router/tests/common/mod.rs +++ b/router/tests/common/mod.rs @@ -1,6 +1,6 @@ use std::{iter, string::String, sync::Arc, time::Duration}; -use data_types::{PartitionTemplate, QueryPoolId, TableId, TemplatePart, TopicId}; +use data_types::{PartitionTemplate, TableId, TemplatePart}; use generated_types::influxdata::iox::ingester::v1::WriteRequest; use hashbrown::HashMap; use hyper::{Body, Request, Response}; @@ -24,14 +24,6 @@ use router::{ }, }; -/// The topic catalog ID assigned by the namespace auto-creator in the -/// handler stack for namespaces it has not yet observed. -pub const TEST_TOPIC_ID: i64 = 1; - -/// The query pool catalog ID assigned by the namespace auto-creator in the -/// handler stack for namespaces it has not yet observed. 
-pub const TEST_QUERY_POOL_ID: i64 = 1; - /// Common retention period value we'll use in tests pub const TEST_RETENTION_PERIOD: Duration = Duration::from_secs(3600); @@ -168,8 +160,6 @@ impl TestContext { namespace_resolver, Arc::clone(&ns_cache), Arc::clone(&catalog), - TopicId::new(TEST_TOPIC_ID), - QueryPoolId::new(TEST_QUERY_POOL_ID), namespace_autocreation, ); @@ -193,12 +183,8 @@ impl TestContext { write_request_unifier, ); - let grpc_delegate = RpcWriteGrpcDelegate::new( - Arc::clone(&catalog), - Arc::new(InMemory::default()), - TopicId::new(TEST_TOPIC_ID), - QueryPoolId::new(TEST_QUERY_POOL_ID), - ); + let grpc_delegate = + RpcWriteGrpcDelegate::new(Arc::clone(&catalog), Arc::new(InMemory::default())); Self { client, diff --git a/router/tests/http.rs b/router/tests/http.rs index cb1203c43c..264c0b1a36 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -1,6 +1,6 @@ -use crate::common::{TestContextBuilder, TEST_QUERY_POOL_ID, TEST_RETENTION_PERIOD, TEST_TOPIC_ID}; +use crate::common::{TestContextBuilder, TEST_RETENTION_PERIOD}; use assert_matches::assert_matches; -use data_types::{ColumnType, QueryPoolId, TopicId}; +use data_types::ColumnType; use futures::{stream::FuturesUnordered, StreamExt}; use generated_types::influxdata::{iox::ingester::v1::WriteRequest, pbdata::v1::DatabaseBatch}; use hashbrown::HashMap; @@ -62,8 +62,6 @@ async fn test_write_ok() { .expect("query should succeed") .expect("namespace not found"); assert_eq!(ns.name, "bananas_test"); - assert_eq!(ns.topic_id, TopicId::new(TEST_TOPIC_ID)); - assert_eq!(ns.query_pool_id, QueryPoolId::new(TEST_QUERY_POOL_ID)); assert_eq!(ns.retention_period_ns, None); // Ensure the metric instrumentation was hit @@ -272,12 +270,7 @@ async fn test_write_propagate_ids() { .repositories() .await .namespaces() - .create( - "bananas_test", - None, - TopicId::new(TEST_TOPIC_ID), - QueryPoolId::new(TEST_QUERY_POOL_ID), - ) + .create("bananas_test", None) .await .expect("failed to update table limit"); @@ -359,12 +352,7 @@ async fn test_delete_unsupported() { .repositories() .await .namespaces() - .create( - "bananas_test", - None, - TopicId::new(TEST_TOPIC_ID), - QueryPoolId::new(TEST_QUERY_POOL_ID), - ) + .create("bananas_test", None) .await .expect("failed to update table limit"); diff --git a/scripts/docker_catalog.sh b/scripts/docker_catalog.sh index af25b70e9a..78fad806b7 100755 --- a/scripts/docker_catalog.sh +++ b/scripts/docker_catalog.sh @@ -31,7 +31,6 @@ export INFLUXDB_IOX_CATALOG_DSN="postgresql://postgres@localhost:5432/postgres" export DATABASE_URL="${INFLUXDB_IOX_CATALOG_DSN}" cargo sqlx database create cargo run -q -- catalog setup -cargo run -q -- catalog topic update iox-shared echo "Enjoy your database! 
Point IOx to it by running the following:" echo "\$ export INFLUXDB_IOX_CATALOG_DSN=\"${DATABASE_URL}\"" diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs index 22331ee194..1ee97856c4 100644 --- a/service_grpc_catalog/src/lib.rs +++ b/service_grpc_catalog/src/lib.rs @@ -212,15 +212,9 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let catalog = Arc::new(MemCatalog::new(metrics)); let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("iox-shared").await.unwrap(); - let pool = repos - .query_pools() - .create_or_get("iox-shared") - .await - .unwrap(); let namespace = repos .namespaces() - .create("catalog_partition_test", None, topic.id, pool.id) + .create("catalog_partition_test", None) .await .unwrap(); let table = repos @@ -281,15 +275,9 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let catalog = Arc::new(MemCatalog::new(metrics)); let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("iox-shared").await.unwrap(); - let pool = repos - .query_pools() - .create_or_get("iox-shared") - .await - .unwrap(); let namespace = repos .namespaces() - .create("catalog_partition_test", None, topic.id, pool.id) + .create("catalog_partition_test", None) .await .unwrap(); let table = repos diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index 3fd96297bb..83efb9f6da 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -1,7 +1,7 @@ //! Implementation of the namespace gRPC service use std::sync::Arc; -use data_types::{Namespace as CatalogNamespace, NamespaceName, QueryPoolId, TopicId}; +use data_types::{Namespace as CatalogNamespace, NamespaceName}; use generated_types::influxdata::iox::namespace::v1::{ update_namespace_service_protection_limit_request::LimitUpdate, *, }; @@ -14,21 +14,11 @@ use tonic::{Request, Response, Status}; pub struct NamespaceService { /// Catalog. catalog: Arc, - topic_id: Option, - query_id: Option, } impl NamespaceService { - pub fn new( - catalog: Arc, - topic_id: Option, - query_id: Option, - ) -> Self { - Self { - catalog, - topic_id, - query_id, - } + pub fn new(catalog: Arc) -> Self { + Self { catalog } } } @@ -58,10 +48,6 @@ impl namespace_service_server::NamespaceService for NamespaceService { &self, request: Request, ) -> Result, Status> { - if self.topic_id.is_none() || self.query_id.is_none() { - return Err(Status::invalid_argument("topic_id or query_id not set")); - } - let mut repos = self.catalog.repositories().await; let CreateNamespaceRequest { @@ -80,12 +66,7 @@ impl namespace_service_server::NamespaceService for NamespaceService { let namespace = repos .namespaces() - .create( - &namespace_name, - retention_period_ns, - self.topic_id.unwrap(), - self.query_id.unwrap(), - ) + .create(&namespace_name, retention_period_ns) .await .map_err(|e| { warn!(error=%e, %namespace_name, "failed to create namespace"); @@ -349,22 +330,7 @@ mod tests { let catalog: Arc = Arc::new(MemCatalog::new(Arc::new(metric::Registry::default()))); - let topic = catalog - .repositories() - .await - .topics() - .create_or_get("kafka-topic") - .await - .unwrap(); - let query_pool = catalog - .repositories() - .await - .query_pools() - .create_or_get("query-pool") - .await - .unwrap(); - - let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id)); + let handler = NamespaceService::new(catalog); // There should be no namespaces to start with. 
{ @@ -499,22 +465,7 @@ mod tests { let catalog: Arc = Arc::new(MemCatalog::new(Arc::new(metric::Registry::default()))); - let topic = catalog - .repositories() - .await - .topics() - .create_or_get("kafka-topic") - .await - .unwrap(); - let query_pool = catalog - .repositories() - .await - .query_pools() - .create_or_get("query-pool") - .await - .unwrap(); - - let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id)); + let handler = NamespaceService::new(catalog); let req = CreateNamespaceRequest { name: NS_NAME.to_string(), retention_period_ns: Some(RETENTION), @@ -572,22 +523,7 @@ mod tests { let catalog: Arc = Arc::new(MemCatalog::new(Arc::new(metric::Registry::default()))); - let topic = catalog - .repositories() - .await - .topics() - .create_or_get("kafka-topic") - .await - .unwrap(); - let query_pool = catalog - .repositories() - .await - .query_pools() - .create_or_get("query-pool") - .await - .unwrap(); - - let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id)); + let handler = NamespaceService::new(catalog); let req = CreateNamespaceRequest { name: String::from($name), diff --git a/service_grpc_object_store/src/lib.rs b/service_grpc_object_store/src/lib.rs index 26dfc38bcc..8a92ac2ba9 100644 --- a/service_grpc_object_store/src/lib.rs +++ b/service_grpc_object_store/src/lib.rs @@ -110,15 +110,9 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let catalog = Arc::new(MemCatalog::new(metrics)); let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("iox-shared").await.unwrap(); - let pool = repos - .query_pools() - .create_or_get("iox-shared") - .await - .unwrap(); let namespace = repos .namespaces() - .create("catalog_partition_test", None, topic.id, pool.id) + .create("catalog_partition_test", None) .await .unwrap(); let table = repos diff --git a/service_grpc_schema/src/lib.rs b/service_grpc_schema/src/lib.rs index 47e5bdb6ae..f359e57224 100644 --- a/service_grpc_schema/src/lib.rs +++ b/service_grpc_schema/src/lib.rs @@ -48,8 +48,6 @@ fn schema_to_proto(schema: Arc) -> GetSchemaRespons let response = GetSchemaResponse { schema: Some(NamespaceSchema { id: schema.id.get(), - topic_id: schema.topic_id.get(), - query_pool_id: schema.query_pool_id.get(), tables: schema .tables .iter() @@ -95,11 +93,9 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let catalog = Arc::new(MemCatalog::new(metrics)); let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("franz").await.unwrap(); - let pool = repos.query_pools().create_or_get("franz").await.unwrap(); let namespace = repos .namespaces() - .create("namespace_schema_test", None, topic.id, pool.id) + .create("namespace_schema_test", None) .await .unwrap(); let table = repos diff --git a/test_helpers_end_to_end/src/database.rs b/test_helpers_end_to_end/src/database.rs index 1a3664adac..59f85319e0 100644 --- a/test_helpers_end_to_end/src/database.rs +++ b/test_helpers_end_to_end/src/database.rs @@ -40,17 +40,5 @@ pub async fn initialize_db(dsn: &str, schema_name: &str) { .ok() .unwrap(); - // Create the shared topic in the catalog - Command::cargo_bin("influxdb_iox") - .unwrap() - .arg("catalog") - .arg("topic") - .arg("update") - .arg("iox-shared") - .env("INFLUXDB_IOX_CATALOG_DSN", dsn) - .env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name) - .ok() - .unwrap(); - init.insert(schema_name.into()); }
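A minimal usage sketch (not part of the patch) of the simplified namespace-creation path, assuming the in-memory `MemCatalog` backend shown elsewhere in this change and a hypothetical namespace name; callers no longer pass `TopicId` / `QueryPoolId`, since the catalogs now bind the shared transition topic and query pool internally:

use std::sync::Arc;

use iox_catalog::{interface::Catalog, mem::MemCatalog};

async fn create_example_namespace() {
    // In-memory catalog for illustration; the Postgres and sqlite catalogs expose the same trait.
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Default::default()));

    // Namespace creation now takes only a name and an optional retention period
    // (None = infinite retention); no topic or query-pool IDs are supplied.
    let ns = catalog
        .repositories()
        .await
        .namespaces()
        .create("example_ns", None) // "example_ns" is a hypothetical name
        .await
        .expect("namespace create failed");

    assert_eq!(ns.name, "example_ns");
}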