some documentation and small refactoring.

pull/17/head
John Shahid 2013-10-29 14:33:09 -04:00
parent 1287f1eb5a
commit 016093631e
1 changed files with 38 additions and 27 deletions

View File

@ -62,7 +62,7 @@ func getMergeYield(table1, table2 string, yield func(*protocol.Series) error) fu
})
}
type MergeState struct {
type mergeState struct {
name string
series []*protocol.Series
fields []string
@ -71,15 +71,19 @@ type MergeState struct {
done bool
}
func (self *MergeState) hasPoints() bool {
func (self *mergeState) hasPoints() bool {
return len(self.series) > 0 && len(self.series[0].Points) > 0
}
func (self *MergeState) isEarlier(other *MergeState) bool {
func (self *mergeState) isEarlier(other *mergeState) bool {
return *self.series[0].Points[0].Timestamp < *other.series[0].Points[0].Timestamp
}
func (self *MergeState) mergeValues(other *MergeState, p *protocol.Point) {
// set the fields of the other time series to null making sure that
// the order of the null values match the order of the field
// definitions, i.e. left timeseries first followed by values from the
// right timeseries
func (self *mergeState) mergeValues(other *mergeState, p *protocol.Point) {
if self.isLeft {
p.Values = append(p.Values, other.nullValues...)
return
@ -87,7 +91,11 @@ func (self *MergeState) mergeValues(other *MergeState, p *protocol.Point) {
p.Values = append(other.nullValues, p.Values...)
}
func (self *MergeState) flushIfOtherStateIsEmpty(other *MergeState, fields []string, yield func(*protocol.Series) error) error {
// if `other` state is done (i.e. we'll receive no more points for its
// timeseries) then we know that we won't get any points that are
// older than what's in `self` so we can safely flush all `self`
// points.
func (self *mergeState) flushIfOtherStateIsEmpty(other *mergeState, fields []string, yield func(*protocol.Series) error) error {
if other.done && len(other.series) == 0 {
for _, s := range self.series {
for _, p := range s.Points {
@ -108,8 +116,8 @@ func (self *MergeState) flushIfOtherStateIsEmpty(other *MergeState, fields []str
return nil
}
// update the state, the points belong to this MergeState (i.e. the name of the timeseries matches)
func (self *MergeState) updateState(p *protocol.Series) {
// update the state, the points belong to this mergeState (i.e. the name of the timeseries matches)
func (self *mergeState) updateState(p *protocol.Series) {
if *p.Name != self.name {
return
}
@ -132,16 +140,28 @@ func (self *MergeState) updateState(p *protocol.Series) {
}
}
func (self *mergeState) removeAndGetFirstPoint() *protocol.Point {
point := self.series[0].Points[0]
// get rid of that point, or get rid of the entire series
// if this is the last point
if len(self.series[0].Points) == 1 {
self.series = self.series[1:]
} else {
self.series[0].Points = self.series[0].Points[1:]
}
return point
}
// returns a yield function that will sort points from table1 and
// table2 no matter what the order in which they are received.
//
// FIXME: This won't work for queries that are executed in descending order.
func mergeYield(table1, table2 string, yield func(*protocol.Series) error) func(*protocol.Series) error {
state1 := &MergeState{
state1 := &mergeState{
name: table1,
isLeft: true,
}
state2 := &MergeState{
state2 := &mergeState{
name: table2,
isLeft: false,
}
@ -152,10 +172,6 @@ func mergeYield(table1, table2 string, yield func(*protocol.Series) error) func(
mergedFields := append(state1.fields, state2.fields...)
// if one of the states (let's call it s1) is done (i.e. we'll
// receive no more points for that series) then we know that we
// won't get any points for s1 that are older than what's in s2 so
// we can safely flush all s2's points.
if err := state1.flushIfOtherStateIsEmpty(state2, mergedFields, yield); err != nil {
return err
}
@ -163,30 +179,25 @@ func mergeYield(table1, table2 string, yield func(*protocol.Series) error) func(
return err
}
// see which point should be returned next and remove it from the
// series
for state1.hasPoints() && state2.hasPoints() {
var points []*protocol.Point
// state is the state of the series from which the next point
// will be fetched
state := state1
otherState := state2
if state2.isEarlier(state1) {
state = state2
otherState = state1
}
points = []*protocol.Point{state.series[0].Points[0]}
for _, p := range points {
state.mergeValues(otherState, p)
}
// get rid of that point, or get rid of the entire series
// if this is the last point
if len(state.series[0].Points) == 1 {
state.series = state.series[1:]
} else {
state.series[0].Points = state.series[0].Points[1:]
}
p := state.removeAndGetFirstPoint()
state.mergeValues(otherState, p)
err := yield(&protocol.Series{
Name: &state.name,
Fields: mergedFields,
Points: points,
Points: []*protocol.Point{p},
})
if err != nil {
return err