feat: cache decoded partition value ranges (#8002)

Currently this only works for tags. We may want to decode the time
template as well at some point.

For #7974.
Marco Neumann 2023-06-16 11:38:34 +02:00 committed by GitHub
parent e346ec9286
commit 93ecb78ab9
2 changed files with 326 additions and 6 deletions


@@ -500,6 +500,11 @@ impl PartitionKey {
    pub fn ptr_eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.0, &other.0)
    }

    /// Returns underlying string.
    pub fn inner(&self) -> &str {
        &self.0
    }
}

impl Display for PartitionKey {
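
For context, a minimal sketch of the new accessor's role; this `PartitionKey` is only a stand-in mimicking the real `data_types` type, which wraps its key string in an `Arc`:

use std::sync::Arc;

/// Stand-in for `data_types::PartitionKey`: a cheaply clonable partition key string.
struct PartitionKey(Arc<str>);

impl PartitionKey {
    /// Returns underlying string.
    fn inner(&self) -> &str {
        &self.0
    }
}

fn main() {
    let key = PartitionKey(Arc::from("v1|v2"));
    // Callers such as the partition cache can now borrow the raw key string
    // and feed it to partition-template decoding.
    assert_eq!(key.inner(), "v1|v2");
}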


@@ -11,7 +11,11 @@ use cache_system::{
    loader::{metrics::MetricsLoader, FunctionLoader},
    resource_consumption::FunctionEstimator,
};
-use data_types::{ColumnId, PartitionId};
+use data_types::{
+    partition_template::{build_column_values, ColumnValue},
+    ColumnId, Partition, PartitionId,
+};
use datafusion::scalar::ScalarValue;
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use schema::sort::SortKey;
@@ -70,11 +74,7 @@ impl PartitionCache {
                    .await
                    .expect("retry forever")?;
-                let sort_key = partition.sort_key().map(|sort_key| {
-                    Arc::new(PartitionSortKey::new(sort_key, &extra.column_id_map_rev))
-                });
-
-                Some(CachedPartition { sort_key })
+                Some(CachedPartition::new(partition, &extra))
            }
        });

        let loader = Arc::new(MetricsLoader::new(
@@ -144,14 +144,118 @@ impl PartitionCache {
            .await
            .and_then(|p| p.sort_key)
    }

    /// Get known column ranges.
    #[allow(dead_code)]
    pub async fn column_ranges(
        &self,
        cached_table: Arc<CachedTable>,
        partition_id: PartitionId,
        span: Option<Span>,
    ) -> Option<ColumnRanges> {
        self.cache
            .get(partition_id, (cached_table, span))
            .await
            .map(|p| p.column_ranges)
    }
}
/// Represents the known min/max values for a specific column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnRange {
    pub min_value: Arc<ScalarValue>,
    pub max_value: Arc<ScalarValue>,
}

/// Represents the known min/max values for a subset (not necessarily all) of the columns in a partition.
///
/// The values need not actually occur in any row.
///
/// These ranges apply to ALL rows (esp. in ALL files and ingester chunks) within the given partition.
pub type ColumnRanges = Arc<HashMap<Arc<str>, ColumnRange>>;
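
To make the intended use concrete: a pruning check over such ranges might look like the following hedged sketch (not part of this commit; `ScalarValue` is simplified to `String`, and `may_contain` is a hypothetical helper):

use std::{collections::HashMap, sync::Arc};

/// Simplified stand-in for the cached `ColumnRange` above (the real code
/// stores `Arc<ScalarValue>` from DataFusion).
struct ColumnRange {
    min_value: String,
    max_value: String,
}

type ColumnRanges = Arc<HashMap<Arc<str>, ColumnRange>>;

/// Returns `true` if rows of this partition may contain `value` for `col`,
/// i.e. the partition cannot be pruned using the known range.
fn may_contain(ranges: &ColumnRanges, col: &str, value: &str) -> bool {
    match ranges.get(col) {
        // No known range for this column => cannot prune.
        None => true,
        // Both bounds are inclusive, matching the semantics documented above.
        Some(r) => r.min_value.as_str() <= value && value <= r.max_value.as_str(),
    }
}

fn main() {
    let ranges: ColumnRanges = Arc::new(HashMap::from([(
        Arc::from("tag1"),
        ColumnRange {
            min_value: "v2".to_owned(),
            max_value: "v2".to_owned(),
        },
    )]));
    assert!(may_contain(&ranges, "tag1", "v2"));
    assert!(!may_contain(&ranges, "tag1", "x"));
    assert!(may_contain(&ranges, "tag2", "anything")); // unknown column
}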
#[derive(Debug, Clone)]
struct CachedPartition {
    sort_key: Option<Arc<PartitionSortKey>>,
    column_ranges: ColumnRanges,
}
impl CachedPartition {
    fn new(partition: Partition, table: &CachedTable) -> Self {
        let sort_key = partition
            .sort_key()
            .map(|sort_key| Arc::new(PartitionSortKey::new(sort_key, &table.column_id_map_rev)));

        let mut column_ranges =
            build_column_values(&table.partition_template, partition.partition_key.inner())
                .filter_map(|(col, val)| {
                    // Resolve the column name to an already-existing Arc for cheaper storage.
                    let col = Arc::clone(table.column_id_map_rev.get_key_value(col)?.0);

                    let range = match val {
                        ColumnValue::Identity(s) => {
                            let s = Arc::new(ScalarValue::from(s.as_ref()));
                            ColumnRange {
                                min_value: Arc::clone(&s),
                                max_value: s,
                            }
                        }
                        ColumnValue::Prefix(p) => {
                            if p.is_empty() {
                                // full range => value is useless
                                return None;
                            }

                            // If the partition only has a prefix of the tag value (i.e. it was
                            // truncated), then form a conservative range:
                            //
                            // # Minimum
                            // Use the prefix itself.
                            //
                            // Note that the minimum is inclusive.
                            //
                            // All values in the partition are either:
                            // - identical to the prefix, in which case they are included by the
                            //   inclusive minimum
                            // - of the form `"<prefix><s>"`, and `"<prefix><s>" > "<prefix>"`
                            //   holds for all strings `"<s>"`
                            //
                            // # Maximum
                            // Use `"<prefix_excluding_last_char><char::max>"`.
                            //
                            // Note that the maximum is inclusive.
                            //
                            // All strings in this partition must be smaller than this constructed
                            // maximum, because string comparison is front-to-back and
                            // `"<prefix_excluding_last_char><char::max>" > "<prefix>"`.
                            let min_value = Arc::new(ScalarValue::from(p.as_ref()));

                            let mut chars = p.as_ref().chars().collect::<Vec<_>>();
                            *chars.last_mut().expect("checked that prefix is not empty") =
                                std::char::MAX;
                            let max_value = Arc::new(ScalarValue::from(
                                chars.into_iter().collect::<String>().as_str(),
                            ));

                            ColumnRange {
                                min_value,
                                max_value,
                            }
                        }
                    };

                    Some((col, range))
                })
                .collect::<HashMap<_, _>>();
        column_ranges.shrink_to_fit();

        Self {
            sort_key,
            column_ranges: Arc::new(column_ranges),
        }
    }
    /// RAM-bytes EXCLUDING `self`.
    fn size(&self) -> usize {
        // Arc content
@@ -159,6 +263,13 @@ impl CachedPartition {
            .as_ref()
            .map(|sk| sk.size())
            .unwrap_or_default()
            // `column_ranges` Arc content: the map header, its entry storage,
            // and the heap data of the keys and scalar values.
            + std::mem::size_of::<HashMap<Arc<str>, ColumnRange>>()
            + (self.column_ranges.capacity() * std::mem::size_of::<(Arc<str>, ColumnRange)>())
            + self
                .column_ranges
                .iter()
                .map(|(col, range)| col.len() + range.min_value.size() + range.max_value.size())
                .sum::<usize>()
    }
}
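
The truncated-prefix case is the subtle part of the decoding above. As a hedged, standalone sketch of just that range construction (the `prefix_range` helper is hypothetical, with `String` standing in for `ScalarValue`):

/// Conservative range for a truncated tag value, mirroring the logic above;
/// assumes a non-empty prefix.
fn prefix_range(prefix: &str) -> (String, String) {
    // Minimum (inclusive): the prefix itself. Every value in the partition
    // either equals the prefix or extends it, and any extension sorts after
    // the prefix.
    let min = prefix.to_owned();

    // Maximum (inclusive): replace the last char with `char::MAX`. String
    // comparison is front-to-back, so all values sharing the prefix sort at
    // or before this bound.
    let mut chars = prefix.chars().collect::<Vec<_>>();
    *chars.last_mut().expect("non-empty prefix") = char::MAX;
    let max = chars.into_iter().collect::<String>();

    (min, max)
}

fn main() {
    let (min, max) = prefix_range("v2");
    assert_eq!(min, "v2");
    assert_eq!(max, "v\u{10FFFF}");
    // The original, un-truncated value (e.g. "v2foo") falls inside the range.
    assert!(min.as_str() <= "v2foo" && "v2foo" <= max.as_str());
}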
@@ -206,6 +317,9 @@ mod tests {
    use super::*;
    use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
    use data_types::{partition_template::TablePartitionTemplateOverride, ColumnType};
    use generated_types::influxdata::iox::partition_template::v1::{
        template_part::Part, PartitionTemplate, TemplatePart,
    };
    use iox_tests::TestCatalog;
    use schema::{Schema, SchemaBuilder};
@@ -294,6 +408,198 @@
        }
    }
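
The partition keys in the test below use the template language's markers, per the inline comments: `|` separates template parts, `!` encodes NULL, `^` the empty string, and a trailing `#` marks a truncated value. A hedged, self-contained sketch of that decoding step (a stand-in for `build_column_values` in `data_types::partition_template`, not its actual implementation, which also handles escaping):

/// Stand-in for `data_types::partition_template::ColumnValue`.
#[derive(Debug, PartialEq)]
enum ColumnValue<'a> {
    /// The full tag value.
    Identity(&'a str),
    /// Only a prefix of the tag value (the original value was longer).
    Prefix(&'a str),
}

/// Hypothetical decoder for a single template part, mirroring the markers
/// used by the partition keys in the test below.
fn decode_part(part: &str) -> Option<ColumnValue<'_>> {
    match part {
        // `!` encodes NULL => no known value.
        "!" => None,
        // `^` encodes the empty string.
        "^" => Some(ColumnValue::Identity("")),
        // A trailing `#` marks a truncated value => only a prefix is known.
        p if p.ends_with('#') => Some(ColumnValue::Prefix(&p[..p.len() - 1])),
        // Anything else is the full value.
        p => Some(ColumnValue::Identity(p)),
    }
}

fn main() {
    assert_eq!(decode_part("v2"), Some(ColumnValue::Identity("v2")));
    assert_eq!(decode_part("!"), None);
    assert_eq!(decode_part("^"), Some(ColumnValue::Identity("")));
    assert_eq!(decode_part("v2#"), Some(ColumnValue::Prefix("v2")));
}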
    #[tokio::test]
    async fn test_column_ranges() {
        let catalog = TestCatalog::new();

        let ns = catalog.create_namespace_1hr_retention("ns").await;
        let t = ns
            .create_table_with_partition_template(
                "table",
                Some(PartitionTemplate {
                    parts: vec![
                        TemplatePart {
                            part: Some(Part::TagValue(String::from("tag2"))),
                        },
                        TemplatePart {
                            part: Some(Part::TagValue(String::from("tag1"))),
                        },
                    ],
                }),
            )
            .await;
        let c1 = t.create_column("tag1", ColumnType::Tag).await;
        let c2 = t.create_column("tag2", ColumnType::Tag).await;
        let c3 = t.create_column("tag3", ColumnType::Tag).await;
        let c4 = t.create_column("time", ColumnType::Time).await;

        // See `data_types::partition_template` for the template language.
        // Two normal values.
        let p1 = t.create_partition("v1|v2").await.partition.clone();
        // 2nd part is NULL
        let p2 = t.create_partition("v1|!").await.partition.clone();
        // 2nd part is empty
        let p3 = t.create_partition("v1|^").await.partition.clone();
        // 2nd part is truncated (i.e. the original value was longer)
        let p4 = t.create_partition("v1|v2#").await.partition.clone();
        // 2nd part is truncated to the empty string
        let p5 = t.create_partition("v1|#").await.partition.clone();

        let cached_table = Arc::new(CachedTable {
            id: t.table.id,
            schema: schema(),
            column_id_map: HashMap::from([
                (c1.column.id, Arc::from(c1.column.name.clone())),
                (c2.column.id, Arc::from(c2.column.name.clone())),
                (c3.column.id, Arc::from(c3.column.name.clone())),
                (c4.column.id, Arc::from(c4.column.name.clone())),
            ]),
            column_id_map_rev: HashMap::from([
                (Arc::from(c1.column.name.clone()), c1.column.id),
                (Arc::from(c2.column.name.clone()), c2.column.id),
                (Arc::from(c3.column.name.clone()), c3.column.id),
                (Arc::from(c4.column.name.clone()), c4.column.id),
            ]),
            primary_key_column_ids: [c1.column.id, c2.column.id, c3.column.id, c4.column.id].into(),
            partition_template: t.table.partition_template.clone(),
        });

        let cache = PartitionCache::new(
            catalog.catalog(),
            BackoffConfig::default(),
            catalog.time_provider(),
            &catalog.metric_registry(),
            test_ram_pool(),
            true,
        );

        let ranges1a = cache
            .column_ranges(Arc::clone(&cached_table), p1.id, None)
            .await
            .unwrap();
        assert_eq!(
            ranges1a.as_ref(),
            &HashMap::from([
                (
                    Arc::from("tag1"),
                    ColumnRange {
                        min_value: Arc::new(ScalarValue::from("v2")),
                        max_value: Arc::new(ScalarValue::from("v2"))
                    }
                ),
                (
                    Arc::from("tag2"),
                    ColumnRange {
                        min_value: Arc::new(ScalarValue::from("v1")),
                        max_value: Arc::new(ScalarValue::from("v1"))
                    }
                ),
            ]),
        );
        assert!(Arc::ptr_eq(
            &ranges1a.get("tag1").unwrap().min_value,
            &ranges1a.get("tag1").unwrap().max_value,
        ));
        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);

        let ranges2 = cache
            .column_ranges(Arc::clone(&cached_table), p2.id, None)
            .await
            .unwrap();
        assert_eq!(
            ranges2.as_ref(),
            &HashMap::from([(
                Arc::from("tag2"),
                ColumnRange {
                    min_value: Arc::new(ScalarValue::from("v1")),
                    max_value: Arc::new(ScalarValue::from("v1"))
                }
            ),]),
        );
        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);

        let ranges3 = cache
            .column_ranges(Arc::clone(&cached_table), p3.id, None)
            .await
            .unwrap();
        assert_eq!(
            ranges3.as_ref(),
            &HashMap::from([
                (
                    Arc::from("tag1"),
                    ColumnRange {
                        min_value: Arc::new(ScalarValue::from("")),
                        max_value: Arc::new(ScalarValue::from(""))
                    }
                ),
                (
                    Arc::from("tag2"),
                    ColumnRange {
                        min_value: Arc::new(ScalarValue::from("v1")),
                        max_value: Arc::new(ScalarValue::from("v1"))
                    }
                ),
            ]),
        );
        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);

        let ranges4 = cache
            .column_ranges(Arc::clone(&cached_table), p4.id, None)
            .await
            .unwrap();
        assert_eq!(
            ranges4.as_ref(),
            &HashMap::from([
                (
                    Arc::from("tag1"),
                    ColumnRange {
                        min_value: Arc::new(ScalarValue::from("v2")),
                        max_value: Arc::new(ScalarValue::from("v\u{10FFFF}"))
                    }
                ),
                (
                    Arc::from("tag2"),
                    ColumnRange {
                        min_value: Arc::new(ScalarValue::from("v1")),
                        max_value: Arc::new(ScalarValue::from("v1"))
                    }
                ),
            ]),
        );
        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);

        let ranges5 = cache
            .column_ranges(Arc::clone(&cached_table), p5.id, None)
            .await
            .unwrap();
        assert_eq!(
            ranges5.as_ref(),
            &HashMap::from([(
                Arc::from("tag2"),
                ColumnRange {
                    min_value: Arc::new(ScalarValue::from("v1")),
                    max_value: Arc::new(ScalarValue::from("v1"))
                }
            ),]),
        );
        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);

        // A second read of the same partition hits the cache: same Arc, no new catalog query.
        let ranges1b = cache
            .column_ranges(Arc::clone(&cached_table), p1.id, None)
            .await
            .unwrap();
        assert!(Arc::ptr_eq(&ranges1a, &ranges1b));
        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);

        // non-existing partition
        for _ in 0..2 {
            let res = cache
                .column_ranges(Arc::clone(&cached_table), PartitionId::new(i64::MAX), None)
                .await;
            assert_eq!(res, None);
            assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 6);
        }
    }
    #[tokio::test]
    async fn test_cache_sharing() {
        let catalog = TestCatalog::new();
@@ -336,13 +642,22 @@ mod tests {
        cache
            .sort_key(Arc::clone(&cached_table), p3.id, &Vec::new(), None)
            .await;
        cache
            .column_ranges(Arc::clone(&cached_table), p3.id, None)
            .await;
        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);

        cache
            .sort_key(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
            .await;
        cache
            .column_ranges(Arc::clone(&cached_table), p2.id, None)
            .await;
        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);

        cache
            .column_ranges(Arc::clone(&cached_table), p1.id, None)
            .await;
        cache
            .sort_key(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
            .await;