fix . Implement first and last aggregates

pull/70/head
John Shahid 2013-11-15 16:30:09 -05:00
parent 393ac4fdf0
commit 4d02a140ea
3 changed files with 130 additions and 1 deletions

View File

@ -103,7 +103,6 @@
- [Issue #34](https://github.com/influxdb/influxdb/issues/34). Ascending order always return null for columns that have a null value
- [Issue #55](https://github.com/influxdb/influxdb/issues/55). Limit should limit the points that match the Where clause
- [Issue #53](https://github.com/influxdb/influxdb/issues/53). Writing null values via HTTP API fails
- [Issue #57](https://github.com/influxdb/influxdb/issues/57). Don't panic when type of time != float
### Deprecated
@ -116,4 +115,8 @@
## Features
- [Issue #51](https://github.com/influxdb/influxdb/issues/51). Implement first and last aggregates
## Bugfixes
- [Issue #57](https://github.com/influxdb/influxdb/issues/57). Don't panic when type of time != float

View File

@ -36,6 +36,8 @@ func init() {
registeredAggregators["mean"] = NewMeanAggregator
registeredAggregators["mode"] = NewModeAggregator
registeredAggregators["distinct"] = NewDistinctAggregator
registeredAggregators["first"] = NewFirstAggregator
registeredAggregators["last"] = NewLastAggregator
}
//
@ -812,3 +814,63 @@ func NewSumAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error)
return currentValue + fv
})
}
type FirstOrLastAggregator struct {
name string
fieldName string
fieldIndex int
isFirst bool
values map[string]map[interface{}]*protocol.FieldValue
}
func (self *FirstOrLastAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
values := self.values[series]
if values == nil {
values = make(map[interface{}]*protocol.FieldValue)
self.values[series] = values
}
if values[group] == nil || !self.isFirst {
values[group] = p.Values[self.fieldIndex]
}
return nil
}
func (self *FirstOrLastAggregator) ColumnName() string {
return self.name
}
func (self *FirstOrLastAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
return []*protocol.FieldValue{self.values[series][group]}
}
func (self *FirstOrLastAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
for idx, field := range series.Fields {
if field == self.fieldName {
self.fieldIndex = idx
return nil
}
}
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
}
func NewFirstOrLastAggregator(name string, v *parser.Value, isFirst bool) (Aggregator, error) {
if len(v.Elems) != 1 {
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function max() requires only one argument")
}
return &FirstOrLastAggregator{
name: name,
fieldName: v.Elems[0].Name,
isFirst: isFirst,
values: make(map[string]map[interface{}]*protocol.FieldValue),
}, nil
}
func NewFirstAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
return NewFirstOrLastAggregator("first", value, true)
}
func NewLastAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
return NewFirstOrLastAggregator("last", value, false)
}

View File

@ -216,6 +216,70 @@ func (self *EngineSuite) TestCountQuery(c *C) {
}
func (self *EngineSuite) TestFirstAndLastQuery(c *C) {
// make the mock coordinator return some data
engine := createEngine(c, `
[
{
"points": [
{
"values": [
{
"int64_value": 1
}
],
"timestamp": 1381346631000000,
"sequence_number": 1
},
{
"values": [
{
"int64_value": 2
}
],
"timestamp": 1381346631000000,
"sequence_number": 2
},
{
"values": [
{
"int64_value": 3
}
],
"timestamp": 1381346631000000,
"sequence_number": 3
}
],
"name": "foo",
"fields": ["column_one"]
}
]
`)
runQuery(engine, "select first(column_one), last(column_one) from foo", c, `[
{
"points": [
{
"values": [
{
"int64_value": 1
},
{
"int64_value": 3
}
],
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
"name": "foo",
"fields": ["first", "last"]
}
]
`)
}
func (self *EngineSuite) TestUpperCaseQuery(c *C) {
// make the mock coordinator return some data
engine := createEngine(c, `