Taught tsm1 storage engine how to read and write uint64 values

* introduced UnsignedValue type
  * leveraged existing int64 compression algorithms (RLE, Simple 8B)
* tsm and WAL can read and write UnsignedValue
* compaction is aware of UnsignedValue
* added unsigned support to the models, cursors and write points

NOTE: there is no support for creating unsigned points, as the line
protocol has not been modified.
pull/8615/head
Stuart Carnie 2017-06-06 15:40:38 +08:00
parent 4a87662d60
commit eec80692c4
32 changed files with 8101 additions and 93 deletions

View File

@ -38,6 +38,8 @@ const (
Tag = 7
// AnyField means the data type is any field.
AnyField = 8
// Unsigned means the data type is an unsigned integer.
Unsigned = 9
)
var (

View File

@ -6,6 +6,8 @@ func castToFloat(v interface{}) float64 {
return v
case int64:
return float64(v)
case uint64:
return float64(v)
default:
return float64(0)
}
@ -17,11 +19,26 @@ func castToInteger(v interface{}) int64 {
return int64(v)
case int64:
return v
case uint64:
return int64(v)
default:
return int64(0)
}
}
// castToUnsigned coerces an interface value to a uint64. Numeric types
// (uint64, int64, float64) are converted; every other type yields zero.
func castToUnsigned(v interface{}) uint64 {
	switch n := v.(type) {
	case uint64:
		return n
	case int64:
		return uint64(n)
	case float64:
		return uint64(n)
	default:
		return 0
	}
}
func castToString(v interface{}) string {
switch v := v.(type) {
case string:

View File

@ -181,6 +181,76 @@ func (r *FloatSliceFuncIntegerReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// FloatReduceUnsignedFunc is the function called by a FloatPoint reducer.
type FloatReduceUnsignedFunc func(prev *UnsignedPoint, curr *FloatPoint) (t int64, v uint64, aux []interface{})

// FloatFuncUnsignedReducer folds FloatPoints into a single UnsignedPoint by
// repeatedly applying a reduce function.
type FloatFuncUnsignedReducer struct {
	prev *UnsignedPoint          // running result; nil until seeded or first point
	fn   FloatReduceUnsignedFunc // reduce function invoked per input point
}

// NewFloatFuncUnsignedReducer creates a new FloatFuncUnsignedReducer.
// prev may be nil, or a seed point to start the reduction from.
func NewFloatFuncUnsignedReducer(fn FloatReduceUnsignedFunc, prev *UnsignedPoint) *FloatFuncUnsignedReducer {
	return &FloatFuncUnsignedReducer{fn: fn, prev: prev}
}

// AggregateFloat feeds p through the reduce function and stores the result as
// the current point. The reduce function runs before the nil check, so it
// always observes the pre-existing prev (possibly nil).
func (r *FloatFuncUnsignedReducer) AggregateFloat(p *FloatPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &UnsignedPoint{}
	}
	r.prev.Time, r.prev.Value, r.prev.Aux = t, v, aux
	// Track how many raw points the running result represents.
	n := p.Aggregated
	if n < 2 {
		n = 1
	}
	r.prev.Aggregated += n
}

// Emit returns the single reduced point.
func (r *FloatFuncUnsignedReducer) Emit() []UnsignedPoint {
	return []UnsignedPoint{*r.prev}
}

// FloatReduceUnsignedSliceFunc is the function called by a FloatPoint reducer.
type FloatReduceUnsignedSliceFunc func(a []FloatPoint) []UnsignedPoint

// FloatSliceFuncUnsignedReducer buffers FloatPoints and reduces them all at
// once when Emit is called.
type FloatSliceFuncUnsignedReducer struct {
	points []FloatPoint                 // buffered input points
	fn     FloatReduceUnsignedSliceFunc // reduce function applied at Emit time
}

// NewFloatSliceFuncUnsignedReducer creates a new FloatSliceFuncUnsignedReducer.
func NewFloatSliceFuncUnsignedReducer(fn FloatReduceUnsignedSliceFunc) *FloatSliceFuncUnsignedReducer {
	return &FloatSliceFuncUnsignedReducer{fn: fn}
}

// AggregateFloat appends a copy of p (via Clone) to the internal buffer.
func (r *FloatSliceFuncUnsignedReducer) AggregateFloat(p *FloatPoint) {
	clone := p.Clone()
	r.points = append(r.points, *clone)
}

// AggregateFloatBulk appends many points at once. Unlike AggregateFloat, the
// points are appended without cloning, which is cheaper for bulk transfers.
func (r *FloatSliceFuncUnsignedReducer) AggregateFloatBulk(points []FloatPoint) {
	r.points = append(r.points, points...)
}

// Emit applies the reduce function to the buffered points. The buffer is not
// cleared afterwards.
func (r *FloatSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
	return r.fn(r.points)
}
// FloatReduceStringFunc is the function called by a FloatPoint reducer.
type FloatReduceStringFunc func(prev *StringPoint, curr *FloatPoint) (t int64, v string, aux []interface{})
@ -595,6 +665,76 @@ func (r *IntegerSliceFuncReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// IntegerReduceUnsignedFunc is the function called by a IntegerPoint reducer.
type IntegerReduceUnsignedFunc func(prev *UnsignedPoint, curr *IntegerPoint) (t int64, v uint64, aux []interface{})

// IntegerFuncUnsignedReducer folds IntegerPoints into a single UnsignedPoint
// by repeatedly applying a reduce function.
type IntegerFuncUnsignedReducer struct {
	prev *UnsignedPoint            // running result; nil until seeded or first point
	fn   IntegerReduceUnsignedFunc // reduce function invoked per input point
}

// NewIntegerFuncUnsignedReducer creates a new IntegerFuncUnsignedReducer.
// prev may be nil, or a seed point to start the reduction from.
func NewIntegerFuncUnsignedReducer(fn IntegerReduceUnsignedFunc, prev *UnsignedPoint) *IntegerFuncUnsignedReducer {
	return &IntegerFuncUnsignedReducer{fn: fn, prev: prev}
}

// AggregateInteger feeds p through the reduce function and stores the result
// as the current point. The reduce function runs before the nil check, so it
// always observes the pre-existing prev (possibly nil).
func (r *IntegerFuncUnsignedReducer) AggregateInteger(p *IntegerPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &UnsignedPoint{}
	}
	r.prev.Time, r.prev.Value, r.prev.Aux = t, v, aux
	// Track how many raw points the running result represents.
	n := p.Aggregated
	if n < 2 {
		n = 1
	}
	r.prev.Aggregated += n
}

// Emit returns the single reduced point.
func (r *IntegerFuncUnsignedReducer) Emit() []UnsignedPoint {
	return []UnsignedPoint{*r.prev}
}

// IntegerReduceUnsignedSliceFunc is the function called by a IntegerPoint reducer.
type IntegerReduceUnsignedSliceFunc func(a []IntegerPoint) []UnsignedPoint

// IntegerSliceFuncUnsignedReducer buffers IntegerPoints and reduces them all
// at once when Emit is called.
type IntegerSliceFuncUnsignedReducer struct {
	points []IntegerPoint                 // buffered input points
	fn     IntegerReduceUnsignedSliceFunc // reduce function applied at Emit time
}

// NewIntegerSliceFuncUnsignedReducer creates a new IntegerSliceFuncUnsignedReducer.
func NewIntegerSliceFuncUnsignedReducer(fn IntegerReduceUnsignedSliceFunc) *IntegerSliceFuncUnsignedReducer {
	return &IntegerSliceFuncUnsignedReducer{fn: fn}
}

// AggregateInteger appends a copy of p (via Clone) to the internal buffer.
func (r *IntegerSliceFuncUnsignedReducer) AggregateInteger(p *IntegerPoint) {
	clone := p.Clone()
	r.points = append(r.points, *clone)
}

// AggregateIntegerBulk appends many points at once. Unlike AggregateInteger,
// the points are appended without cloning, which is cheaper for bulk transfers.
func (r *IntegerSliceFuncUnsignedReducer) AggregateIntegerBulk(points []IntegerPoint) {
	r.points = append(r.points, points...)
}

// Emit applies the reduce function to the buffered points. The buffer is not
// cleared afterwards.
func (r *IntegerSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
	return r.fn(r.points)
}
// IntegerReduceStringFunc is the function called by a IntegerPoint reducer.
type IntegerReduceStringFunc func(prev *StringPoint, curr *IntegerPoint) (t int64, v string, aux []interface{})
@ -840,6 +980,490 @@ func (r *IntegerSampleReducer) Emit() []IntegerPoint {
return pts
}
// UnsignedPointAggregator aggregates points to produce a single point.
type UnsignedPointAggregator interface {
	AggregateUnsigned(p *UnsignedPoint)
}

// UnsignedBulkPointAggregator aggregates multiple points at a time.
type UnsignedBulkPointAggregator interface {
	AggregateUnsignedBulk(points []UnsignedPoint)
}

// AggregateUnsignedPoints feeds a slice of UnsignedPoint into an
// aggregator. If the aggregator is a UnsignedBulkPointAggregator, it will
// use the AggregateUnsignedBulk method.
func AggregateUnsignedPoints(a UnsignedPointAggregator, points []UnsignedPoint) {
	switch a := a.(type) {
	case UnsignedBulkPointAggregator:
		a.AggregateUnsignedBulk(points)
	default:
		// Each call receives a pointer to the loop variable's copy, so the
		// aggregator cannot mutate the caller's slice through it.
		for _, p := range points {
			a.AggregateUnsigned(&p)
		}
	}
}

// UnsignedPointEmitter produces a single point from an aggregate.
type UnsignedPointEmitter interface {
	Emit() []UnsignedPoint
}
// UnsignedReduceFloatFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceFloatFunc func(prev *FloatPoint, curr *UnsignedPoint) (t int64, v float64, aux []interface{})

// UnsignedFuncFloatReducer folds UnsignedPoints into a single FloatPoint by
// repeatedly applying a reduce function.
type UnsignedFuncFloatReducer struct {
	prev *FloatPoint             // running result; nil until seeded or first point
	fn   UnsignedReduceFloatFunc // reduce function invoked per input point
}

// NewUnsignedFuncFloatReducer creates a new UnsignedFuncFloatReducer.
// prev may be nil, or a seed point to start the reduction from.
func NewUnsignedFuncFloatReducer(fn UnsignedReduceFloatFunc, prev *FloatPoint) *UnsignedFuncFloatReducer {
	return &UnsignedFuncFloatReducer{fn: fn, prev: prev}
}

// AggregateUnsigned feeds p through the reduce function and stores the result
// as the current point. The reduce function runs before the nil check, so it
// always observes the pre-existing prev (possibly nil).
func (r *UnsignedFuncFloatReducer) AggregateUnsigned(p *UnsignedPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &FloatPoint{}
	}
	r.prev.Time, r.prev.Value, r.prev.Aux = t, v, aux
	// Track how many raw points the running result represents.
	n := p.Aggregated
	if n < 2 {
		n = 1
	}
	r.prev.Aggregated += n
}

// Emit returns the single reduced point.
func (r *UnsignedFuncFloatReducer) Emit() []FloatPoint {
	return []FloatPoint{*r.prev}
}

// UnsignedReduceFloatSliceFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceFloatSliceFunc func(a []UnsignedPoint) []FloatPoint

// UnsignedSliceFuncFloatReducer buffers UnsignedPoints and reduces them all
// at once when Emit is called.
type UnsignedSliceFuncFloatReducer struct {
	points []UnsignedPoint              // buffered input points
	fn     UnsignedReduceFloatSliceFunc // reduce function applied at Emit time
}

// NewUnsignedSliceFuncFloatReducer creates a new UnsignedSliceFuncFloatReducer.
func NewUnsignedSliceFuncFloatReducer(fn UnsignedReduceFloatSliceFunc) *UnsignedSliceFuncFloatReducer {
	return &UnsignedSliceFuncFloatReducer{fn: fn}
}

// AggregateUnsigned appends a copy of p (via Clone) to the internal buffer.
func (r *UnsignedSliceFuncFloatReducer) AggregateUnsigned(p *UnsignedPoint) {
	clone := p.Clone()
	r.points = append(r.points, *clone)
}

// AggregateUnsignedBulk appends many points at once. Unlike AggregateUnsigned,
// the points are appended without cloning, which is cheaper for bulk transfers.
func (r *UnsignedSliceFuncFloatReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
	r.points = append(r.points, points...)
}

// Emit applies the reduce function to the buffered points. The buffer is not
// cleared afterwards.
func (r *UnsignedSliceFuncFloatReducer) Emit() []FloatPoint {
	return r.fn(r.points)
}
// UnsignedReduceIntegerFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceIntegerFunc func(prev *IntegerPoint, curr *UnsignedPoint) (t int64, v int64, aux []interface{})

// UnsignedFuncIntegerReducer folds UnsignedPoints into a single IntegerPoint
// by repeatedly applying a reduce function.
type UnsignedFuncIntegerReducer struct {
	prev *IntegerPoint             // running result; nil until seeded or first point
	fn   UnsignedReduceIntegerFunc // reduce function invoked per input point
}

// NewUnsignedFuncIntegerReducer creates a new UnsignedFuncIntegerReducer.
// prev may be nil, or a seed point to start the reduction from.
func NewUnsignedFuncIntegerReducer(fn UnsignedReduceIntegerFunc, prev *IntegerPoint) *UnsignedFuncIntegerReducer {
	return &UnsignedFuncIntegerReducer{fn: fn, prev: prev}
}

// AggregateUnsigned feeds p through the reduce function and stores the result
// as the current point. The reduce function runs before the nil check, so it
// always observes the pre-existing prev (possibly nil).
func (r *UnsignedFuncIntegerReducer) AggregateUnsigned(p *UnsignedPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &IntegerPoint{}
	}
	r.prev.Time, r.prev.Value, r.prev.Aux = t, v, aux
	// Track how many raw points the running result represents.
	n := p.Aggregated
	if n < 2 {
		n = 1
	}
	r.prev.Aggregated += n
}

// Emit returns the single reduced point.
func (r *UnsignedFuncIntegerReducer) Emit() []IntegerPoint {
	return []IntegerPoint{*r.prev}
}

// UnsignedReduceIntegerSliceFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceIntegerSliceFunc func(a []UnsignedPoint) []IntegerPoint

// UnsignedSliceFuncIntegerReducer buffers UnsignedPoints and reduces them all
// at once when Emit is called.
type UnsignedSliceFuncIntegerReducer struct {
	points []UnsignedPoint                // buffered input points
	fn     UnsignedReduceIntegerSliceFunc // reduce function applied at Emit time
}

// NewUnsignedSliceFuncIntegerReducer creates a new UnsignedSliceFuncIntegerReducer.
func NewUnsignedSliceFuncIntegerReducer(fn UnsignedReduceIntegerSliceFunc) *UnsignedSliceFuncIntegerReducer {
	return &UnsignedSliceFuncIntegerReducer{fn: fn}
}

// AggregateUnsigned appends a copy of p (via Clone) to the internal buffer.
func (r *UnsignedSliceFuncIntegerReducer) AggregateUnsigned(p *UnsignedPoint) {
	clone := p.Clone()
	r.points = append(r.points, *clone)
}

// AggregateUnsignedBulk appends many points at once. Unlike AggregateUnsigned,
// the points are appended without cloning, which is cheaper for bulk transfers.
func (r *UnsignedSliceFuncIntegerReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
	r.points = append(r.points, points...)
}

// Emit applies the reduce function to the buffered points. The buffer is not
// cleared afterwards.
func (r *UnsignedSliceFuncIntegerReducer) Emit() []IntegerPoint {
	return r.fn(r.points)
}
// UnsignedReduceFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceFunc func(prev *UnsignedPoint, curr *UnsignedPoint) (t int64, v uint64, aux []interface{})

// UnsignedFuncReducer folds UnsignedPoints into a single UnsignedPoint by
// repeatedly applying a reduce function.
type UnsignedFuncReducer struct {
	prev *UnsignedPoint     // running result; nil until seeded or first point
	fn   UnsignedReduceFunc // reduce function invoked per input point
}

// NewUnsignedFuncReducer creates a new UnsignedFuncReducer.
// prev may be nil, or a seed point to start the reduction from.
func NewUnsignedFuncReducer(fn UnsignedReduceFunc, prev *UnsignedPoint) *UnsignedFuncReducer {
	return &UnsignedFuncReducer{fn: fn, prev: prev}
}

// AggregateUnsigned feeds p through the reduce function and stores the result
// as the current point. The reduce function runs before the nil check, so it
// always observes the pre-existing prev (possibly nil).
func (r *UnsignedFuncReducer) AggregateUnsigned(p *UnsignedPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &UnsignedPoint{}
	}
	r.prev.Time, r.prev.Value, r.prev.Aux = t, v, aux
	// Track how many raw points the running result represents.
	n := p.Aggregated
	if n < 2 {
		n = 1
	}
	r.prev.Aggregated += n
}

// Emit returns the single reduced point.
func (r *UnsignedFuncReducer) Emit() []UnsignedPoint {
	return []UnsignedPoint{*r.prev}
}

// UnsignedReduceSliceFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceSliceFunc func(a []UnsignedPoint) []UnsignedPoint

// UnsignedSliceFuncReducer buffers UnsignedPoints and reduces them all at
// once when Emit is called.
type UnsignedSliceFuncReducer struct {
	points []UnsignedPoint         // buffered input points
	fn     UnsignedReduceSliceFunc // reduce function applied at Emit time
}

// NewUnsignedSliceFuncReducer creates a new UnsignedSliceFuncReducer.
func NewUnsignedSliceFuncReducer(fn UnsignedReduceSliceFunc) *UnsignedSliceFuncReducer {
	return &UnsignedSliceFuncReducer{fn: fn}
}

// AggregateUnsigned appends a copy of p (via Clone) to the internal buffer.
func (r *UnsignedSliceFuncReducer) AggregateUnsigned(p *UnsignedPoint) {
	clone := p.Clone()
	r.points = append(r.points, *clone)
}

// AggregateUnsignedBulk appends many points at once. Unlike AggregateUnsigned,
// the points are appended without cloning, which is cheaper for bulk transfers.
func (r *UnsignedSliceFuncReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
	r.points = append(r.points, points...)
}

// Emit applies the reduce function to the buffered points. The buffer is not
// cleared afterwards.
func (r *UnsignedSliceFuncReducer) Emit() []UnsignedPoint {
	return r.fn(r.points)
}
// UnsignedReduceStringFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceStringFunc func(prev *StringPoint, curr *UnsignedPoint) (t int64, v string, aux []interface{})

// UnsignedFuncStringReducer folds UnsignedPoints into a single StringPoint by
// repeatedly applying a reduce function.
type UnsignedFuncStringReducer struct {
	prev *StringPoint             // running result; nil until seeded or first point
	fn   UnsignedReduceStringFunc // reduce function invoked per input point
}

// NewUnsignedFuncStringReducer creates a new UnsignedFuncStringReducer.
// prev may be nil, or a seed point to start the reduction from.
func NewUnsignedFuncStringReducer(fn UnsignedReduceStringFunc, prev *StringPoint) *UnsignedFuncStringReducer {
	return &UnsignedFuncStringReducer{fn: fn, prev: prev}
}

// AggregateUnsigned feeds p through the reduce function and stores the result
// as the current point. The reduce function runs before the nil check, so it
// always observes the pre-existing prev (possibly nil).
func (r *UnsignedFuncStringReducer) AggregateUnsigned(p *UnsignedPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &StringPoint{}
	}
	r.prev.Time, r.prev.Value, r.prev.Aux = t, v, aux
	// Track how many raw points the running result represents.
	n := p.Aggregated
	if n < 2 {
		n = 1
	}
	r.prev.Aggregated += n
}

// Emit returns the single reduced point.
func (r *UnsignedFuncStringReducer) Emit() []StringPoint {
	return []StringPoint{*r.prev}
}

// UnsignedReduceStringSliceFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceStringSliceFunc func(a []UnsignedPoint) []StringPoint

// UnsignedSliceFuncStringReducer buffers UnsignedPoints and reduces them all
// at once when Emit is called.
type UnsignedSliceFuncStringReducer struct {
	points []UnsignedPoint               // buffered input points
	fn     UnsignedReduceStringSliceFunc // reduce function applied at Emit time
}

// NewUnsignedSliceFuncStringReducer creates a new UnsignedSliceFuncStringReducer.
func NewUnsignedSliceFuncStringReducer(fn UnsignedReduceStringSliceFunc) *UnsignedSliceFuncStringReducer {
	return &UnsignedSliceFuncStringReducer{fn: fn}
}

// AggregateUnsigned appends a copy of p (via Clone) to the internal buffer.
func (r *UnsignedSliceFuncStringReducer) AggregateUnsigned(p *UnsignedPoint) {
	clone := p.Clone()
	r.points = append(r.points, *clone)
}

// AggregateUnsignedBulk appends many points at once. Unlike AggregateUnsigned,
// the points are appended without cloning, which is cheaper for bulk transfers.
func (r *UnsignedSliceFuncStringReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
	r.points = append(r.points, points...)
}

// Emit applies the reduce function to the buffered points. The buffer is not
// cleared afterwards.
func (r *UnsignedSliceFuncStringReducer) Emit() []StringPoint {
	return r.fn(r.points)
}
// UnsignedReduceBooleanFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceBooleanFunc func(prev *BooleanPoint, curr *UnsignedPoint) (t int64, v bool, aux []interface{})

// UnsignedFuncBooleanReducer folds UnsignedPoints into a single BooleanPoint
// by repeatedly applying a reduce function.
type UnsignedFuncBooleanReducer struct {
	prev *BooleanPoint             // running result; nil until seeded or first point
	fn   UnsignedReduceBooleanFunc // reduce function invoked per input point
}

// NewUnsignedFuncBooleanReducer creates a new UnsignedFuncBooleanReducer.
// prev may be nil, or a seed point to start the reduction from.
func NewUnsignedFuncBooleanReducer(fn UnsignedReduceBooleanFunc, prev *BooleanPoint) *UnsignedFuncBooleanReducer {
	return &UnsignedFuncBooleanReducer{fn: fn, prev: prev}
}

// AggregateUnsigned feeds p through the reduce function and stores the result
// as the current point. The reduce function runs before the nil check, so it
// always observes the pre-existing prev (possibly nil).
func (r *UnsignedFuncBooleanReducer) AggregateUnsigned(p *UnsignedPoint) {
	t, v, aux := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = &BooleanPoint{}
	}
	r.prev.Time, r.prev.Value, r.prev.Aux = t, v, aux
	// Track how many raw points the running result represents.
	n := p.Aggregated
	if n < 2 {
		n = 1
	}
	r.prev.Aggregated += n
}

// Emit returns the single reduced point.
func (r *UnsignedFuncBooleanReducer) Emit() []BooleanPoint {
	return []BooleanPoint{*r.prev}
}

// UnsignedReduceBooleanSliceFunc is the function called by a UnsignedPoint reducer.
type UnsignedReduceBooleanSliceFunc func(a []UnsignedPoint) []BooleanPoint

// UnsignedSliceFuncBooleanReducer buffers UnsignedPoints and reduces them all
// at once when Emit is called.
type UnsignedSliceFuncBooleanReducer struct {
	points []UnsignedPoint                // buffered input points
	fn     UnsignedReduceBooleanSliceFunc // reduce function applied at Emit time
}

// NewUnsignedSliceFuncBooleanReducer creates a new UnsignedSliceFuncBooleanReducer.
func NewUnsignedSliceFuncBooleanReducer(fn UnsignedReduceBooleanSliceFunc) *UnsignedSliceFuncBooleanReducer {
	return &UnsignedSliceFuncBooleanReducer{fn: fn}
}

// AggregateUnsigned appends a copy of p (via Clone) to the internal buffer.
func (r *UnsignedSliceFuncBooleanReducer) AggregateUnsigned(p *UnsignedPoint) {
	clone := p.Clone()
	r.points = append(r.points, *clone)
}

// AggregateUnsignedBulk appends many points at once. Unlike AggregateUnsigned,
// the points are appended without cloning, which is cheaper for bulk transfers.
func (r *UnsignedSliceFuncBooleanReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
	r.points = append(r.points, points...)
}

// Emit applies the reduce function to the buffered points. The buffer is not
// cleared afterwards.
func (r *UnsignedSliceFuncBooleanReducer) Emit() []BooleanPoint {
	return r.fn(r.points)
}
// UnsignedDistinctReducer returns the distinct points in a series.
type UnsignedDistinctReducer struct {
	m map[uint64]UnsignedPoint // first point seen for each distinct value
}

// NewUnsignedDistinctReducer creates a new UnsignedDistinctReducer.
func NewUnsignedDistinctReducer() *UnsignedDistinctReducer {
	return &UnsignedDistinctReducer{m: map[uint64]UnsignedPoint{}}
}

// AggregateUnsigned records p only if its value has not been seen before.
func (r *UnsignedDistinctReducer) AggregateUnsigned(p *UnsignedPoint) {
	if _, seen := r.m[p.Value]; seen {
		return
	}
	r.m[p.Value] = *p
}

// Emit returns the distinct points, sorted. Only Time and Value are carried
// over into the emitted points.
func (r *UnsignedDistinctReducer) Emit() []UnsignedPoint {
	points := make([]UnsignedPoint, 0, len(r.m))
	for _, p := range r.m {
		points = append(points, UnsignedPoint{Time: p.Time, Value: p.Value})
	}
	sort.Sort(unsignedPoints(points))
	return points
}
// UnsignedElapsedReducer calculates the elapsed of the aggregated points.
type UnsignedElapsedReducer struct {
	unitConversion int64         // divisor converting the raw time delta to the output unit
	prev           UnsignedPoint // point before the current one; Nil until two points seen
	curr           UnsignedPoint // most recently aggregated point; Nil until one point seen
}

// NewUnsignedElapsedReducer creates a new UnsignedElapsedReducer.
func NewUnsignedElapsedReducer(interval Interval) *UnsignedElapsedReducer {
	r := &UnsignedElapsedReducer{unitConversion: int64(interval.Duration)}
	r.prev = UnsignedPoint{Nil: true}
	r.curr = UnsignedPoint{Nil: true}
	return r
}

// AggregateUnsigned shifts the two-point window forward by one point.
func (r *UnsignedElapsedReducer) AggregateUnsigned(p *UnsignedPoint) {
	r.prev, r.curr = r.curr, *p
}

// Emit returns the time elapsed between the two most recent points, or nil
// until at least two points have been aggregated.
func (r *UnsignedElapsedReducer) Emit() []IntegerPoint {
	if r.prev.Nil {
		return nil
	}
	elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
	return []IntegerPoint{{Time: r.curr.Time, Value: elapsed}}
}
// UnsignedSampleReducer implements reservoir sampling to select a random
// subset of the aggregated points.
type UnsignedSampleReducer struct {
	count  int            // how many points have been aggregated so far
	rng    *rand.Rand     // per-reducer RNG, so reducers don't share the global source
	points unsignedPoints // the reservoir; its length is the requested sample size
}

// NewUnsignedSampleReducer creates a new UnsignedSampleReducer that keeps at
// most size points.
func NewUnsignedSampleReducer(size int) *UnsignedSampleReducer {
	// Seed with the current time as suggested by https://golang.org/pkg/math/rand/
	src := rand.NewSource(time.Now().UnixNano())
	return &UnsignedSampleReducer{
		rng:    rand.New(src),
		points: make(unsignedPoints, size),
	}
}

// AggregateUnsigned offers p to the reservoir.
func (r *UnsignedSampleReducer) AggregateUnsigned(p *UnsignedPoint) {
	r.count++
	idx := r.count - 1
	// The first len(points) points fill the reservoir directly.
	if idx < len(r.points) {
		p.CopyTo(&r.points[idx])
		return
	}
	// Afterwards, draw rnd uniformly from [0, count) and replace a slot only
	// when rnd lands inside the reservoir, i.e. with probability
	// len(points)/count — the standard reservoir-sampling update.
	if rnd := r.rng.Intn(r.count); rnd < len(r.points) {
		p.CopyTo(&r.points[rnd])
	}
}

// Emit returns the sampled points, sorted. If fewer points were aggregated
// than the reservoir size, only those points are returned.
func (r *UnsignedSampleReducer) Emit() []UnsignedPoint {
	size := len(r.points)
	if r.count < size {
		size = r.count
	}
	pts := r.points[:size]
	sort.Sort(pts)
	return pts
}
// StringPointAggregator aggregates points to produce a single point.
type StringPointAggregator interface {
AggregateString(p *StringPoint)
@ -1009,6 +1633,76 @@ func (r *StringSliceFuncIntegerReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// StringReduceUnsignedFunc is the function called by a StringPoint reducer.
type StringReduceUnsignedFunc func(prev *UnsignedPoint, curr *StringPoint) (t int64, v uint64, aux []interface{})
// StringFuncUnsignedReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type StringFuncUnsignedReducer struct {
prev *UnsignedPoint
fn StringReduceUnsignedFunc
}
// NewStringFuncUnsignedReducer creates a new StringFuncUnsignedReducer.
func NewStringFuncUnsignedReducer(fn StringReduceUnsignedFunc, prev *UnsignedPoint) *StringFuncUnsignedReducer {
return &StringFuncUnsignedReducer{fn: fn, prev: prev}
}
// AggregateString takes a StringPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *StringFuncUnsignedReducer) AggregateString(p *StringPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &UnsignedPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateString.
func (r *StringFuncUnsignedReducer) Emit() []UnsignedPoint {
return []UnsignedPoint{*r.prev}
}
// StringReduceUnsignedSliceFunc is the function called by a StringPoint reducer.
type StringReduceUnsignedSliceFunc func(a []StringPoint) []UnsignedPoint
// StringSliceFuncUnsignedReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type StringSliceFuncUnsignedReducer struct {
points []StringPoint
fn StringReduceUnsignedSliceFunc
}
// NewStringSliceFuncUnsignedReducer creates a new StringSliceFuncUnsignedReducer.
func NewStringSliceFuncUnsignedReducer(fn StringReduceUnsignedSliceFunc) *StringSliceFuncUnsignedReducer {
return &StringSliceFuncUnsignedReducer{fn: fn}
}
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncUnsignedReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
// This is a more efficient version of calling AggregateString on each point.
func (r *StringSliceFuncUnsignedReducer) AggregateStringBulk(points []StringPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *StringSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
return r.fn(r.points)
}
// StringReduceFunc is the function called by a StringPoint reducer.
type StringReduceFunc func(prev *StringPoint, curr *StringPoint) (t int64, v string, aux []interface{})
@ -1423,6 +2117,76 @@ func (r *BooleanSliceFuncIntegerReducer) Emit() []IntegerPoint {
return r.fn(r.points)
}
// BooleanReduceUnsignedFunc is the function called by a BooleanPoint reducer.
type BooleanReduceUnsignedFunc func(prev *UnsignedPoint, curr *BooleanPoint) (t int64, v uint64, aux []interface{})
// BooleanFuncUnsignedReducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type BooleanFuncUnsignedReducer struct {
prev *UnsignedPoint
fn BooleanReduceUnsignedFunc
}
// NewBooleanFuncUnsignedReducer creates a new BooleanFuncUnsignedReducer.
func NewBooleanFuncUnsignedReducer(fn BooleanReduceUnsignedFunc, prev *UnsignedPoint) *BooleanFuncUnsignedReducer {
return &BooleanFuncUnsignedReducer{fn: fn, prev: prev}
}
// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
// current and new point to modify the current point.
func (r *BooleanFuncUnsignedReducer) AggregateBoolean(p *BooleanPoint) {
t, v, aux := r.fn(r.prev, p)
if r.prev == nil {
r.prev = &UnsignedPoint{}
}
r.prev.Time = t
r.prev.Value = v
r.prev.Aux = aux
if p.Aggregated > 1 {
r.prev.Aggregated += p.Aggregated
} else {
r.prev.Aggregated++
}
}
// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
func (r *BooleanFuncUnsignedReducer) Emit() []UnsignedPoint {
return []UnsignedPoint{*r.prev}
}
// BooleanReduceUnsignedSliceFunc is the function called by a BooleanPoint reducer.
type BooleanReduceUnsignedSliceFunc func(a []BooleanPoint) []UnsignedPoint
// BooleanSliceFuncUnsignedReducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type BooleanSliceFuncUnsignedReducer struct {
points []BooleanPoint
fn BooleanReduceUnsignedSliceFunc
}
// NewBooleanSliceFuncUnsignedReducer creates a new BooleanSliceFuncUnsignedReducer.
func NewBooleanSliceFuncUnsignedReducer(fn BooleanReduceUnsignedSliceFunc) *BooleanSliceFuncUnsignedReducer {
return &BooleanSliceFuncUnsignedReducer{fn: fn}
}
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncUnsignedReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
// This is a more efficient version of calling AggregateBoolean on each point.
func (r *BooleanSliceFuncUnsignedReducer) AggregateBooleanBulk(points []BooleanPoint) {
r.points = append(r.points, points...)
}
// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
// This method does not clear the points from the internal slice.
func (r *BooleanSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
return r.fn(r.points)
}
// BooleanReduceStringFunc is the function called by a BooleanPoint reducer.
type BooleanReduceStringFunc func(prev *StringPoint, curr *BooleanPoint) (t int64, v string, aux []interface{})

View File

@ -46,6 +46,7 @@ type Point struct {
IntegerValue *int64 `protobuf:"varint,8,opt,name=IntegerValue" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,9,opt,name=StringValue" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,10,opt,name=BooleanValue" json:"BooleanValue,omitempty"`
UnsignedValue *uint64 `protobuf:"varint,12,opt,name=UnsignedValue" json:"UnsignedValue,omitempty"`
Stats *IteratorStats `protobuf:"bytes,11,opt,name=Stats" json:"Stats,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -125,6 +126,13 @@ func (m *Point) GetBooleanValue() bool {
return false
}
// GetUnsignedValue returns the unsigned value, or zero when the receiver or
// the field is nil.
func (m *Point) GetUnsignedValue() uint64 {
	if m == nil || m.UnsignedValue == nil {
		return 0
	}
	return *m.UnsignedValue
}
func (m *Point) GetStats() *IteratorStats {
if m != nil {
return m.Stats
@ -138,6 +146,7 @@ type Aux struct {
IntegerValue *int64 `protobuf:"varint,3,opt,name=IntegerValue" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,4,opt,name=StringValue" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,5,opt,name=BooleanValue" json:"BooleanValue,omitempty"`
UnsignedValue *uint64 `protobuf:"varint,6,opt,name=UnsignedValue" json:"UnsignedValue,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -181,6 +190,13 @@ func (m *Aux) GetBooleanValue() bool {
return false
}
// GetUnsignedValue returns the unsigned value, or zero when the field is
// unset or the receiver is nil.
func (m *Aux) GetUnsignedValue() uint64 {
	if m == nil || m.UnsignedValue == nil {
		return 0
	}
	return *m.UnsignedValue
}
type IteratorOptions struct {
Expr *string `protobuf:"bytes,1,opt,name=Expr" json:"Expr,omitempty"`
Aux []string `protobuf:"bytes,2,rep,name=Aux" json:"Aux,omitempty"`
@ -513,52 +529,53 @@ func init() {
func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorInternal) }
var fileDescriptorInternal = []byte{
// 737 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x54, 0xdd, 0x6a, 0xdb, 0x4a,
0x10, 0x46, 0x92, 0xe5, 0x58, 0xeb, 0xf8, 0x24, 0x67, 0x4f, 0x7e, 0x96, 0xc3, 0xe1, 0x54, 0xe8,
0x4a, 0x50, 0xea, 0x40, 0x6e, 0x0b, 0x05, 0xa7, 0x49, 0x8a, 0x21, 0x71, 0xc2, 0x2a, 0xe4, 0x7e,
0x6b, 0x8d, 0xc5, 0x82, 0x2c, 0xb9, 0xab, 0x55, 0x71, 0x1e, 0xa5, 0xcf, 0xd0, 0x87, 0xe9, 0xab,
0xf4, 0x11, 0xca, 0xce, 0x4a, 0x96, 0x92, 0x42, 0x73, 0xa5, 0xf9, 0xbe, 0x99, 0x1d, 0xed, 0xcc,
0x37, 0x3b, 0xe4, 0x54, 0x16, 0x1a, 0x54, 0x21, 0xf2, 0xb3, 0xd6, 0x98, 0x6e, 0x54, 0xa9, 0x4b,
0x3a, 0x92, 0xc5, 0x2a, 0xaf, 0xb7, 0x5f, 0xf2, 0xe8, 0x87, 0x4b, 0xfc, 0xfb, 0x52, 0x16, 0x9a,
0x52, 0x32, 0x58, 0x88, 0x35, 0x30, 0x27, 0x74, 0xe3, 0x80, 0xa3, 0x6d, 0xb8, 0x07, 0x91, 0x55,
0xcc, 0xb5, 0x9c, 0xb1, 0x91, 0x93, 0x6b, 0x60, 0x5e, 0xe8, 0xc6, 0x1e, 0x47, 0x9b, 0x1e, 0x12,
0x6f, 0x21, 0x73, 0x36, 0x08, 0xdd, 0x78, 0xc4, 0x8d, 0x49, 0xdf, 0x10, 0x6f, 0x56, 0x6f, 0x99,
0x1f, 0x7a, 0xf1, 0xf8, 0x7c, 0x32, 0x6d, 0xff, 0x37, 0x9d, 0xd5, 0x5b, 0x6e, 0x3c, 0xf4, 0x7f,
0x42, 0x66, 0x59, 0xa6, 0x20, 0x13, 0x1a, 0x52, 0x36, 0x0c, 0x9d, 0x78, 0xc2, 0x7b, 0x8c, 0xf1,
0x5f, 0xe7, 0xa5, 0xd0, 0x8f, 0x22, 0xaf, 0x81, 0xed, 0x85, 0x4e, 0xec, 0xf0, 0x1e, 0x43, 0x23,
0xb2, 0x3f, 0x2f, 0x34, 0x64, 0xa0, 0x6c, 0xc4, 0x28, 0x74, 0x62, 0x8f, 0x3f, 0xe3, 0x68, 0x48,
0xc6, 0x89, 0x56, 0xb2, 0xc8, 0x6c, 0x48, 0x10, 0x3a, 0x71, 0xc0, 0xfb, 0x94, 0xc9, 0x72, 0x51,
0x96, 0x39, 0x88, 0xc2, 0x86, 0x90, 0xd0, 0x89, 0x47, 0xfc, 0x19, 0x47, 0xdf, 0x11, 0x3f, 0xd1,
0x42, 0x57, 0x6c, 0x1c, 0x3a, 0xf1, 0xf8, 0xfc, 0xb4, 0x2b, 0x66, 0xae, 0x41, 0x09, 0x5d, 0x2a,
0x74, 0x73, 0x1b, 0x15, 0x7d, 0x77, 0xb0, 0x74, 0xfa, 0x2f, 0x19, 0x5d, 0x0a, 0x2d, 0x1e, 0x9e,
0x36, 0xb6, 0xa7, 0x3e, 0xdf, 0xe1, 0x17, 0xc5, 0xb9, 0xaf, 0x16, 0xe7, 0xbd, 0x5e, 0xdc, 0xe0,
0xf5, 0xe2, 0xfc, 0xdf, 0x8b, 0x8b, 0x7e, 0x0e, 0xc8, 0x41, 0x5b, 0xc6, 0xdd, 0x46, 0xcb, 0xb2,
0x40, 0x85, 0xaf, 0xb6, 0x1b, 0xc5, 0x1c, 0x4c, 0x89, 0xb6, 0x51, 0xd8, 0xe8, 0xe9, 0x86, 0x5e,
0x1c, 0x58, 0x01, 0x63, 0x32, 0xbc, 0x96, 0x90, 0xa7, 0x15, 0xfb, 0x1b, 0x45, 0x3e, 0xec, 0xfa,
0xf2, 0x28, 0x14, 0x87, 0x15, 0x6f, 0xfc, 0xf4, 0x8c, 0xec, 0x25, 0x65, 0xad, 0x96, 0x50, 0x31,
0x0f, 0x43, 0x8f, 0xbb, 0xd0, 0x5b, 0x10, 0x55, 0xad, 0x60, 0x0d, 0x85, 0xe6, 0x6d, 0x14, 0x9d,
0x92, 0x91, 0x29, 0x55, 0x7d, 0x15, 0x39, 0xd6, 0x35, 0x3e, 0xa7, 0xbd, 0xa6, 0x37, 0x1e, 0xbe,
0x8b, 0x31, 0xed, 0xbc, 0x94, 0x6b, 0x28, 0x2a, 0x73, 0x7d, 0x9c, 0xb9, 0x80, 0xf7, 0x18, 0xca,
0xc8, 0xde, 0x27, 0x55, 0xd6, 0x9b, 0x8b, 0x27, 0xf6, 0x0f, 0x3a, 0x5b, 0x68, 0x4a, 0xbd, 0x96,
0x79, 0x8e, 0xf3, 0xe7, 0x73, 0xb4, 0xe9, 0x7f, 0x24, 0x30, 0xdf, 0xfe, 0xe0, 0x75, 0x84, 0xf1,
0x7e, 0x2c, 0x8b, 0x54, 0x9a, 0x56, 0xe1, 0xd0, 0x05, 0xbc, 0x23, 0x8c, 0x37, 0xd1, 0x42, 0x69,
0x7c, 0x21, 0x01, 0xaa, 0xd6, 0x11, 0xe6, 0x1e, 0x57, 0x45, 0x8a, 0x3e, 0x82, 0xbe, 0x16, 0x9a,
0x61, 0xb9, 0x29, 0x97, 0x02, 0x93, 0x1e, 0x63, 0xd2, 0x1d, 0x36, 0x39, 0x67, 0xd5, 0x12, 0x8a,
0x54, 0x16, 0x19, 0xce, 0xe0, 0x88, 0x77, 0x04, 0x3d, 0x22, 0xfe, 0x8d, 0x5c, 0x4b, 0xcd, 0xf6,
0x31, 0xa3, 0x05, 0xf4, 0x84, 0x0c, 0xef, 0x56, 0xab, 0x0a, 0x34, 0x9b, 0x20, 0xdd, 0x20, 0xc3,
0x27, 0x36, 0xfc, 0x2f, 0xcb, 0x5b, 0x64, 0x6e, 0x96, 0x34, 0x07, 0x0e, 0xec, 0xcd, 0x92, 0xee,
0xc4, 0x25, 0xa4, 0xf5, 0x06, 0xd8, 0x21, 0xfe, 0xba, 0x41, 0xa6, 0xe7, 0xb7, 0x62, 0x9b, 0x80,
0x92, 0x50, 0x2d, 0x18, 0xc5, 0x43, 0x3d, 0xc6, 0x64, 0xbc, 0x53, 0x29, 0x28, 0x48, 0xd9, 0x11,
0x1e, 0x6c, 0x61, 0xf4, 0x9e, 0xec, 0xf7, 0x54, 0xaf, 0xe8, 0x5b, 0xe2, 0xcf, 0x35, 0xac, 0x2b,
0xe6, 0xfc, 0x69, 0x38, 0x6c, 0x4c, 0xf4, 0xcd, 0x21, 0xe3, 0x1e, 0xdd, 0xbe, 0xb2, 0xcf, 0xa2,
0x82, 0x66, 0x5e, 0x77, 0x98, 0xc6, 0xe4, 0x80, 0x83, 0x86, 0xc2, 0x74, 0xf1, 0xbe, 0xcc, 0xe5,
0xf2, 0x09, 0x9f, 0x5a, 0xc0, 0x5f, 0xd2, 0xbb, 0xdd, 0xe7, 0xd9, 0x89, 0xc7, 0xdd, 0x77, 0x44,
0x7c, 0x0e, 0x19, 0x6c, 0x9b, 0x97, 0x65, 0x81, 0xf9, 0xdf, 0xbc, 0x7a, 0x10, 0x2a, 0x03, 0xdd,
0xbc, 0xa7, 0x1d, 0x8e, 0x3e, 0x74, 0x63, 0x8b, 0xf7, 0xaa, 0x95, 0x15, 0xd4, 0xc1, 0xe6, 0xec,
0x70, 0x4f, 0x1c, 0xb7, 0x2f, 0x4e, 0x34, 0x23, 0x93, 0x67, 0x1b, 0x05, 0x55, 0x69, 0x1a, 0xec,
0x34, 0xaa, 0x34, 0xdd, 0x3d, 0x21, 0x43, 0xdc, 0xda, 0x8b, 0x36, 0x85, 0x45, 0xd1, 0x94, 0x0c,
0xed, 0xe3, 0x33, 0x0f, 0xf6, 0x51, 0xe4, 0xcd, 0x36, 0x37, 0x26, 0x2e, 0x6e, 0xb3, 0x8c, 0x5c,
0x3b, 0xeb, 0xc6, 0xfe, 0x15, 0x00, 0x00, 0xff, 0xff, 0xca, 0x3e, 0x5e, 0x08, 0x22, 0x06, 0x00,
0x00,
// 759 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x5f, 0x6f, 0xfb, 0x34,
0x14, 0x55, 0x92, 0xa6, 0x6b, 0xdc, 0x95, 0x0d, 0xb3, 0xdf, 0xef, 0x67, 0x21, 0x04, 0x51, 0xc4,
0x43, 0x24, 0x44, 0x27, 0xed, 0x15, 0x09, 0xa9, 0x63, 0x1b, 0xaa, 0xb4, 0x75, 0x93, 0x33, 0xf6,
0x6e, 0x9a, 0xdb, 0xc8, 0x52, 0xea, 0x14, 0xc7, 0x41, 0xdd, 0x47, 0xe1, 0x63, 0xf1, 0x4d, 0x78,
0xe1, 0x1d, 0xf9, 0x3a, 0x69, 0xd2, 0x09, 0x31, 0x9e, 0x72, 0xcf, 0xb9, 0xd7, 0xff, 0xce, 0x3d,
0x76, 0xc8, 0x27, 0xa9, 0x0c, 0x68, 0x25, 0xca, 0xcb, 0x2e, 0x98, 0xef, 0x74, 0x65, 0x2a, 0x3a,
0x91, 0x6a, 0x53, 0x36, 0xfb, 0xdf, 0xca, 0xe4, 0x6f, 0x9f, 0x84, 0x4f, 0x95, 0x54, 0x86, 0x52,
0x32, 0x5a, 0x89, 0x2d, 0x30, 0x2f, 0xf6, 0xd3, 0x88, 0x63, 0x6c, 0xb9, 0x67, 0x51, 0xd4, 0xcc,
0x77, 0x9c, 0x8d, 0x91, 0x93, 0x5b, 0x60, 0x41, 0xec, 0xa7, 0x01, 0xc7, 0x98, 0x9e, 0x93, 0x60,
0x25, 0x4b, 0x36, 0x8a, 0xfd, 0x74, 0xc2, 0x6d, 0x48, 0xbf, 0x21, 0xc1, 0xa2, 0xd9, 0xb3, 0x30,
0x0e, 0xd2, 0xe9, 0xd5, 0x6c, 0xde, 0xad, 0x37, 0x5f, 0x34, 0x7b, 0x6e, 0x33, 0xf4, 0x6b, 0x42,
0x16, 0x45, 0xa1, 0xa1, 0x10, 0x06, 0x72, 0x36, 0x8e, 0xbd, 0x74, 0xc6, 0x07, 0x8c, 0xcd, 0xdf,
0x95, 0x95, 0x30, 0x2f, 0xa2, 0x6c, 0x80, 0x9d, 0xc4, 0x5e, 0xea, 0xf1, 0x01, 0x43, 0x13, 0x72,
0xba, 0x54, 0x06, 0x0a, 0xd0, 0xae, 0x62, 0x12, 0x7b, 0x69, 0xc0, 0x8f, 0x38, 0x1a, 0x93, 0x69,
0x66, 0xb4, 0x54, 0x85, 0x2b, 0x89, 0x62, 0x2f, 0x8d, 0xf8, 0x90, 0xb2, 0xb3, 0x5c, 0x57, 0x55,
0x09, 0x42, 0xb9, 0x12, 0x12, 0x7b, 0xe9, 0x84, 0x1f, 0x71, 0xf4, 0x5b, 0x32, 0xfb, 0x45, 0xd5,
0xb2, 0x50, 0x90, 0xbb, 0xa2, 0xd3, 0xd8, 0x4b, 0x47, 0xfc, 0x98, 0xa4, 0xdf, 0x93, 0x30, 0x33,
0xc2, 0xd4, 0x6c, 0x1a, 0x7b, 0xe9, 0xf4, 0xea, 0x53, 0x7f, 0xe4, 0xa5, 0x01, 0x2d, 0x4c, 0xa5,
0x31, 0xcd, 0x5d, 0x55, 0xf2, 0xa7, 0x87, 0x02, 0xd1, 0x2f, 0xc9, 0xe4, 0x46, 0x18, 0xf1, 0xfc,
0xba, 0x73, 0xca, 0x87, 0xfc, 0x80, 0xdf, 0x48, 0xe0, 0xbf, 0x2b, 0x41, 0xf0, 0xbe, 0x04, 0xa3,
0xf7, 0x25, 0x08, 0xff, 0x8f, 0x04, 0xe3, 0x7f, 0x91, 0x20, 0xf9, 0x6b, 0x44, 0xce, 0xba, 0xc3,
0x3e, 0xee, 0x8c, 0xac, 0x14, 0xba, 0xe5, 0x76, 0xbf, 0xd3, 0xcc, 0xc3, 0x85, 0x31, 0xb6, 0x6e,
0xb1, 0xde, 0xf0, 0xe3, 0x20, 0x8d, 0x9c, 0x19, 0x52, 0x32, 0xbe, 0x93, 0x50, 0xe6, 0x35, 0xfb,
0x1c, 0x0d, 0x73, 0xde, 0xab, 0xf7, 0x22, 0x34, 0x87, 0x0d, 0x6f, 0xf3, 0xf4, 0x92, 0x9c, 0x64,
0x55, 0xa3, 0xd7, 0x50, 0xb3, 0x00, 0x4b, 0x3f, 0xf4, 0xa5, 0x0f, 0x20, 0xea, 0x46, 0xc3, 0x16,
0x94, 0xe1, 0x5d, 0x15, 0x9d, 0x93, 0x89, 0x15, 0x44, 0xff, 0x2e, 0x4a, 0x3c, 0xfd, 0xf4, 0x8a,
0x0e, 0x5a, 0xd3, 0x66, 0xf8, 0xa1, 0xc6, 0x8a, 0x7e, 0x23, 0xb7, 0xa0, 0x6a, 0xbb, 0x7d, 0xf4,
0x6f, 0xc4, 0x07, 0x0c, 0x65, 0xe4, 0xe4, 0x67, 0x5d, 0x35, 0xbb, 0xeb, 0x57, 0xf6, 0x05, 0x26,
0x3b, 0x68, 0x8f, 0x7a, 0x27, 0xcb, 0x12, 0xb5, 0x09, 0x39, 0xc6, 0xf4, 0x2b, 0x12, 0xd9, 0xef,
0xd0, 0xc4, 0x3d, 0x61, 0xb3, 0x3f, 0x55, 0x2a, 0x97, 0x56, 0x2a, 0x34, 0x70, 0xc4, 0x7b, 0xc2,
0x66, 0x33, 0x23, 0xb4, 0xc1, 0xdb, 0x16, 0x61, 0x6f, 0x7b, 0xc2, 0xee, 0xe3, 0x56, 0xe5, 0x98,
0x23, 0x98, 0xeb, 0xa0, 0xb5, 0xd4, 0x7d, 0xb5, 0x16, 0x38, 0xe9, 0x07, 0x9c, 0xf4, 0x80, 0xed,
0x9c, 0x8b, 0x7a, 0x0d, 0x2a, 0x97, 0xaa, 0x40, 0xa7, 0x4e, 0x78, 0x4f, 0xd0, 0x0b, 0x12, 0xde,
0xcb, 0xad, 0x34, 0xe8, 0xf0, 0x80, 0x3b, 0x40, 0x3f, 0x92, 0xf1, 0xe3, 0x66, 0x53, 0x83, 0x61,
0x33, 0xa4, 0x5b, 0x64, 0xf9, 0xcc, 0x95, 0x7f, 0xe6, 0x78, 0x87, 0xec, 0xce, 0xb2, 0x76, 0xc0,
0x99, 0xdb, 0x59, 0xd6, 0x8f, 0xb8, 0x81, 0xbc, 0xd9, 0x01, 0x3b, 0xc7, 0xa5, 0x5b, 0x64, 0x35,
0x7f, 0x10, 0xfb, 0x0c, 0xb4, 0x84, 0x7a, 0xc5, 0x28, 0x0e, 0x1a, 0x30, 0x76, 0xc6, 0x47, 0x9d,
0x83, 0x86, 0x9c, 0x5d, 0xe0, 0xc0, 0x0e, 0x26, 0x3f, 0x90, 0xd3, 0x41, 0xd7, 0x6b, 0xfa, 0x1d,
0x09, 0x97, 0x06, 0xb6, 0x35, 0xf3, 0xfe, 0xcb, 0x1c, 0xae, 0x26, 0xf9, 0xc3, 0x23, 0xd3, 0x01,
0xdd, 0xdd, 0xc5, 0x5f, 0x45, 0x0d, 0xad, 0x5f, 0x0f, 0x98, 0xa6, 0xe4, 0x8c, 0x83, 0x01, 0x65,
0x55, 0x7c, 0xaa, 0x4a, 0xb9, 0x7e, 0xc5, 0x0b, 0x19, 0xf1, 0xb7, 0xf4, 0xe1, 0x1d, 0x0d, 0x9c,
0xe3, 0xf1, 0x1d, 0xbd, 0x20, 0x21, 0x87, 0x02, 0xf6, 0xed, 0xfd, 0x73, 0xc0, 0xae, 0xb7, 0xac,
0x9f, 0x85, 0x2e, 0xc0, 0xb4, 0xb7, 0xee, 0x80, 0x93, 0x1f, 0x7b, 0xdb, 0xe2, 0xbe, 0x1a, 0xed,
0x1a, 0xea, 0xa1, 0x38, 0x07, 0x3c, 0x68, 0x8e, 0x3f, 0x6c, 0x4e, 0xb2, 0x20, 0xb3, 0xa3, 0x77,
0x07, 0xbb, 0xd2, 0x0a, 0xec, 0xb5, 0x5d, 0x69, 0xd5, 0xfd, 0x48, 0xc6, 0xf8, 0x07, 0x58, 0x75,
0x53, 0x38, 0x94, 0xcc, 0xc9, 0xd8, 0x5d, 0x3e, 0x7b, 0x61, 0x5f, 0x44, 0xd9, 0xfe, 0x19, 0x6c,
0x88, 0x3f, 0x01, 0xfb, 0x64, 0xf9, 0xce, 0xeb, 0x36, 0xfe, 0x27, 0x00, 0x00, 0xff, 0xff, 0xcd,
0x79, 0x50, 0xb8, 0x6e, 0x06, 0x00, 0x00,
}

View File

@ -9,20 +9,22 @@ message Point {
repeated Aux Aux = 5;
optional uint32 Aggregated = 6;
optional double FloatValue = 7;
optional int64 IntegerValue = 8;
optional string StringValue = 9;
optional bool BooleanValue = 10;
optional double FloatValue = 7;
optional int64 IntegerValue = 8;
optional string StringValue = 9;
optional bool BooleanValue = 10;
optional uint64 UnsignedValue = 12;
optional IteratorStats Stats = 11;
}
message Aux {
required int32 DataType = 1;
optional double FloatValue = 2;
optional int64 IntegerValue = 3;
optional string StringValue = 4;
optional bool BooleanValue = 5;
required int32 DataType = 1;
optional double FloatValue = 2;
optional int64 IntegerValue = 3;
optional string StringValue = 4;
optional bool BooleanValue = 5;
optional uint64 UnsignedValue = 6;
}
message IteratorOptions {

File diff suppressed because it is too large Load Diff

View File

@ -436,6 +436,215 @@ func (dec *IntegerPointDecoder) DecodeIntegerPoint(p *IntegerPoint) error {
}
}
// UnsignedPoint represents a point with a uint64 value.
// DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT.
// See TestPoint_Fields in influxql/point_test.go for more details.
type UnsignedPoint struct {
	Name string // measurement name
	Tags Tags   // series tag set
	Time int64  // timestamp; presumably Unix nanoseconds — confirm with callers
	Nil  bool   // true when the point carries no value (value() then returns nil)
	Value uint64        // the point's value; meaningless when Nil is true
	Aux []interface{} // auxiliary field values carried alongside the point

	// Total number of points that were combined into this point from an aggregate.
	// If this is zero, the point is not the result of an aggregate function.
	Aggregated uint32
}
// name returns the measurement name of the point.
func (v *UnsignedPoint) name() string { return v.Name }

// tags returns the tag set of the point.
func (v *UnsignedPoint) tags() Tags { return v.Tags }

// time returns the timestamp of the point.
func (v *UnsignedPoint) time() int64 { return v.Time }

// nil reports whether the point carries no value.
func (v *UnsignedPoint) nil() bool { return v.Nil }

// value returns the point's value boxed as interface{}, or nil when the
// point carries no value.
func (v *UnsignedPoint) value() interface{} {
	if !v.Nil {
		return v.Value
	}
	return nil
}

// aux returns the auxiliary values carried with the point.
func (v *UnsignedPoint) aux() []interface{} { return v.Aux }
// Clone returns a copy of v. The Aux slice, if any, is copied so the clone
// does not share it with the original. A nil receiver yields nil.
func (v *UnsignedPoint) Clone() *UnsignedPoint {
	if v == nil {
		return nil
	}

	clone := *v
	if v.Aux != nil {
		clone.Aux = append([]interface{}(nil), v.Aux...)
	}
	return &clone
}
// CopyTo makes a deep copy into the point. The Aux slice, if any, is
// duplicated so other does not share it with v.
func (v *UnsignedPoint) CopyTo(other *UnsignedPoint) {
	*other = *v
	if v.Aux == nil {
		return
	}
	other.Aux = append([]interface{}(nil), v.Aux...)
}
// encodeUnsignedPoint converts p into its protobuf representation.
func encodeUnsignedPoint(p *UnsignedPoint) *internal.Point {
	return &internal.Point{
		Name:       proto.String(p.Name),
		Tags:       proto.String(p.Tags.ID()),
		Time:       proto.Int64(p.Time),
		Nil:        proto.Bool(p.Nil),
		Aux:        encodeAux(p.Aux),
		Aggregated: proto.Uint32(p.Aggregated),

		// FIX: the value was previously never written, even though
		// decodeUnsignedPoint reads it back via GetUnsignedValue — an
		// encode/decode round trip silently dropped the point's value.
		UnsignedValue: proto.Uint64(p.Value),
	}
}
// decodeUnsignedPoint converts a protobuf point back into an UnsignedPoint.
func decodeUnsignedPoint(pb *internal.Point) *UnsignedPoint {
	p := &UnsignedPoint{
		Name:       pb.GetName(),
		Tags:       newTagsID(pb.GetTags()),
		Time:       pb.GetTime(),
		Nil:        pb.GetNil(),
		Aux:        decodeAux(pb.Aux),
		Aggregated: pb.GetAggregated(),
	}
	p.Value = pb.GetUnsignedValue()
	return p
}
// unsignedPoints represents a slice of points sortable by time, breaking
// ties by value.
type unsignedPoints []UnsignedPoint

func (a unsignedPoints) Len() int { return len(a) }

func (a unsignedPoints) Less(i, j int) bool {
	if a[i].Time == a[j].Time {
		return a[i].Value < a[j].Value
	}
	return a[i].Time < a[j].Time
}

func (a unsignedPoints) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// unsignedPointsByValue represents a slice of points sortable by value
// (timestamps are ignored by Less).
type unsignedPointsByValue []UnsignedPoint

func (a unsignedPointsByValue) Len() int { return len(a) }

func (a unsignedPointsByValue) Less(i, j int) bool { return a[i].Value < a[j].Value }

func (a unsignedPointsByValue) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// unsignedPointsByTime represents a slice of points sortable by time
// (the original comment said "by value", a copy-paste error).
type unsignedPointsByTime []UnsignedPoint

func (a unsignedPointsByTime) Len() int { return len(a) }

func (a unsignedPointsByTime) Less(i, j int) bool { return a[i].Time < a[j].Time }

func (a unsignedPointsByTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// unsignedPointsByFunc represents a slice of points sortable by a
// caller-supplied comparison function. The Push/Pop methods additionally
// allow it to be used with container/heap.
type unsignedPointsByFunc struct {
	points []UnsignedPoint
	cmp func(a, b *UnsignedPoint) bool
}

func (a *unsignedPointsByFunc) Len() int { return len(a.points) }

func (a *unsignedPointsByFunc) Less(i, j int) bool { return a.cmp(&a.points[i], &a.points[j]) }

func (a *unsignedPointsByFunc) Swap(i, j int) { a.points[i], a.points[j] = a.points[j], a.points[i] }

// Push appends x (which must be an UnsignedPoint) to the slice; part of
// the container/heap contract.
func (a *unsignedPointsByFunc) Push(x interface{}) {
	a.points = append(a.points, x.(UnsignedPoint))
}

// Pop removes and returns the last point; part of the container/heap contract.
func (a *unsignedPointsByFunc) Pop() interface{} {
	p := a.points[len(a.points)-1]
	a.points = a.points[:len(a.points)-1]
	return p
}

// unsignedPointsSortBy wraps points with cmp so they can be sorted (or
// heapified) by an arbitrary ordering.
func unsignedPointsSortBy(points []UnsignedPoint, cmp func(a, b *UnsignedPoint) bool) *unsignedPointsByFunc {
	return &unsignedPointsByFunc{
		points: points,
		cmp: cmp,
	}
}
// UnsignedPointEncoder encodes UnsignedPoint points to a writer.
type UnsignedPointEncoder struct {
	w io.Writer
}

// NewUnsignedPointEncoder returns a new instance of UnsignedPointEncoder that writes to w.
func NewUnsignedPointEncoder(w io.Writer) *UnsignedPointEncoder {
	enc := &UnsignedPointEncoder{}
	enc.w = w
	return enc
}
// EncodeUnsignedPoint marshals and writes p to the underlying writer.
// The wire format is a big-endian uint32 length prefix followed by the
// protobuf-encoded point.
func (enc *UnsignedPointEncoder) EncodeUnsignedPoint(p *UnsignedPoint) error {
	buf, err := proto.Marshal(encodeUnsignedPoint(p))
	if err != nil {
		return err
	}

	// Length prefix first, so the decoder knows how many bytes to read.
	if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
		return err
	}

	// Then the payload itself.
	_, err = enc.w.Write(buf)
	return err
}
// UnsignedPointDecoder decodes UnsignedPoint points from a reader.
type UnsignedPointDecoder struct {
	r     io.Reader
	stats IteratorStats
}

// NewUnsignedPointDecoder returns a new instance of UnsignedPointDecoder that reads from r.
func NewUnsignedPointDecoder(r io.Reader) *UnsignedPointDecoder {
	dec := &UnsignedPointDecoder{}
	dec.r = r
	return dec
}

// Stats returns iterator stats embedded within the stream.
func (dec *UnsignedPointDecoder) Stats() IteratorStats { return dec.stats }
// DecodeUnsignedPoint reads from the underlying reader and unmarshals into p.
// Inline stats messages are absorbed into dec.stats and the loop continues
// until an actual point has been decoded.
func (dec *UnsignedPointDecoder) DecodeUnsignedPoint(p *UnsignedPoint) error {
	for {
		// Each message is prefixed with a big-endian uint32 length.
		var sz uint32
		if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil {
			return err
		}

		buf := make([]byte, sz)
		if _, err := io.ReadFull(dec.r, buf); err != nil {
			return err
		}

		var pb internal.Point
		if err := proto.Unmarshal(buf, &pb); err != nil {
			return err
		}

		if pb.Stats != nil {
			// Stats message, not a point: record it and keep reading.
			dec.stats = decodeIteratorStats(pb.Stats)
			continue
		}

		*p = *decodeUnsignedPoint(&pb)
		return nil
	}
}
// StringPoint represents a point with a string value.
// DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT.
// See TestPoint_Fields in influxql/point_test.go for more details.

View File

@ -13,6 +13,13 @@
"Nil":"0",
"Zero":"int64(0)"
},
{
"Name":"Unsigned",
"name":"unsigned",
"Type":"uint64",
"Nil":"0",
"Zero":"uint64(0)"
},
{
"Name":"String",
"name":"string",

View File

@ -12,6 +12,12 @@ func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
return strconv.ParseInt(s, base, bitSize)
}
// parseUintBytes is a zero-alloc wrapper around strconv.ParseUint.
func parseUintBytes(b []byte, base int, bitSize int) (i uint64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseUint(s, base, bitSize)
}
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
s := unsafeBytesToString(b)

View File

@ -137,6 +137,9 @@ const (
// Empty is used to indicate that there is no field.
Empty
// Unsigned indicates the field's type is an unsigned integer.
Unsigned
)
// FieldIterator provides a low-allocation interface to iterate through a point's fields.
@ -156,6 +159,9 @@ type FieldIterator interface {
// IntegerValue returns the integer value of the current field.
IntegerValue() (int64, error)
// UnsignedValue returns the unsigned value of the current field.
UnsignedValue() (uint64, error)
// BooleanValue returns the boolean value of the current field.
BooleanValue() (bool, error)
@ -205,6 +211,12 @@ type point struct {
it fieldIterator
}
// type assertions
var (
_ Point = (*point)(nil)
_ FieldIterator = (*point)(nil)
)
const (
// the number of characters for the largest possible int64 (9223372036854775807)
maxInt64Digits = 19
@ -1315,6 +1327,11 @@ func NewPointFromBytes(b []byte) (Point, error) {
if err != nil {
return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
}
case Unsigned:
_, err := iter.UnsignedValue()
if err != nil {
return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
}
case String:
// Skip since this won't return an error
case Boolean:
@ -1678,6 +1695,12 @@ func (p *point) unmarshalBinary() (Fields, error) {
return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
}
fields[string(iter.FieldKey())] = v
case Unsigned:
v, err := iter.UnsignedValue()
if err != nil {
return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
}
fields[string(iter.FieldKey())] = v
case String:
fields[string(iter.FieldKey())] = iter.StringValue()
case Boolean:
@ -2110,6 +2133,15 @@ func (p *point) IntegerValue() (int64, error) {
return n, nil
}
// UnsignedValue returns the unsigned value of the current field.
func (p *point) UnsignedValue() (uint64, error) {
n, err := parseUintBytes(p.it.valueBuf, 10, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse unsigned value %q: %v", p.it.valueBuf, err)
}
return n, nil
}
// BooleanValue returns the boolean value of the current field.
func (p *point) BooleanValue() (bool, error) {
b, err := parseBoolBytes(p.it.valueBuf)

View File

@ -438,6 +438,220 @@ func (k *tsmKeyIterator) chunkInteger(dst blocks) blocks {
return dst
}
// mergeUnsigned combines the next set of unsigned blocks into merged blocks.
func (k *tsmKeyIterator) mergeUnsigned() {
	// No blocks left, or pending merged values, we're done
	if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedUnsignedValues) == 0 {
		return
	}

	// Any decoded values left over from a previous pass force the dedup path.
	dedup := len(k.mergedUnsignedValues) != 0
	if len(k.blocks) > 0 && !dedup {
		// If we have more than one block or any partially tombstoned blocks, we may need to dedup
		dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()

		// Quickly scan each block to see if any overlap with the prior block, if they overlap then
		// we need to dedup as there may be duplicate points now
		for i := 1; !dedup && i < len(k.blocks); i++ {
			if k.blocks[i].partiallyRead() {
				dedup = true
				break
			}

			if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
				dedup = true
				break
			}
		}
	}

	k.merged = k.combineUnsigned(dedup)
}
// combineUnsigned returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduped and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks {
	if dedup {
		// Decode/merge until we have a full chunk of values or run out of blocks.
		for len(k.mergedUnsignedValues) < k.size && len(k.blocks) > 0 {
			// Drop any leading blocks that have already been fully consumed.
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the min time to the start of any overlapping blocks.
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
					// NOTE(review): comparing maxTime against minTime in the first
					// clause looks odd, but it matches the other typed variants of
					// this generated method — confirm intent before changing.
					if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
						maxTime = k.blocks[i].maxTime
					}
				}
			}

			// We have some overlapping blocks so decode all, append in order and then dedup
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				v, err := DecodeUnsignedBlock(k.blocks[i].b, &[]UnsignedValue{})
				if err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v = UnsignedValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for overlapping block
				v = UnsignedValues(v).Include(minTime, maxTime)
				if len(v) > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v = UnsignedValues(v).Exclude(ts.Min, ts.Max)
				}

				k.mergedUnsignedValues = k.mergedUnsignedValues.Merge(v)

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunkUnsigned(nil)
	} else {
		var chunked blocks
		var i int

		// Pass already-full blocks through untouched until we hit a partial one.
		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}
			// If this block is already full, just add it as is
			if BlockCount(k.blocks[i].b) >= k.size {
				chunked = append(chunked, k.blocks[i])
			} else {
				break
			}
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		// In fast mode, pass every remaining unread block through without recoding.
		if k.fast {
			for i < len(k.blocks) {
				// skip this block if its values were already read
				if k.blocks[i].read() {
					i++
					continue
				}

				chunked = append(chunked, k.blocks[i])
				i++

				// Allow other goroutines to run
				runtime.Gosched()
			}
		}

		// If we only have 1 block left, just append it as is and avoid decoding/recoding
		if i == len(k.blocks)-1 {
			if !k.blocks[i].read() {
				chunked = append(chunked, k.blocks[i])
			}
			i++
		}

		// The remaining blocks can be combined and we know that they do not overlap and
		// so we can just append each, sort and re-encode.
		for i < len(k.blocks) && len(k.mergedUnsignedValues) < k.size {
			if k.blocks[i].read() {
				i++
				continue
			}

			v, err := DecodeUnsignedBlock(k.blocks[i].b, &[]UnsignedValue{})
			if err != nil {
				k.err = err
				return nil
			}

			// Apply each tombstone to the block
			for _, ts := range k.blocks[i].tombstones {
				v = UnsignedValues(v).Exclude(ts.Min, ts.Max)
			}

			k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)

			k.mergedUnsignedValues = k.mergedUnsignedValues.Merge(v)
			i++

			// Allow other goroutines to run
			runtime.Gosched()
		}

		// Keep only the blocks we have not consumed for the next call.
		k.blocks = k.blocks[i:]

		return k.chunkUnsigned(chunked)
	}
}
// chunkUnsigned encodes pending merged unsigned values into blocks appended
// to dst. When more than k.size values are pending, exactly one full block of
// k.size values is emitted and the rest are kept for the next call; otherwise
// any remainder is flushed into a final (possibly smaller) block.
func (k *tsmKeyIterator) chunkUnsigned(dst blocks) blocks {
	if len(k.mergedUnsignedValues) > k.size {
		values := k.mergedUnsignedValues[:k.size]
		cb, err := UnsignedValues(values).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}

		dst = append(dst, &block{
			minTime: values[0].UnixNano(),
			maxTime: values[len(values)-1].UnixNano(),
			key:     k.key,
			b:       cb,
		})
		// Keep the values that did not fit for the next chunk.
		k.mergedUnsignedValues = k.mergedUnsignedValues[k.size:]
		return dst
	}

	// Re-encode the remaining values into the last block
	if len(k.mergedUnsignedValues) > 0 {
		cb, err := UnsignedValues(k.mergedUnsignedValues).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}

		dst = append(dst, &block{
			minTime: k.mergedUnsignedValues[0].UnixNano(),
			maxTime: k.mergedUnsignedValues[len(k.mergedUnsignedValues)-1].UnixNano(),
			key:     k.key,
			b:       cb,
		})
		// Reset length but keep capacity for reuse.
		k.mergedUnsignedValues = k.mergedUnsignedValues[:0]
	}
	return dst
}
// merge combines the next set of blocks into merged blocks.
func (k *tsmKeyIterator) mergeString() {
// No blocks left, or pending merged values, we're done

View File

@ -7,6 +7,10 @@
"Name":"Integer",
"name":"integer"
},
{
"Name":"Unsigned",
"name":"unsigned"
},
{
"Name":"String",
"name":"string"

View File

@ -986,10 +986,11 @@ type tsmKeyIterator struct {
buf []blocks
// mergeValues are decoded blocks that have been combined
mergedFloatValues FloatValues
mergedIntegerValues IntegerValues
mergedBooleanValues BooleanValues
mergedStringValues StringValues
mergedFloatValues FloatValues
mergedIntegerValues IntegerValues
mergedUnsignedValues UnsignedValues
mergedBooleanValues BooleanValues
mergedStringValues StringValues
// merged are encoded blocks that have been combined or used as is
// without decode
@ -1065,6 +1066,7 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
func (k *tsmKeyIterator) hasMergedValues() bool {
return len(k.mergedFloatValues) > 0 ||
len(k.mergedIntegerValues) > 0 ||
len(k.mergedUnsignedValues) > 0 ||
len(k.mergedStringValues) > 0 ||
len(k.mergedBooleanValues) > 0
}
@ -1189,6 +1191,8 @@ func (k *tsmKeyIterator) merge() {
k.mergeFloat()
case BlockInteger:
k.mergeInteger()
case BlockUnsigned:
k.mergeUnsigned()
case BlockBoolean:
k.mergeBoolean()
case BlockString:

View File

@ -549,6 +549,200 @@ func (a IntegerValues) Len() int { return len(a) }
func (a IntegerValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a IntegerValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
// UnsignedValues represents a slice of Unsigned values.
type UnsignedValues []UnsignedValue

// MinTime returns the timestamp of the first value.
// NOTE(review): panics on an empty slice — callers must ensure len(a) > 0.
func (a UnsignedValues) MinTime() int64 {
	return a[0].UnixNano()
}

// MaxTime returns the timestamp of the last value.
// NOTE(review): panics on an empty slice — callers must ensure len(a) > 0.
func (a UnsignedValues) MaxTime() int64 {
	return a[len(a)-1].UnixNano()
}
// Size returns the sum of the Size of every value in the slice.
func (a UnsignedValues) Size() int {
	var total int
	for _, v := range a {
		total += v.Size()
	}
	return total
}
// ordered reports whether the values are strictly sorted by ascending
// timestamp (duplicates count as unordered).
func (a UnsignedValues) ordered() bool {
	for i := 1; i < len(a); i++ {
		if a[i-1].UnixNano() >= a[i].UnixNano() {
			return false
		}
	}
	return true
}
// assertOrdered panics if the values are not strictly sorted by ascending
// timestamp (duplicates count as unordered).
func (a UnsignedValues) assertOrdered() {
	for i := 1; i < len(a); i++ {
		prev, cur := a[i-1].UnixNano(), a[i].UnixNano()
		if prev >= cur {
			panic(fmt.Sprintf("not ordered: %d %d >= %d", i, prev, cur))
		}
	}
}
// Deduplicate returns a new slice with any values that have the same timestamp removed.
// The Value that appears last in the slice is the one that is kept. The result
// aliases a's backing array; the compaction is done in place.
func (a UnsignedValues) Deduplicate() UnsignedValues {
	if len(a) == 0 {
		return a
	}

	// See if we're already sorted and deduped
	var needSort bool
	for i := 1; i < len(a); i++ {
		if a[i-1].UnixNano() >= a[i].UnixNano() {
			needSort = true
			break
		}
	}

	if !needSort {
		return a
	}

	// A stable sort keeps equal-timestamp values in input order, so the last
	// occurrence wins in the in-place compaction below.
	sort.Stable(a)
	var i int
	for j := 1; j < len(a); j++ {
		v := a[j]
		if v.UnixNano() != a[i].UnixNano() {
			i++
		}
		// Overwrite the kept slot so the last value for a timestamp survives.
		a[i] = v
	}
	return a[:i+1]
}
// Exclude returns the subset of values not in [min, max]. The filtering is
// performed in place, reusing the slice's backing array.
func (a UnsignedValues) Exclude(min, max int64) UnsignedValues {
	filtered := a[:0]
	for _, v := range a {
		if t := v.UnixNano(); t >= min && t <= max {
			continue
		}
		filtered = append(filtered, v)
	}
	return filtered
}
// Include returns the subset of values between min and max inclusive. The
// filtering is performed in place, reusing the slice's backing array.
func (a UnsignedValues) Include(min, max int64) UnsignedValues {
	filtered := a[:0]
	for _, v := range a {
		if t := v.UnixNano(); t < min || t > max {
			continue
		}
		filtered = append(filtered, v)
	}
	return filtered
}
// Merge overlays b to top of a. If two values conflict with
// the same timestamp, b is used. Both a and b must be sorted
// in ascending order.
func (a UnsignedValues) Merge(b UnsignedValues) UnsignedValues {
	if len(a) == 0 {
		return b
	}
	if len(b) == 0 {
		return a
	}

	// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
	// possible stored blocks might contain duplicate values. Remove them if they exists before
	// merging.
	a = a.Deduplicate()
	b = b.Deduplicate()

	// Fast paths: the two ranges do not overlap at all.
	if a[len(a)-1].UnixNano() < b[0].UnixNano() {
		return append(a, b...)
	}
	if b[len(b)-1].UnixNano() < a[0].UnixNano() {
		return append(b, a...)
	}

	out := make(UnsignedValues, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		if a[0].UnixNano() < b[0].UnixNano() {
			out, a = append(out, a[0]), a[1:]
		} else if a[0].UnixNano() == b[0].UnixNano() {
			// FIX(idiom): dropped the redundant `len(b) > 0 &&` guard — the
			// loop condition already guarantees it.
			// Equal timestamps: drop a's value; b's copy is appended on a
			// subsequent iteration, so b wins.
			a = a[1:]
		} else {
			out, b = append(out, b[0]), b[1:]
		}
	}

	if len(a) > 0 {
		return append(out, a...)
	}
	return append(out, b...)
}
// Encode encodes the values into a TSM unsigned block, using buf as scratch
// space; see encodeUnsignedValuesBlock for the format.
func (a UnsignedValues) Encode(buf []byte) ([]byte, error) {
	return encodeUnsignedValuesBlock(buf, a)
}
// encodeUnsignedValuesBlock encodes values into a single TSM block of type
// BlockUnsigned. Timestamps and values are encoded in separate streams and
// combined by packBlock. Returns (nil, nil) for an empty input.
func encodeUnsignedValuesBlock(buf []byte, values []UnsignedValue) ([]byte, error) {
	if len(values) == 0 {
		return nil, nil
	}

	venc := getUnsignedEncoder(len(values))
	tsenc := getTimeEncoder(len(values))

	var b []byte
	err := func() error {
		for _, v := range values {
			tsenc.Write(v.unixnano)
			// uint64 -> int64 is a lossless bit reinterpretation; the existing
			// integer encoder is reused for unsigned values.
			venc.Write(int64(v.value))
		}

		venc.Flush()

		// Encoded timestamp values
		tb, err := tsenc.Bytes()
		if err != nil {
			return err
		}
		// Encoded values
		vb, err := venc.Bytes()
		if err != nil {
			return err
		}

		// Combine the type byte with the encoded timestamp and value streams
		// (packBlock defines the exact layout).
		b = packBlock(buf, BlockUnsigned, tb, vb)
		return nil
	}()

	// Return both encoders to their pools regardless of the outcome above.
	putTimeEncoder(tsenc)
	putUnsignedEncoder(venc)

	return b, err
}
// Sort methods: UnsignedValues sorts by ascending timestamp.
func (a UnsignedValues) Len() int { return len(a) }
func (a UnsignedValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a UnsignedValues) Less(i, j int) bool { return a[i].UnixNano() < a[j].UnixNano() }
// StringValues represents a slice of String values.
type StringValues []StringValue

View File

@ -170,7 +170,7 @@ func encode{{ .Name }}ValuesBlock(buf []byte, values []{{.Name}}Value) ([]byte,
err := func() error {
for _, v := range values {
tsenc.Write(v.unixnano)
venc.Write(v.value)
venc.Write({{if .CastType}}{{.CastType}}(v.value){{else}}v.value{{end}})
}
venc.Flush()

View File

@ -2,26 +2,37 @@
{
"Name":"",
"name":"",
"Type":""
"Type":"",
"CastType":""
},
{
"Name":"Float",
"name":"float",
"Type":"BlockFloat64"
"Type":"BlockFloat64",
"CastType":""
},
{
"Name":"Integer",
"name":"integer",
"Type":"BlockInteger"
"Type":"BlockInteger",
"CastType":""
},
{
"Name":"Unsigned",
"name":"unsigned",
"Type":"BlockUnsigned",
"CastType":"int64"
},
{
"Name":"String",
"name":"string",
"Type":"BlockString"
"Type":"BlockString",
"CastType":""
},
{
"Name":"Boolean",
"name":"boolean",
"Type":"BlockBoolean"
"Type":"BlockBoolean",
"CastType":""
}
]

View File

@ -24,6 +24,9 @@ const (
// BlockString designates a block encodes string values.
BlockString = byte(3)
// BlockUnsigned designates a block encodes uint64 values.
BlockUnsigned = byte(4)
// encodedBlockHeaderSize is the size of the header for an encoded block. There is one
// byte encoding the type of the block.
encodedBlockHeaderSize = 1
@ -114,6 +117,8 @@ func NewValue(t int64, value interface{}) Value {
switch v := value.(type) {
case int64:
return IntegerValue{unixnano: t, value: v}
case uint64:
return UnsignedValue{unixnano: t, value: v}
case float64:
return FloatValue{unixnano: t, value: v}
case bool:
@ -129,6 +134,11 @@ func NewIntegerValue(t int64, v int64) Value {
return IntegerValue{unixnano: t, value: v}
}
// NewUnsignedValue returns a new unsigned integer value.
func NewUnsignedValue(t int64, v uint64) Value {
return UnsignedValue{unixnano: t, value: v}
}
// NewFloatValue returns a new float value.
func NewFloatValue(t int64, v float64) Value {
return FloatValue{unixnano: t, value: v}
@ -159,11 +169,12 @@ func (e EmptyValue) Size() int { return 0 }
// String returns the empty string.
func (e EmptyValue) String() string { return "" }
func (_ EmptyValue) internalOnly() {}
func (_ StringValue) internalOnly() {}
func (_ IntegerValue) internalOnly() {}
func (_ BooleanValue) internalOnly() {}
func (_ FloatValue) internalOnly() {}
func (_ EmptyValue) internalOnly() {}
func (_ StringValue) internalOnly() {}
func (_ IntegerValue) internalOnly() {}
func (_ UnsignedValue) internalOnly() {}
func (_ BooleanValue) internalOnly() {}
func (_ FloatValue) internalOnly() {}
// Encode converts the values to a byte slice. If there are no values,
// this function panics.
@ -177,6 +188,8 @@ func (a Values) Encode(buf []byte) ([]byte, error) {
return encodeFloatBlock(buf, a)
case IntegerValue:
return encodeIntegerBlock(buf, a)
case UnsignedValue:
return encodeUnsignedBlock(buf, a)
case BooleanValue:
return encodeBooleanBlock(buf, a)
case StringValue:
@ -197,6 +210,8 @@ func (a Values) InfluxQLType() (influxql.DataType, error) {
return influxql.Float, nil
case IntegerValue:
return influxql.Integer, nil
case UnsignedValue:
return influxql.Unsigned, nil
case BooleanValue:
return influxql.Boolean, nil
case StringValue:
@ -211,7 +226,7 @@ func (a Values) InfluxQLType() (influxql.DataType, error) {
func BlockType(block []byte) (byte, error) {
blockType := block[0]
switch blockType {
case BlockFloat64, BlockInteger, BlockBoolean, BlockString:
case BlockFloat64, BlockInteger, BlockUnsigned, BlockBoolean, BlockString:
return blockType, nil
default:
return 0, fmt.Errorf("unknown block type: %d", blockType)
@ -265,6 +280,17 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
}
return vals[:len(decoded)], err
case BlockUnsigned:
var buf []UnsignedValue
decoded, err := DecodeUnsignedBlock(block, &buf)
if len(vals) < len(decoded) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = decoded[i]
}
return vals[:len(decoded)], err
case BlockBoolean:
var buf []BooleanValue
decoded, err := DecodeBooleanBlock(block, &buf)
@ -553,7 +579,7 @@ func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error)
return (*a)[:i], err
}
// FloatValue represents an int64 value.
// IntegerValue represents an int64 value.
type IntegerValue struct {
unixnano int64
value int64
@ -671,6 +697,124 @@ func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error)
return (*a)[:i], err
}
// UnsignedValue represents a uint64 value.
type UnsignedValue struct {
	unixnano int64  // timestamp, nanoseconds since epoch
	value    uint64 // the stored unsigned value
}

// Value returns the underlying uint64 value.
func (v UnsignedValue) Value() interface{} {
	return v.value
}

// UnixNano returns the timestamp of the value.
func (v UnsignedValue) UnixNano() int64 {
	return v.unixnano
}

// Size returns the number of bytes necessary to represent the value and its timestamp.
func (v UnsignedValue) Size() int {
	// 8 bytes for the timestamp plus 8 bytes for the uint64 value.
	return 16
}

// String returns the string representation of the value and its timestamp.
func (v UnsignedValue) String() string {
	return fmt.Sprintf("%v %v", time.Unix(0, v.unixnano), v.Value())
}
// encodeUnsignedBlock encodes values, which must all be UnsignedValue, into a
// single TSM block. The uint64 values are bit-cast to int64 so the existing
// integer compression (RLE / Simple8b) can be reused; decoding reverses the
// cast losslessly.
func encodeUnsignedBlock(buf []byte, values []Value) ([]byte, error) {
	tsEnc := getTimeEncoder(len(values))
	vEnc := getUnsignedEncoder(len(values))

	var b []byte
	err := func() error {
		for _, v := range values {
			vv := v.(UnsignedValue)
			tsEnc.Write(vv.unixnano)
			// Reinterpret the uint64 bits as int64 for the integer encoder.
			vEnc.Write(int64(vv.value))
		}

		// Encoded timestamp values
		tb, err := tsEnc.Bytes()
		if err != nil {
			return err
		}
		// Encoded int64 values
		vb, err := vEnc.Bytes()
		if err != nil {
			return err
		}

		// Prepend the first timestamp of the block in the first 8 bytes
		b = packBlock(buf, BlockUnsigned, tb, vb)
		return nil
	}()

	// Return the encoders to their pools regardless of whether encoding failed.
	putTimeEncoder(tsEnc)
	putUnsignedEncoder(vEnc)
	return b, err
}
// DecodeUnsignedBlock decodes the unsigned integer block from the byte slice
// and appends the unsigned integer values to a. Existing elements of *a are
// overwritten in place; the slice grows only when more values are decoded
// than it can hold.
func DecodeUnsignedBlock(block []byte, a *[]UnsignedValue) ([]UnsignedValue, error) {
	blockType := block[0]
	if blockType != BlockUnsigned {
		return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockUnsigned, blockType)
	}

	block = block[1:]

	// The first 8 bytes is the minimum timestamp of the block
	tb, vb, err := unpackBlock(block)
	if err != nil {
		return nil, err
	}

	// Unsigned blocks reuse the integer decoders; values are bit-cast back to uint64.
	tdec := timeDecoderPool.Get(0).(*TimeDecoder)
	vdec := integerDecoderPool.Get(0).(*IntegerDecoder)

	var i int
	err = func() error {
		// Setup our timestamp and value decoders
		tdec.Init(tb)
		vdec.SetBytes(vb)

		// Decode both a timestamp and value
		for tdec.Next() && vdec.Next() {
			ts := tdec.Read()
			v := uint64(vdec.Read())
			if i < len(*a) {
				// Reuse the caller's existing element to avoid an allocation.
				elem := &(*a)[i]
				elem.unixnano = ts
				elem.value = v
			} else {
				*a = append(*a, UnsignedValue{ts, v})
			}
			i++
		}

		// Did timestamp decoding have an error?
		err = tdec.Error()
		if err != nil {
			return err
		}
		// Did value decoding have an error?
		err = vdec.Error()
		if err != nil {
			return err
		}
		return nil
	}()

	// Return the decoders to their pools regardless of whether decoding failed.
	timeDecoderPool.Put(tdec)
	integerDecoderPool.Put(vdec)

	return (*a)[:i], err
}
// StringValue represents a string value.
type StringValue struct {
unixnano int64
@ -858,6 +1002,13 @@ func getIntegerEncoder(sz int) IntegerEncoder {
}
func putIntegerEncoder(enc IntegerEncoder) { integerEncoderPool.Put(enc) }
// getUnsignedEncoder returns a reset IntegerEncoder from the shared integer
// encoder pool. Unsigned values share the integer encoder; callers bit-cast
// uint64 values to int64 before writing.
func getUnsignedEncoder(sz int) IntegerEncoder {
	x := integerEncoderPool.Get(sz).(IntegerEncoder)
	x.Reset()
	return x
}

// putUnsignedEncoder returns the encoder to the shared integer encoder pool.
func putUnsignedEncoder(enc IntegerEncoder) { integerEncoderPool.Put(enc) }
func getFloatEncoder(sz int) *FloatEncoder {
x := floatEncoderPool.Get(sz).(*FloatEncoder)
x.Reset()

View File

@ -142,6 +142,66 @@ func TestEncoding_IntBlock_Negatives(t *testing.T) {
}
}
// TestEncoding_UIntBlock_Basic encodes a block of small uint64 values and
// verifies that decoding round-trips every timestamp and value.
func TestEncoding_UIntBlock_Basic(t *testing.T) {
	valueCount := 1000
	times := getTimes(valueCount, 60, time.Second)
	values := make([]tsm1.Value, len(times))
	for i, t := range times {
		values[i] = tsm1.NewValue(t, uint64(i))
	}

	b, err := tsm1.Values(values).Encode(nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	var decodedValues []tsm1.Value
	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
	if err != nil {
		t.Fatalf("unexpected error decoding block: %v", err)
	}

	if len(decodedValues) != len(values) {
		t.Fatalf("unexpected results length:\n\tgot: %v\n\texp: %v\n", len(decodedValues), len(values))
	}

	// Compare timestamps and values pairwise with the originals.
	for i := 0; i < len(decodedValues); i++ {
		if decodedValues[i].UnixNano() != values[i].UnixNano() {
			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].UnixNano(), values[i].UnixNano())
		}

		if decodedValues[i].Value() != values[i].Value() {
			t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues[i].Value(), values[i].Value())
		}
	}
}
// TestEncoding_UIntBlock_MaxValues encodes uint64 numbers starting at max (18446744073709551615)
// down to 18446744073709550616. These values exceed math.MaxInt64 and so
// exercise the int64 bit-cast round-trip in the unsigned codec.
func TestEncoding_UIntBlock_MaxValues(t *testing.T) {
	valueCount := 1000
	times := getTimes(valueCount, 60, time.Second)
	values := make([]tsm1.Value, len(times))
	for i, t := range times {
		// ^uint64(i) counts down from the maximum uint64 value.
		values[i] = tsm1.NewValue(t, ^uint64(i))
	}

	b, err := tsm1.Values(values).Encode(nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	var decodedValues []tsm1.Value
	decodedValues, err = tsm1.DecodeBlock(b, decodedValues)
	if err != nil {
		t.Fatalf("unexpected error decoding block: %v", err)
	}

	if !reflect.DeepEqual(decodedValues, values) {
		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
	}
}
func TestEncoding_BooleanBlock_Basic(t *testing.T) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
@ -201,6 +261,7 @@ func TestEncoding_BlockType(t *testing.T) {
}{
{value: float64(1.0), blockType: tsm1.BlockFloat64},
{value: int64(1), blockType: tsm1.BlockInteger},
{value: uint64(1), blockType: tsm1.BlockUnsigned},
{value: true, blockType: tsm1.BlockBoolean},
{value: "string", blockType: tsm1.BlockString},
}
@ -237,6 +298,7 @@ func TestEncoding_Count(t *testing.T) {
}{
{value: float64(1.0), blockType: tsm1.BlockFloat64},
{value: int64(1), blockType: tsm1.BlockInteger},
{value: uint64(1), blockType: tsm1.BlockUnsigned},
{value: true, blockType: tsm1.BlockBoolean},
{value: "string", blockType: tsm1.BlockString},
}
@ -645,6 +707,145 @@ func TestIntegerValues_Merge(t *testing.T) {
}
}
// TestUnsignedValues_Merge verifies UnsignedValues.Merge across empty inputs,
// overlapping timestamps (later slice wins), interleaved ranges, and full
// overwrites, and checks that Deduplicate of the concatenation agrees with
// the merged result.
func TestUnsignedValues_Merge(t *testing.T) {
	uintValue := func(t int64, f uint64) tsm1.UnsignedValue {
		return tsm1.NewValue(t, f).(tsm1.UnsignedValue)
	}

	tests := []struct {
		a, b, exp []tsm1.UnsignedValue
	}{
		{ // empty a
			a: []tsm1.UnsignedValue{},

			b: []tsm1.UnsignedValue{
				uintValue(1, 10),
				uintValue(2, 20),
			},

			exp: []tsm1.UnsignedValue{
				uintValue(1, 10),
				uintValue(2, 20),
			},
		},
		{ // empty b
			a: []tsm1.UnsignedValue{
				uintValue(1, 1),
				uintValue(2, 2),
			},

			b: []tsm1.UnsignedValue{},

			exp: []tsm1.UnsignedValue{
				uintValue(1, 1),
				uintValue(2, 2),
			},
		},
		{
			a: []tsm1.UnsignedValue{
				uintValue(1, 1),
			},
			b: []tsm1.UnsignedValue{
				uintValue(0, 0),
				uintValue(1, 10), // overwrites a
				uintValue(2, 20),
				uintValue(3, 30),
				uintValue(4, 40),
			},

			exp: []tsm1.UnsignedValue{
				uintValue(0, 0),
				uintValue(1, 10),
				uintValue(2, 20),
				uintValue(3, 30),
				uintValue(4, 40),
			},
		},
		{
			a: []tsm1.UnsignedValue{
				uintValue(1, 1),
				uintValue(2, 2),
				uintValue(3, 3),
				uintValue(4, 4),
			},

			b: []tsm1.UnsignedValue{
				uintValue(1, ^uint64(0)), // overwrites a
				uintValue(2, 20),         // overwrites a
			},

			exp: []tsm1.UnsignedValue{
				uintValue(1, ^uint64(0)),
				uintValue(2, 20),
				uintValue(3, 3),
				uintValue(4, 4),
			},
		},
		{
			a: []tsm1.UnsignedValue{
				uintValue(1, 1),
				uintValue(2, 2),
				uintValue(3, 3),
				uintValue(4, 4),
			},

			b: []tsm1.UnsignedValue{
				uintValue(1, 10), // overwrites a
				uintValue(2, 20), // overwrites a
				uintValue(3, 30),
				uintValue(4, 40),
			},

			exp: []tsm1.UnsignedValue{
				uintValue(1, 10),
				uintValue(2, 20),
				uintValue(3, 30),
				uintValue(4, 40),
			},
		},
		{
			a: []tsm1.UnsignedValue{
				uintValue(0, 0),
				uintValue(1, 1),
				uintValue(2, 2),
				uintValue(3, 3),
				uintValue(4, 4),
			},

			b: []tsm1.UnsignedValue{
				uintValue(0, 0),
				uintValue(2, 20),
				uintValue(4, 40),
			},

			exp: []tsm1.UnsignedValue{
				uintValue(0, 0),
				uintValue(1, 1),
				uintValue(2, 20),
				uintValue(3, 3),
				uintValue(4, 40),
			},
		},
	}

	// BUG FIX: the original loop contained a leftover debugging guard
	// (`if i != 2 { continue }`) that silently skipped every case except
	// index 2. All cases run now.
	for i, test := range tests {
		got := tsm1.UnsignedValues(test.a).Merge(test.b)
		if exp, got := len(test.exp), len(got); exp != got {
			t.Fatalf("test(%d): value length mismatch: exp %v, got %v", i, exp, got)
		}

		// Deduplicate of the concatenation must agree with Merge.
		dedup := tsm1.UnsignedValues(append(test.a, test.b...)).Deduplicate()

		// Use a distinct index so failure messages can identify the case.
		for j := range test.exp {
			if exp, got := test.exp[j].String(), got[j].String(); exp != got {
				t.Fatalf("test(%d): value mismatch:\n exp %v\n got %v", i, exp, got)
			}

			if exp, got := test.exp[j].String(), dedup[j].String(); exp != got {
				t.Fatalf("test(%d): dedup value mismatch:\n exp %v\n got %v", i, exp, got)
			}
		}
	}
}
func TestFloatValues_Merge(t *testing.T) {
floatValue := func(t int64, f float64) tsm1.FloatValue {
return tsm1.NewValue(t, f).(tsm1.FloatValue)

View File

@ -848,6 +848,12 @@ func (e *Engine) WritePoints(points []models.Point) error {
return err
}
v = NewIntegerValue(t, iv)
case models.Unsigned:
iv, err := iter.UnsignedValue()
if err != nil {
return err
}
v = NewUnsignedValue(t, iv)
case models.String:
v = NewStringValue(t, iter.StringValue())
case models.Boolean:
@ -1849,6 +1855,9 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, s
case influxql.Integer:
aux[i] = &integerNilLiteralCursor{}
continue
case influxql.Unsigned:
aux[i] = &unsignedNilLiteralCursor{}
continue
case influxql.String:
aux[i] = &stringNilLiteralCursor{}
continue
@ -1890,6 +1899,9 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, s
case influxql.Integer:
conds[i] = &integerNilLiteralCursor{}
continue
case influxql.Unsigned:
conds[i] = &unsignedNilLiteralCursor{}
continue
case influxql.String:
conds[i] = &stringNilLiteralCursor{}
continue
@ -1932,6 +1944,8 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, s
return newFloatIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case integerCursor:
return newIntegerIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case unsignedCursor:
return newUnsignedIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case stringCursor:
return newStringIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case booleanCursor:
@ -1964,12 +1978,27 @@ func (e *Engine) buildCursor(measurement, seriesKey string, ref *influxql.VarRef
case influxql.Integer:
cur := e.buildIntegerCursor(measurement, seriesKey, ref.Val, opt)
return &floatCastIntegerCursor{cursor: cur}
case influxql.Unsigned:
cur := e.buildUnsignedCursor(measurement, seriesKey, ref.Val, opt)
return &floatCastUnsignedCursor{cursor: cur}
}
case influxql.Integer:
switch f.Type {
case influxql.Float:
cur := e.buildFloatCursor(measurement, seriesKey, ref.Val, opt)
return &integerCastFloatCursor{cursor: cur}
case influxql.Unsigned:
cur := e.buildUnsignedCursor(measurement, seriesKey, ref.Val, opt)
return &integerCastUnsignedCursor{cursor: cur}
}
case influxql.Unsigned:
switch f.Type {
case influxql.Float:
cur := e.buildFloatCursor(measurement, seriesKey, ref.Val, opt)
return &unsignedCastFloatCursor{cursor: cur}
case influxql.Integer:
cur := e.buildIntegerCursor(measurement, seriesKey, ref.Val, opt)
return &unsignedCastIntegerCursor{cursor: cur}
}
}
return nil
@ -1981,6 +2010,8 @@ func (e *Engine) buildCursor(measurement, seriesKey string, ref *influxql.VarRef
return e.buildFloatCursor(measurement, seriesKey, ref.Val, opt)
case influxql.Integer:
return e.buildIntegerCursor(measurement, seriesKey, ref.Val, opt)
case influxql.Unsigned:
return e.buildUnsignedCursor(measurement, seriesKey, ref.Val, opt)
case influxql.String:
return e.buildStringCursor(measurement, seriesKey, ref.Val, opt)
case influxql.Boolean:
@ -2004,6 +2035,13 @@ func (e *Engine) buildIntegerCursor(measurement, seriesKey, field string, opt in
return newIntegerCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
}
// buildUnsignedCursor creates a cursor for an unsigned field.
func (e *Engine) buildUnsignedCursor(measurement, seriesKey, field string, opt influxql.IteratorOptions) unsignedCursor {
cacheValues := e.Cache.Values(SeriesFieldKey(seriesKey, field))
keyCursor := e.KeyCursor(SeriesFieldKey(seriesKey, field), opt.SeekTime(), opt.Ascending)
return newUnsignedCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
}
// buildStringCursor creates a cursor for a string field.
func (e *Engine) buildStringCursor(measurement, seriesKey, field string, opt influxql.IteratorOptions) stringCursor {
cacheValues := e.Cache.Values(SeriesFieldKey(seriesKey, field))
@ -2033,6 +2071,8 @@ func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) {
return influxql.Float, nil
case BlockInteger:
return influxql.Integer, nil
case BlockUnsigned:
return influxql.Unsigned, nil
case BlockBoolean:
return influxql.Boolean, nil
case BlockString:

View File

@ -332,6 +332,169 @@ func (c *KeyCursor) ReadIntegerBlock(buf *[]IntegerValue) ([]IntegerValue, error
return values, err
}
// ReadUnsignedBlock reads the next block as a set of unsigned values.
// When several blocks overlap in time, values from later generations are
// merged in so the result is a single, correctly ordered and deduplicated
// window of values. buf is reused as decode scratch space.
func (c *KeyCursor) ReadUnsignedBlock(buf *[]UnsignedValue) ([]UnsignedValue, error) {
	// No matching blocks to decode
	if len(c.current) == 0 {
		return nil, nil
	}

	// First block is the oldest block containing the points we're searching for.
	first := c.current[0]
	*buf = (*buf)[:0]
	values, err := first.r.ReadUnsignedBlockAt(&first.entry, buf)
	if err != nil {
		return nil, err
	}

	// Remove values we already read
	values = UnsignedValues(values).Exclude(first.readMin, first.readMax)

	// Remove any tombstones
	tombstones := first.r.TombstoneRange(c.key)
	values = c.filterUnsignedValues(tombstones, values)

	// Check we have remaining values.
	if len(values) == 0 {
		return nil, nil
	}

	// Only one block with this key and time range so return it
	if len(c.current) == 1 {
		if len(values) > 0 {
			first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
		}
		return values, nil
	}

	// Use the current block time range as our overlapping window
	minT, maxT := first.readMin, first.readMax
	if len(values) > 0 {
		minT, maxT = values[0].UnixNano(), values[len(values)-1].UnixNano()
	}
	if c.ascending {
		// Blocks are ordered by generation, we may have values in the past in later blocks, if so,
		// expand the window to include the min time range to ensure values are returned in ascending
		// order
		for i := 1; i < len(c.current); i++ {
			cur := c.current[i]
			if cur.entry.MinTime < minT && !cur.read() {
				minT = cur.entry.MinTime
			}
		}

		// Find first block that overlaps our window
		for i := 1; i < len(c.current); i++ {
			cur := c.current[i]
			if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
				// Shrink our window so it's the intersection of the first overlapping block and the
				// first block. We do this to minimize the region that overlaps and needs to
				// be merged.
				if cur.entry.MaxTime > maxT {
					maxT = cur.entry.MaxTime
				}
				values = UnsignedValues(values).Include(minT, maxT)
				break
			}
		}

		// Search the remaining blocks that overlap our window and append their values so we can
		// merge them.
		for i := 1; i < len(c.current); i++ {
			cur := c.current[i]
			// Skip this block if it doesn't contain points we're looking for or they have already been read
			if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
				cur.markRead(minT, maxT)
				continue
			}

			tombstones := cur.r.TombstoneRange(c.key)

			var a []UnsignedValue
			v, err := cur.r.ReadUnsignedBlockAt(&cur.entry, &a)
			if err != nil {
				return nil, err
			}
			// Remove any tombstoned values
			v = c.filterUnsignedValues(tombstones, v)

			// Remove values we already read
			v = UnsignedValues(v).Exclude(cur.readMin, cur.readMax)
			if len(v) > 0 {
				// Only use values in the overlapping window
				v = UnsignedValues(v).Include(minT, maxT)

				// Merge the remaining values with the existing
				values = UnsignedValues(values).Merge(v)
			}
			cur.markRead(minT, maxT)
		}

	} else {
		// Blocks are ordered by generation, we may have values in the past in later blocks, if so,
		// expand the window to include the max time range to ensure values are returned in descending
		// order
		for i := 1; i < len(c.current); i++ {
			cur := c.current[i]
			if cur.entry.MaxTime > maxT && !cur.read() {
				maxT = cur.entry.MaxTime
			}
		}

		// Find first block that overlaps our window
		for i := 1; i < len(c.current); i++ {
			cur := c.current[i]
			if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
				// Shrink our window so it's the intersection of the first overlapping block and the
				// first block. We do this to minimize the region that overlaps and needs to
				// be merged.
				if cur.entry.MinTime < minT {
					minT = cur.entry.MinTime
				}
				values = UnsignedValues(values).Include(minT, maxT)
				break
			}
		}

		// Search the remaining blocks that overlap our window and append their values so we can
		// merge them.
		for i := 1; i < len(c.current); i++ {
			cur := c.current[i]
			// Skip this block if it doesn't contain points we're looking for or they have already been read
			if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
				cur.markRead(minT, maxT)
				continue
			}

			tombstones := cur.r.TombstoneRange(c.key)

			var a []UnsignedValue
			v, err := cur.r.ReadUnsignedBlockAt(&cur.entry, &a)
			if err != nil {
				return nil, err
			}
			// Remove any tombstoned values
			v = c.filterUnsignedValues(tombstones, v)

			// Remove values we already read
			v = UnsignedValues(v).Exclude(cur.readMin, cur.readMax)

			// If the block we decoded should have all of its values included, mark it as read so we
			// don't use it again.
			if len(v) > 0 {
				v = UnsignedValues(v).Include(minT, maxT)
				// Merge the remaining values with the existing
				values = UnsignedValues(v).Merge(values)
			}
			cur.markRead(minT, maxT)
		}
	}

	first.markRead(minT, maxT)

	return values, err
}
// ReadStringBlock reads the next block as a set of string values.
func (c *KeyCursor) ReadStringBlock(buf *[]StringValue) ([]StringValue, error) {
// No matching blocks to decode

View File

@ -7,6 +7,10 @@
"Name":"Integer",
"name":"integer"
},
{
"Name":"Unsigned",
"name":"unsigned"
},
{
"Name":"String",
"name":"string"

View File

@ -31,6 +31,7 @@ type TSMFile interface {
ReadAt(entry *IndexEntry, values []Value) ([]Value, error)
ReadFloatBlockAt(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
ReadIntegerBlockAt(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
ReadUnsignedBlockAt(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error)
ReadStringBlockAt(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
ReadBooleanBlockAt(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
@ -1143,6 +1144,13 @@ func (c *KeyCursor) filterIntegerValues(tombstones []TimeRange, values IntegerVa
return values
}
// filterUnsignedValues removes any values falling within the given tombstone
// time ranges and returns the remaining values.
func (c *KeyCursor) filterUnsignedValues(tombstones []TimeRange, values UnsignedValues) UnsignedValues {
	for i := range tombstones {
		values = values.Exclude(tombstones[i].Min, tombstones[i].Max)
	}
	return values
}
func (c *KeyCursor) filterStringValues(tombstones []TimeRange, values StringValues) StringValues {
for _, t := range tombstones {
values = values.Exclude(t.Min, t.Max)

View File

@ -335,6 +335,72 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
}
}
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapUnsigned(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	fs := tsm1.NewFileStore(dir)

	// Setup 3 files
	data := []keyValues{
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, uint64(0)), tsm1.NewValue(1, uint64(1))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, uint64(2))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, uint64(3))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, uint64(4)), tsm1.NewValue(2, uint64(7))}},
	}

	files, err := newFiles(dir, data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}

	fs.Replace(nil, files)

	// Search for an entry that exists in the second file
	buf := make([]tsm1.UnsignedValue, 1000)
	c := fs.KeyCursor("cpu", 0, true)
	values, err := c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	// The last (newest) file overwrites timestamps 0 and 2 from older files.
	exp := []tsm1.Value{
		data[3].values[0],
		data[0].values[1],
		data[3].values[1],
	}
	if got, exp := len(values), len(exp); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}

	c.Next()
	values, err = c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	exp = []tsm1.Value{
		data[2].values[0],
	}
	if got, exp := len(values), len(exp); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}
}
// Tests that seeking and reading all blocks that contain overlapping points does
// not skip any blocks.
func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
@ -626,6 +692,85 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) {
}
}
// Tests that blocks with a lower min time in later files are not returned
// more than once causing unsorted results
func TestFileStore_SeekToAsc_OverlapMinUnsigned(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	fs := tsm1.NewFileStore(dir)

	// Setup 3 files
	data := []keyValues{
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(1, uint64(1)), tsm1.NewValue(3, uint64(3))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, uint64(2)), tsm1.NewValue(4, uint64(4))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, uint64(0)), tsm1.NewValue(1, uint64(10))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, uint64(5))}},
	}

	files, err := newFiles(dir, data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}

	fs.Replace(nil, files)

	buf := make([]tsm1.UnsignedValue, 1000)
	c := fs.KeyCursor("cpu", 0, true)
	// Search for an entry that exists in the second file
	values, err := c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	// Values from newer generations (files 2 and 3) replace older ones at the
	// same timestamps, and results stay in ascending time order.
	exp := []tsm1.Value{
		data[2].values[0],
		data[2].values[1],
		data[3].values[0],
		data[0].values[1],
	}
	if got, exp := len(values), len(exp); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}

	// Check that calling Next will dedupe points
	c.Next()
	values, err = c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	exp = []tsm1.Value{
		data[1].values[1],
	}
	if got, exp := len(values), len(exp); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}

	// A third read should be exhausted.
	c.Next()
	values, err = c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatal(err)
	}

	exp = nil
	if got, exp := len(values), len(exp); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}
}
// Tests that blocks with a lower min time in later files are not returned
// more than once causing unsorted results
func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) {
@ -1104,6 +1249,68 @@ func TestFileStore_SeekToDesc_OverlapMaxInteger(t *testing.T) {
}
}
}
// TestFileStore_SeekToDesc_OverlapMaxUnsigned verifies descending reads over
// overlapping unsigned blocks: newer generations win at equal timestamps and
// blocks are not returned more than once.
func TestFileStore_SeekToDesc_OverlapMaxUnsigned(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	fs := tsm1.NewFileStore(dir)

	// Setup 3 files
	data := []keyValues{
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(1, uint64(1)), tsm1.NewValue(3, uint64(3))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, uint64(2)), tsm1.NewValue(4, uint64(4))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, uint64(0)), tsm1.NewValue(1, uint64(10))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, uint64(5))}},
	}

	files, err := newFiles(dir, data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}

	fs.Replace(nil, files)

	// Search for an entry that exists in the second file
	buf := make([]tsm1.UnsignedValue, 1000)
	c := fs.KeyCursor("cpu", 5, false)
	values, err := c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	exp := []tsm1.Value{
		data[3].values[0],
		data[0].values[1],
		data[1].values[1],
	}
	if got, exp := len(values), len(exp); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}

	c.Next()
	values, err = c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	exp = []tsm1.Value{
		data[2].values[0],
		data[2].values[1],
	}
	if got, exp := len(values), len(exp); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}
}
func TestFileStore_SeekToDesc_OverlapMaxBoolean(t *testing.T) {
dir := MustTempDir()
@ -1443,6 +1650,83 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
}
}
// TestFileStore_SeekToDesc_AfterEnd_OverlapUnsigned seeks past the newest
// point (t=11) and reads descending, verifying overlapping unsigned blocks
// are merged correctly and the cursor eventually exhausts.
func TestFileStore_SeekToDesc_AfterEnd_OverlapUnsigned(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	fs := tsm1.NewFileStore(dir)

	// Setup 3 files
	data := []keyValues{
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(8, uint64(0)), tsm1.NewValue(9, uint64(1))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, uint64(2))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, uint64(3))}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(3, uint64(4)), tsm1.NewValue(10, uint64(7))}},
	}

	files, err := newFiles(dir, data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}

	fs.Replace(nil, files)

	buf := make([]tsm1.UnsignedValue, 1000)
	c := fs.KeyCursor("cpu", 11, false)
	values, err := c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	// t=3 comes from the newest file (overwriting file 2's value).
	exp := []tsm1.Value{
		data[3].values[0],
		data[0].values[0],
		data[0].values[1],
		data[3].values[1],
	}
	if got, exp := len(values), len(exp); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}

	c.Next()
	values, err = c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	exp = []tsm1.Value{
		data[1].values[0],
	}
	if got, exp := len(values), len(exp); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, v := range exp {
		if got, exp := values[i].Value(), v.Value(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}

	// Cursor should now be exhausted.
	c.Next()
	values, err = c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}
	if got, exp := len(values), 0; got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}
}
func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
@ -1884,6 +2168,50 @@ func TestKeyCursor_TombstoneRange_PartialInteger(t *testing.T) {
}
}
// TestKeyCursor_TombstoneRange_PartialUnsigned verifies that deleting a
// sub-range (t=1..1) from an unsigned series removes only the covered value,
// and that a subsequent read returns the two surviving points.
func TestKeyCursor_TombstoneRange_PartialUnsigned(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	fs := tsm1.NewFileStore(dir)

	// Setup 3 files
	data := []keyValues{
		keyValues{"cpu", []tsm1.Value{
			tsm1.NewValue(0, uint64(1)),
			tsm1.NewValue(1, uint64(2)),
			tsm1.NewValue(2, uint64(3))}},
	}

	files, err := newFiles(dir, data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}

	fs.Replace(nil, files)

	// Tombstone only the middle point.
	if err := fs.DeleteRange([]string{"cpu"}, 1, 1); err != nil {
		t.Fatalf("unexpected error delete range: %v", err)
	}

	buf := make([]tsm1.UnsignedValue, 1000)
	c := fs.KeyCursor("cpu", 0, true)
	values, err := c.ReadUnsignedBlock(&buf)
	if err != nil {
		t.Fatalf("unexpected error reading values: %v", err)
	}

	expValues := []tsm1.Value{data[0].values[0], data[0].values[2]}

	// BUG FIX: the length check is invariant per iteration, so it is hoisted
	// out of the loop, and the failure message now reports the actual index i
	// instead of a hardcoded 0.
	if got, exp := len(values), len(expValues); got != exp {
		t.Fatalf("value length mismatch: got %v, exp %v", got, exp)
	}

	for i, exp := range expValues {
		if got, exp := values[i].String(), exp.String(); got != exp {
			t.Fatalf("read value mismatch(%d): got %v, exp %v", i, got, exp)
		}
	}
}
func TestKeyCursor_TombstoneRange_PartialString(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)

View File

@ -100,6 +100,8 @@ func (c *bufCursor) nextAt(seek int64) interface{} {
return (*float64)(nil)
case integerCursor:
return (*int64)(nil)
case unsignedCursor:
return (*uint64)(nil)
case stringCursor:
return (*string)(nil)
case booleanCursor:
@ -997,6 +999,447 @@ func (c *integerNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.
func (c *integerNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*int64)(nil) }
func (c *integerNilLiteralCursor) nextAt(seek int64) interface{} { return (*int64)(nil) }
// unsignedIterator iterates over a series of unsigned points read from a
// main cursor, advancing auxiliary and condition cursors in lockstep.
type unsignedIterator struct {
	cur   unsignedCursor // main value cursor; may be nil when only aux fields are selected
	aux   []cursorAt     // cursors feeding the point's Aux fields
	conds struct {
		names []string   // field names matching curs, keyed into m
		curs  []cursorAt // cursors feeding condition evaluation
	}
	opt influxql.IteratorOptions

	m     map[string]interface{} // map used for condition evaluation
	point influxql.UnsignedPoint // reusable buffer

	statsLock sync.Mutex
	stats     influxql.IteratorStats
	statsBuf  influxql.IteratorStats // written without the lock; copied under lock periodically
}
// newUnsignedIterator returns an unsignedIterator for the given measurement
// name and tags, reading values from cur, auxiliary fields from aux, and
// evaluating opt.Condition against the cursors in conds (named by condNames).
func newUnsignedIterator(name string, tags influxql.Tags, opt influxql.IteratorOptions, cur unsignedCursor, aux []cursorAt, conds []cursorAt, condNames []string) *unsignedIterator {
	itr := &unsignedIterator{
		cur: cur,
		aux: aux,
		opt: opt,
		point: influxql.UnsignedPoint{
			Name: name,
			Tags: tags,
		},
		statsBuf: influxql.IteratorStats{
			SeriesN: 1,
		},
	}
	itr.stats = itr.statsBuf

	if len(aux) > 0 {
		itr.point.Aux = make([]interface{}, len(aux))
	}

	// The evaluation map is only needed when a condition expression exists.
	if opt.Condition != nil {
		itr.m = make(map[string]interface{}, len(aux)+len(conds))
	}
	itr.conds.names = condNames
	itr.conds.curs = conds

	return itr
}
// Next returns the next point from the iterator.
//
// Values from the main cursor (or, when it is nil, the lowest/highest pending
// auxiliary timestamp) are merged with the auxiliary and condition cursors at
// the same timestamp. It returns (nil, nil) once the cursor is exhausted or
// the point falls outside the iterator's time range.
func (itr *unsignedIterator) Next() (*influxql.UnsignedPoint, error) {
	for {
		seek := tsdb.EOF

		if itr.cur != nil {
			// Read from the main cursor if we have one.
			itr.point.Time, itr.point.Value = itr.cur.nextUnsigned()
			seek = itr.point.Time
		} else {
			// Otherwise find lowest aux timestamp.
			for i := range itr.aux {
				if k, _ := itr.aux[i].peek(); k != tsdb.EOF {
					if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) {
						seek = k
					}
				}
			}
			itr.point.Time = seek
		}

		// Exit if we have no more points or we are outside our time range.
		if itr.point.Time == tsdb.EOF {
			itr.copyStats()
			return nil, nil
		} else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime {
			itr.copyStats()
			return nil, nil
		} else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime {
			itr.copyStats()
			return nil, nil
		}

		// Read from each auxiliary cursor.
		for i := range itr.opt.Aux {
			itr.point.Aux[i] = itr.aux[i].nextAt(seek)
		}

		// Read from condition field cursors.
		for i := range itr.conds.curs {
			itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek)
		}

		// Evaluate condition, if one exists. Retry if it fails.
		if itr.opt.Condition != nil && !influxql.EvalBool(itr.opt.Condition, itr.m) {
			continue
		}

		// Track points returned.
		itr.statsBuf.PointN++

		// Copy buffer to stats periodically.
		if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 {
			itr.copyStats()
		}

		return &itr.point, nil
	}
}
// copyStats publishes the working stats buffer to the shared stats snapshot
// under the stats lock.
func (itr *unsignedIterator) copyStats() {
	itr.statsLock.Lock()
	defer itr.statsLock.Unlock()
	itr.stats = itr.statsBuf
}
// Stats returns stats on the points processed.
func (itr *unsignedIterator) Stats() influxql.IteratorStats {
	itr.statsLock.Lock()
	defer itr.statsLock.Unlock()
	return itr.stats
}
// Close closes the iterator and releases all dependent cursors.
func (itr *unsignedIterator) Close() error {
	for _, cur := range itr.aux {
		cur.close()
	}
	itr.aux = nil

	for _, cur := range itr.conds.curs {
		cur.close()
	}
	itr.conds.curs = nil

	if itr.cur == nil {
		return nil
	}

	// Only the main cursor's close error is propagated.
	err := itr.cur.close()
	itr.cur = nil
	return err
}
// unsignedLimitIterator wraps an UnsignedIterator and stops producing points
// once the configured limit (relative to the offset) has been exceeded.
type unsignedLimitIterator struct {
	input influxql.UnsignedIterator
	opt   influxql.IteratorOptions
	n     int // number of points read so far
}

// newUnsignedLimitIterator returns a limit iterator wrapping input.
func newUnsignedLimitIterator(input influxql.UnsignedIterator, opt influxql.IteratorOptions) *unsignedLimitIterator {
	return &unsignedLimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns stats from the underlying iterator.
func (itr *unsignedLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying iterator.
func (itr *unsignedLimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point, or nil once the limit has been passed.
func (itr *unsignedLimitIterator) Next() (*influxql.UnsignedPoint, error) {
	// Check if we are beyond the limit.
	if itr.n-itr.opt.Offset > itr.opt.Limit {
		return nil, nil
	}

	// Read the next point.
	p, err := itr.input.Next()
	if err != nil {
		return nil, err
	}
	if p == nil {
		return nil, nil
	}

	// Increment counter.
	itr.n++

	// Offsets are handled by a higher level iterator so return all points.
	return p, nil
}
// unsignedCursor represents an object for iterating over a single unsigned field.
type unsignedCursor interface {
	cursor

	// nextUnsigned returns the next timestamp and unsigned value;
	// t is tsdb.EOF when the cursor is exhausted.
	nextUnsigned() (t int64, v uint64)
}
// newUnsignedCursor returns an ascending or descending merged cache/TSM
// cursor for a single unsigned field, positioned at seek.
func newUnsignedCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) unsignedCursor {
	if !ascending {
		return newUnsignedDescendingCursor(seek, cacheValues, tsmKeyCursor)
	}
	return newUnsignedAscendingCursor(seek, cacheValues, tsmKeyCursor)
}
// unsignedAscendingCursor merges unsigned values from the in-memory cache and
// the TSM files for a single field, in ascending time order.
type unsignedAscendingCursor struct {
	cache struct {
		values Values // cache values for the field
		pos    int    // current index into values
	}

	tsm struct {
		buf       []UnsignedValue // scratch buffer reused by ReadUnsignedBlock
		values    []UnsignedValue // values of the current TSM block
		pos       int             // current index into values
		keyCursor *KeyCursor      // cursor over the TSM blocks for the key
	}
}
// newUnsignedAscendingCursor returns a cursor positioned at the first cache
// and TSM values whose timestamps are >= seek.
func newUnsignedAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *unsignedAscendingCursor {
	cur := &unsignedAscendingCursor{}

	// Binary-search the cache for the first value at or after seek.
	cur.cache.values = cacheValues
	cur.cache.pos = sort.Search(len(cacheValues), func(i int) bool {
		return cacheValues[i].UnixNano() >= seek
	})

	// Load the first TSM block and locate the seek position within it.
	cur.tsm.keyCursor = tsmKeyCursor
	cur.tsm.buf = make([]UnsignedValue, 10)
	cur.tsm.values, _ = tsmKeyCursor.ReadUnsignedBlock(&cur.tsm.buf)
	cur.tsm.pos = sort.Search(len(cur.tsm.values), func(i int) bool {
		return cur.tsm.values[i].UnixNano() >= seek
	})

	return cur
}
// peekCache returns the current time/value from the cache without advancing,
// or tsdb.EOF when the cache is exhausted.
func (c *unsignedAscendingCursor) peekCache() (t int64, v uint64) {
	if pos := c.cache.pos; pos < len(c.cache.values) {
		item := c.cache.values[pos]
		return item.UnixNano(), item.(UnsignedValue).value
	}
	return tsdb.EOF, 0
}
// peekTSM returns the current time/value from the TSM block without
// advancing, or tsdb.EOF when no value is buffered.
func (c *unsignedAscendingCursor) peekTSM() (t int64, v uint64) {
	if pos := c.tsm.pos; pos >= 0 && pos < len(c.tsm.values) {
		item := c.tsm.values[pos]
		return item.UnixNano(), item.value
	}
	return tsdb.EOF, 0
}
// close closes the cursor and any dependent cursors, releasing buffered data.
func (c *unsignedAscendingCursor) close() error {
	c.tsm.keyCursor.Close()
	c.tsm.keyCursor = nil
	c.cache.values = nil
	c.tsm.values = nil
	c.tsm.buf = nil
	return nil
}
// next returns the next key/value for the cursor.
func (c *unsignedAscendingCursor) next() (int64, interface{}) { return c.nextUnsigned() }

// nextUnsigned returns the next key/value for the cursor, merging the cache
// and TSM streams in ascending time order. When both sides hold the same
// timestamp, the cache value wins and both streams advance.
func (c *unsignedAscendingCursor) nextUnsigned() (int64, uint64) {
	ckey, cvalue := c.peekCache()
	tkey, tvalue := c.peekTSM()

	// No more data in cache or in TSM files.
	if ckey == tsdb.EOF && tkey == tsdb.EOF {
		return tsdb.EOF, 0
	}

	// Both cache and tsm files have the same key, cache takes precedence.
	if ckey == tkey {
		c.nextCache()
		c.nextTSM()
		return ckey, cvalue
	}

	// Buffered cache key precedes that in TSM file.
	if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) {
		c.nextCache()
		return ckey, cvalue
	}

	// Buffered TSM key precedes that in cache.
	c.nextTSM()
	return tkey, tvalue
}
// nextCache advances the cache position, stopping at one past the end.
func (c *unsignedAscendingCursor) nextCache() {
	if c.cache.pos < len(c.cache.values) {
		c.cache.pos++
	}
}
// nextTSM advances the TSM position, loading the next block from the key
// cursor when the current block is exhausted.
func (c *unsignedAscendingCursor) nextTSM() {
	c.tsm.pos++
	if c.tsm.pos >= len(c.tsm.values) {
		c.tsm.keyCursor.Next()
		c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.buf)
		if len(c.tsm.values) == 0 {
			// No more blocks: pos stays past the end so peekTSM reports EOF.
			return
		}
		c.tsm.pos = 0
	}
}
// unsignedDescendingCursor merges unsigned values from the in-memory cache
// and the TSM files for a single field, in descending time order.
type unsignedDescendingCursor struct {
	cache struct {
		values Values // cache values for the field
		pos    int    // current index into values; may go negative at EOF
	}

	tsm struct {
		buf       []UnsignedValue // scratch buffer reused by ReadUnsignedBlock
		values    []UnsignedValue // values of the current TSM block
		pos       int             // current index into values; may go negative at EOF
		keyCursor *KeyCursor      // cursor over the TSM blocks for the key
	}
}
// newUnsignedDescendingCursor returns a cursor over the merged cache and TSM
// values for a single field, iterating in descending time order starting from
// the newest timestamp <= seek.
func newUnsignedDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *unsignedDescendingCursor {
	c := &unsignedDescendingCursor{}

	c.cache.values = cacheValues
	// sort.Search finds the first value >= seek; for descending iteration we
	// want the last value <= seek, so step back when there is no exact match.
	c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
		return c.cache.values[i].UnixNano() >= seek
	})
	if t, _ := c.peekCache(); t != seek {
		c.cache.pos--
	}

	c.tsm.keyCursor = tsmKeyCursor
	c.tsm.buf = make([]UnsignedValue, 10)
	c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.buf)
	// Apply the same back-off within the first TSM block.
	c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
		return c.tsm.values[i].UnixNano() >= seek
	})
	if t, _ := c.peekTSM(); t != seek {
		c.tsm.pos--
	}

	return c
}
// peekCache returns the current time/value from the cache without advancing,
// or tsdb.EOF when the position is out of range.
func (c *unsignedDescendingCursor) peekCache() (t int64, v uint64) {
	if pos := c.cache.pos; pos >= 0 && pos < len(c.cache.values) {
		item := c.cache.values[pos]
		return item.UnixNano(), item.(UnsignedValue).value
	}
	return tsdb.EOF, 0
}
// peekTSM returns the current time/value from the TSM block without
// advancing, or tsdb.EOF when the position is out of range.
func (c *unsignedDescendingCursor) peekTSM() (t int64, v uint64) {
	if pos := c.tsm.pos; pos >= 0 && pos < len(c.tsm.values) {
		item := c.tsm.values[pos]
		return item.UnixNano(), item.value
	}
	return tsdb.EOF, 0
}
// close closes the cursor and any dependent cursors, releasing buffered data.
func (c *unsignedDescendingCursor) close() error {
	c.tsm.keyCursor.Close()
	c.tsm.keyCursor = nil
	c.cache.values = nil
	c.tsm.values = nil
	c.tsm.buf = nil
	return nil
}
// next returns the next key/value for the cursor.
func (c *unsignedDescendingCursor) next() (int64, interface{}) { return c.nextUnsigned() }

// nextUnsigned returns the next key/value for the cursor, merging the cache
// and TSM streams in descending time order (larger timestamps first). When
// both sides hold the same timestamp, the cache value wins and both advance.
func (c *unsignedDescendingCursor) nextUnsigned() (int64, uint64) {
	ckey, cvalue := c.peekCache()
	tkey, tvalue := c.peekTSM()

	// No more data in cache or in TSM files.
	if ckey == tsdb.EOF && tkey == tsdb.EOF {
		return tsdb.EOF, 0
	}

	// Both cache and tsm files have the same key, cache takes precedence.
	if ckey == tkey {
		c.nextCache()
		c.nextTSM()
		return ckey, cvalue
	}

	// Buffered cache key precedes that in TSM file.
	if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) {
		c.nextCache()
		return ckey, cvalue
	}

	// Buffered TSM key precedes that in cache.
	c.nextTSM()
	return tkey, tvalue
}
// nextCache moves the cache position backwards, stopping once it goes
// negative (EOF).
func (c *unsignedDescendingCursor) nextCache() {
	if c.cache.pos >= 0 {
		c.cache.pos--
	}
}
// nextTSM moves the TSM position backwards, advancing the key cursor to its
// next block when the current block is exhausted.
func (c *unsignedDescendingCursor) nextTSM() {
	c.tsm.pos--
	if c.tsm.pos < 0 {
		c.tsm.keyCursor.Next()
		c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedBlock(&c.tsm.buf)
		if len(c.tsm.values) == 0 {
			// No more blocks: pos stays negative so peekTSM reports EOF.
			return
		}
		// Descending iteration starts at the end of the new block.
		c.tsm.pos = len(c.tsm.values) - 1
	}
}
// unsignedLiteralCursor represents a cursor that always returns a single value.
// It does not have a time value so it can only be used with nextAt().
type unsignedLiteralCursor struct {
	value uint64
}

func (c *unsignedLiteralCursor) close() error                   { return nil }
func (c *unsignedLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *unsignedLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *unsignedLiteralCursor) nextAt(seek int64) interface{}  { return c.value }
// unsignedNilLiteralCursor represents a cursor that always returns a typed nil value.
// It does not have a time value so it can only be used with nextAt().
type unsignedNilLiteralCursor struct{}

func (c *unsignedNilLiteralCursor) close() error                   { return nil }
func (c *unsignedNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*uint64)(nil) }
func (c *unsignedNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*uint64)(nil) }
func (c *unsignedNilLiteralCursor) nextAt(seek int64) interface{}  { return (*uint64)(nil) }
type stringIterator struct {
cur stringCursor
aux []cursorAt

View File

@ -93,6 +93,8 @@ func (c *bufCursor) nextAt(seek int64) interface{} {
return (*float64)(nil)
case integerCursor:
return (*int64)(nil)
case unsignedCursor:
return (*uint64)(nil)
case stringCursor:
return (*string)(nil)
case booleanCursor:

View File

@ -13,6 +13,13 @@
"ValueType":"IntegerValue",
"Nil":"0"
},
{
"Name":"Unsigned",
"name":"unsigned",
"Type":"uint64",
"ValueType":"UnsignedValue",
"Nil":"0"
},
{
"Name":"String",
"name":"string",

View File

@ -12,6 +12,8 @@ func newLimitIterator(input influxql.Iterator, opt influxql.IteratorOptions) inf
return newFloatLimitIterator(input, opt)
case influxql.IntegerIterator:
return newIntegerLimitIterator(input, opt)
case influxql.UnsignedIterator:
return newUnsignedLimitIterator(input, opt)
case influxql.StringIterator:
return newStringLimitIterator(input, opt)
case influxql.BooleanIterator:
@ -34,6 +36,19 @@ func (c *floatCastIntegerCursor) nextFloat() (int64, float64) {
return t, float64(v)
}
// floatCastUnsignedCursor adapts an unsignedCursor to the float cursor
// interface by converting each uint64 value to float64.
// NOTE(review): values above 2^53 lose precision in the conversion.
type floatCastUnsignedCursor struct {
	cursor unsignedCursor
}

func (c *floatCastUnsignedCursor) close() error { return c.cursor.close() }

func (c *floatCastUnsignedCursor) next() (t int64, v interface{}) { return c.nextFloat() }

// nextFloat returns the next timestamp and the value converted to float64.
func (c *floatCastUnsignedCursor) nextFloat() (int64, float64) {
	t, v := c.cursor.nextUnsigned()
	return t, float64(v)
}
// integerCastFloatCursor adapts a floatCursor to the integer cursor
// interface, converting each float64 value to int64.
type integerCastFloatCursor struct {
	cursor floatCursor
}
@ -46,3 +61,42 @@ func (c *integerCastFloatCursor) nextInteger() (int64, int64) {
t, v := c.cursor.nextFloat()
return t, int64(v)
}
// integerCastUnsignedCursor adapts an unsignedCursor to the integer cursor
// interface by converting each uint64 value to int64.
// NOTE(review): values above math.MaxInt64 wrap to negative numbers.
type integerCastUnsignedCursor struct {
	cursor unsignedCursor
}

func (c *integerCastUnsignedCursor) close() error { return c.cursor.close() }

func (c *integerCastUnsignedCursor) next() (t int64, v interface{}) { return c.nextInteger() }

// nextInteger returns the next timestamp and the value converted to int64.
func (c *integerCastUnsignedCursor) nextInteger() (int64, int64) {
	t, v := c.cursor.nextUnsigned()
	return t, int64(v)
}
// unsignedCastFloatCursor adapts a floatCursor to the unsigned cursor
// interface by converting each float64 value to uint64.
// NOTE(review): negative or out-of-range floats produce an
// implementation-specific result per the Go spec — confirm inputs are valid.
type unsignedCastFloatCursor struct {
	cursor floatCursor
}

func (c *unsignedCastFloatCursor) close() error { return c.cursor.close() }

func (c *unsignedCastFloatCursor) next() (t int64, v interface{}) { return c.nextUnsigned() }

// nextUnsigned returns the next timestamp and the value converted to uint64.
func (c *unsignedCastFloatCursor) nextUnsigned() (int64, uint64) {
	t, v := c.cursor.nextFloat()
	return t, uint64(v)
}
// unsignedCastIntegerCursor adapts an integerCursor to the unsigned cursor
// interface by converting each int64 value to uint64.
// NOTE(review): negative integers wrap to large unsigned values.
type unsignedCastIntegerCursor struct {
	cursor integerCursor
}

func (c *unsignedCastIntegerCursor) close() error { return c.cursor.close() }

func (c *unsignedCastIntegerCursor) next() (t int64, v interface{}) { return c.nextUnsigned() }

// nextUnsigned returns the next timestamp and the value converted to uint64.
func (c *unsignedCastIntegerCursor) nextUnsigned() (int64, uint64) {
	t, v := c.cursor.nextInteger()
	return t, uint64(v)
}

View File

@ -176,6 +176,7 @@ type blockAccessor interface {
readBlock(entry *IndexEntry, values []Value) ([]Value, error)
readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error)
readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error)
@ -285,6 +286,14 @@ func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue)
return v, err
}
// ReadUnsignedBlockAt returns the unsigned integer values corresponding to the given index entry.
// The read is performed under the reader's read lock, so it may run
// concurrently with other readers.
func (t *TSMReader) ReadUnsignedBlockAt(entry *IndexEntry, vals *[]UnsignedValue) ([]UnsignedValue, error) {
	t.mu.RLock()
	v, err := t.accessor.readUnsignedBlock(entry, vals)
	t.mu.RUnlock()
	return v, err
}
// ReadStringBlockAt returns the string values corresponding to the given index entry.
func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) {
t.mu.RLock()
@ -1158,6 +1167,24 @@ func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValu
return a, nil
}
// readUnsignedBlock decodes the TSM block referenced by entry into values,
// reading from the memory-mapped file under the read lock. ErrTSMClosed is
// returned when the entry extends past the mapped region (e.g. the file has
// been closed or truncated).
func (m *mmapAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
	m.mu.RLock()
	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
		m.mu.RUnlock()
		return nil, ErrTSMClosed
	}
	// Skip the first 4 bytes of the block before decoding — presumably the
	// block checksum, consistent with the neighboring read*Block accessors.
	a, err := DecodeUnsignedBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
	m.mu.RUnlock()

	if err != nil {
		return nil, err
	}

	return a, nil
}
func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
m.mu.RLock()

View File

@ -74,6 +74,9 @@ func TestTSMReader_MMAP_ReadAll(t *testing.T) {
{"int", []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
tsm1.NewValue(1, true)},
},
@ -152,6 +155,9 @@ func TestTSMReader_MMAP_Read(t *testing.T) {
{"int", []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
tsm1.NewValue(1, true)},
},
@ -229,6 +235,9 @@ func TestTSMReader_MMAP_Keys(t *testing.T) {
{"int", []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
tsm1.NewValue(1, true)},
},
@ -1160,10 +1169,11 @@ func TestBlockIterator_Sorted(t *testing.T) {
}
values := map[string][]tsm1.Value{
"mem": []tsm1.Value{tsm1.NewValue(0, int64(1))},
"cpu": []tsm1.Value{tsm1.NewValue(1, float64(2))},
"disk": []tsm1.Value{tsm1.NewValue(1, true)},
"load": []tsm1.Value{tsm1.NewValue(1, "string")},
"mem": []tsm1.Value{tsm1.NewValue(0, int64(1))},
"cycles": []tsm1.Value{tsm1.NewValue(0, ^uint64(0))},
"cpu": []tsm1.Value{tsm1.NewValue(1, float64(2))},
"disk": []tsm1.Value{tsm1.NewValue(1, true)},
"load": []tsm1.Value{tsm1.NewValue(1, "string")},
}
for k, v := range values {
@ -1323,6 +1333,9 @@ func TestTSMReader_File_ReadAll(t *testing.T) {
{"int", []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
tsm1.NewValue(1, true)},
},
@ -1470,6 +1483,9 @@ func TestTSMReader_File_Read(t *testing.T) {
{"int", []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
tsm1.NewValue(1, true)},
},
@ -1547,6 +1563,9 @@ func TestTSMReader_References(t *testing.T) {
{"int", []tsm1.Value{
tsm1.NewValue(1, int64(1))},
},
{"uint", []tsm1.Value{
tsm1.NewValue(1, ^uint64(0))},
},
{"bool", []tsm1.Value{
tsm1.NewValue(1, true)},
},

View File

@ -36,10 +36,11 @@ const (
// walEncodeBufSize is the size of the wal entry encoding buffer
walEncodeBufSize = 4 * 1024 * 1024
float64EntryType = 1
integerEntryType = 2
booleanEntryType = 3
stringEntryType = 4
float64EntryType = 1
integerEntryType = 2
booleanEntryType = 3
stringEntryType = 4
unsignedEntryType = 5
)
// WalEntryType is a byte written to a wal segment file that indicates what the following compressed block contains.
@ -604,7 +605,7 @@ func (w *WriteWALEntry) MarshalSize() int {
encLen += 8 * len(v) // timestamps (8)
switch v[0].(type) {
case FloatValue, IntegerValue:
case FloatValue, IntegerValue, UnsignedValue:
encLen += 8 * len(v)
case BooleanValue:
encLen += 1 * len(v)
@ -667,6 +668,8 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
curType = float64EntryType
case IntegerValue:
curType = integerEntryType
case UnsignedValue:
curType = unsignedEntryType
case BooleanValue:
curType = booleanEntryType
case StringValue:
@ -701,6 +704,12 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
}
binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
n += 8
case UnsignedValue:
if curType != unsignedEntryType {
return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
}
binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
n += 8
case BooleanValue:
if curType != booleanEntryType {
return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
@ -792,6 +801,21 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error {
}
w.Values[k] = values
case unsignedEntryType:
if i+16*nvals > len(b) {
return ErrWALCorrupt
}
values := make([]Value, 0, nvals)
for j := 0; j < nvals; j++ {
un := int64(binary.BigEndian.Uint64(b[i : i+8]))
i += 8
v := binary.BigEndian.Uint64(b[i : i+8])
i += 8
values = append(values, NewUnsignedValue(un, v))
}
w.Values[k] = values
case booleanEntryType:
if i+9*nvals > len(b) {
return ErrWALCorrupt

View File

@ -6,9 +6,8 @@ import (
"os"
"testing"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/golang/snappy"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
func TestWALWriter_WriteMulti_Single(t *testing.T) {
@ -21,12 +20,14 @@ func TestWALWriter_WriteMulti_Single(t *testing.T) {
p2 := tsm1.NewValue(1, int64(1))
p3 := tsm1.NewValue(1, true)
p4 := tsm1.NewValue(1, "string")
p5 := tsm1.NewValue(1, ^uint64(0))
values := map[string][]tsm1.Value{
"cpu,host=A#!~#float": []tsm1.Value{p1},
"cpu,host=A#!~#int": []tsm1.Value{p2},
"cpu,host=A#!~#bool": []tsm1.Value{p3},
"cpu,host=A#!~#string": []tsm1.Value{p4},
"cpu,host=A#!~#float": []tsm1.Value{p1},
"cpu,host=A#!~#int": []tsm1.Value{p2},
"cpu,host=A#!~#bool": []tsm1.Value{p3},
"cpu,host=A#!~#string": []tsm1.Value{p4},
"cpu,host=A#!~#unsigned": []tsm1.Value{p5},
}
entry := &tsm1.WriteWALEntry{
@ -607,12 +608,14 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
p2 := tsm1.NewValue(1, int64(1))
p3 := tsm1.NewValue(1, true)
p4 := tsm1.NewValue(1, "string")
p5 := tsm1.NewValue(1, uint64(1))
values := map[string][]tsm1.Value{
"cpu,host=A#!~#float": []tsm1.Value{p1, p1},
"cpu,host=A#!~#int": []tsm1.Value{p2, p2},
"cpu,host=A#!~#bool": []tsm1.Value{p3, p3},
"cpu,host=A#!~#string": []tsm1.Value{p4, p4},
"cpu,host=A#!~#float": []tsm1.Value{p1, p1},
"cpu,host=A#!~#int": []tsm1.Value{p2, p2},
"cpu,host=A#!~#bool": []tsm1.Value{p3, p3},
"cpu,host=A#!~#string": []tsm1.Value{p4, p4},
"cpu,host=A#!~#unsigned": []tsm1.Value{p5, p5},
}
w := &tsm1.WriteWALEntry{

View File

@ -647,6 +647,8 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
fieldType = influxql.Float
case models.Integer:
fieldType = influxql.Integer
case models.Unsigned:
fieldType = influxql.Unsigned
case models.Boolean:
fieldType = influxql.Boolean
case models.String: