// Source: influxdb/query/query.go

// Package query defines the services used to submit queries and the
// iterators used to read their results.
package query
import (
"context"
"sort"
"github.com/influxdata/platform"
)
// QueryService represents a service for performing queries whose results are
// consumed through a ResultIterator.
type QueryService interface {
	// Query submits a query spec for execution and returns an iterator over
	// its results.
	Query(ctx context.Context, orgID platform.ID, query *Spec) (ResultIterator, error)

	// QueryWithCompile submits a query string for execution and returns an
	// iterator over its results.
	QueryWithCompile(ctx context.Context, orgID platform.ID, query string) (ResultIterator, error)
}
// ResultIterator allows iterating through all results
type ResultIterator interface {
// More indicates if there are more results.
// More must be called until it returns false in order to free all resources.
More() bool
2018-05-21 21:13:54 +00:00
// Next returns the next result.
// If More is false, Next panics.
Next() Result
// Cancel discards the remaining results.
// If not all results are going to be read, Cancel must be called to free resources.
Cancel()
// Err reports the first error encountered.
Err() error
2018-05-21 21:13:54 +00:00
}
// AsyncQueryService represents a service for performing queries where the results are delivered asynchronously.
type AsyncQueryService interface {
// Query submits a query for execution returning immediately.
// The spec must not be modified while the query is still active.
// Done must be called on any returned Query objects.
Query(ctx context.Context, orgID platform.ID, query *Spec) (Query, error)
// QueryWithCompile submits a query for execution returning immediately.
// The query string will be compiled before submitting for execution.
// Done must be called on returned Query objects.
QueryWithCompile(ctx context.Context, orgID platform.ID, query string) (Query, error)
2018-05-21 21:13:54 +00:00
}
// Query represents an active query.
type Query interface {
	// Spec returns the spec that was used to execute this query.
	// The returned spec must not be modified.
	Spec() *Spec

	// Ready returns the channel on which the query results are delivered.
	// It's possible for the channel to be closed before any results arrive;
	// in that case the query should be inspected for an error using Err().
	Ready() <-chan map[string]Result

	// Done frees the resources held by the query and must always be called.
	Done()

	// Cancel stops the query execution.
	// Done must still be called afterwards to free resources.
	Cancel()

	// Err reports any error the query may have encountered.
	Err() error
}
// QueryServiceBridge adapts an AsyncQueryService so that it can be used
// through the QueryService interface.
type QueryServiceBridge struct {
	// AsyncQueryService is the asynchronous service that queries are
	// submitted to.
	AsyncQueryService AsyncQueryService
}
func (b QueryServiceBridge) Query(ctx context.Context, orgID platform.ID, spec *Spec) (ResultIterator, error) {
query, err := b.AsyncQueryService.Query(ctx, orgID, spec)
if err != nil {
return nil, err
2018-05-21 21:13:54 +00:00
}
return newResultIterator(query), nil
}
func (b QueryServiceBridge) QueryWithCompile(ctx context.Context, orgID platform.ID, queryStr string) (ResultIterator, error) {
query, err := b.AsyncQueryService.QueryWithCompile(ctx, orgID, queryStr)
if err != nil {
return nil, err
2018-05-21 21:13:54 +00:00
}
return newResultIterator(query), nil
2018-05-21 21:13:54 +00:00
}
// resultIterator implements a ResultIterator while consuming a Query
type resultIterator struct {
query Query
cancel chan struct{}
ready bool
results *MapResultIterator
2018-05-21 21:13:54 +00:00
}
func newResultIterator(q Query) *resultIterator {
return &resultIterator{
query: q,
cancel: make(chan struct{}),
2018-05-21 21:13:54 +00:00
}
}
func (r *resultIterator) More() bool {
if !r.ready {
select {
case <-r.cancel:
goto DONE
case results, ok := <-r.query.Ready():
if !ok {
goto DONE
}
r.ready = true
r.results = NewMapResultIterator(results)
2018-05-21 21:13:54 +00:00
}
}
if r.results.More() {
return true
}
DONE:
r.query.Done()
return false
2018-05-21 21:13:54 +00:00
}
// Next returns the next result from the results received from the query.
// It delegates to the underlying MapResultIterator.
func (r *resultIterator) Next() Result {
	inner := r.results
	return inner.Next()
}
2018-05-21 21:13:54 +00:00
func (r *resultIterator) Cancel() {
select {
case <-r.cancel:
default:
close(r.cancel)
2018-05-21 21:13:54 +00:00
}
r.query.Cancel()
}
2018-05-21 21:13:54 +00:00
// Err reports the error of the underlying query, as returned by its Err method.
func (r *resultIterator) Err() error {
	err := r.query.Err()
	return err
}
2018-05-21 21:13:54 +00:00
type MapResultIterator struct {
results map[string]Result
order []string
}
2018-05-21 21:13:54 +00:00
func NewMapResultIterator(results map[string]Result) *MapResultIterator {
order := make([]string, 0, len(results))
for k := range results {
order = append(order, k)
2018-05-21 21:13:54 +00:00
}
sort.Strings(order)
return &MapResultIterator{
results: results,
order: order,
2018-05-21 21:13:54 +00:00
}
}
func (r *MapResultIterator) More() bool {
return len(r.order) > 0
2018-05-21 21:13:54 +00:00
}
func (r *MapResultIterator) Next() Result {
next := r.order[0]
r.order = r.order[1:]
return r.results[next]
2018-05-21 21:13:54 +00:00
}
func (r *MapResultIterator) Cancel() {
2018-05-21 21:13:54 +00:00
}
func (r *MapResultIterator) Err() error {
return nil
2018-05-21 21:13:54 +00:00
}
// SliceResultIterator iterates over a slice of results.
type SliceResultIterator struct {
	// results holds the results that have not been returned yet.
	results []Result
}
// NewSliceResultIterator returns an iterator over results. The slice is
// retained, not copied.
func NewSliceResultIterator(results []Result) *SliceResultIterator {
	it := new(SliceResultIterator)
	it.results = results
	return it
}
// More reports whether any results remain to be returned by Next.
func (r *SliceResultIterator) More() bool {
	remaining := len(r.results)
	return remaining != 0
}
// Next returns the first remaining result and drops it from the iterator.
// It panics with an index-out-of-range error when no results remain.
func (r *SliceResultIterator) Next() Result {
	head, rest := r.results[0], r.results[1:]
	r.results = rest
	return head
}
// Cancel is a no-op; it leaves the iterator unchanged.
func (r *SliceResultIterator) Cancel() {}
// Err always returns nil.
func (r *SliceResultIterator) Err() error {
	var err error
	return err
}