Handle missing values in aggregate derivative queries better

If an aggregate derivative query did not have a value in the first
time bucket, it would abort early and return a single row with value
of 0.  Similarly, if either the current or previous value was nil,
it would skip the row and not append any values causing gaps and
no data to show up.

Instead, this will append a nil value if either the current or previous
valis is nil.  This essentially allows nil values to carry through the
results as well as gives a more sensible value for rows where we cannot
compute a difference (instead of dropping the row as before).

Fixes #4237 #4263
pull/4292/head
Jason Wilder 2015-10-01 11:17:56 -06:00
parent 970094d4a3
commit 06c143c2dd
3 changed files with 130 additions and 27 deletions

View File

@ -30,6 +30,8 @@
- [#4264](https://github.com/influxdb/influxdb/issues/4264): Refactor map functions to use list of values
- [#4278](https://github.com/influxdb/influxdb/pull/4278): Fix error marshalling across the cluster
- [#4149](https://github.com/influxdb/influxdb/pull/4149): Fix derivative unnecessarily requires aggregate function. Thanks @peekeri!
- [#4237](https://github.com/influxdb/influxdb/issues/4237): DERIVATIVE() edge conditions
- [#4263](https://github.com/influxdb/influxdb/issues/4263): derivative does not work when data is missing
## v0.9.4 [2015-09-14]

View File

@ -926,16 +926,15 @@ type RawQueryDerivativeProcessor struct {
DerivativeInterval time.Duration
}
func (rqdp *RawQueryDerivativeProcessor) canProcess(input []*MapperValue) bool {
// If we only have 1 value, then the value did not change, so return
// a single row with 0.0
if len(input) == 1 {
func (rqdp *RawQueryDerivativeProcessor) canProcess(input *MapperValue) bool {
// Cannot process a nil value
if input == nil {
return false
}
// See if the field value is numeric, if it's not, we can't process the derivative
validType := false
switch input[0].Value.(type) {
switch input.Value.(type) {
case int64:
validType = true
case float64:
@ -950,7 +949,7 @@ func (rqdp *RawQueryDerivativeProcessor) Process(input []*MapperValue) []*Mapper
return input
}
if !rqdp.canProcess(input) {
if len(input) == 1 {
return []*MapperValue{
&MapperValue{
Time: input[0].Time,
@ -967,6 +966,16 @@ func (rqdp *RawQueryDerivativeProcessor) Process(input []*MapperValue) []*Mapper
for i := 1; i < len(input); i++ {
v := input[i]
// If we can't use the current or prev value (wrong time, nil), just append
// nil
if !rqdp.canProcess(v) || !rqdp.canProcess(rqdp.LastValueFromPreviousChunk) {
derivativeValues = append(derivativeValues, &MapperValue{
Time: v.Time,
Value: nil,
})
continue
}
// Calculate the derivative of successive points by dividing the difference
// of each value by the elapsed time normalized to the interval
diff := int64toFloat64(v.Value) - int64toFloat64(rqdp.LastValueFromPreviousChunk.Value)
@ -1044,22 +1053,6 @@ func ProcessAggregateDerivative(results [][]interface{}, isNonNegative bool, int
}
}
// Check the value's type to ensure it's an numeric, if not, return a 0 result. We only check the first value
// because derivatives cannot be combined with other aggregates currently.
validType := false
switch results[0][1].(type) {
case int64:
validType = true
case float64:
validType = true
}
if !validType {
return [][]interface{}{
[]interface{}{results[0][0], 0.0},
}
}
// Otherwise calculate the derivatives as the difference between consecutive
// points divided by the elapsed time. Then normalize to the requested
// interval.
@ -1068,7 +1061,28 @@ func ProcessAggregateDerivative(results [][]interface{}, isNonNegative bool, int
prev := results[i-1]
cur := results[i]
if cur[1] == nil || prev[1] == nil {
// If current value is nil, append nil for the value
if prev[1] == nil || cur[1] == nil {
derivatives = append(derivatives, []interface{}{
cur[0], nil,
})
continue
}
// Check the value's type to ensure it's an numeric, if not, return a nil result. We only check the first value
// because derivatives cannot be combined with other aggregates currently.
validType := false
switch cur[1].(type) {
case int64:
validType = true
case float64:
validType = true
}
if !validType {
derivatives = append(derivatives, []interface{}{
cur[0], nil,
})
continue
}

View File

@ -860,7 +860,43 @@ func TestProcessAggregateDerivative(t *testing.T) {
},
exp: [][]interface{}{
[]interface{}{
time.Unix(0, 0), 0.0,
time.Unix(0, 0).Add(24 * time.Hour), nil,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), nil,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), nil,
},
},
},
{
name: "bool derivatives",
fn: "derivative",
interval: 24 * time.Hour,
in: [][]interface{}{
[]interface{}{
time.Unix(0, 0), "1.0",
},
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), true,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), true,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), true,
},
},
exp: [][]interface{}{
[]interface{}{
time.Unix(0, 0).Add(24 * time.Hour), nil,
},
[]interface{}{
time.Unix(0, 0).Add(48 * time.Hour), nil,
},
[]interface{}{
time.Unix(0, 0).Add(72 * time.Hour), nil,
},
},
},
@ -1122,9 +1158,54 @@ func TestProcessRawQueryDerivative(t *testing.T) {
},
},
exp: []*tsdb.MapperValue{
{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Value: nil,
},
{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Value: nil,
},
{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Value: nil,
},
},
},
{
name: "bool derivatives",
fn: "derivative",
interval: 24 * time.Hour,
in: []*tsdb.MapperValue{
{
Time: time.Unix(0, 0).Unix(),
Value: 0.0,
Value: true,
},
{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Value: true,
},
{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Value: false,
},
{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Value: false,
},
},
exp: []*tsdb.MapperValue{
{
Time: time.Unix(0, 0).Add(24 * time.Hour).UnixNano(),
Value: nil,
},
{
Time: time.Unix(0, 0).Add(48 * time.Hour).UnixNano(),
Value: nil,
},
{
Time: time.Unix(0, 0).Add(72 * time.Hour).UnixNano(),
Value: nil,
},
},
},
@ -1142,8 +1223,14 @@ func TestProcessRawQueryDerivative(t *testing.T) {
}
for i := 0; i < len(test.exp); i++ {
if test.exp[i].Time != got[i].Time || math.Abs((test.exp[i].Value.(float64)-got[i].Value.(float64))) > 0.0000001 {
t.Fatalf("RawQueryDerivativeProcessor - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp)
if v, ok := test.exp[i].Value.(float64); ok {
if test.exp[i].Time != got[i].Time || math.Abs((v-got[i].Value.(float64))) > 0.0000001 {
t.Fatalf("RawQueryDerivativeProcessor - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp)
}
} else {
if test.exp[i].Time != got[i].Time || test.exp[i].Value != got[i].Value {
t.Fatalf("RawQueryDerivativeProcessor - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp)
}
}
}
}