refactor: remove Entry from Db interfaces (#2724) (#3027)

pull/24376/head
Raphael Taylor-Davies 2021-11-04 13:28:47 +00:00 committed by GitHub
parent 07ba629e2b
commit d28749bd93
5 changed files with 61 additions and 190 deletions
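In short: callers of the Db interfaces no longer construct an `entry::Entry`; they build a `DbWrite` (for example from line protocol via `mutable_batch_lp::lines_to_batches`) and pass it to the new `route_write` methods. A minimal sketch of the new call path, assuming the IOx server crates shown in the diffs below are in scope (the helper function name is illustrative only):

```rust
use mutable_batch::DbWrite;
use mutable_batch_lp::lines_to_batches;

// Illustrative helper, not part of this commit: convert line protocol into a
// DbWrite and route it through the new interface.
async fn route_line_protocol(db: &Database, lp: &str) -> Result<(), WriteError> {
    // Previously: build an entry::Entry and call db.write_entry(entry, span_ctx).
    let tables = lines_to_batches(lp, 0).expect("valid line protocol");
    let write = DbWrite::new(tables, Default::default());
    db.route_write(&write).await
}
```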


@@ -923,32 +923,10 @@ mod tests {
.unwrap()
.clone();
let entry_ingest = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_entries_bytes")
.unwrap();
let entry_ingest_ok = entry_ingest
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "ok"),
]))
.unwrap()
.clone();
let entry_ingest_error = entry_ingest
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "error"),
]))
.unwrap()
.clone();
assert_eq!(request_duration_ok.fetch().sample_count(), 1);
assert_eq!(request_count_ok.fetch(), 1);
assert_eq!(request_count_client_error.fetch(), 0);
assert_eq!(request_count_server_error.fetch(), 0);
assert_ne!(entry_ingest_ok.fetch(), 0);
assert_eq!(entry_ingest_error.fetch(), 0);
// A single successful point landed
let ingest_lines = metric_registry
@@ -1051,8 +1029,6 @@ mod tests {
assert_eq!(request_count_ok.fetch(), 1);
assert_eq!(request_count_client_error.fetch(), 0);
assert_eq!(request_count_server_error.fetch(), 1);
assert_ne!(entry_ingest_ok.fetch(), 0);
assert_ne!(entry_ingest_error.fetch(), 0);
}
/// Sets up a test database with some data for testing the query endpoint


@@ -18,6 +18,7 @@ use generated_types::{
};
use internal_types::freezable::Freezable;
use iox_object_store::IoxObjectStore;
use mutable_batch::DbWrite;
use observability_deps::tracing::{error, info, warn};
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
use parquet_catalog::core::PreservedCatalog;
@@ -26,13 +27,10 @@ use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{future::Future, sync::Arc, time::Duration};
use tokio::{sync::Notify, task::JoinError};
use tokio_util::sync::CancellationToken;
use trace::ctx::SpanContext;
use uuid::Uuid;
const INIT_BACKOFF: Duration = Duration::from_secs(1);
mod metrics;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
@@ -162,16 +160,12 @@ impl Database {
"new database"
);
let metrics =
metrics::Metrics::new(application.metric_registry().as_ref(), config.name.as_str());
let shared = Arc::new(DatabaseShared {
config,
application,
shutdown: Default::default(),
state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))),
state_notify: Default::default(),
metrics,
});
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
@@ -589,18 +583,12 @@ impl Database {
Ok(())
}
/// Writes an entry to this `Database` this will either:
/// Writes a [`DbWrite`] to this `Database` this will either:
///
/// - write it to a write buffer
/// - write it to a local `Db`
///
pub async fn write_entry(
&self,
entry: entry::Entry,
span_ctx: Option<SpanContext>,
) -> Result<(), WriteError> {
let recorder = self.shared.metrics.entry_ingest(entry.data().len());
pub async fn route_write(&self, write: &DbWrite) -> Result<(), WriteError> {
let db = {
let state = self.shared.state.read();
match &**state {
@@ -617,7 +605,7 @@ }
}
};
db.store_entry(entry, span_ctx).await.map_err(|e| {
db.route_write(write).await.map_err(|e| {
use super::db::Error;
match e {
// TODO: Pull write buffer producer out of Db
@@ -628,7 +616,6 @@ }
}
})?;
recorder.success();
Ok(())
}
}
@@ -666,9 +653,6 @@ struct DatabaseShared {
/// Notify that the database state has changed
state_notify: Notify,
/// Metrics for this database
metrics: metrics::Metrics,
}
/// The background worker for `Database` - there should only ever be one


@@ -1,58 +0,0 @@
use metric::U64Counter;
#[derive(Debug)]
pub struct Metrics {
ingest_entries_bytes_ok: U64Counter,
ingest_entries_bytes_error: U64Counter,
}
impl Metrics {
pub fn new(registry: &metric::Registry, db_name: impl Into<String>) -> Self {
let db_name = db_name.into();
let metric = registry
.register_metric::<U64Counter>("ingest_entries_bytes", "total ingested entry bytes");
Self {
ingest_entries_bytes_ok: metric
.recorder([("db_name", db_name.clone().into()), ("status", "ok".into())]),
ingest_entries_bytes_error: metric
.recorder([("db_name", db_name.into()), ("status", "error".into())]),
}
}
/// Get a recorder for reporting entry ingest
pub fn entry_ingest(&self, bytes: usize) -> EntryIngestRecorder<'_> {
EntryIngestRecorder {
metrics: self,
recorded: false,
bytes,
}
}
}
/// An RAII handle that records metrics for ingest
///
/// Records an error on drop unless `EntryIngestRecorder::success` invoked
pub struct EntryIngestRecorder<'a> {
metrics: &'a Metrics,
bytes: usize,
recorded: bool,
}
impl<'a> EntryIngestRecorder<'a> {
pub fn success(mut self) {
self.recorded = true;
self.metrics.ingest_entries_bytes_ok.inc(self.bytes as u64)
}
}
impl<'a> Drop for EntryIngestRecorder<'a> {
fn drop(&mut self) {
if !self.recorded {
self.metrics
.ingest_entries_bytes_error
.inc(self.bytes as u64);
}
}
}
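For context, the recorder deleted above was driven from the entry-based write path that this commit also removes from `Database`; the call site, reconstructed from the removed lines in the `Database` diff (a fragment, not standalone code):

```rust
// Reconstructed from the removed lines in this commit; not part of the new code.
let recorder = self.shared.metrics.entry_ingest(entry.data().len());

// Any early return (e.g. via `?`) drops the recorder without `success()`,
// incrementing ingest_entries_bytes{status="error"} by the entry's byte length.
db.store_entry(entry, span_ctx).await?;

// Success increments ingest_entries_bytes{status="ok"} instead.
recorder.success();
```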


@@ -26,11 +26,8 @@ use data_types::{
server_id::ServerId,
};
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use entry::Entry;
use iox_object_store::IoxObjectStore;
use mutable_batch::payload::{DbWrite, PartitionWrite};
use mutable_batch::WriteMeta;
use mutable_batch_entry::entry_to_batches;
use mutable_buffer::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use observability_deps::tracing::{debug, error, info, warn};
use parquet_catalog::{
@@ -105,9 +102,6 @@ pub enum Error {
#[snafu(display("Hard buffer size limit reached"))]
HardLimitReached {},
#[snafu(display("Cannot convert entry to db write: {}", source))]
EntryConversion { source: mutable_batch_entry::Error },
#[snafu(display(
"Cannot delete data from non-existing table, {}: {}",
table_name,
@@ -915,19 +909,16 @@ impl Db {
Ok(())
}
/// Stores an entry based on the configuration.
/// Stores the write on this [`Db`] and/or routes it to the write buffer
///
/// TODO: Remove this method (#2243)
pub async fn store_entry(&self, entry: Entry, span_ctx: Option<SpanContext>) -> Result<()> {
pub async fn route_write(&self, write: &DbWrite) -> Result<()> {
let immutable = {
let rules = self.rules.read();
rules.lifecycle_rules.immutable
};
debug!(%immutable, has_write_buffer_producer=self.write_buffer_producer.is_some(), "storing entry");
let tables = entry_to_batches(&entry).context(EntryConversion)?;
let mut write = DbWrite::new(tables, WriteMeta::new(None, None, span_ctx, None));
match (self.write_buffer_producer.as_ref(), immutable) {
(Some(write_buffer), true) => {
// If only the write buffer is configured, this is passing the data through to
@@ -936,7 +927,7 @@
// TODO: be smarter than always using sequencer 0
let _ = write_buffer
.store_write(0, &write)
.store_write(0, write)
.await
.context(WriteBufferWritingError)?;
@@ -947,12 +938,10 @@
// buffer to return success before adding the entry to the mutable buffer.
// TODO: be smarter than always using sequencer 0
let meta = write_buffer
.store_write(0, &write)
write_buffer
.store_write(0, write)
.await
.context(WriteBufferWritingError)?;
write.set_meta(meta);
}
(_, true) => {
// If not configured to send entries to the write buffer and the database is
@@ -966,7 +955,7 @@ }
}
};
self.store_write(&write)
self.store_write(write)
}
/// Writes the provided [`DbWrite`] to this database
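For orientation, the write routing in the hunks above reduces to a small decision matrix over the write buffer producer and the `immutable` lifecycle flag. A self-contained toy model of that decision (the enum and function names here are illustrative, not from the IOx codebase; the real code also threads sequencing metadata and uses the actual error types):

```rust
/// Illustrative model of the routing decision made by `route_write`.
#[derive(Debug, PartialEq)]
enum RouteDecision {
    /// Write buffer only: pass the write through; it is read back from the buffer later.
    WriteBufferOnly,
    /// Write to the write buffer first, then store in the local mutable buffer.
    WriteBufferThenMutableBuffer,
    /// No mutable buffer and no write buffer target: the write is rejected
    /// ("Cannot write to this database: no mutable buffer configured").
    Rejected,
    /// No write buffer configured: store in the local mutable buffer only.
    MutableBufferOnly,
}

fn decide_route(has_write_buffer_producer: bool, immutable: bool) -> RouteDecision {
    match (has_write_buffer_producer, immutable) {
        (true, true) => RouteDecision::WriteBufferOnly,
        (true, false) => RouteDecision::WriteBufferThenMutableBuffer,
        (false, true) => RouteDecision::Rejected,
        (false, false) => RouteDecision::MutableBufferOnly,
    }
}

fn main() {
    assert_eq!(decide_route(false, true), RouteDecision::Rejected);
    assert_eq!(decide_route(true, false), RouteDecision::WriteBufferThenMutableBuffer);
}
```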
@@ -1206,37 +1195,25 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData
}
pub mod test_helpers {
use std::collections::{BTreeSet, HashSet};
use std::collections::BTreeSet;
use arrow::record_batch::RecordBatch;
use data_types::chunk_metadata::ChunkStorage;
use entry::test_helpers::lp_to_entries;
use mutable_batch_lp::lines_to_batches;
use query::frontend::sql::SqlQueryPlanner;
use super::*;
/// Try to write line protocol data and return all tables that were written.
pub async fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
let entries = {
let partitioner = &db.rules.read().partition_template;
lp_to_entries(lp, partitioner)
};
let tables = lines_to_batches(lp, 0).unwrap();
let mut table_names: Vec<_> = tables.keys().cloned().collect();
let mut tables = HashSet::new();
for entry in entries {
if let Some(writes) = entry.partition_writes() {
for write in writes {
for batch in write.table_batches() {
tables.insert(batch.name().to_string());
}
}
db.store_entry(entry, None).await?;
}
}
let write = DbWrite::new(tables, Default::default());
db.route_write(&write).await?;
let mut tables: Vec<_> = tables.into_iter().collect();
tables.sort();
Ok(tables)
table_names.sort_unstable();
Ok(table_names)
}
/// Same as [`try_write_lp`](try_write_lp) but will panic on failure.
@@ -1377,17 +1354,16 @@ mod tests {
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
write_summary::TimestampSummary,
};
use entry::test_helpers::lp_to_entry;
use futures::{stream, StreamExt, TryStreamExt};
use iox_object_store::ParquetFilePath;
use metric::{Attributes, CumulativeGauge, Metric, Observation};
use mutable_batch_lp::lines_to_batches;
use object_store::ObjectStore;
use parquet_catalog::test_helpers::load_ok;
use parquet_file::{
metadata::IoxParquetMetaData,
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
};
use persistence_windows::min_max_sequence::MinMaxSequence;
use query::{QueryChunk, QueryDatabase};
use schema::selection::Selection;
use schema::Schema;
@@ -1424,8 +1400,9 @@ mod tests {
// Validate that writes are rejected if there is no mutable buffer
let db = immutable_db().await;
let entry = lp_to_entry("cpu bar=1 10");
let res = db.store_entry(entry, None).await;
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
let write = DbWrite::new(tables, Default::default());
let res = db.route_write(&write).await;
assert_contains!(
res.unwrap_err().to_string(),
"Cannot write to this database: no mutable buffer configured"
@@ -1452,8 +1429,9 @@
.await
.db;
let entry = lp_to_entry("cpu bar=1 10");
test_db.store_entry(entry, None).await.unwrap();
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
let write = DbWrite::new(tables, Default::default());
test_db.route_write(&write).await.unwrap();
assert_eq!(write_buffer_state.get_messages(0).len(), 1);
}
@@ -1474,8 +1452,9 @@
.await
.db;
let entry = lp_to_entry("cpu bar=1 10");
db.store_entry(entry, None).await.unwrap();
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
let write = DbWrite::new(tables, Default::default());
db.route_write(&write).await.unwrap();
assert_eq!(write_buffer_state.get_messages(0).len(), 1);
@@ -1501,9 +1480,9 @@
.await
.db;
let entry = lp_to_entry("cpu bar=1 10");
let res = db.store_entry(entry, None).await;
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
let write = DbWrite::new(tables, Default::default());
let res = db.route_write(&write).await;
assert!(
matches!(res, Err(Error::WriteBufferWritingError { .. })),
@@ -1516,8 +1495,9 @@
async fn cant_write_when_reading_from_write_buffer() {
// Validate that writes are rejected if this database is reading from the write buffer
let db = immutable_db().await;
let entry = lp_to_entry("cpu bar=1 10");
let res = db.store_entry(entry, None).await;
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
let write = DbWrite::new(tables, Default::default());
let res = db.route_write(&write).await;
assert_contains!(
res.unwrap_err().to_string(),
"Cannot write to this database: no mutable buffer configured"
@@ -1552,10 +1532,11 @@
bar,t1=alpha iv=1i 1
"#;
let entry = lp_to_entry(lp);
let tables = lines_to_batches(lp, 0).unwrap();
let write = DbWrite::new(tables, Default::default());
// This should succeed and start chunks in the MUB
db.store_entry(entry, None).await.unwrap();
db.route_write(&write).await.unwrap();
// Line 1 has the same schema and should end up in the MUB.
// Line 2 has a different schema than line 1 and should error
@@ -1565,12 +1546,13 @@
foo,t1=important iv=1i 3"
.to_string();
let entry = lp_to_entry(&lp);
let tables = lines_to_batches(&lp, 0).unwrap();
let write = DbWrite::new(tables, Default::default());
// This should return an error because there was at least one error in the loop
let result = db.store_entry(entry, None).await;
let err = db.route_write(&write).await.unwrap_err();
assert_contains!(
result.unwrap_err().to_string(),
err.to_string(),
"Storing database write failed with the following error(s), and possibly more:"
);
@@ -2366,9 +2348,9 @@ mod tests {
time.inc(Duration::from_secs(1));
let entry = lp_to_entry("cpu bar=true 10");
let result = db.store_entry(entry, None).await;
assert!(result.is_err());
let tables = lines_to_batches("cpu bar=true 10", 0).unwrap();
let write = DbWrite::new(tables, Default::default());
db.route_write(&write).await.unwrap_err();
{
let partition = db.catalog.partition("cpu", partition_key).unwrap();
let partition = partition.read();
@@ -2382,19 +2364,8 @@
#[tokio::test]
async fn write_updates_persistence_windows() {
// Writes should update the persistence windows when there
// is a write buffer configured.
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
let time_provider = Arc::new(time::MockProvider::new(Time::from_timestamp_nanos(0)));
let write_buffer = Arc::new(
MockBufferForWriting::new(write_buffer_state.clone(), None, time_provider).unwrap(),
);
let db = TestDb::builder()
.write_buffer_producer(write_buffer)
.build()
.await
.db;
// Writes should update the persistence windows
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu bar=1 10").await; // seq 0
@@ -2406,8 +2377,8 @@
let windows = partition.persistence_windows().unwrap();
let seq = windows.minimum_unpersisted_sequence().unwrap();
let seq = seq.get(&0).unwrap();
assert_eq!(seq, &MinMaxSequence::new(0, 2));
// No write buffer configured
assert!(seq.is_empty());
}
#[tokio::test]


@@ -105,6 +105,8 @@ use uuid::Uuid;
pub use application::ApplicationState;
pub use db::Db;
pub use job::JobRegistry;
use mutable_batch::{DbWrite, WriteMeta};
use mutable_batch_entry::entry_to_batches;
pub use resolver::{GrpcConnectionString, RemoteTemplate};
mod application;
@@ -272,6 +274,9 @@ pub enum Error {
#[snafu(display("error persisting server config to object storage: {}", source))]
PersistServerConfig { source: object_store::Error },
#[snafu(display("Cannot convert entry to db write: {}", source))]
EntryConversion { source: mutable_batch_entry::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1045,8 +1050,11 @@
) -> Result<()> {
use database::WriteError;
let tables = entry_to_batches(&entry).context(EntryConversion)?;
let write = DbWrite::new(tables, WriteMeta::new(None, None, span_ctx, None));
self.active_database(db_name)?
.write_entry(entry, span_ctx)
.route_write(&write)
.await
.map_err(|e| match e {
WriteError::NotInitialized { .. } => Error::DatabaseNotInitialized {
@@ -1498,7 +1506,6 @@ mod tests {
use entry::test_helpers::lp_to_entry;
use influxdb_line_protocol::parse_lines;
use iox_object_store::IoxObjectStore;
use metric::{Attributes, Metric, U64Counter};
use object_store::ObjectStore;
use parquet_catalog::{
core::{PreservedCatalog, PreservedCatalogConfig},
@@ -1856,9 +1863,7 @@ mod tests {
#[tokio::test]
async fn write_entry_local() {
let application = make_application();
let registry = Arc::clone(application.metric_registry());
let server = make_server(application);
let server = make_server(make_application());
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.wait_for_init().await.unwrap();
@@ -1881,10 +1886,11 @@
rules.as_ref(),
)
.expect("sharded entries");
assert_eq!(sharded_entries.len(), 1);
let entry = &sharded_entries[0].entry;
let entry = sharded_entries.into_iter().next().unwrap().entry;
server
.write_entry_local(&name, entry.clone(), None)
.write_entry_local(&name, entry, None)
.await
.expect("write entry");
@@ -1897,14 +1903,6 @@
"+-----+--------------------------------+",
];
assert_batches_eq!(expected, &batches);
let bytes = registry
.get_instrument::<Metric<U64Counter>>("ingest_entries_bytes")
.unwrap()
.get_observer(&Attributes::from(&[("status", "ok"), ("db_name", "foo")]))
.unwrap()
.fetch();
assert_eq!(bytes, 240)
}
// This tests sets up a database with a sharding config which defines exactly one shard