From 5f3706e0ee15185abb69fe39ce9b684323fc958f Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 3 Dec 2021 18:01:28 -0500 Subject: [PATCH 1/3] feat: grpc call for copact object store chunks --- generated_types/build.rs | 1 + .../iox/management/v1/service.proto | 24 +++++++++++ .../src/commands/database/partition.rs | 43 +++++++++++++++++++ .../server_type/database/rpc/management.rs | 32 ++++++++++++++ influxdb_iox_client/src/client/management.rs | 30 +++++++++++++ server/src/db.rs | 18 ++++++-- 6 files changed, 144 insertions(+), 4 deletions(-) diff --git a/generated_types/build.rs b/generated_types/build.rs index b9eae88b62..9880e2f190 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -85,6 +85,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { ".influxdata.iox.management.v1.ClosePartitionChunkRequest.chunk_id", ".influxdata.iox.management.v1.CompactChunks.chunks", ".influxdata.iox.management.v1.CompactObjectStoreChunks.chunks", + ".influxdata.iox.management.v1.CompactObjectStoreChunksRequest.chunk_ids", ".influxdata.iox.management.v1.DropChunk.chunk_id", ".influxdata.iox.management.v1.PersistChunks.chunks", ".influxdata.iox.management.v1.WriteChunk.chunk_id", diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index 0a5077bdfe..52b2ed0d63 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -90,6 +90,11 @@ service ManagementService { // Drop partition from memory and (if persisted) from object store. rpc DropPartition(DropPartitionRequest) returns (DropPartitionResponse); + + // Compact given object store chunks + // + // Errors if the chunks are not compacted yet and not contiguous + rpc CompactObjectStoreChunks(CompactObjectStoreChunksRequest) returns (CompactObjectStoreChunksResponse); } message ListDatabasesRequest { @@ -463,3 +468,22 @@ message DropPartitionRequest { message DropPartitionResponse { } + +// Request to commpact given object store chunks +message CompactObjectStoreChunksRequest { + // the name of the database + string db_name = 1; + + // the partition key + string partition_key = 2; + + // the table name + string table_name = 3; + + // the chunk ids + repeated bytes chunk_ids = 4; +} + +message CompactObjectStoreChunksResponse { +} + diff --git a/influxdb_iox/src/commands/database/partition.rs b/influxdb_iox/src/commands/database/partition.rs index 28fa87cc17..a786c49d20 100644 --- a/influxdb_iox/src/commands/database/partition.rs +++ b/influxdb_iox/src/commands/database/partition.rs @@ -67,6 +67,25 @@ struct Persist { force: bool, } +/// Compact Object Store Chunks +/// +/// Errors if the chunks are not yet compacted and not contiguous. If successful it returns the +/// compacted chunk or None if no data after compacted due to rows eliminated from deletes and deduplication +#[derive(Debug, StructOpt)] +struct CompactObjectStoreChunks { + /// The name of the database + db_name: String, + + /// The partition key + partition_key: String, + + /// The table name + table_name: String, + + /// The chunk ids + chunk_ids: Vec, +} + /// lists all chunks in this partition #[derive(Debug, StructOpt)] struct ListChunks { @@ -152,6 +171,12 @@ enum Command { /// chunk that contains the persisted data. Persist(Persist), + /// Compact Object Store Chunks + /// + /// Errors if the chunks are not yet compacted and not contiguous. If successful it returns the + /// compacted chunk or None if no data after compacted due to rows eliminated from deletes and deduplication + CompactObjectStoreChunks(CompactObjectStoreChunks), + /// Drop partition from memory and (if persisted) from object store. Drop(DropPartition), @@ -213,6 +238,24 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { .await?; println!("Ok"); } + Command::CompactObjectStoreChunks(compact) => { + let CompactObjectStoreChunks { + db_name, + partition_key, + table_name, + chunk_ids, + } = compact; + + let chunk_ids = chunk_ids + .iter() + .map(|chunk_id| chunk_id.as_bytes().to_vec().into()) + .collect(); + + client + .compact_object_store_chunks(db_name, table_name, partition_key, chunk_ids) + .await?; + println!("Ok"); + } Command::Drop(drop_partition) => { let DropPartition { db_name, diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs index 2b8c4724ed..0f1f5ed6b1 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs @@ -534,6 +534,38 @@ impl management_service_server::ManagementService for ManagementService { Ok(Response::new(DropPartitionResponse {})) } + + async fn compact_object_store_chunks( + &self, + request: Request, + ) -> Result, Status> { + let CompactObjectStoreChunksRequest { + db_name, + partition_key, + table_name, + chunk_ids, + } = request.into_inner(); + + // Validate that the database name is legit + let db_name = DatabaseName::new(db_name).scope("db_name")?; + + let db = self + .server + .db(&db_name) + .map_err(default_server_error_handler)?; + + let mut chunk_id_ids = vec![]; + for chunk_id in chunk_ids { + let chunk_id = ChunkId::try_from(chunk_id).scope("chunk_id")?; + chunk_id_ids.push(chunk_id); + } + + db.compact_object_store_chunks(&table_name, &partition_key, chunk_id_ids) + .await + .map_err(default_db_error_handler)?; + + Ok(Response::new(CompactObjectStoreChunksResponse {})) + } } /// Returns [`DatabaseRules`] formated according to the `omit_defaults` flag. If `omit_defaults` is diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index 5bcb91cd41..25ade00eef 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -494,4 +494,34 @@ impl Client { Ok(()) } + + /// Compact given object store chunks (db, table, partition, chunks) + /// + /// Error if the chunks are not yet compacted and not contiguous + pub async fn compact_object_store_chunks( + &mut self, + db_name: impl Into + Send, + table_name: impl Into + Send, + partition_key: impl Into + Send, + chunk_ids: Vec, + ) -> Result<(), Error> { + let db_name = db_name.into(); + let partition_key = partition_key.into(); + let table_name = table_name.into(); + // let chunk_ids = chunk_ids + // .iter() + // .map(|chunk_id| chunk_id.as_bytes().to_vec()) + // .collect(); + + self.inner + .compact_object_store_chunks(CompactObjectStoreChunksRequest { + db_name, + partition_key, + table_name, + chunk_ids, + }) + .await?; + + Ok(()) + } } diff --git a/server/src/db.rs b/server/src/db.rs index f614911bdd..b43f58dbf4 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -676,14 +676,24 @@ impl Db { // Use explicit scope to ensure the async generator doesn't // assume the locks have to possibly live across the `await` let fut = { + // Lock for read let partition = self.lockable_partition(table_name, partition_key)?; let partition = partition.read(); + let mut chunks = vec![]; + for chunk_id in chunk_ids { + let chunk = LockablePartition::chunk(&partition, chunk_id).ok_or( + catalog::Error::ChunkNotFound { + chunk_id, + partition: partition_key.to_string(), + table: table_name.to_string(), + }, + )?; + chunks.push(chunk); + } - // todo: set these chunks - let chunks = vec![]; - - // Lock partition for write + // Lock for write let partition = partition.upgrade(); + let chunks = chunks.iter().map(|c| c.write()).collect(); // invoke compact let (_, fut) = From d8ed8a3bf724135e6a92813ea5b69e7361c4f552 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Mon, 6 Dec 2021 12:06:07 -0500 Subject: [PATCH 2/3] refactor: address review comments --- .../influxdata/iox/management/v1/service.proto | 2 ++ .../src/commands/database/partition.rs | 11 +++++------ .../server_type/database/rpc/management.rs | 10 +++++++--- influxdb_iox_client/src/client/management.rs | 15 ++++++++------- server/src/db.rs | 18 ++++++++---------- 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index 52b2ed0d63..61a0304496 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -485,5 +485,7 @@ message CompactObjectStoreChunksRequest { } message CompactObjectStoreChunksResponse { + // The operation that tracks the work for compacting object store chunks + google.longrunning.Operation operation = 1; } diff --git a/influxdb_iox/src/commands/database/partition.rs b/influxdb_iox/src/commands/database/partition.rs index a786c49d20..bc8399ed68 100644 --- a/influxdb_iox/src/commands/database/partition.rs +++ b/influxdb_iox/src/commands/database/partition.rs @@ -69,8 +69,7 @@ struct Persist { /// Compact Object Store Chunks /// -/// Errors if the chunks are not yet compacted and not contiguous. If successful it returns the -/// compacted chunk or None if no data after compacted due to rows eliminated from deletes and deduplication +/// Errors if the chunks are not yet compacted and not contiguous. #[derive(Debug, StructOpt)] struct CompactObjectStoreChunks { /// The name of the database @@ -173,8 +172,7 @@ enum Command { /// Compact Object Store Chunks /// - /// Errors if the chunks are not yet compacted and not contiguous. If successful it returns the - /// compacted chunk or None if no data after compacted due to rows eliminated from deletes and deduplication + /// Errors if the chunks are not yet compacted and not contiguous. CompactObjectStoreChunks(CompactObjectStoreChunks), /// Drop partition from memory and (if persisted) from object store. @@ -251,10 +249,11 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { .map(|chunk_id| chunk_id.as_bytes().to_vec().into()) .collect(); - client + let operation = client .compact_object_store_chunks(db_name, table_name, partition_key, chunk_ids) .await?; - println!("Ok"); + + serde_json::to_writer_pretty(std::io::stdout(), &operation)?; } Command::Drop(drop_partition) => { let DropPartition { diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs index 0f1f5ed6b1..b14807d234 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs @@ -560,11 +560,15 @@ impl management_service_server::ManagementService for ManagementService { chunk_id_ids.push(chunk_id); } - db.compact_object_store_chunks(&table_name, &partition_key, chunk_id_ids) - .await + let tracker = db + .compact_object_store_chunks(&table_name, &partition_key, chunk_id_ids) .map_err(default_db_error_handler)?; - Ok(Response::new(CompactObjectStoreChunksResponse {})) + let operation = Some(super::operations::encode_tracker(tracker)?); + + Ok(Response::new(CompactObjectStoreChunksResponse { + operation, + })) } } diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index 25ade00eef..dff95cd63b 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -504,16 +504,13 @@ impl Client { table_name: impl Into + Send, partition_key: impl Into + Send, chunk_ids: Vec, - ) -> Result<(), Error> { + ) -> Result { let db_name = db_name.into(); let partition_key = partition_key.into(); let table_name = table_name.into(); - // let chunk_ids = chunk_ids - // .iter() - // .map(|chunk_id| chunk_id.as_bytes().to_vec()) - // .collect(); - self.inner + let response = self + .inner .compact_object_store_chunks(CompactObjectStoreChunksRequest { db_name, partition_key, @@ -522,6 +519,10 @@ impl Client { }) .await?; - Ok(()) + Ok(response + .into_inner() + .operation + .unwrap_field("operation")? + .try_into()?) } } diff --git a/server/src/db.rs b/server/src/db.rs index b43f58dbf4..6180532f1b 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -663,19 +663,15 @@ impl Db { } /// Compact all provided persisted chunks - pub async fn compact_object_store_chunks( + pub fn compact_object_store_chunks( self: &Arc, table_name: &str, partition_key: &str, chunk_ids: Vec, - ) -> Result>> { - if chunk_ids.is_empty() { - return Ok(None); - } - + ) -> Result> { // Use explicit scope to ensure the async generator doesn't // assume the locks have to possibly live across the `await` - let fut = { + let (tracker, fut) = { // Lock for read let partition = self.lockable_partition(table_name, partition_key)?; let partition = partition.read(); @@ -696,13 +692,15 @@ impl Db { let chunks = chunks.iter().map(|c| c.write()).collect(); // invoke compact - let (_, fut) = + let (tracker, fut) = lifecycle::compact_object_store::compact_object_store_chunks(partition, chunks) .context(LifecycleError)?; - fut + (tracker, fut) }; - fut.await.context(TaskCancelled)?.context(LifecycleError) + let _ = + tokio::spawn(async move { fut.await.context(TaskCancelled)?.context(LifecycleError) }); + Ok(tracker) } /// Persist given partition. From 7073691f0476941a5f68d14b89f23d9e626ee178 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Mon, 6 Dec 2021 22:25:33 -0500 Subject: [PATCH 3/3] test: grpc test for compaction os chunks --- .../tests/end_to_end_cases/management_api.rs | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) diff --git a/influxdb_iox/tests/end_to_end_cases/management_api.rs b/influxdb_iox/tests/end_to_end_cases/management_api.rs index f38e33c009..bfed8e74a7 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_api.rs @@ -1774,3 +1774,178 @@ async fn test_persist_partition_error() { as i32 ); } + +#[tokio::test] +async fn test_compact_os_chunks() { + use data_types::chunk_metadata::ChunkStorage; + + let fixture = ServerFixture::create_shared(ServerType::Database).await; + let mut write_client = fixture.write_client(); + let mut management_client = fixture.management_client(); + let mut operations_client = fixture.operations_client(); + + let db_name = rand_name(); + DatabaseBuilder::new(db_name.clone()) + .persist(true) + .persist_age_threshold_seconds(1_000) + .late_arrive_window_seconds(1) + .build(fixture.grpc_channel()) + .await; + + // Chunk 1 + let lp_lines = vec!["cpu,tag1=cupcakes bar=1 10", "cpu,tag1=cookies bar=2 10"]; + + let num_lines_written = write_client + .write_lp(&db_name, lp_lines.join("\n"), 0) + .await + .expect("write succeded"); + assert_eq!(num_lines_written, 2); + + wait_for_exact_chunk_states( + &fixture, + &db_name, + vec![ChunkStorage::OpenMutableBuffer], + std::time::Duration::from_secs(5), + ) + .await; + + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 1); + let partition_key = &chunks[0].partition_key; + + management_client + .persist_partition(&db_name, "cpu", &partition_key[..], true) + .await + .unwrap(); + + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 1); + assert_eq!( + chunks[0].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore + as i32 + ); + + // chunk 2 + let lp_lines = vec![ + "cpu,tag1=cookies bar=2 20", + "cpu,tag1=cookies bar=3 30", // duplicate + "cpu,tag1=cupcakes bar=2 20", + ]; + + let num_lines_written = write_client + .write_lp(&db_name, lp_lines.join("\n"), 0) + .await + .expect("write succeded"); + assert_eq!(num_lines_written, 3); + + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 2); + let partition_key = &chunks[0].partition_key; + + management_client + .persist_partition(&db_name, "cpu", &partition_key[..], true) + .await + .unwrap(); + + let mut chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + // ensure chunk in deterministic order + chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id)); + assert_eq!(chunks.len(), 2); + assert_eq!( + chunks[0].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore + as i32 + ); + assert_eq!( + chunks[1].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore + as i32 + ); + + let chunk_id_1 = chunks[0].id.clone(); + let partition_key_1 = &chunks[0].partition_key; + let chunk_id_2 = chunks[1].id.clone(); + let partition_key_2 = &chunks[1].partition_key; + assert_eq!(partition_key_1, partition_key_2); + + // unload both RUBs + management_client + .unload_partition_chunk(&db_name, "cpu", &partition_key_1[..], chunk_id_1.clone()) + .await + .unwrap(); + management_client + .unload_partition_chunk(&db_name, "cpu", &partition_key_2[..], chunk_id_2.clone()) + .await + .unwrap(); + + // verify chunk status again + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 2); + assert_eq!( + chunks[0].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 + ); + assert_eq!( + chunks[1].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 + ); + + // Compact 2 chunks + let iox_operation = management_client + .compact_object_store_chunks( + &db_name, + "cpu", + &partition_key_1[..], + vec![chunk_id_1.clone(), chunk_id_2.clone()], + ) + .await + .unwrap(); + + let operation_id = iox_operation.operation.id(); + + // ensure we got a legit job description back + match iox_operation.metadata.job { + Some(Job::CompactObjectStoreChunks(job)) => { + assert_eq!(&job.db_name, &db_name); + assert_eq!(job.partition_key.as_str(), partition_key_1); + assert_eq!(job.table_name.as_str(), "cpu"); + } + job => panic!("unexpected job returned {:#?}", job), + } + + // wait for the job to be done + operations_client + .wait_operation(operation_id, Some(std::time::Duration::from_secs(1))) + .await + .expect("failed to wait operation"); + + // verify chunks after compaction + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + assert_eq!(chunks.len(), 1); + assert_eq!( + chunks[0].storage, + generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32 + ); + let new_chunk_id = chunks[0].id.clone(); + assert_ne!(new_chunk_id, chunk_id_1); + assert_ne!(new_chunk_id, chunk_id_2); +}