Merged pull request #685 from influxdata/nc-hist-quantile
feat: Add histogramQuantile function
commit 796fd7217e
@ -1401,6 +1401,48 @@ from(db:"telegraf")
        r.service == "app-server")
```

#### HistogramQuantile

HistogramQuantile approximates a quantile given a histogram that approximates the cumulative distribution of the dataset.
Each input table represents a single histogram.
The histogram tables must have two columns: a count column and an upper bound column.
The count is the number of values that are less than or equal to the upper bound value.
The table can have any number of records, each representing an entry in the histogram.
The counts must be monotonically increasing when sorted by upper bound.
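For illustration, a single histogram table (group key columns omitted, values made up) might look like the following, with the upper bound column named `le` and the count column `_value` as in the example below:

| le   | _value |
|------|--------|
| 0.1  | 1      |
| 0.5  | 5      |
| 1.0  | 10     |
| +Inf | 10     |

Here 5 of the 10 recorded values were less than or equal to 0.5, and every value was less than or equal to 1.0.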
Linear interpolation between the two closest bounds is used to compute the quantile.
If either of the bounds used in interpolation is infinite, then the other, finite bound is used and no interpolation is performed.
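As a worked example, suppose the histogram has a total count of 10 and `quantile: 0.90`, so the target rank is 9. If that rank falls between a bucket with count 8 and upper bound 0.8 and the next bucket with count 10 and upper bound 0.9, the estimate is 0.8 + (0.9 - 0.8) * (9 - 8) / (10 - 8) = 0.85.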
The output table will have the same group key as the input table.
The columns not part of the group key will be removed and a single value column of type float will be added.
The count and upper bound columns must not be part of the group key.
The value column represents the value of the desired quantile from the histogram.
HistogramQuantile has the following properties:

* `quantile` float
    Quantile is a value between 0 and 1 indicating the desired quantile to compute.
* `countColumn` string
    CountColumn is the name of the column containing the histogram counts.
    The count column type must be float.
    Defaults to `_value`.
* `upperBoundColumn` string
    UpperBoundColumn is the name of the column containing the histogram upper bounds.
    The upper bound column type must be float.
* `valueColumn` string
    ValueColumn is the name of the output column which will contain the computed quantile.
    Defaults to `_value`.
* `minValue` float
    MinValue is the assumed minimum value of the dataset.
    When the quantile falls below the lowest upper bound, interpolation is performed between minValue and the lowest upper bound.
    When minValue is equal to negative infinity, the lowest upper bound is used.
    Defaults to 0.

Example:

    histogramQuantile(quantile:0.9, upperBoundColumn:"le") // compute the 90th quantile using histogram data.
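A fuller sketch of how the function might be used in a query (the database name and time range are illustrative, and the histogram data is assumed to carry its upper bounds in a column named `le`):

```
from(db:"testdb")
    |> range(start: 2018-05-22T19:53:00Z)
    |> histogramQuantile(quantile:0.90, upperBoundColumn:"le")
```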
#### Limit

Limit caps the number of records in output tables to a fixed size n.
@ -0,0 +1,302 @@
|
|||
package functions
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/plan"
|
||||
"github.com/influxdata/platform/query/semantic"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const HistogramQuantileKind = "histogramQuantile"
|
||||
|
||||
type HistogramQuantileOpSpec struct {
|
||||
Quantile float64 `json:"quantile"`
|
||||
CountColumn string `json:"countColumn"`
|
||||
UpperBoundColumn string `json:"upperBoundColumn"`
|
||||
ValueColumn string `json:"valueColumn"`
|
||||
MinValue float64 `json:"minValue"`
|
||||
}
|
||||
|
||||
var histogramQuantileSignature = query.DefaultFunctionSignature()
|
||||
|
||||
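// init registers the histogramQuantile function with the query engine:
// its call signature, operation spec, procedure spec, and execution
// transformation.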
func init() {
|
||||
histogramQuantileSignature.Params["quantile"] = semantic.Float
|
||||
histogramQuantileSignature.Params["countColumn"] = semantic.String
|
||||
histogramQuantileSignature.Params["upperBoundColumn"] = semantic.String
|
||||
histogramQuantileSignature.Params["valueColumn"] = semantic.String
|
||||
histogramQuantileSignature.Params["minValue"] = semantic.Float
|
||||
|
||||
query.RegisterFunction(HistogramQuantileKind, createHistogramQuantileOpSpec, histogramQuantileSignature)
|
||||
query.RegisterOpSpec(HistogramQuantileKind, newHistogramQuantileOp)
|
||||
plan.RegisterProcedureSpec(HistogramQuantileKind, newHistogramQuantileProcedure, HistogramQuantileKind)
|
||||
execute.RegisterTransformation(HistogramQuantileKind, createHistogramQuantileTransformation)
|
||||
}
|
||||
func createHistogramQuantileOpSpec(args query.Arguments, a *query.Administration) (query.OperationSpec, error) {
|
||||
if err := a.AddParentFromArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s := new(HistogramQuantileOpSpec)
|
||||
q, err := args.GetRequiredFloat("quantile")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.Quantile = q
|
||||
|
||||
if col, ok, err := args.GetString("countColumn"); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
s.CountColumn = col
|
||||
} else {
|
||||
s.CountColumn = execute.DefaultValueColLabel
|
||||
}
|
||||
|
||||
if col, ok, err := args.GetString("upperBoundColumn"); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
s.UpperBoundColumn = col
|
||||
}
|
||||
|
||||
if col, ok, err := args.GetString("valueColumn"); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
s.ValueColumn = col
|
||||
} else {
|
||||
s.ValueColumn = execute.DefaultValueColLabel
|
||||
}
|
||||
|
||||
if min, ok, err := args.GetFloat("minValue"); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
s.MinValue = min
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func newHistogramQuantileOp() query.OperationSpec {
|
||||
return new(HistogramQuantileOpSpec)
|
||||
}
|
||||
|
||||
func (s *HistogramQuantileOpSpec) Kind() query.OperationKind {
|
||||
return HistogramQuantileKind
|
||||
}
|
||||
|
||||
type HistogramQuantileProcedureSpec struct {
|
||||
Quantile float64 `json:"quantile"`
|
||||
CountColumn string `json:"countColumn"`
|
||||
UpperBoundColumn string `json:"upperBoundColumn"`
|
||||
ValueColumn string `json:"valueColumn"`
|
||||
MinValue float64 `json:"minValue"`
|
||||
}
|
||||
|
||||
func newHistogramQuantileProcedure(qs query.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
|
||||
spec, ok := qs.(*HistogramQuantileOpSpec)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid spec type %T", qs)
|
||||
}
|
||||
return &HistogramQuantileProcedureSpec{
|
||||
Quantile: spec.Quantile,
|
||||
CountColumn: spec.CountColumn,
|
||||
UpperBoundColumn: spec.UpperBoundColumn,
|
||||
ValueColumn: spec.ValueColumn,
|
||||
MinValue: spec.MinValue,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *HistogramQuantileProcedureSpec) Kind() plan.ProcedureKind {
|
||||
return HistogramQuantileKind
|
||||
}
|
||||
func (s *HistogramQuantileProcedureSpec) Copy() plan.ProcedureSpec {
|
||||
ns := new(HistogramQuantileProcedureSpec)
|
||||
*ns = *s
|
||||
return ns
|
||||
}
|
||||
|
||||
type histogramQuantileTransformation struct {
|
||||
d execute.Dataset
|
||||
cache execute.TableBuilderCache
|
||||
|
||||
spec HistogramQuantileProcedureSpec
|
||||
}
|
||||
|
||||
type bucket struct {
|
||||
count float64
|
||||
upperBound float64
|
||||
}
|
||||
|
||||
func createHistogramQuantileTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
|
||||
s, ok := spec.(*HistogramQuantileProcedureSpec)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid spec type %T", spec)
|
||||
}
|
||||
cache := execute.NewTableBuilderCache(a.Allocator())
|
||||
d := execute.NewDataset(id, mode, cache)
|
||||
t := NewHistorgramQuantileTransformation(d, cache, s)
|
||||
return t, d, nil
|
||||
}
|
||||
|
||||
func NewHistorgramQuantileTransformation(
|
||||
d execute.Dataset,
|
||||
cache execute.TableBuilderCache,
|
||||
spec *HistogramQuantileProcedureSpec,
|
||||
) execute.Transformation {
|
||||
return &histogramQuantileTransformation{
|
||||
d: d,
|
||||
cache: cache,
|
||||
spec: *spec,
|
||||
}
|
||||
}
|
||||
|
||||
func (t histogramQuantileTransformation) RetractTable(id execute.DatasetID, key query.GroupKey) error {
|
||||
// TODO
|
||||
return nil
|
||||
}
|
||||
|
||||
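// Process treats each input table as a single cumulative histogram: it
// validates the count and upper bound columns, reads the buckets, sorts
// them by upper bound if necessary, and emits one row per group key
// containing the computed quantile.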
func (t histogramQuantileTransformation) Process(id execute.DatasetID, tbl query.Table) error {
|
||||
builder, created := t.cache.TableBuilder(tbl.Key())
|
||||
if !created {
|
||||
return fmt.Errorf("histogramQuantile found duplicate table with key: %v", tbl.Key())
|
||||
}
|
||||
|
||||
execute.AddTableKeyCols(tbl.Key(), builder)
|
||||
valueIdx := builder.AddCol(query.ColMeta{
|
||||
Label: t.spec.ValueColumn,
|
||||
Type: query.TFloat,
|
||||
})
|
||||
|
||||
countIdx := execute.ColIdx(t.spec.CountColumn, tbl.Cols())
|
||||
if countIdx < 0 {
|
||||
return fmt.Errorf("table is missing count column %q", t.spec.CountColumn)
|
||||
}
|
||||
if tbl.Cols()[countIdx].Type != query.TFloat {
|
||||
return fmt.Errorf("count column %q must be of type float", t.spec.CountColumn)
|
||||
}
|
||||
upperBoundIdx := execute.ColIdx(t.spec.UpperBoundColumn, tbl.Cols())
|
||||
if upperBoundIdx < 0 {
|
||||
return fmt.Errorf("table is missing upper bound column %q", t.spec.UpperBoundColumn)
|
||||
}
|
||||
if tbl.Cols()[upperBoundIdx].Type != query.TFloat {
|
||||
return fmt.Errorf("upper bound column %q must be of type float", t.spec.UpperBoundColumn)
|
||||
}
|
||||
// Read buckets
|
||||
var cdf []bucket
|
||||
sorted := true //track if the cdf was naturally sorted
|
||||
err := tbl.Do(func(cr query.ColReader) error {
|
||||
offset := len(cdf)
|
||||
// Grow cdf by number of rows
|
||||
l := offset + cr.Len()
|
||||
if cap(cdf) < l {
|
||||
cpy := make([]bucket, l, l*2)
|
||||
// Copy existing buckets to new slice
|
||||
copy(cpy, cdf)
|
||||
cdf = cpy
|
||||
} else {
|
||||
cdf = cdf[:l]
|
||||
}
|
||||
for i := 0; i < cr.Len(); i++ {
|
||||
curr := i + offset
|
||||
prev := curr - 1
|
||||
cdf[curr] = bucket{
|
||||
count: cr.Floats(countIdx)[i],
|
||||
upperBound: cr.Floats(upperBoundIdx)[i],
|
||||
}
|
||||
if prev >= 0 {
|
||||
sorted = sorted && cdf[prev].upperBound <= cdf[curr].upperBound
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !sorted {
|
||||
sort.Slice(cdf, func(i, j int) bool {
|
||||
return cdf[i].upperBound < cdf[j].upperBound
|
||||
})
|
||||
}
|
||||
|
||||
q, err := t.computeQuantile(cdf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
execute.AppendKeyValues(tbl.Key(), builder)
|
||||
builder.AppendFloat(valueIdx, q)
|
||||
return nil
|
||||
}
|
||||
|
||||
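// computeQuantile locates the pair of buckets that brackets the desired
// rank (quantile * total count) and linearly interpolates between their
// upper bounds. Infinite bounds fall back to the neighboring finite bound,
// and a rank below the lowest bucket interpolates from the configured
// MinValue.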
func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (float64, error) {
|
||||
if len(cdf) == 0 {
|
||||
return 0, errors.New("histogram is empty")
|
||||
}
|
||||
// Find rank index and check counts are monotonic
|
||||
prevCount := 0.0
|
||||
totalCount := cdf[len(cdf)-1].count
|
||||
rank := t.spec.Quantile * totalCount
|
||||
rankIdx := -1
|
||||
for i, b := range cdf {
|
||||
if b.count < prevCount {
|
||||
return 0, errors.New("histogram records counts are not monotonic")
|
||||
}
|
||||
prevCount = b.count
|
||||
|
||||
if rank >= b.count {
|
||||
rankIdx = i
|
||||
}
|
||||
}
|
||||
var (
|
||||
lowerCount,
|
||||
lowerBound,
|
||||
upperCount,
|
||||
upperBound float64
|
||||
)
|
||||
switch rankIdx {
|
||||
case -1:
|
||||
// Quantile is below the lowest upper bound, interpolate using the min value
|
||||
lowerCount = 0
|
||||
lowerBound = t.spec.MinValue
|
||||
upperCount = cdf[0].count
|
||||
upperBound = cdf[0].upperBound
|
||||
case len(cdf) - 1:
|
||||
// Quantile is above the highest upper bound, simply return it as it must be finite
|
||||
return cdf[len(cdf)-1].upperBound, nil
|
||||
default:
|
||||
lowerCount = cdf[rankIdx].count
|
||||
lowerBound = cdf[rankIdx].upperBound
|
||||
upperCount = cdf[rankIdx+1].count
|
||||
upperBound = cdf[rankIdx+1].upperBound
|
||||
}
|
||||
if rank == lowerCount {
|
||||
// No need to interpolate
|
||||
return lowerBound, nil
|
||||
}
|
||||
if math.IsInf(lowerBound, -1) {
|
||||
// We cannot interpolate with infinity
|
||||
return upperBound, nil
|
||||
}
|
||||
if math.IsInf(upperBound, 1) {
|
||||
// We cannot interpolate with infinity
|
||||
return lowerBound, nil
|
||||
}
|
||||
// Compute quantile using linear interpolation
|
||||
scale := (rank - lowerCount) / (upperCount - lowerCount)
|
||||
return lowerBound + (upperBound-lowerBound)*scale, nil
|
||||
}
|
||||
|
||||
func (t histogramQuantileTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error {
|
||||
return t.d.UpdateWatermark(mark)
|
||||
}
|
||||
|
||||
func (t histogramQuantileTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error {
|
||||
return t.d.UpdateProcessingTime(pt)
|
||||
}
|
||||
|
||||
func (t histogramQuantileTransformation) Finish(id execute.DatasetID, err error) {
|
||||
t.d.Finish(err)
|
||||
}
|
|
@ -0,0 +1,405 @@
|
|||
package functions_test
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/execute/executetest"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
)
|
||||
|
||||
var linearHist = []query.Table{&executetest.Table{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "le", Type: query.TFloat},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.1, 1.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.2, 2.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.3, 3.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.4, 4.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.5, 5.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.6, 6.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.7, 7.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.8, 8.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.9, 9.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 1.0, 10.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), math.Inf(1), 10.0},
|
||||
},
|
||||
}}
|
||||
var linearHistNoMax = []query.Table{&executetest.Table{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "le", Type: query.TFloat},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.2, 2.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.4, 4.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.6, 6.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.8, 8.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 1.0, 10.0},
|
||||
},
|
||||
}}
|
||||
var unsortedOddHist = []query.Table{&executetest.Table{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "le", Type: query.TFloat},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.4, 4.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 1.0, 10.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.6, 6.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.2, 2.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.8, 10.0},
|
||||
},
|
||||
}}
|
||||
var nonLinearHist = []query.Table{&executetest.Table{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "le", Type: query.TFloat},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.1, 1.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 0.5, 5.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), 1.0, 10.0},
|
||||
{execute.Time(1), execute.Time(3), execute.Time(1), math.Inf(1), 11.0},
|
||||
},
|
||||
}}
|
||||
|
||||
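// TestHistogramQuantile_Process checks the computed quantile against several
// reference histograms, including unsorted buckets, non-linear bucket
// layouts, and histograms without an infinite upper bound.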
func TestHistogramQuantile_Process(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
spec *functions.HistogramQuantileProcedureSpec
|
||||
data []query.Table
|
||||
want []*executetest.Table
|
||||
}{
|
||||
{
|
||||
name: "90th linear",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.9,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: linearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.9},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "0th linear",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.0,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: linearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.0},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "5th linear",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.05,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: linearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.05},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "5th linear -0.1 min value",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.05,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
MinValue: -0.1,
|
||||
},
|
||||
data: linearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.0},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "5th linear -inf min value",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.05,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
MinValue: math.Inf(-1),
|
||||
},
|
||||
data: linearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.1},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "10th linear",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.1,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: linearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.1},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "95th linear",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.95,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: linearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.95},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "99.999th linear",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.99999,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: linearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.99999},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "100th linear",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 1.0,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: linearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), math.Inf(1)},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "100th linear no max",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 1.0,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: linearHistNoMax,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 1.0},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "90th linear unsorted odd",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.9,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: unsortedOddHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.75},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "100th linear unsorted odd",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 1.0,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: unsortedOddHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 1.0},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "90th nonlinear",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.90,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: nonLinearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 0.99},
|
||||
},
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "highest finite upper bound nonlinear",
|
||||
spec: &functions.HistogramQuantileProcedureSpec{
|
||||
Quantile: 0.99,
|
||||
CountColumn: "_value",
|
||||
UpperBoundColumn: "le",
|
||||
ValueColumn: "_value",
|
||||
},
|
||||
data: nonLinearHist,
|
||||
want: []*executetest.Table{{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
{Label: "_stop", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(1), execute.Time(3), 1.0},
|
||||
},
|
||||
}},
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
executetest.ProcessTestHelper(
|
||||
t,
|
||||
tc.data,
|
||||
tc.want,
|
||||
nil,
|
||||
func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
|
||||
return functions.NewHistorgramQuantileTransformation(d, c, tc.spec)
|
||||
},
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -51,7 +51,7 @@ var medianBuiltin = `
// median returns the 50th percentile.
// By default an approximate percentile is computed, this can be disabled by passing exact:true.
// Using the exact method requires that the entire data set can fit in memory.
median = (method="estimate_tdigest", compression=0.0, table=<-) => percentile(table:table, p:0.5, method:method, compression:compression)
median = (method="estimate_tdigest", compression=0.0, table=<-) => percentile(table:table, percentile:0.5, method:method, compression:compression)
`

func createPercentileOpSpec(args query.Arguments, a *query.Administration) (query.OperationSpec, error) {
@ -60,7 +60,7 @@ func createPercentileOpSpec(args query.Arguments, a *query.Administration) (quer
	}

	spec := new(PercentileOpSpec)
	p, err := args.GetRequiredFloat("p")
	p, err := args.GetRequiredFloat("percentile")
	if err != nil {
		return nil, err
	}
@ -0,0 +1,3 @@
from(db:"testdb")
    |> range(start: 2018-05-22T19:53:00Z)
    |> histogramQuantile(quantile:0.90,upperBoundColumn:"le")
@ -0,0 +1,21 @@
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,double,double
#group,false,false,true,true,true,true,false,false
#default,_result,,,,,,,
,result,table,_start,_stop,_time,_field,_value,le
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,1,0.1
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,2,0.2
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,2,0.3
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,2,0.4
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,2,0.5
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,2,0.6
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,2,0.7
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,8,0.8
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,10,0.9
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,10,+Inf
,,2,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,y_duration_seconds,0,-Inf
,,2,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,y_duration_seconds,10,0.2
,,2,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,y_duration_seconds,15,0.4
,,2,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,y_duration_seconds,25,0.6
,,2,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,y_duration_seconds,35,0.8
,,2,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,y_duration_seconds,45,1.0
,,2,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,y_duration_seconds,45,+Inf
@ -0,0 +1,7 @@
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,double
#group,false,false,true,true,true,true,false
#default,_result,,,,,,
,result,table,_start,_stop,_time,_field,_value
,,0,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,x_duration_seconds,0.8500000000000001
,,1,2018-05-22T19:53:00Z,2018-05-22T19:54:00Z,2018-05-22T19:53:00Z,y_duration_seconds,0.91
@ -1,6 +1,6 @@
from(db: "test")
    |> range(start: 2018-05-22T19:50:26Z)
    |> group(by: ["_measurement", "_start"])
    |> percentile(p:0.75, method:"exact_selector")
    |> percentile(percentile:0.75, method:"exact_selector")
    |> map(fn: (r) => {_time: r._time, percentile: r._value})
    |> yield(name:"0")
    |> yield(name:"0")