feat: configurable last cache eviction (#25520)

* refactor: make last cache eviction optional

This changes how the last cache is evicted. Eviction no longer runs on every
write to the cache; instead, an optional constructor creates a last cache
provider that runs eviction in a background task on a specified interval.

Separately, when records are produced from the cache, only those that have
not expired are produced. (A rough sketch of both behaviors follows the
change list below.)

This should reduce lock contention on the cache and hopefully improve performance.

* feat: configurable last cache eviction interval

* docs: clean up var names, code docs, and comments
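
In sketch form, the eviction and read behaviors described above look roughly like this (a minimal illustration using tokio and parking_lot; SketchCache and its members are stand-ins rather than the real influxdb3 types — the actual background_eviction_process appears in the diff below):

    use std::sync::Arc;
    use std::time::{Duration, Instant};

    use parking_lot::RwLock;

    struct SketchCache {
        /// One insertion timestamp per cached entry.
        instants: RwLock<Vec<Instant>>,
        /// Time-to-live for entries.
        ttl: Duration,
    }

    impl SketchCache {
        /// Read path: count (or emit) only entries younger than the TTL,
        /// without evicting anything or taking a write lock.
        fn live_len(&self) -> usize {
            self.instants
                .read()
                .iter()
                .filter(|i| i.elapsed() < self.ttl)
                .count()
        }

        /// Eviction: drop expired entries; called only from the background
        /// task, never from the write path.
        fn evict_expired(&self) {
            self.instants.write().retain(|i| i.elapsed() < self.ttl);
        }
    }

    /// Spawn the periodic eviction task; dropping the handle detaches it.
    fn spawn_eviction(cache: Arc<SketchCache>, every: Duration) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(every);
            loop {
                ticker.tick().await;
                cache.evict_expired();
            }
        })
    }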
6 changed files with 113 additions and 61 deletions


@@ -271,6 +271,16 @@ pub struct Config {
         action
     )]
     pub telemetry_endpoint: String,
+
+    /// The interval on which to evict expired entries from the Last-N-Value cache, expressed as a
+    /// human-readable time, e.g., "20s", "1m", "1h".
+    #[clap(
+        long = "last-cache-eviction-interval",
+        env = "INFLUXDB3_LAST_CACHE_EVICTION_INTERVAL",
+        default_value = "1m",
+        action
+    )]
+    pub last_cache_eviction_interval: humantime::Duration,
 }
 
 /// Specified size of the Parquet cache in megabytes (MB)
@@ -436,15 +446,18 @@ pub async fn command(config: Config) -> Result<()> {
             .map_err(Error::InitializePersistedCatalog)?,
     );
 
-    let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
-        .map_err(Error::InitializeLastCache)?;
+    let last_cache = LastCacheProvider::new_from_catalog_with_background_eviction(
+        Arc::clone(&catalog) as _,
+        config.last_cache_eviction_interval.into(),
+    )
+    .map_err(Error::InitializeLastCache)?;
 
     info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");
 
     let write_buffer_impl = Arc::new(
         WriteBufferImpl::new(
             Arc::clone(&persister),
             Arc::clone(&catalog),
-            Arc::new(last_cache),
+            last_cache,
             Arc::<SystemProvider>::clone(&time_provider),
             Arc::clone(&exec),
             wal_config,


@@ -780,7 +780,7 @@ mod tests {
         influxdb3_write::write_buffer::WriteBufferImpl::new(
             Arc::clone(&persister),
             Arc::clone(&catalog),
-            Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
+            LastCacheProvider::new_from_catalog(catalog as _).unwrap(),
             Arc::<MockProvider>::clone(&time_provider),
             Arc::clone(&exec),
             WalConfig::test_config(),


@@ -674,7 +674,7 @@ mod tests {
         WriteBufferImpl::new(
             Arc::clone(&persister),
             Arc::clone(&catalog),
-            Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
+            LastCacheProvider::new_from_catalog(catalog as _).unwrap(),
             Arc::<MockProvider>::clone(&time_provider),
             Arc::clone(&exec),
             WalConfig {


@@ -119,11 +119,11 @@ pub struct CreateCacheArguments {
 
 impl LastCacheProvider {
     /// Initialize a [`LastCacheProvider`] from a [`Catalog`]
-    pub fn new_from_catalog(catalog: Arc<Catalog>) -> Result<Self, Error> {
-        let provider = LastCacheProvider {
+    pub fn new_from_catalog(catalog: Arc<Catalog>) -> Result<Arc<Self>, Error> {
+        let provider = Arc::new(LastCacheProvider {
             catalog: Arc::clone(&catalog),
             cache_map: Default::default(),
-        };
+        });
         for db_schema in catalog.list_db_schema() {
             for table_def in db_schema.tables() {
                 for (cache_name, cache_def) in table_def.last_caches() {
@@ -169,6 +169,20 @@ impl LastCacheProvider {
                 }
             }
         }
 
         Ok(provider)
     }
+
+    /// Initialize a [`LastCacheProvider`] from a [`Catalog`] and run a background process to
+    /// evict expired entries from the cache
+    pub fn new_from_catalog_with_background_eviction(
+        catalog: Arc<Catalog>,
+        eviction_interval: Duration,
+    ) -> Result<Arc<Self>, Error> {
+        let provider = Self::new_from_catalog(catalog)?;
+
+        background_eviction_process(Arc::clone(&provider), eviction_interval);
+
+        Ok(provider)
+    }
@@ -569,6 +583,22 @@ impl LastCacheProvider {
     }
 }
 
+fn background_eviction_process(
+    provider: Arc<LastCacheProvider>,
+    eviction_interval: Duration,
+) -> tokio::task::JoinHandle<()> {
+    tokio::spawn(async move {
+        let mut interval = tokio::time::interval(eviction_interval);
+        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+        loop {
+            interval.tick().await;
+
+            provider.evict_expired_cache_entries();
+        }
+    })
+}
+
 fn last_cache_schema_from_table_def(
     table_def: Arc<TableDefinition>,
     key_columns: Vec<ColumnId>,
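
Two details worth noting in the hunk above: new_from_catalog_with_background_eviction discards the JoinHandle returned by background_eviction_process, so the eviction task runs detached for the life of the process (a caller that kept the handle could stop the task via its abort() method). And because the interval is configured with MissedTickBehavior::Skip, an eviction pass that overruns the interval causes the missed ticks to be skipped rather than fired in a burst, so passes never pile up behind one another.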
@@ -1011,7 +1041,9 @@ impl<'a> ExtendedLastCacheState<'a> {
             .state
             .as_store()
             .expect("should only be calling to_record_batch when using a store");
-        let n = store.len();
+        // Determine the number of elements that have not expired up front, so that the value used
+        // is consistent in the chain of methods used to produce record batches below:
+        let n_non_expired = store.len();
         let extended: Option<Vec<ArrayRef>> = if self.key_column_values.is_empty() {
             None
         } else {
@@ -1021,28 +1053,28 @@ impl<'a> ExtendedLastCacheState<'a> {
                     .map(|value| match value {
                         KeyValue::String(v) => {
                             let mut builder = StringBuilder::new();
-                            for _ in 0..n {
+                            for _ in 0..n_non_expired {
                                 builder.append_value(v);
                             }
                             Arc::new(builder.finish()) as ArrayRef
                         }
                         KeyValue::Int(v) => {
                             let mut builder = Int64Builder::new();
-                            for _ in 0..n {
+                            for _ in 0..n_non_expired {
                                 builder.append_value(*v);
                             }
                             Arc::new(builder.finish()) as ArrayRef
                         }
                         KeyValue::UInt(v) => {
                             let mut builder = UInt64Builder::new();
-                            for _ in 0..n {
+                            for _ in 0..n_non_expired {
                                 builder.append_value(*v);
                             }
                             Arc::new(builder.finish()) as ArrayRef
                         }
                         KeyValue::Bool(v) => {
                             let mut builder = BooleanBuilder::new();
-                            for _ in 0..n {
+                            for _ in 0..n_non_expired {
                                 builder.append_value(*v);
                             }
                             Arc::new(builder.finish()) as ArrayRef
@@ -1051,7 +1083,7 @@ impl<'a> ExtendedLastCacheState<'a> {
                     .collect(),
             )
         };
-        store.to_record_batch(table_def, schema, extended)
+        store.to_record_batch(table_def, schema, extended, n_non_expired)
     }
 }
@@ -1317,9 +1349,12 @@ impl LastCacheStore {
         }
     }
 
-    /// Get the number of values in the cache.
+    /// Get the number of values in the cache that have not expired past the TTL.
     fn len(&self) -> usize {
-        self.instants.len()
+        self.instants
+            .iter()
+            .filter(|i| i.elapsed() < self.ttl)
+            .count()
     }
 
     /// Check if the cache is empty
@@ -1388,11 +1423,16 @@ impl LastCacheStore {
     /// produced set of [`RecordBatch`]es. These are for the scenario where key columns are
     /// included in the outputted batches, as the [`LastCacheStore`] only holds the field columns
     /// for the cache.
+    ///
+    /// Accepts an `n_non_expired` argument to indicate the number of non-expired elements in the
+    /// store. This is passed in vs. calling `self.len()`, since that is already invoked in the
+    /// calling function, and calling it here _could_ produce a different result.
     fn to_record_batch(
         &self,
         table_def: Arc<TableDefinition>,
         schema: ArrowSchemaRef,
         extended: Option<Vec<ArrayRef>>,
+        n_non_expired: usize,
     ) -> Result<RecordBatch, ArrowError> {
         let mut arrays = extended.unwrap_or_default();
         if self.accept_new_fields {
@@ -1408,12 +1448,16 @@ impl LastCacheStore {
                     continue;
                 }
                 arrays.push(self.cache.get(&id).map_or_else(
-                    || new_null_array(field.data_type(), self.len()),
-                    |c| c.data.as_array(),
+                    || new_null_array(field.data_type(), n_non_expired),
+                    |c| c.data.as_array(n_non_expired),
                 ));
             }
         } else {
-            arrays.extend(self.cache.iter().map(|(_, col)| col.data.as_array()));
+            arrays.extend(
+                self.cache
+                    .iter()
+                    .map(|(_, col)| col.data.as_array(n_non_expired)),
+            );
         }
         RecordBatch::try_new(schema, arrays)
     }
@@ -1423,7 +1467,7 @@ impl LastCacheStore {
     /// Returns whether or not the store is empty after expired entries are removed.
     fn remove_expired(&mut self) -> bool {
         while let Some(instant) = self.instants.back() {
-            if instant.elapsed() > self.ttl {
+            if instant.elapsed() >= self.ttl {
                 self.instants.pop_back();
             } else {
                 break;
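
Note that the comparison also tightens from > to >=, so an entry whose age exactly equals the TTL now counts as expired. This keeps remove_expired consistent with the read-side len() above, which retains only entries with elapsed() < ttl.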
@@ -1594,11 +1638,15 @@ impl CacheColumnData {
     }
 
     /// Produce an arrow [`ArrayRef`] from this column for the sake of producing [`RecordBatch`]es
-    fn as_array(&self) -> ArrayRef {
+    ///
+    /// Accepts `n_non_expired` to indicate how many of the first elements in the column buffer to
+    /// take, i.e., those that have not yet expired. That value is determined externally by the
+    /// [`LastCacheStore`] that tracks TTL.
+    fn as_array(&self, n_non_expired: usize) -> ArrayRef {
         match self {
             CacheColumnData::I64(buf) => {
                 let mut b = Int64Builder::new();
-                buf.iter().for_each(|val| match val {
+                buf.iter().take(n_non_expired).for_each(|val| match val {
                     Some(v) => b.append_value(*v),
                     None => b.append_null(),
                 });
@@ -1606,7 +1654,7 @@
             }
             CacheColumnData::U64(buf) => {
                 let mut b = UInt64Builder::new();
-                buf.iter().for_each(|val| match val {
+                buf.iter().take(n_non_expired).for_each(|val| match val {
                     Some(v) => b.append_value(*v),
                     None => b.append_null(),
                 });
@@ -1614,7 +1662,7 @@
             }
             CacheColumnData::F64(buf) => {
                 let mut b = Float64Builder::new();
-                buf.iter().for_each(|val| match val {
+                buf.iter().take(n_non_expired).for_each(|val| match val {
                     Some(v) => b.append_value(*v),
                     None => b.append_null(),
                 });
@@ -1622,7 +1670,7 @@
             }
             CacheColumnData::String(buf) => {
                 let mut b = StringBuilder::new();
-                buf.iter().for_each(|val| match val {
+                buf.iter().take(n_non_expired).for_each(|val| match val {
                     Some(v) => b.append_value(v),
                     None => b.append_null(),
                 });
@@ -1630,7 +1678,7 @@
             }
             CacheColumnData::Bool(buf) => {
                 let mut b = BooleanBuilder::new();
-                buf.iter().for_each(|val| match val {
+                buf.iter().take(n_non_expired).for_each(|val| match val {
                     Some(v) => b.append_value(*v),
                     None => b.append_null(),
                 });
@@ -1639,7 +1687,7 @@
             CacheColumnData::Tag(buf) => {
                 let mut b: GenericByteDictionaryBuilder<Int32Type, GenericStringType<i32>> =
                     StringDictionaryBuilder::new();
-                buf.iter().for_each(|val| match val {
+                buf.iter().take(n_non_expired).for_each(|val| match val {
                     Some(v) => b.append_value(v),
                     None => b.append_null(),
                 });
@@ -1648,14 +1696,16 @@
             CacheColumnData::Key(buf) => {
                 let mut b: GenericByteDictionaryBuilder<Int32Type, GenericStringType<i32>> =
                     StringDictionaryBuilder::new();
-                buf.iter().for_each(|val| {
+                buf.iter().take(n_non_expired).for_each(|val| {
                     b.append_value(val);
                 });
                 Arc::new(b.finish())
             }
             CacheColumnData::Time(buf) => {
                 let mut b = TimestampNanosecondBuilder::new();
-                buf.iter().for_each(|val| b.append_value(*val));
+                buf.iter()
+                    .take(n_non_expired)
+                    .for_each(|val| b.append_value(*val));
                 Arc::new(b.finish())
             }
         }
@@ -1721,7 +1771,7 @@ mod tests {
         WriteBufferImpl::new(
             persister,
             Arc::clone(&catalog),
-            Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
+            LastCacheProvider::new_from_catalog(catalog as _).unwrap(),
             time_provider,
             crate::test_help::make_exec(),
             WalConfig::test_config(),
@@ -2289,7 +2339,7 @@ mod tests {
             Some("cache"),
             // use a cache size greater than 1 to ensure the TTL is doing the evicting
             Some(10),
-            Some(Duration::from_millis(50)),
+            Some(Duration::from_millis(1000)),
             Some(vec![
                 (region_col_id, "region".into()),
                 (host_col_id, "host".into()),
@@ -2345,25 +2395,7 @@ mod tests {
         );
 
         // wait for the TTL to clear the cache
-        tokio::time::sleep(Duration::from_millis(100)).await;
-
-        // the last cache eviction only happens when writes are flushed out to the buffer. If
-        // no writes are coming in, the last cache will still have data in it. So, we need to write
-        // some data to the buffer to trigger the last cache eviction.
-        wbuf.write_lp(
-            NamespaceName::new(db_name).unwrap(),
-            format!(
-                "\
-                {tbl_name},region=us,host=b usage=200\n\
-                "
-            )
-            .as_str(),
-            Time::from_timestamp_nanos(2_000),
-            false,
-            Precision::Nanosecond,
-        )
-        .await
-        .unwrap();
+        tokio::time::sleep(Duration::from_millis(1000)).await;
 
         // Check what is in the last cache:
         let batches = wbuf
@@ -2373,7 +2405,15 @@ mod tests {
             .unwrap();
 
         // The cache is completely empty after the TTL evicted data, so it will give back nothing:
-        assert_batches_sorted_eq!(["++", "++",], &batches);
+        assert_batches_sorted_eq!(
+            [
+                "+--------+------+------+-------+",
+                "| region | host | time | usage |",
+                "+--------+------+------+-------+",
+                "+--------+------+------+-------+",
+            ],
+            &batches
+        );
 
         // Ensure that records can be written to the cache again:
         wbuf.write_lp(
@@ -2446,7 +2486,7 @@ mod tests {
             tbl_id,
             Some("cache"),
             None,
-            Some(Duration::from_millis(50)),
+            None,
             Some(vec![
                 (component_id_col_id, "component_id".into()),
                 (active_col_id, "active".into()),


@@ -591,7 +591,7 @@ mod tests {
         let write_buffer = WriteBufferImpl::new(
             Arc::clone(&persister),
             catalog,
-            Arc::new(last_cache),
+            last_cache,
             Arc::clone(&time_provider),
             crate::test_help::make_exec(),
             WalConfig::test_config(),
@@ -665,7 +665,7 @@ mod tests {
         let write_buffer = WriteBufferImpl::new(
             Arc::clone(&persister),
             catalog,
-            Arc::new(last_cache),
+            last_cache,
             Arc::clone(&time_provider),
             crate::test_help::make_exec(),
             WalConfig {
@@ -723,7 +723,7 @@ mod tests {
         let wbuf = WriteBufferImpl::new(
             Arc::clone(&wbuf.persister),
             catalog,
-            Arc::new(last_cache),
+            last_cache,
             Arc::clone(&wbuf.time_provider),
             Arc::clone(&wbuf.buffer.executor),
             WalConfig {
@@ -761,7 +761,7 @@ mod tests {
         let wbuf = WriteBufferImpl::new(
             Arc::clone(&wbuf.persister),
             catalog,
-            Arc::new(last_cache),
+            last_cache,
             Arc::clone(&wbuf.time_provider),
             Arc::clone(&wbuf.buffer.executor),
             WalConfig {
@@ -818,7 +818,7 @@ mod tests {
         let wbuf = WriteBufferImpl::new(
             Arc::clone(&wbuf.persister),
             catalog,
-            Arc::new(last_cache),
+            last_cache,
             Arc::clone(&wbuf.time_provider),
             Arc::clone(&wbuf.buffer.executor),
             WalConfig {
@@ -974,7 +974,7 @@ mod tests {
         let write_buffer = WriteBufferImpl::new(
             Arc::clone(&write_buffer.persister),
             catalog,
-            Arc::new(last_cache),
+            last_cache,
             Arc::clone(&write_buffer.time_provider),
             Arc::clone(&write_buffer.buffer.executor),
             WalConfig {
@@ -1982,7 +1982,7 @@ mod tests {
         let wbuf = WriteBufferImpl::new(
             Arc::clone(&persister),
             catalog,
-            Arc::new(last_cache),
+            last_cache,
             Arc::clone(&time_provider),
             crate::test_help::make_exec(),
             wal_config,


@@ -125,9 +125,8 @@ impl QueryableBuffer {
     /// Called when the wal has persisted a new file. Buffer the contents in memory and update the last cache so the data is queryable.
     fn buffer_contents(&self, write: WalContents) {
-        let mut buffer = self.buffer.write();
-        self.last_cache_provider.evict_expired_cache_entries();
         self.last_cache_provider.write_wal_contents_to_cache(&write);
+        let mut buffer = self.buffer.write();
         buffer.buffer_ops(write.ops, &self.last_cache_provider);
     }
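
Net effect of this final hunk: the write path no longer evicts on every WAL flush. buffer_contents now only updates the cache (taking the buffer write lock after the cache update rather than before), while expiry is handled by the background task and, on reads, by filtering to non-expired entries.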