fix(query): make mock Query close its results channel (#13242)
parent
c3d0122a75
commit
5e09aa178b
|
@ -10,7 +10,7 @@ import (
|
|||
"github.com/influxdata/influxdb/query"
|
||||
)
|
||||
|
||||
// ProxyQueryService mocks the idep QueryService for testing.
|
||||
// ProxyQueryService mocks the idpe QueryService for testing.
|
||||
type ProxyQueryService struct {
|
||||
QueryF func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error)
|
||||
}
|
||||
|
@ -50,30 +50,32 @@ func (s *AsyncQueryService) Query(ctx context.Context, req *query.Request) (flux
|
|||
|
||||
// Query is a mock implementation of a flux.Query.
|
||||
// It contains controls to ensure that the flux.Query object is used correctly.
|
||||
// Note: Query will only return one result, specified by calling the SetResults method.
|
||||
type Query struct {
|
||||
Metadata flux.Metadata
|
||||
|
||||
spec *flux.Spec
|
||||
ready chan flux.Result
|
||||
once sync.Once
|
||||
err error
|
||||
mu sync.Mutex
|
||||
done bool
|
||||
results chan flux.Result
|
||||
once sync.Once
|
||||
err error
|
||||
mu sync.Mutex
|
||||
done bool
|
||||
}
|
||||
|
||||
var _ flux.Query = &Query{}
|
||||
|
||||
// NewQuery constructs a new asynchronous query.
|
||||
func NewQuery(spec *flux.Spec) *Query {
|
||||
func NewQuery() *Query {
|
||||
return &Query{
|
||||
Metadata: make(flux.Metadata),
|
||||
spec: spec,
|
||||
ready: make(chan flux.Result, 1),
|
||||
results: make(chan flux.Result, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Query) SetResults(results flux.Result) *Query {
|
||||
q.ready <- results
|
||||
q.results <- results
|
||||
q.once.Do(func() {
|
||||
close(q.results)
|
||||
})
|
||||
return q
|
||||
}
|
||||
|
||||
|
@ -83,12 +85,8 @@ func (q *Query) SetErr(err error) *Query {
|
|||
return q
|
||||
}
|
||||
|
||||
func (q *Query) Spec() *flux.Spec {
|
||||
return q.spec
|
||||
}
|
||||
|
||||
func (q *Query) Results() <-chan flux.Result {
|
||||
return q.ready
|
||||
return q.results
|
||||
}
|
||||
|
||||
func (q *Query) Done() {
|
||||
|
@ -99,10 +97,10 @@ func (q *Query) Done() {
|
|||
q.mu.Unlock()
|
||||
}
|
||||
|
||||
// Cancel closes the ready channel.
|
||||
// Cancel closes the results channel.
|
||||
func (q *Query) Cancel() {
|
||||
q.once.Do(func() {
|
||||
close(q.ready)
|
||||
close(q.results)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue