diff --git a/generated_types/build.rs b/generated_types/build.rs
index 77da72038f..b9eae88b62 100644
--- a/generated_types/build.rs
+++ b/generated_types/build.rs
@@ -89,6 +89,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
         ".influxdata.iox.management.v1.PersistChunks.chunks",
         ".influxdata.iox.management.v1.WriteChunk.chunk_id",
         ".influxdata.iox.management.v1.LoadReadBufferChunk.chunk_id",
+        ".influxdata.iox.management.v1.LoadPartitionChunkRequest.chunk_id",
         ".influxdata.iox.management.v1.UnloadPartitionChunkRequest.chunk_id",
         ".influxdata.iox.preserved_catalog.v1.AddParquet.metadata",
         ".influxdata.iox.preserved_catalog.v1.ChunkAddr.chunk_id",
diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto
index 5a79a1b9ec..0a5077bdfe 100644
--- a/generated_types/protos/influxdata/iox/management/v1/service.proto
+++ b/generated_types/protos/influxdata/iox/management/v1/service.proto
@@ -67,6 +67,9 @@ service ManagementService {
   // Unload chunk from read buffer but keep it in object store
   rpc UnloadPartitionChunk(UnloadPartitionChunkRequest) returns (UnloadPartitionChunkResponse);
 
+  // Load chunk from object store into read buffer
+  rpc LoadPartitionChunk(LoadPartitionChunkRequest) returns (LoadPartitionChunkResponse);
+
   // Get server status
   rpc GetServerStatus(GetServerStatusRequest) returns (GetServerStatusResponse);
 
@@ -300,6 +303,28 @@ message UnloadPartitionChunkRequest {
 message UnloadPartitionChunkResponse {
 }
 
+// Request to load chunk from object store into read buffer
+message LoadPartitionChunkRequest {
+  // the name of the database
+  string db_name = 1;
+
+  // the partition key
+  string partition_key = 2;
+
+  // the table name
+  string table_name = 3;
+
+  // the chunk id
+  //
+  // UUID is stored as 16 bytes in big-endian order.
+  bytes chunk_id = 4;
+}
+
+message LoadPartitionChunkResponse {
+  // The operation that tracks the work for loading the chunk
+  google.longrunning.Operation operation = 1;
+}
+
 message GetServerStatusRequest {}
 message GetServerStatusResponse {
   // Server status.
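For reference, the new `chunk_id` field carries the chunk UUID as raw bytes. Below is a minimal caller-side sketch (not part of the patch) of building the request from Rust, assuming the field is generated as prost `Bytes` (which is what the `build.rs` registration above appears to configure) and that chunk IDs come from the `uuid` crate, whose byte representation is already big-endian:

```rust
use bytes::Bytes;
use generated_types::influxdata::iox::management::v1::LoadPartitionChunkRequest;
use uuid::Uuid;

// Hypothetical helper: encode a chunk UUID into the 16-byte big-endian `chunk_id` field.
fn load_request(
    db_name: &str,
    table_name: &str,
    partition_key: &str,
    chunk_id: Uuid,
) -> LoadPartitionChunkRequest {
    LoadPartitionChunkRequest {
        db_name: db_name.to_string(),
        partition_key: partition_key.to_string(),
        table_name: table_name.to_string(),
        // `Uuid::as_bytes` yields the 16 bytes in big-endian order,
        // matching the field comment in the proto above.
        chunk_id: Bytes::copy_from_slice(chunk_id.as_bytes()),
    }
}
```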
diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs
index 1448d0fd3f..a8d556b5b5 100644
--- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs
+++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs
@@ -370,6 +370,35 @@ impl management_service_server::ManagementService for ManagementService {
         Ok(Response::new(UnloadPartitionChunkResponse {}))
     }
 
+    async fn load_partition_chunk(
+        &self,
+        request: tonic::Request<LoadPartitionChunkRequest>,
+    ) -> Result<tonic::Response<LoadPartitionChunkResponse>, tonic::Status> {
+        let LoadPartitionChunkRequest {
+            db_name,
+            partition_key,
+            table_name,
+            chunk_id,
+        } = request.into_inner();
+
+        // Validate that the database name is legit
+        let db_name = DatabaseName::new(db_name).scope("db_name")?;
+        let chunk_id = ChunkId::try_from(chunk_id).scope("chunk_id")?;
+
+        let db = self
+            .server
+            .db(&db_name)
+            .map_err(default_server_error_handler)?;
+
+        let tracker = db
+            .load_read_buffer(&table_name, &partition_key, chunk_id)
+            .map_err(default_db_error_handler)?;
+
+        let operation = Some(super::operations::encode_tracker(tracker)?);
+
+        Ok(Response::new(LoadPartitionChunkResponse { operation }))
+    }
+
     async fn get_server_status(
         &self,
         _request: Request<GetServerStatusRequest>,
diff --git a/influxdb_iox/tests/end_to_end_cases/management_api.rs b/influxdb_iox/tests/end_to_end_cases/management_api.rs
index cd29034df3..e9bcbc9110 100644
--- a/influxdb_iox/tests/end_to_end_cases/management_api.rs
+++ b/influxdb_iox/tests/end_to_end_cases/management_api.rs
@@ -1410,6 +1410,7 @@ async fn test_unload_read_buffer() {
     let fixture = ServerFixture::create_shared(ServerType::Database).await;
     let mut write_client = fixture.write_client();
     let mut management_client = fixture.management_client();
+    let mut operations_client = fixture.operations_client();
 
     let db_name = rand_name();
     DatabaseBuilder::new(db_name.clone())
@@ -1443,21 +1444,59 @@ async fn test_unload_read_buffer() {
         .expect("listing chunks");
     assert_eq!(chunks.len(), 1);
     let chunk_id = chunks[0].id.clone();
+    let table_name = &chunks[0].table_name;
     let partition_key = &chunks[0].partition_key;
 
     management_client
-        .unload_partition_chunk(&db_name, "data", &partition_key[..], chunk_id)
+        .unload_partition_chunk(&db_name, "data", &partition_key[..], chunk_id.clone())
         .await
        .unwrap();
+
     let chunks = management_client
         .list_chunks(&db_name)
         .await
         .expect("listing chunks");
+
     assert_eq!(chunks.len(), 1);
     let storage: generated_types::influxdata::iox::management::v1::ChunkStorage =
         ChunkStorage::ObjectStoreOnly.into();
     let storage: i32 = storage.into();
     assert_eq!(chunks[0].storage, storage);
+
+    let iox_operation = management_client
+        .load_partition_chunk(&db_name, "data", &partition_key[..], chunk_id.clone())
+        .await
+        .unwrap();
+
+    let operation_id = iox_operation.operation.id();
+
+    // ensure we got a legit job description back
+    match iox_operation.metadata.job {
+        Some(Job::LoadReadBufferChunk(job)) => {
+            assert_eq!(job.chunk_id, chunk_id);
+            assert_eq!(&job.db_name, &db_name);
+            assert_eq!(job.partition_key.as_str(), partition_key);
+            assert_eq!(job.table_name.as_str(), table_name);
+        }
+        job => panic!("unexpected job returned {:#?}", job),
+    }
+
+    // wait for the job to be done
+    operations_client
+        .wait_operation(operation_id, Some(std::time::Duration::from_secs(1)))
+        .await
+        .expect("failed to wait operation");
+
+    let chunks = management_client
+        .list_chunks(&db_name)
+        .await
+        .expect("listing chunks");
+
+    assert_eq!(chunks.len(), 1);
+    let storage: generated_types::influxdata::iox::management::v1::ChunkStorage =
+        ChunkStorage::ReadBufferAndObjectStore.into();
+    let storage: i32 = storage.into();
+    assert_eq!(chunks[0].storage, storage);
 }
 
 #[tokio::test]
diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs
index 8ed5b77492..c4344b277f 100644
--- a/influxdb_iox_client/src/client/management.rs
+++ b/influxdb_iox_client/src/client/management.rs
@@ -284,6 +284,34 @@ pub enum UnloadPartitionChunkError {
     ServerError(tonic::Status),
 }
 
+/// Errors returned by [`Client::load_partition_chunk`]
+#[derive(Debug, Error)]
+pub enum LoadPartitionChunkError {
+    /// Database not found
+    #[error("Not found: {}", .0)]
+    NotFound(String),
+
+    /// Server indicated that it is not (yet) available
+    #[error("Server unavailable: {}", .0.message())]
+    Unavailable(tonic::Status),
+
+    /// Server indicated that the chunk is in the wrong lifecycle state for this operation
+    #[error("Cannot perform operation due to wrong chunk lifecycle state: {}", .0.message())]
+    LifecycleError(tonic::Status),
+
+    /// Client received an unexpected error from the server
+    #[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
+    ServerError(tonic::Status),
+
+    /// Response payload was invalid
+    #[error("Invalid response: {0}")]
+    InvalidResponse(#[from] FieldViolation),
+
+    /// Response contained no payload
+    #[error("Server returned an empty response")]
+    EmptyResponse,
+}
+
 /// Errors returned by [`Client::get_server_status`]
 #[derive(Debug, Error)]
 pub enum GetServerStatusError {
@@ -852,6 +880,43 @@ impl Client {
         Ok(())
     }
 
+    /// Load chunk from object store into read buffer.
+    pub async fn load_partition_chunk(
+        &mut self,
+        db_name: impl Into<String> + Send,
+        table_name: impl Into<String> + Send,
+        partition_key: impl Into<String> + Send,
+        chunk_id: Bytes,
+    ) -> Result<IoxOperation, LoadPartitionChunkError> {
+        let db_name = db_name.into();
+        let partition_key = partition_key.into();
+        let table_name = table_name.into();
+
+        let response = self
+            .inner
+            .load_partition_chunk(LoadPartitionChunkRequest {
+                db_name,
+                partition_key,
+                table_name,
+                chunk_id,
+            })
+            .await
+            .map_err(|status| match status.code() {
+                tonic::Code::NotFound => {
+                    LoadPartitionChunkError::NotFound(status.message().to_string())
+                }
+                tonic::Code::Unavailable => LoadPartitionChunkError::Unavailable(status),
+                tonic::Code::FailedPrecondition => LoadPartitionChunkError::LifecycleError(status),
+                _ => LoadPartitionChunkError::ServerError(status),
+            })?;
+
+        Ok(response
+            .into_inner()
+            .operation
+            .ok_or(LoadPartitionChunkError::EmptyResponse)?
+            .try_into()?)
+    }
+
     /// Wipe potential preserved catalog of an uninitialized database.
     pub async fn wipe_preserved_catalog(
         &mut self,
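For reference, a rough caller-side usage sketch of the new client method (not part of the patch); the connection is assumed to be established already, the chunk id is assumed to be in its 16-byte form, and errors are simply boxed:

```rust
use influxdb_iox_client::{connection::Connection, management::Client};

// Hypothetical caller: kick off a load of a chunk from object store into the read buffer.
async fn start_load(
    connection: Connection,
    db_name: &str,
    table_name: &str,
    partition_key: &str,
    chunk_id: bytes::Bytes,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::new(connection);

    // The server answers immediately with a long-running operation tracking the
    // background load job; completion can be awaited via the operations client,
    // as the end-to-end test above does.
    let iox_operation = client
        .load_partition_chunk(db_name, table_name, partition_key, chunk_id)
        .await?;
    println!("load running as operation {:?}", iox_operation.operation);

    Ok(())
}
```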
diff --git a/server/src/db.rs b/server/src/db.rs
index 5dec984a15..d9ccf57e85 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -18,6 +18,7 @@ use rand_distr::{Distribution, Poisson};
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 
 pub use ::lifecycle::{LifecycleChunk, LockableChunk, LockablePartition};
+use data_types::job::Job;
 use data_types::partition_metadata::PartitionAddr;
 use data_types::{
     chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkSummary},
@@ -49,6 +50,7 @@ use schema::selection::Selection;
 use schema::Schema;
 use time::{Time, TimeProvider};
 use trace::ctx::SpanContext;
+use tracker::TaskTracker;
 use write_buffer::core::WriteBufferReading;
 
 pub(crate) use crate::db::chunk::DbChunk;
@@ -759,6 +761,17 @@ impl Db {
         lifecycle::unload_read_buffer_chunk(chunk).context(LifecycleError)
     }
 
+    /// Load chunk from object store to read buffer
+    pub fn load_read_buffer(
+        self: &Arc<Self>,
+        table_name: &str,
+        partition_key: &str,
+        chunk_id: ChunkId,
+    ) -> Result<TaskTracker<Job>> {
+        let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
+        LockableChunk::load_read_buffer(chunk.write()).context(LifecycleError)
+    }
+
     /// Return chunk summary information for all chunks in the specified
     /// partition across all storage systems
     pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> {
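Finally, a hypothetical sketch of driving the new `Db::load_read_buffer` directly, e.g. from a test. The table name, partition key, module paths, and the `tracker.join()` call used to await completion are assumptions modelled on how other lifecycle actions in this codebase are exercised:

```rust
use std::sync::Arc;

use data_types::chunk_metadata::ChunkId;
use server::db::Db;

// Hypothetical test helper: kick off the load and wait for the background job.
async fn load_and_wait(db: &Arc<Db>, chunk_id: ChunkId) {
    // Enqueue the lifecycle action; the returned tracker follows the background job.
    // Table name and partition key are illustrative values.
    let tracker = db
        .load_read_buffer("cpu", "1970-01-01T00", chunk_id)
        .expect("chunk should be loadable from object store");

    // Wait for the load to finish before inspecting chunk storage
    // (`join` is assumed to resolve once the tracked task completes).
    tracker.join().await;
}
```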