Implement casting support for different iterator types

Out of a list of iterators, an overarching iterator type is chosen and
only iterators of that type are returned for the merge iterator. If a
type can be cast to another type, an extra cast iterator is created to
handle that casting.

The only supported cast is from integers to floats.
pull/5196/head
Jonathan A. Sternberg 2016-02-02 22:12:32 -05:00 committed by Ben Johnson
parent dbb9b36d84
commit 5605bbb22e
5 changed files with 258 additions and 47 deletions

View File

@ -23,10 +23,10 @@ const (
Float = 1 Float = 1
// Integer means the data type is a integer // Integer means the data type is a integer
Integer = 2 Integer = 2
// Boolean means the data type is a boolean.
Boolean = 3
// String means the data type is a string of text. // String means the data type is a string of text.
String = 4 String = 3
// Boolean means the data type is a boolean.
Boolean = 4
// Time means the data type is a time. // Time means the data type is a time.
Time = 5 Time = 5
// Duration means the data type is a duration of time. // Duration means the data type is a duration of time.
@ -40,10 +40,10 @@ func InspectDataType(v interface{}) DataType {
return Float return Float
case int64, int32, int: case int64, int32, int:
return Integer return Integer
case bool:
return Boolean
case string: case string:
return String return String
case bool:
return Boolean
case time.Time: case time.Time:
return Time return Time
case time.Duration: case time.Duration:
@ -59,10 +59,10 @@ func (d DataType) String() string {
return "float" return "float"
case Integer: case Integer:
return "integer" return "integer"
case Boolean:
return "boolean"
case String: case String:
return "string" return "string"
case Boolean:
return "boolean"
case Time: case Time:
return "time" return "time"
case Duration: case Duration:

View File

@ -17,11 +17,21 @@ type FloatIterator interface {
} }
// newFloatIterators converts a slice of Iterator to a slice of FloatIterator. // newFloatIterators converts a slice of Iterator to a slice of FloatIterator.
// Panic if any iterator in itrs is not a FloatIterator. // Drop and closes any iterator in itrs that is not a FloatIterator and cannot
// be cast to a FloatIterator.
func newFloatIterators(itrs []Iterator) []FloatIterator { func newFloatIterators(itrs []Iterator) []FloatIterator {
a := make([]FloatIterator, len(itrs)) a := make([]FloatIterator, 0, len(itrs))
for i, itr := range itrs { for _, itr := range itrs {
a[i] = itr.(FloatIterator) switch itr := itr.(type) {
case FloatIterator:
a = append(a, itr)
case IntegerIterator:
a = append(a, &integerFloatCastIterator{input: itr})
default:
itr.Close()
}
} }
return a return a
} }
@ -687,11 +697,18 @@ type IntegerIterator interface {
} }
// newIntegerIterators converts a slice of Iterator to a slice of IntegerIterator. // newIntegerIterators converts a slice of Iterator to a slice of IntegerIterator.
// Panic if any iterator in itrs is not a IntegerIterator. // Drop and closes any iterator in itrs that is not a IntegerIterator and cannot
// be cast to a IntegerIterator.
func newIntegerIterators(itrs []Iterator) []IntegerIterator { func newIntegerIterators(itrs []Iterator) []IntegerIterator {
a := make([]IntegerIterator, len(itrs)) a := make([]IntegerIterator, 0, len(itrs))
for i, itr := range itrs { for _, itr := range itrs {
a[i] = itr.(IntegerIterator) switch itr := itr.(type) {
case IntegerIterator:
a = append(a, itr)
default:
itr.Close()
}
} }
return a return a
} }
@ -1357,11 +1374,18 @@ type StringIterator interface {
} }
// newStringIterators converts a slice of Iterator to a slice of StringIterator. // newStringIterators converts a slice of Iterator to a slice of StringIterator.
// Panic if any iterator in itrs is not a StringIterator. // Drop and closes any iterator in itrs that is not a StringIterator and cannot
// be cast to a StringIterator.
func newStringIterators(itrs []Iterator) []StringIterator { func newStringIterators(itrs []Iterator) []StringIterator {
a := make([]StringIterator, len(itrs)) a := make([]StringIterator, 0, len(itrs))
for i, itr := range itrs { for _, itr := range itrs {
a[i] = itr.(StringIterator) switch itr := itr.(type) {
case StringIterator:
a = append(a, itr)
default:
itr.Close()
}
} }
return a return a
} }
@ -2027,11 +2051,18 @@ type BooleanIterator interface {
} }
// newBooleanIterators converts a slice of Iterator to a slice of BooleanIterator. // newBooleanIterators converts a slice of Iterator to a slice of BooleanIterator.
// Panic if any iterator in itrs is not a BooleanIterator. // Drop and closes any iterator in itrs that is not a BooleanIterator and cannot
// be cast to a BooleanIterator.
func newBooleanIterators(itrs []Iterator) []BooleanIterator { func newBooleanIterators(itrs []Iterator) []BooleanIterator {
a := make([]BooleanIterator, len(itrs)) a := make([]BooleanIterator, 0, len(itrs))
for i, itr := range itrs { for _, itr := range itrs {
a[i] = itr.(BooleanIterator) switch itr := itr.(type) {
case BooleanIterator:
a = append(a, itr)
default:
itr.Close()
}
} }
return a return a
} }

View File

@ -16,11 +16,21 @@ type {{.Name}}Iterator interface {
} }
// new{{.Name}}Iterators converts a slice of Iterator to a slice of {{.Name}}Iterator. // new{{.Name}}Iterators converts a slice of Iterator to a slice of {{.Name}}Iterator.
// Panic if any iterator in itrs is not a {{.Name}}Iterator. // Drop and closes any iterator in itrs that is not a {{.Name}}Iterator and cannot
// be cast to a {{.Name}}Iterator.
func new{{.Name}}Iterators(itrs []Iterator) []{{.Name}}Iterator { func new{{.Name}}Iterators(itrs []Iterator) []{{.Name}}Iterator {
a := make([]{{.Name}}Iterator, len(itrs)) a := make([]{{.Name}}Iterator, 0, len(itrs))
for i, itr := range itrs { for _, itr := range itrs {
a[i] = itr.({{.Name}}Iterator) switch itr := itr.(type) {
case {{.Name}}Iterator:
a = append(a, itr)
{{if eq .Name "Float"}}
case IntegerIterator:
a = append(a, &integerFloatCastIterator{input: itr})
{{end}}
default:
itr.Close()
}
} }
return a return a
} }

View File

@ -47,6 +47,53 @@ func (a Iterators) filterNonNil() []Iterator {
return other return other
} }
// castType determines what type to cast the set of iterators to.
// An iterator type is chosen using this hierarchy:
// float > integer > string > boolean
func (a Iterators) castType() DataType {
if len(a) == 0 {
return Unknown
}
typ := DataType(Boolean)
for _, input := range a {
switch input.(type) {
case FloatIterator:
// Once a float iterator is found, short circuit the end.
return Float
case IntegerIterator:
if typ > Integer {
typ = Integer
}
case StringIterator:
if typ > String {
typ = String
}
case BooleanIterator:
// Boolean is the lowest type.
}
}
return typ
}
// cast casts an array of iterators to a single type.
// Iterators that are not compatible or cannot be cast to the
// chosen iterator type are closed and dropped.
func (a Iterators) cast() interface{} {
typ := a.castType()
switch typ {
case Float:
return newFloatIterators(a)
case Integer:
return newIntegerIterators(a)
case String:
return newStringIterators(a)
case Boolean:
return newBooleanIterators(a)
}
return a
}
// NewMergeIterator returns an iterator to merge itrs into one. // NewMergeIterator returns an iterator to merge itrs into one.
// Inputs must either be merge iterators or only contain a single name/tag in // Inputs must either be merge iterators or only contain a single name/tag in
// sorted order. The iterator will output all points by window, name/tag, then // sorted order. The iterator will output all points by window, name/tag, then
@ -60,17 +107,17 @@ func NewMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator {
// Aggregate functions can use a more relaxed sorting so that points // Aggregate functions can use a more relaxed sorting so that points
// within a window are grouped. This is much more efficient. // within a window are grouped. This is much more efficient.
switch input := inputs[0].(type) { switch inputs := Iterators(inputs).cast().(type) {
case FloatIterator: case []FloatIterator:
return newFloatMergeIterator(newFloatIterators(inputs), opt) return newFloatMergeIterator(inputs, opt)
case IntegerIterator: case []IntegerIterator:
return newIntegerMergeIterator(newIntegerIterators(inputs), opt) return newIntegerMergeIterator(inputs, opt)
case StringIterator: case []StringIterator:
return newStringMergeIterator(newStringIterators(inputs), opt) return newStringMergeIterator(inputs, opt)
case BooleanIterator: case []BooleanIterator:
return newBooleanMergeIterator(newBooleanIterators(inputs), opt) return newBooleanMergeIterator(inputs, opt)
default: default:
panic(fmt.Sprintf("unsupported merge iterator type: %T", input)) panic(fmt.Sprintf("unsupported merge iterator type: %T", inputs))
} }
} }
@ -85,17 +132,17 @@ func NewSortedMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator {
return &nilFloatIterator{} return &nilFloatIterator{}
} }
switch input := inputs[0].(type) { switch inputs := Iterators(inputs).cast().(type) {
case FloatIterator: case []FloatIterator:
return newFloatSortedMergeIterator(newFloatIterators(inputs), opt) return newFloatSortedMergeIterator(inputs, opt)
case IntegerIterator: case []IntegerIterator:
return newIntegerSortedMergeIterator(newIntegerIterators(inputs), opt) return newIntegerSortedMergeIterator(inputs, opt)
case StringIterator: case []StringIterator:
return newStringSortedMergeIterator(newStringIterators(inputs), opt) return newStringSortedMergeIterator(inputs, opt)
case BooleanIterator: case []BooleanIterator:
return newBooleanSortedMergeIterator(newBooleanIterators(inputs), opt) return newBooleanSortedMergeIterator(inputs, opt)
default: default:
panic(fmt.Sprintf("unsupported sorted merge iterator type: %T", input)) panic(fmt.Sprintf("unsupported sorted merge iterator type: %T", inputs))
} }
} }
@ -509,3 +556,24 @@ type nilFloatIterator struct{}
func (*nilFloatIterator) Close() error { return nil } func (*nilFloatIterator) Close() error { return nil }
func (*nilFloatIterator) Next() *FloatPoint { return nil } func (*nilFloatIterator) Next() *FloatPoint { return nil }
type integerFloatCastIterator struct {
input IntegerIterator
}
func (itr *integerFloatCastIterator) Close() error { return itr.input.Close() }
func (itr *integerFloatCastIterator) Next() *FloatPoint {
p := itr.input.Next()
if p == nil {
return nil
}
return &FloatPoint{
Name: p.Name,
Tags: p.Tags,
Time: p.Time,
Nil: p.Nil,
Value: float64(p.Value),
Aux: p.Aux,
}
}

View File

@ -203,6 +203,57 @@ func TestMergeIterator_Nil(t *testing.T) {
itr.Close() itr.Close()
} }
func TestMergeIterator_Cast_Float(t *testing.T) {
inputs := []influxql.Iterator{
&IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
}},
&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9},
}},
}
itr := influxql.NewMergeIterator(inputs, influxql.IteratorOptions{
Interval: influxql.Interval{
Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}},
}) {
t.Errorf("unexpected points: %s", spew.Sdump(a))
}
for i, input := range inputs {
switch input := input.(type) {
case *FloatIterator:
if !input.Closed {
t.Errorf("iterator %d not closed", i)
}
case *IntegerIterator:
if !input.Closed {
t.Errorf("iterator %d not closed", i)
}
}
}
}
// Ensure that a set of iterators can be merged together, sorted by name/tag. // Ensure that a set of iterators can be merged together, sorted by name/tag.
func TestSortedMergeIterator_Float(t *testing.T) { func TestSortedMergeIterator_Float(t *testing.T) {
inputs := []*FloatIterator{ inputs := []*FloatIterator{
@ -391,6 +442,57 @@ func TestSortedMergeIterator_Nil(t *testing.T) {
itr.Close() itr.Close()
} }
func TestSortedMergeIterator_Cast_Float(t *testing.T) {
inputs := []influxql.Iterator{
&IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8},
}},
&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9},
}},
}
itr := influxql.NewSortedMergeIterator(inputs, influxql.IteratorOptions{
Interval: influxql.Interval{
Duration: 10 * time.Nanosecond,
},
Ascending: true,
})
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 4, Value: 8}},
}) {
t.Errorf("unexpected points: %s", spew.Sdump(a))
}
for i, input := range inputs {
switch input := input.(type) {
case *FloatIterator:
if !input.Closed {
t.Errorf("iterator %d not closed", i)
}
case *IntegerIterator:
if !input.Closed {
t.Errorf("iterator %d not closed", i)
}
}
}
}
// Ensure auxilary iterators can be created for auxilary fields. // Ensure auxilary iterators can be created for auxilary fields.
func TestFloatAuxIterator(t *testing.T) { func TestFloatAuxIterator(t *testing.T) {
itr := influxql.NewAuxIterator( itr := influxql.NewAuxIterator(