diff --git a/database.go b/database.go index f6ead62dd2..c367c74e1d 100644 --- a/database.go +++ b/database.go @@ -1178,3 +1178,77 @@ func (m *Measurement) tagKeys() []string { sort.Strings(keys) return keys } + +func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids seriesIDs) stringSet { + // If no tag keys were passed, get all tag keys for the measurement. + if len(tagKeys) == 0 { + for k, _ := range m.seriesByTagKeyValue { + tagKeys = append(tagKeys, k) + } + } + + // Make a set to hold all tag values found. + tagValues := newStringSet() + + // Iterate all series to collect tag values. + for _, id := range ids { + s, ok := m.seriesByID[id] + if !ok { + continue + } + + // Iterate the tag keys we're interested in and collect values + // from this series, if they exist. + for _, tagKey := range tagKeys { + if tagVal, ok := s.Tags[tagKey]; ok { + tagValues.add(tagVal) + } + } + } + + return tagValues +} + +type stringSet map[string]struct{} + +func newStringSet() stringSet { + return make(map[string]struct{}) +} + +func (s stringSet) add(ss string) { + s[ss] = struct{}{} +} + +func (s stringSet) list() []string { + l := make([]string, 0, len(s)) + for k, _ := range s { + l = append(l, k) + } + return l +} + +func (s stringSet) union(o stringSet) stringSet { + ns := newStringSet() + for k, _ := range s { + ns[k] = struct{}{} + } + for k, _ := range o { + ns[k] = struct{}{} + } + return ns +} + +func (s stringSet) intersect(o stringSet) stringSet { + ns := newStringSet() + for k, _ := range s { + if _, ok := o[k]; ok { + ns[k] = struct{}{} + } + } + for k, _ := range o { + if _, ok := s[k]; ok { + ns[k] = struct{}{} + } + } + return ns +} diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 1928b377bd..9dda496a7f 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -1132,6 +1132,147 @@ func TestHandler_serveShowTagKeys(t *testing.T) { } } +func TestHandler_serveShowTagValues(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + srvr.CreateDatabase("foo") + srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) + srvr.SetDefaultRetentionPolicy("foo", "bar") + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [ + {"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server01", "region": "uswest"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server01", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "gpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}} + ]}`) + + if status != http.StatusOK { + t.Log(body) + t.Fatalf("unexpected status after write: %d", status) + } + + var tests = []struct { + q string + r *influxdb.Results + err string + }{ + // SHOW TAG VALUES + { + q: `SHOW TAG VALUES WITH KEY = host`, + r: &influxdb.Results{ + Results: []*influxdb.Result{ + &influxdb.Result{ + Rows: []*influxql.Row{ + &influxql.Row{ + Name: "cpu", + Columns: []string{"tagValue"}, + Values: [][]interface{}{ + str2iface([]string{"server01"}), + str2iface([]string{"server02"}), + }, + }, + &influxql.Row{ + Name: "gpu", + Columns: []string{"tagValue"}, + Values: [][]interface{}{ + str2iface([]string{"server02"}), + }, + }, + }, + }, + }, + }, + }, + // SHOW TAG VALUES FROM ... + { + q: `SHOW TAG VALUES FROM cpu WITH KEY = host`, + r: &influxdb.Results{ + Results: []*influxdb.Result{ + &influxdb.Result{ + Rows: []*influxql.Row{ + &influxql.Row{ + Name: "cpu", + Columns: []string{"tagValue"}, + Values: [][]interface{}{ + str2iface([]string{"server01"}), + str2iface([]string{"server02"}), + }, + }, + }, + }, + }, + }, + }, + // SHOW TAG VALUES FROM ... WHERE ... + { + q: `SHOW TAG VALUES FROM cpu WITH KEY = host WHERE region = 'uswest'`, + r: &influxdb.Results{ + Results: []*influxdb.Result{ + &influxdb.Result{ + Rows: []*influxql.Row{ + &influxql.Row{ + Name: "cpu", + Columns: []string{"tagValue"}, + Values: [][]interface{}{ + str2iface([]string{"server01"}), + }, + }, + }, + }, + }, + }, + }, + // SHOW TAG VALUES FROM ... WITH KEY IN ... WHERE ... + { + q: `SHOW TAG VALUES FROM cpu WITH KEY IN (host, region) WHERE region = 'uswest'`, + r: &influxdb.Results{ + Results: []*influxdb.Result{ + &influxdb.Result{ + Rows: []*influxql.Row{ + &influxql.Row{ + Name: "cpu", + Columns: []string{"tagValue"}, + Values: [][]interface{}{ + str2iface([]string{"server01"}), + str2iface([]string{"uswest"}), + }, + }, + }, + }, + }, + }, + }, + } + for i, tt := range tests { + query := map[string]string{"db": "foo", "q": tt.q} + status, body = MustHTTP("GET", s.URL+`/query`, query, nil, "") + + if status != http.StatusOK { + t.Logf("query #%d: %s", i, tt.q) + t.Log(body) + t.Errorf("unexpected status: %d", status) + } + + r := &influxdb.Results{} + if err := json.Unmarshal([]byte(body), r); err != nil { + t.Logf("query #%d: %s", i, tt.q) + t.Log(body) + t.Error(err) + } + + if !reflect.DeepEqual(tt.err, errstring(r.Err)) { + t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err) + } else if tt.err == "" && !reflect.DeepEqual(tt.r, r) { + b, _ := json.Marshal(tt.r) + t.Log(string(b)) + t.Log(body) + t.Errorf("%d. %s: result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.q, tt.r, r) + } + } +} + // Utility functions for this test suite. func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) { diff --git a/influxql/ast.go b/influxql/ast.go index af6d7e46ed..fafb7d7a7e 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -91,7 +91,6 @@ func (_ *ParenExpr) node() {} func (_ *SortField) node() {} func (_ SortFields) node() {} func (_ *StringLiteral) node() {} -func (_ *TagKeyIdent) node() {} func (_ *Target) node() {} func (_ *TimeLiteral) node() {} func (_ *VarRef) node() {} @@ -177,7 +176,6 @@ func (_ *nilLiteral) expr() {} func (_ *NumberLiteral) expr() {} func (_ *ParenExpr) expr() {} func (_ *StringLiteral) expr() {} -func (_ *TagKeyIdent) expr() {} func (_ *TimeLiteral) expr() {} func (_ *VarRef) expr() {} func (_ *Wildcard) expr() {} @@ -1081,6 +1079,9 @@ type ShowTagValuesStatement struct { // Data source that fields are extracted from. Source Source + // Tag key(s) to pull values from. + TagKeys []string + // An expression evaluated on data point. Condition Expr @@ -1457,12 +1458,6 @@ type StringLiteral struct { // String returns a string representation of the literal. func (l *StringLiteral) String() string { return QuoteString(l.Val) } -// TagKeyIdent represents a special TAG KEY identifier. -type TagKeyIdent struct{} - -// String returns a string representation of the TagKeyIdent. -func (t *TagKeyIdent) String() string { return "TAG KEY" } - // TimeLiteral represents a point-in-time literal. type TimeLiteral struct { Val time.Time diff --git a/influxql/parser.go b/influxql/parser.go index e1e3bd25ae..306dd8b2e9 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -348,7 +348,7 @@ func (p *Parser) parseDuration() (time.Duration, error) { return d, nil } -// parserIdent parses an identifier. +// parseIdent parses an identifier. func (p *Parser) parseIdent() (string, error) { tok, pos, lit := p.scanIgnoreWhitespace() if tok != IDENT { @@ -357,6 +357,30 @@ func (p *Parser) parseIdent() (string, error) { return lit, nil } +// parseIdentList parses a comma delimited list of identifiers. +func (p *Parser) parseIdentList() ([]string, error) { + // Parse first (required) identifier. + ident, err := p.parseIdent() + if err != nil { + return nil, err + } + idents := []string{ident} + + // Parse remaining (optional) identifiers. + for { + if tok, _, _ := p.scanIgnoreWhitespace(); tok != COMMA { + p.unscan() + return idents, nil + } + + if ident, err = p.parseIdent(); err != nil { + return nil, err + } + + idents = append(idents, ident) + } +} + // parserString parses a string. func (p *Parser) parseString() (string, error) { tok, pos, lit := p.scanIgnoreWhitespace() @@ -717,11 +741,17 @@ func (p *Parser) parseShowTagValuesStatement() (*ShowTagValuesStatement, error) stmt := &ShowTagValuesStatement{} var err error - // Parse source. - if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM { - return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos) + // Parse optional source. + if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM { + if stmt.Source, err = p.parseSource(); err != nil { + return nil, err + } + } else { + p.unscan() } - if stmt.Source, err = p.parseSource(); err != nil { + + // Parse required WITH KEY. + if stmt.TagKeys, err = p.parseTagKeys(); err != nil { return nil, err } @@ -748,6 +778,47 @@ func (p *Parser) parseShowTagValuesStatement() (*ShowTagValuesStatement, error) return stmt, nil } +// parseTagKeys parses a string and returns a list of tag keys. +func (p *Parser) parseTagKeys() ([]string, error) { + var err error + + // Parse required WITH KEY tokens. + if err := p.parseTokens([]Token{WITH, KEY}); err != nil { + return nil, err + } + + var tagKeys []string + + // Parse required IN or EQ token. + if tok, pos, lit := p.scanIgnoreWhitespace(); tok == IN { + // Parse required ( token. + if tok, pos, lit = p.scanIgnoreWhitespace(); tok != LPAREN { + return nil, newParseError(tokstr(tok, lit), []string{"("}, pos) + } + + // Parse tag key list. + if tagKeys, err = p.parseIdentList(); err != nil { + return nil, err + } + + // Parse required ) token. + if tok, pos, lit = p.scanIgnoreWhitespace(); tok != RPAREN { + return nil, newParseError(tokstr(tok, lit), []string{"("}, pos) + } + } else if tok == EQ { + // Parse required tag key. + ident, err := p.parseIdent() + if err != nil { + return nil, err + } + tagKeys = append(tagKeys, ident) + } else { + return nil, newParseError(tokstr(tok, lit), []string{"IN", "="}, pos) + } + + return tagKeys, nil +} + // parseShowUsersStatement parses a string and returns a ShowUsersStatement. // This function assumes the "SHOW USERS" tokens have been consumed. func (p *Parser) parseShowUsersStatement() (*ShowUsersStatement, error) { @@ -1457,11 +1528,6 @@ func (p *Parser) parseUnaryExpr() (Expr, error) { case DURATION_VAL: v, _ := ParseDuration(lit) return &DurationLiteral{Val: v}, nil - case TAG: - if tok, pos, lit = p.scanIgnoreWhitespace(); tok != KEY { - return nil, newParseError(tokstr(tok, lit), []string{"KEY"}, pos) - } - return &TagKeyIdent{}, nil default: return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string", "number", "bool"}, pos) } diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 4b6df322e4..75b6a4f61a 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -231,11 +231,12 @@ func TestParser_ParseStatement(t *testing.T) { }, }, - // SHOW TAG VALUES + // SHOW TAG VALUES FROM ... WITH KEY = ... { - s: `SHOW TAG VALUES FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`, + s: `SHOW TAG VALUES FROM src WITH KEY = region WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`, stmt: &influxql.ShowTagValuesStatement{ - Source: &influxql.Measurement{Name: "src"}, + Source: &influxql.Measurement{Name: "src"}, + TagKeys: []string{"region"}, Condition: &influxql.BinaryExpr{ Op: influxql.EQ, LHS: &influxql.VarRef{Val: "region"}, @@ -250,65 +251,43 @@ func TestParser_ParseStatement(t *testing.T) { }, }, - // SHOW TAG VALUES ... TAG KEY = + // SHOW TAG VALUES FROM ... WITH KEY IN... { - s: `SHOW TAG VALUES FROM cpu WHERE TAG KEY = 'host' AND region = 'uswest'`, + s: `SHOW TAG VALUES FROM cpu WITH KEY IN (region, host) WHERE region = 'uswest'`, stmt: &influxql.ShowTagValuesStatement{ - Source: &influxql.Measurement{Name: "cpu"}, + Source: &influxql.Measurement{Name: "cpu"}, + TagKeys: []string{"region", "host"}, Condition: &influxql.BinaryExpr{ - Op: influxql.AND, - LHS: &influxql.BinaryExpr{ - Op: influxql.EQ, - LHS: &influxql.TagKeyIdent{}, - RHS: &influxql.StringLiteral{Val: "host"}, - }, - RHS: &influxql.BinaryExpr{ - Op: influxql.EQ, - LHS: &influxql.VarRef{Val: "region"}, - RHS: &influxql.StringLiteral{Val: "uswest"}, - }, + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "region"}, + RHS: &influxql.StringLiteral{Val: "uswest"}, }, }, }, // SHOW TAG VALUES ... AND TAG KEY = { - s: `SHOW TAG VALUES FROM cpu WHERE region = 'uswest' AND TAG KEY = 'host'`, + s: `SHOW TAG VALUES FROM cpu WITH KEY IN (region,service,host)WHERE region = 'uswest'`, stmt: &influxql.ShowTagValuesStatement{ - Source: &influxql.Measurement{Name: "cpu"}, + Source: &influxql.Measurement{Name: "cpu"}, + TagKeys: []string{"region", "service", "host"}, Condition: &influxql.BinaryExpr{ - Op: influxql.AND, - LHS: &influxql.BinaryExpr{ - Op: influxql.EQ, - LHS: &influxql.VarRef{Val: "region"}, - RHS: &influxql.StringLiteral{Val: "uswest"}, - }, - RHS: &influxql.BinaryExpr{ - Op: influxql.EQ, - LHS: &influxql.TagKeyIdent{}, - RHS: &influxql.StringLiteral{Val: "host"}, - }, + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "region"}, + RHS: &influxql.StringLiteral{Val: "uswest"}, }, }, }, - // SHOW TAG VALUES ... AND ... = TAG KEY + // SHOW TAG VALUES WITH KEY = ... { - s: `SHOW TAG VALUES FROM cpu WHERE region = 'uswest' AND 'host' = TAG KEY`, + s: `SHOW TAG VALUES WITH KEY = host WHERE region = 'uswest'`, stmt: &influxql.ShowTagValuesStatement{ - Source: &influxql.Measurement{Name: "cpu"}, + TagKeys: []string{"host"}, Condition: &influxql.BinaryExpr{ - Op: influxql.AND, - LHS: &influxql.BinaryExpr{ - Op: influxql.EQ, - LHS: &influxql.VarRef{Val: "region"}, - RHS: &influxql.StringLiteral{Val: "uswest"}, - }, - RHS: &influxql.BinaryExpr{ - Op: influxql.EQ, - LHS: &influxql.StringLiteral{Val: "host"}, - RHS: &influxql.TagKeyIdent{}, - }, + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "region"}, + RHS: &influxql.StringLiteral{Val: "uswest"}, }, }, }, diff --git a/influxql/token.go b/influxql/token.go index bde975b61f..615c604d9d 100644 --- a/influxql/token.go +++ b/influxql/token.go @@ -74,6 +74,7 @@ const ( GRANT GROUP IF + IN INNER INSERT INTO @@ -165,6 +166,7 @@ var tokens = [...]string{ GRANT: "GRANT", GROUP: "GROUP", IF: "IF", + IN: "IN", INNER: "INNER", INSERT: "INSERT", INTO: "INTO", diff --git a/server.go b/server.go index f030d93382..5afabe8f30 100644 --- a/server.go +++ b/server.go @@ -1642,7 +1642,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re case *influxql.ShowTagKeysStatement: res = s.executeShowTagKeysStatement(stmt, database, user) case *influxql.ShowTagValuesStatement: - continue + res = s.executeShowTagValuesStatement(stmt, database, user) case *influxql.ShowFieldKeysStatement: continue case *influxql.ShowFieldValuesStatement: @@ -1945,6 +1945,82 @@ func (s *Server) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement return result } +func (s *Server) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string, user *User) *Result { + s.mu.RLock() + defer s.mu.RUnlock() + + // Find the database. + db := s.databases[database] + if db == nil { + return &Result{Err: ErrDatabaseNotFound} + } + + // Get the list of measurements we're interested in. + measurements, err := measurementsFromSourceOrDB(stmt.Source, db) + if err != nil { + return &Result{Err: err} + } + + // Make result. + result := &Result{ + Rows: make(influxql.Rows, 0, len(measurements)), + } + + for _, name := range measurements { + // Look up the measurement by name. + m, ok := db.measurements[name] + if !ok { + continue + } + + var ids seriesIDs + + if stmt.Condition != nil { + // Get series IDs that match the WHERE clause. + filters := map[uint32]influxql.Expr{} + ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) + + // If no series matched, then go to the next measurement. + if len(ids) == 0 { + continue + } + + // TODO: check return of walkWhereForSeriesIds for fields + } else { + // No WHERE clause so get all series IDs for this measurement. + ids = m.seriesIDs + } + + tagValues := m.tagValuesByKeyAndSeriesID(stmt.TagKeys, ids) + + r := &influxql.Row{ + Name: m.Name, + Columns: []string{"tagValue"}, + } + + vals := tagValues.list() + sort.Strings(vals) + + for _, val := range vals { + v := interface{}(val) + r.Values = append(r.Values, []interface{}{v}) + } + + result.Rows = append(result.Rows, r) + } + + return result +} + +// str2iface converts an array of strings to an array of interfaces. +func str2iface(strs []string) []interface{} { + a := make([]interface{}, 0, len(strs)) + for _, s := range strs { + a = append(a, interface{}(s)) + } + return a +} + // measurementsFromSourceOrDB returns a list of measurement names from the // statement passed in or, if the statement is nil, a list of all // measurement names from the database passed in. diff --git a/util.go b/util.go index 138f7880b5..5e7d337a7b 100644 --- a/util.go +++ b/util.go @@ -69,8 +69,8 @@ func mapKeyList(m interface{}) []string { switch m.(type) { case map[string]string: return mapStrStrKeyList(m.(map[string]string)) - case map[string]int: - return mapStrIntKeyList(m.(map[string]int)) + case map[string]uint32: + return mapStrUint32KeyList(m.(map[string]uint32)) } return nil } @@ -83,7 +83,7 @@ func mapStrStrKeyList(m map[string]string) []string { return l } -func mapStrIntKeyList(m map[string]int) []string { +func mapStrUint32KeyList(m map[string]uint32) []string { l := make([]string, 0, len(m)) for k, _ := range m { l = append(l, k)