From efbfbb1a0bbbeead00f145281a38b3edfbf460b2 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 8 Dec 2021 16:06:03 -0500 Subject: [PATCH 1/7] feat: compact all object store chunks of a given partition --- .../influxdata/iox/management/v1/jobs.proto | 13 ++ .../iox/management/v1/service.proto | 20 ++ generated_types/src/job.rs | 4 + .../src/commands/database/partition.rs | 29 +++ .../server_type/database/rpc/management.rs | 31 ++++ .../tests/end_to_end_cases/management_api.rs | 172 ++++++++++++++++++ influxdb_iox_client/src/client/management.rs | 27 +++ server/src/db.rs | 32 +++- .../src/db/lifecycle/compact_object_store.rs | 3 +- 9 files changed, 329 insertions(+), 2 deletions(-) diff --git a/generated_types/protos/influxdata/iox/management/v1/jobs.proto b/generated_types/protos/influxdata/iox/management/v1/jobs.proto index 8a5a764e24..2ac5a11c44 100644 --- a/generated_types/protos/influxdata/iox/management/v1/jobs.proto +++ b/generated_types/protos/influxdata/iox/management/v1/jobs.proto @@ -42,6 +42,7 @@ message OperationMetadata { CompactObjectStoreChunks compact_object_store_chunks = 18; LoadReadBufferChunk load_read_buffer_chunk = 19; RebuildPreservedCatalog rebuild_preserved_catalog = 20; + CompactObjectStorePartition compact_object_store_partition = 21; } } @@ -110,6 +111,18 @@ message CompactObjectStoreChunks { repeated bytes chunks = 4; } +// Compact OS chunks of a partition into a single chunk +message CompactObjectStorePartition { + // name of the database + string db_name = 1; + + // partition key + string partition_key = 2; + + // table name + string table_name = 3; +} + // Split and write chunks to object store message PersistChunks { diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index 15191e0802..33a900491f 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -95,6 +95,10 @@ service ManagementService { // // Errors if the chunks are not compacted yet and not contiguous rpc CompactObjectStoreChunks(CompactObjectStoreChunksRequest) returns (CompactObjectStoreChunksResponse); + + // Compact all object store chunks of a given partition + // + rpc CompactObjectStorePartition(CompactObjectStorePartitionRequest) returns (CompactObjectStorePartitionResponse); } message ListDatabasesRequest { @@ -505,3 +509,19 @@ message CompactObjectStoreChunksResponse { google.longrunning.Operation operation = 1; } +// Request to commpact all object store of a given partition +message CompactObjectStorePartitionRequest { + // the name of the database + string db_name = 1; + + // the partition key + string partition_key = 2; + + // the table name + string table_name = 3; +} + +message CompactObjectStorePartitionResponse { + // The operation that tracks the work for compacting object store chunks + google.longrunning.Operation operation = 1; +} diff --git a/generated_types/src/job.rs b/generated_types/src/job.rs index 6328930d4b..2142c9f0c0 100644 --- a/generated_types/src/job.rs +++ b/generated_types/src/job.rs @@ -20,6 +20,10 @@ impl management::operation_metadata::Job { db_name, .. }) => db_name, + Self::CompactObjectStorePartition(management::CompactObjectStorePartition { + db_name, + .. + }) => db_name, } } } diff --git a/influxdb_iox/src/commands/database/partition.rs b/influxdb_iox/src/commands/database/partition.rs index bc8399ed68..32ec0f6569 100644 --- a/influxdb_iox/src/commands/database/partition.rs +++ b/influxdb_iox/src/commands/database/partition.rs @@ -85,6 +85,19 @@ struct CompactObjectStoreChunks { chunk_ids: Vec, } +/// Compact all Object Store Chunks of a partition +#[derive(Debug, StructOpt)] +struct CompactObjectStorePartition { + /// The name of the database + db_name: String, + + /// The partition key + partition_key: String, + + /// The table name + table_name: String, +} + /// lists all chunks in this partition #[derive(Debug, StructOpt)] struct ListChunks { @@ -175,6 +188,9 @@ enum Command { /// Errors if the chunks are not yet compacted and not contiguous. CompactObjectStoreChunks(CompactObjectStoreChunks), + // Compact all object store cunks of a given partition + CompactObjectStorePartition(CompactObjectStorePartition), + /// Drop partition from memory and (if persisted) from object store. Drop(DropPartition), @@ -255,6 +271,19 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { serde_json::to_writer_pretty(std::io::stdout(), &operation)?; } + Command::CompactObjectStorePartition(compact) => { + let CompactObjectStorePartition { + db_name, + partition_key, + table_name, + } = compact; + + let operation = client + .compact_object_store_partition(db_name, table_name, partition_key) + .await?; + + serde_json::to_writer_pretty(std::io::stdout(), &operation)?; + } Command::Drop(drop_partition) => { let DropPartition { db_name, diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs index 5f4de6c73e..03960e3e53 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs @@ -535,6 +535,7 @@ impl management_service_server::ManagementService for ManagementService { Ok(Response::new(DropPartitionResponse {})) } + /// Compact all given object store chunks async fn compact_object_store_chunks( &self, request: Request, @@ -570,6 +571,36 @@ impl management_service_server::ManagementService for ManagementService { operation, })) } + + // Compact all object store chunks of the given partition + async fn compact_object_store_partition( + &self, + request: Request, + ) -> Result, Status> { + let CompactObjectStorePartitionRequest { + db_name, + partition_key, + table_name, + } = request.into_inner(); + + // Validate that the database name is legit + let db_name = DatabaseName::new(db_name).scope("db_name")?; + + let db = self + .server + .db(&db_name) + .map_err(default_server_error_handler)?; + + let tracker = db + .compact_object_store_partition(&table_name, &partition_key) + .map_err(default_db_error_handler)?; + + let operation = Some(super::operations::encode_tracker(tracker)?); + + Ok(Response::new(CompactObjectStorePartitionResponse { + operation, + })) + } } /// Returns [`DatabaseRules`] formated according to the `omit_defaults` flag. If `omit_defaults` is diff --git a/influxdb_iox/tests/end_to_end_cases/management_api.rs b/influxdb_iox/tests/end_to_end_cases/management_api.rs index fdae0d48a8..86ef3323d6 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_api.rs @@ -1949,3 +1949,175 @@ async fn test_compact_os_chunks() { assert_ne!(new_chunk_id, chunk_id_1); assert_ne!(new_chunk_id, chunk_id_2); } + +#[tokio::test] +async fn test_compact_os_partition() { + use data_types::chunk_metadata::ChunkStorage; + + let fixture = ServerFixture::create_shared(ServerType::Database).await; + let mut write_client = fixture.write_client(); + let mut management_client = fixture.management_client(); + let mut operations_client = fixture.operations_client(); + + let db_name = rand_name(); + DatabaseBuilder::new(db_name.clone()) + .persist(true) + .persist_age_threshold_seconds(1_000) + .late_arrive_window_seconds(1) + .build(fixture.grpc_channel()) + .await; + + // Chunk 1 + let lp_lines = vec!["cpu,tag1=cupcakes bar=1 10", "cpu,tag1=cookies bar=2 10"]; + + let num_lines_written = write_client + .write_lp(&db_name, lp_lines.join("\n"), 0) + .await + .expect("write succeded"); + assert_eq!(num_lines_written, 2); + + wait_for_exact_chunk_states( + &fixture, + &db_name, + vec![ChunkStorage::OpenMutableBuffer], + std::time::Duration::from_secs(5), + ) + .await; + + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 1); + let partition_key = &chunks[0].partition_key; + + management_client + .persist_partition(&db_name, "cpu", &partition_key[..], true) + .await + .unwrap(); + + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 1); + assert_eq!( + chunks[0].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore + as i32 + ); + + // chunk 2 + let lp_lines = vec![ + "cpu,tag1=cookies bar=2 20", + "cpu,tag1=cookies bar=3 30", // duplicate + "cpu,tag1=cupcakes bar=2 20", + ]; + + let num_lines_written = write_client + .write_lp(&db_name, lp_lines.join("\n"), 0) + .await + .expect("write succeded"); + assert_eq!(num_lines_written, 3); + + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 2); + let partition_key = &chunks[0].partition_key; + + management_client + .persist_partition(&db_name, "cpu", &partition_key[..], true) + .await + .unwrap(); + + let mut chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + // ensure chunk in deterministic order + chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id)); + assert_eq!(chunks.len(), 2); + assert_eq!( + chunks[0].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore + as i32 + ); + assert_eq!( + chunks[1].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore + as i32 + ); + + let chunk_id_1 = chunks[0].id.clone(); + let partition_key_1 = &chunks[0].partition_key; + let chunk_id_2 = chunks[1].id.clone(); + let partition_key_2 = &chunks[1].partition_key; + assert_eq!(partition_key_1, partition_key_2); + + // unload both RUBs + management_client + .unload_partition_chunk(&db_name, "cpu", &partition_key_1[..], chunk_id_1.clone()) + .await + .unwrap(); + management_client + .unload_partition_chunk(&db_name, "cpu", &partition_key_2[..], chunk_id_2.clone()) + .await + .unwrap(); + + // verify chunk status again + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 2); + assert_eq!( + chunks[0].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 + ); + assert_eq!( + chunks[1].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 + ); + + // Compact all 2 OS chunks of the partition + let iox_operation = management_client + .compact_object_store_partition(&db_name, "cpu", &partition_key_1[..]) + .await + .unwrap(); + + let operation_id = iox_operation.operation.id(); + + // ensure we got a legit job description back + // note that since compact_object_store_partition invokes compact_object_store_chunks, + // its job is recorded as CompactObjectStoreChunks + match iox_operation.metadata.job { + Some(Job::CompactObjectStoreChunks(job)) => { + assert_eq!(&job.db_name, &db_name); + assert_eq!(job.partition_key.as_str(), partition_key_1); + assert_eq!(job.table_name.as_str(), "cpu"); + } + job => panic!("unexpected job returned {:#?}", job), + } + + // wait for the job to be done + operations_client + .wait_operation(operation_id, Some(std::time::Duration::from_secs(1))) + .await + .expect("failed to wait operation"); + + // verify chunks after compaction + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 1); + assert_eq!( + chunks[0].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 + ); + let new_chunk_id = chunks[0].id.clone(); + assert_ne!(new_chunk_id, chunk_id_1); + assert_ne!(new_chunk_id, chunk_id_2); +} diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index 3ec031cdb9..e435e51796 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -535,4 +535,31 @@ impl Client { .unwrap_field("operation")? .try_into()?) } + + /// Compact all object store of a give partition + pub async fn compact_object_store_partition( + &mut self, + db_name: impl Into + Send, + table_name: impl Into + Send, + partition_key: impl Into + Send, + ) -> Result { + let db_name = db_name.into(); + let partition_key = partition_key.into(); + let table_name = table_name.into(); + + let response = self + .inner + .compact_object_store_partition(CompactObjectStorePartitionRequest { + db_name, + partition_key, + table_name, + }) + .await?; + + Ok(response + .into_inner() + .operation + .unwrap_field("operation")? + .try_into()?) + } } diff --git a/server/src/db.rs b/server/src/db.rs index 1e7836091f..e4e18d3897 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -675,6 +675,37 @@ impl Db { fut.await.context(TaskCancelled)?.context(LifecycleError) } + /// Compact all persisted chunks in this partition + /// Return error if the persisted chunks are not contiguous. This means + /// there are chunks in between those OS chunks are not yet persisted + pub fn compact_object_store_partition( + self: &Arc, + table_name: &str, + partition_key: &str, + ) -> Result> { + // acquire partition read lock to get OS chunk ids + let partition = self.lockable_partition(table_name, partition_key)?; + let partition = partition.read(); + let chunks = partition.chunks(); + + // Get all OS chunk IDs + let mut chunk_ids = vec![]; + for chunk in chunks { + let chunk = chunk.read(); + if chunk.is_persisted() { + chunk_ids.push(chunk.id()); + } + } + + // drop partition lock + partition.into_data(); + + // Compact all the OS chunks + // Error will return if those OS chunks are not contiguous which means + // a chunk in between those OS chunks are not yet persisted + self.compact_object_store_chunks(table_name, partition_key, chunk_ids) + } + /// Compact all provided persisted chunks pub fn compact_object_store_chunks( self: &Arc, @@ -2135,7 +2166,6 @@ mod tests { load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store)) .await .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()) .unwrap() .unwrap(); diff --git a/server/src/db/lifecycle/compact_object_store.rs b/server/src/db/lifecycle/compact_object_store.rs index 41c7ca489d..bd9d8db817 100644 --- a/server/src/db/lifecycle/compact_object_store.rs +++ b/server/src/db/lifecycle/compact_object_store.rs @@ -690,7 +690,8 @@ mod tests { let partition = partition.upgrade(); let chunk1 = chunks[0].write(); let chunk2 = chunks[1].write(); - let _compacted_chunk = compact_object_store_chunks(partition, vec![chunk1, chunk2]) + // Provide the chunk ids in reverse contiguous order to see if we handle it well + let _compacted_chunk = compact_object_store_chunks(partition, vec![chunk2, chunk1]) .unwrap() .1 .await From e46708354e9c1ad47d51881bc63ee3da0ce44dc9 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Thu, 9 Dec 2021 12:53:45 -0500 Subject: [PATCH 2/7] test: add management cli tests --- .../src/commands/database/partition.rs | 2 +- .../tests/end_to_end_cases/management_cli.rs | 137 +++++++++++++++++- 2 files changed, 137 insertions(+), 2 deletions(-) diff --git a/influxdb_iox/src/commands/database/partition.rs b/influxdb_iox/src/commands/database/partition.rs index 32ec0f6569..f742578502 100644 --- a/influxdb_iox/src/commands/database/partition.rs +++ b/influxdb_iox/src/commands/database/partition.rs @@ -188,7 +188,7 @@ enum Command { /// Errors if the chunks are not yet compacted and not contiguous. CompactObjectStoreChunks(CompactObjectStoreChunks), - // Compact all object store cunks of a given partition + /// Compact all object store chunks of a given partition CompactObjectStorePartition(CompactObjectStorePartition), /// Drop partition from memory and (if persisted) from object store. diff --git a/influxdb_iox/tests/end_to_end_cases/management_cli.rs b/influxdb_iox/tests/end_to_end_cases/management_cli.rs index c2c7b7f10c..69d6731711 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_cli.rs @@ -3,7 +3,7 @@ use crate::{ common::server_fixture::{ServerFixture, ServerType}, end_to_end_cases::scenario::{ fixture_broken_catalog, fixture_replay_broken, list_chunks, wait_for_exact_chunk_states, - DatabaseBuilder, + wait_for_operations_to_complete, DatabaseBuilder, }, }; use assert_cmd::Command; @@ -1547,3 +1547,138 @@ async fn test_persist_partition_error() { "Cannot persist partition because it cannot be flushed at the moment", )); } + +#[tokio::test] +async fn test_compact_os_partition() { + // Make 2 persisted chunks for a partition + let (fixture, db_name, addr, _chunk_ids) = setup_load_and_persist_two_partition_chunks().await; + + // Compact the partition which will compact those 2 chunks + let iox_operation: IoxOperation = serde_json::from_slice( + &Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("partition") + .arg("compact-object-store-partition") + .arg(&db_name) + .arg("cpu") // partition key + .arg("cpu") // table name + //.arg(chunk_ids) + .arg("--host") + .arg(addr) + .assert() + .success() + .get_output() + .stdout, + ) + .expect("Expected JSON output"); + + // Ensure we got a legit job description back + match iox_operation.metadata.job { + Some(Job::CompactObjectStoreChunks(job)) => { + assert_eq!(job.chunks.len(), 2); + assert_eq!(&job.db_name, &db_name); + assert_eq!(job.partition_key.as_str(), "cpu"); + assert_eq!(job.table_name.as_str(), "cpu"); + } + job => panic!("unexpected job returned {:#?}", job), + } + // Wait for the compaction to complete + wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await; + + // Verify chunk the DB now only has one OS-only chunk + let chunks = list_chunks(&fixture, &db_name).await; + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly); +} + +#[tokio::test] +async fn test_compact_os_chunks() { + // Make 2 persisted chunks for a partition + let (fixture, db_name, addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await; + + // Compact the partition which will compact those 2 chunks + let iox_operation: IoxOperation = serde_json::from_slice( + &Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("partition") + .arg("compact-object-store-chunks") + .arg(&db_name) + .arg("cpu") // partition key + .arg("cpu") // table name + .arg(chunk_ids[0].clone()) + .arg(chunk_ids[1].clone()) + .arg("--host") + .arg(addr) + .assert() + .success() + .get_output() + .stdout, + ) + .expect("Expected JSON output"); + + // Ensure we got a legit job description back + match iox_operation.metadata.job { + Some(Job::CompactObjectStoreChunks(job)) => { + assert_eq!(job.chunks.len(), 2); + assert_eq!(&job.db_name, &db_name); + assert_eq!(job.partition_key.as_str(), "cpu"); + assert_eq!(job.table_name.as_str(), "cpu"); + } + job => panic!("unexpected job returned {:#?}", job), + } + // Wait for the compaction to complete + wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await; + + // Verify chunk the DB now only has one OS-only chunk + let chunks = list_chunks(&fixture, &db_name).await; + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly); +} + +async fn setup_load_and_persist_two_partition_chunks( +) -> (Arc, String, String, Vec) { + let fixture = Arc::from(ServerFixture::create_shared(ServerType::Database).await); + let addr = fixture.grpc_base(); + let db_name = rand_name(); + + DatabaseBuilder::new(db_name.clone()) + .persist(true) + .persist_age_threshold_seconds(1) + .late_arrive_window_seconds(1) + .build(fixture.grpc_channel()) + .await; + + // Load first chunk and wait for it to get persisted + let lp_data = vec!["cpu,region=west user=23.2 10"]; + load_lp(addr, &db_name, lp_data); + + wait_for_exact_chunk_states( + &fixture, + &db_name, + vec![ChunkStorage::ReadBufferAndObjectStore], + std::time::Duration::from_secs(10), + ) + .await; + + // Load second chunk and wait for it to get persisted, too + let lp_data = vec!["cpu,region=east user=79 30"]; + load_lp(addr, &db_name, lp_data); + + let chunks = wait_for_exact_chunk_states( + &fixture, + &db_name, + vec![ + ChunkStorage::ReadBufferAndObjectStore, + ChunkStorage::ReadBufferAndObjectStore, + ], + std::time::Duration::from_secs(10), + ) + .await; + + // collect chunk ids + let chunk_ids: Vec<_> = chunks.iter().map(|c| c.id.get().to_string()).collect(); + + (Arc::clone(&fixture), db_name, String::from(addr), chunk_ids) +} From 099e2d405697f924e109d250f05950b2684c66c7 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Thu, 9 Dec 2021 13:52:42 -0500 Subject: [PATCH 3/7] chore: Apply suggestions from code review Co-authored-by: Marco Neumann --- influxdb_iox/tests/end_to_end_cases/management_api.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb_iox/tests/end_to_end_cases/management_api.rs b/influxdb_iox/tests/end_to_end_cases/management_api.rs index 86ef3323d6..35a320e3cb 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_api.rs @@ -1992,7 +1992,7 @@ async fn test_compact_os_partition() { let partition_key = &chunks[0].partition_key; management_client - .persist_partition(&db_name, "cpu", &partition_key[..], true) + .persist_partition(&db_name, "cpu", partition_key.as_ref(), true) .await .unwrap(); From b2f7306d5a525df0e1f5ae959194a73bcb661b9a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Dec 2021 14:22:29 -0500 Subject: [PATCH 4/7] docs: Update database startup machine diagram --- server/src/database.rs | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/server/src/database.rs b/server/src/database.rs index c2685c803f..a244a38dcf 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1061,10 +1061,21 @@ pub enum InitError { /// The Database startup state machine /// /// A Database starts in DatabaseState::Known and advances through the -/// states in sequential order until it reaches Initialized or an error -/// is encountered. +/// non error states in sequential order until either: +/// +/// 1. It reaches `Initialized` +/// +/// 2. It is reset to `Known` and starts initialization again +/// +/// 3. An error is encountered, in which case it transitions to one of +/// the error states. Most are Terminal (and thus require operator +/// intervention) but some (such as `WriteBufferCreationError` may +/// resolve after some time to the basic initialization sequence +/// (e.g. `Initialized`) +/// #[derive(Debug, Clone)] enum DatabaseState { + // Basic initialization sequence states: Known(DatabaseStateKnown), DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound), OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded), @@ -1072,12 +1083,23 @@ enum DatabaseState { CatalogLoaded(DatabaseStateCatalogLoaded), Initialized(DatabaseStateInitialized), + // Error states + /// Terminal State DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc), + /// Terminal State NoActiveDatabase(DatabaseStateKnown, Arc), + /// Terminal State OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc), + /// Terminal State RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc), + /// Terminal State CatalogLoadError(DatabaseStateRulesLoaded, Arc), + /// Non Terminal State: There was an error creating a connction to + /// the WriteBuffer, but the connection will be retried. If a + /// connection is successfully created, the database will + /// transition to `Initialized` WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc), + /// Terminal State ReplayError(DatabaseStateCatalogLoaded, Arc), } From 3cda6b6c0f5c72b710fa73aec581ec079700fd11 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Dec 2021 14:58:19 -0500 Subject: [PATCH 5/7] refactor: Remove collect_query and replication (#3348) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../end_to_end_cases/database_migration.rs | 12 ++++---- .../tests/end_to_end_cases/delete_api.rs | 12 ++++---- .../tests/end_to_end_cases/flight_api.rs | 9 +++--- .../tests/end_to_end_cases/persistence.rs | 11 ++++++-- .../tests/end_to_end_cases/scenario.rs | 10 ------- .../tests/end_to_end_cases/system_tables.rs | 28 +++++++++++++------ .../tests/end_to_end_cases/tracing.rs | 10 ++++--- influxdb_iox_client/src/client/flight.rs | 4 +-- 8 files changed, 52 insertions(+), 44 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_cases/database_migration.rs b/influxdb_iox/tests/end_to_end_cases/database_migration.rs index 57fbccde45..60b513bd0e 100644 --- a/influxdb_iox/tests/end_to_end_cases/database_migration.rs +++ b/influxdb_iox/tests/end_to_end_cases/database_migration.rs @@ -17,9 +17,8 @@ use uuid::Uuid; use crate::{ common::server_fixture::{ServerFixture, ServerType}, end_to_end_cases::scenario::{ - collect_query, create_readable_database, data_dir, db_data_dir, rand_name, - wait_for_database_initialized, wait_for_exact_chunk_states, - wait_for_operations_to_complete, + create_readable_database, data_dir, db_data_dir, rand_name, wait_for_database_initialized, + wait_for_exact_chunk_states, wait_for_operations_to_complete, }, }; @@ -234,13 +233,14 @@ async fn migrate_table_files_from_one_server_to_another() { wait_for_database_initialized(&fixture, &db_name, Duration::from_secs(5)).await; // Now the data shoudl be available for the_table - let query_results = flight_client + let batches = flight_client .perform_query(&db_name, sql_query) .await + .unwrap() + .collect() + .await .unwrap(); - let batches = collect_query(query_results).await; - let expected = vec![ "+-----------------+", "| COUNT(UInt8(1)) |", diff --git a/influxdb_iox/tests/end_to_end_cases/delete_api.rs b/influxdb_iox/tests/end_to_end_cases/delete_api.rs index 44a44e1387..765216a063 100644 --- a/influxdb_iox/tests/end_to_end_cases/delete_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/delete_api.rs @@ -52,7 +52,7 @@ async fn test_delete_on_database() { .perform_query(db_name.clone(), "select * from cpu") .await .unwrap(); - let batches = query_results.to_batches().await.unwrap(); + let batches = query_results.collect().await.unwrap(); let expected = [ "+--------+--------------------------------+------+", "| region | time | user |", @@ -86,7 +86,7 @@ async fn test_delete_on_database() { .perform_query(db_name.clone(), "select * from cpu") .await .unwrap(); - let batches = query_results.to_batches().await.unwrap(); + let batches = query_results.collect().await.unwrap(); let expected = [ "+--------+--------------------------------+------+", "| region | time | user |", @@ -104,7 +104,7 @@ async fn test_delete_on_database() { ) .await .unwrap(); - let batches = query_results.to_batches().await.unwrap(); + let batches = query_results.collect().await.unwrap(); // result should be as above assert_batches_sorted_eq!(&expected, &batches); @@ -113,7 +113,7 @@ async fn test_delete_on_database() { .perform_query(db_name.clone(), "select * from cpu where user!=21") .await .unwrap(); - let batches = query_results.to_batches().await.unwrap(); + let batches = query_results.collect().await.unwrap(); // result should be nothing let expected = ["++", "++"]; assert_batches_sorted_eq!(&expected, &batches); @@ -135,7 +135,7 @@ async fn test_delete_on_database() { .perform_query(db_name.clone(), "select * from cpu") .await .unwrap(); - let batches = query_results.to_batches().await.unwrap(); + let batches = query_results.collect().await.unwrap(); let cpu_expected = [ "+--------+--------------------------------+------+", "| region | time | user |", @@ -149,7 +149,7 @@ async fn test_delete_on_database() { .perform_query(db_name.clone(), "select * from disk") .await .unwrap(); - let batches = query_results.to_batches().await.unwrap(); + let batches = query_results.collect().await.unwrap(); let disk_expected = [ "+-------+--------+--------------------------------+", "| bytes | region | time |", diff --git a/influxdb_iox/tests/end_to_end_cases/flight_api.rs b/influxdb_iox/tests/end_to_end_cases/flight_api.rs index 6381b45530..b1c15a17a2 100644 --- a/influxdb_iox/tests/end_to_end_cases/flight_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/flight_api.rs @@ -1,4 +1,4 @@ -use super::scenario::{collect_query, create_readable_database, rand_name, Scenario}; +use super::scenario::{create_readable_database, rand_name, Scenario}; use crate::common::server_fixture::{ServerFixture, ServerType}; use arrow_util::assert_batches_sorted_eq; @@ -20,13 +20,14 @@ pub async fn test() { // This does nothing except test the client handshake implementation. client.handshake().await.unwrap(); - let query_results = client + let batches = client .perform_query(scenario.database_name(), sql_query) .await + .unwrap() + .collect() + .await .unwrap(); - let batches = collect_query(query_results).await; - let expected_read_data: Vec<_> = expected_read_data.iter().map(|s| s.as_str()).collect(); assert_batches_sorted_eq!(expected_read_data, &batches); } diff --git a/influxdb_iox/tests/end_to_end_cases/persistence.rs b/influxdb_iox/tests/end_to_end_cases/persistence.rs index ad244e996f..96eaf6265b 100644 --- a/influxdb_iox/tests/end_to_end_cases/persistence.rs +++ b/influxdb_iox/tests/end_to_end_cases/persistence.rs @@ -10,7 +10,7 @@ use crate::{ end_to_end_cases::scenario::{list_chunks, wait_for_exact_chunk_states}, }; -use super::scenario::{collect_query, create_readable_database, rand_name, DatabaseBuilder}; +use super::scenario::{create_readable_database, rand_name, DatabaseBuilder}; use crate::common::server_fixture::DEFAULT_SERVER_ID; use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, CompactChunks}; @@ -310,9 +310,14 @@ async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) { let mut client = fixture.flight_client(); let sql_query = "select region, user, time from cpu"; - let query_results = client.perform_query(db_name, sql_query).await.unwrap(); + let batches = client + .perform_query(db_name, sql_query) + .await + .unwrap() + .collect() + .await + .unwrap(); - let batches = collect_query(query_results).await; let expected_read_data = vec![ "+--------+------+--------------------------------+", "| region | user | time |", diff --git a/influxdb_iox/tests/end_to_end_cases/scenario.rs b/influxdb_iox/tests/end_to_end_cases/scenario.rs index 9a0f1e6c9e..b4638c074b 100644 --- a/influxdb_iox/tests/end_to_end_cases/scenario.rs +++ b/influxdb_iox/tests/end_to_end_cases/scenario.rs @@ -15,7 +15,6 @@ use generated_types::{ }; use influxdb_iox_client::{ connection::Connection, - flight::PerformQuery, management::{ self, generated_types::{partition_template, WriteBufferConnection}, @@ -468,15 +467,6 @@ pub async fn create_two_partition_database(db_name: impl Into, channel: .expect("write succeded"); } -/// Collect the results of a query into a vector of record batches -pub async fn collect_query(mut query_results: PerformQuery) -> Vec { - let mut batches = vec![]; - while let Some(data) = query_results.next().await.unwrap() { - batches.push(data); - } - batches -} - /// Wait for the chunks to be in exactly `desired_storages` states pub async fn wait_for_exact_chunk_states( fixture: &ServerFixture, diff --git a/influxdb_iox/tests/end_to_end_cases/system_tables.rs b/influxdb_iox/tests/end_to_end_cases/system_tables.rs index 2226d455a6..f1438ab796 100644 --- a/influxdb_iox/tests/end_to_end_cases/system_tables.rs +++ b/influxdb_iox/tests/end_to_end_cases/system_tables.rs @@ -4,7 +4,7 @@ use crate::{ }; use arrow_util::{assert_batches_eq, test_util::normalize_batches}; -use super::scenario::{collect_query, create_readable_database, list_chunks, rand_name}; +use super::scenario::{create_readable_database, list_chunks, rand_name}; #[tokio::test] async fn test_operations() { @@ -47,9 +47,13 @@ async fn test_operations() { let mut client = fixture.flight_client(); let sql_query = "select status, description from system.operations"; - let query_results = client.perform_query(&db_name1, sql_query).await.unwrap(); - - let batches = collect_query(query_results).await; + let batches = client + .perform_query(&db_name1, sql_query) + .await + .unwrap() + .collect() + .await + .unwrap(); // parameterize on db_name1 @@ -64,9 +68,14 @@ async fn test_operations() { assert_batches_eq!(expected_read_data, &batches); // Should not see jobs from db1 when querying db2 - let query_results = client.perform_query(&db_name2, sql_query).await.unwrap(); + let batches = client + .perform_query(&db_name2, sql_query) + .await + .unwrap() + .collect() + .await + .unwrap(); - let batches = collect_query(query_results).await; let expected_read_data = vec![ "+--------+-------------+", "| status | description |", @@ -109,13 +118,14 @@ async fn test_queries() { let query = "select query_type, query_text from system.queries"; // Query system.queries and should have an entry for the storage rpc - let query_results = fixture + let batches = fixture .flight_client() .perform_query(&db_name, query) .await + .unwrap() + .collect() + .await .unwrap(); - - let batches = collect_query(query_results).await; let batches = normalize_batches(batches, scenario.normalizer()); let expected_read_data = vec![ diff --git a/influxdb_iox/tests/end_to_end_cases/tracing.rs b/influxdb_iox/tests/end_to_end_cases/tracing.rs index 50ce73477a..92ee88a4b8 100644 --- a/influxdb_iox/tests/end_to_end_cases/tracing.rs +++ b/influxdb_iox/tests/end_to_end_cases/tracing.rs @@ -1,4 +1,4 @@ -use super::scenario::{collect_query, Scenario}; +use super::scenario::Scenario; use crate::common::{ server_fixture::{ServerFixture, ServerType, TestConfig}, udp_listener::UdpCapture, @@ -33,6 +33,7 @@ async fn setup() -> (UdpCapture, ServerFixture) { (udp_capture, server_fixture) } +/// Runs a query, discarding the results async fn run_sql_query(server_fixture: &ServerFixture) { let scenario = Scenario::new(); scenario @@ -44,12 +45,13 @@ async fn run_sql_query(server_fixture: &ServerFixture) { let sql_query = "select * from cpu_load_short"; let mut client = server_fixture.flight_client(); - let query_results = client + client .perform_query(scenario.database_name(), sql_query) .await + .unwrap() + .collect() + .await .unwrap(); - - collect_query(query_results).await; } #[tokio::test] diff --git a/influxdb_iox_client/src/client/flight.rs b/influxdb_iox_client/src/client/flight.rs index 01aaeeb300..67482bd6cf 100644 --- a/influxdb_iox_client/src/client/flight.rs +++ b/influxdb_iox_client/src/client/flight.rs @@ -218,8 +218,8 @@ impl PerformQuery { )?)) } - /// Return all record batches of it - pub async fn to_batches(&mut self) -> Result, Error> { + /// Collect and return all `RecordBatch`es into a `Vec` + pub async fn collect(&mut self) -> Result, Error> { let mut batches = Vec::new(); while let Some(data) = self.next().await? { batches.push(data); From 191e743ce0d7c21f7f37f898a49b05e049d1f2f7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Dec 2021 15:04:39 -0500 Subject: [PATCH 6/7] fix: Update server/src/database.rs Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> --- server/src/database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/database.rs b/server/src/database.rs index a244a38dcf..c69ced3fc7 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1069,7 +1069,7 @@ pub enum InitError { /// /// 3. An error is encountered, in which case it transitions to one of /// the error states. Most are Terminal (and thus require operator -/// intervention) but some (such as `WriteBufferCreationError` may +/// intervention) but some (such as `WriteBufferCreationError`) may /// resolve after some time to the basic initialization sequence /// (e.g. `Initialized`) /// From 35370922f3c9ce7fbd8792d0f77026a7db9a6575 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Thu, 9 Dec 2021 15:21:56 -0500 Subject: [PATCH 7/7] refactor: make a setup for 2 persisted chunks that can be used in for different places --- .../tests/end_to_end_cases/management_api.rs | 294 ++---------------- .../tests/end_to_end_cases/management_cli.rs | 2 +- 2 files changed, 29 insertions(+), 267 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_cases/management_api.rs b/influxdb_iox/tests/end_to_end_cases/management_api.rs index 35a320e3cb..d213ac7fec 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_api.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use data_types::chunk_metadata::ChunkId; use generated_types::google::protobuf::{Duration, Empty}; use influxdb_iox_client::{ @@ -15,13 +16,16 @@ use test_helpers::{assert_contains, assert_error}; use super::scenario::{ create_readable_database, create_two_partition_database, create_unreadable_database, rand_name, }; -use crate::common::server_fixture::{TestConfig, DEFAULT_SERVER_ID}; use crate::{ common::server_fixture::{ServerFixture, ServerType}, end_to_end_cases::scenario::{ fixture_broken_catalog, wait_for_exact_chunk_states, DatabaseBuilder, }, }; +use crate::{ + common::server_fixture::{TestConfig, DEFAULT_SERVER_ID}, + end_to_end_cases::management_cli::setup_load_and_persist_two_partition_chunks, +}; use std::time::Instant; use uuid::Uuid; @@ -1777,143 +1781,24 @@ async fn test_persist_partition_error() { #[tokio::test] async fn test_compact_os_chunks() { - use data_types::chunk_metadata::ChunkStorage; - - let fixture = ServerFixture::create_shared(ServerType::Database).await; - let mut write_client = fixture.write_client(); + // Make 2 persisted chunks for a partition + let (fixture, db_name, _addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await; + assert!(chunk_ids.len() > 1); let mut management_client = fixture.management_client(); let mut operations_client = fixture.operations_client(); - let db_name = rand_name(); - DatabaseBuilder::new(db_name.clone()) - .persist(true) - .persist_age_threshold_seconds(1_000) - .late_arrive_window_seconds(1) - .build(fixture.grpc_channel()) - .await; + let c_ids: Vec = chunk_ids + .iter() + .map(|id| { + let id_uuid = Uuid::parse_str(id).unwrap(); + id_uuid.as_bytes().to_vec().into() + }) + .collect(); - // Chunk 1 - let lp_lines = vec!["cpu,tag1=cupcakes bar=1 10", "cpu,tag1=cookies bar=2 10"]; - - let num_lines_written = write_client - .write_lp(&db_name, lp_lines.join("\n"), 0) - .await - .expect("write succeded"); - assert_eq!(num_lines_written, 2); - - wait_for_exact_chunk_states( - &fixture, - &db_name, - vec![ChunkStorage::OpenMutableBuffer], - std::time::Duration::from_secs(5), - ) - .await; - - let chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - assert_eq!(chunks.len(), 1); - let partition_key = &chunks[0].partition_key; - - management_client - .persist_partition(&db_name, "cpu", &partition_key[..], true) - .await - .unwrap(); - - let chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - assert_eq!(chunks.len(), 1); - assert_eq!( - chunks[0].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore - as i32 - ); - - // chunk 2 - let lp_lines = vec![ - "cpu,tag1=cookies bar=2 20", - "cpu,tag1=cookies bar=3 30", // duplicate - "cpu,tag1=cupcakes bar=2 20", - ]; - - let num_lines_written = write_client - .write_lp(&db_name, lp_lines.join("\n"), 0) - .await - .expect("write succeded"); - assert_eq!(num_lines_written, 3); - - let chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - assert_eq!(chunks.len(), 2); - let partition_key = &chunks[0].partition_key; - - management_client - .persist_partition(&db_name, "cpu", &partition_key[..], true) - .await - .unwrap(); - - let mut chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - // ensure chunk in deterministic order - chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id)); - assert_eq!(chunks.len(), 2); - assert_eq!( - chunks[0].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore - as i32 - ); - assert_eq!( - chunks[1].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore - as i32 - ); - - let chunk_id_1 = chunks[0].id.clone(); - let partition_key_1 = &chunks[0].partition_key; - let chunk_id_2 = chunks[1].id.clone(); - let partition_key_2 = &chunks[1].partition_key; - assert_eq!(partition_key_1, partition_key_2); - - // unload both RUBs - management_client - .unload_partition_chunk(&db_name, "cpu", &partition_key_1[..], chunk_id_1.clone()) - .await - .unwrap(); - management_client - .unload_partition_chunk(&db_name, "cpu", &partition_key_2[..], chunk_id_2.clone()) - .await - .unwrap(); - - // verify chunk status again - let chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - assert_eq!(chunks.len(), 2); - assert_eq!( - chunks[0].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 - ); - assert_eq!( - chunks[1].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 - ); - - // Compact 2 chunks + // Compact all 2 OS chunks of the partition + // note that both partition and table_name are "cpu" in the setup let iox_operation = management_client - .compact_object_store_chunks( - &db_name, - "cpu", - &partition_key_1[..], - vec![chunk_id_1.clone(), chunk_id_2.clone()], - ) + .compact_object_store_chunks(&db_name, "cpu", "cpu", c_ids.clone()) .await .unwrap(); @@ -1923,7 +1808,7 @@ async fn test_compact_os_chunks() { match iox_operation.metadata.job { Some(Job::CompactObjectStoreChunks(job)) => { assert_eq!(&job.db_name, &db_name); - assert_eq!(job.partition_key.as_str(), partition_key_1); + assert_eq!(job.partition_key.as_str(), "cpu"); assert_eq!(job.table_name.as_str(), "cpu"); } job => panic!("unexpected job returned {:#?}", job), @@ -1946,144 +1831,21 @@ async fn test_compact_os_chunks() { generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 ); let new_chunk_id = chunks[0].id.clone(); - assert_ne!(new_chunk_id, chunk_id_1); - assert_ne!(new_chunk_id, chunk_id_2); + assert_ne!(new_chunk_id, c_ids[0]); + assert_ne!(new_chunk_id, c_ids[1]); } #[tokio::test] async fn test_compact_os_partition() { - use data_types::chunk_metadata::ChunkStorage; - - let fixture = ServerFixture::create_shared(ServerType::Database).await; - let mut write_client = fixture.write_client(); + // Make 2 persisted chunks for a partition + let (fixture, db_name, _addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await; let mut management_client = fixture.management_client(); let mut operations_client = fixture.operations_client(); - let db_name = rand_name(); - DatabaseBuilder::new(db_name.clone()) - .persist(true) - .persist_age_threshold_seconds(1_000) - .late_arrive_window_seconds(1) - .build(fixture.grpc_channel()) - .await; - - // Chunk 1 - let lp_lines = vec!["cpu,tag1=cupcakes bar=1 10", "cpu,tag1=cookies bar=2 10"]; - - let num_lines_written = write_client - .write_lp(&db_name, lp_lines.join("\n"), 0) - .await - .expect("write succeded"); - assert_eq!(num_lines_written, 2); - - wait_for_exact_chunk_states( - &fixture, - &db_name, - vec![ChunkStorage::OpenMutableBuffer], - std::time::Duration::from_secs(5), - ) - .await; - - let chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - assert_eq!(chunks.len(), 1); - let partition_key = &chunks[0].partition_key; - - management_client - .persist_partition(&db_name, "cpu", partition_key.as_ref(), true) - .await - .unwrap(); - - let chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - assert_eq!(chunks.len(), 1); - assert_eq!( - chunks[0].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore - as i32 - ); - - // chunk 2 - let lp_lines = vec![ - "cpu,tag1=cookies bar=2 20", - "cpu,tag1=cookies bar=3 30", // duplicate - "cpu,tag1=cupcakes bar=2 20", - ]; - - let num_lines_written = write_client - .write_lp(&db_name, lp_lines.join("\n"), 0) - .await - .expect("write succeded"); - assert_eq!(num_lines_written, 3); - - let chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - assert_eq!(chunks.len(), 2); - let partition_key = &chunks[0].partition_key; - - management_client - .persist_partition(&db_name, "cpu", &partition_key[..], true) - .await - .unwrap(); - - let mut chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - // ensure chunk in deterministic order - chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id)); - assert_eq!(chunks.len(), 2); - assert_eq!( - chunks[0].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore - as i32 - ); - assert_eq!( - chunks[1].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore - as i32 - ); - - let chunk_id_1 = chunks[0].id.clone(); - let partition_key_1 = &chunks[0].partition_key; - let chunk_id_2 = chunks[1].id.clone(); - let partition_key_2 = &chunks[1].partition_key; - assert_eq!(partition_key_1, partition_key_2); - - // unload both RUBs - management_client - .unload_partition_chunk(&db_name, "cpu", &partition_key_1[..], chunk_id_1.clone()) - .await - .unwrap(); - management_client - .unload_partition_chunk(&db_name, "cpu", &partition_key_2[..], chunk_id_2.clone()) - .await - .unwrap(); - - // verify chunk status again - let chunks = management_client - .list_chunks(&db_name) - .await - .expect("listing chunks"); - assert_eq!(chunks.len(), 2); - assert_eq!( - chunks[0].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 - ); - assert_eq!( - chunks[1].storage, - generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 - ); - // Compact all 2 OS chunks of the partition + // note that both partition and table_name are "cpu" in the setup let iox_operation = management_client - .compact_object_store_partition(&db_name, "cpu", &partition_key_1[..]) + .compact_object_store_partition(&db_name, "cpu", "cpu") .await .unwrap(); @@ -2095,7 +1857,7 @@ async fn test_compact_os_partition() { match iox_operation.metadata.job { Some(Job::CompactObjectStoreChunks(job)) => { assert_eq!(&job.db_name, &db_name); - assert_eq!(job.partition_key.as_str(), partition_key_1); + assert_eq!(job.partition_key.as_str(), "cpu"); assert_eq!(job.table_name.as_str(), "cpu"); } job => panic!("unexpected job returned {:#?}", job), @@ -2118,6 +1880,6 @@ async fn test_compact_os_partition() { generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 ); let new_chunk_id = chunks[0].id.clone(); - assert_ne!(new_chunk_id, chunk_id_1); - assert_ne!(new_chunk_id, chunk_id_2); + assert_ne!(new_chunk_id, chunk_ids[0]); + assert_ne!(new_chunk_id, chunk_ids[1]); } diff --git a/influxdb_iox/tests/end_to_end_cases/management_cli.rs b/influxdb_iox/tests/end_to_end_cases/management_cli.rs index 69d6731711..46f5441691 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_cli.rs @@ -1637,7 +1637,7 @@ async fn test_compact_os_chunks() { assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly); } -async fn setup_load_and_persist_two_partition_chunks( +pub async fn setup_load_and_persist_two_partition_chunks( ) -> (Arc, String, String, Vec) { let fixture = Arc::from(ServerFixture::create_shared(ServerType::Database).await); let addr = fixture.grpc_base();