fix SHOW SERIES
parent
3ad3abaf64
commit
204aad21d3
|
@ -9,11 +9,13 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdb/influxdb"
|
"github.com/influxdb/influxdb"
|
||||||
"github.com/influxdb/influxdb/httpd"
|
"github.com/influxdb/influxdb/httpd"
|
||||||
|
"github.com/influxdb/influxdb/influxql"
|
||||||
"github.com/influxdb/influxdb/messaging"
|
"github.com/influxdb/influxdb/messaging"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -778,6 +780,7 @@ func TestHandler_serveShowSeries(t *testing.T) {
|
||||||
srvr := OpenServer(NewMessagingClient())
|
srvr := OpenServer(NewMessagingClient())
|
||||||
srvr.CreateDatabase("foo")
|
srvr.CreateDatabase("foo")
|
||||||
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
|
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
|
||||||
|
srvr.SetDefaultRetentionPolicy("foo", "bar")
|
||||||
s := NewHTTPServer(srvr)
|
s := NewHTTPServer(srvr)
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
|
||||||
|
@ -796,17 +799,120 @@ func TestHandler_serveShowSeries(t *testing.T) {
|
||||||
|
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
q string
|
q string
|
||||||
r string
|
r *influxdb.Results
|
||||||
err string
|
err string
|
||||||
}{
|
}{
|
||||||
// SHOW SERIES
|
// SHOW SERIES
|
||||||
{
|
{
|
||||||
q: `SHOW SERIES`,
|
q: `SHOW SERIES`,
|
||||||
r: `{"results":[{"rows":[{"name":"cpu","columns":["host","region"],"values":[[["server01",""]],[["server01","uswest"]],[["server01","useast"]],[["server02","useast"]]]},{"name":"gpu","columns":["host","region"],"values":[[["server02","useast"]]]}]}]}`,
|
r: &influxdb.Results{
|
||||||
|
Results: []*influxdb.Result{
|
||||||
|
&influxdb.Result{
|
||||||
|
Rows: []*influxql.Row{
|
||||||
|
&influxql.Row{
|
||||||
|
Name: "cpu",
|
||||||
|
Columns: []string{"host", "region"},
|
||||||
|
Values: [][]interface{}{
|
||||||
|
str2iface([]string{"server01", ""}),
|
||||||
|
str2iface([]string{"server01", "uswest"}),
|
||||||
|
str2iface([]string{"server01", "useast"}),
|
||||||
|
str2iface([]string{"server02", "useast"}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&influxql.Row{
|
||||||
|
Name: "gpu",
|
||||||
|
Columns: []string{"host", "region"},
|
||||||
|
Values: [][]interface{}{
|
||||||
|
str2iface([]string{"server02", "useast"}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
|
// SHOW SERIES ... LIMIT
|
||||||
|
// {
|
||||||
|
// q: `SHOW SERIES LIMIT 1`,
|
||||||
|
// r: &influxdb.Results{
|
||||||
|
// Results: []*influxdb.Result{
|
||||||
|
// &influxdb.Result{
|
||||||
|
// Rows: []*influxql.Row{
|
||||||
|
// &influxql.Row{
|
||||||
|
// Name: "cpu",
|
||||||
|
// Columns: []string{"host", "region"},
|
||||||
|
// Values: [][]interface{}{
|
||||||
|
// str2iface([]string{"server01", ""}),
|
||||||
|
// str2iface([]string{"server01", "uswest"}),
|
||||||
|
// str2iface([]string{"server01", "useast"}),
|
||||||
|
// str2iface([]string{"server02", "useast"}),
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
// SHOW SERIES FROM
|
||||||
{
|
{
|
||||||
q: `SHOW SERIES from cpu where region = 'uswest'`,
|
q: `SHOW SERIES FROM cpu`,
|
||||||
r: `{"results":[{"rows":[{"name":"cpu","columns":["host","region"],"values":[[["server01","uswest"]]]}]}]}`,
|
r: &influxdb.Results{
|
||||||
|
Results: []*influxdb.Result{
|
||||||
|
&influxdb.Result{
|
||||||
|
Rows: []*influxql.Row{
|
||||||
|
&influxql.Row{
|
||||||
|
Name: "cpu",
|
||||||
|
Columns: []string{"host", "region"},
|
||||||
|
Values: [][]interface{}{
|
||||||
|
str2iface([]string{"server01", ""}),
|
||||||
|
str2iface([]string{"server01", "uswest"}),
|
||||||
|
str2iface([]string{"server01", "useast"}),
|
||||||
|
str2iface([]string{"server02", "useast"}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// SHOW SERIES WHERE
|
||||||
|
{
|
||||||
|
q: `SHOW SERIES WHERE region = 'uswest'`,
|
||||||
|
r: &influxdb.Results{
|
||||||
|
Results: []*influxdb.Result{
|
||||||
|
&influxdb.Result{
|
||||||
|
Rows: []*influxql.Row{
|
||||||
|
&influxql.Row{
|
||||||
|
Name: "cpu",
|
||||||
|
Columns: []string{"host", "region"},
|
||||||
|
Values: [][]interface{}{
|
||||||
|
str2iface([]string{"server01", "uswest"}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// SHOW SERIES FROM ... WHERE
|
||||||
|
{
|
||||||
|
q: `SHOW SERIES FROM cpu WHERE region = 'useast'`,
|
||||||
|
r: &influxdb.Results{
|
||||||
|
Results: []*influxdb.Result{
|
||||||
|
&influxdb.Result{
|
||||||
|
Rows: []*influxql.Row{
|
||||||
|
&influxql.Row{
|
||||||
|
Name: "cpu",
|
||||||
|
Columns: []string{"host", "region"},
|
||||||
|
Values: [][]interface{}{
|
||||||
|
str2iface([]string{"server01", "useast"}),
|
||||||
|
str2iface([]string{"server02", "useast"}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -824,18 +930,28 @@ func TestHandler_serveShowSeries(t *testing.T) {
|
||||||
if err := json.Unmarshal([]byte(body), r); err != nil {
|
if err := json.Unmarshal([]byte(body), r); err != nil {
|
||||||
t.Logf("query #%d: %s", i, tt.q)
|
t.Logf("query #%d: %s", i, tt.q)
|
||||||
t.Log(body)
|
t.Log(body)
|
||||||
t.Error("error marshaling result: ", err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if body != tt.r {
|
if !reflect.DeepEqual(tt.err, errstring(r.Err)) {
|
||||||
t.Errorf("result mismatch\n exp: %s\n got: %s\n", tt.r, body)
|
t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err)
|
||||||
|
} else if tt.err == "" && !reflect.DeepEqual(tt.r, r) {
|
||||||
|
|
||||||
|
t.Log(body)
|
||||||
|
t.Log("")
|
||||||
|
b, _ := json.Marshal(tt.r)
|
||||||
|
t.Log(string(b))
|
||||||
|
if body == string(b) {
|
||||||
|
t.Log("****** strings are equal")
|
||||||
|
}
|
||||||
|
t.Errorf("%d. %s: result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.q, tt.r, r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// str2iface converts an array of strings to an array of interfaces.
|
// str2iface converts an array of strings to an array of interfaces.
|
||||||
func str2iface(strs []string) []interface{} {
|
func str2iface(strs []string) []interface{} {
|
||||||
a := make([]interface{}, len(strs))
|
a := make([]interface{}, 0, len(strs))
|
||||||
for _, s := range strs {
|
for _, s := range strs {
|
||||||
a = append(a, interface{}(s))
|
a = append(a, interface{}(s))
|
||||||
}
|
}
|
||||||
|
|
23
server.go
23
server.go
|
@ -1754,9 +1754,10 @@ func (s *Server) executeDropUserStatement(q *influxql.DropUserStatement, user *U
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string, user *User) *Result {
|
func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string, user *User) *Result {
|
||||||
// Find the database.
|
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
// Find the database.
|
||||||
db := s.databases[database]
|
db := s.databases[database]
|
||||||
if db == nil {
|
if db == nil {
|
||||||
return &Result{Err: ErrDatabaseNotFound}
|
return &Result{Err: ErrDatabaseNotFound}
|
||||||
|
@ -1767,7 +1768,11 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
|
||||||
if stmt.Source != nil {
|
if stmt.Source != nil {
|
||||||
// TODO: handle multiple measurement sources
|
// TODO: handle multiple measurement sources
|
||||||
if m, ok := stmt.Source.(*influxql.Measurement); ok {
|
if m, ok := stmt.Source.(*influxql.Measurement); ok {
|
||||||
measurements = append(measurements, m.Name)
|
segments, err := influxql.SplitIdent(m.Name)
|
||||||
|
if err != nil {
|
||||||
|
return &Result{Err: err}
|
||||||
|
}
|
||||||
|
measurements = append(measurements, segments[2])
|
||||||
} else {
|
} else {
|
||||||
return &Result{Err: ErrMeasurementNotFound}
|
return &Result{Err: ErrMeasurementNotFound}
|
||||||
}
|
}
|
||||||
|
@ -1776,6 +1781,11 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
|
||||||
measurements = db.MeasurementNames()
|
measurements = db.MeasurementNames()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If OFFSET is past the end of the array, return empty results.
|
||||||
|
if stmt.Offset > len(measurements)-1 {
|
||||||
|
return &Result{}
|
||||||
|
}
|
||||||
|
|
||||||
// Sort measurement names so results are always in the same order.
|
// Sort measurement names so results are always in the same order.
|
||||||
sort.Strings(measurements)
|
sort.Strings(measurements)
|
||||||
|
|
||||||
|
@ -1799,6 +1809,11 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
|
||||||
filters := map[uint32]influxql.Expr{}
|
filters := map[uint32]influxql.Expr{}
|
||||||
ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters)
|
ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters)
|
||||||
|
|
||||||
|
// If no series matched, then go to the next measurement.
|
||||||
|
if len(ids) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: check return of walkWhereForSeriesIds for fields
|
// TODO: check return of walkWhereForSeriesIds for fields
|
||||||
} else {
|
} else {
|
||||||
// No WHERE clause so get all series IDs for this measurement.
|
// No WHERE clause so get all series IDs for this measurement.
|
||||||
|
@ -1814,13 +1829,13 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
|
||||||
// Loop through series IDs getting matching tag sets.
|
// Loop through series IDs getting matching tag sets.
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
if s, ok := m.seriesByID[id]; ok {
|
if s, ok := m.seriesByID[id]; ok {
|
||||||
values := make([]string, 0, len(r.Columns))
|
values := make([]interface{}, 0, len(r.Columns))
|
||||||
for _, column := range r.Columns {
|
for _, column := range r.Columns {
|
||||||
values = append(values, s.Tags[column])
|
values = append(values, s.Tags[column])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the tag values to the row.
|
// Add the tag values to the row.
|
||||||
r.Values = append(r.Values, []interface{}{values})
|
r.Values = append(r.Values, values)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue