Merge branch 'main' into cn/timeout
commit 11aa3571c7
@ -80,6 +80,12 @@ version = "0.1.6"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
|
||||
|
||||
[[package]]
|
||||
name = "anstyle"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "80c697cc33851b02ab0c26b2e8a211684fbe627ff1cc506131f35026dd7686dd"
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.69"
|
||||
|
@ -360,10 +366,11 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "assert_cmd"
|
||||
version = "2.0.8"
|
||||
version = "2.0.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9834fcc22e0874394a010230586367d4a3e9f11b560f469262678547e1d2575e"
|
||||
checksum = "c0dcbed38184f9219183fcf38beb4cdbf5df7163a6d7cd227c6ac89b7966d6fe"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"bstr",
|
||||
"doc-comment",
|
||||
"predicates",
|
||||
|
@ -1409,7 +1416,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion"
|
||||
version = "20.0.0"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777"
|
||||
dependencies = [
|
||||
"ahash 0.8.3",
|
||||
"arrow",
|
||||
|
@ -1456,7 +1463,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion-common"
|
||||
version = "20.0.0"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"chrono",
|
||||
|
@ -1469,7 +1476,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion-execution"
|
||||
version = "20.0.0"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777"
|
||||
dependencies = [
|
||||
"dashmap",
|
||||
"datafusion-common",
|
||||
|
@ -1486,7 +1493,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion-expr"
|
||||
version = "20.0.0"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777"
|
||||
dependencies = [
|
||||
"ahash 0.8.3",
|
||||
"arrow",
|
||||
|
@ -1497,7 +1504,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion-optimizer"
|
||||
version = "20.0.0"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
|
@ -1514,7 +1521,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion-physical-expr"
|
||||
version = "20.0.0"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777"
|
||||
dependencies = [
|
||||
"ahash 0.8.3",
|
||||
"arrow",
|
||||
|
@ -1544,7 +1551,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion-proto"
|
||||
version = "20.0.0"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"chrono",
|
||||
|
@ -1560,7 +1567,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion-row"
|
||||
version = "20.0.0"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"datafusion-common",
|
||||
|
@ -1571,7 +1578,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion-sql"
|
||||
version = "20.0.0"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=612eb1d0ce338af7980fa906df8796eb47c4be44#612eb1d0ce338af7980fa906df8796eb47c4be44"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=4afd67a0e496e1834ad6184629f28e60f66b2777#4afd67a0e496e1834ad6184629f28e60f66b2777"
|
||||
dependencies = [
|
||||
"arrow-schema",
|
||||
"datafusion-common",
|
||||
|
@ -2514,6 +2521,7 @@ dependencies = [
|
|||
"influxdb_iox_client",
|
||||
"influxdb_storage_client",
|
||||
"influxrpc_parser",
|
||||
"insta",
|
||||
"iox_catalog",
|
||||
"iox_query",
|
||||
"iox_time",
|
||||
|
@ -4321,10 +4329,11 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "predicates"
|
||||
version = "2.1.5"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd"
|
||||
checksum = "1ba7d6ead3e3966038f68caa9fc1f860185d95a793180bbcfe0d0da47b3961ed"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"difflib",
|
||||
"float-cmp",
|
||||
"itertools",
|
||||
|
@ -4335,9 +4344,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "predicates-core"
|
||||
version = "1.0.5"
|
||||
version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "72f883590242d3c6fc5bf50299011695fa6590c2c70eac95ee1bdb9a733ad1a2"
|
||||
checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174"
|
||||
|
||||
[[package]]
|
||||
name = "predicates-tree"
|
||||
|
|
|
@@ -118,8 +118,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "34.0.0" }
arrow-flight = { version = "34.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="612eb1d0ce338af7980fa906df8796eb47c4be44", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="612eb1d0ce338af7980fa906df8796eb47c4be44" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="4afd67a0e496e1834ad6184629f28e60f66b2777", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="4afd67a0e496e1834ad6184629f28e60f66b2777" }
hashbrown = { version = "0.13.2" }
parquet = { version = "34.0.0" }
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::fmt::Display;
|
|||
|
||||
use arrow_flight::sql::{
|
||||
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any,
|
||||
CommandPreparedStatementQuery, CommandStatementQuery,
|
||||
CommandGetCatalogs, CommandGetDbSchemas, CommandPreparedStatementQuery, CommandStatementQuery,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use prost::Message;
|
||||
|
@ -14,7 +14,7 @@ use crate::error::*;
|
|||
|
||||
/// Represents a prepared statement "handle". IOx passes all state
|
||||
/// required to run the prepared statement back and forth to the
|
||||
/// client so any querier instance can run it
|
||||
/// client, so any querier instance can run it
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct PreparedStatementHandle {
|
||||
/// The raw SQL query text
|
||||
|
@ -57,14 +57,25 @@ impl From<PreparedStatementHandle> for Bytes {
|
|||
}
|
||||
}
|
||||
|
||||
/// Decoded / validated FlightSQL command messages
/// Decoded / validated FlightSQL command messages
///
/// Handles encoding/decoding prost::Any messages back
/// and forth to native Rust types
///
/// TODO use / contribute upstream arrow-flight implementation, when ready:
/// <https://github.com/apache/arrow-rs/issues/3874>
#[derive(Debug, Clone, PartialEq)]
pub enum FlightSQLCommand {
    CommandStatementQuery(String),
    /// Run a prepared statement
    CommandStatementQuery(CommandStatementQuery),
    /// Run a prepared statement.
    CommandPreparedStatementQuery(PreparedStatementHandle),
    /// Get a list of the available catalogs. See [`CommandGetCatalogs`] for details.
    CommandGetCatalogs(CommandGetCatalogs),
    /// Get a list of the available schemas. See [`CommandGetDbSchemas`]
    /// for details and how to interpret the parameters.
    CommandGetDbSchemas(CommandGetDbSchemas),
    /// Create a prepared statement
    ActionCreatePreparedStatementRequest(String),
    ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest),
    /// Close a prepared statement
    ActionClosePreparedStatementRequest(PreparedStatementHandle),
}
|
||||
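// Editorial sketch, not part of this change: round-tripping one of these
// commands through the encode/decode paths below. It assumes `try_decode`
// accepts the encoded `Bytes` (as the `Message::decode` call further down
// suggests) and relies on the `Clone`/`PartialEq` derives above.
fn example_command_roundtrip() -> Result<()> {
    let cmd = FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {});

    let bytes: Bytes = cmd.clone().try_encode()?;
    let decoded = FlightSQLCommand::try_decode(bytes)?;

    assert_eq!(decoded, cmd);
    Ok(())
}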
|
@ -72,10 +83,29 @@ pub enum FlightSQLCommand {
|
|||
impl Display for FlightSQLCommand {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::CommandStatementQuery(q) => write!(f, "CommandStatementQuery{q}"),
|
||||
Self::CommandStatementQuery(CommandStatementQuery { query }) => {
|
||||
write!(f, "CommandStatementQuery{query}")
|
||||
}
|
||||
Self::CommandPreparedStatementQuery(h) => write!(f, "CommandPreparedStatementQuery{h}"),
|
||||
Self::ActionCreatePreparedStatementRequest(q) => {
|
||||
write!(f, "ActionCreatePreparedStatementRequest{q}")
|
||||
Self::CommandGetCatalogs(CommandGetCatalogs {}) => write!(f, "CommandGetCatalogs"),
|
||||
Self::CommandGetDbSchemas(CommandGetDbSchemas {
|
||||
catalog,
|
||||
db_schema_filter_pattern,
|
||||
}) => {
|
||||
write!(
|
||||
f,
|
||||
"CommandGetCatalogs(catalog={}, db_schema_filter_pattern={}",
|
||||
catalog.as_ref().map(|c| c.as_str()).unwrap_or("<NONE>"),
|
||||
db_schema_filter_pattern
|
||||
.as_ref()
|
||||
.map(|c| c.as_str())
|
||||
.unwrap_or("<NONE>")
|
||||
)
|
||||
}
|
||||
Self::ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest {
|
||||
query,
|
||||
}) => {
|
||||
write!(f, "ActionCreatePreparedStatementRequest{query}")
|
||||
}
|
||||
Self::ActionClosePreparedStatementRequest(h) => {
|
||||
write!(f, "ActionClosePreparedStatementRequest{h}")
|
||||
|
@ -91,21 +121,24 @@ impl FlightSQLCommand {
|
|||
let msg: Any = Message::decode(msg)?;
|
||||
|
||||
if let Some(decoded_cmd) = Any::unpack::<CommandStatementQuery>(&msg)? {
|
||||
let CommandStatementQuery { query } = decoded_cmd;
|
||||
Ok(Self::CommandStatementQuery(query))
|
||||
Ok(Self::CommandStatementQuery(decoded_cmd))
|
||||
} else if let Some(decoded_cmd) = Any::unpack::<CommandPreparedStatementQuery>(&msg)? {
|
||||
let CommandPreparedStatementQuery {
|
||||
prepared_statement_handle,
|
||||
} = decoded_cmd;
|
||||
|
||||
// Decode to IOx specific structure
|
||||
let handle = PreparedStatementHandle::try_decode(prepared_statement_handle)?;
|
||||
Ok(Self::CommandPreparedStatementQuery(handle))
|
||||
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetCatalogs>(&msg)? {
|
||||
Ok(Self::CommandGetCatalogs(decoded_cmd))
|
||||
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetDbSchemas>(&msg)? {
|
||||
Ok(Self::CommandGetDbSchemas(decoded_cmd))
|
||||
} else if let Some(decoded_cmd) = Any::unpack::<ActionCreatePreparedStatementRequest>(&msg)?
|
||||
{
|
||||
let ActionCreatePreparedStatementRequest { query } = decoded_cmd;
|
||||
Ok(Self::ActionCreatePreparedStatementRequest(query))
|
||||
Ok(Self::ActionCreatePreparedStatementRequest(decoded_cmd))
|
||||
} else if let Some(decoded_cmd) = Any::unpack::<ActionClosePreparedStatementRequest>(&msg)?
|
||||
{
|
||||
// Decode to IOx specific structure
|
||||
let ActionClosePreparedStatementRequest {
|
||||
prepared_statement_handle,
|
||||
} = decoded_cmd;
|
||||
|
@ -122,18 +155,17 @@ impl FlightSQLCommand {
|
|||
// Encode the command as a flightsql message (bytes)
|
||||
pub fn try_encode(self) -> Result<Bytes> {
|
||||
let msg = match self {
|
||||
FlightSQLCommand::CommandStatementQuery(query) => {
|
||||
Any::pack(&CommandStatementQuery { query })
|
||||
}
|
||||
FlightSQLCommand::CommandStatementQuery(cmd) => Any::pack(&cmd),
|
||||
FlightSQLCommand::CommandPreparedStatementQuery(handle) => {
|
||||
let prepared_statement_handle = handle.encode();
|
||||
Any::pack(&CommandPreparedStatementQuery {
|
||||
let cmd = CommandPreparedStatementQuery {
|
||||
prepared_statement_handle,
|
||||
})
|
||||
}
|
||||
FlightSQLCommand::ActionCreatePreparedStatementRequest(query) => {
|
||||
Any::pack(&ActionCreatePreparedStatementRequest { query })
|
||||
};
|
||||
Any::pack(&cmd)
|
||||
}
|
||||
FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd),
|
||||
FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd),
|
||||
FlightSQLCommand::ActionCreatePreparedStatementRequest(cmd) => Any::pack(&cmd),
|
||||
FlightSQLCommand::ActionClosePreparedStatementRequest(handle) => {
|
||||
let prepared_statement_handle = handle.encode();
|
||||
Any::pack(&ActionClosePreparedStatementRequest {
|
||||
|
|
|
@ -1,13 +1,16 @@
|
|||
//! FlightSQL handling
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::{error::ArrowError, ipc::writer::IpcWriteOptions};
|
||||
use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions};
|
||||
use arrow_flight::{
|
||||
sql::{ActionCreatePreparedStatementResult, Any},
|
||||
sql::{
|
||||
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
|
||||
CommandGetCatalogs, CommandGetDbSchemas, CommandStatementQuery,
|
||||
},
|
||||
IpcMessage, SchemaAsIpc,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use datafusion::physical_plan::ExecutionPlan;
|
||||
use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan, scalar::ScalarValue};
|
||||
use iox_query::{exec::IOxSessionContext, QueryNamespace};
|
||||
use observability_deps::tracing::debug;
|
||||
use prost::Message;
|
||||
|
@ -34,13 +37,25 @@ impl FlightSQLPlanner {
|
|||
debug!(%namespace_name, %cmd, "Handling flightsql get_flight_info");
|
||||
|
||||
match cmd {
|
||||
FlightSQLCommand::CommandStatementQuery(query) => {
|
||||
Self::get_schema_for_query(&query, ctx).await
|
||||
FlightSQLCommand::CommandStatementQuery(CommandStatementQuery { query }) => {
|
||||
get_schema_for_query(&query, ctx).await
|
||||
}
|
||||
FlightSQLCommand::CommandPreparedStatementQuery(handle) => {
|
||||
Self::get_schema_for_query(handle.query(), ctx).await
|
||||
get_schema_for_query(handle.query(), ctx).await
|
||||
}
|
||||
_ => ProtocolSnafu {
|
||||
FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {}) => {
|
||||
let plan = plan_get_catalogs(ctx).await?;
|
||||
get_schema_for_plan(plan)
|
||||
}
|
||||
FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas {
|
||||
catalog,
|
||||
db_schema_filter_pattern,
|
||||
}) => {
|
||||
let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?;
|
||||
get_schema_for_plan(plan)
|
||||
}
|
||||
FlightSQLCommand::ActionCreatePreparedStatementRequest(_)
|
||||
| FlightSQLCommand::ActionClosePreparedStatementRequest(_) => ProtocolSnafu {
|
||||
cmd: format!("{cmd:?}"),
|
||||
method: "GetFlightInfo",
|
||||
}
|
||||
|
@ -48,24 +63,6 @@ impl FlightSQLPlanner {
|
|||
}
|
||||
}
|
||||
|
||||
/// Return the schema for the specified query
|
||||
///
|
||||
/// returns: IPC encoded (schema_bytes) for this query
|
||||
async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result<Bytes> {
|
||||
// gather real schema, but only
|
||||
let logical_plan = ctx.plan_sql(query).await?;
|
||||
let schema = arrow::datatypes::Schema::from(logical_plan.schema().as_ref());
|
||||
let options = IpcWriteOptions::default();
|
||||
|
||||
// encode the schema into the correct form
|
||||
let message: Result<IpcMessage, ArrowError> =
|
||||
SchemaAsIpc::new(&schema, &options).try_into();
|
||||
|
||||
let IpcMessage(schema) = message?;
|
||||
|
||||
Ok(schema)
|
||||
}
|
||||
|
||||
/// Returns a plan that computes results requested in msg
|
||||
pub async fn do_get(
|
||||
namespace_name: impl Into<String>,
|
||||
|
@ -77,7 +74,7 @@ impl FlightSQLPlanner {
|
|||
debug!(%namespace_name, %cmd, "Handling flightsql do_get");
|
||||
|
||||
match cmd {
|
||||
FlightSQLCommand::CommandStatementQuery(query) => {
|
||||
FlightSQLCommand::CommandStatementQuery(CommandStatementQuery { query }) => {
|
||||
debug!(%query, "Planning FlightSQL query");
|
||||
Ok(ctx.prepare_sql(&query).await?)
|
||||
}
|
||||
|
@ -86,7 +83,25 @@ impl FlightSQLPlanner {
|
|||
debug!(%query, "Planning FlightSQL prepared query");
|
||||
Ok(ctx.prepare_sql(query).await?)
|
||||
}
|
||||
_ => ProtocolSnafu {
|
||||
FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {}) => {
|
||||
debug!("Planning GetCatalogs query");
|
||||
let plan = plan_get_catalogs(ctx).await?;
|
||||
Ok(ctx.create_physical_plan(&plan).await?)
|
||||
}
|
||||
FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas {
|
||||
catalog,
|
||||
db_schema_filter_pattern,
|
||||
}) => {
|
||||
debug!(
|
||||
?catalog,
|
||||
?db_schema_filter_pattern,
|
||||
"Planning GetDbSchemas query"
|
||||
);
|
||||
let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?;
|
||||
Ok(ctx.create_physical_plan(&plan).await?)
|
||||
}
|
||||
FlightSQLCommand::ActionClosePreparedStatementRequest(_)
|
||||
| FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => ProtocolSnafu {
|
||||
cmd: format!("{cmd:?}"),
|
||||
method: "DoGet",
|
||||
}
|
||||
|
@ -107,14 +122,16 @@ impl FlightSQLPlanner {
|
|||
debug!(%namespace_name, %cmd, "Handling flightsql do_action");
|
||||
|
||||
match cmd {
|
||||
FlightSQLCommand::ActionCreatePreparedStatementRequest(query) => {
|
||||
FlightSQLCommand::ActionCreatePreparedStatementRequest(
|
||||
ActionCreatePreparedStatementRequest { query },
|
||||
) => {
|
||||
debug!(%query, "Creating prepared statement");
|
||||
|
||||
// todo run the planner here and actually figure out parameter schemas
|
||||
// see https://github.com/apache/arrow-datafusion/pull/4701
|
||||
let parameter_schema = vec![];
|
||||
|
||||
let dataset_schema = Self::get_schema_for_query(&query, ctx).await?;
|
||||
let dataset_schema = get_schema_for_query(&query, ctx).await?;
|
||||
let handle = PreparedStatementHandle::new(query);
|
||||
|
||||
let result = ActionCreatePreparedStatementResult {
|
||||
|
@@ -141,3 +158,82 @@ impl FlightSQLPlanner {
        }
    }
}

/// Return the schema for the specified query
///
/// returns: IPC encoded (schema_bytes) for this query
async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result<Bytes> {
    get_schema_for_plan(ctx.plan_sql(query).await?)
}

/// Return the schema for the specified logical plan
///
/// returns: IPC encoded (schema_bytes) for this query
fn get_schema_for_plan(logical_plan: LogicalPlan) -> Result<Bytes> {
    // gather the real schema from the logical plan
    let schema = Schema::from(logical_plan.schema().as_ref());
    encode_schema(&schema)
}

/// Encodes the schema IPC encoded (schema_bytes)
fn encode_schema(schema: &Schema) -> Result<Bytes> {
    let options = IpcWriteOptions::default();

    // encode the schema into the correct form
    let message: Result<IpcMessage, ArrowError> = SchemaAsIpc::new(schema, &options).try_into();

    let IpcMessage(schema) = message?;

    Ok(schema)
}
|
||||
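// Editorial sketch, not part of this change: `encode_schema` works for any
// Arrow schema. The `Field`/`DataType` used below are illustrative and come
// from the `arrow` crate already imported in this file.
fn example_encode_schema() -> Result<Bytes> {
    use arrow::datatypes::{DataType, Field};

    let schema = Schema::new(vec![Field::new("catalog_name", DataType::Utf8, false)]);
    encode_schema(&schema)
}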
|
||||
/// Return a `LogicalPlan` for GetCatalogs
///
/// In the future this could be made more efficient by building the
/// response directly from the IOx catalog rather than running an
/// entire DataFusion plan.
async fn plan_get_catalogs(ctx: &IOxSessionContext) -> Result<LogicalPlan> {
    let query = "SELECT DISTINCT table_catalog AS catalog_name FROM information_schema.tables ORDER BY table_catalog";
    Ok(ctx.plan_sql(query).await?)
}
|
||||
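// Editorial sketch, not part of this change: the GetCatalogs plan is an
// ordinary `LogicalPlan`, so it can be lowered with the same
// `create_physical_plan` call used in `do_get` above (the return type is
// assumed to mirror `do_get`).
async fn example_run_get_catalogs(
    ctx: &IOxSessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    let plan = plan_get_catalogs(ctx).await?;
    Ok(ctx.create_physical_plan(&plan).await?)
}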
|
||||
/// Return a `LogicalPlan` for GetDbSchemas
///
/// # Parameters
///
/// Definition from <https://github.com/apache/arrow/blob/44edc27e549d82db930421b0d4c76098941afd71/format/FlightSql.proto#L1156-L1173>
///
/// catalog: Specifies the Catalog to search for the tables.
/// An empty string retrieves those without a catalog.
/// If omitted the catalog name should not be used to narrow the search.
///
/// db_schema_filter_pattern: Specifies a filter pattern for schemas to search for.
/// When no db_schema_filter_pattern is provided, the pattern will not be used to narrow the search.
/// In the pattern string, two special characters can be used to denote matching rules:
/// - "%" means to match any substring with 0 or more characters.
/// - "_" means to match any one character.
///
async fn plan_get_db_schemas(
    ctx: &IOxSessionContext,
    catalog: Option<String>,
    db_schema_filter_pattern: Option<String>,
) -> Result<LogicalPlan> {
    // use '%' to match anything if filters are not specified
    let catalog = catalog.unwrap_or_else(|| String::from("%"));
    let db_schema_filter_pattern = db_schema_filter_pattern.unwrap_or_else(|| String::from("%"));

    let query = "PREPARE my_plan(VARCHAR, VARCHAR) AS \
        SELECT DISTINCT table_catalog AS catalog_name, table_schema AS db_schema_name \
        FROM information_schema.tables \
        WHERE table_catalog like $1 AND table_schema like $2 \
        ORDER BY table_catalog, table_schema";

    let params = vec![
        ScalarValue::Utf8(Some(catalog)),
        ScalarValue::Utf8(Some(db_schema_filter_pattern)),
    ];

    let plan = ctx.plan_sql(query).await?;
    debug!(?plan, "Prepared plan is");
    Ok(plan.with_param_values(params)?)
}
|
||||
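// Editorial sketch, not part of this change: how the defaulting above plays
// out. With no filters supplied both parameters become "%", so every catalog
// and schema matches; "_" matches exactly one character and "%" any substring.
fn example_effective_pattern(filter: Option<String>) -> String {
    filter.unwrap_or_else(|| String::from("%"))
}
// example_effective_pattern(None)                 == "%"     (match everything)
// example_effective_pattern(Some("io_".into()))   == "io_"   (matches "iox")
// example_effective_pattern(Some("%for%".into())) == "%for%" (matches "information_schema")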
|
|
|
@ -83,13 +83,14 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
|
|||
[dev-dependencies]
|
||||
# In alphabetical order
|
||||
arrow_util = { path = "../arrow_util" }
|
||||
assert_cmd = "2.0.8"
|
||||
assert_cmd = "2.0.9"
|
||||
async-trait = "0.1"
|
||||
predicate = { path = "../predicate" }
|
||||
predicates = "2.1.0"
|
||||
predicates = "3.0.1"
|
||||
serde = "1.0.156"
|
||||
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
|
||||
test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }
|
||||
insta = { version = "1", features = ["yaml"] }
|
||||
|
||||
[features]
|
||||
default = ["jemalloc_replacing_malloc"]
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_flight::decode::FlightRecordBatchStream;
|
||||
use arrow_util::test_util::batches_to_sorted_lines;
|
||||
use assert_cmd::Command;
|
||||
use datafusion::common::assert_contains;
|
||||
use futures::{FutureExt, TryStreamExt};
|
||||
|
@ -28,33 +30,22 @@ async fn flightsql_adhoc_query() {
|
|||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let sql = format!("select * from {table_name}");
|
||||
let expected = vec![
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"| A | C | 1970-01-01T00:00:00.000123457Z | 43 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
let mut client = flightsql_client(state.cluster());
|
||||
|
||||
let connection = state.cluster().querier().querier_grpc_connection();
|
||||
let (channel, _headers) = connection.into_grpc_connection().into_parts();
|
||||
|
||||
let mut client = FlightSqlClient::new(channel);
|
||||
|
||||
// Add namespace to client headers until it is fully supported by FlightSQL
|
||||
let namespace = state.cluster().namespace();
|
||||
client.add_header("iox-namespace-name", namespace).unwrap();
|
||||
|
||||
let batches: Vec<_> = client
|
||||
.query(sql)
|
||||
.await
|
||||
.expect("ran SQL query")
|
||||
.try_collect()
|
||||
.await
|
||||
.expect("got batches");
|
||||
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
let stream = client.query(sql).await.unwrap();
|
||||
let batches = collect_stream(stream).await;
|
||||
insta::assert_yaml_snapshot!(
|
||||
batches_to_sorted_lines(&batches),
|
||||
@r###"
|
||||
---
|
||||
- +------+------+--------------------------------+-----+
|
||||
- "| tag1 | tag2 | time | val |"
|
||||
- +------+------+--------------------------------+-----+
|
||||
- "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |"
|
||||
- "| A | C | 1970-01-01T00:00:00.000123457Z | 43 |"
|
||||
- +------+------+--------------------------------+-----+
|
||||
"###
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
|
@ -84,14 +75,7 @@ async fn flightsql_adhoc_query_error() {
|
|||
async move {
|
||||
let sql = String::from("select * from incorrect_table");
|
||||
|
||||
let connection = state.cluster().querier().querier_grpc_connection();
|
||||
let (channel, _headers) = connection.into_grpc_connection().into_parts();
|
||||
|
||||
let mut client = FlightSqlClient::new(channel);
|
||||
|
||||
// Add namespace to client headers until it is fully supported by FlightSQL
|
||||
let namespace = state.cluster().namespace();
|
||||
client.add_header("iox-namespace-name", namespace).unwrap();
|
||||
let mut client = flightsql_client(state.cluster());
|
||||
|
||||
let err = client.query(sql).await.unwrap_err();
|
||||
|
||||
|
@ -129,35 +113,193 @@ async fn flightsql_prepared_query() {
|
|||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let sql = format!("select * from {table_name}");
|
||||
let expected = vec![
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"| A | C | 1970-01-01T00:00:00.000123457Z | 43 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
|
||||
let connection = state.cluster().querier().querier_grpc_connection();
|
||||
let (channel, _headers) = connection.into_grpc_connection().into_parts();
|
||||
|
||||
let mut client = FlightSqlClient::new(channel);
|
||||
|
||||
// Add namespace to client headers until it is fully supported by FlightSQL
|
||||
let namespace = state.cluster().namespace();
|
||||
client.add_header("iox-namespace-name", namespace).unwrap();
|
||||
let mut client = flightsql_client(state.cluster());
|
||||
|
||||
let handle = client.prepare(sql).await.unwrap();
|
||||
let stream = client.execute(handle).await.unwrap();
|
||||
|
||||
let batches: Vec<_> = client
|
||||
.execute(handle)
|
||||
.await
|
||||
.expect("ran SQL query")
|
||||
.try_collect()
|
||||
.await
|
||||
.expect("got batches");
|
||||
let batches = collect_stream(stream).await;
|
||||
insta::assert_yaml_snapshot!(
|
||||
batches_to_sorted_lines(&batches),
|
||||
@r###"
|
||||
---
|
||||
- +------+------+--------------------------------+-----+
|
||||
- "| tag1 | tag2 | time | val |"
|
||||
- +------+------+--------------------------------+-----+
|
||||
- "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |"
|
||||
- "| A | C | 1970-01-01T00:00:00.000123457Z | 43 |"
|
||||
- +------+------+--------------------------------+-----+
|
||||
"###
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
#[tokio::test]
|
||||
async fn flightsql_get_catalogs() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let table_name = "the_table";
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let mut cluster = MiniCluster::create_shared2(database_url).await;
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!(
|
||||
"{table_name},tag1=A,tag2=B val=42i 123456\n\
|
||||
{table_name},tag1=A,tag2=C val=43i 123457"
|
||||
)),
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let mut client = flightsql_client(state.cluster());
|
||||
|
||||
let stream = client.get_catalogs().await.unwrap();
|
||||
let batches = collect_stream(stream).await;
|
||||
|
||||
insta::assert_yaml_snapshot!(
|
||||
batches_to_sorted_lines(&batches),
|
||||
@r###"
|
||||
---
|
||||
- +--------------+
|
||||
- "| catalog_name |"
|
||||
- +--------------+
|
||||
- "| public |"
|
||||
- +--------------+
|
||||
"###
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn flightsql_get_db_schemas() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let table_name = "the_table";
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let mut cluster = MiniCluster::create_shared2(database_url).await;
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!(
|
||||
"{table_name},tag1=A,tag2=B val=42i 123456\n\
|
||||
{table_name},tag1=A,tag2=C val=43i 123457"
|
||||
)),
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
struct TestCase {
|
||||
catalog: Option<&'static str>,
|
||||
db_schema_filter_pattern: Option<&'static str>,
|
||||
}
|
||||
let cases = [
|
||||
TestCase {
|
||||
catalog: None,
|
||||
db_schema_filter_pattern: None,
|
||||
},
|
||||
TestCase {
|
||||
// pub <> public
|
||||
catalog: Some("pub"),
|
||||
db_schema_filter_pattern: None,
|
||||
},
|
||||
TestCase {
|
||||
// pub% should match all
|
||||
catalog: Some("pub%"),
|
||||
db_schema_filter_pattern: None,
|
||||
},
|
||||
TestCase {
|
||||
catalog: None,
|
||||
db_schema_filter_pattern: Some("%for%"),
|
||||
},
|
||||
TestCase {
|
||||
catalog: Some("public"),
|
||||
db_schema_filter_pattern: Some("iox"),
|
||||
},
|
||||
];
|
||||
|
||||
let mut client = flightsql_client(state.cluster());
|
||||
|
||||
let mut output = vec![];
|
||||
for case in cases {
|
||||
let TestCase {
|
||||
catalog,
|
||||
db_schema_filter_pattern,
|
||||
} = case;
|
||||
output.push(format!("catalog:{catalog:?}"));
|
||||
output.push(format!(
|
||||
"db_schema_filter_pattern:{db_schema_filter_pattern:?}"
|
||||
));
|
||||
output.push("*********************".into());
|
||||
|
||||
let stream = client
|
||||
.get_db_schemas(catalog, db_schema_filter_pattern)
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = collect_stream(stream).await;
|
||||
output.extend(batches_to_sorted_lines(&batches))
|
||||
}
|
||||
insta::assert_yaml_snapshot!(
|
||||
output,
|
||||
@r###"
|
||||
---
|
||||
- "catalog:None"
|
||||
- "db_schema_filter_pattern:None"
|
||||
- "*********************"
|
||||
- +--------------+--------------------+
|
||||
- "| catalog_name | db_schema_name |"
|
||||
- +--------------+--------------------+
|
||||
- "| public | information_schema |"
|
||||
- "| public | iox |"
|
||||
- "| public | system |"
|
||||
- +--------------+--------------------+
|
||||
- "catalog:Some(\"pub\")"
|
||||
- "db_schema_filter_pattern:None"
|
||||
- "*********************"
|
||||
- ++
|
||||
- ++
|
||||
- "catalog:Some(\"pub%\")"
|
||||
- "db_schema_filter_pattern:None"
|
||||
- "*********************"
|
||||
- +--------------+--------------------+
|
||||
- "| catalog_name | db_schema_name |"
|
||||
- +--------------+--------------------+
|
||||
- "| public | information_schema |"
|
||||
- "| public | iox |"
|
||||
- "| public | system |"
|
||||
- +--------------+--------------------+
|
||||
- "catalog:None"
|
||||
- "db_schema_filter_pattern:Some(\"%for%\")"
|
||||
- "*********************"
|
||||
- +--------------+--------------------+
|
||||
- "| catalog_name | db_schema_name |"
|
||||
- +--------------+--------------------+
|
||||
- "| public | information_schema |"
|
||||
- +--------------+--------------------+
|
||||
- "catalog:Some(\"public\")"
|
||||
- "db_schema_filter_pattern:Some(\"iox\")"
|
||||
- "*********************"
|
||||
- +--------------+----------------+
|
||||
- "| catalog_name | db_schema_name |"
|
||||
- +--------------+----------------+
|
||||
- "| public | iox |"
|
||||
- +--------------+----------------+
|
||||
"###
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
|
@ -243,6 +385,32 @@ async fn flightsql_jdbc() {
|
|||
.success()
|
||||
.stdout(predicate::str::contains("Running Prepared SQL Query"))
|
||||
.stdout(predicate::str::contains("A, B"));
|
||||
|
||||
// CommandGetCatalogs output
|
||||
let expected_catalogs = "**************\n\
|
||||
Catalogs:\n\
|
||||
**************\n\
|
||||
TABLE_CAT\n\
|
||||
------------\n\
|
||||
public";
|
||||
|
||||
let expected_schemas = "**************\n\
|
||||
Schemas:\n\
|
||||
**************\n\
|
||||
TABLE_SCHEM, TABLE_CATALOG\n\
|
||||
------------\n\
|
||||
information_schema, public\n\
|
||||
iox, public\n\
|
||||
system, public";
|
||||
|
||||
// Validate metadata: jdbc_client <url> metadata
|
||||
Command::from_std(std::process::Command::new(&path))
|
||||
.arg(&jdbc_url)
|
||||
.arg("metadata")
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(predicate::str::contains(expected_catalogs))
|
||||
.stdout(predicate::str::contains(expected_schemas));
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
|
@ -251,3 +419,21 @@ async fn flightsql_jdbc() {
|
|||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
/// Return a [`FlightSqlClient`] configured for use
|
||||
fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient {
|
||||
let connection = cluster.querier().querier_grpc_connection();
|
||||
let (channel, _headers) = connection.into_grpc_connection().into_parts();
|
||||
|
||||
let mut client = FlightSqlClient::new(channel);
|
||||
|
||||
// Add namespace to client headers until it is fully supported by FlightSQL
|
||||
let namespace = cluster.namespace();
|
||||
client.add_header("iox-namespace-name", namespace).unwrap();
|
||||
|
||||
client
|
||||
}
|
||||
|
||||
async fn collect_stream(stream: FlightRecordBatchStream) -> Vec<RecordBatch> {
|
||||
stream.try_collect().await.expect("collecting batches")
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ public class Main {
|
|||
System.err.println("# Run specified prepared query without parameters");
|
||||
System.err.println("jdbc_client <url> prepared_query <sql>");
|
||||
System.err.println("# Run metadata tests");
|
||||
System.err.println("jdbc_client <url> props");
|
||||
System.err.println("jdbc_client <url> metadata");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
|
@ -63,9 +63,9 @@ public class Main {
|
|||
}
|
||||
break;
|
||||
|
||||
case "props":
|
||||
case "metadata":
|
||||
{
|
||||
run_props(url);
|
||||
run_metadata(url);
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -115,17 +115,31 @@ public class Main {
|
|||
print_result_set(rs);
|
||||
}
|
||||
|
||||
static void run_props(String url) throws SQLException {
|
||||
static void run_metadata(String url) throws SQLException {
|
||||
Connection conn = connect(url);
|
||||
System.out.println(conn.getCatalog());
|
||||
DatabaseMetaData md = conn.getMetaData();
|
||||
System.out.println("isReadOnly: " + md.isReadOnly());
|
||||
System.out.println("getSearchStringEscape: " + md.getSearchStringEscape());
|
||||
System.out.println("getDriverVersion: " + md.getDriverVersion());
|
||||
System.out.println("getDatabaseProductVersion: " + md.getDatabaseProductVersion());
|
||||
System.out.println("getJDBCMajorVersion: " + md.getJDBCMajorVersion());
|
||||
System.out.println("getJDBCMinorVersion: " + md.getJDBCMinorVersion());
|
||||
System.out.println("getDriverName: " + md.getDriverName());
|
||||
// Not yet implemented
|
||||
// (see https://github.com/influxdata/influxdb_iox/issues/7210 )
|
||||
|
||||
System.out.println("**************");
|
||||
System.out.println("Catalogs:");
|
||||
System.out.println("**************");
|
||||
print_result_set(md.getCatalogs());
|
||||
|
||||
System.out.println("**************");
|
||||
System.out.println("Schemas:");
|
||||
System.out.println("**************");
|
||||
print_result_set(md.getSchemas());
|
||||
|
||||
//System.out.println("isReadOnly: " + md.isReadOnly());
|
||||
//System.out.println("getSearchStringEscape: " + md.getSearchStringEscape());
|
||||
//System.out.println("getDriverVersion: " + md.getDriverVersion());
|
||||
//System.out.println("getDatabaseProductVersion: " + md.getDatabaseProductVersion());
|
||||
//System.out.println("getJDBCMajorVersion: " + md.getJDBCMajorVersion());
|
||||
//System.out.println("getJDBCMinorVersion: " + md.getJDBCMinorVersion());
|
||||
//System.out.println("getDriverName: " + md.getDriverName());
|
||||
|
||||
}
|
||||
|
||||
// Print out the ResultSet in a whitespace delimited form
|
||||
|
|
|
@ -3,9 +3,9 @@
|
|||
+------+------+----------------------+
|
||||
| host | load | time |
|
||||
+------+------+----------------------+
|
||||
| a | 1.0 | 2022-01-01T01:00:00Z |
|
||||
| b | 2.0 | 2022-01-01T01:00:00Z |
|
||||
| bb | 21.0 | 2022-01-01T01:00:00Z |
|
||||
| a | 1.0 | 2022-01-01T11:00:00Z |
|
||||
| b | 2.0 | 2022-01-01T11:00:00Z |
|
||||
| bb | 21.0 | 2022-01-01T11:00:00Z |
|
||||
+------+------+----------------------+
|
||||
-- SQL: EXPLAIN SELECT * FROM cpu order by host, load, time;
|
||||
-- Results After Normalizing UUIDs
|
||||
|
@ -31,8 +31,8 @@
|
|||
+------+------+----------------------+
|
||||
| host | load | time |
|
||||
+------+------+----------------------+
|
||||
| a | 1.0 | 2022-01-01T01:00:00Z |
|
||||
| bb | 21.0 | 2022-01-01T01:00:00Z |
|
||||
| a | 1.0 | 2022-01-01T11:00:00Z |
|
||||
| bb | 21.0 | 2022-01-01T11:00:00Z |
|
||||
+------+------+----------------------+
|
||||
-- SQL: EXPLAIN SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;
|
||||
-- Results After Normalizing UUIDs
|
||||
|
|
|
@ -1291,7 +1291,12 @@ impl RetentionSetup {
|
|||
let retention_period_1_hour_ns = 3600 * 1_000_000_000;
|
||||
|
||||
// Data is relative to this particular time stamp
|
||||
let cutoff = Time::from_rfc3339("2022-01-01T00:00:00+00:00")
|
||||
//
|
||||
// Use a cutoff date that is NOT at the start of the partition so that `lp_partially_inside` only spans a single
|
||||
// partition, not two. This is important because otherwise this will result in two chunks / files, not one.
|
||||
// However a partial inside/outside chunk is important for the query tests so that we can prove that it is not
|
||||
// sufficient to prune the chunks solely on statistics but that there needs to be an actual row-wise filter.
|
||||
let cutoff = Time::from_rfc3339("2022-01-01T10:00:00+00:00")
|
||||
.unwrap()
|
||||
.timestamp_nanos();
|
||||
// Timestamp 1 hour later than the cutoff, so the data will be retained for 1 hour
|
||||
|
|
|
@ -10,8 +10,7 @@ async fn schema_merge_nonexistent_column() {
|
|||
setup_name: "MultiChunkSchemaMerge",
|
||||
sql: "SELECT * from cpu where foo = 8",
|
||||
expected_error_code: tonic::Code::InvalidArgument,
|
||||
expected_message: "Error while planning query: Schema error: No field named 'foo'. \
|
||||
Valid fields are 'cpu'.'host', 'cpu'.'region', 'cpu'.'system', 'cpu'.'time', 'cpu'.'user'.",
|
||||
expected_message: r#"Error while planning query: Schema error: No field named "foo". Valid fields are "cpu"."host", "cpu"."region", "cpu"."system", "cpu"."time", "cpu"."user"."#,
|
||||
}
|
||||
.run()
|
||||
.await;
|
||||
|
|
|
@ -29,7 +29,8 @@ use arrow_flight::{
|
|||
error::{FlightError, Result},
|
||||
sql::{
|
||||
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
|
||||
CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt,
|
||||
CommandGetCatalogs, CommandGetDbSchemas, CommandPreparedStatementQuery,
|
||||
CommandStatementQuery, ProstMessageExt,
|
||||
},
|
||||
Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket,
|
||||
};
|
||||
|
@ -124,7 +125,54 @@ impl FlightSqlClient {
|
|||
self.do_get_with_cmd(msg.as_any()).await
|
||||
}
|
||||
|
||||
async fn do_get_with_cmd(
|
||||
/// List the catalogs on this server using a [`CommandGetCatalogs`] message.
|
||||
///
|
||||
/// This implementation does not support alternate endpoints
|
||||
///
|
||||
/// [`CommandGetCatalogs`]: https://github.com/apache/arrow/blob/3a6fc1f9eedd41df2d8ffbcbdfbdab911ff6d82e/format/FlightSql.proto#L1125-L1140
|
||||
pub async fn get_catalogs(&mut self) -> Result<FlightRecordBatchStream> {
|
||||
let msg = CommandGetCatalogs {};
|
||||
self.do_get_with_cmd(msg.as_any()).await
|
||||
}
|
||||
|
||||
/// List the schemas on this server
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// Definition from <https://github.com/apache/arrow/blob/44edc27e549d82db930421b0d4c76098941afd71/format/FlightSql.proto#L1156-L1173>
|
||||
///
|
||||
/// catalog: Specifies the Catalog to search for the tables.
|
||||
/// An empty string retrieves those without a catalog.
|
||||
/// If omitted the catalog name should not be used to narrow the search.
|
||||
///
|
||||
/// db_schema_filter_pattern: Specifies a filter pattern for schemas to search for.
|
||||
/// When no db_schema_filter_pattern is provided, the pattern will not be used to narrow the search.
|
||||
/// In the pattern string, two special characters can be used to denote matching rules:
|
||||
/// - "%" means to match any substring with 0 or more characters.
|
||||
/// - "_" means to match any one character.
|
||||
///
|
||||
/// This implementation does not support alternate endpoints
|
||||
pub async fn get_db_schemas(
|
||||
&mut self,
|
||||
catalog: Option<impl Into<String> + Send>,
|
||||
db_schema_filter_pattern: Option<impl Into<String> + Send>,
|
||||
) -> Result<FlightRecordBatchStream> {
|
||||
let msg = CommandGetDbSchemas {
|
||||
catalog: catalog.map(|s| s.into()),
|
||||
db_schema_filter_pattern: db_schema_filter_pattern.map(|s| s.into()),
|
||||
};
|
||||
self.do_get_with_cmd(msg.as_any()).await
|
||||
}
|
||||
|
||||
/// Implements the canonical interaction for most FlightSQL messages:
|
||||
///
|
||||
/// 1. Call `GetFlightInfo` with the provided message, and get a
|
||||
/// [`FlightInfo`] and embedded ticket.
|
||||
///
|
||||
/// 2. Call `DoGet` with the provided ticket.
|
||||
///
|
||||
/// TODO: example calling with GetDbSchemas
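/// # Example (sketch, not part of this change)
///
/// ```ignore
/// use arrow_flight::sql::{CommandGetDbSchemas, ProstMessageExt};
///
/// // `client` is an already-connected FlightSqlClient; `as_any` comes from
/// // the `ProstMessageExt` trait imported at the top of this file.
/// let cmd = CommandGetDbSchemas {
///     catalog: None,
///     db_schema_filter_pattern: Some("io%".to_string()),
/// };
/// let stream = client.do_get_with_cmd(cmd.as_any()).await?;
/// ```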
|
||||
pub async fn do_get_with_cmd(
|
||||
&mut self,
|
||||
cmd: arrow_flight::sql::Any,
|
||||
) -> Result<FlightRecordBatchStream> {
|
||||
|
|
|
@@ -22,7 +22,7 @@ pub(crate) trait PersistCompletionObserver: Send + Sync + Debug {
}

/// A set of details describing the persisted data.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct CompletedPersist {
    /// The catalog identifiers for the persisted partition.
    namespace_id: NamespaceId,
|
||||
|
|
|
@ -0,0 +1,340 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use data_types::{
|
||||
sequence_number_set::{self, SequenceNumberSet},
|
||||
SequenceNumber,
|
||||
};
|
||||
use hashbrown::HashMap;
|
||||
use metric::U64Gauge;
|
||||
use observability_deps::tracing::{debug, info};
|
||||
use tokio::{select, sync::mpsc};
|
||||
use wal::SegmentId;
|
||||
|
||||
use crate::{
|
||||
persist::completion_observer::CompletedPersist, wal::reference_tracker::WalFileDeleter,
|
||||
};
|
||||
|
||||
/// A WAL file reference-count tracker.
|
||||
///
|
||||
/// See [`WalReferenceHandle`].
|
||||
///
|
||||
/// [`WalReferenceHandle`]: super::WalReferenceHandle
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct WalReferenceActor<T = Arc<wal::Wal>> {
|
||||
wal: T,
|
||||
|
||||
/// The set of IDs of persisted data that do not yet appear in
|
||||
/// `wal_files`, the set of WAL files rotated out of active use. This is
|
||||
/// an intermediate buffer necessary to tolerate out-of-order persist
|
||||
/// notifications w.r.t file notifications.
|
||||
///
|
||||
/// IDs that appear in this set are most likely part of the active WAL
|
||||
/// segment file and should be reconciled when it rotates.
|
||||
persisted: SequenceNumberSet,
|
||||
|
||||
/// The set of closed WAL segment files, and the set of unpersisted
|
||||
/// [`SequenceNumber`] they contain.
|
||||
///
|
||||
/// These [`SequenceNumberSet`] are slowly drained / have IDs removed in
|
||||
/// response to persisted data notifications. Once the set is of length 0,
|
||||
/// the file can be deleted as all the entries the file contains have been
|
||||
/// persisted.
|
||||
///
|
||||
/// Invariant: sets in this map are always non-empty.
|
||||
wal_files: HashMap<wal::SegmentId, SequenceNumberSet>,
|
||||
|
||||
/// Channels for input from the [`WalReferenceHandle`].
|
||||
///
|
||||
/// [`WalReferenceHandle`]: super::WalReferenceHandle
|
||||
file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>,
|
||||
persist_rx: mpsc::Receiver<Arc<CompletedPersist>>,
|
||||
unbuffered_rx: mpsc::Receiver<SequenceNumber>,
|
||||
|
||||
/// A metric tracking the number of rotated WAL files being reference
|
||||
/// tracked.
|
||||
num_files: U64Gauge,
|
||||
/// The minimum [`SegmentId`] in `wal_files`, the set of old (rotated out)
|
||||
/// files that will eventually be deleted.
|
||||
///
|
||||
/// If this value never changes over the lifetime of an ingester, it is an
|
||||
/// indication of a reference leak bug, causing a WAL file to never be
|
||||
/// deleted.
|
||||
min_id: U64Gauge,
|
||||
/// The number of references to unpersisted operations remaining in the old
|
||||
/// (rotated out) WAL files, decreasing as persistence completes, and
|
||||
/// increasing as non-empty WAL files are rotated into `wal_files`.
|
||||
referenced_ops: U64Gauge,
|
||||
}
|
||||
|
||||
impl<T> WalReferenceActor<T>
|
||||
where
|
||||
T: WalFileDeleter,
|
||||
{
|
||||
pub(super) fn new(
|
||||
wal: T,
|
||||
file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>,
|
||||
persist_rx: mpsc::Receiver<Arc<CompletedPersist>>,
|
||||
unbuffered_rx: mpsc::Receiver<SequenceNumber>,
|
||||
metrics: &metric::Registry,
|
||||
) -> Self {
|
||||
let num_files = metrics
|
||||
.register_metric::<U64Gauge>(
|
||||
"ingester_wal_inactive_file_count",
|
||||
"number of WAL files that are not being actively wrote to, but contain unpersisted data"
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
let min_id = metrics
|
||||
.register_metric::<U64Gauge>(
|
||||
"ingester_wal_inactive_min_id",
|
||||
"the segment ID of the oldest inactive wal file",
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
let referenced_ops = metrics
|
||||
.register_metric::<U64Gauge>(
|
||||
"ingester_wal_inactive_file_op_reference_count",
|
||||
"the number of unpersisted operations referenced in inactive WAL files",
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
Self {
|
||||
wal,
|
||||
persisted: SequenceNumberSet::default(),
|
||||
wal_files: HashMap::with_capacity(3),
|
||||
file_rx,
|
||||
persist_rx,
|
||||
unbuffered_rx,
|
||||
num_files,
|
||||
min_id,
|
||||
referenced_ops,
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the actor task.
|
||||
///
|
||||
/// This task exits once the sender side of the input channels have been
|
||||
/// dropped.
|
||||
pub(crate) async fn run(mut self) {
|
||||
loop {
|
||||
select! {
|
||||
// Prefer polling the channels in the specified order.
|
||||
//
|
||||
// By consuming file_rx first, there's a greater chance that
|
||||
// subsequent persist/ignore events can be applied directly to
|
||||
// the file sets, rather than having to wait in the intermediate
|
||||
// "persisted" set, reducing memory utilisation.
|
||||
biased;
|
||||
|
||||
Some((id, f)) = self.file_rx.recv() => self.handle_new_file(id, f).await,
|
||||
Some(p) = self.persist_rx.recv() => self.handle_persisted(p).await,
|
||||
Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await,
|
||||
else => break
|
||||
}
|
||||
|
||||
// After each action is processed, update the metrics.
|
||||
self.update_metrics();
|
||||
}
|
||||
|
||||
debug!("stopping wal reference counter task");
|
||||
}
|
||||
|
||||
/// Update the metrics to match the internal state.
|
||||
fn update_metrics(&self) {
|
||||
let num_files = self.wal_files.len();
|
||||
|
||||
// Build a set of (id, set_len) tuples for debug logging.
|
||||
let id_lens = self
|
||||
.wal_files
|
||||
.iter()
|
||||
.map(|(id, set)| (*id, set.len()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Emit a log for debugging purposes, showing the current state.
|
||||
debug!(
|
||||
num_files,
|
||||
files=?id_lens,
|
||||
persisted_set_len=self.persisted.len(),
|
||||
"updated reference state"
|
||||
);
|
||||
|
||||
// Reduce (id, set_len) tuples to the min ID and sum of the set lengths,
|
||||
// defaulting to 0 for the length and u64::MAX for the ID if the file
|
||||
// set is empty.
|
||||
let (min_id, referenced_ops) =
|
||||
id_lens
|
||||
.into_iter()
|
||||
.fold((u64::MAX, 0), |(id_min, len_sum), e| {
|
||||
assert!(e.1 > 0); // Invariant: sets in file map are never empty
|
||||
(id_min.min(e.0.get()), len_sum + e.1)
|
||||
});
|
||||
|
||||
// And update the various exported metrics.
|
||||
self.num_files.set(num_files as _);
|
||||
self.min_id.set(min_id);
|
||||
self.referenced_ops.set(referenced_ops);
|
||||
}
|
||||
|
||||
/// Track a newly rotated WAL segment, with the given [`SegmentId`] and
|
||||
/// containing the operations specified in [`SequenceNumberSet`].
|
||||
///
|
||||
/// This method tolerates an empty `set`.
|
||||
async fn handle_new_file(&mut self, segment_id: SegmentId, mut set: SequenceNumberSet) {
|
||||
debug!(
|
||||
%segment_id,
|
||||
sequence_number_set = ?set,
|
||||
"notified of new segment file"
|
||||
);
|
||||
|
||||
// Clear the overlap between the "persisted" set, and this new file from
|
||||
// both.
|
||||
let n = clear_intersection(&mut self.persisted, &mut set);
|
||||
if n > 0 {
|
||||
debug!(n, "released previously persisted IDs");
|
||||
}
|
||||
|
||||
// If the file set is now completely empty, it can be immediately
|
||||
// deleted.
|
||||
if set.is_empty() {
|
||||
debug!(n, "immediately dropping empty segment file");
|
||||
return delete_file(&self.wal, segment_id).await;
|
||||
}
|
||||
|
||||
// Otherwise, retain this file for later persist notifications.
|
||||
//
|
||||
// Run-optimise the bitmap to minimise memory utilisation of this set.
|
||||
// This is a relatively fast operation, and the file sets are expected
|
||||
// to be highly suitable for RLE compression due to the monotonic
|
||||
// sequence number assignments.
|
||||
set.run_optimise();
|
||||
|
||||
// Insert the file set into the files being tracked
|
||||
assert!(!set.is_empty()); // Invariant: sets in file map are never empty
|
||||
assert!(
|
||||
self.wal_files.insert(segment_id, set).is_none(),
|
||||
"duplicate segment ID"
|
||||
);
|
||||
}
|
||||
|
||||
/// Process a persistence completion notification, decreasing the reference
|
||||
/// counts against tracked WAL files, and holding any remaining IDs (in the
|
||||
/// untracked active WAL segment) in a temporary "persisted" buffer.
|
||||
async fn handle_persisted(&mut self, note: Arc<CompletedPersist>) {
|
||||
debug!(
|
||||
namespace_id = %note.namespace_id(),
|
||||
table_id = %note.table_id(),
|
||||
partition_id = %note.partition_id(),
|
||||
sequence_number_set = ?note.sequence_numbers(),
|
||||
"notified of persisted data"
|
||||
);
|
||||
|
||||
self.remove(note.owned_sequence_numbers()).await;
|
||||
}
|
||||
|
||||
/// Handle a write that has been added to the WAL, but that did not complete
|
||||
/// / buffer.
|
||||
///
|
||||
/// Because the write was added to the WAL, its ID will be part of the WAL
|
||||
/// file's [`SequenceNumberSet`], but because the write was not buffered, it
|
||||
/// will never be persisted and therefore the WAL set will always have an
|
||||
/// outstanding reference unless it is accounted for here.
|
||||
async fn handle_unbuffered(&mut self, id: SequenceNumber) {
|
||||
debug!(sequence_number = id.get(), "notified of unbuffered write");
|
||||
|
||||
// Delegate to the same code as persisted by presenting this ID as a set
|
||||
// - the same behaviour is required.
|
||||
let mut set = SequenceNumberSet::with_capacity(1);
|
||||
set.add(id);
|
||||
|
||||
self.remove(set).await;
|
||||
}
|
||||
|
||||
/// Remove the intersection of `set` from all the sets in `self` (file sets,
|
||||
/// and the untracked / "persisted" buffer set).
|
||||
///
|
||||
/// Deletes all WAL files that are no longer referenced / have unpersisted
|
||||
/// entries.
|
||||
async fn remove(&mut self, mut set: SequenceNumberSet) {
|
||||
// First remove this set from the "persisted" / file-less set.
|
||||
let n = clear_intersection(&mut set, &mut self.persisted);
|
||||
if n > 0 {
|
||||
debug!(n, "released previously persisted IDs");
|
||||
}
|
||||
|
||||
if set.is_empty() {
|
||||
debug!(n, "fully matched previously persisted IDs");
|
||||
return;
|
||||
}
|
||||
|
||||
// And then walk the WAL file sets.
|
||||
let mut remove_ids = Vec::with_capacity(0);
|
||||
for (id, file_set) in self.wal_files.iter_mut() {
|
||||
// Invariant: files in the file set always have at least 1 reference
|
||||
assert!(!file_set.is_empty());
|
||||
|
||||
// Early exit the loop if possible.
|
||||
if set.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Clear the intersection of both sets.
|
||||
let n = clear_intersection(&mut set, file_set);
|
||||
if n == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
debug!(n, segment_id=%id, "matched file IDs");
|
||||
|
||||
// At least 1 element was removed from the file set, it may now be
|
||||
// empty.
|
||||
if file_set.is_empty() {
|
||||
remove_ids.push(*id);
|
||||
}
|
||||
}
|
||||
|
||||
// Union whatever IDs remain with the file-less persisted set.
|
||||
if !set.is_empty() {
|
||||
debug!(n = set.len(), "retaining file-less IDs");
|
||||
self.persisted.add_set(&set);
|
||||
}
|
||||
|
||||
// And delete any newly empty files
|
||||
for id in remove_ids {
|
||||
let file_set = self
|
||||
.wal_files
|
||||
.remove(&id)
|
||||
.expect("id was obtained during iter");
|
||||
|
||||
// Invariant: the file being removed always has no references.
|
||||
assert!(file_set.is_empty());
|
||||
|
||||
delete_file(&self.wal, id).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove the intersection of `a` and `b`, from both `a` and `b`, and return
|
||||
/// the cardinality of the intersection.
|
||||
fn clear_intersection(a: &mut SequenceNumberSet, b: &mut SequenceNumberSet) -> usize {
|
||||
let intersection = sequence_number_set::intersect(a, b);
|
||||
|
||||
a.remove_set(&intersection);
|
||||
b.remove_set(&intersection);
|
||||
|
||||
intersection.len() as _
|
||||
}
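// Editorial sketch, not part of this change, showing the `clear_intersection`
// contract using only operations already used in this file (plus the assumed
// `SequenceNumber::new` constructor).
fn example_clear_intersection() {
    let mut persisted = SequenceNumberSet::default();
    persisted.add(SequenceNumber::new(1));
    persisted.add(SequenceNumber::new(2));

    let mut file_set = SequenceNumberSet::default();
    file_set.add(SequenceNumber::new(2));
    file_set.add(SequenceNumber::new(3));

    // ID 2 is the only overlap: it is removed from both sets and counted once.
    let n = clear_intersection(&mut persisted, &mut file_set);
    assert_eq!(n, 1);
    assert_eq!(persisted.len(), 1); // only ID 1 remains
    assert_eq!(file_set.len(), 1); // only ID 3 remains
}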
|
||||
|
||||
/// Delete the specified WAL segment from `wal`, and log it at info.
|
||||
async fn delete_file<T>(wal: &T, id: SegmentId)
|
||||
where
|
||||
T: WalFileDeleter,
|
||||
{
|
||||
info!(
|
||||
%id,
|
||||
"deleted fully-persisted wal segment"
|
||||
);
|
||||
|
||||
wal.delete_file(id).await
|
||||
}
|
||||
|
||||
// Tests in actor.rs
|
|
@ -1,38 +1,15 @@
//! A WAL file reference tracker, responsible for deleting files that contain
//! entirely persisted data.

use std::{fmt::Debug, sync::Arc};

use async_trait::async_trait;
use data_types::{
    sequence_number_set::{self, SequenceNumberSet},
    SequenceNumber,
};
use hashbrown::HashMap;
use observability_deps::tracing::{debug, info, warn};
use tokio::{
    select,
    sync::mpsc::{self, error::TrySendError},
};
use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber};
use observability_deps::tracing::warn;
use tokio::sync::mpsc::{self, error::TrySendError};
use wal::SegmentId;

use crate::persist::completion_observer::CompletedPersist;
use crate::{
    persist::completion_observer::CompletedPersist, wal::reference_tracker::WalFileDeleter,
};

/// An abstraction defining the ability of an implementer to delete WAL segment
/// files by ID.
#[async_trait]
pub(crate) trait WalFileDeleter: Debug + Send + Sync + 'static {
    /// Delete the WAL segment with the specified [`SegmentId`], or panic if
    /// deletion fails.
    async fn delete_file(&self, id: SegmentId);
}

#[async_trait]
impl WalFileDeleter for Arc<wal::Wal> {
    async fn delete_file(&self, id: SegmentId) {
        self.delete(id).await.expect("failed to drop wal segment");
    }
}
use super::WalReferenceActor;

/// A WAL file reference-count tracker handle.
///

@ -112,7 +89,7 @@ impl WalReferenceHandle {
    /// The returned [`WalReferenceActor`] SHOULD be run via
    /// [`WalReferenceActor::run()`] before the handle is used to avoid
    /// potential deadlocks.
    pub(crate) fn new<T>(wal: T) -> (Self, WalReferenceActor<T>)
    pub(crate) fn new<T>(wal: T, metrics: &metric::Registry) -> (Self, WalReferenceActor<T>)
    where
        T: WalFileDeleter,
    {

@ -120,14 +97,7 @@ impl WalReferenceHandle {
        let (persist_tx, persist_rx) = mpsc::channel(50);
        let (unbuffered_tx, unbuffered_rx) = mpsc::channel(50);

        let actor = WalReferenceActor {
            wal,
            persisted: SequenceNumberSet::default(),
            wal_files: HashMap::with_capacity(3),
            file_rx,
            persist_rx,
            unbuffered_rx,
        };
        let actor = WalReferenceActor::new(wal, file_rx, persist_rx, unbuffered_rx, metrics);

        (
            Self {
@ -179,236 +149,16 @@ impl WalReferenceHandle {
    }
}

/// A WAL file reference-count tracker.
///
/// See [`WalReferenceHandle`].
#[derive(Debug)]
pub(crate) struct WalReferenceActor<T = Arc<wal::Wal>> {
    wal: T,

    /// The set of IDs of persisted data that do not yet appear in
    /// `wal_files`, the set of WAL files rotated out of active use. This is
    /// an intermediate buffer necessary to tolerate out-of-order persist
    /// notifications w.r.t file notifications.
    ///
    /// IDs that appear in this set are most likely part of the active WAL
    /// segment file and should be reconciled when it rotates.
    persisted: SequenceNumberSet,

    /// The set of closed WAL segment files, and the set of unpersisted
    /// [`SequenceNumber`] they contain.
    ///
    /// These [`SequenceNumberSet`] are slowly drained / have IDs removed in
    /// response to persisted data notifications. Once the set is of length 0,
    /// the file can be deleted as all the entries the file contains has been
    /// persisted.
    ///
    /// Invariant: sets in this map are always non-empty.
    wal_files: HashMap<wal::SegmentId, SequenceNumberSet>,

    file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>,
    persist_rx: mpsc::Receiver<Arc<CompletedPersist>>,
    unbuffered_rx: mpsc::Receiver<SequenceNumber>,
}

impl<T> WalReferenceActor<T>
where
    T: WalFileDeleter,
{
    /// Execute the actor task.
    ///
    /// This task exits once the sender side of the input channels have been
    /// dropped.
    pub(crate) async fn run(mut self) {
        loop {
            select! {
                // Prefer polling the channels in the specified order.
                //
                // By consuming file_rx first, there's a greater chance that
                // subsequent persist/ignore events can be applied directly to
                // the file sets, rather than having to wait in the intermediate
                // "persisted" set, reducing memory utilisation.
                biased;

                Some((id, f)) = self.file_rx.recv() => self.handle_new_file(id, f).await,
                Some(p) = self.persist_rx.recv() => self.handle_persisted(p).await,
                Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await,
                else => break
            }
        }

        debug!("stopping wal reference counter task");
    }

    /// Track a newly rotated WAL segment, with the given [`SegmentId`] and
    /// containing the operations specified in [`SequenceNumberSet`].
    ///
    /// This method tolerates an empty `set`.
    async fn handle_new_file(&mut self, segment_id: SegmentId, mut set: SequenceNumberSet) {
        debug!(
            %segment_id,
            sequence_number_set = ?set,
            "notified of new segment file"
        );

        // Clear the overlap between the "persisted" set, and this new file from
        // both.
        let n = clear_intersection(&mut self.persisted, &mut set);
        if n > 0 {
            debug!(n, "released previously persisted IDs");
        }

        // If the file set is now completely empty, it can be immediately
        // deleted.
        if set.is_empty() {
            debug!(n, "immediately dropping empty segment file");
            return delete_file(&self.wal, segment_id).await;
        }

        // Otherwise, retain this file for later persist notifications.
        //
        // Run-optimise the bitmap to minimise memory utilisation of this set.
        // This is a relatively fast operation, and the file sets are expected
        // to be highly suitable for RLE compression due to the monotonic
        // sequence number assignments.
        set.run_optimise();

        // Insert the file set into the files being tracked
        assert!(!set.is_empty()); // Invariant: sets in file map are never empty
        assert!(
            self.wal_files.insert(segment_id, set).is_none(),
            "duplicate segment ID"
        );
    }

    /// Process a persistence completion notification, decreasing the reference
    /// counts against tracked WAL files, and holding any remaining IDs (in the
    /// untracked active WAL segment) in a temporary "persisted" buffer.
    async fn handle_persisted(&mut self, note: Arc<CompletedPersist>) {
        debug!(
            namespace_id = %note.namespace_id(),
            table_id = %note.table_id(),
            partition_id = %note.partition_id(),
            sequence_number_set = ?note.sequence_numbers(),
            "notified of persisted data"
        );

        self.remove(note.owned_sequence_numbers()).await;
    }

    /// Handle a write that has been added to the WAL, but that did not complete
    /// / buffer.
    ///
    /// Because the write was added to the WAL, its ID will be part of the WAL
    /// file's [`SequenceNumberSet`], but because the write was not buffered, it
    /// will never be persisted and therefore the WAL set will always have an
    /// outstanding reference unless it is accounted for here.
    async fn handle_unbuffered(&mut self, id: SequenceNumber) {
        debug!(sequence_number = id.get(), "notified of unbuffered write");

        // Delegate to the same code as persisted by presenting this ID as a set
        // - the same behaviour is required.
        let mut set = SequenceNumberSet::with_capacity(1);
        set.add(id);

        self.remove(set).await;
    }

    /// Remove the intersection of `set` from all the sets in `self` (file sets,
    /// and the untracked / "persisted" buffer set).
    ///
    /// Deletes all WAL files that are no longer referenced / have unpersisted
    /// entries.
    async fn remove(&mut self, mut set: SequenceNumberSet) {
        // First remove this set from the "persisted" / file-less set.
        let n = clear_intersection(&mut set, &mut self.persisted);
        if n > 0 {
            debug!(n, "released previously persisted IDs");
        }

        if set.is_empty() {
            debug!(n, "fully matched previously persisted IDs");
            return;
        }

        // And then walk the WAL file sets.
        let mut remove_ids = Vec::with_capacity(0);
        for (id, file_set) in self.wal_files.iter_mut() {
            // Invariant: files in the file set always have at least 1 reference
            assert!(!file_set.is_empty());

            // Early exit the loop if possible.
            if set.is_empty() {
                break;
            }

            // Clear the intersection of both sets.
            let n = clear_intersection(&mut set, file_set);
            if n == 0 {
                continue;
            }

            debug!(n, segment_id=%id, "matched file IDs");

            // At least 1 element was removed from the file set, it may now be
            // empty.
            if file_set.is_empty() {
                remove_ids.push(*id);
            }
        }

        // Union whatever IDs remain with the file-less persisted set.
        if !set.is_empty() {
            debug!(n = set.len(), "retaining file-less IDs");
            self.persisted.add_set(&set);
        }

        // And delete any newly empty files
        for id in remove_ids {
            let file_set = self
                .wal_files
                .remove(&id)
                .expect("id was obtained during iter");

            // Invariant: the file being removed always has no references.
            assert!(file_set.is_empty());

            delete_file(&self.wal, id).await
        }
    }
}

/// Remove the intersection of `a` and `b`, from both `a` and `b`, and return
/// the cardinality of the intersection.
fn clear_intersection(a: &mut SequenceNumberSet, b: &mut SequenceNumberSet) -> usize {
    let intersection = sequence_number_set::intersect(a, b);

    a.remove_set(&intersection);
    b.remove_set(&intersection);

    intersection.len() as _
}

/// Delete the specified WAL segment from `wal`, and log it at info.
async fn delete_file<T>(wal: &T, id: SegmentId)
where
    T: WalFileDeleter,
{
    info!(
        %id,
        "deleted fully-persisted wal segment"
    );

    wal.delete_file(id).await
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use std::time::Duration;

    use assert_matches::assert_matches;
    use async_trait::async_trait;
    use data_types::{NamespaceId, PartitionId, TableId};
    use futures::Future;
    use metric::{assert_counter, U64Gauge};
    use parking_lot::Mutex;
    use test_helpers::timeout::FutureTimeout;
    use tokio::sync::Notify;
@ -477,8 +227,9 @@ mod tests {
    async fn test_rotate_persist_delete() {
        const SEGMENT_ID: SegmentId = SegmentId::new(42);

        let metrics = metric::Registry::default();
        let wal = Arc::new(MockWalDeleter::default());
        let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
        let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);

        let actor_task = tokio::spawn(actor.run());

@ -511,6 +262,20 @@ mod tests {
        // Validate the correct ID was deleted
        assert_matches!(wal.calls().as_slice(), &[v] if v == SEGMENT_ID);

        assert_counter!(
            metrics,
            U64Gauge,
            "ingester_wal_inactive_file_count",
            value = 0,
        );

        assert_counter!(
            metrics,
            U64Gauge,
            "ingester_wal_inactive_file_op_reference_count",
            value = 0,
        );

        // Assert clean shutdown behaviour.
        drop(handle);
        actor_task

@ -535,8 +300,9 @@ mod tests {
        const SEGMENT_ID_1: SegmentId = SegmentId::new(42);
        const SEGMENT_ID_2: SegmentId = SegmentId::new(24);

        let metrics = metric::Registry::default();
        let wal = Arc::new(MockWalDeleter::default());
        let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
        let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);

        let actor_task = tokio::spawn(actor.run());

@ -598,8 +364,9 @@ mod tests {
    async fn test_empty_file_set() {
        const SEGMENT_ID: SegmentId = SegmentId::new(42);

        let metrics = metric::Registry::default();
        let wal = Arc::new(MockWalDeleter::default());
        let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
        let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);

        let actor_task = tokio::spawn(actor.run());

@ -625,8 +392,9 @@ mod tests {
    #[tokio::test]
    #[should_panic(expected = "duplicate segment ID")]
    async fn test_duplicate_segment_ids() {
        let metrics = metric::Registry::default();
        let wal = Arc::new(MockWalDeleter::default());
        let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
        let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);

        // Enqueuing a notification before the actor is running should succeed
        // because of the channel buffer capacity.

@ -643,4 +411,76 @@ mod tests {
        // This should panic after processing the second file.
        actor.run().with_timeout_panic(Duration::from_secs(5)).await;
    }

    /// Enqueue two segment files, enqueue persist notifications for the second
    /// file and wait for it to be deleted to synchronise the state (so it's not
    /// a racy test).
    ///
    /// Then assert the metric values for the known state.
    #[tokio::test]
    async fn test_metrics() {
        const SEGMENT_ID_1: SegmentId = SegmentId::new(42);
        const SEGMENT_ID_2: SegmentId = SegmentId::new(24);

        let metrics = metric::Registry::default();
        let wal = Arc::new(MockWalDeleter::default());
        let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);

        let actor_task = tokio::spawn(actor.run());

        // Add a file with 5 references
        handle
            .enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3, 4, 5]))
            .await;

        // Reduce the reference count for file 1 (leaving 3 references)
        handle.enqueue_persist_notification(new_note([1, 2])).await;

        // Enqueue the second file.
        handle
            .enqueue_rotated_file(SEGMENT_ID_2, new_set([6]))
            .await;

        // Release the references to file 2
        let waker = wal.waker();
        handle.enqueue_persist_notification(new_note([6])).await;

        waker.await;

        //
        // At this point, the actor has deleted the second file, which means it
        // has already processed the first enqueued file, and the first persist
        // notification that relates to the first file.
        //
        // A non-racy assert can now be made against this known state.
        //

        assert_counter!(
            metrics,
            U64Gauge,
            "ingester_wal_inactive_file_count",
            value = 1,
        );

        assert_counter!(
            metrics,
            U64Gauge,
            "ingester_wal_inactive_file_op_reference_count",
            value = 3, // 5 initial, reduced by 2 via persist notification
        );

        assert_counter!(
            metrics,
            U64Gauge,
            "ingester_wal_inactive_min_id",
            value = SEGMENT_ID_1.get(),
        );

        // Assert clean shutdown behaviour.
        drop(handle);
        actor_task
            .with_timeout_panic(Duration::from_secs(5))
            .await
            .expect("actor task should stop cleanly")
    }
}
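// Editor's sketch (not part of the diff): the intended lifecycle of the handle /
// actor pair, distilled from the tests above. `MockWalDeleter`, `new_set` and
// `new_note` are the test helpers referenced above and are assumed here; a real
// ingester would pass an `Arc<wal::Wal>` instead of the mock.
async fn wal_reference_tracker_lifecycle_sketch() {
    let metrics = metric::Registry::default();
    let wal = Arc::new(MockWalDeleter::default());

    // Construct the handle and its actor, then drive the actor on its own task.
    let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);
    let actor_task = tokio::spawn(actor.run());

    // Rotated WAL files and persist completions are reported through the handle;
    // once every sequence number in a file has been persisted, the actor deletes it.
    handle
        .enqueue_rotated_file(SegmentId::new(1), new_set([1, 2]))
        .await;
    handle.enqueue_persist_notification(new_note([1, 2])).await;

    // Dropping the handle closes the channels, which stops the actor cleanly.
    drop(handle);
    actor_task.await.expect("actor task should stop cleanly");
}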
@ -0,0 +1,8 @@
mod actor;
mod handle;
mod wal_deleter;

pub(crate) use actor::*;
#[allow(unused_imports)] // Used by docs - will be used by code soon.
pub(crate) use handle::*;
pub(crate) use wal_deleter::*;
@ -0,0 +1,20 @@
use std::{fmt::Debug, sync::Arc};

use async_trait::async_trait;
use wal::SegmentId;

/// An abstraction defining the ability of an implementer to delete WAL segment
/// files by ID.
#[async_trait]
pub(crate) trait WalFileDeleter: Debug + Send + Sync + 'static {
    /// Delete the WAL segment with the specified [`SegmentId`], or panic if
    /// deletion fails.
    async fn delete_file(&self, id: SegmentId);
}

#[async_trait]
impl WalFileDeleter for Arc<wal::Wal> {
    async fn delete_file(&self, id: SegmentId) {
        self.delete(id).await.expect("failed to drop wal segment");
    }
}
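// Editor's sketch (not part of the diff): a minimal test double for the trait
// above, assuming `parking_lot::Mutex` (already a test dependency in this PR).
// The real `MockWalDeleter` used by the actor tests likely also exposes a waker;
// this version only records the deleted segment IDs.
#[derive(Debug, Default)]
struct RecordingWalDeleter {
    calls: parking_lot::Mutex<Vec<SegmentId>>,
}

#[async_trait]
impl WalFileDeleter for RecordingWalDeleter {
    async fn delete_file(&self, id: SegmentId) {
        // Record the ID instead of touching any real WAL file.
        self.calls.lock().push(id);
    }
}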
@ -329,6 +329,15 @@ impl IOxSessionContext {
|
|||
pub async fn prepare_sql(&self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
let logical_plan = self.plan_sql(sql).await?;
|
||||
|
||||
let ctx = self.child_ctx("prepare_sql");
|
||||
ctx.create_physical_plan(&logical_plan).await
|
||||
}
|
||||
|
||||
/// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution
|
||||
pub async fn create_physical_plan(
|
||||
&self,
|
||||
logical_plan: &LogicalPlan,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
// Make nicer errors for unsupported SQL
|
||||
// (By default datafusion returns Internal Error)
|
||||
match &logical_plan {
|
||||
|
@ -353,15 +362,9 @@ impl IOxSessionContext {
|
|||
_ => (),
|
||||
}
|
||||
|
||||
let ctx = self.child_ctx("prepare_sql");
|
||||
ctx.create_physical_plan(&logical_plan).await
|
||||
}
|
||||
|
||||
/// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution
|
||||
pub async fn create_physical_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
let mut ctx = self.child_ctx("create_physical_plan");
|
||||
debug!(text=%plan.display_indent_schema(), "create_physical_plan: initial plan");
|
||||
let physical_plan = ctx.inner.state().create_physical_plan(plan).await?;
|
||||
debug!(text=%logical_plan.display_indent_schema(), "create_physical_plan: initial plan");
|
||||
let physical_plan = ctx.inner.state().create_physical_plan(logical_plan).await?;
|
||||
|
||||
ctx.recorder.event("physical plan");
|
||||
debug!(text=%displayable(physical_plan.as_ref()).indent(), "create_physical_plan: plan to run");
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use datafusion::{
|
||||
catalog::TableReference,
|
||||
datasource::provider_as_source,
|
||||
logical_expr::{expr_rewriter::ExprRewritable, LogicalPlanBuilder},
|
||||
};
|
||||
|
@ -183,8 +184,10 @@ impl<'a> ScanPlanBuilder<'a> {
|
|||
// later if possible)
|
||||
let projection = None;
|
||||
|
||||
let mut plan_builder = LogicalPlanBuilder::scan(table_name.as_ref(), source, projection)
|
||||
.context(BuildingPlanSnafu)?;
|
||||
// Do not parse the table name as a SQL identifier, but use it as-is
|
||||
let table_ref = TableReference::bare(table_name.to_string());
|
||||
let mut plan_builder =
|
||||
LogicalPlanBuilder::scan(table_ref, source, projection).context(BuildingPlanSnafu)?;
|
||||
|
||||
// Use a filter node to add general predicates + timestamp
|
||||
// range, if any
|
||||
|
|
|
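// Editor's sketch (not part of the diff): the effect of the `TableReference::bare`
// change above, assuming `TableReference` here is DataFusion's catalog table
// reference. A bare reference keeps the name verbatim, whereas parsing it as SQL
// would split on `.` and normalise unquoted identifiers.
fn bare_table_ref_sketch(table_name: &str) {
    use datafusion::catalog::TableReference;

    // e.g. a measurement literally named "cpu.load" stays a single bare name
    // instead of being interpreted as schema "cpu", table "load".
    let _table_ref = TableReference::bare(table_name.to_string());
}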
@ -1,10 +1,14 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use datafusion::physical_plan::{
|
||||
empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan,
|
||||
ExecutionPlan, ExecutionPlanVisitor,
|
||||
use arrow::datatypes::SchemaRef;
|
||||
use datafusion::{
|
||||
error::DataFusionError,
|
||||
physical_plan::{
|
||||
empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan,
|
||||
ExecutionPlan, ExecutionPlanVisitor,
|
||||
},
|
||||
};
|
||||
use schema::Schema;
|
||||
use observability_deps::tracing::debug;
|
||||
|
||||
use crate::{
|
||||
provider::{PartitionedFileExt, RecordBatchesExec},
|
||||
|
@ -20,29 +24,36 @@ use crate::{
|
|||
/// additional nodes (like de-duplication, filtering, projection) then NO data will be returned.
|
||||
///
|
||||
/// [`chunks_to_physical_nodes`]: crate::provider::chunks_to_physical_nodes
|
||||
pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec<Arc<dyn QueryChunk>>)> {
|
||||
pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(SchemaRef, Vec<Arc<dyn QueryChunk>>)> {
|
||||
let mut visitor = ExtractChunksVisitor::default();
|
||||
visit_execution_plan(plan, &mut visitor).ok()?;
|
||||
if let Err(e) = visit_execution_plan(plan, &mut visitor) {
|
||||
debug!(
|
||||
%e,
|
||||
"cannot extract chunks",
|
||||
);
|
||||
return None;
|
||||
}
|
||||
visitor.schema.map(|schema| (schema, visitor.chunks))
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct ExtractChunksVisitor {
|
||||
chunks: Vec<Arc<dyn QueryChunk>>,
|
||||
schema: Option<Schema>,
|
||||
schema: Option<SchemaRef>,
|
||||
}
|
||||
|
||||
impl ExtractChunksVisitor {
|
||||
fn add_chunk(&mut self, chunk: Arc<dyn QueryChunk>) -> Result<(), ()> {
|
||||
fn add_chunk(&mut self, chunk: Arc<dyn QueryChunk>) {
|
||||
self.chunks.push(chunk);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), ()> {
|
||||
let schema = Schema::try_from(exec.schema()).map_err(|_| ())?;
|
||||
fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), DataFusionError> {
|
||||
let schema = exec.schema();
|
||||
if let Some(existing) = &self.schema {
|
||||
if existing != &schema {
|
||||
return Err(());
|
||||
return Err(DataFusionError::External(
|
||||
String::from("Different schema").into(),
|
||||
));
|
||||
}
|
||||
} else {
|
||||
self.schema = Some(schema);
|
||||
|
@ -52,19 +63,27 @@ impl ExtractChunksVisitor {
|
|||
}
|
||||
|
||||
impl ExecutionPlanVisitor for ExtractChunksVisitor {
|
||||
type Error = ();
|
||||
type Error = DataFusionError;
|
||||
|
||||
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
|
||||
let plan_any = plan.as_any();
|
||||
|
||||
if let Some(record_batches_exec) = plan_any.downcast_ref::<RecordBatchesExec>() {
|
||||
self.add_schema_from_exec(record_batches_exec)?;
|
||||
self.add_schema_from_exec(record_batches_exec)
|
||||
.map_err(|e| {
|
||||
DataFusionError::Context(
|
||||
"add schema from RecordBatchesExec".to_owned(),
|
||||
Box::new(e),
|
||||
)
|
||||
})?;
|
||||
|
||||
for chunk in record_batches_exec.chunks() {
|
||||
self.add_chunk(Arc::clone(chunk))?;
|
||||
self.add_chunk(Arc::clone(chunk));
|
||||
}
|
||||
} else if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>() {
|
||||
self.add_schema_from_exec(parquet_exec)?;
|
||||
self.add_schema_from_exec(parquet_exec).map_err(|e| {
|
||||
DataFusionError::Context("add schema from ParquetExec".to_owned(), Box::new(e))
|
||||
})?;
|
||||
|
||||
for group in &parquet_exec.base_config().file_groups {
|
||||
for file in group {
|
||||
|
@ -72,22 +91,32 @@ impl ExecutionPlanVisitor for ExtractChunksVisitor {
|
|||
.extensions
|
||||
.as_ref()
|
||||
.and_then(|any| any.downcast_ref::<PartitionedFileExt>())
|
||||
.ok_or(())?;
|
||||
self.add_chunk(Arc::clone(&ext.0))?;
|
||||
.ok_or_else(|| {
|
||||
DataFusionError::External(
|
||||
String::from("PartitionedFileExt not found").into(),
|
||||
)
|
||||
})?;
|
||||
self.add_chunk(Arc::clone(&ext.0));
|
||||
}
|
||||
}
|
||||
} else if let Some(empty_exec) = plan_any.downcast_ref::<EmptyExec>() {
|
||||
// should not produce dummy data
|
||||
if empty_exec.produce_one_row() {
|
||||
return Err(());
|
||||
return Err(DataFusionError::External(
|
||||
String::from("EmptyExec produces row").into(),
|
||||
));
|
||||
}
|
||||
|
||||
self.add_schema_from_exec(empty_exec)?;
|
||||
self.add_schema_from_exec(empty_exec).map_err(|e| {
|
||||
DataFusionError::Context("add schema from EmptyExec".to_owned(), Box::new(e))
|
||||
})?;
|
||||
} else if plan_any.downcast_ref::<UnionExec>().is_some() {
|
||||
// continue visiting
|
||||
} else {
|
||||
// unsupported node
|
||||
return Err(());
|
||||
return Err(DataFusionError::External(
|
||||
String::from("Unsupported node").into(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
|
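// Editor's sketch (not part of the diff): the error-wrapping pattern used by the
// visitor above, shown in isolation. `DataFusionError::External` accepts any boxed
// error (a `String` converts via `Into`), and `DataFusionError::Context` layers a
// description on top of an inner error; the function name is illustrative only.
fn reject_unsupported_node(node_name: &str) -> Result<(), DataFusionError> {
    let inner = DataFusionError::External(String::from("Unsupported node").into());
    // Attach which node was being inspected so the debug log above is actionable.
    Err(DataFusionError::Context(
        format!("extract chunks from {node_name}"),
        Box::new(inner),
    ))
}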
@ -112,20 +141,20 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_roundtrip_empty() {
|
||||
let schema = chunk(1).schema().clone();
|
||||
let schema = chunk(1).schema().as_arrow();
|
||||
assert_roundtrip(schema, vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_roundtrip_single_record_batch() {
|
||||
let chunk1 = chunk(1);
|
||||
assert_roundtrip(chunk1.schema().clone(), vec![Arc::new(chunk1)]);
|
||||
assert_roundtrip(chunk1.schema().as_arrow(), vec![Arc::new(chunk1)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_roundtrip_single_parquet() {
|
||||
let chunk1 = chunk(1).with_dummy_parquet_file();
|
||||
assert_roundtrip(chunk1.schema().clone(), vec![Arc::new(chunk1)]);
|
||||
assert_roundtrip(chunk1.schema().as_arrow(), vec![Arc::new(chunk1)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -136,7 +165,7 @@ mod tests {
|
|||
let chunk4 = chunk(4);
|
||||
let chunk5 = chunk(5);
|
||||
assert_roundtrip(
|
||||
chunk1.schema().clone(),
|
||||
chunk1.schema().as_arrow(),
|
||||
vec![
|
||||
Arc::new(chunk1),
|
||||
Arc::new(chunk2),
|
||||
|
@ -174,14 +203,16 @@ mod tests {
|
|||
DataType::Float64,
|
||||
true,
|
||||
)]));
|
||||
let plan = EmptyExec::new(false, schema);
|
||||
assert!(extract_chunks(&plan).is_none());
|
||||
let plan = EmptyExec::new(false, Arc::clone(&schema));
|
||||
let (schema2, chunks) = extract_chunks(&plan).unwrap();
|
||||
assert_eq!(schema, schema2);
|
||||
assert!(chunks.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stop_at_other_node_types() {
|
||||
let chunk1 = chunk(1);
|
||||
let schema = chunk1.schema().clone();
|
||||
let schema = chunk1.schema().as_arrow();
|
||||
let plan = chunks_to_physical_nodes(
|
||||
&schema,
|
||||
None,
|
||||
|
@ -206,7 +237,8 @@ mod tests {
|
|||
.unwrap()
|
||||
.merge(&schema_ext)
|
||||
.unwrap()
|
||||
.build();
|
||||
.build()
|
||||
.as_arrow();
|
||||
assert_roundtrip(schema, vec![Arc::new(chunk)]);
|
||||
}
|
||||
|
||||
|
@ -219,12 +251,13 @@ mod tests {
|
|||
.unwrap()
|
||||
.merge(&schema_ext)
|
||||
.unwrap()
|
||||
.build();
|
||||
.build()
|
||||
.as_arrow();
|
||||
assert_roundtrip(schema, vec![Arc::new(chunk)]);
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn assert_roundtrip(schema: Schema, chunks: Vec<Arc<dyn QueryChunk>>) {
|
||||
fn assert_roundtrip(schema: SchemaRef, chunks: Vec<Arc<dyn QueryChunk>>) {
|
||||
let plan = chunks_to_physical_nodes(&schema, None, chunks.clone(), Predicate::default(), 2);
|
||||
let (schema2, chunks2) = extract_chunks(plan.as_ref()).expect("data found");
|
||||
assert_eq!(schema, schema2);
|
||||
|
|
|
@ -4,7 +4,7 @@ use datafusion::{
|
|||
config::ConfigOptions,
|
||||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{rewrite::TreeNodeRewritable, ExecutionPlan},
|
||||
physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan},
|
||||
};
|
||||
use predicate::Predicate;
|
||||
|
||||
|
@ -34,9 +34,9 @@ impl PhysicalOptimizerRule for CombineChunks {
|
|||
config: &ConfigOptions,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
plan.transform_up(&|plan| {
|
||||
if let Some((iox_schema, chunks)) = extract_chunks(plan.as_ref()) {
|
||||
if let Some((schema, chunks)) = extract_chunks(plan.as_ref()) {
|
||||
return Ok(Some(chunks_to_physical_nodes(
|
||||
&iox_schema,
|
||||
&schema,
|
||||
None,
|
||||
chunks,
|
||||
Predicate::new(),
|
||||
|
@ -72,7 +72,7 @@ mod tests {
|
|||
let chunk3 = TestChunk::new("table").with_id(3);
|
||||
let chunk4 = TestChunk::new("table").with_id(4).with_dummy_parquet_file();
|
||||
let chunk5 = TestChunk::new("table").with_id(5).with_dummy_parquet_file();
|
||||
let schema = chunk1.schema().clone();
|
||||
let schema = chunk1.schema().as_arrow();
|
||||
let plan = Arc::new(UnionExec::new(vec![
|
||||
chunks_to_physical_nodes(
|
||||
&schema,
|
||||
|
|
|
@ -4,7 +4,7 @@ use datafusion::{
|
|||
config::ConfigOptions,
|
||||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{rewrite::TreeNodeRewritable, ExecutionPlan},
|
||||
physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan},
|
||||
};
|
||||
use predicate::Predicate;
|
||||
use schema::{sort::SortKeyBuilder, TIME_COLUMN_NAME};
|
||||
|
@ -44,7 +44,7 @@ impl PhysicalOptimizerRule for DedupNullColumns {
|
|||
return Ok(None);
|
||||
};
|
||||
|
||||
let pk_cols = schema.primary_key().into_iter().collect::<HashSet<_>>();
|
||||
let pk_cols = dedup_exec.sort_columns();
|
||||
|
||||
let mut used_pk_cols = HashSet::new();
|
||||
for chunk in &chunks {
|
||||
|
@ -72,7 +72,7 @@ impl PhysicalOptimizerRule for DedupNullColumns {
|
|||
config.execution.target_partitions,
|
||||
);
|
||||
|
||||
let sort_exprs = arrow_sort_key_exprs(&sort_key, schema.as_arrow().as_ref());
|
||||
let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema);
|
||||
return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ use datafusion::{
|
|||
config::ConfigOptions,
|
||||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{rewrite::TreeNodeRewritable, ExecutionPlan},
|
||||
physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan},
|
||||
};
|
||||
use indexmap::IndexSet;
|
||||
use predicate::Predicate;
|
||||
|
@ -132,7 +132,7 @@ impl PhysicalOptimizerRule for DedupSortOrder {
|
|||
config.execution.target_partitions,
|
||||
);
|
||||
|
||||
let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, schema.as_arrow().as_ref());
|
||||
let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &schema);
|
||||
return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ use datafusion::{
|
|||
config::ConfigOptions,
|
||||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{rewrite::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
|
||||
physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
|
||||
};
|
||||
use hashbrown::HashMap;
|
||||
use observability_deps::tracing::warn;
|
||||
|
|
|
@ -4,7 +4,7 @@ use datafusion::{
|
|||
config::ConfigOptions,
|
||||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{rewrite::TreeNodeRewritable, ExecutionPlan},
|
||||
physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan},
|
||||
};
|
||||
use predicate::Predicate;
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ pub fn dedup_plan(schema: Schema, chunks: Vec<TestChunk>) -> Arc<dyn ExecutionPl
|
|||
.into_iter()
|
||||
.map(|c| Arc::new(c) as _)
|
||||
.collect::<Vec<Arc<dyn QueryChunk>>>();
|
||||
let plan = chunks_to_physical_nodes(&schema, None, chunks, Predicate::new(), 2);
|
||||
let plan = chunks_to_physical_nodes(&schema.as_arrow(), None, chunks, Predicate::new(), 2);
|
||||
|
||||
let sort_key = schema::sort::SortKey::from_columns(schema.primary_key());
|
||||
let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema.as_arrow());
|
||||
|
|
|
@ -4,7 +4,7 @@ use datafusion::{
|
|||
config::ConfigOptions,
|
||||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{rewrite::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
|
||||
physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
|
||||
};
|
||||
use observability_deps::tracing::warn;
|
||||
use predicate::Predicate;
|
||||
|
|
|
@ -15,7 +15,7 @@ use datafusion::{
|
|||
expressions::{BinaryExpr, Column},
|
||||
file_format::ParquetExec,
|
||||
filter::FilterExec,
|
||||
rewrite::TreeNodeRewritable,
|
||||
tree_node::TreeNodeRewritable,
|
||||
union::UnionExec,
|
||||
ExecutionPlan, PhysicalExpr,
|
||||
},
|
||||
|
|
|
@ -18,8 +18,8 @@ use datafusion::{
|
|||
file_format::{FileScanConfig, ParquetExec},
|
||||
filter::FilterExec,
|
||||
projection::ProjectionExec,
|
||||
rewrite::TreeNodeRewritable,
|
||||
sorts::sort::SortExec,
|
||||
sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
|
||||
tree_node::TreeNodeRewritable,
|
||||
union::UnionExec,
|
||||
ExecutionPlan, PhysicalExpr,
|
||||
},
|
||||
|
@ -32,10 +32,11 @@ use crate::provider::{DeduplicateExec, RecordBatchesExec};
|
|||
pub struct ProjectionPushdown;
|
||||
|
||||
impl PhysicalOptimizerRule for ProjectionPushdown {
|
||||
#[allow(clippy::only_used_in_recursion)]
|
||||
fn optimize(
|
||||
&self,
|
||||
plan: Arc<dyn ExecutionPlan>,
|
||||
_config: &ConfigOptions,
|
||||
config: &ConfigOptions,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
plan.transform_down(&|plan| {
|
||||
let plan_any = plan.as_any();
|
||||
|
@ -149,6 +150,47 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
|
|||
},
|
||||
)?;
|
||||
|
||||
return Ok(Some(plan));
|
||||
} else if let Some(child_sort) = child_any.downcast_ref::<SortPreservingMergeExec>()
|
||||
{
|
||||
let sort_required_cols = child_sort
|
||||
.expr()
|
||||
.iter()
|
||||
.map(|expr| collect_columns(&expr.expr))
|
||||
.collect::<Vec<_>>();
|
||||
let sort_required_cols = sort_required_cols
|
||||
.iter()
|
||||
.flat_map(|cols| cols.iter())
|
||||
.map(|col| col.name())
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let plan = wrap_user_into_projections(
|
||||
&sort_required_cols,
|
||||
&column_names,
|
||||
Arc::clone(child_sort.input()),
|
||||
|plan| {
|
||||
Ok(Arc::new(SortPreservingMergeExec::new(
|
||||
reassign_sort_exprs_columns(child_sort.expr(), &plan.schema())?,
|
||||
plan,
|
||||
)))
|
||||
},
|
||||
)?;
|
||||
|
||||
return Ok(Some(plan));
|
||||
} else if let Some(child_proj) = child_any.downcast_ref::<ProjectionExec>() {
|
||||
let expr = column_indices
|
||||
.iter()
|
||||
.map(|idx| child_proj.expr()[*idx].clone())
|
||||
.collect();
|
||||
let plan = Arc::new(ProjectionExec::try_new(
|
||||
expr,
|
||||
Arc::clone(child_proj.input()),
|
||||
)?);
|
||||
|
||||
// need to call `optimize` directly on the plan, because otherwise we would continue with the child
|
||||
// and miss the optimization of that particular new ProjectionExec
|
||||
let plan = self.optimize(plan, config)?;
|
||||
|
||||
return Ok(Some(plan));
|
||||
} else if let Some(child_dedup) = child_any.downcast_ref::<DeduplicateExec>() {
|
||||
let dedup_required_cols = child_dedup.sort_columns();
|
||||
|
@ -793,6 +835,135 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
// since `SortPreservingMergeExec` and `FilterExec` both use `wrap_user_into_projections`, we only test one variant for `SortPreservingMergeExec`
|
||||
#[test]
|
||||
fn test_sortpreservingmerge_projection_split() {
|
||||
let schema = schema();
|
||||
let plan = Arc::new(
|
||||
ProjectionExec::try_new(
|
||||
vec![(expr_col("tag1", &schema), String::from("tag1"))],
|
||||
Arc::new(SortPreservingMergeExec::new(
|
||||
vec![PhysicalSortExpr {
|
||||
expr: expr_col("tag2", &schema),
|
||||
options: SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
}],
|
||||
Arc::new(TestExec::new(schema)),
|
||||
)),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
let opt = ProjectionPushdown::default();
|
||||
insta::assert_yaml_snapshot!(
|
||||
OptimizationTest::new(plan, opt),
|
||||
@r###"
|
||||
---
|
||||
input:
|
||||
- " ProjectionExec: expr=[tag1@0 as tag1]"
|
||||
- " SortPreservingMergeExec: [tag2@1 DESC]"
|
||||
- " Test"
|
||||
output:
|
||||
Ok:
|
||||
- " ProjectionExec: expr=[tag1@0 as tag1]"
|
||||
- " SortPreservingMergeExec: [tag2@1 DESC]"
|
||||
- " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]"
|
||||
- " Test"
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nested_proj_inner_is_impure() {
|
||||
let schema = schema();
|
||||
let plan = Arc::new(EmptyExec::new(false, schema));
|
||||
let plan = Arc::new(
|
||||
ProjectionExec::try_new(
|
||||
vec![
|
||||
(
|
||||
Arc::new(Literal::new(ScalarValue::from("foo"))),
|
||||
String::from("tag1"),
|
||||
),
|
||||
(
|
||||
Arc::new(Literal::new(ScalarValue::from("bar"))),
|
||||
String::from("tag2"),
|
||||
),
|
||||
],
|
||||
plan,
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
let plan = Arc::new(
|
||||
ProjectionExec::try_new(
|
||||
vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))],
|
||||
plan,
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
let opt = ProjectionPushdown::default();
|
||||
insta::assert_yaml_snapshot!(
|
||||
OptimizationTest::new(plan, opt),
|
||||
@r###"
|
||||
---
|
||||
input:
|
||||
- " ProjectionExec: expr=[tag1@0 as tag1]"
|
||||
- " ProjectionExec: expr=[foo as tag1, bar as tag2]"
|
||||
- " EmptyExec: produce_one_row=false"
|
||||
output:
|
||||
Ok:
|
||||
- " ProjectionExec: expr=[foo as tag1]"
|
||||
- " EmptyExec: produce_one_row=false"
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nested_proj_inner_is_pure() {
|
||||
let schema = schema();
|
||||
let plan = Arc::new(EmptyExec::new(false, schema));
|
||||
let plan = Arc::new(
|
||||
ProjectionExec::try_new(
|
||||
vec![
|
||||
(expr_col("tag1", &plan.schema()), String::from("tag1")),
|
||||
(expr_col("tag2", &plan.schema()), String::from("tag2")),
|
||||
],
|
||||
plan,
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
let plan = Arc::new(
|
||||
ProjectionExec::try_new(
|
||||
vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))],
|
||||
plan,
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
let opt = ProjectionPushdown::default();
|
||||
let test = OptimizationTest::new(plan, opt);
|
||||
insta::assert_yaml_snapshot!(
|
||||
test,
|
||||
@r###"
|
||||
---
|
||||
input:
|
||||
- " ProjectionExec: expr=[tag1@0 as tag1]"
|
||||
- " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]"
|
||||
- " EmptyExec: produce_one_row=false"
|
||||
output:
|
||||
Ok:
|
||||
- " EmptyExec: produce_one_row=false"
|
||||
"###
|
||||
);
|
||||
let empty_exec = test
|
||||
.output_plan()
|
||||
.unwrap()
|
||||
.as_any()
|
||||
.downcast_ref::<EmptyExec>()
|
||||
.unwrap();
|
||||
let expected_schema = Schema::new(vec![Field::new("tag1", DataType::Utf8, true)]);
|
||||
assert_eq!(empty_exec.schema().as_ref(), &expected_schema);
|
||||
}
|
||||
|
||||
// since `DeduplicateExec` and `FilterExec` both use `wrap_user_into_projections`, we only test a few variants for `DeduplicateExec`
|
||||
#[test]
|
||||
fn test_dedup_projection_split1() {
|
||||
|
|
|
@ -4,7 +4,7 @@ use datafusion::{
|
|||
config::ConfigOptions,
|
||||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{rewrite::TreeNodeRewritable, sorts::sort::SortExec, ExecutionPlan},
|
||||
physical_plan::{sorts::sort::SortExec, tree_node::TreeNodeRewritable, ExecutionPlan},
|
||||
};
|
||||
|
||||
/// Removes [`SortExec`] if it is no longer needed.
|
||||
|
|
|
@ -5,7 +5,7 @@ use datafusion::{
|
|||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{
|
||||
rewrite::TreeNodeRewritable, sorts::sort::SortExec, union::UnionExec, ExecutionPlan,
|
||||
sorts::sort::SortExec, tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan,
|
||||
},
|
||||
};
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ use datafusion::{
|
|||
config::ConfigOptions,
|
||||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{rewrite::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
|
||||
physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
|
||||
};
|
||||
|
||||
/// Optimizer that replaces nested [`UnionExec`]s with a single level.
|
||||
|
|
|
@ -4,7 +4,7 @@ use datafusion::{
|
|||
config::ConfigOptions,
|
||||
error::Result,
|
||||
physical_optimizer::PhysicalOptimizerRule,
|
||||
physical_plan::{rewrite::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
|
||||
physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
|
||||
};
|
||||
|
||||
/// Optimizer that replaces a [`UnionExec`] that has a single child node with that child node itself.
|
||||
|
|
|
@ -1089,7 +1089,7 @@ impl Deduplicater {
|
|||
|
||||
// Create the bottom node RecordBatchesExec for this chunk
|
||||
let mut input = chunks_to_physical_nodes(
|
||||
&input_schema,
|
||||
&input_schema.as_arrow(),
|
||||
output_sort_key,
|
||||
vec![Arc::clone(&chunk)],
|
||||
predicate,
|
||||
|
@ -1267,7 +1267,7 @@ impl Deduplicater {
|
|||
debug!("Build one scan RecordBatchesExec for all non duplicated chunks even if empty");
|
||||
|
||||
plans.push(chunks_to_physical_nodes(
|
||||
output_schema,
|
||||
&output_schema.as_arrow(),
|
||||
output_sort_key,
|
||||
chunks.into_no_duplicates(deduplication),
|
||||
predicate,
|
||||
|
@ -1452,7 +1452,7 @@ mod test {
|
|||
|
||||
// IOx scan operator
|
||||
let input = chunks_to_physical_nodes(
|
||||
chunk.schema(),
|
||||
&chunk.schema().as_arrow(),
|
||||
None,
|
||||
vec![Arc::clone(&chunk)],
|
||||
Predicate::default(),
|
||||
|
@ -1540,7 +1540,7 @@ mod test {
|
|||
|
||||
// IOx scan operator
|
||||
let input = chunks_to_physical_nodes(
|
||||
chunk.schema(),
|
||||
&chunk.schema().as_arrow(),
|
||||
None,
|
||||
vec![Arc::clone(&chunk)],
|
||||
Predicate::default(),
|
||||
|
|
|
@ -4,6 +4,7 @@ use crate::{
|
|||
provider::record_batch_exec::RecordBatchesExec, util::arrow_sort_key_exprs, QueryChunk,
|
||||
QueryChunkData,
|
||||
};
|
||||
use arrow::datatypes::SchemaRef;
|
||||
use datafusion::{
|
||||
datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
|
||||
physical_expr::execution_props::ExecutionProps,
|
||||
|
@ -135,14 +136,14 @@ fn combine_sort_key(
|
|||
/// pushdown ([`RecordBatchesExec`] has NO builtin filter function). Delete predicates are NOT applied at all. The
|
||||
/// caller is responsible for wrapping the output node into appropriate filter nodes.
|
||||
pub fn chunks_to_physical_nodes(
|
||||
iox_schema: &Schema,
|
||||
schema: &SchemaRef,
|
||||
output_sort_key: Option<&SortKey>,
|
||||
chunks: Vec<Arc<dyn QueryChunk>>,
|
||||
predicate: Predicate,
|
||||
target_partitions: usize,
|
||||
) -> Arc<dyn ExecutionPlan> {
|
||||
if chunks.is_empty() {
|
||||
return Arc::new(EmptyExec::new(false, iox_schema.as_arrow()));
|
||||
return Arc::new(EmptyExec::new(false, Arc::clone(schema)));
|
||||
}
|
||||
|
||||
let mut record_batch_chunks: Vec<Arc<dyn QueryChunk>> = vec![];
|
||||
|
@ -177,7 +178,7 @@ pub fn chunks_to_physical_nodes(
|
|||
if !record_batch_chunks.is_empty() {
|
||||
output_nodes.push(Arc::new(RecordBatchesExec::new(
|
||||
record_batch_chunks,
|
||||
iox_schema.as_arrow(),
|
||||
Arc::clone(schema),
|
||||
)));
|
||||
}
|
||||
let mut parquet_chunks: Vec<_> = parquet_chunks.into_iter().collect();
|
||||
|
@ -202,14 +203,12 @@ pub fn chunks_to_physical_nodes(
|
|||
);
|
||||
|
||||
// Tell datafusion about the sort key, if any
|
||||
let file_schema = iox_schema.as_arrow();
|
||||
let output_ordering =
|
||||
sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, &file_schema));
|
||||
let output_ordering = sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, schema));
|
||||
|
||||
let props = ExecutionProps::new();
|
||||
let filter_expr = predicate.filter_expr()
|
||||
.and_then(|filter_expr| {
|
||||
match create_physical_expr_from_schema(&props, &filter_expr, &file_schema) {
|
||||
match create_physical_expr_from_schema(&props, &filter_expr, schema) {
|
||||
Ok(f) => Some(f),
|
||||
Err(e) => {
|
||||
warn!(%e, ?filter_expr, "Error creating physical filter expression, can not push down");
|
||||
|
@ -220,7 +219,7 @@ pub fn chunks_to_physical_nodes(
|
|||
|
||||
let base_config = FileScanConfig {
|
||||
object_store_url,
|
||||
file_schema,
|
||||
file_schema: Arc::clone(schema),
|
||||
file_groups,
|
||||
statistics: Statistics::default(),
|
||||
projection: None,
|
||||
|
@ -361,7 +360,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_chunks_to_physical_nodes_empty() {
|
||||
let schema = TestChunk::new("table").schema().clone();
|
||||
let schema = TestChunk::new("table").schema().as_arrow();
|
||||
let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), 2);
|
||||
insta::assert_yaml_snapshot!(
|
||||
format_execution_plan(&plan),
|
||||
|
@ -375,7 +374,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_chunks_to_physical_nodes_recordbatch() {
|
||||
let chunk = TestChunk::new("table");
|
||||
let schema = chunk.schema().clone();
|
||||
let schema = chunk.schema().as_arrow();
|
||||
let plan =
|
||||
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2);
|
||||
insta::assert_yaml_snapshot!(
|
||||
|
@ -391,7 +390,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_chunks_to_physical_nodes_parquet_one_file() {
|
||||
let chunk = TestChunk::new("table").with_dummy_parquet_file();
|
||||
let schema = chunk.schema().clone();
|
||||
let schema = chunk.schema().as_arrow();
|
||||
let plan =
|
||||
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2);
|
||||
insta::assert_yaml_snapshot!(
|
||||
|
@ -409,7 +408,7 @@ mod tests {
|
|||
let chunk1 = TestChunk::new("table").with_id(0).with_dummy_parquet_file();
|
||||
let chunk2 = TestChunk::new("table").with_id(1).with_dummy_parquet_file();
|
||||
let chunk3 = TestChunk::new("table").with_id(2).with_dummy_parquet_file();
|
||||
let schema = chunk1.schema().clone();
|
||||
let schema = chunk1.schema().as_arrow();
|
||||
let plan = chunks_to_physical_nodes(
|
||||
&schema,
|
||||
None,
|
||||
|
@ -435,7 +434,7 @@ mod tests {
|
|||
let chunk2 = TestChunk::new("table")
|
||||
.with_id(1)
|
||||
.with_dummy_parquet_file_and_store("iox2://");
|
||||
let schema = chunk1.schema().clone();
|
||||
let schema = chunk1.schema().as_arrow();
|
||||
let plan = chunks_to_physical_nodes(
|
||||
&schema,
|
||||
None,
|
||||
|
@ -458,7 +457,7 @@ mod tests {
|
|||
fn test_chunks_to_physical_nodes_mixed() {
|
||||
let chunk1 = TestChunk::new("table").with_dummy_parquet_file();
|
||||
let chunk2 = TestChunk::new("table");
|
||||
let schema = chunk1.schema().clone();
|
||||
let schema = chunk1.schema().as_arrow();
|
||||
let plan = chunks_to_physical_nodes(
|
||||
&schema,
|
||||
None,
|
||||
|
|
|
@ -4,6 +4,7 @@ use crate::plan::rewriter::rewrite_statement;
|
|||
use crate::plan::util::{binary_operator_to_df_operator, Schemas};
|
||||
use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_to_data_type};
|
||||
use arrow::datatypes::DataType;
|
||||
use datafusion::catalog::TableReference;
|
||||
use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, ToDFSchema};
|
||||
use datafusion::logical_expr::expr_rewriter::{normalize_col, ExprRewritable, ExprRewriter};
|
||||
use datafusion::logical_expr::logical_plan::builder::project;
|
||||
|
@ -725,8 +726,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
/// by the [`rewrite_statement`] function.
|
||||
fn create_table_ref(&self, table_name: String) -> Result<Option<LogicalPlan>> {
|
||||
Ok(if let Ok(source) = self.s.get_table_provider(&table_name) {
|
||||
let table_ref = TableReference::bare(table_name.to_string());
|
||||
Some(project(
|
||||
LogicalPlanBuilder::scan(&table_name, source, None)?.build()?,
|
||||
LogicalPlanBuilder::scan(table_ref, source, None)?.build()?,
|
||||
iter::once(lit_dict(&table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME)),
|
||||
)?)
|
||||
} else {
|
||||
|
|
|
@ -216,6 +216,7 @@ impl IoxGetRequest {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow_flight::sql::CommandStatementQuery;
|
||||
use assert_matches::assert_matches;
|
||||
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
|
||||
|
||||
|
@ -385,7 +386,9 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn round_trip_flightsql() {
|
||||
let cmd = FlightSQLCommand::CommandStatementQuery("select * from foo".into());
|
||||
let cmd = FlightSQLCommand::CommandStatementQuery(CommandStatementQuery {
|
||||
query: "select * from foo".into(),
|
||||
});
|
||||
|
||||
let request = IoxGetRequest {
|
||||
namespace_name: "foo_blarg".into(),
|
||||
|
|
|
@ -9,7 +9,7 @@ license.workspace = true
|
|||
arrow = { workspace = true, features = ["prettyprint"] }
|
||||
arrow-flight = { workspace = true }
|
||||
arrow_util = { path = "../arrow_util" }
|
||||
assert_cmd = "2.0.8"
|
||||
assert_cmd = "2.0.9"
|
||||
bytes = "1.4"
|
||||
data_types = { path = "../data_types" }
|
||||
dml = { path = "../dml" }
|
||||
|
|
|
@ -29,9 +29,9 @@ bytes = { version = "1" }
|
|||
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
|
||||
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "612eb1d0ce338af7980fa906df8796eb47c4be44" }
|
||||
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "612eb1d0ce338af7980fa906df8796eb47c4be44", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
|
||||
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "612eb1d0ce338af7980fa906df8796eb47c4be44", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
|
||||
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4afd67a0e496e1834ad6184629f28e60f66b2777" }
|
||||
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4afd67a0e496e1834ad6184629f28e60f66b2777", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
|
||||
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4afd67a0e496e1834ad6184629f28e60f66b2777", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
|
||||
digest = { version = "0.10", features = ["mac", "std"] }
|
||||
either = { version = "1" }
|
||||
fixedbitset = { version = "0.4" }
|
||||
|
@ -63,7 +63,7 @@ parking_lot = { version = "0.12", features = ["arc_lock"] }
|
|||
parquet = { version = "34", features = ["async", "experimental"] }
|
||||
petgraph = { version = "0.6" }
|
||||
phf_shared = { version = "0.11" }
|
||||
predicates = { version = "2" }
|
||||
predicates = { version = "3" }
|
||||
prost = { version = "0.11" }
|
||||
prost-types = { version = "0.11" }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
|
|