refactor: carry namespace name in NamespaceData

Changes the ingester's NamespaceData to carry a ref-counted string
identifier as well as the ID.

The backing storage for the name in NamespaceData is shared with the
index map in ShardData, so it is effectively free!
pull/24376/head
Dom Dwyer 2022-10-04 15:41:25 +02:00
parent e81dad972f
commit abb9122e2c
6 changed files with 84 additions and 26 deletions

View File

@ -815,7 +815,7 @@ mod tests {
let (table_id, partition_id) = {
let sd = data.shards.get(&shard1.id).unwrap();
let n = sd.namespace("foo").unwrap();
let n = sd.namespace(&"foo".into()).unwrap();
let mem_table = n.table_data("mem").unwrap();
assert!(n.table_data("mem").is_some());
let mem_table = mem_table.write().await;
@ -957,7 +957,7 @@ mod tests {
assert_progress(&data, shard_index, expected_progress).await;
let sd = data.shards.get(&shard1.id).unwrap();
let n = sd.namespace("foo").unwrap();
let n = sd.namespace(&"foo".into()).unwrap();
let partition_id;
let table_id;
{
@ -1193,7 +1193,7 @@ mod tests {
// Get the namespace
let sd = data.shards.get(&shard1.id).unwrap();
let n = sd.namespace("foo").unwrap();
let n = sd.namespace(&"foo".into()).unwrap();
let expected_progress = ShardProgress::new().with_buffered(SequenceNumber::new(1));
assert_progress(&data, shard_index, expected_progress).await;
@ -1356,7 +1356,13 @@ mod tests {
let partition_provider = Arc::new(CatalogPartitionResolver::new(Arc::clone(&catalog)));
let data = NamespaceData::new(namespace.id, shard.id, partition_provider, &*metrics);
let data = NamespaceData::new(
namespace.id,
"foo".into(),
shard.id,
partition_provider,
&*metrics,
);
// w1 should be ignored because the per-partition replay offset is set
// to 1 already, so it shouldn't be buffered and the buffer should
@ -1473,7 +1479,7 @@ mod tests {
assert_eq!(
data.shard(shard1.id)
.unwrap()
.namespace(&namespace.name)
.namespace(&namespace.name.clone().into())
.unwrap()
.table_data("mem")
.unwrap()
@ -1505,7 +1511,7 @@ mod tests {
assert_eq!(
data.shard(shard1.id)
.unwrap()
.namespace(&namespace.name)
.namespace(&namespace.name.into())
.unwrap()
.table_data("mem")
.unwrap()

View File

@ -44,11 +44,37 @@ impl DoubleRef {
}
}
/// The string name / identifier of a Namespace.
///
/// A reference-counted, cheap clone-able string.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct NamespaceName(Arc<str>);
impl<T> From<T> for NamespaceName
where
T: AsRef<str>,
{
fn from(v: T) -> Self {
Self(Arc::from(v.as_ref()))
}
}
impl std::ops::Deref for NamespaceName {
type Target = str;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Data of a Namespace that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct NamespaceData {
namespace_id: NamespaceId,
#[allow(dead_code)]
namespace_name: NamespaceName,
/// The catalog ID of the shard this namespace is being populated from.
shard_id: ShardId,
@ -111,6 +137,7 @@ impl NamespaceData {
/// Initialize new tables with default partition template of daily
pub fn new(
namespace_id: NamespaceId,
namespace_name: NamespaceName,
shard_id: ShardId,
partition_provider: Arc<dyn PartitionProvider>,
metrics: &metric::Registry,
@ -124,6 +151,7 @@ impl NamespaceData {
Self {
namespace_id,
namespace_name,
shard_id,
tables: Default::default(),
table_count,
@ -353,6 +381,12 @@ impl NamespaceData {
pub(super) fn table_count(&self) -> &U64Counter {
&self.table_count
}
/// Returns the [`NamespaceName`] for this namespace.
#[cfg(test)]
pub(crate) fn namespace_name(&self) -> &NamespaceName {
&self.namespace_name
}
}
/// RAAI struct that sets buffering sequence number on creation and clears it on free
@ -432,7 +466,16 @@ mod tests {
),
));
let ns = NamespaceData::new(ns_id, shard_id, partition_provider, &*metrics);
let ns = NamespaceData::new(
ns_id,
NAMESPACE_NAME.into(),
shard_id,
partition_provider,
&*metrics,
);
// Assert the namespace name was stored
assert_eq!(&**ns.namespace_name(), NAMESPACE_NAME);
// Assert the namespace does not contain the test data
assert!(ns.table_data(TABLE_NAME).is_none());

View File

@ -11,7 +11,10 @@ use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use write_summary::ShardProgress;
use super::{namespace::NamespaceData, partition::resolver::PartitionProvider};
use super::{
namespace::{NamespaceData, NamespaceName},
partition::resolver::PartitionProvider,
};
use crate::lifecycle::LifecycleHandle;
/// A double-referenced map where [`NamespaceData`] can be looked up by name, or
@ -19,12 +22,12 @@ use crate::lifecycle::LifecycleHandle;
#[derive(Debug, Default)]
struct DoubleRef {
// TODO(4880): this can be removed when IDs are sent over the wire.
by_name: HashMap<String, Arc<NamespaceData>>,
by_name: HashMap<NamespaceName, Arc<NamespaceData>>,
by_id: HashMap<NamespaceId, Arc<NamespaceData>>,
}
impl DoubleRef {
fn insert(&mut self, name: String, ns: NamespaceData) -> Arc<NamespaceData> {
fn insert(&mut self, name: NamespaceName, ns: NamespaceData) -> Arc<NamespaceData> {
let id = ns.namespace_id();
let ns = Arc::new(ns);
@ -33,7 +36,7 @@ impl DoubleRef {
ns
}
fn by_name(&self, name: &str) -> Option<Arc<NamespaceData>> {
fn by_name(&self, name: &NamespaceName) -> Option<Arc<NamespaceData>> {
self.by_name.get(name).map(Arc::clone)
}
@ -99,7 +102,7 @@ impl ShardData {
lifecycle_handle: &dyn LifecycleHandle,
executor: &Executor,
) -> Result<bool, super::Error> {
let namespace_data = match self.namespace(dml_operation.namespace()) {
let namespace_data = match self.namespace(&NamespaceName::from(dml_operation.namespace())) {
Some(d) => d,
None => {
self.insert_namespace(dml_operation.namespace(), &**catalog)
@ -113,7 +116,7 @@ impl ShardData {
}
/// Gets the namespace data out of the map
pub(crate) fn namespace(&self, namespace: &str) -> Option<Arc<NamespaceData>> {
pub(crate) fn namespace(&self, namespace: &NamespaceName) -> Option<Arc<NamespaceData>> {
let n = self.namespaces.read();
n.by_name(namespace)
}
@ -136,6 +139,8 @@ impl ShardData {
catalog: &dyn Catalog,
) -> Result<Arc<NamespaceData>, super::Error> {
let mut repos = catalog.repositories().await;
let ns_name = NamespaceName::from(namespace);
let namespace = repos
.namespaces()
.get_by_name(namespace)
@ -145,16 +150,17 @@ impl ShardData {
let mut n = self.namespaces.write();
Ok(match n.by_name(&namespace.name) {
Ok(match n.by_name(&ns_name) {
Some(v) => v,
None => {
self.namespace_count.inc(1);
// Insert the table and then return a ref to it.
n.insert(
namespace.name,
ns_name.clone(),
NamespaceData::new(
namespace.id,
ns_name,
self.shard_id,
Arc::clone(&self.partition_provider),
&*self.metrics,
@ -240,7 +246,7 @@ mod tests {
);
// Assert the namespace does not contain the test data
assert!(shard.namespace(NAMESPACE_NAME).is_none());
assert!(shard.namespace(&NAMESPACE_NAME.into()).is_none());
assert!(shard.namespace_by_id(ns_id).is_none());
// Write some test data
@ -261,7 +267,7 @@ mod tests {
.expect("buffer op should succeed");
// Both forms of referencing the table should succeed
assert!(shard.namespace(NAMESPACE_NAME).is_some());
assert!(shard.namespace(&NAMESPACE_NAME.into()).is_some());
assert!(shard.namespace_by_id(ns_id).is_some());
// And the table counter metric should increase

View File

@ -499,11 +499,12 @@ mod tests {
// give the writes some time to go through the buffer. Exit once we've verified there's
// data in there from both writes.
tokio::time::timeout(Duration::from_secs(2), async {
let ns_name = ingester.namespace.name.into();
loop {
let mut has_measurement = false;
if let Some(data) = ingester.ingester.data.shard(ingester.shard.id) {
if let Some(data) = data.namespace(&ingester.namespace.name) {
if let Some(data) = data.namespace(&ns_name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot("a", &"1970-01-01".into()).await {
if let Some(b) = b.first() {
@ -740,11 +741,12 @@ mod tests {
// give the writes some time to go through the buffer. Exit once we've verified there's
// data in there
tokio::time::timeout(Duration::from_secs(1), async move {
let ns_name = namespace.name.into();
loop {
let mut has_measurement = false;
if let Some(data) = ingester.data.shard(shard.id) {
if let Some(data) = data.namespace(&namespace.name) {
if let Some(data) = data.namespace(&ns_name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot("cpu", &"1970-01-01".into()).await {
if let Some(b) = b.first() {

View File

@ -12,8 +12,8 @@ use snafu::{ensure, Snafu};
use crate::{
data::{
partition::UnpersistedPartitionData, IngesterData, IngesterQueryPartition,
IngesterQueryResponse,
namespace::NamespaceName, partition::UnpersistedPartitionData, IngesterData,
IngesterQueryPartition, IngesterQueryResponse,
},
query::QueryableBatch,
};
@ -57,7 +57,8 @@ pub async fn prepare_data_to_querier(
let mut found_namespace = false;
for (shard_id, shard_data) in ingest_data.shards() {
debug!(shard_id=%shard_id.get());
let namespace_data = match shard_data.namespace(&request.namespace) {
let namespace_name = NamespaceName::from(&request.namespace);
let namespace_data = match shard_data.namespace(&namespace_name) {
Some(namespace_data) => {
debug!(namespace=%request.namespace, "found namespace");
found_namespace = true;

View File

@ -655,7 +655,7 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation)
let _ignored = ingester
.shard(shard_id)
.unwrap()
.namespace(TEST_NAMESPACE)
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot_to_persisting(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1))
.await;
@ -664,7 +664,7 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation)
let _ignored = ingester
.shard(shard_id)
.unwrap()
.namespace(TEST_NAMESPACE)
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1))
.await;
@ -824,7 +824,7 @@ async fn make_one_partition_with_tombstones(
let _ignored = ingester
.shard(shard_id)
.unwrap()
.namespace(TEST_NAMESPACE)
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot_to_persisting(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1))
.await;
@ -833,7 +833,7 @@ async fn make_one_partition_with_tombstones(
let _ignored = ingester
.shard(shard_id)
.unwrap()
.namespace(TEST_NAMESPACE)
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1))
.await;