refactor(storage/reads): refactor and add unit tests for *WindowCountArrayCursor (#18354)

pull/18398/head
Christopher M. Wolff 2020-06-08 11:30:49 -07:00 committed by GitHub
parent 8485007322
commit 3dbfffd851
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 685 additions and 336 deletions

View File

@ -8,6 +8,7 @@ package reads
import (
"errors"
"math"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
@ -200,90 +201,122 @@ func (c floatArraySumCursor) Next() *cursors.FloatArray {
}
}
type integerFloatCountArrayCursor struct {
cursors.FloatArrayCursor
}
func (c *integerFloatCountArrayCursor) Stats() cursors.CursorStats {
return c.FloatArrayCursor.Stats()
}
func (c *integerFloatCountArrayCursor) Next() *cursors.IntegerArray {
a := c.FloatArrayCursor.Next()
if len(a.Timestamps) == 0 {
return &cursors.IntegerArray{}
}
ts := a.Timestamps[0]
var acc int64
for {
acc += int64(len(a.Timestamps))
a = c.FloatArrayCursor.Next()
if len(a.Timestamps) == 0 {
res := cursors.NewIntegerArrayLen(1)
res.Timestamps[0] = ts
res.Values[0] = acc
return res
}
}
}
type integerFloatWindowCountArrayCursor struct {
type floatWindowCountArrayCursor struct {
cursors.FloatArrayCursor
every int64
res *cursors.IntegerArray
tmp *cursors.FloatArray
}
func (c *integerFloatWindowCountArrayCursor) Stats() cursors.CursorStats {
func newFloatWindowCountArrayCursor(cur cursors.FloatArrayCursor, every int64) *floatWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if every == 0 {
resLen = 1
}
return &floatWindowCountArrayCursor{
FloatArrayCursor: cur,
every: every,
res: cursors.NewIntegerArrayLen(resLen),
tmp: &cursors.FloatArray{},
}
}
func newFloatCountArrayCursor(cur cursors.FloatArrayCursor) *floatWindowCountArrayCursor {
// zero means aggregate over the whole series
return newFloatWindowCountArrayCursor(cur, 0)
}
func (c *floatWindowCountArrayCursor) Stats() cursors.CursorStats {
return c.FloatArrayCursor.Stats()
}
func (c *integerFloatWindowCountArrayCursor) Next() *cursors.IntegerArray {
a := c.FloatArrayCursor.Next()
func (c *floatWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.FloatArray
if c.tmp.Len() > 0 {
a = c.tmp
} else {
a = c.FloatArrayCursor.Next()
}
if a.Len() == 0 {
return &cursors.IntegerArray{}
}
res := cursors.NewIntegerArrayLen(0)
rowIdx := 0
var acc int64 = 0
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd := windowStart + c.every
var windowEnd int64
if c.every != 0 {
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
} else {
windowEnd = math.MaxInt64
}
// enumerate windows
WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if ts >= windowEnd {
if c.every != 0 && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
if pos >= MaxPointsPerBlock {
// the output array is full,
// save the remaining points in the input array in tmp.
// they will be processed in the next call to Next()
c.tmp.Timestamps = a.Timestamps[rowIdx:]
c.tmp.Values = a.Values[rowIdx:]
break WINDOWS
}
}
// start the new window
acc = 0
firstTimestamp = a.Timestamps[rowIdx]
windowStart = firstTimestamp - firstTimestamp%c.every
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
continue WINDOWS
} else {
acc++
}
}
// Clear buffered timestamps & values if we make it through a cursor.
// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil
c.tmp.Values = nil
// get the next chunk
a = c.FloatArrayCursor.Next()
if a.Len() == 0 {
// write the final point
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
}
break
break WINDOWS
}
rowIdx = 0
}
return res
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos]
return c.res
}
type floatEmptyArrayCursor struct {
@ -478,90 +511,122 @@ func (c integerArraySumCursor) Next() *cursors.IntegerArray {
}
}
type integerIntegerCountArrayCursor struct {
cursors.IntegerArrayCursor
}
func (c *integerIntegerCountArrayCursor) Stats() cursors.CursorStats {
return c.IntegerArrayCursor.Stats()
}
func (c *integerIntegerCountArrayCursor) Next() *cursors.IntegerArray {
a := c.IntegerArrayCursor.Next()
if len(a.Timestamps) == 0 {
return &cursors.IntegerArray{}
}
ts := a.Timestamps[0]
var acc int64
for {
acc += int64(len(a.Timestamps))
a = c.IntegerArrayCursor.Next()
if len(a.Timestamps) == 0 {
res := cursors.NewIntegerArrayLen(1)
res.Timestamps[0] = ts
res.Values[0] = acc
return res
}
}
}
type integerIntegerWindowCountArrayCursor struct {
type integerWindowCountArrayCursor struct {
cursors.IntegerArrayCursor
every int64
res *cursors.IntegerArray
tmp *cursors.IntegerArray
}
func (c *integerIntegerWindowCountArrayCursor) Stats() cursors.CursorStats {
func newIntegerWindowCountArrayCursor(cur cursors.IntegerArrayCursor, every int64) *integerWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if every == 0 {
resLen = 1
}
return &integerWindowCountArrayCursor{
IntegerArrayCursor: cur,
every: every,
res: cursors.NewIntegerArrayLen(resLen),
tmp: &cursors.IntegerArray{},
}
}
func newIntegerCountArrayCursor(cur cursors.IntegerArrayCursor) *integerWindowCountArrayCursor {
// zero means aggregate over the whole series
return newIntegerWindowCountArrayCursor(cur, 0)
}
func (c *integerWindowCountArrayCursor) Stats() cursors.CursorStats {
return c.IntegerArrayCursor.Stats()
}
func (c *integerIntegerWindowCountArrayCursor) Next() *cursors.IntegerArray {
a := c.IntegerArrayCursor.Next()
func (c *integerWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.IntegerArray
if c.tmp.Len() > 0 {
a = c.tmp
} else {
a = c.IntegerArrayCursor.Next()
}
if a.Len() == 0 {
return &cursors.IntegerArray{}
}
res := cursors.NewIntegerArrayLen(0)
rowIdx := 0
var acc int64 = 0
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd := windowStart + c.every
var windowEnd int64
if c.every != 0 {
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
} else {
windowEnd = math.MaxInt64
}
// enumerate windows
WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if ts >= windowEnd {
if c.every != 0 && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
if pos >= MaxPointsPerBlock {
// the output array is full,
// save the remaining points in the input array in tmp.
// they will be processed in the next call to Next()
c.tmp.Timestamps = a.Timestamps[rowIdx:]
c.tmp.Values = a.Values[rowIdx:]
break WINDOWS
}
}
// start the new window
acc = 0
firstTimestamp = a.Timestamps[rowIdx]
windowStart = firstTimestamp - firstTimestamp%c.every
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
continue WINDOWS
} else {
acc++
}
}
// Clear buffered timestamps & values if we make it through a cursor.
// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil
c.tmp.Values = nil
// get the next chunk
a = c.IntegerArrayCursor.Next()
if a.Len() == 0 {
// write the final point
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
}
break
break WINDOWS
}
rowIdx = 0
}
return res
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos]
return c.res
}
type integerEmptyArrayCursor struct {
@ -756,90 +821,122 @@ func (c unsignedArraySumCursor) Next() *cursors.UnsignedArray {
}
}
type integerUnsignedCountArrayCursor struct {
cursors.UnsignedArrayCursor
}
func (c *integerUnsignedCountArrayCursor) Stats() cursors.CursorStats {
return c.UnsignedArrayCursor.Stats()
}
func (c *integerUnsignedCountArrayCursor) Next() *cursors.IntegerArray {
a := c.UnsignedArrayCursor.Next()
if len(a.Timestamps) == 0 {
return &cursors.IntegerArray{}
}
ts := a.Timestamps[0]
var acc int64
for {
acc += int64(len(a.Timestamps))
a = c.UnsignedArrayCursor.Next()
if len(a.Timestamps) == 0 {
res := cursors.NewIntegerArrayLen(1)
res.Timestamps[0] = ts
res.Values[0] = acc
return res
}
}
}
type integerUnsignedWindowCountArrayCursor struct {
type unsignedWindowCountArrayCursor struct {
cursors.UnsignedArrayCursor
every int64
res *cursors.IntegerArray
tmp *cursors.UnsignedArray
}
func (c *integerUnsignedWindowCountArrayCursor) Stats() cursors.CursorStats {
func newUnsignedWindowCountArrayCursor(cur cursors.UnsignedArrayCursor, every int64) *unsignedWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if every == 0 {
resLen = 1
}
return &unsignedWindowCountArrayCursor{
UnsignedArrayCursor: cur,
every: every,
res: cursors.NewIntegerArrayLen(resLen),
tmp: &cursors.UnsignedArray{},
}
}
func newUnsignedCountArrayCursor(cur cursors.UnsignedArrayCursor) *unsignedWindowCountArrayCursor {
// zero means aggregate over the whole series
return newUnsignedWindowCountArrayCursor(cur, 0)
}
func (c *unsignedWindowCountArrayCursor) Stats() cursors.CursorStats {
return c.UnsignedArrayCursor.Stats()
}
func (c *integerUnsignedWindowCountArrayCursor) Next() *cursors.IntegerArray {
a := c.UnsignedArrayCursor.Next()
func (c *unsignedWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.UnsignedArray
if c.tmp.Len() > 0 {
a = c.tmp
} else {
a = c.UnsignedArrayCursor.Next()
}
if a.Len() == 0 {
return &cursors.IntegerArray{}
}
res := cursors.NewIntegerArrayLen(0)
rowIdx := 0
var acc int64 = 0
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd := windowStart + c.every
var windowEnd int64
if c.every != 0 {
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
} else {
windowEnd = math.MaxInt64
}
// enumerate windows
WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if ts >= windowEnd {
if c.every != 0 && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
if pos >= MaxPointsPerBlock {
// the output array is full,
// save the remaining points in the input array in tmp.
// they will be processed in the next call to Next()
c.tmp.Timestamps = a.Timestamps[rowIdx:]
c.tmp.Values = a.Values[rowIdx:]
break WINDOWS
}
}
// start the new window
acc = 0
firstTimestamp = a.Timestamps[rowIdx]
windowStart = firstTimestamp - firstTimestamp%c.every
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
continue WINDOWS
} else {
acc++
}
}
// Clear buffered timestamps & values if we make it through a cursor.
// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil
c.tmp.Values = nil
// get the next chunk
a = c.UnsignedArrayCursor.Next()
if a.Len() == 0 {
// write the final point
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
}
break
break WINDOWS
}
rowIdx = 0
}
return res
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos]
return c.res
}
type unsignedEmptyArrayCursor struct {
@ -994,90 +1091,122 @@ func (c *stringArrayCursor) nextArrayCursor() bool {
return ok
}
type integerStringCountArrayCursor struct {
cursors.StringArrayCursor
}
func (c *integerStringCountArrayCursor) Stats() cursors.CursorStats {
return c.StringArrayCursor.Stats()
}
func (c *integerStringCountArrayCursor) Next() *cursors.IntegerArray {
a := c.StringArrayCursor.Next()
if len(a.Timestamps) == 0 {
return &cursors.IntegerArray{}
}
ts := a.Timestamps[0]
var acc int64
for {
acc += int64(len(a.Timestamps))
a = c.StringArrayCursor.Next()
if len(a.Timestamps) == 0 {
res := cursors.NewIntegerArrayLen(1)
res.Timestamps[0] = ts
res.Values[0] = acc
return res
}
}
}
type integerStringWindowCountArrayCursor struct {
type stringWindowCountArrayCursor struct {
cursors.StringArrayCursor
every int64
res *cursors.IntegerArray
tmp *cursors.StringArray
}
func (c *integerStringWindowCountArrayCursor) Stats() cursors.CursorStats {
func newStringWindowCountArrayCursor(cur cursors.StringArrayCursor, every int64) *stringWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if every == 0 {
resLen = 1
}
return &stringWindowCountArrayCursor{
StringArrayCursor: cur,
every: every,
res: cursors.NewIntegerArrayLen(resLen),
tmp: &cursors.StringArray{},
}
}
func newStringCountArrayCursor(cur cursors.StringArrayCursor) *stringWindowCountArrayCursor {
// zero means aggregate over the whole series
return newStringWindowCountArrayCursor(cur, 0)
}
func (c *stringWindowCountArrayCursor) Stats() cursors.CursorStats {
return c.StringArrayCursor.Stats()
}
func (c *integerStringWindowCountArrayCursor) Next() *cursors.IntegerArray {
a := c.StringArrayCursor.Next()
func (c *stringWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.StringArray
if c.tmp.Len() > 0 {
a = c.tmp
} else {
a = c.StringArrayCursor.Next()
}
if a.Len() == 0 {
return &cursors.IntegerArray{}
}
res := cursors.NewIntegerArrayLen(0)
rowIdx := 0
var acc int64 = 0
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd := windowStart + c.every
var windowEnd int64
if c.every != 0 {
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
} else {
windowEnd = math.MaxInt64
}
// enumerate windows
WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if ts >= windowEnd {
if c.every != 0 && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
if pos >= MaxPointsPerBlock {
// the output array is full,
// save the remaining points in the input array in tmp.
// they will be processed in the next call to Next()
c.tmp.Timestamps = a.Timestamps[rowIdx:]
c.tmp.Values = a.Values[rowIdx:]
break WINDOWS
}
}
// start the new window
acc = 0
firstTimestamp = a.Timestamps[rowIdx]
windowStart = firstTimestamp - firstTimestamp%c.every
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
continue WINDOWS
} else {
acc++
}
}
// Clear buffered timestamps & values if we make it through a cursor.
// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil
c.tmp.Values = nil
// get the next chunk
a = c.StringArrayCursor.Next()
if a.Len() == 0 {
// write the final point
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
}
break
break WINDOWS
}
rowIdx = 0
}
return res
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos]
return c.res
}
type stringEmptyArrayCursor struct {
@ -1232,90 +1361,122 @@ func (c *booleanArrayCursor) nextArrayCursor() bool {
return ok
}
type integerBooleanCountArrayCursor struct {
cursors.BooleanArrayCursor
}
func (c *integerBooleanCountArrayCursor) Stats() cursors.CursorStats {
return c.BooleanArrayCursor.Stats()
}
func (c *integerBooleanCountArrayCursor) Next() *cursors.IntegerArray {
a := c.BooleanArrayCursor.Next()
if len(a.Timestamps) == 0 {
return &cursors.IntegerArray{}
}
ts := a.Timestamps[0]
var acc int64
for {
acc += int64(len(a.Timestamps))
a = c.BooleanArrayCursor.Next()
if len(a.Timestamps) == 0 {
res := cursors.NewIntegerArrayLen(1)
res.Timestamps[0] = ts
res.Values[0] = acc
return res
}
}
}
type integerBooleanWindowCountArrayCursor struct {
type booleanWindowCountArrayCursor struct {
cursors.BooleanArrayCursor
every int64
res *cursors.IntegerArray
tmp *cursors.BooleanArray
}
func (c *integerBooleanWindowCountArrayCursor) Stats() cursors.CursorStats {
func newBooleanWindowCountArrayCursor(cur cursors.BooleanArrayCursor, every int64) *booleanWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if every == 0 {
resLen = 1
}
return &booleanWindowCountArrayCursor{
BooleanArrayCursor: cur,
every: every,
res: cursors.NewIntegerArrayLen(resLen),
tmp: &cursors.BooleanArray{},
}
}
func newBooleanCountArrayCursor(cur cursors.BooleanArrayCursor) *booleanWindowCountArrayCursor {
// zero means aggregate over the whole series
return newBooleanWindowCountArrayCursor(cur, 0)
}
func (c *booleanWindowCountArrayCursor) Stats() cursors.CursorStats {
return c.BooleanArrayCursor.Stats()
}
func (c *integerBooleanWindowCountArrayCursor) Next() *cursors.IntegerArray {
a := c.BooleanArrayCursor.Next()
func (c *booleanWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.BooleanArray
if c.tmp.Len() > 0 {
a = c.tmp
} else {
a = c.BooleanArrayCursor.Next()
}
if a.Len() == 0 {
return &cursors.IntegerArray{}
}
res := cursors.NewIntegerArrayLen(0)
rowIdx := 0
var acc int64 = 0
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd := windowStart + c.every
var windowEnd int64
if c.every != 0 {
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
} else {
windowEnd = math.MaxInt64
}
// enumerate windows
WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if ts >= windowEnd {
if c.every != 0 && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
if pos >= MaxPointsPerBlock {
// the output array is full,
// save the remaining points in the input array in tmp.
// they will be processed in the next call to Next()
c.tmp.Timestamps = a.Timestamps[rowIdx:]
c.tmp.Values = a.Values[rowIdx:]
break WINDOWS
}
}
// start the new window
acc = 0
firstTimestamp = a.Timestamps[rowIdx]
windowStart = firstTimestamp - firstTimestamp%c.every
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
continue WINDOWS
} else {
acc++
}
}
// Clear buffered timestamps & values if we make it through a cursor.
// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil
c.tmp.Values = nil
// get the next chunk
a = c.BooleanArrayCursor.Next()
if a.Len() == 0 {
// write the final point
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
}
break
break WINDOWS
}
rowIdx = 0
}
return res
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos]
return c.res
}
type booleanEmptyArrayCursor struct {

View File

@ -2,6 +2,7 @@ package reads
import (
"errors"
"math"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
@ -207,90 +208,122 @@ func (c {{$type}}) Next() {{$arrayType}} {
{{end}}
type integer{{.Name}}CountArrayCursor struct {
cursors.{{.Name}}ArrayCursor
}
func (c *integer{{.Name}}CountArrayCursor) Stats() cursors.CursorStats {
return c.{{.Name}}ArrayCursor.Stats()
}
func (c *integer{{.Name}}CountArrayCursor) Next() *cursors.IntegerArray {
a := c.{{.Name}}ArrayCursor.Next()
if len(a.Timestamps) == 0 {
return &cursors.IntegerArray{}
}
ts := a.Timestamps[0]
var acc int64
for {
acc += int64(len(a.Timestamps))
a = c.{{.Name}}ArrayCursor.Next()
if len(a.Timestamps) == 0 {
res := cursors.NewIntegerArrayLen(1)
res.Timestamps[0] = ts
res.Values[0] = acc
return res
}
}
}
type integer{{.Name}}WindowCountArrayCursor struct {
type {{.name}}WindowCountArrayCursor struct {
cursors.{{.Name}}ArrayCursor
every int64
res *cursors.IntegerArray
tmp {{$arrayType}}
}
func (c *integer{{.Name}}WindowCountArrayCursor) Stats() cursors.CursorStats {
func new{{.Name}}WindowCountArrayCursor(cur cursors.{{.Name}}ArrayCursor, every int64) *{{.name}}WindowCountArrayCursor {
resLen := MaxPointsPerBlock
if every == 0 {
resLen = 1
}
return &{{.name}}WindowCountArrayCursor{
{{.Name}}ArrayCursor: cur,
every: every,
res: cursors.NewIntegerArrayLen(resLen),
tmp: &cursors.{{.Name}}Array{},
}
}
func new{{.Name}}CountArrayCursor(cur cursors.{{.Name}}ArrayCursor) *{{.name}}WindowCountArrayCursor {
// zero means aggregate over the whole series
return new{{.Name}}WindowCountArrayCursor(cur, 0)
}
func (c *{{.name}}WindowCountArrayCursor) Stats() cursors.CursorStats {
return c.{{.Name}}ArrayCursor.Stats()
}
func (c *integer{{.Name}}WindowCountArrayCursor) Next() *cursors.IntegerArray {
a := c.{{.Name}}ArrayCursor.Next()
func (c *{{.name}}WindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.{{.Name}}Array
if c.tmp.Len() > 0 {
a = c.tmp
} else {
a = c.{{.Name}}ArrayCursor.Next()
}
if a.Len() == 0 {
return &cursors.IntegerArray{}
}
res := cursors.NewIntegerArrayLen(0)
rowIdx := 0
var acc int64 = 0
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd := windowStart + c.every
var windowEnd int64
if c.every != 0 {
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
} else {
windowEnd = math.MaxInt64
}
// enumerate windows
WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if ts >= windowEnd {
if c.every != 0 && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
if pos >= MaxPointsPerBlock {
// the output array is full,
// save the remaining points in the input array in tmp.
// they will be processed in the next call to Next()
c.tmp.Timestamps = a.Timestamps[rowIdx:]
c.tmp.Values = a.Values[rowIdx:]
break WINDOWS
}
}
// start the new window
acc = 0
firstTimestamp = a.Timestamps[rowIdx]
windowStart = firstTimestamp - firstTimestamp%c.every
firstTimestamp := a.Timestamps[rowIdx]
windowStart := firstTimestamp - firstTimestamp%c.every
windowEnd = windowStart + c.every
continue WINDOWS
} else {
acc++
}
}
// Clear buffered timestamps & values if we make it through a cursor.
// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil
c.tmp.Values = nil
// get the next chunk
a = c.{{.Name}}ArrayCursor.Next()
if a.Len() == 0 {
// write the final point
// do not generate a point for empty windows
if acc > 0 {
res.Timestamps = append(res.Timestamps, windowEnd)
res.Values = append(res.Values, acc)
c.res.Timestamps[pos] = windowEnd
c.res.Values[pos] = acc
pos++
}
break
break WINDOWS
}
rowIdx = 0
}
return res
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos]
return c.res
}
type {{.name}}EmptyArrayCursor struct {
@ -304,4 +337,4 @@ func (c *{{.name}}EmptyArrayCursor) Close() {}
func (c *{{.name}}EmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} }
func (c *{{.name}}EmptyArrayCursor) Next() {{$arrayType}} { return &c.res }
{{end}}
{{end}}{{/* range . */}}

View File

@ -3,6 +3,7 @@ package reads
import (
"context"
"fmt"
"math"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
@ -62,47 +63,37 @@ func newSumArrayCursor(cur cursors.Cursor) cursors.Cursor {
func newCountArrayCursor(cur cursors.Cursor) cursors.Cursor {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
return &integerFloatCountArrayCursor{FloatArrayCursor: cur}
return newFloatCountArrayCursor(cur)
case cursors.IntegerArrayCursor:
return &integerIntegerCountArrayCursor{IntegerArrayCursor: cur}
return newIntegerCountArrayCursor(cur)
case cursors.UnsignedArrayCursor:
return &integerUnsignedCountArrayCursor{UnsignedArrayCursor: cur}
return newUnsignedCountArrayCursor(cur)
case cursors.StringArrayCursor:
return &integerStringCountArrayCursor{StringArrayCursor: cur}
return newStringCountArrayCursor(cur)
case cursors.BooleanArrayCursor:
return &integerBooleanCountArrayCursor{BooleanArrayCursor: cur}
return newBooleanCountArrayCursor(cur)
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
}
func newWindowCountArrayCursor(cur cursors.Cursor, req *datatypes.ReadWindowAggregateRequest) cursors.Cursor {
if req.WindowEvery == math.MaxInt64 {
// This means to aggregate over the entire range,
// don't do windowed aggregation.
return newCountArrayCursor(cur)
}
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
return &integerFloatWindowCountArrayCursor{
FloatArrayCursor: cur,
every: req.WindowEvery,
}
return newFloatWindowCountArrayCursor(cur, req.WindowEvery)
case cursors.IntegerArrayCursor:
return &integerIntegerWindowCountArrayCursor{
IntegerArrayCursor: cur,
every: req.WindowEvery,
}
return newIntegerWindowCountArrayCursor(cur, req.WindowEvery)
case cursors.UnsignedArrayCursor:
return &integerUnsignedWindowCountArrayCursor{
UnsignedArrayCursor: cur,
every: req.WindowEvery,
}
return newUnsignedWindowCountArrayCursor(cur, req.WindowEvery)
case cursors.StringArrayCursor:
return &integerStringWindowCountArrayCursor{
StringArrayCursor: cur,
every: req.WindowEvery,
}
return newStringWindowCountArrayCursor(cur, req.WindowEvery)
case cursors.BooleanArrayCursor:
return &integerBooleanWindowCountArrayCursor{
BooleanArrayCursor: cur,
every: req.WindowEvery,
}
return newBooleanWindowCountArrayCursor(cur, req.WindowEvery)
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
@ -42,15 +43,15 @@ func TestIntegerFilterArrayCursor(t *testing.T) {
}
}
func makeIntegerArray(n int64, tsStart time.Time, tsStep time.Duration, vStart, vStep int64) *cursors.IntegerArray {
func makeIntegerArray(n int, tsStart time.Time, tsStep time.Duration, valueFn func(i int64) int64) *cursors.IntegerArray {
ia := &cursors.IntegerArray{
Timestamps: make([]int64, n),
Values: make([]int64, n),
}
for i := int64(0); i < n; i++ {
ia.Timestamps[i] = tsStart.UnixNano() + i*int64(tsStep)
ia.Values[i] = vStart + i*vStep
for i := 0; i < n; i++ {
ia.Timestamps[i] = tsStart.UnixNano() + int64(i)*int64(tsStep)
ia.Values[i] = valueFn(int64(i))
}
return ia
@ -64,6 +65,13 @@ func mustParseTime(ts string) time.Time {
return t
}
func copyIntegerArray(src *cursors.IntegerArray) *cursors.IntegerArray {
dst := cursors.NewIntegerArrayLen(src.Len())
copy(dst.Timestamps, src.Timestamps)
copy(dst.Values, src.Values)
return dst
}
func TestIntegerIntegerCountArrayCursor(t *testing.T) {
maxTimestamp := time.Unix(0, math.MaxInt64)
@ -80,11 +88,74 @@ func TestIntegerIntegerCountArrayCursor(t *testing.T) {
makeIntegerArray(
60,
mustParseTime("2010-01-01T00:00:00Z"), time.Minute,
100, 1,
func(i int64) int64 { return 100 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(4, mustParseTime("2010-01-01T00:15:00Z"), 15*time.Minute, 15, 0),
makeIntegerArray(4, mustParseTime("2010-01-01T00:15:00Z"), 15*time.Minute, func(int64) int64 { return 15 }),
},
},
{
name: "empty windows",
every: time.Minute,
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
4,
mustParseTime("2010-01-01T00:00:00Z"), 15*time.Minute,
func(i int64) int64 { return 100 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(
4,
mustParseTime("2010-01-01T00:01:00Z"), 15*time.Minute,
func(i int64) int64 { return 1 },
),
},
},
{
name: "unaligned window",
every: 15 * time.Minute,
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
60,
mustParseTime("2010-01-01T00:00:30Z"), time.Minute,
func(i int64) int64 { return 100 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(
4,
mustParseTime("2010-01-01T00:15:00Z"), 15*time.Minute,
func(i int64) int64 {
return 15
}),
},
},
{
name: "more unaligned window",
every: 15 * time.Minute,
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
60,
mustParseTime("2010-01-01T00:01:30Z"), time.Minute,
func(i int64) int64 { return 100 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(
5,
mustParseTime("2010-01-01T00:15:00Z"), 15*time.Minute,
func(i int64) int64 {
switch i {
case 0:
return 14
case 4:
return 1
default:
return 15
}
}),
},
},
{
@ -94,16 +165,16 @@ func TestIntegerIntegerCountArrayCursor(t *testing.T) {
makeIntegerArray(
60,
mustParseTime("2010-01-01T00:00:00Z"), time.Minute,
100, 1,
func(i int64) int64 { return 100 + i },
),
makeIntegerArray(
60,
mustParseTime("2010-01-01T01:00:00Z"), time.Minute,
200, 1,
func(i int64) int64 { return 200 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(8, mustParseTime("2010-01-01T00:15:00Z"), 15*time.Minute, 15, 0),
makeIntegerArray(8, mustParseTime("2010-01-01T00:15:00Z"), 15*time.Minute, func(int64) int64 { return 15 }),
},
},
{
@ -113,82 +184,131 @@ func TestIntegerIntegerCountArrayCursor(t *testing.T) {
makeIntegerArray(
60,
mustParseTime("2010-01-01T00:00:00Z"), time.Minute,
100, 1,
func(i int64) int64 { return 100 + i },
),
makeIntegerArray(
60,
mustParseTime("2010-01-01T01:00:00Z"), time.Minute,
200, 1,
func(i int64) int64 { return 200 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(3, mustParseTime("2010-01-01T00:40:00Z"), 40*time.Minute, 40, 0),
makeIntegerArray(3, mustParseTime("2010-01-01T00:40:00Z"), 40*time.Minute, func(int64) int64 { return 40 }),
},
},
{
name: "window max int every",
every: time.Duration(math.MaxInt64),
name: "more windows than MaxPointsPerBlock",
every: 2 * time.Millisecond,
inputArrays: []*cursors.IntegerArray{
makeIntegerArray( // 1 second, one point per ms
1000,
mustParseTime("2010-01-01T00:00:00Z"), time.Millisecond,
func(i int64) int64 { return i },
),
makeIntegerArray( // 1 second, one point per ms
1000,
mustParseTime("2010-01-01T00:00:01Z"), time.Millisecond,
func(i int64) int64 { return i },
),
makeIntegerArray( // 1 second, one point per ms
1000,
mustParseTime("2010-01-01T00:00:02Z"), time.Millisecond,
func(i int64) int64 { return i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(
1000,
mustParseTime("2010-01-01T00:00:00.002Z"), 2*time.Millisecond,
func(i int64) int64 { return 2 },
),
makeIntegerArray(
500,
mustParseTime("2010-01-01T00:00:02.002Z"), 2*time.Millisecond,
func(i int64) int64 { return 2 },
),
},
},
{
name: "whole series",
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
60,
mustParseTime("2010-01-01T00:00:00Z"), time.Minute,
100, 1,
func(i int64) int64 { return 100 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(1, maxTimestamp, 40*time.Minute, 60, 0),
makeIntegerArray(1, maxTimestamp, 40*time.Minute, func(i int64) int64 { return 60 }),
},
},
{
name: "window max int every two arrays",
every: time.Duration(math.MaxInt64),
name: "whole series no points",
inputArrays: []*cursors.IntegerArray{{}},
want: []*cursors.IntegerArray{},
},
{
name: "whole series two arrays",
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
60,
mustParseTime("2010-01-01T00:00:00Z"), time.Minute,
100, 1,
func(i int64) int64 { return 100 + i },
),
makeIntegerArray(
60,
mustParseTime("2010-01-01T01:00:00Z"), time.Minute,
100, 1,
func(i int64) int64 { return 100 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(1, maxTimestamp, 40*time.Minute, 120, 0),
makeIntegerArray(1, maxTimestamp, 40*time.Minute, func(int64) int64 { return 120 }),
},
},
{
name: "window max int span epoch",
every: time.Duration(math.MaxInt64),
name: "whole series span epoch",
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
120,
mustParseTime("1969-12-31T23:00:00Z"), time.Minute,
100, 1,
func(i int64) int64 { return 100 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(1, maxTimestamp, 40*time.Minute, 120, 0),
makeIntegerArray(1, maxTimestamp, 40*time.Minute, func(int64) int64 { return 120 }),
},
},
{
name: "window max int span epoch two arrays",
every: time.Duration(math.MaxInt64),
name: "whole series span epoch two arrays",
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
60,
mustParseTime("1969-12-31T23:00:00Z"), time.Minute,
100, 1,
func(i int64) int64 { return 100 + i },
),
makeIntegerArray(
60,
mustParseTime("1970-01-01T00:00:00Z"), time.Minute,
100, 1,
func(i int64) int64 { return 100 + i },
),
},
want: []*cursors.IntegerArray{
makeIntegerArray(1, maxTimestamp, 40*time.Minute, 120, 0),
makeIntegerArray(1, maxTimestamp, 40*time.Minute, func(int64) int64 { return 120 }),
},
},
{
name: "whole series, with max int64 timestamp",
inputArrays: []*cursors.IntegerArray{
{
Timestamps: []int64{math.MaxInt64},
Values: []int64{0},
},
},
want: []*cursors.IntegerArray{
{
Timestamps: []int64{math.MaxInt64},
Values: []int64{1},
},
},
},
}
@ -208,18 +328,10 @@ func TestIntegerIntegerCountArrayCursor(t *testing.T) {
return &cursors.IntegerArray{}
},
}
var countArrayCursor cursors.IntegerArrayCursor
if tc.every != 0 {
countArrayCursor = &integerIntegerWindowCountArrayCursor{
IntegerArrayCursor: mc,
every: int64(tc.every),
}
} else {
countArrayCursor = newCountArrayCursor(mc).(cursors.IntegerArrayCursor)
}
countArrayCursor := newIntegerWindowCountArrayCursor(mc, int64(tc.every))
got := make([]*cursors.IntegerArray, 0, len(tc.want))
for a := countArrayCursor.Next(); a.Len() != 0; a = countArrayCursor.Next() {
got = append(got, a)
got = append(got, copyIntegerArray(a))
}
if diff := cmp.Diff(got, tc.want); diff != "" {
@ -229,6 +341,58 @@ func TestIntegerIntegerCountArrayCursor(t *testing.T) {
}
}
func TestNewCountArrayCursor(t *testing.T) {
want := &integerWindowCountArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
res: cursors.NewIntegerArrayLen(1),
tmp: &cursors.IntegerArray{},
}
got := newCountArrayCursor(&MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
}
func TestNewWindowCountArrayCursor(t *testing.T) {
t.Run("hour window", func(t *testing.T) {
want := &integerWindowCountArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
every: int64(time.Hour),
res: cursors.NewIntegerArrayLen(MaxPointsPerBlock),
tmp: &cursors.IntegerArray{},
}
req := &datatypes.ReadWindowAggregateRequest{
WindowEvery: int64(time.Hour),
}
got := newWindowCountArrayCursor(&MockIntegerArrayCursor{}, req)
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("count whole series", func(t *testing.T) {
want := &integerWindowCountArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
every: 0,
res: cursors.NewIntegerArrayLen(1),
tmp: &cursors.IntegerArray{},
}
req := &datatypes.ReadWindowAggregateRequest{
WindowEvery: math.MaxInt64,
}
got := newWindowCountArrayCursor(&MockIntegerArrayCursor{}, req)
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
}
type MockIntegerArrayCursor struct {
CloseFunc func()
ErrFunc func() error