implement SHOW MEASUREMENTS

pull/1398/head
David Norton 2015-01-28 00:51:09 -05:00
parent b50e4cc514
commit 16eaae5fbd
3 changed files with 262 additions and 4 deletions

View File

@ -1057,3 +1057,171 @@ func (m *Measurement) seriesIDsByFilter(filter *TagFilter) (ids seriesIDs) {
return
}
func (a Measurements) Len() int { return len(a) }
func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a Measurements) intersect(other Measurements) Measurements {
l := a
r := other
// we want to iterate through the shortest one and stop
if len(other) < len(a) {
l = other
r = a
}
// they're in sorted order so advance the counter as needed.
// That is, don't run comparisons against lower values that we've already passed
var i, j int
result := make(Measurements, 0, len(l))
for i < len(l) && j < len(r) {
if l[i].Name == r[j].Name {
result = append(result, l[i])
i += 1
j += 1
} else if l[i].Name < r[j].Name {
i += 1
} else {
j += 1
}
}
return result
}
func (a Measurements) union(other Measurements) Measurements {
result := make(Measurements, 0, len(a)+len(other))
var i, j int
for i < len(a) && j < len(other) {
if a[i].Name == other[j].Name {
result = append(result, a[i])
i += 1
j += 1
} else if a[i].Name < other[j].Name {
result = append(result, a[i])
i += 1
} else {
result = append(result, other[j])
j += 1
}
}
// now append the remainder
if i < len(a) {
result = append(result, a[i:]...)
} else if j < len(other) {
result = append(result, other[j:]...)
}
return result
}
// measurementsByExpr takes and expression containing only tags and returns
// a list of matching *Measurement.
func (d *database) measurementsByExpr(expr influxql.Expr) (Measurements, error) {
switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok {
return nil, fmt.Errorf("left side of '=' must be a tag name")
}
value, ok := e.RHS.(*influxql.StringLiteral)
if !ok {
return nil, fmt.Errorf("right side of '=' must be a tag value string")
}
tf := &TagFilter{
Not: e.Op == influxql.NEQ,
Key: tag.Val,
Value: value.Val,
}
return d.measurementsByTagFilters([]*TagFilter{tf}), nil
case influxql.OR, influxql.AND:
lhsIDs, err := d.measurementsByExpr(e.LHS)
if err != nil {
return nil, err
}
rhsIDs, err := d.measurementsByExpr(e.RHS)
if err != nil {
return nil, err
}
if e.Op == influxql.OR {
return lhsIDs.union(rhsIDs), nil
} else {
return lhsIDs.intersect(rhsIDs), nil
}
default:
return nil, fmt.Errorf("invalid operator")
}
case *influxql.ParenExpr:
return d.measurementsByExpr(e.Expr)
}
return nil, fmt.Errorf("%#v", expr)
}
func (d *database) measurementsByTagFilters(filters []*TagFilter) Measurements {
// If no filters, then return all measurements.
if len(filters) == 0 {
measurements := make(Measurements, 0, len(d.measurements))
for _, m := range d.measurements {
measurements = append(measurements, m)
}
return measurements
}
// Build a list of measurements matching the filters.
var measurements Measurements
var tagMatch bool
for _, m := range d.measurements {
for _, f := range filters {
tagMatch = false
if tagVals, ok := m.seriesByTagKeyValue[f.Key]; ok {
if _, ok := tagVals[f.Value]; ok {
tagMatch = true
}
}
isEQ := !f.Not
// tags match | operation is EQ | measurement matches
// --------------------------------------------------
// True | True | True
// True | False | False
// False | True | False
// False | False | True
if tagMatch == isEQ {
measurements = append(measurements, m)
break
}
}
}
return measurements
}
// Measurements returns a list of all measurements.
func (d *database) Measurements() Measurements {
measurements := make(Measurements, 0, len(d.measurements))
for _, m := range d.measurements {
measurements = append(measurements, m)
}
return measurements
}
// tagKeys returns a list of the measurement's tag names.
func (m *Measurement) tagKeys() []string {
keys := make([]string, 0, len(m.seriesByTagKeyValue))
for k, _ := range m.seriesByTagKeyValue {
keys = append(keys, k)
}
return keys
}

View File

@ -805,6 +805,38 @@ func TestHandler_serveShowSeries(t *testing.T) {
t.Fatalf("test")
}
func TestHandler_serveShowMeasurements(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [
{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "cpu", "tags": {"host": "server01", "region": "uswest"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "cpu", "tags": {"host": "server01", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "cpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}},
{"name": "gpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}
]}`)
if status != http.StatusOK {
t.Log(body)
t.Fatalf("unexpected status after write: %d", status)
}
query := map[string]string{"db": "foo", "q": "SHOW MEASUREMENTS LIMIT 2"}
status, body = MustHTTP("GET", s.URL+`/query`, query, nil, "")
if status != http.StatusOK {
t.Log(body)
t.Fatalf("unexpected status after query: %d", status)
}
t.Log(body)
t.Fatalf("test")
}
// Utility functions for this test suite.
func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) {

View File

@ -1587,7 +1587,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re
case *influxql.ShowSeriesStatement:
res = s.executeShowSeriesStatement(stmt, database, user)
case *influxql.ShowMeasurementsStatement:
continue
res = s.executeShowMeasurementsStatement(stmt, database, user)
case *influxql.ShowTagKeysStatement:
continue
case *influxql.ShowTagValuesStatement:
@ -1709,12 +1709,10 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
return &Result{Err: ErrDatabaseNotFound}
}
fmt.Printf("# series = %d\n", len(db.series))
// Make a list of measurements we're interested in.
var measurements []string
if stmt.Source != nil {
// TODO: (david) handle multiple measurement sources
// TODO: handle multiple measurement sources
if m, ok := stmt.Source.(*influxql.Measurement); ok {
measurements = append(measurements, m.Name)
} else {
@ -1736,6 +1734,8 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
Rows: make(influxql.Rows, 0, len(ids)),
}
// TODO: support OFFSET & LIMIT
// Add one result row for each series.
for _, id := range ids {
if series := db.Series(id); series != nil {
@ -1754,6 +1754,64 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
return result
}
func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string, user *User) *Result {
// Find the database.
db := s.database(database)
if db == nil {
return &Result{Err: ErrDatabaseNotFound}
}
// Get all measurements in sorted order.
measurements := db.Measurements()
sort.Sort(measurements)
// If a WHERE clause was specified, filter the measurements.
if stmt.Condition != nil {
var err error
measurements, err = db.measurementsByExpr(stmt.Condition)
if err != nil {
return &Result{Err: err}
}
}
offset := stmt.Offset
limit := stmt.Limit
// If OFFSET is past the end of the array, return empty results.
if offset > len(measurements)-1 {
return &Result{}
}
// Calculate last index based on LIMIT.
end := len(measurements)
if limit > 0 && offset+limit < end {
limit = offset + limit
} else {
limit = end
}
// Make result with presized list Rows.
result := &Result{
Rows: make(influxql.Rows, 0, len(measurements)),
}
fmt.Printf("o = %d, l = %d\n", offset, limit)
// Add one result row for each measurement.
for i := offset; i < limit; i++ {
m := measurements[i]
r := &influxql.Row{
Name: m.Name,
Columns: m.tagKeys(),
}
sort.Strings(r.Columns)
result.Rows = append(result.Rows, r)
}
return result
}
func (s *Server) executeShowUsersStatement(q *influxql.ShowUsersStatement, user *User) *Result {
row := &influxql.Row{Columns: []string{"user", "admin"}}
for _, user := range s.Users() {