From ae33bc99dbdfe99fc1a939af6d3f1aa44ea0f357 Mon Sep 17 00:00:00 2001 From: David Norton Date: Wed, 28 Jan 2015 20:26:15 -0500 Subject: [PATCH] issue #1411: wire up show tag keys --- httpd/handler_test.go | 96 ++++++++++++++++++++++++++++++++++--- influxql/ast.go | 10 ++++ influxql/parser.go | 13 ++--- influxql/parser_test.go | 8 ++++ server.go | 102 +++++++++++++++++++++++++++++++--------- 5 files changed, 193 insertions(+), 36 deletions(-) diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 95440cc901..6255bfdf1b 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -959,14 +959,7 @@ func TestHandler_serveShowSeries(t *testing.T) { if !reflect.DeepEqual(tt.err, errstring(r.Err)) { t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err) } else if tt.err == "" && !reflect.DeepEqual(tt.r, r) { - t.Log(body) - t.Log("") - b, _ := json.Marshal(tt.r) - t.Log(string(b)) - if body == string(b) { - t.Log("****** strings are equal") - } t.Errorf("%d. %s: result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.q, tt.r, r) } } @@ -1036,6 +1029,95 @@ func TestHandler_serveShowMeasurements(t *testing.T) { } } +func TestHandler_serveShowTagKeys(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + srvr.CreateDatabase("foo") + srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) + srvr.SetDefaultRetentionPolicy("foo", "bar") + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [ + {"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server01", "region": "uswest"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server01", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "gpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}} + ]}`) + + if status != http.StatusOK { + t.Log(body) + t.Fatalf("unexpected status after write: %d", status) + } + + var tests = []struct { + q string + r *influxdb.Results + err string + }{ + // SHOW TAG KEYS + { + q: `SHOW TAG KEYS`, + r: &influxdb.Results{ + Results: []*influxdb.Result{ + &influxdb.Result{ + Rows: []*influxql.Row{ + &influxql.Row{ + Name: "cpu", + Columns: []string{"host", "region"}, + }, + &influxql.Row{ + Name: "gpu", + Columns: []string{"host", "region"}, + }, + }, + }, + }, + }, + }, + // SHOW TAG KEYS FROM... + { + q: `SHOW TAG KEYS FROM cpu`, + r: &influxdb.Results{ + Results: []*influxdb.Result{ + &influxdb.Result{ + Rows: []*influxql.Row{ + &influxql.Row{ + Name: "cpu", + Columns: []string{"host", "region"}, + }, + }, + }, + }, + }, + }, + } + for i, tt := range tests { + query := map[string]string{"db": "foo", "q": tt.q} + status, body = MustHTTP("GET", s.URL+`/query`, query, nil, "") + + if status != http.StatusOK { + t.Logf("query #%d: %s", i, tt.q) + t.Log(body) + t.Errorf("unexpected status: %d", status) + } + + r := &influxdb.Results{} + if err := json.Unmarshal([]byte(body), r); err != nil { + t.Logf("query #%d: %s", i, tt.q) + t.Log(body) + t.Error(err) + } + + if !reflect.DeepEqual(tt.err, errstring(r.Err)) { + t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err) + } else if tt.err == "" && !reflect.DeepEqual(tt.r, r) { + t.Log(body) + t.Errorf("%d. %s: result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.q, tt.r, r) + } + } +} + // Utility functions for this test suite. func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) { diff --git a/influxql/ast.go b/influxql/ast.go index d518f2e0fd..af6d7e46ed 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1648,6 +1648,16 @@ func Walk(v Visitor, node Node) { Walk(v, n.Source) Walk(v, n.Condition) + case *ShowTagKeysStatement: + Walk(v, n.Source) + Walk(v, n.Condition) + Walk(v, n.SortFields) + + case *ShowTagValuesStatement: + Walk(v, n.Source) + Walk(v, n.Condition) + Walk(v, n.SortFields) + case Fields: for _, c := range n { Walk(v, c) diff --git a/influxql/parser.go b/influxql/parser.go index 06501d70fb..e1e3bd25ae 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -679,12 +679,13 @@ func (p *Parser) parseShowTagKeysStatement() (*ShowTagKeysStatement, error) { stmt := &ShowTagKeysStatement{} var err error - // Parse source. - if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM { - return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos) - } - if stmt.Source, err = p.parseSource(); err != nil { - return nil, err + // Parse optional source. + if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM { + if stmt.Source, err = p.parseSource(); err != nil { + return nil, err + } + } else { + p.unscan() } // Parse condition: "WHERE EXPR". diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 25a8237e4e..4b6df322e4 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -204,6 +204,14 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // SHOW TAG KEYS + { + s: `SHOW TAG KEYS FROM src`, + stmt: &influxql.ShowTagKeysStatement{ + Source: &influxql.Measurement{Name: "src"}, + }, + }, + // SHOW TAG KEYS { s: `SHOW TAG KEYS FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`, diff --git a/server.go b/server.go index 3ee6791ebe..bcbd5876bd 100644 --- a/server.go +++ b/server.go @@ -1640,7 +1640,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re case *influxql.ShowMeasurementsStatement: res = s.executeShowMeasurementsStatement(stmt, database, user) case *influxql.ShowTagKeysStatement: - continue + res = s.executeShowTagKeysStatement(stmt, database, user) case *influxql.ShowTagValuesStatement: continue case *influxql.ShowFieldKeysStatement: @@ -1763,28 +1763,16 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, return &Result{Err: ErrDatabaseNotFound} } - // Make a list of measurements we're interested in. - var measurements []string - if stmt.Source != nil { - // TODO: handle multiple measurement sources - if m, ok := stmt.Source.(*influxql.Measurement); ok { - segments, err := influxql.SplitIdent(m.Name) - if err != nil { - return &Result{Err: err} - } - measurements = append(measurements, segments[2]) - } else { - return &Result{Err: ErrMeasurementNotFound} - } - } else { - // No measurements specified in FROM clause so get all measurements. - measurements = db.MeasurementNames() + // Get the list of measurements we're interested in. + measurements, err := measurementsFromSourceOrDB(stmt.Source, db) + if err != nil { + return &Result{Err: err} } - // If OFFSET is past the end of the array, return empty results. - if stmt.Offset > len(measurements)-1 { - return &Result{} - } + // // If OFFSET is past the end of the array, return empty results. + // if stmt.Offset > len(measurements)-1 { + // return &Result{} + // } // Sort measurement names so results are always in the same order. sort.Strings(measurements) @@ -1847,10 +1835,10 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, } func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string, user *User) *Result { - // Find the database. s.mu.RLock() defer s.mu.RUnlock() + // Find the database. db := s.databases[database] if db == nil { return &Result{Err: ErrDatabaseNotFound} @@ -1897,7 +1885,6 @@ func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurement Name: m.Name, Columns: m.tagKeys(), } - sort.Strings(r.Columns) result.Rows = append(result.Rows, r) } @@ -1905,6 +1892,75 @@ func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurement return result } +func (s *Server) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement, database string, user *User) *Result { + s.mu.RLock() + defer s.mu.RUnlock() + + // Find the database. + db := s.databases[database] + if db == nil { + return &Result{Err: ErrDatabaseNotFound} + } + + // Get the list of measurements we're interested in. + measurements, err := measurementsFromSourceOrDB(stmt.Source, db) + if err != nil { + return &Result{Err: err} + } + + // Make result. + result := &Result{ + Rows: make(influxql.Rows, 0, len(measurements)), + } + + // Add one row per measurement to the result. + for _, name := range measurements { + // Look up the measurement by name. + m, ok := db.measurements[name] + if !ok { + continue + } + + // TODO: filter tag keys by stmt.Condition + + r := &influxql.Row{ + Name: m.Name, + Columns: m.tagKeys(), + } + + result.Rows = append(result.Rows, r) + } + + // TODO: LIMIT & OFFSET + + return result +} + +// measurementsFromSourceOrDB returns a list of measurement names from the +// statement passed in or, if the statement is nil, a list of all +// measurement names from the database passed in. +func measurementsFromSourceOrDB(stmt influxql.Source, db *database) ([]string, error) { + var measurements []string + if stmt != nil { + // TODO: handle multiple measurement sources + if m, ok := stmt.(*influxql.Measurement); ok { + segments, err := influxql.SplitIdent(m.Name) + if err != nil { + return nil, err + } + + measurements = append(measurements, segments[2]) + } else { + return nil, errors.New("identifiers in FROM clause must be measurement names") + } + } else { + // No measurements specified in FROM clause so get all measurements. + measurements = db.MeasurementNames() + } + + return measurements, nil +} + func (s *Server) executeShowUsersStatement(q *influxql.ShowUsersStatement, user *User) *Result { row := &influxql.Row{Columns: []string{"user", "admin"}} for _, user := range s.Users() {