chore: Avoid defining transition shard numbers in multiple crates

pull/24376/head
Marko Mikulicic 2023-01-27 18:03:34 +01:00
parent db7e6335ca
commit 0bc7d90ee3
5 changed files with 13 additions and 15 deletions

View File

@ -10,6 +10,7 @@ use data_types::{
ColumnId, ColumnSchema, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, ColumnId, ColumnSchema, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId,
NamespaceSchema, ParquetFile, ParquetFileId, Partition, PartitionId, PartitionKey, QueryPoolId, NamespaceSchema, ParquetFile, ParquetFileId, Partition, PartitionId, PartitionKey, QueryPoolId,
SequenceNumber, ShardId, SkippedCompaction, Table, TableId, TableSchema, Timestamp, TopicId, SequenceNumber, ShardId, SkippedCompaction, Table, TableId, TableSchema, Timestamp, TopicId,
TRANSITION_SHARD_NUMBER,
}; };
use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::record_batch::RecordBatch;
use futures::TryStreamExt; use futures::TryStreamExt;
@ -259,7 +260,7 @@ impl SkippedCompactionBuilder {
} }
} }
const SHARD_INDEX: i32 = 1234; const SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER;
const PARTITION_THRESHOLD: Duration = Duration::from_secs(10 * 60); // 10min const PARTITION_THRESHOLD: Duration = Duration::from_secs(10 * 60); // 10min
const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024; const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024;
const PERCENTAGE_MAX_FILE_SIZE: u16 = 5; const PERCENTAGE_MAX_FILE_SIZE: u16 = 5;

View File

@ -37,6 +37,13 @@ use std::{
}; };
use uuid::Uuid; use uuid::Uuid;
/// Magic number to be used shard indices and shard ids in "kafkaless".
pub const TRANSITION_SHARD_NUMBER: i32 = 1234;
/// In kafkaless mode all new persisted data uses this shard id.
pub const TRANSITION_SHARD_ID: ShardId = ShardId::new(TRANSITION_SHARD_NUMBER as i64);
/// In kafkaless mode all new persisted data uses this shard index.
pub const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(TRANSITION_SHARD_NUMBER);
/// Compaction levels /// Compaction levels
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash, sqlx::Type)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash, sqlx::Type)]
#[repr(i16)] #[repr(i16)]

View File

@ -31,7 +31,7 @@ use crate::{buffer::Buffer, cache::SchemaCache, grpc::GrpcDelegate};
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use async_trait::async_trait; use async_trait::async_trait;
use data_types::sequence_number_set::SequenceNumberSet; use data_types::sequence_number_set::SequenceNumberSet;
use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardIndex, TableId}; use data_types::{NamespaceId, PartitionId, SequenceNumber, TableId, TRANSITION_SHARD_INDEX};
use generated_types::influxdata::iox::ingester::v1::replication_service_server::{ use generated_types::influxdata::iox::ingester::v1::replication_service_server::{
ReplicationService, ReplicationServiceServer, ReplicationService, ReplicationServiceServer,
}; };
@ -51,11 +51,6 @@ pub enum BufferError {
MutableBatch(#[from] mutable_batch::Error), MutableBatch(#[from] mutable_batch::Error),
} }
/// During the testing of ingest replica, the catalog will require a ShardIndex for
/// various operations. This is a const value for these occasions. Look up the ShardId for this
/// ShardIndex when needed.
const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1234);
/// Acquire opaque handles to the IngestReplica RPC service implementations. /// Acquire opaque handles to the IngestReplica RPC service implementations.
/// ///
/// This trait serves as the public crate API boundary - callers external to the /// This trait serves as the public crate API boundary - callers external to the

View File

@ -44,7 +44,7 @@
missing_docs missing_docs
)] )]
use data_types::ShardIndex; use data_types::TRANSITION_SHARD_INDEX;
/// A macro to conditionally prepend `pub` to the inner tokens for benchmarking /// A macro to conditionally prepend `pub` to the inner tokens for benchmarking
/// purposes, should the `benches` feature be enabled. /// purposes, should the `benches` feature be enabled.
@ -71,11 +71,6 @@ macro_rules! maybe_pub {
}; };
} }
/// During the testing of ingester2, the catalog will require a ShardIndex for
/// various operations. This is a const value for these occasions. Look up the ShardId for this
/// ShardIndex when needed.
const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1234);
/// Ingester initialisation methods & types. /// Ingester initialisation methods & types.
/// ///
/// This module defines the public API boundary of the Ingester crate. /// This module defines the public API boundary of the Ingester crate.

View File

@ -2,7 +2,7 @@ use async_trait::async_trait;
use backoff::BackoffConfig; use backoff::BackoffConfig;
use clap_blocks::compactor2::Compactor2Config; use clap_blocks::compactor2::Compactor2Config;
use compactor2::{compactor::Compactor2, config::Config}; use compactor2::{compactor::Compactor2, config::Config};
use data_types::PartitionId; use data_types::{PartitionId, TRANSITION_SHARD_NUMBER};
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog; use iox_catalog::interface::Catalog;
use iox_query::exec::Executor; use iox_query::exec::Executor;
@ -27,7 +27,7 @@ use trace::TraceCollector;
// There is only one shard with index 1 // There is only one shard with index 1
const TOPIC: &str = "iox-shared"; const TOPIC: &str = "iox-shared";
const TRANSITION_SHARD_INDEX: i32 = 1234; // see ingester2 crate const TRANSITION_SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER;
pub struct Compactor2ServerType { pub struct Compactor2ServerType {
compactor: Compactor2, compactor: Compactor2,