package reads import ( "errors" "fmt" "math" "github.com/influxdata/flux/interval" "github.com/influxdata/flux/values" errors2 "github.com/influxdata/influxdb/kit/platform/errors" "github.com/influxdata/influxdb/tsdb/cursors" ) const ( // MaxPointsPerBlock is the maximum number of points in an encoded // block in a TSM file. It should match the value in the tsm1 // package, but we don't want to import it. MaxPointsPerBlock = 1000 ) func newLimitArrayCursor(cur cursors.Cursor) cursors.Cursor { switch cur := cur.(type) { {{range .}}{{/* every type supports limit */}} case cursors.{{.Name}}ArrayCursor: return new{{.Name}}LimitArrayCursor(cur) {{end}} default: panic(fmt.Sprintf("unreachable: %T", cur)) } } func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { {{range .}}{{/* every type supports first */}} case cursors.{{.Name}}ArrayCursor: return new{{.Name}}WindowFirstArrayCursor(cur, window) {{end}} default: panic(fmt.Sprintf("unreachable: %T", cur)) } } func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { {{range .}}{{/* every type supports last */}} case cursors.{{.Name}}ArrayCursor: return new{{.Name}}WindowLastArrayCursor(cur, window) {{end}} default: panic(fmt.Sprintf("unreachable: %T", cur)) } } func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { switch cur := cur.(type) { {{range .}}{{/* every type supports count */}} case cursors.{{.Name}}ArrayCursor: return new{{.Name}}WindowCountArrayCursor(cur, window) {{end}} default: panic(fmt.Sprintf("unreachable: %T", cur)) } } func newWindowSumArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}} {{$Type := .Name}} {{range .Aggs}} {{if eq .Name "Sum"}} case cursors.{{$Type}}ArrayCursor: return new{{$Type}}WindowSumArrayCursor(cur, window), nil {{end}} {{end}}{{/* for each supported agg fn */}} {{end}}{{/* for each field type */}} default: return nil, &errors2.Error{ Code: errors2.EInvalid, Msg: fmt.Sprintf("unsupported input type for sum aggregate: %s", arrayCursorType(cur)), } } } func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { switch cur := cur.(type) { {{range .}} {{$Type := .Name}} {{range .Aggs}} {{if eq .Name "Min"}} case cursors.{{$Type}}ArrayCursor: return new{{$Type}}WindowMinArrayCursor(cur, window) {{end}} {{end}}{{/* for each supported agg fn */}} {{end}}{{/* for each field type */}} default: panic(fmt.Sprintf("unsupported for aggregate min: %T", cur)) } } func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { switch cur := cur.(type) { {{range .}} {{$Type := .Name}} {{range .Aggs}} {{if eq .Name "Max"}} case cursors.{{$Type}}ArrayCursor: return new{{$Type}}WindowMaxArrayCursor(cur, window) {{end}} {{end}}{{/* for each supported agg fn */}} {{end}}{{/* for each field type */}} default: panic(fmt.Sprintf("unsupported for aggregate max: %T", cur)) } } func newWindowMeanArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}} {{$Type := .Name}} {{range .Aggs}} {{if eq .Name "Mean"}} case cursors.{{$Type}}ArrayCursor: return new{{$Type}}WindowMeanArrayCursor(cur, window), nil {{end}} {{end}}{{/* for each supported agg fn */}} {{end}}{{/* for each field type */}} default: return nil, &errors2.Error{ Code: errors2.EInvalid, Msg: fmt.Sprintf("unsupported input type for mean aggregate: %s", arrayCursorType(cur)), } } } func newWindowMeanCountArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}} {{$Type := .Name}} {{range .Aggs}} {{if eq .Name "MeanCount"}} case cursors.{{$Type}}ArrayCursor: return new{{$Type}}WindowMeanCountArrayCursor(cur, window), nil {{end}} {{end}}{{/* for each supported agg fn */}} {{end}}{{/* for each field type */}} default: return nil, &errors2.Error{ Code: errors2.EInvalid, Msg: fmt.Sprintf("unsupported input type for meancount aggregate: %s", arrayCursorType(cur)), } } } {{range .}} {{$arrayType := print "*cursors." .Name "Array"}} {{$type := print .name "ArrayFilterCursor"}} {{$Type := print .Name "ArrayFilterCursor"}} // ******************** // {{.Name}} Array Cursor type {{$type}} struct { cursors.{{.Name}}ArrayCursor cond expression m *singleValue res {{$arrayType}} tmp {{$arrayType}} } func new{{.Name}}FilterArrayCursor(cond expression) *{{$type}} { return &{{$type}}{ cond: cond, m: &singleValue{}, res: cursors.New{{.Name}}ArrayLen(MaxPointsPerBlock), tmp: &cursors.{{.Name}}Array{}, } } func (c *{{$type}}) reset(cur cursors.{{.Name}}ArrayCursor) { c.{{.Name}}ArrayCursor = cur c.tmp.Timestamps, c.tmp.Values = nil, nil } func (c *{{$type}}) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() } func (c *{{$type}}) Next() {{$arrayType}} { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a {{$arrayType}} if c.tmp.Len() > 0 { a = c.tmp } else { a = c.{{.Name}}ArrayCursor.Next() } LOOP: for len(a.Timestamps) > 0 { for i, v := range a.Values { c.m.v = v if c.cond.EvalBool(c.m) { c.res.Timestamps[pos] = a.Timestamps[i] c.res.Values[pos] = v pos++ if pos >= MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] break LOOP } } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil a = c.{{.Name}}ArrayCursor.Next() } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type {{.name}}MultiShardArrayCursor struct { cursors.{{.Name}}ArrayCursor cursorContext filter *{{$type}} } func (c *{{.name}}MultiShardArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor, itrs cursors.CursorIterators, cond expression) { if cond != nil { if c.filter == nil { c.filter = new{{.Name}}FilterArrayCursor(cond) } else { c.filter.cond = cond } c.filter.reset(cur) cur = c.filter } c.{{.Name}}ArrayCursor = cur c.itrs = itrs c.err = nil } func (c *{{.name}}MultiShardArrayCursor) Err() error { return c.err } func (c *{{.name}}MultiShardArrayCursor) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() } func (c *{{.name}}MultiShardArrayCursor) Next() {{$arrayType}} { for { a := c.{{.Name}}ArrayCursor.Next() if a.Len() == 0 { if c.nextArrayCursor() { continue } } return a } } func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool { if len(c.itrs) == 0 { return false } c.{{.Name}}ArrayCursor.Close() var itr cursors.CursorIterator var cur cursors.Cursor var err error for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] cur, err = itr.Next(c.ctx, c.req) } c.err = err var ok bool if cur != nil && err == nil { var next cursors.{{.Name}}ArrayCursor next, ok = cur.(cursors.{{.Name}}ArrayCursor) if !ok { cur.Close() next = {{.Name}}EmptyArrayCursor c.err = errors.New("expected {{.name}} cursor") } else { if c.filter != nil { c.filter.reset(next) next = c.filter } } c.{{.Name}}ArrayCursor = next } else { c.{{.Name}}ArrayCursor = {{.Name}}EmptyArrayCursor } return ok } type {{.name}}LimitArrayCursor struct { cursors.{{.Name}}ArrayCursor res {{$arrayType}} done bool } func new{{.Name}}LimitArrayCursor(cur cursors.{{.Name}}ArrayCursor) *{{.name}}LimitArrayCursor { return &{{.name}}LimitArrayCursor{ {{.Name}}ArrayCursor: cur, res: cursors.New{{.Name}}ArrayLen(1), } } func (c *{{.name}}LimitArrayCursor) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() } func (c *{{.name}}LimitArrayCursor) Next() {{$arrayType}} { if c.done { return &cursors.{{.Name}}Array{} } a := c.{{.Name}}ArrayCursor.Next() if len(a.Timestamps) == 0 { return a } c.done = true c.res.Timestamps[0] = a.Timestamps[0] c.res.Values[0] = a.Values[0] return c.res } type {{.name}}WindowLastArrayCursor struct { cursors.{{.Name}}ArrayCursor windowEnd int64 res {{$arrayType}} tmp {{$arrayType}} window interval.Window } // Window array cursors assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. func new{{.Name}}WindowLastArrayCursor(cur cursors.{{.Name}}ArrayCursor, window interval.Window) *{{.name}}WindowLastArrayCursor { return &{{.name}}WindowLastArrayCursor{ {{.Name}}ArrayCursor: cur, windowEnd: math.MinInt64, res: cursors.New{{.Name}}ArrayLen(MaxPointsPerBlock), tmp: &cursors.{{.Name}}Array{}, window: window, } } func (c *{{.name}}WindowLastArrayCursor) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() } func (c *{{.name}}WindowLastArrayCursor) Next() *cursors.{{.Name}}Array { cur := -1 NEXT: var a *cursors.{{.Name}}Array if c.tmp.Len() > 0 { a = c.tmp } else { a = c.{{.Name}}ArrayCursor.Next() } if a.Len() == 0 { c.res.Timestamps = c.res.Timestamps[:cur+1] c.res.Values = c.res.Values[:cur+1] return c.res } for i, t := range a.Timestamps { if t >= c.windowEnd { cur++ } if cur == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i:] c.tmp.Values = a.Values[i:] return c.res } c.res.Timestamps[cur] = t c.res.Values[cur] = a.Values[i] c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop()) } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } type {{.name}}WindowFirstArrayCursor struct { cursors.{{.Name}}ArrayCursor windowEnd int64 res {{$arrayType}} tmp {{$arrayType}} window interval.Window } // Window array cursors assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. func new{{.Name}}WindowFirstArrayCursor(cur cursors.{{.Name}}ArrayCursor, window interval.Window) *{{.name}}WindowFirstArrayCursor { return &{{.name}}WindowFirstArrayCursor{ {{.Name}}ArrayCursor: cur, windowEnd: math.MinInt64, res: cursors.New{{.Name}}ArrayLen(MaxPointsPerBlock), tmp: &cursors.{{.Name}}Array{}, window: window, } } func (c *{{.name}}WindowFirstArrayCursor) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() } func (c *{{.name}}WindowFirstArrayCursor) Next() *cursors.{{.Name}}Array { c.res.Timestamps = c.res.Timestamps[:0] c.res.Values = c.res.Values[:0] NEXT: var a *cursors.{{.Name}}Array if c.tmp.Len() > 0 { a = c.tmp } else { a = c.{{.Name}}ArrayCursor.Next() } if a.Len() == 0 { return c.res } for i, t := range a.Timestamps { if t < c.windowEnd { continue } c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop()) c.res.Timestamps = append(c.res.Timestamps, t) c.res.Values = append(c.res.Values, a.Values[i]) if c.res.Len() == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] return c.res } } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } {{/* create an aggregate cursor for each aggregate function supported by the type */}} {{$Name := .Name}} {{$name := .name}} {{range .Aggs}} {{$aggName := .Name}} type {{$name}}Window{{$aggName}}ArrayCursor struct { cursors.{{$Name}}ArrayCursor res *cursors.{{.OutputTypeName}}Array tmp {{$arrayType}} window interval.Window } func new{{$Name}}Window{{$aggName}}ArrayCursor(cur cursors.{{$Name}}ArrayCursor, window interval.Window) *{{$name}}Window{{$aggName}}ArrayCursor { resLen := MaxPointsPerBlock if window.IsZero() { resLen = 1 } return &{{$name}}Window{{$aggName}}ArrayCursor{ {{$Name}}ArrayCursor: cur, res: cursors.New{{.OutputTypeName}}ArrayLen(resLen), tmp: &cursors.{{$Name}}Array{}, window: window, } } func (c *{{$name}}Window{{$aggName}}ArrayCursor) Stats() cursors.CursorStats { return c.{{$Name}}ArrayCursor.Stats() } func (c *{{$name}}Window{{$aggName}}ArrayCursor) Next() *cursors.{{.OutputTypeName}}Array { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] {{if eq .OutputTypeName "MeanCount" }} c.res.Values0 = c.res.Values0[:cap(c.res.Values0)] c.res.Values1 = c.res.Values1[:cap(c.res.Values1)] {{else}} c.res.Values = c.res.Values[:cap(c.res.Values)] {{end}} var a *cursors.{{$Name}}Array if c.tmp.Len() > 0 { a = c.tmp } else { a = c.{{$Name}}ArrayCursor.Next() } if a.Len() == 0 { return &cursors.{{.OutputTypeName}}Array{} } rowIdx := 0 {{.AccDecls}} var windowEnd int64 if !c.window.IsZero() { windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop()) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if !c.window.IsZero() && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { {{.AccEmit}} pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. // they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window {{.AccReset}} windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop()) windowHasPoints = false continue WINDOWS } else { {{.Accumulate}} windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.{{$Name}}ArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { {{.AccEmit}} pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] {{if eq .OutputTypeName "MeanCount" }} c.res.Values0 = c.res.Values0[:pos] c.res.Values1 = c.res.Values1[:pos] {{else}} c.res.Values = c.res.Values[:pos] {{end}} return c.res } {{end}}{{/* range .Aggs */}} type {{.name}}EmptyArrayCursor struct { res cursors.{{.Name}}Array } var {{.Name}}EmptyArrayCursor cursors.{{.Name}}ArrayCursor = &{{.name}}EmptyArrayCursor{} func (c *{{.name}}EmptyArrayCursor) Err() error { return nil } func (c *{{.name}}EmptyArrayCursor) Close() {} func (c *{{.name}}EmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *{{.name}}EmptyArrayCursor) Next() {{$arrayType}} { return &c.res } {{end}}{{/* range . */}} func arrayCursorType(cur cursors.Cursor) string { switch cur.(type) { {{range .}} case cursors.{{.Name}}ArrayCursor: return "{{.name}}" {{end}}{{/* range . */}} default: return "unknown" } }