From 4d2954ec1d14f9e5e1fad601a2ea062243b8d12c Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Thu, 24 Jun 2021 11:11:18 -0400
Subject: [PATCH 1/4] test: Write a failing test for partition_writes being
 ignored after a failure

---
 server/src/db.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/server/src/db.rs b/server/src/db.rs
index ef760b6aca..8cc6e58e18 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -1189,6 +1189,64 @@ mod tests {
         assert_batches_eq!(expected, &batches);
     }
 
+    #[tokio::test]
+    async fn try_all_partition_writes_when_some_fail() {
+        let db = Arc::new(make_db().await.db);
+
+        let nanoseconds_per_hour = 60 * 60 * 1_000_000_000u64;
+
+        // 3 lines that will go into 3 hour partitions and start new chunks.
+        let lp = format!(
+            "foo,t1=alpha iv=1i {}
+             foo,t1=bravo iv=1i {}
+             foo,t1=charlie iv=1i {}",
+            0,
+            nanoseconds_per_hour,
+            nanoseconds_per_hour * 2,
+        );
+
+        let entry = lp_to_entry(&lp);
+
+        // This should succeed and start chunks in the MUB
+        db.store_entry(entry).await.unwrap();
+
+        // 3 more lines that should go in the 3 partitions/chunks.
+        // Line 1 has the same schema and should end up in the MUB.
+        // Line 2 has a different schema than line 1 and should error
+        // Line 3 has the same schema as line 1 and should end up in the MUB.
+        let lp = format!(
+            "foo,t1=delta iv=1i {}
+             foo t1=10i {}
+             foo,t1=important iv=1i {}",
+            1,
+            nanoseconds_per_hour + 1,
+            nanoseconds_per_hour * 2 + 1,
+        );
+
+        let entry = lp_to_entry(&lp);
+
+        // This should return an error because there was at least one error in the loop
+        let result = db.store_entry(entry).await;
+        assert!(result.is_err());
+
+        // But 5 points should be returned, most importantly the last one after the line with
+        // the mismatched schema
+        let batches = run_query(db, "select t1 from foo").await;
+
+        let expected = vec![
+            "+-----------+",
+            "| t1        |",
+            "+-----------+",
+            "| alpha     |",
+            "| bravo     |",
+            "| charlie   |",
+            "| delta     |",
+            "| important |",
+            "+-----------+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+    }
+
     fn catalog_chunk_size_bytes_metric_eq(
         reg: &metrics::TestMetricRegistry,
         source: &'static str,

From f3a3a9b2677a6ffb3fe547c2bdfe3a4aef56ede9 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Thu, 24 Jun 2021 11:23:09 -0400
Subject: [PATCH 2/4] fix: Try to write all partition_writes even if one
 fails, collect all errors and report at the end

---
 server/src/db.rs | 33 +++++++++++++++++++++++++++------
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/server/src/db.rs b/server/src/db.rs
index 8cc6e58e18..9195c5cba7 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -17,7 +17,7 @@ use arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use async_trait::async_trait;
 use parking_lot::RwLock;
 use rand_distr::{Distribution, Poisson};
-use snafu::{ResultExt, Snafu};
+use snafu::{ResultExt, Snafu, ensure};
 
 use ::lifecycle::{LifecycleWriteGuard, LockableChunk};
 use catalog::{chunk::CatalogChunk, Catalog};
@@ -148,6 +148,12 @@ pub enum Error {
         source: mutable_buffer::chunk::Error,
     },
 
+    #[snafu(display(
+        "Storing sequenced entry failed with the following error(s): {}",
+        errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
+    ))]
+    StoreSequencedEntryFailures { errors: Vec<Error> },
+
     #[snafu(display("Error building sequenced entry: {}", source))]
     SequencedEntryError { source: entry::SequencedEntryError },
 
@@ -863,6 +869,8 @@ impl Db {
         }
 
         if let Some(partitioned_writes) = sequenced_entry.partition_writes() {
+            let mut errors = vec![];
+
             for write in partitioned_writes {
                 let partition_key = write.key();
                 for table_batch in write.table_batches() {
@@ -891,7 +899,7 @@ impl Db {
                             let mb_chunk =
                                 chunk.mutable_buffer().expect("cannot mutate open chunk");
 
-                            mb_chunk
+                            if let Err(e) = mb_chunk
                                 .write_table_batch(
                                     sequenced_entry.sequence().id,
                                     sequenced_entry.sequence().number,
@@ -900,7 +908,11 @@ impl Db {
                                 .context(WriteEntry {
                                     partition_key,
                                     chunk_id,
-                                })?;
+                                })
+                            {
+                                errors.push(e);
+                                continue;
+                            };
 
                             check_chunk_closed(&mut *chunk, mutable_size_threshold);
                         }
@@ -917,13 +929,17 @@ impl Db {
                                 ),
                             );
 
-                            mb_chunk
+                            if let Err(e) = mb_chunk
                                 .write_table_batch(
                                     sequenced_entry.sequence().id,
                                     sequenced_entry.sequence().number,
                                     table_batch,
                                 )
-                                .context(WriteEntryInitial { partition_key })?;
+                                .context(WriteEntryInitial { partition_key })
+                            {
+                                errors.push(e);
+                                continue;
+                            }
 
                             let new_chunk = partition.create_open_chunk(mb_chunk);
                             check_chunk_closed(&mut *new_chunk.write(), mutable_size_threshold);
@@ -954,6 +970,8 @@ impl Db {
                     }
                 }
             }
+
+            ensure!(errors.is_empty(), StoreSequencedEntryFailures { errors });
         }
 
         Ok(())
@@ -1227,7 +1245,10 @@ mod tests {
 
         // This should return an error because there was at least one error in the loop
         let result = db.store_entry(entry).await;
-        assert!(result.is_err());
+        assert_contains!(
+            result.unwrap_err().to_string(),
+            "Storing sequenced entry failed with the following error(s):"
+        );
 
         // But 5 points should be returned, most importantly the last one after the line with
         // the mismatched schema

From 1e171e2e9ac1fd0c83d939c9134f6c8e24edf584 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Thu, 24 Jun 2021 14:04:15 -0400
Subject: [PATCH 3/4] refactor: Organize `use` statements and let rustfmt
 manage order

---
 server/src/db.rs | 60 ++++++++++++++++++++++++++++++------------------------------
 1 file changed, 30 insertions(+), 30 deletions(-)

diff --git a/server/src/db.rs b/server/src/db.rs
index 9195c5cba7..6a785d57c6 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -1,30 +1,25 @@
 //! This module contains the main IOx Database object which has the
 //! instances of the mutable buffer, read buffer, and object store
 
-use std::collections::HashMap;
-use std::future::Future;
-use std::{
-    any::Any,
-    num::NonZeroUsize,
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
+pub(crate) use crate::db::chunk::DbChunk;
+use crate::{
+    db::{
+        access::QueryCatalogAccess,
+        catalog::{
+            chunk::{CatalogChunk, ChunkStage},
+            partition::Partition,
+            Catalog, TableNameFilter,
+        },
+        lifecycle::LockableCatalogChunk,
     },
-    time::{Duration, Instant},
+    write_buffer::WriteBuffer,
+    JobRegistry,
 };
-
+use ::lifecycle::{LifecycleWriteGuard, LockableChunk};
 use arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use async_trait::async_trait;
-use parking_lot::RwLock;
-use rand_distr::{Distribution, Poisson};
-use snafu::{ResultExt, Snafu, ensure};
-
-use ::lifecycle::{LifecycleWriteGuard, LockableChunk};
-use catalog::{chunk::CatalogChunk, Catalog};
-pub(crate) use chunk::DbChunk;
-use data_types::chunk_metadata::ChunkAddr;
 use data_types::{
-    chunk_metadata::ChunkSummary,
+    chunk_metadata::{ChunkAddr, ChunkSummary},
     database_rules::DatabaseRules,
     job::Job,
     partition_metadata::{PartitionSummary, TableSummary},
@@ -41,27 +36,31 @@ use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
 use mutable_buffer::persistence_windows::PersistenceWindows;
 use object_store::{path::parsed::DirsAndFileName, ObjectStore};
 use observability_deps::tracing::{debug, error, info, warn};
-use parquet_file::catalog::CheckpointData;
+use parking_lot::RwLock;
 use parquet_file::{
-    catalog::PreservedCatalog,
+    catalog::{CheckpointData, PreservedCatalog},
     chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
     cleanup::cleanup_unreferenced_parquet_files,
     metadata::IoxMetadata,
     storage::Storage,
 };
 use query::{exec::Executor, predicate::Predicate, QueryDatabase};
+use rand_distr::{Distribution, Poisson};
 use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
+use snafu::{ensure, ResultExt, Snafu};
+use std::{
+    any::Any,
+    collections::HashMap,
+    future::Future,
+    num::NonZeroUsize,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    time::{Duration, Instant},
+};
 use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
 
-use crate::db::catalog::chunk::ChunkStage;
-use crate::db::catalog::partition::Partition;
-use crate::db::lifecycle::LockableCatalogChunk;
-
-use super::{write_buffer::WriteBuffer, JobRegistry};
-
-use self::access::QueryCatalogAccess;
-use self::catalog::TableNameFilter;
-
 pub mod access;
 pub mod catalog;
 mod chunk;
@@ -179,6 +178,7 @@ pub enum Error {
     #[snafu(display("error finding min/max time on table batch: {}", source))]
     TableBatchTimeError { source: entry::Error },
 }
+
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 /// This is the main IOx Database object. It is the root object of any
From 0f7c47d10e70e3a2410a86638282a50afbcdfaf7 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Fri, 25 Jun 2021 17:10:46 -0400
Subject: [PATCH 4/4] fix: Limit the number of errors per sequenced entry
 we'll collect

---
 server/src/db.rs | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/server/src/db.rs b/server/src/db.rs
index 6a785d57c6..8a4af0e7e5 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -148,7 +148,7 @@ pub enum Error {
     },
 
     #[snafu(display(
-        "Storing sequenced entry failed with the following error(s): {}",
+        "Storing sequenced entry failed with the following error(s), and possibly more: {}",
         errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
     ))]
     StoreSequencedEntryFailures { errors: Vec<Error> },
@@ -869,7 +869,10 @@ impl Db {
         }
 
         if let Some(partitioned_writes) = sequenced_entry.partition_writes() {
-            let mut errors = vec![];
+            // Protect against DoS by limiting the number of errors we might collect
+            const MAX_ERRORS_PER_SEQUENCED_ENTRY: usize = 10;
+
+            let mut errors = Vec::with_capacity(MAX_ERRORS_PER_SEQUENCED_ENTRY);
 
             for write in partitioned_writes {
                 let partition_key = write.key();
@@ -910,7 +913,9 @@ impl Db {
                                     chunk_id,
                                 })
                             {
-                                errors.push(e);
+                                if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY {
+                                    errors.push(e);
+                                }
                                 continue;
                             };
 
@@ -937,7 +942,9 @@ impl Db {
                                 )
                                 .context(WriteEntryInitial { partition_key })
                             {
-                                errors.push(e);
+                                if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY {
+                                    errors.push(e);
+                                }
                                 continue;
                             }
 
@@ -1247,7 +1254,10 @@ mod tests {
         let result = db.store_entry(entry).await;
         assert_contains!(
             result.unwrap_err().to_string(),
-            "Storing sequenced entry failed with the following error(s):"
+            "Storing sequenced entry failed with the following error(s), and possibly more:"
        );
 
         // But 5 points should be returned, most importantly the last one after the line with
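
For readers following the series, patches 2 and 4 boil down to one control-flow pattern: when a partition write fails, keep looping over the remaining writes, remember up to a fixed number of the errors, and only turn them into a single combined error once every write has been attempted. The sketch below restates that pattern in a standalone form. It is an illustration, not IOx code: the StoreError enum, write_partition, store_entry, MAX_ERRORS, and the "bad" partition name are invented stand-ins, and plain std formatting is used in place of the snafu machinery the patches rely on.

    use std::fmt;

    // Simplified stand-ins for the real IOx error variants; illustration only.
    #[derive(Debug)]
    enum StoreError {
        PartitionWrite { partition: String },
        // Mirrors StoreSequencedEntryFailures: all collected errors reported at once.
        StoreFailures { errors: Vec<StoreError> },
    }

    impl fmt::Display for StoreError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::PartitionWrite { partition } => {
                    write!(f, "write to partition {} failed", partition)
                }
                Self::StoreFailures { errors } => write!(
                    f,
                    "storing entry failed with the following error(s), and possibly more: {}",
                    errors
                        .iter()
                        .map(ToString::to_string)
                        .collect::<Vec<_>>()
                        .join(", ")
                ),
            }
        }
    }

    // Hypothetical per-partition write that fails for one specific partition.
    fn write_partition(partition: &str) -> Result<(), StoreError> {
        if partition == "bad" {
            return Err(StoreError::PartitionWrite {
                partition: partition.to_string(),
            });
        }
        Ok(())
    }

    // Attempt every partition write even when some fail, keep at most MAX_ERRORS
    // of the failures, and report them together only after the loop finishes.
    fn store_entry(partitions: &[&str]) -> Result<(), StoreError> {
        const MAX_ERRORS: usize = 10;
        let mut errors = Vec::with_capacity(MAX_ERRORS);

        for partition in partitions {
            if let Err(e) = write_partition(partition) {
                if errors.len() < MAX_ERRORS {
                    errors.push(e);
                }
                // No early return here: later partitions still get their writes.
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            Err(StoreError::StoreFailures { errors })
        }
    }

    fn main() {
        // "late" is still attempted even though "bad" fails before it.
        let err = store_entry(&["early", "bad", "late"]).unwrap_err();
        println!("{}", err);
    }

Capping the vector mirrors the concern patch 4 calls out: a single malformed entry with many bad lines should not make the server accumulate an unbounded list of error strings, while the final check still surfaces the failure to the caller.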