WIP - series point iterator

pull/9150/head
Edd Robinson 2017-09-29 12:56:20 +01:00 committed by Ben Johnson
parent aec607bddf
commit 6d87ff7fa2
No known key found for this signature in database
GPG Key ID: 81741CD251883081
3 changed files with 84 additions and 5 deletions

View File

@ -535,9 +535,16 @@ func (i *Index) UnassignShard(k string, shardID uint64) error {
// SeriesPointIterator returns an influxql iterator over all series.
func (i *Index) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) {
// TODO(edd): Create iterators for each Partition and return a merged
// iterator.
return nil, nil
// FIXME(edd): This needs implementing.
itrs := make([]*seriesPointIterator, len(i.partitions))
var err error
for k, p := range i.partitions {
if itrs[k], err = p.seriesPointIterator(opt); err != nil {
return nil, err
}
}
return MergeSeriesPointIterators(itrs...), nil
}
// Compact requests a compaction of log files in the index.

View File

@ -821,8 +821,8 @@ func (i *Partition) UnassignShard(k string, shardID uint64) error {
return i.DropSeries([]byte(k))
}
// SeriesPointIterator returns an influxql iterator over all series.
func (i *Partition) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) {
// seriesPointIterator returns an influxql iterator over all series.
func (i *Partition) seriesPointIterator(opt query.IteratorOptions) (*seriesPointIterator, error) {
// NOTE: The iterator handles releasing the file set.
fs := i.RetainFileSet()
return newSeriesPointIterator(fs, i.fieldset, opt), nil

View File

@ -414,6 +414,78 @@ func (itr *seriesIDMergeIterator) Next() SeriesIDElem {
return elem
}
// TODO(edd)
type SeriesPointMergeIterator interface {
Next() (*query.FloatPoint, error)
Close() error
Stats() query.IteratorStats
}
// TODO(edd)
func MergeSeriesPointIterators(itrs ...*seriesPointIterator) SeriesPointMergeIterator {
if n := len(itrs); n == 0 {
return nil
} else if n == 1 {
return itrs[0]
}
return &seriesPointMergeIterator{
buf: make([]*query.FloatPoint, len(itrs)),
itrs: itrs,
}
}
type seriesPointMergeIterator struct {
buf []*query.FloatPoint
itrs []*seriesPointIterator
}
// TODO(edd):
func (itr *seriesPointMergeIterator) Close() error { return nil }
func (itr *seriesPointMergeIterator) Stats() query.IteratorStats {
return query.IteratorStats{}
}
// TODO(edd)....
func (itr *seriesPointMergeIterator) Next() (*query.FloatPoint, error) {
// TODO(edd)...
// Find next lowest point amongst the buffers.
// var key []byte
// for i, buf := range itr.buf {
// // Fill buffer.
// if buf == nil {
// if buf = itr.itrs[i].Next(); buf != nil {
// itr.buf[i] = buf
// } else {
// continue
// }
// }
// // Find next lowest key.
// if key == nil || bytes.Compare(buf.Key(), key) == -1 {
// key = buf.Key()
// }
// }
// // Return nil if no elements remaining.
// if key == nil {
// return nil
// }
// // Merge elements together & clear buffer.
// itr.e = itr.e[:0]
// for i, buf := range itr.buf {
// if buf == nil || !bytes.Equal(buf.Key(), key) {
// continue
// }
// itr.e = append(itr.e, buf)
// itr.buf[i] = nil
// }
// return itr.e
return nil, nil
}
// IntersectSeriesIDIterators returns an iterator that only returns series which
// occur in both iterators. If both series have associated expressions then
// they are combined together.