issue #1412: wire up show tag values

pull/1445/head
David Norton 2015-01-29 15:00:15 -05:00
parent 0cf681d19c
commit f827bf324a
4 changed files with 295 additions and 4 deletions

View File

@ -1178,3 +1178,77 @@ func (m *Measurement) tagKeys() []string {
sort.Strings(keys)
return keys
}
func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids seriesIDs) stringSet {
// If no tag keys were passed, get all tag keys for the measurement.
if len(tagKeys) == 0 {
for k, _ := range m.seriesByTagKeyValue {
tagKeys = append(tagKeys, k)
}
}
// Make a set to hold all tag values found.
tagValues := newStringSet()
// Iterate all series to collect tag values.
for _, id := range ids {
s, ok := m.seriesByID[id]
if !ok {
continue
}
// Iterate the tag keys we're interested in and collect values
// from this series, if they exist.
for _, tagKey := range tagKeys {
if tagVal, ok := s.Tags[tagKey]; ok {
tagValues.add(tagVal)
}
}
}
return tagValues
}
type stringSet map[string]struct{}
func newStringSet() stringSet {
return make(map[string]struct{})
}
func (s stringSet) add(ss string) {
s[ss] = struct{}{}
}
func (s stringSet) list() []string {
l := make([]string, 0, len(s))
for k, _ := range s {
l = append(l, k)
}
return l
}
func (s stringSet) union(o stringSet) stringSet {
ns := newStringSet()
for k, _ := range s {
ns[k] = struct{}{}
}
for k, _ := range o {
ns[k] = struct{}{}
}
return ns
}
func (s stringSet) intersect(o stringSet) stringSet {
ns := newStringSet()
for k, _ := range s {
if _, ok := o[k]; ok {
ns[k] = struct{}{}
}
}
for k, _ := range o {
if _, ok := s[k]; ok {
ns[k] = struct{}{}
}
}
return ns
}

View File

@ -1132,6 +1132,147 @@ func TestHandler_serveShowTagKeys(t *testing.T) {
}
}
func TestHandler_serveShowTagValues(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
srvr.SetDefaultRetentionPolicy("foo", "bar")
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [
{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "cpu", "tags": {"host": "server01", "region": "uswest"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "cpu", "tags": {"host": "server01", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "cpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "gpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}
]}`)
if status != http.StatusOK {
t.Log(body)
t.Fatalf("unexpected status after write: %d", status)
}
var tests = []struct {
q string
r *influxdb.Results
err string
}{
// SHOW TAG VALUES
{
q: `SHOW TAG VALUES WITH KEY = host`,
r: &influxdb.Results{
Results: []*influxdb.Result{
&influxdb.Result{
Rows: []*influxql.Row{
&influxql.Row{
Name: "cpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server01"}),
str2iface([]string{"server02"}),
},
},
&influxql.Row{
Name: "gpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server02"}),
},
},
},
},
},
},
},
// SHOW TAG VALUES FROM ...
{
q: `SHOW TAG VALUES FROM cpu WITH KEY = host`,
r: &influxdb.Results{
Results: []*influxdb.Result{
&influxdb.Result{
Rows: []*influxql.Row{
&influxql.Row{
Name: "cpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server01"}),
str2iface([]string{"server02"}),
},
},
},
},
},
},
},
// SHOW TAG VALUES FROM ... WHERE ...
{
q: `SHOW TAG VALUES FROM cpu WITH KEY = host WHERE region = 'uswest'`,
r: &influxdb.Results{
Results: []*influxdb.Result{
&influxdb.Result{
Rows: []*influxql.Row{
&influxql.Row{
Name: "cpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server01"}),
},
},
},
},
},
},
},
// SHOW TAG VALUES FROM ... WITH KEY IN ... WHERE ...
{
q: `SHOW TAG VALUES FROM cpu WITH KEY IN (host, region) WHERE region = 'uswest'`,
r: &influxdb.Results{
Results: []*influxdb.Result{
&influxdb.Result{
Rows: []*influxql.Row{
&influxql.Row{
Name: "cpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server01"}),
str2iface([]string{"uswest"}),
},
},
},
},
},
},
},
}
for i, tt := range tests {
query := map[string]string{"db": "foo", "q": tt.q}
status, body = MustHTTP("GET", s.URL+`/query`, query, nil, "")
if status != http.StatusOK {
t.Logf("query #%d: %s", i, tt.q)
t.Log(body)
t.Errorf("unexpected status: %d", status)
}
r := &influxdb.Results{}
if err := json.Unmarshal([]byte(body), r); err != nil {
t.Logf("query #%d: %s", i, tt.q)
t.Log(body)
t.Error(err)
}
if !reflect.DeepEqual(tt.err, errstring(r.Err)) {
t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err)
} else if tt.err == "" && !reflect.DeepEqual(tt.r, r) {
b, _ := json.Marshal(tt.r)
t.Log(string(b))
t.Log(body)
t.Errorf("%d. %s: result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.q, tt.r, r)
}
}
}
// Utility functions for this test suite.
func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) {

View File

@ -1642,7 +1642,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re
case *influxql.ShowTagKeysStatement:
res = s.executeShowTagKeysStatement(stmt, database, user)
case *influxql.ShowTagValuesStatement:
continue
res = s.executeShowTagValuesStatement(stmt, database, user)
case *influxql.ShowFieldKeysStatement:
continue
case *influxql.ShowFieldValuesStatement:
@ -1945,6 +1945,82 @@ func (s *Server) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement
return result
}
func (s *Server) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()
// Find the database.
db := s.databases[database]
if db == nil {
return &Result{Err: ErrDatabaseNotFound}
}
// Get the list of measurements we're interested in.
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
if err != nil {
return &Result{Err: err}
}
// Make result.
result := &Result{
Rows: make(influxql.Rows, 0, len(measurements)),
}
for _, name := range measurements {
// Look up the measurement by name.
m, ok := db.measurements[name]
if !ok {
continue
}
var ids seriesIDs
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
filters := map[uint32]influxql.Expr{}
ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters)
// If no series matched, then go to the next measurement.
if len(ids) == 0 {
continue
}
// TODO: check return of walkWhereForSeriesIds for fields
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
tagValues := m.tagValuesByKeyAndSeriesID(stmt.TagKeys, ids)
r := &influxql.Row{
Name: m.Name,
Columns: []string{"tagValue"},
}
vals := tagValues.list()
sort.Strings(vals)
for _, val := range vals {
v := interface{}(val)
r.Values = append(r.Values, []interface{}{v})
}
result.Rows = append(result.Rows, r)
}
return result
}
// str2iface converts an array of strings to an array of interfaces.
func str2iface(strs []string) []interface{} {
a := make([]interface{}, 0, len(strs))
for _, s := range strs {
a = append(a, interface{}(s))
}
return a
}
// measurementsFromSourceOrDB returns a list of measurement names from the
// statement passed in or, if the statement is nil, a list of all
// measurement names from the database passed in.

View File

@ -69,8 +69,8 @@ func mapKeyList(m interface{}) []string {
switch m.(type) {
case map[string]string:
return mapStrStrKeyList(m.(map[string]string))
case map[string]int:
return mapStrIntKeyList(m.(map[string]int))
case map[string]uint32:
return mapStrUint32KeyList(m.(map[string]uint32))
}
return nil
}
@ -83,7 +83,7 @@ func mapStrStrKeyList(m map[string]string) []string {
return l
}
func mapStrIntKeyList(m map[string]int) []string {
func mapStrUint32KeyList(m map[string]uint32) []string {
l := make([]string, 0, len(m))
for k, _ := range m {
l = append(l, k)