feat: flux query profiler (#19359)
parent
56ac8b2762
commit
6f805cbc2b
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/influxdata/flux/execute/executetest"
|
||||
"github.com/influxdata/flux/execute/table"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/flux/memory"
|
||||
"github.com/influxdata/flux/runtime"
|
||||
"github.com/influxdata/flux/values"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
|
@ -752,6 +753,193 @@ from(bucket: "%s")
|
|||
}
|
||||
}
|
||||
|
||||
type TestQueryProfiler struct{
|
||||
start int64
|
||||
}
|
||||
|
||||
func (s TestQueryProfiler) Name() string {
|
||||
return fmt.Sprintf("query%d", s.start)
|
||||
}
|
||||
|
||||
func (s TestQueryProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) {
|
||||
groupKey := execute.NewGroupKey(
|
||||
[]flux.ColMeta{
|
||||
{
|
||||
Label: "_measurement",
|
||||
Type: flux.TString,
|
||||
},
|
||||
},
|
||||
[]values.Value{
|
||||
values.NewString(fmt.Sprintf("profiler/query%d", s.start)),
|
||||
},
|
||||
)
|
||||
b := execute.NewColListTableBuilder(groupKey, alloc)
|
||||
colMeta := []flux.ColMeta{
|
||||
{
|
||||
Label: "_measurement",
|
||||
Type: flux.TString,
|
||||
},
|
||||
{
|
||||
Label: "TotalDuration",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "CompileDuration",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "QueueDuration",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "PlanDuration",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "RequeueDuration",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "ExecuteDuration",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "Concurrency",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "MaxAllocated",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "TotalAllocated",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "RuntimeErrors",
|
||||
Type: flux.TString,
|
||||
},
|
||||
{
|
||||
Label: "influxdb/scanned-bytes",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "influxdb/scanned-values",
|
||||
Type: flux.TInt,
|
||||
},
|
||||
{
|
||||
Label: "flux/query-plan",
|
||||
Type: flux.TString,
|
||||
},
|
||||
}
|
||||
colData := []interface{} {
|
||||
fmt.Sprintf("profiler/query%d", s.start),
|
||||
s.start,
|
||||
s.start + 1,
|
||||
s.start + 2,
|
||||
s.start + 3,
|
||||
s.start + 4,
|
||||
s.start + 5,
|
||||
s.start + 6,
|
||||
s.start + 7,
|
||||
s.start + 8,
|
||||
"error1\nerror2",
|
||||
s.start + 9,
|
||||
s.start + 10,
|
||||
"query plan",
|
||||
}
|
||||
for _, col := range colMeta {
|
||||
if _, err := b.AddCol(col); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(colData); i++ {
|
||||
if intValue, ok := colData[i].(int64); ok {
|
||||
b.AppendInt(i, intValue)
|
||||
} else {
|
||||
b.AppendString(i, colData[i].(string))
|
||||
}
|
||||
}
|
||||
tbl, err := b.Table()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tbl, nil
|
||||
}
|
||||
|
||||
func TestFluxProfiler(t *testing.T) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
data []string
|
||||
query string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "range last single point start time",
|
||||
data: []string{
|
||||
"m,tag=a f=1i 1",
|
||||
},
|
||||
query: `
|
||||
option profiler.enabledProfilers = ["query0", "query100", "query100", "NonExistentProfiler"]
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00.000000001Z, stop: 1970-01-01T01:00:00Z)
|
||||
|> last()
|
||||
`,
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,true,true,false,false,true,true,true
|
||||
#default,_result,,,,,,,,
|
||||
,result,table,_start,_stop,_time,_value,_field,_measurement,tag
|
||||
,,0,1970-01-01T00:00:00.000000001Z,1970-01-01T01:00:00Z,1970-01-01T00:00:00.000000001Z,1,f,m,a
|
||||
|
||||
#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,string,long,long
|
||||
#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false
|
||||
#default,_profiler,,,,,,,,,,,,,,,
|
||||
,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values
|
||||
,,0,profiler/query0,0,1,2,3,4,5,6,7,8,"error1
|
||||
error2","query plan",9,10
|
||||
,,1,profiler/query100,100,101,102,103,104,105,106,107,108,"error1
|
||||
error2","query plan",109,110
|
||||
`,
|
||||
},
|
||||
}
|
||||
execute.RegisterProfilers(&TestQueryProfiler{}, &TestQueryProfiler{start: 100})
|
||||
for _, tc := range testcases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
l.WritePointsOrFail(t, strings.Join(tc.data, "\n"))
|
||||
|
||||
queryStr := "import \"profiler\"\nv = {bucket: " + "\"" + l.Bucket.Name + "\"" + "}\n" + tc.query
|
||||
req := &query.Request{
|
||||
Authorization: l.Auth,
|
||||
OrganizationID: l.Org.ID,
|
||||
Compiler: lang.FluxCompiler{
|
||||
Query: queryStr,
|
||||
},
|
||||
}
|
||||
if got, err := l.FluxQueryService().Query(ctx, req); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
|
||||
want, err := dec.Decode(ioutil.NopCloser(strings.NewReader(tc.want)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer want.Release()
|
||||
|
||||
if err := executetest.EqualResultIterators(want, got); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryPushDowns(t *testing.T) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
|
|
|
@ -149,6 +149,15 @@ func (b ProxyQueryServiceAsyncBridge) Query(ctx context.Context, w io.Writer, re
|
|||
if err != nil {
|
||||
return stats, tracing.LogError(span, err)
|
||||
}
|
||||
|
||||
if results, err := q.ProfilerResults(); err != nil {
|
||||
return stats, tracing.LogError(span, err)
|
||||
} else if results != nil {
|
||||
_, err = encoder.Encode(w, results)
|
||||
if err != nil {
|
||||
return stats, tracing.LogError(span, err)
|
||||
}
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/codes"
|
||||
"github.com/influxdata/flux/execute/table"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/flux/memory"
|
||||
"github.com/influxdata/flux/runtime"
|
||||
|
@ -548,6 +549,23 @@ type Query struct {
|
|||
alloc *memory.Allocator
|
||||
}
|
||||
|
||||
func (q *Query) ProfilerResults() (flux.ResultIterator, error) {
|
||||
p := q.program.(*lang.AstProgram)
|
||||
if len(p.Profilers) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
tables := make([]flux.Table, 0)
|
||||
for _, profiler := range p.Profilers {
|
||||
if result, err := profiler.GetResult(q, q.alloc); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
tables = append(tables, result)
|
||||
}
|
||||
}
|
||||
res := table.NewProfilerResult(tables...)
|
||||
return flux.NewSliceResultIterator([]flux.Result{&res}), nil
|
||||
}
|
||||
|
||||
// ID reports an ephemeral unique ID for the query.
|
||||
func (q *Query) ID() QueryID {
|
||||
return q.id
|
||||
|
@ -572,10 +590,6 @@ func (q *Query) Results() <-chan flux.Result {
|
|||
return q.results
|
||||
}
|
||||
|
||||
func (q *Query) ProfilerResults() (flux.ResultIterator, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (q *Query) recordUnusedMemory() {
|
||||
unused := q.c.GetUnusedMemoryBytes()
|
||||
q.c.metrics.memoryUnused.WithLabelValues(q.labelValues...).Set(float64(unused))
|
||||
|
|
Loading…
Reference in New Issue