feat: add API+CLI to unload chunks

Closes #1919.
pull/24376/head
Marco Neumann 2021-07-12 14:06:01 +02:00
parent 3dae69ede9
commit 3d008f4d27
9 changed files with 372 additions and 90 deletions

View File

@ -62,6 +62,9 @@ service ManagementService {
// Close a chunk and move it to the read buffer
rpc ClosePartitionChunk(ClosePartitionChunkRequest) returns (ClosePartitionChunkResponse);
// Unload chunk from read buffer but keep it in object store
rpc UnloadPartitionChunk(UnloadPartitionChunkRequest) returns (UnloadPartitionChunkResponse);
// Get server status
rpc GetServerStatus(GetServerStatusRequest) returns (GetServerStatusResponse);
@ -242,6 +245,24 @@ message ClosePartitionChunkResponse {
google.longrunning.Operation operation = 1;
}
// Request to unload chunk from read buffer but keep it in object store
message UnloadPartitionChunkRequest {
// the name of the database
string db_name = 1;
// the partition key
string partition_key = 2;
// the table name
//
// NOTE(review): tag is 4, not 3 — presumably `table_name` was added after
// `chunk_id` and proto field tags must never be renumbered once published;
// confirm against the sibling request messages in this service.
string table_name = 4;
// the chunk id
uint32 chunk_id = 3;
}
// Response for UnloadPartitionChunkRequest — intentionally empty: success
// is conveyed by the gRPC status alone (no long-running operation handle,
// unlike ClosePartitionChunkResponse).
message UnloadPartitionChunkResponse {
}
message GetServerStatusRequest {}
message GetServerStatusResponse {
// Server status.

View File

@ -249,6 +249,26 @@ pub enum ClosePartitionChunkError {
ServerError(tonic::Status),
}
/// Errors returned by [`Client::unload_partition_chunk`]
#[derive(Debug, Error)]
pub enum UnloadPartitionChunkError {
    /// Database not found
    #[error("Not found: {}", .0)]
    NotFound(String),

    /// Server indicated that it is not (yet) available
    #[error("Server unavailable: {}", .0.message())]
    Unavailable(tonic::Status),

    /// Server indicated that the chunk is in the wrong lifecycle state for
    /// this operation (mapped from `FailedPrecondition`; the previous doc
    /// comment here was a copy-paste duplicate of `Unavailable`'s)
    #[error("Cannot perform operation due to wrong chunk lifecycle state: {}", .0.message())]
    LifecycleError(tonic::Status),

    /// Client received an unexpected error from the server
    #[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
    ServerError(tonic::Status),
}
/// Errors returned by [`Client::get_server_status`]
#[derive(Debug, Error)]
pub enum GetServerStatusError {
@ -669,6 +689,40 @@ impl Client {
.ok_or(ClosePartitionChunkError::EmptyResponse)?)
}
/// Unload chunk from read buffer but keep it in object store.
///
/// Translates the gRPC status codes returned by the server into the
/// dedicated [`UnloadPartitionChunkError`] variants.
pub async fn unload_partition_chunk(
    &mut self,
    db_name: impl Into<String> + Send,
    table_name: impl Into<String> + Send,
    partition_key: impl Into<String> + Send,
    chunk_id: u32,
) -> Result<(), UnloadPartitionChunkError> {
    let request = UnloadPartitionChunkRequest {
        db_name: db_name.into(),
        partition_key: partition_key.into(),
        table_name: table_name.into(),
        chunk_id,
    };

    match self.inner.unload_partition_chunk(request).await {
        Ok(_) => Ok(()),
        Err(status) => Err(match status.code() {
            tonic::Code::NotFound => {
                UnloadPartitionChunkError::NotFound(status.message().to_string())
            }
            tonic::Code::Unavailable => UnloadPartitionChunkError::Unavailable(status),
            tonic::Code::FailedPrecondition => {
                UnloadPartitionChunkError::LifecycleError(status)
            }
            _ => UnloadPartitionChunkError::ServerError(status),
        }),
    }
}
/// Wipe potential preserved catalog of an uninitialized database.
pub async fn wipe_persisted_catalog(
&mut self,

View File

@ -6,7 +6,7 @@ use influxdb_iox_client::{
connection::Builder,
management::{
self, ClosePartitionChunkError, GetPartitionError, ListPartitionChunksError,
ListPartitionsError, NewPartitionChunkError,
ListPartitionsError, NewPartitionChunkError, UnloadPartitionChunkError
},
};
use std::convert::{TryFrom, TryInto};
@ -30,6 +30,9 @@ pub enum Error {
#[error("Error closing chunk: {0}")]
ClosePartitionChunkError(#[from] ClosePartitionChunkError),
#[error("Error unloading chunk: {0}")]
UnloadPartitionChunkError(#[from] UnloadPartitionChunkError),
#[error("Error rendering response as JSON: {0}")]
WritingJson(#[from] serde_json::Error),
@ -107,19 +110,42 @@ struct CloseChunk {
chunk_id: u32,
}
/// Unload chunk from read buffer but keep it in object store.
///
/// NOTE: these fields are positional CLI arguments, so the declaration
/// order (db_name, partition_key, table_name, chunk_id) is part of the
/// CLI interface and must not be reordered.
#[derive(Debug, StructOpt)]
struct UnloadChunk {
    /// The name of the database
    db_name: String,

    /// The partition key
    partition_key: String,

    /// The table name
    table_name: String,

    /// The chunk id
    chunk_id: u32,
}
/// All possible subcommands for partition
///
/// (Stale plain `//` comments that duplicated the `///` doc comments have
/// been removed; the doc comments double as the CLI help text.)
#[derive(Debug, StructOpt)]
enum Command {
    /// List partitions
    List(List),

    /// Get details about a particular partition
    Get(Get),

    /// List chunks in a partition
    ListChunks(ListChunks),

    /// Create a new chunk in the partition
    NewChunk(NewChunk),

    /// Close the chunk and move to read buffer
    CloseChunk(CloseChunk),

    /// Unload chunk from read buffer but keep it in object store.
    UnloadChunk(UnloadChunk),
}
pub async fn command(url: String, config: Config) -> Result<()> {
@ -198,6 +224,19 @@ pub async fn command(url: String, config: Config) -> Result<()> {
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
// Bug fix: the binding was named `close_chunk` — a copy-paste from the
// CloseChunk arm above. Renamed to `unload_chunk` to match the variant.
Command::UnloadChunk(unload_chunk) => {
    let UnloadChunk {
        db_name,
        partition_key,
        table_name,
        chunk_id,
    } = unload_chunk;

    // Note the argument swap: the CLI takes partition before table, but
    // the client method takes table before partition.
    client
        .unload_partition_chunk(db_name, table_name, partition_key, chunk_id)
        .await?;
    println!("Ok");
}
}
Ok(())

View File

@ -92,6 +92,15 @@ pub fn default_db_error_handler(error: server::db::Error) -> tonic::Status {
description: "Cannot write to database: no mutable buffer configured".to_string(),
}
.into(),
// Chunks in the wrong lifecycle state (e.g. not currently loaded in the
// read buffer) become a precondition violation. NOTE(review): presumably
// `PreconditionViolation::into()` produces a `FailedPrecondition` gRPC
// status — the client maps that code to `LifecycleError`; confirm.
Error::LifecycleError { source } => PreconditionViolation {
    category: "chunk".to_string(),
    subject: "influxdata.com/iox".to_string(),
    description: format!(
        "Cannot perform operation due to wrong chunk lifecycle: {}",
        source
    ),
}
.into(),
Error::CatalogError { source } => default_catalog_error_handler(source),
error => {
error!(?error, "Unexpected error");

View File

@ -363,6 +363,38 @@ where
Ok(Response::new(ClosePartitionChunkResponse { operation }))
}
/// gRPC handler: unload a chunk from the read buffer while keeping it in
/// object store. Resolves the database, then delegates to
/// `Db::unload_read_buffer`; database-level errors are translated via
/// `default_db_error_handler`.
async fn unload_partition_chunk(
    &self,
    request: tonic::Request<UnloadPartitionChunkRequest>,
) -> Result<tonic::Response<UnloadPartitionChunkResponse>, tonic::Status> {
    let UnloadPartitionChunkRequest {
        db_name,
        partition_key,
        table_name,
        chunk_id,
    } = request.into_inner();

    // Validate that the database name is legit
    let db_name = DatabaseName::new(db_name).field("db_name")?;

    // Unknown database → gRPC NotFound.
    let db = self
        .server
        .db(&db_name)
        .ok_or_else(|| -> tonic::Status {
            NotFound {
                resource_type: "database".to_string(),
                resource_name: db_name.to_string(),
                ..Default::default()
            }
            .into()
        })?;

    db.unload_read_buffer(&table_name, &partition_key, chunk_id)
        .map_err(default_db_error_handler)?;

    Ok(Response::new(UnloadPartitionChunkResponse {}))
}
async fn set_serving_readiness(
&self,
request: Request<SetServingReadinessRequest>,

View File

@ -13,7 +13,10 @@ use test_helpers::assert_contains;
use super::scenario::{
create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
};
use crate::common::server_fixture::ServerFixture;
use crate::{
common::server_fixture::ServerFixture,
end_to_end_cases::scenario::{create_quickly_persisting_database, wait_for_exact_chunk_states},
};
use std::time::Instant;
use tonic::Code;
@ -966,3 +969,55 @@ async fn test_get_server_status_db_error() {
DatabaseState::Known
);
}
#[tokio::test]
async fn test_unload_read_buffer() {
    use data_types::chunk_metadata::ChunkStorage;

    let fixture = ServerFixture::create_shared().await;
    let mut write_client = fixture.write_client();
    let mut management_client = fixture.management_client();

    let db_name = rand_name();
    create_quickly_persisting_database(&db_name, fixture.grpc_channel(), 1).await;

    // Write enough line protocol to get a chunk persisted.
    let lp_lines: Vec<_> = (0..1_000)
        .map(|i| format!("data,tag1=val{} x={} {}", i, i * 10, i))
        .collect();
    let num_lines_written = write_client
        .write(&db_name, lp_lines.join("\n"))
        .await
        .expect("successful write");
    assert_eq!(num_lines_written, 1000);

    // Wait until the single chunk reaches read buffer + object store.
    wait_for_exact_chunk_states(
        &fixture,
        &db_name,
        vec![ChunkStorage::ReadBufferAndObjectStore],
        std::time::Duration::from_secs(5),
    )
    .await;

    let chunks = management_client
        .list_chunks(&db_name)
        .await
        .expect("listing chunks");
    assert_eq!(chunks.len(), 1);
    let chunk_id = chunks[0].id;
    let partition_key = &chunks[0].partition_key;

    // Unload the chunk from the read buffer.
    management_client
        .unload_partition_chunk(&db_name, "data", &partition_key[..], chunk_id)
        .await
        .unwrap();

    // The chunk must still exist, but now in object store only.
    let chunks = management_client
        .list_chunks(&db_name)
        .await
        .expect("listing chunks");
    assert_eq!(chunks.len(), 1);
    let expected: generated_types::influxdata::iox::management::v1::ChunkStorage =
        ChunkStorage::ObjectStoreOnly.into();
    assert_eq!(chunks[0].storage, i32::from(expected));
}

View File

@ -1,9 +1,15 @@
use assert_cmd::Command;
use data_types::job::{Job, Operation};
use data_types::{
chunk_metadata::ChunkStorage,
job::{Job, Operation},
};
use predicates::prelude::*;
use test_helpers::make_temp_file;
use crate::common::server_fixture::ServerFixture;
use crate::{
common::server_fixture::ServerFixture,
end_to_end_cases::scenario::{create_quickly_persisting_database, wait_for_exact_chunk_states},
};
use super::scenario::{create_readable_database, rand_name};
@ -672,3 +678,65 @@ fn load_lp(addr: &str, db_name: &str, lp_data: Vec<&str>) {
.success()
.stdout(predicate::str::contains("Lines OK"));
}
// End-to-end CLI test: `database partition unload-chunk` succeeds on a
// chunk that has been persisted to object store.
#[tokio::test]
async fn test_unload_partition_chunk() {
    let server_fixture = ServerFixture::create_shared().await;
    let addr = server_fixture.grpc_base();
    let db_name = rand_name();
    // Database configured to persist quickly so the chunk reaches object store.
    create_quickly_persisting_database(&db_name, server_fixture.grpc_channel(), 1).await;

    let lp_data = vec!["cpu,region=west user=23.2 10"];
    load_lp(addr, &db_name, lp_data);

    // Wait for the chunk to be both in the read buffer and object store.
    wait_for_exact_chunk_states(
        &server_fixture,
        &db_name,
        vec![ChunkStorage::ReadBufferAndObjectStore],
        std::time::Duration::from_secs(5),
    )
    .await;

    // Positional args: <db> <partition key> <table> <chunk id>.
    // "1970-01-01 00:00:00" is presumably the partition key derived from
    // timestamp 10ns, and "1" the id of the persisted chunk — TODO confirm
    // these against the server's partitioning/chunk-id scheme.
    Command::cargo_bin("influxdb_iox")
        .unwrap()
        .arg("database")
        .arg("partition")
        .arg("unload-chunk")
        .arg(&db_name)
        .arg("1970-01-01 00:00:00")
        .arg("cpu")
        .arg("1")
        .arg("--host")
        .arg(addr)
        .assert()
        .success()
        .stdout(predicate::str::contains("Ok"));
}
// End-to-end CLI test: unloading a chunk that is NOT persisted fails with
// a lifecycle error surfaced on stderr.
#[tokio::test]
async fn test_unload_partition_chunk_error() {
    let server_fixture = ServerFixture::create_shared().await;
    let addr = server_fixture.grpc_base();
    let db_name = rand_name();
    // Regular database (no quick persistence): the freshly-written chunk
    // stays in the mutable buffer, so unloading it violates the lifecycle.
    create_readable_database(&db_name, server_fixture.grpc_channel()).await;

    let lp_data = vec!["cpu,region=west user=23.2 100"];
    load_lp(addr, &db_name, lp_data);

    // Positional args: <db> <partition key> <table> <chunk id>.
    // NOTE(review): the partition key is given as "cpu" (identical to the
    // table name) — presumably valid for this database's partitioning
    // scheme; confirm it is not a copy-paste slip, since the test only
    // checks that *some* "wrong chunk lifecycle" error is produced.
    Command::cargo_bin("influxdb_iox")
        .unwrap()
        .arg("database")
        .arg("partition")
        .arg("unload-chunk")
        .arg(&db_name)
        .arg("cpu")
        .arg("cpu")
        .arg("0")
        .arg("--host")
        .arg(addr)
        .assert()
        .failure()
        .stderr(predicate::str::contains("wrong chunk lifecycle"));
}

View File

@ -1,12 +1,13 @@
use std::convert::TryInto;
use itertools::Itertools;
use arrow_util::assert_batches_eq;
use data_types::chunk_metadata::{ChunkStorage, ChunkSummary};
use data_types::chunk_metadata::ChunkStorage;
use influxdb_iox_client::operations;
use crate::common::server_fixture::ServerFixture;
use crate::{
common::server_fixture::ServerFixture,
end_to_end_cases::scenario::{list_chunks, wait_for_exact_chunk_states},
};
use super::scenario::{
collect_query, create_quickly_persisting_database, create_readable_database, rand_name,
@ -214,75 +215,6 @@ async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32
chunk_id
}
// Wait for the chunks to be in exactly `desired_storages` states
// (this copy is deleted by the commit; it moves to scenario.rs as `pub`).
async fn wait_for_exact_chunk_states(
    fixture: &ServerFixture,
    db_name: &str,
    mut desired_storages: Vec<ChunkStorage>,
    wait_time: std::time::Duration,
) {
    // ensure consistent order
    desired_storages.sort();

    let fail_message = format!("persisted chunks in exactly {:?}", desired_storages);
    // NOTE(review): only `desired_storages` is sorted — this assumes
    // `list_chunks` returns chunks in a matching order; TODO confirm.
    let pred = |chunks: &[ChunkSummary]| {
        let actual_storages = chunks.iter().map(|chunk| chunk.storage).collect::<Vec<_>>();
        desired_storages == actual_storages
    };
    wait_for_state(fixture, db_name, pred, fail_message, wait_time).await
}
/// Wait for the predicate to pass
/// (this copy is deleted by the commit; it moves to scenario.rs).
///
/// Polls `list_chunks` every 500ms until `pred` returns true or `wait_time`
/// elapses; on timeout it panics with `fail_message` plus the chunk listing
/// and the server's operation log for debugging.
async fn wait_for_state<P>(
    fixture: &ServerFixture,
    db_name: &str,
    mut pred: P,
    fail_message: String,
    wait_time: std::time::Duration,
) where
    P: FnMut(&[ChunkSummary]) -> bool,
{
    let t_start = std::time::Instant::now();
    loop {
        let chunks = list_chunks(fixture, db_name).await;

        if pred(&chunks) {
            return;
        }

        // Log the current status of the chunks
        for chunk in &chunks {
            println!(
                "{:?}: chunk {} partition {} storage: {:?} row_count: {} time_of_last_write: {:?}",
                (t_start.elapsed()),
                chunk.id,
                chunk.partition_key,
                chunk.storage,
                chunk.row_count,
                chunk.time_of_last_write
            );
        }

        if t_start.elapsed() >= wait_time {
            // Dump the operation log (sorted by numeric operation name) so
            // the panic message shows what the server was doing.
            let operations = fixture.operations_client().list_operations().await.unwrap();
            let mut operations: Vec<_> = operations
                .into_iter()
                .map(|x| (x.name().parse::<usize>().unwrap(), x.metadata()))
                .collect();
            operations.sort_by_key(|x| x.0);

            panic!(
                "Could not find {} within {:?}.\nChunks were: {:#?}\nOperations were: {:#?}",
                fail_message, wait_time, chunks, operations
            )
        }
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    }
}
async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) {
let mut client = fixture.flight_client();
let sql_query = "select region, user, time from cpu";
@ -300,11 +232,3 @@ async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) {
assert_batches_eq!(expected_read_data, &batches);
}
/// Gets the list of ChunkSummaries from the server
/// (this copy is deleted by the commit; it moves to scenario.rs as `pub`).
///
/// Panics on RPC failure or if a chunk fails conversion — acceptable in a
/// test helper.
async fn list_chunks(fixture: &ServerFixture, db_name: &str) -> Vec<ChunkSummary> {
    let mut management_client = fixture.management_client();
    let chunks = management_client.list_chunks(db_name).await.unwrap();

    chunks.into_iter().map(|c| c.try_into().unwrap()).collect()
}

View File

@ -5,6 +5,7 @@ use arrow::{
array::{ArrayRef, Float64Array, StringArray, TimestampNanosecondArray},
record_batch::RecordBatch,
};
use data_types::chunk_metadata::{ChunkStorage, ChunkSummary};
use futures::prelude::*;
use prost::Message;
use rand::{
@ -18,6 +19,8 @@ use generated_types::influxdata::iox::management::v1::*;
use generated_types::{influxdata::iox::management::v1::DatabaseRules, ReadSource, TimestampRange};
use influxdb_iox_client::flight::PerformQuery;
use crate::common::server_fixture::ServerFixture;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
@ -416,3 +419,80 @@ pub async fn collect_query(mut query_results: PerformQuery) -> Vec<RecordBatch>
}
batches
}
/// Wait for the chunks to be in exactly `desired_storages` states
pub async fn wait_for_exact_chunk_states(
    fixture: &ServerFixture,
    db_name: &str,
    mut desired_storages: Vec<ChunkStorage>,
    wait_time: std::time::Duration,
) {
    // ensure consistent order
    desired_storages.sort();

    let fail_message = format!("persisted chunks in exactly {:?}", desired_storages);

    // A chunk listing matches when the storage states line up exactly.
    let matches_desired = move |chunks: &[ChunkSummary]| {
        let actual: Vec<_> = chunks.iter().map(|chunk| chunk.storage).collect();
        actual == desired_storages
    };

    wait_for_state(fixture, db_name, matches_desired, fail_message, wait_time).await
}
/// Wait for the predicate to pass
///
/// Polls `list_chunks` every 500ms until `pred` returns true or `wait_time`
/// elapses; on timeout it panics with `fail_message` plus the chunk listing
/// and the server's operation log for debugging.
async fn wait_for_state<P>(
    fixture: &ServerFixture,
    db_name: &str,
    mut pred: P,
    fail_message: String,
    wait_time: std::time::Duration,
) where
    P: FnMut(&[ChunkSummary]) -> bool,
{
    let t_start = std::time::Instant::now();
    loop {
        let chunks = list_chunks(fixture, db_name).await;

        if pred(&chunks) {
            return;
        }

        // Log the current status of the chunks
        for chunk in &chunks {
            println!(
                "{:?}: chunk {} partition {} storage: {:?} row_count: {} time_of_last_write: {:?}",
                (t_start.elapsed()),
                chunk.id,
                chunk.partition_key,
                chunk.storage,
                chunk.row_count,
                chunk.time_of_last_write
            );
        }

        if t_start.elapsed() >= wait_time {
            // Dump the operation log (sorted by numeric operation name) so
            // the panic message shows what the server was doing.
            let operations = fixture.operations_client().list_operations().await.unwrap();
            let mut operations: Vec<_> = operations
                .into_iter()
                .map(|x| (x.name().parse::<usize>().unwrap(), x.metadata()))
                .collect();
            operations.sort_by_key(|x| x.0);

            panic!(
                "Could not find {} within {:?}.\nChunks were: {:#?}\nOperations were: {:#?}",
                fail_message, wait_time, chunks, operations
            )
        }
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    }
}
/// Gets the list of ChunkSummaries from the server
///
/// Panics on RPC failure or if a summary fails conversion — acceptable in
/// a test helper.
pub async fn list_chunks(fixture: &ServerFixture, db_name: &str) -> Vec<ChunkSummary> {
    fixture
        .management_client()
        .list_chunks(db_name)
        .await
        .unwrap()
        .into_iter()
        .map(|summary| summary.try_into().unwrap())
        .collect()
}