Merge branch 'main' into dom/deferred-namespace-name
commit
2e7a1391f8
|
@ -711,12 +711,7 @@ pub mod tests {
|
|||
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
.create(
|
||||
"namespace_hot_partitions_to_compact",
|
||||
"inf",
|
||||
topic.id,
|
||||
pool.id,
|
||||
)
|
||||
.create("namespace_hot_partitions_to_compact", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = txn
|
||||
|
|
|
@ -149,7 +149,7 @@ mod tests {
|
|||
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
.create("gc_leave_undeleted_files_alone", "inf", topic.id, pool.id)
|
||||
.create("gc_leave_undeleted_files_alone", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = txn
|
||||
|
@ -229,7 +229,7 @@ mod tests {
|
|||
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
.create("gc_leave_too_new_files_alone", "inf", topic.id, pool.id)
|
||||
.create("gc_leave_too_new_files_alone", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = txn
|
||||
|
@ -313,7 +313,7 @@ mod tests {
|
|||
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
.create("gc_remove_old_enough_files", "inf", topic.id, pool.id)
|
||||
.create("gc_remove_old_enough_files", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = txn
|
||||
|
|
|
@ -202,19 +202,19 @@ WHERE partition.id = skipped_compactions.partition_id and partition.shard_id = s
|
|||
ORDER BY shard_index, table_id, partition_key, skipped_at;
|
||||
|
||||
-- Number of files per level for top 50 partitions with most files of a specified day
|
||||
SELECT s.shard_index, pf.table_id, partition_id, partition_key,
|
||||
count(case when to_delete is null then 1 end) total_not_deleted,
|
||||
count(case when compaction_level=0 and to_delete is null then 1 end) num_l0,
|
||||
count(case when compaction_level=1 and to_delete is null then 1 end) num_l1,
|
||||
count(case when compaction_level=2 and to_delete is null then 1 end) num_l2 ,
|
||||
count(case when compaction_level=0 and to_delete is not null then 1 end) deleted_num_l0,
|
||||
count(case when compaction_level=1 and to_delete is not null then 1 end) deleted_num_l1,
|
||||
count(case when compaction_level=2 and to_delete is not null then 1 end) deleted_num_l2
|
||||
SELECT s.shard_index, pf.table_id, pf.partition_id, p.partition_key,
|
||||
count(case when pf.to_delete is null then 1 end) total_not_deleted,
|
||||
count(case when pf.compaction_level=0 and pf.to_delete is null then 1 end) num_l0,
|
||||
count(case when pf.compaction_level=1 and pf.to_delete is null then 1 end) num_l1,
|
||||
count(case when pf.compaction_level=2 and pf.to_delete is null then 1 end) num_l2 ,
|
||||
count(case when pf.compaction_level=0 and pf.to_delete is not null then 1 end) deleted_num_l0,
|
||||
count(case when pf.compaction_level=1 and pf.to_delete is not null then 1 end) deleted_num_l1,
|
||||
count(case when pf.compaction_level=2 and pf.to_delete is not null then 1 end) deleted_num_l2
|
||||
FROM parquet_file pf, partition p, shard s
|
||||
WHERE pf.partition_id = p.id AND pf.shard_id = s.id
|
||||
AND partition_key = '2022-10-11'
|
||||
GROUP BY s.shard_index, pf.table_id, partition_id, partition_key
|
||||
ORDER BY count(case when to_delete is null then 1 end) DESC
|
||||
AND p.partition_key = '2022-10-11'
|
||||
GROUP BY s.shard_index, pf.table_id, pf.partition_id, p.partition_key
|
||||
ORDER BY count(case when pf.to_delete is null then 1 end) DESC
|
||||
LIMIT 50;
|
||||
|
||||
-- Partitions with level-0 files ingested within the last 4 hours
|
||||
|
|
|
@ -138,7 +138,7 @@ mod tests {
|
|||
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("namespace_parquet_file_test", "inf", topic.id, pool.id)
|
||||
.create("namespace_parquet_file_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
|
|
@ -55,7 +55,6 @@ pub async fn update_iox_catalog<'a>(
|
|||
merged_tsm_schema: &'a AggregateTSMSchema,
|
||||
topic: &'a str,
|
||||
query_pool_name: Option<&'a str>,
|
||||
retention: Option<&'a str>,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
connection: Connection,
|
||||
) -> Result<(), UpdateCatalogError> {
|
||||
|
@ -67,8 +66,8 @@ pub async fn update_iox_catalog<'a>(
|
|||
Ok(iox_schema) => iox_schema,
|
||||
Err(iox_catalog::interface::Error::NamespaceNotFoundByName { .. }) => {
|
||||
// Namespace has to be created; ensure the user provided the required parameters
|
||||
let (query_pool_name, retention) = match (query_pool_name, retention) {
|
||||
(Some(query_pool_name), Some(retention)) => (query_pool_name, retention),
|
||||
let query_pool_name = match query_pool_name {
|
||||
Some(query_pool_name) => query_pool_name,
|
||||
_ => {
|
||||
return Err(UpdateCatalogError::NamespaceCreationError("in order to create the namespace you must provide query_pool_name and retention args".to_string()));
|
||||
}
|
||||
|
@ -78,7 +77,6 @@ pub async fn update_iox_catalog<'a>(
|
|||
get_topic_id_and_query_id(repos.deref_mut(), topic, query_pool_name).await?;
|
||||
let _namespace = create_namespace(
|
||||
namespace_name.as_str(),
|
||||
retention,
|
||||
topic_id,
|
||||
query_id,
|
||||
repos.deref_mut(),
|
||||
|
@ -138,7 +136,6 @@ where
|
|||
|
||||
async fn create_namespace<R>(
|
||||
name: &str,
|
||||
retention: &str,
|
||||
topic_id: TopicId,
|
||||
query_id: QueryPoolId,
|
||||
repos: &mut R,
|
||||
|
@ -146,11 +143,7 @@ async fn create_namespace<R>(
|
|||
where
|
||||
R: RepoCollection + ?Sized,
|
||||
{
|
||||
match repos
|
||||
.namespaces()
|
||||
.create(name, retention, topic_id, query_id)
|
||||
.await
|
||||
{
|
||||
match repos.namespaces().create(name, topic_id, query_id).await {
|
||||
Ok(ns) => Ok(ns),
|
||||
Err(iox_catalog::interface::Error::NameExists { .. }) => {
|
||||
// presumably it got created in the meantime?
|
||||
|
@ -532,7 +525,6 @@ mod tests {
|
|||
&agg_schema,
|
||||
"iox-shared",
|
||||
Some("iox-shared"),
|
||||
Some("inf"),
|
||||
Arc::clone(&catalog),
|
||||
connection,
|
||||
)
|
||||
|
@ -598,7 +590,7 @@ mod tests {
|
|||
// create namespace, table and columns for weather measurement
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
.create("1234_5678", "inf", TopicId::new(1), QueryPoolId::new(1))
|
||||
.create("1234_5678", TopicId::new(1), QueryPoolId::new(1))
|
||||
.await
|
||||
.expect("namespace created");
|
||||
let mut table = txn
|
||||
|
@ -652,7 +644,6 @@ mod tests {
|
|||
&agg_schema,
|
||||
"iox-shared",
|
||||
Some("iox-shared"),
|
||||
Some("inf"),
|
||||
Arc::clone(&catalog),
|
||||
connection,
|
||||
)
|
||||
|
@ -703,7 +694,7 @@ mod tests {
|
|||
// create namespace, table and columns for weather measurement
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
.create("1234_5678", "inf", TopicId::new(1), QueryPoolId::new(1))
|
||||
.create("1234_5678", TopicId::new(1), QueryPoolId::new(1))
|
||||
.await
|
||||
.expect("namespace created");
|
||||
let mut table = txn
|
||||
|
@ -750,7 +741,6 @@ mod tests {
|
|||
&agg_schema,
|
||||
"iox-shared",
|
||||
Some("iox-shared"),
|
||||
Some("inf"),
|
||||
Arc::clone(&catalog),
|
||||
connection,
|
||||
)
|
||||
|
@ -784,7 +774,7 @@ mod tests {
|
|||
// create namespace, table and columns for weather measurement
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
.create("1234_5678", "inf", TopicId::new(1), QueryPoolId::new(1))
|
||||
.create("1234_5678", TopicId::new(1), QueryPoolId::new(1))
|
||||
.await
|
||||
.expect("namespace created");
|
||||
let mut table = txn
|
||||
|
@ -830,7 +820,6 @@ mod tests {
|
|||
&agg_schema,
|
||||
"iox-shared",
|
||||
Some("iox-shared"),
|
||||
Some("inf"),
|
||||
Arc::clone(&catalog),
|
||||
connection,
|
||||
)
|
||||
|
@ -883,57 +872,6 @@ mod tests {
|
|||
&agg_schema,
|
||||
"iox-shared",
|
||||
None,
|
||||
Some("inf"),
|
||||
Arc::clone(&catalog),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
.expect_err("should fail namespace creation");
|
||||
assert_matches!(err, UpdateCatalogError::NamespaceCreationError(_));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn needs_creating_but_missing_retention() {
|
||||
// init a test catalog stack
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
|
||||
catalog
|
||||
.repositories()
|
||||
.await
|
||||
.topics()
|
||||
.create_or_get("iox-shared")
|
||||
.await
|
||||
.expect("topic created");
|
||||
let (connection, _join_handle, _requests) = create_test_shard_service(MapToShardResponse {
|
||||
shard_id: 0,
|
||||
shard_index: 0,
|
||||
})
|
||||
.await;
|
||||
|
||||
let json = r#"
|
||||
{
|
||||
"org_id": "1234",
|
||||
"bucket_id": "5678",
|
||||
"measurements": {
|
||||
"cpu": {
|
||||
"tags": [
|
||||
{ "name": "host", "values": ["server", "desktop"] }
|
||||
],
|
||||
"fields": [
|
||||
{ "name": "usage", "types": ["Float"] }
|
||||
],
|
||||
"earliest_time": "2022-01-01T00:00:00.00Z",
|
||||
"latest_time": "2022-07-07T06:00:00.00Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
"#;
|
||||
let agg_schema: AggregateTSMSchema = json.try_into().unwrap();
|
||||
let err = update_iox_catalog(
|
||||
&agg_schema,
|
||||
"iox-shared",
|
||||
Some("iox-shared"),
|
||||
None,
|
||||
Arc::clone(&catalog),
|
||||
connection,
|
||||
)
|
||||
|
@ -992,7 +930,6 @@ mod tests {
|
|||
&agg_schema,
|
||||
"iox-shared",
|
||||
Some("iox-shared"),
|
||||
Some("inf"),
|
||||
Arc::clone(&catalog),
|
||||
connection,
|
||||
)
|
||||
|
|
|
@ -183,7 +183,6 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Schem
|
|||
&merged_tsm_schema,
|
||||
merge_config.write_buffer_config.topic(),
|
||||
merge_config.query_pool_name.as_deref(),
|
||||
merge_config.retention.as_deref(),
|
||||
Arc::clone(&catalog),
|
||||
connection.clone(),
|
||||
)
|
||||
|
|
|
@ -246,7 +246,7 @@ async fn load_schema(
|
|||
|
||||
let namespace = match repos
|
||||
.namespaces()
|
||||
.create(namespace, "inf", topic.id, query_pool.id)
|
||||
.create(namespace, topic.id, query_pool.id)
|
||||
.await
|
||||
{
|
||||
Ok(n) => n,
|
||||
|
@ -527,7 +527,7 @@ mod tests {
|
|||
.unwrap();
|
||||
namespace = repos
|
||||
.namespaces()
|
||||
.create("load_parquet_files", "", topic.id, query_pool.id)
|
||||
.create("load_parquet_files", topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
table = repos
|
||||
|
|
|
@ -607,7 +607,7 @@ async fn namespace_retention() {
|
|||
.arg("retention")
|
||||
.arg("--retention-hours")
|
||||
.arg(retention_period_hours.to_string())
|
||||
.arg(&namespace)
|
||||
.arg(namespace)
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(
|
||||
|
@ -638,7 +638,7 @@ async fn namespace_retention() {
|
|||
.arg("retention")
|
||||
.arg("--retention-hours")
|
||||
.arg(retention_period_hours.to_string())
|
||||
.arg(&namespace)
|
||||
.arg(namespace)
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(
|
||||
|
|
|
@ -653,7 +653,7 @@ mod tests {
|
|||
let shard_index = ShardIndex::new(0);
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("foo", "inf", topic.id, query_pool.id)
|
||||
.create("foo", topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let shard1 = repos
|
||||
|
@ -754,7 +754,7 @@ mod tests {
|
|||
let shard_index = ShardIndex::new(0);
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("foo", "inf", topic.id, query_pool.id)
|
||||
.create("foo", topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let shard1 = repos
|
||||
|
@ -857,7 +857,7 @@ mod tests {
|
|||
let shard_index = ShardIndex::new(0);
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("foo", "inf", topic.id, query_pool.id)
|
||||
.create("foo", topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let shard1 = repos
|
||||
|
@ -1137,7 +1137,7 @@ mod tests {
|
|||
let shard_index = ShardIndex::new(0);
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("foo", "inf", topic.id, query_pool.id)
|
||||
.create("foo", topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let shard1 = repos
|
||||
|
@ -1279,7 +1279,7 @@ mod tests {
|
|||
let shard_index = ShardIndex::new(0);
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("foo", "inf", topic.id, query_pool.id)
|
||||
.create("foo", topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let shard = repos
|
||||
|
@ -1459,7 +1459,7 @@ mod tests {
|
|||
let shard_index = ShardIndex::new(0);
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("foo", "inf", topic.id, query_pool.id)
|
||||
.create("foo", topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let shard1 = repos
|
||||
|
|
|
@ -111,12 +111,7 @@ mod tests {
|
|||
let q = repos.query_pools().create_or_get("platanos").await.unwrap();
|
||||
let ns = repos
|
||||
.namespaces()
|
||||
.create(
|
||||
TABLE_NAME,
|
||||
iox_catalog::INFINITE_RETENTION_POLICY,
|
||||
t.id,
|
||||
q.id,
|
||||
)
|
||||
.create(TABLE_NAME, t.id, q.id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -517,7 +517,7 @@ mod tests {
|
|||
let shard_index = ShardIndex::new(0);
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
.create("foo", "inf", topic.id, query_pool.id)
|
||||
.create("foo", topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut shard = txn
|
||||
|
|
|
@ -730,12 +730,7 @@ pub(crate) async fn populate_catalog(
|
|||
let query_pool = c.query_pools().create_or_get("query-pool").await.unwrap();
|
||||
let ns_id = c
|
||||
.namespaces()
|
||||
.create(
|
||||
namespace,
|
||||
iox_catalog::INFINITE_RETENTION_POLICY,
|
||||
topic.id,
|
||||
query_pool.id,
|
||||
)
|
||||
.create(namespace, topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap()
|
||||
.id;
|
||||
|
|
|
@ -181,12 +181,7 @@ impl TestContext {
|
|||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create(
|
||||
name,
|
||||
iox_catalog::INFINITE_RETENTION_POLICY,
|
||||
self.topic_id,
|
||||
self.query_id,
|
||||
)
|
||||
.create(name, self.topic_id, self.query_id)
|
||||
.await
|
||||
.expect("failed to create test namespace");
|
||||
|
||||
|
|
|
@ -290,7 +290,6 @@ pub trait NamespaceRepo: Send + Sync {
|
|||
async fn create(
|
||||
&mut self,
|
||||
name: &str,
|
||||
retention_duration: &str,
|
||||
topic_id: TopicId,
|
||||
query_pool_id: QueryPoolId,
|
||||
) -> Result<Namespace>;
|
||||
|
@ -977,7 +976,7 @@ pub(crate) mod test_helpers {
|
|||
let namespace_name = "test_namespace";
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create(namespace_name, "inf", topic.id, pool.id)
|
||||
.create(namespace_name, topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(namespace.id > NamespaceId::new(0));
|
||||
|
@ -992,7 +991,7 @@ pub(crate) mod test_helpers {
|
|||
|
||||
let conflict = repos
|
||||
.namespaces()
|
||||
.create(namespace_name, "inf", topic.id, pool.id)
|
||||
.create(namespace_name, topic.id, pool.id)
|
||||
.await;
|
||||
assert!(matches!(
|
||||
conflict.unwrap_err(),
|
||||
|
@ -1032,7 +1031,7 @@ pub(crate) mod test_helpers {
|
|||
let namespace2_name = "test_namespace2";
|
||||
let namespace2 = repos
|
||||
.namespaces()
|
||||
.create(namespace2_name, "inf", topic.id, pool.id)
|
||||
.create(namespace2_name, topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut namespaces = repos.namespaces().list().await.unwrap();
|
||||
|
@ -1081,7 +1080,7 @@ pub(crate) mod test_helpers {
|
|||
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("namespace_table_test", "inf", topic.id, pool.id)
|
||||
.create("namespace_table_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -1118,7 +1117,7 @@ pub(crate) mod test_helpers {
|
|||
// test we can create a table of the same name in a different namespace
|
||||
let namespace2 = repos
|
||||
.namespaces()
|
||||
.create("two", "inf", topic.id, pool.id)
|
||||
.create("two", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_ne!(namespace, namespace2);
|
||||
|
@ -1209,7 +1208,7 @@ pub(crate) mod test_helpers {
|
|||
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("namespace_column_test", "inf", topic.id, pool.id)
|
||||
.create("namespace_column_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
@ -1420,7 +1419,7 @@ pub(crate) mod test_helpers {
|
|||
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("namespace_partition_test", "inf", topic.id, pool.id)
|
||||
.create("namespace_partition_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
@ -1498,7 +1497,7 @@ pub(crate) mod test_helpers {
|
|||
// test list_by_namespace
|
||||
let namespace2 = repos
|
||||
.namespaces()
|
||||
.create("namespace_partition_test2", "inf", topic.id, pool.id)
|
||||
.create("namespace_partition_test2", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table2 = repos
|
||||
|
@ -1695,7 +1694,7 @@ pub(crate) mod test_helpers {
|
|||
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("namespace_tombstone_test", "inf", topic.id, pool.id)
|
||||
.create("namespace_tombstone_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
@ -1779,7 +1778,7 @@ pub(crate) mod test_helpers {
|
|||
// test list_by_namespace
|
||||
let namespace2 = repos
|
||||
.namespaces()
|
||||
.create("namespace_tombstone_test2", "inf", topic.id, pool.id)
|
||||
.create("namespace_tombstone_test2", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table2 = repos
|
||||
|
@ -1860,7 +1859,6 @@ pub(crate) mod test_helpers {
|
|||
.namespaces()
|
||||
.create(
|
||||
"namespace_tombstones_by_parquet_file_test",
|
||||
"inf",
|
||||
topic.id,
|
||||
pool.id,
|
||||
)
|
||||
|
@ -2077,7 +2075,7 @@ pub(crate) mod test_helpers {
|
|||
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("namespace_parquet_file_test", "inf", topic.id, pool.id)
|
||||
.create("namespace_parquet_file_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
@ -2232,7 +2230,7 @@ pub(crate) mod test_helpers {
|
|||
// test list_by_namespace_not_to_delete
|
||||
let namespace2 = repos
|
||||
.namespaces()
|
||||
.create("namespace_parquet_file_test1", "inf", topic.id, pool.id)
|
||||
.create("namespace_parquet_file_test1", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table2 = repos
|
||||
|
@ -2583,7 +2581,6 @@ pub(crate) mod test_helpers {
|
|||
.namespaces()
|
||||
.create(
|
||||
"namespace_parquet_file_compaction_level_0_test",
|
||||
"inf",
|
||||
topic.id,
|
||||
pool.id,
|
||||
)
|
||||
|
@ -2701,7 +2698,6 @@ pub(crate) mod test_helpers {
|
|||
.namespaces()
|
||||
.create(
|
||||
"namespace_parquet_file_compaction_level_1_test",
|
||||
"inf",
|
||||
topic.id,
|
||||
pool.id,
|
||||
)
|
||||
|
@ -2922,12 +2918,7 @@ pub(crate) mod test_helpers {
|
|||
.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create(
|
||||
"test_most_level_0_files_partitions",
|
||||
"inf",
|
||||
topic.id,
|
||||
pool.id,
|
||||
)
|
||||
.create("test_most_level_0_files_partitions", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
@ -3381,7 +3372,6 @@ pub(crate) mod test_helpers {
|
|||
.namespaces()
|
||||
.create(
|
||||
"test_recent_highest_throughput_partitions",
|
||||
"inf",
|
||||
topic.id,
|
||||
pool.id,
|
||||
)
|
||||
|
@ -3672,7 +3662,6 @@ pub(crate) mod test_helpers {
|
|||
.namespaces()
|
||||
.create(
|
||||
"namespace_parquet_file_test_list_by_partiton_not_to_delete",
|
||||
"inf",
|
||||
topic.id,
|
||||
pool.id,
|
||||
)
|
||||
|
@ -3785,7 +3774,6 @@ pub(crate) mod test_helpers {
|
|||
.namespaces()
|
||||
.create(
|
||||
"namespace_update_to_compaction_level_1_test",
|
||||
"inf",
|
||||
topic.id,
|
||||
pool.id,
|
||||
)
|
||||
|
@ -3907,12 +3895,7 @@ pub(crate) mod test_helpers {
|
|||
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create(
|
||||
"namespace_processed_tombstone_test",
|
||||
"inf",
|
||||
topic.id,
|
||||
pool.id,
|
||||
)
|
||||
.create("namespace_processed_tombstone_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
@ -4152,7 +4135,7 @@ pub(crate) mod test_helpers {
|
|||
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create(namespace_name, "inf", topic.id, pool.id)
|
||||
.create(namespace_name, topic.id, pool.id)
|
||||
.await;
|
||||
|
||||
let namespace = match namespace {
|
||||
|
|
|
@ -36,8 +36,6 @@ pub const DEFAULT_MAX_COLUMNS_PER_TABLE: i32 = 200;
|
|||
pub const DEFAULT_RETENTION_PERIOD: Option<i64> = None;
|
||||
|
||||
/// A string value representing an infinite retention policy.
|
||||
pub const INFINITE_RETENTION_POLICY: &str = "inf";
|
||||
|
||||
pub mod interface;
|
||||
pub mod mem;
|
||||
pub mod metrics;
|
||||
|
@ -272,7 +270,7 @@ mod tests {
|
|||
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
.create(NAMESPACE_NAME, "inf", topic.id, query_pool.id)
|
||||
.create(NAMESPACE_NAME, topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -285,7 +285,6 @@ impl NamespaceRepo for MemTxn {
|
|||
async fn create(
|
||||
&mut self,
|
||||
name: &str,
|
||||
retention_duration: &str,
|
||||
topic_id: TopicId,
|
||||
query_pool_id: QueryPoolId,
|
||||
) -> Result<Namespace> {
|
||||
|
@ -302,7 +301,7 @@ impl NamespaceRepo for MemTxn {
|
|||
name: name.to_string(),
|
||||
topic_id,
|
||||
query_pool_id,
|
||||
retention_duration: Some(retention_duration.to_string()),
|
||||
retention_duration: Some("inf".to_string()), // temporary until the field retention_durantion is removed in the catalgo table in next PR
|
||||
max_tables: DEFAULT_MAX_TABLES,
|
||||
max_columns_per_table: DEFAULT_MAX_COLUMNS_PER_TABLE,
|
||||
retention_period_ns: DEFAULT_RETENTION_PERIOD,
|
||||
|
|
|
@ -193,7 +193,7 @@ decorate!(
|
|||
decorate!(
|
||||
impl_trait = NamespaceRepo,
|
||||
methods = [
|
||||
"namespace_create" = create(&mut self, name: &str, retention_duration: &str, topic_id: TopicId, query_pool_id: QueryPoolId) -> Result<Namespace>;
|
||||
"namespace_create" = create(&mut self, name: &str, topic_id: TopicId, query_pool_id: QueryPoolId) -> Result<Namespace>;
|
||||
"namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_hours: i64) -> Result<Namespace>;
|
||||
"namespace_list" = list(&mut self) -> Result<Vec<Namespace>>;
|
||||
"namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId) -> Result<Option<Namespace>>;
|
||||
|
|
|
@ -589,21 +589,19 @@ impl NamespaceRepo for PostgresTxn {
|
|||
async fn create(
|
||||
&mut self,
|
||||
name: &str,
|
||||
retention_duration: &str,
|
||||
topic_id: TopicId,
|
||||
query_pool_id: QueryPoolId,
|
||||
) -> Result<Namespace> {
|
||||
let rec = sqlx::query_as::<_, Namespace>(
|
||||
r#"
|
||||
INSERT INTO namespace ( name, retention_duration, topic_id, query_pool_id )
|
||||
VALUES ( $1, $2, $3, $4 )
|
||||
RETURNING *;
|
||||
"#,
|
||||
INSERT INTO namespace ( name, topic_id, query_pool_id )
|
||||
VALUES ( $1, $2, $3 )
|
||||
RETURNING *;
|
||||
"#,
|
||||
)
|
||||
.bind(name) // $1
|
||||
.bind(retention_duration) // $2
|
||||
.bind(topic_id) // $3
|
||||
.bind(query_pool_id) // $4
|
||||
.bind(topic_id) // $2
|
||||
.bind(query_pool_id) // $3
|
||||
.fetch_one(&mut self.inner)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
|
@ -2355,7 +2353,7 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create("ns", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id)
|
||||
.create("ns", kafka.id, query.id)
|
||||
.await
|
||||
.expect("namespace create failed")
|
||||
.id;
|
||||
|
@ -2431,7 +2429,7 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create("ns2", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id)
|
||||
.create("ns2", kafka.id, query.id)
|
||||
.await
|
||||
.expect("namespace create failed")
|
||||
.id;
|
||||
|
@ -2514,7 +2512,7 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create("ns4", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id)
|
||||
.create("ns4", kafka.id, query.id)
|
||||
.await
|
||||
.expect("namespace create failed")
|
||||
.id;
|
||||
|
@ -2573,7 +2571,7 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create("ns3", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id)
|
||||
.create("ns3", kafka.id, query.id)
|
||||
.await
|
||||
.expect("namespace create failed")
|
||||
.id;
|
||||
|
@ -2734,7 +2732,7 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create("ns4", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id)
|
||||
.create("ns4", kafka.id, query.id)
|
||||
.await
|
||||
.expect("namespace create failed")
|
||||
.id;
|
||||
|
@ -2916,7 +2914,7 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create("ns4", crate::INFINITE_RETENTION_POLICY, kafka.id, query.id)
|
||||
.create("ns4", kafka.id, query.id)
|
||||
.await
|
||||
.expect("namespace create failed")
|
||||
.id;
|
||||
|
|
|
@ -151,7 +151,7 @@ impl TestCatalog {
|
|||
let query_pool = repos.query_pools().create_or_get("pool").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create(name, "1y", topic.id, query_pool.id)
|
||||
.create(name, topic.id, query_pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -258,7 +258,6 @@ pub async fn create_router_server_type(
|
|||
Arc::clone(&catalog),
|
||||
topic_id,
|
||||
query_id,
|
||||
iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
|
||||
);
|
||||
//
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -412,7 +411,7 @@ mod tests {
|
|||
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("test_ns", "inf", topic.id, pool.id)
|
||||
.create("test_ns", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -224,7 +224,6 @@ mod tests {
|
|||
Namespace {
|
||||
id: NamespaceId::new(1),
|
||||
name: ns.to_string(),
|
||||
retention_duration: Some("inf".to_owned()),
|
||||
topic_id: TopicId::new(42),
|
||||
query_pool_id: QueryPoolId::new(42),
|
||||
max_tables: iox_catalog::DEFAULT_MAX_TABLES,
|
||||
|
|
|
@ -167,12 +167,7 @@ mod tests {
|
|||
let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap();
|
||||
repos
|
||||
.namespaces()
|
||||
.create(
|
||||
&ns,
|
||||
iox_catalog::INFINITE_RETENTION_POLICY,
|
||||
topic.id,
|
||||
query_pool.id,
|
||||
)
|
||||
.create(&ns, topic.id, query_pool.id)
|
||||
.await
|
||||
.expect("failed to setup catalog state");
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ pub struct NamespaceAutocreation<C, T> {
|
|||
|
||||
topic_id: TopicId,
|
||||
query_id: QueryPoolId,
|
||||
retention: String,
|
||||
}
|
||||
|
||||
impl<C, T> NamespaceAutocreation<C, T> {
|
||||
|
@ -48,7 +47,6 @@ impl<C, T> NamespaceAutocreation<C, T> {
|
|||
catalog: Arc<dyn Catalog>,
|
||||
topic_id: TopicId,
|
||||
query_id: QueryPoolId,
|
||||
retention: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
|
@ -56,7 +54,6 @@ impl<C, T> NamespaceAutocreation<C, T> {
|
|||
catalog,
|
||||
topic_id,
|
||||
query_id,
|
||||
retention,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -80,12 +77,7 @@ where
|
|||
|
||||
match repos
|
||||
.namespaces()
|
||||
.create(
|
||||
namespace.as_str(),
|
||||
&self.retention,
|
||||
self.topic_id,
|
||||
self.query_id,
|
||||
)
|
||||
.create(namespace.as_str(), self.topic_id, self.query_id)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
|
@ -148,7 +140,6 @@ mod tests {
|
|||
Arc::clone(&catalog),
|
||||
TopicId::new(42),
|
||||
QueryPoolId::new(42),
|
||||
"inf".to_owned(),
|
||||
);
|
||||
|
||||
// Drive the code under test
|
||||
|
@ -186,7 +177,6 @@ mod tests {
|
|||
Arc::clone(&catalog),
|
||||
TopicId::new(42),
|
||||
QueryPoolId::new(42),
|
||||
"inf".to_owned(),
|
||||
);
|
||||
|
||||
let created_id = creator
|
||||
|
|
|
@ -116,7 +116,6 @@ impl TestContext {
|
|||
Arc::clone(&catalog),
|
||||
TopicId::new(TEST_TOPIC_ID),
|
||||
QueryPoolId::new(TEST_QUERY_POOL_ID),
|
||||
iox_catalog::INFINITE_RETENTION_POLICY.to_owned(),
|
||||
);
|
||||
|
||||
let delegate = HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics);
|
||||
|
@ -193,10 +192,6 @@ async fn test_write_ok() {
|
|||
.expect("query should succeed")
|
||||
.expect("namespace not found");
|
||||
assert_eq!(ns.name, "bananas_test");
|
||||
assert_eq!(
|
||||
ns.retention_duration.as_deref(),
|
||||
Some(iox_catalog::INFINITE_RETENTION_POLICY)
|
||||
);
|
||||
assert_eq!(ns.topic_id, TopicId::new(TEST_TOPIC_ID));
|
||||
assert_eq!(ns.query_pool_id, QueryPoolId::new(TEST_QUERY_POOL_ID));
|
||||
|
||||
|
@ -357,7 +352,6 @@ async fn test_write_propagate_ids() {
|
|||
.namespaces()
|
||||
.create(
|
||||
"bananas_test",
|
||||
iox_catalog::INFINITE_RETENTION_POLICY,
|
||||
TopicId::new(TEST_TOPIC_ID),
|
||||
QueryPoolId::new(TEST_QUERY_POOL_ID),
|
||||
)
|
||||
|
@ -432,7 +426,6 @@ async fn test_delete_propagate_ids() {
|
|||
.namespaces()
|
||||
.create(
|
||||
"bananas_test",
|
||||
iox_catalog::INFINITE_RETENTION_POLICY,
|
||||
TopicId::new(TEST_TOPIC_ID),
|
||||
QueryPoolId::new(TEST_QUERY_POOL_ID),
|
||||
)
|
||||
|
|
|
@ -194,7 +194,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("catalog_partition_test", "inf", topic.id, pool.id)
|
||||
.create("catalog_partition_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
@ -271,7 +271,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("catalog_partition_test", "inf", topic.id, pool.id)
|
||||
.create("catalog_partition_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
|
|
@ -127,7 +127,7 @@ mod tests {
|
|||
.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("catalog_partition_test", "inf", topic.id, pool.id)
|
||||
.create("catalog_partition_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
|
|
@ -123,7 +123,7 @@ mod tests {
|
|||
let pool = repos.query_pools().create_or_get("franz").await.unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create("namespace_schema_test", "inf", topic.id, pool.id)
|
||||
.create("namespace_schema_test", topic.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table = repos
|
||||
|
|
Loading…
Reference in New Issue