Modify iterators to work across shards
Aux iterators now ask the iterator creator what series will be returned and determine which aux fields to create based on the results. The `tsdb.Shards` struct also creates a call iterator around the iterators returned from each shard.pull/5196/head
parent
08f823546d
commit
d1f7c445e7
|
@ -53,6 +53,14 @@ func InspectDataType(v interface{}) DataType {
|
|||
}
|
||||
}
|
||||
|
||||
func InspectDataTypes(a []interface{}) []DataType {
|
||||
dta := make([]DataType, len(a))
|
||||
for i, v := range a {
|
||||
dta[i] = InspectDataType(v)
|
||||
}
|
||||
return dta
|
||||
}
|
||||
|
||||
func (d DataType) String() string {
|
||||
switch d {
|
||||
case Float:
|
||||
|
|
|
@ -418,6 +418,7 @@ type floatFillIterator struct {
|
|||
curTime int64
|
||||
startTime int64
|
||||
endTime int64
|
||||
auxFields []interface{}
|
||||
opt IteratorOptions
|
||||
}
|
||||
|
||||
|
@ -438,12 +439,21 @@ func newFloatFillIterator(input FloatIterator, seriesKeys SeriesList, expr Expr,
|
|||
endTime, _ = opt.Window(opt.StartTime)
|
||||
}
|
||||
|
||||
var auxFields []interface{}
|
||||
if len(seriesKeys) > 0 {
|
||||
series := seriesKeys[0]
|
||||
if len(series.Aux) > 0 {
|
||||
auxFields = make([]interface{}, len(series.Aux))
|
||||
}
|
||||
}
|
||||
|
||||
return &floatFillIterator{
|
||||
input: newBufFloatIterator(input),
|
||||
seriesKeys: seriesKeys,
|
||||
curTime: startTime,
|
||||
startTime: startTime,
|
||||
endTime: endTime,
|
||||
auxFields: auxFields,
|
||||
opt: opt,
|
||||
}
|
||||
}
|
||||
|
@ -463,7 +473,7 @@ func (itr *floatFillIterator) Next() *FloatPoint {
|
|||
Name: series.Name,
|
||||
Tags: series.Tags,
|
||||
Time: itr.curTime,
|
||||
Aux: series.Aux,
|
||||
Aux: itr.auxFields,
|
||||
}
|
||||
|
||||
switch itr.opt.Fill {
|
||||
|
@ -489,18 +499,30 @@ func (itr *floatFillIterator) Next() *FloatPoint {
|
|||
itr.curTime = p.Time + int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime >= itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
} else {
|
||||
itr.curTime = p.Time - int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime < itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (itr *floatFillIterator) nextSeries() {
|
||||
itr.index++
|
||||
if itr.index < len(itr.seriesKeys) {
|
||||
series := itr.seriesKeys[itr.index]
|
||||
if len(series.Aux) > 0 {
|
||||
itr.auxFields = make([]interface{}, len(series.Aux))
|
||||
} else {
|
||||
itr.auxFields = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// floatAuxIterator represents a float implementation of AuxIterator.
|
||||
type floatAuxIterator struct {
|
||||
input *bufFloatIterator
|
||||
|
@ -508,17 +530,16 @@ type floatAuxIterator struct {
|
|||
fields auxIteratorFields
|
||||
}
|
||||
|
||||
func newFloatAuxIterator(input FloatIterator, opt IteratorOptions) *floatAuxIterator {
|
||||
func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt IteratorOptions) *floatAuxIterator {
|
||||
itr := &floatAuxIterator{
|
||||
input: newBufFloatIterator(input),
|
||||
output: make(chan *FloatPoint, 1),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
|
||||
// Initialize auxilary fields.
|
||||
if p := itr.input.Next(); p != nil {
|
||||
itr.output <- p
|
||||
itr.fields.init(p)
|
||||
// Initialize auxiliary fields.
|
||||
if len(opt.Aux) > 0 {
|
||||
itr.fields.init(seriesKeys)
|
||||
}
|
||||
|
||||
go itr.stream()
|
||||
|
@ -1268,6 +1289,7 @@ type integerFillIterator struct {
|
|||
curTime int64
|
||||
startTime int64
|
||||
endTime int64
|
||||
auxFields []interface{}
|
||||
opt IteratorOptions
|
||||
}
|
||||
|
||||
|
@ -1288,12 +1310,21 @@ func newIntegerFillIterator(input IntegerIterator, seriesKeys SeriesList, expr E
|
|||
endTime, _ = opt.Window(opt.StartTime)
|
||||
}
|
||||
|
||||
var auxFields []interface{}
|
||||
if len(seriesKeys) > 0 {
|
||||
series := seriesKeys[0]
|
||||
if len(series.Aux) > 0 {
|
||||
auxFields = make([]interface{}, len(series.Aux))
|
||||
}
|
||||
}
|
||||
|
||||
return &integerFillIterator{
|
||||
input: newBufIntegerIterator(input),
|
||||
seriesKeys: seriesKeys,
|
||||
curTime: startTime,
|
||||
startTime: startTime,
|
||||
endTime: endTime,
|
||||
auxFields: auxFields,
|
||||
opt: opt,
|
||||
}
|
||||
}
|
||||
|
@ -1313,7 +1344,7 @@ func (itr *integerFillIterator) Next() *IntegerPoint {
|
|||
Name: series.Name,
|
||||
Tags: series.Tags,
|
||||
Time: itr.curTime,
|
||||
Aux: series.Aux,
|
||||
Aux: itr.auxFields,
|
||||
}
|
||||
|
||||
switch itr.opt.Fill {
|
||||
|
@ -1339,18 +1370,30 @@ func (itr *integerFillIterator) Next() *IntegerPoint {
|
|||
itr.curTime = p.Time + int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime >= itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
} else {
|
||||
itr.curTime = p.Time - int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime < itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (itr *integerFillIterator) nextSeries() {
|
||||
itr.index++
|
||||
if itr.index < len(itr.seriesKeys) {
|
||||
series := itr.seriesKeys[itr.index]
|
||||
if len(series.Aux) > 0 {
|
||||
itr.auxFields = make([]interface{}, len(series.Aux))
|
||||
} else {
|
||||
itr.auxFields = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// integerAuxIterator represents a integer implementation of AuxIterator.
|
||||
type integerAuxIterator struct {
|
||||
input *bufIntegerIterator
|
||||
|
@ -1358,17 +1401,16 @@ type integerAuxIterator struct {
|
|||
fields auxIteratorFields
|
||||
}
|
||||
|
||||
func newIntegerAuxIterator(input IntegerIterator, opt IteratorOptions) *integerAuxIterator {
|
||||
func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt IteratorOptions) *integerAuxIterator {
|
||||
itr := &integerAuxIterator{
|
||||
input: newBufIntegerIterator(input),
|
||||
output: make(chan *IntegerPoint, 1),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
|
||||
// Initialize auxilary fields.
|
||||
if p := itr.input.Next(); p != nil {
|
||||
itr.output <- p
|
||||
itr.fields.init(p)
|
||||
// Initialize auxiliary fields.
|
||||
if len(opt.Aux) > 0 {
|
||||
itr.fields.init(seriesKeys)
|
||||
}
|
||||
|
||||
go itr.stream()
|
||||
|
@ -2118,6 +2160,7 @@ type stringFillIterator struct {
|
|||
curTime int64
|
||||
startTime int64
|
||||
endTime int64
|
||||
auxFields []interface{}
|
||||
opt IteratorOptions
|
||||
}
|
||||
|
||||
|
@ -2138,12 +2181,21 @@ func newStringFillIterator(input StringIterator, seriesKeys SeriesList, expr Exp
|
|||
endTime, _ = opt.Window(opt.StartTime)
|
||||
}
|
||||
|
||||
var auxFields []interface{}
|
||||
if len(seriesKeys) > 0 {
|
||||
series := seriesKeys[0]
|
||||
if len(series.Aux) > 0 {
|
||||
auxFields = make([]interface{}, len(series.Aux))
|
||||
}
|
||||
}
|
||||
|
||||
return &stringFillIterator{
|
||||
input: newBufStringIterator(input),
|
||||
seriesKeys: seriesKeys,
|
||||
curTime: startTime,
|
||||
startTime: startTime,
|
||||
endTime: endTime,
|
||||
auxFields: auxFields,
|
||||
opt: opt,
|
||||
}
|
||||
}
|
||||
|
@ -2163,7 +2215,7 @@ func (itr *stringFillIterator) Next() *StringPoint {
|
|||
Name: series.Name,
|
||||
Tags: series.Tags,
|
||||
Time: itr.curTime,
|
||||
Aux: series.Aux,
|
||||
Aux: itr.auxFields,
|
||||
}
|
||||
|
||||
switch itr.opt.Fill {
|
||||
|
@ -2189,18 +2241,30 @@ func (itr *stringFillIterator) Next() *StringPoint {
|
|||
itr.curTime = p.Time + int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime >= itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
} else {
|
||||
itr.curTime = p.Time - int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime < itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (itr *stringFillIterator) nextSeries() {
|
||||
itr.index++
|
||||
if itr.index < len(itr.seriesKeys) {
|
||||
series := itr.seriesKeys[itr.index]
|
||||
if len(series.Aux) > 0 {
|
||||
itr.auxFields = make([]interface{}, len(series.Aux))
|
||||
} else {
|
||||
itr.auxFields = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// stringAuxIterator represents a string implementation of AuxIterator.
|
||||
type stringAuxIterator struct {
|
||||
input *bufStringIterator
|
||||
|
@ -2208,17 +2272,16 @@ type stringAuxIterator struct {
|
|||
fields auxIteratorFields
|
||||
}
|
||||
|
||||
func newStringAuxIterator(input StringIterator, opt IteratorOptions) *stringAuxIterator {
|
||||
func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt IteratorOptions) *stringAuxIterator {
|
||||
itr := &stringAuxIterator{
|
||||
input: newBufStringIterator(input),
|
||||
output: make(chan *StringPoint, 1),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
|
||||
// Initialize auxilary fields.
|
||||
if p := itr.input.Next(); p != nil {
|
||||
itr.output <- p
|
||||
itr.fields.init(p)
|
||||
// Initialize auxiliary fields.
|
||||
if len(opt.Aux) > 0 {
|
||||
itr.fields.init(seriesKeys)
|
||||
}
|
||||
|
||||
go itr.stream()
|
||||
|
@ -2968,6 +3031,7 @@ type booleanFillIterator struct {
|
|||
curTime int64
|
||||
startTime int64
|
||||
endTime int64
|
||||
auxFields []interface{}
|
||||
opt IteratorOptions
|
||||
}
|
||||
|
||||
|
@ -2988,12 +3052,21 @@ func newBooleanFillIterator(input BooleanIterator, seriesKeys SeriesList, expr E
|
|||
endTime, _ = opt.Window(opt.StartTime)
|
||||
}
|
||||
|
||||
var auxFields []interface{}
|
||||
if len(seriesKeys) > 0 {
|
||||
series := seriesKeys[0]
|
||||
if len(series.Aux) > 0 {
|
||||
auxFields = make([]interface{}, len(series.Aux))
|
||||
}
|
||||
}
|
||||
|
||||
return &booleanFillIterator{
|
||||
input: newBufBooleanIterator(input),
|
||||
seriesKeys: seriesKeys,
|
||||
curTime: startTime,
|
||||
startTime: startTime,
|
||||
endTime: endTime,
|
||||
auxFields: auxFields,
|
||||
opt: opt,
|
||||
}
|
||||
}
|
||||
|
@ -3013,7 +3086,7 @@ func (itr *booleanFillIterator) Next() *BooleanPoint {
|
|||
Name: series.Name,
|
||||
Tags: series.Tags,
|
||||
Time: itr.curTime,
|
||||
Aux: series.Aux,
|
||||
Aux: itr.auxFields,
|
||||
}
|
||||
|
||||
switch itr.opt.Fill {
|
||||
|
@ -3039,18 +3112,30 @@ func (itr *booleanFillIterator) Next() *BooleanPoint {
|
|||
itr.curTime = p.Time + int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime >= itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
} else {
|
||||
itr.curTime = p.Time - int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime < itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (itr *booleanFillIterator) nextSeries() {
|
||||
itr.index++
|
||||
if itr.index < len(itr.seriesKeys) {
|
||||
series := itr.seriesKeys[itr.index]
|
||||
if len(series.Aux) > 0 {
|
||||
itr.auxFields = make([]interface{}, len(series.Aux))
|
||||
} else {
|
||||
itr.auxFields = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// booleanAuxIterator represents a boolean implementation of AuxIterator.
|
||||
type booleanAuxIterator struct {
|
||||
input *bufBooleanIterator
|
||||
|
@ -3058,17 +3143,16 @@ type booleanAuxIterator struct {
|
|||
fields auxIteratorFields
|
||||
}
|
||||
|
||||
func newBooleanAuxIterator(input BooleanIterator, opt IteratorOptions) *booleanAuxIterator {
|
||||
func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt IteratorOptions) *booleanAuxIterator {
|
||||
itr := &booleanAuxIterator{
|
||||
input: newBufBooleanIterator(input),
|
||||
output: make(chan *BooleanPoint, 1),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
|
||||
// Initialize auxilary fields.
|
||||
if p := itr.input.Next(); p != nil {
|
||||
itr.output <- p
|
||||
itr.fields.init(p)
|
||||
// Initialize auxiliary fields.
|
||||
if len(opt.Aux) > 0 {
|
||||
itr.fields.init(seriesKeys)
|
||||
}
|
||||
|
||||
go itr.stream()
|
||||
|
|
|
@ -420,6 +420,7 @@ type {{.name}}FillIterator struct {
|
|||
curTime int64
|
||||
startTime int64
|
||||
endTime int64
|
||||
auxFields []interface{}
|
||||
opt IteratorOptions
|
||||
}
|
||||
|
||||
|
@ -440,12 +441,21 @@ func new{{.Name}}FillIterator(input {{.Name}}Iterator, seriesKeys SeriesList, ex
|
|||
endTime, _ = opt.Window(opt.StartTime)
|
||||
}
|
||||
|
||||
var auxFields []interface{}
|
||||
if len(seriesKeys) > 0 {
|
||||
series := seriesKeys[0]
|
||||
if len(series.Aux) > 0 {
|
||||
auxFields = make([]interface{}, len(series.Aux))
|
||||
}
|
||||
}
|
||||
|
||||
return &{{.name}}FillIterator{
|
||||
input: newBuf{{.Name}}Iterator(input),
|
||||
seriesKeys: seriesKeys,
|
||||
curTime: startTime,
|
||||
startTime: startTime,
|
||||
endTime: endTime,
|
||||
auxFields: auxFields,
|
||||
opt: opt,
|
||||
}
|
||||
}
|
||||
|
@ -465,7 +475,7 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
|
|||
Name: series.Name,
|
||||
Tags: series.Tags,
|
||||
Time: itr.curTime,
|
||||
Aux: series.Aux,
|
||||
Aux: itr.auxFields,
|
||||
}
|
||||
|
||||
switch itr.opt.Fill {
|
||||
|
@ -491,18 +501,30 @@ func (itr *{{.name}}FillIterator) Next() *{{.Name}}Point {
|
|||
itr.curTime = p.Time + int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime >= itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
} else {
|
||||
itr.curTime = p.Time - int64(itr.opt.Interval.Duration)
|
||||
if itr.curTime < itr.endTime {
|
||||
itr.curTime = itr.startTime
|
||||
itr.index++
|
||||
itr.nextSeries()
|
||||
}
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (itr *{{.name}}FillIterator) nextSeries() {
|
||||
itr.index++
|
||||
if itr.index < len(itr.seriesKeys) {
|
||||
series := itr.seriesKeys[itr.index]
|
||||
if len(series.Aux) > 0 {
|
||||
itr.auxFields = make([]interface{}, len(series.Aux))
|
||||
} else {
|
||||
itr.auxFields = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// {{.name}}AuxIterator represents a {{.name}} implementation of AuxIterator.
|
||||
type {{.name}}AuxIterator struct {
|
||||
input *buf{{.Name}}Iterator
|
||||
|
@ -510,17 +532,16 @@ type {{.name}}AuxIterator struct {
|
|||
fields auxIteratorFields
|
||||
}
|
||||
|
||||
func new{{.Name}}AuxIterator(input {{.Name}}Iterator, opt IteratorOptions) *{{.name}}AuxIterator {
|
||||
func new{{.Name}}AuxIterator(input {{.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{.name}}AuxIterator {
|
||||
itr := &{{.name}}AuxIterator{
|
||||
input: newBuf{{.Name}}Iterator(input),
|
||||
output: make(chan *{{.Name}}Point, 1),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
|
||||
// Initialize auxilary fields.
|
||||
if p := itr.input.Next(); p != nil {
|
||||
itr.output <- p
|
||||
itr.fields.init(p)
|
||||
// Initialize auxiliary fields.
|
||||
if len(opt.Aux) > 0 {
|
||||
itr.fields.init(seriesKeys)
|
||||
}
|
||||
|
||||
go itr.stream()
|
||||
|
@ -528,7 +549,7 @@ func new{{.Name}}AuxIterator(input {{.Name}}Iterator, opt IteratorOptions) *{{.n
|
|||
}
|
||||
|
||||
func (itr *{{.name}}AuxIterator) Close() error { return itr.input.Close() }
|
||||
func (itr *{{.name}}AuxIterator) Next() *{{.Name}}Point { return <-itr.output }
|
||||
func (itr *{{.name}}AuxIterator) Next() *{{.Name}}Point { return <-itr.output }
|
||||
func (itr *{{.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
|
||||
|
||||
func (itr *{{.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
|
||||
|
|
|
@ -210,16 +210,16 @@ type AuxIterator interface {
|
|||
}
|
||||
|
||||
// NewAuxIterator returns a new instance of AuxIterator.
|
||||
func NewAuxIterator(input Iterator, opt IteratorOptions) AuxIterator {
|
||||
func NewAuxIterator(input Iterator, seriesKeys SeriesList, opt IteratorOptions) AuxIterator {
|
||||
switch input := input.(type) {
|
||||
case FloatIterator:
|
||||
return newFloatAuxIterator(input, opt)
|
||||
return newFloatAuxIterator(input, seriesKeys, opt)
|
||||
case IntegerIterator:
|
||||
return newIntegerAuxIterator(input, opt)
|
||||
return newIntegerAuxIterator(input, seriesKeys, opt)
|
||||
case StringIterator:
|
||||
return newStringAuxIterator(input, opt)
|
||||
return newStringAuxIterator(input, seriesKeys, opt)
|
||||
case BooleanIterator:
|
||||
return newBooleanAuxIterator(input, opt)
|
||||
return newBooleanAuxIterator(input, seriesKeys, opt)
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported aux iterator type: %T", input))
|
||||
}
|
||||
|
@ -227,11 +227,10 @@ func NewAuxIterator(input Iterator, opt IteratorOptions) AuxIterator {
|
|||
|
||||
// auxIteratorField represents an auxilary field within an AuxIterator.
|
||||
type auxIteratorField struct {
|
||||
name string // field name
|
||||
typ DataType // detected data type
|
||||
initial Point // first point
|
||||
itrs []Iterator // auxillary iterators
|
||||
opt IteratorOptions
|
||||
name string // field name
|
||||
typ DataType // detected data type
|
||||
itrs []Iterator // auxillary iterators
|
||||
opt IteratorOptions
|
||||
}
|
||||
|
||||
type auxIteratorFields []*auxIteratorField
|
||||
|
@ -254,43 +253,16 @@ func (a auxIteratorFields) close() {
|
|||
}
|
||||
|
||||
// init initializes all auxilary fields with initial points.
|
||||
func (a auxIteratorFields) init(p Point) {
|
||||
values := p.aux()
|
||||
for i, f := range a {
|
||||
v := values[i]
|
||||
func (a auxIteratorFields) init(seriesKeys SeriesList) {
|
||||
for _, s := range seriesKeys {
|
||||
for i, aux := range s.Aux {
|
||||
if aux == Unknown {
|
||||
continue
|
||||
}
|
||||
|
||||
tags := p.tags()
|
||||
tags = tags.Subset(f.opt.Dimensions)
|
||||
|
||||
// Initialize first point based off value received.
|
||||
// Primitive pointers represent nil values.
|
||||
switch v := v.(type) {
|
||||
case float64:
|
||||
f.typ = Float
|
||||
f.initial = &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v}
|
||||
case *float64:
|
||||
f.typ = Float
|
||||
f.initial = &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true}
|
||||
case int64:
|
||||
f.typ = Integer
|
||||
f.initial = &IntegerPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v}
|
||||
case *int64:
|
||||
f.typ = Integer
|
||||
f.initial = &IntegerPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true}
|
||||
case string:
|
||||
f.typ = String
|
||||
f.initial = &StringPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v}
|
||||
case *string:
|
||||
f.typ = String
|
||||
f.initial = &StringPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true}
|
||||
case bool:
|
||||
f.typ = Boolean
|
||||
f.initial = &BooleanPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v}
|
||||
case *bool:
|
||||
f.typ = Boolean
|
||||
f.initial = &BooleanPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true}
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid aux value type: %T", v))
|
||||
if a[i].typ == Unknown || aux < a[i].typ {
|
||||
a[i].typ = aux
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -302,34 +274,28 @@ func (a auxIteratorFields) iterator(name string) Iterator {
|
|||
// Exit if no points were received by the iterator.
|
||||
if f.name != name {
|
||||
continue
|
||||
} else if f.initial == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Create channel iterator by data type.
|
||||
switch f.typ {
|
||||
case Float:
|
||||
itr := &floatChanIterator{c: make(chan *FloatPoint, 1)}
|
||||
itr.c <- f.initial.(*FloatPoint)
|
||||
f.itrs = append(f.itrs, itr)
|
||||
return itr
|
||||
case Integer:
|
||||
itr := &integerChanIterator{c: make(chan *IntegerPoint, 1)}
|
||||
itr.c <- f.initial.(*IntegerPoint)
|
||||
f.itrs = append(f.itrs, itr)
|
||||
return itr
|
||||
case String:
|
||||
itr := &stringChanIterator{c: make(chan *StringPoint, 1)}
|
||||
itr.c <- f.initial.(*StringPoint)
|
||||
f.itrs = append(f.itrs, itr)
|
||||
return itr
|
||||
case Boolean:
|
||||
itr := &booleanChanIterator{c: make(chan *BooleanPoint, 1)}
|
||||
itr.c <- f.initial.(*BooleanPoint)
|
||||
f.itrs = append(f.itrs, itr)
|
||||
return itr
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported chan iterator type: %s", f.typ))
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -353,6 +319,8 @@ func (a auxIteratorFields) send(p Point) {
|
|||
switch v := v.(type) {
|
||||
case float64:
|
||||
itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v}
|
||||
case int64:
|
||||
itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: float64(v)}
|
||||
default:
|
||||
itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true}
|
||||
}
|
||||
|
@ -590,13 +558,25 @@ func (v *selectInfo) Visit(n Node) Visitor {
|
|||
type Series struct {
|
||||
Name string
|
||||
Tags Tags
|
||||
Aux []interface{}
|
||||
Aux []DataType
|
||||
}
|
||||
|
||||
func (s *Series) ID() string {
|
||||
return s.Name + "\x00" + s.Tags.ID()
|
||||
}
|
||||
|
||||
func (s *Series) Combine(other *Series) {
|
||||
for i, t := range s.Aux {
|
||||
if other.Aux[i] == Unknown {
|
||||
continue
|
||||
}
|
||||
|
||||
if t == Unknown || other.Aux[i] < t {
|
||||
s.Aux[i] = other.Aux[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type SeriesList []Series
|
||||
|
||||
func (a SeriesList) Len() int { return len(a) }
|
||||
|
|
|
@ -502,6 +502,9 @@ func TestFloatAuxIterator(t *testing.T) {
|
|||
{Time: 0, Value: 1, Aux: []interface{}{float64(100), float64(200)}},
|
||||
{Time: 1, Value: 2, Aux: []interface{}{float64(500), math.NaN()}},
|
||||
}},
|
||||
influxql.SeriesList{
|
||||
{Aux: []influxql.DataType{influxql.Float, influxql.Float}},
|
||||
},
|
||||
influxql.IteratorOptions{Aux: []string{"f0", "f1"}},
|
||||
)
|
||||
|
||||
|
@ -781,22 +784,22 @@ func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.Se
|
|||
switch itr := itr.(type) {
|
||||
case influxql.FloatIterator:
|
||||
for p := itr.Next(); p != nil; p = itr.Next() {
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags}
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
case influxql.IntegerIterator:
|
||||
for p := itr.Next(); p != nil; p = itr.Next() {
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags}
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
case influxql.StringIterator:
|
||||
for p := itr.Next(); p != nil; p = itr.Next() {
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags}
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
case influxql.BooleanIterator:
|
||||
for p := itr.Next(); p != nil; p = itr.Next() {
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags}
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,8 +85,14 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) (
|
|||
input = NewLimitIterator(input, opt)
|
||||
}
|
||||
|
||||
seriesKeys, err := ic.SeriesKeys(opt)
|
||||
if err != nil {
|
||||
input.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap in an auxilary iterator to separate the fields.
|
||||
aitr := NewAuxIterator(input, opt)
|
||||
aitr := NewAuxIterator(input, seriesKeys, opt)
|
||||
|
||||
// Generate iterators for each field.
|
||||
itrs := make([]Iterator, len(fields))
|
||||
|
@ -142,9 +148,14 @@ func buildFieldIterators(fields Fields, ic IteratorCreator, opt IteratorOptions)
|
|||
return nil
|
||||
}
|
||||
|
||||
seriesKeys, err := ic.SeriesKeys(opt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build the aux iterators. Previous validation should ensure that only one
|
||||
// call was present so we build an AuxIterator from that input.
|
||||
aitr := NewAuxIterator(input, opt)
|
||||
aitr := NewAuxIterator(input, seriesKeys, opt)
|
||||
for i, f := range fields {
|
||||
if itrs[i] != nil {
|
||||
itrs[i] = aitr
|
||||
|
|
|
@ -694,8 +694,8 @@ func TestSelect_Raw(t *testing.T) {
|
|||
|
||||
}
|
||||
return &FloatIterator{Points: []influxql.FloatPoint{
|
||||
{Time: 0, Aux: []interface{}{float64(1), math.NaN()}},
|
||||
{Time: 1, Aux: []interface{}{math.NaN(), float64(2)}},
|
||||
{Time: 0, Aux: []interface{}{float64(1), nil}},
|
||||
{Time: 1, Aux: []interface{}{nil, float64(2)}},
|
||||
{Time: 5, Aux: []interface{}{float64(3), float64(4)}},
|
||||
}}, nil
|
||||
}
|
||||
|
@ -707,10 +707,10 @@ func TestSelect_Raw(t *testing.T) {
|
|||
} else if a := Iterators(itrs).ReadAll(); !deep.Equal(a, [][]influxql.Point{
|
||||
{
|
||||
&influxql.FloatPoint{Time: 0, Value: 1},
|
||||
&influxql.FloatPoint{Time: 0, Value: math.NaN()},
|
||||
&influxql.FloatPoint{Time: 0, Nil: true},
|
||||
},
|
||||
{
|
||||
&influxql.FloatPoint{Time: 1, Value: math.NaN()},
|
||||
&influxql.FloatPoint{Time: 1, Nil: true},
|
||||
&influxql.FloatPoint{Time: 1, Value: 2},
|
||||
},
|
||||
{
|
||||
|
|
|
@ -692,53 +692,44 @@ func (e *Engine) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList,
|
|||
}
|
||||
tags := influxql.NewTags(tagMap)
|
||||
|
||||
// Determine the nil values for the aux fields/tags
|
||||
aux := make([]interface{}, 0, len(opt.Aux))
|
||||
for _, field := range opt.Aux {
|
||||
typ := func() influxql.DataType {
|
||||
mf := e.measurementFields[mm.Name]
|
||||
if mf == nil {
|
||||
return influxql.Unknown
|
||||
}
|
||||
|
||||
f := mf.Fields[field]
|
||||
if f == nil {
|
||||
return influxql.Unknown
|
||||
}
|
||||
return f.Type
|
||||
}()
|
||||
|
||||
if typ == influxql.Unknown {
|
||||
if v := tags.Value(field); v == "" {
|
||||
// We have no idea what this field/tag is, so it doesn't
|
||||
// exist for this part of the series.
|
||||
// Use a boolean so it can be promoted to the appropriate
|
||||
// type if another iterator knows the type.
|
||||
typ = influxql.Boolean
|
||||
} else {
|
||||
// All tags are strings.
|
||||
typ = influxql.String
|
||||
}
|
||||
}
|
||||
|
||||
switch typ {
|
||||
case influxql.Float:
|
||||
aux = append(aux, (*float64)(nil))
|
||||
case influxql.Integer:
|
||||
aux = append(aux, (*int64)(nil))
|
||||
case influxql.String:
|
||||
aux = append(aux, (*string)(nil))
|
||||
case influxql.Boolean:
|
||||
aux = append(aux, (*bool)(nil))
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid aux type: %s", typ))
|
||||
}
|
||||
}
|
||||
seriesList = append(seriesList, influxql.Series{
|
||||
series := influxql.Series{
|
||||
Name: mm.Name,
|
||||
Tags: tags,
|
||||
Aux: aux,
|
||||
})
|
||||
Aux: make([]influxql.DataType, len(opt.Aux)),
|
||||
}
|
||||
|
||||
// Determine the aux field types.
|
||||
for _, seriesKey := range t.SeriesKeys {
|
||||
tags := influxql.NewTags(e.index.TagsForSeries(seriesKey))
|
||||
for i, field := range opt.Aux {
|
||||
typ := func() influxql.DataType {
|
||||
mf := e.measurementFields[mm.Name]
|
||||
if mf == nil {
|
||||
return influxql.Unknown
|
||||
}
|
||||
|
||||
f := mf.Fields[field]
|
||||
if f == nil {
|
||||
return influxql.Unknown
|
||||
}
|
||||
return f.Type
|
||||
}()
|
||||
|
||||
if typ == influxql.Unknown {
|
||||
if v := tags.Value(field); v != "" {
|
||||
// All tags are strings.
|
||||
typ = influxql.String
|
||||
}
|
||||
}
|
||||
|
||||
if typ != influxql.Unknown {
|
||||
if series.Aux[i] == influxql.Unknown || typ < series.Aux[i] {
|
||||
series.Aux[i] = typ
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
seriesList = append(seriesList, series)
|
||||
}
|
||||
}
|
||||
return seriesList, nil
|
||||
|
|
|
@ -453,7 +453,17 @@ func (a Shards) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator,
|
|||
if opt.MergeSorted() {
|
||||
return influxql.NewSortedMergeIterator(itrs, opt), nil
|
||||
}
|
||||
return influxql.NewMergeIterator(itrs, opt), nil
|
||||
|
||||
itr := influxql.NewMergeIterator(itrs, opt)
|
||||
if opt.Expr != nil {
|
||||
if expr, ok := opt.Expr.(*influxql.Call); ok && expr.Name == "count" {
|
||||
opt.Expr = &influxql.Call{
|
||||
Name: "sum",
|
||||
Args: expr.Args,
|
||||
}
|
||||
}
|
||||
}
|
||||
return influxql.NewCallIterator(itr, opt), nil
|
||||
}
|
||||
|
||||
// createSystemIterator returns an iterator for a system source.
|
||||
|
@ -475,6 +485,15 @@ func (a Shards) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
|
|||
}
|
||||
|
||||
func (a Shards) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
if influxql.Sources(opt.Sources).HasSystemSource() {
|
||||
// Only support a single system source.
|
||||
if len(opt.Sources) > 1 {
|
||||
return nil, errors.New("cannot select from multiple system sources")
|
||||
}
|
||||
// Meta queries don't need to know the series name and always have a single string.
|
||||
return influxql.SeriesList{{Aux: []influxql.DataType{influxql.String}}}, nil
|
||||
}
|
||||
|
||||
seriesMap := make(map[string]influxql.Series)
|
||||
for _, sh := range a {
|
||||
series, err := sh.SeriesKeys(opt)
|
||||
|
@ -483,7 +502,12 @@ func (a Shards) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e
|
|||
}
|
||||
|
||||
for _, s := range series {
|
||||
seriesMap[s.ID()] = s
|
||||
cur, ok := seriesMap[s.ID()]
|
||||
if ok {
|
||||
cur.Combine(&s)
|
||||
} else {
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue