fix: Address or allow Clippy warnings new with Rust 1.54

parent edd2cea7a6
commit 9d15798288
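This commit is almost entirely the mechanical removal of redundant `&` borrows that the `clippy::needless_borrow` lint newly reports as of Rust 1.54, plus a few targeted `#[allow]` attributes where a lint is deliberately tolerated. A minimal, self-contained sketch of the pattern (hypothetical names, not code from this repository):

```rust
fn label(s: &str) -> String {
    format!("[{}]", s)
}

fn main() {
    let name: &str = "iox";
    // `name` is already a reference; `&name` creates a `&&str` that the
    // compiler immediately auto-dereferences, so the extra borrow is noise.
    // Rust 1.54's Clippy flags it as `needless_borrow`.
    let flagged = label(&name); // warns: needless borrow
    let fixed = label(name);    // the form this commit switches to
    assert_eq!(flagged, fixed);
}
```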
@@ -64,7 +64,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
 let mut header = Vec::new();
 for field in schema.fields() {
-header.push(Cell::new(&field.name()));
+header.push(Cell::new(field.name()));
 }
 table.set_titles(Row::new(header));

@@ -73,7 +73,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
 let mut cells = Vec::new();
 for col in 0..batch.num_columns() {
 let column = batch.column(col);
-cells.push(Cell::new(&array_value_to_string(&column, row)?));
+cells.push(Cell::new(array_value_to_string(column, row)?));
 }
 table.add_row(Row::new(cells));
 }
@@ -20,7 +20,7 @@ pub fn optimize_dictionaries(batch: &RecordBatch) -> Result<RecordBatch> {
 .iter()
 .zip(schema.fields())
 .map(|(col, field)| match field.data_type() {
-DataType::Dictionary(key, value) => optimize_dict_col(col, &key, &value),
+DataType::Dictionary(key, value) => optimize_dict_col(col, key, value),
 _ => Ok(Arc::clone(col)),
 })
 .collect::<Result<Vec<_>>>()?;

@@ -78,7 +78,7 @@ impl<K: AsPrimitive<usize> + FromPrimitive + Zero> PackedStringArray<K> {
 pub fn iter(&self) -> PackedStringIterator<'_, K> {
 PackedStringIterator {
-array: &self,
+array: self,
 index: 0,
 }
 }

@@ -98,7 +98,7 @@ impl DatabaseRules {
 }
 pub fn db_name(&self) -> &str {
-&self.name.as_str()
+self.name.as_str()
 }
 }

@@ -109,7 +109,7 @@ pub trait Partitioner {
 impl Partitioner for DatabaseRules {
 fn partition_key(&self, line: &ParsedLine<'_>, default_time: i64) -> Result<String> {
-self.partition_key(&line, default_time)
+self.partition_key(line, default_time)
 }
 }
@@ -268,16 +268,16 @@ impl Partitioner for PartitionTemplate {
 .iter()
 .map(|p| match p {
 TemplatePart::Table => line.series.measurement.to_string(),
-TemplatePart::Column(column) => match line.tag_value(&column) {
+TemplatePart::Column(column) => match line.tag_value(column) {
 Some(v) => format!("{}_{}", column, v),
-None => match line.field_value(&column) {
+None => match line.field_value(column) {
 Some(v) => format!("{}_{}", column, v),
 None => "".to_string(),
 },
 },
 TemplatePart::TimeFormat(format) => {
 let nanos = line.timestamp.unwrap_or(default_time);
-Utc.timestamp_nanos(nanos).format(&format).to_string()
+Utc.timestamp_nanos(nanos).format(format).to_string()
 }
 _ => unimplemented!(),
 })
@@ -58,7 +58,7 @@ impl Job {
 Self::CompactChunks { partition, .. } => Some(&partition.db_name),
 Self::PersistChunks { partition, .. } => Some(&partition.db_name),
 Self::DropChunk { chunk, .. } => Some(&chunk.db_name),
-Self::WipePreservedCatalog { db_name, .. } => Some(&db_name),
+Self::WipePreservedCatalog { db_name, .. } => Some(db_name),
 }
 }
@@ -306,7 +306,7 @@ pub fn pb_to_entry(database_batch: &pb::DatabaseBatch) -> Result<Entry> {
 let mut table_batches = Vec::with_capacity(database_batch.table_batches.len());
 for table_batch in &database_batch.table_batches {
-table_batches.push(pb_table_batch_to_fb(&mut fbb, &table_batch)?);
+table_batches.push(pb_table_batch_to_fb(&mut fbb, table_batch)?);
 }
 let partition_key = fbb.create_string("pkey");
 let table_batches = fbb.create_vector(&table_batches);

@@ -406,7 +406,7 @@ fn pb_table_batch_to_fb<'a>(
 for column in &table_batch.columns {
 columns.push(pb_column_to_fb(
 fbb,
-&column,
+column,
 table_batch.row_count as usize,
 )?);
 }
@@ -1571,7 +1571,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::Time(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::I64Values::create(
 fbb,
 &entry_fb::I64ValuesArgs {

@@ -1586,7 +1586,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::I64(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::I64Values::create(
 fbb,
 &entry_fb::I64ValuesArgs {

@@ -1601,7 +1601,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::Bool(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::BoolValues::create(
 fbb,
 &entry_fb::BoolValuesArgs {

@@ -1616,7 +1616,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::F64(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::F64Values::create(
 fbb,
 &entry_fb::F64ValuesArgs {

@@ -1631,7 +1631,7 @@ impl<'a> ColumnBuilder<'a> {
 )
 }
 ColumnRaw::U64(values) => {
-let values = fbb.create_vector(&values);
+let values = fbb.create_vector(values);
 let values = entry_fb::U64Values::create(
 fbb,
 &entry_fb::U64ValuesArgs {
@@ -1802,7 +1802,7 @@ pub mod test_helpers {
 /// Converts the line protocol to a single `Entry` with a single shard and
 /// a single partition.
 pub fn lp_to_entry(lp: &str) -> Entry {
-let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
+let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();
 let default_time = Utc::now().timestamp_nanos();

@@ -1822,7 +1822,7 @@ pub mod test_helpers {
 /// shard and a single partition, which is useful for testing when `lp` is
 /// large. Batches are sized according to LP_BATCH_SIZE.
 pub fn lp_to_entries(lp: &str, partitioner: &impl Partitioner) -> Vec<Entry> {
-let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
+let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();
 let default_time = Utc::now().timestamp_nanos();

@@ -2446,7 +2446,7 @@ mod tests {
 // One point that has no timestamp
 let lp = "a val=1i";
-let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
+let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();
 // Partition on the hour
 let hour_partitioner = hour_partitioner();
@@ -1,6 +1,6 @@
 mod entry;
-#[allow(unused_imports)]
+#[allow(unused_imports, clippy::needless_borrow)]
 mod entry_generated;
 pub use crate::entry::*;
@@ -177,7 +177,7 @@ mod tests {
 let client = Client::new(&mockito::server_url(), token);
-let _result = client.query_suggestions_name(&suggestion_name).await;
+let _result = client.query_suggestions_name(suggestion_name).await;
 mock_server.assert();
 }
@@ -169,7 +169,7 @@ impl PerformQuery {
 message
 .header_as_dictionary_batch()
 .ok_or(Error::CouldNotGetDictionaryBatch)?,
-&schema,
+schema,
 dictionaries_by_field,
 )?;

@@ -184,8 +184,8 @@ impl PerformQuery {
 Ok(Some(flight_data_to_arrow_batch(
 &data,
-Arc::clone(&schema),
-&dictionaries_by_field,
+Arc::clone(schema),
+dictionaries_by_field,
 )?))
 }
 }
@@ -118,9 +118,9 @@ impl QueryOutputFormat {
 /// ```
 pub fn format(&self, batches: &[RecordBatch]) -> Result<String> {
 match self {
-Self::Pretty => batches_to_pretty(&batches),
-Self::Csv => batches_to_csv(&batches),
-Self::Json => batches_to_json(&batches),
+Self::Pretty => batches_to_pretty(batches),
+Self::Csv => batches_to_csv(batches),
+Self::Json => batches_to_json(batches),
 }
 }
 }
@@ -23,7 +23,7 @@ pub fn encode(src: &[i64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
 }
 let mut max: u64 = 0;
-let mut deltas = i64_to_u64_vector(&src);
+let mut deltas = i64_to_u64_vector(src);
 for i in (1..deltas.len()).rev() {
 deltas[i] = zig_zag_encode(deltas[i].wrapping_sub(deltas[i - 1]) as i64);
 if deltas[i] > max {

@@ -152,8 +152,8 @@ pub fn decode(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
 encoding if encoding == Encoding::Uncompressed as u8 => {
 decode_uncompressed(&src[1..], dst) // first byte not used
 }
-encoding if encoding == Encoding::Rle as u8 => decode_rle(&src, dst),
-encoding if encoding == Encoding::Simple8b as u8 => decode_simple8b(&src, dst),
+encoding if encoding == Encoding::Rle as u8 => decode_rle(src, dst),
+encoding if encoding == Encoding::Simple8b as u8 => decode_simple8b(src, dst),
 _ => Err(From::from("invalid block encoding")),
 }
 }
@@ -6,7 +6,7 @@ use std::error::Error;
 /// deltas are further compressed if possible, either via bit-packing using
 /// simple8b or by run-length encoding the deltas if they're all the same.
 pub fn encode(src: &[u64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
-let signed = u64_to_i64_vector(&src);
+let signed = u64_to_i64_vector(src);
 super::integer::encode(&signed, dst)
 }
@@ -468,7 +468,7 @@ impl BlockData {
 }
 // The merged output block data to be returned
-let mut block_data = Self::new_from_data(&blocks.first().unwrap());
+let mut block_data = Self::new_from_data(blocks.first().unwrap());
 // buf will hold the next candidates from each of the sorted input
 // blocks.
@@ -475,7 +475,7 @@ impl Schema {
 // Now, sort lexographically (but put timestamp last)
 primary_keys.sort_by(|(a_column_type, a), (b_column_type, b)| {
 match (a_column_type, b_column_type) {
-(Tag, Tag) => a.name().cmp(&b.name()),
+(Tag, Tag) => a.name().cmp(b.name()),
 (Timestamp, Tag) => Ordering::Greater,
 (Tag, Timestamp) => Ordering::Less,
 (Timestamp, Timestamp) => panic!("multiple timestamps in summary"),
@@ -258,6 +258,12 @@ fn normalize_spans(lines: Vec<String>) -> Vec<String> {
 // Note: we include leading and trailing spaces so that span=2
 // doesn't also match span=21423
 let re = Regex::new(r#" span=(\d+) "#).unwrap();
+// This collect isn't needless: the `fold` below moves `lines`, so this
+// iterator can't borrow `lines`, we need to collect into a `Vec` to
+// stop borrowing `lines`.
+// See https://github.com/rust-lang/rust-clippy/issues/7336
+#[allow(clippy::needless_collect)]
 let span_ids: Vec<String> = lines
 .iter()
 .map(|line| re.find_iter(line))
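The `#[allow(clippy::needless_collect)]` added above is explained by its comment: the intermediate `Vec` exists only to stop borrowing `lines` before `lines` is moved, a case the lint misreports (rust-clippy issue 7336). A standalone sketch of the same shape, with hypothetical names:

```rust
fn tag_lines(lines: Vec<String>) -> Vec<String> {
    // Collecting here is deliberate: it ends the borrow of `lines` so that
    // `lines` can be moved below. A lazy iterator would still borrow `lines`
    // at the `into_iter()` call and the move would not compile.
    #[allow(clippy::needless_collect)]
    let lengths: Vec<usize> = lines.iter().map(|l| l.len()).collect();

    lines
        .into_iter()
        .zip(lengths)
        .map(|(line, len)| format!("{}:{}", len, line))
        .collect()
}

fn main() {
    let out = tag_lines(vec!["a".to_string(), "bb".to_string()]);
    assert_eq!(out, vec!["1:a".to_string(), "2:bb".to_string()]);
}
```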
@@ -569,7 +569,7 @@ impl AggregatableByRange for Vector {
 from_row_id: usize,
 to_row_id: usize,
 ) -> Aggregate {
-Vector::aggregate_by_id_range(&self, agg_type, from_row_id, to_row_id)
+Vector::aggregate_by_id_range(self, agg_type, from_row_id, to_row_id)
 }
 }
@@ -861,7 +861,7 @@ impl Column {
 }
 let now = std::time::Instant::now();
-let v = c.encoded_values(&row_ids);
+let v = c.encoded_values(row_ids);
 debug!("time getting encoded values {:?}", now.elapsed());
 debug!("dictionary {:?}", c.data.dictionary());

@@ -872,14 +872,14 @@ impl Column {
 return Vector::Float(vec![]);
 }
-Vector::Float(c.encoded_values(&row_ids))
+Vector::Float(c.encoded_values(row_ids))
 }
 Column::Integer(c) => {
 if row_ids.is_empty() {
 return Vector::Integer(vec![]);
 }
-Vector::Integer(c.encoded_values(&row_ids))
+Vector::Integer(c.encoded_values(row_ids))
 }
 }
 }

@@ -1328,7 +1328,7 @@ impl AggregatableByRange for &Column {
 from_row_id: usize,
 to_row_id: usize,
 ) -> Aggregate {
-Column::aggregate_by_id_range(&self, agg_type, from_row_id, to_row_id)
+Column::aggregate_by_id_range(self, agg_type, from_row_id, to_row_id)
 }
 }
@@ -305,7 +305,7 @@ impl Segment {
 // we are grouping on and store each group as an iterator.
 let mut group_column_encoded_values = Vec::with_capacity(group_columns.len());
 for group_column in group_columns {
-if let Some(column) = self.column(&group_column) {
+if let Some(column) = self.column(group_column) {
 let encoded_values = if filtered_row_ids_vec.len() == self.meta.rows {
 column.all_encoded_values()
 } else {

@@ -330,7 +330,7 @@ impl Segment {
 for (column_name, _) in aggregates {
 // let column_name: &String = column_name;
-if let Some(column) = self.column(&column_name) {
+if let Some(column) = self.column(column_name) {
 let decoded_values = column.values(&filtered_row_ids_vec);
 assert_eq!(
 filtered_row_ids.cardinality() as usize,

@@ -514,7 +514,7 @@ impl Segment {
 // we are grouping on and store each group as an iterator.
 let mut group_column_encoded_values = Vec::with_capacity(group_columns.len());
 for group_column in group_columns {
-if let Some(column) = self.column(&group_column) {
+if let Some(column) = self.column(group_column) {
 let encoded_values = if filtered_row_ids_vec.len() == self.meta.rows {
 column.all_encoded_values()
 } else {

@@ -536,7 +536,7 @@ impl Segment {
 // aggregating on.
 let mut aggregate_column_decoded_values = Vec::with_capacity(aggregates.len());
 for (column_name, _) in aggregates {
-if let Some(column) = self.column(&column_name) {
+if let Some(column) = self.column(column_name) {
 let decoded_values = column.values(&filtered_row_ids_vec);
 assert_eq!(
 filtered_row_ids.cardinality() as usize,

@@ -657,7 +657,7 @@ impl Segment {
 // we are grouping on and store each group as an iterator.
 let mut group_column_encoded_values = Vec::with_capacity(group_columns.len());
 for group_column in group_columns {
-if let Some(column) = self.column(&group_column) {
+if let Some(column) = self.column(group_column) {
 let encoded_values = if filtered_row_ids_vec.len() == self.meta.rows {
 column.all_encoded_values()
 } else {

@@ -688,7 +688,7 @@ impl Segment {
 aggregate_cols.push((
 column_name.clone(),
 agg_type.clone(),
-self.column(&column_name).unwrap(),
+self.column(column_name).unwrap(),
 ));
 }
@@ -1358,8 +1358,8 @@ impl<'a> Segments<'a> {
 segment.aggregate_by_group_with_hash(
 time_range,
 predicates,
-&group_columns,
-&aggregates,
+group_columns,
+aggregates,
 window,
 );
 info!(
@@ -394,7 +394,7 @@ impl Domain {
 Arc::clone(vacant.insert(gauge))
 }
 };
-Gauge::new(gauge, [&self.default_labels[..], &default_labels].concat())
+Gauge::new(gauge, [&self.default_labels[..], default_labels].concat())
 }
 /// An observer can be used to provide asynchronous fetching of values from an object
@@ -375,7 +375,7 @@ impl Histogram {
 /// A helper method for observing latencies. Returns a new timing instrument
 /// which will handle submitting an observation containing a duration.
 pub fn timer(&self) -> HistogramTimer<'_> {
-HistogramTimer::new(&self)
+HistogramTimer::new(self)
 }
 }
 #[derive(Debug)]
@@ -281,7 +281,7 @@ impl MBChunk {
 );
 if let Some(c) = self.columns.get(column.name()) {
-c.validate_schema(&column).context(ColumnError {
+c.validate_schema(column).context(ColumnError {
 column: column.name(),
 })?;
 }
@@ -445,7 +445,7 @@ mod tests {
 cpu,host=c,env=stage val=11 1
 cpu,host=a,env=prod val=14 2
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let summary = chunk.table_summary();
 assert_eq!(summary.name, "cpu");

@@ -525,7 +525,7 @@ mod tests {
 cpu,host2=a v=1 40
 cpu,host=c v=1 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "host".into(),
 influxdb_type: Some(InfluxDbType::Tag),

@@ -550,7 +550,7 @@ mod tests {
 cpu,host=a val=false 40
 cpu,host=c other_val=2 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),

@@ -574,7 +574,7 @@ mod tests {
 cpu,host=a val=1u 40
 cpu,host=c other_val=2 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),

@@ -598,7 +598,7 @@ mod tests {
 cpu,host=a val=1.0 40
 cpu,host=c other_val=2.0 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),

@@ -622,7 +622,7 @@ mod tests {
 cpu,host=a val=1i 40
 cpu,host=c other_val=2.0 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),

@@ -646,7 +646,7 @@ mod tests {
 cpu,host=a val="v3" 40
 cpu,host=c other_val=2.0 50
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "val".into(),
 influxdb_type: Some(InfluxDbType::Field),

@@ -670,7 +670,7 @@ mod tests {
 cpu,host=a val=4 2
 cpu,host=c val=25 12
 "#;
-let chunk = write_lp_to_new_chunk(&lp).unwrap();
+let chunk = write_lp_to_new_chunk(lp).unwrap();
 let expected = ColumnSummary {
 name: "time".into(),
 influxdb_type: Some(InfluxDbType::Timestamp),

@@ -699,7 +699,7 @@ mod tests {
 cpu,host2=z v=1 40
 cpu,host=c v=1 5
 "#;
-write_lp_to_chunk(&lp2, &mut chunk).unwrap();
+write_lp_to_chunk(lp2, &mut chunk).unwrap();
 let expected = ColumnSummary {
 name: "host".into(),

@@ -823,7 +823,7 @@ mod tests {
 let mut table = write_lp_to_new_chunk(lp).unwrap();
 let lp = "foo t1=\"string\" 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry

@@ -854,7 +854,7 @@ mod tests {
 );
 let lp = "foo iv=1u 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry

@@ -885,7 +885,7 @@ mod tests {
 );
 let lp = "foo fv=1i 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry

@@ -916,7 +916,7 @@ mod tests {
 );
 let lp = "foo bv=1 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry

@@ -947,7 +947,7 @@ mod tests {
 );
 let lp = "foo sv=true 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry

@@ -978,7 +978,7 @@ mod tests {
 );
 let lp = "foo,sv=\"bar\" f=3i 1";
-let entry = lp_to_entry(&lp);
+let entry = lp_to_entry(lp);
 let response = table
 .write_columns(
 entry
@@ -89,7 +89,7 @@ impl ObjectStoreApi for InMemory {
 }
 async fn delete(&self, location: &Self::Path) -> Result<()> {
-self.storage.write().await.remove(&location);
+self.storage.write().await.remove(location);
 Ok(())
 }
@@ -453,7 +453,7 @@ mod tests {
 }
 async fn measure_list(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
-let prefix = place_test_objects(&store, n_entries).await;
+let prefix = place_test_objects(store, n_entries).await;
 let t0 = Instant::now();
 store

@@ -471,7 +471,7 @@ mod tests {
 store: &ThrottledStore<InMemory>,
 n_entries: usize,
 ) -> Duration {
-let prefix = place_test_objects(&store, n_entries).await;
+let prefix = place_test_objects(store, n_entries).await;
 let t0 = Instant::now();
 store.list_with_delimiter(&prefix).await.unwrap();
@@ -303,7 +303,7 @@ where
 }
 pub fn iter(&self) -> PackerIterator<'_, T> {
-PackerIterator::new(&self)
+PackerIterator::new(self)
 }
 // TODO(edd): I don't like these getters. They're only needed so we can
@@ -624,7 +624,7 @@ async fn list_files(
 server_id: ServerId,
 db_name: &str,
 ) -> Result<Vec<(Path, FileType, u64, Uuid)>> {
-let list_path = catalog_path(&object_store, server_id, &db_name);
+let list_path = catalog_path(object_store, server_id, db_name);
 let paths = object_store
 .list(Some(&list_path))
 .await

@@ -658,7 +658,7 @@ async fn store_transaction_proto(
 object_store
 .put(
-&path,
+path,
 futures::stream::once(async move { Ok(data) }),
 Some(len),
 )

@@ -674,7 +674,7 @@ async fn load_transaction_proto(
 path: &Path,
 ) -> Result<proto::Transaction> {
 let data = object_store
-.get(&path)
+.get(path)
 .await
 .context(Read {})?
 .map_ok(|bytes| bytes.to_vec())
@@ -2492,7 +2492,7 @@ mod tests {
 &object_store,
 server_id,
 db_name,
-&tkey,
+tkey,
 FileType::Checkpoint,
 );
 store_transaction_proto(&object_store, &path, &proto)

@@ -2650,7 +2650,7 @@ mod tests {
 object_store
 .put(
-&path,
+path,
 futures::stream::once(async move { Ok(data) }),
 Some(len),
 )

@@ -2661,7 +2661,7 @@ mod tests {
 async fn checked_delete(object_store: &ObjectStore, path: &Path) {
 // issue full GET operation to check if object is preset
 object_store
-.get(&path)
+.get(path)
 .await
 .unwrap()
 .map_ok(|bytes| bytes.to_vec())

@@ -2670,7 +2670,7 @@ mod tests {
 .unwrap();
 // delete it
-object_store.delete(&path).await.unwrap();
+object_store.delete(path).await.unwrap();
 }
 /// Result of [`assert_single_catalog_inmem_works`].

@@ -2713,7 +2713,7 @@ mod tests {
 db_name: &str,
 ) -> TestTrace {
 let (catalog, mut state) = PreservedCatalog::new_empty(
-Arc::clone(&object_store),
+Arc::clone(object_store),
 server_id,
 db_name.to_string(),
 (),

@@ -2991,7 +2991,7 @@ mod tests {
 assert_single_catalog_inmem_works(&object_store, make_server_id(), "db1").await;
 // create junk
-let mut path = catalog_path(&object_store, server_id, &db_name);
+let mut path = catalog_path(&object_store, server_id, db_name);
 path.push_dir("0");
 path.set_file_name(format!("{}.foo", Uuid::new_v4()));
 create_empty_file(&object_store, &path).await;

@@ -3002,7 +3002,7 @@ mod tests {
 .unwrap();
 // check file is still there
-let prefix = catalog_path(&object_store, server_id, &db_name);
+let prefix = catalog_path(&object_store, server_id, db_name);
 let files = object_store
 .list(Some(&prefix))
 .await
@@ -115,7 +115,7 @@ pub async fn delete_files(catalog: &PreservedCatalog, files: &[Path]) -> Result<
 for path in files {
 info!(path = %path.display(), "Delete file");
-store.delete(&path).await.context(WriteError)?;
+store.delete(path).await.context(WriteError)?;
 }
 info!(n_files = files.len(), "Finished deletion, removed files.");

@@ -383,7 +383,7 @@ mod tests {
 object_store
 .put(
-&path,
+path,
 futures::stream::once(async move { Ok(data) }),
 Some(len),
 )
@@ -232,7 +232,7 @@ impl Storage {
 self.object_store
 .put(
-&file_name,
+file_name,
 futures::stream::once(async move { stream_data }),
 Some(len),
 )
@@ -90,7 +90,7 @@ impl ExtensionPlanner for IOxExtensionPlanner {
 let split_expr = planner.create_physical_expr(
 stream_split.split_expr(),
-&logical_inputs[0].schema(),
+logical_inputs[0].schema(),
 &physical_inputs[0].schema(),
 ctx_state,
 )?;

@@ -207,7 +207,7 @@ impl IOxExecutionContext {
 pub fn prepare_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
 debug!(text=%plan.display_indent_schema(), "prepare_plan: initial plan");
-let plan = self.inner.optimize(&plan)?;
+let plan = self.inner.optimize(plan)?;
 trace!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan");
 let physical_plan = self.inner.create_physical_plan(&plan)?;
@@ -100,7 +100,7 @@ impl FieldIndexes {
 }
 pub fn as_slice(&self) -> &[FieldIndex] {
-&self.inner.as_ref()
+self.inner.as_ref()
 }
 }

@@ -138,7 +138,7 @@ impl FieldIndexes {
 column_name: TIME_COLUMN_NAME,
 })?;
-Self::names_to_indexes(schema, &field_names)?
+Self::names_to_indexes(schema, field_names)?
 .into_iter()
 .map(|field_index| FieldIndex {
 value_index: field_index,
@@ -377,7 +377,7 @@ impl SeriesSetConverter {
 ),
 };
-tag_value.map(|tag_value| (Arc::clone(&column_name), Arc::from(tag_value.as_str())))
+tag_value.map(|tag_value| (Arc::clone(column_name), Arc::from(tag_value.as_str())))
 })
 .collect()
 }
@@ -73,7 +73,7 @@ impl UserDefinedLogicalNode for StreamSplitNode {
 /// Schema is the same as the input schema
 fn schema(&self) -> &DFSchemaRef {
-&self.input.schema()
+self.input.schema()
 }
 fn expressions(&self) -> Vec<Expr> {
@@ -662,7 +662,7 @@ impl InfluxRpcPlanner {
 for chunk in chunks {
 // Try and apply the predicate using only metadata
 let pred_result = chunk
-.apply_predicate_to_metadata(&predicate)
+.apply_predicate_to_metadata(predicate)
 .map_err(|e| Box::new(e) as _)
 .context(CheckingChunkPredicate {
 chunk_id: chunk.id(),

@@ -1048,7 +1048,7 @@ impl InfluxRpcPlanner {
 };
 // Group by all tag columns and the window bounds
-let window_bound = make_window_bound_expr(TIME_COLUMN_NAME.as_expr(), &every, &offset)
+let window_bound = make_window_bound_expr(TIME_COLUMN_NAME.as_expr(), every, offset)
 .alias(TIME_COLUMN_NAME);
 let group_exprs = schema

@@ -1517,7 +1517,7 @@ mod tests {
 fn reorder_prefix_err(prefix: &[&str], table_columns: &[&str]) -> String {
 let table_columns = table_columns.to_vec();
-let res = reorder_prefix(&prefix, table_columns);
+let res = reorder_prefix(prefix, table_columns);
 match res {
 Ok(r) => {
@@ -587,7 +587,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 Self::build_sort_plan_for_read_filter(
 Arc::clone(&table_name),
 Arc::clone(&input_schema),
-Arc::clone(&chunk),
+Arc::clone(chunk),
 predicate.clone(),
 &sort_key,
 )

@@ -884,9 +884,9 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 /// │ SortExec │ │ SortExec │
 /// │ (optional) │ │ (optional) │
 /// └─────────────────┘ └─────────────────┘
-/// ▲ ▲
-/// │ ..... │
-/// │ │
+/// ▲ ▲
+/// │ ..... │
+/// │ │
 /// ┌─────────────────┐ ┌─────────────────┐
 /// │IOxReadFilterNode│ │IOxReadFilterNode│
 /// │ (Chunk 1) │ │ (Chunk n) │

@@ -920,7 +920,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 Self::build_plan_for_non_duplicates_chunk(
 Arc::clone(&table_name),
 Arc::clone(&output_schema),
-Arc::clone(&chunk),
+Arc::clone(chunk),
 predicate.clone(),
 output_sort_key,
 )

@@ -951,9 +951,9 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 /// primary key columns
 fn compute_input_schema(output_schema: &Schema, pk_schema: &Schema) -> Arc<Schema> {
 let input_schema = SchemaMerger::new()
-.merge(&output_schema)
+.merge(output_schema)
 .unwrap()
-.merge(&pk_schema)
+.merge(pk_schema)
 .unwrap()
 .build();
 Arc::new(input_schema)
@@ -14,6 +14,7 @@ use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use futures::Stream;
 /// Database schema creation / validation errors.
+#[allow(clippy::enum_variant_names)]
 #[derive(Debug, Snafu)]
 pub enum Error {
 #[snafu(display("Internal error creating SchemaAdapterStream: field '{}' does not appear in the output schema",

@@ -178,7 +179,7 @@ impl SchemaAdapterStream {
 .mappings
 .iter()
 .map(|mapping| match mapping {
-ColumnMapping::FromInput(input_index) => Arc::clone(&batch.column(*input_index)),
+ColumnMapping::FromInput(input_index) => Arc::clone(batch.column(*input_index)),
 ColumnMapping::MakeNull(data_type) => new_null_array(data_type, batch.num_rows()),
 })
 .collect::<Vec<_>>();
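Several hunks in this commit also add `#[allow(clippy::enum_variant_names)]` to existing error enums. That lint fires when an enum's variants all share a common prefix or suffix (often echoing the enum's own name); the `#[allow]` keeps the established variant names rather than renaming them. A hypothetical example of the naming pattern the lint targets:

```rust
// All variants repeat the `Read` prefix from the enum name, which is what
// `clippy::enum_variant_names` complains about; the attribute silences it.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ReadError {
    ReadIo,
    ReadParse,
    ReadTimeout,
}

fn main() {
    let all = vec![ReadError::ReadIo, ReadError::ReadParse, ReadError::ReadTimeout];
    println!("{:?}", all);
}
```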
@@ -284,12 +284,12 @@ impl RecordBatchDeduplicator {
 // Special case when no ranges are duplicated (so just emit input as output)
 if num_dupes == 0 {
 trace!(num_rows = batch.num_rows(), "No dupes");
-Self::slice_record_batch(&batch, 0, ranges.len())
+Self::slice_record_batch(batch, 0, ranges.len())
 } else {
 trace!(num_dupes, num_rows = batch.num_rows(), "dupes");
 // Use take kernel
-let sort_key_indices = self.compute_sort_key_indices(&ranges);
+let sort_key_indices = self.compute_sort_key_indices(ranges);
 let take_options = Some(TakeOptions {
 check_bounds: false,

@@ -309,7 +309,7 @@ impl RecordBatchDeduplicator {
 )
 } else {
 // pick the last non null value
-let field_indices = self.compute_field_indices(&ranges, input_array);
+let field_indices = self.compute_field_indices(ranges, input_array);
 arrow::compute::take(
 input_array.as_ref(),
@@ -75,7 +75,7 @@ where
 {
 trace!(?filter_expr, schema=?chunk.schema(), "creating pruning predicate");
-let pruning_predicate = match PruningPredicate::try_new(&filter_expr, chunk.schema().as_arrow())
+let pruning_predicate = match PruningPredicate::try_new(filter_expr, chunk.schema().as_arrow())
 {
 Ok(p) => p,
 Err(e) => {
@@ -935,7 +935,7 @@ impl DatabaseStore for TestDatabaseStore {
 let mut databases = self.databases.lock();
 if let Some(db) = databases.get(name) {
-Ok(Arc::clone(&db))
+Ok(Arc::clone(db))
 } else {
 let new_db = Arc::new(TestDatabase::new());
 databases.insert(name.to_string(), Arc::clone(&new_db));
@@ -38,7 +38,7 @@ pub fn arrow_pk_sort_exprs(
 ) -> Vec<PhysicalSortExpr> {
 let mut sort_exprs = vec![];
 for key in key_columns {
-let expr = physical_col(key, &input_schema).expect("pk in schema");
+let expr = physical_col(key, input_schema).expect("pk in schema");
 sort_exprs.push(PhysicalSortExpr {
 expr,
 options: SortOptions {

@@ -57,7 +57,7 @@ pub fn arrow_sort_key_exprs(
 ) -> Vec<PhysicalSortExpr> {
 let mut sort_exprs = vec![];
 for (key, options) in sort_key.iter() {
-let expr = physical_col(key, &input_schema).expect("sort key column in schema");
+let expr = physical_col(key, input_schema).expect("sort key column in schema");
 sort_exprs.push(PhysicalSortExpr {
 expr,
 options: SortOptions {
@@ -236,7 +236,7 @@ async fn list_tag_values_field_col() {
 // Test: temp is a field, not a tag
 let tag_name = "temp";
-let plan_result = planner.tag_values(db.as_ref(), &tag_name, predicate.clone());
+let plan_result = planner.tag_values(db.as_ref(), tag_name, predicate.clone());
 assert_eq!(
 plan_result.unwrap_err().to_string(),
@@ -20,6 +20,7 @@ use std::{
 use self::{parse::TestQueries, setup::TestSetup};
 use crate::scenarios::{DbScenario, DbSetup};
+#[allow(clippy::enum_variant_names)]
 #[derive(Debug, Snafu)]
 pub enum Error {
 #[snafu(display("Can not find case file '{:?}': {}", path, source))]

@@ -275,7 +276,7 @@ impl<W: Write> Runner<W> {
 let executor = Arc::new(executor);
 let physical_plan = planner
-.query(db, &sql, executor.as_ref())
+.query(db, sql, executor.as_ref())
 .expect("built plan successfully");
 let results: Vec<RecordBatch> = executor

@@ -7,6 +7,7 @@ use crate::scenarios::{get_all_setups, get_db_setup, DbSetup};
 const IOX_SETUP_NEEDLE: &str = "-- IOX_SETUP: ";
+#[allow(clippy::enum_variant_names)]
 #[derive(Debug, Snafu)]
 pub enum Error {
 #[snafu(display(
@@ -686,10 +686,10 @@ pub(crate) async fn make_one_chunk_rub_scenario(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }

@@ -745,7 +745,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
 }

@@ -758,10 +758,10 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }

@@ -774,15 +774,15 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )

@@ -798,21 +798,20 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
 .await
 .unwrap();
-db.unload_read_buffer(&table_name, partition_key, 1)
-.unwrap();
+db.unload_read_buffer(table_name, partition_key, 1).unwrap();
 }
 let scenario5 = DbScenario {
 scenario_name: "Data in object store only".into(),

@@ -845,7 +844,7 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
 }

@@ -859,10 +858,10 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }

@@ -876,21 +875,21 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
 }
 let table_names = write_lp(&db, data2).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 1)
+db.move_chunk_to_read_buffer(table_name, partition_key, 1)
 .await
 .unwrap();
 }

@@ -903,14 +902,14 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )

@@ -919,14 +918,14 @@ pub async fn make_two_chunk_scenarios(
 }
 let table_names = write_lp(&db, data2).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 2)
+db.move_chunk_to_read_buffer(table_name, partition_key, 2)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )

@@ -942,39 +941,37 @@ pub async fn make_two_chunk_scenarios(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
 .await
 .unwrap();
-db.unload_read_buffer(&table_name, partition_key, 1)
-.unwrap();
+db.unload_read_buffer(table_name, partition_key, 1).unwrap();
 }
 let table_names = write_lp(&db, data2).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 2)
+db.move_chunk_to_read_buffer(table_name, partition_key, 2)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
 .await
 .unwrap();
-db.unload_read_buffer(&table_name, partition_key, 3)
-.unwrap();
+db.unload_read_buffer(table_name, partition_key, 3).unwrap();
 }
 let scenario6 = DbScenario {
 scenario_name: "Data in 2 parquet chunks in object store only".into(),

@@ -986,17 +983,17 @@ pub async fn make_two_chunk_scenarios(
 let table_names = write_lp(&db, data1).await;
 for table_name in &table_names {
 // put chunk 1 into RUB
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }
 let table_names = write_lp(&db, data2).await; // write to MUB
 for table_name in &table_names {
 // compact chunks into a single RUB chunk
-db.compact_partition(&table_name, partition_key)
+db.compact_partition(table_name, partition_key)
 .await
 .unwrap();
 }

@@ -1036,10 +1033,10 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 }

@@ -1052,21 +1049,20 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
 let db = make_db().await.db;
 let table_names = write_lp(&db, data).await;
 for table_name in &table_names {
-db.rollover_partition(&table_name, partition_key)
+db.rollover_partition(table_name, partition_key)
 .await
 .unwrap();
-db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+db.move_chunk_to_read_buffer(table_name, partition_key, 0)
 .await
 .unwrap();
 db.persist_partition(
-&table_name,
+table_name,
 partition_key,
 Instant::now() + Duration::from_secs(1),
 )
 .await
 .unwrap();
-db.unload_read_buffer(&table_name, partition_key, 1)
-.unwrap();
+db.unload_read_buffer(table_name, partition_key, 1).unwrap();
 }
 let scenario2 = DbScenario {
 scenario_name: "--------------------- Data in object store only ".into(),
@@ -99,7 +99,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -121,7 +121,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -148,7 +148,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -170,7 +170,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -197,7 +197,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -219,7 +219,7 @@ fn benchmark_plain_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );
@@ -29,7 +29,7 @@ fn read_group_predicate_all_time(c: &mut Criterion, row_group: &RowGroup, rng: &
 benchmark_read_group_vary_cardinality(
 c,
 "row_group_read_group_all_time_vary_cardinality",
-&row_group,
+row_group,
 &time_pred,
 // grouping columns and expected cardinality
 vec![

@@ -47,7 +47,7 @@ fn read_group_predicate_all_time(c: &mut Criterion, row_group: &RowGroup, rng: &
 benchmark_read_group_vary_group_cols(
 c,
 "row_group_read_group_all_time_vary_columns",
-&row_group,
+row_group,
 &time_pred,
 // number of cols to group on and expected cardinality
 vec![

@@ -82,7 +82,7 @@ fn read_group_pre_computed_groups(c: &mut Criterion, row_group: &RowGroup, rng:
 benchmark_read_group_vary_cardinality(
 c,
 "row_group_read_group_pre_computed_groups_vary_cardinality",
-&row_group,
+row_group,
 &Predicate::default(),
 // grouping columns and expected cardinality
 vec![

@@ -99,7 +99,7 @@ fn read_group_pre_computed_groups(c: &mut Criterion, row_group: &RowGroup, rng:
 benchmark_read_group_vary_group_cols(
 c,
 "row_group_read_group_pre_computed_groups_vary_columns",
-&row_group,
+row_group,
 &Predicate::default(),
 // number of cols to group on and expected cardinality
 vec![
@@ -99,7 +99,7 @@ fn benchmark_none_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -121,7 +121,7 @@ fn benchmark_none_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -148,7 +148,7 @@ fn benchmark_none_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -170,7 +170,7 @@ fn benchmark_none_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -197,7 +197,7 @@ fn benchmark_none_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );

@@ -219,7 +219,7 @@ fn benchmark_none_sum(
 |b, input| {
 b.iter(|| {
 // do work
-let _ = encoding.sum(&input);
+let _ = encoding.sum(input);
 });
 },
 );
@@ -130,7 +130,7 @@ impl Chunk {
 /// caller does not need to be concerned about the size of the update.
 pub fn upsert_table(&mut self, table_data: RecordBatch) {
 let table_name = self.table.name();
-let row_group = record_batch_to_row_group_with_logging(&table_name, table_data);
+let row_group = record_batch_to_row_group_with_logging(table_name, table_data);
 self.upsert_table_with_row_group(row_group)
 }
@@ -390,7 +390,7 @@ impl Column {
 ) -> RowIDsOption {
 // If we can get an answer using only the meta-data on the column then
 // return that answer.
-match self.evaluate_predicate_on_meta(&op, &value) {
+match self.evaluate_predicate_on_meta(op, value) {
 PredicateMatch::None => return RowIDsOption::None(dst),
 PredicateMatch::All => return RowIDsOption::All(dst),
 PredicateMatch::SomeMaybe => {} // have to apply predicate to column

@@ -482,7 +482,7 @@ impl Column {
 // When the predicate is == and the metadata range indicates the column
 // can't contain `value` then the column doesn't need to be read.
 cmp::Operator::Equal => {
-if !self.might_contain_value(&value) {
+if !self.might_contain_value(value) {
 return PredicateMatch::None; // no rows are going to match.
 }
 }

@@ -491,7 +491,7 @@ impl Column {
 // contain any null values, and the entire range of values satisfies the
 // predicate then the column doesn't need to be read.
 cmp::Operator::GT | cmp::Operator::GTE | cmp::Operator::LT | cmp::Operator::LTE => {
-if self.predicate_matches_all_values(&op, &value) {
+if self.predicate_matches_all_values(op, value) {
 return PredicateMatch::All;
 }
 }

@@ -500,13 +500,13 @@ impl Column {
 // column can't possibly contain `value` then the predicate must
 // match all rows on the column.
 cmp::Operator::NotEqual => {
-if !self.might_contain_value(&value) {
+if !self.might_contain_value(value) {
 return PredicateMatch::All; // all rows are going to match.
 }
 }
 }
-if self.predicate_matches_no_values(&op, &value) {
+if self.predicate_matches_no_values(op, value) {
 return PredicateMatch::None;
 }
@@ -210,7 +210,7 @@ impl Bool {
 pub fn row_ids_filter(&self, value: bool, op: &cmp::Operator, dst: RowIDs) -> RowIDs {
 match op {
 cmp::Operator::GT | cmp::Operator::GTE | cmp::Operator::LT | cmp::Operator::LTE => {
-self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst)
+self.row_ids_cmp_order(value, Self::ord_from_op(op), dst)
 }
 _ => self.row_ids_equal(value, op, dst),
 }

@@ -398,8 +398,8 @@ where
 | (cmp::Operator::LT, cmp::Operator::GTE)
 | (cmp::Operator::LTE, cmp::Operator::GT)
 | (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range_order(
-(&left.0, Self::ord_from_op(&left.1)),
-(&right.0, Self::ord_from_op(&right.1)),
+(&left.0, Self::ord_from_op(left.1)),
+(&right.0, Self::ord_from_op(right.1)),
 dst,
 ),

@@ -412,10 +412,10 @@ where
 fn row_ids_filter(&self, value: L, op: &cmp::Operator, dst: RowIDs) -> RowIDs {
 let value = self.transcoder.encode(value);
 match op {
-cmp::Operator::GT => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst),
-cmp::Operator::GTE => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst),
-cmp::Operator::LT => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst),
-cmp::Operator::LTE => self.row_ids_cmp_order(value, Self::ord_from_op(&op), dst),
+cmp::Operator::GT => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
+cmp::Operator::GTE => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
+cmp::Operator::LT => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
+cmp::Operator::LTE => self.row_ids_cmp_order(value, Self::ord_from_op(op), dst),
 _ => self.row_ids_equal(value, op, dst),
 }
 }

@@ -438,8 +438,8 @@ where
 | (cmp::Operator::LT, cmp::Operator::GTE)
 | (cmp::Operator::LTE, cmp::Operator::GT)
 | (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range_order(
-(left.0, Self::ord_from_op(&left.1)),
-(right.0, Self::ord_from_op(&right.1)),
+(left.0, Self::ord_from_op(left.1)),
+(right.0, Self::ord_from_op(right.1)),
 dst,
 ),

@@ -400,8 +400,8 @@ where
 | (cmp::Operator::LT, cmp::Operator::GTE)
 | (cmp::Operator::LTE, cmp::Operator::GT)
 | (cmp::Operator::LTE, cmp::Operator::GTE) => self.row_ids_cmp_range(
-(&left.0, Self::ord_from_op(&left.1)),
-(&right.0, Self::ord_from_op(&right.1)),
+(&left.0, Self::ord_from_op(left.1)),
+(&right.0, Self::ord_from_op(right.1)),
 dst,
 ),

@ -430,7 +430,7 @@ mod test {
enc.push_none(); // 9
enc.push_additional(Some("south".to_string()), 2); // 10, 11
let ids = enc.row_ids_filter(&"east", &cmp::Operator::Equal, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::Equal, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![0, 1, 2, 4, 5, 6, 7, 8]),

@ -438,14 +438,14 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"south", &cmp::Operator::Equal, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("south", &cmp::Operator::Equal, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![10, 11]), "{}", name);
let ids = enc.row_ids_filter(&"foo", &cmp::Operator::Equal, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("foo", &cmp::Operator::Equal, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
// != some value not in the column should exclude the NULL value.
let ids = enc.row_ids_filter(&"foo", &cmp::Operator::NotEqual, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("foo", &cmp::Operator::NotEqual, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11]),

@ -453,7 +453,7 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::NotEqual, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::NotEqual, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![3, 10, 11]), "{}", name);
}

@ -474,7 +474,7 @@ mod test {
enc.push_additional(Some("east".to_string()), 2);
enc.push_additional(Some("west".to_string()), 1);
let ids = enc.row_ids_filter(&"abba", &cmp::Operator::NotEqual, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abba", &cmp::Operator::NotEqual, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 1, 2]), "{}", name);
}

@ -502,7 +502,7 @@ mod test {
enc.push_none(); // 13
enc.push_additional(Some("west".to_string()), 5); // 14, 15, 16, 17, 18
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![0, 1, 2, 4, 5, 6, 7, 8]),

@ -510,24 +510,24 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abc", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::LT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abc", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("zoo", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::GT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("zoo", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::GT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("west", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"north", &cmp::Operator::GT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("north", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![9, 10, 11, 14, 15, 16, 17, 18]),

@ -535,7 +535,7 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"north", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("north", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![3, 9, 10, 11, 12, 14, 15, 16, 17, 18]),

@ -545,7 +545,7 @@ mod test {
// The encoding also supports comparisons on values that don't directly exist in
// the column.
let ids = enc.row_ids_filter(&"abba", &cmp::Operator::GT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abba", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![

@ -555,7 +555,7 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"east1", &cmp::Operator::GT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east1", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![3, 9, 10, 11, 12, 14, 15, 16, 17, 18]),

@ -563,7 +563,7 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"east1", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east1", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![3, 9, 10, 11, 12, 14, 15, 16, 17, 18]),

@ -571,7 +571,7 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"east1", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east1", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![0, 1, 2, 4, 5, 6, 7, 8]),

@ -579,7 +579,7 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"region", &cmp::Operator::LT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("region", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 12]),

@ -587,7 +587,7 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"region", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("region", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 12]),

@ -595,7 +595,7 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::LT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("zoo", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![

@ -605,7 +605,7 @@ mod test {
name
);
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("zoo", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(
ids,
RowIDs::Vector(vec![

@ -627,34 +627,34 @@ mod test {
let name = enc.debug_name();
enc.push("east".to_string());
let ids = enc.row_ids_filter(&"east", &cmp::Operator::GT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"north", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("north", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abc", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::GT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abc", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abc", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("west", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::LT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("west", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0]), "{}", name);
}
}

@ -677,34 +677,34 @@ mod test {
enc.push_none();
enc.push("east".to_string());
let ids = enc.row_ids_filter(&"west", &cmp::Operator::GT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("west", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"zoo", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("zoo", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abc", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert!(ids.is_empty(), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::GT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abc", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 1, 3]), "{}", name);
let ids = enc.row_ids_filter(&"abc", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("abc", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 1, 3]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("west", &cmp::Operator::GTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![1]), "{}", name);
let ids = enc.row_ids_filter(&"east", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("east", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![3]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("west", &cmp::Operator::LTE, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 1, 3]), "{}", name);
let ids = enc.row_ids_filter(&"west", &cmp::Operator::LT, RowIDs::Vector(vec![]));
let ids = enc.row_ids_filter("west", &cmp::Operator::LT, RowIDs::Vector(vec![]));
assert_eq!(ids, RowIDs::Vector(vec![0, 3]), "{}", name);
}
}

@ -809,12 +809,7 @@ mod test {
enc.push_none();
enc.push_additional(Some("zoo".to_string()), 1);
assert_eq!(
enc.dictionary(),
vec![&"east".to_string(), &"west".to_string(), &"zoo".to_string()],
"{}",
name
);
assert_eq!(enc.dictionary(), vec!["east", "west", "zoo"], "{}", name);
}

#[test]

@ -37,10 +37,10 @@ pub const TIME_COLUMN_NAME: &str = internal_types::schema::TIME_COLUMN_NAME;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("arrow conversion error: {}", source))]
ArrowError { source: arrow::error::ArrowError },
ArrowConversion { source: arrow::error::ArrowError },
#[snafu(display("schema conversion error: {}", source))]
SchemaError {
SchemaConversion {
source: internal_types::schema::builder::Error,
},
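The variant renames just above (ArrowError to ArrowConversion, SchemaError to SchemaConversion) address clippy::enum_variant_names, which objects when every variant of an enum repeats the enum's own name as a suffix. A hedged sketch of the shape of the fix; the source field types here are stand-ins, not the real arrow/internal_types errors:

use snafu::Snafu;

#[derive(Debug, Snafu)]
pub enum Error {
    // Before the rename these were `ArrowError` and `SchemaError`, so every
    // variant of an enum already called `Error` ended in "Error" and the
    // lint fired. Dropping the suffix (and renaming the generated snafu
    // context selectors at the call sites) silences it.
    #[snafu(display("arrow conversion error: {}", source))]
    ArrowConversion { source: std::io::Error },

    #[snafu(display("schema conversion error: {}", source))]
    SchemaConversion { source: std::num::ParseIntError },
}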

@ -256,7 +256,7 @@ impl RowGroup {
columns: &[ColumnName<'_>],
predicates: &Predicate,
) -> ReadFilterResult<'_> {
let select_columns = self.meta.schema_for_column_names(&columns);
let select_columns = self.meta.schema_for_column_names(columns);
assert_eq!(select_columns.len(), columns.len());
let schema = ResultSchema {

@ -540,11 +540,11 @@ impl RowGroup {
// single 128-bit integer as the group key. If grouping is on more than
// four columns then a fallback to using an vector as a key will happen.
if dst.schema.group_columns.len() <= 4 {
self.read_group_hash_with_u128_key(dst, &groupby_encoded_ids, aggregate_columns_data);
self.read_group_hash_with_u128_key(dst, groupby_encoded_ids, aggregate_columns_data);
return;
}
self.read_group_hash_with_vec_key(dst, &groupby_encoded_ids, aggregate_columns_data);
self.read_group_hash_with_vec_key(dst, groupby_encoded_ids, aggregate_columns_data);
}
// This function is used with `read_group_hash` when the number of columns

@ -1709,7 +1709,7 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
fn try_from(result: ReadFilterResult<'_>) -> Result<Self, Self::Error> {
let schema = internal_types::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaError { source })?;
.map_err(|source| Error::SchemaConversion { source })?;
let columns: Vec<ArrayRef> = result
.data

@ -1754,7 +1754,7 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
// try_new only returns an error if the schema is invalid or the number
// of rows on columns differ. We have full control over both so there
// should never be an error to return...
Self::try_new(arrow_schema, columns).map_err(|source| Error::ArrowError { source })
Self::try_new(arrow_schema, columns).map_err(|source| Error::ArrowConversion { source })
}
}

@ -2185,7 +2185,7 @@ impl TryFrom<ReadAggregateResult<'_>> for RecordBatch {
fn try_from(mut result: ReadAggregateResult<'_>) -> Result<Self, Self::Error> {
let schema = internal_types::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaError { source })?;
.map_err(|source| Error::SchemaConversion { source })?;
let arrow_schema: arrow::datatypes::SchemaRef = schema.into();
// Add the group columns to the set of column data for the record batch.

@ -2257,7 +2257,7 @@ impl TryFrom<ReadAggregateResult<'_>> for RecordBatch {
// try_new only returns an error if the schema is invalid or the number
// of rows on columns differ. We have full control over both so there
// should never be an error to return...
Self::try_new(arrow_schema, columns).context(ArrowError)
Self::try_new(arrow_schema, columns).context(ArrowConversion)
}
}

@ -19,10 +19,10 @@ use std::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("cannot drop last row group in table; drop table"))]
EmptyTableError {},
EmptyTable {},
#[snafu(display("table does not have InfluxDB timestamp column"))]
NoTimestampColumnError {},
NoTimestampColumn {},
#[snafu(display("unsupported column operation on {}: {}", column_name, msg))]
UnsupportedColumnOperation { msg: String, column_name: String },

@ -114,7 +114,7 @@ impl Table {
let mut row_groups = self.table_data.write();
// Tables must always have at least one row group.
ensure!(row_groups.data.len() > 1, EmptyTableError);
ensure!(row_groups.data.len() > 1, EmptyTable);
row_groups.data.remove(position); // removes row group data
row_groups.meta = Arc::new(MetaData::from(row_groups.data.as_ref())); // rebuild meta

@ -261,7 +261,7 @@ impl Table {
}
// row group could potentially satisfy predicate
row_groups.push(Arc::clone(&rg));
row_groups.push(Arc::clone(rg));
}
(Arc::clone(&table_data.meta), row_groups)

@ -776,7 +776,7 @@ impl From<&[Arc<RowGroup>]> for MetaData {
let mut meta = Self::new(&row_groups[0]);
for row_group in row_groups.iter().skip(1) {
meta = Self::update_with(meta, &row_group);
meta = Self::update_with(meta, row_group);
}
meta

@ -105,7 +105,7 @@ impl Config {
state.reservations.insert(db_name.clone());
Ok(DatabaseHandle {
state: Some(Arc::new(DatabaseState::Known { db_name })),
config: &self,
config: self,
})
}

@ -139,7 +139,7 @@ impl Config {
state.reservations.insert(db_name);
Ok(DatabaseHandle {
state: Some(db_state),
config: &self,
config: self,
})
}

@ -167,7 +167,7 @@ impl Config {
state.reservations.insert(db_name.clone());
Ok(BlockDatabaseGuard {
db_name: Some(db_name),
config: &self,
config: self,
})
}

@ -364,15 +364,15 @@ impl DatabaseState {
fn db_any_state(&self) -> Option<Arc<Db>> {
match self {
DatabaseState::Replay { db, .. } => Some(Arc::clone(&db)),
DatabaseState::Initialized { db, .. } => Some(Arc::clone(&db)),
DatabaseState::Replay { db, .. } => Some(Arc::clone(db)),
DatabaseState::Initialized { db, .. } => Some(Arc::clone(db)),
_ => None,
}
}
fn db_initialized(&self) -> Option<Arc<Db>> {
match self {
DatabaseState::Initialized { db, .. } => Some(Arc::clone(&db)),
DatabaseState::Initialized { db, .. } => Some(Arc::clone(db)),
_ => None,
}
}

@ -389,7 +389,7 @@ impl DatabaseState {
fn rules(&self) -> Option<Arc<DatabaseRules>> {
match self {
DatabaseState::Known { .. } => None,
DatabaseState::RulesLoaded { rules, .. } => Some(Arc::clone(&rules)),
DatabaseState::RulesLoaded { rules, .. } => Some(Arc::clone(rules)),
DatabaseState::Replay { db, .. } => Some(db.rules()),
DatabaseState::Initialized { db, .. } => Some(db.rules()),
}

@ -445,7 +445,7 @@ pub(crate) struct DatabaseHandle<'a> {
impl<'a> DatabaseHandle<'a> {
fn state(&self) -> Arc<DatabaseState> {
Arc::clone(&self.state.as_ref().expect("not committed"))
Arc::clone(self.state.as_ref().expect("not committed"))
}
/// Get current [`DatabaseStateCode`] associated with this handle.
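The Arc::clone(&db) to Arc::clone(db) changes in these hunks are the same needless_borrow fix applied to match bindings: when matching on &self, a binding such as db already has type &Arc<Db>, which is exactly what Arc::clone takes. A small self-contained sketch with a simplified DatabaseState:

use std::sync::Arc;

struct Db;

#[allow(dead_code)]
enum DatabaseState {
    Known,
    Initialized { db: Arc<Db> },
}

impl DatabaseState {
    fn db_initialized(&self) -> Option<Arc<Db>> {
        match self {
            // `db` binds as `&Arc<Db>` because we match on `&self`, so
            // `Arc::clone(db)` is enough; `Arc::clone(&db)` would build a
            // `&&Arc<Db>` and trip clippy::needless_borrow.
            DatabaseState::Initialized { db } => Some(Arc::clone(db)),
            _ => None,
        }
    }
}

fn main() {
    let state = DatabaseState::Initialized { db: Arc::new(Db) };
    assert!(state.db_initialized().is_some());
}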

@ -548,7 +548,7 @@ impl<'a> DatabaseHandle<'a> {
exec: Arc::clone(&self.config.exec),
preserved_catalog,
catalog,
rules: Arc::clone(&rules),
rules: Arc::clone(rules),
write_buffer,
};
let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs)));

@ -576,7 +576,7 @@ impl<'a> DatabaseHandle<'a> {
let shutdown = self.config.shutdown.child_token();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let db_captured = Arc::clone(db);
let rules = db.rules();
let handle = Some(tokio::spawn(async move {

@ -587,7 +587,7 @@ impl<'a> DatabaseHandle<'a> {
}));
self.state = Some(Arc::new(DatabaseState::Initialized {
db: Arc::clone(&db),
db: Arc::clone(db),
handle,
shutdown,
}));

@ -546,7 +546,7 @@ impl Db {
// assume the locks have to possibly live across the `await`
let fut = {
let partition = self.partition(table_name, partition_key)?;
let partition = LockableCatalogPartition::new(Arc::clone(&self), partition);
let partition = LockableCatalogPartition::new(Arc::clone(self), partition);
// Do lock dance to get a write lock on the partition as well
// as on the to-be-dropped chunk.

@ -603,7 +603,7 @@ impl Db {
// assume the locks have to possibly live across the `await`
let fut = {
let partition = self.partition(table_name, partition_key)?;
let partition = LockableCatalogPartition::new(Arc::clone(&self), partition);
let partition = LockableCatalogPartition::new(Arc::clone(self), partition);
// Do lock dance to get a write lock on the partition as well
// as on all of the chunks

@ -637,7 +637,7 @@ impl Db {
// assume the locks have to possibly live across the `await`
let fut = {
let partition = self.partition(table_name, partition_key)?;
let partition = LockableCatalogPartition::new(Arc::clone(&self), partition);
let partition = LockableCatalogPartition::new(Arc::clone(self), partition);
let partition = partition.read();
let chunks = LockablePartition::chunks(&partition);

@ -752,7 +752,7 @@ impl Db {
tokio::join!(
// lifecycle policy loop
async {
let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(&self)));
let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(self)));
while !shutdown.is_cancelled() {
self.worker_iterations_lifecycle

@ -2908,7 +2908,7 @@ mod tests {
for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
assert!(
expected_summary.equal_without_timestamps(&actual_summary),
expected_summary.equal_without_timestamps(actual_summary),
"expected:\n{:#?}\n\nactual:{:#?}\n\n",
expected_summary,
actual_summary

@ -3218,7 +3218,7 @@ mod tests {
for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
assert!(
expected_summary.equal_without_timestamps(&actual_summary),
expected_summary.equal_without_timestamps(actual_summary),
"\n\nexpected item:\n{:#?}\n\nactual item:\n{:#?}\n\n\
all expected:\n{:#?}\n\nall actual:\n{:#?}",
expected_summary,

@ -4049,7 +4049,7 @@ mod tests {
object_store
.put(
&path,
path,
futures::stream::once(async move { Ok(data) }),
Some(len),
)

@ -214,7 +214,7 @@ impl Catalog {
});
let partition = table.get_or_create_partition(partition_key);
(Arc::clone(&partition), table.schema())
(Arc::clone(partition), table.schema())
}
/// Returns a list of summaries for each partition.

@ -791,7 +791,7 @@ impl CatalogChunk {
meta,
..
} => {
let meta = Arc::clone(&meta);
let meta = Arc::clone(meta);
match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
// Should always be in RUB once persisted

@ -804,7 +804,7 @@ impl CatalogChunk {
.fail()
}
ChunkStageFrozenRepr::ReadBuffer(repr) => {
let db = Arc::clone(&repr);
let db = Arc::clone(repr);
self.finish_lifecycle_action(ChunkLifecycleAction::Persisting)?;
self.metrics

@ -307,7 +307,7 @@ pub(super) struct TimestampHistogram {
impl TimestampHistogram {
pub(super) fn add(&self, summary: &TimestampSummary) {
self.inner.lock().merge(&summary)
self.inner.lock().merge(summary)
}
}

@ -27,6 +27,7 @@ use std::{
sync::Arc,
};
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Mutable Buffer Chunk Error: {}", source))]
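Not every offending error enum was renamed; here, and in the CLI modules further down, the commit keeps the established variant names and opts out of clippy::enum_variant_names at the type instead. Roughly (the variant names and payloads below are illustrative, not the real ones):

#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
pub enum Error {
    ChunkError(String),
    PartitionError(String),
    TableError(String),
}

fn main() {
    // The names stay as they were; only the lint is silenced locally.
    let e = Error::ChunkError("boom".to_string());
    println!("{:?}", e);
}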

@ -133,7 +134,7 @@ impl DbChunk {
partition_key,
},
};
(state, Arc::clone(&meta))
(state, Arc::clone(meta))
}
ChunkStage::Persisted {
parquet,

@ -148,10 +149,10 @@ impl DbChunk {
}
} else {
State::ParquetFile {
chunk: Arc::clone(&parquet),
chunk: Arc::clone(parquet),
}
};
(state, Arc::clone(&meta))
(state, Arc::clone(meta))
}
};

@ -176,9 +177,9 @@ impl DbChunk {
let (state, meta) = match chunk.stage() {
ChunkStage::Persisted { parquet, meta, .. } => {
let chunk = Arc::clone(&parquet);
let chunk = Arc::clone(parquet);
let state = State::ParquetFile { chunk };
(state, Arc::clone(&meta))
(state, Arc::clone(meta))
}
_ => {
panic!("Internal error: This chunk's stage is not Persisted");

@ -270,7 +271,7 @@ impl QueryChunk for DbChunk {
// meta-data only. A future improvement could be to apply this
// logic to chunk meta-data without involving the backing
// execution engine.
let rb_predicate = match to_read_buffer_predicate(&predicate) {
let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back");

@ -327,7 +328,7 @@ impl QueryChunk for DbChunk {
State::ReadBuffer { chunk, .. } => {
// Only apply pushdownable predicates
let rb_predicate =
match to_read_buffer_predicate(&predicate).context(PredicateConversion) {
match to_read_buffer_predicate(predicate).context(PredicateConversion) {
Ok(predicate) => predicate,
Err(_) => read_buffer::Predicate::default(),
};

@ -372,7 +373,7 @@ impl QueryChunk for DbChunk {
Ok(chunk.column_names(columns))
}
State::ReadBuffer { chunk, .. } => {
let rb_predicate = match to_read_buffer_predicate(&predicate) {
let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for column_names, falling back");

@ -5,6 +5,7 @@ use data_types::chunk_metadata::ChunkAddr;
use crate::db::catalog;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
// Export the snafu "selectors" so they can be used in other modules
#[snafu(visibility = "pub")]

@ -11,6 +11,7 @@ use write_buffer::config::WriteBufferConfig;
use crate::Db;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Cannot seek sequencer {} during replay: {}", sequencer_id, source))]

@ -382,7 +383,7 @@ mod tests {
// start background worker
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let db_captured = Arc::clone(db);
let join_handle = tokio::spawn(async move {
db_captured.background_worker(shutdown_captured).await
});

@ -480,7 +481,7 @@ mod tests {
)
}
Check::Query(query, expected) => {
let db = Arc::clone(&db);
let db = Arc::clone(db);
let planner = SqlQueryPlanner::default();
let executor = db.executor();

@ -25,6 +25,7 @@ use crate::{
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("cannot load catalog: {}", source))]

@ -573,7 +573,7 @@ where
server_id: config.server_id(),
})
}
ServerStage::Initialized { config, .. } => Ok(Arc::clone(&config)),
ServerStage::Initialized { config, .. } => Ok(Arc::clone(config)),
}
}

@ -584,7 +584,7 @@ where
ServerStage::Startup { .. } => Err(Error::IdNotSet),
ServerStage::InitReady { config, .. }
| ServerStage::Initializing { config, .. }
| ServerStage::Initialized { config, .. } => Ok(Arc::clone(&config)),
| ServerStage::Initialized { config, .. } => Ok(Arc::clone(config)),
}
}

@ -866,7 +866,7 @@ where
.await?
}
None => {
self.write_entry_local(&db_name, db, sharded_entry.entry)
self.write_entry_local(db_name, db, sharded_entry.entry)
.await?
}
}

@ -931,7 +931,7 @@ where
}
Ok(remote) => {
return remote
.write_entry(&db_name, entry)
.write_entry(db_name, entry)
.await
.context(RemoteError)
}

@ -84,25 +84,25 @@ async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
for table_name in &table_names {
let chunk = db
.rollover_partition(&table_name, partition_key)
.rollover_partition(table_name, partition_key)
.await
.unwrap()
.unwrap();
db.move_chunk_to_read_buffer(&table_name, partition_key, chunk.id())
db.move_chunk_to_read_buffer(table_name, partition_key, chunk.id())
.await
.unwrap();
let chunk = db
.persist_partition(
&table_name,
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.unload_read_buffer(&table_name, partition_key, chunk.id())
db.unload_read_buffer(table_name, partition_key, chunk.id())
.unwrap();
}
}

@ -108,7 +108,7 @@ async fn run_tag_values_query(
predicate: Predicate,
) {
let plan = planner
.tag_values(db, &tag_key, predicate)
.tag_values(db, tag_key, predicate)
.expect("built plan successfully");
let names = executor.to_string_set(plan).await.expect(
"converted plan to strings

@ -18,6 +18,7 @@ mod catalog;
mod chunk;
mod partition;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error creating database: {0}")]

@ -9,6 +9,7 @@ use influxdb_iox_client::{
use snafu::{ResultExt, Snafu};
use structopt::StructOpt;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error connection to IOx: {}", source))]

@ -9,6 +9,7 @@ use std::convert::TryFrom;
use structopt::StructOpt;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error listing chunks: {0}")]

@ -13,6 +13,7 @@ use std::convert::{TryFrom, TryInto};
use structopt::StructOpt;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error listing partitions: {0}")]

@ -9,6 +9,7 @@ use std::convert::TryInto;
use structopt::StructOpt;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error connecting to IOx: {0}")]

@ -7,6 +7,7 @@ use crate::commands::server_remote;
use structopt::StructOpt;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Remote: {0}")]

@ -4,6 +4,7 @@ use thiserror::Error;
use prettytable::{format, Cell, Row, Table};
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error connecting to IOx: {0}")]

@ -231,7 +231,7 @@ impl Repl {
Some(QueryEngine::Remote(db_name)) => {
info!(%db_name, %sql, "Running sql on remote database");
scrape_query(&mut self.flight_client, &db_name, &sql).await?
scrape_query(&mut self.flight_client, db_name, &sql).await?
}
Some(QueryEngine::Observer(observer)) => {
info!("Running sql on local observer");

@ -29,7 +29,7 @@ impl TryInto<ReplCommand> for String {
let raw_commands = self
.trim()
// chop off trailing semicolon
.strip_suffix(";")
.strip_suffix(';')
.unwrap_or(&self)
// tokenize on whitespace
.split(' ')
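The strip_suffix(";") to strip_suffix(';') change looks like clippy's single_char_pattern lint (its method coverage keeps growing across releases): when a pattern is a single character, passing a char is clearer than a one-character string. For example:

fn main() {
    let raw = "show databases;";
    // A one-character &str pattern works, but clippy prefers the char form.
    let a = raw.strip_suffix(";").unwrap_or(raw);
    let b = raw.strip_suffix(';').unwrap_or(raw);
    assert_eq!(a, b);
    assert_eq!(b, "show databases");
}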

@ -483,7 +483,7 @@ where
max_request_size,
} = req.data::<Server<M>>().expect("server state");
let max_request_size = *max_request_size;
let server = Arc::clone(&server);
let server = Arc::clone(server);
// TODO(edd): figure out best way of catching all errors in this observation.
let obs = server.metrics().http_requests.observation(); // instrument request

@ -104,7 +104,7 @@ where
testing::make_server(),
storage::make_server(
Arc::clone(&server),
Arc::clone(&server.metrics_registry()),
Arc::clone(server.metrics_registry()),
serving_gate.clone(),
),
flight::make_server(Arc::clone(&server), serving_gate.clone()),

@ -25,6 +25,7 @@ use std::fmt::Debug;
use super::super::planner::Planner;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))]

@ -353,7 +354,7 @@ fn optimize_schema(schema: &Schema) -> Schema {
fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, Error> {
match array.data_type() {
DataType::Dictionary(_, value) => {
arrow::compute::cast(array, &value).context(DictionaryError)
arrow::compute::cast(array, value).context(DictionaryError)
}
_ => unreachable!("not a dictionary"),
}

@ -863,9 +863,7 @@ where
let predicate = PredicateBuilder::default().set_range(range).build();
let db_name = db_name.as_ref();
let db = db_store
.db(&db_name)
.context(DatabaseNotFound { db_name })?;
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
let executor = db_store.executor();
let plan = Planner::new(Arc::clone(&executor))

@ -1080,9 +1078,7 @@ where
let owned_db_name = db_name;
let db_name = owned_db_name.as_str();
let db = db_store
.db(&db_name)
.context(DatabaseNotFound { db_name })?;
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
let executor = db_store.executor();
let planner = Planner::new(Arc::clone(&executor));

@ -1184,7 +1180,7 @@ mod tests {
use futures::prelude::*;
use generated_types::{
aggregate::AggregateType, i_ox_testing_client, node, read_response::frame, storage_client,
aggregate::AggregateType, i_ox_testing_client, node, storage_client,
Aggregate as RPCAggregate, Duration as RPCDuration, Node, ReadSource, TestErrorRequest,
Window as RPCWindow,
};

@ -2578,13 +2574,13 @@ mod tests {
.try_collect()
.await?;
let data_frames: Vec<frame::Data> = responses
let data_frames_count = responses
.into_iter()
.flat_map(|r| r.frames)
.flat_map(|f| f.data)
.collect();
.count();
let s = format!("{} aggregate_frames", data_frames.len());
let s = format!("{} aggregate_frames", data_frames_count);
Ok(vec![s])
}

@ -2671,13 +2667,13 @@ mod tests {
.try_collect()
.await?;
let data_frames: Vec<frame::Data> = responses
let data_frames_count = responses
.into_iter()
.flat_map(|r| r.frames)
.flat_map(|f| f.data)
.collect();
.count();
let s = format!("{} frames", data_frames.len());
let s = format!("{} frames", data_frames_count);
Ok(vec![s])
}

@ -2696,13 +2692,13 @@ mod tests {
.try_collect()
.await?;
let data_frames: Vec<frame::Data> = responses
let data_frames_count = responses
.into_iter()
.flat_map(|r| r.frames)
.flat_map(|f| f.data)
.collect();
.count();
let s = format!("{} group frames", data_frames.len());
let s = format!("{} group frames", data_frames_count);
Ok(vec![s])
}
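The three test hunks above stop collecting response frames into a Vec only to measure it; counting the iterator directly is what clippy pushes towards here (presumably needless_collect), and it avoids the intermediate allocation. A stand-in for the shape of the change:

fn main() {
    // Stand-in for the gRPC responses in the tests: each response carries
    // some frames, and the test only reports how many there are in total.
    let responses = vec![vec!["f1", "f2"], vec![], vec!["f3"]];

    // Before: collect into a Vec, then ask for its length.
    let frames: Vec<&str> = responses.iter().flatten().copied().collect();
    let before = frames.len();

    // After: count the iterator directly.
    let after = responses.iter().flatten().count();

    assert_eq!(before, after);
    assert_eq!(format!("{} frames", after), "3 frames");
}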

@ -241,7 +241,7 @@ impl ServerFixture {
/// Directory used for data storage.
pub fn dir(&self) -> &Path {
&self.server.dir.path()
self.server.dir.path()
}
}

@ -181,7 +181,7 @@ impl Scenario {
.build()
.unwrap(),
];
self.write_data(&influxdb2, points).await.unwrap();
self.write_data(influxdb2, points).await.unwrap();
let host_array = StringArray::from(vec![
Some("server01"),

@ -93,7 +93,7 @@ impl<F: TryFuture> Future for TrackedFuture<F> {
#[pinned_drop]
impl<F: TryFuture> PinnedDrop for TrackedFuture<F> {
fn drop(self: Pin<&mut Self>) {
let state: &TrackerState = &self.project().tracker;
let state: &TrackerState = self.project().tracker;
let wall_nanos = state.start_instant.elapsed().as_nanos() as usize;

@ -747,7 +747,7 @@ ERROR foo
for range in test_cases {
for len in range {
let long = std::iter::repeat("X").take(len).collect::<String>();
let long = "X".repeat(len);
let captured = log_test(
Builder::new().with_log_filter(&Some("error".to_string())),
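The final hunk swaps a hand-rolled repeat-and-collect for str::repeat; this reads like clippy's manual_str_repeat lint, though that attribution is an assumption. Either way the replacement is the idiomatic form:

fn main() {
    let len = 8;
    // Building an "XXXXXXXX" string by repeating an iterator element...
    let manual: String = std::iter::repeat("X").take(len).collect();
    // ...is exactly what str::repeat does in one call.
    let idiomatic = "X".repeat(len);
    assert_eq!(manual, idiomatic);
}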