From 519e23b86a295e1df964000def6d5ebb23db6987 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Mon, 2 Aug 2021 11:11:39 -0700 Subject: [PATCH] feat: SHOW TAG VALUES should produce results from one specific RP (#21983) Ensure that the Sources field of the ShowTagValuesStatement is filled in. Then use the sources to limit the retention policies, and thus the shards from which tag values are collected. This fix only works on TSI databases; INMEM shards share indices, so restricting shard indices used does not restrict the tag values returned. This will not permit multiple retention policies to be specified in a query; either all RPs or one are permitted. Closes https://github.com/influxdata/influxdb/issues/21981 --- CHANGELOG.md | 1 + coordinator/statement_executor.go | 29 +++++++++++-- query/statement_rewriter.go | 1 + query/statement_rewriter_test.go | 6 ++- tests/server_test.go | 70 +++++++++++++++++++++++-------- 5 files changed, 86 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6be40e1047..fb69436efe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [#21707](https://github.com/influxdata/influxdb/pull/21707): chore: add logging to compaction - [#21752](https://github.com/influxdata/influxdb/pull/21752): feat: add total-buffer-bytes config parameter to subscriptions - [#21820](https://github.com/influxdata/influxdb/pull/21820): chore: update flux to v0.120.1 +- [#21983](https://github.com/influxdata/influxdb/pull/21983): feat: SHOW TAG VALUES should produce results from one specific RP ### Bugfixes diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 105d755f2c..d3ca9679eb 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -1039,10 +1039,31 @@ func (e *StatementExecutor) executeShowTagValues(ctx *query.ExecutionContext, q return err } - // Get all shards for all retention policies. + // If measurements include retention policies + // only look at those policies + rps := make([]string, 0, len(di.RetentionPolicies)) + // Collect retention policies if specified + for _, m := range q.Sources.Measurements() { + if len(m.RetentionPolicy) > 0 { + rps = append(rps, m.RetentionPolicy) + } + } + // If no retention policies specified, use + // all retention policies + if len(rps) == 0 { + for _, rp := range di.RetentionPolicies { + rps = append(rps, rp.Name) + } + } else { + for _, rp := range rps { + if rp != rps[0] { + return fmt.Errorf("only one retention policy allowed in SHOW TAG VALUES query: \"%s\", \"%s\"", rp, rps[0]) + } + } + } var allGroups []meta.ShardGroupInfo - for _, rpi := range di.RetentionPolicies { - sgis, err := e.MetaClient.ShardGroupsByTimeRange(q.Database, rpi.Name, timeRange.MinTime(), timeRange.MaxTime()) + for _, rp := range rps { + sgis, err := e.MetaClient.ShardGroupsByTimeRange(q.Database, rp, timeRange.MinTime(), timeRange.MaxTime()) if err != nil { return err } @@ -1312,6 +1333,8 @@ func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultD case *influxql.DropSeriesStatement, *influxql.DeleteSeriesStatement: // DB and RP not supported by these statements so don't rewrite into invalid // statements + case *influxql.ShowTagValuesStatement: + // SHOW TAG VALUES should span multiple RPs if one is not specified. default: err = e.normalizeMeasurement(node, defaultDatabase, defaultRetentionPolicy) } diff --git a/query/statement_rewriter.go b/query/statement_rewriter.go index 71fe691e80..4c18b281de 100644 --- a/query/statement_rewriter.go +++ b/query/statement_rewriter.go @@ -300,6 +300,7 @@ func rewriteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement) (influ return &influxql.ShowTagValuesStatement{ Database: stmt.Database, + Sources: stmt.Sources, Op: stmt.Op, TagKeyExpr: stmt.TagKeyExpr, Condition: condition, diff --git a/query/statement_rewriter_test.go b/query/statement_rewriter_test.go index 1dca367ded..1d124bcc4c 100644 --- a/query/statement_rewriter_test.go +++ b/query/statement_rewriter_test.go @@ -302,7 +302,11 @@ func TestRewriteStatement(t *testing.T) { }, { stmt: `SHOW TAG VALUES FROM cpu WITH KEY = "region"`, - s: `SHOW TAG VALUES WITH KEY = region WHERE (_name = 'cpu') AND (_tagKey = 'region')`, + s: `SHOW TAG VALUES FROM cpu WITH KEY = region WHERE (_name = 'cpu') AND (_tagKey = 'region')`, + }, + { + stmt: `SHOW TAG VALUES FROM mydb.myrp1.cpu WITH KEY = "region"`, + s: `SHOW TAG VALUES FROM mydb.myrp1.cpu WITH KEY = region WHERE (_name = 'cpu') AND (_tagKey = 'region')`, }, { stmt: `SHOW TAG VALUES WITH KEY != "region"`, diff --git a/tests/server_test.go b/tests/server_test.go index 262be77487..24ed57f82f 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -8154,28 +8154,34 @@ func TestServer_Query_ShowTagKeys(t *testing.T) { func TestServer_Query_ShowTagValues(t *testing.T) { t.Parallel() - s := OpenServer(NewConfig()) + conf := NewConfig() + s := OpenServer(conf) defer s.Close() + rps := []string{"rp0", "rp1", "rp2", "rp3"} - if err := s.CreateDatabaseAndRetentionPolicy("db0", NewRetentionPolicySpec("rp0", 1, 0), true); err != nil { - t.Fatal(err) + writes := [][]string{ + []string{ + fmt.Sprintf(`cpu,host=server01 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), + fmt.Sprintf(`cpu,host=server01,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), + fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), + }, + []string{ + fmt.Sprintf(`cpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), + fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), + fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), + }, + []string{ + fmt.Sprintf(`disk,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), + fmt.Sprintf(`_,__name__=metric1 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), + fmt.Sprintf(`_,__name__=metric2 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), + }, } - writes := []string{ - fmt.Sprintf(`cpu,host=server01 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), - fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), - fmt.Sprintf(`cpu,host=server01,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), - fmt.Sprintf(`cpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), - fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), - fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), - fmt.Sprintf(`disk,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), - fmt.Sprintf(`_,__name__=metric1 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), - fmt.Sprintf(`_,__name__=metric2 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:00Z").UnixNano()), - } - - test := NewTest("db0", "rp0") + test := NewTest("db0", rps[0]) test.writes = Writes{ - &Write{data: strings.Join(writes, "\n")}, + &Write{db: test.database(), rp: rps[0], data: strings.Join(writes[0], "\n")}, + &Write{db: test.database(), rp: rps[1], data: strings.Join(writes[1], "\n")}, + &Write{db: test.database(), rp: rps[2], data: strings.Join(writes[2], "\n")}, } test.addQueries([]*Query{ @@ -8335,8 +8341,38 @@ func TestServer_Query_ShowTagValues(t *testing.T) { exp: `{"results":[{"statement_id":0}]}`, params: url.Values{"db": []string{"db0"}}, }, + &Query{ + name: "show tag values with multiple retention policies", + command: `SHOW TAG VALUES FROM ` + rps[0] + `.cpu, ` + rps[1] + `.cpu WITH KEY IN (host, region)`, + exp: `{"results":[{"statement_id":0,"error":"only one retention policy allowed in SHOW TAG VALUES query: \"rp1\", \"rp0\""}]}`, + params: url.Values{"db": []string{"db0"}}, + }, }...) + // Retention policy filtration of tag values only works on TSI + if conf.Data.Index != tsdb.InmemIndexName { + test.addQueries([]*Query{ + &Query{ + name: `show tag values from retention policy zero with key in and where does not match the regular expression`, + command: `SHOW TAG VALUES FROM ` + rps[0] + `.cpu WITH KEY IN (host, region) WHERE region = 'useast'`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"],["region","useast"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: `show tag values from retention policy one with key in and where does not match the regular expression`, + command: `SHOW TAG VALUES FROM ` + rps[1] + `.cpu WITH KEY IN (host, region) WHERE region = 'useast'`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server02"],["region","useast"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: `show tag values from retention policy with key and measurement matches regular expression`, + command: `SHOW TAG VALUES FROM ` + rps[1] + `./[cg]pu/ WITH KEY = host`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server02"]]},{"name":"gpu","columns":["key","value"],"values":[["host","server02"],["host","server03"]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + }...) + } + var once sync.Once for _, query := range test.queries { t.Run(query.name, func(t *testing.T) {