refactor: Reduce async in mutable buffer, use std::sync (#749)

* refactor: Reduce async in mutable buffer, use std::sync

* fix: logical conflict with new code
Andrew Lamb 2021-02-05 06:47:40 -05:00 committed by GitHub
parent a06fdf3a5f
commit 3ec483b769
2 changed files with 53 additions and 58 deletions
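
The whole diff below follows one mechanical pattern: `tokio::sync::RwLock` becomes `std::sync::RwLock`, `.read().await` / `.write().await` become `.read().expect(..)` / `.write().expect(..)`, and methods whose only reason to be `async` was the lock drop the `async` keyword (so their callers drop `.await`). A minimal, self-contained sketch of that pattern, using a hypothetical `Counter` type rather than the actual IOx code:

use std::sync::RwLock;

struct Counter {
    value: RwLock<usize>,
}

impl Counter {
    // before: pub async fn increment(&self) { *self.value.write().await += 1; }
    fn increment(&self) {
        *self.value.write().expect("mutex poisoned") += 1;
    }

    // before: pub async fn get(&self) -> usize { *self.value.read().await }
    fn get(&self) -> usize {
        *self.value.read().expect("mutex poisoned")
    }
}

fn main() {
    let c = Counter { value: RwLock::new(0) };
    c.increment();
    assert_eq!(c.get(), 1);
}

std::sync locks are a good fit here because the critical sections are short and nothing is awaited while a guard is held.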

View File

@@ -25,7 +25,7 @@ use crate::dictionary::Error as DictionaryError;
 use async_trait::async_trait;
 use snafu::{ResultExt, Snafu};
-use tokio::sync::RwLock;
+use std::sync::RwLock;

 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -141,15 +141,15 @@ impl MutableBufferDb {
     }

     /// Directs the writes from batch into the appropriate partitions
-    async fn write_entries_to_partitions(&self, batch: &wal::WriteBufferBatch<'_>) -> Result<()> {
+    fn write_entries_to_partitions(&self, batch: &wal::WriteBufferBatch<'_>) -> Result<()> {
         if let Some(entries) = batch.entries() {
             for entry in entries {
                 let key = entry
                     .partition_key()
                     .expect("partition key should have been inserted");
-                let partition = self.get_partition(key).await;
-                let mut partition = partition.write().await;
+                let partition = self.get_partition(key);
+                let mut partition = partition.write().expect("mutex poisoned");
                 partition.write_entry(&entry)?
             }
         }
@@ -158,41 +158,43 @@ impl MutableBufferDb {
     }

     /// Rolls over the active chunk in this partititon
-    pub async fn rollover_partition(&self, partition_key: &str) -> Result<Arc<Chunk>> {
-        let partition = self.get_partition(partition_key).await;
-        let mut partition = partition.write().await;
+    pub fn rollover_partition(&self, partition_key: &str) -> Result<Arc<Chunk>> {
+        let partition = self.get_partition(partition_key);
+        let mut partition = partition.write().expect("mutex poisoned");
         Ok(partition.rollover_chunk())
     }

     /// return the specified chunk from the partition
     /// Returns None if no such chunk exists.
-    pub async fn get_chunk(&self, partition_key: &str, chunk_id: u32) -> Option<Arc<Chunk>> {
-        self.get_partition(partition_key)
-            .await
-            .read()
-            .await
-            .get_chunk(chunk_id)
-            .ok()
+    pub fn get_chunk(&self, partition_key: &str, chunk_id: u32) -> Option<Arc<Chunk>> {
+        let partition = self.get_partition(partition_key);
+        let partition = partition.read().expect("mutex poisoned");
+        partition.get_chunk(chunk_id).ok()
     }

     /// drop the the specified chunk from the partition
-    pub async fn drop_chunk(&self, partition_key: &str, chunk_id: u32) -> Result<Arc<Chunk>> {
-        self.get_partition(partition_key)
-            .await
-            .write()
-            .await
+    pub fn drop_chunk(&self, partition_key: &str, chunk_id: u32) -> Result<Arc<Chunk>> {
+        let partition = self.get_partition(partition_key);
+        let mut partition = partition.write().expect("mutex poisoned");
+        partition
             .drop_chunk(chunk_id)
             .context(DroppingChunk { partition_key })
     }

     /// The approximate size in memory of all data in the mutable buffer, in
     /// bytes
-    pub async fn size(&self) -> usize {
-        let partitions: Vec<_> = { self.partitions.read().await.values().cloned().collect() };
+    pub fn size(&self) -> usize {
+        let partitions = self
+            .partitions
+            .read()
+            .expect("lock poisoned")
+            .values()
+            .cloned()
+            .collect::<Vec<_>>();

         let mut size = 0;
         for p in partitions {
-            size += p.read().await.size();
+            size += p.read().expect("lock poisoned").size();
         }

         size
@@ -206,7 +208,7 @@ impl Database for MutableBufferDb {
     async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error> {
         match write.write_buffer_batch() {
-            Some(b) => self.write_entries_to_partitions(&b).await?,
+            Some(b) => self.write_entries_to_partitions(&b)?,
             None => {
                 return MissingPayload {
                     writer: write.to_fb().writer(),
@@ -225,11 +227,11 @@ impl Database for MutableBufferDb {
         if has_exprs {
             let mut visitor = NamePredVisitor::new();
-            self.accept(&mut filter, &mut visitor).await?;
+            self.accept(&mut filter, &mut visitor)?;
             Ok(visitor.plans.into())
         } else {
             let mut visitor = NameVisitor::new();
-            self.accept(&mut filter, &mut visitor).await?;
+            self.accept(&mut filter, &mut visitor)?;
             Ok(visitor.column_names.into())
         }
     }
@@ -239,7 +241,7 @@ impl Database for MutableBufferDb {
     async fn field_column_names(&self, predicate: Predicate) -> Result<FieldListPlan, Self::Error> {
         let mut filter = ChunkTableFilter::new(predicate);
         let mut visitor = TableFieldPredVisitor::new();
-        self.accept(&mut filter, &mut visitor).await?;
+        self.accept(&mut filter, &mut visitor)?;
         Ok(visitor.into_fieldlist_plan())
     }
@@ -255,11 +257,11 @@ impl Database for MutableBufferDb {
         if has_exprs {
             let mut visitor = ValuePredVisitor::new(column_name);
-            self.accept(&mut filter, &mut visitor).await?;
+            self.accept(&mut filter, &mut visitor)?;
             Ok(visitor.plans.into())
         } else {
             let mut visitor = ValueVisitor::new(column_name);
-            self.accept(&mut filter, &mut visitor).await?;
+            self.accept(&mut filter, &mut visitor)?;
             Ok(visitor.column_values.into())
         }
     }
@@ -267,7 +269,7 @@ impl Database for MutableBufferDb {
     async fn query_series(&self, predicate: Predicate) -> Result<SeriesSetPlans, Self::Error> {
         let mut filter = ChunkTableFilter::new(predicate);
         let mut visitor = SeriesVisitor::new();
-        self.accept(&mut filter, &mut visitor).await?;
+        self.accept(&mut filter, &mut visitor)?;
         Ok(visitor.plans.into())
     }
@@ -284,12 +286,12 @@ impl Database for MutableBufferDb {
                 // can skip tables without those tags)
                 let mut filter = filter.add_required_columns(&group_columns);
                 let mut visitor = GroupsVisitor::new(agg, group_columns);
-                self.accept(&mut filter, &mut visitor).await?;
+                self.accept(&mut filter, &mut visitor)?;
                 Ok(visitor.plans.into())
             }
             GroupByAndAggregate::Window { agg, every, offset } => {
                 let mut visitor = WindowGroupsVisitor::new(agg, every, offset);
-                self.accept(&mut filter, &mut visitor).await?;
+                self.accept(&mut filter, &mut visitor)?;
                 Ok(visitor.plans.into())
             }
         }
@@ -297,7 +299,7 @@ impl Database for MutableBufferDb {
     /// Return the partition keys for data in this DB
     async fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
-        let partitions = self.partitions.read().await;
+        let partitions = self.partitions.read().expect("mutex poisoned");
         let keys = partitions.keys().cloned().collect();
         Ok(keys)
     }
@@ -305,11 +307,9 @@ impl Database for MutableBufferDb {
     /// Return the list of chunks, in order of id, for the specified
     /// partition_key
     async fn chunks(&self, partition_key: &str) -> Vec<Arc<Chunk>> {
-        self.get_partition(partition_key)
-            .await
-            .read()
-            .await
-            .chunks()
+        let partition = self.get_partition(partition_key);
+        let partition = partition.read().expect("mutex poisoned");
+        partition.chunks()
     }
 }
@@ -396,21 +396,23 @@ trait Visitor {
 impl MutableBufferDb {
     /// returns the number of partitions in this database
-    pub async fn len(&self) -> usize {
-        self.partitions.read().await.len()
+    pub fn len(&self) -> usize {
+        let partitions = self.partitions.read().expect("mutex poisoned");
+        partitions.len()
     }

     /// returns true if the database has no partititons
-    pub async fn is_empty(&self) -> bool {
-        self.partitions.read().await.is_empty()
+    pub fn is_empty(&self) -> bool {
+        let partitions = self.partitions.read().expect("mutex poisoned");
+        partitions.is_empty()
     }

     /// Retrieve (or create) the partition for the specified partition key
-    async fn get_partition(&self, partition_key: &str) -> Arc<RwLock<Partition>> {
+    fn get_partition(&self, partition_key: &str) -> Arc<RwLock<Partition>> {
         // until we think this code is likely to be a contention hot
         // spot, simply use a write lock even when often a read lock
         // would do.
-        let mut partitions = self.partitions.write().await;
+        let mut partitions = self.partitions.write().expect("mutex poisoned");

         if let Some(partition) = partitions.get(partition_key) {
             partition.clone()
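
The comment in `get_partition` keeps the unconditional write lock for simplicity. If this ever did become a contention hot spot, the usual alternative is to try a read lock first and only take the write lock on a miss; a hypothetical sketch of that trade-off (not part of this change, with stand-in types):

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for the partition type, illustration only.
type Partition = Vec<u64>;

struct Db {
    partitions: RwLock<HashMap<String, Arc<RwLock<Partition>>>>,
}

impl Db {
    fn get_partition(&self, partition_key: &str) -> Arc<RwLock<Partition>> {
        // Fast path: most calls find an existing partition under a read lock.
        if let Some(p) = self.partitions.read().expect("mutex poisoned").get(partition_key) {
            return Arc::clone(p);
        }
        // Slow path: take the write lock; entry() handles the race where another
        // thread inserted the partition between the two lock acquisitions.
        let mut partitions = self.partitions.write().expect("mutex poisoned");
        Arc::clone(partitions.entry(partition_key.to_string()).or_default())
    }
}

fn main() {
    let db = Db { partitions: RwLock::new(HashMap::new()) };
    db.get_partition("1970-01-01T00").write().expect("mutex poisoned").push(1);
    assert_eq!(db.get_partition("1970-01-01T00").read().expect("mutex poisoned").len(), 1);
}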
@@ -429,8 +431,8 @@ impl MutableBufferDb {
     /// after this returns, new partitions can be added, and some
     /// partitions in the snapshot could be dropped from the overall
     /// database
-    async fn partition_snapshot(&self) -> Vec<Arc<RwLock<Partition>>> {
-        let partitions = self.partitions.read().await;
+    fn partition_snapshot(&self) -> Vec<Arc<RwLock<Partition>>> {
+        let partitions = self.partitions.read().expect("mutex poisoned");
         partitions.values().cloned().collect()
     }
@@ -440,13 +442,9 @@ impl MutableBufferDb {
     ///
     /// Skips visiting any table or columns of `filter.should_visit_table`
     /// returns false
-    async fn accept<V: Visitor>(
-        &self,
-        filter: &mut ChunkTableFilter,
-        visitor: &mut V,
-    ) -> Result<()> {
-        for partition in self.partition_snapshot().await.into_iter() {
-            let partition = partition.read().await;
+    fn accept<V: Visitor>(&self, filter: &mut ChunkTableFilter, visitor: &mut V) -> Result<()> {
+        for partition in self.partition_snapshot().into_iter() {
+            let partition = partition.read().expect("mutex poisoned");

             if filter.should_visit_partition(&partition)? {
                 for chunk in partition.iter() {
@@ -977,7 +975,7 @@ mod tests {
         let partition_key = "1970-01-01T00";

-        let chunk = db.get_chunk(partition_key, 0).await.unwrap();
+        let chunk = db.get_chunk(partition_key, 0).unwrap();
         let mut batches = Vec::new();
         let selection = Selection::Some(&["region", "core"]);
         chunk
@@ -1568,7 +1566,7 @@ mod tests {
         write_lines(&db, &lines).await;

         // ensure there are 2 chunks
-        assert_eq!(db.len().await, 2);
+        assert_eq!(db.len(), 2);

         // setup to run the execution plan (
         let executor = Executor::default();
@@ -1703,7 +1701,7 @@ mod tests {
         let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
         write_lines(&db, &lines).await;

-        assert_eq!(429, db.size().await);
+        assert_eq!(429, db.size());
     }

     /// Run the plan and gather the results in a order that can be compared
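
Every lock acquisition in the new code handles poisoning with `.expect("mutex poisoned")` or `.expect("lock poisoned")`. With `std::sync`, a lock becomes poisoned when a thread panics while holding the guard, something `tokio::sync` locks do not track. A small standalone demo of what that means (assumed example, not from this commit):

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let lock = Arc::new(RwLock::new(0_u32));

    // A thread that panics while holding the write guard poisons the lock.
    let poisoner = Arc::clone(&lock);
    let _ = thread::spawn(move || {
        let _guard = poisoner.write().unwrap();
        panic!("simulated bug while holding the lock");
    })
    .join();

    // Later acquisitions now return Err(PoisonError); `.expect("mutex poisoned")`
    // turns that into a panic with a clear message, which is what the new code does.
    assert!(lock.read().is_err());
}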

View File

@@ -119,7 +119,6 @@ impl Db {
         if let Some(local_store) = self.mutable_buffer.as_ref() {
             local_store
                 .rollover_partition(partition_key)
-                .await
                 .context(RollingPartition)
                 .map(DBChunk::new_mb)
         } else {
@@ -165,7 +164,6 @@ impl Db {
             .as_ref()
             .context(DatatbaseNotWriteable)?
             .drop_chunk(partition_key, chunk_id)
-            .await
             .map(DBChunk::new_mb)
             .context(MutableBufferDrop)
     }
@@ -213,7 +211,6 @@ impl Db {
             .as_ref()
             .context(DatatbaseNotWriteable)?
             .get_chunk(partition_key, chunk_id)
-            .await
             .context(UnknownMutableBufferChunk { chunk_id })?;

         let mut batches = Vec::new();
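
The callers in this second file stay `async`; they simply stop awaiting the lock-bound helpers. The one rule this refactor relies on is that a `std::sync` guard is never held across an `.await` point. A hypothetical, self-contained sketch of that shape (stand-in `Db` and `flush`; uses the `futures` crate only to drive the future in `main`):

use std::sync::RwLock;

struct Db {
    names: RwLock<Vec<String>>,
}

impl Db {
    // The public API can remain async while the lock handling is synchronous.
    async fn store(&self, name: &str) {
        {
            // Scope the guard so it is dropped before any await point.
            let mut names = self.names.write().expect("mutex poisoned");
            names.push(name.to_string());
        }
        flush().await; // async work happens only after the guard is released
    }
}

async fn flush() {}

fn main() {
    let db = Db { names: RwLock::new(Vec::new()) };
    futures::executor::block_on(db.store("cpu"));
    assert_eq!(db.names.read().expect("mutex poisoned").len(), 1);
}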