feat: remove RwLock on entire ReadBuffer (#761)
parent 74b8e8ec7d
commit 8399c56587
@@ -14,7 +14,7 @@ const ONE_MS: i64 = 1_000_000;
 fn table_names(c: &mut Criterion) {
     let rb = generate_row_group(500_000);
-    let mut db = Database::new();
+    let db = Database::new();
     db.upsert_partition("hour_1", 0, "table_a", rb.clone());

     // no predicate - return all the tables
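Note: the benchmark no longer needs a mutable binding. A minimal sketch of a Criterion harness in this shape, with hypothetical stand-ins for the crate's `Database` and its `table_names` query (the real method takes predicates and chunk selections):

use criterion::{criterion_group, criterion_main, Criterion};

// Hypothetical stand-in for read_buffer::Database; the real type's
// query methods take predicates and chunk selections.
struct Database;

impl Database {
    fn new() -> Self {
        Self
    }

    fn table_names(&self) -> Vec<String> {
        vec!["table_a".to_string()]
    }
}

fn table_names(c: &mut Criterion) {
    // No `mut` binding needed: mutation happens behind the
    // database's internal locks, so `&self` suffices.
    let db = Database::new();

    c.bench_function("table_names", |b| b.iter(|| db.table_names()));
}

criterion_group!(benches, table_names);
criterion_main!(benches);
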
@@ -103,7 +103,7 @@ impl Database {
     /// chunk. If the `Table` or `Chunk` does not exist they will be created,
     /// otherwise relevant structures will be updated.
     pub fn upsert_partition(
-        &mut self,
+        &self,
         partition_key: &str,
         chunk_id: u32,
         table_name: &str,
@@ -153,7 +153,7 @@ impl Database {
     }

     /// Remove all row groups and tables for the specified chunks and partition.
-    pub fn drop_chunk(&mut self, partition_key: &str, chunk_id: u32) -> Result<()> {
+    pub fn drop_chunk(&self, partition_key: &str, chunk_id: u32) -> Result<()> {
         let mut partition_data = self.data.write().unwrap();

         let partition = partition_data
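The `&mut self` to `&self` change works because `Database` already guards its partition map behind an internal lock, visible above as `self.data.write().unwrap()`. A minimal sketch of that interior-mutability pattern, with a hypothetical `Partition` stand-in for the real per-partition state:

use std::collections::BTreeMap;
use std::sync::RwLock;

// Hypothetical stand-in for the read buffer's per-partition state.
struct Partition;

struct Database {
    // Interior mutability: the map is locked per call, so mutating
    // methods can take `&self` and the type can be shared freely.
    data: RwLock<BTreeMap<String, Partition>>,
}

impl Database {
    fn new() -> Self {
        Self {
            data: RwLock::new(BTreeMap::new()),
        }
    }

    // `&self`, not `&mut self`: the write lock is scoped to this call.
    fn upsert_partition(&self, partition_key: &str) {
        let mut data = self.data.write().unwrap();
        data.entry(partition_key.to_string())
            .or_insert_with(|| Partition);
    }

    fn drop_chunk(&self, partition_key: &str) {
        let mut data = self.data.write().unwrap();
        data.remove(partition_key);
    }
}

fn main() {
    let db = Database::new(); // no `mut` needed
    db.upsert_partition("hour_1");
    db.drop_chunk("hour_1");
}
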
@@ -1023,7 +1023,7 @@ mod test {

     #[test]
     fn table_names() {
-        let mut db = Database::new();
+        let db = Database::new();
         let res_col = TABLE_NAMES_COLUMN_NAME;

         db.upsert_partition("hour_1", 22, "Coolverine", gen_recordbatch());
@@ -1070,7 +1070,7 @@ mod test {

     #[test]
     fn column_names() {
-        let mut db = Database::new();
+        let db = Database::new();
         let res_col = COLUMN_NAMES_COLUMN_NAME;

         let schema = SchemaBuilder::new()
@@ -1190,7 +1190,7 @@ mod test {

     #[test]
     fn read_filter_single_chunk() {
-        let mut db = Database::new();
+        let db = Database::new();

         // Add a bunch of row groups to a single table in a single chunk
         for &i in &[100, 200, 300] {
@@ -1272,7 +1272,7 @@ mod test {

     #[test]
     fn read_filter_multiple_chunks() {
-        let mut db = Database::new();
+        let db = Database::new();

         // Add a bunch of row groups to a single table across multiple chunks
         for &i in &[100, 200, 300] {
@@ -1342,7 +1342,7 @@ mod test {

     #[test]
     fn read_aggregate() {
-        let mut db = Database::new();
+        let db = Database::new();

         // Add a bunch of row groups to a single table in a single chunks
         for &i in &[100, 200, 300] {
@@ -5,7 +5,7 @@ use std::{
     collections::BTreeMap,
     sync::{
         atomic::{AtomicU64, Ordering},
-        Arc, Mutex, RwLock,
+        Arc, Mutex,
     },
 };

@@ -83,9 +83,7 @@ pub struct Db {
     #[serde(skip)]
     /// The read buffer holds chunk data in an in-memory optimized
     /// format.
-    ///
-    /// TODO: finer grained locking see ticket https://github.com/influxdata/influxdb_iox/issues/669
-    pub read_buffer: Arc<RwLock<ReadBufferDb>>,
+    pub read_buffer: Arc<ReadBufferDb>,

     #[serde(skip)]
     /// The wal buffer holds replicated writes in an append in-memory
@@ -104,7 +102,7 @@ impl Db {
         wal_buffer: Option<Buffer>,
     ) -> Self {
         let wal_buffer = wal_buffer.map(Mutex::new);
-        let read_buffer = Arc::new(RwLock::new(read_buffer));
+        let read_buffer = Arc::new(read_buffer);
         Self {
             rules,
             mutable_buffer,
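With locking moved inside `ReadBufferDb`, a bare `Arc` is all the `Db` needs to share the read buffer across tasks. A sketch of the resulting sharing pattern, using a hypothetical stand-in type:

use std::sync::Arc;
use std::thread;

// Hypothetical stand-in; assumed thread-safe because it locks internally.
struct ReadBufferDb;

impl ReadBufferDb {
    fn chunk_ids(&self, _partition_key: &str) -> Vec<u32> {
        vec![]
    }
}

fn main() {
    let read_buffer = Arc::new(ReadBufferDb);

    // Readers clone the Arc and call methods directly; there is no
    // outer RwLock for them to contend on.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let rb = Arc::clone(&read_buffer);
            thread::spawn(move || rb.chunk_ids("hour_1").len())
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
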
@@ -145,8 +143,6 @@ impl Db {
     /// List chunks that are currently in the read buffer
     pub async fn read_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
         self.read_buffer
-            .read()
-            .expect("mutex poisoned")
             .chunk_ids(partition_key)
             .into_iter()
             .map(|chunk_id| DBChunk::new_rb(self.read_buffer.clone(), partition_key, chunk_id))
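Dropping the `.read().expect("mutex poisoned")` steps also removes a failure mode: a panic while the outer lock was held would have poisoned it for every later caller. Since `Arc<T>` auto-derefs to `&T`, the call chain now reads straight through, as this before/after sketch shows (stand-in type as above):

use std::sync::{Arc, RwLock};

struct ReadBufferDb; // hypothetical stand-in

impl ReadBufferDb {
    fn chunk_ids(&self, _partition_key: &str) -> Vec<u32> {
        vec![0, 1]
    }
}

fn main() {
    // Before: every access went through a database-wide lock.
    let locked = Arc::new(RwLock::new(ReadBufferDb));
    let ids = locked.read().expect("mutex poisoned").chunk_ids("hour_1");

    // After: Arc<T> derefs to &T, so `&self` methods are called directly.
    let shared = Arc::new(ReadBufferDb);
    let same_ids = shared.chunk_ids("hour_1");

    assert_eq!(ids, same_ids);
}
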
@@ -176,8 +172,6 @@ impl Db {
         chunk_id: u32,
     ) -> Result<Arc<DBChunk>> {
         self.read_buffer
-            .write()
-            .expect("mutex poisoned")
             .drop_chunk(partition_key, chunk_id)
             .context(ReadBufferDrop)?;

@@ -222,8 +216,8 @@ impl Db {
             // As implemented now, taking this write lock will wait
             // until all reads to the read buffer to complete and
             // then will block all reads while the insert is occuring
-            let mut read_buffer = self.read_buffer.write().expect("mutex poisoned");
-            read_buffer.upsert_partition(partition_key, mb_chunk.id(), &stats.name, batch)
+            self.read_buffer
+                .upsert_partition(partition_key, mb_chunk.id(), &stats.name, batch)
         }
     }

@@ -8,7 +8,7 @@ use query::{predicate::Predicate, util::make_scan_plan, PartitionChunk};
 use read_buffer::Database as ReadBufferDb;
 use snafu::{ResultExt, Snafu};

-use std::sync::{Arc, RwLock};
+use std::sync::Arc;

 use super::{
     pred::to_read_buffer_predicate,
@@ -53,7 +53,7 @@ pub enum DBChunk {
         chunk: Arc<MBChunk>,
     },
     ReadBuffer {
-        db: Arc<RwLock<ReadBufferDb>>,
+        db: Arc<ReadBufferDb>,
         partition_key: String,
         chunk_id: u32,
     },
@@ -68,7 +68,7 @@ impl DBChunk {

     /// create a new read buffer chunk
     pub fn new_rb(
-        db: Arc<RwLock<ReadBufferDb>>,
+        db: Arc<ReadBufferDb>,
         partition_key: impl Into<String>,
         chunk_id: u32,
     ) -> Arc<Self> {
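Downstream, each `DBChunk::ReadBuffer` now carries a plain shared handle, so constructing a chunk is just an `Arc` clone plus identifiers. A sketch of the simplified shape (field names from the diff, `ReadBufferDb` as a stand-in):

use std::sync::Arc;

struct ReadBufferDb; // hypothetical stand-in

// Shape of the variant after the change: a cheap, clonable handle
// instead of a lock-wrapped database.
enum DBChunk {
    ReadBuffer {
        db: Arc<ReadBufferDb>,
        partition_key: String,
        chunk_id: u32,
    },
}

fn new_rb(db: Arc<ReadBufferDb>, partition_key: impl Into<String>, chunk_id: u32) -> Arc<DBChunk> {
    Arc::new(DBChunk::ReadBuffer {
        db,
        partition_key: partition_key.into(),
        chunk_id,
    })
}

fn main() {
    let db = Arc::new(ReadBufferDb);
    let _chunk = new_rb(Arc::clone(&db), "hour_1", 0);
}
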
@@ -135,7 +135,6 @@ impl PartitionChunk for DBChunk {
                 let rb_predicate =
                     to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;

-                let db = db.read().unwrap();
                 let batch = db
                     .table_names(partition_key, &[chunk_id], rb_predicate)
                     .context(ReadBufferChunk { chunk_id })?;
@@ -162,7 +161,6 @@ impl PartitionChunk for DBChunk {
                 chunk_id,
             } => {
                 let chunk_id = *chunk_id;
-                let db = db.read().unwrap();

                 // TODO: Andrew -- I think technically this reordering
                 // should be happening inside the read buffer, but
@@ -204,7 +202,6 @@ impl PartitionChunk for DBChunk {
                 chunk_id,
             } => {
                 let chunk_id = *chunk_id;
-                let db = db.read().unwrap();
                 db.has_table(partition_key, table_name, &[chunk_id])
             }
             DBChunk::ParquetFile => {
@@ -243,7 +240,6 @@ impl PartitionChunk for DBChunk {
                 let chunk_id = *chunk_id;
                 let rb_predicate =
                     to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;
-                let db = db.read().expect("mutex poisoned");
                 let read_results = db
                     .read_filter(
                         partition_key,