Merge pull request #6127 from influxdata/cn/database

fix: Rename more instances of "database" to "namespace"
pull/24376/head
kodiakhq[bot] 2022-11-11 21:21:06 +00:00 committed by GitHub
commit e7ac4eb7fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 289 additions and 310 deletions

View File

@ -57,10 +57,9 @@ ENV PACKAGE=$PACKAGE
COPY --from=build "/root/$PACKAGE" "/usr/bin/$PACKAGE"
COPY docker/entrypoint.sh /usr/bin/entrypoint.sh
ENV INFLUXDB_IOX_SERVER_MODE=database
EXPOSE 8080 8082
ENTRYPOINT ["/usr/bin/entrypoint.sh"]
CMD ["run", "$INFLUXDB_IOX_SERVER_MODE"]
CMD ["run"]

View File

@ -235,7 +235,7 @@ See [docs/testing.md] for more information
Data can be written to InfluxDB IOx by sending [line protocol] format to the `/api/v2/write` endpoint or using the CLI.
For example, assuming you are running in local mode, this command will send data in the `test_fixtures/lineproto/metrics.lp` file to the `company_sensors` database.
For example, assuming you are running in local mode, this command will send data in the `test_fixtures/lineproto/metrics.lp` file to the `company_sensors` namespace.
```shell
./target/debug/influxdb_iox -vv write company_sensors test_fixtures/lineproto/metrics.lp --host http://localhost:8080
@ -243,7 +243,7 @@ For example, assuming you are running in local mode, this command will send data
Note that `--host http://localhost:8080` is required as the `/api/v2` endpoint is hosted on port `8080` while the default is the querier gRPC port `8082`.
To query the data stored in the `company_sensors` database:
To query the data stored in the `company_sensors` namespace:
```shell
./target/debug/influxdb_iox query company_sensors "SELECT * FROM cpu LIMIT 10"

View File

@ -186,7 +186,7 @@ impl From<DmlDelete> for DmlOperation {
}
}
/// A collection of writes to potentially multiple tables within the same database
/// A collection of writes to potentially multiple tables within the same namespace
#[derive(Debug, Clone)]
pub struct DmlWrite {
/// The namespace being written to

View File

@ -62,7 +62,7 @@ Connected to IOx Server
Set output format to pretty
Ready for commands. (Hint: try 'help;')
> use 26f7e5a4b7be365b_917b97a92e883afc;
You are now in remote mode, querying database 26f7e5a4b7be365b_917b97a92e883afc
You are now in remote mode, querying namespace 26f7e5a4b7be365b_917b97a92e883afc
26f7e5a4b7be365b_917b97a92e883afc> select count(*) from cpu;
+-----------------+
| COUNT(UInt8(1)) |

View File

@ -7,7 +7,7 @@
# The full list of available configuration values can be found in
# the command line help (e.g. `env: INFLUXDB_IOX_DB_DIR=`):
#
# ./influxdb_iox run database --help
# ./influxdb_iox run --help
#
#
# The identifier for the server. Used for writing to object storage and as

View File

@ -18,6 +18,6 @@ The rationale for this assumption is to enable IOx to be operated in a container
This configuration is used when running IOx on a "plain old server" operating at the edge as well as for local testing.
In this configuration, IOx assumes the contents of the local file system are preserved at least as long as the life of the IOx Database and that external measures are taken to backup or otherwise manage this.
In this configuration, IOx assumes the contents of the local file system are preserved at least as long as the life of the IOx instance and that external measures are taken to backup or otherwise manage this.
In other words, unsurprisingly, when using the local filesystem as object storage, the durability of the data in IOx is tied to the durability of the filesystem.

View File

@ -2,7 +2,7 @@
## Background
Observability context is how a component exposes metrics, traces, logs, panics, etc... in a way that places them in the context of the wider system. Most commonly this might be the database name, but might also be the table name, chunk ID, etc... Crucially this information may not be relevant to the component's primary function, e.g. if we never needed to observe `Db` it wouldn't need to know the database name at all, as only `Server` would need to know
Observability context is how a component exposes metrics, traces, logs, panics, etc... in a way that places them in the context of the wider system. Most commonly this might be the namespace name, but might also be the table name, chunk ID, etc... Crucially this information may not be relevant to the component's primary function, e.g. if we never needed to observe `Db` it wouldn't need to know the namespace name at all, as only `Server` would need to know
Broadly speaking there are 3 approaches to how to inject this context:
@ -12,14 +12,13 @@ Broadly speaking there are 3 approaches to how to inject this context:
Effectively, the three approaches trade off between "polluting" the object or the callsites.
To give a concrete example, from a purely logic perspective the catalog does not need to know the database name, only the path
to the object store. However, it is helpful for logs, metrics, etc... to be in terms of database name.
To give a concrete example, from a purely logic perspective the catalog does not need to know the namespace name, only the path to the object store. However, it is helpful for logs, metrics, etc... to be in terms of namespace name.
The three approaches would therefore be
1. Inject database name on construction onto the catalog object
2. Inject metrics, logs, etc... wrappers that carry the database name context internally
3. Inject database name at every call site, either explicitly passing it as an argument, or implicitly by wrapping in a span, mapping the returned error, etc...
1. Inject namespace name on construction onto the catalog object
2. Inject metrics, logs, etc... wrappers that carry the namespace name context internally
3. Inject namespace name at every call site, either explicitly passing it as an argument, or implicitly by wrapping in a span, mapping the returned error, etc...
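The trade-off is easiest to see in code. A minimal sketch of approach 2, with hypothetical `NamespaceLogger` and `Catalog` types (the real IOx types differ):

```rust
/// A hypothetical wrapper (approach 2) that carries the namespace name so the
/// wrapped component never needs to know it.
struct NamespaceLogger {
    namespace_name: String,
}

impl NamespaceLogger {
    fn info(&self, msg: &str) {
        // the observability context is attached here, not at the call site
        println!("namespace={} msg={}", self.namespace_name, msg);
    }
}

/// The catalog's primary function only needs the object store path; the
/// injected logger carries the namespace context for observability.
struct Catalog {
    object_store_path: String,
    log: NamespaceLogger,
}

impl Catalog {
    fn load(&self) {
        self.log
            .info(&format!("loading catalog from {}", self.object_store_path));
    }
}

fn main() {
    let catalog = Catalog {
        object_store_path: "iox/catalog".to_string(),
        log: NamespaceLogger {
            namespace_name: "company_sensors".to_string(),
        },
    };
    catalog.load();
}
```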
## Outcome

View File

@ -131,7 +131,7 @@ In this section, IOx specific SQL tables, commands, and extensions are documente
## System Tables
In addition to the SQL standard `information_schema`, IOx contains several *system tables* that provide access to IOx specific information. The information in each system table is scoped to that particular database. Cross database queries are not possible due to the design of IOx's security model.
In addition to the SQL standard `information_schema`, IOx contains several *system tables* that provide access to IOx-specific information. The information in each system table is scoped to that particular namespace. Cross-namespace queries are not possible due to the design of IOx's security model.
### `system.queries`
`system.queries` contains information about queries run against this IOx instance

View File

@ -88,14 +88,14 @@ set.
### Configuration differences when running the tests
When running `influxdb_iox run database`, you can pick one object store to use. When running the tests,
you can run them against all the possible object stores. There's still only one
`INFLUXDB_IOX_BUCKET` variable, though, so that will set the bucket name for all configured object
stores. Use the same bucket name when setting up the different services.
When running `influxdb_iox run`, you can pick one object store to use. When running the tests, you
can run them against all the possible object stores. There's still only one `INFLUXDB_IOX_BUCKET`
variable, though, so that will set the bucket name for all configured object stores. Use the same
bucket name when setting up the different services.
Other than possibly configuring multiple object stores, configuring the tests to use the object
store services is the same as configuring the server to use an object store service. See the output
of `influxdb_iox run database --help` for instructions.
of `influxdb_iox run --help` for instructions.
## InfluxDB 2 Client

View File

@ -4,7 +4,7 @@ option go_package = "github.com/influxdata/iox/querier/v1";
// Request body for ticket in "end-user to querier" flight requests.
message ReadInfo {
// Namespace(/database) name.
// Namespace name.
string namespace_name = 1;
// SQL query.

View File

@ -11,14 +11,15 @@ pub enum Error {
ClientError(#[from] influxdb_iox_client::error::Error),
}
/// Write data into the specified database
/// Update the specified namespace's data retention period
#[derive(Debug, clap::Parser)]
pub struct Config {
/// The namespace to update the retention period for
#[clap(action)]
namespace: String,
/// Num of hours of the retention period of this namespace. Default is 0 representing infinite retention
/// Number of hours in the retention period of this namespace. Default is 0, representing
/// infinite retention
#[clap(action, long, short = 'c', default_value = "0")]
retention_hours: u32,
}
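As a sanity check of how these flags parse, a hedged sketch using clap's derive API (`Parser::try_parse_from`); the test module and argument values are hypothetical:

```rust
#[cfg(test)]
mod tests {
    use super::Config;
    use clap::Parser;

    #[test]
    fn parses_retention_flags() {
        // positional namespace plus the short form of --retention-hours
        let cfg = Config::try_parse_from(["retention", "company_sensors", "-c", "12"])
            .expect("valid arguments");
        assert_eq!(cfg.namespace, "company_sensors");
        assert_eq!(cfg.retention_hours, 12);
    }
}
```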

View File

@ -1,26 +1,23 @@
pub(crate) mod request;
pub(crate) mod response;
use std::num::NonZeroU64;
use std::time::Duration;
use snafu::{ResultExt, Snafu};
use tonic::Status;
use generated_types::{
aggregate::AggregateType, influxdata::platform::storage::read_group_request::Group, Predicate,
};
use influxdb_storage_client::{connection::Connection, Client, OrgAndBucket};
use influxrpc_parser::predicate;
use iox_time;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{num::NonZeroU64, time::Duration};
use tonic::Status;
#[derive(Debug, Snafu)]
pub enum ParseError {
#[snafu(display("unable to parse timestamp '{:?}'", t))]
Timestamp { t: String },
#[snafu(display("unable to parse database name '{:?}'", db_name))]
DBName { db_name: String },
#[snafu(display("unable to parse namespace name '{:?}'", db_name))]
NamespaceName { db_name: String },
#[snafu(display("unable to parse predicate: {:?}", source))]
Predicate { source: predicate::Error },
@ -55,9 +52,9 @@ pub struct Config {
#[clap(subcommand)]
command: Command,
/// The name of the database
/// The name of the namespace
#[clap(
value_parser = parse_db_name,
value_parser = parse_namespace_name,
)]
db_name: OrgAndBucket,
@ -121,31 +118,23 @@ fn parse_predicate(expr: &str) -> Result<Predicate, ParseError> {
predicate::expr_to_rpc_predicate(expr).context(PredicateSnafu)
}
// Attempts to parse the database name into and org and bucket ID.
fn parse_db_name(db_name: &str) -> Result<OrgAndBucket, ParseError> {
// Attempts to parse the namespace name into an org and bucket ID.
fn parse_namespace_name(db_name: &str) -> Result<OrgAndBucket, ParseError> {
let parts = db_name.split('_').collect::<Vec<_>>();
if parts.len() != 2 {
return DBNameSnafu {
db_name: db_name.to_owned(),
}
.fail();
}
let org_id = usize::from_str_radix(parts[0], 16).map_err(|_| ParseError::DBName {
db_name: db_name.to_owned(),
})?;
ensure!(parts.len() == 2, NamespaceNameSnafu { db_name });
let bucket_id = usize::from_str_radix(parts[1], 16).map_err(|_| ParseError::DBName {
db_name: db_name.to_owned(),
})?;
let org_id = usize::from_str_radix(parts[0], 16)
.ok()
.context(NamespaceNameSnafu { db_name })?;
let bucket_id = usize::from_str_radix(parts[1], 16)
.ok()
.context(NamespaceNameSnafu { db_name })?;
Ok(OrgAndBucket::new(
NonZeroU64::new(org_id as u64).ok_or_else(|| ParseError::DBName {
db_name: db_name.to_owned(),
})?,
NonZeroU64::new(bucket_id as u64).ok_or_else(|| ParseError::DBName {
db_name: db_name.to_owned(),
})?,
NonZeroU64::new(org_id as u64).context(NamespaceNameSnafu { db_name })?,
NonZeroU64::new(bucket_id as u64).context(NamespaceNameSnafu { db_name })?,
))
}
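The refactor above replaces hand-rolled error construction with snafu's `ensure!` and `OptionExt::context` combinators. A self-contained sketch of the same pattern, simplified to plain `u64`s instead of `OrgAndBucket`:

```rust
use snafu::{ensure, OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum ParseError {
    #[snafu(display("unable to parse namespace name '{:?}'", db_name))]
    NamespaceName { db_name: String },
}

/// Parse an "org_bucket" pair of hex IDs, e.g.
/// "26f7e5a4b7be365b_917b97a92e883afc".
fn parse(db_name: &str) -> Result<(u64, u64), ParseError> {
    let parts: Vec<_> = db_name.split('_').collect();
    // `ensure!` replaces the manual `if ... { return ...Snafu { .. }.fail(); }`
    ensure!(parts.len() == 2, NamespaceNameSnafu { db_name });
    // `.ok().context(...)` converts the parse failure into the snafu variant,
    // replacing the hand-written `map_err` closures from the old code
    let org = u64::from_str_radix(parts[0], 16)
        .ok()
        .context(NamespaceNameSnafu { db_name })?;
    let bucket = u64::from_str_radix(parts[1], 16)
        .ok()
        .context(NamespaceNameSnafu { db_name })?;
    Ok((org, bucket))
}
```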

View File

@ -47,7 +47,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Write data into the specified database
/// Write data into the specified namespace
#[derive(Debug, clap::Parser)]
pub struct Config {
/// If specified, restricts the maximum amount of line protocol

View File

@ -115,9 +115,9 @@ Commands are generally structured in the form:
<type of object> <action> <arguments>
For example, a command such as the following shows all actions
available for database chunks, including get and list.
available for namespaces, including `list` and `retention`.
influxdb_iox database chunk --help
influxdb_iox namespace --help
"#
)]
struct Config {
@ -184,13 +184,13 @@ enum Command {
/// Various commands for compactor manipulation
Compactor(Box<commands::compactor::Config>),
/// Interrogate internal database data
/// Interrogate internal data
Debug(commands::debug::Config),
/// Initiate a read request to the gRPC storage service.
Storage(commands::storage::Config),
/// Write data into the specified database
/// Write data into the specified namespace
Write(commands::write::Config),
/// Query the data with SQL

View File

@ -110,7 +110,7 @@ where
}
}
/// Query the given database with the given SQL query, and return a [`PerformQuery`] instance
/// Query the given namespace with the given SQL query, and return a [`PerformQuery`] instance
/// that streams low-level message results.
pub async fn perform_query(&mut self, request: T) -> Result<PerformQuery<T::Response>, Error> {
PerformQuery::<T::Response>::new(self, request).await

View File

@ -21,8 +21,7 @@ pub use low_level::{Client as LowLevelClient, PerformQuery as LowLevelPerformQue
use self::low_level::LowLevelMessage;
/// Error responses when querying an IOx database using the Arrow Flight gRPC
/// API.
/// Error responses when querying an IOx namespace using the Arrow Flight gRPC API.
#[derive(Debug, Error)]
pub enum Error {
/// There were no FlightData messages returned when we expected to get one
@ -124,7 +123,7 @@ impl Client {
}
}
/// Query the given database with the given SQL query, and return a
/// Query the given namespace with the given SQL query, and return a
/// [`PerformQuery`] instance that streams Arrow `RecordBatch` results.
pub async fn perform_query(&mut self, request: ReadInfo) -> Result<PerformQuery, Error> {
PerformQuery::new(self, request).await
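A hypothetical call site for this client; the connection `Builder` path and the `collect` helper on `PerformQuery` are assumptions, so exact names may differ:

```rust
// a hypothetical call site for the high-level flight client; the connection
// Builder and PerformQuery::collect are assumptions here
use influxdb_iox_client::{
    connection::Builder,
    flight::{generated_types::ReadInfo, Client},
};

async fn query_namespace() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Builder::default().build("http://127.0.0.1:8082").await?;
    let mut client = Client::new(connection);

    let mut query = client
        .perform_query(ReadInfo {
            namespace_name: "company_sensors".to_string(),
            sql_query: "SELECT * FROM cpu LIMIT 10".to_string(),
        })
        .await?;

    // drain the stream of Arrow RecordBatches
    let batches = query.collect().await?;
    println!("got {} record batches", batches.len());
    Ok(())
}
```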

View File

@ -101,7 +101,7 @@ impl Client {
/// Write the [LineProtocol] formatted string in `lp_data` to
/// namespace `namespace`.
///
/// Returns the number of bytes which were written to the database
/// Returns the number of bytes which were written to the namespace.
///
/// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
pub async fn write_lp(
@ -119,7 +119,7 @@ impl Client {
/// individual lines (points) do not cross these strings
///
/// Returns the number of bytes, in total, which were written to
/// the database
/// the namespace.
///
/// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
pub async fn write_lp_stream(
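A hedged sketch of calling `write_lp`; the connection setup and the exact parameter types (truncated in the hunk above) are assumptions:

```rust
// a hypothetical call site; the parameter types of write_lp are assumptions,
// since its signature is truncated in the hunk above
use influxdb_iox_client::{connection::Builder, write::Client};

async fn write_example() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Builder::default().build("http://127.0.0.1:8080").await?;
    let mut client = Client::new(connection);

    // returns the number of bytes written to the namespace
    let bytes_written = client
        .write_lp("company_sensors", "cpu,region=west usage=12.5 1668200000000000000")
        .await?;
    println!("wrote {bytes_written} bytes");
    Ok(())
}
```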

View File

@ -36,10 +36,9 @@ use ::generated_types::google::protobuf::*;
use observability_deps::tracing::{debug, trace};
use std::num::NonZeroU64;
/// InfluxDB IOx deals with database names. The gRPC interface deals
/// with org_id and bucket_id represented as 16 digit hex
/// values. This struct manages creating the org_id, bucket_id,
/// and database names to be consistent with the implementation
/// InfluxDB IOx deals with namespace names. The gRPC interface deals with org_id and bucket_id
/// represented as 16 digit hex values. This struct manages creating the org_id, bucket_id, and
/// namespace names to be consistent with the implementation.
#[derive(Debug, Clone)]
pub struct OrgAndBucket {
org_id: NonZeroU64,
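The naming scheme the comment describes can be sketched as follows, assuming 16-digit zero-padded hex joined by `_` (matching the `26f7e5a4b7be365b_917b97a92e883afc` example in the SQL docs above); the real constructor lives on `OrgAndBucket`:

```rust
use std::num::NonZeroU64;

/// A sketch of deriving a namespace name from org/bucket IDs, assuming the
/// 16-digit zero-padded hex encoding joined by '_'; the real logic lives on
/// OrgAndBucket.
fn namespace_name(org_id: NonZeroU64, bucket_id: NonZeroU64) -> String {
    format!("{:016x}_{:016x}", org_id.get(), bucket_id.get())
}

fn main() {
    let org = NonZeroU64::new(0x26f7e5a4b7be365b).unwrap();
    let bucket = NonZeroU64::new(0x917b97a92e883afc).unwrap();
    assert_eq!(namespace_name(org, bucket), "26f7e5a4b7be365b_917b97a92e883afc");
}
```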

View File

@ -416,7 +416,7 @@ pub trait ShardRepo: Send + Sync {
}
/// Functions for working with IOx partitions in the catalog. Note that these are how IOx splits up
/// data within a database, which is different than Kafka partitions.
/// data within a namespace, which is different than Kafka partitions.
#[async_trait]
pub trait PartitionRepo: Send + Sync {
/// create or get a partition record for the given partition key, shard and table

View File

@ -11,7 +11,7 @@ use crate::{
seriesset::{SeriesSetPlan, SeriesSetPlans},
stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
},
QueryChunk, QueryDatabase,
QueryChunk, QueryNamespace,
};
use arrow::datatypes::DataType;
use data_types::ChunkId;
@ -226,7 +226,7 @@ impl InfluxRpcPlanner {
/// . chunks without deleted data but cannot be decided from meta data
pub async fn table_names(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan> {
let ctx = self.ctx.child_ctx("table_names planning");
@ -236,10 +236,10 @@ impl InfluxRpcPlanner {
let rpc_predicate = rpc_predicate.clear_timestamp_if_max_range();
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let tables: Vec<_> =
table_chunk_stream(Arc::clone(&database), false, &table_predicates, &ctx)
table_chunk_stream(Arc::clone(&namespace), false, &table_predicates, &ctx)
.try_filter_map(|(table_name, predicate, chunks)| async move {
// Identify which chunks can answer from their metadata alone and record their table,
// and which chunks need a full plan, grouped by their table
@ -291,7 +291,7 @@ impl InfluxRpcPlanner {
builder.append_string(table_name.to_string());
}
Some((predicate, chunks)) => {
let schema = database
let schema = namespace
.table_schema(table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -315,13 +315,12 @@ impl InfluxRpcPlanner {
builder.build().context(CreatingStringSetSnafu)
}
/// Returns a set of plans that produces the names of "tag"
/// columns (as defined in the InfluxDB Data model) names in this
/// database that have more than zero rows which pass the
/// conditions specified by `predicate`.
/// Returns a set of plans that produces the names of "tag" columns (as defined in the InfluxDB
/// data model) in this namespace that have more than zero rows which pass the conditions
/// specified by `predicate`.
pub async fn tag_keys(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan> {
let ctx = self.ctx.child_ctx("tag_keys planning");
@ -334,11 +333,11 @@ impl InfluxRpcPlanner {
//
// 1. Find all the potential tables in the chunks
//
// 2. For each table/chunk pair, figure out which can be found
// from only metadata and which need full plans
// 2. For each table/chunk pair, figure out which can be found from only metadata and which
// need full plans
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let mut table_predicates_need_chunks = vec![];
@ -348,7 +347,7 @@ impl InfluxRpcPlanner {
// special case - return the columns from metadata only.
// Note that columns with all rows deleted will still show here
builder = builder.append_other(
database
namespace
.table_schema(&table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -364,7 +363,7 @@ impl InfluxRpcPlanner {
}
let tables: Vec<_> = table_chunk_stream(
Arc::clone(&database),
Arc::clone(&namespace),
false,
&table_predicates_need_chunks,
&ctx,
@ -458,7 +457,7 @@ impl InfluxRpcPlanner {
// out chunks (and tables) where all columns in that chunk
// were already known to have data (based on the contents of known_columns)
let schema = database
let schema = namespace
.table_schema(table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -478,12 +477,11 @@ impl InfluxRpcPlanner {
builder.build().context(CreatingStringSetSnafu)
}
/// Returns a plan which finds the distinct, non-null tag values
/// in the specified `tag_name` column of this database which pass
/// the conditions specified by `predicate`.
/// Returns a plan which finds the distinct, non-null tag values in the specified `tag_name`
/// column of this namespace which pass the conditions specified by `predicate`.
pub async fn tag_values(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
tag_name: &str,
rpc_predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan> {
@ -499,14 +497,14 @@ impl InfluxRpcPlanner {
// which need full plans
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
// filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk scan (which includes
// ingester RPC)
// filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk
// scan (which includes ingester RPC)
let mut table_predicates_filtered = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in table_predicates {
let schema = database
let schema = namespace
.table_schema(&table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -521,7 +519,7 @@ impl InfluxRpcPlanner {
}
let tables: Vec<_> = table_chunk_stream(
Arc::clone(&database),
Arc::clone(&namespace),
false,
&table_predicates_filtered,
&ctx,
@ -546,8 +544,8 @@ impl InfluxRpcPlanner {
let schema = chunk.schema();
// Skip this table if the tag_name is not a column in this chunk
// Note: This may happen even when the table contains the tag_name, because some chunks may not
// contain all columns.
// Note: This may happen even when the table contains the tag_name, because some
// chunks may not contain all columns.
let idx = if let Some(idx) = schema.find_index_of(tag_name) {
idx
} else {
@ -571,8 +569,8 @@ impl InfluxRpcPlanner {
}
);
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
// deleted data before getting tag values
// If there are delete predicates, we need to scan (or do full plan) the data to
// eliminate deleted data before getting tag values
if chunk.has_delete_predicates() {
debug!(
%table_name,
@ -628,7 +626,7 @@ impl InfluxRpcPlanner {
builder = builder.append_other(known_values.into());
if !chunks_full.is_empty() {
let schema = database
let schema = namespace
.table_schema(table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -670,13 +668,12 @@ impl InfluxRpcPlanner {
builder.build().context(CreatingStringSetSnafu)
}
/// Returns a plan that produces a list of columns and their
/// datatypes (as defined in the data written via `write_lines`),
/// and which have more than zero rows which pass the conditions
/// Returns a plan that produces a list of columns and their datatypes (as defined in the data
/// written via `write_lines`), and which have more than zero rows which pass the conditions
/// specified by `predicate`.
pub async fn field_columns(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
) -> Result<FieldListPlan> {
let ctx = self.ctx.child_ctx("field_columns planning");
@ -692,7 +689,7 @@ impl InfluxRpcPlanner {
// values and stops the plan executing once it has them
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
// optimization: just get the field columns from metadata.
@ -701,7 +698,7 @@ impl InfluxRpcPlanner {
let mut table_predicates_need_chunks = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in table_predicates {
if predicate.is_empty() {
let schema = database
let schema = namespace
.table_schema(&table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -721,7 +718,7 @@ impl InfluxRpcPlanner {
// full scans
let plans = create_plans(
database,
namespace,
&table_predicates_need_chunks,
ctx,
|ctx, table_name, predicate, chunks, schema| {
@ -762,18 +759,18 @@ impl InfluxRpcPlanner {
/// same) occur together in the plan
pub async fn read_filter(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
) -> Result<SeriesSetPlans> {
let ctx = self.ctx.child_ctx("planning_read_filter");
debug!(?rpc_predicate, "planning read_filter");
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let plans = create_plans(
database,
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| {
@ -813,7 +810,7 @@ impl InfluxRpcPlanner {
/// (apply filters)
pub async fn read_group(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
agg: Aggregate,
group_columns: &[impl AsRef<str> + Send + Sync],
@ -822,11 +819,11 @@ impl InfluxRpcPlanner {
debug!(?rpc_predicate, ?agg, "planning read_group");
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let plans = create_plans(
database,
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| match agg {
@ -863,7 +860,7 @@ impl InfluxRpcPlanner {
/// that are grouped by window definitions
pub async fn read_window_aggregate(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
agg: Aggregate,
every: WindowDuration,
@ -879,11 +876,11 @@ impl InfluxRpcPlanner {
);
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let plans = create_plans(
database,
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| {
@ -1375,7 +1372,7 @@ impl InfluxRpcPlanner {
/// This function is indirectly invoked by `field_columns`, `read_filter`, `read_group` and `read_window_aggregate`
/// through the function `create_plans` where need_fields should be true.
fn table_chunk_stream<'a>(
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
need_fields: bool,
table_predicates: &'a [(Arc<str>, Predicate)],
ctx: &'a IOxSessionContext,
@ -1385,9 +1382,9 @@ fn table_chunk_stream<'a>(
let mut ctx = ctx.child_ctx("table");
ctx.set_metadata("table", table_name.to_string());
let database = Arc::clone(&database);
let namespace = Arc::clone(&namespace);
let table_schema = database.table_schema(table_name);
let table_schema = namespace.table_schema(table_name);
let projection = match table_schema {
Some(table_schema) => {
columns_in_predicates(need_fields, table_schema, table_name, predicate)
@ -1396,7 +1393,7 @@ fn table_chunk_stream<'a>(
};
async move {
let chunks = database
let chunks = namespace
.chunks(
table_name,
predicate,
@ -1507,7 +1504,7 @@ fn columns_in_predicates(
/// `f(ctx, table_name, table_predicate, chunks, table_schema)` is
/// invoked on the chunks for each table to produce a plan for each
async fn create_plans<F, P>(
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
table_predicates: &[(Arc<str>, Predicate)],
ctx: IOxSessionContext,
f: F,
@ -1525,7 +1522,7 @@ where
+ Sync,
P: Send,
{
table_chunk_stream(Arc::clone(&database), true, table_predicates, &ctx)
table_chunk_stream(Arc::clone(&namespace), true, table_predicates, &ctx)
.and_then(|(table_name, predicate, chunks)| async move {
let chunks = prune_chunks_metadata(chunks, predicate)?;
Ok((table_name, predicate, chunks))
@ -1540,11 +1537,11 @@ where
let mut ctx = ctx.child_ctx("table");
ctx.set_metadata("table", table_name.to_string());
let database = Arc::clone(&database);
let namespace = Arc::clone(&namespace);
let f = f.clone();
async move {
let schema = database
let schema = namespace
.table_schema(table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -1927,7 +1924,7 @@ mod tests {
use datafusion::prelude::{col, lit, lit_timestamp_nano};
use datafusion_util::lit_dict;
use futures::{future::BoxFuture, FutureExt};
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate};
use crate::{
exec::{ExecutionContextProvider, Executor},

View File

@ -18,7 +18,7 @@ use exec::{stringset::StringSet, IOxSessionContext};
use hashbrown::HashMap;
use observability_deps::tracing::{debug, trace};
use parquet_file::storage::ParquetExecInput;
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate, PredicateMatch};
use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate, PredicateMatch};
use schema::{
sort::{SortKey, SortKeyBuilder},
Projection, Schema, TIME_COLUMN_NAME,
@ -89,7 +89,7 @@ pub trait QueryChunkMeta {
}
/// A `QueryCompletedToken` is returned by `record_query` implementations of
/// a `QueryDatabase`. It is used to trigger side-effects (such as query timing)
/// a `QueryNamespace`. It is used to trigger side-effects (such as query timing)
/// on query completion.
///
pub struct QueryCompletedToken {
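The completion-token pattern described here can be sketched with a drop-based callback; this is a minimal analogue with a plain boolean success flag, not the real type's internals:

```rust
/// A minimal sketch of the drop-based completion pattern, assuming a plain
/// boolean success flag; the real token's internals differ.
struct CompletedToken {
    on_complete: Option<Box<dyn FnOnce(bool) + Send>>,
    success: bool,
}

impl CompletedToken {
    fn new(f: impl FnOnce(bool) + Send + 'static) -> Self {
        Self {
            on_complete: Some(Box::new(f)),
            success: false,
        }
    }

    fn set_success(&mut self) {
        self.success = true;
    }
}

impl Drop for CompletedToken {
    fn drop(&mut self) {
        // the side-effect (e.g. recording query timing) fires exactly once,
        // when the token is dropped at the end of the query
        if let Some(f) = self.on_complete.take() {
            f(self.success);
        }
    }
}
```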
@ -136,22 +136,20 @@ impl Drop for QueryCompletedToken {
/// This avoids storing potentially large strings
pub type QueryText = Box<dyn std::fmt::Display + Send + Sync>;
/// A `Database` is the main trait implemented by the IOx subsystems
/// that store actual data.
/// `QueryNamespace` is the main trait implemented by the IOx subsystems that store actual data.
///
/// Databases store data organized by partitions and each partition stores
/// data in Chunks.
/// Namespaces store data organized by partitions and each partition stores data in Chunks.
#[async_trait]
pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
/// Returns a set of chunks within the partition with data that may match
/// the provided predicate.
pub trait QueryNamespace: QueryNamespaceMeta + Debug + Send + Sync {
/// Returns a set of chunks within the partition with data that may match the provided
/// predicate.
///
/// If possible, chunks which have no rows that can
/// possibly match the predicate may be omitted.
/// If possible, chunks which have no rows that can possibly match the predicate may be omitted.
///
/// If projection is None, returned chunks will include all columns of its original data. Otherwise,
/// returned chunks will include PK columns (tags and time) and columns specified in the projection. Projecting
/// chunks here is optional and a mere optimization. The query subsystem does NOT rely on it.
/// If projection is `None`, returned chunks will include all columns of its original data.
/// Otherwise, returned chunks will include PK columns (tags and time) and columns specified in
/// the projection. Projecting chunks here is optional and a mere optimization. The query
/// subsystem does NOT rely on it.
async fn chunks(
&self,
table_name: &str,
@ -168,10 +166,10 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
query_text: QueryText,
) -> QueryCompletedToken;
/// Upcast to [`QueryDatabaseMeta`].
/// Upcast to [`QueryNamespaceMeta`].
///
/// This is required until <https://github.com/rust-lang/rust/issues/65991> is fixed.
fn as_meta(&self) -> &dyn QueryDatabaseMeta;
fn as_meta(&self) -> &dyn QueryNamespaceMeta;
}
/// Raw data of a [`QueryChunk`].

View File

@ -15,7 +15,7 @@ use datafusion::physical_plan::{
};
use futures::Stream;
/// Database schema creation / validation errors.
/// Schema creation / validation errors.
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {

View File

@ -1,5 +1,4 @@
//! This module provides a reference implementation of
//! [`QueryDatabase`] for use in testing.
//! This module provides a reference implementation of [`QueryNamespace`] for use in testing.
//!
//! AKA it is a Mock
@ -9,7 +8,7 @@ use crate::{
ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext,
},
Predicate, PredicateMatch, QueryChunk, QueryChunkData, QueryChunkMeta, QueryCompletedToken,
QueryDatabase, QueryText,
QueryNamespace, QueryText,
};
use arrow::{
array::{
@ -27,7 +26,7 @@ use datafusion::error::DataFusionError;
use hashbrown::HashSet;
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use predicate::rpc_predicate::QueryDatabaseMeta;
use predicate::rpc_predicate::QueryNamespaceMeta;
use schema::{
builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Projection,
Schema, TIME_COLUMN_NAME,
@ -100,7 +99,7 @@ impl TestDatabase {
}
#[async_trait]
impl QueryDatabase for TestDatabase {
impl QueryNamespace for TestDatabase {
async fn chunks(
&self,
table_name: &str,
@ -137,12 +136,12 @@ impl QueryDatabase for TestDatabase {
QueryCompletedToken::new(|_| {})
}
fn as_meta(&self) -> &dyn QueryDatabaseMeta {
fn as_meta(&self) -> &dyn QueryNamespaceMeta {
self
}
}
impl QueryDatabaseMeta for TestDatabase {
impl QueryNamespaceMeta for TestDatabase {
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
let mut merger = SchemaMerger::new();
let mut found_one = false;

View File

@ -100,9 +100,8 @@ pub async fn http_listener(addr: SocketAddr) -> Result<AddrIncoming> {
Ok(listener)
}
/// Instantiates the gRPC and optional HTTP listeners and returns a
/// Future that completes when these listeners, the Server, Databases,
/// etc... have all exited or the frontend_shutdown token is called.
/// Instantiates the gRPC and optional HTTP listeners and returns a `Future` that completes when
/// the listeners have all exited or the `frontend_shutdown` token is called.
pub async fn serve(
common_state: CommonServerState,
frontend_shutdown: CancellationToken,

View File

@ -11,7 +11,7 @@ pub enum CommonServerStateError {
Tracing { source: trace_exporters::Error },
}
/// Common state used by all server types (e.g. `Database` and `Router`)
/// Common state used by all server types
#[derive(Debug, Clone)]
pub struct CommonServerState {
run_config: RunConfig,

View File

@ -387,7 +387,7 @@ where
.await?
.for_each(|(ns, schema)| {
let name = NamespaceName::try_from(ns.name)
.expect("cannot convert existing namespace name to database name");
.expect("cannot convert existing namespace string to a `NamespaceName` instance");
cache.put_schema(name, schema);
});

View File

@ -20,7 +20,7 @@ pub struct ParquetChunk {
/// Schema that goes with this table's parquet file
schema: Arc<Schema>,
/// Persists the parquet file within a database's relative path
/// Persists the parquet file within a namespace's relative path
store: ParquetStorage,
}

View File

@ -24,7 +24,7 @@ use data_types::{NamespaceId, ParquetFile, PartitionId, ShardId, TableId};
use object_store::path::Path;
use uuid::Uuid;
/// Location of a Parquet file within a database's object store.
/// Location of a Parquet file within a namespace's object store.
/// The exact format is an implementation detail and is subject to change.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ParquetFilePath {

View File

@ -108,7 +108,7 @@ impl InfluxRpcPredicate {
/// Returns a list of (TableName, [`Predicate`])
pub fn table_predicates(
&self,
table_info: &dyn QueryDatabaseMeta,
table_info: &dyn QueryNamespaceMeta,
) -> DataFusionResult<Vec<(Arc<str>, Predicate)>> {
let table_names = match &self.table_names {
Some(table_names) => itertools::Either::Left(table_names.iter().cloned()),
@ -146,8 +146,8 @@ impl InfluxRpcPredicate {
}
/// Information required to normalize predicates
pub trait QueryDatabaseMeta {
/// Returns a list of table names in this DB
pub trait QueryNamespaceMeta {
/// Returns a list of table names in this namespace
fn table_names(&self) -> Vec<String>;
/// Schema for a specific table if the table exists.

View File

@ -9,7 +9,7 @@ use backoff::{Backoff, BackoffConfig};
use data_types::{Namespace, ShardIndex};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use service_common::QueryDatabaseProvider;
use service_common::QueryNamespaceProvider;
use sharder::JumpHash;
use snafu::Snafu;
use std::{collections::BTreeSet, sync::Arc};
@ -65,7 +65,7 @@ pub struct QuerierDatabase {
///
/// This should be a 1-to-1 relation to the number of active queries.
///
/// If the same database is requested twice for different queries, it is counted twice.
/// If the same namespace is requested twice for different queries, it is counted twice.
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
/// Sharder to determine which ingesters to query for a particular table and namespace.
@ -79,7 +79,7 @@ pub struct QuerierDatabase {
}
#[async_trait]
impl QueryDatabaseProvider for QuerierDatabase {
impl QueryNamespaceProvider for QuerierDatabase {
type Db = QuerierNamespace;
async fn db(&self, name: &str, span: Option<Span>) -> Option<Arc<Self::Db>> {

View File

@ -1,4 +1,4 @@
//! Namespace within the whole database.
//! Namespace within the whole catalog.
use crate::{
cache::{namespace::CachedNamespace, CatalogCache},

View File

@ -16,15 +16,15 @@ use datafusion::{
use datafusion_util::config::DEFAULT_SCHEMA;
use iox_query::{
exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext},
QueryChunk, QueryCompletedToken, QueryDatabase, QueryText,
QueryChunk, QueryCompletedToken, QueryNamespace, QueryText,
};
use observability_deps::tracing::{debug, trace};
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate};
use schema::Schema;
use std::{any::Any, collections::HashMap, sync::Arc};
use trace::ctx::SpanContext;
impl QueryDatabaseMeta for QuerierNamespace {
impl QueryNamespaceMeta for QuerierNamespace {
fn table_names(&self) -> Vec<String> {
let mut names: Vec<_> = self.tables.keys().map(|s| s.to_string()).collect();
names.sort();
@ -37,7 +37,7 @@ impl QueryDatabaseMeta for QuerierNamespace {
}
#[async_trait]
impl QueryDatabase for QuerierNamespace {
impl QueryNamespace for QuerierNamespace {
async fn chunks(
&self,
table_name: &str,
@ -94,7 +94,7 @@ impl QueryDatabase for QuerierNamespace {
QueryCompletedToken::new(move |success| query_log.set_completed(entry, success))
}
fn as_meta(&self) -> &dyn QueryDatabaseMeta {
fn as_meta(&self) -> &dyn QueryNamespaceMeta {
self
}
}

View File

@ -1,11 +1,11 @@
use std::{any::Any, sync::Arc};
use datafusion::catalog::catalog::CatalogProvider;
use iox_query::{exec::ExecutionContextProvider, QueryDatabase};
use iox_query::{exec::ExecutionContextProvider, QueryNamespace};
use querier::QuerierNamespace;
/// Abstract database used during testing.
pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase {
pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryNamespace {
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
/// Upcast to [`CatalogProvider`].
@ -13,10 +13,10 @@ pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase
/// This is required due to <https://github.com/rust-lang/rust/issues/65991>.
fn as_catalog_provider_arc(self: Arc<Self>) -> Arc<dyn CatalogProvider>;
/// Upcast to [`QueryDatabase`].
/// Upcast to [`QueryNamespace`].
///
/// This is required due to <https://github.com/rust-lang/rust/issues/65991>.
fn as_query_database_arc(self: Arc<Self>) -> Arc<dyn QueryDatabase>;
fn as_query_namespace_arc(self: Arc<Self>) -> Arc<dyn QueryNamespace>;
}
impl AbstractDb for QuerierNamespace {
@ -28,7 +28,7 @@ impl AbstractDb for QuerierNamespace {
self as _
}
fn as_query_database_arc(self: Arc<Self>) -> Arc<dyn QueryDatabase> {
fn as_query_namespace_arc(self: Arc<Self>) -> Arc<dyn QueryNamespace> {
self
}
}

View File

@ -32,7 +32,7 @@ async fn run_field_columns_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.field_columns(db.as_query_database_arc(), predicate.clone())
.field_columns(db.as_query_namespace_arc(), predicate.clone())
.await
.expect("built plan successfully");
let fields = ctx

View File

@ -64,7 +64,7 @@ async fn run_read_filter(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.read_filter(db.as_query_database_arc(), predicate)
.read_filter(db.as_query_namespace_arc(), predicate)
.await
.map_err(|e| e.to_string())?;

View File

@ -38,7 +38,7 @@ async fn run_read_group_test_case<D>(
let plans = planner
.read_group(
db.as_query_database_arc(),
db.as_query_namespace_arc(),
predicate.clone(),
agg,
&group_columns,

View File

@ -30,7 +30,7 @@ async fn run_read_window_aggregate_test_case<D>(
let plan = planner
.read_window_aggregate(
db.as_query_database_arc(),
db.as_query_namespace_arc(),
predicate.clone(),
agg,
every,

View File

@ -29,7 +29,7 @@ async fn run_table_names_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.table_names(db.as_query_database_arc(), predicate.clone())
.table_names(db.as_query_namespace_arc(), predicate.clone())
.await
.expect("built plan successfully");

View File

@ -31,7 +31,7 @@ async fn run_tag_keys_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.tag_keys(db.as_query_database_arc(), predicate.clone())
.tag_keys(db.as_query_namespace_arc(), predicate.clone())
.await
.expect("built plan successfully");
let names = ctx

View File

@ -30,7 +30,7 @@ async fn run_tag_values_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.tag_values(db.as_query_database_arc(), tag_name, predicate.clone())
.tag_values(db.as_query_namespace_arc(), tag_name, predicate.clone())
.await
.expect("built plan successfully");
let names = ctx
@ -292,7 +292,7 @@ async fn list_tag_values_field_col_on_tag() {
let tag_name = "temp";
let plan_result = planner
.tag_values(
db.as_query_database_arc(),
db.as_query_namespace_arc(),
tag_name,
InfluxRpcPredicate::default(),
)

View File

@ -220,7 +220,7 @@ mod tests {
let ns = "platanos".try_into().unwrap();
let handler = Arc::new(
MockDmlHandler::default()
.with_write_return([Err(DmlError::DatabaseNotFound("nope".to_owned()))]),
.with_write_return([Err(DmlError::NamespaceNotFound("nope".to_owned()))]),
);
let metrics = Arc::new(metric::Registry::default());
@ -234,7 +234,7 @@ mod tests {
.await
.expect_err("inner handler configured to fail");
assert_matches!(err, DmlError::DatabaseNotFound(_));
assert_matches!(err, DmlError::NamespaceNotFound(_));
assert_metric_hit(&metrics, "dml_handler_write_duration", "error");
assert_trace(traces, SpanStatus::Err);
@ -270,7 +270,7 @@ mod tests {
let ns = "platanos".try_into().unwrap();
let handler = Arc::new(
MockDmlHandler::<()>::default()
.with_delete_return([Err(DmlError::DatabaseNotFound("nope".to_owned()))]),
.with_delete_return([Err(DmlError::NamespaceNotFound("nope".to_owned()))]),
);
let metrics = Arc::new(metric::Registry::default());

View File

@ -11,9 +11,9 @@ use super::{partitioner::PartitionError, SchemaError, ShardError};
/// processing.
#[derive(Debug, Error)]
pub enum DmlError {
/// The database specified by the caller does not exist.
#[error("database {0} does not exist")]
DatabaseNotFound(String),
/// The namespace specified by the caller does not exist.
#[error("namespace {0} does not exist")]
NamespaceNotFound(String),
/// An error sharding the writes and pushing them to the write buffer.
#[error(transparent)]

View File

@ -35,7 +35,7 @@ mod tests {
#[test]
fn test_put_get() {
let ns = NamespaceName::new("test").expect("database name is valid");
let ns = NamespaceName::new("test").expect("namespace name is valid");
let cache = Arc::new(MemoryNamespaceCache::default());
assert!(cache.get_schema(&ns).is_none());

View File

@ -224,7 +224,7 @@ mod tests {
#[test]
fn test_put() {
let ns = NamespaceName::new("test").expect("database name is valid");
let ns = NamespaceName::new("test").expect("namespace name is valid");
let registry = metric::Registry::default();
let cache = Arc::new(MemoryNamespaceCache::default());
let cache = Arc::new(InstrumentedCache::new(cache, &registry));
@ -356,7 +356,7 @@ mod tests {
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(11));
// Add a new namespace
let ns = NamespaceName::new("another").expect("database name is valid");
let ns = NamespaceName::new("another").expect("namespace name is valid");
let schema = new_schema(&[10, 12, 9]);
assert!(cache.put_schema(ns.clone(), schema).is_none());
assert_histogram_hit(

View File

@ -55,7 +55,7 @@ mod tests {
.map(char::from)
.collect::<String>()
.try_into()
.expect("generated invalid random database name")
.expect("generated invalid random namespace name")
}
fn schema_with_id(id: i64) -> NamespaceSchema {

View File

@ -121,7 +121,7 @@ impl Error {
impl From<&DmlError> for StatusCode {
fn from(e: &DmlError) -> Self {
match e {
DmlError::DatabaseNotFound(_) => StatusCode::NOT_FOUND,
DmlError::NamespaceNotFound(_) => StatusCode::NOT_FOUND,
// Schema validation error cases
DmlError::Schema(SchemaError::NamespaceLookup(_)) => {
@ -145,7 +145,7 @@ impl From<&DmlError> for StatusCode {
}
/// Errors returned when decoding the organisation / bucket information from an
/// HTTP request and deriving the database name from it.
/// HTTP request and deriving the namespace name from it.
#[derive(Debug, Error)]
pub enum OrgBucketError {
/// The request contains no org/bucket destination information.
@ -156,7 +156,7 @@ pub enum OrgBucketError {
#[error("failed to deserialize org/bucket/precision in request: {0}")]
DecodeFail(#[from] serde::de::value::Error),
/// The provided org/bucket could not be converted into a database name.
/// The provided org/bucket could not be converted into a namespace name.
#[error(transparent)]
MappingFail(#[from] OrgBucketMappingError),
}
@ -921,8 +921,8 @@ mod tests {
db_not_found,
query_string = "?org=bananas&bucket=test",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Err(DmlError::DatabaseNotFound("bananas_test".to_string()))],
want_result = Err(Error::DmlHandler(DmlError::DatabaseNotFound(_))),
dml_handler = [Err(DmlError::NamespaceNotFound("bananas_test".to_string()))],
want_result = Err(Error::DmlHandler(DmlError::NamespaceNotFound(_))),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => {
assert_eq!(namespace, "bananas_test");
}
@ -1038,8 +1038,8 @@ mod tests {
db_not_found,
query_string = "?org=bananas&bucket=test",
body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(),
dml_handler = [Err(DmlError::DatabaseNotFound("bananas_test".to_string()))],
want_result = Err(Error::DmlHandler(DmlError::DatabaseNotFound(_))),
dml_handler = [Err(DmlError::NamespaceNotFound("bananas_test".to_string()))],
want_result = Err(Error::DmlHandler(DmlError::NamespaceNotFound(_))),
want_dml_calls = [MockDmlHandlerCall::Delete{namespace, namespace_id, table, predicate}] => {
assert_eq!(table, "its_a_table");
assert_eq!(namespace, "bananas_test");
@ -1487,8 +1487,8 @@ mod tests {
),
(
DmlHandler(DmlError::DatabaseNotFound("[database name]".into())),
"dml handler error: database [database name] does not exist",
DmlHandler(DmlError::NamespaceNotFound("[namespace name]".into())),
"dml handler error: namespace [namespace name] does not exist",
),
(

View File

@ -5,7 +5,7 @@ use snafu::{ResultExt, Snafu};
use super::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME};
/// Database schema creation / validation errors.
/// Namespace schema creation / validation errors.
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error validating schema: {}", source))]

View File

@ -46,7 +46,7 @@ pub mod sort;
pub use builder::SchemaBuilder;
pub use projection::Projection;
/// Database schema creation / validation errors.
/// Namespace schema creation / validation errors.
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(

View File

@ -9,7 +9,7 @@ use crate::interner::SchemaInterner;
use super::{InfluxColumnType, Schema};
/// Database schema creation / validation errors.
/// Namespace schema creation / validation errors.
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("No schemas found when building merged schema"))]

View File

@ -7,20 +7,20 @@ pub mod test_util;
use std::sync::Arc;
use async_trait::async_trait;
use iox_query::{exec::ExecutionContextProvider, QueryDatabase};
use iox_query::{exec::ExecutionContextProvider, QueryNamespace};
use trace::span::Span;
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a virtual set of
/// databases.
/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a
/// virtual set of namespaces.
///
/// The query engine MUST ONLY use this trait to access the databases / catalogs.
/// The query engine MUST ONLY use this trait to access the namespaces / catalogs.
#[async_trait]
pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static {
/// Abstract database.
type Db: ExecutionContextProvider + QueryDatabase;
pub trait QueryNamespaceProvider: std::fmt::Debug + Send + Sync + 'static {
/// Abstract namespace.
type Db: ExecutionContextProvider + QueryNamespace;
/// Get database if it exists.
/// Get namespace if it exists.
async fn db(&self, name: &str, span: Option<Span>) -> Option<Arc<Self::Db>>;
/// Acquire concurrency-limiting semaphore

View File

@ -6,7 +6,7 @@ use iox_query::{
exec::IOxSessionContext,
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan},
Aggregate, QueryDatabase, WindowDuration,
Aggregate, QueryNamespace, WindowDuration,
};
pub use datafusion::error::{DataFusionError as Error, Result};
@ -31,7 +31,7 @@ impl Planner {
}
}
/// Plan a SQL query against the data in `database`, and return a
/// Plan a SQL query against the data in a namespace, and return a
/// DataFusion physical execution plan.
pub async fn sql(&self, query: impl Into<String> + Send) -> Result<Arc<dyn ExecutionPlan>> {
let planner = SqlQueryPlanner::new();
@ -45,20 +45,20 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::table_names`], on a separate threadpool
pub async fn table_names<D>(
pub async fn table_names<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner table_names"));
self.ctx
.run(async move {
planner
.table_names(database, predicate)
.table_names(namespace, predicate)
.await
.map_err(|e| e.to_df_error("table_names"))
})
@ -67,20 +67,20 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::tag_keys`], on a separate threadpool
pub async fn tag_keys<D>(
pub async fn tag_keys<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_keys"));
self.ctx
.run(async move {
planner
.tag_keys(database, predicate)
.tag_keys(namespace, predicate)
.await
.map_err(|e| e.to_df_error("tag_keys"))
})
@ -89,14 +89,14 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::tag_values`], on a separate threadpool
pub async fn tag_values<D>(
pub async fn tag_values<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
tag_name: impl Into<String> + Send,
predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let tag_name = tag_name.into();
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_values"));
@ -104,7 +104,7 @@ impl Planner {
self.ctx
.run(async move {
planner
.tag_values(database, &tag_name, predicate)
.tag_values(namespace, &tag_name, predicate)
.await
.map_err(|e| e.to_df_error("tag_values"))
})
@ -113,20 +113,20 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::field_columns`], on a separate threadpool
pub async fn field_columns<D>(
pub async fn field_columns<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
) -> Result<FieldListPlan>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner field_columns"));
self.ctx
.run(async move {
planner
.field_columns(database, predicate)
.field_columns(namespace, predicate)
.await
.map_err(|e| e.to_df_error("field_columns"))
})
@ -135,20 +135,20 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::read_filter`], on a separate threadpool
pub async fn read_filter<D>(
pub async fn read_filter<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
) -> Result<SeriesSetPlans>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_filter"));
self.ctx
.run(async move {
planner
.read_filter(database, predicate)
.read_filter(namespace, predicate)
.await
.map_err(|e| e.to_df_error("read_filter"))
})
@ -157,22 +157,22 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::read_group`], on a separate threadpool
pub async fn read_group<D>(
pub async fn read_group<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
agg: Aggregate,
group_columns: Vec<String>,
) -> Result<SeriesSetPlans>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_group"));
self.ctx
.run(async move {
planner
.read_group(database, predicate, agg, &group_columns)
.read_group(namespace, predicate, agg, &group_columns)
.await
.map_err(|e| e.to_df_error("read_group"))
})
@ -181,23 +181,23 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::read_window_aggregate`], on a separate threadpool
pub async fn read_window_aggregate<D>(
pub async fn read_window_aggregate<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
agg: Aggregate,
every: WindowDuration,
offset: WindowDuration,
) -> Result<SeriesSetPlans>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_window_aggregate"));
self.ctx
.run(async move {
planner
.read_window_aggregate(database, predicate, agg, every, offset)
.read_window_aggregate(namespace, predicate, agg, every, offset)
.await
.map_err(|e| e.to_df_error("read_window_aggregate"))
})
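Throughout `Planner`, the actual planning runs inside `self.ctx.run(...)`, keeping CPU-heavy work off the gRPC reactor threads. A rough standalone analogue using tokio's `spawn_blocking` (IOx's `IOxSessionContext::run` dispatches to a dedicated executor instead):

```rust
use tokio::task;

// a rough standalone analogue of the ctx.run(...) pattern, assuming tokio;
// IOxSessionContext::run dispatches to a dedicated DataFusion executor instead
async fn plan_on_worker_pool() -> Result<String, task::JoinError> {
    // keep CPU-heavy planning off the async reactor threads
    task::spawn_blocking(|| {
        // ... build the physical plan ...
        "physical plan".to_string()
    })
    .await
}
```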

View File

@ -8,7 +8,7 @@ use tracker::{
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
};
use crate::QueryDatabaseProvider;
use crate::QueryNamespaceProvider;
#[derive(Debug)]
pub struct TestDatabaseStore {
@ -57,7 +57,7 @@ impl Default for TestDatabaseStore {
}
#[async_trait]
impl QueryDatabaseProvider for TestDatabaseStore {
impl QueryNamespaceProvider for TestDatabaseStore {
type Db = TestDatabase;
/// Retrieve the database with the specified name

View File

@ -14,13 +14,13 @@ use futures::{SinkExt, Stream, StreamExt};
use generated_types::influxdata::iox::querier::v1 as proto;
use iox_query::{
exec::{ExecutionContextProvider, IOxSessionContext},
QueryCompletedToken, QueryDatabase,
QueryCompletedToken, QueryNamespace,
};
use observability_deps::tracing::{debug, info, warn};
use pin_project::{pin_project, pinned_drop};
use prost::Message;
use serde::Deserialize;
use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryDatabaseProvider};
use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider};
use snafu::{ResultExt, Snafu};
use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant};
use tokio::task::JoinHandle;
@ -44,20 +44,20 @@ pub enum Error {
source: serde_json::Error,
},
#[snafu(display("Database {} not found", database_name))]
DatabaseNotFound { database_name: String },
#[snafu(display("Namespace {} not found", namespace_name))]
NamespaceNotFound { namespace_name: String },
#[snafu(display(
"Internal error reading points from database {}: {}",
database_name,
"Internal error reading points from namespace {}: {}",
namespace_name,
source
))]
Query {
database_name: String,
namespace_name: String,
source: DataFusionError,
},
#[snafu(display("Invalid database name: {}", source))]
#[snafu(display("Invalid namespace name: {}", source))]
InvalidNamespaceName { source: NamespaceNameError },
#[snafu(display("Failed to optimize record batch: {}", source))]
@ -81,7 +81,7 @@ impl From<Error> for tonic::Status {
// logging is handled for any new error variants.
let msg = "Error handling Flight gRPC request";
match err {
Error::DatabaseNotFound { .. }
Error::NamespaceNotFound { .. }
| Error::InvalidTicket { .. }
| Error::InvalidTicketLegacy { .. }
| Error::InvalidQuery { .. }
@ -102,7 +102,7 @@ impl Error {
let msg = self.to_string();
let code = match self {
Self::DatabaseNotFound { .. } => tonic::Code::NotFound,
Self::NamespaceNotFound { .. } => tonic::Code::NotFound,
Self::InvalidTicket { .. }
| Self::InvalidTicketLegacy { .. }
| Self::InvalidQuery { .. }
@ -122,7 +122,7 @@ type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send
#[derive(Deserialize, Debug)]
/// Body of the `Ticket` serialized and sent to the do_get endpoint.
struct ReadInfo {
database_name: String,
namespace_name: String,
sql_query: String,
}
@ -141,7 +141,7 @@ impl ReadInfo {
proto::ReadInfo::decode(Bytes::from(ticket.to_vec())).context(InvalidTicketSnafu {})?;
Ok(Self {
database_name: read_info.namespace_name,
namespace_name: read_info.namespace_name,
sql_query: read_info.sql_query,
})
}
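The rename is visible on the wire for the JSON form of the ticket: the key is now `namespace_name`, as the updated test at the bottom of this file exercises. A sketch of hand-building that ticket body, assuming the `serde_json` crate:

```rust
// Sketch: the JSON ticket body. Before this change the key was
// `database_name`; serde derives the new key from the renamed field.
let body = serde_json::json!({
    "namespace_name": "my_db",
    "sql_query": "SELECT 1;",
});
let ticket_bytes = serde_json::to_vec(&body).expect("valid JSON");
// `ticket_bytes` becomes the payload of the Flight `Ticket`.
```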
@ -151,34 +151,34 @@ impl ReadInfo {
#[derive(Debug)]
struct FlightService<S>
where
S: QueryDatabaseProvider,
S: QueryNamespaceProvider,
{
server: Arc<S>,
}
pub fn make_server<S>(server: Arc<S>) -> FlightServer<impl Flight>
where
S: QueryDatabaseProvider,
S: QueryNamespaceProvider,
{
FlightServer::new(FlightService { server })
}
impl<S> FlightService<S>
where
S: QueryDatabaseProvider,
S: QueryNamespaceProvider,
{
async fn run_query(
&self,
span_ctx: Option<SpanContext>,
permit: InstrumentedAsyncOwnedSemaphorePermit,
sql_query: String,
database: String,
namespace: String,
) -> Result<Response<TonicStream<FlightData>>, tonic::Status> {
let db = self
.server
.db(&database, span_ctx.child_span("get namespace"))
.db(&namespace, span_ctx.child_span("get namespace"))
.await
.ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {database}")))?;
.ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {namespace}")))?;
let ctx = db.new_query_context(span_ctx);
let query_completed_token = db.record_query(&ctx, "sql", Box::new(sql_query.clone()));
@ -189,7 +189,7 @@ where
.context(PlanningSnafu)?;
let output =
GetStream::new(ctx, physical_plan, database, query_completed_token, permit).await?;
GetStream::new(ctx, physical_plan, namespace, query_completed_token, permit).await?;
Ok(Response::new(Box::pin(output) as TonicStream<FlightData>))
}
@ -198,7 +198,7 @@ where
#[tonic::async_trait]
impl<S> Flight for FlightService<S>
where
S: QueryDatabaseProvider,
S: QueryNamespaceProvider,
{
type HandshakeStream = TonicStream<HandshakeResponse>;
type ListFlightsStream = TonicStream<FlightInfo>;
@ -231,10 +231,10 @@ where
});
if let Err(e) = &read_info {
info!(%e, "Error decoding database and SQL query name from flight ticket");
info!(%e, "Error decoding namespace and SQL query name from flight ticket");
};
let ReadInfo {
database_name,
namespace_name,
sql_query,
} = read_info?;
@ -245,17 +245,17 @@ where
// Log after we acquire the permit and are about to start execution
let start = Instant::now();
info!(db_name=%database_name, %sql_query, %trace, "Running SQL via flight do_get");
info!(%namespace_name, %sql_query, %trace, "Running SQL via flight do_get");
let response = self
.run_query(span_ctx, permit, sql_query.clone(), database_name.clone())
.run_query(span_ctx, permit, sql_query.clone(), namespace_name.clone())
.await;
if let Err(e) = &response {
info!(db_name=%database_name, %sql_query, %trace, %e, "Error running SQL query");
info!(%namespace_name, %sql_query, %trace, %e, "Error running SQL query");
} else {
let elapsed = Instant::now() - start;
debug!(db_name=%database_name,%sql_query,%trace, ?elapsed, "Completed SQL query successfully");
debug!(%namespace_name,%sql_query,%trace, ?elapsed, "Completed SQL query successfully");
}
response
}
@ -330,7 +330,7 @@ impl GetStream {
async fn new(
ctx: IOxSessionContext,
physical_plan: Arc<dyn ExecutionPlan>,
database_name: String,
namespace_name: String,
mut query_completed_token: QueryCompletedToken,
permit: InstrumentedAsyncOwnedSemaphorePermit,
) -> Result<Self, tonic::Status> {
@ -354,7 +354,7 @@ impl GetStream {
.execute_stream(Arc::clone(&physical_plan))
.await
.context(QuerySnafu {
database_name: &database_name,
namespace_name: &namespace_name,
})?;
let join_handle = tokio::spawn(async move {
@ -399,7 +399,7 @@ impl GetStream {
Err(e) => {
// failure sending here is OK because we're cutting the stream anyways
tx.send(Err(Error::Query {
database_name: database_name.clone(),
namespace_name: namespace_name.clone(),
source: DataFusionError::ArrowError(e),
}
.into()))
@ -495,7 +495,7 @@ mod tests {
server: Arc::clone(&test_storage),
};
let ticket = Ticket {
ticket: br#"{"database_name": "my_db", "sql_query": "SELECT 1;"}"#.to_vec(),
ticket: br#"{"namespace_name": "my_db", "sql_query": "SELECT 1;"}"#.to_vec(),
};
let streaming_resp1 = service
.do_get(tonic::Request::new(ticket.clone()))
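For completeness, the client side of the exchange the test drives, sketched with the `arrow-flight` generated client; the address is an assumption, not part of this diff:

```rust
use arrow_flight::{flight_service_client::FlightServiceClient, Ticket};

// Sketch: send the same JSON ticket to a running querier; the gRPC address
// is assumed, and errors are boxed for brevity.
async fn do_get_example() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = FlightServiceClient::connect("http://localhost:8082").await?;
    let ticket = Ticket {
        ticket: br#"{"namespace_name": "my_db", "sql_query": "SELECT 1;"}"#.to_vec(),
    };
    let mut stream = client.do_get(tonic::Request::new(ticket)).await?.into_inner();
    // The stream yields FlightData frames: a schema message first, then batches.
    while let Some(frame) = stream.message().await? {
        println!("got frame with {} body bytes", frame.data_body.len());
    }
    Ok(())
}
```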

View File

@ -878,7 +878,7 @@ mod tests {
use arrow::datatypes::DataType;
use datafusion_util::lit_dict;
use generated_types::node::Type as RPCNodeType;
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate};
use schema::{Schema, SchemaBuilder};
use std::{collections::BTreeSet, sync::Arc};
@ -895,7 +895,7 @@ mod tests {
}
}
impl QueryDatabaseMeta for Tables {
impl QueryNamespaceMeta for Tables {
fn table_names(&self) -> Vec<String> {
self.table_names.clone()
}
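The trait kept its shape through the rename. For reference, a full impl might look like this sketch, assuming `QueryNamespaceMeta` still requires the old trait's other method, `table_schema`, and that the fixture holds a single schema (both assumptions, not shown in the diff):

```rust
impl QueryNamespaceMeta for Tables {
    fn table_names(&self) -> Vec<String> {
        self.table_names.clone()
    }

    // Hypothetical: assumes a `schema` field serving one schema for every
    // table, which is typical for a small test fixture.
    fn table_schema(&self, _table_name: &str) -> Option<Arc<Schema>> {
        Some(Arc::clone(&self.schema))
    }
}
```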

View File

@ -15,16 +15,16 @@ pub mod input;
pub mod service;
use generated_types::storage_server::{Storage, StorageServer};
use service_common::QueryDatabaseProvider;
use service_common::QueryNamespaceProvider;
use std::sync::Arc;
/// Concrete implementation of the gRPC InfluxDB Storage Service API
#[derive(Debug)]
struct StorageService<T: QueryDatabaseProvider> {
struct StorageService<T: QueryNamespaceProvider> {
pub db_store: Arc<T>,
}
pub fn make_server<T: QueryDatabaseProvider + 'static>(
pub fn make_server<T: QueryNamespaceProvider + 'static>(
db_store: Arc<T>,
) -> StorageServer<impl Storage> {
StorageServer::new(StorageService { db_store })
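Mounting the renamed service is unchanged apart from the bound. A sketch, where `MyProvider` is a hypothetical `QueryNamespaceProvider` implementation and the bind address is assumed:

```rust
use std::sync::Arc;

// Sketch: serve the storage gRPC API over tonic. MyProvider is a
// hypothetical type implementing QueryNamespaceProvider.
async fn serve(provider: Arc<MyProvider>) -> Result<(), Box<dyn std::error::Error>> {
    let svc = make_server(provider);
    tonic::transport::Server::builder()
        .add_service(svc)
        .serve("127.0.0.1:8082".parse()?)
        .await?;
    Ok(())
}
```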

View File

@ -1,5 +1,5 @@
//! This module contains implementations for the storage gRPC service
//! implemented in terms of the [`QueryDatabase`](iox_query::QueryDatabase).
//! implemented in terms of the [`QueryNamespace`](iox_query::QueryNamespace).
use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
use crate::{
@ -30,11 +30,11 @@ use iox_query::{
fieldlist::FieldList, seriesset::converter::Error as SeriesSetError,
ExecutionContextProvider, IOxSessionContext,
},
QueryDatabase, QueryText,
QueryNamespace, QueryText,
};
use observability_deps::tracing::{error, info, trace, warn};
use pin_project::pin_project;
use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryDatabaseProvider};
use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::{BTreeSet, HashMap},
@ -236,11 +236,11 @@ fn add_headers(metadata: &mut MetadataMap) {
metadata.insert("storage-type", "iox".parse().unwrap());
}
/// Implements the protobuf defined Storage service for a [`QueryDatabaseProvider`]
/// Implements the protobuf defined Storage service for a [`QueryNamespaceProvider`]
#[tonic::async_trait]
impl<T> Storage for StorageService<T>
where
T: QueryDatabaseProvider + 'static,
T: QueryNamespaceProvider + 'static,
{
type ReadFilterStream =
StreamWithPermit<futures::stream::Iter<std::vec::IntoIter<Result<ReadResponse, Status>>>>;
@ -1015,15 +1015,15 @@ fn get_namespace_name(input: &impl GrpcInputs) -> Result<NamespaceName<'static>,
/// Gathers all measurement names that have data in the specified
/// (optional) range
async fn measurement_name_impl<D>(
db: Arc<D>,
async fn measurement_name_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
let db_name = db_name.as_str();
@ -1058,8 +1058,8 @@ where
/// Return tag keys with optional measurement, timestamp and arbitrary
/// predicates
async fn tag_keys_impl<D>(
db: Arc<D>,
async fn tag_keys_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
measurement: Option<String>,
range: Option<TimestampRange>,
@ -1067,7 +1067,7 @@ async fn tag_keys_impl<D>(
ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
let db_name = db_name.as_str();
@ -1100,8 +1100,8 @@ where
/// Return tag values for tag_name, with optional measurement, timestamp and
arbitrary predicates
async fn tag_values_impl<D>(
db: Arc<D>,
async fn tag_values_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
tag_name: String,
measurement: Option<String>,
@ -1110,7 +1110,7 @@ async fn tag_values_impl<D>(
ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
@ -1148,14 +1148,14 @@ where
/// Return tag values grouped by one or more measurements with optional
/// filtering predicate and optionally scoped to one or more tag keys.
async fn tag_values_grouped_by_measurement_and_tag_key_impl<D>(
db: Arc<D>,
async fn tag_values_grouped_by_measurement_and_tag_key_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
req: TagValuesGroupedByMeasurementAndTagKeyRequest,
ctx: &IOxSessionContext,
) -> Result<Vec<TagValuesResponse>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
// Extract the tag key predicate.
// See https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-tag-values
@ -1222,14 +1222,14 @@ where
}
/// Launch async tasks that materialise the result of executing read_filter.
async fn read_filter_impl<D>(
db: Arc<D>,
async fn read_filter_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
req: ReadFilterRequest,
ctx: &IOxSessionContext,
) -> Result<Vec<ReadResponse>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let db_name = db_name.as_str();
@ -1267,8 +1267,8 @@ where
}
/// Launch async tasks that send the result of executing read_group to `tx`
async fn query_group_impl<D>(
db: Arc<D>,
async fn query_group_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
@ -1277,7 +1277,7 @@ async fn query_group_impl<D>(
ctx: &IOxSessionContext,
) -> Result<Vec<ReadResponse>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let db_name = db_name.as_str();
@ -1324,8 +1324,8 @@ where
/// Return field names, restricted via optional measurement, timestamp and
/// predicate
async fn field_names_impl<D>(
db: Arc<D>,
async fn field_names_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
measurement: Option<String>,
range: Option<TimestampRange>,
@ -1333,7 +1333,7 @@ async fn field_names_impl<D>(
ctx: &IOxSessionContext,
) -> Result<FieldList>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
@ -1364,14 +1364,14 @@ where
/// Materialises a collection of measurement names. Typically used as part of
/// a plan to scope and group multiple plans by measurement name.
async fn materialise_measurement_names<D>(
db: Arc<D>,
async fn materialise_measurement_names<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
measurement_exprs: Vec<LiteralOrRegex>,
ctx: &IOxSessionContext,
) -> Result<BTreeSet<String>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
use generated_types::{
node::{Comparison, Type, Value},
@ -1442,15 +1442,15 @@ where
///
/// TODO(edd): this might be better represented as a plan against the `columns`
/// system table.
async fn materialise_tag_keys<D>(
db: Arc<D>,
async fn materialise_tag_keys<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
measurement_name: String,
tag_key_predicate: tag_key_predicate::Value,
ctx: &IOxSessionContext,
) -> Result<BTreeSet<String>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
use generated_types::tag_key_predicate::Value;
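All of the helpers in this file now share the same renamed shape, so a new one would be declared the same way. A skeleton under that assumption, with the body deliberately elided:

```rust
// Skeleton only: a hypothetical new helper generic over the renamed trait;
// the real work (building and executing an InfluxRPC plan) is elided.
async fn my_new_impl<N>(
    db: Arc<N>,
    db_name: NamespaceName<'static>,
    ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
    N: QueryNamespace + ExecutionContextProvider + 'static,
{
    let _ = (db, db_name, ctx);
    unimplemented!("sketch: build an InfluxRPC plan against `db`, run it on `ctx`")
}
```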

View File

@ -26,7 +26,7 @@ pub use server_type::{AddAddrEnv, ServerType};
pub use steps::{FCustom, Step, StepTest, StepTestState};
pub use udp_listener::UdpCapture;
/// Return a random string suitable for use as a database name
/// Return a random string suitable for use as a namespace name
pub fn rand_name() -> String {
thread_rng()
.sample_iter(&Alphanumeric)