From 8399c565870eab61cee149f7e59ecfc9230f0502 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 5 Feb 2021 16:58:17 -0500
Subject: [PATCH] feat: remove RwLock on entire ReadBuffer (#761)

---
 read_buffer/benches/database.rs |  2 +-
 read_buffer/src/lib.rs          | 14 +++++++-------
 server/src/db.rs                | 16 +++++-----------
 server/src/db/chunk.rs          | 10 +++-------
 4 files changed, 16 insertions(+), 26 deletions(-)

diff --git a/read_buffer/benches/database.rs b/read_buffer/benches/database.rs
index 778c9e5ec0..01e218ae63 100644
--- a/read_buffer/benches/database.rs
+++ b/read_buffer/benches/database.rs
@@ -14,7 +14,7 @@ const ONE_MS: i64 = 1_000_000;
 
 fn table_names(c: &mut Criterion) {
     let rb = generate_row_group(500_000);
-    let mut db = Database::new();
+    let db = Database::new();
     db.upsert_partition("hour_1", 0, "table_a", rb.clone());
 
     // no predicate - return all the tables
diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs
index 7e006e304b..70012e8eeb 100644
--- a/read_buffer/src/lib.rs
+++ b/read_buffer/src/lib.rs
@@ -103,7 +103,7 @@ impl Database {
     /// chunk. If the `Table` or `Chunk` does not exist they will be created,
     /// otherwise relevant structures will be updated.
     pub fn upsert_partition(
-        &mut self,
+        &self,
         partition_key: &str,
         chunk_id: u32,
         table_name: &str,
@@ -153,7 +153,7 @@ impl Database {
     }
 
     /// Remove all row groups and tables for the specified chunks and partition.
-    pub fn drop_chunk(&mut self, partition_key: &str, chunk_id: u32) -> Result<()> {
+    pub fn drop_chunk(&self, partition_key: &str, chunk_id: u32) -> Result<()> {
         let mut partition_data = self.data.write().unwrap();
 
         let partition = partition_data
@@ -1023,7 +1023,7 @@ mod test {
 
     #[test]
     fn table_names() {
-        let mut db = Database::new();
+        let db = Database::new();
         let res_col = TABLE_NAMES_COLUMN_NAME;
 
         db.upsert_partition("hour_1", 22, "Coolverine", gen_recordbatch());
@@ -1070,7 +1070,7 @@ mod test {
 
     #[test]
     fn column_names() {
-        let mut db = Database::new();
+        let db = Database::new();
         let res_col = COLUMN_NAMES_COLUMN_NAME;
 
         let schema = SchemaBuilder::new()
@@ -1190,7 +1190,7 @@ mod test {
 
     #[test]
     fn read_filter_single_chunk() {
-        let mut db = Database::new();
+        let db = Database::new();
 
         // Add a bunch of row groups to a single table in a single chunk
         for &i in &[100, 200, 300] {
@@ -1272,7 +1272,7 @@ mod test {
 
     #[test]
     fn read_filter_multiple_chunks() {
-        let mut db = Database::new();
+        let db = Database::new();
 
         // Add a bunch of row groups to a single table across multiple chunks
         for &i in &[100, 200, 300] {
@@ -1342,7 +1342,7 @@ mod test {
 
     #[test]
     fn read_aggregate() {
-        let mut db = Database::new();
+        let db = Database::new();
 
         // Add a bunch of row groups to a single table in a single chunks
         for &i in &[100, 200, 300] {
diff --git a/server/src/db.rs b/server/src/db.rs
index 0817b21017..3282c489a3 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -5,7 +5,7 @@ use std::{
     collections::BTreeMap,
     sync::{
         atomic::{AtomicU64, Ordering},
-        Arc, Mutex, RwLock,
+        Arc, Mutex,
    },
 };
 
@@ -83,9 +83,7 @@ pub struct Db {
     #[serde(skip)]
     /// The read buffer holds chunk data in an in-memory optimized
     /// format.
-    ///
-    /// TODO: finer grained locking see ticket https://github.com/influxdata/influxdb_iox/issues/669
-    pub read_buffer: Arc<RwLock<ReadBufferDb>>,
+    pub read_buffer: Arc<ReadBufferDb>,
 
     #[serde(skip)]
     /// The wal buffer holds replicated writes in an append in-memory
@@ -104,7 +102,7 @@ impl Db {
         wal_buffer: Option<Buffer>,
     ) -> Self {
         let wal_buffer = wal_buffer.map(Mutex::new);
-        let read_buffer = Arc::new(RwLock::new(read_buffer));
+        let read_buffer = Arc::new(read_buffer);
         Self {
             rules,
             mutable_buffer,
@@ -145,8 +143,6 @@ impl Db {
     /// List chunks that are currently in the read buffer
     pub async fn read_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
         self.read_buffer
-            .read()
-            .expect("mutex poisoned")
             .chunk_ids(partition_key)
             .into_iter()
             .map(|chunk_id| DBChunk::new_rb(self.read_buffer.clone(), partition_key, chunk_id))
@@ -176,8 +172,6 @@ impl Db {
         chunk_id: u32,
     ) -> Result<Arc<DBChunk>> {
         self.read_buffer
-            .write()
-            .expect("mutex poisoned")
             .drop_chunk(partition_key, chunk_id)
             .context(ReadBufferDrop)?;
 
@@ -222,8 +216,8 @@ impl Db {
 
         // As implemented now, taking this write lock will wait
         // until all reads to the read buffer to complete and
         // then will block all reads while the insert is occuring
-        let mut read_buffer = self.read_buffer.write().expect("mutex poisoned");
-        read_buffer.upsert_partition(partition_key, mb_chunk.id(), &stats.name, batch)
+        self.read_buffer
+            .upsert_partition(partition_key, mb_chunk.id(), &stats.name, batch)
     }
 }
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index eb9f8d3e07..3b2712145a 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -8,7 +8,7 @@
 use query::{predicate::Predicate, util::make_scan_plan, PartitionChunk};
 use read_buffer::Database as ReadBufferDb;
 use snafu::{ResultExt, Snafu};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 
 use super::{
     pred::to_read_buffer_predicate,
@@ -53,7 +53,7 @@ pub enum DBChunk {
         chunk: Arc,
     },
     ReadBuffer {
-        db: Arc<RwLock<ReadBufferDb>>,
+        db: Arc<ReadBufferDb>,
         partition_key: String,
         chunk_id: u32,
     },
@@ -68,7 +68,7 @@ impl DBChunk {
 
     /// create a new read buffer chunk
     pub fn new_rb(
-        db: Arc<RwLock<ReadBufferDb>>,
+        db: Arc<ReadBufferDb>,
         partition_key: impl Into<String>,
         chunk_id: u32,
     ) -> Arc<Self> {
@@ -135,7 +135,6 @@ impl PartitionChunk for DBChunk {
                 let rb_predicate =
                     to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;
 
-                let db = db.read().unwrap();
                 let batch = db
                     .table_names(partition_key, &[chunk_id], rb_predicate)
                     .context(ReadBufferChunk { chunk_id })?;
@@ -162,7 +161,6 @@ impl PartitionChunk for DBChunk {
                 chunk_id,
             } => {
                 let chunk_id = *chunk_id;
-                let db = db.read().unwrap();
 
                 // TODO: Andrew -- I think technically this reordering
                 // should be happening inside the read buffer, but
@@ -204,7 +202,6 @@ impl PartitionChunk for DBChunk {
                 chunk_id,
             } => {
                 let chunk_id = *chunk_id;
-                let db = db.read().unwrap();
                 db.has_table(partition_key, table_name, &[chunk_id])
             }
             DBChunk::ParquetFile => {
@@ -243,7 +240,6 @@ impl PartitionChunk for DBChunk {
                 let chunk_id = *chunk_id;
                 let rb_predicate =
                     to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;
-                let db = db.read().expect("mutex poisoned");
                 let read_results = db
                     .read_filter(
                         partition_key,
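
A note on the pattern this patch applies: instead of callers sharing an
Arc<RwLock<ReadBufferDb>> and locking the entire database around every call, the
lock moves inside `Database` itself (see the `self.data.write()` already present in
`drop_chunk` above), so its methods take `&self` and callers hold a plain
`Arc<ReadBufferDb>`. The following is a minimal, self-contained sketch of that
interior-mutability pattern; `Catalog`, its `data` field, and the `String` payload
are hypothetical stand-ins, not the actual ReadBuffer types.

    use std::collections::BTreeMap;
    use std::sync::{Arc, RwLock};

    /// Hypothetical stand-in for a type like `read_buffer::Database`.
    #[derive(Debug, Default)]
    struct Catalog {
        // The lock lives inside the type and guards only the mutable state,
        // instead of an external RwLock wrapping the whole struct.
        data: RwLock<BTreeMap<u32, String>>,
    }

    impl Catalog {
        /// Mutation through a shared reference: the write lock is taken
        /// internally and held only for the duration of the insert.
        fn upsert(&self, chunk_id: u32, payload: String) {
            let mut data = self.data.write().unwrap();
            data.insert(chunk_id, payload);
        }

        /// Reads hold the read lock just long enough to copy the ids out.
        fn chunk_ids(&self) -> Vec<u32> {
            self.data.read().unwrap().keys().copied().collect()
        }
    }

    fn main() {
        // Callers share the catalog with no outer lock, mirroring the change
        // from Arc<RwLock<ReadBufferDb>> to Arc<ReadBufferDb> in this patch.
        let catalog = Arc::new(Catalog::default());
        catalog.upsert(22, "table_a".to_string());
        assert_eq!(catalog.chunk_ids(), vec![22]);
    }

One visible consequence, in the test and bench hunks above, is that bindings no
longer need `mut`: `let db = Database::new()` suffices because `upsert_partition`
and `drop_chunk` now take `&self`. The trade-off is that callers can no longer
group several operations under a single lock acquisition.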