refactor: querier<>ingester flight protocol adjustments (#4286)
* refactor: querier<>ingester flight protocol adjustments This makes a few adjustments to the querier<>ingester flight protocol. Query Scope =========== The querier will request data for ALL sequencer IDs for now. There is no reason to have a request per sequencer ID. We can add a range/set filter later if we want, but this is not required for now. Partition-level =============== The only time when the querier cares about sequencer IDs (i.e. sharding) at all is when it selects which ingesters to ask for unpersisted data (this is currently not implemented, it just asks all ingesters). Afterwards the querier only cares about partitions (which are bound to specific sequencers anyways) because this is the level where parquet file persistence and compaction as well as deduplication happen. So we make partitions a first-class citizen in the ingester response. Metadata VS RecordBatches ========================= The global app-metadata will list all partitions and their max persisted parquet files and tombstones (theoretically tombstones are at table-level, but the ingester could in the future break them down to the partition-level). Then it receives a stream of record batches. Each record batch is tagged (via key-value metadata in its schema) so it can be assigned to a partition. At the moment the ingester returns 0 or 1 batches per unpersisted partition (0 in case we've filtered out all the data via the predicate), but in the future it is free to return multiple batches. This setup gives the ingester more freedom over memory management and (potentially parallel) query processing, while at the same time keeps the set of duplicated information minimal and allows easy extensions (since the global metadata is a full-blown protobuf message). Querier ======= At the moment the querier ignores all the metdata. Follow-up PRs will change that. * docs: improve Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * refactor: make code clearer Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>pull/24376/head
parent
e96aed6949
commit
83f77712b1
|
@ -803,8 +803,6 @@ pub struct ProcessedTombstone {
|
|||
pub struct IngesterQueryRequest {
|
||||
/// namespace to search
|
||||
pub namespace: String,
|
||||
/// sequencer to search
|
||||
pub sequencer_id: SequencerId,
|
||||
/// Table to search
|
||||
pub table: String,
|
||||
/// Columns the query service is interested in
|
||||
|
@ -814,18 +812,16 @@ pub struct IngesterQueryRequest {
|
|||
}
|
||||
|
||||
impl IngesterQueryRequest {
|
||||
/// Make a request
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Make a request to return data for a specified table for
|
||||
/// all sequencers an ingester is responsible for
|
||||
pub fn new(
|
||||
namespace: String,
|
||||
sequencer_id: SequencerId,
|
||||
table: String,
|
||||
columns: Vec<String>,
|
||||
predicate: Option<Predicate>,
|
||||
) -> Self {
|
||||
Self {
|
||||
namespace,
|
||||
sequencer_id,
|
||||
table,
|
||||
columns,
|
||||
predicate,
|
||||
|
|
|
@ -116,6 +116,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
|
|||
".influxdata.iox.write.v1.WriteEntryRequest.entry",
|
||||
])
|
||||
.btree_map(&[
|
||||
".influxdata.iox.ingester.v1.IngesterQueryResponseMetadata.unpersisted_partitions",
|
||||
".influxdata.iox.preserved_catalog.v1.DatabaseCheckpoint.sequencer_numbers",
|
||||
".influxdata.iox.preserved_catalog.v1.PartitionCheckpoint.sequencer_numbers",
|
||||
]);
|
||||
|
|
|
@ -28,8 +28,9 @@ message IngesterQueryRequest {
|
|||
// Namespace to search
|
||||
string namespace = 7;
|
||||
|
||||
// Sequencer to search
|
||||
int32 sequencer_id = 8;
|
||||
// was used to only request data from a single sequencer ID
|
||||
reserved "sequencer_id";
|
||||
reserved 8;
|
||||
}
|
||||
|
||||
// Metadata that the ingester provides to the query service along with the results. Serialized
|
||||
|
@ -43,11 +44,31 @@ message IngesterQueryResponseMetadata {
|
|||
reserved "max_sequencer_number";
|
||||
reserved 2;
|
||||
|
||||
// Max sequence number persisted for this table
|
||||
optional int64 parquet_max_sequence_number = 3;
|
||||
// Was max sequence number persisted for this table
|
||||
reserved "parquet_max_sequence_number";
|
||||
reserved 3;
|
||||
|
||||
// Max sequence number for a tombstone associated with this table
|
||||
optional int64 tombstone_max_sequence_number = 4;
|
||||
// Was max sequence number for a tombstone associated with this table
|
||||
reserved "tombstone_max_sequence_number";
|
||||
reserved 4;
|
||||
|
||||
// Map partition_id to status for every partition that has unpersisted data.
|
||||
//
|
||||
// If a partition does NOT appear within this map, then either all data was persisted or the ingester has never seen
|
||||
// data for this partition. In either case the querier may just read all parquet files for the missing partition
|
||||
// and ensure it has a complete dataset
|
||||
map<int64, PartitionStatus> unpersisted_partitions = 5;
|
||||
}
|
||||
|
||||
// Status of a partition that has unpersisted data.
|
||||
//
|
||||
// Note that this structure is specific to a partition (which itself is bound to a table and sequencer)!
|
||||
message PartitionStatus {
|
||||
// Max sequence number persisted
|
||||
optional int64 parquet_max_sequence_number = 1;
|
||||
|
||||
// Max sequence number for a tombstone associated
|
||||
optional int64 tombstone_max_sequence_number = 2;
|
||||
}
|
||||
|
||||
// Serialization of `predicate::predicate::Predicate` that contains DataFusion `Expr`s
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use crate::{
|
||||
google::{FieldViolation, FieldViolationExt, OptionalField},
|
||||
google::{FieldViolation, OptionalField},
|
||||
influxdata::iox::ingester::v1 as proto,
|
||||
};
|
||||
use data_types::timestamp::TimestampRange;
|
||||
use data_types2::{IngesterQueryRequest, SequencerId};
|
||||
use data_types2::IngesterQueryRequest;
|
||||
use datafusion::{
|
||||
logical_plan::{
|
||||
abs, acos, asin, atan, ceil, concat, cos, digest, exp, floor, ln, log10, log2, round,
|
||||
|
@ -25,22 +25,14 @@ impl TryFrom<proto::IngesterQueryRequest> for IngesterQueryRequest {
|
|||
fn try_from(proto: proto::IngesterQueryRequest) -> Result<Self, Self::Error> {
|
||||
let proto::IngesterQueryRequest {
|
||||
namespace,
|
||||
sequencer_id,
|
||||
table,
|
||||
columns,
|
||||
predicate,
|
||||
} = proto;
|
||||
|
||||
let predicate = predicate.map(TryInto::try_into).transpose()?;
|
||||
let sequencer_id: i16 = sequencer_id.try_into().scope("sequencer_id")?;
|
||||
|
||||
Ok(Self::new(
|
||||
namespace,
|
||||
SequencerId::new(sequencer_id),
|
||||
table,
|
||||
columns,
|
||||
predicate,
|
||||
))
|
||||
Ok(Self::new(namespace, table, columns, predicate))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,7 +42,6 @@ impl TryFrom<IngesterQueryRequest> for proto::IngesterQueryRequest {
|
|||
fn try_from(query: IngesterQueryRequest) -> Result<Self, Self::Error> {
|
||||
let IngesterQueryRequest {
|
||||
namespace,
|
||||
sequencer_id,
|
||||
table,
|
||||
columns,
|
||||
predicate,
|
||||
|
@ -58,7 +49,6 @@ impl TryFrom<IngesterQueryRequest> for proto::IngesterQueryRequest {
|
|||
|
||||
Ok(Self {
|
||||
namespace,
|
||||
sequencer_id: sequencer_id.get().into(),
|
||||
table,
|
||||
columns,
|
||||
predicate: predicate.map(TryInto::try_into).transpose()?,
|
||||
|
@ -1703,7 +1693,6 @@ mod tests {
|
|||
|
||||
let rust_query = IngesterQueryRequest::new(
|
||||
"mydb".into(),
|
||||
SequencerId::new(5),
|
||||
"cpu".into(),
|
||||
vec!["usage".into(), "time".into()],
|
||||
Some(rust_predicate),
|
||||
|
@ -1711,7 +1700,6 @@ mod tests {
|
|||
|
||||
let proto_query = proto::IngesterQueryRequest {
|
||||
namespace: "mydb".into(),
|
||||
sequencer_id: 5,
|
||||
table: "cpu".into(),
|
||||
columns: vec!["usage".into(), "time".into()],
|
||||
predicate: Some(proto_predicate),
|
||||
|
|
|
@ -1,16 +1,18 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use generated_types::influxdata::iox::ingester::v1::PartitionStatus;
|
||||
use http::StatusCode;
|
||||
use test_helpers_end_to_end_ng::{
|
||||
get_write_token, maybe_skip_integration, wait_for_readable, MiniCluster, TestConfig,
|
||||
};
|
||||
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use data_types2::{IngesterQueryRequest, SequencerId};
|
||||
use data_types2::IngesterQueryRequest;
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api() {
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let sequencer_id = SequencerId::new(1);
|
||||
let table_name = "mytable";
|
||||
|
||||
let router2_config = TestConfig::new_router2(&database_url);
|
||||
|
@ -38,7 +40,6 @@ async fn ingester_flight_api() {
|
|||
|
||||
let query = IngesterQueryRequest::new(
|
||||
cluster.namespace().to_string(),
|
||||
sequencer_id,
|
||||
table_name.into(),
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
|
@ -49,10 +50,18 @@ async fn ingester_flight_api() {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(performed_query
|
||||
.app_metadata()
|
||||
.parquet_max_sequence_number
|
||||
.is_none());
|
||||
let unpersisted_partitions = &performed_query.app_metadata().unpersisted_partitions;
|
||||
let partition_id = *unpersisted_partitions.keys().next().unwrap();
|
||||
assert_eq!(
|
||||
unpersisted_partitions,
|
||||
&BTreeMap::from([(
|
||||
partition_id,
|
||||
PartitionStatus {
|
||||
parquet_max_sequence_number: None,
|
||||
tombstone_max_sequence_number: None
|
||||
}
|
||||
)]),
|
||||
);
|
||||
|
||||
let query_results = performed_query.collect().await.unwrap();
|
||||
|
||||
|
|
|
@ -605,7 +605,10 @@ impl NamespaceData {
|
|||
}
|
||||
|
||||
/// Gets the buffered table data
|
||||
fn table_data(&self, table_name: &str) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
|
||||
pub(crate) fn table_data(
|
||||
&self,
|
||||
table_name: &str,
|
||||
) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
|
||||
let t = self.tables.read();
|
||||
t.get(table_name).cloned()
|
||||
}
|
||||
|
@ -812,25 +815,21 @@ impl TableData {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn non_persisted_and_persisting_batches(
|
||||
&self,
|
||||
) -> (Vec<Arc<SnapshotBatch>>, Vec<QueryableBatch>) {
|
||||
let mut snapshots = vec![];
|
||||
let mut queryable_batches = vec![];
|
||||
|
||||
for p in self.partition_data.values() {
|
||||
snapshots.append(
|
||||
&mut p
|
||||
pub fn unpersisted_partition_data(&self) -> Vec<UnpersistedPartitionData> {
|
||||
self.partition_data
|
||||
.values()
|
||||
.map(|p| UnpersistedPartitionData {
|
||||
partition_id: p.id,
|
||||
non_persisted: p
|
||||
.get_non_persisting_data()
|
||||
.expect("get_non_persisting should always work"),
|
||||
);
|
||||
|
||||
if let Some(q) = p.get_persisting_data() {
|
||||
queryable_batches.push(q);
|
||||
}
|
||||
}
|
||||
|
||||
(snapshots, queryable_batches)
|
||||
persisting: p.get_persisting_data(),
|
||||
partition_status: PartitionStatus {
|
||||
parquet_max_sequence_number: p.data.max_persisted_sequence_number,
|
||||
tombstone_max_sequence_number: self.tombstone_max_sequence_number,
|
||||
},
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn insert_partition(
|
||||
|
@ -880,6 +879,15 @@ impl TableData {
|
|||
}
|
||||
}
|
||||
|
||||
/// Read only copy of the unpersisted data for a partition in the ingester for a specific partition.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct UnpersistedPartitionData {
|
||||
pub partition_id: PartitionId,
|
||||
pub non_persisted: Vec<Arc<SnapshotBatch>>,
|
||||
pub persisting: Option<QueryableBatch>,
|
||||
pub partition_status: PartitionStatus,
|
||||
}
|
||||
|
||||
/// Data of an IOx Partition of a given Table of a Namesapce that belongs to a given Shard
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct PartitionData {
|
||||
|
@ -1378,6 +1386,19 @@ pub struct QueryableBatch {
|
|||
pub(crate) table_name: String,
|
||||
}
|
||||
|
||||
/// Status of a partition that has unpersisted data.
|
||||
///
|
||||
/// Note that this structure is specific to a partition (which itself is bound to a table and sequencer)!
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(missing_copy_implementations)]
|
||||
pub struct PartitionStatus {
|
||||
/// Max sequence number persisted
|
||||
pub parquet_max_sequence_number: Option<SequenceNumber>,
|
||||
|
||||
/// Max sequence number for a tombstone
|
||||
pub tombstone_max_sequence_number: Option<SequenceNumber>,
|
||||
}
|
||||
|
||||
/// Response sending to the query service per its request defined in IngesterQueryRequest
|
||||
pub struct IngesterQueryResponse {
|
||||
/// Stream of RecordBatch results that match the requested query
|
||||
|
@ -1386,11 +1407,11 @@ pub struct IngesterQueryResponse {
|
|||
/// The schema of the record batches
|
||||
pub schema: Schema,
|
||||
|
||||
/// Max sequence number persisted for this table
|
||||
pub parquet_max_sequence_number: Option<SequenceNumber>,
|
||||
|
||||
/// Max sequence number for a tombstone associated with this table
|
||||
pub tombstone_max_sequence_number: Option<SequenceNumber>,
|
||||
/// Contains status for every partition that has unpersisted data.
|
||||
///
|
||||
/// If a partition does NOT appear within this map, then either all data was persisted or the ingester has never seen
|
||||
/// data for this partition. In either case the querier may just read all parquet files for the missing partition.
|
||||
pub unpersisted_partitions: BTreeMap<PartitionId, PartitionStatus>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for IngesterQueryResponse {
|
||||
|
@ -1398,14 +1419,7 @@ impl std::fmt::Debug for IngesterQueryResponse {
|
|||
f.debug_struct("IngesterQueryResponse")
|
||||
.field("data", &"<RECORDBATCH STREAM>")
|
||||
.field("schema", &self.schema)
|
||||
.field(
|
||||
"parquet_max_sequence_number",
|
||||
&self.parquet_max_sequence_number,
|
||||
)
|
||||
.field(
|
||||
"tombstone_max_sequence_number",
|
||||
&self.tombstone_max_sequence_number,
|
||||
)
|
||||
.field("unpersisted_partitions", &self.unpersisted_partitions)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
@ -1415,14 +1429,12 @@ impl IngesterQueryResponse {
|
|||
pub fn new(
|
||||
data: SendableRecordBatchStream,
|
||||
schema: Schema,
|
||||
parquet_max_sequence_number: Option<SequenceNumber>,
|
||||
tombstone_max_sequence_number: Option<SequenceNumber>,
|
||||
unpersisted_partitions: BTreeMap<PartitionId, PartitionStatus>,
|
||||
) -> Self {
|
||||
Self {
|
||||
data,
|
||||
schema,
|
||||
parquet_max_sequence_number,
|
||||
tombstone_max_sequence_number,
|
||||
unpersisted_partitions,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
//! Handle all requests from Querier
|
||||
|
||||
use crate::data::{self, IngesterData, IngesterQueryResponse, QueryableBatch};
|
||||
use crate::data::{
|
||||
self, IngesterData, IngesterQueryResponse, QueryableBatch, UnpersistedPartitionData,
|
||||
};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_util::util::merge_record_batches;
|
||||
use data_types2::{IngesterQueryRequest, SequencerId};
|
||||
use data_types2::IngesterQueryRequest;
|
||||
use datafusion::{
|
||||
error::DataFusionError,
|
||||
physical_plan::{
|
||||
|
@ -18,9 +20,16 @@ use query::{
|
|||
frontend::reorg::ReorgPlanner,
|
||||
QueryChunkMeta,
|
||||
};
|
||||
use schema::{merge::merge_record_batch_schemas, selection::Selection};
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::sync::Arc;
|
||||
use schema::{
|
||||
merge::{merge_record_batch_schemas, SchemaMerger},
|
||||
selection::Selection,
|
||||
Schema,
|
||||
};
|
||||
use snafu::{ensure, ResultExt, Snafu};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[allow(missing_copy_implementations, missing_docs)]
|
||||
|
@ -49,10 +58,11 @@ pub enum Error {
|
|||
CollectStream { source: DataFusionError },
|
||||
|
||||
#[snafu(display(
|
||||
"No Table Data found for the given sequencer id {}, namespace name {}, table name {}", sequencer_id.get(), namespace_name, table_name
|
||||
"No Table Data found for the given namespace name {}, table name {}",
|
||||
namespace_name,
|
||||
table_name
|
||||
))]
|
||||
TableNotFound {
|
||||
sequencer_id: SequencerId,
|
||||
namespace_name: String,
|
||||
table_name: String,
|
||||
},
|
||||
|
@ -62,6 +72,9 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("Error concating same-schema record batches: {}", source))]
|
||||
ConcatBatches { source: arrow::error::ArrowError },
|
||||
|
||||
#[snafu(display("Error merging schemas: {}", source))]
|
||||
MergeSchema { source: schema::merge::Error },
|
||||
}
|
||||
|
||||
/// A specialized `Error` for Ingester's Query errors
|
||||
|
@ -72,31 +85,81 @@ pub async fn prepare_data_to_querier(
|
|||
ingest_data: &Arc<IngesterData>,
|
||||
request: &IngesterQueryRequest,
|
||||
) -> Result<IngesterQueryResponse> {
|
||||
// ------------------------------------------------
|
||||
// Read the IngesterData to get TableData for the given request's table
|
||||
let table_data =
|
||||
ingest_data.table_data(request.sequencer_id, &request.namespace, &request.table);
|
||||
let table_data = table_data.context(TableNotFoundSnafu {
|
||||
sequencer_id: request.sequencer_id,
|
||||
namespace_name: &request.namespace,
|
||||
table_name: &request.table,
|
||||
})?;
|
||||
let mut schema_merger = SchemaMerger::new();
|
||||
let mut unpersisted_partitions = BTreeMap::new();
|
||||
let mut batches = vec![];
|
||||
for sequencer_data in ingest_data.sequencers.values() {
|
||||
let maybe_table_data = sequencer_data
|
||||
.namespace(&request.namespace)
|
||||
.and_then(|namespace_data| namespace_data.table_data(&request.table));
|
||||
|
||||
let (
|
||||
parquet_max_sequence_number,
|
||||
tombstone_max_sequence_number,
|
||||
(filter_not_applied_batches, persisting_batches),
|
||||
) = {
|
||||
let table_data = table_data.read().await;
|
||||
(
|
||||
table_data.parquet_max_sequence_number(),
|
||||
table_data.tombstone_max_sequence_number(),
|
||||
table_data.non_persisted_and_persisting_batches(),
|
||||
)
|
||||
};
|
||||
let table_data = match maybe_table_data {
|
||||
Some(table_data) => table_data,
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let unpersisted_partition_data = {
|
||||
let table_data = table_data.read().await;
|
||||
table_data.unpersisted_partition_data()
|
||||
};
|
||||
|
||||
for partition in unpersisted_partition_data {
|
||||
// include partition in `unpersisted_partitions` even when there we might filter out all the data, because
|
||||
// the metadata (e.g. max persisted parquet file) is important for the querier.
|
||||
unpersisted_partitions
|
||||
.insert(partition.partition_id, partition.partition_status.clone());
|
||||
|
||||
// extract payload
|
||||
let (schema, batch) =
|
||||
prepare_data_to_querier_for_partition(&ingest_data.exec, partition, request)
|
||||
.await?;
|
||||
schema_merger = schema_merger
|
||||
.merge(schema.as_ref())
|
||||
.context(MergeSchemaSnafu)?;
|
||||
if let Some(batch) = batch {
|
||||
batches.push(Arc::new(batch));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ensure!(
|
||||
!unpersisted_partitions.is_empty(),
|
||||
TableNotFoundSnafu {
|
||||
namespace_name: &request.namespace,
|
||||
table_name: &request.table
|
||||
}
|
||||
);
|
||||
let schema = schema_merger.build();
|
||||
|
||||
// ------------------------------------------------
|
||||
// Accumulate data from each partition
|
||||
// Make a stream for this batch
|
||||
let dummy_metrics = ExecutionPlanMetricsSet::new();
|
||||
let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, 0);
|
||||
let stream = SizedRecordBatchStream::new(schema.as_arrow(), batches, mem_metrics);
|
||||
|
||||
Ok(IngesterQueryResponse::new(
|
||||
Box::pin(stream),
|
||||
schema,
|
||||
unpersisted_partitions,
|
||||
))
|
||||
}
|
||||
|
||||
async fn prepare_data_to_querier_for_partition(
|
||||
executor: &Executor,
|
||||
unpersisted_partition_data: UnpersistedPartitionData,
|
||||
request: &IngesterQueryRequest,
|
||||
) -> Result<(Arc<Schema>, Option<RecordBatch>)> {
|
||||
// ------------------------------------------------
|
||||
// Metadata
|
||||
let schema_metadata = HashMap::from([(
|
||||
String::from("iox:partition_id"),
|
||||
unpersisted_partition_data.partition_id.get().to_string(),
|
||||
)]);
|
||||
|
||||
// ------------------------------------------------
|
||||
// Accumulate data
|
||||
|
||||
// Make Filters
|
||||
let selection_columns: Vec<_> = request.columns.iter().map(String::as_str).collect();
|
||||
|
@ -108,15 +171,16 @@ pub async fn prepare_data_to_querier(
|
|||
let predicate = request.predicate.clone().unwrap_or_default();
|
||||
|
||||
let mut filter_applied_batches = vec![];
|
||||
for queryable_batch in persisting_batches {
|
||||
if let Some(queryable_batch) = unpersisted_partition_data.persisting {
|
||||
// ------------------------------------------------
|
||||
// persisting data
|
||||
|
||||
let record_batch = run_query(
|
||||
&ingest_data.exec,
|
||||
executor,
|
||||
Arc::new(queryable_batch),
|
||||
predicate.clone(),
|
||||
selection,
|
||||
schema_metadata.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -128,17 +192,21 @@ pub async fn prepare_data_to_querier(
|
|||
}
|
||||
|
||||
// ------------------------------------------------
|
||||
// Apply filters on the snapshot bacthes
|
||||
if !filter_not_applied_batches.is_empty() {
|
||||
// Apply filters on the snapshot batches
|
||||
if !unpersisted_partition_data.non_persisted.is_empty() {
|
||||
// Make a Query able batch for all the snapshot
|
||||
let queryable_batch =
|
||||
QueryableBatch::new(&request.table, filter_not_applied_batches, vec![]);
|
||||
let queryable_batch = QueryableBatch::new(
|
||||
&request.table,
|
||||
unpersisted_partition_data.non_persisted,
|
||||
vec![],
|
||||
);
|
||||
|
||||
let record_batch = run_query(
|
||||
&ingest_data.exec,
|
||||
executor,
|
||||
Arc::new(queryable_batch),
|
||||
predicate,
|
||||
selection,
|
||||
schema_metadata.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -153,23 +221,20 @@ pub async fn prepare_data_to_querier(
|
|||
// Combine record batches into one batch and pad null values as needed
|
||||
|
||||
// Schema of all record batches after merging
|
||||
let schema = merge_record_batch_schemas(&filter_applied_batches);
|
||||
let schema = Arc::new(
|
||||
Schema::try_from(Arc::new(
|
||||
merge_record_batch_schemas(&filter_applied_batches)
|
||||
.as_arrow()
|
||||
.as_ref()
|
||||
.clone()
|
||||
.with_metadata(schema_metadata),
|
||||
))
|
||||
.expect("schema roundtrip should work"),
|
||||
);
|
||||
let batch = merge_record_batches(schema.as_arrow(), filter_applied_batches)
|
||||
.context(ConcatBatchesSnafu)?;
|
||||
|
||||
// ------------------------------------------------
|
||||
// Make a stream for this batch
|
||||
let dummy_metrics = ExecutionPlanMetricsSet::new();
|
||||
let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, 0);
|
||||
let stream_batch = batch.map(|b| vec![Arc::new(b)]).unwrap_or_default();
|
||||
let stream = SizedRecordBatchStream::new(schema.as_arrow(), stream_batch, mem_metrics);
|
||||
|
||||
Ok(IngesterQueryResponse::new(
|
||||
Box::pin(stream),
|
||||
(*schema).clone(),
|
||||
parquet_max_sequence_number,
|
||||
tombstone_max_sequence_number,
|
||||
))
|
||||
Ok((schema, batch))
|
||||
}
|
||||
|
||||
/// Query a given Queryable Batch, applying selection and filters as appropriate
|
||||
|
@ -179,6 +244,7 @@ pub async fn run_query(
|
|||
data: Arc<QueryableBatch>,
|
||||
predicate: Predicate,
|
||||
selection: Selection<'_>,
|
||||
schema_metadata: HashMap<String, String>,
|
||||
) -> Result<Option<RecordBatch>> {
|
||||
let stream = query(executor, data, predicate, selection).await?;
|
||||
|
||||
|
@ -194,6 +260,19 @@ pub async fn run_query(
|
|||
let record_batch = RecordBatch::concat(&record_batches[0].schema(), &record_batches)
|
||||
.context(ConcatBatchesSnafu)?;
|
||||
|
||||
// modify schema AFTER concat, otherwise Arrow will be unhappy about the different Key-Value data
|
||||
let record_batch = RecordBatch::try_new(
|
||||
Arc::new(
|
||||
record_batch
|
||||
.schema()
|
||||
.as_ref()
|
||||
.clone()
|
||||
.with_metadata(schema_metadata),
|
||||
),
|
||||
record_batch.columns().to_vec(),
|
||||
)
|
||||
.expect("adding key-value schema data should work");
|
||||
|
||||
Ok(Some(record_batch))
|
||||
}
|
||||
|
||||
|
@ -210,11 +289,15 @@ pub async fn query(
|
|||
|
||||
let indices = match selection {
|
||||
Selection::All => None,
|
||||
Selection::Some(columns) => Some(
|
||||
data.schema()
|
||||
.compute_select_indicies(columns)
|
||||
.context(SelectColumnsSnafu)?,
|
||||
),
|
||||
Selection::Some(columns) => {
|
||||
let schema = data.schema();
|
||||
Some(
|
||||
columns
|
||||
.iter()
|
||||
.flat_map(|&column_name| schema.find_index_of(column_name))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let mut expr = vec![];
|
||||
|
@ -376,14 +459,13 @@ mod tests {
|
|||
DataLocation::PERSISTING,
|
||||
] {
|
||||
let scenario = Arc::new(make_ingester_data(two_partitions, loc));
|
||||
scenarios.push(scenario);
|
||||
scenarios.push((loc, scenario));
|
||||
}
|
||||
}
|
||||
|
||||
// read data from all scenarios without any filters
|
||||
let mut request = IngesterQueryRequest::new(
|
||||
TEST_NAMESPACE.to_string(),
|
||||
SequencerId::new(1), // must be 1
|
||||
TEST_TABLE.to_string(),
|
||||
vec![],
|
||||
None,
|
||||
|
@ -402,12 +484,14 @@ mod tests {
|
|||
"| Wilmington | mon | | 1970-01-01T00:00:00.000000035Z |", // in group 3 - seq_num: 6
|
||||
"+------------+-----+------+--------------------------------+",
|
||||
];
|
||||
for scenario in &scenarios {
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let stream = prepare_data_to_querier(scenario, &request).await.unwrap();
|
||||
let result = datafusion::physical_plan::common::collect(stream.data)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_batches_sorted_eq!(&expected, &result);
|
||||
assert_batches_have_schema_metadata(&result);
|
||||
}
|
||||
|
||||
// read data from all scenarios and filter out column day
|
||||
|
@ -426,12 +510,14 @@ mod tests {
|
|||
"| Wilmington | | 1970-01-01T00:00:00.000000035Z |",
|
||||
"+------------+------+--------------------------------+",
|
||||
];
|
||||
for scenario in &scenarios {
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let stream = prepare_data_to_querier(scenario, &request).await.unwrap();
|
||||
let result = datafusion::physical_plan::common::collect(stream.data)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_batches_sorted_eq!(&expected, &result);
|
||||
assert_batches_have_schema_metadata(&result);
|
||||
}
|
||||
|
||||
// read data from all scenarios, filter out column day, city Medford, time outside range [0, 42)
|
||||
|
@ -453,12 +539,14 @@ mod tests {
|
|||
"| Wilmington | | 1970-01-01T00:00:00.000000035Z |",
|
||||
"+------------+------+--------------------------------+",
|
||||
];
|
||||
for scenario in &scenarios {
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let stream = prepare_data_to_querier(scenario, &request).await.unwrap();
|
||||
let result = datafusion::physical_plan::common::collect(stream.data)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_batches_sorted_eq!(&expected, &result);
|
||||
assert_batches_have_schema_metadata(&result);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -484,7 +572,6 @@ mod tests {
|
|||
// read data from all scenarios without any filters
|
||||
let mut request = IngesterQueryRequest::new(
|
||||
TEST_NAMESPACE.to_string(),
|
||||
SequencerId::new(1), // must be 1
|
||||
TEST_TABLE.to_string(),
|
||||
vec![],
|
||||
None,
|
||||
|
@ -507,6 +594,7 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
assert_batches_sorted_eq!(&expected, &result);
|
||||
assert_batches_have_schema_metadata(&result);
|
||||
}
|
||||
|
||||
// read data from all scenarios and filter out column day
|
||||
|
@ -529,6 +617,7 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
assert_batches_sorted_eq!(&expected, &result);
|
||||
assert_batches_have_schema_metadata(&result);
|
||||
}
|
||||
|
||||
// read data from all scenarios, filter out column day, city Medford, time outside range [0, 42)
|
||||
|
@ -554,6 +643,19 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
assert_batches_sorted_eq!(&expected, &result);
|
||||
assert_batches_have_schema_metadata(&result);
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_batches_have_schema_metadata(batches: &[RecordBatch]) {
|
||||
for batch in batches {
|
||||
let schema = batch.schema();
|
||||
|
||||
let metadata = schema.metadata();
|
||||
assert_eq!(metadata.len(), 1);
|
||||
|
||||
let partition_id_str = metadata.get("iox:partition_id").unwrap();
|
||||
partition_id_str.parse::<i64>().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -303,8 +303,7 @@ impl GetStream {
|
|||
let IngesterQueryResponse {
|
||||
mut data,
|
||||
schema,
|
||||
parquet_max_sequence_number,
|
||||
tombstone_max_sequence_number,
|
||||
unpersisted_partitions,
|
||||
} = query_response;
|
||||
|
||||
// setup channel
|
||||
|
@ -318,8 +317,22 @@ impl GetStream {
|
|||
// Add max_sequencer_number to app metadata
|
||||
let mut bytes = bytes::BytesMut::new();
|
||||
let app_metadata = proto::IngesterQueryResponseMetadata {
|
||||
parquet_max_sequence_number: parquet_max_sequence_number.map(|n| n.get()),
|
||||
tombstone_max_sequence_number: tombstone_max_sequence_number.map(|n| n.get()),
|
||||
unpersisted_partitions: unpersisted_partitions
|
||||
.into_iter()
|
||||
.map(|(id, status)| {
|
||||
(
|
||||
id.get(),
|
||||
proto::PartitionStatus {
|
||||
parquet_max_sequence_number: status
|
||||
.parquet_max_sequence_number
|
||||
.map(|x| x.get()),
|
||||
tombstone_max_sequence_number: status
|
||||
.tombstone_max_sequence_number
|
||||
.map(|x| x.get()),
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
prost::Message::encode(&app_metadata, &mut bytes).context(SerializationSnafu)?;
|
||||
schema_flight_data.app_metadata = bytes.to_vec();
|
||||
|
|
|
@ -4,7 +4,7 @@ use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
|
|||
use async_trait::async_trait;
|
||||
use client_util::connection;
|
||||
use data_types2::{
|
||||
ChunkAddr, ChunkId, ChunkOrder, IngesterQueryRequest, SequenceNumber, SequencerId, TableSummary,
|
||||
ChunkAddr, ChunkId, ChunkOrder, IngesterQueryRequest, SequenceNumber, TableSummary,
|
||||
};
|
||||
use datafusion_util::MemoryStream;
|
||||
use influxdb_iox_client::flight::{self, Client as FlightClient, Error as FlightError};
|
||||
|
@ -136,9 +136,6 @@ impl IngesterConnection for IngesterConnectionImpl {
|
|||
|
||||
let ingester_query_request = IngesterQueryRequest {
|
||||
namespace: namespace_name.to_string(),
|
||||
// TODO: change protocol to search all sequencers...
|
||||
//sequencer_id: SequencerId::new(0),
|
||||
sequencer_id: SequencerId::new(1),
|
||||
table: table_name.to_string(),
|
||||
columns: columns.clone(),
|
||||
predicate: Some(predicate.clone()),
|
||||
|
|
Loading…
Reference in New Issue