Merge branch 'main' into cn/query-tests-grpc

pull/24376/head
kodiakhq[bot] 2023-01-18 19:03:51 +00:00 committed by GitHub
commit 33168b97f0
36 changed files with 1562 additions and 710 deletions

Cargo.lock (generated, 527 changes)

File diff suppressed because it is too large.

Cargo.toml

@ -7,6 +7,7 @@ members = [
"clap_blocks",
"client_util",
"compactor",
"compactor2",
"data_types",
"datafusion_util",
"dml",
@ -35,6 +36,7 @@ members = [
"iox_time",
"ioxd_common",
"ioxd_compactor",
"ioxd_compactor2",
"ioxd_ingester",
"ioxd_ingester2",
"ioxd_garbage_collector",
@ -114,16 +116,12 @@ edition = "2021"
license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "29.0.0" }
arrow-flight = { version = "29.0.0" }
#datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", default-features = false }
#datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" }
# Temporary patch to https://github.com/alamb/arrow-datafusion/tree/alamb/patched_for_iox
# See https://github.com/alamb/arrow-datafusion/pull/7 for details
datafusion = { git = "https://github.com/alamb/arrow-datafusion.git", branch="alamb/patched_for_iox", default-features = false }
datafusion-proto = { git = "https://github.com/alamb/arrow-datafusion.git", branch="alamb/patched_for_iox" }
arrow = { version = "30.0.0" }
arrow-flight = { version = "30.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="279440b2ab92d18675b8102e342d4d82182287dc", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="279440b2ab92d18675b8102e342d4d82182287dc" }
hashbrown = { version = "0.13.2" }
parquet = { version = "29.0.0" }
parquet = { version = "30.0.0" }
# This profile optimizes for runtime performance and small binary size at the expense of longer
# build times. It's most suitable for final release builds.
@ -148,9 +146,4 @@ incremental = true
opt-level = 3
[profile.dev.package.similar]
opt-level = 3
[patch.crates-io]
# remove and bump object_store dep version once this revision is released.
# patch for https://github.com/influxdata/idpe/issues/16611
object_store = { git = 'https://github.com/apache/arrow-rs.git', rev = "f5c165acc0e6cc4b34e0eaea006aab7e5bd28d66", package = "object_store" }
opt-level = 3

clap_blocks/src/compactor2.rs

@ -1,294 +1,16 @@
//! CLI config for compactor2-related commands
// This is mostly the same as clap_blocks/src/compactor.rs except compactor2 doesn't have
// `--write-buffer-topic`, `--shard-index-range-start`, or `--shard-index-range-end`.
// Also, there isn't a `compactor2 run-once` command yet.
use std::num::NonZeroUsize;
#![cfg_attr(rustfmt, rustfmt_skip)] // https://github.com/rust-lang/rustfmt/issues/5489
/// Create compactor configuration that can have different defaults. The `run compactor`
/// server/service needs different defaults than the `compactor run-once` command, and this macro
/// enables sharing of the parts of the configs that are the same without duplicating the code.
macro_rules! gen_compactor_config {
(
$name:ident,
// hot_multiple is currently the only flag that has a differing default. Add more macro
// arguments similar to this one if more flags need different defaults.
hot_multiple_default = $hot_multiple_default:literal
$(,)?
) => {
/// CLI config for compactor
#[derive(Debug, Clone, Copy, clap::Parser)]
pub struct $name {
/// Desired max size of compacted parquet files.
/// It is a target desired value, rather than a guarantee.
/// 1024 * 1024 * 25 = 26,214,400 (25MB)
#[clap(
long = "compaction-max-desired-size-bytes",
env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES",
default_value = "26214400",
action
)]
pub max_desired_file_size_bytes: u64,
/// Percentage of desired max file size.
/// If the estimated compacted result is too small, no need to split it.
/// This percentage is to determine how small it is:
/// < percentage_max_file_size * max_desired_file_size_bytes:
/// This value must be between (0, 100)
/// Default is 80
#[clap(
long = "compaction-percentage-max-file_size",
env = "INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE",
default_value = "80",
action
)]
pub percentage_max_file_size: u16,
/// Split file percentage
/// If the estimated compacted result is neither too small nor too large, it will be
/// split into 2 files determined by this percentage.
/// . Too small means: < percentage_max_file_size * max_desired_file_size_bytes
/// . Too large means: > max_desired_file_size_bytes
/// . Any size in the middle will be considered neither too small nor too large
///
/// This value must be between (0, 100)
/// Default is 80
#[clap(
long = "compaction-split-percentage",
env = "INFLUXDB_IOX_COMPACTION_SPLIT_PERCENTAGE",
default_value = "80",
action
)]
pub split_percentage: u16,
/// Max number of partitions per shard we want to compact per cycle
/// Default: 1
#[clap(
long = "compaction-max-number-partitions-per-shard",
env = "INFLUXDB_IOX_COMPACTION_MAX_NUMBER_PARTITIONS_PER_SHARD",
default_value = "1",
action
)]
pub max_number_partitions_per_shard: usize,
/// Min number of recent ingested files a partition needs to be considered for
/// compacting
///
/// Default: 1
#[clap(
long = "compaction-min-number-recent-ingested-files-per-partition",
env = "INFLUXDB_IOX_COMPACTION_MIN_NUMBER_RECENT_INGESTED_FILES_PER_PARTITION",
default_value = "1",
action
)]
pub min_number_recent_ingested_files_per_partition: usize,
/// The multiple of times that compacting hot partitions should run for every one time
/// that compacting cold partitions runs. Set to 1 to compact hot partitions and cold
/// partitions equally.
///
/// Default is
#[doc = $hot_multiple_default]
#[clap(
long = "compaction-hot-multiple",
env = "INFLUXDB_IOX_COMPACTION_HOT_MULTIPLE",
default_value = $hot_multiple_default,
action
)]
pub hot_multiple: usize,
/// The multiple of times that compacting warm partitions should run for every one time
/// that compacting cold partitions runs. Set to 1 to compact warm partitions and cold
/// partitions equally.
///
/// Default is 1
#[clap(
long = "compaction-warm-multiple",
env = "INFLUXDB_IOX_COMPACTION_WARM_MULTIPLE",
default_value = "1",
action
)]
pub warm_multiple: usize,
/// The memory budget assigned to this compactor.
///
/// For each partition candidate, we will estimate the memory needed to compact each
/// file and only add more files if their needed estimated memory is below this memory
/// budget. Since we must compact L1 files that overlapped with L0 files, if their
/// total estimated memory does not allow us to compact a part of a partition at all,
/// we will not compact it and will log the partition and its related information in a
/// table in our catalog for further diagnosis of the issue.
///
/// The number of candidates compacted concurrently is also decided using this
/// estimation and budget.
///
/// Default is 30 * 1024 * 1024 * 1024 = 32,212,254,720 bytes (30GB).
#[clap(
long = "compaction-memory-budget-bytes",
env = "INFLUXDB_IOX_COMPACTION_MEMORY_BUDGET_BYTES",
default_value = "32212254720",
action
)]
pub memory_budget_bytes: u64,
/// Minimum number of rows allocated for each record batch fed into DataFusion plan
///
/// We will use max(parquet_file's row_count, min_num_rows_allocated_per_record_batch_to_datafusion_plan)
/// to estimate number of rows allocated for each record batch fed into DataFusion plan.
///
#[clap(
long = "compaction-min-rows-allocated-per-record-batch-to-plan",
env = "INFLUXDB_IOX_COMPACTION_MIN_ROWS_PER_RECORD_BATCH_TO_PLAN",
default_value = "8192",
action
)]
pub min_num_rows_allocated_per_record_batch_to_datafusion_plan: u64,
/// Max number of files to compact per partition
///
/// Due to limitations of the implementation of the compaction plans
/// there is a hard maximum on the number of files that can be compacted
/// at once. This avoids a wide fan-in multi-way merge in the DataFusion plan
#[clap(
long = "compaction-max-num-compacting-files",
env = "INFLUXDB_IOX_COMPACTION_MAX_COMPACTING_FILES",
default_value = "20",
action
)]
pub max_num_compacting_files: usize,
/// Max number of files to compact for a partition in which the first file and its
/// overlaps push the file count limit over `max_num_compacting_files`.
/// It's a special case of `max_num_compacting_files` that's higher just for the first
/// file in a partition
#[clap(
long = "compaction-max-num-compacting-files-first-in-partition",
env = "INFLUXDB_IOX_COMPACTION_MAX_COMPACTING_FILES_FIRST_IN_PARTITION",
default_value = "40",
action
)]
pub max_num_compacting_files_first_in_partition: usize,
/// Number of minutes without a write to a partition before it is considered cold
/// and thus a candidate for compaction
#[clap(
long = "compaction-minutes-without-new-writes-to-be-cold",
env = "INFLUXDB_IOX_COMPACTION_MINUTES_WITHOUT_NEW_WRITE_TO_BE_COLD",
default_value = "480",
action
)]
pub minutes_without_new_writes_to_be_cold: u64,
/// When selecting cold partition candidates, partitions with newly created files (any level) after
/// this threshold will be considered candidates. However, only partitions without new writes
/// within the last `minutes_without_new_writes_to_be_cold` minutes will get compacted
#[clap(
long = "compaction-cold-partition_candidate-hours-threshold",
env = "INFLUXDB_IOX_COMPACTION_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD",
default_value = "24",
action
)]
pub cold_partition_candidates_hours_threshold: u64,
/// When querying for partitions with data for hot compaction, how many hours to look
/// back for a first pass.
#[clap(
long = "compaction-hot-partition-hours-threshold-1",
env = "INFLUXDB_IOX_COMPACTION_HOT_PARTITION_HOURS_THRESHOLD_1",
default_value = "4",
action
)]
pub hot_compaction_hours_threshold_1: u64,
/// When querying for partitions with data for hot compaction, how many hours to look
/// back for a second pass if we found nothing in the first pass.
#[clap(
long = "compaction-hot-partition-hours-threshold-2",
env = "INFLUXDB_IOX_COMPACTION_HOT_PARTITION_HOURS_THRESHOLD_2",
default_value = "24",
action
)]
pub hot_compaction_hours_threshold_2: u64,
/// Max number of partitions that can be compacted in parallel at once
/// We use memory budget to estimate how many partitions can be compacted in parallel at once.
/// However, we do not want to have that number too large which will cause the high usage of CPU cores
/// and may also lead to inaccuracy of memory estimation. This number is to cap that.
#[clap(
long = "compaction-max-parallel-partitions",
env = "INFLUXDB_IOX_COMPACTION_MAX_PARALLEL_PARTITIONS",
default_value = "20",
action
)]
pub max_parallel_partitions: u64,
/// When selecting warm partition candidates, partitions with newly created files (any level) after
/// this threshold will be considered candidates. However, only partitions with many contiguous small
/// L1 files will get warm compacted
#[clap(
long = "compaction-warm-partition_candidate-hours-threshold",
env = "INFLUXDB_IOX_COMPACTION_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD",
default_value = "24",
action
)]
pub warm_partition_candidates_hours_threshold: u64,
/// When querying for partitions suitable for warm compaction, this is the
/// upper bound on file size to be counted as "small".
/// Default is half of max_desired_file_size_bytes's default (see above).
#[clap(
long = "compaction-warm-small-size-threshold-bytes",
env = "INFLUXDB_IOX_COMPACTION_WARM_SMALL_SIZE_THRESHOLD_BYTES",
default_value = "13107200",
action
)]
pub warm_compaction_small_size_threshold_bytes: i64,
/// When querying for partitions suitable for warm compaction, this is the minimum
/// number of "small" files a partition must have in order for it to be selected
/// as a candidate for warm compaction.
#[clap(
long = "compaction-warm-min-small-file-count",
env = "INFLUXDB_IOX_COMPACTION_WARM_MIN_SMALL_FILE_COUNT",
default_value = "10",
action
)]
pub warm_compaction_min_small_file_count: usize,
}
};
}
gen_compactor_config!(Compactor2Config, hot_multiple_default = "4");
gen_compactor_config!(Compactor2OnceConfig, hot_multiple_default = "1");
impl Compactor2OnceConfig {
/// Convert the configuration for `compactor run-once` into the configuration for `run
/// compactor` so that run-once can reuse some of the code that the compactor server uses.
pub fn into_compactor_config(self) -> Compactor2Config {
Compactor2Config {
max_desired_file_size_bytes: self.max_desired_file_size_bytes,
percentage_max_file_size: self.percentage_max_file_size,
split_percentage: self.split_percentage,
max_number_partitions_per_shard: self.max_number_partitions_per_shard,
min_number_recent_ingested_files_per_partition: self
.min_number_recent_ingested_files_per_partition,
hot_multiple: self.hot_multiple,
warm_multiple: self.warm_multiple,
memory_budget_bytes: self.memory_budget_bytes,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: self
.min_num_rows_allocated_per_record_batch_to_datafusion_plan,
max_num_compacting_files: self.max_num_compacting_files,
max_num_compacting_files_first_in_partition: self.max_num_compacting_files_first_in_partition,
minutes_without_new_writes_to_be_cold: self.minutes_without_new_writes_to_be_cold,
cold_partition_candidates_hours_threshold: self.cold_partition_candidates_hours_threshold,
hot_compaction_hours_threshold_1: self.hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2: self.hot_compaction_hours_threshold_2,
max_parallel_partitions: self.max_parallel_partitions,
warm_partition_candidates_hours_threshold: self.warm_partition_candidates_hours_threshold,
warm_compaction_small_size_threshold_bytes: self.warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count: self.warm_compaction_min_small_file_count,
}
}
/// CLI config for compactor2
#[derive(Debug, Clone, Copy, clap::Parser)]
pub struct Compactor2Config {
/// Number of partitions that should be compacted in parallel.
#[clap(
long = "compaction-partition-concurrency",
env = "INFLUXDB_IOX_COMPACTION_PARTITION_CONCURRENCY",
default_value = "10",
action
)]
pub compaction_partition_concurrency: NonZeroUsize,
}
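As a quick illustration of the new, much smaller config (a sketch, not part of the diff; the binary name and flag value below are arbitrary), the struct can be parsed with clap's derive API:

use clap::Parser;
use clap_blocks::compactor2::Compactor2Config;

fn main() {
    // Parse from an explicit argument vector instead of the real process args/env.
    let config = Compactor2Config::parse_from([
        "compactor2",
        "--compaction-partition-concurrency",
        "4",
    ]);
    assert_eq!(config.compaction_partition_concurrency.get(), 4);
}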

compactor2/Cargo.toml (new file, 23 lines)

@ -0,0 +1,23 @@
[package]
name = "compactor2"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
data_types = { path = "../data_types" }
futures = "0.3"
iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
observability_deps = { path = "../observability_deps" }
parquet_file = { path = "../parquet_file" }
rand = "0.8.3"
snafu = "0.7"
tokio = { version = "1", features = ["macros", "rt", "sync"] }
tokio-util = { version = "0.7.4" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]

compactor2/src/compact.rs (new file, 23 lines)

@ -0,0 +1,23 @@
//! Actual compaction routine.
use std::sync::Arc;
use data_types::ParquetFile;
use iox_catalog::interface::Catalog;
use snafu::Snafu;
/// Compaction errors.
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations)]
pub enum Error {}
/// Perform compaction on given files including catalog transaction.
///
/// This MUST use all files. No further filtering is performed here.
pub async fn compact_files(
_files: &[ParquetFile],
_catalog: &Arc<dyn Catalog>,
) -> Result<(), Error> {
// TODO: implement this
// TODO: split this into the actual DF execution and the catalog bookkeeping
Ok(())
}

compactor2/src/compactor.rs (new file, 69 lines)

@ -0,0 +1,69 @@
//! Main compactor entry point.
use std::sync::Arc;
use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
};
use observability_deps::tracing::warn;
use tokio::task::{JoinError, JoinHandle};
use tokio_util::sync::CancellationToken;
use crate::{config::Config, driver::compact, rules::hardcoded_rules};
/// A [`JoinHandle`] that can be cloned
type SharedJoinHandle = Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>;
/// Convert a [`JoinHandle`] into a [`SharedJoinHandle`].
fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle {
handle.map_err(Arc::new).boxed().shared()
}
/// Main compactor driver.
#[derive(Debug)]
pub struct Compactor2 {
shutdown: CancellationToken,
worker: SharedJoinHandle,
}
impl Compactor2 {
/// Start compactor.
pub fn start(config: Config) -> Self {
let shutdown = CancellationToken::new();
let shutdown_captured = shutdown.clone();
let rules = hardcoded_rules();
let worker = tokio::spawn(async move {
tokio::select! {
_ = shutdown_captured.cancelled() => {}
_ = async {
loop {
compact(&config, &rules).await;
}
} => unreachable!(),
}
});
let worker = shared_handle(worker);
Self { shutdown, worker }
}
/// Trigger shutdown. You should [join](Self::join) afterwards.
pub fn shutdown(&self) {
self.shutdown.cancel();
}
/// Wait until the compactor finishes.
pub async fn join(&self) -> Result<(), Arc<JoinError>> {
self.worker.clone().await
}
}
impl Drop for Compactor2 {
fn drop(&mut self) {
if self.worker.clone().now_or_never().is_none() {
warn!("Compactor was not shut down properly");
}
}
}
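A minimal lifecycle sketch (not part of this commit; construction of `Config` is elided) showing how the handle above is intended to be used: start the background worker, trigger shutdown, then join it.

use compactor2::{compactor::Compactor2, config::Config};

async fn run_until_shutdown(config: Config) {
    let compactor = Compactor2::start(config);

    // ... wait for a shutdown signal here ...

    compactor.shutdown();
    compactor
        .join()
        .await
        .expect("compactor worker should shut down cleanly");
}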

compactor2/src/config.rs (new file, 29 lines)

@ -0,0 +1,29 @@
//! Config-related stuff.
use std::{num::NonZeroUsize, sync::Arc};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use iox_time::TimeProvider;
use parquet_file::storage::ParquetStorage;
/// Config to set up a compactor.
#[derive(Debug, Clone)]
pub struct Config {
/// Metric registry.
pub metric_registry: Arc<metric::Registry>,
/// Central catalog.
pub catalog: Arc<dyn Catalog>,
/// Store holding the parquet files.
pub parquet_store: ParquetStorage,
/// Executor.
pub exec: Arc<Executor>,
/// Time provider.
pub time_provider: Arc<dyn TimeProvider>,
/// Number of partitions that should be compacted in parallel.
pub partition_concurrency: NonZeroUsize,
}

compactor2/src/driver.rs (new file, 88 lines)

@ -0,0 +1,88 @@
use std::sync::Arc;
use data_types::{ParquetFile, PartitionId};
use futures::StreamExt;
use iox_catalog::interface::Catalog;
use observability_deps::tracing::{error, info};
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use crate::{compact::compact_files, config::Config, rules::Rules};
pub async fn compact(config: &Config, rules: &Arc<Rules>) {
let partition_ids = get_partition_ids(&config.catalog).await;
// TODO: implement ID-based sharding / hash-partitioning so we can run multiple compactors in parallel
let partition_ids = randomize_partition_order(partition_ids, 1234);
futures::stream::iter(partition_ids)
.map(|partition_id| {
let config = config.clone();
let rules = Arc::clone(rules);
async move {
let files = get_parquet_files(&config.catalog, partition_id).await;
let files = files
.into_iter()
.filter(|file| rules.file_filters.iter().all(|filter| filter.apply(file)))
.collect::<Vec<_>>();
if !rules
.partition_filters
.iter()
.all(|filter| filter.apply(&files))
{
return;
}
if files.is_empty() {
return;
}
if let Err(e) = compact_files(&files, &config.catalog).await {
error!(
%e,
partition_id=partition_id.get(),
"Error while compacting partition",
);
// TODO: mark partition as "cannot compact"
return;
}
info!(
input_size = files.iter().map(|f| f.file_size_bytes).sum::<i64>(),
input_files = files.len(),
partition_id = partition_id.get(),
"Compacted partition",
);
}
})
.buffer_unordered(config.partition_concurrency.get())
.collect::<()>()
.await;
}
/// Get partition IDs from the catalog.
///
/// This method performs retries.
///
/// This should only perform basic filtering. It MUST NOT inspect individual parquet files.
async fn get_partition_ids(_catalog: &Arc<dyn Catalog>) -> Vec<PartitionId> {
// TODO: get partition IDs from catalog, wrapped by retry
vec![]
}
/// Get parquet files for given partition.
///
/// This method performs retries.
async fn get_parquet_files(
_catalog: &Arc<dyn Catalog>,
_partition: PartitionId,
) -> Vec<ParquetFile> {
// TODO: get files from catalog, wrapped by retry
vec![]
}
fn randomize_partition_order(mut partitions: Vec<PartitionId>, seed: u64) -> Vec<PartitionId> {
let mut rng = StdRng::seed_from_u64(seed);
partitions.shuffle(&mut rng);
partitions
}
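The driver bounds parallelism with `buffer_unordered`, capped by `partition_concurrency`. A standalone sketch of that pattern (illustrative IDs only, not part of the diff):

use futures::StreamExt;

async fn process_partitions(partition_ids: Vec<i64>, concurrency: usize) {
    futures::stream::iter(partition_ids)
        .map(|id| async move {
            // Per-partition work runs here; at most `concurrency` of these
            // futures are polled at the same time.
            println!("compacting partition {id}");
        })
        .buffer_unordered(concurrency)
        .collect::<()>()
        .await;
}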

compactor2/src/lib.rs (new file, 18 lines)

@ -0,0 +1,18 @@
//! The compactor.
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr,
clippy::todo,
clippy::dbg_macro
)]
pub mod compact;
pub mod compactor;
pub mod config;
mod driver;
mod rules;

compactor2/src/rules/file_filter.rs (new file, 9 lines)

@ -0,0 +1,9 @@
use std::fmt::Debug;
use data_types::ParquetFile;
pub trait FileFilter: Debug + Send + Sync {
fn apply(&self, file: &ParquetFile) -> bool;
fn name(&self) -> &'static str;
}


@ -0,0 +1,22 @@
use std::sync::Arc;
use self::{file_filter::FileFilter, partition_filter::PartitionFilter};
pub mod file_filter;
pub mod partition_filter;
#[derive(Debug)]
pub struct Rules {
pub file_filters: Vec<Arc<dyn FileFilter>>,
pub partition_filters: Vec<Arc<dyn PartitionFilter>>,
}
/// Get hardcoded rules.
///
/// TODO: make this a runtime config
pub fn hardcoded_rules() -> Arc<Rules> {
Arc::new(Rules {
file_filters: vec![],
partition_filters: vec![],
})
}

compactor2/src/rules/partition_filter.rs (new file, 9 lines)

@ -0,0 +1,9 @@
use std::fmt::Debug;
use data_types::ParquetFile;
pub trait PartitionFilter: Debug + Send + Sync {
fn apply(&self, files: &[ParquetFile]) -> bool;
fn name(&self) -> &'static str;
}
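To show how the two traits and `Rules` compose, here is a hypothetical filter (the name, threshold, and module placement are illustrative, not part of this commit); it assumes the code sits in a child module of `rules`:

use std::sync::Arc;

use data_types::ParquetFile;

use super::{file_filter::FileFilter, Rules};

/// Hypothetical filter: skip parquet files larger than a fixed size.
#[derive(Debug)]
struct MaxSizeFileFilter {
    max_size_bytes: i64,
}

impl FileFilter for MaxSizeFileFilter {
    fn apply(&self, file: &ParquetFile) -> bool {
        file.file_size_bytes <= self.max_size_bytes
    }

    fn name(&self) -> &'static str {
        "max_size"
    }
}

/// Hypothetical alternative to `hardcoded_rules()` that registers the filter.
fn rules_with_size_cap() -> Arc<Rules> {
    Arc::new(Rules {
        file_filters: vec![Arc::new(MaxSizeFileFilter {
            max_size_bytes: 100 * 1024 * 1024, // 100 MiB, arbitrary
        })],
        partition_filters: vec![],
    })
}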

influxdb_iox/Cargo.toml

@ -21,6 +21,7 @@ iox_catalog = { path = "../iox_catalog" }
iox_arrow_flight = { path = "../iox_arrow_flight" }
ioxd_common = { path = "../ioxd_common"}
ioxd_compactor = { path = "../ioxd_compactor"}
ioxd_compactor2 = { path = "../ioxd_compactor2"}
ioxd_ingest_replica = { path = "../ioxd_ingest_replica" }
ioxd_ingester = { path = "../ioxd_ingester"}
ioxd_ingester2 = { path = "../ioxd_ingester2"}


@ -16,7 +16,7 @@ use clap_blocks::{
};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_compactor::create_compactor2_server_type;
use ioxd_compactor2::create_compactor2_server_type;
use crate::process_info::{setup_metric_registry, USIZE_MAX};
@ -135,8 +135,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
exec,
time_provider,
config.compactor_config,
)
.await?;
);
info!("starting compactor");


@ -7,11 +7,10 @@ use futures::{Stream, StreamExt};
use generated_types::influxdata::iox::ingester::v1::{self as proto, PartitionStatus};
use iox_arrow_flight::{
encode::{
prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response,
GRPC_TARGET_MAX_BATCH_SIZE_BYTES,
flight_data_from_arrow_batch, prepare_batch_for_flight, prepare_schema_for_flight,
split_batch_for_grpc_response, GRPC_TARGET_MAX_BATCH_SIZE_BYTES,
},
flight_service_server::FlightService as Flight,
utils::flight_data_from_arrow_batch,
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, IpcMessage, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
@ -435,7 +434,7 @@ impl Stream for FlightFrameCodec {
let flight_data = FlightData::new(
None,
IpcMessage(build_none_flight_msg()),
IpcMessage(build_none_flight_msg().into()),
bytes.to_vec(),
vec![],
);
@ -477,6 +476,7 @@ fn build_none_flight_msg() -> Vec<u8> {
#[cfg(test)]
mod tests {
use arrow::{error::ArrowError, ipc::MessageHeader};
use bytes::Bytes;
use data_types::PartitionId;
use futures::StreamExt;
use generated_types::influxdata::iox::ingester::v1::{self as proto};
@ -649,7 +649,9 @@ mod tests {
let mut flight =
FlightService::new(MockQueryExec::default(), 100, &metric::Registry::default());
let req = tonic::Request::new(Ticket { ticket: vec![] });
let req = tonic::Request::new(Ticket {
ticket: Bytes::new(),
});
match flight.do_get(req).await {
Ok(_) => panic!("expected error because of invalid ticket"),
Err(s) => {
@ -659,7 +661,9 @@ mod tests {
flight.request_sem = Semaphore::new(0);
let req = tonic::Request::new(Ticket { ticket: vec![] });
let req = tonic::Request::new(Ticket {
ticket: Bytes::new(),
});
match flight.do_get(req).await {
Ok(_) => panic!("expected error because of request limit"),
Err(s) => {


@ -13,7 +13,7 @@ use flatbuffers::FlatBufferBuilder;
use futures::Stream;
use generated_types::influxdata::iox::ingester::v1::{self as proto};
use iox_arrow_flight::{
flight_service_server::FlightService as Flight, utils::flight_data_from_arrow_batch, Action,
encode::flight_data_from_arrow_batch, flight_service_server::FlightService as Flight, Action,
ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
HandshakeResponse, IpcMessage, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
@ -332,7 +332,7 @@ impl Stream for GetStream {
let flight_data = FlightData::new(
None,
IpcMessage(build_none_flight_msg()),
IpcMessage(build_none_flight_msg().into()),
bytes.to_vec(),
vec![],
);


@ -7,11 +7,10 @@ use futures::{Stream, StreamExt};
use generated_types::influxdata::iox::ingester::v1::{self as proto, PartitionStatus};
use iox_arrow_flight::{
encode::{
prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response,
GRPC_TARGET_MAX_BATCH_SIZE_BYTES,
flight_data_from_arrow_batch, prepare_batch_for_flight, prepare_schema_for_flight,
split_batch_for_grpc_response, GRPC_TARGET_MAX_BATCH_SIZE_BYTES,
},
flight_service_server::FlightService as Flight,
utils::flight_data_from_arrow_batch,
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, IpcMessage, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
@ -442,7 +441,7 @@ impl Stream for FlightFrameCodec {
let flight_data = FlightData::new(
None,
IpcMessage(build_none_flight_msg()),
IpcMessage(build_none_flight_msg().into()),
bytes.to_vec(),
vec![],
);
@ -484,6 +483,7 @@ fn build_none_flight_msg() -> Vec<u8> {
#[cfg(test)]
mod tests {
use arrow::{error::ArrowError, ipc::MessageHeader};
use bytes::Bytes;
use data_types::PartitionId;
use futures::StreamExt;
use generated_types::influxdata::iox::ingester::v1::{self as proto};
@ -660,7 +660,9 @@ mod tests {
&metric::Registry::default(),
);
let req = tonic::Request::new(Ticket { ticket: vec![] });
let req = tonic::Request::new(Ticket {
ticket: Bytes::new(),
});
match flight.do_get(req).await {
Ok(_) => panic!("expected error because of invalid ticket"),
Err(s) => {
@ -670,7 +672,9 @@ mod tests {
flight.request_sem = Semaphore::new(0);
let req = tonic::Request::new(Ticket { ticket: vec![] });
let req = tonic::Request::new(Ticket {
ticket: Bytes::new(),
});
match flight.do_get(req).await {
Ok(_) => panic!("expected error because of request limit"),
Err(s) => {


@ -108,7 +108,7 @@ impl FlightClient {
pub async fn handshake(&mut self, payload: Vec<u8>) -> Result<Vec<u8>> {
let request = HandshakeRequest {
protocol_version: 0,
payload,
payload: payload.into(),
};
let mut response_stream = self
@ -128,7 +128,7 @@ impl FlightClient {
));
}
Ok(response.payload)
Ok(response.payload.to_vec())
} else {
Err(FlightError::protocol("No response from handshake"))
}
@ -138,7 +138,9 @@ impl FlightClient {
/// returning a [`FlightRecordBatchStream`] for reading
/// [`RecordBatch`]es.
pub async fn do_get(&mut self, ticket: Vec<u8>) -> Result<FlightRecordBatchStream> {
let t = Ticket { ticket };
let t = Ticket {
ticket: ticket.into(),
};
let request = self.make_request(t);
let response = self


@ -6,7 +6,7 @@ use arrow::{
ipc::writer::IpcWriteOptions,
record_batch::RecordBatch,
};
use arrow_flight::{utils::flight_data_from_arrow_batch, FlightData, SchemaAsIpc};
use arrow_flight::{FlightData, SchemaAsIpc};
use futures::{stream::BoxStream, StreamExt};
/// Creates a stream which encodes a [`Stream`](futures::Stream) of
@ -93,7 +93,7 @@ impl StreamEncoderBuilder {
// to have that schema too
let schema = Arc::new(prepare_schema_for_flight(&schema));
let mut schema_flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into();
schema_flight_data.app_metadata = app_metadata;
schema_flight_data.app_metadata = app_metadata.into();
let schema_stream = futures::stream::once(async move { Ok(schema_flight_data) });
@ -248,6 +248,25 @@ fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, tonic::Status> {
}
}
/// TODO remove when arrow 31.0.0 is released
/// and instead use the FlightDataEncoder directly
pub fn flight_data_from_arrow_batch(
batch: &RecordBatch,
options: &IpcWriteOptions,
) -> (Vec<FlightData>, FlightData) {
let data_gen = arrow::ipc::writer::IpcDataGenerator::default();
let mut dictionary_tracker = arrow::ipc::writer::DictionaryTracker::new(false);
let (encoded_dictionaries, encoded_batch) = data_gen
.encoded_batch(batch, &mut dictionary_tracker, options)
.expect("DictionaryTracker configured above to not error on replacement");
let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
let flight_batch = encoded_batch.into();
(flight_dictionaries, flight_batch)
}
#[cfg(test)]
mod tests {
use arrow::{


@ -155,6 +155,6 @@ impl FlightSqlClient {
})?
.ticket;
self.inner.do_get(ticket).await
self.inner.do_get(ticket.into()).await
}
}

iox_catalog/src/interface.rs

@ -521,6 +521,9 @@ pub trait PartitionRepo: Send + Sync {
time_in_the_past: Timestamp,
max_num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
/// Select partitions to compact
async fn partitions_to_compact(&mut self, recent_time: Timestamp) -> Result<Vec<PartitionId>>;
}
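// Editor's sketch (not part of this commit): calling the new method through the
// catalog's repository collection, assuming the existing `Catalog::repositories()`
// entry point. The function name and error handling are illustrative.
async fn partitions_with_recent_files(
    catalog: &Arc<dyn Catalog>,
    cutoff: Timestamp,
) -> Vec<PartitionId> {
    let mut repos = catalog.repositories().await;
    repos
        .partitions()
        .partitions_to_compact(cutoff)
        .await
        .expect("catalog lookup failed")
}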
/// Functions for working with tombstones in the catalog
@ -3888,6 +3891,13 @@ pub(crate) mod test_helpers {
.await
.unwrap();
assert!(partitions.is_empty());
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert!(partitions.is_empty());
// create a deleted L0 file that was created 1 hour ago which is recently
let l0_one_hour_ago_file_params = ParquetFileParams {
@ -3909,6 +3919,14 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0], partition1.id);
// -----------------
// PARTITION two
@ -3927,6 +3945,14 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0], partition1.id);
// Add a L0 file created non-recently (5 hours ago)
let l0_five_hour_ago_file_params = ParquetFileParams {
@ -3949,6 +3975,14 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0], partition1.id);
// Add a L1 created recently (just now)
let l1_file_params = ParquetFileParams {
@ -3976,6 +4010,18 @@ pub(crate) mod test_helpers {
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
let mut partitions = partitions;
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
// -----------------
// PARTITION three
@ -3998,6 +4044,18 @@ pub(crate) mod test_helpers {
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
let mut partitions = partitions;
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
// Add a L2 created recently (just now) for partition three
// Since it is L2, the partition won't get updated
@ -4026,6 +4084,18 @@ pub(crate) mod test_helpers {
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
let mut partitions = partitions;
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
// add an L0 file created recently (one hour ago) for partition three
let l0_one_hour_ago_file_params = ParquetFileParams {
@ -4053,6 +4123,19 @@ pub(crate) mod test_helpers {
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
assert_eq!(partitions[2].partition_id, partition3.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 3);
// sort by partition id
let mut partitions = partitions;
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
assert_eq!(partitions[2], partition3.id);
// Limit max num partition
let partitions = repos

iox_catalog/src/mem.rs

@ -1019,6 +1019,19 @@ impl PartitionRepo for MemTxn {
Ok(partitions)
}
async fn partitions_to_compact(&mut self, recent_time: Timestamp) -> Result<Vec<PartitionId>> {
let stage = self.stage();
let partitions: Vec<_> = stage
.partitions
.iter()
.filter(|p| p.new_file_at > Some(recent_time))
.map(|p| p.id)
.collect();
Ok(partitions)
}
}
#[async_trait]


@ -253,6 +253,7 @@ decorate!(
"partition_update_persisted_sequence_number" = update_persisted_sequence_number(&mut self, partition_id: PartitionId, sequence_number: SequenceNumber) -> Result<()>;
"partition_most_recent_n" = most_recent_n(&mut self, n: usize, shards: &[ShardId]) -> Result<Vec<Partition>>;
"partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp, max_num_partitions: usize) -> Result<Vec<PartitionParam>>;
"partitions_to_compact" = partitions_to_compact(&mut self, recent_time: Timestamp) -> Result<Vec<PartitionId>>;
]
);


@ -1434,6 +1434,20 @@ WHERE id = $2;
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn partitions_to_compact(&mut self, recent_time: Timestamp) -> Result<Vec<PartitionId>> {
sqlx::query_as(
r#"
SELECT p.id as partition_id
FROM partition p
WHERE p.new_file_at > $1
"#,
)
.bind(recent_time) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
#[async_trait]


@ -4,6 +4,7 @@
pub(crate) mod context;
pub mod field;
pub mod fieldlist;
pub(crate) mod gapfill;
mod non_null_checker;
mod query_tracing;
mod schema_pivot;

iox_query/src/exec/gapfill.rs (new file, 136 lines)

@ -0,0 +1,136 @@
//! This module contains code that implements
//! a gap-filling extension to DataFusion
use std::sync::Arc;
use datafusion::{
common::{DFSchema, DFSchemaRef},
error::Result,
logical_expr::{utils::exprlist_to_fields, LogicalPlan, UserDefinedLogicalNode},
prelude::Expr,
};
/// A logical node that represents the gap filling operation.
#[derive(Clone, Debug)]
pub struct GapFill {
input: Arc<LogicalPlan>,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
schema: DFSchemaRef,
time_column: Expr,
}
impl GapFill {
pub fn try_new(
input: Arc<LogicalPlan>,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
time_column: Expr,
) -> Result<Self> {
let all_expr = group_expr.iter().chain(aggr_expr.iter());
let schema = DFSchema::new_with_metadata(
exprlist_to_fields(all_expr, &input)?,
input.schema().metadata().clone(),
)?
.into();
Ok(Self {
input,
group_expr,
aggr_expr,
schema,
time_column,
})
}
}
impl UserDefinedLogicalNode for GapFill {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![self.input.as_ref()]
}
fn schema(&self) -> &DFSchemaRef {
&self.schema
}
fn expressions(&self) -> Vec<Expr> {
self.group_expr
.iter()
.chain(self.aggr_expr.iter())
.cloned()
.collect()
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"GapFill: groupBy=[{:?}], aggr=[{:?}], time_column={}",
self.group_expr, self.aggr_expr, self.time_column
)
}
fn from_template(
&self,
exprs: &[Expr],
inputs: &[LogicalPlan],
) -> Arc<dyn UserDefinedLogicalNode> {
let mut group_expr: Vec<_> = exprs.to_vec();
let aggr_expr = group_expr.split_off(self.group_expr.len());
let gapfill = Self::try_new(
Arc::new(inputs[0].clone()),
group_expr,
aggr_expr,
self.time_column.clone(),
)
.expect("should not fail");
Arc::new(gapfill)
}
}
#[cfg(test)]
mod test {
use super::*;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::{
error::Result,
logical_expr::{logical_plan, Extension},
prelude::col,
};
fn table_scan() -> Result<LogicalPlan> {
let schema = Schema::new(vec![
Field::new(
"time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("loc", DataType::Utf8, false),
Field::new("temp", DataType::Float64, false),
]);
logical_plan::table_scan(Some("temps"), &schema, None)?.build()
}
#[test]
fn fmt_logical_plan() -> Result<()> {
// This test case does not make much sense but
// just verifies we can construct a logical gapfill node
// and show its plan.
let scan = table_scan()?;
let gapfill = GapFill::try_new(
Arc::new(scan),
vec![col("loc"), col("time")],
vec![col("temp")],
col("time"),
)?;
let plan = LogicalPlan::Extension(Extension {
node: Arc::new(gapfill),
});
let expected = "GapFill: groupBy=[[loc, time]], aggr=[[temp]], time_column=time\
\n TableScan: temps";
assert_eq!(expected, format!("{}", plan.display_indent()));
Ok(())
}
}


@ -285,7 +285,7 @@ impl SeriesSet {
let tags = self.create_frame_tags(schema.field(index.value_index).name());
let mut timestamps = compute::nullif::nullif(
let mut timestamps = compute::kernels::nullif::nullif(
batch.column(index.timestamp_index),
&compute::is_null(array).expect("is_null"),
)


@ -0,0 +1,476 @@
//! An optimizer rule that transforms a plan
//! to fill gaps in time series data.
use crate::exec::gapfill::GapFill;
use datafusion::{
error::{DataFusionError, Result},
logical_expr::{
expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion},
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
Aggregate, BuiltinScalarFunction, Extension, LogicalPlan, Sort,
},
optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule},
prelude::{col, Expr},
};
use query_functions::gapfill::DATE_BIN_GAPFILL_UDF_NAME;
use std::sync::Arc;
/// This optimizer rule enables gap-filling semantics for SQL queries
/// that contain calls to `DATE_BIN_GAPFILL()`.
///
/// In SQL a typical gap-filling query might look like this:
/// ```sql
/// SELECT
/// location,
/// DATE_BIN_GAPFILL(INTERVAL '1 minute', time, '1970-01-01T00:00:00Z') AS minute,
/// AVG(temp)
/// FROM temps
/// WHERE time > NOW() - INTERVAL '6 hours' AND time < NOW()
/// GROUP BY LOCATION, MINUTE
/// ```
/// The aggregation step of the initial logical plan looks like this:
/// ```text
/// Aggregate: groupBy=[[datebingapfill(IntervalDayTime("60000"), temps.time, TimestampNanosecond(0, None)))]], aggr=[[AVG(temps.temp)]]
/// ```
/// However, `DATE_BIN_GAPFILL()` does not have an actual implementation like other functions.
/// Instead, the plan is transformed to this:
/// ```text
/// GapFill: groupBy=[[datebingapfill(IntervalDayTime("60000"), temps.time, TimestampNanosecond(0, None)))]], aggr=[[AVG(temps.temp)]], start=..., stop=...
/// Sort: datebingapfill(IntervalDayTime("60000"), temps.time, TimestampNanosecond(0, None))
/// Aggregate: groupBy=[[datebingapfill(IntervalDayTime("60000"), temps.time, TimestampNanosecond(0, None)))]], aggr=[[AVG(temps.temp)]]
/// ```
/// This optimizer rule makes that transformation.
pub struct HandleGapFill;
impl HandleGapFill {
pub fn new() -> Self {
Self {}
}
}
impl Default for HandleGapFill {
fn default() -> Self {
Self::new()
}
}
impl OptimizerRule for HandleGapFill {
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
handle_gap_fill(plan)
}
fn name(&self) -> &str {
"handle_gap_fill"
}
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}
}
fn handle_gap_fill(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
let res = match plan {
LogicalPlan::Aggregate(aggr) => handle_aggregate(aggr)?,
_ => None,
};
if res.is_none() {
// no transformation was applied,
// so make sure the plan is not using gap filling
// functions in an unsupported way.
check_node(plan)?;
}
Ok(res)
}
fn handle_aggregate(aggr: &Aggregate) -> Result<Option<LogicalPlan>> {
let Aggregate {
input,
group_expr,
aggr_expr,
schema,
..
} = aggr;
// new_group_expr has DATE_BIN_GAPFILL replaced with DATE_BIN.
let (new_group_expr, dbg_idx) = if let Some(v) = replace_date_bin_gapfill(group_expr)? {
v
} else {
return Ok(None);
};
let orig_dbg_name = schema.fields()[dbg_idx].name();
let new_aggr_plan = {
let new_aggr_plan =
Aggregate::try_new(Arc::clone(input), new_group_expr, aggr_expr.clone())?;
let new_aggr_plan = LogicalPlan::Aggregate(new_aggr_plan);
check_node(&new_aggr_plan)?;
new_aggr_plan
};
let new_sort_plan = {
let mut sort_exprs: Vec<_> = new_aggr_plan
.schema()
.fields()
.iter()
.take(group_expr.len())
.map(|f| col(f.qualified_column()).sort(true, true))
.collect();
// ensure that date_bin_gapfill is the last sort expression.
let last_elm = sort_exprs.len() - 1;
sort_exprs.swap(dbg_idx, last_elm);
LogicalPlan::Sort(Sort {
expr: sort_exprs,
input: Arc::new(new_aggr_plan),
fetch: None,
})
};
let new_gap_fill_plan = {
let mut new_group_expr: Vec<_> = new_sort_plan
.schema()
.fields()
.iter()
.enumerate()
.map(|(i, f)| {
let e = Expr::Column(f.qualified_column());
if i == dbg_idx {
// Make the column name look the same as in the original
// Aggregate node.
Expr::Alias(Box::new(e), orig_dbg_name.to_string())
} else {
e
}
})
.collect();
let aggr_expr = new_group_expr.split_off(group_expr.len());
let time_column = col(new_sort_plan.schema().fields()[dbg_idx].qualified_column());
LogicalPlan::Extension(Extension {
node: Arc::new(GapFill::try_new(
Arc::new(new_sort_plan),
new_group_expr,
aggr_expr,
time_column,
)?),
})
};
Ok(Some(new_gap_fill_plan))
}
// Iterate over the group expression list.
// If it finds no occurrences of date_bin_gapfill at the top of
// each expression tree, it will return None.
// If it finds such an occurrence, it will return a new expression list
// with the date_bin_gapfill replaced with date_bin, and the index of
// where the replacement occurred.
fn replace_date_bin_gapfill(group_expr: &[Expr]) -> Result<Option<(Vec<Expr>, usize)>> {
let mut date_bin_gapfill_count = 0;
group_expr.iter().try_for_each(|e| -> Result<()> {
let fn_cnt = count_date_bin_gapfill(e)?;
date_bin_gapfill_count += fn_cnt;
Ok(())
})?;
match date_bin_gapfill_count {
0 => return Ok(None),
2.. => {
return Err(DataFusionError::Plan(
"DATE_BIN_GAPFILL specified more than once".to_string(),
))
}
_ => (),
}
let group_expr = group_expr.to_owned();
let mut new_group_expr = Vec::with_capacity(group_expr.len());
let mut dbg_idx = None;
group_expr
.into_iter()
.enumerate()
.try_for_each(|(i, e)| -> Result<()> {
let mut rewriter = DateBinGapfillRewriter { found: false };
new_group_expr.push(e.rewrite(&mut rewriter)?);
if rewriter.found {
dbg_idx = Some(i);
}
Ok(())
})?;
Ok(Some((
new_group_expr,
dbg_idx.expect("should have found a call to DATE_BIN_GAPFILL based on previous check"),
)))
}
struct DateBinGapfillRewriter {
found: bool,
}
impl ExprRewriter for DateBinGapfillRewriter {
fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
match expr {
Expr::ScalarUDF { fun, .. } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
Ok(RewriteRecursion::Mutate)
}
_ => Ok(RewriteRecursion::Continue),
}
}
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
match expr {
Expr::ScalarUDF { fun, args } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
self.found = true;
Ok(Expr::ScalarFunction {
fun: BuiltinScalarFunction::DateBin,
args,
})
}
_ => Ok(expr),
}
}
}
fn count_date_bin_gapfill(e: &Expr) -> Result<usize> {
struct Finder {
count: usize,
}
impl ExpressionVisitor for Finder {
fn pre_visit(mut self, expr: &Expr) -> Result<Recursion<Self>> {
match expr {
Expr::ScalarUDF { fun, .. } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
self.count += 1;
}
_ => (),
};
Ok(Recursion::Continue(self))
}
}
let f = Finder { count: 0 };
let f = e.accept(f)?;
Ok(f.count)
}
fn check_node(node: &LogicalPlan) -> Result<()> {
node.expressions().iter().try_for_each(|expr| {
let count = count_date_bin_gapfill(expr)?;
if count > 0 {
Err(DataFusionError::Plan(format!(
"{} may only be used as a GROUP BY expression",
DATE_BIN_GAPFILL_UDF_NAME
)))
} else {
Ok(())
}
})
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use super::HandleGapFill;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::error::Result;
use datafusion::logical_expr::{logical_plan, LogicalPlan, LogicalPlanBuilder};
use datafusion::optimizer::optimizer::Optimizer;
use datafusion::optimizer::OptimizerContext;
use datafusion::prelude::{avg, col, lit, lit_timestamp_nano, Expr};
use datafusion::scalar::ScalarValue;
use query_functions::gapfill::DATE_BIN_GAPFILL_UDF_NAME;
fn table_scan() -> Result<LogicalPlan> {
let schema = Schema::new(vec![
Field::new(
"time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("loc", DataType::Utf8, false),
Field::new("temp", DataType::Float64, false),
]);
logical_plan::table_scan(Some("temps"), &schema, None)?.build()
}
fn date_bin_gapfill(interval: Expr, time: Expr) -> Result<Expr> {
Ok(Expr::ScalarUDF {
fun: query_functions::registry().udf(DATE_BIN_GAPFILL_UDF_NAME)?,
args: vec![interval, time, lit_timestamp_nano(0)],
})
}
fn optimize(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
let optimizer = Optimizer::with_rules(vec![Arc::new(HandleGapFill::default())]);
optimizer.optimize_recursively(
optimizer.rules.get(0).unwrap(),
plan,
&OptimizerContext::new(),
)
}
fn assert_optimizer_err(plan: &LogicalPlan, expected: &str) {
match optimize(plan) {
Ok(plan) => assert_eq!(format!("{}", plan.unwrap().display_indent()), "an error"),
Err(ref e) => {
let actual = e.to_string();
if expected.is_empty() || !actual.contains(expected) {
assert_eq!(actual, expected)
}
}
}
}
fn assert_optimization_skipped(plan: &LogicalPlan) -> Result<()> {
let new_plan = optimize(plan)?;
if new_plan.is_none() {
return Ok(());
}
assert_eq!(
format!("{}", plan.display_indent()),
format!("{}", new_plan.unwrap().display_indent())
);
Ok(())
}
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let new_plan = optimize(plan)?;
let new_lines = new_plan
.expect("plan should have been optimized")
.display_indent()
.to_string();
let new_lines = new_lines.split('\n');
let expected_lines = expected.split('\n');
// compare each line rather than the whole blob, to make it easier
// to read output when tests fail.
expected_lines
.zip(new_lines)
.for_each(|(expected, actual)| assert_eq!(expected, actual));
Ok(())
}
#[test]
fn misplaced_fns_err() -> Result<()> {
// date_bin_gapfill used in a filter should produce an error
let scan = table_scan()?;
let plan = LogicalPlanBuilder::from(scan)
.filter(
date_bin_gapfill(
lit(ScalarValue::IntervalDayTime(Some(60_0000))),
col("temp"),
)?
.gt(lit(100.0)),
)?
.build()?;
assert_optimizer_err(
&plan,
"Error during planning: date_bin_gapfill may only be used as a GROUP BY expression",
);
Ok(())
}
#[test]
fn no_change() -> Result<()> {
let plan = LogicalPlanBuilder::from(table_scan()?)
.aggregate(vec![col("loc")], vec![avg(col("temp"))])?
.build()?;
assert_optimization_skipped(&plan)?;
Ok(())
}
#[test]
fn date_bin_gapfill_simple() -> Result<()> {
let plan = LogicalPlanBuilder::from(table_scan()?)
.aggregate(
vec![date_bin_gapfill(
lit(ScalarValue::IntervalDayTime(Some(60_000))),
col("time"),
)?],
vec![avg(col("temp"))],
)?
.build()?;
let dbg_args = "IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(0, None)";
let expected = format!(
"GapFill: groupBy=[[datebin({dbg_args}) AS date_bin_gapfill({dbg_args})]], aggr=[[AVG(temps.temp)]], time_column=datebin({dbg_args})\
\n Sort: datebin({dbg_args}) ASC NULLS FIRST\
\n Aggregate: groupBy=[[datebin(IntervalDayTime(\"60000\"), temps.time, TimestampNanosecond(0, None))]], aggr=[[AVG(temps.temp)]]\
\n TableScan: temps");
assert_optimized_plan_eq(&plan, &expected)?;
Ok(())
}
#[test]
fn reordered_sort_exprs() -> Result<()> {
// grouping by date_bin_gapfill(...), loc
// but the sort node should have date_bin_gapfill last.
let plan = LogicalPlanBuilder::from(table_scan()?)
.aggregate(
vec![
date_bin_gapfill(lit(ScalarValue::IntervalDayTime(Some(60_000))), col("time"))?,
col("loc"),
],
vec![avg(col("temp"))],
)?
.build()?;
let dbg_args = "IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(0, None)";
let expected = format!(
"GapFill: groupBy=[[datebin({dbg_args}) AS date_bin_gapfill({dbg_args}), temps.loc]], aggr=[[AVG(temps.temp)]], time_column=datebin({dbg_args})\
\n Sort: temps.loc ASC NULLS FIRST, datebin({dbg_args}) ASC NULLS FIRST\
\n Aggregate: groupBy=[[datebin(IntervalDayTime(\"60000\"), temps.time, TimestampNanosecond(0, None)), temps.loc]], aggr=[[AVG(temps.temp)]]\
\n TableScan: temps");
assert_optimized_plan_eq(&plan, &expected)?;
Ok(())
}
#[test]
fn double_date_bin_gapfill() -> Result<()> {
let plan = LogicalPlanBuilder::from(table_scan()?)
.aggregate(
vec![
date_bin_gapfill(lit(ScalarValue::IntervalDayTime(Some(60_000))), col("time"))?,
date_bin_gapfill(lit(ScalarValue::IntervalDayTime(Some(30_000))), col("time"))?,
],
vec![avg(col("temp"))],
)?
.build()?;
assert_optimizer_err(
&plan,
"Error during planning: DATE_BIN_GAPFILL specified more than once",
);
Ok(())
}
#[test]
fn with_projection() -> Result<()> {
let dbg_args = "IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(0, None)";
let plan = LogicalPlanBuilder::from(table_scan()?)
.aggregate(
vec![date_bin_gapfill(
lit(ScalarValue::IntervalDayTime(Some(60_000))),
col("time"),
)?],
vec![avg(col("temp"))],
)?
.project(vec![
col(format!("date_bin_gapfill({dbg_args})")),
col("AVG(temps.temp)"),
])?
.build()?;
let expected = format!(
"Projection: date_bin_gapfill({dbg_args}), AVG(temps.temp)\
\n GapFill: groupBy=[[datebin({dbg_args}) AS date_bin_gapfill({dbg_args})]], aggr=[[AVG(temps.temp)]], time_column=datebin({dbg_args})\
\n Sort: datebin({dbg_args}) ASC NULLS FIRST\
\n Aggregate: groupBy=[[datebin(IntervalDayTime(\"60000\"), temps.time, TimestampNanosecond(0, None))]], aggr=[[AVG(temps.temp)]]\
\n TableScan: temps");
assert_optimized_plan_eq(&plan, &expected)?;
Ok(())
}
}


@ -4,6 +4,7 @@ use datafusion::execution::context::SessionState;
use self::influx_regex_to_datafusion_regex::InfluxRegexToDataFusionRegex;
mod handle_gapfill;
mod influx_regex_to_datafusion_regex;
/// Register IOx-specific logical [`OptimizerRule`]s with the SessionContext

ioxd_compactor/src/lib.rs

@ -1,5 +1,5 @@
use async_trait::async_trait;
use clap_blocks::{compactor::CompactorConfig, compactor2::Compactor2Config};
use clap_blocks::compactor::CompactorConfig;
use compactor::{
handler::{CompactorHandler, CompactorHandlerImpl},
server::{grpc::GrpcDelegate, CompactorServer},
@ -134,129 +134,6 @@ impl HttpApiErrorSource for IoxHttpError {
}
}
/// Instantiate a compactor server that uses the RPC write path
// NOTE!!! This needs to be kept in sync with `create_compactor_server_type` until the switch to
// the RPC write path mode is complete! See the annotations about where these two functions line up
// and where they diverge.
pub async fn create_compactor2_server_type(
common_state: &CommonServerState,
metric_registry: Arc<metric::Registry>,
catalog: Arc<dyn Catalog>,
parquet_store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
// Parameter difference: this function takes a `Compactor2Config` that doesn't have a write
// buffer topic or shard indexes. All the other parameters here are the same.
compactor_config: Compactor2Config,
) -> Result<Arc<dyn ServerType>> {
let grpc_catalog = Arc::clone(&catalog);
// Setup difference: build a compactor2 instead, which expects a `Compactor2Config`.
let compactor = build_compactor2_from_config(
compactor_config,
catalog,
parquet_store,
exec,
time_provider,
Arc::clone(&metric_registry),
)
.await?;
let compactor_handler = Arc::new(CompactorHandlerImpl::new(Arc::new(compactor)));
let grpc = GrpcDelegate::new(grpc_catalog, Arc::clone(&compactor_handler));
let compactor = CompactorServer::new(metric_registry, grpc, compactor_handler);
Ok(Arc::new(CompactorServerType::new(compactor, common_state)))
}
// NOTE!!! This needs to be kept in sync with `build_compactor_from_config` until the switch to
// the RPC write path mode is complete! See the annotations about where these two functions line up
// and where they diverge.
pub async fn build_compactor2_from_config(
// Parameter difference: this function takes a `Compactor2Config` that doesn't have a write
// buffer topic or shard indexes. All the other parameters here are the same.
compactor_config: Compactor2Config,
catalog: Arc<dyn Catalog>,
parquet_store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
metric_registry: Arc<Registry>,
) -> Result<compactor::compact::Compactor, Error> {
// 1. Shard index range checking: MISSING
// This function doesn't check the shard index range here like `build_compactor_from_config`
// does because shard indexes aren't relevant to `compactor2`.
// 2. Split percentage value range checking
if compactor_config.split_percentage < 1 || compactor_config.split_percentage > 100 {
return Err(Error::SplitPercentageRange {
split_percentage: compactor_config.split_percentage,
});
}
// 3. Ensure topic and shard indexes are in the catalog: MISSING
// This isn't relevant to `compactor2`.
// 4. Convert config type to handler config type
let Compactor2Config {
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,
max_number_partitions_per_shard,
min_number_recent_ingested_files_per_partition,
hot_multiple,
warm_multiple,
memory_budget_bytes,
min_num_rows_allocated_per_record_batch_to_datafusion_plan,
max_num_compacting_files,
max_num_compacting_files_first_in_partition,
minutes_without_new_writes_to_be_cold,
cold_partition_candidates_hours_threshold,
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,
max_parallel_partitions,
warm_partition_candidates_hours_threshold,
warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count,
} = compactor_config;
let compactor_config = compactor::handler::CompactorConfig {
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,
max_number_partitions_per_shard,
min_number_recent_ingested_files_per_partition,
hot_multiple,
warm_multiple,
memory_budget_bytes,
min_num_rows_allocated_per_record_batch_to_datafusion_plan,
max_num_compacting_files,
max_num_compacting_files_first_in_partition,
minutes_without_new_writes_to_be_cold,
cold_partition_candidates_hours_threshold,
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,
max_parallel_partitions,
warm_partition_candidates_hours_threshold,
warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count,
};
// 4. END
// 5. Create a new compactor: `compactor2` is assigned all shards (this argument can go away
// completely when the switch to RPC mode is complete)
Ok(compactor::compact::Compactor::new(
compactor::compact::ShardAssignment::All,
catalog,
parquet_store,
exec,
time_provider,
backoff::BackoffConfig::default(),
compactor_config,
metric_registry,
))
}
/// Instantiate a compactor server
// NOTE!!! This needs to be kept in sync with `create_compactor2_server_type` until the switch to
// the RPC write path mode is complete! See the annotations about where these two functions line up

ioxd_compactor2/Cargo.toml (new file, 21 lines)

@ -0,0 +1,21 @@
[package]
name = "ioxd_compactor2"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1"
clap_blocks = { path = "../clap_blocks" }
compactor2 = { path = "../compactor2" }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
iox_query = { path = "../iox_query" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
parquet_file = { path = "../parquet_file" }
tokio-util = "0.7.4"
trace = { path = "../trace" }
workspace-hack = { path = "../workspace-hack"}

ioxd_compactor2/src/lib.rs Normal file

@ -0,0 +1,144 @@
use async_trait::async_trait;
use clap_blocks::compactor2::Compactor2Config;
use compactor2::compactor::Compactor2;
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use iox_time::TimeProvider;
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
rpc::RpcBuilderInput,
serve_builder,
server_type::{CommonServerState, RpcError, ServerType},
setup_builder,
};
use metric::Registry;
use parquet_file::storage::ParquetStorage;
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
pub struct Compactor2ServerType {
compactor: Compactor2,
metric_registry: Arc<Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl std::fmt::Debug for Compactor2ServerType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Compactor2")
}
}
impl Compactor2ServerType {
pub fn new(
compactor: Compactor2,
metric_registry: Arc<metric::Registry>,
common_state: &CommonServerState,
) -> Self {
Self {
compactor,
metric_registry,
trace_collector: common_state.trace_collector(),
}
}
}
#[async_trait]
impl ServerType for Compactor2ServerType {
/// Return the [`metric::Registry`] used by the compactor.
fn metric_registry(&self) -> Arc<Registry> {
Arc::clone(&self.metric_registry)
}
/// Returns the trace collector for compactor traces.
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
self.trace_collector.as_ref().map(Arc::clone)
}
/// Just return "not found".
async fn route_http_request(
&self,
_req: Request<Body>,
) -> Result<Response<Body>, Box<dyn HttpApiErrorSource>> {
Err(Box::new(IoxHttpError::NotFound))
}
/// Configure the gRPC services.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
serve_builder!(builder);
Ok(())
}
async fn join(self: Arc<Self>) {
self.compactor
.join()
.await
.expect("clean compactor shutdown");
}
fn shutdown(&self, frontend: CancellationToken) {
frontend.cancel();
self.compactor.shutdown();
}
}
/// Simple error struct, we're not really providing an HTTP interface for the compactor.
#[derive(Debug)]
pub enum IoxHttpError {
NotFound,
}
impl IoxHttpError {
fn status_code(&self) -> HttpApiErrorCode {
match self {
IoxHttpError::NotFound => HttpApiErrorCode::NotFound,
}
}
}
impl Display for IoxHttpError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for IoxHttpError {}
impl HttpApiErrorSource for IoxHttpError {
fn to_http_api_error(&self) -> HttpApiError {
HttpApiError::new(self.status_code(), self.to_string())
}
}
/// Instantiate a compactor2 server that uses the RPC write path
pub fn create_compactor2_server_type(
common_state: &CommonServerState,
metric_registry: Arc<metric::Registry>,
catalog: Arc<dyn Catalog>,
parquet_store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
compactor_config: Compactor2Config,
) -> Arc<dyn ServerType> {
let compactor = Compactor2::start(compactor2::config::Config {
metric_registry: Arc::clone(&metric_registry),
catalog,
parquet_store,
exec,
time_provider,
partition_concurrency: compactor_config.compaction_partition_concurrency,
});
Arc::new(Compactor2ServerType::new(
compactor,
metric_registry,
common_state,
))
}
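
The shutdown()/join() pair in the ServerType impl above follows the usual cooperative-shutdown pattern: shutdown() cancels the frontend CancellationToken and signals the compactor to stop, and join() then awaits clean termination. A minimal standalone sketch of that pattern, assuming only tokio and tokio-util (the IOx types themselves are not reproduced here):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let frontend = CancellationToken::new();

    // Stand-in for the compactor's background work.
    let worker = {
        let frontend = frontend.clone();
        tokio::spawn(async move {
            // Run until shutdown is requested.
            frontend.cancelled().await;
        })
    };

    // shutdown(): cancel the token so in-flight work winds down.
    frontend.cancel();

    // join(): wait for the background task to finish cleanly.
    worker.await.expect("clean compactor shutdown");
}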


@ -31,7 +31,7 @@ pub mod selectors;
mod window;
/// gap filling expressions
mod gapfill;
pub mod gapfill;
/// Function registry
mod registry;


@ -341,7 +341,7 @@ where
}?;
let message: prost_types::Any =
prost::Message::decode(cmd.as_slice()).context(DeserializationSnafu)?;
prost::Message::decode(cmd.as_ref()).context(DeserializationSnafu)?;
let flight_info = self.dispatch(&namespace_name, request, message).await?;
Ok(tonic::Response::new(flight_info))
@ -449,7 +449,7 @@ where
}];
Ok(FlightInfo {
schema,
schema: schema.into(),
flight_descriptor: Some(flight_descriptor),
endpoint,
total_records,
@ -578,7 +578,9 @@ mod tests {
server: Arc::clone(&test_storage),
};
let ticket = Ticket {
ticket: br#"{"namespace_name": "my_db", "sql_query": "SELECT 1;"}"#.to_vec(),
ticket: br#"{"namespace_name": "my_db", "sql_query": "SELECT 1;"}"#
.to_vec()
.into(),
};
let streaming_resp1 = service
.do_get(tonic::Request::new(ticket.clone()))


@ -95,7 +95,9 @@ impl IoxGetRequest {
let ticket = read_info.encode_to_vec();
Ok(Ticket { ticket })
Ok(Ticket {
ticket: ticket.into(),
})
}
/// The Go clients still use an older form of ticket encoding, JSON tickets
@ -241,7 +243,7 @@ mod tests {
#[test]
fn proto_ticket_decoding_error() {
let ticket = Ticket {
ticket: b"invalid ticket".to_vec(),
ticket: b"invalid ticket".to_vec().into(),
};
// Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL
@ -279,13 +281,13 @@ mod tests {
fn make_proto_ticket(read_info: &proto::ReadInfo) -> Ticket {
Ticket {
ticket: read_info.encode_to_vec(),
ticket: read_info.encode_to_vec().into(),
}
}
fn make_json_ticket(json: &str) -> Ticket {
Ticket {
ticket: json.as_bytes().to_vec(),
ticket: json.as_bytes().to_vec().into(),
}
}
}
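
The `.as_ref()` and `.into()` additions in the two Flight-related files above track the arrow-flight 29 -> 30 upgrade, which, as these diffs indicate, moved byte-carrying fields such as `Ticket::ticket` (and `FlightInfo::schema`) from `Vec<u8>` to `bytes::Bytes`. A minimal sketch of that conversion, assuming only the `bytes` crate:

use bytes::Bytes;

fn main() {
    // A JSON ticket body like the one used in the tests above.
    let payload: Vec<u8> = br#"{"namespace_name": "my_db", "sql_query": "SELECT 1;"}"#.to_vec();

    // Vec<u8> -> Bytes is a cheap move of the existing allocation, done via `.into()`.
    let ticket_bytes: Bytes = payload.into();

    // Bytes still exposes &[u8], so slice-based decoding keeps working.
    let expected: &[u8] = br#"{"namespace_name": "my_db", "sql_query": "SELECT 1;"}"#;
    assert_eq!(&ticket_bytes[..], expected);
}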


@ -17,10 +17,10 @@ license.workspace = true
### BEGIN HAKARI SECTION
[dependencies]
ahash = { version = "0.8", default-features = false, features = ["getrandom", "runtime-rng"] }
arrow = { version = "29", features = ["arrow-csv", "arrow-ipc", "arrow-json", "comfy-table", "csv", "dyn_cmp_dict", "ipc", "json", "prettyprint"] }
arrow-flight = { version = "29", features = ["flight-sql-experimental", "prost-types"] }
arrow-ord = { version = "29", default-features = false, features = ["dyn_cmp_dict"] }
arrow-string = { version = "29", default-features = false, features = ["dyn_cmp_dict"] }
arrow = { version = "30", features = ["arrow-csv", "arrow-ipc", "arrow-json", "comfy-table", "csv", "dyn_cmp_dict", "ipc", "json", "prettyprint"] }
arrow-flight = { version = "30", features = ["flight-sql-experimental"] }
arrow-ord = { version = "30", default-features = false, features = ["dyn_cmp_dict"] }
arrow-string = { version = "30", default-features = false, features = ["dyn_cmp_dict"] }
base64 = { version = "0.13", features = ["std"] }
bitflags = { version = "1" }
byteorder = { version = "1", features = ["std"] }
@ -28,7 +28,7 @@ bytes = { version = "1", features = ["std"] }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "iana-time-zone", "serde", "std", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["std"] }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/alamb/arrow-datafusion.git", branch = "alamb/patched_for_iox", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "279440b2ab92d18675b8102e342d4d82182287dc", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] }
either = { version = "1", features = ["use_std"] }
fixedbitset = { version = "0.4", features = ["std"] }
@ -40,7 +40,7 @@ futures-io = { version = "0.3", features = ["std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
getrandom = { version = "0.2", default-features = false, features = ["js", "js-sys", "std", "wasm-bindgen"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["ahash", "inline-more", "raw"] }
hashbrown-594e8ee84c453af0 = { package = "hashbrown", version = "0.13", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
@ -53,10 +53,10 @@ memchr = { version = "2", features = ["std"] }
nom = { version = "7", features = ["alloc", "std"] }
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "f5c165acc0e6cc4b34e0eaea006aab7e5bd28d66", default-features = false, features = ["aws", "azure", "base64", "cloud", "gcp", "getrandom", "quick-xml", "rand", "reqwest", "ring", "rustls-pemfile", "serde", "serde_json"] }
object_store = { version = "0.5", default-features = false, features = ["aws", "azure", "base64", "cloud", "gcp", "quick-xml", "rand", "reqwest", "ring", "rustls-pemfile", "serde", "serde_json"] }
once_cell = { version = "1", features = ["alloc", "parking_lot", "parking_lot_core", "race", "std"] }
parking_lot = { version = "0.12", features = ["arc_lock"] }
parquet = { version = "29", features = ["arrow", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-ipc", "arrow-schema", "arrow-select", "async", "base64", "brotli", "experimental", "flate2", "futures", "lz4", "snap", "tokio", "zstd"] }
parquet = { version = "30", features = ["arrow", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-ipc", "arrow-schema", "arrow-select", "async", "base64", "brotli", "experimental", "flate2", "futures", "lz4", "snap", "tokio", "zstd"] }
phf_shared = { version = "0.11", features = ["std"] }
predicates = { version = "2", features = ["diff", "difflib", "float-cmp", "normalize-line-endings", "regex"] }
prost = { version = "0.11", features = ["prost-derive", "std"] }
@ -109,7 +109,7 @@ futures-io = { version = "0.3", features = ["std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
getrandom = { version = "0.2", default-features = false, features = ["js", "js-sys", "std", "wasm-bindgen"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["ahash", "inline-more", "raw"] }
heck = { version = "0.4", features = ["unicode", "unicode-segmentation"] }
indexmap = { version = "1", default-features = false, features = ["std"] }