From 97c6e0f8ceac3816dfa1cb0fbe690c911a7ba463 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 4 Oct 2022 17:34:43 +0200 Subject: [PATCH] refactor: use TableName, not Arc Adds a type wrapper TableName, internally an Arc to leverage the type system instead of passing around untyped strings. --- ingester/src/data.rs | 24 ++++++----- ingester/src/data/namespace.rs | 41 +++++++++++------- ingester/src/data/partition.rs | 6 ++- ingester/src/data/partition/buffer.rs | 8 ++-- ingester/src/data/partition/resolver/cache.rs | 7 ++- .../src/data/partition/resolver/catalog.rs | 11 +++-- ingester/src/data/partition/resolver/mock.rs | 6 +-- ingester/src/data/partition/resolver/trait.rs | 18 +++----- ingester/src/data/table.rs | 43 ++++++++++++++++--- ingester/src/handler.rs | 10 +++-- ingester/src/querier_handler.rs | 7 +-- ingester/src/query.rs | 6 +-- ingester/src/test_util.rs | 8 ++-- 13 files changed, 122 insertions(+), 73 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 5bbd422ca7..4d89b8f976 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -34,6 +34,7 @@ pub mod table; use self::{ partition::{resolver::PartitionProvider, PartitionStatus}, shard::ShardData, + table::TableName, }; #[cfg(test)] @@ -449,16 +450,17 @@ impl Persister for IngesterData { .record(file_size as u64); // and remove the persisted data from memory + let table_name = TableName::from(&partition_info.table_name); namespace .mark_persisted( - &partition_info.table_name, + &table_name, &partition_info.partition.partition_key, iox_metadata.max_sequence_number, ) .await; debug!( ?partition_id, - table_name=%partition_info.table_name, + %table_name, partition_key=%partition_info.partition.partition_key, max_sequence_number=%iox_metadata.max_sequence_number.get(), "marked partition as persisted" @@ -816,8 +818,8 @@ mod tests { let (table_id, partition_id) = { let sd = data.shards.get(&shard1.id).unwrap(); let n = sd.namespace(&"foo".into()).unwrap(); - let mem_table = n.table_data("mem").unwrap(); - assert!(n.table_data("mem").is_some()); + let mem_table = n.table_data(&"mem".into()).unwrap(); + assert!(n.table_data(&"mem".into()).is_some()); let mem_table = mem_table.write().await; let p = mem_table .get_partition_by_key(&"1970-01-01".into()) @@ -961,8 +963,8 @@ mod tests { let partition_id; let table_id; { - let mem_table = n.table_data("mem").unwrap(); - assert!(n.table_data("cpu").is_some()); + let mem_table = n.table_data(&"mem".into()).unwrap(); + assert!(n.table_data(&"cpu".into()).is_some()); let mem_table = mem_table.write().await; table_id = mem_table.table_id(); @@ -1077,7 +1079,7 @@ mod tests { .unwrap(); assert_eq!(partition_info.partition.sort_key, vec!["time"]); - let mem_table = n.table_data("mem").unwrap(); + let mem_table = n.table_data(&"mem".into()).unwrap(); let mem_table = mem_table.read().await; // verify that the parquet_max_sequence_number got updated @@ -1372,7 +1374,7 @@ mod tests { .await .unwrap(); { - let table_data = data.table_data("mem").unwrap(); + let table_data = data.table_data(&"mem".into()).unwrap(); let table = table_data.read().await; let p = table.get_partition_by_key(&"1970-01-01".into()).unwrap(); assert_eq!( @@ -1388,7 +1390,7 @@ mod tests { .await .unwrap(); - let table_data = data.table_data("mem").unwrap(); + let table_data = data.table_data(&"mem".into()).unwrap(); let table = table_data.read().await; let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap(); assert_eq!( @@ -1481,7 +1483,7 @@ mod tests { .unwrap() .namespace(&namespace.name.clone().into()) .unwrap() - .table_data("mem") + .table_data(&"mem".into()) .unwrap() .read() .await @@ -1513,7 +1515,7 @@ mod tests { .unwrap() .namespace(&namespace.name.into()) .unwrap() - .table_data("mem") + .table_data(&"mem".into()) .unwrap() .read() .await diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index 4b67e9642c..94013b36c8 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -13,20 +13,23 @@ use write_summary::ShardProgress; #[cfg(test)] use super::triggers::TestTriggers; -use super::{partition::resolver::PartitionProvider, table::TableData}; +use super::{ + partition::resolver::PartitionProvider, + table::{TableData, TableName}, +}; use crate::lifecycle::LifecycleHandle; /// A double-referenced map where [`TableData`] can be looked up by name, or ID. #[derive(Debug, Default)] struct DoubleRef { // TODO(4880): this can be removed when IDs are sent over the wire. - by_name: HashMap, Arc>>, + by_name: HashMap>>, by_id: HashMap>>, } impl DoubleRef { fn insert(&mut self, t: TableData) -> Arc> { - let name = Arc::clone(t.table_name()); + let name = t.table_name().clone(); let id = t.table_id(); let t = Arc::new(tokio::sync::RwLock::new(t)); @@ -35,7 +38,7 @@ impl DoubleRef { t } - fn by_name(&self, name: &str) -> Option>> { + fn by_name(&self, name: &TableName) -> Option>> { self.by_name.get(name).map(Arc::clone) } @@ -196,6 +199,7 @@ impl NamespaceData { .clone(); for (t, b) in write.into_tables() { + let t = TableName::from(t); let table_data = match self.table_data(&t) { Some(t) => t, None => self.insert_table(&t, catalog).await?, @@ -221,10 +225,13 @@ impl NamespaceData { Ok(pause_writes) } DmlOperation::Delete(delete) => { - let table_name = delete.table_name().context(super::TableNotPresentSnafu)?; - let table_data = match self.table_data(table_name) { + let table_name = delete + .table_name() + .context(super::TableNotPresentSnafu)? + .into(); + let table_data = match self.table_data(&table_name) { Some(t) => t, - None => self.insert_table(table_name, catalog).await?, + None => self.insert_table(&table_name, catalog).await?, }; let mut table_data = table_data.write().await; @@ -244,7 +251,7 @@ impl NamespaceData { #[cfg(test)] // Only used in tests pub(crate) async fn snapshot( &self, - table_name: &str, + table_name: &TableName, partition_key: &PartitionKey, ) -> Option<( Vec>, @@ -270,7 +277,7 @@ impl NamespaceData { #[cfg(test)] // Only used in tests pub(crate) async fn snapshot_to_persisting( &self, - table_name: &str, + table_name: &TableName, partition_key: &PartitionKey, ) -> Option> { if let Some(table_data) = self.table_data(table_name) { @@ -287,7 +294,7 @@ impl NamespaceData { /// Gets the buffered table data pub(crate) fn table_data( &self, - table_name: &str, + table_name: &TableName, ) -> Option>> { let t = self.tables.read(); t.by_name(table_name) @@ -305,7 +312,7 @@ impl NamespaceData { /// Inserts the table or returns it if it happens to be inserted by some other thread async fn insert_table( &self, - table_name: &str, + table_name: &TableName, catalog: &Arc, ) -> Result>, super::Error> { let mut repos = catalog.repositories().await; @@ -314,7 +321,9 @@ impl NamespaceData { .get_table_persist_info(self.shard_id, self.namespace_id, table_name) .await .context(super::CatalogSnafu)? - .context(super::TableNotFoundSnafu { table_name })?; + .ok_or_else(|| super::Error::TableNotFound { + table_name: table_name.to_string(), + })?; let mut t = self.tables.write(); @@ -326,7 +335,7 @@ impl NamespaceData { // Insert the table and then return a ref to it. t.insert(TableData::new( info.table_id, - table_name, + table_name.clone(), self.shard_id, self.namespace_id, info.tombstone_max_sequence_number, @@ -341,7 +350,7 @@ impl NamespaceData { /// data buffer. pub(super) async fn mark_persisted( &self, - table_name: &str, + table_name: &TableName, partition_key: &PartitionKey, sequence_number: SequenceNumber, ) { @@ -479,7 +488,7 @@ mod tests { assert_eq!(&**ns.namespace_name(), NAMESPACE_NAME); // Assert the namespace does not contain the test data - assert!(ns.table_data(TABLE_NAME).is_none()); + assert!(ns.table_data(&TABLE_NAME.into()).is_none()); assert!(ns.table_id(table_id).is_none()); // Write some test data @@ -499,7 +508,7 @@ mod tests { .expect("buffer op should succeed"); // Both forms of referencing the table should succeed - assert!(ns.table_data(TABLE_NAME).is_some()); + assert!(ns.table_data(&TABLE_NAME.into()).is_some()); assert!(ns.table_id(table_id).is_some()); // And the table counter metric should increase diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index 7707f8301f..b35a2a6d31 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -19,6 +19,8 @@ use self::{ }; use crate::{data::query_dedup::query, query::QueryableBatch}; +use super::table::TableName; + mod buffer; pub mod resolver; @@ -180,7 +182,7 @@ pub struct PartitionData { namespace_id: NamespaceId, table_id: TableId, /// The name of the table this partition is part of. - table_name: Arc, + table_name: TableName, pub(super) data: DataBuffer, @@ -198,7 +200,7 @@ impl PartitionData { shard_id: ShardId, namespace_id: NamespaceId, table_id: TableId, - table_name: Arc, + table_name: TableName, sort_key: SortKeyState, max_persisted_sequence_number: Option, ) -> Self { diff --git a/ingester/src/data/partition/buffer.rs b/ingester/src/data/partition/buffer.rs index 739da735fa..3195b9c74d 100644 --- a/ingester/src/data/partition/buffer.rs +++ b/ingester/src/data/partition/buffer.rs @@ -9,6 +9,8 @@ use snafu::ResultExt; use uuid::Uuid; use write_summary::ShardProgress; +use crate::data::table::TableName; + use super::{PersistingBatch, QueryableBatch, SnapshotBatch}; /// Data of an IOx partition split into batches @@ -109,7 +111,7 @@ impl DataBuffer { /// Both buffer and snapshots will be empty after this pub(super) fn snapshot_to_queryable_batch( &mut self, - table_name: &Arc, + table_name: &TableName, partition_id: PartitionId, tombstone: Option, ) -> Option { @@ -129,7 +131,7 @@ impl DataBuffer { None } else { Some(QueryableBatch::new( - Arc::clone(table_name), + table_name.clone(), partition_id, data, tombstones, @@ -164,7 +166,7 @@ impl DataBuffer { shard_id: ShardId, table_id: TableId, partition_id: PartitionId, - table_name: &Arc, + table_name: &TableName, ) -> Option> { if self.persisting.is_some() { panic!("Unable to snapshot while persisting. This is an unexpected state.") diff --git a/ingester/src/data/partition/resolver/cache.rs b/ingester/src/data/partition/resolver/cache.rs index a9dd897444..7f282ae38c 100644 --- a/ingester/src/data/partition/resolver/cache.rs +++ b/ingester/src/data/partition/resolver/cache.rs @@ -9,7 +9,10 @@ use iox_catalog::interface::Catalog; use observability_deps::tracing::debug; use parking_lot::Mutex; -use crate::data::partition::{resolver::DeferredSortKey, PartitionData, SortKeyState}; +use crate::data::{ + partition::{resolver::DeferredSortKey, PartitionData, SortKeyState}, + table::TableName, +}; use super::r#trait::PartitionProvider; @@ -189,7 +192,7 @@ where shard_id: ShardId, namespace_id: NamespaceId, table_id: TableId, - table_name: Arc, + table_name: TableName, ) -> PartitionData { // Use the cached PartitionKey instead of the caller's partition_key, // instead preferring to reuse the already-shared Arc in the cache. diff --git a/ingester/src/data/partition/resolver/catalog.rs b/ingester/src/data/partition/resolver/catalog.rs index 128b9a5614..e42c4876c4 100644 --- a/ingester/src/data/partition/resolver/catalog.rs +++ b/ingester/src/data/partition/resolver/catalog.rs @@ -9,7 +9,10 @@ use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId}; use iox_catalog::interface::Catalog; use observability_deps::tracing::debug; -use crate::data::partition::{PartitionData, SortKeyState}; +use crate::data::{ + partition::{PartitionData, SortKeyState}, + table::TableName, +}; use super::r#trait::PartitionProvider; @@ -55,7 +58,7 @@ impl PartitionProvider for CatalogPartitionResolver { shard_id: ShardId, namespace_id: NamespaceId, table_id: TableId, - table_name: Arc, + table_name: TableName, ) -> PartitionData { debug!( %partition_key, @@ -132,7 +135,7 @@ mod tests { }; let callers_partition_key = PartitionKey::from(PARTITION_KEY); - let table_name = TABLE_NAME.into(); + let table_name = TableName::from(TABLE_NAME); let resolver = CatalogPartitionResolver::new(Arc::clone(&catalog)); let got = resolver .get_partition( @@ -140,7 +143,7 @@ mod tests { shard_id, namespace_id, table_id, - Arc::clone(&table_name), + table_name.clone(), ) .await; assert_eq!(got.namespace_id(), namespace_id); diff --git a/ingester/src/data/partition/resolver/mock.rs b/ingester/src/data/partition/resolver/mock.rs index e65f127ef4..80f859c43e 100644 --- a/ingester/src/data/partition/resolver/mock.rs +++ b/ingester/src/data/partition/resolver/mock.rs @@ -1,12 +1,12 @@ //! A mock [`PartitionProvider`] to inject [`PartitionData`] for tests. -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use async_trait::async_trait; use data_types::{NamespaceId, PartitionKey, ShardId, TableId}; use parking_lot::Mutex; -use crate::data::partition::PartitionData; +use crate::data::{partition::PartitionData, table::TableName}; use super::r#trait::PartitionProvider; @@ -58,7 +58,7 @@ impl PartitionProvider for MockPartitionProvider { shard_id: ShardId, namespace_id: NamespaceId, table_id: TableId, - table_name: Arc, + table_name: TableName, ) -> PartitionData { let p = self .partitions diff --git a/ingester/src/data/partition/resolver/trait.rs b/ingester/src/data/partition/resolver/trait.rs index a8bf3134e4..4ca50ec949 100644 --- a/ingester/src/data/partition/resolver/trait.rs +++ b/ingester/src/data/partition/resolver/trait.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; use data_types::{NamespaceId, PartitionKey, ShardId, TableId}; -use crate::data::partition::PartitionData; +use crate::data::{partition::PartitionData, table::TableName}; /// An infallible resolver of [`PartitionData`] for the specified shard, table, /// and partition key, returning an initialised [`PartitionData`] buffer for it. @@ -20,7 +20,7 @@ pub trait PartitionProvider: Send + Sync + Debug { shard_id: ShardId, namespace_id: NamespaceId, table_id: TableId, - table_name: Arc, + table_name: TableName, ) -> PartitionData; } @@ -35,7 +35,7 @@ where shard_id: ShardId, namespace_id: NamespaceId, table_id: TableId, - table_name: Arc, + table_name: TableName, ) -> PartitionData { (**self) .get_partition(partition_key, shard_id, namespace_id, table_id, table_name) @@ -59,7 +59,7 @@ mod tests { let shard_id = ShardId::new(42); let namespace_id = NamespaceId::new(1234); let table_id = TableId::new(24); - let table_name = "platanos".into(); + let table_name = TableName::from("platanos"); let partition = PartitionId::new(4242); let data = PartitionData::new( partition, @@ -67,7 +67,7 @@ mod tests { shard_id, namespace_id, table_id, - Arc::clone(&table_name), + table_name.clone(), SortKeyState::Provided(None), None, ); @@ -75,13 +75,7 @@ mod tests { let mock = Arc::new(MockPartitionProvider::default().with_partition(data)); let got = mock - .get_partition( - key, - shard_id, - namespace_id, - table_id, - Arc::clone(&table_name), - ) + .get_partition(key, shard_id, namespace_id, table_id, table_name.clone()) .await; assert_eq!(got.partition_id(), partition); assert_eq!(got.namespace_id(), namespace_id); diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 9a49b5f291..008f74d149 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -50,11 +50,40 @@ impl DoubleRef { } } +/// The string name / identifier of a Table. +/// +/// A reference-counted, cheap clone-able string. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct TableName(Arc); + +impl From for TableName +where + T: AsRef, +{ + fn from(v: T) -> Self { + Self(Arc::from(v.as_ref())) + } +} + +impl std::fmt::Display for TableName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::ops::Deref for TableName { + type Target = str; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// Data of a Table in a given Namesapce that belongs to a given Shard #[derive(Debug)] pub(crate) struct TableData { table_id: TableId, - table_name: Arc, + table_name: TableName, /// The catalog ID of the shard & namespace this table is being populated /// from. @@ -85,7 +114,7 @@ impl TableData { /// for the first time. pub(super) fn new( table_id: TableId, - table_name: &str, + table_name: TableName, shard_id: ShardId, namespace_id: NamespaceId, tombstone_max_sequence_number: Option, @@ -93,7 +122,7 @@ impl TableData { ) -> Self { Self { table_id, - table_name: table_name.into(), + table_name, shard_id, namespace_id, tombstone_max_sequence_number, @@ -137,7 +166,7 @@ impl TableData { self.shard_id, self.namespace_id, self.table_id, - Arc::clone(&self.table_name), + self.table_name.clone(), ) .await; // Add the double-referenced partition to the map. @@ -276,7 +305,7 @@ impl TableData { } /// Returns the name of this table. - pub(crate) fn table_name(&self) -> &Arc { + pub(crate) fn table_name(&self) -> &TableName { &self.table_name } } @@ -335,7 +364,7 @@ mod tests { let mut table = TableData::new( table_id, - TABLE_NAME, + TABLE_NAME.into(), shard_id, ns_id, None, @@ -395,7 +424,7 @@ mod tests { let mut table = TableData::new( table_id, - TABLE_NAME, + TABLE_NAME.into(), shard_id, ns_id, None, diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 1f51ce194d..67b34342dd 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -445,7 +445,7 @@ mod tests { use write_buffer::mock::{MockBufferForReading, MockBufferSharedState}; use super::*; - use crate::data::partition::SnapshotBatch; + use crate::data::{partition::SnapshotBatch, table::TableName}; #[tokio::test] async fn read_from_write_buffer_write_to_mutable_buffer() { @@ -513,13 +513,15 @@ mod tests { // data in there from both writes. tokio::time::timeout(Duration::from_secs(2), async { let ns_name = ingester.namespace.name.into(); + let table_name = TableName::from("a"); loop { let mut has_measurement = false; if let Some(data) = ingester.ingester.data.shard(ingester.shard.id) { if let Some(data) = data.namespace(&ns_name) { // verify there's data in the buffer - if let Some((b, _)) = data.snapshot("a", &"1970-01-01".into()).await { + if let Some((b, _)) = data.snapshot(&table_name, &"1970-01-01".into()).await + { if let Some(b) = b.first() { if b.data.num_rows() > 0 { has_measurement = true; @@ -755,13 +757,15 @@ mod tests { // data in there tokio::time::timeout(Duration::from_secs(1), async move { let ns_name = namespace.name.into(); + let table_name = TableName::from("cpu"); loop { let mut has_measurement = false; if let Some(data) = ingester.data.shard(shard.id) { if let Some(data) = data.namespace(&ns_name) { // verify there's data in the buffer - if let Some((b, _)) = data.snapshot("cpu", &"1970-01-01".into()).await { + if let Some((b, _)) = data.snapshot(&table_name, &"1970-01-01".into()).await + { if let Some(b) = b.first() { custom_batch_verification(b); diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index cf58daab0c..7ff7494af0 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -12,8 +12,8 @@ use snafu::{ensure, Snafu}; use crate::{ data::{ - namespace::NamespaceName, partition::UnpersistedPartitionData, IngesterData, - IngesterQueryPartition, IngesterQueryResponse, + namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName, + IngesterData, IngesterQueryPartition, IngesterQueryResponse, }, query::QueryableBatch, }; @@ -69,7 +69,8 @@ pub async fn prepare_data_to_querier( } }; - let table_data = match namespace_data.table_data(&request.table) { + let table_name = TableName::from(&request.table); + let table_data = match namespace_data.table_data(&table_name) { Some(table_data) => { debug!(table_name=%request.table, "found table"); table_data diff --git a/ingester/src/query.rs b/ingester/src/query.rs index 1829ecf4ae..219dcbcf6e 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -28,7 +28,7 @@ use predicate::{ use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema}; use snafu::{ResultExt, Snafu}; -use crate::data::partition::SnapshotBatch; +use crate::data::{partition::SnapshotBatch, table::TableName}; #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] @@ -60,7 +60,7 @@ pub(crate) struct QueryableBatch { pub(crate) delete_predicates: Vec>, /// This is needed to return a reference for a trait function - pub(crate) table_name: Arc, + pub(crate) table_name: TableName, /// Partition ID pub(crate) partition_id: PartitionId, @@ -69,7 +69,7 @@ pub(crate) struct QueryableBatch { impl QueryableBatch { /// Initilaize a QueryableBatch pub(crate) fn new( - table_name: Arc, + table_name: TableName, partition_id: PartitionId, data: Vec>, deletes: Vec, diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index ed3f8b6348..cde40ac9c2 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -657,7 +657,7 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation) .unwrap() .namespace(&TEST_NAMESPACE.into()) .unwrap() - .snapshot_to_persisting(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1)) + .snapshot_to_persisting(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1)) .await; } else if loc.contains(DataLocation::SNAPSHOT) { // move partition 1 data to snapshot @@ -666,7 +666,7 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation) .unwrap() .namespace(&TEST_NAMESPACE.into()) .unwrap() - .snapshot(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1)) + .snapshot(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1)) .await; } @@ -826,7 +826,7 @@ async fn make_one_partition_with_tombstones( .unwrap() .namespace(&TEST_NAMESPACE.into()) .unwrap() - .snapshot_to_persisting(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1)) + .snapshot_to_persisting(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1)) .await; } else if loc.contains(DataLocation::SNAPSHOT) { // move partition 1 data to snapshot @@ -835,7 +835,7 @@ async fn make_one_partition_with_tombstones( .unwrap() .namespace(&TEST_NAMESPACE.into()) .unwrap() - .snapshot(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1)) + .snapshot(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1)) .await; }