influxdb/engine/common_merge_engine.go

50 lines
1.2 KiB
Go
Raw Normal View History

2013-10-28 19:24:12 +00:00
package engine
import "github.com/influxdb/influxdb/protocol"
2013-10-29 18:22:11 +00:00
type CommonMergeEngine struct {
2014-08-29 22:17:07 +00:00
merger *Merger
2014-09-11 21:33:21 +00:00
streams map[uint32]StreamUpdate
2014-08-29 22:17:07 +00:00
next Processor
}
2013-10-29 18:22:11 +00:00
// returns a yield function that will sort points from table1 and
// table2 no matter what the order in which they are received.
2014-09-11 21:33:21 +00:00
func NewCommonMergeEngine(shards []uint32, mergeColumns bool, ascending bool, next Processor) *CommonMergeEngine {
2014-08-29 22:17:07 +00:00
cme := &CommonMergeEngine{
2014-09-11 21:33:21 +00:00
streams: make(map[uint32]StreamUpdate, len(shards)),
2014-08-29 22:17:07 +00:00
next: next,
2013-10-29 18:22:11 +00:00
}
2014-09-11 21:33:21 +00:00
streams := make([]StreamQuery, len(shards))
for i, sh := range shards {
2014-08-29 22:17:07 +00:00
s := NewStream()
streams[i] = s
2014-09-11 21:33:21 +00:00
cme.streams[sh] = s
2013-10-29 18:22:11 +00:00
}
2014-08-29 22:17:07 +00:00
h := &SeriesHeap{Ascending: ascending}
cme.merger = NewCME("Engine", streams, h, next, mergeColumns)
return cme
}
2013-10-29 18:22:11 +00:00
2014-08-29 22:17:07 +00:00
func (cme *CommonMergeEngine) Close() error {
for _, s := range cme.streams {
s.Close()
}
2014-08-29 22:17:07 +00:00
_, err := cme.merger.Update()
if err != nil {
return err
}
2014-08-29 22:17:07 +00:00
return cme.next.Close()
}
2013-10-29 18:22:11 +00:00
2014-08-29 22:17:07 +00:00
func (cme *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) {
2014-09-11 21:33:21 +00:00
stream := cme.streams[s.GetShardId()]
2014-08-29 22:17:07 +00:00
stream.Yield(s)
return cme.merger.Update()
}
2014-08-29 22:17:07 +00:00
func (cme *CommonMergeEngine) Name() string {
return "CommonMergeEngine"
}