Merge branch 'main' into cn/1.54

pull/24376/head
kodiakhq[bot] 2021-07-30 17:01:37 +00:00 committed by GitHub
commit 0297aae17e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 217 additions and 28 deletions

2
Cargo.lock generated
View File

@ -1619,6 +1619,7 @@ dependencies = [
"arrow-flight",
"arrow_util",
"assert_cmd",
"backtrace",
"byteorder",
"bytes",
"chrono",
@ -1640,6 +1641,7 @@ dependencies = [
"influxdb_line_protocol",
"internal_types",
"itertools 0.10.1",
"libc",
"logfmt",
"metrics",
"mutable_buffer",

View File

@ -77,6 +77,7 @@ trogging = { path = "trogging", features = ["structopt"] }
# Crates.io dependencies, in alphabetical order
arrow = { version = "5.0", features = ["prettyprint"] }
arrow-flight = "5.0"
backtrace = "0.3"
byteorder = "1.3.4"
bytes = "1.0"
chrono = "0.4"
@ -88,6 +89,7 @@ flate2 = "1.0"
futures = "0.3"
http = "0.2.0"
hyper = "0.14"
libc = { version = "0.2" }
once_cell = { version = "1.4.0", features = ["parking_lot"] }
opentelemetry-jaeger = { version = "0.12", features = ["tokio"] }
opentelemetry-otlp = "0.6"

View File

@ -20,6 +20,7 @@ futures-util = { version = "0.3.1", optional = true }
http = "0.2.3"
hyper = "0.14"
prost = "0.7"
rand = "0.8.3"
serde = "1.0.118"
serde_json = { version = "1.0.44", optional = true }
thiserror = "1.0.23"
@ -27,5 +28,4 @@ tokio = { version = "1.0", features = ["macros"] }
tonic = { version = "0.4.0" }
[dev-dependencies] # In alphabetical order
rand = "0.8.3"
serde_json = "1.0"

View File

@ -1,5 +1,6 @@
use std::{convert::TryFrom, sync::Arc};
use futures_util::stream;
use futures_util::stream::StreamExt;
use serde::Serialize;
use thiserror::Error;
@ -13,10 +14,11 @@ use arrow::{
};
use arrow_flight::{
flight_service_client::FlightServiceClient, utils::flight_data_to_arrow_batch, FlightData,
Ticket,
HandshakeRequest, Ticket,
};
use crate::connection::Connection;
use rand::Rng;
/// Error responses when querying an IOx database using the Arrow Flight gRPC
/// API.
@ -49,6 +51,10 @@ pub enum Error {
/// from the server.
#[error(transparent)]
GrpcError(#[from] tonic::Status),
/// Arrow Flight handshake failed.
#[error("Handshake failed")]
HandshakeFailed,
}
/// An IOx Arrow Flight gRPC API client.
@ -99,6 +105,29 @@ impl Client {
) -> Result<PerformQuery, Error> {
PerformQuery::new(self, database_name.into(), sql_query.into()).await
}
/// Perform a handshake with the server, as defined by the Arrow Flight API.
pub async fn handshake(&mut self) -> Result<(), Error> {
let request = HandshakeRequest {
protocol_version: 0,
payload: rand::thread_rng().gen::<[u8; 16]>().to_vec(),
};
let mut response = self
.inner
.handshake(stream::iter(vec![request.clone()]))
.await?
.into_inner();
if request.payload.eq(&response
.next()
.await
.ok_or(Error::HandshakeFailed)??
.payload)
{
Result::Ok(())
} else {
Result::Err(Error::HandshakeFailed)
}
}
}
// TODO: this should be shared

View File

@ -832,6 +832,7 @@ mod test {
fn table_summaries() {
let schema = SchemaBuilder::new()
.non_null_tag("env")
.tag("host")
.non_null_field("temp", Float64)
.non_null_field("counter", UInt64)
.non_null_field("icounter", Int64)
@ -847,6 +848,11 @@ mod test {
.into_iter()
.collect::<DictionaryArray<Int32Type>>(),
),
Arc::new(
(vec![None, None, None] as Vec<Option<&str>>)
.into_iter()
.collect::<DictionaryArray<Int32Type>>(),
),
Arc::new(Float64Array::from(vec![10.0, 30000.0, 4500.0])),
Arc::new(UInt64Array::from(vec![1000, 3000, 5000])),
Arc::new(Int64Array::from(vec![1000, -1000, 4000])),
@ -892,6 +898,11 @@ mod test {
distinct_count: Some(NonZeroU64::new(2).unwrap()),
}),
},
ColumnSummary {
name: "host".into(),
influxdb_type: Some(InfluxDbType::Tag),
stats: Statistics::String(StatValues::new_all_null(3)),
},
ColumnSummary {
name: "icounter".into(),
influxdb_type: Some(InfluxDbType::Field),

View File

@ -152,32 +152,32 @@ impl Column {
OwnedValue::String(min.clone()),
OwnedValue::String(max.clone()),
),
None => (OwnedValue::Null, OwnedValue::Null),
None => (OwnedValue::new_null(), OwnedValue::new_null()),
},
Self::Float(meta, _) => match meta.range {
Some((min, max)) => (
OwnedValue::Scalar(Scalar::F64(min)),
OwnedValue::Scalar(Scalar::F64(max)),
),
None => (OwnedValue::Null, OwnedValue::Null),
None => (OwnedValue::new_null(), OwnedValue::new_null()),
},
Self::Integer(meta, _) => match meta.range {
Some((min, max)) => (
OwnedValue::Scalar(Scalar::I64(min)),
OwnedValue::Scalar(Scalar::I64(max)),
),
None => (OwnedValue::Null, OwnedValue::Null),
None => (OwnedValue::new_null(), OwnedValue::new_null()),
},
Self::Unsigned(meta, _) => match meta.range {
Some((min, max)) => (
OwnedValue::Scalar(Scalar::U64(min)),
OwnedValue::Scalar(Scalar::U64(max)),
),
None => (OwnedValue::Null, OwnedValue::Null),
None => (OwnedValue::new_null(), OwnedValue::new_null()),
},
Self::Bool(meta, _) => match meta.range {
Some((min, max)) => (OwnedValue::Boolean(min), OwnedValue::Boolean(max)),
None => (OwnedValue::Null, OwnedValue::Null),
None => (OwnedValue::new_null(), OwnedValue::new_null()),
},
Self::ByteArray(_, _) => todo!(),
}
@ -1430,7 +1430,7 @@ mod test {
),
(
StringArray::from(vec![None, None]),
(OwnedValue::Null, OwnedValue::Null),
(OwnedValue::new_null(), OwnedValue::new_null()),
),
];
@ -1459,7 +1459,7 @@ mod test {
),
(
Int64Array::from(vec![None, None]),
(OwnedValue::Null, OwnedValue::Null),
(OwnedValue::new_null(), OwnedValue::new_null()),
),
];

View File

@ -218,8 +218,11 @@ impl std::fmt::Display for AggregateType {
}
}
/// Describes the semantic meaning of the column in a set of results. That is,
/// whether the column is a "tag", "field", "timestamp", or "other".
/// Describes the semantic meaning of the column in a set of results
/// and the column name.
///
/// Semantic meaning specifies if the column is a "tag", "field",
/// "timestamp", or "other".
#[derive(PartialEq, Debug, PartialOrd, Clone)]
pub enum ColumnType {
Tag(String),

View File

@ -705,6 +705,9 @@ impl MetaData {
distinct_count,
})
}
(OwnedValue::String(_), mismatch) => {
panic!("inconsistent min/max expected String got {}", mismatch)
}
(OwnedValue::Boolean(min), OwnedValue::Boolean(max)) => {
Statistics::Bool(StatValues {
min: Some(*min),
@ -714,6 +717,9 @@ impl MetaData {
distinct_count,
})
}
(OwnedValue::Boolean(_), mismatch) => {
panic!("inconsistent min/max expected Boolean got {}", mismatch)
}
(OwnedValue::Scalar(min), OwnedValue::Scalar(max)) => match (min, max) {
(Scalar::I64(min), Scalar::I64(max)) => Statistics::I64(StatValues {
min: Some(*min),
@ -722,6 +728,9 @@ impl MetaData {
null_count,
distinct_count,
}),
(Scalar::I64(_), mismatch) => {
panic!("inconsistent min/max expected I64 got {}", mismatch)
}
(Scalar::U64(min), Scalar::U64(max)) => Statistics::U64(StatValues {
min: Some(*min),
max: Some(*max),
@ -729,6 +738,9 @@ impl MetaData {
null_count,
distinct_count,
}),
(Scalar::U64(_), mismatch) => {
panic!("inconsistent min/max expected U64 got {}", mismatch)
}
(Scalar::F64(min), Scalar::F64(max)) => Statistics::F64(StatValues {
min: Some(*min),
max: Some(*max),
@ -736,15 +748,37 @@ impl MetaData {
null_count,
distinct_count,
}),
_ => panic!(
"unsupported type scalar stats in read buffer: {:?}, {:?}",
min, max
),
(Scalar::F64(_), mismatch) => {
panic!("inconsistent min/max expected F64 got {}", mismatch)
}
(Scalar::Null, Scalar::Null) => {
assert_eq!(
total_count, null_count,
"expected only null values: {:?}",
column_meta,
);
assert_eq!(
distinct_count,
std::num::NonZeroU64::new(1),
"distinct count for all null was not 1: {:?}",
column_meta,
);
make_null_stats(total_count, &column_meta.logical_data_type)
}
(Scalar::Null, mismatch) => {
panic!("inconsistent min/max expected NULL got {}", mismatch)
}
},
_ => panic!(
"unsupported type of stats in read buffer: {:?}",
column_meta.range
),
(OwnedValue::Scalar(_), mismatch) => {
panic!("inconsistent min/max expected Scalar got {}", mismatch)
}
(OwnedValue::ByteArray(_), OwnedValue::ByteArray(_)) => {
panic!("unsupported type statistcs type ByteArray")
}
(OwnedValue::ByteArray(_), mismatch) => {
panic!("inconsistent min/max expected ByteArray got {}", mismatch)
}
};
ColumnSummary {
@ -766,6 +800,24 @@ impl MetaData {
}
}
// Create statistics for the specified data type with no values
fn make_null_stats(
total_count: u64,
logical_data_type: &LogicalDataType,
) -> data_types::partition_metadata::Statistics {
use data_types::partition_metadata::{StatValues, Statistics};
use LogicalDataType::*;
match logical_data_type {
Integer => Statistics::I64(StatValues::new_all_null(total_count)),
Unsigned => Statistics::U64(StatValues::new_all_null(total_count)),
Float => Statistics::F64(StatValues::new_all_null(total_count)),
String => Statistics::String(StatValues::new_all_null(total_count)),
Binary => panic!("Binary statistics not supported"),
Boolean => Statistics::Bool(StatValues::new_all_null(total_count)),
}
}
// Builds new table meta-data from a collection of row groups. Useful
// for rebuilding state when a row group has been removed from the table.
impl From<&[Arc<RowGroup>]> for MetaData {
@ -998,6 +1050,8 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> {
#[cfg(test)]
mod test {
use data_types::partition_metadata::{StatValues, Statistics};
use super::*;
use crate::{
column::Column,
@ -1559,4 +1613,40 @@ west,host-b,100
assert_eq!(table.time_range().unwrap(), (-100, 3));
}
#[test]
fn null_stats_ifield() {
let actual = make_null_stats(12, &LogicalDataType::Integer);
assert_eq!(actual, Statistics::I64(StatValues::new_all_null(12)));
}
#[test]
fn null_stats_ufield() {
let actual = make_null_stats(12, &LogicalDataType::Unsigned);
assert_eq!(actual, Statistics::U64(StatValues::new_all_null(12)));
}
#[test]
fn null_stats_float() {
let actual = make_null_stats(12, &LogicalDataType::Float);
assert_eq!(actual, Statistics::F64(StatValues::new_all_null(12)));
}
#[test]
fn null_stats_string() {
let actual = make_null_stats(12, &LogicalDataType::String);
assert_eq!(actual, Statistics::String(StatValues::new_all_null(12)));
}
#[test]
#[should_panic(expected = "Binary statistics not supported")]
fn null_stats_binary() {
make_null_stats(12, &LogicalDataType::Binary);
}
#[test]
fn null_stats_boolean() {
let actual = make_null_stats(12, &LogicalDataType::Boolean);
assert_eq!(actual, Statistics::Bool(StatValues::new_all_null(12)));
}
}

View File

@ -1138,23 +1138,24 @@ impl std::fmt::Display for &Scalar {
#[derive(Debug, PartialEq, PartialOrd, Clone)]
pub enum OwnedValue {
// Represents a NULL value in a column row.
Null,
// A UTF-8 valid string.
/// A UTF-8 valid string.
String(String),
// An arbitrary byte array.
/// An arbitrary byte array.
ByteArray(Vec<u8>),
// A boolean value.
/// A boolean value.
Boolean(bool),
// A numeric scalar value.
/// A numeric scalar value, or NULL
Scalar(Scalar),
}
impl OwnedValue {
pub fn new_null() -> Self {
Self::Scalar(Scalar::Null)
}
/// The size in bytes of this value.
pub fn size(&self) -> usize {
let self_size = size_of::<Self>();
@ -1169,7 +1170,6 @@ impl OwnedValue {
impl std::fmt::Display for &OwnedValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OwnedValue::Null => write!(f, "NULL"),
OwnedValue::String(s) => s.fmt(f),
OwnedValue::ByteArray(s) => write!(f, "{}", String::from_utf8_lossy(s)),
OwnedValue::Boolean(b) => b.fmt(f),
@ -1882,7 +1882,7 @@ mod test {
#[test]
fn size() {
let v1 = OwnedValue::Null;
let v1 = OwnedValue::new_null();
assert_eq!(v1.size(), 32);
let v1 = OwnedValue::Scalar(Scalar::I64(22));

View File

@ -137,6 +137,8 @@ enum Command {
}
fn main() -> Result<(), std::io::Error> {
install_crash_handler(); // attempt to render a useful stacktrace to stderr
// load all environment variables from .env before doing anything
load_dotenv();
@ -256,3 +258,50 @@ fn load_dotenv() {
}
};
}
// Based on ideas from
// https://github.com/servo/servo/blob/f03ddf6c6c6e94e799ab2a3a89660aea4a01da6f/ports/servo/main.rs#L58-L79
fn install_crash_handler() {
unsafe {
set_signal_handler(libc::SIGSEGV, signal_handler); // handle segfaults
set_signal_handler(libc::SIGILL, signal_handler); // handle stack overflow and unsupported CPUs
set_signal_handler(libc::SIGBUS, signal_handler); // handle invalid memory access
}
}
unsafe extern "C" fn signal_handler(sig: i32) {
use backtrace::Backtrace;
use std::process::abort;
let name = std::thread::current()
.name()
.map(|n| format!(" for thread \"{}\"", n))
.unwrap_or_else(|| "".to_owned());
eprintln!(
"Signal {}, Stack trace{}\n{:?}",
sig,
name,
Backtrace::new()
);
abort();
}
// based on https://github.com/adjivas/sig/blob/master/src/lib.rs#L34-L52
unsafe fn set_signal_handler(signal: libc::c_int, handler: unsafe extern "C" fn(libc::c_int)) {
use libc::{sigaction, sigfillset, sighandler_t};
let mut sigset = std::mem::zeroed();
// Block all signals during the handler. This is the expected behavior, but
// it's not guaranteed by `signal()`.
if sigfillset(&mut sigset) != -1 {
// Done because sigaction has private members.
// This is safe because sa_restorer and sa_handlers are pointers that
// might be null (that is, zero).
let mut action: sigaction = std::mem::zeroed();
// action.sa_flags = 0;
action.sa_mask = sigset;
action.sa_sigaction = handler as sighandler_t;
sigaction(signal, &action, std::ptr::null_mut());
}
}

View File

@ -17,6 +17,9 @@ pub async fn test() {
let mut client = server_fixture.flight_client();
// This does nothing except test the client handshake implementation.
client.handshake().await.unwrap();
let query_results = client
.perform_query(scenario.database_name(), sql_query)
.await