Merge pull request #3345 from influxdata/ntran/compact_os_partition

feat: compact all object store chunks of a given partition
pull/24376/head
kodiakhq[bot] 2021-12-09 20:50:17 +00:00 committed by GitHub
commit 7a56b4847a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 365 additions and 141 deletions

View File

@@ -42,6 +42,7 @@ message OperationMetadata {
CompactObjectStoreChunks compact_object_store_chunks = 18;
LoadReadBufferChunk load_read_buffer_chunk = 19;
RebuildPreservedCatalog rebuild_preserved_catalog = 20;
CompactObjectStorePartition compact_object_store_partition = 21;
}
}
@@ -110,6 +111,18 @@ message CompactObjectStoreChunks {
repeated bytes chunks = 4;
}
// Compact OS chunks of a partition into a single chunk
message CompactObjectStorePartition {
// name of the database
string db_name = 1;
// partition key
string partition_key = 2;
// table name
string table_name = 3;
}
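For reference, a minimal sketch of how this new metadata message could be built from Rust through the prost-generated types (the module path follows the generated_types paths used elsewhere in this PR; the field values are hypothetical):

use generated_types::influxdata::iox::management::v1::CompactObjectStorePartition;

// Job metadata recording which partition is being compacted (values are examples).
let job = CompactObjectStorePartition {
    db_name: "my_db".to_string(),
    partition_key: "cpu".to_string(),
    table_name: "cpu".to_string(),
};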
// Split and write chunks to object store
message PersistChunks {

View File

@@ -95,6 +95,10 @@ service ManagementService {
//
// Errors if the chunks are not compacted yet and not contiguous
rpc CompactObjectStoreChunks(CompactObjectStoreChunksRequest) returns (CompactObjectStoreChunksResponse);
// Compact all object store chunks of a given partition
//
rpc CompactObjectStorePartition(CompactObjectStorePartitionRequest) returns (CompactObjectStorePartitionResponse);
}
message ListDatabasesRequest {
@@ -505,3 +509,19 @@ message CompactObjectStoreChunksResponse {
google.longrunning.Operation operation = 1;
}
// Request to compact all object store chunks of a given partition
message CompactObjectStorePartitionRequest {
// the name of the database
string db_name = 1;
// the partition key
string partition_key = 2;
// the table name
string table_name = 3;
}
message CompactObjectStorePartitionResponse {
// The operation that tracks the work for compacting the partition's object store chunks
google.longrunning.Operation operation = 1;
}

View File

@@ -20,6 +20,10 @@ impl management::operation_metadata::Job {
db_name,
..
}) => db_name,
Self::CompactObjectStorePartition(management::CompactObjectStorePartition {
db_name,
..
}) => db_name,
}
}
}

View File

@@ -85,6 +85,19 @@ struct CompactObjectStoreChunks {
chunk_ids: Vec<Uuid>,
}
/// Compact all object store chunks of a partition
#[derive(Debug, StructOpt)]
struct CompactObjectStorePartition {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
/// The table name
table_name: String,
}
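With StructOpt's default kebab-case naming (matching the invocation used in the CLI test later in this PR), the subcommand surfaces as:

influxdb_iox database partition compact-object-store-partition <DB_NAME> <PARTITION_KEY> <TABLE_NAME>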
/// lists all chunks in this partition
#[derive(Debug, StructOpt)]
struct ListChunks {
@@ -175,6 +188,9 @@ enum Command {
/// Errors if the chunks are not yet compacted and not contiguous.
CompactObjectStoreChunks(CompactObjectStoreChunks),
/// Compact all object store chunks of a given partition
CompactObjectStorePartition(CompactObjectStorePartition),
/// Drop partition from memory and (if persisted) from object store.
Drop(DropPartition),
@@ -255,6 +271,19 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::CompactObjectStorePartition(compact) => {
let CompactObjectStorePartition {
db_name,
partition_key,
table_name,
} = compact;
let operation = client
.compact_object_store_partition(db_name, table_name, partition_key)
.await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::Drop(drop_partition) => {
let DropPartition {
db_name,

View File

@@ -535,6 +535,7 @@ impl management_service_server::ManagementService for ManagementService {
Ok(Response::new(DropPartitionResponse {}))
}
/// Compact all given object store chunks
async fn compact_object_store_chunks(
&self,
request: Request<CompactObjectStoreChunksRequest>,
@@ -570,6 +571,36 @@ impl management_service_server::ManagementService for ManagementService {
operation,
}))
}
/// Compact all object store chunks of the given partition
async fn compact_object_store_partition(
&self,
request: Request<CompactObjectStorePartitionRequest>,
) -> Result<Response<CompactObjectStorePartitionResponse>, Status> {
let CompactObjectStorePartitionRequest {
db_name,
partition_key,
table_name,
} = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
let tracker = db
.compact_object_store_partition(&table_name, &partition_key)
.map_err(default_db_error_handler)?;
let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(CompactObjectStorePartitionResponse {
operation,
}))
}
}
/// Returns [`DatabaseRules`] formatted according to the `omit_defaults` flag. If `omit_defaults` is

View File

@@ -1,10 +1,14 @@
use crate::{
common::server_fixture::{ServerFixture, ServerType, TestConfig, DEFAULT_SERVER_ID},
end_to_end_cases::scenario::{
create_readable_database, create_two_partition_database, create_unreadable_database,
fixture_broken_catalog, rand_name, wait_for_exact_chunk_states, DatabaseBuilder,
end_to_end_cases::{
management_cli::setup_load_and_persist_two_partition_chunks,
scenario::{
create_readable_database, create_two_partition_database, create_unreadable_database,
fixture_broken_catalog, rand_name, wait_for_exact_chunk_states, DatabaseBuilder,
},
},
};
use bytes::Bytes;
use data_types::chunk_metadata::ChunkId;
use generated_types::google::protobuf::{Duration, Empty};
use influxdb_iox_client::{
@@ -1741,143 +1745,24 @@ async fn test_persist_partition_error() {
#[tokio::test]
async fn test_compact_os_chunks() {
use data_types::chunk_metadata::ChunkStorage;
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut write_client = fixture.write_client();
// Make 2 persisted chunks for a partition
let (fixture, db_name, _addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
assert!(chunk_ids.len() > 1);
let mut management_client = fixture.management_client();
let mut operations_client = fixture.operations_client();
let db_name = rand_name();
DatabaseBuilder::new(db_name.clone())
.persist(true)
.persist_age_threshold_seconds(1_000)
.late_arrive_window_seconds(1)
.build(fixture.grpc_channel())
.await;
let c_ids: Vec<Bytes> = chunk_ids
.iter()
.map(|id| {
let id_uuid = Uuid::parse_str(id).unwrap();
id_uuid.as_bytes().to_vec().into()
})
.collect();
// Chunk 1
let lp_lines = vec!["cpu,tag1=cupcakes bar=1 10", "cpu,tag1=cookies bar=2 10"];
let num_lines_written = write_client
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
assert_eq!(num_lines_written, 2);
wait_for_exact_chunk_states(
&fixture,
&db_name,
vec![ChunkStorage::OpenMutableBuffer],
std::time::Duration::from_secs(5),
)
.await;
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
let partition_key = &chunks[0].partition_key;
management_client
.persist_partition(&db_name, "cpu", &partition_key[..], true)
.await
.unwrap();
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
assert_eq!(
chunks[0].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
as i32
);
// chunk 2
let lp_lines = vec![
"cpu,tag1=cookies bar=2 20",
"cpu,tag1=cookies bar=3 30", // duplicate
"cpu,tag1=cupcakes bar=2 20",
];
let num_lines_written = write_client
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
assert_eq!(num_lines_written, 3);
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 2);
let partition_key = &chunks[0].partition_key;
management_client
.persist_partition(&db_name, "cpu", &partition_key[..], true)
.await
.unwrap();
let mut chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
// ensure chunks are in deterministic order
chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id));
assert_eq!(chunks.len(), 2);
assert_eq!(
chunks[0].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
as i32
);
assert_eq!(
chunks[1].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
as i32
);
let chunk_id_1 = chunks[0].id.clone();
let partition_key_1 = &chunks[0].partition_key;
let chunk_id_2 = chunks[1].id.clone();
let partition_key_2 = &chunks[1].partition_key;
assert_eq!(partition_key_1, partition_key_2);
// unload both RUBs
management_client
.unload_partition_chunk(&db_name, "cpu", &partition_key_1[..], chunk_id_1.clone())
.await
.unwrap();
management_client
.unload_partition_chunk(&db_name, "cpu", &partition_key_2[..], chunk_id_2.clone())
.await
.unwrap();
// verify chunk status again
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 2);
assert_eq!(
chunks[0].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
);
assert_eq!(
chunks[1].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
);
// Compact 2 chunks
// Compact both OS chunks of the partition
// note that both the partition key and the table name are "cpu" in the setup
let iox_operation = management_client
.compact_object_store_chunks(
&db_name,
"cpu",
&partition_key_1[..],
vec![chunk_id_1.clone(), chunk_id_2.clone()],
)
.compact_object_store_chunks(&db_name, "cpu", "cpu", c_ids.clone())
.await
.unwrap();
@@ -1887,7 +1772,7 @@ async fn test_compact_os_chunks() {
match iox_operation.metadata.job {
Some(Job::CompactObjectStoreChunks(job)) => {
assert_eq!(&job.db_name, &db_name);
assert_eq!(job.partition_key.as_str(), partition_key_1);
assert_eq!(job.partition_key.as_str(), "cpu");
assert_eq!(job.table_name.as_str(), "cpu");
}
job => panic!("unexpected job returned {:#?}", job),
@@ -1910,6 +1795,55 @@ async fn test_compact_os_chunks() {
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
);
let new_chunk_id = chunks[0].id.clone();
assert_ne!(new_chunk_id, chunk_id_1);
assert_ne!(new_chunk_id, chunk_id_2);
assert_ne!(new_chunk_id, c_ids[0]);
assert_ne!(new_chunk_id, c_ids[1]);
}
#[tokio::test]
async fn test_compact_os_partition() {
// Make 2 persisted chunks for a partition
let (fixture, db_name, _addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
let mut management_client = fixture.management_client();
let mut operations_client = fixture.operations_client();
// Compact both OS chunks of the partition
// note that both the partition key and the table name are "cpu" in the setup
let iox_operation = management_client
.compact_object_store_partition(&db_name, "cpu", "cpu")
.await
.unwrap();
let operation_id = iox_operation.operation.id();
// ensure we got a legit job description back
// note that since compact_object_store_partition invokes compact_object_store_chunks,
// its job is recorded as CompactObjectStoreChunks
match iox_operation.metadata.job {
Some(Job::CompactObjectStoreChunks(job)) => {
assert_eq!(&job.db_name, &db_name);
assert_eq!(job.partition_key.as_str(), "cpu");
assert_eq!(job.table_name.as_str(), "cpu");
}
job => panic!("unexpected job returned {:#?}", job),
}
// wait for the job to be done
operations_client
.wait_operation(operation_id, Some(std::time::Duration::from_secs(1)))
.await
.expect("failed to wait operation");
// verify chunks after compaction
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
assert_eq!(
chunks[0].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
);
let new_chunk_id = chunks[0].id.clone();
assert_ne!(new_chunk_id, chunk_ids[0]);
assert_ne!(new_chunk_id, chunk_ids[1]);
}

View File

@@ -3,7 +3,7 @@ use crate::{
common::server_fixture::{ServerFixture, ServerType},
end_to_end_cases::scenario::{
fixture_broken_catalog, fixture_replay_broken, list_chunks, wait_for_exact_chunk_states,
DatabaseBuilder,
wait_for_operations_to_complete, DatabaseBuilder,
},
};
use assert_cmd::Command;
@@ -1547,3 +1547,138 @@ async fn test_persist_partition_error() {
"Cannot persist partition because it cannot be flushed at the moment",
));
}
#[tokio::test]
async fn test_compact_os_partition() {
// Make 2 persisted chunks for a partition
let (fixture, db_name, addr, _chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
// Compact the partition which will compact those 2 chunks
let iox_operation: IoxOperation = serde_json::from_slice(
&Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("partition")
.arg("compact-object-store-partition")
.arg(&db_name)
.arg("cpu") // partition key
.arg("cpu") // table name
//.arg(chunk_ids)
.arg("--host")
.arg(addr)
.assert()
.success()
.get_output()
.stdout,
)
.expect("Expected JSON output");
// Ensure we got a legit job description back
match iox_operation.metadata.job {
Some(Job::CompactObjectStoreChunks(job)) => {
assert_eq!(job.chunks.len(), 2);
assert_eq!(&job.db_name, &db_name);
assert_eq!(job.partition_key.as_str(), "cpu");
assert_eq!(job.table_name.as_str(), "cpu");
}
job => panic!("unexpected job returned {:#?}", job),
}
// Wait for the compaction to complete
wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await;
// Verify the DB now has only one OS-only chunk
let chunks = list_chunks(&fixture, &db_name).await;
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly);
}
#[tokio::test]
async fn test_compact_os_chunks() {
// Make 2 persisted chunks for a partition
let (fixture, db_name, addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
// Compact the partition which will compact those 2 chunks
let iox_operation: IoxOperation = serde_json::from_slice(
&Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("partition")
.arg("compact-object-store-chunks")
.arg(&db_name)
.arg("cpu") // partition key
.arg("cpu") // table name
.arg(chunk_ids[0].clone())
.arg(chunk_ids[1].clone())
.arg("--host")
.arg(addr)
.assert()
.success()
.get_output()
.stdout,
)
.expect("Expected JSON output");
// Ensure we got a legit job description back
match iox_operation.metadata.job {
Some(Job::CompactObjectStoreChunks(job)) => {
assert_eq!(job.chunks.len(), 2);
assert_eq!(&job.db_name, &db_name);
assert_eq!(job.partition_key.as_str(), "cpu");
assert_eq!(job.table_name.as_str(), "cpu");
}
job => panic!("unexpected job returned {:#?}", job),
}
// Wait for the compaction to complete
wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await;
// Verify the DB now has only one OS-only chunk
let chunks = list_chunks(&fixture, &db_name).await;
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly);
}
pub async fn setup_load_and_persist_two_partition_chunks(
) -> (Arc<ServerFixture>, String, String, Vec<String>) {
let fixture = Arc::from(ServerFixture::create_shared(ServerType::Database).await);
let addr = fixture.grpc_base();
let db_name = rand_name();
DatabaseBuilder::new(db_name.clone())
.persist(true)
.persist_age_threshold_seconds(1)
.late_arrive_window_seconds(1)
.build(fixture.grpc_channel())
.await;
// Load first chunk and wait for it to get persisted
let lp_data = vec!["cpu,region=west user=23.2 10"];
load_lp(addr, &db_name, lp_data);
wait_for_exact_chunk_states(
&fixture,
&db_name,
vec![ChunkStorage::ReadBufferAndObjectStore],
std::time::Duration::from_secs(10),
)
.await;
// Load second chunk and wait for it to get persisted, too
let lp_data = vec!["cpu,region=east user=79 30"];
load_lp(addr, &db_name, lp_data);
let chunks = wait_for_exact_chunk_states(
&fixture,
&db_name,
vec![
ChunkStorage::ReadBufferAndObjectStore,
ChunkStorage::ReadBufferAndObjectStore,
],
std::time::Duration::from_secs(10),
)
.await;
// collect chunk ids
let chunk_ids: Vec<_> = chunks.iter().map(|c| c.id.get().to_string()).collect();
(Arc::clone(&fixture), db_name, String::from(addr), chunk_ids)
}

View File

@@ -535,4 +535,31 @@ impl Client {
.unwrap_field("operation")?
.try_into()?)
}
/// Compact all object store chunks of a given partition
pub async fn compact_object_store_partition(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
) -> Result<IoxOperation, Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
let response = self
.inner
.compact_object_store_partition(CompactObjectStorePartitionRequest {
db_name,
partition_key,
table_name,
})
.await?;
Ok(response
.into_inner()
.operation
.unwrap_field("operation")?
.try_into()?)
}
}
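A hedged usage sketch of the new client method (connection setup and error handling elided; note the argument order is database, table, then partition key):

// Hypothetical caller; `connection` is an already-established gRPC connection.
let mut client = influxdb_iox_client::management::Client::new(connection);
let operation = client
    .compact_object_store_partition("my_db", "cpu", "cpu") // db_name, table_name, partition_key
    .await?;
println!("compaction job started: {:?}", operation.metadata.job);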

View File

@@ -675,6 +675,37 @@ impl Db {
fut.await.context(TaskCancelled)?.context(LifecycleError)
}
/// Compact all persisted chunks in this partition.
/// Returns an error if the persisted chunks are not contiguous, which means
/// there are not-yet-persisted chunks in between them.
pub fn compact_object_store_partition(
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
) -> Result<TaskTracker<Job>> {
// acquire partition read lock to get OS chunk ids
let partition = self.lockable_partition(table_name, partition_key)?;
let partition = partition.read();
let chunks = partition.chunks();
// Get all OS chunk IDs
let mut chunk_ids = vec![];
for chunk in chunks {
let chunk = chunk.read();
if chunk.is_persisted() {
chunk_ids.push(chunk.id());
}
}
// drop partition lock
partition.into_data();
// Compact all the OS chunks
// An error is returned if the OS chunks are not contiguous, which means
// a chunk in between them is not yet persisted
self.compact_object_store_chunks(table_name, partition_key, chunk_ids)
}
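A brief sketch of driving this method directly on a `Db` (the `join` call on the returned tracker is an assumption about the TaskTracker API; table and partition names are examples):

// Hypothetical: compact every persisted chunk of the "cpu"/"cpu" partition
// and wait for the background job to complete.
let tracker = db.compact_object_store_partition("cpu", "cpu")?;
tracker.join().await;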
/// Compact all provided persisted chunks
pub fn compact_object_store_chunks(
self: &Arc<Self>,
@@ -2135,7 +2166,6 @@ mod tests {
load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
.unwrap()
.unwrap();

View File

@@ -690,7 +690,8 @@ mod tests {
let partition = partition.upgrade();
let chunk1 = chunks[0].write();
let chunk2 = chunks[1].write();
let _compacted_chunk = compact_object_store_chunks(partition, vec![chunk1, chunk2])
// Provide the chunks in reverse order to verify that compaction handles them correctly
let _compacted_chunk = compact_object_store_chunks(partition, vec![chunk2, chunk1])
.unwrap()
.1
.await