Merge pull request #6737 from influxdata/dry_transition_shard_id

chore: Avoid defining transition shard numbers in multiple crates
pull/24376/head
Marko Mikulicic 2023-01-27 19:09:58 +01:00 committed by GitHub
commit 6d5cfba76f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 13 additions and 15 deletions

View File

@ -10,6 +10,7 @@ use data_types::{
ColumnId, ColumnSchema, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId,
NamespaceSchema, ParquetFile, ParquetFileId, Partition, PartitionId, PartitionKey, QueryPoolId,
SequenceNumber, ShardId, SkippedCompaction, Table, TableId, TableSchema, Timestamp, TopicId,
TRANSITION_SHARD_NUMBER,
};
use datafusion::arrow::record_batch::RecordBatch;
use futures::TryStreamExt;
@ -259,7 +260,7 @@ impl SkippedCompactionBuilder {
}
}
const SHARD_INDEX: i32 = 1234;
const SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER;
const PARTITION_THRESHOLD: Duration = Duration::from_secs(10 * 60); // 10min
const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024;
const PERCENTAGE_MAX_FILE_SIZE: u16 = 5;

View File

@ -37,6 +37,13 @@ use std::{
};
use uuid::Uuid;
/// Magic number to be used shard indices and shard ids in "kafkaless".
pub const TRANSITION_SHARD_NUMBER: i32 = 1234;
/// In kafkaless mode all new persisted data uses this shard id.
pub const TRANSITION_SHARD_ID: ShardId = ShardId::new(TRANSITION_SHARD_NUMBER as i64);
/// In kafkaless mode all new persisted data uses this shard index.
pub const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(TRANSITION_SHARD_NUMBER);
/// Compaction levels
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash, sqlx::Type)]
#[repr(i16)]

View File

@ -31,7 +31,7 @@ use crate::{buffer::Buffer, cache::SchemaCache, grpc::GrpcDelegate};
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use async_trait::async_trait;
use data_types::sequence_number_set::SequenceNumberSet;
use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardIndex, TableId};
use data_types::{NamespaceId, PartitionId, SequenceNumber, TableId, TRANSITION_SHARD_INDEX};
use generated_types::influxdata::iox::ingester::v1::replication_service_server::{
ReplicationService, ReplicationServiceServer,
};
@ -51,11 +51,6 @@ pub enum BufferError {
MutableBatch(#[from] mutable_batch::Error),
}
/// During the testing of ingest replica, the catalog will require a ShardIndex for
/// various operations. This is a const value for these occasions. Look up the ShardId for this
/// ShardIndex when needed.
const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1234);
/// Acquire opaque handles to the IngestReplica RPC service implementations.
///
/// This trait serves as the public crate API boundary - callers external to the

View File

@ -44,7 +44,7 @@
missing_docs
)]
use data_types::ShardIndex;
use data_types::TRANSITION_SHARD_INDEX;
/// A macro to conditionally prepend `pub` to the inner tokens for benchmarking
/// purposes, should the `benches` feature be enabled.
@ -71,11 +71,6 @@ macro_rules! maybe_pub {
};
}
/// During the testing of ingester2, the catalog will require a ShardIndex for
/// various operations. This is a const value for these occasions. Look up the ShardId for this
/// ShardIndex when needed.
const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1234);
/// Ingester initialisation methods & types.
///
/// This module defines the public API boundary of the Ingester crate.

View File

@ -2,7 +2,7 @@ use async_trait::async_trait;
use backoff::BackoffConfig;
use clap_blocks::compactor2::Compactor2Config;
use compactor2::{compactor::Compactor2, config::Config};
use data_types::PartitionId;
use data_types::{PartitionId, TRANSITION_SHARD_NUMBER};
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -27,7 +27,7 @@ use trace::TraceCollector;
// There is only one shard with index 1
const TOPIC: &str = "iox-shared";
const TRANSITION_SHARD_INDEX: i32 = 1234; // see ingester2 crate
const TRANSITION_SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER;
pub struct Compactor2ServerType {
compactor: Compactor2,