commit
2899f65ad1
|
@ -100,7 +100,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
|
|||
let mut cells = Vec::new();
|
||||
for col in 0..batch.num_columns() {
|
||||
let column = batch.column(col);
|
||||
cells.push(Cell::new(&array_value_to_string(column, row)?));
|
||||
cells.push(Cell::new(array_value_to_string(column, row)?));
|
||||
}
|
||||
table.add_row(cells);
|
||||
}
|
||||
|
|
|
@ -333,10 +333,7 @@ impl TestStateTtlAndRefresh {
|
|||
// set up "RNG" that always generates the maximum, so we can test things easier
|
||||
let rng_overwrite = StepRng::new(u64::MAX, 0);
|
||||
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(RefreshPolicy::new_inner(
|
||||
Arc::clone(&time_provider) as _,
|
||||
Arc::clone(&refresh_duration_provider) as _,
|
||||
|
@ -387,10 +384,7 @@ impl TestStateLRUAndRefresh {
|
|||
// set up "RNG" that always generates the maximum, so we can test things easier
|
||||
let rng_overwrite = StepRng::new(u64::MAX, 0);
|
||||
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(RefreshPolicy::new_inner(
|
||||
Arc::clone(&time_provider) as _,
|
||||
Arc::clone(&refresh_duration_provider) as _,
|
||||
|
@ -440,10 +434,7 @@ impl TestStateTtlAndLRU {
|
|||
let metric_registry = Arc::new(metric::Registry::new());
|
||||
let size_estimator = Arc::new(TestSizeEstimator::default());
|
||||
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
"my_cache",
|
||||
|
@ -484,10 +475,7 @@ impl TestStateLruAndRemoveIf {
|
|||
let metric_registry = Arc::new(metric::Registry::new());
|
||||
let size_estimator = Arc::new(TestSizeEstimator::default());
|
||||
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
|
||||
let pool = Arc::new(ResourcePool::new(
|
||||
"my_pool",
|
||||
|
@ -535,10 +523,7 @@ impl TestStateLruAndRefresh {
|
|||
// set up "RNG" that always generates the maximum, so we can test things easier
|
||||
let rng_overwrite = StepRng::new(u64::MAX, 0);
|
||||
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(RefreshPolicy::new_inner(
|
||||
Arc::clone(&time_provider) as _,
|
||||
Arc::clone(&refresh_duration_provider) as _,
|
||||
|
|
|
@ -856,7 +856,7 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(TestResourceEstimator {});
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
let policy_constructor = LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id",
|
||||
|
@ -879,15 +879,14 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(TestResourceEstimator {});
|
||||
|
||||
let mut backend1 =
|
||||
PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider) as _);
|
||||
let mut backend1 = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend1.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id",
|
||||
Arc::clone(&resource_estimator) as _,
|
||||
));
|
||||
|
||||
let mut backend2 = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend2 = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend2.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id",
|
||||
|
@ -905,8 +904,7 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(TestResourceEstimator {});
|
||||
|
||||
let mut backend1 =
|
||||
PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider) as _);
|
||||
let mut backend1 = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend1.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id",
|
||||
|
@ -915,7 +913,7 @@ mod tests {
|
|||
|
||||
// drop the backend so re-registering the same ID ("id") MUST NOT panic
|
||||
drop(backend1);
|
||||
let mut backend2 = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend2 = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend2.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id",
|
||||
|
@ -935,7 +933,7 @@ mod tests {
|
|||
|
||||
assert_eq!(pool.current().0, 0);
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id1",
|
||||
|
@ -955,8 +953,7 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(TestResourceEstimator {});
|
||||
|
||||
let mut backend =
|
||||
PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider) as _);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id1",
|
||||
|
@ -988,7 +985,7 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(TestResourceEstimator {});
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id1",
|
||||
|
@ -1015,7 +1012,7 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(TestResourceEstimator {});
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id1",
|
||||
|
@ -1049,16 +1046,14 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(TestResourceEstimator {});
|
||||
|
||||
let mut backend1 =
|
||||
PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider) as _);
|
||||
let mut backend1 = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend1.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id1",
|
||||
Arc::clone(&resource_estimator) as _,
|
||||
));
|
||||
|
||||
let mut backend2 =
|
||||
PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider) as _);
|
||||
let mut backend2 = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend2.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id2",
|
||||
|
@ -1190,8 +1185,7 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(TestResourceEstimator {});
|
||||
|
||||
let mut backend =
|
||||
PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider) as _);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id1",
|
||||
|
@ -1237,7 +1231,7 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(TestResourceEstimator {});
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id1",
|
||||
|
@ -1276,8 +1270,7 @@ mod tests {
|
|||
|
||||
let resource_estimator = Arc::new(Provider {});
|
||||
|
||||
let mut backend =
|
||||
PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider) as _);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id1",
|
||||
|
@ -1395,7 +1388,7 @@ mod tests {
|
|||
&Observation::U64Gauge(0)
|
||||
);
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id",
|
||||
|
@ -1522,7 +1515,7 @@ mod tests {
|
|||
));
|
||||
let resource_estimator = Arc::new(ZeroSizeProvider {});
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"id",
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
collections::VecDeque,
|
||||
collections::{HashMap, VecDeque},
|
||||
fmt::Debug,
|
||||
hash::Hash,
|
||||
marker::PhantomData,
|
||||
|
@ -39,78 +39,88 @@ macro_rules! lock_inner {
|
|||
|
||||
/// Backend that is controlled by different policies.
|
||||
///
|
||||
///
|
||||
/// # Policies & Recursion
|
||||
///
|
||||
/// Policies have two tasks:
|
||||
///
|
||||
/// - initiate changes (e.g. based on timers)
|
||||
/// - react to changes
|
||||
///
|
||||
/// Getting data from a [`PolicyBackend`] and feeding data back into it in a somewhat synchronous manner sounds really
|
||||
/// close to recursion. Uncontrolled recursion however is bad for the following reasons:
|
||||
/// Getting data from a [`PolicyBackend`] and feeding data back into it in a somewhat synchronous
|
||||
/// manner sounds really close to recursion. Uncontrolled recursion however is bad for the
|
||||
/// following reasons:
|
||||
///
|
||||
/// 1. **Stack space:** We may easily run out of stack space.
|
||||
/// 2. **Ownership:** Looping back into the same data structure can easily lead to deadlocks (data corruption is luckily
|
||||
/// prevented by Rust's ownership model).
|
||||
/// 2. **Ownership:** Looping back into the same data structure can easily lead to deadlocks (data
|
||||
/// corruption is luckily prevented by Rust's ownership model).
|
||||
///
|
||||
/// However sometimes we need to have interactions of policies in a "recursive" manner. E.g.:
|
||||
///
|
||||
/// 1. A refresh policies updates a value based on a timer. The value gets bigger.
|
||||
/// 2. Some resource-pool policy decides that this is now too much data and wants to evict data.
|
||||
/// 3. The refresh policy gets informed about the values that are removed so it can stop refreshing them.
|
||||
/// 3. The refresh policy gets informed about the values that are removed so it can stop refreshing
|
||||
/// them.
|
||||
///
|
||||
/// The solution that [`PolicyBackend`] uses is the following:
|
||||
///
|
||||
/// All interaction of the policy with a [`PolicyBackend`] happens through a proxy object called [`ChangeRequest`]. The
|
||||
/// [`ChangeRequest`] encapsulates a single atomic "transaction" on the underlying store. This can be a simple operation
|
||||
/// as [`REMOVE`](CacheBackend::remove) but also combound operation like "get+remove" (e.g. to check if a value needs to
|
||||
/// be pruned from the cache). The policy has two ways of issuing [`ChangeRequest`]s:
|
||||
/// All interaction of the policy with a [`PolicyBackend`] happens through a proxy object called
|
||||
/// [`ChangeRequest`]. The [`ChangeRequest`] encapsulates a single atomic "transaction" on the
|
||||
/// underlying store. This can be a simple operation as [`REMOVE`](CacheBackend::remove) but also
|
||||
/// compound operations like "get+remove" (e.g. to check if a value needs to be pruned from the
|
||||
/// cache). The policy has two ways of issuing [`ChangeRequest`]s:
|
||||
///
|
||||
/// 1. **Initial / self-driven:** Upon creation the policy receives a [`CallbackHandle`] that it can use initiate
|
||||
/// requests. This handle must only be used to create requests "out of thin air" (e.g. based on a timer). It MUST NOT
|
||||
/// be used to react to changes (see next point) to avoid deadlocks.
|
||||
/// 2. **Reactions:** Each policy implements a [`Subscriber`] that receives notifications for each changes. These
|
||||
/// notification return [`ChangeRequest`]s that the policy wishes to be performed. This construct is designed to
|
||||
/// avoid recursion.
|
||||
/// 1. **Initial / self-driven:** Upon creation the policy receives a [`CallbackHandle`] that it
|
||||
/// can use initiate requests. This handle must only be used to create requests "out of thin
|
||||
/// air" (e.g. based on a timer). It MUST NOT be used to react to changes (see next point) to
|
||||
/// avoid deadlocks.
|
||||
/// 2. **Reactions:** Each policy implements a [`Subscriber`] that receives notifications for each
|
||||
/// changes. These notification return [`ChangeRequest`]s that the policy wishes to be
|
||||
/// performed. This construct is designed to avoid recursion.
|
||||
///
|
||||
/// Also note that a policy that uses the subscriber interface MUST NOT hold locks on their internal data structure
|
||||
/// while performing _initial requests_ to avoid deadlocks (since the subscriber will be informed about the changes).
|
||||
///
|
||||
/// We cannot guarantee that policies fulfill this interface, but [`PolicyBackend`] performs some sanity checks (e.g. it
|
||||
/// will catch if the same thread that started an initial requests recurses into another initial request).
|
||||
/// Also note that a policy that uses the subscriber interface MUST NOT hold locks on their
|
||||
/// internal data structure while performing _initial requests_ to avoid deadlocks (since the
|
||||
/// subscriber will be informed about the changes).
|
||||
///
|
||||
/// We cannot guarantee that policies fulfill this interface, but [`PolicyBackend`] performs some
|
||||
/// sanity checks (e.g. it will catch if the same thread that started an initial requests recurses
|
||||
/// into another initial request).
|
||||
///
|
||||
/// # Change Propagation
|
||||
/// Each [`ChangeRequest`]s is processed atomically, so "get + set" / "compare + exchange" patterns work as expected.
|
||||
///
|
||||
/// Changes will be propated "breadth first". This means that the initial changes will form a task list. For
|
||||
/// every task in this list (front to back), we will execute the [`ChangeRequest`]. Every change that is performed
|
||||
/// within this request (usually only one) we propagate the change as following:
|
||||
/// Each [`ChangeRequest`] is processed atomically, so "get + set" / "compare + exchange" patterns
|
||||
/// work as expected.
|
||||
///
|
||||
/// Changes will be propagated "breadth first". This means that the initial changes will form a
|
||||
/// task list. For every task in this list (front to back), we will execute the [`ChangeRequest`].
|
||||
/// Every change that is performed within this request (usually only one) we propagate the change
|
||||
/// as follows:
|
||||
///
|
||||
/// 1. underlying backend
|
||||
/// 2. policies (in the order they where added)
|
||||
///
|
||||
/// From step 2 we collect new change requests that will added to the back of the task list.
|
||||
/// From step 2 we collect new change requests that will be added to the back of the task list.
|
||||
///
|
||||
/// The original requests will return to the caller once all tasks are completed.
|
||||
///
|
||||
/// When a [`ChangeRequest`] performs multiple operations -- e.g. [`GET`](CacheBackend::get) and
|
||||
/// [`SET`](CacheBackend::set) -- we first inform all subscribers about the first operation (in this case:
|
||||
/// [`GET`](CacheBackend::get)) and collect the resulting [`ChangeRequest`]s and then we process the second operation
|
||||
/// (in this case: [`SET`](CacheBackend::set)).
|
||||
///
|
||||
/// [`SET`](CacheBackend::set) -- we first inform all subscribers about the first operation (in
|
||||
/// this case: [`GET`](CacheBackend::get)) and collect the resulting [`ChangeRequest`]s. Then we
|
||||
/// process the second operation (in this case: [`SET`](CacheBackend::set)).
|
||||
///
|
||||
/// # `GET`
|
||||
/// The return value for [`CacheBackend::get`] is fetched from the inner backend AFTER all changes are applied.
|
||||
///
|
||||
/// Note [`ChangeRequest::get`] has no way of returning a result to the [`Subscriber`] that created it. The "changes"
|
||||
/// solely act as some kind of "keep alive" / "this was used" signal.
|
||||
/// The return value for [`CacheBackend::get`] is fetched from the inner backend AFTER all changes
|
||||
/// are applied.
|
||||
///
|
||||
/// Note [`ChangeRequest::get`] has no way of returning a result to the [`Subscriber`] that created
|
||||
/// it. The "changes" solely act as some kind of "keep alive" / "this was used" signal.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// **The policies in these examples are deliberately silly but simple!**
|
||||
///
|
||||
/// Let's start with a purely reactive policy that will round up all integer values to the next even number:
|
||||
/// Let's start with a purely reactive policy that will round up all integer values to the next
|
||||
/// even number:
|
||||
///
|
||||
/// ```
|
||||
/// use std::{
|
||||
|
@ -261,6 +271,7 @@ where
|
|||
/// Create new backend w/o any policies.
|
||||
///
|
||||
/// # Panic
|
||||
///
|
||||
/// Panics if `inner` is not empty.
|
||||
pub fn new(
|
||||
inner: Box<dyn CacheBackend<K = K, V = V>>,
|
||||
|
@ -277,12 +288,23 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Create a new backend with a HashMap as the [`CacheBackend`].
|
||||
pub fn hashmap_backed(time_provider: Arc<dyn TimeProvider>) -> Self {
|
||||
// See <https://github.com/rust-lang/rust-clippy/issues/9621>. This clippy lint suggests
|
||||
// replacing `Box::new(HashMap::new())` with `Box::default()`, which in most cases would be
|
||||
// shorter, but because this type is actually a `Box<dyn Trait>`, the replacement would
|
||||
// need to be `Box::<HashMap<_, _>>::default()`, which doesn't seem like an improvement.
|
||||
#[allow(clippy::box_default)]
|
||||
Self::new(Box::new(HashMap::new()), Arc::clone(&time_provider))
|
||||
}
|
||||
|
||||
/// Adds new policy.
|
||||
///
|
||||
/// See documentation of [`PolicyBackend`] for more information.
|
||||
///
|
||||
/// This is called with a function that receives the "callback backend" to this backend and should return a
|
||||
/// [`Subscriber`]. This loopy construct was chosen to discourage the leakage of the "callback backend" to any other object.
|
||||
/// This is called with a function that receives the "callback backend" to this backend and
|
||||
/// should return a [`Subscriber`]. This loopy construct was chosen to discourage the leakage
|
||||
/// of the "callback backend" to any other object.
|
||||
pub fn add_policy<C, S>(&mut self, policy_constructor: C)
|
||||
where
|
||||
C: FnOnce(CallbackHandle<K, V>) -> S,
|
||||
|
@ -300,8 +322,8 @@ where
|
|||
///
|
||||
/// This is mostly useful for debugging and testing.
|
||||
pub fn inner_ref(&mut self) -> InnerBackendRef<'_, K, V> {
|
||||
// NOTE: We deliberately use a mutable reference here to prevent users from using `<Self as CacheBackend>` while
|
||||
// we hold a lock to the underlying backend.
|
||||
// NOTE: We deliberately use a mutable reference here to prevent users from using `<Self as
|
||||
// CacheBackend>` while we hold a lock to the underlying backend.
|
||||
lock_inner!(guard = self.inner);
|
||||
InnerBackendRef {
|
||||
inner: guard.inner.lock_arc(),
|
||||
|
@ -348,8 +370,8 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Handle that allows a [`Subscriber`] to send [`ChangeRequest`]s back to the [`PolicyBackend`] that owns that very
|
||||
/// [`Subscriber`].
|
||||
/// Handle that allows a [`Subscriber`] to send [`ChangeRequest`]s back to the [`PolicyBackend`]
|
||||
/// that owns that very [`Subscriber`].
|
||||
#[derive(Debug)]
|
||||
pub struct CallbackHandle<K, V>
|
||||
where
|
||||
|
@ -366,9 +388,10 @@ where
|
|||
{
|
||||
/// Start a series of requests to the [`PolicyBackend`] that is referenced by this handle.
|
||||
///
|
||||
/// This method returns AFTER the requests and all the follow-up changes requested by all policies are played out.
|
||||
/// You should NOT hold a lock on your policies internal data structures while calling this function if you plan to
|
||||
/// also [subscribe](Subscriber) to changes because this would easily lead to deadlocks.
|
||||
/// This method returns AFTER the requests and all the follow-up changes requested by all
|
||||
/// policies are played out. You should NOT hold a lock on your policies internal data
|
||||
/// structures while calling this function if you plan to also [subscribe](Subscriber) to
|
||||
/// changes because this would easily lead to deadlocks.
|
||||
pub fn execute_requests(&mut self, change_requests: Vec<ChangeRequest<'_, K, V>>) {
|
||||
let inner = self.inner.upgrade().expect("backend gone");
|
||||
lock_inner!(mut guard = inner);
|
||||
|
@ -384,15 +407,15 @@ where
|
|||
{
|
||||
/// Underlying cache backend.
|
||||
///
|
||||
/// This is wrapped into another `Arc<Mutex<...>>` construct even though [`PolicyBackendInner`] is already guarded
|
||||
/// by a lock, because we need to reference the underlying backend from [`Recorder`] and [`Recorder`] implements
|
||||
/// [`CacheBackend`] which is `'static`.
|
||||
/// This is wrapped into another `Arc<Mutex<...>>` construct even though [`PolicyBackendInner`]
|
||||
/// is already guarded by a lock because we need to reference the underlying backend from
|
||||
/// [`Recorder`], and [`Recorder`] implements [`CacheBackend`] which is `'static`.
|
||||
inner: Arc<Mutex<Box<dyn CacheBackend<K = K, V = V>>>>,
|
||||
|
||||
/// List of subscribers.
|
||||
subscribers: Vec<Box<dyn Subscriber<K = K, V = V>>>,
|
||||
|
||||
/// Time provider
|
||||
/// Time provider.
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
}
|
||||
|
||||
|
@ -442,9 +465,8 @@ pub trait Subscriber: Debug + Send + 'static {
|
|||
|
||||
/// Get value for given key if it exists.
|
||||
///
|
||||
/// The current time `now` is provided as a parameter so that all policies and
|
||||
/// backends use a unified timestamp rather than their own provider, which is
|
||||
/// more consistent and performant.
|
||||
/// The current time `now` is provided as a parameter so that all policies and backends use a
|
||||
/// unified timestamp rather than their own provider, which is more consistent and performant.
|
||||
fn get(&mut self, _k: &Self::K, _now: Time) -> Vec<ChangeRequest<'static, Self::K, Self::V>> {
|
||||
// do nothing by default
|
||||
vec![]
|
||||
|
@ -454,9 +476,8 @@ pub trait Subscriber: Debug + Send + 'static {
|
|||
///
|
||||
/// It is OK to set and override a key that already exists.
|
||||
///
|
||||
/// The current time `now` is provided as a parameter so that all policies and
|
||||
/// backends use a unified timestamp rather than their own provider, which is
|
||||
/// more consistent and performant.
|
||||
/// The current time `now` is provided as a parameter so that all policies and backends use a
|
||||
/// unified timestamp rather than their own provider, which is more consistent and performant.
|
||||
fn set(
|
||||
&mut self,
|
||||
_k: &Self::K,
|
||||
|
@ -471,9 +492,8 @@ pub trait Subscriber: Debug + Send + 'static {
|
|||
///
|
||||
/// It is OK to remove a key even when it does not exist.
|
||||
///
|
||||
/// The current time `now` is provided as a parameter so that all policies and
|
||||
/// backends use a unified timestamp rather than their own provider, which is
|
||||
/// more consistent and performant.
|
||||
/// The current time `now` is provided as a parameter so that all policies and backends use a
|
||||
/// unified timestamp rather than their own provider, which is more consistent and performant.
|
||||
fn remove(
|
||||
&mut self,
|
||||
_k: &Self::K,
|
||||
|
@ -510,11 +530,12 @@ where
|
|||
{
|
||||
/// Custom way of constructing a change request.
|
||||
///
|
||||
/// This is considered a rather low-level function and you should prefer the higher-level constructs like
|
||||
/// [`get`](Self::get), [`set`](Self::set), and [`remove`](Self::remove).
|
||||
/// This is considered a rather low-level function and you should prefer the higher-level
|
||||
/// constructs like [`get`](Self::get), [`set`](Self::set), and [`remove`](Self::remove).
|
||||
///
|
||||
/// Takes a "callback backend" and can freely act on it. The underlying backend of [`PolicyBackend`] is guaranteed to be
|
||||
/// locked during a single request, so "get + modify" patterns work out of the box without the need to fair interleaving modifications.
|
||||
/// Takes a "callback backend" and can freely act on it. The underlying backend of
|
||||
/// [`PolicyBackend`] is guaranteed to be locked during a single request, so "get + modify"
|
||||
/// patterns work out of the box without the need to fear interleaving modifications.
|
||||
pub fn from_fn<F>(f: F) -> Self
|
||||
where
|
||||
F: for<'b> FnOnce(&'b mut Recorder<K, V>) + 'a,
|
||||
|
@ -543,7 +564,7 @@ where
|
|||
})
|
||||
}
|
||||
|
||||
/// Ensure that backends is empty and panic otherwise.
|
||||
/// Ensure that backend is empty and panic otherwise.
|
||||
///
|
||||
/// This is mostly useful during initialization.
|
||||
pub fn ensure_empty() -> Self {
|
||||
|
@ -586,8 +607,8 @@ enum Record<K, V> {
|
|||
},
|
||||
}
|
||||
|
||||
/// Specialized [`CacheBackend`] that forwards changes and requests to the underlying backend of [`PolicyBackend`] but
|
||||
/// also records all changes into [`Record`]s.
|
||||
/// Specialized [`CacheBackend`] that forwards changes and requests to the underlying backend of
|
||||
/// [`PolicyBackend`] but also records all changes into [`Record`]s.
|
||||
#[derive(Debug)]
|
||||
pub struct Recorder<K, V>
|
||||
where
|
||||
|
@ -605,10 +626,12 @@ where
|
|||
{
|
||||
/// Perform a [`GET`](CacheBackend::get) request that is NOT seen by other policies.
|
||||
///
|
||||
/// This is helpful if you just want to check the underlying data of a key without treating it as "used".
|
||||
/// This is helpful if you just want to check the underlying data of a key without treating it
|
||||
/// as "used".
|
||||
///
|
||||
/// Note that this functionality only exists for [`GET`](CacheBackend::get) requests, not for modifying requests
|
||||
/// like [`SET`](CacheBackend::set) or [`REMOVE`](CacheBackend::remove) since they always require policies to be in-sync.
|
||||
/// Note that this functionality only exists for [`GET`](CacheBackend::get) requests, not for
|
||||
/// modifying requests like [`SET`](CacheBackend::set) or [`REMOVE`](CacheBackend::remove)
|
||||
/// since they always require policies to be in-sync.
|
||||
pub fn get_untracked(&mut self, k: &K) -> Option<V> {
|
||||
self.inner.lock().get(k)
|
||||
}
|
||||
|
@ -716,7 +739,7 @@ mod tests {
|
|||
fn test_generic() {
|
||||
crate::backend::test_util::test_generic(|| {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
PolicyBackend::new(Box::new(HashMap::new()), time_provider)
|
||||
PolicyBackend::hashmap_backed(time_provider)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -724,7 +747,7 @@ mod tests {
|
|||
#[should_panic(expected = "test steps left")]
|
||||
fn test_meta_panic_steps_left() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
k: String::from("a"),
|
||||
|
@ -738,7 +761,7 @@ mod tests {
|
|||
#[should_panic(expected = "step left for get operation")]
|
||||
fn test_meta_panic_requires_condition_get() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![]));
|
||||
|
||||
backend.get(&String::from("a"));
|
||||
|
@ -748,7 +771,7 @@ mod tests {
|
|||
#[should_panic(expected = "step left for set operation")]
|
||||
fn test_meta_panic_requires_condition_set() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![]));
|
||||
|
||||
backend.set(String::from("a"), 2);
|
||||
|
@ -758,8 +781,7 @@ mod tests {
|
|||
#[should_panic(expected = "step left for remove operation")]
|
||||
fn test_meta_panic_requires_condition_remove() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend =
|
||||
PolicyBackend::<String, usize>::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![]));
|
||||
|
||||
backend.remove(&String::from("a"));
|
||||
|
@ -769,7 +791,7 @@ mod tests {
|
|||
#[should_panic(expected = "Condition mismatch")]
|
||||
fn test_meta_panic_checks_condition_get() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![TestStep {
|
||||
condition: TestBackendInteraction::Get {
|
||||
k: String::from("a"),
|
||||
|
@ -784,7 +806,7 @@ mod tests {
|
|||
#[should_panic(expected = "Condition mismatch")]
|
||||
fn test_meta_panic_checks_condition_set() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
k: String::from("a"),
|
||||
|
@ -800,7 +822,7 @@ mod tests {
|
|||
#[should_panic(expected = "Condition mismatch")]
|
||||
fn test_meta_panic_checks_condition_remove() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![TestStep {
|
||||
condition: TestBackendInteraction::Remove {
|
||||
k: String::from("a"),
|
||||
|
@ -814,7 +836,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_basic_propagation() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
@ -862,7 +884,7 @@ mod tests {
|
|||
#[should_panic(expected = "illegal recursive access")]
|
||||
fn test_panic_recursion_detection_get() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![TestStep {
|
||||
condition: TestBackendInteraction::Remove {
|
||||
k: String::from("a"),
|
||||
|
@ -879,7 +901,7 @@ mod tests {
|
|||
#[should_panic(expected = "illegal recursive access")]
|
||||
fn test_panic_recursion_detection_set() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![TestStep {
|
||||
condition: TestBackendInteraction::Remove {
|
||||
k: String::from("a"),
|
||||
|
@ -897,7 +919,7 @@ mod tests {
|
|||
#[should_panic(expected = "illegal recursive access")]
|
||||
fn test_panic_recursion_detection_remove() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![TestStep {
|
||||
condition: TestBackendInteraction::Remove {
|
||||
k: String::from("a"),
|
||||
|
@ -913,7 +935,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_get_untracked() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
@ -935,7 +957,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_basic_get_set() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Get {
|
||||
|
@ -961,7 +983,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_basic_get_get() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Get {
|
||||
|
@ -985,7 +1007,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_basic_set_set_get_get() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
@ -1027,7 +1049,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_basic_set_remove_get() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
@ -1060,7 +1082,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_basic_remove_set_get_get() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Remove {
|
||||
|
@ -1101,7 +1123,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_basic_remove_remove_get_get() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Remove {
|
||||
|
@ -1140,7 +1162,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_ordering_within_requests_vector() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
@ -1182,7 +1204,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_ordering_across_policies() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
@ -1256,7 +1278,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_ping_pong() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
@ -1351,7 +1373,7 @@ mod tests {
|
|||
let barrier_post = Arc::new(Barrier::new(1));
|
||||
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
k: String::from("a"),
|
||||
|
@ -1370,15 +1392,15 @@ mod tests {
|
|||
// panic on drop
|
||||
}
|
||||
|
||||
/// Checks that a policy background task can access the "callback backend" without triggering the "illegal
|
||||
/// recursion" detection.
|
||||
/// Checks that a policy background task can access the "callback backend" without triggering
|
||||
/// the "illegal recursion" detection.
|
||||
#[test]
|
||||
fn test_multithread() {
|
||||
let barrier_pre = Arc::new(Barrier::new(2));
|
||||
let barrier_post = Arc::new(Barrier::new(2));
|
||||
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
@ -1447,7 +1469,7 @@ mod tests {
|
|||
let barrier_post = Arc::new(Barrier::new(2));
|
||||
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
@ -1497,25 +1519,28 @@ mod tests {
|
|||
assert_eq!(Arc::strong_count(&marker_policy), 1);
|
||||
}
|
||||
|
||||
/// We have to ways of handling "compound" [`ChangeRequest`]s, i.e. requests that perform multiple operations:
|
||||
/// We have to ways of handling "compound" [`ChangeRequest`]s, i.e. requests that perform
|
||||
/// multiple operations:
|
||||
///
|
||||
/// 1. We could loop over the operations and inner-loop over the policies to collect reactions
|
||||
/// 2. We could loop over all the policies and present each polices all operations in one go
|
||||
///
|
||||
/// We've decided to chose option 1. This test ensures that by setting up a compound request (reacting to
|
||||
/// `set("a", 11)`) with a compound of two operations (`set("a", 12)`, `set("a", 13)`) which we call `C1` and `C2`
|
||||
/// (for "compound 1 and 2"). The two policies react two these two compound operations as followed:
|
||||
/// We've decided to chose option 1. This test ensures that by setting up a compound request
|
||||
/// (reacting to `set("a", 11)`) with a compound of two operations (`set("a", 12)`, `set("a",
|
||||
/// 13)`) which we call `C1` and `C2` (for "compound 1 and 2"). The two policies react to
|
||||
/// these two compound operations as follows:
|
||||
///
|
||||
/// | | Policy 1 | Policy 2 |
|
||||
/// | -- | -------------- | -------------- |
|
||||
/// | C1 | `set("a", 14)` | `set("a", 15)` |
|
||||
/// | C2 | `set("a", 16)` | -- |
|
||||
///
|
||||
/// For option (1) the outcome will be `"a" -> 16`, for option (2) the outcome would be `"a" -> 15`.
|
||||
/// For option (1) the outcome will be `"a" -> 16`, for option (2) the outcome would be `"a" ->
|
||||
/// 15`.
|
||||
#[test]
|
||||
fn test_ordering_within_compound_requests() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
|
|
|
@ -857,10 +857,7 @@ mod tests {
|
|||
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let loader = Arc::new(TestLoader::default());
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
let policy_constructor = RefreshPolicy::new(
|
||||
time_provider,
|
||||
refresh_duration_provider,
|
||||
|
@ -890,10 +887,7 @@ mod tests {
|
|||
let metric_registry = metric::Registry::new();
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MAX - Duration::from_secs(1)));
|
||||
let loader = Arc::new(TestLoader::default());
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(RefreshPolicy::new(
|
||||
Arc::clone(&time_provider) as _,
|
||||
refresh_duration_provider,
|
||||
|
@ -1105,10 +1099,7 @@ mod tests {
|
|||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let loader = Arc::new(TestLoader::default());
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
|
||||
backend.add_policy(RefreshPolicy::new(
|
||||
time_provider,
|
||||
|
@ -1142,10 +1133,7 @@ mod tests {
|
|||
// set up "RNG" that always generates the maximum, so we can test things easier
|
||||
let rng_overwrite = StepRng::new(u64::MAX, 0);
|
||||
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(RefreshPolicy::new_inner(
|
||||
Arc::clone(&time_provider) as _,
|
||||
Arc::clone(&refresh_duration_provider) as _,
|
||||
|
|
|
@ -186,8 +186,6 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
use iox_time::{MockProvider, Time};
|
||||
use metric::{Observation, RawReporter};
|
||||
|
||||
|
@ -202,8 +200,7 @@ mod tests {
|
|||
test_generic(|| {
|
||||
let metric_registry = metric::Registry::new();
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend =
|
||||
PolicyBackend::new(Box::new(HashMap::<u8, String>::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
let (policy_constructor, _handle) =
|
||||
RemoveIfPolicy::create_constructor_and_handle("my_cache", &metric_registry);
|
||||
backend.add_policy(policy_constructor);
|
||||
|
@ -215,7 +212,7 @@ mod tests {
|
|||
fn test_remove_if() {
|
||||
let metric_registry = metric::Registry::new();
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()), time_provider);
|
||||
let mut backend: PolicyBackend<u8, String> = PolicyBackend::hashmap_backed(time_provider);
|
||||
let (policy_constructor, handle) =
|
||||
RemoveIfPolicy::create_constructor_and_handle("my_cache", &metric_registry);
|
||||
backend.add_policy(policy_constructor);
|
||||
|
|
|
@ -328,7 +328,7 @@ mod tests {
|
|||
let metric_registry = metric::Registry::new();
|
||||
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()), time_provider);
|
||||
let mut backend: PolicyBackend<u8, String> = PolicyBackend::hashmap_backed(time_provider);
|
||||
let policy_constructor =
|
||||
TtlPolicy::new(Arc::clone(&ttl_provider) as _, "my_cache", &metric_registry);
|
||||
backend.add_policy(|mut handle| {
|
||||
|
@ -367,7 +367,7 @@ mod tests {
|
|||
|
||||
// init time provider at MAX!
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MAX));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()), time_provider);
|
||||
let mut backend: PolicyBackend<u8, String> = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
"my_cache",
|
||||
|
@ -481,10 +481,8 @@ mod tests {
|
|||
|
||||
// init time provider at nearly MAX!
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MAX - Duration::from_secs(2)));
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend: PolicyBackend<u8, String> =
|
||||
PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
"my_cache",
|
||||
|
@ -648,8 +646,7 @@ mod tests {
|
|||
let ttl_provider = Arc::new(NeverTtlProvider::default());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend =
|
||||
PolicyBackend::new(Box::new(HashMap::<u8, String>::new()), time_provider);
|
||||
let mut backend = PolicyBackend::hashmap_backed(time_provider);
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
"my_cache",
|
||||
|
@ -672,10 +669,7 @@ mod tests {
|
|||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
"my_cache",
|
||||
|
|
|
@ -254,7 +254,7 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
|
|||
// . already considered all remaining candidates.
|
||||
// . hit the max number of partitions to compact in parallel
|
||||
if (!parallel_compacting_candidates.is_empty())
|
||||
&& ((remaining_budget_bytes <= (compactor.config.memory_budget_bytes / 10) as u64)
|
||||
&& ((remaining_budget_bytes <= compactor.config.memory_budget_bytes / 10)
|
||||
|| (candidates.is_empty())
|
||||
|| (count == num_remaining_candidates)
|
||||
|| (count as u64 == compactor.config.max_parallel_partitions))
|
||||
|
|
|
@ -1810,7 +1810,7 @@ impl<T> StatValues<T> {
|
|||
|
||||
/// updates the statistics keeping the min, max and incrementing count.
|
||||
///
|
||||
/// The type plumbing exists to allow calling with &str on a StatValues<String>
|
||||
/// The type plumbing exists to allow calling with `&str` on a `StatValues<String>`.
|
||||
pub fn update<U: ?Sized>(&mut self, other: &U)
|
||||
where
|
||||
T: Borrow<U>,
|
||||
|
@ -2243,7 +2243,7 @@ impl TimestampRange {
|
|||
///
|
||||
/// If `start > end`, this will be interpreted as an empty time range and `start` will be set to `end`.
|
||||
pub fn new(start: i64, end: i64) -> Self {
|
||||
let start = start.max(MIN_NANO_TIME).min(end);
|
||||
let start = start.clamp(MIN_NANO_TIME, end);
|
||||
let end = end.max(MIN_NANO_TIME);
|
||||
Self { start, end }
|
||||
}
|
||||
|
|
|
@ -60,10 +60,7 @@ pub fn merge_responses(
|
|||
.or_insert(info);
|
||||
});
|
||||
|
||||
let shard_infos = shard_infos
|
||||
.into_iter()
|
||||
.map(|(_shard_index, info)| info)
|
||||
.collect();
|
||||
let shard_infos = shard_infos.into_values().collect();
|
||||
|
||||
proto::GetWriteInfoResponse { shard_infos }
|
||||
}
|
||||
|
|
|
@ -114,7 +114,7 @@ message ClientHeader {
|
|||
Metadata metadata = 1;
|
||||
|
||||
// The name of the RPC method, which looks something like:
|
||||
// /<service>/<method>
|
||||
// `/<service>/<method>`
|
||||
// Note the leading "/" character.
|
||||
string method_name = 2;
|
||||
|
||||
|
@ -122,7 +122,7 @@ message ClientHeader {
|
|||
// servers with different identities.
|
||||
// The authority is the name of such a server identitiy.
|
||||
// It is typically a portion of the URI in the form of
|
||||
// <host> or <host>:<port> .
|
||||
// `<host>` or `<host>:<port>`.
|
||||
string authority = 3;
|
||||
|
||||
// the RPC timeout
|
||||
|
|
|
@ -58,7 +58,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client
|
||||
.create_bucket(Some(PostBucketRequest::new(org_id, bucket)))
|
||||
|
|
|
@ -43,7 +43,7 @@ mod tests {
|
|||
async fn health() {
|
||||
let mock_server = mock("GET", "/health").create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), "");
|
||||
let client = Client::new(mockito::server_url(), "");
|
||||
|
||||
let _result = client.health().await;
|
||||
|
||||
|
|
|
@ -151,7 +151,7 @@ mod tests {
|
|||
.match_header("Authorization", format!("Token {}", token).as_str())
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.labels().await;
|
||||
|
||||
|
@ -167,7 +167,7 @@ mod tests {
|
|||
.match_header("Authorization", format!("Token {}", token).as_str())
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.labels_by_org(org_id).await;
|
||||
|
||||
|
@ -183,7 +183,7 @@ mod tests {
|
|||
.match_header("Authorization", format!("Token {}", token).as_str())
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.find_label(label_id).await;
|
||||
|
||||
|
@ -210,7 +210,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.create_label(org_id, name, Some(properties)).await;
|
||||
|
||||
|
@ -229,7 +229,7 @@ mod tests {
|
|||
.match_body(format!(r#"{{"orgID":"{}","name":"{}"}}"#, org_id, name).as_str())
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.create_label(org_id, name, None).await;
|
||||
|
||||
|
@ -256,7 +256,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client
|
||||
.update_label(Some(name.to_string()), Some(properties), label_id)
|
||||
|
@ -276,7 +276,7 @@ mod tests {
|
|||
.match_body("{}")
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.update_label(None, None, label_id).await;
|
||||
|
||||
|
@ -292,7 +292,7 @@ mod tests {
|
|||
.match_header("Authorization", format!("Token {}", token).as_str())
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.delete_label(label_id).await;
|
||||
|
||||
|
|
|
@ -158,7 +158,7 @@ mod tests {
|
|||
.match_header("Authorization", format!("Token {}", token).as_str())
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.query_suggestions().await;
|
||||
|
||||
|
@ -181,7 +181,7 @@ mod tests {
|
|||
.match_header("Authorization", format!("Token {}", token).as_str())
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.query_suggestions_name(suggestion_name).await;
|
||||
|
||||
|
@ -205,7 +205,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.query_raw(org, query).await;
|
||||
|
||||
|
@ -230,7 +230,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.query_raw(org, None).await;
|
||||
|
||||
|
@ -251,7 +251,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.query_analyze(query).await;
|
||||
|
||||
|
@ -272,7 +272,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.query_analyze(query).await;
|
||||
|
||||
|
@ -294,7 +294,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.query_ast(language_request).await;
|
||||
|
||||
|
@ -315,7 +315,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client.query_ast(language_request).await;
|
||||
|
||||
|
@ -340,7 +340,7 @@ mod tests {
|
|||
.with_body("")
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let result = client.query_raw(org, query).await.expect("request success");
|
||||
assert_eq!(result, "");
|
||||
|
|
|
@ -37,7 +37,7 @@ mod tests {
|
|||
async fn ready() {
|
||||
let mock_server = mock("GET", "/ready").create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), "");
|
||||
let client = Client::new(mockito::server_url(), "");
|
||||
|
||||
let _result = client.ready().await;
|
||||
|
||||
|
|
|
@ -123,7 +123,7 @@ mod tests {
|
|||
async fn is_onboarding_allowed() {
|
||||
let mock_server = mock("GET", "/api/v2/setup").create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), "");
|
||||
let client = Client::new(mockito::server_url(), "");
|
||||
|
||||
let _result = client.is_onboarding_allowed().await;
|
||||
|
||||
|
@ -149,7 +149,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client
|
||||
.onboarding(
|
||||
|
@ -185,7 +185,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client
|
||||
.post_setup_user(
|
||||
|
@ -218,7 +218,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), "");
|
||||
let client = Client::new(mockito::server_url(), "");
|
||||
|
||||
let _result = client
|
||||
.onboarding(username, org, bucket, None, None, None)
|
||||
|
@ -246,7 +246,7 @@ mod tests {
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let _result = client
|
||||
.post_setup_user(username, org, bucket, None, None, None)
|
||||
|
|
|
@ -85,7 +85,7 @@ cpu,host=server01,region=us-west usage=0.87
|
|||
)
|
||||
.create();
|
||||
|
||||
let client = Client::new(&mockito::server_url(), token);
|
||||
let client = Client::new(mockito::server_url(), token);
|
||||
|
||||
let points = vec![
|
||||
DataPoint::builder("cpu")
|
||||
|
|
|
@ -63,7 +63,7 @@ pub enum Expr {
|
|||
/// A literal wildcard (`*`) with an optional data type selection.
|
||||
Wildcard(Option<WildcardType>),
|
||||
|
||||
/// A DISTINCT <identifier> expression.
|
||||
/// A DISTINCT `<identifier>` expression.
|
||||
Distinct(Identifier),
|
||||
|
||||
/// Unary operation such as + 5 or - 1h3m
|
||||
|
|
|
@ -31,11 +31,8 @@ async fn ingester_schema_client() {
|
|||
.get("my_awesome_table")
|
||||
.expect("table not found");
|
||||
|
||||
let mut column_names: Vec<_> = table
|
||||
.columns
|
||||
.iter()
|
||||
.map(|(name, _col)| name.to_string())
|
||||
.collect();
|
||||
let mut column_names: Vec<_> =
|
||||
table.columns.keys().map(ToString::to_string).collect();
|
||||
column_names.sort_unstable();
|
||||
|
||||
assert_eq!(column_names, &["tag1", "tag2", "time", "val"]);
|
||||
|
|
|
@ -141,7 +141,7 @@ pub fn encode(src: &[f64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
|
|||
if m <= 3 {
|
||||
// 5 bits fit in current byte
|
||||
dst[n >> 3] |= (mask >> m) as u8;
|
||||
n += l as usize;
|
||||
n += l;
|
||||
} else {
|
||||
// not enough bits available in current byte
|
||||
let written = 8 - m;
|
||||
|
|
|
@ -53,7 +53,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn encode_uncompressed() {
|
||||
let src: Vec<u64> = vec![1000, 0, simple8b::MAX_VALUE as u64, 213123421];
|
||||
let src: Vec<u64> = vec![1000, 0, simple8b::MAX_VALUE, 213123421];
|
||||
let mut dst = vec![];
|
||||
|
||||
let exp = src.clone();
|
||||
|
|
|
@ -71,26 +71,30 @@ pub enum DataError {
|
|||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// parses the the measurement, field key and tag
|
||||
/// set from a tsm index key
|
||||
/// Parses the the measurement, field key and tag set from a TSM index key
|
||||
///
|
||||
/// It does not provide access to the org and bucket ids on the key, these can
|
||||
/// be accessed via org_id() and bucket_id() respectively.
|
||||
/// It does not provide access to the org and bucket IDs on the key; these can be accessed via
|
||||
/// `org_id()` and `bucket_id()` respectively.
|
||||
///
|
||||
/// Loosely based on [points.go](https://github.com/influxdata/influxdb/blob/751d70a213e5fdae837eda13d7ecb37763e69abb/models/points.go#L462)
|
||||
///
|
||||
/// The format looks roughly like:
|
||||
///
|
||||
/// ```text
|
||||
/// <org_id bucket_id>,\x00=<measurement>,<tag_keys_str>,\xff=<field_key_str>#!
|
||||
/// ~#<field_key_str>
|
||||
/// ```
|
||||
///
|
||||
/// For example:
|
||||
///
|
||||
/// ```text
|
||||
/// <org_id bucket_id>,\x00=http_api_request_duration_seconds,status=2XX,\
|
||||
/// xff=sum#!~#sum
|
||||
///
|
||||
/// measurement = "http_api_request"
|
||||
/// tags = [("status", "2XX")]
|
||||
/// field = "sum"
|
||||
/// ```
|
||||
pub fn parse_tsm_key(key: &[u8]) -> Result<ParsedTsmKey, Error> {
|
||||
// Wrap in an internal function to translate error types and add key context
|
||||
parse_tsm_key_internal(key).context(ParsingTsmKeySnafu {
|
||||
|
|
|
@ -258,8 +258,7 @@ impl IngesterData {
|
|||
for shard_index in shard_indexes {
|
||||
let shard_data = self
|
||||
.shards
|
||||
.iter()
|
||||
.map(|(_, shard_data)| shard_data)
|
||||
.values()
|
||||
.find(|shard_data| shard_data.shard_index() == shard_index);
|
||||
|
||||
let progress = match shard_data {
|
||||
|
|
|
@ -401,7 +401,7 @@ async fn new_pool(
|
|||
options: &PostgresConnectionOptions,
|
||||
) -> Result<HotSwapPool<Postgres>, sqlx::Error> {
|
||||
let parsed_dsn = match get_dsn_file_path(&options.dsn) {
|
||||
Some(filename) => std::fs::read_to_string(&filename)?,
|
||||
Some(filename) => std::fs::read_to_string(filename)?,
|
||||
None => options.dsn.clone(),
|
||||
};
|
||||
let pool = HotSwapPool::new(new_raw_pool(options, &parsed_dsn).await?);
|
||||
|
|
|
@ -173,7 +173,7 @@ impl IntoFieldList for Vec<FieldList> {
|
|||
}
|
||||
}
|
||||
|
||||
let mut fields = field_map.into_iter().map(|(_, v)| v).collect::<Vec<_>>();
|
||||
let mut fields = field_map.into_values().collect::<Vec<_>>();
|
||||
fields.sort_by(|a, b| a.name.cmp(&b.name));
|
||||
|
||||
Ok(FieldList { fields })
|
||||
|
|
|
@ -297,7 +297,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Implement ChunkMeta for Arc<dyn QueryChunk>
|
||||
/// Implement `ChunkMeta` for `Arc<dyn QueryChunk>`
|
||||
impl QueryChunkMeta for Arc<dyn QueryChunk> {
|
||||
fn summary(&self) -> Arc<TableSummary> {
|
||||
self.as_ref().summary()
|
||||
|
|
|
@ -236,7 +236,7 @@ impl Stream for SchemaAdapterStream {
|
|||
/// Describes how to create column in the output.
|
||||
#[derive(Debug)]
|
||||
enum ColumnMapping {
|
||||
/// Output column is found at <index> column of the input schema
|
||||
/// Output column is found at `<index>` column of the input schema
|
||||
FromInput(usize),
|
||||
/// Output colum should be synthesized with nulls of the specified type
|
||||
MakeNull(DataType),
|
||||
|
|
|
@ -82,9 +82,8 @@ pub fn prune_chunks(
|
|||
prune_summaries(table_schema, &summaries, predicate)
|
||||
}
|
||||
|
||||
/// Given a Vec of pruning summaries, return a Vec<bool>
|
||||
/// where `false` indicates that the predicate can be proven to evaluate to
|
||||
/// `false` for every single row.
|
||||
/// Given a `Vec` of pruning summaries, return a `Vec<bool>` where `false` indicates that the
|
||||
/// predicate can be proven to evaluate to `false` for every single row.
|
||||
pub fn prune_summaries(
|
||||
table_schema: Arc<Schema>,
|
||||
summaries: &Vec<Arc<TableSummary>>,
|
||||
|
|
|
@ -301,7 +301,7 @@ pub struct IoxMetadata {
|
|||
impl IoxMetadata {
|
||||
/// Convert to base64 encoded protobuf format
|
||||
pub fn to_base64(&self) -> std::result::Result<String, prost::EncodeError> {
|
||||
Ok(base64::encode(&self.to_protobuf()?))
|
||||
Ok(base64::encode(self.to_protobuf()?))
|
||||
}
|
||||
|
||||
/// Read from base64 encoded protobuf format
|
||||
|
|
|
@ -113,7 +113,7 @@ impl NamespaceCache {
|
|||
testing,
|
||||
));
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider));
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider));
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::new(OptionalValueTtlProvider::new(
|
||||
Some(TTL_NON_EXISTING),
|
||||
|
@ -219,14 +219,14 @@ impl CachedTable {
|
|||
+ (self.column_id_map.capacity() * size_of::<(ColumnId, Arc<str>)>())
|
||||
+ self
|
||||
.column_id_map
|
||||
.iter()
|
||||
.map(|(_id, name)| name.len())
|
||||
.values()
|
||||
.map(|name| name.len())
|
||||
.sum::<usize>()
|
||||
+ (self.column_id_map_rev.capacity() * size_of::<(Arc<str>, ColumnId)>())
|
||||
+ self
|
||||
.column_id_map_rev
|
||||
.iter()
|
||||
.map(|(name, _id)| name.len())
|
||||
.keys()
|
||||
.map(|name| name.len())
|
||||
.sum::<usize>()
|
||||
+ (self.primary_key_column_ids.capacity() * size_of::<ColumnId>())
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//! Cache for immutable object store entires.
|
||||
use std::{collections::HashMap, mem::size_of_val, ops::Range, sync::Arc};
|
||||
use std::{mem::size_of_val, ops::Range, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use backoff::{Backoff, BackoffConfig};
|
||||
|
@ -105,7 +105,7 @@ impl ObjectStoreCache {
|
|||
));
|
||||
|
||||
// add to memory pool
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider));
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider));
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&ram_pool),
|
||||
CACHE_ID,
|
||||
|
|
|
@ -156,7 +156,7 @@ impl ParquetFileCache {
|
|||
testing,
|
||||
));
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider));
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider));
|
||||
let (policy_constructor, remove_if_handle) =
|
||||
RemoveIfPolicy::create_constructor_and_handle(CACHE_ID, metric_registry);
|
||||
backend.add_policy(policy_constructor);
|
||||
|
|
|
@ -89,7 +89,7 @@ impl PartitionCache {
|
|||
testing,
|
||||
));
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider));
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider));
|
||||
let (policy_constructor, remove_if_handle) =
|
||||
RemoveIfPolicy::create_constructor_and_handle(CACHE_ID, metric_registry);
|
||||
backend.add_policy(policy_constructor);
|
||||
|
|
|
@ -14,7 +14,7 @@ use cache_system::{
|
|||
use data_types::{ParquetFileId, TombstoneId};
|
||||
use iox_catalog::interface::Catalog;
|
||||
use iox_time::TimeProvider;
|
||||
use std::{collections::HashMap, mem::size_of_val, sync::Arc, time::Duration};
|
||||
use std::{mem::size_of_val, sync::Arc, time::Duration};
|
||||
use trace::span::Span;
|
||||
|
||||
use super::ram::RamSize;
|
||||
|
@ -78,7 +78,7 @@ impl ProcessedTombstonesCache {
|
|||
testing,
|
||||
));
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider));
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider));
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::new(KeepExistsForever {}),
|
||||
CACHE_ID,
|
||||
|
|
|
@ -3,7 +3,6 @@
|
|||
//! While this is technically NOT caching catalog requests (i.e. CPU and IO work), it heavily reduced memory when
|
||||
//! creating [`QuerierParquetChunk`](crate::parquet::QuerierParquetChunk)s.
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
mem::{size_of, size_of_val},
|
||||
sync::Arc,
|
||||
};
|
||||
|
@ -114,7 +113,7 @@ impl ProjectedSchemaCache {
|
|||
));
|
||||
|
||||
// add to memory pool
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider));
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider));
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&ram_pool),
|
||||
CACHE_ID,
|
||||
|
@ -160,6 +159,7 @@ impl ProjectedSchemaCache {
|
|||
mod tests {
|
||||
use iox_time::SystemProvider;
|
||||
use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::cache::ram::test_util::test_ram_pool;
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ use data_types::{SequenceNumber, TableId, Tombstone};
|
|||
use iox_catalog::interface::Catalog;
|
||||
use iox_time::TimeProvider;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::{collections::HashMap, mem, sync::Arc};
|
||||
use std::{mem, sync::Arc};
|
||||
use trace::span::Span;
|
||||
|
||||
use super::ram::RamSize;
|
||||
|
@ -125,8 +125,7 @@ impl TombstoneCache {
|
|||
testing,
|
||||
));
|
||||
|
||||
let mut backend =
|
||||
PolicyBackend::new(Box::new(HashMap::new()), Arc::clone(&time_provider) as _);
|
||||
let mut backend = PolicyBackend::hashmap_backed(Arc::clone(&time_provider) as _);
|
||||
let (policy_constructor, remove_if_handle) =
|
||||
RemoveIfPolicy::create_constructor_and_handle(CACHE_ID, metric_registry);
|
||||
backend.add_policy(policy_constructor);
|
||||
|
|
|
@ -357,7 +357,7 @@ fn truncate_by_months(t: i64, d: &Duration) -> i64 {
|
|||
|
||||
// Determine the total number of months and truncate
|
||||
// the number of months by the duration amount.
|
||||
let mut total = (year * 12) as i32 + (month - 1) as i32;
|
||||
let mut total = (year * 12) + (month - 1) as i32;
|
||||
let remainder = total % d.months() as i32;
|
||||
total -= remainder;
|
||||
|
||||
|
|
|
@ -653,7 +653,7 @@ SELECT * from cpu ORDER BY time DESC;
|
|||
std::fs::create_dir(&in_dir).expect("create in-dir");
|
||||
|
||||
let out_dir = dir.path().join("out");
|
||||
std::fs::create_dir(&out_dir).expect("create out-dir");
|
||||
std::fs::create_dir(out_dir).expect("create out-dir");
|
||||
|
||||
let mut file = in_dir;
|
||||
file.push("foo.sql");
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
[toolchain]
|
||||
channel = "1.65"
|
||||
channel = "1.66"
|
||||
components = [ "rustfmt", "clippy" ]
|
||||
|
|
|
@ -291,7 +291,7 @@ impl Schema {
|
|||
self.inner.fields().is_empty()
|
||||
}
|
||||
|
||||
/// Returns an iterator of (Option<InfluxColumnType>, &Field) for
|
||||
/// Returns an iterator of `(Option<InfluxColumnType>, &Field)` for
|
||||
/// all the columns of this schema, in order
|
||||
pub fn iter(&self) -> SchemaIter<'_> {
|
||||
SchemaIter::new(self)
|
||||
|
|
|
@ -38,22 +38,22 @@ pub enum Error {
|
|||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// Convert a set of tag_keys into a form suitable for gRPC transport,
|
||||
/// adding the special 0x00 (_m) and 0xff (_f) tag keys
|
||||
/// adding the special `0x00` (`_m`) and `0xff` (`_f`) tag keys
|
||||
///
|
||||
/// Namely, a Vec<Vec<u8>>, including the measurement and field names
|
||||
/// Namely, a `Vec<Vec<u8>>`, including the measurement and field names
|
||||
pub fn tag_keys_to_byte_vecs(tag_keys: Arc<BTreeSet<String>>) -> Vec<Vec<u8>> {
|
||||
// special case measurement (0x00) and field (0xff)
|
||||
// special case measurement (`0x00`) and field (`0xff`)
|
||||
// ensuring they are in the correct sort order (first and last, respectively)
|
||||
let mut byte_vecs = Vec::with_capacity(2 + tag_keys.len());
|
||||
byte_vecs.push(TAG_KEY_MEASUREMENT.to_vec()); // Shown as _m == _measurement
|
||||
byte_vecs.push(TAG_KEY_MEASUREMENT.to_vec()); // Shown as `_m == _measurement`
|
||||
tag_keys.iter().for_each(|name| {
|
||||
byte_vecs.push(name.bytes().collect());
|
||||
});
|
||||
byte_vecs.push(TAG_KEY_FIELD.to_vec()); // Shown as _f == _field
|
||||
byte_vecs.push(TAG_KEY_FIELD.to_vec()); // Shown as `_f == _field`
|
||||
byte_vecs
|
||||
}
|
||||
|
||||
/// Convert Series and Groups ` into a form suitable for gRPC transport:
|
||||
/// Convert Series and Groups into a form suitable for gRPC transport:
|
||||
///
|
||||
/// ```text
|
||||
/// (GroupFrame) potentially
|
||||
|
@ -157,7 +157,7 @@ fn group_to_frame(group: series::Group) -> Frame {
|
|||
Frame { data: Some(data) }
|
||||
}
|
||||
|
||||
/// Convert the tag=value pairs from Arc<str> to Vec<u8> for gRPC transport
|
||||
/// Convert the `tag=value` pairs from `Arc<str>` to `Vec<u8>` for gRPC transport
|
||||
fn convert_tags(tags: Vec<series::Tag>, tag_key_binary_format: bool) -> Vec<Tag> {
|
||||
let mut res: Vec<Tag> = tags
|
||||
.into_iter()
|
||||
|
|
|
@ -1652,7 +1652,7 @@ pub trait ErrorLogger {
|
|||
/// Log the contents of self with a string of context. The context
|
||||
/// should appear in a message such as
|
||||
///
|
||||
/// "Error <context>: <formatted error message>
|
||||
/// "Error `<context>`: `<formatted error message>`
|
||||
fn log_if_error(self, context: &str) -> Self;
|
||||
|
||||
/// Provided method to log an error via the `error!` macro
|
||||
|
|
|
@ -133,7 +133,7 @@ pub fn maybe_start_logging() {
|
|||
/// Is a macro so test error
|
||||
/// messages are on the same line as the failure;
|
||||
///
|
||||
/// Both arguments must be convertable into Strings (Into<String>)
|
||||
/// Both arguments must be convertable into `String`s (`Into<String>`)
|
||||
macro_rules! assert_contains {
|
||||
($ACTUAL: expr, $EXPECTED: expr) => {
|
||||
let actual_value: String = $ACTUAL.into();
|
||||
|
@ -152,7 +152,7 @@ macro_rules! assert_contains {
|
|||
/// a nice error message if that check fails. Is a macro so test error
|
||||
/// messages are on the same line as the failure;
|
||||
///
|
||||
/// Both arguments must be convertable into Strings (Into<String>)
|
||||
/// Both arguments must be convertable into `String`s (`Into<String>`)
|
||||
macro_rules! assert_not_contains {
|
||||
($ACTUAL: expr, $UNEXPECTED: expr) => {
|
||||
let actual_value: String = $ACTUAL.into();
|
||||
|
@ -182,10 +182,10 @@ macro_rules! assert_error {
|
|||
}
|
||||
|
||||
#[macro_export]
|
||||
/// Assert that `actual` and `expected` values are within `epsilon` of
|
||||
/// each other. Used to compare values that may fluctuate from run to run (e.g. because they encode timestamps)
|
||||
/// Assert that `actual` and `expected` values are within `epsilon` of each other. Used to compare
|
||||
/// values that may fluctuate from run to run (e.g. because they encode timestamps)
|
||||
///
|
||||
/// Usage: assert_close!(actual, expected, epsilon);
|
||||
/// Usage: `assert_close!(actual, expected, epsilon);`
|
||||
macro_rules! assert_close {
|
||||
($ACTUAL:expr, $EXPECTED:expr, $EPSILON:expr) => {{
|
||||
{
|
||||
|
|
|
@ -280,7 +280,7 @@ impl TestConfig {
|
|||
"INFLUXDB_IOX_WRITE_BUFFER_AUTO_CREATE_TOPICS",
|
||||
n_shards.to_string(),
|
||||
)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", &write_buffer_string)
|
||||
.with_env("INFLUXDB_IOX_WRITE_BUFFER_ADDR", write_buffer_string)
|
||||
}
|
||||
|
||||
/// Configures this TestConfig to use the same write buffer as other
|
||||
|
@ -307,7 +307,7 @@ impl TestConfig {
|
|||
|
||||
let wal_string = tmpdir.path().display().to_string();
|
||||
self.wal_dir = Some(Arc::new(tmpdir));
|
||||
self.with_env("INFLUXDB_IOX_WAL_DIRECTORY", &wal_string)
|
||||
self.with_env("INFLUXDB_IOX_WAL_DIRECTORY", wal_string)
|
||||
}
|
||||
|
||||
/// Configures a new object store
|
||||
|
@ -317,7 +317,7 @@ impl TestConfig {
|
|||
let object_store_string = tmpdir.path().display().to_string();
|
||||
self.object_store_dir = Some(Arc::new(tmpdir));
|
||||
self.with_env("INFLUXDB_IOX_OBJECT_STORE", "file")
|
||||
.with_env("INFLUXDB_IOX_DB_DIR", &object_store_string)
|
||||
.with_env("INFLUXDB_IOX_DB_DIR", object_store_string)
|
||||
}
|
||||
|
||||
/// Configures this TestConfig to use the same object store as other
|
||||
|
|
|
@ -159,18 +159,20 @@ impl GrpcRequestBuilder {
|
|||
self.regex_predicate(tag_key_name, pattern, Comparison::NotRegex)
|
||||
}
|
||||
|
||||
/// Set predicate to tag_name <op> /pattern/
|
||||
/// Set predicate to `tag_name <op> /pattern/`
|
||||
///
|
||||
/// where op is `Regex` or `NotRegEx`
|
||||
/// The constitution of this request was formed by looking at a real request
|
||||
/// made to storage, which looked like this:
|
||||
///
|
||||
/// ```text
|
||||
/// root:<
|
||||
/// node_type:COMPARISON_EXPRESSION
|
||||
/// children:<node_type:TAG_REF tag_ref_value:"tag_key_name" >
|
||||
/// children:<node_type:LITERAL regex_value:"pattern" >
|
||||
/// comparison:REGEX
|
||||
/// >
|
||||
/// ```
|
||||
pub fn regex_predicate(
|
||||
self,
|
||||
tag_key_name: impl Into<String>,
|
||||
|
|
|
@ -296,7 +296,7 @@ impl Wal {
|
|||
/// Open a reader to a closed segment
|
||||
pub fn reader_for_segment(&self, id: SegmentId) -> Result<ClosedSegmentFileReader> {
|
||||
let path = build_segment_path(&self.root, id);
|
||||
ClosedSegmentFileReader::from_path(&path)
|
||||
ClosedSegmentFileReader::from_path(path)
|
||||
}
|
||||
|
||||
/// Writes one [`SequencedWalOp`] to the buffer and returns a watch channel for when the buffer
|
||||
|
|
|
@ -27,13 +27,13 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// This struct contains sufficient information to determine the
|
||||
/// current state of the write as a whole
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||
/// Summary of a Vec<Vec<DmlMeta>>
|
||||
/// Summary of a `Vec<Vec<DmlMeta>>`
|
||||
pub struct WriteSummary {
|
||||
/// Key is the shard index from the DmlMeta structure (aka kafka
|
||||
/// Key is the shard index from the `DmlMeta` structure (aka kafka
|
||||
/// partition id), value is the sequence numbers from that
|
||||
/// shard.
|
||||
///
|
||||
/// Note: BTreeMap to ensure the output is in a consistent order
|
||||
/// Note: `BTreeMap` to ensure the output is in a consistent order
|
||||
shards: BTreeMap<ShardIndex, Vec<SequenceNumber>>,
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue