feat: use count_hll for 'show series cardinality' queries (#20745)

Closes: https://github.com/influxdata/influxdb/issues/20614

Also fix a nil pointer dereference in the seriesKey iterator

Fixes the bug reported in: https://github.com/influxdata/influxdb/issues/20543

Also add a test for ingress metrics
pull/20739/head
Sam Arnold 2021-02-10 17:00:16 -04:00 committed by GitHub
parent 903b8cd0ea
commit de1a0eb2a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 139 additions and 13 deletions

View File

@ -202,6 +202,30 @@ func rewriteShowSeriesCardinalityStatement(stmt *influxql.ShowSeriesCardinalityS
}
}
// do hll estimation for per-measurement queries
if !stmt.Exact && stmt.Dimensions == nil && stmt.Limit == 0 && stmt.Offset == 0 {
return &influxql.SelectStatement{
Fields: []*influxql.Field{
{
Expr: &influxql.Call{
Name: "count_hll",
Args: []influxql.Expr{&influxql.Call{
Name: "sum_hll",
Args: []influxql.Expr{&influxql.VarRef{Val: "_seriesKey"}},
}},
},
Alias: "cardinality estimation",
},
},
Sources: rewriteSources2(stmt.Sources, stmt.Database),
Condition: stmt.Condition,
Dimensions: stmt.Dimensions,
Offset: stmt.Offset,
Limit: stmt.Limit,
OmitTime: true,
}, nil
}
return &influxql.SelectStatement{
Fields: []*influxql.Field{
{

View File

@ -126,7 +126,7 @@ func TestRewriteStatement(t *testing.T) {
},
{
stmt: `SHOW SERIES CARDINALITY FROM m`,
s: `SELECT count(distinct(_seriesKey)) AS count FROM m`,
s: `SELECT count_hll(sum_hll(_seriesKey)) AS "cardinality estimation" FROM m`,
},
{
stmt: `SHOW SERIES EXACT CARDINALITY`,

View File

@ -23,6 +23,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
)
// Global server used by benchmarks
@ -7382,37 +7383,37 @@ func TestServer_Query_ShowSeriesExactCardinality(t *testing.T) {
&Query{
name: `show series cardinality from measurement`,
command: "SHOW SERIES CARDINALITY FROM cpu",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["count"],"values":[[4]]}]}]}`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["cardinality estimation"],"values":[[4]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series cardinality from regular expression`,
command: "SHOW SERIES CARDINALITY FROM /[cg]pu/",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["count"],"values":[[4]]},{"name":"gpu","columns":["count"],"values":[[2]]}]}]}`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["cardinality estimation"],"values":[[4]]},{"name":"gpu","columns":["cardinality estimation"],"values":[[2]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series cardinality with where tag`,
command: "SHOW SERIES CARDINALITY WHERE region = 'uswest'",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["count"],"values":[[1]]}]}]}`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["cardinality estimation"],"values":[[1]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series cardinality where tag matches regular expression`,
command: "SHOW SERIES CARDINALITY WHERE region =~ /ca.*/",
exp: `{"results":[{"statement_id":0,"series":[{"name":"disk","columns":["count"],"values":[[1]]},{"name":"gpu","columns":["count"],"values":[[1]]}]}]}`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"disk","columns":["cardinality estimation"],"values":[[1]]},{"name":"gpu","columns":["cardinality estimation"],"values":[[1]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series cardinality`,
command: "SHOW SERIES CARDINALITY WHERE host !~ /server0[12]/",
exp: `{"results":[{"statement_id":0,"series":[{"name":"disk","columns":["count"],"values":[[1]]},{"name":"gpu","columns":["count"],"values":[[1]]}]}]}`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"disk","columns":["cardinality estimation"],"values":[[1]]},{"name":"gpu","columns":["cardinality estimation"],"values":[[1]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series cardinality with from and where`,
command: "SHOW SERIES CARDINALITY FROM cpu WHERE region = 'useast'",
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["count"],"values":[[2]]}]}]}`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["cardinality estimation"],"values":[[2]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
@ -7532,6 +7533,100 @@ func TestServer_Query_ShowStats(t *testing.T) {
}
}
// TestServer_Query_ShowIngressStats verifies that `SHOW STATS` reports
// ingress metrics (points/series/values written) with the expected tag
// granularity for each combination of the per-measurement and per-login
// ingress metric config flags, and that the localStore totals are
// unaffected by those flags.
func TestServer_Query_ShowIngressStats(t *testing.T) {
	t.Parallel()

	// configWithIngress returns a server config with the ingress metric
	// detail flags set as requested.
	configWithIngress := func(measurement, login bool) *Config {
		c := NewConfig()
		c.Data.IngressMetricByMeasurement = measurement
		c.Data.IngressMetricByLogin = login
		return c
	}

	// 6 points total: 5 to cpu (2 series), 1 to gpu (1 series).
	writes := strings.Join([]string{
		fmt.Sprintf(`cpu,host=server01 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:02Z").UnixNano()),
		fmt.Sprintf(`cpu,host=server01 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:03Z").UnixNano()),
		fmt.Sprintf(`cpu,host=server01 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:04Z").UnixNano()),
		fmt.Sprintf(`cpu,host=server02 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:05Z").UnixNano()),
		fmt.Sprintf(`cpu,host=server02 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:06Z").UnixNano()),
		fmt.Sprintf(`gpu,host=server02 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:06Z").UnixNano()),
	}, "\n")

	var testData = []struct {
		name             string
		config           *Config
		expectLocalStore string
		expectIngress    string
	}{
		{
			name:             "no details",
			config:           configWithIngress(false, false),
			expectLocalStore: `{"columns": ["pointsWritten", "seriesCreated", "valuesWritten"], "name":"localStore", "values": [[6,3,6]]}`,
			expectIngress:    "[]",
		},
		{
			name:             "just measurement",
			config:           configWithIngress(true, false),
			expectLocalStore: `{"columns": ["pointsWritten", "seriesCreated", "valuesWritten"], "name":"localStore", "values": [[6,3,6]]}`,
			expectIngress:    `[{"columns":["pointsWritten","seriesCreated","valuesWritten"],"name":"ingress","tags":{"db":"db0","measurement":"cpu","rp":"rp0"},"values":[[5,2,5]]},{"columns":["pointsWritten","seriesCreated","valuesWritten"],"name":"ingress","tags":{"db":"db0","measurement":"gpu","rp":"rp0"},"values":[[1,1,1]]}]`,
		},
		{
			// note we are unauthenticated, so the login user is '_systemuser_unknown'
			name:             "just login",
			config:           configWithIngress(false, true),
			expectLocalStore: `{"columns": ["pointsWritten", "seriesCreated", "valuesWritten"], "name":"localStore", "values": [[6,3,6]]}`,
			expectIngress:    `[{"columns":["pointsWritten","seriesCreated","valuesWritten"],"name":"ingress","tags":{"login":"_systemuser_unknown"},"values":[[6,3,6]]}]`,
		},
		{
			name:             "measurement and login",
			config:           configWithIngress(true, true),
			expectLocalStore: `{"columns": ["pointsWritten", "seriesCreated", "valuesWritten"], "name":"localStore", "values": [[6,3,6]]}`,
			expectIngress:    `[{"columns":["pointsWritten","seriesCreated","valuesWritten"],"name":"ingress","tags":{"db":"db0","login":"_systemuser_unknown","measurement":"cpu","rp":"rp0"},"values":[[5,2,5]]},{"columns":["pointsWritten","seriesCreated","valuesWritten"],"name":"ingress","tags":{"db":"db0","login":"_systemuser_unknown","measurement":"gpu","rp":"rp0"},"values":[[1,1,1]]}]`,
		},
	}

	for _, data := range testData {
		t.Run(data.name, func(t *testing.T) {
			s := OpenServer(data.config)
			defer s.Close()

			if err := s.CreateDatabaseAndRetentionPolicy("db0", NewRetentionPolicySpec("rp0", 1, 0), true); err != nil {
				t.Fatalf("Unexpected create database error: %v", err)
			}
			if _, err := s.Write("db0", "rp0", writes, url.Values{}); err != nil {
				t.Fatalf("Unexpected write error: %v", err)
			}
			results, err := s.Query("show stats")
			if err != nil {
				t.Fatalf("stats query error: %v", err)
			}

			// Pick the localStore series and all ingress series out of the
			// SHOW STATS response; other series are ignored.
			var v map[string]interface{}
			var localStore map[string]interface{}
			var ingress = make([]map[string]interface{}, 0)
			// BUG FIX: the unmarshal error was previously ignored, which would
			// surface later as a confusing nil-map panic on a bad response.
			if err := json.Unmarshal([]byte(results), &v); err != nil {
				t.Fatalf("stats query unmarshal error: %v", err)
			}
			resultSeries := v["results"].([]interface{})[0].(map[string]interface{})["series"].([]interface{})
			for _, series := range resultSeries {
				seriesMap := series.(map[string]interface{})
				name := seriesMap["name"].(string)
				if name == "localStore" {
					localStore = seriesMap
				}
				if name == "ingress" {
					ingress = append(ingress, seriesMap)
				}
			}

			// mustJson re-serializes a picked series so it can be compared
			// structurally with assert.JSONEq (key order independent).
			mustJson := func(v interface{}) string {
				b, err := json.Marshal(v)
				if err != nil {
					t.Fatalf("Unexpected json marshalling error: %v", err)
				}
				return string(b)
			}
			assert.JSONEq(t, data.expectLocalStore, mustJson(localStore))
			assert.JSONEq(t, data.expectIngress, mustJson(ingress))
		})
	}
}
func TestServer_Query_ShowMeasurements(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())

View File

@ -2506,7 +2506,10 @@ func (e *Engine) createSeriesIterator(measurement string, ref *influxql.VarRef,
return nil, err
}
var seriesIterator query.Iterator
seriesIterator = newSeriesIterator(measurement, seriesCursor)
seriesIterator, err = newSeriesIterator(measurement, seriesCursor)
if err != nil {
return nil, err
}
if opt.InterruptCh != nil {
seriesIterator = query.NewInterruptIterator(seriesIterator, opt.InterruptCh)
}

View File

@ -227,7 +227,10 @@ type seriesIterator struct {
statsBuf query.IteratorStats
}
func newSeriesIterator(name string, cur tsdb.SeriesKeyIterator) *seriesIterator {
func newSeriesIterator(name string, cur tsdb.SeriesKeyIterator) (*seriesIterator, error) {
if cur == nil {
return nil, fmt.Errorf("internal error: cannot create series iterator from nil iterator")
}
itr := &seriesIterator{
cur: cur,
point: query.StringPoint{
@ -236,7 +239,7 @@ func newSeriesIterator(name string, cur tsdb.SeriesKeyIterator) *seriesIterator
},
}
itr.stats = itr.statsBuf
return itr
return itr, nil
}
// Next returns the next point from the iterator.

View File

@ -1933,7 +1933,7 @@ type measurementSeriesKeyByExprIterator struct {
}
func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
if itr == nil {
if itr.ids == nil {
return nil, nil
}
for {
@ -1982,7 +1982,8 @@ func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
}
func (itr *measurementSeriesKeyByExprIterator) Close() error {
if itr == nil {
// assume that we don't have to release if ids is nil - see MeasurementSeriesKeyByExprIterator
if itr.ids == nil {
return nil
}
itr.once.Do(itr.releaser)
@ -2001,7 +2002,7 @@ func (is IndexSet) MeasurementSeriesKeyByExprIterator(name []byte, expr influxql
}
if ids == nil {
release()
return nil, nil
release = nil
}
return &measurementSeriesKeyByExprIterator{
ids: ids,