feat: gRPC interface for ingester->querier v2 (#8443)

* feat: gRPC interface for ingester->querier v2

Note that the actual "to/from bytes" conversion for the schema and
batches will be implemented in #8347.

Closes #8346.

* fix: typo

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

---------

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-08-11 11:52:05 +02:00 committed by GitHub
parent bb1a698cdf
commit b11f999806
5 changed files with 289 additions and 2 deletions

Cargo.lock (generated)

@@ -2725,6 +2725,7 @@ dependencies = [
 "query_functions",
 "serde",
 "snafu",
 "tonic",
 "tonic-build",
 "workspace-hack",
]

Cargo.toml

@@ -16,6 +16,7 @@ prost = "0.11"
query_functions = { path = "../query_functions" }
serde = { version = "1.0", features = ["derive"] }
snafu = "0.7"
tonic = { workspace = true }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

[build-dependencies] # In alphabetical order

build.rs

@@ -20,9 +20,13 @@ fn main() -> Result<()> {
 ///
 /// - `influxdata.iox.ingester.v1.rs`
 fn generate_grpc_types(root: &Path) -> Result<()> {
-    let ingester_path = root.join("influxdata/iox/ingester/v1");
+    let ingester_path_v1 = root.join("influxdata/iox/ingester/v1");
+    let ingester_path_v2 = root.join("influxdata/iox/ingester/v2");
-    let proto_files = vec![ingester_path.join("query.proto")];
+    let proto_files = vec![
+        ingester_path_v1.join("query.proto"),
+        ingester_path_v2.join("query.proto"),
+    ];
     // Tell cargo to recompile if any of these proto files are changed
     for proto_file in &proto_files {
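
Not part of the diff: for orientation, a sketch of the kind of tonic-build call a build script like this typically feeds `proto_files` into. The exact configuration in this crate (e.g. the serde/pbjson generation implied by the `.serde.rs` includes further below) is not shown in this excerpt, so the names here are illustrative:

use std::path::{Path, PathBuf};

// Illustrative only: compile both query.proto files into OUT_DIR, where
// the library later pulls the generated `influxdata.iox.ingester.v{1,2}.rs`
// modules in via `include!`.
fn compile_protos(proto_files: &[PathBuf], root: &Path) -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_client(true)
        .build_server(true)
        .compile(proto_files, &[root.to_path_buf()])?;
    Ok(())
}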

influxdata/iox/ingester/v2/query.proto

@@ -0,0 +1,113 @@
syntax = "proto3";

package influxdata.iox.ingester.v2;
option go_package = "github.com/influxdata/iox/ingester/v2";

message PartitionIdentifier {
  // Either the catalog-assigned partition ID or the deterministic identifier
  // created from the table ID and partition key.
  //
  // For "old-style" partitions that were created before the switch to
  // deterministic partition IDs, a `catalog_id` is returned, and this is used
  // to address the partition by row ID.
  //
  // For "new-style" partitions, a deterministic hash-based ID is used to
  // address a partition.
  //
  // Invariant: a partition is EITHER an "old-style", row addressed partition,
  // OR a "new-style" hash ID addressed partition for the lifetime of the
  // partition.
  //
  // See <https://github.com/influxdata/idpe/issues/17476>.
  oneof partition_identifier {
    // An "old-style" partition addressed by catalog row ID.
    int64 catalog_id = 7;

    // A "new-style" partition addressed by a deterministic hash ID.
    bytes hash_id = 11;
  }
}
message Filters {
  // Optional arbitrary predicates, represented as a list of DataFusion
  // expressions applied as a logical conjunction (i.e. they are ANDed
  // together). Only rows that evaluate to TRUE for all of these expressions
  // should be returned. Other rows are excluded from the results.
  //
  // Encoded using DataFusion's Expr serialization code.
  repeated bytes exprs = 1;
}
message QueryRequest {
  // Namespace to search.
  int64 namespace_id = 1;

  // Table that should be queried.
  int64 table_id = 2;

  // Columns the query service is interested in.
  repeated string columns = 3;

  // Predicate for filtering.
  Filters filters = 4;
}
message IngesterQueryResponseMetadata {
  message Partition {
    // Partition ID.
    PartitionIdentifier id = 1;

    // Minimum timestamp.
    int64 t_min = 2;

    // Maximum timestamp (inclusive).
    int64 t_max = 3;

    // Projection of the partition.
    //
    // The projection is represented as a SORTED set of 0-based column indices
    // that point into the table schema transmitted in this metadata message.
    // It MUST NOT contain any duplicates.
    repeated int64 projection = 4;
  }

  // Ingester UUID.
  string ingester_uuid = 1;

  // Number of persisted parquet files for this ingester.
  int64 persist_counter = 2;

  // Serialized table schema.
  bytes table_schema = 3;

  // Ingester partitions.
  repeated Partition partitions = 4;
}
message IngesterQueryResponsePayload {
  // Partition ID.
  PartitionIdentifier partition_id = 1;

  // Projection of the record batch.
  //
  // The projection is represented as a SORTED set of 0-based column indices
  // that point into the schema transmitted in the metadata message. It MUST
  // NOT contain any duplicates.
  //
  // This MUST be a subset of the partition projection transmitted in the
  // metadata message.
  repeated int64 projection = 2;

  // Serialized RecordBatch (without schema).
  bytes record_batch = 3;
}
message QueryResponse {
  oneof msg {
    // Metadata. This is ALWAYS the first message (even when there are no
    // further messages) and MUST NOT be repeated.
    IngesterQueryResponseMetadata metadata = 1;

    // Payload, following the first message.
    IngesterQueryResponsePayload payload = 2;
  }
}

service IngesterQueryService {
  // Query ingester for unpersisted data.
  rpc Query (QueryRequest) returns (stream QueryResponse);
}
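
Not part of the diff: a minimal consumer-side sketch of the stream contract above, assuming the usual tonic/prost codegen for this package (the module names `ingester_query_service_client`, `query_response`, and `partition_identifier` follow prost conventions; the error handling is illustrative only):

use crate::influxdata::iox::ingester::v2::{
    ingester_query_service_client::IngesterQueryServiceClient,
    partition_identifier::PartitionIdentifier as PartitionId,
    query_response::Msg, QueryRequest,
};

async fn consume_query_stream(
    channel: tonic::transport::Channel,
    request: QueryRequest,
) -> Result<(), tonic::Status> {
    let mut client = IngesterQueryServiceClient::new(channel);
    let mut stream = client.query(request).await?.into_inner();

    // Invariant: the FIRST message is always the metadata frame.
    let _metadata = match stream.message().await?.and_then(|resp| resp.msg) {
        Some(Msg::Metadata(md)) => md,
        _ => return Err(tonic::Status::internal("stream did not start with metadata")),
    };

    // Every subsequent message is a payload frame addressed by partition.
    while let Some(resp) = stream.message().await? {
        if let Some(Msg::Payload(payload)) = resp.msg {
            match payload.partition_id.and_then(|p| p.partition_identifier) {
                Some(PartitionId::CatalogId(_row_id)) => { /* "old-style" partition */ }
                Some(PartitionId::HashId(_hash)) => { /* "new-style" partition */ }
                None => { /* invalid: an identifier is required */ }
            }
            // `payload.record_batch` stays opaque here; the actual schema and
            // batch (de)serialization is implemented in #8347.
        }
    }
    Ok(())
}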

src/lib.rs

@@ -19,6 +19,7 @@
use workspace_hack as _;

use crate::influxdata::iox::ingester::v1 as proto;
use crate::influxdata::iox::ingester::v2 as proto2;
use base64::{prelude::BASE64_STANDARD, Engine};
use data_types::{NamespaceId, TableId, TimestampRange};
use datafusion::{common::DataFusionError, prelude::Expr};
@@ -41,6 +42,17 @@ pub mod influxdata {
                    "/influxdata.iox.ingester.v1.serde.rs"
                ));
            }

            pub mod v2 {
                // generated code violates a few lints, so opt-out of them
                #![allow(clippy::future_not_send)]

                include!(concat!(env!("OUT_DIR"), "/influxdata.iox.ingester.v2.rs"));
                include!(concat!(
                    env!("OUT_DIR"),
                    "/influxdata.iox.ingester.v2.serde.rs"
                ));
            }
        }
    }
}
@@ -175,6 +187,81 @@ impl TryFrom<IngesterQueryRequest> for proto::IngesterQueryRequest {
    }
}

/// Request from the querier service to the ingester service
#[derive(Debug, PartialEq, Clone)]
pub struct IngesterQueryRequest2 {
    /// Namespace to search
    pub namespace_id: NamespaceId,
    /// Table to search
    pub table_id: TableId,
    /// Columns the query service is interested in
    pub columns: Vec<String>,
    /// Predicate for filtering
    pub filters: Vec<Expr>,
}

impl IngesterQueryRequest2 {
    /// Make a request to return data for a specified table
    pub fn new(
        namespace_id: NamespaceId,
        table_id: TableId,
        columns: Vec<String>,
        filters: Vec<Expr>,
    ) -> Self {
        Self {
            namespace_id,
            table_id,
            columns,
            filters,
        }
    }
}

impl TryFrom<proto2::QueryRequest> for IngesterQueryRequest2 {
    type Error = FieldViolation;

    fn try_from(proto: proto2::QueryRequest) -> Result<Self, Self::Error> {
        let proto2::QueryRequest {
            namespace_id,
            table_id,
            columns,
            filters,
        } = proto;

        let namespace_id = NamespaceId::new(namespace_id);
        let table_id = TableId::new(table_id);
        let filters = filters
            .map(TryInto::try_into)
            .transpose()?
            .unwrap_or_default();

        Ok(Self::new(namespace_id, table_id, columns, filters))
    }
}

impl TryFrom<IngesterQueryRequest2> for proto2::QueryRequest {
    type Error = FieldViolation;

    fn try_from(query: IngesterQueryRequest2) -> Result<Self, Self::Error> {
        let IngesterQueryRequest2 {
            namespace_id,
            table_id,
            columns,
            filters,
        } = query;

        Ok(Self {
            namespace_id: namespace_id.get(),
            table_id: table_id.get(),
            columns,
            filters: Some(filters.try_into()?),
        })
    }
}
impl TryFrom<Predicate> for proto::Predicate {
    type Error = FieldViolation;

@@ -278,6 +365,41 @@ impl TryFrom<ValueExpr> for proto::ValueExpr {
    }
}
impl TryFrom<Vec<Expr>> for proto2::Filters {
    type Error = FieldViolation;

    fn try_from(filters: Vec<Expr>) -> Result<Self, Self::Error> {
        let exprs = filters
            .iter()
            .map(|expr| {
                expr.to_bytes()
                    .map(|bytes| bytes.to_vec())
                    .map_err(|e| expr_to_bytes_violation("exprs", e))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(Self { exprs })
    }
}

impl TryFrom<proto2::Filters> for Vec<Expr> {
    type Error = FieldViolation;

    fn try_from(proto: proto2::Filters) -> Result<Self, Self::Error> {
        let proto2::Filters { exprs } = proto;

        let exprs = exprs
            .into_iter()
            .map(|bytes| {
                Expr::from_bytes_with_registry(&bytes, query_functions::registry())
                    .map_err(|e| expr_from_bytes_violation("exprs", e))
            })
            .collect::<Result<Self, _>>()?;

        Ok(exprs)
    }
}
#[derive(Debug, Snafu)]
pub enum EncodeProtoPredicateFromBase64Error {
    #[snafu(display("Cannot encode protobuf: {source}"))]
@@ -293,6 +415,15 @@ pub fn encode_proto_predicate_as_base64(
    Ok(BASE64_STANDARD.encode(&buf))
}
/// Encodes [`proto2::Filters`] as base64.
pub fn encode_proto2_filters_as_base64(
    filters: &proto2::Filters,
) -> Result<String, EncodeProtoPredicateFromBase64Error> {
    let mut buf = vec![];
    filters.encode(&mut buf).context(ProtobufEncodeSnafu)?;
    Ok(BASE64_STANDARD.encode(&buf))
}
#[derive(Debug, Snafu)]
pub enum DecodeProtoPredicateFromBase64Error {
    #[snafu(display("Cannot decode base64: {source}"))]
@@ -310,6 +441,14 @@ pub fn decode_proto_predicate_from_base64(
    proto::Predicate::decode(predicate_binary.as_slice()).context(ProtobufDecodeSnafu)
}
/// Decodes [`proto2::Filters`] from a base64 string.
pub fn decode_proto2_filters_from_base64(
    s: &str,
) -> Result<proto2::Filters, DecodeProtoPredicateFromBase64Error> {
    let predicate_binary = BASE64_STANDARD.decode(s).context(Base64DecodeSnafu)?;
    proto2::Filters::decode(predicate_binary.as_slice()).context(ProtobufDecodeSnafu)
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
@@ -338,6 +477,22 @@ mod tests {
        assert_eq!(rust_query, rust_query_converted);
    }
    #[test]
    fn query2_round_trip() {
        let rust_query = IngesterQueryRequest2::new(
            NamespaceId::new(42),
            TableId::new(1337),
            vec!["usage".into(), "time".into()],
            vec![col("foo").eq(lit(1i64))],
        );

        let proto_query: proto2::QueryRequest = rust_query.clone().try_into().unwrap();
        let rust_query_converted: IngesterQueryRequest2 = proto_query.try_into().unwrap();

        assert_eq!(rust_query, rust_query_converted);
    }
    #[test]
    fn predicate_proto_base64_roundtrip() {
        let predicate = Predicate {
@@ -351,4 +506,17 @@ mod tests {
        let predicate2 = decode_proto_predicate_from_base64(&base64).unwrap();
        assert_eq!(predicate, predicate2);
    }
    #[test]
    fn filters_proto2_base64_roundtrip() {
        let filters = vec![col("col").eq(lit(1i64))];
        let filters_1: proto2::Filters = filters.try_into().unwrap();

        let base64_1 = encode_proto2_filters_as_base64(&filters_1).unwrap();
        let filters_2 = decode_proto2_filters_from_base64(&base64_1).unwrap();
        let base64_2 = encode_proto2_filters_as_base64(&filters_2).unwrap();

        assert_eq!(filters_1, filters_2);
        assert_eq!(base64_1, base64_2);
    }
}
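
Not part of the diff: a sketch of the intended querier-side call path through the new types. The `build_query_request` helper and the base64 debug logging are illustrative assumptions, not APIs introduced by this change:

use data_types::{NamespaceId, TableId};
use datafusion::prelude::{col, lit};

fn build_query_request() -> Result<proto2::QueryRequest, FieldViolation> {
    // The filter expressions are ANDed together on the ingester side.
    let request = IngesterQueryRequest2::new(
        NamespaceId::new(42),
        TableId::new(1337),
        vec!["usage".into(), "time".into()],
        vec![col("usage").gt(lit(100i64))],
    );

    // The TryFrom impl serializes each Expr with DataFusion's byte codec;
    // serialization failures surface as a FieldViolation on `exprs`.
    let proto: proto2::QueryRequest = request.try_into()?;

    // The base64 helpers are handy wherever filters must cross a text
    // boundary, e.g. a debug log line (an assumed use case).
    if let Some(filters) = &proto.filters {
        println!(
            "filters: {}",
            encode_proto2_filters_as_base64(filters).expect("filters encodable")
        );
    }

    Ok(proto)
}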