chore: Merge remote-tracking branch 'origin/main' into smith/remove-transactions-main

pull/24376/head
Jeffrey Smith II 2023-05-04 13:46:05 -04:00
commit 41d93aea4d
44 changed files with 440 additions and 1547 deletions

View File

@ -1,14 +1,13 @@
//! Catalog-DSN-related configs.
use iox_catalog::sqlite::{SqliteCatalog, SqliteConnectionOptions};
use iox_catalog::{
create_or_get_default_records,
interface::Catalog,
mem::MemCatalog,
postgres::{PostgresCatalog, PostgresConnectionOptions},
};
use observability_deps::tracing::*;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{ops::DerefMut, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
@ -211,12 +210,6 @@ impl CatalogDsnConfig {
}
CatalogType::Memory => {
let mem = MemCatalog::new(metrics);
let mut txn = mem.repositories().await;
create_or_get_default_records(txn.deref_mut())
.await
.context(CatalogSnafu)?;
Arc::new(mem) as Arc<dyn Catalog>
}
CatalogType::Sqlite => {

View File

@ -71,26 +71,6 @@ pub struct Router2Config {
)]
pub ingester_addresses: Vec<IngesterAddress>,
/// Write buffer topic/database that should be used.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "write-buffer-topic",
env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
default_value = "iox-shared",
action
)]
pub topic: String,
/// Query pool name to dispatch writes to.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "query-pool",
env = "INFLUXDB_IOX_QUERY_POOL_NAME",
default_value = "iox-shared",
action
)]
pub query_pool_name: String,
/// Retention period to use when auto-creating namespaces.
/// For infinite retention, leave this unset and it will default to `None`.
/// Setting it to zero will not make it infinite.

View File

@ -49,9 +49,7 @@ impl NamespacesSource for MockNamespacesSource {
mod tests {
use std::collections::BTreeMap;
use data_types::{
ColumnId, ColumnSchema, ColumnType, QueryPoolId, TableId, TableSchema, TopicId,
};
use data_types::{ColumnId, ColumnSchema, ColumnType, TableId, TableSchema};
use super::*;
@ -182,15 +180,11 @@ mod tests {
]);
let id = NamespaceId::new(id);
let topic_id = TopicId::new(0);
let query_pool_id = QueryPoolId::new(0);
Self {
namespace: NamespaceWrapper {
ns: Namespace {
id,
name: "ns".to_string(),
topic_id,
query_pool_id,
max_tables: 10,
max_columns_per_table: 10,
retention_period_ns: None,
@ -198,8 +192,6 @@ mod tests {
},
schema: NamespaceSchema {
id,
topic_id,
query_pool_id,
tables,
max_columns_per_table: 10,
max_tables: 42,

View File

@ -129,42 +129,6 @@ impl std::fmt::Display for NamespaceId {
}
}
/// Unique ID for a Topic, assigned by the catalog and used in [`TopicMetadata`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct TopicId(i64);
#[allow(missing_docs)]
impl TopicId {
pub const fn new(v: i64) -> Self {
Self(v)
}
pub fn get(&self) -> i64 {
self.0
}
}
impl std::fmt::Display for TopicId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Unique ID for a `QueryPool`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct QueryPoolId(i64);
#[allow(missing_docs)]
impl QueryPoolId {
pub fn new(v: i64) -> Self {
Self(v)
}
pub fn get(&self) -> i64 {
self.0
}
}
/// Unique ID for a `Table`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
@ -341,25 +305,6 @@ impl std::fmt::Display for ParquetFileId {
}
}
/// Data object for a topic. When Kafka is used as the write buffer, this is the Kafka topic name
/// plus a catalog-assigned ID.
#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)]
pub struct TopicMetadata {
/// The id of the topic
pub id: TopicId,
/// The unique name of the topic
pub name: String,
}
/// Data object for a query pool
#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)]
pub struct QueryPool {
/// The id of the pool
pub id: QueryPoolId,
/// The unique name of the pool
pub name: String,
}
/// Data object for a namespace
#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)]
pub struct Namespace {
@ -370,10 +315,6 @@ pub struct Namespace {
#[sqlx(default)]
/// The retention period in ns. None represents infinite duration (i.e. never drop data).
pub retention_period_ns: Option<i64>,
/// The topic that writes to this namespace will land in
pub topic_id: TopicId,
/// The query pool assigned to answer queries for this namespace
pub query_pool_id: QueryPoolId,
/// The maximum number of tables that can exist in this namespace
pub max_tables: i32,
/// The maximum number of columns per table in this namespace
@ -388,10 +329,6 @@ pub struct Namespace {
pub struct NamespaceSchema {
/// the namespace id
pub id: NamespaceId,
/// the topic this namespace gets data written to
pub topic_id: TopicId,
/// the query pool assigned to answer queries for this namespace
pub query_pool_id: QueryPoolId,
/// the tables in the namespace by name
pub tables: BTreeMap<String, TableSchema>,
/// the number of columns per table this namespace allows
@ -407,8 +344,6 @@ impl NamespaceSchema {
/// Create a new `NamespaceSchema`
pub fn new(
id: NamespaceId,
topic_id: TopicId,
query_pool_id: QueryPoolId,
max_columns_per_table: i32,
max_tables: i32,
retention_period_ns: Option<i64>,
@ -416,8 +351,6 @@ impl NamespaceSchema {
Self {
id,
tables: BTreeMap::new(),
topic_id,
query_pool_id,
max_columns_per_table: max_columns_per_table as usize,
max_tables: max_tables as usize,
retention_period_ns,
@ -1171,35 +1104,6 @@ pub enum TemplatePart {
/// partition key parts such as "2021-03-14 12:25:21" and
/// "2021-04-14 12:24:21"
TimeFormat(String),
/// Applies a regex to the value in a string column
RegexCapture(RegexCapture),
/// Applies a `strftime` pattern to some column other than "time"
StrftimeColumn(StrftimeColumn),
}
/// `RegexCapture` is for pulling parts of a string column into the partition
/// key.
#[derive(Debug, Eq, PartialEq, Clone)]
#[allow(missing_docs)]
pub struct RegexCapture {
pub column: String,
pub regex: String,
}
/// [`StrftimeColumn`] is used to create a time based partition key off some
/// column other than the builtin `time` column.
///
/// The value of the named column is formatted using a `strftime`
/// style string.
///
/// For example, a time format of "%Y-%m-%d %H:%M:%S" will produce
/// partition key parts such as "2021-03-14 12:25:21" and
/// "2021-04-14 12:24:21"
#[derive(Debug, Eq, PartialEq, Clone)]
#[allow(missing_docs)]
pub struct StrftimeColumn {
pub column: String,
pub format: String,
}
/// Represents a parsed delete predicate for evaluation by the InfluxDB IOx
@ -3006,8 +2910,6 @@ mod tests {
fn test_namespace_schema_size() {
let schema1 = NamespaceSchema {
id: NamespaceId::new(1),
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(3),
tables: BTreeMap::from([]),
max_columns_per_table: 4,
max_tables: 42,
@ -3015,8 +2917,6 @@ mod tests {
};
let schema2 = NamespaceSchema {
id: NamespaceId::new(1),
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(3),
tables: BTreeMap::from([(String::from("foo"), TableSchema::new(TableId::new(1)))]),
max_columns_per_table: 4,
max_tables: 42,

View File

@ -143,8 +143,6 @@ $ influxdb_iox namespace list
$ influxdb_iox debug schema get 26f7e5a4b7be365b_917b97a92e883afc
{
"id": "1",
"topicId": "1",
"queryPoolId": "1",
"tables": {
"cpu": {
"id": "5",

View File

@ -48,13 +48,6 @@ OBJECT_STORE=file \
DATABASE_DIRECTORY=~/data_dir \
LOG_FILTER=debug \
./target/release/influxdb_iox catalog setup
# initialize the topic
INFLUXDB_IOX_CATALOG_DSN=postgres://postgres@localhost:5432/postgres \
OBJECT_STORE=file \
DATABASE_DIRECTORY=~/data_dir \
LOG_FILTER=debug \
./target/release/influxdb_iox catalog topic update iox-shared
```
## Inspecting Catalog state

View File

@ -155,11 +155,9 @@ mod tests {
let metric_registry = Arc::new(metric::Registry::new());
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create("namespace_parquet_file_test", None, topic.id, pool.id)
.create("namespace_parquet_file_test", None)
.await
.unwrap();
let table = repos

View File

@ -20,13 +20,15 @@ message NamespaceSchema {
// Renamed to topic_id
reserved 2;
reserved "kafka_topic_id";
// Removed topic ID
reserved 5;
reserved "topic_id";
// Removed query pool ID
reserved 3;
reserved "query_pool_id";
// Namespace ID
int64 id = 1;
// Topic ID
int64 topic_id = 5;
// Query Pool ID
int64 query_pool_id = 3;
// Map of Table Name -> Table Schema
map<string, TableSchema> tables = 4;
}

View File

@ -2,7 +2,7 @@ use crate::{AggregateTSMMeasurement, AggregateTSMSchema};
use chrono::{format::StrftimeItems, offset::FixedOffset, DateTime, Duration};
use data_types::{
ColumnType, Namespace, NamespaceName, NamespaceSchema, OrgBucketMappingError, Partition,
PartitionKey, QueryPoolId, TableSchema, TopicId,
PartitionKey, TableSchema,
};
use iox_catalog::interface::{
get_schema_by_name, CasFailure, Catalog, RepoCollection, SoftDeletedRows,
@ -25,9 +25,6 @@ pub enum UpdateCatalogError {
#[error("Couldn't construct namespace from org and bucket: {0}")]
InvalidOrgBucket(#[from] OrgBucketMappingError),
#[error("No topic named '{topic_name}' found in the catalog")]
TopicCatalogLookup { topic_name: String },
#[error("No namespace named {0} in Catalog")]
NamespaceNotFound(String),
@ -43,14 +40,10 @@ pub enum UpdateCatalogError {
/// Given a merged schema, update the IOx catalog to either merge that schema into the existing one
/// for the namespace, or create the namespace and schema using the merged schema.
/// Will error if the namespace needs to be created but the user hasn't explicitly set the query
/// pool name and retention setting, allowing the user to not provide them if they're not needed.
/// Would have done the same for `topic` but that comes from the shared clap block and isn't
/// an `Option`.
pub async fn update_iox_catalog<'a>(
merged_tsm_schema: &'a AggregateTSMSchema,
topic: &'a str,
query_pool_name: &'a str,
/// Will error if the namespace needs to be created but the user hasn't explicitly set the
/// retention setting, allowing the user to not provide it if it's not needed.
pub async fn update_iox_catalog(
merged_tsm_schema: &AggregateTSMSchema,
catalog: Arc<dyn Catalog>,
) -> Result<(), UpdateCatalogError> {
let namespace_name =
@ -67,15 +60,7 @@ pub async fn update_iox_catalog<'a>(
Ok(iox_schema) => iox_schema,
Err(iox_catalog::interface::Error::NamespaceNotFoundByName { .. }) => {
// create the namespace
let (topic_id, query_id) =
get_topic_id_and_query_id(repos.deref_mut(), topic, query_pool_name).await?;
let _namespace = create_namespace(
namespace_name.as_str(),
topic_id,
query_id,
repos.deref_mut(),
)
.await?;
let _namespace = create_namespace(namespace_name.as_str(), repos.deref_mut()).await?;
// fetch the newly-created schema (which will be empty except for the time column,
// which won't impact the merge we're about to do)
match get_schema_by_name(
@ -98,46 +83,11 @@ pub async fn update_iox_catalog<'a>(
Ok(())
}
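
With the topic and query pool parameters gone, `update_iox_catalog` now takes just the merged schema and a catalog handle, as the tests further down exercise. A minimal sketch of a call site (the in-memory catalog is only for illustration and mirrors those tests; it assumes this module's `update_iox_catalog`, `AggregateTSMSchema`, and `UpdateCatalogError` are in scope):

```rust
// Sketch only: drive the two-argument update_iox_catalog against an
// in-memory catalog, as the tests below do.
use std::sync::Arc;
use iox_catalog::{interface::Catalog, mem::MemCatalog};

async fn merge_schema(agg_schema: &AggregateTSMSchema) -> Result<(), UpdateCatalogError> {
    let metrics = Arc::new(metric::Registry::default());
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
    update_iox_catalog(agg_schema, Arc::clone(&catalog)).await
}
```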
async fn get_topic_id_and_query_id<'a, R>(
repos: &mut R,
topic_name: &'a str,
query_pool_name: &'a str,
) -> Result<(TopicId, QueryPoolId), UpdateCatalogError>
async fn create_namespace<R>(name: &str, repos: &mut R) -> Result<Namespace, UpdateCatalogError>
where
R: RepoCollection + ?Sized,
{
let topic_id = repos
.topics()
.get_by_name(topic_name)
.await
.map_err(UpdateCatalogError::CatalogError)?
.map(|v| v.id)
.ok_or_else(|| UpdateCatalogError::TopicCatalogLookup {
topic_name: topic_name.to_string(),
})?;
let query_id = repos
.query_pools()
.create_or_get(query_pool_name)
.await
.map(|v| v.id)
.map_err(UpdateCatalogError::CatalogError)?;
Ok((topic_id, query_id))
}
async fn create_namespace<R>(
name: &str,
topic_id: TopicId,
query_id: QueryPoolId,
repos: &mut R,
) -> Result<Namespace, UpdateCatalogError>
where
R: RepoCollection + ?Sized,
{
match repos
.namespaces()
.create(name, None, topic_id, query_id)
.await
{
match repos.namespaces().create(name, None).await {
Ok(ns) => Ok(ns),
Err(iox_catalog::interface::Error::NameExists { .. }) => {
// presumably it got created in the meantime?
@ -401,13 +351,6 @@ mod tests {
// init a test catalog stack
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
catalog
.repositories()
.await
.topics()
.create_or_get("iox-shared")
.await
.expect("topic created");
let json = r#"
{
@ -428,14 +371,9 @@ mod tests {
}
"#;
let agg_schema: AggregateTSMSchema = json.try_into().unwrap();
update_iox_catalog(
&agg_schema,
"iox-shared",
"iox-shared",
Arc::clone(&catalog),
)
.await
.expect("schema update worked");
update_iox_catalog(&agg_schema, Arc::clone(&catalog))
.await
.expect("schema update worked");
let mut repos = catalog.repositories().await;
let iox_schema = get_schema_by_name(
"1234_5678",
@ -483,19 +421,13 @@ mod tests {
// init a test catalog stack
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
// We need txn to go out of scope to release the lock before update_iox_catalog
{
let mut txn = catalog.repositories().await;
txn.topics()
.create_or_get("iox-shared")
.await
.expect("topic created");
// create namespace, table and columns for weather measurement
let namespace = txn
.namespaces()
.create("1234_5678", None, TopicId::new(1), QueryPoolId::new(1))
.create("1234_5678", None)
.await
.expect("namespace created");
let mut table = txn
@ -545,14 +477,9 @@ mod tests {
}
"#;
let agg_schema: AggregateTSMSchema = json.try_into().unwrap();
update_iox_catalog(
&agg_schema,
"iox-shared",
"iox-shared",
Arc::clone(&catalog),
)
.await
.expect("schema update worked");
update_iox_catalog(&agg_schema, Arc::clone(&catalog))
.await
.expect("schema update worked");
let mut repos = catalog.repositories().await;
let iox_schema = get_schema_by_name(
"1234_5678",
@ -589,15 +516,10 @@ mod tests {
// We need txn to go out of scope to release the lock before update_iox_catalog
{
let mut txn = catalog.repositories().await;
txn.topics()
.create_or_get("iox-shared")
.await
.expect("topic created");
// create namespace, table and columns for weather measurement
let namespace = txn
.namespaces()
.create("1234_5678", None, TopicId::new(1), QueryPoolId::new(1))
.create("1234_5678", None)
.await
.expect("namespace created");
let mut table = txn
@ -640,14 +562,9 @@ mod tests {
}
"#;
let agg_schema: AggregateTSMSchema = json.try_into().unwrap();
let err = update_iox_catalog(
&agg_schema,
"iox-shared",
"iox-shared",
Arc::clone(&catalog),
)
.await
.expect_err("should fail catalog update");
let err = update_iox_catalog(&agg_schema, Arc::clone(&catalog))
.await
.expect_err("should fail catalog update");
assert_matches!(err, UpdateCatalogError::SchemaUpdateError(_));
assert!(err
.to_string()
@ -663,15 +580,11 @@ mod tests {
// We need txn to go out of scope to release the lock before update_iox_catalog
{
let mut txn = catalog.repositories().await;
txn.topics()
.create_or_get("iox-shared")
.await
.expect("topic created");
// create namespace, table and columns for weather measurement
let namespace = txn
.namespaces()
.create("1234_5678", None, TopicId::new(1), QueryPoolId::new(1))
.create("1234_5678", None)
.await
.expect("namespace created");
let mut table = txn
@ -713,14 +626,9 @@ mod tests {
}
"#;
let agg_schema: AggregateTSMSchema = json.try_into().unwrap();
let err = update_iox_catalog(
&agg_schema,
"iox-shared",
"iox-shared",
Arc::clone(&catalog),
)
.await
.expect_err("should fail catalog update");
let err = update_iox_catalog(&agg_schema, Arc::clone(&catalog))
.await
.expect_err("should fail catalog update");
assert_matches!(err, UpdateCatalogError::SchemaUpdateError(_));
assert!(err.to_string().ends_with(
"a column with name temperature already exists in the schema with a different type"

View File

@ -5,14 +5,9 @@ use thiserror::Error;
use crate::process_info::setup_metric_registry;
mod topic;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error in topic subcommand: {0}")]
Topic(#[from] topic::Error),
#[error("Catalog error: {0}")]
Catalog(#[from] iox_catalog::interface::Error),
@ -39,9 +34,6 @@ struct Setup {
enum Command {
/// Run database migrations
Setup(Setup),
/// Manage topic
Topic(topic::Config),
}
pub async fn command(config: Config) -> Result<(), Error> {
@ -52,9 +44,6 @@ pub async fn command(config: Config) -> Result<(), Error> {
catalog.setup().await?;
println!("OK");
}
Command::Topic(config) => {
topic::command(config).await?;
}
}
Ok(())

View File

@ -1,54 +0,0 @@
//! This module implements the `catalog topic` CLI subcommand
use thiserror::Error;
use clap_blocks::catalog_dsn::CatalogDsnConfig;
use crate::process_info::setup_metric_registry;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error updating catalog: {0}")]
UpdateCatalogError(#[from] iox_catalog::interface::Error),
#[error("Catalog DSN error: {0}")]
CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
}
/// Manage IOx chunks
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
command: Command,
}
/// Create or update a topic
#[derive(Debug, clap::Parser)]
struct Update {
#[clap(flatten)]
catalog_dsn: CatalogDsnConfig,
/// The name of the topic
#[clap(action)]
db_name: String,
}
/// All possible subcommands for topic
#[derive(Debug, clap::Parser)]
enum Command {
Update(Update),
}
pub async fn command(config: Config) -> Result<(), Error> {
match config.command {
Command::Update(update) => {
let metrics = setup_metric_registry();
let catalog = update.catalog_dsn.get_catalog("cli", metrics).await?;
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get(&update.db_name).await?;
println!("{}", topic.id);
Ok(())
}
}
}

View File

@ -84,26 +84,6 @@ pub struct MergeConfig {
#[clap(flatten)]
catalog_dsn: CatalogDsnConfig,
/// Write buffer topic/database that should be used.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "write-buffer-topic",
env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
default_value = "iox-shared",
action
)]
pub topic: String,
/// Query pool name to dispatch writes to.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "query-pool",
env = "INFLUXDB_IOX_QUERY_POOL_NAME",
default_value = "iox-shared",
action
)]
pub query_pool_name: String,
#[clap(long)]
/// Retention setting (used only if we need to create the namespace)
retention: Option<String>,
@ -192,13 +172,7 @@ pub async fn command(config: Config) -> Result<(), SchemaCommandError> {
// given we have a valid aggregate TSM schema, fetch the schema for the namespace from
// the IOx catalog, if it exists, and update it with our aggregate schema
update_iox_catalog(
&merged_tsm_schema,
&merge_config.topic,
&merge_config.query_pool_name,
Arc::clone(&catalog),
)
.await?;
update_iox_catalog(&merged_tsm_schema, Arc::clone(&catalog)).await?;
Ok(())
}

View File

@ -64,9 +64,6 @@ pub const DEFAULT_INGESTER_GRPC_BIND_ADDR: &str = "127.0.0.1:8083";
/// The default bind address for the Compactor gRPC
pub const DEFAULT_COMPACTOR_GRPC_BIND_ADDR: &str = "127.0.0.1:8084";
// If you want this level of control, you should be instantiating the services individually
const QUERY_POOL_NAME: &str = "iox-shared";
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
@ -472,13 +469,11 @@ impl Config {
let router_config = Router2Config {
authz_address: authz_address.clone(),
single_tenant_deployment,
query_pool_name: QUERY_POOL_NAME.to_string(),
http_request_limit: 1_000,
ingester_addresses: ingester_addresses.clone(),
new_namespace_retention_hours: None, // infinite retention
namespace_autocreation_enabled: true,
partition_key_pattern: "%Y-%m-%d".to_string(),
topic: QUERY_POOL_NAME.to_string(),
rpc_write_timeout_seconds: Duration::new(3, 0),
rpc_write_replicas: None,
rpc_write_max_outgoing_bytes: ingester_config.rpc_write_max_incoming_bytes,
@ -590,14 +585,6 @@ pub async fn command(config: Config) -> Result<()> {
info!("running db migrations");
catalog.setup().await?;
// Create a topic
catalog
.repositories()
.await
.topics()
.create_or_get(QUERY_POOL_NAME)
.await?;
let object_store: Arc<DynObjectStore> =
make_object_store(router_run_config.object_store_config())
.map_err(Error::ObjectStoreParsing)?;

View File

@ -114,11 +114,9 @@ mod tests {
let (namespace_id, table_id) = {
let mut repos = catalog.repositories().await;
let t = repos.topics().create_or_get("platanos").await.unwrap();
let q = repos.query_pools().create_or_get("platanos").await.unwrap();
let ns = repos
.namespaces()
.create(TABLE_NAME, None, t.id, q.id)
.create(TABLE_NAME, None)
.await
.unwrap();

View File

@ -283,14 +283,7 @@ pub(crate) async fn populate_catalog(
table: &str,
) -> (NamespaceId, TableId) {
let mut c = catalog.repositories().await;
let topic = c.topics().create_or_get("kafka-topic").await.unwrap();
let query_pool = c.query_pools().create_or_get("query-pool").await.unwrap();
let ns_id = c
.namespaces()
.create(namespace, None, topic.id, query_pool.id)
.await
.unwrap()
.id;
let ns_id = c.namespaces().create(namespace, None).await.unwrap().id;
let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id;
(ns_id, table_id)

View File

@ -17,8 +17,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use arrow::record_batch::RecordBatch;
use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket};
use data_types::{
Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, QueryPoolId,
SequenceNumber, TableId, TopicId, TopicMetadata,
Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, SequenceNumber, TableId,
};
use dml::{DmlMeta, DmlWrite};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
@ -43,9 +42,6 @@ use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tonic::Request;
/// The (legacy) topic name this ingester uses.
pub const TEST_TOPIC_NAME: &str = "banana-topics";
/// The default max persist queue depth - configurable with
/// [`TestContextBuilder::with_max_persist_queue_depth()`].
pub const DEFAULT_MAX_PERSIST_QUEUE_DEPTH: usize = 5;
@ -53,6 +49,11 @@ pub const DEFAULT_MAX_PERSIST_QUEUE_DEPTH: usize = 5;
/// [`TestContextBuilder::with_persist_hot_partition_cost()`].
pub const DEFAULT_PERSIST_HOT_PARTITION_COST: usize = 20_000_000;
/// Construct a new [`TestContextBuilder`] to make a [`TestContext`] for an [`ingester2`] instance.
pub fn test_context() -> TestContextBuilder {
TestContextBuilder::default()
}
/// Configure and construct a [`TestContext`] containing an [`ingester2`] instance.
#[derive(Debug)]
pub struct TestContextBuilder {
@ -126,24 +127,6 @@ impl TestContextBuilder {
let storage =
ParquetStorage::new(object_store, parquet_file::storage::StorageId::from("iox"));
// Initialise a topic and query pool.
//
// Note that tests should set up their own namespace via
// ensure_namespace()
let topic: TopicMetadata;
let query_id: QueryPoolId;
// txn must go out of scope so the lock is released for `ingester2::new`
{
let mut txn = catalog.repositories().await;
topic = txn.topics().create_or_get(TEST_TOPIC_NAME).await.unwrap();
query_id = txn
.query_pools()
.create_or_get("banana-query-pool")
.await
.unwrap()
.id;
}
// Settings so that the ingester will effectively never persist by itself, only on demand
let wal_rotation_period = Duration::from_secs(1_000_000);
@ -175,8 +158,6 @@ impl TestContextBuilder {
_dir: dir,
catalog,
_storage: storage,
query_id,
topic_id: topic.id,
metrics,
namespaces: Default::default(),
}
@ -193,8 +174,6 @@ pub struct TestContext<T> {
shutdown_tx: oneshot::Sender<CancellationToken>,
catalog: Arc<dyn Catalog>,
_storage: ParquetStorage,
query_id: QueryPoolId,
topic_id: TopicId,
metrics: Arc<metric::Registry>,
/// Once the last [`TempDir`] reference is dropped, the directory it
@ -228,7 +207,7 @@ where
.repositories()
.await
.namespaces()
.create(name, None, self.topic_id, self.query_id)
.create(name, None)
.await
.expect("failed to create test namespace");
@ -238,8 +217,6 @@ where
ns.id,
NamespaceSchema::new(
ns.id,
self.topic_id,
self.query_id,
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
iox_catalog::DEFAULT_MAX_TABLES,
retention_period_ns,

View File

@ -27,7 +27,6 @@ You'll then need to create the database. You can do this via the sqlx command li
cargo install sqlx-cli
DATABASE_URL=<dsn> sqlx database create
cargo run -q -- catalog setup
cargo run -- catalog topic update iox-shared
```
This will set up the database based on the files in `./migrations` in this crate. SQLx also creates

View File

@ -3,8 +3,8 @@
use async_trait::async_trait;
use data_types::{
Column, ColumnSchema, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceSchema,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool,
QueryPoolId, SkippedCompaction, Table, TableId, TableSchema, Timestamp, TopicId, TopicMetadata,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
SkippedCompaction, Table, TableId, TableSchema, Timestamp,
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
@ -218,12 +218,6 @@ pub trait Catalog: Send + Sync + Debug + Display {
/// should and must not care how these are implemented.
#[async_trait]
pub trait RepoCollection: Send + Sync + Debug {
/// Repository for [topics](data_types::TopicMetadata).
fn topics(&mut self) -> &mut dyn TopicMetadataRepo;
/// Repository for [query pools](data_types::QueryPool).
fn query_pools(&mut self) -> &mut dyn QueryPoolRepo;
/// Repository for [namespaces](data_types::Namespace).
fn namespaces(&mut self) -> &mut dyn NamespaceRepo;
@ -240,36 +234,13 @@ pub trait RepoCollection: Send + Sync + Debug {
fn parquet_files(&mut self) -> &mut dyn ParquetFileRepo;
}
/// Functions for working with topics in the catalog.
#[async_trait]
pub trait TopicMetadataRepo: Send + Sync {
/// Creates the topic in the catalog or gets the existing record by name.
async fn create_or_get(&mut self, name: &str) -> Result<TopicMetadata>;
/// Gets the topic by its unique name
async fn get_by_name(&mut self, name: &str) -> Result<Option<TopicMetadata>>;
}
/// Functions for working with query pools in the catalog.
#[async_trait]
pub trait QueryPoolRepo: Send + Sync {
/// Creates the query pool in the catalog or gets the existing record by name.
async fn create_or_get(&mut self, name: &str) -> Result<QueryPool>;
}
/// Functions for working with namespaces in the catalog
#[async_trait]
pub trait NamespaceRepo: Send + Sync {
/// Creates the namespace in the catalog. If one by the same name already exists, an
/// error is returned.
/// Specify `None` for `retention_period_ns` to get infinite retention.
async fn create(
&mut self,
name: &str,
retention_period_ns: Option<i64>,
topic_id: TopicId,
query_pool_id: QueryPoolId,
) -> Result<Namespace>;
async fn create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
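
Illustrative only: how a caller drives the reduced two-argument `create`, mirroring the test helpers later in this diff (the in-memory catalog and namespace name are assumptions of the sketch, and the trait imports are needed to call methods on the returned repositories):

```rust
// Sketch: create a namespace with infinite retention (retention_period_ns = None).
use std::sync::Arc;
use iox_catalog::{
    interface::{Catalog, NamespaceRepo, RepoCollection},
    mem::MemCatalog,
};

async fn create_example() {
    let metrics = Arc::new(metric::Registry::default());
    let catalog = MemCatalog::new(metrics);
    let mut repos = catalog.repositories().await;

    let ns = repos
        .namespaces()
        .create("example_ns", None)
        .await
        .expect("namespace create failed");
    assert!(ns.retention_period_ns.is_none());
}
```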
/// Update retention period for a namespace
async fn update_retention_period(
@ -562,8 +533,6 @@ where
let mut namespace = NamespaceSchema::new(
namespace.id,
namespace.topic_id,
namespace.query_pool_id,
namespace.max_columns_per_table,
namespace.max_tables,
namespace.retention_period_ns,
@ -715,8 +684,6 @@ pub async fn list_schemas(
let mut ns = NamespaceSchema::new(
v.id,
v.topic_id,
v.query_pool_id,
v.max_columns_per_table,
v.max_tables,
v.retention_period_ns,
@ -747,7 +714,6 @@ pub(crate) mod test_helpers {
test_setup(clean_state().await).await;
test_namespace_soft_deletion(clean_state().await).await;
test_partitions_new_file_between(clean_state().await).await;
test_query_pool(clean_state().await).await;
test_column(clean_state().await).await;
test_partition(clean_state().await).await;
test_parquet_file(clean_state().await).await;
@ -758,10 +724,6 @@ pub(crate) mod test_helpers {
test_list_schemas_soft_deleted_rows(clean_state().await).await;
test_delete_namespace(clean_state().await).await;
let catalog = clean_state().await;
test_topic(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "topic_create_or_get");
let catalog = clean_state().await;
test_namespace(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "namespace_create");
@ -788,41 +750,12 @@ pub(crate) mod test_helpers {
catalog.setup().await.expect("second catalog setup");
}
async fn test_topic(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic_repo = repos.topics();
let k = topic_repo.create_or_get("foo").await.unwrap();
assert!(k.id > TopicId::new(0));
assert_eq!(k.name, "foo");
let k2 = topic_repo.create_or_get("foo").await.unwrap();
assert_eq!(k, k2);
let k3 = topic_repo.get_by_name("foo").await.unwrap().unwrap();
assert_eq!(k3, k);
let k3 = topic_repo.get_by_name("asdf").await.unwrap();
assert!(k3.is_none());
}
async fn test_query_pool(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let query_repo = repos.query_pools();
let q = query_repo.create_or_get("foo").await.unwrap();
assert!(q.id > QueryPoolId::new(0));
assert_eq!(q.name, "foo");
let q2 = query_repo.create_or_get("foo").await.unwrap();
assert_eq!(q, q2);
}
async fn test_namespace(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace_name = "test_namespace";
let namespace = repos
.namespaces()
.create(namespace_name, None, topic.id, pool.id)
.create(namespace_name, None)
.await
.unwrap();
assert!(namespace.id > NamespaceId::new(0));
@ -835,10 +768,7 @@ pub(crate) mod test_helpers {
DEFAULT_MAX_COLUMNS_PER_TABLE
);
let conflict = repos
.namespaces()
.create(namespace_name, None, topic.id, pool.id)
.await;
let conflict = repos.namespaces().create(namespace_name, None).await;
assert!(matches!(
conflict.unwrap_err(),
Error::NameExists { name: _ }
@ -877,7 +807,7 @@ pub(crate) mod test_helpers {
let namespace2_name = "test_namespace2";
let namespace2 = repos
.namespaces()
.create(namespace2_name, None, topic.id, pool.id)
.create(namespace2_name, None)
.await
.unwrap();
let mut namespaces = repos
@ -926,7 +856,7 @@ pub(crate) mod test_helpers {
let namespace3_name = "test_namespace3";
let namespace3 = repos
.namespaces()
.create(namespace3_name, None, topic.id, pool.id)
.create(namespace3_name, None)
.await
.expect("namespace with NULL retention should be created");
assert!(namespace3.retention_period_ns.is_none());
@ -935,12 +865,7 @@ pub(crate) mod test_helpers {
let namespace4_name = "test_namespace4";
let namespace4 = repos
.namespaces()
.create(
namespace4_name,
Some(NEW_RETENTION_PERIOD_NS),
topic.id,
pool.id,
)
.create(namespace4_name, Some(NEW_RETENTION_PERIOD_NS))
.await
.expect("namespace with 5-hour retention should be created");
assert_eq!(
@ -986,19 +911,9 @@ pub(crate) mod test_helpers {
/// the expected rows for all three states of [`SoftDeletedRows`].
async fn test_namespace_soft_deletion(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let deleted_ns = repos
.namespaces()
.create("deleted-ns", None, topic.id, pool.id)
.await
.unwrap();
let active_ns = repos
.namespaces()
.create("active-ns", None, topic.id, pool.id)
.await
.unwrap();
let deleted_ns = repos.namespaces().create("deleted-ns", None).await.unwrap();
let active_ns = repos.namespaces().create("active-ns", None).await.unwrap();
// Mark "deleted-ns" as soft-deleted.
repos.namespaces().soft_delete("deleted-ns").await.unwrap();
@ -1152,11 +1067,9 @@ pub(crate) mod test_helpers {
async fn test_table(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create("namespace_table_test", None, topic.id, pool.id)
.create("namespace_table_test", None)
.await
.unwrap();
@ -1191,11 +1104,7 @@ pub(crate) mod test_helpers {
assert_eq!(vec![t.clone()], tables);
// test we can create a table of the same name in a different namespace
let namespace2 = repos
.namespaces()
.create("two", None, topic.id, pool.id)
.await
.unwrap();
let namespace2 = repos.namespaces().create("two", None).await.unwrap();
assert_ne!(namespace, namespace2);
let test_table = repos
.tables()
@ -1291,11 +1200,9 @@ pub(crate) mod test_helpers {
async fn test_column(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create("namespace_column_test", None, topic.id, pool.id)
.create("namespace_column_test", None)
.await
.unwrap();
let table = repos
@ -1426,11 +1333,9 @@ pub(crate) mod test_helpers {
async fn test_partition(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create("namespace_partition_test", None, topic.id, pool.id)
.create("namespace_partition_test", None)
.await
.unwrap();
let table = repos
@ -1710,11 +1615,9 @@ pub(crate) mod test_helpers {
/// tests many interactions with the catalog and parquet files. See the individual conditions herein
async fn test_parquet_file(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create("namespace_parquet_file_test", None, topic.id, pool.id)
.create("namespace_parquet_file_test", None)
.await
.unwrap();
let table = repos
@ -1899,7 +1802,7 @@ pub(crate) mod test_helpers {
// test list_by_namespace_not_to_delete
let namespace2 = repos
.namespaces()
.create("namespace_parquet_file_test1", None, topic.id, pool.id)
.create("namespace_parquet_file_test1", None)
.await
.unwrap();
let table2 = repos
@ -2192,16 +2095,14 @@ pub(crate) mod test_helpers {
async fn test_parquet_file_delete_broken(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace_1 = repos
.namespaces()
.create("retention_broken_1", None, topic.id, pool.id)
.create("retention_broken_1", None)
.await
.unwrap();
let namespace_2 = repos
.namespaces()
.create("retention_broken_2", Some(1), topic.id, pool.id)
.create("retention_broken_2", Some(1))
.await
.unwrap();
let table_1 = repos
@ -2274,19 +2175,9 @@ pub(crate) mod test_helpers {
async fn test_partitions_new_file_between(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos
.topics()
.create_or_get("new_file_between")
.await
.unwrap();
let pool = repos
.query_pools()
.create_or_get("new_file_between")
.await
.unwrap();
let namespace = repos
.namespaces()
.create("test_partitions_new_file_between", None, topic.id, pool.id)
.create("test_partitions_new_file_between", None)
.await
.unwrap();
let table = repos
@ -2650,15 +2541,11 @@ pub(crate) mod test_helpers {
async fn test_list_by_partiton_not_to_delete(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create(
"namespace_parquet_file_test_list_by_partiton_not_to_delete",
None,
topic.id,
pool.id,
)
.await
.unwrap();
@ -2764,16 +2651,9 @@ pub(crate) mod test_helpers {
async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create(
"namespace_update_to_compaction_level_1_test",
None,
topic.id,
pool.id,
)
.create("namespace_update_to_compaction_level_1_test", None)
.await
.unwrap();
let table = repos
@ -2857,11 +2737,9 @@ pub(crate) mod test_helpers {
/// effective.
async fn test_delete_namespace(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace_1 = repos
.namespaces()
.create("namespace_test_delete_namespace_1", None, topic.id, pool.id)
.create("namespace_test_delete_namespace_1", None)
.await
.unwrap();
let table_1 = repos
@ -2916,7 +2794,7 @@ pub(crate) mod test_helpers {
// it, let's create another so we can ensure that doesn't get deleted.
let namespace_2 = repos
.namespaces()
.create("namespace_test_delete_namespace_2", None, topic.id, pool.id)
.create("namespace_test_delete_namespace_2", None)
.await
.unwrap();
let table_2 = repos
@ -3101,12 +2979,7 @@ pub(crate) mod test_helpers {
where
R: RepoCollection + ?Sized,
{
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create(namespace_name, None, topic.id, pool.id)
.await;
let namespace = repos.namespaces().create(namespace_name, None).await;
let namespace = match namespace {
Ok(v) => v,
@ -3123,8 +2996,6 @@ pub(crate) mod test_helpers {
let batches = batches.iter().map(|(table, batch)| (table.as_str(), batch));
let ns = NamespaceSchema::new(
namespace.id,
topic.id,
pool.id,
namespace.max_columns_per_table,
namespace.max_tables,
namespace.retention_period_ns,

View File

@ -1,11 +1,13 @@
use data_types::TopicId;
/// Magic number to be used for shard indices and shard ids in "kafkaless".
pub(crate) const TRANSITION_SHARD_NUMBER: i32 = 1234;
/// In kafkaless mode all new persisted data uses this shard id.
pub(crate) const TRANSITION_SHARD_ID: ShardId = ShardId::new(TRANSITION_SHARD_NUMBER as i64);
/// In kafkaless mode all new persisted data uses this shard index.
pub(crate) const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(TRANSITION_SHARD_NUMBER);
pub(crate) const SHARED_TOPIC_NAME: &str = "iox-shared";
pub(crate) const SHARED_TOPIC_ID: TopicId = TopicId::new(1);
pub(crate) const SHARED_QUERY_POOL_ID: QueryPoolId = QueryPoolId::new(1);
pub(crate) const SHARED_QUERY_POOL: &str = SHARED_TOPIC_NAME;
/// Unique ID for a `Shard`, assigned by the catalog. Joins to other catalog tables to uniquely
/// identify shards independently of the underlying write buffer implementation.
@ -67,3 +69,27 @@ pub(crate) struct Shard {
/// and write buffer
pub(crate) shard_index: ShardIndex,
}
/// Unique ID for a Topic, assigned by the catalog
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct TopicId(i64);
#[allow(missing_docs)]
impl TopicId {
pub const fn new(v: i64) -> Self {
Self(v)
}
}
/// Unique ID for a `QueryPool`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct QueryPoolId(i64);
#[allow(missing_docs)]
impl QueryPoolId {
pub const fn new(v: i64) -> Self {
Self(v)
}
}

View File

@ -14,14 +14,11 @@
)]
use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result};
use data_types::{ColumnType, NamespaceSchema, QueryPool, TableSchema, TopicId, TopicMetadata};
use data_types::{ColumnType, NamespaceSchema, TableSchema};
use mutable_batch::MutableBatch;
use std::{borrow::Cow, collections::HashMap};
use thiserror::Error;
const SHARED_TOPIC_NAME: &str = "iox-shared";
const SHARED_TOPIC_ID: TopicId = TopicId::new(1);
const SHARED_QUERY_POOL: &str = SHARED_TOPIC_NAME;
const TIME_COLUMN: &str = "time";
/// Default per-namespace table count service protection limit.
@ -204,19 +201,6 @@ where
Ok(())
}
/// Creates or gets records in the catalog for the shared topic and query pool for each of the
/// partitions.
///
/// Used in tests and when creating an in-memory catalog.
pub async fn create_or_get_default_records(
txn: &mut dyn RepoCollection,
) -> Result<(TopicMetadata, QueryPool)> {
let topic = txn.topics().create_or_get(SHARED_TOPIC_NAME).await?;
let query_pool = txn.query_pools().create_or_get(SHARED_QUERY_POOL).await?;
Ok((topic, query_pool))
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, sync::Arc};
@ -251,20 +235,15 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let repo = MemCatalog::new(metrics);
let mut txn = repo.repositories().await;
let (topic, query_pool) = create_or_get_default_records(
txn.deref_mut()
).await.unwrap();
let namespace = txn
.namespaces()
.create(NAMESPACE_NAME, None, topic.id, query_pool.id)
.create(NAMESPACE_NAME, None)
.await
.unwrap();
let schema = NamespaceSchema::new(
namespace.id,
namespace.topic_id,
namespace.query_pool_id,
namespace.max_columns_per_table,
namespace.max_tables,
namespace.retention_period_ns,

View File

@ -4,18 +4,18 @@
use crate::{
interface::{
CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, SoftDeletedRows,
TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE,
ParquetFileRepo, PartitionRepo, RepoCollection, Result, SoftDeletedRows, TableRepo,
MAX_PARQUET_FILES_SELECTED_ONCE,
},
kafkaless_transition::{Shard, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX},
kafkaless_transition::{Shard, SHARED_TOPIC_ID, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
};
use async_trait::async_trait;
use data_types::{
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId,
SkippedCompaction, Table, TableId, Timestamp, TopicId, TopicMetadata,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction,
Table, TableId, Timestamp,
};
use iox_time::{SystemProvider, TimeProvider};
use snafu::ensure;
@ -62,8 +62,6 @@ impl std::fmt::Debug for MemCatalog {
#[derive(Default, Debug, Clone)]
struct MemCollections {
topics: Vec<TopicMetadata>,
query_pools: Vec<QueryPool>,
namespaces: Vec<Namespace>,
tables: Vec<Table>,
columns: Vec<Column>,
@ -98,18 +96,10 @@ impl Catalog for MemCatalog {
let mut guard = Arc::clone(&self.collections).lock_owned().await;
let mut stage = guard.clone();
// We need to manually insert the topic here so that we can create the transition shard
// below.
let topic = TopicMetadata {
id: SHARED_TOPIC_ID,
name: SHARED_TOPIC_NAME.to_string(),
};
stage.topics.push(topic.clone());
// The transition shard must exist and must have magic ID and INDEX.
let shard = Shard {
id: TRANSITION_SHARD_ID,
topic_id: topic.id,
topic_id: SHARED_TOPIC_ID,
shard_index: TRANSITION_SHARD_INDEX,
};
stage.shards.push(shard);
@ -139,14 +129,6 @@ impl Catalog for MemCatalog {
#[async_trait]
impl RepoCollection for MemTxn {
fn topics(&mut self) -> &mut dyn TopicMetadataRepo {
self
}
fn query_pools(&mut self) -> &mut dyn QueryPoolRepo {
self
}
fn namespaces(&mut self) -> &mut dyn NamespaceRepo {
self
}
@ -168,64 +150,9 @@ impl RepoCollection for MemTxn {
}
}
#[async_trait]
impl TopicMetadataRepo for MemTxn {
async fn create_or_get(&mut self, name: &str) -> Result<TopicMetadata> {
let stage = self.stage();
let topic = match stage.topics.iter().find(|t| t.name == name) {
Some(t) => t,
None => {
let topic = TopicMetadata {
id: TopicId::new(stage.topics.len() as i64 + 1),
name: name.to_string(),
};
stage.topics.push(topic);
stage.topics.last().unwrap()
}
};
Ok(topic.clone())
}
async fn get_by_name(&mut self, name: &str) -> Result<Option<TopicMetadata>> {
let stage = self.stage();
let topic = stage.topics.iter().find(|t| t.name == name).cloned();
Ok(topic)
}
}
#[async_trait]
impl QueryPoolRepo for MemTxn {
async fn create_or_get(&mut self, name: &str) -> Result<QueryPool> {
let stage = self.stage();
let pool = match stage.query_pools.iter().find(|t| t.name == name) {
Some(t) => t,
None => {
let pool = QueryPool {
id: QueryPoolId::new(stage.query_pools.len() as i64 + 1),
name: name.to_string(),
};
stage.query_pools.push(pool);
stage.query_pools.last().unwrap()
}
};
Ok(pool.clone())
}
}
#[async_trait]
impl NamespaceRepo for MemTxn {
async fn create(
&mut self,
name: &str,
retention_period_ns: Option<i64>,
topic_id: TopicId,
query_pool_id: QueryPoolId,
) -> Result<Namespace> {
async fn create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace> {
let stage = self.stage();
if stage.namespaces.iter().any(|n| n.name == name) {
@ -237,8 +164,6 @@ impl NamespaceRepo for MemTxn {
let namespace = Namespace {
id: NamespaceId::new(stage.namespaces.len() as i64 + 1),
name: name.to_string(),
topic_id,
query_pool_id,
max_tables: DEFAULT_MAX_TABLES,
max_columns_per_table: DEFAULT_MAX_COLUMNS_PER_TABLE,
retention_period_ns,

View File

@ -1,14 +1,14 @@
//! Metric instrumentation for catalog implementations.
use crate::interface::{
CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, QueryPoolRepo,
RepoCollection, Result, SoftDeletedRows, TableRepo, TopicMetadataRepo,
CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, RepoCollection, Result,
SoftDeletedRows, TableRepo,
};
use async_trait::async_trait;
use data_types::{
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId,
SkippedCompaction, Table, TableId, Timestamp, TopicId, TopicMetadata,
ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp,
};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
@ -41,24 +41,9 @@ impl<T> MetricDecorator<T> {
impl<T, P> RepoCollection for MetricDecorator<T, P>
where
T: TopicMetadataRepo
+ QueryPoolRepo
+ NamespaceRepo
+ TableRepo
+ ColumnRepo
+ PartitionRepo
+ ParquetFileRepo
+ Debug,
T: NamespaceRepo + TableRepo + ColumnRepo + PartitionRepo + ParquetFileRepo + Debug,
P: TimeProvider,
{
fn topics(&mut self) -> &mut dyn TopicMetadataRepo {
self
}
fn query_pools(&mut self) -> &mut dyn QueryPoolRepo {
self
}
fn namespaces(&mut self) -> &mut dyn NamespaceRepo {
self
}
@ -143,25 +128,10 @@ macro_rules! decorate {
};
}
decorate!(
impl_trait = TopicMetadataRepo,
methods = [
"topic_create_or_get" = create_or_get(&mut self, name: &str) -> Result<TopicMetadata>;
"topic_get_by_name" = get_by_name(&mut self, name: &str) -> Result<Option<TopicMetadata>>;
]
);
decorate!(
impl_trait = QueryPoolRepo,
methods = [
"query_create_or_get" = create_or_get(&mut self, name: &str) -> Result<QueryPool>;
]
);
decorate!(
impl_trait = NamespaceRepo,
methods = [
"namespace_create" = create(&mut self, name: &str, retention_period_ns: Option<i64>, topic_id: TopicId, query_pool_id: QueryPoolId) -> Result<Namespace>;
"namespace_create" = create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
"namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
"namespace_list" = list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>>;
"namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId, deleted: SoftDeletedRows) -> Result<Option<Namespace>>;

View File

@ -3,18 +3,21 @@
use crate::{
interface::{
self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, SoftDeletedRows,
TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE,
ParquetFileRepo, PartitionRepo, RepoCollection, Result, SoftDeletedRows, TableRepo,
MAX_PARQUET_FILES_SELECTED_ONCE,
},
kafkaless_transition::{
SHARED_QUERY_POOL, SHARED_QUERY_POOL_ID, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
},
kafkaless_transition::{TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
};
use async_trait::async_trait;
use data_types::{
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId,
SkippedCompaction, Table, TableId, Timestamp, TopicId, TopicMetadata,
ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, info, warn};
@ -240,7 +243,8 @@ impl Catalog for PostgresCatalog {
.await
.map_err(|e| Error::Setup { source: e.into() })?;
// We need to manually insert the topic here so that we can create the transition shard below.
// We need to manually insert the topic here so that we can create the transition shard
// below.
sqlx::query(
r#"
INSERT INTO topic (name)
@ -271,6 +275,21 @@ DO NOTHING;
.await
.map_err(|e| Error::Setup { source: e })?;
// We need to manually insert the query pool here so that we can create namespaces that
// reference it.
sqlx::query(
r#"
INSERT INTO query_pool (name)
VALUES ($1)
ON CONFLICT ON CONSTRAINT query_pool_name_unique
DO NOTHING;
"#,
)
.bind(SHARED_QUERY_POOL)
.execute(&self.pool)
.await
.map_err(|e| Error::Setup { source: e })?;
Ok(())
}
@ -458,14 +477,6 @@ fn get_dsn_file_path(dsn: &str) -> Option<String> {
#[async_trait]
impl RepoCollection for PostgresTxn {
fn topics(&mut self) -> &mut dyn TopicMetadataRepo {
self
}
fn query_pools(&mut self) -> &mut dyn QueryPoolRepo {
self
}
fn namespaces(&mut self) -> &mut dyn NamespaceRepo {
self
}
@ -487,88 +498,19 @@ impl RepoCollection for PostgresTxn {
}
}
#[async_trait]
impl TopicMetadataRepo for PostgresTxn {
async fn create_or_get(&mut self, name: &str) -> Result<TopicMetadata> {
let rec = sqlx::query_as::<_, TopicMetadata>(
r#"
INSERT INTO topic ( name )
VALUES ( $1 )
ON CONFLICT ON CONSTRAINT topic_name_unique
DO UPDATE SET name = topic.name
RETURNING *;
"#,
)
.bind(name) // $1
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
async fn get_by_name(&mut self, name: &str) -> Result<Option<TopicMetadata>> {
let rec = sqlx::query_as::<_, TopicMetadata>(
r#"
SELECT *
FROM topic
WHERE name = $1;
"#,
)
.bind(name) // $1
.fetch_one(&mut self.inner)
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let topic = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(topic))
}
}
#[async_trait]
impl QueryPoolRepo for PostgresTxn {
async fn create_or_get(&mut self, name: &str) -> Result<QueryPool> {
let rec = sqlx::query_as::<_, QueryPool>(
r#"
INSERT INTO query_pool ( name )
VALUES ( $1 )
ON CONFLICT ON CONSTRAINT query_pool_name_unique
DO UPDATE SET name = query_pool.name
RETURNING *;
"#,
)
.bind(name) // $1
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
}
#[async_trait]
impl NamespaceRepo for PostgresTxn {
async fn create(
&mut self,
name: &str,
retention_period_ns: Option<i64>,
topic_id: TopicId,
query_pool_id: QueryPoolId,
) -> Result<Namespace> {
async fn create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables )
VALUES ( $1, $2, $3, $4, $5 )
RETURNING *;
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
"#,
)
.bind(name) // $1
.bind(topic_id) // $2
.bind(query_pool_id) // $3
.bind(SHARED_TOPIC_ID) // $2
.bind(SHARED_QUERY_POOL_ID) // $3
.bind(retention_period_ns) // $4
.bind(DEFAULT_MAX_TABLES); // $5
@ -594,7 +536,11 @@ impl NamespaceRepo for PostgresTxn {
async fn list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
format!(
r#"SELECT * FROM namespace WHERE {v};"#,
r#"
SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at
FROM namespace
WHERE {v};
"#,
v = deleted.as_sql_predicate()
)
.as_str(),
@ -613,7 +559,11 @@ impl NamespaceRepo for PostgresTxn {
) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
format!(
r#"SELECT * FROM namespace WHERE id=$1 AND {v};"#,
r#"
SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at
FROM namespace
WHERE id=$1 AND {v};
"#,
v = deleted.as_sql_predicate()
)
.as_str(),
@ -638,7 +588,11 @@ impl NamespaceRepo for PostgresTxn {
) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
format!(
r#"SELECT * FROM namespace WHERE name=$1 AND {v};"#,
r#"
SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at
FROM namespace
WHERE name=$1 AND {v};
"#,
v = deleted.as_sql_predicate()
)
.as_str(),
@ -675,7 +629,7 @@ impl NamespaceRepo for PostgresTxn {
UPDATE namespace
SET max_tables = $1
WHERE name = $2
RETURNING *;
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
"#,
)
.bind(new_max)
@ -699,7 +653,7 @@ RETURNING *;
UPDATE namespace
SET max_columns_per_table = $1
WHERE name = $2
RETURNING *;
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
"#,
)
.bind(new_max)
@ -723,7 +677,12 @@ RETURNING *;
retention_period_ns: Option<i64>,
) -> Result<Namespace> {
let rec = sqlx::query_as::<_, Namespace>(
r#"UPDATE namespace SET retention_period_ns = $1 WHERE name = $2 RETURNING *;"#,
r#"
UPDATE namespace
SET retention_period_ns = $1
WHERE name = $2
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
"#,
)
.bind(retention_period_ns) // $1
.bind(name) // $2
@ -1567,7 +1526,7 @@ INSERT INTO parquet_file (
shard_id, table_id, partition_id, object_store_id,
min_time, max_time, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 )
RETURNING
id, table_id, partition_id, object_store_id,
min_time, max_time, to_delete, file_size_bytes,
@ -1681,13 +1640,12 @@ fn is_fk_violation(e: &sqlx::Error) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::create_or_get_default_records;
use assert_matches::assert_matches;
use data_types::{ColumnId, ColumnSet};
use metric::{Attributes, DurationHistogram, Metric};
use rand::Rng;
use sqlx::migrate::MigrateDatabase;
use std::{env, io::Write, ops::DerefMut, sync::Arc, time::Instant};
use std::{env, io::Write, sync::Arc, time::Instant};
use tempfile::NamedTempFile;
// Helper macro to skip tests if TEST_INTEGRATION and TEST_INFLUXDB_IOX_CATALOG_DSN environment
@ -1864,18 +1822,13 @@ mod tests {
maybe_skip_integration!();
let postgres = setup_db().await;
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let mut txn = postgres.repositories().await;
let (kafka, query) = create_or_get_default_records(txn.deref_mut())
.await
.expect("db init failed");
let namespace_id = postgres
.repositories()
.await
.namespaces()
.create("ns4", None, kafka.id, query.id)
.create("ns4", None)
.await
.expect("namespace create failed")
.id;
@ -2007,18 +1960,13 @@ mod tests {
let postgres = setup_db().await;
let metrics = Arc::clone(&postgres.metrics);
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let mut txn = postgres.repositories().await;
let (kafka, query) = create_or_get_default_records(txn.deref_mut())
.await
.expect("db init failed");
let namespace_id = postgres
.repositories()
.await
.namespaces()
.create("ns4", None, kafka.id, query.id)
.create("ns4", None)
.await
.expect("namespace create failed")
.id;
@ -2175,18 +2123,13 @@ mod tests {
let postgres = setup_db().await;
let pool = postgres.pool.clone();
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let mut txn = postgres.repositories().await;
let (kafka, query) = create_or_get_default_records(txn.deref_mut())
.await
.expect("db init failed");
let namespace_id = postgres
.repositories()
.await
.namespaces()
.create("ns4", None, kafka.id, query.id)
.create("ns4", None)
.await
.expect("namespace create failed")
.id;
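
The test hunks above all converge on the narrowed two-argument `create` call. A minimal sketch of that call against the in-memory catalog, using only paths that already appear in this change (`iox_catalog::mem::MemCatalog`, `iox_catalog::interface::Catalog`); the namespace name is arbitrary and an async runtime is assumed to drive the future:

use std::sync::Arc;

use iox_catalog::{interface::Catalog, mem::MemCatalog};

async fn create_example_namespace() -> data_types::Namespace {
    // Callers no longer pass topic or query-pool IDs; the catalog binds the
    // shared transition values itself (see the sqlite `create` further down).
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Default::default()));
    catalog
        .repositories()
        .await
        .namespaces()
        .create("example_ns", None) // name, retention_period_ns
        .await
        .expect("namespace create failed")
}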

View File

@ -3,18 +3,21 @@
use crate::{
interface::{
self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
ParquetFileRepo, PartitionRepo, QueryPoolRepo, RepoCollection, Result, SoftDeletedRows,
TableRepo, TopicMetadataRepo, MAX_PARQUET_FILES_SELECTED_ONCE,
ParquetFileRepo, PartitionRepo, RepoCollection, Result, SoftDeletedRows, TableRepo,
MAX_PARQUET_FILES_SELECTED_ONCE,
},
kafkaless_transition::{
SHARED_QUERY_POOL, SHARED_QUERY_POOL_ID, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
},
kafkaless_transition::{TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
};
use async_trait::async_trait;
use data_types::{
Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId,
SkippedCompaction, Table, TableId, Timestamp, TopicId, TopicMetadata,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction,
Table, TableId, Timestamp,
};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
@ -165,7 +168,8 @@ impl Catalog for SqliteCatalog {
.await
.map_err(|e| Error::Setup { source: e.into() })?;
// We need to manually insert the topic here so that we can create the transition shard below.
// We need to manually insert the topic here so that we can create the transition shard
// below.
sqlx::query(
r#"
INSERT INTO topic (name)
@ -195,6 +199,21 @@ DO NOTHING;
.await
.map_err(|e| Error::Setup { source: e })?;
// We need to manually insert the query pool here so that we can create namespaces that
// reference it.
sqlx::query(
r#"
INSERT INTO query_pool (name)
VALUES ($1)
ON CONFLICT (name)
DO NOTHING;
"#,
)
.bind(SHARED_QUERY_POOL)
.execute(&self.pool)
.await
.map_err(|e| Error::Setup { source: e })?;
Ok(())
}
@ -221,14 +240,6 @@ DO NOTHING;
#[async_trait]
impl RepoCollection for SqliteTxn {
fn topics(&mut self) -> &mut dyn TopicMetadataRepo {
self
}
fn query_pools(&mut self) -> &mut dyn QueryPoolRepo {
self
}
fn namespaces(&mut self) -> &mut dyn NamespaceRepo {
self
}
@ -250,90 +261,21 @@ impl RepoCollection for SqliteTxn {
}
}
#[async_trait]
impl TopicMetadataRepo for SqliteTxn {
async fn create_or_get(&mut self, name: &str) -> Result<TopicMetadata> {
let rec = sqlx::query_as::<_, TopicMetadata>(
r#"
INSERT INTO topic ( name )
VALUES ( $1 )
ON CONFLICT (name)
DO UPDATE SET name = topic.name
RETURNING *;
"#,
)
.bind(name) // $1
.fetch_one(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
async fn get_by_name(&mut self, name: &str) -> Result<Option<TopicMetadata>> {
let rec = sqlx::query_as::<_, TopicMetadata>(
r#"
SELECT *
FROM topic
WHERE name = $1;
"#,
)
.bind(name) // $1
.fetch_one(self.inner.get_mut())
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let topic = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(topic))
}
}
#[async_trait]
impl QueryPoolRepo for SqliteTxn {
async fn create_or_get(&mut self, name: &str) -> Result<QueryPool> {
let rec = sqlx::query_as::<_, QueryPool>(
r#"
INSERT INTO query_pool ( name )
VALUES ( $1 )
ON CONFLICT (name)
DO UPDATE SET name = query_pool.name
RETURNING *;
"#,
)
.bind(name) // $1
.fetch_one(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
}
#[async_trait]
impl NamespaceRepo for SqliteTxn {
async fn create(
&mut self,
name: &str,
retention_period_ns: Option<i64>,
topic_id: TopicId,
query_pool_id: QueryPoolId,
) -> Result<Namespace> {
async fn create(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables )
VALUES ( $1, $2, $3, $4, $5 )
RETURNING *;
INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables )
VALUES ( $1, $2, $3, $4, $5 )
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
"#,
)
.bind(name) // $1
.bind(topic_id) // $2
.bind(query_pool_id) // $3
.bind(retention_period_ns) // $4
.bind(DEFAULT_MAX_TABLES); // $5
.bind(name) // $1
.bind(SHARED_TOPIC_ID) // $2
.bind(SHARED_QUERY_POOL_ID) // $3
.bind(retention_period_ns) // $4
.bind(DEFAULT_MAX_TABLES); // $5
let rec = rec.fetch_one(self.inner.get_mut()).await.map_err(|e| {
if is_unique_violation(&e) {
@ -357,7 +299,11 @@ impl NamespaceRepo for SqliteTxn {
async fn list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
format!(
r#"SELECT * FROM namespace WHERE {v};"#,
r#"
SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at
FROM namespace
WHERE {v};
"#,
v = deleted.as_sql_predicate()
)
.as_str(),
@ -376,7 +322,11 @@ impl NamespaceRepo for SqliteTxn {
) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
format!(
r#"SELECT * FROM namespace WHERE id=$1 AND {v};"#,
r#"
SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at
FROM namespace
WHERE id=$1 AND {v};
"#,
v = deleted.as_sql_predicate()
)
.as_str(),
@ -401,7 +351,11 @@ impl NamespaceRepo for SqliteTxn {
) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
format!(
r#"SELECT * FROM namespace WHERE name=$1 AND {v};"#,
r#"
SELECT id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at
FROM namespace
WHERE name=$1 AND {v};
"#,
v = deleted.as_sql_predicate()
)
.as_str(),
@ -438,7 +392,7 @@ impl NamespaceRepo for SqliteTxn {
UPDATE namespace
SET max_tables = $1
WHERE name = $2
RETURNING *;
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
"#,
)
.bind(new_max)
@ -462,7 +416,7 @@ RETURNING *;
UPDATE namespace
SET max_columns_per_table = $1
WHERE name = $2
RETURNING *;
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
"#,
)
.bind(new_max)
@ -486,7 +440,12 @@ RETURNING *;
retention_period_ns: Option<i64>,
) -> Result<Namespace> {
let rec = sqlx::query_as::<_, Namespace>(
r#"UPDATE namespace SET retention_period_ns = $1 WHERE name = $2 RETURNING *;"#,
r#"
UPDATE namespace
SET retention_period_ns = $1
WHERE name = $2
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at;
"#,
)
.bind(retention_period_ns) // $1
.bind(name) // $2
@ -1436,7 +1395,7 @@ INSERT INTO parquet_file (
shard_id, table_id, partition_id, object_store_id,
min_time, max_time, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 )
RETURNING
id, table_id, partition_id, object_store_id,
min_time, max_time, to_delete, file_size_bytes,
@ -1552,10 +1511,9 @@ fn is_unique_violation(e: &sqlx::Error) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::create_or_get_default_records;
use assert_matches::assert_matches;
use metric::{Attributes, DurationHistogram, Metric};
use std::{ops::DerefMut, sync::Arc};
use std::sync::Arc;
fn assert_metric_hit(metrics: &Registry, name: &'static str) {
let histogram = metrics
@ -1596,16 +1554,12 @@ mod tests {
let sqlite = setup_db().await;
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut txn = sqlite.repositories().await;
let (kafka, query) = create_or_get_default_records(txn.deref_mut())
.await
.expect("db init failed");
let namespace_id = sqlite
.repositories()
.await
.namespaces()
.create("ns4", None, kafka.id, query.id)
.create("ns4", None)
.await
.expect("namespace create failed")
.id;
@ -1653,16 +1607,12 @@ mod tests {
let metrics = Arc::clone(&sqlite.metrics);
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut txn = sqlite.repositories().await;
let (kafka, query) = create_or_get_default_records(txn.deref_mut())
.await
.expect("db init failed");
let namespace_id = sqlite
.repositories()
.await
.namespaces()
.create("ns4", None, kafka.id, query.id)
.create("ns4", None)
.await
.expect("namespace create failed")
.id;
@ -1819,16 +1769,12 @@ mod tests {
let pool = sqlite.pool.clone();
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut txn = sqlite.repositories().await;
let (kafka, query) = create_or_get_default_records(txn.deref_mut())
.await
.expect("db init failed");
let namespace_id = sqlite
.repositories()
.await
.namespaces()
.create("ns4", None, kafka.id, query.id)
.create("ns4", None)
.await
.expect("namespace create failed")
.id;

View File

@ -6,8 +6,7 @@ use arrow::{
};
use data_types::{
Column, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceSchema, ParquetFile,
ParquetFileParams, Partition, PartitionId, QueryPool, Table, TableId, TableSchema, Timestamp,
TopicMetadata,
ParquetFileParams, Partition, PartitionId, Table, TableId, TableSchema, Timestamp,
};
use datafusion::physical_plan::metrics::Count;
use datafusion_util::MemoryStream;
@ -144,19 +143,14 @@ impl TestCatalog {
retention_period_ns: Option<i64>,
) -> Arc<TestNamespace> {
let mut repos = self.catalog.repositories().await;
let topic = repos.topics().create_or_get("topic").await.unwrap();
let query_pool = repos.query_pools().create_or_get("pool").await.unwrap();
let namespace = repos
.namespaces()
.create(name, retention_period_ns, topic.id, query_pool.id)
.create(name, retention_period_ns)
.await
.unwrap();
Arc::new(TestNamespace {
catalog: Arc::clone(self),
topic,
query_pool,
namespace,
})
}
@ -216,8 +210,6 @@ impl TestCatalog {
#[allow(missing_docs)]
pub struct TestNamespace {
pub catalog: Arc<TestCatalog>,
pub topic: TopicMetadata,
pub query_pool: QueryPool,
pub namespace: Namespace,
}

View File

@ -64,9 +64,6 @@ pub enum Error {
#[error("Catalog DSN error: {0}")]
CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
#[error("No topic named '{topic_name}' found in the catalog")]
TopicCatalogLookup { topic_name: String },
#[error("authz configuration error for '{addr}': '{source}'")]
AuthzConfig {
source: Box<dyn std::error::Error>,
@ -273,46 +270,10 @@ pub async fn create_router2_server_type(
// Initialise the Namespace ID lookup + cache
let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache));
////////////////////////////////////////////////////////////////////////////
//
// THIS CODE IS FOR TESTING ONLY.
//
// The source of truth for the topics & query pools will be read from
// the DB, rather than CLI args for a prod deployment.
//
////////////////////////////////////////////////////////////////////////////
//
// Look up the topic ID needed to populate namespace creation
// requests.
//
// This code / auto-creation is for architecture testing purposes only - a
// prod deployment would expect namespaces to be explicitly created and this
// layer would be removed.
let mut txn = catalog.repositories().await;
let topic_id = txn
.topics()
.get_by_name(&router_config.topic)
.await?
.map(|v| v.id)
.unwrap_or_else(|| panic!("no topic named {} in catalog", router_config.topic));
let query_id = txn
.query_pools()
.create_or_get(&router_config.query_pool_name)
.await
.map(|v| v.id)
.unwrap_or_else(|e| {
panic!(
"failed to upsert query pool {} in catalog: {}",
router_config.query_pool_name, e
)
});
let namespace_resolver = NamespaceAutocreation::new(
namespace_resolver,
Arc::clone(&ns_cache),
Arc::clone(&catalog),
topic_id,
query_id,
{
if router_config.namespace_autocreation_enabled {
MissingNamespaceAction::AutoCreate(
@ -394,7 +355,7 @@ pub async fn create_router2_server_type(
// Initialize the gRPC API delegate that creates the services relevant to the RPC
// write router path and use it to create the relevant `RpcWriteRouterServer` and
// `RpcWriteRouterServerType`.
let grpc = RpcWriteGrpcDelegate::new(catalog, object_store, topic_id, query_id);
let grpc = RpcWriteGrpcDelegate::new(catalog, object_store);
let router_server =
RpcWriteRouterServer::new(http, grpc, metrics, common_state.trace_collector());
@ -434,13 +395,7 @@ mod tests {
let catalog = Arc::new(MemCatalog::new(Default::default()));
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create("test_ns", None, topic.id, pool.id)
.await
.unwrap();
let namespace = repos.namespaces().create("test_ns", None).await.unwrap();
let table = repos
.tables()

View File

@ -94,8 +94,6 @@ fn partition_keys<'a>(
|col| Template::Column(col, name),
),
TemplatePart::TimeFormat(fmt) => Template::TimeFormat(time, StrftimeItems::new(fmt)),
TemplatePart::RegexCapture(_) => unimplemented!(),
TemplatePart::StrftimeColumn(_) => unimplemented!(),
})
.collect();

View File

@ -112,10 +112,8 @@ use datafusion::{
/// Internal implementations of the selector functions
mod internal;
use internal::{
BooleanFirstSelector, BooleanLastSelector, BooleanMaxSelector, BooleanMinSelector,
F64FirstSelector, F64LastSelector, F64MaxSelector, F64MinSelector, I64FirstSelector,
I64LastSelector, I64MaxSelector, I64MinSelector, U64FirstSelector, U64LastSelector,
U64MaxSelector, U64MinSelector, Utf8FirstSelector, Utf8LastSelector, Utf8MaxSelector,
BooleanMaxSelector, BooleanMinSelector, F64MaxSelector, F64MinSelector, FirstSelector,
I64MaxSelector, I64MinSelector, LastSelector, U64MaxSelector, U64MinSelector, Utf8MaxSelector,
Utf8MinSelector,
};
use schema::TIME_DATA_TYPE;
@ -255,40 +253,29 @@ impl FactoryBuilder {
let accumulator: Box<dyn Accumulator> = match (selector_type, value_type) {
// First
(SelectorType::First, DataType::Float64) => {
Box::new(SelectorAccumulator::<F64FirstSelector>::new())
(SelectorType::First, value_type) => {
Box::new(SelectorAccumulator::new(FirstSelector::new(value_type)?))
}
(SelectorType::First, DataType::Int64) => Box::new(SelectorAccumulator::<I64FirstSelector>::new()),
(SelectorType::First, DataType::UInt64) => Box::new(SelectorAccumulator::<U64FirstSelector>::new()),
(SelectorType::First, DataType::Utf8) => Box::new(SelectorAccumulator::<Utf8FirstSelector>::new()),
(SelectorType::First, DataType::Boolean) => Box::new(SelectorAccumulator::<BooleanFirstSelector>::new(
)),
// Last
(SelectorType::Last, DataType::Float64) => Box::new(SelectorAccumulator::<F64LastSelector>::new()),
(SelectorType::Last, DataType::Int64) => Box::new(SelectorAccumulator::<I64LastSelector>::new()),
(SelectorType::Last, DataType::UInt64) => Box::new(SelectorAccumulator::<U64LastSelector>::new()),
(SelectorType::Last, DataType::Utf8) => Box::new(SelectorAccumulator::<Utf8LastSelector>::new()),
(SelectorType::Last, DataType::Boolean) => {
Box::new(SelectorAccumulator::<BooleanLastSelector>::new())
},
(SelectorType::Last, data_type) => Box::new(SelectorAccumulator::new(LastSelector::new(data_type)?)),
// Min
(SelectorType::Min, DataType::Float64) => Box::new(SelectorAccumulator::<F64MinSelector>::new()),
(SelectorType::Min, DataType::Int64) => Box::new(SelectorAccumulator::<I64MinSelector>::new()),
(SelectorType::Min, DataType::UInt64) => Box::new(SelectorAccumulator::<U64MinSelector>::new()),
(SelectorType::Min, DataType::Utf8) => Box::new(SelectorAccumulator::<Utf8MinSelector>::new()),
(SelectorType::Min, DataType::Float64) => Box::new(SelectorAccumulator::new(F64MinSelector::default())),
(SelectorType::Min, DataType::Int64) => Box::new(SelectorAccumulator::new(I64MinSelector::default())),
(SelectorType::Min, DataType::UInt64) => Box::new(SelectorAccumulator::new(U64MinSelector::default())),
(SelectorType::Min, DataType::Utf8) => Box::new(SelectorAccumulator::new(Utf8MinSelector::default())),
(SelectorType::Min, DataType::Boolean) => {
Box::new(SelectorAccumulator::<BooleanMinSelector>::new())
Box::new(SelectorAccumulator::new(BooleanMinSelector::default()))
},
// Max
(SelectorType::Max, DataType::Float64) => Box::new(SelectorAccumulator::<F64MaxSelector>::new()),
(SelectorType::Max, DataType::Int64) => Box::new(SelectorAccumulator::<I64MaxSelector>::new()),
(SelectorType::Max, DataType::UInt64) => Box::new(SelectorAccumulator::<U64MaxSelector>::new()),
(SelectorType::Max, DataType::Utf8) => Box::new(SelectorAccumulator::<Utf8MaxSelector>::new()),
(SelectorType::Max, DataType::Float64) => Box::new(SelectorAccumulator::new(F64MaxSelector::default())),
(SelectorType::Max, DataType::Int64) => Box::new(SelectorAccumulator::new(I64MaxSelector::default())),
(SelectorType::Max, DataType::UInt64) => Box::new(SelectorAccumulator::new(U64MaxSelector::default())),
(SelectorType::Max, DataType::Utf8) => Box::new(SelectorAccumulator::new(Utf8MaxSelector::default())),
(SelectorType::Max, DataType::Boolean) => {
Box::new(SelectorAccumulator::<BooleanMaxSelector>::new())
Box::new(SelectorAccumulator::new(BooleanMaxSelector::default()))
},
// Catch
(selector_type, value_type) => return Err(DataFusionError::Internal(format!(
@ -303,7 +290,7 @@ impl FactoryBuilder {
/// Implements the logic of the specific selector function (this is a
/// cutdown version of the Accumulator DataFusion trait, to allow
/// sharing between implementations)
trait Selector: Debug + Default + Send + Sync {
trait Selector: Debug + Send + Sync {
/// return state in a form that DataFusion can store during execution
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>>;
@ -412,10 +399,8 @@ impl<SELECTOR> SelectorAccumulator<SELECTOR>
where
SELECTOR: Selector,
{
pub fn new() -> Self {
Self {
selector: SELECTOR::default(),
}
pub fn new(selector: SELECTOR) -> Self {
Self { selector }
}
}
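
The constructor change above is the crux of this refactor: `SelectorAccumulator` now receives an already-built selector value rather than constructing one through a `Default`-bound type parameter, which is what lets a single `FirstSelector`/`LastSelector` type carry a runtime-chosen value type. A self-contained sketch of that pattern with stand-in names (`ExampleSelector` and `Accumulator` are illustrative, not types from this crate):

#[derive(Debug)]
struct ExampleSelector {
    // State chosen at runtime, e.g. which value type this selector tracks.
    description: &'static str,
}

#[derive(Debug)]
struct Accumulator<S> {
    selector: S,
}

impl<S> Accumulator<S> {
    // The selector arrives ready-made, so `S: Default` is no longer required.
    fn new(selector: S) -> Self {
        Self { selector }
    }
}

fn main() {
    let first_f64 = Accumulator::new(ExampleSelector { description: "first, Float64" });
    println!("{first_f64:?}");
}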

View File

@ -17,7 +17,7 @@ use arrow::{
max as array_max, max_boolean as array_max_boolean, max_string as array_max_string,
min as array_min, min_boolean as array_min_boolean, min_string as array_min_string,
},
datatypes::{Field, Fields},
datatypes::{DataType, Field, Fields},
};
use datafusion::{error::Result as DataFusionResult, scalar::ScalarValue};
@ -116,219 +116,169 @@ fn make_scalar_struct(data_fields: Vec<ScalarValue>) -> ScalarValue {
ScalarValue::Struct(Some(data_fields), Fields::from(fields))
}
macro_rules! make_first_selector {
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARRTYPE:ident, $MINFUNC:ident, $TO_SCALARVALUE: expr) => {
#[derive(Debug)]
pub struct $STRUCTNAME {
value: Option<$RUSTTYPE>,
time: Option<i64>,
}
impl Default for $STRUCTNAME {
fn default() -> Self {
Self {
value: None,
time: None,
}
}
}
impl Selector for $STRUCTNAME {
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
Ok(vec![
$TO_SCALARVALUE(self.value.clone()),
ScalarValue::TimestampNanosecond(self.time, None),
])
}
fn evaluate(&self) -> DataFusionResult<ScalarValue> {
Ok(make_scalar_struct(vec![
$TO_SCALARVALUE(self.value.clone()),
ScalarValue::TimestampNanosecond(self.time, None),
]))
}
fn update_batch(
&mut self,
value_arr: &ArrayRef,
time_arr: &ArrayRef,
) -> DataFusionResult<()> {
let value_arr = value_arr
.as_any()
.downcast_ref::<$ARRTYPE>()
// the input type arguments should be ensured by datafusion
.expect("First argument was value");
let time_arr = time_arr
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
// the input type arguments should be ensured by datafusion
.expect("Second argument was time");
// Only look for times where the array also has a non
// null value (the time array should have no nulls itself)
//
// For example, for the following input, the correct
// current min time is 200 (not 100)
//
// value | time
// --------------
// NULL | 100
// A | 200
// B | 300
//
// Note this could likely be faster if we used `ArrayData` APIs
let time_arr: TimestampNanosecondArray = time_arr
.iter()
.zip(value_arr.iter())
.map(|(ts, value)| if value.is_some() { ts } else { None })
.collect();
let cur_min_time = $MINFUNC(&time_arr);
let need_update = match (&self.time, &cur_min_time) {
(Some(time), Some(cur_min_time)) => cur_min_time < time,
// No existing minimum, so update needed
(None, Some(_)) => true,
// No actual minimum time found, so no update needed
(_, None) => false,
};
if need_update {
let index = time_arr
.iter()
// arrow doesn't tell us what index had the
// minimum, so need to find it ourselves see also
// https://github.com/apache/arrow-datafusion/issues/600
.enumerate()
.find(|(_, time)| cur_min_time == *time)
.map(|(idx, _)| idx)
.unwrap(); // value always exists
self.time = cur_min_time;
self.value = if value_arr.is_null(index) {
None
} else {
Some(value_arr.value(index).to_owned())
};
}
Ok(())
}
fn size(&self) -> usize {
// no nested types
std::mem::size_of_val(self)
}
}
};
#[derive(Debug)]
pub struct FirstSelector {
value: ScalarValue,
time: Option<i64>,
}
macro_rules! make_last_selector {
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARRTYPE:ident, $MAXFUNC:ident, $TO_SCALARVALUE: expr) => {
#[derive(Debug)]
pub struct $STRUCTNAME {
value: Option<$RUSTTYPE>,
time: Option<i64>,
impl FirstSelector {
pub fn new(data_type: &DataType) -> DataFusionResult<Self> {
Ok(Self {
value: ScalarValue::try_from(data_type)?,
time: None,
})
}
}
impl Selector for FirstSelector {
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
Ok(vec![
self.value.clone(),
ScalarValue::TimestampNanosecond(self.time, None),
])
}
fn evaluate(&self) -> DataFusionResult<ScalarValue> {
Ok(make_scalar_struct(vec![
self.value.clone(),
ScalarValue::TimestampNanosecond(self.time, None),
]))
}
fn update_batch(&mut self, value_arr: &ArrayRef, time_arr: &ArrayRef) -> DataFusionResult<()> {
// Only look for times where the array also has a non
// null value (the time array should have no nulls itself)
//
// For example, for the following input, the correct
// current min time is 200 (not 100)
//
// value | time
// --------------
// NULL | 100
// A | 200
// B | 300
//
let time_arr = arrow::compute::nullif(time_arr, &arrow::compute::is_null(&value_arr)?)?;
let time_arr = time_arr
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
// the input type arguments should be ensured by datafusion
.expect("Second argument was time");
let cur_min_time = array_min(time_arr);
let need_update = match (&self.time, &cur_min_time) {
(Some(time), Some(cur_min_time)) => cur_min_time < time,
// No existing minimum, so update needed
(None, Some(_)) => true,
// No actual minimum time found, so no update needed
(_, None) => false,
};
if need_update {
let index = time_arr
.iter()
// arrow doesn't tell us what index had the
// minimum, so need to find it ourselves see also
// https://github.com/apache/arrow-datafusion/issues/600
.enumerate()
.find(|(_, time)| cur_min_time == *time)
.map(|(idx, _)| idx)
.unwrap(); // value always exists
self.time = cur_min_time;
self.value = ScalarValue::try_from_array(&value_arr, index)?;
}
impl Default for $STRUCTNAME {
fn default() -> Self {
Self {
value: None,
time: None,
}
}
Ok(())
}
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.value) + self.value.size()
}
}
#[derive(Debug)]
pub struct LastSelector {
value: ScalarValue,
time: Option<i64>,
}
impl LastSelector {
pub fn new(data_type: &DataType) -> DataFusionResult<Self> {
Ok(Self {
value: ScalarValue::try_from(data_type)?,
time: None,
})
}
}
impl Selector for LastSelector {
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
Ok(vec![
self.value.clone(),
ScalarValue::TimestampNanosecond(self.time, None),
])
}
fn evaluate(&self) -> DataFusionResult<ScalarValue> {
Ok(make_scalar_struct(vec![
self.value.clone(),
ScalarValue::TimestampNanosecond(self.time, None),
]))
}
fn update_batch(&mut self, value_arr: &ArrayRef, time_arr: &ArrayRef) -> DataFusionResult<()> {
// Only look for times where the array also has a non
// null value (the time array should have no nulls itself)
//
// For example, for the following input, the correct
// current max time is 200 (not 300)
//
// value | time
// --------------
// A | 100
// B | 200
// NULL | 300
//
let time_arr = arrow::compute::nullif(time_arr, &arrow::compute::is_null(&value_arr)?)?;
let time_arr = time_arr
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
// the input type arguments should be ensured by datafusion
.expect("Second argument was time");
let cur_max_time = array_max(time_arr);
let need_update = match (&self.time, &cur_max_time) {
(Some(time), Some(cur_max_time)) => time < cur_max_time,
// No existing maximum, so update needed
(None, Some(_)) => true,
// No actual maximum value found, so no update needed
(_, None) => false,
};
if need_update {
let index = time_arr
.iter()
// arrow doesn't tell us what index had the
// maximum, so need to find it ourselves
.enumerate()
.find(|(_, time)| cur_max_time == *time)
.map(|(idx, _)| idx)
.unwrap(); // value always exists
self.time = cur_max_time;
self.value = ScalarValue::try_from_array(&value_arr, index)?;
}
impl Selector for $STRUCTNAME {
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
Ok(vec![
$TO_SCALARVALUE(self.value.clone()),
ScalarValue::TimestampNanosecond(self.time, None),
])
}
Ok(())
}
fn evaluate(&self) -> DataFusionResult<ScalarValue> {
Ok(make_scalar_struct(vec![
$TO_SCALARVALUE(self.value.clone()),
ScalarValue::TimestampNanosecond(self.time, None),
]))
}
fn update_batch(
&mut self,
value_arr: &ArrayRef,
time_arr: &ArrayRef,
) -> DataFusionResult<()> {
let value_arr = value_arr
.as_any()
.downcast_ref::<$ARRTYPE>()
// the input type arguments should be ensured by datafusion
.expect("First argument was value");
let time_arr = time_arr
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
// the input type arguments should be ensured by datafusion
.expect("Second argument was time");
// Only look for times where the array also has a non
// null value (the time array should have no nulls itself)
//
// For example, for the following input, the correct
// current max time is 200 (not 300)
//
// value | time
// --------------
// A | 100
// B | 200
// NULL | 300
//
// Note this could likely be faster if we used `ArrayData` APIs
let time_arr: TimestampNanosecondArray = time_arr
.iter()
.zip(value_arr.iter())
.map(|(ts, value)| if value.is_some() { ts } else { None })
.collect();
let cur_max_time = $MAXFUNC(&time_arr);
let need_update = match (&self.time, &cur_max_time) {
(Some(time), Some(cur_max_time)) => time < cur_max_time,
// No existing maximum, so update needed
(None, Some(_)) => true,
// No actual maximum value found, so no update needed
(_, None) => false,
};
if need_update {
let index = time_arr
.iter()
// arrow doesn't tell us what index had the
// maximum, so need to find it ourselves
.enumerate()
.find(|(_, time)| cur_max_time == *time)
.map(|(idx, _)| idx)
.unwrap(); // value always exists
self.time = cur_max_time;
self.value = if value_arr.is_null(index) {
None
} else {
Some(value_arr.value(index).to_owned())
};
}
Ok(())
}
fn size(&self) -> usize {
// no nested types
std::mem::size_of_val(self)
}
}
};
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.value) + self.value.size()
}
}
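
Both new selectors share the same masking step: timestamps whose paired value is null are nulled out with `arrow::compute::nullif`, and the minimum (first) or maximum (last) of what remains picks the row. A small stand-alone sketch of that step, assuming the arrow compute kernels named in this hunk (`nullif`, `is_null`, `min`) are available with these signatures in the crate version in use; the array contents are made up for illustration:

use arrow::array::{Array, Int64Array, TimestampNanosecondArray};
use arrow::compute::{is_null, min, nullif};
use arrow::error::ArrowError;

fn earliest_non_null_time(
    values: &Int64Array,
    times: &TimestampNanosecondArray,
) -> Result<Option<i64>, ArrowError> {
    // Null out times whose value is NULL, mirroring the worked example in the
    // comments: for values [NULL, A, B] at times [100, 200, 300], the earliest
    // valid time is 200, not 100.
    let masked = nullif(times, &is_null(values)?)?;
    let masked = masked
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("nullif preserves the input array type");
    Ok(min(masked))
}

fn main() -> Result<(), ArrowError> {
    let values = Int64Array::from(vec![None, Some(10), Some(20)]);
    let times = TimestampNanosecondArray::from(vec![100, 200, 300]);
    assert_eq!(earliest_non_null_time(&values, &times)?, Some(200));
    Ok(())
}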
/// Did we find a new min/max
@ -583,82 +533,6 @@ macro_rules! make_max_selector {
};
}
// FIRST
make_first_selector!(
F64FirstSelector,
f64,
Float64Array,
array_min,
ScalarValue::Float64
);
make_first_selector!(
I64FirstSelector,
i64,
Int64Array,
array_min,
ScalarValue::Int64
);
make_first_selector!(
U64FirstSelector,
u64,
UInt64Array,
array_min,
ScalarValue::UInt64
);
make_first_selector!(
Utf8FirstSelector,
String,
StringArray,
array_min,
ScalarValue::Utf8
);
make_first_selector!(
BooleanFirstSelector,
bool,
BooleanArray,
array_min,
ScalarValue::Boolean
);
// LAST
make_last_selector!(
F64LastSelector,
f64,
Float64Array,
array_max,
ScalarValue::Float64
);
make_last_selector!(
I64LastSelector,
i64,
Int64Array,
array_max,
ScalarValue::Int64
);
make_last_selector!(
U64LastSelector,
u64,
UInt64Array,
array_max,
ScalarValue::UInt64
);
make_last_selector!(
Utf8LastSelector,
String,
StringArray,
array_max,
ScalarValue::Utf8
);
make_last_selector!(
BooleanLastSelector,
bool,
BooleanArray,
array_max,
ScalarValue::Boolean
);
// MIN
make_min_selector!(

View File

@ -139,8 +139,7 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{
Column, ColumnId, ColumnSchema, ColumnType, NamespaceId, QueryPoolId, TableId, TableSchema,
TopicId,
Column, ColumnId, ColumnSchema, ColumnType, NamespaceId, TableId, TableSchema,
};
use proptest::{prelude::*, prop_compose, proptest};
@ -162,8 +161,6 @@ mod tests {
let schema1 = NamespaceSchema {
id: TEST_NAMESPACE_ID,
topic_id: TopicId::new(24),
query_pool_id: QueryPoolId::new(1234),
tables: Default::default(),
max_columns_per_table: 50,
max_tables: 24,
@ -180,8 +177,6 @@ mod tests {
let schema2 = NamespaceSchema {
id: TEST_NAMESPACE_ID,
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(2),
tables: Default::default(),
max_columns_per_table: 10,
max_tables: 42,
@ -228,8 +223,6 @@ mod tests {
let schema_update_1 = NamespaceSchema {
id: NamespaceId::new(42),
topic_id: TopicId::new(76),
query_pool_id: QueryPoolId::new(64),
tables: BTreeMap::from([(String::from(table_name), first_write_table_schema)]),
max_columns_per_table: 50,
max_tables: 24,
@ -311,8 +304,6 @@ mod tests {
let schema_update_1 = NamespaceSchema {
id: NamespaceId::new(42),
topic_id: TopicId::new(76),
query_pool_id: QueryPoolId::new(64),
tables: BTreeMap::from([
(String::from("table_1"), table_1.to_owned()),
(String::from("table_2"), table_2.to_owned()),
@ -410,8 +401,6 @@ mod tests {
NamespaceSchema {
id: TEST_NAMESPACE_ID,
tables,
topic_id: TopicId::new(1), // Ignored
query_pool_id: QueryPoolId::new(1), // Ignored
max_columns_per_table,
max_tables,
retention_period_ns,

View File

@ -128,9 +128,7 @@ mod tests {
use std::collections::BTreeMap;
use assert_matches::assert_matches;
use data_types::{
ColumnId, ColumnSchema, ColumnType, NamespaceId, QueryPoolId, TableId, TableSchema, TopicId,
};
use data_types::{ColumnId, ColumnSchema, ColumnType, NamespaceId, TableId, TableSchema};
use metric::{Attributes, MetricObserver, Observation};
use super::*;
@ -168,8 +166,6 @@ mod tests {
NamespaceSchema {
id: NamespaceId::new(42),
topic_id: TopicId::new(24),
query_pool_id: QueryPoolId::new(1234),
tables,
max_columns_per_table: 100,
max_tables: 42,

View File

@ -98,7 +98,7 @@ where
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use data_types::{NamespaceId, QueryPoolId, TopicId};
use data_types::NamespaceId;
use iox_catalog::mem::MemCatalog;
use super::*;
@ -120,8 +120,6 @@ mod tests {
// Place a schema in the cache for that name
let schema1 = NamespaceSchema::new(
NamespaceId::new(1),
TopicId::new(2),
QueryPoolId::new(3),
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
iox_catalog::DEFAULT_MAX_TABLES,
iox_catalog::DEFAULT_RETENTION_PERIOD,
@ -156,8 +154,6 @@ mod tests {
// Place a schema in the catalog for that name
let schema1 = NamespaceSchema::new(
NamespaceId::new(1),
TopicId::new(2),
QueryPoolId::new(3),
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
iox_catalog::DEFAULT_MAX_TABLES,
iox_catalog::DEFAULT_RETENTION_PERIOD,
@ -167,12 +163,7 @@ mod tests {
.repositories()
.await
.namespaces()
.create(
&ns,
iox_catalog::DEFAULT_RETENTION_PERIOD,
schema1.topic_id,
schema1.query_pool_id,
)
.create(&ns, iox_catalog::DEFAULT_RETENTION_PERIOD)
.await,
Ok(_)
);

View File

@ -51,7 +51,7 @@ mod tests {
use std::{collections::HashMap, iter};
use assert_matches::assert_matches;
use data_types::{NamespaceId, QueryPoolId, TopicId};
use data_types::NamespaceId;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use super::*;
@ -70,8 +70,6 @@ mod tests {
fn schema_with_id(id: i64) -> NamespaceSchema {
NamespaceSchema {
id: NamespaceId::new(id),
topic_id: TopicId::new(1),
query_pool_id: QueryPoolId::new(1),
tables: Default::default(),
max_columns_per_table: 7,
max_tables: 42,

View File

@ -71,7 +71,7 @@ mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use data_types::{NamespaceId, NamespaceSchema, QueryPoolId, TopicId};
use data_types::{NamespaceId, NamespaceSchema};
use iox_catalog::{
interface::{Catalog, SoftDeletedRows},
mem::MemCatalog,
@ -96,8 +96,6 @@ mod tests {
ns.clone(),
NamespaceSchema {
id: NamespaceId::new(42),
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(3),
tables: Default::default(),
max_columns_per_table: 4,
max_tables: 42,
@ -143,11 +141,9 @@ mod tests {
// Create the namespace in the catalog
{
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("bananas").await.unwrap();
let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap();
repos
.namespaces()
.create(&ns, None, topic.id, query_pool.id)
.create(&ns, None)
.await
.expect("failed to setup catalog state");
}
@ -177,11 +173,9 @@ mod tests {
// Create the namespace in the catalog and mark it as deleted
{
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("bananas").await.unwrap();
let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap();
repos
.namespaces()
.create(&ns, None, topic.id, query_pool.id)
.create(&ns, None)
.await
.expect("failed to setup catalog state");
repos

View File

@ -1,7 +1,7 @@
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName, QueryPoolId, TopicId};
use data_types::{NamespaceId, NamespaceName};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::*;
use thiserror::Error;
@ -43,8 +43,6 @@ pub struct NamespaceAutocreation<C, T> {
cache: C,
catalog: Arc<dyn Catalog>,
topic_id: TopicId,
query_id: QueryPoolId,
action: MissingNamespaceAction,
}
@ -53,7 +51,7 @@ impl<C, T> NamespaceAutocreation<C, T> {
/// namespace exists in `catalog`.
///
/// If the namespace does not exist, it is created with the specified
/// `topic_id`, `query_id` and `retention` policy.
/// `retention` policy.
///
/// Namespaces are looked up in `cache`, skipping the creation request to
/// the catalog if there's a hit.
@ -61,16 +59,12 @@ impl<C, T> NamespaceAutocreation<C, T> {
inner: T,
cache: C,
catalog: Arc<dyn Catalog>,
topic_id: TopicId,
query_id: QueryPoolId,
action: MissingNamespaceAction,
) -> Self {
Self {
inner,
cache,
catalog,
topic_id,
query_id,
action,
}
}
@ -113,12 +107,7 @@ where
.repositories()
.await
.namespaces()
.create(
namespace.as_str(),
retention_period_ns,
self.topic_id,
self.query_id,
)
.create(namespace.as_str(), retention_period_ns)
.await
{
Ok(_) => {
@ -178,8 +167,6 @@ mod tests {
ns.clone(),
NamespaceSchema {
id: NAMESPACE_ID,
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(3),
tables: Default::default(),
max_columns_per_table: 4,
max_tables: 42,
@ -191,8 +178,6 @@ mod tests {
MockNamespaceResolver::default().with_mapping(ns.clone(), NAMESPACE_ID),
cache,
Arc::clone(&catalog),
TopicId::new(42),
QueryPoolId::new(42),
MissingNamespaceAction::AutoCreate(TEST_RETENTION_PERIOD_NS),
);
@ -233,8 +218,6 @@ mod tests {
MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
cache,
Arc::clone(&catalog),
TopicId::new(42),
QueryPoolId::new(42),
MissingNamespaceAction::AutoCreate(TEST_RETENTION_PERIOD_NS),
);
@ -259,8 +242,6 @@ mod tests {
Namespace {
id: NamespaceId::new(1),
name: ns.to_string(),
topic_id: TopicId::new(42),
query_pool_id: QueryPoolId::new(42),
max_tables: iox_catalog::DEFAULT_MAX_TABLES,
max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
retention_period_ns: TEST_RETENTION_PERIOD_NS,
@ -284,8 +265,6 @@ mod tests {
MockNamespaceResolver::default(),
cache,
Arc::clone(&catalog),
TopicId::new(42),
QueryPoolId::new(42),
MissingNamespaceAction::Reject,
);
@ -324,8 +303,6 @@ mod tests {
NamespaceSchemaResolver::new(Arc::clone(&cache)),
Arc::clone(&cache),
Arc::clone(&catalog),
TopicId::new(42),
QueryPoolId::new(42),
MissingNamespaceAction::AutoCreate(TEST_RETENTION_PERIOD_NS),
);
@ -339,8 +316,6 @@ mod tests {
NamespaceSchemaResolver::new(Arc::clone(&cache)),
cache,
Arc::clone(&catalog),
TopicId::new(42),
QueryPoolId::new(42),
MissingNamespaceAction::Reject,
);

View File

@ -1,6 +1,5 @@
//! gRPC service implementations for `router`.
use data_types::{QueryPoolId, TopicId};
use generated_types::influxdata::iox::{catalog::v1::*, namespace::v1::*, object_store::v1::*};
use iox_catalog::interface::Catalog;
use object_store::DynObjectStore;
@ -15,25 +14,14 @@ use std::sync::Arc;
pub struct RpcWriteGrpcDelegate {
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
// Temporary values during kafka -> kafkaless transition.
topic_id: TopicId,
query_id: QueryPoolId,
}
impl RpcWriteGrpcDelegate {
/// Create a new gRPC handler
pub fn new(
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
topic_id: TopicId,
query_id: QueryPoolId,
) -> Self {
pub fn new(catalog: Arc<dyn Catalog>, object_store: Arc<DynObjectStore>) -> Self {
Self {
catalog,
object_store,
topic_id,
query_id,
}
}
@ -62,10 +50,6 @@ impl RpcWriteGrpcDelegate {
///
/// [`NamespaceService`]: generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService.
pub fn namespace_service(&self) -> impl namespace_service_server::NamespaceService {
NamespaceService::new(
Arc::clone(&self.catalog),
Some(self.topic_id),
Some(self.query_id),
)
NamespaceService::new(Arc::clone(&self.catalog))
}
}

View File

@ -1,6 +1,6 @@
use std::{iter, string::String, sync::Arc, time::Duration};
use data_types::{PartitionTemplate, QueryPoolId, TableId, TemplatePart, TopicId};
use data_types::{PartitionTemplate, TableId, TemplatePart};
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use hashbrown::HashMap;
use hyper::{Body, Request, Response};
@ -24,14 +24,6 @@ use router::{
},
};
/// The topic catalog ID assigned by the namespace auto-creator in the
/// handler stack for namespaces it has not yet observed.
pub const TEST_TOPIC_ID: i64 = 1;
/// The query pool catalog ID assigned by the namespace auto-creator in the
/// handler stack for namespaces it has not yet observed.
pub const TEST_QUERY_POOL_ID: i64 = 1;
/// Common retention period value we'll use in tests
pub const TEST_RETENTION_PERIOD: Duration = Duration::from_secs(3600);
@ -168,8 +160,6 @@ impl TestContext {
namespace_resolver,
Arc::clone(&ns_cache),
Arc::clone(&catalog),
TopicId::new(TEST_TOPIC_ID),
QueryPoolId::new(TEST_QUERY_POOL_ID),
namespace_autocreation,
);
@ -193,12 +183,8 @@ impl TestContext {
write_request_unifier,
);
let grpc_delegate = RpcWriteGrpcDelegate::new(
Arc::clone(&catalog),
Arc::new(InMemory::default()),
TopicId::new(TEST_TOPIC_ID),
QueryPoolId::new(TEST_QUERY_POOL_ID),
);
let grpc_delegate =
RpcWriteGrpcDelegate::new(Arc::clone(&catalog), Arc::new(InMemory::default()));
Self {
client,

View File

@ -1,6 +1,6 @@
use crate::common::{TestContextBuilder, TEST_QUERY_POOL_ID, TEST_RETENTION_PERIOD, TEST_TOPIC_ID};
use crate::common::{TestContextBuilder, TEST_RETENTION_PERIOD};
use assert_matches::assert_matches;
use data_types::{ColumnType, QueryPoolId, TopicId};
use data_types::ColumnType;
use futures::{stream::FuturesUnordered, StreamExt};
use generated_types::influxdata::{iox::ingester::v1::WriteRequest, pbdata::v1::DatabaseBatch};
use hashbrown::HashMap;
@ -62,8 +62,6 @@ async fn test_write_ok() {
.expect("query should succeed")
.expect("namespace not found");
assert_eq!(ns.name, "bananas_test");
assert_eq!(ns.topic_id, TopicId::new(TEST_TOPIC_ID));
assert_eq!(ns.query_pool_id, QueryPoolId::new(TEST_QUERY_POOL_ID));
assert_eq!(ns.retention_period_ns, None);
// Ensure the metric instrumentation was hit
@ -272,12 +270,7 @@ async fn test_write_propagate_ids() {
.repositories()
.await
.namespaces()
.create(
"bananas_test",
None,
TopicId::new(TEST_TOPIC_ID),
QueryPoolId::new(TEST_QUERY_POOL_ID),
)
.create("bananas_test", None)
.await
.expect("failed to update table limit");
@ -359,12 +352,7 @@ async fn test_delete_unsupported() {
.repositories()
.await
.namespaces()
.create(
"bananas_test",
None,
TopicId::new(TEST_TOPIC_ID),
QueryPoolId::new(TEST_QUERY_POOL_ID),
)
.create("bananas_test", None)
.await
.expect("failed to update table limit");

View File

@ -31,7 +31,6 @@ export INFLUXDB_IOX_CATALOG_DSN="postgresql://postgres@localhost:5432/postgres"
export DATABASE_URL="${INFLUXDB_IOX_CATALOG_DSN}"
cargo sqlx database create
cargo run -q -- catalog setup
cargo run -q -- catalog topic update iox-shared
echo "Enjoy your database! Point IOx to it by running the following:"
echo "\$ export INFLUXDB_IOX_CATALOG_DSN=\"${DATABASE_URL}\""

View File

@ -212,15 +212,9 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(metrics));
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("iox-shared").await.unwrap();
let pool = repos
.query_pools()
.create_or_get("iox-shared")
.await
.unwrap();
let namespace = repos
.namespaces()
.create("catalog_partition_test", None, topic.id, pool.id)
.create("catalog_partition_test", None)
.await
.unwrap();
let table = repos
@ -281,15 +275,9 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(metrics));
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("iox-shared").await.unwrap();
let pool = repos
.query_pools()
.create_or_get("iox-shared")
.await
.unwrap();
let namespace = repos
.namespaces()
.create("catalog_partition_test", None, topic.id, pool.id)
.create("catalog_partition_test", None)
.await
.unwrap();
let table = repos

View File

@ -1,7 +1,7 @@
//! Implementation of the namespace gRPC service
use std::sync::Arc;
use data_types::{Namespace as CatalogNamespace, NamespaceName, QueryPoolId, TopicId};
use data_types::{Namespace as CatalogNamespace, NamespaceName};
use generated_types::influxdata::iox::namespace::v1::{
update_namespace_service_protection_limit_request::LimitUpdate, *,
};
@ -14,21 +14,11 @@ use tonic::{Request, Response, Status};
pub struct NamespaceService {
/// Catalog.
catalog: Arc<dyn Catalog>,
topic_id: Option<TopicId>,
query_id: Option<QueryPoolId>,
}
impl NamespaceService {
pub fn new(
catalog: Arc<dyn Catalog>,
topic_id: Option<TopicId>,
query_id: Option<QueryPoolId>,
) -> Self {
Self {
catalog,
topic_id,
query_id,
}
pub fn new(catalog: Arc<dyn Catalog>) -> Self {
Self { catalog }
}
}
@ -58,10 +48,6 @@ impl namespace_service_server::NamespaceService for NamespaceService {
&self,
request: Request<CreateNamespaceRequest>,
) -> Result<Response<CreateNamespaceResponse>, Status> {
if self.topic_id.is_none() || self.query_id.is_none() {
return Err(Status::invalid_argument("topic_id or query_id not set"));
}
let mut repos = self.catalog.repositories().await;
let CreateNamespaceRequest {
@ -80,12 +66,7 @@ impl namespace_service_server::NamespaceService for NamespaceService {
let namespace = repos
.namespaces()
.create(
&namespace_name,
retention_period_ns,
self.topic_id.unwrap(),
self.query_id.unwrap(),
)
.create(&namespace_name, retention_period_ns)
.await
.map_err(|e| {
warn!(error=%e, %namespace_name, "failed to create namespace");
@ -349,22 +330,7 @@ mod tests {
let catalog: Arc<dyn Catalog> =
Arc::new(MemCatalog::new(Arc::new(metric::Registry::default())));
let topic = catalog
.repositories()
.await
.topics()
.create_or_get("kafka-topic")
.await
.unwrap();
let query_pool = catalog
.repositories()
.await
.query_pools()
.create_or_get("query-pool")
.await
.unwrap();
let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id));
let handler = NamespaceService::new(catalog);
// There should be no namespaces to start with.
{
@ -499,22 +465,7 @@ mod tests {
let catalog: Arc<dyn Catalog> =
Arc::new(MemCatalog::new(Arc::new(metric::Registry::default())));
let topic = catalog
.repositories()
.await
.topics()
.create_or_get("kafka-topic")
.await
.unwrap();
let query_pool = catalog
.repositories()
.await
.query_pools()
.create_or_get("query-pool")
.await
.unwrap();
let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id));
let handler = NamespaceService::new(catalog);
let req = CreateNamespaceRequest {
name: NS_NAME.to_string(),
retention_period_ns: Some(RETENTION),
@ -572,22 +523,7 @@ mod tests {
let catalog: Arc<dyn Catalog> =
Arc::new(MemCatalog::new(Arc::new(metric::Registry::default())));
let topic = catalog
.repositories()
.await
.topics()
.create_or_get("kafka-topic")
.await
.unwrap();
let query_pool = catalog
.repositories()
.await
.query_pools()
.create_or_get("query-pool")
.await
.unwrap();
let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id));
let handler = NamespaceService::new(catalog);
let req = CreateNamespaceRequest {
name: String::from($name),

View File

@ -110,15 +110,9 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(metrics));
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("iox-shared").await.unwrap();
let pool = repos
.query_pools()
.create_or_get("iox-shared")
.await
.unwrap();
let namespace = repos
.namespaces()
.create("catalog_partition_test", None, topic.id, pool.id)
.create("catalog_partition_test", None)
.await
.unwrap();
let table = repos

View File

@ -48,8 +48,6 @@ fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaRespons
let response = GetSchemaResponse {
schema: Some(NamespaceSchema {
id: schema.id.get(),
topic_id: schema.topic_id.get(),
query_pool_id: schema.query_pool_id.get(),
tables: schema
.tables
.iter()
@ -95,11 +93,9 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(metrics));
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("franz").await.unwrap();
let pool = repos.query_pools().create_or_get("franz").await.unwrap();
let namespace = repos
.namespaces()
.create("namespace_schema_test", None, topic.id, pool.id)
.create("namespace_schema_test", None)
.await
.unwrap();
let table = repos

View File

@ -40,17 +40,5 @@ pub async fn initialize_db(dsn: &str, schema_name: &str) {
.ok()
.unwrap();
// Create the shared topic in the catalog
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("catalog")
.arg("topic")
.arg("update")
.arg("iox-shared")
.env("INFLUXDB_IOX_CATALOG_DSN", dsn)
.env("INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME", schema_name)
.ok()
.unwrap();
init.insert(schema_name.into());
}