refactor: clean up `querier` a bit

Before adding more and more features, here is a bit of cleanup and prep
work:

- Pull out caching into its own module and add proper tests for it.
- Start to build a test infrastructure so that tests are shorter and easier
  to read. This doesn't fully pay off just yet, but it becomes more and more
  important once we actually sync tables and chunks (see the sketch below).
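
To give a rough idea of how the two pieces fit together, here is a minimal
sketch of a test written against the new `CatalogCache` and the `test_util`
helpers added in this commit (the test name and the concrete names/keys are
made up for illustration; the APIs are the ones introduced below):

    use std::sync::Arc;

    use crate::{cache::CatalogCache, test_util::TestCatalog};

    #[tokio::test]
    async fn catalog_cache_sketch() {
        // One call sets up an in-memory catalog plus object store, metric
        // registry and mock time provider.
        let catalog = TestCatalog::new();

        // The chained helpers replace the hand-rolled repository calls that
        // every test used to repeat.
        let ns = catalog.create_namespace("ns").await;
        let table = ns.create_table("table").await;
        let partition = table.create_partition("part", 1).await;

        // The cache resolves a mapping once and serves repeated lookups
        // without further catalog requests (hence the shared `Arc`s).
        let cache = CatalogCache::new(catalog.catalog());
        let name_a = cache.table_name(table.table.id).await;
        let name_b = cache.table_name(table.table.id).await;
        assert!(Arc::ptr_eq(&name_a, &name_b));

        let key_a = cache.old_gen_partition_key(partition.partition.id).await;
        let key_b = cache.old_gen_partition_key(partition.partition.id).await;
        assert!(Arc::ptr_eq(&key_a, &key_b));
    }
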
Marco Neumann 2022-03-01 11:51:46 +01:00
parent 48722783f9
commit daf14f6506
6 changed files with 653 additions and 368 deletions

querier/src/cache.rs (new file, +314 lines)

@@ -0,0 +1,314 @@
use std::{collections::HashMap, sync::Arc};
use backoff::{Backoff, BackoffConfig};
use iox_catalog::interface::{Catalog, NamespaceId, PartitionId, SequencerId, TableId};
use parking_lot::RwLock;
/// Caches requests to the [`Catalog`].
#[derive(Debug)]
pub struct CatalogCache {
/// Backoff config for IO operations.
backoff_config: BackoffConfig,
/// Catalog.
catalog: Arc<dyn Catalog>,
/// Partition keys cache for old gen.
old_gen_partition_key_cache: RwLock<HashMap<PartitionId, Arc<str>>>,
/// Partition key and sequencer ID cache.
partition_cache: RwLock<HashMap<PartitionId, (Arc<str>, SequencerId)>>,
/// Table name and namespace cache.
table_cache: RwLock<HashMap<TableId, (Arc<str>, NamespaceId)>>,
/// Namespace name cache.
namespace_cache: RwLock<HashMap<NamespaceId, Arc<str>>>,
}
impl CatalogCache {
/// Create empty cache.
pub fn new(catalog: Arc<dyn Catalog>) -> Self {
Self {
backoff_config: BackoffConfig::default(),
catalog,
old_gen_partition_key_cache: RwLock::new(HashMap::default()),
partition_cache: RwLock::new(HashMap::default()),
table_cache: RwLock::new(HashMap::default()),
namespace_cache: RwLock::new(HashMap::default()),
}
}
/// Get partition key for old gen.
///
/// This either uses a cached value or -- if required -- creates a fresh string.
pub async fn old_gen_partition_key(&self, partition_id: PartitionId) -> Arc<str> {
if let Some(key) = self.old_gen_partition_key_cache.read().get(&partition_id) {
return Arc::clone(key);
}
let (partition_key, sequencer_id) = self.cached_partition(partition_id).await;
let og_partition_key = Arc::from(format!("{}-{}", sequencer_id.get(), partition_key));
Arc::clone(
self.old_gen_partition_key_cache
.write()
.entry(partition_id)
.or_insert(og_partition_key),
)
}
/// Get the partition key and sequencer ID for the given partition ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
async fn cached_partition(&self, partition_id: PartitionId) -> (Arc<str>, SequencerId) {
if let Some((key, id)) = self.partition_cache.read().get(&partition_id) {
return (Arc::clone(key), *id);
}
let partition = Backoff::new(&self.backoff_config)
.retry_all_errors("get partition_key", || async {
self.catalog
.repositories()
.await
.partitions()
.get_by_id(partition_id)
.await
})
.await
.expect("retry forever")
.expect("partition gone from catalog?!");
let key = Arc::from(partition.partition_key);
let mut partition_cache = self.partition_cache.write();
let (key, id) = partition_cache
.entry(partition_id)
.or_insert((key, partition.sequencer_id));
(Arc::clone(key), *id)
}
/// Get the table name and namespace ID for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
async fn cached_table(&self, table_id: TableId) -> (Arc<str>, NamespaceId) {
if let Some((name, ns)) = self.table_cache.read().get(&table_id) {
return (Arc::clone(name), *ns);
}
let table = Backoff::new(&self.backoff_config)
.retry_all_errors("get table_name", || async {
self.catalog
.repositories()
.await
.tables()
.get_by_id(table_id)
.await
})
.await
.expect("retry forever")
.expect("table gone from catalog?!");
let name = Arc::from(table.name);
let mut table_cache = self.table_cache.write();
let (name, ns) = table_cache
.entry(table_id)
.or_insert((name, table.namespace_id));
(Arc::clone(name), *ns)
}
/// Get the table name for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
pub async fn table_name(&self, table_id: TableId) -> Arc<str> {
self.cached_table(table_id).await.0
}
/// Get the table namespace ID for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
pub async fn table_namespace_id(&self, table_id: TableId) -> NamespaceId {
self.cached_table(table_id).await.1
}
/// Get the namespace name for the given namespace ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
pub async fn namespace_name(&self, namespace_id: NamespaceId) -> Arc<str> {
if let Some(name) = self.namespace_cache.read().get(&namespace_id) {
return Arc::clone(name);
}
let namespace = Backoff::new(&self.backoff_config)
.retry_all_errors("get namespace_name", || async {
self.catalog
.repositories()
.await
.namespaces()
.get_by_id(namespace_id)
.await
})
.await
.expect("retry forever")
.expect("namespace gone from catalog?!");
let name = Arc::from(namespace.name);
Arc::clone(
self.namespace_cache
.write()
.entry(namespace_id)
.or_insert(name),
)
}
}
#[cfg(test)]
mod tests {
use metric::{Attributes, Metric, U64Histogram};
use crate::test_util::TestCatalog;
use super::*;
#[tokio::test]
async fn test_namespace_name() {
let catalog = TestCatalog::new();
let ns1 = catalog.create_namespace("ns1").await.namespace.clone();
let ns2 = catalog.create_namespace("ns2").await.namespace.clone();
assert_ne!(ns1.id, ns2.id);
let cache = CatalogCache::new(catalog.catalog());
let name1_a = cache.namespace_name(ns1.id).await;
assert_eq!(name1_a.as_ref(), ns1.name.as_str());
assert_metric(&catalog.metric_registry, "namespace_get_by_id", 1);
let name2 = cache.namespace_name(ns2.id).await;
assert_eq!(name2.as_ref(), ns2.name.as_str());
assert_metric(&catalog.metric_registry, "namespace_get_by_id", 2);
let name1_b = cache.namespace_name(ns1.id).await;
assert!(Arc::ptr_eq(&name1_a, &name1_b));
assert_metric(&catalog.metric_registry, "namespace_get_by_id", 2);
}
#[tokio::test]
async fn test_table_name() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t1 = ns.create_table("table1").await.table.clone();
let t2 = ns.create_table("table2").await.table.clone();
assert_ne!(t1.id, t2.id);
let cache = CatalogCache::new(catalog.catalog());
let name1_a = cache.table_name(t1.id).await;
assert_eq!(name1_a.as_ref(), t1.name.as_str());
assert_metric(&catalog.metric_registry, "table_get_by_id", 1);
let name2 = cache.table_name(t2.id).await;
assert_eq!(name2.as_ref(), t2.name.as_str());
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
let name1_b = cache.table_name(t1.id).await;
assert!(Arc::ptr_eq(&name1_a, &name1_b));
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
}
#[tokio::test]
async fn test_table_namespace_id() {
let catalog = TestCatalog::new();
let ns2 = catalog.create_namespace("ns1").await;
let ns1 = catalog.create_namespace("ns2").await;
let t1 = ns1.create_table("table1").await.table.clone();
let t2 = ns2.create_table("table2").await.table.clone();
assert_ne!(t1.id, t2.id);
assert_ne!(t1.namespace_id, t2.namespace_id);
let cache = CatalogCache::new(catalog.catalog());
let id1_a = cache.table_namespace_id(t1.id).await;
assert_eq!(id1_a, t1.namespace_id);
assert_metric(&catalog.metric_registry, "table_get_by_id", 1);
let id2 = cache.table_namespace_id(t2.id).await;
assert_eq!(id2, t2.namespace_id);
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
let id1_b = cache.table_namespace_id(t1.id).await;
assert_eq!(id1_b, t1.namespace_id);
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
}
#[tokio::test]
async fn test_table_shared_cache() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t1 = ns.create_table("table1").await.table.clone();
let t2 = ns.create_table("table2").await.table.clone();
assert_ne!(t1.id, t2.id);
let cache = CatalogCache::new(catalog.catalog());
cache.table_name(t1.id).await;
cache.table_namespace_id(t2.id).await;
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
// `table_name` and `table_namespace_id` use the same underlying cache
cache.table_namespace_id(t1.id).await;
cache.table_name(t2.id).await;
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
}
#[tokio::test]
async fn test_old_partition_key() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t = ns.create_table("table").await;
let p11 = t.create_partition("k1", 1).await.partition.clone();
let p12 = t.create_partition("k1", 2).await.partition.clone();
let p21 = t.create_partition("k2", 1).await.partition.clone();
let p22 = t.create_partition("k2", 2).await.partition.clone();
let cache = CatalogCache::new(catalog.catalog());
let name11_a = cache.old_gen_partition_key(p11.id).await;
assert_eq!(name11_a.as_ref(), "1-k1");
assert_metric(&catalog.metric_registry, "partition_get_by_id", 1);
let name12 = cache.old_gen_partition_key(p12.id).await;
assert_eq!(name12.as_ref(), "2-k1");
assert_metric(&catalog.metric_registry, "partition_get_by_id", 2);
let name21 = cache.old_gen_partition_key(p21.id).await;
assert_eq!(name21.as_ref(), "1-k2");
assert_metric(&catalog.metric_registry, "partition_get_by_id", 3);
let name22 = cache.old_gen_partition_key(p22.id).await;
assert_eq!(name22.as_ref(), "2-k2");
assert_metric(&catalog.metric_registry, "partition_get_by_id", 4);
let name11_b = cache.old_gen_partition_key(p11.id).await;
assert!(Arc::ptr_eq(&name11_a, &name11_b));
assert_metric(&catalog.metric_registry, "partition_get_by_id", 4);
}
fn assert_metric(metrics: &metric::Registry, name: &'static str, n: u64) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
.expect("failed to get observer")
.fetch();
let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
assert_eq!(hit_count, n);
}
}

querier/src/chunk.rs

@@ -1,14 +1,10 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;
use backoff::{Backoff, BackoffConfig};
use data_types::chunk_metadata::{ChunkAddr, ChunkOrder};
use db::catalog::chunk::{CatalogChunk, ChunkMetadata, ChunkMetrics as CatalogChunkMetrics};
use iox_catalog::interface::{
Catalog, NamespaceId, ParquetFile, PartitionId, SequencerId, TableId,
};
use iox_catalog::interface::ParquetFile;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::ObjectStore;
use parking_lot::RwLock;
use parquet_file::{
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData},
@@ -16,6 +12,8 @@ use parquet_file::{
use time::TimeProvider;
use uuid::Uuid;
use crate::cache::CatalogCache;
/// Parquet file with decoded metadata.
struct DecodedParquetFile {
parquet_file: ParquetFile,
@@ -46,11 +44,8 @@ impl DecodedParquetFile {
/// Adapter that can create old-gen chunks for the new-gen catalog.
#[derive(Debug)]
pub struct ParquetChunkAdapter {
/// Backoff config for IO operations.
backoff_config: BackoffConfig,
/// Catalog.
catalog: Arc<dyn Catalog>,
/// Cache
catalog_cache: Arc<CatalogCache>,
/// Old-gen object store.
iox_object_store: Arc<IoxObjectStore>,
@@ -60,24 +55,12 @@ pub struct ParquetChunkAdapter {
/// Time provider.
time_provider: Arc<dyn TimeProvider>,
/// Partition keys cache for old gen.
old_gen_partition_key_cache: RwLock<HashMap<(SequencerId, PartitionId), Arc<str>>>,
/// Partition key cache.
partition_cache: RwLock<HashMap<PartitionId, Arc<str>>>,
/// Table name and namespace cache.
table_cache: RwLock<HashMap<TableId, (Arc<str>, NamespaceId)>>,
/// Namespace name cache.
namespace_cache: RwLock<HashMap<NamespaceId, Arc<str>>>,
}
impl ParquetChunkAdapter {
/// Create new adapter with empty cache.
pub fn new(
catalog: Arc<dyn Catalog>,
catalog_cache: Arc<CatalogCache>,
object_store: Arc<ObjectStore>,
metric_registry: Arc<metric::Registry>,
time_provider: Arc<dyn TimeProvider>,
@@ -89,15 +72,10 @@ impl ParquetChunkAdapter {
));
Self {
backoff_config: BackoffConfig::default(),
catalog,
catalog_cache,
iox_object_store,
metric_registry,
time_provider,
old_gen_partition_key_cache: RwLock::new(HashMap::default()),
partition_cache: RwLock::new(HashMap::default()),
table_cache: RwLock::new(HashMap::default()),
namespace_cache: RwLock::new(HashMap::default()),
}
}
@@ -114,8 +92,11 @@ impl ParquetChunkAdapter {
let parquet_file = &decoded_parquet_file.parquet_file;
let file_size_bytes = parquet_file.file_size_bytes as usize;
let table_name = self.table_name(parquet_file.table_id).await;
let partition_key = self.partition_key(parquet_file.partition_id).await;
let table_name = self.catalog_cache.table_name(parquet_file.table_id).await;
let partition_key = self
.catalog_cache
.old_gen_partition_key(parquet_file.partition_id)
.await;
let metrics = ParquetChunkMetrics::new(self.metric_registry.as_ref());
ParquetChunk::new(
@@ -180,190 +161,60 @@ impl ParquetChunkAdapter {
async fn old_gen_chunk_addr(&self, parquet_file: &ParquetFile) -> ChunkAddr {
ChunkAddr {
db_name: self
.namespace_name(self.table_namespace_id(parquet_file.table_id).await)
.catalog_cache
.namespace_name(
self.catalog_cache
.table_namespace_id(parquet_file.table_id)
.await,
)
.await,
table_name: self.table_name(parquet_file.table_id).await,
table_name: self.catalog_cache.table_name(parquet_file.table_id).await,
partition_key: self
.old_gen_partition_key(parquet_file.sequencer_id, parquet_file.partition_id)
.catalog_cache
.old_gen_partition_key(parquet_file.partition_id)
.await,
chunk_id: data_types::chunk_metadata::ChunkId::from(Uuid::from_u128(
parquet_file.id.get() as _,
)),
}
}
/// Get partition key for old gen.
///
/// This either uses a cached value or -- if required -- creates a fresh string.
async fn old_gen_partition_key(
&self,
sequencer_id: SequencerId,
partition_id: PartitionId,
) -> Arc<str> {
if let Some(key) = self
.old_gen_partition_key_cache
.read()
.get(&(sequencer_id, partition_id))
{
return Arc::clone(key);
}
let partition_key = self.partition_key(partition_id).await;
let og_partition_key = Arc::from(format!("{}-{}", sequencer_id.get(), partition_key));
Arc::clone(
self.old_gen_partition_key_cache
.write()
.entry((sequencer_id, partition_id))
.or_insert(og_partition_key),
)
}
/// Get the partition key for the given partition ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
async fn partition_key(&self, partition_id: PartitionId) -> Arc<str> {
if let Some(key) = self.partition_cache.read().get(&partition_id) {
return Arc::clone(key);
}
let partition = Backoff::new(&self.backoff_config)
.retry_all_errors("get partition_key", || async {
self.catalog
.repositories()
.await
.partitions()
.get_by_id(partition_id)
.await
})
.await
.expect("retry forever")
.expect("partition gone from catalog?!");
let key = Arc::from(partition.partition_key);
Arc::clone(
self.partition_cache
.write()
.entry(partition_id)
.or_insert(key),
)
}
/// Get the table name and namespace ID for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
async fn cached_table(&self, table_id: TableId) -> (Arc<str>, NamespaceId) {
if let Some((name, ns)) = self.table_cache.read().get(&table_id) {
return (Arc::clone(name), *ns);
}
let table = Backoff::new(&self.backoff_config)
.retry_all_errors("get table_name", || async {
self.catalog
.repositories()
.await
.tables()
.get_by_id(table_id)
.await
})
.await
.expect("retry forever")
.expect("table gone from catalog?!");
let name = Arc::from(table.name);
let mut table_cache = self.table_cache.write();
let (name, ns) = table_cache
.entry(table_id)
.or_insert((name, table.namespace_id));
(Arc::clone(name), *ns)
}
/// Get the table name for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
async fn table_name(&self, table_id: TableId) -> Arc<str> {
self.cached_table(table_id).await.0
}
/// Get the table namespace ID for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
async fn table_namespace_id(&self, table_id: TableId) -> NamespaceId {
self.cached_table(table_id).await.1
}
/// Get the namespace name for the given namespace ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
async fn namespace_name(&self, namespace_id: NamespaceId) -> Arc<str> {
if let Some(name) = self.namespace_cache.read().get(&namespace_id) {
return Arc::clone(name);
}
let namespace = Backoff::new(&self.backoff_config)
.retry_all_errors("get namespace_name", || async {
self.catalog
.repositories()
.await
.namespaces()
.get_by_id(namespace_id)
.await
})
.await
.expect("retry forever")
.expect("namespace gone from catalog?!");
let name = Arc::from(namespace.name);
Arc::clone(
self.namespace_cache
.write()
.entry(namespace_id)
.or_insert(name),
)
}
}
#[cfg(test)]
mod tests {
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bytes::Bytes;
use data_types::partition_metadata::InfluxDbType;
use db::chunk::DbChunk;
use futures::StreamExt;
use iox_catalog::{
interface::{KafkaPartition, SequenceNumber, Timestamp},
mem::MemCatalog,
};
use parquet_file::metadata::IoxMetadata;
use query::{
test::{raw_data, TestChunk},
QueryChunk,
};
use query::QueryChunk;
use schema::selection::Selection;
use time::{MockProvider, Time};
use uuid::Uuid;
use crate::test_util::TestCatalog;
use super::*;
#[tokio::test]
async fn test_create_record() {
let metric_registry = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
let object_store = Arc::new(ObjectStore::new_in_memory());
let time_provider = Arc::new(MockProvider::new(now()));
let catalog = TestCatalog::new();
let adapter = ParquetChunkAdapter::new(
Arc::clone(&catalog),
Arc::clone(&object_store),
metric_registry,
time_provider,
Arc::new(CatalogCache::new(catalog.catalog())),
catalog.object_store(),
catalog.metric_registry(),
catalog.time_provider(),
);
let parquet_file = parquet_file(catalog.as_ref(), &object_store).await;
let parquet_file = catalog
.create_namespace("ns")
.await
.create_table("table")
.await
.create_partition("part", 1)
.await
.create_parquet_file()
.await
.parquet_file
.clone();
let catalog_chunk = adapter.new_catalog_chunk(parquet_file).await;
assert_eq!(
@@ -387,146 +238,6 @@ mod tests {
);
}
async fn parquet_file(catalog: &dyn Catalog, object_store: &Arc<ObjectStore>) -> ParquetFile {
let mut repos = catalog.repositories().await;
let kafka_topic = repos
.kafka_topics()
.create_or_get("kafka_topic")
.await
.unwrap();
let sequencer = repos
.sequencers()
.create_or_get(&kafka_topic, KafkaPartition::new(1))
.await
.unwrap();
let query_pool = repos.query_pools().create_or_get("pool").await.unwrap();
let namespace = repos
.namespaces()
.create("ns", "1y", kafka_topic.id, query_pool.id)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("table", namespace.id)
.await
.unwrap();
let partition = repos
.partitions()
.create_or_get("part", sequencer.id, table.id)
.await
.unwrap();
let object_store_id = Uuid::nil();
let min_sequence_number = SequenceNumber::new(1);
let max_sequence_number = SequenceNumber::new(100);
let row_count_expected = 3;
let metadata = IoxMetadata {
object_store_id,
creation_timestamp: now(),
namespace_id: namespace.id,
namespace_name: namespace.name.into(),
sequencer_id: sequencer.id,
table_id: table.id,
table_name: table.name.into(),
partition_id: partition.id,
partition_key: partition.partition_key.into(),
time_of_first_write: now(),
time_of_last_write: now(),
min_sequence_number,
max_sequence_number,
row_count: row_count_expected,
};
let (parquet_metadata_bin, file_size_bytes) =
create_parquet_file(object_store, &metadata).await;
// decode metadata because we need to store them within the catalog
let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(
parquet_metadata_bin.clone(),
));
let decoded_metadata = parquet_metadata.decode().unwrap();
let schema = decoded_metadata.read_schema().unwrap();
let stats = decoded_metadata.read_statistics(&schema).unwrap();
let row_count = stats[0].total_count();
assert_eq!(row_count as i64, row_count_expected);
let ts_min_max = stats
.iter()
.find_map(|stat| {
(stat.influxdb_type == Some(InfluxDbType::Timestamp))
.then(|| stat.stats.timestamp_min_max().unwrap())
})
.unwrap();
let parquet_file = repos
.parquet_files()
.create(
sequencer.id,
table.id,
partition.id,
object_store_id,
min_sequence_number,
max_sequence_number,
Timestamp::new(ts_min_max.min),
Timestamp::new(ts_min_max.max),
file_size_bytes as i64,
parquet_metadata_bin,
row_count as i64,
)
.await
.unwrap();
parquet_file
}
/// Create parquet file and return thrift-encoded and zstd-compressed parquet metadata as well as the file size.
async fn create_parquet_file(
object_store: &Arc<ObjectStore>,
metadata: &IoxMetadata,
) -> (Vec<u8>, usize) {
let iox_object_store = Arc::new(IoxObjectStore::existing(
Arc::clone(object_store),
IoxObjectStore::root_path_for(object_store, uuid::Uuid::new_v4()),
));
let chunk1 = Arc::new(
TestChunk::new("t")
.with_id(1)
.with_time_column()
.with_tag_column("tag1")
.with_i64_field_column("field_int")
.with_three_rows_of_data(),
);
let record_batches = raw_data(&[chunk1]).await;
let schema = record_batches.first().unwrap().schema();
let data = parquet_file::storage::Storage::new(Arc::clone(&iox_object_store))
.parquet_bytes(record_batches, schema, metadata)
.await
.unwrap();
let data = Arc::new(data);
let md = IoxParquetMetaData::from_file_bytes(Arc::clone(&data))
.unwrap()
.unwrap();
let parquet_md = md.thrift_bytes().to_vec();
let data = Arc::try_unwrap(data).expect("dangling reference to data");
let file_size = data.len();
let bytes = Bytes::from(data);
let path = ParquetFilePath::new_new_gen(
metadata.namespace_id,
metadata.table_id,
metadata.sequencer_id,
metadata.partition_id,
metadata.object_store_id,
);
iox_object_store
.put_parquet_file(&path, bytes)
.await
.unwrap();
(parquet_md, file_size)
}
async fn collect_read_filter(chunk: &DbChunk) -> Vec<RecordBatch> {
chunk
.read_filter(&Default::default(), Selection::All)
@@ -537,8 +248,4 @@ mod tests {
.map(Result::unwrap)
.collect()
}
fn now() -> Time {
Time::from_timestamp(0, 0)
}
}

querier/src/database.rs

@@ -12,7 +12,7 @@ use parking_lot::RwLock;
use time::TimeProvider;
use tokio_util::sync::CancellationToken;
use crate::namespace::QuerierNamespace;
use crate::{cache::CatalogCache, namespace::QuerierNamespace};
const SYNC_INTERVAL: Duration = Duration::from_secs(1);
@@ -27,6 +27,9 @@ pub struct QuerierDatabase {
/// Catalog.
catalog: Arc<dyn Catalog>,
/// Catalog cache.
catalog_cache: Arc<CatalogCache>,
/// Metric registry
metric_registry: Arc<metric::Registry>,
@@ -50,9 +53,12 @@ impl QuerierDatabase {
object_store: Arc<ObjectStore>,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
let catalog_cache = Arc::new(CatalogCache::new(Arc::clone(&catalog)));
Self {
backoff_config: BackoffConfig::default(),
catalog,
catalog_cache,
metric_registry,
namespaces: RwLock::new(HashMap::new()),
object_store,
@@ -103,13 +109,14 @@ impl QuerierDatabase {
// perform modification
for name in to_delete {
// TODO(marco): this is currently untested because `iox_catalog` doesn't implement namespace deletion
namespaces_guard.remove(&name);
}
for (name, id) in to_add {
namespaces_guard.insert(
Arc::clone(&name),
Arc::new(QuerierNamespace::new(
Arc::clone(&self.catalog),
Arc::clone(&self.catalog_cache),
name,
id,
Arc::clone(&self.metric_registry),
@@ -152,35 +159,31 @@ pub(crate) async fn database_sync_loop(
#[cfg(test)]
mod tests {
use iox_catalog::mem::MemCatalog;
use time::{MockProvider, Time};
use crate::test_util::TestCatalog;
use super::*;
#[tokio::test]
async fn test_sync() {
let metric_registry = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
let object_store = Arc::new(ObjectStore::new_in_memory());
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp(0, 0)));
let catalog = TestCatalog::new();
let db = QuerierDatabase::new(
Arc::clone(&catalog),
metric_registry,
object_store,
time_provider,
catalog.catalog(),
catalog.metric_registry(),
catalog.object_store(),
catalog.time_provider(),
);
assert_eq!(ns_names(&db), vec![]);
db.sync().await;
assert_eq!(ns_names(&db), vec![]);
create_namespace(catalog.as_ref(), "ns1").await;
create_namespace(catalog.as_ref(), "ns2").await;
catalog.create_namespace("ns1").await;
catalog.create_namespace("ns2").await;
db.sync().await;
assert_eq!(ns_names(&db), vec![Arc::from("ns1"), Arc::from("ns2")]);
create_namespace(catalog.as_ref(), "ns3").await;
catalog.create_namespace("ns3").await;
db.sync().await;
assert_eq!(
ns_names(&db),
@@ -193,19 +196,4 @@ mod tests {
names.sort();
names
}
async fn create_namespace(catalog: &dyn Catalog, name: &str) {
let mut repos = catalog.repositories().await;
let kafka_topic = repos
.kafka_topics()
.create_or_get("kafka_topic")
.await
.unwrap();
let query_pool = repos.query_pools().create_or_get("pool").await.unwrap();
repos
.namespaces()
.create(name, "1y", kafka_topic.id, query_pool.id)
.await
.unwrap();
}
}

querier/src/lib.rs

@@ -13,6 +13,7 @@
pub use client_util::connection;
mod cache;
mod chunk;
mod database;
/// Flight client to the ingester to request in-memory data.
@@ -20,3 +21,6 @@ pub mod flight;
pub mod handler;
mod namespace;
pub mod server;
#[cfg(test)]
mod test_util;

querier/src/namespace.rs

@@ -1,11 +1,11 @@
use std::sync::Arc;
use db::catalog::Catalog as DbCatalog;
use iox_catalog::interface::{Catalog, NamespaceId};
use iox_catalog::interface::NamespaceId;
use object_store::ObjectStore;
use time::TimeProvider;
use crate::chunk::ParquetChunkAdapter;
use crate::{cache::CatalogCache, chunk::ParquetChunkAdapter};
/// Maps a catalog namespace to all the in-memory resources and sync-state that the querier needs.
#[derive(Debug)]
@@ -28,7 +28,7 @@ impl QuerierNamespace {
///
/// You may call [`sync`](Self::sync) to fill the namespace with chunks.
pub fn new(
catalog: Arc<dyn Catalog>,
catalog_cache: Arc<CatalogCache>,
name: Arc<str>,
id: NamespaceId,
metric_registry: Arc<metric::Registry>,
@@ -42,7 +42,7 @@ impl QuerierNamespace {
Arc::clone(&time_provider),
),
chunk_adapter: ParquetChunkAdapter::new(
catalog,
catalog_cache,
object_store,
metric_registry,
time_provider,

querier/src/test_util.rs (new file, +272 lines)

@@ -0,0 +1,272 @@
use std::sync::Arc;
use bytes::Bytes;
use data_types::partition_metadata::InfluxDbType;
use iox_catalog::{
interface::{
Catalog, KafkaPartition, KafkaTopic, Namespace, ParquetFile, Partition, QueryPool,
SequenceNumber, Sequencer, Table, Timestamp,
},
mem::MemCatalog,
};
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::ObjectStore;
use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};
use query::test::{raw_data, TestChunk};
use time::{MockProvider, Time, TimeProvider};
use uuid::Uuid;
pub struct TestCatalog {
pub catalog: Arc<dyn Catalog>,
pub metric_registry: Arc<metric::Registry>,
pub object_store: Arc<ObjectStore>,
pub time_provider: Arc<dyn TimeProvider>,
}
impl TestCatalog {
pub fn new() -> Arc<Self> {
let metric_registry = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
let object_store = Arc::new(ObjectStore::new_in_memory());
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp(0, 0)));
Arc::new(Self {
metric_registry,
catalog,
object_store,
time_provider,
})
}
pub fn catalog(&self) -> Arc<dyn Catalog> {
Arc::clone(&self.catalog)
}
pub fn metric_registry(&self) -> Arc<metric::Registry> {
Arc::clone(&self.metric_registry)
}
pub fn object_store(&self) -> Arc<ObjectStore> {
Arc::clone(&self.object_store)
}
pub fn time_provider(&self) -> Arc<dyn TimeProvider> {
Arc::clone(&self.time_provider)
}
pub async fn create_namespace(self: &Arc<Self>, name: &str) -> Arc<TestNamespace> {
let mut repos = self.catalog.repositories().await;
let kafka_topic = repos
.kafka_topics()
.create_or_get("kafka_topic")
.await
.unwrap();
let query_pool = repos.query_pools().create_or_get("pool").await.unwrap();
let namespace = repos
.namespaces()
.create(name, "1y", kafka_topic.id, query_pool.id)
.await
.unwrap();
Arc::new(TestNamespace {
catalog: Arc::clone(self),
kafka_topic,
query_pool,
namespace,
})
}
}
pub struct TestNamespace {
pub catalog: Arc<TestCatalog>,
pub kafka_topic: KafkaTopic,
pub query_pool: QueryPool,
pub namespace: Namespace,
}
impl TestNamespace {
pub async fn create_table(self: &Arc<Self>, name: &str) -> Arc<TestTable> {
let mut repos = self.catalog.catalog.repositories().await;
let table = repos
.tables()
.create_or_get(name, self.namespace.id)
.await
.unwrap();
Arc::new(TestTable {
catalog: Arc::clone(&self.catalog),
namespace: Arc::clone(self),
table,
})
}
}
pub struct TestTable {
pub catalog: Arc<TestCatalog>,
pub namespace: Arc<TestNamespace>,
pub table: Table,
}
impl TestTable {
pub async fn create_partition(
self: &Arc<Self>,
key: &str,
sequencer: i32,
) -> Arc<TestPartition> {
let mut repos = self.catalog.catalog.repositories().await;
let sequencer = repos
.sequencers()
.create_or_get(&self.namespace.kafka_topic, KafkaPartition::new(sequencer))
.await
.unwrap();
let partition = repos
.partitions()
.create_or_get(key, sequencer.id, self.table.id)
.await
.unwrap();
Arc::new(TestPartition {
catalog: Arc::clone(&self.catalog),
namespace: Arc::clone(&self.namespace),
table: Arc::clone(self),
sequencer,
partition,
})
}
}
pub struct TestPartition {
pub catalog: Arc<TestCatalog>,
pub namespace: Arc<TestNamespace>,
pub table: Arc<TestTable>,
pub sequencer: Sequencer,
pub partition: Partition,
}
impl TestPartition {
pub async fn create_parquet_file(self: &Arc<Self>) -> Arc<TestParquetFile> {
let mut repos = self.catalog.catalog.repositories().await;
let object_store_id = Uuid::nil();
let min_sequence_number = SequenceNumber::new(1);
let max_sequence_number = SequenceNumber::new(100);
let row_count_expected = 3;
let metadata = IoxMetadata {
object_store_id,
creation_timestamp: now(),
namespace_id: self.namespace.namespace.id,
namespace_name: self.namespace.namespace.name.clone().into(),
sequencer_id: self.sequencer.id,
table_id: self.table.table.id,
table_name: self.table.table.name.clone().into(),
partition_id: self.partition.id,
partition_key: self.partition.partition_key.clone().into(),
time_of_first_write: now(),
time_of_last_write: now(),
min_sequence_number,
max_sequence_number,
row_count: row_count_expected,
};
let (parquet_metadata_bin, file_size_bytes) =
create_parquet_file(&self.catalog.object_store, &metadata).await;
// decode metadata because we need to store them within the catalog
let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(
parquet_metadata_bin.clone(),
));
let decoded_metadata = parquet_metadata.decode().unwrap();
let schema = decoded_metadata.read_schema().unwrap();
let stats = decoded_metadata.read_statistics(&schema).unwrap();
let row_count = stats[0].total_count();
assert_eq!(row_count as i64, row_count_expected);
let ts_min_max = stats
.iter()
.find_map(|stat| {
(stat.influxdb_type == Some(InfluxDbType::Timestamp))
.then(|| stat.stats.timestamp_min_max().unwrap())
})
.unwrap();
let parquet_file = repos
.parquet_files()
.create(
self.sequencer.id,
self.table.table.id,
self.partition.id,
object_store_id,
min_sequence_number,
max_sequence_number,
Timestamp::new(ts_min_max.min),
Timestamp::new(ts_min_max.max),
file_size_bytes as i64,
parquet_metadata_bin,
row_count as i64,
)
.await
.unwrap();
Arc::new(TestParquetFile { parquet_file })
}
}
/// Create parquet file and return thrift-encoded and zstd-compressed parquet metadata as well as the file size.
async fn create_parquet_file(
object_store: &Arc<ObjectStore>,
metadata: &IoxMetadata,
) -> (Vec<u8>, usize) {
let iox_object_store = Arc::new(IoxObjectStore::existing(
Arc::clone(object_store),
IoxObjectStore::root_path_for(object_store, uuid::Uuid::new_v4()),
));
let chunk1 = Arc::new(
TestChunk::new("t")
.with_id(1)
.with_time_column()
.with_tag_column("tag1")
.with_i64_field_column("field_int")
.with_three_rows_of_data(),
);
let record_batches = raw_data(&[chunk1]).await;
let schema = record_batches.first().unwrap().schema();
let data = parquet_file::storage::Storage::new(Arc::clone(&iox_object_store))
.parquet_bytes(record_batches, schema, metadata)
.await
.unwrap();
let data = Arc::new(data);
let md = IoxParquetMetaData::from_file_bytes(Arc::clone(&data))
.unwrap()
.unwrap();
let parquet_md = md.thrift_bytes().to_vec();
let data = Arc::try_unwrap(data).expect("dangling reference to data");
let file_size = data.len();
let bytes = Bytes::from(data);
let path = ParquetFilePath::new_new_gen(
metadata.namespace_id,
metadata.table_id,
metadata.sequencer_id,
metadata.partition_id,
metadata.object_store_id,
);
iox_object_store
.put_parquet_file(&path, bytes)
.await
.unwrap();
(parquet_md, file_size)
}
pub struct TestParquetFile {
pub parquet_file: ParquetFile,
}
fn now() -> Time {
Time::from_timestamp(0, 0)
}