diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 25faf02e65..1ca84a1789 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -217,7 +217,7 @@ impl Chunk { /// Returns the distinct set of table names that contain data satisfying the /// provided predicate. /// - /// `exclude_table_names` can be used to provide a set of table names to + /// `skip_table_names` can be used to provide a set of table names to /// skip, typically because they're already included in results from other /// chunks. pub fn table_names( @@ -257,18 +257,27 @@ impl Chunk { .collect::>() } - /// Returns the distinct set of tag keys (column names) matching the - /// provided optional predicates and time range. - pub fn tag_keys( + /// Returns the distinct set of column names that contain data matching the + /// provided optional predicate. + /// + /// `dst` is a buffer that will be populated with results. `column_names` is + /// smart enough to short-circuit processing on row groups when it + /// determines that all the columns in the row group are already contained + /// in the results buffer. + pub fn column_names( &self, - table_name: String, - predicate: Predicate, - found_keys: &BTreeSet>, - ) -> BTreeSet> { - // Lookup table by name and dispatch execution if the table's time range - // overlaps the requested time range *and* there exists columns in the - // table's schema that are *not* already found. - todo!(); + table_name: &str, + predicate: &Predicate, + dst: BTreeSet, + ) -> BTreeSet { + let chunk_data = self.chunk_data.read().unwrap(); + + // TODO(edd): same potential contention as `table_names` but I'm ok + // with this for now. + match chunk_data.data.get(table_name) { + Some(table) => table.column_names(predicate, dst), + None => dst, + } } /// Returns the distinct set of tag values (column values) for each provided diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index 91b13bdf2e..ba1a68f522 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -34,6 +34,10 @@ use table::Table; /// `table_names`. pub const TABLE_NAMES_COLUMN_NAME: &str = "table"; +/// The name of the column containing column names returned by a call to +/// `column_names`. +pub const COLUMN_NAMES_COLUMN_NAME: &str = "column"; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("arrow conversion error: {}", source))] @@ -470,20 +474,42 @@ impl Database { pub fn column_names( &self, partition_key: &str, + table_name: &str, chunk_ids: &[u32], predicate: Predicate, ) -> Result { - // Find all matching chunks using: - // - time range - // - measurement name. - // - // Execute query against matching chunks. The `tag_keys` method for - // a chunk allows the caller to provide already found tag keys - // (column names). This allows the execution to skip entire chunks, - // tables or segments if there are no new columns to be found there... - Err(Error::UnsupportedOperation { - msg: "`column_names` call not yet hooked up".to_owned(), - }) + let partition_data = self.data.read().unwrap(); + + let partition = partition_data + .partitions + .get(partition_key) + .ok_or_else(|| Error::PartitionNotFound { + key: partition_key.to_owned(), + })?; + + let chunk_data = partition.data.read().unwrap(); + let mut filtered_chunks = vec![]; + for id in chunk_ids { + filtered_chunks.push( + chunk_data + .chunks + .get(id) + .ok_or_else(|| Error::ChunkNotFound { id: *id })?, + ); + } + + let names = filtered_chunks + .iter() + .fold(BTreeSet::new(), |dst, chunk| { + // the dst buffer is pushed into each chunk's `column_names` + // implementation ensuring that we short-circuit any tables where + // we have already determined column names. + chunk.column_names(table_name, &predicate, dst) + }) // have a BTreeSet here, convert to an iterator of Some(&str) + .into_iter() + .map(Some); + + str_iter_to_batch(COLUMN_NAMES_COLUMN_NAME, names).context(ArrowError) } } @@ -1042,6 +1068,126 @@ mod test { ); } + #[test] + fn column_names() { + let mut db = Database::new(); + let res_col = COLUMN_NAMES_COLUMN_NAME; + + let schema = SchemaBuilder::new() + .non_null_tag("region") + .non_null_field("counter", Float64) + .timestamp() + .field("sketchy_sensor", Float64) + .build() + .unwrap() + .into(); + + let data: Vec = vec![ + Arc::new(StringArray::from(vec!["west", "west", "east"])), + Arc::new(Float64Array::from(vec![1.2, 3.3, 45.3])), + Arc::new(Int64Array::from(vec![11111111, 222222, 3333])), + Arc::new(Float64Array::from(vec![Some(11.0), None, Some(12.0)])), + ]; + + // Add the above table to a chunk and partition + let rb = RecordBatch::try_new(schema, data).unwrap(); + db.upsert_partition("hour_1", 22, "Utopia", rb); + + // Add a different but compatible table to a different chunk in the same + // partition. + let schema = SchemaBuilder::new() + .field("active", Boolean) + .timestamp() + .build() + .unwrap() + .into(); + + let data: Vec = vec![ + Arc::new(BooleanArray::from(vec![Some(true), None, None])), + Arc::new(Int64Array::from(vec![10, 20, 30])), + ]; + let rb = RecordBatch::try_new(schema, data).unwrap(); + db.upsert_partition("hour_1", 40, "Utopia", rb); + + // Just query against the first chunk. + let result = db + .column_names("hour_1", "Utopia", &[22], Predicate::default()) + .unwrap(); + + assert_rb_column_equals( + &result, + res_col, + &Values::String( + vec!["counter", "region", "sketchy_sensor", "time"] + .into_iter() + .map(Some) + .collect(), + ), + ); + + // Now the second - different columns. + let result = db + .column_names("hour_1", "Utopia", &[40], Predicate::default()) + .unwrap(); + + assert_rb_column_equals( + &result, + res_col, + &Values::String(vec!["active", "time"].into_iter().map(Some).collect()), + ); + + // And now the union across all chunks. + let result = db + .column_names("hour_1", "Utopia", &[22, 40], Predicate::default()) + .unwrap(); + + assert_rb_column_equals( + &result, + res_col, + &Values::String( + vec!["active", "counter", "region", "sketchy_sensor", "time"] + .into_iter() + .map(Some) + .collect(), + ), + ); + + // Testing predicates + let result = db + .column_names( + "hour_1", + "Utopia", + &[22, 40], + Predicate::new(vec![BinaryExpr::from(("time", "=", 30_i64))]), + ) + .unwrap(); + + // only time will be returned - "active" in the second chunk is NULL for + // matching rows + assert_rb_column_equals( + &result, + res_col, + &Values::String(vec!["time"].into_iter().map(Some).collect()), + ); + + let result = db + .column_names( + "hour_1", + "Utopia", + &[22, 40], + Predicate::new(vec![BinaryExpr::from(("active", "=", true))]), + ) + .unwrap(); + + // there exists at least one row in the second chunk with a matching + // non-null value across the active and time columns. + assert_rb_column_equals( + &result, + res_col, + &Values::String(vec!["active", "time"].into_iter().map(Some).collect()), + ); + } + #[test] fn read_filter_single_chunk() { let mut db = Database::new(); diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index ef665c2e07..36aade47dd 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -425,21 +425,42 @@ impl Table { // ---- Schema API queries // - /// Returns the distinct set of tag keys (column names) matching the - /// provided optional predicates and time range. - pub fn tag_keys<'a>( + /// Returns a distinct set of column names in the table. + /// + /// Optionally a predicate may be provided. In such a case only column names + /// will be returned belonging to columns whom have at least one non-null + /// value for any row satisfying the predicate. + pub fn column_names( &self, - time_range: (i64, i64), - predicates: &[(&str, &str)], - found_columns: &BTreeSet, - ) -> BTreeSet> { - // Firstly, this should short-circuit early if all of the table's columns - // are present in `found_columns`. + predicate: &Predicate, + mut dst: BTreeSet, + ) -> BTreeSet { + let table_data = self.table_data.read().unwrap(); + + // Short circuit execution if we have already got all of this table's + // columns in the results. + if table_data + .meta + .columns + .keys() + .all(|name| dst.contains(name)) + { + return dst; + } + + // Identify row groups where time range and predicates match could match + // the predicate. Get a snapshot of those, and the table meta-data. // - // Otherwise, identify segments where time range and predicates match could - // match using segment meta data and then execute against those segments - // and merge results. - todo!(); + // NOTE(edd): this takes another read lock on `self`. I think this is + // ok, but if it turns out it's not then we can move the + // `filter_row_groups` logic into here and not take the second read + // lock. + let (_, row_groups) = self.filter_row_groups(predicate); + for row_group in row_groups { + row_group.column_names(predicate, &mut dst); + } + + dst } /// Returns the distinct set of tag values (column values) for each provided @@ -1269,4 +1290,75 @@ west,host-b,100 " ); } + + #[test] + fn column_names() { + // Build a row group. + let mut columns = BTreeMap::new(); + let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3][..])); + columns.insert("time".to_string(), tc); + + let rc = ColumnType::Tag(Column::from(&["west", "south", "north"][..])); + columns.insert("region".to_string(), rc); + let rg = RowGroup::new(3, columns); + let mut table = Table::new("cpu".to_owned(), rg); + + // add another row group + let mut columns = BTreeMap::new(); + let tc = ColumnType::Time(Column::from(&[200_i64, 300, 400][..])); + columns.insert("time".to_string(), tc); + + let rc = ColumnType::Tag(Column::from(vec![Some("north"), None, None].as_slice())); + columns.insert("region".to_string(), rc); + let rg = RowGroup::new(3, columns); + table.add_row_group(rg); + + // Table looks like: + // + // region, time + // ------------ + // west, 1 + // south, 2 + // north, 3 + // <- next row group -> + // north, 200 + // NULL, 300 + // NULL, 400 + + let mut dst: BTreeSet = BTreeSet::new(); + dst = table.column_names(&Predicate::default(), dst); + + assert_eq!( + dst.iter().cloned().collect::>(), + vec!["region".to_owned(), "time".to_owned()], + ); + + // re-run and get the same answer + dst = table.column_names(&Predicate::default(), dst); + assert_eq!( + dst.iter().cloned().collect::>(), + vec!["region".to_owned(), "time".to_owned()], + ); + + // include a predicate that doesn't match any region rows and still get + // region from previous results. + dst = table.column_names( + &Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]), + dst, + ); + assert_eq!( + dst.iter().cloned().collect::>(), + vec!["region".to_owned(), "time".to_owned()], + ); + + // wipe the destination buffer and region won't show up + dst = table.column_names( + &Predicate::new(vec![BinaryExpr::from(("time", ">=", 300_i64))]), + BTreeSet::new(), + ); + assert_eq!( + dst.iter().cloned().collect::>(), + vec!["time".to_owned()], + ); + } }