Merge branch 'main' into dependabot/cargo/clap-4.4.4
commit 500112bd47

Cargo.lock
@@ -4548,6 +4548,7 @@ dependencies = [
  "backoff",
  "bytes",
  "cache_system",
+ "chrono",
  "client_util",
  "data_types",
  "datafusion",
querier/Cargo.toml
@@ -12,6 +12,7 @@ async-trait = "0.1.73"
 backoff = { path = "../backoff" }
 bytes = "1.5"
 cache_system = { path = "../cache_system" }
+chrono = { version = "0.4", default-features = false }
 client_util = { path = "../client_util" }
 data_types = { path = "../data_types" }
 datafusion = { workspace = true }
querier/src/cache
@@ -15,16 +15,17 @@ use cache_system::{
     },
     resource_consumption::FunctionEstimator,
 };
+use chrono::{DateTime, Duration, TimeZone, Utc};
 use data_types::{
     partition_template::{build_column_values, ColumnValue},
-    ColumnId, Partition, SortedColumnSet, TransitionPartitionId,
+    ColumnId, Partition, SortedColumnSet, TransitionPartitionId, MAX_NANO_TIME, MIN_NANO_TIME,
 };
 use datafusion::scalar::ScalarValue;
 use iox_catalog::{interface::Catalog, partition_lookup_batch};
 use iox_query::chunk_statistics::{ColumnRange, ColumnRanges};
 use iox_time::TimeProvider;
 use observability_deps::tracing::debug;
-use schema::sort::SortKey;
+use schema::{sort::SortKey, TIME_DATA_TIMEZONE};
 use std::{
     collections::{hash_map::Entry, HashMap, HashSet},
     mem::{size_of, size_of_val},
@@ -266,72 +267,107 @@ impl CachedPartition {
             p_sort_key.as_ref()
         );

-        let mut column_ranges =
+        let mut column_ranges = HashMap::new();
+        let mut ignore = HashSet::new();
+        for (col, val) in
             build_column_values(&table.partition_template, partition.partition_key.inner())
-                .filter_map(|(col, val)| {
-                    // resolve column name to already existing Arc for cheaper storage
-                    let col = Arc::clone(table.column_id_map_rev.get_key_value(col)?.0);
+        {
+            if ignore.contains(col) {
+                continue;
+            }

-                    let range = match val {
-                        ColumnValue::Identity(s) => {
-                            let s = Arc::new(ScalarValue::from(s.as_ref()));
-                            ColumnRange {
-                                min_value: Arc::clone(&s),
-                                max_value: s,
-                            }
-                        }
-                        ColumnValue::Prefix(p) => {
-                            if p.is_empty() {
-                                // full range => value is useless
-                                return None;
-                            }
-
-                            // If the partition only has a prefix of the tag value (it was truncated) then form a conservative
-                            // range:
-                            //
-                            //
-                            // # Minimum
-                            // Use the prefix itself.
-                            //
-                            // Note that the minimum is inclusive.
-                            //
-                            // All values in the partition are either:
-                            // - identical to the prefix, in which case they are included by the inclusive minimum
-                            // - have the form `"<prefix><s>"`, and it holds that `"<prefix><s>" > "<prefix>"` for all
-                            //   strings `"<s>"`.
-                            //
-                            //
-                            // # Maximum
-                            // Use `"<prefix_excluding_last_char><char::max>"`.
-                            //
-                            // Note that the maximum is inclusive.
-                            //
-                            // All strings in this partition must be smaller than this constructed maximum, because
-                            // string comparison is front-to-back and the `"<prefix_excluding_last_char><char::max>" > "<prefix>"`.
-                            let mut chars = p.as_ref().chars().collect::<Vec<_>>();
-                            *chars.last_mut().expect("checked that prefix is not empty") =
-                                std::char::MAX;
-                            let max_value = Arc::new(ScalarValue::from(
-                                chars.into_iter().collect::<String>().as_str(),
-                            ));
-                            let min_value = Arc::new(ScalarValue::from(p.as_ref()));
-
-                            ColumnRange {
-                                min_value,
-                                max_value,
-                            }
-                        }
-                        ColumnValue::Datetime { .. } => {
-                            // not yet supported
-                            return None;
-                        }
-                    };
-
-                    Some((col, range))
-                })
-                .collect::<HashMap<_, _>>();
+            // resolve column name to already existing Arc for cheaper storage
+            let Some((col, _id)) = table.column_id_map_rev.get_key_value(col) else {
+                continue;
+            };
+            let col = Arc::clone(col);
+
+            let range = match val {
+                ColumnValue::Identity(s) => {
+                    let s = Arc::new(ScalarValue::from(s.as_ref()));
+                    ColumnRange {
+                        min_value: Arc::clone(&s),
+                        max_value: s,
+                    }
+                }
+                ColumnValue::Prefix(p) => {
+                    if p.is_empty() {
+                        // full range => value is useless
+                        continue;
+                    }
+
+                    let min_value = Arc::new(ScalarValue::from(p.as_ref()));
+
+                    // If the partition only has a prefix of the tag value (it was truncated) then form a conservative
+                    // range:
+                    //
+                    //
+                    // # Minimum
+                    // Use the prefix itself.
+                    //
+                    // Note that the minimum is inclusive.
+                    //
+                    // All values in the partition are either:
+                    // - identical to the prefix, in which case they are included by the inclusive minimum
+                    // - have the form `"<prefix><s>"`, and it holds that `"<prefix><s>" > "<prefix>"` for all
+                    //   strings `"<s>"`.
+                    //
+                    //
+                    // # Maximum
+                    // Use `"<prefix_excluding_last_char><char::max>"`.
+                    //
+                    // Note that the maximum is inclusive.
+                    //
+                    // All strings in this partition must be smaller than this constructed maximum, because
+                    // string comparison is front-to-back and the `"<prefix_excluding_last_char><char::max>" > "<prefix>"`.
+                    let mut chars = p.as_ref().chars().collect::<Vec<_>>();
+                    *chars.last_mut().expect("checked that prefix is not empty") = std::char::MAX;
+                    let max_value = Arc::new(ScalarValue::from(
+                        chars.into_iter().collect::<String>().as_str(),
+                    ));
+
+                    ColumnRange {
+                        min_value,
+                        max_value,
+                    }
+                }
+                ColumnValue::Datetime { begin, end } => ColumnRange {
+                    min_value: Arc::new(ScalarValue::TimestampNanosecond(
+                        Some(
+                            begin
+                                .max(t_min())
+                                .min(t_max())
+                                .timestamp_nanos_opt()
+                                .expect("min ts in range"),
+                        ),
+                        TIME_DATA_TIMEZONE(),
+                    )),
+                    max_value: Arc::new(ScalarValue::TimestampNanosecond(
+                        // convert exclusive to inclusive end
+                        Some(
+                            end.checked_sub_signed(Duration::nanoseconds(1))
+                                .unwrap_or(end)
+                                .max(t_min())
+                                .min(t_max())
+                                .timestamp_nanos_opt()
+                                .expect("max ts in range"),
+                        ),
+                        TIME_DATA_TIMEZONE(),
+                    )),
+                },
+            };
+
+            match column_ranges.entry(col) {
+                Entry::Occupied(o) => {
+                    let (col, _) = o.remove_entry();
+                    ignore.insert(col);
+                }
+                Entry::Vacant(v) => {
+                    v.insert(range);
+                }
+            }
+        }
         column_ranges.shrink_to_fit();

         Self {
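Note on the `ColumnValue::Prefix` arm above: a truncated partition-key value is widened into a conservative string range, exactly as the inline comment describes. A minimal standalone sketch of that construction (hypothetical `prefix_range` helper, plain `String`s in place of `ScalarValue`):

```rust
/// min = the prefix itself (inclusive), max = the prefix with its last
/// char replaced by `char::MAX` (inclusive).
fn prefix_range(prefix: &str) -> Option<(String, String)> {
    if prefix.is_empty() {
        return None; // full range => value is useless
    }
    let min = prefix.to_owned();
    let mut chars: Vec<char> = prefix.chars().collect();
    *chars.last_mut().expect("checked that prefix is not empty") = char::MAX;
    let max: String = chars.into_iter().collect();
    Some((min, max))
}

fn main() {
    // "v2" is a truncated form of e.g. "v2xyz"; the range must contain it.
    let (min, max) = prefix_range("v2").unwrap();
    assert!(min.as_str() <= "v2xyz" && "v2xyz" <= max.as_str());
}
```

Any string that starts with the prefix sorts within `[min, max]`, so pruning stays correct even though the range over-covers.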
@@ -406,6 +442,16 @@ impl PartitionSortKey {
     }
 }

+/// Minimum datetime that can exist in IOx.
+fn t_min() -> DateTime<Utc> {
+    Utc.timestamp_nanos(MIN_NANO_TIME)
+}
+
+/// Maximum datetime that can exist in IOx.
+fn t_max() -> DateTime<Utc> {
+    Utc.timestamp_nanos(MAX_NANO_TIME)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
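These helpers bound the `ColumnValue::Datetime { begin, end }` arm above: both ends are clamped into the valid IOx time range, and the exclusive `end` becomes an inclusive maximum by subtracting one nanosecond. A self-contained sketch of that arithmetic, with `lo`/`hi` standing in for `t_min()`/`t_max()` and placeholder bounds in `main` (the real code uses `MIN_NANO_TIME`/`MAX_NANO_TIME`):

```rust
use chrono::{DateTime, Duration, TimeZone, Utc};

fn datetime_range(
    begin: DateTime<Utc>,
    end: DateTime<Utc>,
    lo: DateTime<Utc>,
    hi: DateTime<Utc>,
) -> (i64, i64) {
    let min = begin
        .max(lo)
        .min(hi)
        .timestamp_nanos_opt()
        .expect("min ts in range");
    let max = end
        .checked_sub_signed(Duration::nanoseconds(1)) // exclusive -> inclusive
        .unwrap_or(end)
        .max(lo)
        .min(hi)
        .timestamp_nanos_opt()
        .expect("max ts in range");
    (min, max)
}

fn main() {
    // A "%Y" partition for 2023 covers [2023-01-01, 2024-01-01).
    let begin = Utc.with_ymd_and_hms(2023, 1, 1, 0, 0, 0).unwrap();
    let end = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
    // Placeholder bounds wide enough not to clamp this example.
    let lo = Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap();
    let hi = Utc.with_ymd_and_hms(2100, 1, 1, 0, 0, 0).unwrap();
    let (min, max) = datetime_range(begin, end, lo, hi);
    // 2023 has 365 days, counted in nanoseconds.
    assert_eq!(max - min + 1, 365 * 24 * 3600 * 1_000_000_000);
}
```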
@@ -413,6 +459,7 @@ mod tests {
         ram::test_util::test_ram_pool, test_util::assert_catalog_access_metric_count,
     };
     use async_trait::async_trait;
+    use chrono::{Datelike, TimeZone, Utc};
     use data_types::{
         partition_template::TablePartitionTemplateOverride, ColumnType, PartitionId, PartitionKey,
         SortedColumnSet, TableId,
@@ -422,7 +469,7 @@ mod tests {
         template_part::Part, PartitionTemplate, TemplatePart,
     };
     use iox_tests::{TestCatalog, TestNamespace};
-    use schema::{Schema, SchemaBuilder};
+    use schema::{Schema, SchemaBuilder, TIME_COLUMN_NAME};
     use tokio::sync::Barrier;

     #[tokio::test]
@@ -569,6 +616,9 @@ mod tests {
                 "table",
                 Some(PartitionTemplate {
                     parts: vec![
+                        TemplatePart {
+                            part: Some(Part::TimeFormat(String::from("%Y"))),
+                        },
                         TemplatePart {
                             part: Some(Part::TagValue(String::from("tag2"))),
                         },
@@ -582,19 +632,19 @@ mod tests {
         let c1 = t.create_column("tag1", ColumnType::Tag).await;
         let c2 = t.create_column("tag2", ColumnType::Tag).await;
         let c3 = t.create_column("tag3", ColumnType::Tag).await;
-        let c4 = t.create_column("time", ColumnType::Time).await;
+        let c4 = t.create_column(TIME_COLUMN_NAME, ColumnType::Time).await;

         // See `data_types::partition_template` for the template language.
         // Two normal values.
-        let p1 = t.create_partition("v1|v2").await.partition.clone();
+        let p1 = t.create_partition("2023|v1|v2").await.partition.clone();
         // 2nd part is NULL
-        let p2 = t.create_partition("v1|!").await.partition.clone();
+        let p2 = t.create_partition("2023|v1|!").await.partition.clone();
         // 2nd part is empty
-        let p3 = t.create_partition("v1|^").await.partition.clone();
+        let p3 = t.create_partition("2023|v1|^").await.partition.clone();
         // 2nd part is truncated (i.e. the original value was longer)
-        let p4 = t.create_partition("v1|v2#").await.partition.clone();
+        let p4 = t.create_partition("2023|v1|v2#").await.partition.clone();
         // 2nd part is truncated to empty string
-        let p5 = t.create_partition("v1|#").await.partition.clone();
+        let p5 = t.create_partition("2023|v1|#").await.partition.clone();
         let cached_table = Arc::new(CachedTable {
             id: t.table.id,
             schema: schema(),
@@ -637,6 +687,7 @@ mod tests {
         assert_eq!(
             ranges1a.as_ref(),
             &HashMap::from([
+                (Arc::from(TIME_COLUMN_NAME), year_column_range(2023),),
                 (
                     Arc::from("tag1"),
                     ColumnRange {
@@ -670,13 +721,16 @@ mod tests {
             .column_ranges;
         assert_eq!(
             ranges2.as_ref(),
-            &HashMap::from([(
-                Arc::from("tag2"),
-                ColumnRange {
-                    min_value: Arc::new(ScalarValue::from("v1")),
-                    max_value: Arc::new(ScalarValue::from("v1"))
-                }
-            ),]),
+            &HashMap::from([
+                (Arc::from(TIME_COLUMN_NAME), year_column_range(2023),),
+                (
+                    Arc::from("tag2"),
+                    ColumnRange {
+                        min_value: Arc::new(ScalarValue::from("v1")),
+                        max_value: Arc::new(ScalarValue::from("v1"))
+                    }
+                ),
+            ]),
         );
         assert_catalog_access_metric_count(
             &catalog.metric_registry,
@@ -692,6 +746,7 @@ mod tests {
         assert_eq!(
             ranges3.as_ref(),
             &HashMap::from([
+                (Arc::from(TIME_COLUMN_NAME), year_column_range(2023),),
                 (
                     Arc::from("tag1"),
                     ColumnRange {
@@ -722,6 +777,7 @@ mod tests {
         assert_eq!(
             ranges4.as_ref(),
             &HashMap::from([
+                (Arc::from(TIME_COLUMN_NAME), year_column_range(2023),),
                 (
                     Arc::from("tag1"),
                     ColumnRange {
@@ -751,13 +807,16 @@ mod tests {
             .column_ranges;
         assert_eq!(
             ranges5.as_ref(),
-            &HashMap::from([(
-                Arc::from("tag2"),
-                ColumnRange {
-                    min_value: Arc::new(ScalarValue::from("v1")),
-                    max_value: Arc::new(ScalarValue::from("v1"))
-                }
-            ),]),
+            &HashMap::from([
+                (Arc::from(TIME_COLUMN_NAME), year_column_range(2023)),
+                (
+                    Arc::from("tag2"),
+                    ColumnRange {
+                        min_value: Arc::new(ScalarValue::from("v1")),
+                        max_value: Arc::new(ScalarValue::from("v1"))
+                    }
+                ),
+            ]),
         );
         assert_catalog_access_metric_count(
             &catalog.metric_registry,
@@ -816,6 +875,186 @@ mod tests {
         }
     }

+    /// Having multiple time-based parts is currently not supported. Most users will have a SINGLE time format
+    /// template though, so this is usually not a big problem. This test just ensures that we don't end up with anything weird.
+    #[tokio::test]
+    async fn test_column_ranges_unsupported_time() {
+        let catalog = TestCatalog::new();
+
+        let ns = catalog.create_namespace_1hr_retention("ns").await;
+        let t = ns
+            .create_table_with_partition_template(
+                "table",
+                Some(PartitionTemplate {
+                    parts: vec![
+                        TemplatePart {
+                            part: Some(Part::TimeFormat(String::from("%Y"))),
+                        },
+                        TemplatePart {
+                            part: Some(Part::TimeFormat(String::from("%Y-%m"))),
+                        },
+                        TemplatePart {
+                            part: Some(Part::TimeFormat(String::from("%Y"))),
+                        },
+                    ],
+                }),
+            )
+            .await;
+        let c = t.create_column(TIME_COLUMN_NAME, ColumnType::Time).await;
+        let p = t
+            .create_partition("2023|2023-01|2023|2023-01")
+            .await
+            .partition
+            .clone();
+        let cached_table = Arc::new(CachedTable {
+            id: t.table.id,
+            schema: schema(),
+            column_id_map: HashMap::from([(c.column.id, Arc::from(c.column.name.clone()))]),
+            column_id_map_rev: HashMap::from([(Arc::from(c.column.name.clone()), c.column.id)]),
+            primary_key_column_ids: [c.column.id].into(),
+            partition_template: t.table.partition_template.clone(),
+        });
+
+        let cache = PartitionCache::new(
+            catalog.catalog(),
+            BackoffConfig::default(),
+            catalog.time_provider(),
+            &catalog.metric_registry(),
+            test_ram_pool(),
+            true,
+        );
+
+        let ranges = &cache
+            .get_one(
+                Arc::clone(&cached_table),
+                &p.transition_partition_id(),
+                &[],
+                None,
+            )
+            .await
+            .unwrap()
+            .column_ranges;
+        assert_eq!(ranges.as_ref(), &HashMap::new(),);
+    }
+
+    #[tokio::test]
+    async fn test_column_ranges_time_edges() {
+        let catalog = TestCatalog::new();
+
+        // check that t_min and t_max roundtrip
+        assert_eq!(t_min().timestamp_nanos_opt().unwrap(), MIN_NANO_TIME,);
+        assert_eq!(t_max().timestamp_nanos_opt().unwrap(), MAX_NANO_TIME,);
+
+        let year_min = t_min().year();
+        let year_max = t_max().year();
+        println!("year_min: {year_min}, year_max: {year_max}");
+
+        let ns = catalog.create_namespace_1hr_retention("ns").await;
+        let t = ns
+            .create_table_with_partition_template(
+                "table",
+                Some(PartitionTemplate {
+                    parts: vec![TemplatePart {
+                        part: Some(Part::TimeFormat(String::from("%Y"))),
+                    }],
+                }),
+            )
+            .await;
+        let c = t.create_column(TIME_COLUMN_NAME, ColumnType::Time).await;
+        let p_min = t
+            .create_partition(&year_min.to_string())
+            .await
+            .partition
+            .clone();
+        let p_max = t
+            .create_partition(&year_max.to_string())
+            .await
+            .partition
+            .clone();
+        let cached_table = Arc::new(CachedTable {
+            id: t.table.id,
+            schema: schema(),
+            column_id_map: HashMap::from([(c.column.id, Arc::from(c.column.name.clone()))]),
+            column_id_map_rev: HashMap::from([(Arc::from(c.column.name.clone()), c.column.id)]),
+            primary_key_column_ids: [c.column.id].into(),
+            partition_template: t.table.partition_template.clone(),
+        });
+
+        let cache = PartitionCache::new(
+            catalog.catalog(),
+            BackoffConfig::default(),
+            catalog.time_provider(),
+            &catalog.metric_registry(),
+            test_ram_pool(),
+            true,
+        );
+
+        let ranges_min = &cache
+            .get_one(
+                Arc::clone(&cached_table),
+                &p_min.transition_partition_id(),
+                &[],
+                None,
+            )
+            .await
+            .unwrap()
+            .column_ranges;
+        assert_eq!(
+            ranges_min.as_ref(),
+            &HashMap::from([(
+                Arc::from(TIME_COLUMN_NAME),
+                ColumnRange {
+                    min_value: Arc::new(ScalarValue::TimestampNanosecond(
+                        Some(t_min().timestamp_nanos_opt().unwrap()),
+                        TIME_DATA_TIMEZONE()
+                    )),
+                    max_value: Arc::new(ScalarValue::TimestampNanosecond(
+                        Some(
+                            Utc.with_ymd_and_hms(year_min + 1, 1, 1, 0, 0, 0)
+                                .unwrap()
+                                .timestamp_nanos_opt()
+                                .unwrap()
+                                - 1
+                        ),
+                        TIME_DATA_TIMEZONE()
+                    ))
+                }
+            )]),
+        );
+
+        let ranges_max = &cache
+            .get_one(
+                Arc::clone(&cached_table),
+                &p_max.transition_partition_id(),
+                &[],
+                None,
+            )
+            .await
+            .unwrap()
+            .column_ranges;
+        assert_eq!(
+            ranges_max.as_ref(),
+            &HashMap::from([(
+                Arc::from(TIME_COLUMN_NAME),
+                ColumnRange {
+                    min_value: Arc::new(ScalarValue::TimestampNanosecond(
+                        Some(
+                            Utc.with_ymd_and_hms(year_max, 1, 1, 0, 0, 0)
+                                .unwrap()
+                                .timestamp_nanos_opt()
+                                .unwrap()
+                        ),
+                        TIME_DATA_TIMEZONE()
+                    )),
+                    max_value: Arc::new(ScalarValue::TimestampNanosecond(
+                        Some(t_max().timestamp_nanos_opt().unwrap()),
+                        TIME_DATA_TIMEZONE()
+                    ))
+                }
+            )]),
+        );
+    }
+
     #[tokio::test]
     async fn test_expiration() {
         let catalog = TestCatalog::new();
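The empty result in `test_column_ranges_unsupported_time` comes from the `column_ranges.entry(col)` bookkeeping in `CachedPartition`: the second time a column appears in the template, its entry is removed and the column is blacklisted via `ignore`. A minimal sketch of that pattern (hypothetical `dedup_ranges`, with `i32` standing in for `ColumnRange`):

```rust
use std::collections::{hash_map::Entry, HashMap, HashSet};

/// Keep the range of each column that appears exactly once; a duplicate
/// removes the existing entry and suppresses all later occurrences.
fn dedup_ranges<'a>(pairs: impl IntoIterator<Item = (&'a str, i32)>) -> HashMap<&'a str, i32> {
    let mut ranges = HashMap::new();
    let mut ignore = HashSet::new();
    for (col, range) in pairs {
        if ignore.contains(col) {
            continue;
        }
        match ranges.entry(col) {
            Entry::Occupied(o) => {
                let (col, _) = o.remove_entry();
                ignore.insert(col);
            }
            Entry::Vacant(v) => {
                v.insert(range);
            }
        }
    }
    ranges
}

fn main() {
    // "time" appears twice -> it is dropped entirely, no misleading range survives.
    let ranges = dedup_ranges([("time", 1), ("tag1", 2), ("time", 3)]);
    assert_eq!(ranges, HashMap::from([("tag1", 2)]));
}
```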
@@ -1312,4 +1551,28 @@ mod tests {
                 .next()
         }
     }
+
+    fn year_column_range(y: i32) -> ColumnRange {
+        ColumnRange {
+            min_value: Arc::new(ScalarValue::TimestampNanosecond(
+                Some(
+                    Utc.with_ymd_and_hms(y, 1, 1, 0, 0, 0)
+                        .unwrap()
+                        .timestamp_nanos_opt()
+                        .unwrap(),
+                ),
+                TIME_DATA_TIMEZONE(),
+            )),
+            max_value: Arc::new(ScalarValue::TimestampNanosecond(
+                Some(
+                    Utc.with_ymd_and_hms(y + 1, 1, 1, 0, 0, 0)
+                        .unwrap()
+                        .timestamp_nanos_opt()
+                        .unwrap()
+                        - 1,
+                ),
+                TIME_DATA_TIMEZONE(),
+            )),
+        }
+    }
 }
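`year_column_range` builds the inclusive range for one calendar year: the first nanosecond of the year through the first nanosecond of the next year minus one. A quick standalone check of that inclusive end (assuming chrono 0.4):

```rust
use chrono::{TimeZone, Utc};

fn main() {
    // Inclusive end of 2023 = first nanosecond of 2024 minus one.
    let end_2023 = Utc
        .with_ymd_and_hms(2024, 1, 1, 0, 0, 0)
        .unwrap()
        .timestamp_nanos_opt()
        .unwrap()
        - 1;
    assert_eq!(
        Utc.timestamp_nanos(end_2023).to_rfc3339(),
        "2023-12-31T23:59:59.999999999+00:00"
    );
}
```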