Merge branch 'main' into alamb/underscore_in_bucket_names_2

pull/24376/head
Andrew Lamb 2021-01-21 15:53:27 -05:00 committed by GitHub
commit c50f9b1baf
12 changed files with 377 additions and 352 deletions


@@ -38,8 +38,11 @@ jq --null-input --sort-keys \
   --arg imgPrefix "${DOCKER_IMAGE}" \
   --arg appKey "$APP_NAME" \
   '$tagDigest | split(" ") as $td | {
+    Images: {
       ($appKey): {
         Tag: ($imgPrefix + ":" + $td[0]),
         Digest: ($imgPrefix + "@" + $td[1]),
-      }
+      },
+    },
+    PublishedAt: (now | todateiso8601)
   }'

Cargo.lock (generated, 558 changed lines) — diff suppressed because it is too large.


@@ -11,10 +11,10 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx
 [dependencies]
 # We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev
-# The version can be found here: https://github.com/apache/arrow/commit/c46fd102678fd22b9781642437ad8821f907d9db
+# The version can be found here: https://github.com/apache/arrow/commit/84126d5d70e9f05f82616e9a8506b53fe1df4a22
 #
-arrow = { git = "https://github.com/apache/arrow.git", rev = "c46fd102678fd22b9781642437ad8821f907d9db" , features = ["simd"] }
+arrow = { git = "https://github.com/apache/arrow.git", rev = "84126d5d70e9f05f82616e9a8506b53fe1df4a22" , features = ["simd"] }
-datafusion = { git = "https://github.com/apache/arrow.git", rev = "c46fd102678fd22b9781642437ad8821f907d9db" }
+datafusion = { git = "https://github.com/apache/arrow.git", rev = "84126d5d70e9f05f82616e9a8506b53fe1df4a22" }
 # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
 # and we're not currently using it anyway
-parquet = { git = "https://github.com/apache/arrow.git", rev = "c46fd102678fd22b9781642437ad8821f907d9db", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
+parquet = { git = "https://github.com/apache/arrow.git", rev = "84126d5d70e9f05f82616e9a8506b53fe1df4a22", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }


@@ -287,9 +287,7 @@ mod tests {
     use super::*;
     use influxdb_line_protocol::parse_lines;

-    #[allow(dead_code)]
     type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
-    #[allow(dead_code)]
     type Result<T = (), E = TestError> = std::result::Result<T, E>;

     #[test]


@@ -46,7 +46,6 @@ impl TryFrom<u8> for BlockType {
 /// `Block` holds information about location and time range of a block of data.
 #[derive(Debug, Copy, Clone, PartialEq)]
-#[allow(dead_code)]
 pub struct Block {
     pub min_time: i64,
     pub max_time: i64,
@@ -77,8 +76,8 @@ const MAX_BLOCK_VALUES: usize = 1000;
 /// organization and bucket identifiers.
 pub struct InfluxID(u64);

-#[allow(dead_code)]
 impl InfluxID {
+    #[allow(dead_code)]
     fn new_str(s: &str) -> Result<Self, TSMError> {
         let v = u64::from_str_radix(s, 16).map_err(|e| TSMError {
             description: e.to_string(),


@@ -1,12 +1,16 @@
 //! Represents a Chunk of data (a collection of tables and their data within
 //! some chunk) in the mutable store.
+use arrow_deps::datafusion::error::Result as ArrowResult;
 use arrow_deps::{
     arrow::record_batch::RecordBatch,
     datafusion::{
-        logical_plan::Expr, logical_plan::Operator, optimizer::utils::expr_to_column_names,
+        error::DataFusionError,
+        logical_plan::{Expr, ExpressionVisitor, Operator, Recursion},
+        optimizer::utils::expr_to_column_names,
         prelude::*,
     },
 };
 use chrono::{DateTime, Utc};
 use generated_types::wal as wb;
 use std::collections::{BTreeSet, HashMap, HashSet};
@@ -14,7 +18,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
 use data_types::{partition_metadata::Table as TableStats, TIME_COLUMN_NAME};
 use query::{
     predicate::{Predicate, TimestampRange},
-    util::{visit_expression, AndExprBuilder, ExpressionVisitor},
+    util::AndExprBuilder,
 };

 use crate::dictionary::{Dictionary, Error as DictionaryError};
@@ -36,6 +40,9 @@ pub enum Error {
         source: crate::table::Error,
     },

+    #[snafu(display("Unsupported predicate. Mutable buffer does not support: {}", source))]
+    UnsupportedPredicate { source: DataFusionError },
+
     #[snafu(display("Table ID {} not found in dictionary of chunk {}", table, chunk))]
     TableIdNotFoundInDictionary {
         table: u32,
@@ -251,7 +258,7 @@ impl Chunk {
         let mut visitor = SupportVisitor {};
         let mut predicate_columns: HashSet<String> = HashSet::new();
         for expr in &chunk_exprs {
-            visit_expression(expr, &mut visitor);
+            visitor = expr.accept(visitor).context(UnsupportedPredicate)?;
             expr_to_column_names(&expr, &mut predicate_columns).unwrap();
         }
@@ -439,10 +446,10 @@ impl query::PartitionChunk for Chunk {
 struct SupportVisitor {}

 impl ExpressionVisitor for SupportVisitor {
-    fn pre_visit(&mut self, expr: &Expr) {
+    fn pre_visit(self, expr: &Expr) -> ArrowResult<Recursion<Self>> {
         match expr {
-            Expr::Literal(..) => {}
-            Expr::Column(..) => {}
+            Expr::Literal(..) => Ok(Recursion::Continue(self)),
+            Expr::Column(..) => Ok(Recursion::Continue(self)),
             Expr::BinaryExpr { op, .. } => {
                 match op {
                     Operator::Eq
@@ -455,17 +462,20 @@ impl ExpressionVisitor for SupportVisitor {
                     | Operator::Multiply
                     | Operator::Divide
                     | Operator::And
-                    | Operator::Or => {}
+                    | Operator::Or => Ok(Recursion::Continue(self)),
                     // Unsupported (need to think about ramifications)
                     Operator::NotEq | Operator::Modulus | Operator::Like | Operator::NotLike => {
-                        panic!("Unsupported binary operator in expression: {:?}", expr)
+                        Err(DataFusionError::NotImplemented(format!(
+                            "Operator {:?} not yet supported in IOx MutableBuffer",
+                            op
+                        )))
                     }
                 }
             }
-            _ => panic!(
-                "Unsupported expression in mutable_buffer database: {:?}",
-                expr
-            ),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unsupported expression in mutable_buffer database: {:?}",
+                expr
+            ))),
         }
     }
 }
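
For orientation: `SupportVisitor` above now implements DataFusion's own `ExpressionVisitor` (from the arrow rev pinned in this PR) instead of the local trait removed from `query::util` below. A minimal sketch of that by-value visitor pattern follows; `ColumnCounter` and `count_columns` are illustrative names, not part of this change, and only the trait shape shown in the diff (`pre_visit(self, ..) -> Result<Recursion<Self>>`, `Expr::accept`) is assumed.

use arrow_deps::datafusion::{
    error::Result as DataFusionResult,
    logical_plan::{Expr, ExpressionVisitor, Recursion},
};

// Illustrative visitor: counts column references in an expression tree.
struct ColumnCounter {
    columns: usize,
}

impl ExpressionVisitor for ColumnCounter {
    // The visitor is taken by value and handed back inside `Recursion::Continue`;
    // returning an Err aborts the walk (as SupportVisitor does for NotEq).
    fn pre_visit(mut self, expr: &Expr) -> DataFusionResult<Recursion<Self>> {
        if let Expr::Column(..) = expr {
            self.columns += 1;
        }
        Ok(Recursion::Continue(self))
    }
}

// `Expr::accept` walks the whole expression tree, threading the visitor through.
fn count_columns(expr: &Expr) -> DataFusionResult<usize> {
    let visitor = expr.accept(ColumnCounter { columns: 0 })?;
    Ok(visitor.columns)
}

This is why `pre_visit` now takes `self` and returns a `Recursion` rather than mutating through `&mut self`.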


@@ -999,7 +999,7 @@ mod tests {
     datafusion::{physical_plan::collect, prelude::*},
 };
 use influxdb_line_protocol::{parse_lines, ParsedLine};
-use test_helpers::str_pair_vec_to_vec;
+use test_helpers::{assert_contains, str_pair_vec_to_vec};
 use tokio::sync::mpsc;

 type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -1659,9 +1659,6 @@ mod tests {
     }

     #[tokio::test]
-    #[should_panic(
-        expected = "Unsupported binary operator in expression: #state NotEq Utf8(\"MA\")"
-    )]
     async fn test_query_series_pred_neq() {
         let db = MutableBufferDb::new("column_namedb");
@@ -1679,8 +1676,12 @@ mod tests {
             .add_expr(col("state").not_eq(lit("MA")))
             .build();

-        // Should panic as the neq path isn't implemented yet
-        db.query_series(predicate).await.unwrap();
+        // Should err as the neq path isn't implemented yet
+        let err = db.query_series(predicate).await.unwrap_err();
+        assert_contains!(
+            err.to_string(),
+            "Operator NotEq not yet supported in IOx MutableBuffer"
+        );
     }

     #[tokio::test]


@@ -33,7 +33,7 @@ use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use futures::{Stream, StreamExt, TryStreamExt};
 use snafu::Snafu;
-use std::{io, path::PathBuf, unimplemented};
+use std::{io, path::PathBuf};

 /// Universal interface to multiple object store services.
 #[derive(Debug)]


@@ -64,7 +64,6 @@ pub enum Error {
     JoinError { source: tokio::task::JoinError },
 }

-#[allow(dead_code)]
 pub type Result<T, E = Error> = std::result::Result<T, E>;

 #[derive(Debug)]


@@ -1,91 +1,6 @@
 //! This module contains DataFusion utility functions and helpers

 use arrow_deps::datafusion::logical_plan::{binary_expr, Expr, Operator};

-/// Encode the traversal of an expression tree. When passed to
-/// `visit_expression`, `ExpressionVisitor::visit` is invoked
-/// recursively on all nodes of an expression tree
-///
-/// TODO contribute this back upstream to datafusion??
-pub trait ExpressionVisitor {
-    /// Invoked before children of expr are visisted
-    fn pre_visit(&mut self, expr: &Expr);
-
-    /// Invoked after children of expr are visited. Default
-    /// implementation does nothing.
-    fn post_visit(&mut self, _expr: &Expr) {}
-}
-
-pub fn visit_expression<V: ExpressionVisitor>(expr: &Expr, visitor: &mut V) {
-    visitor.pre_visit(expr);
-
-    // recurse
-    match expr {
-        // expression types without inputs
-        Expr::Alias(..)
-        | Expr::Column(..)
-        | Expr::ScalarVariable(..)
-        | Expr::Literal(..)
-        | Expr::Wildcard => {
-            // No inputs, so no more recursion needed
-        }
-        Expr::BinaryExpr { left, right, .. } => {
-            visit_expression(left, visitor);
-            visit_expression(right, visitor);
-        }
-        Expr::Cast { expr, .. } => visit_expression(expr, visitor),
-        Expr::Case {
-            expr,
-            when_then_expr,
-            else_expr,
-        } => {
-            if let Some(expr) = expr.as_ref() {
-                visit_expression(expr, visitor);
-            }
-            when_then_expr.iter().for_each(|(when, then)| {
-                visit_expression(when, visitor);
-                visit_expression(then, visitor);
-            });
-            if let Some(else_expr) = else_expr.as_ref() {
-                visit_expression(else_expr, visitor);
-            }
-        }
-        Expr::Not(expr) => visit_expression(expr, visitor),
-        Expr::Negative(expr) => visit_expression(expr, visitor),
-        Expr::Between {
-            expr, low, high, ..
-        } => {
-            visit_expression(expr, visitor);
-            visit_expression(low, visitor);
-            visit_expression(high, visitor);
-        }
-        Expr::IsNull(expr) => visit_expression(expr, visitor),
-        Expr::IsNotNull(expr) => visit_expression(expr, visitor),
-        Expr::ScalarFunction { args, .. } => {
-            for arg in args {
-                visit_expression(arg, visitor)
-            }
-        }
-        Expr::ScalarUDF { args, .. } => {
-            for arg in args {
-                visit_expression(arg, visitor)
-            }
-        }
-        Expr::AggregateFunction { args, .. } => {
-            for arg in args {
-                visit_expression(arg, visitor)
-            }
-        }
-        Expr::AggregateUDF { args, .. } => {
-            for arg in args {
-                visit_expression(arg, visitor)
-            }
-        }
-        Expr::Sort { expr, .. } => visit_expression(expr, visitor),
-    }
-
-    visitor.post_visit(expr);
-}
-
 /// Creates a single expression representing the conjunction (aka
 /// AND'ing) together of a set of expressions
 #[derive(Debug, Default)]


@@ -70,7 +70,6 @@ pub enum Error {
     InvalidFlatbuffersSegment,
 }

-#[allow(dead_code)]
 pub type Result<T, E = Error> = std::result::Result<T, E>;

 /// An in-memory buffer of a write ahead log. It is split up into segments,
@@ -87,7 +86,6 @@ pub struct Buffer {
 }

 impl Buffer {
-    #[allow(dead_code)]
     pub fn new(
         max_size: u64,
         segment_size: u64,
@@ -109,7 +107,6 @@ impl Buffer {
     /// has been closed out. If the max size of the buffer would be exceeded
     /// by accepting the write, the oldest (first) of the closed segments
     /// will be dropped, if it is persisted. Otherwise, an error is returned.
-    #[allow(dead_code)]
     pub fn append(&mut self, write: Arc<ReplicatedWrite>) -> Result<Option<Arc<Segment>>> {
         let write_size = u64::try_from(write.data.len())
             .expect("appended data must be less than a u64 in length");
@@ -167,7 +164,6 @@ impl Buffer {
     }

     /// Returns the current size of the buffer.
-    #[allow(dead_code)]
     pub fn size(&self) -> u64 {
         self.current_size
     }
@@ -177,7 +173,6 @@ impl Buffer {
     /// given writer ID and sequence are to identify from what point to
     /// replay writes. If no write matches the given writer ID and sequence
     /// number, all replicated writes within the buffer will be returned.
-    #[allow(dead_code)]
     pub fn all_writes_since(&self, since: WriterSequence) -> Vec<Arc<ReplicatedWrite>> {
         let mut writes = Vec::new();
@@ -209,7 +204,6 @@ impl Buffer {
     /// onward. This returns only writes from the passed in writer ID. If no
     /// write matches the given writer ID and sequence number, all
     /// replicated writes within the buffer for that writer will be returned.
-    #[allow(dead_code)]
     pub fn writes_since(&self, since: WriterSequence) -> Vec<Arc<ReplicatedWrite>> {
         let mut writes = Vec::new();
@@ -244,7 +238,6 @@ impl Buffer {
     }

     // Removes the oldest segment present in the buffer, returning its id
-    #[allow(dead_code)]
     fn remove_oldest_segment(&mut self) -> u64 {
         let removed_segment = self.closed_segments.remove(0);
         self.current_size -= removed_segment.size;
@@ -276,7 +269,6 @@ pub struct Segment {
 }

 impl Segment {
-    #[allow(dead_code)]
     fn new(id: u64) -> Self {
         Self {
             id,
@@ -299,7 +291,6 @@ impl Segment {
     // appends the write to the segment, keeping track of the summary information
     // about the writer
-    #[allow(dead_code)]
     fn append(&mut self, write: Arc<ReplicatedWrite>) -> Result<()> {
         let (writer_id, sequence_number) = write.writer_and_sequence();
         self.validate_and_update_sequence_summary(writer_id, sequence_number)?;
@@ -350,7 +341,6 @@ impl Segment {
     }

     /// sets the time this segment was persisted at
-    #[allow(dead_code)]
     pub fn set_persisted_at(&self, time: DateTime<Utc>) {
         let mut persisted = self.persisted.lock().expect("mutex poisoned");
         *persisted = Some(time);


@@ -87,7 +87,11 @@ pub fn enable_logging() {
 #[macro_export]
 /// A macro to assert that one string is contained within another with
-/// a nice error message if they are not. Is a macro so test error
+/// a nice error message if they are not.
+///
+/// Usage: `assert_contains!(actual, expected)`
+///
+/// Is a macro so test error
 /// messages are on the same line as the failure;
 ///
 /// Both arguments must be convertable into Strings (Into<String>)
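
For reference, a minimal usage sketch of the macro in the style of the `test_query_series_pred_neq` change above; the error string here is an illustrative stand-in, not output from a real query.

use test_helpers::assert_contains;

#[test]
fn error_mentions_unsupported_operator() {
    // Illustrative stand-in for err.to_string() from a real query_series call
    let err_msg = "Operator NotEq not yet supported in IOx MutableBuffer";

    // Passes when the second string is contained in the first; on failure the
    // macro reports both strings on the failing line.
    assert_contains!(err_msg, "NotEq not yet supported");
}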