From 167cde1838649985023d7191d73760ae1c583a67 Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Wed, 1 Feb 2023 18:04:03 +0100 Subject: [PATCH] fix(iox): Add transition shard to catalog setup (#6799) --- data_types/src/lib.rs | 2 +- iox_catalog/src/interface.rs | 28 +++++++++++++++++++++++++-- iox_catalog/src/lib.rs | 4 +++- iox_catalog/src/postgres.rs | 37 ++++++++++++++++++++++++++++++++++-- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 59cfde3261..5d84cb6de2 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -110,7 +110,7 @@ pub struct TopicId(i64); #[allow(missing_docs)] impl TopicId { - pub fn new(v: i64) -> Self { + pub const fn new(v: i64) -> Self { Self(v) } pub fn get(&self) -> i64 { diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index bccc070006..076928d67e 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -940,12 +940,17 @@ pub async fn list_schemas( #[cfg(test)] pub(crate) mod test_helpers { - use crate::{validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES}; + use crate::{ + validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, + SHARED_TOPIC_ID, + }; use super::*; use ::test_helpers::{assert_contains, tracing::TracingCapture}; use assert_matches::assert_matches; - use data_types::{ColumnId, ColumnSet, CompactionLevel}; + use data_types::{ + ColumnId, ColumnSet, CompactionLevel, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, + }; use metric::{Attributes, DurationHistogram, Metric}; use std::{ ops::{Add, DerefMut}, @@ -995,6 +1000,25 @@ pub(crate) mod test_helpers { async fn test_setup(catalog: Arc) { catalog.setup().await.expect("first catalog setup"); catalog.setup().await.expect("second catalog setup"); + + if std::env::var("INFLUXDB_IOX_RPC_MODE").is_ok() { + let transition_shard = catalog + .repositories() + .await + .shards() + .get_by_topic_id_and_shard_index(SHARED_TOPIC_ID, TRANSITION_SHARD_INDEX) + .await + .expect("transition shard"); + + assert_matches!( + transition_shard, + Some(Shard { + id, + shard_index, + .. + }) if id == TRANSITION_SHARD_ID && shard_index == TRANSITION_SHARD_INDEX + ); + } } async fn test_topic(catalog: Arc) { diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 62d151cc12..df8f1617d1 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -15,7 +15,8 @@ use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result, Transaction}; use data_types::{ - ColumnType, NamespaceSchema, QueryPool, Shard, ShardId, ShardIndex, TableSchema, TopicMetadata, + ColumnType, NamespaceSchema, QueryPool, Shard, ShardId, ShardIndex, TableSchema, TopicId, + TopicMetadata, }; use mutable_batch::MutableBatch; use std::{ @@ -25,6 +26,7 @@ use std::{ use thiserror::Error; const SHARED_TOPIC_NAME: &str = "iox-shared"; +const SHARED_TOPIC_ID: TopicId = TopicId::new(1); const SHARED_QUERY_POOL: &str = SHARED_TOPIC_NAME; const TIME_COLUMN: &str = "time"; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 41c9df7bcf..8c08533cf2 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -8,7 +8,7 @@ use crate::{ TombstoneRepo, TopicMetadataRepo, Transaction, }, metrics::MetricDecorator, - DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, + DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME, }; use async_trait::async_trait; use data_types::{ @@ -16,7 +16,7 @@ use data_types::{ ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, TopicId, - TopicMetadata, + TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, }; use iox_time::{SystemProvider, TimeProvider}; use observability_deps::tracing::{debug, info, warn}; @@ -292,6 +292,39 @@ impl Catalog for PostgresCatalog { .await .map_err(|e| Error::Setup { source: e.into() })?; + if std::env::var("INFLUXDB_IOX_RPC_MODE").is_ok() { + // We need to manually insert the topic here so that we can create the transition shard below. + sqlx::query( + r#" +INSERT INTO topic (name) +VALUES ($1) +ON CONFLICT ON CONSTRAINT topic_name_unique +DO NOTHING; + "#, + ) + .bind(SHARED_TOPIC_NAME) + .execute(&self.pool) + .await + .map_err(|e| Error::Setup { source: e })?; + + // The transition shard must exist and must have magic ID and INDEX. + sqlx::query( + r#" +INSERT INTO shard (id, topic_id, shard_index, min_unpersisted_sequence_number) +OVERRIDING SYSTEM VALUE +VALUES ($1, $2, $3, 0) +ON CONFLICT ON CONSTRAINT shard_unique +DO NOTHING; + "#, + ) + .bind(TRANSITION_SHARD_ID) + .bind(SHARED_TOPIC_ID) + .bind(TRANSITION_SHARD_INDEX) + .execute(&self.pool) + .await + .map_err(|e| Error::Setup { source: e })?; + } + Ok(()) }