Merge branch 'main' into ntran/compact_os_sql_tests

pull/24376/head
kodiakhq[bot] 2021-12-08 16:51:41 +00:00 committed by GitHub
commit a374dfdc8f
49 changed files with 1342 additions and 420 deletions

Cargo.lock generated
View File

@ -1528,6 +1528,7 @@ dependencies = [
"assert_cmd",
"async-trait",
"backtrace",
"base64 0.13.0",
"byteorder",
"bytes",
"chrono",

View File

@ -1,5 +1,8 @@
//! A collection of testing functions for arrow based code
use std::sync::Arc;
use arrow::{
array::{ArrayRef, StringArray},
compute::kernels::sort::{lexsort, SortColumn, SortOptions},
record_batch::RecordBatch,
};
@ -89,3 +92,42 @@ pub fn sort_record_batch(batch: RecordBatch) -> RecordBatch {
RecordBatch::try_new(batch.schema(), sort_output).unwrap()
}
/// Return a new `StringArray` where each element has had the normalization
/// function `norm` applied.
pub fn normalize_string_array<N>(arr: &StringArray, norm: N) -> ArrayRef
where
N: Fn(&str) -> String,
{
let normalized: StringArray = arr.iter().map(|s| s.map(&norm)).collect();
Arc::new(normalized)
}
/// Return a new set of `RecordBatch`es where the function `norm` has been
/// applied to all `StringArray` rows.
pub fn normalize_batches<N>(batches: Vec<RecordBatch>, norm: N) -> Vec<RecordBatch>
where
N: Fn(&str) -> String,
{
// The idea here is to take a function that normalizes strings and apply
// it to each StringArray, element by element
batches
.into_iter()
.map(|batch| {
let new_columns: Vec<_> = batch
.columns()
.iter()
.map(|array| {
if let Some(array) = array.as_any().downcast_ref::<StringArray>() {
normalize_string_array(array, &norm)
} else {
Arc::clone(array)
}
})
.collect();
RecordBatch::try_new(batch.schema(), new_columns)
.expect("error occured during normalization")
})
.collect()
}
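A minimal usage sketch of the new helper, assuming a test that wants run-specific ids scrubbed from string columns before comparing output (the `scrub_ids` function and the placeholder value are illustrative, not part of the diff):

```rust
use arrow::record_batch::RecordBatch;
use arrow_util::test_util::normalize_batches;

/// Replace a run-specific id in every string column so snapshot comparisons
/// (e.g. with `assert_batches_eq!`) stay stable across test runs.
fn scrub_ids(batches: Vec<RecordBatch>, id: &str) -> Vec<RecordBatch> {
    let id = id.to_string();
    normalize_batches(batches, move |s| s.replace(&id, "XXXXXXXX"))
}
```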

View File

@ -85,7 +85,7 @@ Created database imported_db
### Copy parquet files into the new database
IOx stores data files in `<db_uuid>/data/<table_name>/<partition>`,
IOx stores parquet files in `<db_uuid>/data/<table_name>/<partition>`,
and the imported data files must be in the same structure.
For example, if you are running IOx with data directory of `~/.influxdb_iox` the data
@ -106,43 +106,20 @@ my_awesome_table
Copy that directory structure into the database's catalog:
```shell
mkdir -p ~/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/data
cp -R 'my_awesome_table' ~/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/data/
```
### Break the catalog
### WARNING
At the time of writing, in order to rebuild a catalog from parquet
files the catalog must be corrupted. One way to do so manually is to
find a transaction file, and write some junk into it. For example:
Note that if you create/move/copy files into the domain of a running database, the database's background cleanup worker may delete them before the catalog rebuild runs; check the logs to see whether this happened.
### Rebuild catalog with the `recover rebuild --force` command
Now, tell IOx to rebuild the catalog from parquet files:
```shell
find ~/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/transactions -type f
/Users/alamb/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/transactions/00000000000000000000/8dda6fb8-6907-4d89-b133-85536ccd9bd3.txn
# write something bad into the txn file
echo "JUNK" > /Users/alamb/.influxdb_iox/dbs/4fc2236c-7ab8-4200-83c7-f29cd0c2385f/transactions/00000000000000000000/8dda6fb8-6907-4d89-b133-85536ccd9bd3.txn
```
### Restart IOx (with `--wipe-catalog-on-error=false`):
In another terminal, restart the IOx server with `--wipe-catalog-on-error=false` (which is critical).
```shell
cargo run -- run -v --object-store=file --data-dir=$HOME/.influxdb_iox --server-id=42 --wipe-catalog-on-error=false
```
The database should enter into the `CatalogLoadError` state.
```text
2021-11-30T15:44:43.784118Z ERROR server::database: database in error state - operator intervention required db_name=imported_db e=error loading catalog: Cannot load preserved catalog: ... state=CatalogLoadError
```
### Use the `recover rebuild` command
Now, rebuild the catalog:
```shell
./target/debug/influxdb_iox database recover rebuild imported_db
./target/debug/influxdb_iox database recover rebuild imported_db --force
{
"operation": {
"name": "0",

View File

@ -73,7 +73,7 @@ service ManagementService {
// Get server status
rpc GetServerStatus(GetServerStatusRequest) returns (GetServerStatusResponse);
// Rebuild preserved catalog for given DB
// Rebuild preserved catalog from parquet files for given DB
rpc RebuildPreservedCatalog(RebuildPreservedCatalogRequest) returns (RebuildPreservedCatalogResponse);
// Wipe preserved catalog for given DB.
@ -432,6 +432,10 @@ message WipePreservedCatalogResponse {
message RebuildPreservedCatalogRequest {
// the name of the database
string db_name = 1;
// force the catalog to be rebuilt, even if the database has started
// successfully
bool force = 2;
}
message RebuildPreservedCatalogResponse {

View File

@ -5,6 +5,9 @@ option go_package = "github.com/influxdata/iox/router/v1";
import "influxdata/iox/router/v1/router.proto";
service RouterService {
// Get router.
rpc GetRouter(GetRouterRequest) returns (GetRouterResponse);
// List configured routers.
rpc ListRouters(ListRoutersRequest) returns (ListRoutersResponse);
@ -15,6 +18,14 @@ service RouterService {
rpc DeleteRouter(DeleteRouterRequest) returns (DeleteRouterResponse);
}
message GetRouterRequest {
string router_name = 1;
}
message GetRouterResponse {
Router router = 1;
}
message ListRoutersRequest {}
message ListRoutersResponse {

View File

@ -1,6 +1,29 @@
use crate::influxdata::iox::management::v1 as management;
use data_types::job::Job;
impl management::operation_metadata::Job {
/// Return the db_name for this job
pub fn db_name(&self) -> &str {
match self {
Self::Dummy(management::Dummy { db_name, .. }) => db_name,
Self::WriteChunk(management::WriteChunk { db_name, .. }) => db_name,
Self::WipePreservedCatalog(management::WipePreservedCatalog { db_name, .. }) => db_name,
Self::CompactChunks(management::CompactChunks { db_name, .. }) => db_name,
Self::PersistChunks(management::PersistChunks { db_name, .. }) => db_name,
Self::DropChunk(management::DropChunk { db_name, .. }) => db_name,
Self::DropPartition(management::DropPartition { db_name, .. }) => db_name,
Self::LoadReadBufferChunk(management::LoadReadBufferChunk { db_name, .. }) => db_name,
Self::RebuildPreservedCatalog(management::RebuildPreservedCatalog {
db_name, ..
}) => db_name,
Self::CompactObjectStoreChunks(management::CompactObjectStoreChunks {
db_name,
..
}) => db_name,
}
}
}
impl From<Job> for management::operation_metadata::Job {
fn from(job: Job) -> Self {
match job {

View File

@ -13,7 +13,16 @@ pub mod influxdata {
env!("OUT_DIR"),
"/influxdata.platform.storage.read.rs"
));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.platform.storage.read.serde.rs"
));
include!(concat!(env!("OUT_DIR"), "/influxdata.platform.storage.rs"));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.platform.storage.serde.rs"
));
// Can't implement `Default` because `prost::Message` implements `Default`
impl TimestampRange {

View File

@ -99,6 +99,7 @@ write_buffer = { path = "../write_buffer" }
# Crates.io dependencies, in alphabetical order
assert_cmd = "2.0.2"
base64 = "0.13"
hex = "0.4.2"
predicates = "2.1.0"
rand = "0.8.3"

View File

@ -48,7 +48,7 @@ enum Command {
/// Skip replay
SkipReplay(SkipReplay),
/// Rebuild preserved catalog
/// Rebuild catalog from parquet files
Rebuild(Rebuild),
}
@ -63,9 +63,13 @@ struct Wipe {
db_name: String,
}
/// Rebuild preserved catalog.
/// Rebuild catalog from parquet files
#[derive(Debug, StructOpt)]
struct Rebuild {
/// Force rebuild, even if the database has already successfully started
#[structopt(long)]
force: bool,
/// The name of the database
db_name: String,
}
@ -104,7 +108,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
}
Command::Rebuild(rebuild) => {
let operation = client
.rebuild_preserved_catalog(rebuild.db_name)
.rebuild_preserved_catalog(rebuild.db_name, rebuild.force)
.await
.context(RebuildCatalog)?;
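For reference, the new flag flows through to an invocation like the one exercised by the end-to-end test later in this diff (database name and host below are placeholders):

```shell
# Rebuild the preserved catalog from parquet files even though the database
# already started successfully; without --force the server rejects the request.
influxdb_iox database recover rebuild my_db --force --host http://127.0.0.1:8082
```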

View File

@ -0,0 +1,99 @@
//! This module implements the `router` CLI command
use influxdb_iox_client::{
connection::Connection,
router::{self, generated_types::Router as RouterConfig},
};
use structopt::StructOpt;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("JSON Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Manage IOx routers
#[derive(Debug, StructOpt)]
pub struct Config {
#[structopt(subcommand)]
command: Command,
}
/// Create a new router or update an existing one.
#[derive(Debug, StructOpt)]
struct CreateOrUpdate {
/// The name of the router
name: String,
}
/// Return configuration of specific router
#[derive(Debug, StructOpt)]
struct Get {
/// The name of the router
name: String,
}
/// Delete specific router
#[derive(Debug, StructOpt)]
struct Delete {
/// The name of the router
name: String,
}
/// All possible subcommands for router
#[derive(Debug, StructOpt)]
enum Command {
CreateOrUpdate(CreateOrUpdate),
/// List routers
List,
Get(Get),
Delete(Delete),
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::CreateOrUpdate(command) => {
let mut client = router::Client::new(connection);
let config = RouterConfig {
name: command.name.clone(),
..Default::default()
};
client.update_router(config).await?;
println!("Created/Updated router {}", command.name);
}
Command::List => {
let mut client = router::Client::new(connection);
let routers = client.list_routers().await?;
for router in routers {
println!("{}", router.name);
}
}
Command::Get(get) => {
let Get { name } = get;
let mut client = router::Client::new(connection);
let router = client.get_router(&name).await?;
println!("{}", serde_json::to_string_pretty(&router)?);
}
Command::Delete(delete) => {
let Delete { name } = delete;
let mut client = router::Client::new(connection);
client.delete_router(&name).await?;
println!("Deleted router {}", name);
}
}
Ok(())
}
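A sketch of how the subcommands are invoked from a shell (router name and host are placeholders); the router CLI end-to-end test later in this diff exercises the same flow:

```shell
influxdb_iox router create-or-update my_router --host http://127.0.0.1:8082
influxdb_iox router get my_router --host http://127.0.0.1:8082     # prints the router config as JSON
influxdb_iox router list --host http://127.0.0.1:8082
influxdb_iox router delete my_router --host http://127.0.0.1:8082
```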

View File

@ -13,7 +13,7 @@
// Influx crates
use data_types::{names::OrgBucketMappingError, DatabaseName};
use influxdb_iox_client::format::QueryOutputFormat;
use query::exec::ExecutionContextProvider;
use query::{exec::ExecutionContextProvider, QueryDatabase};
use server::Error;
// External crates
@ -252,6 +252,8 @@ async fn query(
let db = server.db(&db_name)?;
db.record_query("sql", &q);
let ctx = db.new_query_context(req.extensions().get().cloned());
let physical_plan = Planner::new(&ctx).sql(&q).await.context(Planning)?;

View File

@ -90,6 +90,13 @@ pub fn default_database_error_handler(error: server::database::Error) -> tonic::
error!(%source, "Unexpected error while wiping catalog");
InternalError {}.into()
}
Error::InvalidStateForRebuild { .. } => {
PreconditionViolation::DatabaseInvalidState(error.to_string()).into()
}
Error::UnexpectedTransitionForRebuild { .. } => {
error!(%error, "Unexpected error during rebuild catalog");
InternalError {}.into()
}
Error::RebuildPreservedCatalog { source, .. } => {
error!(%source, "Unexpected error while rebuilding catalog");
InternalError {}.into()

View File

@ -17,6 +17,7 @@ use arrow_flight::{
use datafusion::physical_plan::ExecutionPlan;
use futures::{SinkExt, Stream, StreamExt};
use pin_project::{pin_project, pinned_drop};
use query::QueryDatabase;
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use tokio::task::JoinHandle;
@ -170,6 +171,8 @@ impl Flight for FlightService {
.db(&database)
.map_err(default_server_error_handler)?;
db.record_query("sql", &read_info.sql_query);
let ctx = db.new_query_context(span_ctx);
let physical_plan = Planner::new(&ctx)

View File

@ -446,7 +446,7 @@ impl management_service_server::ManagementService for ManagementService {
&self,
request: Request<RebuildPreservedCatalogRequest>,
) -> Result<Response<RebuildPreservedCatalogResponse>, Status> {
let RebuildPreservedCatalogRequest { db_name } = request.into_inner();
let RebuildPreservedCatalogRequest { db_name, force } = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
@ -455,7 +455,7 @@ impl management_service_server::ManagementService for ManagementService {
.database(&db_name)
.map_err(default_server_error_handler)?;
let tracker = database
.rebuild_preserved_catalog()
.rebuild_preserved_catalog(force)
.await
.map_err(default_database_error_handler)?;

View File

@ -2,7 +2,7 @@
//! implemented in terms of the [`QueryDatabase`](query::QueryDatabase) and
//! [`DatabaseStore`]
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::sync::mpsc;
@ -20,8 +20,12 @@ use generated_types::{
};
use observability_deps::tracing::{error, info, trace};
use predicate::predicate::PredicateBuilder;
use query::exec::{
fieldlist::FieldList, seriesset::converter::Error as SeriesSetError, ExecutionContextProvider,
use query::{
exec::{
fieldlist::FieldList, seriesset::converter::Error as SeriesSetError,
ExecutionContextProvider,
},
QueryDatabase,
};
use server::DatabaseStore;
@ -213,20 +217,21 @@ where
) -> Result<tonic::Response<Self::ReadFilterStream>, Status> {
let span_ctx = req.extensions().get().cloned();
let read_filter_request = req.into_inner();
let db_name = get_database_name(&read_filter_request)?;
info!(%db_name, ?read_filter_request.range, predicate=%read_filter_request.predicate.loggable(), "read filter");
let req = req.into_inner();
let db_name = get_database_name(&req)?;
info!(%db_name, ?req.range, predicate=%req.predicate.loggable(), "read filter");
let results = read_filter_impl(
self.db_store.as_ref(),
db_name,
read_filter_request,
span_ctx,
)
.await?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
let db = self
.db_store
.db(&db_name)
.context(DatabaseNotFound { db_name: &db_name })?;
db.record_query("read_filter", defer_json(&req));
let results = read_filter_impl(db, db_name, req, span_ctx)
.await?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
Ok(tonic::Response::new(futures::stream::iter(results)))
}
@ -238,9 +243,14 @@ where
req: tonic::Request<ReadGroupRequest>,
) -> Result<tonic::Response<Self::ReadGroupStream>, Status> {
let span_ctx = req.extensions().get().cloned();
let read_group_request = req.into_inner();
let req = req.into_inner();
let db_name = get_database_name(&read_group_request)?;
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.context(DatabaseNotFound { db_name: &db_name })?;
db.record_query("read_group", defer_json(&req));
let ReadGroupRequest {
read_source: _read_source,
@ -249,7 +259,7 @@ where
group_keys,
group,
aggregate,
} = read_group_request;
} = req;
info!(%db_name, ?range, ?group_keys, ?group, ?aggregate,predicate=%predicate.loggable(),"read_group");
@ -265,19 +275,12 @@ where
let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys)
.context(ConvertingReadGroupAggregate { aggregate_string })?;
let results = query_group_impl(
self.db_store.as_ref(),
db_name,
range,
predicate,
gby_agg,
span_ctx,
)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
let results = query_group_impl(db, db_name, range, predicate, gby_agg, span_ctx)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
Ok(tonic::Response::new(futures::stream::iter(results)))
}
@ -290,9 +293,14 @@ where
req: tonic::Request<ReadWindowAggregateRequest>,
) -> Result<tonic::Response<Self::ReadGroupStream>, Status> {
let span_ctx = req.extensions().get().cloned();
let read_window_aggregate_request = req.into_inner();
let req = req.into_inner();
let db_name = get_database_name(&read_window_aggregate_request)?;
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.context(DatabaseNotFound { db_name: &db_name })?;
db.record_query("read_window_aggregate", defer_json(&req));
let ReadWindowAggregateRequest {
read_source: _read_source,
@ -302,7 +310,7 @@ where
offset,
aggregate,
window,
} = read_window_aggregate_request;
} = req;
info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(),"read_window_aggregate");
@ -314,19 +322,12 @@ where
let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window)
.context(ConvertingWindowAggregate { aggregate_string })?;
let results = query_group_impl(
self.db_store.as_ref(),
db_name,
range,
predicate,
gby_agg,
span_ctx,
)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
let results = query_group_impl(db, db_name, range, predicate, gby_agg, span_ctx)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
Ok(tonic::Response::new(futures::stream::iter(results)))
}
@ -340,30 +341,28 @@ where
let span_ctx = req.extensions().get().cloned();
let (tx, rx) = mpsc::channel(4);
let tag_keys_request = req.into_inner();
let req = req.into_inner();
let db_name = get_database_name(&tag_keys_request)?;
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.context(DatabaseNotFound { db_name: &db_name })?;
db.record_query("tag_keys", defer_json(&req));
let TagKeysRequest {
tags_source: _tag_source,
range,
predicate,
} = tag_keys_request;
} = req;
info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_keys");
let measurement = None;
let response = tag_keys_impl(
self.db_store.as_ref(),
db_name,
measurement,
range,
predicate,
span_ctx,
)
.await
.map_err(|e| e.to_status());
let response = tag_keys_impl(db, db_name, measurement, range, predicate, span_ctx)
.await
.map_err(|e| e.to_status());
tx.send(response)
.await
@ -381,16 +380,21 @@ where
let span_ctx = req.extensions().get().cloned();
let (tx, rx) = mpsc::channel(4);
let tag_values_request = req.into_inner();
let req = req.into_inner();
let db_name = get_database_name(&tag_values_request)?;
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.context(DatabaseNotFound { db_name: &db_name })?;
db.record_query("tag_values", defer_json(&req));
let TagValuesRequest {
tags_source: _tag_source,
range,
predicate,
tag_key,
} = tag_values_request;
} = req;
let measurement = None;
@ -406,19 +410,11 @@ where
.to_status());
}
measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx).await
measurement_name_impl(db, db_name, range, span_ctx).await
} else if tag_key.is_field() {
info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_values with tag_key=[xff] (field name)");
let fieldlist = field_names_impl(
self.db_store.as_ref(),
db_name,
None,
range,
predicate,
span_ctx,
)
.await?;
let fieldlist = field_names_impl(db, db_name, None, range, predicate, span_ctx).await?;
// Pick out the field names into a Vec<Vec<u8>> for return
let values = fieldlist
@ -434,7 +430,7 @@ where
info!(%db_name, ?range, %tag_key, predicate=%predicate.loggable(), "tag_values",);
tag_values_impl(
self.db_store.as_ref(),
db,
db_name,
tag_key,
measurement,
@ -460,7 +456,7 @@ where
&self,
_req: tonic::Request<ReadSeriesCardinalityRequest>,
) -> Result<tonic::Response<Self::ReadSeriesCardinalityStream>, Status> {
unimplemented!("read_series_cardinality not yet implemented");
unimplemented!("read_series_cardinality not yet implemented. https://github.com/influxdata/influxdb_iox/issues/447");
}
async fn capabilities(
@ -508,15 +504,20 @@ where
let span_ctx = req.extensions().get().cloned();
let (tx, rx) = mpsc::channel(4);
let measurement_names_request = req.into_inner();
let req = req.into_inner();
let db_name = get_database_name(&measurement_names_request)?;
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.context(DatabaseNotFound { db_name: &db_name })?;
db.record_query("measurement_names", defer_json(&req));
let MeasurementNamesRequest {
source: _source,
range,
predicate,
} = measurement_names_request;
} = req;
if let Some(predicate) = predicate {
return NotYetImplemented {
@ -531,7 +532,7 @@ where
info!(%db_name, ?range, predicate=%predicate.loggable(), "measurement_names");
let response = measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx)
let response = measurement_name_impl(db, db_name, range, span_ctx)
.await
.map_err(|e| e.to_status());
@ -551,31 +552,29 @@ where
let span_ctx = req.extensions().get().cloned();
let (tx, rx) = mpsc::channel(4);
let measurement_tag_keys_request = req.into_inner();
let req = req.into_inner();
let db_name = get_database_name(&measurement_tag_keys_request)?;
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.context(DatabaseNotFound { db_name: &db_name })?;
db.record_query("measurement_tag_keys", defer_json(&req));
let MeasurementTagKeysRequest {
source: _source,
measurement,
range,
predicate,
} = measurement_tag_keys_request;
} = req;
info!(%db_name, ?range, %measurement, predicate=%predicate.loggable(), "measurement_tag_keys");
let measurement = Some(measurement);
let response = tag_keys_impl(
self.db_store.as_ref(),
db_name,
measurement,
range,
predicate,
span_ctx,
)
.await
.map_err(|e| e.to_status());
let response = tag_keys_impl(db, db_name, measurement, range, predicate, span_ctx)
.await
.map_err(|e| e.to_status());
tx.send(response)
.await
@ -593,9 +592,14 @@ where
let span_ctx = req.extensions().get().cloned();
let (tx, rx) = mpsc::channel(4);
let measurement_tag_values_request = req.into_inner();
let req = req.into_inner();
let db_name = get_database_name(&measurement_tag_values_request)?;
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.context(DatabaseNotFound { db_name: &db_name })?;
db.record_query("measurement_tag_values", defer_json(&req));
let MeasurementTagValuesRequest {
source: _source,
@ -603,14 +607,14 @@ where
range,
predicate,
tag_key,
} = measurement_tag_values_request;
} = req;
info!(%db_name, ?range, %measurement, %tag_key, predicate=%predicate.loggable(), "measurement_tag_values");
let measurement = Some(measurement);
let response = tag_values_impl(
self.db_store.as_ref(),
db,
db_name,
tag_key,
measurement,
@ -637,36 +641,34 @@ where
let span_ctx = req.extensions().get().cloned();
let (tx, rx) = mpsc::channel(4);
let measurement_fields_request = req.into_inner();
let req = req.into_inner();
let db_name = get_database_name(&measurement_fields_request)?;
let db_name = get_database_name(&req)?;
let db = self
.db_store
.db(&db_name)
.context(DatabaseNotFound { db_name: &db_name })?;
db.record_query("measurement_fields", defer_json(&req));
let MeasurementFieldsRequest {
source: _source,
measurement,
range,
predicate,
} = measurement_fields_request;
} = req;
info!(%db_name, ?range, predicate=%predicate.loggable(), "measurement_fields");
let measurement = Some(measurement);
let response = field_names_impl(
self.db_store.as_ref(),
db_name,
measurement,
range,
predicate,
span_ctx,
)
.await
.map(|fieldlist| {
fieldlist_to_measurement_fields_response(fieldlist)
.context(ConvertingFieldList)
.map_err(|e| e.to_status())
})
.map_err(|e| e.to_status())?;
let response = field_names_impl(db, db_name, measurement, range, predicate, span_ctx)
.await
.map(|fieldlist| {
fieldlist_to_measurement_fields_response(fieldlist)
.context(ConvertingFieldList)
.map_err(|e| e.to_status())
})
.map_err(|e| e.to_status())?;
tx.send(response)
.await
@ -714,19 +716,18 @@ fn get_database_name(input: &impl GrpcInputs) -> Result<DatabaseName<'static>, S
/// Gathers all measurement names that have data in the specified
/// (optional) range
async fn measurement_name_impl<T>(
db_store: &T,
async fn measurement_name_impl<D>(
db: Arc<D>,
db_name: DatabaseName<'static>,
range: Option<TimestampRange>,
span_ctx: Option<SpanContext>,
) -> Result<StringValuesResponse>
where
T: DatabaseStore + 'static,
D: QueryDatabase + ExecutionContextProvider + 'static,
{
let predicate = PredicateBuilder::default().set_range(range).build();
let db_name = db_name.as_ref();
let db_name = db_name.as_str();
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
let ctx = db.new_query_context(span_ctx);
let plan = Planner::new(&ctx)
@ -753,8 +754,8 @@ where
/// Return tag keys with optional measurement, timestamp and arbitrary
/// predicates
async fn tag_keys_impl<T>(
db_store: &T,
async fn tag_keys_impl<D>(
db: Arc<D>,
db_name: DatabaseName<'static>,
measurement: Option<String>,
range: Option<TimestampRange>,
@ -762,9 +763,10 @@ async fn tag_keys_impl<T>(
span_ctx: Option<SpanContext>,
) -> Result<StringValuesResponse>
where
T: DatabaseStore + 'static,
D: QueryDatabase + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
let db_name = db_name.as_str();
let predicate = PredicateBuilder::default()
.set_range(range)
@ -775,27 +777,19 @@ where
})?
.build();
let db = db_store.db(&db_name).context(DatabaseNotFound {
db_name: db_name.as_str(),
})?;
let ctx = db.new_query_context(span_ctx);
let tag_key_plan = Planner::new(&ctx)
.tag_keys(db, predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingColumns {
db_name: db_name.as_str(),
})?;
.context(ListingColumns { db_name })?;
let tag_keys = ctx
.to_string_set(tag_key_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingColumns {
db_name: db_name.as_str(),
})?;
.context(ListingColumns { db_name })?;
// Map the resulting collection of Strings into a Vec<Vec<u8>> for return
let values = tag_keys_to_byte_vecs(tag_keys);
@ -806,8 +800,8 @@ where
/// Return tag values for tag_name, with optional measurement, timestamp and
/// arbitrary predicates
async fn tag_values_impl<T>(
db_store: &T,
async fn tag_values_impl<D>(
db: Arc<D>,
db_name: DatabaseName<'static>,
tag_name: String,
measurement: Option<String>,
@ -816,7 +810,7 @@ async fn tag_values_impl<T>(
span_ctx: Option<SpanContext>,
) -> Result<StringValuesResponse>
where
T: DatabaseStore + 'static,
D: QueryDatabase + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
@ -832,7 +826,6 @@ where
let db_name = db_name.as_str();
let tag_name = &tag_name;
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
let ctx = db.new_query_context(span_ctx);
let tag_value_plan = Planner::new(&ctx)
@ -858,15 +851,19 @@ where
}
/// Launch async tasks that materialise the result of executing read_filter.
async fn read_filter_impl<T>(
db_store: &T,
async fn read_filter_impl<D>(
db: Arc<D>,
db_name: DatabaseName<'static>,
req: ReadFilterRequest,
span_ctx: Option<SpanContext>,
) -> Result<Vec<ReadResponse>, Error>
where
T: DatabaseStore + 'static,
D: QueryDatabase + ExecutionContextProvider + 'static,
{
let db_name = db_name.as_str();
let ctx = db.new_query_context(span_ctx);
let rpc_predicate_string = format!("{:?}", req.predicate);
let predicate = PredicateBuilder::default()
@ -877,10 +874,6 @@ where
})?
.build();
let db_name = db_name.as_str();
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
let ctx = db.new_query_context(span_ctx);
// PERF - This used to send responses to the client before execution had
// completed, but now it doesn't. We may need to revisit this in the future
// if big queries are causing a significant latency in TTFB.
@ -907,8 +900,8 @@ where
}
/// Launch async tasks that send the result of executing read_group to `tx`
async fn query_group_impl<T>(
db_store: &T,
async fn query_group_impl<D>(
db: Arc<D>,
db_name: DatabaseName<'static>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
@ -916,8 +909,11 @@ async fn query_group_impl<T>(
span_ctx: Option<SpanContext>,
) -> Result<Vec<ReadResponse>, Error>
where
T: DatabaseStore + 'static,
D: QueryDatabase + ExecutionContextProvider + 'static,
{
let db_name = db_name.as_str();
let ctx = db.new_query_context(span_ctx);
let rpc_predicate_string = format!("{:?}", rpc_predicate);
let predicate = PredicateBuilder::default()
@ -928,14 +924,6 @@ where
})?
.build();
// keep original name so we can transfer ownership
// to closure below
let owned_db_name = db_name;
let db_name = owned_db_name.as_str();
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
let ctx = db.new_query_context(span_ctx);
let planner = Planner::new(&ctx);
let grouped_series_set_plan = match gby_agg {
GroupByAndAggregate::Columns { agg, group_columns } => {
@ -960,9 +948,7 @@ where
.to_series_and_groups(grouped_series_set_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(GroupingSeries {
db_name: owned_db_name.as_str(),
})
.context(GroupingSeries { db_name })
.log_if_error("Running Grouped SeriesSet Plan")?;
// ReadGroupRequest does not have a field to control the format of
@ -974,8 +960,8 @@ where
/// Return field names, restricted via optional measurement, timestamp and
/// predicate
async fn field_names_impl<T>(
db_store: &T,
async fn field_names_impl<D>(
db: Arc<D>,
db_name: DatabaseName<'static>,
measurement: Option<String>,
range: Option<TimestampRange>,
@ -983,7 +969,7 @@ async fn field_names_impl<T>(
span_ctx: Option<SpanContext>,
) -> Result<FieldList>
where
T: DatabaseStore + 'static,
D: QueryDatabase + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
@ -997,7 +983,6 @@ where
.build();
let db_name = db_name.as_str();
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
let ctx = db.new_query_context(span_ctx);
let field_list_plan = Planner::new(&ctx)
@ -1016,6 +1001,35 @@ where
Ok(field_list)
}
/// Return something which can be formatted as json ("pbjson"
/// specifically)
fn defer_json<S>(s: &S) -> impl Into<String> + '_
where
S: serde::Serialize,
{
/// Defers conversion into a String
struct DeferredToJson<'a, S>
where
S: serde::Serialize,
{
s: &'a S,
}
impl<S> From<DeferredToJson<'_, S>> for String
where
S: serde::Serialize,
{
fn from(w: DeferredToJson<'_, S>) -> Self {
match serde_json::to_string_pretty(&w.s) {
Ok(json) => json,
Err(e) => e.to_string(),
}
}
}
DeferredToJson { s }
}
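A minimal sketch (it could live in the `tests` module below) of why this indirection matters: no JSON string is built until the returned value is converted via `Into<String>`, which is what the `record_query` call sites above rely on. The `Req` type here is illustrative, not part of the diff:

```rust
#[test]
fn defer_json_is_lazy() {
    #[derive(serde::Serialize)]
    struct Req {
        range: (i64, i64),
    }

    let req = Req { range: (111_111, 222_222) };
    let deferred = defer_json(&req); // nothing is serialized yet
    let text: String = deferred.into(); // serde_json::to_string_pretty runs here
    assert!(text.contains("111111"));
}
```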
#[cfg(test)]
mod tests {
use std::{

View File

@ -1,6 +1,9 @@
use std::sync::Arc;
use generated_types::{google::FromOptionalField, influxdata::iox::router::v1::*};
use generated_types::{
google::{FromOptionalField, NotFound, ResourceType},
influxdata::iox::router::v1::*,
};
use router::server::RouterServer;
use tonic::{Request, Response, Status};
@ -10,6 +13,20 @@ struct RouterService {
#[tonic::async_trait]
impl router_service_server::RouterService for RouterService {
async fn get_router(
&self,
request: Request<GetRouterRequest>,
) -> Result<Response<GetRouterResponse>, Status> {
let GetRouterRequest { router_name } = request.into_inner();
let router = self
.server
.router(&router_name)
.ok_or_else(|| NotFound::new(ResourceType::Router, router_name))?;
Ok(Response::new(GetRouterResponse {
router: Some(router.config().clone().into()),
}))
}
async fn list_routers(
&self,
_: Request<ListRoutersRequest>,

View File

@ -22,6 +22,7 @@ mod commands {
pub mod database;
pub mod debug;
pub mod operations;
pub mod router;
pub mod run;
pub mod server;
pub mod server_remote;
@ -147,6 +148,7 @@ enum Command {
Database(commands::database::Config),
// Clippy recommended boxing this variant because it's much larger than the others
Run(Box<commands::run::Config>),
Router(commands::router::Config),
Server(commands::server::Config),
Operation(commands::operations::Config),
Sql(commands::sql::Config),
@ -216,6 +218,14 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Router(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
if let Err(e) = commands::router::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Run(config) => {
let _tracing_guard =
handle_init_logs(init_logs_and_tracing(log_verbose_count, &config));

View File

@ -1,18 +1,29 @@
//! Contains tests using the CLI and other tools to test scenarios for
//! moving data from one server/database to another
use std::path::{Path, PathBuf};
use std::{
path::{Path, PathBuf},
time::Duration,
};
use arrow_util::assert_batches_eq;
use assert_cmd::Command;
use data_types::chunk_metadata::ChunkStorage;
use predicates::prelude::*;
use tempfile::TempDir;
use test_helpers::assert_contains;
use uuid::Uuid;
use crate::{
common::server_fixture::{ServerFixture, ServerType},
end_to_end_cases::scenario::{data_dir, db_data_dir, Scenario},
end_to_end_cases::scenario::{
collect_query, create_readable_database, data_dir, db_data_dir, rand_name,
wait_for_exact_chunk_states, wait_for_operations_to_complete,
},
};
use super::scenario::DatabaseBuilder;
/// Copy the `source_dir` directory into the `target_dir` directory using the `cp` command
fn cp_dir(source_dir: impl AsRef<Path>, target_dir: impl AsRef<Path>) {
let source_dir = source_dir.as_ref();
@ -37,13 +48,14 @@ fn cp_dir(source_dir: impl AsRef<Path>, target_dir: impl AsRef<Path>) {
.success();
}
/// Creates a new database on a shared server, writes data to it,
/// shuts it down cleanly, and copies the files to Tempdir/uuid
/// Creates a new database on a shared server, writes data to
/// `the_table` table, shuts it down cleanly, and copies the files to
/// Tempdir/uuid
///
/// Returns (db_name, uuid, tmp_dir)
async fn create_copied_database() -> (String, Uuid, TempDir) {
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
let addr = server_fixture.grpc_base();
let fixture = ServerFixture::create_single_use(ServerType::Database).await;
let addr = fixture.grpc_base();
Command::cargo_bin("influxdb_iox")
.unwrap()
@ -66,37 +78,60 @@ async fn create_copied_database() -> (String, Uuid, TempDir) {
.success()
.stdout(predicate::str::contains("Server initialized."));
let mut management_client = server_fixture.management_client();
let scenario = Scenario::new();
let (db_name, db_uuid) = scenario.create_database(&mut management_client).await;
let db_name = rand_name();
let db_uuid = DatabaseBuilder::new(db_name.clone())
.persist(true)
.persist_age_threshold_seconds(1)
.late_arrive_window_seconds(1)
.build(fixture.grpc_channel())
.await;
// todo write data and force it to be written to disk
let lp_lines: Vec<_> = (0..1_000)
.map(|i| format!("the_table,tag1=val{} x={} {}", i, i * 10, i))
.collect();
let mut write_client = fixture.write_client();
let num_lines_written = write_client
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 1000);
wait_for_exact_chunk_states(
&fixture,
&db_name,
vec![ChunkStorage::ReadBufferAndObjectStore],
Duration::from_secs(5),
)
.await;
// figure out where the database lives and copy its data to a temporary directory,
// as you might copy data from remote object storage to local disk for debugging.
let source_dir = db_data_dir(server_fixture.dir(), db_uuid);
let source_dir = db_data_dir(fixture.dir(), db_uuid);
let tmp_dir = TempDir::new().expect("making tmp dir");
cp_dir(source_dir, tmp_dir.path());
// stop the first server (note this call blocks until the process stops)
std::mem::drop(server_fixture);
std::mem::drop(fixture);
(db_name.to_string(), db_uuid, tmp_dir)
}
#[tokio::test]
async fn migrate_database_files_from_one_server_to_another() {
async fn migrate_all_database_files_from_one_server_to_another() {
// Create a database with some parquet files
let (db_name, db_uuid, tmp_dir) = create_copied_database().await;
// Now start another server that can claim the database
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let addr = server_fixture.grpc_base();
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let addr = fixture.grpc_base();
// copy the data from tmp_dir/<uuid> to the new server's location
let mut source_dir: PathBuf = tmp_dir.path().into();
source_dir.push(db_uuid.to_string());
let target_dir = data_dir(server_fixture.dir());
let target_dir = data_dir(fixture.dir());
cp_dir(source_dir, &target_dir);
// Claiming without --force doesn't work as owner.pb still records the other server as owning it
@ -129,3 +164,89 @@ async fn migrate_database_files_from_one_server_to_another() {
db_name
)));
}
#[tokio::test]
async fn migrate_table_files_from_one_server_to_another() {
let (_, db_uuid, tmp_dir) = create_copied_database().await;
// Now start another server and create a database to receive the files
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let addr = fixture.grpc_base();
let db_name = rand_name();
let new_db_uuid = create_readable_database(&db_name, fixture.grpc_channel()).await;
let sql_query = "select count(*) from the_table";
// No data for the_table yet
let mut flight_client = fixture.flight_client();
let query_results = flight_client
.perform_query(&db_name, sql_query)
.await
.unwrap_err()
.to_string();
assert_contains!(query_results, "'the_table' not found");
// copy the data from tmp_dir/<uuid>/data/the_table to the new server's location
let mut source_dir: PathBuf = tmp_dir.path().into();
source_dir.push(db_uuid.to_string());
source_dir.push("data");
source_dir.push("the_table");
let mut target_dir = db_data_dir(fixture.dir(), new_db_uuid);
target_dir.push("data");
cp_dir(source_dir, &target_dir);
// rebuilding without --force doesn't work as db is in ok state
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("recover")
.arg("rebuild")
.arg(&db_name)
.arg("--host")
.arg(addr)
.assert()
.failure()
.stderr(predicate::str::contains(
"in invalid state for catalog rebuild (Initialized). Expected (CatalogLoadError)",
));
// however with --force rebuilding will work
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("recover")
.arg("rebuild")
.arg(&db_name)
.arg("--force") // sudo make me a sandwich
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("operation"));
// Wait for the rebuild to complete (maybe not needed given we
// wait right below for the server to be initialized)
wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await;
// Wait for all databases to complete re-initialization here
fixture.wait_server_initialized().await;
// Now the data should be available for the_table
let query_results = flight_client
.perform_query(&db_name, sql_query)
.await
.unwrap();
let batches = collect_query(query_results).await;
let expected = vec![
"+-----------------+",
"| COUNT(UInt8(1)) |",
"+-----------------+",
"| 1000 |",
"+-----------------+",
];
assert_batches_eq!(expected, &batches);
}

View File

@ -1204,7 +1204,7 @@ async fn test_rebuild_preserved_catalog() {
let mut management_client = fixture.management_client();
let mut operations_client = fixture.operations_client();
let iox_operation = management_client
.rebuild_preserved_catalog(&db_name)
.rebuild_preserved_catalog(&db_name, false)
.await
.unwrap();

View File

@ -19,6 +19,7 @@ mod read_cli;
mod remote_api;
mod remote_cli;
mod router_api;
mod router_cli;
mod run_cli;
pub mod scenario;
mod sql_cli;

View File

@ -45,6 +45,7 @@ async fn test_router_crud() {
// no routers
assert_eq!(client.list_routers().await.unwrap().len(), 0);
assert_error!(client.get_router(&cfg_foo_1.name).await, Error::NotFound(_));
client.delete_router(&router_name_b).await.unwrap();
// add routers
@ -54,6 +55,8 @@ async fn test_router_crud() {
assert_eq!(routers.len(), 2);
assert_eq!(&routers[0], &cfg_bar);
assert_eq!(&routers[1], &cfg_foo_1);
assert_eq!(client.get_router(&cfg_bar.name).await.unwrap(), cfg_bar);
assert_eq!(client.get_router(&cfg_foo_1.name).await.unwrap(), cfg_foo_1);
// update router
client.update_router(cfg_foo_2.clone()).await.unwrap();
@ -61,12 +64,18 @@ async fn test_router_crud() {
assert_eq!(routers.len(), 2);
assert_eq!(&routers[0], &cfg_bar);
assert_eq!(&routers[1], &cfg_foo_2);
assert_eq!(client.get_router(&cfg_bar.name).await.unwrap(), cfg_bar);
assert_eq!(client.get_router(&cfg_foo_2.name).await.unwrap(), cfg_foo_2);
// delete routers
client.delete_router(&router_name_b).await.unwrap();
let routers = client.list_routers().await.unwrap();
assert_eq!(routers.len(), 1);
assert_eq!(&routers[0], &cfg_bar);
assert_eq!(client.get_router(&cfg_bar.name).await.unwrap(), cfg_bar);
assert_error!(client.get_router(&cfg_foo_2.name).await, Error::NotFound(_));
// deleting router a second time is a no-op
client.delete_router(&router_name_b).await.unwrap();
}

View File

@ -0,0 +1,91 @@
use crate::{
common::server_fixture::{ServerFixture, ServerType},
end_to_end_cases::scenario::rand_name,
};
use assert_cmd::Command;
use predicates::prelude::*;
#[tokio::test]
async fn test_router_crud() {
let server_fixture = ServerFixture::create_shared(ServerType::Router).await;
let addr = server_fixture.grpc_base();
let router_name = rand_name();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("router")
.arg("get")
.arg(&router_name)
.arg("--host")
.arg(addr)
.assert()
.failure()
.stderr(predicate::str::contains(format!(
"Resource router/{} not found",
router_name,
)));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("router")
.arg("create-or-update")
.arg(&router_name)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(format!(
"Created/Updated router {}",
router_name
)));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("router")
.arg("list")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(&router_name));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("router")
.arg("get")
.arg(&router_name)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(
predicate::str::contains(&router_name).and(predicate::str::contains(format!(
r#""name": "{}"#,
&router_name
))), // validate the defaults have been set reasonably
);
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("router")
.arg("delete")
.arg(&router_name)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(format!(
"Deleted router {}",
router_name
)));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("router")
.arg("list")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(&router_name).not());
}

View File

@ -127,6 +127,28 @@ impl Scenario {
})
}
/// returns a function suitable for normalizing output that
/// contains org and bucket ids.
///
/// Specifically, the function will replace all instances of
/// `org_id` with `XXXXXXXXXXXXXXXX`, the `bucket_id` with
/// `YYYYYYYYYYYYYY`, and the read source with `ZZZZZZZZZZZZZZZZ`
pub fn normalizer(&self) -> impl Fn(&str) -> String {
let org_id = self.org_id.clone();
let bucket_id = self.bucket_id.clone();
// also, the actual gRPC request has the org id encoded in the ReadSource,
// \"value\": \"CLmSwbj3opLLdRCWrJ2bgoeRw5kBGP////8P\" |",
let read_source_value = self.read_source().unwrap().value;
let read_source_value = base64::encode(&read_source_value);
move |s: &str| {
s.replace(&org_id, "XXXXXXXXXXXXXXXX")
.replace(&bucket_id, "YYYYYYYYYYYYYY")
.replace(&read_source_value, "ZZZZZZZZZZZZZZZZ")
}
}
/// Creates the database on the server for this scenario,
/// returning (name, uuid)
pub async fn create_database(
@ -314,9 +336,9 @@ pub struct DatabaseBuilder {
}
impl DatabaseBuilder {
pub fn new(name: String) -> Self {
pub fn new(name: impl Into<String>) -> Self {
Self {
name,
name: name.into(),
partition_template: PartitionTemplate {
parts: vec![partition_template::Part {
part: Some(partition_template::part::Part::Table(Empty {})),
@ -377,11 +399,11 @@ impl DatabaseBuilder {
self
}
// Build a database
// Build a database, returning the UUID of the created database
pub async fn try_build(
self,
channel: Connection,
) -> Result<(), influxdb_iox_client::error::Error> {
) -> Result<Uuid, influxdb_iox_client::error::Error> {
let mut management_client = management::Client::new(channel);
management_client
@ -392,22 +414,21 @@ impl DatabaseBuilder {
worker_cleanup_avg_sleep: None,
write_buffer_connection: self.write_buffer,
})
.await?;
Ok(())
.await
}
// Build a database
pub async fn build(self, channel: Connection) {
pub async fn build(self, channel: Connection) -> Uuid {
self.try_build(channel)
.await
.expect("create database failed");
.expect("create database failed")
}
}
/// given a channel to talk with the management api, create a new
/// database with the specified name configured with a 10MB mutable
/// buffer, partitioned on table
pub async fn create_readable_database(db_name: impl Into<String>, channel: Connection) {
/// buffer, partitioned on table, returning the UUID of the created database
pub async fn create_readable_database(db_name: impl Into<String>, channel: Connection) -> Uuid {
DatabaseBuilder::new(db_name.into()).build(channel).await
}
@ -522,6 +543,52 @@ where
}
}
// Wait up to `wait_time` for all operations for `db_name` to complete
pub async fn wait_for_operations_to_complete(
fixture: &ServerFixture,
db_name: &str,
wait_time: std::time::Duration,
) {
let t_start = std::time::Instant::now();
let mut operations_client = fixture.operations_client();
loop {
let mut operations = operations_client.list_operations().await.unwrap();
operations.sort_by(|a, b| a.operation.name.cmp(&b.operation.name));
// if all operations are complete, great!
let all_ops_done = operations
.iter()
.filter(|op| {
// the job is for this database
op.metadata
.job
.as_ref()
.map(|job| job.db_name() == db_name)
.unwrap_or(false)
})
.all(|op| op.operation.done);
if all_ops_done {
println!(
"All operations for {} complete after {:?}:\n\n{:#?}",
db_name,
t_start.elapsed(),
operations
);
return;
}
if t_start.elapsed() >= wait_time {
panic!(
"Operations for {} did not complete in {:?}:\n\n{:#?}",
db_name, wait_time, operations
);
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
/// Gets the list of ChunkSummaries from the server
pub async fn list_chunks(fixture: &ServerFixture, db_name: &str) -> Vec<ChunkSummary> {
let mut management_client = fixture.management_client();

View File

@ -1,5 +1,8 @@
use crate::common::server_fixture::{ServerFixture, ServerType};
use arrow_util::assert_batches_eq;
use crate::{
common::server_fixture::{ServerFixture, ServerType},
end_to_end_cases::scenario::Scenario,
};
use arrow_util::{assert_batches_eq, test_util::normalize_batches};
use super::scenario::{collect_query, create_readable_database, list_chunks, rand_name};
@ -73,3 +76,64 @@ async fn test_operations() {
assert_batches_eq!(expected_read_data, &batches);
}
#[tokio::test]
async fn test_queries() {
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let scenario = Scenario::new();
let (db_name, _db_uuid) = scenario
.create_database(&mut fixture.management_client())
.await;
// issue a storage gRPC query as well so it shows up in the query log (we
// are just checking that things are hooked up here).
let read_source = scenario.read_source();
let range = Some(generated_types::TimestampRange {
start: 111111,
end: 222222,
});
let read_filter_request = tonic::Request::new(generated_types::ReadFilterRequest {
read_source,
range,
..Default::default()
});
fixture
.storage_client()
.read_filter(read_filter_request)
.await
.unwrap();
// Note: don't select issue_time as that changes from run to run
let query = "select query_type, query_text from system.queries";
// Query system.queries, which should have an entry for the storage RPC
let query_results = fixture
.flight_client()
.perform_query(&db_name, query)
.await
.unwrap();
let batches = collect_query(query_results).await;
let batches = normalize_batches(batches, scenario.normalizer());
let expected_read_data = vec![
"+-------------+---------------------------------------------------+",
"| query_type | query_text |",
"+-------------+---------------------------------------------------+",
"| read_filter | { |",
"| | \"ReadSource\": { |",
"| | \"typeUrl\": \"/TODO\", |",
"| | \"value\": \"ZZZZZZZZZZZZZZZZ\" |",
"| | }, |",
"| | \"range\": { |",
"| | \"start\": \"111111\", |",
"| | \"end\": \"222222\" |",
"| | } |",
"| | } |",
"| sql | select query_type, query_text from system.queries |",
"+-------------+---------------------------------------------------+",
];
assert_batches_eq!(expected_read_data, &batches);
}

View File

@ -429,12 +429,13 @@ impl Client {
pub async fn rebuild_preserved_catalog(
&mut self,
db_name: impl Into<String> + Send,
force: bool,
) -> Result<IoxOperation, Error> {
let db_name = db_name.into();
let response = self
.inner
.rebuild_preserved_catalog(RebuildPreservedCatalogRequest { db_name })
.rebuild_preserved_catalog(RebuildPreservedCatalogRequest { db_name, force })
.await?;
Ok(response

View File

@ -1,3 +1,5 @@
use ::generated_types::google::OptionalField;
use self::generated_types::{router_service_client::RouterServiceClient, *};
use crate::connection::Connection;
@ -49,6 +51,21 @@ impl Client {
}
}
/// Get router
pub async fn get_router(
&mut self,
router_name: &str,
) -> Result<generated_types::Router, Error> {
let response = self
.inner
.get_router(GetRouterRequest {
router_name: router_name.to_string(),
})
.await?;
Ok(response.into_inner().router.unwrap_field("router")?)
}
/// List routers.
pub async fn list_routers(&mut self) -> Result<Vec<generated_types::Router>, Error> {
let response = self.inner.list_routers(ListRoutersRequest {}).await?;

View File

@ -187,19 +187,19 @@ mod tests {
let mut transaction = catalog.open_transaction().await;
// an ordinary tracked parquet file => keep
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
paths_keep.push(chunk.path().clone());
// another ordinary tracked parquet file that was added and removed => keep (for time
// travel)
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
transaction.remove_parquet(chunk.path());
paths_keep.push(chunk.path().clone());
// an untracked parquet file => delete
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
paths_delete.push(chunk.path().clone());
transaction.commit().await.unwrap();
@ -240,14 +240,14 @@ mod tests {
// not trick the cleanup logic to remove the actual file because file paths contains a
// UUIDv4 part.
if i % 2 == 0 {
generator.generate_id(i).await;
generator.generate_id(i).await.unwrap();
}
let (chunk, _) = tokio::join!(
async {
let guard = lock.read().await;
let (chunk, _) = generator.generate_id(i).await;
let (chunk, _) = generator.generate_id(i).await.unwrap();
let mut transaction = catalog.open_transaction().await;
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
@ -284,7 +284,7 @@ mod tests {
// create some files
let mut to_remove = HashSet::default();
for _ in 0..3 {
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
to_remove.insert(chunk.path().clone());
}

View File

@ -1663,7 +1663,7 @@ mod tests {
// create another transaction on-top that adds a file (this transaction will be required to load the full state)
{
let (chunk, _) = generator.generate_id(1337).await;
let (chunk, _) = generator.generate_id(1337).await.unwrap();
let mut transaction = catalog.open_transaction().await;
let info = CatalogParquetInfo::from_chunk(&chunk);
@ -1717,7 +1717,7 @@ mod tests {
// create 3 chunks
let mut chunk_addrs = vec![];
for _ in 0..3 {
let (chunk, metadata) = generator.generate().await;
let (chunk, metadata) = generator.generate().await.unwrap();
let chunk_addr = ChunkAddr::new(generator.partition(), metadata.chunk_id);
let info = CatalogParquetInfo::from_chunk(&chunk);
@ -1902,7 +1902,7 @@ mod tests {
let mut t = catalog.open_transaction().await;
for _ in 0..4 {
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let info = CatalogParquetInfo::from_chunk(&chunk);
expected.push(chunk);
state.insert(info.clone()).unwrap();
@ -1917,7 +1917,7 @@ mod tests {
// modify catalog with examples
{
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let info = CatalogParquetInfo::from_chunk(&chunk);
expected.push(chunk);
@ -1941,7 +1941,7 @@ mod tests {
{
let mut t = catalog.open_transaction().await;
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let info = CatalogParquetInfo::from_chunk(&chunk);
t.add_parquet(&info);

View File

@ -241,7 +241,7 @@ mod tests {
// build catalog with some data
let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap();
{
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let mut transaction = catalog.open_transaction().await;
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
transaction.commit().await.unwrap();
@ -352,7 +352,7 @@ File {
// build catalog with some data
let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap();
{
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let mut transaction = catalog.open_transaction().await;
transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
transaction.commit().await.unwrap();

View File

@ -26,6 +26,9 @@ pub enum Error {
path: ParquetFilePath,
},
#[snafu(display("No row groups from parquet file ({:?})", path))]
NoRowGroups { path: ParquetFilePath },
#[snafu(display("Cannot add file to transaction: {}", source))]
FileRecordFailure {
source: crate::interface::CatalogStateAddError,
@ -143,7 +146,12 @@ async fn read_parquet(
let file_size_bytes = data.len();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(data)
.context(MetadataReadFailure { path: path.clone() })?;
.context(MetadataReadFailure { path: path.clone() })?; // Error reading metadata
if parquet_metadata.is_none() {
return NoRowGroups { path: path.clone() }.fail();
} // No data and hence no metadata
let parquet_metadata = parquet_metadata.unwrap();
// validate IOxMetadata
parquet_metadata
@ -417,7 +425,9 @@ mod tests {
} // drop the reference to the MemWriter that the SerializedFileWriter has
let data = mem_writer.into_inner().unwrap();
let md = IoxParquetMetaData::from_file_bytes(data.clone()).unwrap();
let md = IoxParquetMetaData::from_file_bytes(data.clone())
.unwrap()
.unwrap();
let storage = Storage::new(Arc::clone(iox_object_store));
let chunk_addr = ChunkAddr {
db_name: Arc::clone(db_name),

View File

@ -274,7 +274,7 @@ where
// add files
{
for chunk_id in 1..5 {
let (chunk, _) = generator.generate_id(chunk_id).await;
let (chunk, _) = generator.generate_id(chunk_id).await.unwrap();
state
.add(
Arc::clone(iox_object_store),
@ -295,7 +295,7 @@ where
// add and remove in the same transaction
{
let (chunk, _) = generator.generate_id(5).await;
let (chunk, _) = generator.generate_id(5).await.unwrap();
state
.add(
Arc::clone(iox_object_store),
@ -321,7 +321,7 @@ where
// add, remove, add in the same transaction
{
let (chunk, _) = generator.generate_id(6).await;
let (chunk, _) = generator.generate_id(6).await.unwrap();
state
.add(
Arc::clone(iox_object_store),
@ -358,7 +358,7 @@ where
// TODO: Error handling should disambiguate between chunk collision and filename collision
// chunk with same ID already exists (should also not change the metadata)
let (chunk, _) = generator.generate_id(2).await;
let (chunk, _) = generator.generate_id(2).await.unwrap();
let err = state
.add(
Arc::clone(iox_object_store),
@ -375,7 +375,7 @@ where
// error handling, still something works
{
// already exists (should also not change the metadata)
let (chunk, _) = generator.generate_id(2).await;
let (chunk, _) = generator.generate_id(2).await.unwrap();
let err = state
.add(
Arc::clone(iox_object_store),
@ -388,7 +388,7 @@ where
));
// this transaction will still work
let (chunk, _) = generator.generate_id(7).await;
let (chunk, _) = generator.generate_id(7).await.unwrap();
let info = CatalogParquetInfo::from_chunk(&chunk);
state
.add(Arc::clone(iox_object_store), info.clone())
@ -418,7 +418,7 @@ where
// add predicates
{
// create two chunks that we can use for delete predicate
let (chunk, metadata) = generator.generate_id(8).await;
let (chunk, metadata) = generator.generate_id(8).await.unwrap();
let chunk_addr_1 = ChunkAddr::new(generator.partition(), metadata.chunk_id);
state
@ -429,7 +429,7 @@ where
.unwrap();
expected_chunks.insert(8, chunk);
let (chunk, metadata) = generator.generate_id(9).await;
let (chunk, metadata) = generator.generate_id(9).await.unwrap();
let chunk_addr_2 = ChunkAddr::new(generator.partition(), metadata.chunk_id);
state
@ -453,7 +453,7 @@ where
expected_predicates.insert(predicate_2, chunks_2.into_iter().collect());
// chunks created afterwards are unaffected
let (chunk, _) = generator.generate_id(10).await;
let (chunk, _) = generator.generate_id(10).await.unwrap();
state
.add(
Arc::clone(iox_object_store),

View File

@ -482,12 +482,17 @@ pub struct IoxParquetMetaData {
impl IoxParquetMetaData {
/// Read parquet metadata from a parquet file.
pub fn from_file_bytes(data: Vec<u8>) -> Result<Self> {
pub fn from_file_bytes(data: Vec<u8>) -> Result<Option<Self>> {
if data.is_empty() {
return Ok(None);
}
let cursor = SliceableCursor::new(data);
let reader = SerializedFileReader::new(cursor).context(ParquetMetaDataRead {})?;
let parquet_md = reader.metadata().clone();
let data = Self::parquet_md_to_thrift(parquet_md)?;
Ok(Self::from_thrift_bytes(data))
Ok(Some(Self::from_thrift_bytes(data)))
}
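
Since `from_file_bytes` now returns `Result<Option<Self>>`, every caller has to handle the "no row groups" case explicitly. A minimal sketch of the two patterns this diff uses, assuming the surrounding crate's snafu context selectors (`ExtractingMetadataFailure`, `NoData`):

```rust
// Test code treats empty input as unexpected and simply double-unwraps:
let md = IoxParquetMetaData::from_file_bytes(data.clone()).unwrap().unwrap();

// Production code (see Storage::write_to_object_store below) converts the
// None case into a dedicated error via snafu's OptionExt instead:
let md = IoxParquetMetaData::from_file_bytes(data.clone())
    .context(ExtractingMetadataFailure)? // parquet/thrift decode failed
    .context(NoData)?;                   // bytes were empty: no row groups
```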
/// Read parquet metadata from thrift bytes.
@ -877,7 +882,7 @@ mod tests {
async fn test_restore_from_file() {
// setup: preserve chunk to object store
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let parquet_metadata = chunk.parquet_metadata();
let decoded = parquet_metadata.decode().unwrap();
@ -901,7 +906,7 @@ mod tests {
async fn test_restore_from_thrift() {
// setup: write chunk to object store and only keep thrift-encoded metadata
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let parquet_metadata = chunk.parquet_metadata();
let data = parquet_metadata.thrift_bytes().to_vec();
let parquet_metadata = IoxParquetMetaData::from_thrift_bytes(data);
@ -923,52 +928,14 @@ mod tests {
// setup: preserve chunk to object store
let mut generator = ChunkGenerator::new().await;
generator.set_config(GeneratorConfig::NoData);
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
let decoded = parquet_metadata.decode().unwrap();
// step 1: read back schema
let schema_actual = decoded.read_schema().unwrap();
let schema_expected = chunk.schema();
assert_eq!(schema_actual, schema_expected);
// step 2: reading back statistics fails
let res = decoded.read_statistics(&schema_actual);
assert_eq!(
res.unwrap_err().to_string(),
"No row group found, cannot recover statistics"
);
}
#[tokio::test]
async fn test_restore_from_thrift_no_row_group() {
// setup: write chunk to object store and only keep thrift-encoded metadata
let mut generator = ChunkGenerator::new().await;
generator.set_config(GeneratorConfig::NoData);
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
let data = parquet_metadata.thrift_bytes().to_vec();
let parquet_metadata = IoxParquetMetaData::from_thrift_bytes(data);
let decoded = parquet_metadata.decode().unwrap();
// step 1: read back schema
let schema_actual = decoded.read_schema().unwrap();
let schema_expected = chunk.schema();
assert_eq!(schema_actual, schema_expected);
// step 2: reading back statistics fails
let res = decoded.read_statistics(&schema_actual);
assert_eq!(
res.unwrap_err().to_string(),
"No row group found, cannot recover statistics"
);
let result = generator.generate().await;
assert!(result.is_none());
}
#[tokio::test]
async fn test_make_chunk() {
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let parquet_metadata = chunk.parquet_metadata();
let decoded = parquet_metadata.decode().unwrap();
@ -994,30 +961,6 @@ mod tests {
}
}
#[tokio::test]
async fn test_make_chunk_no_row_group() {
let mut generator = ChunkGenerator::new().await;
generator.set_config(GeneratorConfig::NoData);
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
let decoded = parquet_metadata.decode().unwrap();
assert_eq!(decoded.md.num_row_groups(), 0);
assert_ne!(decoded.md.file_metadata().schema_descr().num_columns(), 0);
assert_eq!(decoded.md.file_metadata().num_rows(), 0);
// column count in summary including the timestamp column
assert_eq!(
chunk.table_summary().columns.len(),
decoded.md.file_metadata().schema_descr().num_columns()
);
// check column names
for column in decoded.md.file_metadata().schema_descr().columns() {
assert!((column.name() == TIME_COLUMN_NAME) || column.name().starts_with("foo_"));
}
}
#[test]
fn test_iox_metadata_from_protobuf_checks_version() {
let table_name = Arc::from("table1");
@ -1062,7 +1005,7 @@ mod tests {
async fn test_parquet_metadata_size() {
// setup: preserve chunk to object store
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let parquet_metadata = chunk.parquet_metadata();
assert_eq!(parquet_metadata.size(), 3729);
}

View File

@ -77,6 +77,9 @@ pub enum Error {
ParquetReader {
source: parquet::errors::ParquetError,
},
#[snafu(display("No data to convert to parquet"))]
NoData {},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -109,14 +112,16 @@ impl Storage {
let schema = stream.schema();
let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?;
// TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504)
// no data
if data.is_empty() {
return Ok(None);
}
// TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504)
let file_size_bytes = data.len();
let md =
IoxParquetMetaData::from_file_bytes(data.clone()).context(ExtractingMetadataFailure)?;
let md = IoxParquetMetaData::from_file_bytes(data.clone())
.context(ExtractingMetadataFailure)?
.context(NoData)?;
self.to_object_store(data, &path).await?;
Ok(Some((path, file_size_bytes, md)))
@ -146,10 +151,15 @@ impl Storage {
{
let mut writer = ArrowWriter::try_new(mem_writer.clone(), schema, Some(props))
.context(OpeningParquetWriter)?;
let mut no_stream_data = true;
while let Some(batch) = stream.next().await {
no_stream_data = false;
let batch = batch.context(ReadingStream)?;
writer.write(&batch).context(WritingParquetToMemory)?;
}
if no_stream_data {
return Ok(vec![]);
}
writer.close().context(ClosingParquetWriter)?;
} // drop the reference to the MemWriter that the SerializedFileWriter has
@ -362,10 +372,10 @@ mod tests {
};
// create parquet file
let (_record_batches, schema, _column_summaries, _num_rows) =
make_record_batch("foo", TestSize::Full);
let (record_batches, schema, _column_summaries, _num_rows) =
make_record_batch("foo", TestSize::Minimal);
let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new_with_schema(
vec![],
record_batches,
Arc::clone(schema.inner()),
));
let bytes =
@ -374,7 +384,7 @@ mod tests {
.unwrap();
// extract metadata
let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap();
let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap().unwrap();
let metadata_roundtrip = md.decode().unwrap().read_iox_metadata().unwrap();
// compare with input
@ -485,7 +495,7 @@ mod tests {
// Store the data as a chunk and write it to in the object store
// This tests Storage::write_to_object_store
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let (chunk, _) = generator.generate().await.unwrap();
let key_value_metadata = chunk.schema().as_arrow().metadata().clone();
////////////////////
@ -494,7 +504,9 @@ mod tests {
let parquet_data = load_parquet_from_store(&chunk, Arc::clone(generator.store()))
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
.unwrap()
.unwrap();
let decoded = parquet_metadata.decode().unwrap();
//
// 1. Check metadata at file level: Everything is correct

View File

@ -66,13 +66,13 @@ impl ChunkGenerator {
&self.partition
}
pub async fn generate(&mut self) -> (ParquetChunk, IoxMetadata) {
pub async fn generate(&mut self) -> Option<(ParquetChunk, IoxMetadata)> {
let id = self.next_chunk;
self.next_chunk += 1;
self.generate_id(id).await
}
pub async fn generate_id(&mut self, id: u32) -> (ParquetChunk, IoxMetadata) {
pub async fn generate_id(&mut self, id: u32) -> Option<(ParquetChunk, IoxMetadata)> {
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&self.partition.table_name),
Arc::clone(&self.partition.partition_key),
@ -116,13 +116,15 @@ impl ChunkGenerator {
Arc::clone(schema.inner()),
));
let (path, file_size_bytes, parquet_metadata) = self
let written_result = self
.storage
.write_to_object_store(chunk_addr, stream, metadata.clone())
.await
.unwrap()
.unwrap();
written_result.as_ref()?;
let (path, file_size_bytes, parquet_metadata) = written_result.unwrap();
let chunk = ParquetChunk::new_from_parts(
Arc::clone(&self.partition.partition_key),
Arc::new(table_summary),
@ -135,6 +137,6 @@ impl ChunkGenerator {
ChunkMetrics::new_unregistered(),
);
(chunk, metadata)
Some((chunk, metadata))
}
}
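
With `generate()` and `generate_id()` now returning `Option`, test call sites split into the two patterns visible elsewhere in this diff; a condensed sketch:

```rust
// Normal config: a parquet file is written, so unwrap the chunk + metadata.
let mut generator = ChunkGenerator::new().await;
let (chunk, metadata) = generator.generate().await.unwrap();

// NoData config: write_to_object_store returns None, and so does generate().
let mut generator = ChunkGenerator::new().await;
generator.set_config(GeneratorConfig::NoData);
assert!(generator.generate().await.is_none());
```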

View File

@ -48,6 +48,38 @@ impl ReorgPlanner {
Self::default()
}
/// Creates an execution plan for a full scan of a single chunk.
/// This plan is primarily used to load chunks from one storage medium to
/// another.
pub fn scan_single_chunk_plan<C>(
&self,
schema: Arc<Schema>,
chunk: Arc<C>,
) -> Result<LogicalPlan>
where
C: QueryChunk + 'static,
{
let table_name = chunk.table_name();
// Prepare the plan for the table
let mut builder = ProviderBuilder::new(table_name, schema);
// There are no predicates in these plans, so no need to prune them
builder = builder.add_no_op_pruner();
builder = builder.add_chunk(Arc::clone(&chunk));
let provider = builder.build().context(CreatingProvider { table_name })?;
// Logical plan to scan all columns with no predicates
let plan = LogicalPlanBuilder::scan(table_name, Arc::new(provider) as _, None)
.context(BuildingPlan)?
.build()
.context(BuildingPlan)?;
debug!(%table_name, plan=%plan.display_indent_schema(),
"created single chunk scan plan");
Ok(plan)
}
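
The `load_chunk` lifecycle action later in this diff is the first consumer of this plan; a condensed sketch of the call pattern (the `ctx` executor context is assumed to be set up as in `test_sorted_scan_plan` below):

```rust
// Build a logical full-scan plan for a single chunk, then execute it to a stream.
let plan = ReorgPlanner::new()
    .scan_single_chunk_plan(db_chunk.schema(), Arc::clone(&db_chunk))?;
let physical_plan = ctx.prepare_plan(&plan).await?;
let stream = ctx.execute_stream(physical_plan).await?;
```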
/// Creates an execution plan for the COMPACT operation, which does the following:
///
/// 1. Merges chunks together into a single stream
@ -186,12 +218,13 @@ impl ReorgPlanner {
}
/// Creates a scan plan for the given set of chunks.
/// Output data of the scan will be deduplicated and sorted
/// on the optimal sort order of the chunks' PK columns (tags and time).
/// Output data of the scan will be deduplicated and, if `sort=true`, sorted on
/// the optimal sort order of the chunks' PK columns (tags and time).
///
/// The optimal sort order is computed based on the PK columns cardinality
/// that will be best for RLE encoding.
///
/// Prefer to query::provider::build_scan_plan for the detail of the plan
/// Refer to query::provider::build_scan_plan for the detail of the plan
///
fn sorted_scan_plan<C, I>(&self, schema: Arc<Schema>, chunks: I) -> Result<ScanPlan<C>>
where
@ -207,7 +240,6 @@ impl ReorgPlanner {
// Prepare the plan for the table
let mut builder = ProviderBuilder::new(table_name, schema);
// Tell the scan of this provider to sort its output on the chunks' PK
builder.ensure_pk_sort();
@ -318,6 +350,45 @@ mod test {
(Arc::new(schema), vec![chunk1, chunk2])
}
#[tokio::test]
async fn test_sorted_scan_plan() {
test_helpers::maybe_start_logging();
let (schema, chunks) = get_test_chunks().await;
let scan_plan = ReorgPlanner::new()
.scan_single_chunk_plan(schema, chunks.into_iter().next().unwrap())
.expect("created compact plan");
let executor = Executor::new(1);
let physical_plan = executor
.new_context(ExecutorType::Reorg)
.prepare_plan(&scan_plan)
.await
.unwrap();
// single chunk processed
assert_eq!(physical_plan.output_partitioning().partition_count(), 1);
let batches = datafusion::physical_plan::collect(physical_plan)
.await
.unwrap();
// all data from chunk
let expected = vec![
"+-----------+------------+------+--------------------------------+",
"| field_int | field_int2 | tag1 | time |",
"+-----------+------------+------+--------------------------------+",
"| 1000 | | MT | 1970-01-01T00:00:00.000001Z |",
"| 10 | | MT | 1970-01-01T00:00:00.000007Z |",
"| 70 | | CT | 1970-01-01T00:00:00.000000100Z |",
"| 100 | | AL | 1970-01-01T00:00:00.000000050Z |",
"| 5 | | MT | 1970-01-01T00:00:00.000005Z |",
"+-----------+------------+------+--------------------------------+",
];
assert_batches_eq!(&expected, &batches);
}
#[tokio::test]
async fn test_compact_plan() {
test_helpers::maybe_start_logging();

View File

@ -99,6 +99,9 @@ pub trait QueryDatabase: Debug + Send + Sync {
/// Return a summary of all chunks in this database, in all partitions
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>, Self::Error>;
/// Record that a particular type of query was run / planned
fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>);
}
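
`Db` and `QueryCatalogAccess` forward this call to the new `QueryLog` (see below), while `TestDatabase` makes it a no-op. A hypothetical call site in a query frontend might look like this sketch; `log_and_plan` is an illustrative name, not part of this PR:

```rust
// Hypothetical helper (not from this PR): record the query up front so it
// shows up in system.queries even if planning or execution later fails.
fn log_and_plan<D: QueryDatabase>(db: &D, sql: &str) {
    db.record_query("sql", sql);
    // ... hand the query text off to the SQL planner here ...
}
```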
/// Collection of data that shares the same partition key

View File

@ -134,6 +134,8 @@ impl QueryDatabase for TestDatabase {
found_one.then(|| Arc::new(merger.build()))
}
fn record_query(&self, _query_type: impl Into<String>, _query_text: impl Into<String>) {}
}
impl ExecutionContextProvider for TestDatabase {

View File

@ -1,27 +1,7 @@
-- Test Setup: OneMeasurementAllChunksDropped
-- SQL: SELECT * from information_schema.tables;
+---------------+--------------------+---------------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | system | persistence_windows | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------------+------------+
-- SQL: SHOW TABLES;
+---------------+--------------------+---------------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | system | persistence_windows | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------------+------------+
-- SQL: SELECT * from information_schema.tables where table_schema = 'iox';
+---------------+--------------+------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------+------------+------------+
| public | iox | h2o | BASE TABLE |
+---------------+--------------+------------+------------+

View File

@ -1,8 +1,4 @@
-- Test for predicate push down explains
-- IOX_SETUP: OneMeasurementAllChunksDropped
-- list information schema
SELECT * from information_schema.tables;
-- same but shorter
SHOW TABLES;
-- list information schema (show that all the chunks were dropped)
SELECT * from information_schema.tables where table_schema = 'iox';

View File

@ -140,6 +140,7 @@ async fn sql_select_from_information_schema_tables() {
"| public | system | columns | BASE TABLE |",
"| public | system | operations | BASE TABLE |",
"| public | system | persistence_windows | BASE TABLE |",
"| public | system | queries | BASE TABLE |",
"+---------------+--------------------+---------------------+------------+",
];
run_sql_test_case(

View File

@ -74,6 +74,28 @@ pub enum Error {
source: Box<parquet_catalog::core::Error>,
},
#[snafu(display(
"database ({}) in invalid state for catalog rebuild ({:?}). Expected {}",
db_name,
state,
expected
))]
InvalidStateForRebuild {
db_name: String,
expected: String,
state: DatabaseStateCode,
},
#[snafu(display(
"Internal error during rebuild. Database ({}) transitioned to unexpected state ({:?})",
db_name,
state,
))]
UnexpectedTransitionForRebuild {
db_name: String,
state: DatabaseStateCode,
},
#[snafu(display(
"failed to rebuild preserved catalog of database ({}): {}",
db_name,
@ -548,13 +570,28 @@ impl Database {
Ok(tracker)
}
/// Recover from a CatalogLoadError by wiping and rebuilding the catalog
pub async fn rebuild_preserved_catalog(&self) -> Result<TaskTracker<Job>, Error> {
/// Rebuild the catalog from parquet files. This can be used to
/// recover from a CatalogLoadError, or if new parquet files are
/// added to the data directory.
pub async fn rebuild_preserved_catalog(&self, force: bool) -> Result<TaskTracker<Job>, Error> {
let handle = self.shared.state.read().freeze();
let handle = handle.await;
let db_name = &self.shared.config.name;
let current_state = error_state!(self, "RebuildPreservedCatalog", CatalogLoadError);
{
// If the force flag is not specified, the catalog can only be
// rebuilt if the database ended up in a catalog load error state
let state = self.shared.state.read();
if !force && !matches!(&**state, DatabaseState::CatalogLoadError { .. }) {
return InvalidStateForRebuild {
db_name,
state: state.state_code(),
expected: "(CatalogLoadError)",
}
.fail();
}
}
let registry = self.shared.application.job_registry();
let (tracker, registration) = registry.register(Job::RebuildPreservedCatalog {
@ -562,19 +599,59 @@ impl Database {
});
let shared = Arc::clone(&self.shared);
let iox_object_store = self.iox_object_store().context(InvalidStateForRebuild {
db_name,
state: shared.state.read().state_code(),
expected: "Object store initialized",
})?;
tokio::spawn(
async move {
let db_name = &shared.config.name;
// First attempt to wipe the catalog
PreservedCatalog::wipe(&current_state.iox_object_store)
// Shut down / stop the DB if it is running so it can't
// be read / written to, while also preventing
// anything else from driving the state machine
// forward.
info!(%db_name, "rebuilding catalog, resetting database state");
{
let mut state = shared.state.write();
// Dropping the state here also terminates the
// LifeCycleWorker and WriteBufferConsumer so all
// background work should have completed.
*state.unfreeze(handle) = DatabaseState::Known(DatabaseStateKnown {});
// tell existing db background tasks, if any, to start shutting down
shared.state_notify.notify_waiters();
};
// get another freeze handle to prevent anything else
// from messing with the state while we rebuild the
// catalog (is there a better way??)
let handle = shared.state.read().freeze();
let _handle = handle.await;
// check that the state has not changed during the lock gap
{
let state = shared.state.read();
ensure!(
matches!(&**state, DatabaseState::Known(_)),
UnexpectedTransitionForRebuild {
db_name,
state: state.state_code()
}
);
}
info!(%db_name, "rebuilding catalog from parquet files");
// Now wipe the catalog and rebuild it from parquet files
PreservedCatalog::wipe(iox_object_store.as_ref())
.await
.map_err(Box::new)
.context(WipePreservedCatalog { db_name })?;
let config = PreservedCatalogConfig::new(
Arc::clone(&current_state.iox_object_store),
Arc::clone(&iox_object_store),
db_name.to_string(),
Arc::clone(shared.application.time_provider()),
);
@ -583,10 +660,11 @@ impl Database {
.map_err(Box::new)
.context(RebuildPreservedCatalog { db_name })?;
{
let mut state = shared.state.write();
*state.unfreeze(handle) = DatabaseState::RulesLoaded(current_state);
}
// Double check the state hasn't changed (we hold the
// freeze handle to make sure it does not)
assert!(matches!(&**shared.state.read(), DatabaseState::Known(_)));
info!(%db_name, "catalog rebuilt successfully");
Ok::<_, Error>(())
}

View File

@ -75,6 +75,7 @@ mod chunk;
mod lifecycle;
pub mod load;
pub mod pred;
mod query_log;
mod replay;
mod streams;
mod system_tables;
@ -278,19 +279,31 @@ pub(crate) struct DatabaseToCommit {
impl Db {
pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Self {
let name = Arc::from(database_to_commit.rules.name.as_str());
let DatabaseToCommit {
server_id,
iox_object_store,
exec,
preserved_catalog,
catalog,
rules,
time_provider,
metric_registry,
} = database_to_commit;
let rules = RwLock::new(database_to_commit.rules);
let server_id = database_to_commit.server_id;
let iox_object_store = Arc::clone(&database_to_commit.iox_object_store);
let name = Arc::from(rules.name.as_str());
let catalog = Arc::new(database_to_commit.catalog);
let rules = RwLock::new(rules);
let server_id = server_id;
let iox_object_store = Arc::clone(&iox_object_store);
let catalog = Arc::new(catalog);
let catalog_access = QueryCatalogAccess::new(
&*name,
Arc::clone(&catalog),
Arc::clone(&jobs),
database_to_commit.metric_registry.as_ref(),
Arc::clone(&time_provider),
metric_registry.as_ref(),
);
let catalog_access = Arc::new(catalog_access);
@ -299,16 +312,16 @@ impl Db {
name,
server_id,
iox_object_store,
exec: database_to_commit.exec,
preserved_catalog: Arc::new(database_to_commit.preserved_catalog),
exec,
preserved_catalog: Arc::new(preserved_catalog),
catalog,
jobs,
metric_registry: database_to_commit.metric_registry,
metric_registry,
catalog_access,
worker_iterations_cleanup: AtomicUsize::new(0),
worker_iterations_delete_predicate_preservation: AtomicUsize::new(0),
cleanup_lock: Default::default(),
time_provider: database_to_commit.time_provider,
time_provider,
delete_predicates_mailbox: Default::default(),
persisted_chunk_id_override: Default::default(),
}
@ -1170,6 +1183,10 @@ impl QueryDatabase for Db {
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.catalog_access.table_schema(table_name)
}
fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
self.catalog_access.record_query(query_type, query_text)
}
}
impl ExecutionContextProvider for Db {
@ -2119,7 +2136,9 @@ mod tests {
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
.unwrap()
.unwrap();
// Read metadata at file level
let schema = parquet_metadata.decode().unwrap().read_schema().unwrap();
// Read data
@ -2246,7 +2265,9 @@ mod tests {
load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
.unwrap()
.unwrap();
// Read metadata at file level
let schema = parquet_metadata.decode().unwrap().read_schema().unwrap();
// Read data

View File

@ -7,6 +7,7 @@ use std::{any::Any, sync::Arc};
use super::{
catalog::{Catalog, TableNameFilter},
chunk::DbChunk,
query_log::QueryLog,
Error, Result,
};
@ -32,6 +33,10 @@ use query::{
pruning::{prune_chunks, PruningObserver},
QueryDatabase,
};
use time::TimeProvider;
/// The number of entries to store in the circular query buffer log
const QUERY_LOG_SIZE: usize = 100;
/// Metrics related to chunk access (pruning specifically)
#[derive(Debug)]
@ -105,6 +110,9 @@ pub(crate) struct QueryCatalogAccess {
/// Handles finding / pruning chunks based on predicates
chunk_access: Arc<ChunkAccess>,
/// Stores queries which have been executed
query_log: Arc<QueryLog>,
/// Provides access to system tables
system_tables: Arc<SystemSchemaProvider>,
@ -117,25 +125,28 @@ impl QueryCatalogAccess {
db_name: impl Into<String>,
catalog: Arc<Catalog>,
jobs: Arc<JobRegistry>,
time_provider: Arc<dyn TimeProvider>,
metric_registry: &metric::Registry,
) -> Self {
let db_name = Arc::from(db_name.into());
let access_metrics = AccessMetrics::new(metric_registry, Arc::clone(&db_name));
let chunk_access = Arc::new(ChunkAccess::new(Arc::clone(&catalog), access_metrics));
let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, time_provider));
let system_tables = Arc::new(SystemSchemaProvider::new(
db_name.as_ref(),
Arc::clone(&catalog),
jobs,
Arc::clone(&query_log),
));
let user_tables = Arc::new(DbSchemaProvider::new(
Arc::clone(&catalog),
Arc::clone(&chunk_access),
));
Self {
catalog,
chunk_access,
query_log,
system_tables,
user_tables,
}
@ -229,6 +240,10 @@ impl QueryDatabase for QueryCatalogAccess {
.ok()
.map(|table| Arc::clone(&table.schema().read()))
}
fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
self.query_log.push(query_type, query_text)
}
}
// Datafusion catalog provider interface

View File

@ -1007,7 +1007,7 @@ mod tests {
use data_types::{delete_predicate::DeleteExpr, timestamp::TimestampRange};
use mutable_buffer::test_helpers::write_lp_to_new_chunk;
use parquet_file::test_utils::generator::{ChunkGenerator, GeneratorConfig};
use parquet_file::test_utils::generator::ChunkGenerator;
#[test]
fn test_new_open() {
@ -1241,8 +1241,7 @@ mod tests {
async fn make_persisted_chunk() -> CatalogChunk {
let mut generator = ChunkGenerator::new().await;
generator.set_config(GeneratorConfig::NoData);
let (parquet_chunk, metadata) = generator.generate().await;
let (parquet_chunk, metadata) = generator.generate().await.unwrap();
let addr = ChunkAddr::new(generator.partition(), metadata.chunk_id);
let now = Time::from_timestamp_nanos(43564);

View File

@ -847,7 +847,6 @@ mod tests {
assert_eq!(min_partition_checkpoint, compacted_partition_checkpoint);
}
#[ignore]
#[tokio::test]
async fn test_compact_os_on_chunk_delete_all() {
test_helpers::maybe_start_logging();
@ -895,15 +894,12 @@ mod tests {
// compact the only OS chunk
let partition = partition.upgrade();
let chunk1 = chunks[0].write();
let compacted_chunk = compact_object_store_chunks(partition, vec![chunk1])
compact_object_store_chunks(partition, vec![chunk1])
.unwrap()
.1
.await
.unwrap()
.unwrap();
//.unwrap();
let err = compacted_chunk.unwrap_err();
println!("{}", err.to_string());
// verify results
let partition = db.partition("cpu", partition_key).unwrap();

View File

@ -8,7 +8,7 @@ use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::info;
use query::exec::ExecutorType;
use query::frontend::reorg::ReorgPlanner;
use query::{compute_sort_key, QueryChunkMeta};
use query::QueryChunkMeta;
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use crate::db::lifecycle::collect_rub;
@ -43,14 +43,8 @@ pub fn load_chunk(
let ctx = db.exec.new_context(ExecutorType::Reorg);
let fut = async move {
let key = compute_sort_key(std::iter::once(db_chunk.summary()));
// Cannot move query_chunks as the sort key borrows the column names
let (_, plan) = ReorgPlanner::new().compact_plan(
db_chunk.schema(),
std::iter::once(Arc::clone(&db_chunk)),
key,
)?;
let plan =
ReorgPlanner::new().scan_single_chunk_plan(db_chunk.schema(), Arc::clone(&db_chunk))?;
let physical_plan = ctx.prepare_plan(&plan).await?;
let stream = ctx.execute_stream(physical_plan).await?;

View File

@ -0,0 +1,77 @@
//! Ring buffer of queries that have been run, with some brief information about each
use std::{collections::VecDeque, sync::Arc};
use parking_lot::Mutex;
use time::{Time, TimeProvider};
/// Information about a single query that was executed
#[derive(Debug)]
pub struct QueryLogEntry {
/// The type of query
pub query_type: String,
/// The text of the query (SQL for sql queries, pbjson for storage rpc queries)
pub query_text: String,
/// Time at which the query was run
pub issue_time: Time,
}
impl QueryLogEntry {
/// Creates a new QueryLogEntry -- use `QueryLog::push` to add new entries to the log
fn new(query_type: String, query_text: String, issue_time: Time) -> Self {
Self {
query_type,
query_text,
issue_time,
}
}
}
/// Stores a fixed number of `QueryLogEntry` items -- handles locking
/// internally so it can be shared across multiple threads
#[derive(Debug)]
pub struct QueryLog {
log: Mutex<VecDeque<Arc<QueryLogEntry>>>,
max_size: usize,
time_provider: Arc<dyn TimeProvider>,
}
impl QueryLog {
/// Create a new QueryLog that can hold at most `max_size` items.
/// When the `max_size + 1`th item is added, the oldest item is evicted.
pub fn new(max_size: usize, time_provider: Arc<dyn TimeProvider>) -> Self {
Self {
log: Mutex::new(VecDeque::with_capacity(max_size)),
max_size,
time_provider,
}
}
pub fn push(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
if self.max_size == 0 {
return;
}
let entry = Arc::new(QueryLogEntry::new(
query_type.into(),
query_text.into(),
self.time_provider.now(),
));
let mut log = self.log.lock();
// enforce limit
if log.len() == self.max_size {
log.pop_front();
}
log.push_back(entry);
}
pub fn entries(&self) -> VecDeque<Arc<QueryLogEntry>> {
let log = self.log.lock();
log.clone()
}
}
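
A short usage sketch of the ring buffer, mirroring the `system_tables/queries.rs` test further down; `MockProvider` and `Time` come from this repo's `time` crate:

```rust
use std::sync::Arc;
use time::{Time, TimeProvider};

let time_provider: Arc<dyn TimeProvider> =
    Arc::new(time::MockProvider::new(Time::from_timestamp_nanos(0)));

// Capacity of two: pushing a third entry evicts the oldest one.
let log = QueryLog::new(2, time_provider);
log.push("sql", "select * from foo");
log.push("sql", "select * from bar");
log.push("read_filter", "json goop");

assert_eq!(log.entries().len(), 2);
assert_eq!(log.entries()[0].query_text, "select * from bar");
```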

View File

@ -7,7 +7,7 @@
//!
//! For example `SELECT * FROM system.chunks`
use super::catalog::Catalog;
use super::{catalog::Catalog, query_log::QueryLog};
use crate::JobRegistry;
use arrow::{
datatypes::{Field, Schema, SchemaRef},
@ -27,6 +27,7 @@ mod chunks;
mod columns;
mod operations;
mod persistence;
mod queries;
// The IOx system schema
pub const SYSTEM_SCHEMA: &str = "system";
@ -36,6 +37,7 @@ const COLUMNS: &str = "columns";
const CHUNK_COLUMNS: &str = "chunk_columns";
const OPERATIONS: &str = "operations";
const PERSISTENCE_WINDOWS: &str = "persistence_windows";
const QUERIES: &str = "queries";
pub struct SystemSchemaProvider {
chunks: Arc<dyn TableProvider>,
@ -43,6 +45,7 @@ pub struct SystemSchemaProvider {
chunk_columns: Arc<dyn TableProvider>,
operations: Arc<dyn TableProvider>,
persistence_windows: Arc<dyn TableProvider>,
queries: Arc<dyn TableProvider>,
}
impl std::fmt::Debug for SystemSchemaProvider {
@ -54,7 +57,12 @@ impl std::fmt::Debug for SystemSchemaProvider {
}
impl SystemSchemaProvider {
pub fn new(db_name: impl Into<String>, catalog: Arc<Catalog>, jobs: Arc<JobRegistry>) -> Self {
pub fn new(
db_name: impl Into<String>,
catalog: Arc<Catalog>,
jobs: Arc<JobRegistry>,
query_log: Arc<QueryLog>,
) -> Self {
let db_name = db_name.into();
let chunks = Arc::new(SystemTableProvider {
inner: chunks::ChunksTable::new(Arc::clone(&catalog)),
@ -71,22 +79,27 @@ impl SystemSchemaProvider {
let persistence_windows = Arc::new(SystemTableProvider {
inner: persistence::PersistenceWindowsTable::new(catalog),
});
let queries = Arc::new(SystemTableProvider {
inner: queries::QueriesTable::new(query_log),
});
Self {
chunks,
columns,
chunk_columns,
operations,
persistence_windows,
queries,
}
}
}
const ALL_SYSTEM_TABLES: [&str; 5] = [
const ALL_SYSTEM_TABLES: [&str; 6] = [
CHUNKS,
COLUMNS,
CHUNK_COLUMNS,
OPERATIONS,
PERSISTENCE_WINDOWS,
QUERIES,
];
impl SchemaProvider for SystemSchemaProvider {
@ -108,6 +121,7 @@ impl SchemaProvider for SystemSchemaProvider {
CHUNK_COLUMNS => Some(Arc::clone(&self.chunk_columns)),
OPERATIONS => Some(Arc::clone(&self.operations)),
PERSISTENCE_WINDOWS => Some(Arc::clone(&self.persistence_windows)),
QUERIES => Some(Arc::clone(&self.queries)),
_ => None,
}
}

View File

@ -0,0 +1,113 @@
use crate::db::{
query_log::{QueryLog, QueryLogEntry},
system_tables::IoxSystemTable,
};
use arrow::{
array::{StringArray, TimestampNanosecondArray},
datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
error::Result,
record_batch::RecordBatch,
};
use data_types::error::ErrorLogger;
use std::{collections::VecDeque, sync::Arc};
/// Implementation of system.queries table
#[derive(Debug)]
pub(super) struct QueriesTable {
schema: SchemaRef,
query_log: Arc<QueryLog>,
}
impl QueriesTable {
pub(super) fn new(query_log: Arc<QueryLog>) -> Self {
Self {
schema: queries_schema(),
query_log,
}
}
}
impl IoxSystemTable for QueriesTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
from_query_log_entries(self.schema(), self.query_log.entries())
.log_if_error("system.chunks table")
}
}
fn queries_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new(
"issue_time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("query_type", DataType::Utf8, false),
Field::new("query_text", DataType::Utf8, false),
]))
}
fn from_query_log_entries(
schema: SchemaRef,
entries: VecDeque<Arc<QueryLogEntry>>,
) -> Result<RecordBatch> {
let issue_time = entries
.iter()
.map(|e| e.issue_time)
.map(|ts| Some(ts.timestamp_nanos()))
.collect::<TimestampNanosecondArray>();
let query_type = entries
.iter()
.map(|e| Some(&e.query_type))
.collect::<StringArray>();
let query_text = entries
.iter()
.map(|e| Some(&e.query_text))
.collect::<StringArray>();
RecordBatch::try_new(
schema,
vec![
Arc::new(issue_time),
Arc::new(query_type),
Arc::new(query_text),
],
)
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_util::assert_batches_eq;
use time::{Time, TimeProvider};
#[test]
fn test_from_query_log() {
let now = Time::from_rfc3339("1996-12-19T16:39:57+00:00").unwrap();
let time_provider = Arc::new(time::MockProvider::new(now));
let query_log = QueryLog::new(10, Arc::clone(&time_provider) as Arc<dyn TimeProvider>);
query_log.push("sql", "select * from foo");
time_provider.inc(std::time::Duration::from_secs(24 * 60 * 60));
query_log.push("sql", "select * from bar");
query_log.push("read_filter", "json goop");
let expected = vec![
"+----------------------+-------------+-------------------+",
"| issue_time | query_type | query_text |",
"+----------------------+-------------+-------------------+",
"| 1996-12-19T16:39:57Z | sql | select * from foo |",
"| 1996-12-20T16:39:57Z | sql | select * from bar |",
"| 1996-12-20T16:39:57Z | read_filter | json goop |",
"+----------------------+-------------+-------------------+",
];
let schema = queries_schema();
let batch = from_query_log_entries(schema, query_log.entries()).unwrap();
assert_batches_eq!(&expected, &[batch]);
}
}