From 3aa3ebe0e8369aa964b8151c3d2db1883c663859 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 1 Apr 2022 18:51:01 -0400 Subject: [PATCH 01/12] chore: add compactor logging (#4207) --- compactor/src/compact.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index c69df39933..4624b6529c 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -18,7 +18,7 @@ use iox_catalog::interface::{Catalog, Transaction}; use iox_object_store::ParquetFilePath; use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions}; use object_store::DynObjectStore; -use observability_deps::tracing::warn; +use observability_deps::tracing::{info, warn}; use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData}; use query::{ compute_sort_key_for_chunks, exec::ExecutorType, frontend::reorg::ReorgPlanner, @@ -363,6 +363,7 @@ impl Compactor { partition_id: PartitionId, compaction_max_size_bytes: i64, ) -> Result<()> { + info!("compacting partition {}", partition_id); let start_time = self.time_provider.now(); let parquet_files = self @@ -388,6 +389,7 @@ impl Compactor { // Attach appropriate tombstones to each file let groups_with_tombstones = self.add_tombstones_to_groups(compact_file_groups).await?; + info!("compacting {} groups", groups_with_tombstones.len()); // Compact, persist,and update catalog accordingly for each overlaped file let mut tombstones = BTreeMap::new(); @@ -407,6 +409,7 @@ impl Compactor { // deleted. These should already be unique, no need to dedupe. let original_parquet_file_ids: Vec<_> = group.parquet_files.iter().map(|f| f.data.id).collect(); + info!("compacting group of files: {:?}", original_parquet_file_ids); // compact let split_compacted_files = self.compact(group.parquet_files).await?; From 833c10c0834ea769ecee96f7246be676e86844ba Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 2 Apr 2022 06:34:51 -0400 Subject: [PATCH 02/12] feat: return write_token from HTTP writes to router2 (#4202) * feat: return write_token from HTTP writes to router2 * fix: Update router2/src/dml_handlers/instrumentation.rs Co-authored-by: Dom * refactor: Use WriteSummary::default more vigorously * fix: fix typo and add links to follow on issues Co-authored-by: Dom Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 15 ++ Cargo.toml | 1 + data_types2/src/lib.rs | 12 ++ db/src/pred.rs | 2 +- generated_types/build.rs | 2 + .../iox/write_summary/v1/write_summary.proto | 18 ++ generated_types/src/lib.rs | 13 ++ .../tests/end_to_end_ng_cases/all_in_one.rs | 8 +- .../tests/end_to_end_ng_cases/querier.rs | 9 +- ioxd_router2/Cargo.toml | 1 + ioxd_router2/src/lib.rs | 11 +- router2/Cargo.toml | 1 + router2/benches/e2e.rs | 6 +- router2/src/dml_handlers/fan_out.rs | 11 +- router2/src/dml_handlers/instrumentation.rs | 7 +- router2/src/dml_handlers/mock.rs | 10 +- router2/src/dml_handlers/mod.rs | 3 + .../src/dml_handlers/sharded_write_buffer.rs | 48 +++-- router2/src/dml_handlers/write_summary.rs | 60 ++++++ router2/src/server/grpc.rs | 16 +- router2/src/server/http.rs | 65 +++--- router2/tests/http.rs | 14 +- test_helpers_end_to_end_ng/src/client.rs | 31 ++- write_summary/Cargo.toml | 26 +++ write_summary/src/lib.rs | 199 ++++++++++++++++++ 25 files changed, 504 insertions(+), 85 deletions(-) create mode 100644 generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto create mode 100644 router2/src/dml_handlers/write_summary.rs create mode 100644 write_summary/Cargo.toml create mode 100644 write_summary/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 6673349571..3ef023dbfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2747,6 +2747,7 @@ dependencies = [ "trace_http", "workspace-hack", "write_buffer", + "write_summary", ] [[package]] @@ -4755,6 +4756,7 @@ dependencies = [ "trace", "workspace-hack", "write_buffer", + "write_summary", ] [[package]] @@ -6901,6 +6903,19 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "write_summary" +version = "0.1.0" +dependencies = [ + "base64 0.13.0", + "data_types", + "dml", + "generated_types", + "serde_json", + "time 0.1.0", + "workspace-hack", +] + [[package]] name = "xml-rs" version = "0.8.4" diff --git a/Cargo.toml b/Cargo.toml index 70d9b30945..409b6c9bc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ members = [ "trogging", "workspace-hack", "write_buffer", + "write_summary", ] default-members = ["influxdb_iox"] diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index c7cb4dd184..fc6a9438b5 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -816,3 +816,15 @@ impl IngesterQueryRequest { } } } + +/// The information on what sequence numbers were stored for a +/// particular (line protocol) write that may have been sharded / +/// partitioned across multiple sequencers +/// +/// This information can be used to wait for a particular write to +/// become readable. +#[derive(Debug, Clone)] +pub struct SequencerWrites { + /// List of sequences + pub sequencers: Vec, +} diff --git a/db/src/pred.rs b/db/src/pred.rs index 2af8dd2820..1e52256e19 100644 --- a/db/src/pred.rs +++ b/db/src/pred.rs @@ -14,7 +14,7 @@ pub enum Error { pub type Result = std::result::Result; -/// Converts a [`predicate::predicate::Predicate`] into [`read_buffer::Predicate`], +/// Converts a [`predicate::Predicate`] into [`read_buffer::Predicate`], /// suitable for evaluating on the ReadBuffer. /// /// NOTE: a valid Read Buffer predicate is not guaranteed to be applicable to an diff --git a/generated_types/build.rs b/generated_types/build.rs index 729a39a952..6baa0e0af1 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -43,6 +43,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { let schema_path = root.join("influxdata/iox/schema/v1"); let storage_path = root.join("influxdata/platform/storage"); let write_buffer_path = root.join("influxdata/iox/write_buffer/v1"); + let write_summary_path = root.join("influxdata/iox/write_summary/v1"); let proto_files = vec![ delete_path.join("service.proto"), @@ -77,6 +78,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { storage_path.join("storage_common.proto"), storage_path.join("test.proto"), write_buffer_path.join("write_buffer.proto"), + write_summary_path.join("write_summary.proto"), ]; // Tell cargo to recompile if any of these proto files are changed diff --git a/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto b/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto new file mode 100644 index 0000000000..f539001e15 --- /dev/null +++ b/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; +package influxdata.iox.write_summary.v1; + +// Represents a single logical write that was partitioned and shared +// into multiple pieces in multiple sequencers +message WriteSummary { + // per sequencer information + repeated SequencerWrite sequencers = 1; +} + +// Per sequencer information aout what sequence numbers contain part of a write +message SequencerWrite { + // Unique sequencer ID. + uint32 sequencer_id = 1; + + // Which sequence numbers for this sequencer had data + repeated uint64 sequence_numbers = 13; +} diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs index 1b4678850c..9e45adcfc5 100644 --- a/generated_types/src/lib.rs +++ b/generated_types/src/lib.rs @@ -156,6 +156,19 @@ pub mod influxdata { )); } } + + pub mod write_summary { + pub mod v1 { + include!(concat!( + env!("OUT_DIR"), + "/influxdata.iox.write_summary.v1.rs" + )); + include!(concat!( + env!("OUT_DIR"), + "/influxdata.iox.write_summary.v1.serde.rs" + )); + } + } } pub mod pbdata { diff --git a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs index 4f53bce4b2..dfef9382f6 100644 --- a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs +++ b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs @@ -1,8 +1,8 @@ use arrow_util::assert_batches_sorted_eq; use http::StatusCode; use test_helpers_end_to_end_ng::{ - maybe_skip_integration, query_until_results, rand_name, write_to_router, ServerFixture, - TestConfig, + get_write_token, maybe_skip_integration, query_when_readable, rand_name, write_to_router, + ServerFixture, TestConfig, }; #[tokio::test] @@ -26,12 +26,14 @@ async fn smoke() { let response = write_to_router(lp, org, bucket, all_in_one.server().router_http_base()).await; assert_eq!(response.status(), StatusCode::NO_CONTENT); + let write_token = get_write_token(&response); // run query in a loop until the data becomes available let sql = format!("select * from {}", table_name); - let batches = query_until_results( + let batches = query_when_readable( sql, namespace, + write_token, all_in_one.server().querier_grpc_connection(), ) .await; diff --git a/influxdb_iox/tests/end_to_end_ng_cases/querier.rs b/influxdb_iox/tests/end_to_end_ng_cases/querier.rs index 61b76c3676..779268ece2 100644 --- a/influxdb_iox/tests/end_to_end_ng_cases/querier.rs +++ b/influxdb_iox/tests/end_to_end_ng_cases/querier.rs @@ -1,7 +1,7 @@ use arrow_util::assert_batches_sorted_eq; use http::StatusCode; use test_helpers_end_to_end_ng::{ - maybe_skip_integration, query_until_results, MiniCluster, TestConfig, + get_write_token, maybe_skip_integration, query_when_readable, MiniCluster, TestConfig, }; #[tokio::test] @@ -28,11 +28,16 @@ async fn basic_on_parquet() { let response = cluster.write_to_router(lp).await; assert_eq!(response.status(), StatusCode::NO_CONTENT); + // assert that the response contains a write token + let write_token = get_write_token(&response); + assert!(!write_token.is_empty()); + // run query in a loop until the data becomes available let sql = format!("select * from {}", table_name); - let batches = query_until_results( + let batches = query_when_readable( sql, cluster.namespace(), + write_token, cluster.querier().querier_grpc_connection(), ) .await; diff --git a/ioxd_router2/Cargo.toml b/ioxd_router2/Cargo.toml index 9c734477f3..f3c63515fb 100644 --- a/ioxd_router2/Cargo.toml +++ b/ioxd_router2/Cargo.toml @@ -22,6 +22,7 @@ time = { path = "../time" } trace = { path = "../trace" } trace_http = { path = "../trace_http" } write_buffer = { path = "../write_buffer" } +write_summary = { path = "../write_summary" } # Crates.io dependencies, in alphabetical order arrow-flight = "11" diff --git a/ioxd_router2/src/lib.rs b/ioxd_router2/src/lib.rs index d76c33e523..d608f90bd6 100644 --- a/ioxd_router2/src/lib.rs +++ b/ioxd_router2/src/lib.rs @@ -17,16 +17,17 @@ use router2::{ dml_handlers::{ DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer, + WriteSummaryAdapter, }, namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, ShardedCache}, sequencer::Sequencer, server::{grpc::GrpcDelegate, http::HttpDelegate, RouterServer}, sharder::JumpHash, }; +use thiserror::Error; use tokio_util::sync::CancellationToken; use trace::TraceCollector; - -use thiserror::Error; +use write_summary::WriteSummary; use ioxd_common::{ add_service, @@ -71,7 +72,7 @@ impl RouterServerType { #[async_trait] impl ServerType for RouterServerType where - D: DmlHandler> + 'static, + D: DmlHandler, WriteOutput = WriteSummary> + 'static, { /// Return the [`metric::Registry`] used by the router. fn metric_registry(&self) -> Arc { @@ -233,6 +234,8 @@ pub async fn create_router2_server_type( // //////////////////////////////////////////////////////////////////////////// + let parallel_write = WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer)); + // Build the chain of DML handlers that forms the request processing // pipeline, starting with the namespace creator (for testing purposes) and // write partitioner that yields a set of partitioned batches. @@ -248,7 +251,7 @@ pub async fn create_router2_server_type( .and_then(InstrumentationDecorator::new( "parallel_write", &*metrics, - FanOutAdaptor::new(write_buffer), + parallel_write, )); // Record the overall request handling latency diff --git a/router2/Cargo.toml b/router2/Cargo.toml index 88002e06c2..fa8519c751 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -35,6 +35,7 @@ tonic = "0.6" trace = { path = "../trace/" } workspace-hack = { path = "../workspace-hack"} write_buffer = { path = "../write_buffer" } +write_summary = { path = "../write_summary" } [dev-dependencies] assert_matches = "1.5" diff --git a/router2/benches/e2e.rs b/router2/benches/e2e.rs index 0b67b2311c..8af8d00ec4 100644 --- a/router2/benches/e2e.rs +++ b/router2/benches/e2e.rs @@ -7,6 +7,7 @@ use iox_catalog::{interface::Catalog, mem::MemCatalog}; use router2::{ dml_handlers::{ DmlHandlerChainExt, FanOutAdaptor, Partitioner, SchemaValidator, ShardedWriteBuffer, + WriteSummaryAdapter, }, namespace_cache::{MemoryNamespaceCache, ShardedCache}, sequencer::Sequencer, @@ -66,8 +67,9 @@ fn e2e_benchmarks(c: &mut Criterion) { parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], }); - let handler_stack = - schema_validator.and_then(partitioner.and_then(FanOutAdaptor::new(write_buffer))); + let handler_stack = schema_validator.and_then( + partitioner.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer))), + ); HttpDelegate::new(1024, Arc::new(handler_stack), &metrics) }; diff --git a/router2/src/dml_handlers/fan_out.rs b/router2/src/dml_handlers/fan_out.rs index 40adff45ed..74be1dc67d 100644 --- a/router2/src/dml_handlers/fan_out.rs +++ b/router2/src/dml_handlers/fan_out.rs @@ -2,7 +2,7 @@ use super::DmlHandler; use async_trait::async_trait; use data_types2::{DatabaseName, DeletePredicate}; use futures::{stream::FuturesUnordered, TryStreamExt}; -use std::{fmt::Debug, future, marker::PhantomData}; +use std::{fmt::Debug, marker::PhantomData}; use trace::ctx::SpanContext; /// A [`FanOutAdaptor`] takes an iterator of DML write operation inputs and @@ -38,7 +38,7 @@ where U: Iterator + Send + Sync, { type WriteInput = I; - type WriteOutput = (); + type WriteOutput = Vec; type WriteError = T::WriteError; type DeleteError = T::DeleteError; @@ -51,7 +51,7 @@ where input: Self::WriteInput, span_ctx: Option, ) -> Result { - input + let results = input .into_iter() .map(|v| { let namespace = namespace.clone(); @@ -59,10 +59,9 @@ where async move { self.inner.write(&namespace, v, span_ctx).await } }) .collect::>() - .try_for_each(|_| future::ready(Ok(()))) + .try_collect::>() .await?; - - Ok(()) + Ok(results) } /// Pass the delete through to the inner handler. diff --git a/router2/src/dml_handlers/instrumentation.rs b/router2/src/dml_handlers/instrumentation.rs index 6c4bfabc43..b0a4a111b1 100644 --- a/router2/src/dml_handlers/instrumentation.rs +++ b/router2/src/dml_handlers/instrumentation.rs @@ -165,6 +165,7 @@ mod tests { use metric::Attributes; use std::sync::Arc; use trace::{span::SpanStatus, RingBufferTraceCollector, TraceCollector}; + use write_summary::WriteSummary; const HANDLER_NAME: &str = "bananas"; @@ -205,10 +206,14 @@ mod tests { ); } + fn summary() -> WriteSummary { + WriteSummary::default() + } + #[tokio::test] async fn test_write_ok() { let ns = "platanos".try_into().unwrap(); - let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())])); + let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())])); let metrics = Arc::new(metric::Registry::default()); let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); diff --git a/router2/src/dml_handlers/mock.rs b/router2/src/dml_handlers/mock.rs index 1649c7e9cf..9b4d07cb28 100644 --- a/router2/src/dml_handlers/mock.rs +++ b/router2/src/dml_handlers/mock.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use data_types2::{DatabaseName, DeletePredicate}; use parking_lot::Mutex; use trace::ctx::SpanContext; +use write_summary::WriteSummary; /// A captured call to a [`MockDmlHandler`], generic over `W`, the captured /// [`DmlHandler::WriteInput`] type. @@ -25,7 +26,7 @@ pub enum MockDmlHandlerCall { #[derive(Debug)] struct Inner { calls: Vec>, - write_return: VecDeque>, + write_return: VecDeque>, delete_return: VecDeque>, } @@ -58,7 +59,10 @@ impl MockDmlHandler where W: Clone, { - pub fn with_write_return(self, ret: impl Into>>) -> Self { + pub fn with_write_return( + self, + ret: impl Into>>, + ) -> Self { self.0.lock().write_return = ret.into(); self } @@ -93,7 +97,7 @@ where type WriteError = DmlError; type DeleteError = DmlError; type WriteInput = W; - type WriteOutput = (); + type WriteOutput = WriteSummary; async fn write( &self, diff --git a/router2/src/dml_handlers/mod.rs b/router2/src/dml_handlers/mod.rs index 8acb43875f..baf6c633f7 100644 --- a/router2/src/dml_handlers/mod.rs +++ b/router2/src/dml_handlers/mod.rs @@ -99,5 +99,8 @@ pub use chain::*; mod fan_out; pub use fan_out::*; +mod write_summary; +pub use self::write_summary::*; + #[cfg(test)] pub mod mock; diff --git a/router2/src/dml_handlers/sharded_write_buffer.rs b/router2/src/dml_handlers/sharded_write_buffer.rs index 51953215cb..0798d67153 100644 --- a/router2/src/dml_handlers/sharded_write_buffer.rs +++ b/router2/src/dml_handlers/sharded_write_buffer.rs @@ -11,7 +11,6 @@ use mutable_batch::MutableBatch; use observability_deps::tracing::*; use std::{ fmt::{Debug, Display}, - future, sync::Arc, }; use thiserror::Error; @@ -81,7 +80,7 @@ where type DeleteError = ShardError; type WriteInput = Partitioned>; - type WriteOutput = (); + type WriteOutput = Vec; /// Shard `writes` and dispatch the resultant DML operations. async fn write( @@ -162,30 +161,37 @@ where /// Enumerates all items in the iterator, maps each to a future that dispatches /// the [`DmlOperation`] to its paired [`Sequencer`], executes all the futures /// in parallel and gathers any errors. -async fn parallel_enqueue(v: T) -> Result<(), ShardError> +/// +/// Returns a list of the sequences that were written +async fn parallel_enqueue(v: T) -> Result, ShardError> where T: Iterator, DmlOperation)> + Send, { - let mut successes = 0; - let errs = v - .map(|(sequencer, op)| async move { - tokio::spawn(async move { sequencer.enqueue(op).await }) - .await - .expect("shard enqueue panic") - }) - .collect::>() - .filter_map(|v| { - if v.is_ok() { - successes += 1; - } - future::ready(v.err()) - }) - .collect::>() - .await; + let mut successes = vec![]; + let mut errs = vec![]; + + v.map(|(sequencer, op)| async move { + tokio::spawn(async move { sequencer.enqueue(op).await }) + .await + .expect("shard enqueue panic") + }) + // Use FuturesUnordered so the futures can run in parallel + .collect::>() + .collect::>() + .await + // Sort the result into successes/failures upon completion + .into_iter() + .for_each(|v| match v { + Ok(meta) => successes.push(meta), + Err(e) => errs.push(e), + }); match errs.len() { - 0 => Ok(()), - _n => Err(ShardError::WriteBufferErrors { successes, errs }), + 0 => Ok(successes), + _n => Err(ShardError::WriteBufferErrors { + successes: successes.len(), + errs, + }), } } diff --git a/router2/src/dml_handlers/write_summary.rs b/router2/src/dml_handlers/write_summary.rs new file mode 100644 index 0000000000..806f34b50a --- /dev/null +++ b/router2/src/dml_handlers/write_summary.rs @@ -0,0 +1,60 @@ +use super::DmlHandler; +use async_trait::async_trait; +use data_types2::{DatabaseName, DeletePredicate}; +use dml::DmlMeta; +use std::fmt::Debug; +use trace::ctx::SpanContext; +use write_summary::WriteSummary; + +/// A [`WriteSummaryAdapter`] wraps DML Handler that produces +/// `Vec>` for each write, and produces a WriteSummary, +/// suitable for +/// sending back to a client +#[derive(Debug, Default)] +pub struct WriteSummaryAdapter { + inner: T, +} + +impl WriteSummaryAdapter { + /// Construct a [`WriteSummaryAdapter`] that passes DML operations to `inner` + /// concurrently. + pub fn new(inner: T) -> Self { + Self { inner } + } +} + +#[async_trait] +impl DmlHandler for WriteSummaryAdapter +where + T: DmlHandler>>, +{ + type WriteInput = T::WriteInput; + type WriteOutput = WriteSummary; + type WriteError = T::WriteError; + type DeleteError = T::DeleteError; + + /// Sends `input` to the inner handler, which returns a + /// `Vec>`, creating a `WriteSummary` + async fn write( + &self, + namespace: &DatabaseName<'static>, + input: Self::WriteInput, + span_ctx: Option, + ) -> Result { + let metas = self.inner.write(namespace, input, span_ctx).await?; + Ok(WriteSummary::new(metas)) + } + + /// Pass the delete through to the inner handler. + async fn delete( + &self, + namespace: &DatabaseName<'static>, + table_name: &str, + predicate: &DeletePredicate, + span_ctx: Option, + ) -> Result<(), Self::DeleteError> { + self.inner + .delete(namespace, table_name, predicate, span_ctx) + .await + } +} diff --git a/router2/src/server/grpc.rs b/router2/src/server/grpc.rs index c7a8733d02..a01ae4f38e 100644 --- a/router2/src/server/grpc.rs +++ b/router2/src/server/grpc.rs @@ -15,6 +15,7 @@ use schema::selection::Selection; use std::ops::DerefMut; use tonic::{Request, Response, Status}; use trace::ctx::SpanContext; +use write_summary::WriteSummary; use crate::dml_handlers::{DmlError, DmlHandler, PartitionError}; @@ -45,7 +46,7 @@ impl GrpcDelegate { impl GrpcDelegate where - D: DmlHandler> + 'static, + D: DmlHandler, WriteOutput = WriteSummary> + 'static, { /// Acquire a [`WriteService`] gRPC service implementation. /// @@ -159,6 +160,9 @@ where "routing grpc write", ); + // TODO return the produced WriteSummary to the client + // https://github.com/influxdata/influxdb_iox/issues/4208 + self.dml_handler .write(&namespace, tables, span_ctx) .await @@ -265,10 +269,14 @@ mod tests { use super::*; + fn summary() -> WriteSummary { + WriteSummary::default() + } + #[tokio::test] async fn test_write_no_batch() { let metrics = Arc::new(metric::Registry::default()); - let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())])); + let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())])); let grpc = super::WriteService::new(Arc::clone(&handler), &metrics); let req = WriteRequest::default(); @@ -285,7 +293,7 @@ mod tests { #[tokio::test] async fn test_write_no_namespace() { let metrics = Arc::new(metric::Registry::default()); - let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())])); + let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())])); let grpc = super::WriteService::new(Arc::clone(&handler), &metrics); let req = WriteRequest { @@ -307,7 +315,7 @@ mod tests { #[tokio::test] async fn test_write_ok() { let metrics = Arc::new(metric::Registry::default()); - let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())])); + let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())])); let grpc = super::WriteService::new(Arc::clone(&handler), &metrics); let req = WriteRequest { diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index fb4c4bc9b4..ace220c21f 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -18,6 +18,9 @@ use serde::Deserialize; use thiserror::Error; use time::{SystemProvider, TimeProvider}; use trace::ctx::SpanContext; +use write_summary::WriteSummary; + +const WRITE_TOKEN_HTTP_HEADER: &str = "X-IOx-Write-Token"; /// Errors returned by the `router2` HTTP request handler. #[derive(Debug, Error)] @@ -263,7 +266,7 @@ impl HttpDelegate { impl HttpDelegate where - D: DmlHandler>, + D: DmlHandler, WriteOutput = WriteSummary>, T: TimeProvider, { /// Routes `req` to the appropriate handler, if any, returning the handler @@ -274,10 +277,16 @@ where (&Method::POST, "/api/v2/delete") => self.delete_handler(req).await, _ => return Err(Error::NoHandler), } - .map(|_| response_no_content()) + .map(|summary| { + Response::builder() + .status(StatusCode::NO_CONTENT) + .header(WRITE_TOKEN_HTTP_HEADER, summary.to_token()) + .body(Body::empty()) + .unwrap() + }) } - async fn write_handler(&self, req: Request) -> Result<(), Error> { + async fn write_handler(&self, req: Request) -> Result { let span_ctx: Option = req.extensions().get().cloned(); let write_info = WriteInfo::try_from(&req)?; @@ -300,7 +309,7 @@ where Ok(v) => v, Err(mutable_batch_lp::Error::EmptyPayload) => { debug!("nothing to write"); - return Ok(()); + return Ok(WriteSummary::default()); } Err(e) => return Err(Error::ParseLineProtocol(e)), }; @@ -318,7 +327,8 @@ where "routing write", ); - self.dml_handler + let summary = self + .dml_handler .write(&namespace, batches, span_ctx) .await .map_err(Into::into)?; @@ -328,10 +338,10 @@ where self.write_metric_tables.inc(num_tables as _); self.write_metric_body_size.inc(body.len() as _); - Ok(()) + Ok(summary) } - async fn delete_handler(&self, req: Request) -> Result<(), Error> { + async fn delete_handler(&self, req: Request) -> Result { let span_ctx: Option = req.extensions().get().cloned(); let account = WriteInfo::try_from(&req)?; @@ -376,7 +386,9 @@ where self.delete_metric_body_size.inc(body.len() as _); - Ok(()) + // TODO pass back write summaries for deletes as well + // https://github.com/influxdata/influxdb_iox/issues/4209 + Ok(WriteSummary::default()) } /// Parse the request's body into raw bytes, applying the configured size @@ -437,13 +449,6 @@ where } } -fn response_no_content() -> Response { - Response::builder() - .status(StatusCode::NO_CONTENT) - .body(Body::empty()) - .unwrap() -} - #[cfg(test)] mod tests { use std::{io::Write, iter, sync::Arc}; @@ -460,6 +465,10 @@ mod tests { const MAX_BYTES: usize = 1024; + fn summary() -> WriteSummary { + WriteSummary::default() + } + fn assert_metric_hit(metrics: &metric::Registry, name: &'static str, value: Option) { let counter = metrics .get_instrument::>(name) @@ -639,7 +648,7 @@ mod tests { ok, query_string = "?org=bananas&bucket=test", body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Ok(_), want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => { assert_eq!(namespace, "bananas_test"); @@ -650,7 +659,7 @@ mod tests { ok_precision_s, query_string = "?org=bananas&bucket=test&precision=s", body = "platanos,tag1=A,tag2=B val=42i 1647622847".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Ok(_), want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { assert_eq!(namespace, "bananas_test"); @@ -665,7 +674,7 @@ mod tests { ok_precision_ms, query_string = "?org=bananas&bucket=test&precision=ms", body = "platanos,tag1=A,tag2=B val=42i 1647622847000".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Ok(_), want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { assert_eq!(namespace, "bananas_test"); @@ -680,7 +689,7 @@ mod tests { ok_precision_us, query_string = "?org=bananas&bucket=test&precision=us", body = "platanos,tag1=A,tag2=B val=42i 1647622847000000".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Ok(_), want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { assert_eq!(namespace, "bananas_test"); @@ -695,7 +704,7 @@ mod tests { ok_precision_ns, query_string = "?org=bananas&bucket=test&precision=ns", body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Ok(_), want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { assert_eq!(namespace, "bananas_test"); @@ -711,7 +720,7 @@ mod tests { // SECONDS, so multiplies the provided timestamp by 1,000,000,000 query_string = "?org=bananas&bucket=test&precision=s", body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Err(Error::ParseLineProtocol(_)), want_dml_calls = [] ); @@ -720,7 +729,7 @@ mod tests { no_query_params, query_string = "", body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Err(Error::InvalidOrgBucket(OrgBucketError::NotSpecified)), want_dml_calls = [] // None ); @@ -729,7 +738,7 @@ mod tests { no_org_bucket, query_string = "?", body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Err(Error::InvalidOrgBucket(OrgBucketError::DecodeFail(_))), want_dml_calls = [] // None ); @@ -738,7 +747,7 @@ mod tests { empty_org_bucket, query_string = "?org=&bucket=", body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Err(Error::InvalidOrgBucket(OrgBucketError::NotSpecified)), want_dml_calls = [] // None ); @@ -747,7 +756,7 @@ mod tests { invalid_org_bucket, query_string = format!("?org=test&bucket={}", "A".repeat(1000)), body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Err(Error::InvalidOrgBucket(OrgBucketError::MappingFail(_))), want_dml_calls = [] // None ); @@ -756,7 +765,7 @@ mod tests { invalid_line_protocol, query_string = "?org=bananas&bucket=test", body = "not line protocol".as_bytes(), - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Err(Error::ParseLineProtocol(_)), want_dml_calls = [] // None ); @@ -765,7 +774,7 @@ mod tests { non_utf8_body, query_string = "?org=bananas&bucket=test", body = vec![0xc3, 0x28], - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Err(Error::NonUtf8Body(_)), want_dml_calls = [] // None ); @@ -793,7 +802,7 @@ mod tests { .flat_map(|s| s.bytes()) .collect::>() }, - dml_handler = [Ok(())], + dml_handler = [Ok(summary())], want_result = Err(Error::RequestSizeExceeded(_)), want_dml_calls = [] // None ); diff --git a/router2/tests/http.rs b/router2/tests/http.rs index 676110a959..217bf64b64 100644 --- a/router2/tests/http.rs +++ b/router2/tests/http.rs @@ -10,7 +10,7 @@ use router2::{ dml_handlers::{ Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, NamespaceAutocreation, Partitioned, Partitioner, SchemaError, SchemaValidator, - ShardedWriteBuffer, + ShardedWriteBuffer, WriteSummaryAdapter, }, namespace_cache::{MemoryNamespaceCache, ShardedCache}, sequencer::Sequencer, @@ -55,9 +55,11 @@ type HttpDelegateStack = HttpDelegate< >, Partitioner, >, - FanOutAdaptor< - ShardedWriteBuffer>>, - Vec>>, + WriteSummaryAdapter< + FanOutAdaptor< + ShardedWriteBuffer>>, + Vec>>, + >, >, >, >, @@ -109,7 +111,9 @@ impl TestContext { let handler_stack = ns_creator .and_then(schema_validator) .and_then(partitioner) - .and_then(FanOutAdaptor::new(sharded_write_buffer)); + .and_then(WriteSummaryAdapter::new(FanOutAdaptor::new( + sharded_write_buffer, + ))); let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack); diff --git a/test_helpers_end_to_end_ng/src/client.rs b/test_helpers_end_to_end_ng/src/client.rs index 649ee04ad5..d595138cca 100644 --- a/test_helpers_end_to_end_ng/src/client.rs +++ b/test_helpers_end_to_end_ng/src/client.rs @@ -34,20 +34,41 @@ pub async fn write_to_router( .expect("http error sending write") } -/// Runs a query using the flight API on the specified connection -/// until responses are produced -/// -/// The retry loop is used to wait for writes to become visible +/// Extracts the write token from the specified response (to the /api/v2/write api) +pub fn get_write_token(response: &Response) -> String { + let message = format!("no write token in {:?}", response); + response + .headers() + .get("X-IOx-Write-Token") + .expect(&message) + .to_str() + .expect("Value not a string") + .to_string() +} + const MAX_QUERY_RETRY_TIME_SEC: u64 = 10; -pub async fn query_until_results( +/// Runs a query using the flight API on the specified connection +/// until responses are produced. +/// +/// (Will) eventually Wait until data from the specified write token +/// is readable, but currently waits for +/// +/// The retry loop is used to wait for writes to become visible +pub async fn query_when_readable( sql: impl Into, namespace: impl Into, + write_token: impl Into, connection: Connection, ) -> Vec { let namespace = namespace.into(); let sql = sql.into(); + println!( + "(TODO) Waiting for Write Token to be visible {}", + write_token.into() + ); + let mut client = influxdb_iox_client::flight::Client::new(connection); // This does nothing except test the client handshake implementation. diff --git a/write_summary/Cargo.toml b/write_summary/Cargo.toml new file mode 100644 index 0000000000..d776e6e2a5 --- /dev/null +++ b/write_summary/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "write_summary" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# Workspace dependencies, in alphabetical order +dml = { path = "../dml" } +generated_types = { path = "../generated_types" } + + +# Crates.io dependencies, in alphabetical order +base64 = "0.13" +serde_json = "1.0.79" +workspace-hack = { path = "../workspace-hack"} + + +[dev-dependencies] +# Workspace dependencies, in alphabetical order +data_types = { path = "../data_types" } +time = { path = "../time" } + + +# Crates.io dependencies, in alphabetical order diff --git a/write_summary/src/lib.rs b/write_summary/src/lib.rs new file mode 100644 index 0000000000..3dfc2ab33c --- /dev/null +++ b/write_summary/src/lib.rs @@ -0,0 +1,199 @@ +use std::collections::BTreeMap; + +/// Protobuf to/from conversion +use generated_types::influxdata::iox::write_summary::v1 as proto; + +use dml::DmlMeta; + +/// Contains information about a single write. +/// +/// A single write consisting of multiple lines of line protocol +/// formatted data are shared and partitioned across potentially +/// several sequencers which are then processed by the ingester to +/// become readable at potentially different times. +/// +/// This struct contains sufficient information to determine the +/// current state of the write as a whole +#[derive(Debug, Default)] +/// Summary of a Vec> +pub struct WriteSummary { + metas: Vec>, +} + +impl WriteSummary { + pub fn new(metas: Vec>) -> Self { + Self { metas } + } + + /// Return an opaque summary "token" of this summary + pub fn to_token(self) -> String { + let proto_write_summary: proto::WriteSummary = self.into(); + base64::encode( + serde_json::to_string(&proto_write_summary) + .expect("unexpected error serializing token to json"), + ) + } +} + +impl From for proto::WriteSummary { + fn from(summary: WriteSummary) -> Self { + // create a map from sequencer_id to sequences + let sequences = summary + .metas + .iter() + .flat_map(|v| v.iter()) + .filter_map(|meta| meta.sequence()); + + // Use BTreeMap to ensure consistent output + let mut sequencers = BTreeMap::new(); + for s in sequences { + sequencers + .entry(s.sequencer_id) + .or_insert_with(Vec::new) + .push(s.sequence_number) + } + + let sequencers = sequencers + .into_iter() + .map(|(sequencer_id, sequence_numbers)| proto::SequencerWrite { + sequencer_id, + sequence_numbers, + }) + .collect(); + + Self { sequencers } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use data_types::sequence::Sequence; + + #[test] + fn empty() { + let metas = vec![]; + let summary: proto::WriteSummary = WriteSummary::new(metas).into(); + + let expected = proto::WriteSummary { sequencers: vec![] }; + + assert_eq!(summary, expected); + } + + #[test] + fn one() { + let metas = vec![vec![make_meta(Sequence::new(1, 2))]]; + let summary: proto::WriteSummary = WriteSummary::new(metas).into(); + + let expected = proto::WriteSummary { + sequencers: vec![proto::SequencerWrite { + sequencer_id: 1, + sequence_numbers: vec![2], + }], + }; + + assert_eq!(summary, expected); + } + + #[test] + fn many() { + let metas = vec![ + vec![ + make_meta(Sequence::new(1, 2)), + make_meta(Sequence::new(10, 20)), + ], + vec![make_meta(Sequence::new(1, 3))], + ]; + let summary: proto::WriteSummary = WriteSummary::new(metas).into(); + + let expected = proto::WriteSummary { + sequencers: vec![ + proto::SequencerWrite { + sequencer_id: 1, + sequence_numbers: vec![2, 3], + }, + proto::SequencerWrite { + sequencer_id: 10, + sequence_numbers: vec![20], + }, + ], + }; + + assert_eq!(summary, expected); + } + + #[test] + fn different_order() { + // order in sequences shouldn't matter + let metas1 = vec![vec![ + make_meta(Sequence::new(1, 2)), + make_meta(Sequence::new(2, 3)), + ]]; + + // order in sequences shouldn't matter + let metas2 = vec![vec![ + make_meta(Sequence::new(2, 3)), + make_meta(Sequence::new(1, 2)), + ]]; + + let summary1: proto::WriteSummary = WriteSummary::new(metas1).into(); + let summary2: proto::WriteSummary = WriteSummary::new(metas2).into(); + + let expected = proto::WriteSummary { + sequencers: vec![ + proto::SequencerWrite { + sequencer_id: 1, + sequence_numbers: vec![2], + }, + proto::SequencerWrite { + sequencer_id: 2, + sequence_numbers: vec![3], + }, + ], + }; + + assert_eq!(summary1, expected); + assert_eq!(summary2, expected); + } + + #[test] + fn token_creation() { + let metas = vec![vec![make_meta(Sequence::new(1, 2))]]; + let summary = WriteSummary::new(metas.clone()); + let summary_copy = WriteSummary::new(metas); + + let metas2 = vec![vec![make_meta(Sequence::new(2, 3))]]; + let summary2 = WriteSummary::new(metas2); + + let token = summary.to_token(); + + // non empty + assert!(!token.is_empty()); + + // same when created with same metas + assert_eq!(token, summary_copy.to_token()); + + // different when created with different metas + assert_ne!(token, summary2.to_token()); + + assert!( + !token.contains("sequenceNumbers"), + "token not obscured: {}", + token + ); + assert!( + !token.contains("sequencers"), + "token not obscured: {}", + token + ); + } + + fn make_meta(s: Sequence) -> DmlMeta { + use time::TimeProvider; + let time_provider = time::SystemProvider::new(); + + let span_context = None; + let bytes_read = 132; + DmlMeta::sequenced(s, time_provider.now(), span_context, bytes_read) + } +} From 0892ccf7fb3b9b3bc0a57b2383e4216b66af0aff Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Sat, 2 Apr 2022 14:23:33 -0400 Subject: [PATCH 03/12] fix: compactor use join_all (#4211) I forgot to address this in #4139. Have the compactor use join and make sure the error gets logged. --- compactor/src/handler.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs index a9ed4c1f1c..c63c7b7005 100644 --- a/compactor/src/handler.rs +++ b/compactor/src/handler.rs @@ -172,9 +172,15 @@ async fn run_compactor(compactor: Arc, shutdown: CancellationToken) { let compactor = Arc::clone(&compactor); let partition_id = c.partition_id; let handle = tokio::task::spawn(async move { - compactor + if let Err(e) = compactor .compact_partition(partition_id, max_file_size) .await + { + warn!( + "compaction on partition {} failed with: {:?}", + partition_id, e + ); + } }); used_size += c.file_size_bytes; handles.push(handle); @@ -185,9 +191,7 @@ async fn run_compactor(compactor: Arc, shutdown: CancellationToken) { let compactions_run = handles.len(); - if let Err(e) = futures::future::try_join_all(handles).await { - warn!("error compacting: {}", e); - } + let _ = futures::future::join_all(handles).await; // if all candidate partitions have been compacted, wait a bit before checking again if compactions_run == candidates.len() { From edda409b19aec4aa3045d97a6a86fa9a87836e0d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 3 Apr 2022 06:42:22 -0400 Subject: [PATCH 04/12] refactor: Extract `ioxd_test`, `ioxd_compactor`, `ioxd_ingester`; remove `ioxd` (#4210) * refactor: Extract test, compactor, ingester, and test * chore: Run cargo hakari tasks Co-authored-by: CircleCI[bot] Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 143 +++++++++++------- Cargo.toml | 4 +- influxdb_iox/Cargo.toml | 4 +- influxdb_iox/src/commands/run/all_in_one.rs | 14 +- influxdb_iox/src/commands/run/compactor.rs | 5 +- influxdb_iox/src/commands/run/database.rs | 2 +- influxdb_iox/src/commands/run/ingester.rs | 5 +- influxdb_iox/src/commands/run/main.rs | 2 +- influxdb_iox/src/commands/run/querier.rs | 2 +- influxdb_iox/src/commands/run/router.rs | 2 +- influxdb_iox/src/commands/run/router2.rs | 2 +- influxdb_iox/src/commands/run/test.rs | 7 +- ioxd/Cargo.toml | 66 -------- ioxd/src/server_type/mod.rs | 3 - ioxd_common/src/lib.rs | 3 + ioxd/src/lib.rs => ioxd_common/src/service.rs | 3 +- ioxd_compactor/Cargo.toml | 34 +++++ .../compactor.rs => ioxd_compactor/src/lib.rs | 0 ioxd_ingester/Cargo.toml | 35 +++++ .../ingester.rs => ioxd_ingester/src/lib.rs | 0 ioxd_test/Cargo.toml | 29 ++++ .../test.rs => ioxd_test/src/lib.rs | 0 22 files changed, 211 insertions(+), 154 deletions(-) delete mode 100644 ioxd/Cargo.toml delete mode 100644 ioxd/src/server_type/mod.rs rename ioxd/src/lib.rs => ioxd_common/src/service.rs (93%) create mode 100644 ioxd_compactor/Cargo.toml rename ioxd/src/server_type/compactor.rs => ioxd_compactor/src/lib.rs (100%) create mode 100644 ioxd_ingester/Cargo.toml rename ioxd/src/server_type/ingester.rs => ioxd_ingester/src/lib.rs (100%) create mode 100644 ioxd_test/Cargo.toml rename ioxd/src/server_type/test.rs => ioxd_test/src/lib.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 3ef023dbfc..ca122ad179 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2145,12 +2145,14 @@ dependencies = [ "iox_catalog", "iox_object_store", "iox_tests", - "ioxd", "ioxd_common", + "ioxd_compactor", "ioxd_database", + "ioxd_ingester", "ioxd_querier", "ioxd_router", "ioxd_router2", + "ioxd_test", "itertools", "job_registry", "libc", @@ -2502,63 +2504,6 @@ dependencies = [ "workspace-hack", ] -[[package]] -name = "ioxd" -version = "0.1.0" -dependencies = [ - "ansi_term", - "arrow-flight", - "arrow_util", - "assert_cmd", - "async-trait", - "clap 3.1.7", - "clap_blocks", - "compactor", - "data_types2", - "dml", - "futures", - "generated_types", - "hashbrown 0.12.0", - "http", - "hyper", - "ingester", - "iox_catalog", - "iox_tests", - "ioxd_common", - "metric", - "metric_exporters", - "mutable_batch", - "num_cpus", - "object_store", - "observability_deps", - "panic_logging", - "pprof 0.7.0", - "query", - "router", - "router2", - "server", - "service_common", - "service_grpc_flight", - "service_grpc_influxrpc", - "service_grpc_testing", - "snafu", - "tempfile", - "test_helpers", - "thiserror", - "time 0.1.0", - "tokio", - "tokio-stream", - "tokio-util 0.7.1", - "tonic", - "tonic-health", - "tonic-reflection", - "trace", - "trace_exporters", - "trace_http", - "workspace-hack", - "write_buffer", -] - [[package]] name = "ioxd_common" version = "0.1.0" @@ -2600,6 +2545,35 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "ioxd_compactor" +version = "0.1.0" +dependencies = [ + "async-trait", + "clap_blocks", + "compactor", + "data_types2", + "generated_types", + "hyper", + "iox_catalog", + "ioxd_common", + "metric", + "object_store", + "query", + "service_grpc_testing", + "thiserror", + "time 0.1.0", + "tokio", + "tokio-stream", + "tokio-util 0.7.1", + "tonic", + "tonic-health", + "tonic-reflection", + "trace", + "trace_http", + "workspace-hack", +] + [[package]] name = "ioxd_database" version = "0.1.0" @@ -2654,6 +2628,36 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "ioxd_ingester" +version = "0.1.0" +dependencies = [ + "async-trait", + "clap_blocks", + "data_types2", + "generated_types", + "hyper", + "ingester", + "iox_catalog", + "ioxd_common", + "metric", + "object_store", + "query", + "service_grpc_testing", + "thiserror", + "time 0.1.0", + "tokio", + "tokio-stream", + "tokio-util 0.7.1", + "tonic", + "tonic-health", + "tonic-reflection", + "trace", + "trace_http", + "workspace-hack", + "write_buffer", +] + [[package]] name = "ioxd_querier" version = "0.1.0" @@ -2750,6 +2754,29 @@ dependencies = [ "write_summary", ] +[[package]] +name = "ioxd_test" +version = "0.1.0" +dependencies = [ + "async-trait", + "clap 3.1.7", + "generated_types", + "hyper", + "ioxd_common", + "metric", + "service_grpc_testing", + "snafu", + "tokio", + "tokio-stream", + "tokio-util 0.7.1", + "tonic", + "tonic-health", + "tonic-reflection", + "trace", + "trace_http", + "workspace-hack", +] + [[package]] name = "ipnet" version = "2.4.0" diff --git a/Cargo.toml b/Cargo.toml index 409b6c9bc0..db88d23849 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,12 +30,14 @@ members = [ "iox_gitops_adapter", "iox_object_store", "iox_tests", - "ioxd", "ioxd_common", + "ioxd_compactor", + "ioxd_ingester", "ioxd_querier", "ioxd_database", "ioxd_router", "ioxd_router2", + "ioxd_test", "job_registry", "lifecycle", "logfmt", diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 05235ef7d5..e9c8293eac 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -24,11 +24,13 @@ influxrpc_parser = { path = "../influxrpc_parser"} iox_catalog = { path = "../iox_catalog" } iox_object_store = { path = "../iox_object_store" } ioxd_common = { path = "../ioxd_common"} +ioxd_compactor = { path = "../ioxd_compactor"} ioxd_database = { path = "../ioxd_database"} +ioxd_ingester = { path = "../ioxd_ingester"} ioxd_router = { path = "../ioxd_router"} ioxd_router2 = { path = "../ioxd_router2"} ioxd_querier = { path = "../ioxd_querier"} -ioxd = { path = "../ioxd", default-features = false } +ioxd_test = { path = "../ioxd_test"} job_registry = { path = "../job_registry" } logfmt = { path = "../logfmt" } metric = { path = "../metric" } diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 5dea197e71..121e62af7b 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -10,15 +10,11 @@ use clap_blocks::{ write_buffer::WriteBufferConfig, }; use ioxd_common::server_type::{CommonServerState, CommonServerStateError}; - +use ioxd_common::Service; +use ioxd_compactor::create_compactor_server_type; +use ioxd_ingester::create_ingester_server_type; use ioxd_querier::create_querier_server_type; use ioxd_router2::create_router2_server_type; - -use ioxd::{ - self, - server_type::{compactor::create_compactor_server_type, ingester::create_ingester_server_type}, - Service, -}; use object_store::{DynObjectStore, ObjectStoreImpl}; use observability_deps::tracing::*; use query::exec::Executor; @@ -60,10 +56,10 @@ pub enum Error { Router2(#[from] ioxd_router2::Error), #[error("Ingester error: {0}")] - Ingester(#[from] ioxd::server_type::ingester::Error), + Ingester(#[from] ioxd_ingester::Error), #[error("error initializing compactor: {0}")] - Compactor(#[from] ioxd::server_type::compactor::Error), + Compactor(#[from] ioxd_compactor::Error), #[error("Invalid config: {0}")] InvalidConfig(#[from] CommonServerStateError), diff --git a/influxdb_iox/src/commands/run/compactor.rs b/influxdb_iox/src/commands/run/compactor.rs index 6bc1cde728..1fc8657561 100644 --- a/influxdb_iox/src/commands/run/compactor.rs +++ b/influxdb_iox/src/commands/run/compactor.rs @@ -10,8 +10,9 @@ use time::SystemProvider; use clap_blocks::{ catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, run_config::RunConfig, }; -use ioxd::{self, server_type::compactor::create_compactor_server_type, Service}; use ioxd_common::server_type::{CommonServerState, CommonServerStateError}; +use ioxd_common::Service; +use ioxd_compactor::create_compactor_server_type; use super::main; @@ -33,7 +34,7 @@ pub enum Error { ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError), #[error("error initializing compactor: {0}")] - Compactor(#[from] ioxd::server_type::compactor::Error), + Compactor(#[from] ioxd_compactor::Error), } #[derive(Debug, clap::Parser)] diff --git a/influxdb_iox/src/commands/run/database.rs b/influxdb_iox/src/commands/run/database.rs index ee35ad33d6..f57cb852ff 100644 --- a/influxdb_iox/src/commands/run/database.rs +++ b/influxdb_iox/src/commands/run/database.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use clap_blocks::run_config::RunConfig; use data_types::boolean_flag::BooleanFlag; -use ioxd::{self, Service}; use ioxd_common::server_type::{CommonServerState, CommonServerStateError}; +use ioxd_common::Service; use ioxd_database::{ setup::{make_application, make_server}, DatabaseServerType, diff --git a/influxdb_iox/src/commands/run/ingester.rs b/influxdb_iox/src/commands/run/ingester.rs index a28e936c1e..4390a947f0 100644 --- a/influxdb_iox/src/commands/run/ingester.rs +++ b/influxdb_iox/src/commands/run/ingester.rs @@ -4,8 +4,9 @@ use clap_blocks::{ catalog_dsn::CatalogDsnConfig, ingester::IngesterConfig, run_config::RunConfig, write_buffer::WriteBufferConfig, }; -use ioxd::{self, server_type::ingester::create_ingester_server_type, Service}; use ioxd_common::server_type::{CommonServerState, CommonServerStateError}; +use ioxd_common::Service; +use ioxd_ingester::create_ingester_server_type; use object_store::{instrumentation::ObjectStoreMetrics, DynObjectStore, ObjectStoreImpl}; use observability_deps::tracing::*; use query::exec::Executor; @@ -26,7 +27,7 @@ pub enum Error { ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError), #[error("error initializing ingester: {0}")] - Ingester(#[from] ioxd::server_type::ingester::Error), + Ingester(#[from] ioxd_ingester::Error), #[error("Catalog DSN error: {0}")] CatalogDsn(#[from] clap_blocks::catalog_dsn::Error), diff --git a/influxdb_iox/src/commands/run/main.rs b/influxdb_iox/src/commands/run/main.rs index a206feddd2..88302f009b 100644 --- a/influxdb_iox/src/commands/run/main.rs +++ b/influxdb_iox/src/commands/run/main.rs @@ -1,4 +1,4 @@ -use ioxd::Service; +use ioxd_common::Service; use ioxd_common::{grpc_listener, http_listener, serve, server_type::CommonServerState}; use observability_deps::tracing::{error, info}; use panic_logging::SendPanicsToTracing; diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs index 9a28af4705..efbd383dd5 100644 --- a/influxdb_iox/src/commands/run/querier.rs +++ b/influxdb_iox/src/commands/run/querier.rs @@ -8,8 +8,8 @@ use thiserror::Error; use time::SystemProvider; use clap_blocks::{catalog_dsn::CatalogDsnConfig, run_config::RunConfig}; -use ioxd::{self, Service}; use ioxd_common::server_type::{CommonServerState, CommonServerStateError}; +use ioxd_common::Service; use ioxd_querier::create_querier_server_type; use super::main; diff --git a/influxdb_iox/src/commands/run/router.rs b/influxdb_iox/src/commands/run/router.rs index 38400deec7..25dbb17b60 100644 --- a/influxdb_iox/src/commands/run/router.rs +++ b/influxdb_iox/src/commands/run/router.rs @@ -7,8 +7,8 @@ use clap_blocks::run_config::RunConfig; use data_types::router::Router as RouterConfig; use generated_types::{google::FieldViolation, influxdata::iox::router::v1::RouterConfigFile}; -use ioxd::{self, Service}; use ioxd_common::server_type::{CommonServerState, CommonServerStateError}; +use ioxd_common::Service; use ioxd_router::RouterServerType; use observability_deps::tracing::warn; use router::{resolver::RemoteTemplate, server::RouterServer}; diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs index 1b24c370a4..008deac261 100644 --- a/influxdb_iox/src/commands/run/router2.rs +++ b/influxdb_iox/src/commands/run/router2.rs @@ -6,8 +6,8 @@ use clap_blocks::{ catalog_dsn::CatalogDsnConfig, run_config::RunConfig, write_buffer::WriteBufferConfig, }; -use ioxd::{self, Service}; use ioxd_common::server_type::{CommonServerState, CommonServerStateError}; +use ioxd_common::Service; use ioxd_router2::create_router2_server_type; use observability_deps::tracing::*; use thiserror::Error; diff --git a/influxdb_iox/src/commands/run/test.rs b/influxdb_iox/src/commands/run/test.rs index 7576cf6a8f..b2ac0a0924 100644 --- a/influxdb_iox/src/commands/run/test.rs +++ b/influxdb_iox/src/commands/run/test.rs @@ -3,12 +3,9 @@ use std::sync::Arc; use clap_blocks::run_config::RunConfig; -use ioxd::{ - self, - server_type::test::{TestAction, TestServerType}, - Service, -}; use ioxd_common::server_type::{CommonServerState, CommonServerStateError}; +use ioxd_common::Service; +use ioxd_test::{TestAction, TestServerType}; use metric::Registry; use thiserror::Error; diff --git a/ioxd/Cargo.toml b/ioxd/Cargo.toml deleted file mode 100644 index 13af5fe08a..0000000000 --- a/ioxd/Cargo.toml +++ /dev/null @@ -1,66 +0,0 @@ -[package] -name = "ioxd" -version = "0.1.0" -edition = "2021" - - -[dependencies] -# Workspace dependencies, in alphabetical order -clap_blocks = { path = "../clap_blocks" } -compactor = { path = "../compactor" } -data_types2 = { path = "../data_types2" } -dml = { path = "../dml" } -generated_types = { path = "../generated_types" } -ingester = { path = "../ingester" } -iox_catalog = { path = "../iox_catalog" } -ioxd_common = { path = "../ioxd_common" } -metric = { path = "../metric" } -metric_exporters = { path = "../metric_exporters" } -mutable_batch = { path = "../mutable_batch" } -object_store = { path = "../object_store" } -observability_deps = { path = "../observability_deps" } -panic_logging = { path = "../panic_logging" } -query = { path = "../query" } -router = { path = "../router" } -router2 = { path = "../router2" } -server = { path = "../server" } -service_common = { path = "../service_common" } -service_grpc_flight = { path = "../service_grpc_flight" } -service_grpc_influxrpc = { path = "../service_grpc_influxrpc" } -service_grpc_testing = { path = "../service_grpc_testing" } -time = { path = "../time" } -trace = { path = "../trace" } -trace_exporters = { path = "../trace_exporters" } -trace_http = { path = "../trace_http" } -write_buffer = { path = "../write_buffer" } - -# Crates.io dependencies, in alphabetical order -ansi_term = "0.12" -arrow-flight = "11" -async-trait = "0.1" -clap = { version = "3", features = ["derive", "env"] } -futures = "0.3" -hashbrown = "0.12" -http = "0.2.0" -hyper = "0.14" -num_cpus = "1.13.0" -pprof = { version = "0.7", default-features = false, features = ["flamegraph", "prost-codec"], optional = true } -snafu = "0.7" -thiserror = "1.0.30" -tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } -tokio-stream = { version = "0.1", features = ["net"] } -tokio-util = { version = "0.7.1" } -tonic = "0.6" -tonic-health = "0.5.0" -tonic-reflection = "0.3.0" -workspace-hack = { path = "../workspace-hack"} - -[dev-dependencies] -# Workspace dependencies, in alphabetical order -arrow_util = { path = "../arrow_util" } -test_helpers = { path = "../test_helpers" } -iox_tests = { path = "../iox_tests" } - -# Crates.io dependencies, in alphabetical order -assert_cmd = "2.0.2" -tempfile = "3.1.0" diff --git a/ioxd/src/server_type/mod.rs b/ioxd/src/server_type/mod.rs deleted file mode 100644 index 5fba4d246f..0000000000 --- a/ioxd/src/server_type/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod compactor; -pub mod ingester; -pub mod test; diff --git a/ioxd_common/src/lib.rs b/ioxd_common/src/lib.rs index 19db29e64d..786bc36591 100644 --- a/ioxd_common/src/lib.rs +++ b/ioxd_common/src/lib.rs @@ -1,6 +1,9 @@ pub mod http; pub mod rpc; pub mod server_type; +mod service; + +pub use service::Service; use crate::server_type::{CommonServerState, ServerType}; use futures::{future::FusedFuture, pin_mut, FutureExt}; diff --git a/ioxd/src/lib.rs b/ioxd_common/src/service.rs similarity index 93% rename from ioxd/src/lib.rs rename to ioxd_common/src/service.rs index a8111c839b..d4af18a2cf 100644 --- a/ioxd/src/lib.rs +++ b/ioxd_common/src/service.rs @@ -1,9 +1,8 @@ use std::sync::Arc; use clap_blocks::{run_config::RunConfig, socket_addr::SocketAddr}; -use ioxd_common::server_type::ServerType; -pub mod server_type; +use crate::server_type::ServerType; /// A service that will start on the specified addresses pub struct Service { diff --git a/ioxd_compactor/Cargo.toml b/ioxd_compactor/Cargo.toml new file mode 100644 index 0000000000..a3d8715fcb --- /dev/null +++ b/ioxd_compactor/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "ioxd_compactor" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# Workspace dependencies, in alphabetical order +clap_blocks = { path = "../clap_blocks" } +compactor = { path = "../compactor" } +data_types2 = { path = "../data_types2" } +generated_types = { path = "../generated_types" } +iox_catalog = { path = "../iox_catalog" } +ioxd_common = { path = "../ioxd_common" } +metric = { path = "../metric" } +query = { path = "../query" } +object_store = { path = "../object_store" } +service_grpc_testing = { path = "../service_grpc_testing" } +time = { path = "../time" } +trace = { path = "../trace" } +trace_http = { path = "../trace_http" } + +# Crates.io dependencies, in alphabetical order +async-trait = "0.1" +hyper = "0.14" +thiserror = "1.0.30" +tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } +tokio-stream = { version = "0.1", features = ["net"] } +tokio-util = { version = "0.7.1" } +tonic = "0.6" +tonic-health = "0.5.0" +tonic-reflection = "0.3.0" +workspace-hack = { path = "../workspace-hack"} diff --git a/ioxd/src/server_type/compactor.rs b/ioxd_compactor/src/lib.rs similarity index 100% rename from ioxd/src/server_type/compactor.rs rename to ioxd_compactor/src/lib.rs diff --git a/ioxd_ingester/Cargo.toml b/ioxd_ingester/Cargo.toml new file mode 100644 index 0000000000..1dd202257d --- /dev/null +++ b/ioxd_ingester/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "ioxd_ingester" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# Workspace dependencies, in alphabetical order +clap_blocks = { path = "../clap_blocks" } +data_types2 = { path = "../data_types2" } +generated_types = { path = "../generated_types" } +ingester = { path = "../ingester" } +iox_catalog = { path = "../iox_catalog" } +ioxd_common = { path = "../ioxd_common" } +metric = { path = "../metric" } +object_store = { path = "../object_store" } +query = { path = "../query" } +service_grpc_testing = { path = "../service_grpc_testing" } +time = { path = "../time" } +trace = { path = "../trace" } +trace_http = { path = "../trace_http" } +write_buffer = { path = "../write_buffer" } + +# Crates.io dependencies, in alphabetical order +async-trait = "0.1" +hyper = "0.14" +thiserror = "1.0.30" +tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } +tokio-stream = { version = "0.1", features = ["net"] } +tokio-util = { version = "0.7.1" } +tonic = "0.6" +tonic-health = "0.5.0" +tonic-reflection = "0.3.0" +workspace-hack = { path = "../workspace-hack"} diff --git a/ioxd/src/server_type/ingester.rs b/ioxd_ingester/src/lib.rs similarity index 100% rename from ioxd/src/server_type/ingester.rs rename to ioxd_ingester/src/lib.rs diff --git a/ioxd_test/Cargo.toml b/ioxd_test/Cargo.toml new file mode 100644 index 0000000000..88072af18d --- /dev/null +++ b/ioxd_test/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "ioxd_test" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# Workspace dependencies, in alphabetical order +generated_types = { path = "../generated_types" } +ioxd_common = { path = "../ioxd_common" } +metric = { path = "../metric" } +service_grpc_testing = { path = "../service_grpc_testing" } +trace = { path = "../trace" } +trace_http = { path = "../trace_http" } + + +# Crates.io dependencies, in alphabetical order +async-trait = "0.1" +clap = { version = "3", features = ["derive", "env"] } +hyper = "0.14" +snafu = "0.7" +tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } +tokio-stream = { version = "0.1", features = ["net"] } +tokio-util = { version = "0.7.1" } +tonic = "0.6" +tonic-health = "0.5.0" +tonic-reflection = "0.3.0" +workspace-hack = { path = "../workspace-hack"} diff --git a/ioxd/src/server_type/test.rs b/ioxd_test/src/lib.rs similarity index 100% rename from ioxd/src/server_type/test.rs rename to ioxd_test/src/lib.rs From 1edd89eb67d33a7ded5557b4b9d9f93804e5d699 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Apr 2022 10:36:58 +0000 Subject: [PATCH 05/12] chore(deps): Bump clap from 3.1.7 to 3.1.8 (#4221) Bumps [clap](https://github.com/clap-rs/clap) from 3.1.7 to 3.1.8. - [Release notes](https://github.com/clap-rs/clap/releases) - [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md) - [Commits](https://github.com/clap-rs/clap/compare/v3.1.7...v3.1.8) --- updated-dependencies: - dependency-name: clap dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca122ad179..5bd687c2b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -615,9 +615,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.1.7" +version = "3.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c67e7973e74896f4bba06ca2dcfd28d54f9cb8c035e940a32b88ed48f5f5ecf2" +checksum = "71c47df61d9e16dc010b55dba1952a57d8c215dbb533fd13cdd13369aac73b1c" dependencies = [ "atty", "bitflags", @@ -634,7 +634,7 @@ dependencies = [ name = "clap_blocks" version = "0.1.0" dependencies = [ - "clap 3.1.7", + "clap 3.1.8", "data_types", "futures", "iox_catalog", @@ -2116,7 +2116,7 @@ dependencies = [ "byteorder", "bytes", "chrono", - "clap 3.1.7", + "clap 3.1.8", "clap_blocks", "comfy-table", "compactor", @@ -2413,7 +2413,7 @@ version = "0.1.0" dependencies = [ "chrono", "chrono-english", - "clap 3.1.7", + "clap 3.1.8", "criterion", "data_types", "futures", @@ -2442,7 +2442,7 @@ dependencies = [ "assert_matches", "async-trait", "chrono", - "clap 3.1.7", + "clap 3.1.8", "dotenv", "futures", "glob", @@ -2511,7 +2511,7 @@ dependencies = [ "async-trait", "bytes", "chrono", - "clap 3.1.7", + "clap 3.1.8", "clap_blocks", "data_types", "dml", @@ -2583,7 +2583,7 @@ dependencies = [ "arrow_util", "async-trait", "bytes", - "clap 3.1.7", + "clap 3.1.8", "clap_blocks", "data_types", "db", @@ -2759,7 +2759,7 @@ name = "ioxd_test" version = "0.1.0" dependencies = [ "async-trait", - "clap 3.1.7", + "clap 3.1.8", "generated_types", "hyper", "ioxd_common", @@ -6260,7 +6260,7 @@ version = "0.1.0" dependencies = [ "async-trait", "chrono", - "clap 3.1.7", + "clap 3.1.8", "futures", "observability_deps", "snafu", @@ -6405,7 +6405,7 @@ dependencies = [ name = "trogging" version = "0.1.0" dependencies = [ - "clap 3.1.7", + "clap 3.1.8", "logfmt", "observability_deps", "regex", From 36dd6f26a39bd984d6faf1b5691d12eea1d48db2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Apr 2022 10:45:31 +0000 Subject: [PATCH 06/12] chore(deps): Bump pbjson-build from 0.2.3 to 0.3.0 (#4220) Bumps [pbjson-build](https://github.com/influxdata/pbjson) from 0.2.3 to 0.3.0. - [Release notes](https://github.com/influxdata/pbjson/releases) - [Commits](https://github.com/influxdata/pbjson/compare/0.2.3...0.3.0) --- updated-dependencies: - dependency-name: pbjson-build dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 129 +++++++++++++++++++++++----------- generated_types/Cargo.toml | 2 +- iox_gitops_adapter/Cargo.toml | 2 +- 3 files changed, 89 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5bd687c2b1..498a47789f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -130,8 +130,8 @@ dependencies = [ "bytes", "futures", "proc-macro2", - "prost", - "prost-derive", + "prost 0.9.0", + "prost-derive 0.9.0", "tokio", "tonic", "tonic-build", @@ -670,7 +670,7 @@ name = "client_util" version = "0.1.0" dependencies = [ "http", - "prost", + "prost 0.9.0", "thiserror", "tokio", "tonic", @@ -1684,10 +1684,10 @@ dependencies = [ "num_cpus", "observability_deps", "pbjson", - "pbjson-build", + "pbjson-build 0.3.0", "pbjson-types", "predicate", - "prost", + "prost 0.9.0", "prost-build", "regex", "serde", @@ -1751,9 +1751,9 @@ dependencies = [ "grpc-router-test-gen", "observability_deps", "paste", - "prost", + "prost 0.9.0", "prost-build", - "prost-types", + "prost-types 0.9.0", "thiserror", "tokio", "tokio-stream", @@ -1767,9 +1767,9 @@ dependencies = [ name = "grpc-router-test-gen" version = "0.1.0" dependencies = [ - "prost", + "prost 0.9.0", "prost-build", - "prost-types", + "prost-types 0.9.0", "tonic", "tonic-build", ] @@ -2177,7 +2177,7 @@ dependencies = [ "pprof 0.7.0", "predicate", "predicates", - "prost", + "prost 0.9.0", "querier", "query", "rand", @@ -2233,7 +2233,7 @@ dependencies = [ "mutable_batch", "mutable_batch_lp", "mutable_batch_pb", - "prost", + "prost 0.9.0", "rand", "serde", "serde_json", @@ -2262,7 +2262,7 @@ dependencies = [ "client_util", "futures-util", "generated_types", - "prost", + "prost 0.9.0", "tonic", "workspace-hack", ] @@ -2325,7 +2325,7 @@ dependencies = [ "parquet_file", "pin-project", "predicate", - "prost", + "prost 0.9.0", "query", "schema", "snafu", @@ -2451,8 +2451,8 @@ dependencies = [ "kube-derive", "kube-runtime", "parking_lot 0.11.2", - "pbjson-build", - "prost", + "pbjson-build 0.3.0", + "prost 0.9.0", "schemars", "serde", "serde_json", @@ -2527,7 +2527,7 @@ dependencies = [ "observability_deps", "parking_lot 0.12.0", "predicate", - "prost", + "prost 0.9.0", "reqwest", "serde", "serde_json", @@ -2600,7 +2600,7 @@ dependencies = [ "mutable_batch_pb", "object_store", "observability_deps", - "prost", + "prost 0.9.0", "query", "reqwest", "schema", @@ -3401,7 +3401,7 @@ dependencies = [ "mutable_batch", "mutable_batch_lp", "mutable_batch_pb", - "prost", + "prost 0.9.0", ] [[package]] @@ -3859,7 +3859,7 @@ dependencies = [ "pbjson-types", "persistence_windows", "predicate", - "prost", + "prost 0.9.0", "schema", "snafu", "tempfile", @@ -3896,7 +3896,7 @@ dependencies = [ "pbjson-types", "persistence_windows", "predicate", - "prost", + "prost 0.9.0", "schema", "snafu", "tempfile", @@ -3934,8 +3934,20 @@ checksum = "f7ded6959888ee91bc803eb467411e416181ce68c125955338c2ad7dfbfa610d" dependencies = [ "heck 0.4.0", "itertools", - "prost", - "prost-types", + "prost 0.9.0", + "prost-types 0.9.0", +] + +[[package]] +name = "pbjson-build" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "956a449e8a85fc040e9f8cd8fd4dd2e68059d179092401f0d8570ba059f76dae" +dependencies = [ + "heck 0.4.0", + "itertools", + "prost 0.10.0", + "prost-types 0.10.0", ] [[package]] @@ -3947,8 +3959,8 @@ dependencies = [ "bytes", "chrono", "pbjson", - "pbjson-build", - "prost", + "pbjson-build 0.2.3", + "prost 0.9.0", "prost-build", "serde", ] @@ -4132,9 +4144,9 @@ dependencies = [ "log", "nix", "parking_lot 0.11.2", - "prost", + "prost 0.9.0", "prost-build", - "prost-derive", + "prost-derive 0.9.0", "smallvec", "symbolic-demangle", "tempfile", @@ -4156,9 +4168,9 @@ dependencies = [ "nix", "once_cell", "parking_lot 0.12.0", - "prost", + "prost 0.9.0", "prost-build", - "prost-derive", + "prost-derive 0.9.0", "smallvec", "symbolic-demangle", "tempfile", @@ -4307,7 +4319,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.9.0", +] + +[[package]] +name = "prost" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bd5316aa8f5c82add416dfbc25116b84b748a21153f512917e8143640a71bbd" +dependencies = [ + "bytes", + "prost-derive 0.10.0", ] [[package]] @@ -4323,8 +4345,8 @@ dependencies = [ "log", "multimap", "petgraph", - "prost", - "prost-types", + "prost 0.9.0", + "prost-types 0.9.0", "regex", "tempfile", "which", @@ -4343,6 +4365,19 @@ dependencies = [ "syn", ] +[[package]] +name = "prost-derive" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df35198f0777b75e9ff669737c6da5136b59dba33cf5a010a6d1cc4d56defc6f" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "prost-types" version = "0.9.0" @@ -4350,7 +4385,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" dependencies = [ "bytes", - "prost", + "prost 0.9.0", +] + +[[package]] +name = "prost-types" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "926681c118ae6e512a3ccefd4abbe5521a14f4cc1e207356d4d00c0b7f2006fd" +dependencies = [ + "bytes", + "prost 0.10.0", ] [[package]] @@ -4381,7 +4426,7 @@ dependencies = [ "pin-project", "predicate", "proptest", - "prost", + "prost 0.9.0", "query", "rand", "schema", @@ -5376,7 +5421,7 @@ dependencies = [ "panic_logging", "parking_lot 0.12.0", "predicate", - "prost", + "prost 0.9.0", "query", "regex", "schema", @@ -6137,8 +6182,8 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", - "prost-derive", + "prost 0.9.0", + "prost-derive 0.9.0", "tokio", "tokio-stream", "tokio-util 0.6.9", @@ -6169,7 +6214,7 @@ checksum = "7ae388bee1d4e52c9dc334f0d5918757b07b3ffafafd7953d254c7a0e8605e02" dependencies = [ "async-stream", "bytes", - "prost", + "prost 0.9.0", "tokio", "tokio-stream", "tonic", @@ -6183,8 +6228,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "228cc5aa5d3e6e0624b5f756a7558038ee86428d1d58d8c6e551b389b12cf355" dependencies = [ "bytes", - "prost", - "prost-types", + "prost 0.9.0", + "prost-types 0.9.0", "tokio", "tokio-stream", "tonic", @@ -6863,8 +6908,8 @@ dependencies = [ "once_cell", "parquet", "predicates", - "prost", - "prost-types", + "prost 0.9.0", + "prost-types 0.9.0", "rand", "regex", "regex-automata", @@ -6916,7 +6961,7 @@ dependencies = [ "observability_deps", "parking_lot 0.12.0", "pin-project", - "prost", + "prost 0.9.0", "rskafka", "schema", "tempfile", diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index 3a9f501d7e..698fe04cf3 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -27,7 +27,7 @@ num_cpus = "1.13.0" [build-dependencies] # In alphabetical order tonic-build = "0.6" prost-build = "0.9" -pbjson-build = "0.2" +pbjson-build = "0.3" [features] default = ["data_types_conversions"] diff --git a/iox_gitops_adapter/Cargo.toml b/iox_gitops_adapter/Cargo.toml index e0a6d381a6..91d13dc0e4 100644 --- a/iox_gitops_adapter/Cargo.toml +++ b/iox_gitops_adapter/Cargo.toml @@ -36,7 +36,7 @@ trogging = { path = "../trogging", default-features = false, features = ["clap"] [build-dependencies] glob = "0.3.0" -pbjson-build = "0.2" +pbjson-build = "0.3" tonic-build = "0.6" [dev-dependencies] From dc9632114c364906a2ae1a9a028f5fc43d6261c1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Apr 2022 10:53:31 +0000 Subject: [PATCH 07/12] chore(deps): Bump pretty_assertions from 1.2.0 to 1.2.1 (#4213) Bumps [pretty_assertions](https://github.com/colin-kiegel/rust-pretty-assertions) from 1.2.0 to 1.2.1. - [Release notes](https://github.com/colin-kiegel/rust-pretty-assertions/releases) - [Changelog](https://github.com/colin-kiegel/rust-pretty-assertions/blob/main/CHANGELOG.md) - [Commits](https://github.com/colin-kiegel/rust-pretty-assertions/compare/v1.2.0...v1.2.1) --- updated-dependencies: - dependency-name: pretty_assertions dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- iox_catalog/Cargo.toml | 2 +- router2/Cargo.toml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 498a47789f..86afc9e7d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4238,9 +4238,9 @@ dependencies = [ [[package]] name = "pretty_assertions" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c038cb5319b9c704bf9c227c261d275bfec0ad438118a2787ce47944fb228b" +checksum = "c89f989ac94207d048d92db058e4f6ec7342b0971fc58d1271ca148b799b3563" dependencies = [ "ansi_term", "ctor", diff --git a/iox_catalog/Cargo.toml b/iox_catalog/Cargo.toml index 7498633f62..9d438a6fa7 100644 --- a/iox_catalog/Cargo.toml +++ b/iox_catalog/Cargo.toml @@ -24,7 +24,7 @@ workspace-hack = { path = "../workspace-hack"} dotenv = "0.15.0" mutable_batch_lp = { path = "../mutable_batch_lp" } paste = "1.0.7" -pretty_assertions = "1.2.0" +pretty_assertions = "1.2.1" rand = "0.8" tempfile = "3" test_helpers = { path = "../test_helpers" } diff --git a/router2/Cargo.toml b/router2/Cargo.toml index fa8519c751..689e80d932 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -42,7 +42,7 @@ assert_matches = "1.5" criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] } lazy_static = "1.4.0" paste = "1.0.7" -pretty_assertions = "1.2.0" +pretty_assertions = "1.2.1" rand = "0.8.3" schema = { path = "../schema" } From 4c052be568505b5260d414123b5d97596ace1ed9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Apr 2022 11:01:14 +0000 Subject: [PATCH 08/12] chore(deps): Bump sqlparser from 0.15.0 to 0.16.0 (#4219) Bumps [sqlparser](https://github.com/sqlparser-rs/sqlparser-rs) from 0.15.0 to 0.16.0. - [Release notes](https://github.com/sqlparser-rs/sqlparser-rs/releases) - [Changelog](https://github.com/sqlparser-rs/sqlparser-rs/blob/main/CHANGELOG.md) - [Commits](https://github.com/sqlparser-rs/sqlparser-rs/compare/v0.15.0...v0.16.0) --- updated-dependencies: - dependency-name: sqlparser dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 19 ++++++++++++++----- influxrpc_parser/Cargo.toml | 2 +- predicate/Cargo.toml | 2 +- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86afc9e7d8..118091ad47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1118,7 +1118,7 @@ dependencies = [ "pin-project-lite", "rand", "smallvec", - "sqlparser", + "sqlparser 0.15.0", "tempfile", "tokio", "tokio-stream", @@ -1133,7 +1133,7 @@ dependencies = [ "arrow", "ordered-float 2.10.0", "parquet", - "sqlparser", + "sqlparser 0.15.0", ] [[package]] @@ -1157,7 +1157,7 @@ dependencies = [ "ahash", "arrow", "datafusion-common", - "sqlparser", + "sqlparser 0.15.0", ] [[package]] @@ -2288,7 +2288,7 @@ version = "0.1.0" dependencies = [ "generated_types", "snafu", - "sqlparser", + "sqlparser 0.16.0", "workspace-hack", ] @@ -4200,7 +4200,7 @@ dependencies = [ "schema", "serde_json", "snafu", - "sqlparser", + "sqlparser 0.16.0", "test_helpers", "tokio", "workspace-hack", @@ -5630,6 +5630,15 @@ dependencies = [ "log", ] +[[package]] +name = "sqlparser" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9a527b68048eb95495a1508f6c8395c8defcff5ecdbe8ad4106d08a2ef2a3c" +dependencies = [ + "log", +] + [[package]] name = "sqlx" version = "0.5.11" diff --git a/influxrpc_parser/Cargo.toml b/influxrpc_parser/Cargo.toml index efc180d761..7ac5062b9f 100644 --- a/influxrpc_parser/Cargo.toml +++ b/influxrpc_parser/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -sqlparser = "0.15.0" +sqlparser = "0.16.0" snafu = "0.7.0" generated_types = { path = "../generated_types" } diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index c7a213a80e..b6464a831b 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -17,7 +17,7 @@ regex-syntax = "0.6.25" schema = { path = "../schema" } serde_json = "1.0.79" snafu = "0.7" -sqlparser = "0.15.0" +sqlparser = "0.16.0" workspace-hack = { path = "../workspace-hack"} [dev-dependencies] From d19b944ba58030eb8bf66186cced59a5f7a0fcfc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Apr 2022 11:09:29 +0000 Subject: [PATCH 09/12] chore(deps): Bump tracing-subscriber from 0.3.9 to 0.3.10 (#4222) Bumps [tracing-subscriber](https://github.com/tokio-rs/tracing) from 0.3.9 to 0.3.10. - [Release notes](https://github.com/tokio-rs/tracing/releases) - [Commits](https://github.com/tokio-rs/tracing/compare/tracing-subscriber-0.3.9...tracing-subscriber-0.3.10) --- updated-dependencies: - dependency-name: tracing-subscriber dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 118091ad47..0c60d1e307 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6410,9 +6410,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e0ab7bdc962035a87fba73f3acca9b8a8d0034c2e6f60b84aeaaddddc155dce" +checksum = "b9df98b037d039d03400d9dd06b0f8ce05486b5f25e9a2d7d36196e142ebbc52" dependencies = [ "ansi_term", "lazy_static", From 26f6a1721faf9240530c99983e9b5cd1f7bc2463 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Apr 2022 11:17:24 +0000 Subject: [PATCH 10/12] chore(deps): Bump tracing-core from 0.1.23 to 0.1.24 (#4217) Bumps [tracing-core](https://github.com/tokio-rs/tracing) from 0.1.23 to 0.1.24. - [Release notes](https://github.com/tokio-rs/tracing/releases) - [Commits](https://github.com/tokio-rs/tracing/compare/tracing-core-0.1.23...tracing-core-0.1.24) --- updated-dependencies: - dependency-name: tracing-core dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c60d1e307..9cc7cc574e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6369,9 +6369,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c" +checksum = "90442985ee2f57c9e1b548ee72ae842f4a9a20e3f417cc38dbc5dc684d9bb4ee" dependencies = [ "lazy_static", "valuable", From 61bc9c83addabf5f7a6d544ba277adc8560b9731 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 4 Apr 2022 13:02:37 +0100 Subject: [PATCH 11/12] refactor: add table_id index on column_name After checking the postgres workload for the catalog in prod, this missing index was noted as the cause of unexpectedly expensive plans for simple queries. --- .../migrations/20220404120116_column_name-table_id-index.sql | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 iox_catalog/migrations/20220404120116_column_name-table_id-index.sql diff --git a/iox_catalog/migrations/20220404120116_column_name-table_id-index.sql b/iox_catalog/migrations/20220404120116_column_name-table_id-index.sql new file mode 100644 index 0000000000..f262a1fce7 --- /dev/null +++ b/iox_catalog/migrations/20220404120116_column_name-table_id-index.sql @@ -0,0 +1,2 @@ +-- Avoid seqscan when filtering columns by their table ID. +CREATE INDEX IF NOT EXISTS column_name_table_idx ON column_name (table_id); From 276449ee0905734ba3c40065580022a8dfdde7f0 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Apr 2022 12:05:46 +0000 Subject: [PATCH 12/12] chore(deps): Bump pbjson from 0.2.3 to 0.3.0 (#4215) Bumps [pbjson](https://github.com/influxdata/pbjson) from 0.2.3 to 0.3.0. - [Release notes](https://github.com/influxdata/pbjson/releases) - [Commits](https://github.com/influxdata/pbjson/compare/0.2.3...0.3.0) --- updated-dependencies: - dependency-name: pbjson dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 14 ++++++++++++-- generated_types/Cargo.toml | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9cc7cc574e..237d84f665 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1683,7 +1683,7 @@ dependencies = [ "datafusion 0.1.0", "num_cpus", "observability_deps", - "pbjson", + "pbjson 0.3.0", "pbjson-build 0.3.0", "pbjson-types", "predicate", @@ -3926,6 +3926,16 @@ dependencies = [ "serde", ] +[[package]] +name = "pbjson" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d86c0a61b93c50f35af5d8a4f134790f47cbebf8803a7219dd1e7238cd1af022" +dependencies = [ + "base64 0.13.0", + "serde", +] + [[package]] name = "pbjson-build" version = "0.2.3" @@ -3958,7 +3968,7 @@ checksum = "58d94ffa4c36eb9d09fb8e0461f8256347d1e48793f53a8a210b43726f4ec884" dependencies = [ "bytes", "chrono", - "pbjson", + "pbjson 0.2.3", "pbjson-build 0.2.3", "prost 0.9.0", "prost-build", diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index 698fe04cf3..a64a39369a 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -10,7 +10,7 @@ data_types = { path = "../data_types", optional = true } data_types2 = { path = "../data_types2", optional = true } datafusion = { path = "../datafusion", optional = true } observability_deps = { path = "../observability_deps" } -pbjson = "0.2" +pbjson = "0.3" pbjson-types = "0.2" predicate = { path = "../predicate", optional = true } prost = "0.9"