test: add test helpers for object store types (#25420)
This adds a new crate `influxdb3_test_helpers` which provides two object store helper types that can be used, respectively, to track request counts made through the store and to synchronize requests made through the store.
praveen/parquet-file-size
parent
7a903ca080
commit
7d37bbbce7
|
@ -2790,6 +2790,19 @@ dependencies = [
|
|||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "influxdb3_test_helpers"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"futures",
|
||||
"hashbrown 0.14.5",
|
||||
"object_store",
|
||||
"parking_lot",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "influxdb3_wal"
|
||||
version = "0.1.0"
|
||||
|
@ -2839,6 +2852,7 @@ dependencies = [
|
|||
"influxdb-line-protocol",
|
||||
"influxdb3_catalog",
|
||||
"influxdb3_id",
|
||||
"influxdb3_test_helpers",
|
||||
"influxdb3_wal",
|
||||
"insta",
|
||||
"iox_catalog",
|
||||
|
|
|
@ -8,10 +8,11 @@ members = [
|
|||
"influxdb3_load_generator",
|
||||
"influxdb3_process",
|
||||
"influxdb3_server",
|
||||
"influxdb3_telemetry",
|
||||
"influxdb3_test_helpers",
|
||||
"influxdb3_wal",
|
||||
"influxdb3_write",
|
||||
"iox_query_influxql_rewrite",
|
||||
"influxdb3_telemetry",
|
||||
]
|
||||
default-members = ["influxdb3"]
|
||||
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
[package]
|
||||
name = "influxdb3_test_helpers"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
bytes.workspace = true
|
||||
futures.workspace = true
|
||||
hashbrown.workspace = true
|
||||
object_store.workspace = true
|
||||
parking_lot.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
|
@ -0,0 +1 @@
|
|||
pub mod object_store;
|
|
@ -0,0 +1,341 @@
|
|||
use std::{ops::Range, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::stream::BoxStream;
|
||||
use hashbrown::HashMap;
|
||||
use object_store::{
|
||||
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
|
||||
PutMultipartOpts, PutOptions, PutPayload, PutResult,
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
type RequestCounter = RwLock<HashMap<Path, usize>>;
|
||||
|
||||
/// A wrapper around an inner object store that tracks requests made to the inner store
|
||||
#[derive(Debug)]
|
||||
pub struct RequestCountedObjectStore {
|
||||
inner: Arc<dyn ObjectStore>,
|
||||
get: RequestCounter,
|
||||
get_opts: RequestCounter,
|
||||
get_range: RequestCounter,
|
||||
get_ranges: RequestCounter,
|
||||
head: RequestCounter,
|
||||
}
|
||||
|
||||
impl RequestCountedObjectStore {
|
||||
pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
get: Default::default(),
|
||||
get_opts: Default::default(),
|
||||
get_range: Default::default(),
|
||||
get_ranges: Default::default(),
|
||||
head: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the total request count across READ-style requests for a specific `Path` in the inner
|
||||
/// object store.
|
||||
pub fn total_read_request_count(&self, path: &Path) -> usize {
|
||||
self.get_request_count(path)
|
||||
+ self.get_opts_request_count(path)
|
||||
+ self.get_range_request_count(path)
|
||||
+ self.get_ranges_request_count(path)
|
||||
+ self.head_request_count(path)
|
||||
}
|
||||
|
||||
pub fn get_request_count(&self, path: &Path) -> usize {
|
||||
self.get.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
|
||||
pub fn get_opts_request_count(&self, path: &Path) -> usize {
|
||||
self.get_opts.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
|
||||
pub fn get_range_request_count(&self, path: &Path) -> usize {
|
||||
self.get_range.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
|
||||
pub fn get_ranges_request_count(&self, path: &Path) -> usize {
|
||||
self.get_ranges.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
|
||||
pub fn head_request_count(&self, path: &Path) -> usize {
|
||||
self.head.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RequestCountedObjectStore {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "TestObjectStore({})", self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ObjectStore for RequestCountedObjectStore {
|
||||
async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result<PutResult> {
|
||||
self.inner.put(location, bytes).await
|
||||
}
|
||||
|
||||
async fn put_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
bytes: PutPayload,
|
||||
opts: PutOptions,
|
||||
) -> object_store::Result<PutResult> {
|
||||
self.inner.put_opts(location, bytes, opts).await
|
||||
}
|
||||
|
||||
async fn put_multipart(
|
||||
&self,
|
||||
location: &Path,
|
||||
) -> object_store::Result<Box<dyn MultipartUpload>> {
|
||||
self.inner.put_multipart(location).await
|
||||
}
|
||||
|
||||
async fn put_multipart_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
opts: PutMultipartOpts,
|
||||
) -> object_store::Result<Box<dyn MultipartUpload>> {
|
||||
self.inner.put_multipart_opts(location, opts).await
|
||||
}
|
||||
|
||||
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
|
||||
*self.get.write().entry(location.clone()).or_insert(0) += 1;
|
||||
self.inner.get(location).await
|
||||
}
|
||||
|
||||
async fn get_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
options: GetOptions,
|
||||
) -> object_store::Result<GetResult> {
|
||||
*self.get_opts.write().entry(location.clone()).or_insert(0) += 1;
|
||||
self.inner.get_opts(location, options).await
|
||||
}
|
||||
|
||||
async fn get_range(&self, location: &Path, range: Range<usize>) -> object_store::Result<Bytes> {
|
||||
*self.get_range.write().entry(location.clone()).or_insert(0) += 1;
|
||||
self.inner.get_range(location, range).await
|
||||
}
|
||||
|
||||
async fn get_ranges(
|
||||
&self,
|
||||
location: &Path,
|
||||
ranges: &[Range<usize>],
|
||||
) -> object_store::Result<Vec<Bytes>> {
|
||||
*self.get_ranges.write().entry(location.clone()).or_insert(0) += 1;
|
||||
self.inner.get_ranges(location, ranges).await
|
||||
}
|
||||
|
||||
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
|
||||
*self.head.write().entry(location.clone()).or_insert(0) += 1;
|
||||
self.inner.head(location).await
|
||||
}
|
||||
|
||||
/// Delete an object from the inner object store. Delete requests are not counted.
|
||||
async fn delete(&self, location: &Path) -> object_store::Result<()> {
|
||||
self.inner.delete(location).await
|
||||
}
|
||||
|
||||
fn delete_stream<'a>(
|
||||
&'a self,
|
||||
locations: BoxStream<'a, object_store::Result<Path>>,
|
||||
) -> BoxStream<'a, object_store::Result<Path>> {
|
||||
self.inner.delete_stream(locations)
|
||||
}
|
||||
|
||||
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
|
||||
self.inner.list(prefix)
|
||||
}
|
||||
|
||||
fn list_with_offset(
|
||||
&self,
|
||||
prefix: Option<&Path>,
|
||||
offset: &Path,
|
||||
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
|
||||
self.inner.list_with_offset(prefix, offset)
|
||||
}
|
||||
|
||||
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
|
||||
self.inner.list_with_delimiter(prefix).await
|
||||
}
|
||||
|
||||
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.copy(from, to).await
|
||||
}
|
||||
|
||||
async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.rename(from, to).await
|
||||
}
|
||||
|
||||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.copy_if_not_exists(from, to).await
|
||||
}
|
||||
|
||||
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.rename_if_not_exists(from, to).await
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper around an inner object store that can hold execution of certain object store methods
|
||||
/// to synchronize other processes before the request is forwarded to the inner object store
|
||||
///
|
||||
/// # Example
|
||||
/// ```ignore
|
||||
/// // set up notifiers:
|
||||
/// let to_store_notify = Arc::new(Notify::new());
|
||||
/// let from_store_notify = Arc::new(Notify::new());
|
||||
///
|
||||
/// // create the synchronized store wrapping an in-memory object store:
|
||||
/// let inner_store = Arc::new(
|
||||
/// SynchronizedObjectStore::new(Arc::new(InMemory::new()))
|
||||
/// .with_get_notifies(Arc::clone(&to_store_notify), Arc::clone(&from_store_notify)),
|
||||
/// );
|
||||
///
|
||||
/// // we are in the middle of a get request once this call to notified wakes:
|
||||
/// let _ = from_store_notify.notified().await;
|
||||
///
|
||||
/// // spawn a task to wake the in-flight get request:
|
||||
/// let h = tokio::spawn(async move {
|
||||
/// to_store_notify.notify_one();
|
||||
/// let _ = notifier_rx.await;
|
||||
/// });
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct SynchronizedObjectStore {
|
||||
inner: Arc<dyn ObjectStore>,
|
||||
get_notifies: Option<(Arc<Notify>, Arc<Notify>)>,
|
||||
}
|
||||
|
||||
impl SynchronizedObjectStore {
|
||||
pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
get_notifies: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Add notifiers for `get` requests so that async execution can be halted to synchronize other
|
||||
/// processes before the request is forwarded to the inner object store.
|
||||
pub fn with_get_notifies(mut self, inbound: Arc<Notify>, outbound: Arc<Notify>) -> Self {
|
||||
self.get_notifies = Some((inbound, outbound));
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for SynchronizedObjectStore {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "TestObjectStore({})", self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ObjectStore for SynchronizedObjectStore {
|
||||
async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result<PutResult> {
|
||||
self.inner.put(location, bytes).await
|
||||
}
|
||||
|
||||
async fn put_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
bytes: PutPayload,
|
||||
opts: PutOptions,
|
||||
) -> object_store::Result<PutResult> {
|
||||
self.inner.put_opts(location, bytes, opts).await
|
||||
}
|
||||
|
||||
async fn put_multipart(
|
||||
&self,
|
||||
location: &Path,
|
||||
) -> object_store::Result<Box<dyn MultipartUpload>> {
|
||||
self.inner.put_multipart(location).await
|
||||
}
|
||||
|
||||
async fn put_multipart_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
opts: PutMultipartOpts,
|
||||
) -> object_store::Result<Box<dyn MultipartUpload>> {
|
||||
self.inner.put_multipart_opts(location, opts).await
|
||||
}
|
||||
|
||||
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
|
||||
if let Some((inbound, outbound)) = &self.get_notifies {
|
||||
outbound.notify_one();
|
||||
inbound.notified().await;
|
||||
}
|
||||
self.inner.get(location).await
|
||||
}
|
||||
|
||||
async fn get_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
options: GetOptions,
|
||||
) -> object_store::Result<GetResult> {
|
||||
self.inner.get_opts(location, options).await
|
||||
}
|
||||
|
||||
async fn get_range(&self, location: &Path, range: Range<usize>) -> object_store::Result<Bytes> {
|
||||
self.inner.get_range(location, range).await
|
||||
}
|
||||
|
||||
async fn get_ranges(
|
||||
&self,
|
||||
location: &Path,
|
||||
ranges: &[Range<usize>],
|
||||
) -> object_store::Result<Vec<Bytes>> {
|
||||
self.inner.get_ranges(location, ranges).await
|
||||
}
|
||||
|
||||
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
|
||||
self.inner.head(location).await
|
||||
}
|
||||
|
||||
/// Delete an object from the inner object store. Delete requests are forwarded without synchronization.
|
||||
async fn delete(&self, location: &Path) -> object_store::Result<()> {
|
||||
self.inner.delete(location).await
|
||||
}
|
||||
|
||||
fn delete_stream<'a>(
|
||||
&'a self,
|
||||
locations: BoxStream<'a, object_store::Result<Path>>,
|
||||
) -> BoxStream<'a, object_store::Result<Path>> {
|
||||
self.inner.delete_stream(locations)
|
||||
}
|
||||
|
||||
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
|
||||
self.inner.list(prefix)
|
||||
}
|
||||
|
||||
fn list_with_offset(
|
||||
&self,
|
||||
prefix: Option<&Path>,
|
||||
offset: &Path,
|
||||
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
|
||||
self.inner.list_with_offset(prefix, offset)
|
||||
}
|
||||
|
||||
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
|
||||
self.inner.list_with_delimiter(prefix).await
|
||||
}
|
||||
|
||||
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.copy(from, to).await
|
||||
}
|
||||
|
||||
async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.rename(from, to).await
|
||||
}
|
||||
|
||||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.copy_if_not_exists(from, to).await
|
||||
}
|
||||
|
||||
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.rename_if_not_exists(from, to).await
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ schema.workspace = true
|
|||
# Local deps
|
||||
influxdb3_catalog = { path = "../influxdb3_catalog" }
|
||||
influxdb3_id = { path = "../influxdb3_id" }
|
||||
influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
|
||||
influxdb3_wal = { path = "../influxdb3_wal" }
|
||||
|
||||
# crates.io dependencies
|
||||
|
|
|
@ -677,19 +677,15 @@ fn background_cache_pruner(
|
|||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use std::{ops::Range, sync::Arc, time::Duration};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use arrow::datatypes::ToByteSlice;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::stream::BoxStream;
|
||||
use hashbrown::HashMap;
|
||||
use iox_time::{MockProvider, Time, TimeProvider};
|
||||
use object_store::{
|
||||
memory::InMemory, path::Path, GetOptions, GetResult, ListResult, MultipartUpload,
|
||||
ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
|
||||
use influxdb3_test_helpers::object_store::{
|
||||
RequestCountedObjectStore, SynchronizedObjectStore,
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use iox_time::{MockProvider, Time, TimeProvider};
|
||||
use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
|
@ -716,7 +712,7 @@ pub(crate) mod tests {
|
|||
#[tokio::test]
|
||||
async fn hit_cache_instead_of_object_store() {
|
||||
// set up the inner test object store and then wrap it with the mem cached store:
|
||||
let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
|
||||
let inner_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new())));
|
||||
let time_provider: Arc<dyn TimeProvider> =
|
||||
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let (cached_store, oracle) = test_cached_obj_store_and_oracle(
|
||||
|
@ -733,8 +729,7 @@ pub(crate) mod tests {
|
|||
|
||||
// GET the payload from the object store before caching:
|
||||
assert_payload_at_equals!(cached_store, payload, path);
|
||||
assert_eq!(1, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path));
|
||||
|
||||
// cache the entry:
|
||||
let (cache_request, notifier_rx) = CacheRequest::create(path.clone());
|
||||
|
@ -744,21 +739,19 @@ pub(crate) mod tests {
|
|||
let _ = notifier_rx.await;
|
||||
|
||||
// another request to inner store should have been made:
|
||||
assert_eq!(2, inner_store.total_get_request_count());
|
||||
assert_eq!(2, inner_store.get_request_count(&path));
|
||||
assert_eq!(2, inner_store.total_read_request_count(&path));
|
||||
|
||||
// get the payload from the outer store again:
|
||||
assert_payload_at_equals!(cached_store, payload, path);
|
||||
|
||||
// should hit the cache this time, so the inner store should not have been hit, and counts
|
||||
// should therefore be same as previous:
|
||||
assert_eq!(2, inner_store.total_get_request_count());
|
||||
assert_eq!(2, inner_store.get_request_count(&path));
|
||||
assert_eq!(2, inner_store.total_read_request_count(&path));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
async fn cache_evicts_lru_when_full() {
|
||||
let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
|
||||
let inner_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new())));
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
// these are magic numbers that will make it so the third entry exceeds the cache capacity:
|
||||
let cache_capacity_bytes = 60;
|
||||
|
@ -784,8 +777,7 @@ pub(crate) mod tests {
|
|||
oracle.register(cache_request);
|
||||
let _ = notifier_rx.await;
|
||||
// there will have been one get request made by the cache oracle:
|
||||
assert_eq!(1, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_1));
|
||||
|
||||
// update time:
|
||||
time_provider.set(Time::from_timestamp_nanos(1));
|
||||
|
@ -793,8 +785,7 @@ pub(crate) mod tests {
|
|||
// GET the entry to check its there and was retrieved from cache, i.e., that the request
|
||||
// counts do not change:
|
||||
assert_payload_at_equals!(cached_store, payload_1, path_1);
|
||||
assert_eq!(1, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_1));
|
||||
|
||||
// PUT a second entry into the store:
|
||||
let path_2 = Path::from("1.parquet");
|
||||
|
@ -813,9 +804,8 @@ pub(crate) mod tests {
|
|||
oracle.register(cache_request);
|
||||
let _ = notifier_rx.await;
|
||||
// will have another request for the second path to the inner store, by the oracle:
|
||||
assert_eq!(2, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.get_request_count(&path_2));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_2));
|
||||
|
||||
// update time:
|
||||
time_provider.set(Time::from_timestamp_nanos(3));
|
||||
|
@ -823,9 +813,8 @@ pub(crate) mod tests {
|
|||
// GET the second entry and assert that it was retrieved from the cache, i.e., that the
|
||||
// request counts do not change:
|
||||
assert_payload_at_equals!(cached_store, payload_2, path_2);
|
||||
assert_eq!(2, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.get_request_count(&path_2));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_2));
|
||||
|
||||
// update time:
|
||||
time_provider.set(Time::from_timestamp_nanos(4));
|
||||
|
@ -834,8 +823,8 @@ pub(crate) mod tests {
|
|||
// will also update the hit count so that the first entry (janeway) was used more recently
|
||||
// than the second entry (paris):
|
||||
assert_payload_at_equals!(cached_store, payload_1, path_1);
|
||||
assert_eq!(2, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_2));
|
||||
|
||||
// PUT a third entry into the store:
|
||||
let path_3 = Path::from("2.parquet");
|
||||
|
@ -854,20 +843,18 @@ pub(crate) mod tests {
|
|||
oracle.register(cache_request);
|
||||
let _ = notifier_rx.await;
|
||||
// will now have another request for the third path to the inner store, by the oracle:
|
||||
assert_eq!(3, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.get_request_count(&path_2));
|
||||
assert_eq!(1, inner_store.get_request_count(&path_3));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_2));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_3));
|
||||
|
||||
// update time:
|
||||
time_provider.set(Time::from_timestamp_nanos(6));
|
||||
|
||||
// GET the new entry from the store, and check that it was served by the cache:
|
||||
assert_payload_at_equals!(cached_store, payload_3, path_3);
|
||||
assert_eq!(3, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.get_request_count(&path_2));
|
||||
assert_eq!(1, inner_store.get_request_count(&path_3));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_1));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_2));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_3));
|
||||
|
||||
// allow some time for pruning:
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
|
@ -875,20 +862,21 @@ pub(crate) mod tests {
|
|||
// GET paris from the cached store, this will not be served by the cache, because paris was
|
||||
// evicted by neelix:
|
||||
assert_payload_at_equals!(cached_store, payload_2, path_2);
|
||||
assert_eq!(4, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path_1));
|
||||
assert_eq!(2, inner_store.get_request_count(&path_2));
|
||||
assert_eq!(1, inner_store.get_request_count(&path_3));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_1));
|
||||
assert_eq!(2, inner_store.total_read_request_count(&path_2));
|
||||
assert_eq!(1, inner_store.total_read_request_count(&path_3));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cache_hit_while_fetching() {
|
||||
// Create a test store with a barrier:
|
||||
// Create the object store with the following layers:
|
||||
// Synchronized -> RequestCounted -> Inner
|
||||
let to_store_notify = Arc::new(Notify::new());
|
||||
let from_store_notify = Arc::new(Notify::new());
|
||||
let counter = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new())));
|
||||
let inner_store = Arc::new(
|
||||
TestObjectStore::new(Arc::new(InMemory::new()))
|
||||
.with_notifies(Arc::clone(&to_store_notify), Arc::clone(&from_store_notify)),
|
||||
SynchronizedObjectStore::new(Arc::clone(&counter) as _)
|
||||
.with_get_notifies(Arc::clone(&to_store_notify), Arc::clone(&from_store_notify)),
|
||||
);
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let (cached_store, oracle) = test_cached_obj_store_and_oracle(
|
||||
|
@ -928,193 +916,10 @@ pub(crate) mod tests {
|
|||
h.await.unwrap();
|
||||
|
||||
// there should only have been one request made, i.e., from the cache oracle:
|
||||
assert_eq!(1, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path));
|
||||
assert_eq!(1, counter.total_read_request_count(&path));
|
||||
|
||||
// make another request to the store, to be sure that it is in the cache:
|
||||
assert_payload_at_equals!(cached_store, payload, path);
|
||||
assert_eq!(1, inner_store.total_get_request_count());
|
||||
assert_eq!(1, inner_store.get_request_count(&path));
|
||||
}
|
||||
|
||||
type RequestCounter = RwLock<HashMap<Path, usize>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TestObjectStore {
|
||||
inner: Arc<dyn ObjectStore>,
|
||||
get: RequestCounter,
|
||||
get_opts: RequestCounter,
|
||||
get_range: RequestCounter,
|
||||
get_ranges: RequestCounter,
|
||||
head: RequestCounter,
|
||||
notifies: Option<(Arc<Notify>, Arc<Notify>)>,
|
||||
}
|
||||
|
||||
impl TestObjectStore {
|
||||
pub(crate) fn new(inner: Arc<dyn ObjectStore>) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
get: Default::default(),
|
||||
get_opts: Default::default(),
|
||||
get_range: Default::default(),
|
||||
get_ranges: Default::default(),
|
||||
head: Default::default(),
|
||||
notifies: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn with_notifies(mut self, inbound: Arc<Notify>, outbound: Arc<Notify>) -> Self {
|
||||
self.notifies = Some((inbound, outbound));
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn total_get_request_count(&self) -> usize {
|
||||
self.get.read().iter().map(|(_, size)| size).sum()
|
||||
}
|
||||
|
||||
pub(crate) fn get_request_count(&self, path: &Path) -> usize {
|
||||
self.get.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
|
||||
pub(crate) fn get_opts_request_count(&self, path: &Path) -> usize {
|
||||
self.get_opts.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
|
||||
pub(crate) fn get_range_request_count(&self, path: &Path) -> usize {
|
||||
self.get_range.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
|
||||
pub(crate) fn get_ranges_request_count(&self, path: &Path) -> usize {
|
||||
self.get_ranges.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
|
||||
pub(crate) fn head_request_count(&self, path: &Path) -> usize {
|
||||
self.head.read().get(path).copied().unwrap_or(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TestObjectStore {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "TestObjectStore({})", self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ObjectStore for TestObjectStore {
|
||||
async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result<PutResult> {
|
||||
self.inner.put(location, bytes).await
|
||||
}
|
||||
|
||||
async fn put_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
bytes: PutPayload,
|
||||
opts: PutOptions,
|
||||
) -> object_store::Result<PutResult> {
|
||||
self.inner.put_opts(location, bytes, opts).await
|
||||
}
|
||||
|
||||
async fn put_multipart(
|
||||
&self,
|
||||
location: &Path,
|
||||
) -> object_store::Result<Box<dyn MultipartUpload>> {
|
||||
self.inner.put_multipart(location).await
|
||||
}
|
||||
|
||||
async fn put_multipart_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
opts: PutMultipartOpts,
|
||||
) -> object_store::Result<Box<dyn MultipartUpload>> {
|
||||
self.inner.put_multipart_opts(location, opts).await
|
||||
}
|
||||
|
||||
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
|
||||
*self.get.write().entry(location.clone()).or_insert(0) += 1;
|
||||
if let Some((inbound, outbound)) = &self.notifies {
|
||||
outbound.notify_one();
|
||||
inbound.notified().await;
|
||||
}
|
||||
self.inner.get(location).await
|
||||
}
|
||||
|
||||
async fn get_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
options: GetOptions,
|
||||
) -> object_store::Result<GetResult> {
|
||||
*self.get_opts.write().entry(location.clone()).or_insert(0) += 1;
|
||||
self.inner.get_opts(location, options).await
|
||||
}
|
||||
|
||||
async fn get_range(
|
||||
&self,
|
||||
location: &Path,
|
||||
range: Range<usize>,
|
||||
) -> object_store::Result<Bytes> {
|
||||
*self.get_range.write().entry(location.clone()).or_insert(0) += 1;
|
||||
self.inner.get_range(location, range).await
|
||||
}
|
||||
|
||||
async fn get_ranges(
|
||||
&self,
|
||||
location: &Path,
|
||||
ranges: &[Range<usize>],
|
||||
) -> object_store::Result<Vec<Bytes>> {
|
||||
*self.get_ranges.write().entry(location.clone()).or_insert(0) += 1;
|
||||
self.inner.get_ranges(location, ranges).await
|
||||
}
|
||||
|
||||
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
|
||||
*self.head.write().entry(location.clone()).or_insert(0) += 1;
|
||||
self.inner.head(location).await
|
||||
}
|
||||
|
||||
/// Delete an object on object store, but also remove it from the cache.
|
||||
async fn delete(&self, location: &Path) -> object_store::Result<()> {
|
||||
self.inner.delete(location).await
|
||||
}
|
||||
|
||||
fn delete_stream<'a>(
|
||||
&'a self,
|
||||
locations: BoxStream<'a, object_store::Result<Path>>,
|
||||
) -> BoxStream<'a, object_store::Result<Path>> {
|
||||
self.inner.delete_stream(locations)
|
||||
}
|
||||
|
||||
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
|
||||
self.inner.list(prefix)
|
||||
}
|
||||
|
||||
fn list_with_offset(
|
||||
&self,
|
||||
prefix: Option<&Path>,
|
||||
offset: &Path,
|
||||
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
|
||||
self.inner.list_with_offset(prefix, offset)
|
||||
}
|
||||
|
||||
async fn list_with_delimiter(
|
||||
&self,
|
||||
prefix: Option<&Path>,
|
||||
) -> object_store::Result<ListResult> {
|
||||
self.inner.list_with_delimiter(prefix).await
|
||||
}
|
||||
|
||||
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.copy(from, to).await
|
||||
}
|
||||
|
||||
async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.rename(from, to).await
|
||||
}
|
||||
|
||||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.copy_if_not_exists(from, to).await
|
||||
}
|
||||
|
||||
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
|
||||
self.inner.rename_if_not_exists(from, to).await
|
||||
}
|
||||
assert_eq!(1, counter.total_read_request_count(&path));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -526,7 +526,6 @@ impl WriteBuffer for WriteBufferImpl {}
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::parquet_cache::test_cached_obj_store_and_oracle;
|
||||
use crate::parquet_cache::tests::TestObjectStore;
|
||||
use crate::paths::{CatalogFilePath, SnapshotInfoFilePath};
|
||||
use crate::persister::Persister;
|
||||
use crate::PersistedSnapshot;
|
||||
|
@ -537,6 +536,7 @@ mod tests {
|
|||
use futures_util::StreamExt;
|
||||
use influxdb3_catalog::catalog::SequenceNumber;
|
||||
use influxdb3_id::DbId;
|
||||
use influxdb3_test_helpers::object_store::RequestCountedObjectStore;
|
||||
use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
|
||||
use iox_query::exec::IOxSessionContext;
|
||||
use iox_time::{MockProvider, Time};
|
||||
|
@ -1580,7 +1580,7 @@ mod tests {
|
|||
async fn test_parquet_cache() {
|
||||
// set up a write buffer using a RequestCountedObjectStore so we can spy on requests that get
|
||||
// through to the object store for parquet files:
|
||||
let test_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
|
||||
let test_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new())));
|
||||
let obj_store: Arc<dyn ObjectStore> = Arc::clone(&test_store) as _;
|
||||
let (wbuf, ctx) = setup_cache_optional(
|
||||
Time::from_timestamp_nanos(0),
|
||||
|
@ -1684,7 +1684,7 @@ mod tests {
|
|||
async fn test_no_parquet_cache() {
|
||||
// set up a write buffer using a RequestCountedObjectStore so we can spy on requests that get
|
||||
// through to the object store for parquet files:
|
||||
let test_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
|
||||
let test_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new())));
|
||||
let obj_store: Arc<dyn ObjectStore> = Arc::clone(&test_store) as _;
|
||||
let (wbuf, ctx) = setup_cache_optional(
|
||||
Time::from_timestamp_nanos(0),
|
||||
|
|
Loading…
Reference in New Issue