feat: define router -> ingester write protocol

Specify a gRPC service and request/response message formats to push
writes directly from a router to an ingester.
pull/24376/head
Dom Dwyer 2022-11-14 10:41:39 +01:00
parent 9db808f795
commit dcc7b10bcf
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
4 changed files with 16 additions and 29 deletions

View File

@ -54,6 +54,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
ingester_path.join("parquet_metadata.proto"),
ingester_path.join("query.proto"),
ingester_path.join("write_info.proto"),
ingester_path.join("write.proto"),
namespace_path.join("service.proto"),
object_store_path.join("service.proto"),
predicate_path.join("predicate.proto"),

View File

@ -0,0 +1,15 @@
syntax = "proto3";
package influxdata.iox.ingester.v1;
option go_package = "github.com/influxdata/iox/ingester/v1";
import "influxdata/pbdata/v1/influxdb_pb_data_protocol.proto";
service WriteService {
rpc Write(WriteRequest) returns (WriteResponse);
}
message WriteRequest {
influxdata.pbdata.v1.DatabaseBatch payload = 1;
}
message WriteResponse {}

View File

@ -123,19 +123,3 @@ message Column {
// Trailing off bits (0) *may* be omitted.
bytes null_mask = 4;
}
// Note there used to be a service that would load this internal protobuf format.
// See https://github.com/influxdata/influxdb_iox/pull/5750 and
// https://github.com/influxdata/influxdb_iox/issues/4866
// for rationale of why it was removed
// service WriteService {
// rpc Write (WriteRequest) returns (WriteResponse);
// }
message WriteRequest {
DatabaseBatch database_batch = 1;
}
message WriteResponse {
}

View File

@ -1,7 +1,6 @@
//! Client helpers for writing end to end ng tests
use arrow::record_batch::RecordBatch;
use futures::{stream::FuturesUnordered, StreamExt};
use generated_types::influxdata::pbdata::v1::WriteResponse;
use http::Response;
use hyper::{Body, Client, Request};
use influxdb_iox_client::{
@ -51,18 +50,6 @@ pub fn get_write_token(response: &Response<Body>) -> String {
.to_string()
}
/// Extracts the write token from the specified response (to the gRPC write API)
pub fn get_write_token_from_grpc(response: &tonic::Response<WriteResponse>) -> String {
let message = format!("no write token in {:?}", response);
response
.metadata()
.get("X-IOx-Write-Token")
.expect(&message)
.to_str()
.expect("Value not a string")
.to_string()
}
/// returns the write info from the connection (to either an ingester or a querier) for this token
pub async fn token_info(
write_token: impl AsRef<str>,