fix: last cache with specific value columns can be queried (#25924)
parent 9a5424693c
commit 705a1659ad
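This change keys the last cache's behavior off its value-column configuration rather than a separate `accept_new_fields` flag, so a cache created with an explicit set of value columns keeps that set (and can be queried) instead of being handled like an accept-new cache. For orientation only, a minimal sketch of that distinction follows; the enum variants mirror the `ValueColumnType` referenced in the diff, but `ColumnId` and the helper function are simplified stand-ins, not the actual influxdb3 definitions:

// Minimal sketch (assumed types, not the real influxdb3 definitions):
// a last cache either accepts new field columns as they appear, or is
// pinned to an explicit list of value columns chosen at creation time.
use std::collections::HashSet;

type ColumnId = u32; // stand-in for influxdb3's column id type

enum ValueColumnType {
    // Cache grows as new fields show up; `seen` tracks which columns
    // have already been folded into the cache schema.
    AcceptNew { seen: HashSet<ColumnId> },
    // Cache only ever stores this fixed set of value columns.
    Explicit { columns: Vec<ColumnId> },
}

// The diff threads this configuration through `LastCacheStore` in place of
// the old `accept_new_fields: bool`, deriving an optional explicit column
// list that the store can branch on when pushing rows or building batches.
fn value_column_ids(value_columns: &ValueColumnType) -> Option<Vec<ColumnId>> {
    match value_columns {
        ValueColumnType::AcceptNew { .. } => None,
        ValueColumnType::Explicit { columns } => Some(columns.clone()),
    }
}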
@@ -319,10 +319,6 @@ impl LastCache {
         Arc::clone(&self.schema)
     }
 
-    fn accept_new_fields(&self) -> bool {
-        matches!(self.value_columns, ValueColumnType::AcceptNew { .. })
-    }
-
     fn should_update_schema_from_row(&self, row: &Row) -> bool {
         match &self.value_columns {
             ValueColumnType::AcceptNew { seen } => row.fields.iter().any(|f| !seen.contains(&f.id)),
@@ -352,7 +348,6 @@ impl LastCache {
             };
             values.push(value);
         }
-        let accept_new_fields = self.accept_new_fields();
         let mut target = &mut self.state;
         let mut iter = self.key_column_ids.iter().zip(values).peekable();
         while let (Some((col_id, value)), peek) = (iter.next(), iter.peek()) {
@@ -380,7 +375,7 @@ impl LastCache {
                     Arc::clone(&table_def),
                     Arc::clone(&self.key_column_ids),
                     &self.series_key,
-                    accept_new_fields,
+                    &self.value_columns,
                 ))
             }
         });
@@ -393,7 +388,7 @@ impl LastCache {
                 Arc::clone(&table_def),
                 Arc::clone(&self.key_column_ids),
                 &self.series_key,
-                accept_new_fields,
+                &self.value_columns,
             ));
         }
         let store = target.as_store_mut().expect(
@@ -796,7 +791,7 @@ struct LastCacheStore {
     /// shared with the parent `LastCache`.
     key_column_ids: Arc<IndexSet<ColumnId>>,
     /// Whether or not this store accepts new fields when they are added to the cached table
-    accept_new_fields: bool,
+    value_column_ids: Option<Vec<ColumnId>>,
     /// A ring buffer holding the instants at which entries in the cache were inserted
     ///
     /// This is used to evict cache values that outlive the `ttl`
@@ -819,19 +814,49 @@ impl LastCacheStore {
         table_def: Arc<TableDefinition>,
         key_column_ids: Arc<IndexSet<ColumnId>>,
         series_keys: &HashSet<ColumnId>,
-        accept_new_fields: bool,
+        value_columns: &ValueColumnType,
     ) -> Self {
-        let cache = table_def
-            .columns
-            .iter()
-            .filter(|&(col_id, _)| (!key_column_ids.contains(col_id)))
-            .map(|(col_id, col_def)| {
-                (
-                    *col_id,
-                    CacheColumn::new(col_def.data_type, count, series_keys.contains(col_id)),
-                )
-            })
-            .collect();
+        let (cache, value_column_ids) = match value_columns {
+            ValueColumnType::AcceptNew { .. } => {
+                let cache = table_def
+                    .columns
+                    .iter()
+                    .filter(|&(col_id, _)| (!key_column_ids.contains(col_id)))
+                    .map(|(col_id, col_def)| {
+                        (
+                            *col_id,
+                            CacheColumn::new(
+                                col_def.data_type,
+                                count,
+                                series_keys.contains(col_id),
+                            ),
+                        )
+                    })
+                    .collect();
+                (cache, None)
+            }
+            ValueColumnType::Explicit { columns } => {
+                let cache = columns
+                    .iter()
+                    .map(|id| {
+                        table_def
+                            .column_definition_by_id(id)
+                            .expect("valid column id")
+                    })
+                    .map(|col_def| {
+                        (
+                            col_def.id,
+                            CacheColumn::new(
+                                col_def.data_type,
+                                count,
+                                series_keys.contains(&col_def.id),
+                            ),
+                        )
+                    })
+                    .collect();
+                (cache, Some(columns.clone()))
+            }
+        };
         Self {
             cache,
             key_column_ids,
@@ -839,7 +864,7 @@ impl LastCacheStore {
             count,
             ttl,
             last_time: Time::from_timestamp_nanos(0),
-            accept_new_fields,
+            value_column_ids,
         }
     }

@@ -865,35 +890,38 @@ impl LastCacheStore {
             return;
         }
         let mut seen = HashSet::<ColumnId>::new();
-        if self.accept_new_fields {
-            // Check the length before any rows are added to ensure that the correct amount
-            // of nulls are back-filled when new fields/columns are added:
-            let starting_cache_size = self.len();
-            for field in row.fields.iter() {
-                seen.insert(field.id);
-                if let Some(col) = self.cache.get_mut(&field.id) {
-                    // In this case, the field already has an entry in the cache, so just push:
-                    col.push(&field.value);
-                } else if !self.key_column_ids.contains(&field.id) {
-                    // In this case, there is not an entry for the field in the cache, so if the
-                    // value is not one of the key columns, then it is a new field being added.
-                    let col = self.cache.entry(field.id).or_insert_with(|| {
-                        CacheColumn::new(data_type_from_buffer_field(field), self.count, false)
-                    });
-                    // Back-fill the new cache entry with nulls, then push the new value:
-                    for _ in 0..starting_cache_size {
-                        col.push_null();
+        match self.value_column_ids {
+            Some(_) => {
+                for field in row.fields.iter() {
+                    seen.insert(field.id);
+                    if let Some(c) = self.cache.get_mut(&field.id) {
+                        c.push(&field.value);
                     }
-                    col.push(&field.value);
                 }
-                // There is no else block, because the only alternative would be that this is a
-                // key column, which we ignore.
             }
-        } else {
-            for field in row.fields.iter() {
-                seen.insert(field.id);
-                if let Some(c) = self.cache.get_mut(&field.id) {
-                    c.push(&field.value);
+            None => {
+                // Check the length before any rows are added to ensure that the correct amount
+                // of nulls are back-filled when new fields/columns are added:
+                let starting_cache_size = self.len();
+                for field in row.fields.iter() {
+                    seen.insert(field.id);
+                    if let Some(col) = self.cache.get_mut(&field.id) {
+                        // In this case, the field already has an entry in the cache, so just push:
+                        col.push(&field.value);
+                    } else if !self.key_column_ids.contains(&field.id) {
+                        // In this case, there is not an entry for the field in the cache, so if the
+                        // value is not one of the key columns, then it is a new field being added.
+                        let col = self.cache.entry(field.id).or_insert_with(|| {
+                            CacheColumn::new(data_type_from_buffer_field(field), self.count, false)
+                        });
+                        // Back-fill the new cache entry with nulls, then push the new value:
+                        for _ in 0..starting_cache_size {
+                            col.push_null();
+                        }
+                        col.push(&field.value);
+                    }
+                    // There is no else block, because the only alternative would be that this is a
+                    // key column, which we ignore.
                 }
             }
         }
@@ -929,29 +957,34 @@ impl LastCacheStore {
         n_non_expired: usize,
     ) -> Result<RecordBatch, ArrowError> {
         let mut arrays = extended.unwrap_or_default();
-        if self.accept_new_fields {
-            for field in schema.fields().iter() {
-                let id = table_def
-                    .column_name_to_id(field.name().as_str())
-                    .ok_or_else(|| {
-                        ArrowError::from_external_error(Box::new(Error::ColumnDoesNotExistByName {
-                            column_name: field.name().to_string(),
-                        }))
-                    })?;
-                if self.key_column_ids.contains(&id) {
-                    continue;
-                }
-                arrays.push(self.cache.get(&id).map_or_else(
-                    || new_null_array(field.data_type(), n_non_expired),
-                    |c| c.data.as_array(n_non_expired),
-                ));
+        match self.value_column_ids {
+            Some(_) => {
+                arrays.extend(
+                    self.cache
+                        .iter()
+                        .map(|(_, col)| col.data.as_array(n_non_expired)),
+                );
             }
+            None => {
+                for field in schema.fields().iter() {
+                    let id = table_def
+                        .column_name_to_id(field.name().as_str())
+                        .ok_or_else(|| {
+                            ArrowError::from_external_error(Box::new(
+                                Error::ColumnDoesNotExistByName {
+                                    column_name: field.name().to_string(),
+                                },
+                            ))
+                        })?;
+                    if self.key_column_ids.contains(&id) {
+                        continue;
+                    }
+                    arrays.push(self.cache.get(&id).map_or_else(
+                        || new_null_array(field.data_type(), n_non_expired),
+                        |c| c.data.as_array(n_non_expired),
+                    ));
+                }
+            }
-        } else {
-            arrays.extend(
-                self.cache
-                    .iter()
-                    .map(|(_, col)| col.data.as_array(n_non_expired)),
-            );
         }
         RecordBatch::try_new(schema, arrays)
     }
@@ -1631,4 +1631,115 @@ mod tests {
             );
         }
     }
+
+    #[test_log::test(tokio::test)]
+    async fn test_non_specified_key_val_cols() {
+        let writer = TestWriter::new();
+        let _ = writer.write_lp_to_write_batch("cpu,region=us-east,host=a usage=99,temp=88", 0);
+
+        // create a last cache provider so we can use it to create our UDTF provider:
+        let db_schema = writer.db_schema();
+        let table_def = db_schema.table_definition("cpu").unwrap();
+        let provider = LastCacheProvider::new_from_catalog(writer.catalog()).unwrap();
+        let usage_col_id = table_def.column_name_to_id("usage").unwrap();
+        provider
+            .create_cache(
+                db_schema.id,
+                None,
+                CreateLastCacheArgs {
+                    table_def,
+                    count: LastCacheSize::default(),
+                    ttl: LastCacheTtl::default(),
+                    key_columns: LastCacheKeyColumnsArg::SeriesKey,
+                    value_columns: LastCacheValueColumnsArg::Explicit(vec![usage_col_id]),
+                },
+            )
+            .unwrap();
+
+        let write_batch = writer.write_lp_to_write_batch(
+            "\
+            cpu,region=us-east,host=a usage=77,temp=66\n\
+            cpu,region=us-east,host=b usage=77,temp=66\n\
+            cpu,region=us-west,host=c usage=77,temp=66\n\
+            cpu,region=us-west,host=d usage=77,temp=66\n\
+            cpu,region=ca-east,host=e usage=77,temp=66\n\
+            cpu,region=ca-cent,host=f usage=77,temp=66\n\
+            cpu,region=ca-west,host=g usage=77,temp=66\n\
+            cpu,region=ca-west,host=h usage=77,temp=66\n\
+            cpu,region=eu-cent,host=i usage=77,temp=66\n\
+            cpu,region=eu-cent,host=j usage=77,temp=66\n\
+            cpu,region=eu-west,host=k usage=77,temp=66\n\
+            cpu,region=eu-west,host=l usage=77,temp=66\n\
+            ",
+            1_000,
+        );
+
+        let wal_contents = influxdb3_wal::create::wal_contents(
+            (0, 1, 0),
+            [influxdb3_wal::create::write_batch_op(write_batch)],
+        );
+        provider.write_wal_contents_to_cache(&wal_contents);
+
+        let ctx = SessionContext::new();
+        let last_cache_fn = LastCacheFunction::new(db_schema.id, Arc::clone(&provider));
+        ctx.register_udtf(LAST_CACHE_UDTF_NAME, Arc::new(last_cache_fn));
+
+        struct TestCase<'a> {
+            sql: &'a str,
+            expected: &'a [&'a str],
+        }
+
+        let test_cases = [
+            TestCase {
+                sql: "select * from last_cache('cpu')",
+                expected: &[
+                    "+---------+------+-------+-----------------------------+",
+                    "| region  | host | usage | time                        |",
+                    "+---------+------+-------+-----------------------------+",
+                    "| ca-cent | f    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| ca-east | e    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| ca-west | g    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| ca-west | h    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| eu-cent | i    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| eu-cent | j    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| eu-west | k    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| eu-west | l    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| us-east | a    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| us-east | b    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| us-west | c    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| us-west | d    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "+---------+------+-------+-----------------------------+",
+                ],
+            },
+            TestCase {
+                sql: "select * from last_cache('cpu') WHERE region = 'eu-west'",
+                expected: &[
+                    "+---------+------+-------+-----------------------------+",
+                    "| region  | host | usage | time                        |",
+                    "+---------+------+-------+-----------------------------+",
+                    "| eu-west | k    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| eu-west | l    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "+---------+------+-------+-----------------------------+",
+                ],
+            },
+            TestCase {
+                sql: "select * from last_cache('cpu') WHERE region IN ('eu-west', 'eu-cent')",
+                expected: &[
+                    "+---------+------+-------+-----------------------------+",
+                    "| region  | host | usage | time                        |",
+                    "+---------+------+-------+-----------------------------+",
+                    "| eu-cent | i    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| eu-cent | j    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| eu-west | k    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "| eu-west | l    | 77.0  | 1970-01-01T00:00:00.000001Z |",
+                    "+---------+------+-------+-----------------------------+",
+                ],
+            },
+        ];
+
+        for t in test_cases {
+            let results = ctx.sql(t.sql).await.unwrap().collect().await.unwrap();
+            assert_batches_sorted_eq!(t.expected, &results);
+        }
+    }
 }