From 4c7f96ead85862aff64034a7bdf4e0739b7c76a6 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 12 May 2023 11:27:41 -0400 Subject: [PATCH 1/3] fix: Remove unused delete predicate proto conversion code --- generated_types/src/delete_predicate.rs | 151 ------------------------ generated_types/src/lib.rs | 2 - 2 files changed, 153 deletions(-) delete mode 100644 generated_types/src/delete_predicate.rs diff --git a/generated_types/src/delete_predicate.rs b/generated_types/src/delete_predicate.rs deleted file mode 100644 index 6fefc157dd..0000000000 --- a/generated_types/src/delete_predicate.rs +++ /dev/null @@ -1,151 +0,0 @@ -//! Code to serialize and deserialize certain expressions. -//! -//! Note that [Ballista] also provides a serialization using [Protocol Buffers 3]. However the -//! protocol is meant as a communication channel between workers and clients of Ballista, not for -//! long term preservation. For IOx we need a more stable solution. Luckily we only need to support -//! a very small subset of expression. -//! -//! [Ballista]: https://github.com/apache/arrow-datafusion/blob/22fcb3d7a68a56afbe12eab9e7d98f7b8de33703/ballista/rust/core/proto/ballista.proto -//! [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3 - -use crate::google::{FieldViolation, FromOptionalField, FromRepeatedField, OptionalField}; -use crate::influxdata::iox::predicate::v1 as proto; -use crate::influxdata::iox::predicate::v1::scalar::Value; -use crate::influxdata::iox::predicate::v1::{Expr, Predicate}; -use data_types::{DeleteExpr, DeletePredicate, Op, Scalar, TimestampRange}; - -impl From for proto::Predicate { - fn from(predicate: DeletePredicate) -> Self { - proto::Predicate { - range: Some(proto::TimestampRange { - start: predicate.range.start(), - end: predicate.range.end(), - }), - exprs: predicate.exprs.into_iter().map(Into::into).collect(), - } - } -} - -impl TryFrom for DeletePredicate { - type Error = FieldViolation; - - fn try_from(value: Predicate) -> Result { - let range = value.range.unwrap_field("range")?; - - Ok(Self { - range: TimestampRange::new(range.start, range.end), - exprs: value.exprs.repeated("exprs")?, - }) - } -} - -impl TryFrom for DeleteExpr { - type Error = FieldViolation; - - fn try_from(value: Expr) -> Result { - Ok(Self { - column: value.column, - op: proto::Op::from_i32(value.op).required("op")?, - scalar: value.scalar.required("scalar")?, - }) - } -} - -impl From for proto::Expr { - fn from(expr: DeleteExpr) -> Self { - Self { - column: expr.column, - op: proto::Op::from(expr.op).into(), - scalar: Some(expr.scalar.into()), - } - } -} - -impl TryFrom for Scalar { - type Error = FieldViolation; - - fn try_from(value: proto::Scalar) -> Result { - Ok(value.value.unwrap_field("value")?.into()) - } -} - -impl From for Scalar { - fn from(value: Value) -> Self { - match value { - Value::ValueBool(v) => Self::Bool(v), - Value::ValueI64(v) => Self::I64(v), - Value::ValueF64(v) => Self::F64(v.into()), - Value::ValueString(v) => Self::String(v), - } - } -} - -impl From for proto::Scalar { - fn from(value: Scalar) -> Self { - let value = match value { - Scalar::Bool(v) => Value::ValueBool(v), - Scalar::I64(v) => Value::ValueI64(v), - Scalar::F64(v) => Value::ValueF64(v.0), - Scalar::String(v) => Value::ValueString(v), - }; - - Self { value: Some(value) } - } -} - -impl TryFrom for Op { - type Error = FieldViolation; - - fn try_from(value: proto::Op) -> Result { - match value { - proto::Op::Unspecified => Err(FieldViolation::required("")), - proto::Op::Eq => Ok(Self::Eq), - proto::Op::Ne => Ok(Self::Ne), - } - } -} - -impl From for proto::Op { - fn from(value: Op) -> Self { - match value { - Op::Eq => Self::Eq, - Op::Ne => Self::Ne, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_roundtrip() { - let round_trip = |expr: DeleteExpr| { - let serialized: proto::Expr = expr.clone().into(); - let deserialized: DeleteExpr = serialized.try_into().unwrap(); - assert_eq!(expr, deserialized); - }; - - round_trip(DeleteExpr { - column: "foo".to_string(), - op: Op::Eq, - scalar: Scalar::Bool(true), - }); - - round_trip(DeleteExpr { - column: "bar".to_string(), - op: Op::Ne, - scalar: Scalar::I64(-1), - }); - round_trip(DeleteExpr { - column: "baz".to_string(), - op: Op::Eq, - scalar: Scalar::F64((-1.1).into()), - }); - round_trip(DeleteExpr { - column: "col".to_string(), - op: Op::Eq, - scalar: Scalar::String("foo".to_string()), - }); - } -} diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs index b718f51a44..73c3b8085f 100644 --- a/generated_types/src/lib.rs +++ b/generated_types/src/lib.rs @@ -242,8 +242,6 @@ pub mod google; #[cfg(any(feature = "data_types_conversions", test))] pub mod compactor; #[cfg(any(feature = "data_types_conversions", test))] -pub mod delete_predicate; -#[cfg(any(feature = "data_types_conversions", test))] pub mod ingester; pub use prost::{DecodeError, EncodeError}; From 1770d0f4d8e5a9b3a82dca488eb14174f023de98 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 12 May 2023 11:35:29 -0400 Subject: [PATCH 2/3] fix: Move ingester-querier gRPC communication to its own crate --- Cargo.lock | 32 +++++++-- Cargo.toml | 1 + generated_types/Cargo.toml | 8 +-- generated_types/build.rs | 1 - generated_types/src/lib.rs | 2 - influxdb_iox/Cargo.toml | 1 + influxdb_iox/src/commands/query_ingester.rs | 13 ++-- .../tests/end_to_end_cases/ingester.rs | 2 +- .../querier/multi_ingester.rs | 2 +- influxdb_iox_client/src/client/flight/mod.rs | 5 +- ingester/Cargo.toml | 1 + ingester/src/server/grpc/query.rs | 2 +- ingester/tests/write.rs | 2 +- ingester_query_grpc/Cargo.toml | 26 +++++++ ingester_query_grpc/build.rs | 56 +++++++++++++++ .../influxdata/iox/ingester/v1/query.proto | 0 .../src/lib.rs | 69 ++++++++++++++++++- ingester_test_ctx/Cargo.toml | 1 + ingester_test_ctx/src/lib.rs | 4 +- querier/Cargo.toml | 2 +- querier/src/ingester/circuit_breaker.rs | 7 +- querier/src/ingester/flight_client.rs | 5 +- querier/src/ingester/invalidate_on_error.rs | 2 +- querier/src/ingester/mod.rs | 10 +-- test_helpers_end_to_end/Cargo.toml | 1 + test_helpers_end_to_end/src/mini_cluster.rs | 4 +- workspace-hack/Cargo.toml | 1 + 27 files changed, 213 insertions(+), 47 deletions(-) create mode 100644 ingester_query_grpc/Cargo.toml create mode 100644 ingester_query_grpc/build.rs rename {generated_types => ingester_query_grpc}/protos/influxdata/iox/ingester/v1/query.proto (100%) rename generated_types/src/ingester.rs => ingester_query_grpc/src/lib.rs (79%) diff --git a/Cargo.lock b/Cargo.lock index 4c16a937d3..e6a944b68f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2037,16 +2037,12 @@ dependencies = [ "base64 0.21.0", "bytes", "data_types", - "datafusion", - "datafusion-proto", "observability_deps", "pbjson", "pbjson-build", "pbjson-types", - "predicate", "prost", "prost-build", - "query_functions", "serde", "snafu", "tonic", @@ -2565,6 +2561,7 @@ dependencies = [ "influxdb_iox_client", "influxdb_storage_client", "influxrpc_parser", + "ingester_query_grpc", "insta", "iox_catalog", "iox_query", @@ -2698,6 +2695,7 @@ dependencies = [ "generated_types", "hashbrown 0.13.2", "influxdb_iox_client", + "ingester_query_grpc", "ingester_test_ctx", "iox_catalog", "iox_query", @@ -2732,6 +2730,27 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "ingester_query_grpc" +version = "0.1.0" +dependencies = [ + "base64 0.21.0", + "data_types", + "datafusion", + "datafusion-proto", + "pbjson", + "pbjson-build", + "pbjson-types", + "predicate", + "prost", + "prost-build", + "query_functions", + "serde", + "snafu", + "tonic-build", + "workspace-hack", +] + [[package]] name = "ingester_test_ctx" version = "0.1.0" @@ -2746,6 +2765,7 @@ dependencies = [ "hashbrown 0.13.2", "influxdb_iox_client", "ingester", + "ingester_query_grpc", "iox_catalog", "iox_query", "iox_time", @@ -4426,8 +4446,8 @@ dependencies = [ "datafusion", "datafusion_util", "futures", - "generated_types", "influxdb_iox_client", + "ingester_query_grpc", "insta", "iox_catalog", "iox_query", @@ -5612,6 +5632,7 @@ dependencies = [ "http", "hyper", "influxdb_iox_client", + "ingester_query_grpc", "iox_catalog", "mutable_batch_lp", "mutable_batch_pb", @@ -6751,6 +6772,7 @@ dependencies = [ "sha2", "similar", "smallvec", + "sqlparser", "sqlx", "sqlx-core", "sqlx-macros", diff --git a/Cargo.toml b/Cargo.toml index 5705441e4d..3adbedec21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "influxrpc_parser", "ingester_test_ctx", "ingester", + "ingester_query_grpc", "iox_catalog", "iox_data_generator", "iox_query_influxql", diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index da22a1201d..21d67c1924 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -9,14 +9,10 @@ license.workspace = true base64 = "0.21" bytes = "1.4" data_types = { path = "../data_types", optional = true } -datafusion = { workspace = true, optional = true } -datafusion-proto = { workspace = true, optional = true } observability_deps = { path = "../observability_deps" } pbjson = "0.5" pbjson-types = "0.5" -predicate = { path = "../predicate", optional = true } prost = "0.11" -query_functions = { path = "../query_functions" } serde = { version = "1.0", features = ["derive"] } snafu = "0.7" tonic = { workspace = true } @@ -29,9 +25,7 @@ pbjson-build = "0.5" [dev-dependencies] data_types = { path = "../data_types" } -datafusion = { workspace = true } -predicate = { path = "../predicate" } [features] default = ["data_types_conversions"] -data_types_conversions = ["data_types", "datafusion", "datafusion-proto", "predicate"] +data_types_conversions = ["data_types"] diff --git a/generated_types/build.rs b/generated_types/build.rs index de783dd931..feef1964ca 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -53,7 +53,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> { compactor_path.join("service.proto"), delete_path.join("service.proto"), ingester_path.join("parquet_metadata.proto"), - ingester_path.join("query.proto"), ingester_path.join("write.proto"), ingester_path.join("replication.proto"), ingester_path.join("persist.proto"), diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs index 73c3b8085f..e981c40590 100644 --- a/generated_types/src/lib.rs +++ b/generated_types/src/lib.rs @@ -241,8 +241,6 @@ pub mod google; #[cfg(any(feature = "data_types_conversions", test))] pub mod compactor; -#[cfg(any(feature = "data_types_conversions", test))] -pub mod ingester; pub use prost::{DecodeError, EncodeError}; diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 0e0927b6e9..e3b5c51db5 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -19,6 +19,7 @@ import = { path = "../import" } influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] } influxdb_storage_client = { path = "../influxdb_storage_client" } influxrpc_parser = { path = "../influxrpc_parser"} +ingester_query_grpc = { path = "../ingester_query_grpc" } iox_catalog = { path = "../iox_catalog" } ioxd_common = { path = "../ioxd_common"} ioxd_compactor = { path = "../ioxd_compactor"} diff --git a/influxdb_iox/src/commands/query_ingester.rs b/influxdb_iox/src/commands/query_ingester.rs index 20f404a207..e1a757ba28 100644 --- a/influxdb_iox/src/commands/query_ingester.rs +++ b/influxdb_iox/src/commands/query_ingester.rs @@ -1,12 +1,9 @@ use arrow_flight::Ticket; use futures::TryStreamExt; -use generated_types::ingester::{ - decode_proto_predicate_from_base64, DecodeProtoPredicateFromBase64Error, -}; -use influxdb_iox_client::{ - connection::Connection, - flight::{self}, - format::QueryOutputFormat, +use influxdb_iox_client::{connection::Connection, format::QueryOutputFormat}; +use ingester_query_grpc::{ + decode_proto_predicate_from_base64, influxdata::iox::ingester::v1::IngesterQueryRequest, + DecodeProtoPredicateFromBase64Error, }; use prost::Message; use std::str::FromStr; @@ -73,7 +70,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { None }; - let request = flight::generated_types::IngesterQueryRequest { + let request = IngesterQueryRequest { table_id, columns, predicate, diff --git a/influxdb_iox/tests/end_to_end_cases/ingester.rs b/influxdb_iox/tests/end_to_end_cases/ingester.rs index c3dc5034b0..ddae96c165 100644 --- a/influxdb_iox/tests/end_to_end_cases/ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/ingester.rs @@ -2,8 +2,8 @@ use arrow_flight::{error::FlightError, Ticket}; use arrow_util::assert_batches_sorted_eq; use data_types::{NamespaceId, TableId}; use futures::FutureExt; -use generated_types::{influxdata::iox::ingester::v1 as proto, ingester::IngesterQueryRequest}; use http::StatusCode; +use ingester_query_grpc::{influxdata::iox::ingester::v1 as proto, IngesterQueryRequest}; use prost::Message; use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState}; diff --git a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs b/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs index 1c6ccd7dbd..1680b01c52 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs @@ -1,6 +1,6 @@ use arrow_util::assert_batches_sorted_eq; use futures::FutureExt; -use generated_types::{influxdata::iox::ingester::v1 as proto, ingester::IngesterQueryRequest}; +use ingester_query_grpc::{influxdata::iox::ingester::v1 as proto, IngesterQueryRequest}; use std::num::NonZeroUsize; use test_helpers_end_to_end::{ maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState, TestConfig, diff --git a/influxdb_iox_client/src/client/flight/mod.rs b/influxdb_iox_client/src/client/flight/mod.rs index 0d5abad11b..d4c66a1272 100644 --- a/influxdb_iox_client/src/client/flight/mod.rs +++ b/influxdb_iox_client/src/client/flight/mod.rs @@ -21,10 +21,7 @@ use crate::connection::Connection; /// Re-export generated_types pub mod generated_types { - pub use generated_types::influxdata::iox::{ - ingester::v1::{IngesterQueryRequest, IngesterQueryResponseMetadata, Predicate}, - querier::v1::*, - }; + pub use generated_types::influxdata::iox::querier::v1::*; } /// Error responses when querying an IOx namespace using the IOx Flight API. diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 7a1bdc77a6..2574f80d12 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -22,6 +22,7 @@ flatbuffers = "23.1.21" futures = "0.3.28" generated_types = { version = "0.1.0", path = "../generated_types" } hashbrown.workspace = true +ingester_query_grpc = { path = "../ingester_query_grpc" } iox_catalog = { version = "0.1.0", path = "../iox_catalog" } iox_query = { version = "0.1.0", path = "../iox_query" } iox_time = { path = "../iox_time" } diff --git a/ingester/src/server/grpc/query.rs b/ingester/src/server/grpc/query.rs index de013c7406..6e3a97d437 100644 --- a/ingester/src/server/grpc/query.rs +++ b/ingester/src/server/grpc/query.rs @@ -9,7 +9,7 @@ use arrow_flight::{ use data_types::{NamespaceId, PartitionId, TableId}; use flatbuffers::FlatBufferBuilder; use futures::{Stream, StreamExt, TryStreamExt}; -use generated_types::influxdata::iox::ingester::v1 as proto; +use ingester_query_grpc::influxdata::iox::ingester::v1 as proto; use metric::U64Counter; use observability_deps::tracing::*; use prost::Message; diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs index ddc0fc1686..930e433461 100644 --- a/ingester/tests/write.rs +++ b/ingester/tests/write.rs @@ -1,7 +1,7 @@ use arrow_util::assert_batches_sorted_eq; use assert_matches::assert_matches; use data_types::PartitionKey; -use influxdb_iox_client::flight::generated_types::IngesterQueryRequest; +use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest; use ingester_test_ctx::TestContextBuilder; use iox_catalog::interface::Catalog; use metric::{DurationHistogram, U64Histogram}; diff --git a/ingester_query_grpc/Cargo.toml b/ingester_query_grpc/Cargo.toml new file mode 100644 index 0000000000..35b9623e52 --- /dev/null +++ b/ingester_query_grpc/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "ingester_query_grpc" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] # In alphabetical order +base64 = "0.21" +data_types = { path = "../data_types" } +datafusion = { workspace = true } +datafusion-proto = { workspace = true } +pbjson = "0.5" +pbjson-types = "0.5" +predicate = { path = "../predicate" } +prost = "0.11" +query_functions = { path = "../query_functions" } +serde = { version = "1.0", features = ["derive"] } +snafu = "0.7" +workspace-hack = { version = "0.1", path = "../workspace-hack" } + +[build-dependencies] # In alphabetical order +tonic-build = { workspace = true } +prost-build = "0.11" +pbjson-build = "0.5" + diff --git a/ingester_query_grpc/build.rs b/ingester_query_grpc/build.rs new file mode 100644 index 0000000000..3ac0647812 --- /dev/null +++ b/ingester_query_grpc/build.rs @@ -0,0 +1,56 @@ +//! Compiles Protocol Buffers into native Rust types. + +use std::env; +use std::path::{Path, PathBuf}; + +type Error = Box; +type Result = std::result::Result; + +fn main() -> Result<()> { + let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("protos"); + + generate_grpc_types(&root)?; + + Ok(()) +} + +/// Schema used with IOx specific Ingester-Querier gRPC requests +/// +/// Creates: +/// +/// - `influxdata.iox.ingester.v1.rs` +fn generate_grpc_types(root: &Path) -> Result<()> { + let ingester_path = root.join("influxdata/iox/ingester/v1"); + + let proto_files = vec![ingester_path.join("query.proto")]; + + // Tell cargo to recompile if any of these proto files are changed + for proto_file in &proto_files { + println!("cargo:rerun-if-changed={}", proto_file.display()); + } + + let mut config = prost_build::Config::new(); + + config + .compile_well_known_types() + .disable_comments([".google"]) + .extern_path(".google.protobuf", "::pbjson_types") + .btree_map([ + ".influxdata.iox.ingester.v1.IngesterQueryResponseMetadata.unpersisted_partitions", + ]); + + let descriptor_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin"); + tonic_build::configure() + .file_descriptor_set_path(&descriptor_path) + // protoc in ubuntu builder needs this option + .protoc_arg("--experimental_allow_proto3_optional") + .compile_with_config(config, &proto_files, &[root])?; + + let descriptor_set = std::fs::read(descriptor_path)?; + + pbjson_build::Builder::new() + .register_descriptors(&descriptor_set)? + .build(&[".influxdata.iox"])?; + + Ok(()) +} diff --git a/generated_types/protos/influxdata/iox/ingester/v1/query.proto b/ingester_query_grpc/protos/influxdata/iox/ingester/v1/query.proto similarity index 100% rename from generated_types/protos/influxdata/iox/ingester/v1/query.proto rename to ingester_query_grpc/protos/influxdata/iox/ingester/v1/query.proto diff --git a/generated_types/src/ingester.rs b/ingester_query_grpc/src/lib.rs similarity index 79% rename from generated_types/src/ingester.rs rename to ingester_query_grpc/src/lib.rs index e38b09a6e3..8c6b4f1a0e 100644 --- a/generated_types/src/ingester.rs +++ b/ingester_query_grpc/src/lib.rs @@ -1,4 +1,10 @@ -use crate::{google::FieldViolation, influxdata::iox::ingester::v1 as proto}; +// This crate deliberately does not use the same linting rules as the other +// crates because of all the generated code it contains that we don't have much +// control over. +#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls)] +#![allow(clippy::derive_partial_eq_without_eq, clippy::needless_borrow)] + +use crate::influxdata::iox::ingester::v1 as proto; use base64::{prelude::BASE64_STANDARD, Engine}; use data_types::{NamespaceId, TableId, TimestampRange}; use datafusion::{common::DataFusionError, prelude::Expr}; @@ -7,6 +13,67 @@ use predicate::{Predicate, ValueExpr}; use prost::Message; use snafu::{ResultExt, Snafu}; +/// This module imports the generated protobuf code into a Rust module +/// hierarchy that matches the namespace hierarchy of the protobuf +/// definitions +pub mod influxdata { + pub mod iox { + pub mod ingester { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/influxdata.iox.ingester.v1.rs")); + include!(concat!( + env!("OUT_DIR"), + "/influxdata.iox.ingester.v1.serde.rs" + )); + } + } + } +} + +/// Error returned if a request field has an invalid value. Includes +/// machinery to add parent field names for context -- thus it will +/// report `rules.write_timeout` than simply `write_timeout`. +#[derive(Debug, Default, Clone, PartialEq)] +pub struct FieldViolation { + pub field: String, + pub description: String, +} + +impl FieldViolation { + pub fn required(field: impl Into) -> Self { + Self { + field: field.into(), + description: "Field is required".to_string(), + } + } + + /// Re-scopes this error as the child of another field + pub fn scope(self, field: impl Into) -> Self { + let field = if self.field.is_empty() { + field.into() + } else { + [field.into(), self.field].join(".") + }; + + Self { + field, + description: self.description, + } + } +} + +impl std::error::Error for FieldViolation {} + +impl std::fmt::Display for FieldViolation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Violation for field \"{}\": {}", + self.field, self.description + ) + } +} + fn expr_to_bytes_violation(field: impl Into, e: DataFusionError) -> FieldViolation { FieldViolation { field: field.into(), diff --git a/ingester_test_ctx/Cargo.toml b/ingester_test_ctx/Cargo.toml index c85ea97082..57ce7dd861 100644 --- a/ingester_test_ctx/Cargo.toml +++ b/ingester_test_ctx/Cargo.toml @@ -16,6 +16,7 @@ generated_types = { version = "0.1.0", path = "../generated_types" } hashbrown.workspace = true influxdb_iox_client = { path = "../influxdb_iox_client" } ingester = { path = "../ingester" } +ingester_query_grpc = { path = "../ingester_query_grpc" } iox_catalog = { version = "0.1.0", path = "../iox_catalog" } iox_query = { version = "0.1.0", path = "../iox_query" } iox_time = { path = "../iox_time" } diff --git a/ingester_test_ctx/src/lib.rs b/ingester_test_ctx/src/lib.rs index 83a7ab5b7e..86865f2265 100644 --- a/ingester_test_ctx/src/lib.rs +++ b/ingester_test_ctx/src/lib.rs @@ -24,8 +24,8 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt}; use generated_types::influxdata::iox::ingester::v1::{ write_service_server::WriteService, WriteRequest, }; -use influxdb_iox_client::flight; use ingester::{IngesterGuard, IngesterRpcInterface}; +use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest; use iox_catalog::{ interface::{Catalog, SoftDeletedRows}, validate_or_insert_schema, @@ -341,7 +341,7 @@ where /// Submit a query to the ingester's public query interface. pub async fn query( &self, - request: flight::generated_types::IngesterQueryRequest, + request: IngesterQueryRequest, ) -> Result, influxdb_iox_client::flight::Error> { let mut bytes = bytes::BytesMut::new(); prost::Message::encode(&request, &mut bytes)?; diff --git a/querier/Cargo.toml b/querier/Cargo.toml index 331abf7a8f..1bcaec9cab 100644 --- a/querier/Cargo.toml +++ b/querier/Cargo.toml @@ -17,11 +17,11 @@ data_types = { path = "../data_types" } datafusion = { workspace = true } datafusion_util = { path = "../datafusion_util" } futures = "0.3" -generated_types = { path = "../generated_types" } influxdb_iox_client = { path = "../influxdb_iox_client" } iox_catalog = { path = "../iox_catalog" } iox_query = { path = "../iox_query" } iox_time = { path = "../iox_time" } +ingester_query_grpc = { path = "../ingester_query_grpc" } metric = { path = "../metric" } object_store = "0.5.6" observability_deps = { path = "../observability_deps" } diff --git a/querier/src/ingester/circuit_breaker.rs b/querier/src/ingester/circuit_breaker.rs index c7a7310bf5..dce04f050d 100644 --- a/querier/src/ingester/circuit_breaker.rs +++ b/querier/src/ingester/circuit_breaker.rs @@ -12,7 +12,7 @@ use std::{ use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; -use generated_types::ingester::IngesterQueryRequest; +use ingester_query_grpc::IngesterQueryRequest; use iox_time::{Time, TimeProvider}; use metric::{Metric, Registry, U64Gauge}; use observability_deps::tracing::{info, warn}; @@ -505,8 +505,9 @@ mod tests { use arrow_flight::decode::DecodedPayload; use assert_matches::assert_matches; use data_types::{NamespaceId, TableId}; - use generated_types::google::FieldViolation; - use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata; + use ingester_query_grpc::{ + influxdata::iox::ingester::v1::IngesterQueryResponseMetadata, FieldViolation, + }; use iox_time::MockProvider; use metric::Attributes; use test_helpers::maybe_start_logging; diff --git a/querier/src/ingester/flight_client.rs b/querier/src/ingester/flight_client.rs index 20fab425a8..fea503b39d 100644 --- a/querier/src/ingester/flight_client.rs +++ b/querier/src/ingester/flight_client.rs @@ -5,8 +5,7 @@ use arrow_flight::{ use async_trait::async_trait; use client_util::connection::{self, Connection}; use futures::StreamExt; -use generated_types::ingester::IngesterQueryRequest; -use influxdb_iox_client::flight::generated_types as proto; +use ingester_query_grpc::{influxdata::iox::ingester::v1 as proto, IngesterQueryRequest}; use observability_deps::tracing::{debug, warn}; use prost::Message; use snafu::{ResultExt, Snafu}; @@ -33,7 +32,7 @@ pub enum Error { #[snafu(display("Internal error creating flight request : {}", source))] CreatingRequest { - source: influxdb_iox_client::google::FieldViolation, + source: ingester_query_grpc::FieldViolation, }, #[snafu(display("Failed to perform flight request: {}", source))] diff --git a/querier/src/ingester/invalidate_on_error.rs b/querier/src/ingester/invalidate_on_error.rs index a5f54f8b55..f862b5bfa9 100644 --- a/querier/src/ingester/invalidate_on_error.rs +++ b/querier/src/ingester/invalidate_on_error.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; -use generated_types::ingester::IngesterQueryRequest; +use ingester_query_grpc::IngesterQueryRequest; use trace::{ctx::SpanContext, span::SpanRecorder}; use crate::ingester::flight_client::{Error as FlightClientError, IngesterFlightClient, QueryData}; diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 30cccd2157..cc3e550504 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -17,8 +17,10 @@ use data_types::{ }; use datafusion::error::DataFusionError; use futures::{stream::FuturesUnordered, TryStreamExt}; -use generated_types::ingester::{encode_proto_predicate_as_base64, IngesterQueryRequest}; -use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata; +use ingester_query_grpc::{ + encode_proto_predicate_as_base64, influxdata::iox::ingester::v1::IngesterQueryResponseMetadata, + IngesterQueryRequest, +}; use iox_query::{ exec::{stringset::StringSet, IOxSessionContext}, util::{compute_timenanosecond_min_max, create_basic_summary}, @@ -623,7 +625,7 @@ impl IngesterStreamDecoder { } fn encode_predicate_as_base64(predicate: &Predicate) -> String { - use generated_types::influxdata::iox::ingester::v1::Predicate as ProtoPredicate; + use ingester_query_grpc::influxdata::iox::ingester::v1::Predicate as ProtoPredicate; let predicate = match ProtoPredicate::try_from(predicate.clone()) { Ok(predicate) => predicate, @@ -1053,7 +1055,7 @@ mod tests { }; use assert_matches::assert_matches; use data_types::TableId; - use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata; + use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryResponseMetadata; use iox_tests::TestCatalog; use metric::Attributes; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; diff --git a/test_helpers_end_to_end/Cargo.toml b/test_helpers_end_to_end/Cargo.toml index ef31b0a402..17e2518f6c 100644 --- a/test_helpers_end_to_end/Cargo.toml +++ b/test_helpers_end_to_end/Cargo.toml @@ -18,6 +18,7 @@ generated_types = { path = "../generated_types" } http = "0.2.9" hyper = "0.14" influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] } +ingester_query_grpc = { path = "../ingester_query_grpc" } iox_catalog = { path = "../iox_catalog" } mutable_batch_lp = { path = "../mutable_batch_lp" } mutable_batch_pb = { path = "../mutable_batch_pb" } diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index 9b7fe9494c..d670b26663 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -15,9 +15,11 @@ use http::Response; use hyper::Body; use influxdb_iox_client::{ connection::{Connection, GrpcConnection}, - flight::generated_types::{IngesterQueryRequest, IngesterQueryResponseMetadata}, schema::generated_types::{schema_service_client::SchemaServiceClient, GetSchemaRequest}, }; +use ingester_query_grpc::influxdata::iox::ingester::v1::{ + IngesterQueryRequest, IngesterQueryResponseMetadata, +}; use observability_deps::tracing::{debug, info}; use once_cell::sync::Lazy; use prost::Message; diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 0d07e07fff..c4d6e7809a 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -80,6 +80,7 @@ serde_json = { version = "1", features = ["raw_value"] } sha2 = { version = "0.10" } similar = { version = "2", features = ["inline"] } smallvec = { version = "1", default-features = false, features = ["union"] } +sqlparser = { version = "0.33", features = ["visitor"] } sqlx = { version = "0.6", features = ["json", "postgres", "runtime-tokio-rustls", "sqlite", "tls", "uuid"] } sqlx-core = { version = "0.6", default-features = false, features = ["any", "migrate", "postgres", "runtime-tokio-rustls", "sqlite", "uuid"] } thrift = { version = "0.17" } From 14007808bdab41990f3a1f9047ba52b0a7b1391d Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 12 May 2023 13:25:49 -0400 Subject: [PATCH 3/3] fix: Move remaining conversions between data types and proto into data_types And have data_types depend on generated_types rather than vice versa. --- Cargo.lock | 3 +- data_types/Cargo.toml | 1 + data_types/src/columns.rs | 54 ++++++++++++++++++++++++++++ data_types/src/lib.rs | 27 ++++++++++++++ generated_types/Cargo.toml | 8 ----- generated_types/src/compactor.rs | 28 --------------- generated_types/src/lib.rs | 58 ------------------------------ influxdb_iox_client/Cargo.toml | 2 +- influxdb_storage_client/Cargo.toml | 2 +- workspace-hack/Cargo.toml | 4 --- 10 files changed, 85 insertions(+), 102 deletions(-) delete mode 100644 generated_types/src/compactor.rs diff --git a/Cargo.lock b/Cargo.lock index e6a944b68f..7cc75a95e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1414,6 +1414,7 @@ name = "data_types" version = "0.1.0" dependencies = [ "croaring", + "generated_types", "influxdb-line-protocol", "iox_time", "observability_deps", @@ -2036,7 +2037,6 @@ version = "0.1.0" dependencies = [ "base64 0.21.0", "bytes", - "data_types", "observability_deps", "pbjson", "pbjson-build", @@ -6735,7 +6735,6 @@ dependencies = [ "hashbrown 0.12.3", "hashbrown 0.13.2", "heck", - "hyper", "indexmap", "io-lifetimes", "itertools", diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml index 7de4c08e1d..f3c3d4db1a 100644 --- a/data_types/Cargo.toml +++ b/data_types/Cargo.toml @@ -10,6 +10,7 @@ license.workspace = true croaring = "0.8.1" influxdb-line-protocol = { path = "../influxdb_line_protocol" } iox_time = { path = "../iox_time" } +generated_types = { path = "../generated_types" } observability_deps = { path = "../observability_deps" } once_cell = "1" ordered-float = "3" diff --git a/data_types/src/columns.rs b/data_types/src/columns.rs index a4d6bd8fd9..58c2db0913 100644 --- a/data_types/src/columns.rs +++ b/data_types/src/columns.rs @@ -1,6 +1,7 @@ //! Types having to do with columns. use super::TableId; +use generated_types::influxdata::iox::schema::v1 as proto; use influxdb_line_protocol::FieldValue; use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType, Schema}; use sqlx::postgres::PgHasArrayType; @@ -305,6 +306,25 @@ pub fn column_type_from_field(field_value: &FieldValue) -> ColumnType { } } +impl TryFrom for ColumnType { + type Error = Box; + + fn try_from(value: proto::column_schema::ColumnType) -> Result { + Ok(match value { + proto::column_schema::ColumnType::I64 => ColumnType::I64, + proto::column_schema::ColumnType::U64 => ColumnType::U64, + proto::column_schema::ColumnType::F64 => ColumnType::F64, + proto::column_schema::ColumnType::Bool => ColumnType::Bool, + proto::column_schema::ColumnType::String => ColumnType::String, + proto::column_schema::ColumnType::Time => ColumnType::Time, + proto::column_schema::ColumnType::Tag => ColumnType::Tag, + proto::column_schema::ColumnType::Unspecified => { + return Err("unknown column type".into()) + } + }) + } +} + /// Set of columns. #[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)] #[sqlx(transparent)] @@ -363,4 +383,38 @@ mod tests { fn test_column_set_duplicates() { ColumnSet::new([ColumnId::new(1), ColumnId::new(2), ColumnId::new(1)]); } + + #[test] + fn test_column_schema() { + assert_eq!( + ColumnType::try_from(proto::column_schema::ColumnType::I64).unwrap(), + ColumnType::I64, + ); + assert_eq!( + ColumnType::try_from(proto::column_schema::ColumnType::U64).unwrap(), + ColumnType::U64, + ); + assert_eq!( + ColumnType::try_from(proto::column_schema::ColumnType::F64).unwrap(), + ColumnType::F64, + ); + assert_eq!( + ColumnType::try_from(proto::column_schema::ColumnType::Bool).unwrap(), + ColumnType::Bool, + ); + assert_eq!( + ColumnType::try_from(proto::column_schema::ColumnType::String).unwrap(), + ColumnType::String, + ); + assert_eq!( + ColumnType::try_from(proto::column_schema::ColumnType::Time).unwrap(), + ColumnType::Time, + ); + assert_eq!( + ColumnType::try_from(proto::column_schema::ColumnType::Tag).unwrap(), + ColumnType::Tag, + ); + + assert!(ColumnType::try_from(proto::column_schema::ColumnType::Unspecified).is_err()); + } } diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 84a415da8d..73e7bd8847 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -623,6 +623,33 @@ pub struct SkippedCompaction { pub limit_num_files_first_in_partition: i64, } +use generated_types::influxdata::iox::compactor::v1 as compactor_proto; +impl From for compactor_proto::SkippedCompaction { + fn from(skipped_compaction: SkippedCompaction) -> Self { + let SkippedCompaction { + partition_id, + reason, + skipped_at, + estimated_bytes, + limit_bytes, + num_files, + limit_num_files, + limit_num_files_first_in_partition, + } = skipped_compaction; + + Self { + partition_id: partition_id.get(), + reason, + skipped_at: skipped_at.get(), + estimated_bytes, + limit_bytes, + num_files, + limit_num_files, + limit_num_files_first_in_partition: Some(limit_num_files_first_in_partition), + } + } +} + /// Data for a parquet file reference that has been inserted in the catalog. #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] pub struct ParquetFile { diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index 21d67c1924..26c8729261 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -8,7 +8,6 @@ license.workspace = true [dependencies] # In alphabetical order base64 = "0.21" bytes = "1.4" -data_types = { path = "../data_types", optional = true } observability_deps = { path = "../observability_deps" } pbjson = "0.5" pbjson-types = "0.5" @@ -22,10 +21,3 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" } tonic-build = { workspace = true } prost-build = "0.11" pbjson-build = "0.5" - -[dev-dependencies] -data_types = { path = "../data_types" } - -[features] -default = ["data_types_conversions"] -data_types_conversions = ["data_types"] diff --git a/generated_types/src/compactor.rs b/generated_types/src/compactor.rs deleted file mode 100644 index 364647fd43..0000000000 --- a/generated_types/src/compactor.rs +++ /dev/null @@ -1,28 +0,0 @@ -use crate::influxdata::iox::compactor::v1 as proto; -use data_types::SkippedCompaction; - -impl From for proto::SkippedCompaction { - fn from(skipped_compaction: SkippedCompaction) -> Self { - let SkippedCompaction { - partition_id, - reason, - skipped_at, - estimated_bytes, - limit_bytes, - num_files, - limit_num_files, - limit_num_files_first_in_partition, - } = skipped_compaction; - - Self { - partition_id: partition_id.get(), - reason, - skipped_at: skipped_at.get(), - estimated_bytes, - limit_bytes, - num_files, - limit_num_files, - limit_num_files_first_in_partition: Some(limit_num_files_first_in_partition), - } - } -} diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs index e981c40590..db013bad74 100644 --- a/generated_types/src/lib.rs +++ b/generated_types/src/lib.rs @@ -145,25 +145,6 @@ pub mod influxdata { env!("OUT_DIR"), "/influxdata.iox.schema.v1.serde.rs" )); - - impl TryFrom for data_types::ColumnType { - type Error = Box; - - fn try_from(value: column_schema::ColumnType) -> Result { - Ok(match value { - column_schema::ColumnType::I64 => data_types::ColumnType::I64, - column_schema::ColumnType::U64 => data_types::ColumnType::U64, - column_schema::ColumnType::F64 => data_types::ColumnType::F64, - column_schema::ColumnType::Bool => data_types::ColumnType::Bool, - column_schema::ColumnType::String => data_types::ColumnType::String, - column_schema::ColumnType::Time => data_types::ColumnType::Time, - column_schema::ColumnType::Tag => data_types::ColumnType::Tag, - column_schema::ColumnType::Unspecified => { - return Err("unknown column type".into()) - } - }) - } - } } } @@ -239,9 +220,6 @@ pub use influxdata::platform::storage::*; pub mod google; -#[cfg(any(feature = "data_types_conversions", test))] -pub mod compactor; - pub use prost::{DecodeError, EncodeError}; #[cfg(test)] @@ -263,40 +241,4 @@ mod tests { // The URL must start with the type.googleapis.com prefix assert!(!protobuf_type_url_eq(STORAGE_SERVICE, STORAGE_SERVICE,)); } - - #[test] - fn test_column_schema() { - use influxdata::iox::schema::v1::*; - - assert_eq!( - data_types::ColumnType::try_from(column_schema::ColumnType::I64).unwrap(), - data_types::ColumnType::I64, - ); - assert_eq!( - data_types::ColumnType::try_from(column_schema::ColumnType::U64).unwrap(), - data_types::ColumnType::U64, - ); - assert_eq!( - data_types::ColumnType::try_from(column_schema::ColumnType::F64).unwrap(), - data_types::ColumnType::F64, - ); - assert_eq!( - data_types::ColumnType::try_from(column_schema::ColumnType::Bool).unwrap(), - data_types::ColumnType::Bool, - ); - assert_eq!( - data_types::ColumnType::try_from(column_schema::ColumnType::String).unwrap(), - data_types::ColumnType::String, - ); - assert_eq!( - data_types::ColumnType::try_from(column_schema::ColumnType::Time).unwrap(), - data_types::ColumnType::Time, - ); - assert_eq!( - data_types::ColumnType::try_from(column_schema::ColumnType::Tag).unwrap(), - data_types::ColumnType::Tag, - ); - - assert!(data_types::ColumnType::try_from(column_schema::ColumnType::Unspecified).is_err()); - } } diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index 85425cbceb..63e8cc00e8 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -19,7 +19,7 @@ client_util = { path = "../client_util" } comfy-table = { version = "6.1", default-features = false} futures-util = { version = "0.3" } influxdb-line-protocol = { path = "../influxdb_line_protocol"} -generated_types = { path = "../generated_types", default-features = false, features = ["data_types_conversions"] } +generated_types = { path = "../generated_types" } prost = "0.11" rand = "0.8.3" reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] } diff --git a/influxdb_storage_client/Cargo.toml b/influxdb_storage_client/Cargo.toml index 28a6c958b2..fdd3a8c9e7 100644 --- a/influxdb_storage_client/Cargo.toml +++ b/influxdb_storage_client/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true [dependencies] client_util = { path = "../client_util" } -generated_types = { path = "../generated_types", default-features=false, features=["data_types"] } +generated_types = { path = "../generated_types" } prost = "0.11" tonic = { workspace = true } futures-util = { version = "0.3" } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index c4d6e7809a..6d1980e007 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -155,7 +155,6 @@ url = { version = "2" } uuid = { version = "1", features = ["v4"] } [target.x86_64-unknown-linux-gnu.dependencies] -hyper = { version = "0.14", features = ["full"] } io-lifetimes = { version = "1" } nix = { version = "0.26" } once_cell = { version = "1", default-features = false, features = ["unstable"] } @@ -167,7 +166,6 @@ once_cell = { version = "1", default-features = false, features = ["unstable"] } rustix = { version = "0.37", features = ["fs", "termios"] } [target.x86_64-apple-darwin.dependencies] -hyper = { version = "0.14", features = ["full"] } io-lifetimes = { version = "1" } nix = { version = "0.26" } once_cell = { version = "1", default-features = false, features = ["unstable"] } @@ -179,7 +177,6 @@ once_cell = { version = "1", default-features = false, features = ["unstable"] } rustix = { version = "0.37", features = ["fs", "termios"] } [target.aarch64-apple-darwin.dependencies] -hyper = { version = "0.14", features = ["full"] } io-lifetimes = { version = "1" } nix = { version = "0.26" } once_cell = { version = "1", default-features = false, features = ["unstable"] } @@ -191,7 +188,6 @@ once_cell = { version = "1", default-features = false, features = ["unstable"] } rustix = { version = "0.37", features = ["fs", "termios"] } [target.x86_64-pc-windows-msvc.dependencies] -hyper = { version = "0.14", features = ["full"] } once_cell = { version = "1", default-features = false, features = ["unstable"] } scopeguard = { version = "1" } winapi = { version = "0.3", default-features = false, features = ["basetsd", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "ntsecapi", "ntstatus", "objbase", "processenv", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }