chore: porting the changes from pro (#25660)

move ringbuffer to be allocated on heap as the MAX_CAPACITY per event
type has gone up to 10k
pull/25663/head
praveen-influx 2024-12-16 15:12:19 +00:00 committed by GitHub
parent df84f9e68e
commit 9fa4932598
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 12 deletions

View File

@ -12,7 +12,7 @@ use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::debug;
use rand::Rng;
const MAX_WRITE_ITERATIONS: u32 = 5000;
const MAX_WRITE_ITERATIONS: u32 = 100_000;
#[allow(dead_code)]
#[derive(Debug)]

View File

@ -10,7 +10,7 @@ use arrow_array::RecordBatch;
use dashmap::DashMap;
use iox_time::TimeProvider;
const MAX_CAPACITY: usize = 1000;
const MAX_CAPACITY: usize = 10_000;
/// This trait is not dyn compatible
pub trait ToRecordBatch<E> {
@ -105,34 +105,41 @@ impl SysEventStore {
}
}
pub type RingBuffer<T> = RingBufferArray<T, MAX_CAPACITY>;
// we've increased the max capacity to 10k by default, it makes
// sense to use heap.
pub type RingBuffer<T> = RingBufferVec<T>;
pub struct RingBufferArray<T, const N: usize> {
buf: [Option<T>; N],
pub struct RingBufferVec<T> {
buf: Vec<T>,
max: usize,
write_index: usize,
}
impl<T, const N: usize> RingBufferArray<T, N> {
impl<T> RingBufferVec<T> {
fn new(capacity: usize) -> Self {
let buf_array: [Option<T>; N] = [const { None }; N];
Self {
buf: buf_array,
buf: Vec::with_capacity(capacity),
max: capacity,
write_index: 0,
}
}
fn push(&mut self, val: T) {
let _ = replace(&mut self.buf[self.write_index], Some(val));
if !self.is_at_max() {
self.buf.push(val);
} else {
let _ = replace(&mut self.buf[self.write_index], val);
}
self.write_index = (self.write_index + 1) % self.max;
}
fn is_at_max(&mut self) -> bool {
self.buf.len() >= self.max
}
pub fn in_order(&self) -> impl Iterator<Item = &T> {
let (head, tail) = self.buf.split_at(self.write_index);
tail.iter()
.chain(head.iter())
.filter_map(|item| item.as_ref())
tail.iter().chain(head.iter())
}
}