feat: expose column_names via external API

pull/24376/head
Edd Robinson 2021-02-02 22:28:54 +00:00
parent fd28738abf
commit 6bec4c6eef
3 changed files with 283 additions and 36 deletions

View File

@@ -217,7 +217,7 @@ impl Chunk {
/// Returns the distinct set of table names that contain data satisfying the
/// provided predicate.
///
- /// `exclude_table_names` can be used to provide a set of table names to
+ /// `skip_table_names` can be used to provide a set of table names to
/// skip, typically because they're already included in results from other
/// chunks.
pub fn table_names(
@@ -257,18 +257,27 @@
.collect::<BTreeSet<_>>()
}
- /// Returns the distinct set of tag keys (column names) matching the
- /// provided optional predicates and time range.
- pub fn tag_keys(
+ /// Returns the distinct set of column names that contain data matching the
+ /// provided optional predicate.
+ ///
+ /// `dst` is a buffer that will be populated with results. `column_names` is
+ /// smart enough to short-circuit processing on row groups when it
+ /// determines that all the columns in the row group are already contained
+ /// in the results buffer.
+ pub fn column_names(
&self,
- table_name: String,
- predicate: Predicate,
- found_keys: &BTreeSet<ColumnName<'_>>,
- ) -> BTreeSet<ColumnName<'_>> {
- // Lookup table by name and dispatch execution if the table's time range
- // overlaps the requested time range *and* there exists columns in the
- // table's schema that are *not* already found.
- todo!();
+ table_name: &str,
+ predicate: &Predicate,
+ dst: BTreeSet<String>,
+ ) -> BTreeSet<String> {
+ let chunk_data = self.chunk_data.read().unwrap();
+ // TODO(edd): same potential contention as `table_names` but I'm ok
+ // with this for now.
+ match chunk_data.data.get(table_name) {
+ Some(table) => table.column_names(predicate, dst),
+ None => dst,
+ }
}
/// Returns the distinct set of tag values (column values) for each provided
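
A caller-side sketch of the buffer-threading contract described above (the `chunks` slice and the "cpu" table name are hypothetical, not part of this change):

use std::collections::BTreeSet;

// Thread one buffer through every chunk; each chunk can then skip any
// table whose columns are already all present in the buffer.
let mut dst = BTreeSet::new();
for chunk in &chunks {
    dst = chunk.column_names("cpu", &Predicate::default(), dst);
}
// `dst` now holds the union of matching column names across the chunks.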

View File

@@ -34,6 +34,10 @@ use table::Table;
/// `table_names`.
pub const TABLE_NAMES_COLUMN_NAME: &str = "table";
+ /// The name of the column containing column names returned by a call to
+ /// `column_names`.
+ pub const COLUMN_NAMES_COLUMN_NAME: &str = "column";
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("arrow conversion error: {}", source))]
@@ -470,20 +474,42 @@ impl Database {
pub fn column_names(
&self,
partition_key: &str,
+ table_name: &str,
chunk_ids: &[u32],
predicate: Predicate,
) -> Result<RecordBatch> {
- // Find all matching chunks using:
- // - time range
- // - measurement name.
- //
- // Execute query against matching chunks. The `tag_keys` method for
- // a chunk allows the caller to provide already found tag keys
- // (column names). This allows the execution to skip entire chunks,
- // tables or segments if there are no new columns to be found there...
- Err(Error::UnsupportedOperation {
- msg: "`column_names` call not yet hooked up".to_owned(),
- })
+ let partition_data = self.data.read().unwrap();
+ let partition = partition_data
+ .partitions
+ .get(partition_key)
+ .ok_or_else(|| Error::PartitionNotFound {
+ key: partition_key.to_owned(),
+ })?;
+ let chunk_data = partition.data.read().unwrap();
+ let mut filtered_chunks = vec![];
+ for id in chunk_ids {
+ filtered_chunks.push(
+ chunk_data
+ .chunks
+ .get(id)
+ .ok_or_else(|| Error::ChunkNotFound { id: *id })?,
+ );
+ }
+ let names = filtered_chunks
+ .iter()
+ .fold(BTreeSet::new(), |dst, chunk| {
+ // The `dst` buffer is passed into each chunk's `column_names`
+ // implementation, ensuring we short-circuit any tables whose
+ // column names have already been fully determined.
+ chunk.column_names(table_name, &predicate, dst)
+ })
+ // We have a BTreeSet<String> here; convert it to an iterator of
+ // Some(String) for the batch conversion.
+ .into_iter()
+ .map(Some);
+ str_iter_to_batch(COLUMN_NAMES_COLUMN_NAME, names).context(ArrowError)
}
}
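
The fold above moves ownership of the results buffer into each chunk and back out, so the accumulated set is never copied. A self-contained sketch of that pattern, with a hypothetical `names_for` standing in for the chunk lookup:

use std::collections::BTreeSet;

// Take ownership of the set, add any new names, and hand the set back.
fn names_for(source: &[&str], dst: BTreeSet<String>) -> BTreeSet<String> {
    // Short-circuit when every column is already present.
    if source.iter().all(|name| dst.contains(*name)) {
        return dst;
    }
    let mut dst = dst;
    for name in source {
        dst.insert((*name).to_string());
    }
    dst
}

fn main() {
    // Three "chunks", each contributing some column names.
    let sources = [vec!["region", "time"], vec!["time"], vec!["active", "time"]];
    let names = sources
        .iter()
        .fold(BTreeSet::new(), |dst, s| names_for(s, dst));
    assert_eq!(
        names.into_iter().collect::<Vec<_>>(),
        vec!["active", "region", "time"]
    );
}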
@@ -1042,6 +1068,126 @@ mod test {
);
}
+ #[test]
+ fn column_names() {
+ let mut db = Database::new();
+ let res_col = COLUMN_NAMES_COLUMN_NAME;
+ let schema = SchemaBuilder::new()
+ .non_null_tag("region")
+ .non_null_field("counter", Float64)
+ .timestamp()
+ .field("sketchy_sensor", Float64)
+ .build()
+ .unwrap()
+ .into();
+ let data: Vec<ArrayRef> = vec![
+ Arc::new(StringArray::from(vec!["west", "west", "east"])),
+ Arc::new(Float64Array::from(vec![1.2, 3.3, 45.3])),
+ Arc::new(Int64Array::from(vec![11111111, 222222, 3333])),
+ Arc::new(Float64Array::from(vec![Some(11.0), None, Some(12.0)])),
+ ];
+ // Add the above table to a chunk and partition
+ let rb = RecordBatch::try_new(schema, data).unwrap();
+ db.upsert_partition("hour_1", 22, "Utopia", rb);
+ // Add a different but compatible table to a different chunk in the same
+ // partition.
+ let schema = SchemaBuilder::new()
+ .field("active", Boolean)
+ .timestamp()
+ .build()
+ .unwrap()
+ .into();
+ let data: Vec<ArrayRef> = vec![
+ Arc::new(BooleanArray::from(vec![Some(true), None, None])),
+ Arc::new(Int64Array::from(vec![10, 20, 30])),
+ ];
+ let rb = RecordBatch::try_new(schema, data).unwrap();
+ db.upsert_partition("hour_1", 40, "Utopia", rb);
+ // Just query against the first chunk.
+ let result = db
+ .column_names("hour_1", "Utopia", &[22], Predicate::default())
+ .unwrap();
+ assert_rb_column_equals(
+ &result,
+ res_col,
+ &Values::String(
+ vec!["counter", "region", "sketchy_sensor", "time"]
+ .into_iter()
+ .map(Some)
+ .collect(),
+ ),
+ );
+ // Now the second - different columns.
+ let result = db
+ .column_names("hour_1", "Utopia", &[40], Predicate::default())
+ .unwrap();
+ assert_rb_column_equals(
+ &result,
+ res_col,
+ &Values::String(vec!["active", "time"].into_iter().map(Some).collect()),
+ );
+ // And now the union across all chunks.
+ let result = db
+ .column_names("hour_1", "Utopia", &[22, 40], Predicate::default())
+ .unwrap();
+ assert_rb_column_equals(
+ &result,
+ res_col,
+ &Values::String(
+ vec!["active", "counter", "region", "sketchy_sensor", "time"]
+ .into_iter()
+ .map(Some)
+ .collect(),
+ ),
+ );
+ // Testing predicates
+ let result = db
+ .column_names(
+ "hour_1",
+ "Utopia",
+ &[22, 40],
+ Predicate::new(vec![BinaryExpr::from(("time", "=", 30_i64))]),
+ )
+ .unwrap();
+ // only time will be returned - "active" in the second chunk is NULL for
+ // matching rows
+ assert_rb_column_equals(
+ &result,
+ res_col,
+ &Values::String(vec!["time"].into_iter().map(Some).collect()),
+ );
+ let result = db
+ .column_names(
+ "hour_1",
+ "Utopia",
+ &[22, 40],
+ Predicate::new(vec![BinaryExpr::from(("active", "=", true))]),
+ )
+ .unwrap();
+ // there exists at least one row in the second chunk with a matching
+ // non-null value across the active and time columns.
+ assert_rb_column_equals(
+ &result,
+ res_col,
+ &Values::String(vec!["active", "time"].into_iter().map(Some).collect()),
+ );
+ }
#[test]
fn read_filter_single_chunk() {
let mut db = Database::new();

View File

@@ -425,21 +425,42 @@ impl Table {
// ---- Schema API queries
//
- /// Returns the distinct set of tag keys (column names) matching the
- /// provided optional predicates and time range.
- pub fn tag_keys<'a>(
+ /// Returns a distinct set of column names in the table.
+ ///
+ /// Optionally a predicate may be provided. In that case only the names of
+ /// columns that have at least one non-null value for a row satisfying the
+ /// predicate will be returned.
+ pub fn column_names(
&self,
- time_range: (i64, i64),
- predicates: &[(&str, &str)],
- found_columns: &BTreeSet<String>,
- ) -> BTreeSet<ColumnName<'a>> {
- // Firstly, this should short-circuit early if all of the table's columns
- // are present in `found_columns`.
+ predicate: &Predicate,
+ mut dst: BTreeSet<String>,
+ ) -> BTreeSet<String> {
+ let table_data = self.table_data.read().unwrap();
+ // Short-circuit execution if we already have all of this table's
+ // columns in the results.
+ if table_data
+ .meta
+ .columns
+ .keys()
+ .all(|name| dst.contains(name))
+ {
+ return dst;
+ }
+ // Identify row groups that the time range and predicate could match.
+ // Get a snapshot of those, and the table meta-data.
//
- // Otherwise, identify segments where time range and predicates match could
- // match using segment meta data and then execute against those segments
- // and merge results.
- todo!();
+ // NOTE(edd): this takes another read lock on `self`. I think this is
+ // ok, but if it turns out it's not then we can move the
+ // `filter_row_groups` logic into here and not take the second read
+ // lock.
+ let (_, row_groups) = self.filter_row_groups(predicate);
+ for row_group in row_groups {
+ row_group.column_names(predicate, &mut dst);
+ }
+ dst
}
/// Returns the distinct set of tag values (column values) for each provided
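
The non-null rule stated in the new doc comment can be sketched in isolation. Everything below is hypothetical (simplified shapes, not the `RowGroup` API): a column's name is reported only when some row matching the predicate holds a non-null value.

use std::collections::BTreeSet;

fn row_group_column_names(
    columns: &[(&str, Vec<Option<&str>>)], // (column name, row values)
    matching_rows: &[usize],               // row ids satisfying the predicate
    dst: &mut BTreeSet<String>,
) {
    for (name, values) in columns {
        if dst.contains(*name) {
            continue; // already found in an earlier row group
        }
        // Include the column only if a matching row has a non-null value.
        if matching_rows.iter().any(|&row| values[row].is_some()) {
            dst.insert((*name).to_string());
        }
    }
}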
@@ -1269,4 +1290,75 @@ west,host-b,100
"
);
}
+ #[test]
+ fn column_names() {
+ // Build a row group.
+ let mut columns = BTreeMap::new();
+ let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3][..]));
+ columns.insert("time".to_string(), tc);
+ let rc = ColumnType::Tag(Column::from(&["west", "south", "north"][..]));
+ columns.insert("region".to_string(), rc);
+ let rg = RowGroup::new(3, columns);
+ let mut table = Table::new("cpu".to_owned(), rg);
+ // add another row group
+ let mut columns = BTreeMap::new();
+ let tc = ColumnType::Time(Column::from(&[200_i64, 300, 400][..]));
+ columns.insert("time".to_string(), tc);
+ let rc = ColumnType::Tag(Column::from(vec![Some("north"), None, None].as_slice()));
+ columns.insert("region".to_string(), rc);
+ let rg = RowGroup::new(3, columns);
+ table.add_row_group(rg);
+ // Table looks like:
+ //
+ // region, time
+ // ------------
+ // west, 1
+ // south, 2
+ // north, 3
+ // <- next row group ->
+ // north, 200
+ // NULL, 300
+ // NULL, 400
+ let mut dst: BTreeSet<String> = BTreeSet::new();
+ dst = table.column_names(&Predicate::default(), dst);
+ assert_eq!(
+ dst.iter().cloned().collect::<Vec<_>>(),
+ vec!["region".to_owned(), "time".to_owned()],
+ );
+ // re-run and get the same answer
+ dst = table.column_names(&Predicate::default(), dst);
+ assert_eq!(
+ dst.iter().cloned().collect::<Vec<_>>(),
+ vec!["region".to_owned(), "time".to_owned()],
+ );
+ // include a predicate that doesn't match any region rows and still get
+ // region from previous results.
+ dst = table.column_names(
+ &Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]),
+ dst,
+ );
+ assert_eq!(
+ dst.iter().cloned().collect::<Vec<_>>(),
+ vec!["region".to_owned(), "time".to_owned()],
+ );
+ // wipe the destination buffer and region won't show up
+ dst = table.column_names(
+ &Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]),
+ BTreeSet::new(),
+ );
+ assert_eq!(
+ dst.iter().cloned().collect::<Vec<_>>(),
+ vec!["time".to_owned()],
+ );
+ }
}
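
Condensed end-to-end usage of the new external API, mirroring the Database test above (`rb` is the record batch built in that test, so this is a sketch rather than a standalone program):

let mut db = Database::new();
db.upsert_partition("hour_1", 22, "Utopia", rb);

let batch = db
    .column_names("hour_1", "Utopia", &[22], Predicate::default())
    .unwrap();
// `batch` holds a single string column, named by COLUMN_NAMES_COLUMN_NAME
// ("column"), with the distinct names in sorted order:
// counter, region, sketchy_sensor, time.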