Merge branch 'main' into crepererum/querier_cleanup

pull/24376/head
kodiakhq[bot] 2022-03-01 19:51:38 +00:00 committed by GitHub
commit 14a3f33424
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 894 additions and 635 deletions

View File

@ -195,7 +195,7 @@ jobs:
- image: postgres
environment:
POSTGRES_HOST_AUTH_METHOD: trust
resource_class: xlarge # use of a smaller executor tends crashes on link
resource_class: 2xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
@ -247,7 +247,7 @@ jobs:
test_heappy:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: xlarge # use of a smaller executor tends crashes on link
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
@ -326,7 +326,7 @@ jobs:
build_dev:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: xlarge # use of a smaller executor tends crashes on link
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
@ -403,7 +403,7 @@ jobs:
# kinda small node)
machine:
image: ubuntu-2004:202111-01
resource_class: xlarge # use of a smaller executor tends crashes on link
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
@ -425,14 +425,12 @@ jobs:
COMMIT_SHA="$(git rev-parse --short HEAD)"
RUST_VERSION="$(sed -E -ne 's/channel = "(.*)"/\1/p' rust-toolchain.toml)"
BRANCH="$(echo "$CIRCLE_BRANCH" | tr '[:upper:]' '[:lower:]' | sed 's/[^a-z0-9]/_/g')"
docker buildx build \
--build-arg RUST_VERSION="$RUST_VERSION" \
--build-arg RUSTFLAGS="-C target-feature=+avx2 -C link-arg=-fuse-ld=lld" \
--progress plain \
--tag quay.io/influxdb/iox:"$COMMIT_SHA" \
--tag quay.io/influxdb/iox:"$BRANCH" \
.
docker buildx build \
--build-arg FEATURES="" \
@ -441,7 +439,6 @@ jobs:
--build-arg RUSTFLAGS="-C target-feature=+avx2 -C link-arg=-fuse-ld=lld" \
--progress plain \
--tag quay.io/influxdb/iox_data_generator:"$COMMIT_SHA" \
--tag quay.io/influxdb/iox_data_generator:"$BRANCH" \
.
docker buildx build \
--build-arg FEATURES="" \
@ -450,24 +447,52 @@ jobs:
--build-arg RUSTFLAGS="-C target-feature=+avx2 -C link-arg=-fuse-ld=lld" \
--progress plain \
--tag quay.io/influxdb/iox_gitops_adapter:"$COMMIT_SHA" \
--tag quay.io/influxdb/iox_gitops_adapter:"$BRANCH" \
.
docker run -it --rm quay.io/influxdb/iox:$COMMIT_SHA debug print-cpu
docker push --all-tags quay.io/influxdb/iox
docker push --all-tags quay.io/influxdb/iox_data_generator
docker push --all-tags quay.io/influxdb/iox_gitops_adapter
echo "export COMMIT_SHA=${COMMIT_SHA}" >> $BASH_ENV
docker push quay.io/influxdb/iox:"$COMMIT_SHA"
docker push quay.io/influxdb/iox_data_generator:"$COMMIT_SHA"
docker push quay.io/influxdb/iox_gitops_adapter:"$COMMIT_SHA"
# linking might take a while and doesn't produce CLI output
no_output_timeout: 20m
no_output_timeout: 30m
- cache_save
deploy_release:
docker:
- image: cimg/base:2021.04
steps:
- setup_remote_docker:
version: 19.03.13
docker_layer_caching: true
- checkout
- run: |
echo "$QUAY_INFLUXDB_IOX_PASS" | docker login quay.io --username $QUAY_INFLUXDB_IOX_USER --password-stdin
- run:
name: Update docker branch tags
command: |
COMMIT_SHA="$(git rev-parse --short HEAD)"
BRANCH="$(echo "$CIRCLE_BRANCH" | tr '[:upper:]' '[:lower:]' | sed 's/[^a-z0-9]/_/g')"
docker pull quay.io/influxdb/iox:"$COMMIT_SHA"
docker pull quay.io/influxdb/iox_data_generator:"$COMMIT_SHA"
docker pull quay.io/influxdb/iox_gitops_adapter:"$COMMIT_SHA"
docker tag quay.io/influxdb/iox:"$COMMIT_SHA" quay.io/influxdb/iox:"$BRANCH"
docker tag quay.io/influxdb/iox_data_generator:"$COMMIT_SHA" quay.io/influxdb/iox_data_generator:"$BRANCH"
docker tag quay.io/influxdb/iox_gitops_adapter:"$COMMIT_SHA" quay.io/influxdb/iox_gitops_adapter:"$BRANCH"
docker push quay.io/influxdb/iox:"$BRANCH"
docker push quay.io/influxdb/iox_data_generator:"$BRANCH"
docker push quay.io/influxdb/iox_gitops_adapter:"$BRANCH"
echo "export COMMIT_SHA=${COMMIT_SHA}" >> $BASH_ENV
- run:
name: Deploy tags
command: |
echo "$QUAY_PASS" | docker login quay.io --username $QUAY_USER --password-stdin
./.circleci/get-deploy-tags.sh "${COMMIT_SHA}"
- cache_save
# Prepare the CI image used for other tasks.
#
@ -495,7 +520,6 @@ jobs:
docker build -t quay.io/influxdb/rust:$COMMIT_SHA -t quay.io/influxdb/rust:ci -f docker/Dockerfile.ci --build-arg RUST_VERSION=$RUST_VERSION .
docker push --all-tags quay.io/influxdb/rust
parameters:
ci_image:
description: "Trigger build of CI image"
@ -513,7 +537,9 @@ workflows:
# CI for all pull requests.
ci:
when:
not: << pipeline.parameters.ci_image >>
and:
- not: << pipeline.parameters.ci_image >>
- not: << pipeline.parameters.build_perf >>
jobs:
- fmt
- lint
@ -530,7 +556,11 @@ workflows:
filters:
branches:
only: main
requires: # Only do a release build if all tests have passed
- deploy_release:
filters:
branches:
only: main
requires: # Only deploy if all tests have passed
- fmt
- lint
- cargo_audit
@ -540,6 +570,7 @@ workflows:
- test_heappy
- test_perf
- build_dev
- build_release
- doc
# Manual build of CI image
@ -564,6 +595,9 @@ workflows:
when: << pipeline.parameters.build_perf >>
jobs:
- build_release
- deploy_release:
requires:
- build_release
# Nightly rebuild of the build container
ci_image_nightly:

2
Cargo.lock generated
View File

@ -4240,7 +4240,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.2.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=3af1939fd47f8680d40074dc3fd2e2a4a0da6b8c#3af1939fd47f8680d40074dc3fd2e2a4a0da6b8c"
source = "git+https://github.com/influxdata/rskafka.git?rev=f7eef8560ac871e056887a62b7014582835cba78#f7eef8560ac871e056887a62b7014582835cba78"
dependencies = [
"async-trait",
"bytes",

View File

@ -93,7 +93,7 @@ exclude = [
# This profile optimizes for runtime performance and small binary size at the expense of longer
# build times. It's most suitable for final release builds.
[profile.release]
codegen-units = 1
codegen-units = 16
debug = true
lto = "thin"

View File

@ -14,10 +14,13 @@
use parking_lot::Mutex;
use pin_project::{pin_project, pinned_drop};
use std::{pin::Pin, sync::Arc};
use tokio::sync::oneshot::Receiver;
use tokio::sync::oneshot::{error::RecvError, Receiver};
use tokio_util::sync::CancellationToken;
use futures::Future;
use futures::{
future::{BoxFuture, Shared},
Future, FutureExt, TryFutureExt,
};
use observability_deps::tracing::warn;
@ -55,10 +58,21 @@ pub type Error = tokio::sync::oneshot::error::RecvError;
#[derive(Debug)]
pub struct Job<T> {
cancel: CancellationToken,
detached: bool,
#[pin]
rx: Receiver<T>,
}
impl<T> Job<T> {
/// Detach the job so that dropping it does not cancel the underlying task.
///
/// Consumes the job handle, so the task's result can no longer be awaited through it.
///
/// You must ensure that this task eventually finishes, otherwise [`DedicatedExecutor::join`] may never return!
pub fn detach(mut self) {
// cannot destructure `Self` because we implement `Drop`, so we use a flag instead to prevent cancelation.
// `self` is dropped when this function returns; the drop handler checks this flag before cancelling.
self.detached = true;
}
}
impl<T> Future for Job<T> {
type Output = Result<T, Error>;
@ -74,7 +88,9 @@ impl<T> Future for Job<T> {
#[pinned_drop]
impl<T> PinnedDrop for Job<T> {
fn drop(self: Pin<&mut Self>) {
self.cancel.cancel();
if !self.detached {
self.cancel.cancel();
}
}
}
@ -90,13 +106,37 @@ pub struct DedicatedExecutor {
struct State {
/// Channel for requests -- the dedicated executor takes requests
/// from here and runs them.
///
/// This is `None` if we triggered shutdown.
requests: Option<std::sync::mpsc::Sender<Task>>,
/// The thread that is doing the work
thread: Option<std::thread::JoinHandle<()>>,
/// Receiver side indicating that shutdown is complete.
completed_shutdown: Shared<BoxFuture<'static, Result<(), Arc<RecvError>>>>,
/// Task counter (uses Arc strong count).
task_refs: Arc<()>,
/// The inner thread that can be used to join during drop.
thread: Option<std::thread::JoinHandle<()>>,
}
// IMPORTANT: Implement `Drop` for `State`, NOT for `DedicatedExecutor`, because the executor can be cloned and clones
// share their inner state.
impl Drop for State {
/// Tears down the shared executor state: drops the request sender if `shutdown()` was never called,
/// warns if the worker has not signalled completion yet, then blocks until the worker thread exits.
fn drop(&mut self) {
// `requests` still being `Some` means shutdown was never triggered; dropping the sender here
// closes the channel.
if self.requests.is_some() {
warn!("DedicatedExecutor dropped without calling shutdown()");
self.requests = None;
}
// do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575
// `now_or_never()` polls exactly once; `None` means the shutdown signal has not arrived yet.
if !std::thread::panicking() && self.completed_shutdown.clone().now_or_never().is_none() {
warn!("DedicatedExecutor dropped without waiting for worker termination",);
}
// join thread but don't care about the results
// NOTE: this blocks the dropping thread until the worker thread has terminated.
self.thread.take().expect("not dropped yet").join().ok();
}
}
/// The default worker priority (value passed to `libc::setpriority`);
@ -129,7 +169,8 @@ impl DedicatedExecutor {
pub fn new(thread_name: &str, num_threads: usize) -> Self {
let thread_name = thread_name.to_string();
let (tx, rx) = std::sync::mpsc::channel::<Task>();
let (tx_tasks, rx_tasks) = std::sync::mpsc::channel::<Task>();
let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel();
let thread = std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
@ -146,7 +187,7 @@ impl DedicatedExecutor {
// We therefore use a RwLock to wait for tasks to complete
let join = Arc::new(tokio::sync::RwLock::new(()));
while let Ok(task) = rx.recv() {
while let Ok(task) = rx_tasks.recv() {
let join = Arc::clone(&join);
let handle = join.read_owned().await;
@ -158,13 +199,17 @@ impl DedicatedExecutor {
// Wait for all tasks to finish
join.write().await;
// signal shutdown, but it's OK if the other side is gone
tx_shutdown.send(()).ok();
})
});
let state = State {
requests: Some(tx),
thread: Some(thread),
requests: Some(tx_tasks),
task_refs: Arc::new(()),
completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(),
thread: Some(thread),
};
Self {
@ -205,7 +250,11 @@ impl DedicatedExecutor {
warn!("tried to schedule task on an executor that was shutdown");
}
Job { rx, cancel }
Job {
rx,
cancel,
detached: false,
}
}
/// Number of currently active tasks.
@ -231,20 +280,23 @@ impl DedicatedExecutor {
/// Only the first call to `join` will actually wait for the
/// executing thread to complete. All other calls to join will
/// complete immediately.
pub fn join(&self) {
///
/// # Panic / Drop
/// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call
/// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see
/// <https://github.com/rust-lang/futures-rs/issues/2575>.
pub async fn join(&self) {
self.shutdown();
// take the thread out when mutex is held
let thread = {
let mut state = self.state.lock();
state.thread.take()
// get handle while mutex is held
let handle = {
let state = self.state.lock();
state.completed_shutdown.clone()
};
// wait for completion while not holding the mutex to avoid
// deadlocks
if let Some(thread) = thread {
thread.join().ok();
}
handle.await.expect("Thread died?")
}
}
@ -296,6 +348,8 @@ mod tests {
// should be able to get the result
assert_eq!(dedicated_task.await.unwrap(), 42);
exec.join().await;
}
#[tokio::test]
@ -306,6 +360,40 @@ mod tests {
let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier)));
barrier.wait();
assert_eq!(dedicated_task.await.unwrap(), 42);
exec.join().await;
}
#[tokio::test]
async fn drop_clone() {
let barrier = Arc::new(Barrier::new(2));
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
drop(exec.clone());
let task = exec.spawn(do_work(42, Arc::clone(&barrier)));
barrier.wait();
assert_eq!(task.await.unwrap(), 42);
exec.join().await;
}
#[tokio::test]
#[should_panic(expected = "foo")]
async fn just_panic() {
struct S(DedicatedExecutor);
impl Drop for S {
fn drop(&mut self) {
self.0.join().now_or_never();
}
}
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
let _s = S(exec);
// this must not lead to a double-panic and SIGILL
panic!("foo")
}
#[tokio::test]
@ -324,7 +412,7 @@ mod tests {
assert_eq!(dedicated_task1.await.unwrap(), 11);
assert_eq!(dedicated_task2.await.unwrap(), 42);
exec.join();
exec.join().await;
}
#[tokio::test]
@ -334,6 +422,8 @@ mod tests {
let dedicated_task = exec.spawn(async move { get_current_thread_priority() });
assert_eq!(dedicated_task.await.unwrap(), WORKER_PRIORITY);
exec.join().await;
}
#[tokio::test]
@ -356,6 +446,8 @@ mod tests {
// Validate the inner task ran to completion (aka it did not panic)
assert_eq!(dedicated_task.await.unwrap(), 25);
exec.join().await;
}
#[tokio::test]
@ -371,6 +463,8 @@ mod tests {
// should not be able to get the result
dedicated_task.await.unwrap_err();
exec.join().await;
}
#[tokio::test]
@ -390,6 +484,8 @@ mod tests {
// task should complete successfully
assert_eq!(dedicated_task.await.unwrap(), 42);
exec.join().await;
}
#[tokio::test]
@ -402,6 +498,8 @@ mod tests {
// task should complete, but return an error
dedicated_task.await.unwrap_err();
exec.join().await;
}
#[tokio::test]
@ -409,20 +507,30 @@ mod tests {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
// shutdown the clone (but not the exec)
exec.clone().join();
exec.clone().join().await;
// Simulate trying to submit tasks once executor has shutdown
let dedicated_task = exec.spawn(async { 11 });
// task should complete, but return an error
dedicated_task.await.unwrap_err();
exec.join().await;
}
#[tokio::test]
async fn executor_join() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
// test it doesn't hang
exec.join()
exec.join().await;
}
#[tokio::test]
async fn executor_join2() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
// test it doesn't hang
exec.join().await;
exec.join().await;
}
#[tokio::test]
@ -430,9 +538,9 @@ mod tests {
async fn executor_clone_join() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
// test it doesn't hang
exec.clone().join();
exec.clone().join();
exec.join();
exec.clone().join().await;
exec.clone().join().await;
exec.join().await;
}
#[tokio::test]
@ -463,7 +571,39 @@ mod tests {
wait_for_tasks(&exec, 0).await;
assert_eq!(exec.tasks(), 0);
exec.join()
exec.join().await;
}
#[tokio::test]
async fn detach_receiver() {
// create empty executor
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
assert_eq!(exec.tasks(), 0);
// create first task
// `detach()` consumes the job handle but doesn't abort the task (in contrast to `drop`). We'll prove that the
// task is still running by linking it to a 2nd task using a barrier with size 3 (two tasks plus the main thread).
let barrier = Arc::new(AsyncBarrier::new(3));
let dedicated_task = exec.spawn(do_work_async(11, Arc::clone(&barrier)));
dedicated_task.detach();
assert_eq!(exec.tasks(), 1);
// create second task
let dedicated_task = exec.spawn(do_work_async(22, Arc::clone(&barrier)));
assert_eq!(exec.tasks(), 2);
// wait a bit just to make sure that our tasks don't get dropped
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(exec.tasks(), 2);
// both tasks wait on the same barrier, so the main thread arriving as the
// third participant unblocks them
barrier.wait().await;
wait_for_tasks(&exec, 0).await;
// only the second (non-detached) job still has a handle to read a result from
let result = dedicated_task.await.unwrap();
assert_eq!(result, 22);
exec.join().await;
}
/// Wait for the barrier and then return `result`

View File

@ -11,7 +11,7 @@ use write_buffer::{
pub struct WriteBufferConfig {
/// The type of write buffer to use.
///
/// Valid options are: file, kafka, rskafka
/// Valid options are: file, kafka
#[clap(long = "--write-buffer", env = "INFLUXDB_IOX_WRITE_BUFFER_TYPE")]
pub(crate) type_: String,

View File

@ -59,7 +59,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
async fn check_health(connection: Connection) -> Result<()> {
let response = health::Client::new(connection)
.check_storage()
.check_arrow()
.await
.context(ClientSnafu)?;

View File

@ -11,6 +11,7 @@ use crate::influxdb_ioxd::{
serving_readiness::ServingReadiness,
};
pub(crate) mod flight;
pub(crate) mod testing;
/// Returns the name of the gRPC service S.

View File

@ -0,0 +1,587 @@
//! Implements the native gRPC IOx query API using Arrow Flight
use std::fmt::Debug;
use std::task::Poll;
use std::{pin::Pin, sync::Arc};
use arrow::{
array::{make_array, ArrayRef, MutableArrayData},
datatypes::{DataType, Field, Schema, SchemaRef},
error::ArrowError,
record_batch::RecordBatch,
};
use arrow_flight::{
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use datafusion::physical_plan::ExecutionPlan;
use futures::{SinkExt, Stream, StreamExt};
use pin_project::{pin_project, pinned_drop};
use query::QueryDatabase;
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use tokio::task::JoinHandle;
use tonic::{Request, Response, Streaming};
use data_types::{DatabaseName, DatabaseNameError};
use observability_deps::tracing::{info, warn};
use query::exec::{ExecutionContextProvider, IOxExecutionContext};
use crate::influxdb_ioxd::planner::Planner;
/// Errors produced while serving Arrow Flight requests.
///
/// Every variant is mapped to a gRPC status code by `Error::to_status` and is
/// logged when converted into a `tonic::Status`.
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
// The raw ticket bytes were not valid UTF-8.
#[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))]
InvalidTicket {
source: std::string::FromUtf8Error,
ticket: Vec<u8>,
},
// The ticket text could not be deserialized from JSON into the expected request body.
#[snafu(display("Invalid query, could not parse '{}': {}", query, source))]
InvalidQuery {
query: String,
source: serde_json::Error,
},
// NOTE(review): not constructed anywhere in this module; presumably kept for
// database providers -- confirm against callers.
#[snafu(display("Database {} not found", database_name))]
DatabaseNotFound { database_name: String },
// Starting the query stream failed, or the running stream yielded an error.
#[snafu(display(
"Internal error reading points from database {}: {}",
database_name,
source
))]
Query {
database_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
// The database name in the request failed `DatabaseName` validation.
#[snafu(display("Invalid database name: {}", source))]
InvalidDatabaseName { source: DatabaseNameError },
// Re-assembling the optimized columns into a `RecordBatch` failed.
#[snafu(display("Invalid RecordBatch: {}", source))]
InvalidRecordBatch { source: ArrowError },
// Casting a dictionary array to its value type failed.
#[snafu(display("Failed to hydrate dictionary: {}", source))]
DictionaryError { source: ArrowError },
// The SQL planner rejected the query.
#[snafu(display("Error while planning query: {}", source))]
Planning {
source: crate::influxdb_ioxd::planner::Error,
},
}
/// Result alias whose error type defaults to this module's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl From<Error> for tonic::Status {
/// Converts a result from the business logic into the appropriate tonic
/// status
fn from(err: Error) -> Self {
// An explicit match on the Error enum will ensure appropriate
// logging is handled for any new error variants.
let msg = "Error handling Flight gRPC request";
match err {
Error::DatabaseNotFound { .. }
| Error::InvalidTicket { .. }
| Error::InvalidQuery { .. }
// TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development
| Error::InvalidDatabaseName { .. } => info!(?err, msg),
Error::Query { .. } => info!(?err, msg),
Error::DictionaryError { .. }
| Error::InvalidRecordBatch { .. }
| Error::Planning { .. } => warn!(?err, msg),
}
err.to_status()
}
}
impl Error {
    /// Builds the `tonic::Status` for this error: the status code is selected
    /// by variant and the message is the error's `Display` output.
    fn to_status(&self) -> tonic::Status {
        use tonic::{Code, Status};

        let code = match self {
            // malformed or unplannable requests
            Self::InvalidTicket { .. }
            | Self::InvalidQuery { .. }
            | Self::InvalidDatabaseName { .. }
            | Self::Planning { .. } => Code::InvalidArgument,
            Self::DatabaseNotFound { .. } => Code::NotFound,
            // failures while executing the query or encoding its results
            Self::Query { .. }
            | Self::InvalidRecordBatch { .. }
            | Self::DictionaryError { .. } => Code::Internal,
        };

        Status::new(code, self.to_string())
    }
}
/// Boxed, pinned stream of `Result<T, tonic::Status>` items; used as the
/// associated stream type of every Flight RPC implemented in this module.
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + Sync + 'static>>;
#[derive(Deserialize, Debug)]
/// Body of the `Ticket` serialized and sent to the do_get endpoint; this should
/// be shared with the read API probably...
///
/// It is deserialized from the JSON text carried in the ticket bytes.
struct ReadInfo {
// Name of the database to query; validated with `DatabaseName::new` before use.
database_name: String,
// SQL text handed to the planner.
sql_query: String,
}
/// Source of queryable databases for the Flight service.
///
/// Implementations resolve a validated database name to a database handle, or
/// return the `tonic::Status` that should be sent back to the client.
pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static {
/// Database type; it must be able to create query execution contexts and
/// act as a `QueryDatabase`.
type Db: ExecutionContextProvider + QueryDatabase;
/// Look up the database called `db_name`.
fn db(&self, db_name: &DatabaseName<'_>) -> std::result::Result<Arc<Self::Db>, tonic::Status>;
}
/// Concrete implementation of the gRPC Arrow Flight Service API
///
/// Generic over the [`QueryDatabaseProvider`] used to resolve database names.
#[derive(Debug)]
struct FlightService<S>
where
S: QueryDatabaseProvider,
{
// Provider used by `do_get` to look up the requested database.
server: Arc<S>,
}
/// Wraps `server` in a [`FlightService`] and returns the tonic server type
/// that exposes it as an Arrow Flight gRPC service.
pub fn make_server<S>(server: Arc<S>) -> FlightServer<impl Flight>
where
    S: QueryDatabaseProvider,
{
    let service = FlightService { server };
    FlightServer::new(service)
}
// Arrow Flight RPC handlers.
//
// Only `do_get` (run a SQL query and stream the results) and `handshake`
// (echo) are implemented; every other RPC answers `Status::unimplemented`.
#[tonic::async_trait]
impl<S> Flight for FlightService<S>
where
    S: QueryDatabaseProvider,
{
    type HandshakeStream = TonicStream<HandshakeResponse>;
    type ListFlightsStream = TonicStream<FlightInfo>;
    type DoGetStream = TonicStream<FlightData>;
    type DoPutStream = TonicStream<PutResult>;
    type DoActionStream = TonicStream<arrow_flight::Result>;
    type ListActionsStream = TonicStream<ActionType>;
    type DoExchangeStream = TonicStream<FlightData>;

    async fn get_schema(
        &self,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<SchemaResult>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    /// Runs the SQL query described by the ticket and streams the result back
    /// as Flight data.
    ///
    /// The ticket payload must be UTF-8 encoded JSON that deserializes into
    /// [`ReadInfo`]. Errors from decoding, database lookup, planning, or
    /// starting execution are returned as the corresponding `tonic::Status`.
    async fn do_get(
        &self,
        request: Request<Ticket>,
    ) -> Result<Response<Self::DoGetStream>, tonic::Status> {
        // grab the span context (if any) before the request is consumed
        let span_ctx = request.extensions().get().cloned();
        let ticket = request.into_inner();

        let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicketSnafu {
            ticket: ticket.ticket,
        })?;

        let read_info: ReadInfo =
            serde_json::from_str(&json_str).context(InvalidQuerySnafu { query: &json_str })?;

        let database =
            DatabaseName::new(&read_info.database_name).context(InvalidDatabaseNameSnafu)?;

        let db = self.server.db(&database)?;

        // NOTE(review): this token is dropped when `do_get` returns, i.e. before
        // the client has consumed the result stream -- confirm that is the
        // intended meaning of "query completed".
        let _query_completed_token = db.record_query("sql", Box::new(read_info.sql_query.clone()));

        let ctx = db.new_query_context(span_ctx);

        let physical_plan = Planner::new(&ctx)
            .sql(&read_info.sql_query)
            .await
            .context(PlanningSnafu)?;

        let output = GetStream::new(ctx, physical_plan, read_info.database_name).await?;

        Ok(Response::new(Box::pin(output) as Self::DoGetStream))
    }

    /// Echoes the protocol version and payload of the first handshake request.
    ///
    /// Returns `Status::invalid_argument` if the client closes the request
    /// stream without sending any message.
    async fn handshake(
        &self,
        request: Request<Streaming<HandshakeRequest>>,
    ) -> Result<Response<Self::HandshakeStream>, tonic::Status> {
        // `message()` yields `None` when the stream ends; a client that sends
        // nothing must get an error status rather than panic this handler.
        let request = request.into_inner().message().await?.ok_or_else(|| {
            tonic::Status::invalid_argument(
                "Handshake stream ended before a request was received",
            )
        })?;

        let response = HandshakeResponse {
            protocol_version: request.protocol_version,
            payload: request.payload,
        };
        let output = futures::stream::iter(std::iter::once(Ok(response)));
        Ok(Response::new(Box::pin(output) as Self::HandshakeStream))
    }

    async fn list_flights(
        &self,
        _request: Request<Criteria>,
    ) -> Result<Response<Self::ListFlightsStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    async fn get_flight_info(
        &self,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    async fn do_put(
        &self,
        _request: Request<Streaming<FlightData>>,
    ) -> Result<Response<Self::DoPutStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    async fn do_action(
        &self,
        _request: Request<Action>,
    ) -> Result<Response<Self::DoActionStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    async fn list_actions(
        &self,
        _request: Request<Empty>,
    ) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    async fn do_exchange(
        &self,
        _request: Request<Streaming<FlightData>>,
    ) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }
}
/// Stream of Flight messages for a single `do_get` call.
///
/// The messages are produced by a spawned tokio task and handed over through
/// a channel; dropping the stream aborts that task.
#[pin_project(PinnedDrop)]
struct GetStream {
// Receiving end of the channel filled by the producer task.
#[pin]
rx: futures::channel::mpsc::Receiver<Result<FlightData, tonic::Status>>,
// Handle of the producer task, used to abort it on drop.
join_handle: JoinHandle<()>,
// Set once the stream has ended or yielded an error; afterwards `poll_next`
// returns `None` without polling `rx` again.
done: bool,
}
impl GetStream {
async fn new(
ctx: IOxExecutionContext,
physical_plan: Arc<dyn ExecutionPlan>,
database_name: String,
) -> Result<Self, tonic::Status> {
// setup channel
let (mut tx, rx) = futures::channel::mpsc::channel::<Result<FlightData, tonic::Status>>(1);
// get schema
let schema = Arc::new(optimize_schema(&physical_plan.schema()));
// setup stream
let options = arrow::ipc::writer::IpcWriteOptions::default();
let schema_flight_data = SchemaAsIpc::new(&schema, &options).into();
let mut stream_record_batches = ctx
.execute_stream(Arc::clone(&physical_plan))
.await
.map_err(|e| Box::new(e) as _)
.context(QuerySnafu {
database_name: &database_name,
})?;
let join_handle = tokio::spawn(async move {
if tx.send(Ok(schema_flight_data)).await.is_err() {
// receiver gone
return;
}
while let Some(batch_or_err) = stream_record_batches.next().await {
match batch_or_err {
Ok(batch) => {
match optimize_record_batch(&batch, Arc::clone(&schema)) {
Ok(batch) => {
let (flight_dictionaries, flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(
&batch, &options,
);
for dict in flight_dictionaries {
if tx.send(Ok(dict)).await.is_err() {
// receiver is gone
return;
}
}
if tx.send(Ok(flight_batch)).await.is_err() {
// receiver is gone
return;
}
}
Err(e) => {
// failure sending here is OK because we're cutting the stream anyways
tx.send(Err(e.into())).await.ok();
// end stream
return;
}
}
}
Err(e) => {
// failure sending here is OK because we're cutting the stream anyways
tx.send(Err(Error::Query {
database_name: database_name.clone(),
source: Box::new(e),
}
.into()))
.await
.ok();
// end stream
return;
}
}
}
});
Ok(Self {
rx,
join_handle,
done: false,
})
}
}
#[pinned_drop]
impl PinnedDrop for GetStream {
/// Aborts the producer task when the stream is dropped.
fn drop(self: Pin<&mut Self>) {
self.join_handle.abort();
}
}
impl Stream for GetStream {
    type Item = Result<FlightData, tonic::Status>;

    /// Forwards items from the internal channel.
    ///
    /// The stream is fused: once the channel is exhausted or an error item has
    /// been yielded, every later poll returns `None` without touching the
    /// channel again.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();

        if *this.done {
            return Poll::Ready(None);
        }

        let polled = this.rx.poll_next(cx);

        // end-of-channel and error items both terminate the stream
        if matches!(polled, Poll::Ready(None) | Poll::Ready(Some(Err(_)))) {
            *this.done = true;
        }

        polled
    }
}
/// Prepares a record batch for transport: dictionary columns are hydrated to
/// their value type and, when the batch looks like a small slice of much
/// larger buffers, the remaining columns are deep-copied.
///
/// Some batches are small slices of the underlying arrays. At this stage we
/// only know the number of rows in the record batch and the sizes in bytes of
/// the backing buffers of the column arrays. There is no straightforward
/// relationship between these two quantities, since some columns can host
/// variable-length data such as strings.
///
/// However we can apply a quick-and-dirty heuristic: if the largest backing
/// buffer is two orders of magnitude bigger than the number of rows in the
/// result set, we assume that deep-copying the record batch is cheaper than
/// the encoding and transfer costs of sending the oversized buffers.
///
/// Possible improvements: take the type of the columns into consideration
/// and perhaps sample a few element sizes (taking care not to do more work
/// than always copying the results in the first place).
///
/// Or we just fix this upstream in
/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array
/// into a smaller buffer while we have to copy stuff around anyway.
///
/// See rationale and discussions about future improvements on
/// <https://github.com/influxdata/influxdb_iox/issues/1133>
fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result<RecordBatch, Error> {
    // size of the largest column; an empty batch counts as 0
    let max_buf_len = batch
        .columns()
        .iter()
        .map(|a| a.get_array_memory_size())
        .max()
        .unwrap_or_default();

    let mut columns = Vec::with_capacity(batch.columns().len());
    for column in batch.columns() {
        let optimized = match column.data_type() {
            // dictionaries are always hydrated; the first failure aborts
            DataType::Dictionary(_, _) => hydrate_dictionary(column)?,
            _ if max_buf_len > batch.num_rows() * 100 => deep_clone_array(column),
            _ => Arc::clone(column),
        };
        columns.push(optimized);
    }

    RecordBatch::try_new(schema, columns).context(InvalidRecordBatchSnafu)
}
/// Copies the elements of `array` into a newly built array.
///
/// Uses `MutableArrayData` to extend a fresh buffer with the whole logical
/// range `0..array.len()` of the input.
fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
    let source = array.data();
    let mut copy = MutableArrayData::new(vec![source], false, 0);
    // source index 0, elements [0, len)
    copy.extend(0, 0, array.len());
    make_array(copy.freeze())
}
/// Builds the transport schema: every dictionary field is replaced by a field
/// of the dictionary's value type (same name and nullability); all other
/// fields are copied unchanged.
///
/// See `hydrate_dictionary` for why dictionaries are not sent as-is.
fn optimize_schema(schema: &Schema) -> Schema {
    let mut fields = Vec::with_capacity(schema.fields().len());

    for field in schema.fields() {
        let transport_field = if let DataType::Dictionary(_, value_type) = field.data_type() {
            Field::new(
                field.name(),
                value_type.as_ref().clone(),
                field.is_nullable(),
            )
        } else {
            field.clone()
        };
        fields.push(transport_field);
    }

    Schema::new(fields)
}
/// Hydrates a dictionary array to its underlying value type.
///
/// An IPC response, streaming or otherwise, defines its schema up front,
/// which fixes the mapping from dictionary IDs. It then sends these
/// dictionaries over the wire.
///
/// Doing that properly requires identifying the different dictionaries in
/// use, assigning them IDs, and sending new dictionaries, delta or otherwise,
/// when needed.
///
/// This is tracked by #1318
///
/// For now we just cast the dictionaries to their value type.
///
/// # Panics
/// Panics if `array` is not a dictionary array.
fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, Error> {
    if let DataType::Dictionary(_, value) = array.data_type() {
        arrow::compute::cast(array, value).context(DictionarySnafu)
    } else {
        unreachable!("not a dictionary")
    }
}
// Unit tests for the record batch / schema helpers used to prepare query
// results for Flight transport.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::StringArray;
use arrow::{
array::{DictionaryArray, UInt32Array},
datatypes::{DataType, Int32Type},
};
use arrow_flight::utils::flight_data_to_arrow_batch;
use datafusion::physical_plan::limit::truncate_batch;
use super::*;
// A deep clone of a small slice must report less memory than the slice,
// which still refers to the original, larger allocation.
#[test]
fn test_deep_clone_array() {
let mut builder = UInt32Array::builder(1000);
builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
let array: ArrayRef = Arc::new(builder.finish());
assert_eq!(array.len(), 6);
let sliced = array.slice(0, 2);
assert_eq!(sliced.len(), 2);
let deep_cloned = deep_clone_array(&sliced);
assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size());
}
// Compares encoded body sizes: a batch truncated by a single row encodes to
// the same size as the baseline, while a batch truncated to one row encodes
// to a smaller body after optimization.
#[test]
fn test_encode_flight_data() {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)])
.expect("cannot create record batch");
let schema = batch.schema();
let (_, baseline_flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
let big_batch = truncate_batch(&batch, batch.num_rows() - 1);
let optimized_big_batch =
optimize_record_batch(&big_batch, Arc::clone(&schema)).expect("failed to optimize");
let (_, optimized_big_flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options);
assert_eq!(
baseline_flight_batch.data_body.len(),
optimized_big_flight_batch.data_body.len()
);
let small_batch = truncate_batch(&batch, 1);
let optimized_small_batch =
optimize_record_batch(&small_batch, Arc::clone(&schema)).expect("failed to optimize");
let (_, optimized_small_flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options);
assert!(
baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len()
);
}
// Round-trips a batch containing a string dictionary column through the
// Flight encoding and checks that it arrives as a plain string column with
// the same values (including nulls).
#[test]
fn test_encode_flight_data_dictionary() {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
let c2: DictionaryArray<Int32Type> = vec![
Some("foo"),
Some("bar"),
None,
Some("fiz"),
None,
Some("foo"),
]
.into_iter()
.collect();
let batch =
RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef), ("b", Arc::new(c2))])
.expect("cannot create record batch");
let original_schema = batch.schema();
let optimized_schema = Arc::new(optimize_schema(&original_schema));
let optimized_batch = optimize_record_batch(&batch, Arc::clone(&optimized_schema)).unwrap();
let (_, flight_data) =
arrow_flight::utils::flight_data_from_arrow_batch(&optimized_batch, &options);
// decode again on the "client" side using the optimized schema
let batch =
flight_data_to_arrow_batch(&flight_data, Arc::clone(&optimized_schema), &[None, None])
.unwrap();
// Should hydrate string dictionary for transport
assert_eq!(optimized_schema.field(1).data_type(), &DataType::Utf8);
let array = batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let expected = StringArray::from(vec![
Some("foo"),
Some("bar"),
None,
Some("fiz"),
None,
Some("foo"),
]);
assert_eq!(array, &expected)
}
}

View File

@ -99,7 +99,7 @@ impl ServerType for DatabaseServerType {
info!("server completed shutting down");
self.application.join();
self.application.join().await;
info!("shared application state completed shutting down");
}

View File

@ -1,577 +1,24 @@
//! Implements the native gRPC IOx query API using Arrow Flight
use std::fmt::Debug;
use std::task::Poll;
use std::{pin::Pin, sync::Arc};
use std::sync::Arc;
use arrow::{
array::{make_array, ArrayRef, MutableArrayData},
datatypes::{DataType, Field, Schema, SchemaRef},
error::ArrowError,
record_batch::RecordBatch,
use arrow_flight::flight_service_server::{
FlightService as Flight, FlightServiceServer as FlightServer,
};
use arrow_flight::{
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use datafusion::physical_plan::ExecutionPlan;
use futures::{SinkExt, Stream, StreamExt};
use pin_project::{pin_project, pinned_drop};
use query::QueryDatabase;
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use tokio::task::JoinHandle;
use tonic::{Request, Response, Streaming};
use data_types::{DatabaseName, DatabaseNameError};
use observability_deps::tracing::{info, warn};
use query::exec::{ExecutionContextProvider, IOxExecutionContext};
use data_types::DatabaseName;
use db::Db;
use server::Server;
use crate::influxdb_ioxd::rpc::flight::{make_server as make_server_inner, QueryDatabaseProvider};
use super::error::default_server_error_handler;
use crate::influxdb_ioxd::planner::Planner;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))]
InvalidTicket {
source: std::string::FromUtf8Error,
ticket: Vec<u8>,
},
#[snafu(display("Invalid query, could not parse '{}': {}", query, source))]
InvalidQuery {
query: String,
source: serde_json::Error,
},
impl QueryDatabaseProvider for Server {
type Db = Db;
#[snafu(display("Database {} not found", database_name))]
DatabaseNotFound { database_name: String },
#[snafu(display(
"Internal error reading points from database {}: {}",
database_name,
source
))]
Query {
database_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Invalid database name: {}", source))]
InvalidDatabaseName { source: DatabaseNameError },
#[snafu(display("Invalid RecordBatch: {}", source))]
InvalidRecordBatch { source: ArrowError },
#[snafu(display("Failed to hydrate dictionary: {}", source))]
DictionaryError { source: ArrowError },
#[snafu(display("Error while planning query: {}", source))]
Planning {
source: crate::influxdb_ioxd::planner::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl From<Error> for tonic::Status {
/// Converts a result from the business logic into the appropriate tonic
/// status
fn from(err: Error) -> Self {
// An explicit match on the Error enum will ensure appropriate
// logging is handled for any new error variants.
let msg = "Error handling Flight gRPC request";
match err {
Error::DatabaseNotFound { .. }
| Error::InvalidTicket { .. }
| Error::InvalidQuery { .. }
// TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development
| Error::InvalidDatabaseName { .. } => info!(?err, msg),
Error::Query { .. } => info!(?err, msg),
Error::DictionaryError { .. }
| Error::InvalidRecordBatch { .. }
| Error::Planning { .. } => warn!(?err, msg),
}
err.to_status()
fn db(&self, db_name: &DatabaseName<'_>) -> Result<Arc<Self::Db>, tonic::Status> {
self.db(db_name).map_err(default_server_error_handler)
}
}
impl Error {
    /// Maps this error onto the gRPC status returned to the client.
    ///
    /// The status message is always the error's `Display` output; only the
    /// status code depends on the variant.
    fn to_status(&self) -> tonic::Status {
        let message = self.to_string();

        // Every variant is listed explicitly so that adding a new variant
        // forces a decision about its status code.
        match self {
            Self::InvalidTicket { .. }
            | Self::InvalidQuery { .. }
            | Self::InvalidDatabaseName { .. }
            | Self::Planning { .. } => tonic::Status::invalid_argument(message),

            Self::DatabaseNotFound { .. } => tonic::Status::not_found(message),

            Self::Query { .. }
            | Self::InvalidRecordBatch { .. }
            | Self::DictionaryError { .. } => tonic::Status::internal(message),
        }
    }
}
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + Sync + 'static>>;
#[derive(Deserialize, Debug)]
/// Body of the `Ticket` serialized and sent to the do_get endpoint; this should
/// be shared with the read API probably...
///
/// `do_get` decodes the ticket bytes as UTF-8 and parses them as JSON into
/// this struct.
struct ReadInfo {
/// Name of the database to query; `do_get` validates it with
/// `DatabaseName::new` before looking the database up.
database_name: String,
/// SQL text that `do_get` hands to the planner.
sql_query: String,
}
/// Concrete implementation of the gRPC Arrow Flight Service API
#[derive(Debug)]
struct FlightService {
/// Server that `do_get` asks for the database named in the ticket.
server: Arc<Server>,
}
pub fn make_server(server: Arc<Server>) -> FlightServer<impl Flight> {
FlightServer::new(FlightService { server })
}
/// Arrow Flight gRPC endpoints.
///
/// Only `do_get` (run a SQL query) and `handshake` (echo) are implemented;
/// every other endpoint answers with an `unimplemented` status.
#[tonic::async_trait]
impl Flight for FlightService {
    type HandshakeStream = TonicStream<HandshakeResponse>;
    type ListFlightsStream = TonicStream<FlightInfo>;
    type DoGetStream = TonicStream<FlightData>;
    type DoPutStream = TonicStream<PutResult>;
    type DoActionStream = TonicStream<arrow_flight::Result>;
    type ListActionsStream = TonicStream<ActionType>;
    type DoExchangeStream = TonicStream<FlightData>;

    /// Not implemented; always returns an `unimplemented` status.
    async fn get_schema(
        &self,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<SchemaResult>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    /// Runs the SQL query described by the ticket and streams the result.
    ///
    /// The ticket body must be UTF-8 encoded JSON that deserializes into
    /// [`ReadInfo`]. Failures to decode the ticket, to validate or find the
    /// database, or to plan the query are converted into a `tonic::Status`.
    async fn do_get(
        &self,
        request: Request<Ticket>,
    ) -> Result<Response<Self::DoGetStream>, tonic::Status> {
        // Must be read before `into_inner` consumes the request.
        let span_ctx = request.extensions().get().cloned();
        let ticket = request.into_inner();
        let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicketSnafu {
            ticket: ticket.ticket,
        })?;

        let read_info: ReadInfo =
            serde_json::from_str(&json_str).context(InvalidQuerySnafu { query: &json_str })?;

        let database =
            DatabaseName::new(&read_info.database_name).context(InvalidDatabaseNameSnafu)?;

        let db = self
            .server
            .db(&database)
            .map_err(default_server_error_handler)?;

        // Held until the end of this function.
        let _query_completed_token = db.record_query("sql", Box::new(read_info.sql_query.clone()));

        let ctx = db.new_query_context(span_ctx);

        let physical_plan = Planner::new(&ctx)
            .sql(&read_info.sql_query)
            .await
            .context(PlanningSnafu)?;

        let output = GetStream::new(ctx, physical_plan, read_info.database_name).await?;

        Ok(Response::new(Box::pin(output) as Self::DoGetStream))
    }

    /// Echoes the protocol version and payload of the first handshake
    /// request back to the client.
    ///
    /// Returns an `invalid_argument` status if the client closes the stream
    /// without sending any request, rather than panicking in the handler.
    async fn handshake(
        &self,
        request: Request<Streaming<HandshakeRequest>>,
    ) -> Result<Response<Self::HandshakeStream>, tonic::Status> {
        let request = request.into_inner().message().await?.ok_or_else(|| {
            tonic::Status::invalid_argument(
                "Handshake stream closed before a request was received",
            )
        })?;
        let response = HandshakeResponse {
            protocol_version: request.protocol_version,
            payload: request.payload,
        };
        let output = futures::stream::iter(std::iter::once(Ok(response)));
        Ok(Response::new(Box::pin(output) as Self::HandshakeStream))
    }

    /// Not implemented; always returns an `unimplemented` status.
    async fn list_flights(
        &self,
        _request: Request<Criteria>,
    ) -> Result<Response<Self::ListFlightsStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    /// Not implemented; always returns an `unimplemented` status.
    async fn get_flight_info(
        &self,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    /// Not implemented; always returns an `unimplemented` status.
    async fn do_put(
        &self,
        _request: Request<Streaming<FlightData>>,
    ) -> Result<Response<Self::DoPutStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    /// Not implemented; always returns an `unimplemented` status.
    async fn do_action(
        &self,
        _request: Request<Action>,
    ) -> Result<Response<Self::DoActionStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    /// Not implemented; always returns an `unimplemented` status.
    async fn list_actions(
        &self,
        _request: Request<Empty>,
    ) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }

    /// Not implemented; always returns an `unimplemented` status.
    async fn do_exchange(
        &self,
        _request: Request<Streaming<FlightData>>,
    ) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
        Err(tonic::Status::unimplemented("Not yet implemented"))
    }
}
/// Stream of Flight messages for one `do_get` call.
///
/// The messages are produced by a spawned task and received here through a
/// channel; the task is aborted when this stream is dropped.
#[pin_project(PinnedDrop)]
struct GetStream {
/// Receiving end of the channel the producer task sends into.
#[pin]
rx: futures::channel::mpsc::Receiver<Result<FlightData, tonic::Status>>,
/// Handle of the spawned producer task, used to abort it on drop.
join_handle: JoinHandle<()>,
/// Set once the stream has yielded an error or ended; later polls return
/// `None` without polling `rx` again.
done: bool,
}
impl GetStream {
/// Starts executing `physical_plan` and returns a stream of its
/// Flight-encoded output.
///
/// The spawned task first sends the schema message (schema as rewritten by
/// `optimize_schema`), then for every record batch its dictionary messages
/// followed by the batch message. The first error is forwarded to the
/// stream and ends it. The task also stops as soon as a send fails because
/// the receiver is gone.
///
/// Returns an error status if the plan cannot be turned into a record
/// batch stream; `database_name` is only used in error messages.
async fn new(
ctx: IOxExecutionContext,
physical_plan: Arc<dyn ExecutionPlan>,
database_name: String,
) -> Result<Self, tonic::Status> {
// setup channel
let (mut tx, rx) = futures::channel::mpsc::channel::<Result<FlightData, tonic::Status>>(1);
// get schema
let schema = Arc::new(optimize_schema(&physical_plan.schema()));
// setup stream
let options = arrow::ipc::writer::IpcWriteOptions::default();
let schema_flight_data = SchemaAsIpc::new(&schema, &options).into();
let mut stream_record_batches = ctx
.execute_stream(Arc::clone(&physical_plan))
.await
.map_err(|e| Box::new(e) as _)
.context(QuerySnafu {
database_name: &database_name,
})?;
// Producer task: everything below runs independently of the caller and
// communicates with the returned stream only through `tx`.
let join_handle = tokio::spawn(async move {
// The schema message is sent before any data.
if tx.send(Ok(schema_flight_data)).await.is_err() {
// receiver gone
return;
}
while let Some(batch_or_err) = stream_record_batches.next().await {
match batch_or_err {
Ok(batch) => {
// Rewrite the batch to match the optimized schema before encoding.
match optimize_record_batch(&batch, Arc::clone(&schema)) {
Ok(batch) => {
let (flight_dictionaries, flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(
&batch, &options,
);
// Dictionary messages are sent before the batch message.
for dict in flight_dictionaries {
if tx.send(Ok(dict)).await.is_err() {
// receiver is gone
return;
}
}
if tx.send(Ok(flight_batch)).await.is_err() {
// receiver is gone
return;
}
}
Err(e) => {
// failure sending here is OK because we're cutting the stream anyways
tx.send(Err(e.into())).await.ok();
// end stream
return;
}
}
}
Err(e) => {
// failure sending here is OK because we're cutting the stream anyways
tx.send(Err(Error::Query {
database_name: database_name.clone(),
source: Box::new(e),
}
.into()))
.await
.ok();
// end stream
return;
}
}
}
});
Ok(Self {
rx,
join_handle,
done: false,
})
}
}
/// Aborts the producer task when the stream is dropped, so it does not keep
/// running after the stream is gone.
#[pinned_drop]
impl PinnedDrop for GetStream {
fn drop(self: Pin<&mut Self>) {
self.join_handle.abort();
}
}
impl Stream for GetStream {
    type Item = Result<FlightData, tonic::Status>;

    /// Forwards items from the channel and terminates the stream after the
    /// channel ends or after the first error item has been yielded.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let projected = self.project();

        // Once finished, never touch the channel again.
        if *projected.done {
            return Poll::Ready(None);
        }

        let polled = projected.rx.poll_next(cx);

        // Both end-of-channel and an error item are terminal; the item itself
        // is still handed to the caller unchanged.
        if matches!(polled, Poll::Ready(None) | Poll::Ready(Some(Err(_)))) {
            *projected.done = true;
        }

        polled
    }
}
/// Rewrites `batch` for transport so that it matches `schema`.
///
/// Dictionary columns are always hydrated to their value type (see
/// `hydrate_dictionary`). The other columns are either deep-copied or passed
/// through as cheap `Arc` clones, depending on the heuristic below.
///
/// Some batches are small slices of the underlying arrays. At this stage we
/// only know the number of rows in the record batch and the sizes in bytes of
/// the backing buffers of the column arrays. There is no straightforward
/// relationship between these two quantities, since some columns can host
/// variable-length data such as strings.
///
/// However, we can apply a quick-and-dirty heuristic: if the largest backing
/// buffer is two orders of magnitude bigger than the number of rows in the
/// result set, we assume that deep-copying the record batch is cheaper than
/// transferring the oversized buffers.
///
/// Possible improvements: take the type of the columns into consideration
/// and perhaps sample a few element sizes (taking care not to do more work
/// than always copying the results in the first place).
///
/// Or we just fix this upstream in
/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array
/// into a smaller buffer while we have to copy stuff around anyway.
///
/// See rationale and discussions about future improvements on
/// <https://github.com/influxdata/influxdb_iox/issues/1133>
fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result<RecordBatch, Error> {
    let largest_array_bytes = batch
        .columns()
        .iter()
        .map(|col| col.get_array_memory_size())
        .max()
        .unwrap_or_default();

    // The decision is the same for every column of the batch.
    let should_deep_clone = largest_array_bytes > batch.num_rows() * 100;

    let mut rewritten = Vec::with_capacity(batch.columns().len());
    for column in batch.columns() {
        let is_dictionary = matches!(column.data_type(), DataType::Dictionary(_, _));

        let new_column = if is_dictionary {
            hydrate_dictionary(column)?
        } else if should_deep_clone {
            deep_clone_array(column)
        } else {
            Arc::clone(column)
        };
        rewritten.push(new_column);
    }

    RecordBatch::try_new(schema, rewritten).context(InvalidRecordBatchSnafu)
}
/// Builds a new array holding a copy of all elements of `array`.
///
/// Used for sliced arrays so that the result does not carry the original's
/// larger backing buffers along.
fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
    let source = array.data();
    let mut copy = MutableArrayData::new(vec![source], false, 0);

    // Copy elements `0..len` from the only source array (index 0).
    copy.extend(0, 0, array.len());

    make_array(copy.freeze())
}
/// Returns a schema in which every dictionary field is replaced by a field of
/// the dictionary's value type, keeping name and nullability.
///
/// All other fields are copied as they are. The result is built from the
/// fields alone. See `hydrate_dictionary` for why dictionaries are not sent
/// as such.
fn optimize_schema(schema: &Schema) -> Schema {
    let mut fields = Vec::with_capacity(schema.fields().len());

    for field in schema.fields() {
        let transport_field = if let DataType::Dictionary(_, value_type) = field.data_type() {
            Field::new(
                field.name(),
                value_type.as_ref().clone(),
                field.is_nullable(),
            )
        } else {
            field.clone()
        };
        fields.push(transport_field);
    }

    Schema::new(fields)
}
/// Casts a dictionary array to an array of its value type.
///
/// An IPC response, streaming or otherwise, defines its schema up front,
/// including the mapping from dictionary IDs, and then sends the
/// dictionaries over the wire. Doing that properly requires identifying the
/// different dictionaries in use, assigning them IDs, and sending new
/// dictionaries, delta or otherwise, when needed.
///
/// This is tracked by #1318
///
/// Until then dictionaries are simply hydrated to their underlying type.
///
/// # Panics
///
/// Panics if `array` is not a dictionary array.
fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, Error> {
    let value_type = if let DataType::Dictionary(_, value) = array.data_type() {
        value
    } else {
        unreachable!("not a dictionary")
    };

    arrow::compute::cast(array, value_type).context(DictionarySnafu)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::StringArray;
use arrow::{
array::{DictionaryArray, UInt32Array},
datatypes::{DataType, Int32Type},
};
use arrow_flight::utils::flight_data_to_arrow_batch;
use datafusion::physical_plan::limit::truncate_batch;
use super::*;
/// Deep-cloning a slice must yield an array that uses less memory than the
/// slice itself.
#[test]
fn test_deep_clone_array() {
// Builder created with capacity 1000, but only six values are appended.
let mut builder = UInt32Array::builder(1000);
builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
let array: ArrayRef = Arc::new(builder.finish());
assert_eq!(array.len(), 6);
// Take only the first two elements.
let sliced = array.slice(0, 2);
assert_eq!(sliced.len(), 2);
let deep_cloned = deep_clone_array(&sliced);
assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size());
}
/// Checks how `optimize_record_batch` affects the size of the Flight-encoded
/// body for a lightly truncated and a heavily truncated batch.
#[test]
fn test_encode_flight_data() {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)])
.expect("cannot create record batch");
let schema = batch.schema();
// Baseline: the full batch, encoded without optimization.
let (_, baseline_flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
// All rows but one: the encoded body is expected to be as large as the baseline.
let big_batch = truncate_batch(&batch, batch.num_rows() - 1);
let optimized_big_batch =
optimize_record_batch(&big_batch, Arc::clone(&schema)).expect("failed to optimize");
let (_, optimized_big_flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options);
assert_eq!(
baseline_flight_batch.data_body.len(),
optimized_big_flight_batch.data_body.len()
);
// A single row: the encoded body is expected to be strictly smaller than the baseline.
let small_batch = truncate_batch(&batch, 1);
let optimized_small_batch =
optimize_record_batch(&small_batch, Arc::clone(&schema)).expect("failed to optimize");
let (_, optimized_small_flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options);
assert!(
baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len()
);
}
/// Round-trips a batch containing a string dictionary column through the
/// Flight encoding and checks that it arrives as a plain string column.
#[test]
fn test_encode_flight_data_dictionary() {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
// Dictionary-encoded string column including nulls and a repeated value.
let c2: DictionaryArray<Int32Type> = vec![
Some("foo"),
Some("bar"),
None,
Some("fiz"),
None,
Some("foo"),
]
.into_iter()
.collect();
let batch =
RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef), ("b", Arc::new(c2))])
.expect("cannot create record batch");
let original_schema = batch.schema();
// Schema and batch as they are prepared for transport.
let optimized_schema = Arc::new(optimize_schema(&original_schema));
let optimized_batch = optimize_record_batch(&batch, Arc::clone(&optimized_schema)).unwrap();
let (_, flight_data) =
arrow_flight::utils::flight_data_from_arrow_batch(&optimized_batch, &options);
// Decode again using the optimized schema.
let batch =
flight_data_to_arrow_batch(&flight_data, Arc::clone(&optimized_schema), &[None, None])
.unwrap();
// Should hydrate string dictionary for transport
assert_eq!(optimized_schema.field(1).data_type(), &DataType::Utf8);
let array = batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
// The decoded column must hold the same logical values as the dictionary input.
let expected = StringArray::from(vec![
Some("foo"),
Some("bar"),
None,
Some("fiz"),
None,
Some("foo"),
]);
assert_eq!(array, &expected)
}
make_server_inner(server)
}

View File

@ -1893,6 +1893,8 @@ mod tests {
assert_batches_sorted_eq!(&expected, &[data]);
assert_eq!(p.inner.read().snapshots[0].min_sequencer_number.get(), 8);
assert_eq!(p.inner.read().snapshots[0].max_sequencer_number.get(), 9);
exec.join().await;
}
#[tokio::test]

View File

@ -229,10 +229,13 @@ impl IngestHandler for IngestHandlerImpl {
panic!("Background worker '{name}' exited early!");
}
}
self.data.exec.join().await;
}
/// Requests shutdown: signals cancellation through `self.shutdown` and
/// initiates shutdown of the executor held in `self.data`.
fn shutdown(&self) {
self.shutdown.cancel();
self.data.exec.shutdown();
}
}
@ -251,6 +254,8 @@ impl Drop for IngestHandlerImpl {
);
}
}
// `self.data.exec` implements `Drop`, so we don't need to do anything
}
}

View File

@ -299,6 +299,8 @@ mod tests {
"+-----------+------+-----------------------------+",
];
assert_batches_eq!(&expected, &output_batches);
exc.join().await;
}
#[tokio::test]
@ -334,6 +336,8 @@ mod tests {
"+------+-----------------------------+",
];
assert_batches_eq!(&expected, &output_batches);
exc.join().await;
}
#[tokio::test]
@ -364,6 +368,8 @@ mod tests {
// verify data: return nothing because the selected row already deleted
let expected = vec!["++", "++"];
assert_batches_eq!(&expected, &output_batches);
exc.join().await;
}
#[tokio::test]

View File

@ -121,23 +121,25 @@ impl Executor {
}
}
/// Initiates shutdown of both the query executor and the reorg executor.
///
/// Use `join` to wait for the executors to complete.
pub fn shutdown(&self) {
self.query_exec.shutdown();
self.reorg_exec.shutdown();
}
/// Stops all subsequent task executions, and waits for the worker
/// thread to complete. Note this will shutdown all created contexts.
///
/// Only the first call to `join` will actually wait for the
/// executing thread to complete. All other calls to join will
/// complete immediately.
pub fn join(&self) {
self.query_exec.join();
self.reorg_exec.join();
pub async fn join(&self) {
self.query_exec.join().await;
self.reorg_exec.join().await;
}
}
/// Joins the executor when it is dropped.
impl Drop for Executor {
fn drop(&mut self) {
// Delegates to `Executor::join`.
self.join();
}
}
// No need to implement `Drop` because this is done by DedicatedExecutor already
/// Create a SchemaPivot node which an arbitrary input like
/// ColA | ColB | ColC
@ -265,6 +267,8 @@ mod tests {
let ctx = exec.new_context(ExecutorType::Query);
let result_strings = ctx.to_string_set(plan).await.unwrap();
assert_eq!(result_strings, expected_strings);
exec.join().await;
}
#[tokio::test]
@ -279,6 +283,8 @@ mod tests {
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, StringSetRef::new(StringSet::new()));
exec.join().await;
}
#[tokio::test]
@ -295,6 +301,8 @@ mod tests {
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
exec.join().await;
}
#[tokio::test]
@ -315,6 +323,8 @@ mod tests {
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
exec.join().await;
}
#[tokio::test]
@ -339,6 +349,8 @@ mod tests {
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
exec.join().await;
}
#[tokio::test]
@ -370,6 +382,8 @@ mod tests {
expected_error,
actual_error,
);
exec.join().await;
}
#[tokio::test]
@ -397,6 +411,8 @@ mod tests {
expected_error,
actual_error
);
exec.join().await;
}
#[tokio::test]
@ -418,6 +434,8 @@ mod tests {
let results = ctx.to_string_set(plan).await.expect("Executed plan");
assert_eq!(results, to_set(&["f1", "f2"]));
exec.join().await;
}
/// return a set for testing

View File

@ -111,6 +111,8 @@ mod test {
.unwrap();
assert_extracted_metrics!(extracted, 8);
executor.join().await;
}
// Extracted baseline metrics for the specified operator

View File

@ -1948,5 +1948,7 @@ mod tests {
"\nActual: {:?}\nExpected: {:?}",
actual_predicate, expected_predicate
);
executor.join().await;
}
}

View File

@ -376,6 +376,8 @@ mod test {
];
assert_batches_eq!(&expected, &batches);
executor.join().await;
}
#[tokio::test]
@ -425,6 +427,8 @@ mod test {
];
assert_batches_eq!(&expected, &batches);
executor.join().await;
}
#[tokio::test]
@ -489,5 +493,7 @@ mod test {
];
assert_batches_eq!(&expected, &batches1);
executor.join().await;
}
}

View File

@ -225,6 +225,8 @@ mod tests {
let ctx = exec.new_context(ExecutorType::Query);
let ss = ctx.to_string_set(plan).await.unwrap();
assert_eq!(ss, expected_ss);
exec.join().await;
}
fn to_string_set(v: &[&str]) -> StringSet {

View File

@ -98,7 +98,7 @@ impl ApplicationState {
&self.executor
}
pub fn join(&self) {
self.executor.join()
pub async fn join(&self) {
self.executor.join().await;
}
}

View File

@ -21,7 +21,8 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
pin-project = "1.0"
prost = "0.9"
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="3af1939fd47f8680d40074dc3fd2e2a4a0da6b8c", default-features = false, features = ["compression-snappy"] }
# TODO: Temporary additional logging (#3805)
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="f7eef8560ac871e056887a62b7014582835cba78", default-features = false, features = ["compression-snappy"] }
schema = { path = "../schema" }
time = { path = "../time" }
tokio = { version = "1.17", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }

View File

@ -4,7 +4,7 @@ use data_types::sequence::Sequence;
use dml::{DmlMeta, DmlOperation, DmlWrite};
use hashbrown::{hash_map::Entry, HashMap};
use mutable_batch::MutableBatch;
use observability_deps::tracing::{info, warn};
use observability_deps::tracing::{error, info, warn};
use rskafka::{
client::producer::aggregator::{self, Aggregator, StatusDeaggregator, TryPush},
record::Record,
@ -240,7 +240,7 @@ pub struct DmlAggregator {
collector: Option<Arc<dyn TraceCollector>>,
/// Database name.
database_name: String,
database_name: Arc<str>,
/// Maximum batch size in bytes.
max_size: usize,
@ -258,14 +258,14 @@ pub struct DmlAggregator {
impl DmlAggregator {
pub fn new(
collector: Option<Arc<dyn TraceCollector>>,
database_name: String,
database_name: impl Into<Arc<str>>,
max_size: usize,
sequencer_id: u32,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
Self {
collector,
database_name,
database_name: database_name.into(),
max_size,
sequencer_id,
state: DmlAggregatorState::default(),
@ -418,6 +418,7 @@ impl Aggregator for DmlAggregator {
records,
Deaggregator {
sequencer_id: self.sequencer_id,
database_name: Arc::clone(&self.database_name),
metadata,
tag_to_record: state.tag_to_record,
},
@ -484,6 +485,9 @@ pub struct Deaggregator {
/// Sequencer ID.
sequencer_id: u32,
/// Database name
database_name: Arc<str>,
/// Metadata for every record.
///
/// This is NOT per-tag, use `tag_to_record` to map tags to records first.
@ -504,13 +508,15 @@ impl StatusDeaggregator for Deaggregator {
tag: Self::Tag,
) -> Result<Self::Status, aggregator::Error> {
assert_eq!(input.len(), self.tag_to_record.len(), "invalid offsets");
assert!(
self.tag_to_record.len() > tag.0,
"tag {} out of range (tag_to_record: {:?}, offsets: {:?})",
tag.0,
self.tag_to_record,
input
);
if self.tag_to_record.len() <= tag.0 {
error!(
"tag {} out of range (database_name: {}, tag_to_record: {:?}, offsets: {:?})",
tag.0, self.database_name, self.tag_to_record, input
);
// TODO: Temporary non-fatal assertion to reduce log spam (#3805)
return Err("internal aggregator error: invalid tag".into());
}
let record = self.tag_to_record[tag.0];