Merge pull request #3268 from influxdata/er/feat/load_chunk_cli
feat: add CLI command for loading chunks to Read Bufferpull/24376/head
commit
03d715e09e
|
@ -1,5 +1,5 @@
|
|||
//! Module contains a representation of chunk metadata
|
||||
use std::{convert::TryFrom, num::NonZeroU32, sync::Arc};
|
||||
use std::{convert::TryFrom, num::NonZeroU32, str::FromStr, sync::Arc};
|
||||
|
||||
use bytes::Bytes;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
@ -256,13 +256,16 @@ impl From<ChunkId> for Bytes {
|
|||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum BytesToChunkIdError {
|
||||
pub enum ChunkIdConversionError {
|
||||
#[snafu(display("Cannot convert bytes to chunk ID: {}", source))]
|
||||
CannotConvertBytes { source: uuid::Error },
|
||||
|
||||
#[snafu(display("Cannot convert UUID text to chunk ID: {}", source))]
|
||||
CannotConvertUUIDText { source: uuid::Error },
|
||||
}
|
||||
|
||||
impl TryFrom<Bytes> for ChunkId {
|
||||
type Error = BytesToChunkIdError;
|
||||
type Error = ChunkIdConversionError;
|
||||
|
||||
fn try_from(value: Bytes) -> Result<Self, Self::Error> {
|
||||
Ok(Self(Uuid::from_slice(&value).context(CannotConvertBytes)?))
|
||||
|
@ -275,6 +278,17 @@ impl From<Uuid> for ChunkId {
|
|||
}
|
||||
}
|
||||
|
||||
/// Implements conversion from the canonical textual representation of a UUID
|
||||
/// into a `ChunkId`.
|
||||
impl FromStr for ChunkId {
|
||||
type Err = ChunkIdConversionError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let uuid = Uuid::parse_str(s).context(CannotConvertUUIDText)?;
|
||||
Ok(Self::from(uuid))
|
||||
}
|
||||
}
|
||||
|
||||
/// Order of a chunk.
|
||||
///
|
||||
/// This is used for:
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
//! This module implements the `chunk` CLI command
|
||||
use std::str::FromStr;
|
||||
|
||||
use data_types::chunk_metadata::{ChunkId, ChunkIdConversionError};
|
||||
use generated_types::google::FieldViolation;
|
||||
use influxdb_iox_client::{
|
||||
connection::Connection,
|
||||
management::{self, ListChunksError},
|
||||
management::{self, generated_types::Chunk, ListChunksError, LoadPartitionChunkError},
|
||||
};
|
||||
use structopt::StructOpt;
|
||||
use thiserror::Error;
|
||||
|
@ -21,6 +24,15 @@ pub enum Error {
|
|||
|
||||
#[error("Error connecting to IOx: {0}")]
|
||||
ConnectionError(#[from] influxdb_iox_client::connection::Error),
|
||||
|
||||
#[error("Error loading chunks: {0}")]
|
||||
LoadChunkError(#[from] LoadPartitionChunkError),
|
||||
|
||||
#[error("Chunk {value:?} not found")]
|
||||
ChunkNotFound { value: String },
|
||||
|
||||
#[error("Invalid chunk ID: {0}")]
|
||||
InvalidChunkIDError(#[from] ChunkIdConversionError),
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -39,10 +51,21 @@ struct List {
|
|||
db_name: String,
|
||||
}
|
||||
|
||||
/// Loads the specified chunk in the specified database from the Object Store to the Read Buffer.
|
||||
#[derive(Debug, StructOpt)]
|
||||
struct Load {
|
||||
/// The name of the database
|
||||
db_name: String,
|
||||
|
||||
/// The ID of the chunk
|
||||
chunk_id: String,
|
||||
}
|
||||
|
||||
/// All possible subcommands for chunk
|
||||
#[derive(Debug, StructOpt)]
|
||||
enum Command {
|
||||
List(List),
|
||||
Load(Load),
|
||||
}
|
||||
|
||||
pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||
|
@ -56,7 +79,42 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
|
||||
serde_json::to_writer_pretty(std::io::stdout(), &chunks)?;
|
||||
}
|
||||
Command::Load(load) => {
|
||||
let Load { db_name, chunk_id } = load;
|
||||
|
||||
let mut client = management::Client::new(connection);
|
||||
let chunks = client.list_chunks(&db_name).await?;
|
||||
|
||||
let load_chunk_id = ChunkId::from_str(&chunk_id)?;
|
||||
for chunk in chunks {
|
||||
let id: ChunkId = chunk
|
||||
.id
|
||||
.clone()
|
||||
.try_into()
|
||||
.expect("catalog chunk IDs to be valid");
|
||||
if id == load_chunk_id {
|
||||
return load_chunk_to_read_buffer(&mut client, &db_name, chunk).await;
|
||||
}
|
||||
}
|
||||
|
||||
return Err(Error::ChunkNotFound {
|
||||
value: load_chunk_id.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_chunk_to_read_buffer(
|
||||
client: &mut management::Client,
|
||||
db_name: &str,
|
||||
chunk: Chunk,
|
||||
) -> Result<()> {
|
||||
let operation = client
|
||||
.load_partition_chunk(db_name, chunk.table_name, chunk.partition_key, chunk.id)
|
||||
.await?;
|
||||
|
||||
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ use generated_types::{
|
|||
influxdata::iox::management::v1::{operation_metadata::Job, WipePreservedCatalog},
|
||||
};
|
||||
use predicates::prelude::*;
|
||||
use std::time::Duration;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tempfile::TempDir;
|
||||
use test_helpers::make_temp_file;
|
||||
use uuid::Uuid;
|
||||
|
@ -1158,9 +1158,8 @@ fn load_lp(addr: &str, db_name: &str, lp_data: Vec<&str>) {
|
|||
.stdout(predicate::str::contains("Lines OK"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unload_partition_chunk() {
|
||||
let fixture = ServerFixture::create_shared(ServerType::Database).await;
|
||||
async fn setup_load_unload_partition_chunk() -> (Arc<ServerFixture>, String, String) {
|
||||
let fixture = Arc::from(ServerFixture::create_shared(ServerType::Database).await);
|
||||
let addr = fixture.grpc_base();
|
||||
let db_name = rand_name();
|
||||
|
||||
|
@ -1174,6 +1173,70 @@ async fn test_unload_partition_chunk() {
|
|||
let lp_data = vec!["cpu,region=west user=23.2 10"];
|
||||
load_lp(addr, &db_name, lp_data);
|
||||
|
||||
(Arc::clone(&fixture), db_name, String::from(addr))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_load_partition_chunk() {
|
||||
let (fixture, db_name, addr) = setup_load_unload_partition_chunk().await;
|
||||
let mut chunks = wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![ChunkStorage::ReadBufferAndObjectStore],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
let chunk = chunks.pop().unwrap();
|
||||
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("database")
|
||||
.arg("partition")
|
||||
.arg("unload-chunk")
|
||||
.arg(&db_name)
|
||||
.arg("cpu")
|
||||
.arg("cpu")
|
||||
.arg(chunk.id.get().to_string())
|
||||
.arg("--host")
|
||||
.arg(&addr)
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(predicate::str::contains("Ok"));
|
||||
|
||||
let mut chunks = wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![ChunkStorage::ObjectStoreOnly],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
let chunk = chunks.pop().unwrap();
|
||||
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("database")
|
||||
.arg("chunk")
|
||||
.arg("load")
|
||||
.arg(&db_name)
|
||||
.arg(chunk.id.get().to_string())
|
||||
.arg("--host")
|
||||
.arg(&addr)
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(predicate::str::contains("loadReadBufferChunk"));
|
||||
|
||||
wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![ChunkStorage::ReadBufferAndObjectStore],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unload_partition_chunk() {
|
||||
let (fixture, db_name, addr) = setup_load_unload_partition_chunk().await;
|
||||
let mut chunks = wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
|
|
|
@ -284,7 +284,7 @@ pub enum UnloadPartitionChunkError {
|
|||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by [`Client::unload_partition_chunk`]
|
||||
/// Errors returned by [`Client::load_partition_chunk`]
|
||||
#[derive(Debug, Error)]
|
||||
pub enum LoadPartitionChunkError {
|
||||
/// Database not found
|
||||
|
@ -880,7 +880,7 @@ impl Client {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Unload chunk from read buffer but keep it in object store.
|
||||
/// Load a chunk from the object store into the Read Buffer.
|
||||
pub async fn load_partition_chunk(
|
||||
&mut self,
|
||||
db_name: impl Into<String> + Send,
|
||||
|
|
|
@ -155,7 +155,7 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("Cannot decode chunk id: {}", source))]
|
||||
CannotDecodeChunkId {
|
||||
source: data_types::chunk_metadata::BytesToChunkIdError,
|
||||
source: data_types::chunk_metadata::ChunkIdConversionError,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -238,7 +238,7 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("Cannot decode chunk id: {}", source))]
|
||||
CannotDecodeChunkId {
|
||||
source: data_types::chunk_metadata::BytesToChunkIdError,
|
||||
source: data_types::chunk_metadata::ChunkIdConversionError,
|
||||
},
|
||||
}
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
|
Loading…
Reference in New Issue