diff --git a/Cargo.lock b/Cargo.lock
index 52ce97e977..b6123cd7e7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1528,6 +1528,7 @@ dependencies = [
  "assert_cmd",
  "async-trait",
  "backtrace",
+ "base64 0.13.0",
  "byteorder",
  "bytes",
  "chrono",
diff --git a/arrow_util/src/test_util.rs b/arrow_util/src/test_util.rs
index d8c6034035..15b3db75af 100644
--- a/arrow_util/src/test_util.rs
+++ b/arrow_util/src/test_util.rs
@@ -1,5 +1,8 @@
 //! A collection of testing functions for arrow based code
+use std::sync::Arc;
+
 use arrow::{
+    array::{ArrayRef, StringArray},
     compute::kernels::sort::{lexsort, SortColumn, SortOptions},
     record_batch::RecordBatch,
 };
@@ -89,3 +92,42 @@ pub fn sort_record_batch(batch: RecordBatch) -> RecordBatch {
 
     RecordBatch::try_new(batch.schema(), sort_output).unwrap()
 }
+
+/// Return a new `StringArray` where each element has had the normalization
+/// function `norm` applied.
+pub fn normalize_string_array<N>(arr: &StringArray, norm: N) -> ArrayRef
+where
+    N: Fn(&str) -> String,
+{
+    let normalized: StringArray = arr.iter().map(|s| s.map(&norm)).collect();
+    Arc::new(normalized)
+}
+
+/// Return a new set of `RecordBatch`es where the function `norm` has been
+/// applied to all `StringArray` rows.
+pub fn normalize_batches<N>(batches: Vec<RecordBatch>, norm: N) -> Vec<RecordBatch>
+where
+    N: Fn(&str) -> String,
+{
+    // The idea here is to get a function that normalizes strings
+    // and apply it to each StringArray, element by element
+    batches
+        .into_iter()
+        .map(|batch| {
+            let new_columns: Vec<_> = batch
+                .columns()
+                .iter()
+                .map(|array| {
+                    if let Some(array) = array.as_any().downcast_ref::<StringArray>() {
+                        normalize_string_array(array, &norm)
+                    } else {
+                        Arc::clone(array)
+                    }
+                })
+                .collect();
+
+            RecordBatch::try_new(batch.schema(), new_columns)
+                .expect("error occurred during normalization")
+        })
+        .collect()
+}
diff --git a/docs/server_startup.md b/docs/server_startup.md
index 42d9c7222f..deeac48516 100644
--- a/docs/server_startup.md
+++ b/docs/server_startup.md
@@ -85,7 +85,7 @@ Created database imported_db
 
 ### Copy parquet files into the new database
 
-IOx stores data files in `<uuid>/data/<table_name>/`,
+IOx stores parquet files in `<uuid>/data/<table_name>/`,
 and the imported data files must be in the same structure.
 
 For example, if you are running IOx with a data directory of `~/.influxdb_iox`, the data
@@ -106,43 +106,20 @@ my_awesome_table
 Copy that directory structure into the database's catalog:
 
 ```shell
+mkdir -p ~/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/data
 cp -R 'my_awesome_table' ~/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/data/
 ```
 
-### Break the catalog
+### WARNING
 
-At the time of writing, in order to rebuild a catalog from parquet
-files the catalog must be corrupted. One way to do so manually is to
-find a transaction file, and write some junk into it. For example:
+Note that if you create/move/copy files into the domain of a running database, the database's
+background cleanup worker thread may delete these files prior to the catalog rebuild; you can
+check the logs to see if this happened.
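+
+As a quick sanity check before rebuilding, you can confirm the copied files are still present
+(the path below reuses the example database UUID and table name from above; adjust it for your
+own setup):
+
+```shell
+ls -R ~/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/data/my_awesome_table
+```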
+ +### Rebuild catalog with the `recover rebuild --force` command + +Now, tell IOx to rebuild the catalog from parquet files: ```shell -find ~/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/transactions -type f -/Users/alamb/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/transactions/00000000000000000000/8dda6fb8-6907-4d89-b133-85536ccd9bd3.txn - -# write something bad into the txn file -echo "JUNK" > /Users/alamb/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/transactions/00000000000000000000/8dda6fb8-6907-4d89-b133-85536ccd9bd3.txn -``` - -### Restart IOx (with `--wipe-catalog-on-error=false`): - -In another terminal, restart the IOx server with `--wipe-catalog-on-error=false` (which is critical). - -```shell -cargo run -- run -v --object-store=file --data-dir=$HOME/.influxdb_iox --server-id=42 --wipe-catalog-on-error=false -``` - -The database should enter into the `CatalogLoadError` state. - -```text -2021-11-30T15:44:43.784118Z ERROR server::database: database in error state - operator intervention required db_name=imported_db e=error loading catalog: Cannot load preserved catalog: ... state=CatalogLoadError -``` - -### Use the `recover rebuild` command - -Now, rebuild the catalog: - -```shell -./target/debug/influxdb_iox database recover rebuild imported_db +./target/debug/influxdb_iox database recover rebuild imported_db --force { "operation": { "name": "0", diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index c22285d09c..15191e0802 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -73,7 +73,7 @@ service ManagementService { // Get server status rpc GetServerStatus(GetServerStatusRequest) returns (GetServerStatusResponse); - // Rebuild preserved catalog for given DB + // Rebuild preserved catalog from parquet files for given DB rpc RebuildPreservedCatalog(RebuildPreservedCatalogRequest) returns (RebuildPreservedCatalogResponse); // Wipe preserved catalog for given DB. @@ -432,6 +432,10 @@ message WipePreservedCatalogResponse { message RebuildPreservedCatalogRequest { // the name of the database string db_name = 1; + + // force the catalog to be rebuilt, even if the database has started + // successfully + bool force = 2; } message RebuildPreservedCatalogResponse { diff --git a/generated_types/protos/influxdata/iox/router/v1/service.proto b/generated_types/protos/influxdata/iox/router/v1/service.proto index c6eefed97b..8ac87820d8 100644 --- a/generated_types/protos/influxdata/iox/router/v1/service.proto +++ b/generated_types/protos/influxdata/iox/router/v1/service.proto @@ -5,6 +5,9 @@ option go_package = "github.com/influxdata/iox/router/v1"; import "influxdata/iox/router/v1/router.proto"; service RouterService { + // Get router. + rpc GetRouter(GetRouterRequest) returns (GetRouterResponse); + // List configured routers. 
  rpc ListRouters(ListRoutersRequest) returns (ListRoutersResponse);
 
@@ -15,6 +18,14 @@ service RouterService {
   rpc DeleteRouter(DeleteRouterRequest) returns (DeleteRouterResponse);
 }
 
+message GetRouterRequest {
+  string router_name = 1;
+}
+
+message GetRouterResponse {
+  Router router = 1;
+}
+
 message ListRoutersRequest {}
 
 message ListRoutersResponse {
diff --git a/generated_types/src/job.rs b/generated_types/src/job.rs
index c2ee3e6a31..6328930d4b 100644
--- a/generated_types/src/job.rs
+++ b/generated_types/src/job.rs
@@ -1,6 +1,29 @@
 use crate::influxdata::iox::management::v1 as management;
 use data_types::job::Job;
 
+impl management::operation_metadata::Job {
+    /// Return the db_name for this job
+    pub fn db_name(&self) -> &str {
+        match self {
+            Self::Dummy(management::Dummy { db_name, .. }) => db_name,
+            Self::WriteChunk(management::WriteChunk { db_name, .. }) => db_name,
+            Self::WipePreservedCatalog(management::WipePreservedCatalog { db_name, .. }) => db_name,
+            Self::CompactChunks(management::CompactChunks { db_name, .. }) => db_name,
+            Self::PersistChunks(management::PersistChunks { db_name, .. }) => db_name,
+            Self::DropChunk(management::DropChunk { db_name, .. }) => db_name,
+            Self::DropPartition(management::DropPartition { db_name, .. }) => db_name,
+            Self::LoadReadBufferChunk(management::LoadReadBufferChunk { db_name, .. }) => db_name,
+            Self::RebuildPreservedCatalog(management::RebuildPreservedCatalog {
+                db_name, ..
+            }) => db_name,
+            Self::CompactObjectStoreChunks(management::CompactObjectStoreChunks {
+                db_name,
+                ..
+            }) => db_name,
+        }
+    }
+}
+
 impl From<Job> for management::operation_metadata::Job {
     fn from(job: Job) -> Self {
         match job {
diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs
index c293783f72..3da1a0a9c5 100644
--- a/generated_types/src/lib.rs
+++ b/generated_types/src/lib.rs
@@ -13,7 +13,16 @@ pub mod influxdata {
             env!("OUT_DIR"),
             "/influxdata.platform.storage.read.rs"
         ));
+        include!(concat!(
+            env!("OUT_DIR"),
+            "/influxdata.platform.storage.read.serde.rs"
+        ));
+
         include!(concat!(env!("OUT_DIR"), "/influxdata.platform.storage.rs"));
+        include!(concat!(
+            env!("OUT_DIR"),
+            "/influxdata.platform.storage.serde.rs"
+        ));
 
         // Can't implement `Default` because `prost::Message` implements `Default`
         impl TimestampRange {
diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml
index cf0524ad36..f73cadb89c 100644
--- a/influxdb_iox/Cargo.toml
+++ b/influxdb_iox/Cargo.toml
@@ -99,6 +99,7 @@ write_buffer = { path = "../write_buffer" }
 
 # Crates.io dependencies, in alphabetical order
 assert_cmd = "2.0.2"
+base64 = "0.13"
 hex = "0.4.2"
 predicates = "2.1.0"
 rand = "0.8.3"
diff --git a/influxdb_iox/src/commands/database/recover.rs b/influxdb_iox/src/commands/database/recover.rs
index 2fd41d0705..30c789beb7 100644
--- a/influxdb_iox/src/commands/database/recover.rs
+++ b/influxdb_iox/src/commands/database/recover.rs
@@ -48,7 +48,7 @@ enum Command {
     /// Skip replay
     SkipReplay(SkipReplay),
 
-    /// Rebuild preserved catalog
+    /// Rebuild catalog from parquet files
     Rebuild(Rebuild),
 }
 
@@ -63,9 +63,13 @@ struct Wipe {
     db_name: String,
 }
 
-/// Rebuild preserved catalog.
+/// Rebuild catalog from parquet files
 #[derive(Debug, StructOpt)]
 struct Rebuild {
+    /// Force rebuild, even if the database has already successfully started
+    #[structopt(long)]
+    force: bool,
+
     /// The name of the database
     db_name: String,
 }
 
@@ -104,7 +108,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
         }
         Command::Rebuild(rebuild) => {
             let operation = client
-                .rebuild_preserved_catalog(rebuild.db_name)
+                .rebuild_preserved_catalog(rebuild.db_name, rebuild.force)
                 .await
                 .context(RebuildCatalog)?;
 
diff --git a/influxdb_iox/src/commands/router.rs b/influxdb_iox/src/commands/router.rs
new file mode 100644
index 0000000000..996a27342f
--- /dev/null
+++ b/influxdb_iox/src/commands/router.rs
@@ -0,0 +1,99 @@
+//! This module implements the `router` CLI command
+
+use influxdb_iox_client::{
+    connection::Connection,
+    router::{self, generated_types::Router as RouterConfig},
+};
+use structopt::StructOpt;
+use thiserror::Error;
+
+#[allow(clippy::enum_variant_names)]
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("JSON Serialization error: {0}")]
+    Serde(#[from] serde_json::Error),
+
+    #[error("Client error: {0}")]
+    ClientError(#[from] influxdb_iox_client::error::Error),
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+/// Manage IOx routers
+#[derive(Debug, StructOpt)]
+pub struct Config {
+    #[structopt(subcommand)]
+    command: Command,
+}
+
+/// Create a new router or update an existing one.
+#[derive(Debug, StructOpt)]
+struct CreateOrUpdate {
+    /// The name of the router
+    name: String,
+}
+
+/// Return configuration of specific router
+#[derive(Debug, StructOpt)]
+struct Get {
+    /// The name of the router
+    name: String,
+}
+
+/// Delete specific router
+#[derive(Debug, StructOpt)]
+struct Delete {
+    /// The name of the router
+    name: String,
+}
+
+/// All possible subcommands for router
+#[derive(Debug, StructOpt)]
+enum Command {
+    CreateOrUpdate(CreateOrUpdate),
+
+    /// List routers
+    List,
+
+    Get(Get),
+
+    Delete(Delete),
+}
+
+pub async fn command(connection: Connection, config: Config) -> Result<()> {
+    match config.command {
+        Command::CreateOrUpdate(command) => {
+            let mut client = router::Client::new(connection);
+            let config = RouterConfig {
+                name: command.name.clone(),
+                ..Default::default()
+            };
+
+            client.update_router(config).await?;
+
+            println!("Created/Updated router {}", command.name);
+        }
+        Command::List => {
+            let mut client = router::Client::new(connection);
+            let routers = client.list_routers().await?;
+            for router in routers {
+                println!("{}", router.name);
+            }
+        }
+        Command::Get(get) => {
+            let Get { name } = get;
+            let mut client = router::Client::new(connection);
+            let router = client.get_router(&name).await?;
+            println!("{}", serde_json::to_string_pretty(&router)?);
+        }
+        Command::Delete(delete) => {
+            let Delete { name } = delete;
+            let mut client = router::Client::new(connection);
+            client.delete_router(&name).await?;
+
+            println!("Deleted router {}", name);
+        }
+    }
+
+    Ok(())
+}
diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
index 3d81351905..b240034fd9 100644
--- a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
+++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
@@ -13,7 +13,7 @@
 // Influx crates
 use data_types::{names::OrgBucketMappingError, DatabaseName};
 use influxdb_iox_client::format::QueryOutputFormat;
-use query::exec::ExecutionContextProvider;
+use query::{exec::ExecutionContextProvider,
QueryDatabase}; use server::Error; // External crates @@ -252,6 +252,8 @@ async fn query( let db = server.db(&db_name)?; + db.record_query("sql", &q); + let ctx = db.new_query_context(req.extensions().get().cloned()); let physical_plan = Planner::new(&ctx).sql(&q).await.context(Planning)?; diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/error.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/error.rs index abfd0bff7d..bfc2368651 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/error.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/error.rs @@ -90,6 +90,13 @@ pub fn default_database_error_handler(error: server::database::Error) -> tonic:: error!(%source, "Unexpected error while wiping catalog"); InternalError {}.into() } + Error::InvalidStateForRebuild { .. } => { + PreconditionViolation::DatabaseInvalidState(error.to_string()).into() + } + Error::UnexpectedTransitionForRebuild { .. } => { + error!(%error, "Unexpected error during rebuild catalog"); + InternalError {}.into() + } Error::RebuildPreservedCatalog { source, .. } => { error!(%source, "Unexpected error while rebuilding catalog"); InternalError {}.into() diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs index 08ea5a3ec4..9cbf8e20b0 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs @@ -17,6 +17,7 @@ use arrow_flight::{ use datafusion::physical_plan::ExecutionPlan; use futures::{SinkExt, Stream, StreamExt}; use pin_project::{pin_project, pinned_drop}; +use query::QueryDatabase; use serde::Deserialize; use snafu::{ResultExt, Snafu}; use tokio::task::JoinHandle; @@ -170,6 +171,8 @@ impl Flight for FlightService { .db(&database) .map_err(default_server_error_handler)?; + db.record_query("sql", &read_info.sql_query); + let ctx = db.new_query_context(span_ctx); let physical_plan = Planner::new(&ctx) diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs index ff65f73930..5f4de6c73e 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs @@ -446,7 +446,7 @@ impl management_service_server::ManagementService for ManagementService { &self, request: Request, ) -> Result, Status> { - let RebuildPreservedCatalogRequest { db_name } = request.into_inner(); + let RebuildPreservedCatalogRequest { db_name, force } = request.into_inner(); // Validate that the database name is legit let db_name = DatabaseName::new(db_name).scope("db_name")?; @@ -455,7 +455,7 @@ impl management_service_server::ManagementService for ManagementService { .database(&db_name) .map_err(default_server_error_handler)?; let tracker = database - .rebuild_preserved_catalog() + .rebuild_preserved_catalog(force) .await .map_err(default_database_error_handler)?; diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs index 3c71b6462d..5accb728cd 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs @@ -2,7 +2,7 @@ //! implemented in terms of the [`QueryDatabase`](query::QueryDatabase) and //! 
[`DatabaseStore`] -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use snafu::{OptionExt, ResultExt, Snafu}; use tokio::sync::mpsc; @@ -20,8 +20,12 @@ use generated_types::{ }; use observability_deps::tracing::{error, info, trace}; use predicate::predicate::PredicateBuilder; -use query::exec::{ - fieldlist::FieldList, seriesset::converter::Error as SeriesSetError, ExecutionContextProvider, +use query::{ + exec::{ + fieldlist::FieldList, seriesset::converter::Error as SeriesSetError, + ExecutionContextProvider, + }, + QueryDatabase, }; use server::DatabaseStore; @@ -213,20 +217,21 @@ where ) -> Result, Status> { let span_ctx = req.extensions().get().cloned(); - let read_filter_request = req.into_inner(); - let db_name = get_database_name(&read_filter_request)?; - info!(%db_name, ?read_filter_request.range, predicate=%read_filter_request.predicate.loggable(), "read filter"); + let req = req.into_inner(); + let db_name = get_database_name(&req)?; + info!(%db_name, ?req.range, predicate=%req.predicate.loggable(), "read filter"); - let results = read_filter_impl( - self.db_store.as_ref(), - db_name, - read_filter_request, - span_ctx, - ) - .await? - .into_iter() - .map(Ok) - .collect::>(); + let db = self + .db_store + .db(&db_name) + .context(DatabaseNotFound { db_name: &db_name })?; + db.record_query("read_filter", defer_json(&req)); + + let results = read_filter_impl(db, db_name, req, span_ctx) + .await? + .into_iter() + .map(Ok) + .collect::>(); Ok(tonic::Response::new(futures::stream::iter(results))) } @@ -238,9 +243,14 @@ where req: tonic::Request, ) -> Result, Status> { let span_ctx = req.extensions().get().cloned(); - let read_group_request = req.into_inner(); + let req = req.into_inner(); - let db_name = get_database_name(&read_group_request)?; + let db_name = get_database_name(&req)?; + let db = self + .db_store + .db(&db_name) + .context(DatabaseNotFound { db_name: &db_name })?; + db.record_query("read_group", defer_json(&req)); let ReadGroupRequest { read_source: _read_source, @@ -249,7 +259,7 @@ where group_keys, group, aggregate, - } = read_group_request; + } = req; info!(%db_name, ?range, ?group_keys, ?group, ?aggregate,predicate=%predicate.loggable(),"read_group"); @@ -265,19 +275,12 @@ where let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys) .context(ConvertingReadGroupAggregate { aggregate_string })?; - let results = query_group_impl( - self.db_store.as_ref(), - db_name, - range, - predicate, - gby_agg, - span_ctx, - ) - .await - .map_err(|e| e.to_status())? - .into_iter() - .map(Ok) - .collect::>(); + let results = query_group_impl(db, db_name, range, predicate, gby_agg, span_ctx) + .await + .map_err(|e| e.to_status())? 
+ .into_iter() + .map(Ok) + .collect::>(); Ok(tonic::Response::new(futures::stream::iter(results))) } @@ -290,9 +293,14 @@ where req: tonic::Request, ) -> Result, Status> { let span_ctx = req.extensions().get().cloned(); - let read_window_aggregate_request = req.into_inner(); + let req = req.into_inner(); - let db_name = get_database_name(&read_window_aggregate_request)?; + let db_name = get_database_name(&req)?; + let db = self + .db_store + .db(&db_name) + .context(DatabaseNotFound { db_name: &db_name })?; + db.record_query("read_window_aggregate", defer_json(&req)); let ReadWindowAggregateRequest { read_source: _read_source, @@ -302,7 +310,7 @@ where offset, aggregate, window, - } = read_window_aggregate_request; + } = req; info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(),"read_window_aggregate"); @@ -314,19 +322,12 @@ where let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window) .context(ConvertingWindowAggregate { aggregate_string })?; - let results = query_group_impl( - self.db_store.as_ref(), - db_name, - range, - predicate, - gby_agg, - span_ctx, - ) - .await - .map_err(|e| e.to_status())? - .into_iter() - .map(Ok) - .collect::>(); + let results = query_group_impl(db, db_name, range, predicate, gby_agg, span_ctx) + .await + .map_err(|e| e.to_status())? + .into_iter() + .map(Ok) + .collect::>(); Ok(tonic::Response::new(futures::stream::iter(results))) } @@ -340,30 +341,28 @@ where let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); - let tag_keys_request = req.into_inner(); + let req = req.into_inner(); - let db_name = get_database_name(&tag_keys_request)?; + let db_name = get_database_name(&req)?; + let db = self + .db_store + .db(&db_name) + .context(DatabaseNotFound { db_name: &db_name })?; + db.record_query("tag_keys", defer_json(&req)); let TagKeysRequest { tags_source: _tag_source, range, predicate, - } = tag_keys_request; + } = req; info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_keys"); let measurement = None; - let response = tag_keys_impl( - self.db_store.as_ref(), - db_name, - measurement, - range, - predicate, - span_ctx, - ) - .await - .map_err(|e| e.to_status()); + let response = tag_keys_impl(db, db_name, measurement, range, predicate, span_ctx) + .await + .map_err(|e| e.to_status()); tx.send(response) .await @@ -381,16 +380,21 @@ where let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); - let tag_values_request = req.into_inner(); + let req = req.into_inner(); - let db_name = get_database_name(&tag_values_request)?; + let db_name = get_database_name(&req)?; + let db = self + .db_store + .db(&db_name) + .context(DatabaseNotFound { db_name: &db_name })?; + db.record_query("tag_values", defer_json(&req)); let TagValuesRequest { tags_source: _tag_source, range, predicate, tag_key, - } = tag_values_request; + } = req; let measurement = None; @@ -406,19 +410,11 @@ where .to_status()); } - measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx).await + measurement_name_impl(db, db_name, range, span_ctx).await } else if tag_key.is_field() { info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_values with tag_key=[xff] (field name)"); - let fieldlist = field_names_impl( - self.db_store.as_ref(), - db_name, - None, - range, - predicate, - span_ctx, - ) - .await?; + let fieldlist = field_names_impl(db, db_name, None, range, predicate, span_ctx).await?; // Pick out the field names into a Vec>for 
return let values = fieldlist @@ -434,7 +430,7 @@ where info!(%db_name, ?range, %tag_key, predicate=%predicate.loggable(), "tag_values",); tag_values_impl( - self.db_store.as_ref(), + db, db_name, tag_key, measurement, @@ -460,7 +456,7 @@ where &self, _req: tonic::Request, ) -> Result, Status> { - unimplemented!("read_series_cardinality not yet implemented"); + unimplemented!("read_series_cardinality not yet implemented. https://github.com/influxdata/influxdb_iox/issues/447"); } async fn capabilities( @@ -508,15 +504,20 @@ where let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); - let measurement_names_request = req.into_inner(); + let req = req.into_inner(); - let db_name = get_database_name(&measurement_names_request)?; + let db_name = get_database_name(&req)?; + let db = self + .db_store + .db(&db_name) + .context(DatabaseNotFound { db_name: &db_name })?; + db.record_query("measurement_names", defer_json(&req)); let MeasurementNamesRequest { source: _source, range, predicate, - } = measurement_names_request; + } = req; if let Some(predicate) = predicate { return NotYetImplemented { @@ -531,7 +532,7 @@ where info!(%db_name, ?range, predicate=%predicate.loggable(), "measurement_names"); - let response = measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx) + let response = measurement_name_impl(db, db_name, range, span_ctx) .await .map_err(|e| e.to_status()); @@ -551,31 +552,29 @@ where let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); - let measurement_tag_keys_request = req.into_inner(); + let req = req.into_inner(); - let db_name = get_database_name(&measurement_tag_keys_request)?; + let db_name = get_database_name(&req)?; + let db = self + .db_store + .db(&db_name) + .context(DatabaseNotFound { db_name: &db_name })?; + db.record_query("measurement_tag_keys", defer_json(&req)); let MeasurementTagKeysRequest { source: _source, measurement, range, predicate, - } = measurement_tag_keys_request; + } = req; info!(%db_name, ?range, %measurement, predicate=%predicate.loggable(), "measurement_tag_keys"); let measurement = Some(measurement); - let response = tag_keys_impl( - self.db_store.as_ref(), - db_name, - measurement, - range, - predicate, - span_ctx, - ) - .await - .map_err(|e| e.to_status()); + let response = tag_keys_impl(db, db_name, measurement, range, predicate, span_ctx) + .await + .map_err(|e| e.to_status()); tx.send(response) .await @@ -593,9 +592,14 @@ where let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); - let measurement_tag_values_request = req.into_inner(); + let req = req.into_inner(); - let db_name = get_database_name(&measurement_tag_values_request)?; + let db_name = get_database_name(&req)?; + let db = self + .db_store + .db(&db_name) + .context(DatabaseNotFound { db_name: &db_name })?; + db.record_query("measurement_tag_values", defer_json(&req)); let MeasurementTagValuesRequest { source: _source, @@ -603,14 +607,14 @@ where range, predicate, tag_key, - } = measurement_tag_values_request; + } = req; info!(%db_name, ?range, %measurement, %tag_key, predicate=%predicate.loggable(), "measurement_tag_values"); let measurement = Some(measurement); let response = tag_values_impl( - self.db_store.as_ref(), + db, db_name, tag_key, measurement, @@ -637,36 +641,34 @@ where let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); - let measurement_fields_request = req.into_inner(); + let req = req.into_inner(); - let db_name = 
get_database_name(&measurement_fields_request)?; + let db_name = get_database_name(&req)?; + let db = self + .db_store + .db(&db_name) + .context(DatabaseNotFound { db_name: &db_name })?; + db.record_query("measurement_fields", defer_json(&req)); let MeasurementFieldsRequest { source: _source, measurement, range, predicate, - } = measurement_fields_request; + } = req; info!(%db_name, ?range, predicate=%predicate.loggable(), "measurement_fields"); let measurement = Some(measurement); - let response = field_names_impl( - self.db_store.as_ref(), - db_name, - measurement, - range, - predicate, - span_ctx, - ) - .await - .map(|fieldlist| { - fieldlist_to_measurement_fields_response(fieldlist) - .context(ConvertingFieldList) - .map_err(|e| e.to_status()) - }) - .map_err(|e| e.to_status())?; + let response = field_names_impl(db, db_name, measurement, range, predicate, span_ctx) + .await + .map(|fieldlist| { + fieldlist_to_measurement_fields_response(fieldlist) + .context(ConvertingFieldList) + .map_err(|e| e.to_status()) + }) + .map_err(|e| e.to_status())?; tx.send(response) .await @@ -714,19 +716,18 @@ fn get_database_name(input: &impl GrpcInputs) -> Result, S /// Gathers all measurement names that have data in the specified /// (optional) range -async fn measurement_name_impl( - db_store: &T, +async fn measurement_name_impl( + db: Arc, db_name: DatabaseName<'static>, range: Option, span_ctx: Option, ) -> Result where - T: DatabaseStore + 'static, + D: QueryDatabase + ExecutionContextProvider + 'static, { let predicate = PredicateBuilder::default().set_range(range).build(); - let db_name = db_name.as_ref(); + let db_name = db_name.as_str(); - let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; let ctx = db.new_query_context(span_ctx); let plan = Planner::new(&ctx) @@ -753,8 +754,8 @@ where /// Return tag keys with optional measurement, timestamp and arbitratry /// predicates -async fn tag_keys_impl( - db_store: &T, +async fn tag_keys_impl( + db: Arc, db_name: DatabaseName<'static>, measurement: Option, range: Option, @@ -762,9 +763,10 @@ async fn tag_keys_impl( span_ctx: Option, ) -> Result where - T: DatabaseStore + 'static, + D: QueryDatabase + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); + let db_name = db_name.as_str(); let predicate = PredicateBuilder::default() .set_range(range) @@ -775,27 +777,19 @@ where })? 
.build(); - let db = db_store.db(&db_name).context(DatabaseNotFound { - db_name: db_name.as_str(), - })?; - let ctx = db.new_query_context(span_ctx); let tag_key_plan = Planner::new(&ctx) .tag_keys(db, predicate) .await .map_err(|e| Box::new(e) as _) - .context(ListingColumns { - db_name: db_name.as_str(), - })?; + .context(ListingColumns { db_name })?; let tag_keys = ctx .to_string_set(tag_key_plan) .await .map_err(|e| Box::new(e) as _) - .context(ListingColumns { - db_name: db_name.as_str(), - })?; + .context(ListingColumns { db_name })?; // Map the resulting collection of Strings into a Vec>for return let values = tag_keys_to_byte_vecs(tag_keys); @@ -806,8 +800,8 @@ where /// Return tag values for tag_name, with optional measurement, timestamp and /// arbitratry predicates -async fn tag_values_impl( - db_store: &T, +async fn tag_values_impl( + db: Arc, db_name: DatabaseName<'static>, tag_name: String, measurement: Option, @@ -816,7 +810,7 @@ async fn tag_values_impl( span_ctx: Option, ) -> Result where - T: DatabaseStore + 'static, + D: QueryDatabase + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); @@ -832,7 +826,6 @@ where let db_name = db_name.as_str(); let tag_name = &tag_name; - let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; let ctx = db.new_query_context(span_ctx); let tag_value_plan = Planner::new(&ctx) @@ -858,15 +851,19 @@ where } /// Launch async tasks that materialises the result of executing read_filter. -async fn read_filter_impl( - db_store: &T, +async fn read_filter_impl( + db: Arc, db_name: DatabaseName<'static>, req: ReadFilterRequest, span_ctx: Option, ) -> Result, Error> where - T: DatabaseStore + 'static, + D: QueryDatabase + ExecutionContextProvider + 'static, { + let db_name = db_name.as_str(); + + let ctx = db.new_query_context(span_ctx); + let rpc_predicate_string = format!("{:?}", req.predicate); let predicate = PredicateBuilder::default() @@ -877,10 +874,6 @@ where })? .build(); - let db_name = db_name.as_str(); - let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; - let ctx = db.new_query_context(span_ctx); - // PERF - This used to send responses to the client before execution had // completed, but now it doesn't. We may need to revisit this in the future // if big queries are causing a significant latency in TTFB. @@ -907,8 +900,8 @@ where } /// Launch async tasks that send the result of executing read_group to `tx` -async fn query_group_impl( - db_store: &T, +async fn query_group_impl( + db: Arc, db_name: DatabaseName<'static>, range: Option, rpc_predicate: Option, @@ -916,8 +909,11 @@ async fn query_group_impl( span_ctx: Option, ) -> Result, Error> where - T: DatabaseStore + 'static, + D: QueryDatabase + ExecutionContextProvider + 'static, { + let db_name = db_name.as_str(); + let ctx = db.new_query_context(span_ctx); + let rpc_predicate_string = format!("{:?}", rpc_predicate); let predicate = PredicateBuilder::default() @@ -928,14 +924,6 @@ where })? 
.build(); - // keep original name so we can transfer ownership - // to closure below - let owned_db_name = db_name; - let db_name = owned_db_name.as_str(); - - let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; - let ctx = db.new_query_context(span_ctx); - let planner = Planner::new(&ctx); let grouped_series_set_plan = match gby_agg { GroupByAndAggregate::Columns { agg, group_columns } => { @@ -960,9 +948,7 @@ where .to_series_and_groups(grouped_series_set_plan) .await .map_err(|e| Box::new(e) as _) - .context(GroupingSeries { - db_name: owned_db_name.as_str(), - }) + .context(GroupingSeries { db_name }) .log_if_error("Running Grouped SeriesSet Plan")?; // ReadGroupRequest does not have a field to control the format of @@ -974,8 +960,8 @@ where /// Return field names, restricted via optional measurement, timestamp and /// predicate -async fn field_names_impl( - db_store: &T, +async fn field_names_impl( + db: Arc, db_name: DatabaseName<'static>, measurement: Option, range: Option, @@ -983,7 +969,7 @@ async fn field_names_impl( span_ctx: Option, ) -> Result where - T: DatabaseStore + 'static, + D: QueryDatabase + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); @@ -997,7 +983,6 @@ where .build(); let db_name = db_name.as_str(); - let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; let ctx = db.new_query_context(span_ctx); let field_list_plan = Planner::new(&ctx) @@ -1016,6 +1001,35 @@ where Ok(field_list) } +/// Return something which can be formatted as json ("pbjson" +/// specifically) +fn defer_json(s: &S) -> impl Into + '_ +where + S: serde::Serialize, +{ + /// Defers conversion into a String + struct DeferredToJson<'a, S> + where + S: serde::Serialize, + { + s: &'a S, + } + + impl From> for String + where + S: serde::Serialize, + { + fn from(w: DeferredToJson<'_, S>) -> Self { + match serde_json::to_string_pretty(&w.s) { + Ok(json) => json, + Err(e) => e.to_string(), + } + } + } + + DeferredToJson { s } +} + #[cfg(test)] mod tests { use std::{ diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/router/rpc/router.rs b/influxdb_iox/src/influxdb_ioxd/server_type/router/rpc/router.rs index d82d19bad5..d5aa5d138c 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/router/rpc/router.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/router/rpc/router.rs @@ -1,6 +1,9 @@ use std::sync::Arc; -use generated_types::{google::FromOptionalField, influxdata::iox::router::v1::*}; +use generated_types::{ + google::{FromOptionalField, NotFound, ResourceType}, + influxdata::iox::router::v1::*, +}; use router::server::RouterServer; use tonic::{Request, Response, Status}; @@ -10,6 +13,20 @@ struct RouterService { #[tonic::async_trait] impl router_service_server::RouterService for RouterService { + async fn get_router( + &self, + request: Request, + ) -> Result, Status> { + let GetRouterRequest { router_name } = request.into_inner(); + let router = self + .server + .router(&router_name) + .ok_or_else(|| NotFound::new(ResourceType::Router, router_name))?; + Ok(Response::new(GetRouterResponse { + router: Some(router.config().clone().into()), + })) + } + async fn list_routers( &self, _: Request, diff --git a/influxdb_iox/src/main.rs b/influxdb_iox/src/main.rs index 8067682952..7212801dd1 100644 --- a/influxdb_iox/src/main.rs +++ b/influxdb_iox/src/main.rs @@ -22,6 +22,7 @@ mod commands { pub mod database; pub mod debug; pub mod operations; + pub mod router; pub mod run; pub mod server; pub mod server_remote; @@ -147,6 
+148,7 @@ enum Command { Database(commands::database::Config), // Clippy recommended boxing this variant because it's much larger than the others Run(Box), + Router(commands::router::Config), Server(commands::server::Config), Operation(commands::operations::Config), Sql(commands::sql::Config), @@ -216,6 +218,14 @@ fn main() -> Result<(), std::io::Error> { std::process::exit(ReturnCode::Failure as _) } } + Command::Router(config) => { + let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); + let connection = connection().await; + if let Err(e) = commands::router::command(connection, config).await { + eprintln!("{}", e); + std::process::exit(ReturnCode::Failure as _) + } + } Command::Run(config) => { let _tracing_guard = handle_init_logs(init_logs_and_tracing(log_verbose_count, &config)); diff --git a/influxdb_iox/tests/end_to_end_cases/database_migration.rs b/influxdb_iox/tests/end_to_end_cases/database_migration.rs index d3e16f7825..d7d3540417 100644 --- a/influxdb_iox/tests/end_to_end_cases/database_migration.rs +++ b/influxdb_iox/tests/end_to_end_cases/database_migration.rs @@ -1,18 +1,29 @@ //! Contains tests using the CLI and other tools to test scenarios for //! moving data from one server/database to another -use std::path::{Path, PathBuf}; +use std::{ + path::{Path, PathBuf}, + time::Duration, +}; +use arrow_util::assert_batches_eq; use assert_cmd::Command; +use data_types::chunk_metadata::ChunkStorage; use predicates::prelude::*; use tempfile::TempDir; +use test_helpers::assert_contains; use uuid::Uuid; use crate::{ common::server_fixture::{ServerFixture, ServerType}, - end_to_end_cases::scenario::{data_dir, db_data_dir, Scenario}, + end_to_end_cases::scenario::{ + collect_query, create_readable_database, data_dir, db_data_dir, rand_name, + wait_for_exact_chunk_states, wait_for_operations_to_complete, + }, }; +use super::scenario::DatabaseBuilder; + /// Copy the `source_dir` directory into the `target_dir` directory using the `cp` command fn cp_dir(source_dir: impl AsRef, target_dir: impl AsRef) { let source_dir = source_dir.as_ref(); @@ -37,13 +48,14 @@ fn cp_dir(source_dir: impl AsRef, target_dir: impl AsRef) { .success(); } -/// Creates a new database on a shared server, writes data to it, -/// shuts it down cleanly, and copies the files to Tempdir/uuid +/// Creates a new database on a shared server, writes data to +/// `the_table` table, shuts it down cleanly, and copies the files to +/// Tempdir/uuid /// /// Returns (db_name, uuid, tmp_dir) async fn create_copied_database() -> (String, Uuid, TempDir) { - let server_fixture = ServerFixture::create_single_use(ServerType::Database).await; - let addr = server_fixture.grpc_base(); + let fixture = ServerFixture::create_single_use(ServerType::Database).await; + let addr = fixture.grpc_base(); Command::cargo_bin("influxdb_iox") .unwrap() @@ -66,37 +78,60 @@ async fn create_copied_database() -> (String, Uuid, TempDir) { .success() .stdout(predicate::str::contains("Server initialized.")); - let mut management_client = server_fixture.management_client(); - let scenario = Scenario::new(); - let (db_name, db_uuid) = scenario.create_database(&mut management_client).await; + let db_name = rand_name(); + let db_uuid = DatabaseBuilder::new(db_name.clone()) + .persist(true) + .persist_age_threshold_seconds(1) + .late_arrive_window_seconds(1) + .build(fixture.grpc_channel()) + .await; - // todo write data and force it to be written to disk + let lp_lines: Vec<_> = (0..1_000) + .map(|i| format!("the_table,tag1=val{} x={} 
{}", i, i * 10, i)) + .collect(); + + let mut write_client = fixture.write_client(); + + let num_lines_written = write_client + .write_lp(&db_name, lp_lines.join("\n"), 0) + .await + .expect("successful write"); + assert_eq!(num_lines_written, 1000); + + wait_for_exact_chunk_states( + &fixture, + &db_name, + vec![ChunkStorage::ReadBufferAndObjectStore], + Duration::from_secs(5), + ) + .await; // figure out where the database lives and copy its data to a temporary directory, // as you might copy data from remote object storage to local disk for debugging. - let source_dir = db_data_dir(server_fixture.dir(), db_uuid); + let source_dir = db_data_dir(fixture.dir(), db_uuid); let tmp_dir = TempDir::new().expect("making tmp dir"); cp_dir(source_dir, tmp_dir.path()); // stop the first server (note this call blocks until the process stops) - std::mem::drop(server_fixture); + std::mem::drop(fixture); (db_name.to_string(), db_uuid, tmp_dir) } #[tokio::test] -async fn migrate_database_files_from_one_server_to_another() { +async fn migrate_all_database_files_from_one_server_to_another() { + // Create a database with some parquet files let (db_name, db_uuid, tmp_dir) = create_copied_database().await; // Now start another server that can claim the database - let server_fixture = ServerFixture::create_shared(ServerType::Database).await; - let addr = server_fixture.grpc_base(); + let fixture = ServerFixture::create_shared(ServerType::Database).await; + let addr = fixture.grpc_base(); // copy the data from tmp_dir/ to the new server's location let mut source_dir: PathBuf = tmp_dir.path().into(); source_dir.push(db_uuid.to_string()); - let target_dir = data_dir(server_fixture.dir()); + let target_dir = data_dir(fixture.dir()); cp_dir(source_dir, &target_dir); // Claiming without --force doesn't work as owner.pb still record the other server owning it @@ -129,3 +164,89 @@ async fn migrate_database_files_from_one_server_to_another() { db_name ))); } + +#[tokio::test] +async fn migrate_table_files_from_one_server_to_another() { + let (_, db_uuid, tmp_dir) = create_copied_database().await; + + // Now start another server and create a database to receive the files + let fixture = ServerFixture::create_shared(ServerType::Database).await; + let addr = fixture.grpc_base(); + + let db_name = rand_name(); + let new_db_uuid = create_readable_database(&db_name, fixture.grpc_channel()).await; + let sql_query = "select count(*) from the_table"; + + // No data for the_table yet + let mut flight_client = fixture.flight_client(); + let query_results = flight_client + .perform_query(&db_name, sql_query) + .await + .unwrap_err() + .to_string(); + assert_contains!(query_results, "'the_table' not found"); + + // copy the data from tmp_dir//data/the_table to the new server's location + let mut source_dir: PathBuf = tmp_dir.path().into(); + source_dir.push(db_uuid.to_string()); + source_dir.push("data"); + source_dir.push("the_table"); + + let mut target_dir = db_data_dir(fixture.dir(), new_db_uuid); + target_dir.push("data"); + cp_dir(source_dir, &target_dir); + + // rebuilding without --force doesn't work as db is in ok state + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("recover") + .arg("rebuild") + .arg(&db_name) + .arg("--host") + .arg(addr) + .assert() + .failure() + .stderr(predicate::str::contains( + "in invalid state for catalog rebuild (Initialized). 
Expected (CatalogLoadError)", + )); + + // however with --force rebuilding will work + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("recover") + .arg("rebuild") + .arg(&db_name) + .arg("--force") // sudo make me a sandwich + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains("operation")); + + // Wait for the rebuild to complete (maybe not needed given we + // wait right below for the server to be initialized) + wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await; + + // Wait for all databases to complete re-initialization here + fixture.wait_server_initialized().await; + + // Now the data shoudl be available for the_table + let query_results = flight_client + .perform_query(&db_name, sql_query) + .await + .unwrap(); + + let batches = collect_query(query_results).await; + + let expected = vec![ + "+-----------------+", + "| COUNT(UInt8(1)) |", + "+-----------------+", + "| 1000 |", + "+-----------------+", + ]; + + assert_batches_eq!(expected, &batches); +} diff --git a/influxdb_iox/tests/end_to_end_cases/management_api.rs b/influxdb_iox/tests/end_to_end_cases/management_api.rs index bfed8e74a7..fdae0d48a8 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_api.rs @@ -1204,7 +1204,7 @@ async fn test_rebuild_preserved_catalog() { let mut management_client = fixture.management_client(); let mut operations_client = fixture.operations_client(); let iox_operation = management_client - .rebuild_preserved_catalog(&db_name) + .rebuild_preserved_catalog(&db_name, false) .await .unwrap(); diff --git a/influxdb_iox/tests/end_to_end_cases/mod.rs b/influxdb_iox/tests/end_to_end_cases/mod.rs index bd009b0613..df22d58e96 100644 --- a/influxdb_iox/tests/end_to_end_cases/mod.rs +++ b/influxdb_iox/tests/end_to_end_cases/mod.rs @@ -19,6 +19,7 @@ mod read_cli; mod remote_api; mod remote_cli; mod router_api; +mod router_cli; mod run_cli; pub mod scenario; mod sql_cli; diff --git a/influxdb_iox/tests/end_to_end_cases/router_api.rs b/influxdb_iox/tests/end_to_end_cases/router_api.rs index f00a2d9a28..e1d7baf8fd 100644 --- a/influxdb_iox/tests/end_to_end_cases/router_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/router_api.rs @@ -45,6 +45,7 @@ async fn test_router_crud() { // no routers assert_eq!(client.list_routers().await.unwrap().len(), 0); + assert_error!(client.get_router(&cfg_foo_1.name).await, Error::NotFound(_)); client.delete_router(&router_name_b).await.unwrap(); // add routers @@ -54,6 +55,8 @@ async fn test_router_crud() { assert_eq!(routers.len(), 2); assert_eq!(&routers[0], &cfg_bar); assert_eq!(&routers[1], &cfg_foo_1); + assert_eq!(client.get_router(&cfg_bar.name).await.unwrap(), cfg_bar); + assert_eq!(client.get_router(&cfg_foo_1.name).await.unwrap(), cfg_foo_1); // update router client.update_router(cfg_foo_2.clone()).await.unwrap(); @@ -61,12 +64,18 @@ async fn test_router_crud() { assert_eq!(routers.len(), 2); assert_eq!(&routers[0], &cfg_bar); assert_eq!(&routers[1], &cfg_foo_2); + assert_eq!(client.get_router(&cfg_bar.name).await.unwrap(), cfg_bar); + assert_eq!(client.get_router(&cfg_foo_2.name).await.unwrap(), cfg_foo_2); // delete routers client.delete_router(&router_name_b).await.unwrap(); let routers = client.list_routers().await.unwrap(); assert_eq!(routers.len(), 1); assert_eq!(&routers[0], &cfg_bar); + assert_eq!(client.get_router(&cfg_bar.name).await.unwrap(), cfg_bar); + 
assert_error!(client.get_router(&cfg_foo_2.name).await, Error::NotFound(_)); + + // deleting router a second time is a no-op client.delete_router(&router_name_b).await.unwrap(); } diff --git a/influxdb_iox/tests/end_to_end_cases/router_cli.rs b/influxdb_iox/tests/end_to_end_cases/router_cli.rs new file mode 100644 index 0000000000..5695879c43 --- /dev/null +++ b/influxdb_iox/tests/end_to_end_cases/router_cli.rs @@ -0,0 +1,91 @@ +use crate::{ + common::server_fixture::{ServerFixture, ServerType}, + end_to_end_cases::scenario::rand_name, +}; +use assert_cmd::Command; +use predicates::prelude::*; + +#[tokio::test] +async fn test_router_crud() { + let server_fixture = ServerFixture::create_shared(ServerType::Router).await; + let addr = server_fixture.grpc_base(); + let router_name = rand_name(); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("router") + .arg("get") + .arg(&router_name) + .arg("--host") + .arg(addr) + .assert() + .failure() + .stderr(predicate::str::contains(format!( + "Resource router/{} not found", + router_name, + ))); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("router") + .arg("create-or-update") + .arg(&router_name) + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains(format!( + "Created/Updated router {}", + router_name + ))); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("router") + .arg("list") + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains(&router_name)); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("router") + .arg("get") + .arg(&router_name) + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout( + predicate::str::contains(&router_name).and(predicate::str::contains(format!( + r#""name": "{}"#, + &router_name + ))), // validate the defaults have been set reasonably + ); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("router") + .arg("delete") + .arg(&router_name) + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains(format!( + "Deleted router {}", + router_name + ))); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("router") + .arg("list") + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains(&router_name).not()); +} diff --git a/influxdb_iox/tests/end_to_end_cases/scenario.rs b/influxdb_iox/tests/end_to_end_cases/scenario.rs index 508c1d0221..c80bc28e99 100644 --- a/influxdb_iox/tests/end_to_end_cases/scenario.rs +++ b/influxdb_iox/tests/end_to_end_cases/scenario.rs @@ -127,6 +127,28 @@ impl Scenario { }) } + /// returns a function suitable for normalizing output that + /// contains org and bucket ids. 
+ /// + /// Specifically, the function will replace all instances of + /// `org_id` with `XXXXXXXXXXXXXXXX` and the `bucket_id` with a + /// `YYYYYYYYYYYYYY`, and the read source with `ZZZZZZZZZZZZZZZZ` + pub fn normalizer(&self) -> impl Fn(&str) -> String { + let org_id = self.org_id.clone(); + let bucket_id = self.bucket_id.clone(); + + // also, the actual gRPC request has the org id encoded in the ReadSource, + // \"value\": \"CLmSwbj3opLLdRCWrJ2bgoeRw5kBGP////8P\" |", + let read_source_value = self.read_source().unwrap().value; + let read_source_value = base64::encode(&read_source_value); + + move |s: &str| { + s.replace(&org_id, "XXXXXXXXXXXXXXXX") + .replace(&bucket_id, "YYYYYYYYYYYYYY") + .replace(&read_source_value, "ZZZZZZZZZZZZZZZZ") + } + } + /// Creates the database on the server for this scenario, /// returning (name, uuid) pub async fn create_database( @@ -314,9 +336,9 @@ pub struct DatabaseBuilder { } impl DatabaseBuilder { - pub fn new(name: String) -> Self { + pub fn new(name: impl Into) -> Self { Self { - name, + name: name.into(), partition_template: PartitionTemplate { parts: vec![partition_template::Part { part: Some(partition_template::part::Part::Table(Empty {})), @@ -377,11 +399,11 @@ impl DatabaseBuilder { self } - // Build a database + // Build a database, returning the UUID of the created database pub async fn try_build( self, channel: Connection, - ) -> Result<(), influxdb_iox_client::error::Error> { + ) -> Result { let mut management_client = management::Client::new(channel); management_client @@ -392,22 +414,21 @@ impl DatabaseBuilder { worker_cleanup_avg_sleep: None, write_buffer_connection: self.write_buffer, }) - .await?; - Ok(()) + .await } // Build a database - pub async fn build(self, channel: Connection) { + pub async fn build(self, channel: Connection) -> Uuid { self.try_build(channel) .await - .expect("create database failed"); + .expect("create database failed") } } /// given a channel to talk with the management api, create a new /// database with the specified name configured with a 10MB mutable -/// buffer, partitioned on table -pub async fn create_readable_database(db_name: impl Into, channel: Connection) { +/// buffer, partitioned on table, returning the UUID of the created database +pub async fn create_readable_database(db_name: impl Into, channel: Connection) -> Uuid { DatabaseBuilder::new(db_name.into()).build(channel).await } @@ -522,6 +543,52 @@ where } } +// Wait for up to `wait_time` all operations to be complete +pub async fn wait_for_operations_to_complete( + fixture: &ServerFixture, + db_name: &str, + wait_time: std::time::Duration, +) { + let t_start = std::time::Instant::now(); + let mut operations_client = fixture.operations_client(); + + loop { + let mut operations = operations_client.list_operations().await.unwrap(); + operations.sort_by(|a, b| a.operation.name.cmp(&b.operation.name)); + + // if all operations are complete, great! 
+ let all_ops_done = operations + .iter() + .filter(|op| { + // job name matches + op.metadata + .job + .as_ref() + .map(|job| job.db_name() == db_name) + .unwrap_or(false) + }) + .all(|op| op.operation.done); + + if all_ops_done { + println!( + "All operations for {} complete after {:?}:\n\n{:#?}", + db_name, + t_start.elapsed(), + operations + ); + return; + } + + if t_start.elapsed() >= wait_time { + panic!( + "Operations for {} did not complete in {:?}:\n\n{:#?}", + db_name, wait_time, operations + ); + } + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } +} + /// Gets the list of ChunkSummaries from the server pub async fn list_chunks(fixture: &ServerFixture, db_name: &str) -> Vec { let mut management_client = fixture.management_client(); diff --git a/influxdb_iox/tests/end_to_end_cases/system_tables.rs b/influxdb_iox/tests/end_to_end_cases/system_tables.rs index 36fdfb20ba..2226d455a6 100644 --- a/influxdb_iox/tests/end_to_end_cases/system_tables.rs +++ b/influxdb_iox/tests/end_to_end_cases/system_tables.rs @@ -1,5 +1,8 @@ -use crate::common::server_fixture::{ServerFixture, ServerType}; -use arrow_util::assert_batches_eq; +use crate::{ + common::server_fixture::{ServerFixture, ServerType}, + end_to_end_cases::scenario::Scenario, +}; +use arrow_util::{assert_batches_eq, test_util::normalize_batches}; use super::scenario::{collect_query, create_readable_database, list_chunks, rand_name}; @@ -73,3 +76,64 @@ async fn test_operations() { assert_batches_eq!(expected_read_data, &batches); } + +#[tokio::test] +async fn test_queries() { + let fixture = ServerFixture::create_shared(ServerType::Database).await; + + let scenario = Scenario::new(); + let (db_name, _db_uuid) = scenario + .create_database(&mut fixture.management_client()) + .await; + + // issue a storage gRPC query as well (likewise will error, but we + // are just checking that things are hooked up here). 
+ let read_source = scenario.read_source(); + let range = Some(generated_types::TimestampRange { + start: 111111, + end: 222222, + }); + + let read_filter_request = tonic::Request::new(generated_types::ReadFilterRequest { + read_source, + range, + ..Default::default() + }); + fixture + .storage_client() + .read_filter(read_filter_request) + .await + .unwrap(); + + // Note: don't select issue_time as that changes from run to run + let query = "select query_type, query_text from system.queries"; + + // Query system.queries and should have an entry for the storage rpc + let query_results = fixture + .flight_client() + .perform_query(&db_name, query) + .await + .unwrap(); + + let batches = collect_query(query_results).await; + let batches = normalize_batches(batches, scenario.normalizer()); + + let expected_read_data = vec![ + "+-------------+---------------------------------------------------+", + "| query_type | query_text |", + "+-------------+---------------------------------------------------+", + "| read_filter | { |", + "| | \"ReadSource\": { |", + "| | \"typeUrl\": \"/TODO\", |", + "| | \"value\": \"ZZZZZZZZZZZZZZZZ\" |", + "| | }, |", + "| | \"range\": { |", + "| | \"start\": \"111111\", |", + "| | \"end\": \"222222\" |", + "| | } |", + "| | } |", + "| sql | select query_type, query_text from system.queries |", + "+-------------+---------------------------------------------------+", + ]; + assert_batches_eq!(expected_read_data, &batches); +} diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index 4d56431aeb..3ec031cdb9 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -429,12 +429,13 @@ impl Client { pub async fn rebuild_preserved_catalog( &mut self, db_name: impl Into + Send, + force: bool, ) -> Result { let db_name = db_name.into(); let response = self .inner - .rebuild_preserved_catalog(RebuildPreservedCatalogRequest { db_name }) + .rebuild_preserved_catalog(RebuildPreservedCatalogRequest { db_name, force }) .await?; Ok(response diff --git a/influxdb_iox_client/src/client/router.rs b/influxdb_iox_client/src/client/router.rs index 9585ce0cd7..fbb2a73f5d 100644 --- a/influxdb_iox_client/src/client/router.rs +++ b/influxdb_iox_client/src/client/router.rs @@ -1,3 +1,5 @@ +use ::generated_types::google::OptionalField; + use self::generated_types::{router_service_client::RouterServiceClient, *}; use crate::connection::Connection; @@ -49,6 +51,21 @@ impl Client { } } + /// Get router + pub async fn get_router( + &mut self, + router_name: &str, + ) -> Result { + let response = self + .inner + .get_router(GetRouterRequest { + router_name: router_name.to_string(), + }) + .await?; + + Ok(response.into_inner().router.unwrap_field("router")?) + } + /// List routers. 
pub async fn list_routers(&mut self) -> Result, Error> { let response = self.inner.list_routers(ListRoutersRequest {}).await?; diff --git a/parquet_catalog/src/cleanup.rs b/parquet_catalog/src/cleanup.rs index 507bcc2e11..ccefeff05d 100644 --- a/parquet_catalog/src/cleanup.rs +++ b/parquet_catalog/src/cleanup.rs @@ -187,19 +187,19 @@ mod tests { let mut transaction = catalog.open_transaction().await; // an ordinary tracked parquet file => keep - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk)); paths_keep.push(chunk.path().clone()); // another ordinary tracked parquet file that was added and removed => keep (for time // travel) - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk)); transaction.remove_parquet(chunk.path()); paths_keep.push(chunk.path().clone()); // an untracked parquet file => delete - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); paths_delete.push(chunk.path().clone()); transaction.commit().await.unwrap(); @@ -240,14 +240,14 @@ mod tests { // not trick the cleanup logic to remove the actual file because file paths contains a // UUIDv4 part. if i % 2 == 0 { - generator.generate_id(i).await; + generator.generate_id(i).await.unwrap(); } let (chunk, _) = tokio::join!( async { let guard = lock.read().await; - let (chunk, _) = generator.generate_id(i).await; + let (chunk, _) = generator.generate_id(i).await.unwrap(); let mut transaction = catalog.open_transaction().await; transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk)); @@ -284,7 +284,7 @@ mod tests { // create some files let mut to_remove = HashSet::default(); for _ in 0..3 { - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); to_remove.insert(chunk.path().clone()); } diff --git a/parquet_catalog/src/core.rs b/parquet_catalog/src/core.rs index cbc1819f9c..3c82b60dee 100644 --- a/parquet_catalog/src/core.rs +++ b/parquet_catalog/src/core.rs @@ -1663,7 +1663,7 @@ mod tests { // create another transaction on-top that adds a file (this transaction will be required to load the full state) { - let (chunk, _) = generator.generate_id(1337).await; + let (chunk, _) = generator.generate_id(1337).await.unwrap(); let mut transaction = catalog.open_transaction().await; let info = CatalogParquetInfo::from_chunk(&chunk); @@ -1717,7 +1717,7 @@ mod tests { // create 3 chunks let mut chunk_addrs = vec![]; for _ in 0..3 { - let (chunk, metadata) = generator.generate().await; + let (chunk, metadata) = generator.generate().await.unwrap(); let chunk_addr = ChunkAddr::new(generator.partition(), metadata.chunk_id); let info = CatalogParquetInfo::from_chunk(&chunk); @@ -1902,7 +1902,7 @@ mod tests { let mut t = catalog.open_transaction().await; for _ in 0..4 { - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let info = CatalogParquetInfo::from_chunk(&chunk); expected.push(chunk); state.insert(info.clone()).unwrap(); @@ -1917,7 +1917,7 @@ mod tests { // modify catalog with examples { - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let info = CatalogParquetInfo::from_chunk(&chunk); expected.push(chunk); @@ -1941,7 +1941,7 @@ mod tests { { let mut t = catalog.open_transaction().await; - let (chunk, _) 
= generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let info = CatalogParquetInfo::from_chunk(&chunk); t.add_parquet(&info); diff --git a/parquet_catalog/src/dump.rs b/parquet_catalog/src/dump.rs index f132d88839..7bab580531 100644 --- a/parquet_catalog/src/dump.rs +++ b/parquet_catalog/src/dump.rs @@ -241,7 +241,7 @@ mod tests { // build catalog with some data let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap(); { - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let mut transaction = catalog.open_transaction().await; transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk)); transaction.commit().await.unwrap(); @@ -352,7 +352,7 @@ File { // build catalog with some data let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap(); { - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let mut transaction = catalog.open_transaction().await; transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk)); transaction.commit().await.unwrap(); diff --git a/parquet_catalog/src/rebuild.rs b/parquet_catalog/src/rebuild.rs index caf8613725..67d9935e46 100644 --- a/parquet_catalog/src/rebuild.rs +++ b/parquet_catalog/src/rebuild.rs @@ -26,6 +26,9 @@ pub enum Error { path: ParquetFilePath, }, + #[snafu(display("No row groups from parquet file ({:?})", path))] + NoRowGroups { path: ParquetFilePath }, + #[snafu(display("Cannot add file to transaction: {}", source))] FileRecordFailure { source: crate::interface::CatalogStateAddError, @@ -143,7 +146,12 @@ async fn read_parquet( let file_size_bytes = data.len(); let parquet_metadata = IoxParquetMetaData::from_file_bytes(data) - .context(MetadataReadFailure { path: path.clone() })?; + .context(MetadataReadFailure { path: path.clone() })?; // Error reading metadata + + if parquet_metadata.is_none() { + return NoRowGroups { path: path.clone() }.fail(); + } // No data and hence no metadata + let parquet_metadata = parquet_metadata.unwrap(); // validate IOxMetadata parquet_metadata @@ -417,7 +425,9 @@ mod tests { } // drop the reference to the MemWriter that the SerializedFileWriter has let data = mem_writer.into_inner().unwrap(); - let md = IoxParquetMetaData::from_file_bytes(data.clone()).unwrap(); + let md = IoxParquetMetaData::from_file_bytes(data.clone()) + .unwrap() + .unwrap(); let storage = Storage::new(Arc::clone(iox_object_store)); let chunk_addr = ChunkAddr { db_name: Arc::clone(db_name), diff --git a/parquet_catalog/src/test_helpers.rs b/parquet_catalog/src/test_helpers.rs index 3eb06d4e54..9bd3bfcd3d 100644 --- a/parquet_catalog/src/test_helpers.rs +++ b/parquet_catalog/src/test_helpers.rs @@ -274,7 +274,7 @@ where // add files { for chunk_id in 1..5 { - let (chunk, _) = generator.generate_id(chunk_id).await; + let (chunk, _) = generator.generate_id(chunk_id).await.unwrap(); state .add( Arc::clone(iox_object_store), @@ -295,7 +295,7 @@ where // add and remove in the same transaction { - let (chunk, _) = generator.generate_id(5).await; + let (chunk, _) = generator.generate_id(5).await.unwrap(); state .add( Arc::clone(iox_object_store), @@ -321,7 +321,7 @@ where // add, remove, add in the same transaction { - let (chunk, _) = generator.generate_id(6).await; + let (chunk, _) = generator.generate_id(6).await.unwrap(); state .add( Arc::clone(iox_object_store), @@ -358,7 +358,7 @@ where // TODO: Error handling should disambiguate between chunk collision and filename collision 
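The `.unwrap()` calls added throughout these catalog tests reflect a signature change made later in this diff (parquet_file/src/test_utils/generator.rs): `ChunkGenerator::generate` and `generate_id` now return `Option<(ParquetChunk, IoxMetadata)>`, yielding `None` when no parquet data is produced. A minimal sketch of the new contract, using only calls that appear in this diff:

```rust
// Sketch of the new generator contract exercised by the catalog tests above;
// ChunkGenerator / GeneratorConfig come from parquet_file::test_utils::generator
// (see the generator.rs hunk later in this diff).
use parquet_file::test_utils::generator::{ChunkGenerator, GeneratorConfig};

#[tokio::test]
async fn generate_is_now_optional() {
    let mut generator = ChunkGenerator::new().await;

    // with data, a parquet chunk and its IoxMetadata are produced
    let (chunk, _metadata) = generator.generate().await.unwrap();
    let _path = chunk.path().clone(); // same accessor the cleanup tests use

    // with GeneratorConfig::NoData nothing is written, so the result is None
    generator.set_config(GeneratorConfig::NoData);
    assert!(generator.generate().await.is_none());
}
```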
// chunk with same ID already exists (should also not change the metadata) - let (chunk, _) = generator.generate_id(2).await; + let (chunk, _) = generator.generate_id(2).await.unwrap(); let err = state .add( Arc::clone(iox_object_store), @@ -375,7 +375,7 @@ where // error handling, still something works { // already exists (should also not change the metadata) - let (chunk, _) = generator.generate_id(2).await; + let (chunk, _) = generator.generate_id(2).await.unwrap(); let err = state .add( Arc::clone(iox_object_store), @@ -388,7 +388,7 @@ where )); // this transaction will still work - let (chunk, _) = generator.generate_id(7).await; + let (chunk, _) = generator.generate_id(7).await.unwrap(); let info = CatalogParquetInfo::from_chunk(&chunk); state .add(Arc::clone(iox_object_store), info.clone()) @@ -418,7 +418,7 @@ where // add predicates { // create two chunks that we can use for delete predicate - let (chunk, metadata) = generator.generate_id(8).await; + let (chunk, metadata) = generator.generate_id(8).await.unwrap(); let chunk_addr_1 = ChunkAddr::new(generator.partition(), metadata.chunk_id); state @@ -429,7 +429,7 @@ where .unwrap(); expected_chunks.insert(8, chunk); - let (chunk, metadata) = generator.generate_id(9).await; + let (chunk, metadata) = generator.generate_id(9).await.unwrap(); let chunk_addr_2 = ChunkAddr::new(generator.partition(), metadata.chunk_id); state @@ -453,7 +453,7 @@ where expected_predicates.insert(predicate_2, chunks_2.into_iter().collect()); // chunks created afterwards are unaffected - let (chunk, _) = generator.generate_id(10).await; + let (chunk, _) = generator.generate_id(10).await.unwrap(); state .add( Arc::clone(iox_object_store), diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 47b1492829..8507ad1e17 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -482,12 +482,17 @@ pub struct IoxParquetMetaData { impl IoxParquetMetaData { /// Read parquet metadata from a parquet file. - pub fn from_file_bytes(data: Vec) -> Result { + pub fn from_file_bytes(data: Vec) -> Result> { + if data.is_empty() { + return Ok(None); + } + let cursor = SliceableCursor::new(data); let reader = SerializedFileReader::new(cursor).context(ParquetMetaDataRead {})?; let parquet_md = reader.metadata().clone(); + let data = Self::parquet_md_to_thrift(parquet_md)?; - Ok(Self::from_thrift_bytes(data)) + Ok(Some(Self::from_thrift_bytes(data))) } /// Read parquet metadata from thrift bytes. 
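`IoxParquetMetaData::from_file_bytes` now returns `Result<Option<Self>>`, with `Ok(None)` signalling an empty byte payload rather than an error. A hedged sketch of the caller pattern (the tests in this diff simply chain `.unwrap().unwrap()`; the byte payload is assumed to be `Vec<u8>`, as elsewhere in parquet_file):

```rust
// Sketch of handling the two-layer return introduced in parquet_file/src/metadata.rs.
use parquet_file::metadata::IoxParquetMetaData;

fn metadata_from_bytes(data: Vec<u8>) -> Option<IoxParquetMetaData> {
    match IoxParquetMetaData::from_file_bytes(data) {
        // the file had row groups and its metadata parsed cleanly
        Ok(Some(md)) => Some(md),
        // empty input: nothing was written, so there is no metadata to extract
        Ok(None) => None,
        // genuine read/parse failure
        Err(e) => panic!("error reading parquet metadata: {}", e),
    }
}
```

Production callers in this diff map the `None` case to explicit errors instead (`NoRowGroups` in rebuild.rs, `NoData` in storage.rs).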
@@ -877,7 +882,7 @@ mod tests { async fn test_restore_from_file() { // setup: preserve chunk to object store let mut generator = ChunkGenerator::new().await; - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let parquet_metadata = chunk.parquet_metadata(); let decoded = parquet_metadata.decode().unwrap(); @@ -901,7 +906,7 @@ mod tests { async fn test_restore_from_thrift() { // setup: write chunk to object store and only keep thrift-encoded metadata let mut generator = ChunkGenerator::new().await; - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let parquet_metadata = chunk.parquet_metadata(); let data = parquet_metadata.thrift_bytes().to_vec(); let parquet_metadata = IoxParquetMetaData::from_thrift_bytes(data); @@ -923,52 +928,14 @@ mod tests { // setup: preserve chunk to object store let mut generator = ChunkGenerator::new().await; generator.set_config(GeneratorConfig::NoData); - let (chunk, _) = generator.generate().await; - let parquet_metadata = chunk.parquet_metadata(); - let decoded = parquet_metadata.decode().unwrap(); - - // step 1: read back schema - let schema_actual = decoded.read_schema().unwrap(); - let schema_expected = chunk.schema(); - assert_eq!(schema_actual, schema_expected); - - // step 2: reading back statistics fails - let res = decoded.read_statistics(&schema_actual); - assert_eq!( - res.unwrap_err().to_string(), - "No row group found, cannot recover statistics" - ); - } - - #[tokio::test] - async fn test_restore_from_thrift_no_row_group() { - // setup: write chunk to object store and only keep thrift-encoded metadata - let mut generator = ChunkGenerator::new().await; - generator.set_config(GeneratorConfig::NoData); - let (chunk, _) = generator.generate().await; - let parquet_metadata = chunk.parquet_metadata(); - - let data = parquet_metadata.thrift_bytes().to_vec(); - let parquet_metadata = IoxParquetMetaData::from_thrift_bytes(data); - let decoded = parquet_metadata.decode().unwrap(); - - // step 1: read back schema - let schema_actual = decoded.read_schema().unwrap(); - let schema_expected = chunk.schema(); - assert_eq!(schema_actual, schema_expected); - - // step 2: reading back statistics fails - let res = decoded.read_statistics(&schema_actual); - assert_eq!( - res.unwrap_err().to_string(), - "No row group found, cannot recover statistics" - ); + let result = generator.generate().await; + assert!(result.is_none()); } #[tokio::test] async fn test_make_chunk() { let mut generator = ChunkGenerator::new().await; - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let parquet_metadata = chunk.parquet_metadata(); let decoded = parquet_metadata.decode().unwrap(); @@ -994,30 +961,6 @@ mod tests { } } - #[tokio::test] - async fn test_make_chunk_no_row_group() { - let mut generator = ChunkGenerator::new().await; - generator.set_config(GeneratorConfig::NoData); - let (chunk, _) = generator.generate().await; - let parquet_metadata = chunk.parquet_metadata(); - let decoded = parquet_metadata.decode().unwrap(); - - assert_eq!(decoded.md.num_row_groups(), 0); - assert_ne!(decoded.md.file_metadata().schema_descr().num_columns(), 0); - assert_eq!(decoded.md.file_metadata().num_rows(), 0); - - // column count in summary including the timestamp column - assert_eq!( - chunk.table_summary().columns.len(), - decoded.md.file_metadata().schema_descr().num_columns() - ); - - // check column names - for column in 
decoded.md.file_metadata().schema_descr().columns() { - assert!((column.name() == TIME_COLUMN_NAME) || column.name().starts_with("foo_")); - } - } - #[test] fn test_iox_metadata_from_protobuf_checks_version() { let table_name = Arc::from("table1"); @@ -1062,7 +1005,7 @@ mod tests { async fn test_parquet_metadata_size() { // setup: preserve chunk to object store let mut generator = ChunkGenerator::new().await; - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let parquet_metadata = chunk.parquet_metadata(); assert_eq!(parquet_metadata.size(), 3729); } diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 343dbe0e24..e6cdc1fb76 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -77,6 +77,9 @@ pub enum Error { ParquetReader { source: parquet::errors::ParquetError, }, + + #[snafu(display("No data to convert to parquet"))] + NoData {}, } pub type Result = std::result::Result; @@ -109,14 +112,16 @@ impl Storage { let schema = stream.schema(); let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?; - // TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504) + // no data if data.is_empty() { return Ok(None); } + // TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504) let file_size_bytes = data.len(); - let md = - IoxParquetMetaData::from_file_bytes(data.clone()).context(ExtractingMetadataFailure)?; + let md = IoxParquetMetaData::from_file_bytes(data.clone()) + .context(ExtractingMetadataFailure)? + .context(NoData)?; self.to_object_store(data, &path).await?; Ok(Some((path, file_size_bytes, md))) @@ -146,10 +151,15 @@ impl Storage { { let mut writer = ArrowWriter::try_new(mem_writer.clone(), schema, Some(props)) .context(OpeningParquetWriter)?; + let mut no_stream_data = true; while let Some(batch) = stream.next().await { + no_stream_data = false; let batch = batch.context(ReadingStream)?; writer.write(&batch).context(WritingParquetToMemory)?; } + if no_stream_data { + return Ok(vec![]); + } writer.close().context(ClosingParquetWriter)?; } // drop the reference to the MemWriter that the SerializedFileWriter has @@ -362,10 +372,10 @@ mod tests { }; // create parquet file - let (_record_batches, schema, _column_summaries, _num_rows) = - make_record_batch("foo", TestSize::Full); + let (record_batches, schema, _column_summaries, _num_rows) = + make_record_batch("foo", TestSize::Minimal); let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new_with_schema( - vec![], + record_batches, Arc::clone(schema.inner()), )); let bytes = @@ -374,7 +384,7 @@ mod tests { .unwrap(); // extract metadata - let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap(); + let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap().unwrap(); let metadata_roundtrip = md.decode().unwrap().read_iox_metadata().unwrap(); // compare with input @@ -485,7 +495,7 @@ mod tests { // Store the data as a chunk and write it to in the object store // This tests Storage::write_to_object_store let mut generator = ChunkGenerator::new().await; - let (chunk, _) = generator.generate().await; + let (chunk, _) = generator.generate().await.unwrap(); let key_value_metadata = chunk.schema().as_arrow().metadata().clone(); //////////////////// @@ -494,7 +504,9 @@ mod tests { let parquet_data = load_parquet_from_store(&chunk, Arc::clone(generator.store())) .await .unwrap(); - let parquet_metadata = 
IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap(); + let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()) + .unwrap() + .unwrap(); let decoded = parquet_metadata.decode().unwrap(); // // 1. Check metadata at file level: Everything is correct diff --git a/parquet_file/src/test_utils/generator.rs b/parquet_file/src/test_utils/generator.rs index fe44b67ccf..ade2faea67 100644 --- a/parquet_file/src/test_utils/generator.rs +++ b/parquet_file/src/test_utils/generator.rs @@ -66,13 +66,13 @@ impl ChunkGenerator { &self.partition } - pub async fn generate(&mut self) -> (ParquetChunk, IoxMetadata) { + pub async fn generate(&mut self) -> Option<(ParquetChunk, IoxMetadata)> { let id = self.next_chunk; self.next_chunk += 1; self.generate_id(id).await } - pub async fn generate_id(&mut self, id: u32) -> (ParquetChunk, IoxMetadata) { + pub async fn generate_id(&mut self, id: u32) -> Option<(ParquetChunk, IoxMetadata)> { let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint( Arc::clone(&self.partition.table_name), Arc::clone(&self.partition.partition_key), @@ -116,13 +116,15 @@ impl ChunkGenerator { Arc::clone(schema.inner()), )); - let (path, file_size_bytes, parquet_metadata) = self + let written_result = self .storage .write_to_object_store(chunk_addr, stream, metadata.clone()) .await - .unwrap() .unwrap(); + written_result.as_ref()?; + let (path, file_size_bytes, parquet_metadata) = written_result.unwrap(); + let chunk = ParquetChunk::new_from_parts( Arc::clone(&self.partition.partition_key), Arc::new(table_summary), @@ -135,6 +137,6 @@ impl ChunkGenerator { ChunkMetrics::new_unregistered(), ); - (chunk, metadata) + Some((chunk, metadata)) } } diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index e4aeec6616..7fc4550f8c 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -48,6 +48,38 @@ impl ReorgPlanner { Self::default() } + /// Creates an execution plan for a full scan of a single chunk. + /// This plan is primarilty used to load chunks from one storage medium to + /// another. + pub fn scan_single_chunk_plan( + &self, + schema: Arc, + chunk: Arc, + ) -> Result + where + C: QueryChunk + 'static, + { + let table_name = chunk.table_name(); + // Prepare the plan for the table + let mut builder = ProviderBuilder::new(table_name, schema); + + // There are no predicates in these plans, so no need to prune them + builder = builder.add_no_op_pruner(); + builder = builder.add_chunk(Arc::clone(&chunk)); + + let provider = builder.build().context(CreatingProvider { table_name })?; + + // Logical plan to scan all columns with no predicates + let plan = LogicalPlanBuilder::scan(table_name, Arc::new(provider) as _, None) + .context(BuildingPlan)? + .build() + .context(BuildingPlan)?; + + debug!(%table_name, plan=%plan.display_indent_schema(), + "created single chunk scan plan"); + Ok(plan) + } + /// Creates an execution plan for the COMPACT operations which does the following: /// /// 1. Merges chunks together into a single stream @@ -186,12 +218,13 @@ impl ReorgPlanner { } /// Creates a scan plan for the given set of chunks. - /// Output data of the scan will be deduplicated and sorted - /// on the optimal sort order of the chunks' PK columns (tags and time). + /// Output data of the scan will be deduplicated sorted if `sort=true` on + /// the optimal sort order of the chunks' PK columns (tags and time). 
+ /// /// The optimal sort order is computed based on the PK columns cardinality /// that will be best for RLE encoding. /// - /// Prefer to query::provider::build_scan_plan for the detail of the plan + /// Refer to query::provider::build_scan_plan for the detail of the plan /// fn sorted_scan_plan(&self, schema: Arc, chunks: I) -> Result> where @@ -207,7 +240,6 @@ impl ReorgPlanner { // Prepare the plan for the table let mut builder = ProviderBuilder::new(table_name, schema); - // Tell the scan of this provider to sort its output on the chunks' PK builder.ensure_pk_sort(); @@ -318,6 +350,45 @@ mod test { (Arc::new(schema), vec![chunk1, chunk2]) } + #[tokio::test] + async fn test_sorted_scan_plan() { + test_helpers::maybe_start_logging(); + + let (schema, chunks) = get_test_chunks().await; + let scan_plan = ReorgPlanner::new() + .scan_single_chunk_plan(schema, chunks.into_iter().next().unwrap()) + .expect("created compact plan"); + + let executor = Executor::new(1); + let physical_plan = executor + .new_context(ExecutorType::Reorg) + .prepare_plan(&scan_plan) + .await + .unwrap(); + + // single chunk processed + assert_eq!(physical_plan.output_partitioning().partition_count(), 1); + + let batches = datafusion::physical_plan::collect(physical_plan) + .await + .unwrap(); + + // all data from chunk + let expected = vec![ + "+-----------+------------+------+--------------------------------+", + "| field_int | field_int2 | tag1 | time |", + "+-----------+------------+------+--------------------------------+", + "| 1000 | | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | | MT | 1970-01-01T00:00:00.000005Z |", + "+-----------+------------+------+--------------------------------+", + ]; + + assert_batches_eq!(&expected, &batches); + } + #[tokio::test] async fn test_compact_plan() { test_helpers::maybe_start_logging(); diff --git a/query/src/lib.rs b/query/src/lib.rs index 2ecc90b4c4..da63522170 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -99,6 +99,9 @@ pub trait QueryDatabase: Debug + Send + Sync { /// Return a summary of all chunks in this database, in all partitions fn chunk_summaries(&self) -> Result, Self::Error>; + + /// Record that particular type of query was run / planned + fn record_query(&self, query_type: impl Into, query_text: impl Into); } /// Collection of data that shares the same partition key diff --git a/query/src/test.rs b/query/src/test.rs index 9791679958..5c405e520a 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -134,6 +134,8 @@ impl QueryDatabase for TestDatabase { found_one.then(|| Arc::new(merger.build())) } + + fn record_query(&self, _query_type: impl Into, _query_text: impl Into) {} } impl ExecutionContextProvider for TestDatabase { diff --git a/query_tests/cases/in/all_chunks_dropped.expected b/query_tests/cases/in/all_chunks_dropped.expected index 65e17df50a..c18cbb81b9 100644 --- a/query_tests/cases/in/all_chunks_dropped.expected +++ b/query_tests/cases/in/all_chunks_dropped.expected @@ -1,27 +1,7 @@ -- Test Setup: OneMeasurementAllChunksDropped --- SQL: SELECT * from information_schema.tables; -+---------------+--------------------+---------------------+------------+ -| table_catalog | table_schema | table_name | table_type | -+---------------+--------------------+---------------------+------------+ -| public | iox | h2o | BASE TABLE | -| public | system | chunks | BASE TABLE | -| public | 
system | columns | BASE TABLE | -| public | system | chunk_columns | BASE TABLE | -| public | system | operations | BASE TABLE | -| public | system | persistence_windows | BASE TABLE | -| public | information_schema | tables | VIEW | -| public | information_schema | columns | VIEW | -+---------------+--------------------+---------------------+------------+ --- SQL: SHOW TABLES; -+---------------+--------------------+---------------------+------------+ -| table_catalog | table_schema | table_name | table_type | -+---------------+--------------------+---------------------+------------+ -| public | iox | h2o | BASE TABLE | -| public | system | chunks | BASE TABLE | -| public | system | columns | BASE TABLE | -| public | system | chunk_columns | BASE TABLE | -| public | system | operations | BASE TABLE | -| public | system | persistence_windows | BASE TABLE | -| public | information_schema | tables | VIEW | -| public | information_schema | columns | VIEW | -+---------------+--------------------+---------------------+------------+ +-- SQL: SELECT * from information_schema.tables where table_schema = 'iox'; ++---------------+--------------+------------+------------+ +| table_catalog | table_schema | table_name | table_type | ++---------------+--------------+------------+------------+ +| public | iox | h2o | BASE TABLE | ++---------------+--------------+------------+------------+ diff --git a/query_tests/cases/in/all_chunks_dropped.sql b/query_tests/cases/in/all_chunks_dropped.sql index ff66029f2e..3059ff83db 100644 --- a/query_tests/cases/in/all_chunks_dropped.sql +++ b/query_tests/cases/in/all_chunks_dropped.sql @@ -1,8 +1,4 @@ --- Test for predicate push down explains -- IOX_SETUP: OneMeasurementAllChunksDropped --- list information schema -SELECT * from information_schema.tables; - --- same but shorter -SHOW TABLES; +-- list information schema (show that all the chunks were dropped) +SELECT * from information_schema.tables where table_schema = 'iox'; diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index c3a3ebae2e..07104b2353 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -140,6 +140,7 @@ async fn sql_select_from_information_schema_tables() { "| public | system | columns | BASE TABLE |", "| public | system | operations | BASE TABLE |", "| public | system | persistence_windows | BASE TABLE |", + "| public | system | queries | BASE TABLE |", "+---------------+--------------------+---------------------+------------+", ]; run_sql_test_case( diff --git a/server/src/database.rs b/server/src/database.rs index cb3a9fc6df..c2685c803f 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -74,6 +74,28 @@ pub enum Error { source: Box, }, + #[snafu(display( + "database ({}) in invalid state for catalog rebuild ({:?}). Expected {}", + db_name, + state, + expected + ))] + InvalidStateForRebuild { + db_name: String, + expected: String, + state: DatabaseStateCode, + }, + + #[snafu(display( + "Internal error during rebuild. Database ({}) transitioned to unexpected state ({:?})", + db_name, + state, + ))] + UnexpectedTransitionForRebuild { + db_name: String, + state: DatabaseStateCode, + }, + #[snafu(display( "failed to rebuild preserved catalog of database ({}): {}", db_name, @@ -548,13 +570,28 @@ impl Database { Ok(tracker) } - /// Recover from a CatalogLoadError by wiping and rebuilding the catalog - pub async fn rebuild_preserved_catalog(&self) -> Result, Error> { + /// Rebuilding the catalog from parquet files. 
This can be used to + /// recover from a CatalogLoadError, or if new parquet files are + /// added to the data directory + pub async fn rebuild_preserved_catalog(&self, force: bool) -> Result, Error> { let handle = self.shared.state.read().freeze(); let handle = handle.await; let db_name = &self.shared.config.name; - let current_state = error_state!(self, "RebuildPreservedCatalog", CatalogLoadError); + + { + // If the force flag is not specified, can only rebuild + // the catalog if it is in ended up in an error loading + let state = self.shared.state.read(); + if !force && !matches!(&**state, DatabaseState::CatalogLoadError { .. }) { + return InvalidStateForRebuild { + db_name, + state: state.state_code(), + expected: "(CatalogLoadError)", + } + .fail(); + } + } let registry = self.shared.application.job_registry(); let (tracker, registration) = registry.register(Job::RebuildPreservedCatalog { @@ -562,19 +599,59 @@ impl Database { }); let shared = Arc::clone(&self.shared); + let iox_object_store = self.iox_object_store().context(InvalidStateForRebuild { + db_name, + state: shared.state.read().state_code(), + expected: "Object store initialized", + })?; tokio::spawn( async move { let db_name = &shared.config.name; - // First attempt to wipe the catalog - PreservedCatalog::wipe(¤t_state.iox_object_store) + // shutdown / stop the DB if it is running so it can't + // be read / written to, while also preventing + // anything else from driving the state machine + // forward. + info!(%db_name, "rebuilding catalog, resetting database state"); + { + let mut state = shared.state.write(); + // Dropping the state here also terminates the + // LifeCycleWorker and WriteBufferConsumer so all + // background work should have completed. + *state.unfreeze(handle) = DatabaseState::Known(DatabaseStateKnown {}); + // tell existing db background tasks, if any, to start shutting down + shared.state_notify.notify_waiters(); + }; + + // get another freeze handle to prevent anything else + // from messing with the state while we rebuild the + // catalog (is there a better way??) 
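The new `force` flag widens when a rebuild may run: without it the database must already be in `CatalogLoadError`; with it the state is reset to `Known`, background workers are shut down, and the preserved catalog is wiped and rebuilt from parquet files. A sketch of the calling side, assuming `Database` and `Error` are reachable as `server::database::{Database, Error}`:

```rust
// Sketch only: exercises the force-aware rebuild entry point added above.
// The error variant and its fields come from this diff; module paths are assumptions.
use server::database::{Database, Error};

async fn try_rebuild(database: &Database, force: bool) {
    match database.rebuild_preserved_catalog(force).await {
        // the rebuild runs as a tracked background job
        Ok(_tracker) => println!("catalog rebuild started"),
        // without `force`, only a database stuck in CatalogLoadError may be rebuilt
        Err(Error::InvalidStateForRebuild { db_name, state, expected }) => {
            eprintln!("cannot rebuild {}: state {:?}, expected {}", db_name, state, expected);
        }
        Err(e) => eprintln!("rebuild failed: {}", e),
    }
}
```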
+ let handle = shared.state.read().freeze(); + let _handle = handle.await; + + // check that during lock gap the state has not changed + { + let state = shared.state.read(); + ensure!( + matches!(&**state, DatabaseState::Known(_)), + UnexpectedTransitionForRebuild { + db_name, + state: state.state_code() + } + ); + } + + info!(%db_name, "rebuilding catalog from parquet files"); + + // Now wipe the catalog and rebuild it from parquet files + PreservedCatalog::wipe(iox_object_store.as_ref()) .await .map_err(Box::new) .context(WipePreservedCatalog { db_name })?; let config = PreservedCatalogConfig::new( - Arc::clone(¤t_state.iox_object_store), + Arc::clone(&iox_object_store), db_name.to_string(), Arc::clone(shared.application.time_provider()), ); @@ -583,10 +660,11 @@ impl Database { .map_err(Box::new) .context(RebuildPreservedCatalog { db_name })?; - { - let mut state = shared.state.write(); - *state.unfreeze(handle) = DatabaseState::RulesLoaded(current_state); - } + // Double check the state hasn't changed (we hold the + // freeze handle to make sure it does not) + assert!(matches!(&**shared.state.read(), DatabaseState::Known(_))); + + info!(%db_name, "catalog rebuilt successfully"); Ok::<_, Error>(()) } diff --git a/server/src/db.rs b/server/src/db.rs index 6180532f1b..1e7836091f 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -75,6 +75,7 @@ mod chunk; mod lifecycle; pub mod load; pub mod pred; +mod query_log; mod replay; mod streams; mod system_tables; @@ -278,19 +279,31 @@ pub(crate) struct DatabaseToCommit { impl Db { pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc) -> Self { - let name = Arc::from(database_to_commit.rules.name.as_str()); + let DatabaseToCommit { + server_id, + iox_object_store, + exec, + preserved_catalog, + catalog, + rules, + time_provider, + metric_registry, + } = database_to_commit; - let rules = RwLock::new(database_to_commit.rules); - let server_id = database_to_commit.server_id; - let iox_object_store = Arc::clone(&database_to_commit.iox_object_store); + let name = Arc::from(rules.name.as_str()); - let catalog = Arc::new(database_to_commit.catalog); + let rules = RwLock::new(rules); + let server_id = server_id; + let iox_object_store = Arc::clone(&iox_object_store); + + let catalog = Arc::new(catalog); let catalog_access = QueryCatalogAccess::new( &*name, Arc::clone(&catalog), Arc::clone(&jobs), - database_to_commit.metric_registry.as_ref(), + Arc::clone(&time_provider), + metric_registry.as_ref(), ); let catalog_access = Arc::new(catalog_access); @@ -299,16 +312,16 @@ impl Db { name, server_id, iox_object_store, - exec: database_to_commit.exec, - preserved_catalog: Arc::new(database_to_commit.preserved_catalog), + exec, + preserved_catalog: Arc::new(preserved_catalog), catalog, jobs, - metric_registry: database_to_commit.metric_registry, + metric_registry, catalog_access, worker_iterations_cleanup: AtomicUsize::new(0), worker_iterations_delete_predicate_preservation: AtomicUsize::new(0), cleanup_lock: Default::default(), - time_provider: database_to_commit.time_provider, + time_provider, delete_predicates_mailbox: Default::default(), persisted_chunk_id_override: Default::default(), } @@ -1170,6 +1183,10 @@ impl QueryDatabase for Db { fn table_schema(&self, table_name: &str) -> Option> { self.catalog_access.table_schema(table_name) } + + fn record_query(&self, query_type: impl Into, query_text: impl Into) { + self.catalog_access.record_query(query_type, query_text) + } } impl ExecutionContextProvider for Db { @@ -2119,7 +2136,9 @@ mod 
tests { .await .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap(); + let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()) + .unwrap() + .unwrap(); // Read metadata at file level let schema = parquet_metadata.decode().unwrap().read_schema().unwrap(); // Read data @@ -2246,7 +2265,9 @@ mod tests { load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store)) .await .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap(); + let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()) + .unwrap() + .unwrap(); // Read metadata at file level let schema = parquet_metadata.decode().unwrap().read_schema().unwrap(); // Read data diff --git a/server/src/db/access.rs b/server/src/db/access.rs index d9fb9a3557..aa501a30bf 100644 --- a/server/src/db/access.rs +++ b/server/src/db/access.rs @@ -7,6 +7,7 @@ use std::{any::Any, sync::Arc}; use super::{ catalog::{Catalog, TableNameFilter}, chunk::DbChunk, + query_log::QueryLog, Error, Result, }; @@ -32,6 +33,10 @@ use query::{ pruning::{prune_chunks, PruningObserver}, QueryDatabase, }; +use time::TimeProvider; + +/// The number of entries to store in the circular query buffer log +const QUERY_LOG_SIZE: usize = 100; /// Metrics related to chunk access (pruning specifically) #[derive(Debug)] @@ -105,6 +110,9 @@ pub(crate) struct QueryCatalogAccess { /// Handles finding / pruning chunks based on predicates chunk_access: Arc, + /// Stores queries which have been executed + query_log: Arc, + /// Provides access to system tables system_tables: Arc, @@ -117,25 +125,28 @@ impl QueryCatalogAccess { db_name: impl Into, catalog: Arc, jobs: Arc, + time_provider: Arc, metric_registry: &metric::Registry, ) -> Self { let db_name = Arc::from(db_name.into()); let access_metrics = AccessMetrics::new(metric_registry, Arc::clone(&db_name)); let chunk_access = Arc::new(ChunkAccess::new(Arc::clone(&catalog), access_metrics)); + let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, time_provider)); let system_tables = Arc::new(SystemSchemaProvider::new( db_name.as_ref(), Arc::clone(&catalog), jobs, + Arc::clone(&query_log), )); let user_tables = Arc::new(DbSchemaProvider::new( Arc::clone(&catalog), Arc::clone(&chunk_access), )); - Self { catalog, chunk_access, + query_log, system_tables, user_tables, } @@ -229,6 +240,10 @@ impl QueryDatabase for QueryCatalogAccess { .ok() .map(|table| Arc::clone(&table.schema().read())) } + + fn record_query(&self, query_type: impl Into, query_text: impl Into) { + self.query_log.push(query_type, query_text) + } } // Datafusion catalog provider interface diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 3b41c49cf9..b36c82047d 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -1007,7 +1007,7 @@ mod tests { use data_types::{delete_predicate::DeleteExpr, timestamp::TimestampRange}; use mutable_buffer::test_helpers::write_lp_to_new_chunk; - use parquet_file::test_utils::generator::{ChunkGenerator, GeneratorConfig}; + use parquet_file::test_utils::generator::ChunkGenerator; #[test] fn test_new_open() { @@ -1241,8 +1241,7 @@ mod tests { async fn make_persisted_chunk() -> CatalogChunk { let mut generator = ChunkGenerator::new().await; - generator.set_config(GeneratorConfig::NoData); - let (parquet_chunk, metadata) = generator.generate().await; + let (parquet_chunk, metadata) = generator.generate().await.unwrap(); let addr = 
ChunkAddr::new(generator.partition(), metadata.chunk_id); let now = Time::from_timestamp_nanos(43564); diff --git a/server/src/db/lifecycle/compact_object_store.rs b/server/src/db/lifecycle/compact_object_store.rs index 8d923e5c6f..41c7ca489d 100644 --- a/server/src/db/lifecycle/compact_object_store.rs +++ b/server/src/db/lifecycle/compact_object_store.rs @@ -847,7 +847,6 @@ mod tests { assert_eq!(min_partition_checkpoint, compacted_partition_checkpoint); } - #[ignore] #[tokio::test] async fn test_compact_os_on_chunk_delete_all() { test_helpers::maybe_start_logging(); @@ -895,15 +894,12 @@ mod tests { // compact the only OS chunk let partition = partition.upgrade(); let chunk1 = chunks[0].write(); - let compacted_chunk = compact_object_store_chunks(partition, vec![chunk1]) + compact_object_store_chunks(partition, vec![chunk1]) .unwrap() .1 .await + .unwrap() .unwrap(); - //.unwrap(); - - let err = compacted_chunk.unwrap_err(); - println!("{}", err.to_string()); // verify results let partition = db.partition("cpu", partition_key).unwrap(); diff --git a/server/src/db/lifecycle/load.rs b/server/src/db/lifecycle/load.rs index fce489dc5a..a35cffa0a1 100644 --- a/server/src/db/lifecycle/load.rs +++ b/server/src/db/lifecycle/load.rs @@ -8,7 +8,7 @@ use lifecycle::LifecycleWriteGuard; use observability_deps::tracing::info; use query::exec::ExecutorType; use query::frontend::reorg::ReorgPlanner; -use query::{compute_sort_key, QueryChunkMeta}; +use query::QueryChunkMeta; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; use crate::db::lifecycle::collect_rub; @@ -43,14 +43,8 @@ pub fn load_chunk( let ctx = db.exec.new_context(ExecutorType::Reorg); let fut = async move { - let key = compute_sort_key(std::iter::once(db_chunk.summary())); - - // Cannot move query_chunks as the sort key borrows the column names - let (_, plan) = ReorgPlanner::new().compact_plan( - db_chunk.schema(), - std::iter::once(Arc::clone(&db_chunk)), - key, - )?; + let plan = + ReorgPlanner::new().scan_single_chunk_plan(db_chunk.schema(), Arc::clone(&db_chunk))?; let physical_plan = ctx.prepare_plan(&plan).await?; let stream = ctx.execute_stream(physical_plan).await?; diff --git a/server/src/db/query_log.rs b/server/src/db/query_log.rs new file mode 100644 index 0000000000..d20365ed5b --- /dev/null +++ b/server/src/db/query_log.rs @@ -0,0 +1,77 @@ +//! Ring buffer of queries that have been run with some brief information + +use std::{collections::VecDeque, sync::Arc}; + +use parking_lot::Mutex; +use time::{Time, TimeProvider}; + +/// Information about a single query that was executed +#[derive(Debug)] +pub struct QueryLogEntry { + /// The type of query + pub query_type: String, + + /// The text of the query (SQL for sql queries, pbjson for storage rpc queries) + pub query_text: String, + + /// Time at which the query was run + pub issue_time: Time, +} + +impl QueryLogEntry { + /// Creates a new QueryLogEntry -- use `QueryLog::push` to add new entries to the log + fn new(query_type: String, query_text: String, issue_time: Time) -> Self { + Self { + query_type, + query_text, + issue_time, + } + } +} + +/// Stores a fixed number `QueryExcutions` -- handles locking +/// internally so can be shared across multiple +#[derive(Debug)] +pub struct QueryLog { + log: Mutex>>, + max_size: usize, + time_provider: Arc, +} + +impl QueryLog { + /// Create a new QueryLog that can hold at most `size` items. + /// When the `size+1` item is added, item `0` is evicted. 
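A short sketch of the eviction behaviour described above, using the same `time::MockProvider` that the system.queries test at the end of this diff uses:

```rust
// Sketch: QueryLog::new, push and entries are the methods defined in this file;
// with capacity 2, the third push evicts the oldest entry.
use std::sync::Arc;
use time::{Time, TimeProvider};

#[test]
fn eviction_demo() {
    let now = Time::from_rfc3339("1996-12-19T16:39:57+00:00").unwrap();
    let time_provider = Arc::new(time::MockProvider::new(now));
    let log = QueryLog::new(2, time_provider as Arc<dyn TimeProvider>);

    log.push("sql", "select 1");
    log.push("sql", "select 2");
    log.push("sql", "select 3"); // "select 1" is evicted here

    let entries = log.entries();
    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0].query_text, "select 2");
    assert_eq!(entries[1].query_text, "select 3");
}
```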
+ pub fn new(max_size: usize, time_provider: Arc) -> Self { + Self { + log: Mutex::new(VecDeque::with_capacity(max_size)), + max_size, + time_provider, + } + } + + pub fn push(&self, query_type: impl Into, query_text: impl Into) { + if self.max_size == 0 { + return; + } + + let entry = Arc::new(QueryLogEntry::new( + query_type.into(), + query_text.into(), + self.time_provider.now(), + )); + + let mut log = self.log.lock(); + + // enforce limit + if log.len() == self.max_size { + log.pop_front(); + } + + log.push_back(entry); + } + + pub fn entries(&self) -> VecDeque> { + let log = self.log.lock(); + log.clone() + } +} diff --git a/server/src/db/system_tables.rs b/server/src/db/system_tables.rs index ac2aa3ea99..2ddf16b703 100644 --- a/server/src/db/system_tables.rs +++ b/server/src/db/system_tables.rs @@ -7,7 +7,7 @@ //! //! For example `SELECT * FROM system.chunks` -use super::catalog::Catalog; +use super::{catalog::Catalog, query_log::QueryLog}; use crate::JobRegistry; use arrow::{ datatypes::{Field, Schema, SchemaRef}, @@ -27,6 +27,7 @@ mod chunks; mod columns; mod operations; mod persistence; +mod queries; // The IOx system schema pub const SYSTEM_SCHEMA: &str = "system"; @@ -36,6 +37,7 @@ const COLUMNS: &str = "columns"; const CHUNK_COLUMNS: &str = "chunk_columns"; const OPERATIONS: &str = "operations"; const PERSISTENCE_WINDOWS: &str = "persistence_windows"; +const QUERIES: &str = "queries"; pub struct SystemSchemaProvider { chunks: Arc, @@ -43,6 +45,7 @@ pub struct SystemSchemaProvider { chunk_columns: Arc, operations: Arc, persistence_windows: Arc, + queries: Arc, } impl std::fmt::Debug for SystemSchemaProvider { @@ -54,7 +57,12 @@ impl std::fmt::Debug for SystemSchemaProvider { } impl SystemSchemaProvider { - pub fn new(db_name: impl Into, catalog: Arc, jobs: Arc) -> Self { + pub fn new( + db_name: impl Into, + catalog: Arc, + jobs: Arc, + query_log: Arc, + ) -> Self { let db_name = db_name.into(); let chunks = Arc::new(SystemTableProvider { inner: chunks::ChunksTable::new(Arc::clone(&catalog)), @@ -71,22 +79,27 @@ impl SystemSchemaProvider { let persistence_windows = Arc::new(SystemTableProvider { inner: persistence::PersistenceWindowsTable::new(catalog), }); + let queries = Arc::new(SystemTableProvider { + inner: queries::QueriesTable::new(query_log), + }); Self { chunks, columns, chunk_columns, operations, persistence_windows, + queries, } } } -const ALL_SYSTEM_TABLES: [&str; 5] = [ +const ALL_SYSTEM_TABLES: [&str; 6] = [ CHUNKS, COLUMNS, CHUNK_COLUMNS, OPERATIONS, PERSISTENCE_WINDOWS, + QUERIES, ]; impl SchemaProvider for SystemSchemaProvider { @@ -108,6 +121,7 @@ impl SchemaProvider for SystemSchemaProvider { CHUNK_COLUMNS => Some(Arc::clone(&self.chunk_columns)), OPERATIONS => Some(Arc::clone(&self.operations)), PERSISTENCE_WINDOWS => Some(Arc::clone(&self.persistence_windows)), + QUERIES => Some(Arc::clone(&self.queries)), _ => None, } } diff --git a/server/src/db/system_tables/queries.rs b/server/src/db/system_tables/queries.rs new file mode 100644 index 0000000000..3df4a0cb71 --- /dev/null +++ b/server/src/db/system_tables/queries.rs @@ -0,0 +1,113 @@ +use crate::db::{ + query_log::{QueryLog, QueryLogEntry}, + system_tables::IoxSystemTable, +}; +use arrow::{ + array::{StringArray, TimestampNanosecondArray}, + datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}, + error::Result, + record_batch::RecordBatch, +}; +use data_types::error::ErrorLogger; +use std::{collections::VecDeque, sync::Arc}; + +/// Implementation of system.queries table +#[derive(Debug)] +pub(super) 
struct QueriesTable { + schema: SchemaRef, + query_log: Arc, +} + +impl QueriesTable { + pub(super) fn new(query_log: Arc) -> Self { + Self { + schema: queries_schema(), + query_log, + } + } +} + +impl IoxSystemTable for QueriesTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn batch(&self) -> Result { + from_query_log_entries(self.schema(), self.query_log.entries()) + .log_if_error("system.chunks table") + } +} + +fn queries_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new( + "issue_time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("query_type", DataType::Utf8, false), + Field::new("query_text", DataType::Utf8, false), + ])) +} + +fn from_query_log_entries( + schema: SchemaRef, + entries: VecDeque>, +) -> Result { + let issue_time = entries + .iter() + .map(|e| e.issue_time) + .map(|ts| Some(ts.timestamp_nanos())) + .collect::(); + + let query_type = entries + .iter() + .map(|e| Some(&e.query_type)) + .collect::(); + + let query_text = entries + .iter() + .map(|e| Some(&e.query_text)) + .collect::(); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(issue_time), + Arc::new(query_type), + Arc::new(query_text), + ], + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_util::assert_batches_eq; + use time::{Time, TimeProvider}; + + #[test] + fn test_from_query_log() { + let now = Time::from_rfc3339("1996-12-19T16:39:57+00:00").unwrap(); + let time_provider = Arc::new(time::MockProvider::new(now)); + let query_log = QueryLog::new(10, Arc::clone(&time_provider) as Arc); + query_log.push("sql", "select * from foo"); + time_provider.inc(std::time::Duration::from_secs(24 * 60 * 60)); + query_log.push("sql", "select * from bar"); + query_log.push("read_filter", "json goop"); + + let expected = vec![ + "+----------------------+-------------+-------------------+", + "| issue_time | query_type | query_text |", + "+----------------------+-------------+-------------------+", + "| 1996-12-19T16:39:57Z | sql | select * from foo |", + "| 1996-12-20T16:39:57Z | sql | select * from bar |", + "| 1996-12-20T16:39:57Z | read_filter | json goop |", + "+----------------------+-------------+-------------------+", + ]; + + let schema = queries_schema(); + let batch = from_query_log_entries(schema, query_log.entries()).unwrap(); + assert_batches_eq!(&expected, &[batch]); + } +}
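Putting the pieces together: `Db::record_query` and `QueryCatalogAccess::record_query` push into the shared `QueryLog`, and `QueriesTable::batch()` renders that log as the Arrow batch behind `SELECT * FROM system.queries`. A sketch, written as if it sat alongside the tests above (where `QueriesTable`, `QueryLog` and the `IoxSystemTable` trait are all in scope):

```rust
// Sketch: entries recorded against the shared QueryLog surface in system.queries.
use std::sync::Arc;
use time::{Time, TimeProvider};

#[test]
fn system_queries_reflects_recorded_queries() {
    let now = Time::from_rfc3339("2021-11-30T00:00:00+00:00").unwrap();
    let time_provider = Arc::new(time::MockProvider::new(now));
    let query_log = Arc::new(QueryLog::new(10, time_provider as Arc<dyn TimeProvider>));

    // Db::record_query / QueryCatalogAccess::record_query delegate to this push
    query_log.push("sql", "SELECT * FROM h2o");

    let table = QueriesTable::new(Arc::clone(&query_log));
    let batch = table.batch().expect("rendering system.queries");

    // columns are issue_time, query_type and query_text
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 1);
}
```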