fix: Rename some more sequencer to shard

pull/24376/head
Carol (Nichols || Goulding) 2022-08-19 15:37:55 -04:00
parent ab20828c2f
commit c9567cad7d
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
4 changed files with 33 additions and 37 deletions

View File

@ -235,19 +235,19 @@ mod tests {
.join("\n");
let ns = catalog.create_namespace("ns").await;
let sequencer = ns.create_shard(1).await;
let shard = ns.create_shard(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let partition = table.with_shard(&sequencer).create_partition("part").await;
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
vec![sequencer.shard.id],
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
@ -322,7 +322,7 @@ mod tests {
partition.create_parquet_file(builder).await;
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(sequencer.shard.id).await;
let count = catalog.count_level_0_files(shard.shard.id).await;
assert_eq!(count, 4);
// ------------------------------------------------
@ -453,20 +453,20 @@ mod tests {
.join("\n");
let ns = catalog.create_namespace("ns").await;
let sequencer = ns.create_shard(1).await;
let shard = ns.create_shard(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let partition = table.with_shard(&sequencer).create_partition("part").await;
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
vec![sequencer.shard.id],
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
@ -541,7 +541,7 @@ mod tests {
partition.create_parquet_file(builder).await;
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(sequencer.shard.id).await;
let count = catalog.count_level_0_files(shard.shard.id).await;
assert_eq!(count, 4);
// ------------------------------------------------
@ -637,20 +637,20 @@ mod tests {
.join("\n");
let ns = catalog.create_namespace("ns").await;
let sequencer = ns.create_shard(1).await;
let shard = ns.create_shard(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let partition = table.with_shard(&sequencer).create_partition("part").await;
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
vec![sequencer.shard.id],
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
@ -684,7 +684,7 @@ mod tests {
partition.create_parquet_file(builder).await;
// should have 1 level-0 file before compacting
let count = catalog.count_level_0_files(sequencer.shard.id).await;
let count = catalog.count_level_0_files(shard.shard.id).await;
assert_eq!(count, 1);
// ------------------------------------------------

View File

@ -499,7 +499,7 @@ mod tests {
async fn test_setup() -> TestSetup {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let sequencer = ns.create_shard(1).await;
let shard = ns.create_shard(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
@ -509,7 +509,7 @@ mod tests {
let table_schema = table.catalog_schema().await;
let partition = table
.with_shard(&sequencer)
.with_shard(&shard)
.create_partition("2022-07-13")
.await;

View File

@ -39,7 +39,7 @@ use write_summary::ShardProgress;
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display(
"No sequencer record found for kafka topic {} and partition {}",
"No shard record found for kafka topic {} and partition {}",
kafka_topic,
kafka_partition
))]
@ -66,7 +66,7 @@ pub trait IngestHandler: Send + Sync {
request: IngesterQueryRequest,
) -> Result<IngesterQueryResponse, crate::querier_handler::Error>;
/// Return sequencer progress for the requested kafka partitions
/// Return shard progress for the requested kafka partitions
async fn progresses(
&self,
sequencers: Vec<KafkaPartition>,
@ -361,7 +361,7 @@ impl IngestHandler for IngestHandlerImpl {
self.data.exec().shutdown();
}
/// Return the ingestion progress from each sequencer
/// Return the ingestion progress from each shard
async fn progresses(
&self,
partitions: Vec<KafkaPartition>,
@ -656,8 +656,8 @@ mod tests {
.await
.unwrap();
let mut sequencer_states = BTreeMap::new();
sequencer_states.insert(kafka_partition, shard);
let mut shard_states = BTreeMap::new();
shard_states.insert(kafka_partition, shard);
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
@ -686,7 +686,7 @@ mod tests {
let ingester = IngestHandlerImpl::new(
lifecycle_config,
kafka_topic.clone(),
sequencer_states,
shard_states,
Arc::clone(&catalog),
object_store,
reading,
@ -768,10 +768,9 @@ mod tests {
),
];
let (ingester, sequencer, namespace) =
ingester_test_setup(write_operations, 2, false).await;
let (ingester, shard, namespace) = ingester_test_setup(write_operations, 2, false).await;
verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| {
verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| {
if first_batch.min_sequencer_number == SequenceNumber::new(1) {
panic!(
"initialization did a seek to the beginning rather than \
@ -799,8 +798,7 @@ mod tests {
150,
),
)];
let (ingester, _sequencer, _namespace) =
ingester_test_setup(write_operations, 2, false).await;
let (ingester, _shard, _namespace) = ingester_test_setup(write_operations, 2, false).await;
tokio::time::timeout(Duration::from_millis(1000), ingester.join())
.await
@ -824,8 +822,7 @@ mod tests {
150,
),
)];
let (ingester, _sequencer, _namespace) =
ingester_test_setup(write_operations, 10, false).await;
let (ingester, _shard, _namespace) = ingester_test_setup(write_operations, 10, false).await;
tokio::time::timeout(Duration::from_millis(1100), ingester.join())
.await
@ -849,8 +846,7 @@ mod tests {
150,
),
)];
let (ingester, _sequencer, _namespace) =
ingester_test_setup(write_operations, 10, true).await;
let (ingester, _shard, _namespace) = ingester_test_setup(write_operations, 10, true).await;
tokio::time::timeout(Duration::from_millis(1100), ingester.join())
.await
@ -877,9 +873,9 @@ mod tests {
// Set the min unpersisted to something bigger than the write's sequence number to
// cause an UnknownSequenceNumber error. Skip to oldest available = true, so ingester
// should find data
let (ingester, sequencer, namespace) = ingester_test_setup(write_operations, 1, true).await;
let (ingester, shard, namespace) = ingester_test_setup(write_operations, 1, true).await;
verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| {
verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| {
assert_eq!(
first_batch.min_sequencer_number,
SequenceNumber::new(10),
@ -936,15 +932,15 @@ mod tests {
.create("foo", "inf", kafka_topic.id, query_pool.id)
.await
.unwrap();
let sequencer = txn
let shard = txn
.shards()
.create_or_get(&kafka_topic, kafka_partition)
.await
.unwrap();
txn.commit().await.unwrap();
let mut sequencer_states = BTreeMap::new();
sequencer_states.insert(kafka_partition, sequencer);
let mut shard_states = BTreeMap::new();
shard_states.insert(kafka_partition, shard);
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
@ -962,7 +958,7 @@ mod tests {
let ingester = IngestHandlerImpl::new(
lifecycle_config,
kafka_topic.clone(),
sequencer_states,
shard_states,
Arc::clone(&catalog),
object_store,
reading,
@ -976,7 +972,7 @@ mod tests {
Self {
catalog,
shard: sequencer,
shard,
namespace,
kafka_topic,
kafka_partition,

View File

@ -862,7 +862,7 @@ pub(crate) async fn make_one_partition_with_tombstones(
let ts = create_tombstone(
2, // tombstone id
table_id.get(), // table id
shard_id.get(), // sequencer id
shard_id.get(), // shard id
seq_num, // delete's seq_number
10, // min time of data to get deleted
50, // max time of data to get deleted