diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 89486ed2df..5bbd422ca7 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -815,7 +815,7 @@ mod tests { let (table_id, partition_id) = { let sd = data.shards.get(&shard1.id).unwrap(); - let n = sd.namespace("foo").unwrap(); + let n = sd.namespace(&"foo".into()).unwrap(); let mem_table = n.table_data("mem").unwrap(); assert!(n.table_data("mem").is_some()); let mem_table = mem_table.write().await; @@ -957,7 +957,7 @@ mod tests { assert_progress(&data, shard_index, expected_progress).await; let sd = data.shards.get(&shard1.id).unwrap(); - let n = sd.namespace("foo").unwrap(); + let n = sd.namespace(&"foo".into()).unwrap(); let partition_id; let table_id; { @@ -1193,7 +1193,7 @@ mod tests { // Get the namespace let sd = data.shards.get(&shard1.id).unwrap(); - let n = sd.namespace("foo").unwrap(); + let n = sd.namespace(&"foo".into()).unwrap(); let expected_progress = ShardProgress::new().with_buffered(SequenceNumber::new(1)); assert_progress(&data, shard_index, expected_progress).await; @@ -1356,7 +1356,13 @@ mod tests { let partition_provider = Arc::new(CatalogPartitionResolver::new(Arc::clone(&catalog))); - let data = NamespaceData::new(namespace.id, shard.id, partition_provider, &*metrics); + let data = NamespaceData::new( + namespace.id, + "foo".into(), + shard.id, + partition_provider, + &*metrics, + ); // w1 should be ignored because the per-partition replay offset is set // to 1 already, so it shouldn't be buffered and the buffer should @@ -1473,7 +1479,7 @@ mod tests { assert_eq!( data.shard(shard1.id) .unwrap() - .namespace(&namespace.name) + .namespace(&namespace.name.clone().into()) .unwrap() .table_data("mem") .unwrap() @@ -1505,7 +1511,7 @@ mod tests { assert_eq!( data.shard(shard1.id) .unwrap() - .namespace(&namespace.name) + .namespace(&namespace.name.into()) .unwrap() .table_data("mem") .unwrap() diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index 987dad6c27..418b38c6db 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -44,11 +44,37 @@ impl DoubleRef { } } +/// The string name / identifier of a Namespace. +/// +/// A reference-counted, cheap clone-able string. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct NamespaceName(Arc); + +impl From for NamespaceName +where + T: AsRef, +{ + fn from(v: T) -> Self { + Self(Arc::from(v.as_ref())) + } +} + +impl std::ops::Deref for NamespaceName { + type Target = str; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// Data of a Namespace that belongs to a given Shard #[derive(Debug)] pub(crate) struct NamespaceData { namespace_id: NamespaceId, + #[allow(dead_code)] + namespace_name: NamespaceName, + /// The catalog ID of the shard this namespace is being populated from. shard_id: ShardId, @@ -111,6 +137,7 @@ impl NamespaceData { /// Initialize new tables with default partition template of daily pub fn new( namespace_id: NamespaceId, + namespace_name: NamespaceName, shard_id: ShardId, partition_provider: Arc, metrics: &metric::Registry, @@ -124,6 +151,7 @@ impl NamespaceData { Self { namespace_id, + namespace_name, shard_id, tables: Default::default(), table_count, @@ -353,6 +381,12 @@ impl NamespaceData { pub(super) fn table_count(&self) -> &U64Counter { &self.table_count } + + /// Returns the [`NamespaceName`] for this namespace. + #[cfg(test)] + pub(crate) fn namespace_name(&self) -> &NamespaceName { + &self.namespace_name + } } /// RAAI struct that sets buffering sequence number on creation and clears it on free @@ -432,7 +466,16 @@ mod tests { ), )); - let ns = NamespaceData::new(ns_id, shard_id, partition_provider, &*metrics); + let ns = NamespaceData::new( + ns_id, + NAMESPACE_NAME.into(), + shard_id, + partition_provider, + &*metrics, + ); + + // Assert the namespace name was stored + assert_eq!(&**ns.namespace_name(), NAMESPACE_NAME); // Assert the namespace does not contain the test data assert!(ns.table_data(TABLE_NAME).is_none()); diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs index 3390b2aed8..11432f688c 100644 --- a/ingester/src/data/shard.rs +++ b/ingester/src/data/shard.rs @@ -11,7 +11,10 @@ use parking_lot::RwLock; use snafu::{OptionExt, ResultExt}; use write_summary::ShardProgress; -use super::{namespace::NamespaceData, partition::resolver::PartitionProvider}; +use super::{ + namespace::{NamespaceData, NamespaceName}, + partition::resolver::PartitionProvider, +}; use crate::lifecycle::LifecycleHandle; /// A double-referenced map where [`NamespaceData`] can be looked up by name, or @@ -19,12 +22,12 @@ use crate::lifecycle::LifecycleHandle; #[derive(Debug, Default)] struct DoubleRef { // TODO(4880): this can be removed when IDs are sent over the wire. - by_name: HashMap>, + by_name: HashMap>, by_id: HashMap>, } impl DoubleRef { - fn insert(&mut self, name: String, ns: NamespaceData) -> Arc { + fn insert(&mut self, name: NamespaceName, ns: NamespaceData) -> Arc { let id = ns.namespace_id(); let ns = Arc::new(ns); @@ -33,7 +36,7 @@ impl DoubleRef { ns } - fn by_name(&self, name: &str) -> Option> { + fn by_name(&self, name: &NamespaceName) -> Option> { self.by_name.get(name).map(Arc::clone) } @@ -99,7 +102,7 @@ impl ShardData { lifecycle_handle: &dyn LifecycleHandle, executor: &Executor, ) -> Result { - let namespace_data = match self.namespace(dml_operation.namespace()) { + let namespace_data = match self.namespace(&NamespaceName::from(dml_operation.namespace())) { Some(d) => d, None => { self.insert_namespace(dml_operation.namespace(), &**catalog) @@ -113,7 +116,7 @@ impl ShardData { } /// Gets the namespace data out of the map - pub(crate) fn namespace(&self, namespace: &str) -> Option> { + pub(crate) fn namespace(&self, namespace: &NamespaceName) -> Option> { let n = self.namespaces.read(); n.by_name(namespace) } @@ -136,6 +139,8 @@ impl ShardData { catalog: &dyn Catalog, ) -> Result, super::Error> { let mut repos = catalog.repositories().await; + + let ns_name = NamespaceName::from(namespace); let namespace = repos .namespaces() .get_by_name(namespace) @@ -145,16 +150,17 @@ impl ShardData { let mut n = self.namespaces.write(); - Ok(match n.by_name(&namespace.name) { + Ok(match n.by_name(&ns_name) { Some(v) => v, None => { self.namespace_count.inc(1); // Insert the table and then return a ref to it. n.insert( - namespace.name, + ns_name.clone(), NamespaceData::new( namespace.id, + ns_name, self.shard_id, Arc::clone(&self.partition_provider), &*self.metrics, @@ -240,7 +246,7 @@ mod tests { ); // Assert the namespace does not contain the test data - assert!(shard.namespace(NAMESPACE_NAME).is_none()); + assert!(shard.namespace(&NAMESPACE_NAME.into()).is_none()); assert!(shard.namespace_by_id(ns_id).is_none()); // Write some test data @@ -261,7 +267,7 @@ mod tests { .expect("buffer op should succeed"); // Both forms of referencing the table should succeed - assert!(shard.namespace(NAMESPACE_NAME).is_some()); + assert!(shard.namespace(&NAMESPACE_NAME.into()).is_some()); assert!(shard.namespace_by_id(ns_id).is_some()); // And the table counter metric should increase diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index dde159dc52..7f51190102 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -499,11 +499,12 @@ mod tests { // give the writes some time to go through the buffer. Exit once we've verified there's // data in there from both writes. tokio::time::timeout(Duration::from_secs(2), async { + let ns_name = ingester.namespace.name.into(); loop { let mut has_measurement = false; if let Some(data) = ingester.ingester.data.shard(ingester.shard.id) { - if let Some(data) = data.namespace(&ingester.namespace.name) { + if let Some(data) = data.namespace(&ns_name) { // verify there's data in the buffer if let Some((b, _)) = data.snapshot("a", &"1970-01-01".into()).await { if let Some(b) = b.first() { @@ -740,11 +741,12 @@ mod tests { // give the writes some time to go through the buffer. Exit once we've verified there's // data in there tokio::time::timeout(Duration::from_secs(1), async move { + let ns_name = namespace.name.into(); loop { let mut has_measurement = false; if let Some(data) = ingester.data.shard(shard.id) { - if let Some(data) = data.namespace(&namespace.name) { + if let Some(data) = data.namespace(&ns_name) { // verify there's data in the buffer if let Some((b, _)) = data.snapshot("cpu", &"1970-01-01".into()).await { if let Some(b) = b.first() { diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index d3c8e37e19..cf58daab0c 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -12,8 +12,8 @@ use snafu::{ensure, Snafu}; use crate::{ data::{ - partition::UnpersistedPartitionData, IngesterData, IngesterQueryPartition, - IngesterQueryResponse, + namespace::NamespaceName, partition::UnpersistedPartitionData, IngesterData, + IngesterQueryPartition, IngesterQueryResponse, }, query::QueryableBatch, }; @@ -57,7 +57,8 @@ pub async fn prepare_data_to_querier( let mut found_namespace = false; for (shard_id, shard_data) in ingest_data.shards() { debug!(shard_id=%shard_id.get()); - let namespace_data = match shard_data.namespace(&request.namespace) { + let namespace_name = NamespaceName::from(&request.namespace); + let namespace_data = match shard_data.namespace(&namespace_name) { Some(namespace_data) => { debug!(namespace=%request.namespace, "found namespace"); found_namespace = true; diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 09045083e8..ed3f8b6348 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -655,7 +655,7 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation) let _ignored = ingester .shard(shard_id) .unwrap() - .namespace(TEST_NAMESPACE) + .namespace(&TEST_NAMESPACE.into()) .unwrap() .snapshot_to_persisting(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1)) .await; @@ -664,7 +664,7 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation) let _ignored = ingester .shard(shard_id) .unwrap() - .namespace(TEST_NAMESPACE) + .namespace(&TEST_NAMESPACE.into()) .unwrap() .snapshot(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1)) .await; @@ -824,7 +824,7 @@ async fn make_one_partition_with_tombstones( let _ignored = ingester .shard(shard_id) .unwrap() - .namespace(TEST_NAMESPACE) + .namespace(&TEST_NAMESPACE.into()) .unwrap() .snapshot_to_persisting(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1)) .await; @@ -833,7 +833,7 @@ async fn make_one_partition_with_tombstones( let _ignored = ingester .shard(shard_id) .unwrap() - .namespace(TEST_NAMESPACE) + .namespace(&TEST_NAMESPACE.into()) .unwrap() .snapshot(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1)) .await;