SHOW SERIES
parent
8b6efb49d2
commit
41dde61226
|
@ -57,6 +57,7 @@
|
|||
- [#5880](https://github.com/influxdata/influxdb/issues/5880): TCP connection closed after write (regression/change from 0.9.6)
|
||||
- [#5865](https://github.com/influxdata/influxdb/issues/5865): Conversion to tsm fails with exceeds max index value
|
||||
- [#5924](https://github.com/influxdata/influxdb/issues/5924): Missing data after using influx\_tsm
|
||||
- [#5937](https://github.com/influxdata/influxdb/pull/5937): Rewrite SHOW SERIES to use query engine
|
||||
|
||||
## v0.10.2 [2016-03-03]
|
||||
### Bugfixes
|
||||
|
|
|
@ -172,8 +172,6 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
|
|||
rows, err = e.executeShowGrantsForUserStatement(stmt)
|
||||
case *influxql.ShowRetentionPoliciesStatement:
|
||||
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
|
||||
case *influxql.ShowSeriesStatement:
|
||||
rows, err = e.executeShowSeriesStatement(stmt, database)
|
||||
case *influxql.ShowServersStatement:
|
||||
rows, err = e.executeShowServersStatement(stmt)
|
||||
case *influxql.ShowShardsStatement:
|
||||
|
@ -667,10 +665,6 @@ func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRe
|
|||
return []*models.Row{row}, nil
|
||||
}
|
||||
|
||||
func (e *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) {
|
||||
return e.TSDBStore.ExecuteShowSeriesStatement(stmt, database)
|
||||
}
|
||||
|
||||
func (e *QueryExecutor) executeShowServersStatement(q *influxql.ShowServersStatement) (models.Rows, error) {
|
||||
nis, err := e.MetaClient.DataNodes()
|
||||
if err != nil {
|
||||
|
@ -1100,7 +1094,6 @@ type TSDBStore interface {
|
|||
DeleteRetentionPolicy(database, name string) error
|
||||
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
|
||||
ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
|
||||
ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
|
||||
ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
|
||||
ExpandSources(sources influxql.Sources) (influxql.Sources, error)
|
||||
ShardIteratorCreator(id uint64) influxql.IteratorCreator
|
||||
|
|
|
@ -195,7 +195,6 @@ type TSDBStore struct {
|
|||
DeleteRetentionPolicyFn func(database, name string) error
|
||||
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
|
||||
ExecuteShowFieldKeysStatementFn func(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
|
||||
ExecuteShowSeriesStatementFn func(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
|
||||
ExecuteShowTagValuesStatementFn func(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
|
||||
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
|
||||
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
|
||||
|
@ -232,10 +231,6 @@ func (s *TSDBStore) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysSt
|
|||
return s.ExecuteShowFieldKeysStatementFn(stmt, database)
|
||||
}
|
||||
|
||||
func (s *TSDBStore) ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) {
|
||||
return s.ExecuteShowSeriesStatementFn(stmt, database)
|
||||
}
|
||||
|
||||
func (s *TSDBStore) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) {
|
||||
return s.ExecuteShowTagValuesStatementFn(stmt, database)
|
||||
}
|
||||
|
|
|
@ -205,7 +205,7 @@ func init() {
|
|||
&Query{
|
||||
name: "Show series is present",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
|
@ -239,7 +239,7 @@ func init() {
|
|||
&Query{
|
||||
name: "Show series is present again after re-write",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
},
|
||||
|
@ -260,7 +260,7 @@ func init() {
|
|||
&Query{
|
||||
name: "Show series is present",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"a","columns":["_key","host","region"],"values":[["a,host=serverA,region=uswest","serverA","uswest"]]},{"name":"aa","columns":["_key","host","region"],"values":[["aa,host=serverA,region=uswest","serverA","uswest"]]},{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["a,host=serverA,region=uswest"],["aa,host=serverA,region=uswest"],["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
|
@ -273,7 +273,7 @@ func init() {
|
|||
&Query{
|
||||
name: "Show series is gone",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
|
@ -286,7 +286,7 @@ func init() {
|
|||
&Query{
|
||||
name: "make sure DROP SERIES doesn't delete anything when regex doesn't match",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
|
@ -298,7 +298,7 @@ func init() {
|
|||
&Query{
|
||||
name: "make sure DROP SERIES with field in WHERE didn't delete data",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
|
|
|
@ -4599,7 +4599,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
|
|||
&Query{
|
||||
name: "show series",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]},{"name":"memory","columns":["_key","host","region"],"values":[["memory,host=serverB,region=uswest","serverB","uswest"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"],["memory,host=serverB,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
|
@ -4623,7 +4623,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
|
|||
&Query{
|
||||
name: "verify series",
|
||||
command: `SHOW SERIES`,
|
||||
exp: `{"results":[{"series":[{"name":"memory","columns":["_key","host","region"],"values":[["memory,host=serverB,region=uswest","serverB","uswest"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["memory,host=serverB,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
|
@ -4751,43 +4751,43 @@ func TestServer_Query_ShowSeries(t *testing.T) {
|
|||
&Query{
|
||||
name: `show series`,
|
||||
command: "SHOW SERIES",
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01","server01",""],["cpu,host=server01,region=uswest","server01","uswest"],["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]},{"name":"disk","columns":["_key","host","region"],"values":[["disk,host=server03,region=caeast","server03","caeast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server02,region=useast","server02","useast"],["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01"],["cpu,host=server01,region=useast"],["cpu,host=server01,region=uswest"],["cpu,host=server02,region=useast"],["disk,host=server03,region=caeast"],["gpu,host=server02,region=useast"],["gpu,host=server03,region=caeast"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: `show series from measurement`,
|
||||
command: "SHOW SERIES FROM cpu",
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01","server01",""],["cpu,host=server01,region=uswest","server01","uswest"],["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01"],["cpu,host=server01,region=useast"],["cpu,host=server01,region=uswest"],["cpu,host=server02,region=useast"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: `show series from regular expression`,
|
||||
command: "SHOW SERIES FROM /[cg]pu/",
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01","server01",""],["cpu,host=server01,region=uswest","server01","uswest"],["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server02,region=useast","server02","useast"],["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01"],["cpu,host=server01,region=useast"],["cpu,host=server01,region=uswest"],["cpu,host=server02,region=useast"],["gpu,host=server02,region=useast"],["gpu,host=server03,region=caeast"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: `show series with where tag`,
|
||||
command: "SHOW SERIES WHERE region = 'uswest'",
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01,region=uswest","server01","uswest"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01,region=uswest"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: `show series where tag matches regular expression`,
|
||||
command: "SHOW SERIES WHERE region =~ /ca.*/",
|
||||
exp: `{"results":[{"series":[{"name":"disk","columns":["_key","host","region"],"values":[["disk,host=server03,region=caeast","server03","caeast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["disk,host=server03,region=caeast"],["gpu,host=server03,region=caeast"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: `show series`,
|
||||
command: "SHOW SERIES WHERE host !~ /server0[12]/",
|
||||
exp: `{"results":[{"series":[{"name":"disk","columns":["_key","host","region"],"values":[["disk,host=server03,region=caeast","server03","caeast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["disk,host=server03,region=caeast"],["gpu,host=server03,region=caeast"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: `show series with from and where`,
|
||||
command: "SHOW SERIES FROM cpu WHERE region = 'useast'",
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01,region=useast"],["cpu,host=server02,region=useast"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
|
@ -4799,7 +4799,7 @@ func TestServer_Query_ShowSeries(t *testing.T) {
|
|||
&Query{
|
||||
name: `show series with WHERE field should fail`,
|
||||
command: "SHOW SERIES WHERE value > 10.0",
|
||||
exp: `{"results":[{"error":"SHOW SERIES doesn't support fields in WHERE clause"}]}`,
|
||||
exp: `{"results":[{"error":"invalid tag comparison operator"}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
}...)
|
||||
|
|
|
@ -9,6 +9,8 @@ func RewriteStatement(stmt Statement) (Statement, error) {
|
|||
return rewriteShowFieldKeysStatement(stmt)
|
||||
case *ShowMeasurementsStatement:
|
||||
return rewriteShowMeasurementsStatement(stmt)
|
||||
case *ShowSeriesStatement:
|
||||
return rewriteShowSeriesStatement(stmt)
|
||||
case *ShowTagKeysStatement:
|
||||
return rewriteShowTagKeysStatement(stmt)
|
||||
case *ShowTagValuesStatement:
|
||||
|
@ -81,21 +83,20 @@ func rewriteShowMeasurementsStatement(stmt *ShowMeasurementsStatement) (Statemen
|
|||
}, nil
|
||||
}
|
||||
|
||||
func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) {
|
||||
func rewriteShowSeriesStatement(stmt *ShowSeriesStatement) (Statement, error) {
|
||||
// Check for time in WHERE clause (not supported).
|
||||
if HasTimeExpr(stmt.Condition) {
|
||||
return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause")
|
||||
return nil, errors.New("SHOW SERIES doesn't support time in WHERE clause")
|
||||
}
|
||||
|
||||
condition := rewriteSourcesCondition(stmt.Sources, stmt.Condition)
|
||||
return &SelectStatement{
|
||||
Fields: []*Field{
|
||||
{Expr: &VarRef{Val: "tagKey"}},
|
||||
{Expr: &VarRef{Val: "key"}},
|
||||
},
|
||||
Sources: []Source{
|
||||
&Measurement{Name: "_tagKeys"},
|
||||
&Measurement{Name: "_series"},
|
||||
},
|
||||
Condition: condition,
|
||||
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
|
||||
Offset: stmt.Offset,
|
||||
Limit: stmt.Limit,
|
||||
SortFields: stmt.SortFields,
|
||||
|
@ -161,6 +162,28 @@ func rewriteShowTagValuesStatement(stmt *ShowTagValuesStatement) (Statement, err
|
|||
}, nil
|
||||
}
|
||||
|
||||
func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) {
|
||||
// Check for time in WHERE clause (not supported).
|
||||
if HasTimeExpr(stmt.Condition) {
|
||||
return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause")
|
||||
}
|
||||
|
||||
return &SelectStatement{
|
||||
Fields: []*Field{
|
||||
{Expr: &VarRef{Val: "tagKey"}},
|
||||
},
|
||||
Sources: []Source{
|
||||
&Measurement{Name: "_tagKeys"},
|
||||
},
|
||||
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
|
||||
Offset: stmt.Offset,
|
||||
Limit: stmt.Limit,
|
||||
SortFields: stmt.SortFields,
|
||||
OmitTime: true,
|
||||
Dedupe: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// rewriteSourcesCondition rewrites sources into `name` expressions.
|
||||
// Merges with cond and returns a new condition.
|
||||
func rewriteSourcesCondition(sources Sources, cond Expr) Expr {
|
||||
|
|
23
tsdb/meta.go
23
tsdb/meta.go
|
@ -209,7 +209,7 @@ func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bo
|
|||
}
|
||||
return nil, false, nil
|
||||
default:
|
||||
return nil, false, fmt.Errorf("invalid operator")
|
||||
return nil, false, fmt.Errorf("invalid tag comparison operator")
|
||||
}
|
||||
case *influxql.ParenExpr:
|
||||
return d.measurementsByExpr(e.Expr)
|
||||
|
@ -740,12 +740,12 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
|
|||
|
||||
// For fields, return all series IDs from this measurement and return
|
||||
// the expression passed in, as the filter.
|
||||
if m.HasField(name.Val) {
|
||||
if name.Val != "name" && m.HasField(name.Val) {
|
||||
return m.seriesIDs, n, nil
|
||||
}
|
||||
|
||||
tagVals, ok := m.seriesByTagKeyValue[name.Val]
|
||||
if !ok {
|
||||
if name.Val != "name" && !ok {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
|
@ -753,6 +753,14 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
|
|||
if str, ok := value.(*influxql.StringLiteral); ok {
|
||||
var ids SeriesIDs
|
||||
|
||||
// Special handling for "name" to match measurement name.
|
||||
if name.Val == "name" {
|
||||
if (n.Op == influxql.EQ && str.Val == m.Name) || (n.Op == influxql.NEQ && str.Val != m.Name) {
|
||||
return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
return nil, &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
|
||||
if n.Op == influxql.EQ {
|
||||
// return series that have a tag of specific value.
|
||||
ids = tagVals[str.Val]
|
||||
|
@ -766,6 +774,15 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
|
|||
if re, ok := value.(*influxql.RegexLiteral); ok {
|
||||
var ids SeriesIDs
|
||||
|
||||
// Special handling for "name" to match measurement name.
|
||||
if name.Val == "name" {
|
||||
match := re.Val.MatchString(m.Name)
|
||||
if (n.Op == influxql.EQREGEX && match) || (n.Op == influxql.NEQREGEX && !match) {
|
||||
return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
return nil, &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
|
||||
// The operation is a NEQREGEX, code must start by assuming all match, even
|
||||
// series without any tags.
|
||||
if n.Op == influxql.NEQREGEX {
|
||||
|
|
|
@ -415,6 +415,8 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
|
|||
return NewFieldKeysIterator(s, opt)
|
||||
case "_measurements":
|
||||
return NewMeasurementIterator(s, opt)
|
||||
case "_series":
|
||||
return NewSeriesIterator(s, opt)
|
||||
case "_tagKeys":
|
||||
return NewTagKeysIterator(s, opt)
|
||||
case "_tags":
|
||||
|
@ -882,6 +884,82 @@ func (itr *MeasurementIterator) Next() *influxql.FloatPoint {
|
|||
}
|
||||
}
|
||||
|
||||
// seriesIterator emits series ids.
|
||||
type seriesIterator struct {
|
||||
keys []string // remaining series
|
||||
fields []string // fields to emit (key)
|
||||
}
|
||||
|
||||
// NewSeriesIterator returns a new instance of SeriesIterator.
|
||||
func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
// Retrieve a list of all measurements.
|
||||
mms := sh.index.Measurements()
|
||||
sort.Sort(mms)
|
||||
|
||||
// Only equality operators are allowed.
|
||||
var err error
|
||||
influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
|
||||
switch n := n.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch n.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
|
||||
influxql.OR, influxql.AND:
|
||||
default:
|
||||
err = errors.New("invalid tag comparison operator")
|
||||
}
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Generate a list of all series keys.
|
||||
keys := newStringSet()
|
||||
for _, mm := range mms {
|
||||
ids, err := mm.seriesIDsAllOrByExpr(opt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
keys.add(mm.SeriesByID(id).Key)
|
||||
}
|
||||
}
|
||||
|
||||
return &seriesIterator{
|
||||
keys: keys.list(),
|
||||
fields: opt.Aux,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the iterator.
|
||||
func (itr *seriesIterator) Close() error { return nil }
|
||||
|
||||
// Next emits the next point in the iterator.
|
||||
func (itr *seriesIterator) Next() *influxql.FloatPoint {
|
||||
// If there are no more keys then return nil.
|
||||
if len(itr.keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Prepare auxiliary fields.
|
||||
aux := make([]interface{}, len(itr.fields))
|
||||
for i, f := range itr.fields {
|
||||
switch f {
|
||||
case "key":
|
||||
aux[i] = itr.keys[0]
|
||||
}
|
||||
}
|
||||
|
||||
// Return next key.
|
||||
p := &influxql.FloatPoint{
|
||||
Aux: aux,
|
||||
}
|
||||
itr.keys = itr.keys[1:]
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// NewTagKeysIterator returns a new instance of TagKeysIterator.
|
||||
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
fn := func(m *Measurement) []string {
|
||||
|
|
|
@ -729,101 +729,6 @@ func (s *Store) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatem
|
|||
return rows, nil
|
||||
}
|
||||
|
||||
func (s *Store) ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) {
|
||||
// NOTE(benbjohnson):
|
||||
// This function is temporarily moved here until reimplemented in the new query engine.
|
||||
|
||||
// Check for time in WHERE clause (not supported).
|
||||
if influxql.HasTimeExpr(stmt.Condition) {
|
||||
return nil, errors.New("SHOW SERIES doesn't support time in WHERE clause")
|
||||
}
|
||||
|
||||
// Find the database.
|
||||
db := s.DatabaseIndex(database)
|
||||
if db == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Expand regex expressions in the FROM clause.
|
||||
sources, err := s.ExpandSources(stmt.Sources)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get the list of measurements we're interested in.
|
||||
measurements, err := measurementsFromSourcesOrDB(db, sources...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create result struct that will be populated and returned.
|
||||
rows := make(models.Rows, 0, len(measurements))
|
||||
|
||||
// Loop through measurements to build result. One result row / measurement.
|
||||
for _, m := range measurements {
|
||||
var ids SeriesIDs
|
||||
var filters FilterExprs
|
||||
|
||||
if stmt.Condition != nil {
|
||||
// Get series IDs that match the WHERE clause.
|
||||
ids, filters, err = m.walkWhereForSeriesIds(stmt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Delete boolean literal true filter expressions.
|
||||
filters.DeleteBoolLiteralTrues()
|
||||
|
||||
// Check for unsupported field filters.
|
||||
if filters.Len() > 0 {
|
||||
return nil, errors.New("SHOW SERIES doesn't support fields in WHERE clause")
|
||||
}
|
||||
|
||||
// If no series matched, then go to the next measurement.
|
||||
if len(ids) == 0 {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// No WHERE clause so get all series IDs for this measurement.
|
||||
ids = m.seriesIDs
|
||||
}
|
||||
|
||||
// Make a new row for this measurement.
|
||||
r := &models.Row{
|
||||
Name: m.Name,
|
||||
Columns: m.TagKeys(),
|
||||
}
|
||||
|
||||
// Loop through series IDs getting matching tag sets.
|
||||
for _, id := range ids {
|
||||
if s, ok := m.seriesByID[id]; ok {
|
||||
values := make([]interface{}, 0, len(r.Columns))
|
||||
|
||||
// make the series key the first value
|
||||
values = append(values, s.Key)
|
||||
|
||||
for _, column := range r.Columns {
|
||||
values = append(values, s.Tags[column])
|
||||
}
|
||||
|
||||
// Add the tag values to the row.
|
||||
r.Values = append(r.Values, values)
|
||||
}
|
||||
}
|
||||
// make the id the first column
|
||||
r.Columns = append([]string{"_key"}, r.Columns...)
|
||||
|
||||
// Append the row.
|
||||
rows = append(rows, r)
|
||||
}
|
||||
|
||||
if stmt.Limit > 0 || stmt.Offset > 0 {
|
||||
rows = s.filterShowSeriesResult(stmt.Limit, stmt.Offset, rows)
|
||||
}
|
||||
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// filterShowSeriesResult will limit the number of series returned based on the limit and the offset.
|
||||
// Unlike limit and offset on SELECT statements, the limit and offset don't apply to the number of Rows, but
|
||||
// to the number of total Values returned, since each Value represents a unique series.
|
||||
|
|
Loading…
Reference in New Issue