fix #69. Support column aliasing

pull/290/head
John Shahid 2014-03-03 13:08:08 -05:00
parent 5bb5161c8d
commit 0d52464061
2 changed files with 115 additions and 11 deletions

View File

@ -44,6 +44,14 @@ func init() {
registeredAggregators["last"] = NewLastAggregator
}
// used in testing to get a list of all aggregators
func GetRegisteredAggregators() (names []string) {
for n, _ := range registeredAggregators {
names = append(names, n)
}
return
}
type AbstractAggregator struct {
Aggregator
value *parser.Value
@ -103,6 +111,7 @@ type StandardDeviationAggregator struct {
AbstractAggregator
running map[string]map[interface{}]*StandardDeviationRunning
defaultValue *protocol.FieldValue
alias string
}
func (self *StandardDeviationAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
@ -140,6 +149,10 @@ func (self *StandardDeviationAggregator) AggregatePoint(series string, group int
}
func (self *StandardDeviationAggregator) ColumnNames() []string {
if self.alias != "" {
return []string{self.alias}
}
return []string{"stddev"}
}
@ -196,6 +209,7 @@ func NewStandardDeviationAggregator(q *parser.SelectQuery, v *parser.Value, defa
},
running: make(map[string]map[interface{}]*StandardDeviationRunning),
defaultValue: value,
alias: v.Alias,
}, nil
}
@ -208,6 +222,7 @@ type DerivativeAggregator struct {
firstValues map[string]map[interface{}]*protocol.Point
lastValues map[string]map[interface{}]*protocol.Point
defaultValue *protocol.FieldValue
alias string
}
func (self *DerivativeAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
@ -253,6 +268,9 @@ func (self *DerivativeAggregator) AggregatePoint(series string, group interface{
}
func (self *DerivativeAggregator) ColumnNames() []string {
if self.alias != "" {
return []string{self.alias}
}
return []string{"derivative"}
}
@ -299,6 +317,7 @@ func NewDerivativeAggregator(q *parser.SelectQuery, v *parser.Value, defaultValu
firstValues: make(map[string]map[interface{}]*protocol.Point),
lastValues: make(map[string]map[interface{}]*protocol.Point),
defaultValue: wrappedDefaultValue,
alias: v.Alias,
}, nil
}
@ -308,8 +327,9 @@ func NewDerivativeAggregator(q *parser.SelectQuery, v *parser.Value, defaultValu
type HistogramAggregator struct {
AbstractAggregator
bucketSize float64
histograms map[string]map[interface{}]map[int]int
bucketSize float64
histograms map[string]map[interface{}]map[int]int
columnNames []string
}
func (self *HistogramAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
@ -344,7 +364,7 @@ func (self *HistogramAggregator) AggregatePoint(series string, group interface{}
}
func (self *HistogramAggregator) ColumnNames() []string {
return []string{"bucket_start", "count"}
return self.columnNames
}
func (self *HistogramAggregator) GetValues(series string, group interface{}) [][]*protocol.FieldValue {
@ -391,12 +411,19 @@ func NewHistogramAggregator(q *parser.SelectQuery, v *parser.Value, defaultValue
}
}
columnNames := []string{"bucket_start", "count"}
if v.Alias != "" {
columnNames[0] = fmt.Sprintf("%s_bucket_start", v.Alias)
columnNames[1] = fmt.Sprintf("%s_count", v.Alias)
}
return &HistogramAggregator{
AbstractAggregator: AbstractAggregator{
value: v.Elems[0],
},
bucketSize: bucketSize,
histograms: make(map[string]map[interface{}]map[int]int),
bucketSize: bucketSize,
histograms: make(map[string]map[interface{}]map[int]int),
columnNames: columnNames,
}, nil
}
@ -407,6 +434,7 @@ func NewHistogramAggregator(q *parser.SelectQuery, v *parser.Value, defaultValue
type CountAggregator struct {
defaultValue *protocol.FieldValue
counts map[string]map[interface{}]int32
alias string
}
func (self *CountAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
@ -420,6 +448,9 @@ func (self *CountAggregator) AggregatePoint(series string, group interface{}, p
}
func (self *CountAggregator) ColumnNames() []string {
if self.alias != "" {
return []string{self.alias}
}
return []string{"count"}
}
@ -464,10 +495,10 @@ func NewCountAggregator(q *parser.SelectQuery, v *parser.Value, defaultValue *pa
if err != nil {
return nil, err
}
return NewCompositeAggregator(&CountAggregator{wrappedDefaultValue, make(map[string]map[interface{}]int32)}, inner)
return NewCompositeAggregator(&CountAggregator{wrappedDefaultValue, make(map[string]map[interface{}]int32), v.Alias}, inner)
}
return &CountAggregator{wrappedDefaultValue, make(map[string]map[interface{}]int32)}, nil
return &CountAggregator{wrappedDefaultValue, make(map[string]map[interface{}]int32), v.Alias}, nil
}
//
@ -537,6 +568,7 @@ type MeanAggregator struct {
means map[string]map[interface{}]float64
counts map[string]map[interface{}]int
defaultValue *protocol.FieldValue
alias string
}
func (self *MeanAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
@ -574,6 +606,9 @@ func (self *MeanAggregator) AggregatePoint(series string, group interface{}, p *
}
func (self *MeanAggregator) ColumnNames() []string {
if self.alias != "" {
return []string{self.alias}
}
return []string{"mean"}
}
@ -608,6 +643,7 @@ func NewMeanAggregator(_ *parser.SelectQuery, value *parser.Value, defaultValue
means: make(map[string]map[interface{}]float64),
counts: make(map[string]map[interface{}]int),
defaultValue: wrappedDefaultValue,
alias: value.Alias,
}, nil
}
@ -621,15 +657,22 @@ func NewMedianAggregator(_ *parser.SelectQuery, value *parser.Value, defaultValu
return nil, err
}
return &PercentileAggregator{
functionName := "median"
if value.Alias != "" {
functionName = value.Alias
}
aggregator := &PercentileAggregator{
AbstractAggregator: AbstractAggregator{
value: value.Elems[0],
},
functionName: "median",
functionName: functionName,
percentile: 50.0,
float_values: make(map[string]map[interface{}][]float64),
defaultValue: wrappedDefaultValue,
}, nil
alias: value.Alias,
}
return aggregator, nil
}
//
@ -642,6 +685,7 @@ type PercentileAggregator struct {
percentile float64
float_values map[string]map[interface{}][]float64
defaultValue *protocol.FieldValue
alias string
}
func (self *PercentileAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
@ -671,6 +715,9 @@ func (self *PercentileAggregator) AggregatePoint(series string, group interface{
}
func (self *PercentileAggregator) ColumnNames() []string {
if self.alias != "" {
return []string{self.alias}
}
return []string{self.functionName}
}
@ -708,11 +755,16 @@ func NewPercentileAggregator(_ *parser.SelectQuery, value *parser.Value, default
return nil, err
}
functionName := "percentile"
if value.Alias != "" {
functionName = value.Alias
}
return &PercentileAggregator{
AbstractAggregator: AbstractAggregator{
value: value.Elems[0],
},
functionName: "percentile",
functionName: functionName,
percentile: percentile,
float_values: make(map[string]map[interface{}][]float64),
defaultValue: wrappedDefaultValue,
@ -727,6 +779,7 @@ type ModeAggregator struct {
AbstractAggregator
counts map[string]map[interface{}]map[float64]int
defaultValue *protocol.FieldValue
alias string
}
func (self *ModeAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
@ -764,6 +817,9 @@ func (self *ModeAggregator) AggregatePoint(series string, group interface{}, p *
}
func (self *ModeAggregator) ColumnNames() []string {
if self.alias != "" {
return []string{self.alias}
}
return []string{"mode"}
}
@ -815,6 +871,7 @@ func NewModeAggregator(_ *parser.SelectQuery, value *parser.Value, defaultValue
},
counts: make(map[string]map[interface{}]map[float64]int),
defaultValue: wrappedDefaultValue,
alias: value.Alias,
}, nil
}
@ -826,6 +883,7 @@ type DistinctAggregator struct {
AbstractAggregator
counts map[string]map[interface{}]map[interface{}]int
defaultValue *protocol.FieldValue
alias string
}
func (self *DistinctAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
@ -867,6 +925,9 @@ func (self *DistinctAggregator) AggregatePoint(series string, group interface{},
}
func (self *DistinctAggregator) ColumnNames() []string {
if self.alias != "" {
return []string{self.alias}
}
return []string{"distinct"}
}
@ -907,6 +968,7 @@ func NewDistinctAggregator(_ *parser.SelectQuery, value *parser.Value, defaultVa
},
counts: make(map[string]map[interface{}]map[interface{}]int),
defaultValue: wrappedDefaultValue,
alias: value.Alias,
}, nil
}
@ -964,6 +1026,10 @@ func NewCumulativeArithmeticAggregator(name string, value *parser.Value, initial
return nil, err
}
if value.Alias != "" {
name = value.Alias
}
return &CumulativeArithmeticAggregator{
AbstractAggregator: AbstractAggregator{
value: value.Elems[0],
@ -1065,6 +1131,10 @@ func NewFirstOrLastAggregator(name string, v *parser.Value, isFirst bool, defaul
return nil, err
}
if v.Alias != "" {
name = v.Alias
}
return &FirstOrLastAggregator{
AbstractAggregator: AbstractAggregator{
value: v.Elems[0],

View File

@ -5,6 +5,7 @@ import (
"checkers"
. "common"
"encoding/json"
"engine"
"flag"
"fmt"
"io/ioutil"
@ -626,6 +627,39 @@ func (self *IntegrationSuite) TestCountWithGroupBy(c *C) {
c.Assert(data[0].Points[1][1], Equals, 20.0)
}
func (self *IntegrationSuite) TestCountWithAlias(c *C) {
for i := 0; i < 5; i++ {
err := self.server.WriteData(fmt.Sprintf(`
[
{
"name": "test_aliasing",
"columns": ["cpu", "host"],
"points": [[%d, "hosta"], [%d, "hostb"]]
}
]
`, 60+i*10, 70+i*10))
c.Assert(err, IsNil)
}
for _, name := range engine.GetRegisteredAggregators() {
query := fmt.Sprintf("select %s(cpu) as some_alias from test_aliasing", name)
if name == "percentile" {
query = "select percentile(cpu, 90) as some_alias from test_aliasing"
}
fmt.Printf("query: %s\n", query)
bs, err := self.server.RunQuery(query, "m")
c.Assert(err, IsNil)
data := []*SerializedSeries{}
err = json.Unmarshal(bs, &data)
c.Assert(data, HasLen, 1)
c.Assert(data[0].Name, Equals, "test_aliasing")
if name == "histogram" {
c.Assert(data[0].Columns, DeepEquals, []string{"time", "some_alias_bucket_start", "some_alias_count"})
continue
}
c.Assert(data[0].Columns, DeepEquals, []string{"time", "some_alias"})
}
}
// test for issue #30
func (self *IntegrationSuite) TestHttpPostWithTime(c *C) {
now := time.Now().Add(-10 * 24 * time.Hour)