feat: load RUB from object store (#3224) (#3250)

pull/24376/head
Raphael Taylor-Davies 2021-11-30 14:39:52 +00:00 committed by GitHub
parent 20da62eb87
commit 1e515a1dec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 173 additions and 1 deletion

View File

@ -89,6 +89,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
".influxdata.iox.management.v1.PersistChunks.chunks",
".influxdata.iox.management.v1.WriteChunk.chunk_id",
".influxdata.iox.management.v1.LoadReadBufferChunk.chunk_id",
".influxdata.iox.management.v1.LoadPartitionChunkRequest.chunk_id",
".influxdata.iox.management.v1.UnloadPartitionChunkRequest.chunk_id",
".influxdata.iox.preserved_catalog.v1.AddParquet.metadata",
".influxdata.iox.preserved_catalog.v1.ChunkAddr.chunk_id",

View File

@ -67,6 +67,9 @@ service ManagementService {
// Unload chunk from read buffer but keep it in object store
rpc UnloadPartitionChunk(UnloadPartitionChunkRequest) returns (UnloadPartitionChunkResponse);
// Load chunk from object store into read buffer
rpc LoadPartitionChunk(LoadPartitionChunkRequest) returns (LoadPartitionChunkResponse);
// Get server status
rpc GetServerStatus(GetServerStatusRequest) returns (GetServerStatusResponse);
@ -300,6 +303,28 @@ message UnloadPartitionChunkRequest {
message UnloadPartitionChunkResponse {
}
// Request to load a chunk from object store into the read buffer
message LoadPartitionChunkRequest {
// the name of the database
string db_name = 1;
// the partition key
string partition_key = 2;
// the table name
string table_name = 3;
// the chunk id
//
// UUID is stored as 16 bytes in big-endian order.
bytes chunk_id = 4;
}
// Response to a LoadPartitionChunkRequest
message LoadPartitionChunkResponse {
// The operation that tracks the work for loading the chunk
google.longrunning.Operation operation = 1;
}
message GetServerStatusRequest {}
message GetServerStatusResponse {
// Server status.

View File

@ -370,6 +370,35 @@ impl management_service_server::ManagementService for ManagementService {
Ok(Response::new(UnloadPartitionChunkResponse {}))
}
/// Handle a `LoadPartitionChunk` gRPC request: validate the addressed
/// database and chunk, then start the background job that loads the chunk
/// from object store into the read buffer, returning its operation handle.
async fn load_partition_chunk(
    &self,
    request: tonic::Request<LoadPartitionChunkRequest>,
) -> Result<tonic::Response<LoadPartitionChunkResponse>, tonic::Status> {
    let LoadPartitionChunkRequest {
        db_name,
        partition_key,
        table_name,
        chunk_id,
    } = request.into_inner();

    // Validate request fields before touching any server state.
    let db_name = DatabaseName::new(db_name).scope("db_name")?;
    let chunk_id = ChunkId::try_from(chunk_id).scope("chunk_id")?;

    let db = self.server.db(&db_name).map_err(default_server_error_handler)?;

    // Kick off the load job; the returned tracker identifies the
    // long-running operation for the client to poll.
    let tracker = db
        .load_read_buffer(&table_name, &partition_key, chunk_id)
        .map_err(default_db_error_handler)?;

    Ok(Response::new(LoadPartitionChunkResponse {
        operation: Some(super::operations::encode_tracker(tracker)?),
    }))
}
async fn get_server_status(
&self,
_request: Request<GetServerStatusRequest>,

View File

@ -1410,6 +1410,7 @@ async fn test_unload_read_buffer() {
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut write_client = fixture.write_client();
let mut management_client = fixture.management_client();
let mut operations_client = fixture.operations_client();
let db_name = rand_name();
DatabaseBuilder::new(db_name.clone())
@ -1443,21 +1444,59 @@ async fn test_unload_read_buffer() {
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
let chunk_id = chunks[0].id.clone();
let table_name = &chunks[0].table_name;
let partition_key = &chunks[0].partition_key;
management_client
.unload_partition_chunk(&db_name, "data", &partition_key[..], chunk_id)
.unload_partition_chunk(&db_name, "data", &partition_key[..], chunk_id.clone())
.await
.unwrap();
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
let storage: generated_types::influxdata::iox::management::v1::ChunkStorage =
ChunkStorage::ObjectStoreOnly.into();
let storage: i32 = storage.into();
assert_eq!(chunks[0].storage, storage);
let iox_operation = management_client
.load_partition_chunk(&db_name, "data", &partition_key[..], chunk_id.clone())
.await
.unwrap();
let operation_id = iox_operation.operation.id();
// ensure we got a legit job description back
match iox_operation.metadata.job {
Some(Job::LoadReadBufferChunk(job)) => {
assert_eq!(job.chunk_id, chunk_id);
assert_eq!(&job.db_name, &db_name);
assert_eq!(job.partition_key.as_str(), partition_key);
assert_eq!(job.table_name.as_str(), table_name);
}
job => panic!("unexpected job returned {:#?}", job),
}
// wait for the job to be done
operations_client
.wait_operation(operation_id, Some(std::time::Duration::from_secs(1)))
.await
.expect("failed to wait operation");
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
let storage: generated_types::influxdata::iox::management::v1::ChunkStorage =
ChunkStorage::ReadBufferAndObjectStore.into();
let storage: i32 = storage.into();
assert_eq!(chunks[0].storage, storage);
}
#[tokio::test]

View File

@ -284,6 +284,34 @@ pub enum UnloadPartitionChunkError {
ServerError(tonic::Status),
}
/// Errors returned by [`Client::load_partition_chunk`]
#[derive(Debug, Error)]
pub enum LoadPartitionChunkError {
/// Database not found
#[error("Not found: {}", .0)]
NotFound(String),
/// Server indicated that it is not (yet) available
#[error("Server unavailable: {}", .0.message())]
Unavailable(tonic::Status),
/// Chunk is not in a lifecycle state that permits the requested operation
#[error("Cannot perform operation due to wrong chunk lifecycle state: {}", .0.message())]
LifecycleError(tonic::Status),
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
/// Response payload was invalid
#[error("Invalid response: {0}")]
InvalidResponse(#[from] FieldViolation),
/// Response contained no payload
#[error("Server returned an empty response")]
EmptyResponse,
}
/// Errors returned by [`Client::get_server_status`]
#[derive(Debug, Error)]
pub enum GetServerStatusError {
@ -852,6 +880,43 @@ impl Client {
Ok(())
}
/// Load chunk from object store into the read buffer.
///
/// `chunk_id` is the chunk's UUID encoded as 16 bytes in big-endian order.
///
/// On success returns the [`IoxOperation`] tracking the server-side load
/// job; errors are mapped from the gRPC status code into
/// [`LoadPartitionChunkError`].
pub async fn load_partition_chunk(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
chunk_id: Bytes,
) -> Result<IoxOperation, LoadPartitionChunkError> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
// Issue the RPC, translating tonic status codes into the
// strongly-typed error variants callers can match on.
let response = self
.inner
.load_partition_chunk(LoadPartitionChunkRequest {
db_name,
partition_key,
table_name,
chunk_id,
})
.await
.map_err(|status| match status.code() {
tonic::Code::NotFound => {
LoadPartitionChunkError::NotFound(status.message().to_string())
}
tonic::Code::Unavailable => LoadPartitionChunkError::Unavailable(status),
tonic::Code::FailedPrecondition => LoadPartitionChunkError::LifecycleError(status),
_ => LoadPartitionChunkError::ServerError(status),
})?;
// A successful response must carry the long-running operation handle;
// an empty payload is a protocol violation.
Ok(response
.into_inner()
.operation
.ok_or(LoadPartitionChunkError::EmptyResponse)?
.try_into()?)
}
/// Wipe potential preserved catalog of an uninitialized database.
pub async fn wipe_preserved_catalog(
&mut self,

View File

@ -18,6 +18,7 @@ use rand_distr::{Distribution, Poisson};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
pub use ::lifecycle::{LifecycleChunk, LockableChunk, LockablePartition};
use data_types::job::Job;
use data_types::partition_metadata::PartitionAddr;
use data_types::{
chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkSummary},
@ -49,6 +50,7 @@ use schema::selection::Selection;
use schema::Schema;
use time::{Time, TimeProvider};
use trace::ctx::SpanContext;
use tracker::TaskTracker;
use write_buffer::core::WriteBufferReading;
pub(crate) use crate::db::chunk::DbChunk;
@ -759,6 +761,17 @@ impl Db {
lifecycle::unload_read_buffer_chunk(chunk).context(LifecycleError)
}
/// Load chunk from object store to read buffer
///
/// Looks up the chunk addressed by `table_name`, `partition_key` and
/// `chunk_id`, then starts the lifecycle job that performs the load,
/// returning the [`TaskTracker`] for that background job.
///
/// Fails if the chunk lookup fails or if the lifecycle action is
/// rejected for the chunk's current state.
pub fn load_read_buffer(
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
chunk_id: ChunkId,
) -> Result<TaskTracker<Job>> {
let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
// Take the chunk's write lock and delegate to the lifecycle layer.
LockableChunk::load_read_buffer(chunk.write()).context(LifecycleError)
}
/// Return chunk summary information for all chunks in the specified
/// partition across all storage systems
pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> {