Merge pull request #798 from influxdata/bytes-in-grpc

feat: Use Bytes instead of Vec<u8> in prost generated code
pull/24376/head
kodiakhq[bot] 2021-02-12 17:20:02 +00:00 committed by GitHub
commit da0980edc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 442 additions and 40 deletions

12
Cargo.lock generated
View File

@ -1233,7 +1233,9 @@ version = "0.1.0"
dependencies = [
"flatbuffers 0.6.1",
"futures",
"google_types",
"prost",
"prost-build",
"prost-types",
"tonic",
"tonic-build",
@ -1283,6 +1285,15 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "google_types"
version = "0.1.0"
dependencies = [
"prost",
"prost-build",
"prost-types",
]
[[package]]
name = "h2"
version = "0.3.0"
@ -1533,7 +1544,6 @@ dependencies = [
"panic_logging",
"predicates",
"prost",
"prost-types",
"query",
"rand 0.7.3",
"read_buffer",

View File

@ -10,6 +10,7 @@ members = [
"arrow_deps",
"data_types",
"generated_types",
"google_types",
"influxdb_iox_client",
"influxdb_line_protocol",
"influxdb_tsm",
@ -71,7 +72,6 @@ lazy_static = "1.4.0"
opentelemetry = { version = "0.12", default-features = false, features = ["trace", "tokio-support"] }
opentelemetry-jaeger = { version = "0.11", features = ["tokio"] }
prost = "0.7"
prost-types = "0.7"
# Forked to upgrade hyper and tokio
routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" }
serde = { version = "1.0", features = ["derive"] }

View File

@ -11,5 +11,8 @@ prost = "0.7"
prost-types = "0.7"
tonic = "0.4"
google_types = { path = "../google_types" }
[build-dependencies] # In alphabetical order
tonic-build = "0.4"
prost-build = "0.7"

View File

@ -1,7 +1,5 @@
//! Compiles Protocol Buffers and FlatBuffers schema definitions into
//! native Rust types.
//!
//! Source files are found in
use std::{
path::{Path, PathBuf},
@ -39,7 +37,13 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
println!("cargo:rerun-if-changed={}", proto_file.display());
}
tonic_build::configure().compile(&proto_files, &[root.into()])?;
let mut config = prost_build::Config::new();
config
.compile_well_known_types()
.extern_path(".google", "::google_types");
tonic_build::configure().compile_with_config(config, &proto_files, &[root.into()])?;
Ok(())
}

View File

@ -24,3 +24,5 @@ impl TimestampRange {
}
}
}
pub use google_types as google;

13
google_types/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "google_types"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
description = "Standard Protobuf definitions - extracted into separate crate to workaround https://github.com/hyperium/tonic/issues/521"
edition = "2018"
[dependencies] # In alphabetical order
prost = "0.7"
prost-types = "0.7"
[build-dependencies] # In alphabetical order
prost-build = "0.7"

30
google_types/build.rs Normal file
View File

@ -0,0 +1,30 @@
//! Compiles Protocol Buffers and FlatBuffers schema definitions into
//! native Rust types.
use std::path::PathBuf;
type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;
fn main() -> Result<()> {
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("protos");
let proto_files = vec![
root.join("google/rpc/error_details.proto"),
root.join("google/rpc/status.proto"),
root.join("google/protobuf/types.proto"),
];
// Tell cargo to recompile if any of these proto files are changed
for proto_file in &proto_files {
println!("cargo:rerun-if-changed={}", proto_file.display());
}
prost_build::Config::new()
.compile_well_known_types()
.disable_comments(&["."])
.bytes(&[".google"])
.compile_protos(&proto_files, &[root])?;
Ok(())
}

View File

@ -0,0 +1,15 @@
syntax = "proto3";
package google.protobuf;
import "google/protobuf/any.proto";
import "google/protobuf/api.proto";
import "google/protobuf/descriptor.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/field_mask.proto";
import "google/protobuf/source_context.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/type.proto";
import "google/protobuf/wrappers.proto";

View File

@ -0,0 +1,251 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// From https://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.proto
syntax = "proto3";
package google.rpc;
import "google/protobuf/duration.proto";
option go_package = "google.golang.org/genproto/googleapis/rpc/errdetails;errdetails";
option java_multiple_files = true;
option java_outer_classname = "ErrorDetailsProto";
option java_package = "com.google.rpc";
option objc_class_prefix = "RPC";
// Describes when the clients can retry a failed request. Clients could ignore
// the recommendation here or retry when this information is missing from error
// responses.
//
// It's always recommended that clients should use exponential backoff when
// retrying.
//
// Clients should wait until `retry_delay` amount of time has passed since
// receiving the error response before retrying. If retrying requests also
// fail, clients should use an exponential backoff scheme to gradually increase
// the delay between retries based on `retry_delay`, until either a maximum
// number of retries have been reached or a maximum retry delay cap has been
// reached.
message RetryInfo {
// Clients should wait at least this long between retrying the same request.
google.protobuf.Duration retry_delay = 1;
}
// Describes additional debugging info.
message DebugInfo {
// The stack trace entries indicating where the error occurred.
repeated string stack_entries = 1;
// Additional debugging information provided by the server.
string detail = 2;
}
// Describes how a quota check failed.
//
// For example if a daily limit was exceeded for the calling project,
// a service could respond with a QuotaFailure detail containing the project
// id and the description of the quota limit that was exceeded. If the
// calling project hasn't enabled the service in the developer console, then
// a service could respond with the project id and set `service_disabled`
// to true.
//
// Also see RetryInfo and Help types for other details about handling a
// quota failure.
message QuotaFailure {
// A message type used to describe a single quota violation. For example, a
// daily quota or a custom quota that was exceeded.
message Violation {
// The subject on which the quota check failed.
// For example, "clientip:<ip address of client>" or "project:<Google
// developer project id>".
string subject = 1;
// A description of how the quota check failed. Clients can use this
// description to find more about the quota configuration in the service's
// public documentation, or find the relevant quota limit to adjust through
// developer console.
//
// For example: "Service disabled" or "Daily Limit for read operations
// exceeded".
string description = 2;
}
// Describes all quota violations.
repeated Violation violations = 1;
}
// Describes the cause of the error with structured details.
//
// Example of an error when contacting the "pubsub.googleapis.com" API when it
// is not enabled:
//
// { "reason": "API_DISABLED"
// "domain": "googleapis.com"
// "metadata": {
// "resource": "projects/123",
// "service": "pubsub.googleapis.com"
// }
// }
//
// This response indicates that the pubsub.googleapis.com API is not enabled.
//
// Example of an error that is returned when attempting to create a Spanner
// instance in a region that is out of stock:
//
// { "reason": "STOCKOUT"
// "domain": "spanner.googleapis.com",
// "metadata": {
// "availableRegions": "us-central1,us-east2"
// }
// }
message ErrorInfo {
// The reason of the error. This is a constant value that identifies the
// proximate cause of the error. Error reasons are unique within a particular
// domain of errors. This should be at most 63 characters and match
// /[A-Z0-9_]+/.
string reason = 1;
// The logical grouping to which the "reason" belongs. The error domain
// is typically the registered service name of the tool or product that
// generates the error. Example: "pubsub.googleapis.com". If the error is
// generated by some common infrastructure, the error domain must be a
// globally unique value that identifies the infrastructure. For Google API
// infrastructure, the error domain is "googleapis.com".
string domain = 2;
// Additional structured details about this error.
//
// Keys should match /[a-zA-Z0-9-_]/ and be limited to 64 characters in
// length. When identifying the current value of an exceeded limit, the units
// should be contained in the key, not the value. For example, rather than
// {"instanceLimit": "100/request"}, should be returned as,
// {"instanceLimitPerRequest": "100"}, if the client exceeds the number of
// instances that can be created in a single (batch) request.
map<string, string> metadata = 3;
}
// Describes what preconditions have failed.
//
// For example, if an RPC failed because it required the Terms of Service to be
// acknowledged, it could list the terms of service violation in the
// PreconditionFailure message.
message PreconditionFailure {
// A message type used to describe a single precondition failure.
message Violation {
// The type of PreconditionFailure. We recommend using a service-specific
// enum type to define the supported precondition violation subjects. For
// example, "TOS" for "Terms of Service violation".
string type = 1;
// The subject, relative to the type, that failed.
// For example, "google.com/cloud" relative to the "TOS" type would indicate
// which terms of service is being referenced.
string subject = 2;
// A description of how the precondition failed. Developers can use this
// description to understand how to fix the failure.
//
// For example: "Terms of service not accepted".
string description = 3;
}
// Describes all precondition violations.
repeated Violation violations = 1;
}
// Describes violations in a client request. This error type focuses on the
// syntactic aspects of the request.
message BadRequest {
// A message type used to describe a single bad request field.
message FieldViolation {
// A path leading to a field in the request body. The value will be a
// sequence of dot-separated identifiers that identify a protocol buffer
// field. E.g., "field_violations.field" would identify this field.
string field = 1;
// A description of why the request element is bad.
string description = 2;
}
// Describes all violations in a client request.
repeated FieldViolation field_violations = 1;
}
// Contains metadata about the request that clients can attach when filing a bug
// or providing other forms of feedback.
message RequestInfo {
// An opaque string that should only be interpreted by the service generating
// it. For example, it can be used to identify requests in the service's logs.
string request_id = 1;
// Any data that was used to serve this request. For example, an encrypted
// stack trace that can be sent back to the service provider for debugging.
string serving_data = 2;
}
// Describes the resource that is being accessed.
message ResourceInfo {
// A name for the type of resource being accessed, e.g. "sql table",
// "cloud storage bucket", "file", "Google calendar"; or the type URL
// of the resource: e.g. "type.googleapis.com/google.pubsub.v1.Topic".
string resource_type = 1;
// The name of the resource being accessed. For example, a shared calendar
// name: "example.com_4fghdhgsrgh@group.calendar.google.com", if the current
// error is [google.rpc.Code.PERMISSION_DENIED][google.rpc.Code.PERMISSION_DENIED].
string resource_name = 2;
// The owner of the resource (optional).
// For example, "user:<owner email>" or "project:<Google developer project
// id>".
string owner = 3;
// Describes what error is encountered when accessing this resource.
// For example, updating a cloud project may require the `writer` permission
// on the developer console project.
string description = 4;
}
// Provides links to documentation or for performing an out of band action.
//
// For example, if a quota check failed with an error indicating the calling
// project hasn't enabled the accessed service, this can contain a URL pointing
// directly to the right place in the developer console to flip the bit.
message Help {
// Describes a URL link.
message Link {
// Describes what the link offers.
string description = 1;
// The URL of the link.
string url = 2;
}
// URL(s) pointing to additional information on handling the current error.
repeated Link links = 1;
}
// Provides a localized error message that is safe to return to the user
// which can be attached to an RPC error.
message LocalizedMessage {
// The locale used following the specification defined at
// http://www.rfc-editor.org/rfc/bcp/bcp47.txt.
// Examples are: "en-US", "fr-CH", "es-MX"
string locale = 1;
// The localized error message in the above locale.
string message = 2;
}

View File

@ -0,0 +1,49 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// From https://github.com/googleapis/googleapis/blob/master/google/rpc/status.proto
syntax = "proto3";
package google.rpc;
import "google/protobuf/any.proto";
option cc_enable_arenas = true;
option go_package = "google.golang.org/genproto/googleapis/rpc/status;status";
option java_multiple_files = true;
option java_outer_classname = "StatusProto";
option java_package = "com.google.rpc";
option objc_class_prefix = "RPC";
// The `Status` type defines a logical error model that is suitable for
// different programming environments, including REST APIs and RPC APIs. It is
// used by [gRPC](https://github.com/grpc). Each `Status` message contains
// three pieces of data: error code, error message, and error details.
//
// You can find out more about this error model and how to work with it in the
// [API Design Guide](https://cloud.google.com/apis/design/errors).
message Status {
// The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code].
int32 code = 1;
// A developer-facing error message, which should be in English. Any
// user-facing error message should be localized and sent in the
// [google.rpc.Status.details][google.rpc.Status.details] field, or localized by the client.
string message = 2;
// A list of messages that carry the error details. There is a common set of
// message types for APIs to use.
repeated google.protobuf.Any details = 3;
}

23
google_types/src/lib.rs Normal file
View File

@ -0,0 +1,23 @@
// This crate deliberately does not use the same linting rules as the other
// crates because of all the generated code it contains that we don't have much
// control over.
#![allow(
unused_imports,
clippy::redundant_static_lifetimes,
clippy::redundant_closure,
clippy::redundant_field_names
)]
mod pb {
pub mod google {
pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/google.protobuf.rs"));
}
pub mod rpc {
include!(concat!(env!("OUT_DIR"), "/google.rpc.rs"));
}
}
}
pub use pb::google::*;

View File

@ -1,9 +1,9 @@
use tonic::Status;
use generated_types::{
MeasurementFieldsRequest, MeasurementNamesRequest, MeasurementTagKeysRequest,
MeasurementTagValuesRequest, ReadFilterRequest, ReadGroupRequest, ReadSource,
ReadWindowAggregateRequest, TagKeysRequest, TagValuesRequest,
google::protobuf::Any, MeasurementFieldsRequest, MeasurementNamesRequest,
MeasurementTagKeysRequest, MeasurementTagValuesRequest, ReadFilterRequest, ReadGroupRequest,
ReadSource, ReadWindowAggregateRequest, TagKeysRequest, TagValuesRequest,
};
use super::id::ID;
@ -14,9 +14,9 @@ use std::convert::TryInto;
/// because for some requests the field is named `read_source` and for others it
/// is `tags_source`.
pub trait GrpcInputs {
fn read_source_field(&self) -> Option<&prost_types::Any>;
fn read_source_field(&self) -> Option<&Any>;
fn read_source_raw(&self) -> Result<&prost_types::Any, Status> {
fn read_source_raw(&self) -> Result<&Any, Status> {
Ok(self
.read_source_field()
.ok_or_else(|| Status::invalid_argument("missing read_source"))?)
@ -49,55 +49,55 @@ pub trait GrpcInputs {
}
impl GrpcInputs for ReadFilterRequest {
fn read_source_field(&self) -> Option<&prost_types::Any> {
fn read_source_field(&self) -> Option<&Any> {
self.read_source.as_ref()
}
}
impl GrpcInputs for ReadGroupRequest {
fn read_source_field(&self) -> Option<&prost_types::Any> {
fn read_source_field(&self) -> Option<&Any> {
self.read_source.as_ref()
}
}
impl GrpcInputs for TagKeysRequest {
fn read_source_field(&self) -> Option<&prost_types::Any> {
fn read_source_field(&self) -> Option<&Any> {
self.tags_source.as_ref()
}
}
impl GrpcInputs for TagValuesRequest {
fn read_source_field(&self) -> Option<&prost_types::Any> {
fn read_source_field(&self) -> Option<&Any> {
self.tags_source.as_ref()
}
}
impl GrpcInputs for MeasurementNamesRequest {
fn read_source_field(&self) -> Option<&prost_types::Any> {
fn read_source_field(&self) -> Option<&Any> {
self.source.as_ref()
}
}
impl GrpcInputs for MeasurementTagKeysRequest {
fn read_source_field(&self) -> Option<&prost_types::Any> {
fn read_source_field(&self) -> Option<&Any> {
self.source.as_ref()
}
}
impl GrpcInputs for MeasurementTagValuesRequest {
fn read_source_field(&self) -> Option<&prost_types::Any> {
fn read_source_field(&self) -> Option<&Any> {
self.source.as_ref()
}
}
impl GrpcInputs for MeasurementFieldsRequest {
fn read_source_field(&self) -> Option<&prost_types::Any> {
fn read_source_field(&self) -> Option<&Any> {
self.source.as_ref()
}
}
impl GrpcInputs for ReadWindowAggregateRequest {
fn read_source_field(&self) -> Option<&prost_types::Any> {
fn read_source_field(&self) -> Option<&Any> {
self.read_source.as_ref()
}
}

View File

@ -13,12 +13,12 @@ use super::{
};
use data_types::{error::ErrorLogger, names::org_and_bucket_to_database, DatabaseName};
use generated_types::{
i_ox_testing_server::IOxTesting, storage_server::Storage, CapabilitiesResponse, Capability,
Int64ValuesResponse, MeasurementFieldsRequest, MeasurementFieldsResponse,
MeasurementNamesRequest, MeasurementTagKeysRequest, MeasurementTagValuesRequest, Predicate,
ReadFilterRequest, ReadGroupRequest, ReadResponse, ReadSeriesCardinalityRequest,
ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest, TagValuesRequest,
TestErrorRequest, TestErrorResponse, TimestampRange,
google::protobuf::Empty, i_ox_testing_server::IOxTesting, storage_server::Storage,
CapabilitiesResponse, Capability, Int64ValuesResponse, MeasurementFieldsRequest,
MeasurementFieldsResponse, MeasurementNamesRequest, MeasurementTagKeysRequest,
MeasurementTagValuesRequest, Predicate, ReadFilterRequest, ReadGroupRequest, ReadResponse,
ReadSeriesCardinalityRequest, ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest,
TagValuesRequest, TestErrorRequest, TestErrorResponse, TimestampRange,
};
use query::{
exec::fieldlist::FieldList,
@ -503,7 +503,7 @@ where
async fn capabilities(
&self,
_req: tonic::Request<()>,
_req: tonic::Request<Empty>,
) -> Result<tonic::Response<CapabilitiesResponse>, Status> {
// Full list of go capabilities in
// idpe/storage/read/capabilities.go (aka window aggregate /
@ -1137,6 +1137,7 @@ mod tests {
Aggregate as RPCAggregate, Duration as RPCDuration, Node, ReadSource, Window as RPCWindow,
};
use generated_types::google::protobuf::Any;
use prost::Message;
type IOxTestingClient = i_ox_testing_client::IOxTestingClient<tonic::transport::Channel>;
@ -2254,25 +2255,25 @@ mod tests {
}
/// Create a ReadSource suitable for constructing messages
fn read_source(org_id: u64, bucket_id: u64, partition_id: u64) -> prost_types::Any {
fn read_source(org_id: u64, bucket_id: u64, partition_id: u64) -> Any {
let read_source = ReadSource {
org_id,
bucket_id,
partition_id,
};
let mut d = Vec::new();
let mut d = bytes::BytesMut::new();
read_source
.encode(&mut d)
.expect("encoded read source appropriately");
prost_types::Any {
Any {
type_url: "/TODO".to_string(),
value: d,
value: d.freeze(),
}
}
/// return the capabilities of the server as a hash map
async fn capabilities(&mut self) -> Result<HashMap<String, Vec<String>>, tonic::Status> {
let response = self.inner.capabilities(()).await?.into_inner();
let response = self.inner.capabilities(Empty {}).await?.into_inner();
let CapabilitiesResponse { caps } = response;

View File

@ -152,7 +152,7 @@ impl Scenario {
self.ns_since_epoch
}
fn read_source(&self) -> Option<prost_types::Any> {
fn read_source(&self) -> Option<generated_types::google::protobuf::Any> {
let partition_id = u64::from(u32::MAX);
let read_source = ReadSource {
org_id: self.org_id(),
@ -160,11 +160,11 @@ impl Scenario {
partition_id,
};
let mut d = Vec::new();
let mut d = bytes::BytesMut::new();
read_source.encode(&mut d).unwrap();
let read_source = prost_types::Any {
let read_source = generated_types::google::protobuf::Any {
type_url: "/TODO".to_string(),
value: d,
value: d.freeze(),
};
Some(read_source)

View File

@ -2,6 +2,7 @@ use crate::{create_database, substitute_nanos, Scenario};
use futures::prelude::*;
use generated_types::{
aggregate::AggregateType,
google::protobuf::{Any, Empty},
node::{Comparison, Type as NodeType, Value},
read_group_request::Group,
read_response::{frame::Data, *},
@ -27,7 +28,7 @@ pub async fn test(storage_client: &mut StorageClient<Channel>, scenario: &Scenar
/// Validate that capabilities rpc endpoint is hooked up
async fn capabilities_endpoint(storage_client: &mut StorageClient<Channel>) {
let capabilities_response = storage_client.capabilities(()).await.unwrap();
let capabilities_response = storage_client.capabilities(Empty {}).await.unwrap();
let capabilities_response = capabilities_response.into_inner();
assert_eq!(
capabilities_response.caps.len(),
@ -327,7 +328,7 @@ async fn load_read_group_data(client: &influxdb2_client::Client, scenario: &Scen
// assumes that load_read_group_data has been previously run
async fn test_read_group_none_agg(
storage_client: &mut StorageClient<tonic::transport::Channel>,
read_source: &std::option::Option<prost_types::Any>,
read_source: &std::option::Option<Any>,
) {
// read_group(group_keys: region, agg: None)
let read_group_request = ReadGroupRequest {
@ -380,7 +381,7 @@ async fn test_read_group_none_agg(
/// Test that predicates make it through
async fn test_read_group_none_agg_with_predicate(
storage_client: &mut StorageClient<tonic::transport::Channel>,
read_source: &std::option::Option<prost_types::Any>,
read_source: &std::option::Option<Any>,
) {
let read_group_request = ReadGroupRequest {
read_source: read_source.clone(),
@ -426,7 +427,7 @@ async fn test_read_group_none_agg_with_predicate(
// load_read_group_data has been previously run
async fn test_read_group_sum_agg(
storage_client: &mut StorageClient<tonic::transport::Channel>,
read_source: &std::option::Option<prost_types::Any>,
read_source: &std::option::Option<Any>,
) {
// read_group(group_keys: region, agg: Sum)
let read_group_request = ReadGroupRequest {
@ -481,7 +482,7 @@ async fn test_read_group_sum_agg(
// load_read_group_data has been previously run
async fn test_read_group_last_agg(
storage_client: &mut StorageClient<tonic::transport::Channel>,
read_source: &std::option::Option<prost_types::Any>,
read_source: &std::option::Option<Any>,
) {
// read_group(group_keys: region, agg: Last)
let read_group_request = ReadGroupRequest {