feat: add the `system.queries` table (#24992)
The `system.queries` table is now accessible when queries are initiated in debug mode. Debug mode cannot currently be enabled via the HTTP API, so for now the table is only reachable through the gRPC interface. The `system.queries` table lists all queries in the `QueryLog` on the `QueryExecutorImpl`.
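For example, reading the table over FlightSQL the way the integration test added in this commit does (a sketch using the test harness's `flight_sql_client_debug_mode` helper, which sets the `iox-debug` gRPC header; this is test-support code, not a public client API):

    let mut client = server.flight_sql_client_debug_mode("foo", true).await;
    let response = client
        .query("SELECT query_text, success FROM system.queries")
        .await
        .unwrap();
    let batches = collect_stream(response).await;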
parent 1cb3652692
commit 0201febd52
@@ -559,7 +559,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -570,7 +570,7 @@ checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -992,7 +992,7 @@ dependencies = [
  "heck 0.5.0",
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -1294,7 +1294,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -1752,9 +1752,9 @@ dependencies = [

 [[package]]
 name = "either"
-version = "1.11.0"
+version = "1.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2"
+checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b"
 dependencies = [
  "serde",
 ]

@@ -1991,7 +1991,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -2057,9 +2057,9 @@ dependencies = [

 [[package]]
 name = "getrandom"
-version = "0.2.15"
+version = "0.2.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
+checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c"
 dependencies = [
  "cfg-if",
  "libc",

@@ -2596,6 +2596,7 @@ dependencies = [
  "iox_query_influxql",
  "iox_query_influxql_rewrite",
  "iox_query_params",
+ "iox_system_tables",
  "iox_time",
  "metric",
  "metric_exporters",

@@ -2909,6 +2910,18 @@ dependencies = [
  "workspace-hack",
 ]

+[[package]]
+name = "iox_system_tables"
+version = "0.1.0"
+source = "git+https://github.com/influxdata/influxdb3_core?rev=0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94#0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94"
+dependencies = [
+ "arrow",
+ "async-trait",
+ "datafusion",
+ "futures",
+ "workspace-hack",
+]
+
 [[package]]
 name = "iox_time"
 version = "0.1.0"

@@ -3110,9 +3123,9 @@ dependencies = [

 [[package]]
 name = "libc"
-version = "0.2.154"
+version = "0.2.153"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346"
+checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"

 [[package]]
 name = "libm"

@@ -3139,9 +3152,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"

 [[package]]
 name = "linux-raw-sys"
-version = "0.4.13"
+version = "0.4.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
+checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"

 [[package]]
 name = "lock_api"

@@ -3842,7 +3855,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -3962,7 +3975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e"
 dependencies = [
  "proc-macro2",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -4037,7 +4050,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922"
 dependencies = [
  "bytes",
- "prost-derive 0.12.5",
+ "prost-derive 0.12.4",
 ]

 [[package]]

@@ -4057,7 +4070,7 @@ dependencies = [
  "prost 0.12.4",
  "prost-types 0.12.4",
  "regex",
- "syn 2.0.63",
+ "syn 2.0.64",
  "tempfile",
 ]

@@ -4076,15 +4089,15 @@ dependencies = [

 [[package]]
 name = "prost-derive"
-version = "0.12.5"
+version = "0.12.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9554e3ab233f0a932403704f1a1d08c30d5ccd931adfdfa1e8b5a19b52c1d55a"
+checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48"
 dependencies = [
  "anyhow",
  "itertools 0.12.1",
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -4440,7 +4453,7 @@ dependencies = [
  "log",
  "ring",
  "rustls-pki-types",
- "rustls-webpki 0.102.3",
+ "rustls-webpki 0.102.4",
  "subtle",
  "zeroize",
 ]

@@ -4507,9 +4520,9 @@ dependencies = [

 [[package]]
 name = "rustls-webpki"
-version = "0.102.3"
+version = "0.102.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f3bce581c0dd41bce533ce695a1437fa16a7ab5ac3ccfa99fe1a620a7885eabf"
+checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e"
 dependencies = [
  "ring",
  "rustls-pki-types",

@@ -4649,7 +4662,7 @@ checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -4853,7 +4866,7 @@ dependencies = [
  "heck 0.4.1",
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -4926,7 +4939,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -5183,7 +5196,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "rustversion",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -5205,9 +5218,9 @@ dependencies = [

 [[package]]
 name = "syn"
-version = "2.0.63"
+version = "2.0.64"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bf5be731623ca1a1fb7d8be6f261a3be6d3e2337b8a1f97be944d020c8fcb704"
+checksum = "7ad3dee41f36859875573074334c200d1add8e4a87bb37113ebd31d926b7b11f"
 dependencies = [
  "proc-macro2",
  "quote",

@@ -5358,7 +5371,7 @@ checksum = "e2470041c06ec3ac1ab38d0356a6119054dedaea53e12fbefc0de730a1c08524"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -5486,7 +5499,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -5626,7 +5639,7 @@ dependencies = [
  "proc-macro2",
  "prost-build",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -5791,7 +5804,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -6087,7 +6100,7 @@ dependencies = [
  "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
  "wasm-bindgen-shared",
 ]

@@ -6121,7 +6134,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
  "wasm-bindgen-backend",
  "wasm-bindgen-shared",
 ]

@@ -6471,7 +6484,7 @@ dependencies = [
  "strum",
  "subtle",
  "syn 1.0.109",
- "syn 2.0.63",
+ "syn 2.0.64",
  "thrift",
  "tokio",
  "tokio-stream",

@@ -6525,7 +6538,7 @@ checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]

@@ -6545,7 +6558,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.63",
+ "syn 2.0.64",
 ]

 [[package]]
@@ -156,6 +156,17 @@ pub struct Config {
         action
     )]
     pub segment_duration: SegmentDuration,
+
+    // TODO - tune this default:
+    /// The size of the query log. Up to this many queries will remain in the log before
+    /// old queries are evicted to make room for new ones.
+    #[clap(
+        long = "query-log-size",
+        env = "INFLUXDB3_QUERY_LOG_SIZE",
+        default_value = "1000",
+        action
+    )]
+    pub query_log_size: usize,
 }

 /// If `p` does not exist, try to create it as a directory.

@@ -275,6 +286,7 @@ pub async fn command(config: Config) -> Result<()> {
         Arc::clone(&metrics),
         Arc::new(config.datafusion_config),
         10,
+        config.query_log_size,
     ));

     let builder = ServerBuilder::new(common_state)
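The doc comment on `query_log_size` promises oldest-first eviction once the log is full. A minimal standalone sketch of that behavior (illustrative only; the real log is `iox_query`'s `QueryLog`, which stores full per-query state rather than strings):

    use std::collections::VecDeque;

    /// Illustrative bounded log: keeps at most `max` entries, evicting the oldest.
    struct BoundedLog {
        max: usize,
        entries: VecDeque<String>,
    }

    impl BoundedLog {
        fn new(max: usize) -> Self {
            Self { max, entries: VecDeque::with_capacity(max) }
        }

        fn push(&mut self, query_text: String) {
            if self.entries.len() == self.max {
                // at capacity: drop the oldest entry to make room
                self.entries.pop_front();
            }
            self.entries.push_back(query_text);
        }
    }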
@@ -17,6 +17,7 @@ mod flight;
 mod limits;
 mod ping;
 mod query;
+mod system_tables;
 mod write;

 /// Configuration for a [`TestServer`]

@@ -115,6 +116,14 @@ impl TestServer {

     /// Get a [`FlightSqlClient`] for making requests to the running service over gRPC
     pub async fn flight_sql_client(&self, database: &str) -> FlightSqlClient {
+        self.flight_sql_client_debug_mode(database, false).await
+    }
+
+    pub async fn flight_sql_client_debug_mode(
+        &self,
+        database: &str,
+        debug_mode: bool,
+    ) -> FlightSqlClient {
         let channel = tonic::transport::Channel::from_shared(self.client_addr())
             .expect("create tonic channel")
             .connect()

@@ -122,7 +131,9 @@ impl TestServer {
             .expect("connect to gRPC client");
         let mut client = FlightSqlClient::new(channel);
         client.add_header("database", database).unwrap();
-        client.add_header("iox-debug", "true").unwrap();
+        if debug_mode {
+            client.add_header("iox-debug", "true").unwrap();
+        }
         client
     }
@@ -0,0 +1,92 @@
+use arrow_util::assert_batches_sorted_eq;
+use influxdb3_client::Precision;
+
+use crate::{collect_stream, TestServer};
+
+#[tokio::test]
+async fn queries_table() {
+    let server = TestServer::spawn().await;
+
+    server
+        .write_lp_to_db(
+            "foo",
+            "cpu,host=s1,region=us-east usage=0.9 1\n\
+            cpu,host=s1,region=us-east usage=0.89 2\n\
+            cpu,host=s1,region=us-east usage=0.85 3",
+            Precision::Nanosecond,
+        )
+        .await
+        .expect("write some lp");
+
+    let mut client = server.flight_sql_client_debug_mode("foo", true).await;
+
+    // Check queries table for completed queries, will be empty:
+    {
+        let response = client
+            .query("SELECT COUNT(*) FROM system.queries WHERE running = false")
+            .await
+            .unwrap();
+
+        let batches = collect_stream(response).await;
+        assert_batches_sorted_eq!(
+            [
+                "+----------+",
+                "| COUNT(*) |",
+                "+----------+",
+                "| 0        |",
+                "+----------+",
+            ],
+            &batches
+        );
+    }
+
+    // Do some queries, to produce some query logs:
+    {
+        let queries = [
+            "SELECT * FROM cpu",           // valid
+            "SELECT * FROM mem",           // not a valid table, will fail, and not be logged
+            "SELECT usage, time FROM cpu", // specific columns
+        ];
+        for q in queries {
+            let response = client.query(q).await;
+            // collect the stream to make sure the query completes:
+            if let Ok(stream) = response {
+                let _batches = collect_stream(stream).await;
+            }
+        }
+    }
+
+    // Now check the log:
+    {
+        // A sub-set of columns is selected in this query, because the queries
+        // table contains many columns whose values change in subsequent test runs
+        let response = client
+            .query(
+                "SELECT \
+                phase, \
+                query_type, \
+                query_text, \
+                success, \
+                running, \
+                cancelled \
+                FROM system.queries \
+                WHERE success = true",
+            )
+            .await
+            .unwrap();
+
+        let batches = collect_stream(response).await;
+        assert_batches_sorted_eq!(
+            [
+                "+---------+------------+--------------------------------------------------------------------------------+---------+---------+-----------+",
+                "| phase   | query_type | query_text                                                                     | success | running | cancelled |",
+                "+---------+------------+--------------------------------------------------------------------------------+---------+---------+-----------+",
+                "| success | flightsql  | CommandStatementQuerySELECT * FROM cpu                                         | true    | false   | false     |",
+                "| success | flightsql  | CommandStatementQuerySELECT COUNT(*) FROM system.queries WHERE running = false | true    | false   | false     |",
+                "| success | flightsql  | CommandStatementQuerySELECT usage, time FROM cpu                               | true    | false   | false     |",
+                "+---------+------------+--------------------------------------------------------------------------------+---------+---------+-----------+",
+            ],
+            &batches
+        );
+    }
+}
@@ -16,6 +16,7 @@ iox_http.workspace = true
 iox_query.workspace = true
 iox_query_params.workspace = true
 iox_query_influxql.workspace = true
+iox_system_tables.workspace = true
 iox_time.workspace = true
 metric.workspace = true
 metric_exporters.workspace = true
@@ -290,6 +290,7 @@ mod tests {
             Arc::clone(&metrics),
             Arc::new(HashMap::new()),
             10,
+            10,
         ));

         let server = ServerBuilder::new(common_state)

@@ -449,6 +450,7 @@ mod tests {
             Arc::clone(&metrics),
             Arc::new(HashMap::new()),
             10,
+            10,
         );

         let server = ServerBuilder::new(common_state)

@@ -655,6 +657,7 @@ mod tests {
             Arc::clone(&metrics),
             Arc::new(HashMap::new()),
             10,
+            10,
         );

         let server = ServerBuilder::new(common_state)
@@ -1,9 +1,12 @@
 //! module for query executor
 use crate::{QueryExecutor, QueryKind};
-use arrow::array::{ArrayRef, Int64Builder, StringBuilder, StructArray};
+use arrow::array::{
+    ArrayRef, BooleanArray, DurationNanosecondArray, Int64Array, Int64Builder, StringBuilder,
+    StructArray, TimestampNanosecondArray,
+};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use arrow_schema::ArrowError;
+use arrow_schema::{ArrowError, TimeUnit};
 use async_trait::async_trait;
 use data_types::NamespaceId;
 use datafusion::catalog::schema::SchemaProvider;

@@ -26,14 +29,15 @@ use influxdb3_write::{
 use iox_query::exec::{Executor, IOxSessionContext, QueryConfig};
 use iox_query::frontend::sql::SqlQueryPlanner;
 use iox_query::provider::ProviderBuilder;
-use iox_query::query_log::QueryLog;
-use iox_query::query_log::QueryText;
 use iox_query::query_log::StateReceived;
 use iox_query::query_log::{QueryCompletedToken, QueryLogEntries};
+use iox_query::query_log::{QueryLog, QueryLogEntryState};
+use iox_query::query_log::{QueryPhase, QueryText};
 use iox_query::QueryDatabase;
 use iox_query::{QueryChunk, QueryNamespace};
 use iox_query_influxql::frontend::planner::InfluxQLQueryPlanner;
 use iox_query_params::StatementParams;
+use iox_system_tables::{IoxSystemTable, SystemTableProvider};
 use metric::Registry;
 use observability_deps::tracing::{debug, info, trace};
 use schema::Schema;

@@ -66,6 +70,7 @@ impl<W: WriteBuffer> QueryExecutorImpl<W> {
         metrics: Arc<Registry>,
         datafusion_config: Arc<HashMap<String, String>>,
         concurrent_query_limit: usize,
+        query_log_size: usize,
     ) -> Self {
         let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
             &metrics,

@@ -73,10 +78,8 @@ impl<W: WriteBuffer> QueryExecutorImpl<W> {
         ));
         let query_execution_semaphore =
             Arc::new(semaphore_metrics.new_semaphore(concurrent_query_limit));
-        // TODO Fine tune this number or make configurable
-        const QUERY_LOG_LIMIT: usize = 1_000;
         let query_log = Arc::new(QueryLog::new(
-            QUERY_LOG_LIMIT,
+            query_log_size,
             Arc::new(iox_time::SystemProvider::new()),
         ));
         Self {
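Condensing the hunks above, the new setting now flows end to end (all names taken from this diff):

    // --query-log-size / INFLUXDB3_QUERY_LOG_SIZE (default "1000")
    //   -> Config { query_log_size: usize }              (serve command)
    //   -> QueryExecutorImpl::new(.., query_log_size)    (constructor above)
    //   -> QueryLog::new(query_log_size, ..)             (replacing the hardcoded QUERY_LOG_LIMIT)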
@@ -117,16 +120,15 @@ impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
         // TODO - configure query here?
         let ctx = db.new_query_context(span_ctx, Default::default());

+        let params = params.unwrap_or_default();
         let token = db.record_query(
             external_span_ctx.as_ref().map(RequestLogContext::ctx),
             "sql",
             Box::new(q.to_string()),
-            // TODO - ignoring params for now:
-            StatementParams::default(),
+            params.clone(),
         );

         info!("plan");
-        let params = params.unwrap_or_default();
         let plan = match kind {
             QueryKind::Sql => {
                 let planner = SqlQueryPlanner::new();

@@ -137,7 +139,14 @@ impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
                 planner.query(q, params, &ctx).await
             }
         }
-        .map_err(Error::QueryPlanning)?;
+        .map_err(Error::QueryPlanning);
+        let plan = match plan {
+            Ok(plan) => plan,
+            Err(e) => {
+                token.fail();
+                return Err(e);
+            }
+        };
         let token = token.planned(&ctx, Arc::clone(&plan));

         // TODO: Enforce concurrency limit here
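The token plumbing above is what later populates the `phase`, `success`, and `running` columns of `system.queries`. A condensed view of the lifecycle (phase names inferred from the `StateReceived` and `QueryPhase` imports and from the integration-test output earlier in this diff, so treat them as indicative rather than exhaustive):

    // db.record_query(..)        -> entry logged, running
    // token.fail()               -> planning failed, entry marked unsuccessful
    // token.planned(&ctx, plan)  -> entry advances to the planned phase
    // (query completes)          -> phase "success", running = false, success = true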
@@ -318,7 +327,7 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
     }

     fn query_log(&self) -> QueryLogEntries {
-        todo!();
+        self.query_log.entries()
     }
 }

@@ -448,13 +457,14 @@ impl<B: WriteBuffer> CatalogProvider for Database<B> {

     fn schema_names(&self) -> Vec<String> {
         info!("CatalogProvider schema_names");
-        vec![DEFAULT_SCHEMA.to_string()]
+        vec![DEFAULT_SCHEMA.to_string(), SYSTEM_SCHEMA.to_string()]
     }

     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         info!("CatalogProvider schema {}", name);
         match name {
             DEFAULT_SCHEMA => Some(Arc::new(Self::from_namespace(self))),
+            SYSTEM_SCHEMA => Some(Arc::clone(&self.system_schema_provider) as _),
             _ => None,
         }
     }
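With the `system` schema registered next to the default schema, a reference to `system.queries` resolves through the providers in this file (condensed; every name appears in the hunks above and below):

    // Database as CatalogProvider:
    //   schema("system")          -> SystemSchemaProvider
    // SystemSchemaProvider:
    //   table("queries").await    -> SystemTableProvider wrapping QueriesTable
    // QueriesTable as IoxSystemTable:
    //   scan(filters, limit)      -> RecordBatch snapshot of the QueryLog entries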
@@ -554,7 +564,9 @@ impl<B: WriteBuffer> TableProvider for QueryTable<B> {
     }
 }

-const _QUERIES_TABLE: &str = "queries";
+pub const SYSTEM_SCHEMA: &str = "system";
+
+const QUERIES_TABLE: &str = "queries";
 const _PARQUET_FILES_TABLE: &str = "parquet_files";

 struct SystemSchemaProvider {
@@ -573,17 +585,251 @@ impl std::fmt::Debug for SystemSchemaProvider {
 }

 impl SystemSchemaProvider {
-    fn new(_catalog: Arc<Catalog>, _query_log: Arc<QueryLog>, include_debug_info: bool) -> Self {
-        let tables = HashMap::new();
+    fn new(_catalog: Arc<Catalog>, query_log: Arc<QueryLog>, include_debug_info: bool) -> Self {
+        let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
         if include_debug_info {
-            // Using todo!() here causes gRPC integration tests to fail, likely because they
-            // enable debug mode by default, thus entering this if block. So, just leaving this
-            // here in lieu of todo!().
-            //
-            // Eventually, we will implement the queries and parquet_files tables and they will
-            // be injected to the provider's table hashmap here...
-            info!("TODO - gather system tables");
+            // TODO - remaining system tables gathered here...
+            let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
+                query_log,
+            ))));
+            tables.insert(QUERIES_TABLE, queries);
         }
         Self { tables }
     }
 }
+
+#[async_trait]
+impl SchemaProvider for SystemSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self as &dyn Any
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        let mut names = self
+            .tables
+            .keys()
+            .map(|s| (*s).to_owned())
+            .collect::<Vec<_>>();
+        names.sort();
+        names
+    }
+
+    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+        Ok(self.tables.get(name).cloned())
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.tables.contains_key(name)
+    }
+}
+
+struct QueriesTable {
+    schema: SchemaRef,
+    query_log: Arc<QueryLog>,
+}
+
+impl QueriesTable {
+    fn new(query_log: Arc<QueryLog>) -> Self {
+        Self {
+            schema: queries_schema(),
+            query_log,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl IoxSystemTable for QueriesTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    async fn scan(
+        &self,
+        _filters: Option<Vec<Expr>>,
+        _limit: Option<usize>,
+    ) -> Result<RecordBatch, DataFusionError> {
+        let schema = self.schema();
+
+        let entries = self
+            .query_log
+            .entries()
+            .entries
+            .into_iter()
+            .map(|e| e.state())
+            .collect::<Vec<_>>();
+
+        from_query_log_entries(Arc::clone(&schema), &entries)
+    }
+}
+
+fn queries_schema() -> SchemaRef {
+    let columns = vec![
+        Field::new("id", DataType::Utf8, false),
+        Field::new("phase", DataType::Utf8, false),
+        Field::new(
+            "issue_time",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            false,
+        ),
+        Field::new("query_type", DataType::Utf8, false),
+        Field::new("query_text", DataType::Utf8, false),
+        Field::new("partitions", DataType::Int64, true),
+        Field::new("parquet_files", DataType::Int64, true),
+        Field::new(
+            "plan_duration",
+            DataType::Duration(TimeUnit::Nanosecond),
+            true,
+        ),
+        Field::new(
+            "permit_duration",
+            DataType::Duration(TimeUnit::Nanosecond),
+            true,
+        ),
+        Field::new(
+            "execute_duration",
+            DataType::Duration(TimeUnit::Nanosecond),
+            true,
+        ),
+        Field::new(
+            "end2end_duration",
+            DataType::Duration(TimeUnit::Nanosecond),
+            true,
+        ),
+        Field::new(
+            "compute_duration",
+            DataType::Duration(TimeUnit::Nanosecond),
+            true,
+        ),
+        Field::new("max_memory", DataType::Int64, true),
+        Field::new("success", DataType::Boolean, false),
+        Field::new("running", DataType::Boolean, false),
+        Field::new("cancelled", DataType::Boolean, false),
+        Field::new("trace_id", DataType::Utf8, true),
+    ];
+
+    Arc::new(DatafusionSchema::new(columns))
+}
+
+fn from_query_log_entries(
+    schema: SchemaRef,
+    entries: &[Arc<QueryLogEntryState>],
+) -> Result<RecordBatch, DataFusionError> {
+    let mut columns: Vec<ArrayRef> = vec![];
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| Some(e.id.to_string()))
+            .collect::<StringArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| Some(e.phase.name()))
+            .collect::<StringArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| e.issue_time)
+            .map(|ts| Some(ts.timestamp_nanos()))
+            .collect::<TimestampNanosecondArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| Some(&e.query_type))
+            .collect::<StringArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| Some(e.query_text.to_string()))
+            .collect::<StringArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries.iter().map(|e| e.partitions).collect::<Int64Array>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| e.parquet_files)
+            .collect::<Int64Array>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| e.plan_duration.map(|d| d.as_nanos() as i64))
+            .collect::<DurationNanosecondArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| e.permit_duration.map(|d| d.as_nanos() as i64))
+            .collect::<DurationNanosecondArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| e.execute_duration.map(|d| d.as_nanos() as i64))
+            .collect::<DurationNanosecondArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| e.end2end_duration.map(|d| d.as_nanos() as i64))
+            .collect::<DurationNanosecondArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| e.compute_duration.map(|d| d.as_nanos() as i64))
+            .collect::<DurationNanosecondArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries.iter().map(|e| e.max_memory).collect::<Int64Array>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| Some(e.success))
+            .collect::<BooleanArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| Some(e.running))
+            .collect::<BooleanArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| Some(e.phase == QueryPhase::Cancel))
+            .collect::<BooleanArray>(),
+    ));
+
+    columns.push(Arc::new(
+        entries
+            .iter()
+            .map(|e| e.trace_id.map(|x| format!("{:x}", x.0)))
+            .collect::<StringArray>(),
+    ));
+
+    let batch = RecordBatch::try_new(schema, columns)?;
+    Ok(batch)
+}
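Since `queries_schema()` also exposes the timing columns, the same FlightSQL client used in the integration test can pull per-query timings; an illustrative follow-up query, not part of this commit:

    let response = client
        .query(
            "SELECT query_text, plan_duration, execute_duration, end2end_duration \
             FROM system.queries \
             WHERE running = false",
        )
        .await
        .unwrap();
    let batches = collect_stream(response).await;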