feat: Add iterators over different types of columns in Schema (#879)
* feat: Add iterators by InfluxColumnType * fix: Update data_types/src/schema.rs Implement PR suggestion Co-authored-by: Edd Robinson <me@edd.io> * refactor: Remove unecessary code Co-authored-by: Edd Robinson <me@edd.io> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
d4802d19d8
commit
c7343a4acf
|
@ -299,10 +299,44 @@ impl Schema {
|
|||
/// Returns an iterator of (Option<InfluxColumnType>, &Field) for
|
||||
/// all the columns of this schema, in order
|
||||
pub fn iter(&self) -> SchemaIter<'_> {
|
||||
SchemaIter {
|
||||
schema: self,
|
||||
idx: 0,
|
||||
}
|
||||
SchemaIter::new(self)
|
||||
}
|
||||
|
||||
/// Returns an iterator of `&Field` for all the tag columns of
|
||||
/// this schema, in order
|
||||
pub fn tags_iter(&self) -> impl Iterator<Item = &ArrowField> {
|
||||
self.iter().filter_map(|(influx_column_type, field)| {
|
||||
if matches!(influx_column_type, Some(InfluxColumnType::Tag)) {
|
||||
Some(field)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns an iterator of `&Field` for all the field columns of
|
||||
/// this schema, in order
|
||||
pub fn fields_iter(&self) -> impl Iterator<Item = &ArrowField> {
|
||||
self.iter().filter_map(|(influx_column_type, field)| {
|
||||
if matches!(influx_column_type, Some(InfluxColumnType::Field(_))) {
|
||||
Some(field)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns an iterator of `&Field` for all the timestamp columns
|
||||
/// of this schema, in order. At the time of writing there should
|
||||
/// be only one or 0 such columns
|
||||
pub fn time_iter(&self) -> impl Iterator<Item = &ArrowField> {
|
||||
self.iter().filter_map(|(influx_column_type, field)| {
|
||||
if matches!(influx_column_type, Some(InfluxColumnType::Timestamp)) {
|
||||
Some(field)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Merges any new columns from new_schema, consuming self. If the
|
||||
|
@ -573,6 +607,12 @@ pub struct SchemaIter<'a> {
|
|||
idx: usize,
|
||||
}
|
||||
|
||||
impl<'a> SchemaIter<'a> {
|
||||
fn new(schema: &'a Schema) -> Self {
|
||||
Self { schema, idx: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> fmt::Debug for SchemaIter<'a> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "SchemaIter<{}>", self.idx)
|
||||
|
@ -829,15 +869,47 @@ mod test {
|
|||
}
|
||||
}
|
||||
|
||||
/// Build an empty iterator
|
||||
fn empty_schema() -> Schema {
|
||||
SchemaBuilder::new().build().unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter_empty() {
|
||||
assert_eq!(empty_schema().iter().count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tags_iter_empty() {
|
||||
assert_eq!(empty_schema().tags_iter().count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fields_iter_empty() {
|
||||
assert_eq!(empty_schema().fields_iter().count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_time_iter_empty() {
|
||||
assert_eq!(empty_schema().time_iter().count(), 0);
|
||||
}
|
||||
|
||||
/// Build a schema for testing iterators
|
||||
fn iter_schema() -> Schema {
|
||||
SchemaBuilder::new()
|
||||
.influx_field("field1", Float)
|
||||
.tag("tag1")
|
||||
.timestamp()
|
||||
.influx_field("field2", String)
|
||||
.influx_field("field3", String)
|
||||
.tag("tag2")
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter() {
|
||||
let schema = SchemaBuilder::new()
|
||||
.influx_field("the_field", String)
|
||||
.tag("the_tag")
|
||||
.timestamp()
|
||||
.measurement("the_measurement")
|
||||
.build()
|
||||
.unwrap();
|
||||
let schema = iter_schema();
|
||||
|
||||
// test schema iterator and field accessor match up
|
||||
for (i, (iter_col_type, iter_field)) in schema.iter().enumerate() {
|
||||
|
@ -845,7 +917,40 @@ mod test {
|
|||
assert_eq!(iter_col_type, col_type);
|
||||
assert_eq!(iter_field, field);
|
||||
}
|
||||
assert_eq!(schema.iter().count(), 3);
|
||||
assert_eq!(schema.iter().count(), 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tags_iter() {
|
||||
let schema = iter_schema();
|
||||
|
||||
let mut iter = schema.tags_iter();
|
||||
assert_eq!(iter.next().unwrap().name(), "tag1");
|
||||
assert_eq!(iter.next().unwrap().name(), "tag2");
|
||||
assert_eq!(iter.next(), None);
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fields_iter() {
|
||||
let schema = iter_schema();
|
||||
|
||||
let mut iter = schema.fields_iter();
|
||||
assert_eq!(iter.next().unwrap().name(), "field1");
|
||||
assert_eq!(iter.next().unwrap().name(), "field2");
|
||||
assert_eq!(iter.next().unwrap().name(), "field3");
|
||||
assert_eq!(iter.next(), None);
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_time_iter() {
|
||||
let schema = iter_schema();
|
||||
|
||||
let mut iter = schema.time_iter();
|
||||
assert_eq!(iter.next().unwrap().name(), "time");
|
||||
assert_eq!(iter.next(), None);
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in New Issue