Merge branch 'main' into dom/cache-table-limit

pull/24376/head
Dom 2023-02-07 10:04:50 +00:00 committed by GitHub
commit 9e8785f9c1
26 changed files with 4315 additions and 1336 deletions

Cargo.lock

@ -1822,6 +1822,18 @@ dependencies = [
"num-traits",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"pin-project",
"spin 0.9.4",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -2820,9 +2832,11 @@ dependencies = [
"mutable_batch",
"mutable_batch_lp",
"observability_deps",
"parking_lot 0.12.1",
"paste",
"pretty_assertions",
"rand",
"serde",
"snafu",
"sqlx",
"sqlx-hotswap-pool",
@ -3316,6 +3330,17 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
[[package]]
name = "libsqlite3-sys"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "link-cplusplus"
version = "1.0.8"
@ -5442,8 +5467,10 @@ dependencies = [
"dotenvy",
"either",
"event-listener",
"flume",
"futures-channel",
"futures-core",
"futures-executor",
"futures-intrusive",
"futures-util",
"hashlink",
@ -5453,6 +5480,7 @@ dependencies = [
"indexmap",
"itoa 1.0.5",
"libc",
"libsqlite3-sys",
"log",
"md-5",
"memchr",
@ -6372,6 +6400,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
@ -6724,6 +6758,7 @@ dependencies = [
"flate2",
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",


@ -1,4 +1,5 @@
//! Catalog-DSN-related configs.
use iox_catalog::sqlite::{SqliteCatalog, SqliteConnectionOptions};
use iox_catalog::{
create_or_get_default_records,
interface::Catalog,
@ -15,6 +16,9 @@ pub enum Error {
#[snafu(display("A Postgres connection string in --catalog-dsn is required."))]
ConnectionStringRequired,
#[snafu(display("A SQLite connection string in --catalog-dsn is required."))]
ConnectionStringSqliteRequired,
#[snafu(display("A catalog error occurred: {}", source))]
Catalog {
source: iox_catalog::interface::Error,
@ -44,7 +48,7 @@ fn default_hotswap_poll_interval_timeout() -> &'static str {
}
/// CLI config for catalog DSN.
#[derive(Debug, Clone, clap::Parser)]
#[derive(Debug, Clone, Default, clap::Parser)]
pub struct CatalogDsnConfig {
/// The type of catalog to use. "memory" is only useful for testing purposes.
#[clap(
@ -110,13 +114,17 @@ pub struct CatalogDsnConfig {
}
/// Catalog type.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
pub enum CatalogType {
/// PostgreSQL.
#[default]
Postgres,
/// In-memory.
Memory,
/// SQLite.
Sqlite,
}
impl CatalogDsnConfig {
@ -127,12 +135,7 @@ impl CatalogDsnConfig {
Self {
catalog_type_: CatalogType::Memory,
dsn: None,
max_catalog_connections: PostgresConnectionOptions::DEFAULT_MAX_CONNS,
postgres_schema_name: PostgresConnectionOptions::DEFAULT_SCHEMA_NAME.to_string(),
connect_timeout: PostgresConnectionOptions::DEFAULT_CONNECT_TIMEOUT,
idle_timeout: PostgresConnectionOptions::DEFAULT_IDLE_TIMEOUT,
hotswap_poll_interval: PostgresConnectionOptions::DEFAULT_HOTSWAP_POLL_INTERVAL,
..Self::default()
}
}
@ -151,6 +154,17 @@ impl CatalogDsnConfig {
}
}
/// Create a new SQLite instance for all-in-one mode if a catalog DSN is specified
pub fn new_sqlite(dsn: String) -> Self {
info!("Catalog: SQLite at `{}`", dsn);
Self {
catalog_type_: CatalogType::Sqlite,
dsn: Some(dsn),
..Self::default()
}
}
/// Get config-dependent catalog.
pub async fn get_catalog(
&self,
@ -189,6 +203,20 @@ impl CatalogDsnConfig {
Arc::new(mem) as Arc<dyn Catalog>
}
CatalogType::Sqlite => {
let options = SqliteConnectionOptions {
dsn: self
.dsn
.as_ref()
.context(ConnectionStringSqliteRequiredSnafu)?
.clone(),
};
Arc::new(
SqliteCatalog::connect(options, metrics)
.await
.context(CatalogSnafu)?,
) as Arc<dyn Catalog>
}
};
Ok(catalog)
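
The new `Sqlite` arm above routes the CLI configuration through `SqliteCatalog::connect`. A minimal usage sketch, assuming `get_catalog` takes an application name and a metric registry as it does for the other catalog types; the DSN and app name below are placeholders, not values from this diff:

// Sketch only: obtain a SQLite-backed catalog from the CLI config type.
let metrics = Arc::new(metric::Registry::default());
let config = CatalogDsnConfig::new_sqlite("/tmp/iox_catalog.sqlite3".to_string());
let catalog = config.get_catalog("iox", metrics).await?;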


@ -3,7 +3,7 @@ mod tests {
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use arrow_util::assert_batches_sorted_eq;
use data_types::CompactionLevel;
use data_types::{CompactionLevel, ParquetFile};
use iox_query::exec::ExecutorType;
use tracker::AsyncSemaphoreMetrics;
@ -46,16 +46,10 @@ mod tests {
setup.set_compact_version(AlgoVersion::AllAtOnce);
// verify 6 files
let files = setup.list_by_table_not_to_delete().await;
assert_eq!(files.len(), 6);
//
// verify ID and compaction level of the files
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(
files_and_levels,
let files = setup.list_by_table_not_to_delete().await;
assert_levels(
&files,
vec![
(1, CompactionLevel::FileNonOverlapped),
(2, CompactionLevel::Initial),
@ -63,30 +57,21 @@ mod tests {
(4, CompactionLevel::FileNonOverlapped),
(5, CompactionLevel::Initial),
(6, CompactionLevel::Initial),
]
],
);
// verify ID and max_l0_created_at
let time_provider = Arc::clone(&setup.config.time_provider);
let time_1_minute_future = time_provider.minutes_into_future(1).timestamp_nanos();
let time_2_minutes_future = time_provider.minutes_into_future(2).timestamp_nanos();
let time_3_minutes_future = time_provider.minutes_into_future(3).timestamp_nanos();
let time_5_minutes_future = time_provider.minutes_into_future(5).timestamp_nanos();
let files_and_max_l0_created_ats: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.max_l0_created_at.get()))
.collect();
assert_eq!(
files_and_max_l0_created_ats,
let times = setup.test_times();
assert_max_l0_created_at(
&files,
vec![
(1, time_1_minute_future),
(2, time_2_minutes_future),
(3, time_5_minutes_future),
(4, time_3_minutes_future),
(5, time_5_minutes_future),
(6, time_2_minutes_future),
]
(1, times.time_1_minute_future),
(2, times.time_2_minutes_future),
(3, times.time_5_minutes_future),
(4, times.time_3_minutes_future),
(5, times.time_5_minutes_future),
(6, times.time_2_minutes_future),
],
);
// compact
@ -94,30 +79,21 @@ mod tests {
// verify number of files: 6 files are compacted into 2 files
let files = setup.list_by_table_not_to_delete().await;
assert_eq!(files.len(), 2);
//
// verify ID and compaction level of the files
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
println!("{files_and_levels:?}");
assert_eq!(
files_and_levels,
assert_levels(
&files,
vec![
(7, CompactionLevel::FileNonOverlapped),
(8, CompactionLevel::FileNonOverlapped),
]
],
);
// verify ID and max_l0_created_at
let files_and_max_l0_created_ats: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.max_l0_created_at.get()))
.collect();
// both files have max_l0_created time_5_minutes_future which is the max of all L0 input's max_l0_created_at
assert_eq!(
files_and_max_l0_created_ats,
vec![(7, time_5_minutes_future), (8, time_5_minutes_future),]
assert_max_l0_created_at(
&files,
// both files have max_l0_created_at time_5_minutes_future,
// which is the max of all L0 inputs' max_l0_created_at
vec![
(7, times.time_5_minutes_future),
(8, times.time_5_minutes_future),
],
);
// verify the content of files
@ -170,15 +146,8 @@ mod tests {
// verify 6 files
let files = setup.list_by_table_not_to_delete().await;
assert_eq!(files.len(), 6);
//
// verify ID and compaction level of the files
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(
files_and_levels,
assert_levels(
&files,
vec![
(1, CompactionLevel::FileNonOverlapped),
(2, CompactionLevel::Initial),
@ -186,30 +155,21 @@ mod tests {
(4, CompactionLevel::FileNonOverlapped),
(5, CompactionLevel::Initial),
(6, CompactionLevel::Initial),
]
],
);
// verify ID and max_l0_created_at
let time_provider = Arc::clone(&setup.config.time_provider);
let time_1_minute_future = time_provider.minutes_into_future(1).timestamp_nanos();
let time_2_minutes_future = time_provider.minutes_into_future(2).timestamp_nanos();
let time_3_minutes_future = time_provider.minutes_into_future(3).timestamp_nanos();
let time_5_minutes_future = time_provider.minutes_into_future(5).timestamp_nanos();
let files_and_max_l0_created_ats: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.max_l0_created_at.get()))
.collect();
assert_eq!(
files_and_max_l0_created_ats,
let times = setup.test_times();
assert_max_l0_created_at(
&files,
vec![
(1, time_1_minute_future),
(2, time_2_minutes_future),
(3, time_5_minutes_future),
(4, time_3_minutes_future),
(5, time_5_minutes_future),
(6, time_2_minutes_future),
]
(1, times.time_1_minute_future),
(2, times.time_2_minutes_future),
(3, times.time_5_minutes_future),
(4, times.time_3_minutes_future),
(5, times.time_5_minutes_future),
(6, times.time_2_minutes_future),
],
);
// compact
@ -218,29 +178,23 @@ mod tests {
// verify number of files: 6 files are compacted into 2 files
let files = setup.list_by_table_not_to_delete().await;
assert_eq!(files.len(), 2);
//
// verify ID and compaction level of the files
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
println!("{files_and_levels:?}");
// This is the result of 2-round compaction fomr L0s -> L1s and then L1s -> L2s
// The first round will create two L1 files IDs 7 and 8
// The second round will create tow L2 file IDs 9 and 10
assert_eq!(
files_and_levels,
vec![(9, CompactionLevel::Final), (10, CompactionLevel::Final),]
assert_levels(
&files,
// This is the result of 2-round compaction from L0s -> L1s and then L1s -> L2s
// The first round will create two L1 files, IDs 7 and 8
// The second round will create two L2 files, IDs 9 and 10
vec![(9, CompactionLevel::Final), (10, CompactionLevel::Final)],
);
// verify ID and max_l0_created_at
let files_and_max_l0_created_ats: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.max_l0_created_at.get()))
.collect();
// both files have max_l0_created time_5_minutes_future which is the max of all L0 input's max_l0_created_at
assert_eq!(
files_and_max_l0_created_ats,
vec![(9, time_5_minutes_future), (10, time_5_minutes_future),]
assert_max_l0_created_at(
&files,
// both files have max_l0_created_at time_5_minutes_future,
// which is the max of all L0 inputs' max_l0_created_at
vec![
(9, times.time_5_minutes_future),
(10, times.time_5_minutes_future),
],
);
// verify the content of files
@ -289,26 +243,18 @@ mod tests {
// Create a test setup with 6 files
let setup = TestSetup::builder().with_files().build().await;
let expected_files_and_levels = vec![
(1, CompactionLevel::FileNonOverlapped),
(2, CompactionLevel::Initial),
(3, CompactionLevel::Initial),
(4, CompactionLevel::FileNonOverlapped),
(5, CompactionLevel::Initial),
(6, CompactionLevel::Initial),
];
// verify 6 files
let files = setup.list_by_table_not_to_delete().await;
assert_eq!(files.len(), 6);
//
// verify ID and compaction level of the files
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(
files_and_levels,
vec![
(1, CompactionLevel::FileNonOverlapped),
(2, CompactionLevel::Initial),
(3, CompactionLevel::Initial),
(4, CompactionLevel::FileNonOverlapped),
(5, CompactionLevel::Initial),
(6, CompactionLevel::Initial),
]
);
assert_levels(&files, expected_files_and_levels.clone());
// add the partition into skipped compaction
setup
@ -321,24 +267,7 @@ mod tests {
// verify still 6 files
let files = setup.list_by_table_not_to_delete().await;
assert_eq!(files.len(), 6);
//
// verify ID and compaction level of the files
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(
files_and_levels,
vec![
(1, CompactionLevel::FileNonOverlapped),
(2, CompactionLevel::Initial),
(3, CompactionLevel::Initial),
(4, CompactionLevel::FileNonOverlapped),
(5, CompactionLevel::Initial),
(6, CompactionLevel::Initial),
]
);
assert_levels(&files, expected_files_and_levels.clone());
}
#[tokio::test]
@ -478,4 +407,39 @@ mod tests {
)
.await;
}
#[track_caller]
fn assert_levels<'a>(
files: impl IntoIterator<Item = &'a ParquetFile>,
expected_files_and_levels: impl IntoIterator<Item = (i64, CompactionLevel)>,
) {
let files_and_levels: Vec<_> = files
.into_iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
let expected_files_and_levels: Vec<_> = expected_files_and_levels.into_iter().collect();
assert_eq!(files_and_levels, expected_files_and_levels);
}
#[track_caller]
/// Asserts each parquet file has (id, max_l0_created_at)
fn assert_max_l0_created_at<'a>(
files: impl IntoIterator<Item = &'a ParquetFile>,
expected_files_and_max_l0_created_ats: impl IntoIterator<Item = (i64, i64)>,
) {
let files_and_max_l0_created_ats: Vec<_> = files
.into_iter()
.map(|f| (f.id.get(), f.max_l0_created_at.get()))
.collect();
let expected_files_and_max_l0_created_ats: Vec<_> =
expected_files_and_max_l0_created_ats.into_iter().collect();
assert_eq!(
files_and_max_l0_created_ats,
expected_files_and_max_l0_created_ats
);
}
}

View File

@ -5,5 +5,10 @@ use data_types::ParquetFile;
pub mod single_branch;
pub trait DivideInitial: Debug + Display + Send + Sync {
/// Divides a group of files that should be compacted into
/// potentially smaller groups called "branches".
///
/// Each branch is compacted together in a single plan, and each
/// compact plan may produce one or more parquet files.
fn divide(&self, files: Vec<ParquetFile>) -> Vec<Vec<ParquetFile>>;
}
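
To make the "branches" idea concrete, here is a hypothetical divider that returns all files as a single branch (roughly what the `single_branch` module referenced above presumably provides); this is a sketch, not code from the diff:

use std::fmt::{self, Display};

use data_types::ParquetFile;

#[derive(Debug, Default)]
struct OneBranchDivider;

impl Display for OneBranchDivider {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "one_branch")
    }
}

impl DivideInitial for OneBranchDivider {
    // Every file lands in the same branch, so the whole group is handled
    // by a single compact plan.
    fn divide(&self, files: Vec<ParquetFile>) -> Vec<Vec<ParquetFile>> {
        if files.is_empty() {
            vec![]
        } else {
            vec![files]
        }
    }
}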


@ -10,7 +10,12 @@ pub mod target_level_target_level_split;
pub mod target_level_upgrade_split;
pub trait FilesSplit: Debug + Display + Send + Sync {
/// Split provided files into 2 groups of files. There will be different split needs:
/// Split provided files into 2 groups of files:
/// (files_to_compact, files_to_keep)
///
/// Only files in files_to_compact are considered for compaction this round
///
/// There will be different split needs:
/// . `[files <= target_level]` and `[files > target_level]`
/// . `[overlapping_files]` and `[non_overlapping_files]`
/// . `[files_to_upgrade]` and `[files_to_compact]`


@ -20,11 +20,16 @@ pub mod or;
///
/// May return an error. In this case, the partition will be marked as "skipped".
///
/// If you only plan to inspect the ID but not the files and not perform any IO, check
/// [`IdOnlyPartitionFilter`](crate::components::id_only_partition_filter::IdOnlyPartitionFilter) which usually runs
/// earlier in the pipeline and hence is more efficient.
/// If you only plan to inspect the ID but not the files and not
/// perform any IO, check
/// [`IdOnlyPartitionFilter`](crate::components::id_only_partition_filter::IdOnlyPartitionFilter)
/// which usually runs earlier in the pipeline and hence is more
/// efficient.
#[async_trait]
pub trait PartitionFilter: Debug + Display + Send + Sync {
/// Return `true` if the compactor should run a
/// compaction on this partition. Return `false` if this partition
/// does not need any more compaction.
async fn apply(
&self,
partition_id: PartitionId,


@ -4,6 +4,7 @@ use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
use iox_time::Time;
use observability_deps::tracing::info;
use parquet_file::ParquetFilePath;
use tracker::InstrumentedAsyncSemaphore;
@ -100,8 +101,8 @@ async fn compact_partition(
/// The files are split into non-time-overlapped branches, each of which is compacted in parallel.
/// The output of each branch is then combined and re-branched in the next round, until
/// the files should not be compacted any further based on the defined stop conditions.
//
// Example: Partition has 7 files: f1, f2, f3, f4, f5, f6, f7
///
/// Example: Partition has 7 files: f1, f2, f3, f4, f5, f6, f7
/// Input: shown by their time range
/// |--f1--| |----f3----| |-f4-||-f5-||-f7-|
/// |------f2----------| |--f6--|
@ -193,6 +194,7 @@ async fn try_compact_partition(
// fetch partition info only if we need it
let mut lazy_partition_info = None;
// loop for each "Round", consider each file in the partition
loop {
files = components.files_filter.apply(files);
@ -218,15 +220,18 @@ async fn try_compact_partition(
let mut branches = components.divide_initial.divide(files_now);
let mut files_next = files_later;
// loop for each "Branch"
while let Some(branch) = branches.pop() {
let input_paths: Vec<ParquetFilePath> = branch.iter().map(|f| f.into()).collect();
let input_paths: Vec<ParquetFilePath> =
branch.iter().map(ParquetFilePath::from).collect();
// Identify the target level and files that should be compacted, upgraded, and
// kept for next round of compaction
let compaction_plan = buil_compaction_plan(branch, Arc::clone(&components))?;
// Identify the target level and files that should be
// compacted together, upgraded, and kept for next round of
// compaction
let compaction_plan = build_compaction_plan(branch, Arc::clone(&components))?;
// Compact
let created_file_params = compact_files(
let created_file_params = run_compaction_plan(
&compaction_plan.files_to_compact,
partition_info,
&components,
@ -265,24 +270,30 @@ async fn try_compact_partition(
}
}
/// Each CompactionPlan specifies the target level and files that should be compacted, upgraded, and
/// kept for next round of compaction
/// A CompactionPlan specifies the parameters for a single compaction, which may
/// generate one or more new parquet files. It includes the target
/// [`CompactionLevel`], the specific files that should be compacted
/// together to form new file(s), files that should be upgraded
/// without changing, and files that should be left unmodified.
struct CompactionPlan {
/// Target level to compact to
/// The target level of file resulting from compaction
target_level: CompactionLevel,
/// Small and/or overlapped files to compact
/// Files which should be compacted into a new single parquet
/// file, often the small and/or overlapped files
files_to_compact: Vec<ParquetFile>,
/// Non-overlapped and large enough files to upgrade
/// Non-overlapped files that should be upgraded to the target
/// level without rewriting (for example they are of sufficient
/// size)
files_to_upgrade: Vec<ParquetFile>,
/// Non-overlapped or higher-target-level files to keep for next round of compaction
/// files which should not be modified. For example,
/// non-overlapped or higher-target-level files
files_to_keep: Vec<ParquetFile>,
}
/// Build compaction plan for a given set of files
/// This function will determine the target level to compact to and split the files into
/// files_to_compact, files_to_upgrade, and files_to_keep
/// Build [`CompactionPlan`] for a given set of files.
///
/// # Example:
///
/// Example:
/// . Input:
/// |--L0.1--| |--L0.2--| |--L0.3--| |--L0.4--| --L0.5--|
/// |--L1.1--| |--L1.2--| |--L1.3--| |--L1.4--|
@ -294,7 +305,7 @@ struct CompactionPlan {
/// . files_to_upgrade = [L0.1, L0.5]
/// . files_to_compact = [L0.2, L0.3, L0.4, L1.2, L1.3]
///
fn buil_compaction_plan(
fn build_compaction_plan(
files: Vec<ParquetFile>,
components: Arc<Components>,
) -> Result<CompactionPlan, DynError> {
@ -308,7 +319,7 @@ fn buil_compaction_plan(
// Since output of one compaction is used as input of next compaction, all files that are not
// compacted or upgraded are still kept to consider in next round of compaction
// Split atctual files to compact from its higher-target-level files
// Split actual files to compact from its higher-target-level files
// The higher-target-level files are kept for next round of compaction
let (files_to_compact, mut files_to_keep) = components
.target_level_split
@ -326,6 +337,14 @@ fn buil_compaction_plan(
.upgrade_split
.apply(files_to_compact, target_level);
info!(
target_level = target_level.to_string(),
files_to_compacts = files_to_compact.len(),
files_to_upgrade = files_to_upgrade.len(),
files_to_keep = files_to_keep.len(),
"Compaction Plan"
);
Ok(CompactionPlan {
target_level,
files_to_compact,
@ -334,10 +353,8 @@ fn buil_compaction_plan(
})
}
/// Compact into the given target_level
/// This function assumes the input files only include overlapped files of `target_level - 1`
/// and files of target_level.
async fn compact_files(
/// Compact `files` into a new parquet file of the given target_level
async fn run_compaction_plan(
files: &[ParquetFile],
partition_info: &Arc<PartitionInfo>,
components: &Arc<Components>,


@ -550,6 +550,34 @@ impl TestSetup {
let mut config = Arc::get_mut(&mut self.config).unwrap();
config.min_num_l1_files_to_compact = min_num_l1_files_to_compact;
}
/// return a set of times relative to config.time_provider.now()
pub fn test_times(&self) -> TestTimes {
TestTimes::new(self.config.time_provider.as_ref())
}
}
/// A collection of nanosecond timestamps relative to now
pub struct TestTimes {
pub time_1_minute_future: i64,
pub time_2_minutes_future: i64,
pub time_3_minutes_future: i64,
pub time_5_minutes_future: i64,
}
impl TestTimes {
fn new(time_provider: &dyn TimeProvider) -> Self {
let time_1_minute_future = time_provider.minutes_into_future(1).timestamp_nanos();
let time_2_minutes_future = time_provider.minutes_into_future(2).timestamp_nanos();
let time_3_minutes_future = time_provider.minutes_into_future(3).timestamp_nanos();
let time_5_minutes_future = time_provider.minutes_into_future(5).timestamp_nanos();
Self {
time_1_minute_future,
time_2_minutes_future,
time_3_minutes_future,
time_5_minutes_future,
}
}
}
pub async fn list_object_store(store: &Arc<DynObjectStore>) -> HashSet<Path> {


@ -858,11 +858,8 @@ impl From<&str> for PartitionKey {
}
}
impl<DB> sqlx::Type<DB> for PartitionKey
where
DB: sqlx::Database<TypeInfo = sqlx::postgres::PgTypeInfo>,
{
fn type_info() -> DB::TypeInfo {
impl sqlx::Type<sqlx::Postgres> for PartitionKey {
fn type_info() -> sqlx::postgres::PgTypeInfo {
// Store this type as VARCHAR
sqlx::postgres::PgTypeInfo::with_name("VARCHAR")
}
@ -887,6 +884,31 @@ impl sqlx::Decode<'_, sqlx::Postgres> for PartitionKey {
}
}
impl sqlx::Type<sqlx::Sqlite> for PartitionKey {
fn type_info() -> sqlx::sqlite::SqliteTypeInfo {
<String as sqlx::Type<sqlx::Sqlite>>::type_info()
}
}
impl sqlx::Encode<'_, sqlx::Sqlite> for PartitionKey {
fn encode_by_ref(
&self,
buf: &mut <sqlx::Sqlite as sqlx::database::HasArguments<'_>>::ArgumentBuffer,
) -> sqlx::encode::IsNull {
<String as sqlx::Encode<sqlx::Sqlite>>::encode(self.0.to_string(), buf)
}
}
impl sqlx::Decode<'_, sqlx::Sqlite> for PartitionKey {
fn decode(
value: <sqlx::Sqlite as sqlx::database::HasValueRef<'_>>::ValueRef,
) -> Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
Ok(Self(
<String as sqlx::Decode<sqlx::Sqlite>>::decode(value)?.into(),
))
}
}
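// With the Sqlite `Type`/`Encode`/`Decode` impls above, a `PartitionKey` can be
// bound directly in sqlx queries against the SQLite catalog. A hedged usage
// sketch, not part of this diff: the table and column names follow the SQLite
// migration later in this change, and the pool is assumed to be an
// `sqlx::SqlitePool`.
async fn example_lookup(pool: &sqlx::SqlitePool) -> Result<(), sqlx::Error> {
    let key = PartitionKey::from("2023-02-07");
    // `$1` placeholders also work with SQLite; the bind uses the Encode impl above.
    let _rows = sqlx::query("SELECT id FROM partition WHERE partition_key = $1")
        .bind(key)
        .fetch_all(pool)
        .await?;
    Ok(())
}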
/// Data object for a partition. The combination of shard, table and key are unique (i.e. only
/// one record can exist for each combo)
#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]


@ -14,8 +14,6 @@ use std::{collections::HashMap, sync::Arc};
use crate::process_info::{setup_metric_registry, USIZE_MAX};
mod generate;
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
@ -53,21 +51,6 @@ pub enum Command {
)]
exec_mem_pool_bytes: usize,
},
/// Generate Parquet files and catalog entries with different characteristics for the purposes
/// of investigating how the compactor handles them.
///
/// Only works with `--object-store file` because this is for generating local development
/// data.
///
/// Within the directory specified by `--data-dir`, will generate a
/// `compactor_data/line_protocol` subdirectory to avoid interfering with other existing IOx
/// files that may be in the `--data-dir`.
///
/// WARNING: On every run of this tool, the `compactor_data/line_protocol` subdirectory will be
/// removed. If you want to keep any previously generated files, move or copy them before
/// running this tool again.
Generate(generate::Config),
}
pub async fn command(config: Config) -> Result<()> {
@ -121,9 +104,6 @@ pub async fn command(config: Config) -> Result<()> {
compactor::handler::run_compactor_once(compactor).await;
}
Command::Generate(config) => {
generate::run(config).await?;
}
}
Ok(())
@ -143,9 +123,6 @@ pub enum Error {
#[snafu(context(false))]
Compacting { source: ioxd_compactor::Error },
#[snafu(context(false))]
Generating { source: generate::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;


@ -1,685 +0,0 @@
//! Implements the `compactor generate` command.
use bytes::Bytes;
use clap::ValueEnum;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig,
object_store::{make_object_store, ObjectStoreConfig, ObjectStoreType},
};
use object_store::DynObjectStore;
use snafu::prelude::*;
use std::{
ffi::OsStr, fmt::Write, fs, num::NonZeroUsize, path::PathBuf, process::Command, sync::Arc,
};
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(flatten)]
object_store_config: ObjectStoreConfig,
#[clap(flatten)]
catalog_dsn: CatalogDsnConfig,
/// The type of compaction to be done on the files. If `hot` is specified, the generated
/// files will have compaction level 0, will overlap with each other slightly, and will be
/// marked that they were created within the last (approximately) 30 minutes. If `cold` is
/// specified, the generated files will have compaction level 1, won't overlap with each other,
/// and will be marked that they were created between 8 and 24 hours ago.
#[clap(
value_enum,
value_parser,
long = "compaction-type",
env = "INFLUXDB_IOX_COMPACTOR_GENERATE_TYPE",
default_value = "hot",
action
)]
compaction_type: CompactionType,
/// The number of IOx partitions to generate files for. Each partition will have the number
/// of files specified by `--num-files` generated.
#[clap(
long = "num-partitions",
env = "INFLUXDB_IOX_COMPACTOR_GENERATE_NUM_PARTITIONS",
default_value = "1",
action
)]
num_partitions: NonZeroUsize,
/// The number of parquet files to generate per partition.
#[clap(
long = "num-files",
env = "INFLUXDB_IOX_COMPACTOR_GENERATE_NUM_FILES",
default_value = "1",
action
)]
num_files: NonZeroUsize,
/// The number of columns to generate in each file. One column will always be the
/// timestamp. Additional columns will be given a type in I64, F64, String, Bool, and
/// Tag in equal proportion.
#[clap(
long = "num-cols",
env = "INFLUXDB_IOX_COMPACTOR_GENERATE_NUM_COLS",
default_value = "6",
action
)]
num_columns: NonZeroUsize,
/// The number of rows to generate in each file.
#[clap(
long = "num-rows",
env = "INFLUXDB_IOX_COMPACTOR_GENERATE_NUM_ROWS",
default_value = "1",
action
)]
num_rows: NonZeroUsize,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
pub enum CompactionType {
Hot,
Cold,
}
pub async fn run(config: Config) -> Result<()> {
if !matches!(
&config.object_store_config.object_store,
Some(ObjectStoreType::File)
) {
panic!("Sorry, this tool only works with 'file' object stores.");
}
let object_store = make_object_store(&config.object_store_config)?;
let root_dir: PathBuf = config
.object_store_config
.database_directory
.as_ref()
.expect("--data-dir is required and has already been checked")
.into();
let compactor_data_dir = root_dir.join("compactor_data");
let parquet_dir = compactor_data_dir.join("parquet");
if compactor_data_dir
.try_exists()
.context(FileExistenceSnafu {
path: &compactor_data_dir,
})?
{
fs::remove_dir_all(&compactor_data_dir).context(RemoveSnafu {
path: &compactor_data_dir,
})?;
}
let spec_location = "compactor_data/spec.toml";
let spec_in_root = compactor_data_dir.join("spec.toml");
let Config {
compaction_type,
num_rows,
num_files,
..
} = config;
let TimeValues {
sampling_interval_ns,
start_end_args,
} = TimeValues::new(compaction_type, num_rows.get(), num_files.get());
for (file_id, &start_end) in start_end_args
.iter()
.enumerate()
.take(config.num_files.get())
{
write_data_generation_spec(
file_id,
Arc::clone(&object_store),
config.num_columns.get(),
sampling_interval_ns,
spec_location,
)
.await?;
let StartEndMinutesAgo { start, end } = start_end;
generate_data(&spec_in_root, &parquet_dir, num_rows.get(), start, end)?;
}
Ok(())
}
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Could not parse the object store configuration"))]
#[snafu(context(false))]
ObjectStoreConfigParsing {
source: clap_blocks::object_store::ParseError,
},
#[snafu(display("Could not write file to object storage"))]
ObjectStoreWriting { source: object_store::Error },
#[snafu(display("Could not parse object store path"))]
ObjectStorePathParsing { source: object_store::path::Error },
#[snafu(display("Subcommand failed: {status}"))]
Subcommand { status: String },
#[snafu(display("Could not check for existence of path {}", path.display()))]
FileExistence {
path: PathBuf,
source: std::io::Error,
},
#[snafu(display("Could not remove directory {}", path.display()))]
Remove {
path: PathBuf,
source: std::io::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
async fn write_data_generation_spec(
file_id: usize,
object_store: Arc<DynObjectStore>,
num_columns: usize,
sampling_interval_ns: usize,
spec_location: &str,
) -> Result<()> {
let object_store_spec_path =
object_store::path::Path::parse(spec_location).context(ObjectStorePathParsingSnafu)?;
let contents = data_generation_spec_contents(file_id, sampling_interval_ns, num_columns);
let data = Bytes::from(contents);
object_store
.put(&object_store_spec_path, data)
.await
.context(ObjectStoreWritingSnafu)?;
Ok(())
}
fn generate_data(
spec_in_root: impl AsRef<OsStr>,
parquet_dir: impl AsRef<OsStr>,
num_rows: usize,
start: usize,
end: usize,
) -> Result<()> {
let status = Command::new("cargo")
.arg("run")
.arg("-p")
.arg("iox_data_generator")
.arg("--")
.arg("--specification")
.arg(&spec_in_root)
.arg("--parquet")
.arg(&parquet_dir)
.arg("--start")
.arg(&format!("{start} minutes ago"))
.arg("--end")
.arg(&format!("{end} minutes ago"))
.arg("--batch-size")
.arg(num_rows.to_string())
.status()
.expect("Running the data generator should have worked");
ensure!(
status.success(),
SubcommandSnafu {
status: status.to_string()
}
);
Ok(())
}
fn data_generation_spec_contents(
file_id: usize,
sampling_interval_ns: usize,
num_columns: usize,
) -> String {
let mut spec = format!(
r#"
name = "for_compaction"
[[database_writers]]
database_ratio = 1.0
agents = [{{name = "data_{file_id}", sampling_interval = "{sampling_interval_ns}ns"}}]
[[agents]]
name = "data_{file_id}"
[[agents.measurements]]
name = "measure"
"#
);
// The 1st column is always time, and the data generator always generates a timestamp without
// any configuration needed, so the number of columns that need configuration is one less.
let num_columns = num_columns - 1;
// Every 5th column will be a tag.
let num_tags = num_columns / 5;
// The remaining columns will be fields of various types.
let num_fields = num_columns - num_tags;
// Tags go with the measurement, so they have to be specified in the config first.
if num_tags > 0 {
spec.push_str("tag_pairs = [\n");
for tag_id in 1..=num_tags {
let _ = write!(
spec,
r#" {{key = "tag_{tag_id}", template = "{{{{random 1}}}}", regenerate_after_lines = 1}},"#
);
spec.push('\n');
}
spec.push_str("]\n")
}
for field_id in 0..num_fields {
spec.push_str(&field_spec(field_id));
spec.push('\n');
}
spec
}
fn field_spec(field_id: usize) -> String {
match field_id % 4 {
0 => format!(
r#"
[[agents.measurements.fields]]
name = "i64_{field_id}"
i64_range = [0, 100]"#
),
1 => format!(
r#"
[[agents.measurements.fields]]
name = "f64_{field_id}"
f64_range = [0.0, 100.0]"#
),
2 => format!(
r#"
[[agents.measurements.fields]]
name = "string_{field_id}"
template = "{{{{random 4}}}}""#
),
3 => format!(
r#"
[[agents.measurements.fields]]
name = "bool_{field_id}"
bool = true"#
),
_ => unreachable!("% 4 can only result in 0 - 3"),
}
}
#[derive(Debug, PartialEq, Clone)]
struct TimeValues {
sampling_interval_ns: usize,
start_end_args: Vec<StartEndMinutesAgo>,
}
#[derive(Debug, PartialEq, Copy, Clone)]
struct StartEndMinutesAgo {
start: usize,
end: usize,
}
impl TimeValues {
fn new(compaction_type: CompactionType, num_rows: usize, num_files: usize) -> Self {
match compaction_type {
CompactionType::Hot => {
// Make the range approximately 30 min ago to now.
let full_range_start_minutes = 30;
let full_range_end_minutes = 0;
// Overlap each file by this many minutes on the start and end with other files to
// create realistic level 0 files for hot compaction.
let overlap_minutes = 1;
Self::inner(
full_range_start_minutes,
full_range_end_minutes,
overlap_minutes,
num_rows,
num_files,
)
}
CompactionType::Cold => {
// Make the range approximately 24 hours ago to 8 hours ago.
let full_range_start_minutes = 24 * 60;
let full_range_end_minutes = 8 * 60;
// Don't overlap level 1 files
let overlap_minutes = 0;
Self::inner(
full_range_start_minutes,
full_range_end_minutes,
overlap_minutes,
num_rows,
num_files,
)
}
}
}
// Clippy suggests changing `if overlap_minutes == 0 { 1 } else { 0 }` to
// `usize::from(overlap_minutes == 0)`, but I think the original is clearer
#[allow(clippy::bool_to_int_with_if)]
fn inner(
full_range_start_minutes: usize,
full_range_end_minutes: usize,
overlap_minutes: usize,
num_rows: usize,
num_files: usize,
) -> Self {
// Divide the full range evenly across all files, plus the overlap on each end.
let full_range_length_minutes = full_range_start_minutes - full_range_end_minutes;
let minutes_per_file = full_range_length_minutes / num_files + overlap_minutes * 2;
// Tell the generator to create one point every this many nanoseconds to create the
// specified number of rows in each file.
let fencepost_num_rows = if num_rows != 1 {
num_rows - 1
} else {
num_rows
};
let sampling_interval_ns = (minutes_per_file * 60 * 1_000_000_000) / fencepost_num_rows;
let start_end_args = (0..num_files)
.rev()
.map(|file_id| StartEndMinutesAgo {
start: minutes_per_file * (file_id + 1) - overlap_minutes * file_id
+ full_range_end_minutes,
end: minutes_per_file * file_id - overlap_minutes * file_id
+ full_range_end_minutes
// When the overlap is 0, subtract 1 because the data generator is inclusive
- (if overlap_minutes == 0 { 1 } else { 0 }),
})
.collect();
Self {
sampling_interval_ns,
start_end_args,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
mod hot {
use super::*;
const COMPACTION_TYPE: CompactionType = CompactionType::Hot;
#[test]
fn one_row_one_file() {
let num_rows = 1;
let num_files = 1;
let TimeValues {
sampling_interval_ns,
start_end_args,
} = TimeValues::new(COMPACTION_TYPE, num_rows, num_files);
assert_eq!(sampling_interval_ns, 1_920_000_000_000);
assert_eq!(
start_end_args,
vec![StartEndMinutesAgo { start: 32, end: 0 }]
);
}
#[test]
fn one_thousand_rows_one_file() {
let num_rows = 1_000;
let num_files = 1;
let TimeValues {
sampling_interval_ns,
start_end_args,
} = TimeValues::new(COMPACTION_TYPE, num_rows, num_files);
assert_eq!(sampling_interval_ns, 1_921_921_921);
assert_eq!(
start_end_args,
vec![StartEndMinutesAgo { start: 32, end: 0 }]
);
}
#[test]
fn one_row_three_files() {
let num_rows = 1;
let num_files = 3;
let TimeValues {
sampling_interval_ns,
start_end_args,
} = TimeValues::new(COMPACTION_TYPE, num_rows, num_files);
assert_eq!(sampling_interval_ns, 720_000_000_000);
assert_eq!(
start_end_args,
vec![
StartEndMinutesAgo { start: 34, end: 22 },
StartEndMinutesAgo { start: 23, end: 11 },
StartEndMinutesAgo { start: 12, end: 0 },
]
);
}
#[test]
fn one_thousand_rows_three_files() {
let num_rows = 1_000;
let num_files = 3;
let TimeValues {
sampling_interval_ns,
start_end_args,
} = TimeValues::new(COMPACTION_TYPE, num_rows, num_files);
assert_eq!(sampling_interval_ns, 720_720_720);
assert_eq!(
start_end_args,
vec![
StartEndMinutesAgo { start: 34, end: 22 },
StartEndMinutesAgo { start: 23, end: 11 },
StartEndMinutesAgo { start: 12, end: 0 },
]
);
}
}
mod cold {
use super::*;
const COMPACTION_TYPE: CompactionType = CompactionType::Cold;
#[test]
fn one_row_one_file() {
let num_rows = 1;
let num_files = 1;
let TimeValues {
sampling_interval_ns,
start_end_args,
} = TimeValues::new(COMPACTION_TYPE, num_rows, num_files);
assert_eq!(sampling_interval_ns, 57_600_000_000_000);
assert_eq!(
start_end_args,
vec![StartEndMinutesAgo {
start: 24 * 60,
end: 8 * 60 - 1,
}]
);
}
#[test]
fn one_thousand_rows_one_file() {
let num_rows = 1_000;
let num_files = 1;
let TimeValues {
sampling_interval_ns,
start_end_args,
} = TimeValues::new(COMPACTION_TYPE, num_rows, num_files);
assert_eq!(sampling_interval_ns, 57_657_657_657);
assert_eq!(
start_end_args,
vec![StartEndMinutesAgo {
start: 24 * 60,
end: 8 * 60 - 1,
}]
);
}
#[test]
fn one_row_three_files() {
let num_rows = 1;
let num_files = 3;
let TimeValues {
sampling_interval_ns,
start_end_args,
} = TimeValues::new(COMPACTION_TYPE, num_rows, num_files);
assert_eq!(sampling_interval_ns, 19_200_000_000_000);
assert_eq!(
start_end_args,
vec![
StartEndMinutesAgo {
start: 1440,
end: 1119,
},
StartEndMinutesAgo {
start: 1120,
end: 799,
},
StartEndMinutesAgo {
start: 800,
end: 479,
},
]
);
}
#[test]
fn one_thousand_rows_three_files() {
let num_rows = 1_000;
let num_files = 3;
let TimeValues {
sampling_interval_ns,
start_end_args,
} = TimeValues::new(COMPACTION_TYPE, num_rows, num_files);
assert_eq!(sampling_interval_ns, 19_219_219_219);
assert_eq!(
start_end_args,
vec![
StartEndMinutesAgo {
start: 1440,
end: 1119,
},
StartEndMinutesAgo {
start: 1120,
end: 799,
},
StartEndMinutesAgo {
start: 800,
end: 479,
},
]
);
}
}
#[test]
fn minimal_spec_contents() {
let spec = data_generation_spec_contents(1, 1, 2);
assert_eq!(
spec,
r#"
name = "for_compaction"
[[database_writers]]
database_ratio = 1.0
agents = [{name = "data_1", sampling_interval = "1ns"}]
[[agents]]
name = "data_1"
[[agents.measurements]]
name = "measure"
[[agents.measurements.fields]]
name = "i64_0"
i64_range = [0, 100]
"#
);
}
#[test]
fn many_columns_spec_contents() {
let spec = data_generation_spec_contents(3, 100, 12);
assert_eq!(
spec,
r#"
name = "for_compaction"
[[database_writers]]
database_ratio = 1.0
agents = [{name = "data_3", sampling_interval = "100ns"}]
[[agents]]
name = "data_3"
[[agents.measurements]]
name = "measure"
tag_pairs = [
{key = "tag_1", template = "{{random 1}}", regenerate_after_lines = 1},
{key = "tag_2", template = "{{random 1}}", regenerate_after_lines = 1},
]
[[agents.measurements.fields]]
name = "i64_0"
i64_range = [0, 100]
[[agents.measurements.fields]]
name = "f64_1"
f64_range = [0.0, 100.0]
[[agents.measurements.fields]]
name = "string_2"
template = "{{random 4}}"
[[agents.measurements.fields]]
name = "bool_3"
bool = true
[[agents.measurements.fields]]
name = "i64_4"
i64_range = [0, 100]
[[agents.measurements.fields]]
name = "f64_5"
f64_range = [0.0, 100.0]
[[agents.measurements.fields]]
name = "string_6"
template = "{{random 4}}"
[[agents.measurements.fields]]
name = "bool_7"
bool = true
[[agents.measurements.fields]]
name = "i64_8"
i64_range = [0, 100]
"#
);
}
}


@ -1,153 +0,0 @@
use arrow::record_batch::RecordBatch;
use assert_cmd::Command;
use datafusion::datasource::object_store::ObjectStoreUrl;
use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path as ObjectStorePath, ObjectStore};
use parquet_to_line_protocol::ParquetFileReader;
use predicates::prelude::*;
use std::sync::Arc;
use test_helpers_end_to_end::maybe_skip_integration;
#[tokio::test]
async fn compactor_generate_has_defaults() {
let database_url = maybe_skip_integration!();
let dir = tempfile::tempdir()
.expect("could not get temporary directory")
.into_path();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("compactor")
.arg("generate")
.arg("--catalog-dsn")
.arg(&database_url)
.arg("--object-store")
.arg("file")
.arg("--data-dir")
.arg(&dir)
.assert()
.success();
let data_generation_spec = dir.join("compactor_data/spec.toml");
assert!(data_generation_spec.exists());
}
#[tokio::test]
async fn compactor_generate_zeroes_are_invalid() {
let database_url = maybe_skip_integration!();
let dir = tempfile::tempdir().expect("could not get temporary directory");
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("compactor")
.arg("generate")
.arg("--catalog-dsn")
.arg(&database_url)
.arg("--object-store")
.arg("file")
.arg("--data-dir")
.arg(dir.path())
.arg("--num-partitions")
.arg("0")
.arg("--num-files")
.arg("0")
.arg("--num-cols")
.arg("0")
.arg("--num-rows")
.arg("0")
.assert()
.failure()
.stderr(predicate::str::contains(
"number would be zero for non-zero type",
));
}
#[tokio::test]
async fn compactor_generate_creates_files_and_catalog_entries() {
let database_url = maybe_skip_integration!();
let dir = tempfile::tempdir().expect("could not get temporary directory");
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("compactor")
.arg("generate")
.arg("--catalog-dsn")
.arg(&database_url)
.arg("--object-store")
.arg("file")
.arg("--data-dir")
.arg(dir.path())
.assert()
.success();
let data_generation_spec = dir.path().join("compactor_data/spec.toml");
assert!(data_generation_spec.exists());
}
#[tokio::test]
async fn running_compactor_generate_twice_overwrites_existing_files() {
let database_url = maybe_skip_integration!();
let dir = tempfile::tempdir().expect("could not get temporary directory");
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("compactor")
.arg("generate")
.arg("--catalog-dsn")
.arg(&database_url)
.arg("--object-store")
.arg("file")
.arg("--data-dir")
.arg(dir.path())
.assert()
.success();
let first_run_data_path = dir
.path()
.join("compactor_data/parquet/data_0_measure.parquet");
let first_run_record_batches = read_record_batches(&first_run_data_path).await;
assert_eq!(first_run_record_batches.len(), 1);
let first_run_record_batch = &first_run_record_batches[0];
let first_run_num_lines = first_run_record_batch.num_rows();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("compactor")
.arg("generate")
.arg("--catalog-dsn")
.arg(&database_url)
.arg("--object-store")
.arg("file")
.arg("--data-dir")
.arg(dir.path())
.assert()
.success();
let second_run_data_path = dir
.path()
.join("compactor_data/parquet/data_0_measure.parquet");
let second_run_record_batches = read_record_batches(&second_run_data_path).await;
assert_eq!(second_run_record_batches.len(), 1);
let second_run_record_batch = &second_run_record_batches[0];
let second_run_num_lines = second_run_record_batch.num_rows();
// If generation is appending instead of overwriting, this will fail.
assert_eq!(first_run_num_lines, second_run_num_lines);
// If generation isn't creating different data every time it's invoked, this will fail.
assert_ne!(first_run_record_batch, second_run_record_batch);
}
async fn read_record_batches(path: impl AsRef<std::path::Path>) -> Vec<RecordBatch> {
let object_store_path = ObjectStorePath::from_filesystem_path(path).unwrap();
let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
let object_store_url = ObjectStoreUrl::local_filesystem();
let object_meta = object_store.head(&object_store_path).await.unwrap();
let reader = ParquetFileReader::try_new(object_store, object_store_url, object_meta)
.await
.unwrap();
reader.read().await.unwrap().try_collect().await.unwrap()
}


@ -3,7 +3,6 @@ mod all_in_one;
// loading shared libraries: libjemalloc.so.2: cannot open shared object file: No such file or directory"
#[cfg(not(feature = "heappy"))]
mod cli;
mod compactor;
mod debug;
mod error;
mod flightsql;

iox_catalog/.gitignore

@ -0,0 +1 @@
iox_catalog.sqlite3


@ -14,8 +14,10 @@ log = "0.4"
metric = { version = "0.1.0", path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
observability_deps = { path = "../observability_deps" }
parking_lot = { version = "0.12" }
serde = { version = "1.0", features = ["derive"] }
snafu = "0.7"
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid" ] }
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid", "sqlite" ] }
sqlx-hotswap-pool = { path = "../sqlx-hotswap-pool" }
thiserror = "1.0.38"
tokio = { version = "1.25", features = ["io-util", "macros", "parking_lot", "rt-multi-thread", "time"] }


@ -0,0 +1,233 @@
create table if not exists topic
(
id INTEGER not null
constraint kafka_topic_pkey
primary key autoincrement,
name VARCHAR not null
constraint topic_name_unique unique
);
create table if not exists query_pool
(
id INTEGER NOT NULL
constraint query_pool_pkey
primary key autoincrement,
name varchar not null
constraint query_pool_name_unique
unique
);
create table if not exists namespace
(
id INTEGER
constraint namespace_pkey
primary key autoincrement,
name varchar not null
constraint namespace_name_unique
unique,
topic_id numeric not null
constraint namespace_kafka_topic_id_fkey
references topic,
query_pool_id numeric not null
references query_pool,
max_tables integer default 10000 not null,
max_columns_per_table integer default 200 not null,
retention_period_ns numeric
);
create table if not exists table_name
(
id INTEGER
constraint table_name_pkey
primary key autoincrement,
namespace_id numeric not null
references namespace
on delete cascade,
name varchar not null,
constraint table_name_unique
unique (namespace_id, name)
);
create index if not exists table_name_namespace_idx
on table_name (namespace_id);
create table if not exists column_name
(
id INTEGER
constraint column_name_pkey
primary key autoincrement,
table_id numeric not null
references table_name
on delete cascade,
name varchar not null,
column_type smallint not null,
constraint column_name_unique
unique (table_id, name)
);
create index if not exists column_name_table_idx
on column_name (table_id);
create table if not exists shard
(
id INTEGER
constraint sequencer_pkey
primary key autoincrement,
topic_id numeric not null
constraint sequencer_kafka_topic_id_fkey
references topic,
shard_index integer not null,
min_unpersisted_sequence_number numeric,
constraint shard_unique
unique (topic_id, shard_index)
);
create table if not exists sharding_rule_override
(
id INTEGER
constraint sharding_rule_override_pkey
primary key autoincrement,
namespace_id numeric not null
references namespace,
table_id numeric not null
references table_name,
column_id numeric not null
references column_name
);
create table if not exists partition
(
id INTEGER
constraint partition_pkey
primary key autoincrement,
shard_id numeric not null
constraint partition_sequencer_id_fkey
references shard,
table_id numeric not null
references table_name
on delete cascade,
partition_key varchar not null,
sort_key text [] not null,
persisted_sequence_number numeric,
to_delete numeric,
new_file_at numeric,
constraint partition_key_unique
unique (table_id, partition_key)
);
create table if not exists parquet_file
(
id INTEGER
constraint parquet_file_pkey
primary key autoincrement,
shard_id numeric not null
constraint parquet_file_sequencer_id_fkey
references shard,
table_id numeric not null
references table_name,
partition_id numeric not null
references partition,
object_store_id uuid not null
constraint parquet_location_unique
unique,
max_sequence_number numeric,
min_time numeric,
max_time numeric,
to_delete numeric,
row_count numeric default 0 not null,
file_size_bytes numeric default 0 not null,
compaction_level smallint default 0 not null,
created_at numeric,
namespace_id numeric not null
references namespace
on delete cascade,
column_set numeric[] not null,
max_l0_created_at numeric default 0 not null
);
create index if not exists parquet_file_deleted_at_idx
on parquet_file (to_delete);
create index if not exists parquet_file_partition_idx
on parquet_file (partition_id);
create index if not exists parquet_file_table_idx
on parquet_file (table_id);
create index if not exists parquet_file_shard_compaction_delete_idx
on parquet_file (shard_id, compaction_level, to_delete);
create index if not exists parquet_file_shard_compaction_delete_created_idx
on parquet_file (shard_id, compaction_level, to_delete, created_at);
create index if not exists parquet_file_partition_created_idx
on parquet_file (partition_id, created_at);
create table if not exists tombstone
(
id INTEGER
constraint tombstone_pkey
primary key autoincrement,
table_id numeric not null
references table_name
on delete cascade,
shard_id numeric not null
constraint tombstone_sequencer_id_fkey
references shard,
sequence_number numeric not null,
min_time numeric not null,
max_time numeric not null,
serialized_predicate text not null,
constraint tombstone_unique
unique (table_id, shard_id, sequence_number)
);
create table if not exists processed_tombstone
(
tombstone_id INTEGER not null
references tombstone,
parquet_file_id numeric not null
references parquet_file
on delete cascade,
primary key (tombstone_id, parquet_file_id)
);
create table if not exists skipped_compactions
(
partition_id INTEGER not null
constraint skipped_compactions_pkey
primary key
references partition
on delete cascade,
reason text not null,
skipped_at numeric not null,
num_files numeric,
limit_num_files numeric,
estimated_bytes numeric,
limit_bytes numeric,
limit_num_files_first_in_partition numeric
);
create table if not exists billing_summary
(
namespace_id integer not null
constraint billing_summary_pkey
primary key
references namespace
on delete cascade,
total_file_size_bytes numeric not null
);
create index if not exists billing_summary_namespace_idx
on billing_summary (namespace_id);


@ -0,0 +1,31 @@
create trigger if not exists update_partition
after insert
on parquet_file
for each row
when NEW.compaction_level < 2
begin
UPDATE partition set new_file_at = NEW.created_at WHERE id = NEW.partition_id;
end;
create trigger if not exists update_billing
after insert
on parquet_file
for each row
begin
INSERT INTO billing_summary (namespace_id, total_file_size_bytes)
VALUES (NEW.namespace_id, NEW.file_size_bytes)
ON CONFLICT (namespace_id) DO UPDATE
SET total_file_size_bytes = billing_summary.total_file_size_bytes + NEW.file_size_bytes
WHERE billing_summary.namespace_id = NEW.namespace_id;
end;
create trigger if not exists decrement_summary
after update
on parquet_file
for each row
when OLD.to_delete IS NULL AND NEW.to_delete IS NOT NULL
begin
UPDATE billing_summary
SET total_file_size_bytes = billing_summary.total_file_size_bytes - OLD.file_size_bytes
WHERE billing_summary.namespace_id = OLD.namespace_id;
end;
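
Taken together, the two SQL files above define the initial SQLite schema and the triggers that keep `partition.new_file_at` and `billing_summary` in sync with `parquet_file` inserts and soft deletes. A sketch of how such migrations are typically applied with sqlx's embedded migrator; the directory path and function name below are assumptions, not taken from this diff:

// Assumed layout: the .sql files above live under a `sqlite/migrations` directory.
static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("sqlite/migrations");

async fn apply_migrations(pool: &sqlx::SqlitePool) -> Result<(), sqlx::migrate::MigrateError> {
    // Applies any migrations not yet recorded in the database.
    MIGRATOR.run(pool).await
}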


@ -42,6 +42,7 @@ pub mod interface;
pub mod mem;
pub mod metrics;
pub mod postgres;
pub mod sqlite;
/// An [`crate::interface::Error`] scoped to a single table for schema validation errors.
#[derive(Debug, Error)]

iox_catalog/src/sqlite.rs
File diff suppressed because it is too large.

View File

@ -1,6 +1,8 @@
//! This module contains code that implements
//! a gap-filling extension to DataFusion
mod algo;
use std::{
fmt::{self, Debug},
ops::{Bound, Range},
@ -15,11 +17,15 @@ use datafusion::{
logical_expr::{LogicalPlan, UserDefinedLogicalNode},
physical_expr::{create_physical_expr, execution_props::ExecutionProps, PhysicalSortExpr},
physical_plan::{
expressions::Column, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
PhysicalExpr, SendableRecordBatchStream, Statistics,
expressions::Column,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
SendableRecordBatchStream, Statistics,
},
prelude::Expr,
};
use datafusion_util::{watch::WatchedTask, AdapterStream};
use tokio::sync::mpsc;
/// A logical node that represents the gap filling operation.
#[derive(Clone, Debug)]
@ -31,17 +37,60 @@ pub struct GapFill {
}
/// Parameters to the GapFill operation
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct GapFillParams {
/// The stride argument from the call to DATE_BIN_GAPFILL
pub stride: Expr,
/// The source time column
pub time_column: Expr,
/// The origin argument from the call to DATE_BIN_GAPFILL
pub origin: Expr,
/// The time range of the time column inferred from predicates
/// in overall the query
/// in the overall query
pub time_range: Range<Bound<Expr>>,
}
impl GapFillParams {
// Extract the expressions so they can be optimized.
fn expressions(&self) -> Vec<Expr> {
vec![
self.stride.clone(),
self.time_column.clone(),
self.origin.clone(),
bound_extract(&self.time_range.start)
.unwrap_or_else(|| panic!("lower time bound is required"))
.clone(),
bound_extract(&self.time_range.end)
.unwrap_or_else(|| panic!("upper time bound is required"))
.clone(),
]
}
#[allow(clippy::wrong_self_convention)] // follows convention of UserDefinedLogicalNode
fn from_template(&self, exprs: &[Expr]) -> Self {
assert!(
exprs.len() >= 3,
"should be a at least stride, source and origin in params"
);
let mut iter = exprs.iter().cloned();
let stride = iter.next().unwrap();
let time_column = iter.next().unwrap();
let origin = iter.next().unwrap();
let time_range = try_map_range(&self.time_range, |b| {
try_map_bound(b.as_ref(), |_| {
Ok(iter.next().expect("expr count should match template"))
})
})
.unwrap();
Self {
stride,
time_column,
origin,
time_range,
}
}
}
impl GapFill {
pub(crate) fn try_new(
input: Arc<LogicalPlan>,
@ -74,7 +123,8 @@ impl UserDefinedLogicalNode for GapFill {
fn expressions(&self) -> Vec<Expr> {
self.group_expr
.iter()
.chain(self.aggr_expr.iter())
.chain(&self.aggr_expr)
.chain(&self.params.expressions())
.cloned()
.collect()
}
@ -97,14 +147,11 @@ impl UserDefinedLogicalNode for GapFill {
inputs: &[LogicalPlan],
) -> Arc<dyn UserDefinedLogicalNode> {
let mut group_expr: Vec<_> = exprs.to_vec();
let aggr_expr = group_expr.split_off(self.group_expr.len());
let gapfill = Self::try_new(
Arc::new(inputs[0].clone()),
group_expr,
aggr_expr,
self.params.clone(),
)
.expect("should not fail");
let mut aggr_expr = group_expr.split_off(self.group_expr.len());
let param_expr = aggr_expr.split_off(self.aggr_expr.len());
let params = self.params.from_template(&param_expr);
let gapfill = Self::try_new(Arc::new(inputs[0].clone()), group_expr, aggr_expr, params)
.expect("should not fail");
Arc::new(gapfill)
}
}
@ -162,9 +209,17 @@ pub(crate) fn plan_gap_fill(
})
})?;
let origin = create_physical_expr(
&gap_fill.params.origin,
input_dfschema,
input_schema,
execution_props,
)?;
let params = GapFillExecParams {
stride,
time_column,
origin,
time_range,
};
GapFillExec::try_new(
@ -175,9 +230,9 @@ pub(crate) fn plan_gap_fill(
)
}
fn try_map_range<T, U, F>(tr: &Range<T>, f: F) -> Result<Range<U>>
fn try_map_range<T, U, F>(tr: &Range<T>, mut f: F) -> Result<Range<U>>
where
F: Fn(&T) -> Result<U>,
F: FnMut(&T) -> Result<U>,
{
Ok(Range {
start: f(&tr.start)?,
@ -185,9 +240,9 @@ where
})
}
fn try_map_bound<T, U, F>(bt: Bound<T>, f: F) -> Result<Bound<U>>
fn try_map_bound<T, U, F>(bt: Bound<T>, mut f: F) -> Result<Bound<U>>
where
F: FnOnce(T) -> Result<U>,
F: FnMut(T) -> Result<U>,
{
Ok(match bt {
Bound::Excluded(t) => Bound::Excluded(f(t)?),
@ -196,6 +251,12 @@ where
})
}
fn bound_extract<T>(b: &Bound<T>) -> Option<&T> {
match b {
Bound::Included(t) | Bound::Excluded(t) => Some(t),
Bound::Unbounded => None,
}
}
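// The three helpers above compose when converting the logical `time_range`
// bounds into physical expressions. A toy illustration with plain integers
// (not from the diff) showing how they fit together:
//
// let input: Range<Bound<i64>> = Bound::Included(1_000)..Bound::Excluded(2_000);
// let shifted = try_map_range(&input, |b| try_map_bound(b.as_ref(), |v| Ok(v + 1)))?;
// assert_eq!(shifted, Bound::Included(1_001)..Bound::Excluded(2_001));
// assert_eq!(bound_extract(&shifted.start), Some(&1_001));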
/// A physical node for the gap-fill operation.
pub struct GapFillExec {
input: Arc<dyn ExecutionPlan>,
@ -208,6 +269,8 @@ pub struct GapFillExec {
sort_expr: Vec<PhysicalSortExpr>,
// Parameters (besides streaming data) to gap filling
params: GapFillExecParams,
/// Metrics reporting behavior during execution.
metrics: ExecutionPlanMetricsSet,
}
#[derive(Clone, Debug)]
@ -216,7 +279,10 @@ struct GapFillExecParams {
stride: Arc<dyn PhysicalExpr>,
/// The timestamp column produced by date_bin
time_column: Column,
/// The time range of timestamps in the time column
/// The origin argument from the call to DATE_BIN_GAPFILL
origin: Arc<dyn PhysicalExpr>,
/// The time range of source input to DATE_BIN_GAPFILL.
/// Inferred from predicates in the overall query.
time_range: Range<Bound<Arc<dyn PhysicalExpr>>>,
}
@ -242,11 +308,9 @@ impl GapFillExec {
.iter()
.enumerate()
.find(|(_i, e)| {
if let Some(col) = e.as_any().downcast_ref::<Column>() {
col.index() == params.time_column.index()
} else {
false
}
e.as_any()
.downcast_ref::<Column>()
.map_or(false, |c| c.index() == params.time_column.index())
})
.map(|(i, _)| i);
@ -268,6 +332,7 @@ impl GapFillExec {
aggr_expr,
sort_expr,
params,
metrics: ExecutionPlanMetricsSet::new(),
})
}
}
@ -333,14 +398,29 @@ impl ExecutionPlan for GapFillExec {
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if self.output_partitioning().partition_count() <= partition {
return Err(DataFusionError::Internal(format!(
"GapFillExec invalid partition {partition}"
)));
}
Err(DataFusionError::NotImplemented("gap filling".to_string()))
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let output_batch_size = context.session_config().batch_size();
let input_stream = self.input.execute(partition, context)?;
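// Run the gap-filling algorithm as a separate task. Output batches flow back
// over a bounded channel that is adapted into the returned record batch
// stream, and the task is watched so failures surface on that stream.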
let (tx, rx) = mpsc::channel(1);
let fut = algo::fill_gaps(
output_batch_size,
input_stream,
self.sort_expr.clone(),
self.aggr_expr.clone(),
self.params.clone(),
tx.clone(),
baseline_metrics,
);
let handle = WatchedTask::new(fut, vec![tx], "gapfill batches");
Ok(AdapterStream::adapt(self.schema(), rx, handle))
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -404,6 +484,36 @@ mod test {
logical_plan::table_scan(Some("temps"), &schema, None)?.build()
}
#[test]
fn test_from_template() -> Result<()> {
let scan = table_scan()?;
let gapfill = GapFill::try_new(
Arc::new(scan.clone()),
vec![col("loc"), col("time")],
vec![col("temp")],
GapFillParams {
stride: lit(ScalarValue::IntervalDayTime(Some(60_000))),
time_column: col("time"),
origin: lit_timestamp_nano(0),
time_range: Range {
start: Bound::Included(lit_timestamp_nano(1000)),
end: Bound::Excluded(lit_timestamp_nano(2000)),
},
},
)?;
let exprs = gapfill.expressions();
assert_eq!(8, exprs.len());
let gapfill_ft = gapfill.from_template(&exprs, &[scan]);
let gapfill_ft = gapfill_ft
.as_any()
.downcast_ref::<GapFill>()
.expect("should be a GapFill");
assert_eq!(gapfill.group_expr, gapfill_ft.group_expr);
assert_eq!(gapfill.aggr_expr, gapfill_ft.aggr_expr);
assert_eq!(gapfill.params, gapfill_ft.params);
Ok(())
}
#[test]
fn fmt_logical_plan() -> Result<()> {
// This test case does not make much sense but
@ -417,6 +527,7 @@ mod test {
GapFillParams {
stride: lit(ScalarValue::IntervalDayTime(Some(60_000))),
time_column: col("time"),
origin: lit_timestamp_nano(0),
time_range: Range {
start: Bound::Included(lit_timestamp_nano(1000)),
end: Bound::Excluded(lit_timestamp_nano(2000)),

View File

@ -0,0 +1,289 @@
use std::{ops::Bound, sync::Arc};
use arrow::{datatypes::IntervalDayTimeType, record_batch::RecordBatch};
use chrono::Duration;
use datafusion::{
error::DataFusionError,
error::Result,
physical_expr::{datetime_expressions::date_bin, PhysicalSortExpr},
physical_plan::{
metrics::BaselineMetrics, ColumnarValue, PhysicalExpr, SendableRecordBatchStream,
},
scalar::ScalarValue,
};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use super::{try_map_bound, try_map_range, GapFillExecParams};
/// Fill in the gaps in a stream of records that represent
/// one or more time series.
///
/// # Arguments
///
/// * `output_batch_size` - The target number of rows per emitted record batch
/// * `input_stream` - The stream of input record batches to be gap filled
/// * `_sort_expr` - The incoming records will be sorted by these
/// expressions. They will all be simple column references,
/// with the last one being the timestamp value for each row.
/// The last column will already have been normalized by a previous
/// call to DATE_BIN.
/// * `_aggr_expr` - A set of column expressions that are the aggregate values
/// computed by an upstream Aggregate node.
/// * `params` - The parameters for gap filling, including the stride and the
/// start and end of the time range for this operation.
/// * `_tx` - The transmit end of the channel for output.
/// * `_baseline_metrics`
pub(super) async fn fill_gaps(
_output_batch_size: usize,
mut input_stream: SendableRecordBatchStream,
_sort_expr: Vec<PhysicalSortExpr>,
_aggr_expr: Vec<Arc<dyn PhysicalExpr>>,
params: GapFillExecParams,
_tx: mpsc::Sender<Result<RecordBatch>>,
_baseline_metrics: BaselineMetrics,
) -> Result<()> {
while let Some(batch) = input_stream.next().await {
let batch = batch?;
let _params = evaluate_params(&batch, &params);
}
Err(DataFusionError::NotImplemented("gap_filling".to_string()))
}
#[derive(Debug, PartialEq)]
struct GapFillParams {
#[allow(unused)]
pub stride: i64,
#[allow(unused)]
pub first_ts: i64,
#[allow(unused)]
pub last_ts: i64,
}
/// Figure out the actual values (as native `i64`s) for the stride and for the
/// first and last timestamps used in gap filling.
fn evaluate_params(
batch: &RecordBatch,
params: &super::GapFillExecParams,
) -> Result<GapFillParams> {
let stride = params.stride.evaluate(batch)?;
let origin = params.origin.evaluate(batch)?;
// Evaluate the upper and lower bounds of the time range
let range = try_map_range(&params.time_range, |b| {
try_map_bound(b.as_ref(), |pe| {
extract_timestamp_nanos(&pe.evaluate(batch)?)
})
})?;
// Find the smallest timestamp that might appear in the
// range
let first_ts = match range.start {
Bound::Included(v) => v,
Bound::Excluded(v) => v + 1,
Bound::Unbounded => {
return Err(DataFusionError::Execution(
"missing lower time bound for gap filling".to_string(),
))
}
};
// Find the largest timestamp that might appear in the
// range
let last_ts = match range.end {
Bound::Included(v) => v,
Bound::Excluded(v) => v - 1,
Bound::Unbounded => {
return Err(DataFusionError::Execution(
"missing upper time bound for gap filling".to_string(),
))
}
};
// Call date_bin on the timestamps to find the first and last time bins
// for each series
let mut args = vec![stride, i64_to_columnar_ts(first_ts), origin];
let first_ts = extract_timestamp_nanos(&date_bin(&args)?)?;
args[1] = i64_to_columnar_ts(last_ts);
let last_ts = extract_timestamp_nanos(&date_bin(&args)?)?;
Ok(GapFillParams {
stride: extract_interval_nanos(&args[0])?,
first_ts,
last_ts,
})
}
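For intuition, here is a minimal standalone sketch (not part of this change) of the truncation arithmetic that `date_bin` applies to the inclusive bounds, using the same values as the tests below; `date_bin_ns` is a hypothetical stand-in for the real kernel:

fn date_bin_ns(stride: i64, ts: i64, origin: i64) -> i64 {
    // Truncate `ts` down to the start of its bin, relative to `origin`.
    origin + (ts - origin).div_euclid(stride) * stride
}

fn main() {
    let stride = 60_000_000_000; // 1 minute
    let origin = 0; // the epoch
    // WHERE time >= '1984-01-01T15:55:00Z' AND time <= '1984-01-01T16:00:00Z'
    let first_ts = date_bin_ns(stride, 441_820_500_000_000_000, origin);
    let last_ts = date_bin_ns(stride, 441_820_800_000_000_000, origin);
    assert_eq!(first_ts, 441_820_500_000_000_000); // bin starting at 15:55:00
    assert_eq!(last_ts, 441_820_800_000_000_000); // bin starting at 16:00:00
}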
fn i64_to_columnar_ts(i: i64) -> ColumnarValue {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(i), None))
}
fn extract_timestamp_nanos(cv: &ColumnarValue) -> Result<i64> {
Ok(match cv {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
_ => {
return Err(DataFusionError::Execution(
"gap filling argument must be a scalar timestamp".to_string(),
))
}
})
}
fn extract_interval_nanos(cv: &ColumnarValue) -> Result<i64> {
match cv {
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
let (days, ms) = IntervalDayTimeType::to_parts(*v);
let nanos =
(Duration::days(days as i64) + Duration::milliseconds(ms as i64)).num_nanoseconds();
nanos.ok_or_else(|| {
DataFusionError::Execution("gap filling argument is too large".to_string())
})
}
_ => Err(DataFusionError::Execution(
"gap filling expects a stride parameter to be a scalar interval".to_string(),
)),
}
}
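As a sanity check (not part of this change, and assuming the arrow and chrono versions pinned in this workspace), the `IntervalDayTime` value `Some(60_000)` used by the planner tests decodes to zero days plus 60,000 ms, i.e. one minute:

use arrow::datatypes::IntervalDayTimeType;
use chrono::Duration;

fn main() {
    let (days, ms) = IntervalDayTimeType::to_parts(60_000);
    assert_eq!((days, ms), (0, 60_000));
    let nanos = (Duration::days(days as i64) + Duration::milliseconds(ms as i64))
        .num_nanoseconds()
        .unwrap();
    assert_eq!(nanos, 60_000_000_000); // one minute in nanoseconds
}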
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::{
array::{ArrayRef, Float64Array, StringArray, TimestampNanosecondArray},
datatypes::{DataType, Field, Schema, TimeUnit},
error::Result as ArrowResult,
record_batch::RecordBatch,
};
use datafusion::{
datasource::empty::EmptyTable, error::Result, from_slice::FromSlice, sql::TableReference,
};
use crate::exec::{gapfill::GapFillExec, Executor, ExecutorType};
use super::GapFillParams;
fn schema() -> Schema {
Schema::new(vec![
Field::new(
"time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new(
"other_time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("loc", DataType::Utf8, false),
Field::new("temp", DataType::Float64, false),
])
}
fn record_batch() -> ArrowResult<RecordBatch> {
let columns: Vec<ArrayRef> = vec![
Arc::new(TimestampNanosecondArray::from_slice([1000])),
Arc::new(TimestampNanosecondArray::from_slice([2000])),
Arc::new(StringArray::from_slice(["kitchen"])),
Arc::new(Float64Array::from_slice([27.1])),
];
RecordBatch::try_new(Arc::new(schema()), columns)
}
async fn plan_statement_and_get_params(sql: &str) -> Result<GapFillParams> {
let executor = Executor::new_testing();
let context = executor.new_context(ExecutorType::Query);
context.inner().register_table(
TableReference::Bare { table: "t" },
Arc::new(EmptyTable::new(Arc::new(schema()))),
)?;
let physical_plan = context.prepare_sql(sql).await?;
let gapfill_node = &physical_plan.children()[0];
let gapfill_node = gapfill_node.as_any().downcast_ref::<GapFillExec>().unwrap();
let exec_params = &gapfill_node.params;
super::evaluate_params(&record_batch()?, exec_params)
}
#[tokio::test]
async fn test_evaluate_params() -> Result<()> {
test_helpers::maybe_start_logging();
let actual = plan_statement_and_get_params(
"select\
\n date_bin_gapfill(interval '1 minute', time, timestamp '1970-01-01T00:00:00Z') minute\
\nfrom t\
\nwhere time >= timestamp '1984-01-01T16:00:00Z' - interval '5 minutes'\
\n and time <= timestamp '1984-01-01T16:00:00Z'\
\ngroup by minute",
).await?;
let expected = GapFillParams {
stride: 60_000_000_000, // 1 minute
first_ts: 441_820_500_000_000_000, // Sunday, January 1, 1984 3:55:00 PM
last_ts: 441_820_800_000_000_000, // Sunday, January 1, 1984 4:00:00 PM
};
assert_eq!(expected, actual);
Ok(())
}
#[tokio::test]
async fn test_evaluate_params_exclude_end() -> Result<()> {
test_helpers::maybe_start_logging();
let actual = plan_statement_and_get_params(
"select\
\n date_bin_gapfill(interval '1 minute', time, timestamp '1970-01-01T00:00:00Z') minute\
\nfrom t\
\nwhere time >= timestamp '1984-01-01T16:00:00Z' - interval '5 minutes'\
\n and time < timestamp '1984-01-01T16:00:00Z'\
\ngroup by minute",
).await?;
let expected = GapFillParams {
stride: 60_000_000_000, // 1 minute
first_ts: 441_820_500_000_000_000, // Sunday, January 1, 1984 3:55:00 PM
// Last bin at 16:00 is excluded
last_ts: 441_820_740_000_000_000, // Sunday, January 1, 1984 3:59:00 PM
};
assert_eq!(expected, actual);
Ok(())
}
#[tokio::test]
async fn test_evaluate_params_exclude_start() -> Result<()> {
test_helpers::maybe_start_logging();
let actual = plan_statement_and_get_params(
"select\
\n date_bin_gapfill(interval '1 minute', time, timestamp '1970-01-01T00:00:00Z') minute\
\nfrom t\
\nwhere time > timestamp '1984-01-01T16:00:00Z' - interval '5 minutes'\
\n and time <= timestamp '1984-01-01T16:00:00Z'\
\ngroup by minute",
).await?;
let expected = GapFillParams {
stride: 60_000_000_000, // 1 minute
// First bin is not excluded since the first included timestamp truncates to 15:55:00
first_ts: 441_820_500_000_000_000, // Sunday, January 1, 1984 3:55:00 PM
last_ts: 441_820_800_000_000_000, // Sunday, January 1, 1984 4:00:00 PM
};
assert_eq!(expected, actual);
Ok(())
}
#[tokio::test]
async fn test_evaluate_params_origin() -> Result<()> {
test_helpers::maybe_start_logging();
let actual = plan_statement_and_get_params(
// origin is 9s after the epoch
"select\
\n date_bin_gapfill(interval '1 minute', time, timestamp '1970-01-01T00:00:09Z') minute\
\nfrom t\
\nwhere time >= timestamp '1984-01-01T16:00:00Z' - interval '5 minutes'\
\n and time <= timestamp '1984-01-01T16:00:00Z'\
\ngroup by minute",
).await?;
let expected = GapFillParams {
stride: 60_000_000_000, // 1 minute
first_ts: 441_820_449_000_000_000, // Sunday, January 1, 1984 3:54:09 PM
last_ts: 441_820_749_000_000_000, // Sunday, January 1, 1984 3:59:09 PM
};
assert_eq!(expected, actual);
Ok(())
}
}

View File

@ -9,6 +9,7 @@ use datafusion::{
logical_expr::{
expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion},
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
utils::expr_to_columns,
Aggregate, BuiltinScalarFunction, Extension, LogicalPlan,
},
optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule},
@ -16,6 +17,7 @@ use datafusion::{
};
use query_functions::gapfill::DATE_BIN_GAPFILL_UDF_NAME;
use std::{
collections::HashSet,
ops::{Bound, Range},
sync::Arc,
};
@ -112,24 +114,6 @@ fn handle_aggregate(aggr: &Aggregate) -> Result<Option<LogicalPlan>> {
return Ok(None);
};
if date_bin_gapfill_args.len() != 3 {
return Err(DataFusionError::Plan(format!(
"DATE_BIN_GAPFILL expects 3 arguments, got {}",
date_bin_gapfill_args.len()
)));
}
let time_col = match &date_bin_gapfill_args[1] {
Expr::Column(c) => c,
_ => {
return Err(DataFusionError::Plan(
"DATE_BIN_GAPFILL requires a column as the source argument".to_string(),
))
}
};
let time_range = range_predicate::find_time_range(input, time_col)?;
validate_time_range(&time_range)?;
let new_aggr_plan = {
// Create the aggregate node with the same output schema as the original
// one. This means that there will be an output column called `date_bin_gapfill(...)`
@ -146,49 +130,105 @@ fn handle_aggregate(aggr: &Aggregate) -> Result<Option<LogicalPlan>> {
new_aggr_plan
};
let new_gap_fill_plan = {
let mut new_group_expr: Vec<_> = new_aggr_plan
.schema()
.fields()
.iter()
.map(|f| Expr::Column(f.qualified_column()))
.collect();
let aggr_expr = new_group_expr.split_off(group_expr.len());
let time_column =
col(new_aggr_plan.schema().fields()[date_bin_gapfill_index].qualified_column());
let stride = date_bin_gapfill_args
.into_iter()
.next()
.expect("there are three args");
LogicalPlan::Extension(Extension {
node: Arc::new(GapFill::try_new(
Arc::new(new_aggr_plan),
new_group_expr,
aggr_expr,
GapFillParams {
stride,
time_column,
time_range,
},
)?),
})
};
let new_gap_fill_plan =
build_gapfill_node(new_aggr_plan, date_bin_gapfill_index, date_bin_gapfill_args)?;
Ok(Some(new_gap_fill_plan))
}
fn build_gapfill_node(
new_aggr_plan: LogicalPlan,
date_bin_gapfill_index: usize,
date_bin_gapfill_args: Vec<Expr>,
) -> Result<LogicalPlan> {
if date_bin_gapfill_args.len() != 3 {
return Err(DataFusionError::Plan(format!(
"DATE_BIN_GAPFILL expects 3 arguments, got {}",
date_bin_gapfill_args.len()
)));
}
let mut args_iter = date_bin_gapfill_args.into_iter();
// Ensure that stride argument is a scalar
let stride = args_iter.next().unwrap();
validate_scalar_expr("stride argument to DATE_BIN_GAPFILL", &stride)?;
// Ensure that the source argument is a column
let time_col = args_iter.next().unwrap().try_into_col().map_err(|_| {
DataFusionError::Plan(
"DATE_BIN_GAPFILL requires a column as the source argument".to_string(),
)
})?;
// Ensure that a time range was specified and is valid for gap filling
let time_range = range_predicate::find_time_range(new_aggr_plan.inputs()[0], &time_col)?;
validate_time_range(&time_range)?;
// Ensure that origin argument is a scalar
let origin = args_iter.next().unwrap();
validate_scalar_expr("origin argument to DATE_BIN_GAPFILL", &origin)?;
// Make sure the time column output by the gapfill node matches what the
// aggregate node produces.
let time_column =
col(new_aggr_plan.schema().fields()[date_bin_gapfill_index].qualified_column());
let aggr = Aggregate::try_from_plan(&new_aggr_plan)?;
let mut new_group_expr: Vec<_> = aggr
.schema
.fields()
.iter()
.map(|f| Expr::Column(f.qualified_column()))
.collect();
let aggr_expr = new_group_expr.split_off(aggr.group_expr.len());
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(GapFill::try_new(
Arc::new(new_aggr_plan),
new_group_expr,
aggr_expr,
GapFillParams {
stride,
time_column,
origin,
time_range,
},
)?),
}))
}
fn validate_time_range(range: &Range<Bound<Expr>>) -> Result<()> {
let Range { ref start, ref end } = range;
match (start, end) {
(Bound::Unbounded, Bound::Unbounded) => Err(DataFusionError::Plan(
"no time bounds found for gap fill query".to_string(),
)),
let (start, end) = match (start, end) {
(Bound::Unbounded, Bound::Unbounded) => {
return Err(DataFusionError::Plan(
"no time bounds found for gap fill query".to_string(),
))
}
(Bound::Unbounded, _) => Err(DataFusionError::Plan(
"no lower time bound found for gap fill query".to_string(),
)),
(_, Bound::Unbounded) => Err(DataFusionError::Plan(
"no upper time bound found for gap fill query".to_string(),
)),
_ => Ok(()),
(
Bound::Included(start) | Bound::Excluded(start),
Bound::Included(end) | Bound::Excluded(end),
) => Ok((start, end)),
}?;
validate_scalar_expr("lower time bound", start)?;
validate_scalar_expr("upper time bound", end)
}
fn validate_scalar_expr(what: &str, e: &Expr) -> Result<()> {
let mut cols = HashSet::new();
expr_to_columns(e, &mut cols)?;
if !cols.is_empty() {
Err(DataFusionError::Plan(format!(
"{what} for gap fill query must evaluate to a scalar"
)))
} else {
Ok(())
}
}
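For illustration (not part of this change, and assuming this helper and DataFusion's expression builders are in scope), a constant expression passes validation while anything referencing a column is rejected:

use datafusion::prelude::{col, lit};

fn main() {
    // A constant arithmetic expression contains no column references, so it is a valid stride.
    assert!(validate_scalar_expr("stride argument to DATE_BIN_GAPFILL", &(lit(30) + lit(30))).is_ok());
    // A column reference produces a planning error.
    assert!(validate_scalar_expr("stride argument to DATE_BIN_GAPFILL", &col("loc")).is_err());
}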
@ -323,7 +363,7 @@ mod test {
use datafusion::logical_expr::{logical_plan, LogicalPlan, LogicalPlanBuilder};
use datafusion::optimizer::optimizer::Optimizer;
use datafusion::optimizer::OptimizerContext;
use datafusion::prelude::{avg, col, lit, lit_timestamp_nano, Expr};
use datafusion::prelude::{avg, case, col, lit, lit_timestamp_nano, Expr};
use datafusion::scalar::ScalarValue;
use query_functions::gapfill::DATE_BIN_GAPFILL_UDF_NAME;
@ -334,6 +374,11 @@ mod test {
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new(
"time2",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("loc", DataType::Utf8, false),
Field::new("temp", DataType::Float64, false),
]);
@ -341,9 +386,13 @@ mod test {
}
fn date_bin_gapfill(interval: Expr, time: Expr) -> Result<Expr> {
date_bin_gapfill_with_origin(interval, time, lit_timestamp_nano(0))
}
fn date_bin_gapfill_with_origin(interval: Expr, time: Expr, origin: Expr) -> Result<Expr> {
Ok(Expr::ScalarUDF {
fun: query_functions::registry().udf(DATE_BIN_GAPFILL_UDF_NAME)?,
args: vec![interval, time, lit_timestamp_nano(0)],
args: vec![interval, time, origin],
})
}
@ -417,7 +466,59 @@ mod test {
}
#[test]
fn no_time_range_err() -> Result<()> {
fn nonscalar_origin() -> Result<()> {
let plan = LogicalPlanBuilder::from(table_scan()?)
.filter(
col("time")
.gt_eq(lit_timestamp_nano(1000))
.and(col("time").lt(lit_timestamp_nano(2000))),
)?
.aggregate(
vec![date_bin_gapfill_with_origin(
lit(ScalarValue::IntervalDayTime(Some(60_000))),
col("time"),
col("time2"),
)?],
vec![avg(col("temp"))],
)?
.build()?;
assert_optimizer_err(
&plan,
"Error during planning: origin argument to DATE_BIN_GAPFILL for gap fill query must evaluate to a scalar",
);
Ok(())
}
#[test]
fn nonscalar_stride() -> Result<()> {
let stride = case(col("loc"))
.when(
lit("kitchen"),
lit(ScalarValue::IntervalDayTime(Some(60_000))),
)
.otherwise(lit(ScalarValue::IntervalDayTime(Some(30_000))))
.unwrap();
let plan = LogicalPlanBuilder::from(table_scan()?)
.filter(
col("time")
.gt_eq(lit_timestamp_nano(1000))
.and(col("time").lt(lit_timestamp_nano(2000))),
)?
.aggregate(
vec![date_bin_gapfill(stride, col("time"))?],
vec![avg(col("temp"))],
)?
.build()?;
assert_optimizer_err(
&plan,
"Error during planning: stride argument to DATE_BIN_GAPFILL for gap fill query must evaluate to a scalar",
);
Ok(())
}
#[test]
fn time_range_errs() -> Result<()> {
let cases = vec![
(
lit(true),
@ -431,6 +532,16 @@ mod test {
col("time").lt(lit_timestamp_nano(2000)),
"Error during planning: no lower time bound found for gap fill query",
),
(
col("time").gt_eq(col("time2")).and(
col("time").lt(lit_timestamp_nano(2000))),
"Error during planning: lower time bound for gap fill query must evaluate to a scalar",
),
(
col("time").gt_eq(lit_timestamp_nano(2000)).and(
col("time").lt(col("time2"))),
"Error during planning: upper time bound for gap fill query must evaluate to a scalar",
)
];
for c in cases {
let plan = LogicalPlanBuilder::from(table_scan()?)

View File

@ -3,7 +3,6 @@ mod queries;
use crate::snapshot_comparison::queries::TestQueries;
use crate::{run_influxql, run_sql, MiniCluster};
use arrow_util::{display::pretty_format_batches, test_util::sort_record_batch};
use snafu::{OptionExt, ResultExt, Snafu};
use std::fmt::{Display, Formatter};
use std::{
@ -11,7 +10,6 @@ use std::{
path::{Path, PathBuf},
};
use self::normalization::normalize_results;
use self::queries::Query;
#[derive(Debug, Snafu)]
@ -98,19 +96,7 @@ pub async fn run(
for q in queries.iter() {
output.push(format!("-- {}: {}", language, q.text()));
if q.sorted_compare() {
output.push("-- Results After Sorting".into())
}
if q.normalized_uuids() {
output.push("-- Results After Normalizing UUIDs".into())
}
if q.normalized_metrics() {
output.push("-- Results After Normalizing Metrics".into())
}
if q.normalized_filters() {
output.push("-- Results After Normalizing Filters".into())
}
q.add_description(&mut output);
let results = run_query(cluster, q, language).await?;
output.extend(results);
}
@ -233,7 +219,7 @@ async fn run_query(
) -> Result<Vec<String>> {
let query_text = query.text();
let mut results = match language {
let results = match language {
Language::Sql => {
run_sql(
query_text,
@ -252,22 +238,5 @@ async fn run_query(
}
};
// compare against sorted results, if requested
if query.sorted_compare() && !results.is_empty() {
let schema = results[0].schema();
let batch =
arrow::compute::concat_batches(&schema, &results).expect("concatenating batches");
results = vec![sort_record_batch(batch)];
}
let current_results = pretty_format_batches(&results)
.unwrap()
.trim()
.lines()
.map(|s| s.to_string())
.collect::<Vec<_>>();
let current_results = normalize_results(query, current_results);
Ok(current_results)
Ok(query.normalize_results(results))
}

View File

@ -1,9 +1,28 @@
use crate::snapshot_comparison::queries::Query;
use arrow::record_batch::RecordBatch;
use arrow_util::{display::pretty_format_batches, test_util::sort_record_batch};
use once_cell::sync::Lazy;
use regex::{Captures, Regex};
use std::{borrow::Cow, collections::HashMap};
use uuid::Uuid;
/// Match the parquet UUID
///
/// For example, given
/// `32/51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet`
///
/// matches `1d325760-2b20-48de-ab48-2267b034133d`
static REGEX_UUID: Lazy<Regex> = Lazy::new(|| {
Regex::new("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}").expect("UUID regex")
});
/// Match the parquet directory names
/// For example, given
/// `32/51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet`
///
/// matches `32/51/216/13452`
static REGEX_DIRS: Lazy<Regex> =
Lazy::new(|| Regex::new(r#"[0-9]+/[0-9]+/[0-9]+/[0-9]+"#).expect("directory regex"));
/// Replace table row separators of flexible width with a fixed width. This is required
/// because the original timing values may differ in "printed width", so the table
/// cells have different widths and hence the separators / borders. E.g.:
@ -22,93 +41,159 @@ static REGEX_LINESEP: Lazy<Regex> = Lazy::new(|| Regex::new(r#"[+-]{6,}"#).expec
/// ` |` -> ` |`
static REGEX_COL: Lazy<Regex> = Lazy::new(|| Regex::new(r#"\s+\|"#).expect("col regex"));
/// Matches line like `metrics=[foo=1, bar=2]`
static REGEX_METRICS: Lazy<Regex> =
Lazy::new(|| Regex::new(r#"metrics=\[([^\]]*)\]"#).expect("metrics regex"));
/// Matches things like `1s`, `1.2ms` and `10.2μs`
static REGEX_TIMING: Lazy<Regex> =
Lazy::new(|| Regex::new(r#"[0-9]+(\.[0-9]+)?.s"#).expect("timing regex"));
/// Matches things like `FilterExec: time@2 < -9223372036854775808 OR time@2 > 1640995204240217000`
static REGEX_FILTER: Lazy<Regex> =
Lazy::new(|| Regex::new("FilterExec: .*").expect("filter regex"));
fn normalize_for_variable_width(s: Cow<str>) -> String {
let s = REGEX_LINESEP.replace_all(&s, "----------");
REGEX_COL.replace_all(&s, " |").to_string()
}
pub(crate) fn normalize_results(query: &Query, mut current_results: Vec<String>) -> Vec<String> {
// normalize UUIDs, if requested
if query.normalized_uuids() {
let regex_uuid = Regex::new("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
.expect("UUID regex");
let regex_dirs = Regex::new(r#"[0-9]+/[0-9]+/[0-9]+/[0-9]+"#).expect("directory regex");
/// Describes how the output of a query should be normalized before comparison
#[derive(Debug, PartialEq, Eq, Default)]
pub struct Normalizer {
/// If true, results are sorted first
pub sorted_compare: bool,
let mut seen: HashMap<String, u128> = HashMap::new();
current_results = current_results
.into_iter()
.map(|s| {
let s = regex_uuid.replace_all(&s, |s: &Captures| {
let next = seen.len() as u128;
Uuid::from_u128(
*seen
.entry(s.get(0).unwrap().as_str().to_owned())
.or_insert(next),
)
.to_string()
});
/// If true, replace UUIDs with static placeholders.
pub normalized_uuids: bool,
let s = normalize_for_variable_width(s);
/// If true, normalize timings in the query output by replacing them with
/// static placeholders, for example:
///
/// `1s` -> `1.234ms`
pub normalized_metrics: bool,
regex_dirs.replace_all(&s, "1/1/1/1").to_string()
})
.collect();
}
// normalize metrics, if requested
if query.normalized_metrics() {
// Parse regex once and apply to all rows. See description around the `replace...` calls on
// why/how the regexes are used.
let regex_metrics = Regex::new(r#"metrics=\[([^\]]*)\]"#).expect("metrics regex");
let regex_timing = Regex::new(r#"[0-9]+(\.[0-9]+)?.s"#).expect("timing regex");
current_results = current_results
.into_iter()
.map(|s| {
// Replace timings with fixed value, e.g.:
//
// `1s` -> `1.234ms`
// `1.2ms` -> `1.234ms`
// `10.2μs` -> `1.234ms`
let s = regex_timing.replace_all(&s, "1.234ms");
let s = normalize_for_variable_width(s);
// Metrics are currently ordered by value (not by key), so different timings may
// reorder them. We "parse" the list and normalize the sorting. E.g.:
//
// `metrics=[]` => `metrics=[]`
// `metrics=[foo=1, bar=2]` => `metrics=[bar=2, foo=1]`
// `metrics=[foo=2, bar=1]` => `metrics=[bar=1, foo=2]`
regex_metrics
.replace_all(&s, |c: &Captures| {
let mut metrics: Vec<_> = c[1].split(", ").collect();
metrics.sort();
format!("metrics=[{}]", metrics.join(", "))
})
.to_string()
})
.collect();
}
// normalize Filters, if requested
//
// Converts:
// FilterExec: time@2 < -9223372036854775808 OR time@2 > 1640995204240217000
//
// to
// FilterExec: <REDACTED>
if query.normalized_filters() {
let filter_regex = Regex::new("FilterExec: .*").expect("filter regex");
current_results = current_results
.into_iter()
.map(|s| {
filter_regex
.replace_all(&s, |_: &Captures| "FilterExec: <REDACTED>")
.to_string()
})
.collect();
}
current_results
/// If true, normalize filter predicates in explain plans to
/// `FilterExec: <REDACTED>`
pub normalized_filters: bool,
}
impl Normalizer {
#[cfg(test)]
pub fn new() -> Self {
Default::default()
}
/// Take the results of running the query and apply the specified normalizations to them
pub fn normalize_results(&self, mut results: Vec<RecordBatch>) -> Vec<String> {
// compare against sorted results, if requested
if self.sorted_compare && !results.is_empty() {
let schema = results[0].schema();
let batch =
arrow::compute::concat_batches(&schema, &results).expect("concatenating batches");
results = vec![sort_record_batch(batch)];
}
let mut current_results = pretty_format_batches(&results)
.unwrap()
.trim()
.lines()
.map(|s| s.to_string())
.collect::<Vec<_>>();
// normalize UUIDs, if requested
if self.normalized_uuids {
let mut seen: HashMap<String, u128> = HashMap::new();
current_results = current_results
.into_iter()
.map(|s| {
// Rewrite parquet directory names like
// `51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet`
//
// to:
// 1/1/1/1/00000000-0000-0000-0000-000000000000.parquet
let s = REGEX_UUID.replace_all(&s, |s: &Captures| {
let next = seen.len() as u128;
Uuid::from_u128(
*seen
.entry(s.get(0).unwrap().as_str().to_owned())
.or_insert(next),
)
.to_string()
});
let s = normalize_for_variable_width(s);
REGEX_DIRS.replace_all(&s, "1/1/1/1").to_string()
})
.collect();
}
// normalize metrics, if requested
if self.normalized_metrics {
current_results = current_results
.into_iter()
.map(|s| {
// Replace timings with fixed value, e.g.:
//
// `1s` -> `1.234ms`
// `1.2ms` -> `1.234ms`
// `10.2μs` -> `1.234ms`
let s = REGEX_TIMING.replace_all(&s, "1.234ms");
let s = normalize_for_variable_width(s);
// Metrics are currently ordered by value (not by key), so different timings may
// reorder them. We "parse" the list and normalize the sorting. E.g.:
//
// `metrics=[]` => `metrics=[]`
// `metrics=[foo=1, bar=2]` => `metrics=[bar=2, foo=1]`
// `metrics=[foo=2, bar=1]` => `metrics=[bar=1, foo=2]`
REGEX_METRICS
.replace_all(&s, |c: &Captures| {
let mut metrics: Vec<_> = c[1].split(", ").collect();
metrics.sort();
format!("metrics=[{}]", metrics.join(", "))
})
.to_string()
})
.collect();
}
// normalize Filters, if requested
//
// Converts:
// FilterExec: time@2 < -9223372036854775808 OR time@2 > 1640995204240217000
//
// to
// FilterExec: <REDACTED>
if self.normalized_filters {
current_results = current_results
.into_iter()
.map(|s| {
REGEX_FILTER
.replace_all(&s, |_: &Captures| "FilterExec: <REDACTED>")
.to_string()
})
.collect();
}
current_results
}
/// Adds lines to `output` describing which normalizations were applied to the results
pub fn add_description(&self, output: &mut Vec<String>) {
if self.sorted_compare {
output.push("-- Results After Sorting".into())
}
if self.normalized_uuids {
output.push("-- Results After Normalizing UUIDs".into())
}
if self.normalized_metrics {
output.push("-- Results After Normalizing Metrics".into())
}
if self.normalized_filters {
output.push("-- Results After Normalizing Filters".into())
}
}
}
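A hypothetical usage sketch (not part of this change, assuming `Normalizer` is in scope); the single-column batch and its values are made up for illustration:

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![3, 1, 2]))];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    let normalizer = Normalizer {
        sorted_compare: true,
        ..Default::default()
    };
    // The batches are concatenated, sorted, pretty-printed, and returned line by line.
    for line in normalizer.normalize_results(vec![batch]) {
        println!("{line}");
    }
}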

View File

@ -1,22 +1,12 @@
use arrow::record_batch::RecordBatch;
use super::normalization::Normalizer;
/// A query to run with optional annotations
#[derive(Debug, PartialEq, Eq, Default)]
pub struct Query {
/// If true, results are sorted first prior to comparison, meaning that differences in the
/// output order compared with expected order do not cause a diff
sorted_compare: bool,
/// If true, replace UUIDs with static placeholders.
normalized_uuids: bool,
/// If true, normalize timings in queries by replacing them with
/// static placeholders, for example:
///
/// `1s` -> `1.234ms`
normalized_metrics: bool,
/// if true, normalize filter predicates for explain plans
/// `FilterExec: <REDACTED>`
normalized_filters: bool,
/// Describes how the query results should be normalized
normalizer: Normalizer,
/// The query string
text: String,
@ -27,49 +17,49 @@ impl Query {
fn new(text: impl Into<String>) -> Self {
let text = text.into();
Self {
sorted_compare: false,
normalized_uuids: false,
normalized_metrics: false,
normalized_filters: false,
normalizer: Normalizer::new(),
text,
}
}
#[cfg(test)]
fn with_sorted_compare(mut self) -> Self {
self.sorted_compare = true;
pub fn text(&self) -> &str {
&self.text
}
pub fn with_sorted_compare(mut self) -> Self {
self.normalizer.sorted_compare = true;
self
}
/// Get a reference to the query text.
pub fn text(&self) -> &str {
self.text.as_ref()
pub fn with_normalized_uuids(mut self) -> Self {
self.normalizer.normalized_uuids = true;
self
}
/// Get the query's sorted compare.
pub fn sorted_compare(&self) -> bool {
self.sorted_compare
pub fn with_normalize_metrics(mut self) -> Self {
self.normalizer.normalized_metrics = true;
self
}
/// Get queries normalized UUID
pub fn normalized_uuids(&self) -> bool {
self.normalized_uuids
pub fn with_normalize_filters(mut self) -> Self {
self.normalizer.normalized_filters = true;
self
}
/// Use normalized timing values
pub fn normalized_metrics(&self) -> bool {
self.normalized_metrics
/// Take the results of running the query and apply the specified normalizations to them
pub fn normalize_results(&self, results: Vec<RecordBatch>) -> Vec<String> {
self.normalizer.normalize_results(results)
}
/// Use normalized filter plans
pub fn normalized_filters(&self) -> bool {
self.normalized_filters
/// Adds lines to `output` describing which normalizations were applied to the results
pub fn add_description(&self, output: &mut Vec<String>) {
self.normalizer.add_description(output)
}
}
#[derive(Debug, Default)]
struct QueryBuilder {
query: Query,
pub query: Query,
}
impl QueryBuilder {
@ -85,22 +75,6 @@ impl QueryBuilder {
self.query.text.push(c)
}
fn sorted_compare(&mut self) {
self.query.sorted_compare = true;
}
fn normalized_uuids(&mut self) {
self.query.normalized_uuids = true;
}
fn normalize_metrics(&mut self) {
self.query.normalized_metrics = true;
}
fn normalize_filters(&mut self) {
self.query.normalized_filters = true;
}
fn is_empty(&self) -> bool {
self.query.text.is_empty()
}
@ -125,54 +99,57 @@ impl TestQueries {
S: AsRef<str>,
{
let mut queries = vec![];
let mut builder = QueryBuilder::new();
lines.into_iter().for_each(|line| {
let line = line.as_ref().trim();
const COMPARE_STR: &str = "-- IOX_COMPARE: ";
if line.starts_with(COMPARE_STR) {
let (_, options) = line.split_at(COMPARE_STR.len());
for option in options.split(',') {
let option = option.trim();
match option {
"sorted" => {
builder.sorted_compare();
let mut builder = lines
.into_iter()
.fold(QueryBuilder::new(), |mut builder, line| {
let line = line.as_ref().trim();
const COMPARE_STR: &str = "-- IOX_COMPARE: ";
if line.starts_with(COMPARE_STR) {
let (_, options) = line.split_at(COMPARE_STR.len());
for option in options.split(',') {
let option = option.trim();
match option {
"sorted" => {
builder.query = builder.query.with_sorted_compare();
}
"uuid" => {
builder.query = builder.query.with_normalized_uuids();
}
"metrics" => {
builder.query = builder.query.with_normalize_metrics();
}
"filters" => {
builder.query = builder.query.with_normalize_filters();
}
_ => {}
}
"uuid" => {
builder.normalized_uuids();
}
"metrics" => {
builder.normalize_metrics();
}
"filters" => {
builder.normalize_filters();
}
_ => {}
}
}
}
if line.starts_with("--") {
return;
}
if line.is_empty() {
return;
}
// replace newlines
if !builder.is_empty() {
builder.push(' ');
}
builder.push_str(line);
// declare queries when we see a semicolon at the end of the line
if line.ends_with(';') {
if let Some(q) = builder.build_and_reset() {
queries.push(q);
if line.starts_with("--") {
return builder;
}
if line.is_empty() {
return builder;
}
}
});
// replace newlines
if !builder.is_empty() {
builder.push(' ');
}
builder.push_str(line);
// declare queries when we see a semicolon at the end of the line
if line.ends_with(';') {
if let Some(q) = builder.build_and_reset() {
queries.push(q);
}
}
builder
});
// get last one, if any
if let Some(q) = builder.build_and_reset() {
queries.push(q);
}
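For reference (not part of this change), a hypothetical snippet of the snapshot-test input this parser consumes: an `-- IOX_COMPARE:` line sets comma-separated options for the query that follows, other comment lines are skipped, and a trailing semicolon ends the query:

fn main() {
    // Hypothetical test-file contents; the table name is made up.
    let example = "\
-- IOX_COMPARE: sorted, uuid
SELECT * FROM cpu
ORDER BY time;";
    println!("{example}");
}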

View File

@ -37,6 +37,7 @@ flatbuffers = { version = "23", features = ["std"] }
flate2 = { version = "1", features = ["miniz_oxide", "rust_backend"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-core = { version = "0.3", features = ["alloc", "std"] }
futures-executor = { version = "0.3", features = ["std"] }
futures-io = { version = "0.3", features = ["std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
@ -74,8 +75,8 @@ serde_json = { version = "1", features = ["raw_value", "std"] }
sha2 = { version = "0.10", features = ["std"] }
similar = { version = "2", features = ["inline", "text"] }
smallvec = { version = "1", default-features = false, features = ["union"] }
sqlx = { version = "0.6", features = ["_rt-tokio", "json", "macros", "migrate", "postgres", "runtime-tokio-rustls", "sqlx-macros", "tls", "uuid"] }
sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "hkdf", "hmac", "json", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha1", "sha2", "tokio-stream", "uuid", "webpki-roots", "whoami"] }
sqlx = { version = "0.6", features = ["_rt-tokio", "json", "macros", "migrate", "postgres", "runtime-tokio-rustls", "sqlite", "sqlx-macros", "tls", "uuid"] }
sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "flume", "futures-executor", "hkdf", "hmac", "json", "libsqlite3-sys", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha1", "sha2", "sqlite", "tokio-stream", "uuid", "webpki-roots", "whoami"] }
thrift = { version = "0.17", features = ["log", "server", "threadpool"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "parking_lot", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }
@ -107,6 +108,7 @@ either = { version = "1", features = ["use_std"] }
fixedbitset = { version = "0.4", features = ["std"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-core = { version = "0.3", features = ["alloc", "std"] }
futures-executor = { version = "0.3", features = ["std"] }
futures-io = { version = "0.3", features = ["std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
@ -137,8 +139,8 @@ serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] }
serde_json = { version = "1", features = ["raw_value", "std"] }
sha2 = { version = "0.10", features = ["std"] }
smallvec = { version = "1", default-features = false, features = ["union"] }
sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "hkdf", "hmac", "json", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha1", "sha2", "tokio-stream", "uuid", "webpki-roots", "whoami"] }
sqlx-macros = { version = "0.6", default-features = false, features = ["_rt-tokio", "json", "migrate", "postgres", "runtime-tokio-rustls", "serde_json", "sha2", "uuid"] }
sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "flume", "futures-executor", "hkdf", "hmac", "json", "libsqlite3-sys", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha1", "sha2", "sqlite", "tokio-stream", "uuid", "webpki-roots", "whoami"] }
sqlx-macros = { version = "0.6", default-features = false, features = ["_rt-tokio", "json", "migrate", "postgres", "runtime-tokio-rustls", "serde_json", "sha2", "sqlite", "uuid"] }
syn = { version = "1", features = ["clone-impls", "derive", "extra-traits", "full", "parsing", "printing", "proc-macro", "quote", "visit", "visit-mut"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "parking_lot", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }