Merge pull request #270 from influxdata/er/spike/segment_store

feat: Segment Store prototype (in-memory query execution engine)
pull/24376/head
Edd Robinson 2020-09-25 15:12:24 +01:00 committed by GitHub
commit 0abfc3b266
16 changed files with 7009 additions and 20 deletions

.gitignore vendored (2 changes)

@@ -1,4 +1,4 @@
-/target
+**/target
**/*.rs.bk
.idea/
.env

Cargo.lock generated (82 changes)

@@ -104,6 +104,42 @@ dependencies = [
 "serde_json",
]

[[package]]
name = "arrow"
version = "2.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=aa6889a74c57d6faea0d27ea8013d9b0c7ef809a#aa6889a74c57d6faea0d27ea8013d9b0c7ef809a"
dependencies = [
 "arrow-flight",
 "chrono",
 "csv",
 "flatbuffers",
 "hex",
 "indexmap",
 "lazy_static",
 "num 0.3.0",
 "prettytable-rs",
 "rand",
 "regex",
 "serde",
 "serde_derive",
 "serde_json",
]

[[package]]
name = "arrow-flight"
version = "2.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=aa6889a74c57d6faea0d27ea8013d9b0c7ef809a#aa6889a74c57d6faea0d27ea8013d9b0c7ef809a"
dependencies = [
 "bytes",
 "futures",
 "proc-macro2",
 "prost",
 "prost-derive",
 "tokio",
 "tonic",
 "tonic-build",
]

[[package]]
name = "assert-json-diff"
version = "1.1.0"

@@ -611,7 +647,7 @@ name = "datafusion"
version = "2.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d#171e8bfe5fe13467a1763227e495fae6bc5d011d"
dependencies = [
- "arrow",
+ "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d)",
 "chrono",
 "clap",
 "crossbeam",

@@ -637,6 +673,7 @@ dependencies = [
 "delorean_generated_types",
 "delorean_ingest",
 "delorean_line_parser",
 "delorean_mem_qe",
 "delorean_object_store",
 "delorean_parquet",
 "delorean_partitioned_store",

@@ -677,7 +714,7 @@ dependencies = [
name = "delorean_arrow"
version = "0.1.0"
dependencies = [
- "arrow",
+ "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d)",
 "datafusion",
 "parquet",
]

@@ -698,6 +735,7 @@ dependencies = [
name = "delorean_ingest"
version = "0.1.0"
dependencies = [
 "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=aa6889a74c57d6faea0d27ea8013d9b0c7ef809a)",
 "delorean_line_parser",
 "delorean_table",
 "delorean_table_schema",

@@ -722,6 +760,23 @@ dependencies = [
 "snafu",
]

[[package]]
name = "delorean_mem_qe"
version = "0.1.0"
dependencies = [
 "chrono",
 "criterion",
 "croaring",
 "crossbeam",
 "delorean_arrow",
 "delorean_table",
 "env_logger",
 "heapsize",
 "human_format",
 "log",
 "snafu",
]

[[package]]
name = "delorean_object_store"
version = "0.1.0"

@@ -1253,6 +1308,15 @@ dependencies = [
 "autocfg",
]

[[package]]
name = "heapsize"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1679e6ea370dee694f91f1dc469bf94cf8f52051d147aec3e1f9497c6fc22461"
dependencies = [
 "winapi 0.3.8",
]

[[package]]
name = "heck"
version = "0.3.1"

@@ -1546,9 +1610,9 @@ dependencies = [
[[package]]
name = "log"
-version = "0.4.8"
+version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
+checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
dependencies = [
 "cfg-if",
]

@@ -1982,7 +2046,7 @@ name = "parquet"
version = "2.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d#171e8bfe5fe13467a1763227e495fae6bc5d011d"
dependencies = [
- "arrow",
+ "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d)",
 "brotli",
 "byteorder",
 "chrono",

@@ -2757,9 +2821,9 @@ checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f"
[[package]]
name = "snafu"
-version = "0.6.8"
+version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c7f5aed652511f5c9123cf2afbe9c244c29db6effa2abb05c866e965c82405ce"
+checksum = "9c4e6046e4691afe918fd1b603fd6e515bcda5388a1092a9edbada307d159f09"
dependencies = [
 "doc-comment",
 "futures-core",

@@ -2769,9 +2833,9 @@ dependencies = [
[[package]]
name = "snafu-derive"
-version = "0.6.8"
+version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ebf8f7d5720104a9df0f7076a8682024e958bba0fe9848767bb44f251f3648e9"
+checksum = "7073448732a89f2f3e6581989106067f403d378faeafb4a50812eb814170d3e5"
dependencies = [
 "proc-macro2",
 "quote",

Cargo.toml (workspace)

@@ -12,6 +12,7 @@ members = [
    "delorean_ingest",
    "delorean_line_parser",
    "delorean_object_store",
    "delorean_mem_qe",
    "delorean_parquet",
    "delorean_partitioned_store",
    "delorean_table",

@@ -33,6 +34,7 @@ delorean_arrow = { path = "delorean_arrow" }
delorean_generated_types = { path = "delorean_generated_types" }
delorean_ingest = { path = "delorean_ingest" }
delorean_line_parser = { path = "delorean_line_parser" }
delorean_mem_qe = { path = "delorean_mem_qe" }
delorean_parquet = { path = "delorean_parquet" }
delorean_partitioned_store = { path = "delorean_partitioned_store" }
delorean_table = { path = "delorean_table" }

@@ -67,7 +69,7 @@ tracing = "0.1"
tracing-futures="0.2.4"
http = "0.2.0"
-snafu = "0.6.2"
+snafu = "0.6.9"
libflate = "1.0.0"

[dev-dependencies]

delorean_ingest/Cargo.toml

@@ -7,7 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { git = "https://github.com/apache/arrow.git", rev="aa6889a74c57d6faea0d27ea8013d9b0c7ef809a", version = "2.0.0-SNAPSHOT" }
snafu = "0.6.2"
env_logger = "0.7.1"
log = "0.4.8"

delorean_ingest/src/lib.rs

@@ -508,6 +508,154 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packers> {
packers
}
// use arrow::array;
// use arrow::datatypes;
// use arrow::ipc::writer;
// use arrow::record_batch;
// use std::fs::File;
// use std::sync::Arc;
// fn arrow_datatype(datatype: DataType) -> datatypes::DataType {
// match datatype {
// DataType::Float => datatypes::DataType::Float64,
// DataType::Integer => datatypes::DataType::Int64,
// DataType::String => datatypes::DataType::Utf8,
// // DataType::String => datatypes::DataType::Dictionary(
// // std::boxed::Box::new(datatypes::DataType::Int16),
// // std::boxed::Box::new(datatypes::DataType::Utf8),
// // ),
// DataType::Boolean => datatypes::DataType::Boolean,
// DataType::Timestamp => datatypes::DataType::Int64,
// }
// }
// fn write_arrow_file(parquet_schema: Schema, packers: Vec<Packers>) -> Result<(), Error> {
// let file = File::create("/tmp/http_api_requests_total.arrow").unwrap();
// let mut record_batch_fields: Vec<datatypes::Field> = vec![];
// // no default() on Field...
// record_batch_fields.resize(
// parquet_schema.get_col_defs().len(),
// datatypes::Field::new("foo", datatypes::DataType::Int64, false),
// );
// for col_def in parquet_schema.get_col_defs() {
// let nullable = col_def.data_type != DataType::Timestamp;
// // if col_def.data_type == DataType::Timestamp {
// // nullable = false;
// // } else {
// // nullable = true;
// // }
// record_batch_fields[col_def.index as usize] = datatypes::Field::new(
// col_def.name.as_str(),
// arrow_datatype(col_def.data_type),
// nullable,
// );
// }
// println!("{:?}", record_batch_fields);
// println!("{:?}", parquet_schema.get_col_defs());
// let schema = datatypes::Schema::new(record_batch_fields);
// let mut writer = writer::StreamWriter::try_new(file, &schema).unwrap();
// // let num_rows = packers[0].num_rows();
// let batch_size = 60_000;
// let mut packer_chunkers: Vec<PackerChunker<'_>> = vec![];
// for packer in &packers {
// packer_chunkers.push(packer.chunk_values(batch_size));
// }
// loop {
// let mut chunked_packers: Vec<Packers> = Vec::with_capacity(packers.len());
// for chunker in &mut packer_chunkers {
// match chunker {
// PackerChunker::Float(c) => {
// if let Some(chunk) = c.next() {
// chunked_packers.push(Packers::Float(Packer::from(chunk)));
// }
// }
// PackerChunker::Integer(c) => {
// if let Some(chunk) = c.next() {
// chunked_packers.push(Packers::Integer(Packer::from(chunk)));
// }
// }
// PackerChunker::String(c) => {
// if let Some(chunk) = c.next() {
// chunked_packers.push(Packers::String(Packer::from(chunk)));
// }
// }
// PackerChunker::Boolean(c) => {
// if let Some(chunk) = c.next() {
// chunked_packers.push(Packers::Boolean(Packer::from(chunk)));
// }
// }
// }
// }
// if chunked_packers.is_empty() {
// break;
// }
// // let sort = [0, 7, 6, 12];
// // let sort = [8, 4, 9, 0, 1, 7, 10, 6, 5, 2, 3, 12];
// let sort = [3, 2, 5, 6, 10, 7, 1, 0, 9, 4, 8, 12];
// delorean_table::sorter::sort(&mut chunked_packers, &sort).unwrap();
// println!(
// "Writing {:?} packers with size: {:?}",
// chunked_packers.len(),
// chunked_packers[0].num_rows()
// );
// write_arrow_batch(&mut writer, Arc::new(schema.clone()), chunked_packers);
// }
// writer.finish().unwrap();
// Ok(())
// }
// fn write_arrow_batch(
// w: &mut writer::StreamWriter<File>,
// schema: Arc<datatypes::Schema>,
// packers: Vec<Packers>,
// ) {
// let mut record_batch_arrays: Vec<array::ArrayRef> = vec![];
// for packer in packers {
// match packer {
// Packers::Float(p) => {
// record_batch_arrays.push(Arc::new(array::Float64Array::from(p.values().to_vec())));
// }
// Packers::Integer(p) => {
// record_batch_arrays.push(Arc::new(array::Int64Array::from(p.values().to_vec())));
// }
// Packers::String(p) => {
// let mut builder = array::StringBuilder::new(p.num_rows());
// for v in p.values() {
// match v {
// Some(v) => {
// builder.append_value(v.as_utf8().unwrap()).unwrap();
// }
// None => {
// builder.append_null().unwrap();
// }
// }
// }
// let array = builder.finish();
// record_batch_arrays.push(Arc::new(array));
// }
// Packers::Boolean(p) => {
// let array = array::BooleanArray::from(p.values().to_vec());
// record_batch_arrays.push(Arc::new(array));
// }
// }
// }
// let record_batch = record_batch::RecordBatch::try_new(schema, record_batch_arrays).unwrap();
// w.write(&record_batch).unwrap();
// }
/// Converts one or more TSM files into the delorean_table internal columnar
/// data format and then passes that converted data to a `DeloreanTableWriter`.
pub struct TSMFileConverter {

delorean_mem_qe/Cargo.toml (new file)

@@ -0,0 +1,25 @@
[package]
name = "delorean_mem_qe"
version = "0.1.0"
authors = ["Edd Robinson <me@edd.io>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
delorean_arrow = { path = "../delorean_arrow" }
delorean_table = { path = "../delorean_table" }
heapsize = "0.4.2"
snafu = "0.6.8"
croaring = "0.4.5"
crossbeam = "0.7.3"
chrono = "0.4"
log = "0.4.11"
env_logger = "0.7.1"
human_format = "1.0.3"
[dev-dependencies]
criterion = "0.3"

New file: Criterion benchmarks for the delorean_mem_qe DictionaryRLE encoding

@@ -0,0 +1,95 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
const BATCH_SIZES: [usize; 5] = [10, 100, 1_000, 10_000, 100_000];
const CARDINALITIES: [usize; 4] = [1, 5, 10, 100];
fn encoding_drle_row_ids_sorted(c: &mut Criterion) {
benchmark_row_ids(
c,
"encoding_drle_row_ids_sorted",
&BATCH_SIZES,
&CARDINALITIES,
);
}
fn benchmark_row_ids(
c: &mut Criterion,
benchmark_group_name: &str,
batch_sizes: &[usize],
cardinalities: &[usize],
) {
let mut group = c.benchmark_group(benchmark_group_name);
for &batch_size in batch_sizes {
for &cardinality in cardinalities {
let mut input = delorean_mem_qe::encoding::DictionaryRLE::new();
let values = batch_size / cardinality;
for i in 0..cardinality {
input.push_additional(Some(i.to_string()), values as u64);
}
group.throughput(Throughput::Bytes(batch_size as u64));
group.bench_with_input(
BenchmarkId::from_parameter(format!("{:?}_{:?}", batch_size, cardinality)),
&input,
|b, input| {
b.iter(|| {
// do work
for i in 0..cardinality {
let _ = input.row_ids(Some(i.to_string())).collect::<Vec<usize>>();
}
});
},
);
}
}
group.finish();
}
fn encoding_drle_row_ids_sorted_roaring(c: &mut Criterion) {
benchmark_row_ids_roaring(
c,
"encoding_drle_row_ids_sorted_roaring",
&BATCH_SIZES,
&CARDINALITIES,
);
}
fn benchmark_row_ids_roaring(
c: &mut Criterion,
benchmark_group_name: &str,
batch_sizes: &[usize],
cardinalities: &[usize],
) {
let mut group = c.benchmark_group(benchmark_group_name);
for &batch_size in batch_sizes {
for &cardinality in cardinalities {
let mut input = delorean_mem_qe::encoding::DictionaryRLE::new();
let values = batch_size / cardinality;
for i in 0..cardinality {
input.push_additional(Some(i.to_string()), values as u64);
}
group.throughput(Throughput::Bytes(batch_size as u64));
group.bench_with_input(
BenchmarkId::from_parameter(format!("{:?}_{:?}", batch_size, cardinality)),
&input,
|b, input| {
b.iter(|| {
// do work
for i in 0..cardinality {
let _ = input.row_ids_eq_roaring(Some(i.to_string()));
}
});
},
);
}
}
group.finish();
}
criterion_group!(
benches,
encoding_drle_row_ids_sorted,
encoding_drle_row_ids_sorted_roaring
);
criterion_main!(benches);
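The DictionaryRLE implementation itself is not visible on this page (its diff is presumably among the suppressed ones below), so as a point of reference, here is a minimal self-contained sketch of dictionary run-length encoding, the technique these benchmarks exercise. Only the `push_additional` and `row_ids` names mirror the benchmark above; everything else is illustrative and not the crate's actual implementation:

use std::collections::BTreeMap;

// Illustrative dictionary-RLE column: distinct values map to integer codes
// via a dictionary, and consecutive repeats are stored as (code, run) pairs.
struct DictionaryRle {
    dictionary: BTreeMap<Option<String>, u32>, // value -> code
    runs: Vec<(u32, u64)>,                     // (code, run length)
}

impl DictionaryRle {
    fn new() -> Self {
        Self { dictionary: BTreeMap::new(), runs: Vec::new() }
    }

    // Append `additional` repetitions of `value`, extending the last run
    // when the value is unchanged.
    fn push_additional(&mut self, value: Option<String>, additional: u64) {
        let next_code = self.dictionary.len() as u32;
        let code = *self.dictionary.entry(value).or_insert(next_code);
        match self.runs.last_mut() {
            Some((last, run)) if *last == code => *run += additional,
            _ => self.runs.push((code, additional)),
        }
    }

    // Return all row ids holding `value` by walking the runs.
    fn row_ids(&self, value: Option<String>) -> Vec<u64> {
        let mut ids = Vec::new();
        if let Some(&code) = self.dictionary.get(&value) {
            let mut row = 0u64;
            for &(c, run) in &self.runs {
                if c == code {
                    ids.extend(row..row + run);
                }
                row += run;
            }
        }
        ids
    }
}

fn main() {
    let mut col = DictionaryRle::new();
    col.push_additional(Some("web".to_string()), 3);
    col.push_additional(Some("db".to_string()), 2);
    assert_eq!(col.row_ids(Some("db".to_string())), vec![3, 4]);
}

Note how low-cardinality columns compress extremely well under this scheme: the benchmark's batch_size/cardinality parameter pairs control exactly how long each run is.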

delorean_mem_qe/src/adapter.rs (new file)

@@ -0,0 +1,331 @@
//! Code for interfacing and running queries in DataFusion
// use crate::Store;
// use delorean_arrow::arrow::{
// datatypes::{Schema, SchemaRef},
// record_batch::{RecordBatch, RecordBatchReader},
// util::pretty,
// };
// use delorean_arrow::datafusion::prelude::*;
// use delorean_arrow::datafusion::{
// datasource::TableProvider,
// execution::{
// context::ExecutionContextState,
// physical_plan::{common::RecordBatchIterator, ExecutionPlan, Partition},
// },
// logicalplan::{make_logical_plan_node, Expr, LogicalPlan},
// lp::LogicalPlanNode,
// optimizer::utils,
// };
// use crate::column;
// use std::{
// fmt,
// sync::{Arc, Mutex},
// };
// Wrapper to adapt a Store to a DataFusion "TableProvider" --
// eventually we could also implement this directly on Store
// pub struct StoreTableSource {
// store: Arc<Store>,
// }
// impl<'a> StoreTableSource {
// pub fn new(store: Arc<Store>) -> Self {
// Self { store }
// }
// }
// impl TableProvider for StoreTableSource {
// /// Get a reference to the schema for this table
// fn schema(&self) -> SchemaRef {
// self.store.schema()
// }
// /// Perform a scan of a table and return a sequence of iterators over the data (one
// /// iterator per partition)
// fn scan(
// &self,
// _projection: &Option<Vec<usize>>,
// _batch_size: usize,
// ) -> delorean_arrow::datafusion::error::Result<Vec<Arc<dyn Partition>>> {
// unimplemented!("scan not yet implemented");
// }
// }
// /// Prototype of how a Delorean query engine, built on top of
// /// DataFusion, but using specialized column store operators might
// /// look like.
// ///
// /// Data from the Segments in the `store` are visible in DataFusion
// /// as a table ("measurement") in this prototype.
// pub struct DeloreanQueryEngine {
// ctx: ExecutionContext,
// store: Arc<Store>,
// }
// impl DeloreanQueryEngine {
// pub fn new(store: Arc<Store>) -> Self {
// let start = std::time::Instant::now();
// let mut ctx = ExecutionContext::new();
// let source = StoreTableSource::new(store.clone());
// let source = Box::new(source);
// ctx.register_table("measurement", source);
// println!("Completed setup in {:?}", start.elapsed());
// DeloreanQueryEngine { ctx, store }
// }
// // Run the specified SQL and return the number of records matched
// pub fn run_sql(&mut self, sql: &str) -> usize {
// let plan = self
// .ctx
// .create_logical_plan(sql)
// .expect("Creating the logical plan");
// //println!("Created logical plan:\n{:?}", plan);
// let plan = self.rewrite_to_segment_scan(&plan);
// //println!("Rewritten logical plan:\n{:?}", plan);
// match self.ctx.collect_plan(&plan) {
// Err(err) => {
// println!("Error running query: {:?}", err);
// 0
// }
// Ok(results) => {
// if results.is_empty() {
// //println!("Empty result returned");
// 0
// } else {
// pretty::print_batches(&results).expect("printing");
// results.iter().map(|b| b.num_rows()).sum()
// }
// }
// }
// }
// /// Specialized optimizer pass that combines a `TableScan` and a `Filter`
// /// together into a `SegmentScan` that carries the predicates.
// ///
// /// For example, given this input:
// ///
// /// Projection: #env, #method, #host, #counter, #time
// /// Filter: #time GtEq Int64(1590036110000000)
// /// TableScan: measurement projection=None
// ///
// /// The following plan would be produced
// /// Projection: #env, #method, #host, #counter, #time
// /// SegmentScan: measurement projection=None predicate=: #time GtEq Int64(1590036110000000)
// ///
// fn rewrite_to_segment_scan(&self, plan: &LogicalPlan) -> LogicalPlan {
// if let LogicalPlan::Filter { predicate, input } = plan {
// // see if the input is a TableScan
// if let LogicalPlan::TableScan { .. } = **input {
// return make_logical_plan_node(Box::new(SegmentScan::new(
// self.store.clone(),
// predicate.clone(),
// )));
// }
// }
// // otherwise recursively apply
// let optimized_inputs = utils::inputs(&plan)
// .iter()
// .map(|input| self.rewrite_to_segment_scan(input))
// .collect();
// return utils::from_plan(plan, &utils::expressions(plan), &optimized_inputs)
// .expect("Created plan");
// }
// }
// /// LogicalPlan node that serves as a scan of the segment store with optional predicates
// struct SegmentScan {
// /// The underlying Store
// store: Arc<Store>,
// schema: SchemaRef,
// /// The predicate to apply during the scan
// predicate: Expr,
// }
// impl<'a> SegmentScan {
// fn new(store: Arc<Store>, predicate: Expr) -> Self {
// let schema = store.schema().clone();
// SegmentScan {
// store,
// schema,
// predicate,
// }
// }
// }
// impl LogicalPlanNode for SegmentScan {
// /// Return a reference to the logical plan's inputs
// fn inputs(&self) -> Vec<&LogicalPlan> {
// Vec::new()
// }
// /// Get a reference to the logical plan's schema
// fn schema(&self) -> &Schema {
// self.schema.as_ref()
// }
// /// returns all expressions (non-recursively) in the current logical plan node.
// fn expressions(&self) -> Vec<Expr> {
// // The predicate expression gets absorbed by this node. As
// // there are no inputs, there are no exprs that operate on
// // inputs.
// Vec::new()
// }
// /// Write a single line human readable string to `f` for use in explain plan
// fn format_for_explain(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// write!(
// f,
// "SegmentScan: {:?} predicate {:?}",
// self.store.as_ref() as *const Store,
// self.predicate
// )
// }
// /// Create a clone of this node.
// ///
// /// Note std::Clone needs a Sized type, so we must implement a
// /// clone that creates a node with a known Size (i.e. Box)
// //
// fn dyn_clone(&self) -> Box<dyn LogicalPlanNode> {
// Box::new(SegmentScan::new(self.store.clone(), self.predicate.clone()))
// }
// /// Create a clone of this LogicalPlanNode with inputs and expressions replaced.
// ///
// /// Note that exprs and inputs are in the same order as the result
// /// of self.inputs and self.exprs.
// ///
// /// So, clone_from_template(exprs).exprs() == exprs
// fn clone_from_template(
// &self,
// exprs: &Vec<Expr>,
// inputs: &Vec<LogicalPlan>,
// ) -> Box<dyn LogicalPlanNode> {
// assert_eq!(exprs.len(), 0, "no exprs expected");
// assert_eq!(inputs.len(), 0, "no inputs expected");
// Box::new(SegmentScan::new(self.store.clone(), self.predicate.clone()))
// }
// /// Create the corresponding physical plan for this node
// fn create_physical_plan(
// &self,
// input_physical_plans: Vec<Arc<dyn ExecutionPlan>>,
// _ctx_state: Arc<Mutex<ExecutionContextState>>,
// ) -> delorean_arrow::datafusion::error::Result<Arc<dyn ExecutionPlan>> {
// assert_eq!(input_physical_plans.len(), 0, "Can not have inputs");
// // If this were real code, we would now programmatically
// // transform the DataFusion Expr into the specific form needed
// // by the Segment. However, to save prototype time we just
// // hard code it here instead
// assert_eq!(
// format!("{:?}", self.predicate),
// "CAST(#time AS Int64) GtEq Int64(1590036110000000) And CAST(#time AS Int64) Lt Int64(1590040770000000) And #env Eq Utf8(\"prod01-eu-central-1\")"
// );
// let time_range = (1590036110000000, 1590040770000000);
// let string_predicate = StringPredicate {
// col_name: "env".into(),
// value: "prod01-eu-central-1".into(),
// };
// Ok(Arc::new(SegmentScanExec::new(
// self.store.clone(),
// time_range,
// string_predicate,
// )))
// }
// }
// #[derive(Debug, Clone)]
// struct StringPredicate {
// col_name: String,
// value: String,
// }
// /// StoreScan execution node
// #[derive(Debug)]
// pub struct SegmentScanExec {
// store: Arc<Store>,
// // Specialized predicates to apply
// time_range: (i64, i64),
// string_predicate: StringPredicate,
// }
// impl SegmentScanExec {
// fn new(store: Arc<Store>, time_range: (i64, i64), string_predicate: StringPredicate) -> Self {
// SegmentScanExec {
// store,
// time_range,
// string_predicate,
// }
// }
// }
// impl ExecutionPlan for SegmentScanExec {
// fn schema(&self) -> SchemaRef {
// self.store.schema()
// }
// fn partitions(&self) -> delorean_arrow::datafusion::error::Result<Vec<Arc<dyn Partitioning>>> {
// let store = self.store.clone();
// Ok(vec![Arc::new(SegmentPartition {
// store,
// time_range: self.time_range,
// string_predicate: self.string_predicate.clone(),
// })])
// }
// }
// #[derive(Debug)]
// struct SegmentPartition {
// store: Arc<Store>,
// time_range: (i64, i64),
// string_predicate: StringPredicate,
// }
// impl Partition for SegmentPartition {
// fn execute(
// &self,
// ) -> delorean_arrow::datafusion::error::Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>
// {
// let combined_results: Vec<Arc<RecordBatch>> = vec![];
// let segments = self.store.segments();
// // prepare the string predicates in the manner Segments want them
// let col_name = &self.string_predicate.col_name;
// let scalar = column::Scalar::String(&self.string_predicate.value);
// // Here
// let _columns = segments.read_filter_eq(
// self.time_range,
// &[(col_name, Some(scalar))],
// vec![
// "env".to_string(),
// "method".to_string(),
// "host".to_string(),
// "counter".to_string(),
// "time".to_string(),
// ],
// );
// // If we were implementing this for real, we would not convert
// // `columns` into RecordBatches and feed them back out
// Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
// self.store.schema().clone(),
// combined_results,
// ))))
// }
// }

New file: delorean_mem_qe example binary exercising the Segment Store

@@ -0,0 +1,717 @@
use std::{
env,
ffi::OsStr,
fs,
fs::File,
path::{Path, PathBuf},
rc::Rc,
sync::Arc,
};
use datatypes::TimeUnit;
use snafu::Snafu;
use delorean_arrow::arrow::array::StringArrayOps;
use delorean_arrow::arrow::record_batch::{RecordBatch, RecordBatchReader};
use delorean_arrow::arrow::{array, array::Array, datatypes, ipc};
use delorean_arrow::parquet::arrow::arrow_reader::ArrowReader;
use delorean_mem_qe::column;
use delorean_mem_qe::column::{AggregateType, Column};
use delorean_mem_qe::segment::{ColumnType, GroupingStrategy, Schema, Segment};
use delorean_mem_qe::Store;
// use delorean_mem_qe::{adapter::DeloreanQueryEngine, Store};
#[derive(Snafu, Debug, Clone, Copy, PartialEq)]
pub enum Error {}
fn format_size(sz: usize) -> String {
human_format::Formatter::new().format(sz as f64)
}
fn main() {
env_logger::init();
let args: Vec<String> = env::args().collect();
let path = &args[1];
let mut sort_order = vec![];
if let Some(arg) = args.get(2) {
sort_order = arg.split(',').collect::<Vec<_>>();
println!("sort is {:?}", sort_order);
};
let mut store = Store::default();
match Path::new(path).extension().and_then(OsStr::to_str) {
Some("arrow") => build_arrow_store(path, &mut store, sort_order).unwrap(),
Some("parquet") => build_parquet_store(path, &mut store, sort_order).unwrap(),
_ => panic!("unsupported file type"),
}
println!(
"total segments {:?} with total size {} ({})",
store.segment_total(),
format_size(store.size()),
store.size()
);
let store = Arc::new(store);
time_select_with_pred(&store);
// time_datafusion_select_with_pred(store.clone());
time_first_host(&store);
time_sum_range(&store);
time_count_range(&store);
time_group_single_with_pred(&store);
time_group_by_multi_agg_count(&store);
time_group_by_multi_agg_sorted_count(&store);
time_window_agg_count(&store);
time_tag_keys_with_pred(&store);
time_tag_values_with_pred(&store);
time_group_by_different_columns(&store);
}
fn build_parquet_store(path: &str, store: &mut Store, sort_order: Vec<&str>) -> Result<(), Error> {
let path = PathBuf::from(path);
let r = File::open(&path).unwrap();
let file_size = fs::metadata(&path).expect("read metadata").len();
println!(
"Reading {} ({}) bytes of Parquet from {:?}....",
format_size(file_size as usize),
file_size,
path
);
let parquet_reader =
delorean_arrow::parquet::file::reader::SerializedFileReader::new(r).unwrap();
let mut reader = delorean_arrow::parquet::arrow::arrow_reader::ParquetFileArrowReader::new(
Rc::new(parquet_reader),
);
let batch_size = 60000;
let record_batch_reader = reader.get_record_reader(batch_size).unwrap();
build_store(record_batch_reader, store, sort_order)
}
fn build_arrow_store(path: &str, store: &mut Store, sort_order: Vec<&str>) -> Result<(), Error> {
let r = File::open(Path::new(path)).unwrap();
let file_size = fs::metadata(&path).expect("read metadata").len();
println!(
"Reading {} ({}) bytes of Arrow from {:?}....",
format_size(file_size as usize),
file_size,
path
);
let reader = ipc::reader::StreamReader::try_new(r).unwrap();
build_store(reader, store, sort_order)
}
fn build_store(
mut reader: impl RecordBatchReader,
store: &mut Store,
sort_order: Vec<&str>,
) -> Result<(), Error> {
let mut total_rows_read = 0;
let start = std::time::Instant::now();
loop {
let rb = reader.next_batch();
match rb {
Err(e) => println!("WARNING: error reading batch: {:?}, SKIPPING", e),
Ok(Some(rb)) => {
// if i < 360 {
// i += 1;
// continue;
// }
let schema = Schema::with_sort_order(
rb.schema(),
sort_order.iter().map(|s| s.to_string()).collect(),
);
total_rows_read += rb.num_rows();
let mut segment = Segment::new(rb.num_rows(), schema);
convert_record_batch(rb, &mut segment)?;
log::debug!("{}", &segment);
store.add_segment(segment);
}
Ok(None) => {
let now = std::time::Instant::now();
println!(
"Completed loading {} rows in {:?}",
total_rows_read,
now - start
);
return Ok(());
}
}
}
}
fn convert_record_batch(rb: RecordBatch, segment: &mut Segment) -> Result<(), Error> {
println!(
"Loading record batch: cols {:?} rows {:?}",
rb.num_columns(),
rb.num_rows()
);
for (i, column) in rb.columns().iter().enumerate() {
match *column.data_type() {
datatypes::DataType::Float64 => {
if column.null_count() > 0 {
panic!("null floats");
}
let arr = column
.as_any()
.downcast_ref::<array::Float64Array>()
.unwrap();
let column = Column::from(arr.value_slice(0, rb.num_rows()));
segment.add_column(rb.schema().field(i).name(), ColumnType::Field(column));
// TODO(edd): figure out how to get ownership here without
// cloning
// let arr: array::Float64Array = arrow::array::PrimitiveArray::from(column.data());
// let column = Column::from(arr);
// segment.add_column(rb.schema().field(i).name(), column);
}
datatypes::DataType::Int64 => {
if column.null_count() > 0 {
panic!("null integers not expected in testing");
}
let arr = column.as_any().downcast_ref::<array::Int64Array>().unwrap();
let column = Column::from(arr.value_slice(0, rb.num_rows()));
segment.add_column(rb.schema().field(i).name(), ColumnType::Time(column));
// TODO(edd): figure out how to get ownership here without
// cloning
// let arr: array::Int64Array = arrow::array::PrimitiveArray::from(column.data());
// let column = Column::from(arr);
// segment.add_column(rb.schema().field(i).name(), column);
}
datatypes::DataType::Timestamp(TimeUnit::Microsecond, None) => {
if column.null_count() > 0 {
panic!("null timestamps not expected in testing");
}
let arr = column
.as_any()
.downcast_ref::<array::TimestampMicrosecondArray>()
.unwrap();
let column = Column::from(arr.value_slice(0, rb.num_rows()));
segment.add_column(rb.schema().field(i).name(), ColumnType::Time(column));
// TODO(edd): figure out how to get ownership here without
// cloning
// let arr: array::TimestampMicrosecondArray =
// arrow::array::PrimitiveArray::from(column.data());
// let column = Column::from(arr);
// segment.add_column(rb.schema().field(i).name(), column);
}
datatypes::DataType::Utf8 => {
let arr = column
.as_any()
.downcast_ref::<array::StringArray>()
.unwrap();
// IMPORTANT - build a set of values (dictionary) ahead of
// time so we can ensure we encode the column in an ordinally
// correct manner.
//
// We can use a trick where encoded integers are ordered according
// to the decoded values, making sorting, comparison and grouping
// more efficient.
//
let mut dictionary: std::collections::BTreeSet<Option<String>> =
std::collections::BTreeSet::new();
// Start at row 0 so the first value is included in the dictionary;
// the run-length loop below starts at 1 because `prev` already
// holds row 0.
for j in 0..arr.len() {
let next = if column.is_null(j) {
None
} else {
Some(arr.value(j).to_string())
};
dictionary.insert(next);
}
let mut c = column::String::with_dictionary(dictionary);
let mut prev = if !column.is_null(0) {
Some(arr.value(0))
} else {
None
};
let mut count = 1_u64;
for j in 1..arr.len() {
let next = if column.is_null(j) {
None
} else {
Some(arr.value(j))
};
if prev == next {
count += 1;
continue;
}
match prev {
Some(x) => c.add_additional(Some(x.to_string()), count),
None => c.add_additional(None, count),
}
prev = next;
count = 1;
}
// Add final batch to column if any
match prev {
Some(x) => c.add_additional(Some(x.to_string()), count),
None => c.add_additional(None, count),
}
segment.add_column(
rb.schema().field(i).name(),
ColumnType::Tag(Column::String(c)),
);
}
datatypes::DataType::Boolean => {
panic!("unsupported");
}
_ => panic!("unsupported datatype"),
}
}
Ok(())
}
//
// SELECT FIRST(host) FROM measurement
//
fn time_first_host(store: &Store) {
let repeat = 100;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut track = 0;
let segments = store.segments();
for _ in 0..repeat {
let now = std::time::Instant::now();
let (ts, _, _) = segments.first("host").unwrap();
total_time += now.elapsed();
track += ts;
}
println!(
"time_first_host ran {:?} in {:?} {:?} / call {:?}",
repeat,
total_time,
total_time / repeat,
track
);
}
//
// SELECT SUM(counter) FROM measurement
// WHERE time >= "2020-05-07 06:48:00" AND time < "2020-05-21 07:00:10"
//
fn time_sum_range(store: &Store) {
let repeat = 100;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let segments = store.segments();
let mut track = 0.0;
for _ in 0..repeat {
let now = std::time::Instant::now();
for segment in segments.segments() {
let filtered_ids =
segment.filter_by_predicates_eq((1588834080000000, 1590044410000000), &[]);
if let Some(mut row_ids) = filtered_ids {
if let column::Scalar::Float(v) =
segment.sum_column("counter", &mut row_ids).unwrap()
{
track += v;
}
}
}
total_time += now.elapsed();
}
println!(
"time_sum_range ran {:?} in {:?} {:?} / total {:?}",
repeat,
total_time,
total_time / repeat,
track
);
}
//
// SELECT COUNT(counter) FROM measurement
// WHERE time >= "2020-05-07 06:48:00" AND time < "2020-05-21 07:00:10"
//
fn time_count_range(store: &Store) {
let repeat = 100;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut track = 0;
let segments = store.segments();
for _ in 0..repeat {
let now = std::time::Instant::now();
for segment in segments.segments() {
let filtered_ids =
segment.filter_by_predicates_eq((1588834080000000, 1590044410000000), &[]);
if let Some(mut row_ids) = filtered_ids {
track += segment.count_column("counter", &mut row_ids).unwrap();
}
}
total_time += now.elapsed();
}
println!(
"time_count_range ran {:?} in {:?} {:?} / total {:?}",
repeat,
total_time,
total_time / repeat,
track
);
}
//
// SELECT env, method, host, counter, time
// FROM measurement
// WHERE time >= "2020-05-21 04:41:50" AND time < "2020-05-21 05:59:30"
// AND "env" = "prod01-eu-central-1"
fn time_select_with_pred(store: &Store) {
let repeat = 100;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut track = 0;
let segments = store.segments();
for _ in 0..repeat {
let now = std::time::Instant::now();
let columns = segments.read_filter_eq(
(1590036110000000, 1590040770000000),
&[("env", "prod01-eu-central-1")],
vec![
"env".to_string(),
"method".to_string(),
"host".to_string(),
"counter".to_string(),
"time".to_string(),
],
);
total_time += now.elapsed();
track += columns.len();
}
println!(
"time_select_with_pred ran {:?} in {:?} {:?} / call {:?}",
repeat,
total_time,
total_time / repeat,
track
);
}
/// DataFusion implementation of
//
// SELECT env, method, host, counter, time
// FROM measurement
// WHERE time >= "2020-05-21 04:41:50" AND time < "2020-05-21 05:59:30"
// AND "env" = "prod01-eu-central-1"
//
// Use the hard coded timestamp values 1590036110000000, 1590040770000000
// fn time_datafusion_select_with_pred(store: Arc<Store>) {
// let mut query_engine = DeloreanQueryEngine::new(store);
// let sql_string = r#"SELECT env, method, host, counter, time
// FROM measurement
// WHERE time::BIGINT >= 1590036110000000
// AND time::BIGINT < 1590040770000000
// AND env = 'prod01-eu-central-1'
// "#;
// let repeat = 100;
// let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
// let mut track = 0;
// for _ in 0..repeat {
// let now = std::time::Instant::now();
// track += query_engine.run_sql(&sql_string);
// total_time += now.elapsed();
// }
// println!(
// "time_datafusion_select_with_pred ran {:?} in {:?} {:?} / call {:?}",
// repeat,
// total_time,
// total_time / repeat,
// track
// );
// }
//
// SELECT env, method, host, counter, time
// FROM measurement
// WHERE time >= "2020-05-21 04:41:50" AND time < "2020-05-21 05:59:30"
// AND "env" = "prod01-eu-central-1"
//
fn time_group_single_with_pred(store: &Store) {
let repeat = 100;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut track = 0;
let segments = store.segments();
for _ in 0..repeat {
let now = std::time::Instant::now();
for segment in segments.segments() {
let results = segment.group_single_agg_by_predicate_eq(
(1588834080000000, 1590044410000000),
&[],
&"env".to_string(),
&[("counter".to_string(), AggregateType::Count)],
);
track += results.len();
}
total_time += now.elapsed();
}
println!(
"time_group_single_with_pred ran {:?} in {:?} {:?} / call {:?}",
repeat,
total_time,
total_time / repeat,
track
);
}
//
// SELECT COUNT(counter)
// FROM measurement
// WHERE time >= "2020-05-21 04:41:50" AND time < "2020-05-21 05:59:30"
// GROUP BY "status", "method"
//
fn time_group_by_multi_agg_count(store: &Store) {
let strats = vec![
GroupingStrategy::HashGroup,
GroupingStrategy::HashGroupConcurrent,
GroupingStrategy::SortGroup,
GroupingStrategy::SortGroupConcurrent,
];
for strat in &strats {
let repeat = 10;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut total_max = 0;
let segments = store.segments();
for _ in 0..repeat {
let now = std::time::Instant::now();
let groups = segments.read_group_eq(
(1589000000000001, 1590044410000001),
&[],
vec!["status".to_string(), "method".to_string()],
vec![("counter".to_string(), AggregateType::Count)],
0,
strat,
);
total_time += now.elapsed();
total_max += groups.len();
}
println!(
"time_group_by_multi_agg_count_{:?} ran {:?} in {:?} {:?} / call {:?}",
strat,
repeat,
total_time,
total_time / repeat,
total_max
);
}
}
//
// SELECT COUNT(counter)
// FROM measurement
// WHERE time >= "2020-05-21 04:41:50" AND time < "2020-05-21 05:59:30"
// GROUP BY "env", "role"
//
fn time_group_by_multi_agg_sorted_count(store: &Store) {
let strats = vec![
GroupingStrategy::HashGroup,
GroupingStrategy::HashGroupConcurrent,
GroupingStrategy::SortGroup,
GroupingStrategy::SortGroupConcurrent,
];
for strat in &strats {
let repeat = 10;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut total_max = 0;
let segments = store.segments();
for _ in 0..repeat {
let now = std::time::Instant::now();
let groups = segments.read_group_eq(
(1589000000000001, 1590044410000000),
&[],
vec!["env".to_string(), "role".to_string()],
vec![("counter".to_string(), AggregateType::Count)],
0,
strat,
);
total_time += now.elapsed();
total_max += groups.len();
}
println!(
"time_group_by_multi_agg_SORTED_count_{:?} ran {:?} in {:?} {:?} / call {:?}",
strat,
repeat,
total_time,
total_time / repeat,
total_max
);
}
}
fn time_window_agg_count(store: &Store) {
let strats = vec![
GroupingStrategy::HashGroup,
GroupingStrategy::HashGroupConcurrent,
GroupingStrategy::SortGroup,
GroupingStrategy::SortGroupConcurrent,
];
for strat in &strats {
let repeat = 10;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut total_max = 0;
let segments = store.segments();
for _ in 0..repeat {
let now = std::time::Instant::now();
let groups = segments.read_group_eq(
(1589000000000001, 1590044410000000),
&[],
vec!["env".to_string(), "role".to_string()],
vec![("counter".to_string(), AggregateType::Count)],
60000000 * 10, // 10 minutes,
strat,
);
total_time += now.elapsed();
total_max += groups.len();
}
println!(
"time_window_agg_count {:?} ran {:?} in {:?} {:?} / call {:?}",
strat,
repeat,
total_time,
total_time / repeat,
total_max
);
}
}
//
// SHOW TAG KEYS WHERE time >= x and time < y AND "env" = 'prod01-eu-central-1'
fn time_tag_keys_with_pred(store: &Store) {
let repeat = 10;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut track = 0;
let segments = store.segments();
for _ in 0..repeat {
let now = std::time::Instant::now();
let columns = segments.tag_keys(
(1588834080000000, 1590044410000000),
&[("env", "prod01-eu-central-1")],
);
total_time += now.elapsed();
track += columns.len();
// println!("{:?}", columns);
}
println!(
"time_tag_keys_with_pred ran {:?} in {:?} {:?} / call {:?}",
repeat,
total_time,
total_time / repeat,
track
);
}
//
// SHOW TAG VALUES ON "host", "method" WHERE time >= x and time < y AND "env" = 'prod01-us-west-2'
fn time_tag_values_with_pred(store: &Store) {
let repeat = 10;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut track = 0;
let segments = store.segments();
for _ in 0..repeat {
let now = std::time::Instant::now();
let tag_values = segments.tag_values(
(1588834080000000, 1590044410000000),
&[("env", "prod01-us-west-2")],
&["host".to_string(), "method".to_string()],
);
total_time += now.elapsed();
track += tag_values.len();
}
println!(
"time_tag_values_with_pred ran {:?} in {:?} {:?} / call {:?}",
repeat,
total_time,
total_time / repeat,
track
);
}
// This is for a performance experiment where I wanted to show the performance
// change as more columns are grouped on.
//
// This only shows good performance when the input file is ordered on all of the
// columns below.
fn time_group_by_different_columns(store: &Store) {
let strats = vec![
GroupingStrategy::HashGroup,
GroupingStrategy::HashGroupConcurrent,
GroupingStrategy::SortGroup,
GroupingStrategy::SortGroupConcurrent,
];
let cols = vec![
"status".to_string(),
"method".to_string(),
"url".to_string(),
"env".to_string(),
"handler".to_string(),
"role".to_string(),
"user_agent".to_string(),
"path".to_string(),
"nodename".to_string(),
"host".to_string(),
"hostname".to_string(),
];
for strat in &strats {
let repeat = 10;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let segments = store.segments();
for i in 1..=cols.len() {
for _ in 0..repeat {
let now = std::time::Instant::now();
segments.read_group_eq(
(1589000000000001, 1590044410000000),
&[],
cols[0..i].to_vec(),
vec![("counter".to_string(), AggregateType::Count)],
0,
strat,
);
total_time += now.elapsed();
}
println!(
"time_group_by_different_columns{:?} cols: {:?} ran {:?} in {:?} {:?}",
strat,
i,
repeat,
total_time,
total_time / repeat,
);
}
}
}
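One detail of `convert_record_batch` worth calling out: the `Utf8` branch collects every distinct value into a `BTreeSet` before encoding, so that dictionary codes can be handed out in sorted order. The payoff, hinted at in the comment, is that comparing two codes gives the same answer as comparing the decoded strings, so sorting, range filtering, and grouping can operate on the integers alone. A minimal demonstration of that property (illustrative only):

use std::collections::{BTreeSet, HashMap};

fn main() {
    let values = ["prod01-eu-central-1", "dev01-us-west-2", "prod01-us-west-2"];

    // Pass 1: collect the distinct values in sorted order (BTreeSet).
    let dictionary: BTreeSet<&str> = values.iter().copied().collect();

    // Assign codes by sorted position, so code order == string order.
    let codes: HashMap<&str, u32> = dictionary
        .iter()
        .enumerate()
        .map(|(i, &v)| (v, i as u32))
        .collect();

    // Pass 2: encode. Any comparison on codes now matches the strings.
    for pair in values.windows(2) {
        let (a, b) = (pair[0], pair[1]);
        assert_eq!(a.cmp(b), codes[a].cmp(&codes[b]));
    }
    println!("codes: {:?}", codes);
}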

(File diff suppressed because it is too large.)

(File diff suppressed because it is too large.)

delorean_mem_qe/src/lib.rs (new file)

@@ -0,0 +1,49 @@
#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]
pub mod adapter;
pub mod column;
pub mod encoding;
pub mod segment;
pub mod sorter;
use delorean_arrow::arrow::datatypes::SchemaRef;
use segment::{Segment, Segments};
#[derive(Debug, Default)]
pub struct Store {
segments: Vec<Segment>,
/// Total size of the store, in bytes
store_size: usize,
}
impl Store {
pub fn add_segment(&mut self, segment: Segment) {
self.store_size += segment.size();
self.segments.push(segment);
}
/// The total size of all segments in the store, in bytes.
pub fn size(&self) -> usize {
self.store_size
}
pub fn segment_total(&self) -> usize {
self.segments.len()
}
pub fn segments(&self) -> Segments<'_> {
// let iter: std::slice::Iter<'a, Segment> = self.segments.iter();
// let segments = iter.collect::<Vec<&'a Segment>>();
Segments::new(self.segments.iter().collect::<Vec<&Segment>>())
}
pub fn schema(&self) -> SchemaRef {
assert!(
!self.segments.is_empty(),
"Need to have at least one segment in a store"
);
// assume all segments have the same schema
self.segments[0].schema()
}
}

(File diff suppressed because it is too large.)

delorean_mem_qe/src/sorter.rs (new file)

@@ -0,0 +1,206 @@
//! The sorter module provides a sort function which will sort a collection of
//! `Packer` columns by arbitrary columns. All sorting is done in ascending
//! order.
//!
//! `sorter::sort` implements Quicksort using Hoare's partitioning scheme with
//! median-of-three pivot selection. This partitioning scheme typically reduces
//! the number of swaps necessary significantly, but it does have some
//! drawbacks.
//!
//! Firstly, the worst-case runtime of this implementation is `O(n^2)`, which
//! occurs when the input set of columns is already sorted according to the
//! desired sort order. To avoid that behaviour, a heuristic is used for
//! inputs over a certain size;
//! large inputs are first linearly scanned to determine if the input is already
//! sorted.
//!
//! Secondly, the sort produced using this partitioning scheme is not stable.
//!
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::ops::Range;
use snafu::ensure;
use snafu::Snafu;
use super::column;
#[derive(Snafu, Debug, Clone, Copy, PartialEq)]
pub enum Error {
#[snafu(display(r#"Too many sort columns specified"#))]
TooManyColumns,
#[snafu(display(r#"Same column specified as sort column multiple times"#))]
RepeatedColumns { index: usize },
#[snafu(display(r#"Specified column index is out bounds"#))]
OutOfBoundsColumn { index: usize },
}
/// Any `Vector` inputs with more than this many rows will have a linear
/// comparison scan performed on them to ensure they're not already sorted.
const SORTED_CHECK_SIZE: usize = 1000;
/// Sort a slice of `Vector` based on the provided column indexes.
///
/// All chosen columns will be sorted in ascending order; the sort is *not*
/// stable.
pub fn sort(vectors: &mut [column::Vector], sort_by: &[usize]) -> Result<(), Error> {
if vectors.is_empty() || sort_by.is_empty() {
return Ok(());
}
ensure!(sort_by.len() <= vectors.len(), TooManyColumns);
let mut col_set = BTreeSet::new();
for &index in sort_by {
ensure!(col_set.insert(index), RepeatedColumns { index });
}
// TODO(edd): map first/last still unstable https://github.com/rust-lang/rust/issues/62924
if let Some(index) = col_set.range(vectors.len()..).next() {
return OutOfBoundsColumn { index: *index }.fail();
}
// Hoare's partitioning scheme can have quadratic runtime behaviour in
// the worst case when the inputs are already sorted. To avoid this, a
// check is added for large inputs.
let n = vectors[0].len();
if n > SORTED_CHECK_SIZE {
let mut sorted = true;
for i in 1..n {
if cmp(vectors, i - 1, i, sort_by) == Ordering::Greater {
sorted = false;
break;
}
}
if sorted {
log::debug!("columns already sorted");
return Ok(());
}
}
let now = std::time::Instant::now();
quicksort_by(vectors, 0..n - 1, sort_by);
log::debug!("sorted in {:?}", now.elapsed());
Ok(())
}
fn quicksort_by(vectors: &mut [column::Vector], range: Range<usize>, sort_by: &[usize]) {
if range.start >= range.end {
return;
}
let pivot = partition(vectors, &range, sort_by);
quicksort_by(vectors, range.start..pivot, sort_by);
quicksort_by(vectors, pivot + 1..range.end, sort_by);
}
fn partition(vectors: &mut [column::Vector], range: &Range<usize>, sort_by: &[usize]) -> usize {
let pivot = (range.start + range.end) / 2;
let (lo, hi) = (range.start, range.end);
if cmp(vectors, pivot as usize, lo as usize, sort_by) == Ordering::Less {
swap(vectors, lo as usize, pivot as usize);
}
if cmp(vectors, hi as usize, lo as usize, sort_by) == Ordering::Less {
swap(vectors, lo as usize, hi as usize);
}
if cmp(vectors, pivot as usize, hi as usize, sort_by) == Ordering::Less {
swap(vectors, hi as usize, pivot as usize);
}
let pivot = hi;
let mut i = range.start;
let mut j = range.end;
loop {
while cmp(vectors, i as usize, pivot as usize, sort_by) == Ordering::Less {
i += 1;
}
while cmp(vectors, j as usize, pivot as usize, sort_by) == Ordering::Greater {
j -= 1;
}
if i >= j {
return j;
}
swap(vectors, i as usize, j as usize);
i += 1;
j -= 1;
}
}
fn cmp(vectors: &[column::Vector], a: usize, b: usize, sort_by: &[usize]) -> Ordering {
for &idx in sort_by {
match &vectors[idx] {
column::Vector::Unsigned32(p) => {
let cmp = p.get(a).cmp(&p.get(b));
if cmp != Ordering::Equal {
return cmp;
}
// if cmp equal then try next vector.
}
column::Vector::Integer(p) => {
let cmp = p.get(a).cmp(&p.get(b));
if cmp != Ordering::Equal {
return cmp;
}
// if cmp equal then try next vector.
}
_ => unimplemented!("todo!"), // don't compare on non-string / timestamp cols
}
}
Ordering::Equal
}
#[allow(dead_code)]
fn vectors_sorted_asc(vectors: &[column::Vector], len: usize, sort_by: &[usize]) -> bool {
'row_wise: for i in 1..len {
for &idx in sort_by {
match &vectors[idx] {
column::Vector::Unsigned32(vec) => {
match vec[i - 1].cmp(&vec[i]) {
Ordering::Less => continue 'row_wise,
Ordering::Equal => continue,
Ordering::Greater => return false,
}
// if vec[i - 1] < vec[i] {
// continue 'row_wise;
// } else if vec[i - 1] == vec[i] {
// // try next column
// continue;
// } else {
// // value is > so
// return false;
// }
}
column::Vector::Integer(vec) => {
match vec[i - 1].cmp(&vec[i]) {
Ordering::Less => continue 'row_wise,
Ordering::Equal => continue,
Ordering::Greater => return false,
}
// if vec[i - 1] < vec[i] {
// continue 'row_wise;
// } else if vec[i - 1] == vec[i] {
// // try next column
// continue;
// } else {
// // value is > so
// return false;
// }
}
_ => unimplemented!("todo!"), // don't compare on non-string / timestamp cols
}
}
}
true
}
// Swap the same pair of elements in each packer column
fn swap(vectors: &mut [column::Vector], a: usize, b: usize) {
for p in vectors {
p.swap(a, b);
}
}
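Since the partitioning code above is interleaved with the multi-column `cmp`/`swap` machinery, a compact single-column sketch of the same scheme, median-of-three pivot selection followed by Hoare partitioning, may be easier to follow (illustrative, not the crate code):

// Hoare partition with a median-of-three pivot, as in `quicksort_by` above,
// but on a plain i64 slice instead of parallel columns.
fn quicksort(v: &mut [i64], lo: usize, hi: usize) {
    if lo >= hi {
        return;
    }
    let p = partition(v, lo, hi);
    quicksort(v, lo, p);
    quicksort(v, p + 1, hi);
}

fn partition(v: &mut [i64], lo: usize, hi: usize) -> usize {
    // Median-of-three: arrange so the median of v[lo], v[mid], v[hi]
    // ends up at v[hi], then pivot on that value.
    let mid = (lo + hi) / 2;
    if v[mid] < v[lo] {
        v.swap(lo, mid);
    }
    if v[hi] < v[lo] {
        v.swap(lo, hi);
    }
    if v[mid] < v[hi] {
        v.swap(hi, mid);
    }
    let pivot = v[hi];
    let (mut i, mut j) = (lo, hi);
    loop {
        // Walk the two cursors towards each other, swapping out-of-place
        // pairs; this is what keeps the number of swaps low.
        while v[i] < pivot {
            i += 1;
        }
        while v[j] > pivot {
            j -= 1;
        }
        if i >= j {
            return j;
        }
        v.swap(i, j);
        i += 1;
        j -= 1;
    }
}

fn main() {
    let mut data = vec![5, 1, 4, 1, 5, 9, 2, 6];
    let hi = data.len() - 1;
    quicksort(&mut data, 0, hi);
    assert_eq!(data, vec![1, 1, 2, 4, 5, 5, 6, 9]);
}

In the real module, comparing `v[i] < pivot` becomes the lexicographic `cmp` across all `sort_by` columns, and each swap is applied to every column so rows stay aligned.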

delorean_table/src/packers.rs

@@ -7,6 +7,7 @@
// soon... We'll see how long that actually takes...
use core::iter::Iterator;
use std::iter;
use std::slice::Chunks;

use delorean_arrow::parquet::data_type::ByteArray;
use std::default::Default;
@@ -44,7 +45,16 @@ macro_rules! typed_packer_accessors {
};
}

-impl Packers {
+impl<'a> Packers {
pub fn chunk_values(&self, chunk_size: usize) -> PackerChunker<'_> {
match self {
Self::Float(p) => PackerChunker::Float(p.values.chunks(chunk_size)),
Self::Integer(p) => PackerChunker::Integer(p.values.chunks(chunk_size)),
Self::String(p) => PackerChunker::String(p.values.chunks(chunk_size)),
Self::Boolean(p) => PackerChunker::Boolean(p.values.chunks(chunk_size)),
}
}

/// Create a String Packers with repeated values.
pub fn from_elem_str(v: &str, n: usize) -> Self {
Self::String(Packer::from(vec![ByteArray::from(v); n]))
@@ -205,6 +215,15 @@ impl std::convert::From<Vec<Option<Vec<u8>>>> for Packers {
}
}

/// PackerChunker represents chunkable Packer variants.
#[derive(Debug)]
pub enum PackerChunker<'a> {
Float(Chunks<'a, Option<f64>>),
Integer(Chunks<'a, Option<i64>>),
String(Chunks<'a, Option<ByteArray>>),
Boolean(Chunks<'a, Option<bool>>),
}
#[derive(Debug, Default, PartialEq)]
pub struct Packer<T>
where

@@ -215,7 +234,7 @@
impl<T> Packer<T>
where
-T: Default + Clone,
+T: Default + Clone + std::fmt::Debug,
{
pub fn new() -> Self {
Self { values: Vec::new() }
@@ -259,7 +278,13 @@
&self.values
}

-/// returns a binary vector indicating which indexes have null values.
+/// Returns an iterator that emits `chunk_size` values from the Packer until
+/// all values are returned.
+pub fn chunk_values(&self, chunk_size: usize) -> std::slice::Chunks<'_, Option<T>> {
+self.values.chunks(chunk_size)
+}
+
+/// Returns a binary vector indicating which indexes have null values.
pub fn def_levels(&self) -> Vec<i16> {
self.values
.iter()
@@ -333,7 +358,7 @@
impl<'a, T> Iterator for PackerIterator<'a, T>
where
-T: Default + Clone,
+T: Default + Clone + std::fmt::Debug,
{
type Item = Option<&'a T>;
@@ -365,7 +390,7 @@
// `Packer<T>` value, e.g., `Packer<f64>`.
impl<T> std::convert::From<Vec<Option<T>>> for Packer<T>
where
-T: Default + Clone,
+T: Default + Clone + std::fmt::Debug,
{
fn from(values: Vec<Option<T>>) -> Self {
let mut packer = Self::new();

@@ -376,6 +401,21 @@
}
}

// Convert `&[Option<T>]`, e.g., `&[Option<f64>]`, into the appropriate
// `Packer<T>` value, e.g., `Packer<f64>`.
impl<T> std::convert::From<&[Option<T>]> for Packer<T>
where
T: Default + Clone + std::fmt::Debug,
{
fn from(values: &[Option<T>]) -> Self {
let mut packer = Self::new();
for v in values {
packer.push_option(v.clone());
}
packer
}
}

#[cfg(test)]
mod test {
use super::*;

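The new `chunk_values` / `PackerChunker` API pairs with the added `From<&[Option<T>]>` impl: a caller can split a large column into fixed-size batches and re-materialize each batch as its own `Packer`, which is how the commented-out `write_arrow_file` in delorean_ingest drives its batched Arrow writes. A hedged usage sketch; the module path and the public visibility of `num_rows` are assumptions:

use delorean_table::packers::{Packer, PackerChunker, Packers}; // path assumed

fn main() {
    // A column of ten optional integers, with one null.
    let values: Vec<Option<i64>> =
        (0..10).map(|i| if i == 4 { None } else { Some(i) }).collect();
    let col = Packers::Integer(Packer::from(values));

    // Iterate the column in batches of four values.
    if let PackerChunker::Integer(chunks) = col.chunk_values(4) {
        for chunk in chunks {
            // Re-materialize each &[Option<i64>] chunk as its own Packer
            // via the new From<&[Option<T>]> impl.
            let batch = Packer::<i64>::from(chunk);
            println!("batch of {} values", batch.num_rows());
        }
    }
}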
delorean_table/src/sorter.rs

@@ -67,7 +67,7 @@ pub fn sort(packers: &mut [Packers], sort_by: &[usize]) -> Result<(), Error> {
if n > SORTED_CHECK_SIZE {
let mut sorted = true;
for i in 1..n {
-if cmp(packers, 0, i, sort_by) != Ordering::Equal {
+if cmp(packers, i - 1, i, sort_by) == Ordering::Greater {
sorted = false;
break;
}

@@ -76,9 +76,14 @@ pub fn sort(packers: &mut [Packers], sort_by: &[usize]) -> Result<(), Error> {
if sorted {
return Ok(());
}
// if packers_sorted_asc(packers, n, sort_by) {
//     return Ok(());
// }
// return Ok(());
}

let now = std::time::Instant::now();
quicksort_by(packers, 0..n - 1, sort_by);
println!("sorted in {:?}", now.elapsed());
Ok(())
}
@@ -152,9 +157,9 @@ fn cmp(packers: &[Packers], a: usize, b: usize, sort_by: &[usize]) -> Ordering {
Packers::Integer(p) => {
let cmp = p.get(a).cmp(&p.get(b));
if cmp != Ordering::Equal {
-// if cmp equal then try next packer column.
return cmp;
}
+// if cmp equal then try next packer column.
}
_ => continue, // don't compare on non-string / timestamp cols
}

@@ -162,6 +167,38 @@ fn cmp(packers: &[Packers], a: usize, b: usize, sort_by: &[usize]) -> Ordering {
Ordering::Equal
}
#[allow(dead_code)]
fn packers_sorted_asc(packers: &[Packers], len: usize, sort_by: &[usize]) -> bool {
'row_wise: for i in 1..len {
for &idx in sort_by {
match &packers[idx] {
Packers::String(p) => {
let vec = p.values();
if vec[i - 1] < vec[i] {
continue 'row_wise;
} else if vec[i - 1] == vec[i] {
// try next column
continue;
} else {
// value is > so
return false;
}
}
Packers::Integer(p) => {
let vec = p.values();
match vec[i - 1].cmp(&vec[i]) {
Ordering::Less => continue 'row_wise,
Ordering::Equal => continue,
Ordering::Greater => return false,
}
}
_ => continue, // don't compare on non-string / timestamp cols
}
}
}
true
}
// Swap the same pair of elements in each packer column
fn swap(packers: &mut [Packers], a: usize, b: usize) {
for p in packers {