feat(query): enable the mqtt pool dialer by default (#23226)

pull/23227/head
Jonathan A. Sternberg 2022-03-24 12:35:10 -05:00 committed by GitHub
parent 050449803a
commit 5231d2d197
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 71 additions and 12 deletions

2
go.mod
View File

@ -34,7 +34,7 @@ require (
github.com/hashicorp/vault/api v1.0.2
github.com/imdario/mergo v0.3.9 // indirect
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe
github.com/influxdata/flux v0.159.1-0.20220322154400-5e19bfa74b44
github.com/influxdata/flux v0.160.1-0.20220324150044-30cb3a7e72f6
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839

4
go.sum
View File

@ -503,8 +503,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe h1:7j4SdN/BvQwN6WoUq7mv0kg5U9NhnFBxPGMafYRKym0=
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og=
github.com/influxdata/flux v0.159.1-0.20220322154400-5e19bfa74b44 h1:Q6U31iE0HzFpzS+JRZRyFvp6TSDBU7bWpQ1N78yltCw=
github.com/influxdata/flux v0.159.1-0.20220322154400-5e19bfa74b44/go.mod h1:dALQQHRj+70b+o/9RtaHAAXH3toMs2M58gfY66oEll8=
github.com/influxdata/flux v0.160.1-0.20220324150044-30cb3a7e72f6 h1:gWJb1CkCX0LrQFjL8C2OwG8v6QW3eYCQIg3PZS1engg=
github.com/influxdata/flux v0.160.1-0.20220324150044-30cb3a7e72f6/go.mod h1:dALQQHRj+70b+o/9RtaHAAXH3toMs2M58gfY66oEll8=
github.com/influxdata/gosnowflake v1.6.9 h1:BhE39Mmh8bC+Rvd4QQsP2gHypfeYIH1wqW1AjGWxxrE=
github.com/influxdata/gosnowflake v1.6.9/go.mod h1:9W/BvCXOKx2gJtQ+jdi1Vudev9t9/UDOEHnlJZ/y1nU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=

View File

@ -27,6 +27,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/dependency"
"github.com/influxdata/flux/execute/table"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/memory"
@ -244,11 +245,10 @@ func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query,
// Set the org label value for controller metrics
ctx = context.WithValue(ctx, orgLabel, req.OrganizationID.String()) //lint:ignore SA1029 this is a temporary ignore until we have time to create an appropriate type
// The controller injects the dependencies for each incoming request.
for _, dep := range c.dependencies {
ctx = dep.Inject(ctx)
}
q, err := c.query(ctx, req.Compiler)
ctx, deps := dependency.Inject(ctx, c.dependencies...)
q, err := c.query(ctx, req.Compiler, deps)
if err != nil {
deps.Finish()
return q, err
}
@ -257,8 +257,8 @@ func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query,
// query submits a query for execution returning immediately.
// Done must be called on any returned Query objects.
func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) {
q, err := c.createQuery(ctx, compiler)
func (c *Controller) query(ctx context.Context, compiler flux.Compiler, deps *dependency.Span) (flux.Query, error) {
q, err := c.createQuery(ctx, compiler, deps)
if err != nil {
return nil, handleFluxError(err)
}
@ -278,7 +278,7 @@ func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Qu
return q, nil
}
func (c *Controller) createQuery(ctx context.Context, compiler flux.Compiler) (*Query, error) {
func (c *Controller) createQuery(ctx context.Context, compiler flux.Compiler, deps *dependency.Span) (*Query, error) {
c.queriesMu.RLock()
if c.shutdown {
c.queriesMu.RUnlock()
@ -321,6 +321,7 @@ func (c *Controller) createQuery(ctx context.Context, compiler flux.Compiler) (*
parentSpan: parentSpan,
cancel: cancel,
doneCh: make(chan struct{}),
deps: deps,
compiler: compiler,
}
@ -628,6 +629,7 @@ type Query struct {
memoryManager *queryMemoryManager
alloc *memory.ResourceAllocator
deps *dependency.Span
}
func (q *Query) ProfilerResults() (flux.ResultIterator, error) {
@ -723,6 +725,9 @@ func (q *Query) Done() {
}
q.stats.RuntimeErrors = errMsgs
// Clean up the dependencies.
q.deps.Finish()
// Mark the query as finished so it is removed from the query map.
q.c.finish(q)

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/dependency"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/lang"
@ -1450,6 +1451,56 @@ func TestController_ReserveMemoryWithoutExceedingMax(t *testing.T) {
validateUnusedMemory(t, reg, config)
}
func TestController_OnFinish(t *testing.T) {
closed := false
config := control.Config{
ConcurrencyQuota: 1,
MemoryBytesQuotaPerQuery: 1024,
QueueSize: 1,
ExecutorDependencies: []flux.Dependency{
mock.Dependency{
InjectFn: func(ctx context.Context) context.Context {
dependency.OnFinishFunc(ctx, func() error {
closed = true
return nil
})
return ctx
},
},
},
}
logger := zaptest.NewLogger(t)
ctrl, err := control.New(config, logger)
if err != nil {
t.Fatal(err)
}
defer shutdown(t, ctrl)
done := make(chan struct{})
defer close(done)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{}, nil
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := ctrl.Query(ctx, makeRequest(compiler))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
consumeResults(t, q)
// The dependency should be closed.
if !closed {
t.Error("finish function was not executed")
}
}
func consumeResults(tb testing.TB, q flux.Query) {
tb.Helper()
for res := range q.Results() {

View File

@ -8,6 +8,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux"
"github.com/influxdata/flux/dependencies/dependenciestest"
"github.com/influxdata/flux/dependency"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/memory"
@ -131,7 +132,8 @@ func TestMetrics(t *testing.T) {
// This key/value pair added to the context will appear as a label in the prometheus histogram.
ctx := context.WithValue(context.Background(), labelKey, labelValue) //lint:ignore SA1029 this is a temporary ignore until we have time to create an appropriate type
// Injecting deps
ctx = deps.Inject(ctx)
ctx, span := dependency.Inject(ctx, deps)
defer span.Finish()
a := &mockAdministration{Ctx: ctx}
rfs := influxdb.ReadFilterSource(
execute.DatasetID(uuid.FromTime(time.Now())),
@ -282,7 +284,8 @@ func TestReadWindowAggregateSource(t *testing.T) {
Metrics: metrics,
},
}
ctx := deps.Inject(context.Background())
ctx, span := dependency.Inject(context.Background(), deps)
defer span.Finish()
ctx = query.ContextWithRequest(ctx, &query.Request{
OrganizationID: orgID,
})