Merge branch 'main' into crepererum/write_buffer_span_ctx

pull/24376/head
kodiakhq[bot] 2021-10-14 11:50:07 +00:00 committed by GitHub
commit 61ec559eee
14 changed files with 201 additions and 164 deletions

Cargo.lock (generated)

@@ -1010,6 +1010,7 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
name = "entry"
version = "0.1.0"
dependencies = [
"bytes",
"chrono",
"data_types",
"flatbuffers",
@@ -1801,7 +1802,6 @@ version = "0.1.0"
dependencies = [
"futures",
"parking_lot",
"snafu",
"time 0.1.0",
"tokio",
]


@@ -6,6 +6,7 @@ edition = "2018"
description = "The entry format used by the write buffer"
[dependencies]
bytes = "1.0"
chrono = "0.4"
data_types = { path = "../data_types" }
# See docs/regenerating_flatbuffers.md about updating generated code when updating the


@@ -1,6 +1,7 @@
//! This module contains helper code for building `Entry` from line protocol and the
//! `DatabaseRules` configuration.
use bytes::Bytes;
use std::{collections::BTreeMap, convert::TryFrom, fmt::Formatter};
use chrono::{DateTime, TimeZone, Utc};
@@ -738,7 +739,7 @@ pub struct ShardedEntry {
/// iterating through the partitioned writes.
#[self_referencing]
pub struct Entry {
data: Vec<u8>,
data: Bytes,
#[borrows(data)]
#[covariant]
fb: entry_fb::Entry<'this>,
@@ -799,6 +800,14 @@ impl TryFrom<Vec<u8>> for Entry {
type Error = flatbuffers::InvalidFlatbuffer;
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
Self::try_from(Bytes::from(data))
}
}
impl TryFrom<Bytes> for Entry {
type Error = flatbuffers::InvalidFlatbuffer;
fn try_from(data: Bytes) -> Result<Self, Self::Error> {
EntryTryBuilder {
data,
fb_builder: |data| flatbuffers::root::<entry_fb::Entry<'_>>(data),
@@ -807,7 +816,7 @@ impl TryFrom<Vec<u8>> for Entry {
}
}
impl From<Entry> for Vec<u8> {
impl From<Entry> for Bytes {
fn from(entry: Entry) -> Self {
entry.into_heads().data
}
@@ -815,8 +824,8 @@ impl From<Entry> for Vec<u8> {
impl Clone for Entry {
fn clone(&self) -> Self {
Self::try_from(self.data().to_vec())
.expect("flatbuffer was valid, should not be broken now")
let bytes: &Bytes = self.borrow_data();
Self::try_from(bytes.clone()).expect("flatbuffer was valid, should not be broken now")
}
}

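Why the switch to Bytes pays off: bytes::Bytes is a reference-counted, immutable byte buffer, so cloning it is O(1) and shares the allocation. The Clone impl above still re-validates the flatbuffer via try_from, but it no longer copies the underlying bytes the way the old self.data().to_vec() did. A minimal standalone sketch of that property (not code from this repo):

    use bytes::Bytes;

    fn main() {
        let data = Bytes::from(vec![0u8; 1024]);
        let copy = data.clone(); // O(1): bumps a refcount, no byte copy
        // Both handles point at the same underlying allocation:
        assert_eq!(data.as_ptr(), copy.as_ptr());
    }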

@@ -78,6 +78,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
".influxdata.iox.management.v1.PersistChunks.chunks",
".influxdata.iox.management.v1.WriteChunk.chunk_id",
".influxdata.iox.management.v1.UnloadPartitionChunkRequest.chunk_id",
".influxdata.iox.write.v1.WriteEntryRequest.entry",
])
.btree_map(&[
".influxdata.iox.catalog.v1.DatabaseCheckpoint.sequencer_numbers",

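Context for this one-line addition: the list above feeds prost-build's bytes(..) option, which switches the generated Rust type of the named protobuf bytes fields from the default Vec<u8> to bytes::Bytes. Marking WriteEntryRequest.entry this way is what lets the write client below move its buffer into the request without a copy. A sketch of the relevant build-script call (the file paths are placeholders, not this repo's layout):

    fn main() -> std::io::Result<()> {
        prost_build::Config::new()
            .bytes(&[".influxdata.iox.write.v1.WriteEntryRequest.entry"])
            .compile_protos(&["write.proto"], &["proto/"])?;
        Ok(())
    }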

@@ -1,3 +1,4 @@
use bytes::Bytes;
use thiserror::Error;
use generated_types::influxdata::iox::write::v1 as write;
@@ -88,7 +89,7 @@ impl Client {
pub async fn write_entry(
&mut self,
db_name: impl Into<String> + Send,
entry: impl Into<Vec<u8>> + Send,
entry: impl Into<Bytes> + Send,
) -> Result<(), WriteError> {
let db_name = db_name.into();
let entry = entry.into();

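Accepting impl Into<Bytes> instead of impl Into<Vec<u8>> keeps existing call sites compiling (Vec<u8> converts into Bytes by taking ownership of its allocation, copy-free) and additionally accepts an Entry directly through the new From<Entry> for Bytes impl. A self-contained sketch of the signature shape (send_bytes is a hypothetical stand-in for write_entry):

    use bytes::Bytes;

    fn send_bytes(entry: impl Into<Bytes> + Send) {
        let entry: Bytes = entry.into();
        println!("sending {} bytes", entry.len());
    }

    fn main() {
        send_bytes(vec![1u8, 2, 3]); // Vec<u8> -> Bytes: ownership transfer, no copy
        send_bytes(Bytes::from_static(b"hi")); // already Bytes: the blanket Into is a no-op
    }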

@@ -8,7 +8,6 @@ readme = "README.md"
[dependencies]
parking_lot = "0.11"
snafu = "0.6"
time = { path = "../time" }
tokio = { version = "1.11", features = ["sync"] }


@@ -48,17 +48,6 @@ impl ApplicationState {
}
}
/// Overrides the write_buffer_factory
pub fn with_write_buffer_factory(
self,
write_buffer_factory: Arc<WriteBufferConfigFactory>,
) -> Self {
Self {
write_buffer_factory,
..self
}
}
pub fn object_store(&self) -> &Arc<ObjectStore> {
&self.object_store
}


@@ -1226,7 +1226,7 @@ mod tests {
};
use time::Time;
use uuid::Uuid;
use write_buffer::{config::WriteBufferConfigFactory, mock::MockBufferSharedState};
use write_buffer::mock::MockBufferSharedState;
#[tokio::test]
async fn database_shutdown_waits_for_jobs() {
@@ -1429,12 +1429,13 @@
));
// setup application
let application = ApplicationState::new(Arc::new(ObjectStore::new_in_memory()), None);
let mut factory = WriteBufferConfigFactory::new(Arc::clone(application.time_provider()));
factory.register_mock("my_mock".to_string(), state.clone());
let application = Arc::new(application.with_write_buffer_factory(Arc::new(factory)));
let application = Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
));
application
.write_buffer_factory()
.register_mock("my_mock".to_string(), state.clone());
let server_id = ServerId::try_from(1).unwrap();

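The shape of this test change mirrors the ApplicationState cleanup above: instead of building a WriteBufferConfigFactory up-front and injecting it through the now-deleted with_write_buffer_factory override, the test reaches for the factory that ApplicationState always owns and registers the mock through a shared reference. The register_mock signature change that makes this possible (&self instead of &mut self, backed by an RwLock) appears in the last file of this diff, with a toy model of the pattern after the final hunk.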

@@ -1225,10 +1225,48 @@ where
}
}
pub mod test_utils {
use super::*;
use crate::connection::test_helpers::TestConnectionManager;
use object_store::ObjectStore;
/// Create a new [`ApplicationState`] with an in-memory object store
pub fn make_application() -> Arc<ApplicationState> {
Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
))
}
/// Creates a new server with the provided [`ApplicationState`]
pub fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
Arc::new(Server::new(
TestConnectionManager::new(),
application,
Default::default(),
))
}
/// Creates a new server with the provided [`ApplicationState`]
///
/// Sets the `server_id` provided and waits for it to initialize
pub async fn make_initialized_server(
server_id: ServerId,
application: Arc<ApplicationState>,
) -> Arc<Server<TestConnectionManager>> {
let server = make_server(application);
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
server
}
}
#[cfg(test)]
mod tests {
use super::*;
use ::write_buffer::config::WriteBufferConfigFactory;
use super::{
test_utils::{make_application, make_server},
*,
};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bytes::Bytes;
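
How the new helpers are meant to be used, judging from the updated tests in this diff (a hypothetical test, assuming this crate's API as shown above):

    use std::num::NonZeroU32;
    use data_types::server_id::ServerId;
    use server::test_utils::{make_application, make_initialized_server};

    #[tokio::test]
    async fn uses_shared_helpers() {
        let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
        // make_application gives an in-memory ApplicationState; mocks can be
        // registered on it before the server initializes.
        let application = make_application();
        let server = make_initialized_server(server_id, application).await;
        server.shutdown();
        server.join().await.unwrap();
    }
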
@@ -1267,21 +1305,6 @@ mod tests {
const ARBITRARY_DEFAULT_TIME: i64 = 456;
fn make_application() -> Arc<ApplicationState> {
Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
))
}
fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
Arc::new(Server::new(
TestConnectionManager::new(),
application,
Default::default(),
))
}
#[tokio::test]
async fn server_api_calls_return_error_with_no_id_set() {
let server = make_server(make_application());
@@ -2412,12 +2435,11 @@ mod tests {
#[tokio::test]
async fn write_buffer_errors_propagate() {
let application = ApplicationState::new(Arc::new(ObjectStore::new_in_memory()), None);
let application = make_application();
let mut factory = WriteBufferConfigFactory::new(Arc::clone(application.time_provider()));
factory.register_always_fail_mock("my_mock".to_string());
let application = Arc::new(application.with_write_buffer_factory(Arc::new(factory)));
application
.write_buffer_factory()
.register_always_fail_mock("my_mock".to_string());
let server = make_server(application);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();


@@ -192,18 +192,14 @@ async fn stream_in_sequenced_entries<'a>(
mod tests {
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::num::{NonZeroU32, NonZeroUsize};
use std::num::NonZeroU32;
use ::test_helpers::assert_contains;
use arrow_util::assert_batches_eq;
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use data_types::sequence::Sequence;
use entry::test_helpers::lp_to_entry;
use persistence_windows::min_max_sequence::MinMaxSequence;
use query::exec::ExecutionContextProvider;
use query::frontend::sql::SqlQueryPlanner;
use query::QueryDatabase;
use test_helpers::tracing::TracingCapture;
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use crate::db::test_helpers::run_query;
@@ -435,87 +431,6 @@ mod tests {
assert_batches_eq!(expected, &batches);
}
#[tokio::test]
async fn write_buffer_reads_wait_for_compaction() {
let tracing_capture = TracingCapture::new();
// setup write buffer
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big
let n_entries = 50u64;
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
for sequence_number in 0..n_entries {
let lp = format!(
"table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
sequence_number / 2
);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, sequence_number),
Time::from_timestamp_nanos(0),
lp_to_entry(&lp),
));
}
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, n_entries),
Time::from_timestamp_nanos(0),
lp_to_entry("table_2,partition_by=a foo=1 0"),
));
// create DB
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
};
let test_db = TestDb::builder()
.lifecycle_rules(data_types::database_rules::LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
mub_row_threshold: NonZeroUsize::new(10).unwrap(),
..Default::default()
})
.partition_template(partition_template)
.build()
.await;
let db = test_db.db;
// start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
let consumer = WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
Arc::clone(&db),
test_db.metric_registry.as_ref(),
);
// after a while the table should exist
let t_0 = Instant::now();
loop {
if db.table_schema("table_2").is_some() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
consumer.shutdown();
consumer.join().await.unwrap();
// no rows should be dropped
let batches = run_query(db, "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches);
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch up");
}
#[tokio::test]
async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() {
let write_buffer_state =


@@ -14,42 +14,26 @@ use data_types::{
DatabaseName,
};
use iox_object_store::IoxObjectStore;
use object_store::ObjectStore;
use predicate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate};
use query::{QueryChunk, QueryChunkMeta, QueryDatabase};
use server::{
connection::test_helpers::TestConnectionManager,
db::test_helpers::{run_query, write_lp},
rules::ProvidedDatabaseRules,
ApplicationState, Db, Server,
test_utils::{make_application, make_initialized_server},
Db,
};
use test_helpers::maybe_start_logging;
async fn start_server(
server_id: ServerId,
application: Arc<ApplicationState>,
) -> Arc<Server<TestConnectionManager>> {
let server = Arc::new(Server::new(
TestConnectionManager::new(),
application,
Default::default(),
));
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
server
}
#[tokio::test]
async fn delete_predicate_preservation() {
maybe_start_logging();
// ==================== setup ====================
let object_store = Arc::new(ObjectStore::new_in_memory());
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap();
let application = Arc::new(ApplicationState::new(Arc::clone(&object_store), None));
let server = start_server(server_id, Arc::clone(&application)).await;
let application = make_application();
let server = make_initialized_server(server_id, Arc::clone(&application)).await;
// Test that delete predicates are stored within the preserved catalog


@@ -0,0 +1,112 @@
use std::num::{NonZeroU32, NonZeroUsize};
use std::time::{Duration, Instant};
use arrow_util::assert_batches_eq;
use data_types::database_rules::{
DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart, WriteBufferConnection,
WriteBufferDirection,
};
use data_types::{sequence::Sequence, server_id::ServerId, DatabaseName};
use entry::{test_helpers::lp_to_entry, SequencedEntry};
use query::QueryDatabase;
use server::{
db::test_helpers::run_query,
rules::ProvidedDatabaseRules,
test_utils::{make_application, make_initialized_server},
};
use test_helpers::{assert_contains, tracing::TracingCapture};
use time::Time;
use write_buffer::mock::MockBufferSharedState;
#[tokio::test]
async fn write_buffer_reads_wait_for_compaction() {
let tracing_capture = TracingCapture::new();
// ==================== setup ====================
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
let db_name = DatabaseName::new("write_buffer_reads_wait_for_compaction").unwrap();
// setup write buffer
// these numbers are hand-tuned to trigger hard buffer limits without making the test too big
let n_entries = 50u64;
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::new(1).unwrap());
for sequence_number in 0..n_entries {
let lp = format!(
"table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
sequence_number / 2
);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, sequence_number),
Time::from_timestamp_nanos(0),
lp_to_entry(&lp),
));
}
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, n_entries),
Time::from_timestamp_nanos(0),
lp_to_entry("table_2,partition_by=a foo=1 0"),
));
let application = make_application();
application
.write_buffer_factory()
.register_mock("my_mock".to_string(), write_buffer_state);
let server = make_initialized_server(server_id, application).await;
// create DB
let rules = DatabaseRules {
partition_template: PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
},
lifecycle_rules: LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
mub_row_threshold: NonZeroUsize::new(10).unwrap(),
..Default::default()
},
write_buffer_connection: Some(WriteBufferConnection {
direction: WriteBufferDirection::Read,
type_: "mock".to_string(),
connection: "my_mock".to_string(),
..Default::default()
}),
..DatabaseRules::new(db_name.clone())
};
let database = server
.create_database(ProvidedDatabaseRules::new_rules(rules.clone().into()).unwrap())
.await
.unwrap();
let db = database.initialized_db().unwrap();
// after a while the table should exist
let t_0 = Instant::now();
loop {
if db.table_schema("table_2").is_some() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// no rows should be dropped
let batches = run_query(db, "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches);
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(
tracing_capture.to_string(),
"Hard limit reached while reading from write buffer, waiting for compaction to catch up"
);
server.shutdown();
server.join().await.unwrap();
}

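A pattern worth calling out in the test above: the "after a while the table should exist" loop polls with a deadline rather than sleeping a fixed amount, which keeps the test fast when ingest is quick and non-flaky when it is slow. The same idea as a reusable helper (a sketch, not part of this diff):

    use std::time::{Duration, Instant};

    /// Polls `check` every 100ms until it returns true; panics after 10s.
    async fn eventually(mut check: impl FnMut() -> bool) {
        let t_0 = Instant::now();
        while !check() {
            assert!(t_0.elapsed() < Duration::from_secs(10), "condition not met in time");
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }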

@@ -113,7 +113,7 @@ async fn test_write_entry() {
lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &partitioner(1))
.unwrap();
let entry: Vec<u8> = sharded_entries.into_iter().next().unwrap().entry.into();
let entry = sharded_entries.into_iter().next().unwrap().entry;
write_client.write_entry(&db_name, entry).await.unwrap();

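The explicit Vec<u8> annotation disappears because the conversion chain is now Entry -> Bytes (via the From<Entry> for Bytes impl added earlier in this diff) -> the request's Bytes field, so the end-to-end test can hand write_entry the Entry itself with no intermediate buffer.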

@@ -1,3 +1,4 @@
use parking_lot::RwLock;
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
@@ -35,7 +36,7 @@ enum Mock {
/// from [`WriteBufferConnection`].
#[derive(Debug)]
pub struct WriteBufferConfigFactory {
mocks: BTreeMap<String, Mock>,
mocks: RwLock<BTreeMap<String, Mock>>,
time_provider: Arc<dyn TimeProvider>,
}
@@ -52,7 +53,7 @@ impl WriteBufferConfigFactory {
///
/// # Panics
/// When mock with identical name is already registered.
pub fn register_mock(&mut self, name: String, state: MockBufferSharedState) {
pub fn register_mock(&self, name: String, state: MockBufferSharedState) {
self.set_mock(name, Mock::Normal(state));
}
@@ -60,12 +61,13 @@
///
/// # Panics
/// When mock with identical name is already registered.
pub fn register_always_fail_mock(&mut self, name: String) {
pub fn register_always_fail_mock(&self, name: String) {
self.set_mock(name, Mock::AlwaysFailing);
}
fn set_mock(&mut self, name: String, mock: Mock) {
match self.mocks.entry(name) {
fn set_mock(&self, name: String, mock: Mock) {
let mut mocks = self.mocks.write();
match mocks.entry(name) {
Entry::Vacant(v) => {
v.insert(mock);
}
@@ -77,6 +79,7 @@ impl WriteBufferConfigFactory {
fn get_mock(&self, name: &str) -> Result<Mock, WriteBufferError> {
self.mocks
.read()
.get(name)
.cloned()
.ok_or_else::<WriteBufferError, _>(|| format!("Unknown mock ID: {}", name).into())
@@ -237,7 +240,7 @@ mod tests {
#[tokio::test]
async fn test_writing_mock() {
let time = Arc::new(time::SystemProvider::new());
let mut factory = WriteBufferConfigFactory::new(time);
let factory = WriteBufferConfigFactory::new(time);
let state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
@@ -277,7 +280,7 @@
let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let time = Arc::new(time::SystemProvider::new());
let mut factory = WriteBufferConfigFactory::new(time);
let factory = WriteBufferConfigFactory::new(time);
let state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
@@ -316,7 +319,7 @@
#[tokio::test]
async fn test_writing_mock_failing() {
let time = Arc::new(time::SystemProvider::new());
let mut factory = WriteBufferConfigFactory::new(time);
let factory = WriteBufferConfigFactory::new(time);
let mock_name = "some_mock";
factory.register_always_fail_mock(mock_name.to_string());
@@ -354,7 +357,7 @@
let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let time = Arc::new(time::SystemProvider::new());
let mut factory = WriteBufferConfigFactory::new(time);
let factory = WriteBufferConfigFactory::new(time);
let mock_name = "some_mock";
factory.register_always_fail_mock(mock_name.to_string());
@@ -393,7 +396,7 @@
#[should_panic(expected = "Mock with the name 'some_mock' already registered")]
fn test_register_mock_twice_panics() {
let time = Arc::new(time::SystemProvider::new());
let mut factory = WriteBufferConfigFactory::new(time);
let factory = WriteBufferConfigFactory::new(time);
let state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
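
A closing note on the factory change that threads through this diff: wrapping the mock map in parking_lot::RwLock moves mutability inside the struct, so register_mock and register_always_fail_mock can take &self and be called through the Arc<WriteBufferConfigFactory> that ApplicationState hands out; that is what made the with_write_buffer_factory override removable. A toy model of the pattern (hypothetical Factory type, assuming the parking_lot crate):

    use parking_lot::RwLock;
    use std::{collections::BTreeMap, sync::Arc};

    #[derive(Debug, Default)]
    struct Factory {
        // Interior mutability: registration takes &self, so a shared Arc suffices.
        mocks: RwLock<BTreeMap<String, String>>,
    }

    impl Factory {
        fn register_mock(&self, name: String, state: String) {
            self.mocks.write().insert(name, state); // brief exclusive lock
        }

        fn get_mock(&self, name: &str) -> Option<String> {
            self.mocks.read().get(name).cloned() // shared lock for lookups
        }
    }

    fn main() {
        let factory = Arc::new(Factory::default());
        factory.register_mock("my_mock".to_string(), "state".to_string());
        assert_eq!(factory.get_mock("my_mock").as_deref(), Some("state"));
    }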