refactor: remove querier sync loop (#4146)

Namespaces are now created on demand and contain their full schema.
Tombstones and chunks are fetched on demand during the query.
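
With the sync loop gone, namespace lookup reduces to a single async method. Below is a condensed sketch of the new `QuerierDatabase::namespace` as it appears in the diff (struct definition and imports elided; only this method is shown):

    impl QuerierDatabase {
        /// Look up a namespace on demand. Returns `None` if it does not
        /// exist in the catalog, so no background task has to keep a
        /// namespace map in sync.
        pub async fn namespace(&self, name: &str) -> Option<Arc<QuerierNamespace>> {
            let name = Arc::from(name.to_owned());
            // The schema arrives fully populated via the catalog cache;
            // `?` turns a missing namespace into `None`.
            let schema = self
                .catalog_cache
                .namespace()
                .schema(Arc::clone(&name))
                .await?;
            Some(Arc::new(QuerierNamespace::new(
                self.backoff_config.clone(),
                Arc::clone(&self.chunk_adapter),
                schema,
                name,
                Arc::clone(&self.exec),
            )))
        }
    }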

Closes #4123.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2022-03-29 11:47:23 +00:00 committed by GitHub
parent e154d77bf5
commit decd018a6a
9 changed files with 101 additions and 338 deletions

Cargo.lock (generated)

@@ -4262,6 +4262,7 @@ dependencies = [
"data_types",
"datafusion 0.1.0",
"db",
"iox_catalog",
"iox_tests",
"itertools",
"metric",


@@ -1,29 +1,15 @@
//! Database for the querier that contains all namespaces.
use crate::{
cache::CatalogCache,
chunk::ParquetChunkAdapter,
namespace::QuerierNamespace,
poison::{PoisonCabinet, PoisonPill},
};
use crate::{cache::CatalogCache, chunk::ParquetChunkAdapter, namespace::QuerierNamespace};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types2::NamespaceId;
use backoff::BackoffConfig;
use iox_catalog::interface::Catalog;
use object_store::DynObjectStore;
use observability_deps::tracing::{error, info};
use parking_lot::RwLock;
use query::exec::Executor;
use service_common::QueryDatabaseProvider;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Duration,
};
use std::{collections::HashMap, sync::Arc};
use time::TimeProvider;
use tokio_util::sync::CancellationToken;
const SYNC_INTERVAL: Duration = Duration::from_secs(1);
/// Database for the querier.
///
@@ -63,14 +49,12 @@ impl QueryDatabaseProvider for QuerierDatabase {
type Db = QuerierNamespace;
async fn db(&self, name: &str) -> Option<Arc<Self::Db>> {
self.namespace(name)
self.namespace(name).await
}
}
impl QuerierDatabase {
/// Create new, empty database.
///
/// You may call [`sync`](Self::sync) to fill this database with content.
/// Create new database.
pub fn new(
catalog: Arc<dyn Catalog>,
metric_registry: Arc<metric::Registry>,
@@ -102,108 +86,21 @@ impl QuerierDatabase {
}
}
/// List of namespaces.
pub fn namespaces(&self) -> Vec<Arc<QuerierNamespace>> {
self.namespaces.read().values().cloned().collect()
}
/// Get namespace if it exists
pub fn namespace(&self, name: &str) -> Option<Arc<QuerierNamespace>> {
self.namespaces.read().get(name).cloned()
}
/// Sync set of namespaces and the data of the namespaces themselves.
///
/// Should be called regularly.
pub async fn sync(&self) {
let namespaces = Backoff::new(&self.backoff_config)
.retry_all_errors("get namespaces", || async {
self.catalog.repositories().await.namespaces().list().await
})
.await
.expect("unlimited retry");
let namespaces: BTreeMap<Arc<str>, NamespaceId> = namespaces
.into_iter()
.map(|ns| (ns.name.into(), ns.id))
.collect();
// lock namespaces AFTER IO
let querier_namespaces: Vec<_> = {
let mut namespaces_guard = self.namespaces.write();
// calculate set differences
let to_add: Vec<(Arc<str>, NamespaceId)> = namespaces
.iter()
.filter_map(|(name, id)| {
(!namespaces_guard.contains_key(name)).then(|| (Arc::clone(name), *id))
})
.collect();
let to_delete: Vec<Arc<str>> = namespaces_guard
.keys()
.filter_map(|name| (!namespaces.contains_key(name)).then(|| Arc::clone(name)))
.collect();
info!(
add = to_add.len(),
delete = to_delete.len(),
actual = namespaces_guard.len(),
desired = namespaces.len(),
"Syncing namespaces",
);
// perform modification
for name in to_delete {
// TODO(marco): this is currently untested because `iox_catalog` doesn't implement namespace deletion
namespaces_guard.remove(&name);
}
for (name, id) in to_add {
namespaces_guard.insert(
Arc::clone(&name),
Arc::new(QuerierNamespace::new(
pub async fn namespace(&self, name: &str) -> Option<Arc<QuerierNamespace>> {
let name = Arc::from(name.to_owned());
let schema = self
.catalog_cache
.namespace()
.schema(Arc::clone(&name))
.await?;
Some(Arc::new(QuerierNamespace::new(
self.backoff_config.clone(),
Arc::clone(&self.chunk_adapter),
schema,
name,
id,
Arc::clone(&self.exec),
)),
);
}
// get a clone of namespace Arcs so that we can run an async operation
namespaces_guard.values().cloned().collect()
};
// downgrade guard so that other readers are allowed again and sync namespace states.
for namespace in querier_namespaces {
namespace.sync().await;
}
}
}
/// Run regular [`sync`](QuerierDatabase::sync) until shutdown token is canceled.
pub(crate) async fn database_sync_loop(
database: Arc<QuerierDatabase>,
shutdown: CancellationToken,
poison_cabinet: Arc<PoisonCabinet>,
) {
loop {
if shutdown.is_cancelled() {
info!("Database sync shutdown");
return;
}
if poison_cabinet.contains(&PoisonPill::DatabaseSyncPanic) {
panic!("Database sync poisened, panic");
}
if poison_cabinet.contains(&PoisonPill::DatabaseSyncExit) {
error!("Database sync poisened, exit early");
return;
}
database.sync().await;
tokio::select!(
_ = tokio::time::sleep(SYNC_INTERVAL) => {},
_ = shutdown.cancelled() => {},
);
)))
}
}
@@ -214,7 +111,7 @@ mod tests {
use super::*;
#[tokio::test]
async fn test_sync() {
async fn test_namespace() {
let catalog = TestCatalog::new();
let db = QuerierDatabase::new(
@@ -224,36 +121,10 @@ mod tests {
catalog.time_provider(),
catalog.exec(),
);
assert_eq!(ns_names(&db), vec![]);
db.sync().await;
assert_eq!(ns_names(&db), vec![]);
catalog.create_namespace("ns1").await;
catalog.create_namespace("ns2").await;
db.sync().await;
assert_eq!(ns_names(&db), vec![Arc::from("ns1"), Arc::from("ns2")]);
let ns1_a = db.namespace("ns1").unwrap();
let ns2_a = db.namespace("ns2").unwrap();
assert_eq!(ns1_a.name().as_ref(), "ns1");
assert_eq!(ns2_a.name().as_ref(), "ns2");
assert!(db.namespace("ns3").is_none());
catalog.create_namespace("ns3").await;
db.sync().await;
assert_eq!(
ns_names(&db),
vec![Arc::from("ns1"), Arc::from("ns2"), Arc::from("ns3")]
);
let ns1_b = db.namespace("ns1").unwrap();
let ns2_b = db.namespace("ns2").unwrap();
assert!(Arc::ptr_eq(&ns1_a, &ns1_b));
assert!(Arc::ptr_eq(&ns2_a, &ns2_b));
}
fn ns_names(db: &QuerierDatabase) -> Vec<Arc<str>> {
let mut names: Vec<Arc<str>> = db.namespaces().iter().map(|ns| ns.name()).collect();
names.sort();
names
assert!(db.namespace("ns1").await.is_some());
assert!(db.namespace("ns2").await.is_none());
}
}


@@ -12,10 +12,7 @@ use thiserror::Error;
use tokio::task::{JoinError, JoinHandle};
use tokio_util::sync::CancellationToken;
use crate::{
database::{database_sync_loop, QuerierDatabase},
poison::PoisonCabinet,
};
use crate::{database::QuerierDatabase, poison::PoisonCabinet};
#[derive(Debug, Error)]
#[allow(missing_copy_implementations, missing_docs)]
@@ -63,14 +60,7 @@ impl QuerierHandlerImpl {
let shutdown = CancellationToken::new();
let poison_cabinet = Arc::new(PoisonCabinet::new());
let join_handles = vec![(
String::from("database sync"),
shared_handle(tokio::spawn(database_sync_loop(
Arc::clone(&database),
shutdown.clone(),
Arc::clone(&poison_cabinet),
))),
)];
let join_handles = vec![];
Self {
database,
join_handles,
@@ -98,6 +88,8 @@ impl QuerierHandler for QuerierHandlerImpl {
panic!("Background worker '{name}' exited early!");
}
}
self.shutdown.cancelled().await;
}
fn shutdown(&self) {
@@ -123,8 +115,6 @@ mod tests {
use query::exec::Executor;
use time::{MockProvider, Time};
use crate::poison::PoisonPill;
use super::*;
#[tokio::test]
@@ -144,26 +134,6 @@
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "Background worker 'database sync' exited early!")]
async fn test_supervise_database_sync_early_exit() {
let querier = TestQuerier::new().querier;
querier.poison_cabinet.add(PoisonPill::DatabaseSyncExit);
tokio::time::timeout(Duration::from_millis(2000), querier.join())
.await
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "JoinError::Panic")]
async fn test_supervise_database_sync_panic() {
let querier = TestQuerier::new().querier;
querier.poison_cabinet.add(PoisonPill::DatabaseSyncPanic);
tokio::time::timeout(Duration::from_millis(2000), querier.join())
.await
.unwrap();
}
struct TestQuerier {
querier: QuerierHandlerImpl,
}


@@ -1,11 +1,9 @@
//! Namespace within the whole database.
use crate::{cache::CatalogCache, chunk::ParquetChunkAdapter, table::QuerierTable};
use backoff::BackoffConfig;
use data_types2::NamespaceId;
use data_types2::{NamespaceId, NamespaceSchema};
use iox_catalog::interface::Catalog;
use object_store::DynObjectStore;
use observability_deps::tracing::warn;
use parking_lot::RwLock;
use query::exec::Executor;
use schema::Schema;
use std::{collections::HashMap, sync::Arc};
@@ -19,71 +17,72 @@ mod test_util;
/// Maps a catalog namespace to all the in-memory resources and sync-state that the querier needs.
///
/// # Data Structures & Sync
/// Tables and schemas are synced ahead of usage (via [`sync`](Self::sync)) because DataFusion does not implement async
/// Tables and schemas are created when [`QuerierNamespace`] is created because DataFusion does not implement async
/// schema inspection. The actual payload (chunks and tombstones) is only queried on demand.
///
/// Most accesses to the [IOx Catalog](Catalog) are cached.
#[derive(Debug)]
pub struct QuerierNamespace {
/// Backoff config for IO operations.
backoff_config: BackoffConfig,
/// The catalog.
catalog: Arc<dyn Catalog>,
/// Catalog IO cache.
catalog_cache: Arc<CatalogCache>,
/// Tables in this namespace.
tables: RwLock<Arc<HashMap<Arc<str>, Arc<QuerierTable>>>>,
/// Adapter to create chunks.
chunk_adapter: Arc<ParquetChunkAdapter>,
/// ID of this namespace.
id: NamespaceId,
/// Name of this namespace.
name: Arc<str>,
/// Tables in this namespace.
tables: Arc<HashMap<Arc<str>, Arc<QuerierTable>>>,
/// Executor for queries.
exec: Arc<Executor>,
}
impl QuerierNamespace {
/// Create new, empty namespace.
///
/// You may call [`sync`](Self::sync) to fill the namespace with chunks.
/// Create new namespace for given schema.
pub fn new(
backoff_config: BackoffConfig,
chunk_adapter: Arc<ParquetChunkAdapter>,
schema: Arc<NamespaceSchema>,
name: Arc<str>,
id: NamespaceId,
exec: Arc<Executor>,
) -> Self {
let catalog_cache = Arc::clone(chunk_adapter.catalog_cache());
let catalog = catalog_cache.catalog();
let tables: HashMap<_, _> = schema
.tables
.iter()
.map(|(name, table_schema)| {
let name = Arc::from(name.clone());
let id = table_schema.id;
let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema");
let table = Arc::new(QuerierTable::new(
backoff_config.clone(),
id,
Arc::clone(&name),
Arc::new(schema),
Arc::clone(&chunk_adapter),
));
(name, table)
})
.collect();
let id = schema.id;
Self {
backoff_config,
catalog,
catalog_cache: Arc::clone(&catalog_cache),
tables: RwLock::new(Arc::new(HashMap::new())),
chunk_adapter,
id,
name,
tables: Arc::new(tables),
exec,
}
}
/// Create new empty namespace for testing.
/// Create new namespace for given schema, for testing.
pub fn new_testing(
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
metric_registry: Arc<metric::Registry>,
time_provider: Arc<dyn TimeProvider>,
name: Arc<str>,
id: NamespaceId,
schema: Arc<NamespaceSchema>,
exec: Arc<Executor>,
) -> Self {
let catalog_cache = Arc::new(CatalogCache::new(catalog, Arc::clone(&time_provider)));
@@ -94,124 +93,44 @@ impl QuerierNamespace {
time_provider,
));
Self::new(BackoffConfig::default(), chunk_adapter, name, id, exec)
Self::new(BackoffConfig::default(), chunk_adapter, schema, name, exec)
}
/// Namespace name.
pub fn name(&self) -> Arc<str> {
Arc::clone(&self.name)
}
/// Sync partial namespace state.
///
/// This includes:
/// - tables
/// - schemas
///
/// Chunks and tombstones are queried on-demand.
///
/// Should be called regularly.
pub async fn sync(&self) {
let catalog_schema_desired = match self
.catalog_cache
.namespace()
.schema(Arc::clone(&self.name))
.await
{
Some(schema) => schema,
None => {
warn!(
namespace = self.name.as_ref(),
"Cannot sync namespace because it is gone",
);
return;
}
};
let tables: HashMap<_, _> = catalog_schema_desired
.tables
.iter()
.map(|(name, table_schema)| {
let name = Arc::from(name.clone());
let id = table_schema.id;
let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema");
let table = Arc::new(QuerierTable::new(
self.backoff_config.clone(),
id,
Arc::clone(&name),
Arc::new(schema),
Arc::clone(&self.chunk_adapter),
));
(name, table)
})
.collect();
*self.tables.write() = Arc::new(tables);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{cache::namespace::TTL_EXISTING, namespace::test_util::querier_namespace};
use crate::namespace::test_util::querier_namespace;
use data_types2::ColumnType;
use iox_tests::util::TestCatalog;
use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
#[tokio::test]
async fn test_sync_namespace_gone() {
let catalog = TestCatalog::new();
let catalog_cache = Arc::new(CatalogCache::new(
catalog.catalog(),
catalog.time_provider(),
));
let chunk_adapter = Arc::new(ParquetChunkAdapter::new(
catalog_cache,
catalog.object_store(),
catalog.metric_registry(),
catalog.time_provider(),
));
let querier_namespace = QuerierNamespace::new(
BackoffConfig::default(),
chunk_adapter,
"ns".into(),
NamespaceId::new(1),
catalog.exec(),
);
// The container (`QuerierDatabase`) should prune the namespace if it's gone; however, the `sync` might still be
// in progress and must not block or panic.
querier_namespace.sync().await;
}
#[tokio::test]
async fn test_sync_tables() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let querier_namespace = querier_namespace(&ns);
querier_namespace.sync().await;
assert_eq!(tables(&querier_namespace), Vec::<String>::new());
let qns = querier_namespace(&ns).await;
assert_eq!(tables(&qns), Vec::<String>::new());
ns.create_table("table1").await;
ns.create_table("table2").await;
catalog.mock_time_provider().inc(TTL_EXISTING);
querier_namespace.sync().await;
let qns = querier_namespace(&ns).await;
assert_eq!(
tables(&querier_namespace),
tables(&qns),
vec![String::from("table1"), String::from("table2")]
);
ns.create_table("table3").await;
catalog.mock_time_provider().inc(TTL_EXISTING);
querier_namespace.sync().await;
let qns = querier_namespace(&ns).await;
assert_eq!(
tables(&querier_namespace),
tables(&qns),
vec![
String::from("table1"),
String::from("table2"),
@@ -227,31 +146,27 @@ mod tests {
let ns = catalog.create_namespace("ns").await;
let table = ns.create_table("table").await;
let querier_namespace = querier_namespace(&ns);
querier_namespace.sync().await;
let qns = querier_namespace(&ns).await;
let expected_schema = SchemaBuilder::new().build().unwrap();
let actual_schema = schema(&querier_namespace, "table");
let actual_schema = schema(&qns, "table");
assert_eq!(actual_schema.as_ref(), &expected_schema,);
table.create_column("col1", ColumnType::I64).await;
table.create_column("col2", ColumnType::Bool).await;
table.create_column("col3", ColumnType::Tag).await;
catalog.mock_time_provider().inc(TTL_EXISTING);
querier_namespace.sync().await;
let qns = querier_namespace(&ns).await;
let expected_schema = SchemaBuilder::new()
.influx_column("col1", InfluxColumnType::Field(InfluxFieldType::Integer))
.influx_column("col2", InfluxColumnType::Field(InfluxFieldType::Boolean))
.influx_column("col3", InfluxColumnType::Tag)
.build()
.unwrap();
let actual_schema = schema(&querier_namespace, "table");
let actual_schema = schema(&qns, "table");
assert_eq!(actual_schema.as_ref(), &expected_schema,);
table.create_column("col4", ColumnType::Tag).await;
table.create_column("col5", ColumnType::Time).await;
catalog.mock_time_provider().inc(TTL_EXISTING);
querier_namespace.sync().await;
let qns = querier_namespace(&ns).await;
let expected_schema = SchemaBuilder::new()
.influx_column("col1", InfluxColumnType::Field(InfluxFieldType::Integer))
.influx_column("col2", InfluxColumnType::Field(InfluxFieldType::Boolean))
@@ -260,7 +175,7 @@ mod tests {
.influx_column("col5", InfluxColumnType::Timestamp)
.build()
.unwrap();
let actual_schema = schema(&querier_namespace, "table");
let actual_schema = schema(&qns, "table");
assert_eq!(actual_schema.as_ref(), &expected_schema,);
}
@@ -276,7 +191,6 @@ mod tests {
sorted(
querier_namespace
.tables
.read()
.keys()
.map(|s| s.to_string())
.collect(),
@@ -284,6 +198,6 @@
}
fn schema(querier_namespace: &QuerierNamespace, table: &str) -> Arc<Schema> {
Arc::clone(querier_namespace.tables.read().get(table).unwrap().schema())
Arc::clone(querier_namespace.tables.get(table).unwrap().schema())
}
}


@@ -17,16 +17,13 @@ use crate::{namespace::QuerierNamespace, table::QuerierTable};
impl QueryDatabaseMeta for QuerierNamespace {
fn table_names(&self) -> Vec<String> {
let mut names: Vec<_> = self.tables.read().keys().map(|s| s.to_string()).collect();
let mut names: Vec<_> = self.tables.keys().map(|s| s.to_string()).collect();
names.sort();
names
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.tables
.read()
.get(table_name)
.map(|t| Arc::clone(t.schema()))
self.tables.get(table_name).map(|t| Arc::clone(t.schema()))
}
}
@@ -34,7 +31,7 @@ impl QueryDatabaseMeta for QuerierNamespace {
impl QueryDatabase for QuerierNamespace {
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<dyn QueryChunk>> {
// get table metadata
let table = match self.tables.read().get(table_name).map(Arc::clone) {
let table = match self.tables.get(table_name).map(Arc::clone) {
Some(table) => table,
None => {
// table gone
@@ -85,7 +82,7 @@ pub struct QuerierCatalogProvider {
impl QuerierCatalogProvider {
fn from_namespace(namespace: &QuerierNamespace) -> Self {
Self {
tables: Arc::clone(&namespace.tables.read()),
tables: Arc::clone(&namespace.tables),
}
}
}
@@ -249,8 +246,7 @@ mod tests {
.create_tombstone(1, 1, 13, "host=d")
.await;
let querier_namespace = Arc::new(querier_namespace(&ns));
querier_namespace.sync().await;
let querier_namespace = Arc::new(querier_namespace(&ns).await);
assert_query(
&querier_namespace,


@@ -1,18 +1,26 @@
use std::sync::Arc;
use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::TestNamespace;
use super::QuerierNamespace;
/// Create [`QuerierNamespace`] for testing.
pub fn querier_namespace(ns: &Arc<TestNamespace>) -> QuerierNamespace {
pub async fn querier_namespace(ns: &Arc<TestNamespace>) -> QuerierNamespace {
let mut repos = ns.catalog.catalog.repositories().await;
let schema = Arc::new(
get_schema_by_name(&ns.namespace.name, repos.as_mut())
.await
.unwrap(),
);
QuerierNamespace::new_testing(
ns.catalog.catalog(),
ns.catalog.object_store(),
ns.catalog.metric_registry(),
ns.catalog.time_provider(),
ns.namespace.name.clone().into(),
ns.namespace.id,
schema,
ns.catalog.exec(),
)
}


@@ -8,10 +8,7 @@ use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use pin_project::pin_project;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PoisonPill {
DatabaseSyncPanic,
DatabaseSyncExit,
}
pub enum PoisonPill {}
#[derive(Debug)]
struct PoisonCabinetInner {


@@ -12,6 +12,7 @@ async-trait = "0.1"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
db = { path = "../db" }
iox_catalog = { path = "../iox_catalog" }
iox_tests = { path = "../iox_tests" }
itertools = "0.10"
mutable_batch_lp = { path = "../mutable_batch_lp" }


@@ -7,6 +7,7 @@ use db::{
utils::{count_mub_table_chunks, count_os_table_chunks, count_rub_table_chunks, make_db},
Db,
};
use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::{TestCatalog, TestNamespace};
use itertools::Itertools;
use querier::QuerierNamespace;
@@ -1227,16 +1228,20 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
}
async fn make_querier_namespace(ns: Arc<TestNamespace>) -> Arc<QuerierNamespace> {
let db = Arc::new(QuerierNamespace::new_testing(
let mut repos = ns.catalog.catalog.repositories().await;
let schema = Arc::new(
get_schema_by_name(&ns.namespace.name, repos.as_mut())
.await
.unwrap(),
);
Arc::new(QuerierNamespace::new_testing(
ns.catalog.catalog(),
ns.catalog.object_store(),
ns.catalog.metric_registry(),
ns.catalog.time_provider(),
ns.namespace.name.clone().into(),
ns.namespace.id,
schema,
ns.catalog.exec(),
));
db.sync().await;
db
))
}