Delimit measurement name for Mapper tagsets
parent
5a311e6cc0
commit
3ddff9114d
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
|
@ -28,7 +29,7 @@ type rawMapperOutput struct {
|
|||
}
|
||||
|
||||
func (mo *rawMapperOutput) key() string {
|
||||
return mo.Name + string(marshalTags(mo.Tags))
|
||||
return formMeasurementTagSetKey(mo.Name, mo.Tags)
|
||||
}
|
||||
|
||||
// RawMapper is for retrieving data, for a raw query, for a single shard.
|
||||
|
@ -196,7 +197,7 @@ type aggMapperOutput struct {
|
|||
}
|
||||
|
||||
func (amo *aggMapperOutput) key() string {
|
||||
return amo.Name + string(marshalTags(amo.Tags))
|
||||
return formMeasurementTagSetKey(amo.Name, amo.Tags)
|
||||
}
|
||||
|
||||
// AggMapper is for retrieving data, for an aggregate query, from a given shard.
|
||||
|
@ -521,7 +522,7 @@ func newTagSetCursor(m string, t map[string]string, c []*seriesCursor, d *FieldC
|
|||
}
|
||||
|
||||
func (tsc *tagSetCursor) key() string {
|
||||
return tsc.measurement + string(marshalTags(tsc.tags))
|
||||
return formMeasurementTagSetKey(tsc.measurement, tsc.tags)
|
||||
}
|
||||
|
||||
// Next returns the next matching series-key, timestamp and byte slice for the tagset. Filtering
|
||||
|
@ -769,5 +770,12 @@ func matchesWhere(f influxql.Expr, fields map[string]interface{}) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func formMeasurementTagSetKey(name string, tags map[string]string) string {
|
||||
if len(tags) == 0 {
|
||||
return name
|
||||
}
|
||||
return strings.Join([]string{name, string(marshalTags(tags))}, "|")
|
||||
}
|
||||
|
||||
// btou64 converts an 8-byte slice into an uint64.
|
||||
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }
|
||||
|
|
|
@ -48,11 +48,11 @@ func TestShardMapper_RawMapperTagSets(t *testing.T) {
|
|||
},
|
||||
{
|
||||
stmt: `SELECT value FROM cpu GROUP BY host`,
|
||||
expected: []string{"cpuhost|serverA", "cpuhost|serverB"},
|
||||
expected: []string{"cpu|host|serverA", "cpu|host|serverB"},
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM cpu GROUP BY region`,
|
||||
expected: []string{"cpuregion|us-east"},
|
||||
expected: []string{"cpu|region|us-east"},
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM cpu WHERE host='serverA'`,
|
||||
|
@ -365,11 +365,11 @@ func TestShardMapper_AggMapperTagSets(t *testing.T) {
|
|||
},
|
||||
{
|
||||
stmt: `SELECT sum(value) FROM cpu GROUP BY host`,
|
||||
expected: []string{"cpuhost|serverA", "cpuhost|serverB"},
|
||||
expected: []string{"cpu|host|serverA", "cpu|host|serverB"},
|
||||
},
|
||||
{
|
||||
stmt: `SELECT sum(value) FROM cpu GROUP BY region`,
|
||||
expected: []string{"cpuregion|us-east"},
|
||||
expected: []string{"cpu|region|us-east"},
|
||||
},
|
||||
{
|
||||
stmt: `SELECT sum(value) FROM cpu WHERE host='serverA'`,
|
||||
|
|
Loading…
Reference in New Issue