Merge pull request #2161 from influxdata/cn/1.54

feat: Upgrade to Rust 1.54
commit 81ef17b02e by kodiakhq[bot], 2021-07-30 17:08:54 +00:00, committed by GitHub
84 changed files with 326 additions and 318 deletions
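Most of the diff below removes one layer of borrowing that the clippy::needless_borrow lint, considerably sharpened in the Rust 1.54 toolchain, now flags (the allow attribute added to the generated entry_generated module later in this diff names the lint). A minimal, hypothetical sketch of the pattern, not taken from this repository:

```rust
// Illustrative only: `greet` wants a &str, and `name` is already a reference,
// so the extra `&` below is a needless borrow. Rust 1.54's clippy suggests
// dropping it, which is the bulk of this PR's diff.
fn greet(name: &str) {
    println!("hello, {}", name);
}

fn main() {
    let owned = String::from("iox");
    let name = &owned; // `name: &String`
    greet(&name); // flagged by clippy::needless_borrow
    greet(name); // the form this PR switches to
}
```

The same reasoning applies to the `&lp`, `&column`, and `&path` style arguments stripped throughout the files below.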


@@ -64,7 +64,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
 let mut header = Vec::new();
 for field in schema.fields() {
-header.push(Cell::new(&field.name()));
+header.push(Cell::new(field.name()));
 }
 table.set_titles(Row::new(header));
@@ -73,7 +73,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
 let mut cells = Vec::new();
 for col in 0..batch.num_columns() {
 let column = batch.column(col);
-cells.push(Cell::new(&array_value_to_string(&column, row)?));
+cells.push(Cell::new(&array_value_to_string(column, row)?));
 }
 table.add_row(Row::new(cells));
 }


@@ -20,7 +20,7 @@ pub fn optimize_dictionaries(batch: &RecordBatch) -> Result<RecordBatch> {
 .iter()
 .zip(schema.fields())
 .map(|(col, field)| match field.data_type() {
-DataType::Dictionary(key, value) => optimize_dict_col(col, &key, &value),
+DataType::Dictionary(key, value) => optimize_dict_col(col, key, value),
 _ => Ok(Arc::clone(col)),
 })
 .collect::<Result<Vec<_>>>()?;


@@ -78,7 +78,7 @@ impl<K: AsPrimitive<usize> + FromPrimitive + Zero> PackedStringArray<K> {
 pub fn iter(&self) -> PackedStringIterator<'_, K> {
 PackedStringIterator {
-array: &self,
+array: self,
 index: 0,
 }
 }


@@ -98,7 +98,7 @@ impl DatabaseRules {
 }
 pub fn db_name(&self) -> &str {
-&self.name.as_str()
+self.name.as_str()
 }
 }
@@ -109,7 +109,7 @@ pub trait Partitioner {
 impl Partitioner for DatabaseRules {
 fn partition_key(&self, line: &ParsedLine<'_>, default_time: i64) -> Result<String> {
-self.partition_key(&line, default_time)
+self.partition_key(line, default_time)
 }
 }
@@ -268,16 +268,16 @@ impl Partitioner for PartitionTemplate {
 .iter()
 .map(|p| match p {
 TemplatePart::Table => line.series.measurement.to_string(),
-TemplatePart::Column(column) => match line.tag_value(&column) {
+TemplatePart::Column(column) => match line.tag_value(column) {
 Some(v) => format!("{}_{}", column, v),
-None => match line.field_value(&column) {
+None => match line.field_value(column) {
 Some(v) => format!("{}_{}", column, v),
 None => "".to_string(),
 },
 },
 TemplatePart::TimeFormat(format) => {
 let nanos = line.timestamp.unwrap_or(default_time);
-Utc.timestamp_nanos(nanos).format(&format).to_string()
+Utc.timestamp_nanos(nanos).format(format).to_string()
 }
 _ => unimplemented!(),
 })
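For context, the match above emits one partition-key piece per template part. Below is a small, self-contained sketch of that idea with simplified stand-in types; `key_pieces`, the tag-slice lookup, and the sample values are hypothetical and not this crate's API:

```rust
// Hypothetical, simplified restatement of the template logic in the hunk
// above: each part yields one key piece; a missing column yields "".
enum TemplatePart {
    Table,
    Column(String),
}

fn key_pieces(measurement: &str, tags: &[(&str, &str)], parts: &[TemplatePart]) -> Vec<String> {
    parts
        .iter()
        .map(|part| match part {
            TemplatePart::Table => measurement.to_string(),
            TemplatePart::Column(column) => tags
                .iter()
                .find(|(name, _)| *name == column.as_str())
                .map(|(name, value)| format!("{}_{}", name, value))
                .unwrap_or_default(),
        })
        .collect()
}

fn main() {
    let pieces = key_pieces(
        "cpu",
        &[("region", "west")],
        &[TemplatePart::Table, TemplatePart::Column("region".into())],
    );
    assert_eq!(pieces, vec!["cpu".to_string(), "region_west".to_string()]);
}
```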


@@ -58,7 +58,7 @@ impl Job {
 Self::CompactChunks { partition, .. } => Some(&partition.db_name),
 Self::PersistChunks { partition, .. } => Some(&partition.db_name),
 Self::DropChunk { chunk, .. } => Some(&chunk.db_name),
-Self::WipePreservedCatalog { db_name, .. } => Some(&db_name),
+Self::WipePreservedCatalog { db_name, .. } => Some(db_name),
 }
 }


@@ -306,7 +306,7 @@ pub fn pb_to_entry(database_batch: &pb::DatabaseBatch) -> Result<Entry> {
 let mut table_batches = Vec::with_capacity(database_batch.table_batches.len());
 for table_batch in &database_batch.table_batches {
-table_batches.push(pb_table_batch_to_fb(&mut fbb, &table_batch)?);
+table_batches.push(pb_table_batch_to_fb(&mut fbb, table_batch)?);
 }
 let partition_key = fbb.create_string("pkey");
 let table_batches = fbb.create_vector(&table_batches);
@@ -406,7 +406,7 @@ fn pb_table_batch_to_fb<'a>(
 for column in &table_batch.columns {
 columns.push(pb_column_to_fb(
 fbb,
-&column,
+column,
 table_batch.row_count as usize,
 )?);
 }
@@ -1571,7 +1571,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::Time(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::I64Values::create(
 fbb,
 &entry_fb::I64ValuesArgs {
@@ -1586,7 +1586,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::I64(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::I64Values::create(
 fbb,
 &entry_fb::I64ValuesArgs {
@@ -1601,7 +1601,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::Bool(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::BoolValues::create(
 fbb,
 &entry_fb::BoolValuesArgs {
@@ -1616,7 +1616,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::F64(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::F64Values::create(
 fbb,
 &entry_fb::F64ValuesArgs {
@@ -1631,7 +1631,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::U64(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::U64Values::create(
 fbb,
 &entry_fb::U64ValuesArgs {
@@ -1802,7 +1802,7 @@ pub mod test_helpers {
 /// Converts the line protocol to a single `Entry` with a single shard and
 /// a single partition.
 pub fn lp_to_entry(lp: &str) -> Entry {
-let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
+let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();
 let default_time = Utc::now().timestamp_nanos();
@@ -1822,7 +1822,7 @@ pub mod test_helpers {
 /// shard and a single partition, which is useful for testing when `lp` is
 /// large. Batches are sized according to LP_BATCH_SIZE.
 pub fn lp_to_entries(lp: &str, partitioner: &impl Partitioner) -> Vec<Entry> {
-let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
+let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();
 let default_time = Utc::now().timestamp_nanos();
@@ -2446,7 +2446,7 @@ mod tests {
 // One point that has no timestamp
 let lp = "a val=1i";
-let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
+let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();
 // Partition on the hour
 let hour_partitioner = hour_partitioner();


@@ -1,6 +1,6 @@
 mod entry;
-#[allow(unused_imports)]
+#[allow(unused_imports, clippy::needless_borrow)]
 mod entry_generated;
 pub use crate::entry::*;


@@ -177,7 +177,7 @@ mod tests {
 let client = Client::new(&mockito::server_url(), token);
-let _result = client.query_suggestions_name(&suggestion_name).await;
+let _result = client.query_suggestions_name(suggestion_name).await;
 mock_server.assert();
 }


@@ -198,7 +198,7 @@ impl PerformQuery {
 message
 .header_as_dictionary_batch()
 .ok_or(Error::CouldNotGetDictionaryBatch)?,
-&schema,
+schema,
 dictionaries_by_field,
 )?;
@@ -213,8 +213,8 @@ impl PerformQuery {
 Ok(Some(flight_data_to_arrow_batch(
 &data,
-Arc::clone(&schema),
-&dictionaries_by_field,
+Arc::clone(schema),
+dictionaries_by_field,
 )?))
 }
 }


@@ -118,9 +118,9 @@ impl QueryOutputFormat {
 /// ```
 pub fn format(&self, batches: &[RecordBatch]) -> Result<String> {
 match self {
-Self::Pretty => batches_to_pretty(&batches),
-Self::Csv => batches_to_csv(&batches),
-Self::Json => batches_to_json(&batches),
+Self::Pretty => batches_to_pretty(batches),
+Self::Csv => batches_to_csv(batches),
+Self::Json => batches_to_json(batches),
 }
 }
 }


@@ -23,7 +23,7 @@ pub fn encode(src: &[i64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
 }
 let mut max: u64 = 0;
-let mut deltas = i64_to_u64_vector(&src);
+let mut deltas = i64_to_u64_vector(src);
 for i in (1..deltas.len()).rev() {
 deltas[i] = zig_zag_encode(deltas[i].wrapping_sub(deltas[i - 1]) as i64);
 if deltas[i] > max {


@@ -152,8 +152,8 @@ pub fn decode(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
 encoding if encoding == Encoding::Uncompressed as u8 => {
 decode_uncompressed(&src[1..], dst) // first byte not used
 }
-encoding if encoding == Encoding::Rle as u8 => decode_rle(&src, dst),
-encoding if encoding == Encoding::Simple8b as u8 => decode_simple8b(&src, dst),
+encoding if encoding == Encoding::Rle as u8 => decode_rle(src, dst),
+encoding if encoding == Encoding::Simple8b as u8 => decode_simple8b(src, dst),
 _ => Err(From::from("invalid block encoding")),
 }
 }


@@ -6,7 +6,7 @@ use std::error::Error;
 /// deltas are further compressed if possible, either via bit-packing using
 /// simple8b or by run-length encoding the deltas if they're all the same.
 pub fn encode(src: &[u64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
-let signed = u64_to_i64_vector(&src);
+let signed = u64_to_i64_vector(src);
 super::integer::encode(&signed, dst)
 }
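As the doc comment above notes, values are delta encoded and the deltas are then bit-packed with simple8b or run-length encoded when they repeat; the integer encoder shown earlier also zig-zag encodes each delta first. A hedged, self-contained sketch of that delta-plus-zig-zag step, assuming the conventional zig-zag definition (the crate's own zig_zag_encode and framing may differ in detail):

```rust
// Sketch only: zig-zag maps small signed deltas to small unsigned integers
// (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...), which bit-packs tightly.
fn zig_zag_encode(v: i64) -> u64 {
    ((v << 1) ^ (v >> 63)) as u64
}

fn delta_zig_zag(src: &[i64]) -> Vec<u64> {
    let mut prev = 0i64;
    src.iter()
        .map(|&v| {
            let delta = v.wrapping_sub(prev);
            prev = v;
            zig_zag_encode(delta)
        })
        .collect()
}

fn main() {
    // Evenly spaced values produce identical encoded deltas, which is the
    // run-length-friendly case the comment above calls out.
    assert_eq!(delta_zig_zag(&[10, 20, 30]), vec![20, 20, 20]);
}
```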


@@ -468,7 +468,7 @@ impl BlockData {
 }
 // The merged output block data to be returned
-let mut block_data = Self::new_from_data(&blocks.first().unwrap());
+let mut block_data = Self::new_from_data(blocks.first().unwrap());
 // buf will hold the next candidates from each of the sorted input
 // blocks.


@@ -475,7 +475,7 @@ impl Schema {
 // Now, sort lexographically (but put timestamp last)
 primary_keys.sort_by(|(a_column_type, a), (b_column_type, b)| {
 match (a_column_type, b_column_type) {
-(Tag, Tag) => a.name().cmp(&b.name()),
+(Tag, Tag) => a.name().cmp(b.name()),
 (Timestamp, Tag) => Ordering::Greater,
 (Tag, Timestamp) => Ordering::Less,
 (Timestamp, Timestamp) => panic!("multiple timestamps in summary"),
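The comparator above keeps tag columns in name order and forces the timestamp column to the end of the primary key. A hypothetical, self-contained sketch of the same ordering; the types and names here are stand-ins, and the real code treats two timestamp columns as a bug rather than ordering them:

```rust
use std::cmp::Ordering;

enum ColumnType {
    Tag,
    Timestamp,
}

fn sort_primary_key(mut cols: Vec<(ColumnType, &str)>) -> Vec<&str> {
    use ColumnType::*;
    cols.sort_by(|(a_type, a), (b_type, b)| match (a_type, b_type) {
        (Tag, Tag) => a.cmp(b),
        (Timestamp, Tag) => Ordering::Greater,
        (Tag, Timestamp) => Ordering::Less,
        // The real implementation panics here ("multiple timestamps").
        (Timestamp, Timestamp) => Ordering::Equal,
    });
    cols.into_iter().map(|(_, name)| name).collect()
}

fn main() {
    let sorted = sort_primary_key(vec![
        (ColumnType::Timestamp, "time"),
        (ColumnType::Tag, "region"),
        (ColumnType::Tag, "host"),
    ]);
    assert_eq!(sorted, vec!["host", "region", "time"]);
}
```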


@@ -258,6 +258,12 @@ fn normalize_spans(lines: Vec<String>) -> Vec<String> {
 // Note: we include leading and trailing spaces so that span=2
 // doesn't also match span=21423
 let re = Regex::new(r#" span=(\d+) "#).unwrap();
+// This collect isn't needless: the `fold` below moves `lines`, so this
+// iterator can't borrow `lines`, we need to collect into a `Vec` to
+// stop borrowing `lines`.
+// See https://github.com/rust-lang/rust-clippy/issues/7336
+#[allow(clippy::needless_collect)]
 let span_ids: Vec<String> = lines
 .iter()
 .map(|line| re.find_iter(line))
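The comment and allow added above work around a clippy false positive. A minimal, hypothetical reproduction of the same shape (not the actual normalize_spans code): the collect ends the borrow of `lines` so the later fold can take ownership.

```rust
fn line_stats(lines: Vec<String>) -> Vec<String> {
    // Clippy sees a collect whose result is only iterated again and calls it
    // needless, but removing it would keep `lines` borrowed while the fold
    // below moves it (rust-lang/rust-clippy#7336).
    #[allow(clippy::needless_collect)]
    let lengths: Vec<usize> = lines.iter().map(|line| line.len()).collect();

    // Takes ownership of `lines`, ending every borrow of it.
    let total: usize = lines.into_iter().fold(0, |acc, line| acc + line.len());

    lengths
        .into_iter()
        .map(|len| format!("{}/{}", len, total))
        .collect()
}

fn main() {
    let out = line_stats(vec!["span=1".to_string(), "span=2".to_string()]);
    assert_eq!(out, vec!["6/12".to_string(), "6/12".to_string()]);
}
```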


@@ -569,7 +569,7 @@ impl AggregatableByRange for Vector {
 from_row_id: usize,
 to_row_id: usize,
 ) -> Aggregate {
-Vector::aggregate_by_id_range(&self, agg_type, from_row_id, to_row_id)
+Vector::aggregate_by_id_range(self, agg_type, from_row_id, to_row_id)
 }
 }
@@ -861,7 +861,7 @@ impl Column {
 }
 let now = std::time::Instant::now();
-let v = c.encoded_values(&row_ids);
+let v = c.encoded_values(row_ids);
 debug!("time getting encoded values {:?}", now.elapsed());
 debug!("dictionary {:?}", c.data.dictionary());
@@ -872,14 +872,14 @@ impl Column {
 return Vector::Float(vec![]);
 }
-Vector::Float(c.encoded_values(&row_ids))
+Vector::Float(c.encoded_values(row_ids))
 }
 Column::Integer(c) => {
 if row_ids.is_empty() {
 return Vector::Integer(vec![]);
 }
-Vector::Integer(c.encoded_values(&row_ids))
+Vector::Integer(c.encoded_values(row_ids))
 }
 }
 }
@@ -1328,7 +1328,7 @@ impl AggregatableByRange for &Column {
 from_row_id: usize,
 to_row_id: usize,
 ) -> Aggregate {
-Column::aggregate_by_id_range(&self, agg_type, from_row_id, to_row_id)
+Column::aggregate_by_id_range(self, agg_type, from_row_id, to_row_id)
 }
 }


@@ -305,7 +305,7 @@ impl Segment {
 // we are grouping on and store each group as an iterator.
 let mut group_column_encoded_values = Vec::with_capacity(group_columns.len());
 for group_column in group_columns {
-if let Some(column) = self.column(&group_column) {
+if let Some(column) = self.column(group_column) {
 let encoded_values = if filtered_row_ids_vec.len() == self.meta.rows {
 column.all_encoded_values()
 } else {
@@ -330,7 +330,7 @@ impl Segment {
 for (column_name, _) in aggregates {
 // let column_name: &String = column_name;
-if let Some(column) = self.column(&column_name) {
+if let Some(column) = self.column(column_name) {
 let decoded_values = column.values(&filtered_row_ids_vec);
 assert_eq!(
 filtered_row_ids.cardinality() as usize,
@@ -514,7 +514,7 @@ impl Segment {
 // we are grouping on and store each group as an iterator.
 let mut group_column_encoded_values = Vec::with_capacity(group_columns.len());
 for group_column in group_columns {
-if let Some(column) = self.column(&group_column) {
+if let Some(column) = self.column(group_column) {
 let encoded_values = if filtered_row_ids_vec.len() == self.meta.rows {
 column.all_encoded_values()
 } else {
@@ -536,7 +536,7 @@ impl Segment {
 // aggregating on.
 let mut aggregate_column_decoded_values = Vec::with_capacity(aggregates.len());
 for (column_name, _) in aggregates {
-if let Some(column) = self.column(&column_name) {
+if let Some(column) = self.column(column_name) {
 let decoded_values = column.values(&filtered_row_ids_vec);
 assert_eq!(
 filtered_row_ids.cardinality() as usize,
@@ -657,7 +657,7 @@ impl Segment {
 // we are grouping on and store each group as an iterator.
 let mut group_column_encoded_values = Vec::with_capacity(group_columns.len());
 for group_column in group_columns {
-if let Some(column) = self.column(&group_column) {
+if let Some(column) = self.column(group_column) {
 let encoded_values = if filtered_row_ids_vec.len() == self.meta.rows {
 column.all_encoded_values()
 } else {
@@ -688,7 +688,7 @@ impl Segment {
 aggregate_cols.push((
 column_name.clone(),
 agg_type.clone(),
-self.column(&column_name).unwrap(),
+self.column(column_name).unwrap(),
 ));
 }
@@ -1358,8 +1358,8 @@ impl<'a> Segments<'a> {
 segment.aggregate_by_group_with_hash(
 time_range,
 predicates,
-&group_columns,
-&aggregates,
+group_columns,
+aggregates,
 window,
 );
 info!(


@@ -394,7 +394,7 @@ impl Domain {
 Arc::clone(vacant.insert(gauge))
 }
 };
-Gauge::new(gauge, [&self.default_labels[..], &default_labels].concat())
+Gauge::new(gauge, [&self.default_labels[..], default_labels].concat())
 }
 /// An observer can be used to provide asynchronous fetching of values from an object


@@ -375,7 +375,7 @@ impl Histogram {
 /// A helper method for observing latencies. Returns a new timing instrument
 /// which will handle submitting an observation containing a duration.
 pub fn timer(&self) -> HistogramTimer<'_> {
-HistogramTimer::new(&self)
+HistogramTimer::new(self)
 }
 }
 #[derive(Debug)]


@@ -281,7 +281,7 @@ impl MBChunk {
 );
 if let Some(c) = self.columns.get(column.name()) {
-c.validate_schema(&column).context(ColumnError {
+c.validate_schema(column).context(ColumnError {
 column: column.name(),
 })?;
 }
@@ -445,7 +445,7 @@ mod tests {
 cpu,host=c,env=stage val=11 1
 cpu,host=a,env=prod val=14 2
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let summary = chunk.table_summary();
 assert_eq!(summary.name, "cpu");
@@ -525,7 +525,7 @@ mod tests {
 cpu,host2=a v=1 40
 cpu,host=c v=1 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "host".into(),
 influxdb_type: Some(InfluxDbType::Tag),
@@ -550,7 +550,7 @@ mod tests {
 cpu,host=a val=false 40
 cpu,host=c other_val=2 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),
@@ -574,7 +574,7 @@ mod tests {
 cpu,host=a val=1u 40
 cpu,host=c other_val=2 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),
@@ -598,7 +598,7 @@ mod tests {
 cpu,host=a val=1.0 40
 cpu,host=c other_val=2.0 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),
@@ -622,7 +622,7 @@ mod tests {
 cpu,host=a val=1i 40
 cpu,host=c other_val=2.0 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),
@@ -646,7 +646,7 @@ mod tests {
 cpu,host=a val="v3" 40
 cpu,host=c other_val=2.0 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),
@@ -670,7 +670,7 @@ mod tests {
 cpu,host=a val=4 2
 cpu,host=c val=25 12
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "time".into(),
 influxdb_type: Some(InfluxDbType::Timestamp),
@@ -699,7 +699,7 @@ mod tests {
 cpu,host2=z v=1 40
 cpu,host=c v=1 5
 "#;
-write_lp_to_chunk(&lp2, &mut chunk).unwrap();
+write_lp_to_chunk(lp2, &mut chunk).unwrap();
 let expected = ColumnSummary {
 name: "host".into(),
@@ -823,7 +823,7 @@ mod tests {
 let mut table = write_lp_to_new_chunk(lp).unwrap();
 let lp = "foo t1=\"string\" 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry
@@ -854,7 +854,7 @@ mod tests {
 );
 let lp = "foo iv=1u 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry
@@ -885,7 +885,7 @@ mod tests {
 );
 let lp = "foo fv=1i 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry
@@ -916,7 +916,7 @@ mod tests {
 );
 let lp = "foo bv=1 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry
@@ -947,7 +947,7 @@ mod tests {
 );
 let lp = "foo sv=true 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry
@@ -978,7 +978,7 @@ mod tests {
 );
 let lp = "foo,sv=\"bar\" f=3i 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry


@@ -89,7 +89,7 @@ impl ObjectStoreApi for InMemory {
 }
 async fn delete(&self, location: &Self::Path) -> Result<()> {
-self.storage.write().await.remove(&location);
+self.storage.write().await.remove(location);
 Ok(())
 }


@@ -453,7 +453,7 @@ mod tests {
 }
 async fn measure_list(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
-let prefix = place_test_objects(&store, n_entries).await;
+let prefix = place_test_objects(store, n_entries).await;
 let t0 = Instant::now();
 store
@@ -471,7 +471,7 @@
 store: &ThrottledStore<InMemory>,
 n_entries: usize,
 ) -> Duration {
-let prefix = place_test_objects(&store, n_entries).await;
+let prefix = place_test_objects(store, n_entries).await;
 let t0 = Instant::now();
 store.list_with_delimiter(&prefix).await.unwrap();


@@ -303,7 +303,7 @@ where
 }
 pub fn iter(&self) -> PackerIterator<'_, T> {
-PackerIterator::new(&self)
+PackerIterator::new(self)
 }
 // TODO(edd): I don't like these getters. They're only needed so we can


@@ -624,7 +624,7 @@ async fn list_files(
 server_id: ServerId,
 db_name: &str,
 ) -> Result<Vec<(Path, FileType, u64, Uuid)>> {
-let list_path = catalog_path(&object_store, server_id, &db_name);
+let list_path = catalog_path(object_store, server_id, db_name);
 let paths = object_store
 .list(Some(&list_path))
 .await
@@ -658,7 +658,7 @@ async fn store_transaction_proto(
 object_store
 .put(
-&path,
+path,
 futures::stream::once(async move { Ok(data) }),
 Some(len),
 )
@@ -674,7 +674,7 @@ async fn load_transaction_proto(
 path: &Path,
 ) -> Result<proto::Transaction> {
 let data = object_store
-.get(&path)
+.get(path)
 .await
 .context(Read {})?
 .map_ok(|bytes| bytes.to_vec())
@@ -2492,7 +2492,7 @@ mod tests {
 &object_store,
 server_id,
 db_name,
-&tkey,
+tkey,
 FileType::Checkpoint,
 );
 store_transaction_proto(&object_store, &path, &proto)
@@ -2650,7 +2650,7 @@ mod tests {
 object_store
 .put(
-&path,
+path,
 futures::stream::once(async move { Ok(data) }),
 Some(len),
 )
@@ -2661,7 +2661,7 @@ mod tests {
 async fn checked_delete(object_store: &ObjectStore, path: &Path) {
 // issue full GET operation to check if object is preset
 object_store
-.get(&path)
+.get(path)
 .await
 .unwrap()
 .map_ok(|bytes| bytes.to_vec())
@@ -2670,7 +2670,7 @@ mod tests {
 .unwrap();
 // delete it
-object_store.delete(&path).await.unwrap();
+object_store.delete(path).await.unwrap();
 }
 /// Result of [`assert_single_catalog_inmem_works`].
@@ -2713,7 +2713,7 @@ mod tests {
 db_name: &str,
 ) -> TestTrace {
 let (catalog, mut state) = PreservedCatalog::new_empty(
-Arc::clone(&object_store),
+Arc::clone(object_store),
 server_id,
 db_name.to_string(),
 (),
@@ -2991,7 +2991,7 @@ mod tests {
 assert_single_catalog_inmem_works(&object_store, make_server_id(), "db1").await;
 // create junk
-let mut path = catalog_path(&object_store, server_id, &db_name);
+let mut path = catalog_path(&object_store, server_id, db_name);
 path.push_dir("0");
 path.set_file_name(format!("{}.foo", Uuid::new_v4()));
 create_empty_file(&object_store, &path).await;
@@ -3002,7 +3002,7 @@ mod tests {
 .unwrap();
 // check file is still there
-let prefix = catalog_path(&object_store, server_id, &db_name);
+let prefix = catalog_path(&object_store, server_id, db_name);
 let files = object_store
 .list(Some(&prefix))
 .await


@@ -115,7 +115,7 @@ pub async fn delete_files(catalog: &PreservedCatalog, files: &[Path]) -> Result<
 for path in files {
 info!(path = %path.display(), "Delete file");
-store.delete(&path).await.context(WriteError)?;
+store.delete(path).await.context(WriteError)?;
 }
 info!(n_files = files.len(), "Finished deletion, removed files.");
@@ -383,7 +383,7 @@ mod tests {
 object_store
 .put(
-&path,
+path,
 futures::stream::once(async move { Ok(data) }),
 Some(len),
 )


@@ -232,7 +232,7 @@ impl Storage {
 self.object_store
 .put(
-&file_name,
+file_name,
 futures::stream::once(async move { stream_data }),
 Some(len),
 )


@@ -90,7 +90,7 @@ impl ExtensionPlanner for IOxExtensionPlanner {
 let split_expr = planner.create_physical_expr(
 stream_split.split_expr(),
-&logical_inputs[0].schema(),
+logical_inputs[0].schema(),
 &physical_inputs[0].schema(),
 ctx_state,
 )?;
@@ -207,7 +207,7 @@ impl IOxExecutionContext {
 pub fn prepare_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
 debug!(text=%plan.display_indent_schema(), "prepare_plan: initial plan");
-let plan = self.inner.optimize(&plan)?;
+let plan = self.inner.optimize(plan)?;
 trace!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan");
 let physical_plan = self.inner.create_physical_plan(&plan)?;


@@ -100,7 +100,7 @@ impl FieldIndexes {
 }
 pub fn as_slice(&self) -> &[FieldIndex] {
-&self.inner.as_ref()
+self.inner.as_ref()
 }
 }
@@ -138,7 +138,7 @@ impl FieldIndexes {
 column_name: TIME_COLUMN_NAME,
 })?;
-Self::names_to_indexes(schema, &field_names)?
+Self::names_to_indexes(schema, field_names)?
 .into_iter()
 .map(|field_index| FieldIndex {
 value_index: field_index,


@@ -377,7 +377,7 @@ impl SeriesSetConverter {
 ),
 };
-tag_value.map(|tag_value| (Arc::clone(&column_name), Arc::from(tag_value.as_str())))
+tag_value.map(|tag_value| (Arc::clone(column_name), Arc::from(tag_value.as_str())))
 })
 .collect()
 }


@@ -73,7 +73,7 @@ impl UserDefinedLogicalNode for StreamSplitNode {
 /// Schema is the same as the input schema
 fn schema(&self) -> &DFSchemaRef {
-&self.input.schema()
+self.input.schema()
 }
 fn expressions(&self) -> Vec<Expr> {


@@ -662,7 +662,7 @@ impl InfluxRpcPlanner {
 for chunk in chunks {
 // Try and apply the predicate using only metadata
 let pred_result = chunk
-.apply_predicate_to_metadata(&predicate)
+.apply_predicate_to_metadata(predicate)
 .map_err(|e| Box::new(e) as _)
 .context(CheckingChunkPredicate {
 chunk_id: chunk.id(),
@@ -1048,7 +1048,7 @@ impl InfluxRpcPlanner {
 };
 // Group by all tag columns and the window bounds
-let window_bound = make_window_bound_expr(TIME_COLUMN_NAME.as_expr(), &every, &offset)
+let window_bound = make_window_bound_expr(TIME_COLUMN_NAME.as_expr(), every, offset)
 .alias(TIME_COLUMN_NAME);
 let group_exprs = schema
@@ -1517,7 +1517,7 @@ mod tests {
 fn reorder_prefix_err(prefix: &[&str], table_columns: &[&str]) -> String {
 let table_columns = table_columns.to_vec();
-let res = reorder_prefix(&prefix, table_columns);
+let res = reorder_prefix(prefix, table_columns);
 match res {
 Ok(r) => {


@@ -587,7 +587,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 Self::build_sort_plan_for_read_filter(
 Arc::clone(&table_name),
 Arc::clone(&input_schema),
-Arc::clone(&chunk),
+Arc::clone(chunk),
 predicate.clone(),
 &sort_key,
 )
@@ -920,7 +920,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 Self::build_plan_for_non_duplicates_chunk(
 Arc::clone(&table_name),
 Arc::clone(&output_schema),
-Arc::clone(&chunk),
+Arc::clone(chunk),
 predicate.clone(),
 output_sort_key,
 )
@@ -951,9 +951,9 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 /// primary key columns
 fn compute_input_schema(output_schema: &Schema, pk_schema: &Schema) -> Arc<Schema> {
 let input_schema = SchemaMerger::new()
-.merge(&output_schema)
+.merge(output_schema)
 .unwrap()
-.merge(&pk_schema)
+.merge(pk_schema)
 .unwrap()
 .build();
 Arc::new(input_schema)


@@ -14,6 +14,7 @@ use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use futures::Stream;
 /// Database schema creation / validation errors.
+#[allow(clippy::enum_variant_names)]
 #[derive(Debug, Snafu)]
 pub enum Error {
 #[snafu(display("Internal error creating SchemaAdapterStream: field '{}' does not appear in the output schema",
@@ -178,7 +179,7 @@ impl SchemaAdapterStream {
 .mappings
 .iter()
 .map(|mapping| match mapping {
-ColumnMapping::FromInput(input_index) => Arc::clone(&batch.column(*input_index)),
+ColumnMapping::FromInput(input_index) => Arc::clone(batch.column(*input_index)),
 ColumnMapping::MakeNull(data_type) => new_null_array(data_type, batch.num_rows()),
 })
 .collect::<Vec<_>>();
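The new allow silences clippy::enum_variant_names, which fires when every variant of an enum repeats a common prefix or suffix. A hypothetical minimal example of the shape it objects to (not this module's actual error type, which derives Snafu):

```rust
// Every variant repeats "Error", so clippy suggests dropping the shared
// suffix; the error enums in this crate keep their names and allow the
// lint instead.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum Error {
    SchemaError,
    ArrowError,
    InternalError,
}

fn main() {
    for e in [Error::SchemaError, Error::ArrowError, Error::InternalError] {
        println!("{:?}", e);
    }
}
```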


@@ -284,12 +284,12 @@ impl RecordBatchDeduplicator {
 // Special case when no ranges are duplicated (so just emit input as output)
 if num_dupes == 0 {
 trace!(num_rows = batch.num_rows(), "No dupes");
-Self::slice_record_batch(&batch, 0, ranges.len())
+Self::slice_record_batch(batch, 0, ranges.len())
 } else {
 trace!(num_dupes, num_rows = batch.num_rows(), "dupes");
 // Use take kernel
-let sort_key_indices = self.compute_sort_key_indices(&ranges);
+let sort_key_indices = self.compute_sort_key_indices(ranges);
 let take_options = Some(TakeOptions {
 check_bounds: false,
@@ -309,7 +309,7 @@ impl RecordBatchDeduplicator {
 )
 } else {
 // pick the last non null value
-let field_indices = self.compute_field_indices(&ranges, input_array);
+let field_indices = self.compute_field_indices(ranges, input_array);
 arrow::compute::take(
 input_array.as_ref(),


@@ -75,7 +75,7 @@ where
 {
 trace!(?filter_expr, schema=?chunk.schema(), "creating pruning predicate");
-let pruning_predicate = match PruningPredicate::try_new(&filter_expr, chunk.schema().as_arrow())
+let pruning_predicate = match PruningPredicate::try_new(filter_expr, chunk.schema().as_arrow())
 {
 Ok(p) => p,
 Err(e) => {


@@ -935,7 +935,7 @@ impl DatabaseStore for TestDatabaseStore {
 let mut databases = self.databases.lock();
 if let Some(db) = databases.get(name) {
-Ok(Arc::clone(&db))
+Ok(Arc::clone(db))
 } else {
 let new_db = Arc::new(TestDatabase::new());
 databases.insert(name.to_string(), Arc::clone(&new_db));


@@ -38,7 +38,7 @@ pub fn arrow_pk_sort_exprs(
 ) -> Vec<PhysicalSortExpr> {
 let mut sort_exprs = vec![];
 for key in key_columns {
-let expr = physical_col(key, &input_schema).expect("pk in schema");
+let expr = physical_col(key, input_schema).expect("pk in schema");
 sort_exprs.push(PhysicalSortExpr {
 expr,
 options: SortOptions {
@@ -57,7 +57,7 @@ pub fn arrow_sort_key_exprs(
 ) -> Vec<PhysicalSortExpr> {
 let mut sort_exprs = vec![];
 for (key, options) in sort_key.iter() {
-let expr = physical_col(key, &input_schema).expect("sort key column in schema");
+let expr = physical_col(key, input_schema).expect("sort key column in schema");
 sort_exprs.push(PhysicalSortExpr {
 expr,
 options: SortOptions {


@@ -236,7 +236,7 @@ async fn list_tag_values_field_col() {
 // Test: temp is a field, not a tag
 let tag_name = "temp";
-let plan_result = planner.tag_values(db.as_ref(), &tag_name, predicate.clone());
+let plan_result = planner.tag_values(db.as_ref(), tag_name, predicate.clone());
 assert_eq!(
 plan_result.unwrap_err().to_string(),


@@ -20,6 +20,7 @@ use std::{
 use self::{parse::TestQueries, setup::TestSetup};
 use crate::scenarios::{DbScenario, DbSetup};
+#[allow(clippy::enum_variant_names)]
 #[derive(Debug, Snafu)]
 pub enum Error {
 #[snafu(display("Can not find case file '{:?}': {}", path, source))]
@@ -275,7 +276,7 @@ impl<W: Write> Runner<W> {
 let executor = Arc::new(executor);
 let physical_plan = planner
-.query(db, &sql, executor.as_ref())
+.query(db, sql, executor.as_ref())
 .expect("built plan successfully");
 let results: Vec<RecordBatch> = executor


@@ -7,6 +7,7 @@ use crate::scenarios::{get_all_setups, get_db_setup, DbSetup};
 const IOX_SETUP_NEEDLE: &str = "-- IOX_SETUP: ";
+#[allow(clippy::enum_variant_names)]
 #[derive(Debug, Snafu)]
 pub enum Error {
 #[snafu(display(


@@ -686,10 +686,10 @@ pub(crate) async fn make_one_chunk_rub_scenario(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }
@@ -745,7 +745,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
 }
@@ -758,10 +758,10 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }
@@ -774,15 +774,15 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
@@ -798,21 +798,20 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
 .await
 .unwrap();
-db.unload_read_buffer(&table_name, partition_key, 1)
-.unwrap();
+db.unload_read_buffer(table_name, partition_key, 1).unwrap();
 }
 let scenario5 = DbScenario {
 scenario_name: "Data in object store only".into(),
@@ -845,7 +844,7 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
 }
@@ -859,10 +858,10 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }
@@ -876,21 +875,21 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
 }
 let table_names = write_lp(&db, data2).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 1)
+db.move_chunk_to_read_buffer(table_name, partition_key, 1)
 .await
 .unwrap();
 }
@@ -903,14 +902,14 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
@@ -919,14 +918,14 @@ pub async fn make_two_chunk_scenarios(
 }
 let table_names = write_lp(&db, data2).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 2)
+db.move_chunk_to_read_buffer(table_name, partition_key, 2)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
@@ -942,39 +941,37 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
 .await
 .unwrap();
-db.unload_read_buffer(&table_name, partition_key, 1)
-.unwrap();
+db.unload_read_buffer(table_name, partition_key, 1).unwrap();
 }
 let table_names = write_lp(&db, data2).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 2)
+db.move_chunk_to_read_buffer(table_name, partition_key, 2)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
 .await
 .unwrap();
-db.unload_read_buffer(&table_name, partition_key, 3)
-.unwrap();
+db.unload_read_buffer(table_name, partition_key, 3).unwrap();
 }
 let scenario6 = DbScenario {
 scenario_name: "Data in 2 parquet chunks in object store only".into(),
@@ -986,17 +983,17 @@ pub async fn make_two_chunk_scenarios(
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
 // put chunk 1 into RUB
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }
 let table_names = write_lp(&db, data2).await; // write to MUB
 for table_name in &table_names {
 // compact chunks into a single RUB chunk
-db.compact_partition(&table_name, partition_key)
+db.compact_partition(table_name, partition_key)
 .await
 .unwrap();
 }
@@ -1036,10 +1033,10 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }
@@ -1052,21 +1049,20 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
 .await
 .unwrap();
-db.unload_read_buffer(&table_name, partition_key, 1)
-.unwrap();
+db.unload_read_buffer(table_name, partition_key, 1).unwrap();
 }
 let scenario2 = DbScenario {
 scenario_name: "--------------------- Data in object store only ".into(),


@@ -99,7 +99,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );
@@ -121,7 +121,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );
@@ -148,7 +148,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );
@@ -170,7 +170,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );
@@ -197,7 +197,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );
@@ -219,7 +219,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );


@@ -29,7 +29,7 @@ fn read_group_predicate_all_time(c: &mut Criterion, row_group: &RowGroup, rng: &
 benchmark_read_group_vary_cardinality(
 c,
 "row_group_read_group_all_time_vary_cardinality",
-&row_group,
+row_group,
 &time_pred,
 // grouping columns and expected cardinality
 vec![
@@ -47,7 +47,7 @@ fn read_group_predicate_all_time(c: &mut Criterion, row_group: &RowGroup, rng: &
 benchmark_read_group_vary_group_cols(
 c,
 "row_group_read_group_all_time_vary_columns",
-&row_group,
+row_group,
 &time_pred,
 // number of cols to group on and expected cardinality
 vec![
@@ -82,7 +82,7 @@ fn read_group_pre_computed_groups(c: &mut Criterion, row_group: &RowGroup, rng:
 benchmark_read_group_vary_cardinality(
 c,
 "row_group_read_group_pre_computed_groups_vary_cardinality",
-&row_group,
+row_group,
 &Predicate::default(),
 // grouping columns and expected cardinality
 vec![
@@ -99,7 +99,7 @@ fn read_group_pre_computed_groups(c: &mut Criterion, row_group: &RowGroup, rng:
 benchmark_read_group_vary_group_cols(
 c,
 "row_group_read_group_pre_computed_groups_vary_columns",
-&row_group,
+row_group,
 &Predicate::default(),
 // number of cols to group on and expected cardinality
 vec![

View File

@ -99,7 +99,7 @@ fn benchmark_none_sum(
|b, input| { |b, input| {
b.iter(|| { b.iter(|| {
// do work // do work
let _ = encoding.sum(&input); let _ = encoding.sum(input);
}); });
}, },
); );
@ -121,7 +121,7 @@ fn benchmark_none_sum(
|b, input| { |b, input| {
b.iter(|| { b.iter(|| {
// do work // do work
let _ = encoding.sum(&input); let _ = encoding.sum(input);
}); });
}, },
); );
@ -148,7 +148,7 @@ fn benchmark_none_sum(
|b, input| { |b, input| {
b.iter(|| { b.iter(|| {
// do work // do work
let _ = encoding.sum(&input); let _ = encoding.sum(input);
}); });
}, },
); );
@ -170,7 +170,7 @@ fn benchmark_none_sum(
|b, input| { |b, input| {
b.iter(|| { b.iter(|| {
// do work // do work
let _ = encoding.sum(&input); let _ = encoding.sum(input);
}); });
}, },
); );
@ -197,7 +197,7 @@ fn benchmark_none_sum(
|b, input| { |b, input| {
b.iter(|| { b.iter(|| {
// do work // do work
let _ = encoding.sum(&input); let _ = encoding.sum(input);
}); });
}, },
); );
@ -219,7 +219,7 @@ fn benchmark_none_sum(
|b, input| { |b, input| {
b.iter(|| { b.iter(|| {
// do work // do work
let _ = encoding.sum(&input); let _ = encoding.sum(input);
}); });
}, },
); );

View File

@ -130,7 +130,7 @@ impl Chunk {
/// caller does not need to be concerned about the size of the update. /// caller does not need to be concerned about the size of the update.
pub fn upsert_table(&mut self, table_data: RecordBatch) { pub fn upsert_table(&mut self, table_data: RecordBatch) {
let table_name = self.table.name(); let table_name = self.table.name();
let row_group = record_batch_to_row_group_with_logging(&table_name, table_data); let row_group = record_batch_to_row_group_with_logging(table_name, table_data);
self.upsert_table_with_row_group(row_group) self.upsert_table_with_row_group(row_group)
} }

View File

@ -390,7 +390,7 @@ impl Column {
) -> RowIDsOption { ) -> RowIDsOption {
// If we can get an answer using only the meta-data on the column then // If we can get an answer using only the meta-data on the column then
// return that answer. // return that answer.
match self.evaluate_predicate_on_meta(&op, &value) { match self.evaluate_predicate_on_meta(op, value) {
PredicateMatch::None => return RowIDsOption::None(dst), PredicateMatch::None => return RowIDsOption::None(dst),
PredicateMatch::All => return RowIDsOption::All(dst), PredicateMatch::All => return RowIDsOption::All(dst),
PredicateMatch::SomeMaybe => {} // have to apply predicate to column PredicateMatch::SomeMaybe => {} // have to apply predicate to column
@ -482,7 +482,7 @@ impl Column {
// When the predicate is == and the metadata range indicates the column // When the predicate is == and the metadata range indicates the column
// can't contain `value` then the column doesn't need to be read. // can't contain `value` then the column doesn't need to be read.
cmp::Operator::Equal => { cmp::Operator::Equal => {
if !self.might_contain_value(&value) { if !self.might_contain_value(value) {
return PredicateMatch::None; // no rows are going to match. return PredicateMatch::None; // no rows are going to match.
} }
} }
@ -491,7 +491,7 @@ impl Column {
// contain any null values, and the entire range of values satisfies the // contain any null values, and the entire range of values satisfies the
// predicate then the column doesn't need to be read. // predicate then the column doesn't need to be read.
cmp::Operator::GT | cmp::Operator::GTE | cmp::Operator::LT | cmp::Operator::LTE => { cmp::Operator::GT | cmp::Operator::GTE | cmp::Operator::LT | cmp::Operator::LTE => {
if self.predicate_matches_all_values(&op, &value) { if self.predicate_matches_all_values(op, value) {
return PredicateMatch::All; return PredicateMatch::All;
} }
} }
@ -500,13 +500,13 @@ impl Column {
// column can't possibly contain `value` then the predicate must // column can't possibly contain `value` then the predicate must
// match all rows on the column. // match all rows on the column.
cmp::Operator::NotEqual => { cmp::Operator::NotEqual => {
if !self.might_contain_value(&value) { if !self.might_contain_value(value) {
return PredicateMatch::All; // all rows are going to match. return PredicateMatch::All; // all rows are going to match.
} }
} }
} }
if self.predicate_matches_no_values(&op, &value) { if self.predicate_matches_no_values(op, value) {
return PredicateMatch::None; return PredicateMatch::None;
} }
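The comments in this hunk describe metadata-first predicate evaluation: the column's stored min/max (and null) statistics can often decide a comparison without touching the row data. A minimal sketch of that short-circuit, with hypothetical names (MetaStats and the string operators here are illustrative, not the crate's real types):

// Hypothetical sketch of metadata-only predicate pruning, loosely modelled on the
// comments above; not the read_buffer crate's actual implementation.
#[derive(Debug, PartialEq)]
enum PredicateMatch {
    None,      // no row can match: the column need not be read
    All,       // every row matches: the column need not be read
    SomeMaybe, // inconclusive: fall back to scanning the column data
}

struct MetaStats {
    min: i64,
    max: i64,
    contains_null: bool,
}

fn evaluate_on_meta(stats: &MetaStats, op: &str, value: i64) -> PredicateMatch {
    match op {
        "=" if value < stats.min || value > stats.max => PredicateMatch::None,
        "!=" if value < stats.min || value > stats.max => PredicateMatch::All,
        ">" if !stats.contains_null && stats.min > value => PredicateMatch::All,
        ">" if stats.max <= value => PredicateMatch::None,
        _ => PredicateMatch::SomeMaybe,
    }
}

fn main() {
    let stats = MetaStats { min: 10, max: 20, contains_null: false };
    assert_eq!(evaluate_on_meta(&stats, "=", 99), PredicateMatch::None);
    assert_eq!(evaluate_on_meta(&stats, ">", 5), PredicateMatch::All);
    assert_eq!(evaluate_on_meta(&stats, ">", 15), PredicateMatch::SomeMaybe);
}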

View File

@ -210,7 +210,7 @@ impl Bool {
pub fn row_ids_filter(&self, value: bool, op: &cmp::Operator, dst: RowIDs) -> RowIDs { pub fn row_ids_filter(&self, value: bool, op: &cmp::Operator, dst: RowIDs) -> RowIDs {
match op { match op {
cmp::Operator::GT | cmp::Operator::GTE | cmp::Operator::LT | cmp::Operator::LTE => { cmp::Operator::GT | cmp::Operator::GTE | cmp::Operator::LT | cmp::Operator::LTE => {
self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst) self.row_ids_cmp_order(value, Self::ord_from_op(op), dst)
} }
_ => self.row_ids_equal(value, op, dst), _ => self.row_ids_equal(value, op, dst),
} }

View File

@ -398,8 +398,8 @@ where
| (cmp::Operator::LT, cmp::Operator::GTE) | (cmp::Operator::LT, cmp::Operator::GTE)
| (cmp::Operator::LTE, cmp::Operator::GT) | (cmp::Operator::LTE, cmp::Operator::GT)
| (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range_order( | (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range_order(
(&left.0, Self::ord_from_op(&left.1)), (&left.0, Self::ord_from_op(left.1)),
(&right.0, Self::ord_from_op(&right.1)), (&right.0, Self::ord_from_op(right.1)),
dst, dst,
), ),

View File

@ -412,10 +412,10 @@ where
fn row_ids_filter(&self, value: L, op: &cmp::Operator, dst: RowIDs) -> RowIDs { fn row_ids_filter(&self, value: L, op: &cmp::Operator, dst: RowIDs) -> RowIDs {
let value = self.transcoder.encode(value); let value = self.transcoder.encode(value);
match op { match op {
cmp::Operator::GT => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst), cmp::Operator::GT => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
cmp::Operator::GTE => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst), cmp::Operator::GTE => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
cmp::Operator::LT => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst), cmp::Operator::LT => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
cmp::Operator::LTE => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst), cmp::Operator::LTE => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
_ => self.row_ids_equal(value, op, dst), _ => self.row_ids_equal(value, op, dst),
} }
} }
@ -438,8 +438,8 @@ where
| (cmp::Operator::LT, cmp::Operator::GTE) | (cmp::Operator::LT, cmp::Operator::GTE)
| (cmp::Operator::LTE, cmp::Operator::GT) | (cmp::Operator::LTE, cmp::Operator::GT)
| (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range_order( | (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range_order(
(left.0, Self::ord_from_op(&left.1)), (left.0, Self::ord_from_op(left.1)),
(right.0, Self::ord_from_op(&right.1)), (right.0, Self::ord_from_op(right.1)),
dst, dst,
), ),

View File

@ -400,8 +400,8 @@ where
| (cmp::Operator::LT, cmp::Operator::GTE) | (cmp::Operator::LT, cmp::Operator::GTE)
| (cmp::Operator::LTE, cmp::Operator::GT) | (cmp::Operator::LTE, cmp::Operator::GT)
| (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range( | (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range(
(&left.0, Self::ord_from_op(&left.1)), (&left.0, Self::ord_from_op(left.1)),
(&right.0, Self::ord_from_op(&right.1)), (&right.0, Self::ord_from_op(right.1)),
dst, dst,
), ),

View File

@ -430,7 +430,7 @@ mod test {
enc.push_none(); // 9 enc.push_none(); // 9
enc.push_additional(Some("south".to_string()), 2); // 10, 11 enc.push_additional(Some("south".to_string()), 2); // 10, 11
let ids = enc.row_ids_filter(&"east", &cmp::Operator::Equal, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::Equal, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![0, 1, 2, 4, 5, 6, 7, 8]), RowIDs::Vector(vec![0, 1, 2, 4, 5, 6, 7, 8]),
@ -438,14 +438,14 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"south", &cmp::Operator::Equal, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("south", &cmp::Operator::Equal, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![10, 11]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![10, 11]), "{}", name);
let ids = enc.row_ids_filter(&"foo", &cmp::Operator::Equal, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("foo", &cmp::Operator::Equal, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
// != some value not in the column should exclude the NULL value. // != some value not in the column should exclude the NULL value.
let ids = enc.row_ids_filter(&"foo", &cmp::Operator::NotEqual, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("foo", &cmp::Operator::NotEqual, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11]), RowIDs::Vector(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11]),
@ -453,7 +453,7 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"east", &cmp::Operator::NotEqual, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::NotEqual, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![3, 10, 11]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![3, 10, 11]), "{}", name);
} }
@ -474,7 +474,7 @@ mod test {
enc.push_additional(Some("east".to_string()), 2); enc.push_additional(Some("east".to_string()), 2);
enc.push_additional(Some("west".to_string()), 1); enc.push_additional(Some("west".to_string()), 1);
let ids = enc.row_ids_filter(&"abba", &cmp::Operator::NotEqual, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abba", &cmp::Operator::NotEqual, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 1, 2]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0, 1, 2]), "{}", name);
} }
@ -502,7 +502,7 @@ mod test {
enc.push_none(); // 13 enc.push_none(); // 13
enc.push_additional(Some("west".to_string()), 5); // 14, 15, 16, 17, 18 enc.push_additional(Some("west".to_string()), 5); // 14, 15, 16, 17, 18
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![0, 1, 2, 4, 5, 6, 7, 8]), RowIDs::Vector(vec![0, 1, 2, 4, 5, 6, 7, 8]),
@ -510,24 +510,24 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abc", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::LT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abc", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::GTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("zoo", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::GT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("zoo", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::GT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("west", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"north", &cmp::Operator::GT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("north", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![9, 10, 11, 14, 15, 16, 17, 18]), RowIDs::Vector(vec![9, 10, 11, 14, 15, 16, 17, 18]),
@ -535,7 +535,7 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"north", &cmp::Operator::GTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("north", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![3, 9, 10, 11, 12, 14, 15, 16, 17, 18]), RowIDs::Vector(vec![3, 9, 10, 11, 12, 14, 15, 16, 17, 18]),
@ -545,7 +545,7 @@ mod test {
// The encoding also supports comparisons on values that don't directly exist in // The encoding also supports comparisons on values that don't directly exist in
// the column. // the column.
let ids = enc.row_ids_filter(&"abba", &cmp::Operator::GT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abba", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![ RowIDs::Vector(vec![
@ -555,7 +555,7 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"east1", &cmp::Operator::GT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east1", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![3, 9, 10, 11, 12, 14, 15, 16, 17, 18]), RowIDs::Vector(vec![3, 9, 10, 11, 12, 14, 15, 16, 17, 18]),
@ -563,7 +563,7 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"east1", &cmp::Operator::GTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east1", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![3, 9, 10, 11, 12, 14, 15, 16, 17, 18]), RowIDs::Vector(vec![3, 9, 10, 11, 12, 14, 15, 16, 17, 18]),
@ -571,7 +571,7 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"east1", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east1", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![0, 1, 2, 4, 5, 6, 7, 8]), RowIDs::Vector(vec![0, 1, 2, 4, 5, 6, 7, 8]),
@ -579,7 +579,7 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"region", &cmp::Operator::LT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("region", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 12]), RowIDs::Vector(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 12]),
@ -587,7 +587,7 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"region", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("region", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 12]), RowIDs::Vector(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 12]),
@ -595,7 +595,7 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::LT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("zoo", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![ RowIDs::Vector(vec![
@ -605,7 +605,7 @@ mod test {
name name
); );
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("zoo", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!( assert_eq!(
ids, ids,
RowIDs::Vector(vec![ RowIDs::Vector(vec![
@ -627,34 +627,34 @@ mod test {
let name = enc.debug_name(); let name = enc.debug_name();
enc.push("east".to_string()); enc.push("east".to_string());
let ids = enc.row_ids_filter(&"east", &cmp::Operator::GT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"north", &cmp::Operator::GTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("north", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abc", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::GT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abc", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::GTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abc", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::GTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("west", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::LT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("west", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
} }
} }
@ -677,34 +677,34 @@ mod test {
enc.push_none(); enc.push_none();
enc.push("east".to_string()); enc.push("east".to_string());
let ids = enc.row_ids_filter(&"west", &cmp::Operator::GT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("west", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::GTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("zoo", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abc", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name); assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::GT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abc", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 1, 3]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0, 1, 3]), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::GTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("abc", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 1, 3]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0, 1, 3]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::GTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("west", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![1]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![1]), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("east", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![3]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![3]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::LTE, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("west", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 1, 3]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0, 1, 3]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::LT, RowIDs::Vector(vec![])); let ids = enc.row_ids_filter("west", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 3]), "{}", name); assert_eq!(ids, RowIDs::Vector(vec![0, 3]), "{}", name);
} }
} }
@ -809,12 +809,7 @@ mod test {
enc.push_none(); enc.push_none();
enc.push_additional(Some("zoo".to_string()), 1); enc.push_additional(Some("zoo".to_string()), 1);
assert_eq!( assert_eq!(enc.dictionary(), vec!["east", "west", "zoo"], "{}", name);
enc.dictionary(),
vec![&"east".to_string(), &"west".to_string(), &"zoo".to_string()],
"{}",
name
);
} }
#[test] #[test]
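One of the tests above notes that the encoding supports comparisons on values that don't directly exist in the column. That works because the dictionary is kept sorted, so an ordered comparison against an absent value can be answered by finding where that value would be inserted. A hedged, standalone sketch of the idea (illustrative names, not the crate's encoder):

// Because a sorted dictionary keeps distinct values ordered, `> "abba"` can be
// answered even though "abba" never appears in the column.
fn ids_greater_than(dictionary: &[&str], encoded: &[usize], value: &str) -> Vec<usize> {
    // First dictionary index whose value is strictly greater than `value`.
    let boundary = dictionary.partition_point(|v| *v <= value);
    let mut rows = Vec::new();
    for (row, &code) in encoded.iter().enumerate() {
        if code >= boundary {
            rows.push(row);
        }
    }
    rows
}

fn main() {
    let dictionary = ["east", "north", "south", "west"]; // sorted distinct values
    let encoded = [0, 0, 1, 3, 2]; // per-row dictionary codes
    assert_eq!(ids_greater_than(&dictionary, &encoded, "abba"), vec![0, 1, 2, 3, 4]);
    assert_eq!(ids_greater_than(&dictionary, &encoded, "north"), vec![3, 4]);
}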

View File

@ -37,10 +37,10 @@ pub const TIME_COLUMN_NAME: &str = internal_types::schema::TIME_COLUMN_NAME;
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("arrow conversion error: {}", source))] #[snafu(display("arrow conversion error: {}", source))]
ArrowError { source: arrow::error::ArrowError }, ArrowConversion { source: arrow::error::ArrowError },
#[snafu(display("schema conversion error: {}", source))] #[snafu(display("schema conversion error: {}", source))]
SchemaError { SchemaConversion {
source: internal_types::schema::builder::Error, source: internal_types::schema::builder::Error,
}, },
@ -256,7 +256,7 @@ impl RowGroup {
columns: &[ColumnName<'_>], columns: &[ColumnName<'_>],
predicates: &Predicate, predicates: &Predicate,
) -> ReadFilterResult<'_> { ) -> ReadFilterResult<'_> {
let select_columns = self.meta.schema_for_column_names(&columns); let select_columns = self.meta.schema_for_column_names(columns);
assert_eq!(select_columns.len(), columns.len()); assert_eq!(select_columns.len(), columns.len());
let schema = ResultSchema { let schema = ResultSchema {
@ -540,11 +540,11 @@ impl RowGroup {
// single 128-bit integer as the group key. If grouping is on more than // single 128-bit integer as the group key. If grouping is on more than
// four columns then a fallback to using a vector as a key will happen. // four columns then a fallback to using a vector as a key will happen.
if dst.schema.group_columns.len() <= 4 { if dst.schema.group_columns.len() <= 4 {
self.read_group_hash_with_u128_key(dst, &groupby_encoded_ids, aggregate_columns_data); self.read_group_hash_with_u128_key(dst, groupby_encoded_ids, aggregate_columns_data);
return; return;
} }
self.read_group_hash_with_vec_key(dst, &groupby_encoded_ids, aggregate_columns_data); self.read_group_hash_with_vec_key(dst, groupby_encoded_ids, aggregate_columns_data);
} }
// This function is used with `read_group_hash` when the number of columns // This function is used with `read_group_hash` when the number of columns
@ -1709,7 +1709,7 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
fn try_from(result: ReadFilterResult<'_>) -> Result<Self, Self::Error> { fn try_from(result: ReadFilterResult<'_>) -> Result<Self, Self::Error> {
let schema = internal_types::schema::Schema::try_from(result.schema()) let schema = internal_types::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaError { source })?; .map_err(|source| Error::SchemaConversion { source })?;
let columns: Vec<ArrayRef> = result let columns: Vec<ArrayRef> = result
.data .data
@ -1754,7 +1754,7 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
// try_new only returns an error if the schema is invalid or the number // try_new only returns an error if the schema is invalid or the number
// of rows on columns differ. We have full control over both so there // of rows on columns differ. We have full control over both so there
// should never be an error to return... // should never be an error to return...
Self::try_new(arrow_schema, columns).map_err(|source| Error::ArrowError { source }) Self::try_new(arrow_schema, columns).map_err(|source| Error::ArrowConversion { source })
} }
} }
@ -2185,7 +2185,7 @@ impl TryFrom<ReadAggregateResult<'_>> for RecordBatch {
fn try_from(mut result: ReadAggregateResult<'_>) -> Result<Self, Self::Error> { fn try_from(mut result: ReadAggregateResult<'_>) -> Result<Self, Self::Error> {
let schema = internal_types::schema::Schema::try_from(result.schema()) let schema = internal_types::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaError { source })?; .map_err(|source| Error::SchemaConversion { source })?;
let arrow_schema: arrow::datatypes::SchemaRef = schema.into(); let arrow_schema: arrow::datatypes::SchemaRef = schema.into();
// Add the group columns to the set of column data for the record batch. // Add the group columns to the set of column data for the record batch.
@ -2257,7 +2257,7 @@ impl TryFrom<ReadAggregateResult<'_>> for RecordBatch {
// try_new only returns an error if the schema is invalid or the number // try_new only returns an error if the schema is invalid or the number
// of rows on columns differ. We have full control over both so there // of rows on columns differ. We have full control over both so there
// should never be an error to return... // should never be an error to return...
Self::try_new(arrow_schema, columns).context(ArrowError) Self::try_new(arrow_schema, columns).context(ArrowConversion)
} }
} }
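The read_group_hash comment earlier in this file explains that with at most four group columns the encoded group ids are packed into a single 128-bit integer used as the hash key, with a Vec-based key as the fallback. A hedged sketch of that packing idea, assuming each encoded id fits in 32 bits (names are illustrative, not the crate's API):

// Illustrative packing of up to four u32-encoded group ids into one u128 key.
fn pack_group_key(encoded_ids: &[u32]) -> Option<u128> {
    if encoded_ids.len() > 4 {
        return None; // caller falls back to a Vec-based key
    }
    let mut key = 0u128;
    for (i, &id) in encoded_ids.iter().enumerate() {
        key |= (id as u128) << (32 * i);
    }
    Some(key)
}

fn main() {
    // Two group columns, e.g. region id 3 and host id 7, become one hashable u128.
    assert_eq!(pack_group_key(&[3, 7]), Some(3 | (7 << 32)));
    // More than four group columns cannot be packed this way.
    assert_eq!(pack_group_key(&[1, 2, 3, 4, 5]), None);
}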

View File

@ -19,10 +19,10 @@ use std::{
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("cannot drop last row group in table; drop table"))] #[snafu(display("cannot drop last row group in table; drop table"))]
EmptyTableError {}, EmptyTable {},
#[snafu(display("table does not have InfluxDB timestamp column"))] #[snafu(display("table does not have InfluxDB timestamp column"))]
NoTimestampColumnError {}, NoTimestampColumn {},
#[snafu(display("unsupported column operation on {}: {}", column_name, msg))] #[snafu(display("unsupported column operation on {}: {}", column_name, msg))]
UnsupportedColumnOperation { msg: String, column_name: String }, UnsupportedColumnOperation { msg: String, column_name: String },
@ -114,7 +114,7 @@ impl Table {
let mut row_groups = self.table_data.write(); let mut row_groups = self.table_data.write();
// Tables must always have at least one row group. // Tables must always have at least one row group.
ensure!(row_groups.data.len() > 1, EmptyTableError); ensure!(row_groups.data.len() > 1, EmptyTable);
row_groups.data.remove(position); // removes row group data row_groups.data.remove(position); // removes row group data
row_groups.meta = Arc::new(MetaData::from(row_groups.data.as_ref())); // rebuild meta row_groups.meta = Arc::new(MetaData::from(row_groups.data.as_ref())); // rebuild meta
@ -261,7 +261,7 @@ impl Table {
} }
// row group could potentially satisfy predicate // row group could potentially satisfy predicate
row_groups.push(Arc::clone(&rg)); row_groups.push(Arc::clone(rg));
} }
(Arc::clone(&table_data.meta), row_groups) (Arc::clone(&table_data.meta), row_groups)
@ -828,7 +828,7 @@ impl From<&[Arc<RowGroup>]> for MetaData {
let mut meta = Self::new(&row_groups[0]); let mut meta = Self::new(&row_groups[0]);
for row_group in row_groups.iter().skip(1) { for row_group in row_groups.iter().skip(1) {
meta = Self::update_with(meta, &row_group); meta = Self::update_with(meta, row_group);
} }
meta meta
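Several error enums in this diff are touched for the same reason: clippy's enum_variant_names lint objects when every variant repeats the enum's own name as a prefix or suffix. Here the read_buffer variants are renamed (ArrowError to ArrowConversion, SchemaError to SchemaConversion, EmptyTableError to EmptyTable), while other modules add #[allow(clippy::enum_variant_names)] instead. A minimal sketch of the pattern being fixed:

// The lint objects to every variant ending in `Error` inside an enum already named
// `Error`; renaming the variants (as above) keeps call sites readable.
#[derive(Debug)]
enum Error {
    ArrowConversion,  // previously `ArrowError`
    SchemaConversion, // previously `SchemaError`
}

fn main() {
    println!("{:?} {:?}", Error::ArrowConversion, Error::SchemaConversion);
}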

View File

@ -1,3 +1,3 @@
[toolchain] [toolchain]
channel = "1.53.0" channel = "1.54"
components = [ "rustfmt", "clippy" ] components = [ "rustfmt", "clippy" ]
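This rust-toolchain bump from 1.53.0 to 1.54 is presumably what drives most of the mechanical changes in the diff: the clippy shipped with 1.54 flags needless_borrow, so calls like f(&x) become f(x) whenever x already coerces to the expected reference type, and Arc::clone(&x) becomes Arc::clone(x) when x is already a reference to the Arc. A tiny hedged illustration with made-up names:

fn takes_str(s: &str) -> usize {
    s.len()
}

fn main() {
    let table_names = vec!["cpu".to_string(), "mem".to_string()];
    for table_name in &table_names {
        // `table_name` is already a `&String`; `takes_str(&table_name)` still compiles
        // via deref coercion, but clippy 1.54 reports it as a needless borrow.
        let _ = takes_str(table_name);
    }

    let shared = std::sync::Arc::new(1);
    let shared_ref = &shared;
    // Same idea for Arc::clone: `shared_ref` is already a `&Arc<i32>`.
    let _clone = std::sync::Arc::clone(shared_ref);
}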

View File

@ -105,7 +105,7 @@ impl Config {
state.reservations.insert(db_name.clone()); state.reservations.insert(db_name.clone());
Ok(DatabaseHandle { Ok(DatabaseHandle {
state: Some(Arc::new(DatabaseState::Known { db_name })), state: Some(Arc::new(DatabaseState::Known { db_name })),
config: &self, config: self,
}) })
} }
@ -139,7 +139,7 @@ impl Config {
state.reservations.insert(db_name); state.reservations.insert(db_name);
Ok(DatabaseHandle { Ok(DatabaseHandle {
state: Some(db_state), state: Some(db_state),
config: &self, config: self,
}) })
} }
@ -167,7 +167,7 @@ impl Config {
state.reservations.insert(db_name.clone()); state.reservations.insert(db_name.clone());
Ok(BlockDatabaseGuard { Ok(BlockDatabaseGuard {
db_name: Some(db_name), db_name: Some(db_name),
config: &self, config: self,
}) })
} }
@ -364,15 +364,15 @@ impl DatabaseState {
fn db_any_state(&self) -> Option<Arc<Db>> { fn db_any_state(&self) -> Option<Arc<Db>> {
match self { match self {
DatabaseState::Replay { db, .. } => Some(Arc::clone(&db)), DatabaseState::Replay { db, .. } => Some(Arc::clone(db)),
DatabaseState::Initialized { db, .. } => Some(Arc::clone(&db)), DatabaseState::Initialized { db, .. } => Some(Arc::clone(db)),
_ => None, _ => None,
} }
} }
fn db_initialized(&self) -> Option<Arc<Db>> { fn db_initialized(&self) -> Option<Arc<Db>> {
match self { match self {
DatabaseState::Initialized { db, .. } => Some(Arc::clone(&db)), DatabaseState::Initialized { db, .. } => Some(Arc::clone(db)),
_ => None, _ => None,
} }
} }
@ -389,7 +389,7 @@ impl DatabaseState {
fn rules(&self) -> Option<Arc<DatabaseRules>> { fn rules(&self) -> Option<Arc<DatabaseRules>> {
match self { match self {
DatabaseState::Known { .. } => None, DatabaseState::Known { .. } => None,
DatabaseState::RulesLoaded { rules, .. } => Some(Arc::clone(&rules)), DatabaseState::RulesLoaded { rules, .. } => Some(Arc::clone(rules)),
DatabaseState::Replay { db, .. } => Some(db.rules()), DatabaseState::Replay { db, .. } => Some(db.rules()),
DatabaseState::Initialized { db, .. } => Some(db.rules()), DatabaseState::Initialized { db, .. } => Some(db.rules()),
} }
@ -445,7 +445,7 @@ pub(crate) struct DatabaseHandle<'a> {
impl<'a> DatabaseHandle<'a> { impl<'a> DatabaseHandle<'a> {
fn state(&self) -> Arc<DatabaseState> { fn state(&self) -> Arc<DatabaseState> {
Arc::clone(&self.state.as_ref().expect("not committed")) Arc::clone(self.state.as_ref().expect("not committed"))
} }
/// Get current [`DatabaseStateCode`] associated with this handle. /// Get current [`DatabaseStateCode`] associated with this handle.
@ -548,7 +548,7 @@ impl<'a> DatabaseHandle<'a> {
exec: Arc::clone(&self.config.exec), exec: Arc::clone(&self.config.exec),
preserved_catalog, preserved_catalog,
catalog, catalog,
rules: Arc::clone(&rules), rules: Arc::clone(rules),
write_buffer, write_buffer,
}; };
let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs))); let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs)));
@ -576,7 +576,7 @@ impl<'a> DatabaseHandle<'a> {
let shutdown = self.config.shutdown.child_token(); let shutdown = self.config.shutdown.child_token();
let shutdown_captured = shutdown.clone(); let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db); let db_captured = Arc::clone(db);
let rules = db.rules(); let rules = db.rules();
let handle = Some(tokio::spawn(async move { let handle = Some(tokio::spawn(async move {
@ -587,7 +587,7 @@ impl<'a> DatabaseHandle<'a> {
})); }));
self.state = Some(Arc::new(DatabaseState::Initialized { self.state = Some(Arc::new(DatabaseState::Initialized {
db: Arc::clone(&db), db: Arc::clone(db),
handle, handle,
shutdown, shutdown,
})); }));

View File

@ -546,7 +546,7 @@ impl Db {
// assume the locks have to possibly live across the `await` // assume the locks have to possibly live across the `await`
let fut = { let fut = {
let partition = self.partition(table_name, partition_key)?; let partition = self.partition(table_name, partition_key)?;
let partition = LockableCatalogPartition::new(Arc::clone(&self), partition); let partition = LockableCatalogPartition::new(Arc::clone(self), partition);
// Do lock dance to get a write lock on the partition as well // Do lock dance to get a write lock on the partition as well
// as on the to-be-dropped chunk. // as on the to-be-dropped chunk.
@ -603,7 +603,7 @@ impl Db {
// assume the locks have to possibly live across the `await` // assume the locks have to possibly live across the `await`
let fut = { let fut = {
let partition = self.partition(table_name, partition_key)?; let partition = self.partition(table_name, partition_key)?;
let partition = LockableCatalogPartition::new(Arc::clone(&self), partition); let partition = LockableCatalogPartition::new(Arc::clone(self), partition);
// Do lock dance to get a write lock on the partition as well // Do lock dance to get a write lock on the partition as well
// as on all of the chunks // as on all of the chunks
@ -637,7 +637,7 @@ impl Db {
// assume the locks have to possibly live across the `await` // assume the locks have to possibly live across the `await`
let fut = { let fut = {
let partition = self.partition(table_name, partition_key)?; let partition = self.partition(table_name, partition_key)?;
let partition = LockableCatalogPartition::new(Arc::clone(&self), partition); let partition = LockableCatalogPartition::new(Arc::clone(self), partition);
let partition = partition.read(); let partition = partition.read();
let chunks = LockablePartition::chunks(&partition); let chunks = LockablePartition::chunks(&partition);
@ -752,7 +752,7 @@ impl Db {
tokio::join!( tokio::join!(
// lifecycle policy loop // lifecycle policy loop
async { async {
let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(&self))); let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(self)));
while !shutdown.is_cancelled() { while !shutdown.is_cancelled() {
self.worker_iterations_lifecycle self.worker_iterations_lifecycle
@ -2908,7 +2908,7 @@ mod tests {
for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) { for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
assert!( assert!(
expected_summary.equal_without_timestamps(&actual_summary), expected_summary.equal_without_timestamps(actual_summary),
"expected:\n{:#?}\n\nactual:{:#?}\n\n", "expected:\n{:#?}\n\nactual:{:#?}\n\n",
expected_summary, expected_summary,
actual_summary actual_summary
@ -3218,7 +3218,7 @@ mod tests {
for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) { for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
assert!( assert!(
expected_summary.equal_without_timestamps(&actual_summary), expected_summary.equal_without_timestamps(actual_summary),
"\n\nexpected item:\n{:#?}\n\nactual item:\n{:#?}\n\n\ "\n\nexpected item:\n{:#?}\n\nactual item:\n{:#?}\n\n\
all expected:\n{:#?}\n\nall actual:\n{:#?}", all expected:\n{:#?}\n\nall actual:\n{:#?}",
expected_summary, expected_summary,
@ -4049,7 +4049,7 @@ mod tests {
object_store object_store
.put( .put(
&path, path,
futures::stream::once(async move { Ok(data) }), futures::stream::once(async move { Ok(data) }),
Some(len), Some(len),
) )
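The "lock dance" comments in this file describe taking a cheap read lock first to locate the partition and chunks, then acquiring the write locks actually needed for the mutation. A much-simplified sketch of that read-then-write pattern with std's RwLock (the real code uses the crate's own lockable catalog types and ordering rules):

use std::sync::RwLock;

// Simplified read-then-write "dance": inspect under a read lock, release it, then
// take the write lock only if there is work to do.
fn drop_even_values(table: &RwLock<Vec<i32>>) {
    let needs_work = {
        let values = table.read().unwrap();
        values.iter().any(|v| v % 2 == 0)
    }; // read lock released here

    if needs_work {
        let mut values = table.write().unwrap();
        values.retain(|v| v % 2 != 0);
    }
}

fn main() {
    let table = RwLock::new(vec![1, 2, 3, 4]);
    drop_even_values(&table);
    assert_eq!(*table.read().unwrap(), vec![1, 3]);
}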

View File

@ -214,7 +214,7 @@ impl Catalog {
}); });
let partition = table.get_or_create_partition(partition_key); let partition = table.get_or_create_partition(partition_key);
(Arc::clone(&partition), table.schema()) (Arc::clone(partition), table.schema())
} }
/// Returns a list of summaries for each partition. /// Returns a list of summaries for each partition.

View File

@ -791,7 +791,7 @@ impl CatalogChunk {
meta, meta,
.. ..
} => { } => {
let meta = Arc::clone(&meta); let meta = Arc::clone(meta);
match &representation { match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) => { ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
// Should always be in RUB once persisted // Should always be in RUB once persisted
@ -804,7 +804,7 @@ impl CatalogChunk {
.fail() .fail()
} }
ChunkStageFrozenRepr::ReadBuffer(repr) => { ChunkStageFrozenRepr::ReadBuffer(repr) => {
let db = Arc::clone(&repr); let db = Arc::clone(repr);
self.finish_lifecycle_action(ChunkLifecycleAction::Persisting)?; self.finish_lifecycle_action(ChunkLifecycleAction::Persisting)?;
self.metrics self.metrics

View File

@ -307,7 +307,7 @@ pub(super) struct TimestampHistogram {
impl TimestampHistogram { impl TimestampHistogram {
pub(super) fn add(&self, summary: &TimestampSummary) { pub(super) fn add(&self, summary: &TimestampSummary) {
self.inner.lock().merge(&summary) self.inner.lock().merge(summary)
} }
} }

View File

@ -27,6 +27,7 @@ use std::{
sync::Arc, sync::Arc,
}; };
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("Mutable Buffer Chunk Error: {}", source))] #[snafu(display("Mutable Buffer Chunk Error: {}", source))]
@ -133,7 +134,7 @@ impl DbChunk {
partition_key, partition_key,
}, },
}; };
(state, Arc::clone(&meta)) (state, Arc::clone(meta))
} }
ChunkStage::Persisted { ChunkStage::Persisted {
parquet, parquet,
@ -148,10 +149,10 @@ impl DbChunk {
} }
} else { } else {
State::ParquetFile { State::ParquetFile {
chunk: Arc::clone(&parquet), chunk: Arc::clone(parquet),
} }
}; };
(state, Arc::clone(&meta)) (state, Arc::clone(meta))
} }
}; };
@ -176,9 +177,9 @@ impl DbChunk {
let (state, meta) = match chunk.stage() { let (state, meta) = match chunk.stage() {
ChunkStage::Persisted { parquet, meta, .. } => { ChunkStage::Persisted { parquet, meta, .. } => {
let chunk = Arc::clone(&parquet); let chunk = Arc::clone(parquet);
let state = State::ParquetFile { chunk }; let state = State::ParquetFile { chunk };
(state, Arc::clone(&meta)) (state, Arc::clone(meta))
} }
_ => { _ => {
panic!("Internal error: This chunk's stage is not Persisted"); panic!("Internal error: This chunk's stage is not Persisted");
@ -270,7 +271,7 @@ impl QueryChunk for DbChunk {
// meta-data only. A future improvement could be to apply this // meta-data only. A future improvement could be to apply this
// logic to chunk meta-data without involving the backing // logic to chunk meta-data without involving the backing
// execution engine. // execution engine.
let rb_predicate = match to_read_buffer_predicate(&predicate) { let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate, Ok(rb_predicate) => rb_predicate,
Err(e) => { Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back"); debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back");
@ -327,7 +328,7 @@ impl QueryChunk for DbChunk {
State::ReadBuffer { chunk, .. } => { State::ReadBuffer { chunk, .. } => {
// Only apply pushdownable predicates // Only apply pushdownable predicates
let rb_predicate = let rb_predicate =
match to_read_buffer_predicate(&predicate).context(PredicateConversion) { match to_read_buffer_predicate(predicate).context(PredicateConversion) {
Ok(predicate) => predicate, Ok(predicate) => predicate,
Err(_) => read_buffer::Predicate::default(), Err(_) => read_buffer::Predicate::default(),
}; };
@ -372,7 +373,7 @@ impl QueryChunk for DbChunk {
Ok(chunk.column_names(columns)) Ok(chunk.column_names(columns))
} }
State::ReadBuffer { chunk, .. } => { State::ReadBuffer { chunk, .. } => {
let rb_predicate = match to_read_buffer_predicate(&predicate) { let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate, Ok(rb_predicate) => rb_predicate,
Err(e) => { Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for column_names, falling back"); debug!(?predicate, %e, "read buffer predicate not supported for column_names, falling back");

View File

@ -5,6 +5,7 @@ use data_types::chunk_metadata::ChunkAddr;
use crate::db::catalog; use crate::db::catalog;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
// Export the snafu "selectors" so they can be used in other modules // Export the snafu "selectors" so they can be used in other modules
#[snafu(visibility = "pub")] #[snafu(visibility = "pub")]

View File

@ -11,6 +11,7 @@ use write_buffer::config::WriteBufferConfig;
use crate::Db; use crate::Db;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("Cannot seek sequencer {} during replay: {}", sequencer_id, source))] #[snafu(display("Cannot seek sequencer {} during replay: {}", sequencer_id, source))]
@ -382,7 +383,7 @@ mod tests {
// start background worker // start background worker
let shutdown: CancellationToken = Default::default(); let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone(); let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db); let db_captured = Arc::clone(db);
let join_handle = tokio::spawn(async move { let join_handle = tokio::spawn(async move {
db_captured.background_worker(shutdown_captured).await db_captured.background_worker(shutdown_captured).await
}); });
@ -480,7 +481,7 @@ mod tests {
) )
} }
Check::Query(query, expected) => { Check::Query(query, expected) => {
let db = Arc::clone(&db); let db = Arc::clone(db);
let planner = SqlQueryPlanner::default(); let planner = SqlQueryPlanner::default();
let executor = db.executor(); let executor = db.executor();

View File

@ -25,6 +25,7 @@ use crate::{
const STORE_ERROR_PAUSE_SECONDS: u64 = 100; const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("cannot load catalog: {}", source))] #[snafu(display("cannot load catalog: {}", source))]

View File

@ -573,7 +573,7 @@ where
server_id: config.server_id(), server_id: config.server_id(),
}) })
} }
ServerStage::Initialized { config, .. } => Ok(Arc::clone(&config)), ServerStage::Initialized { config, .. } => Ok(Arc::clone(config)),
} }
} }
@ -584,7 +584,7 @@ where
ServerStage::Startup { .. } => Err(Error::IdNotSet), ServerStage::Startup { .. } => Err(Error::IdNotSet),
ServerStage::InitReady { config, .. } ServerStage::InitReady { config, .. }
| ServerStage::Initializing { config, .. } | ServerStage::Initializing { config, .. }
| ServerStage::Initialized { config, .. } => Ok(Arc::clone(&config)), | ServerStage::Initialized { config, .. } => Ok(Arc::clone(config)),
} }
} }
@ -866,7 +866,7 @@ where
.await? .await?
} }
None => { None => {
self.write_entry_local(&db_name, db, sharded_entry.entry) self.write_entry_local(db_name, db, sharded_entry.entry)
.await? .await?
} }
} }
@ -931,7 +931,7 @@ where
} }
Ok(remote) => { Ok(remote) => {
return remote return remote
.write_entry(&db_name, entry) .write_entry(db_name, entry)
.await .await
.context(RemoteError) .context(RemoteError)
} }

View File

@ -84,25 +84,25 @@ async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
for table_name in &table_names { for table_name in &table_names {
let chunk = db let chunk = db
.rollover_partition(&table_name, partition_key) .rollover_partition(table_name, partition_key)
.await .await
.unwrap() .unwrap()
.unwrap(); .unwrap();
db.move_chunk_to_read_buffer(&table_name, partition_key, chunk.id()) db.move_chunk_to_read_buffer(table_name, partition_key, chunk.id())
.await .await
.unwrap(); .unwrap();
let chunk = db let chunk = db
.persist_partition( .persist_partition(
&table_name, table_name,
partition_key, partition_key,
Instant::now() + Duration::from_secs(1), Instant::now() + Duration::from_secs(1),
) )
.await .await
.unwrap(); .unwrap();
db.unload_read_buffer(&table_name, partition_key, chunk.id()) db.unload_read_buffer(table_name, partition_key, chunk.id())
.unwrap(); .unwrap();
} }
} }

View File

@ -108,7 +108,7 @@ async fn run_tag_values_query(
predicate: Predicate, predicate: Predicate,
) { ) {
let plan = planner let plan = planner
.tag_values(db, &tag_key, predicate) .tag_values(db, tag_key, predicate)
.expect("built plan successfully"); .expect("built plan successfully");
let names = executor.to_string_set(plan).await.expect( let names = executor.to_string_set(plan).await.expect(
"converted plan to strings "converted plan to strings

View File

@ -18,6 +18,7 @@ mod catalog;
mod chunk; mod chunk;
mod partition; mod partition;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error("Error creating database: {0}")] #[error("Error creating database: {0}")]

View File

@ -9,6 +9,7 @@ use influxdb_iox_client::{
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
use structopt::StructOpt; use structopt::StructOpt;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("Error connection to IOx: {}", source))] #[snafu(display("Error connection to IOx: {}", source))]

View File

@ -9,6 +9,7 @@ use std::convert::TryFrom;
use structopt::StructOpt; use structopt::StructOpt;
use thiserror::Error; use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error("Error listing chunks: {0}")] #[error("Error listing chunks: {0}")]

View File

@ -6,13 +6,14 @@ use influxdb_iox_client::{
connection::Builder, connection::Builder,
management::{ management::{
self, ClosePartitionChunkError, GetPartitionError, ListPartitionChunksError, self, ClosePartitionChunkError, GetPartitionError, ListPartitionChunksError,
ListPartitionsError, NewPartitionChunkError, UnloadPartitionChunkError ListPartitionsError, NewPartitionChunkError, UnloadPartitionChunkError,
}, },
}; };
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use structopt::StructOpt; use structopt::StructOpt;
use thiserror::Error; use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error("Error listing partitions: {0}")] #[error("Error listing partitions: {0}")]

View File

@ -9,6 +9,7 @@ use std::convert::TryInto;
use structopt::StructOpt; use structopt::StructOpt;
use thiserror::Error; use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error("Error connecting to IOx: {0}")] #[error("Error connecting to IOx: {0}")]

View File

@ -7,6 +7,7 @@ use crate::commands::server_remote;
use structopt::StructOpt; use structopt::StructOpt;
use thiserror::Error; use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error("Remote: {0}")] #[error("Remote: {0}")]

View File

@ -4,6 +4,7 @@ use thiserror::Error;
use prettytable::{format, Cell, Row, Table}; use prettytable::{format, Cell, Row, Table};
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error("Error connecting to IOx: {0}")] #[error("Error connecting to IOx: {0}")]

View File

@ -231,7 +231,7 @@ impl Repl {
Some(QueryEngine::Remote(db_name)) => { Some(QueryEngine::Remote(db_name)) => {
info!(%db_name, %sql, "Running sql on remote database"); info!(%db_name, %sql, "Running sql on remote database");
scrape_query(&mut self.flight_client, &db_name, &sql).await? scrape_query(&mut self.flight_client, db_name, &sql).await?
} }
Some(QueryEngine::Observer(observer)) => { Some(QueryEngine::Observer(observer)) => {
info!("Running sql on local observer"); info!("Running sql on local observer");

View File

@ -29,7 +29,7 @@ impl TryInto<ReplCommand> for String {
let raw_commands = self let raw_commands = self
.trim() .trim()
// chop off trailing semicolon // chop off trailing semicolon
.strip_suffix(";") .strip_suffix(';')
.unwrap_or(&self) .unwrap_or(&self)
// tokenize on whitespace // tokenize on whitespace
.split(' ') .split(' ')
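The strip_suffix(";") to strip_suffix(';') change is the form clippy's single_char_pattern lint prefers: a char pattern states a one-character match directly and is marginally cheaper than a one-character string. A tiny example of the same trim-then-tokenize shape:

// Char patterns for single-character trimming and splitting.
fn tokenize(input: &str) -> Vec<&str> {
    input
        .trim()
        .strip_suffix(';') // char pattern rather than ";"
        .unwrap_or(input.trim())
        .split(' ')
        .filter(|t| !t.is_empty())
        .collect()
}

fn main() {
    assert_eq!(tokenize("  show databases;  "), vec!["show", "databases"]);
}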

View File

@ -483,7 +483,7 @@ where
max_request_size, max_request_size,
} = req.data::<Server<M>>().expect("server state"); } = req.data::<Server<M>>().expect("server state");
let max_request_size = *max_request_size; let max_request_size = *max_request_size;
let server = Arc::clone(&server); let server = Arc::clone(server);
// TODO(edd): figure out best way of catching all errors in this observation. // TODO(edd): figure out best way of catching all errors in this observation.
let obs = server.metrics().http_requests.observation(); // instrument request let obs = server.metrics().http_requests.observation(); // instrument request

View File

@ -104,7 +104,7 @@ where
testing::make_server(), testing::make_server(),
storage::make_server( storage::make_server(
Arc::clone(&server), Arc::clone(&server),
Arc::clone(&server.metrics_registry()), Arc::clone(server.metrics_registry()),
serving_gate.clone(), serving_gate.clone(),
), ),
flight::make_server(Arc::clone(&server), serving_gate.clone()), flight::make_server(Arc::clone(&server), serving_gate.clone()),

View File

@ -25,6 +25,7 @@ use std::fmt::Debug;
use super::super::planner::Planner; use super::super::planner::Planner;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))] #[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))]
@ -353,7 +354,7 @@ fn optimize_schema(schema: &Schema) -> Schema {
fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, Error> { fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, Error> {
match array.data_type() { match array.data_type() {
DataType::Dictionary(_, value) => { DataType::Dictionary(_, value) => {
arrow::compute::cast(array, &value).context(DictionaryError) arrow::compute::cast(array, value).context(DictionaryError)
} }
_ => unreachable!("not a dictionary"), _ => unreachable!("not a dictionary"),
} }

View File

@ -863,9 +863,7 @@ where
let predicate = PredicateBuilder::default().set_range(range).build(); let predicate = PredicateBuilder::default().set_range(range).build();
let db_name = db_name.as_ref(); let db_name = db_name.as_ref();
let db = db_store let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
.db(&db_name)
.context(DatabaseNotFound { db_name })?;
let executor = db_store.executor(); let executor = db_store.executor();
let plan = Planner::new(Arc::clone(&executor)) let plan = Planner::new(Arc::clone(&executor))
@ -1080,9 +1078,7 @@ where
let owned_db_name = db_name; let owned_db_name = db_name;
let db_name = owned_db_name.as_str(); let db_name = owned_db_name.as_str();
let db = db_store let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
.db(&db_name)
.context(DatabaseNotFound { db_name })?;
let executor = db_store.executor(); let executor = db_store.executor();
let planner = Planner::new(Arc::clone(&executor)); let planner = Planner::new(Arc::clone(&executor));
@ -1184,7 +1180,7 @@ mod tests {
use futures::prelude::*; use futures::prelude::*;
use generated_types::{ use generated_types::{
aggregate::AggregateType, i_ox_testing_client, node, read_response::frame, storage_client, aggregate::AggregateType, i_ox_testing_client, node, storage_client,
Aggregate as RPCAggregate, Duration as RPCDuration, Node, ReadSource, TestErrorRequest, Aggregate as RPCAggregate, Duration as RPCDuration, Node, ReadSource, TestErrorRequest,
Window as RPCWindow, Window as RPCWindow,
}; };
@ -2578,13 +2574,13 @@ mod tests {
.try_collect() .try_collect()
.await?; .await?;
let data_frames: Vec<frame::Data> = responses let data_frames_count = responses
.into_iter() .into_iter()
.flat_map(|r| r.frames) .flat_map(|r| r.frames)
.flat_map(|f| f.data) .flat_map(|f| f.data)
.collect(); .count();
let s = format!("{} aggregate_frames", data_frames.len()); let s = format!("{} aggregate_frames", data_frames_count);
Ok(vec![s]) Ok(vec![s])
} }
@ -2671,13 +2667,13 @@ mod tests {
.try_collect() .try_collect()
.await?; .await?;
let data_frames: Vec<frame::Data> = responses let data_frames_count = responses
.into_iter() .into_iter()
.flat_map(|r| r.frames) .flat_map(|r| r.frames)
.flat_map(|f| f.data) .flat_map(|f| f.data)
.collect(); .count();
let s = format!("{} frames", data_frames.len()); let s = format!("{} frames", data_frames_count);
Ok(vec![s]) Ok(vec![s])
} }
@ -2696,13 +2692,13 @@ mod tests {
.try_collect() .try_collect()
.await?; .await?;
let data_frames: Vec<frame::Data> = responses let data_frames_count = responses
.into_iter() .into_iter()
.flat_map(|r| r.frames) .flat_map(|r| r.frames)
.flat_map(|f| f.data) .flat_map(|f| f.data)
.collect(); .count();
let s = format!("{} group frames", data_frames.len()); let s = format!("{} group frames", data_frames_count);
Ok(vec![s]) Ok(vec![s])
} }
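The three test helpers above only need the number of frames, so collecting into a Vec<frame::Data> just to call .len() is replaced with counting the iterator directly, which avoids the intermediate allocation (clippy has a needless_collect lint for this shape). A minimal illustration:

// Counting without materializing a Vec, mirroring the refactor above.
fn main() {
    let batches = vec![vec![1, 2], vec![3], vec![4, 5, 6]];

    // Before: collect into a Vec only to ask for its length.
    let flattened: Vec<i32> = batches.iter().flatten().copied().collect();
    let old_count = flattened.len();

    // After: count the iterator directly.
    let new_count = batches.iter().flatten().count();

    assert_eq!(old_count, new_count);
    println!("{} frames", new_count);
}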

View File

@ -241,7 +241,7 @@ impl ServerFixture {
/// Directory used for data storage. /// Directory used for data storage.
pub fn dir(&self) -> &Path { pub fn dir(&self) -> &Path {
&self.server.dir.path() self.server.dir.path()
} }
} }

View File

@ -181,7 +181,7 @@ impl Scenario {
.build() .build()
.unwrap(), .unwrap(),
]; ];
self.write_data(&influxdb2, points).await.unwrap(); self.write_data(influxdb2, points).await.unwrap();
let host_array = StringArray::from(vec![ let host_array = StringArray::from(vec![
Some("server01"), Some("server01"),

View File

@ -93,7 +93,7 @@ impl<F: TryFuture> Future for TrackedFuture<F> {
#[pinned_drop] #[pinned_drop]
impl<F: TryFuture> PinnedDrop for TrackedFuture<F> { impl<F: TryFuture> PinnedDrop for TrackedFuture<F> {
fn drop(self: Pin<&mut Self>) { fn drop(self: Pin<&mut Self>) {
let state: &TrackerState = &self.project().tracker; let state: &TrackerState = self.project().tracker;
let wall_nanos = state.start_instant.elapsed().as_nanos() as usize; let wall_nanos = state.start_instant.elapsed().as_nanos() as usize;

View File

@ -747,7 +747,7 @@ ERROR foo
for range in test_cases { for range in test_cases {
for len in range { for len in range {
let long = std::iter::repeat("X").take(len).collect::<String>(); let long = "X".repeat(len);
let captured = log_test( let captured = log_test(
Builder::new().with_log_filter(&Some("error".to_string())), Builder::new().with_log_filter(&Some("error".to_string())),