Merge pull request #1798 from influxdata/cn/fix-loop-problem

fix: Try all partition_writes even if one in the middle fails
kodiakhq[bot] 2021-06-28 13:53:39 +00:00 committed by GitHub
commit 9456ae5d39
1 changed file with 120 additions and 34 deletions
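
The pattern this commit introduces, shown as a minimal standalone sketch: rather than returning on the first failed partition write with `?`, each failure is recorded (up to a cap, so a flood of bad writes cannot exhaust memory) and the loop moves on, failing the overall call only after every write has been attempted. The `PartitionWrite`, `WriteError`, and `apply_write` names below are hypothetical stand-ins for illustration, not the IOx API:

    // Hypothetical stand-ins; the real code operates on sequenced entries.
    struct PartitionWrite {
        key: String,
        valid: bool,
    }

    #[derive(Debug)]
    struct WriteError(String);

    fn apply_write(write: &PartitionWrite) -> Result<(), WriteError> {
        if write.valid {
            Ok(())
        } else {
            Err(WriteError(format!("write to partition {} failed", write.key)))
        }
    }

    fn store_all(writes: &[PartitionWrite]) -> Result<(), Vec<WriteError>> {
        // Cap how many errors we keep; the commit calls this DoS protection.
        const MAX_ERRORS: usize = 10;
        let mut errors = Vec::with_capacity(MAX_ERRORS);

        for write in writes {
            // On failure, record the error and keep going so writes later
            // in the entry are still attempted (this mirrors the diff's
            // `if let Err(e) = ... { push; continue; }` control flow).
            if let Err(e) = apply_write(write) {
                if errors.len() < MAX_ERRORS {
                    errors.push(e);
                }
                continue;
            }
        }

        // Report failure only after every write has been tried.
        if errors.is_empty() {
            Ok(())
        } else {
            Err(errors)
        }
    }

    fn main() {
        let writes = vec![
            PartitionWrite { key: "p1".into(), valid: true },
            PartitionWrite { key: "p2".into(), valid: false },
            PartitionWrite { key: "p3".into(), valid: true }, // still attempted
        ];
        assert!(store_all(&writes).is_err());
    }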

@@ -1,30 +1,25 @@
//! This module contains the main IOx Database object which has the
//! instances of the mutable buffer, read buffer, and object store
use std::collections::HashMap;
use std::future::Future;
use std::{
any::Any,
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
pub(crate) use crate::db::chunk::DbChunk;
use crate::{
db::{
access::QueryCatalogAccess,
catalog::{
chunk::{CatalogChunk, ChunkStage},
partition::Partition,
Catalog, TableNameFilter,
},
lifecycle::LockableCatalogChunk,
},
time::{Duration, Instant},
write_buffer::WriteBuffer,
JobRegistry,
};
use ::lifecycle::{LifecycleWriteGuard, LockableChunk};
use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use async_trait::async_trait;
use parking_lot::RwLock;
use rand_distr::{Distribution, Poisson};
use snafu::{ResultExt, Snafu};
use ::lifecycle::{LifecycleWriteGuard, LockableChunk};
use catalog::{chunk::CatalogChunk, Catalog};
pub(crate) use chunk::DbChunk;
use data_types::chunk_metadata::ChunkAddr;
use data_types::{
chunk_metadata::ChunkSummary,
chunk_metadata::{ChunkAddr, ChunkSummary},
database_rules::DatabaseRules,
job::Job,
partition_metadata::{PartitionSummary, TableSummary},
@@ -41,27 +36,31 @@ use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use mutable_buffer::persistence_windows::PersistenceWindows;
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use observability_deps::tracing::{debug, error, info, warn};
use parquet_file::catalog::CheckpointData;
use parking_lot::RwLock;
use parquet_file::{
catalog::PreservedCatalog,
catalog::{CheckpointData, PreservedCatalog},
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
cleanup::cleanup_unreferenced_parquet_files,
metadata::IoxMetadata,
storage::Storage,
};
use query::{exec::Executor, predicate::Predicate, QueryDatabase};
use rand_distr::{Distribution, Poisson};
use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
use snafu::{ensure, ResultExt, Snafu};
use std::{
any::Any,
collections::HashMap,
future::Future,
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use crate::db::catalog::chunk::ChunkStage;
use crate::db::catalog::partition::Partition;
use crate::db::lifecycle::LockableCatalogChunk;
use super::{write_buffer::WriteBuffer, JobRegistry};
use self::access::QueryCatalogAccess;
use self::catalog::TableNameFilter;
pub mod access;
pub mod catalog;
mod chunk;
@@ -148,6 +147,12 @@ pub enum Error {
source: mutable_buffer::chunk::Error,
},
#[snafu(display(
"Storing sequenced entry failed with the following error(s), and possibly more: {}",
errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
))]
StoreSequencedEntryFailures { errors: Vec<Error> },
#[snafu(display("Error building sequenced entry: {}", source))]
SequencedEntryError { source: entry::SequencedEntryError },
@@ -173,6 +178,7 @@ pub enum Error {
#[snafu(display("error finding min/max time on table batch: {}", source))]
TableBatchTimeError { source: entry::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// This is the main IOx Database object. It is the root object of any
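
A rough illustration of how the new `StoreSequencedEntryFailures` variant renders, assuming snafu 0.6-style context selectors as used elsewhere in this diff (the `WriteEntry`-like variant and the partition keys below are made up for the demo): snafu's `display` attribute accepts arbitrary expressions over the variant's fields, so the collected errors' messages are joined into a single string.

    use snafu::Snafu;

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("write failed for partition {}", partition_key))]
        WriteEntry { partition_key: String },

        #[snafu(display(
            "Storing sequenced entry failed with the following error(s), and possibly more: {}",
            errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
        ))]
        StoreSequencedEntryFailures { errors: Vec<Error> },
    }

    fn main() {
        let errors = vec![
            Error::WriteEntry { partition_key: "1970-01-01T00".into() },
            Error::WriteEntry { partition_key: "1970-01-01T01".into() },
        ];
        // Prints the prefix followed by both inner messages joined by ", ".
        println!("{}", Error::StoreSequencedEntryFailures { errors });
    }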
@@ -863,6 +869,11 @@ impl Db {
}
if let Some(partitioned_writes) = sequenced_entry.partition_writes() {
// Protect against DoS by limiting the number of errors we might collect
const MAX_ERRORS_PER_SEQUENCED_ENTRY: usize = 10;
let mut errors = Vec::with_capacity(MAX_ERRORS_PER_SEQUENCED_ENTRY);
for write in partitioned_writes {
let partition_key = write.key();
for table_batch in write.table_batches() {
@@ -891,7 +902,7 @@
let mb_chunk =
chunk.mutable_buffer().expect("cannot mutate open chunk");
mb_chunk
if let Err(e) = mb_chunk
.write_table_batch(
sequenced_entry.sequence().id,
sequenced_entry.sequence().number,
@@ -900,7 +911,13 @@
.context(WriteEntry {
partition_key,
chunk_id,
})?;
})
{
if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY {
errors.push(e);
}
continue;
};
check_chunk_closed(&mut *chunk, mutable_size_threshold);
}
@@ -917,13 +934,19 @@
),
);
mb_chunk
if let Err(e) = mb_chunk
.write_table_batch(
sequenced_entry.sequence().id,
sequenced_entry.sequence().number,
table_batch,
)
.context(WriteEntryInitial { partition_key })?;
.context(WriteEntryInitial { partition_key })
{
if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY {
errors.push(e);
}
continue;
}
let new_chunk = partition.create_open_chunk(mb_chunk);
check_chunk_closed(&mut *new_chunk.write(), mutable_size_threshold);
@@ -954,6 +977,8 @@
}
}
}
ensure!(errors.is_empty(), StoreSequencedEntryFailures { errors });
}
Ok(())
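
For readers unfamiliar with snafu's `ensure!` (used in the line above): when the condition is false it returns early with the error built from the context selector, which in snafu 0.6 shares the variant's name (newer snafu releases append a `Snafu` suffix). A minimal sketch with a made-up `StoreFailures` variant:

    use snafu::{ensure, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("{} write error(s)", errors.len()))]
        StoreFailures { errors: Vec<String> },
    }

    fn finish(errors: Vec<String>) -> Result<(), Error> {
        // Returns Err(Error::StoreFailures { .. }) if any errors were collected;
        // roughly: if !errors.is_empty() { return StoreFailures { errors }.fail(); }
        ensure!(errors.is_empty(), StoreFailures { errors });
        Ok(())
    }

    fn main() {
        assert!(finish(vec![]).is_ok());
        assert!(finish(vec!["schema conflict".into()]).is_err());
    }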
@@ -1189,6 +1214,67 @@ mod tests {
assert_batches_eq!(expected, &batches);
}
#[tokio::test]
async fn try_all_partition_writes_when_some_fail() {
let db = Arc::new(make_db().await.db);
let nanoseconds_per_hour = 60 * 60 * 1_000_000_000u64;
// 3 lines that will go into 3 different hour partitions and start new chunks.
let lp = format!(
"foo,t1=alpha iv=1i {}
foo,t1=bravo iv=1i {}
foo,t1=charlie iv=1i {}",
0,
nanoseconds_per_hour,
nanoseconds_per_hour * 2,
);
let entry = lp_to_entry(&lp);
// This should succeed and start chunks in the MUB
db.store_entry(entry).await.unwrap();
// 3 more lines that should go into the same 3 partitions/chunks.
// Line 1 has the same schema and should end up in the MUB.
// Line 2 has a different schema than line 1 (t1 becomes an integer field
// rather than a tag) and should error.
// Line 3 has the same schema as line 1 and should end up in the MUB.
let lp = format!(
"foo,t1=delta iv=1i {}
foo t1=10i {}
foo,t1=important iv=1i {}",
1,
nanoseconds_per_hour + 1,
nanoseconds_per_hour * 2 + 1,
);
let entry = lp_to_entry(&lp);
// This should return an error because there was at least one error in the loop
let result = db.store_entry(entry).await;
assert_contains!(
result.unwrap_err().to_string(),
"Storing sequenced entry failed with the following error(s), and possibly more:"
);
// But 5 points should be returned; most importantly, the point after the line
// with the mismatched schema still made it in.
let batches = run_query(db, "select t1 from foo").await;
let expected = vec![
"+-----------+",
"| t1 |",
"+-----------+",
"| alpha |",
"| bravo |",
"| charlie |",
"| delta |",
"| important |",
"+-----------+",
];
assert_batches_sorted_eq!(expected, &batches);
}
fn catalog_chunk_size_bytes_metric_eq(
reg: &metrics::TestMetricRegistry,
source: &'static str,