Merge pull request #3075 from influxdb/fix_tag_sets

Fix GROUP BY when different tags have same values
pull/3097/merge
Philip O'Toole 2015-06-22 16:07:22 -07:00
commit 93d7cd10a0
3 changed files with 90 additions and 24 deletions

View File

@ -26,6 +26,7 @@
- [#3033](https://github.com/influxdb/influxdb/pull/3033): Add support for marshaling `uint64` in client.
- [#3090](https://github.com/influxdb/influxdb/pull/3090): Remove database from TSDB index on DROP DATABASE.
- [#2944](https://github.com/influxdb/influxdb/issues/2944): Don't require "WHERE time" when creating continuous queries.
- [#3075](https://github.com/influxdb/influxdb/pull/3075): GROUP BY correctly when different tags have same value.

## v0.9.0 [2015-06-11]

View File

@ -582,6 +582,59 @@ func TestServer_Query_Multiple_Measurements(t *testing.T) {
}
}
// Ensure the server correctly groups series when different tag keys carry
// an identical tag value (e.g. t1=val2 and t2=val2), both with and without
// an explicit GROUP BY clause. Regression test for influxdb/influxdb#3075.
func TestServer_Query_IdenticalTagValues(t *testing.T) {
	t.Parallel()
	s := OpenServer(NewConfig(), "")
	defer s.Close()

	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
		t.Fatal(err)
	}

	// Three points: the second and third share the value "val2", but under
	// different tag keys (t2 vs t1), so they must land in distinct tag sets.
	writes := []string{
		fmt.Sprintf("cpu,t1=val1 value=1 %d", mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
		fmt.Sprintf("cpu,t2=val2 value=2 %d", mustParseTime(time.RFC3339Nano, "2000-01-01T00:01:00Z").UnixNano()),
		fmt.Sprintf("cpu,t1=val2 value=3 %d", mustParseTime(time.RFC3339Nano, "2000-01-01T00:02:00Z").UnixNano()),
	}

	test := NewTest("db0", "rp0")
	test.write = strings.Join(writes, "\n")

	test.addQueries([]*Query{
		{
			name:    "measurements with identical tag values - SELECT *, no GROUP BY",
			command: `SELECT * FROM db0.rp0.cpu`,
			exp:     `{"results":[{"series":[{"name":"cpu","tags":{"t1":"val1","t2":""},"columns":["time","value"],"values":[["2000-01-01T00:00:00Z",1]]},{"name":"cpu","tags":{"t1":"val2","t2":""},"columns":["time","value"],"values":[["2000-01-01T00:02:00Z",3]]},{"name":"cpu","tags":{"t1":"","t2":"val2"},"columns":["time","value"],"values":[["2000-01-01T00:01:00Z",2]]}]}]}`,
		},
		{
			// Name corrected: the command selects "value", not "*".
			name:    "measurements with identical tag values - SELECT value, with GROUP BY",
			command: `SELECT value FROM db0.rp0.cpu GROUP BY t1,t2`,
			exp:     `{"results":[{"series":[{"name":"cpu","tags":{"t1":"val1","t2":""},"columns":["time","value"],"values":[["2000-01-01T00:00:00Z",1]]},{"name":"cpu","tags":{"t1":"val2","t2":""},"columns":["time","value"],"values":[["2000-01-01T00:02:00Z",3]]},{"name":"cpu","tags":{"t1":"","t2":"val2"},"columns":["time","value"],"values":[["2000-01-01T00:01:00Z",2]]}]}]}`,
		},
		{
			name:    "measurements with identical tag values - SELECT value, no GROUP BY",
			command: `SELECT value FROM db0.rp0.cpu`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",1],["2000-01-01T00:01:00Z",2],["2000-01-01T00:02:00Z",3]]}]}]}`,
		},
	}...)

	if err := test.init(s); err != nil {
		t.Fatalf("test init failed: %s", err)
	}

	for _, query := range test.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		if err := query.Execute(s); err != nil {
			t.Error(query.Error(err))
		} else if !query.success() {
			t.Error(query.failureMessage())
		}
	}
}
// Ensure the server can query with the count aggregate function
func TestServer_Query_Count(t *testing.T) {
t.Parallel()

View File

@ -469,43 +469,55 @@ func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []strin
return nil, err
}
// build the tag sets
var tagStrings []string
// For every series, get the tag values for the requested tag keys i.e. dimensions. This is the
// TagSet for that series. Series with the same TagSet are then grouped together, because for the
// purpose of GROUP BY they are part of the same composite series.
tagSets := make(map[string]*influxql.TagSet)
for id, filter := range filters {
// get the series and set the tag values for the dimensions we care about
s := m.seriesByID[id]
tags := make([]string, len(dimensions))
for i, dim := range dimensions {
tags[i] = s.Tags[dim]
tags := make(map[string]string)
// Build the TagSet for this series.
for _, dim := range dimensions {
tags[dim] = s.Tags[dim]
}
// marshal it into a string and put this series and its expr into the tagSets map
t := strings.Join(tags, "")
set, ok := tagSets[t]
// Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled
// as a set.
tagsAsKey := string(marshalTags(tags))
tagSet, ok := tagSets[tagsAsKey]
if !ok {
tagStrings = append(tagStrings, t)
set = &influxql.TagSet{}
// set the tags for this set
// This TagSet is new, create a new entry for it.
tagSet = &influxql.TagSet{}
tagsForSet := make(map[string]string)
for i, dim := range dimensions {
tagsForSet[dim] = tags[i]
for k, v := range tags {
tagsForSet[k] = v
}
set.Tags = tagsForSet
set.Key = marshalTags(tagsForSet)
tagSet.Tags = tagsForSet
tagSet.Key = marshalTags(tagsForSet)
}
set.AddFilter(m.seriesByID[id].Key, filter)
tagSets[t] = set
// Associate the series and filter with the Tagset.
tagSet.AddFilter(m.seriesByID[id].Key, filter)
// Ensure it's back in the map.
tagSets[tagsAsKey] = tagSet
}
// return the tag sets in sorted order
a := make([]*influxql.TagSet, 0, len(tagSets))
sort.Strings(tagStrings)
for _, s := range tagStrings {
a = append(a, tagSets[s])
// The TagSets have been created, as a map of TagSets. Just send
// the values back as a slice, sorting for consistency.
sortedTagSetKeys := make([]string, 0, len(tagSets))
for k, _ := range tagSets {
sortedTagSetKeys = append(sortedTagSetKeys, k)
}
sort.Strings(sortedTagSetKeys)
sortedTagsSets := make([]*influxql.TagSet, 0, len(sortedTagSetKeys))
for _, k := range sortedTagSetKeys {
sortedTagsSets = append(sortedTagsSets, tagSets[k])
}
return a, nil
return sortedTagsSets, nil
}
// mergeSeriesFilters merges two sets of filter expressions and culls series IDs.