fix: add `storage-type=iox` header to influxrpc responses (#5917)

* refactor: Move response creation into a single location

* fix: add storage-type=iox header to influxrpc responses

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-10-20 05:24:55 -04:00 committed by GitHub
parent 5a63cf1f33
commit 487f4a420f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 116 additions and 46 deletions

View File

@ -42,7 +42,7 @@ use std::{
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tonic::{metadata::MetadataMap, Status};
use trace::{ctx::SpanContext, span::SpanExt};
use trace_http::ctx::{RequestLogContext, RequestLogContextExt};
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
@ -214,10 +214,19 @@ impl Error {
Self::NotYetImplemented { .. } => tonic::Code::Unimplemented,
};
tonic::Status::new(code, msg)
let mut status = tonic::Status::new(code, msg);
add_headers(status.metadata_mut());
status
}
}
/// Add IOx specific headers to the response
fn add_headers(metadata: &mut MetadataMap) {
// Note we can't use capital letters otherwise the http header
// library asserts, so return lowercase storage-type
metadata.insert("storage-type", "iox".parse().unwrap());
}
/// Implements the protobuf defined Storage service for a DatabaseStore
#[tonic::async_trait]
impl<T> Storage for StorageService<T>
@ -267,10 +276,7 @@ where
query_completed_token.set_success();
}
Ok(tonic::Response::new(StreamWithPermit::new(
futures::stream::iter(results),
permit,
)))
make_response(futures::stream::iter(results), permit)
}
type ReadGroupStream =
@ -350,10 +356,7 @@ where
query_completed_token.set_success();
}
Ok(tonic::Response::new(StreamWithPermit::new(
futures::stream::iter(results),
permit,
)))
make_response(futures::stream::iter(results), permit)
}
type ReadWindowAggregateStream =
@ -432,10 +435,7 @@ where
query_completed_token.set_success();
}
Ok(tonic::Response::new(StreamWithPermit::new(
futures::stream::iter(results),
permit,
)))
make_response(futures::stream::iter(results), permit)
}
type TagKeysStream = StreamWithPermit<ReceiverStream<Result<StringValuesResponse, Status>>>;
@ -499,10 +499,7 @@ where
.await
.expect("sending tag_keys response to server");
Ok(tonic::Response::new(StreamWithPermit::new(
ReceiverStream::new(rx),
permit,
)))
make_response(ReceiverStream::new(rx), permit)
}
type TagValuesStream = StreamWithPermit<ReceiverStream<Result<StringValuesResponse, Status>>>;
@ -603,10 +600,7 @@ where
.await
.expect("sending tag_values response to server");
Ok(tonic::Response::new(StreamWithPermit::new(
ReceiverStream::new(rx),
permit,
)))
make_response(ReceiverStream::new(rx), permit)
}
type TagValuesGroupedByMeasurementAndTagKeyStream = StreamWithPermit<
@ -661,10 +655,7 @@ where
query_completed_token.set_success();
}
Ok(tonic::Response::new(StreamWithPermit::new(
futures::stream::iter(results),
permit,
)))
make_response(futures::stream::iter(results), permit)
}
type ReadSeriesCardinalityStream = ReceiverStream<Result<Int64ValuesResponse, Status>>;
@ -772,10 +763,7 @@ where
.await
.expect("sending measurement names response to server");
Ok(tonic::Response::new(StreamWithPermit::new(
ReceiverStream::new(rx),
permit,
)))
make_response(ReceiverStream::new(rx), permit)
}
type MeasurementTagKeysStream =
@ -843,10 +831,7 @@ where
.await
.expect("sending measurement_tag_keys response to server");
Ok(tonic::Response::new(StreamWithPermit::new(
ReceiverStream::new(rx),
permit,
)))
make_response(ReceiverStream::new(rx), permit)
}
type MeasurementTagValuesStream =
@ -917,10 +902,7 @@ where
.await
.expect("sending measurement_tag_values response to server");
Ok(tonic::Response::new(StreamWithPermit::new(
ReceiverStream::new(rx),
permit,
)))
make_response(ReceiverStream::new(rx), permit)
}
type MeasurementFieldsStream =
@ -993,10 +975,7 @@ where
.await
.expect("sending measurement_fields response to server");
Ok(tonic::Response::new(StreamWithPermit::new(
ReceiverStream::new(rx),
permit,
)))
make_response(ReceiverStream::new(rx), permit)
}
async fn offsets(
@ -1564,6 +1543,16 @@ impl<T, E: std::fmt::Debug> ErrorLogger for Result<T, E> {
}
}
/// Return the stream of results as a gRPC (tonic) response
pub fn make_response<S>(
stream: S,
permit: InstrumentedAsyncOwnedSemaphorePermit,
) -> Result<tonic::Response<StreamWithPermit<S>>, tonic::Status> {
let mut response = tonic::Response::new(StreamWithPermit::new(stream, permit));
add_headers(response.metadata_mut());
Ok(response)
}
/// Helper to keep a semaphore permit attached to a stream.
#[pin_project]
pub struct StreamWithPermit<S> {
@ -1603,7 +1592,7 @@ mod tests {
use futures::Future;
use generated_types::{i_ox_testing_client::IOxTestingClient, tag_key_predicate::Value};
use influxdb_storage_client::{
connection::{Builder as ConnectionBuilder, GrpcConnection},
connection::{Builder as ConnectionBuilder, Connection, GrpcConnection},
generated_types::*,
Client as StorageClient, OrgAndBucket,
};
@ -3404,6 +3393,84 @@ mod tests {
}
}
#[tokio::test]
// ensure that the expected IOx header is included with successes
async fn test_headers() {
test_helpers::maybe_start_logging();
// Start a test gRPC server on a randomally allocated port
let fixture = Fixture::new().await.expect("Connecting to test server");
let db_info = org_and_bucket();
// Add a chunk with a field
let chunk = TestChunk::new("TheMeasurement")
.with_time_column()
.with_tag_column("state")
.with_one_row_of_data();
fixture
.test_storage
.db_or_create(db_info.db_name())
.await
.add_chunk("my_partition_key", Arc::new(chunk));
// use the raw gRPR client to examine the headers
let mut storage_client = storage_client::StorageClient::new(
fixture.client_connection.clone().into_grpc_connection(),
);
let source = Some(StorageClient::read_source(&db_info, 1));
let request = ReadFilterRequest {
read_source: source.clone(),
range: None,
predicate: None,
..Default::default()
};
let response = storage_client.read_filter(request).await.unwrap();
println!("Result is {:?}", response);
assert_eq!(response.metadata().get("storage-type").unwrap(), "iox");
}
#[tokio::test]
// ensure that the expected IOx header is included with errors
async fn test_headers_error() {
test_helpers::maybe_start_logging();
// Start a test gRPC server on a randomally allocated port
let fixture = Fixture::new().await.expect("Connecting to test server");
let db_info = org_and_bucket();
let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");
fixture
.test_storage
.db_or_create(db_info.db_name())
.await
.add_chunk("my_partition_key", Arc::new(chunk));
// use the raw gRPR client to examine the headers
let mut storage_client = storage_client::StorageClient::new(
fixture.client_connection.clone().into_grpc_connection(),
);
let source = Some(StorageClient::read_source(&db_info, 1));
let request = ReadFilterRequest {
read_source: source.clone(),
range: None,
predicate: None,
..Default::default()
};
let response = storage_client.read_filter(request).await.unwrap_err();
println!("Result is {:?}", response);
assert_eq!(response.metadata().get("storage-type").unwrap(), "iox");
}
fn make_timestamp_range(start: i64, end: i64) -> TimestampRange {
TimestampRange { start, end }
}
@ -3482,6 +3549,7 @@ mod tests {
// Wrapper around raw clients and test database
struct Fixture {
client_connection: Connection,
iox_client: IOxTestingClient<GrpcConnection>,
storage_client: StorageClient,
test_storage: Arc<TestDatabaseStore>,
@ -3536,17 +3604,19 @@ mod tests {
let join_handle = tokio::task::spawn(server);
let conn = ConnectionBuilder::default()
let client_connection = ConnectionBuilder::default()
.connect_timeout(std::time::Duration::from_secs(30))
.build(format!("http://{}", bind_addr))
.await
.unwrap();
let iox_client = IOxTestingClient::new(conn.clone().into_grpc_connection());
let iox_client =
IOxTestingClient::new(client_connection.clone().into_grpc_connection());
let storage_client = StorageClient::new(conn);
let storage_client = StorageClient::new(client_connection.clone());
Ok(Self {
client_connection,
iox_client,
storage_client,
test_storage,