Merge branch 'main' into layeredtracing
commit
a8759c8b7e
|
@ -1,10 +1,16 @@
|
||||||
use snafu::{ResultExt, Snafu};
|
use snafu::{ResultExt, Snafu};
|
||||||
use std::{collections::BTreeSet, sync::Arc};
|
use std::{collections::BTreeSet, sync::Arc};
|
||||||
|
|
||||||
use crate::table::Table;
|
use crate::storage::Storage;
|
||||||
use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange};
|
use data_types::{
|
||||||
|
partition_metadata::{Statistics, TableSummary},
|
||||||
|
timestamp::TimestampRange,
|
||||||
|
};
|
||||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||||
use internal_types::{schema::Schema, selection::Selection};
|
use internal_types::{
|
||||||
|
schema::{Schema, TIME_COLUMN_NAME},
|
||||||
|
selection::Selection,
|
||||||
|
};
|
||||||
use object_store::{path::Path, ObjectStore};
|
use object_store::{path::Path, ObjectStore};
|
||||||
use query::predicate::Predicate;
|
use query::predicate::Predicate;
|
||||||
|
|
||||||
|
@ -13,25 +19,15 @@ use std::mem;
|
||||||
|
|
||||||
#[derive(Debug, Snafu)]
|
#[derive(Debug, Snafu)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[snafu(display("Error writing table '{}': {}", table_name, source))]
|
|
||||||
TableWrite {
|
|
||||||
table_name: String,
|
|
||||||
source: crate::table::Error,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Table Error in '{}': {}", table_name, source))]
|
|
||||||
NamedTableError {
|
|
||||||
table_name: String,
|
|
||||||
source: crate::table::Error,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Table '{}' not found in chunk", table_name))]
|
#[snafu(display("Table '{}' not found in chunk", table_name))]
|
||||||
NamedTableNotFoundInChunk { table_name: String },
|
NamedTableNotFoundInChunk { table_name: String },
|
||||||
|
|
||||||
#[snafu(display("Error read parquet file for table '{}'", table_name,))]
|
#[snafu(display("Failed to read parquet: {}", source))]
|
||||||
ReadParquet {
|
ReadParquet { source: crate::storage::Error },
|
||||||
table_name: String,
|
|
||||||
source: crate::table::Error,
|
#[snafu(display("Failed to select columns: {}", source))]
|
||||||
|
SelectColumns {
|
||||||
|
source: internal_types::schema::Error,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,8 +60,23 @@ pub struct Chunk {
|
||||||
/// Partition this chunk belongs to
|
/// Partition this chunk belongs to
|
||||||
partition_key: String,
|
partition_key: String,
|
||||||
|
|
||||||
/// The table in chunk
|
/// Meta data of the table
|
||||||
table: Table,
|
table_summary: Arc<TableSummary>,
|
||||||
|
|
||||||
|
/// Schema that goes with this table's parquet file
|
||||||
|
schema: Arc<Schema>,
|
||||||
|
|
||||||
|
/// Timestamp range of this table's parquet file
|
||||||
|
/// (extracted from TableSummary)
|
||||||
|
timestamp_range: Option<TimestampRange>,
|
||||||
|
|
||||||
|
/// Object store of the above relative path to open and read the file
|
||||||
|
object_store: Arc<ObjectStore>,
|
||||||
|
|
||||||
|
/// Path in the object store. Format:
|
||||||
|
/// <writer id>/<database>/data/<partition key>/<chunk
|
||||||
|
/// id>/<tablename>.parquet
|
||||||
|
object_store_path: Path,
|
||||||
|
|
||||||
metrics: ChunkMetrics,
|
metrics: ChunkMetrics,
|
||||||
}
|
}
|
||||||
|
@ -79,11 +90,15 @@ impl Chunk {
|
||||||
schema: Schema,
|
schema: Schema,
|
||||||
metrics: ChunkMetrics,
|
metrics: ChunkMetrics,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let table = Table::new(table_summary, file_location, store, schema);
|
let timestamp_range = extract_range(&table_summary);
|
||||||
|
|
||||||
let mut chunk = Self {
|
let mut chunk = Self {
|
||||||
partition_key: part_key.into(),
|
partition_key: part_key.into(),
|
||||||
table,
|
table_summary: Arc::new(table_summary),
|
||||||
|
schema: Arc::new(schema),
|
||||||
|
timestamp_range,
|
||||||
|
object_store: store,
|
||||||
|
object_store_path: file_location,
|
||||||
metrics,
|
metrics,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -97,64 +112,109 @@ impl Chunk {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return object store path for this chunk
|
/// Return object store path for this chunk
|
||||||
pub fn table_path(&self) -> Path {
|
pub fn path(&self) -> Path {
|
||||||
self.table.path()
|
self.object_store_path.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the summary statistics for this chunk
|
/// Returns the summary statistics for this chunk
|
||||||
pub fn table_summary(&self) -> &Arc<TableSummary> {
|
pub fn table_summary(&self) -> &Arc<TableSummary> {
|
||||||
self.table.table_summary()
|
&self.table_summary
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the name of the table this chunk holds
|
/// Returns the name of the table this chunk holds
|
||||||
pub fn table_name(&self) -> &str {
|
pub fn table_name(&self) -> &str {
|
||||||
self.table.name()
|
&self.table_summary.name
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the approximate memory size of the chunk, in bytes including the
|
/// Return the approximate memory size of the chunk, in bytes including the
|
||||||
/// dictionary, tables, and their rows.
|
/// dictionary, tables, and their rows.
|
||||||
pub fn size(&self) -> usize {
|
pub fn size(&self) -> usize {
|
||||||
self.table.size() + self.partition_key.len() + mem::size_of::<Self>()
|
mem::size_of::<Self>()
|
||||||
|
+ self.partition_key.len()
|
||||||
|
+ self.table_summary.size()
|
||||||
|
+ mem::size_of_val(&self.schema.as_ref())
|
||||||
|
+ mem::size_of_val(&self.object_store_path)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return possibly restricted Schema for the table in this chunk
|
/// Return possibly restricted Schema for this chunk
|
||||||
pub fn table_schema(&self, selection: Selection<'_>) -> Result<Schema> {
|
pub fn schema(&self, selection: Selection<'_>) -> Result<Schema> {
|
||||||
self.table.schema(selection).context(NamedTableError {
|
Ok(match selection {
|
||||||
table_name: self.table_name(),
|
Selection::All => self.schema.as_ref().clone(),
|
||||||
|
Selection::Some(columns) => {
|
||||||
|
let columns = self.schema.select(columns).context(SelectColumns)?;
|
||||||
|
self.schema.project(&columns)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Infallibly return the full schema (for all columns) for this chunk
|
/// Infallibly return the full schema (for all columns) for this chunk
|
||||||
pub fn full_schema(&self) -> Arc<Schema> {
|
pub fn full_schema(&self) -> Arc<Schema> {
|
||||||
self.table.full_schema()
|
Arc::clone(&self.schema)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return true if the table in this chunk contains values within the time range
|
// Return true if this chunk contains values within the time range
|
||||||
pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool {
|
pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool {
|
||||||
self.table.matches_predicate(timestamp_range)
|
match (self.timestamp_range, timestamp_range) {
|
||||||
|
(Some(a), Some(b)) => !a.disjoint(b),
|
||||||
|
(None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */
|
||||||
|
// the predicate
|
||||||
|
(_, None) => true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the column names that belong to the given column
|
// Return the column names that belong to the given column
|
||||||
// selection
|
// selection
|
||||||
pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
|
pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
|
||||||
self.table.column_names(selection)
|
let fields = self.schema.inner().fields().iter();
|
||||||
|
|
||||||
|
Some(match selection {
|
||||||
|
Selection::Some(cols) => fields
|
||||||
|
.filter_map(|x| {
|
||||||
|
if cols.contains(&x.name().as_str()) {
|
||||||
|
Some(x.name().clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
Selection::All => fields.map(|x| x.name().clone()).collect(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return stream of data read from parquet file of the given table
|
/// Return stream of data read from parquet file
|
||||||
pub fn read_filter(
|
pub fn read_filter(
|
||||||
&self,
|
&self,
|
||||||
predicate: &Predicate,
|
predicate: &Predicate,
|
||||||
selection: Selection<'_>,
|
selection: Selection<'_>,
|
||||||
) -> Result<SendableRecordBatchStream> {
|
) -> Result<SendableRecordBatchStream> {
|
||||||
self.table
|
Storage::read_filter(
|
||||||
.read_filter(predicate, selection)
|
predicate,
|
||||||
.context(ReadParquet {
|
selection,
|
||||||
table_name: self.table_name(),
|
Arc::clone(&self.schema.as_arrow()),
|
||||||
})
|
self.object_store_path.clone(),
|
||||||
|
Arc::clone(&self.object_store),
|
||||||
|
)
|
||||||
|
.context(ReadParquet)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The total number of rows in all row groups in all tables in this chunk.
|
/// The total number of rows in all row groups in this chunk.
|
||||||
pub fn rows(&self) -> usize {
|
pub fn rows(&self) -> usize {
|
||||||
self.table.rows()
|
// All columns have the same number of rows, so return the row count of the first column
|
||||||
|
self.table_summary.columns[0].count() as usize
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Extracts min/max values of the timestamp column, from the TableSummary, if possible
|
||||||
|
fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
|
||||||
|
table_summary
|
||||||
|
.column(TIME_COLUMN_NAME)
|
||||||
|
.map(|c| {
|
||||||
|
if let Statistics::I64(s) = &c.stats {
|
||||||
|
if let (Some(min), Some(max)) = (s.min, s.max) {
|
||||||
|
return Some(TimestampRange::new(min, max));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
})
|
||||||
|
.flatten()
|
||||||
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@ pub mod cleanup;
|
||||||
pub mod metadata;
|
pub mod metadata;
|
||||||
pub mod rebuild;
|
pub mod rebuild;
|
||||||
pub mod storage;
|
pub mod storage;
|
||||||
pub mod table;
|
|
||||||
pub mod test_utils;
|
pub mod test_utils;
|
||||||
|
|
||||||
mod storage_testing;
|
mod storage_testing;
|
||||||
|
|
|
@ -551,7 +551,7 @@ mod tests {
|
||||||
|
|
||||||
// step 1: read back schema
|
// step 1: read back schema
|
||||||
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
|
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
|
||||||
let schema_expected = chunk.table_schema(Selection::All).unwrap();
|
let schema_expected = chunk.schema(Selection::All).unwrap();
|
||||||
assert_eq!(schema_actual, schema_expected);
|
assert_eq!(schema_actual, schema_expected);
|
||||||
|
|
||||||
// step 2: read back statistics
|
// step 2: read back statistics
|
||||||
|
@ -574,7 +574,7 @@ mod tests {
|
||||||
|
|
||||||
// step 1: read back schema
|
// step 1: read back schema
|
||||||
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
|
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
|
||||||
let schema_expected = chunk.table_schema(Selection::All).unwrap();
|
let schema_expected = chunk.schema(Selection::All).unwrap();
|
||||||
assert_eq!(schema_actual, schema_expected);
|
assert_eq!(schema_actual, schema_expected);
|
||||||
|
|
||||||
// step 2: read back statistics
|
// step 2: read back statistics
|
||||||
|
@ -595,7 +595,7 @@ mod tests {
|
||||||
|
|
||||||
// step 1: read back schema
|
// step 1: read back schema
|
||||||
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
|
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
|
||||||
let schema_expected = chunk.table_schema(Selection::All).unwrap();
|
let schema_expected = chunk.schema(Selection::All).unwrap();
|
||||||
assert_eq!(schema_actual, schema_expected);
|
assert_eq!(schema_actual, schema_expected);
|
||||||
|
|
||||||
// step 2: reading back statistics fails
|
// step 2: reading back statistics fails
|
||||||
|
@ -618,7 +618,7 @@ mod tests {
|
||||||
|
|
||||||
// step 1: read back schema
|
// step 1: read back schema
|
||||||
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
|
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
|
||||||
let schema_expected = chunk.table_schema(Selection::All).unwrap();
|
let schema_expected = chunk.schema(Selection::All).unwrap();
|
||||||
assert_eq!(schema_actual, schema_expected);
|
assert_eq!(schema_actual, schema_expected);
|
||||||
|
|
||||||
// step 2: reading back statistics fails
|
// step 2: reading back statistics fails
|
||||||
|
|
|
@ -1,177 +0,0 @@
|
||||||
use snafu::{ResultExt, Snafu};
|
|
||||||
use std::{collections::BTreeSet, mem, sync::Arc};
|
|
||||||
|
|
||||||
use crate::storage::{self, Storage};
|
|
||||||
use data_types::{
|
|
||||||
partition_metadata::{Statistics, TableSummary},
|
|
||||||
timestamp::TimestampRange,
|
|
||||||
};
|
|
||||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
|
||||||
use internal_types::{
|
|
||||||
schema::{Schema, TIME_COLUMN_NAME},
|
|
||||||
selection::Selection,
|
|
||||||
};
|
|
||||||
use object_store::{path::Path, ObjectStore};
|
|
||||||
use query::predicate::Predicate;
|
|
||||||
|
|
||||||
#[derive(Debug, Snafu)]
|
|
||||||
pub enum Error {
|
|
||||||
#[snafu(display("Failed to select columns: {}", source))]
|
|
||||||
SelectColumns {
|
|
||||||
source: internal_types::schema::Error,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Failed to read parquet: {}", source))]
|
|
||||||
ReadParquet { source: storage::Error },
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|
||||||
|
|
||||||
/// Table that belongs to a chunk persisted in a parquet file in object store
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Table {
|
|
||||||
/// Meta data of the table
|
|
||||||
table_summary: Arc<TableSummary>,
|
|
||||||
|
|
||||||
/// Path in the object store. Format:
|
|
||||||
/// <writer id>/<database>/data/<partition key>/<chunk
|
|
||||||
/// id>/<tablename>.parquet
|
|
||||||
object_store_path: Path,
|
|
||||||
|
|
||||||
/// Object store of the above relative path to open and read the file
|
|
||||||
object_store: Arc<ObjectStore>,
|
|
||||||
|
|
||||||
/// Schema that goes with this table's parquet file
|
|
||||||
table_schema: Arc<Schema>,
|
|
||||||
|
|
||||||
/// Timestamp range of this table's parquet file
|
|
||||||
/// (extracted from TableSummary)
|
|
||||||
timestamp_range: Option<TimestampRange>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Table {
|
|
||||||
pub fn new(
|
|
||||||
table_summary: TableSummary,
|
|
||||||
path: Path,
|
|
||||||
store: Arc<ObjectStore>,
|
|
||||||
schema: Schema,
|
|
||||||
) -> Self {
|
|
||||||
let timestamp_range = extract_range(&table_summary);
|
|
||||||
|
|
||||||
Self {
|
|
||||||
table_summary: Arc::new(table_summary),
|
|
||||||
object_store_path: path,
|
|
||||||
object_store: store,
|
|
||||||
table_schema: Arc::new(schema),
|
|
||||||
timestamp_range,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn table_summary(&self) -> &Arc<TableSummary> {
|
|
||||||
&self.table_summary
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn has_table(&self, table_name: &str) -> bool {
|
|
||||||
self.table_summary.has_table(table_name)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return the approximate memory size of the table
|
|
||||||
pub fn size(&self) -> usize {
|
|
||||||
mem::size_of::<Self>()
|
|
||||||
+ self.table_summary.size()
|
|
||||||
+ mem::size_of_val(&self.object_store_path)
|
|
||||||
+ mem::size_of_val(&self.table_schema.as_ref())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return name of this table
|
|
||||||
pub fn name(&self) -> &str {
|
|
||||||
&self.table_summary.name
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return the object store path of this table
|
|
||||||
pub fn path(&self) -> Path {
|
|
||||||
self.object_store_path.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return schema of this table for specified selection columns
|
|
||||||
pub fn schema(&self, selection: Selection<'_>) -> Result<Schema> {
|
|
||||||
Ok(match selection {
|
|
||||||
Selection::All => self.table_schema.as_ref().clone(),
|
|
||||||
Selection::Some(columns) => {
|
|
||||||
let columns = self.table_schema.select(columns).context(SelectColumns)?;
|
|
||||||
self.table_schema.project(&columns)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Infallably return the full schema (for all columns) for this chunk
|
|
||||||
pub fn full_schema(&self) -> Arc<Schema> {
|
|
||||||
Arc::clone(&self.table_schema)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if 2 time ranges overlap
|
|
||||||
pub fn matches_predicate(&self, timestamp_range: Option<&TimestampRange>) -> bool {
|
|
||||||
match (self.timestamp_range, timestamp_range) {
|
|
||||||
(Some(a), Some(b)) => !a.disjoint(b),
|
|
||||||
(None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */
|
|
||||||
// the predicate
|
|
||||||
(_, None) => true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return columns names of this table that belong to the given column selection
|
|
||||||
pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
|
|
||||||
let fields = self.table_schema.inner().fields().iter();
|
|
||||||
|
|
||||||
Some(match selection {
|
|
||||||
Selection::Some(cols) => fields
|
|
||||||
.filter_map(|x| {
|
|
||||||
if cols.contains(&x.name().as_str()) {
|
|
||||||
Some(x.name().clone())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
Selection::All => fields.map(|x| x.name().clone()).collect(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return stream of data read from parquet file for given predicate and
|
|
||||||
/// column selection
|
|
||||||
pub fn read_filter(
|
|
||||||
&self,
|
|
||||||
predicate: &Predicate,
|
|
||||||
selection: Selection<'_>,
|
|
||||||
) -> Result<SendableRecordBatchStream> {
|
|
||||||
Storage::read_filter(
|
|
||||||
predicate,
|
|
||||||
selection,
|
|
||||||
Arc::clone(&self.table_schema.as_arrow()),
|
|
||||||
self.object_store_path.clone(),
|
|
||||||
Arc::clone(&self.object_store),
|
|
||||||
)
|
|
||||||
.context(ReadParquet)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The number of rows of this table
|
|
||||||
pub fn rows(&self) -> usize {
|
|
||||||
// All columns have the same rows, so return get row count of the first column
|
|
||||||
self.table_summary.columns[0].count() as usize
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Extracts min/max values of the timestamp column, from the TableSummary, if possible
|
|
||||||
fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
|
|
||||||
table_summary
|
|
||||||
.column(TIME_COLUMN_NAME)
|
|
||||||
.map(|c| {
|
|
||||||
if let Statistics::I64(s) = &c.stats {
|
|
||||||
if let (Some(min), Some(max)) = (s.min, s.max) {
|
|
||||||
return Some(TimestampRange::new(min, max));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
})
|
|
||||||
.flatten()
|
|
||||||
}
|
|
|
@ -66,7 +66,7 @@ pub async fn load_parquet_from_store_for_chunk(
|
||||||
chunk: &Chunk,
|
chunk: &Chunk,
|
||||||
store: Arc<ObjectStore>,
|
store: Arc<ObjectStore>,
|
||||||
) -> Result<(String, Vec<u8>)> {
|
) -> Result<(String, Vec<u8>)> {
|
||||||
let path = chunk.table_path();
|
let path = chunk.path();
|
||||||
let table_name = chunk.table_name().to_string();
|
let table_name = chunk.table_name().to_string();
|
||||||
Ok((
|
Ok((
|
||||||
table_name,
|
table_name,
|
||||||
|
@ -584,7 +584,7 @@ pub async fn make_metadata(
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
(
|
(
|
||||||
chunk.table_path(),
|
chunk.path(),
|
||||||
read_parquet_metadata_from_file(parquet_data).unwrap(),
|
read_parquet_metadata_from_file(parquet_data).unwrap(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -246,6 +246,7 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
|
||||||
scan_schema,
|
scan_schema,
|
||||||
chunks,
|
chunks,
|
||||||
predicate,
|
predicate,
|
||||||
|
false,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(plan)
|
Ok(plan)
|
||||||
|
@ -317,6 +318,11 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
|
||||||
/// ┌───────────────────────┐ │
|
/// ┌───────────────────────┐ │
|
||||||
/// │SortPreservingMergeExec│ │
|
/// │SortPreservingMergeExec│ │
|
||||||
/// └───────────────────────┘ │
|
/// └───────────────────────┘ │
|
||||||
|
/// ▲ │
|
||||||
|
/// │ │
|
||||||
|
/// ┌───────────────────────┐ │
|
||||||
|
/// │ UnionExec │ │
|
||||||
|
/// └───────────────────────┘ │
|
||||||
/// ▲ |
|
/// ▲ |
|
||||||
/// │ |
|
/// │ |
|
||||||
/// ┌───────────┴───────────┐ │
|
/// ┌───────────┴───────────┐ │
|
||||||
|
@ -340,18 +346,21 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
|
||||||
schema: ArrowSchemaRef,
|
schema: ArrowSchemaRef,
|
||||||
chunks: Vec<Arc<C>>,
|
chunks: Vec<Arc<C>>,
|
||||||
predicate: Predicate,
|
predicate: Predicate,
|
||||||
|
for_testing: bool, // TODO: remove this parameter when #1682 and #1683 are done
|
||||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||||
// find overlapped chunks and put them into the right group
|
// find overlapped chunks and put them into the right group
|
||||||
self.split_overlapped_chunks(chunks.to_vec())?;
|
self.split_overlapped_chunks(chunks.to_vec())?;
|
||||||
|
|
||||||
// TEMP until the rest of this module's code is complete:
|
// TEMP until the rest of this module's code is complete:
|
||||||
// merge all plans into the same
|
// merge all plans into the same
|
||||||
self.no_duplicates_chunks
|
if !for_testing {
|
||||||
.append(&mut self.in_chunk_duplicates_chunks);
|
self.no_duplicates_chunks
|
||||||
for mut group in &mut self.overlapped_chunks_set {
|
.append(&mut self.in_chunk_duplicates_chunks);
|
||||||
self.no_duplicates_chunks.append(&mut group);
|
for mut group in &mut self.overlapped_chunks_set {
|
||||||
|
self.no_duplicates_chunks.append(&mut group);
|
||||||
|
}
|
||||||
|
self.overlapped_chunks_set.clear();
|
||||||
}
|
}
|
||||||
self.overlapped_chunks_set.clear();
|
|
||||||
|
|
||||||
// Building plans
|
// Building plans
|
||||||
let mut plans = vec![];
|
let mut plans = vec![];
|
||||||
|
@ -396,16 +405,16 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let final_plan = plans.remove(0);
|
match plans.len() {
|
||||||
|
// No plan generated. Something must have gone wrong
|
||||||
// TODO
|
// Even if the chunks are empty, IOxReadFilterNode is still created
|
||||||
// There are still plan, add UnionExec
|
0 => panic!("Internal error generating deduplicate plan"),
|
||||||
if !plans.is_empty() {
|
// Only one plan, no need to add union node
|
||||||
// final_plan = union_plan
|
// Return the plan itself
|
||||||
panic!("Unexpected error: There should be only one output for scan plan, but there were: {:#?}", plans);
|
1 => Ok(plans.remove(0)),
|
||||||
|
// Has many plans and need to union them
|
||||||
|
_ => Ok(Arc::new(UnionExec::new(plans))),
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(final_plan)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// discover overlaps and split them into three groups:
|
/// discover overlaps and split them into three groups:
|
||||||
|
@ -430,7 +439,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return true if all chunks are neither overlap nor has duplicates in itself
|
/// Return true if all chunks neither overlap nor have duplicates in itself
|
||||||
fn no_duplicates(&self) -> bool {
|
fn no_duplicates(&self) -> bool {
|
||||||
self.overlapped_chunks_set.is_empty() && self.in_chunk_duplicates_chunks.is_empty()
|
self.overlapped_chunks_set.is_empty() && self.in_chunk_duplicates_chunks.is_empty()
|
||||||
}
|
}
|
||||||
|
@ -888,9 +897,8 @@ mod test {
|
||||||
Predicate::default(),
|
Predicate::default(),
|
||||||
);
|
);
|
||||||
let batch = collect(sort_plan.unwrap()).await.unwrap();
|
let batch = collect(sort_plan.unwrap()).await.unwrap();
|
||||||
// data is not sorted on primary key(tag1, tag2, time)
|
// data is sorted on primary key(tag1, tag2, time)
|
||||||
|
// NOTE: When the full deduplication is done, the duplicates will be removed from this output
|
||||||
// NOTE: When the full deduplication is done, the duplciates will be removed from this output
|
|
||||||
let expected = vec![
|
let expected = vec![
|
||||||
"+-----------+------+------+-------------------------------+",
|
"+-----------+------+------+-------------------------------+",
|
||||||
"| field_int | tag1 | tag2 | time |",
|
"| field_int | tag1 | tag2 | time |",
|
||||||
|
@ -910,6 +918,241 @@ mod test {
|
||||||
assert_batches_eq!(&expected, &batch);
|
assert_batches_eq!(&expected, &batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn scan_plan_with_one_chunk_no_duplicates() {
|
||||||
|
// Test no duplicate at all
|
||||||
|
let chunk = Arc::new(
|
||||||
|
TestChunk::new(1)
|
||||||
|
.with_time_column_with_stats("t", 5, 7000)
|
||||||
|
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||||
|
.with_int_field_column("t", "field_int")
|
||||||
|
.with_five_rows_of_data("t"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Datafusion schema of the chunk
|
||||||
|
let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
|
||||||
|
|
||||||
|
let mut deduplicator = Deduplicater::new();
|
||||||
|
let plan = deduplicator.build_scan_plan(
|
||||||
|
Arc::from("t"),
|
||||||
|
schema,
|
||||||
|
vec![Arc::clone(&chunk)],
|
||||||
|
Predicate::default(),
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
let batch = collect(plan.unwrap()).await.unwrap();
|
||||||
|
// No duplicates so no sort at all. The data will stay in their original order
|
||||||
|
let expected = vec![
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
"| field_int | tag1 | time |",
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
|
||||||
|
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
|
||||||
|
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
|
||||||
|
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
"| 5 | MT | 1970-01-01 00:00:00.000005 |",
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
];
|
||||||
|
assert_batches_eq!(&expected, &batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn scan_plan_with_one_chunk_with_duplicates() {
|
||||||
|
// Test one chunk with duplicate within
|
||||||
|
let chunk = Arc::new(
|
||||||
|
TestChunk::new(1)
|
||||||
|
.with_time_column_with_stats("t", 5, 7000)
|
||||||
|
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||||
|
.with_int_field_column("t", "field_int")
|
||||||
|
.with_may_contain_pk_duplicates(true)
|
||||||
|
.with_ten_rows_of_data_some_duplicates("t"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Datafusion schema of the chunk
|
||||||
|
let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
|
||||||
|
|
||||||
|
let mut deduplicator = Deduplicater::new();
|
||||||
|
let plan = deduplicator.build_scan_plan(
|
||||||
|
Arc::from("t"),
|
||||||
|
schema,
|
||||||
|
vec![Arc::clone(&chunk)],
|
||||||
|
Predicate::default(),
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
let batch = collect(plan.unwrap()).await.unwrap();
|
||||||
|
// Data must be sorted and duplicates removed
|
||||||
|
// TODO: it is just sorted for now. When https://github.com/influxdata/influxdb_iox/issues/1646
|
||||||
|
// is done, duplicates will be removed
|
||||||
|
let expected = vec![
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
"| field_int | tag1 | time |",
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
"| 10 | AL | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
|
||||||
|
"| 70 | CT | 1970-01-01 00:00:00.000000500 |",
|
||||||
|
"| 5 | MT | 1970-01-01 00:00:00.000000005 |",
|
||||||
|
"| 30 | MT | 1970-01-01 00:00:00.000000005 |",
|
||||||
|
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
|
||||||
|
"| 1000 | MT | 1970-01-01 00:00:00.000002 |",
|
||||||
|
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
|
||||||
|
"| 20 | MT | 1970-01-01 00:00:00.000007 |",
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
];
|
||||||
|
assert_batches_eq!(&expected, &batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn scan_plan_with_two_overlapped_chunks_with_duplicates() {
|
||||||
|
// test overlapped chunks
|
||||||
|
let chunk1 = Arc::new(
|
||||||
|
TestChunk::new(1)
|
||||||
|
.with_time_column_with_stats("t", 5, 7000)
|
||||||
|
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||||
|
.with_int_field_column("t", "field_int")
|
||||||
|
.with_ten_rows_of_data_some_duplicates("t"),
|
||||||
|
);
|
||||||
|
|
||||||
|
let chunk2 = Arc::new(
|
||||||
|
TestChunk::new(1)
|
||||||
|
.with_time_column_with_stats("t", 5, 7000)
|
||||||
|
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||||
|
.with_int_field_column("t", "field_int")
|
||||||
|
.with_five_rows_of_data("t"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Datafusion schema of the chunk
|
||||||
|
let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow();
|
||||||
|
|
||||||
|
let mut deduplicator = Deduplicater::new();
|
||||||
|
let plan = deduplicator.build_scan_plan(
|
||||||
|
Arc::from("t"),
|
||||||
|
schema,
|
||||||
|
vec![Arc::clone(&chunk1), Arc::clone(&chunk2)],
|
||||||
|
Predicate::default(),
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
let batch = collect(plan.unwrap()).await.unwrap();
|
||||||
|
// Two overlapped chunks will be sort merged with duplicates removed
|
||||||
|
// TODO: it is just sorted for now. When https://github.com/influxdata/influxdb_iox/issues/1646
|
||||||
|
// is done, duplicates will be removed
|
||||||
|
let expected = vec![
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
"| field_int | tag1 | time |",
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
"| 10 | AL | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
|
||||||
|
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
|
||||||
|
"| 70 | CT | 1970-01-01 00:00:00.000000500 |",
|
||||||
|
"| 5 | MT | 1970-01-01 00:00:00.000000005 |",
|
||||||
|
"| 30 | MT | 1970-01-01 00:00:00.000000005 |",
|
||||||
|
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
|
||||||
|
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
|
||||||
|
"| 1000 | MT | 1970-01-01 00:00:00.000002 |",
|
||||||
|
"| 5 | MT | 1970-01-01 00:00:00.000005 |",
|
||||||
|
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
|
||||||
|
"| 20 | MT | 1970-01-01 00:00:00.000007 |",
|
||||||
|
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
];
|
||||||
|
assert_batches_eq!(&expected, &batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn scan_plan_with_four_chunks() {
|
||||||
|
// This test covers all kinds of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within
|
||||||
|
let chunk1 = Arc::new(
|
||||||
|
TestChunk::new(1)
|
||||||
|
.with_time_column_with_stats("t", 5, 7000)
|
||||||
|
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||||
|
.with_int_field_column("t", "field_int")
|
||||||
|
.with_ten_rows_of_data_some_duplicates("t"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// chunk2 overlaps with chunk 1
|
||||||
|
let chunk2 = Arc::new(
|
||||||
|
TestChunk::new(1)
|
||||||
|
.with_time_column_with_stats("t", 5, 7000)
|
||||||
|
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||||
|
.with_int_field_column("t", "field_int")
|
||||||
|
.with_five_rows_of_data("t"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// chunk3 no overlap, no duplicates within
|
||||||
|
let chunk3 = Arc::new(
|
||||||
|
TestChunk::new(1)
|
||||||
|
.with_time_column_with_stats("t", 8000, 20000)
|
||||||
|
.with_tag_column_with_stats("t", "tag1", "UT", "WA")
|
||||||
|
.with_int_field_column("t", "field_int")
|
||||||
|
.with_three_rows_of_data("t"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// chunk4 no overlap, duplicates within
|
||||||
|
let chunk4 = Arc::new(
|
||||||
|
TestChunk::new(1)
|
||||||
|
.with_time_column_with_stats("t", 28000, 220000)
|
||||||
|
.with_tag_column_with_stats("t", "tag1", "UT", "WA")
|
||||||
|
.with_int_field_column("t", "field_int")
|
||||||
|
.with_may_contain_pk_duplicates(true)
|
||||||
|
.with_four_rows_of_data("t"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Datafusion schema of the chunk
|
||||||
|
let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow();
|
||||||
|
|
||||||
|
let mut deduplicator = Deduplicater::new();
|
||||||
|
let plan = deduplicator.build_scan_plan(
|
||||||
|
Arc::from("t"),
|
||||||
|
schema,
|
||||||
|
vec![
|
||||||
|
Arc::clone(&chunk1),
|
||||||
|
Arc::clone(&chunk2),
|
||||||
|
Arc::clone(&chunk3),
|
||||||
|
Arc::clone(&chunk4),
|
||||||
|
],
|
||||||
|
Predicate::default(),
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
let batch = collect(plan.unwrap()).await.unwrap();
|
||||||
|
// Final data will be partially sorted and duplicates removed. Detailed:
|
||||||
|
// . chunk1 and chunk2 will be sorted merged and deduplicated (rows 8-22)
|
||||||
|
// . chunk3 will stay in its original order (rows 1-3)
|
||||||
|
// . chunk4 will be sorted and deduplicated (rows 4-7)
|
||||||
|
// TODO: data is only partially sorted for now. The deduplication will happen when https://github.com/influxdata/influxdb_iox/issues/1646
|
||||||
|
// is done
|
||||||
|
let expected = vec![
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
"| field_int | tag1 | time |",
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
"| 1000 | WA | 1970-01-01 00:00:00.000008 |",
|
||||||
|
"| 10 | VT | 1970-01-01 00:00:00.000010 |",
|
||||||
|
"| 70 | UT | 1970-01-01 00:00:00.000020 |",
|
||||||
|
"| 70 | UT | 1970-01-01 00:00:00.000020 |",
|
||||||
|
"| 10 | VT | 1970-01-01 00:00:00.000010 |",
|
||||||
|
"| 50 | VT | 1970-01-01 00:00:00.000010 |",
|
||||||
|
"| 1000 | WA | 1970-01-01 00:00:00.000008 |",
|
||||||
|
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
"| 10 | AL | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
|
||||||
|
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
|
||||||
|
"| 70 | CT | 1970-01-01 00:00:00.000000500 |",
|
||||||
|
"| 5 | MT | 1970-01-01 00:00:00.000000005 |",
|
||||||
|
"| 30 | MT | 1970-01-01 00:00:00.000000005 |",
|
||||||
|
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
|
||||||
|
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
|
||||||
|
"| 1000 | MT | 1970-01-01 00:00:00.000002 |",
|
||||||
|
"| 5 | MT | 1970-01-01 00:00:00.000005 |",
|
||||||
|
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
|
||||||
|
"| 20 | MT | 1970-01-01 00:00:00.000007 |",
|
||||||
|
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
|
||||||
|
"+-----------+------+-------------------------------+",
|
||||||
|
];
|
||||||
|
assert_batches_eq!(&expected, &batch);
|
||||||
|
}
|
||||||
|
|
||||||
fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
|
fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
|
||||||
let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
|
let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
|
||||||
ids.join(", ")
|
ids.join(", ")
|
||||||
|
|
|
@ -21,7 +21,9 @@ use crate::{
|
||||||
use crate::{exec::Executor, pruning::Prunable};
|
use crate::{exec::Executor, pruning::Prunable};
|
||||||
|
|
||||||
use internal_types::{
|
use internal_types::{
|
||||||
schema::{builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema},
|
schema::{
|
||||||
|
builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema, TIME_COLUMN_NAME,
|
||||||
|
},
|
||||||
selection::Selection,
|
selection::Selection,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -233,7 +235,7 @@ impl TestChunk {
|
||||||
new_self
|
new_self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a timetamp column with the test chunk
|
/// Register a timestamp column with the test chunk
|
||||||
pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
|
pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
|
||||||
let table_name = table_name.into();
|
let table_name = table_name.into();
|
||||||
|
|
||||||
|
@ -244,6 +246,36 @@ impl TestChunk {
|
||||||
self.add_schema_to_table(table_name, new_column_schema)
|
self.add_schema_to_table(table_name, new_column_schema)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register a timestamp column with the test chunk and set the min/max statistics of its column summary
|
||||||
|
pub fn with_time_column_with_stats(
|
||||||
|
self,
|
||||||
|
table_name: impl Into<String>,
|
||||||
|
min: i64,
|
||||||
|
max: i64,
|
||||||
|
) -> Self {
|
||||||
|
let table_name = table_name.into();
|
||||||
|
|
||||||
|
let mut new_self = self.with_time_column(&table_name);
|
||||||
|
|
||||||
|
// Now, find the appropriate column summary and update the stats
|
||||||
|
let column_summary: &mut ColumnSummary = new_self
|
||||||
|
.table_summary
|
||||||
|
.as_mut()
|
||||||
|
.expect("had table summary")
|
||||||
|
.columns
|
||||||
|
.iter_mut()
|
||||||
|
.find(|c| c.name == TIME_COLUMN_NAME)
|
||||||
|
.expect("had column");
|
||||||
|
|
||||||
|
column_summary.stats = Statistics::I64(StatValues {
|
||||||
|
min: Some(min),
|
||||||
|
max: Some(max),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
|
||||||
|
new_self
|
||||||
|
}
|
||||||
|
|
||||||
/// Register an int field column with the test chunk
|
/// Register an int field column with the test chunk
|
||||||
pub fn with_int_field_column(
|
pub fn with_int_field_column(
|
||||||
self,
|
self,
|
||||||
|
@ -367,19 +399,146 @@ impl TestChunk {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Prepares this chunk to return a specific record batch with five
|
/// Prepares this chunk to return a specific record batch with three
|
||||||
/// rows of non null data that look like
|
/// rows of non null data that look like, no duplicates within
|
||||||
/// "+------+------+-----------+-------------------------------+",
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
/// "| tag1 | tag2 | field_int | time |",
|
/// "| tag1 | tag2 | field_int | time |",
|
||||||
/// "+------+------+-----------+-------------------------------+",
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
/// "| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |",
|
/// "| WA | SC | 1000 | 1970-01-01 00:00:00.000008 |",
|
||||||
/// "| MT | MT | 10 | 1970-01-01 00:00:00.000007 |",
|
/// "| VT | NC | 10 | 1970-01-01 00:00:00.000010 |",
|
||||||
/// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |",
|
/// "| UT | RI | 70 | 1970-01-01 00:00:00.000020 |",
|
||||||
/// "| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |",
|
|
||||||
/// "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |",
|
|
||||||
/// "+------+------+-----------+-------------------------------+",
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(8000, 20000)
|
||||||
|
pub fn with_three_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
|
||||||
|
let schema = self
|
||||||
|
.table_schema
|
||||||
|
.as_ref()
|
||||||
|
.expect("table must exist in TestChunk");
|
||||||
|
|
||||||
|
// create arrays
|
||||||
|
let columns = schema
|
||||||
|
.iter()
|
||||||
|
.map(|(_influxdb_column_type, field)| match field.data_type() {
|
||||||
|
DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70])) as ArrayRef,
|
||||||
|
DataType::Utf8 => match field.name().as_str() {
|
||||||
|
"tag1" => Arc::new(StringArray::from(vec!["WA", "VT", "UT"])) as ArrayRef,
|
||||||
|
"tag2" => Arc::new(StringArray::from(vec!["SC", "NC", "RI"])) as ArrayRef,
|
||||||
|
_ => Arc::new(StringArray::from(vec!["TX", "PR", "OR"])) as ArrayRef,
|
||||||
|
},
|
||||||
|
DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
|
||||||
|
TimestampNanosecondArray::from_vec(vec![8000, 10000, 20000], None),
|
||||||
|
) as ArrayRef,
|
||||||
|
DataType::Dictionary(key, value)
|
||||||
|
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
|
||||||
|
{
|
||||||
|
match field.name().as_str() {
|
||||||
|
"tag1" => Arc::new(
|
||||||
|
vec!["WA", "VT", "UT"]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<DictionaryArray<Int32Type>>(),
|
||||||
|
) as ArrayRef,
|
||||||
|
"tag2" => Arc::new(
|
||||||
|
vec!["SC", "NC", "RI"]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<DictionaryArray<Int32Type>>(),
|
||||||
|
) as ArrayRef,
|
||||||
|
_ => Arc::new(
|
||||||
|
vec!["TX", "PR", "OR"]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<DictionaryArray<Int32Type>>(),
|
||||||
|
) as ArrayRef,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => unimplemented!(
|
||||||
|
"Unimplemented data type for test database: {:?}",
|
||||||
|
field.data_type()
|
||||||
|
),
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
|
||||||
|
|
||||||
|
self.table_data.push(Arc::new(batch));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Prepares this chunk to return a specific record batch with four
|
||||||
|
/// rows of non null data that look like, duplicates within
|
||||||
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// "| tag1 | tag2 | field_int | time |",
|
||||||
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// "| WA | SC | 1000 | 1970-01-01 00:00:00.000008 |",
|
||||||
|
/// "| VT | NC | 10 | 1970-01-01 00:00:00.000010 |", (1)
|
||||||
|
/// "| UT | RI | 70 | 1970-01-01 00:00:00.000020 |",
|
||||||
|
/// "| VT | NC | 50 | 1970-01-01 00:00:00.000010 |", // duplicate of (1)
|
||||||
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(8000, 20000) -- NOTE(review): callers that register time stats of (28000, 220000) do not match this data
|
||||||
|
pub fn with_four_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
|
||||||
|
let schema = self
|
||||||
|
.table_schema
|
||||||
|
.as_ref()
|
||||||
|
.expect("table must exist in TestChunk");
|
||||||
|
|
||||||
|
// create arrays
|
||||||
|
let columns = schema
|
||||||
|
.iter()
|
||||||
|
.map(|(_influxdb_column_type, field)| match field.data_type() {
|
||||||
|
DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70, 50])) as ArrayRef,
|
||||||
|
DataType::Utf8 => match field.name().as_str() {
|
||||||
|
"tag1" => Arc::new(StringArray::from(vec!["WA", "VT", "UT", "VT"])) as ArrayRef,
|
||||||
|
"tag2" => Arc::new(StringArray::from(vec!["SC", "NC", "RI", "NC"])) as ArrayRef,
|
||||||
|
_ => Arc::new(StringArray::from(vec!["TX", "PR", "OR", "AL"])) as ArrayRef,
|
||||||
|
},
|
||||||
|
DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
|
||||||
|
TimestampNanosecondArray::from_vec(vec![8000, 10000, 20000, 10000], None),
|
||||||
|
) as ArrayRef,
|
||||||
|
DataType::Dictionary(key, value)
|
||||||
|
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
|
||||||
|
{
|
||||||
|
match field.name().as_str() {
|
||||||
|
"tag1" => Arc::new(
|
||||||
|
vec!["WA", "VT", "UT", "VT"]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<DictionaryArray<Int32Type>>(),
|
||||||
|
) as ArrayRef,
|
||||||
|
"tag2" => Arc::new(
|
||||||
|
vec!["SC", "NC", "RI", "NC"]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<DictionaryArray<Int32Type>>(),
|
||||||
|
) as ArrayRef,
|
||||||
|
_ => Arc::new(
|
||||||
|
vec!["TX", "PR", "OR", "AL"]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<DictionaryArray<Int32Type>>(),
|
||||||
|
) as ArrayRef,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => unimplemented!(
|
||||||
|
"Unimplemented data type for test database: {:?}",
|
||||||
|
field.data_type()
|
||||||
|
),
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
|
||||||
|
|
||||||
|
self.table_data.push(Arc::new(batch));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Prepares this chunk to return a specific record batch with five
|
||||||
|
/// rows of non null data that look like, no duplicates within
|
||||||
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// "| tag1 | tag2 | field_int | time |",
|
||||||
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// "| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |",
|
||||||
|
/// "| MT | AL | 10 | 1970-01-01 00:00:00.000007 |",
|
||||||
|
/// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |",
|
||||||
|
/// "| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |",
|
||||||
|
/// "| MT | AL | 5 | 1970-01-01 00:00:00.000005 |",
|
||||||
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
|
||||||
pub fn with_five_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
|
pub fn with_five_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
|
||||||
//let table_name = table_name.into();
|
|
||||||
let schema = self
|
let schema = self
|
||||||
.table_schema
|
.table_schema
|
||||||
.as_ref()
|
.as_ref()
|
||||||
|
@ -439,6 +598,88 @@ impl TestChunk {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Prepares this chunk to return a specific record batch with ten
|
||||||
|
/// rows of non null data that look like, duplicates within
|
||||||
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// "| tag1 | tag2 | field_int | time |",
|
||||||
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// "| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |",
|
||||||
|
/// "| MT | AL | 10 | 1970-01-01 00:00:00.000007 |", (1)
|
||||||
|
/// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |",
|
||||||
|
/// "| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |", (2)
|
||||||
|
/// "| MT | AL | 5 | 1970-01-01 00:00:00.000005 |", (3)
|
||||||
|
/// "| MT | CT | 1000 | 1970-01-01 00:00:00.000002 |",
|
||||||
|
/// "| MT | AL | 20 | 1970-01-01 00:00:00.000007 |", // Duplicate with (1)
|
||||||
|
/// "| CT | CT | 70 | 1970-01-01 00:00:00.000000500 |",
|
||||||
|
/// "| AL | MA | 10 | 1970-01-01 00:00:00.000000050 |", // Duplicate with (2)
|
||||||
|
/// "| MT | AL | 30 | 1970-01-01 00:00:00.000005 |", // Duplicate with (3)
|
||||||
|
/// "+------+------+-----------+-------------------------------+",
|
||||||
|
/// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
|
||||||
|
pub fn with_ten_rows_of_data_some_duplicates(mut self, _table_name: impl Into<String>) -> Self {
|
||||||
|
//let table_name = table_name.into();
|
||||||
|
let schema = self
|
||||||
|
.table_schema
|
||||||
|
.as_ref()
|
||||||
|
.expect("table must exist in TestChunk");
|
||||||
|
|
||||||
|
// create arrays
|
||||||
|
let columns = schema
|
||||||
|
.iter()
|
||||||
|
.map(|(_influxdb_column_type, field)| match field.data_type() {
|
||||||
|
DataType::Int64 => Arc::new(Int64Array::from(vec![
|
||||||
|
1000, 10, 70, 100, 5, 1000, 20, 70, 10, 30,
|
||||||
|
])) as ArrayRef,
|
||||||
|
DataType::Utf8 => match field.name().as_str() {
|
||||||
|
"tag1" => Arc::new(StringArray::from(vec![
|
||||||
|
"MT", "MT", "CT", "AL", "MT", "MT", "MT", "CT", "AL", "MT",
|
||||||
|
])) as ArrayRef,
|
||||||
|
"tag2" => Arc::new(StringArray::from(vec![
|
||||||
|
"CT", "AL", "CT", "MA", "AL", "CT", "AL", "CT", "MA", "AL",
|
||||||
|
])) as ArrayRef,
|
||||||
|
_ => Arc::new(StringArray::from(vec![
|
||||||
|
"CT", "MT", "AL", "AL", "MT", "CT", "MT", "AL", "AL", "MT",
|
||||||
|
])) as ArrayRef,
|
||||||
|
},
|
||||||
|
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
|
||||||
|
Arc::new(TimestampNanosecondArray::from_vec(
|
||||||
|
vec![1000, 7000, 100, 50, 5, 2000, 7000, 500, 50, 5],
|
||||||
|
None,
|
||||||
|
)) as ArrayRef
|
||||||
|
}
|
||||||
|
DataType::Dictionary(key, value)
|
||||||
|
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
|
||||||
|
{
|
||||||
|
match field.name().as_str() {
|
||||||
|
"tag1" => Arc::new(
|
||||||
|
vec!["MT", "MT", "CT", "AL", "MT", "MT", "MT", "CT", "AL", "MT"]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<DictionaryArray<Int32Type>>(),
|
||||||
|
) as ArrayRef,
|
||||||
|
"tag2" => Arc::new(
|
||||||
|
vec!["CT", "AL", "CT", "MA", "AL", "CT", "AL", "CT", "MA", "AL"]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<DictionaryArray<Int32Type>>(),
|
||||||
|
) as ArrayRef,
|
||||||
|
_ => Arc::new(
|
||||||
|
vec!["CT", "MT", "AL", "AL", "MT", "CT", "MT", "AL", "AL", "MT"]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<DictionaryArray<Int32Type>>(),
|
||||||
|
) as ArrayRef,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => unimplemented!(
|
||||||
|
"Unimplemented data type for test database: {:?}",
|
||||||
|
field.data_type()
|
||||||
|
),
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
|
||||||
|
|
||||||
|
self.table_data.push(Arc::new(batch));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns all columns of the table
|
/// Returns all columns of the table
|
||||||
pub fn all_column_names(&self) -> Option<StringSet> {
|
pub fn all_column_names(&self) -> Option<StringSet> {
|
||||||
let column_names = self.table_schema.as_ref().map(|schema| {
|
let column_names = self.table_schema.as_ref().map(|schema| {
|
||||||
|
|
|
@ -1461,7 +1461,7 @@ mod tests {
|
||||||
.eq(1.0)
|
.eq(1.0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let expected_parquet_size = 759;
|
let expected_parquet_size = 647;
|
||||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1616).unwrap();
|
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1616).unwrap();
|
||||||
// now also in OS
|
// now also in OS
|
||||||
catalog_chunk_size_bytes_metric_eq(
|
catalog_chunk_size_bytes_metric_eq(
|
||||||
|
@ -1817,7 +1817,7 @@ mod tests {
|
||||||
("svr_id", "10"),
|
("svr_id", "10"),
|
||||||
])
|
])
|
||||||
.histogram()
|
.histogram()
|
||||||
.sample_sum_eq(2375.0)
|
.sample_sum_eq(2263.0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// it should be the same chunk!
|
// it should be the same chunk!
|
||||||
|
@ -1925,7 +1925,7 @@ mod tests {
|
||||||
("svr_id", "10"),
|
("svr_id", "10"),
|
||||||
])
|
])
|
||||||
.histogram()
|
.histogram()
|
||||||
.sample_sum_eq(2375.0)
|
.sample_sum_eq(2263.0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Unload RB chunk but keep it in OS
|
// Unload RB chunk but keep it in OS
|
||||||
|
@ -1953,7 +1953,7 @@ mod tests {
|
||||||
("svr_id", "10"),
|
("svr_id", "10"),
|
||||||
])
|
])
|
||||||
.histogram()
|
.histogram()
|
||||||
.sample_sum_eq(759.0)
|
.sample_sum_eq(647.0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Verify data written to the parquet file in object store
|
// Verify data written to the parquet file in object store
|
||||||
|
@ -2342,7 +2342,7 @@ mod tests {
|
||||||
Arc::from("cpu"),
|
Arc::from("cpu"),
|
||||||
0,
|
0,
|
||||||
ChunkStorage::ReadBufferAndObjectStore,
|
ChunkStorage::ReadBufferAndObjectStore,
|
||||||
2373, // size of RB and OS chunks
|
2261, // size of RB and OS chunks
|
||||||
1,
|
1,
|
||||||
),
|
),
|
||||||
ChunkSummary::new_without_timestamps(
|
ChunkSummary::new_without_timestamps(
|
||||||
|
@ -2402,7 +2402,7 @@ mod tests {
|
||||||
.memory()
|
.memory()
|
||||||
.parquet()
|
.parquet()
|
||||||
.get_total(),
|
.get_total(),
|
||||||
759
|
647
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2864,7 +2864,7 @@ mod tests {
|
||||||
let chunk = db.chunk(table_name, partition_key, *chunk_id).unwrap();
|
let chunk = db.chunk(table_name, partition_key, *chunk_id).unwrap();
|
||||||
let chunk = chunk.read();
|
let chunk = chunk.read();
|
||||||
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
|
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
|
||||||
paths_expected.push(parquet.table_path().display());
|
paths_expected.push(parquet.path().display());
|
||||||
} else {
|
} else {
|
||||||
panic!("Wrong chunk state.");
|
panic!("Wrong chunk state.");
|
||||||
}
|
}
|
||||||
|
@ -2944,7 +2944,7 @@ mod tests {
|
||||||
let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap();
|
let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap();
|
||||||
let chunk = chunk.read();
|
let chunk = chunk.read();
|
||||||
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
|
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
|
||||||
paths_keep.push(parquet.table_path());
|
paths_keep.push(parquet.path());
|
||||||
} else {
|
} else {
|
||||||
panic!("Wrong chunk state.");
|
panic!("Wrong chunk state.");
|
||||||
}
|
}
|
||||||
|
|
|
@ -195,7 +195,7 @@ impl DbChunk {
|
||||||
/// persisted, if any
|
/// persisted, if any
|
||||||
pub fn object_store_path(&self) -> Option<Path> {
|
pub fn object_store_path(&self) -> Option<Path> {
|
||||||
match &self.state {
|
match &self.state {
|
||||||
State::ParquetFile { chunk } => Some(chunk.table_path()),
|
State::ParquetFile { chunk } => Some(chunk.path()),
|
||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -326,8 +326,8 @@ impl ChunkMover for LifecycleManager {
|
||||||
|
|
||||||
fn write_to_object_store(
|
fn write_to_object_store(
|
||||||
&mut self,
|
&mut self,
|
||||||
partition_key: String,
|
|
||||||
table_name: String,
|
table_name: String,
|
||||||
|
partition_key: String,
|
||||||
chunk_id: u32,
|
chunk_id: u32,
|
||||||
) -> TaskTracker<Self::Job> {
|
) -> TaskTracker<Self::Job> {
|
||||||
info!(%partition_key, %chunk_id, "write chunk to object store");
|
info!(%partition_key, %chunk_id, "write chunk to object store");
|
||||||
|
@ -338,7 +338,7 @@ impl ChunkMover for LifecycleManager {
|
||||||
tracker
|
tracker
|
||||||
}
|
}
|
||||||
|
|
||||||
fn drop_chunk(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
|
fn drop_chunk(&mut self, table_name: String, partition_key: String, chunk_id: u32) {
|
||||||
info!(%partition_key, %chunk_id, "dropping chunk");
|
info!(%partition_key, %chunk_id, "dropping chunk");
|
||||||
let _ = self
|
let _ = self
|
||||||
.db
|
.db
|
||||||
|
|
|
@ -245,6 +245,32 @@ pub struct Config {
|
||||||
)]
|
)]
|
||||||
pub traces_exporter_jaeger_agent_port: NonZeroU16,
|
pub traces_exporter_jaeger_agent_port: NonZeroU16,
|
||||||
|
|
||||||
|
/// Tracing: Jaeger service name.
|
||||||
|
///
|
||||||
|
/// Only used if `--traces-exporter` is "jaeger".
|
||||||
|
#[structopt(
|
||||||
|
long = "--traces-exporter-jaeger-service-name",
|
||||||
|
env = "TRACES_EXPORTER_JAEGER_SERVICE_NAME",
|
||||||
|
default_value = "iox"
|
||||||
|
)]
|
||||||
|
pub traces_exporter_jaeger_service_name: String,
|
||||||
|
|
||||||
|
/// Tracing: Jaeger max UDP packet size
|
||||||
|
///
|
||||||
|
/// Default to 1300, which is a safe MTU.
|
||||||
|
///
|
||||||
|
/// You can increase it to 65000 if the target is a jaeger collector
|
||||||
|
/// on localhost. If so, the batching exporter will be enabled for
|
||||||
|
/// extra efficiency. Otherwise a UDP packet will be sent for each exported span.
|
||||||
|
///
|
||||||
|
/// Only used if `--traces-exporter` is "jaeger".
|
||||||
|
#[structopt(
|
||||||
|
long = "--traces-exporter-jaeger-max-packet-size",
|
||||||
|
env = "TRACES_EXPORTER_JAEGER_MAX_PACKET_SIZE",
|
||||||
|
default_value = "1300"
|
||||||
|
)]
|
||||||
|
pub traces_exporter_jaeger_max_packet_size: usize,
|
||||||
|
|
||||||
/// The identifier for the server.
|
/// The identifier for the server.
|
||||||
///
|
///
|
||||||
/// Used for writing to object storage and as an identifier that is added to
|
/// Used for writing to object storage and as an identifier that is added to
|
||||||
|
|
|
@ -160,13 +160,25 @@ fn construct_opentelemetry_tracer(config: &crate::commands::run::Config) -> Opti
|
||||||
config.traces_exporter_jaeger_agent_port
|
config.traces_exporter_jaeger_agent_port
|
||||||
);
|
);
|
||||||
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
||||||
Some(
|
Some({
|
||||||
opentelemetry_jaeger::new_pipeline()
|
let builder = opentelemetry_jaeger::new_pipeline()
|
||||||
.with_trace_config(trace_config)
|
.with_trace_config(trace_config)
|
||||||
.with_agent_endpoint(agent_endpoint)
|
.with_agent_endpoint(agent_endpoint)
|
||||||
.install_batch(opentelemetry::runtime::Tokio)
|
.with_service_name(&config.traces_exporter_jaeger_service_name)
|
||||||
.unwrap(),
|
.with_max_packet_size(config.traces_exporter_jaeger_max_packet_size);
|
||||||
)
|
|
||||||
|
// Batching is hard to tune because the max batch size
|
||||||
|
// is not currently exposed as a tunable from the trace config, and even then
|
||||||
|
// it's defined in terms of max number of spans, and not their size in bytes.
|
||||||
|
// Thus we enable batching only when the max packet size is at least 65000, the value suggested
|
||||||
|
// by jaeger when exporting to localhost.
|
||||||
|
if config.traces_exporter_jaeger_max_packet_size >= 65_000 {
|
||||||
|
builder.install_batch(opentelemetry::runtime::Tokio)
|
||||||
|
} else {
|
||||||
|
builder.install_simple()
|
||||||
|
}
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
TracesExporter::Otlp => {
|
TracesExporter::Otlp => {
|
||||||
|
|
|
@ -4,7 +4,7 @@ pub mod management_api;
|
||||||
pub mod management_cli;
|
pub mod management_cli;
|
||||||
pub mod operations_api;
|
pub mod operations_api;
|
||||||
pub mod operations_cli;
|
pub mod operations_cli;
|
||||||
pub mod preservation;
|
mod persistence;
|
||||||
pub mod read_api;
|
pub mod read_api;
|
||||||
pub mod read_cli;
|
pub mod read_cli;
|
||||||
pub mod scenario;
|
pub mod scenario;
|
||||||
|
|
|
@ -1,9 +1,41 @@
|
||||||
use arrow_util::assert_batches_eq;
|
use arrow_util::assert_batches_eq;
|
||||||
use generated_types::influxdata::iox::management::v1::*;
|
use data_types::chunk_metadata::{ChunkStorage, ChunkSummary};
|
||||||
|
//use generated_types::influxdata::iox::management::v1::*;
|
||||||
use influxdb_iox_client::operations;
|
use influxdb_iox_client::operations;
|
||||||
|
|
||||||
use super::scenario::{collect_query, create_readable_database, rand_name};
|
use super::scenario::{
|
||||||
|
collect_query, create_quickly_persisting_database, create_readable_database, rand_name,
|
||||||
|
};
|
||||||
use crate::common::server_fixture::ServerFixture;
|
use crate::common::server_fixture::ServerFixture;
|
||||||
|
use std::convert::TryInto;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_chunk_is_persisted_automatically() {
|
||||||
|
let fixture = ServerFixture::create_shared().await;
|
||||||
|
let mut write_client = fixture.write_client();
|
||||||
|
|
||||||
|
let db_name = rand_name();
|
||||||
|
create_quickly_persisting_database(&db_name, fixture.grpc_channel()).await;
|
||||||
|
|
||||||
|
// Stream in a write that should exceed the limit
|
||||||
|
let lp_lines: Vec<_> = (0..1_000)
|
||||||
|
.map(|i| format!("data,tag1=val{} x={} {}", i, i * 10, i))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let num_lines_written = write_client
|
||||||
|
.write(&db_name, lp_lines.join("\n"))
|
||||||
|
.await
|
||||||
|
.expect("successful write");
|
||||||
|
assert_eq!(num_lines_written, 1000);
|
||||||
|
|
||||||
|
wait_for_chunk(
|
||||||
|
&fixture,
|
||||||
|
&db_name,
|
||||||
|
ChunkStorage::ReadBufferAndObjectStore,
|
||||||
|
std::time::Duration::from_secs(5),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_query_chunk_after_restart() {
|
async fn test_query_chunk_after_restart() {
|
||||||
|
@ -53,6 +85,7 @@ async fn test_query_chunk_after_restart() {
|
||||||
assert_chunk_query_works(&fixture, &db_name).await;
|
assert_chunk_query_works(&fixture, &db_name).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a closed read buffer chunk and return its id
|
||||||
async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32 {
|
async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32 {
|
||||||
use influxdb_iox_client::management::generated_types::operation_metadata::Job;
|
use influxdb_iox_client::management::generated_types::operation_metadata::Job;
|
||||||
|
|
||||||
|
@ -69,14 +102,11 @@ async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32
|
||||||
.await
|
.await
|
||||||
.expect("write succeded");
|
.expect("write succeded");
|
||||||
|
|
||||||
let chunks = management_client
|
let chunks = list_chunks(fixture, db_name).await;
|
||||||
.list_chunks(db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
|
|
||||||
assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks);
|
assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks);
|
||||||
let chunk_id = chunks[0].id;
|
let chunk_id = chunks[0].id;
|
||||||
assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer as i32);
|
assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer);
|
||||||
|
|
||||||
// Move the chunk to read buffer
|
// Move the chunk to read buffer
|
||||||
let operation = management_client
|
let operation = management_client
|
||||||
|
@ -107,19 +137,17 @@ async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32
|
||||||
.expect("failed to wait operation");
|
.expect("failed to wait operation");
|
||||||
|
|
||||||
// And now the chunk should be good
|
// And now the chunk should be good
|
||||||
let mut chunks = management_client
|
let mut chunks = list_chunks(fixture, db_name).await;
|
||||||
.list_chunks(db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id));
|
chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id));
|
||||||
|
|
||||||
assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks);
|
assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks);
|
||||||
assert_eq!(chunks[0].id, chunk_id);
|
assert_eq!(chunks[0].id, chunk_id);
|
||||||
assert_eq!(chunks[0].storage, ChunkStorage::ReadBuffer as i32);
|
assert_eq!(chunks[0].storage, ChunkStorage::ReadBuffer);
|
||||||
|
|
||||||
chunk_id
|
chunk_id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for the specified chunk to be persisted to object store
|
||||||
async fn wait_for_persisted_chunk(
|
async fn wait_for_persisted_chunk(
|
||||||
fixture: &ServerFixture,
|
fixture: &ServerFixture,
|
||||||
db_name: &str,
|
db_name: &str,
|
||||||
|
@ -129,11 +157,11 @@ async fn wait_for_persisted_chunk(
|
||||||
let t_start = std::time::Instant::now();
|
let t_start = std::time::Instant::now();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut management_client = fixture.management_client();
|
let chunks = list_chunks(fixture, db_name).await;
|
||||||
let chunks = management_client.list_chunks(db_name).await.unwrap();
|
|
||||||
let chunk = chunks.iter().find(|chunk| chunk.id == chunk_id).unwrap();
|
let chunk = chunks.iter().find(|chunk| chunk.id == chunk_id).unwrap();
|
||||||
if (chunk.storage == ChunkStorage::ReadBufferAndObjectStore as i32)
|
if (chunk.storage == ChunkStorage::ReadBufferAndObjectStore)
|
||||||
|| (chunk.storage == ChunkStorage::ObjectStoreOnly as i32)
|
|| (chunk.storage == ChunkStorage::ObjectStoreOnly)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -143,6 +171,45 @@ async fn wait_for_persisted_chunk(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for at least one chunk to be in the specified storage state
|
||||||
|
async fn wait_for_chunk(
|
||||||
|
fixture: &ServerFixture,
|
||||||
|
db_name: &str,
|
||||||
|
desired_storage: ChunkStorage,
|
||||||
|
wait_time: std::time::Duration,
|
||||||
|
) {
|
||||||
|
let t_start = std::time::Instant::now();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let chunks = list_chunks(fixture, db_name).await;
|
||||||
|
|
||||||
|
if chunks.iter().any(|chunk| chunk.storage == desired_storage) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log the current status of the chunks
|
||||||
|
for chunk in &chunks {
|
||||||
|
println!(
|
||||||
|
"{:?}: chunk {} partition {} storage:{:?}",
|
||||||
|
(t_start.elapsed()),
|
||||||
|
chunk.id,
|
||||||
|
chunk.partition_key,
|
||||||
|
chunk.storage
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
t_start.elapsed() < wait_time,
|
||||||
|
"Could not find chunk in desired state {:?} within {:?}. Chunks were: {:#?}",
|
||||||
|
desired_storage,
|
||||||
|
wait_time,
|
||||||
|
chunks
|
||||||
|
);
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) {
|
async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) {
|
||||||
let mut client = fixture.flight_client();
|
let mut client = fixture.flight_client();
|
||||||
let sql_query = "select region, user, time from cpu";
|
let sql_query = "select region, user, time from cpu";
|
||||||
|
@ -160,3 +227,11 @@ async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) {
|
||||||
|
|
||||||
assert_batches_eq!(expected_read_data, &batches);
|
assert_batches_eq!(expected_read_data, &batches);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Gets the list of ChunkSummaries from the server
|
||||||
|
async fn list_chunks(fixture: &ServerFixture, db_name: &str) -> Vec<ChunkSummary> {
|
||||||
|
let mut management_client = fixture.management_client();
|
||||||
|
let chunks = management_client.list_chunks(db_name).await.unwrap();
|
||||||
|
|
||||||
|
chunks.into_iter().map(|c| c.try_into().unwrap()).collect()
|
||||||
|
}
|
|
@ -316,6 +316,43 @@ pub async fn create_readable_database(
|
||||||
.expect("create database failed");
|
.expect("create database failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// given a channel to talk with the management api, create a new
|
||||||
|
/// database with the specified name that will aggressively try and
|
||||||
|
/// persist all data quickly
|
||||||
|
pub async fn create_quickly_persisting_database(
|
||||||
|
db_name: impl Into<String>,
|
||||||
|
channel: tonic::transport::Channel,
|
||||||
|
) {
|
||||||
|
let db_name = db_name.into();
|
||||||
|
|
||||||
|
let mut management_client = influxdb_iox_client::management::Client::new(channel);
|
||||||
|
let rules = DatabaseRules {
|
||||||
|
name: db_name.clone(),
|
||||||
|
partition_template: Some(PartitionTemplate {
|
||||||
|
parts: vec![partition_template::Part {
|
||||||
|
part: Some(partition_template::part::Part::Time(
|
||||||
|
"%Y-%m-%d %H:00:00".into(),
|
||||||
|
)),
|
||||||
|
}],
|
||||||
|
}),
|
||||||
|
lifecycle_rules: Some(LifecycleRules {
|
||||||
|
mutable_linger_seconds: 1,
|
||||||
|
mutable_size_threshold: 100,
|
||||||
|
buffer_size_soft: 1024 * 1024,
|
||||||
|
buffer_size_hard: 1024 * 1024,
|
||||||
|
persist: true,
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
management_client
|
||||||
|
.create_database(rules.clone())
|
||||||
|
.await
|
||||||
|
.expect("create database failed");
|
||||||
|
println!("Created quickly persisting database {}", db_name);
|
||||||
|
}
|
||||||
|
|
||||||
/// given a channel to talk with the management api, create a new
|
/// given a channel to talk with the management api, create a new
|
||||||
/// database with no mutable buffer configured, no partitioning rules
|
/// database with no mutable buffer configured, no partitioning rules
|
||||||
pub async fn create_unreadable_database(
|
pub async fn create_unreadable_database(
|
||||||
|
|
Loading…
Reference in New Issue