Merge pull request #4446 from influxdata/dom/schema-cache-warm

feat(router2): pre-warm namespace schema cache
pull/24376/head
kodiakhq[bot] 2022-04-29 13:27:50 +00:00 committed by GitHub
commit a81fc6f641
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 298 additions and 9 deletions

View File

@ -10,7 +10,12 @@ use data_types2::{
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc};
use std::{
collections::{BTreeMap, HashMap, HashSet},
convert::TryFrom,
fmt::Debug,
sync::Arc,
};
use uuid::Uuid;
#[derive(Debug, Snafu)]
@ -297,6 +302,9 @@ pub trait TableRepo: Send + Sync {
/// Lists all tables in the catalog for the given namespace id.
async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Table>>;
/// List all tables.
async fn list(&mut self) -> Result<Vec<Table>>;
/// Gets the table persistence info for the given sequencer
async fn get_table_persist_info(
&mut self,
@ -354,6 +362,9 @@ pub trait ColumnRepo: Send + Sync {
/// Lists all columns in the passed in namespace id.
async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Column>>;
/// List all columns.
async fn list(&mut self) -> Result<Vec<Column>>;
}
/// Functions for working with sequencers in the catalog
@ -663,13 +674,112 @@ where
Ok(namespace)
}
/// Fetch all [`NamespaceSchema`] in the catalog.
///
/// This method performs the minimal number of queries needed to build the
/// result set. No table lock is obtained, nor are queries executed within a
/// transaction, but this method does return a point-in-time snapshot of the
/// catalog state.
pub async fn list_schemas(
catalog: &dyn Catalog,
) -> Result<impl Iterator<Item = (Namespace, NamespaceSchema)>> {
let mut repos = catalog.repositories().await;
// In order to obtain a point-in-time snapshot, first fetch the columns,
// then the tables, and then resolve the namespace IDs to Namespace in order
// to construct the schemas.
//
// The set of columns returned forms the state snapshot, with the subsequent
// queries resolving only what is needed to construct schemas for the
// retrieved columns (ignoring any newly added tables/namespaces since the
// column snapshot was taken).
// First fetch all the columns - this is the state snapshot of the catalog
// schemas.
let columns = repos.columns().list().await?;
// Construct the set of table IDs these columns belong to.
let retain_table_ids = columns.iter().map(|c| c.table_id).collect::<HashSet<_>>();
// Fetch all tables, and filter for those that are needed to construct
// schemas for "columns" only.
//
// Discard any tables that have no columns or have been created since
// the "columns" snapshot was retrieved, and construct a map of ID->Table.
let tables = repos
.tables()
.list()
.await?
.into_iter()
.filter_map(|t| {
if !retain_table_ids.contains(&t.id) {
return None;
}
Some((t.id, t))
})
.collect::<HashMap<_, _>>();
// Drop the table ID set as it will not be referenced again.
drop(retain_table_ids);
// Do all the I/O to fetch the namespaces in the background, while this
// thread constructs the NamespaceId->TableSchema map below.
let namespaces = tokio::spawn(async move { repos.namespaces().list().await });
// A set of tables within a single namespace.
type NamespaceTables = BTreeMap<String, TableSchema>;
let mut joined = HashMap::<NamespaceId, NamespaceTables>::default();
for column in columns {
// Resolve the table this column references
let table = tables.get(&column.table_id).expect("no table for column");
let table_schema = joined
// Find or create a record in the joined <NamespaceId, Tables> map
// for this namespace ID.
.entry(table.namespace_id)
.or_default()
// Fetch the schema record for this table, or create an empty one.
.entry(table.name.clone())
.or_insert_with(|| TableSchema::new(column.table_id));
table_schema.add_column(&column);
}
// The table map is no longer needed - immediately reclaim the memory.
drop(tables);
// Convert the Namespace instances into NamespaceSchema instances.
let iter = namespaces
.await
.expect("namespace list task panicked")?
.into_iter()
// Ignore any namespaces that did not exist when the "columns" snapshot
// was created, or have no tables/columns (and therefore have no entry
// in "joined").
.filter_map(move |v| {
let mut ns = NamespaceSchema::new(v.id, v.kafka_topic_id, v.query_pool_id);
ns.tables = joined.remove(&v.id)?;
Some((v, ns))
});
Ok(iter)
}
#[cfg(test)]
pub(crate) mod test_helpers {
use crate::validate_or_insert_schema;
use super::*;
use ::test_helpers::{assert_contains, tracing::TracingCapture};
use data_types2::ColumnId;
use metric::{Attributes, Metric, U64Histogram};
use std::{ops::Add, sync::Arc, time::Duration};
use std::{
ops::{Add, DerefMut},
sync::Arc,
time::Duration,
};
pub(crate) async fn test_catalog(catalog: Arc<dyn Catalog>) {
test_setup(Arc::clone(&catalog)).await;
@ -690,6 +800,7 @@ pub(crate) mod test_helpers {
test_list_by_partiton_not_to_delete(Arc::clone(&catalog)).await;
test_txn_isolation(Arc::clone(&catalog)).await;
test_txn_drop(Arc::clone(&catalog)).await;
test_list_schemas(Arc::clone(&catalog)).await;
let metrics = catalog.metrics();
assert_metric_hit(&*metrics, "kafka_create_or_get");
@ -904,18 +1015,24 @@ pub(crate) mod test_helpers {
.tables()
.get_by_namespace_and_name(namespace2.id, "test_table")
.await
.unwrap(),
Some(test_table)
.unwrap()
.as_ref(),
Some(&test_table)
);
assert_eq!(
repos
.tables()
.get_by_namespace_and_name(namespace2.id, "foo")
.await
.unwrap(),
Some(foo_table)
.unwrap()
.as_ref(),
Some(&foo_table)
);
// All tables should be returned by list(), regardless of namespace
let list = repos.tables().list().await.unwrap();
assert_eq!(list.as_slice(), [tt, test_table, foo_table]);
// test we can get table persistence info with no persistence so far
let seq = repos
.sequencers()
@ -1070,6 +1187,10 @@ pub(crate) mod test_helpers {
want.extend(cols3);
assert_eq!(want, columns);
// Listing columns should return all columns in the catalog
let list = repos.columns().list().await.unwrap();
assert_eq!(list, want);
// test per-namespace column limits
repos
.namespaces()
@ -2885,6 +3006,73 @@ pub(crate) mod test_helpers {
txn.abort().await.unwrap();
}
/// Upsert a namespace called `namespace_name` and write `lines` to it.
async fn populate_namespace<R>(
repos: &mut R,
namespace_name: &str,
lines: &str,
) -> (Namespace, NamespaceSchema)
where
R: RepoCollection + ?Sized,
{
let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create(namespace_name, "inf", kafka.id, pool.id)
.await;
let namespace = match namespace {
Ok(v) => v,
Err(Error::NameExists { .. }) => repos
.namespaces()
.get_by_name(namespace_name)
.await
.unwrap()
.unwrap(),
e @ Err(_) => e.unwrap(),
};
let batches = mutable_batch_lp::lines_to_batches(lines, 42).unwrap();
let batches = batches.iter().map(|(table, batch)| (table.as_str(), batch));
let ns = NamespaceSchema::new(namespace.id, kafka.id, pool.id);
let schema = validate_or_insert_schema(batches, &ns, repos)
.await
.expect("validate schema failed")
.unwrap_or(ns);
(namespace, schema)
}
async fn test_list_schemas(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let ns1 = populate_namespace(
repos.deref_mut(),
"ns1",
"cpu,tag=1 field=1i\nanother,tag=1 field=1.0",
)
.await;
let ns2 = populate_namespace(
repos.deref_mut(),
"ns2",
"cpu,tag=1 field=1i\nsomethingelse field=1u",
)
.await;
// Otherwise the in-mem catalog deadlocks.... (but not postgres)
drop(repos);
let got = list_schemas(&*catalog)
.await
.expect("should be able to list the schemas")
.collect::<Vec<_>>();
assert!(got.contains(&ns1), "{:#?}\n\nwant{:#?}", got, &ns1);
assert!(got.contains(&ns2), "{:#?}\n\nwant{:#?}", got, &ns2);
}
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")

View File

@ -247,7 +247,7 @@ mod tests {
let schema = {
let lp: String = $lp.to_string();
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp.as_str(), 42)
let writes = mutable_batch_lp::lines_to_batches(lp.as_str(), 42)
.expect("failed to build test writes from LP");
let got = validate_or_insert_schema(writes.iter().map(|(k, v)| (k.as_str(), v)), &schema, txn.deref_mut())

View File

@ -437,6 +437,11 @@ impl TableRepo for MemTxn {
Ok(tables)
}
async fn list(&mut self) -> Result<Vec<Table>> {
let stage = self.stage();
Ok(stage.tables.clone())
}
async fn get_table_persist_info(
&mut self,
sequencer_id: SequencerId,
@ -577,6 +582,11 @@ impl ColumnRepo for MemTxn {
Ok(columns)
}
async fn list(&mut self) -> Result<Vec<Column>> {
let stage = self.stage();
Ok(stage.columns.clone())
}
}
#[async_trait]

View File

@ -214,6 +214,7 @@ decorate!(
"table_get_by_namespace_and_name" = get_by_namespace_and_name(&mut self, namespace_id: NamespaceId, name: &str) -> Result<Option<Table>>;
"table_list_by_namespace_id" = list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Table>>;
"get_table_persist_info" = get_table_persist_info(&mut self, sequencer_id: SequencerId, namespace_id: NamespaceId, table_name: &str) -> Result<Option<TablePersistInfo>>;
"table_list" = list(&mut self) -> Result<Vec<Table>>;
]
);
@ -223,6 +224,7 @@ decorate!(
"column_create_or_get" = create_or_get(&mut self, name: &str, table_id: TableId, column_type: ColumnType) -> Result<Column>;
"column_list_by_namespace_id" = list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Column>>;
"column_create_or_get_many" = create_or_get_many(&mut self, columns: &[ColumnUpsertRequest<'_>]) -> Result<Vec<Column>>;
"column_list" = list(&mut self) -> Result<Vec<Column>>;
]
);

View File

@ -815,6 +815,15 @@ WHERE namespace_id = $1;
Ok(rec)
}
async fn list(&mut self) -> Result<Vec<Table>> {
let rec = sqlx::query_as::<_, Table>("SELECT * FROM table_name;")
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
async fn get_table_persist_info(
&mut self,
sequencer_id: SequencerId,
@ -921,6 +930,15 @@ WHERE table_name.namespace_id = $1;
Ok(rec)
}
async fn list(&mut self) -> Result<Vec<Column>> {
let rec = sqlx::query_as::<_, Column>("SELECT * FROM column_name;")
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
async fn create_or_get_many(
&mut self,
columns: &[ColumnUpsertRequest<'_>],

View File

@ -6,7 +6,7 @@ use std::{
use async_trait::async_trait;
use clap_blocks::write_buffer::WriteBufferConfig;
use data_types2::{PartitionTemplate, TemplatePart};
use data_types2::{DatabaseName, PartitionTemplate, TemplatePart};
use hashbrown::HashMap;
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
@ -19,7 +19,9 @@ use router2::{
NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer,
WriteSummaryAdapter,
},
namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, ShardedCache},
namespace_cache::{
metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache,
},
sequencer::Sequencer,
server::{grpc::GrpcDelegate, http::HttpDelegate, RouterServer},
sharder::JumpHash,
@ -179,6 +181,10 @@ pub async fn create_router2_server_type(
&*metrics,
));
pre_warm_schema_cache(&ns_cache, &*catalog)
.await
.expect("namespace cache pre-warming failed");
// Initialise and instrument the schema validator
let schema_validator =
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics);
@ -323,3 +329,68 @@ async fn init_write_buffer(
.collect::<JumpHash<_>>(),
))
}
/// Pre-populate `cache` with the all existing schemas in `catalog`.
async fn pre_warm_schema_cache<T>(
cache: &T,
catalog: &dyn Catalog,
) -> Result<(), iox_catalog::interface::Error>
where
T: NamespaceCache,
{
iox_catalog::interface::list_schemas(catalog)
.await?
.for_each(|(ns, schema)| {
let name = DatabaseName::try_from(ns.name)
.expect("cannot convert existing namespace name to database name");
cache.put_schema(name, schema);
});
Ok(())
}
#[cfg(test)]
mod tests {
use data_types2::ColumnType;
use iox_catalog::mem::MemCatalog;
use super::*;
#[tokio::test]
async fn test_pre_warm_cache() {
let catalog = Arc::new(MemCatalog::new(Default::default()));
let mut repos = catalog.repositories().await;
let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create("test_ns", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("name", namespace.id)
.await
.unwrap();
let _column = repos
.columns()
.create_or_get("name", table.id, ColumnType::U64)
.await
.unwrap();
drop(repos); // Or it'll deadlock.
let cache = Arc::new(MemoryNamespaceCache::default());
pre_warm_schema_cache(&cache, &*catalog)
.await
.expect("pre-warming failed");
let name = DatabaseName::new("test_ns").unwrap();
let got = cache.get_schema(&name).expect("should contain a schema");
assert!(got.tables.get("name").is_some());
}
}