pull/3628/head
Cory LaNou 2015-08-10 16:35:33 -05:00
parent 8ea3c47747
commit dc83c57d7e
3 changed files with 50 additions and 9 deletions


@@ -2353,8 +2353,21 @@ func TestServer_Query_WildcardExpansion(t *testing.T) {
name: "wildcard",
params: url.Values{"db": []string{"db0"}},
command: `SELECT * FROM wildcard`,
exp: `{"results":[{"series":[{"name":"wildcard","columns":["time","cpu","host","region","value"],"values":[["2000-01-01T00:00:00Z",80,"A","us-east",10],["2000-01-01T00:00:10Z",90,"B","us-east",20],["2000-01-01T00:00:20Z",70,"B","us-west",30]]}]}]}`,
exp: `{"results":[{"series":[{"name":"wildcard","columns":["time","cpu","host","region","value"],"values":[["2000-01-01T00:00:00Z",80,"A","us-east",10],["2000-01-01T00:00:10Z",90,"B","us-east",20],["2000-01-01T00:00:20Z",70,"B","us-west",30],["2000-01-01T00:00:30Z",60,"A","us-east",40]]}]}]}`,
},
&Query{
name: "no wildcard in select",
params: url.Values{"db": []string{"db0"}},
command: `SELECT cpu, host, region, value FROM wildcard`,
exp: `{"results":[{"series":[{"name":"wildcard","columns":["time","cpu","host","region","value"],"values":[["2000-01-01T00:00:00Z",80,"A","us-east",10],["2000-01-01T00:00:10Z",90,"B","us-east",20],["2000-01-01T00:00:20Z",70,"B","us-west",30],["2000-01-01T00:00:30Z",60,"A","us-east",40]]}]}]}`,
},
&Query{
name: "no wildcard with alias",
params: url.Values{"db": []string{"db0"}},
command: `SELECT cpu as c, host as h, region, value FROM wildcard`,
exp: `{"results":[{"series":[{"name":"wildcard","columns":["time","c","h","region","value"],"values":[["2000-01-01T00:00:00Z",80,"A","us-east",10],["2000-01-01T00:00:10Z",90,"B","us-east",20],["2000-01-01T00:00:20Z",70,"B","us-west",30],["2000-01-01T00:00:30Z",60,"A","us-east",40]]}]}]}`,
},
//&Query{
//name: "duplicate tag and field name",
//params: url.Values{"db": []string{"db0"}},

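The new cases above check that selecting tag keys explicitly (with or without aliases) returns the same columns and values as the wildcard query. As an illustration only, a query like the aliased case could be issued against a running server over HTTP; the endpoint, port, and database name ("db0") are assumptions for this sketch, not part of the change:

// Illustrative only: issuing the aliased-tag query from the new test case
// over HTTP. Host, port, and database name are assumptions.
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	params := url.Values{}
	params.Set("db", "db0")
	params.Set("q", `SELECT cpu AS c, host AS h, region, value FROM wildcard`)

	resp, err := http.Get("http://localhost:8086/query?" + params.Encode())
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// With this change the columns should come back as
	// ["time","c","h","region","value"]: tags selected by name behave like
	// fields and honor their aliases.
	fmt.Println(string(body))
}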

@@ -167,6 +167,7 @@ func (e *Executor) executeRaw(out chan *influxql.Row) {
selectFields = sf.list()
aliasFields = selectFields
} else {
// TODO can you alias a tag?
selectFields = e.stmt.Fields.Names()
aliasFields = e.stmt.Fields.AliasNames()
}
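
The else branch now pulls two lists from the statement: the raw field names (used to look values up) and the alias names (used as output column headers). A minimal stand-in sketch of what those two lists appear to represent, based on how they are used here; the Field struct is a simplification for illustration, not the influxql type:

// Stand-in sketch of the Names()/AliasNames() split used above.
type Field struct {
	Name  string
	Alias string
}

// names returns the raw identifiers, e.g. for looking values up in the
// decoded point.
func names(fields []Field) []string {
	out := make([]string, 0, len(fields))
	for _, f := range fields {
		out = append(out, f.Name)
	}
	return out
}

// aliasNames returns the column headers to emit: the alias when one was
// given, otherwise the raw name.
func aliasNames(fields []Field) []string {
	out := make([]string, 0, len(fields))
	for _, f := range fields {
		name := f.Name
		if f.Alias != "" {
			name = f.Alias
		}
		out = append(out, name)
	}
	return out
}
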
@@ -732,14 +733,32 @@ func (r *limitedRowWriter) processValues(values []*MapperValue) *influxql.Row {
vals[1] = v.Value.(interface{})
}
} else {
fields := v.Value.(map[string]interface{})
var fields map[string]interface{}
var dp *decodedPoint
switch v := v.Value.(type) {
case *decodedPoint:
dp = v
fields = v.value.(map[string]interface{})
case map[string]interface{}:
fields = v
}
// time is always the first value
vals[0] = time.Unix(0, v.Time).UTC()
// populate the other values
for i := 1; i < len(selectFields); i++ {
vals[i] = fields[selectFields[i]]
v, ok := fields[selectFields[i]]
if ok {
vals[i] = v
continue
}
if dp != nil {
v, ok = dp.tags[selectFields[i]]
if ok {
vals[i] = v
}
}
}
}
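
The lookup order added above, pulled out as a standalone sketch with simplified types: resolve each selected column from the decoded fields first, then fall back to the series tags carried on the decoded point.

// Standalone sketch of the lookup order implemented in processValues above.
// Types are simplified stand-ins for the executor's internal ones.
func resolveColumn(name string, fields map[string]interface{}, tags map[string]string) interface{} {
	if v, ok := fields[name]; ok {
		return v
	}
	if v, ok := tags[name]; ok {
		return v
	}
	// e.g. resolveColumn("host", map[string]interface{}{"value": 10},
	//                    map[string]string{"host": "A"}) == "A"
	return nil
}
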
@@ -801,7 +820,7 @@ func (rqdp *RawQueryDerivativeProcessor) Process(input []*MapperValue) []*Mapper
// Calculate the derivative of successive points by dividing the difference
// of each value by the elapsed time normalized to the interval
diff := int64toFloat64(v.Value) - int64toFloat64(rqdp.LastValueFromPreviousChunk.Value)
diff := int64toFloat64(v.Value.(*decodedPoint).value) - int64toFloat64(rqdp.LastValueFromPreviousChunk.Value.(*decodedPoint).value)
elapsed := v.Time - rqdp.LastValueFromPreviousChunk.Time

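The only change in this hunk is unwrapping the numeric value from the *decodedPoint before the division; the derivative arithmetic itself follows the comment in the surrounding code (value difference divided by elapsed time normalized to the interval). A simplified sketch of that arithmetic, assuming float64 values and an interval expressed in nanoseconds:

// Simplified sketch of the derivative arithmetic around the changed line;
// only the unwrapping of *decodedPoint is new here.
func derivative(prevValue, curValue float64, prevTime, curTime, intervalNanos int64) float64 {
	diff := curValue - prevValue
	elapsed := curTime - prevTime
	return diff / (float64(elapsed) / float64(intervalNanos))
}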

@@ -209,7 +209,8 @@ func (lm *LocalMapper) Open() error {
// No data exists for this key.
continue
}
cm := newSeriesCursor(c, t.Filters[i])
seriesTags := lm.shard.index.series[key].Tags
cm := newSeriesCursor(c, t.Filters[i], seriesTags)
cursors = append(cursors, cm)
}
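
The series' tags are read straight out of the shard index and handed to the new cursor. A defensive variant of that lookup as a standalone sketch; the series and index types here are stand-ins inferred from the diff, not the real shard index.

// Standalone sketch of the tag lookup above, with a guard for a missing
// series entry so the caller gets an empty tag map instead of a nil
// dereference. Stand-in types only.
type seriesEntry struct {
	Tags map[string]string
}

func tagsFor(index map[string]*seriesEntry, key string) map[string]string {
	if s, ok := index[key]; ok && s != nil {
		return s.Tags
	}
	return map[string]string{}
}
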
@@ -555,7 +556,7 @@ func (lm *LocalMapper) TagSets() []string {
// Fields returns any SELECT fields. If this Mapper is not processing a SELECT query
// then an empty slice is returned.
func (lm *LocalMapper) Fields() []string {
return lm.selectFields
return append(lm.selectFields, lm.selectTags...)
}
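
Fields() now reports the selected tag keys after the selected field names. Because append can reuse the backing array of lm.selectFields, an explicitly copying variant is sketched below purely as an illustration of the slice semantics; whether the in-place append matters depends on how callers use the result, so this is not a required change.

// Copying variant of the concatenation above: the result never shares
// backing storage with the input slices. Illustration only.
func fieldsAndTags(selectFields, selectTags []string) []string {
	out := make([]string, 0, len(selectFields)+len(selectTags))
	out = append(out, selectFields...)
	out = append(out, selectTags...)
	return out
}
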
// Close closes the mapper.
@@ -664,9 +665,15 @@ func (tsc *tagSetCursor) key() string {
return formMeasurementTagSetKey(tsc.measurement, tsc.tags)
}
type decodedPoint struct {
time int64
value interface{}
tags map[string]string
}
// Next returns the next matching series-key, timestamp and byte slice for the tagset. Filtering
// is enforced on the values. If there is no matching value, then a nil result is returned.
func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []string) (int64, interface{}) {
func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []string) (int64, *decodedPoint) {
for {
// If we're out of points, we're done.
if tsc.pointHeap.Len() == 0 {
@@ -700,7 +707,7 @@ func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []stri
continue
}
return p.timestamp, value
return p.timestamp, &decodedPoint{p.timestamp, value, p.cursor.tags}
}
}
@@ -746,13 +753,15 @@ func (tsc *tagSetCursor) decodeRawPoint(p *pointHeapItem, selectFields, whereFie
type seriesCursor struct {
cursor Cursor // BoltDB cursor for a series
filter influxql.Expr
tags map[string]string
}
// newSeriesCursor returns a new instance of a series cursor.
func newSeriesCursor(cur Cursor, filter influxql.Expr) *seriesCursor {
func newSeriesCursor(cur Cursor, filter influxql.Expr, tags map[string]string) *seriesCursor {
return &seriesCursor{
cursor: cur,
filter: filter,
tags: tags,
}
}
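
Taken together, the tags flow from the shard index into the series cursor, and from the cursor into every decodedPoint that Next returns, which is what lets the executor resolve tag columns later. A compressed, self-contained sketch of that flow with stand-in types, not the real tsdb structs:

// Compressed sketch of the tag flow introduced in this file: the cursor is
// constructed with the series' tag map, and each decoded point carries a
// reference to that map. Stand-in types only.
type sketchCursor struct {
	tags map[string]string
}

type sketchPoint struct {
	time  int64
	value interface{}
	tags  map[string]string
}

func newSketchCursor(tags map[string]string) *sketchCursor {
	return &sketchCursor{tags: tags}
}

func (c *sketchCursor) emit(ts int64, value interface{}) *sketchPoint {
	return &sketchPoint{time: ts, value: value, tags: c.tags}
}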