Close #551. Fix #409. Add aggregate functions top and low

This patch provides the aggregate functions top and low. These functions are
useful when executing analytics or summary queries.

Added functions:
  low(columns..., limit)
  top(columns..., limit)

For instance:
  `select low(value, 10) from series` will output 10 `value` values in ascending order.
  `select top(value, 10) from series` will output 10 `value` values in descending order.

How to build:
  go get github.com/ryszard/goskiplist/skiplist
  go build src/daemon.go

Currently, these functions support the float64 and int64 types.

Limitations:
  * can't handle duplicate values correctly (goskiplist limitation)

Misc:
  * each column should support alias.
    e.g) low(score as low_score, 10)

Also, I'm not fluent in English or Golang, so if you find any mistakes,
please feel free to fix them.
pull/547/merge
Shuhei Tanuma 2014-05-18 00:40:03 +09:00 committed by John Shahid
parent a4ece2a897
commit d382ea0b3e
2 changed files with 491 additions and 0 deletions

View File

@ -44,6 +44,8 @@ func init() {
registeredAggregators["distinct"] = NewDistinctAggregator
registeredAggregators["first"] = NewFirstAggregator
registeredAggregators["last"] = NewLastAggregator
registeredAggregators["top"] = NewTopAggregator
registeredAggregators["bottom"] = NewBottomAggregator
}
// used in testing to get a list of all aggregators
@ -1091,3 +1093,198 @@ func NewFirstAggregator(_ *parser.SelectQuery, value *parser.Value, defaultValue
// NewLastAggregator is the factory for the "last" aggregate function. The
// select query argument is ignored; construction is delegated to
// NewFirstOrLastAggregator with the name "last" and a false flag.
func NewLastAggregator(_ *parser.SelectQuery, value *parser.Value, defaultValue *parser.Value) (Aggregator, error) {
	const aggregatorName = "last"
	return NewFirstOrLastAggregator(aggregatorName, value, false, defaultValue)
}
//
// Top, Bottom aggregators
//
// ByPointColumnDesc wraps a protocol.PointsCollection so it can be handed to
// sort.Sort and ordered by the first value of each point, largest first.
// Only Less is defined on this type here; the remaining sort.Interface
// methods are expected to be promoted from the embedded collection.
type ByPointColumnDesc struct {
protocol.PointsCollection
}
// ByPointColumnAsc wraps a protocol.PointsCollection so it can be handed to
// sort.Sort and ordered by the first value of each point, smallest first.
// Only Less is defined on this type here; the remaining sort.Interface
// methods are expected to be promoted from the embedded collection.
type ByPointColumnAsc struct {
protocol.PointsCollection
}
// Less reports whether the point at index i must sort before the point at
// index j when ordering by the first value in descending order.
//
// Two points are only comparable when their first values are both int64,
// both double or both string (checked in that order). Nil points and
// mismatched types are never "less", so Less returns false for them.
func (s ByPointColumnDesc) Less(i, j int) bool {
	left := s.PointsCollection[i]
	if left == nil {
		return false
	}
	right := s.PointsCollection[j]
	if right == nil {
		return false
	}

	lv := left.Values[0]
	if a := lv.Int64Value; a != nil {
		if b := right.Values[0].Int64Value; b != nil {
			return *a > *b
		}
	}
	if a := lv.DoubleValue; a != nil {
		if b := right.Values[0].DoubleValue; b != nil {
			return *a > *b
		}
	}
	if a := lv.StringValue; a != nil {
		if b := right.Values[0].StringValue; b != nil {
			return *a > *b
		}
	}
	return false
}
// Less reports whether the point at index i must sort before the point at
// index j when ordering by the first value in ascending order.
//
// Two points are only comparable when their first values are both int64,
// both double or both string (checked in that order). Nil points and
// mismatched types are never "less", so Less returns false for them.
func (s ByPointColumnAsc) Less(i, j int) bool {
	left := s.PointsCollection[i]
	if left == nil {
		return false
	}
	right := s.PointsCollection[j]
	if right == nil {
		return false
	}

	lv := left.Values[0]
	if a := lv.Int64Value; a != nil {
		if b := right.Values[0].Int64Value; b != nil {
			return *a < *b
		}
	}
	if a := lv.DoubleValue; a != nil {
		if b := right.Values[0].DoubleValue; b != nil {
			return *a < *b
		}
	}
	if a := lv.StringValue; a != nil {
		if b := right.Values[0].StringValue; b != nil {
			return *a < *b
		}
	}
	return false
}
// TopOrBottomAggregatorState is the running state that AggregatePoint threads
// through successive calls for the top/bottom aggregators.
type TopOrBottomAggregatorState struct {
// values holds the points kept so far; AggregatePoint re-sorts it after
// every insertion.
values protocol.PointsCollection
// counter is incremented for each point stored while fewer than the
// aggregator's limit have been collected.
counter int64
}
// TopOrBottomAggregator implements the top() and bottom() aggregate
// functions: it keeps up to limit values of one column, ordered descending
// for top and ascending for bottom.
type TopOrBottomAggregator struct {
AbstractAggregator
// name is the function name; it is used as the output column name when no
// alias is set.
name string
// isTop selects descending order (top) when true, ascending (bottom)
// otherwise.
isTop bool
// defaultValue is what GetValues returns when no state was accumulated.
defaultValue *protocol.FieldValue
// alias, when non-empty, replaces name as the output column name.
alias string
// limit is the maximum number of values kept per aggregation state.
limit int64
// NOTE(review): target is not referenced by any code visible in this
// section - confirm whether it is still needed.
target string
}
// comparePoint compares the first value of a against the first value of b.
//
// When greater is true it reports whether a is strictly smaller than b; when
// greater is false it reports whether a is strictly larger than b.
// AggregatePoint passes the worst value currently kept as a and the incoming
// candidate as b, so a true result means the candidate should be kept.
//
// Values are only comparable when both are int64, both double or both string
// (checked in that order). Incomparable pairs yield false in both directions;
// the previous implementation negated the "less than" result, which made
// every incomparable pair (and every equal pair) compare as true when greater
// was false.
func (self *TopOrBottomAggregator) comparePoint(a *protocol.Point, b *protocol.Point, greater bool) bool {
	av, bv := a.Values[0], b.Values[0]

	switch {
	case av.Int64Value != nil && bv.Int64Value != nil:
		if greater {
			return *av.Int64Value < *bv.Int64Value
		}
		return *av.Int64Value > *bv.Int64Value
	case av.DoubleValue != nil && bv.DoubleValue != nil:
		if greater {
			return *av.DoubleValue < *bv.DoubleValue
		}
		return *av.DoubleValue > *bv.DoubleValue
	case av.StringValue != nil && bv.StringValue != nil:
		if greater {
			return *av.StringValue < *bv.StringValue
		}
		return *av.StringValue > *bv.StringValue
	}
	return false
}
// AggregatePoint folds point p into the running top/bottom state and returns
// the updated state.
//
// state is nil on the first call for a group (a fresh state is created) and
// otherwise must be the *TopOrBottomAggregatorState returned by a previous
// call. The aggregated column is resolved with GetValue; any error from it is
// returned unchanged together with a nil state.
//
// Only the aggregated column is stored: each kept entry is a new point whose
// single value is the int64, double or string value of that column. While
// fewer than limit entries are held every point is kept; afterwards a point
// is kept only if comparePoint ranks it ahead of the current worst entry,
// which is then dropped.
func (self *TopOrBottomAggregator) AggregatePoint(state interface{}, p *protocol.Point) (interface{}, error) {
	fieldValue, err := GetValue(self.value, self.columns, p)
	if err != nil {
		return nil, err
	}

	var s *TopOrBottomAggregatorState
	if state == nil {
		s = &TopOrBottomAggregatorState{values: protocol.PointsCollection{}}
	} else {
		s = state.(*TopOrBottomAggregatorState)
	}

	// A non-positive limit can never retain a value; bail out instead of
	// indexing s.values[-1] below.
	if self.limit <= 0 {
		return s, nil
	}

	// Copy just the typed value of the aggregated column so that Values[0]
	// of every stored point is the column being ranked.
	column := &protocol.FieldValue{}
	switch {
	case fieldValue.Int64Value != nil:
		column.Int64Value = fieldValue.Int64Value
	case fieldValue.DoubleValue != nil:
		column.DoubleValue = fieldValue.DoubleValue
	case fieldValue.StringValue != nil:
		column.StringValue = fieldValue.StringValue
	}
	candidate := &protocol.Point{Values: []*protocol.FieldValue{column}}

	// insert appends the candidate and restores the ranking order.
	insert := func() {
		s.values = append(s.values, candidate)
		if self.isTop {
			sort.Sort(ByPointColumnDesc{s.values})
		} else {
			sort.Sort(ByPointColumnAsc{s.values})
		}
	}

	if s.counter < self.limit {
		insert()
		s.counter++
		return s, nil
	}

	// The state is full: the last entry is the worst one currently kept.
	worst := s.values[len(s.values)-1]
	if self.comparePoint(worst, candidate, self.isTop) {
		// Store the single-column candidate, not the raw input point, whose
		// first value may belong to a different column.
		insert()
		s.values = s.values[0:self.limit]
	}
	return s, nil
}
// ColumnNames returns the single output column name of this aggregator: the
// alias when one was given, otherwise the function name.
func (self *TopOrBottomAggregator) ColumnNames() []string {
	if self.alias == "" {
		return []string{self.name}
	}
	return []string{self.alias}
}
// GetValues converts an aggregation state into result rows, one row per kept
// value, each row holding only the first value of the stored point.
//
// A nil state produces a single row containing the aggregator's default
// value. Otherwise state must be a *TopOrBottomAggregatorState.
func (self *TopOrBottomAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
	if state == nil {
		return [][]*protocol.FieldValue{{self.defaultValue}}
	}

	kept := state.(*TopOrBottomAggregatorState).values
	rows := make([][]*protocol.FieldValue, 0, len(kept))
	for idx := range kept {
		rows = append(rows, []*protocol.FieldValue{kept[idx].Values[0]})
	}
	return rows
}
// InitializeFieldsMetadata records the field names of series as this
// aggregator's columns, which AggregatePoint later passes to GetValue.
// It always returns nil.
func (self *TopOrBottomAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
	fields := series.Fields
	self.columns = fields
	return nil
}
// NewTopOrBottomAggregator builds the aggregator behind top() and bottom().
//
// name is the function name used in error messages and as the default output
// column name. v is the parsed function call and must carry exactly two
// arguments: the value to rank and an integer limit. isTop selects
// descending (top) or ascending (bottom) order. defaultValue is wrapped with
// wrapDefaultValue and returned by GetValues when nothing was aggregated.
//
// A query error is returned when the argument count is wrong, when the
// second argument is not an int, or when the limit is smaller than 1 (such a
// limit previously slipped through and caused an out-of-range index during
// aggregation). Errors from wrapDefaultValue and strconv.ParseInt are
// returned unchanged.
func NewTopOrBottomAggregator(name string, v *parser.Value, isTop bool, defaultValue *parser.Value) (Aggregator, error) {
	if len(v.Elems) != 2 {
		return nil, common.NewQueryError(common.WrongNumberOfArguments, fmt.Sprintf("function %s() requires exactly 2 arguments", name))
	}
	if v.Elems[1].Type != parser.ValueInt {
		return nil, common.NewQueryError(common.InvalidArgument, fmt.Sprintf("function %s() second parameter expect int", name))
	}

	wrappedDefaultValue, err := wrapDefaultValue(defaultValue)
	if err != nil {
		return nil, err
	}

	limit, err := strconv.ParseInt(v.Elems[1].Name, 10, 64)
	if err != nil {
		return nil, err
	}
	if limit < 1 {
		return nil, common.NewQueryError(common.InvalidArgument, fmt.Sprintf("function %s() second parameter must be a positive int, got %d", name, limit))
	}

	return &TopOrBottomAggregator{
		AbstractAggregator: AbstractAggregator{
			value: v.Elems[0],
		},
		name:         name,
		isTop:        isTop,
		defaultValue: wrappedDefaultValue,
		alias:        v.Alias,
		limit:        limit,
	}, nil
}
// NewTopAggregator is the factory registered for the "top" aggregate
// function. The select query argument is ignored; construction is delegated
// to NewTopOrBottomAggregator with isTop set to true.
func NewTopAggregator(_ *parser.SelectQuery, value *parser.Value, defaultValue *parser.Value) (Aggregator, error) {
	const aggregatorName = "top"
	return NewTopOrBottomAggregator(aggregatorName, value, true, defaultValue)
}
// NewBottomAggregator is the factory registered for the "bottom" aggregate
// function. The select query argument is ignored; construction is delegated
// to NewTopOrBottomAggregator with isTop set to false.
func NewBottomAggregator(_ *parser.SelectQuery, value *parser.Value, defaultValue *parser.Value) (Aggregator, error) {
	const aggregatorName = "bottom"
	return NewTopOrBottomAggregator(aggregatorName, value, false, defaultValue)
}

View File

@ -908,6 +908,8 @@ func (self *DataTestSuite) CountWithAlias(c *C) (Fun, Fun) {
query := fmt.Sprintf("select %s(column0) as some_alias from test_aliasing", name)
if name == "percentile" {
query = "select percentile(column0, 90) as some_alias from test_aliasing"
} else if name == "top" || name == "bottom" {
query = fmt.Sprintf("select %s(column0, 10) as some_alias from test_aliasing", name)
}
fmt.Printf("query: %s\n", query)
data := client.RunQuery(query, c, "m")
@ -1631,3 +1633,295 @@ func (self *SingleServerSuite) ColumnNameWithWeirdCharacters(c *C) (Fun, Fun) {
c.Assert(data[0].Columns[0], Equals, "foo.-239(*@&#$!#)(* #$@")
}
}
// For issue #551 - add aggregate function top and bottom - https://github.com/influxdb/influxdb/issues/551
// Top checks that top(cpu, 5) returns the five largest cpu values in
// descending order. The first returned function seeds the data, the second
// runs the query and asserts on the result.
func (self *DataTestSuite) Top(c *C) (Fun, Fun) {
	// Three batches: hosta reports 60, 70, 80 and hostb reports 70, 80, 90.
	seed := func(client Client) {
		for batch := 0; batch < 3; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_top",
"columns": ["cpu", "host"],
"points": [[%d, "hosta"], [%d, "hostb"]]
}
]
`, 60+batch*10, 70+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select top(cpu, 5) from test_top;", c, "m")
		c.Assert(series[0].Name, Equals, "test_top")
		c.Assert(series[0].Columns, HasLen, 2)
		c.Assert(series[0].Points, HasLen, 5)

		// Column 1 of every row carries the aggregated value.
		got := make([]float64, 0, len(series[0].Points))
		for _, row := range series[0].Points {
			got = append(got, row[1].(float64))
		}
		c.Assert(got, DeepEquals, []float64{90, 80, 80, 70, 70})
	}

	return seed, verify
}
// TopWithStringColumn checks that top() also ranks string columns:
// top(host, 5) must return the five greatest host names in descending order.
func (self *DataTestSuite) TopWithStringColumn(c *C) (Fun, Fun) {
	// Three batches, each with one "hosta" and one "hostb" point.
	seed := func(client Client) {
		for batch := 0; batch < 3; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_top",
"columns": ["cpu", "host"],
"points": [[%d, "hosta"], [%d, "hostb"]]
}
]
`, 60+batch*10, 70+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select top(host, 5) from test_top;", c, "m")
		c.Assert(series[0].Name, Equals, "test_top")
		c.Assert(series[0].Columns, HasLen, 2)
		c.Assert(series[0].Points, HasLen, 5)

		// Column 1 of every row carries the aggregated value.
		got := make([]string, 0, len(series[0].Points))
		for _, row := range series[0].Points {
			got = append(got, row[1].(string))
		}
		c.Assert(got, DeepEquals, []string{"hostb", "hostb", "hostb", "hosta", "hosta"})
	}

	return seed, verify
}
// TopWithGroupBy checks top(cpu, 5) combined with "group by host": each host
// has only three points, so all three are returned per host, in descending
// cpu order within the host.
func (self *DataTestSuite) TopWithGroupBy(c *C) (Fun, Fun) {
	// Three batches: hosta reports 60, 70, 80 and hostb reports 70, 80, 90.
	seed := func(client Client) {
		for batch := 0; batch < 3; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_top",
"columns": ["cpu", "host"],
"points": [[%d, "hosta"], [%d, "hostb"]]
}
]
`, 60+batch*10, 70+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select top(cpu, 5), host from test_top group by host;", c, "m")
		c.Assert(series[0].Name, Equals, "test_top")
		c.Assert(series[0].Columns, HasLen, 3)
		c.Assert(series[0].Points, HasLen, 6)

		// Columns 1 and 2 of every row carry the cpu value and the host.
		type row struct {
			cpu  float64
			host string
		}
		got := make([]row, 0, len(series[0].Points))
		for _, point := range series[0].Points {
			got = append(got, row{point[1].(float64), point[2].(string)})
		}
		want := []row{
			{80, "hosta"}, {70, "hosta"}, {60, "hosta"},
			{90, "hostb"}, {80, "hostb"}, {70, "hostb"},
		}
		c.Assert(got, DeepEquals, want)
	}

	return seed, verify
}
// TopWithMultipleGroupBy checks top(cpu, 5) grouped by both time(1m) and
// host. Every batch is written with an explicit time value 60 apart from the
// previous one, and six rows are expected in total.
func (self *DataTestSuite) TopWithMultipleGroupBy(c *C) (Fun, Fun) {
	// Three batches: hosta reports 60, 70, 80 and hostb reports 70, 80, 90;
	// both points of a batch share the same time value.
	seed := func(client Client) {
		for batch := 0; batch < 3; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_top",
"columns": ["time", "cpu", "host"],
"points": [[%d, %d, "hosta"], [%d, %d, "hostb"]]
}
]
`, 1400504400+batch*60, 60+batch*10, 1400504400+batch*60, 70+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select top(cpu, 5), host from test_top group by time(1m), host;", c, "m")
		c.Assert(series[0].Name, Equals, "test_top")
		c.Assert(series[0].Columns, HasLen, 3)
		c.Assert(series[0].Points, HasLen, 6)

		type row struct {
			// TODO(chobie): add time column
			cpu  float64
			host string
		}
		got := make([]row, 0, len(series[0].Points))
		for _, point := range series[0].Points {
			got = append(got, row{point[1].(float64), point[2].(string)})
		}
		want := []row{
			{80, "hosta"}, {70, "hosta"}, {60, "hosta"},
			{90, "hostb"}, {80, "hostb"}, {70, "hostb"},
		}
		c.Assert(got, DeepEquals, want)
	}

	return seed, verify
}
// TopWithLessResult checks that top() returns every available value, still
// in descending order, when the series holds fewer points than the limit
// (10 points written, limit 20).
func (self *DataTestSuite) TopWithLessResult(c *C) (Fun, Fun) {
	// Five batches: hosta reports 40..80 and hostb reports 50..90.
	seed := func(client Client) {
		for batch := 0; batch < 5; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_top",
"columns": ["cpu", "host"],
"points": [[%d, "hosta"], [%d, "hostb"]]
}
]
`, 40+batch*10, 50+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select top(cpu, 20) from test_top;", c, "m")
		c.Assert(series[0].Name, Equals, "test_top")
		c.Assert(series[0].Columns, HasLen, 2)
		// top returns as many points as exist when that is below the limit
		c.Assert(series[0].Points, HasLen, 10)

		got := make([]float64, 0, len(series[0].Points))
		for _, row := range series[0].Points {
			got = append(got, row[1].(float64))
		}
		c.Assert(got, DeepEquals, []float64{90, 80, 80, 70, 70, 60, 60, 50, 50, 40})
	}

	return seed, verify
}
// Bottom checks that bottom(cpu, 5) returns the five smallest cpu values in
// ascending order. The first returned function seeds the data, the second
// runs the query and asserts on the result.
func (self *DataTestSuite) Bottom(c *C) (Fun, Fun) {
	// Three batches: hosta reports 60, 70, 80 and hostb reports 70, 80, 90.
	seed := func(client Client) {
		for batch := 0; batch < 3; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_bottom",
"columns": ["cpu", "host"],
"points": [[%d, "hosta"], [%d, "hostb"]]
}
]
`, 60+batch*10, 70+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select bottom(cpu, 5) from test_bottom;", c, "m")
		c.Assert(series[0].Name, Equals, "test_bottom")
		c.Assert(series[0].Columns, HasLen, 2)
		c.Assert(series[0].Points, HasLen, 5)

		// Column 1 of every row carries the aggregated value.
		got := make([]float64, 0, len(series[0].Points))
		for _, row := range series[0].Points {
			got = append(got, row[1].(float64))
		}
		c.Assert(got, DeepEquals, []float64{60, 70, 70, 80, 80})
	}

	return seed, verify
}
// BottomWithStringColumn checks that bottom() also ranks string columns:
// bottom(host, 5) must return the five smallest host names in ascending
// order.
func (self *DataTestSuite) BottomWithStringColumn(c *C) (Fun, Fun) {
	// Three batches, each with one "hosta" and one "hostb" point.
	seed := func(client Client) {
		for batch := 0; batch < 3; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_bottom",
"columns": ["cpu", "host"],
"points": [[%d, "hosta"], [%d, "hostb"]]
}
]
`, 60+batch*10, 70+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select bottom(host, 5) from test_bottom;", c, "m")
		c.Assert(series[0].Name, Equals, "test_bottom")
		c.Assert(series[0].Columns, HasLen, 2)
		c.Assert(series[0].Points, HasLen, 5)

		// Column 1 of every row carries the aggregated value.
		got := make([]string, 0, len(series[0].Points))
		for _, row := range series[0].Points {
			got = append(got, row[1].(string))
		}
		c.Assert(got, DeepEquals, []string{"hosta", "hosta", "hosta", "hostb", "hostb"})
	}

	return seed, verify
}
// BottomWithGroupBy checks bottom(cpu, 5) combined with "group by host":
// each host has only three points, so all three are returned per host, in
// ascending cpu order within the host.
func (self *DataTestSuite) BottomWithGroupBy(c *C) (Fun, Fun) {
	// Three batches: hosta reports 60, 70, 80 and hostb reports 70, 80, 90.
	seed := func(client Client) {
		for batch := 0; batch < 3; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_bottom",
"columns": ["cpu", "host"],
"points": [[%d, "hosta"], [%d, "hostb"]]
}
]
`, 60+batch*10, 70+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select bottom(cpu, 5), host from test_bottom group by host;", c, "m")
		c.Assert(series[0].Name, Equals, "test_bottom")
		c.Assert(series[0].Columns, HasLen, 3)
		c.Assert(series[0].Points, HasLen, 6)

		// Columns 1 and 2 of every row carry the cpu value and the host.
		type row struct {
			cpu  float64
			host string
		}
		got := make([]row, 0, len(series[0].Points))
		for _, point := range series[0].Points {
			got = append(got, row{point[1].(float64), point[2].(string)})
		}
		want := []row{
			{60, "hosta"}, {70, "hosta"}, {80, "hosta"},
			{70, "hostb"}, {80, "hostb"}, {90, "hostb"},
		}
		c.Assert(got, DeepEquals, want)
	}

	return seed, verify
}
// BottomWithLessResult checks that bottom() returns every available value,
// still in ascending order, when the series holds fewer points than the
// limit (10 points written, limit 20).
func (self *DataTestSuite) BottomWithLessResult(c *C) (Fun, Fun) {
	// Five batches: hosta reports 40..80 and hostb reports 50..90.
	seed := func(client Client) {
		for batch := 0; batch < 5; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_bottom",
"columns": ["cpu", "host"],
"points": [[%d, "hosta"], [%d, "hostb"]]
}
]
`, 40+batch*10, 50+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select bottom(cpu, 20) from test_bottom;", c, "m")
		c.Assert(series[0].Name, Equals, "test_bottom")
		c.Assert(series[0].Columns, HasLen, 2)
		// bottom returns as many points as exist when that is below the limit
		c.Assert(series[0].Points, HasLen, 10)

		got := make([]float64, 0, len(series[0].Points))
		for _, row := range series[0].Points {
			got = append(got, row[1].(float64))
		}
		c.Assert(got, DeepEquals, []float64{40, 50, 50, 60, 60, 70, 70, 80, 80, 90})
	}

	return seed, verify
}
// BottomWithMultipleGroupBy checks bottom(cpu, 5) grouped by both time(1m)
// and host. Every batch is written with an explicit time value 60 apart from
// the previous one, and six rows are expected in total.
func (self *DataTestSuite) BottomWithMultipleGroupBy(c *C) (Fun, Fun) {
	// Three batches: hosta reports 60, 70, 80 and hostb reports 70, 80, 90;
	// both points of a batch share the same time value.
	seed := func(client Client) {
		for batch := 0; batch < 3; batch++ {
			payload := fmt.Sprintf(`
[
{
"name": "test_bottom",
"columns": ["time", "cpu", "host"],
"points": [[%d, %d, "hosta"], [%d, %d, "hostb"]]
}
]
`, 1400504400+batch*60, 60+batch*10, 1400504400+batch*60, 70+batch*10)
			client.WriteJsonData(payload, c)
		}
	}

	verify := func(client Client) {
		series := client.RunQuery("select bottom(cpu, 5), host from test_bottom group by time(1m), host;", c, "m")
		c.Assert(series[0].Name, Equals, "test_bottom")
		c.Assert(series[0].Columns, HasLen, 3)
		c.Assert(series[0].Points, HasLen, 6)

		type row struct {
			// TODO(chobie): add time column
			cpu  float64
			host string
		}
		got := make([]row, 0, len(series[0].Points))
		for _, point := range series[0].Points {
			got = append(got, row{point[1].(float64), point[2].(string)})
		}
		want := []row{
			{60, "hosta"}, {70, "hosta"}, {80, "hosta"},
			{70, "hostb"}, {80, "hostb"}, {90, "hostb"},
		}
		c.Assert(got, DeepEquals, want)
	}

	return seed, verify
}