feat: SHOW TAG VALUES should produce results from one specific RP (#21983)
Ensure that the Sources field of the ShowTagValuesStatement is filled in. Then use the sources to limit the retention policies, and thus the shards, from which tag values are collected. This fix only works on TSI databases; INMEM shards share indices, so restricting the shard indices used does not restrict the tag values returned. This does not permit multiple retention policies to be specified in a single query; either all RPs or exactly one RP may be queried. Closes https://github.com/influxdata/influxdb/issues/21981
pull/22020/head
parent
e62efaf751
commit
519e23b86a
|
@ -5,6 +5,7 @@
|
|||
- [#21707](https://github.com/influxdata/influxdb/pull/21707): chore: add logging to compaction
|
||||
- [#21752](https://github.com/influxdata/influxdb/pull/21752): feat: add total-buffer-bytes config parameter to subscriptions
|
||||
- [#21820](https://github.com/influxdata/influxdb/pull/21820): chore: update flux to v0.120.1
|
||||
- [#21983](https://github.com/influxdata/influxdb/pull/21983): feat: SHOW TAG VALUES should produce results from one specific RP
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -1039,10 +1039,31 @@ func (e *StatementExecutor) executeShowTagValues(ctx *query.ExecutionContext, q
|
|||
return err
|
||||
}
|
||||
|
||||
// Get all shards for all retention policies.
|
||||
// If measurements include retention policies
|
||||
// only look at those policies
|
||||
rps := make([]string, 0, len(di.RetentionPolicies))
|
||||
// Collect retention policies if specified
|
||||
for _, m := range q.Sources.Measurements() {
|
||||
if len(m.RetentionPolicy) > 0 {
|
||||
rps = append(rps, m.RetentionPolicy)
|
||||
}
|
||||
}
|
||||
// If no retention policies specified, use
|
||||
// all retention policies
|
||||
if len(rps) == 0 {
|
||||
for _, rp := range di.RetentionPolicies {
|
||||
rps = append(rps, rp.Name)
|
||||
}
|
||||
} else {
|
||||
for _, rp := range rps {
|
||||
if rp != rps[0] {
|
||||
return fmt.Errorf("only one retention policy allowed in SHOW TAG VALUES query: \"%s\", \"%s\"", rp, rps[0])
|
||||
}
|
||||
}
|
||||
}
|
||||
var allGroups []meta.ShardGroupInfo
|
||||
for _, rpi := range di.RetentionPolicies {
|
||||
sgis, err := e.MetaClient.ShardGroupsByTimeRange(q.Database, rpi.Name, timeRange.MinTime(), timeRange.MaxTime())
|
||||
for _, rp := range rps {
|
||||
sgis, err := e.MetaClient.ShardGroupsByTimeRange(q.Database, rp, timeRange.MinTime(), timeRange.MaxTime())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1312,6 +1333,8 @@ func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultD
|
|||
case *influxql.DropSeriesStatement, *influxql.DeleteSeriesStatement:
|
||||
// DB and RP not supported by these statements so don't rewrite into invalid
|
||||
// statements
|
||||
case *influxql.ShowTagValuesStatement:
|
||||
// SHOW TAG VALUES should span multiple RPs if one is not specified.
|
||||
default:
|
||||
err = e.normalizeMeasurement(node, defaultDatabase, defaultRetentionPolicy)
|
||||
}
|
||||
|
|
|
@ -300,6 +300,7 @@ func rewriteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement) (influ
|
|||
|
||||
return &influxql.ShowTagValuesStatement{
|
||||
Database: stmt.Database,
|
||||
Sources: stmt.Sources,
|
||||
Op: stmt.Op,
|
||||
TagKeyExpr: stmt.TagKeyExpr,
|
||||
Condition: condition,
|
||||
|
|
|
@ -302,7 +302,11 @@ func TestRewriteStatement(t *testing.T) {
|
|||
},
|
||||
{
|
||||
stmt: `SHOW TAG VALUES FROM cpu WITH KEY = "region"`,
|
||||
s: `SHOW TAG VALUES WITH KEY = region WHERE (_name = 'cpu') AND (_tagKey = 'region')`,
|
||||
s: `SHOW TAG VALUES FROM cpu WITH KEY = region WHERE (_name = 'cpu') AND (_tagKey = 'region')`,
|
||||
},
|
||||
{
|
||||
stmt: `SHOW TAG VALUES FROM mydb.myrp1.cpu WITH KEY = "region"`,
|
||||
s: `SHOW TAG VALUES FROM mydb.myrp1.cpu WITH KEY = region WHERE (_name = 'cpu') AND (_tagKey = 'region')`,
|
||||
},
|
||||
{
|
||||
stmt: `SHOW TAG VALUES WITH KEY != "region"`,
|
||||
|
|
|
@ -8154,28 +8154,34 @@ func TestServer_Query_ShowTagKeys(t *testing.T) {
|
|||
|
||||
func TestServer_Query_ShowTagValues(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenServer(NewConfig())
|
||||
conf := NewConfig()
|
||||
s := OpenServer(conf)
|
||||
defer s.Close()
|
||||
rps := []string{"rp0", "rp1", "rp2", "rp3"}
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", NewRetentionPolicySpec("rp0", 1, 0), true); err != nil {
|
||||
t.Fatal(err)
|
||||
writes := [][]string{
|
||||
[]string{
|
||||
fmt.Sprintf(`cpu,host=server01 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu,host=server01,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
},
|
||||
[]string{
|
||||
fmt.Sprintf(`cpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
},
|
||||
[]string{
|
||||
fmt.Sprintf(`disk,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`_,__name__=metric1 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`_,__name__=metric2 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
},
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`cpu,host=server01 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu,host=server01,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`cpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`disk,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`_,__name__=metric1 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`_,__name__=metric2 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()),
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test := NewTest("db0", rps[0])
|
||||
test.writes = Writes{
|
||||
&Write{data: strings.Join(writes, "\n")},
|
||||
&Write{db: test.database(), rp: rps[0], data: strings.Join(writes[0], "\n")},
|
||||
&Write{db: test.database(), rp: rps[1], data: strings.Join(writes[1], "\n")},
|
||||
&Write{db: test.database(), rp: rps[2], data: strings.Join(writes[2], "\n")},
|
||||
}
|
||||
|
||||
test.addQueries([]*Query{
|
||||
|
@ -8335,8 +8341,38 @@ func TestServer_Query_ShowTagValues(t *testing.T) {
|
|||
exp: `{"results":[{"statement_id":0}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: "show tag values with multiple retention policies",
|
||||
command: `SHOW TAG VALUES FROM ` + rps[0] + `.cpu, ` + rps[1] + `.cpu WITH KEY IN (host, region)`,
|
||||
exp: `{"results":[{"statement_id":0,"error":"only one retention policy allowed in SHOW TAG VALUES query: \"rp1\", \"rp0\""}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
}...)
|
||||
|
||||
// Retention policy filtration of tag values only works on TSI
|
||||
if conf.Data.Index != tsdb.InmemIndexName {
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: `show tag values from retention policy zero with key in and where does not match the regular expression`,
|
||||
command: `SHOW TAG VALUES FROM ` + rps[0] + `.cpu WITH KEY IN (host, region) WHERE region = 'useast'`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"],["region","useast"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: `show tag values from retention policy one with key in and where does not match the regular expression`,
|
||||
command: `SHOW TAG VALUES FROM ` + rps[1] + `.cpu WITH KEY IN (host, region) WHERE region = 'useast'`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server02"],["region","useast"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
&Query{
|
||||
name: `show tag values from retention policy with key and measurement matches regular expression`,
|
||||
command: `SHOW TAG VALUES FROM ` + rps[1] + `./[cg]pu/ WITH KEY = host`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server02"]]},{"name":"gpu","columns":["key","value"],"values":[["host","server02"],["host","server03"]]}]}]}`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
},
|
||||
}...)
|
||||
}
|
||||
|
||||
var once sync.Once
|
||||
for _, query := range test.queries {
|
||||
t.Run(query.name, func(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue