From 3ec483b769759f92dac7a7ec6e5c6bf763e110f3 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 5 Feb 2021 06:47:40 -0500
Subject: [PATCH] refactor: Reduce async in mutable buffer, use std::sync (#749)

* refactor: Reduce async in mutable buffer, use std::sync

* fix: logical confict with new code
---
 mutable_buffer/src/database.rs | 108 ++++++++++++++++-----------------
 server/src/db.rs               |   3 -
 2 files changed, 53 insertions(+), 58 deletions(-)

diff --git a/mutable_buffer/src/database.rs b/mutable_buffer/src/database.rs
index 6740d16475..d11c6df8e0 100644
--- a/mutable_buffer/src/database.rs
+++ b/mutable_buffer/src/database.rs
@@ -25,7 +25,7 @@ use crate::dictionary::Error as DictionaryError;
 use async_trait::async_trait;
 use snafu::{ResultExt, Snafu};
-use tokio::sync::RwLock;
+use std::sync::RwLock;

 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -141,15 +141,15 @@ impl MutableBufferDb {
     }

     /// Directs the writes from batch into the appropriate partitions
-    async fn write_entries_to_partitions(&self, batch: &wal::WriteBufferBatch<'_>) -> Result<()> {
+    fn write_entries_to_partitions(&self, batch: &wal::WriteBufferBatch<'_>) -> Result<()> {
         if let Some(entries) = batch.entries() {
             for entry in entries {
                 let key = entry
                     .partition_key()
                     .expect("partition key should have been inserted");
-                let partition = self.get_partition(key).await;
-                let mut partition = partition.write().await;
+                let partition = self.get_partition(key);
+                let mut partition = partition.write().expect("mutex poisoned");
                 partition.write_entry(&entry)?
             }
         }
@@ -158,41 +158,43 @@ impl MutableBufferDb {
     }

     /// Rolls over the active chunk in this partititon
-    pub async fn rollover_partition(&self, partition_key: &str) -> Result<Arc<Chunk>> {
-        let partition = self.get_partition(partition_key).await;
-        let mut partition = partition.write().await;
+    pub fn rollover_partition(&self, partition_key: &str) -> Result<Arc<Chunk>> {
+        let partition = self.get_partition(partition_key);
+        let mut partition = partition.write().expect("mutex poisoned");
         Ok(partition.rollover_chunk())
     }

     /// return the specified chunk from the partition
     /// Returns None if no such chunk exists.
-    pub async fn get_chunk(&self, partition_key: &str, chunk_id: u32) -> Option<Arc<Chunk>> {
-        self.get_partition(partition_key)
-            .await
-            .read()
-            .await
-            .get_chunk(chunk_id)
-            .ok()
+    pub fn get_chunk(&self, partition_key: &str, chunk_id: u32) -> Option<Arc<Chunk>> {
+        let partition = self.get_partition(partition_key);
+        let partition = partition.read().expect("mutex poisoned");
+        partition.get_chunk(chunk_id).ok()
     }

     /// drop the the specified chunk from the partition
-    pub async fn drop_chunk(&self, partition_key: &str, chunk_id: u32) -> Result<Arc<Chunk>> {
-        self.get_partition(partition_key)
-            .await
-            .write()
-            .await
+    pub fn drop_chunk(&self, partition_key: &str, chunk_id: u32) -> Result<Arc<Chunk>> {
+        let partition = self.get_partition(partition_key);
+        let mut partition = partition.write().expect("mutex poisoned");
+        partition
             .drop_chunk(chunk_id)
             .context(DroppingChunk { partition_key })
     }

     /// The approximate size in memory of all data in the mutable buffer, in
     /// bytes
-    pub async fn size(&self) -> usize {
-        let partitions: Vec<_> = { self.partitions.read().await.values().cloned().collect() };
+    pub fn size(&self) -> usize {
+        let partitions = self
+            .partitions
+            .read()
+            .expect("lock poisoned")
+            .values()
+            .cloned()
+            .collect::<Vec<_>>();
         let mut size = 0;
         for p in partitions {
-            size += p.read().await.size();
+            size += p.read().expect("lock poisoned").size();
         }
         size
@@ -206,7 +208,7 @@ impl Database for MutableBufferDb {
     async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error> {
         match write.write_buffer_batch() {
-            Some(b) => self.write_entries_to_partitions(&b).await?,
+            Some(b) => self.write_entries_to_partitions(&b)?,
             None => {
                 return MissingPayload {
                     writer: write.to_fb().writer(),
@@ -225,11 +227,11 @@ impl Database for MutableBufferDb {
         if has_exprs {
             let mut visitor = NamePredVisitor::new();
-            self.accept(&mut filter, &mut visitor).await?;
+            self.accept(&mut filter, &mut visitor)?;
             Ok(visitor.plans.into())
         } else {
             let mut visitor = NameVisitor::new();
-            self.accept(&mut filter, &mut visitor).await?;
+            self.accept(&mut filter, &mut visitor)?;
             Ok(visitor.column_names.into())
         }
     }
@@ -239,7 +241,7 @@ impl Database for MutableBufferDb {
     async fn field_column_names(&self, predicate: Predicate) -> Result<FieldListPlan, Self::Error> {
         let mut filter = ChunkTableFilter::new(predicate);
         let mut visitor = TableFieldPredVisitor::new();
-        self.accept(&mut filter, &mut visitor).await?;
+        self.accept(&mut filter, &mut visitor)?;
         Ok(visitor.into_fieldlist_plan())
     }
@@ -255,11 +257,11 @@ impl Database for MutableBufferDb {
         if has_exprs {
             let mut visitor = ValuePredVisitor::new(column_name);
-            self.accept(&mut filter, &mut visitor).await?;
+            self.accept(&mut filter, &mut visitor)?;
             Ok(visitor.plans.into())
         } else {
             let mut visitor = ValueVisitor::new(column_name);
-            self.accept(&mut filter, &mut visitor).await?;
+            self.accept(&mut filter, &mut visitor)?;
             Ok(visitor.column_values.into())
         }
     }
@@ -267,7 +269,7 @@ impl Database for MutableBufferDb {
     async fn query_series(&self, predicate: Predicate) -> Result<SeriesSetPlans, Self::Error> {
         let mut filter = ChunkTableFilter::new(predicate);
         let mut visitor = SeriesVisitor::new();
-        self.accept(&mut filter, &mut visitor).await?;
+        self.accept(&mut filter, &mut visitor)?;
         Ok(visitor.plans.into())
     }
@@ -284,12 +286,12 @@ impl Database for MutableBufferDb {
                 // can skip tables without those tags)
                 let mut filter = filter.add_required_columns(&group_columns);
                 let mut visitor = GroupsVisitor::new(agg, group_columns);
-                self.accept(&mut filter, &mut visitor).await?;
+                self.accept(&mut filter, &mut visitor)?;

                 Ok(visitor.plans.into())
             }
             GroupByAndAggregate::Window { agg, every, offset } => {
                 let mut visitor = WindowGroupsVisitor::new(agg, every, offset);
-                self.accept(&mut filter, &mut visitor).await?;
+                self.accept(&mut filter, &mut visitor)?;

                 Ok(visitor.plans.into())
             }
         }
     }
@@ -297,7 +299,7 @@ impl Database for MutableBufferDb {
     /// Return the partition keys for data in this DB
     async fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
-        let partitions = self.partitions.read().await;
+        let partitions = self.partitions.read().expect("mutex poisoned");
         let keys = partitions.keys().cloned().collect();
         Ok(keys)
     }
@@ -305,11 +307,9 @@ impl Database for MutableBufferDb {
     /// Return the list of chunks, in order of id, for the specified
     /// partition_key
     async fn chunks(&self, partition_key: &str) -> Vec<Arc<Chunk>> {
-        self.get_partition(partition_key)
-            .await
-            .read()
-            .await
-            .chunks()
+        let partition = self.get_partition(partition_key);
+        let partition = partition.read().expect("mutex poisoned");
+        partition.chunks()
     }
 }
@@ -396,21 +396,23 @@ trait Visitor {
 impl MutableBufferDb {
     /// returns the number of partitions in this database
-    pub async fn len(&self) -> usize {
-        self.partitions.read().await.len()
+    pub fn len(&self) -> usize {
+        let partitions = self.partitions.read().expect("mutex poisoned");
+        partitions.len()
     }

     /// returns true if the database has no partititons
-    pub async fn is_empty(&self) -> bool {
-        self.partitions.read().await.is_empty()
+    pub fn is_empty(&self) -> bool {
+        let partitions = self.partitions.read().expect("mutex poisoned");
+        partitions.is_empty()
     }

     /// Retrieve (or create) the partition for the specified partition key
-    async fn get_partition(&self, partition_key: &str) -> Arc<RwLock<Partition>> {
+    fn get_partition(&self, partition_key: &str) -> Arc<RwLock<Partition>> {
         // until we think this code is likely to be a contention hot
         // spot, simply use a write lock even when often a read lock
         // would do.
-        let mut partitions = self.partitions.write().await;
+        let mut partitions = self.partitions.write().expect("mutex poisoned");

         if let Some(partition) = partitions.get(partition_key) {
             partition.clone()
@@ -429,8 +431,8 @@ impl MutableBufferDb {
     /// after this returns, new partitions can be added, and some
     /// partitions in the snapshot could be dropped from the overall
     /// database
-    async fn partition_snapshot(&self) -> Vec<Arc<RwLock<Partition>>> {
-        let partitions = self.partitions.read().await;
+    fn partition_snapshot(&self) -> Vec<Arc<RwLock<Partition>>> {
+        let partitions = self.partitions.read().expect("mutex poisoned");
         partitions.values().cloned().collect()
     }
@@ -440,13 +442,9 @@ impl MutableBufferDb {
     ///
     /// Skips visiting any table or columns of `filter.should_visit_table`
     /// returns false
-    async fn accept<V: Visitor>(
-        &self,
-        filter: &mut ChunkTableFilter,
-        visitor: &mut V,
-    ) -> Result<()> {
-        for partition in self.partition_snapshot().await.into_iter() {
-            let partition = partition.read().await;
+    fn accept<V: Visitor>(&self, filter: &mut ChunkTableFilter, visitor: &mut V) -> Result<()> {
+        for partition in self.partition_snapshot().into_iter() {
+            let partition = partition.read().expect("mutex poisoned");
             if filter.should_visit_partition(&partition)? {
                 for chunk in partition.iter() {
@@ -977,7 +975,7 @@ mod tests {
         let partition_key = "1970-01-01T00";

-        let chunk = db.get_chunk(partition_key, 0).await.unwrap();
+        let chunk = db.get_chunk(partition_key, 0).unwrap();
         let mut batches = Vec::new();
         let selection = Selection::Some(&["region", "core"]);
         chunk
@@ -1568,7 +1566,7 @@ mod tests {
         write_lines(&db, &lines).await;

         // ensure there are 2 chunks
-        assert_eq!(db.len().await, 2);
+        assert_eq!(db.len(), 2);

         // setup to run the execution plan (
         let executor = Executor::default();
@@ -1703,7 +1701,7 @@ mod tests {
         let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
         write_lines(&db, &lines).await;

-        assert_eq!(429, db.size().await);
+        assert_eq!(429, db.size());
     }

     /// Run the plan and gather the results in a order that can be compared
diff --git a/server/src/db.rs b/server/src/db.rs
index 2407d64472..79eda5f9b4 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -119,7 +119,6 @@ impl Db {
         if let Some(local_store) = self.mutable_buffer.as_ref() {
             local_store
                 .rollover_partition(partition_key)
-                .await
                 .context(RollingPartition)
                 .map(DBChunk::new_mb)
         } else {
@@ -165,7 +164,6 @@ impl Db {
             .as_ref()
             .context(DatatbaseNotWriteable)?
             .drop_chunk(partition_key, chunk_id)
-            .await
             .map(DBChunk::new_mb)
             .context(MutableBufferDrop)
     }
@@ -213,7 +211,6 @@ impl Db {
             .as_ref()
             .context(DatatbaseNotWriteable)?
             .get_chunk(partition_key, chunk_id)
-            .await
             .context(UnknownMutableBufferChunk { chunk_id })?;

         let mut batches = Vec::new();
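
As a quick illustration of the pattern this patch applies throughout, here is a minimal, self-contained sketch of swapping `tokio::sync::RwLock` plus `.await` for `std::sync::RwLock` guarded with `.expect("mutex poisoned")`, which is what lets the methods above drop their `async` qualifiers. The `Db` and `Partition` types below are hypothetical stand-ins for the real mutable buffer types, not the actual IOx API.

// Sketch only: assumed names, not the real mutable_buffer types.
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

#[derive(Default)]
struct Partition {
    chunks: usize,
}

#[derive(Default)]
struct Db {
    // Map of partitions, each behind its own std::sync::RwLock.
    partitions: RwLock<BTreeMap<String, Arc<RwLock<Partition>>>>,
}

impl Db {
    // Synchronous now: no `async fn`, no `.await`; a poisoned lock is treated as fatal.
    fn get_partition(&self, key: &str) -> Arc<RwLock<Partition>> {
        let mut partitions = self.partitions.write().expect("mutex poisoned");
        Arc::clone(
            partitions
                .entry(key.to_string())
                .or_insert_with(|| Arc::new(RwLock::new(Partition::default()))),
        )
    }

    // Reads take the same lock without awaiting.
    fn len(&self) -> usize {
        self.partitions.read().expect("mutex poisoned").len()
    }
}

fn main() {
    let db = Db::default();
    {
        let partition = db.get_partition("1970-01-01T00");
        let mut partition = partition.write().expect("mutex poisoned");
        partition.chunks += 1;
    }
    assert_eq!(db.len(), 1);
    let partition = db.get_partition("1970-01-01T00");
    assert_eq!(partition.read().expect("mutex poisoned").chunks, 1);
}

In the refactored paths the guards are only held across short, synchronous sections, which is what makes the blocking std lock a reasonable swap for the async one.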