chore: Update to Rust 1.53.0 (#1922)
* chore: Update to Rust 1.53.0 * fix: Update to latest clippy standards * fix: bad refactor * fix: Update escaping * test: update test output Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
090b0aba11
commit
e6d995cbd8
|
@ -250,7 +250,7 @@ mod tests {
|
|||
let hash_ring: HashRing = protobuf.clone().try_into().unwrap();
|
||||
let back: management::HashRing = hash_ring.clone().into();
|
||||
|
||||
assert_eq!(hash_ring.table_name, false);
|
||||
assert!(!hash_ring.table_name);
|
||||
assert_eq!(protobuf.table_name, back.table_name);
|
||||
assert!(hash_ring.columns.is_empty());
|
||||
assert_eq!(protobuf.columns, back.columns);
|
||||
|
@ -308,7 +308,7 @@ mod tests {
|
|||
assert!(shard_config.hash_ring.is_none());
|
||||
assert_eq!(protobuf.hash_ring, back.hash_ring);
|
||||
|
||||
assert_eq!(shard_config.ignore_errors, false);
|
||||
assert!(!shard_config.ignore_errors);
|
||||
assert_eq!(protobuf.ignore_errors, back.ignore_errors);
|
||||
|
||||
assert!(shard_config.shards.is_empty());
|
||||
|
|
|
@ -1286,7 +1286,7 @@ mod test {
|
|||
assert_eq!(vals[0].series.measurement, "foo");
|
||||
assert_eq!(vals[0].timestamp, Some(1234));
|
||||
assert_eq!(vals[0].field_set[0].0, "asdf");
|
||||
assert_eq!(vals[0].field_set[0].1.unwrap_bool(), true);
|
||||
assert!(vals[0].field_set[0].1.unwrap_bool());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1423,7 +1423,7 @@ mod test {
|
|||
assert_eq!(vals[0].field_set[3].1.unwrap_string(), "the string");
|
||||
|
||||
assert_eq!(vals[0].field_set[4].0, "frab");
|
||||
assert_eq!(vals[0].field_set[4].1.unwrap_bool(), false);
|
||||
assert!(!vals[0].field_set[4].1.unwrap_bool());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -230,7 +230,7 @@ mod test {
|
|||
Box::new(ArrowDataType::Utf8)
|
||||
)
|
||||
);
|
||||
assert_eq!(field.is_nullable(), true);
|
||||
assert!(field.is_nullable());
|
||||
assert_eq!(influxdb_column_type, Some(Tag));
|
||||
|
||||
let (influxdb_column_type, field) = s.field(1);
|
||||
|
@ -242,7 +242,7 @@ mod test {
|
|||
Box::new(ArrowDataType::Utf8)
|
||||
)
|
||||
);
|
||||
assert_eq!(field.is_nullable(), false);
|
||||
assert!(!field.is_nullable());
|
||||
assert_eq!(influxdb_column_type, Some(Tag));
|
||||
|
||||
assert_eq!(s.len(), 2);
|
||||
|
@ -260,13 +260,13 @@ mod test {
|
|||
let (influxdb_column_type, field) = s.field(0);
|
||||
assert_eq!(field.name(), "the_influx_field");
|
||||
assert_eq!(field.data_type(), &ArrowDataType::Float64);
|
||||
assert_eq!(field.is_nullable(), true);
|
||||
assert!(field.is_nullable());
|
||||
assert_eq!(influxdb_column_type, Some(Field(Float)));
|
||||
|
||||
let (influxdb_column_type, field) = s.field(1);
|
||||
assert_eq!(field.name(), "the_no_influx_field");
|
||||
assert_eq!(field.data_type(), &ArrowDataType::Decimal(10, 0));
|
||||
assert_eq!(field.is_nullable(), true);
|
||||
assert!(field.is_nullable());
|
||||
assert_eq!(influxdb_column_type, None);
|
||||
|
||||
assert_eq!(s.len(), 2);
|
||||
|
@ -282,7 +282,7 @@ mod test {
|
|||
let (influxdb_column_type, field) = s.field(0);
|
||||
assert_eq!(field.name(), "the_influx_field");
|
||||
assert_eq!(field.data_type(), &ArrowDataType::Float64);
|
||||
assert_eq!(field.is_nullable(), true);
|
||||
assert!(field.is_nullable());
|
||||
assert_eq!(influxdb_column_type, Some(Field(Float)));
|
||||
|
||||
assert_eq!(s.len(), 1);
|
||||
|
@ -300,13 +300,13 @@ mod test {
|
|||
let (influxdb_column_type, field) = s.field(0);
|
||||
assert_eq!(field.name(), "the_influx_field");
|
||||
assert_eq!(field.data_type(), &ArrowDataType::Float64);
|
||||
assert_eq!(field.is_nullable(), false);
|
||||
assert!(!field.is_nullable());
|
||||
assert_eq!(influxdb_column_type, Some(Field(Float)));
|
||||
|
||||
let (influxdb_column_type, field) = s.field(1);
|
||||
assert_eq!(field.name(), "the_no_influx_field");
|
||||
assert_eq!(field.data_type(), &ArrowDataType::Decimal(10, 0));
|
||||
assert_eq!(field.is_nullable(), false);
|
||||
assert!(!field.is_nullable());
|
||||
assert_eq!(influxdb_column_type, None);
|
||||
|
||||
assert_eq!(s.len(), 2);
|
||||
|
|
|
@ -1230,10 +1230,10 @@ mod test {
|
|||
|
||||
// w,w,w,e,e,n,n,n,n
|
||||
// 0 1 2 3 4 5 6 7 8
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[0]), true);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[1, 3]), true);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[8]), true);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[12, 132]), false);
|
||||
assert!(drle.has_non_null_value_in_row_ids(&[0]));
|
||||
assert!(drle.has_non_null_value_in_row_ids(&[1, 3]));
|
||||
assert!(drle.has_non_null_value_in_row_ids(&[8]));
|
||||
assert!(!drle.has_non_null_value_in_row_ids(&[12, 132]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1251,17 +1251,17 @@ mod test {
|
|||
|
||||
// w,w,w,?,e,e,n,n,n,n, ?, ?, ?, ?, w, w, w
|
||||
// 0 1 2 3 4 5 6 7 8 9 10 11, 12, 13, 14, 15, 16
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[0]), true);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[2, 3]), true);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[2, 3]), true);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[3, 4, 10]), true);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[16, 19]), true);
|
||||
assert!(drle.has_non_null_value_in_row_ids(&[0]));
|
||||
assert!(drle.has_non_null_value_in_row_ids(&[2, 3]));
|
||||
assert!(drle.has_non_null_value_in_row_ids(&[2, 3]));
|
||||
assert!(drle.has_non_null_value_in_row_ids(&[3, 4, 10]));
|
||||
assert!(drle.has_non_null_value_in_row_ids(&[16, 19]));
|
||||
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[3]), false);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[3, 10]), false);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[17]), false);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[17, 19]), false);
|
||||
assert_eq!(drle.has_non_null_value_in_row_ids(&[12, 19]), false);
|
||||
assert!(!drle.has_non_null_value_in_row_ids(&[3]));
|
||||
assert!(!drle.has_non_null_value_in_row_ids(&[3, 10]));
|
||||
assert!(!drle.has_non_null_value_in_row_ids(&[17]));
|
||||
assert!(!drle.has_non_null_value_in_row_ids(&[17, 19]));
|
||||
assert!(!drle.has_non_null_value_in_row_ids(&[12, 19]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -521,11 +521,11 @@ mod test {
|
|||
packer.push_option(None);
|
||||
packer.push(33.3);
|
||||
|
||||
assert_eq!(packer.is_null(0), false);
|
||||
assert_eq!(packer.is_null(1), false);
|
||||
assert_eq!(packer.is_null(2), true);
|
||||
assert_eq!(packer.is_null(3), false);
|
||||
assert_eq!(packer.is_null(4), true); // out of bounds
|
||||
assert!(!packer.is_null(0));
|
||||
assert!(!packer.is_null(1));
|
||||
assert!(packer.is_null(2));
|
||||
assert!(!packer.is_null(3));
|
||||
assert!(packer.is_null(4)); // out of bounds
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -90,9 +90,9 @@ mod tests {
|
|||
"| foo_tag_normal | foo_tag_empty | foo_tag_null_some | foo_tag_null_all | foo_field_string_normal | foo_field_string_empty | foo_field_string_null_some | foo_field_string_null_all | foo_field_i64_normal | foo_field_i64_range | foo_field_i64_null_some | foo_field_i64_null_all | foo_field_u64_normal | foo_field_u64_range | foo_field_u64_null_some | foo_field_u64_null_all | foo_field_f64_normal | foo_field_f64_inf | foo_field_f64_zero | foo_field_f64_nan_some | foo_field_f64_nan_all | foo_field_f64_null_some | foo_field_f64_null_all | foo_field_bool_normal | foo_field_bool_null_some | foo_field_bool_null_all | time |",
|
||||
"+----------------+---------------+-------------------+------------------+-------------------------+------------------------+----------------------------+---------------------------+----------------------+----------------------+-------------------------+------------------------+----------------------+----------------------+-------------------------+------------------------+----------------------+-------------------+--------------------+------------------------+-----------------------+-------------------------+------------------------+-----------------------+--------------------------+-------------------------+----------------------------+",
|
||||
"| foo | | | | foo | | | | -1 | -9223372036854775808 | | | 1 | 0 | | | 10.1 | 0 | 0 | NaN | NaN | | | true | | | 1970-01-01 00:00:00.000001 |",
|
||||
"| bar | | bar | | bar | | bar | | 2 | 9223372036854775807 | 2 | | 2 | 18446744073709551615 | 2 | | 20.1 | inf | 0 | 2 | NaN | 20.1 | | false | false | | 1970-01-01 00:00:00.000002 |",
|
||||
"| bar | | bar | | bar | | bar | | 2 | 9223372036854775807 | 2 | | 2 | 18446744073709551615 | 2 | | 20.1 | inf | -0 | 2 | NaN | 20.1 | | false | false | | 1970-01-01 00:00:00.000002 |",
|
||||
"| baz | | baz | | baz | | baz | | 3 | -9223372036854775808 | 3 | | 3 | 0 | 3 | | 30.1 | -inf | 0 | 1 | NaN | 30.1 | | true | true | | 1970-01-01 00:00:00.000003 |",
|
||||
"| foo | | | | foo | | | | 4 | 9223372036854775807 | | | 4 | 18446744073709551615 | | | 40.1 | 1 | 0 | NaN | NaN | | | false | | | 1970-01-01 00:00:00.000004 |",
|
||||
"| foo | | | | foo | | | | 4 | 9223372036854775807 | | | 4 | 18446744073709551615 | | | 40.1 | 1 | -0 | NaN | NaN | | | false | | | 1970-01-01 00:00:00.000004 |",
|
||||
"+----------------+---------------+-------------------+------------------+-------------------------+------------------------+----------------------------+---------------------------+----------------------+----------------------+-------------------------+------------------------+----------------------+----------------------+-------------------------+------------------------+----------------------+-------------------+--------------------+------------------------+-----------------------+-------------------------+------------------------+-----------------------+--------------------------+-------------------------+----------------------------+",
|
||||
];
|
||||
assert_eq!(num_rows, actual_num_rows);
|
||||
|
|
|
@ -85,7 +85,6 @@ mod test {
|
|||
logical_plan::{col, Expr},
|
||||
prelude::ExecutionContext,
|
||||
};
|
||||
use std::iter::FromIterator;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -197,9 +196,12 @@ mod test {
|
|||
Arc::clone(&schema),
|
||||
vec![
|
||||
Arc::new(StringArray::from(words.clone())),
|
||||
Arc::new(UInt64Array::from_iter(
|
||||
words.iter().map(|word| word.map(|word| word.len() as u64)),
|
||||
)),
|
||||
Arc::new(
|
||||
words
|
||||
.iter()
|
||||
.map(|word| word.map(|word| word.len() as u64))
|
||||
.collect::<UInt64Array>(),
|
||||
),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
|
|
@ -3,7 +3,7 @@ mod internal;
|
|||
pub use internal::{Duration, Window};
|
||||
use internal_types::schema::TIME_DATA_TYPE;
|
||||
|
||||
use std::{iter::FromIterator, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::{ArrayRef, TimestampNanosecondArray};
|
||||
use datafusion::{logical_plan::Expr, physical_plan::functions::make_scalar_function, prelude::*};
|
||||
|
@ -57,7 +57,7 @@ fn window_bounds(args: &[ArrayRef], every: &WindowDuration, offset: &WindowDurat
|
|||
})
|
||||
});
|
||||
|
||||
let array = TimestampNanosecondArray::from_iter(values);
|
||||
let array = values.collect::<TimestampNanosecondArray>();
|
||||
Arc::new(array) as ArrayRef
|
||||
}
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ pub enum WindowDuration {
|
|||
|
||||
impl Aggregate {
|
||||
/// Create the appropriate DataFusion expression for this aggregate
|
||||
pub fn to_datafusion_expr(&self, input: Expr) -> Result<Expr> {
|
||||
pub fn to_datafusion_expr(self, input: Expr) -> Result<Expr> {
|
||||
use datafusion::logical_plan::{avg, count, max, min, sum};
|
||||
match self {
|
||||
Self::Sum => Ok(sum(input)),
|
||||
|
|
|
@ -123,11 +123,10 @@ impl SchemaAdapterStream {
|
|||
// not present in the desired output schema (otherwise we
|
||||
// are dropping fields -- theys should have been selected
|
||||
// out with projection push down)
|
||||
if output_schema
|
||||
if !output_schema
|
||||
.fields()
|
||||
.iter()
|
||||
.find(|output_field| input_field.name() == output_field.name())
|
||||
.is_none()
|
||||
.any(|output_field| input_field.name() == output_field.name())
|
||||
{
|
||||
return InternalLostInputField {
|
||||
field_name: input_field.name(),
|
||||
|
|
|
@ -2150,10 +2150,7 @@ mod test {
|
|||
// Future improvement would be to support this type of check.
|
||||
let input = &[100_i64, -20];
|
||||
let col = Column::from(&input[..]);
|
||||
assert_eq!(
|
||||
col.predicate_matches_all_values(&cmp::Operator::LT, &Value::from(u64::MAX)),
|
||||
false
|
||||
);
|
||||
assert!(!col.predicate_matches_all_values(&cmp::Operator::LT, &Value::from(u64::MAX)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -8,7 +8,6 @@ use arrow::datatypes::Int8Type;
|
|||
use arrow::datatypes::UInt16Type;
|
||||
use arrow::datatypes::UInt32Type;
|
||||
use arrow::datatypes::UInt8Type;
|
||||
use std::iter::FromIterator;
|
||||
use std::mem::size_of;
|
||||
|
||||
use super::encoding::scalar::{rle, transcoders::*, ScalarEncoding};
|
||||
|
@ -429,7 +428,7 @@ impl From<Float64Array> for FloatEncoding {
|
|||
// Column is all NULL - encode as RLE u8
|
||||
if min.is_none() {
|
||||
let arr: PrimitiveArray<UInt8Type> =
|
||||
PrimitiveArray::from_iter(arr.iter().map::<Option<u8>, _>(|_| None));
|
||||
arr.iter().map::<Option<u8>, _>(|_| None).collect();
|
||||
let enc: Box<dyn ScalarEncoding<f64>> =
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), FloatByteTrimmer {}));
|
||||
let name = enc.name();
|
||||
|
@ -488,8 +487,10 @@ fn from_array_with_byte_trimming(arr: Float64Array, range: (f64, f64)) -> FloatE
|
|||
let (enc, name) = match (min, max) {
|
||||
// encode as u8 values
|
||||
(min, max) if min >= 0.0 && max <= u8::MAX as f64 => {
|
||||
let arr: PrimitiveArray<UInt8Type> =
|
||||
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
|
||||
let arr: PrimitiveArray<UInt8Type> = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect();
|
||||
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
} else {
|
||||
|
@ -500,8 +501,10 @@ fn from_array_with_byte_trimming(arr: Float64Array, range: (f64, f64)) -> FloatE
|
|||
}
|
||||
// encode as i8 values
|
||||
(min, max) if min >= i8::MIN as f64 && max <= i8::MAX as f64 => {
|
||||
let arr: PrimitiveArray<Int8Type> =
|
||||
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
|
||||
let arr: PrimitiveArray<Int8Type> = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect();
|
||||
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
} else {
|
||||
|
@ -512,8 +515,10 @@ fn from_array_with_byte_trimming(arr: Float64Array, range: (f64, f64)) -> FloatE
|
|||
}
|
||||
// encode as u16 values
|
||||
(min, max) if min >= 0.0 && max <= u16::MAX as f64 => {
|
||||
let arr: PrimitiveArray<UInt16Type> =
|
||||
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
|
||||
let arr: PrimitiveArray<UInt16Type> = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect();
|
||||
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
} else {
|
||||
|
@ -524,8 +529,10 @@ fn from_array_with_byte_trimming(arr: Float64Array, range: (f64, f64)) -> FloatE
|
|||
}
|
||||
// encode as i16 values
|
||||
(min, max) if min >= i16::MIN as f64 && max <= i16::MAX as f64 => {
|
||||
let arr: PrimitiveArray<Int16Type> =
|
||||
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
|
||||
let arr: PrimitiveArray<Int16Type> = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect();
|
||||
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
} else {
|
||||
|
@ -536,8 +543,10 @@ fn from_array_with_byte_trimming(arr: Float64Array, range: (f64, f64)) -> FloatE
|
|||
}
|
||||
// encode as u32 values
|
||||
(min, max) if min >= 0.0 && max <= u32::MAX as f64 => {
|
||||
let arr: PrimitiveArray<UInt32Type> =
|
||||
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
|
||||
let arr: PrimitiveArray<UInt32Type> = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect();
|
||||
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
} else {
|
||||
|
@ -548,8 +557,10 @@ fn from_array_with_byte_trimming(arr: Float64Array, range: (f64, f64)) -> FloatE
|
|||
}
|
||||
// encode as i32 values
|
||||
(min, max) if min >= i32::MIN as f64 && max <= i32::MAX as f64 => {
|
||||
let arr: PrimitiveArray<Int32Type> =
|
||||
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
|
||||
let arr: PrimitiveArray<Int32Type> = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect();
|
||||
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
} else {
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use std::fmt::Display;
|
||||
use std::iter::FromIterator;
|
||||
use std::mem::size_of;
|
||||
|
||||
use arrow::array::{Array, PrimitiveArray};
|
||||
|
@ -418,7 +417,10 @@ impl From<arrow::array::Int64Array> for IntegerEncoding {
|
|||
let (enc, name) = match (min, max) {
|
||||
// data is all NULL. Store u8 RLE
|
||||
(None, None) => {
|
||||
let arr = PrimitiveArray::from_iter(arr.iter().map::<Option<u8>, _>(|_| None));
|
||||
let arr = arr
|
||||
.iter()
|
||||
.map::<Option<u8>, _>(|_| None)
|
||||
.collect::<PrimitiveArray<_>>();
|
||||
let enc: Box<dyn ScalarEncoding<i64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
} else {
|
||||
|
@ -429,9 +431,10 @@ impl From<arrow::array::Int64Array> for IntegerEncoding {
|
|||
}
|
||||
// encode as u8 values
|
||||
(min, max) if min >= Some(0) && max <= Some(u8::MAX as i64) => {
|
||||
let arr = PrimitiveArray::from_iter(
|
||||
arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))), // encode i64 as u8
|
||||
);
|
||||
let arr = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect::<PrimitiveArray<_>>(); // encode i64 as u8
|
||||
|
||||
let enc: Box<dyn ScalarEncoding<i64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
|
@ -443,9 +446,10 @@ impl From<arrow::array::Int64Array> for IntegerEncoding {
|
|||
}
|
||||
// encode as i8 values
|
||||
(min, max) if min >= Some(i8::MIN as i64) && max <= Some(i8::MAX as i64) => {
|
||||
let arr = PrimitiveArray::from_iter(
|
||||
arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))), // encode i64 as i8
|
||||
);
|
||||
let arr = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect::<PrimitiveArray<_>>(); // encode i64 as i8
|
||||
|
||||
let enc: Box<dyn ScalarEncoding<i64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
|
@ -457,9 +461,10 @@ impl From<arrow::array::Int64Array> for IntegerEncoding {
|
|||
}
|
||||
// encode as u16 values
|
||||
(min, max) if min >= Some(0) && max <= Some(u16::MAX as i64) => {
|
||||
let arr = PrimitiveArray::from_iter(
|
||||
arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))), // encode i64 as u16
|
||||
);
|
||||
let arr = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect::<PrimitiveArray<_>>(); // encode i64 as u16
|
||||
|
||||
let enc: Box<dyn ScalarEncoding<i64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
|
@ -471,10 +476,10 @@ impl From<arrow::array::Int64Array> for IntegerEncoding {
|
|||
}
|
||||
// encode as i16 values
|
||||
(min, max) if min >= Some(i16::MIN as i64) && max <= Some(i16::MAX as i64) => {
|
||||
let arr = PrimitiveArray::from_iter(
|
||||
arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))), // encode i64 as i16
|
||||
);
|
||||
|
||||
let arr = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect::<PrimitiveArray<_>>(); // encode i64 as i16
|
||||
let enc: Box<dyn ScalarEncoding<i64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
} else {
|
||||
|
@ -485,9 +490,10 @@ impl From<arrow::array::Int64Array> for IntegerEncoding {
|
|||
}
|
||||
// encode as u32 values
|
||||
(min, max) if min >= Some(0) && max <= Some(u32::MAX as i64) => {
|
||||
let arr = PrimitiveArray::from_iter(
|
||||
arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))), // encode i64 as u32
|
||||
);
|
||||
let arr = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect::<PrimitiveArray<_>>(); // encode i64 as u32
|
||||
|
||||
let enc: Box<dyn ScalarEncoding<i64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
|
@ -499,9 +505,10 @@ impl From<arrow::array::Int64Array> for IntegerEncoding {
|
|||
}
|
||||
// encode as i32 values
|
||||
(min, max) if min >= Some(i32::MIN as i64) && max <= Some(i32::MAX as i64) => {
|
||||
let arr = PrimitiveArray::from_iter(
|
||||
arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))), // encode i64 as i32
|
||||
);
|
||||
let arr = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect::<PrimitiveArray<_>>(); // encode i64 as i32
|
||||
|
||||
let enc: Box<dyn ScalarEncoding<i64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
|
@ -627,7 +634,10 @@ impl From<arrow::array::UInt64Array> for IntegerEncoding {
|
|||
// data is all NULL. Store as single byte column for now.
|
||||
// TODO(edd): this will be smaller when stored using RLE
|
||||
None => {
|
||||
let arr = PrimitiveArray::from_iter(arr.iter().map::<Option<u8>, _>(|_| None));
|
||||
let arr = arr
|
||||
.iter()
|
||||
.map::<Option<u8>, _>(|_| None)
|
||||
.collect::<PrimitiveArray<_>>();
|
||||
let enc: Box<dyn ScalarEncoding<u64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
} else {
|
||||
|
@ -638,9 +648,10 @@ impl From<arrow::array::UInt64Array> for IntegerEncoding {
|
|||
}
|
||||
// encode as u8 values
|
||||
max if max <= Some(u8::MAX as u64) => {
|
||||
let arr = PrimitiveArray::from_iter(
|
||||
arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))), // encode u64 as u8
|
||||
);
|
||||
let arr = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect::<PrimitiveArray<_>>(); // encode u64 as u8
|
||||
|
||||
let enc: Box<dyn ScalarEncoding<u64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
|
@ -652,9 +663,10 @@ impl From<arrow::array::UInt64Array> for IntegerEncoding {
|
|||
}
|
||||
// encode as u16 values
|
||||
max if max <= Some(u16::MAX as u64) => {
|
||||
let arr = PrimitiveArray::from_iter(
|
||||
arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))), // encode u64 as u16
|
||||
);
|
||||
let arr = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect::<PrimitiveArray<_>>(); // encode u64 as u16
|
||||
|
||||
let enc: Box<dyn ScalarEncoding<u64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
|
@ -666,9 +678,10 @@ impl From<arrow::array::UInt64Array> for IntegerEncoding {
|
|||
}
|
||||
// encode as u32 values
|
||||
max if max <= Some(u32::MAX as u64) => {
|
||||
let arr = PrimitiveArray::from_iter(
|
||||
arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))), // encode u64 as u32
|
||||
);
|
||||
let arr = arr
|
||||
.into_iter()
|
||||
.map(|v| v.map(|v| transcoder.encode(v)))
|
||||
.collect::<PrimitiveArray<_>>(); // encode u64 as u32
|
||||
|
||||
let enc: Box<dyn ScalarEncoding<u64>> = if rle {
|
||||
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
|
||||
|
|
|
@ -2912,46 +2912,46 @@ west,POST,304,101,203
|
|||
let row_group = RowGroup::new(6, columns);
|
||||
|
||||
let mut predicate = Predicate::default();
|
||||
assert_eq!(row_group.satisfies_predicate(&predicate), true);
|
||||
assert!(row_group.satisfies_predicate(&predicate));
|
||||
|
||||
predicate = Predicate::new(vec![BinaryExpr::from(("region", "=", "east"))]);
|
||||
assert_eq!(row_group.satisfies_predicate(&predicate), true);
|
||||
assert!(row_group.satisfies_predicate(&predicate));
|
||||
|
||||
// all expressions satisfied in data
|
||||
predicate = Predicate::new(vec![
|
||||
BinaryExpr::from(("region", "=", "east")),
|
||||
BinaryExpr::from(("method", "!=", "POST")),
|
||||
]);
|
||||
assert_eq!(row_group.satisfies_predicate(&predicate), true);
|
||||
assert!(row_group.satisfies_predicate(&predicate));
|
||||
|
||||
// all expressions satisfied in data by all rows
|
||||
predicate = Predicate::new(vec![BinaryExpr::from(("method", "=", "GET"))]);
|
||||
assert_eq!(row_group.satisfies_predicate(&predicate), true);
|
||||
assert!(row_group.satisfies_predicate(&predicate));
|
||||
|
||||
// one expression satisfied in data but other ruled out via column pruning.
|
||||
predicate = Predicate::new(vec![
|
||||
BinaryExpr::from(("region", "=", "east")),
|
||||
BinaryExpr::from(("method", ">", "GET")),
|
||||
]);
|
||||
assert_eq!(row_group.satisfies_predicate(&predicate), false);
|
||||
assert!(!row_group.satisfies_predicate(&predicate));
|
||||
|
||||
// all expressions rules out via column pruning.
|
||||
predicate = Predicate::new(vec![
|
||||
BinaryExpr::from(("region", ">", "west")),
|
||||
BinaryExpr::from(("method", ">", "GET")),
|
||||
]);
|
||||
assert_eq!(row_group.satisfies_predicate(&predicate), false);
|
||||
assert!(!row_group.satisfies_predicate(&predicate));
|
||||
|
||||
// column does not exist
|
||||
predicate = Predicate::new(vec![BinaryExpr::from(("track", "=", "Jeanette"))]);
|
||||
assert_eq!(row_group.satisfies_predicate(&predicate), false);
|
||||
assert!(!row_group.satisfies_predicate(&predicate));
|
||||
|
||||
// one column satisfies expression but other column does not exist
|
||||
predicate = Predicate::new(vec![
|
||||
BinaryExpr::from(("region", "=", "south")),
|
||||
BinaryExpr::from(("track", "=", "Jeanette")),
|
||||
]);
|
||||
assert_eq!(row_group.satisfies_predicate(&predicate), false);
|
||||
assert!(!row_group.satisfies_predicate(&predicate));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -1573,9 +1573,12 @@ impl From<Values<'_>> for arrow::array::ArrayRef {
|
|||
// drop NULL value entry as this is not stored in Arrow's
|
||||
// dictionary values array.
|
||||
assert!(values[0].is_none());
|
||||
arrow::array::StringArray::from_iter(values.into_iter().skip(1))
|
||||
values
|
||||
.into_iter()
|
||||
.skip(1)
|
||||
.collect::<arrow::array::StringArray>()
|
||||
} else {
|
||||
arrow::array::StringArray::from(values)
|
||||
values.into_iter().collect::<arrow::array::StringArray>()
|
||||
};
|
||||
|
||||
let mut builder = ArrayDataBuilder::new(DataType::Dictionary(
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
[toolchain]
|
||||
channel = "1.52.1"
|
||||
channel = "1.53.0"
|
||||
components = [ "rustfmt", "clippy" ]
|
||||
|
|
|
@ -3,7 +3,6 @@
|
|||
//! For example `SELECT * FROM system.chunks`
|
||||
|
||||
use std::convert::AsRef;
|
||||
use std::iter::FromIterator;
|
||||
use std::sync::Arc;
|
||||
use std::{any::Any, collections::HashMap};
|
||||
|
||||
|
@ -201,22 +200,42 @@ fn chunk_summaries_schema() -> SchemaRef {
|
|||
}
|
||||
|
||||
fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
|
||||
let id = UInt32Array::from_iter(chunks.iter().map(|c| Some(c.id)));
|
||||
let partition_key =
|
||||
StringArray::from_iter(chunks.iter().map(|c| Some(c.partition_key.as_ref())));
|
||||
let table_name = StringArray::from_iter(chunks.iter().map(|c| Some(c.table_name.as_ref())));
|
||||
let storage = StringArray::from_iter(chunks.iter().map(|c| Some(c.storage.as_str())));
|
||||
let estimated_bytes =
|
||||
UInt64Array::from_iter(chunks.iter().map(|c| Some(c.estimated_bytes as u64)));
|
||||
let row_counts = UInt64Array::from_iter(chunks.iter().map(|c| Some(c.row_count as u64)));
|
||||
let time_of_first_write = TimestampNanosecondArray::from_iter(
|
||||
chunks.iter().map(|c| c.time_of_first_write).map(time_to_ts),
|
||||
);
|
||||
let time_of_last_write = TimestampNanosecondArray::from_iter(
|
||||
chunks.iter().map(|c| c.time_of_last_write).map(time_to_ts),
|
||||
);
|
||||
let time_closed =
|
||||
TimestampNanosecondArray::from_iter(chunks.iter().map(|c| c.time_closed).map(time_to_ts));
|
||||
let id = chunks.iter().map(|c| Some(c.id)).collect::<UInt32Array>();
|
||||
let partition_key = chunks
|
||||
.iter()
|
||||
.map(|c| Some(c.partition_key.as_ref()))
|
||||
.collect::<StringArray>();
|
||||
let table_name = chunks
|
||||
.iter()
|
||||
.map(|c| Some(c.table_name.as_ref()))
|
||||
.collect::<StringArray>();
|
||||
let storage = chunks
|
||||
.iter()
|
||||
.map(|c| Some(c.storage.as_str()))
|
||||
.collect::<StringArray>();
|
||||
let estimated_bytes = chunks
|
||||
.iter()
|
||||
.map(|c| Some(c.estimated_bytes as u64))
|
||||
.collect::<UInt64Array>();
|
||||
let row_counts = chunks
|
||||
.iter()
|
||||
.map(|c| Some(c.row_count as u64))
|
||||
.collect::<UInt64Array>();
|
||||
let time_of_first_write = chunks
|
||||
.iter()
|
||||
.map(|c| c.time_of_first_write)
|
||||
.map(time_to_ts)
|
||||
.collect::<TimestampNanosecondArray>();
|
||||
let time_of_last_write = chunks
|
||||
.iter()
|
||||
.map(|c| c.time_of_last_write)
|
||||
.map(time_to_ts)
|
||||
.collect::<TimestampNanosecondArray>();
|
||||
let time_closed = chunks
|
||||
.iter()
|
||||
.map(|c| c.time_closed)
|
||||
.map(time_to_ts)
|
||||
.collect::<TimestampNanosecondArray>();
|
||||
|
||||
RecordBatch::try_new(
|
||||
schema,
|
||||
|
@ -484,21 +503,34 @@ fn from_task_trackers(
|
|||
.filter(|job| job.metadata().db_name() == Some(db_name))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let ids = StringArray::from_iter(jobs.iter().map(|job| Some(job.id().to_string())));
|
||||
let statuses = StringArray::from_iter(jobs.iter().map(|job| Some(job.get_status().name())));
|
||||
let cpu_time_used = Time64NanosecondArray::from_iter(
|
||||
jobs.iter()
|
||||
.map(|job| job.get_status().cpu_nanos().map(|n| n as i64)),
|
||||
);
|
||||
let wall_time_used = Time64NanosecondArray::from_iter(
|
||||
jobs.iter()
|
||||
.map(|job| job.get_status().wall_nanos().map(|n| n as i64)),
|
||||
);
|
||||
let partition_keys =
|
||||
StringArray::from_iter(jobs.iter().map(|job| job.metadata().partition_key()));
|
||||
let chunk_ids = UInt32Array::from_iter(jobs.iter().map(|job| job.metadata().chunk_id()));
|
||||
let descriptions =
|
||||
StringArray::from_iter(jobs.iter().map(|job| Some(job.metadata().description())));
|
||||
let ids = jobs
|
||||
.iter()
|
||||
.map(|job| Some(job.id().to_string()))
|
||||
.collect::<StringArray>();
|
||||
let statuses = jobs
|
||||
.iter()
|
||||
.map(|job| Some(job.get_status().name()))
|
||||
.collect::<StringArray>();
|
||||
let cpu_time_used = jobs
|
||||
.iter()
|
||||
.map(|job| job.get_status().cpu_nanos().map(|n| n as i64))
|
||||
.collect::<Time64NanosecondArray>();
|
||||
let wall_time_used = jobs
|
||||
.iter()
|
||||
.map(|job| job.get_status().wall_nanos().map(|n| n as i64))
|
||||
.collect::<Time64NanosecondArray>();
|
||||
let partition_keys = jobs
|
||||
.iter()
|
||||
.map(|job| job.metadata().partition_key())
|
||||
.collect::<StringArray>();
|
||||
let chunk_ids = jobs
|
||||
.iter()
|
||||
.map(|job| job.metadata().chunk_id())
|
||||
.collect::<UInt32Array>();
|
||||
let descriptions = jobs
|
||||
.iter()
|
||||
.map(|job| Some(job.metadata().description()))
|
||||
.collect::<StringArray>();
|
||||
|
||||
RecordBatch::try_new(
|
||||
schema,
|
||||
|
|
|
@ -746,7 +746,7 @@ where
|
|||
}
|
||||
};
|
||||
}
|
||||
return NoRemoteReachable { errors }.fail();
|
||||
NoRemoteReachable { errors }.fail()
|
||||
}
|
||||
|
||||
pub async fn write_entry(&self, db_name: &str, entry_bytes: Vec<u8>) -> Result<()> {
|
||||
|
@ -1562,8 +1562,8 @@ mod tests {
|
|||
ConnectionManagerError::RemoteServerConnectError {..}
|
||||
)
|
||||
));
|
||||
assert_eq!(written_1.load(Ordering::Relaxed), false);
|
||||
assert_eq!(written_2.load(Ordering::Relaxed), false);
|
||||
assert!(!written_1.load(Ordering::Relaxed));
|
||||
assert!(!written_2.load(Ordering::Relaxed));
|
||||
|
||||
// We configure the address for the other remote, this time connection will succeed
|
||||
// despite the bad remote failing to connect.
|
||||
|
@ -1578,8 +1578,8 @@ mod tests {
|
|||
.await
|
||||
.expect("cannot write lines");
|
||||
}
|
||||
assert_eq!(written_1.load(Ordering::Relaxed), true);
|
||||
assert_eq!(written_2.load(Ordering::Relaxed), true);
|
||||
assert!(written_1.load(Ordering::Relaxed));
|
||||
assert!(written_2.load(Ordering::Relaxed));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -173,7 +173,7 @@ async fn test_read_error(db_name: &str, addr: &str) {
|
|||
.assert()
|
||||
.failure()
|
||||
.stderr(predicate::str::contains(
|
||||
"Table or CTE with name \\'unknown_table\\' not found",
|
||||
"Table or CTE with name 'unknown_table' not found",
|
||||
));
|
||||
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
|
|
|
@ -313,7 +313,7 @@ async fn test_write_routed() {
|
|||
.await
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Table or CTE with name \\\'disk\\\' not found\""));
|
||||
.contains("Table or CTE with name 'disk' not found\""));
|
||||
|
||||
let mut query_results = target_2
|
||||
.flight_client()
|
||||
|
@ -341,7 +341,7 @@ async fn test_write_routed() {
|
|||
.await
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Table or CTE with name \\\'cpu\\\' not found\""));
|
||||
.contains("Table or CTE with name 'cpu' not found\""));
|
||||
|
||||
////
|
||||
|
||||
|
@ -571,7 +571,7 @@ async fn test_write_routed_no_shard() {
|
|||
.await
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Table or CTE with name \\\'disk\\\' not found\""));
|
||||
.contains("Table or CTE with name 'disk' not found\""));
|
||||
|
||||
let mut query_results = target_2
|
||||
.flight_client()
|
||||
|
@ -599,7 +599,7 @@ async fn test_write_routed_no_shard() {
|
|||
.await
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Table or CTE with name \\\'cpu\\\' not found\""));
|
||||
.contains("Table or CTE with name 'cpu' not found\""));
|
||||
|
||||
// Ensure that target_3 didn't get any writes.
|
||||
|
||||
|
@ -609,7 +609,7 @@ async fn test_write_routed_no_shard() {
|
|||
.await
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Table or CTE with name \\\'cpu\\\' not found\""));
|
||||
.contains("Table or CTE with name 'cpu' not found\""));
|
||||
|
||||
assert!(target_3
|
||||
.flight_client()
|
||||
|
@ -617,5 +617,5 @@ async fn test_write_routed_no_shard() {
|
|||
.await
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Table or CTE with name \\\'disk\\\' not found\""));
|
||||
.contains("Table or CTE with name 'disk' not found\""));
|
||||
}
|
||||
|
|
|
@ -730,7 +730,7 @@ ERROR foo
|
|||
.trim_start(),
|
||||
);
|
||||
|
||||
assert_eq!(called.load(Ordering::SeqCst), false);
|
||||
assert!(!called.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in New Issue