Merge branch 'main' into dom/gossip-parquet-crate

pull/24376/head
Dom 2023-08-25 13:07:07 +01:00 committed by GitHub
commit fa738cfec3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 788 additions and 576 deletions

Cargo.lock (generated, 39 changed lines)
View File

@@ -89,16 +89,15 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
 [[package]]
 name = "anstream"
-version = "0.3.2"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163"
+checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c"
 dependencies = [
  "anstyle",
  "anstyle-parse",
  "anstyle-query",
  "anstyle-wincon",
  "colorchoice",
- "is-terminal",
  "utf8parse",
 ]
@@ -128,9 +127,9 @@ dependencies = [
 [[package]]
 name = "anstyle-wincon"
-version = "1.0.2"
+version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c"
+checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd"
 dependencies = [
  "anstyle",
  "windows-sys 0.48.0",
@@ -869,9 +868,9 @@ dependencies = [
 [[package]]
 name = "clap"
-version = "4.3.24"
+version = "4.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fb690e81c7840c0d7aade59f242ea3b41b9bc27bcd5997890e7702ae4b32e487"
+checksum = "1d5f1946157a96594eb2d2c10eb7ad9a2b27518cb3000209dec700c35df9197d"
 dependencies = [
  "clap_builder",
  "clap_derive",
@@ -902,9 +901,9 @@ dependencies = [
 [[package]]
 name = "clap_builder"
-version = "4.3.24"
+version = "4.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ed2e96bc16d8d740f6f48d663eddf4b8a0983e79210fd55479b7bcd0a69860e"
+checksum = "78116e32a042dd73c2901f0dc30790d20ff3447f3e3472fad359e8c3d282bcd6"
 dependencies = [
  "anstream",
  "anstyle",
@@ -915,9 +914,9 @@ dependencies = [
 [[package]]
 name = "clap_derive"
-version = "4.3.12"
+version = "4.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050"
+checksum = "c9fd1a5729c4548118d7d70ff234a44868d00489a4b6597b0b020918a0e91a1a"
 dependencies = [
  "heck",
  "proc-macro2",
@@ -1390,7 +1389,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "29.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32#f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32"
 dependencies = [
  "ahash",
  "arrow",
@@ -1438,7 +1437,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "29.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32#f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32"
 dependencies = [
  "arrow",
  "arrow-array",
@@ -1452,7 +1451,7 @@ dependencies = [
 [[package]]
 name = "datafusion-execution"
 version = "29.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32#f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32"
 dependencies = [
  "arrow",
  "dashmap",
@@ -1471,7 +1470,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "29.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32#f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32"
 dependencies = [
  "ahash",
  "arrow",
@@ -1485,7 +1484,7 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "29.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32#f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32"
 dependencies = [
  "arrow",
  "async-trait",
@@ -1502,7 +1501,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "29.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32#f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32"
 dependencies = [
  "ahash",
  "arrow",
@@ -1536,7 +1535,7 @@ dependencies = [
 [[package]]
 name = "datafusion-proto"
 version = "29.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32#f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32"
 dependencies = [
  "arrow",
  "chrono",
@@ -1550,7 +1549,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "29.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32#f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32"
 dependencies = [
  "arrow",
  "arrow-schema",
@@ -6382,7 +6381,6 @@ name = "trogging"
 version = "0.1.0"
 dependencies = [
  "clap",
- "is-terminal",
  "logfmt",
  "observability_deps",
  "regex",
@@ -6977,7 +6975,6 @@ dependencies = [
  "regex-syntax 0.7.4",
  "reqwest",
  "ring",
- "rustix",
  "rustls",
  "scopeguard",
  "serde",

View File

@@ -123,8 +123,8 @@ license = "MIT OR Apache-2.0"
 [workspace.dependencies]
 arrow = { version = "45.0.0" }
 arrow-flight = { version = "45.0.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673", default-features = false }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32", default-features = false }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32" }
 hashbrown = { version = "0.14.0" }
 object_store = { version = "0.6.0" }

View File

@ -0,0 +1,46 @@
use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{Column, TableId};
use iox_catalog::interface::Catalog;
use super::ColumnsSource;
#[derive(Debug)]
pub struct CatalogColumnsSource {
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
}
impl CatalogColumnsSource {
pub fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
Self {
backoff_config,
catalog,
}
}
}
impl Display for CatalogColumnsSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "catalog")
}
}
#[async_trait]
impl ColumnsSource for CatalogColumnsSource {
async fn fetch(&self, table: TableId) -> Vec<Column> {
Backoff::new(&self.backoff_config)
.retry_all_errors("table_of_given_table_id", || async {
self.catalog
.repositories()
.await
.columns()
.list_by_table_id(table)
.await
})
.await
.expect("retry forever")
}
}
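For reference, a minimal sketch of how this catalog-backed source is constructed and queried; the `BackoffConfig::default()` and the table id are illustrative assumptions, not taken from this diff:

    use std::sync::Arc;

    use backoff::BackoffConfig;
    use data_types::TableId;
    use iox_catalog::interface::Catalog;

    // Hypothetical call site: fetch one table's columns, retrying catalog errors internally.
    async fn demo(catalog: Arc<dyn Catalog>) {
        let source = CatalogColumnsSource::new(BackoffConfig::default(), catalog);
        let columns = source.fetch(TableId::new(42)).await;
        println!("fetched {} columns via {}", columns.len(), source);
    }

The real wiring happens in `make_partition_info_source()` further down in this diff.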

View File

@ -0,0 +1,71 @@
use std::{collections::HashMap, fmt::Display};
use async_trait::async_trait;
use data_types::{Column, TableId};
use super::ColumnsSource;
#[derive(Debug)]
pub struct MockColumnsSource {
tables: HashMap<TableId, Vec<Column>>,
}
impl MockColumnsSource {
#[allow(dead_code)] // not used anywhere
pub fn new(tables: HashMap<TableId, Vec<Column>>) -> Self {
Self { tables }
}
}
impl Display for MockColumnsSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "mock")
}
}
#[async_trait]
impl ColumnsSource for MockColumnsSource {
async fn fetch(&self, table: TableId) -> Vec<Column> {
self.tables.get(&table).cloned().unwrap_or_default()
}
}
#[cfg(test)]
mod tests {
use data_types::ColumnType;
use iox_tests::{ColumnBuilder, TableBuilder};
use super::*;
#[test]
fn test_display() {
assert_eq!(
MockColumnsSource::new(HashMap::default()).to_string(),
"mock",
)
}
#[tokio::test]
async fn test_fetch() {
// // t_1 has one column and t_2 has no column
let t1 = TableBuilder::new(1).with_name("table1").build();
let t1_c1 = ColumnBuilder::new(1, t1.id.get())
.with_name("time")
.with_column_type(ColumnType::Time)
.build();
let t2 = TableBuilder::new(2).with_name("table2").build();
let tables = HashMap::from([(t1.id, vec![t1_c1.clone()]), (t2.id, vec![])]);
let source = MockColumnsSource::new(tables);
// different tables
assert_eq!(source.fetch(t1.id).await, vec![t1_c1.clone()],);
assert_eq!(source.fetch(t2.id).await, vec![]);
// fetching does not drain
assert_eq!(source.fetch(t1.id).await, vec![t1_c1],);
// unknown table => empty result
assert_eq!(source.fetch(TableId::new(3)).await, vec![]);
}
}

View File

@ -0,0 +1,15 @@
use std::fmt::{Debug, Display};
use async_trait::async_trait;
use data_types::{Column, TableId};
pub mod catalog;
pub mod mock;
#[async_trait]
pub trait ColumnsSource: Debug + Display + Send + Sync {
/// Get Columns for a given table
///
/// This method performs retries.
async fn fetch(&self, table: TableId) -> Vec<Column>;
}
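Because the trait is object-safe and `Send + Sync`, implementors can sit behind a generic bound or a trait object; a hedged sketch (the helper is hypothetical, not part of this PR):

    use data_types::TableId;

    // Count a table's columns through any ColumnsSource implementation.
    async fn column_count(source: &dyn ColumnsSource, table: TableId) -> usize {
        source.fetch(table).await.len()
    }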

View File

@@ -12,6 +12,7 @@ use crate::{config::Config, error::ErrorKind, object_store::ignore_writes::Ignor
 use super::{
     changed_files_filter::logging::LoggingChangedFiles,
+    columns_source::catalog::CatalogColumnsSource,
     commit::CommitToScheduler,
     compaction_job_done_sink::{
         error_kind::ErrorKindCompactionJobDoneSinkWrapper,
@@ -192,6 +193,7 @@ fn make_compaction_job_stream(
 fn make_partition_info_source(config: &Config) -> Arc<dyn PartitionInfoSource> {
     Arc::new(SubSourcePartitionInfoSource::new(
+        CatalogColumnsSource::new(config.backoff_config.clone(), Arc::clone(&config.catalog)),
         LoggingPartitionSourceWrapper::new(MetricsPartitionSourceWrapper::new(
             CatalogPartitionSource::new(config.backoff_config.clone(), Arc::clone(&config.catalog)),
             &config.metric_registry,

View File

@@ -12,6 +12,7 @@ use self::{
 };

 pub mod changed_files_filter;
+pub mod columns_source;
 pub(crate) mod commit;
 pub mod compaction_job_done_sink;
 pub mod compaction_job_stream;

View File

@@ -2,11 +2,12 @@ use std::{fmt::Display, sync::Arc};

 use async_trait::async_trait;
 use data_types::PartitionId;
+use schema::sort::SortKey;

 use crate::{
     components::{
-        namespaces_source::NamespacesSource, partition_source::PartitionSource,
-        tables_source::TablesSource,
+        columns_source::ColumnsSource, namespaces_source::NamespacesSource,
+        partition_source::PartitionSource, tables_source::TablesSource,
     },
     error::DynError,
     partition_info::PartitionInfo,
@@ -15,25 +16,34 @@ use crate::{
 use super::PartitionInfoSource;

 #[derive(Debug)]
-pub struct SubSourcePartitionInfoSource<P, T, N>
+pub struct SubSourcePartitionInfoSource<C, P, T, N>
 where
+    C: ColumnsSource,
     P: PartitionSource,
     T: TablesSource,
     N: NamespacesSource,
 {
+    columns_source: C,
     partition_source: P,
     tables_source: T,
     namespaces_source: N,
 }

-impl<P, T, N> SubSourcePartitionInfoSource<P, T, N>
+impl<C, P, T, N> SubSourcePartitionInfoSource<C, P, T, N>
 where
+    C: ColumnsSource,
     P: PartitionSource,
     T: TablesSource,
     N: NamespacesSource,
 {
-    pub fn new(partition_source: P, tables_source: T, namespaces_source: N) -> Self {
+    pub fn new(
+        columns_source: C,
+        partition_source: P,
+        tables_source: T,
+        namespaces_source: N,
+    ) -> Self {
         Self {
+            columns_source,
             partition_source,
             tables_source,
             namespaces_source,
@@ -41,8 +51,9 @@ where
     }
 }

-impl<P, T, N> Display for SubSourcePartitionInfoSource<P, T, N>
+impl<C, P, T, N> Display for SubSourcePartitionInfoSource<C, P, T, N>
 where
+    C: ColumnsSource,
     P: PartitionSource,
     T: TablesSource,
     N: NamespacesSource,
@@ -57,8 +68,9 @@ where
 }

 #[async_trait]
-impl<P, T, N> PartitionInfoSource for SubSourcePartitionInfoSource<P, T, N>
+impl<C, P, T, N> PartitionInfoSource for SubSourcePartitionInfoSource<C, P, T, N>
 where
+    C: ColumnsSource,
     P: PartitionSource,
     T: TablesSource,
     N: NamespacesSource,
@@ -96,6 +108,43 @@ where
             .get(&table.name)
             .ok_or_else::<DynError, _>(|| String::from("Cannot find table schema").into())?;

+        // fetch table columns to get column names for the partition's sort_key_ids
+        let columns = self.columns_source.fetch(table.id).await;
+
+        // sort_key_ids of the partition
+        let sort_key_ids = partition.sort_key_ids_none_if_empty();
+        // sort_key of the partition. This will be removed but, until then, use it to validate
+        // the sort_key computed by mapping sort_key_ids to column names
+        let p_sort_key = partition.sort_key();
+
+        // convert column ids to column names
+        let sort_key = sort_key_ids.as_ref().map(|ids| {
+            let names = ids
+                .iter()
+                .map(|id| {
+                    columns
+                        .iter()
+                        .find(|c| c.id == *id)
+                        .map(|c| c.name.clone())
+                        .ok_or_else::<DynError, _>(|| {
+                            format!(
+                                "Cannot find column with id {} for table {}",
+                                id.get(),
+                                table.name
+                            )
+                            .into()
+                        })
+                })
+                .collect::<Result<Vec<_>, _>>()
+                .expect("Cannot find column names for sort key ids");
+
+            SortKey::from_columns(names.iter().map(|s| &**s))
+        });
+
+        // This is here to catch any bugs while mapping sort_key_ids to column names.
+        // It will be removed once sort_key is removed from the partition.
+        assert_eq!(sort_key, p_sort_key);
+
         Ok(Arc::new(PartitionInfo {
             partition_id,
             partition_hash_id: partition.hash_id().cloned(),
@@ -103,7 +152,7 @@ where
             namespace_name: namespace.name,
             table: Arc::new(table),
             table_schema: Arc::new(table_schema.clone()),
-            sort_key: partition.sort_key(),
+            sort_key,
             partition_key: partition.partition_key,
         }))
     }
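The heart of the new code is the id-to-name join between the partition's `sort_key_ids` and the catalog columns. A condensed sketch of just that mapping, using the same linear `find` as above (the standalone function is illustrative, not part of the PR):

    use data_types::{Column, ColumnId};
    use schema::sort::SortKey;

    // Map sort-key column ids to column names, then build a SortKey.
    // Like the expect() above, this panics if an id has no matching column.
    fn sort_key_from_ids(ids: &[ColumnId], columns: &[Column]) -> SortKey {
        let names: Vec<String> = ids
            .iter()
            .map(|id| {
                columns
                    .iter()
                    .find(|c| c.id == *id)
                    .map(|c| c.name.clone())
                    .expect("cannot find column names for sort key ids")
            })
            .collect();
        SortKey::from_columns(names.iter().map(|s| &**s))
    }

The `assert_eq!(sort_key, p_sort_key)` guard works because both sides are `Option<SortKey>`: the key mapped from ids must agree with the legacy stored key until the latter is removed.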

View File

@@ -73,6 +73,8 @@ async fn compact_partition(
     span.set_metadata("partition_id", partition_id.get().to_string());
     let scratchpad = components.scratchpad_gen.pad();

+    info!(partition_id = partition_id.get(), "compaction job starting");
+
     let res = timeout_with_progress_checking(partition_timeout, |transmit_progress_signal| {
         let components = Arc::clone(&components);
         let scratchpad = Arc::clone(&scratchpad);

View File

@@ -738,7 +738,7 @@ async fn random_backfill_empty_partition() {
 - "L0 "
 - "L0.190[0,355] 1.02us 76mb|------------L0.190------------| "
 - "L0.193[356,668] 1.02us 79mb |----------L0.193----------| "
-- "L0.195[669,986] 1.02us 67mb |----------L0.195----------| "
+- "L0.194[669,986] 1.02us 67mb |----------L0.194----------| "
 - "L0.191[42,355] 1.04us 71mb |----------L0.191----------| "
 - "**** 3 Output Files (parquet_file_id not yet assigned), 292mb total:"
 - "L1 "
@@ -746,11 +746,11 @@ async fn random_backfill_empty_partition() {
 - "L1.?[339,676] 1.04us 100mb |------------L1.?------------| "
 - "L1.?[677,986] 1.04us 92mb |-----------L1.?-----------| "
 - "Committing partition 1:"
-- "  Soft Deleting 4 files: L0.190, L0.191, L0.193, L0.195"
+- "  Soft Deleting 4 files: L0.190, L0.191, L0.193, L0.194"
 - "  Creating 3 files"
 - "**** Simulation run 59, type=split(ReduceOverlap)(split_times=[676]). 1 Input Files, 60mb total:"
 - "L0, all files 60mb "
-- "L0.196[669,986] 1.05us |-----------------------------------------L0.196-----------------------------------------|"
+- "L0.195[669,986] 1.05us |-----------------------------------------L0.195-----------------------------------------|"
 - "**** 2 Output Files (parquet_file_id not yet assigned), 60mb total:"
 - "L0 "
 - "L0.?[669,676] 1.05us 2mb |L0.?| "
@@ -763,11 +763,11 @@ async fn random_backfill_empty_partition() {
 - "L0.?[42,338] 1.05us 38mb |---------------------------------------L0.?----------------------------------------| "
 - "L0.?[339,355] 1.05us 2mb |L0.?|"
 - "Committing partition 1:"
-- "  Soft Deleting 2 files: L0.192, L0.196"
+- "  Soft Deleting 2 files: L0.192, L0.195"
 - "  Creating 4 files"
 - "**** Simulation run 61, type=split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))(split_times=[528]). 2 Input Files, 179mb total:"
 - "L0 "
-- "L0.194[356,668] 1.04us 79mb |-------------------------------------L0.194--------------------------------------| "
+- "L0.196[356,668] 1.04us 79mb |-------------------------------------L0.196--------------------------------------| "
 - "L1 "
 - "L1.199[339,676] 1.04us 100mb|-----------------------------------------L1.199-----------------------------------------|"
 - "**** 2 Output Files (parquet_file_id not yet assigned), 179mb total:"
@@ -775,7 +775,7 @@ async fn random_backfill_empty_partition() {
 - "L1.?[339,528] 1.04us 100mb|----------------------L1.?----------------------| "
 - "L1.?[529,676] 1.04us 78mb |----------------L1.?-----------------| "
 - "Committing partition 1:"
-- "  Soft Deleting 2 files: L0.194, L1.199"
+- "  Soft Deleting 2 files: L0.196, L1.199"
 - "  Creating 2 files"
 - "**** Simulation run 62, type=split(ReduceOverlap)(split_times=[528]). 1 Input Files, 38mb total:"
 - "L0, all files 38mb "

File diff suppressed because it is too large.

View File

@@ -527,6 +527,21 @@ impl Partition {
     pub fn sort_key_ids(&self) -> Option<&SortedColumnSet> {
         self.sort_key_ids.as_ref()
     }
+
+    // TODO: reuse the same function in https://github.com/influxdata/influxdb_iox/pull/8556/
+    /// The sort_key_ids, if present and not empty
+    pub fn sort_key_ids_none_if_empty(&self) -> Option<SortedColumnSet> {
+        match self.sort_key_ids.as_ref() {
+            None => None,
+            Some(sort_key_ids) => {
+                if sort_key_ids.is_empty() {
+                    None
+                } else {
+                    Some(sort_key_ids.clone())
+                }
+            }
+        }
+    }
 }

 #[cfg(test)]
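The match spells out three cases, but it collapses to a single `Option::filter`; an equivalent formulation, shown for comparison only (free function, with `SortedColumnSet` assumed to be the same type `Partition` uses):

    use data_types::SortedColumnSet;

    // None stays None, Some(empty) becomes None, Some(non-empty) is cloned through.
    fn none_if_empty(ids: Option<&SortedColumnSet>) -> Option<SortedColumnSet> {
        ids.filter(|ids| !ids.is_empty()).cloned()
    }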

View File

@@ -34,10 +34,6 @@ use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use datafusion::{
     catalog::CatalogProvider,
-    common::{
-        plan_err,
-        tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
-    },
     execution::{
         context::{QueryPlanner, SessionState, TaskContext},
         memory_pool::MemoryPool,
@@ -346,10 +342,13 @@ impl IOxSessionContext {
     pub async fn sql_to_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {
         let ctx = self.child_ctx("sql_to_logical_plan");
         debug!(text=%sql, "planning SQL query");
-        // NOTE can not use ctx.inner.sql() here as it also interprets DDL
         let plan = ctx.inner.state().create_logical_plan(sql).await?;
-        // TODO use better API: https://github.com/apache/arrow-datafusion/issues/7328
-        verify_plan(&plan)?;
+        // ensure the plan does not contain unwanted statements
+        let verifier = SQLOptions::new()
+            .with_allow_ddl(false) // no CREATE ...
+            .with_allow_dml(false) // no INSERT or COPY
+            .with_allow_statements(false); // no SET VARIABLE, etc
+        verifier.verify_plan(&plan)?;
         Ok(plan)
     }
@@ -696,32 +695,6 @@ }
     }
 }

-/// Returns an error if this plan contains any unsupported statements:
-///
-/// * DDL (`CREATE TABLE`) - creates state in a context that is dropped at the end of the request
-/// * Statements (`SET VARIABLE`) - can cause denial of service by using more memory or cput
-/// * DML (`INSERT`, `COPY`) - can write local files so is a security risk on servers
-fn verify_plan(plan: &LogicalPlan) -> Result<()> {
-    plan.visit(&mut BadPlanVisitor {})?;
-    Ok(())
-}
-
-struct BadPlanVisitor {}
-
-impl TreeNodeVisitor for BadPlanVisitor {
-    type N = LogicalPlan;
-
-    fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
-        match node {
-            LogicalPlan::Ddl(ddl) => plan_err!("DDL not supported: {}", ddl.name()),
-            LogicalPlan::Dml(dml) => plan_err!("DML not supported: {}", dml.op),
-            LogicalPlan::Copy(_) => plan_err!("DML not supported: COPY"),
-            LogicalPlan::Statement(stmt) => plan_err!("Statement not supported: {}", stmt.name()),
-            _ => Ok(VisitRecursion::Continue),
-        }
-    }
-}
-
 /// Extension trait to pull IOx spans out of DataFusion contexts.
 pub trait SessionContextIOxExt {
     /// Get child span of the current context.

View File

@ -1,6 +1,7 @@
use data_types::{ use data_types::{
ColumnSet, CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, Partition, PartitionId, Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, NamespaceId, ParquetFile,
PartitionKey, SkippedCompaction, Table, TableId, Timestamp, TransitionPartitionId, ParquetFileId, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp, TransitionPartitionId,
}; };
use uuid::Uuid; use uuid::Uuid;
@ -111,6 +112,51 @@ impl From<ParquetFile> for ParquetFileBuilder {
} }
} }
#[derive(Debug)]
/// Build [`Column`]s for testing
pub struct ColumnBuilder {
column: Column,
}
impl ColumnBuilder {
/// Create a builder to create a column with `table_id` `id`
pub fn new(id: i64, table_id: i64) -> Self {
Self {
column: Column {
id: ColumnId::new(id),
table_id: TableId::new(table_id),
name: "column".to_string(),
column_type: ColumnType::Tag,
},
}
}
/// Set the column name
pub fn with_name(self, name: &str) -> Self {
Self {
column: Column {
name: name.to_string(),
..self.column
},
}
}
/// Set column type
pub fn with_column_type(self, column_type: ColumnType) -> Self {
Self {
column: Column {
column_type,
..self.column
},
}
}
/// Create the table
pub fn build(self) -> Column {
self.column
}
}
#[derive(Debug)] #[derive(Debug)]
/// Build [`Table`]s for testing /// Build [`Table`]s for testing
pub struct TableBuilder { pub struct TableBuilder {
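A usage sketch for the new builder, mirroring the `MockColumnsSource` test earlier in this diff (the ids and name are illustrative):

    use data_types::ColumnType;
    use iox_tests::ColumnBuilder;

    #[test]
    fn column_builder_demo() {
        // Column id 1 on table id 42, named "time", typed as the timestamp column.
        let col = ColumnBuilder::new(1, 42)
            .with_name("time")
            .with_column_type(ColumnType::Time)
            .build();
        assert_eq!(col.name, "time");
    }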

View File

@@ -25,7 +25,9 @@ pub use catalog::{
 };

 mod builders;
-pub use builders::{ParquetFileBuilder, PartitionBuilder, SkippedCompactionBuilder, TableBuilder};
+pub use builders::{
+    ColumnBuilder, ParquetFileBuilder, PartitionBuilder, SkippedCompactionBuilder, TableBuilder,
+};

 /// Create a partition identifier from an int (which gets used as the table ID) and a partition key
 /// with the string "arbitrary". Most useful in cases where there isn't any actual catalog

View File

@@ -8,7 +8,6 @@ license.workspace = true
 [dependencies]
 clap = { version = "4", features = ["derive", "env"], optional = true }
-is-terminal = "0.4.9"
 logfmt = { path = "../logfmt" }
 observability_deps = { path = "../observability_deps" }
 thiserror = "1.0.47"

View File

@@ -20,14 +20,14 @@ pub mod config;

 pub use config::*;

-use is_terminal::IsTerminal;
 // Re-export tracing_subscriber
 pub use tracing_subscriber;

 use observability_deps::tracing::{self, Subscriber};
-use std::cmp::min;
-use std::io;
-use std::io::Write;
+use std::{
+    cmp::min,
+    io::{self, IsTerminal, Write},
+};
 use thiserror::Error;
 use tracing_subscriber::{
     fmt::{self, writer::BoxMakeWriter, MakeWriter},
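The `is-terminal` crate is dropped in favour of `std::io::IsTerminal`, stabilised in Rust 1.70, so existing call sites keep working unchanged. A minimal illustration of the std API:

    use std::io::{self, IsTerminal};

    // Decide whether to emit ANSI colour based on whether stdout is a TTY.
    fn use_color() -> bool {
        io::stdout().is_terminal()
    }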

View File

@@ -28,9 +28,9 @@ bytes = { version = "1" }
 chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
 crossbeam-utils = { version = "0.8" }
 crypto-common = { version = "0.1", default-features = false, features = ["std"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673" }
-datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
-datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673", default-features = false, features = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32" }
+datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32", default-features = false, features = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions"] }
 digest = { version = "0.10", features = ["mac", "std"] }
 either = { version = "1", features = ["serde"] }
 fixedbitset = { version = "0.4" }
@@ -166,39 +166,33 @@ uuid = { version = "1", features = ["v4"] }
 bitflags = { version = "2", default-features = false, features = ["std"] }
 nix = { version = "0.26" }
 once_cell = { version = "1", default-features = false, features = ["unstable"] }
-rustix = { version = "0.38", features = ["fs", "termios"] }
 rustls = { version = "0.21" }

 [target.x86_64-unknown-linux-gnu.build-dependencies]
 bitflags = { version = "2", default-features = false, features = ["std"] }
 once_cell = { version = "1", default-features = false, features = ["unstable"] }
-rustix = { version = "0.38", features = ["fs", "termios"] }
 rustls = { version = "0.21" }

 [target.x86_64-apple-darwin.dependencies]
 bitflags = { version = "2", default-features = false, features = ["std"] }
 nix = { version = "0.26" }
 once_cell = { version = "1", default-features = false, features = ["unstable"] }
-rustix = { version = "0.38", features = ["fs", "termios"] }
 rustls = { version = "0.21" }

 [target.x86_64-apple-darwin.build-dependencies]
 bitflags = { version = "2", default-features = false, features = ["std"] }
 once_cell = { version = "1", default-features = false, features = ["unstable"] }
-rustix = { version = "0.38", features = ["fs", "termios"] }
 rustls = { version = "0.21" }

 [target.aarch64-apple-darwin.dependencies]
 bitflags = { version = "2", default-features = false, features = ["std"] }
 nix = { version = "0.26" }
 once_cell = { version = "1", default-features = false, features = ["unstable"] }
-rustix = { version = "0.38", features = ["fs", "termios"] }
 rustls = { version = "0.21" }

 [target.aarch64-apple-darwin.build-dependencies]
 bitflags = { version = "2", default-features = false, features = ["std"] }
 once_cell = { version = "1", default-features = false, features = ["unstable"] }
-rustix = { version = "0.38", features = ["fs", "termios"] }
 rustls = { version = "0.21" }

 [target.x86_64-pc-windows-msvc.dependencies]