issue #1411: wire up show tag keys
parent
094e9bd008
commit
ae33bc99db
|
@ -959,14 +959,7 @@ func TestHandler_serveShowSeries(t *testing.T) {
|
|||
if !reflect.DeepEqual(tt.err, errstring(r.Err)) {
|
||||
t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err)
|
||||
} else if tt.err == "" && !reflect.DeepEqual(tt.r, r) {
|
||||
|
||||
t.Log(body)
|
||||
t.Log("")
|
||||
b, _ := json.Marshal(tt.r)
|
||||
t.Log(string(b))
|
||||
if body == string(b) {
|
||||
t.Log("****** strings are equal")
|
||||
}
|
||||
t.Errorf("%d. %s: result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.q, tt.r, r)
|
||||
}
|
||||
}
|
||||
|
@ -1036,6 +1029,95 @@ func TestHandler_serveShowMeasurements(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHandler_serveShowTagKeys(t *testing.T) {
|
||||
srvr := OpenServer(NewMessagingClient())
|
||||
srvr.CreateDatabase("foo")
|
||||
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
|
||||
srvr.SetDefaultRetentionPolicy("foo", "bar")
|
||||
s := NewHTTPServer(srvr)
|
||||
defer s.Close()
|
||||
|
||||
status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [
|
||||
{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
|
||||
{"name": "cpu", "tags": {"host": "server01", "region": "uswest"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
|
||||
{"name": "cpu", "tags": {"host": "server01", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
|
||||
{"name": "cpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
|
||||
{"name": "gpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}
|
||||
]}`)
|
||||
|
||||
if status != http.StatusOK {
|
||||
t.Log(body)
|
||||
t.Fatalf("unexpected status after write: %d", status)
|
||||
}
|
||||
|
||||
var tests = []struct {
|
||||
q string
|
||||
r *influxdb.Results
|
||||
err string
|
||||
}{
|
||||
// SHOW TAG KEYS
|
||||
{
|
||||
q: `SHOW TAG KEYS`,
|
||||
r: &influxdb.Results{
|
||||
Results: []*influxdb.Result{
|
||||
&influxdb.Result{
|
||||
Rows: []*influxql.Row{
|
||||
&influxql.Row{
|
||||
Name: "cpu",
|
||||
Columns: []string{"host", "region"},
|
||||
},
|
||||
&influxql.Row{
|
||||
Name: "gpu",
|
||||
Columns: []string{"host", "region"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// SHOW TAG KEYS FROM...
|
||||
{
|
||||
q: `SHOW TAG KEYS FROM cpu`,
|
||||
r: &influxdb.Results{
|
||||
Results: []*influxdb.Result{
|
||||
&influxdb.Result{
|
||||
Rows: []*influxql.Row{
|
||||
&influxql.Row{
|
||||
Name: "cpu",
|
||||
Columns: []string{"host", "region"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
query := map[string]string{"db": "foo", "q": tt.q}
|
||||
status, body = MustHTTP("GET", s.URL+`/query`, query, nil, "")
|
||||
|
||||
if status != http.StatusOK {
|
||||
t.Logf("query #%d: %s", i, tt.q)
|
||||
t.Log(body)
|
||||
t.Errorf("unexpected status: %d", status)
|
||||
}
|
||||
|
||||
r := &influxdb.Results{}
|
||||
if err := json.Unmarshal([]byte(body), r); err != nil {
|
||||
t.Logf("query #%d: %s", i, tt.q)
|
||||
t.Log(body)
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(tt.err, errstring(r.Err)) {
|
||||
t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err)
|
||||
} else if tt.err == "" && !reflect.DeepEqual(tt.r, r) {
|
||||
t.Log(body)
|
||||
t.Errorf("%d. %s: result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.q, tt.r, r)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Utility functions for this test suite.
|
||||
|
||||
func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) {
|
||||
|
|
|
@ -1648,6 +1648,16 @@ func Walk(v Visitor, node Node) {
|
|||
Walk(v, n.Source)
|
||||
Walk(v, n.Condition)
|
||||
|
||||
case *ShowTagKeysStatement:
|
||||
Walk(v, n.Source)
|
||||
Walk(v, n.Condition)
|
||||
Walk(v, n.SortFields)
|
||||
|
||||
case *ShowTagValuesStatement:
|
||||
Walk(v, n.Source)
|
||||
Walk(v, n.Condition)
|
||||
Walk(v, n.SortFields)
|
||||
|
||||
case Fields:
|
||||
for _, c := range n {
|
||||
Walk(v, c)
|
||||
|
|
|
@ -679,12 +679,13 @@ func (p *Parser) parseShowTagKeysStatement() (*ShowTagKeysStatement, error) {
|
|||
stmt := &ShowTagKeysStatement{}
|
||||
var err error
|
||||
|
||||
// Parse source.
|
||||
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
|
||||
}
|
||||
if stmt.Source, err = p.parseSource(); err != nil {
|
||||
return nil, err
|
||||
// Parse optional source.
|
||||
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
|
||||
if stmt.Source, err = p.parseSource(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
p.unscan()
|
||||
}
|
||||
|
||||
// Parse condition: "WHERE EXPR".
|
||||
|
|
|
@ -204,6 +204,14 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
},
|
||||
},
|
||||
|
||||
// SHOW TAG KEYS
|
||||
{
|
||||
s: `SHOW TAG KEYS FROM src`,
|
||||
stmt: &influxql.ShowTagKeysStatement{
|
||||
Source: &influxql.Measurement{Name: "src"},
|
||||
},
|
||||
},
|
||||
|
||||
// SHOW TAG KEYS
|
||||
{
|
||||
s: `SHOW TAG KEYS FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
|
||||
|
|
102
server.go
102
server.go
|
@ -1640,7 +1640,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re
|
|||
case *influxql.ShowMeasurementsStatement:
|
||||
res = s.executeShowMeasurementsStatement(stmt, database, user)
|
||||
case *influxql.ShowTagKeysStatement:
|
||||
continue
|
||||
res = s.executeShowTagKeysStatement(stmt, database, user)
|
||||
case *influxql.ShowTagValuesStatement:
|
||||
continue
|
||||
case *influxql.ShowFieldKeysStatement:
|
||||
|
@ -1763,28 +1763,16 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
|
|||
return &Result{Err: ErrDatabaseNotFound}
|
||||
}
|
||||
|
||||
// Make a list of measurements we're interested in.
|
||||
var measurements []string
|
||||
if stmt.Source != nil {
|
||||
// TODO: handle multiple measurement sources
|
||||
if m, ok := stmt.Source.(*influxql.Measurement); ok {
|
||||
segments, err := influxql.SplitIdent(m.Name)
|
||||
if err != nil {
|
||||
return &Result{Err: err}
|
||||
}
|
||||
measurements = append(measurements, segments[2])
|
||||
} else {
|
||||
return &Result{Err: ErrMeasurementNotFound}
|
||||
}
|
||||
} else {
|
||||
// No measurements specified in FROM clause so get all measurements.
|
||||
measurements = db.MeasurementNames()
|
||||
// Get the list of measurements we're interested in.
|
||||
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
|
||||
if err != nil {
|
||||
return &Result{Err: err}
|
||||
}
|
||||
|
||||
// If OFFSET is past the end of the array, return empty results.
|
||||
if stmt.Offset > len(measurements)-1 {
|
||||
return &Result{}
|
||||
}
|
||||
// // If OFFSET is past the end of the array, return empty results.
|
||||
// if stmt.Offset > len(measurements)-1 {
|
||||
// return &Result{}
|
||||
// }
|
||||
|
||||
// Sort measurement names so results are always in the same order.
|
||||
sort.Strings(measurements)
|
||||
|
@ -1847,10 +1835,10 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
|
|||
}
|
||||
|
||||
func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string, user *User) *Result {
|
||||
// Find the database.
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
// Find the database.
|
||||
db := s.databases[database]
|
||||
if db == nil {
|
||||
return &Result{Err: ErrDatabaseNotFound}
|
||||
|
@ -1897,7 +1885,6 @@ func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurement
|
|||
Name: m.Name,
|
||||
Columns: m.tagKeys(),
|
||||
}
|
||||
sort.Strings(r.Columns)
|
||||
|
||||
result.Rows = append(result.Rows, r)
|
||||
}
|
||||
|
@ -1905,6 +1892,75 @@ func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurement
|
|||
return result
|
||||
}
|
||||
|
||||
func (s *Server) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement, database string, user *User) *Result {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
// Find the database.
|
||||
db := s.databases[database]
|
||||
if db == nil {
|
||||
return &Result{Err: ErrDatabaseNotFound}
|
||||
}
|
||||
|
||||
// Get the list of measurements we're interested in.
|
||||
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
|
||||
if err != nil {
|
||||
return &Result{Err: err}
|
||||
}
|
||||
|
||||
// Make result.
|
||||
result := &Result{
|
||||
Rows: make(influxql.Rows, 0, len(measurements)),
|
||||
}
|
||||
|
||||
// Add one row per measurement to the result.
|
||||
for _, name := range measurements {
|
||||
// Look up the measurement by name.
|
||||
m, ok := db.measurements[name]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: filter tag keys by stmt.Condition
|
||||
|
||||
r := &influxql.Row{
|
||||
Name: m.Name,
|
||||
Columns: m.tagKeys(),
|
||||
}
|
||||
|
||||
result.Rows = append(result.Rows, r)
|
||||
}
|
||||
|
||||
// TODO: LIMIT & OFFSET
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// measurementsFromSourceOrDB returns a list of measurement names from the
|
||||
// statement passed in or, if the statement is nil, a list of all
|
||||
// measurement names from the database passed in.
|
||||
func measurementsFromSourceOrDB(stmt influxql.Source, db *database) ([]string, error) {
|
||||
var measurements []string
|
||||
if stmt != nil {
|
||||
// TODO: handle multiple measurement sources
|
||||
if m, ok := stmt.(*influxql.Measurement); ok {
|
||||
segments, err := influxql.SplitIdent(m.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
measurements = append(measurements, segments[2])
|
||||
} else {
|
||||
return nil, errors.New("identifiers in FROM clause must be measurement names")
|
||||
}
|
||||
} else {
|
||||
// No measurements specified in FROM clause so get all measurements.
|
||||
measurements = db.MeasurementNames()
|
||||
}
|
||||
|
||||
return measurements, nil
|
||||
}
|
||||
|
||||
func (s *Server) executeShowUsersStatement(q *influxql.ShowUsersStatement, user *User) *Result {
|
||||
row := &influxql.Row{Columns: []string{"user", "admin"}}
|
||||
for _, user := range s.Users() {
|
||||
|
|
Loading…
Reference in New Issue