fix(iox): Add transition shard to catalog setup (#6799)
parent
c77e1214ff
commit
167cde1838
|
@ -110,7 +110,7 @@ pub struct TopicId(i64);
|
|||
|
||||
#[allow(missing_docs)]
|
||||
impl TopicId {
|
||||
pub fn new(v: i64) -> Self {
|
||||
pub const fn new(v: i64) -> Self {
|
||||
Self(v)
|
||||
}
|
||||
pub fn get(&self) -> i64 {
|
||||
|
|
|
@ -940,12 +940,17 @@ pub async fn list_schemas(
|
|||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test_helpers {
|
||||
use crate::{validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES};
|
||||
use crate::{
|
||||
validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
|
||||
SHARED_TOPIC_ID,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use ::test_helpers::{assert_contains, tracing::TracingCapture};
|
||||
use assert_matches::assert_matches;
|
||||
use data_types::{ColumnId, ColumnSet, CompactionLevel};
|
||||
use data_types::{
|
||||
ColumnId, ColumnSet, CompactionLevel, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
|
||||
};
|
||||
use metric::{Attributes, DurationHistogram, Metric};
|
||||
use std::{
|
||||
ops::{Add, DerefMut},
|
||||
|
@ -995,6 +1000,25 @@ pub(crate) mod test_helpers {
|
|||
async fn test_setup(catalog: Arc<dyn Catalog>) {
|
||||
catalog.setup().await.expect("first catalog setup");
|
||||
catalog.setup().await.expect("second catalog setup");
|
||||
|
||||
if std::env::var("INFLUXDB_IOX_RPC_MODE").is_ok() {
|
||||
let transition_shard = catalog
|
||||
.repositories()
|
||||
.await
|
||||
.shards()
|
||||
.get_by_topic_id_and_shard_index(SHARED_TOPIC_ID, TRANSITION_SHARD_INDEX)
|
||||
.await
|
||||
.expect("transition shard");
|
||||
|
||||
assert_matches!(
|
||||
transition_shard,
|
||||
Some(Shard {
|
||||
id,
|
||||
shard_index,
|
||||
..
|
||||
}) if id == TRANSITION_SHARD_ID && shard_index == TRANSITION_SHARD_INDEX
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn test_topic(catalog: Arc<dyn Catalog>) {
|
||||
|
|
|
@ -15,7 +15,8 @@
|
|||
|
||||
use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result, Transaction};
|
||||
use data_types::{
|
||||
ColumnType, NamespaceSchema, QueryPool, Shard, ShardId, ShardIndex, TableSchema, TopicMetadata,
|
||||
ColumnType, NamespaceSchema, QueryPool, Shard, ShardId, ShardIndex, TableSchema, TopicId,
|
||||
TopicMetadata,
|
||||
};
|
||||
use mutable_batch::MutableBatch;
|
||||
use std::{
|
||||
|
@ -25,6 +26,7 @@ use std::{
|
|||
use thiserror::Error;
|
||||
|
||||
const SHARED_TOPIC_NAME: &str = "iox-shared";
|
||||
const SHARED_TOPIC_ID: TopicId = TopicId::new(1);
|
||||
const SHARED_QUERY_POOL: &str = SHARED_TOPIC_NAME;
|
||||
const TIME_COLUMN: &str = "time";
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ use crate::{
|
|||
TombstoneRepo, TopicMetadataRepo, Transaction,
|
||||
},
|
||||
metrics::MetricDecorator,
|
||||
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
|
||||
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use data_types::{
|
||||
|
@ -16,7 +16,7 @@ use data_types::{
|
|||
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam,
|
||||
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex,
|
||||
SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, TopicId,
|
||||
TopicMetadata,
|
||||
TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
|
||||
};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use observability_deps::tracing::{debug, info, warn};
|
||||
|
@ -292,6 +292,39 @@ impl Catalog for PostgresCatalog {
|
|||
.await
|
||||
.map_err(|e| Error::Setup { source: e.into() })?;
|
||||
|
||||
if std::env::var("INFLUXDB_IOX_RPC_MODE").is_ok() {
|
||||
// We need to manually insert the topic here so that we can create the transition shard below.
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO topic (name)
|
||||
VALUES ($1)
|
||||
ON CONFLICT ON CONSTRAINT topic_name_unique
|
||||
DO NOTHING;
|
||||
"#,
|
||||
)
|
||||
.bind(SHARED_TOPIC_NAME)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| Error::Setup { source: e })?;
|
||||
|
||||
// The transition shard must exist and must have magic ID and INDEX.
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO shard (id, topic_id, shard_index, min_unpersisted_sequence_number)
|
||||
OVERRIDING SYSTEM VALUE
|
||||
VALUES ($1, $2, $3, 0)
|
||||
ON CONFLICT ON CONSTRAINT shard_unique
|
||||
DO NOTHING;
|
||||
"#,
|
||||
)
|
||||
.bind(TRANSITION_SHARD_ID)
|
||||
.bind(SHARED_TOPIC_ID)
|
||||
.bind(TRANSITION_SHARD_INDEX)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| Error::Setup { source: e })?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue