Merge pull request #3514 from influxdata/ntran/ingest_compact

feat: ingester's compaction

commit 6287d7dc5c
@@ -1860,13 +1860,20 @@ name = "ingester"
version = "0.1.0"
dependencies = [
 "arrow",
 "arrow_util",
 "data_types",
 "datafusion 0.1.0",
 "hyper",
 "iox_catalog",
 "metric",
 "mutable_batch",
 "parking_lot",
 "predicate",
 "query",
 "schema",
 "snafu",
 "thiserror",
 "tokio",
 "uuid",
 "workspace-hack",
]

@@ -4603,9 +4610,9 @@ dependencies = [

[[package]]
name = "thread_local"
-version = "1.1.3"
+version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd"
+checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
dependencies = [
 "once_cell",
]
@@ -6,12 +6,19 @@ edition = "2021"

[dependencies]
arrow = { version = "7.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
parking_lot = "0.11.2"
predicate = { path = "../predicate" }
query = { path = "../query" }
schema = { path = "../schema" }
snafu = "0.7"
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
thiserror = "1.0"
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack" }
@@ -0,0 +1,322 @@
//! This module is responsible for compacting the Ingester's data

use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use query::{
    exec::{Executor, ExecutorType},
    frontend::reorg::ReorgPlanner,
    QueryChunkMeta,
};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;

use crate::data::QueryableBatch;

#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
    #[snafu(display("Error while building logical plan for Ingester's compaction"))]
    LogicalPlan {
        source: query::frontend::reorg::Error,
    },

    #[snafu(display("Error while building physical plan for Ingester's compaction"))]
    PhysicalPlan { source: DataFusionError },

    #[snafu(display("Error while executing Ingester's compaction"))]
    ExecutePlan { source: DataFusionError },

    #[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))]
    DeletePredicate {
        source: predicate::delete_predicate::Error,
        min: String,
        max: String,
        predicate: String,
    },
}

/// A specialized `Error` for Ingester's Compact errors
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Compact the given Ingester's data
/// Note: the given `executor` should be created with the IngesterServer
pub async fn compact(
    executor: &Executor,
    data: Arc<QueryableBatch>,
) -> Result<SendableRecordBatchStream> {
    // Build logical plan for compaction
    let ctx = executor.new_context(ExecutorType::Reorg);
    let logical_plan = ReorgPlanner::new()
        .scan_single_chunk_plan(data.schema(), data)
        .context(LogicalPlanSnafu {})?;

    // Build physical plan
    let physical_plan = ctx
        .prepare_plan(&logical_plan)
        .await
        .context(PhysicalPlanSnafu {})?;

    // Execute the plan and return the compacted stream
    let output_stream = ctx
        .execute_stream(physical_plan)
        .await
        .context(ExecutePlanSnafu {})?;
    Ok(output_stream)
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use crate::data::SnapshotBatch;

    use super::*;

    use arrow::record_batch::RecordBatch;
    use arrow_util::assert_batches_eq;
    use iox_catalog::interface::SequenceNumber;
    use query::test::{raw_data, TestChunk};

    #[ignore]
    #[tokio::test]
    async fn test_compact_no_dedup_no_delete() {
        let batches = create_record_batches_with_influxtype_no_duplicates().await;
        let compact_batch = make_queryable_batch(batches);
        let exc = Executor::new(1);
        let stream = compact(&exc, compact_batch).await.unwrap();
        let output_batches = datafusion::physical_plan::common::collect(stream)
            .await
            .unwrap();

        println!("output_batches: {:#?}", output_batches);

        let expected = vec![
            "+-----------+------+-------------------------------+",
            "| field_int | tag1 | time                          |",
            "+-----------+------+-------------------------------+",
            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
            "+-----------+------+-------------------------------+",
        ];
        assert_batches_eq!(&expected, &output_batches);
    }

    #[ignore]
    #[tokio::test]
    async fn test_compact_with_dedup_no_delete() {
        let batches = create_batches_with_influxtype().await;
        let compact_batch = make_queryable_batch(batches);
        let exc = Executor::new(1);

        let stream = compact(&exc, compact_batch).await.unwrap();
        let output_batches = datafusion::physical_plan::common::collect(stream)
            .await
            .unwrap();

        // println!("output_batches: {:#?}", output_batches);
        // let table = pretty_format_batches(&[batch]).unwrap();
        let expected = vec![
            "+-----------+------+-------------------------------+",
            "| field_int | tag1 | time                          |",
            "+-----------+------+-------------------------------+",
            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
            "+-----------+------+-------------------------------+",
        ];
        assert_batches_eq!(&expected, &output_batches);
    }

    // ----------------------------------------------------------------------------------------------
    // Data for testing
    pub fn make_queryable_batch(batches: Vec<Arc<RecordBatch>>) -> Arc<QueryableBatch> {
        // make snapshots for the batches
        let mut snapshots = vec![];
        let mut seq_num = 1;
        for batch in batches {
            let seq = SequenceNumber::new(seq_num);
            snapshots.push(make_snapshot_batch(batch, seq, seq));
            seq_num += 1;
        }

        Arc::new(QueryableBatch::new("test_table", snapshots, vec![]))
    }

    pub fn make_snapshot_batch(
        batch: Arc<RecordBatch>,
        min: SequenceNumber,
        max: SequenceNumber,
    ) -> SnapshotBatch {
        SnapshotBatch {
            min_sequencer_number: min,
            max_sequencer_number: max,
            data: batch,
        }
    }

    pub async fn create_record_batches_with_influxtype_no_duplicates() -> Vec<Arc<RecordBatch>> {
        let chunk1 = Arc::new(
            TestChunk::new("t")
                .with_id(1)
                .with_time_column_with_full_stats(
                    Some(5),
                    Some(7000),
                    10,
                    Some(NonZeroU64::new(7).unwrap()),
                )
                .with_tag_column_with_full_stats(
                    "tag1",
                    Some("AL"),
                    Some("MT"),
                    10,
                    Some(NonZeroU64::new(3).unwrap()),
                )
                .with_i64_field_column("field_int")
                .with_ten_rows_of_data_some_duplicates(),
        );
        let batches = raw_data(&[chunk1]).await;
        let batches: Vec<_> = batches.iter().map(|r| Arc::new(r.clone())).collect();

        // Output data looks like this:
        // let expected = vec![
        //     "+-----------+------+-------------------------------+",
        //     "| field_int | tag1 | time                          |",
        //     "+-----------+------+-------------------------------+",
        //     "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
        //     "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
        //     "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
        //     "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
        //     "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
        //     "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
        //     "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
        //     "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
        //     "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
        //     "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
        //     "+-----------+------+-------------------------------+",
        // ];

        batches
    }

    // RecordBatches with knowledge of influx metadata
    pub async fn create_batches_with_influxtype() -> Vec<Arc<RecordBatch>> {
        // Use the available TestChunk to create chunks and then convert them to raw RecordBatches
        let mut batches = vec![];

        // This test covers all kinds of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within
        // todo: may want to simplify the test chunk data below; it reuses existing code that covers many compaction cases
        let chunk1 = Arc::new(
            TestChunk::new("t")
                .with_id(1)
                .with_time_column_with_full_stats(
                    Some(5),
                    Some(7000),
                    10,
                    Some(NonZeroU64::new(7).unwrap()),
                )
                .with_tag_column_with_full_stats(
                    "tag1",
                    Some("AL"),
                    Some("MT"),
                    10,
                    Some(NonZeroU64::new(3).unwrap()),
                )
                .with_i64_field_column("field_int")
                .with_ten_rows_of_data_some_duplicates(),
        );
        let batch1 = raw_data(&[chunk1]).await[0].clone();
        //println!("BATCH1: {:#?}", batch1);
        batches.push(Arc::new(batch1));

        // chunk2 overlaps with chunk 1
        let chunk2 = Arc::new(
            TestChunk::new("t")
                .with_id(2)
                .with_time_column_with_full_stats(
                    Some(5),
                    Some(7000),
                    5,
                    Some(NonZeroU64::new(5).unwrap()),
                )
                .with_tag_column_with_full_stats(
                    "tag1",
                    Some("AL"),
                    Some("MT"),
                    5,
                    Some(NonZeroU64::new(3).unwrap()),
                )
                .with_i64_field_column("field_int")
                .with_five_rows_of_data(),
        );
        let batch2 = raw_data(&[chunk2]).await[0].clone();
        //println!("BATCH2: {:#?}", batch2);
        batches.push(Arc::new(batch2));

        // chunk3 no overlap, no duplicates within
        let chunk3 = Arc::new(
            TestChunk::new("t")
                .with_id(3)
                .with_time_column_with_full_stats(
                    Some(8000),
                    Some(20000),
                    3,
                    Some(NonZeroU64::new(3).unwrap()),
                )
                .with_tag_column_with_full_stats(
                    "tag1",
                    Some("UT"),
                    Some("WA"),
                    3,
                    Some(NonZeroU64::new(3).unwrap()),
                )
                .with_i64_field_column("field_int")
                .with_three_rows_of_data(),
        );
        let batch3 = raw_data(&[chunk3]).await[0].clone();
        //println!("BATCH3: {:#?}", batch3);
        batches.push(Arc::new(batch3));

        // chunk4 no overlap, duplicates within
        let chunk4 = Arc::new(
            TestChunk::new("t")
                .with_id(4)
                .with_time_column_with_full_stats(
                    Some(28000),
                    Some(220000),
                    4,
                    Some(NonZeroU64::new(3).unwrap()),
                )
                .with_tag_column_with_full_stats(
                    "tag1",
                    Some("UT"),
                    Some("WA"),
                    4,
                    Some(NonZeroU64::new(3).unwrap()),
                )
                .with_i64_field_column("field_int")
                .with_may_contain_pk_duplicates(true)
                .with_four_rows_of_data(),
        );
        let batch4 = raw_data(&[chunk4]).await[0].clone();
        //println!("BATCH4: {:#?}", batch4);
        batches.push(Arc::new(batch4));

        // todo: show what the output data looks like in comments for easier debugging

        batches
    }
}
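For orientation, the call pattern the new module expects is the one the tests above already exercise: wrap the snapshots in a QueryableBatch, hand it to compact() together with an Executor, and drain the returned stream. The following is a minimal sketch, not part of the commit; the function name compact_example is illustrative, and a real ingester would use the executor owned by the IngesterServer rather than creating one.

// Sketch only: drives ingester::compact::compact() the same way the tests do.
use std::sync::Arc;

use ingester::{compact::compact, data::QueryableBatch};
use query::exec::Executor;

async fn compact_example(batch: Arc<QueryableBatch>) {
    // Single-threaded executor, as in the tests above.
    let executor = Executor::new(1);

    // Plan and run the reorg/compaction plan; the result is a stream of
    // compacted RecordBatches.
    let stream = compact(&executor, batch).await.expect("compaction failed");

    // Collect the stream with the same DataFusion helper the tests use.
    let output = datafusion::physical_plan::common::collect(stream)
        .await
        .expect("executing the compaction plan failed");
    println!("compacted into {} record batches", output.len());
}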
@@ -2,6 +2,7 @@
//!

use arrow::record_batch::RecordBatch;
+use data_types::delete_predicate::DeletePredicate;
use std::{collections::BTreeMap, sync::Arc};
use uuid::Uuid;

@@ -167,17 +168,19 @@ pub struct BufferBatch {
}

/// SnapshotBatch contains data of many contiguous BufferBatches
#[derive(Debug)]
pub struct SnapshotBatch {
    /// Min sequencer number of its combined BufferBatches
    pub min_sequencer_number: SequenceNumber,
    /// Max sequencer number of its combined BufferBatches
    pub max_sequencer_number: SequenceNumber,
    /// Data of its combined BufferBatches kept in one RecordBatch
-    pub data: RecordBatch,
+    pub data: Arc<RecordBatch>,
}

/// PersistingBatch contains all needed info and data for creating
/// a parquet file for a given set of SnapshotBatches
#[derive(Debug)]
pub struct PersistingBatch {
    /// Sequencer id of the data
    pub sequencer_id: SequencerId,

@@ -191,10 +194,23 @@ pub struct PersistingBatch {
    /// Id of to-be-created parquet file of this data
    pub object_store_id: Uuid,

-    /// data to be persisted
+    /// data
    pub data: Arc<QueryableBatch>,
}

/// Queryable data used for both query and persistence
#[derive(Debug)]
pub struct QueryableBatch {
    /// data
    pub data: Vec<SnapshotBatch>,

-    /// delete predicates to be applied to the data
-    /// before persisting
+    /// Tombstones to be applied on data
    pub deletes: Vec<Tombstone>,

    /// Delete predicates of the tombstones
    /// Note: this is needed here to return its reference for a trait function
    pub delete_predicates: Vec<Arc<DeletePredicate>>,

    /// This is needed to return a reference for a trait function
    pub table_name: String,
}
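To make the relationship between these structs concrete, here is a small sketch, not part of the commit (the helper name to_queryable and the literal values are made up), showing how one buffered RecordBatch becomes the QueryableBatch that compaction and persistence consume:

// Sketch only: a SnapshotBatch wraps one immutable RecordBatch plus the range of
// sequencer numbers it covers; a QueryableBatch groups snapshots with tombstones.
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use ingester::data::{QueryableBatch, SnapshotBatch};
use iox_catalog::interface::SequenceNumber;

fn to_queryable(batch: Arc<RecordBatch>) -> Arc<QueryableBatch> {
    let snapshot = SnapshotBatch {
        min_sequencer_number: SequenceNumber::new(1),
        max_sequencer_number: SequenceNumber::new(1),
        data: batch, // Arc<RecordBatch> after the change in this commit
    };
    // No tombstones yet, so QueryableBatch::new builds no delete predicates.
    Arc::new(QueryableBatch::new("test_table", vec![snapshot], vec![]))
}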
@@ -11,8 +11,9 @@
    clippy::use_self,
    clippy::clone_on_ref_ptr
)]

-#[allow(dead_code)]
+#![allow(dead_code)]
pub mod compact;
pub mod data;
pub mod handler;
pub mod query;
pub mod server;
@@ -0,0 +1,322 @@
//! Module to handle query on Ingester's data

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use data_types::{
    chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder},
    delete_predicate::DeletePredicate,
    partition_metadata::TableSummary,
};
use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};
use iox_catalog::interface::Tombstone;
use predicate::{
    delete_predicate::parse_delete_predicate,
    predicate::{Predicate, PredicateMatch},
};
use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema};
use snafu::Snafu;

use crate::data::{QueryableBatch, SnapshotBatch};

#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
    #[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))]
    DeletePredicate {
        source: predicate::delete_predicate::Error,
        min: String,
        max: String,
        predicate: String,
    },
}

/// A specialized `Error` for Ingester's Query errors
pub type Result<T, E = Error> = std::result::Result<T, E>;

// todo: move this function to a more appropriate crate
/// Return the merged schema for RecordBatches
///
/// This is infallible because the schemas of chunks within a
/// partition are assumed to be compatible because that schema was
/// enforced as part of writing into the partition
pub fn merge_record_batch_schemas(batches: &[Arc<RecordBatch>]) -> Arc<Schema> {
    let mut merger = SchemaMerger::new();
    for batch in batches {
        let schema = Schema::try_from(batch.schema()).expect("Schema conversion error");
        merger = merger.merge(&schema).expect("schemas compatible");
    }
    Arc::new(merger.build())
}

impl QueryableBatch {
    /// Initialize a QueryableBatch
    pub fn new(table_name: &str, data: Vec<SnapshotBatch>, deletes: Vec<Tombstone>) -> Self {
        let mut delete_predicates = vec![];
        for delete in &deletes {
            let delete_predicate = Arc::new(
                parse_delete_predicate(
                    &delete.min_time.get().to_string(),
                    &delete.max_time.get().to_string(),
                    &delete.serialized_predicate,
                )
                .expect("Error building delete predicate"),
            );

            delete_predicates.push(delete_predicate);
        }

        Self {
            data,
            deletes,
            delete_predicates,
            table_name: table_name.to_string(),
        }
    }
}

impl QueryChunkMeta for QueryableBatch {
    fn summary(&self) -> &TableSummary {
        unimplemented!()
    }

    fn schema(&self) -> Arc<Schema> {
        // todo: may want to store this schema as a field of QueryableBatch and
        // only do this schema merge the first time it is called

        // Merge the schemas of all RecordBatches of the PersistingBatch
        let batches: Vec<Arc<RecordBatch>> =
            self.data.iter().map(|s| Arc::clone(&s.data)).collect();
        merge_record_batch_schemas(&batches)
    }

    fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
        self.delete_predicates.as_ref()
    }
}

impl QueryChunk for QueryableBatch {
    type Error = Error;

    // This function should not be used in PersistingBatch context
    fn id(&self) -> ChunkId {
        unimplemented!()
    }

    // This function should not be used in PersistingBatch context
    fn addr(&self) -> ChunkAddr {
        unimplemented!()
    }

    /// Returns the name of the table stored in this chunk
    fn table_name(&self) -> &str {
        &self.table_name
    }

    /// Returns true if the chunk may contain a duplicate "primary
    /// key" within itself
    fn may_contain_pk_duplicates(&self) -> bool {
        // always true because they are not deduplicated yet
        true
    }

    /// Returns the result of applying the `predicate` to the chunk
    /// using an efficient, but inexact method, based on metadata.
    ///
    /// NOTE: This method is suitable for calling during planning, and
    /// may return PredicateMatch::Unknown for certain types of
    /// predicates.
    fn apply_predicate_to_metadata(
        &self,
        _predicate: &Predicate,
    ) -> Result<PredicateMatch, Self::Error> {
        Ok(PredicateMatch::Unknown)
    }

    /// Returns a set of Strings with column names from the specified
    /// table that have at least one row that matches `predicate`, if
    /// the predicate can be evaluated entirely on the metadata of
    /// this Chunk. Returns `None` otherwise
    fn column_names(
        &self,
        _predicate: &Predicate,
        _columns: Selection<'_>,
    ) -> Result<Option<StringSet>, Self::Error> {
        Ok(None)
    }

    /// Return a set of Strings containing the distinct values in the
    /// specified columns, if the predicate can be evaluated entirely
    /// on the metadata of this Chunk. Returns `None` otherwise
    ///
    /// The requested columns must all have String type.
    fn column_values(
        &self,
        _column_name: &str,
        _predicate: &Predicate,
    ) -> Result<Option<StringSet>, Self::Error> {
        Ok(None)
    }

    /// Provides access to raw `QueryChunk` data as an
    /// asynchronous stream of `RecordBatch`es filtered by a *required*
    /// predicate. Note that not all chunks can evaluate all types of
    /// predicates and this function will return an error
    /// if requested to evaluate with a predicate that is not supported
    ///
    /// This is the analog of the `TableProvider` in DataFusion
    ///
    /// The reason we can't simply use the `TableProvider` trait
    /// directly is that the data for a particular Table lives in
    /// several chunks within a partition, so there needs to be an
    /// implementation of `TableProvider` that stitches together the
    /// streams from several different `QueryChunk`s.
    fn read_filter(
        &self,
        _predicate: &Predicate, // not needed because all data will be read for compaction
        _selection: Selection<'_>, // not needed because all columns will be read and compacted
    ) -> Result<SendableRecordBatchStream, Self::Error> {
        let batches: Vec<_> = self.data.iter().map(|s| Arc::clone(&s.data)).collect();
        let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), batches);
        Ok(Box::pin(stream))
    }

    /// Returns true if data of this chunk is sorted
    fn is_sorted_on_pk(&self) -> bool {
        false
    }

    /// Returns the sort key of the chunk if any
    fn sort_key(&self) -> Option<SortKey<'_>> {
        None
    }

    /// Returns chunk type
    fn chunk_type(&self) -> &str {
        "PersistingBatch"
    }

    // This function should not be used in PersistingBatch context
    fn order(&self) -> ChunkOrder {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use arrow::{
        array::{
            ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray,
            TimestampNanosecondArray, UInt64Array,
        },
        datatypes::{DataType, Int32Type, TimeUnit},
    };

    #[tokio::test]
    async fn test_merge_batch_schema() {
        // Merge the schemas of the batches
        // The fields in the schema are sorted by column name
        let batches = create_batches();
        let merged_schema = (&*merge_record_batch_schemas(&batches)).clone();

        // Expected Arrow schema
        let arrow_schema = Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new(
                "dict",
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                true,
            ),
            arrow::datatypes::Field::new("int64", DataType::Int64, true),
            arrow::datatypes::Field::new("string", DataType::Utf8, true),
            arrow::datatypes::Field::new("bool", DataType::Boolean, true),
            arrow::datatypes::Field::new(
                "time",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            arrow::datatypes::Field::new("uint64", DataType::UInt64, false),
            arrow::datatypes::Field::new("float64", DataType::Float64, true),
        ]));
        let expected_schema = Schema::try_from(arrow_schema)
            .unwrap()
            .sort_fields_by_name();

        assert_eq!(
            expected_schema, merged_schema,
            "\nExpected:\n{:#?}\nActual:\n{:#?}",
            expected_schema, merged_schema
        );
    }

    // ----------------------------------------------------------------------------------------------
    // Data for testing
    // Create pure RecordBatches without knowledge of Influx datatype
    fn create_batches() -> Vec<Arc<RecordBatch>> {
        // Batch 1: <dict, i64, str, bool, time> & 3 rows
        let dict_array: ArrayRef = Arc::new(
            vec![Some("a"), None, Some("b")]
                .into_iter()
                .collect::<DictionaryArray<Int32Type>>(),
        );
        let int64_array: ArrayRef =
            Arc::new([Some(-1), None, Some(2)].iter().collect::<Int64Array>());
        let string_array: ArrayRef = Arc::new(
            vec![Some("foo"), Some("and"), Some("bar")]
                .into_iter()
                .collect::<StringArray>(),
        );
        let bool_array: ArrayRef = Arc::new(
            [Some(true), None, Some(false)]
                .iter()
                .collect::<BooleanArray>(),
        );
        let ts_array: ArrayRef = Arc::new(
            [Some(150), Some(200), Some(1526823730000000000)]
                .iter()
                .collect::<TimestampNanosecondArray>(),
        );
        let batch1 = RecordBatch::try_from_iter_with_nullable(vec![
            ("dict", dict_array, true),
            ("int64", int64_array, true),
            ("string", string_array, true),
            ("bool", bool_array, true),
            ("time", ts_array, false), // not null
        ])
        .unwrap();

        // Batch 2: <dict, u64, f64, str, bool, time> & 2 rows
        let dict_array: ArrayRef = Arc::new(
            vec![None, Some("d")]
                .into_iter()
                .collect::<DictionaryArray<Int32Type>>(),
        );
        let uint64_array: ArrayRef = Arc::new([Some(1), Some(2)].iter().collect::<UInt64Array>()); // not null
        let float64_array: ArrayRef =
            Arc::new([Some(1.0), Some(2.0)].iter().collect::<Float64Array>());
        let string_array: ArrayRef = Arc::new(
            vec![Some("foo"), Some("bar")]
                .into_iter()
                .collect::<StringArray>(),
        );
        let bool_array: ArrayRef = Arc::new([Some(true), None].iter().collect::<BooleanArray>());
        let ts_array: ArrayRef = Arc::new(
            [Some(100), Some(1626823730000000000)] // not null
                .iter()
                .collect::<TimestampNanosecondArray>(),
        );
        let batch2 = RecordBatch::try_from_iter_with_nullable(vec![
            ("dict", dict_array, true),
            ("uint64", uint64_array, false), // not null
            ("float64", float64_array, true),
            ("string", string_array, true),
            ("bool", bool_array, true),
            ("time", ts_array, false), // not null
        ])
        .unwrap();

        vec![Arc::new(batch1), Arc::new(batch2)]
    }
}
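As a quick illustration of merge_record_batch_schemas, here is a sketch along the lines of the unit test above; it is not part of the commit, and the column names and values are made up for the example.

// Sketch only: merging the schemas of two RecordBatches that share a column.
use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray};
use arrow::record_batch::RecordBatch;
use ingester::query::merge_record_batch_schemas;

fn main() {
    // Batch 1 has <field_int, tag1>; batch 2 has <field_float, tag1>.
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let tags1: ArrayRef = Arc::new(StringArray::from(vec!["MT", "AL"]));
    let batch1 = RecordBatch::try_from_iter(vec![("field_int", ints), ("tag1", tags1)]).unwrap();

    let floats: ArrayRef = Arc::new(Float64Array::from(vec![1.0]));
    let tags2: ArrayRef = Arc::new(StringArray::from(vec!["CT"]));
    let batch2 = RecordBatch::try_from_iter(vec![("field_float", floats), ("tag1", tags2)]).unwrap();

    // The merged schema is the union of the columns: field_float, field_int, tag1.
    let merged = merge_record_batch_schemas(&[Arc::new(batch1), Arc::new(batch2)]);
    assert_eq!(merged.as_arrow().fields().len(), 3);
}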