fix: ensure querier cache is refreshed for partition sort key (#4660)

* test: call `maybe_start_logging` in auto-generated cases

* fix: ensure querier cache is refreshed for partition sort key

Fixes #4631.

* docs: explain querier sort key handling and test

* test: test another version of issue 4631

* fix: correctly invalidate partition sort keys

* fix: fix `table_not_found_on_ingester`
pull/24376/head
Marco Neumann 2022-05-25 12:44:42 +02:00 committed by GitHub
parent cdbe546e50
commit a08a91c5ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 647 additions and 179 deletions

View File

@ -132,7 +132,8 @@ async fn table_not_found_on_ingester() {
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared(database_url).await;
// cannot use shared cluster because we're restarting the ingester
let mut cluster = MiniCluster::create_non_shared_standard(database_url).await;
StepTest::new(
&mut cluster,
@ -261,3 +262,156 @@ async fn ingester_panic() {
.run()
.await
}
#[tokio::test]
async fn issue_4631_a() {
// See https://github.com/influxdata/influxdb_iox/issues/4631
//
// The symptom was that on rare occasion the querier would panic because the query engine was sure there must be a
// partition sort key but the querier did not provide any. For this to happen we need overlapping chunks and all
// these chunks must be sorted. This is only the case if all chunks are persisted (i.e. parquet-backed, so no
// ingester data). The reason why the querier did NOT provide a partition sort key was because it once got queried
// when the partition was fresh and only existed within the ingester, so no partition sort key was calculated yet.
// During that initial query the querier would cache the partition information (incl. the absence of a partition
// sort key) and when queried again (this time all chunks are persisted but overlapping) it would use this stale
// information, confusing the query engine.
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
// NOTE(review): a non-shared cluster with an explicit (low) persist memory threshold is used so
// the test controls WHEN persistence happens, instead of relying on time-based triggers.
let router_config = TestConfig::new_router(&database_url);
let ingester_config =
TestConfig::new_ingester(&router_config).with_ingester_persist_memory_threshold(1000);
let querier_config = TestConfig::new_querier(&ingester_config);
let mut cluster = MiniCluster::new()
.with_router(router_config)
.await
.with_ingester(ingester_config)
.await
.with_querier(querier_config)
.await;
// We need a trigger for persistence that is not time so the test is as stable as possible. We use a long string to
// cross the persistence memory threshold.
let mut super_long_string = String::new();
for _ in 0..10_000 {
super_long_string.push('x');
}
StepTest::new(
&mut cluster,
vec![
// create UNPERSISTED ingester data
// IMPORTANT: The data MUST NOT be persisted before the first query is executed, because persistence
// calculates the partition sort key. The original bug was that the first query on a completely
// unpersisted partition would cache the NULL/None sort key which would later lead to a panic.
Step::WriteLineProtocol(format!("{},tag=A val=\"foo\" 1", table_name)),
Step::WaitForReadable,
Step::AssertNotPersisted,
// cache partition in querier w/o any partition sort key (yet)
// This MUST happen after we have some ingester data but before ANYTHING was persisted. In the original bug
// the querier would now cache the partition w/o a sort key (and would never invalidate this information).
Step::Query {
sql: format!("select * from {}", table_name),
expected: vec![
"+-----+--------------------------------+-----+",
"| tag | time | val |",
"+-----+--------------------------------+-----+",
"| A | 1970-01-01T00:00:00.000000001Z | foo |",
"+-----+--------------------------------+-----+",
],
},
// flush ingester data
// Here the ingester calculates the partition sort key.
Step::WriteLineProtocol(format!(
"{},tag=B val=\"{}\" 2\n",
table_name, super_long_string
)),
Step::WaitForPersisted,
// create overlapping 2nd parquet file
// This is important to trigger the bug within the query engine, because only if there are multiple chunks
// that need to be de-duplicated the bug will occur.
Step::WriteLineProtocol(format!(
"{},tag=A val=\"bar\" 1\n{},tag=B val=\"{}\" 2\n",
table_name, table_name, super_long_string
)),
Step::WaitForPersisted,
// query
// In the original bug the querier would still use NULL/None as a partition sort key but present two sorted
// but overlapping chunks to the query engine.
Step::Query {
sql: format!("select * from {} where tag='A'", table_name),
expected: vec![
"+-----+--------------------------------+-----+",
"| tag | time | val |",
"+-----+--------------------------------+-----+",
"| A | 1970-01-01T00:00:00.000000001Z | bar |",
"+-----+--------------------------------+-----+",
],
},
],
)
.run()
.await
}
#[tokio::test]
async fn issue_4631_b() {
// This is similar to `issue_4631_a` but instead of updating the sort key from NULL/None to something we update it
// with a new tag.
//
// NOTE(review): unlike `issue_4631_a` this uses the shared cluster, so persistence timing is not
// explicitly controlled here — presumably `WaitForPersisted` is sufficient; confirm stability.
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
vec![
// create persisted chunk with a single tag column
Step::WriteLineProtocol(format!("{},tag=A val=\"foo\" 1", table_name)),
Step::WaitForPersisted,
// query to prime the querier caches with partition sort key
Step::Query {
sql: format!("select * from {}", table_name),
expected: vec![
"+-----+--------------------------------+-----+",
"| tag | time | val |",
"+-----+--------------------------------+-----+",
"| A | 1970-01-01T00:00:00.000000001Z | foo |",
"+-----+--------------------------------+-----+",
],
},
// create 2nd chunk with an additional tag column (which will be included in the partition sort key)
Step::WriteLineProtocol(format!("{},tag=A,tag2=B val=\"bar\" 1\n", table_name)),
Step::WaitForPersisted,
// in the original bug the querier would now panic with:
//
// Partition sort key tag, time, does not cover or is sorted on the same order of the chunk sort key tag, tag2, time,
//
// Note that we cannot query tag2 because the schema is cached for a while.
Step::Query {
sql: format!(
"select tag, val from {} where tag='A' order by val",
table_name
),
expected: vec![
"+-----+-----+",
"| tag | val |",
"+-----+-----+",
"| A | bar |",
"| A | foo |",
"+-----+-----+",
],
},
],
)
.run()
.await
}

View File

@ -428,6 +428,27 @@ pub struct TestPartition {
}
impl TestPartition {
/// Persist a new sort key for this partition in the catalog and return an
/// updated `TestPartition` carrying the refreshed catalog row.
///
/// All shared handles (catalog, namespace, table, sequencer) are cloned cheaply
/// via `Arc`; only the `partition` field is replaced with the updated row.
pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Self {
    let columns = sort_key.to_columns();
    let mut repos = self.catalog.catalog.repositories().await;
    let partition = repos
        .partitions()
        .update_sort_key(self.partition.id, &columns)
        .await
        .unwrap();
    Self {
        catalog: Arc::clone(&self.catalog),
        namespace: Arc::clone(&self.namespace),
        table: Arc::clone(&self.table),
        sequencer: Arc::clone(&self.sequencer),
        partition,
    }
}
/// Create a parquet for the partition
pub async fn create_parquet_file(self: &Arc<Self>, lp: &str) -> TestParquetFile {
self.create_parquet_file_with_min_max(

View File

@ -5,6 +5,7 @@ use cache_system::{
backend::{
lru::{LruBackend, ResourcePool},
resource_consumption::FunctionEstimator,
shared::SharedBackend,
},
driver::Cache,
loader::{metrics::MetricsLoader, FunctionLoader},
@ -13,7 +14,11 @@ use data_types::{PartitionId, SequencerId};
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use schema::sort::SortKey;
use std::{collections::HashMap, mem::size_of_val, sync::Arc};
use std::{
collections::{HashMap, HashSet},
mem::size_of_val,
sync::Arc,
};
use super::ram::RamSize;
@ -23,6 +28,7 @@ const CACHE_ID: &str = "partition";
#[derive(Debug)]
pub struct PartitionCache {
cache: Cache<PartitionId, CachedPartition>,
backend: SharedBackend<PartitionId, CachedPartition>,
}
impl PartitionCache {
@ -54,7 +60,7 @@ impl PartitionCache {
CachedPartition {
sequencer_id: partition.sequencer_id,
sort_key: partition.sort_key(),
sort_key: Arc::new(partition.sort_key()),
}
}
}));
@ -74,9 +80,11 @@ impl PartitionCache {
RamSize(size_of_val(k) + size_of_val(v) + v.size())
})),
));
let backend = SharedBackend::new(backend);
Self {
cache: Cache::new(loader, backend),
cache: Cache::new(loader, Box::new(backend.clone())),
backend,
}
}
@ -86,21 +94,42 @@ impl PartitionCache {
}
/// Get sort key
pub async fn sort_key(&self, partition_id: PartitionId) -> Option<SortKey> {
pub async fn sort_key(&self, partition_id: PartitionId) -> Arc<Option<SortKey>> {
self.cache.get(partition_id).await.sort_key
}
/// Expire partition if the cached sort key does NOT cover the given set of columns.
///
/// This is the invalidation signal for issue #4631: a cached `None` sort key, or a
/// cached sort key missing any of the requested columns, must be refetched from the
/// catalog on the next access.
pub fn expire_if_sort_key_does_not_cover(
    &self,
    partition_id: PartitionId,
    columns: &HashSet<String>,
) {
    self.backend.remove_if(&partition_id, |cached_partition| {
        match cached_partition.sort_key.as_ref().as_ref() {
            Some(sort_key) => {
                // collect the columns the cached key actually covers
                let covered: HashSet<_> = sort_key
                    .iter()
                    .map(|(col, _options)| Arc::clone(col))
                    .collect();
                // expire as soon as ANY requested column is not covered
                !columns.iter().all(|col| covered.contains(col.as_str()))
            }
            // no sort key at all => need to update
            None => true,
        }
    });
}
}
/// Cached catalog state for a single partition.
#[derive(Debug, Clone)]
struct CachedPartition {
/// Sequencer that owns this partition.
sequencer_id: SequencerId,
/// Partition-wide sort key; `None` if no sort key was calculated yet.
/// Wrapped in `Arc` so the cache and all consumers can share it without cloning the key.
sort_key: Arc<Option<SortKey>>,
}
impl CachedPartition {
/// RAM-bytes EXCLUDING `self`.
fn size(&self) -> usize {
self.sort_key
.as_ref()
.as_ref()
.map(|sk| sk.size() - size_of_val(sk))
.unwrap_or_default()
@ -185,15 +214,15 @@ mod tests {
);
let sort_key1 = cache.sort_key(p1.id).await;
assert_eq!(sort_key1, p1.sort_key());
assert_eq!(sort_key1.as_ref(), &p1.sort_key());
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
let sort_key2 = cache.sort_key(p2.id).await;
assert_eq!(sort_key2, p2.sort_key());
assert_eq!(sort_key2.as_ref(), &p2.sort_key());
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
let sort_key1 = cache.sort_key(p1.id).await;
assert_eq!(sort_key1, p1.sort_key());
assert_eq!(sort_key1.as_ref(), &p1.sort_key());
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
}
@ -244,4 +273,73 @@ mod tests {
cache.sequencer_id(p2.id).await;
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
}
#[tokio::test]
async fn test_expiration() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t = ns.create_table("table").await;
let s = ns.create_sequencer(1).await;
let p = t.with_sequencer(&s).create_partition("k1").await;
let p_id = p.partition.id;
let p_sort_key = p.partition.sort_key();
let cache = PartitionCache::new(
catalog.catalog(),
BackoffConfig::default(),
catalog.time_provider(),
&catalog.metric_registry(),
test_ram_pool(),
);
// first access populates the cache (1 catalog query, counted via the histogram below)
let sort_key = cache.sort_key(p_id).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
// non-existing sort keys always expire
// (a cached `None` can never "cover" anything, even an empty column set)
assert!(p_sort_key.is_none());
cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::new());
let sort_key = cache.sort_key(p_id).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
// set sort key
let p = p
.update_sort_key(SortKey::from_columns(["foo", "bar"]))
.await;
// just changing the sort key doesn't expire, we need a signal
// (the cache stays stale until `expire_if_sort_key_does_not_cover` is called)
let sort_key = cache.sort_key(p_id).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
// expire
// the cached key is still `None`, so requesting ANY column triggers a refresh
let p_sort_key = p.partition.sort_key();
cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::from([String::from("foo")]));
let sort_key = cache.sort_key(p_id).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
// subsets and the full key don't expire
cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::new());
cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::from([String::from("foo")]));
cache.expire_if_sort_key_does_not_cover(p_id, &HashSet::from([String::from("bar")]));
cache.expire_if_sort_key_does_not_cover(
p_id,
&HashSet::from([String::from("foo"), String::from("bar")]),
);
let sort_key = cache.sort_key(p_id).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
// unknown columns expire
cache.expire_if_sort_key_does_not_cover(
p_id,
&HashSet::from([String::from("foo"), String::from("x")]),
);
let sort_key = cache.sort_key(p_id).await;
assert_eq!(sort_key.as_ref(), &p_sort_key);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
}
}

View File

@ -35,9 +35,6 @@ pub struct ChunkMeta {
/// Sort key.
sort_key: Option<SortKey>,
/// Partition sort key
partition_sort_key: Option<SortKey>,
/// Sequencer that created the data within this chunk.
sequencer_id: SequencerId,
@ -62,11 +59,6 @@ impl ChunkMeta {
self.sort_key.as_ref()
}
/// Partition sort key
pub fn partition_sort_key(&self) -> Option<&SortKey> {
self.partition_sort_key.as_ref()
}
/// Sequencer that created the data within this chunk.
pub fn sequencer_id(&self) -> SequencerId {
self.sequencer_id
@ -115,6 +107,9 @@ pub struct QuerierChunk {
/// Delete predicates of this chunk
delete_predicates: Vec<Arc<DeletePredicate>>,
/// Partition sort key
partition_sort_key: Arc<Option<SortKey>>,
}
impl QuerierChunk {
@ -123,6 +118,7 @@ impl QuerierChunk {
parquet_file_id: ParquetFileId,
chunk: Arc<ParquetChunk>,
meta: Arc<ChunkMeta>,
partition_sort_key: Arc<Option<SortKey>>,
) -> Self {
Self {
storage: ChunkStorage::Parquet {
@ -131,15 +127,23 @@ impl QuerierChunk {
},
meta,
delete_predicates: Vec::new(),
partition_sort_key,
}
}
/// Set delete predicates of the given chunk.
pub fn with_delete_predicates(self, delete_predicates: Vec<Arc<DeletePredicate>>) -> Self {
Self {
storage: self.storage,
meta: self.meta,
delete_predicates,
..self
}
}
/// Set partition sort key
///
/// Consumes the chunk and returns a copy with the given partition-wide sort key;
/// all other fields are moved over unchanged via struct update syntax.
pub fn with_partition_sort_key(self, partition_sort_key: Arc<Option<SortKey>>) -> Self {
Self {
partition_sort_key,
..self
}
}
@ -163,6 +167,11 @@ impl QuerierChunk {
ChunkStorage::Parquet { chunk, .. } => chunk.timestamp_min_max(),
}
}
/// Partition-wide sort key, or `None` if no sort key was calculated for the partition yet.
pub fn partition_sort_key(&self) -> Option<&SortKey> {
    // deref through the `Arc` first, then borrow into the inner `Option`
    Option::as_ref(&*self.partition_sort_key)
}
}
/// Adapter that can create chunks.
@ -270,14 +279,18 @@ impl ParquetChunkAdapter {
table_name,
order,
sort_key: iox_metadata.sort_key.clone(),
partition_sort_key,
sequencer_id: iox_metadata.sequencer_id,
partition_id: iox_metadata.partition_id,
min_sequence_number: parquet_file.min_sequence_number,
max_sequence_number: parquet_file.max_sequence_number,
});
Some(QuerierChunk::new_parquet(parquet_file.id, chunk, meta))
Some(QuerierChunk::new_parquet(
parquet_file.id,
chunk,
meta,
partition_sort_key,
))
}
}

View File

@ -32,7 +32,7 @@ impl QueryChunkMeta for QuerierChunk {
}
fn partition_sort_key(&self) -> Option<&SortKey> {
self.meta().partition_sort_key()
self.partition_sort_key()
}
fn partition_id(&self) -> Option<PartitionId> {

View File

@ -147,7 +147,7 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
columns: Vec<String>,
predicate: &Predicate,
expected_schema: Arc<Schema>,
) -> Result<Vec<Arc<IngesterPartition>>>;
) -> Result<Vec<IngesterPartition>>;
/// Returns the most recent partition status info across all ingester(s) for the specified
/// write token.
@ -211,7 +211,7 @@ struct GetPartitionForIngester<'a> {
}
/// Fetches the partitions for a single ingester
async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<Arc<IngesterPartition>>> {
async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<IngesterPartition>> {
let GetPartitionForIngester {
flight_client,
catalog_cache,
@ -322,6 +322,7 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<Arc<Ingeste
// to select the right parquet files and tombstones
let partition_id = PartitionId::new(partition_id);
let sequencer_id = catalog_cache.partition().sequencer_id(partition_id).await;
let partition_sort_key = catalog_cache.partition().sort_key(partition_id).await;
let ingester_partition = IngesterPartition::try_new(
Arc::clone(&ingester_address),
ChunkId::new(),
@ -332,9 +333,10 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<Arc<Ingeste
Arc::clone(&expected_schema),
state.parquet_max_sequence_number.map(SequenceNumber::new),
state.tombstone_max_sequence_number.map(SequenceNumber::new),
partition_sort_key,
batches,
)?;
ingester_partitions.push(Arc::new(ingester_partition));
ingester_partitions.push(ingester_partition);
}
Ok(ingester_partitions)
@ -367,8 +369,8 @@ impl IngesterConnection for IngesterConnectionImpl {
columns: Vec<String>,
predicate: &Predicate,
expected_schema: Arc<Schema>,
) -> Result<Vec<Arc<IngesterPartition>>> {
let mut ingester_partitions: Vec<Arc<IngesterPartition>> = self
) -> Result<Vec<IngesterPartition>> {
let mut ingester_partitions: Vec<IngesterPartition> = self
.ingester_addresses
.iter()
.map(|ingester_address| {
@ -464,6 +466,9 @@ pub struct IngesterPartition {
/// persisted for this partition
tombstone_max_sequence_number: Option<SequenceNumber>,
/// Partition-wide sort key.
partition_sort_key: Arc<Option<SortKey>>,
/// The raw table data
batches: Vec<RecordBatch>,
@ -485,6 +490,7 @@ impl IngesterPartition {
expected_schema: Arc<Schema>,
parquet_max_sequence_number: Option<SequenceNumber>,
tombstone_max_sequence_number: Option<SequenceNumber>,
partition_sort_key: Arc<Option<SortKey>>,
batches: Vec<RecordBatch>,
) -> Result<Self> {
// ensure that the schema of the batches matches the required
@ -510,11 +516,19 @@ impl IngesterPartition {
schema: expected_schema,
parquet_max_sequence_number,
tombstone_max_sequence_number,
partition_sort_key,
batches,
summary,
})
}
/// Set the partition-wide sort key, moving all other fields over unchanged.
pub(crate) fn with_partition_sort_key(self, partition_sort_key: Arc<Option<SortKey>>) -> Self {
Self {
partition_sort_key,
..self
}
}
pub(crate) fn ingester(&self) -> &Arc<str> {
&self.ingester
}
@ -551,7 +565,7 @@ impl QueryChunkMeta for IngesterPartition {
}
fn partition_sort_key(&self) -> Option<&SortKey> {
None // data comes from Ingester is not persisted yet and should not yet attached to any catalog partition
self.partition_sort_key.as_ref().as_ref()
}
fn partition_id(&self) -> Option<PartitionId> {
@ -1136,7 +1150,7 @@ mod tests {
async fn get_partitions(
ingester_conn: &IngesterConnectionImpl,
) -> Result<Vec<Arc<IngesterPartition>>, Error> {
) -> Result<Vec<IngesterPartition>, Error> {
let namespace = Arc::from("namespace");
let table = Arc::from("table");
let columns = vec![String::from("col")];
@ -1274,6 +1288,7 @@ mod tests {
Arc::clone(&expected_schema),
parquet_max_sequence_number,
tombstone_max_sequence_number,
Arc::new(None),
vec![case],
)
.unwrap();
@ -1309,6 +1324,7 @@ mod tests {
Arc::clone(&expected_schema),
parquet_max_sequence_number,
tombstone_max_sequence_number,
Arc::new(None),
vec![batch],
)
.unwrap_err();
@ -1340,6 +1356,7 @@ mod tests {
Arc::clone(&expected_schema),
parquet_max_sequence_number,
tombstone_max_sequence_number,
Arc::new(None),
vec![batch],
)
.unwrap();

View File

@ -7,7 +7,7 @@ use std::{any::Any, sync::Arc};
/// IngesterConnection for testing
#[derive(Debug, Default)]
pub struct MockIngesterConnection {
next_response: Mutex<Option<super::Result<Vec<Arc<super::IngesterPartition>>>>>,
next_response: Mutex<Option<super::Result<Vec<super::IngesterPartition>>>>,
}
impl MockIngesterConnection {
@ -18,7 +18,7 @@ impl MockIngesterConnection {
/// Set next response for this connection.
#[allow(dead_code)]
pub fn next_response(&self, response: super::Result<Vec<Arc<super::IngesterPartition>>>) {
pub fn next_response(&self, response: super::Result<Vec<super::IngesterPartition>>) {
*self.next_response.lock() = Some(response);
}
}
@ -32,7 +32,7 @@ impl IngesterConnection for MockIngesterConnection {
_columns: Vec<String>,
_predicate: &predicate::Predicate,
_expected_schema: Arc<schema::Schema>,
) -> super::Result<Vec<Arc<super::IngesterPartition>>> {
) -> super::Result<Vec<super::IngesterPartition>> {
self.next_response
.lock()
.take()

View File

@ -181,10 +181,7 @@ impl QuerierTable {
}
/// Get partitions from ingesters.
async fn ingester_partitions(
&self,
predicate: &Predicate,
) -> Result<Vec<Arc<IngesterPartition>>> {
async fn ingester_partitions(&self, predicate: &Predicate) -> Result<Vec<IngesterPartition>> {
// For now, ask for *all* columns in the table from the ingester (need
// at least all pk (time, tag) columns for
// deduplication.
@ -453,21 +450,20 @@ mod tests {
.as_any()
.downcast_ref::<MockIngesterConnection>()
.unwrap()
.next_response(Ok(vec![Arc::new(
IngesterPartition::try_new(
Arc::from("ingester"),
ChunkId::new(),
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition.partition.id,
sequencer.sequencer.id,
Arc::new(SchemaBuilder::new().build().unwrap()),
Some(SequenceNumber::new(1)),
None,
vec![],
)
.unwrap(),
)]));
.next_response(Ok(vec![IngesterPartition::try_new(
Arc::from("ingester"),
ChunkId::new(),
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition.partition.id,
sequencer.sequencer.id,
Arc::new(SchemaBuilder::new().build().unwrap()),
Some(SequenceNumber::new(1)),
None,
Arc::new(None),
vec![],
)
.unwrap()]));
let err = querier_table.chunks(&pred).await.unwrap_err();
assert_matches!(err, Error::StateFusion { .. });
@ -566,53 +562,51 @@ mod tests {
.unwrap()
.next_response(Ok(vec![
// this chunk is kept
Arc::new(
IngesterPartition::try_new(
Arc::from("ingester"),
ingester_chunk_id1,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition1.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
Some(SequenceNumber::new(2)),
// tombstone max persisted sequence number
Some(SequenceNumber::new(10)),
vec![lp_to_record_batch("table foo=3i 33")],
)
.unwrap(),
),
IngesterPartition::try_new(
Arc::from("ingester"),
ingester_chunk_id1,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition1.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
Some(SequenceNumber::new(2)),
// tombstone max persisted sequence number
Some(SequenceNumber::new(10)),
Arc::new(None),
vec![lp_to_record_batch("table foo=3i 33")],
)
.unwrap(),
// this chunk is filtered out because it has no record batches but the reconciling still takes place
Arc::new(
IngesterPartition::try_new(
Arc::from("ingester"),
ingester_chunk_id2,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition2.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
Some(SequenceNumber::new(3)),
// tombstone max persisted sequence number
Some(SequenceNumber::new(11)),
vec![],
)
.unwrap(),
),
IngesterPartition::try_new(
Arc::from("ingester"),
ingester_chunk_id2,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition2.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
Some(SequenceNumber::new(3)),
// tombstone max persisted sequence number
Some(SequenceNumber::new(11)),
Arc::new(None),
vec![],
)
.unwrap(),
]));
let mut chunks = querier_table.chunks(&pred).await.unwrap();
@ -668,75 +662,72 @@ mod tests {
.downcast_ref::<MockIngesterConnection>()
.unwrap()
.next_response(Ok(vec![
Arc::new(
IngesterPartition::try_new(
Arc::from("ingester1"),
ingester_chunk_id1,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition1.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
None,
// tombstone max persisted sequence number
None,
vec![lp_to_record_batch("table foo=1i 1")],
)
.unwrap(),
),
Arc::new(
IngesterPartition::try_new(
Arc::from("ingester1"),
ingester_chunk_id2,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition2.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
None,
// tombstone max persisted sequence number
None,
vec![lp_to_record_batch("table foo=2i 2")],
)
.unwrap(),
),
Arc::new(
IngesterPartition::try_new(
Arc::from("ingester2"),
ingester_chunk_id3,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition1.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
None,
// tombstone max persisted sequence number
None,
vec![lp_to_record_batch("table foo=3i 3")],
)
.unwrap(),
),
IngesterPartition::try_new(
Arc::from("ingester1"),
ingester_chunk_id1,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition1.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
None,
// tombstone max persisted sequence number
None,
Arc::new(None),
vec![lp_to_record_batch("table foo=1i 1")],
)
.unwrap(),
IngesterPartition::try_new(
Arc::from("ingester1"),
ingester_chunk_id2,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition2.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
None,
// tombstone max persisted sequence number
None,
Arc::new(None),
vec![lp_to_record_batch("table foo=2i 2")],
)
.unwrap(),
IngesterPartition::try_new(
Arc::from("ingester2"),
ingester_chunk_id3,
Arc::from(ns.namespace.name.clone()),
Arc::from(table.table.name.clone()),
partition1.partition.id,
sequencer.sequencer.id,
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
),
// parquet max persisted sequence number
None,
// tombstone max persisted sequence number
None,
Arc::new(None),
vec![lp_to_record_batch("table foo=3i 3")],
)
.unwrap(),
]));
let err = querier_table.chunks(&pred).await.unwrap_err();

View File

@ -16,13 +16,18 @@ mod interface;
use data_types::{ParquetFileWithMetadata, PartitionId, SequencerId, Tombstone, TombstoneId};
use iox_query::QueryChunk;
use observability_deps::tracing::debug;
use schema::{sort::SortKey, InfluxColumnType};
use snafu::Snafu;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use crate::{chunk::ParquetChunkAdapter, tombstone::QuerierTombstone, IngesterPartition};
use crate::{
chunk::{ParquetChunkAdapter, QuerierChunk},
tombstone::QuerierTombstone,
IngesterPartition,
};
use self::interface::{IngesterPartitionInfo, ParquetFileInfo, TombstoneInfo};
@ -58,11 +63,33 @@ impl Reconciler {
/// chunks to query
pub(crate) async fn reconcile(
&self,
ingester_partitions: Vec<Arc<IngesterPartition>>,
ingester_partitions: Vec<IngesterPartition>,
tombstones: Vec<Tombstone>,
parquet_files: Vec<ParquetFileWithMetadata>,
) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
let tombstone_exclusion = tombstone_exclude_list(&ingester_partitions, &tombstones);
let mut chunks = self
.build_parquet_chunks(&ingester_partitions, tombstones, parquet_files)
.await?;
chunks.extend(self.build_ingester_chunks(ingester_partitions));
debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation");
let chunks = self.sync_partition_sort_keys(chunks).await;
let chunks: Vec<Arc<dyn QueryChunk>> = chunks
.into_iter()
.map(|c| c.upcast_to_querier_chunk().into())
.collect();
Ok(chunks)
}
async fn build_parquet_chunks(
&self,
ingester_partitions: &[IngesterPartition],
tombstones: Vec<Tombstone>,
parquet_files: Vec<ParquetFileWithMetadata>,
) -> Result<Vec<Box<dyn UpdatableQuerierChunk>>, ReconcileError> {
let tombstone_exclusion = tombstone_exclude_list(ingester_partitions, &tombstones);
let querier_tombstones: Vec<_> =
tombstones.into_iter().map(QuerierTombstone::from).collect();
@ -79,7 +106,7 @@ impl Reconciler {
}
//
let parquet_files = filter_parquet_files(&ingester_partitions, parquet_files)?;
let parquet_files = filter_parquet_files(ingester_partitions, parquet_files)?;
debug!(
?parquet_files,
@ -101,7 +128,7 @@ impl Reconciler {
}
debug!(num_chunks=%parquet_chunks.len(), "Created parquet chunks");
let mut chunks: Vec<Arc<dyn QueryChunk>> =
let mut chunks: Vec<Box<dyn UpdatableQuerierChunk>> =
Vec::with_capacity(parquet_chunks.len() + ingester_partitions.len());
for chunk in parquet_chunks.into_iter() {
@ -165,21 +192,74 @@ impl Reconciler {
chunk
};
chunks.push(Arc::new(chunk) as Arc<dyn QueryChunk>);
chunks.push(Box::new(chunk) as Box<dyn UpdatableQuerierChunk>);
}
Ok(chunks)
}
fn build_ingester_chunks(
&self,
ingester_partitions: Vec<IngesterPartition>,
) -> impl Iterator<Item = Box<dyn UpdatableQuerierChunk>> {
// Add ingester chunks to the overall chunk list.
// - filter out chunks that don't have any record batches
// - tombstones don't need to be applied since they were already materialized by the ingester
let ingester_chunks = ingester_partitions
ingester_partitions
.into_iter()
.filter(|c| c.has_batches())
.map(|c| c as Arc<dyn QueryChunk>);
.map(|c| Box::new(c) as Box<dyn UpdatableQuerierChunk>)
}
chunks.extend(ingester_chunks);
debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation");
/// Fetch the (possibly refreshed) partition sort key for every partition touched by
/// `chunks` and write it back into each chunk.
///
/// Cache entries are expired first if their sort key does not cover all primary-key
/// columns (tags + time) present in the chunks, so the querier never hands a stale or
/// absent partition sort key to the query engine (see issue #4631).
async fn sync_partition_sort_keys(
&self,
chunks: Vec<Box<dyn UpdatableQuerierChunk>>,
) -> Vec<Box<dyn UpdatableQuerierChunk>> {
// collect columns
let mut all_columns: HashMap<PartitionId, HashSet<String>> = HashMap::new();
for chunk in &chunks {
if let Some(partition_id) = chunk.partition_id() {
// columns for this partition MUST include the primary key of this chunk
all_columns.entry(partition_id).or_default().extend(
chunk
.schema()
.iter()
.filter(|(t, _field)| {
matches!(t, Some(InfluxColumnType::Tag | InfluxColumnType::Timestamp))
})
.map(|(_t, field)| field.name().clone()),
);
}
}
// report expiration signal to cache
// (must happen BEFORE reading the sort keys below so stale entries get refetched)
let partition_cache = self.chunk_adapter.catalog_cache().partition();
for (partition_id, columns) in &all_columns {
partition_cache.expire_if_sort_key_does_not_cover(*partition_id, columns);
}
// get cached (or fresh) sort keys
let mut sort_keys: HashMap<PartitionId, Arc<Option<SortKey>>> =
HashMap::with_capacity(all_columns.len());
for partition_id in all_columns.into_keys() {
let sort_key = partition_cache.sort_key(partition_id).await;
sort_keys.insert(partition_id, sort_key);
}
// write partition sort keys to chunks
chunks
.into_iter()
.map(|chunk| {
if let Some(partition_id) = chunk.partition_id() {
let sort_key = sort_keys
.get(&partition_id)
.expect("sort key for this partition should be fetched by now");
chunk.update_partition_sort_key(Arc::clone(sort_key))
} else {
// chunk without a partition (nothing to sync); pass through unchanged
chunk
}
})
.collect()
}
#[must_use]
@ -193,6 +273,41 @@ impl Reconciler {
}
}
/// A [`QueryChunk`] whose partition sort key can be replaced after construction.
///
/// The reconciler builds chunks first and only afterwards fetches/refreshes the
/// partition sort keys, so it needs a way to write the fresh key back into each chunk.
trait UpdatableQuerierChunk: QueryChunk {
/// Consume the boxed chunk and return it with the given partition sort key set.
fn update_partition_sort_key(
self: Box<Self>,
sort_key: Arc<Option<SortKey>>,
) -> Box<dyn UpdatableQuerierChunk>;
/// Convert into a plain [`QueryChunk`] trait object for the query engine.
fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk>;
}
impl UpdatableQuerierChunk for QuerierChunk {
fn update_partition_sort_key(
self: Box<Self>,
sort_key: Arc<Option<SortKey>>,
) -> Box<dyn UpdatableQuerierChunk> {
// consume the chunk and re-box it with the new key
Box::new(self.with_partition_sort_key(sort_key))
}
fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk> {
// unsizing coercion to the supertrait object
self as _
}
}
/// Ingester-backed chunks also accept a refreshed partition sort key.
impl UpdatableQuerierChunk for IngesterPartition {
    fn update_partition_sort_key(
        self: Box<Self>,
        sort_key: Arc<Option<SortKey>>,
    ) -> Box<dyn UpdatableQuerierChunk> {
        // Rebuild the partition with the new sort key, then re-box it as the trait object.
        let updated = self.with_partition_sort_key(sort_key);
        Box::new(updated)
    }

    fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk> {
        // Plain unsizing coercion; spelled out for clarity.
        self as Box<dyn QueryChunk>
    }
}
/// Filter out parquet files that contain "too new" data.
///
/// The caller may only use the returned parquet files.

View File

@ -16,7 +16,28 @@ pub trait IngesterPartitionInfo {
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber>;
}
impl IngesterPartitionInfo for Arc<IngesterPartition> {
impl IngesterPartitionInfo for IngesterPartition {
fn partition_id(&self) -> PartitionId {
self.deref().partition_id()
}
fn sequencer_id(&self) -> SequencerId {
self.deref().sequencer_id()
}
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().parquet_max_sequence_number()
}
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().tombstone_max_sequence_number()
}
}
impl<T> IngesterPartitionInfo for Arc<T>
where
T: IngesterPartitionInfo,
{
fn partition_id(&self) -> PartitionId {
self.deref().partition_id()
}

View File

@ -77,6 +77,8 @@ use crate::runner::Runner;"#
#[tokio::test]
// Tests from {:?},
async fn test_cases_{}() {{
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("{}");
let mut runner = Runner::new();
runner

View File

@ -7,6 +7,8 @@ use crate::runner::Runner;
#[tokio::test]
// Tests from "basic.sql",
async fn test_cases_basic_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("basic.sql");
let mut runner = Runner::new();
runner
@ -21,6 +23,8 @@ async fn test_cases_basic_sql() {
#[tokio::test]
// Tests from "delete_all.sql",
async fn test_cases_delete_all_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("delete_all.sql");
let mut runner = Runner::new();
runner
@ -35,6 +39,8 @@ async fn test_cases_delete_all_sql() {
#[tokio::test]
// Tests from "delete_multi_expr_one_chunk.sql",
async fn test_cases_delete_multi_expr_one_chunk_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("delete_multi_expr_one_chunk.sql");
let mut runner = Runner::new();
runner
@ -49,6 +55,8 @@ async fn test_cases_delete_multi_expr_one_chunk_sql() {
#[tokio::test]
// Tests from "delete_simple_pred_one_chunk.sql",
async fn test_cases_delete_simple_pred_one_chunk_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("delete_simple_pred_one_chunk.sql");
let mut runner = Runner::new();
runner
@ -63,6 +71,8 @@ async fn test_cases_delete_simple_pred_one_chunk_sql() {
#[tokio::test]
// Tests from "delete_three_delete_three_chunks.sql",
async fn test_cases_delete_three_delete_three_chunks_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("delete_three_delete_three_chunks.sql");
let mut runner = Runner::new();
runner
@ -77,6 +87,8 @@ async fn test_cases_delete_three_delete_three_chunks_sql() {
#[tokio::test]
// Tests from "delete_two_del_multi_expr_one_chunk.sql",
async fn test_cases_delete_two_del_multi_expr_one_chunk_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("delete_two_del_multi_expr_one_chunk.sql");
let mut runner = Runner::new();
runner
@ -91,6 +103,8 @@ async fn test_cases_delete_two_del_multi_expr_one_chunk_sql() {
#[tokio::test]
// Tests from "duplicates.sql",
async fn test_cases_duplicates_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("duplicates.sql");
let mut runner = Runner::new();
runner
@ -105,6 +119,8 @@ async fn test_cases_duplicates_sql() {
#[tokio::test]
// Tests from "new_sql_system_tables.sql",
async fn test_cases_new_sql_system_tables_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("new_sql_system_tables.sql");
let mut runner = Runner::new();
runner
@ -119,6 +135,8 @@ async fn test_cases_new_sql_system_tables_sql() {
#[tokio::test]
// Tests from "pushdown.sql",
async fn test_cases_pushdown_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("pushdown.sql");
let mut runner = Runner::new();
runner
@ -133,6 +151,8 @@ async fn test_cases_pushdown_sql() {
#[tokio::test]
// Tests from "several_chunks.sql",
async fn test_cases_several_chunks_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("several_chunks.sql");
let mut runner = Runner::new();
runner
@ -147,6 +167,8 @@ async fn test_cases_several_chunks_sql() {
#[tokio::test]
// Tests from "sql_information_schema.sql",
async fn test_cases_sql_information_schema_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("sql_information_schema.sql");
let mut runner = Runner::new();
runner
@ -161,6 +183,8 @@ async fn test_cases_sql_information_schema_sql() {
#[tokio::test]
// Tests from "timestamps.sql",
async fn test_cases_timestamps_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("timestamps.sql");
let mut runner = Runner::new();
runner
@ -175,6 +199,8 @@ async fn test_cases_timestamps_sql() {
#[tokio::test]
// Tests from "two_chunks.sql",
async fn test_cases_two_chunks_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("two_chunks.sql");
let mut runner = Runner::new();
runner
@ -189,6 +215,8 @@ async fn test_cases_two_chunks_sql() {
#[tokio::test]
// Tests from "two_chunks_missing_columns.sql",
async fn test_cases_two_chunks_missing_columns_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("two_chunks_missing_columns.sql");
let mut runner = Runner::new();
runner

View File

@ -146,10 +146,18 @@ impl TestConfig {
/// Adds default ingester options.
///
/// NOTE: the raw `.with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "10")` call was
/// removed: it duplicated the `with_ingester_persist_memory_threshold(10)` helper call below,
/// setting the same environment variable twice.
fn with_default_ingester_options(self) -> Self {
    self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "2000000")
        .with_ingester_persist_memory_threshold(10)
        .with_kafka_partition(0)
}
/// Sets the memory threshold (in bytes) at which the ingester persists data.
pub fn with_ingester_persist_memory_threshold(self, bytes: u64) -> Self {
    // Environment variables are strings, so render the byte count first.
    let threshold = bytes.to_string();
    self.with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", threshold)
}
/// Adds an ingester that ingests from the specified kafka partition
pub fn with_kafka_partition(self, kafka_partition_id: u64) -> Self {
self.with_env(