implemented median aggregation

pull/2411/head
ben hockey 2015-04-23 17:14:09 -05:00
parent 6571f95ea5
commit 40af5fd1e3
1 changed files with 130 additions and 0 deletions

View File

@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand"
"sort"
)
@ -60,6 +61,8 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
return MapSum, nil
case "mean":
return MapMean, nil
case "median":
return MapStddev, nil
case "min":
return MapMin, nil
case "max":
@ -93,6 +96,8 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
return ReduceSum, nil
case "mean":
return ReduceMean, nil
case "median":
return ReduceMedian, nil
case "min":
return ReduceMin, nil
case "max":
@ -162,6 +167,12 @@ func InitializeUnmarshaller(c *Call) (UnmarshalFunc, error) {
err := json.Unmarshal(b, &val)
return val, err
}, nil
case "median":
return func(b []byte) (interface{}, error) {
a := make([]float64, 0)
err := json.Unmarshal(b, &a)
return a, err
}, nil
default:
return func(b []byte) (interface{}, error) {
var val interface{}
@ -254,6 +265,125 @@ func ReduceMean(values []interface{}) interface{} {
return nil
}
// ReduceMedian computes the median of values
func ReduceMedian(values []interface{}) interface{} {
var data []float64
// Collect all the data points
for _, value := range values {
if value == nil {
continue
}
data = append(data, value.([]float64)...)
}
length := len(data)
if length < 2 {
if length == 0 {
return nil
}
return data[0]
}
middle := length / 2
var sortedRange []float64
if length%2 == 0 {
sortedRange = getSortedRange(data, middle-1, 2)
var low, high = sortedRange[0], sortedRange[1]
return low + (high-low)/2
} else {
sortedRange = getSortedRange(data, middle, 1)
return sortedRange[0]
}
}
func getSortedRange(data []float64, start int, count int) []float64 {
out := discardLowerRange(data, start)
discards := len(out) - count
if discards > 0 {
out = discardUpperRange(out, discards)
}
sort.Float64s(out)
return out
}
func discardLowerRange(data []float64, discards int) []float64 {
var out []float64
// discard values lower than the desired range
for discards > 0 {
lows, pivotValue, highs := partition(data)
lowLength := len(lows)
if lowLength > discards {
// keep all the highs and the pivot
out = append(out, pivotValue)
out = append(out, highs...)
// iterate over the lows again
data = lows
} else {
// discard all the lows
data = highs
discards -= lowLength
if discards == 0 {
// if discarded enough lows, keep the pivot
out = append(out, pivotValue)
} else {
// able to discard the pivot too
discards--
}
}
}
return append(out, data...)
}
func discardUpperRange(data []float64, discards int) []float64 {
var out []float64
// discard values higher than the desired range
for discards > 0 {
lows, pivotValue, highs := partition(data)
highLength := len(highs)
if highLength > discards {
// keep all the lows and the pivot
out = append(out, pivotValue)
out = append(out, lows...)
// iterate over the highs again
data = highs
} else {
// discard all the highs
data = lows
discards -= highLength
if discards == 0 {
// if discarded enough highs, keep the pivot
out = append(out, pivotValue)
} else {
// able to discard the pivot too
discards--
}
}
}
return append(out, data...)
}
func partition(data []float64) (lows []float64, pivotValue float64, highs []float64) {
length := len(data)
pivotIndex := rand.Int() % length
pivotValue = data[pivotIndex]
// partition the data around the pivot
for i, value := range data {
if i != pivotIndex {
if value < pivotValue {
lows = append(lows, value)
} else {
highs = append(highs, value)
}
}
}
return lows, pivotValue, highs
}
// MapMin collects the values to pass to the reducer
func MapMin(itr Iterator) interface{} {
var min float64