Close #521. Allow MODE to work on all datatypes
This change allows MODE to work on string as well as boolean and null values. It also introduces another numerical argument that specifies how many modal values it will return (although it is disabled).
pull/612/head
parent
527517b724
commit
cc5a5e86bd
|
@ -937,20 +937,20 @@ func NewPercentileAggregator(_ *parser.SelectQuery, value *parser.Value, default
|
|||
//
|
||||
|
||||
type ModeAggregatorState struct {
|
||||
counts map[float64]int
|
||||
modes []float64
|
||||
counts map[interface{}]int
|
||||
}
|
||||
|
||||
type ModeAggregator struct {
|
||||
AbstractAggregator
|
||||
defaultValue *protocol.FieldValue
|
||||
alias string
|
||||
size int
|
||||
}
|
||||
|
||||
func (self *ModeAggregator) AggregatePoint(state interface{}, p *protocol.Point) (interface{}, error) {
|
||||
s, ok := state.(*ModeAggregatorState)
|
||||
if !ok {
|
||||
s = &ModeAggregatorState{make(map[float64]int), nil}
|
||||
s = &ModeAggregatorState{make(map[interface{}]int)}
|
||||
}
|
||||
|
||||
point, err := GetValue(self.value, self.columns, p)
|
||||
|
@ -958,13 +958,17 @@ func (self *ModeAggregator) AggregatePoint(state interface{}, p *protocol.Point)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var value float64
|
||||
var value interface{}
|
||||
if point.Int64Value != nil {
|
||||
value = float64(*point.Int64Value)
|
||||
} else if point.DoubleValue != nil {
|
||||
value = *point.DoubleValue
|
||||
} else if point.BoolValue != nil {
|
||||
value = *point.BoolValue
|
||||
} else if point.StringValue != nil {
|
||||
value = *point.StringValue
|
||||
} else {
|
||||
return s, nil
|
||||
value = nil
|
||||
}
|
||||
|
||||
s.counts[value]++
|
||||
|
@ -978,49 +982,72 @@ func (self *ModeAggregator) ColumnNames() []string {
|
|||
return []string{"mode"}
|
||||
}
|
||||
|
||||
func (self *ModeAggregator) CalculateSummaries(state interface{}) {
|
||||
modes := []float64{}
|
||||
currentCount := 1
|
||||
|
||||
s := state.(*ModeAggregatorState)
|
||||
for value, count := range s.counts {
|
||||
if count == currentCount {
|
||||
modes = append(modes, value)
|
||||
} else if count > currentCount {
|
||||
modes = nil
|
||||
modes = append(modes, value)
|
||||
currentCount = count
|
||||
}
|
||||
}
|
||||
|
||||
s.modes = modes
|
||||
s.counts = nil
|
||||
}
|
||||
|
||||
func (self *ModeAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
|
||||
s := state.(*ModeAggregatorState)
|
||||
|
||||
counts := make([]int, len(s.counts))
|
||||
countMap := make(map[int][]interface{}, len(s.counts))
|
||||
for value, count := range s.counts {
|
||||
counts = append(counts, count)
|
||||
countMap[count] = append(countMap[count], value)
|
||||
}
|
||||
sort.Ints(counts)
|
||||
|
||||
returnValues := [][]*protocol.FieldValue{}
|
||||
|
||||
s, ok := state.(*ModeAggregatorState)
|
||||
if !ok || len(s.modes) == 0 {
|
||||
returnValues = append(returnValues, []*protocol.FieldValue{self.defaultValue})
|
||||
return returnValues
|
||||
}
|
||||
|
||||
for _, value := range s.modes {
|
||||
// we can't use value since we need a pointer to a variable that won't change,
|
||||
// while value will change the next iteration
|
||||
v := value
|
||||
returnValues = append(returnValues, []*protocol.FieldValue{
|
||||
&protocol.FieldValue{DoubleValue: &v},
|
||||
})
|
||||
last := 0
|
||||
for i := len(counts); i > 0; i-- {
|
||||
// counts can contain duplicates, but we only want to append each count-set once
|
||||
count := counts[i-1]
|
||||
if count == last {
|
||||
continue
|
||||
}
|
||||
last = count
|
||||
for _, value := range countMap[count] {
|
||||
switch v := value.(type) {
|
||||
case int:
|
||||
n := int64(v)
|
||||
returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{Int64Value: &n}})
|
||||
case string:
|
||||
returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{StringValue: &v}})
|
||||
case bool:
|
||||
returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{BoolValue: &v}})
|
||||
case float64:
|
||||
returnValues = append(returnValues, []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &v}})
|
||||
}
|
||||
}
|
||||
// size is really "minimum size"
|
||||
if len(returnValues) >= self.size {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return returnValues
|
||||
}
|
||||
|
||||
func NewModeAggregator(_ *parser.SelectQuery, value *parser.Value, defaultValue *parser.Value) (Aggregator, error) {
|
||||
if len(value.Elems) != 1 {
|
||||
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function mode() requires exactly one argument")
|
||||
if len(value.Elems) < 1 {
|
||||
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function mode() requires at least one argument")
|
||||
}
|
||||
|
||||
if len(value.Elems) > 2 {
|
||||
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function mode() takes at most two arguments")
|
||||
}
|
||||
|
||||
size := 1
|
||||
if len(value.Elems) == 2 {
|
||||
switch value.Elems[1].Type {
|
||||
case parser.ValueInt, parser.ValueFloat:
|
||||
var err error
|
||||
_size := int64(1)
|
||||
_size, err = strconv.ParseInt(value.Elems[1].Name, 10, 32)
|
||||
size = int(_size)
|
||||
if err != nil {
|
||||
return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into an int", value.Elems[1].Name)
|
||||
}
|
||||
default:
|
||||
return nil, common.NewQueryError(common.InvalidArgument, "Cannot parse %s into a int", value.Elems[1].Name)
|
||||
}
|
||||
}
|
||||
|
||||
wrappedDefaultValue, err := wrapDefaultValue(defaultValue)
|
||||
|
@ -1034,6 +1061,7 @@ func NewModeAggregator(_ *parser.SelectQuery, value *parser.Value, defaultValue
|
|||
},
|
||||
defaultValue: wrappedDefaultValue,
|
||||
alias: value.Alias,
|
||||
size: size,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue