feat: basic snapshot caching (#1184)
parent
ee46764a2d
commit
55a77914b1
|
|
@ -1885,6 +1885,7 @@ dependencies = [
|
|||
"influxdb_line_protocol",
|
||||
"internal_types",
|
||||
"observability_deps",
|
||||
"parking_lot",
|
||||
"snafu",
|
||||
"string-interner",
|
||||
"test_helpers",
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ generated_types = { path = "../generated_types" }
|
|||
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
|
||||
internal_types = { path = "../internal_types" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.11.1"
|
||||
snafu = "0.6.2"
|
||||
string-interner = "0.12.2"
|
||||
tokio = { version = "1.0", features = ["macros"] }
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ use crate::{
|
|||
pred::{ChunkPredicate, ChunkPredicateBuilder},
|
||||
table::Table,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
pub mod snapshot;
|
||||
|
||||
|
|
@ -129,6 +130,12 @@ pub struct Chunk {
|
|||
|
||||
/// keep track of memory used by chunk
|
||||
tracker: MemTracker,
|
||||
|
||||
/// Cached chunk snapshot
|
||||
///
|
||||
/// Note: This is a mutex to allow mutation within
|
||||
/// `Chunk::snapshot()` which only takes an immutable borrow
|
||||
snapshot: Mutex<Option<Arc<ChunkSnapshot>>>,
|
||||
}
|
||||
|
||||
impl Chunk {
|
||||
|
|
@ -138,6 +145,7 @@ impl Chunk {
|
|||
dictionary: Dictionary::new(),
|
||||
tables: HashMap::new(),
|
||||
tracker: memory_registry.register(),
|
||||
snapshot: Mutex::new(None),
|
||||
};
|
||||
chunk.tracker.set_bytes(chunk.size());
|
||||
chunk
|
||||
|
|
@ -164,6 +172,12 @@ impl Chunk {
|
|||
.context(TableWrite { table_name })?;
|
||||
}
|
||||
|
||||
// Invalidate chunk snapshot
|
||||
*self
|
||||
.snapshot
|
||||
.try_lock()
|
||||
.expect("concurrent readers/writers to MBChunk") = None;
|
||||
|
||||
self.tracker.set_bytes(self.size());
|
||||
|
||||
Ok(())
|
||||
|
|
@ -181,8 +195,15 @@ impl Chunk {
|
|||
|
||||
/// Returns a queryable snapshot of this chunk
|
||||
pub fn snapshot(&self) -> Arc<ChunkSnapshot> {
|
||||
// TODO: Cache this
|
||||
Arc::new(ChunkSnapshot::new(self))
|
||||
let mut guard = self.snapshot.lock();
|
||||
if let Some(snapshot) = &*guard {
|
||||
return Arc::clone(snapshot);
|
||||
}
|
||||
|
||||
// TODO: Incremental snapshot generation
|
||||
let snapshot = Arc::new(ChunkSnapshot::new(self));
|
||||
*guard = Some(Arc::clone(&snapshot));
|
||||
snapshot
|
||||
}
|
||||
|
||||
/// Return all the names of the tables names in this chunk that match
|
||||
|
|
@ -593,6 +614,31 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_snapshot() {
|
||||
let mr = MemRegistry::new();
|
||||
let mut chunk = Chunk::new(1, &mr);
|
||||
|
||||
let lp = vec![
|
||||
"cpu,host=a val=23 1",
|
||||
"cpu,host=b val=2 1",
|
||||
"mem,host=a val=23432i 1",
|
||||
]
|
||||
.join("\n");
|
||||
|
||||
write_lp_to_chunk(&lp, &mut chunk).unwrap();
|
||||
let s1 = chunk.snapshot();
|
||||
let s2 = chunk.snapshot();
|
||||
|
||||
write_lp_to_chunk(&lp, &mut chunk).unwrap();
|
||||
let s3 = chunk.snapshot();
|
||||
let s4 = chunk.snapshot();
|
||||
|
||||
assert_eq!(Arc::as_ptr(&s1), Arc::as_ptr(&s2));
|
||||
assert_ne!(Arc::as_ptr(&s1), Arc::as_ptr(&s3));
|
||||
assert_eq!(Arc::as_ptr(&s3), Arc::as_ptr(&s4));
|
||||
}
|
||||
|
||||
fn assert_table(chunk: &Chunk, table: &str, data: &[&str]) {
|
||||
let mut batches = vec![];
|
||||
chunk
|
||||
|
|
|
|||
Loading…
Reference in New Issue