Merge pull request #1445 from influxdb/show-tag-values

Show tag values
pull/1446/head
Paul Dix 2015-01-29 15:15:19 -05:00
commit ee2359e98f
8 changed files with 399 additions and 66 deletions

View File

@ -1178,3 +1178,77 @@ func (m *Measurement) tagKeys() []string {
sort.Strings(keys)
return keys
}
func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids seriesIDs) stringSet {
// If no tag keys were passed, get all tag keys for the measurement.
if len(tagKeys) == 0 {
for k, _ := range m.seriesByTagKeyValue {
tagKeys = append(tagKeys, k)
}
}
// Make a set to hold all tag values found.
tagValues := newStringSet()
// Iterate all series to collect tag values.
for _, id := range ids {
s, ok := m.seriesByID[id]
if !ok {
continue
}
// Iterate the tag keys we're interested in and collect values
// from this series, if they exist.
for _, tagKey := range tagKeys {
if tagVal, ok := s.Tags[tagKey]; ok {
tagValues.add(tagVal)
}
}
}
return tagValues
}
type stringSet map[string]struct{}
func newStringSet() stringSet {
return make(map[string]struct{})
}
func (s stringSet) add(ss string) {
s[ss] = struct{}{}
}
func (s stringSet) list() []string {
l := make([]string, 0, len(s))
for k, _ := range s {
l = append(l, k)
}
return l
}
func (s stringSet) union(o stringSet) stringSet {
ns := newStringSet()
for k, _ := range s {
ns[k] = struct{}{}
}
for k, _ := range o {
ns[k] = struct{}{}
}
return ns
}
func (s stringSet) intersect(o stringSet) stringSet {
ns := newStringSet()
for k, _ := range s {
if _, ok := o[k]; ok {
ns[k] = struct{}{}
}
}
for k, _ := range o {
if _, ok := s[k]; ok {
ns[k] = struct{}{}
}
}
return ns
}

View File

@ -1132,6 +1132,147 @@ func TestHandler_serveShowTagKeys(t *testing.T) {
}
}
func TestHandler_serveShowTagValues(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
srvr.SetDefaultRetentionPolicy("foo", "bar")
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [
{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "cpu", "tags": {"host": "server01", "region": "uswest"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "cpu", "tags": {"host": "server01", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "cpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "gpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}
]}`)
if status != http.StatusOK {
t.Log(body)
t.Fatalf("unexpected status after write: %d", status)
}
var tests = []struct {
q string
r *influxdb.Results
err string
}{
// SHOW TAG VALUES
{
q: `SHOW TAG VALUES WITH KEY = host`,
r: &influxdb.Results{
Results: []*influxdb.Result{
&influxdb.Result{
Rows: []*influxql.Row{
&influxql.Row{
Name: "cpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server01"}),
str2iface([]string{"server02"}),
},
},
&influxql.Row{
Name: "gpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server02"}),
},
},
},
},
},
},
},
// SHOW TAG VALUES FROM ...
{
q: `SHOW TAG VALUES FROM cpu WITH KEY = host`,
r: &influxdb.Results{
Results: []*influxdb.Result{
&influxdb.Result{
Rows: []*influxql.Row{
&influxql.Row{
Name: "cpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server01"}),
str2iface([]string{"server02"}),
},
},
},
},
},
},
},
// SHOW TAG VALUES FROM ... WHERE ...
{
q: `SHOW TAG VALUES FROM cpu WITH KEY = host WHERE region = 'uswest'`,
r: &influxdb.Results{
Results: []*influxdb.Result{
&influxdb.Result{
Rows: []*influxql.Row{
&influxql.Row{
Name: "cpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server01"}),
},
},
},
},
},
},
},
// SHOW TAG VALUES FROM ... WITH KEY IN ... WHERE ...
{
q: `SHOW TAG VALUES FROM cpu WITH KEY IN (host, region) WHERE region = 'uswest'`,
r: &influxdb.Results{
Results: []*influxdb.Result{
&influxdb.Result{
Rows: []*influxql.Row{
&influxql.Row{
Name: "cpu",
Columns: []string{"tagValue"},
Values: [][]interface{}{
str2iface([]string{"server01"}),
str2iface([]string{"uswest"}),
},
},
},
},
},
},
},
}
for i, tt := range tests {
query := map[string]string{"db": "foo", "q": tt.q}
status, body = MustHTTP("GET", s.URL+`/query`, query, nil, "")
if status != http.StatusOK {
t.Logf("query #%d: %s", i, tt.q)
t.Log(body)
t.Errorf("unexpected status: %d", status)
}
r := &influxdb.Results{}
if err := json.Unmarshal([]byte(body), r); err != nil {
t.Logf("query #%d: %s", i, tt.q)
t.Log(body)
t.Error(err)
}
if !reflect.DeepEqual(tt.err, errstring(r.Err)) {
t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err)
} else if tt.err == "" && !reflect.DeepEqual(tt.r, r) {
b, _ := json.Marshal(tt.r)
t.Log(string(b))
t.Log(body)
t.Errorf("%d. %s: result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.q, tt.r, r)
}
}
}
// Utility functions for this test suite.
func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) {

View File

@ -91,7 +91,6 @@ func (_ *ParenExpr) node() {}
func (_ *SortField) node() {}
func (_ SortFields) node() {}
func (_ *StringLiteral) node() {}
func (_ *TagKeyIdent) node() {}
func (_ *Target) node() {}
func (_ *TimeLiteral) node() {}
func (_ *VarRef) node() {}
@ -177,7 +176,6 @@ func (_ *nilLiteral) expr() {}
func (_ *NumberLiteral) expr() {}
func (_ *ParenExpr) expr() {}
func (_ *StringLiteral) expr() {}
func (_ *TagKeyIdent) expr() {}
func (_ *TimeLiteral) expr() {}
func (_ *VarRef) expr() {}
func (_ *Wildcard) expr() {}
@ -1081,6 +1079,9 @@ type ShowTagValuesStatement struct {
// Data source that fields are extracted from.
Source Source
// Tag key(s) to pull values from.
TagKeys []string
// An expression evaluated on data point.
Condition Expr
@ -1457,12 +1458,6 @@ type StringLiteral struct {
// String returns a string representation of the literal.
func (l *StringLiteral) String() string { return QuoteString(l.Val) }
// TagKeyIdent represents a special TAG KEY identifier.
type TagKeyIdent struct{}
// String returns a string representation of the TagKeyIdent.
func (t *TagKeyIdent) String() string { return "TAG KEY" }
// TimeLiteral represents a point-in-time literal.
type TimeLiteral struct {
Val time.Time

View File

@ -348,7 +348,7 @@ func (p *Parser) parseDuration() (time.Duration, error) {
return d, nil
}
// parserIdent parses an identifier.
// parseIdent parses an identifier.
func (p *Parser) parseIdent() (string, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT {
@ -357,6 +357,30 @@ func (p *Parser) parseIdent() (string, error) {
return lit, nil
}
// parseIdentList parses a comma delimited list of identifiers.
func (p *Parser) parseIdentList() ([]string, error) {
// Parse first (required) identifier.
ident, err := p.parseIdent()
if err != nil {
return nil, err
}
idents := []string{ident}
// Parse remaining (optional) identifiers.
for {
if tok, _, _ := p.scanIgnoreWhitespace(); tok != COMMA {
p.unscan()
return idents, nil
}
if ident, err = p.parseIdent(); err != nil {
return nil, err
}
idents = append(idents, ident)
}
}
// parserString parses a string.
func (p *Parser) parseString() (string, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
@ -717,11 +741,17 @@ func (p *Parser) parseShowTagValuesStatement() (*ShowTagValuesStatement, error)
stmt := &ShowTagValuesStatement{}
var err error
// Parse source.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM {
return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
// Parse optional source.
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
if stmt.Source, err = p.parseSource(); err != nil {
return nil, err
}
} else {
p.unscan()
}
if stmt.Source, err = p.parseSource(); err != nil {
// Parse required WITH KEY.
if stmt.TagKeys, err = p.parseTagKeys(); err != nil {
return nil, err
}
@ -748,6 +778,47 @@ func (p *Parser) parseShowTagValuesStatement() (*ShowTagValuesStatement, error)
return stmt, nil
}
// parseTagKeys parses a string and returns a list of tag keys.
func (p *Parser) parseTagKeys() ([]string, error) {
var err error
// Parse required WITH KEY tokens.
if err := p.parseTokens([]Token{WITH, KEY}); err != nil {
return nil, err
}
var tagKeys []string
// Parse required IN or EQ token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == IN {
// Parse required ( token.
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != LPAREN {
return nil, newParseError(tokstr(tok, lit), []string{"("}, pos)
}
// Parse tag key list.
if tagKeys, err = p.parseIdentList(); err != nil {
return nil, err
}
// Parse required ) token.
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != RPAREN {
return nil, newParseError(tokstr(tok, lit), []string{"("}, pos)
}
} else if tok == EQ {
// Parse required tag key.
ident, err := p.parseIdent()
if err != nil {
return nil, err
}
tagKeys = append(tagKeys, ident)
} else {
return nil, newParseError(tokstr(tok, lit), []string{"IN", "="}, pos)
}
return tagKeys, nil
}
// parseShowUsersStatement parses a string and returns a ShowUsersStatement.
// This function assumes the "SHOW USERS" tokens have been consumed.
func (p *Parser) parseShowUsersStatement() (*ShowUsersStatement, error) {
@ -1457,11 +1528,6 @@ func (p *Parser) parseUnaryExpr() (Expr, error) {
case DURATION_VAL:
v, _ := ParseDuration(lit)
return &DurationLiteral{Val: v}, nil
case TAG:
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != KEY {
return nil, newParseError(tokstr(tok, lit), []string{"KEY"}, pos)
}
return &TagKeyIdent{}, nil
default:
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string", "number", "bool"}, pos)
}

View File

@ -231,11 +231,12 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// SHOW TAG VALUES
// SHOW TAG VALUES FROM ... WITH KEY = ...
{
s: `SHOW TAG VALUES FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
s: `SHOW TAG VALUES FROM src WITH KEY = region WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
stmt: &influxql.ShowTagValuesStatement{
Source: &influxql.Measurement{Name: "src"},
Source: &influxql.Measurement{Name: "src"},
TagKeys: []string{"region"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
@ -250,65 +251,43 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// SHOW TAG VALUES ... TAG KEY =
// SHOW TAG VALUES FROM ... WITH KEY IN...
{
s: `SHOW TAG VALUES FROM cpu WHERE TAG KEY = 'host' AND region = 'uswest'`,
s: `SHOW TAG VALUES FROM cpu WITH KEY IN (region, host) WHERE region = 'uswest'`,
stmt: &influxql.ShowTagValuesStatement{
Source: &influxql.Measurement{Name: "cpu"},
Source: &influxql.Measurement{Name: "cpu"},
TagKeys: []string{"region", "host"},
Condition: &influxql.BinaryExpr{
Op: influxql.AND,
LHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.TagKeyIdent{},
RHS: &influxql.StringLiteral{Val: "host"},
},
RHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
RHS: &influxql.StringLiteral{Val: "uswest"},
},
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
RHS: &influxql.StringLiteral{Val: "uswest"},
},
},
},
// SHOW TAG VALUES ... AND TAG KEY =
{
s: `SHOW TAG VALUES FROM cpu WHERE region = 'uswest' AND TAG KEY = 'host'`,
s: `SHOW TAG VALUES FROM cpu WITH KEY IN (region,service,host)WHERE region = 'uswest'`,
stmt: &influxql.ShowTagValuesStatement{
Source: &influxql.Measurement{Name: "cpu"},
Source: &influxql.Measurement{Name: "cpu"},
TagKeys: []string{"region", "service", "host"},
Condition: &influxql.BinaryExpr{
Op: influxql.AND,
LHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
RHS: &influxql.StringLiteral{Val: "uswest"},
},
RHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.TagKeyIdent{},
RHS: &influxql.StringLiteral{Val: "host"},
},
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
RHS: &influxql.StringLiteral{Val: "uswest"},
},
},
},
// SHOW TAG VALUES ... AND ... = TAG KEY
// SHOW TAG VALUES WITH KEY = ...
{
s: `SHOW TAG VALUES FROM cpu WHERE region = 'uswest' AND 'host' = TAG KEY`,
s: `SHOW TAG VALUES WITH KEY = host WHERE region = 'uswest'`,
stmt: &influxql.ShowTagValuesStatement{
Source: &influxql.Measurement{Name: "cpu"},
TagKeys: []string{"host"},
Condition: &influxql.BinaryExpr{
Op: influxql.AND,
LHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
RHS: &influxql.StringLiteral{Val: "uswest"},
},
RHS: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.StringLiteral{Val: "host"},
RHS: &influxql.TagKeyIdent{},
},
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
RHS: &influxql.StringLiteral{Val: "uswest"},
},
},
},

View File

@ -74,6 +74,7 @@ const (
GRANT
GROUP
IF
IN
INNER
INSERT
INTO
@ -165,6 +166,7 @@ var tokens = [...]string{
GRANT: "GRANT",
GROUP: "GROUP",
IF: "IF",
IN: "IN",
INNER: "INNER",
INSERT: "INSERT",
INTO: "INTO",

View File

@ -1642,7 +1642,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re
case *influxql.ShowTagKeysStatement:
res = s.executeShowTagKeysStatement(stmt, database, user)
case *influxql.ShowTagValuesStatement:
continue
res = s.executeShowTagValuesStatement(stmt, database, user)
case *influxql.ShowFieldKeysStatement:
continue
case *influxql.ShowFieldValuesStatement:
@ -1945,6 +1945,82 @@ func (s *Server) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement
return result
}
func (s *Server) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()
// Find the database.
db := s.databases[database]
if db == nil {
return &Result{Err: ErrDatabaseNotFound}
}
// Get the list of measurements we're interested in.
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
if err != nil {
return &Result{Err: err}
}
// Make result.
result := &Result{
Rows: make(influxql.Rows, 0, len(measurements)),
}
for _, name := range measurements {
// Look up the measurement by name.
m, ok := db.measurements[name]
if !ok {
continue
}
var ids seriesIDs
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
filters := map[uint32]influxql.Expr{}
ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters)
// If no series matched, then go to the next measurement.
if len(ids) == 0 {
continue
}
// TODO: check return of walkWhereForSeriesIds for fields
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
tagValues := m.tagValuesByKeyAndSeriesID(stmt.TagKeys, ids)
r := &influxql.Row{
Name: m.Name,
Columns: []string{"tagValue"},
}
vals := tagValues.list()
sort.Strings(vals)
for _, val := range vals {
v := interface{}(val)
r.Values = append(r.Values, []interface{}{v})
}
result.Rows = append(result.Rows, r)
}
return result
}
// str2iface converts an array of strings to an array of interfaces.
func str2iface(strs []string) []interface{} {
a := make([]interface{}, 0, len(strs))
for _, s := range strs {
a = append(a, interface{}(s))
}
return a
}
// measurementsFromSourceOrDB returns a list of measurement names from the
// statement passed in or, if the statement is nil, a list of all
// measurement names from the database passed in.

View File

@ -69,8 +69,8 @@ func mapKeyList(m interface{}) []string {
switch m.(type) {
case map[string]string:
return mapStrStrKeyList(m.(map[string]string))
case map[string]int:
return mapStrIntKeyList(m.(map[string]int))
case map[string]uint32:
return mapStrUint32KeyList(m.(map[string]uint32))
}
return nil
}
@ -83,7 +83,7 @@ func mapStrStrKeyList(m map[string]string) []string {
return l
}
func mapStrIntKeyList(m map[string]int) []string {
func mapStrUint32KeyList(m map[string]uint32) []string {
l := make([]string, 0, len(m))
for k, _ := range m {
l = append(l, k)