chore: merge main to branch

pull/24376/head
Nga Tran 2021-09-16 17:28:37 -04:00
commit 0444d1b4fd
42 changed files with 573 additions and 645 deletions

2
Cargo.lock generated
View File

@ -2902,6 +2902,8 @@ checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58"
name = "pbjson"
version = "0.1.0"
dependencies = [
"base64 0.13.0",
"bytes",
"serde",
]

View File

@ -117,45 +117,3 @@ impl Job {
}
}
}
/// The status of a running operation
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum OperationStatus {
/// A task associated with the operation is running
Running,
/// All tasks associated with the operation have finished successfully
Success,
/// The operation was cancelled and no associated tasks are running
Cancelled,
/// An operation error was returned
Errored,
}
/// A group of asynchronous tasks being performed by an IOx server
///
/// TODO: Temporary until prost adds JSON support
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Operation {
/// ID of the running operation
pub id: usize,
// The total number of created tasks
pub total_count: u64,
// The number of pending tasks
pub pending_count: u64,
// The number of tasks that completed successfully
pub success_count: u64,
// The number of tasks that returned an error
pub error_count: u64,
// The number of tasks that were cancelled
pub cancelled_count: u64,
// The number of tasks that did not run to completion (e.g. panic)
pub dropped_count: u64,
/// Wall time spent executing this operation
pub wall_time: std::time::Duration,
/// CPU time spent executing this operation
pub cpu_time: std::time::Duration,
/// Additional job metadata
pub job: Option<Job>,
/// The status of the running operation
pub status: OperationStatus,
}

View File

@ -27,12 +27,11 @@ message OperationMetadata {
// The number of tasks that did not run to completion (e.g. panic)
uint64 dropped_count = 16;
reserved 6;
// What kind of job is it?
oneof job {
Dummy dummy = 5;
/* historical artifact
PersistSegment persist_segment = 6;
*/
CloseChunk close_chunk = 7;
WriteChunk write_chunk = 8;
WipePreservedCatalog wipe_preserved_catalog = 9;

View File

@ -12,6 +12,11 @@ pub mod longrunning {
include!(concat!(env!("OUT_DIR"), "/google.longrunning.rs"));
include!(concat!(env!("OUT_DIR"), "/google.longrunning.serde.rs"));
use crate::google::{FieldViolation, FieldViolationExt};
use crate::influxdata::iox::management::v1::{OperationMetadata, OPERATION_METADATA};
use prost::{bytes::Bytes, Message};
use std::convert::TryFrom;
impl Operation {
/// Return the IOx operation `id`. This `id` can
/// be passed to the various APIs in the
@ -21,6 +26,46 @@ pub mod longrunning {
.parse()
.expect("Internal error: id returned from server was not an integer")
}
/// Decodes an IOx `OperationMetadata` metadata payload
pub fn iox_metadata(&self) -> Result<OperationMetadata, FieldViolation> {
let metadata = self
.metadata
.as_ref()
.ok_or_else(|| FieldViolation::required("metadata"))?;
if !crate::protobuf_type_url_eq(&metadata.type_url, OPERATION_METADATA) {
return Err(FieldViolation {
field: "metadata.type_url".to_string(),
description: "Unexpected field type".to_string(),
});
}
Message::decode(Bytes::clone(&metadata.value)).field("metadata.value")
}
}
/// Groups together an `Operation` with a decoded `OperationMetadata`
///
/// When serialized this will serialize the encoded Any field on `Operation` along
/// with its decoded representation as `OperationMetadata`
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct IoxOperation {
/// The `Operation` message returned from the API
pub operation: Operation,
/// The decoded `Operation::metadata` contained within `IoxOperation::operation`
pub metadata: OperationMetadata,
}
impl TryFrom<Operation> for IoxOperation {
type Error = FieldViolation;
fn try_from(operation: Operation) -> Result<Self, Self::Error> {
Ok(Self {
metadata: operation.iox_metadata()?,
operation,
})
}
}
}

View File

@ -1,11 +1,5 @@
use crate::google::{longrunning, protobuf::Any, FieldViolation, FieldViolationExt};
use crate::influxdata::iox::management::v1 as management;
use crate::protobuf_type_url_eq;
use data_types::chunk_metadata::ChunkAddr;
use data_types::job::{Job, OperationStatus};
use data_types::partition_metadata::PartitionAddr;
use std::convert::TryFrom;
use std::sync::Arc;
use data_types::job::Job;
impl From<Job> for management::operation_metadata::Job {
fn from(job: Job) -> Self {
@ -61,142 +55,3 @@ impl From<Job> for management::operation_metadata::Job {
}
}
}
impl From<management::operation_metadata::Job> for Job {
fn from(value: management::operation_metadata::Job) -> Self {
use management::operation_metadata::Job;
match value {
Job::Dummy(management::Dummy { nanos, db_name }) => Self::Dummy {
nanos,
db_name: (!db_name.is_empty()).then(|| Arc::from(db_name.as_str())),
},
Job::CloseChunk(management::CloseChunk {
db_name,
partition_key,
table_name,
chunk_id,
}) => Self::CompactChunk {
chunk: ChunkAddr {
db_name: Arc::from(db_name.as_str()),
table_name: Arc::from(table_name.as_str()),
partition_key: Arc::from(partition_key.as_str()),
chunk_id,
},
},
Job::WriteChunk(management::WriteChunk {
db_name,
partition_key,
table_name,
chunk_id,
}) => Self::WriteChunk {
chunk: ChunkAddr {
db_name: Arc::from(db_name.as_str()),
table_name: Arc::from(table_name.as_str()),
partition_key: Arc::from(partition_key.as_str()),
chunk_id,
},
},
Job::WipePreservedCatalog(management::WipePreservedCatalog { db_name }) => {
Self::WipePreservedCatalog {
db_name: Arc::from(db_name.as_str()),
}
}
Job::CompactChunks(management::CompactChunks {
db_name,
partition_key,
table_name,
chunks,
}) => Self::CompactChunks {
partition: PartitionAddr {
db_name: Arc::from(db_name.as_str()),
table_name: Arc::from(table_name.as_str()),
partition_key: Arc::from(partition_key.as_str()),
},
chunks,
},
Job::PersistChunks(management::PersistChunks {
db_name,
partition_key,
table_name,
chunks,
}) => Self::PersistChunks {
partition: PartitionAddr {
db_name: Arc::from(db_name.as_str()),
table_name: Arc::from(table_name.as_str()),
partition_key: Arc::from(partition_key.as_str()),
},
chunks,
},
Job::DropChunk(management::DropChunk {
db_name,
partition_key,
table_name,
chunk_id,
}) => Self::DropChunk {
chunk: ChunkAddr {
db_name: Arc::from(db_name.as_str()),
table_name: Arc::from(table_name.as_str()),
partition_key: Arc::from(partition_key.as_str()),
chunk_id,
},
},
Job::DropPartition(management::DropPartition {
db_name,
partition_key,
table_name,
}) => Self::DropPartition {
partition: PartitionAddr {
db_name: Arc::from(db_name.as_str()),
table_name: Arc::from(table_name.as_str()),
partition_key: Arc::from(partition_key.as_str()),
},
},
}
}
}
impl TryFrom<longrunning::Operation> for data_types::job::Operation {
type Error = FieldViolation;
fn try_from(operation: longrunning::Operation) -> Result<Self, Self::Error> {
let metadata: Any = operation
.metadata
.ok_or_else(|| FieldViolation::required("metadata"))?;
if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) {
return Err(FieldViolation {
field: "metadata.type_url".to_string(),
description: "Unexpected field type".to_string(),
});
}
let meta: management::OperationMetadata =
prost::Message::decode(metadata.value).field("metadata.value")?;
let status = match &operation.result {
None => OperationStatus::Running,
Some(longrunning::operation::Result::Response(_)) => OperationStatus::Success,
Some(longrunning::operation::Result::Error(status)) => {
if status.code == tonic::Code::Cancelled as i32 {
OperationStatus::Cancelled
} else {
OperationStatus::Errored
}
}
};
Ok(Self {
id: operation.name.parse().field("name")?,
total_count: meta.total_count,
pending_count: meta.pending_count,
success_count: meta.success_count,
error_count: meta.error_count,
cancelled_count: meta.cancelled_count,
dropped_count: meta.dropped_count,
wall_time: std::time::Duration::from_nanos(meta.wall_nanos),
cpu_time: std::time::Duration::from_nanos(meta.cpu_nanos),
job: meta.job.map(Into::into),
status,
})
}
}

View File

@ -3,8 +3,8 @@ use thiserror::Error;
use self::generated_types::{management_service_client::ManagementServiceClient, *};
use crate::connection::Connection;
use ::generated_types::google::longrunning::Operation;
use crate::google::{longrunning::IoxOperation, FieldViolation};
use std::convert::TryInto;
use std::num::NonZeroU32;
@ -180,6 +180,10 @@ pub enum CreateDummyJobError {
#[error("Server returned an empty response")]
EmptyResponse,
/// Response payload was invalid
#[error("Invalid response: {0}")]
InvalidResponse(#[from] FieldViolation),
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
@ -264,6 +268,10 @@ pub enum ClosePartitionChunkError {
#[error("Server unavailable: {}", .0.message())]
Unavailable(tonic::Status),
/// Response payload was invalid
#[error("Invalid response: {0}")]
InvalidResponse(#[from] FieldViolation),
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
@ -316,6 +324,10 @@ pub enum WipePersistedCatalogError {
#[error("Server returned an empty response")]
EmptyResponse,
/// Response payload was invalid
#[error("Invalid response: {0}")]
InvalidResponse(#[from] FieldViolation),
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
@ -800,7 +812,7 @@ impl Client {
pub async fn create_dummy_job(
&mut self,
nanos: Vec<u64>,
) -> Result<Operation, CreateDummyJobError> {
) -> Result<IoxOperation, CreateDummyJobError> {
let response = self
.inner
.create_dummy_job(CreateDummyJobRequest { nanos })
@ -810,7 +822,8 @@ impl Client {
Ok(response
.into_inner()
.operation
.ok_or(CreateDummyJobError::EmptyResponse)?)
.ok_or(CreateDummyJobError::EmptyResponse)?
.try_into()?)
}
/// Closes the specified chunk in the specified partition and
@ -823,7 +836,7 @@ impl Client {
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
chunk_id: u32,
) -> Result<Operation, ClosePartitionChunkError> {
) -> Result<IoxOperation, ClosePartitionChunkError> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
@ -846,7 +859,8 @@ impl Client {
Ok(response
.into_inner()
.operation
.ok_or(ClosePartitionChunkError::EmptyResponse)?)
.ok_or(ClosePartitionChunkError::EmptyResponse)?
.try_into()?)
}
/// Unload chunk from read buffer but keep it in object store.
@ -887,7 +901,7 @@ impl Client {
pub async fn wipe_persisted_catalog(
&mut self,
db_name: impl Into<String> + Send,
) -> Result<Operation, WipePersistedCatalogError> {
) -> Result<IoxOperation, WipePersistedCatalogError> {
let db_name = db_name.into();
let response = self
@ -905,7 +919,8 @@ impl Client {
Ok(response
.into_inner()
.operation
.ok_or(WipePersistedCatalogError::EmptyResponse)?)
.ok_or(WipePersistedCatalogError::EmptyResponse)?
.try_into()?)
}
/// Skip replay of an uninitialized database.

View File

@ -1,11 +1,9 @@
use thiserror::Error;
use ::generated_types::{
google::FieldViolation, influxdata::iox::management::v1 as management, protobuf_type_url_eq,
};
use self::generated_types::{operations_client::OperationsClient, *};
use crate::connection::Connection;
use std::convert::TryInto;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::google::longrunning::*;
@ -16,7 +14,7 @@ pub mod generated_types {
pub enum Error {
/// Client received an invalid response
#[error("Invalid server response: {}", .0)]
InvalidResponse(#[from] FieldViolation),
InvalidResponse(#[from] ::generated_types::google::FieldViolation),
/// Operation was not found
#[error("Operation not found: {}", .0)]
@ -66,7 +64,7 @@ impl Client {
}
/// Get information of all client operation
pub async fn list_operations(&mut self) -> Result<Vec<ClientOperation>> {
pub async fn list_operations(&mut self) -> Result<Vec<IoxOperation>> {
Ok(self
.inner
.list_operations(ListOperationsRequest::default())
@ -75,12 +73,12 @@ impl Client {
.into_inner()
.operations
.into_iter()
.map(|o| ClientOperation::try_new(o).unwrap())
.collect())
.map(TryInto::try_into)
.collect::<Result<_, _>>()?)
}
/// Get information about a specific operation
pub async fn get_operation(&mut self, id: usize) -> Result<Operation> {
pub async fn get_operation(&mut self, id: usize) -> Result<IoxOperation> {
Ok(self
.inner
.get_operation(GetOperationRequest {
@ -91,7 +89,8 @@ impl Client {
tonic::Code::NotFound => Error::NotFound(id),
_ => Error::ServerError(e),
})?
.into_inner())
.into_inner()
.try_into()?)
}
/// Cancel a given operation
@ -115,7 +114,7 @@ impl Client {
&mut self,
id: usize,
timeout: Option<std::time::Duration>,
) -> Result<Operation> {
) -> Result<IoxOperation> {
Ok(self
.inner
.wait_operation(WaitOperationRequest {
@ -127,50 +126,7 @@ impl Client {
tonic::Code::NotFound => Error::NotFound(id),
_ => Error::ServerError(e),
})?
.into_inner())
}
/// Return the Client Operation
pub async fn client_operation(&mut self, id: usize) -> Result<ClientOperation> {
let operation = self.get_operation(id).await?;
ClientOperation::try_new(operation)
}
}
/// IOx's Client Operation
#[derive(Debug, Clone)]
pub struct ClientOperation {
inner: generated_types::Operation,
}
impl ClientOperation {
/// Create a new Cient Operation
pub fn try_new(operation: generated_types::Operation) -> Result<Self> {
if operation.metadata.is_some() {
let metadata = operation.metadata.clone().unwrap();
if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) {
return Err(Error::WrongOperationMetaData);
}
} else {
return Err(Error::NotFound(0));
}
Ok(Self { inner: operation })
}
/// Return Metadata for this client operation
pub fn metadata(&self) -> management::OperationMetadata {
prost::Message::decode(self.inner.metadata.clone().unwrap().value)
.expect("failed to decode metadata")
}
/// Return name of this operation
pub fn name(&self) -> &str {
&self.inner.name
}
/// Return the inner's Operation
pub fn operation(self) -> Operation {
self.inner
.into_inner()
.try_into()?)
}
}

View File

@ -77,6 +77,15 @@ pub enum Error {
/// Underlying `agent` module error that caused this problem
source: agent::Error,
},
/// Error that may happen when constructing an agent's writer
#[snafu(display("Could not create writer for agent `{}`, caused by:\n{}", name, source))]
CouldNotCreateAgentWriter {
/// The name of the relevant agent
name: String,
/// Underlying `write` module error that caused this problem
source: write::Error,
},
}
type Result<T, E = Error> = std::result::Result<T, E>;
@ -135,7 +144,9 @@ pub async fn generate<T: DataGenRng>(
)
.context(CouldNotCreateAgent { name: &agent_name })?;
let agent_points_writer = points_writer_builder.build_for_agent(&agent_name);
let agent_points_writer = points_writer_builder
.build_for_agent(&agent_name)
.context(CouldNotCreateAgentWriter { name: &agent_name })?;
handles.push(tokio::task::spawn(async move {
agent.generate_all(agent_points_writer, batch_size).await

View File

@ -10,14 +10,23 @@ use std::{
};
use std::{
fs,
fs::OpenOptions,
fs::{File, OpenOptions},
io::BufWriter,
path::{Path, PathBuf},
};
use tracing::info;
/// Errors that may happen while writing points.
#[derive(Snafu, Debug)]
pub enum Error {
/// Error that may happen when writing line protocol to a file
#[snafu(display("Could open line protocol file {}: {}", filename.display(), source))]
CantOpenLineProtocolFile {
/// The location of the file we tried to open
filename: PathBuf,
/// Underlying IO error that caused this problem
source: std::io::Error,
},
/// Error that may happen when writing line protocol to a no-op sink
#[snafu(display("Could not generate line protocol: {}", source))]
CantWriteToNoOp {
@ -174,7 +183,7 @@ impl PointsWriterBuilder {
/// Create a writer out of this writer's configuration for a particular
/// agent that runs in a separate thread/task.
pub fn build_for_agent(&mut self, agent_name: &str) -> PointsWriter {
pub fn build_for_agent(&mut self, agent_name: &str) -> Result<PointsWriter> {
let inner_writer = match &mut self.config {
PointsWriterConfig::Api {
client,
@ -189,7 +198,16 @@ impl PointsWriterBuilder {
let mut filename = dir_path.clone();
filename.push(agent_name);
filename.set_extension("txt");
InnerPointsWriter::File(filename)
let file = OpenOptions::new()
.append(true)
.create(true)
.open(&filename)
.context(CantOpenLineProtocolFile { filename })?;
let file = BufWriter::new(file);
InnerPointsWriter::File { file }
}
PointsWriterConfig::NoOp { perform_write } => InnerPointsWriter::NoOp {
perform_write: *perform_write,
@ -204,7 +222,7 @@ impl PointsWriterBuilder {
PointsWriterConfig::Stdout => InnerPointsWriter::Stdout,
};
PointsWriter { inner_writer }
Ok(PointsWriter { inner_writer })
}
}
@ -228,7 +246,9 @@ enum InnerPointsWriter {
org: String,
bucket: String,
},
File(PathBuf),
File {
file: BufWriter<File>,
},
NoOp {
perform_write: bool,
},
@ -250,22 +270,12 @@ impl InnerPointsWriter {
.await
.context(CantWriteToApi)?;
}
Self::File(filename) => {
info!("Opening file {:?}", filename);
let num_points = points.len();
let file = OpenOptions::new()
.append(true)
.create(true)
.open(&filename)
.context(CantWriteToLineProtocolFile)?;
let mut file = std::io::BufWriter::new(file);
Self::File { file } => {
for point in points {
point
.write_data_point_to(&mut file)
.write_data_point_to(&mut *file)
.context(CantWriteToLineProtocolFile)?;
}
info!("Wrote {} points to {:?}", num_points, filename);
}
Self::NoOp { perform_write } => {
if *perform_write {

View File

@ -8,5 +8,7 @@ description = "Utilities for pbjson converion"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
base64 = "0.13"
[dev-dependencies]
bytes = "1.0"

View File

@ -9,12 +9,17 @@
#[doc(hidden)]
pub mod private {
/// Re-export base64
pub use base64;
use serde::Deserialize;
use std::str::FromStr;
/// Used to parse a number from either a string or its raw representation
#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Hash, Ord, Eq)]
pub struct NumberDeserialize<T>(pub T);
#[derive(serde::Deserialize)]
#[derive(Deserialize)]
#[serde(untagged)]
enum Content<'a, T> {
Str(&'a str),
@ -26,7 +31,6 @@ pub mod private {
T: FromStr + serde::Deserialize<'de>,
<T as FromStr>::Err: std::error::Error,
{
#[allow(deprecated)]
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
@ -38,4 +42,41 @@ pub mod private {
}))
}
}
#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Hash, Ord, Eq)]
pub struct BytesDeserialize<T>(pub T);
impl<'de, T> Deserialize<'de> for BytesDeserialize<T>
where
T: From<Vec<u8>>,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: &str = Deserialize::deserialize(deserializer)?;
let decoded = base64::decode(s).map_err(serde::de::Error::custom)?;
Ok(Self(decoded.into()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use serde::de::value::{BorrowedStrDeserializer, Error};
#[test]
fn test_bytes() {
let raw = vec![2, 5, 62, 2, 5, 7, 8, 43, 5, 8, 4, 23, 5, 7, 7, 3, 2, 5, 196];
let encoded = base64::encode(&raw);
let deserializer = BorrowedStrDeserializer::<'_, Error>::new(&encoded);
let a: Bytes = BytesDeserialize::deserialize(deserializer).unwrap().0;
let b: Vec<u8> = BytesDeserialize::deserialize(deserializer).unwrap().0;
assert_eq!(raw.as_slice(), &a);
assert_eq!(raw.as_slice(), &b);
}
}
}

View File

@ -198,28 +198,14 @@ fn write_serialize_variable<W: Write>(
writer: &mut W,
) -> Result<()> {
match &field.field_type {
FieldType::Scalar(ScalarType::I64) | FieldType::Scalar(ScalarType::U64) => {
match field.field_modifier {
FieldModifier::Repeated => {
writeln!(
writer,
"{}struct_ser.serialize_field(\"{}\", &{}.iter().map(ToString::to_string).collect::<Vec<_>>())?;",
Indent(indent),
field.json_name(),
variable.raw
)
}
_ => {
writeln!(
writer,
"{}struct_ser.serialize_field(\"{}\", {}.to_string().as_str())?;",
Indent(indent),
field.json_name(),
variable.raw
)
}
}
}
FieldType::Scalar(scalar) => write_serialize_scalar_variable(
indent,
*scalar,
field.field_modifier,
variable,
field.json_name(),
writer,
),
FieldType::Enum(path) => {
write!(writer, "{}let v = ", Indent(indent))?;
match field.field_modifier {
@ -301,6 +287,52 @@ fn write_serialize_variable<W: Write>(
}
}
fn write_serialize_scalar_variable<W: Write>(
indent: usize,
scalar: ScalarType,
field_modifier: FieldModifier,
variable: Variable<'_>,
json_name: String,
writer: &mut W,
) -> Result<()> {
let conversion = match scalar {
ScalarType::I64 | ScalarType::U64 => "ToString::to_string",
ScalarType::Bytes => "pbjson::private::base64::encode",
_ => {
return writeln!(
writer,
"{}struct_ser.serialize_field(\"{}\", {})?;",
Indent(indent),
json_name,
variable.as_ref
)
}
};
match field_modifier {
FieldModifier::Repeated => {
writeln!(
writer,
"{}struct_ser.serialize_field(\"{}\", &{}.iter().map({}).collect::<Vec<_>>())?;",
Indent(indent),
json_name,
variable.raw,
conversion
)
}
_ => {
writeln!(
writer,
"{}struct_ser.serialize_field(\"{}\", {}(&{}).as_str())?;",
Indent(indent),
json_name,
conversion,
variable.raw,
)
}
}
}
fn write_serialize_field<W: Write>(
config: &Config,
indent: usize,
@ -641,33 +673,8 @@ fn write_deserialize_field<W: Write>(
}
match &field.field_type {
FieldType::Scalar(scalar) if scalar.is_numeric() => {
writeln!(writer)?;
match field.field_modifier {
FieldModifier::Repeated => {
writeln!(
writer,
"{}map.next_value::<Vec<::pbjson::private::NumberDeserialize<{}>>>()?",
Indent(indent + 2),
scalar.rust_type()
)?;
writeln!(
writer,
"{}.into_iter().map(|x| x.0).collect()",
Indent(indent + 3)
)?;
}
_ => {
writeln!(
writer,
"{}map.next_value::<::pbjson::private::NumberDeserialize<{}>>()?.0",
Indent(indent + 2),
scalar.rust_type()
)?;
}
}
write!(writer, "{}", Indent(indent + 1))?;
FieldType::Scalar(scalar) => {
write_encode_scalar_field(indent + 1, *scalar, field.field_modifier, writer)?;
}
FieldType::Enum(path) => match field.field_modifier {
FieldModifier::Repeated => {
@ -693,8 +700,12 @@ fn write_deserialize_field<W: Write>(
Indent(indent + 2),
)?;
let map_k = match key.is_numeric() {
true => {
let map_k = match key {
ScalarType::Bytes => {
// https://github.com/tokio-rs/prost/issues/531
panic!("bytes are not currently supported as map keys")
}
_ if key.is_numeric() => {
write!(
writer,
"::pbjson::private::NumberDeserialize<{}>",
@ -702,7 +713,7 @@ fn write_deserialize_field<W: Write>(
)?;
"k.0"
}
false => {
_ => {
write!(writer, "_")?;
"k"
}
@ -717,6 +728,10 @@ fn write_deserialize_field<W: Write>(
)?;
"v.0"
}
FieldType::Scalar(ScalarType::Bytes) => {
// https://github.com/tokio-rs/prost/issues/531
panic!("bytes are not currently supported as map values")
}
FieldType::Enum(path) => {
write!(writer, "{}", config.rust_type(path))?;
"v as i32"
@ -752,3 +767,43 @@ fn write_deserialize_field<W: Write>(
writeln!(writer, ");")?;
writeln!(writer, "{}}}", Indent(indent))
}
fn write_encode_scalar_field<W: Write>(
indent: usize,
scalar: ScalarType,
field_modifier: FieldModifier,
writer: &mut W,
) -> Result<()> {
let deserializer = match scalar {
ScalarType::Bytes => "BytesDeserialize",
_ if scalar.is_numeric() => "NumberDeserialize",
_ => return write!(writer, "map.next_value()?",),
};
writeln!(writer)?;
match field_modifier {
FieldModifier::Repeated => {
writeln!(
writer,
"{}map.next_value::<Vec<::pbjson::private::{}<_>>>()?",
Indent(indent + 1),
deserializer
)?;
writeln!(
writer,
"{}.into_iter().map(|x| x.0).collect()",
Indent(indent + 2)
)?;
}
_ => {
writeln!(
writer,
"{}map.next_value::<::pbjson::private::{}<_>>()?.0",
Indent(indent + 1),
deserializer
)?;
}
}
write!(writer, "{}", Indent(indent))
}

View File

@ -12,7 +12,7 @@ use prost_types::{
use crate::descriptor::{Descriptor, DescriptorSet, MessageDescriptor, Syntax, TypeName, TypePath};
use crate::escape::escape_ident;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub enum ScalarType {
F64,
F32,
@ -61,7 +61,7 @@ pub enum FieldType {
Map(ScalarType, Box<FieldType>),
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub enum FieldModifier {
Required,
Optional,

View File

@ -60,11 +60,13 @@ message KitchenSink {
Empty one_of_message = 30;
}
// Bytes support is currently broken
// bytes bytes = 31;
// optional bytes optional_bytes = 32;
// map<string, bytes> bytes_dict = 35;
bytes bytes = 31;
optional bytes optional_bytes = 32;
repeated bytes repeated_bytes = 33;
string string = 33;
optional string optional_string = 34;
// Bytes support is currently broken - https://github.com/tokio-rs/prost/issues/531
// map<string, bytes> bytes_dict = 34;
string string = 35;
optional string optional_string = 36;
}

View File

@ -196,13 +196,30 @@ mod tests {
decoded.repeated_value = Default::default();
verify_decode(&decoded, "{}");
// Bytes support currently broken
// decoded.bytes = prost::bytes::Bytes::from_static(b"kjkjkj");
// verify(&decoded, r#"{"bytes":"a2pramtqCg=="}"#);
//
// decoded.repeated_value = Default::default();
// verify_decode(&decoded, "{}");
//
decoded.bytes = prost::bytes::Bytes::from_static(b"kjkjkj");
verify(&decoded, r#"{"bytes":"a2pramtq"}"#);
decoded.bytes = Default::default();
verify_decode(&decoded, "{}");
decoded.optional_bytes = Some(prost::bytes::Bytes::from_static(b"kjkjkj"));
verify(&decoded, r#"{"optionalBytes":"a2pramtq"}"#);
decoded.optional_bytes = Some(Default::default());
verify(&decoded, r#"{"optionalBytes":""}"#);
decoded.optional_bytes = None;
verify_decode(&decoded, "{}");
decoded.repeated_bytes = vec![
prost::bytes::Bytes::from_static(b"sdfsd"),
prost::bytes::Bytes::from_static(b"fghfg"),
];
verify(&decoded, r#"{"repeatedBytes":["c2Rmc2Q=","ZmdoZmc="]}"#);
decoded.repeated_bytes = Default::default();
verify_decode(&decoded, "{}");
// decoded.bytes_dict.insert(
// "test".to_string(),
// prost::bytes::Bytes::from_static(b"asdf"),

View File

@ -164,8 +164,13 @@ impl Predicate {
/// Add each range [start, stop] of the delete_predicates into the predicate in
/// the form "time < start OR time > stop" to eliminate that range from the query
pub fn add_delete_ranges(&mut self, delete_predicates: &[Self]) {
pub fn add_delete_ranges<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
for pred in delete_predicates {
let pred = pred.as_ref();
if let Some(range) = pred.range {
let expr = col(TIME_COLUMN_NAME)
.lt(lit(range.start))
@ -182,8 +187,13 @@ impl Predicate {
/// The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means
/// NOT(city != "Boston" AND temp = 70), NOT(state = "NY" AND route != "I90") which means
/// [NOT(city = Boston") OR NOT(temp = 70)], [NOT(state = "NY") OR NOT(route != "I90")]
pub fn add_delete_exprs(&mut self, delete_predicates: &[Self]) {
pub fn add_delete_exprs<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
for pred in delete_predicates {
let pred = pred.as_ref();
let mut expr: Option<Expr> = None;
for exp in &pred.exprs {
match expr {

View File

@ -11,18 +11,19 @@ use datafusion::{
logical_plan::{LogicalPlan, UserDefinedLogicalNode},
physical_plan::{
coalesce_partitions::CoalescePartitionsExec,
collect, displayable,
displayable,
planner::{DefaultPhysicalPlanner, ExtensionPlanner},
ExecutionPlan, PhysicalPlanner, SendableRecordBatchStream,
},
prelude::*,
};
use futures::TryStreamExt;
use observability_deps::tracing::{debug, trace};
use trace::{ctx::SpanContext, span::SpanRecorder};
use crate::exec::{
fieldlist::{FieldList, IntoFieldList},
query_tracing::send_metrics_to_tracing,
query_tracing::TracedStream,
schema_pivot::{SchemaPivotExec, SchemaPivotNode},
seriesset::{SeriesSetConverter, SeriesSetItem},
split::StreamSplitExec,
@ -272,45 +273,63 @@ impl IOxExecutionContext {
/// Executes the logical plan using DataFusion on a separate
/// thread pool and produces RecordBatches
pub async fn collect(&self, physical_plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
let ctx = self.child_ctx("collect");
debug!(
"Running plan, physical:\n{}",
displayable(physical_plan.as_ref()).indent()
);
let ctx = self.child_ctx("collect");
let stream = ctx.execute_stream(physical_plan).await?;
let res = ctx.run(collect(Arc::clone(&physical_plan))).await;
// send metrics to tracing, even on error
ctx.save_metrics(physical_plan);
res
ctx.run(
stream
.err_into() // convert to DataFusionError
.try_collect(),
)
.await
}
/// Executes the physical plan and produces a RecordBatchStream to stream
/// over the result that iterates over the results.
pub async fn execute(
/// Executes the physical plan and produces a
/// `SendableRecordBatchStream` to stream over the result that
/// iterates over the results. The creation of the stream is
/// performed in a separate thread pool.
pub async fn execute_stream(
&self,
physical_plan: Arc<dyn ExecutionPlan>,
) -> Result<SendableRecordBatchStream> {
match physical_plan.output_partitioning().partition_count() {
0 => unreachable!(),
1 => self.execute_partition(physical_plan, 0).await,
1 => self.execute_stream_partitioned(physical_plan, 0).await,
_ => {
// Merge into a single partition
self.execute_partition(Arc::new(CoalescePartitionsExec::new(physical_plan)), 0)
.await
self.execute_stream_partitioned(
Arc::new(CoalescePartitionsExec::new(physical_plan)),
0,
)
.await
}
}
}
/// Executes a single partition of a physical plan and produces a RecordBatchStream to stream
/// over the result that iterates over the results.
pub async fn execute_partition(
/// Executes a single partition of a physical plan and produces a
/// `SendableRecordBatchStream` to stream over the result that
/// iterates over the results. The creation of the stream is
/// performed in a separate thread pool.
pub async fn execute_stream_partitioned(
&self,
physical_plan: Arc<dyn ExecutionPlan>,
partition: usize,
) -> Result<SendableRecordBatchStream> {
self.run(async move { physical_plan.execute(partition).await })
.await
let span = self
.recorder
.span()
.map(|span| span.child("execute_stream_partitioned"));
self.run(async move {
let stream = physical_plan.execute(partition).await?;
let stream = TracedStream::new(stream, span, physical_plan);
Ok(Box::pin(stream) as _)
})
.await
}
/// Executes the SeriesSetPlans on the query executor, in
@ -349,7 +368,7 @@ impl IOxExecutionContext {
let physical_plan = ctx.prepare_plan(&plan)?;
let it = ctx.execute(physical_plan).await?;
let it = ctx.execute_stream(physical_plan).await?;
SeriesSetConverter::default()
.convert(
@ -486,19 +505,4 @@ impl IOxExecutionContext {
recorder: self.recorder.child(name),
}
}
/// Saves any DataFusion metrics that are currently present in
/// `physical_plan` to the span recorder so they show up in
/// distributed traces (e.g. Jaeger)
///
/// This function should be invoked after `physical_plan` has
/// fully `collect`ed, meaning that `PhysicalPlan::execute()` has
/// been invoked and the resulting streams have been completely
/// consumed. Calling `save_metrics` metrics prior to this point
/// may result in saving incomplete information.
pub fn save_metrics(&self, physical_plan: Arc<dyn ExecutionPlan>) {
if let Some(span) = self.recorder.span() {
send_metrics_to_tracing(span, physical_plan.as_ref())
}
}
}

View File

@ -1,15 +1,67 @@
//! This module contains the code to map DataFusion metrics to `Span`s
//! for use in distributed tracing (e.g. Jaeger)
use std::{borrow::Cow, fmt};
use std::{borrow::Cow, fmt, sync::Arc};
use arrow::record_batch::RecordBatch;
use chrono::{DateTime, Utc};
use datafusion::physical_plan::{
metrics::{MetricValue, MetricsSet},
DisplayFormatType, ExecutionPlan,
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};
use futures::StreamExt;
use observability_deps::tracing::debug;
use trace::span::Span;
use trace::span::{Span, SpanRecorder};
/// Stream wrapper that records DataFusion `MetricSets` into IOx
/// [`Span`]s when it is dropped.
pub(crate) struct TracedStream {
inner: SendableRecordBatchStream,
span_recorder: SpanRecorder,
physical_plan: Arc<dyn ExecutionPlan>,
}
impl TracedStream {
/// Return a stream that records DataFusion `MetricSets` from
/// `physical_plan` into `span` when dropped.
pub(crate) fn new(
inner: SendableRecordBatchStream,
span: Option<trace::span::Span>,
physical_plan: Arc<dyn ExecutionPlan>,
) -> Self {
Self {
inner,
span_recorder: SpanRecorder::new(span),
physical_plan,
}
}
}
impl RecordBatchStream for TracedStream {
fn schema(&self) -> arrow::datatypes::SchemaRef {
self.inner.schema()
}
}
impl futures::Stream for TracedStream {
type Item = arrow::error::Result<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
impl Drop for TracedStream {
fn drop(&mut self) {
if let Some(span) = self.span_recorder.span() {
let default_end_time = Utc::now();
send_metrics_to_tracing(default_end_time, span, self.physical_plan.as_ref());
}
}
}
/// This function translates data in DataFusion `MetricSets` into IOx
/// [`Span`]s. It records a snapshot of the current state of the
@ -26,15 +78,7 @@ use trace::span::Span;
/// 1. If the ExecutionPlan had no metrics
/// 2. The total number of rows produced by the ExecutionPlan (if available)
/// 3. The elapsed compute time taken by the ExecutionPlan
pub(crate) fn send_metrics_to_tracing(parent_span: &Span, physical_plan: &dyn ExecutionPlan) {
// The parent span may be open, but since the physical_plan is
// assumed to be fully collected, using `now()` is a conservative
// estimate of the end time
let default_end_time = Utc::now();
send_metrics_to_tracing_inner(default_end_time, parent_span, physical_plan)
}
fn send_metrics_to_tracing_inner(
fn send_metrics_to_tracing(
default_end_time: DateTime<Utc>,
parent_span: &Span,
physical_plan: &dyn ExecutionPlan,
@ -101,7 +145,7 @@ fn send_metrics_to_tracing_inner(
// recurse
for child in physical_plan.children() {
send_metrics_to_tracing_inner(span_end, &span, child.as_ref())
send_metrics_to_tracing(span_end, &span, child.as_ref())
}
span.export()
@ -185,7 +229,7 @@ mod tests {
let exec = TestExec::new(name, Default::default());
let traces = TraceBuilder::new();
send_metrics_to_tracing(&traces.make_span(), &exec);
send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
let spans = traces.spans();
assert_eq!(spans.len(), 1);
@ -216,7 +260,7 @@ mod tests {
exec.new_child("child4", make_time_metricset(None, None));
let traces = TraceBuilder::new();
send_metrics_to_tracing_inner(ts5, &traces.make_span(), &exec);
send_metrics_to_tracing(ts5, &traces.make_span(), &exec);
let spans = traces.spans();
println!("Spans: \n\n{:#?}", spans);
@ -242,7 +286,7 @@ mod tests {
exec.metrics = None;
let traces = TraceBuilder::new();
send_metrics_to_tracing(&traces.make_span(), &exec);
send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
let spans = traces.spans();
assert_eq!(spans.len(), 1);
@ -266,7 +310,7 @@ mod tests {
add_elapsed_compute(exec.metrics_mut(), 2000, 2);
let traces = TraceBuilder::new();
send_metrics_to_tracing(&traces.make_span(), &exec);
send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
// aggregated metrics should be reported
let spans = traces.spans();

View File

@ -46,7 +46,7 @@ pub trait QueryChunkMeta: Sized {
fn schema(&self) -> Arc<Schema>;
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &Vec<Predicate>;
fn delete_predicates(&self) -> &[Arc<Predicate>];
}
/// A `Database` is the main trait implemented by the IOx subsystems
@ -137,7 +137,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
&self,
predicate: &Predicate,
selection: Selection<'_>,
delete_predicates: &[Predicate],
delete_predicates: &[Arc<Predicate>],
) -> Result<SendableRecordBatchStream, Self::Error>;
/// Returns true if data of this chunk is sorted
@ -166,10 +166,10 @@ where
self.as_ref().schema()
}
fn delete_predicates(&self) -> &Vec<Predicate> {
let pred: &Vec<Predicate> = self.as_ref().delete_predicates();
fn delete_predicates(&self) -> &[Arc<Predicate>] {
let pred = self.as_ref().delete_predicates();
debug!(?pred, "Delete predicate in QueryChunkMeta");
self.as_ref().delete_predicates()
pred
}
}

View File

@ -175,7 +175,8 @@ pub struct TestChunk {
predicate_match: Option<PredicateMatch>,
/// Copy of delete predicates passed
delete_predicates: Vec<Predicate>,
delete_predicates: Vec<Arc<Predicate>>,
/// Order of this chunk relative to other overlapping chunks.
order: ChunkOrder,
}
@ -823,7 +824,7 @@ impl QueryChunk for TestChunk {
&self,
predicate: &Predicate,
_selection: Selection<'_>,
_delete_predicates: &[Predicate],
_delete_predicates: &[Arc<Predicate>],
) -> Result<SendableRecordBatchStream, Self::Error> {
self.check_error()?;
@ -913,11 +914,11 @@ impl QueryChunkMeta for TestChunk {
}
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &Vec<Predicate> {
let pred: &Vec<Predicate> = &self.delete_predicates;
fn delete_predicates(&self) -> &[Arc<Predicate>] {
let pred = &self.delete_predicates;
debug!(?pred, "Delete predicate in Test Chunk");
&self.delete_predicates
pred
}
}
@ -927,7 +928,7 @@ pub async fn raw_data(chunks: &[Arc<TestChunk>]) -> Vec<RecordBatch> {
for c in chunks {
let pred = Predicate::default();
let selection = Selection::All;
let delete_predicates: Vec<Predicate> = vec![];
let delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut stream = c
.read_filter(&pred, selection, &delete_predicates)
.expect("Error in read_filter");

View File

@ -519,7 +519,7 @@ impl Db {
pub async fn delete(
self: &Arc<Self>,
table_name: &str,
delete_predicate: &Predicate,
delete_predicate: Arc<Predicate>,
) -> Result<()> {
// get all partitions of this table
let table = self
@ -534,7 +534,7 @@ impl Db {
// save the delete predicate in the chunk
let mut chunk = chunk.write();
chunk
.add_delete_predicate(delete_predicate)
.add_delete_predicate(Arc::clone(&delete_predicate))
.context(AddDeletePredicateError)?;
}
}

View File

@ -80,7 +80,7 @@ pub struct ChunkMetadata {
pub schema: Arc<Schema>,
/// Delete predicates of this chunk
pub delete_predicates: Arc<Vec<Predicate>>,
pub delete_predicates: Vec<Arc<Predicate>>,
}
/// Different memory representations of a frozen chunk.
@ -307,14 +307,14 @@ impl CatalogChunk {
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
metrics: ChunkMetrics,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
order: ChunkOrder,
) -> Self {
let stage = ChunkStage::Frozen {
meta: Arc::new(ChunkMetadata {
table_summary: Arc::new(chunk.table_summary()),
schema,
delete_predicates: Arc::clone(&delete_predicates),
delete_predicates,
}),
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
};
@ -342,7 +342,7 @@ impl CatalogChunk {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
metrics: ChunkMetrics,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
order: ChunkOrder,
) -> Self {
assert_eq!(chunk.table_name(), addr.table_name.as_ref());
@ -469,7 +469,7 @@ impl CatalogChunk {
}
}
pub fn add_delete_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> {
pub fn add_delete_predicate(&mut self, delete_predicate: Arc<Predicate>) -> Result<()> {
debug!(
?delete_predicate,
"Input delete predicate to CatalogChunk add_delete_predicate"
@ -479,24 +479,14 @@ impl CatalogChunk {
// Freeze/close this chunk and add delete_predicate to its frozen one
self.freeze_with_predicate(delete_predicate)?;
}
ChunkStage::Frozen { meta, .. } => {
ChunkStage::Frozen { meta, .. } | ChunkStage::Persisted { meta, .. } => {
// Add the delete_predicate into the chunk's metadata
let mut del_preds: Vec<Predicate> = (*meta.delete_predicates).clone();
del_preds.push(delete_predicate.clone());
let mut del_preds = meta.delete_predicates.clone();
del_preds.push(delete_predicate);
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::clone(&meta.schema),
delete_predicates: Arc::new(del_preds),
});
}
ChunkStage::Persisted { meta, .. } => {
// Add the delete_predicate into the chunk's metadata
let mut del_preds: Vec<Predicate> = (*meta.delete_predicates).clone();
del_preds.push(delete_predicate.clone());
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::clone(&meta.schema),
delete_predicates: Arc::new(del_preds),
delete_predicates: del_preds,
});
}
}
@ -504,22 +494,22 @@ impl CatalogChunk {
Ok(())
}
pub fn delete_predicates(&mut self) -> Arc<Vec<Predicate>> {
pub fn delete_predicates(&mut self) -> &[Arc<Predicate>] {
match &self.stage {
ChunkStage::Open { mb_chunk: _ } => {
// no delete predicate for open chunk
debug!("delete_predicates of Open chunk is empty");
Arc::new(vec![])
&[]
}
ChunkStage::Frozen { meta, .. } => {
let preds = &meta.delete_predicates;
debug!(?preds, "delete_predicates of Frozen chunk");
Arc::clone(&meta.delete_predicates)
preds
}
ChunkStage::Persisted { meta, .. } => {
let preds = &meta.delete_predicates;
debug!(?preds, "delete_predicates of Persisted chunk");
Arc::clone(&meta.delete_predicates)
preds
}
}
}
@ -692,11 +682,14 @@ impl CatalogChunk {
///
/// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage
/// (no-op) and will fail for other stages.
pub fn freeze_with_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> {
self.freeze_with_delete_predicates(vec![delete_predicate.clone()])
pub fn freeze_with_predicate(&mut self, delete_predicate: Arc<Predicate>) -> Result<()> {
self.freeze_with_delete_predicates(vec![delete_predicate])
}
fn freeze_with_delete_predicates(&mut self, delete_predicates: Vec<Predicate>) -> Result<()> {
fn freeze_with_delete_predicates(
&mut self,
delete_predicates: Vec<Arc<Predicate>>,
) -> Result<()> {
match &self.stage {
ChunkStage::Open { mb_chunk, .. } => {
debug!(%self.addr, row_count=mb_chunk.rows(), "freezing chunk");
@ -709,7 +702,7 @@ impl CatalogChunk {
let metadata = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary()),
schema: s.full_schema(),
delete_predicates: Arc::new(delete_predicates),
delete_predicates,
};
self.stage = ChunkStage::Frozen {
@ -793,7 +786,7 @@ impl CatalogChunk {
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema,
delete_predicates: Arc::clone(&meta.delete_predicates),
delete_predicates: meta.delete_predicates.clone(),
});
match &representation {
@ -1168,7 +1161,7 @@ mod tests {
expected_exprs1.push(e);
// Add a delete predicate into a chunk the open chunk = delete simulation for open chunk
chunk.add_delete_predicate(&del_pred1).unwrap();
chunk.add_delete_predicate(Arc::new(del_pred1)).unwrap();
// chunk must be in frozen stage now
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have a delete predicate
@ -1199,7 +1192,7 @@ mod tests {
let mut expected_exprs2 = vec![];
let e = col("cost").not_eq(lit(15));
expected_exprs2.push(e);
chunk.add_delete_predicate(&del_pred2).unwrap();
chunk.add_delete_predicate(Arc::new(del_pred2)).unwrap();
// chunk still must be in frozen stage now
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have 2 delete predicates
@ -1265,7 +1258,7 @@ mod tests {
now,
now,
ChunkMetrics::new_unregistered(),
Arc::new(vec![] as Vec<Predicate>),
vec![],
ChunkOrder::new(6),
)
}

View File

@ -176,7 +176,7 @@ impl Partition {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> (u32, Arc<RwLock<CatalogChunk>>) {
let chunk_id = Self::pick_next(&mut self.next_chunk_id, "Chunk ID Overflow");
@ -231,7 +231,7 @@ impl Partition {
chunk: Arc<parquet_file::chunk::ParquetChunk>,
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
delete_predicates: Arc<Vec<Predicate>>,
delete_predicates: Vec<Arc<Predicate>>,
chunk_order: ChunkOrder,
) -> Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name(), self.table_name());
@ -246,7 +246,7 @@ impl Partition {
time_of_first_write,
time_of_last_write,
self.metrics.new_chunk_metrics(),
Arc::clone(&delete_predicates),
delete_predicates,
chunk_order,
)),
);

View File

@ -121,7 +121,7 @@ impl DbChunk {
let meta = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary()),
schema: snapshot.full_schema(),
delete_predicates: Arc::new(vec![]), // open chunk does not have delete predicate
delete_predicates: vec![], // open chunk does not have delete predicate
};
(state, Arc::new(meta))
}
@ -226,7 +226,7 @@ impl DbChunk {
}
pub fn to_rub_negated_predicates(
delete_predicates: &[Predicate],
delete_predicates: &[Arc<Predicate>],
) -> Result<Vec<read_buffer::Predicate>> {
let mut rub_preds: Vec<read_buffer::Predicate> = vec![];
for pred in delete_predicates {
@ -331,7 +331,7 @@ impl QueryChunk for DbChunk {
&self,
predicate: &Predicate,
selection: Selection<'_>,
delete_predicates: &[Predicate],
delete_predicates: &[Arc<Predicate>],
) -> Result<SendableRecordBatchStream, Self::Error> {
// Predicate is not required to be applied for correctness. We only pushed it down
// when possible for performance gain
@ -536,11 +536,11 @@ impl QueryChunkMeta for DbChunk {
}
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &Vec<Predicate> {
let pred: &Vec<Predicate> = &self.meta.delete_predicates;
fn delete_predicates(&self) -> &[Arc<Predicate>] {
let pred = &self.meta.delete_predicates;
debug!(?pred, "Delete predicate in DbChunk");
&self.meta.delete_predicates
pred
}
}

View File

@ -45,7 +45,7 @@ pub(crate) fn compact_chunks(
let mut input_rows = 0;
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut delete_predicates: Vec<Predicate> = vec![];
let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut min_order = ChunkOrder::MAX;
let query_chunks = chunks
.into_iter()
@ -66,8 +66,7 @@ pub(crate) fn compact_chunks(
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
let mut preds = (*chunk.delete_predicates()).clone();
delete_predicates.append(&mut preds);
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
min_order = min_order.min(chunk.order());
@ -103,7 +102,7 @@ pub(crate) fn compact_chunks(
ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
let physical_plan = ctx.prepare_plan(&plan)?;
let stream = ctx.execute(physical_plan).await?;
let stream = ctx.execute_stream(physical_plan).await?;
let rb_chunk = collect_rub(stream, &addr, metric_registry.as_ref())
.await?
.expect("chunk has zero rows");
@ -119,7 +118,7 @@ pub(crate) fn compact_chunks(
time_of_first_write,
time_of_last_write,
schema,
Arc::new(delete_predicates),
delete_predicates,
min_order,
)
};

View File

@ -54,7 +54,7 @@ pub fn move_chunk_to_read_buffer(
ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
let physical_plan = ctx.prepare_plan(&plan)?;
let stream = ctx.execute(physical_plan).await?;
let stream = ctx.execute_stream(physical_plan).await?;
let rb_chunk = collect_rub(
stream,
&addr.clone().into_partition(),

View File

@ -52,7 +52,7 @@ pub fn persist_chunks(
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut query_chunks = vec![];
let mut delete_predicates: Vec<Predicate> = vec![];
let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut min_order = ChunkOrder::MAX;
for mut chunk in chunks {
// Sanity-check
@ -72,8 +72,7 @@ pub fn persist_chunks(
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
let mut preds = (*chunk.delete_predicates()).clone();
delete_predicates.append(&mut preds);
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
min_order = min_order.min(chunk.order());
@ -112,8 +111,10 @@ pub fn persist_chunks(
"Expected split plan to produce exactly 2 partitions"
);
let to_persist_stream = ctx.execute_partition(Arc::clone(&physical_plan), 0).await?;
let remainder_stream = ctx.execute_partition(physical_plan, 1).await?;
let to_persist_stream = ctx
.execute_stream_partitioned(Arc::clone(&physical_plan), 0)
.await?;
let remainder_stream = ctx.execute_stream_partitioned(physical_plan, 1).await?;
let (to_persist, remainder) = futures::future::try_join(
collect_rub(to_persist_stream, &addr, metric_registry.as_ref()),
@ -131,7 +132,6 @@ pub fn persist_chunks(
partition_write.force_drop_chunk(id)
}
let del_preds = Arc::new(delete_predicates);
// Upsert remainder to catalog
if let Some(remainder) = remainder {
partition_write.create_rub_chunk(
@ -139,7 +139,7 @@ pub fn persist_chunks(
time_of_first_write,
time_of_last_write,
Arc::clone(&schema),
Arc::clone(&del_preds),
delete_predicates.clone(),
min_order,
);
}
@ -153,7 +153,7 @@ pub fn persist_chunks(
time_of_first_write,
time_of_last_write,
schema,
del_preds,
delete_predicates,
min_order,
);
let to_persist = LockableCatalogChunk {

View File

@ -226,7 +226,7 @@ impl CatalogState for Loader {
.map_err(|e| Box::new(e) as _)
.context(SchemaError { path: info.path })?;
let delete_predicates: Arc<Vec<Predicate>> = Arc::new(vec![]); // NGA todo: After Marco saves delete predicate into the catalog, it will need to get extracted into this variable
let delete_predicates: Vec<Arc<Predicate>> = vec![]; // NGA todo: After Marco saves delete predicate into the catalog, it will need to get extracted into this variable
partition.insert_object_store_only_chunk(
iox_md.chunk_id,
parquet_chunk,

View File

@ -1,6 +1,5 @@
//! This module implements the `partition` CLI command
use data_types::chunk_metadata::ChunkSummary;
use data_types::job::Operation;
use generated_types::google::FieldViolation;
use influxdb_iox_client::{
connection::Connection,
@ -10,7 +9,7 @@ use influxdb_iox_client::{
PersistPartitionError, UnloadPartitionChunkError,
},
};
use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;
use structopt::StructOpt;
use thiserror::Error;
@ -283,10 +282,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
chunk_id,
} = close_chunk;
let operation: Operation = client
let operation = client
.close_partition_chunk(db_name, table_name, partition_key, chunk_id)
.await?
.try_into()?;
.await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}

View File

@ -1,6 +1,3 @@
use std::convert::TryInto;
use data_types::job::Operation;
use generated_types::google::FieldViolation;
use influxdb_iox_client::{connection::Connection, management};
use snafu::{ResultExt, Snafu};
@ -74,12 +71,10 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
return Err(Error::NeedsTheForceError);
}
let operation: Operation = client
let operation = client
.wipe_persisted_catalog(db_name)
.await
.context(WipeError)?
.try_into()
.context(InvalidResponse)?;
.context(WipeError)?;
serde_json::to_writer_pretty(std::io::stdout(), &operation).context(WritingJson)?;
}

View File

@ -1,11 +1,8 @@
use data_types::job::Operation;
use generated_types::google::FieldViolation;
use influxdb_iox_client::{
connection::Connection,
management,
operations::{self, Client},
};
use std::convert::TryInto;
use structopt::StructOpt;
use thiserror::Error;
@ -15,9 +12,6 @@ pub enum Error {
#[error("Client error: {0}")]
ClientError(#[from] operations::Error),
#[error("Received invalid response: {0}")]
InvalidResponse(#[from] FieldViolation),
#[error("Failed to create dummy job: {0}")]
CreateDummyJobError(#[from] management::CreateDummyJobError),
@ -68,29 +62,16 @@ enum Command {
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::List => {
let result: Result<Vec<Operation>, _> = Client::new(connection)
.list_operations()
.await?
.into_iter()
.map(|c| c.operation())
.map(TryInto::try_into)
.collect();
let operations = result?;
let operations = Client::new(connection).list_operations().await?;
serde_json::to_writer_pretty(std::io::stdout(), &operations)?;
}
Command::Get { id } => {
let operation: Operation = Client::new(connection)
.get_operation(id)
.await?
.try_into()?;
let operation = Client::new(connection).get_operation(id).await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::Wait { id, nanos } => {
let timeout = nanos.map(std::time::Duration::from_nanos);
let operation: Operation = Client::new(connection)
.wait_operation(id, timeout)
.await?
.try_into()?;
let operation = Client::new(connection).wait_operation(id, timeout).await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::Cancel { id } => {
@ -98,10 +79,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
println!("Ok");
}
Command::Test { nanos } => {
let operation: Operation = management::Client::new(connection)
let operation = management::Client::new(connection)
.create_dummy_job(nanos)
.await?
.try_into()?;
.await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
}

View File

@ -753,9 +753,11 @@ mod tests {
child(prepare_sql_span, "prepare_plan").unwrap();
let collect_span = child(ctx_span, "collect").unwrap();
let execute_span = child(collect_span, "execute_stream_partitioned").unwrap();
let coalesce_span = child(execute_span, "CoalescePartitionsEx").unwrap();
// validate spans from DataFusion ExecutionPlan are present
child(collect_span, "ProjectionExec: expr").unwrap();
child(coalesce_span, "ProjectionExec: expr").unwrap();
let database_not_found = root_spans[3];
assert_eq!(database_not_found.status, SpanStatus::Err);

View File

@ -28,7 +28,7 @@ use data_types::{
};
use influxdb_iox_client::format::QueryOutputFormat;
use influxdb_line_protocol::parse_lines;
use query::{exec::ExecutionContextProvider, QueryDatabase};
use query::exec::ExecutionContextProvider;
use server::{ApplicationState, ConnectionManager, Error, Server as AppServer};
// External crates
@ -392,7 +392,6 @@ where
.get("/health", health::<M>)
.get("/metrics", handle_metrics::<M>)
.get("/iox/api/v1/databases/:name/query", query::<M>)
.get("/api/v1/partitions", list_partitions::<M>)
.get("/debug/pprof", pprof_home::<M>)
.get("/debug/pprof/profile", pprof_profile::<M>)
.get("/debug/pprof/allocs", pprof_heappy_profile::<M>)
@ -644,43 +643,6 @@ async fn handle_metrics<M: ConnectionManager + Send + Sync + Debug + 'static>(
Ok(Response::new(Body::from(body)))
}
#[derive(Deserialize, Debug)]
/// Arguments in the query string of the request to /partitions
struct DatabaseInfo {
org: String,
bucket: String,
}
#[tracing::instrument(level = "debug")]
async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
let server = Arc::clone(&req.data::<Server<M>>().expect("server state").app_server);
let query = req.uri().query().context(ExpectedQueryString {})?;
let info: DatabaseInfo = serde_urlencoded::from_str(query).context(InvalidQueryString {
query_string: query,
})?;
let db_name =
org_and_bucket_to_database(&info.org, &info.bucket).context(BucketMappingError)?;
let db = server.db(&db_name)?;
let partition_keys =
db.partition_keys()
.map_err(|e| Box::new(e) as _)
.context(BucketByName {
org: &info.org,
bucket_name: &info.bucket,
})?;
let result = serde_json::to_string(&partition_keys).context(JsonGenerationError)?;
Ok(Response::new(Body::from(result)))
}
#[derive(Deserialize, Debug)]
/// Arguments in the query string of the request to /snapshot
struct SnapshotInfo {

View File

@ -615,7 +615,7 @@ where
del_predicate.exprs.push(expr);
}
db.delete(&table_name, &del_predicate)
db.delete(&table_name, Arc::new(del_predicate))
.await
.map_err(default_db_error_handler)?;
}

View File

@ -9,7 +9,6 @@ use generated_types::{
};
use influxdb_iox_client::{
management::{Client, CreateDatabaseError},
operations,
write::WriteError,
};
@ -880,20 +879,16 @@ async fn test_close_partition_chunk() {
assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer as i32);
// Move the chunk to read buffer
let operation = management_client
let iox_operation = management_client
.close_partition_chunk(&db_name, table_name, partition_key, 0)
.await
.expect("new partition chunk");
println!("Operation response is {:?}", operation);
let operation_id = operation.id();
let meta = operations::ClientOperation::try_new(operation)
.unwrap()
.metadata();
println!("Operation response is {:?}", iox_operation);
let operation_id = iox_operation.operation.id();
// ensure we got a legit job description back
if let Some(Job::CloseChunk(close_chunk)) = meta.job {
if let Some(Job::CloseChunk(close_chunk)) = iox_operation.metadata.job {
assert_eq!(close_chunk.db_name, db_name);
assert_eq!(close_chunk.partition_key, partition_key);
assert_eq!(close_chunk.chunk_id, 0);
@ -1020,20 +1015,16 @@ async fn test_wipe_preserved_catalog() {
// Recover by wiping preserved catalog
//
let operation = management_client
let iox_operation = management_client
.wipe_persisted_catalog(&db_name)
.await
.expect("wipe persisted catalog");
println!("Operation response is {:?}", operation);
let operation_id = operation.id();
let meta = operations::ClientOperation::try_new(operation)
.unwrap()
.metadata();
println!("Operation response is {:?}", iox_operation);
let operation_id = iox_operation.operation.id();
// ensure we got a legit job description back
if let Some(Job::WipePreservedCatalog(wipe_persisted_catalog)) = meta.job {
if let Some(Job::WipePreservedCatalog(wipe_persisted_catalog)) = iox_operation.metadata.job {
assert_eq!(wipe_persisted_catalog.db_name, db_name);
} else {
panic!("unexpected job returned")

View File

@ -1,13 +1,12 @@
use std::sync::Arc;
use std::time::Duration;
use assert_cmd::Command;
use predicates::prelude::*;
use data_types::chunk_metadata::ChunkAddr;
use data_types::{
chunk_metadata::ChunkStorage,
job::{Job, Operation},
use data_types::chunk_metadata::ChunkStorage;
use generated_types::google::longrunning::IoxOperation;
use generated_types::influxdata::iox::management::v1::{
operation_metadata::Job, CloseChunk, WipePreservedCatalog,
};
use test_helpers::make_temp_file;
use write_buffer::maybe_skip_kafka_integration;
@ -720,7 +719,7 @@ async fn test_close_partition_chunk() {
let lp_data = vec!["cpu,region=west user=23.2 100"];
load_lp(addr, &db_name, lp_data);
let stdout: Operation = serde_json::from_slice(
let stdout: IoxOperation = serde_json::from_slice(
&Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
@ -739,18 +738,16 @@ async fn test_close_partition_chunk() {
)
.expect("Expected JSON output");
let expected_job = Job::CompactChunk {
chunk: ChunkAddr {
db_name: Arc::from(db_name.as_str()),
table_name: Arc::from("cpu"),
partition_key: Arc::from("cpu"),
chunk_id: 0,
},
};
let expected_job = Job::CloseChunk(CloseChunk {
db_name,
table_name: "cpu".to_string(),
partition_key: "cpu".to_string(),
chunk_id: 0,
});
assert_eq!(
Some(expected_job),
stdout.job,
stdout.metadata.job,
"operation was {:#?}",
stdout
);
@ -783,7 +780,7 @@ async fn test_wipe_persisted_catalog() {
let server_fixture = fixture_broken_catalog(&db_name).await;
let addr = server_fixture.grpc_base();
let stdout: Operation = serde_json::from_slice(
let stdout: IoxOperation = serde_json::from_slice(
&Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
@ -800,13 +797,11 @@ async fn test_wipe_persisted_catalog() {
)
.expect("Expected JSON output");
let expected_job = Job::WipePreservedCatalog {
db_name: Arc::from(db_name.as_str()),
};
let expected_job = Job::WipePreservedCatalog(WipePreservedCatalog { db_name });
assert_eq!(
Some(expected_job),
stdout.job,
stdout.metadata.job,
"operation was {:#?}",
stdout
);

View File

@ -17,7 +17,7 @@ async fn test_operations() {
let nanos = vec![Duration::from_secs(20).as_nanos() as _, 1];
let operation = management_client
let iox_operation = management_client
.create_dummy_job(nanos.clone())
.await
.expect("create dummy job failed");
@ -28,20 +28,15 @@ async fn test_operations() {
.expect("list operations failed");
assert_eq!(running_ops.len(), 1);
assert_eq!(running_ops[0].name(), operation.name);
assert_eq!(running_ops[0].operation.name, iox_operation.operation.name);
let id = operation.name.parse().expect("not an integer");
let id = iox_operation.operation.id();
let iox_operation = operations_client.get_operation(id).await.unwrap();
let meta = operations_client
.client_operation(id)
.await
.unwrap()
.metadata();
let job = iox_operation.metadata.job.expect("expected a job");
let job = meta.job.expect("expected a job");
assert_eq!(meta.total_count, 2);
assert_eq!(meta.pending_count, 1);
assert_eq!(iox_operation.metadata.total_count, 2);
assert_eq!(iox_operation.metadata.pending_count, 1);
assert_eq!(
job,
operation_metadata::Job::Dummy(Dummy {
@ -51,14 +46,14 @@ async fn test_operations() {
);
// Check wait times out correctly
let fetched = operations_client
let iox_operation = operations_client
.wait_operation(id, Some(Duration::from_micros(10)))
.await
.expect("failed to wait operation");
assert!(!fetched.done);
assert!(!iox_operation.operation.done);
// Shouldn't specify wall_nanos as not complete
assert_eq!(meta.wall_nanos, 0);
assert_eq!(iox_operation.metadata.wall_nanos, 0);
let wait = tokio::spawn(async move {
let mut operations_client = server_fixture.operations_client();
@ -74,18 +69,15 @@ async fn test_operations() {
.expect("failed to cancel operation");
let waited = wait.await.unwrap();
let meta = operations::ClientOperation::try_new(waited.clone())
.unwrap()
.metadata();
assert!(waited.done);
assert!(meta.wall_nanos > 0);
assert!(meta.cpu_nanos > 0);
assert_eq!(meta.pending_count, 0);
assert_eq!(meta.total_count, 2);
assert_eq!(meta.cancelled_count, 1);
assert!(waited.operation.done);
assert!(waited.metadata.wall_nanos > 0);
assert!(waited.metadata.cpu_nanos > 0);
assert_eq!(waited.metadata.pending_count, 0);
assert_eq!(waited.metadata.total_count, 2);
assert_eq!(waited.metadata.cancelled_count, 1);
match waited.result {
match waited.operation.result {
Some(operations::generated_types::operation::Result::Error(status)) => {
assert_eq!(status.code, tonic::Code::Cancelled as i32)
}

View File

@ -1,6 +1,7 @@
use crate::common::server_fixture::ServerFixture;
use assert_cmd::Command;
use data_types::job::{Job, Operation, OperationStatus};
use generated_types::google::longrunning::IoxOperation;
use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, Dummy};
use predicates::prelude::*;
#[tokio::test]
@ -9,7 +10,7 @@ async fn test_start_stop() {
let addr = server_fixture.grpc_base();
let duration = std::time::Duration::from_secs(10).as_nanos() as u64;
let stdout: Operation = serde_json::from_slice(
let stdout: IoxOperation = serde_json::from_slice(
&Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("operation")
@ -24,13 +25,13 @@ async fn test_start_stop() {
)
.expect("expected JSON output");
assert_eq!(stdout.total_count, 1);
match stdout.job {
Some(Job::Dummy { nanos, .. }) => assert_eq!(nanos, vec![duration]),
_ => panic!("expected dummy job got {:?}", stdout.job),
assert_eq!(stdout.metadata.total_count, 1);
match stdout.metadata.job {
Some(Job::Dummy(Dummy { nanos, .. })) => assert_eq!(nanos, vec![duration]),
_ => panic!("expected dummy job got {:?}", stdout.metadata.job),
}
let operations: Vec<Operation> = serde_json::from_slice(
let operations: Vec<IoxOperation> = serde_json::from_slice(
&Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("operation")
@ -45,33 +46,33 @@ async fn test_start_stop() {
.expect("expected JSON output");
assert_eq!(operations.len(), 1);
match &operations[0].job {
Some(Job::Dummy { nanos, .. }) => {
match &operations[0].metadata.job {
Some(Job::Dummy(Dummy { nanos, .. })) => {
assert_eq!(nanos.len(), 1);
assert_eq!(nanos[0], duration);
}
_ => panic!("expected dummy job got {:?}", &operations[0].job),
_ => panic!("expected dummy job got {:?}", &operations[0].metadata.job),
}
let id = operations[0].id;
let name = &operations[0].operation.name;
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("operation")
.arg("cancel")
.arg(id.to_string())
.arg(name.clone())
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Ok"));
let completed: Operation = serde_json::from_slice(
let completed: IoxOperation = serde_json::from_slice(
&Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("operation")
.arg("wait")
.arg(id.to_string())
.arg(name.to_string())
.arg("--host")
.arg(addr)
.assert()
@ -81,9 +82,8 @@ async fn test_start_stop() {
)
.expect("expected JSON output");
assert_eq!(completed.pending_count, 0);
assert_eq!(completed.total_count, 1);
assert_eq!(completed.cancelled_count, 1);
assert_eq!(completed.status, OperationStatus::Cancelled);
assert_eq!(&completed.job, &operations[0].job)
assert_eq!(completed.metadata.pending_count, 0);
assert_eq!(completed.metadata.total_count, 1);
assert_eq!(completed.metadata.cancelled_count, 1);
assert_eq!(&completed.metadata.job, &operations[0].metadata.job)
}

View File

@ -2,7 +2,6 @@ use itertools::Itertools;
use arrow_util::assert_batches_eq;
use data_types::chunk_metadata::ChunkStorage;
use influxdb_iox_client::operations;
use crate::{
common::server_fixture::ServerFixture,
@ -125,11 +124,11 @@ async fn test_full_lifecycle() {
.await
.unwrap()
.iter()
.any(|operation| match operation.metadata().job {
.any(|operation| match &operation.metadata.job {
Some(Job::CompactChunks(CompactChunks {
db_name: operation_db_name,
..
})) => operation_db_name == db_name,
})) => operation_db_name == &db_name,
_ => false,
});
assert!(performed_compaction);
@ -269,20 +268,16 @@ async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32
assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer);
// Move the chunk to read buffer
let operation = management_client
let iox_operation = management_client
.close_partition_chunk(db_name, table_name, partition_key, 0)
.await
.expect("new partition chunk");
println!("Operation response is {:?}", operation);
let operation_id = operation.id();
let meta = operations::ClientOperation::try_new(operation)
.unwrap()
.metadata();
println!("Operation response is {:?}", iox_operation);
let operation_id = iox_operation.operation.id();
// ensure we got a legit job description back
if let Some(Job::CloseChunk(close_chunk)) = meta.job {
if let Some(Job::CloseChunk(close_chunk)) = iox_operation.metadata.job {
assert_eq!(close_chunk.db_name, db_name);
assert_eq!(close_chunk.partition_key, partition_key);
assert_eq!(close_chunk.chunk_id, 0);

View File

@ -556,12 +556,8 @@ where
}
if t_start.elapsed() >= wait_time {
let operations = fixture.operations_client().list_operations().await.unwrap();
let mut operations: Vec<_> = operations
.into_iter()
.map(|x| (x.name().parse::<usize>().unwrap(), x.metadata()))
.collect();
operations.sort_by_key(|x| x.0);
let mut operations = fixture.operations_client().list_operations().await.unwrap();
operations.sort_by(|a, b| a.operation.name.cmp(&b.operation.name));
panic!(
"Could not find {} within {:?}.\nChunks were: {:#?}\nOperations were: {:#?}",

View File

@ -306,17 +306,20 @@ async fn test_sql_observer_operations() {
let partition_key = "cpu";
let table_name = "cpu";
// Move the chunk to read buffer
let operation = management_client
let iox_operation = management_client
.close_partition_chunk(&db_name, table_name, partition_key, 0)
.await
.expect("new partition chunk");
println!("Operation response is {:?}", operation);
println!("Operation response is {:?}", iox_operation);
// wait for the job to be done
fixture
.operations_client()
.wait_operation(operation.id(), Some(std::time::Duration::from_secs(1)))
.wait_operation(
iox_operation.operation.id(),
Some(std::time::Duration::from_secs(1)),
)
.await
.expect("failed to wait operation");

View File

@ -27,12 +27,12 @@ async fn test_operations() {
.expect("write succeded");
// Move the chunk to read buffer
let operation = management_client
let iox_operation = management_client
.close_partition_chunk(&db_name1, table_name, partition_key, 0)
.await
.expect("new partition chunk");
let operation_id = operation.id();
let operation_id = iox_operation.operation.id();
operations_client
.wait_operation(operation_id, Some(std::time::Duration::from_secs(1)))
.await