Use combined IndexSet when executing meta queries

* removed unused fieldset field
pull/9586/head
Stuart Carnie 2018-03-15 09:35:15 -07:00 committed by Stuart Carnie
parent 8116cded0c
commit 6cf6ae7af4
2 changed files with 38 additions and 4 deletions

View File

@ -643,7 +643,6 @@ func (itr *seriesIDDifferenceIterator) Next() (_ SeriesIDElem, err error) {
type seriesPointIterator struct {
once sync.Once
indexSet IndexSet
fieldset *MeasurementFieldSet
mitr MeasurementIterator
keys [][]byte
opt query.IteratorOptions
@ -652,7 +651,7 @@ type seriesPointIterator struct {
}
// newSeriesPointIterator returns a new instance of seriesPointIterator.
func NewSeriesPointIterator(indexSet IndexSet, fieldset *MeasurementFieldSet, opt query.IteratorOptions) (_ query.Iterator, err error) {
func NewSeriesPointIterator(indexSet IndexSet, opt query.IteratorOptions) (_ query.Iterator, err error) {
// Only equality operators are allowed.
influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
switch n := n.(type) {
@ -676,7 +675,6 @@ func NewSeriesPointIterator(indexSet IndexSet, fieldset *MeasurementFieldSet, op
return &seriesPointIterator{
indexSet: indexSet,
fieldset: fieldset,
mitr: mitr,
point: query.FloatPoint{
Aux: make([]interface{}, len(opt.Aux)),

View File

@ -408,6 +408,15 @@ func (s *Shard) Index() (Index, error) {
return s.index, nil
}
func (s *Shard) seriesFile() (*SeriesFile, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if err := s.ready(); err != nil {
return nil, err
}
return s.sfile, nil
}
// IsIdle return true if the shard is not receiving writes and is fully compacted.
func (s *Shard) IsIdle() bool {
engine, err := s.engine()
@ -802,7 +811,7 @@ func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt
return nil, err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
return NewSeriesPointIterator(indexSet, engine.MeasurementFieldSet(), opt)
return NewSeriesPointIterator(indexSet, opt)
case "_tagKeys":
return NewTagKeysIterator(s, opt)
}
@ -1207,6 +1216,11 @@ func (a Shards) MapType(measurement, field string) influxql.DataType {
}
func (a Shards) CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
switch measurement.SystemIterator {
case "_series":
return a.createSeriesIterator(ctx, opt)
}
itrs := make([]query.Iterator, 0, len(a))
for _, sh := range a {
itr, err := sh.CreateIterator(ctx, measurement, opt)
@ -1237,6 +1251,28 @@ func (a Shards) CreateIterator(ctx context.Context, measurement *influxql.Measur
return query.Iterators(itrs).Merge(opt)
}
func (a Shards) createSeriesIterator(ctx context.Context, opt query.IteratorOptions) (_ query.Iterator, err error) {
var (
idxs = make([]Index, 0, len(a))
sfile *SeriesFile
)
for _, sh := range a {
var idx Index
if idx, err = sh.Index(); err == nil {
idxs = append(idxs, idx)
}
if sfile == nil {
sfile, _ = sh.seriesFile()
}
}
if sfile == nil {
return nil, errors.New("createSeriesIterator: no series file")
}
return NewSeriesPointIterator(IndexSet{Indexes: idxs, SeriesFile: sfile}, opt)
}
func (a Shards) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
var costs query.IteratorCost
var costerr error