Merge pull request #3296 from influxdata/ntran/grpc_compact_os_chunks

feat: grpc call for compact object store chunks
pull/24376/head
kodiakhq[bot] 2021-12-07 03:35:04 +00:00 committed by GitHub
commit dca1dd1d14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 333 additions and 14 deletions

View File

@ -85,6 +85,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
".influxdata.iox.management.v1.ClosePartitionChunkRequest.chunk_id",
".influxdata.iox.management.v1.CompactChunks.chunks",
".influxdata.iox.management.v1.CompactObjectStoreChunks.chunks",
".influxdata.iox.management.v1.CompactObjectStoreChunksRequest.chunk_ids",
".influxdata.iox.management.v1.DropChunk.chunk_id",
".influxdata.iox.management.v1.PersistChunks.chunks",
".influxdata.iox.management.v1.WriteChunk.chunk_id",

View File

@ -90,6 +90,11 @@ service ManagementService {
// Drop partition from memory and (if persisted) from object store.
rpc DropPartition(DropPartitionRequest) returns (DropPartitionResponse);
// Compact given object store chunks
//
// Errors if the chunks are not compacted yet and not contiguous
rpc CompactObjectStoreChunks(CompactObjectStoreChunksRequest) returns (CompactObjectStoreChunksResponse);
}
message ListDatabasesRequest {
@ -472,3 +477,24 @@ message DropPartitionRequest {
message DropPartitionResponse {
}
// Request to commpact given object store chunks
message CompactObjectStoreChunksRequest {
// the name of the database
string db_name = 1;
// the partition key
string partition_key = 2;
// the table name
string table_name = 3;
// the chunk ids
repeated bytes chunk_ids = 4;
}
message CompactObjectStoreChunksResponse {
// The operation that tracks the work for compacting object store chunks
google.longrunning.Operation operation = 1;
}

View File

@ -67,6 +67,24 @@ struct Persist {
force: bool,
}
/// Compact Object Store Chunks
///
/// Errors if the chunks are not yet compacted and not contiguous.
#[derive(Debug, StructOpt)]
struct CompactObjectStoreChunks {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
/// The table name
table_name: String,
/// The chunk ids
chunk_ids: Vec<Uuid>,
}
/// lists all chunks in this partition
#[derive(Debug, StructOpt)]
struct ListChunks {
@ -152,6 +170,11 @@ enum Command {
/// chunk that contains the persisted data.
Persist(Persist),
/// Compact Object Store Chunks
///
/// Errors if the chunks are not yet compacted and not contiguous.
CompactObjectStoreChunks(CompactObjectStoreChunks),
/// Drop partition from memory and (if persisted) from object store.
Drop(DropPartition),
@ -213,6 +236,25 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
.await?;
println!("Ok");
}
Command::CompactObjectStoreChunks(compact) => {
let CompactObjectStoreChunks {
db_name,
partition_key,
table_name,
chunk_ids,
} = compact;
let chunk_ids = chunk_ids
.iter()
.map(|chunk_id| chunk_id.as_bytes().to_vec().into())
.collect();
let operation = client
.compact_object_store_chunks(db_name, table_name, partition_key, chunk_ids)
.await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::Drop(drop_partition) => {
let DropPartition {
db_name,

View File

@ -534,6 +534,42 @@ impl management_service_server::ManagementService for ManagementService {
Ok(Response::new(DropPartitionResponse {}))
}
async fn compact_object_store_chunks(
&self,
request: Request<CompactObjectStoreChunksRequest>,
) -> Result<Response<CompactObjectStoreChunksResponse>, Status> {
let CompactObjectStoreChunksRequest {
db_name,
partition_key,
table_name,
chunk_ids,
} = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
let mut chunk_id_ids = vec![];
for chunk_id in chunk_ids {
let chunk_id = ChunkId::try_from(chunk_id).scope("chunk_id")?;
chunk_id_ids.push(chunk_id);
}
let tracker = db
.compact_object_store_chunks(&table_name, &partition_key, chunk_id_ids)
.map_err(default_db_error_handler)?;
let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(CompactObjectStoreChunksResponse {
operation,
}))
}
}
/// Returns [`DatabaseRules`] formated according to the `omit_defaults` flag. If `omit_defaults` is

View File

@ -1774,3 +1774,178 @@ async fn test_persist_partition_error() {
as i32
);
}
#[tokio::test]
async fn test_compact_os_chunks() {
use data_types::chunk_metadata::ChunkStorage;
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut write_client = fixture.write_client();
let mut management_client = fixture.management_client();
let mut operations_client = fixture.operations_client();
let db_name = rand_name();
DatabaseBuilder::new(db_name.clone())
.persist(true)
.persist_age_threshold_seconds(1_000)
.late_arrive_window_seconds(1)
.build(fixture.grpc_channel())
.await;
// Chunk 1
let lp_lines = vec!["cpu,tag1=cupcakes bar=1 10", "cpu,tag1=cookies bar=2 10"];
let num_lines_written = write_client
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
assert_eq!(num_lines_written, 2);
wait_for_exact_chunk_states(
&fixture,
&db_name,
vec![ChunkStorage::OpenMutableBuffer],
std::time::Duration::from_secs(5),
)
.await;
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
let partition_key = &chunks[0].partition_key;
management_client
.persist_partition(&db_name, "cpu", &partition_key[..], true)
.await
.unwrap();
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
assert_eq!(
chunks[0].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
as i32
);
// chunk 2
let lp_lines = vec![
"cpu,tag1=cookies bar=2 20",
"cpu,tag1=cookies bar=3 30", // duplicate
"cpu,tag1=cupcakes bar=2 20",
];
let num_lines_written = write_client
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
assert_eq!(num_lines_written, 3);
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 2);
let partition_key = &chunks[0].partition_key;
management_client
.persist_partition(&db_name, "cpu", &partition_key[..], true)
.await
.unwrap();
let mut chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
// ensure chunk in deterministic order
chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id));
assert_eq!(chunks.len(), 2);
assert_eq!(
chunks[0].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
as i32
);
assert_eq!(
chunks[1].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
as i32
);
let chunk_id_1 = chunks[0].id.clone();
let partition_key_1 = &chunks[0].partition_key;
let chunk_id_2 = chunks[1].id.clone();
let partition_key_2 = &chunks[1].partition_key;
assert_eq!(partition_key_1, partition_key_2);
// unload both RUBs
management_client
.unload_partition_chunk(&db_name, "cpu", &partition_key_1[..], chunk_id_1.clone())
.await
.unwrap();
management_client
.unload_partition_chunk(&db_name, "cpu", &partition_key_2[..], chunk_id_2.clone())
.await
.unwrap();
// verify chunk status again
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 2);
assert_eq!(
chunks[0].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
);
assert_eq!(
chunks[1].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
);
// Compact 2 chunks
let iox_operation = management_client
.compact_object_store_chunks(
&db_name,
"cpu",
&partition_key_1[..],
vec![chunk_id_1.clone(), chunk_id_2.clone()],
)
.await
.unwrap();
let operation_id = iox_operation.operation.id();
// ensure we got a legit job description back
match iox_operation.metadata.job {
Some(Job::CompactObjectStoreChunks(job)) => {
assert_eq!(&job.db_name, &db_name);
assert_eq!(job.partition_key.as_str(), partition_key_1);
assert_eq!(job.table_name.as_str(), "cpu");
}
job => panic!("unexpected job returned {:#?}", job),
}
// wait for the job to be done
operations_client
.wait_operation(operation_id, Some(std::time::Duration::from_secs(1)))
.await
.expect("failed to wait operation");
// verify chunks after compaction
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
assert_eq!(
chunks[0].storage,
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
);
let new_chunk_id = chunks[0].id.clone();
assert_ne!(new_chunk_id, chunk_id_1);
assert_ne!(new_chunk_id, chunk_id_2);
}

View File

@ -503,4 +503,35 @@ impl Client {
Ok(())
}
/// Compact given object store chunks (db, table, partition, chunks)
///
/// Error if the chunks are not yet compacted and not contiguous
pub async fn compact_object_store_chunks(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
chunk_ids: Vec<Bytes>,
) -> Result<IoxOperation, Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
let response = self
.inner
.compact_object_store_chunks(CompactObjectStoreChunksRequest {
db_name,
partition_key,
table_name,
chunk_ids,
})
.await?;
Ok(response
.into_inner()
.operation
.unwrap_field("operation")?
.try_into()?)
}
}

View File

@ -663,36 +663,44 @@ impl Db {
}
/// Compact all provided persisted chunks
pub async fn compact_object_store_chunks(
pub fn compact_object_store_chunks(
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
chunk_ids: Vec<ChunkId>,
) -> Result<Option<Arc<DbChunk>>> {
if chunk_ids.is_empty() {
return Ok(None);
}
) -> Result<TaskTracker<Job>> {
// Use explicit scope to ensure the async generator doesn't
// assume the locks have to possibly live across the `await`
let fut = {
let (tracker, fut) = {
// Lock for read
let partition = self.lockable_partition(table_name, partition_key)?;
let partition = partition.read();
let mut chunks = vec![];
for chunk_id in chunk_ids {
let chunk = LockablePartition::chunk(&partition, chunk_id).ok_or(
catalog::Error::ChunkNotFound {
chunk_id,
partition: partition_key.to_string(),
table: table_name.to_string(),
},
)?;
chunks.push(chunk);
}
// todo: set these chunks
let chunks = vec![];
// Lock partition for write
// Lock for write
let partition = partition.upgrade();
let chunks = chunks.iter().map(|c| c.write()).collect();
// invoke compact
let (_, fut) =
let (tracker, fut) =
lifecycle::compact_object_store::compact_object_store_chunks(partition, chunks)
.context(LifecycleError)?;
fut
(tracker, fut)
};
fut.await.context(TaskCancelled)?.context(LifecycleError)
let _ =
tokio::spawn(async move { fut.await.context(TaskCancelled)?.context(LifecycleError) });
Ok(tracker)
}
/// Persist given partition.