// Generated by tmpl // https://github.com/benbjohnson/tmpl // // DO NOT EDIT! // Source: array_cursor.gen.go.tmpl package reads import ( "errors" "fmt" "math" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/tsdb/cursors" ) const ( // MaxPointsPerBlock is the maximum number of points in an encoded // block in a TSM file. It should match the value in the tsm1 // package, but we don't want to import it. MaxPointsPerBlock = 1000 ) func newLimitArrayCursor(cur cursors.Cursor) cursors.Cursor { switch cur := cur.(type) { case cursors.FloatArrayCursor: return newFloatLimitArrayCursor(cur) case cursors.IntegerArrayCursor: return newIntegerLimitArrayCursor(cur) case cursors.UnsignedArrayCursor: return newUnsignedLimitArrayCursor(cur) case cursors.StringArrayCursor: return newStringLimitArrayCursor(cur) case cursors.BooleanArrayCursor: return newBooleanLimitArrayCursor(cur) default: panic(fmt.Sprintf("unreachable: %T", cur)) } } func newWindowFirstArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cursor { if every == 0 { return newLimitArrayCursor(cur) } switch cur := cur.(type) { case cursors.FloatArrayCursor: return newFloatWindowFirstArrayCursor(cur, every, offset) case cursors.IntegerArrayCursor: return newIntegerWindowFirstArrayCursor(cur, every, offset) case cursors.UnsignedArrayCursor: return newUnsignedWindowFirstArrayCursor(cur, every, offset) case cursors.StringArrayCursor: return newStringWindowFirstArrayCursor(cur, every, offset) case cursors.BooleanArrayCursor: return newBooleanWindowFirstArrayCursor(cur, every, offset) default: panic(fmt.Sprintf("unreachable: %T", cur)) } } func newWindowLastArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cursor { if every == 0 { return newLimitArrayCursor(cur) } switch cur := cur.(type) { case cursors.FloatArrayCursor: return newFloatWindowLastArrayCursor(cur, every, offset) case cursors.IntegerArrayCursor: return newIntegerWindowLastArrayCursor(cur, every, offset) case 
cursors.UnsignedArrayCursor: return newUnsignedWindowLastArrayCursor(cur, every, offset) case cursors.StringArrayCursor: return newStringWindowLastArrayCursor(cur, every, offset) case cursors.BooleanArrayCursor: return newBooleanWindowLastArrayCursor(cur, every, offset) default: panic(fmt.Sprintf("unreachable: %T", cur)) } } func newWindowCountArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cursor { switch cur := cur.(type) { case cursors.FloatArrayCursor: return newFloatWindowCountArrayCursor(cur, every, offset) case cursors.IntegerArrayCursor: return newIntegerWindowCountArrayCursor(cur, every, offset) case cursors.UnsignedArrayCursor: return newUnsignedWindowCountArrayCursor(cur, every, offset) case cursors.StringArrayCursor: return newStringWindowCountArrayCursor(cur, every, offset) case cursors.BooleanArrayCursor: return newBooleanWindowCountArrayCursor(cur, every, offset) default: panic(fmt.Sprintf("unreachable: %T", cur)) } } func newWindowSumArrayCursor(cur cursors.Cursor, every, offset int64) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: return newFloatWindowSumArrayCursor(cur, every, offset), nil case cursors.IntegerArrayCursor: return newIntegerWindowSumArrayCursor(cur, every, offset), nil case cursors.UnsignedArrayCursor: return newUnsignedWindowSumArrayCursor(cur, every, offset), nil default: return nil, &influxdb.Error{ Code: influxdb.EInvalid, Msg: fmt.Sprintf("unsupported input type for sum aggregate: %s", arrayCursorType(cur)), } } } func newWindowMinArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cursor { switch cur := cur.(type) { case cursors.FloatArrayCursor: return newFloatWindowMinArrayCursor(cur, every, offset) case cursors.IntegerArrayCursor: return newIntegerWindowMinArrayCursor(cur, every, offset) case cursors.UnsignedArrayCursor: return newUnsignedWindowMinArrayCursor(cur, every, offset) default: panic(fmt.Sprintf("unsupported for aggregate min: %T", cur)) } } func 
newWindowMaxArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cursor { switch cur := cur.(type) { case cursors.FloatArrayCursor: return newFloatWindowMaxArrayCursor(cur, every, offset) case cursors.IntegerArrayCursor: return newIntegerWindowMaxArrayCursor(cur, every, offset) case cursors.UnsignedArrayCursor: return newUnsignedWindowMaxArrayCursor(cur, every, offset) default: panic(fmt.Sprintf("unsupported for aggregate max: %T", cur)) } } func newWindowMeanArrayCursor(cur cursors.Cursor, every, offset int64) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: return newFloatWindowMeanArrayCursor(cur, every, offset), nil case cursors.IntegerArrayCursor: return newIntegerWindowMeanArrayCursor(cur, every, offset), nil case cursors.UnsignedArrayCursor: return newUnsignedWindowMeanArrayCursor(cur, every, offset), nil default: return nil, &influxdb.Error{ Code: influxdb.EInvalid, Msg: fmt.Sprintf("unsupported input type for mean aggregate: %s", arrayCursorType(cur)), } } } // ******************** // Float Array Cursor type floatArrayFilterCursor struct { cursors.FloatArrayCursor cond expression m *singleValue res *cursors.FloatArray tmp *cursors.FloatArray } func newFloatFilterArrayCursor(cond expression) *floatArrayFilterCursor { return &floatArrayFilterCursor{ cond: cond, m: &singleValue{}, res: cursors.NewFloatArrayLen(MaxPointsPerBlock), tmp: &cursors.FloatArray{}, } } func (c *floatArrayFilterCursor) reset(cur cursors.FloatArrayCursor) { c.FloatArrayCursor = cur c.tmp.Timestamps, c.tmp.Values = nil, nil } func (c *floatArrayFilterCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatArrayFilterCursor) Next() *cursors.FloatArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.FloatArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.FloatArrayCursor.Next() } LOOP: for len(a.Timestamps) > 0 { for i, v := 
range a.Values { c.m.v = v if c.cond.EvalBool(c.m) { c.res.Timestamps[pos] = a.Timestamps[i] c.res.Values[pos] = v pos++ if pos >= MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] break LOOP } } } // Clear bufferred timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil a = c.FloatArrayCursor.Next() } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type floatArrayCursor struct { cursors.FloatArrayCursor cursorContext filter *floatArrayFilterCursor } func (c *floatArrayCursor) reset(cur cursors.FloatArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newFloatFilterArrayCursor(cond) } c.filter.reset(cur) cur = c.filter } c.FloatArrayCursor = cur c.cursorIterator = cursorIterator c.err = nil } func (c *floatArrayCursor) Err() error { return c.err } func (c *floatArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatArrayCursor) Next() *cursors.FloatArray { for { a := c.FloatArrayCursor.Next() if a.Len() == 0 { if c.nextArrayCursor() { continue } } return a } } func (c *floatArrayCursor) nextArrayCursor() bool { if c.cursorIterator == nil { return false } c.FloatArrayCursor.Close() cur, _ := c.cursorIterator.Next(c.ctx, c.req) c.cursorIterator = nil var ok bool if cur != nil { var next cursors.FloatArrayCursor next, ok = cur.(cursors.FloatArrayCursor) if !ok { cur.Close() next = FloatEmptyArrayCursor c.cursorIterator = nil c.err = errors.New("expected float cursor") } else { if c.filter != nil { c.filter.reset(next) next = c.filter } } c.FloatArrayCursor = next } else { c.FloatArrayCursor = FloatEmptyArrayCursor } return ok } type floatLimitArrayCursor struct { cursors.FloatArrayCursor res *cursors.FloatArray done bool } func newFloatLimitArrayCursor(cur cursors.FloatArrayCursor) 
*floatLimitArrayCursor { return &floatLimitArrayCursor{ FloatArrayCursor: cur, res: cursors.NewFloatArrayLen(1), } } func (c *floatLimitArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatLimitArrayCursor) Next() *cursors.FloatArray { if c.done { return &cursors.FloatArray{} } a := c.FloatArrayCursor.Next() if len(a.Timestamps) == 0 { return a } c.done = true c.res.Timestamps[0] = a.Timestamps[0] c.res.Values[0] = a.Values[0] return c.res } type floatWindowLastArrayCursor struct { cursors.FloatArrayCursor every, offset, windowEnd int64 res *cursors.FloatArray tmp *cursors.FloatArray } // Window array cursors assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. func newFloatWindowLastArrayCursor(cur cursors.FloatArrayCursor, every, offset int64) *floatWindowLastArrayCursor { return &floatWindowLastArrayCursor{ FloatArrayCursor: cur, every: every, offset: offset, windowEnd: math.MinInt64, res: cursors.NewFloatArrayLen(MaxPointsPerBlock), tmp: &cursors.FloatArray{}, } } func (c *floatWindowLastArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatWindowLastArrayCursor) Next() *cursors.FloatArray { cur := -1 NEXT: var a *cursors.FloatArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.FloatArrayCursor.Next() } if a.Len() == 0 { c.res.Timestamps = c.res.Timestamps[:cur+1] c.res.Values = c.res.Values[:cur+1] return c.res } for i, t := range a.Timestamps { if t >= c.windowEnd { cur++ } if cur == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i:] c.tmp.Values = a.Values[i:] return c.res } c.res.Timestamps[cur] = t c.res.Values[cur] = a.Values[i] c.windowEnd = WindowStop(t, c.every, c.offset) } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } type floatWindowFirstArrayCursor struct { cursors.FloatArrayCursor every, offset, windowEnd int64 res *cursors.FloatArray tmp *cursors.FloatArray } // Window array cursors 
assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. func newFloatWindowFirstArrayCursor(cur cursors.FloatArrayCursor, every, offset int64) *floatWindowFirstArrayCursor { return &floatWindowFirstArrayCursor{ FloatArrayCursor: cur, every: every, offset: offset, windowEnd: math.MinInt64, res: cursors.NewFloatArrayLen(MaxPointsPerBlock), tmp: &cursors.FloatArray{}, } } func (c *floatWindowFirstArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatWindowFirstArrayCursor) Next() *cursors.FloatArray { c.res.Timestamps = c.res.Timestamps[:0] c.res.Values = c.res.Values[:0] NEXT: var a *cursors.FloatArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.FloatArrayCursor.Next() } if a.Len() == 0 { return c.res } for i, t := range a.Timestamps { if t < c.windowEnd { continue } c.windowEnd = WindowStop(t, c.every, c.offset) c.res.Timestamps = append(c.res.Timestamps, t) c.res.Values = append(c.res.Values, a.Values[i]) if c.res.Len() == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] return c.res } } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } type floatWindowCountArrayCursor struct { cursors.FloatArrayCursor every, offset int64 res *cursors.IntegerArray tmp *cursors.FloatArray } func newFloatWindowCountArrayCursor(cur cursors.FloatArrayCursor, every, offset int64) *floatWindowCountArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &floatWindowCountArrayCursor{ FloatArrayCursor: cur, every: every, offset: offset, res: cursors.NewIntegerArrayLen(resLen), tmp: &cursors.FloatArray{}, } } func (c *floatWindowCountArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatWindowCountArrayCursor) Next() *cursors.IntegerArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a 
*cursors.FloatArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.FloatArrayCursor.Next() } if a.Len() == 0 { return &cursors.IntegerArray{} } rowIdx := 0 var acc int64 = 0 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. // they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = 0 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { acc++ windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. 
c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.FloatArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type floatWindowSumArrayCursor struct { cursors.FloatArrayCursor every, offset int64 res *cursors.FloatArray tmp *cursors.FloatArray } func newFloatWindowSumArrayCursor(cur cursors.FloatArrayCursor, every, offset int64) *floatWindowSumArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &floatWindowSumArrayCursor{ FloatArrayCursor: cur, every: every, offset: offset, res: cursors.NewFloatArrayLen(resLen), tmp: &cursors.FloatArray{}, } } func (c *floatWindowSumArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatWindowSumArrayCursor) Next() *cursors.FloatArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.FloatArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.FloatArrayCursor.Next() } if a.Len() == 0 { return &cursors.FloatArray{} } rowIdx := 0 var acc float64 = 0 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. 
// they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = 0 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { acc += a.Values[rowIdx] windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.FloatArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type floatWindowMinArrayCursor struct { cursors.FloatArrayCursor every, offset int64 res *cursors.FloatArray tmp *cursors.FloatArray } func newFloatWindowMinArrayCursor(cur cursors.FloatArrayCursor, every, offset int64) *floatWindowMinArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &floatWindowMinArrayCursor{ FloatArrayCursor: cur, every: every, offset: offset, res: cursors.NewFloatArrayLen(resLen), tmp: &cursors.FloatArray{}, } } func (c *floatWindowMinArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatWindowMinArrayCursor) Next() *cursors.FloatArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.FloatArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.FloatArrayCursor.Next() } if a.Len() == 0 { return &cursors.FloatArray{} } rowIdx := 0 var acc float64 = math.MaxFloat64 var tsAcc int64 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // 
enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. // they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = math.MaxFloat64 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { if !windowHasPoints || a.Values[rowIdx] < acc { acc = a.Values[rowIdx] tsAcc = a.Timestamps[rowIdx] } windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.FloatArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type floatWindowMaxArrayCursor struct { cursors.FloatArrayCursor every, offset int64 res *cursors.FloatArray tmp *cursors.FloatArray } func newFloatWindowMaxArrayCursor(cur cursors.FloatArrayCursor, every, offset int64) *floatWindowMaxArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &floatWindowMaxArrayCursor{ FloatArrayCursor: cur, every: every, offset: offset, res: cursors.NewFloatArrayLen(resLen), tmp: &cursors.FloatArray{}, } } func (c *floatWindowMaxArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatWindowMaxArrayCursor) Next() 
*cursors.FloatArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.FloatArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.FloatArrayCursor.Next() } if a.Len() == 0 { return &cursors.FloatArray{} } rowIdx := 0 var acc float64 = -math.MaxFloat64 var tsAcc int64 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. // they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = -math.MaxFloat64 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { if !windowHasPoints || a.Values[rowIdx] > acc { acc = a.Values[rowIdx] tsAcc = a.Timestamps[rowIdx] } windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. 
c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.FloatArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type floatWindowMeanArrayCursor struct { cursors.FloatArrayCursor every, offset int64 res *cursors.FloatArray tmp *cursors.FloatArray } func newFloatWindowMeanArrayCursor(cur cursors.FloatArrayCursor, every, offset int64) *floatWindowMeanArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &floatWindowMeanArrayCursor{ FloatArrayCursor: cur, every: every, offset: offset, res: cursors.NewFloatArrayLen(resLen), tmp: &cursors.FloatArray{}, } } func (c *floatWindowMeanArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } func (c *floatWindowMeanArrayCursor) Next() *cursors.FloatArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.FloatArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.FloatArrayCursor.Next() } if a.Len() == 0 { return &cursors.FloatArray{} } rowIdx := 0 var sum float64 var count int64 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = sum / float64(count) pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. 
// they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window sum = 0 count = 0 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { sum += a.Values[rowIdx] count++ windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.FloatArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = sum / float64(count) pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type floatEmptyArrayCursor struct { res cursors.FloatArray } var FloatEmptyArrayCursor cursors.FloatArrayCursor = &floatEmptyArrayCursor{} func (c *floatEmptyArrayCursor) Err() error { return nil } func (c *floatEmptyArrayCursor) Close() {} func (c *floatEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *floatEmptyArrayCursor) Next() *cursors.FloatArray { return &c.res } // ******************** // Integer Array Cursor type integerArrayFilterCursor struct { cursors.IntegerArrayCursor cond expression m *singleValue res *cursors.IntegerArray tmp *cursors.IntegerArray } func newIntegerFilterArrayCursor(cond expression) *integerArrayFilterCursor { return &integerArrayFilterCursor{ cond: cond, m: &singleValue{}, res: cursors.NewIntegerArrayLen(MaxPointsPerBlock), tmp: &cursors.IntegerArray{}, } } func (c *integerArrayFilterCursor) reset(cur cursors.IntegerArrayCursor) { c.IntegerArrayCursor = cur c.tmp.Timestamps, c.tmp.Values = nil, nil } func (c *integerArrayFilterCursor) Stats() cursors.CursorStats { return c.IntegerArrayCursor.Stats() } func (c 
*integerArrayFilterCursor) Next() *cursors.IntegerArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.IntegerArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.IntegerArrayCursor.Next() } LOOP: for len(a.Timestamps) > 0 { for i, v := range a.Values { c.m.v = v if c.cond.EvalBool(c.m) { c.res.Timestamps[pos] = a.Timestamps[i] c.res.Values[pos] = v pos++ if pos >= MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] break LOOP } } } // Clear bufferred timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil a = c.IntegerArrayCursor.Next() } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type integerArrayCursor struct { cursors.IntegerArrayCursor cursorContext filter *integerArrayFilterCursor } func (c *integerArrayCursor) reset(cur cursors.IntegerArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newIntegerFilterArrayCursor(cond) } c.filter.reset(cur) cur = c.filter } c.IntegerArrayCursor = cur c.cursorIterator = cursorIterator c.err = nil } func (c *integerArrayCursor) Err() error { return c.err } func (c *integerArrayCursor) Stats() cursors.CursorStats { return c.IntegerArrayCursor.Stats() } func (c *integerArrayCursor) Next() *cursors.IntegerArray { for { a := c.IntegerArrayCursor.Next() if a.Len() == 0 { if c.nextArrayCursor() { continue } } return a } } func (c *integerArrayCursor) nextArrayCursor() bool { if c.cursorIterator == nil { return false } c.IntegerArrayCursor.Close() cur, _ := c.cursorIterator.Next(c.ctx, c.req) c.cursorIterator = nil var ok bool if cur != nil { var next cursors.IntegerArrayCursor next, ok = cur.(cursors.IntegerArrayCursor) if !ok { cur.Close() next = IntegerEmptyArrayCursor c.cursorIterator = nil 
c.err = errors.New("expected integer cursor") } else { if c.filter != nil { c.filter.reset(next) next = c.filter } } c.IntegerArrayCursor = next } else { c.IntegerArrayCursor = IntegerEmptyArrayCursor } return ok } type integerLimitArrayCursor struct { cursors.IntegerArrayCursor res *cursors.IntegerArray done bool } func newIntegerLimitArrayCursor(cur cursors.IntegerArrayCursor) *integerLimitArrayCursor { return &integerLimitArrayCursor{ IntegerArrayCursor: cur, res: cursors.NewIntegerArrayLen(1), } } func (c *integerLimitArrayCursor) Stats() cursors.CursorStats { return c.IntegerArrayCursor.Stats() } func (c *integerLimitArrayCursor) Next() *cursors.IntegerArray { if c.done { return &cursors.IntegerArray{} } a := c.IntegerArrayCursor.Next() if len(a.Timestamps) == 0 { return a } c.done = true c.res.Timestamps[0] = a.Timestamps[0] c.res.Values[0] = a.Values[0] return c.res } type integerWindowLastArrayCursor struct { cursors.IntegerArrayCursor every, offset, windowEnd int64 res *cursors.IntegerArray tmp *cursors.IntegerArray } // Window array cursors assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. 
func newIntegerWindowLastArrayCursor(cur cursors.IntegerArrayCursor, every, offset int64) *integerWindowLastArrayCursor { return &integerWindowLastArrayCursor{ IntegerArrayCursor: cur, every: every, offset: offset, windowEnd: math.MinInt64, res: cursors.NewIntegerArrayLen(MaxPointsPerBlock), tmp: &cursors.IntegerArray{}, } } func (c *integerWindowLastArrayCursor) Stats() cursors.CursorStats { return c.IntegerArrayCursor.Stats() } func (c *integerWindowLastArrayCursor) Next() *cursors.IntegerArray { cur := -1 NEXT: var a *cursors.IntegerArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.IntegerArrayCursor.Next() } if a.Len() == 0 { c.res.Timestamps = c.res.Timestamps[:cur+1] c.res.Values = c.res.Values[:cur+1] return c.res } for i, t := range a.Timestamps { if t >= c.windowEnd { cur++ } if cur == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i:] c.tmp.Values = a.Values[i:] return c.res } c.res.Timestamps[cur] = t c.res.Values[cur] = a.Values[i] c.windowEnd = WindowStop(t, c.every, c.offset) } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } type integerWindowFirstArrayCursor struct { cursors.IntegerArrayCursor every, offset, windowEnd int64 res *cursors.IntegerArray tmp *cursors.IntegerArray } // Window array cursors assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. 
func newIntegerWindowFirstArrayCursor(cur cursors.IntegerArrayCursor, every, offset int64) *integerWindowFirstArrayCursor {
	return &integerWindowFirstArrayCursor{
		IntegerArrayCursor: cur,
		every:              every,
		offset:             offset,
		// Start below every possible timestamp so the first point read is
		// always emitted.
		windowEnd: math.MinInt64,
		res:       cursors.NewIntegerArrayLen(MaxPointsPerBlock),
		tmp:       &cursors.IntegerArray{},
	}
}

// Stats returns the statistics of the wrapped cursor.
func (c *integerWindowFirstArrayCursor) Stats() cursors.CursorStats {
	return c.IntegerArrayCursor.Stats()
}

// Next returns up to MaxPointsPerBlock points, one per non-empty window.
// Points with a timestamp below windowEnd are skipped; the first point at
// or past it is emitted and windowEnd is advanced with WindowStop. The
// returned array is reused between calls.
func (c *integerWindowFirstArrayCursor) Next() *cursors.IntegerArray {
	c.res.Timestamps = c.res.Timestamps[:0]
	c.res.Values = c.res.Values[:0]

NEXT:
	var a *cursors.IntegerArray

	// Resume with the points left over from the previous call, if any.
	if c.tmp.Len() > 0 {
		a = c.tmp
	} else {
		a = c.IntegerArrayCursor.Next()
	}

	if a.Len() == 0 {
		return c.res
	}

	for i, t := range a.Timestamps {
		if t < c.windowEnd {
			continue
		}

		c.windowEnd = WindowStop(t, c.every, c.offset)

		c.res.Timestamps = append(c.res.Timestamps, t)
		c.res.Values = append(c.res.Values, a.Values[i])

		if c.res.Len() == MaxPointsPerBlock {
			// Output is full: keep the unread input for the next call.
			c.tmp.Timestamps = a.Timestamps[i+1:]
			c.tmp.Values = a.Values[i+1:]
			return c.res
		}
	}

	c.tmp.Timestamps = nil
	c.tmp.Values = nil

	goto NEXT
}

// integerWindowCountArrayCursor yields, for every window that contains
// points, the number of points in it. With every == 0 all points are
// counted as a single window.
type integerWindowCountArrayCursor struct {
	cursors.IntegerArrayCursor
	every, offset int64
	res           *cursors.IntegerArray
	tmp           *cursors.IntegerArray
}

// newIntegerWindowCountArrayCursor wraps cur. The output buffer holds
// MaxPointsPerBlock points, or a single point when every == 0.
func newIntegerWindowCountArrayCursor(cur cursors.IntegerArrayCursor, every, offset int64) *integerWindowCountArrayCursor {
	resLen := MaxPointsPerBlock
	if every == 0 {
		resLen = 1
	}
	return &integerWindowCountArrayCursor{
		IntegerArrayCursor: cur,
		every:              every,
		offset:             offset,
		res:                cursors.NewIntegerArrayLen(resLen),
		tmp:                &cursors.IntegerArray{},
	}
}

// Stats returns the statistics of the wrapped cursor.
func (c *integerWindowCountArrayCursor) Stats() cursors.CursorStats {
	return c.IntegerArrayCursor.Stats()
}

// Next returns up to MaxPointsPerBlock counts. Each output point carries
// the window's end as its timestamp and the number of input points in the
// window as its value. The returned array is reused between calls, except
// for the fresh empty array returned when there is no more input.
func (c *integerWindowCountArrayCursor) Next() *cursors.IntegerArray {
	pos := 0
	// Restore the output buffer to its full capacity before refilling it.
	c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
	c.res.Values = c.res.Values[:cap(c.res.Values)]

	var a *cursors.IntegerArray
	// Resume with the points left over from the previous call, if any.
	if c.tmp.Len() > 0 {
		a = c.tmp
	} else {
		a = c.IntegerArrayCursor.Next()
	}

	if a.Len() == 0 {
		return &cursors.IntegerArray{}
	}

	rowIdx := 0
	var acc int64 = 0

	var windowEnd int64
	if c.every != 0 {
		windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset)
	} else {
		// every == 0: the window-boundary test below is disabled, so every
		// point is accumulated into one window.
		windowEnd = math.MaxInt64
	}

	windowHasPoints := false

	// enumerate windows
WINDOWS:
	for {
		for ; rowIdx < a.Len(); rowIdx++ {
			ts := a.Timestamps[rowIdx]
			if c.every != 0 && ts >= windowEnd {
				// new window detected, close the current window
				// do not generate a point for empty windows
				if windowHasPoints {
					c.res.Timestamps[pos] = windowEnd
					c.res.Values[pos] = acc
					pos++
					if pos >= MaxPointsPerBlock {
						// the output array is full,
						// save the remaining points in the input array in tmp.
						// they will be processed in the next call to Next()
						c.tmp.Timestamps = a.Timestamps[rowIdx:]
						c.tmp.Values = a.Values[rowIdx:]
						break WINDOWS
					}
				}

				// start the new window
				acc = 0
				windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset)
				windowHasPoints = false

				// rowIdx is not advanced: the same row is re-read as the
				// first point of the new window.
				continue WINDOWS
			} else {
				acc++
				windowHasPoints = true
			}
		}

		// Clear buffered timestamps & values if we make it through a cursor.
		// The break above will skip this if a cursor is partially read.
		c.tmp.Timestamps = nil
		c.tmp.Values = nil

		// get the next chunk
		a = c.IntegerArrayCursor.Next()
		if a.Len() == 0 {
			// write the final point
			// do not generate a point for empty windows
			if windowHasPoints {
				c.res.Timestamps[pos] = windowEnd
				c.res.Values[pos] = acc
				pos++
			}
			break WINDOWS
		}
		rowIdx = 0
	}

	c.res.Timestamps = c.res.Timestamps[:pos]
	c.res.Values = c.res.Values[:pos]

	return c.res
}

// integerWindowSumArrayCursor yields, for every window that contains
// points, the sum of the values in it. With every == 0 all points are
// summed as a single window.
type integerWindowSumArrayCursor struct {
	cursors.IntegerArrayCursor
	every, offset int64
	res           *cursors.IntegerArray
	tmp           *cursors.IntegerArray
}

// newIntegerWindowSumArrayCursor wraps cur. The output buffer holds
// MaxPointsPerBlock points, or a single point when every == 0.
func newIntegerWindowSumArrayCursor(cur cursors.IntegerArrayCursor, every, offset int64) *integerWindowSumArrayCursor {
	resLen := MaxPointsPerBlock
	if every == 0 {
		resLen = 1
	}
	return &integerWindowSumArrayCursor{
		IntegerArrayCursor: cur,
		every:              every,
		offset:             offset,
		res:                cursors.NewIntegerArrayLen(resLen),
		tmp:                &cursors.IntegerArray{},
	}
}

// Stats returns the statistics of the wrapped cursor.
func (c *integerWindowSumArrayCursor) Stats() cursors.CursorStats {
	return c.IntegerArrayCursor.Stats()
}

// Next returns up to MaxPointsPerBlock sums. Each output point carries the
// window's end as its timestamp and the sum of the window's values as its
// value. The returned array is reused between calls, except for the fresh
// empty array returned when there is no more input.
func (c *integerWindowSumArrayCursor) Next() *cursors.IntegerArray {
	pos := 0
	// Restore the output buffer to its full capacity before refilling it.
	c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
	c.res.Values = c.res.Values[:cap(c.res.Values)]

	var a *cursors.IntegerArray
	// Resume with the points left over from the previous call, if any.
	if c.tmp.Len() > 0 {
		a = c.tmp
	} else {
		a = c.IntegerArrayCursor.Next()
	}

	if a.Len() == 0 {
		return &cursors.IntegerArray{}
	}

	rowIdx := 0
	var acc int64 = 0

	var windowEnd int64
	if c.every != 0 {
		windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset)
	} else {
		// every == 0: the window-boundary test below is disabled, so every
		// point is accumulated into one window.
		windowEnd = math.MaxInt64
	}

	windowHasPoints := false

	// enumerate windows
WINDOWS:
	for {
		for ; rowIdx < a.Len(); rowIdx++ {
			ts := a.Timestamps[rowIdx]
			if c.every != 0 && ts >= windowEnd {
				// new window detected, close the current window
				// do not generate a point for empty windows
				if windowHasPoints {
					c.res.Timestamps[pos] = windowEnd
					c.res.Values[pos] = acc
					pos++
					if pos >= MaxPointsPerBlock {
						// the output array is full,
						// save the remaining points in the input array in tmp.
						// they will be processed in the next call to Next()
						c.tmp.Timestamps = a.Timestamps[rowIdx:]
						c.tmp.Values = a.Values[rowIdx:]
						break WINDOWS
					}
				}

				// start the new window
				acc = 0
				windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset)
				windowHasPoints = false

				// rowIdx is not advanced: the same row is re-read as the
				// first point of the new window.
				continue WINDOWS
			} else {
				acc += a.Values[rowIdx]
				windowHasPoints = true
			}
		}

		// Clear buffered timestamps & values if we make it through a cursor.
		// The break above will skip this if a cursor is partially read.
		c.tmp.Timestamps = nil
		c.tmp.Values = nil

		// get the next chunk
		a = c.IntegerArrayCursor.Next()
		if a.Len() == 0 {
			// write the final point
			// do not generate a point for empty windows
			if windowHasPoints {
				c.res.Timestamps[pos] = windowEnd
				c.res.Values[pos] = acc
				pos++
			}
			break WINDOWS
		}
		rowIdx = 0
	}

	c.res.Timestamps = c.res.Timestamps[:pos]
	c.res.Values = c.res.Values[:pos]

	return c.res
}

// integerWindowMinArrayCursor yields, for every window that contains
// points, the point with the smallest value. With every == 0 all points
// are treated as a single window.
type integerWindowMinArrayCursor struct {
	cursors.IntegerArrayCursor
	every, offset int64
	res           *cursors.IntegerArray
	tmp           *cursors.IntegerArray
}

// newIntegerWindowMinArrayCursor wraps cur. The output buffer holds
// MaxPointsPerBlock points, or a single point when every == 0.
func newIntegerWindowMinArrayCursor(cur cursors.IntegerArrayCursor, every, offset int64) *integerWindowMinArrayCursor {
	resLen := MaxPointsPerBlock
	if every == 0 {
		resLen = 1
	}
	return &integerWindowMinArrayCursor{
		IntegerArrayCursor: cur,
		every:              every,
		offset:             offset,
		res:                cursors.NewIntegerArrayLen(resLen),
		tmp:                &cursors.IntegerArray{},
	}
}

// Stats returns the statistics of the wrapped cursor.
func (c *integerWindowMinArrayCursor) Stats() cursors.CursorStats {
	return c.IntegerArrayCursor.Stats()
}

// Next tracks, per window, the smallest value seen (acc) together with the
// timestamp of the point that supplied it (tsAcc).
//
// NOTE(review): the remainder of this method lies beyond the visible part
// of the file; presumably it finishes like the sibling window cursors —
// confirm before relying on this description.
func (c *integerWindowMinArrayCursor) Next() *cursors.IntegerArray {
	pos := 0
	// Restore the output buffer to its full capacity before refilling it.
	c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
	c.res.Values = c.res.Values[:cap(c.res.Values)]

	var a *cursors.IntegerArray
	// Resume with the points left over from the previous call, if any.
	if c.tmp.Len() > 0 {
		a = c.tmp
	} else {
		a = c.IntegerArrayCursor.Next()
	}

	if a.Len() == 0 {
		return &cursors.IntegerArray{}
	}

	rowIdx := 0
	var acc int64 = math.MaxInt64
	var tsAcc int64

	var windowEnd int64
	if c.every != 0 {
		windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset)
	} else {
		// every == 0: the window-boundary test below is disabled, so every
		// point falls into one window.
		windowEnd = math.MaxInt64
	}

	windowHasPoints := false

	// enumerate windows
WINDOWS:
	for {
		for ; rowIdx < a.Len(); rowIdx++ {
			ts := a.Timestamps[rowIdx]
			if c.every != 0 && ts >= windowEnd {
				// new window detected, close the current window
				// do not generate a point for empty windows
				if windowHasPoints {
					c.res.Timestamps[pos] = tsAcc
					c.res.Values[pos] = acc
					pos++
					if pos >= MaxPointsPerBlock {
						// the output array is full,
						// save the remaining points in the input array in tmp.
						// they will be processed in the next call to Next()
						c.tmp.Timestamps = a.Timestamps[rowIdx:]
						c.tmp.Values = a.Values[rowIdx:]
						break WINDOWS
					}
				}

				// start the new window
				acc = math.MaxInt64
				windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset)
				windowHasPoints = false

				// rowIdx is not advanced: the same row is re-read as the
				// first point of the new window.
				continue WINDOWS
			} else {
				// The strict comparison keeps the earlier point on ties.
				if !windowHasPoints || a.Values[rowIdx] < acc {
					acc = a.Values[rowIdx]
					tsAcc = a.Timestamps[rowIdx]
				}
				windowHasPoints = true
			}
		}

		// Clear buffered timestamps & values if we make it through a cursor.
		// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.IntegerArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type integerWindowMaxArrayCursor struct { cursors.IntegerArrayCursor every, offset int64 res *cursors.IntegerArray tmp *cursors.IntegerArray } func newIntegerWindowMaxArrayCursor(cur cursors.IntegerArrayCursor, every, offset int64) *integerWindowMaxArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &integerWindowMaxArrayCursor{ IntegerArrayCursor: cur, every: every, offset: offset, res: cursors.NewIntegerArrayLen(resLen), tmp: &cursors.IntegerArray{}, } } func (c *integerWindowMaxArrayCursor) Stats() cursors.CursorStats { return c.IntegerArrayCursor.Stats() } func (c *integerWindowMaxArrayCursor) Next() *cursors.IntegerArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.IntegerArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.IntegerArrayCursor.Next() } if a.Len() == 0 { return &cursors.IntegerArray{} } rowIdx := 0 var acc int64 = math.MinInt64 var tsAcc int64 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. 
// they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = math.MinInt64 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { if !windowHasPoints || a.Values[rowIdx] > acc { acc = a.Values[rowIdx] tsAcc = a.Timestamps[rowIdx] } windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.IntegerArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type integerWindowMeanArrayCursor struct { cursors.IntegerArrayCursor every, offset int64 res *cursors.FloatArray tmp *cursors.IntegerArray } func newIntegerWindowMeanArrayCursor(cur cursors.IntegerArrayCursor, every, offset int64) *integerWindowMeanArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &integerWindowMeanArrayCursor{ IntegerArrayCursor: cur, every: every, offset: offset, res: cursors.NewFloatArrayLen(resLen), tmp: &cursors.IntegerArray{}, } } func (c *integerWindowMeanArrayCursor) Stats() cursors.CursorStats { return c.IntegerArrayCursor.Stats() } func (c *integerWindowMeanArrayCursor) Next() *cursors.FloatArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.IntegerArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.IntegerArrayCursor.Next() } if a.Len() == 0 { return &cursors.FloatArray{} } rowIdx := 0 var sum int64 var count int64 var windowEnd int64 if c.every != 0 { windowEnd = 
WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = float64(sum) / float64(count) pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. // they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window sum = 0 count = 0 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { sum += a.Values[rowIdx] count++ windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. 
c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.IntegerArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = float64(sum) / float64(count) pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type integerEmptyArrayCursor struct { res cursors.IntegerArray } var IntegerEmptyArrayCursor cursors.IntegerArrayCursor = &integerEmptyArrayCursor{} func (c *integerEmptyArrayCursor) Err() error { return nil } func (c *integerEmptyArrayCursor) Close() {} func (c *integerEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *integerEmptyArrayCursor) Next() *cursors.IntegerArray { return &c.res } // ******************** // Unsigned Array Cursor type unsignedArrayFilterCursor struct { cursors.UnsignedArrayCursor cond expression m *singleValue res *cursors.UnsignedArray tmp *cursors.UnsignedArray } func newUnsignedFilterArrayCursor(cond expression) *unsignedArrayFilterCursor { return &unsignedArrayFilterCursor{ cond: cond, m: &singleValue{}, res: cursors.NewUnsignedArrayLen(MaxPointsPerBlock), tmp: &cursors.UnsignedArray{}, } } func (c *unsignedArrayFilterCursor) reset(cur cursors.UnsignedArrayCursor) { c.UnsignedArrayCursor = cur c.tmp.Timestamps, c.tmp.Values = nil, nil } func (c *unsignedArrayFilterCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedArrayFilterCursor) Next() *cursors.UnsignedArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.UnsignedArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.UnsignedArrayCursor.Next() } LOOP: for len(a.Timestamps) > 0 { for i, v := range a.Values { c.m.v = v if c.cond.EvalBool(c.m) { c.res.Timestamps[pos] = a.Timestamps[i] c.res.Values[pos] = v pos++ if 
pos >= MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] break LOOP } } } // Clear bufferred timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil a = c.UnsignedArrayCursor.Next() } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type unsignedArrayCursor struct { cursors.UnsignedArrayCursor cursorContext filter *unsignedArrayFilterCursor } func (c *unsignedArrayCursor) reset(cur cursors.UnsignedArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newUnsignedFilterArrayCursor(cond) } c.filter.reset(cur) cur = c.filter } c.UnsignedArrayCursor = cur c.cursorIterator = cursorIterator c.err = nil } func (c *unsignedArrayCursor) Err() error { return c.err } func (c *unsignedArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedArrayCursor) Next() *cursors.UnsignedArray { for { a := c.UnsignedArrayCursor.Next() if a.Len() == 0 { if c.nextArrayCursor() { continue } } return a } } func (c *unsignedArrayCursor) nextArrayCursor() bool { if c.cursorIterator == nil { return false } c.UnsignedArrayCursor.Close() cur, _ := c.cursorIterator.Next(c.ctx, c.req) c.cursorIterator = nil var ok bool if cur != nil { var next cursors.UnsignedArrayCursor next, ok = cur.(cursors.UnsignedArrayCursor) if !ok { cur.Close() next = UnsignedEmptyArrayCursor c.cursorIterator = nil c.err = errors.New("expected unsigned cursor") } else { if c.filter != nil { c.filter.reset(next) next = c.filter } } c.UnsignedArrayCursor = next } else { c.UnsignedArrayCursor = UnsignedEmptyArrayCursor } return ok } type unsignedLimitArrayCursor struct { cursors.UnsignedArrayCursor res *cursors.UnsignedArray done bool } func newUnsignedLimitArrayCursor(cur cursors.UnsignedArrayCursor) *unsignedLimitArrayCursor { return 
&unsignedLimitArrayCursor{ UnsignedArrayCursor: cur, res: cursors.NewUnsignedArrayLen(1), } } func (c *unsignedLimitArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedLimitArrayCursor) Next() *cursors.UnsignedArray { if c.done { return &cursors.UnsignedArray{} } a := c.UnsignedArrayCursor.Next() if len(a.Timestamps) == 0 { return a } c.done = true c.res.Timestamps[0] = a.Timestamps[0] c.res.Values[0] = a.Values[0] return c.res } type unsignedWindowLastArrayCursor struct { cursors.UnsignedArrayCursor every, offset, windowEnd int64 res *cursors.UnsignedArray tmp *cursors.UnsignedArray } // Window array cursors assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. func newUnsignedWindowLastArrayCursor(cur cursors.UnsignedArrayCursor, every, offset int64) *unsignedWindowLastArrayCursor { return &unsignedWindowLastArrayCursor{ UnsignedArrayCursor: cur, every: every, offset: offset, windowEnd: math.MinInt64, res: cursors.NewUnsignedArrayLen(MaxPointsPerBlock), tmp: &cursors.UnsignedArray{}, } } func (c *unsignedWindowLastArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedWindowLastArrayCursor) Next() *cursors.UnsignedArray { cur := -1 NEXT: var a *cursors.UnsignedArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.UnsignedArrayCursor.Next() } if a.Len() == 0 { c.res.Timestamps = c.res.Timestamps[:cur+1] c.res.Values = c.res.Values[:cur+1] return c.res } for i, t := range a.Timestamps { if t >= c.windowEnd { cur++ } if cur == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i:] c.tmp.Values = a.Values[i:] return c.res } c.res.Timestamps[cur] = t c.res.Values[cur] = a.Values[i] c.windowEnd = WindowStop(t, c.every, c.offset) } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } type unsignedWindowFirstArrayCursor struct { cursors.UnsignedArrayCursor every, offset, windowEnd int64 res 
*cursors.UnsignedArray tmp *cursors.UnsignedArray } // Window array cursors assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. func newUnsignedWindowFirstArrayCursor(cur cursors.UnsignedArrayCursor, every, offset int64) *unsignedWindowFirstArrayCursor { return &unsignedWindowFirstArrayCursor{ UnsignedArrayCursor: cur, every: every, offset: offset, windowEnd: math.MinInt64, res: cursors.NewUnsignedArrayLen(MaxPointsPerBlock), tmp: &cursors.UnsignedArray{}, } } func (c *unsignedWindowFirstArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedWindowFirstArrayCursor) Next() *cursors.UnsignedArray { c.res.Timestamps = c.res.Timestamps[:0] c.res.Values = c.res.Values[:0] NEXT: var a *cursors.UnsignedArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.UnsignedArrayCursor.Next() } if a.Len() == 0 { return c.res } for i, t := range a.Timestamps { if t < c.windowEnd { continue } c.windowEnd = WindowStop(t, c.every, c.offset) c.res.Timestamps = append(c.res.Timestamps, t) c.res.Values = append(c.res.Values, a.Values[i]) if c.res.Len() == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] return c.res } } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } type unsignedWindowCountArrayCursor struct { cursors.UnsignedArrayCursor every, offset int64 res *cursors.IntegerArray tmp *cursors.UnsignedArray } func newUnsignedWindowCountArrayCursor(cur cursors.UnsignedArrayCursor, every, offset int64) *unsignedWindowCountArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &unsignedWindowCountArrayCursor{ UnsignedArrayCursor: cur, every: every, offset: offset, res: cursors.NewIntegerArrayLen(resLen), tmp: &cursors.UnsignedArray{}, } } func (c *unsignedWindowCountArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedWindowCountArrayCursor) Next() 
*cursors.IntegerArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.UnsignedArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.UnsignedArrayCursor.Next() } if a.Len() == 0 { return &cursors.IntegerArray{} } rowIdx := 0 var acc int64 = 0 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. // they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = 0 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { acc++ windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. 
c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.UnsignedArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type unsignedWindowSumArrayCursor struct { cursors.UnsignedArrayCursor every, offset int64 res *cursors.UnsignedArray tmp *cursors.UnsignedArray } func newUnsignedWindowSumArrayCursor(cur cursors.UnsignedArrayCursor, every, offset int64) *unsignedWindowSumArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &unsignedWindowSumArrayCursor{ UnsignedArrayCursor: cur, every: every, offset: offset, res: cursors.NewUnsignedArrayLen(resLen), tmp: &cursors.UnsignedArray{}, } } func (c *unsignedWindowSumArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedWindowSumArrayCursor) Next() *cursors.UnsignedArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.UnsignedArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.UnsignedArrayCursor.Next() } if a.Len() == 0 { return &cursors.UnsignedArray{} } rowIdx := 0 var acc uint64 = 0 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. 
// they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = 0 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { acc += a.Values[rowIdx] windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.UnsignedArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type unsignedWindowMinArrayCursor struct { cursors.UnsignedArrayCursor every, offset int64 res *cursors.UnsignedArray tmp *cursors.UnsignedArray } func newUnsignedWindowMinArrayCursor(cur cursors.UnsignedArrayCursor, every, offset int64) *unsignedWindowMinArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &unsignedWindowMinArrayCursor{ UnsignedArrayCursor: cur, every: every, offset: offset, res: cursors.NewUnsignedArrayLen(resLen), tmp: &cursors.UnsignedArray{}, } } func (c *unsignedWindowMinArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedWindowMinArrayCursor) Next() *cursors.UnsignedArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.UnsignedArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.UnsignedArrayCursor.Next() } if a.Len() == 0 { return &cursors.UnsignedArray{} } rowIdx := 0 var acc uint64 = math.MaxUint64 var tsAcc int64 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd 
= math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. // they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = math.MaxUint64 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { if !windowHasPoints || a.Values[rowIdx] < acc { acc = a.Values[rowIdx] tsAcc = a.Timestamps[rowIdx] } windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. 
c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.UnsignedArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type unsignedWindowMaxArrayCursor struct { cursors.UnsignedArrayCursor every, offset int64 res *cursors.UnsignedArray tmp *cursors.UnsignedArray } func newUnsignedWindowMaxArrayCursor(cur cursors.UnsignedArrayCursor, every, offset int64) *unsignedWindowMaxArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &unsignedWindowMaxArrayCursor{ UnsignedArrayCursor: cur, every: every, offset: offset, res: cursors.NewUnsignedArrayLen(resLen), tmp: &cursors.UnsignedArray{}, } } func (c *unsignedWindowMaxArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedWindowMaxArrayCursor) Next() *cursors.UnsignedArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.UnsignedArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.UnsignedArrayCursor.Next() } if a.Len() == 0 { return &cursors.UnsignedArray{} } rowIdx := 0 var acc uint64 = 0 var tsAcc int64 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. 
// they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = 0 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { if !windowHasPoints || a.Values[rowIdx] > acc { acc = a.Values[rowIdx] tsAcc = a.Timestamps[rowIdx] } windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.UnsignedArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = tsAcc c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type unsignedWindowMeanArrayCursor struct { cursors.UnsignedArrayCursor every, offset int64 res *cursors.FloatArray tmp *cursors.UnsignedArray } func newUnsignedWindowMeanArrayCursor(cur cursors.UnsignedArrayCursor, every, offset int64) *unsignedWindowMeanArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &unsignedWindowMeanArrayCursor{ UnsignedArrayCursor: cur, every: every, offset: offset, res: cursors.NewFloatArrayLen(resLen), tmp: &cursors.UnsignedArray{}, } } func (c *unsignedWindowMeanArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } func (c *unsignedWindowMeanArrayCursor) Next() *cursors.FloatArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.UnsignedArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.UnsignedArrayCursor.Next() } if a.Len() == 0 { return &cursors.FloatArray{} } rowIdx := 0 var sum uint64 var count int64 var windowEnd int64 if c.every != 0 { windowEnd = 
WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = float64(sum) / float64(count) pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. // they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window sum = 0 count = 0 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { sum += a.Values[rowIdx] count++ windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. 
c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.UnsignedArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = float64(sum) / float64(count) pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type unsignedEmptyArrayCursor struct { res cursors.UnsignedArray } var UnsignedEmptyArrayCursor cursors.UnsignedArrayCursor = &unsignedEmptyArrayCursor{} func (c *unsignedEmptyArrayCursor) Err() error { return nil } func (c *unsignedEmptyArrayCursor) Close() {} func (c *unsignedEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *unsignedEmptyArrayCursor) Next() *cursors.UnsignedArray { return &c.res } // ******************** // String Array Cursor type stringArrayFilterCursor struct { cursors.StringArrayCursor cond expression m *singleValue res *cursors.StringArray tmp *cursors.StringArray } func newStringFilterArrayCursor(cond expression) *stringArrayFilterCursor { return &stringArrayFilterCursor{ cond: cond, m: &singleValue{}, res: cursors.NewStringArrayLen(MaxPointsPerBlock), tmp: &cursors.StringArray{}, } } func (c *stringArrayFilterCursor) reset(cur cursors.StringArrayCursor) { c.StringArrayCursor = cur c.tmp.Timestamps, c.tmp.Values = nil, nil } func (c *stringArrayFilterCursor) Stats() cursors.CursorStats { return c.StringArrayCursor.Stats() } func (c *stringArrayFilterCursor) Next() *cursors.StringArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.StringArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.StringArrayCursor.Next() } LOOP: for len(a.Timestamps) > 0 { for i, v := range a.Values { c.m.v = v if c.cond.EvalBool(c.m) { c.res.Timestamps[pos] = a.Timestamps[i] c.res.Values[pos] = v pos++ if pos >= MaxPointsPerBlock { 
c.tmp.Timestamps = a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] break LOOP } } } // Clear bufferred timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil a = c.StringArrayCursor.Next() } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type stringArrayCursor struct { cursors.StringArrayCursor cursorContext filter *stringArrayFilterCursor } func (c *stringArrayCursor) reset(cur cursors.StringArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newStringFilterArrayCursor(cond) } c.filter.reset(cur) cur = c.filter } c.StringArrayCursor = cur c.cursorIterator = cursorIterator c.err = nil } func (c *stringArrayCursor) Err() error { return c.err } func (c *stringArrayCursor) Stats() cursors.CursorStats { return c.StringArrayCursor.Stats() } func (c *stringArrayCursor) Next() *cursors.StringArray { for { a := c.StringArrayCursor.Next() if a.Len() == 0 { if c.nextArrayCursor() { continue } } return a } } func (c *stringArrayCursor) nextArrayCursor() bool { if c.cursorIterator == nil { return false } c.StringArrayCursor.Close() cur, _ := c.cursorIterator.Next(c.ctx, c.req) c.cursorIterator = nil var ok bool if cur != nil { var next cursors.StringArrayCursor next, ok = cur.(cursors.StringArrayCursor) if !ok { cur.Close() next = StringEmptyArrayCursor c.cursorIterator = nil c.err = errors.New("expected string cursor") } else { if c.filter != nil { c.filter.reset(next) next = c.filter } } c.StringArrayCursor = next } else { c.StringArrayCursor = StringEmptyArrayCursor } return ok } type stringLimitArrayCursor struct { cursors.StringArrayCursor res *cursors.StringArray done bool } func newStringLimitArrayCursor(cur cursors.StringArrayCursor) *stringLimitArrayCursor { return &stringLimitArrayCursor{ StringArrayCursor: cur, res: cursors.NewStringArrayLen(1), } } 
func (c *stringLimitArrayCursor) Stats() cursors.CursorStats { return c.StringArrayCursor.Stats() } func (c *stringLimitArrayCursor) Next() *cursors.StringArray { if c.done { return &cursors.StringArray{} } a := c.StringArrayCursor.Next() if len(a.Timestamps) == 0 { return a } c.done = true c.res.Timestamps[0] = a.Timestamps[0] c.res.Values[0] = a.Values[0] return c.res } type stringWindowLastArrayCursor struct { cursors.StringArrayCursor every, offset, windowEnd int64 res *cursors.StringArray tmp *cursors.StringArray } // Window array cursors assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. func newStringWindowLastArrayCursor(cur cursors.StringArrayCursor, every, offset int64) *stringWindowLastArrayCursor { return &stringWindowLastArrayCursor{ StringArrayCursor: cur, every: every, offset: offset, windowEnd: math.MinInt64, res: cursors.NewStringArrayLen(MaxPointsPerBlock), tmp: &cursors.StringArray{}, } } func (c *stringWindowLastArrayCursor) Stats() cursors.CursorStats { return c.StringArrayCursor.Stats() } func (c *stringWindowLastArrayCursor) Next() *cursors.StringArray { cur := -1 NEXT: var a *cursors.StringArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.StringArrayCursor.Next() } if a.Len() == 0 { c.res.Timestamps = c.res.Timestamps[:cur+1] c.res.Values = c.res.Values[:cur+1] return c.res } for i, t := range a.Timestamps { if t >= c.windowEnd { cur++ } if cur == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i:] c.tmp.Values = a.Values[i:] return c.res } c.res.Timestamps[cur] = t c.res.Values[cur] = a.Values[i] c.windowEnd = WindowStop(t, c.every, c.offset) } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } type stringWindowFirstArrayCursor struct { cursors.StringArrayCursor every, offset, windowEnd int64 res *cursors.StringArray tmp *cursors.StringArray } // Window array cursors assume that every != 0 && every != MaxInt64. 
// Such a cursor will panic in the first case and possibly overflow in the second. func newStringWindowFirstArrayCursor(cur cursors.StringArrayCursor, every, offset int64) *stringWindowFirstArrayCursor { return &stringWindowFirstArrayCursor{ StringArrayCursor: cur, every: every, offset: offset, windowEnd: math.MinInt64, res: cursors.NewStringArrayLen(MaxPointsPerBlock), tmp: &cursors.StringArray{}, } } func (c *stringWindowFirstArrayCursor) Stats() cursors.CursorStats { return c.StringArrayCursor.Stats() } func (c *stringWindowFirstArrayCursor) Next() *cursors.StringArray { c.res.Timestamps = c.res.Timestamps[:0] c.res.Values = c.res.Values[:0] NEXT: var a *cursors.StringArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.StringArrayCursor.Next() } if a.Len() == 0 { return c.res } for i, t := range a.Timestamps { if t < c.windowEnd { continue } c.windowEnd = WindowStop(t, c.every, c.offset) c.res.Timestamps = append(c.res.Timestamps, t) c.res.Values = append(c.res.Values, a.Values[i]) if c.res.Len() == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] return c.res } } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } type stringWindowCountArrayCursor struct { cursors.StringArrayCursor every, offset int64 res *cursors.IntegerArray tmp *cursors.StringArray } func newStringWindowCountArrayCursor(cur cursors.StringArrayCursor, every, offset int64) *stringWindowCountArrayCursor { resLen := MaxPointsPerBlock if every == 0 { resLen = 1 } return &stringWindowCountArrayCursor{ StringArrayCursor: cur, every: every, offset: offset, res: cursors.NewIntegerArrayLen(resLen), tmp: &cursors.StringArray{}, } } func (c *stringWindowCountArrayCursor) Stats() cursors.CursorStats { return c.StringArrayCursor.Stats() } func (c *stringWindowCountArrayCursor) Next() *cursors.IntegerArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.StringArray if c.tmp.Len() > 
0 { a = c.tmp } else { a = c.StringArrayCursor.Next() } if a.Len() == 0 { return &cursors.IntegerArray{} } rowIdx := 0 var acc int64 = 0 var windowEnd int64 if c.every != 0 { windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) } else { windowEnd = math.MaxInt64 } windowHasPoints := false // enumerate windows WINDOWS: for { for ; rowIdx < a.Len(); rowIdx++ { ts := a.Timestamps[rowIdx] if c.every != 0 && ts >= windowEnd { // new window detected, close the current window // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ if pos >= MaxPointsPerBlock { // the output array is full, // save the remaining points in the input array in tmp. // they will be processed in the next call to Next() c.tmp.Timestamps = a.Timestamps[rowIdx:] c.tmp.Values = a.Values[rowIdx:] break WINDOWS } } // start the new window acc = 0 windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset) windowHasPoints = false continue WINDOWS } else { acc++ windowHasPoints = true } } // Clear buffered timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. 
c.tmp.Timestamps = nil c.tmp.Values = nil // get the next chunk a = c.StringArrayCursor.Next() if a.Len() == 0 { // write the final point // do not generate a point for empty windows if windowHasPoints { c.res.Timestamps[pos] = windowEnd c.res.Values[pos] = acc pos++ } break WINDOWS } rowIdx = 0 } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type stringEmptyArrayCursor struct { res cursors.StringArray } var StringEmptyArrayCursor cursors.StringArrayCursor = &stringEmptyArrayCursor{} func (c *stringEmptyArrayCursor) Err() error { return nil } func (c *stringEmptyArrayCursor) Close() {} func (c *stringEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *stringEmptyArrayCursor) Next() *cursors.StringArray { return &c.res } // ******************** // Boolean Array Cursor type booleanArrayFilterCursor struct { cursors.BooleanArrayCursor cond expression m *singleValue res *cursors.BooleanArray tmp *cursors.BooleanArray } func newBooleanFilterArrayCursor(cond expression) *booleanArrayFilterCursor { return &booleanArrayFilterCursor{ cond: cond, m: &singleValue{}, res: cursors.NewBooleanArrayLen(MaxPointsPerBlock), tmp: &cursors.BooleanArray{}, } } func (c *booleanArrayFilterCursor) reset(cur cursors.BooleanArrayCursor) { c.BooleanArrayCursor = cur c.tmp.Timestamps, c.tmp.Values = nil, nil } func (c *booleanArrayFilterCursor) Stats() cursors.CursorStats { return c.BooleanArrayCursor.Stats() } func (c *booleanArrayFilterCursor) Next() *cursors.BooleanArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Values = c.res.Values[:cap(c.res.Values)] var a *cursors.BooleanArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.BooleanArrayCursor.Next() } LOOP: for len(a.Timestamps) > 0 { for i, v := range a.Values { c.m.v = v if c.cond.EvalBool(c.m) { c.res.Timestamps[pos] = a.Timestamps[i] c.res.Values[pos] = v pos++ if pos >= MaxPointsPerBlock { c.tmp.Timestamps = 
a.Timestamps[i+1:] c.tmp.Values = a.Values[i+1:] break LOOP } } } // Clear bufferred timestamps & values if we make it through a cursor. // The break above will skip this if a cursor is partially read. c.tmp.Timestamps = nil c.tmp.Values = nil a = c.BooleanArrayCursor.Next() } c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] return c.res } type booleanArrayCursor struct { cursors.BooleanArrayCursor cursorContext filter *booleanArrayFilterCursor } func (c *booleanArrayCursor) reset(cur cursors.BooleanArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newBooleanFilterArrayCursor(cond) } c.filter.reset(cur) cur = c.filter } c.BooleanArrayCursor = cur c.cursorIterator = cursorIterator c.err = nil } func (c *booleanArrayCursor) Err() error { return c.err } func (c *booleanArrayCursor) Stats() cursors.CursorStats { return c.BooleanArrayCursor.Stats() } func (c *booleanArrayCursor) Next() *cursors.BooleanArray { for { a := c.BooleanArrayCursor.Next() if a.Len() == 0 { if c.nextArrayCursor() { continue } } return a } } func (c *booleanArrayCursor) nextArrayCursor() bool { if c.cursorIterator == nil { return false } c.BooleanArrayCursor.Close() cur, _ := c.cursorIterator.Next(c.ctx, c.req) c.cursorIterator = nil var ok bool if cur != nil { var next cursors.BooleanArrayCursor next, ok = cur.(cursors.BooleanArrayCursor) if !ok { cur.Close() next = BooleanEmptyArrayCursor c.cursorIterator = nil c.err = errors.New("expected boolean cursor") } else { if c.filter != nil { c.filter.reset(next) next = c.filter } } c.BooleanArrayCursor = next } else { c.BooleanArrayCursor = BooleanEmptyArrayCursor } return ok } type booleanLimitArrayCursor struct { cursors.BooleanArrayCursor res *cursors.BooleanArray done bool } func newBooleanLimitArrayCursor(cur cursors.BooleanArrayCursor) *booleanLimitArrayCursor { return &booleanLimitArrayCursor{ BooleanArrayCursor: cur, res: 
cursors.NewBooleanArrayLen(1), } } func (c *booleanLimitArrayCursor) Stats() cursors.CursorStats { return c.BooleanArrayCursor.Stats() } func (c *booleanLimitArrayCursor) Next() *cursors.BooleanArray { if c.done { return &cursors.BooleanArray{} } a := c.BooleanArrayCursor.Next() if len(a.Timestamps) == 0 { return a } c.done = true c.res.Timestamps[0] = a.Timestamps[0] c.res.Values[0] = a.Values[0] return c.res } type booleanWindowLastArrayCursor struct { cursors.BooleanArrayCursor every, offset, windowEnd int64 res *cursors.BooleanArray tmp *cursors.BooleanArray } // Window array cursors assume that every != 0 && every != MaxInt64. // Such a cursor will panic in the first case and possibly overflow in the second. func newBooleanWindowLastArrayCursor(cur cursors.BooleanArrayCursor, every, offset int64) *booleanWindowLastArrayCursor { return &booleanWindowLastArrayCursor{ BooleanArrayCursor: cur, every: every, offset: offset, windowEnd: math.MinInt64, res: cursors.NewBooleanArrayLen(MaxPointsPerBlock), tmp: &cursors.BooleanArray{}, } } func (c *booleanWindowLastArrayCursor) Stats() cursors.CursorStats { return c.BooleanArrayCursor.Stats() } func (c *booleanWindowLastArrayCursor) Next() *cursors.BooleanArray { cur := -1 NEXT: var a *cursors.BooleanArray if c.tmp.Len() > 0 { a = c.tmp } else { a = c.BooleanArrayCursor.Next() } if a.Len() == 0 { c.res.Timestamps = c.res.Timestamps[:cur+1] c.res.Values = c.res.Values[:cur+1] return c.res } for i, t := range a.Timestamps { if t >= c.windowEnd { cur++ } if cur == MaxPointsPerBlock { c.tmp.Timestamps = a.Timestamps[i:] c.tmp.Values = a.Values[i:] return c.res } c.res.Timestamps[cur] = t c.res.Values[cur] = a.Values[i] c.windowEnd = WindowStop(t, c.every, c.offset) } c.tmp.Timestamps = nil c.tmp.Values = nil goto NEXT } type booleanWindowFirstArrayCursor struct { cursors.BooleanArrayCursor every, offset, windowEnd int64 res *cursors.BooleanArray tmp *cursors.BooleanArray } // Window array cursors assume that every != 0 && 
// every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
//
// newBooleanWindowFirstArrayCursor wraps cur in a cursor that emits the
// first point of each window described by every and offset. windowEnd starts
// at math.MinInt64 so that the very first point is always emitted.
func newBooleanWindowFirstArrayCursor(cur cursors.BooleanArrayCursor, every, offset int64) *booleanWindowFirstArrayCursor {
	return &booleanWindowFirstArrayCursor{
		BooleanArrayCursor: cur,
		every:              every,
		offset:             offset,
		windowEnd:          math.MinInt64,
		res:                cursors.NewBooleanArrayLen(MaxPointsPerBlock),
		tmp:                &cursors.BooleanArray{},
	}
}

// Stats returns the statistics of the wrapped cursor.
func (c *booleanWindowFirstArrayCursor) Stats() cursors.CursorStats {
	return c.BooleanArrayCursor.Stats()
}

// Next returns the first point of each window. A point is emitted only when
// its timestamp is at or past windowEnd; windowEnd is then set to
// WindowStop(t, every, offset) and later points below it are skipped.
// When the result holds MaxPointsPerBlock points the unread input is saved
// in tmp and consumed first by the next call. The returned array is reused
// across calls.
func (c *booleanWindowFirstArrayCursor) Next() *cursors.BooleanArray {
	c.res.Timestamps = c.res.Timestamps[:0]
	c.res.Values = c.res.Values[:0]

NEXT:
	var a *cursors.BooleanArray

	// Leftover input from a previous call takes priority over new input.
	if c.tmp.Len() > 0 {
		a = c.tmp
	} else {
		a = c.BooleanArrayCursor.Next()
	}

	if a.Len() == 0 {
		return c.res
	}

	for i, t := range a.Timestamps {
		if t < c.windowEnd {
			continue
		}

		c.windowEnd = WindowStop(t, c.every, c.offset)

		c.res.Timestamps = append(c.res.Timestamps, t)
		c.res.Values = append(c.res.Values, a.Values[i])

		if c.res.Len() == MaxPointsPerBlock {
			c.tmp.Timestamps = a.Timestamps[i+1:]
			c.tmp.Values = a.Values[i+1:]
			return c.res
		}
	}

	// The whole input array was consumed; drop any buffered remainder.
	c.tmp.Timestamps = nil
	c.tmp.Values = nil

	goto NEXT
}

// booleanWindowCountArrayCursor counts the points of the wrapped boolean
// cursor per window and reports the counts as an integer array.
type booleanWindowCountArrayCursor struct {
	cursors.BooleanArrayCursor
	every, offset int64
	res           *cursors.IntegerArray // output buffer reused across calls
	tmp           *cursors.BooleanArray // unread input saved when the output fills up
}

// newBooleanWindowCountArrayCursor wraps cur in a counting cursor. The
// result buffer is requested with length MaxPointsPerBlock, or length 1 when
// every is 0 (in that mode Next writes at most one point per call).
func newBooleanWindowCountArrayCursor(cur cursors.BooleanArrayCursor, every, offset int64) *booleanWindowCountArrayCursor {
	resLen := MaxPointsPerBlock
	if every == 0 {
		resLen = 1
	}
	return &booleanWindowCountArrayCursor{
		BooleanArrayCursor: cur,
		every:              every,
		offset:             offset,
		res:                cursors.NewIntegerArrayLen(resLen),
		tmp:                &cursors.BooleanArray{},
	}
}

// Stats returns the statistics of the wrapped cursor.
func (c *booleanWindowCountArrayCursor) Stats() cursors.CursorStats {
	return c.BooleanArrayCursor.Stats()
}

// Next returns one point per non-empty window: the timestamp is the window
// end and the value is the number of input points counted for that window.
// When every is 0 all remaining points are counted into a single point
// stamped math.MaxInt64. A fresh empty array is returned when there is no
// input at all. If the output reaches MaxPointsPerBlock points the unread
// input is saved in tmp and processed by the next call.
func (c *booleanWindowCountArrayCursor) Next() *cursors.IntegerArray {
	pos := 0
	// Restore the full length of the reused buffer; it is trimmed on return.
	c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
	c.res.Values = c.res.Values[:cap(c.res.Values)]

	var a *cursors.BooleanArray
	if c.tmp.Len() > 0 {
		a = c.tmp
	} else {
		a = c.BooleanArrayCursor.Next()
	}

	if a.Len() == 0 {
		return &cursors.IntegerArray{}
	}

	rowIdx := 0
	var acc int64 = 0

	var windowEnd int64
	if c.every != 0 {
		windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset)
	} else {
		windowEnd = math.MaxInt64
	}

	windowHasPoints := false

	// enumerate windows
WINDOWS:
	for {
		for ; rowIdx < a.Len(); rowIdx++ {
			ts := a.Timestamps[rowIdx]
			if c.every != 0 && ts >= windowEnd {
				// new window detected, close the current window
				// do not generate a point for empty windows
				if windowHasPoints {
					c.res.Timestamps[pos] = windowEnd
					c.res.Values[pos] = acc
					pos++
					if pos >= MaxPointsPerBlock {
						// the output array is full,
						// save the remaining points in the input array in tmp.
						// they will be processed in the next call to Next()
						c.tmp.Timestamps = a.Timestamps[rowIdx:]
						c.tmp.Values = a.Values[rowIdx:]
						break WINDOWS
					}
				}

				// start the new window
				acc = 0
				windowEnd = WindowStop(a.Timestamps[rowIdx], c.every, c.offset)
				windowHasPoints = false

				// Restarting the outer loop skips the inner loop's rowIdx++,
				// so the current row is re-examined against the new window.
				continue WINDOWS
			} else {
				acc++
				windowHasPoints = true
			}
		}

		// Clear buffered timestamps & values if we make it through a cursor.
		// The break above will skip this if a cursor is partially read.
		c.tmp.Timestamps = nil
		c.tmp.Values = nil

		// get the next chunk
		a = c.BooleanArrayCursor.Next()
		if a.Len() == 0 {
			// write the final point
			// do not generate a point for empty windows
			if windowHasPoints {
				c.res.Timestamps[pos] = windowEnd
				c.res.Values[pos] = acc
				pos++
			}
			break WINDOWS
		}
		rowIdx = 0
	}

	c.res.Timestamps = c.res.Timestamps[:pos]
	c.res.Values = c.res.Values[:pos]

	return c.res
}

// booleanEmptyArrayCursor is a boolean cursor that never yields any points.
type booleanEmptyArrayCursor struct {
	res cursors.BooleanArray
}

// BooleanEmptyArrayCursor is the shared empty boolean cursor.
var BooleanEmptyArrayCursor cursors.BooleanArrayCursor = &booleanEmptyArrayCursor{}

// Err always returns nil.
func (c *booleanEmptyArrayCursor) Err() error { return nil }

// Close does nothing.
func (c *booleanEmptyArrayCursor) Close() {}

// Stats returns zero-valued statistics.
func (c *booleanEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} }

// Next returns a pointer to the cursor's own res field, which this type
// never populates.
func (c *booleanEmptyArrayCursor) Next() *cursors.BooleanArray { return &c.res }

// arrayCursorType returns a lowercase name for the value type of cur, chosen
// by the first array-cursor interface it satisfies (float, integer,
// unsigned, string, boolean, in that order), or "unknown" when it satisfies
// none of them.
func arrayCursorType(cur cursors.Cursor) string {
	switch cur.(type) {
	case cursors.FloatArrayCursor:
		return "float"
	case cursors.IntegerArrayCursor:
		return "integer"
	case cursors.UnsignedArrayCursor:
		return "unsigned"
	case cursors.StringArrayCursor:
		return "string"
	case cursors.BooleanArrayCursor:
		return "boolean"
	default:
		return "unknown"
	}
}