diff --git a/database.go b/database.go index 304d661652..6e310cfb15 100644 --- a/database.go +++ b/database.go @@ -1057,3 +1057,171 @@ func (m *Measurement) seriesIDsByFilter(filter *TagFilter) (ids seriesIDs) { return } + +func (a Measurements) Len() int { return len(a) } +func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name } +func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func (a Measurements) intersect(other Measurements) Measurements { + l := a + r := other + + // we want to iterate through the shortest one and stop + if len(other) < len(a) { + l = other + r = a + } + + // they're in sorted order so advance the counter as needed. + // That is, don't run comparisons against lower values that we've already passed + var i, j int + + result := make(Measurements, 0, len(l)) + for i < len(l) && j < len(r) { + if l[i].Name == r[j].Name { + result = append(result, l[i]) + i += 1 + j += 1 + } else if l[i].Name < r[j].Name { + i += 1 + } else { + j += 1 + } + } + + return result +} + +func (a Measurements) union(other Measurements) Measurements { + result := make(Measurements, 0, len(a)+len(other)) + var i, j int + for i < len(a) && j < len(other) { + if a[i].Name == other[j].Name { + result = append(result, a[i]) + i += 1 + j += 1 + } else if a[i].Name < other[j].Name { + result = append(result, a[i]) + i += 1 + } else { + result = append(result, other[j]) + j += 1 + } + } + + // now append the remainder + if i < len(a) { + result = append(result, a[i:]...) + } else if j < len(other) { + result = append(result, other[j:]...) + } + + return result +} + +// measurementsByExpr takes and expression containing only tags and returns +// a list of matching *Measurement. +func (d *database) measurementsByExpr(expr influxql.Expr) (Measurements, error) { + switch e := expr.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok { + return nil, fmt.Errorf("left side of '=' must be a tag name") + } + + value, ok := e.RHS.(*influxql.StringLiteral) + if !ok { + return nil, fmt.Errorf("right side of '=' must be a tag value string") + } + + tf := &TagFilter{ + Not: e.Op == influxql.NEQ, + Key: tag.Val, + Value: value.Val, + } + return d.measurementsByTagFilters([]*TagFilter{tf}), nil + case influxql.OR, influxql.AND: + lhsIDs, err := d.measurementsByExpr(e.LHS) + if err != nil { + return nil, err + } + + rhsIDs, err := d.measurementsByExpr(e.RHS) + if err != nil { + return nil, err + } + + if e.Op == influxql.OR { + return lhsIDs.union(rhsIDs), nil + } else { + return lhsIDs.intersect(rhsIDs), nil + } + default: + return nil, fmt.Errorf("invalid operator") + } + case *influxql.ParenExpr: + return d.measurementsByExpr(e.Expr) + } + return nil, fmt.Errorf("%#v", expr) +} + +func (d *database) measurementsByTagFilters(filters []*TagFilter) Measurements { + // If no filters, then return all measurements. + if len(filters) == 0 { + measurements := make(Measurements, 0, len(d.measurements)) + for _, m := range d.measurements { + measurements = append(measurements, m) + } + return measurements + } + + // Build a list of measurements matching the filters. + var measurements Measurements + var tagMatch bool + for _, m := range d.measurements { + for _, f := range filters { + tagMatch = false + if tagVals, ok := m.seriesByTagKeyValue[f.Key]; ok { + if _, ok := tagVals[f.Value]; ok { + tagMatch = true + } + } + + isEQ := !f.Not + + // tags match | operation is EQ | measurement matches + // -------------------------------------------------- + // True | True | True + // True | False | False + // False | True | False + // False | False | True + + if tagMatch == isEQ { + measurements = append(measurements, m) + break + } + } + } + + return measurements +} + +// Measurements returns a list of all measurements. +func (d *database) Measurements() Measurements { + measurements := make(Measurements, 0, len(d.measurements)) + for _, m := range d.measurements { + measurements = append(measurements, m) + } + return measurements +} + +// tagKeys returns a list of the measurement's tag names. +func (m *Measurement) tagKeys() []string { + keys := make([]string, 0, len(m.seriesByTagKeyValue)) + for k, _ := range m.seriesByTagKeyValue { + keys = append(keys, k) + } + return keys +} diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 83229e0d17..8d8b17cdea 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -805,6 +805,38 @@ func TestHandler_serveShowSeries(t *testing.T) { t.Fatalf("test") } +func TestHandler_serveShowMeasurements(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + srvr.CreateDatabase("foo") + srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [ + {"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server01", "region": "uswest"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server01", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}, + {"name": "gpu", "tags": {"host": "server02", "region": "useast"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}} + ]}`) + + if status != http.StatusOK { + t.Log(body) + t.Fatalf("unexpected status after write: %d", status) + } + + query := map[string]string{"db": "foo", "q": "SHOW MEASUREMENTS LIMIT 2"} + status, body = MustHTTP("GET", s.URL+`/query`, query, nil, "") + + if status != http.StatusOK { + t.Log(body) + t.Fatalf("unexpected status after query: %d", status) + } + + t.Log(body) + t.Fatalf("test") +} + // Utility functions for this test suite. func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) { diff --git a/server.go b/server.go index bf025b6a27..a741d87903 100644 --- a/server.go +++ b/server.go @@ -1587,7 +1587,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re case *influxql.ShowSeriesStatement: res = s.executeShowSeriesStatement(stmt, database, user) case *influxql.ShowMeasurementsStatement: - continue + res = s.executeShowMeasurementsStatement(stmt, database, user) case *influxql.ShowTagKeysStatement: continue case *influxql.ShowTagValuesStatement: @@ -1709,12 +1709,10 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, return &Result{Err: ErrDatabaseNotFound} } - fmt.Printf("# series = %d\n", len(db.series)) - // Make a list of measurements we're interested in. var measurements []string if stmt.Source != nil { - // TODO: (david) handle multiple measurement sources + // TODO: handle multiple measurement sources if m, ok := stmt.Source.(*influxql.Measurement); ok { measurements = append(measurements, m.Name) } else { @@ -1736,6 +1734,8 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, Rows: make(influxql.Rows, 0, len(ids)), } + // TODO: support OFFSET & LIMIT + // Add one result row for each series. for _, id := range ids { if series := db.Series(id); series != nil { @@ -1754,6 +1754,64 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, return result } +func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string, user *User) *Result { + // Find the database. + db := s.database(database) + if db == nil { + return &Result{Err: ErrDatabaseNotFound} + } + + // Get all measurements in sorted order. + measurements := db.Measurements() + sort.Sort(measurements) + + // If a WHERE clause was specified, filter the measurements. + if stmt.Condition != nil { + var err error + measurements, err = db.measurementsByExpr(stmt.Condition) + if err != nil { + return &Result{Err: err} + } + } + + offset := stmt.Offset + limit := stmt.Limit + + // If OFFSET is past the end of the array, return empty results. + if offset > len(measurements)-1 { + return &Result{} + } + + // Calculate last index based on LIMIT. + end := len(measurements) + if limit > 0 && offset+limit < end { + limit = offset + limit + } else { + limit = end + } + + // Make result with presized list Rows. + result := &Result{ + Rows: make(influxql.Rows, 0, len(measurements)), + } + + fmt.Printf("o = %d, l = %d\n", offset, limit) + + // Add one result row for each measurement. + for i := offset; i < limit; i++ { + m := measurements[i] + r := &influxql.Row{ + Name: m.Name, + Columns: m.tagKeys(), + } + sort.Strings(r.Columns) + + result.Rows = append(result.Rows, r) + } + + return result +} + func (s *Server) executeShowUsersStatement(q *influxql.ShowUsersStatement, user *User) *Result { row := &influxql.Row{Columns: []string{"user", "admin"}} for _, user := range s.Users() {