diff --git a/compactor2/src/test_util.rs b/compactor2/src/test_util.rs index 5f147b9be8..15c8252d69 100644 --- a/compactor2/src/test_util.rs +++ b/compactor2/src/test_util.rs @@ -10,6 +10,7 @@ use data_types::{ ColumnId, ColumnSchema, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, Partition, PartitionId, PartitionKey, QueryPoolId, SequenceNumber, ShardId, SkippedCompaction, Table, TableId, TableSchema, Timestamp, TopicId, + TRANSITION_SHARD_NUMBER, }; use datafusion::arrow::record_batch::RecordBatch; use futures::TryStreamExt; @@ -259,7 +260,7 @@ impl SkippedCompactionBuilder { } } -const SHARD_INDEX: i32 = 1234; +const SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER; const PARTITION_THRESHOLD: Duration = Duration::from_secs(10 * 60); // 10min const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024; const PERCENTAGE_MAX_FILE_SIZE: u16 = 5; diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 836e748c3b..59cfde3261 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -37,6 +37,13 @@ use std::{ }; use uuid::Uuid; +/// Magic number to be used shard indices and shard ids in "kafkaless". +pub const TRANSITION_SHARD_NUMBER: i32 = 1234; +/// In kafkaless mode all new persisted data uses this shard id. +pub const TRANSITION_SHARD_ID: ShardId = ShardId::new(TRANSITION_SHARD_NUMBER as i64); +/// In kafkaless mode all new persisted data uses this shard index. +pub const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(TRANSITION_SHARD_NUMBER); + /// Compaction levels #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash, sqlx::Type)] #[repr(i16)] diff --git a/ingest_replica/src/lib.rs b/ingest_replica/src/lib.rs index 8103157c63..090b993771 100644 --- a/ingest_replica/src/lib.rs +++ b/ingest_replica/src/lib.rs @@ -31,7 +31,7 @@ use crate::{buffer::Buffer, cache::SchemaCache, grpc::GrpcDelegate}; use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use async_trait::async_trait; use data_types::sequence_number_set::SequenceNumberSet; -use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardIndex, TableId}; +use data_types::{NamespaceId, PartitionId, SequenceNumber, TableId, TRANSITION_SHARD_INDEX}; use generated_types::influxdata::iox::ingester::v1::replication_service_server::{ ReplicationService, ReplicationServiceServer, }; @@ -51,11 +51,6 @@ pub enum BufferError { MutableBatch(#[from] mutable_batch::Error), } -/// During the testing of ingest replica, the catalog will require a ShardIndex for -/// various operations. This is a const value for these occasions. Look up the ShardId for this -/// ShardIndex when needed. -const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1234); - /// Acquire opaque handles to the IngestReplica RPC service implementations. /// /// This trait serves as the public crate API boundary - callers external to the diff --git a/ingester2/src/lib.rs b/ingester2/src/lib.rs index b4d45d5958..36b736f0a8 100644 --- a/ingester2/src/lib.rs +++ b/ingester2/src/lib.rs @@ -44,7 +44,7 @@ missing_docs )] -use data_types::ShardIndex; +use data_types::TRANSITION_SHARD_INDEX; /// A macro to conditionally prepend `pub` to the inner tokens for benchmarking /// purposes, should the `benches` feature be enabled. @@ -71,11 +71,6 @@ macro_rules! maybe_pub { }; } -/// During the testing of ingester2, the catalog will require a ShardIndex for -/// various operations. This is a const value for these occasions. Look up the ShardId for this -/// ShardIndex when needed. -const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1234); - /// Ingester initialisation methods & types. /// /// This module defines the public API boundary of the Ingester crate. diff --git a/ioxd_compactor2/src/lib.rs b/ioxd_compactor2/src/lib.rs index cbe05ba171..b1317c07f0 100644 --- a/ioxd_compactor2/src/lib.rs +++ b/ioxd_compactor2/src/lib.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use backoff::BackoffConfig; use clap_blocks::compactor2::Compactor2Config; use compactor2::{compactor::Compactor2, config::Config}; -use data_types::PartitionId; +use data_types::{PartitionId, TRANSITION_SHARD_NUMBER}; use hyper::{Body, Request, Response}; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; @@ -27,7 +27,7 @@ use trace::TraceCollector; // There is only one shard with index 1 const TOPIC: &str = "iox-shared"; -const TRANSITION_SHARD_INDEX: i32 = 1234; // see ingester2 crate +const TRANSITION_SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER; pub struct Compactor2ServerType { compactor: Compactor2,