From d1f7c445e7102fbe2ed30b6bd143599d61096abc Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Mon, 8 Feb 2016 10:02:08 -0500 Subject: [PATCH] Modify iterators to work across shards Aux iterators now ask the iterator creator what series will be returned and determine which aux fields to create based on the results. The `tsdb.Shards` struct also creates a call iterator around the iterators returned from each shard. --- influxql/ast.go | 8 ++ influxql/iterator.gen.go | 148 ++++++++++++++++++++++++++-------- influxql/iterator.gen.go.tmpl | 39 ++++++--- influxql/iterator.go | 88 ++++++++------------ influxql/iterator_test.go | 11 ++- influxql/select.go | 15 +++- influxql/select_test.go | 8 +- tsdb/engine/tsm1/engine.go | 81 +++++++++---------- tsdb/shard.go | 28 ++++++- 9 files changed, 274 insertions(+), 152 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index f31c4eddd5..acd604568a 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -53,6 +53,14 @@ func InspectDataType(v interface{}) DataType { } } +func InspectDataTypes(a []interface{}) []DataType { + dta := make([]DataType, len(a)) + for i, v := range a { + dta[i] = InspectDataType(v) + } + return dta +} + func (d DataType) String() string { switch d { case Float: diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index be72e501d5..b670c654b0 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -418,6 +418,7 @@ type floatFillIterator struct { curTime int64 startTime int64 endTime int64 + auxFields []interface{} opt IteratorOptions } @@ -438,12 +439,21 @@ func newFloatFillIterator(input FloatIterator, seriesKeys SeriesList, expr Expr, endTime, _ = opt.Window(opt.StartTime) } + var auxFields []interface{} + if len(seriesKeys) > 0 { + series := seriesKeys[0] + if len(series.Aux) > 0 { + auxFields = make([]interface{}, len(series.Aux)) + } + } + return &floatFillIterator{ input: newBufFloatIterator(input), seriesKeys: seriesKeys, curTime: startTime, startTime: startTime, endTime: endTime, + auxFields: auxFields, opt: opt, } } @@ -463,7 +473,7 @@ func (itr *floatFillIterator) Next() *FloatPoint { Name: series.Name, Tags: series.Tags, Time: itr.curTime, - Aux: series.Aux, + Aux: itr.auxFields, } switch itr.opt.Fill { @@ -489,18 +499,30 @@ func (itr *floatFillIterator) Next() *FloatPoint { itr.curTime = p.Time + int64(itr.opt.Interval.Duration) if itr.curTime >= itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } else { itr.curTime = p.Time - int64(itr.opt.Interval.Duration) if itr.curTime < itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } return p } +func (itr *floatFillIterator) nextSeries() { + itr.index++ + if itr.index < len(itr.seriesKeys) { + series := itr.seriesKeys[itr.index] + if len(series.Aux) > 0 { + itr.auxFields = make([]interface{}, len(series.Aux)) + } else { + itr.auxFields = nil + } + } +} + // floatAuxIterator represents a float implementation of AuxIterator. type floatAuxIterator struct { input *bufFloatIterator @@ -508,17 +530,16 @@ type floatAuxIterator struct { fields auxIteratorFields } -func newFloatAuxIterator(input FloatIterator, opt IteratorOptions) *floatAuxIterator { +func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt IteratorOptions) *floatAuxIterator { itr := &floatAuxIterator{ input: newBufFloatIterator(input), output: make(chan *FloatPoint, 1), fields: newAuxIteratorFields(opt), } - // Initialize auxilary fields. - if p := itr.input.Next(); p != nil { - itr.output <- p - itr.fields.init(p) + // Initialize auxiliary fields. + if len(opt.Aux) > 0 { + itr.fields.init(seriesKeys) } go itr.stream() @@ -1268,6 +1289,7 @@ type integerFillIterator struct { curTime int64 startTime int64 endTime int64 + auxFields []interface{} opt IteratorOptions } @@ -1288,12 +1310,21 @@ func newIntegerFillIterator(input IntegerIterator, seriesKeys SeriesList, expr E endTime, _ = opt.Window(opt.StartTime) } + var auxFields []interface{} + if len(seriesKeys) > 0 { + series := seriesKeys[0] + if len(series.Aux) > 0 { + auxFields = make([]interface{}, len(series.Aux)) + } + } + return &integerFillIterator{ input: newBufIntegerIterator(input), seriesKeys: seriesKeys, curTime: startTime, startTime: startTime, endTime: endTime, + auxFields: auxFields, opt: opt, } } @@ -1313,7 +1344,7 @@ func (itr *integerFillIterator) Next() *IntegerPoint { Name: series.Name, Tags: series.Tags, Time: itr.curTime, - Aux: series.Aux, + Aux: itr.auxFields, } switch itr.opt.Fill { @@ -1339,18 +1370,30 @@ func (itr *integerFillIterator) Next() *IntegerPoint { itr.curTime = p.Time + int64(itr.opt.Interval.Duration) if itr.curTime >= itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } else { itr.curTime = p.Time - int64(itr.opt.Interval.Duration) if itr.curTime < itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } return p } +func (itr *integerFillIterator) nextSeries() { + itr.index++ + if itr.index < len(itr.seriesKeys) { + series := itr.seriesKeys[itr.index] + if len(series.Aux) > 0 { + itr.auxFields = make([]interface{}, len(series.Aux)) + } else { + itr.auxFields = nil + } + } +} + // integerAuxIterator represents a integer implementation of AuxIterator. type integerAuxIterator struct { input *bufIntegerIterator @@ -1358,17 +1401,16 @@ type integerAuxIterator struct { fields auxIteratorFields } -func newIntegerAuxIterator(input IntegerIterator, opt IteratorOptions) *integerAuxIterator { +func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt IteratorOptions) *integerAuxIterator { itr := &integerAuxIterator{ input: newBufIntegerIterator(input), output: make(chan *IntegerPoint, 1), fields: newAuxIteratorFields(opt), } - // Initialize auxilary fields. - if p := itr.input.Next(); p != nil { - itr.output <- p - itr.fields.init(p) + // Initialize auxiliary fields. + if len(opt.Aux) > 0 { + itr.fields.init(seriesKeys) } go itr.stream() @@ -2118,6 +2160,7 @@ type stringFillIterator struct { curTime int64 startTime int64 endTime int64 + auxFields []interface{} opt IteratorOptions } @@ -2138,12 +2181,21 @@ func newStringFillIterator(input StringIterator, seriesKeys SeriesList, expr Exp endTime, _ = opt.Window(opt.StartTime) } + var auxFields []interface{} + if len(seriesKeys) > 0 { + series := seriesKeys[0] + if len(series.Aux) > 0 { + auxFields = make([]interface{}, len(series.Aux)) + } + } + return &stringFillIterator{ input: newBufStringIterator(input), seriesKeys: seriesKeys, curTime: startTime, startTime: startTime, endTime: endTime, + auxFields: auxFields, opt: opt, } } @@ -2163,7 +2215,7 @@ func (itr *stringFillIterator) Next() *StringPoint { Name: series.Name, Tags: series.Tags, Time: itr.curTime, - Aux: series.Aux, + Aux: itr.auxFields, } switch itr.opt.Fill { @@ -2189,18 +2241,30 @@ func (itr *stringFillIterator) Next() *StringPoint { itr.curTime = p.Time + int64(itr.opt.Interval.Duration) if itr.curTime >= itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } else { itr.curTime = p.Time - int64(itr.opt.Interval.Duration) if itr.curTime < itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } return p } +func (itr *stringFillIterator) nextSeries() { + itr.index++ + if itr.index < len(itr.seriesKeys) { + series := itr.seriesKeys[itr.index] + if len(series.Aux) > 0 { + itr.auxFields = make([]interface{}, len(series.Aux)) + } else { + itr.auxFields = nil + } + } +} + // stringAuxIterator represents a string implementation of AuxIterator. type stringAuxIterator struct { input *bufStringIterator @@ -2208,17 +2272,16 @@ type stringAuxIterator struct { fields auxIteratorFields } -func newStringAuxIterator(input StringIterator, opt IteratorOptions) *stringAuxIterator { +func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt IteratorOptions) *stringAuxIterator { itr := &stringAuxIterator{ input: newBufStringIterator(input), output: make(chan *StringPoint, 1), fields: newAuxIteratorFields(opt), } - // Initialize auxilary fields. - if p := itr.input.Next(); p != nil { - itr.output <- p - itr.fields.init(p) + // Initialize auxiliary fields. + if len(opt.Aux) > 0 { + itr.fields.init(seriesKeys) } go itr.stream() @@ -2968,6 +3031,7 @@ type booleanFillIterator struct { curTime int64 startTime int64 endTime int64 + auxFields []interface{} opt IteratorOptions } @@ -2988,12 +3052,21 @@ func newBooleanFillIterator(input BooleanIterator, seriesKeys SeriesList, expr E endTime, _ = opt.Window(opt.StartTime) } + var auxFields []interface{} + if len(seriesKeys) > 0 { + series := seriesKeys[0] + if len(series.Aux) > 0 { + auxFields = make([]interface{}, len(series.Aux)) + } + } + return &booleanFillIterator{ input: newBufBooleanIterator(input), seriesKeys: seriesKeys, curTime: startTime, startTime: startTime, endTime: endTime, + auxFields: auxFields, opt: opt, } } @@ -3013,7 +3086,7 @@ func (itr *booleanFillIterator) Next() *BooleanPoint { Name: series.Name, Tags: series.Tags, Time: itr.curTime, - Aux: series.Aux, + Aux: itr.auxFields, } switch itr.opt.Fill { @@ -3039,18 +3112,30 @@ func (itr *booleanFillIterator) Next() *BooleanPoint { itr.curTime = p.Time + int64(itr.opt.Interval.Duration) if itr.curTime >= itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } else { itr.curTime = p.Time - int64(itr.opt.Interval.Duration) if itr.curTime < itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } return p } +func (itr *booleanFillIterator) nextSeries() { + itr.index++ + if itr.index < len(itr.seriesKeys) { + series := itr.seriesKeys[itr.index] + if len(series.Aux) > 0 { + itr.auxFields = make([]interface{}, len(series.Aux)) + } else { + itr.auxFields = nil + } + } +} + // booleanAuxIterator represents a boolean implementation of AuxIterator. type booleanAuxIterator struct { input *bufBooleanIterator @@ -3058,17 +3143,16 @@ type booleanAuxIterator struct { fields auxIteratorFields } -func newBooleanAuxIterator(input BooleanIterator, opt IteratorOptions) *booleanAuxIterator { +func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt IteratorOptions) *booleanAuxIterator { itr := &booleanAuxIterator{ input: newBufBooleanIterator(input), output: make(chan *BooleanPoint, 1), fields: newAuxIteratorFields(opt), } - // Initialize auxilary fields. - if p := itr.input.Next(); p != nil { - itr.output <- p - itr.fields.init(p) + // Initialize auxiliary fields. + if len(opt.Aux) > 0 { + itr.fields.init(seriesKeys) } go itr.stream() diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 3460665458..ae8827cce1 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -420,6 +420,7 @@ type {{.name}}FillIterator struct { curTime int64 startTime int64 endTime int64 + auxFields []interface{} opt IteratorOptions } @@ -440,12 +441,21 @@ func new{{.Name}}FillIterator(input {{.Name}}Iterator, seriesKeys SeriesList, ex endTime, _ = opt.Window(opt.StartTime) } + var auxFields []interface{} + if len(seriesKeys) > 0 { + series := seriesKeys[0] + if len(series.Aux) > 0 { + auxFields = make([]interface{}, len(series.Aux)) + } + } + return &{{.name}}FillIterator{ input: newBuf{{.Name}}Iterator(input), seriesKeys: seriesKeys, curTime: startTime, startTime: startTime, endTime: endTime, + auxFields: auxFields, opt: opt, } } @@ -465,7 +475,7 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point { Name: series.Name, Tags: series.Tags, Time: itr.curTime, - Aux: series.Aux, + Aux: itr.auxFields, } switch itr.opt.Fill { @@ -491,18 +501,30 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point { itr.curTime = p.Time + int64(itr.opt.Interval.Duration) if itr.curTime >= itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } else { itr.curTime = p.Time - int64(itr.opt.Interval.Duration) if itr.curTime < itr.endTime { itr.curTime = itr.startTime - itr.index++ + itr.nextSeries() } } return p } +func (itr *{{.name}}FillIterator) nextSeries() { + itr.index++ + if itr.index < len(itr.seriesKeys) { + series := itr.seriesKeys[itr.index] + if len(series.Aux) > 0 { + itr.auxFields = make([]interface{}, len(series.Aux)) + } else { + itr.auxFields = nil + } + } +} + // {{.name}}AuxIterator represents a {{.name}} implementation of AuxIterator. type {{.name}}AuxIterator struct { input *buf{{.Name}}Iterator @@ -510,17 +532,16 @@ type {{.name}}AuxIterator struct { fields auxIteratorFields } -func new{{.Name}}AuxIterator(input {{.Name}}Iterator, opt IteratorOptions) *{{.name}}AuxIterator { +func new{{.Name}}AuxIterator(input {{.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{.name}}AuxIterator { itr := &{{.name}}AuxIterator{ input: newBuf{{.Name}}Iterator(input), output: make(chan *{{.Name}}Point, 1), fields: newAuxIteratorFields(opt), } - // Initialize auxilary fields. - if p := itr.input.Next(); p != nil { - itr.output <- p - itr.fields.init(p) + // Initialize auxiliary fields. + if len(opt.Aux) > 0 { + itr.fields.init(seriesKeys) } go itr.stream() @@ -528,7 +549,7 @@ func new{{.Name}}AuxIterator(input {{.Name}}Iterator, opt IteratorOptions) *{{.n } func (itr *{{.name}}AuxIterator) Close() error { return itr.input.Close() } -func (itr *{{.name}}AuxIterator) Next() *{{.Name}}Point { return <-itr.output } +func (itr *{{.name}}AuxIterator) Next() *{{.Name}}Point { return <-itr.output } func (itr *{{.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) } func (itr *{{.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) { diff --git a/influxql/iterator.go b/influxql/iterator.go index 445fe16649..af9a82ad02 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -210,16 +210,16 @@ type AuxIterator interface { } // NewAuxIterator returns a new instance of AuxIterator. -func NewAuxIterator(input Iterator, opt IteratorOptions) AuxIterator { +func NewAuxIterator(input Iterator, seriesKeys SeriesList, opt IteratorOptions) AuxIterator { switch input := input.(type) { case FloatIterator: - return newFloatAuxIterator(input, opt) + return newFloatAuxIterator(input, seriesKeys, opt) case IntegerIterator: - return newIntegerAuxIterator(input, opt) + return newIntegerAuxIterator(input, seriesKeys, opt) case StringIterator: - return newStringAuxIterator(input, opt) + return newStringAuxIterator(input, seriesKeys, opt) case BooleanIterator: - return newBooleanAuxIterator(input, opt) + return newBooleanAuxIterator(input, seriesKeys, opt) default: panic(fmt.Sprintf("unsupported aux iterator type: %T", input)) } @@ -227,11 +227,10 @@ func NewAuxIterator(input Iterator, opt IteratorOptions) AuxIterator { // auxIteratorField represents an auxilary field within an AuxIterator. type auxIteratorField struct { - name string // field name - typ DataType // detected data type - initial Point // first point - itrs []Iterator // auxillary iterators - opt IteratorOptions + name string // field name + typ DataType // detected data type + itrs []Iterator // auxillary iterators + opt IteratorOptions } type auxIteratorFields []*auxIteratorField @@ -254,43 +253,16 @@ func (a auxIteratorFields) close() { } // init initializes all auxilary fields with initial points. -func (a auxIteratorFields) init(p Point) { - values := p.aux() - for i, f := range a { - v := values[i] +func (a auxIteratorFields) init(seriesKeys SeriesList) { + for _, s := range seriesKeys { + for i, aux := range s.Aux { + if aux == Unknown { + continue + } - tags := p.tags() - tags = tags.Subset(f.opt.Dimensions) - - // Initialize first point based off value received. - // Primitive pointers represent nil values. - switch v := v.(type) { - case float64: - f.typ = Float - f.initial = &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v} - case *float64: - f.typ = Float - f.initial = &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true} - case int64: - f.typ = Integer - f.initial = &IntegerPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v} - case *int64: - f.typ = Integer - f.initial = &IntegerPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true} - case string: - f.typ = String - f.initial = &StringPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v} - case *string: - f.typ = String - f.initial = &StringPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true} - case bool: - f.typ = Boolean - f.initial = &BooleanPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v} - case *bool: - f.typ = Boolean - f.initial = &BooleanPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true} - default: - panic(fmt.Sprintf("invalid aux value type: %T", v)) + if a[i].typ == Unknown || aux < a[i].typ { + a[i].typ = aux + } } } } @@ -302,34 +274,28 @@ func (a auxIteratorFields) iterator(name string) Iterator { // Exit if no points were received by the iterator. if f.name != name { continue - } else if f.initial == nil { - break } // Create channel iterator by data type. switch f.typ { case Float: itr := &floatChanIterator{c: make(chan *FloatPoint, 1)} - itr.c <- f.initial.(*FloatPoint) f.itrs = append(f.itrs, itr) return itr case Integer: itr := &integerChanIterator{c: make(chan *IntegerPoint, 1)} - itr.c <- f.initial.(*IntegerPoint) f.itrs = append(f.itrs, itr) return itr case String: itr := &stringChanIterator{c: make(chan *StringPoint, 1)} - itr.c <- f.initial.(*StringPoint) f.itrs = append(f.itrs, itr) return itr case Boolean: itr := &booleanChanIterator{c: make(chan *BooleanPoint, 1)} - itr.c <- f.initial.(*BooleanPoint) f.itrs = append(f.itrs, itr) return itr default: - panic(fmt.Sprintf("unsupported chan iterator type: %s", f.typ)) + break } } @@ -353,6 +319,8 @@ func (a auxIteratorFields) send(p Point) { switch v := v.(type) { case float64: itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v} + case int64: + itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: float64(v)} default: itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true} } @@ -590,13 +558,25 @@ func (v *selectInfo) Visit(n Node) Visitor { type Series struct { Name string Tags Tags - Aux []interface{} + Aux []DataType } func (s *Series) ID() string { return s.Name + "\x00" + s.Tags.ID() } +func (s *Series) Combine(other *Series) { + for i, t := range s.Aux { + if other.Aux[i] == Unknown { + continue + } + + if t == Unknown || other.Aux[i] < t { + s.Aux[i] = other.Aux[i] + } + } +} + type SeriesList []Series func (a SeriesList) Len() int { return len(a) } diff --git a/influxql/iterator_test.go b/influxql/iterator_test.go index 54a045ece4..e3c84adc80 100644 --- a/influxql/iterator_test.go +++ b/influxql/iterator_test.go @@ -502,6 +502,9 @@ func TestFloatAuxIterator(t *testing.T) { {Time: 0, Value: 1, Aux: []interface{}{float64(100), float64(200)}}, {Time: 1, Value: 2, Aux: []interface{}{float64(500), math.NaN()}}, }}, + influxql.SeriesList{ + {Aux: []influxql.DataType{influxql.Float, influxql.Float}}, + }, influxql.IteratorOptions{Aux: []string{"f0", "f1"}}, ) @@ -781,22 +784,22 @@ func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.Se switch itr := itr.(type) { case influxql.FloatIterator: for p := itr.Next(); p != nil; p = itr.Next() { - s := influxql.Series{Name: p.Name, Tags: p.Tags} + s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)} seriesMap[s.ID()] = s } case influxql.IntegerIterator: for p := itr.Next(); p != nil; p = itr.Next() { - s := influxql.Series{Name: p.Name, Tags: p.Tags} + s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)} seriesMap[s.ID()] = s } case influxql.StringIterator: for p := itr.Next(); p != nil; p = itr.Next() { - s := influxql.Series{Name: p.Name, Tags: p.Tags} + s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)} seriesMap[s.ID()] = s } case influxql.BooleanIterator: for p := itr.Next(); p != nil; p = itr.Next() { - s := influxql.Series{Name: p.Name, Tags: p.Tags} + s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)} seriesMap[s.ID()] = s } } diff --git a/influxql/select.go b/influxql/select.go index eaea138f39..e5d4fa6854 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -85,8 +85,14 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) ( input = NewLimitIterator(input, opt) } + seriesKeys, err := ic.SeriesKeys(opt) + if err != nil { + input.Close() + return nil, err + } + // Wrap in an auxilary iterator to separate the fields. - aitr := NewAuxIterator(input, opt) + aitr := NewAuxIterator(input, seriesKeys, opt) // Generate iterators for each field. itrs := make([]Iterator, len(fields)) @@ -142,9 +148,14 @@ func buildFieldIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) return nil } + seriesKeys, err := ic.SeriesKeys(opt) + if err != nil { + return err + } + // Build the aux iterators. Previous validation should ensure that only one // call was present so we build an AuxIterator from that input. - aitr := NewAuxIterator(input, opt) + aitr := NewAuxIterator(input, seriesKeys, opt) for i, f := range fields { if itrs[i] != nil { itrs[i] = aitr diff --git a/influxql/select_test.go b/influxql/select_test.go index 1ee8360011..dbf32837e1 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -694,8 +694,8 @@ func TestSelect_Raw(t *testing.T) { } return &FloatIterator{Points: []influxql.FloatPoint{ - {Time: 0, Aux: []interface{}{float64(1), math.NaN()}}, - {Time: 1, Aux: []interface{}{math.NaN(), float64(2)}}, + {Time: 0, Aux: []interface{}{float64(1), nil}}, + {Time: 1, Aux: []interface{}{nil, float64(2)}}, {Time: 5, Aux: []interface{}{float64(3), float64(4)}}, }}, nil } @@ -707,10 +707,10 @@ func TestSelect_Raw(t *testing.T) { } else if a := Iterators(itrs).ReadAll(); !deep.Equal(a, [][]influxql.Point{ { &influxql.FloatPoint{Time: 0, Value: 1}, - &influxql.FloatPoint{Time: 0, Value: math.NaN()}, + &influxql.FloatPoint{Time: 0, Nil: true}, }, { - &influxql.FloatPoint{Time: 1, Value: math.NaN()}, + &influxql.FloatPoint{Time: 1, Nil: true}, &influxql.FloatPoint{Time: 1, Value: 2}, }, { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 807b4f6257..5e750ef0cc 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -692,53 +692,44 @@ func (e *Engine) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, } tags := influxql.NewTags(tagMap) - // Determine the nil values for the aux fields/tags - aux := make([]interface{}, 0, len(opt.Aux)) - for _, field := range opt.Aux { - typ := func() influxql.DataType { - mf := e.measurementFields[mm.Name] - if mf == nil { - return influxql.Unknown - } - - f := mf.Fields[field] - if f == nil { - return influxql.Unknown - } - return f.Type - }() - - if typ == influxql.Unknown { - if v := tags.Value(field); v == "" { - // We have no idea what this field/tag is, so it doesn't - // exist for this part of the series. - // Use a boolean so it can be promoted to the appropriate - // type if another iterator knows the type. - typ = influxql.Boolean - } else { - // All tags are strings. - typ = influxql.String - } - } - - switch typ { - case influxql.Float: - aux = append(aux, (*float64)(nil)) - case influxql.Integer: - aux = append(aux, (*int64)(nil)) - case influxql.String: - aux = append(aux, (*string)(nil)) - case influxql.Boolean: - aux = append(aux, (*bool)(nil)) - default: - panic(fmt.Sprintf("invalid aux type: %s", typ)) - } - } - seriesList = append(seriesList, influxql.Series{ + series := influxql.Series{ Name: mm.Name, Tags: tags, - Aux: aux, - }) + Aux: make([]influxql.DataType, len(opt.Aux)), + } + + // Determine the aux field types. + for _, seriesKey := range t.SeriesKeys { + tags := influxql.NewTags(e.index.TagsForSeries(seriesKey)) + for i, field := range opt.Aux { + typ := func() influxql.DataType { + mf := e.measurementFields[mm.Name] + if mf == nil { + return influxql.Unknown + } + + f := mf.Fields[field] + if f == nil { + return influxql.Unknown + } + return f.Type + }() + + if typ == influxql.Unknown { + if v := tags.Value(field); v != "" { + // All tags are strings. + typ = influxql.String + } + } + + if typ != influxql.Unknown { + if series.Aux[i] == influxql.Unknown || typ < series.Aux[i] { + series.Aux[i] = typ + } + } + } + } + seriesList = append(seriesList, series) } } return seriesList, nil diff --git a/tsdb/shard.go b/tsdb/shard.go index 574a47821f..537d675a8e 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -453,7 +453,17 @@ func (a Shards) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, if opt.MergeSorted() { return influxql.NewSortedMergeIterator(itrs, opt), nil } - return influxql.NewMergeIterator(itrs, opt), nil + + itr := influxql.NewMergeIterator(itrs, opt) + if opt.Expr != nil { + if expr, ok := opt.Expr.(*influxql.Call); ok && expr.Name == "count" { + opt.Expr = &influxql.Call{ + Name: "sum", + Args: expr.Args, + } + } + } + return influxql.NewCallIterator(itr, opt), nil } // createSystemIterator returns an iterator for a system source. @@ -475,6 +485,15 @@ func (a Shards) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite } func (a Shards) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) { + if influxql.Sources(opt.Sources).HasSystemSource() { + // Only support a single system source. + if len(opt.Sources) > 1 { + return nil, errors.New("cannot select from multiple system sources") + } + // Meta queries don't need to know the series name and always have a single string. + return influxql.SeriesList{{Aux: []influxql.DataType{influxql.String}}}, nil + } + seriesMap := make(map[string]influxql.Series) for _, sh := range a { series, err := sh.SeriesKeys(opt) @@ -483,7 +502,12 @@ func (a Shards) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e } for _, s := range series { - seriesMap[s.ID()] = s + cur, ok := seriesMap[s.ID()] + if ok { + cur.Combine(&s) + } else { + seriesMap[s.ID()] = s + } } }