Merge pull request #1406 from influxdata/cn/hook-up-write-buffer

feat: Actually route SequencedEntry to the Write Buffer, if present
kodiakhq[bot] 2021-05-05 15:15:58 +00:00 committed by GitHub
commit 8dba20d9db
6 changed files with 203 additions and 133 deletions
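For orientation, here is a minimal sketch of the behavior this commit wires up, using simplified stand-in types rather than the exact IOx signatures: a `SequencedEntry`, held as an `Arc<dyn SequencedEntry>` trait object, is routed to the write buffer when one is configured, before continuing on to the mutable buffer.

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-ins for the real IOx types; names are illustrative.
trait SequencedEntry: Send + Sync + std::fmt::Debug {}

#[derive(Debug)]
struct FakeEntry;
impl SequencedEntry for FakeEntry {}

struct Db {
    // `None` when no write buffer is configured for this database.
    write_buffer: Option<Mutex<Vec<Arc<dyn SequencedEntry>>>>,
}

impl Db {
    fn store_sequenced_entry(&self, entry: Arc<dyn SequencedEntry>) {
        // Route to the write buffer first, if present...
        if let Some(wb) = &self.write_buffer {
            wb.lock().unwrap().push(Arc::clone(&entry));
        }
        // ...then the entry continues on to the mutable buffer (elided here).
    }
}

fn main() {
    let db = Db {
        write_buffer: Some(Mutex::new(Vec::new())),
    };
    db.store_sequenced_entry(Arc::new(FakeEntry));
    assert_eq!(db.write_buffer.as_ref().unwrap().lock().unwrap().len(), 1);
}
```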

View File

@ -1202,7 +1202,7 @@ enum InnerClockValueError {
ValueMayNotBeZero,
}
pub trait SequencedEntry {
pub trait SequencedEntry: Send + Sync + std::fmt::Debug {
fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>>;
fn fb(&self) -> &entry_fb::SequencedEntry<'_>;
@ -1392,7 +1392,7 @@ impl Segment {
segment_id: u64,
server_id: ServerId,
clock_value: Option<ClockValue>,
entries: &[Arc<OwnedSequencedEntry>],
entries: &[Arc<dyn SequencedEntry>],
) -> Self {
let mut fbb = FlatBufferBuilder::new_with_capacity(1024);
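The added `Send + Sync + std::fmt::Debug` supertraits are what make `Arc<dyn SequencedEntry>` usable in the signatures below: without `Send + Sync` the trait object could not be shared across threads, and without `Debug` it could not appear in types that derive `Debug` or be printed in test output. A toy illustration with a hypothetical `Entry` trait (not the IOx one):

```rust
use std::{fmt::Debug, sync::Arc, thread};

// Without `Send + Sync`, the `Arc<dyn Entry>` below could not move to
// another thread; without `Debug`, it could not be `{:?}`-formatted.
trait Entry: Send + Sync + Debug {
    fn size(&self) -> usize;
}

#[derive(Debug)]
struct Line(String);

impl Entry for Line {
    fn size(&self) -> usize {
        self.0.len()
    }
}

fn main() {
    let e: Arc<dyn Entry> = Arc::new(Line("cpu bar=1 10".into()));
    println!("{:?}", e); // possible because of the `Debug` supertrait
    let handle = thread::spawn(move || e.size()); // needs `Send + Sync`
    assert_eq!(handle.join().unwrap(), 12);
}
```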

View File

@ -1,9 +1,7 @@
//! This module contains code for managing the Write Buffer
use data_types::{database_rules::WriteBufferRollover, server_id::ServerId, DatabaseName};
use internal_types::entry::{
ClockValue, OwnedSequencedEntry, Segment as EntrySegment, SequencedEntry,
};
use internal_types::entry::{ClockValue, Segment as EntrySegment, SequencedEntry};
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use std::{convert::TryInto, mem, sync::Arc};
@ -128,7 +126,7 @@ impl Buffer {
/// has been closed out. If the max size of the buffer would be exceeded
/// by accepting the write, the oldest (first) of the closed segments
/// will be dropped, if it is persisted. Otherwise, an error is returned.
pub fn append(&mut self, write: Arc<OwnedSequencedEntry>) -> Result<Option<Arc<Segment>>> {
pub fn append(&mut self, write: Arc<dyn SequencedEntry>) -> Result<Option<Arc<Segment>>> {
let write_size = write.size();
while self.current_size + write_size > self.max_size {
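The doc comment above describes the eviction loop: while accepting the write would push the buffer past `max_size`, the oldest closed segment is dropped only if it has been persisted. A sketch of that decision with simplified types (the real `Buffer` tracks full `Segment`s and consults its configured `WriteBufferRollover` rule; the error arm here mirrors `ReturnError`):

```rust
struct ClosedSegment {
    size: usize,
    persisted: bool,
}

fn make_room(
    closed: &mut Vec<ClosedSegment>,
    current_size: &mut usize,
    write_size: usize,
    max_size: usize,
) -> Result<(), String> {
    while *current_size + write_size > max_size {
        // The oldest (first) closed segment may be dropped only if persisted.
        let can_drop = matches!(closed.first(), Some(s) if s.persisted);
        if !can_drop {
            return Err("buffer full and oldest segment not persisted".into());
        }
        let dropped = closed.remove(0);
        *current_size -= dropped.size;
    }
    Ok(())
}

fn main() {
    let mut closed = vec![ClosedSegment { size: 40, persisted: true }];
    let mut current_size = 60;
    make_room(&mut closed, &mut current_size, 50, 100).unwrap();
    assert_eq!(current_size, 20); // the 40-byte segment was evicted
}
```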
@ -193,7 +191,7 @@ impl Buffer {
}
/// Returns any writes after the passed in `ClockValue`
pub fn writes_since(&self, since: ClockValue) -> Vec<Arc<OwnedSequencedEntry>> {
pub fn writes_since(&self, since: ClockValue) -> Vec<Arc<dyn SequencedEntry>> {
let mut writes = Vec::new();
// start with the newest writes and go back. Hopefully they're asking for
@ -203,7 +201,7 @@ impl Buffer {
writes.reverse();
return writes;
}
writes.push(Arc::clone(&w));
writes.push(Arc::clone(w));
}
for s in self.closed_segments.iter().rev() {
@ -212,7 +210,7 @@ impl Buffer {
writes.reverse();
return writes;
}
writes.push(Arc::clone(&w));
writes.push(Arc::clone(w));
}
}
@ -234,7 +232,7 @@ impl Buffer {
pub struct Segment {
pub(crate) id: u64,
size: usize,
pub writes: Vec<Arc<OwnedSequencedEntry>>,
pub writes: Vec<Arc<dyn SequencedEntry>>,
// Time this segment was initialized
created_at: DateTime<Utc>,
// Persistence metadata if segment is persisted
@ -257,7 +255,7 @@ impl Segment {
}
// appends the write to the segment
fn append(&mut self, write: Arc<OwnedSequencedEntry>) {
fn append(&mut self, write: Arc<dyn SequencedEntry>) {
self.size += write.size();
self.writes.push(write);
}
@ -416,7 +414,7 @@ mod tests {
use super::*;
use internal_types::entry::{test_helpers::lp_to_sequenced_entry as lp_2_se, SequencedEntry};
use object_store::memory::InMemory;
use std::convert::TryFrom;
use std::{convert::TryFrom, ops::Deref};
#[test]
fn append_increments_current_size_and_uses_existing_segment() {
@ -617,7 +615,7 @@ mod tests {
}
fn equal_to_server_id_and_clock_value(
sequenced_entry: &OwnedSequencedEntry,
sequenced_entry: &dyn SequencedEntry,
expected_server_id: ServerId,
expected_clock_value: u64,
) {
@ -666,9 +664,9 @@ mod tests {
let writes = buf.writes_since(ClockValue::try_from(1).unwrap());
assert_eq!(3, writes.len());
equal_to_server_id_and_clock_value(&writes[0], server_id1, 2);
equal_to_server_id_and_clock_value(&writes[1], server_id1, 3);
equal_to_server_id_and_clock_value(&writes[2], server_id2, 2);
equal_to_server_id_and_clock_value(writes[0].deref(), server_id1, 2);
equal_to_server_id_and_clock_value(writes[1].deref(), server_id1, 3);
equal_to_server_id_and_clock_value(writes[2].deref(), server_id2, 2);
}
#[test]
@ -745,7 +743,7 @@ mod tests {
server_id: ServerId,
clock_value: u64,
line_protocol: &str,
) -> Arc<OwnedSequencedEntry> {
) -> Arc<dyn SequencedEntry> {
Arc::new(lp_2_se(line_protocol, server_id.get_u32(), clock_value))
}
}

View File

@ -1,7 +1,10 @@
//! This module contains the main IOx Database object which has the
//! instances of the mutable buffer, read buffer, and object store
use super::{buffer::Buffer, JobRegistry};
use super::{
buffer::{self, Buffer},
JobRegistry,
};
use arrow_deps::{
arrow::datatypes::SchemaRef as ArrowSchemaRef,
datafusion::{
@ -202,6 +205,9 @@ pub enum Error {
#[snafu(display("Invalid Clock Value: {}", source))]
InvalidClockValue { source: ClockValueError },
#[snafu(display("Error sending Sequenced Entry to Write Buffer: {}", source))]
WriteBufferError { source: buffer::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -1070,28 +1076,42 @@ impl Db {
}
/// Stores an entry based on the configuration. The Entry will first be
/// converted into a Sequenced Entry with the logical clock assigned
/// from the database. If the write buffer is configured, the sequenced
/// entry is written into the buffer and replicated based on the
/// configured rules. If the mutable buffer is configured, the sequenced
/// entry is then written into the mutable buffer.
/// converted into a `SequencedEntry` with the logical clock assigned
/// from the database, and then the `SequencedEntry` will be passed to
/// `store_sequenced_entry`.
pub fn store_entry(&self, entry: Entry) -> Result<()> {
// TODO: build this based on either this or on the write buffer, if configured
let sequenced_entry = OwnedSequencedEntry::new_from_entry_bytes(
ClockValue::try_from(self.next_sequence()).context(InvalidClockValue)?,
self.server_id,
entry.data(),
)
.context(SequencedEntryError)?;
if self.rules.read().write_buffer_config.is_some() {
todo!("route to the Write Buffer. TODO: carols10cents #1157")
}
let sequenced_entry = Arc::new(
OwnedSequencedEntry::new_from_entry_bytes(
ClockValue::try_from(self.next_sequence()).context(InvalidClockValue)?,
self.server_id,
entry.data(),
)
.context(SequencedEntryError)?,
);
self.store_sequenced_entry(sequenced_entry)
}
pub fn store_sequenced_entry(&self, sequenced_entry: impl SequencedEntry) -> Result<()> {
/// Given a `SequencedEntry`:
///
/// - If the write buffer is configured, write the `SequencedEntry` into the buffer, which
/// will replicate the `SequencedEntry` based on the configured rules.
/// - If the mutable buffer is configured, the `SequencedEntry` is then written into the
/// mutable buffer.
///
/// Note that if the write buffer is configured but there is an error storing the
/// `SequencedEntry` in the write buffer, the `SequencedEntry` will *not* reach the mutable
/// buffer.
pub fn store_sequenced_entry(&self, sequenced_entry: Arc<dyn SequencedEntry>) -> Result<()> {
// Send to the write buffer, if configured
if let Some(wb) = &self.write_buffer {
wb.lock()
.append(Arc::clone(&sequenced_entry))
.context(WriteBufferError)?;
}
// Send to the mutable buffer
let rules = self.rules.read();
let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold;
if rules.lifecycle_rules.immutable {
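The ordering guarantee called out in the new doc comment is visible in the code: the `?` on the write buffer append returns early, so a failed write-buffer append never reaches the mutable buffer. A self-contained sketch of that short-circuit, again with simplified stand-in types:

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug)]
struct Entry;

#[derive(Default)]
struct MutableBuffer {
    entries: Vec<Arc<Entry>>,
}

// A stand-in write buffer that always rejects appends, to show the failure path.
struct FullWriteBuffer;

impl FullWriteBuffer {
    fn append(&self, _entry: Arc<Entry>) -> Result<(), &'static str> {
        Err("write buffer full")
    }
}

struct Db {
    write_buffer: Option<FullWriteBuffer>,
    mutable_buffer: Mutex<MutableBuffer>,
}

impl Db {
    fn store_sequenced_entry(&self, entry: Arc<Entry>) -> Result<(), &'static str> {
        // The `?` returns early on a write buffer error...
        if let Some(wb) = &self.write_buffer {
            wb.append(Arc::clone(&entry))?;
        }
        // ...so a failed write-buffer append never reaches the mutable buffer.
        self.mutable_buffer.lock().unwrap().entries.push(entry);
        Ok(())
    }
}

fn main() {
    let db = Db {
        write_buffer: Some(FullWriteBuffer),
        mutable_buffer: Mutex::new(MutableBuffer::default()),
    };
    assert!(db.store_sequenced_entry(Arc::new(Entry)).is_err());
    assert!(db.mutable_buffer.lock().unwrap().entries.is_empty());
}
```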
@ -1283,7 +1303,11 @@ pub mod test_helpers {
#[cfg(test)]
mod tests {
use crate::query_tests::utils::{make_database, make_db};
use super::{
test_helpers::{try_write_lp, write_lp},
*,
};
use crate::query_tests::utils::{make_db, TestDb};
use ::test_helpers::assert_contains;
use arrow_deps::{
arrow::record_batch::RecordBatch, assert_batches_eq, assert_batches_sorted_eq,
@ -1295,20 +1319,11 @@ mod tests {
database_rules::{Order, Sort, SortOrder},
partition_metadata::{ColumnSummary, StatValues, Statistics, TableSummary},
};
use object_store::{
disk::File, path::ObjectStorePath, path::Path, ObjectStore, ObjectStoreApi,
};
use query::{frontend::sql::SqlQueryPlanner, PartitionChunk};
use super::*;
use futures::stream;
use futures::{StreamExt, TryStreamExt};
use std::{convert::TryFrom, iter::Iterator};
use super::test_helpers::{try_write_lp, write_lp};
use futures::{stream, StreamExt, TryStreamExt};
use internal_types::entry::test_helpers::lp_to_entry;
use std::num::NonZeroUsize;
use std::str;
use object_store::{disk::File, path::Path, ObjectStore, ObjectStoreApi};
use query::{frontend::sql::SqlQueryPlanner, PartitionChunk};
use std::{convert::TryFrom, iter::Iterator, num::NonZeroUsize, str};
use tempfile::TempDir;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@ -1761,7 +1776,12 @@ mod tests {
// Create a DB given a server id, an object store and a db name
let server_id = ServerId::try_from(10).unwrap();
let db_name = "parquet_test_db";
let test_db = make_database(server_id, Arc::clone(&object_store), db_name);
let test_db = TestDb::builder()
.server_id(server_id)
.object_store(Arc::clone(&object_store))
.db_name(db_name)
.build();
let db = Arc::new(test_db.db);
// Write some line protocols in Mutable buffer of the DB
@ -1822,17 +1842,21 @@ mod tests {
assert_eq!(path_list, paths.clone());
// Get full string path
let root_path = format!("{:?}", root.path());
let root_path = root_path.trim_matches('"');
let path = format!("{}/{}", root_path, paths[0].display());
println!("path: {}", path);
let path0 = match &paths[0] {
Path::File(file_path) => file_path.to_raw(),
other => panic!("expected `Path::File`, got: {:?}", other),
};
let mut path = root.path().to_path_buf();
path.push(&path0);
println!("path: {}", path.display());
// Create External table of this parquet file to get its content in a human
// readable form
// Note: We do not care about escaping quotes here because it is just a test
let sql = format!(
"CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'",
path
path.display()
);
let mut ctx = context::ExecutionContext::new();
@ -1867,7 +1891,12 @@ mod tests {
// Create a DB given a server id, an object store and a db name
let server_id = ServerId::try_from(10).unwrap();
let db_name = "unload_read_buffer_test_db";
let test_db = make_database(server_id, Arc::clone(&object_store), db_name);
let test_db = TestDb::builder()
.server_id(server_id)
.object_store(Arc::clone(&object_store))
.db_name(db_name)
.build();
let db = Arc::new(test_db.db);
// Write some line protocols in Mutable buffer of the DB
@ -1956,17 +1985,21 @@ mod tests {
assert_eq!(path_list, paths.clone());
// Get full string path
let root_path = format!("{:?}", root.path());
let root_path = root_path.trim_matches('"');
let path = format!("{}/{}", root_path, paths[0].display());
println!("path: {}", path);
let path0 = match &paths[0] {
Path::File(file_path) => file_path.to_raw(),
other => panic!("expected `Path::File`, got: {:?}", other),
};
let mut path = root.path().to_path_buf();
path.push(&path0);
println!("path: {}", path.display());
// Create External table of this parquet file to get its content in a human
// readable form
// Note: We do not care about escaping quotes here because it is just a test
let sql = format!(
"CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'",
path
path.display()
);
let mut ctx = context::ExecutionContext::new();
@ -2609,4 +2642,13 @@ mod tests {
Err(super::Error::HardLimitReached {})
));
}
#[tokio::test]
async fn write_goes_to_write_buffer_if_configured() {
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
assert_eq!(db.write_buffer.as_ref().unwrap().lock().size(), 0);
write_lp(db.as_ref(), "cpu bar=1 10");
assert_ne!(db.write_buffer.as_ref().unwrap().lock().size(), 0);
}
}

View File

@ -666,7 +666,7 @@ impl<M: ConnectionManager> Server<M> {
db: &Db,
sequenced_entry: OwnedSequencedEntry,
) -> Result<()> {
db.store_sequenced_entry(sequenced_entry)
db.store_sequenced_entry(Arc::new(sequenced_entry))
.map_err(|e| Error::UnknownDatabaseError {
source: Box::new(e),
})?;

View File

@ -1,14 +1,14 @@
use data_types::{
chunk::{ChunkStorage, ChunkSummary},
database_rules::DatabaseRules,
database_rules::{DatabaseRules, WriteBufferRollover},
server_id::ServerId,
DatabaseName,
};
use object_store::{disk::File, ObjectStore};
use query::{exec::Executor, Database};
use crate::{db::Db, JobRegistry};
use std::{convert::TryFrom, sync::Arc};
use crate::{buffer::Buffer, db::Db, JobRegistry};
use std::{borrow::Cow, convert::TryFrom, sync::Arc};
use tempfile::TempDir;
// A wrapper around a Db and a metrics registry allowing for isolated testing
@ -19,49 +19,99 @@ pub struct TestDb {
pub metric_registry: metrics::TestMetricRegistry,
}
/// Used for testing: create a Database with a local store
pub fn make_db() -> TestDb {
let server_id = ServerId::try_from(1).unwrap();
// TODO: When we support parquet file in memory, we will either turn this test back to memory
// or have both tests: local disk and memory
//let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
//
// Create an object store with a specified location in a local disk
let root = TempDir::new().unwrap();
let object_store = Arc::new(ObjectStore::new_file(File::new(root.path())));
let exec = Arc::new(Executor::new(1));
let metrics_registry = Arc::new(metrics::MetricRegistry::new());
TestDb {
metric_registry: metrics::TestMetricRegistry::new(Arc::clone(&metrics_registry)),
db: Db::new(
DatabaseRules::new(DatabaseName::new("placeholder").unwrap()),
server_id,
object_store,
exec,
None, // write buffer
Arc::new(JobRegistry::new()),
metrics_registry,
),
impl TestDb {
pub fn builder() -> TestDbBuilder {
TestDbBuilder::new()
}
}
/// Used for testing: create a Database with a local store and a specified name
pub fn make_database(server_id: ServerId, object_store: Arc<ObjectStore>, db_name: &str) -> TestDb {
let exec = Arc::new(Executor::new(1));
let metrics_registry = Arc::new(metrics::MetricRegistry::new());
TestDb {
metric_registry: metrics::TestMetricRegistry::new(Arc::clone(&metrics_registry)),
db: Db::new(
DatabaseRules::new(DatabaseName::new(db_name.to_string()).unwrap()),
server_id,
object_store,
exec,
None, // write buffer
Arc::new(JobRegistry::new()),
metrics_registry,
),
#[derive(Debug, Default)]
pub struct TestDbBuilder {
server_id: Option<ServerId>,
object_store: Option<Arc<ObjectStore>>,
db_name: Option<DatabaseName<'static>>,
write_buffer: bool,
}
impl TestDbBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn build(self) -> TestDb {
let server_id = self
.server_id
.unwrap_or_else(|| ServerId::try_from(1).unwrap());
let db_name = self
.db_name
.unwrap_or_else(|| DatabaseName::new("placeholder").unwrap());
// TODO: When we support parquet file in memory, we will either turn this test back to
// memory or have both tests: local disk and memory
//let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
//
// Unless otherwise specified, create an object store with a specified location in a local
// disk
let object_store = self.object_store.unwrap_or_else(|| {
let root = TempDir::new().unwrap();
Arc::new(ObjectStore::new_file(File::new(root.path())))
});
let exec = Arc::new(Executor::new(1));
let metrics_registry = Arc::new(metrics::MetricRegistry::new());
let write_buffer = if self.write_buffer {
let max = 1 << 32;
let segment = 1 << 16;
Some(Buffer::new(
max,
segment,
WriteBufferRollover::ReturnError,
false,
server_id,
))
} else {
None
};
TestDb {
metric_registry: metrics::TestMetricRegistry::new(Arc::clone(&metrics_registry)),
db: Db::new(
DatabaseRules::new(db_name),
server_id,
object_store,
exec,
write_buffer,
Arc::new(JobRegistry::new()),
metrics_registry,
),
}
}
pub fn server_id(mut self, server_id: ServerId) -> Self {
self.server_id = Some(server_id);
self
}
pub fn object_store(mut self, object_store: Arc<ObjectStore>) -> Self {
self.object_store = Some(object_store);
self
}
pub fn db_name<T: Into<Cow<'static, str>>>(mut self, db_name: T) -> Self {
self.db_name = Some(DatabaseName::new(db_name).unwrap());
self
}
pub fn write_buffer(mut self, enabled: bool) -> Self {
self.write_buffer = enabled;
self
}
}
/// Used for testing: create a Database with a local store
pub fn make_db() -> TestDb {
TestDb::builder().build()
}
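With the builder in place, call sites opt into exactly the pieces they need and fall back to defaults for the rest. A usage sketch based on the builder methods above (the name `my_test_db` is illustrative):

```rust
// Server id defaults to 1 and the object store to a local temp dir.
let test_db = TestDb::builder()
    .db_name("my_test_db")
    .write_buffer(true)
    .build();
let db = test_db.db;
```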
fn chunk_summary_iter(db: &Db) -> impl Iterator<Item = ChunkSummary> + '_ {

View File

@ -267,20 +267,15 @@ where
#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use crate::{
db::{Db, DbChunk},
JobRegistry,
};
use super::*;
use crate::db::test_helpers::write_lp;
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use crate::{
db::{test_helpers::write_lp, DbChunk},
query_tests::utils::TestDb,
};
use futures::TryStreamExt;
use mutable_buffer::chunk::Chunk as ChunkWB;
use object_store::memory::InMemory;
use query::{exec::Executor, predicate::Predicate, Database};
use query::{predicate::Predicate, Database};
use tracker::MemRegistry;
#[tokio::test]
@ -291,7 +286,10 @@ cpu,host=A,region=west user=3.2,system=50.1 10
cpu,host=B,region=east user=10.0,system=74.1 1
"#;
let db = make_db();
let db = TestDb::builder()
.object_store(Arc::new(ObjectStore::new_in_memory(InMemory::new())))
.build()
.db;
write_lp(&db, &lp);
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
@ -388,22 +386,4 @@ cpu,host=B,region=east user=10.0,system=74.1 1
snapshot.mark_table_finished(2);
assert!(snapshot.finished());
}
/// Create a Database with a local store
pub fn make_db() -> Db {
let metrics_registry = Arc::new(metrics::MetricRegistry::new());
let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let server_id = ServerId::try_from(1).unwrap();
let exec = Arc::new(Executor::new(1));
Db::new(
DatabaseRules::new(DatabaseName::new("placeholder").unwrap()),
server_id,
object_store,
exec,
None, // write buffer
Arc::new(JobRegistry::new()),
Arc::clone(&metrics_registry),
)
}
}