From ce224bd37ff3c22f61b6c09dd009e65e9adfb2cb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Sep 2021 10:45:20 -0400 Subject: [PATCH 1/6] fix: Capture query execution traces for storage gRPC queries as well (#2553) * fix: Capture query execution traces for storage gRPC queries as well * refactor: remove debugging droppings * refactor: do not Box::pin within TracedStream * refactor: Use Futures::TryStreamExt rather than custom collect function * fix: remove wild println Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- query/src/exec/context.rs | 74 +++++++++++++------------ query/src/exec/query_tracing.rs | 78 +++++++++++++++++++++------ server/src/db/lifecycle/compact.rs | 2 +- server/src/db/lifecycle/move_chunk.rs | 2 +- server/src/db/lifecycle/persist.rs | 6 ++- src/influxdb_ioxd.rs | 4 +- 6 files changed, 109 insertions(+), 57 deletions(-) diff --git a/query/src/exec/context.rs b/query/src/exec/context.rs index b526a58773..ce72f303fb 100644 --- a/query/src/exec/context.rs +++ b/query/src/exec/context.rs @@ -11,18 +11,19 @@ use datafusion::{ logical_plan::{LogicalPlan, UserDefinedLogicalNode}, physical_plan::{ coalesce_partitions::CoalescePartitionsExec, - collect, displayable, + displayable, planner::{DefaultPhysicalPlanner, ExtensionPlanner}, ExecutionPlan, PhysicalPlanner, SendableRecordBatchStream, }, prelude::*, }; +use futures::TryStreamExt; use observability_deps::tracing::{debug, trace}; use trace::{ctx::SpanContext, span::SpanRecorder}; use crate::exec::{ fieldlist::{FieldList, IntoFieldList}, - query_tracing::send_metrics_to_tracing, + query_tracing::TracedStream, schema_pivot::{SchemaPivotExec, SchemaPivotNode}, seriesset::{SeriesSetConverter, SeriesSetItem}, split::StreamSplitExec, @@ -272,45 +273,63 @@ impl IOxExecutionContext { /// Executes the logical plan using DataFusion on a separate /// thread pool and produces RecordBatches pub async fn collect(&self, physical_plan: Arc) -> Result> { - let ctx = self.child_ctx("collect"); debug!( "Running plan, physical:\n{}", displayable(physical_plan.as_ref()).indent() ); + let ctx = self.child_ctx("collect"); + let stream = ctx.execute_stream(physical_plan).await?; - let res = ctx.run(collect(Arc::clone(&physical_plan))).await; - - // send metrics to tracing, even on error - ctx.save_metrics(physical_plan); - res + ctx.run( + stream + .err_into() // convert to DataFusionError + .try_collect(), + ) + .await } - /// Executes the physical plan and produces a RecordBatchStream to stream - /// over the result that iterates over the results. - pub async fn execute( + /// Executes the physical plan and produces a + /// `SendableRecordBatchStream` to stream over the result that + /// iterates over the results. The creation of the stream is + /// performed in a separate thread pool. + pub async fn execute_stream( &self, physical_plan: Arc, ) -> Result { match physical_plan.output_partitioning().partition_count() { 0 => unreachable!(), - 1 => self.execute_partition(physical_plan, 0).await, + 1 => self.execute_stream_partitioned(physical_plan, 0).await, _ => { // Merge into a single partition - self.execute_partition(Arc::new(CoalescePartitionsExec::new(physical_plan)), 0) - .await + self.execute_stream_partitioned( + Arc::new(CoalescePartitionsExec::new(physical_plan)), + 0, + ) + .await } } } - /// Executes a single partition of a physical plan and produces a RecordBatchStream to stream - /// over the result that iterates over the results. 
- pub async fn execute_partition( + /// Executes a single partition of a physical plan and produces a + /// `SendableRecordBatchStream` to stream over the result that + /// iterates over the results. The creation of the stream is + /// performed in a separate thread pool. + pub async fn execute_stream_partitioned( &self, physical_plan: Arc, partition: usize, ) -> Result { - self.run(async move { physical_plan.execute(partition).await }) - .await + let span = self + .recorder + .span() + .map(|span| span.child("execute_stream_partitioned")); + + self.run(async move { + let stream = physical_plan.execute(partition).await?; + let stream = TracedStream::new(stream, span, physical_plan); + Ok(Box::pin(stream) as _) + }) + .await } /// Executes the SeriesSetPlans on the query executor, in @@ -349,7 +368,7 @@ impl IOxExecutionContext { let physical_plan = ctx.prepare_plan(&plan)?; - let it = ctx.execute(physical_plan).await?; + let it = ctx.execute_stream(physical_plan).await?; SeriesSetConverter::default() .convert( @@ -486,19 +505,4 @@ impl IOxExecutionContext { recorder: self.recorder.child(name), } } - - /// Saves any DataFusion metrics that are currently present in - /// `physical_plan` to the span recorder so they show up in - /// distributed traces (e.g. Jaeger) - /// - /// This function should be invoked after `physical_plan` has - /// fully `collect`ed, meaning that `PhysicalPlan::execute()` has - /// been invoked and the resulting streams have been completely - /// consumed. Calling `save_metrics` metrics prior to this point - /// may result in saving incomplete information. - pub fn save_metrics(&self, physical_plan: Arc) { - if let Some(span) = self.recorder.span() { - send_metrics_to_tracing(span, physical_plan.as_ref()) - } - } } diff --git a/query/src/exec/query_tracing.rs b/query/src/exec/query_tracing.rs index cf6eb3cf99..7fce5f545e 100644 --- a/query/src/exec/query_tracing.rs +++ b/query/src/exec/query_tracing.rs @@ -1,15 +1,67 @@ //! This module contains the code to map DataFusion metrics to `Span`s //! for use in distributed tracing (e.g. Jaeger) -use std::{borrow::Cow, fmt}; +use std::{borrow::Cow, fmt, sync::Arc}; +use arrow::record_batch::RecordBatch; use chrono::{DateTime, Utc}; use datafusion::physical_plan::{ metrics::{MetricValue, MetricsSet}, - DisplayFormatType, ExecutionPlan, + DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, }; +use futures::StreamExt; use observability_deps::tracing::debug; -use trace::span::Span; +use trace::span::{Span, SpanRecorder}; + +/// Stream wrapper that records DataFusion `MetricSets` into IOx +/// [`Span`]s when it is dropped. +pub(crate) struct TracedStream { + inner: SendableRecordBatchStream, + span_recorder: SpanRecorder, + physical_plan: Arc, +} + +impl TracedStream { + /// Return a stream that records DataFusion `MetricSets` from + /// `physical_plan` into `span` when dropped. 
+ pub(crate) fn new( + inner: SendableRecordBatchStream, + span: Option, + physical_plan: Arc, + ) -> Self { + Self { + inner, + span_recorder: SpanRecorder::new(span), + physical_plan, + } + } +} + +impl RecordBatchStream for TracedStream { + fn schema(&self) -> arrow::datatypes::SchemaRef { + self.inner.schema() + } +} + +impl futures::Stream for TracedStream { + type Item = arrow::error::Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_next_unpin(cx) + } +} + +impl Drop for TracedStream { + fn drop(&mut self) { + if let Some(span) = self.span_recorder.span() { + let default_end_time = Utc::now(); + send_metrics_to_tracing(default_end_time, span, self.physical_plan.as_ref()); + } + } +} /// This function translates data in DataFusion `MetricSets` into IOx /// [`Span`]s. It records a snapshot of the current state of the @@ -26,15 +78,7 @@ use trace::span::Span; /// 1. If the ExecutionPlan had no metrics /// 2. The total number of rows produced by the ExecutionPlan (if available) /// 3. The elapsed compute time taken by the ExecutionPlan -pub(crate) fn send_metrics_to_tracing(parent_span: &Span, physical_plan: &dyn ExecutionPlan) { - // The parent span may be open, but since the physical_plan is - // assumed to be fully collected, using `now()` is a conservative - // estimate of the end time - let default_end_time = Utc::now(); - send_metrics_to_tracing_inner(default_end_time, parent_span, physical_plan) -} - -fn send_metrics_to_tracing_inner( +fn send_metrics_to_tracing( default_end_time: DateTime, parent_span: &Span, physical_plan: &dyn ExecutionPlan, @@ -101,7 +145,7 @@ fn send_metrics_to_tracing_inner( // recurse for child in physical_plan.children() { - send_metrics_to_tracing_inner(span_end, &span, child.as_ref()) + send_metrics_to_tracing(span_end, &span, child.as_ref()) } span.export() @@ -185,7 +229,7 @@ mod tests { let exec = TestExec::new(name, Default::default()); let traces = TraceBuilder::new(); - send_metrics_to_tracing(&traces.make_span(), &exec); + send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec); let spans = traces.spans(); assert_eq!(spans.len(), 1); @@ -216,7 +260,7 @@ mod tests { exec.new_child("child4", make_time_metricset(None, None)); let traces = TraceBuilder::new(); - send_metrics_to_tracing_inner(ts5, &traces.make_span(), &exec); + send_metrics_to_tracing(ts5, &traces.make_span(), &exec); let spans = traces.spans(); println!("Spans: \n\n{:#?}", spans); @@ -242,7 +286,7 @@ mod tests { exec.metrics = None; let traces = TraceBuilder::new(); - send_metrics_to_tracing(&traces.make_span(), &exec); + send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec); let spans = traces.spans(); assert_eq!(spans.len(), 1); @@ -266,7 +310,7 @@ mod tests { add_elapsed_compute(exec.metrics_mut(), 2000, 2); let traces = TraceBuilder::new(); - send_metrics_to_tracing(&traces.make_span(), &exec); + send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec); // aggregated metrics should be reported let spans = traces.spans(); diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index 0c09f8601c..281d44b1ec 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -103,7 +103,7 @@ pub(crate) fn compact_chunks( ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?; let physical_plan = ctx.prepare_plan(&plan)?; - let stream = ctx.execute(physical_plan).await?; + let stream = 
ctx.execute_stream(physical_plan).await?; let rb_chunk = collect_rub(stream, &addr, metric_registry.as_ref()) .await? .expect("chunk has zero rows"); diff --git a/server/src/db/lifecycle/move_chunk.rs b/server/src/db/lifecycle/move_chunk.rs index e0ddf48b36..2dbe716eb1 100644 --- a/server/src/db/lifecycle/move_chunk.rs +++ b/server/src/db/lifecycle/move_chunk.rs @@ -54,7 +54,7 @@ pub fn move_chunk_to_read_buffer( ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?; let physical_plan = ctx.prepare_plan(&plan)?; - let stream = ctx.execute(physical_plan).await?; + let stream = ctx.execute_stream(physical_plan).await?; let rb_chunk = collect_rub( stream, &addr.clone().into_partition(), diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index 8d8d5b619b..52b3e37e3a 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -112,8 +112,10 @@ pub fn persist_chunks( "Expected split plan to produce exactly 2 partitions" ); - let to_persist_stream = ctx.execute_partition(Arc::clone(&physical_plan), 0).await?; - let remainder_stream = ctx.execute_partition(physical_plan, 1).await?; + let to_persist_stream = ctx + .execute_stream_partitioned(Arc::clone(&physical_plan), 0) + .await?; + let remainder_stream = ctx.execute_stream_partitioned(physical_plan, 1).await?; let (to_persist, remainder) = futures::future::try_join( collect_rub(to_persist_stream, &addr, metric_registry.as_ref()), diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 1a22ffea0e..afcffc61bd 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -753,9 +753,11 @@ mod tests { child(prepare_sql_span, "prepare_plan").unwrap(); let collect_span = child(ctx_span, "collect").unwrap(); + let execute_span = child(collect_span, "execute_stream_partitioned").unwrap(); + let coalesce_span = child(execute_span, "CoalescePartitionsEx").unwrap(); // validate spans from DataFusion ExecutionPlan are present - child(collect_span, "ProjectionExec: expr").unwrap(); + child(coalesce_span, "ProjectionExec: expr").unwrap(); let database_not_found = root_spans[3]; assert_eq!(database_not_found.status, SpanStatus::Err); From 69939a5ae2cd66fec2ce8832e4c9bd4b6902798c Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Mon, 23 Aug 2021 09:59:26 -0400 Subject: [PATCH 2/6] perf: Don't open the output file each time we write. This improves performance of the the file output mode, which should make it easier to improve the performance of the core generation logic. Benchmarked via: ``` time \ ./target/release/iox_data_generator \ --spec iox_data_generator/schemas/fully-supported.toml \ --output /tmp/out \ --start '1 month ago' ``` Before: ``` Submitted 271608 total points real 10.912 10911567us user 3.129 3129032us sys 6.257 6257340us cpu 86% mem 7152 KiB ``` After: ``` Submitted 271588 total points real 2.291 2291364us user 1.969 1969357us sys 0.058 58030us cpu 88% mem 7104 KiB ``` That's 21.0% of the previous time. 
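
For reference, a minimal sketch of the pattern this patch applies (illustrative only; `PointsFile` and `write_points_reopening` are made-up names, not the actual `iox_data_generator` types): the output file is opened, and wrapped in a `BufWriter`, once when the writer is built, instead of being reopened for every batch of points.

```
// Hedged sketch of the "open once, buffer, reuse" change; not the real
// iox_data_generator code.
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Write};
use std::path::Path;

/// Old shape: every call pays for an open() plus unbuffered writes.
fn write_points_reopening(path: &Path, points: &[String]) -> io::Result<()> {
    let file = OpenOptions::new().append(true).create(true).open(path)?;
    let mut file = BufWriter::new(file);
    for point in points {
        writeln!(file, "{}", point)?;
    }
    Ok(())
}

/// New shape: open and buffer once, then reuse the handle for every batch.
struct PointsFile {
    file: BufWriter<File>,
}

impl PointsFile {
    fn new(path: &Path) -> io::Result<Self> {
        let file = OpenOptions::new().append(true).create(true).open(path)?;
        Ok(Self { file: BufWriter::new(file) })
    }

    fn write_points(&mut self, points: &[String]) -> io::Result<()> {
        for point in points {
            writeln!(self.file, "{}", point)?;
        }
        Ok(())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}

fn main() -> io::Result<()> {
    let path = Path::new("/tmp/points.txt");
    write_points_reopening(path, &["cpu,host=a usage=1i 1".to_string()])?;

    let mut writer = PointsFile::new(path)?;
    writer.write_points(&["cpu,host=a usage=2i 2".to_string()])?;
    writer.flush()
}
```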
--- iox_data_generator/src/lib.rs | 13 +++++++++- iox_data_generator/src/write.rs | 46 ++++++++++++++++++++------------- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/iox_data_generator/src/lib.rs b/iox_data_generator/src/lib.rs index b57edf3103..b14eeb0ec1 100644 --- a/iox_data_generator/src/lib.rs +++ b/iox_data_generator/src/lib.rs @@ -77,6 +77,15 @@ pub enum Error { /// Underlying `agent` module error that caused this problem source: agent::Error, }, + + /// Error that may happen when constructing an agent's writer + #[snafu(display("Could not create writer for agent `{}`, caused by:\n{}", name, source))] + CouldNotCreateAgentWriter { + /// The name of the relevant agent + name: String, + /// Underlying `write` module error that caused this problem + source: write::Error, + }, } type Result = std::result::Result; @@ -135,7 +144,9 @@ pub async fn generate( ) .context(CouldNotCreateAgent { name: &agent_name })?; - let agent_points_writer = points_writer_builder.build_for_agent(&agent_name); + let agent_points_writer = points_writer_builder + .build_for_agent(&agent_name) + .context(CouldNotCreateAgentWriter { name: &agent_name })?; handles.push(tokio::task::spawn(async move { agent.generate_all(agent_points_writer, batch_size).await diff --git a/iox_data_generator/src/write.rs b/iox_data_generator/src/write.rs index fb0966f8d0..ae68d93940 100644 --- a/iox_data_generator/src/write.rs +++ b/iox_data_generator/src/write.rs @@ -10,14 +10,23 @@ use std::{ }; use std::{ fs, - fs::OpenOptions, + fs::{File, OpenOptions}, + io::BufWriter, path::{Path, PathBuf}, }; -use tracing::info; /// Errors that may happen while writing points. #[derive(Snafu, Debug)] pub enum Error { + /// Error that may happen when writing line protocol to a file + #[snafu(display("Could open line protocol file {}: {}", filename.display(), source))] + CantOpenLineProtocolFile { + /// The location of the file we tried to open + filename: PathBuf, + /// Underlying IO error that caused this problem + source: std::io::Error, + }, + /// Error that may happen when writing line protocol to a no-op sink #[snafu(display("Could not generate line protocol: {}", source))] CantWriteToNoOp { @@ -174,7 +183,7 @@ impl PointsWriterBuilder { /// Create a writer out of this writer's configuration for a particular /// agent that runs in a separate thread/task. 
- pub fn build_for_agent(&mut self, agent_name: &str) -> PointsWriter { + pub fn build_for_agent(&mut self, agent_name: &str) -> Result { let inner_writer = match &mut self.config { PointsWriterConfig::Api { client, @@ -189,7 +198,16 @@ impl PointsWriterBuilder { let mut filename = dir_path.clone(); filename.push(agent_name); filename.set_extension("txt"); - InnerPointsWriter::File(filename) + + let file = OpenOptions::new() + .append(true) + .create(true) + .open(&filename) + .context(CantOpenLineProtocolFile { filename })?; + + let file = BufWriter::new(file); + + InnerPointsWriter::File { file } } PointsWriterConfig::NoOp { perform_write } => InnerPointsWriter::NoOp { perform_write: *perform_write, @@ -204,7 +222,7 @@ impl PointsWriterBuilder { PointsWriterConfig::Stdout => InnerPointsWriter::Stdout, }; - PointsWriter { inner_writer } + Ok(PointsWriter { inner_writer }) } } @@ -228,7 +246,9 @@ enum InnerPointsWriter { org: String, bucket: String, }, - File(PathBuf), + File { + file: BufWriter, + }, NoOp { perform_write: bool, }, @@ -250,22 +270,12 @@ impl InnerPointsWriter { .await .context(CantWriteToApi)?; } - Self::File(filename) => { - info!("Opening file {:?}", filename); - let num_points = points.len(); - let file = OpenOptions::new() - .append(true) - .create(true) - .open(&filename) - .context(CantWriteToLineProtocolFile)?; - - let mut file = std::io::BufWriter::new(file); + Self::File { file } => { for point in points { point - .write_data_point_to(&mut file) + .write_data_point_to(&mut *file) .context(CantWriteToLineProtocolFile)?; } - info!("Wrote {} points to {:?}", num_points, filename); } Self::NoOp { perform_write } => { if *perform_write { From ec943081c7609459242ab587e82d5130d14052cf Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 16 Sep 2021 14:52:57 +0200 Subject: [PATCH 3/6] refactor: `Arc>` => `Vec>` for del predicates The motivations are: 1. The API uses a SINGLE predicate and adds that to many chunks. With `Arc>` you gain nothing, with `Vec>` the predicate is only stored once (in many vectors) 2. While we currently add predicates blindly to all chunks, we can be way smarter in the future and prune out tables, partitions or even single chunks (based on statistics). With that, it will be rare that many chunks share the exact same set of predicates. 3. It would be nice if we could de-duplicate predicates when writing them to the preserved catalog without needing to repeat the pruning discussed in point 2. This is way easier to implement whan chunks exists in `Arc`s. 4. As a side-note: the `Arc>` wasn't really cloned around but instead was created many time. So the new version should be more memory efficient out of the box. 
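
As a concrete illustration of point 1 (the move from `Arc<Vec<Predicate>>` to `Vec<Arc<Predicate>>`), here is a minimal sketch of the sharing behaviour using stand-in `Predicate` and `Chunk` types rather than the real IOx ones: the single predicate handed to the delete API is allocated once, and each chunk's vector only bumps a reference count.

```
// Hedged sketch of Vec<Arc<Predicate>> sharing; not the IOx types themselves.
use std::sync::Arc;

#[derive(Debug)]
struct Predicate {
    expr: String,
}

#[derive(Debug, Default)]
struct Chunk {
    delete_predicates: Vec<Arc<Predicate>>,
}

fn main() {
    // One predicate created by a single delete call...
    let pred = Arc::new(Predicate {
        expr: "city != 'Boston'".to_string(),
    });

    // ...attached to many chunks without cloning the predicate itself.
    let mut chunks = vec![Chunk::default(), Chunk::default(), Chunk::default()];
    for chunk in &mut chunks {
        chunk.delete_predicates.push(Arc::clone(&pred));
    }

    // Three vectors, but still a single Predicate allocation.
    assert_eq!(Arc::strong_count(&pred), 4); // three chunks plus the local `pred`
    println!("{:?}", chunks[0].delete_predicates[0]);
}
```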
--- predicate/src/predicate.rs | 14 ++++++-- query/src/lib.rs | 10 +++--- query/src/test.rs | 13 +++---- server/src/db.rs | 8 ++--- server/src/db/catalog/chunk.rs | 55 +++++++++++++---------------- server/src/db/catalog/partition.rs | 6 ++-- server/src/db/chunk.rs | 12 +++---- server/src/db/lifecycle/compact.rs | 7 ++-- server/src/db/lifecycle/persist.rs | 10 +++--- server/src/db/load.rs | 2 +- src/influxdb_ioxd/rpc/management.rs | 2 +- 11 files changed, 70 insertions(+), 69 deletions(-) diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs index f763eacba8..0cc3f0c16f 100644 --- a/predicate/src/predicate.rs +++ b/predicate/src/predicate.rs @@ -164,8 +164,13 @@ impl Predicate { /// Add each range [start, stop] of the delete_predicates into the predicate in /// the form "time < start OR time > stop" to eliminate that range from the query - pub fn add_delete_ranges(&mut self, delete_predicates: &[Self]) { + pub fn add_delete_ranges(&mut self, delete_predicates: &[S]) + where + S: AsRef, + { for pred in delete_predicates { + let pred = pred.as_ref(); + if let Some(range) = pred.range { let expr = col(TIME_COLUMN_NAME) .lt(lit(range.start)) @@ -182,8 +187,13 @@ impl Predicate { /// The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means /// NOT(city != "Boston" AND temp = 70), NOT(state = "NY" AND route != "I90") which means /// [NOT(city = Boston") OR NOT(temp = 70)], [NOT(state = "NY") OR NOT(route != "I90")] - pub fn add_delete_exprs(&mut self, delete_predicates: &[Self]) { + pub fn add_delete_exprs(&mut self, delete_predicates: &[S]) + where + S: AsRef, + { for pred in delete_predicates { + let pred = pred.as_ref(); + let mut expr: Option = None; for exp in &pred.exprs { match expr { diff --git a/query/src/lib.rs b/query/src/lib.rs index 447efb465a..f1d6d4e8e1 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -46,7 +46,7 @@ pub trait QueryChunkMeta: Sized { fn schema(&self) -> Arc; // return a reference to delete predicates of the chunk - fn delete_predicates(&self) -> &Vec; + fn delete_predicates(&self) -> &[Arc]; } /// A `Database` is the main trait implemented by the IOx subsystems @@ -137,7 +137,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync { &self, predicate: &Predicate, selection: Selection<'_>, - delete_predicates: &[Predicate], + delete_predicates: &[Arc], ) -> Result; /// Returns true if data of this chunk is sorted @@ -166,10 +166,10 @@ where self.as_ref().schema() } - fn delete_predicates(&self) -> &Vec { - let pred: &Vec = self.as_ref().delete_predicates(); + fn delete_predicates(&self) -> &[Arc] { + let pred = self.as_ref().delete_predicates(); debug!(?pred, "Delete predicate in QueryChunkMeta"); - self.as_ref().delete_predicates() + pred } } diff --git a/query/src/test.rs b/query/src/test.rs index 6ea0c1222d..d7c2661852 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -175,7 +175,8 @@ pub struct TestChunk { predicate_match: Option, /// Copy of delete predicates passed - delete_predicates: Vec, + delete_predicates: Vec>, + /// Order of this chunk relative to other overlapping chunks. 
order: ChunkOrder, } @@ -823,7 +824,7 @@ impl QueryChunk for TestChunk { &self, predicate: &Predicate, _selection: Selection<'_>, - _delete_predicates: &[Predicate], + _delete_predicates: &[Arc], ) -> Result { self.check_error()?; @@ -913,11 +914,11 @@ impl QueryChunkMeta for TestChunk { } // return a reference to delete predicates of the chunk - fn delete_predicates(&self) -> &Vec { - let pred: &Vec = &self.delete_predicates; + fn delete_predicates(&self) -> &[Arc] { + let pred = &self.delete_predicates; debug!(?pred, "Delete predicate in Test Chunk"); - &self.delete_predicates + pred } } @@ -927,7 +928,7 @@ pub async fn raw_data(chunks: &[Arc]) -> Vec { for c in chunks { let pred = Predicate::default(); let selection = Selection::All; - let delete_predicates: Vec = vec![]; + let delete_predicates: Vec> = vec![]; let mut stream = c .read_filter(&pred, selection, &delete_predicates) .expect("Error in read_filter"); diff --git a/server/src/db.rs b/server/src/db.rs index 3a1b17b639..7335a614fc 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -519,7 +519,7 @@ impl Db { pub async fn delete( self: &Arc, table_name: &str, - delete_predicate: &Predicate, + delete_predicate: Arc, ) -> Result<()> { // get all partitions of this table let table = self @@ -534,7 +534,7 @@ impl Db { // save the delete predicate in the chunk let mut chunk = chunk.write(); chunk - .add_delete_predicate(delete_predicate) + .add_delete_predicate(Arc::clone(&delete_predicate)) .context(AddDeletePredicateError)?; } } @@ -3780,7 +3780,7 @@ mod tests { .timestamp_range(0, 15) .add_expr(expr) .build(); - db.delete("cpu", &pred).await.unwrap(); + db.delete("cpu", Arc::new(pred)).await.unwrap(); // When the above delete is issued, the open mub chunk is frozen with the delete predicate added // Verify there is MUB but no RUB no OS assert!(!mutable_chunk_ids(&db, partition_key).is_empty()); @@ -3913,7 +3913,7 @@ mod tests { .add_expr(expr1) .add_expr(expr2) .build(); - db.delete("cpu", &pred).await.unwrap(); + db.delete("cpu", Arc::new(pred)).await.unwrap(); // When the above delete is issued, the open mub chunk is frozen with the delete predicate added // Verify there is MUB but no RUB no OS assert!(!mutable_chunk_ids(&db, partition_key).is_empty()); diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 2821dd8ac5..bf474d3403 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -80,7 +80,7 @@ pub struct ChunkMetadata { pub schema: Arc, /// Delete predicates of this chunk - pub delete_predicates: Arc>, + pub delete_predicates: Vec>, } /// Different memory representations of a frozen chunk. 
@@ -307,14 +307,14 @@ impl CatalogChunk { time_of_last_write: DateTime, schema: Arc, metrics: ChunkMetrics, - delete_predicates: Arc>, + delete_predicates: Vec>, order: ChunkOrder, ) -> Self { let stage = ChunkStage::Frozen { meta: Arc::new(ChunkMetadata { table_summary: Arc::new(chunk.table_summary()), schema, - delete_predicates: Arc::clone(&delete_predicates), + delete_predicates, }), representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)), }; @@ -342,7 +342,7 @@ impl CatalogChunk { time_of_first_write: DateTime, time_of_last_write: DateTime, metrics: ChunkMetrics, - delete_predicates: Arc>, + delete_predicates: Vec>, order: ChunkOrder, ) -> Self { assert_eq!(chunk.table_name(), addr.table_name.as_ref()); @@ -469,7 +469,7 @@ impl CatalogChunk { } } - pub fn add_delete_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> { + pub fn add_delete_predicate(&mut self, delete_predicate: Arc) -> Result<()> { debug!( ?delete_predicate, "Input delete predicate to CatalogChunk add_delete_predicate" @@ -479,24 +479,14 @@ impl CatalogChunk { // Freeze/close this chunk and add delete_predicate to its frozen one self.freeze_with_predicate(delete_predicate)?; } - ChunkStage::Frozen { meta, .. } => { + ChunkStage::Frozen { meta, .. } | ChunkStage::Persisted { meta, .. } => { // Add the delete_predicate into the chunk's metadata - let mut del_preds: Vec = (*meta.delete_predicates).clone(); - del_preds.push(delete_predicate.clone()); + let mut del_preds = meta.delete_predicates.clone(); + del_preds.push(delete_predicate); *meta = Arc::new(ChunkMetadata { table_summary: Arc::clone(&meta.table_summary), schema: Arc::clone(&meta.schema), - delete_predicates: Arc::new(del_preds), - }); - } - ChunkStage::Persisted { meta, .. } => { - // Add the delete_predicate into the chunk's metadata - let mut del_preds: Vec = (*meta.delete_predicates).clone(); - del_preds.push(delete_predicate.clone()); - *meta = Arc::new(ChunkMetadata { - table_summary: Arc::clone(&meta.table_summary), - schema: Arc::clone(&meta.schema), - delete_predicates: Arc::new(del_preds), + delete_predicates: del_preds, }); } } @@ -504,22 +494,22 @@ impl CatalogChunk { Ok(()) } - pub fn delete_predicates(&mut self) -> Arc> { + pub fn delete_predicates(&mut self) -> &[Arc] { match &self.stage { ChunkStage::Open { mb_chunk: _ } => { // no delete predicate for open chunk debug!("delete_predicates of Open chunk is empty"); - Arc::new(vec![]) + &[] } ChunkStage::Frozen { meta, .. } => { let preds = &meta.delete_predicates; debug!(?preds, "delete_predicates of Frozen chunk"); - Arc::clone(&meta.delete_predicates) + preds } ChunkStage::Persisted { meta, .. } => { let preds = &meta.delete_predicates; debug!(?preds, "delete_predicates of Persisted chunk"); - Arc::clone(&meta.delete_predicates) + preds } } } @@ -692,11 +682,14 @@ impl CatalogChunk { /// /// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage /// (no-op) and will fail for other stages. - pub fn freeze_with_predicate(&mut self, delete_predicate: &Predicate) -> Result<()> { - self.freeze_with_delete_predicates(vec![delete_predicate.clone()]) + pub fn freeze_with_predicate(&mut self, delete_predicate: Arc) -> Result<()> { + self.freeze_with_delete_predicates(vec![delete_predicate]) } - fn freeze_with_delete_predicates(&mut self, delete_predicates: Vec) -> Result<()> { + fn freeze_with_delete_predicates( + &mut self, + delete_predicates: Vec>, + ) -> Result<()> { match &self.stage { ChunkStage::Open { mb_chunk, .. 
} => { debug!(%self.addr, row_count=mb_chunk.rows(), "freezing chunk"); @@ -709,7 +702,7 @@ impl CatalogChunk { let metadata = ChunkMetadata { table_summary: Arc::new(mb_chunk.table_summary()), schema: s.full_schema(), - delete_predicates: Arc::new(delete_predicates), + delete_predicates, }; self.stage = ChunkStage::Frozen { @@ -793,7 +786,7 @@ impl CatalogChunk { *meta = Arc::new(ChunkMetadata { table_summary: Arc::clone(&meta.table_summary), schema, - delete_predicates: Arc::clone(&meta.delete_predicates), + delete_predicates: meta.delete_predicates.clone(), }); match &representation { @@ -1168,7 +1161,7 @@ mod tests { expected_exprs1.push(e); // Add a delete predicate into a chunk the open chunk = delete simulation for open chunk - chunk.add_delete_predicate(&del_pred1).unwrap(); + chunk.add_delete_predicate(Arc::new(del_pred1)).unwrap(); // chunk must be in frozen stage now assert_eq!(chunk.stage().name(), "Frozen"); // chunk must have a delete predicate @@ -1199,7 +1192,7 @@ mod tests { let mut expected_exprs2 = vec![]; let e = col("cost").not_eq(lit(15)); expected_exprs2.push(e); - chunk.add_delete_predicate(&del_pred2).unwrap(); + chunk.add_delete_predicate(Arc::new(del_pred2)).unwrap(); // chunk still must be in frozen stage now assert_eq!(chunk.stage().name(), "Frozen"); // chunk must have 2 delete predicates @@ -1265,7 +1258,7 @@ mod tests { now, now, ChunkMetrics::new_unregistered(), - Arc::new(vec![] as Vec), + vec![], ChunkOrder::new(6), ) } diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index 3c989e6a7f..bcd07e8435 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -176,7 +176,7 @@ impl Partition { time_of_first_write: DateTime, time_of_last_write: DateTime, schema: Arc, - delete_predicates: Arc>, + delete_predicates: Vec>, chunk_order: ChunkOrder, ) -> (u32, Arc>) { let chunk_id = Self::pick_next(&mut self.next_chunk_id, "Chunk ID Overflow"); @@ -231,7 +231,7 @@ impl Partition { chunk: Arc, time_of_first_write: DateTime, time_of_last_write: DateTime, - delete_predicates: Arc>, + delete_predicates: Vec>, chunk_order: ChunkOrder, ) -> Arc> { assert_eq!(chunk.table_name(), self.table_name()); @@ -246,7 +246,7 @@ impl Partition { time_of_first_write, time_of_last_write, self.metrics.new_chunk_metrics(), - Arc::clone(&delete_predicates), + delete_predicates, chunk_order, )), ); diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index e378b56f8e..b9386d93c4 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -121,7 +121,7 @@ impl DbChunk { let meta = ChunkMetadata { table_summary: Arc::new(mb_chunk.table_summary()), schema: snapshot.full_schema(), - delete_predicates: Arc::new(vec![]), // open chunk does not have delete predicate + delete_predicates: vec![], // open chunk does not have delete predicate }; (state, Arc::new(meta)) } @@ -226,7 +226,7 @@ impl DbChunk { } pub fn to_rub_negated_predicates( - delete_predicates: &[Predicate], + delete_predicates: &[Arc], ) -> Result> { let mut rub_preds: Vec = vec![]; for pred in delete_predicates { @@ -331,7 +331,7 @@ impl QueryChunk for DbChunk { &self, predicate: &Predicate, selection: Selection<'_>, - delete_predicates: &[Predicate], + delete_predicates: &[Arc], ) -> Result { // Predicate is not required to be applied for correctness. 
We only pushed it down // when possible for performance gain @@ -536,11 +536,11 @@ impl QueryChunkMeta for DbChunk { } // return a reference to delete predicates of the chunk - fn delete_predicates(&self) -> &Vec { - let pred: &Vec = &self.meta.delete_predicates; + fn delete_predicates(&self) -> &[Arc] { + let pred = &self.meta.delete_predicates; debug!(?pred, "Delete predicate in DbChunk"); - &self.meta.delete_predicates + pred } } diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index 281d44b1ec..b661dfc546 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -45,7 +45,7 @@ pub(crate) fn compact_chunks( let mut input_rows = 0; let mut time_of_first_write: Option> = None; let mut time_of_last_write: Option> = None; - let mut delete_predicates: Vec = vec![]; + let mut delete_predicates: Vec> = vec![]; let mut min_order = ChunkOrder::MAX; let query_chunks = chunks .into_iter() @@ -66,8 +66,7 @@ pub(crate) fn compact_chunks( .map(|prev_last| prev_last.max(candidate_last)) .or(Some(candidate_last)); - let mut preds = (*chunk.delete_predicates()).clone(); - delete_predicates.append(&mut preds); + delete_predicates.extend(chunk.delete_predicates().iter().cloned()); min_order = min_order.min(chunk.order()); @@ -119,7 +118,7 @@ pub(crate) fn compact_chunks( time_of_first_write, time_of_last_write, schema, - Arc::new(delete_predicates), + delete_predicates, min_order, ) }; diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index 52b3e37e3a..7fa38818f7 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -52,7 +52,7 @@ pub fn persist_chunks( let mut time_of_first_write: Option> = None; let mut time_of_last_write: Option> = None; let mut query_chunks = vec![]; - let mut delete_predicates: Vec = vec![]; + let mut delete_predicates: Vec> = vec![]; let mut min_order = ChunkOrder::MAX; for mut chunk in chunks { // Sanity-check @@ -72,8 +72,7 @@ pub fn persist_chunks( .map(|prev_last| prev_last.max(candidate_last)) .or(Some(candidate_last)); - let mut preds = (*chunk.delete_predicates()).clone(); - delete_predicates.append(&mut preds); + delete_predicates.extend(chunk.delete_predicates().iter().cloned()); min_order = min_order.min(chunk.order()); @@ -133,7 +132,6 @@ pub fn persist_chunks( partition_write.force_drop_chunk(id) } - let del_preds = Arc::new(delete_predicates); // Upsert remainder to catalog if let Some(remainder) = remainder { partition_write.create_rub_chunk( @@ -141,7 +139,7 @@ pub fn persist_chunks( time_of_first_write, time_of_last_write, Arc::clone(&schema), - Arc::clone(&del_preds), + delete_predicates.clone(), min_order, ); } @@ -155,7 +153,7 @@ pub fn persist_chunks( time_of_first_write, time_of_last_write, schema, - del_preds, + delete_predicates, min_order, ); let to_persist = LockableCatalogChunk { diff --git a/server/src/db/load.rs b/server/src/db/load.rs index 0663e8dbe3..d0cdd1817d 100644 --- a/server/src/db/load.rs +++ b/server/src/db/load.rs @@ -226,7 +226,7 @@ impl CatalogState for Loader { .map_err(|e| Box::new(e) as _) .context(SchemaError { path: info.path })?; - let delete_predicates: Arc> = Arc::new(vec![]); // NGA todo: After Marco saves delete predicate into the catalog, it will need to get extracted into this variable + let delete_predicates: Vec> = vec![]; // NGA todo: After Marco saves delete predicate into the catalog, it will need to get extracted into this variable partition.insert_object_store_only_chunk( 
iox_md.chunk_id, parquet_chunk, diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 2b97ff5407..4ff9fe81ea 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -615,7 +615,7 @@ where del_predicate.exprs.push(expr); } - db.delete(&table_name, &del_predicate) + db.delete(&table_name, Arc::new(del_predicate)) .await .map_err(default_db_error_handler)?; } From f34eab70b3946363e15826cbab2638cb76764777 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 16 Sep 2021 17:46:12 +0100 Subject: [PATCH 4/6] feat: add pbjson bytes support (#2560) * feat: add pbjson bytes support * chore: fix lint * chore: review feedback Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 2 + pbjson/Cargo.toml | 2 + pbjson/src/lib.rs | 45 +++++++- pbjson_build/src/generator/message.rs | 159 +++++++++++++++++--------- pbjson_build/src/message.rs | 4 +- pbjson_test/protos/syntax3.proto | 14 ++- pbjson_test/src/lib.rs | 31 +++-- 7 files changed, 188 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b987139e13..5e5870e339 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2902,6 +2902,8 @@ checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58" name = "pbjson" version = "0.1.0" dependencies = [ + "base64 0.13.0", + "bytes", "serde", ] diff --git a/pbjson/Cargo.toml b/pbjson/Cargo.toml index 8a81ca1f5a..04ce2185f3 100644 --- a/pbjson/Cargo.toml +++ b/pbjson/Cargo.toml @@ -8,5 +8,7 @@ description = "Utilities for pbjson converion" [dependencies] serde = { version = "1.0", features = ["derive"] } +base64 = "0.13" [dev-dependencies] +bytes = "1.0" diff --git a/pbjson/src/lib.rs b/pbjson/src/lib.rs index 51f77e899f..cb9bdd0b3b 100644 --- a/pbjson/src/lib.rs +++ b/pbjson/src/lib.rs @@ -9,12 +9,17 @@ #[doc(hidden)] pub mod private { + /// Re-export base64 + pub use base64; + + use serde::Deserialize; use std::str::FromStr; + /// Used to parse a number from either a string or its raw representation #[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Hash, Ord, Eq)] pub struct NumberDeserialize(pub T); - #[derive(serde::Deserialize)] + #[derive(Deserialize)] #[serde(untagged)] enum Content<'a, T> { Str(&'a str), @@ -26,7 +31,6 @@ pub mod private { T: FromStr + serde::Deserialize<'de>, ::Err: std::error::Error, { - #[allow(deprecated)] fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, @@ -38,4 +42,41 @@ pub mod private { })) } } + + #[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Hash, Ord, Eq)] + pub struct BytesDeserialize(pub T); + + impl<'de, T> Deserialize<'de> for BytesDeserialize + where + T: From>, + { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s: &str = Deserialize::deserialize(deserializer)?; + let decoded = base64::decode(s).map_err(serde::de::Error::custom)?; + Ok(Self(decoded.into())) + } + } + + #[cfg(test)] + mod tests { + use super::*; + use bytes::Bytes; + use serde::de::value::{BorrowedStrDeserializer, Error}; + + #[test] + fn test_bytes() { + let raw = vec![2, 5, 62, 2, 5, 7, 8, 43, 5, 8, 4, 23, 5, 7, 7, 3, 2, 5, 196]; + let encoded = base64::encode(&raw); + + let deserializer = BorrowedStrDeserializer::<'_, Error>::new(&encoded); + let a: Bytes = BytesDeserialize::deserialize(deserializer).unwrap().0; + let b: Vec = BytesDeserialize::deserialize(deserializer).unwrap().0; + + assert_eq!(raw.as_slice(), &a); + 
assert_eq!(raw.as_slice(), &b); + } + } } diff --git a/pbjson_build/src/generator/message.rs b/pbjson_build/src/generator/message.rs index 2ae1c1bfc9..9625554af3 100644 --- a/pbjson_build/src/generator/message.rs +++ b/pbjson_build/src/generator/message.rs @@ -198,28 +198,14 @@ fn write_serialize_variable( writer: &mut W, ) -> Result<()> { match &field.field_type { - FieldType::Scalar(ScalarType::I64) | FieldType::Scalar(ScalarType::U64) => { - match field.field_modifier { - FieldModifier::Repeated => { - writeln!( - writer, - "{}struct_ser.serialize_field(\"{}\", &{}.iter().map(ToString::to_string).collect::>())?;", - Indent(indent), - field.json_name(), - variable.raw - ) - } - _ => { - writeln!( - writer, - "{}struct_ser.serialize_field(\"{}\", {}.to_string().as_str())?;", - Indent(indent), - field.json_name(), - variable.raw - ) - } - } - } + FieldType::Scalar(scalar) => write_serialize_scalar_variable( + indent, + *scalar, + field.field_modifier, + variable, + field.json_name(), + writer, + ), FieldType::Enum(path) => { write!(writer, "{}let v = ", Indent(indent))?; match field.field_modifier { @@ -301,6 +287,52 @@ fn write_serialize_variable( } } +fn write_serialize_scalar_variable( + indent: usize, + scalar: ScalarType, + field_modifier: FieldModifier, + variable: Variable<'_>, + json_name: String, + writer: &mut W, +) -> Result<()> { + let conversion = match scalar { + ScalarType::I64 | ScalarType::U64 => "ToString::to_string", + ScalarType::Bytes => "pbjson::private::base64::encode", + _ => { + return writeln!( + writer, + "{}struct_ser.serialize_field(\"{}\", {})?;", + Indent(indent), + json_name, + variable.as_ref + ) + } + }; + + match field_modifier { + FieldModifier::Repeated => { + writeln!( + writer, + "{}struct_ser.serialize_field(\"{}\", &{}.iter().map({}).collect::>())?;", + Indent(indent), + json_name, + variable.raw, + conversion + ) + } + _ => { + writeln!( + writer, + "{}struct_ser.serialize_field(\"{}\", {}(&{}).as_str())?;", + Indent(indent), + json_name, + conversion, + variable.raw, + ) + } + } +} + fn write_serialize_field( config: &Config, indent: usize, @@ -641,33 +673,8 @@ fn write_deserialize_field( } match &field.field_type { - FieldType::Scalar(scalar) if scalar.is_numeric() => { - writeln!(writer)?; - - match field.field_modifier { - FieldModifier::Repeated => { - writeln!( - writer, - "{}map.next_value::>>()?", - Indent(indent + 2), - scalar.rust_type() - )?; - writeln!( - writer, - "{}.into_iter().map(|x| x.0).collect()", - Indent(indent + 3) - )?; - } - _ => { - writeln!( - writer, - "{}map.next_value::<::pbjson::private::NumberDeserialize<{}>>()?.0", - Indent(indent + 2), - scalar.rust_type() - )?; - } - } - write!(writer, "{}", Indent(indent + 1))?; + FieldType::Scalar(scalar) => { + write_encode_scalar_field(indent + 1, *scalar, field.field_modifier, writer)?; } FieldType::Enum(path) => match field.field_modifier { FieldModifier::Repeated => { @@ -693,8 +700,12 @@ fn write_deserialize_field( Indent(indent + 2), )?; - let map_k = match key.is_numeric() { - true => { + let map_k = match key { + ScalarType::Bytes => { + // https://github.com/tokio-rs/prost/issues/531 + panic!("bytes are not currently supported as map keys") + } + _ if key.is_numeric() => { write!( writer, "::pbjson::private::NumberDeserialize<{}>", @@ -702,7 +713,7 @@ fn write_deserialize_field( )?; "k.0" } - false => { + _ => { write!(writer, "_")?; "k" } @@ -717,6 +728,10 @@ fn write_deserialize_field( )?; "v.0" } + FieldType::Scalar(ScalarType::Bytes) => { + // 
https://github.com/tokio-rs/prost/issues/531 + panic!("bytes are not currently supported as map values") + } FieldType::Enum(path) => { write!(writer, "{}", config.rust_type(path))?; "v as i32" @@ -752,3 +767,43 @@ fn write_deserialize_field( writeln!(writer, ");")?; writeln!(writer, "{}}}", Indent(indent)) } + +fn write_encode_scalar_field( + indent: usize, + scalar: ScalarType, + field_modifier: FieldModifier, + writer: &mut W, +) -> Result<()> { + let deserializer = match scalar { + ScalarType::Bytes => "BytesDeserialize", + _ if scalar.is_numeric() => "NumberDeserialize", + _ => return write!(writer, "map.next_value()?",), + }; + + writeln!(writer)?; + + match field_modifier { + FieldModifier::Repeated => { + writeln!( + writer, + "{}map.next_value::>>()?", + Indent(indent + 1), + deserializer + )?; + writeln!( + writer, + "{}.into_iter().map(|x| x.0).collect()", + Indent(indent + 2) + )?; + } + _ => { + writeln!( + writer, + "{}map.next_value::<::pbjson::private::{}<_>>()?.0", + Indent(indent + 1), + deserializer + )?; + } + } + write!(writer, "{}", Indent(indent)) +} diff --git a/pbjson_build/src/message.rs b/pbjson_build/src/message.rs index e56337d796..5927a1f520 100644 --- a/pbjson_build/src/message.rs +++ b/pbjson_build/src/message.rs @@ -12,7 +12,7 @@ use prost_types::{ use crate::descriptor::{Descriptor, DescriptorSet, MessageDescriptor, Syntax, TypeName, TypePath}; use crate::escape::escape_ident; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum ScalarType { F64, F32, @@ -61,7 +61,7 @@ pub enum FieldType { Map(ScalarType, Box), } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum FieldModifier { Required, Optional, diff --git a/pbjson_test/protos/syntax3.proto b/pbjson_test/protos/syntax3.proto index 044b410eca..73ef4fded7 100644 --- a/pbjson_test/protos/syntax3.proto +++ b/pbjson_test/protos/syntax3.proto @@ -60,11 +60,13 @@ message KitchenSink { Empty one_of_message = 30; } - // Bytes support is currently broken - // bytes bytes = 31; - // optional bytes optional_bytes = 32; - // map bytes_dict = 35; + bytes bytes = 31; + optional bytes optional_bytes = 32; + repeated bytes repeated_bytes = 33; - string string = 33; - optional string optional_string = 34; + // Bytes support is currently broken - https://github.com/tokio-rs/prost/issues/531 + // map bytes_dict = 34; + + string string = 35; + optional string optional_string = 36; } \ No newline at end of file diff --git a/pbjson_test/src/lib.rs b/pbjson_test/src/lib.rs index 449f7e5662..70a9253aff 100644 --- a/pbjson_test/src/lib.rs +++ b/pbjson_test/src/lib.rs @@ -196,13 +196,30 @@ mod tests { decoded.repeated_value = Default::default(); verify_decode(&decoded, "{}"); - // Bytes support currently broken - // decoded.bytes = prost::bytes::Bytes::from_static(b"kjkjkj"); - // verify(&decoded, r#"{"bytes":"a2pramtqCg=="}"#); - // - // decoded.repeated_value = Default::default(); - // verify_decode(&decoded, "{}"); - // + decoded.bytes = prost::bytes::Bytes::from_static(b"kjkjkj"); + verify(&decoded, r#"{"bytes":"a2pramtq"}"#); + + decoded.bytes = Default::default(); + verify_decode(&decoded, "{}"); + + decoded.optional_bytes = Some(prost::bytes::Bytes::from_static(b"kjkjkj")); + verify(&decoded, r#"{"optionalBytes":"a2pramtq"}"#); + + decoded.optional_bytes = Some(Default::default()); + verify(&decoded, r#"{"optionalBytes":""}"#); + + decoded.optional_bytes = None; + verify_decode(&decoded, "{}"); + + decoded.repeated_bytes = vec![ + prost::bytes::Bytes::from_static(b"sdfsd"), + 
prost::bytes::Bytes::from_static(b"fghfg"), + ]; + verify(&decoded, r#"{"repeatedBytes":["c2Rmc2Q=","ZmdoZmc="]}"#); + + decoded.repeated_bytes = Default::default(); + verify_decode(&decoded, "{}"); + // decoded.bytes_dict.insert( // "test".to_string(), // prost::bytes::Bytes::from_static(b"asdf"), From 9b3a13f9844b2b8364aa6128a9e5081639e8d8df Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 16 Sep 2021 17:56:20 +0100 Subject: [PATCH 5/6] feat: remove legacy http list partitions endpoint (#2557) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- src/influxdb_ioxd/http.rs | 40 +-------------------------------------- 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index 76ac71d79a..04d4df9e20 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -28,7 +28,7 @@ use data_types::{ }; use influxdb_iox_client::format::QueryOutputFormat; use influxdb_line_protocol::parse_lines; -use query::{exec::ExecutionContextProvider, QueryDatabase}; +use query::exec::ExecutionContextProvider; use server::{ApplicationState, ConnectionManager, Error, Server as AppServer}; // External crates @@ -392,7 +392,6 @@ where .get("/health", health::) .get("/metrics", handle_metrics::) .get("/iox/api/v1/databases/:name/query", query::) - .get("/api/v1/partitions", list_partitions::) .get("/debug/pprof", pprof_home::) .get("/debug/pprof/profile", pprof_profile::) .get("/debug/pprof/allocs", pprof_heappy_profile::) @@ -644,43 +643,6 @@ async fn handle_metrics( Ok(Response::new(Body::from(body))) } -#[derive(Deserialize, Debug)] -/// Arguments in the query string of the request to /partitions -struct DatabaseInfo { - org: String, - bucket: String, -} - -#[tracing::instrument(level = "debug")] -async fn list_partitions( - req: Request, -) -> Result, ApplicationError> { - let server = Arc::clone(&req.data::>().expect("server state").app_server); - - let query = req.uri().query().context(ExpectedQueryString {})?; - - let info: DatabaseInfo = serde_urlencoded::from_str(query).context(InvalidQueryString { - query_string: query, - })?; - - let db_name = - org_and_bucket_to_database(&info.org, &info.bucket).context(BucketMappingError)?; - - let db = server.db(&db_name)?; - - let partition_keys = - db.partition_keys() - .map_err(|e| Box::new(e) as _) - .context(BucketByName { - org: &info.org, - bucket_name: &info.bucket, - })?; - - let result = serde_json::to_string(&partition_keys).context(JsonGenerationError)?; - - Ok(Response::new(Body::from(result))) -} - #[derive(Deserialize, Debug)] /// Arguments in the query string of the request to /snapshot struct SnapshotInfo { From 37b615f301ce50be23baca8d713d4611608166d0 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 16 Sep 2021 20:15:24 +0100 Subject: [PATCH 6/6] feat: migrate operations CLI to use pbjson (#2562) * feat: migrate operations CLI to use pbjson * fix: reserve removed field * chore: review feedback Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- data_types/src/job.rs | 42 ----- .../influxdata/iox/management/v1/jobs.proto | 5 +- generated_types/src/google.rs | 45 ++++++ generated_types/src/job.rs | 147 +----------------- influxdb_iox_client/src/client/management.rs | 29 +++- influxdb_iox_client/src/client/operations.rs | 68 ++------ src/commands/database/partition.rs | 8 +- 
src/commands/database/recover.rs | 9 +- src/commands/operations.rs | 30 +--- tests/end_to_end_cases/management_api.rs | 25 +-- tests/end_to_end_cases/management_cli.rs | 35 ++--- tests/end_to_end_cases/operations_api.rs | 42 ++--- tests/end_to_end_cases/operations_cli.rs | 38 ++--- tests/end_to_end_cases/persistence.rs | 17 +- tests/end_to_end_cases/scenario.rs | 8 +- tests/end_to_end_cases/sql_cli.rs | 9 +- tests/end_to_end_cases/system_tables.rs | 4 +- 17 files changed, 167 insertions(+), 394 deletions(-) diff --git a/data_types/src/job.rs b/data_types/src/job.rs index 2918c96e1c..33ca1165be 100644 --- a/data_types/src/job.rs +++ b/data_types/src/job.rs @@ -117,45 +117,3 @@ impl Job { } } } - -/// The status of a running operation -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -pub enum OperationStatus { - /// A task associated with the operation is running - Running, - /// All tasks associated with the operation have finished successfully - Success, - /// The operation was cancelled and no associated tasks are running - Cancelled, - /// An operation error was returned - Errored, -} - -/// A group of asynchronous tasks being performed by an IOx server -/// -/// TODO: Temporary until prost adds JSON support -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Operation { - /// ID of the running operation - pub id: usize, - // The total number of created tasks - pub total_count: u64, - // The number of pending tasks - pub pending_count: u64, - // The number of tasks that completed successfully - pub success_count: u64, - // The number of tasks that returned an error - pub error_count: u64, - // The number of tasks that were cancelled - pub cancelled_count: u64, - // The number of tasks that did not run to completion (e.g. panic) - pub dropped_count: u64, - /// Wall time spent executing this operation - pub wall_time: std::time::Duration, - /// CPU time spent executing this operation - pub cpu_time: std::time::Duration, - /// Additional job metadata - pub job: Option, - /// The status of the running operation - pub status: OperationStatus, -} diff --git a/generated_types/protos/influxdata/iox/management/v1/jobs.proto b/generated_types/protos/influxdata/iox/management/v1/jobs.proto index 2e5985f144..5a021c3d1b 100644 --- a/generated_types/protos/influxdata/iox/management/v1/jobs.proto +++ b/generated_types/protos/influxdata/iox/management/v1/jobs.proto @@ -27,12 +27,11 @@ message OperationMetadata { // The number of tasks that did not run to completion (e.g. panic) uint64 dropped_count = 16; + reserved 6; + // What kind of job is it? oneof job { Dummy dummy = 5; - /* historical artifact - PersistSegment persist_segment = 6; - */ CloseChunk close_chunk = 7; WriteChunk write_chunk = 8; WipePreservedCatalog wipe_preserved_catalog = 9; diff --git a/generated_types/src/google.rs b/generated_types/src/google.rs index 3438c5edd9..9ab7287fef 100644 --- a/generated_types/src/google.rs +++ b/generated_types/src/google.rs @@ -12,6 +12,11 @@ pub mod longrunning { include!(concat!(env!("OUT_DIR"), "/google.longrunning.rs")); include!(concat!(env!("OUT_DIR"), "/google.longrunning.serde.rs")); + use crate::google::{FieldViolation, FieldViolationExt}; + use crate::influxdata::iox::management::v1::{OperationMetadata, OPERATION_METADATA}; + use prost::{bytes::Bytes, Message}; + use std::convert::TryFrom; + impl Operation { /// Return the IOx operation `id`. 
This `id` can /// be passed to the various APIs in the @@ -21,6 +26,46 @@ pub mod longrunning { .parse() .expect("Internal error: id returned from server was not an integer") } + + /// Decodes an IOx `OperationMetadata` metadata payload + pub fn iox_metadata(&self) -> Result { + let metadata = self + .metadata + .as_ref() + .ok_or_else(|| FieldViolation::required("metadata"))?; + + if !crate::protobuf_type_url_eq(&metadata.type_url, OPERATION_METADATA) { + return Err(FieldViolation { + field: "metadata.type_url".to_string(), + description: "Unexpected field type".to_string(), + }); + } + + Message::decode(Bytes::clone(&metadata.value)).field("metadata.value") + } + } + + /// Groups together an `Operation` with a decoded `OperationMetadata` + /// + /// When serialized this will serialize the encoded Any field on `Operation` along + /// with its decoded representation as `OperationMetadata` + #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] + pub struct IoxOperation { + /// The `Operation` message returned from the API + pub operation: Operation, + /// The decoded `Operation::metadata` contained within `IoxOperation::operation` + pub metadata: OperationMetadata, + } + + impl TryFrom for IoxOperation { + type Error = FieldViolation; + + fn try_from(operation: Operation) -> Result { + Ok(Self { + metadata: operation.iox_metadata()?, + operation, + }) + } } } diff --git a/generated_types/src/job.rs b/generated_types/src/job.rs index 499826f455..d3d1429875 100644 --- a/generated_types/src/job.rs +++ b/generated_types/src/job.rs @@ -1,11 +1,5 @@ -use crate::google::{longrunning, protobuf::Any, FieldViolation, FieldViolationExt}; use crate::influxdata::iox::management::v1 as management; -use crate::protobuf_type_url_eq; -use data_types::chunk_metadata::ChunkAddr; -use data_types::job::{Job, OperationStatus}; -use data_types::partition_metadata::PartitionAddr; -use std::convert::TryFrom; -use std::sync::Arc; +use data_types::job::Job; impl From for management::operation_metadata::Job { fn from(job: Job) -> Self { @@ -61,142 +55,3 @@ impl From for management::operation_metadata::Job { } } } - -impl From for Job { - fn from(value: management::operation_metadata::Job) -> Self { - use management::operation_metadata::Job; - match value { - Job::Dummy(management::Dummy { nanos, db_name }) => Self::Dummy { - nanos, - db_name: (!db_name.is_empty()).then(|| Arc::from(db_name.as_str())), - }, - Job::CloseChunk(management::CloseChunk { - db_name, - partition_key, - table_name, - chunk_id, - }) => Self::CompactChunk { - chunk: ChunkAddr { - db_name: Arc::from(db_name.as_str()), - table_name: Arc::from(table_name.as_str()), - partition_key: Arc::from(partition_key.as_str()), - chunk_id, - }, - }, - Job::WriteChunk(management::WriteChunk { - db_name, - partition_key, - table_name, - chunk_id, - }) => Self::WriteChunk { - chunk: ChunkAddr { - db_name: Arc::from(db_name.as_str()), - table_name: Arc::from(table_name.as_str()), - partition_key: Arc::from(partition_key.as_str()), - chunk_id, - }, - }, - Job::WipePreservedCatalog(management::WipePreservedCatalog { db_name }) => { - Self::WipePreservedCatalog { - db_name: Arc::from(db_name.as_str()), - } - } - Job::CompactChunks(management::CompactChunks { - db_name, - partition_key, - table_name, - chunks, - }) => Self::CompactChunks { - partition: PartitionAddr { - db_name: Arc::from(db_name.as_str()), - table_name: Arc::from(table_name.as_str()), - partition_key: Arc::from(partition_key.as_str()), - }, - chunks, - }, - 
Job::PersistChunks(management::PersistChunks { - db_name, - partition_key, - table_name, - chunks, - }) => Self::PersistChunks { - partition: PartitionAddr { - db_name: Arc::from(db_name.as_str()), - table_name: Arc::from(table_name.as_str()), - partition_key: Arc::from(partition_key.as_str()), - }, - chunks, - }, - Job::DropChunk(management::DropChunk { - db_name, - partition_key, - table_name, - chunk_id, - }) => Self::DropChunk { - chunk: ChunkAddr { - db_name: Arc::from(db_name.as_str()), - table_name: Arc::from(table_name.as_str()), - partition_key: Arc::from(partition_key.as_str()), - chunk_id, - }, - }, - Job::DropPartition(management::DropPartition { - db_name, - partition_key, - table_name, - }) => Self::DropPartition { - partition: PartitionAddr { - db_name: Arc::from(db_name.as_str()), - table_name: Arc::from(table_name.as_str()), - partition_key: Arc::from(partition_key.as_str()), - }, - }, - } - } -} - -impl TryFrom for data_types::job::Operation { - type Error = FieldViolation; - - fn try_from(operation: longrunning::Operation) -> Result { - let metadata: Any = operation - .metadata - .ok_or_else(|| FieldViolation::required("metadata"))?; - - if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) { - return Err(FieldViolation { - field: "metadata.type_url".to_string(), - description: "Unexpected field type".to_string(), - }); - } - - let meta: management::OperationMetadata = - prost::Message::decode(metadata.value).field("metadata.value")?; - - let status = match &operation.result { - None => OperationStatus::Running, - Some(longrunning::operation::Result::Response(_)) => OperationStatus::Success, - Some(longrunning::operation::Result::Error(status)) => { - if status.code == tonic::Code::Cancelled as i32 { - OperationStatus::Cancelled - } else { - OperationStatus::Errored - } - } - }; - - Ok(Self { - id: operation.name.parse().field("name")?, - total_count: meta.total_count, - pending_count: meta.pending_count, - success_count: meta.success_count, - error_count: meta.error_count, - cancelled_count: meta.cancelled_count, - dropped_count: meta.dropped_count, - wall_time: std::time::Duration::from_nanos(meta.wall_nanos), - cpu_time: std::time::Duration::from_nanos(meta.cpu_nanos), - job: meta.job.map(Into::into), - status, - }) - } -} diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index bc3672ff6f..40926f1a85 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -3,8 +3,8 @@ use thiserror::Error; use self::generated_types::{management_service_client::ManagementServiceClient, *}; use crate::connection::Connection; -use ::generated_types::google::longrunning::Operation; +use crate::google::{longrunning::IoxOperation, FieldViolation}; use std::convert::TryInto; use std::num::NonZeroU32; @@ -180,6 +180,10 @@ pub enum CreateDummyJobError { #[error("Server returned an empty response")] EmptyResponse, + /// Response payload was invalid + #[error("Invalid response: {0}")] + InvalidResponse(#[from] FieldViolation), + /// Client received an unexpected error from the server #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] ServerError(tonic::Status), @@ -264,6 +268,10 @@ pub enum ClosePartitionChunkError { #[error("Server unavailable: {}", .0.message())] Unavailable(tonic::Status), + /// Response payload was invalid + #[error("Invalid response: {0}")] + InvalidResponse(#[from] FieldViolation), + /// Client received an unexpected error 
from the server #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] ServerError(tonic::Status), @@ -316,6 +324,10 @@ pub enum WipePersistedCatalogError { #[error("Server returned an empty response")] EmptyResponse, + /// Response payload was invalid + #[error("Invalid response: {0}")] + InvalidResponse(#[from] FieldViolation), + /// Client received an unexpected error from the server #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] ServerError(tonic::Status), @@ -800,7 +812,7 @@ impl Client { pub async fn create_dummy_job( &mut self, nanos: Vec, - ) -> Result { + ) -> Result { let response = self .inner .create_dummy_job(CreateDummyJobRequest { nanos }) @@ -810,7 +822,8 @@ impl Client { Ok(response .into_inner() .operation - .ok_or(CreateDummyJobError::EmptyResponse)?) + .ok_or(CreateDummyJobError::EmptyResponse)? + .try_into()?) } /// Closes the specified chunk in the specified partition and @@ -823,7 +836,7 @@ impl Client { table_name: impl Into + Send, partition_key: impl Into + Send, chunk_id: u32, - ) -> Result { + ) -> Result { let db_name = db_name.into(); let partition_key = partition_key.into(); let table_name = table_name.into(); @@ -846,7 +859,8 @@ impl Client { Ok(response .into_inner() .operation - .ok_or(ClosePartitionChunkError::EmptyResponse)?) + .ok_or(ClosePartitionChunkError::EmptyResponse)? + .try_into()?) } /// Unload chunk from read buffer but keep it in object store. @@ -887,7 +901,7 @@ impl Client { pub async fn wipe_persisted_catalog( &mut self, db_name: impl Into + Send, - ) -> Result { + ) -> Result { let db_name = db_name.into(); let response = self @@ -905,7 +919,8 @@ impl Client { Ok(response .into_inner() .operation - .ok_or(WipePersistedCatalogError::EmptyResponse)?) + .ok_or(WipePersistedCatalogError::EmptyResponse)? + .try_into()?) } /// Skip replay of an uninitialized database. diff --git a/influxdb_iox_client/src/client/operations.rs b/influxdb_iox_client/src/client/operations.rs index dcc0fb6fb0..781f2ec557 100644 --- a/influxdb_iox_client/src/client/operations.rs +++ b/influxdb_iox_client/src/client/operations.rs @@ -1,11 +1,9 @@ use thiserror::Error; -use ::generated_types::{ - google::FieldViolation, influxdata::iox::management::v1 as management, protobuf_type_url_eq, -}; - use self::generated_types::{operations_client::OperationsClient, *}; use crate::connection::Connection; +use std::convert::TryInto; + /// Re-export generated_types pub mod generated_types { pub use generated_types::google::longrunning::*; @@ -16,7 +14,7 @@ pub mod generated_types { pub enum Error { /// Client received an invalid response #[error("Invalid server response: {}", .0)] - InvalidResponse(#[from] FieldViolation), + InvalidResponse(#[from] ::generated_types::google::FieldViolation), /// Operation was not found #[error("Operation not found: {}", .0)] @@ -66,7 +64,7 @@ impl Client { } /// Get information of all client operation - pub async fn list_operations(&mut self) -> Result> { + pub async fn list_operations(&mut self) -> Result> { Ok(self .inner .list_operations(ListOperationsRequest::default()) @@ -75,12 +73,12 @@ impl Client { .into_inner() .operations .into_iter() - .map(|o| ClientOperation::try_new(o).unwrap()) - .collect()) + .map(TryInto::try_into) + .collect::>()?) 
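
The client methods changed above all follow the same shape: unwrap the optional operation field from the gRPC response, then convert the raw protobuf operation into the richer wrapper with `.try_into()?`, relying on the new `InvalidResponse(#[from] FieldViolation)` variant in each error enum. Below is a minimal, self-contained sketch of that pattern; `RawOperation`, `DecodedOperation`, `FieldViolationSketch`, and `CreateJobErrorSketch` are illustrative stand-ins, not the real generated types.

    use std::convert::{TryFrom, TryInto};
    use thiserror::Error;

    // Stand-in for generated_types::google::FieldViolation.
    #[derive(Debug, Error)]
    #[error("violation in field '{field}': {description}")]
    pub struct FieldViolationSketch {
        pub field: String,
        pub description: String,
    }

    // Stand-in for the raw longrunning operation returned over gRPC.
    #[derive(Debug)]
    pub struct RawOperation {
        pub name: String,
        pub metadata: Option<Vec<u8>>, // prost-encoded metadata in the real client
    }

    // Stand-in for the decoded wrapper the client now hands back.
    #[derive(Debug)]
    pub struct DecodedOperation {
        pub name: String,
        pub metadata: Vec<u8>,
    }

    impl TryFrom<RawOperation> for DecodedOperation {
        type Error = FieldViolationSketch;

        fn try_from(op: RawOperation) -> Result<Self, Self::Error> {
            // A missing required field becomes a FieldViolation, as in the
            // server-side TryFrom impl earlier in this patch.
            let metadata = op.metadata.ok_or_else(|| FieldViolationSketch {
                field: "metadata".to_string(),
                description: "Required field is missing".to_string(),
            })?;
            Ok(Self { name: op.name, metadata })
        }
    }

    #[derive(Debug, Error)]
    pub enum CreateJobErrorSketch {
        #[error("Server returned an empty response")]
        EmptyResponse,

        // The #[from] conversion is what lets each client method end with
        // `.try_into()?` and have decode failures surface as InvalidResponse.
        #[error("Invalid response: {0}")]
        InvalidResponse(#[from] FieldViolationSketch),
    }

    // Mirrors the new method bodies: unwrap the optional field, then convert,
    // letting `?` map the conversion error through #[from].
    pub fn handle_response(
        operation: Option<RawOperation>,
    ) -> Result<DecodedOperation, CreateJobErrorSketch> {
        Ok(operation.ok_or(CreateJobErrorSketch::EmptyResponse)?.try_into()?)
    }

    fn main() {
        let raw = RawOperation { name: "42".to_string(), metadata: Some(vec![]) };
        let decoded = handle_response(Some(raw)).expect("conversion should succeed");
        println!("decoded operation: {:?}", decoded);
    }
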
} /// Get information about a specific operation - pub async fn get_operation(&mut self, id: usize) -> Result { + pub async fn get_operation(&mut self, id: usize) -> Result { Ok(self .inner .get_operation(GetOperationRequest { @@ -91,7 +89,8 @@ impl Client { tonic::Code::NotFound => Error::NotFound(id), _ => Error::ServerError(e), })? - .into_inner()) + .into_inner() + .try_into()?) } /// Cancel a given operation @@ -115,7 +114,7 @@ impl Client { &mut self, id: usize, timeout: Option, - ) -> Result { + ) -> Result { Ok(self .inner .wait_operation(WaitOperationRequest { @@ -127,50 +126,7 @@ impl Client { tonic::Code::NotFound => Error::NotFound(id), _ => Error::ServerError(e), })? - .into_inner()) - } - - /// Return the Client Operation - pub async fn client_operation(&mut self, id: usize) -> Result { - let operation = self.get_operation(id).await?; - ClientOperation::try_new(operation) - } -} - -/// IOx's Client Operation -#[derive(Debug, Clone)] -pub struct ClientOperation { - inner: generated_types::Operation, -} - -impl ClientOperation { - /// Create a new Cient Operation - pub fn try_new(operation: generated_types::Operation) -> Result { - if operation.metadata.is_some() { - let metadata = operation.metadata.clone().unwrap(); - if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) { - return Err(Error::WrongOperationMetaData); - } - } else { - return Err(Error::NotFound(0)); - } - - Ok(Self { inner: operation }) - } - - /// Return Metadata for this client operation - pub fn metadata(&self) -> management::OperationMetadata { - prost::Message::decode(self.inner.metadata.clone().unwrap().value) - .expect("failed to decode metadata") - } - - /// Return name of this operation - pub fn name(&self) -> &str { - &self.inner.name - } - - /// Return the inner's Operation - pub fn operation(self) -> Operation { - self.inner + .into_inner() + .try_into()?) } } diff --git a/src/commands/database/partition.rs b/src/commands/database/partition.rs index 22046f8046..72d8dd9988 100644 --- a/src/commands/database/partition.rs +++ b/src/commands/database/partition.rs @@ -1,6 +1,5 @@ //! This module implements the `partition` CLI command use data_types::chunk_metadata::ChunkSummary; -use data_types::job::Operation; use generated_types::google::FieldViolation; use influxdb_iox_client::{ connection::Connection, @@ -10,7 +9,7 @@ use influxdb_iox_client::{ PersistPartitionError, UnloadPartitionChunkError, }, }; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use structopt::StructOpt; use thiserror::Error; @@ -283,10 +282,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { chunk_id, } = close_chunk; - let operation: Operation = client + let operation = client .close_partition_chunk(db_name, table_name, partition_key, chunk_id) - .await? 
- .try_into()?; + .await?; serde_json::to_writer_pretty(std::io::stdout(), &operation)?; } diff --git a/src/commands/database/recover.rs b/src/commands/database/recover.rs index 2df1dee01b..5b9b8aa6eb 100644 --- a/src/commands/database/recover.rs +++ b/src/commands/database/recover.rs @@ -1,6 +1,3 @@ -use std::convert::TryInto; - -use data_types::job::Operation; use generated_types::google::FieldViolation; use influxdb_iox_client::{connection::Connection, management}; use snafu::{ResultExt, Snafu}; @@ -74,12 +71,10 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { return Err(Error::NeedsTheForceError); } - let operation: Operation = client + let operation = client .wipe_persisted_catalog(db_name) .await - .context(WipeError)? - .try_into() - .context(InvalidResponse)?; + .context(WipeError)?; serde_json::to_writer_pretty(std::io::stdout(), &operation).context(WritingJson)?; } diff --git a/src/commands/operations.rs b/src/commands/operations.rs index 44d284f0f2..af741e446d 100644 --- a/src/commands/operations.rs +++ b/src/commands/operations.rs @@ -1,11 +1,8 @@ -use data_types::job::Operation; -use generated_types::google::FieldViolation; use influxdb_iox_client::{ connection::Connection, management, operations::{self, Client}, }; -use std::convert::TryInto; use structopt::StructOpt; use thiserror::Error; @@ -15,9 +12,6 @@ pub enum Error { #[error("Client error: {0}")] ClientError(#[from] operations::Error), - #[error("Received invalid response: {0}")] - InvalidResponse(#[from] FieldViolation), - #[error("Failed to create dummy job: {0}")] CreateDummyJobError(#[from] management::CreateDummyJobError), @@ -68,29 +62,16 @@ enum Command { pub async fn command(connection: Connection, config: Config) -> Result<()> { match config.command { Command::List => { - let result: Result, _> = Client::new(connection) - .list_operations() - .await? - .into_iter() - .map(|c| c.operation()) - .map(TryInto::try_into) - .collect(); - let operations = result?; + let operations = Client::new(connection).list_operations().await?; serde_json::to_writer_pretty(std::io::stdout(), &operations)?; } Command::Get { id } => { - let operation: Operation = Client::new(connection) - .get_operation(id) - .await? - .try_into()?; + let operation = Client::new(connection).get_operation(id).await?; serde_json::to_writer_pretty(std::io::stdout(), &operation)?; } Command::Wait { id, nanos } => { let timeout = nanos.map(std::time::Duration::from_nanos); - let operation: Operation = Client::new(connection) - .wait_operation(id, timeout) - .await? - .try_into()?; + let operation = Client::new(connection).wait_operation(id, timeout).await?; serde_json::to_writer_pretty(std::io::stdout(), &operation)?; } Command::Cancel { id } => { @@ -98,10 +79,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { println!("Ok"); } Command::Test { nanos } => { - let operation: Operation = management::Client::new(connection) + let operation = management::Client::new(connection) .create_dummy_job(nanos) - .await? 
- .try_into()?; + .await?; serde_json::to_writer_pretty(std::io::stdout(), &operation)?; } } diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index e8c025ff0e..5bbcee73cf 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -9,7 +9,6 @@ use generated_types::{ }; use influxdb_iox_client::{ management::{Client, CreateDatabaseError}, - operations, write::WriteError, }; @@ -880,20 +879,16 @@ async fn test_close_partition_chunk() { assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer as i32); // Move the chunk to read buffer - let operation = management_client + let iox_operation = management_client .close_partition_chunk(&db_name, table_name, partition_key, 0) .await .expect("new partition chunk"); - println!("Operation response is {:?}", operation); - let operation_id = operation.id(); - - let meta = operations::ClientOperation::try_new(operation) - .unwrap() - .metadata(); + println!("Operation response is {:?}", iox_operation); + let operation_id = iox_operation.operation.id(); // ensure we got a legit job description back - if let Some(Job::CloseChunk(close_chunk)) = meta.job { + if let Some(Job::CloseChunk(close_chunk)) = iox_operation.metadata.job { assert_eq!(close_chunk.db_name, db_name); assert_eq!(close_chunk.partition_key, partition_key); assert_eq!(close_chunk.chunk_id, 0); @@ -1020,20 +1015,16 @@ async fn test_wipe_preserved_catalog() { // Recover by wiping preserved catalog // - let operation = management_client + let iox_operation = management_client .wipe_persisted_catalog(&db_name) .await .expect("wipe persisted catalog"); - println!("Operation response is {:?}", operation); - let operation_id = operation.id(); - - let meta = operations::ClientOperation::try_new(operation) - .unwrap() - .metadata(); + println!("Operation response is {:?}", iox_operation); + let operation_id = iox_operation.operation.id(); // ensure we got a legit job description back - if let Some(Job::WipePreservedCatalog(wipe_persisted_catalog)) = meta.job { + if let Some(Job::WipePreservedCatalog(wipe_persisted_catalog)) = iox_operation.metadata.job { assert_eq!(wipe_persisted_catalog.db_name, db_name); } else { panic!("unexpected job returned") diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index e48883ab20..53e3f2f8e5 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -1,13 +1,12 @@ -use std::sync::Arc; use std::time::Duration; use assert_cmd::Command; use predicates::prelude::*; -use data_types::chunk_metadata::ChunkAddr; -use data_types::{ - chunk_metadata::ChunkStorage, - job::{Job, Operation}, +use data_types::chunk_metadata::ChunkStorage; +use generated_types::google::longrunning::IoxOperation; +use generated_types::influxdata::iox::management::v1::{ + operation_metadata::Job, CloseChunk, WipePreservedCatalog, }; use test_helpers::make_temp_file; use write_buffer::maybe_skip_kafka_integration; @@ -720,7 +719,7 @@ async fn test_close_partition_chunk() { let lp_data = vec!["cpu,region=west user=23.2 100"]; load_lp(addr, &db_name, lp_data); - let stdout: Operation = serde_json::from_slice( + let stdout: IoxOperation = serde_json::from_slice( &Command::cargo_bin("influxdb_iox") .unwrap() .arg("database") @@ -739,18 +738,16 @@ async fn test_close_partition_chunk() { ) .expect("Expected JSON output"); - let expected_job = Job::CompactChunk { - chunk: ChunkAddr { - db_name: 
Arc::from(db_name.as_str()), - table_name: Arc::from("cpu"), - partition_key: Arc::from("cpu"), - chunk_id: 0, - }, - }; + let expected_job = Job::CloseChunk(CloseChunk { + db_name, + table_name: "cpu".to_string(), + partition_key: "cpu".to_string(), + chunk_id: 0, + }); assert_eq!( Some(expected_job), - stdout.job, + stdout.metadata.job, "operation was {:#?}", stdout ); @@ -783,7 +780,7 @@ async fn test_wipe_persisted_catalog() { let server_fixture = fixture_broken_catalog(&db_name).await; let addr = server_fixture.grpc_base(); - let stdout: Operation = serde_json::from_slice( + let stdout: IoxOperation = serde_json::from_slice( &Command::cargo_bin("influxdb_iox") .unwrap() .arg("database") @@ -800,13 +797,11 @@ async fn test_wipe_persisted_catalog() { ) .expect("Expected JSON output"); - let expected_job = Job::WipePreservedCatalog { - db_name: Arc::from(db_name.as_str()), - }; + let expected_job = Job::WipePreservedCatalog(WipePreservedCatalog { db_name }); assert_eq!( Some(expected_job), - stdout.job, + stdout.metadata.job, "operation was {:#?}", stdout ); diff --git a/tests/end_to_end_cases/operations_api.rs b/tests/end_to_end_cases/operations_api.rs index b42ce4a887..798ddad7b9 100644 --- a/tests/end_to_end_cases/operations_api.rs +++ b/tests/end_to_end_cases/operations_api.rs @@ -17,7 +17,7 @@ async fn test_operations() { let nanos = vec![Duration::from_secs(20).as_nanos() as _, 1]; - let operation = management_client + let iox_operation = management_client .create_dummy_job(nanos.clone()) .await .expect("create dummy job failed"); @@ -28,20 +28,15 @@ async fn test_operations() { .expect("list operations failed"); assert_eq!(running_ops.len(), 1); - assert_eq!(running_ops[0].name(), operation.name); + assert_eq!(running_ops[0].operation.name, iox_operation.operation.name); - let id = operation.name.parse().expect("not an integer"); + let id = iox_operation.operation.id(); + let iox_operation = operations_client.get_operation(id).await.unwrap(); - let meta = operations_client - .client_operation(id) - .await - .unwrap() - .metadata(); + let job = iox_operation.metadata.job.expect("expected a job"); - let job = meta.job.expect("expected a job"); - - assert_eq!(meta.total_count, 2); - assert_eq!(meta.pending_count, 1); + assert_eq!(iox_operation.metadata.total_count, 2); + assert_eq!(iox_operation.metadata.pending_count, 1); assert_eq!( job, operation_metadata::Job::Dummy(Dummy { @@ -51,14 +46,14 @@ async fn test_operations() { ); // Check wait times out correctly - let fetched = operations_client + let iox_operation = operations_client .wait_operation(id, Some(Duration::from_micros(10))) .await .expect("failed to wait operation"); - assert!(!fetched.done); + assert!(!iox_operation.operation.done); // Shouldn't specify wall_nanos as not complete - assert_eq!(meta.wall_nanos, 0); + assert_eq!(iox_operation.metadata.wall_nanos, 0); let wait = tokio::spawn(async move { let mut operations_client = server_fixture.operations_client(); @@ -74,18 +69,15 @@ async fn test_operations() { .expect("failed to cancel operation"); let waited = wait.await.unwrap(); - let meta = operations::ClientOperation::try_new(waited.clone()) - .unwrap() - .metadata(); - assert!(waited.done); - assert!(meta.wall_nanos > 0); - assert!(meta.cpu_nanos > 0); - assert_eq!(meta.pending_count, 0); - assert_eq!(meta.total_count, 2); - assert_eq!(meta.cancelled_count, 1); + assert!(waited.operation.done); + assert!(waited.metadata.wall_nanos > 0); + assert!(waited.metadata.cpu_nanos > 0); + 
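
The assertions in these tests read fields off a wrapper that bundles the raw operation together with its already-decoded metadata, instead of decoding the metadata by hand in every test. The following is a rough, self-contained sketch of that shape using plain serde structs; the names (`IoxOperationSketch`, `OperationMetadata`, the `id()` helper that parses the operation name) are assumptions modelled on how the tests use them, not the real generated types.

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize)]
    struct Operation {
        name: String,
        done: bool,
    }

    #[derive(Debug, Serialize, Deserialize)]
    struct OperationMetadata {
        total_count: u64,
        pending_count: u64,
        cancelled_count: u64,
        wall_nanos: u64,
    }

    // Bundles the raw operation with its decoded metadata, so tests can write
    // `x.operation.done` and `x.metadata.total_count` directly.
    #[derive(Debug, Serialize, Deserialize)]
    struct IoxOperationSketch {
        operation: Operation,
        metadata: OperationMetadata,
    }

    impl Operation {
        // The tests call `.id()` on the raw operation; in this sketch the id is
        // simply the numeric operation name.
        fn id(&self) -> usize {
            self.name.parse().expect("operation name is not an integer")
        }
    }

    fn main() {
        // Because the wrapper derives Serialize/Deserialize, a CLI can print it
        // with serde_json::to_writer_pretty and tests can read it back with
        // serde_json::from_slice, as the CLI tests in this patch do.
        let json = r#"{
            "operation": { "name": "42", "done": true },
            "metadata": { "total_count": 2, "pending_count": 0,
                          "cancelled_count": 1, "wall_nanos": 100 }
        }"#;

        let op: IoxOperationSketch = serde_json::from_slice(json.as_bytes()).unwrap();
        assert_eq!(op.operation.id(), 42);
        assert!(op.operation.done);
        assert_eq!(op.metadata.cancelled_count, 1);
        println!("{}", serde_json::to_string_pretty(&op).unwrap());
    }
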
assert_eq!(waited.metadata.pending_count, 0); + assert_eq!(waited.metadata.total_count, 2); + assert_eq!(waited.metadata.cancelled_count, 1); - match waited.result { + match waited.operation.result { Some(operations::generated_types::operation::Result::Error(status)) => { assert_eq!(status.code, tonic::Code::Cancelled as i32) } diff --git a/tests/end_to_end_cases/operations_cli.rs b/tests/end_to_end_cases/operations_cli.rs index 24e63c1c22..0284e55a69 100644 --- a/tests/end_to_end_cases/operations_cli.rs +++ b/tests/end_to_end_cases/operations_cli.rs @@ -1,6 +1,7 @@ use crate::common::server_fixture::ServerFixture; use assert_cmd::Command; -use data_types::job::{Job, Operation, OperationStatus}; +use generated_types::google::longrunning::IoxOperation; +use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, Dummy}; use predicates::prelude::*; #[tokio::test] @@ -9,7 +10,7 @@ async fn test_start_stop() { let addr = server_fixture.grpc_base(); let duration = std::time::Duration::from_secs(10).as_nanos() as u64; - let stdout: Operation = serde_json::from_slice( + let stdout: IoxOperation = serde_json::from_slice( &Command::cargo_bin("influxdb_iox") .unwrap() .arg("operation") @@ -24,13 +25,13 @@ async fn test_start_stop() { ) .expect("expected JSON output"); - assert_eq!(stdout.total_count, 1); - match stdout.job { - Some(Job::Dummy { nanos, .. }) => assert_eq!(nanos, vec![duration]), - _ => panic!("expected dummy job got {:?}", stdout.job), + assert_eq!(stdout.metadata.total_count, 1); + match stdout.metadata.job { + Some(Job::Dummy(Dummy { nanos, .. })) => assert_eq!(nanos, vec![duration]), + _ => panic!("expected dummy job got {:?}", stdout.metadata.job), } - let operations: Vec = serde_json::from_slice( + let operations: Vec = serde_json::from_slice( &Command::cargo_bin("influxdb_iox") .unwrap() .arg("operation") @@ -45,33 +46,33 @@ async fn test_start_stop() { .expect("expected JSON output"); assert_eq!(operations.len(), 1); - match &operations[0].job { - Some(Job::Dummy { nanos, .. }) => { + match &operations[0].metadata.job { + Some(Job::Dummy(Dummy { nanos, .. 
})) => { assert_eq!(nanos.len(), 1); assert_eq!(nanos[0], duration); } - _ => panic!("expected dummy job got {:?}", &operations[0].job), + _ => panic!("expected dummy job got {:?}", &operations[0].metadata.job), } - let id = operations[0].id; + let name = &operations[0].operation.name; Command::cargo_bin("influxdb_iox") .unwrap() .arg("operation") .arg("cancel") - .arg(id.to_string()) + .arg(name.clone()) .arg("--host") .arg(addr) .assert() .success() .stdout(predicate::str::contains("Ok")); - let completed: Operation = serde_json::from_slice( + let completed: IoxOperation = serde_json::from_slice( &Command::cargo_bin("influxdb_iox") .unwrap() .arg("operation") .arg("wait") - .arg(id.to_string()) + .arg(name.to_string()) .arg("--host") .arg(addr) .assert() @@ -81,9 +82,8 @@ async fn test_start_stop() { ) .expect("expected JSON output"); - assert_eq!(completed.pending_count, 0); - assert_eq!(completed.total_count, 1); - assert_eq!(completed.cancelled_count, 1); - assert_eq!(completed.status, OperationStatus::Cancelled); - assert_eq!(&completed.job, &operations[0].job) + assert_eq!(completed.metadata.pending_count, 0); + assert_eq!(completed.metadata.total_count, 1); + assert_eq!(completed.metadata.cancelled_count, 1); + assert_eq!(&completed.metadata.job, &operations[0].metadata.job) } diff --git a/tests/end_to_end_cases/persistence.rs b/tests/end_to_end_cases/persistence.rs index 46a6f8c23e..2061e71642 100644 --- a/tests/end_to_end_cases/persistence.rs +++ b/tests/end_to_end_cases/persistence.rs @@ -2,7 +2,6 @@ use itertools::Itertools; use arrow_util::assert_batches_eq; use data_types::chunk_metadata::ChunkStorage; -use influxdb_iox_client::operations; use crate::{ common::server_fixture::ServerFixture, @@ -125,11 +124,11 @@ async fn test_full_lifecycle() { .await .unwrap() .iter() - .any(|operation| match operation.metadata().job { + .any(|operation| match &operation.metadata.job { Some(Job::CompactChunks(CompactChunks { db_name: operation_db_name, .. 
- })) => operation_db_name == db_name, + })) => operation_db_name == &db_name, _ => false, }); assert!(performed_compaction); @@ -269,20 +268,16 @@ async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32 assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer); // Move the chunk to read buffer - let operation = management_client + let iox_operation = management_client .close_partition_chunk(db_name, table_name, partition_key, 0) .await .expect("new partition chunk"); - println!("Operation response is {:?}", operation); - let operation_id = operation.id(); - - let meta = operations::ClientOperation::try_new(operation) - .unwrap() - .metadata(); + println!("Operation response is {:?}", iox_operation); + let operation_id = iox_operation.operation.id(); // ensure we got a legit job description back - if let Some(Job::CloseChunk(close_chunk)) = meta.job { + if let Some(Job::CloseChunk(close_chunk)) = iox_operation.metadata.job { assert_eq!(close_chunk.db_name, db_name); assert_eq!(close_chunk.partition_key, partition_key); assert_eq!(close_chunk.chunk_id, 0); diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs index 8dfd6a7ebb..03c889ee71 100644 --- a/tests/end_to_end_cases/scenario.rs +++ b/tests/end_to_end_cases/scenario.rs @@ -556,12 +556,8 @@ where } if t_start.elapsed() >= wait_time { - let operations = fixture.operations_client().list_operations().await.unwrap(); - let mut operations: Vec<_> = operations - .into_iter() - .map(|x| (x.name().parse::().unwrap(), x.metadata())) - .collect(); - operations.sort_by_key(|x| x.0); + let mut operations = fixture.operations_client().list_operations().await.unwrap(); + operations.sort_by(|a, b| a.operation.name.cmp(&b.operation.name)); panic!( "Could not find {} within {:?}.\nChunks were: {:#?}\nOperations were: {:#?}", diff --git a/tests/end_to_end_cases/sql_cli.rs b/tests/end_to_end_cases/sql_cli.rs index 6c4a128926..e670710d1b 100644 --- a/tests/end_to_end_cases/sql_cli.rs +++ b/tests/end_to_end_cases/sql_cli.rs @@ -306,17 +306,20 @@ async fn test_sql_observer_operations() { let partition_key = "cpu"; let table_name = "cpu"; // Move the chunk to read buffer - let operation = management_client + let iox_operation = management_client .close_partition_chunk(&db_name, table_name, partition_key, 0) .await .expect("new partition chunk"); - println!("Operation response is {:?}", operation); + println!("Operation response is {:?}", iox_operation); // wait for the job to be done fixture .operations_client() - .wait_operation(operation.id(), Some(std::time::Duration::from_secs(1))) + .wait_operation( + iox_operation.operation.id(), + Some(std::time::Duration::from_secs(1)), + ) .await .expect("failed to wait operation"); diff --git a/tests/end_to_end_cases/system_tables.rs b/tests/end_to_end_cases/system_tables.rs index 530911f236..594ad0dba8 100644 --- a/tests/end_to_end_cases/system_tables.rs +++ b/tests/end_to_end_cases/system_tables.rs @@ -27,12 +27,12 @@ async fn test_operations() { .expect("write succeded"); // Move the chunk to read buffer - let operation = management_client + let iox_operation = management_client .close_partition_chunk(&db_name1, table_name, partition_key, 0) .await .expect("new partition chunk"); - let operation_id = operation.id(); + let operation_id = iox_operation.operation.id(); operations_client .wait_operation(operation_id, Some(std::time::Duration::from_secs(1))) .await
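
The updated tests also treat the operation list as plain data: sort by the raw operation name and scan the decoded job metadata with `any`, rather than parsing ids and decoding metadata per entry. A small self-contained sketch of that handling follows; `OpSketch` and `JobKind` are illustrative stand-ins for the real operation and job metadata types.

    // Stand-in for the decoded job metadata variants used in the tests.
    #[derive(Debug, Clone, PartialEq)]
    enum JobKind {
        CompactChunks { db_name: String },
        Dummy,
    }

    // Stand-in for one listed operation with its decoded job.
    #[derive(Debug, Clone)]
    struct OpSketch {
        name: String,
        job: Option<JobKind>,
    }

    fn main() {
        let mut operations = vec![
            OpSketch { name: "3".to_string(), job: Some(JobKind::Dummy) },
            OpSketch {
                name: "1".to_string(),
                job: Some(JobKind::CompactChunks { db_name: "my_db".to_string() }),
            },
        ];

        // Mirrors `operations.sort_by(|a, b| a.operation.name.cmp(&b.operation.name))`
        // in scenario.rs: sort on the raw name rather than a parsed id.
        operations.sort_by(|a, b| a.name.cmp(&b.name));

        // Mirrors the `.any(|operation| match &operation.metadata.job { ... })`
        // check in persistence.rs: look for a compaction against a given database.
        let db_name = "my_db".to_string();
        let performed_compaction = operations.iter().any(|op| match &op.job {
            Some(JobKind::CompactChunks { db_name: job_db }) => job_db == &db_name,
            _ => false,
        });

        assert!(performed_compaction);
        println!("operations (sorted): {:?}", operations);
    }
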