feat: flux query profiler (#19359)

Yiqun (Ethan) Zhang 2020-08-25 18:00:02 -05:00 committed by GitHub
parent 56ac8b2762
commit 6f805cbc2b
3 changed files with 215 additions and 4 deletions
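Taken together, the three files below add a launcher test that registers two fake profilers and asserts on the extra `_profiler` CSV result, make ProxyQueryServiceAsyncBridge encode profiler results after the regular results, and implement ProfilerResults() on the controller's Query. As a rough sketch of the caller-facing side, a client opts in through the profiler package option; the bucket name and the "query" profiler name below are illustrative assumptions, not part of this commit (the test registers its own "query0"/"query100" profilers instead).

package main

import "fmt"

// Sketch of the Flux source a client would send to opt in to profiling.
// The "example" bucket and the "query" profiler name are assumptions.
const profiledQuery = `
import "profiler"

option profiler.enabledProfilers = ["query"]

from(bucket: "example")
    |> range(start: -1h)
    |> last()
`

func main() {
    // In practice this string is submitted as the query of a Flux request;
    // printing it just keeps the sketch self-contained.
    fmt.Println(profiledQuery)
}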


@@ -21,6 +21,7 @@ import (
    "github.com/influxdata/flux/execute/executetest"
    "github.com/influxdata/flux/execute/table"
    "github.com/influxdata/flux/lang"
    "github.com/influxdata/flux/memory"
    "github.com/influxdata/flux/runtime"
    "github.com/influxdata/flux/values"
    "github.com/influxdata/influxdb/v2"
@@ -752,6 +753,193 @@ from(bucket: "%s")
}
}
// TestQueryProfiler is a fake profiler that emits a single table of
// deterministic values offset by start, so the test can tell instances apart.
type TestQueryProfiler struct {
    start int64
}

func (s TestQueryProfiler) Name() string {
    return fmt.Sprintf("query%d", s.start)
}

func (s TestQueryProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) {
    groupKey := execute.NewGroupKey(
        []flux.ColMeta{
            {
                Label: "_measurement",
                Type:  flux.TString,
            },
        },
        []values.Value{
            values.NewString(fmt.Sprintf("profiler/query%d", s.start)),
        },
    )
    b := execute.NewColListTableBuilder(groupKey, alloc)
    colMeta := []flux.ColMeta{
        {
            Label: "_measurement",
            Type:  flux.TString,
        },
        {
            Label: "TotalDuration",
            Type:  flux.TInt,
        },
        {
            Label: "CompileDuration",
            Type:  flux.TInt,
        },
        {
            Label: "QueueDuration",
            Type:  flux.TInt,
        },
        {
            Label: "PlanDuration",
            Type:  flux.TInt,
        },
        {
            Label: "RequeueDuration",
            Type:  flux.TInt,
        },
        {
            Label: "ExecuteDuration",
            Type:  flux.TInt,
        },
        {
            Label: "Concurrency",
            Type:  flux.TInt,
        },
        {
            Label: "MaxAllocated",
            Type:  flux.TInt,
        },
        {
            Label: "TotalAllocated",
            Type:  flux.TInt,
        },
        {
            Label: "RuntimeErrors",
            Type:  flux.TString,
        },
        {
            Label: "influxdb/scanned-bytes",
            Type:  flux.TInt,
        },
        {
            Label: "influxdb/scanned-values",
            Type:  flux.TInt,
        },
        {
            Label: "flux/query-plan",
            Type:  flux.TString,
        },
    }
    colData := []interface{}{
        fmt.Sprintf("profiler/query%d", s.start),
        s.start,
        s.start + 1,
        s.start + 2,
        s.start + 3,
        s.start + 4,
        s.start + 5,
        s.start + 6,
        s.start + 7,
        s.start + 8,
        "error1\nerror2",
        s.start + 9,
        s.start + 10,
        "query plan",
    }
    for _, col := range colMeta {
        if _, err := b.AddCol(col); err != nil {
            return nil, err
        }
    }
    for i := 0; i < len(colData); i++ {
        if intValue, ok := colData[i].(int64); ok {
            b.AppendInt(i, intValue)
        } else {
            b.AppendString(i, colData[i].(string))
        }
    }
    tbl, err := b.Table()
    if err != nil {
        return nil, err
    }
    return tbl, nil
}
func TestFluxProfiler(t *testing.T) {
    testcases := []struct {
        name  string
        data  []string
        query string
        want  string
    }{
        {
            name: "range last single point start time",
            data: []string{
                "m,tag=a f=1i 1",
            },
            query: `
option profiler.enabledProfilers = ["query0", "query100", "query100", "NonExistentProfiler"]
from(bucket: v.bucket)
    |> range(start: 1970-01-01T00:00:00.000000001Z, stop: 1970-01-01T01:00:00Z)
    |> last()
`,
            want: `
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#group,false,false,true,true,false,false,true,true,true
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag
,,0,1970-01-01T00:00:00.000000001Z,1970-01-01T01:00:00Z,1970-01-01T00:00:00.000000001Z,1,f,m,a
#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,string,long,long
#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false
#default,_profiler,,,,,,,,,,,,,,,
,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values
,,0,profiler/query0,0,1,2,3,4,5,6,7,8,"error1
error2","query plan",9,10
,,1,profiler/query100,100,101,102,103,104,105,106,107,108,"error1
error2","query plan",109,110
`,
        },
    }
    execute.RegisterProfilers(&TestQueryProfiler{}, &TestQueryProfiler{start: 100})
    for _, tc := range testcases {
        tc := tc
        t.Run(tc.name, func(t *testing.T) {
            l := launcher.RunTestLauncherOrFail(t, ctx, nil)
            l.SetupOrFail(t)
            defer l.ShutdownOrFail(t, ctx)
            l.WritePointsOrFail(t, strings.Join(tc.data, "\n"))
            queryStr := "import \"profiler\"\nv = {bucket: " + "\"" + l.Bucket.Name + "\"" + "}\n" + tc.query
            req := &query.Request{
                Authorization:  l.Auth,
                OrganizationID: l.Org.ID,
                Compiler: lang.FluxCompiler{
                    Query: queryStr,
                },
            }
            if got, err := l.FluxQueryService().Query(ctx, req); err != nil {
                t.Error(err)
            } else {
                dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
                want, err := dec.Decode(ioutil.NopCloser(strings.NewReader(tc.want)))
                if err != nil {
                    t.Fatal(err)
                }
                defer want.Release()
                if err := executetest.EqualResultIterators(want, got); err != nil {
                    t.Fatal(err)
                }
            }
        })
    }
}

func TestQueryPushDowns(t *testing.T) {
    testcases := []struct {
        name string

@@ -149,6 +149,15 @@ func (b ProxyQueryServiceAsyncBridge) Query(ctx context.Context, w io.Writer, re
    if err != nil {
        return stats, tracing.LogError(span, err)
    }

    if results, err := q.ProfilerResults(); err != nil {
        return stats, tracing.LogError(span, err)
    } else if results != nil {
        _, err = encoder.Encode(w, results)
        if err != nil {
            return stats, tracing.LogError(span, err)
        }
    }
    return stats, nil
}
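Because the bridge reuses the same encoder for the profiler iterator, the profiler tables arrive as one extra annotated-CSV result after the regular ones (named _profiler in the expected CSV above). A minimal client-side sketch for splitting that result back out when decoding; the local file name response.csv is an assumption:

package main

import (
    "fmt"
    "os"

    "github.com/influxdata/flux"
    "github.com/influxdata/flux/csv"
)

// Sketch: separate the extra "_profiler" result from the regular results
// when decoding a saved response body.
func main() {
    f, err := os.Open("response.csv")
    if err != nil {
        panic(err)
    }

    dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
    results, err := dec.Decode(f)
    if err != nil {
        panic(err)
    }
    defer results.Release()

    for results.More() {
        res := results.Next()
        isProfiler := res.Name() == "_profiler"
        // Walk the tables so the iterator can advance to the next result.
        err := res.Tables().Do(func(tbl flux.Table) error {
            fmt.Printf("result=%q key=%v profiler=%v\n", res.Name(), tbl.Key(), isProfiler)
            return tbl.Do(func(flux.ColReader) error { return nil })
        })
        if err != nil {
            panic(err)
        }
    }
    if err := results.Err(); err != nil {
        panic(err)
    }
}

Encoding the profiler output with the existing encoder keeps it an ordinary result stream, so decoders that already understand annotated CSV need no changes to consume it.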


@@ -26,6 +26,7 @@ import (
    "github.com/influxdata/flux"
    "github.com/influxdata/flux/codes"
    "github.com/influxdata/flux/execute/table"
    "github.com/influxdata/flux/lang"
    "github.com/influxdata/flux/memory"
    "github.com/influxdata/flux/runtime"
@@ -548,6 +549,23 @@ type Query struct {
    alloc *memory.Allocator
}

func (q *Query) ProfilerResults() (flux.ResultIterator, error) {
    p := q.program.(*lang.AstProgram)
    if len(p.Profilers) == 0 {
        return nil, nil
    }
    tables := make([]flux.Table, 0)
    for _, profiler := range p.Profilers {
        if result, err := profiler.GetResult(q, q.alloc); err != nil {
            return nil, err
        } else {
            tables = append(tables, result)
        }
    }
    res := table.NewProfilerResult(tables...)
    return flux.NewSliceResultIterator([]flux.Result{&res}), nil
}

// ID reports an ephemeral unique ID for the query.
func (q *Query) ID() QueryID {
    return q.id
@@ -572,10 +590,6 @@ func (q *Query) Results() <-chan flux.Result {
    return q.results
}

func (q *Query) ProfilerResults() (flux.ResultIterator, error) {
    return nil, nil
}

func (q *Query) recordUnusedMemory() {
    unused := q.c.GetUnusedMemoryBytes()
    q.c.metrics.memoryUnused.WithLabelValues(q.labelValues...).Set(float64(unused))
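The profilers that ProfilerResults() iterates over are whatever was registered with execute.RegisterProfilers and then selected via profiler.enabledProfilers, which is exactly what the launcher test does with TestQueryProfiler. A stripped-down sketch of such a custom profiler, condensed from that test; the "noop" name and the single-column table are assumptions, not part of the commit:

package main

import (
    "fmt"

    "github.com/influxdata/flux"
    "github.com/influxdata/flux/execute"
    "github.com/influxdata/flux/memory"
    "github.com/influxdata/flux/values"
)

// NoopProfiler is an illustrative profiler that emits a one-column table.
type NoopProfiler struct{}

func (NoopProfiler) Name() string { return "noop" }

func (NoopProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) {
    key := execute.NewGroupKey(
        []flux.ColMeta{{Label: "_measurement", Type: flux.TString}},
        []values.Value{values.NewString("profiler/noop")},
    )
    b := execute.NewColListTableBuilder(key, alloc)
    if _, err := b.AddCol(flux.ColMeta{Label: "_measurement", Type: flux.TString}); err != nil {
        return nil, err
    }
    if err := b.AppendString(0, "profiler/noop"); err != nil {
        return nil, err
    }
    return b.Table()
}

func main() {
    // Registering the profiler makes it selectable from a Flux script via
    // option profiler.enabledProfilers = ["noop"].
    execute.RegisterProfilers(NoopProfiler{})
    fmt.Println("registered noop profiler")
}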