feat: Management API + CLI command to close a chunk and move to read buffer (#1002)
* feat: Management API + CLI command to close a chunk and move to read buffer
* refactor: Less copy-pasta
* fix: track only once, use `let _` instead of `.ok()`
* docs: Apply suggestions from code review
fix comments (🤦♀️ for copy/pasta)
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
* docs: Update server/src/lib.rs
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
* refactor: Use DatabaseName rather than impl Into<String>
* fix: Fixup logical merge conflicts
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
Branch: pull/24376/head · parent 1fee56274b · commit 3a53923684
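For orientation before the diffs: the new surface is a ClosePartitionChunk gRPC call on the management service, a close_partition_chunk method on the Rust management client, and a `database partition close-chunk` CLI subcommand. A minimal sketch of the client call follows; it assumes a management::Client has already been constructed (client/connection construction is outside this diff) and uses only the method and error type added below.

    use influxdb_iox_client::management::{self, ClosePartitionChunkError};

    async fn start_close(
        client: &mut management::Client,
    ) -> Result<(), ClosePartitionChunkError> {
        // Ask the server to close chunk 0 of partition "cpu" in database
        // "foo"; the returned google.longrunning.Operation tracks the
        // background move into the read buffer.
        let operation = client.close_partition_chunk("foo", "cpu", 0).await?;
        println!("started job: {}", operation.name);
        Ok(())
    }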
@@ -9,15 +9,30 @@ use std::convert::TryFrom;
 /// Used in combination with TrackerRegistry
 ///
 /// TODO: Serde is temporary until prost adds JSON support
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub enum Job {
-    PersistSegment { writer_id: u32, segment_id: u64 },
-    Dummy { nanos: Vec<u64> },
+    Dummy {
+        nanos: Vec<u64>,
+    },
+
+    /// Persist a WAL segment to object store
+    PersistSegment {
+        writer_id: u32,
+        segment_id: u64,
+    },
+
+    /// Move a chunk from mutable buffer to read buffer
+    CloseChunk {
+        db_name: String,
+        partition_key: String,
+        chunk_id: u32,
+    },
 }

 impl From<Job> for management::operation_metadata::Job {
     fn from(job: Job) -> Self {
         match job {
-            Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }),
             Job::PersistSegment {
                 writer_id,
                 segment_id,

@@ -25,7 +40,15 @@ impl From<Job> for management::operation_metadata::Job {
                 writer_id,
                 segment_id,
             }),
+            Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }),
+            Job::CloseChunk {
+                db_name,
+                partition_key,
+                chunk_id,
+            } => Self::CloseChunk(management::CloseChunk {
+                db_name,
+                partition_key,
+                chunk_id,
+            }),
         }
     }
 }

@@ -42,6 +65,15 @@ impl From<management::operation_metadata::Job> for Job {
                 writer_id,
                 segment_id,
             },
+            Job::CloseChunk(management::CloseChunk {
+                db_name,
+                partition_key,
+                chunk_id,
+            }) => Self::CloseChunk {
+                db_name,
+                partition_key,
+                chunk_id,
+            },
         }
     }
 }
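Because Job now derives PartialEq, the symmetry of these two From impls can be checked directly. A test-style sketch, assuming the data_types Job and the generated management types are in scope as in this module:

    #[test]
    fn job_close_chunk_round_trips() {
        let job = Job::CloseChunk {
            db_name: "foo".to_string(),
            partition_key: "cpu".to_string(),
            chunk_id: 0,
        };

        // data_types Job -> generated protobuf job -> back again
        let proto: management::operation_metadata::Job = job.clone().into();
        let round_tripped: Job = proto.into();
        assert_eq!(job, round_tripped);
    }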
@@ -2,22 +2,46 @@ syntax = "proto3";
 package influxdata.iox.management.v1;

 message OperationMetadata {
+  // How many nanoseconds of CPU time have been spent on this job so far?
   uint64 cpu_nanos = 1;

+  // How many nanoseconds has it been since the job was submitted
   uint64 wall_nanos = 2;

+  // How many total tasks does this job have currently
   uint64 task_count = 3;

+  // How many tasks for this job are still pending
   uint64 pending_count = 4;

+  // What kind of job is it?
   oneof job {
     Dummy dummy = 5;
     PersistSegment persist_segment = 6;
+    CloseChunk close_chunk = 7;
   }
 }

+// A job that simply sleeps for a specified time and then returns success
+message Dummy {
+  // How long the job should sleep for before returning
+  repeated uint64 nanos = 1;
+}
+
+// A job that persists a WAL segment to object store
 message PersistSegment {
   uint32 writer_id = 1;
   uint64 segment_id = 2;
 }

-message Dummy {
-  repeated uint64 nanos = 1;
-}
+// Move a chunk from mutable buffer to read buffer
+message CloseChunk {
+  // name of the database
+  string db_name = 1;
+
+  // partition key
+  string partition_key = 2;
+
+  // chunk_id
+  uint32 chunk_id = 3;
+}
@@ -50,6 +50,9 @@ service ManagementService {
   // Create a new chunk in the mutable buffer
   rpc NewPartitionChunk(NewPartitionChunkRequest) returns (NewPartitionChunkResponse);

+  // Close a chunk and move it to the read buffer
+  rpc ClosePartitionChunk(ClosePartitionChunkRequest) returns (ClosePartitionChunkResponse);
+
 }

 message GetWriterIdRequest {}

@@ -184,3 +187,20 @@ message NewPartitionChunkRequest {

 message NewPartitionChunkResponse {
 }
+
+// Request that a chunk be closed and moved to the read buffer
+message ClosePartitionChunkRequest {
+  // the name of the database
+  string db_name = 1;
+
+  // the partition key
+  string partition_key = 2;
+
+  // the chunk id
+  uint32 chunk_id = 3;
+}
+
+message ClosePartitionChunkResponse {
+  // The operation that tracks the work for migrating the chunk
+  google.longrunning.Operation operation = 1;
+}
@@ -165,6 +165,22 @@ pub enum NewPartitionChunkError {
     ServerError(tonic::Status),
 }

+/// Errors returned by Client::close_partition_chunk
+#[derive(Debug, Error)]
+pub enum ClosePartitionChunkError {
+    /// Database not found
+    #[error("Database not found")]
+    DatabaseNotFound,
+
+    /// Response contained no payload
+    #[error("Server returned an empty response")]
+    EmptyResponse,
+
+    /// Client received an unexpected error from the server
+    #[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
+    ServerError(tonic::Status),
+}
+
 /// An IOx Management API client.
 ///
 /// This client wraps the underlying `tonic` generated client with a

@@ -446,4 +462,36 @@ impl Client {
             .operation
             .ok_or(CreateDummyJobError::EmptyResponse)?)
     }
+
+    /// Closes the specified chunk in the specified partition and
+    /// begins moving it to the read buffer.
+    ///
+    /// Returns the job tracking the data's movement
+    pub async fn close_partition_chunk(
+        &mut self,
+        db_name: impl Into<String>,
+        partition_key: impl Into<String>,
+        chunk_id: u32,
+    ) -> Result<Operation, ClosePartitionChunkError> {
+        let db_name = db_name.into();
+        let partition_key = partition_key.into();
+
+        let response = self
+            .inner
+            .close_partition_chunk(ClosePartitionChunkRequest {
+                db_name,
+                partition_key,
+                chunk_id,
+            })
+            .await
+            .map_err(|status| match status.code() {
+                tonic::Code::NotFound => ClosePartitionChunkError::DatabaseNotFound,
+                _ => ClosePartitionChunkError::ServerError(status),
+            })?;
+
+        Ok(response
+            .into_inner()
+            .operation
+            .ok_or(ClosePartitionChunkError::EmptyResponse)?)
+    }
 }
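One consequence of the error mapping above: callers can distinguish a missing database from any other server failure without inspecting tonic::Status themselves. A minimal sketch, assuming Client and ClosePartitionChunkError are in scope as in this module:

    async fn close_or_report(client: &mut Client, db_name: &str, partition_key: &str) {
        match client.close_partition_chunk(db_name, partition_key, 0).await {
            Ok(operation) => println!("job started: {}", operation.name),
            Err(ClosePartitionChunkError::DatabaseNotFound) => {
                eprintln!("database {} does not exist", db_name)
            }
            Err(e) => eprintln!("close failed: {}", e),
        }
    }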
@@ -45,6 +45,16 @@ pub enum Error {
     #[snafu(display("Cannot read to this database: no mutable buffer configured"))]
     DatabaseNotReadable {},

+    #[snafu(display(
+        "Only closed chunks can be moved to read buffer. Chunk {} {} was open",
+        partition_key,
+        chunk_id
+    ))]
+    ChunkNotClosed {
+        partition_key: String,
+        chunk_id: u32,
+    },
+
     #[snafu(display("Error dropping data from mutable buffer: {}", source))]
     MutableBufferDrop {
         source: mutable_buffer::database::Error,

@@ -130,6 +140,16 @@ impl Db {
         }
     }

+    /// Return true if the specified chunk is still open for new writes
+    pub fn is_open_chunk(&self, partition_key: &str, chunk_id: u32) -> bool {
+        if let Some(mutable_buffer) = self.mutable_buffer.as_ref() {
+            let open_chunk_id = mutable_buffer.open_chunk_id(partition_key);
+            open_chunk_id == chunk_id
+        } else {
+            false
+        }
+    }
+
     // Return a list of all chunks in the mutable_buffer (that can
     // potentially be migrated into the read buffer or object store)
     pub fn mutable_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {

@@ -150,6 +170,19 @@ impl Db {
         chunks
     }

+    // Return the specified chunk in the mutable buffer
+    pub fn mutable_buffer_chunk(
+        &self,
+        partition_key: &str,
+        chunk_id: u32,
+    ) -> Result<Arc<mutable_buffer::chunk::Chunk>> {
+        self.mutable_buffer
+            .as_ref()
+            .context(DatatbaseNotWriteable)?
+            .get_chunk(partition_key, chunk_id)
+            .context(UnknownMutableBufferChunk { chunk_id })
+    }
+
     /// List chunks that are currently in the read buffer
     pub fn read_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
         self.read_buffer

@@ -210,12 +243,16 @@ impl Db {
         partition_key: &str,
         chunk_id: u32,
     ) -> Result<Arc<DBChunk>> {
-        let mb_chunk = self
-            .mutable_buffer
-            .as_ref()
-            .context(DatatbaseNotWriteable)?
-            .get_chunk(partition_key, chunk_id)
-            .context(UnknownMutableBufferChunk { chunk_id })?;
+        let mb_chunk = self.mutable_buffer_chunk(partition_key, chunk_id)?;
+
+        // Can't load an open chunk to the read buffer
+        if self.is_open_chunk(partition_key, chunk_id) {
+            return ChunkNotClosed {
+                partition_key,
+                chunk_id,
+            }
+            .fail();
+        }

         let mut batches = Vec::new();
         for stats in mb_chunk.table_stats().unwrap() {

@@ -439,6 +476,28 @@ mod tests {
         assert_table_eq!(&expected, &batches);
     }

+    #[tokio::test]
+    async fn no_load_open_chunk() {
+        // Test that data can not be loaded into the ReadBuffer while
+        // still open (no way to ensure that new data gets into the
+        // read buffer)
+        let db = make_db();
+        let mut writer = TestLPWriter::default();
+        writer.write_lp_string(&db, "cpu bar=1 10").await.unwrap();
+
+        let partition_key = "1970-01-01T00";
+        let err = db
+            .load_chunk_to_read_buffer(partition_key, 0)
+            .await
+            .unwrap_err();
+
+        // the error should name the chunk that was still open
+        assert_contains!(
+            err.to_string(),
+            "Only closed chunks can be moved to read buffer. Chunk 1970-01-01T00 0 was open"
+        );
+    }
+
     #[tokio::test]
     async fn read_from_read_buffer() {
         // Test that data can be loaded into the ReadBuffer
@@ -77,7 +77,7 @@ use bytes::Bytes;
 use futures::stream::TryStreamExt;
 use parking_lot::Mutex;
 use snafu::{OptionExt, ResultExt, Snafu};
-use tracing::{error, info};
+use tracing::{debug, error, info};

 use data_types::{
     data::{lines_to_replicated_write, ReplicatedWrite},

@@ -122,6 +122,8 @@ pub enum Error {
     InvalidDatabaseName { source: DatabaseNameError },
     #[snafu(display("database error: {}", source))]
     UnknownDatabaseError { source: DatabaseError },
+    #[snafu(display("getting mutable buffer chunk: {}", source))]
+    MutableBufferChunk { source: DatabaseError },
     #[snafu(display("no local buffer for database: {}", db))]
     NoLocalBuffer { db: String },
     #[snafu(display("unable to get connection to remote server: {}", server))]

@@ -401,6 +403,67 @@ impl<M: ConnectionManager> Server<M> {
         tracker
     }

+    /// Closes a chunk and starts moving its data to the read buffer, as a
+    /// background job, dropping the chunk from the mutable buffer when the
+    /// move completes.
+    pub fn close_chunk(
+        &self,
+        db_name: DatabaseName<'_>,
+        partition_key: impl Into<String>,
+        chunk_id: u32,
+    ) -> Result<Tracker<Job>> {
+        let db_name = db_name.to_string();
+        let name = DatabaseName::new(&db_name).context(InvalidDatabaseName)?;
+
+        let partition_key = partition_key.into();
+
+        let db = self
+            .config
+            .db(&name)
+            .context(DatabaseNotFound { db_name: &db_name })?;
+
+        let (tracker, registration) = self.jobs.lock().register(Job::CloseChunk {
+            db_name: db_name.clone(),
+            partition_key: partition_key.clone(),
+            chunk_id,
+        });
+
+        let task = async move {
+            // Close the chunk if it isn't already closed
+            if db.is_open_chunk(&partition_key, chunk_id) {
+                debug!(%db_name, %partition_key, %chunk_id, "Rolling over partition to close chunk");
+                let result = db.rollover_partition(&partition_key).await;
+
+                if let Err(e) = result {
+                    info!(?e, %db_name, %partition_key, %chunk_id, "background task error during chunk closing");
+                    return Err(e);
+                }
+            }
+
+            debug!(%db_name, %partition_key, %chunk_id, "background task loading chunk to read buffer");
+            let result = db.load_chunk_to_read_buffer(&partition_key, chunk_id).await;
+            if let Err(e) = result {
+                info!(?e, %db_name, %partition_key, %chunk_id, "background task error loading read buffer chunk");
+                return Err(e);
+            }
+
+            // now, drop the chunk from the mutable buffer
+            debug!(%db_name, %partition_key, %chunk_id, "background task dropping mutable buffer chunk");
+            let result = db.drop_mutable_buffer_chunk(&partition_key, chunk_id).await;
+            if let Err(e) = result {
+                info!(?e, %db_name, %partition_key, %chunk_id, "background task error dropping mutable buffer chunk");
+                return Err(e);
+            }
+
+            debug!(%db_name, %partition_key, %chunk_id, "background task completed closing chunk");
+
+            Ok(())
+        };
+
+        tokio::spawn(task.track(registration));
+
+        Ok(tracker)
+    }
+
     /// Returns a list of all jobs tracked by this server
     pub fn tracked_jobs(&self) -> Vec<Tracker<Job>> {
         self.jobs.lock().tracked()

@@ -418,20 +481,18 @@ impl<M: ConnectionManager> Server<M> {
         let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));

         loop {
-            // TODO: Retain limited history of past jobs, e.g. enqueue returned data into a
-            // Dequeue
-            let mut jobs = self.jobs.lock();
-
-            for job in jobs.reclaim() {
-                info!(?job, "job finished");
-            }
-
-            // Ensure mutex guard is not held across await point
-            std::mem::drop(jobs);
-
+            self.reclaim_jobs();
             interval.tick().await;
         }
     }
+
+    fn reclaim_jobs(&self) {
+        let mut jobs = self.jobs.lock();
+
+        for job in jobs.reclaim() {
+            info!(?job, "job finished");
+        }
+    }
 }

 #[async_trait]

@@ -571,7 +632,7 @@ mod tests {
     };
     use influxdb_line_protocol::parse_lines;
     use object_store::{memory::InMemory, path::ObjectStorePath};
-    use query::frontend::sql::SQLQueryPlanner;
+    use query::{frontend::sql::SQLQueryPlanner, Database};

     use crate::buffer::Segment;

@@ -769,6 +830,71 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn close_chunk() -> Result {
+        test_helpers::maybe_start_logging();
+        let manager = TestConnectionManager::new();
+        let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
+        let server = Arc::new(Server::new(manager, store));
+
+        let captured_server = Arc::clone(&server);
+        let background_handle =
+            tokio::task::spawn(async move { captured_server.background_worker().await });
+
+        server.set_id(1);
+
+        let db_name = DatabaseName::new("foo").unwrap();
+        server
+            .create_database(db_name.as_str(), DatabaseRules::new())
+            .await?;
+
+        let line = "cpu bar=1 10";
+        let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
+        server.write_lines(&db_name, &lines).await.unwrap();
+
+        // start the close (note this is not an async call)
+        let partition_key = "";
+        let db_name_string = db_name.to_string();
+        let tracker = server.close_chunk(db_name, partition_key, 0).unwrap();
+
+        let metadata = tracker.metadata();
+        let expected_metadata = Job::CloseChunk {
+            db_name: db_name_string,
+            partition_key: partition_key.to_string(),
+            chunk_id: 0,
+        };
+        assert_eq!(metadata, &expected_metadata);
+
+        // wait for the job to complete
+        tracker.join().await;
+
+        // Data should be in the read buffer and not in mutable buffer
+        let db_name = DatabaseName::new("foo").unwrap();
+        let db = server.db(&db_name).unwrap();
+
+        let mut chunk_summaries = db.chunk_summaries().unwrap();
+        chunk_summaries.sort_unstable();
+
+        let actual = chunk_summaries
+            .into_iter()
+            .map(|s| format!("{:?} {}", s.storage, s.id))
+            .collect::<Vec<_>>();
+
+        let expected = vec!["ReadBuffer 0", "OpenMutableBuffer 1"];
+
+        assert_eq!(
+            expected, actual,
+            "expected:\n{:#?}\n\nactual:{:#?}\n\n",
+            expected, actual
+        );
+
+        // ensure that we don't leave the server instance hanging around
+        background_handle.abort();
+        let _ = background_handle.await;
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn segment_persisted_on_rollover() {
         let manager = TestConnectionManager::new();

@@ -819,6 +945,30 @@ partition_key:
         assert_eq!(segment.writes[0].to_string(), write);
     }

+    #[tokio::test]
+    async fn background_task_cleans_jobs() -> Result {
+        let manager = TestConnectionManager::new();
+        let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
+        let server = Arc::new(Server::new(manager, store));
+        let captured_server = Arc::clone(&server);
+        let background_handle =
+            tokio::task::spawn(async move { captured_server.background_worker().await });
+
+        let wait_nanos = 1000;
+        let job = server.spawn_dummy_job(vec![wait_nanos]);
+
+        // Note: this will hang forever if the background task has not been started
+        job.join().await;
+
+        assert!(job.is_complete());
+
+        // ensure that we don't leave the server instance hanging around
+        background_handle.abort();
+        let _ = background_handle.await;
+
+        Ok(())
+    }
+
     #[derive(Snafu, Debug, Clone)]
     enum TestClusterError {
         #[snafu(display("Test cluster error: {}", message))]
@@ -1,14 +1,15 @@
 //! This module implements the `partition` CLI command
 use data_types::chunk::ChunkSummary;
+use data_types::job::Operation;
 use generated_types::google::FieldViolation;
 use influxdb_iox_client::{
     connection::Builder,
     management::{
-        self, GetPartitionError, ListPartitionChunksError, ListPartitionsError,
-        NewPartitionChunkError,
+        self, ClosePartitionChunkError, GetPartitionError, ListPartitionChunksError,
+        ListPartitionsError, NewPartitionChunkError,
     },
 };
-use std::convert::TryFrom;
+use std::convert::{TryFrom, TryInto};
 use structopt::StructOpt;
 use thiserror::Error;

@@ -26,14 +27,15 @@ pub enum Error {
     #[error("Error creating new partition chunk: {0}")]
     NewPartitionChunkError(#[from] NewPartitionChunkError),

-    #[error("Error interpreting server response: {0}")]
-    ConvertingResponse(#[from] FieldViolation),
+    #[error("Error closing chunk: {0}")]
+    ClosePartitionChunkError(#[from] ClosePartitionChunkError),

     #[error("Error rendering response as JSON: {0}")]
     WritingJson(#[from] serde_json::Error),

-    // #[error("Error rendering response as JSON: {0}")]
-    // WritingJson(#[from] serde_json::Error),
+    #[error("Received invalid response: {0}")]
+    InvalidResponse(#[from] FieldViolation),

     #[error("Error connecting to IOx: {0}")]
     ConnectionError(#[from] influxdb_iox_client::connection::Error),
 }

@@ -85,6 +87,20 @@ struct NewChunk {
     partition_key: String,
 }

+/// Closes a chunk in the mutable buffer for writing and starts its migration to
+/// the read buffer
+#[derive(Debug, StructOpt)]
+struct CloseChunk {
+    /// The name of the database
+    db_name: String,
+
+    /// The partition key
+    partition_key: String,
+
+    /// The chunk id
+    chunk_id: u32,
+}
+
 /// All possible subcommands for partition
 #[derive(Debug, StructOpt)]
 enum Command {

@@ -96,6 +112,8 @@ enum Command {
     ListChunks(ListChunks),
     // Create a new chunk in the partition
     NewChunk(NewChunk),
+    // Close the chunk and move to read buffer
+    CloseChunk(CloseChunk),
 }

 pub async fn command(url: String, config: Config) -> Result<()> {

@@ -156,6 +174,20 @@ pub async fn command(url: String, config: Config) -> Result<()> {
             client.new_partition_chunk(db_name, partition_key).await?;
             println!("Ok");
         }
+        Command::CloseChunk(close_chunk) => {
+            let CloseChunk {
+                db_name,
+                partition_key,
+                chunk_id,
+            } = close_chunk;
+
+            let operation: Operation = client
+                .close_partition_chunk(db_name, partition_key, chunk_id)
+                .await?
+                .try_into()?;
+
+            serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
+        }
     }

     Ok(())
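Taken together, the new subcommand makes the whole flow scriptable: the CLI prints the returned Operation as pretty-printed JSON, so the started job can be tracked afterwards by id. An invocation shaped like the end-to-end test further below (the host address here is illustrative, not from this diff):

    influxdb_iox database partition close-chunk <db_name> cpu 0 --host http://127.0.0.1:8082

The JSON on stdout is the data_types::job::Operation; its job field should be the CloseChunk variant echoing the database name, partition key, and chunk id.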
@@ -1,4 +1,4 @@
-use generated_types::google::{InternalError, NotFound, PreconditionViolation};
+use generated_types::google::{FieldViolation, InternalError, NotFound, PreconditionViolation};
 use tracing::error;

 /// map common `server::Error` errors to the appropriate tonic Status

@@ -18,6 +18,11 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
             ..Default::default()
         }
         .into(),
+        Error::InvalidDatabaseName { source } => FieldViolation {
+            field: "db_name".into(),
+            description: source.to_string(),
+        }
+        .into(),
         error => {
             error!(?error, "Unexpected error");
             InternalError {}.into()
@@ -132,8 +132,8 @@ where
         request: Request<CreateDummyJobRequest>,
     ) -> Result<Response<CreateDummyJobResponse>, Status> {
         let request = request.into_inner();
-        let slot = self.server.spawn_dummy_job(request.nanos);
-        let operation = Some(super::operations::encode_tracker(slot)?);
+        let tracker = self.server.spawn_dummy_job(request.nanos);
+        let operation = Some(super::operations::encode_tracker(tracker)?);
         Ok(Response::new(CreateDummyJobResponse { operation }))
     }

@@ -280,6 +280,29 @@ where

         Ok(Response::new(NewPartitionChunkResponse {}))
     }
+
+    async fn close_partition_chunk(
+        &self,
+        request: Request<ClosePartitionChunkRequest>,
+    ) -> Result<Response<ClosePartitionChunkResponse>, Status> {
+        let ClosePartitionChunkRequest {
+            db_name,
+            partition_key,
+            chunk_id,
+        } = request.into_inner();
+
+        // Validate that the database name is legit
+        let db_name = DatabaseName::new(db_name).field("db_name")?;
+
+        let tracker = self
+            .server
+            .close_chunk(db_name, partition_key, chunk_id)
+            .map_err(default_server_error_handler)?;
+
+        let operation = Some(super::operations::encode_tracker(tracker)?);
+
+        Ok(Response::new(ClosePartitionChunkResponse { operation }))
+    }
 }

 pub fn make_server<M>(
@@ -1,15 +1,20 @@
 use std::num::NonZeroU32;

-use generated_types::google::protobuf::Empty;
-use generated_types::{google::protobuf::Duration, influxdata::iox::management::v1::*};
+use generated_types::{
+    google::protobuf::{Duration, Empty},
+    influxdata::iox::management::v1::*,
+};
 use influxdb_iox_client::management::CreateDatabaseError;
 use test_helpers::assert_contains;

-use crate::common::server_fixture::ServerFixture;
-
-use super::scenario::{
-    create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
+use super::{
+    operations_api::get_operation_metadata,
+    scenario::{
+        create_readable_database, create_two_partition_database, create_unreadable_database,
+        rand_name,
+    },
 };
+use crate::common::server_fixture::ServerFixture;

 #[tokio::test]
 async fn test_list_update_remotes() {

@@ -539,3 +544,86 @@ async fn test_new_partition_chunk_error() {

     assert_contains!(err.to_string(), "Database not found");
 }
+
+#[tokio::test]
+async fn test_close_partition_chunk() {
+    use influxdb_iox_client::management::generated_types::operation_metadata::Job;
+    use influxdb_iox_client::management::generated_types::ChunkStorage;
+
+    let fixture = ServerFixture::create_shared().await;
+    let mut management_client = fixture.management_client();
+    let mut write_client = fixture.write_client();
+    let mut operations_client = fixture.operations_client();
+
+    let db_name = rand_name();
+    create_readable_database(&db_name, fixture.grpc_channel()).await;
+
+    let partition_key = "cpu";
+    let lp_lines = vec!["cpu,region=west user=23.2 100"];
+
+    write_client
+        .write(&db_name, lp_lines.join("\n"))
+        .await
+        .expect("write succeeded");
+
+    let chunks = management_client
+        .list_chunks(&db_name)
+        .await
+        .expect("listing chunks");
+
+    assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks);
+    assert_eq!(chunks[0].id, 0);
+    assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer as i32);
+
+    // Move the chunk to read buffer
+    let operation = management_client
+        .close_partition_chunk(&db_name, partition_key, 0)
+        .await
+        .expect("close partition chunk");
+
+    println!("Operation response is {:?}", operation);
+    let operation_id = operation.name.parse().expect("not an integer");
+
+    let meta = get_operation_metadata(operation.metadata);
+
+    // ensure we got a legit job description back
+    if let Some(Job::CloseChunk(close_chunk)) = meta.job {
+        assert_eq!(close_chunk.db_name, db_name);
+        assert_eq!(close_chunk.partition_key, partition_key);
+        assert_eq!(close_chunk.chunk_id, 0);
+    } else {
+        panic!("unexpected job returned")
+    };
+
+    // wait for the job to be done
+    operations_client
+        .wait_operation(operation_id, Some(std::time::Duration::from_secs(1)))
+        .await
+        .expect("failed to wait operation");
+
+    // And now the chunk should be in the read buffer
+    let mut chunks = management_client
+        .list_chunks(&db_name)
+        .await
+        .expect("listing chunks");
+    chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id));
+
+    assert_eq!(chunks.len(), 2, "Chunks: {:#?}", chunks);
+    assert_eq!(chunks[0].id, 0);
+    assert_eq!(chunks[0].storage, ChunkStorage::ReadBuffer as i32);
+    assert_eq!(chunks[1].id, 1);
+    assert_eq!(chunks[1].storage, ChunkStorage::OpenMutableBuffer as i32);
+}
+
+#[tokio::test]
+async fn test_close_partition_chunk_error() {
+    let fixture = ServerFixture::create_shared().await;
+    let mut management_client = fixture.management_client();
+
+    let err = management_client
+        .close_partition_chunk("this database does not exist", "nor_does_this_partition", 0)
+        .await
+        .expect_err("expected error");
+
+    assert_contains!(err.to_string(), "Database not found");
+}
@@ -1,4 +1,5 @@
 use assert_cmd::Command;
+use data_types::job::{Job, Operation};
 use predicates::prelude::*;
 use test_helpers::make_temp_file;

@@ -499,6 +500,69 @@ async fn test_new_partition_chunk_error() {
         .stderr(predicate::str::contains("Database not found"));
 }

+#[tokio::test]
+async fn test_close_partition_chunk() {
+    let server_fixture = ServerFixture::create_shared().await;
+    let addr = server_fixture.grpc_base();
+    let db_name = rand_name();
+
+    create_readable_database(&db_name, server_fixture.grpc_channel()).await;
+
+    let lp_data = vec!["cpu,region=west user=23.2 100"];
+    load_lp(addr, &db_name, lp_data);
+
+    let stdout: Operation = serde_json::from_slice(
+        &Command::cargo_bin("influxdb_iox")
+            .unwrap()
+            .arg("database")
+            .arg("partition")
+            .arg("close-chunk")
+            .arg(&db_name)
+            .arg("cpu")
+            .arg("0")
+            .arg("--host")
+            .arg(addr)
+            .assert()
+            .success()
+            .get_output()
+            .stdout,
+    )
+    .expect("Expected JSON output");
+
+    let expected_job = Job::CloseChunk {
+        db_name,
+        partition_key: "cpu".into(),
+        chunk_id: 0,
+    };
+
+    assert_eq!(
+        Some(expected_job),
+        stdout.job,
+        "operation was {:#?}",
+        stdout
+    );
+}
+
+#[tokio::test]
+async fn test_close_partition_chunk_error() {
+    let server_fixture = ServerFixture::create_shared().await;
+    let addr = server_fixture.grpc_base();
+
+    Command::cargo_bin("influxdb_iox")
+        .unwrap()
+        .arg("database")
+        .arg("partition")
+        .arg("close-chunk")
+        .arg("non_existent_database")
+        .arg("non_existent_partition")
+        .arg("0")
+        .arg("--host")
+        .arg(addr)
+        .assert()
+        .failure()
+        .stderr(predicate::str::contains("Database not found"));
+}
+
 /// Loads the specified lines into the named database
 fn load_lp(addr: &str, db_name: &str, lp_data: Vec<&str>) {
     let lp_data_file = make_temp_file(lp_data.join("\n"));
@@ -3,7 +3,9 @@ use generated_types::google::protobuf::Any;
 use influxdb_iox_client::{management::generated_types::*, operations, protobuf_type_url_eq};
 use std::time::Duration;

-fn get_operation_metadata(metadata: Option<Any>) -> OperationMetadata {
+// TODO remove after #1001 and use something directly in the influxdb_iox_client
+// crate
+pub fn get_operation_metadata(metadata: Option<Any>) -> OperationMetadata {
     assert!(metadata.is_some());
     let metadata = metadata.unwrap();
     assert!(protobuf_type_url_eq(&metadata.type_url, OPERATION_METADATA));