Merge branch 'main' into pd/data-generator-optimiziation
commit
f19788ce94
|
@ -923,32 +923,10 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
let entry_ingest = metric_registry
|
|
||||||
.get_instrument::<Metric<U64Counter>>("ingest_entries_bytes")
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let entry_ingest_ok = entry_ingest
|
|
||||||
.get_observer(&Attributes::from(&[
|
|
||||||
("db_name", "MyOrg_MyBucket"),
|
|
||||||
("status", "ok"),
|
|
||||||
]))
|
|
||||||
.unwrap()
|
|
||||||
.clone();
|
|
||||||
|
|
||||||
let entry_ingest_error = entry_ingest
|
|
||||||
.get_observer(&Attributes::from(&[
|
|
||||||
("db_name", "MyOrg_MyBucket"),
|
|
||||||
("status", "error"),
|
|
||||||
]))
|
|
||||||
.unwrap()
|
|
||||||
.clone();
|
|
||||||
|
|
||||||
assert_eq!(request_duration_ok.fetch().sample_count(), 1);
|
assert_eq!(request_duration_ok.fetch().sample_count(), 1);
|
||||||
assert_eq!(request_count_ok.fetch(), 1);
|
assert_eq!(request_count_ok.fetch(), 1);
|
||||||
assert_eq!(request_count_client_error.fetch(), 0);
|
assert_eq!(request_count_client_error.fetch(), 0);
|
||||||
assert_eq!(request_count_server_error.fetch(), 0);
|
assert_eq!(request_count_server_error.fetch(), 0);
|
||||||
assert_ne!(entry_ingest_ok.fetch(), 0);
|
|
||||||
assert_eq!(entry_ingest_error.fetch(), 0);
|
|
||||||
|
|
||||||
// A single successful point landed
|
// A single successful point landed
|
||||||
let ingest_lines = metric_registry
|
let ingest_lines = metric_registry
|
||||||
|
@ -1051,8 +1029,6 @@ mod tests {
|
||||||
assert_eq!(request_count_ok.fetch(), 1);
|
assert_eq!(request_count_ok.fetch(), 1);
|
||||||
assert_eq!(request_count_client_error.fetch(), 0);
|
assert_eq!(request_count_client_error.fetch(), 0);
|
||||||
assert_eq!(request_count_server_error.fetch(), 1);
|
assert_eq!(request_count_server_error.fetch(), 1);
|
||||||
assert_ne!(entry_ingest_ok.fetch(), 0);
|
|
||||||
assert_ne!(entry_ingest_error.fetch(), 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets up a test database with some data for testing the query endpoint
|
/// Sets up a test database with some data for testing the query endpoint
|
||||||
|
|
|
@ -18,6 +18,7 @@ use generated_types::{
|
||||||
};
|
};
|
||||||
use internal_types::freezable::Freezable;
|
use internal_types::freezable::Freezable;
|
||||||
use iox_object_store::IoxObjectStore;
|
use iox_object_store::IoxObjectStore;
|
||||||
|
use mutable_batch::DbWrite;
|
||||||
use observability_deps::tracing::{error, info, warn};
|
use observability_deps::tracing::{error, info, warn};
|
||||||
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
|
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
|
||||||
use parquet_catalog::core::PreservedCatalog;
|
use parquet_catalog::core::PreservedCatalog;
|
||||||
|
@ -26,13 +27,10 @@ use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||||
use std::{future::Future, sync::Arc, time::Duration};
|
use std::{future::Future, sync::Arc, time::Duration};
|
||||||
use tokio::{sync::Notify, task::JoinError};
|
use tokio::{sync::Notify, task::JoinError};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use trace::ctx::SpanContext;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
const INIT_BACKOFF: Duration = Duration::from_secs(1);
|
const INIT_BACKOFF: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
mod metrics;
|
|
||||||
|
|
||||||
#[derive(Debug, Snafu)]
|
#[derive(Debug, Snafu)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[snafu(display(
|
#[snafu(display(
|
||||||
|
@ -162,16 +160,12 @@ impl Database {
|
||||||
"new database"
|
"new database"
|
||||||
);
|
);
|
||||||
|
|
||||||
let metrics =
|
|
||||||
metrics::Metrics::new(application.metric_registry().as_ref(), config.name.as_str());
|
|
||||||
|
|
||||||
let shared = Arc::new(DatabaseShared {
|
let shared = Arc::new(DatabaseShared {
|
||||||
config,
|
config,
|
||||||
application,
|
application,
|
||||||
shutdown: Default::default(),
|
shutdown: Default::default(),
|
||||||
state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))),
|
state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))),
|
||||||
state_notify: Default::default(),
|
state_notify: Default::default(),
|
||||||
metrics,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
|
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
|
||||||
|
@ -589,18 +583,12 @@ impl Database {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Writes an entry to this `Database` this will either:
|
/// Writes a [`DbWrite`] to this `Database` this will either:
|
||||||
///
|
///
|
||||||
/// - write it to a write buffer
|
/// - write it to a write buffer
|
||||||
/// - write it to a local `Db`
|
/// - write it to a local `Db`
|
||||||
///
|
///
|
||||||
pub async fn write_entry(
|
pub async fn route_write(&self, write: &DbWrite) -> Result<(), WriteError> {
|
||||||
&self,
|
|
||||||
entry: entry::Entry,
|
|
||||||
span_ctx: Option<SpanContext>,
|
|
||||||
) -> Result<(), WriteError> {
|
|
||||||
let recorder = self.shared.metrics.entry_ingest(entry.data().len());
|
|
||||||
|
|
||||||
let db = {
|
let db = {
|
||||||
let state = self.shared.state.read();
|
let state = self.shared.state.read();
|
||||||
match &**state {
|
match &**state {
|
||||||
|
@ -617,7 +605,7 @@ impl Database {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
db.store_entry(entry, span_ctx).await.map_err(|e| {
|
db.route_write(write).await.map_err(|e| {
|
||||||
use super::db::Error;
|
use super::db::Error;
|
||||||
match e {
|
match e {
|
||||||
// TODO: Pull write buffer producer out of Db
|
// TODO: Pull write buffer producer out of Db
|
||||||
|
@ -628,7 +616,6 @@ impl Database {
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
recorder.success();
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -666,9 +653,6 @@ struct DatabaseShared {
|
||||||
|
|
||||||
/// Notify that the database state has changed
|
/// Notify that the database state has changed
|
||||||
state_notify: Notify,
|
state_notify: Notify,
|
||||||
|
|
||||||
/// Metrics for this database
|
|
||||||
metrics: metrics::Metrics,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The background worker for `Database` - there should only ever be one
|
/// The background worker for `Database` - there should only ever be one
|
||||||
|
|
|
@ -1,58 +0,0 @@
|
||||||
use metric::U64Counter;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Metrics {
|
|
||||||
ingest_entries_bytes_ok: U64Counter,
|
|
||||||
|
|
||||||
ingest_entries_bytes_error: U64Counter,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Metrics {
|
|
||||||
pub fn new(registry: &metric::Registry, db_name: impl Into<String>) -> Self {
|
|
||||||
let db_name = db_name.into();
|
|
||||||
let metric = registry
|
|
||||||
.register_metric::<U64Counter>("ingest_entries_bytes", "total ingested entry bytes");
|
|
||||||
|
|
||||||
Self {
|
|
||||||
ingest_entries_bytes_ok: metric
|
|
||||||
.recorder([("db_name", db_name.clone().into()), ("status", "ok".into())]),
|
|
||||||
ingest_entries_bytes_error: metric
|
|
||||||
.recorder([("db_name", db_name.into()), ("status", "error".into())]),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get a recorder for reporting entry ingest
|
|
||||||
pub fn entry_ingest(&self, bytes: usize) -> EntryIngestRecorder<'_> {
|
|
||||||
EntryIngestRecorder {
|
|
||||||
metrics: self,
|
|
||||||
recorded: false,
|
|
||||||
bytes,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An RAII handle that records metrics for ingest
|
|
||||||
///
|
|
||||||
/// Records an error on drop unless `EntryIngestRecorder::success` invoked
|
|
||||||
pub struct EntryIngestRecorder<'a> {
|
|
||||||
metrics: &'a Metrics,
|
|
||||||
bytes: usize,
|
|
||||||
recorded: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> EntryIngestRecorder<'a> {
|
|
||||||
pub fn success(mut self) {
|
|
||||||
self.recorded = true;
|
|
||||||
self.metrics.ingest_entries_bytes_ok.inc(self.bytes as u64)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Drop for EntryIngestRecorder<'a> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
if !self.recorded {
|
|
||||||
self.metrics
|
|
||||||
.ingest_entries_bytes_error
|
|
||||||
.inc(self.bytes as u64);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
117
server/src/db.rs
117
server/src/db.rs
|
@ -26,11 +26,8 @@ use data_types::{
|
||||||
server_id::ServerId,
|
server_id::ServerId,
|
||||||
};
|
};
|
||||||
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
|
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
|
||||||
use entry::Entry;
|
|
||||||
use iox_object_store::IoxObjectStore;
|
use iox_object_store::IoxObjectStore;
|
||||||
use mutable_batch::payload::{DbWrite, PartitionWrite};
|
use mutable_batch::payload::{DbWrite, PartitionWrite};
|
||||||
use mutable_batch::WriteMeta;
|
|
||||||
use mutable_batch_entry::entry_to_batches;
|
|
||||||
use mutable_buffer::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
|
use mutable_buffer::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
|
||||||
use observability_deps::tracing::{debug, error, info, warn};
|
use observability_deps::tracing::{debug, error, info, warn};
|
||||||
use parquet_catalog::{
|
use parquet_catalog::{
|
||||||
|
@ -105,9 +102,6 @@ pub enum Error {
|
||||||
#[snafu(display("Hard buffer size limit reached"))]
|
#[snafu(display("Hard buffer size limit reached"))]
|
||||||
HardLimitReached {},
|
HardLimitReached {},
|
||||||
|
|
||||||
#[snafu(display("Cannot convert entry to db write: {}", source))]
|
|
||||||
EntryConversion { source: mutable_batch_entry::Error },
|
|
||||||
|
|
||||||
#[snafu(display(
|
#[snafu(display(
|
||||||
"Cannot delete data from non-existing table, {}: {}",
|
"Cannot delete data from non-existing table, {}: {}",
|
||||||
table_name,
|
table_name,
|
||||||
|
@ -915,19 +909,16 @@ impl Db {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stores an entry based on the configuration.
|
/// Stores the write on this [`Db`] and/or routes it to the write buffer
|
||||||
///
|
///
|
||||||
/// TODO: Remove this method (#2243)
|
/// TODO: Remove this method (#2243)
|
||||||
pub async fn store_entry(&self, entry: Entry, span_ctx: Option<SpanContext>) -> Result<()> {
|
pub async fn route_write(&self, write: &DbWrite) -> Result<()> {
|
||||||
let immutable = {
|
let immutable = {
|
||||||
let rules = self.rules.read();
|
let rules = self.rules.read();
|
||||||
rules.lifecycle_rules.immutable
|
rules.lifecycle_rules.immutable
|
||||||
};
|
};
|
||||||
debug!(%immutable, has_write_buffer_producer=self.write_buffer_producer.is_some(), "storing entry");
|
debug!(%immutable, has_write_buffer_producer=self.write_buffer_producer.is_some(), "storing entry");
|
||||||
|
|
||||||
let tables = entry_to_batches(&entry).context(EntryConversion)?;
|
|
||||||
let mut write = DbWrite::new(tables, WriteMeta::new(None, None, span_ctx, None));
|
|
||||||
|
|
||||||
match (self.write_buffer_producer.as_ref(), immutable) {
|
match (self.write_buffer_producer.as_ref(), immutable) {
|
||||||
(Some(write_buffer), true) => {
|
(Some(write_buffer), true) => {
|
||||||
// If only the write buffer is configured, this is passing the data through to
|
// If only the write buffer is configured, this is passing the data through to
|
||||||
|
@ -936,7 +927,7 @@ impl Db {
|
||||||
|
|
||||||
// TODO: be smarter than always using sequencer 0
|
// TODO: be smarter than always using sequencer 0
|
||||||
let _ = write_buffer
|
let _ = write_buffer
|
||||||
.store_write(0, &write)
|
.store_write(0, write)
|
||||||
.await
|
.await
|
||||||
.context(WriteBufferWritingError)?;
|
.context(WriteBufferWritingError)?;
|
||||||
|
|
||||||
|
@ -947,12 +938,10 @@ impl Db {
|
||||||
// buffer to return success before adding the entry to the mutable buffer.
|
// buffer to return success before adding the entry to the mutable buffer.
|
||||||
|
|
||||||
// TODO: be smarter than always using sequencer 0
|
// TODO: be smarter than always using sequencer 0
|
||||||
let meta = write_buffer
|
write_buffer
|
||||||
.store_write(0, &write)
|
.store_write(0, write)
|
||||||
.await
|
.await
|
||||||
.context(WriteBufferWritingError)?;
|
.context(WriteBufferWritingError)?;
|
||||||
|
|
||||||
write.set_meta(meta);
|
|
||||||
}
|
}
|
||||||
(_, true) => {
|
(_, true) => {
|
||||||
// If not configured to send entries to the write buffer and the database is
|
// If not configured to send entries to the write buffer and the database is
|
||||||
|
@ -966,7 +955,7 @@ impl Db {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.store_write(&write)
|
self.store_write(write)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Writes the provided [`DbWrite`] to this database
|
/// Writes the provided [`DbWrite`] to this database
|
||||||
|
@ -1206,37 +1195,25 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod test_helpers {
|
pub mod test_helpers {
|
||||||
use std::collections::{BTreeSet, HashSet};
|
use std::collections::BTreeSet;
|
||||||
|
|
||||||
use arrow::record_batch::RecordBatch;
|
use arrow::record_batch::RecordBatch;
|
||||||
use data_types::chunk_metadata::ChunkStorage;
|
use data_types::chunk_metadata::ChunkStorage;
|
||||||
use entry::test_helpers::lp_to_entries;
|
use mutable_batch_lp::lines_to_batches;
|
||||||
use query::frontend::sql::SqlQueryPlanner;
|
use query::frontend::sql::SqlQueryPlanner;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
/// Try to write lineprotocol data and return all tables that where written.
|
/// Try to write lineprotocol data and return all tables that where written.
|
||||||
pub async fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
|
pub async fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
|
||||||
let entries = {
|
let tables = lines_to_batches(lp, 0).unwrap();
|
||||||
let partitioner = &db.rules.read().partition_template;
|
let mut table_names: Vec<_> = tables.keys().cloned().collect();
|
||||||
lp_to_entries(lp, partitioner)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut tables = HashSet::new();
|
let write = DbWrite::new(tables, Default::default());
|
||||||
for entry in entries {
|
db.route_write(&write).await?;
|
||||||
if let Some(writes) = entry.partition_writes() {
|
|
||||||
for write in writes {
|
|
||||||
for batch in write.table_batches() {
|
|
||||||
tables.insert(batch.name().to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
db.store_entry(entry, None).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut tables: Vec<_> = tables.into_iter().collect();
|
table_names.sort_unstable();
|
||||||
tables.sort();
|
Ok(table_names)
|
||||||
Ok(tables)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Same was [`try_write_lp`](try_write_lp) but will panic on failure.
|
/// Same was [`try_write_lp`](try_write_lp) but will panic on failure.
|
||||||
|
@ -1377,17 +1354,16 @@ mod tests {
|
||||||
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
|
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
|
||||||
write_summary::TimestampSummary,
|
write_summary::TimestampSummary,
|
||||||
};
|
};
|
||||||
use entry::test_helpers::lp_to_entry;
|
|
||||||
use futures::{stream, StreamExt, TryStreamExt};
|
use futures::{stream, StreamExt, TryStreamExt};
|
||||||
use iox_object_store::ParquetFilePath;
|
use iox_object_store::ParquetFilePath;
|
||||||
use metric::{Attributes, CumulativeGauge, Metric, Observation};
|
use metric::{Attributes, CumulativeGauge, Metric, Observation};
|
||||||
|
use mutable_batch_lp::lines_to_batches;
|
||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
use parquet_catalog::test_helpers::load_ok;
|
use parquet_catalog::test_helpers::load_ok;
|
||||||
use parquet_file::{
|
use parquet_file::{
|
||||||
metadata::IoxParquetMetaData,
|
metadata::IoxParquetMetaData,
|
||||||
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
|
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
|
||||||
};
|
};
|
||||||
use persistence_windows::min_max_sequence::MinMaxSequence;
|
|
||||||
use query::{QueryChunk, QueryDatabase};
|
use query::{QueryChunk, QueryDatabase};
|
||||||
use schema::selection::Selection;
|
use schema::selection::Selection;
|
||||||
use schema::Schema;
|
use schema::Schema;
|
||||||
|
@ -1424,8 +1400,9 @@ mod tests {
|
||||||
// Validate that writes are rejected if there is no mutable buffer
|
// Validate that writes are rejected if there is no mutable buffer
|
||||||
let db = immutable_db().await;
|
let db = immutable_db().await;
|
||||||
|
|
||||||
let entry = lp_to_entry("cpu bar=1 10");
|
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||||
let res = db.store_entry(entry, None).await;
|
let write = DbWrite::new(tables, Default::default());
|
||||||
|
let res = db.route_write(&write).await;
|
||||||
assert_contains!(
|
assert_contains!(
|
||||||
res.unwrap_err().to_string(),
|
res.unwrap_err().to_string(),
|
||||||
"Cannot write to this database: no mutable buffer configured"
|
"Cannot write to this database: no mutable buffer configured"
|
||||||
|
@ -1452,8 +1429,9 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
.db;
|
.db;
|
||||||
|
|
||||||
let entry = lp_to_entry("cpu bar=1 10");
|
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||||
test_db.store_entry(entry, None).await.unwrap();
|
let write = DbWrite::new(tables, Default::default());
|
||||||
|
test_db.route_write(&write).await.unwrap();
|
||||||
|
|
||||||
assert_eq!(write_buffer_state.get_messages(0).len(), 1);
|
assert_eq!(write_buffer_state.get_messages(0).len(), 1);
|
||||||
}
|
}
|
||||||
|
@ -1474,8 +1452,9 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
.db;
|
.db;
|
||||||
|
|
||||||
let entry = lp_to_entry("cpu bar=1 10");
|
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||||
db.store_entry(entry, None).await.unwrap();
|
let write = DbWrite::new(tables, Default::default());
|
||||||
|
db.route_write(&write).await.unwrap();
|
||||||
|
|
||||||
assert_eq!(write_buffer_state.get_messages(0).len(), 1);
|
assert_eq!(write_buffer_state.get_messages(0).len(), 1);
|
||||||
|
|
||||||
|
@ -1501,9 +1480,9 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
.db;
|
.db;
|
||||||
|
|
||||||
let entry = lp_to_entry("cpu bar=1 10");
|
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||||
|
let write = DbWrite::new(tables, Default::default());
|
||||||
let res = db.store_entry(entry, None).await;
|
let res = db.route_write(&write).await;
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
matches!(res, Err(Error::WriteBufferWritingError { .. })),
|
matches!(res, Err(Error::WriteBufferWritingError { .. })),
|
||||||
|
@ -1516,8 +1495,9 @@ mod tests {
|
||||||
async fn cant_write_when_reading_from_write_buffer() {
|
async fn cant_write_when_reading_from_write_buffer() {
|
||||||
// Validate that writes are rejected if this database is reading from the write buffer
|
// Validate that writes are rejected if this database is reading from the write buffer
|
||||||
let db = immutable_db().await;
|
let db = immutable_db().await;
|
||||||
let entry = lp_to_entry("cpu bar=1 10");
|
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||||
let res = db.store_entry(entry, None).await;
|
let write = DbWrite::new(tables, Default::default());
|
||||||
|
let res = db.route_write(&write).await;
|
||||||
assert_contains!(
|
assert_contains!(
|
||||||
res.unwrap_err().to_string(),
|
res.unwrap_err().to_string(),
|
||||||
"Cannot write to this database: no mutable buffer configured"
|
"Cannot write to this database: no mutable buffer configured"
|
||||||
|
@ -1552,10 +1532,11 @@ mod tests {
|
||||||
bar,t1=alpha iv=1i 1
|
bar,t1=alpha iv=1i 1
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
let entry = lp_to_entry(lp);
|
let tables = lines_to_batches(lp, 0).unwrap();
|
||||||
|
let write = DbWrite::new(tables, Default::default());
|
||||||
|
|
||||||
// This should succeed and start chunks in the MUB
|
// This should succeed and start chunks in the MUB
|
||||||
db.store_entry(entry, None).await.unwrap();
|
db.route_write(&write).await.unwrap();
|
||||||
|
|
||||||
// Line 1 has the same schema and should end up in the MUB.
|
// Line 1 has the same schema and should end up in the MUB.
|
||||||
// Line 2 has a different schema than line 1 and should error
|
// Line 2 has a different schema than line 1 and should error
|
||||||
|
@ -1565,12 +1546,13 @@ mod tests {
|
||||||
foo,t1=important iv=1i 3"
|
foo,t1=important iv=1i 3"
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
||||||
let entry = lp_to_entry(&lp);
|
let tables = lines_to_batches(&lp, 0).unwrap();
|
||||||
|
let write = DbWrite::new(tables, Default::default());
|
||||||
|
|
||||||
// This should return an error because there was at least one error in the loop
|
// This should return an error because there was at least one error in the loop
|
||||||
let result = db.store_entry(entry, None).await;
|
let err = db.route_write(&write).await.unwrap_err();
|
||||||
assert_contains!(
|
assert_contains!(
|
||||||
result.unwrap_err().to_string(),
|
err.to_string(),
|
||||||
"Storing database write failed with the following error(s), and possibly more:"
|
"Storing database write failed with the following error(s), and possibly more:"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -2366,9 +2348,9 @@ mod tests {
|
||||||
|
|
||||||
time.inc(Duration::from_secs(1));
|
time.inc(Duration::from_secs(1));
|
||||||
|
|
||||||
let entry = lp_to_entry("cpu bar=true 10");
|
let tables = lines_to_batches("cpu bar=true 10", 0).unwrap();
|
||||||
let result = db.store_entry(entry, None).await;
|
let write = DbWrite::new(tables, Default::default());
|
||||||
assert!(result.is_err());
|
db.route_write(&write).await.unwrap_err();
|
||||||
{
|
{
|
||||||
let partition = db.catalog.partition("cpu", partition_key).unwrap();
|
let partition = db.catalog.partition("cpu", partition_key).unwrap();
|
||||||
let partition = partition.read();
|
let partition = partition.read();
|
||||||
|
@ -2382,19 +2364,8 @@ mod tests {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn write_updates_persistence_windows() {
|
async fn write_updates_persistence_windows() {
|
||||||
// Writes should update the persistence windows when there
|
// Writes should update the persistence windows
|
||||||
// is a write buffer configured.
|
let db = make_db().await.db;
|
||||||
let write_buffer_state =
|
|
||||||
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
|
|
||||||
let time_provider = Arc::new(time::MockProvider::new(Time::from_timestamp_nanos(0)));
|
|
||||||
let write_buffer = Arc::new(
|
|
||||||
MockBufferForWriting::new(write_buffer_state.clone(), None, time_provider).unwrap(),
|
|
||||||
);
|
|
||||||
let db = TestDb::builder()
|
|
||||||
.write_buffer_producer(write_buffer)
|
|
||||||
.build()
|
|
||||||
.await
|
|
||||||
.db;
|
|
||||||
|
|
||||||
let partition_key = "1970-01-01T00";
|
let partition_key = "1970-01-01T00";
|
||||||
write_lp(&db, "cpu bar=1 10").await; // seq 0
|
write_lp(&db, "cpu bar=1 10").await; // seq 0
|
||||||
|
@ -2406,8 +2377,8 @@ mod tests {
|
||||||
let windows = partition.persistence_windows().unwrap();
|
let windows = partition.persistence_windows().unwrap();
|
||||||
let seq = windows.minimum_unpersisted_sequence().unwrap();
|
let seq = windows.minimum_unpersisted_sequence().unwrap();
|
||||||
|
|
||||||
let seq = seq.get(&0).unwrap();
|
// No write buffer configured
|
||||||
assert_eq!(seq, &MinMaxSequence::new(0, 2));
|
assert!(seq.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
@ -105,6 +105,8 @@ use uuid::Uuid;
|
||||||
pub use application::ApplicationState;
|
pub use application::ApplicationState;
|
||||||
pub use db::Db;
|
pub use db::Db;
|
||||||
pub use job::JobRegistry;
|
pub use job::JobRegistry;
|
||||||
|
use mutable_batch::{DbWrite, WriteMeta};
|
||||||
|
use mutable_batch_entry::entry_to_batches;
|
||||||
pub use resolver::{GrpcConnectionString, RemoteTemplate};
|
pub use resolver::{GrpcConnectionString, RemoteTemplate};
|
||||||
|
|
||||||
mod application;
|
mod application;
|
||||||
|
@ -272,6 +274,9 @@ pub enum Error {
|
||||||
|
|
||||||
#[snafu(display("error persisting server config to object storage: {}", source))]
|
#[snafu(display("error persisting server config to object storage: {}", source))]
|
||||||
PersistServerConfig { source: object_store::Error },
|
PersistServerConfig { source: object_store::Error },
|
||||||
|
|
||||||
|
#[snafu(display("Cannot convert entry to db write: {}", source))]
|
||||||
|
EntryConversion { source: mutable_batch_entry::Error },
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
@ -1045,8 +1050,11 @@ where
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
use database::WriteError;
|
use database::WriteError;
|
||||||
|
|
||||||
|
let tables = entry_to_batches(&entry).context(EntryConversion)?;
|
||||||
|
let write = DbWrite::new(tables, WriteMeta::new(None, None, span_ctx, None));
|
||||||
|
|
||||||
self.active_database(db_name)?
|
self.active_database(db_name)?
|
||||||
.write_entry(entry, span_ctx)
|
.route_write(&write)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| match e {
|
.map_err(|e| match e {
|
||||||
WriteError::NotInitialized { .. } => Error::DatabaseNotInitialized {
|
WriteError::NotInitialized { .. } => Error::DatabaseNotInitialized {
|
||||||
|
@ -1498,7 +1506,6 @@ mod tests {
|
||||||
use entry::test_helpers::lp_to_entry;
|
use entry::test_helpers::lp_to_entry;
|
||||||
use influxdb_line_protocol::parse_lines;
|
use influxdb_line_protocol::parse_lines;
|
||||||
use iox_object_store::IoxObjectStore;
|
use iox_object_store::IoxObjectStore;
|
||||||
use metric::{Attributes, Metric, U64Counter};
|
|
||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
use parquet_catalog::{
|
use parquet_catalog::{
|
||||||
core::{PreservedCatalog, PreservedCatalogConfig},
|
core::{PreservedCatalog, PreservedCatalogConfig},
|
||||||
|
@ -1856,9 +1863,7 @@ mod tests {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn write_entry_local() {
|
async fn write_entry_local() {
|
||||||
let application = make_application();
|
let server = make_server(make_application());
|
||||||
let registry = Arc::clone(application.metric_registry());
|
|
||||||
let server = make_server(application);
|
|
||||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||||
server.wait_for_init().await.unwrap();
|
server.wait_for_init().await.unwrap();
|
||||||
|
|
||||||
|
@ -1881,10 +1886,11 @@ mod tests {
|
||||||
rules.as_ref(),
|
rules.as_ref(),
|
||||||
)
|
)
|
||||||
.expect("sharded entries");
|
.expect("sharded entries");
|
||||||
|
assert_eq!(sharded_entries.len(), 1);
|
||||||
|
|
||||||
let entry = &sharded_entries[0].entry;
|
let entry = sharded_entries.into_iter().next().unwrap().entry;
|
||||||
server
|
server
|
||||||
.write_entry_local(&name, entry.clone(), None)
|
.write_entry_local(&name, entry, None)
|
||||||
.await
|
.await
|
||||||
.expect("write entry");
|
.expect("write entry");
|
||||||
|
|
||||||
|
@ -1897,14 +1903,6 @@ mod tests {
|
||||||
"+-----+--------------------------------+",
|
"+-----+--------------------------------+",
|
||||||
];
|
];
|
||||||
assert_batches_eq!(expected, &batches);
|
assert_batches_eq!(expected, &batches);
|
||||||
|
|
||||||
let bytes = registry
|
|
||||||
.get_instrument::<Metric<U64Counter>>("ingest_entries_bytes")
|
|
||||||
.unwrap()
|
|
||||||
.get_observer(&Attributes::from(&[("status", "ok"), ("db_name", "foo")]))
|
|
||||||
.unwrap()
|
|
||||||
.fetch();
|
|
||||||
assert_eq!(bytes, 240)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This tests sets up a database with a sharding config which defines exactly one shard
|
// This tests sets up a database with a sharding config which defines exactly one shard
|
||||||
|
|
Loading…
Reference in New Issue