Merge pull request #6178 from influxdata/dom/ingester-rpc-write

feat(ingester): rpc write handler
kodiakhq[bot] 2022-11-18 16:43:53 +00:00 committed by GitHub
commit 4d55efe655
6 changed files with 977 additions and 573 deletions

Cargo.lock (generated)

@@ -2340,6 +2340,7 @@ dependencies = [
"metric",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"object_store",
"observability_deps",
"once_cell",

ingester/Cargo.toml

@@ -50,6 +50,7 @@ uuid = { version = "1", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
write_buffer = { path = "../write_buffer" }
write_summary = { path = "../write_summary" }
mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
[dev-dependencies]
assert_matches = "1.5.0"

ingester/src/server/grpc.rs

@@ -1,42 +1,21 @@
//! gRPC service implementations for `ingester`.
use crate::{
handler::IngestHandler,
querier_handler::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
mod query;
mod rpc_write;
mod write_info;
use std::sync::{atomic::AtomicU64, Arc};
use crate::handler::IngestHandler;
use arrow_flight::flight_service_server::{
FlightService as Flight, FlightServiceServer as FlightServer,
};
use arrow::error::ArrowError;
use arrow_flight::{
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, IpcMessage, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use data_types::{NamespaceId, TableId};
use flatbuffers::FlatBufferBuilder;
use futures::Stream;
use generated_types::influxdata::iox::{
catalog::v1::*,
ingester::v1::{
self as proto,
write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
},
ingester::v1::write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::*;
use pin_project::pin_project;
use prost::Message;
use service_grpc_catalog::CatalogService;
use snafu::{ResultExt, Snafu};
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
};
use tonic::{Request, Response, Streaming};
use trace::{ctx::SpanContext, span::SpanExt};
use write_summary::WriteSummary;
/// This type is responsible for managing all gRPC services exposed by `ingester`.
#[derive(Debug)]
@@ -66,17 +45,17 @@ impl<I: IngestHandler + Send + Sync + 'static> GrpcDelegate<I> {
/// Acquire an Arrow Flight gRPC service implementation.
pub fn flight_service(&self) -> FlightServer<impl Flight> {
FlightServer::new(FlightService {
ingest_handler: Arc::clone(&self.ingest_handler),
test_flight_do_get_panic: Arc::clone(&self.test_flight_do_get_panic),
})
FlightServer::new(query::FlightService::new(
Arc::clone(&self.ingest_handler),
Arc::clone(&self.test_flight_do_get_panic),
))
}
/// Acquire a WriteInfo gRPC service implementation.
pub fn write_info_service(&self) -> WriteInfoServiceServer<impl WriteInfoService> {
WriteInfoServiceServer::new(WriteInfoServiceImpl::new(
Arc::clone(&self.ingest_handler) as _
))
WriteInfoServiceServer::new(write_info::WriteInfoServiceImpl::new(Arc::clone(
&self.ingest_handler,
) as _))
}
/// Acquire a [`CatalogService`] gRPC service implementation.
@@ -91,538 +70,3 @@ impl<I: IngestHandler + Send + Sync + 'static> GrpcDelegate<I> {
)))
}
}
/// Implementation of write info
struct WriteInfoServiceImpl {
handler: Arc<dyn IngestHandler + Send + Sync + 'static>,
}
impl WriteInfoServiceImpl {
pub fn new(handler: Arc<dyn IngestHandler + Send + Sync + 'static>) -> Self {
Self { handler }
}
}
#[tonic::async_trait]
impl WriteInfoService for WriteInfoServiceImpl {
async fn get_write_info(
&self,
request: Request<proto::GetWriteInfoRequest>,
) -> Result<Response<proto::GetWriteInfoResponse>, tonic::Status> {
let proto::GetWriteInfoRequest { write_token } = request.into_inner();
let write_summary =
WriteSummary::try_from_token(&write_token).map_err(tonic::Status::invalid_argument)?;
let progresses = self.handler.progresses(write_summary.shard_indexes()).await;
let shard_infos = progresses
.into_iter()
.map(|(shard_index, progress)| {
let status = write_summary
.write_status(shard_index, &progress)
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
let shard_index = shard_index.get();
let status = proto::ShardStatus::from(status);
debug!(shard_index, ?status, "write info status",);
Ok(proto::ShardInfo {
shard_index,
status: status.into(),
})
})
.collect::<Result<Vec<_>, tonic::Status>>()?;
Ok(tonic::Response::new(proto::GetWriteInfoResponse {
shard_infos,
}))
}
}
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))]
InvalidTicket {
source: prost::DecodeError,
ticket: Vec<u8>,
},
#[snafu(display("Invalid query, could not convert protobuf: {}", source))]
InvalidQuery {
source: generated_types::google::FieldViolation,
},
#[snafu(display("Error while performing query: {}", source))]
Query {
source: Box<crate::querier_handler::Error>,
},
#[snafu(display("No Namespace Data found for the given namespace ID {}", namespace_id,))]
NamespaceNotFound { namespace_id: NamespaceId },
#[snafu(display(
"No Table Data found for the given namespace ID {}, table ID {}",
namespace_id,
table_id
))]
TableNotFound {
namespace_id: NamespaceId,
table_id: TableId,
},
#[snafu(display("Error while streaming query results: {}", source))]
QueryStream { source: ArrowError },
#[snafu(display("Error during protobuf serialization: {}", source))]
Serialization { source: prost::EncodeError },
}
impl From<Error> for tonic::Status {
/// Logs the error and converts it into the appropriate tonic status
fn from(err: Error) -> Self {
// An explicit match on the Error enum will ensure appropriate logging is handled for any
// new error variants.
let msg = "Error handling Flight gRPC request";
match err {
Error::InvalidTicket { .. }
| Error::InvalidQuery { .. }
| Error::Query { .. }
| Error::NamespaceNotFound { .. }
| Error::TableNotFound { .. } => {
debug!(e=%err, msg)
}
Error::QueryStream { .. } | Error::Serialization { .. } => {
warn!(e=%err, msg)
}
}
err.to_status()
}
}
impl Error {
/// Converts the error into the appropriate tonic status
fn to_status(&self) -> tonic::Status {
use tonic::Status;
match self {
Self::InvalidTicket { .. } | Self::InvalidQuery { .. } => {
Status::invalid_argument(self.to_string())
}
Self::Query { .. } | Self::QueryStream { .. } | Self::Serialization { .. } => {
Status::internal(self.to_string())
}
Self::NamespaceNotFound { .. } | Self::TableNotFound { .. } => {
Status::not_found(self.to_string())
}
}
}
}
/// Concrete implementation of the gRPC Arrow Flight Service API
#[derive(Debug)]
struct FlightService<I: IngestHandler + Send + Sync + 'static> {
ingest_handler: Arc<I>,
/// How many `do_get` flight requests should panic for testing purposes.
///
/// Every panic will decrease the counter until it reaches zero. At zero, no panics will occur.
test_flight_do_get_panic: Arc<AtomicU64>,
}
impl<I> FlightService<I>
where
I: IngestHandler + Send + Sync + 'static,
{
fn maybe_panic_in_flight_do_get(&self) {
loop {
let current = self.test_flight_do_get_panic.load(Ordering::SeqCst);
if current == 0 {
return;
}
if self
.test_flight_do_get_panic
.compare_exchange(current, current - 1, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
break;
}
}
panic!("Panicking in `do_get` for testing purposes.");
}
}
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + 'static>>;
#[tonic::async_trait]
impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
type HandshakeStream = TonicStream<HandshakeResponse>;
type ListFlightsStream = TonicStream<FlightInfo>;
type DoGetStream = TonicStream<FlightData>;
type DoPutStream = TonicStream<PutResult>;
type DoActionStream = TonicStream<arrow_flight::Result>;
type ListActionsStream = TonicStream<ActionType>;
type DoExchangeStream = TonicStream<FlightData>;
async fn get_schema(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_get(
&self,
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, tonic::Status> {
let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let ticket = request.into_inner();
let proto_query_request =
proto::IngesterQueryRequest::decode(&*ticket.ticket).context(InvalidTicketSnafu {
ticket: ticket.ticket,
})?;
let query_request = proto_query_request.try_into().context(InvalidQuerySnafu)?;
self.maybe_panic_in_flight_do_get();
let query_response = self
.ingest_handler
.query(query_request, span_ctx.child_span("ingest handler query"))
.await
.map_err(|e| match e {
crate::querier_handler::Error::NamespaceNotFound { namespace_id } => {
Error::NamespaceNotFound { namespace_id }
}
crate::querier_handler::Error::TableNotFound {
namespace_id,
table_id,
} => Error::TableNotFound {
namespace_id,
table_id,
},
_ => Error::Query {
source: Box::new(e),
},
})?;
let output = GetStream::new(query_response.flatten());
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}
async fn handshake(
&self,
request: Request<Streaming<HandshakeRequest>>,
) -> Result<Response<Self::HandshakeStream>, tonic::Status> {
let request = request.into_inner().message().await?.unwrap();
let response = HandshakeResponse {
protocol_version: request.protocol_version,
payload: request.payload,
};
let output = futures::stream::iter(std::iter::once(Ok(response)));
Ok(Response::new(Box::pin(output) as Self::HandshakeStream))
}
async fn list_flights(
&self,
_request: Request<Criteria>,
) -> Result<Response<Self::ListFlightsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn get_flight_info(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_put(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_action(
&self,
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn list_actions(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_exchange(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
}
#[pin_project]
struct GetStream {
#[pin]
inner: Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>,
done: bool,
buffer: Vec<FlightData>,
}
impl GetStream {
fn new(inner: FlatIngesterQueryResponseStream) -> Self {
Self {
inner,
done: false,
buffer: vec![],
}
}
}
impl Stream for GetStream {
type Item = Result<FlightData, tonic::Status>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
if !this.buffer.is_empty() {
let next = this.buffer.remove(0);
return Poll::Ready(Some(Ok(next)));
}
if *this.done {
Poll::Ready(None)
} else {
match this.inner.poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => {
*this.done = true;
Poll::Ready(None)
}
Poll::Ready(Some(Err(e))) => {
*this.done = true;
let e = Error::QueryStream { source: e }.into();
Poll::Ready(Some(Err(e)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::StartPartition {
partition_id,
status,
}))) => {
let mut bytes = bytes::BytesMut::new();
let app_metadata = proto::IngesterQueryResponseMetadata {
partition_id: partition_id.get(),
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: status
.parquet_max_sequence_number
.map(|x| x.get()),
}),
};
prost::Message::encode(&app_metadata, &mut bytes)
.context(SerializationSnafu)?;
let flight_data = arrow_flight::FlightData::new(
None,
IpcMessage(build_none_flight_msg()),
bytes.to_vec(),
vec![],
);
Poll::Ready(Some(Ok(flight_data)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::StartSnapshot { schema }))) => {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into();
Poll::Ready(Some(Ok(flight_data)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::RecordBatch { batch }))) => {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let (mut flight_dictionaries, flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
std::mem::swap(this.buffer, &mut flight_dictionaries);
this.buffer.push(flight_batch);
let next = this.buffer.remove(0);
Poll::Ready(Some(Ok(next)))
}
}
}
}
}
fn build_none_flight_msg() -> Vec<u8> {
let mut fbb = FlatBufferBuilder::new();
let mut message = arrow::ipc::MessageBuilder::new(&mut fbb);
message.add_version(arrow::ipc::MetadataVersion::V5);
message.add_header_type(arrow::ipc::MessageHeader::NONE);
message.add_bodyLength(0);
let data = message.finish();
fbb.finish(data, None);
fbb.finished_data().to_vec()
}
#[cfg(test)]
mod tests {
use arrow::ipc::MessageHeader;
use data_types::PartitionId;
use futures::StreamExt;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::Projection;
use crate::querier_handler::PartitionStatus;
use super::*;
#[tokio::test]
async fn test_get_stream_empty() {
assert_get_stream(vec![], vec![]).await;
}
#[tokio::test]
async fn test_get_stream_all_types() {
let batch = lp_to_mutable_batch("table z=1 0")
.1
.to_arrow(Projection::All)
.unwrap();
let schema = batch.schema();
assert_get_stream(
vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema }),
Ok(FlatIngesterQueryResponse::RecordBatch { batch }),
],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::NONE,
app_metadata: proto::IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: None,
}),
},
}),
Ok(DecodedFlightData {
header_type: MessageHeader::Schema,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
#[tokio::test]
async fn test_get_stream_shortcuts_err() {
assert_get_stream(
vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
Err(ArrowError::IoError("foo".into())),
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::NONE,
app_metadata: proto::IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: None,
}),
},
}),
Err(tonic::Code::Internal),
],
)
.await;
}
#[tokio::test]
async fn test_get_stream_dictionary_batches() {
let batch = lp_to_mutable_batch("table,x=\"foo\",y=\"bar\" z=1 0")
.1
.to_arrow(Projection::All)
.unwrap();
assert_get_stream(
vec![Ok(FlatIngesterQueryResponse::RecordBatch { batch })],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
struct DecodedFlightData {
header_type: MessageHeader,
app_metadata: proto::IngesterQueryResponseMetadata,
}
async fn assert_get_stream(
inputs: Vec<Result<FlatIngesterQueryResponse, ArrowError>>,
expected: Vec<Result<DecodedFlightData, tonic::Code>>,
) {
let inner = Box::pin(futures::stream::iter(inputs));
let stream = GetStream::new(inner);
let actual: Vec<_> = stream.collect().await;
assert_eq!(actual.len(), expected.len());
for (actual, expected) in actual.into_iter().zip(expected) {
match (actual, expected) {
(Ok(actual), Ok(expected)) => {
let header_type = arrow::ipc::root_as_message(&actual.data_header[..])
.unwrap()
.header_type();
assert_eq!(header_type, expected.header_type);
let app_metadata: proto::IngesterQueryResponseMetadata =
prost::Message::decode(&actual.app_metadata[..]).unwrap();
assert_eq!(app_metadata, expected.app_metadata);
}
(Err(actual), Err(expected)) => {
assert_eq!(actual.code(), expected);
}
(Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"),
(Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"),
}
}
}
}

ingester/src/server/grpc/query.rs

@@ -0,0 +1,524 @@
use crate::{
handler::IngestHandler,
querier_handler::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
};
use arrow::error::ArrowError;
use arrow_flight::{
flight_service_server::FlightService as Flight, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage,
PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use data_types::{NamespaceId, TableId};
use flatbuffers::FlatBufferBuilder;
use futures::Stream;
use generated_types::influxdata::iox::ingester::v1::{self as proto};
use observability_deps::tracing::*;
use pin_project::pin_project;
use prost::Message;
use snafu::{ResultExt, Snafu};
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
};
use tonic::{Request, Response, Streaming};
use trace::{ctx::SpanContext, span::SpanExt};
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))]
InvalidTicket {
source: prost::DecodeError,
ticket: Vec<u8>,
},
#[snafu(display("Invalid query, could not convert protobuf: {}", source))]
InvalidQuery {
source: generated_types::google::FieldViolation,
},
#[snafu(display("Error while performing query: {}", source))]
Query {
source: Box<crate::querier_handler::Error>,
},
#[snafu(display("No Namespace Data found for the given namespace ID {}", namespace_id,))]
NamespaceNotFound { namespace_id: NamespaceId },
#[snafu(display(
"No Table Data found for the given namespace ID {}, table ID {}",
namespace_id,
table_id
))]
TableNotFound {
namespace_id: NamespaceId,
table_id: TableId,
},
#[snafu(display("Error while streaming query results: {}", source))]
QueryStream { source: ArrowError },
#[snafu(display("Error during protobuf serialization: {}", source))]
Serialization { source: prost::EncodeError },
}
impl From<Error> for tonic::Status {
/// Logs the error and converts it into the appropriate tonic status
fn from(err: Error) -> Self {
// An explicit match on the Error enum will ensure appropriate logging is handled for any
// new error variants.
let msg = "Error handling Flight gRPC request";
match err {
Error::InvalidTicket { .. }
| Error::InvalidQuery { .. }
| Error::Query { .. }
| Error::NamespaceNotFound { .. }
| Error::TableNotFound { .. } => {
debug!(e=%err, msg)
}
Error::QueryStream { .. } | Error::Serialization { .. } => {
warn!(e=%err, msg)
}
}
err.to_status()
}
}
impl Error {
/// Converts the error into the appropriate tonic status
fn to_status(&self) -> tonic::Status {
use tonic::Status;
match self {
Self::InvalidTicket { .. } | Self::InvalidQuery { .. } => {
Status::invalid_argument(self.to_string())
}
Self::Query { .. } | Self::QueryStream { .. } | Self::Serialization { .. } => {
Status::internal(self.to_string())
}
Self::NamespaceNotFound { .. } | Self::TableNotFound { .. } => {
Status::not_found(self.to_string())
}
}
}
}
/// Concrete implementation of the gRPC Arrow Flight Service API
#[derive(Debug)]
pub(super) struct FlightService<I: IngestHandler + Send + Sync + 'static> {
ingest_handler: Arc<I>,
/// How many `do_get` flight requests should panic for testing purposes.
///
/// Every panic will decrease the counter until it reaches zero. At zero, no panics will occur.
test_flight_do_get_panic: Arc<AtomicU64>,
}
impl<I> FlightService<I>
where
I: IngestHandler + Send + Sync + 'static,
{
pub(super) fn new(ingest_handler: Arc<I>, test_flight_do_get_panic: Arc<AtomicU64>) -> Self {
Self {
ingest_handler,
test_flight_do_get_panic,
}
}
fn maybe_panic_in_flight_do_get(&self) {
loop {
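// Decrement the panic counter with a CAS loop: return without panicking
// once it reaches zero, retry if a concurrent request raced this one,
// and fall through to the panic below once the decrement succeeds.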
let current = self.test_flight_do_get_panic.load(Ordering::SeqCst);
if current == 0 {
return;
}
if self
.test_flight_do_get_panic
.compare_exchange(current, current - 1, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
break;
}
}
panic!("Panicking in `do_get` for testing purposes.");
}
}
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + 'static>>;
#[tonic::async_trait]
impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
type HandshakeStream = TonicStream<HandshakeResponse>;
type ListFlightsStream = TonicStream<FlightInfo>;
type DoGetStream = TonicStream<FlightData>;
type DoPutStream = TonicStream<PutResult>;
type DoActionStream = TonicStream<arrow_flight::Result>;
type ListActionsStream = TonicStream<ActionType>;
type DoExchangeStream = TonicStream<FlightData>;
async fn get_schema(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_get(
&self,
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, tonic::Status> {
let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let ticket = request.into_inner();
let proto_query_request =
proto::IngesterQueryRequest::decode(&*ticket.ticket).context(InvalidTicketSnafu {
ticket: ticket.ticket,
})?;
let query_request = proto_query_request.try_into().context(InvalidQuerySnafu)?;
self.maybe_panic_in_flight_do_get();
let query_response = self
.ingest_handler
.query(query_request, span_ctx.child_span("ingest handler query"))
.await
.map_err(|e| match e {
crate::querier_handler::Error::NamespaceNotFound { namespace_id } => {
Error::NamespaceNotFound { namespace_id }
}
crate::querier_handler::Error::TableNotFound {
namespace_id,
table_id,
} => Error::TableNotFound {
namespace_id,
table_id,
},
_ => Error::Query {
source: Box::new(e),
},
})?;
let output = GetStream::new(query_response.flatten());
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}
async fn handshake(
&self,
request: Request<Streaming<HandshakeRequest>>,
) -> Result<Response<Self::HandshakeStream>, tonic::Status> {
let request = request.into_inner().message().await?.unwrap();
let response = HandshakeResponse {
protocol_version: request.protocol_version,
payload: request.payload,
};
let output = futures::stream::iter(std::iter::once(Ok(response)));
Ok(Response::new(Box::pin(output) as Self::HandshakeStream))
}
async fn list_flights(
&self,
_request: Request<Criteria>,
) -> Result<Response<Self::ListFlightsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn get_flight_info(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_put(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_action(
&self,
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn list_actions(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_exchange(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
}
#[pin_project]
struct GetStream {
#[pin]
inner: Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>,
done: bool,
buffer: Vec<FlightData>,
}
impl GetStream {
fn new(inner: FlatIngesterQueryResponseStream) -> Self {
Self {
inner,
done: false,
buffer: vec![],
}
}
}
impl Stream for GetStream {
type Item = Result<FlightData, tonic::Status>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
if !this.buffer.is_empty() {
let next = this.buffer.remove(0);
return Poll::Ready(Some(Ok(next)));
}
if *this.done {
Poll::Ready(None)
} else {
match this.inner.poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => {
*this.done = true;
Poll::Ready(None)
}
Poll::Ready(Some(Err(e))) => {
*this.done = true;
let e = Error::QueryStream { source: e }.into();
Poll::Ready(Some(Err(e)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::StartPartition {
partition_id,
status,
}))) => {
let mut bytes = bytes::BytesMut::new();
let app_metadata = proto::IngesterQueryResponseMetadata {
partition_id: partition_id.get(),
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: status
.parquet_max_sequence_number
.map(|x| x.get()),
}),
};
prost::Message::encode(&app_metadata, &mut bytes)
.context(SerializationSnafu)?;
let flight_data = arrow_flight::FlightData::new(
None,
IpcMessage(build_none_flight_msg()),
bytes.to_vec(),
vec![],
);
Poll::Ready(Some(Ok(flight_data)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::StartSnapshot { schema }))) => {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into();
Poll::Ready(Some(Ok(flight_data)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::RecordBatch { batch }))) => {
let options = arrow::ipc::writer::IpcWriteOptions::default();
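// Encoding an Arrow batch can yield dictionary messages that must be
// sent before the record batch itself, so stage them all in `buffer`
// and yield the first entry, draining the rest on subsequent polls.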
let (mut flight_dictionaries, flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
std::mem::swap(this.buffer, &mut flight_dictionaries);
this.buffer.push(flight_batch);
let next = this.buffer.remove(0);
Poll::Ready(Some(Ok(next)))
}
}
}
}
}
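/// Build a minimal Arrow IPC message with header type `NONE` and a zero-length
/// body, used as the `data_header` of a `FlightData` frame that carries only
/// `app_metadata` (see the `StartPartition` arm above).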
fn build_none_flight_msg() -> Vec<u8> {
let mut fbb = FlatBufferBuilder::new();
let mut message = arrow::ipc::MessageBuilder::new(&mut fbb);
message.add_version(arrow::ipc::MetadataVersion::V5);
message.add_header_type(arrow::ipc::MessageHeader::NONE);
message.add_bodyLength(0);
let data = message.finish();
fbb.finish(data, None);
fbb.finished_data().to_vec()
}
#[cfg(test)]
mod tests {
use arrow::{error::ArrowError, ipc::MessageHeader};
use data_types::PartitionId;
use futures::StreamExt;
use generated_types::influxdata::iox::ingester::v1::{self as proto};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::Projection;
use crate::querier_handler::{FlatIngesterQueryResponse, PartitionStatus};
use super::*;
#[tokio::test]
async fn test_get_stream_empty() {
assert_get_stream(vec![], vec![]).await;
}
#[tokio::test]
async fn test_get_stream_all_types() {
let batch = lp_to_mutable_batch("table z=1 0")
.1
.to_arrow(Projection::All)
.unwrap();
let schema = batch.schema();
assert_get_stream(
vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema }),
Ok(FlatIngesterQueryResponse::RecordBatch { batch }),
],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::NONE,
app_metadata: proto::IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: None,
}),
},
}),
Ok(DecodedFlightData {
header_type: MessageHeader::Schema,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
#[tokio::test]
async fn test_get_stream_shortcuts_err() {
assert_get_stream(
vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
Err(ArrowError::IoError("foo".into())),
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::NONE,
app_metadata: proto::IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: None,
}),
},
}),
Err(tonic::Code::Internal),
],
)
.await;
}
#[tokio::test]
async fn test_get_stream_dictionary_batches() {
let batch = lp_to_mutable_batch("table,x=\"foo\",y=\"bar\" z=1 0")
.1
.to_arrow(Projection::All)
.unwrap();
assert_get_stream(
vec![Ok(FlatIngesterQueryResponse::RecordBatch { batch })],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
struct DecodedFlightData {
header_type: MessageHeader,
app_metadata: proto::IngesterQueryResponseMetadata,
}
async fn assert_get_stream(
inputs: Vec<Result<FlatIngesterQueryResponse, ArrowError>>,
expected: Vec<Result<DecodedFlightData, tonic::Code>>,
) {
let inner = Box::pin(futures::stream::iter(inputs));
let stream = GetStream::new(inner);
let actual: Vec<_> = stream.collect().await;
assert_eq!(actual.len(), expected.len());
for (actual, expected) in actual.into_iter().zip(expected) {
match (actual, expected) {
(Ok(actual), Ok(expected)) => {
let header_type = arrow::ipc::root_as_message(&actual.data_header[..])
.unwrap()
.header_type();
assert_eq!(header_type, expected.header_type);
let app_metadata: proto::IngesterQueryResponseMetadata =
prost::Message::decode(&actual.app_metadata[..]).unwrap();
assert_eq!(app_metadata, expected.app_metadata);
}
(Err(actual), Err(expected)) => {
assert_eq!(actual.code(), expected);
}
(Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"),
(Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"),
}
}
}
}

ingester/src/server/grpc/rpc_write.rs

@@ -0,0 +1,377 @@
use data_types::{NamespaceId, PartitionKey, TableId};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use generated_types::influxdata::iox::ingester::v1::{
self as proto, write_service_server::WriteService,
};
use mutable_batch::writer;
use mutable_batch_pb::decode::decode_database_batch;
use observability_deps::tracing::*;
use thiserror::Error;
use tonic::{Request, Response};
use crate::{data::DmlApplyAction, stream_handler::DmlSink};
// A list of error states when handling an RPC write request.
//
// Note that this isn't strictly necessary as the [`WriteService`] trait
// expects a [`tonic::Status`] error value, but by defining the errors here they
// serve as documentation of the potential error states (which are then
// converted into [`tonic::Status`] for the handler).
#[derive(Debug, Error)]
enum RpcError {
#[error("rpc write request does not contain a payload")]
NoPayload,
#[error("rpc write request does not contain any table data")]
NoTables,
#[error(transparent)]
Decode(mutable_batch_pb::decode::Error),
#[error(transparent)]
Apply(crate::data::Error),
}
impl From<RpcError> for tonic::Status {
fn from(e: RpcError) -> Self {
use crate::data::Error;
match e {
RpcError::Decode(_) | RpcError::NoPayload | RpcError::NoTables => {
Self::invalid_argument(e.to_string())
}
RpcError::Apply(Error::BufferWrite { source }) => map_write_error(source),
RpcError::Apply(Error::ShardNotFound { .. }) => {
// This is not a reachable error state in the gRPC write model,
// and is enumerated here purely because of error conflation
// (one big error type instead of small, composable errors).
unreachable!()
}
}
}
}
/// Map a [`mutable_batch::Error`] to a [`tonic::Status`].
///
/// This method takes care to enumerate all possible error states, so that new
/// error additions cause a compilation failure, and therefore require the new
/// error to be explicitly mapped to a gRPC status code.
fn map_write_error(e: mutable_batch::Error) -> tonic::Status {
use tonic::Status;
match e {
mutable_batch::Error::ColumnError { .. }
| mutable_batch::Error::ArrowError { .. }
| mutable_batch::Error::InternalSchema { .. }
| mutable_batch::Error::ColumnNotFound { .. }
| mutable_batch::Error::WriterError {
source: writer::Error::KeyNotFound { .. } | writer::Error::InsufficientValues { .. },
} => Status::internal(e.to_string()),
mutable_batch::Error::WriterError {
source: writer::Error::TypeMismatch { .. },
} => {
// While a schema type conflict is ultimately a user error, if it
// reaches the ingester it should have already passed through schema
// validation in the router, and as such it is an internal system
// failure.
Status::internal(e.to_string())
}
}
}
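The lack of a wildcard `_` arm above is the whole point of `map_write_error`: the compiler rejects the match as non-exhaustive the moment `mutable_batch::Error` grows a variant. A minimal, self-contained sketch of the pattern, using a hypothetical `WriteError` enum rather than any IOx type:
enum WriteError {
    ColumnNotFound,
    TypeMismatch,
}
fn map_status(e: &WriteError) -> &'static str {
    // Deliberately no `_` arm: adding a `WriteError` variant makes this
    // match non-exhaustive, so compilation fails until it is mapped here.
    match e {
        WriteError::ColumnNotFound => "internal",
        WriteError::TypeMismatch => "internal",
    }
}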
/// A gRPC [`WriteService`] handler.
///
/// This handler accepts writes from an upstream, and applies them to the
/// provided [`DmlSink`].
#[derive(Debug)]
pub struct RpcWrite<T> {
sink: T,
}
impl<T> RpcWrite<T> {
/// Instantiate a new [`RpcWrite`] that pushes [`DmlOperation`] instances
/// into `sink`.
#[allow(dead_code)]
pub fn new(sink: T) -> Self {
Self { sink }
}
}
#[tonic::async_trait]
impl<T> WriteService for RpcWrite<T>
where
T: DmlSink + 'static,
{
async fn write(
&self,
request: Request<proto::WriteRequest>,
) -> Result<Response<proto::WriteResponse>, tonic::Status> {
let remote_addr = request
.remote_addr()
.map(|v| v.to_string())
.unwrap_or_else(|| "<unknown>".to_string());
// Extract the write payload
let payload = request.into_inner().payload.ok_or(RpcError::NoPayload)?;
let batches = decode_database_batch(&payload).map_err(RpcError::Decode)?;
let num_tables = batches.len();
let namespace_id = NamespaceId::new(payload.database_id);
let partition_key = PartitionKey::from(payload.partition_key);
// Never attempt to create a DmlWrite with no tables - doing so causes a
// panic.
if num_tables == 0 {
return Err(RpcError::NoTables)?;
}
trace!(
remote_addr,
num_tables,
%namespace_id,
%partition_key,
"received rpc write"
);
// Reconstruct the DML operation
let op = DmlWrite::new(
namespace_id,
batches
.into_iter()
.map(|(k, v)| (TableId::new(k), v))
.collect(),
partition_key,
// The tracing context should be propagated over the RPC boundary.
//
// See https://github.com/influxdata/influxdb_iox/issues/6177
DmlMeta::unsequenced(None),
);
// Apply the DML op to the in-memory buffer.
match self.sink.apply(DmlOperation::Write(op)).await {
Ok(DmlApplyAction::Applied(_)) => {
// Discard the lifecycle manager's "should_pause" hint.
}
Ok(DmlApplyAction::Skipped) => {
// Assert that the write was not skipped due to having a non-monotonic
// sequence number. In this gRPC write model, there are no sequence
// numbers!
unreachable!("rpc write saw skipped op apply call")
}
Err(e) => {
error!(error=%e, "failed to apply DML op");
return Err(RpcError::Apply(e))?;
}
}
Ok(Response::new(proto::WriteResponse {}))
}
}
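For orientation, a hedged sketch of how this handler could be mounted in a tonic server; the `WriteServiceServer` wrapper name follows standard tonic codegen alongside the imported `write_service_server::WriteService` trait, and the bind address is illustrative:
use crate::stream_handler::DmlSink;
use generated_types::influxdata::iox::ingester::v1::write_service_server::WriteServiceServer;
async fn serve<T>(sink: T) -> Result<(), tonic::transport::Error>
where
    T: DmlSink + 'static,
{
    let addr = "127.0.0.1:8082".parse().expect("valid socket address");
    tonic::transport::Server::builder()
        // Wrap the handler in the tonic-generated service wrapper.
        .add_service(WriteServiceServer::new(RpcWrite::new(sink)))
        .serve(addr)
        .await
}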
#[cfg(test)]
mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use generated_types::influxdata::pbdata::v1::{
column::{SemanticType, Values},
Column, DatabaseBatch, TableBatch,
};
use crate::stream_handler::mock_sink::MockDmlSink;
use super::*;
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
const PARTITION_KEY: &str = "bananas";
macro_rules! test_rpc_write {
(
$name:ident,
request = $request:expr, // Proto WriteRequest request the server receives
sink_ret = $sink_ret:expr, // The mock return value from the DmlSink, if called
want_err = $want_err:literal, // The expectation of an error from the handler
want_calls = $($want_calls:tt)+ //
) => {
paste::paste! {
#[tokio::test]
async fn [<test_rpc_write_ $name>]() {
let mock = Arc::new(
MockDmlSink::default().with_apply_return(vec![$sink_ret]),
);
let handler = RpcWrite::new(Arc::clone(&mock));
let ret = handler
.write(Request::new($request))
.await;
assert_eq!(ret.is_err(), $want_err, "wanted handler error {} got {:?}", $want_err, ret);
assert_matches!(mock.get_calls().as_slice(), $($want_calls)+);
}
}
};
}
test_rpc_write!(
apply_ok_pause_true,
request = proto::WriteRequest {
payload: Some(DatabaseBatch {
database_id: NAMESPACE_ID.get(),
partition_key: PARTITION_KEY.to_string(),
table_batches: vec![TableBatch {
table_id: 42,
columns: vec![Column {
column_name: "time".to_string(),
semantic_type: SemanticType::Time.into(),
values: Some(Values {
i64_values: vec![4242],
f64_values: vec![],
u64_values: vec![],
string_values: vec![],
bool_values: vec![],
bytes_values: vec![],
packed_string_values: None,
interned_string_values: None,
}),
null_mask: vec![0],
}],
row_count: 1,
}],
}),
},
sink_ret = Ok(DmlApplyAction::Applied(true)),
want_err = false,
want_calls = [DmlOperation::Write(w)] => {
// Assert the various DmlWrite properties match the expected values
assert_eq!(w.namespace_id(), NAMESPACE_ID);
assert_eq!(w.table_count(), 1);
assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY));
}
);
test_rpc_write!(
apply_ok_pause_false,
request = proto::WriteRequest {
payload: Some(DatabaseBatch {
database_id: NAMESPACE_ID.get(),
partition_key: PARTITION_KEY.to_string(),
table_batches: vec![TableBatch {
table_id: 42,
columns: vec![Column {
column_name: "time".to_string(),
semantic_type: SemanticType::Time.into(),
values: Some(Values {
i64_values: vec![4242],
f64_values: vec![],
u64_values: vec![],
string_values: vec![],
bool_values: vec![],
bytes_values: vec![],
packed_string_values: None,
interned_string_values: None,
}),
null_mask: vec![0],
}],
row_count: 1,
}],
}),
},
sink_ret = Ok(DmlApplyAction::Applied(false)),
want_err = false,
want_calls = [DmlOperation::Write(w)] => {
// Assert the various DmlWrite properties match the expected values
assert_eq!(w.namespace_id(), NAMESPACE_ID);
assert_eq!(w.table_count(), 1);
assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY));
}
);
test_rpc_write!(
no_payload,
request = proto::WriteRequest { payload: None },
sink_ret = Ok(DmlApplyAction::Applied(false)),
want_err = true,
want_calls = []
);
test_rpc_write!(
no_tables,
request = proto::WriteRequest {
payload: Some(DatabaseBatch {
database_id: NAMESPACE_ID.get(),
partition_key: PARTITION_KEY.to_string(),
table_batches: vec![],
}),
},
sink_ret = Ok(DmlApplyAction::Applied(false)),
want_err = true,
want_calls = []
);
test_rpc_write!(
batch_error,
request = proto::WriteRequest {
payload: Some(DatabaseBatch {
database_id: NAMESPACE_ID.get(),
partition_key: PARTITION_KEY.to_string(),
table_batches: vec![TableBatch {
table_id: 42,
columns: vec![Column {
column_name: "time".to_string(),
semantic_type: SemanticType::Time.into(),
values: Some(Values {
i64_values: vec![4242],
f64_values: vec![],
u64_values: vec![4242], // Two types for one column
string_values: vec![],
bool_values: vec![],
bytes_values: vec![],
packed_string_values: None,
interned_string_values: None,
}),
null_mask: vec![0],
}],
row_count: 1,
}],
}),
},
sink_ret = Ok(DmlApplyAction::Applied(false)),
want_err = true,
want_calls = []
);
#[tokio::test]
#[should_panic(expected = "unreachable")]
async fn test_rpc_write_apply_skipped() {
let mock =
Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(DmlApplyAction::Skipped)]));
let handler = RpcWrite::new(Arc::clone(&mock));
let _ = handler
.write(Request::new(proto::WriteRequest {
payload: Some(DatabaseBatch {
database_id: NAMESPACE_ID.get(),
partition_key: PARTITION_KEY.to_string(),
table_batches: vec![TableBatch {
table_id: 42,
columns: vec![Column {
column_name: "time".to_string(),
semantic_type: SemanticType::Time.into(),
values: Some(Values {
i64_values: vec![4242],
f64_values: vec![],
u64_values: vec![],
string_values: vec![],
bool_values: vec![],
bytes_values: vec![],
packed_string_values: None,
interned_string_values: None,
}),
null_mask: vec![0],
}],
row_count: 1,
}],
}),
}))
.await;
}
}
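On the caller side, standard tonic codegen would also emit a `write_service_client::WriteServiceClient` (an assumption; only the server trait appears in this diff). A sketch of issuing a write with the same `WriteRequest` shape exercised by the tests above:
use generated_types::influxdata::iox::ingester::v1::{
    write_service_client::WriteServiceClient, WriteRequest,
};
async fn send_write(request: WriteRequest) -> Result<(), Box<dyn std::error::Error>> {
    // Endpoint is illustrative; `connect` comes from tonic's transport feature.
    let mut client = WriteServiceClient::connect("http://127.0.0.1:8082").await?;
    // An empty `WriteResponse` comes back on success.
    client.write(request).await?;
    Ok(())
}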

ingester/src/server/grpc/write_info.rs

@@ -0,0 +1,57 @@
use std::sync::Arc;
use generated_types::influxdata::iox::ingester::v1::{
self as proto, write_info_service_server::WriteInfoService,
};
use observability_deps::tracing::*;
use tonic::{Request, Response};
use write_summary::WriteSummary;
use crate::handler::IngestHandler;
/// Implementation of write info
pub(super) struct WriteInfoServiceImpl {
handler: Arc<dyn IngestHandler + Send + Sync + 'static>,
}
impl WriteInfoServiceImpl {
pub fn new(handler: Arc<dyn IngestHandler + Send + Sync + 'static>) -> Self {
Self { handler }
}
}
#[tonic::async_trait]
impl WriteInfoService for WriteInfoServiceImpl {
async fn get_write_info(
&self,
request: Request<proto::GetWriteInfoRequest>,
) -> Result<Response<proto::GetWriteInfoResponse>, tonic::Status> {
let proto::GetWriteInfoRequest { write_token } = request.into_inner();
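// Decode the opaque write token back into a WriteSummary, then ask the
// handler for buffering progress on each shard the write touched.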
let write_summary =
WriteSummary::try_from_token(&write_token).map_err(tonic::Status::invalid_argument)?;
let progresses = self.handler.progresses(write_summary.shard_indexes()).await;
let shard_infos = progresses
.into_iter()
.map(|(shard_index, progress)| {
let status = write_summary
.write_status(shard_index, &progress)
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
let shard_index = shard_index.get();
let status = proto::ShardStatus::from(status);
debug!(shard_index, ?status, "write info status",);
Ok(proto::ShardInfo {
shard_index,
status: status.into(),
})
})
.collect::<Result<Vec<_>, tonic::Status>>()?;
Ok(tonic::Response::new(proto::GetWriteInfoResponse {
shard_infos,
}))
}
}