refactor: unify WindowAggregateCapability (#17901)

pull/17898/head
Yiqun (Ethan) Zhang 2020-05-05 01:16:55 -05:00 committed by Yiqun Zhang
parent 1d027cf4e0
commit 8bb5065769
5 changed files with 21 additions and 23 deletions

View File

@ -42,19 +42,16 @@ func (s *StorageReader) Close() {
type WindowAggregateStoreReader struct {
*StorageReader
HasWindowAggregateCapabilityFn func(ctx context.Context) bool
GetWindowAggregateCapabilityFn func(ctx context.Context) query.WindowAggregateCapability
ReadWindowAggregateFn func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error)
}
func (s *WindowAggregateStoreReader) HasWindowAggregateCapability(ctx context.Context, capability ...*query.WindowAggregateCapability) bool {
func (s *WindowAggregateStoreReader) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability {
// Use the function if it exists.
if s.HasWindowAggregateCapabilityFn != nil {
return s.HasWindowAggregateCapabilityFn(ctx)
if s.GetWindowAggregateCapabilityFn != nil {
return s.GetWindowAggregateCapabilityFn(ctx)
}
// Provide a default implementation if one wasn't set.
// This will return true if the other function was set.
return s.ReadWindowAggregateFn != nil
return nil
}
func (s *WindowAggregateStoreReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {

View File

@ -25,14 +25,13 @@ type StorageReader interface {
}
// WindowAggregateCapability describes what is supported by WindowAggregateReader.
type WindowAggregateCapability struct{}
type WindowAggregateCapability interface {}
// WindowAggregateReader implements the WindowAggregate capability.
type WindowAggregateReader interface {
// HasWindowAggregateCapability will test if this Reader source supports the ReadWindowAggregate capability.
// If WindowAggregateCapability is passed to the method, then the struct
// is filled with a detailed list of what the RPC call supports.
HasWindowAggregateCapability(ctx context.Context, capability ...*WindowAggregateCapability) bool
// GetWindowAggregateCapability will get a detailed list of what the RPC call supports
// for window aggregate.
GetWindowAggregateCapability(ctx context.Context) WindowAggregateCapability
// ReadWindowAggregate will read a table using the WindowAggregate method.
ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc *memory.Allocator) (TableIterator, error)

View File

@ -80,11 +80,11 @@ func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, a
}, nil
}
func (r *storeReader) HasWindowAggregateCapability(ctx context.Context, capability ...*query.WindowAggregateCapability) bool {
func (r *storeReader) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability {
if aggStore, ok := r.s.(storage.WindowAggregateStore); ok {
return aggStore.HasWindowAggregateCapability(ctx)
return aggStore.GetWindowAggregateCapability(ctx)
}
return false
return nil
}
func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {

View File

@ -5,6 +5,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
@ -84,14 +85,15 @@ type Store interface {
}
// WindowAggregateCapability describes what is supported by WindowAggregateStore.
type WindowAggregateCapability struct{}
type WindowAggregateCapability interface{
query.WindowAggregateCapability
}
// WindowAggregateStore implements the WindowAggregate capability.
type WindowAggregateStore interface {
// HasWindowAggregateCapability checks if this Store supports the capability.
// If WindowAggregateCapability is passed to the method, then the struct
// is filled with a detailed list of what the RPC call supports.
HasWindowAggregateCapability(ctx context.Context, capability ...*WindowAggregateCapability) bool
// GetWindowAggregateCapability will get a detailed list of what the RPC call supports
// for window aggregate.
GetWindowAggregateCapability(ctx context.Context) WindowAggregateCapability
// WindowAggregate will invoke a ReadWindowAggregateRequest against the Store.
WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAggregateRequest) (ResultSet, error)

View File

@ -154,8 +154,8 @@ func (s *store) GetSource(orgID, bucketID uint64) proto.Message {
}
}
func (s *store) HasWindowAggregateCapability(ctx context.Context, capability ...*reads.WindowAggregateCapability) bool {
return false
func (s *store) GetWindowAggregateCapability(ctx context.Context) reads.WindowAggregateCapability {
return nil
}
// WindowAggregate will invoke a ReadWindowAggregateRequest against the Store.