Merge pull request #7789 from influxdata/cn/move-peas

fix: Have data_types depend on generated_types rather than the other way around
pull/24376/head
kodiakhq[bot] 2023-05-15 14:42:10 +00:00 committed by GitHub
commit 41ec673f6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 297 additions and 301 deletions

35
Cargo.lock generated
View File

@ -1414,6 +1414,7 @@ name = "data_types"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"croaring", "croaring",
"generated_types",
"influxdb-line-protocol", "influxdb-line-protocol",
"iox_time", "iox_time",
"observability_deps", "observability_deps",
@ -2036,17 +2037,12 @@ version = "0.1.0"
dependencies = [ dependencies = [
"base64 0.21.0", "base64 0.21.0",
"bytes", "bytes",
"data_types",
"datafusion",
"datafusion-proto",
"observability_deps", "observability_deps",
"pbjson", "pbjson",
"pbjson-build", "pbjson-build",
"pbjson-types", "pbjson-types",
"predicate",
"prost", "prost",
"prost-build", "prost-build",
"query_functions",
"serde", "serde",
"snafu", "snafu",
"tonic", "tonic",
@ -2565,6 +2561,7 @@ dependencies = [
"influxdb_iox_client", "influxdb_iox_client",
"influxdb_storage_client", "influxdb_storage_client",
"influxrpc_parser", "influxrpc_parser",
"ingester_query_grpc",
"insta", "insta",
"iox_catalog", "iox_catalog",
"iox_query", "iox_query",
@ -2698,6 +2695,7 @@ dependencies = [
"generated_types", "generated_types",
"hashbrown 0.13.2", "hashbrown 0.13.2",
"influxdb_iox_client", "influxdb_iox_client",
"ingester_query_grpc",
"ingester_test_ctx", "ingester_test_ctx",
"iox_catalog", "iox_catalog",
"iox_query", "iox_query",
@ -2732,6 +2730,27 @@ dependencies = [
"workspace-hack", "workspace-hack",
] ]
[[package]]
name = "ingester_query_grpc"
version = "0.1.0"
dependencies = [
"base64 0.21.0",
"data_types",
"datafusion",
"datafusion-proto",
"pbjson",
"pbjson-build",
"pbjson-types",
"predicate",
"prost",
"prost-build",
"query_functions",
"serde",
"snafu",
"tonic-build",
"workspace-hack",
]
[[package]] [[package]]
name = "ingester_test_ctx" name = "ingester_test_ctx"
version = "0.1.0" version = "0.1.0"
@ -2746,6 +2765,7 @@ dependencies = [
"hashbrown 0.13.2", "hashbrown 0.13.2",
"influxdb_iox_client", "influxdb_iox_client",
"ingester", "ingester",
"ingester_query_grpc",
"iox_catalog", "iox_catalog",
"iox_query", "iox_query",
"iox_time", "iox_time",
@ -4426,8 +4446,8 @@ dependencies = [
"datafusion", "datafusion",
"datafusion_util", "datafusion_util",
"futures", "futures",
"generated_types",
"influxdb_iox_client", "influxdb_iox_client",
"ingester_query_grpc",
"insta", "insta",
"iox_catalog", "iox_catalog",
"iox_query", "iox_query",
@ -5612,6 +5632,7 @@ dependencies = [
"http", "http",
"hyper", "hyper",
"influxdb_iox_client", "influxdb_iox_client",
"ingester_query_grpc",
"iox_catalog", "iox_catalog",
"mutable_batch_lp", "mutable_batch_lp",
"mutable_batch_pb", "mutable_batch_pb",
@ -6714,7 +6735,6 @@ dependencies = [
"hashbrown 0.12.3", "hashbrown 0.12.3",
"hashbrown 0.13.2", "hashbrown 0.13.2",
"heck", "heck",
"hyper",
"indexmap", "indexmap",
"io-lifetimes", "io-lifetimes",
"itertools", "itertools",
@ -6751,6 +6771,7 @@ dependencies = [
"sha2", "sha2",
"similar", "similar",
"smallvec", "smallvec",
"sqlparser",
"sqlx", "sqlx",
"sqlx-core", "sqlx-core",
"sqlx-macros", "sqlx-macros",

View File

@ -30,6 +30,7 @@ members = [
"influxrpc_parser", "influxrpc_parser",
"ingester_test_ctx", "ingester_test_ctx",
"ingester", "ingester",
"ingester_query_grpc",
"iox_catalog", "iox_catalog",
"iox_data_generator", "iox_data_generator",
"iox_query_influxql", "iox_query_influxql",

View File

@ -10,6 +10,7 @@ license.workspace = true
croaring = "0.8.1" croaring = "0.8.1"
influxdb-line-protocol = { path = "../influxdb_line_protocol" } influxdb-line-protocol = { path = "../influxdb_line_protocol" }
iox_time = { path = "../iox_time" } iox_time = { path = "../iox_time" }
generated_types = { path = "../generated_types" }
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
once_cell = "1" once_cell = "1"
ordered-float = "3" ordered-float = "3"

View File

@ -1,6 +1,7 @@
//! Types having to do with columns. //! Types having to do with columns.
use super::TableId; use super::TableId;
use generated_types::influxdata::iox::schema::v1 as proto;
use influxdb_line_protocol::FieldValue; use influxdb_line_protocol::FieldValue;
use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType, Schema}; use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType, Schema};
use sqlx::postgres::PgHasArrayType; use sqlx::postgres::PgHasArrayType;
@ -306,6 +307,25 @@ pub fn column_type_from_field(field_value: &FieldValue) -> ColumnType {
} }
} }
impl TryFrom<proto::column_schema::ColumnType> for ColumnType {
type Error = Box<dyn std::error::Error>;
fn try_from(value: proto::column_schema::ColumnType) -> Result<Self, Self::Error> {
Ok(match value {
proto::column_schema::ColumnType::I64 => ColumnType::I64,
proto::column_schema::ColumnType::U64 => ColumnType::U64,
proto::column_schema::ColumnType::F64 => ColumnType::F64,
proto::column_schema::ColumnType::Bool => ColumnType::Bool,
proto::column_schema::ColumnType::String => ColumnType::String,
proto::column_schema::ColumnType::Time => ColumnType::Time,
proto::column_schema::ColumnType::Tag => ColumnType::Tag,
proto::column_schema::ColumnType::Unspecified => {
return Err("unknown column type".into())
}
})
}
}
/// Set of columns. /// Set of columns.
#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)] #[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)]
#[sqlx(transparent)] #[sqlx(transparent)]
@ -364,4 +384,38 @@ mod tests {
fn test_column_set_duplicates() { fn test_column_set_duplicates() {
ColumnSet::new([ColumnId::new(1), ColumnId::new(2), ColumnId::new(1)]); ColumnSet::new([ColumnId::new(1), ColumnId::new(2), ColumnId::new(1)]);
} }
#[test]
fn test_column_schema() {
assert_eq!(
ColumnType::try_from(proto::column_schema::ColumnType::I64).unwrap(),
ColumnType::I64,
);
assert_eq!(
ColumnType::try_from(proto::column_schema::ColumnType::U64).unwrap(),
ColumnType::U64,
);
assert_eq!(
ColumnType::try_from(proto::column_schema::ColumnType::F64).unwrap(),
ColumnType::F64,
);
assert_eq!(
ColumnType::try_from(proto::column_schema::ColumnType::Bool).unwrap(),
ColumnType::Bool,
);
assert_eq!(
ColumnType::try_from(proto::column_schema::ColumnType::String).unwrap(),
ColumnType::String,
);
assert_eq!(
ColumnType::try_from(proto::column_schema::ColumnType::Time).unwrap(),
ColumnType::Time,
);
assert_eq!(
ColumnType::try_from(proto::column_schema::ColumnType::Tag).unwrap(),
ColumnType::Tag,
);
assert!(ColumnType::try_from(proto::column_schema::ColumnType::Unspecified).is_err());
}
} }

View File

@ -623,6 +623,33 @@ pub struct SkippedCompaction {
pub limit_num_files_first_in_partition: i64, pub limit_num_files_first_in_partition: i64,
} }
use generated_types::influxdata::iox::compactor::v1 as compactor_proto;
impl From<SkippedCompaction> for compactor_proto::SkippedCompaction {
fn from(skipped_compaction: SkippedCompaction) -> Self {
let SkippedCompaction {
partition_id,
reason,
skipped_at,
estimated_bytes,
limit_bytes,
num_files,
limit_num_files,
limit_num_files_first_in_partition,
} = skipped_compaction;
Self {
partition_id: partition_id.get(),
reason,
skipped_at: skipped_at.get(),
estimated_bytes,
limit_bytes,
num_files,
limit_num_files,
limit_num_files_first_in_partition: Some(limit_num_files_first_in_partition),
}
}
}
/// Data for a parquet file reference that has been inserted in the catalog. /// Data for a parquet file reference that has been inserted in the catalog.
#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
pub struct ParquetFile { pub struct ParquetFile {

View File

@ -8,15 +8,10 @@ license.workspace = true
[dependencies] # In alphabetical order [dependencies] # In alphabetical order
base64 = "0.21" base64 = "0.21"
bytes = "1.4" bytes = "1.4"
data_types = { path = "../data_types", optional = true }
datafusion = { workspace = true, optional = true }
datafusion-proto = { workspace = true, optional = true }
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
pbjson = "0.5" pbjson = "0.5"
pbjson-types = "0.5" pbjson-types = "0.5"
predicate = { path = "../predicate", optional = true }
prost = "0.11" prost = "0.11"
query_functions = { path = "../query_functions" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
snafu = "0.7" snafu = "0.7"
tonic = { workspace = true } tonic = { workspace = true }
@ -26,12 +21,3 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
tonic-build = { workspace = true } tonic-build = { workspace = true }
prost-build = "0.11" prost-build = "0.11"
pbjson-build = "0.5" pbjson-build = "0.5"
[dev-dependencies]
data_types = { path = "../data_types" }
datafusion = { workspace = true }
predicate = { path = "../predicate" }
[features]
default = ["data_types_conversions"]
data_types_conversions = ["data_types", "datafusion", "datafusion-proto", "predicate"]

View File

@ -53,7 +53,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
compactor_path.join("service.proto"), compactor_path.join("service.proto"),
delete_path.join("service.proto"), delete_path.join("service.proto"),
ingester_path.join("parquet_metadata.proto"), ingester_path.join("parquet_metadata.proto"),
ingester_path.join("query.proto"),
ingester_path.join("write.proto"), ingester_path.join("write.proto"),
ingester_path.join("replication.proto"), ingester_path.join("replication.proto"),
ingester_path.join("persist.proto"), ingester_path.join("persist.proto"),

View File

@ -1,28 +0,0 @@
use crate::influxdata::iox::compactor::v1 as proto;
use data_types::SkippedCompaction;
impl From<SkippedCompaction> for proto::SkippedCompaction {
fn from(skipped_compaction: SkippedCompaction) -> Self {
let SkippedCompaction {
partition_id,
reason,
skipped_at,
estimated_bytes,
limit_bytes,
num_files,
limit_num_files,
limit_num_files_first_in_partition,
} = skipped_compaction;
Self {
partition_id: partition_id.get(),
reason,
skipped_at: skipped_at.get(),
estimated_bytes,
limit_bytes,
num_files,
limit_num_files,
limit_num_files_first_in_partition: Some(limit_num_files_first_in_partition),
}
}
}

View File

@ -1,151 +0,0 @@
//! Code to serialize and deserialize certain expressions.
//!
//! Note that [Ballista] also provides a serialization using [Protocol Buffers 3]. However the
//! protocol is meant as a communication channel between workers and clients of Ballista, not for
//! long term preservation. For IOx we need a more stable solution. Luckily we only need to support
//! a very small subset of expression.
//!
//! [Ballista]: https://github.com/apache/arrow-datafusion/blob/22fcb3d7a68a56afbe12eab9e7d98f7b8de33703/ballista/rust/core/proto/ballista.proto
//! [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3
use crate::google::{FieldViolation, FromOptionalField, FromRepeatedField, OptionalField};
use crate::influxdata::iox::predicate::v1 as proto;
use crate::influxdata::iox::predicate::v1::scalar::Value;
use crate::influxdata::iox::predicate::v1::{Expr, Predicate};
use data_types::{DeleteExpr, DeletePredicate, Op, Scalar, TimestampRange};
impl From<DeletePredicate> for proto::Predicate {
fn from(predicate: DeletePredicate) -> Self {
proto::Predicate {
range: Some(proto::TimestampRange {
start: predicate.range.start(),
end: predicate.range.end(),
}),
exprs: predicate.exprs.into_iter().map(Into::into).collect(),
}
}
}
impl TryFrom<proto::Predicate> for DeletePredicate {
type Error = FieldViolation;
fn try_from(value: Predicate) -> Result<Self, Self::Error> {
let range = value.range.unwrap_field("range")?;
Ok(Self {
range: TimestampRange::new(range.start, range.end),
exprs: value.exprs.repeated("exprs")?,
})
}
}
impl TryFrom<proto::Expr> for DeleteExpr {
type Error = FieldViolation;
fn try_from(value: Expr) -> Result<Self, Self::Error> {
Ok(Self {
column: value.column,
op: proto::Op::from_i32(value.op).required("op")?,
scalar: value.scalar.required("scalar")?,
})
}
}
impl From<DeleteExpr> for proto::Expr {
fn from(expr: DeleteExpr) -> Self {
Self {
column: expr.column,
op: proto::Op::from(expr.op).into(),
scalar: Some(expr.scalar.into()),
}
}
}
impl TryFrom<proto::Scalar> for Scalar {
type Error = FieldViolation;
fn try_from(value: proto::Scalar) -> Result<Self, Self::Error> {
Ok(value.value.unwrap_field("value")?.into())
}
}
impl From<proto::scalar::Value> for Scalar {
fn from(value: Value) -> Self {
match value {
Value::ValueBool(v) => Self::Bool(v),
Value::ValueI64(v) => Self::I64(v),
Value::ValueF64(v) => Self::F64(v.into()),
Value::ValueString(v) => Self::String(v),
}
}
}
impl From<Scalar> for proto::Scalar {
fn from(value: Scalar) -> Self {
let value = match value {
Scalar::Bool(v) => Value::ValueBool(v),
Scalar::I64(v) => Value::ValueI64(v),
Scalar::F64(v) => Value::ValueF64(v.0),
Scalar::String(v) => Value::ValueString(v),
};
Self { value: Some(value) }
}
}
impl TryFrom<proto::Op> for Op {
type Error = FieldViolation;
fn try_from(value: proto::Op) -> Result<Self, Self::Error> {
match value {
proto::Op::Unspecified => Err(FieldViolation::required("")),
proto::Op::Eq => Ok(Self::Eq),
proto::Op::Ne => Ok(Self::Ne),
}
}
}
impl From<Op> for proto::Op {
fn from(value: Op) -> Self {
match value {
Op::Eq => Self::Eq,
Op::Ne => Self::Ne,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_roundtrip() {
let round_trip = |expr: DeleteExpr| {
let serialized: proto::Expr = expr.clone().into();
let deserialized: DeleteExpr = serialized.try_into().unwrap();
assert_eq!(expr, deserialized);
};
round_trip(DeleteExpr {
column: "foo".to_string(),
op: Op::Eq,
scalar: Scalar::Bool(true),
});
round_trip(DeleteExpr {
column: "bar".to_string(),
op: Op::Ne,
scalar: Scalar::I64(-1),
});
round_trip(DeleteExpr {
column: "baz".to_string(),
op: Op::Eq,
scalar: Scalar::F64((-1.1).into()),
});
round_trip(DeleteExpr {
column: "col".to_string(),
op: Op::Eq,
scalar: Scalar::String("foo".to_string()),
});
}
}

View File

@ -145,25 +145,6 @@ pub mod influxdata {
env!("OUT_DIR"), env!("OUT_DIR"),
"/influxdata.iox.schema.v1.serde.rs" "/influxdata.iox.schema.v1.serde.rs"
)); ));
impl TryFrom<column_schema::ColumnType> for data_types::ColumnType {
type Error = Box<dyn std::error::Error>;
fn try_from(value: column_schema::ColumnType) -> Result<Self, Self::Error> {
Ok(match value {
column_schema::ColumnType::I64 => data_types::ColumnType::I64,
column_schema::ColumnType::U64 => data_types::ColumnType::U64,
column_schema::ColumnType::F64 => data_types::ColumnType::F64,
column_schema::ColumnType::Bool => data_types::ColumnType::Bool,
column_schema::ColumnType::String => data_types::ColumnType::String,
column_schema::ColumnType::Time => data_types::ColumnType::Time,
column_schema::ColumnType::Tag => data_types::ColumnType::Tag,
column_schema::ColumnType::Unspecified => {
return Err("unknown column type".into())
}
})
}
}
} }
} }
@ -239,13 +220,6 @@ pub use influxdata::platform::storage::*;
pub mod google; pub mod google;
#[cfg(any(feature = "data_types_conversions", test))]
pub mod compactor;
#[cfg(any(feature = "data_types_conversions", test))]
pub mod delete_predicate;
#[cfg(any(feature = "data_types_conversions", test))]
pub mod ingester;
pub use prost::{DecodeError, EncodeError}; pub use prost::{DecodeError, EncodeError};
#[cfg(test)] #[cfg(test)]
@ -267,40 +241,4 @@ mod tests {
// The URL must start with the type.googleapis.com prefix // The URL must start with the type.googleapis.com prefix
assert!(!protobuf_type_url_eq(STORAGE_SERVICE, STORAGE_SERVICE,)); assert!(!protobuf_type_url_eq(STORAGE_SERVICE, STORAGE_SERVICE,));
} }
#[test]
fn test_column_schema() {
use influxdata::iox::schema::v1::*;
assert_eq!(
data_types::ColumnType::try_from(column_schema::ColumnType::I64).unwrap(),
data_types::ColumnType::I64,
);
assert_eq!(
data_types::ColumnType::try_from(column_schema::ColumnType::U64).unwrap(),
data_types::ColumnType::U64,
);
assert_eq!(
data_types::ColumnType::try_from(column_schema::ColumnType::F64).unwrap(),
data_types::ColumnType::F64,
);
assert_eq!(
data_types::ColumnType::try_from(column_schema::ColumnType::Bool).unwrap(),
data_types::ColumnType::Bool,
);
assert_eq!(
data_types::ColumnType::try_from(column_schema::ColumnType::String).unwrap(),
data_types::ColumnType::String,
);
assert_eq!(
data_types::ColumnType::try_from(column_schema::ColumnType::Time).unwrap(),
data_types::ColumnType::Time,
);
assert_eq!(
data_types::ColumnType::try_from(column_schema::ColumnType::Tag).unwrap(),
data_types::ColumnType::Tag,
);
assert!(data_types::ColumnType::try_from(column_schema::ColumnType::Unspecified).is_err());
}
} }

View File

@ -19,6 +19,7 @@ import = { path = "../import" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] } influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
influxdb_storage_client = { path = "../influxdb_storage_client" } influxdb_storage_client = { path = "../influxdb_storage_client" }
influxrpc_parser = { path = "../influxrpc_parser"} influxrpc_parser = { path = "../influxrpc_parser"}
ingester_query_grpc = { path = "../ingester_query_grpc" }
iox_catalog = { path = "../iox_catalog" } iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common"} ioxd_common = { path = "../ioxd_common"}
ioxd_compactor = { path = "../ioxd_compactor"} ioxd_compactor = { path = "../ioxd_compactor"}

View File

@ -1,12 +1,9 @@
use arrow_flight::Ticket; use arrow_flight::Ticket;
use futures::TryStreamExt; use futures::TryStreamExt;
use generated_types::ingester::{ use influxdb_iox_client::{connection::Connection, format::QueryOutputFormat};
decode_proto_predicate_from_base64, DecodeProtoPredicateFromBase64Error, use ingester_query_grpc::{
}; decode_proto_predicate_from_base64, influxdata::iox::ingester::v1::IngesterQueryRequest,
use influxdb_iox_client::{ DecodeProtoPredicateFromBase64Error,
connection::Connection,
flight::{self},
format::QueryOutputFormat,
}; };
use prost::Message; use prost::Message;
use std::str::FromStr; use std::str::FromStr;
@ -73,7 +70,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
None None
}; };
let request = flight::generated_types::IngesterQueryRequest { let request = IngesterQueryRequest {
table_id, table_id,
columns, columns,
predicate, predicate,

View File

@ -2,8 +2,8 @@ use arrow_flight::{error::FlightError, Ticket};
use arrow_util::assert_batches_sorted_eq; use arrow_util::assert_batches_sorted_eq;
use data_types::{NamespaceId, TableId}; use data_types::{NamespaceId, TableId};
use futures::FutureExt; use futures::FutureExt;
use generated_types::{influxdata::iox::ingester::v1 as proto, ingester::IngesterQueryRequest};
use http::StatusCode; use http::StatusCode;
use ingester_query_grpc::{influxdata::iox::ingester::v1 as proto, IngesterQueryRequest};
use prost::Message; use prost::Message;
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState}; use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState};

View File

@ -1,6 +1,6 @@
use arrow_util::assert_batches_sorted_eq; use arrow_util::assert_batches_sorted_eq;
use futures::FutureExt; use futures::FutureExt;
use generated_types::{influxdata::iox::ingester::v1 as proto, ingester::IngesterQueryRequest}; use ingester_query_grpc::{influxdata::iox::ingester::v1 as proto, IngesterQueryRequest};
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use test_helpers_end_to_end::{ use test_helpers_end_to_end::{
maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState, TestConfig, maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState, TestConfig,

View File

@ -19,7 +19,7 @@ client_util = { path = "../client_util" }
comfy-table = { version = "6.1", default-features = false} comfy-table = { version = "6.1", default-features = false}
futures-util = { version = "0.3" } futures-util = { version = "0.3" }
influxdb-line-protocol = { path = "../influxdb_line_protocol"} influxdb-line-protocol = { path = "../influxdb_line_protocol"}
generated_types = { path = "../generated_types", default-features = false, features = ["data_types_conversions"] } generated_types = { path = "../generated_types" }
prost = "0.11" prost = "0.11"
rand = "0.8.3" rand = "0.8.3"
reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] }

View File

@ -21,10 +21,7 @@ use crate::connection::Connection;
/// Re-export generated_types /// Re-export generated_types
pub mod generated_types { pub mod generated_types {
pub use generated_types::influxdata::iox::{ pub use generated_types::influxdata::iox::querier::v1::*;
ingester::v1::{IngesterQueryRequest, IngesterQueryResponseMetadata, Predicate},
querier::v1::*,
};
} }
/// Error responses when querying an IOx namespace using the IOx Flight API. /// Error responses when querying an IOx namespace using the IOx Flight API.

View File

@ -7,7 +7,7 @@ license.workspace = true
[dependencies] [dependencies]
client_util = { path = "../client_util" } client_util = { path = "../client_util" }
generated_types = { path = "../generated_types", default-features=false, features=["data_types"] } generated_types = { path = "../generated_types" }
prost = "0.11" prost = "0.11"
tonic = { workspace = true } tonic = { workspace = true }
futures-util = { version = "0.3" } futures-util = { version = "0.3" }

View File

@ -22,6 +22,7 @@ flatbuffers = "23.1.21"
futures = "0.3.28" futures = "0.3.28"
generated_types = { version = "0.1.0", path = "../generated_types" } generated_types = { version = "0.1.0", path = "../generated_types" }
hashbrown.workspace = true hashbrown.workspace = true
ingester_query_grpc = { path = "../ingester_query_grpc" }
iox_catalog = { version = "0.1.0", path = "../iox_catalog" } iox_catalog = { version = "0.1.0", path = "../iox_catalog" }
iox_query = { version = "0.1.0", path = "../iox_query" } iox_query = { version = "0.1.0", path = "../iox_query" }
iox_time = { path = "../iox_time" } iox_time = { path = "../iox_time" }

View File

@ -9,7 +9,7 @@ use arrow_flight::{
use data_types::{NamespaceId, PartitionId, TableId}; use data_types::{NamespaceId, PartitionId, TableId};
use flatbuffers::FlatBufferBuilder; use flatbuffers::FlatBufferBuilder;
use futures::{Stream, StreamExt, TryStreamExt}; use futures::{Stream, StreamExt, TryStreamExt};
use generated_types::influxdata::iox::ingester::v1 as proto; use ingester_query_grpc::influxdata::iox::ingester::v1 as proto;
use metric::U64Counter; use metric::U64Counter;
use observability_deps::tracing::*; use observability_deps::tracing::*;
use prost::Message; use prost::Message;

View File

@ -1,7 +1,7 @@
use arrow_util::assert_batches_sorted_eq; use arrow_util::assert_batches_sorted_eq;
use assert_matches::assert_matches; use assert_matches::assert_matches;
use data_types::PartitionKey; use data_types::PartitionKey;
use influxdb_iox_client::flight::generated_types::IngesterQueryRequest; use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest;
use ingester_test_ctx::TestContextBuilder; use ingester_test_ctx::TestContextBuilder;
use iox_catalog::interface::Catalog; use iox_catalog::interface::Catalog;
use metric::{DurationHistogram, U64Histogram}; use metric::{DurationHistogram, U64Histogram};

View File

@ -0,0 +1,26 @@
[package]
name = "ingester_query_grpc"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies] # In alphabetical order
base64 = "0.21"
data_types = { path = "../data_types" }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
pbjson = "0.5"
pbjson-types = "0.5"
predicate = { path = "../predicate" }
prost = "0.11"
query_functions = { path = "../query_functions" }
serde = { version = "1.0", features = ["derive"] }
snafu = "0.7"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[build-dependencies] # In alphabetical order
tonic-build = { workspace = true }
prost-build = "0.11"
pbjson-build = "0.5"

View File

@ -0,0 +1,56 @@
//! Compiles Protocol Buffers into native Rust types.
use std::env;
use std::path::{Path, PathBuf};
type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;
fn main() -> Result<()> {
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("protos");
generate_grpc_types(&root)?;
Ok(())
}
/// Schema used with IOx specific Ingester-Querier gRPC requests
///
/// Creates:
///
/// - `influxdata.iox.ingester.v1.rs`
fn generate_grpc_types(root: &Path) -> Result<()> {
let ingester_path = root.join("influxdata/iox/ingester/v1");
let proto_files = vec![ingester_path.join("query.proto")];
// Tell cargo to recompile if any of these proto files are changed
for proto_file in &proto_files {
println!("cargo:rerun-if-changed={}", proto_file.display());
}
let mut config = prost_build::Config::new();
config
.compile_well_known_types()
.disable_comments([".google"])
.extern_path(".google.protobuf", "::pbjson_types")
.btree_map([
".influxdata.iox.ingester.v1.IngesterQueryResponseMetadata.unpersisted_partitions",
]);
let descriptor_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
tonic_build::configure()
.file_descriptor_set_path(&descriptor_path)
// protoc in ubuntu builder needs this option
.protoc_arg("--experimental_allow_proto3_optional")
.compile_with_config(config, &proto_files, &[root])?;
let descriptor_set = std::fs::read(descriptor_path)?;
pbjson_build::Builder::new()
.register_descriptors(&descriptor_set)?
.build(&[".influxdata.iox"])?;
Ok(())
}

View File

@ -1,4 +1,10 @@
use crate::{google::FieldViolation, influxdata::iox::ingester::v1 as proto}; // This crate deliberately does not use the same linting rules as the other
// crates because of all the generated code it contains that we don't have much
// control over.
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls)]
#![allow(clippy::derive_partial_eq_without_eq, clippy::needless_borrow)]
use crate::influxdata::iox::ingester::v1 as proto;
use base64::{prelude::BASE64_STANDARD, Engine}; use base64::{prelude::BASE64_STANDARD, Engine};
use data_types::{NamespaceId, TableId, TimestampRange}; use data_types::{NamespaceId, TableId, TimestampRange};
use datafusion::{common::DataFusionError, prelude::Expr}; use datafusion::{common::DataFusionError, prelude::Expr};
@ -7,6 +13,67 @@ use predicate::{Predicate, ValueExpr};
use prost::Message; use prost::Message;
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
/// This module imports the generated protobuf code into a Rust module
/// hierarchy that matches the namespace hierarchy of the protobuf
/// definitions
pub mod influxdata {
pub mod iox {
pub mod ingester {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.ingester.v1.rs"));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.ingester.v1.serde.rs"
));
}
}
}
}
/// Error returned if a request field has an invalid value. Includes
/// machinery to add parent field names for context -- thus it will
/// report `rules.write_timeout` than simply `write_timeout`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FieldViolation {
pub field: String,
pub description: String,
}
impl FieldViolation {
pub fn required(field: impl Into<String>) -> Self {
Self {
field: field.into(),
description: "Field is required".to_string(),
}
}
/// Re-scopes this error as the child of another field
pub fn scope(self, field: impl Into<String>) -> Self {
let field = if self.field.is_empty() {
field.into()
} else {
[field.into(), self.field].join(".")
};
Self {
field,
description: self.description,
}
}
}
impl std::error::Error for FieldViolation {}
impl std::fmt::Display for FieldViolation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Violation for field \"{}\": {}",
self.field, self.description
)
}
}
fn expr_to_bytes_violation(field: impl Into<String>, e: DataFusionError) -> FieldViolation { fn expr_to_bytes_violation(field: impl Into<String>, e: DataFusionError) -> FieldViolation {
FieldViolation { FieldViolation {
field: field.into(), field: field.into(),

View File

@ -16,6 +16,7 @@ generated_types = { version = "0.1.0", path = "../generated_types" }
hashbrown.workspace = true hashbrown.workspace = true
influxdb_iox_client = { path = "../influxdb_iox_client" } influxdb_iox_client = { path = "../influxdb_iox_client" }
ingester = { path = "../ingester" } ingester = { path = "../ingester" }
ingester_query_grpc = { path = "../ingester_query_grpc" }
iox_catalog = { version = "0.1.0", path = "../iox_catalog" } iox_catalog = { version = "0.1.0", path = "../iox_catalog" }
iox_query = { version = "0.1.0", path = "../iox_query" } iox_query = { version = "0.1.0", path = "../iox_query" }
iox_time = { path = "../iox_time" } iox_time = { path = "../iox_time" }

View File

@ -25,8 +25,8 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
use generated_types::influxdata::iox::ingester::v1::{ use generated_types::influxdata::iox::ingester::v1::{
write_service_server::WriteService, WriteRequest, write_service_server::WriteService, WriteRequest,
}; };
use influxdb_iox_client::flight;
use ingester::{IngesterGuard, IngesterRpcInterface}; use ingester::{IngesterGuard, IngesterRpcInterface};
use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest;
use iox_catalog::{ use iox_catalog::{
interface::{Catalog, SoftDeletedRows}, interface::{Catalog, SoftDeletedRows},
validate_or_insert_schema, validate_or_insert_schema,
@ -342,7 +342,7 @@ where
/// Submit a query to the ingester's public query interface. /// Submit a query to the ingester's public query interface.
pub async fn query( pub async fn query(
&self, &self,
request: flight::generated_types::IngesterQueryRequest, request: IngesterQueryRequest,
) -> Result<Vec<RecordBatch>, influxdb_iox_client::flight::Error> { ) -> Result<Vec<RecordBatch>, influxdb_iox_client::flight::Error> {
let mut bytes = bytes::BytesMut::new(); let mut bytes = bytes::BytesMut::new();
prost::Message::encode(&request, &mut bytes)?; prost::Message::encode(&request, &mut bytes)?;

View File

@ -17,11 +17,11 @@ data_types = { path = "../data_types" }
datafusion = { workspace = true } datafusion = { workspace = true }
datafusion_util = { path = "../datafusion_util" } datafusion_util = { path = "../datafusion_util" }
futures = "0.3" futures = "0.3"
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client" } influxdb_iox_client = { path = "../influxdb_iox_client" }
iox_catalog = { path = "../iox_catalog" } iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" } iox_query = { path = "../iox_query" }
iox_time = { path = "../iox_time" } iox_time = { path = "../iox_time" }
ingester_query_grpc = { path = "../ingester_query_grpc" }
metric = { path = "../metric" } metric = { path = "../metric" }
object_store = "0.5.6" object_store = "0.5.6"
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }

View File

@ -12,7 +12,7 @@ use std::{
use async_trait::async_trait; use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig}; use backoff::{Backoff, BackoffConfig};
use generated_types::ingester::IngesterQueryRequest; use ingester_query_grpc::IngesterQueryRequest;
use iox_time::{Time, TimeProvider}; use iox_time::{Time, TimeProvider};
use metric::{Metric, Registry, U64Gauge}; use metric::{Metric, Registry, U64Gauge};
use observability_deps::tracing::{info, warn}; use observability_deps::tracing::{info, warn};
@ -505,8 +505,9 @@ mod tests {
use arrow_flight::decode::DecodedPayload; use arrow_flight::decode::DecodedPayload;
use assert_matches::assert_matches; use assert_matches::assert_matches;
use data_types::{NamespaceId, TableId}; use data_types::{NamespaceId, TableId};
use generated_types::google::FieldViolation; use ingester_query_grpc::{
use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata; influxdata::iox::ingester::v1::IngesterQueryResponseMetadata, FieldViolation,
};
use iox_time::MockProvider; use iox_time::MockProvider;
use metric::Attributes; use metric::Attributes;
use test_helpers::maybe_start_logging; use test_helpers::maybe_start_logging;

View File

@ -5,8 +5,7 @@ use arrow_flight::{
use async_trait::async_trait; use async_trait::async_trait;
use client_util::connection::{self, Connection}; use client_util::connection::{self, Connection};
use futures::StreamExt; use futures::StreamExt;
use generated_types::ingester::IngesterQueryRequest; use ingester_query_grpc::{influxdata::iox::ingester::v1 as proto, IngesterQueryRequest};
use influxdb_iox_client::flight::generated_types as proto;
use observability_deps::tracing::{debug, warn}; use observability_deps::tracing::{debug, warn};
use prost::Message; use prost::Message;
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
@ -33,7 +32,7 @@ pub enum Error {
#[snafu(display("Internal error creating flight request : {}", source))] #[snafu(display("Internal error creating flight request : {}", source))]
CreatingRequest { CreatingRequest {
source: influxdb_iox_client::google::FieldViolation, source: ingester_query_grpc::FieldViolation,
}, },
#[snafu(display("Failed to perform flight request: {}", source))] #[snafu(display("Failed to perform flight request: {}", source))]

View File

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use generated_types::ingester::IngesterQueryRequest; use ingester_query_grpc::IngesterQueryRequest;
use trace::{ctx::SpanContext, span::SpanRecorder}; use trace::{ctx::SpanContext, span::SpanRecorder};
use crate::ingester::flight_client::{Error as FlightClientError, IngesterFlightClient, QueryData}; use crate::ingester::flight_client::{Error as FlightClientError, IngesterFlightClient, QueryData};

View File

@ -17,8 +17,10 @@ use data_types::{
}; };
use datafusion::error::DataFusionError; use datafusion::error::DataFusionError;
use futures::{stream::FuturesUnordered, TryStreamExt}; use futures::{stream::FuturesUnordered, TryStreamExt};
use generated_types::ingester::{encode_proto_predicate_as_base64, IngesterQueryRequest}; use ingester_query_grpc::{
use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata; encode_proto_predicate_as_base64, influxdata::iox::ingester::v1::IngesterQueryResponseMetadata,
IngesterQueryRequest,
};
use iox_query::{ use iox_query::{
exec::{stringset::StringSet, IOxSessionContext}, exec::{stringset::StringSet, IOxSessionContext},
util::{compute_timenanosecond_min_max, create_basic_summary}, util::{compute_timenanosecond_min_max, create_basic_summary},
@ -623,7 +625,7 @@ impl IngesterStreamDecoder {
} }
fn encode_predicate_as_base64(predicate: &Predicate) -> String { fn encode_predicate_as_base64(predicate: &Predicate) -> String {
use generated_types::influxdata::iox::ingester::v1::Predicate as ProtoPredicate; use ingester_query_grpc::influxdata::iox::ingester::v1::Predicate as ProtoPredicate;
let predicate = match ProtoPredicate::try_from(predicate.clone()) { let predicate = match ProtoPredicate::try_from(predicate.clone()) {
Ok(predicate) => predicate, Ok(predicate) => predicate,
@ -1053,7 +1055,7 @@ mod tests {
}; };
use assert_matches::assert_matches; use assert_matches::assert_matches;
use data_types::TableId; use data_types::TableId;
use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata; use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryResponseMetadata;
use iox_tests::TestCatalog; use iox_tests::TestCatalog;
use metric::Attributes; use metric::Attributes;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use mutable_batch_lp::test_helpers::lp_to_mutable_batch;

View File

@ -18,6 +18,7 @@ generated_types = { path = "../generated_types" }
http = "0.2.9" http = "0.2.9"
hyper = "0.14" hyper = "0.14"
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] } influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
ingester_query_grpc = { path = "../ingester_query_grpc" }
iox_catalog = { path = "../iox_catalog" } iox_catalog = { path = "../iox_catalog" }
mutable_batch_lp = { path = "../mutable_batch_lp" } mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" } mutable_batch_pb = { path = "../mutable_batch_pb" }

View File

@ -15,9 +15,11 @@ use http::Response;
use hyper::Body; use hyper::Body;
use influxdb_iox_client::{ use influxdb_iox_client::{
connection::{Connection, GrpcConnection}, connection::{Connection, GrpcConnection},
flight::generated_types::{IngesterQueryRequest, IngesterQueryResponseMetadata},
schema::generated_types::{schema_service_client::SchemaServiceClient, GetSchemaRequest}, schema::generated_types::{schema_service_client::SchemaServiceClient, GetSchemaRequest},
}; };
use ingester_query_grpc::influxdata::iox::ingester::v1::{
IngesterQueryRequest, IngesterQueryResponseMetadata,
};
use observability_deps::tracing::{debug, info}; use observability_deps::tracing::{debug, info};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use prost::Message; use prost::Message;

View File

@ -80,6 +80,7 @@ serde_json = { version = "1", features = ["raw_value"] }
sha2 = { version = "0.10" } sha2 = { version = "0.10" }
similar = { version = "2", features = ["inline"] } similar = { version = "2", features = ["inline"] }
smallvec = { version = "1", default-features = false, features = ["union"] } smallvec = { version = "1", default-features = false, features = ["union"] }
sqlparser = { version = "0.33", features = ["visitor"] }
sqlx = { version = "0.6", features = ["json", "postgres", "runtime-tokio-rustls", "sqlite", "tls", "uuid"] } sqlx = { version = "0.6", features = ["json", "postgres", "runtime-tokio-rustls", "sqlite", "tls", "uuid"] }
sqlx-core = { version = "0.6", default-features = false, features = ["any", "migrate", "postgres", "runtime-tokio-rustls", "sqlite", "uuid"] } sqlx-core = { version = "0.6", default-features = false, features = ["any", "migrate", "postgres", "runtime-tokio-rustls", "sqlite", "uuid"] }
thrift = { version = "0.17" } thrift = { version = "0.17" }
@ -154,7 +155,6 @@ url = { version = "2" }
uuid = { version = "1", features = ["v4"] } uuid = { version = "1", features = ["v4"] }
[target.x86_64-unknown-linux-gnu.dependencies] [target.x86_64-unknown-linux-gnu.dependencies]
hyper = { version = "0.14", features = ["full"] }
io-lifetimes = { version = "1" } io-lifetimes = { version = "1" }
nix = { version = "0.26" } nix = { version = "0.26" }
once_cell = { version = "1", default-features = false, features = ["unstable"] } once_cell = { version = "1", default-features = false, features = ["unstable"] }
@ -166,7 +166,6 @@ once_cell = { version = "1", default-features = false, features = ["unstable"] }
rustix = { version = "0.37", features = ["fs", "termios"] } rustix = { version = "0.37", features = ["fs", "termios"] }
[target.x86_64-apple-darwin.dependencies] [target.x86_64-apple-darwin.dependencies]
hyper = { version = "0.14", features = ["full"] }
io-lifetimes = { version = "1" } io-lifetimes = { version = "1" }
nix = { version = "0.26" } nix = { version = "0.26" }
once_cell = { version = "1", default-features = false, features = ["unstable"] } once_cell = { version = "1", default-features = false, features = ["unstable"] }
@ -178,7 +177,6 @@ once_cell = { version = "1", default-features = false, features = ["unstable"] }
rustix = { version = "0.37", features = ["fs", "termios"] } rustix = { version = "0.37", features = ["fs", "termios"] }
[target.aarch64-apple-darwin.dependencies] [target.aarch64-apple-darwin.dependencies]
hyper = { version = "0.14", features = ["full"] }
io-lifetimes = { version = "1" } io-lifetimes = { version = "1" }
nix = { version = "0.26" } nix = { version = "0.26" }
once_cell = { version = "1", default-features = false, features = ["unstable"] } once_cell = { version = "1", default-features = false, features = ["unstable"] }
@ -190,7 +188,6 @@ once_cell = { version = "1", default-features = false, features = ["unstable"] }
rustix = { version = "0.37", features = ["fs", "termios"] } rustix = { version = "0.37", features = ["fs", "termios"] }
[target.x86_64-pc-windows-msvc.dependencies] [target.x86_64-pc-windows-msvc.dependencies]
hyper = { version = "0.14", features = ["full"] }
once_cell = { version = "1", default-features = false, features = ["unstable"] } once_cell = { version = "1", default-features = false, features = ["unstable"] }
scopeguard = { version = "1" } scopeguard = { version = "1" }
winapi = { version = "0.3", default-features = false, features = ["basetsd", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "ntsecapi", "ntstatus", "objbase", "processenv", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] } winapi = { version = "0.3", default-features = false, features = ["basetsd", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "ntsecapi", "ntstatus", "objbase", "processenv", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }