From bdff4e88487717018490d10feabc8239f6357f94 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 10 Nov 2022 16:05:24 -0500 Subject: [PATCH 1/8] fix: Consistently use 'namespace' instead of 'database' in comments and other internal text --- README.md | 4 ++-- dml/src/lib.rs | 2 +- docs/cli.md | 2 +- docs/env.example | 2 +- docs/local_filesystems.md | 2 +- docs/observability.md | 11 +++++------ docs/sql.md | 2 +- docs/testing.md | 10 +++++----- .../protos/influxdata/iox/querier/v1/flight.proto | 2 +- influxdb_iox/src/commands/namespace/retention.rs | 5 +++-- influxdb_iox/src/commands/write.rs | 2 +- influxdb_iox/src/main.rs | 8 ++++---- influxdb_iox_client/src/client/flight/low_level.rs | 2 +- influxdb_iox_client/src/client/flight/mod.rs | 5 ++--- influxdb_iox_client/src/client/write.rs | 4 ++-- influxdb_storage_client/src/lib.rs | 7 +++---- iox_catalog/src/interface.rs | 2 +- iox_query/src/provider/adapter.rs | 2 +- ioxd_common/src/lib.rs | 5 ++--- ioxd_common/src/server_type/common_state.rs | 2 +- ioxd_router/src/lib.rs | 2 +- parquet_file/src/chunk.rs | 2 +- parquet_file/src/lib.rs | 2 +- querier/src/database.rs | 2 +- querier/src/namespace/mod.rs | 2 +- router/src/namespace_cache/memory.rs | 2 +- router/src/namespace_cache/metrics.rs | 4 ++-- router/src/namespace_cache/sharded_cache.rs | 2 +- router/src/server/http.rs | 4 ++-- schema/src/builder.rs | 2 +- schema/src/lib.rs | 2 +- schema/src/merge.rs | 2 +- test_helpers_end_to_end/src/lib.rs | 2 +- 33 files changed, 54 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 25f1616240..351cc2be04 100644 --- a/README.md +++ b/README.md @@ -235,7 +235,7 @@ See [docs/testing.md] for more information Data can be written to InfluxDB IOx by sending [line protocol] format to the `/api/v2/write` endpoint or using the CLI. -For example, assuming you are running in local mode, this command will send data in the `test_fixtures/lineproto/metrics.lp` file to the `company_sensors` database. +For example, assuming you are running in local mode, this command will send data in the `test_fixtures/lineproto/metrics.lp` file to the `company_sensors` namespace. ```shell ./target/debug/influxdb_iox -vv write company_sensors test_fixtures/lineproto/metrics.lp --host http://localhost:8080 @@ -243,7 +243,7 @@ For example, assuming you are running in local mode, this command will send data Note that `--host http://localhost:8080` is required as the `/v2/api` endpoint is hosted on port `8080` while the default is the querier gRPC port `8082`. -To query the data stored in the `company_sensors` database: +To query the data stored in the `company_sensors` namespace: ```shell ./target/debug/influxdb_iox query company_sensors "SELECT * FROM cpu LIMIT 10" diff --git a/dml/src/lib.rs b/dml/src/lib.rs index 67125ce7bd..c9a2ecf421 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -186,7 +186,7 @@ impl From for DmlOperation { } } -/// A collection of writes to potentially multiple tables within the same database +/// A collection of writes to potentially multiple tables within the same namespace #[derive(Debug, Clone)] pub struct DmlWrite { /// The namespace being written to diff --git a/docs/cli.md b/docs/cli.md index 8207a77655..df5775b66b 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -62,7 +62,7 @@ Connected to IOx Server Set output format format to pretty Ready for commands. (Hint: try 'help;') > use 26f7e5a4b7be365b_917b97a92e883afc; -You are now in remote mode, querying database 26f7e5a4b7be365b_917b97a92e883afc +You are now in remote mode, querying namespace 26f7e5a4b7be365b_917b97a92e883afc 26f7e5a4b7be365b_917b97a92e883afc> select count(*) from cpu; +-----------------+ | COUNT(UInt8(1)) | diff --git a/docs/env.example b/docs/env.example index e95366ce70..e88aa35ba0 100644 --- a/docs/env.example +++ b/docs/env.example @@ -7,7 +7,7 @@ # The full list of available configuration values can be found by in # the command line help (e.g. `env: INFLUXDB_IOX_DB_DIR=`): # -# ./influxdb_iox run database --help +# ./influxdb_iox run --help # # # The identifier for the server. Used for writing to object storage and as diff --git a/docs/local_filesystems.md b/docs/local_filesystems.md index e53eb896ff..e4310c3702 100644 --- a/docs/local_filesystems.md +++ b/docs/local_filesystems.md @@ -18,6 +18,6 @@ The rationale for this assumption is to enable IOx to be operated in a container This configuration is used when running IOx on a "plain old server" operating at the edge as well as for local testing. -In this configuration, IOx assumes the contents of the local file system are preserved at least as long as the life of the IOx Database and that external measures are taken to backup or otherwise manage this. +In this configuration, IOx assumes the contents of the local file system are preserved at least as long as the life of the IOx instance and that external measures are taken to backup or otherwise manage this. In other words, unsurprisingly, when using the local filesystem as object storage, the durability of the data in IOx is tied to the durability of the filesystem. diff --git a/docs/observability.md b/docs/observability.md index 58daa10afa..f06bc61335 100644 --- a/docs/observability.md +++ b/docs/observability.md @@ -2,7 +2,7 @@ ## Background -Observability context is how a component exposes metrics, traces, logs, panics, etc... in a way that places them in the context of the wider system. Most commonly this might be the database name, but might also be the table name, chunk ID, etc... Crucially this information may not be relevant to the component's primary function, e.g. if we never needed to observe `Db` it wouldn't need to know the database name at all, as only `Server` would need to know +Observability context is how a component exposes metrics, traces, logs, panics, etc... in a way that places them in the context of the wider system. Most commonly this might be the namespace name, but might also be the table name, chunk ID, etc... Crucially this information may not be relevant to the component's primary function, e.g. if we never needed to observe `Db` it wouldn't need to know the namespace name at all, as only `Server` would need to know Broadly speaking there are 3 approaches to how to inject this context: @@ -12,14 +12,13 @@ Broadly speaking there are 3 approaches to how to inject this context: Effectively the 3 trade-off between "polluting" the object or the callsites. -To give a concrete example, from a purely logic perspective the catalog does not need to know the database name, only the path -to the object store. However, it is helpful for logs, metrics, etc... to be in terms of database name. +To give a concrete example, from a purely logic perspective the catalog does not need to know the namespace name, only the path to the object store. However, it is helpful for logs, metrics, etc... to be in terms of namespace name. The three approaches would therefore be -1. Inject database name on construction onto the catalog object -2. Inject metrics, logs, etc... wrappers that carry the database name context internally -3. Inject database name at every call site, either explicitly passing it as an argument, or implicitly by wrapping in a span, mapping the returned error, etc... +1. Inject namespace name on construction onto the catalog object +2. Inject metrics, logs, etc... wrappers that carry the namespace name context internally +3. Inject namespace name at every call site, either explicitly passing it as an argument, or implicitly by wrapping in a span, mapping the returned error, etc... ## Outcome diff --git a/docs/sql.md b/docs/sql.md index 977da2f6ec..9c3f07687a 100644 --- a/docs/sql.md +++ b/docs/sql.md @@ -131,7 +131,7 @@ In this section, IOx specific SQL tables, commands, and extensions are documente ## System Tables -In addition to the SQL standard `information_schema`, IOx contains several *system tables* that provide access to IOx specific information. The information in each system table is scoped to that particular database. Cross database queries are not possible due to the design of IOx's security model. +In addition to the SQL standard `information_schema`, IOx contains several *system tables* that provide access to IOx specific information. The information in each system table is scoped to that particular namespace. Cross namespace queries are not possible due to the design of IOx's security model. ### `system.queries` `system.queries` contains information about queries run against this IOx instance diff --git a/docs/testing.md b/docs/testing.md index 44253ff320..34fb7f2a92 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -88,14 +88,14 @@ set. ### Configuration differences when running the tests -When running `influxdb_iox run database`, you can pick one object store to use. When running the tests, -you can run them against all the possible object stores. There's still only one -`INFLUXDB_IOX_BUCKET` variable, though, so that will set the bucket name for all configured object -stores. Use the same bucket name when setting up the different services. +When running `influxdb_iox run`, you can pick one object store to use. When running the tests, you +can run them against all the possible object stores. There's still only one `INFLUXDB_IOX_BUCKET` +variable, though, so that will set the bucket name for all configured object stores. Use the same +bucket name when setting up the different services. Other than possibly configuring multiple object stores, configuring the tests to use the object store services is the same as configuring the server to use an object store service. See the output -of `influxdb_iox run database --help` for instructions. +of `influxdb_iox run --help` for instructions. ## InfluxDB 2 Client diff --git a/generated_types/protos/influxdata/iox/querier/v1/flight.proto b/generated_types/protos/influxdata/iox/querier/v1/flight.proto index 9c2aa93e0d..858f105e5a 100644 --- a/generated_types/protos/influxdata/iox/querier/v1/flight.proto +++ b/generated_types/protos/influxdata/iox/querier/v1/flight.proto @@ -4,7 +4,7 @@ option go_package = "github.com/influxdata/iox/querier/v1"; // Request body for ticket in "end-user to querier" flight requests. message ReadInfo { - // Namespace(/database) name. + // Namespace name. string namespace_name = 1; // SQL query. diff --git a/influxdb_iox/src/commands/namespace/retention.rs b/influxdb_iox/src/commands/namespace/retention.rs index e7b97aa141..68193c7b87 100644 --- a/influxdb_iox/src/commands/namespace/retention.rs +++ b/influxdb_iox/src/commands/namespace/retention.rs @@ -11,14 +11,15 @@ pub enum Error { ClientError(#[from] influxdb_iox_client::error::Error), } -/// Write data into the specified database +/// Update the specified namespace's data retention period #[derive(Debug, clap::Parser)] pub struct Config { /// The namespace to update the retention period for #[clap(action)] namespace: String, - /// Num of hours of the retention period of this namespace. Default is 0 representing infinite retention + /// Num of hours of the retention period of this namespace. Default is 0 representing infinite + /// retention #[clap(action, long, short = 'c', default_value = "0")] retention_hours: u32, } diff --git a/influxdb_iox/src/commands/write.rs b/influxdb_iox/src/commands/write.rs index 21d87f56b7..8af7401d2f 100644 --- a/influxdb_iox/src/commands/write.rs +++ b/influxdb_iox/src/commands/write.rs @@ -47,7 +47,7 @@ pub enum Error { pub type Result = std::result::Result; -/// Write data into the specified database +/// Write data into the specified namespace #[derive(Debug, clap::Parser)] pub struct Config { /// If specified, restricts the maxium amount of line protocol diff --git a/influxdb_iox/src/main.rs b/influxdb_iox/src/main.rs index 7c9679337c..5e817b45b3 100644 --- a/influxdb_iox/src/main.rs +++ b/influxdb_iox/src/main.rs @@ -115,9 +115,9 @@ Command are generally structured in the form: For example, a command such as the following shows all actions - available for database chunks, including get and list. + available for namespaces, including `list` and `retention`. - influxdb_iox database chunk --help + influxdb_iox namespace --help "# )] struct Config { @@ -184,13 +184,13 @@ enum Command { /// Various commands for compactor manipulation Compactor(Box), - /// Interrogate internal database data + /// Interrogate internal data Debug(commands::debug::Config), /// Initiate a read request to the gRPC storage service. Storage(commands::storage::Config), - /// Write data into the specified database + /// Write data into the specified namespace Write(commands::write::Config), /// Query the data with SQL diff --git a/influxdb_iox_client/src/client/flight/low_level.rs b/influxdb_iox_client/src/client/flight/low_level.rs index e7b498090b..be178d8093 100644 --- a/influxdb_iox_client/src/client/flight/low_level.rs +++ b/influxdb_iox_client/src/client/flight/low_level.rs @@ -110,7 +110,7 @@ where } } - /// Query the given database with the given SQL query, and return a [`PerformQuery`] instance + /// Query the given namespace with the given SQL query, and return a [`PerformQuery`] instance /// that streams low-level message results. pub async fn perform_query(&mut self, request: T) -> Result, Error> { PerformQuery::::new(self, request).await diff --git a/influxdb_iox_client/src/client/flight/mod.rs b/influxdb_iox_client/src/client/flight/mod.rs index 3b3e3c6044..0aa9fd18f3 100644 --- a/influxdb_iox_client/src/client/flight/mod.rs +++ b/influxdb_iox_client/src/client/flight/mod.rs @@ -21,8 +21,7 @@ pub use low_level::{Client as LowLevelClient, PerformQuery as LowLevelPerformQue use self::low_level::LowLevelMessage; -/// Error responses when querying an IOx database using the Arrow Flight gRPC -/// API. +/// Error responses when querying an IOx namespace using the Arrow Flight gRPC API. #[derive(Debug, Error)] pub enum Error { /// There were no FlightData messages returned when we expected to get one @@ -124,7 +123,7 @@ impl Client { } } - /// Query the given database with the given SQL query, and return a + /// Query the given namespace with the given SQL query, and return a /// [`PerformQuery`] instance that streams Arrow `RecordBatch` results. pub async fn perform_query(&mut self, request: ReadInfo) -> Result { PerformQuery::new(self, request).await diff --git a/influxdb_iox_client/src/client/write.rs b/influxdb_iox_client/src/client/write.rs index 4771970f11..12af2e037c 100644 --- a/influxdb_iox_client/src/client/write.rs +++ b/influxdb_iox_client/src/client/write.rs @@ -101,7 +101,7 @@ impl Client { /// Write the [LineProtocol] formatted string in `lp_data` to /// namespace `namespace`. /// - /// Returns the number of bytes which were written to the database + /// Returns the number of bytes which were written to the namespace. /// /// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format pub async fn write_lp( @@ -119,7 +119,7 @@ impl Client { /// individual lines (points) do not cross these strings /// /// Returns the number of bytes, in total, which were written to - /// the database + /// the namespace. /// /// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format pub async fn write_lp_stream( diff --git a/influxdb_storage_client/src/lib.rs b/influxdb_storage_client/src/lib.rs index 445622b248..4dc988621c 100644 --- a/influxdb_storage_client/src/lib.rs +++ b/influxdb_storage_client/src/lib.rs @@ -36,10 +36,9 @@ use ::generated_types::google::protobuf::*; use observability_deps::tracing::{debug, trace}; use std::num::NonZeroU64; -/// InfluxDB IOx deals with database names. The gRPC interface deals -/// with org_id and bucket_id represented as 16 digit hex -/// values. This struct manages creating the org_id, bucket_id, -/// and database names to be consistent with the implementation +/// InfluxDB IOx deals with namespace names. The gRPC interface deals with org_id and bucket_id +/// represented as 16 digit hex values. This struct manages creating the org_id, bucket_id, and +/// namespace names to be consistent with the implementation. #[derive(Debug, Clone)] pub struct OrgAndBucket { org_id: NonZeroU64, diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index e81ba8bd14..d42eeef469 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -416,7 +416,7 @@ pub trait ShardRepo: Send + Sync { } /// Functions for working with IOx partitions in the catalog. Note that these are how IOx splits up -/// data within a database, which is different than Kafka partitions. +/// data within a namespace, which is different than Kafka partitions. #[async_trait] pub trait PartitionRepo: Send + Sync { /// create or get a partition record for the given partition key, shard and table diff --git a/iox_query/src/provider/adapter.rs b/iox_query/src/provider/adapter.rs index cf143dcb57..6dab683085 100644 --- a/iox_query/src/provider/adapter.rs +++ b/iox_query/src/provider/adapter.rs @@ -15,7 +15,7 @@ use datafusion::physical_plan::{ }; use futures::Stream; -/// Database schema creation / validation errors. +/// Schema creation / validation errors. #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] pub enum Error { diff --git a/ioxd_common/src/lib.rs b/ioxd_common/src/lib.rs index a25307273a..84b7c03fe7 100644 --- a/ioxd_common/src/lib.rs +++ b/ioxd_common/src/lib.rs @@ -100,9 +100,8 @@ pub async fn http_listener(addr: SocketAddr) -> Result { Ok(listener) } -/// Instantiates the gRPC and optional HTTP listeners and returns a -/// Future that completes when these listeners, the Server, Databases, -/// etc... have all exited or the frontend_shutdown token is called. +/// Instantiates the gRPC and optional HTTP listeners and returns a `Future` that completes when +/// the listeners have all exited or the `frontend_shutdown` token is called. pub async fn serve( common_state: CommonServerState, frontend_shutdown: CancellationToken, diff --git a/ioxd_common/src/server_type/common_state.rs b/ioxd_common/src/server_type/common_state.rs index cd5c8d446e..f9129d2c16 100644 --- a/ioxd_common/src/server_type/common_state.rs +++ b/ioxd_common/src/server_type/common_state.rs @@ -11,7 +11,7 @@ pub enum CommonServerStateError { Tracing { source: trace_exporters::Error }, } -/// Common state used by all server types (e.g. `Database` and `Router`) +/// Common state used by all server types #[derive(Debug, Clone)] pub struct CommonServerState { run_config: RunConfig, diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index 4d64cdaead..bb710b8680 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -387,7 +387,7 @@ where .await? .for_each(|(ns, schema)| { let name = NamespaceName::try_from(ns.name) - .expect("cannot convert existing namespace name to database name"); + .expect("cannot convert existing namespace string to a `NamespaceName` instance"); cache.put_schema(name, schema); }); diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 9a7ac76efd..d125f09ef2 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -20,7 +20,7 @@ pub struct ParquetChunk { /// Schema that goes with this table's parquet file schema: Arc, - /// Persists the parquet file within a database's relative path + /// Persists the parquet file within a namespace's relative path store: ParquetStorage, } diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index 427f40b26a..b35d705687 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -24,7 +24,7 @@ use data_types::{NamespaceId, ParquetFile, PartitionId, ShardId, TableId}; use object_store::path::Path; use uuid::Uuid; -/// Location of a Parquet file within a database's object store. +/// Location of a Parquet file within a namespace's object store. /// The exact format is an implementation detail and is subject to change. #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)] pub struct ParquetFilePath { diff --git a/querier/src/database.rs b/querier/src/database.rs index df269114f5..38185c0193 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -65,7 +65,7 @@ pub struct QuerierDatabase { /// /// This should be a 1-to-1 relation to the number of active queries. /// - /// If the same database is requested twice for different queries, it is counted twice. + /// If the same namespace is requested twice for different queries, it is counted twice. query_execution_semaphore: Arc, /// Sharder to determine which ingesters to query for a particular table and namespace. diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index 4580b8a2b9..66081e82a2 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -1,4 +1,4 @@ -//! Namespace within the whole database. +//! Namespace within the whole catalog. use crate::{ cache::{namespace::CachedNamespace, CatalogCache}, diff --git a/router/src/namespace_cache/memory.rs b/router/src/namespace_cache/memory.rs index 04390a1b29..db1c66a39b 100644 --- a/router/src/namespace_cache/memory.rs +++ b/router/src/namespace_cache/memory.rs @@ -35,7 +35,7 @@ mod tests { #[test] fn test_put_get() { - let ns = NamespaceName::new("test").expect("database name is valid"); + let ns = NamespaceName::new("test").expect("namespace name is valid"); let cache = Arc::new(MemoryNamespaceCache::default()); assert!(cache.get_schema(&ns).is_none()); diff --git a/router/src/namespace_cache/metrics.rs b/router/src/namespace_cache/metrics.rs index 5c04267e56..40bdcf662b 100644 --- a/router/src/namespace_cache/metrics.rs +++ b/router/src/namespace_cache/metrics.rs @@ -224,7 +224,7 @@ mod tests { #[test] fn test_put() { - let ns = NamespaceName::new("test").expect("database name is valid"); + let ns = NamespaceName::new("test").expect("namespace name is valid"); let registry = metric::Registry::default(); let cache = Arc::new(MemoryNamespaceCache::default()); let cache = Arc::new(InstrumentedCache::new(cache, ®istry)); @@ -356,7 +356,7 @@ mod tests { assert_eq!(cache.column_count.observe(), Observation::U64Gauge(11)); // Add a new namespace - let ns = NamespaceName::new("another").expect("database name is valid"); + let ns = NamespaceName::new("another").expect("namespace name is valid"); let schema = new_schema(&[10, 12, 9]); assert!(cache.put_schema(ns.clone(), schema).is_none()); assert_histogram_hit( diff --git a/router/src/namespace_cache/sharded_cache.rs b/router/src/namespace_cache/sharded_cache.rs index 4811221dc8..a94e48ed94 100644 --- a/router/src/namespace_cache/sharded_cache.rs +++ b/router/src/namespace_cache/sharded_cache.rs @@ -55,7 +55,7 @@ mod tests { .map(char::from) .collect::() .try_into() - .expect("generated invalid random database name") + .expect("generated invalid random namespace name") } fn schema_with_id(id: i64) -> NamespaceSchema { diff --git a/router/src/server/http.rs b/router/src/server/http.rs index 79dac16ad9..b040afd754 100644 --- a/router/src/server/http.rs +++ b/router/src/server/http.rs @@ -145,7 +145,7 @@ impl From<&DmlError> for StatusCode { } /// Errors returned when decoding the organisation / bucket information from a -/// HTTP request and deriving the database name from it. +/// HTTP request and deriving the namespace name from it. #[derive(Debug, Error)] pub enum OrgBucketError { /// The request contains no org/bucket destination information. @@ -156,7 +156,7 @@ pub enum OrgBucketError { #[error("failed to deserialize org/bucket/precision in request: {0}")] DecodeFail(#[from] serde::de::value::Error), - /// The provided org/bucket could not be converted into a database name. + /// The provided org/bucket could not be converted into a namespace name. #[error(transparent)] MappingFail(#[from] OrgBucketMappingError), } diff --git a/schema/src/builder.rs b/schema/src/builder.rs index 05704b6433..8a652c9d57 100644 --- a/schema/src/builder.rs +++ b/schema/src/builder.rs @@ -5,7 +5,7 @@ use snafu::{ResultExt, Snafu}; use super::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME}; -/// Database schema creation / validation errors. +/// Namespace schema creation / validation errors. #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Error validating schema: {}", source))] diff --git a/schema/src/lib.rs b/schema/src/lib.rs index d03db6e331..1f92ba3bdc 100644 --- a/schema/src/lib.rs +++ b/schema/src/lib.rs @@ -46,7 +46,7 @@ pub mod sort; pub use builder::SchemaBuilder; pub use projection::Projection; -/// Database schema creation / validation errors. +/// Namespace schema creation / validation errors. #[derive(Debug, Snafu)] pub enum Error { #[snafu(display( diff --git a/schema/src/merge.rs b/schema/src/merge.rs index 147eee2490..8b718da069 100644 --- a/schema/src/merge.rs +++ b/schema/src/merge.rs @@ -9,7 +9,7 @@ use crate::interner::SchemaInterner; use super::{InfluxColumnType, Schema}; -/// Database schema creation / validation errors. +/// Namespace schema creation / validation errors. #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("No schemas found when building merged schema"))] diff --git a/test_helpers_end_to_end/src/lib.rs b/test_helpers_end_to_end/src/lib.rs index 180ffe9c7b..ca2ef263d4 100644 --- a/test_helpers_end_to_end/src/lib.rs +++ b/test_helpers_end_to_end/src/lib.rs @@ -26,7 +26,7 @@ pub use server_type::{AddAddrEnv, ServerType}; pub use steps::{FCustom, Step, StepTest, StepTestState}; pub use udp_listener::UdpCapture; -/// Return a random string suitable for use as a database name +/// Return a random string suitable for use as a namespace name pub fn rand_name() -> String { thread_rng() .sample_iter(&Alphanumeric) From d965004e52a871a91c30e633351c331a55266891 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 10 Nov 2022 16:06:30 -0500 Subject: [PATCH 2/8] fix: Rename DmlError::DatabaseNotFound to NamespaceNotFound --- router/src/dml_handlers/instrumentation.rs | 6 +++--- router/src/dml_handlers/trait.rs | 6 +++--- router/src/server/http.rs | 14 +++++++------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/router/src/dml_handlers/instrumentation.rs b/router/src/dml_handlers/instrumentation.rs index e79fdd70e5..be513d094a 100644 --- a/router/src/dml_handlers/instrumentation.rs +++ b/router/src/dml_handlers/instrumentation.rs @@ -220,7 +220,7 @@ mod tests { let ns = "platanos".try_into().unwrap(); let handler = Arc::new( MockDmlHandler::default() - .with_write_return([Err(DmlError::DatabaseNotFound("nope".to_owned()))]), + .with_write_return([Err(DmlError::NamespaceNotFound("nope".to_owned()))]), ); let metrics = Arc::new(metric::Registry::default()); @@ -234,7 +234,7 @@ mod tests { .await .expect_err("inner handler configured to fail"); - assert_matches!(err, DmlError::DatabaseNotFound(_)); + assert_matches!(err, DmlError::NamespaceNotFound(_)); assert_metric_hit(&metrics, "dml_handler_write_duration", "error"); assert_trace(traces, SpanStatus::Err); @@ -270,7 +270,7 @@ mod tests { let ns = "platanos".try_into().unwrap(); let handler = Arc::new( MockDmlHandler::<()>::default() - .with_delete_return([Err(DmlError::DatabaseNotFound("nope".to_owned()))]), + .with_delete_return([Err(DmlError::NamespaceNotFound("nope".to_owned()))]), ); let metrics = Arc::new(metric::Registry::default()); diff --git a/router/src/dml_handlers/trait.rs b/router/src/dml_handlers/trait.rs index e264ce7b04..626d22cbbd 100644 --- a/router/src/dml_handlers/trait.rs +++ b/router/src/dml_handlers/trait.rs @@ -11,9 +11,9 @@ use super::{partitioner::PartitionError, SchemaError, ShardError}; /// processing. #[derive(Debug, Error)] pub enum DmlError { - /// The database specified by the caller does not exist. - #[error("database {0} does not exist")] - DatabaseNotFound(String), + /// The namespace specified by the caller does not exist. + #[error("namespace {0} does not exist")] + NamespaceNotFound(String), /// An error sharding the writes and pushing them to the write buffer. #[error(transparent)] diff --git a/router/src/server/http.rs b/router/src/server/http.rs index b040afd754..996f7a6c4c 100644 --- a/router/src/server/http.rs +++ b/router/src/server/http.rs @@ -121,7 +121,7 @@ impl Error { impl From<&DmlError> for StatusCode { fn from(e: &DmlError) -> Self { match e { - DmlError::DatabaseNotFound(_) => StatusCode::NOT_FOUND, + DmlError::NamespaceNotFound(_) => StatusCode::NOT_FOUND, // Schema validation error cases DmlError::Schema(SchemaError::NamespaceLookup(_)) => { @@ -921,8 +921,8 @@ mod tests { db_not_found, query_string = "?org=bananas&bucket=test", body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), - dml_handler = [Err(DmlError::DatabaseNotFound("bananas_test".to_string()))], - want_result = Err(Error::DmlHandler(DmlError::DatabaseNotFound(_))), + dml_handler = [Err(DmlError::NamespaceNotFound("bananas_test".to_string()))], + want_result = Err(Error::DmlHandler(DmlError::NamespaceNotFound(_))), want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => { assert_eq!(namespace, "bananas_test"); } @@ -1038,8 +1038,8 @@ mod tests { db_not_found, query_string = "?org=bananas&bucket=test", body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=its_a_table and location=Boston"}"#.as_bytes(), - dml_handler = [Err(DmlError::DatabaseNotFound("bananas_test".to_string()))], - want_result = Err(Error::DmlHandler(DmlError::DatabaseNotFound(_))), + dml_handler = [Err(DmlError::NamespaceNotFound("bananas_test".to_string()))], + want_result = Err(Error::DmlHandler(DmlError::NamespaceNotFound(_))), want_dml_calls = [MockDmlHandlerCall::Delete{namespace, namespace_id, table, predicate}] => { assert_eq!(table, "its_a_table"); assert_eq!(namespace, "bananas_test"); @@ -1487,8 +1487,8 @@ mod tests { ), ( - DmlHandler(DmlError::DatabaseNotFound("[database name]".into())), - "dml handler error: database [database name] does not exist", + DmlHandler(DmlError::NamespaceNotFound("[namespace name]".into())), + "dml handler error: namespace [namespace name] does not exist", ), ( From 621560a0dcb2519afbbbeb49a55d6bf4b903f885 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 10 Nov 2022 17:15:38 -0500 Subject: [PATCH 3/8] fix: Rename QueryDatabaseMeta to QueryNamespaceMeta --- iox_query/src/frontend/influxrpc.rs | 2 +- iox_query/src/lib.rs | 8 ++++---- iox_query/src/test.rs | 6 +++--- predicate/src/rpc_predicate.rs | 6 +++--- querier/src/namespace/query_access.rs | 6 +++--- service_grpc_influxrpc/src/expr.rs | 4 ++-- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index f82331ba55..07a68c2714 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -1927,7 +1927,7 @@ mod tests { use datafusion::prelude::{col, lit, lit_timestamp_nano}; use datafusion_util::lit_dict; use futures::{future::BoxFuture, FutureExt}; - use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; + use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate}; use crate::{ exec::{ExecutionContextProvider, Executor}, diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index 37784ba289..b0341680d3 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -18,7 +18,7 @@ use exec::{stringset::StringSet, IOxSessionContext}; use hashbrown::HashMap; use observability_deps::tracing::{debug, trace}; use parquet_file::storage::ParquetExecInput; -use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate, PredicateMatch}; +use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate, PredicateMatch}; use schema::{ sort::{SortKey, SortKeyBuilder}, Projection, Schema, TIME_COLUMN_NAME, @@ -142,7 +142,7 @@ pub type QueryText = Box; /// Databases store data organized by partitions and each partition stores /// data in Chunks. #[async_trait] -pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync { +pub trait QueryDatabase: QueryNamespaceMeta + Debug + Send + Sync { /// Returns a set of chunks within the partition with data that may match /// the provided predicate. /// @@ -168,10 +168,10 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync { query_text: QueryText, ) -> QueryCompletedToken; - /// Upcast to [`QueryDatabaseMeta`]. + /// Upcast to [`QueryNamespaceMeta`]. /// /// This is required until is fixed. - fn as_meta(&self) -> &dyn QueryDatabaseMeta; + fn as_meta(&self) -> &dyn QueryNamespaceMeta; } /// Raw data of a [`QueryChunk`]. diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index 82e18b01b9..6b578775cb 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -27,7 +27,7 @@ use datafusion::error::DataFusionError; use hashbrown::HashSet; use observability_deps::tracing::debug; use parking_lot::Mutex; -use predicate::rpc_predicate::QueryDatabaseMeta; +use predicate::rpc_predicate::QueryNamespaceMeta; use schema::{ builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Projection, Schema, TIME_COLUMN_NAME, @@ -137,12 +137,12 @@ impl QueryDatabase for TestDatabase { QueryCompletedToken::new(|_| {}) } - fn as_meta(&self) -> &dyn QueryDatabaseMeta { + fn as_meta(&self) -> &dyn QueryNamespaceMeta { self } } -impl QueryDatabaseMeta for TestDatabase { +impl QueryNamespaceMeta for TestDatabase { fn table_schema(&self, table_name: &str) -> Option> { let mut merger = SchemaMerger::new(); let mut found_one = false; diff --git a/predicate/src/rpc_predicate.rs b/predicate/src/rpc_predicate.rs index f78c28e13b..d3087c3359 100644 --- a/predicate/src/rpc_predicate.rs +++ b/predicate/src/rpc_predicate.rs @@ -108,7 +108,7 @@ impl InfluxRpcPredicate { /// Returns a list of (TableName, [`Predicate`]) pub fn table_predicates( &self, - table_info: &dyn QueryDatabaseMeta, + table_info: &dyn QueryNamespaceMeta, ) -> DataFusionResult, Predicate)>> { let table_names = match &self.table_names { Some(table_names) => itertools::Either::Left(table_names.iter().cloned()), @@ -146,8 +146,8 @@ impl InfluxRpcPredicate { } /// Information required to normalize predicates -pub trait QueryDatabaseMeta { - /// Returns a list of table names in this DB +pub trait QueryNamespaceMeta { + /// Returns a list of table names in this namespace fn table_names(&self) -> Vec; /// Schema for a specific table if the table exists. diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index 7bb4ab443f..855bb5bbc7 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -19,12 +19,12 @@ use iox_query::{ QueryChunk, QueryCompletedToken, QueryDatabase, QueryText, }; use observability_deps::tracing::{debug, trace}; -use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; +use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate}; use schema::Schema; use std::{any::Any, collections::HashMap, sync::Arc}; use trace::ctx::SpanContext; -impl QueryDatabaseMeta for QuerierNamespace { +impl QueryNamespaceMeta for QuerierNamespace { fn table_names(&self) -> Vec { let mut names: Vec<_> = self.tables.keys().map(|s| s.to_string()).collect(); names.sort(); @@ -94,7 +94,7 @@ impl QueryDatabase for QuerierNamespace { QueryCompletedToken::new(move |success| query_log.set_completed(entry, success)) } - fn as_meta(&self) -> &dyn QueryDatabaseMeta { + fn as_meta(&self) -> &dyn QueryNamespaceMeta { self } } diff --git a/service_grpc_influxrpc/src/expr.rs b/service_grpc_influxrpc/src/expr.rs index 7de386f7d9..47b500a42c 100644 --- a/service_grpc_influxrpc/src/expr.rs +++ b/service_grpc_influxrpc/src/expr.rs @@ -878,7 +878,7 @@ mod tests { use arrow::datatypes::DataType; use datafusion_util::lit_dict; use generated_types::node::Type as RPCNodeType; - use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; + use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate}; use schema::{Schema, SchemaBuilder}; use std::{collections::BTreeSet, sync::Arc}; @@ -895,7 +895,7 @@ mod tests { } } - impl QueryDatabaseMeta for Tables { + impl QueryNamespaceMeta for Tables { fn table_names(&self) -> Vec { self.table_names.clone() } From 0657ad9600a9a58237b7fad769aa8196715d766b Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 11 Nov 2022 14:20:31 -0500 Subject: [PATCH 4/8] fix: Rename QueryDatabase to QueryNamespace --- iox_query/src/frontend/influxrpc.rs | 105 +++++++++--------- iox_query/src/lib.rs | 24 ++-- iox_query/src/test.rs | 7 +- querier/src/namespace/query_access.rs | 4 +- query_tests/src/db.rs | 10 +- query_tests/src/influxrpc/field_columns.rs | 2 +- query_tests/src/influxrpc/read_filter.rs | 2 +- query_tests/src/influxrpc/read_group.rs | 2 +- .../src/influxrpc/read_window_aggregate.rs | 2 +- query_tests/src/influxrpc/table_names.rs | 2 +- query_tests/src/influxrpc/tag_keys.rs | 2 +- query_tests/src/influxrpc/tag_values.rs | 4 +- service_common/src/lib.rs | 14 +-- service_common/src/planner.rs | 60 +++++----- service_grpc_flight/src/lib.rs | 2 +- service_grpc_influxrpc/src/service.rs | 58 +++++----- 16 files changed, 147 insertions(+), 153 deletions(-) diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index 07a68c2714..26dfda0ff2 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -11,7 +11,7 @@ use crate::{ seriesset::{SeriesSetPlan, SeriesSetPlans}, stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder}, }, - QueryChunk, QueryDatabase, + QueryChunk, QueryNamespace, }; use arrow::datatypes::DataType; use data_types::ChunkId; @@ -226,7 +226,7 @@ impl InfluxRpcPlanner { /// . chunks without deleted data but cannot be decided from meta data pub async fn table_names( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, ) -> Result { let ctx = self.ctx.child_ctx("table_names planning"); @@ -236,10 +236,10 @@ impl InfluxRpcPlanner { let rpc_predicate = rpc_predicate.clear_timestamp_if_max_range(); let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let tables: Vec<_> = - table_chunk_stream(Arc::clone(&database), false, &table_predicates, &ctx) + table_chunk_stream(Arc::clone(&namespace), false, &table_predicates, &ctx) .try_filter_map(|(table_name, predicate, chunks)| async move { // Identify which chunks can answer from its metadata and then record its table, // and which chunks needs full plan and group them into their table @@ -291,7 +291,7 @@ impl InfluxRpcPlanner { builder.append_string(table_name.to_string()); } Some((predicate, chunks)) => { - let schema = database + let schema = namespace .table_schema(table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -315,13 +315,12 @@ impl InfluxRpcPlanner { builder.build().context(CreatingStringSetSnafu) } - /// Returns a set of plans that produces the names of "tag" - /// columns (as defined in the InfluxDB Data model) names in this - /// database that have more than zero rows which pass the - /// conditions specified by `predicate`. + /// Returns a set of plans that produces the names of "tag" columns (as defined in the InfluxDB + /// data model) names in this namespace that have more than zero rows which pass the conditions + /// specified by `predicate`. pub async fn tag_keys( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, ) -> Result { let ctx = self.ctx.child_ctx("tag_keys planning"); @@ -334,11 +333,11 @@ impl InfluxRpcPlanner { // // 1. Find all the potential tables in the chunks // - // 2. For each table/chunk pair, figure out which can be found - // from only metadata and which need full plans + // 2. For each table/chunk pair, figure out which can be found from only metadata and which + // need full plans let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let mut table_predicates_need_chunks = vec![]; @@ -348,7 +347,7 @@ impl InfluxRpcPlanner { // special case - return the columns from metadata only. // Note that columns with all rows deleted will still show here builder = builder.append_other( - database + namespace .table_schema(&table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -364,7 +363,7 @@ impl InfluxRpcPlanner { } let tables: Vec<_> = table_chunk_stream( - Arc::clone(&database), + Arc::clone(&namespace), false, &table_predicates_need_chunks, &ctx, @@ -458,7 +457,7 @@ impl InfluxRpcPlanner { // out chunks (and tables) where all columns in that chunk // were already known to have data (based on the contents of known_columns) - let schema = database + let schema = namespace .table_schema(table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -478,12 +477,11 @@ impl InfluxRpcPlanner { builder.build().context(CreatingStringSetSnafu) } - /// Returns a plan which finds the distinct, non-null tag values - /// in the specified `tag_name` column of this database which pass - /// the conditions specified by `predicate`. + /// Returns a plan which finds the distinct, non-null tag values in the specified `tag_name` + /// column of this namespace which pass the conditions specified by `predicate`. pub async fn tag_values( &self, - database: Arc, + namespace: Arc, tag_name: &str, rpc_predicate: InfluxRpcPredicate, ) -> Result { @@ -499,14 +497,14 @@ impl InfluxRpcPlanner { // which need full plans let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; - // filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk scan (which includes - // ingester RPC) + // filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk + // scan (which includes ingester RPC) let mut table_predicates_filtered = Vec::with_capacity(table_predicates.len()); for (table_name, predicate) in table_predicates { - let schema = database + let schema = namespace .table_schema(&table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -521,7 +519,7 @@ impl InfluxRpcPlanner { } let tables: Vec<_> = table_chunk_stream( - Arc::clone(&database), + Arc::clone(&namespace), false, &table_predicates_filtered, &ctx, @@ -546,8 +544,8 @@ impl InfluxRpcPlanner { let schema = chunk.schema(); // Skip this table if the tag_name is not a column in this chunk - // Note: This may happen even when the table contains the tag_name, because some chunks may not - // contain all columns. + // Note: This may happen even when the table contains the tag_name, because some + // chunks may not contain all columns. let idx = if let Some(idx) = schema.find_index_of(tag_name) { idx } else { @@ -571,8 +569,8 @@ impl InfluxRpcPlanner { } ); - // If there are delete predicates, we need to scan (or do full plan) the data to eliminate - // deleted data before getting tag values + // If there are delete predicates, we need to scan (or do full plan) the data to + // eliminate deleted data before getting tag values if chunk.has_delete_predicates() { debug!( %table_name, @@ -628,7 +626,7 @@ impl InfluxRpcPlanner { builder = builder.append_other(known_values.into()); if !chunks_full.is_empty() { - let schema = database + let schema = namespace .table_schema(table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -670,13 +668,12 @@ impl InfluxRpcPlanner { builder.build().context(CreatingStringSetSnafu) } - /// Returns a plan that produces a list of columns and their - /// datatypes (as defined in the data written via `write_lines`), - /// and which have more than zero rows which pass the conditions + /// Returns a plan that produces a list of columns and their datatypes (as defined in the data + /// written via `write_lines`), and which have more than zero rows which pass the conditions /// specified by `predicate`. pub async fn field_columns( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, ) -> Result { let ctx = self.ctx.child_ctx("field_columns planning"); @@ -692,7 +689,7 @@ impl InfluxRpcPlanner { // values and stops the plan executing once it has them let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; // optimization: just get the field columns from metadata. @@ -701,7 +698,7 @@ impl InfluxRpcPlanner { let mut table_predicates_need_chunks = Vec::with_capacity(table_predicates.len()); for (table_name, predicate) in table_predicates { if predicate.is_empty() { - let schema = database + let schema = namespace .table_schema(&table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -721,7 +718,7 @@ impl InfluxRpcPlanner { // full scans let plans = create_plans( - database, + namespace, &table_predicates_need_chunks, ctx, |ctx, table_name, predicate, chunks, schema| { @@ -762,18 +759,18 @@ impl InfluxRpcPlanner { /// same) occur together in the plan pub async fn read_filter( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, ) -> Result { let ctx = self.ctx.child_ctx("planning_read_filter"); debug!(?rpc_predicate, "planning read_filter"); let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let plans = create_plans( - database, + namespace, &table_predicates, ctx, |ctx, table_name, predicate, chunks, schema| { @@ -813,7 +810,7 @@ impl InfluxRpcPlanner { /// (apply filters) pub async fn read_group( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, agg: Aggregate, group_columns: &[impl AsRef + Send + Sync], @@ -822,11 +819,11 @@ impl InfluxRpcPlanner { debug!(?rpc_predicate, ?agg, "planning read_group"); let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let plans = create_plans( - database, + namespace, &table_predicates, ctx, |ctx, table_name, predicate, chunks, schema| match agg { @@ -863,7 +860,7 @@ impl InfluxRpcPlanner { /// that are grouped by window definitions pub async fn read_window_aggregate( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, agg: Aggregate, every: WindowDuration, @@ -879,11 +876,11 @@ impl InfluxRpcPlanner { ); let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let plans = create_plans( - database, + namespace, &table_predicates, ctx, |ctx, table_name, predicate, chunks, schema| { @@ -1375,7 +1372,7 @@ impl InfluxRpcPlanner { /// This function is indirectly invoked by `field_columns`, `read_filter`, `read_group` and `read_window_aggregate` /// through the function `create_plans` where need_fields should be true. fn table_chunk_stream<'a>( - database: Arc, + namespace: Arc, need_fields: bool, table_predicates: &'a [(Arc, Predicate)], ctx: &'a IOxSessionContext, @@ -1385,9 +1382,9 @@ fn table_chunk_stream<'a>( let mut ctx = ctx.child_ctx("table"); ctx.set_metadata("table", table_name.to_string()); - let database = Arc::clone(&database); + let namespace = Arc::clone(&namespace); - let table_schema = database.table_schema(table_name); + let table_schema = namespace.table_schema(table_name); let projection = match table_schema { Some(table_schema) => { columns_in_predicates(need_fields, table_schema, table_name, predicate) @@ -1396,7 +1393,7 @@ fn table_chunk_stream<'a>( }; async move { - let chunks = database + let chunks = namespace .chunks( table_name, predicate, @@ -1507,7 +1504,7 @@ fn columns_in_predicates( /// `f(ctx, table_name, table_predicate, chunks, table_schema)` is /// invoked on the chunks for each table to produce a plan for each async fn create_plans( - database: Arc, + namespace: Arc, table_predicates: &[(Arc, Predicate)], ctx: IOxSessionContext, f: F, @@ -1525,7 +1522,7 @@ where + Sync, P: Send, { - table_chunk_stream(Arc::clone(&database), true, table_predicates, &ctx) + table_chunk_stream(Arc::clone(&namespace), true, table_predicates, &ctx) .and_then(|(table_name, predicate, chunks)| async move { let chunks = prune_chunks_metadata(chunks, predicate)?; Ok((table_name, predicate, chunks)) @@ -1540,11 +1537,11 @@ where let mut ctx = ctx.child_ctx("table"); ctx.set_metadata("table", table_name.to_string()); - let database = Arc::clone(&database); + let namespace = Arc::clone(&namespace); let f = f.clone(); async move { - let schema = database + let schema = namespace .table_schema(table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index b0341680d3..8570ab5b8b 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -89,7 +89,7 @@ pub trait QueryChunkMeta { } /// A `QueryCompletedToken` is returned by `record_query` implementations of -/// a `QueryDatabase`. It is used to trigger side-effects (such as query timing) +/// a `QueryNamespace`. It is used to trigger side-effects (such as query timing) /// on query completion. /// pub struct QueryCompletedToken { @@ -136,22 +136,20 @@ impl Drop for QueryCompletedToken { /// This avoids storing potentially large strings pub type QueryText = Box; -/// A `Database` is the main trait implemented by the IOx subsystems -/// that store actual data. +/// `QueryNamespace` is the main trait implemented by the IOx subsystems that store actual data. /// -/// Databases store data organized by partitions and each partition stores -/// data in Chunks. +/// Namespaces store data organized by partitions and each partition stores data in Chunks. #[async_trait] -pub trait QueryDatabase: QueryNamespaceMeta + Debug + Send + Sync { - /// Returns a set of chunks within the partition with data that may match - /// the provided predicate. +pub trait QueryNamespace: QueryNamespaceMeta + Debug + Send + Sync { + /// Returns a set of chunks within the partition with data that may match the provided + /// predicate. /// - /// If possible, chunks which have no rows that can - /// possibly match the predicate may be omitted. + /// If possible, chunks which have no rows that can possibly match the predicate may be omitted. /// - /// If projection is None, returned chunks will include all columns of its original data. Otherwise, - /// returned chunks will include PK columns (tags and time) and columns specified in the projection. Projecting - /// chunks here is optional and a mere optimization. The query subsystem does NOT rely on it. + /// If projection is `None`, returned chunks will include all columns of its original data. + /// Otherwise, returned chunks will include PK columns (tags and time) and columns specified in + /// the projection. Projecting chunks here is optional and a mere optimization. The query + /// subsystem does NOT rely on it. async fn chunks( &self, table_name: &str, diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index 6b578775cb..6ecdc32291 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -1,5 +1,4 @@ -//! This module provides a reference implementation of -//! [`QueryDatabase`] for use in testing. +//! This module provides a reference implementation of [`QueryNamespace`] for use in testing. //! //! AKA it is a Mock @@ -9,7 +8,7 @@ use crate::{ ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext, }, Predicate, PredicateMatch, QueryChunk, QueryChunkData, QueryChunkMeta, QueryCompletedToken, - QueryDatabase, QueryText, + QueryNamespace, QueryText, }; use arrow::{ array::{ @@ -100,7 +99,7 @@ impl TestDatabase { } #[async_trait] -impl QueryDatabase for TestDatabase { +impl QueryNamespace for TestDatabase { async fn chunks( &self, table_name: &str, diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index 855bb5bbc7..1d6df8fe5c 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -16,7 +16,7 @@ use datafusion::{ use datafusion_util::config::DEFAULT_SCHEMA; use iox_query::{ exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext}, - QueryChunk, QueryCompletedToken, QueryDatabase, QueryText, + QueryChunk, QueryCompletedToken, QueryNamespace, QueryText, }; use observability_deps::tracing::{debug, trace}; use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate}; @@ -37,7 +37,7 @@ impl QueryNamespaceMeta for QuerierNamespace { } #[async_trait] -impl QueryDatabase for QuerierNamespace { +impl QueryNamespace for QuerierNamespace { async fn chunks( &self, table_name: &str, diff --git a/query_tests/src/db.rs b/query_tests/src/db.rs index c6d4362e0a..d2d7245892 100644 --- a/query_tests/src/db.rs +++ b/query_tests/src/db.rs @@ -1,11 +1,11 @@ use std::{any::Any, sync::Arc}; use datafusion::catalog::catalog::CatalogProvider; -use iox_query::{exec::ExecutionContextProvider, QueryDatabase}; +use iox_query::{exec::ExecutionContextProvider, QueryNamespace}; use querier::QuerierNamespace; /// Abstract database used during testing. -pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase { +pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryNamespace { fn as_any_arc(self: Arc) -> Arc; /// Upcast to [`CatalogProvider`]. @@ -13,10 +13,10 @@ pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase /// This is required due to . fn as_catalog_provider_arc(self: Arc) -> Arc; - /// Upcast to [`QueryDatabase`]. + /// Upcast to [`QueryNamespace`]. /// /// This is required due to . - fn as_query_database_arc(self: Arc) -> Arc; + fn as_query_namespace_arc(self: Arc) -> Arc; } impl AbstractDb for QuerierNamespace { @@ -28,7 +28,7 @@ impl AbstractDb for QuerierNamespace { self as _ } - fn as_query_database_arc(self: Arc) -> Arc { + fn as_query_namespace_arc(self: Arc) -> Arc { self } } diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs index dc3fce2297..70a8021cac 100644 --- a/query_tests/src/influxrpc/field_columns.rs +++ b/query_tests/src/influxrpc/field_columns.rs @@ -32,7 +32,7 @@ async fn run_field_columns_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .field_columns(db.as_query_database_arc(), predicate.clone()) + .field_columns(db.as_query_namespace_arc(), predicate.clone()) .await .expect("built plan successfully"); let fields = ctx diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index c0ecfe18c7..d90253330e 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -64,7 +64,7 @@ async fn run_read_filter( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .read_filter(db.as_query_database_arc(), predicate) + .read_filter(db.as_query_namespace_arc(), predicate) .await .map_err(|e| e.to_string())?; diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index 4e9f964c2c..08adc434a7 100644 --- a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -38,7 +38,7 @@ async fn run_read_group_test_case( let plans = planner .read_group( - db.as_query_database_arc(), + db.as_query_namespace_arc(), predicate.clone(), agg, &group_columns, diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index 65942baa1b..2f6445deaa 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -30,7 +30,7 @@ async fn run_read_window_aggregate_test_case( let plan = planner .read_window_aggregate( - db.as_query_database_arc(), + db.as_query_namespace_arc(), predicate.clone(), agg, every, diff --git a/query_tests/src/influxrpc/table_names.rs b/query_tests/src/influxrpc/table_names.rs index f787b36410..e171c4fc9d 100644 --- a/query_tests/src/influxrpc/table_names.rs +++ b/query_tests/src/influxrpc/table_names.rs @@ -29,7 +29,7 @@ async fn run_table_names_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .table_names(db.as_query_database_arc(), predicate.clone()) + .table_names(db.as_query_namespace_arc(), predicate.clone()) .await .expect("built plan successfully"); diff --git a/query_tests/src/influxrpc/tag_keys.rs b/query_tests/src/influxrpc/tag_keys.rs index 72551599f4..aeac01d474 100644 --- a/query_tests/src/influxrpc/tag_keys.rs +++ b/query_tests/src/influxrpc/tag_keys.rs @@ -31,7 +31,7 @@ async fn run_tag_keys_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .tag_keys(db.as_query_database_arc(), predicate.clone()) + .tag_keys(db.as_query_namespace_arc(), predicate.clone()) .await .expect("built plan successfully"); let names = ctx diff --git a/query_tests/src/influxrpc/tag_values.rs b/query_tests/src/influxrpc/tag_values.rs index b98b2fc62e..a6b61149a6 100644 --- a/query_tests/src/influxrpc/tag_values.rs +++ b/query_tests/src/influxrpc/tag_values.rs @@ -30,7 +30,7 @@ async fn run_tag_values_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .tag_values(db.as_query_database_arc(), tag_name, predicate.clone()) + .tag_values(db.as_query_namespace_arc(), tag_name, predicate.clone()) .await .expect("built plan successfully"); let names = ctx @@ -292,7 +292,7 @@ async fn list_tag_values_field_col_on_tag() { let tag_name = "temp"; let plan_result = planner .tag_values( - db.as_query_database_arc(), + db.as_query_namespace_arc(), tag_name, InfluxRpcPredicate::default(), ) diff --git a/service_common/src/lib.rs b/service_common/src/lib.rs index 99cff740ee..242f51ccd0 100644 --- a/service_common/src/lib.rs +++ b/service_common/src/lib.rs @@ -7,20 +7,20 @@ pub mod test_util; use std::sync::Arc; use async_trait::async_trait; -use iox_query::{exec::ExecutionContextProvider, QueryDatabase}; +use iox_query::{exec::ExecutionContextProvider, QueryNamespace}; use trace::span::Span; use tracker::InstrumentedAsyncOwnedSemaphorePermit; -/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a virtual set of -/// databases. +/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a +/// virtual set of namespaces. /// -/// The query engine MUST ONLY use this trait to access the databases / catalogs. +/// The query engine MUST ONLY use this trait to access the namespaces / catalogs. #[async_trait] pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static { - /// Abstract database. - type Db: ExecutionContextProvider + QueryDatabase; + /// Abstract namespace. + type Db: ExecutionContextProvider + QueryNamespace; - /// Get database if it exists. + /// Get namespace if it exists. async fn db(&self, name: &str, span: Option) -> Option>; /// Acquire concurrency-limiting sempahore diff --git a/service_common/src/planner.rs b/service_common/src/planner.rs index e1bc5adf71..67de772667 100644 --- a/service_common/src/planner.rs +++ b/service_common/src/planner.rs @@ -6,7 +6,7 @@ use iox_query::{ exec::IOxSessionContext, frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner}, plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan}, - Aggregate, QueryDatabase, WindowDuration, + Aggregate, QueryNamespace, WindowDuration, }; pub use datafusion::error::{DataFusionError as Error, Result}; @@ -31,7 +31,7 @@ impl Planner { } } - /// Plan a SQL query against the data in `database`, and return a + /// Plan a SQL query against the data in a namespace, and return a /// DataFusion physical execution plan. pub async fn sql(&self, query: impl Into + Send) -> Result> { let planner = SqlQueryPlanner::new(); @@ -45,20 +45,20 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::table_names`], on a separate threadpool - pub async fn table_names( + pub async fn table_names( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner table_names")); self.ctx .run(async move { planner - .table_names(database, predicate) + .table_names(namespace, predicate) .await .map_err(|e| e.to_df_error("table_names")) }) @@ -67,20 +67,20 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::tag_keys`], on a separate threadpool - pub async fn tag_keys( + pub async fn tag_keys( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_keys")); self.ctx .run(async move { planner - .tag_keys(database, predicate) + .tag_keys(namespace, predicate) .await .map_err(|e| e.to_df_error("tag_keys")) }) @@ -89,14 +89,14 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::tag_values`], on a separate threadpool - pub async fn tag_values( + pub async fn tag_values( &self, - database: Arc, + namespace: Arc, tag_name: impl Into + Send, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let tag_name = tag_name.into(); let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_values")); @@ -104,7 +104,7 @@ impl Planner { self.ctx .run(async move { planner - .tag_values(database, &tag_name, predicate) + .tag_values(namespace, &tag_name, predicate) .await .map_err(|e| e.to_df_error("tag_values")) }) @@ -113,20 +113,20 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::field_columns`], on a separate threadpool - pub async fn field_columns( + pub async fn field_columns( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner field_columns")); self.ctx .run(async move { planner - .field_columns(database, predicate) + .field_columns(namespace, predicate) .await .map_err(|e| e.to_df_error("field_columns")) }) @@ -135,20 +135,20 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::read_filter`], on a separate threadpool - pub async fn read_filter( + pub async fn read_filter( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_filter")); self.ctx .run(async move { planner - .read_filter(database, predicate) + .read_filter(namespace, predicate) .await .map_err(|e| e.to_df_error("read_filter")) }) @@ -157,22 +157,22 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::read_group`], on a separate threadpool - pub async fn read_group( + pub async fn read_group( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, agg: Aggregate, group_columns: Vec, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_group")); self.ctx .run(async move { planner - .read_group(database, predicate, agg, &group_columns) + .read_group(namespace, predicate, agg, &group_columns) .await .map_err(|e| e.to_df_error("read_group")) }) @@ -181,23 +181,23 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::read_window_aggregate`], on a separate threadpool - pub async fn read_window_aggregate( + pub async fn read_window_aggregate( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, agg: Aggregate, every: WindowDuration, offset: WindowDuration, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_window_aggregate")); self.ctx .run(async move { planner - .read_window_aggregate(database, predicate, agg, every, offset) + .read_window_aggregate(namespace, predicate, agg, every, offset) .await .map_err(|e| e.to_df_error("read_window_aggregate")) }) diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index d90e3991ea..0d8b61ef82 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -14,7 +14,7 @@ use futures::{SinkExt, Stream, StreamExt}; use generated_types::influxdata::iox::querier::v1 as proto; use iox_query::{ exec::{ExecutionContextProvider, IOxSessionContext}, - QueryCompletedToken, QueryDatabase, + QueryCompletedToken, QueryNamespace, }; use observability_deps::tracing::{debug, info, warn}; use pin_project::{pin_project, pinned_drop}; diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index cc07195d7c..e96eafb1e6 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -1,5 +1,5 @@ //! This module contains implementations for the storage gRPC service -//! implemented in terms of the [`QueryDatabase`](iox_query::QueryDatabase). +//! implemented in terms of the [`QueryNamespace`](iox_query::QueryNamespace). use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT}; use crate::{ @@ -30,7 +30,7 @@ use iox_query::{ fieldlist::FieldList, seriesset::converter::Error as SeriesSetError, ExecutionContextProvider, IOxSessionContext, }, - QueryDatabase, QueryText, + QueryNamespace, QueryText, }; use observability_deps::tracing::{error, info, trace, warn}; use pin_project::pin_project; @@ -1015,15 +1015,15 @@ fn get_namespace_name(input: &impl GrpcInputs) -> Result, /// Gathers all measurement names that have data in the specified /// (optional) range -async fn measurement_name_impl( - db: Arc, +async fn measurement_name_impl( + db: Arc, db_name: NamespaceName<'static>, range: Option, rpc_predicate: Option, ctx: &IOxSessionContext, ) -> Result where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); let db_name = db_name.as_str(); @@ -1058,8 +1058,8 @@ where /// Return tag keys with optional measurement, timestamp and arbitrary /// predicates -async fn tag_keys_impl( - db: Arc, +async fn tag_keys_impl( + db: Arc, db_name: NamespaceName<'static>, measurement: Option, range: Option, @@ -1067,7 +1067,7 @@ async fn tag_keys_impl( ctx: &IOxSessionContext, ) -> Result where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); let db_name = db_name.as_str(); @@ -1100,8 +1100,8 @@ where /// Return tag values for tag_name, with optional measurement, timestamp and /// arbitratry predicates -async fn tag_values_impl( - db: Arc, +async fn tag_values_impl( + db: Arc, db_name: NamespaceName<'static>, tag_name: String, measurement: Option, @@ -1110,7 +1110,7 @@ async fn tag_values_impl( ctx: &IOxSessionContext, ) -> Result where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); @@ -1148,14 +1148,14 @@ where /// Return tag values grouped by one or more measurements with optional /// filtering predicate and optionally scoped to one or more tag keys. -async fn tag_values_grouped_by_measurement_and_tag_key_impl( - db: Arc, +async fn tag_values_grouped_by_measurement_and_tag_key_impl( + db: Arc, db_name: NamespaceName<'static>, req: TagValuesGroupedByMeasurementAndTagKeyRequest, ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { // Extract the tag key predicate. // See https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-tag-values @@ -1222,14 +1222,14 @@ where } /// Launch async tasks that materialises the result of executing read_filter. -async fn read_filter_impl( - db: Arc, +async fn read_filter_impl( + db: Arc, db_name: NamespaceName<'static>, req: ReadFilterRequest, ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let db_name = db_name.as_str(); @@ -1267,8 +1267,8 @@ where } /// Launch async tasks that send the result of executing read_group to `tx` -async fn query_group_impl( - db: Arc, +async fn query_group_impl( + db: Arc, db_name: NamespaceName<'static>, range: Option, rpc_predicate: Option, @@ -1277,7 +1277,7 @@ async fn query_group_impl( ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let db_name = db_name.as_str(); @@ -1324,8 +1324,8 @@ where /// Return field names, restricted via optional measurement, timestamp and /// predicate -async fn field_names_impl( - db: Arc, +async fn field_names_impl( + db: Arc, db_name: NamespaceName<'static>, measurement: Option, range: Option, @@ -1333,7 +1333,7 @@ async fn field_names_impl( ctx: &IOxSessionContext, ) -> Result where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); @@ -1364,14 +1364,14 @@ where /// Materialises a collection of measurement names. Typically used as part of /// a plan to scope and group multiple plans by measurement name. -async fn materialise_measurement_names( - db: Arc, +async fn materialise_measurement_names( + db: Arc, db_name: NamespaceName<'static>, measurement_exprs: Vec, ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { use generated_types::{ node::{Comparison, Type, Value}, @@ -1442,15 +1442,15 @@ where /// /// TODO(edd): this might be better represented as a plan against the `columns` /// system table. -async fn materialise_tag_keys( - db: Arc, +async fn materialise_tag_keys( + db: Arc, db_name: NamespaceName<'static>, measurement_name: String, tag_key_predicate: tag_key_predicate::Value, ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { use generated_types::tag_key_predicate::Value; From 3dde82b3b927ab376d184a9b52921811dc15f69f Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 11 Nov 2022 14:22:28 -0500 Subject: [PATCH 5/8] fix: Rename QueryDatabaseProvider to QueryNamespaceProvider --- querier/src/database.rs | 4 ++-- service_common/src/lib.rs | 2 +- service_common/src/test_util.rs | 4 ++-- service_grpc_flight/src/lib.rs | 10 +++++----- service_grpc_influxrpc/src/lib.rs | 6 +++--- service_grpc_influxrpc/src/service.rs | 6 +++--- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/querier/src/database.rs b/querier/src/database.rs index 38185c0193..ad88f55dce 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -9,7 +9,7 @@ use backoff::{Backoff, BackoffConfig}; use data_types::{Namespace, ShardIndex}; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; -use service_common::QueryDatabaseProvider; +use service_common::QueryNamespaceProvider; use sharder::JumpHash; use snafu::Snafu; use std::{collections::BTreeSet, sync::Arc}; @@ -79,7 +79,7 @@ pub struct QuerierDatabase { } #[async_trait] -impl QueryDatabaseProvider for QuerierDatabase { +impl QueryNamespaceProvider for QuerierDatabase { type Db = QuerierNamespace; async fn db(&self, name: &str, span: Option) -> Option> { diff --git a/service_common/src/lib.rs b/service_common/src/lib.rs index 242f51ccd0..3b3486b415 100644 --- a/service_common/src/lib.rs +++ b/service_common/src/lib.rs @@ -16,7 +16,7 @@ use tracker::InstrumentedAsyncOwnedSemaphorePermit; /// /// The query engine MUST ONLY use this trait to access the namespaces / catalogs. #[async_trait] -pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static { +pub trait QueryNamespaceProvider: std::fmt::Debug + Send + Sync + 'static { /// Abstract namespace. type Db: ExecutionContextProvider + QueryNamespace; diff --git a/service_common/src/test_util.rs b/service_common/src/test_util.rs index 516ca951d1..66608a7961 100644 --- a/service_common/src/test_util.rs +++ b/service_common/src/test_util.rs @@ -8,7 +8,7 @@ use tracker::{ AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore, }; -use crate::QueryDatabaseProvider; +use crate::QueryNamespaceProvider; #[derive(Debug)] pub struct TestDatabaseStore { @@ -57,7 +57,7 @@ impl Default for TestDatabaseStore { } #[async_trait] -impl QueryDatabaseProvider for TestDatabaseStore { +impl QueryNamespaceProvider for TestDatabaseStore { type Db = TestDatabase; /// Retrieve the database specified name diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 0d8b61ef82..558798c559 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -20,7 +20,7 @@ use observability_deps::tracing::{debug, info, warn}; use pin_project::{pin_project, pinned_drop}; use prost::Message; use serde::Deserialize; -use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryDatabaseProvider}; +use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider}; use snafu::{ResultExt, Snafu}; use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant}; use tokio::task::JoinHandle; @@ -151,21 +151,21 @@ impl ReadInfo { #[derive(Debug)] struct FlightService where - S: QueryDatabaseProvider, + S: QueryNamespaceProvider, { server: Arc, } pub fn make_server(server: Arc) -> FlightServer where - S: QueryDatabaseProvider, + S: QueryNamespaceProvider, { FlightServer::new(FlightService { server }) } impl FlightService where - S: QueryDatabaseProvider, + S: QueryNamespaceProvider, { async fn run_query( &self, @@ -198,7 +198,7 @@ where #[tonic::async_trait] impl Flight for FlightService where - S: QueryDatabaseProvider, + S: QueryNamespaceProvider, { type HandshakeStream = TonicStream; type ListFlightsStream = TonicStream; diff --git a/service_grpc_influxrpc/src/lib.rs b/service_grpc_influxrpc/src/lib.rs index dc26550ec3..6f1e7b58b7 100644 --- a/service_grpc_influxrpc/src/lib.rs +++ b/service_grpc_influxrpc/src/lib.rs @@ -15,16 +15,16 @@ pub mod input; pub mod service; use generated_types::storage_server::{Storage, StorageServer}; -use service_common::QueryDatabaseProvider; +use service_common::QueryNamespaceProvider; use std::sync::Arc; /// Concrete implementation of the gRPC InfluxDB Storage Service API #[derive(Debug)] -struct StorageService { +struct StorageService { pub db_store: Arc, } -pub fn make_server( +pub fn make_server( db_store: Arc, ) -> StorageServer { StorageServer::new(StorageService { db_store }) diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index e96eafb1e6..18bc674a88 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -34,7 +34,7 @@ use iox_query::{ }; use observability_deps::tracing::{error, info, trace, warn}; use pin_project::pin_project; -use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryDatabaseProvider}; +use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider}; use snafu::{OptionExt, ResultExt, Snafu}; use std::{ collections::{BTreeSet, HashMap}, @@ -236,11 +236,11 @@ fn add_headers(metadata: &mut MetadataMap) { metadata.insert("storage-type", "iox".parse().unwrap()); } -/// Implements the protobuf defined Storage service for a [`QueryDatabaseProvider`] +/// Implements the protobuf defined Storage service for a [`QueryNamespaceProvider`] #[tonic::async_trait] impl Storage for StorageService where - T: QueryDatabaseProvider + 'static, + T: QueryNamespaceProvider + 'static, { type ReadFilterStream = StreamWithPermit>>>; From 042bf545fe0a898f84e8b1d237b7cb04fda5be47 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 11 Nov 2022 14:45:37 -0500 Subject: [PATCH 6/8] fix: Rename database types/vars to namespace in the storage command code --- influxdb_iox/src/commands/storage.rs | 49 +++++++++++----------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/influxdb_iox/src/commands/storage.rs b/influxdb_iox/src/commands/storage.rs index d87faf30f6..d6f8001d31 100644 --- a/influxdb_iox/src/commands/storage.rs +++ b/influxdb_iox/src/commands/storage.rs @@ -1,26 +1,23 @@ pub(crate) mod request; pub(crate) mod response; -use std::num::NonZeroU64; -use std::time::Duration; - -use snafu::{ResultExt, Snafu}; -use tonic::Status; - use generated_types::{ aggregate::AggregateType, influxdata::platform::storage::read_group_request::Group, Predicate, }; use influxdb_storage_client::{connection::Connection, Client, OrgAndBucket}; use influxrpc_parser::predicate; use iox_time; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; +use std::{num::NonZeroU64, time::Duration}; +use tonic::Status; #[derive(Debug, Snafu)] pub enum ParseError { #[snafu(display("unable to parse timestamp '{:?}'", t))] Timestamp { t: String }, - #[snafu(display("unable to parse database name '{:?}'", db_name))] - DBName { db_name: String }, + #[snafu(display("unable to parse namespace name '{:?}'", db_name))] + NamespaceName { db_name: String }, #[snafu(display("unable to parse predicate: {:?}", source))] Predicate { source: predicate::Error }, @@ -55,9 +52,9 @@ pub struct Config { #[clap(subcommand)] command: Command, - /// The name of the database + /// The name of the namespace #[clap( - value_parser = parse_db_name, + value_parser = parse_namespace_name, )] db_name: OrgAndBucket, @@ -121,31 +118,23 @@ fn parse_predicate(expr: &str) -> Result { predicate::expr_to_rpc_predicate(expr).context(PredicateSnafu) } -// Attempts to parse the database name into and org and bucket ID. -fn parse_db_name(db_name: &str) -> Result { +// Attempts to parse the namespace name into and org and bucket ID. +fn parse_namespace_name(db_name: &str) -> Result { let parts = db_name.split('_').collect::>(); - if parts.len() != 2 { - return DBNameSnafu { - db_name: db_name.to_owned(), - } - .fail(); - } - let org_id = usize::from_str_radix(parts[0], 16).map_err(|_| ParseError::DBName { - db_name: db_name.to_owned(), - })?; + ensure!(parts.len() == 2, NamespaceNameSnafu { db_name }); - let bucket_id = usize::from_str_radix(parts[1], 16).map_err(|_| ParseError::DBName { - db_name: db_name.to_owned(), - })?; + let org_id = usize::from_str_radix(parts[0], 16) + .ok() + .context(NamespaceNameSnafu { db_name })?; + + let bucket_id = usize::from_str_radix(parts[1], 16) + .ok() + .context(NamespaceNameSnafu { db_name })?; Ok(OrgAndBucket::new( - NonZeroU64::new(org_id as u64).ok_or_else(|| ParseError::DBName { - db_name: db_name.to_owned(), - })?, - NonZeroU64::new(bucket_id as u64).ok_or_else(|| ParseError::DBName { - db_name: db_name.to_owned(), - })?, + NonZeroU64::new(org_id as u64).context(NamespaceNameSnafu { db_name })?, + NonZeroU64::new(bucket_id as u64).context(NamespaceNameSnafu { db_name })?, )) } From 51afe53d23ac9a4b728595041c4230bed65a2d4e Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 11 Nov 2022 14:59:49 -0500 Subject: [PATCH 7/8] fix: Rename database to namespace in service_grpc_flight types, vars, and errors --- service_grpc_flight/src/lib.rs | 48 +++++++++++++++++----------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 558798c559..f50d94383d 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -44,20 +44,20 @@ pub enum Error { source: serde_json::Error, }, - #[snafu(display("Database {} not found", database_name))] - DatabaseNotFound { database_name: String }, + #[snafu(display("Namespace {} not found", namespace_name))] + NamespaceNotFound { namespace_name: String }, #[snafu(display( - "Internal error reading points from database {}: {}", - database_name, + "Internal error reading points from namespace {}: {}", + namespace_name, source ))] Query { - database_name: String, + namespace_name: String, source: DataFusionError, }, - #[snafu(display("Invalid database name: {}", source))] + #[snafu(display("Invalid namespace name: {}", source))] InvalidNamespaceName { source: NamespaceNameError }, #[snafu(display("Failed to optimize record batch: {}", source))] @@ -81,7 +81,7 @@ impl From for tonic::Status { // logging is handled for any new error variants. let msg = "Error handling Flight gRPC request"; match err { - Error::DatabaseNotFound { .. } + Error::NamespaceNotFound { .. } | Error::InvalidTicket { .. } | Error::InvalidTicketLegacy { .. } | Error::InvalidQuery { .. } @@ -102,7 +102,7 @@ impl Error { let msg = self.to_string(); let code = match self { - Self::DatabaseNotFound { .. } => tonic::Code::NotFound, + Self::NamespaceNotFound { .. } => tonic::Code::NotFound, Self::InvalidTicket { .. } | Self::InvalidTicketLegacy { .. } | Self::InvalidQuery { .. } @@ -122,7 +122,7 @@ type TonicStream = Pin> + Send #[derive(Deserialize, Debug)] /// Body of the `Ticket` serialized and sent to the do_get endpoint. struct ReadInfo { - database_name: String, + namespace_name: String, sql_query: String, } @@ -141,7 +141,7 @@ impl ReadInfo { proto::ReadInfo::decode(Bytes::from(ticket.to_vec())).context(InvalidTicketSnafu {})?; Ok(Self { - database_name: read_info.namespace_name, + namespace_name: read_info.namespace_name, sql_query: read_info.sql_query, }) } @@ -172,13 +172,13 @@ where span_ctx: Option, permit: InstrumentedAsyncOwnedSemaphorePermit, sql_query: String, - database: String, + namespace: String, ) -> Result>, tonic::Status> { let db = self .server - .db(&database, span_ctx.child_span("get namespace")) + .db(&namespace, span_ctx.child_span("get namespace")) .await - .ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {database}")))?; + .ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {namespace}")))?; let ctx = db.new_query_context(span_ctx); let query_completed_token = db.record_query(&ctx, "sql", Box::new(sql_query.clone())); @@ -189,7 +189,7 @@ where .context(PlanningSnafu)?; let output = - GetStream::new(ctx, physical_plan, database, query_completed_token, permit).await?; + GetStream::new(ctx, physical_plan, namespace, query_completed_token, permit).await?; Ok(Response::new(Box::pin(output) as TonicStream)) } @@ -231,10 +231,10 @@ where }); if let Err(e) = &read_info { - info!(%e, "Error decoding database and SQL query name from flight ticket"); + info!(%e, "Error decoding namespace and SQL query name from flight ticket"); }; let ReadInfo { - database_name, + namespace_name, sql_query, } = read_info?; @@ -245,17 +245,17 @@ where // Log after we acquire the permit and are about to start execution let start = Instant::now(); - info!(db_name=%database_name, %sql_query, %trace, "Running SQL via flight do_get"); + info!(%namespace_name, %sql_query, %trace, "Running SQL via flight do_get"); let response = self - .run_query(span_ctx, permit, sql_query.clone(), database_name.clone()) + .run_query(span_ctx, permit, sql_query.clone(), namespace_name.clone()) .await; if let Err(e) = &response { - info!(db_name=%database_name, %sql_query, %trace, %e, "Error running SQL query"); + info!(%namespace_name, %sql_query, %trace, %e, "Error running SQL query"); } else { let elapsed = Instant::now() - start; - debug!(db_name=%database_name,%sql_query,%trace, ?elapsed, "Completed SQL query successfully"); + debug!(%namespace_name,%sql_query,%trace, ?elapsed, "Completed SQL query successfully"); } response } @@ -330,7 +330,7 @@ impl GetStream { async fn new( ctx: IOxSessionContext, physical_plan: Arc, - database_name: String, + namespace_name: String, mut query_completed_token: QueryCompletedToken, permit: InstrumentedAsyncOwnedSemaphorePermit, ) -> Result { @@ -354,7 +354,7 @@ impl GetStream { .execute_stream(Arc::clone(&physical_plan)) .await .context(QuerySnafu { - database_name: &database_name, + namespace_name: &namespace_name, })?; let join_handle = tokio::spawn(async move { @@ -399,7 +399,7 @@ impl GetStream { Err(e) => { // failure sending here is OK because we're cutting the stream anyways tx.send(Err(Error::Query { - database_name: database_name.clone(), + namespace_name: namespace_name.clone(), source: DataFusionError::ArrowError(e), } .into())) @@ -495,7 +495,7 @@ mod tests { server: Arc::clone(&test_storage), }; let ticket = Ticket { - ticket: br#"{"database_name": "my_db", "sql_query": "SELECT 1;"}"#.to_vec(), + ticket: br#"{"namespace_name": "my_db", "sql_query": "SELECT 1;"}"#.to_vec(), }; let streaming_resp1 = service .do_get(tonic::Request::new(ticket.clone())) From 98621f560b915e1a1a5a618d32e223aa2ad07de5 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 11 Nov 2022 15:16:53 -0500 Subject: [PATCH 8/8] fix: Remove obsolete SERVER_MODE from the Dockerfile --- Dockerfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6b4f3a1212..34c1c78700 100644 --- a/Dockerfile +++ b/Dockerfile @@ -57,10 +57,9 @@ ENV PACKAGE=$PACKAGE COPY --from=build "/root/$PACKAGE" "/usr/bin/$PACKAGE" COPY docker/entrypoint.sh /usr/bin/entrypoint.sh -ENV INFLUXDB_IOX_SERVER_MODE=database EXPOSE 8080 8082 ENTRYPOINT ["/usr/bin/entrypoint.sh"] -CMD ["run", "$INFLUXDB_IOX_SERVER_MODE"] +CMD ["run"]