feat: add schema+batch serde for ingester->querier V2 (#8498)

* feat: `PartitionIdentifier` serde

* fix: typo

* refactor: use `Bytes` for V2 protocols

* feat: add schema+batch serde for i->q V2

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-08-22 15:30:47 +02:00 committed by GitHub
parent 53a2ece7b3
commit 50ab9afd5e
6 changed files with 530 additions and 9 deletions

Cargo.lock (generated)

@@ -2713,10 +2713,13 @@ dependencies = [
name = "ingester_query_grpc"
version = "0.1.0"
dependencies = [
"arrow",
"base64 0.21.2",
"bytes",
"data_types",
"datafusion",
"datafusion-proto",
"flatbuffers",
"pbjson",
"pbjson-build",
"predicate",

ingester_query_grpc/Cargo.toml

@@ -6,10 +6,13 @@ edition.workspace = true
license.workspace = true
[dependencies] # In alphabetical order
arrow = { workspace = true, features = ["prettyprint", "dyn_cmp_dict"] }
bytes = "1.4"
base64 = "0.21"
data_types = { path = "../data_types" }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
flatbuffers = "23.5.26"
pbjson = "0.5"
predicate = { path = "../predicate" }
prost = "0.11"
@@ -23,4 +26,3 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
tonic-build = { workspace = true }
prost-build = "0.11"
pbjson-build = "0.5"

ingester_query_grpc/build.rs

@@ -41,7 +41,8 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
.extern_path(".google.protobuf", "::pbjson_types")
.btree_map([
".influxdata.iox.ingester.v1.IngesterQueryResponseMetadata.unpersisted_partitions",
]);
])
.bytes([".influxdata.iox.ingester.v2"]);
let descriptor_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
tonic_build::configure()
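The `.bytes([".influxdata.iox.ingester.v2"])` call above makes prost-build emit `bytes::Bytes` rather than `Vec<u8>` for every `bytes` field in that package, which is what the `Bytes`-based V2 protocol refactor relies on. A rough sketch of the assumed generated shape (prost attributes and derives omitted):

pub struct EncodedData {
    /// Flatbuffers-encoded IPC header describing the Arrow payload.
    pub ipc_message: bytes::Bytes,
    /// The Arrow buffers themselves.
    pub arrow_data: bytes::Bytes,
}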

ingester_query_grpc protos: influxdata.iox.ingester.v2

@@ -36,6 +36,24 @@ message Filters {
repeated bytes exprs = 1;
}
// Arrow encoded data.
message EncodedData {
// Data that describes the arrow payload.
bytes ipc_message = 1;
// The actual arrow payload itself.
bytes arrow_data = 2;
}
// An encoded Arrow RecordBatch w/o schema information.
message RecordBatch {
// Dictionary data.
repeated EncodedData dictionaries = 1;
// Record batch itself.
EncodedData batch = 2;
}
message QueryRequest {
// Namespace to search
int64 namespace_id = 1;
@@ -50,7 +68,7 @@ message QueryRequest {
Filters filters = 4;
}
message IngesterQueryReponseMetadata {
message IngesterQueryResponseMetadata {
message Partition {
// Partition ID.
PartitionIdentifier id = 1;
@@ -81,7 +99,7 @@ message IngesterQueryReponseMetadata {
repeated Partition partitions = 4;
}
message IngesterQueryReponsePayload {
message IngesterQueryResponsePayload {
// Partition ID.
PartitionIdentifier partition_id = 1;
@@ -94,16 +112,16 @@ message IngesterQueryReponsePayload {
repeated int64 projection = 2;
// Serialized RecordBatch (w/o schema)
bytes record_batch = 3;
RecordBatch record_batch = 3;
}
message QueryResponse {
oneof msg {
// Metadata, this is ALWAYS the first message (even when there are no further messages) and MUST NOT be repeated.
IngesterQueryReponseMetadata metadata = 1;
IngesterQueryResponseMetadata metadata = 1;
// Payload, following the first message.
IngesterQueryReponsePayload payload = 2;
IngesterQueryResponsePayload payload = 2;
}
}
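A query response is thus a stream of `QueryResponse` messages: exactly one metadata message first, followed by zero or more payloads. A minimal consumer-side sketch, assuming the usual prost-generated layout (`query_response::Msg`) and eliding error handling:

use ingester_query_grpc::influxdata::iox::ingester::v2 as proto2;

fn handle(msg: proto2::query_response::Msg) {
    match msg {
        // Always the first message: lists the partitions the payloads refer to.
        proto2::query_response::Msg::Metadata(md) => {
            println!("response covers {} partitions", md.partitions.len());
        }
        // Subsequent messages: one encoded record batch for one partition.
        proto2::query_response::Msg::Payload(p) => {
            // `p.record_batch` is decoded by the new `arrow_serde` module below.
            let _ = (p.partition_id, p.projection, p.record_batch);
        }
    }
}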

ingester_query_grpc/src/arrow_serde.rs (new file)

@@ -0,0 +1,450 @@
//! (De-)Serialization of Apache Arrow [`Schema`] and [`RecordBatch`] data.
//!
//! **⚠️ These routines are IOx-specific and MUST NOT be used as a public interface!**
//!
//! Specifically this is a custom protocol, similar to but not derived from Arrow Flight.
//! See <https://github.com/influxdata/influxdb_iox/issues/8169>.
use std::{collections::HashMap, sync::Arc};
use arrow::{
buffer::Buffer,
datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
error::ArrowError,
ipc::{
convert::fb_to_schema,
reader::{read_dictionary, read_record_batch},
root_as_message,
writer::{DictionaryTracker, EncodedData, IpcDataGenerator, IpcWriteOptions},
},
record_batch::{RecordBatch, RecordBatchOptions},
};
use bytes::Bytes;
use flatbuffers::InvalidFlatbuffer;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use crate::influxdata::iox::ingester::v2 as proto2;
/// Serialize [`Schema`] to [`Bytes`].
pub fn schema_to_bytes(schema: &Schema) -> Bytes {
let EncodedData {
ipc_message,
arrow_data,
} = IpcDataGenerator::default().schema_to_bytes(schema, &write_options());
assert!(
arrow_data.is_empty(),
"arrow_data should always be empty for schema messages"
);
ipc_message.into()
}
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum BytesToSchemaError {
#[snafu(display("Unable to get root as message: {source}"))]
RootAsMessage { source: InvalidFlatbuffer },
#[snafu(display("Unable to read IPC message as schema, is: {variant}"))]
WrongMessageType { variant: &'static str },
}
/// Read [`Schema`] from bytes.
pub fn bytes_to_schema(data: &[u8]) -> Result<Schema, BytesToSchemaError> {
let message = root_as_message(data).context(bytes_to_schema_error::RootAsMessageSnafu)?;
let ipc_schema =
message
.header_as_schema()
.context(bytes_to_schema_error::WrongMessageTypeSnafu {
variant: message.header_type().variant_name().unwrap_or("<UNKNOWN>"),
})?;
let schema = fb_to_schema(ipc_schema);
Ok(schema)
}
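// A minimal usage sketch of the pair above, assuming the serialized schema is
// shipped as-is (e.g. inside the response metadata) and reconstructed by the
// receiver before any batches are decoded:
#[allow(dead_code)]
fn schema_roundtrip_sketch(schema: &Schema) -> Result<Schema, BytesToSchemaError> {
    let bytes = schema_to_bytes(schema);
    bytes_to_schema(&bytes)
}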
/// Encoder to read/write Arrow [`RecordBatch`]es from/to [`proto2::RecordBatch`].
#[derive(Debug)]
pub struct BatchEncoder {
/// The original batch schema.
batch_schema: SchemaRef,
/// Schema with unique dictionary IDs.
dict_schema: SchemaRef,
}
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum ProjectError {
#[snafu(display("Cannot project: {source}"))]
CannotProject { source: ArrowError },
}
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum WriteError {
#[snafu(display("Invalid batch schema\n\nActual:\n{actual}\n\nExpected:\n{expected}"))]
InvalidSchema {
actual: SchemaRef,
expected: SchemaRef,
},
}
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum ReadError {
#[snafu(display("Unable to get root as dictionary message #{idx} (0-based): {source}"))]
DictionaryRootAsMessage {
source: InvalidFlatbuffer,
idx: usize,
},
#[snafu(display("Unable to read IPC message #{idx} (0-based) as dictionary, is: {variant}"))]
DictionaryWrongMessageType { variant: &'static str, idx: usize },
#[snafu(display("Cannot read dictionary: {source}"))]
ReadDictionary { source: ArrowError },
#[snafu(display("Record batch is required but missing"))]
RecordBatchRequired,
#[snafu(display("Unable to get root as record batch message: {source}"))]
RecordBatchRootAsMessage { source: InvalidFlatbuffer },
#[snafu(display("Unable to read IPC message as record batch, is: {variant}"))]
RecordBatchWrongMessageType { variant: &'static str },
#[snafu(display("Cannot read record batch: {source}"))]
ReadRecordBatch { source: ArrowError },
}
impl BatchEncoder {
/// Create new encoder.
///
/// For schemas that contain dictionaries, this involves copying data and may be rather costly. If you can, try to
/// only do this once and use [`project`](Self::project) to select the right columns for the appropriate batch.
pub fn new(batch_schema: SchemaRef) -> Self {
let mut dict_id_counter = 0;
let dict_schema = Arc::new(Schema::new_with_metadata(
batch_schema
.fields()
.iter()
.map(|f| assign_dict_ids(f, &mut dict_id_counter))
.collect::<Vec<_>>(),
batch_schema.metadata().clone(),
));
Self {
batch_schema,
dict_schema,
}
}
/// Project schema stored within this encoder.
pub fn project(&self, indices: &[usize]) -> Result<Self, ProjectError> {
Ok(Self {
batch_schema: Arc::new(
self.batch_schema
.project(indices)
.context(project_error::CannotProjectSnafu)?,
),
dict_schema: Arc::new(
self.dict_schema
.project(indices)
.context(project_error::CannotProjectSnafu)?,
),
})
}
/// Serialize batch.
pub fn write(&self, batch: &RecordBatch) -> Result<proto2::RecordBatch, WriteError> {
ensure!(
batch.schema() == self.batch_schema,
write_error::InvalidSchemaSnafu {
actual: batch.schema(),
expected: Arc::clone(&self.batch_schema),
}
);
let batch = reassign_schema(batch, Arc::clone(&self.dict_schema));
let mut dictionary_tracker = DictionaryTracker::new(true);
let (dictionaries, batch) = IpcDataGenerator::default()
.encoded_batch(&batch, &mut dictionary_tracker, &write_options())
.expect("serialization w/o compression should NEVER fail");
Ok(proto2::RecordBatch {
dictionaries: dictionaries.into_iter().map(|enc| enc.into()).collect(),
batch: Some(batch.into()),
})
}
/// Deserialize batch.
pub fn read(&self, batch: proto2::RecordBatch) -> Result<RecordBatch, ReadError> {
let proto2::RecordBatch {
dictionaries,
batch,
} = batch;
let mut dictionaries_by_field = HashMap::with_capacity(dictionaries.len());
for (idx, enc) in dictionaries.into_iter().enumerate() {
let proto2::EncodedData {
ipc_message,
arrow_data,
} = enc;
let message = root_as_message(&ipc_message)
.context(read_error::DictionaryRootAsMessageSnafu { idx })?;
let dictionary_batch = message.header_as_dictionary_batch().context(
read_error::DictionaryWrongMessageTypeSnafu {
variant: message.header_type().variant_name().unwrap_or("<UNKNOWN>"),
idx,
},
)?;
read_dictionary(
&Buffer::from_vec(arrow_data.to_vec()),
dictionary_batch,
&self.dict_schema,
&mut dictionaries_by_field,
&message.version(),
)
.context(read_error::ReadDictionarySnafu)?;
}
let proto2::EncodedData {
ipc_message,
arrow_data,
} = batch.context(read_error::RecordBatchRequiredSnafu)?;
let message =
root_as_message(&ipc_message).context(read_error::RecordBatchRootAsMessageSnafu)?;
let record_batch = message.header_as_record_batch().context(
read_error::RecordBatchWrongMessageTypeSnafu {
variant: message.header_type().variant_name().unwrap_or("<UNKNOWN>"),
},
)?;
let batch = read_record_batch(
&Buffer::from_vec(arrow_data.to_vec()),
record_batch,
Arc::clone(&self.dict_schema),
&dictionaries_by_field,
None,
&message.version(),
)
.context(read_error::ReadRecordBatchSnafu)?;
Ok(reassign_schema(&batch, Arc::clone(&self.batch_schema)))
}
}
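// End-to-end sketch of the encoder, assuming both sides already agreed on the
// full `schema` (e.g. via `schema_to_bytes`) and that `projection` selects
// exactly the columns present in `batch`:
#[allow(dead_code)]
fn batch_roundtrip_sketch(
    schema: SchemaRef,
    projection: &[usize],
    batch: &RecordBatch,
) -> Result<RecordBatch, ReadError> {
    // Build the (potentially costly) encoder once per schema, then narrow it
    // to the columns of this particular batch.
    let encoder = BatchEncoder::new(schema)
        .project(projection)
        .expect("projection indices are valid");
    // Wire representation: dictionary messages first, then the batch itself.
    let wire: proto2::RecordBatch = encoder.write(batch).expect("batch matches schema");
    // The receiver restores the original schema (and dictionary IDs).
    encoder.read(wire)
}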
/// Recursively assign unique dictionary IDs.
fn assign_dict_ids(field: &FieldRef, counter: &mut i64) -> FieldRef {
match field.data_type() {
DataType::Dictionary(_, _) => {
let dict_id = *counter;
*counter += 1;
Arc::new(
Field::new_dict(
field.name(),
field.data_type().clone(),
field.is_nullable(),
dict_id,
field.dict_is_ordered().expect("is dict type"),
)
.with_metadata(field.metadata().clone()),
)
}
DataType::Struct(fields) => {
let data_type =
DataType::Struct(fields.iter().map(|f| assign_dict_ids(f, counter)).collect());
Arc::new(field.as_ref().clone().with_data_type(data_type))
}
DataType::Union(fields, mode) => {
let data_type = DataType::Union(
fields
.iter()
.map(|(id, f)| (id, assign_dict_ids(f, counter)))
.collect(),
*mode,
);
Arc::new(field.as_ref().clone().with_data_type(data_type))
}
DataType::List(field) => {
let data_type = DataType::List(assign_dict_ids(field, counter));
Arc::new(field.as_ref().clone().with_data_type(data_type))
}
DataType::LargeList(field) => {
let data_type = DataType::LargeList(assign_dict_ids(field, counter));
Arc::new(field.as_ref().clone().with_data_type(data_type))
}
DataType::FixedSizeList(field, s) => {
let data_type = DataType::FixedSizeList(assign_dict_ids(field, counter), *s);
Arc::new(field.as_ref().clone().with_data_type(data_type))
}
DataType::Map(field, sorted) => {
let data_type = DataType::Map(assign_dict_ids(field, counter), *sorted);
Arc::new(field.as_ref().clone().with_data_type(data_type))
}
_ => Arc::clone(field),
}
}
/// Re-assign schema to given batch.
///
/// This is required to overwrite dictionary IDs.
fn reassign_schema(batch: &RecordBatch, schema: SchemaRef) -> RecordBatch {
RecordBatch::try_new_with_options(
schema,
batch.columns().to_vec(),
&RecordBatchOptions::default().with_row_count(Some(batch.num_rows())),
)
.expect("re-assigning schema should always work")
}
impl From<EncodedData> for proto2::EncodedData {
fn from(enc: EncodedData) -> Self {
let EncodedData {
ipc_message,
arrow_data,
} = enc;
Self {
ipc_message: ipc_message.into(),
arrow_data: arrow_data.into(),
}
}
}
/// Write options that are used for all relevant methods in this module.
fn write_options() -> IpcWriteOptions {
IpcWriteOptions::default()
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
use arrow::{
array::{ArrayRef, Int64Array, StringDictionaryBuilder},
datatypes::Int32Type,
};
use datafusion::{
arrow::datatypes::{DataType, Field},
common::assert_contains,
};
use super::*;
#[test]
fn test_schema_roundtrip() {
let schema = schema();
let bytes = schema_to_bytes(&schema);
// ensure that the deserialization is NOT sensitive to alignment
const MAX_OFFSET: usize = 8;
let mut buffer = Vec::with_capacity(bytes.len() + MAX_OFFSET);
for offset in 0..MAX_OFFSET {
buffer.clear();
buffer.resize(offset, 0u8);
buffer.extend_from_slice(&bytes);
let schema2 = bytes_to_schema(&buffer[offset..]).unwrap();
assert_eq!(schema, schema2);
}
}
#[test]
fn test_record_batch_roundtrip() {
let schema = Arc::new(schema());
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![int64array(), dictarray1(), dictarray2(), dictarray1()],
)
.unwrap();
let encoder = BatchEncoder::new(schema);
let encoded = encoder.write(&batch).unwrap();
// check that we actually use dictionaries and don't hydrate them
assert_eq!(encoded.dictionaries.len(), 3);
let batch2 = encoder.read(encoded).unwrap();
assert_eq!(batch, batch2);
}
#[test]
fn test_write_checks_schema() {
let schema = Arc::new(schema());
let batch =
RecordBatch::try_new(Arc::new(schema.project(&[0]).unwrap()), vec![int64array()])
.unwrap();
let encoder = BatchEncoder::new(schema);
let err = encoder.write(&batch).unwrap_err();
assert_contains!(err.to_string(), "Invalid batch schema");
}
#[test]
fn test_project() {
let schema = Arc::new(schema());
let batch = RecordBatch::try_new(
Arc::new(schema.project(&[0, 3, 2]).unwrap()),
vec![int64array(), dictarray1(), dictarray1()],
)
.unwrap();
let encoder = BatchEncoder::new(schema).project(&[0, 3, 2]).unwrap();
let encoded = encoder.write(&batch).unwrap();
// check that we actually use dictionaries and don't hydrate them
assert_eq!(encoded.dictionaries.len(), 2);
let batch2 = encoder.read(encoded).unwrap();
assert_eq!(batch, batch2);
}
fn schema() -> Schema {
Schema::new_with_metadata(
vec![
Field::new("f1", DataType::Int64, true)
.with_metadata(HashMap::from([("k".to_owned(), "v".to_owned())])),
Field::new(
"f2",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
Field::new(
"f3",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
Field::new(
"f4",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
],
HashMap::from([("foo".to_owned(), "bar".to_owned())]),
)
}
fn int64array() -> ArrayRef {
Arc::new(Int64Array::from(vec![None, Some(1i64), Some(2i64)]))
}
fn dictarray1() -> ArrayRef {
let mut builder = StringDictionaryBuilder::<Int32Type>::new();
builder.append("foo").unwrap();
builder.append("foo").unwrap();
builder.append("bar").unwrap();
Arc::new(builder.finish())
}
fn dictarray2() -> ArrayRef {
let mut builder = StringDictionaryBuilder::<Int32Type>::new();
builder.append("fo").unwrap();
builder.append("fo").unwrap();
builder.append("ba").unwrap();
Arc::new(builder.finish())
}
}

ingester_query_grpc/src/lib.rs

@@ -21,7 +21,9 @@ use workspace_hack as _;
use crate::influxdata::iox::ingester::v1 as proto;
use crate::influxdata::iox::ingester::v2 as proto2;
use base64::{prelude::BASE64_STANDARD, Engine};
use data_types::{NamespaceId, TableId, TimestampRange};
use data_types::{
NamespaceId, PartitionHashId, PartitionId, TableId, TimestampRange, TransitionPartitionId,
};
use datafusion::{common::DataFusionError, prelude::Expr};
use datafusion_proto::bytes::Serializeable;
use predicate::{Predicate, ValueExpr};
@@ -57,6 +59,8 @@ pub mod influxdata {
}
}
pub mod arrow_serde;
/// Error returned if a request field has an invalid value. Includes
/// machinery to add parent field names for context -- thus it will
/// report `rules.write_timeout` rather than simply `write_timeout`.
@@ -373,7 +377,6 @@ impl TryFrom<Vec<Expr>> for proto2::Filters {
.iter()
.map(|expr| {
expr.to_bytes()
.map(|bytes| bytes.to_vec())
.map_err(|e| expr_to_bytes_violation("exprs", e))
})
.collect::<Result<Vec<_>, _>>()?;
@@ -449,6 +452,50 @@ pub fn decode_proto2_filters_from_base64(
proto2::Filters::decode(predicate_binary.as_slice()).context(ProtobufDecodeSnafu)
}
impl TryFrom<proto2::PartitionIdentifier> for TransitionPartitionId {
type Error = FieldViolation;
fn try_from(value: proto2::PartitionIdentifier) -> Result<Self, Self::Error> {
let proto2::PartitionIdentifier {
partition_identifier,
} = value;
let id =
partition_identifier.ok_or_else(|| FieldViolation::required("partition_identifier"))?;
let id = match id {
proto2::partition_identifier::PartitionIdentifier::CatalogId(id) => {
Self::Deprecated(PartitionId::new(id))
}
proto2::partition_identifier::PartitionIdentifier::HashId(id) => {
Self::Deterministic(PartitionHashId::try_from(id.as_ref()).map_err(|e| {
FieldViolation {
field: "partition_identifier".to_owned(),
description: e.to_string(),
}
})?)
}
};
Ok(id)
}
}
impl From<TransitionPartitionId> for proto2::PartitionIdentifier {
fn from(id: TransitionPartitionId) -> Self {
let id = match id {
TransitionPartitionId::Deprecated(id) => {
proto2::partition_identifier::PartitionIdentifier::CatalogId(id.get())
}
TransitionPartitionId::Deterministic(id) => {
proto2::partition_identifier::PartitionIdentifier::HashId(
id.as_bytes().to_vec().into(),
)
}
};
Self {
partition_identifier: Some(id),
}
}
}
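// Roundtrip sketch for the two conversions above, using a catalog-addressed
// (deprecated-style) partition ID:
#[allow(dead_code)]
fn partition_id_roundtrip_sketch() {
    let wire: proto2::PartitionIdentifier =
        TransitionPartitionId::Deprecated(PartitionId::new(42)).into();
    let back = TransitionPartitionId::try_from(wire).expect("valid identifier");
    assert!(matches!(back, TransitionPartitionId::Deprecated(p) if p.get() == 42));
}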
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;