feat: add support for tag_values cli
parent
38a889ecf6
commit
eb733042ca
|
@ -109,12 +109,14 @@ fn parse_db_name(db_name: &str) -> Result<OrgAndBucket, ParseError> {
|
|||
/// All possible subcommands for storage
|
||||
#[derive(Debug, clap::Parser)]
|
||||
enum Command {
|
||||
/// Issue a read_filter request
|
||||
ReadFilter,
|
||||
TagValues(TagValues),
|
||||
}
|
||||
|
||||
// #[derive(Debug, clap::Parser)]
|
||||
// struct ReadFilter {}
|
||||
#[derive(Debug, clap::Parser)]
|
||||
struct TagValues {
|
||||
tag_key: String,
|
||||
}
|
||||
|
||||
/// Create and issue read request
|
||||
pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||
|
@ -124,9 +126,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
let predicate = config.predicate.root.is_some().then(|| config.predicate);
|
||||
|
||||
let source = Client::read_source(&config.db_name, 0);
|
||||
let result = match config.command {
|
||||
match config.command {
|
||||
Command::ReadFilter => {
|
||||
client
|
||||
let result = client
|
||||
.read_filter(request::read_filter(
|
||||
source,
|
||||
config.start,
|
||||
|
@ -134,11 +136,23 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
predicate,
|
||||
))
|
||||
.await
|
||||
.context(ServerSnafu)?;
|
||||
response::pretty_print_frames(&result).context(ResponseSnafu)
|
||||
}
|
||||
Command::TagValues(tv) => {
|
||||
let result = client
|
||||
.tag_values(request::tag_values(
|
||||
source,
|
||||
config.start,
|
||||
config.stop,
|
||||
predicate,
|
||||
tv.tag_key,
|
||||
))
|
||||
.await
|
||||
.context(ServerSnafu)?;
|
||||
response::pretty_print_strings(result).context(ResponseSnafu)
|
||||
}
|
||||
}
|
||||
.context(ServerSnafu)?;
|
||||
|
||||
response::pretty_print(&result).context(ResponseSnafu)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -20,12 +20,26 @@ pub fn read_filter(
|
|||
}
|
||||
}
|
||||
|
||||
pub fn tag_values(
|
||||
org_bucket: Any,
|
||||
start: i64,
|
||||
stop: i64,
|
||||
predicate: std::option::Option<Predicate>,
|
||||
tag_key: String,
|
||||
) -> TagValuesRequest {
|
||||
generated_types::TagValuesRequest {
|
||||
predicate,
|
||||
tags_source: Some(org_bucket),
|
||||
range: Some(TimestampRange { start, end: stop }),
|
||||
tag_key: tag_key.into(),
|
||||
}
|
||||
}
|
||||
|
||||
// TODO Add the following helpers for building requests:
|
||||
//
|
||||
// * read_group
|
||||
// * read_window_aggregate
|
||||
// * tag_keys
|
||||
// * tag_values
|
||||
// * tag_values_with_measurement_and_key
|
||||
// * measurement_names
|
||||
// * measurement_tag_keys
|
||||
|
|
|
@ -39,20 +39,41 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
|
||||
// Prints the provided data frames in a tabular format grouped into tables per
|
||||
// distinct measurement.
|
||||
pub fn pretty_print(frames: &[Data]) -> Result<()> {
|
||||
let rbs = into_record_batches(frames)?;
|
||||
pub fn pretty_print_frames(frames: &[Data]) -> Result<()> {
|
||||
let rbs = frames_to_record_batches(frames)?;
|
||||
for (k, rb) in rbs {
|
||||
println!("_measurement: {}", k);
|
||||
println!("\n_measurement: {}", k);
|
||||
println!("rows: {:?}", &rb.num_rows());
|
||||
print_batches(&[rb]).context(ArrowSnafu)?;
|
||||
println!("\n\n");
|
||||
println!("\n");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Prints the provided set of strings in a tabular format grouped.
|
||||
pub fn pretty_print_strings(values: Vec<String>) -> Result<()> {
|
||||
let schema = SchemaBuilder::new()
|
||||
.influx_field("values", InfluxFieldType::String)
|
||||
.build()
|
||||
.context(SchemaBuildingSnafu)?;
|
||||
|
||||
let arrow_schema: arrow::datatypes::SchemaRef = schema.into();
|
||||
let rb_columns: Vec<Arc<dyn arrow::array::Array>> =
|
||||
vec![Arc::new(arrow::array::StringArray::from(
|
||||
values.iter().map(|x| Some(x.as_str())).collect::<Vec<_>>(),
|
||||
))];
|
||||
|
||||
let rb = RecordBatch::try_new(arrow_schema, rb_columns).context(ArrowSnafu)?;
|
||||
|
||||
println!("\ntag values: {:?}", &rb.num_rows());
|
||||
print_batches(&[rb]).context(ArrowSnafu)?;
|
||||
println!("\n");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// This function takes a set of InfluxRPC data frames and converts them into an
|
||||
// Arrow record batches, which are suitable for pretty printing.
|
||||
fn into_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBatch>> {
|
||||
fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBatch>> {
|
||||
// Run through all the frames once to build the schema of each table we need
|
||||
// to build as a record batch.
|
||||
let mut table_column_mapping = determine_tag_columns(frames);
|
||||
|
@ -728,7 +749,7 @@ mod test_super {
|
|||
fn test_into_record_batches() {
|
||||
let frames = gen_frames();
|
||||
|
||||
let rbs = into_record_batches(&frames);
|
||||
let rbs = frames_to_record_batches(&frames);
|
||||
let exp = vec![
|
||||
(
|
||||
"another table",
|
||||
|
|
Loading…
Reference in New Issue