refactor: for paul

pull/24376/head
NGA-TRAN 2022-01-21 16:49:02 -05:00
parent 191adc9fc7
commit cd01b141f3
2 changed files with 289 additions and 6 deletions

View File

@ -15,5 +15,6 @@ predicate = { path = "../predicate" }
query = { path = "../query" }
schema = { path = "../schema" }
snafu = "0.7"
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}

View File

@ -56,7 +56,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Note: the given `executor` should be created with the IngesterServer
pub async fn compact(
executor: &Executor,
data: &'static PersistingBatch,
data: Arc<PersistingBatch>,
) -> Result<SendableRecordBatchStream> {
let chunk = CompactingChunk::new(data);
@ -99,9 +99,9 @@ pub fn merge_record_batch_schemas(batches: &[Arc<RecordBatch>]) -> Arc<Schema> {
/// CompactingChunk is a wrapper of a PersistingBatch that implements
/// QueryChunk and QueryChunkMeta needed to build a query plan to compact its data
#[derive(Debug)]
pub struct CompactingChunk<'a> {
pub struct CompactingChunk {
/// Provided ingest data
pub data: &'a PersistingBatch,
pub data: Arc<PersistingBatch>,
/// Statistic of this data which is currently empty as it may not needed
pub summary: TableSummary,
@ -112,7 +112,7 @@ pub struct CompactingChunk<'a> {
impl<'a> CompactingChunk<'a> {
/// Create a PersistingChunk for a given PesistingBatch
pub fn new(data: &'a PersistingBatch) -> Self {
pub fn new(data: Arc<PersistingBatch>) -> Self {
// Empty table summary because PersistingBatch does not keep stats
// Todo - note to self: the TableSummary is used to get stastistics
// while planning the scan. This empty column stats may
@ -243,8 +243,8 @@ impl QueryChunk for CompactingChunk<'_> {
/// streams from several different `QueryChunk`s.
fn read_filter(
&self,
_predicate: &Predicate,
_selection: Selection<'_>,
_predicate: &Predicate, // no needs because all data will be read for compaction
_selection: Selection<'_>, // no needs because all columns will be read and compact
) -> Result<SendableRecordBatchStream, Self::Error> {
let batches: Vec<_> = self.data.data.iter().map(|s| Arc::clone(&s.data)).collect();
let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), batches);
@ -271,3 +271,285 @@ impl QueryChunk for CompactingChunk<'_> {
unimplemented!()
}
}
#[cfg(test)]
mod tests {
use std::num::NonZeroU64;
use crate::data::SnapshotBatch;
use super::*;
use arrow::{array::{ArrayRef, DictionaryArray, Int64Array, StringArray, BooleanArray, TimestampNanosecondArray, UInt64Array, Float64Array}, datatypes::{Int32Type, DataType, TimeUnit}};
use iox_catalog::interface::{SequenceNumber, SequencerId, TableId, PartitionId};
use query::test::{TestChunk, raw_data};
use uuid::Uuid;
#[tokio::test]
async fn test_merge_batch_schema() {
// Merge schema of the batches
// The fileds in the schema are sorted by column name
let batches = create_batches();
let merged_schema = (&*merge_record_batch_schemas(&batches)).clone();
// Expected Arrow schema
let arrow_schema = Arc::new(arrow::datatypes::Schema::new(vec![
arrow::datatypes::Field::new("dict", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true),
arrow::datatypes::Field::new("int64", DataType::Int64, true),
arrow::datatypes::Field::new("string", DataType::Utf8, true),
arrow::datatypes::Field::new("bool", DataType::Boolean, true),
arrow::datatypes::Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
arrow::datatypes::Field::new("uint64", DataType::UInt64, false),
arrow::datatypes::Field::new("float64", DataType::Float64, true),
]));
let expected_schema = Schema::try_from(arrow_schema).unwrap().sort_fields_by_name();
assert_eq!(
expected_schema, merged_schema,
"\nExpected:\n{:#?}\nActual:\n{:#?}",
expected_schema, merged_schema
);
}
#[tokio::test]
async fn test_compact() {
let batches = create_batches_with_influxtype().await;
let persisting_batch = make_persisting_batch(batches);
let exc = Executor::new(1);
let stream = compact(&exc, persisting_batch).await.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream).await.unwrap();
println!("output_batches: {:#?}", output_batches);
// let table = pretty_format_batches(&[batch]).unwrap();
// let expected = vec![
// "+------+-------+--------+---------+-------+--------+--------------------------------+",
// "| dict | int64 | uint64 | float64 | bool | string | time |",
// "+------+-------+--------+---------+-------+--------+--------------------------------+",
// "| a | -1 | 1 | 1 | true | foo | |",
// "| | | | | | | 1970-01-01T00:00:00.000000100Z |",
// "| b | 2 | 2 | 2 | false | bar | 2021-07-20T23:28:50Z |",
// "+------+-------+--------+---------+-------+--------+--------------------------------+",
// ];
}
// ----------------------------------------------------------------------------------------------
// Data for testing
// Crerate pure RecordBatches without knowledge of Influx datatype
fn create_batches() -> Vec<Arc<RecordBatch>> {
// Batch 1: <dict, i64, str, bool, time> & 3 rows
let dict_array: ArrayRef = Arc::new(
vec![Some("a"), None, Some("b")]
.into_iter()
.collect::<DictionaryArray<Int32Type>>(),
);
let int64_array: ArrayRef =
Arc::new([Some(-1), None, Some(2)].iter().collect::<Int64Array>());
let string_array: ArrayRef = Arc::new(
vec![Some("foo"), Some("and"), Some("bar")]
.into_iter()
.collect::<StringArray>(),
);
let bool_array: ArrayRef = Arc::new(
[Some(true), None, Some(false)]
.iter()
.collect::<BooleanArray>(),
);
let ts_array: ArrayRef = Arc::new(
[Some(150), Some(200), Some(1526823730000000000)]
.iter()
.collect::<TimestampNanosecondArray>(),
);
let batch1 = RecordBatch::try_from_iter_with_nullable(vec![
("dict", dict_array, true),
("int64", int64_array, true),
("string", string_array, true),
("bool", bool_array, true),
("time", ts_array, false), // not null
])
.unwrap();
// Batch 2: <dict, u64, f64, str, bool, time> & 2 rows
let dict_array: ArrayRef = Arc::new(
vec![None, Some("d")]
.into_iter()
.collect::<DictionaryArray<Int32Type>>(),
);
let uint64_array: ArrayRef =
Arc::new([Some(1), Some(2)].iter().collect::<UInt64Array>()); // not null
let float64_array: ArrayRef = Arc::new(
[Some(1.0), Some(2.0)]
.iter()
.collect::<Float64Array>(),
);
let string_array: ArrayRef = Arc::new(
vec![Some("foo"), Some("bar")]
.into_iter()
.collect::<StringArray>(),
);
let bool_array: ArrayRef = Arc::new(
[Some(true), None]
.iter()
.collect::<BooleanArray>(),
);
let ts_array: ArrayRef = Arc::new(
[Some(100), Some(1626823730000000000)] // not null
.iter()
.collect::<TimestampNanosecondArray>(),
);
let batch2 = RecordBatch::try_from_iter_with_nullable (vec![
("dict", dict_array, true),
("uint64", uint64_array, false), // not null
("float64", float64_array, true),
("string", string_array, true),
("bool", bool_array, true),
("time", ts_array, false), // not null
])
.unwrap();
vec![Arc::new(batch1), Arc::new(batch2)]
}
// RecordBatches with knowledge of influx metadata
pub async fn create_batches_with_influxtype() -> Vec<Arc<RecordBatch>> {
// Use the available TestChunk to create chunks and then convert them to raw RecordBatches
let mut batches = vec![];
// This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within
let chunk1 = Arc::new(
TestChunk::new("t")
.with_id(1)
.with_time_column_with_full_stats(
Some(5),
Some(7000),
10,
Some(NonZeroU64::new(7).unwrap()),
)
.with_tag_column_with_full_stats(
"tag1",
Some("AL"),
Some("MT"),
10,
Some(NonZeroU64::new(3).unwrap()),
)
.with_i64_field_column("field_int")
.with_ten_rows_of_data_some_duplicates(),
);
let batch1 = raw_data(&vec![chunk1]).await[0].clone();
//println!("BATCH1: {:#?}", batch1);
batches.push(Arc::new(batch1));
// chunk2 overlaps with chunk 1
let chunk2 = Arc::new(
TestChunk::new("t")
.with_id(2)
.with_time_column_with_full_stats(
Some(5),
Some(7000),
5,
Some(NonZeroU64::new(5).unwrap()),
)
.with_tag_column_with_full_stats(
"tag1",
Some("AL"),
Some("MT"),
5,
Some(NonZeroU64::new(3).unwrap()),
)
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
let batch2 = raw_data(&vec![chunk2]).await[0].clone();
//println!("BATCH2: {:#?}", batch2);
batches.push(Arc::new(batch2));
// chunk3 no overlap, no duplicates within
let chunk3 = Arc::new(
TestChunk::new("t")
.with_id(3)
.with_time_column_with_full_stats(
Some(8000),
Some(20000),
3,
Some(NonZeroU64::new(3).unwrap()),
)
.with_tag_column_with_full_stats(
"tag1",
Some("UT"),
Some("WA"),
3,
Some(NonZeroU64::new(3).unwrap()),
)
.with_i64_field_column("field_int")
.with_three_rows_of_data(),
);
let batch3 = raw_data(&vec![chunk3]).await[0].clone();
//println!("BATCH3: {:#?}", batch3);
batches.push(Arc::new(batch3));
// chunk3 no overlap, duplicates within
let chunk4 = Arc::new(
TestChunk::new("t")
.with_id(4)
.with_time_column_with_full_stats(
Some(28000),
Some(220000),
4,
Some(NonZeroU64::new(3).unwrap()),
)
.with_tag_column_with_full_stats(
"tag1",
Some("UT"),
Some("WA"),
4,
Some(NonZeroU64::new(3).unwrap()),
)
.with_i64_field_column("field_int")
.with_may_contain_pk_duplicates(true)
.with_four_rows_of_data(),
);
let batch4 = raw_data(&vec![chunk4]).await[0].clone();
//println!("BATCH4: {:#?}", batch4);
batches.push(Arc::new(batch4));
batches
}
pub fn make_persisting_batch<'a>(batches: Vec<Arc<RecordBatch>>) -> PersistingBatch {
// make snapshots for the bacthes
let mut snapshots = vec![];
let mut seq_num = 1;
for batch in batches {
let seq = SequenceNumber::new(seq_num);
snapshots.push(make_snapshot_batch(batch, seq, seq ));
seq_num = seq_num + 1;
}
PersistingBatch {
sequencer_id: SequencerId::new(1),
table_id: TableId::new(1),
partition_id: PartitionId::new(1),
object_store_id: Uuid::new_v4(),
data: snapshots,
deletes: vec![],
}
}
pub fn make_snapshot_batch(batch: Arc<RecordBatch>, min: SequenceNumber, max: SequenceNumber) -> SnapshotBatch {
SnapshotBatch {
min_sequencer_number: min,
max_sequencer_number: max,
data: batch,
}
}
}