From c9567cad7d207933601c7296f6aaa90a36bc0ba3 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 19 Aug 2022 15:37:55 -0400 Subject: [PATCH] fix: Rename some more sequencer to shard --- compactor/src/lib.rs | 24 +++++++-------- compactor/src/parquet_file_combining.rs | 4 +-- ingester/src/handler.rs | 40 +++++++++++-------------- ingester/src/test_util.rs | 2 +- 4 files changed, 33 insertions(+), 37 deletions(-) diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 383eab383f..2a4547ebe2 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -235,19 +235,19 @@ mod tests { .join("\n"); let ns = catalog.create_namespace("ns").await; - let sequencer = ns.create_shard(1).await; + let shard = ns.create_shard(1).await; let table = ns.create_table("table").await; table.create_column("field_int", ColumnType::I64).await; table.create_column("tag1", ColumnType::Tag).await; table.create_column("tag2", ColumnType::Tag).await; table.create_column("tag3", ColumnType::Tag).await; table.create_column("time", ColumnType::Time).await; - let partition = table.with_shard(&sequencer).create_partition("part").await; + let partition = table.with_shard(&shard).create_partition("part").await; let time = Arc::new(SystemProvider::new()); let config = make_compactor_config(); let metrics = Arc::new(metric::Registry::new()); let compactor = Compactor::new( - vec![sequencer.shard.id], + vec![shard.shard.id], Arc::clone(&catalog.catalog), ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::new(Executor::new(1)), @@ -322,7 +322,7 @@ mod tests { partition.create_parquet_file(builder).await; // should have 4 level-0 files before compacting - let count = catalog.count_level_0_files(sequencer.shard.id).await; + let count = catalog.count_level_0_files(shard.shard.id).await; assert_eq!(count, 4); // ------------------------------------------------ @@ -453,20 +453,20 @@ mod tests { .join("\n"); let ns = catalog.create_namespace("ns").await; - let sequencer = ns.create_shard(1).await; + let shard = ns.create_shard(1).await; let table = ns.create_table("table").await; table.create_column("field_int", ColumnType::I64).await; table.create_column("tag1", ColumnType::Tag).await; table.create_column("tag2", ColumnType::Tag).await; table.create_column("tag3", ColumnType::Tag).await; table.create_column("time", ColumnType::Time).await; - let partition = table.with_shard(&sequencer).create_partition("part").await; + let partition = table.with_shard(&shard).create_partition("part").await; let time = Arc::new(SystemProvider::new()); let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(); let config = make_compactor_config(); let metrics = Arc::new(metric::Registry::new()); let compactor = Compactor::new( - vec![sequencer.shard.id], + vec![shard.shard.id], Arc::clone(&catalog.catalog), ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::new(Executor::new(1)), @@ -541,7 +541,7 @@ mod tests { partition.create_parquet_file(builder).await; // should have 4 level-0 files before compacting - let count = catalog.count_level_0_files(sequencer.shard.id).await; + let count = catalog.count_level_0_files(shard.shard.id).await; assert_eq!(count, 4); // ------------------------------------------------ @@ -637,20 +637,20 @@ mod tests { .join("\n"); let ns = catalog.create_namespace("ns").await; - let sequencer = ns.create_shard(1).await; + let shard = ns.create_shard(1).await; let table = ns.create_table("table").await; table.create_column("field_int", ColumnType::I64).await; table.create_column("tag1", ColumnType::Tag).await; table.create_column("tag2", ColumnType::Tag).await; table.create_column("tag3", ColumnType::Tag).await; table.create_column("time", ColumnType::Time).await; - let partition = table.with_shard(&sequencer).create_partition("part").await; + let partition = table.with_shard(&shard).create_partition("part").await; let time = Arc::new(SystemProvider::new()); let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(); let config = make_compactor_config(); let metrics = Arc::new(metric::Registry::new()); let compactor = Compactor::new( - vec![sequencer.shard.id], + vec![shard.shard.id], Arc::clone(&catalog.catalog), ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::new(Executor::new(1)), @@ -684,7 +684,7 @@ mod tests { partition.create_parquet_file(builder).await; // should have 1 level-0 file before compacting - let count = catalog.count_level_0_files(sequencer.shard.id).await; + let count = catalog.count_level_0_files(shard.shard.id).await; assert_eq!(count, 1); // ------------------------------------------------ diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index 29b4a0eb29..4f205c93c0 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -499,7 +499,7 @@ mod tests { async fn test_setup() -> TestSetup { let catalog = TestCatalog::new(); let ns = catalog.create_namespace("ns").await; - let sequencer = ns.create_shard(1).await; + let shard = ns.create_shard(1).await; let table = ns.create_table("table").await; table.create_column("field_int", ColumnType::I64).await; table.create_column("tag1", ColumnType::Tag).await; @@ -509,7 +509,7 @@ mod tests { let table_schema = table.catalog_schema().await; let partition = table - .with_shard(&sequencer) + .with_shard(&shard) .create_partition("2022-07-13") .await; diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index c5656ac16f..579ca9758b 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -39,7 +39,7 @@ use write_summary::ShardProgress; #[allow(missing_copy_implementations, missing_docs)] pub enum Error { #[snafu(display( - "No sequencer record found for kafka topic {} and partition {}", + "No shard record found for kafka topic {} and partition {}", kafka_topic, kafka_partition ))] @@ -66,7 +66,7 @@ pub trait IngestHandler: Send + Sync { request: IngesterQueryRequest, ) -> Result; - /// Return sequencer progress for the requested kafka partitions + /// Return shard progress for the requested kafka partitions async fn progresses( &self, sequencers: Vec, @@ -361,7 +361,7 @@ impl IngestHandler for IngestHandlerImpl { self.data.exec().shutdown(); } - /// Return the ingestion progress from each sequencer + /// Return the ingestion progress from each shard async fn progresses( &self, partitions: Vec, @@ -656,8 +656,8 @@ mod tests { .await .unwrap(); - let mut sequencer_states = BTreeMap::new(); - sequencer_states.insert(kafka_partition, shard); + let mut shard_states = BTreeMap::new(); + shard_states.insert(kafka_partition, shard); let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap()); @@ -686,7 +686,7 @@ mod tests { let ingester = IngestHandlerImpl::new( lifecycle_config, kafka_topic.clone(), - sequencer_states, + shard_states, Arc::clone(&catalog), object_store, reading, @@ -768,10 +768,9 @@ mod tests { ), ]; - let (ingester, sequencer, namespace) = - ingester_test_setup(write_operations, 2, false).await; + let (ingester, shard, namespace) = ingester_test_setup(write_operations, 2, false).await; - verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| { + verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| { if first_batch.min_sequencer_number == SequenceNumber::new(1) { panic!( "initialization did a seek to the beginning rather than \ @@ -799,8 +798,7 @@ mod tests { 150, ), )]; - let (ingester, _sequencer, _namespace) = - ingester_test_setup(write_operations, 2, false).await; + let (ingester, _shard, _namespace) = ingester_test_setup(write_operations, 2, false).await; tokio::time::timeout(Duration::from_millis(1000), ingester.join()) .await @@ -824,8 +822,7 @@ mod tests { 150, ), )]; - let (ingester, _sequencer, _namespace) = - ingester_test_setup(write_operations, 10, false).await; + let (ingester, _shard, _namespace) = ingester_test_setup(write_operations, 10, false).await; tokio::time::timeout(Duration::from_millis(1100), ingester.join()) .await @@ -849,8 +846,7 @@ mod tests { 150, ), )]; - let (ingester, _sequencer, _namespace) = - ingester_test_setup(write_operations, 10, true).await; + let (ingester, _shard, _namespace) = ingester_test_setup(write_operations, 10, true).await; tokio::time::timeout(Duration::from_millis(1100), ingester.join()) .await @@ -877,9 +873,9 @@ mod tests { // Set the min unpersisted to something bigger than the write's sequence number to // cause an UnknownSequenceNumber error. Skip to oldest available = true, so ingester // should find data - let (ingester, sequencer, namespace) = ingester_test_setup(write_operations, 1, true).await; + let (ingester, shard, namespace) = ingester_test_setup(write_operations, 1, true).await; - verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| { + verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| { assert_eq!( first_batch.min_sequencer_number, SequenceNumber::new(10), @@ -936,15 +932,15 @@ mod tests { .create("foo", "inf", kafka_topic.id, query_pool.id) .await .unwrap(); - let sequencer = txn + let shard = txn .shards() .create_or_get(&kafka_topic, kafka_partition) .await .unwrap(); txn.commit().await.unwrap(); - let mut sequencer_states = BTreeMap::new(); - sequencer_states.insert(kafka_partition, sequencer); + let mut shard_states = BTreeMap::new(); + shard_states.insert(kafka_partition, shard); let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap()); @@ -962,7 +958,7 @@ mod tests { let ingester = IngestHandlerImpl::new( lifecycle_config, kafka_topic.clone(), - sequencer_states, + shard_states, Arc::clone(&catalog), object_store, reading, @@ -976,7 +972,7 @@ mod tests { Self { catalog, - shard: sequencer, + shard, namespace, kafka_topic, kafka_partition, diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 3d8081e8e7..ddf638b15a 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -862,7 +862,7 @@ pub(crate) async fn make_one_partition_with_tombstones( let ts = create_tombstone( 2, // tombstone id table_id.get(), // table id - shard_id.get(), // sequencer id + shard_id.get(), // shard id seq_num, // delete's seq_number 10, // min time of data to get deleted 50, // max time of data to get deleted