fix: Different columns in different shards return invalid results

pull/392/head
John Shahid 2014-04-08 17:40:36 -04:00
parent 0748eda4ed
commit 2785782e17
5 changed files with 90 additions and 31 deletions

View File

@ -7,6 +7,7 @@
- [Issue #413](https://github.com/influxdb/influxdb/issues/413). Don't assume that group by interval is greater than a second
- [Issue #415](https://github.com/influxdb/influxdb/issues/415). Include the database when sending an auth error back to the user
- [Issue #421](https://github.com/influxdb/influxdb/issues/421). Make read timeout a config option
- [Issue #392](https://github.com/influxdb/influxdb/issues/392). Different columns in different shards return invalid results when a query spans those shards
### Bugfixes

View File

@ -214,7 +214,7 @@ func (self *AllPointsWriter) yield(series *protocol.Series) error {
return nil
}
oldSeries.Points = append(oldSeries.Points, series.Points...)
self.memSeries[series.GetName()] = MergeSeries(self.memSeries[series.GetName()], series)
return nil
}

View File

@ -0,0 +1,79 @@
package common
import (
	"reflect"
	"sort"

	"protocol"
)
// pointMaps converts each point of the series into a map from column
// name to its field value, preserving point order. The i-th map in the
// result corresponds to the i-th point of s.
func pointMaps(s *protocol.Series) (result []map[string]*protocol.FieldValue) {
	for _, point := range s.Points {
		m := map[string]*protocol.FieldValue{}
		for i, v := range point.Values {
			m[s.Fields[i]] = v
		}
		result = append(result, m)
	}
	return
}
// merges two time series making sure that the resulting series has
// the union of the two series columns and the values set
// properly. will panic if the two series don't have the same name
// MergeSeries merges two time series into one whose columns are the
// union of the two input series' columns, filling in null for any
// column a point's source series didn't have. Points of s1 precede
// points of s2 in the result. Panics if the two series don't have the
// same name.
func MergeSeries(s1, s2 *protocol.Series) *protocol.Series {
	// BUG FIX: the original compared s1 against itself, so the
	// mismatched-name panic could never fire.
	if s1.GetName() != s2.GetName() {
		panic("the two series don't have the same name")
	}

	// Fast path: identical column sets in identical order — just
	// concatenate the points.
	if reflect.DeepEqual(s1.Fields, s2.Fields) {
		s1.Points = append(s1.Points, s2.Points...)
		return s1
	}

	// Build the union of the two column sets.
	columns := map[string]struct{}{}
	for _, cs := range [][]string{s1.Fields, s2.Fields} {
		for _, c := range cs {
			columns[c] = struct{}{}
		}
	}
	fieldsSlice := make([]string, 0, len(columns))
	for c := range columns {
		fieldsSlice = append(fieldsSlice, c)
	}
	// Map iteration order is random; sort so the merged column order
	// is deterministic across runs.
	sort.Strings(fieldsSlice)

	points := append(pointMaps(s1), pointMaps(s2)...)
	resultPoints := make([]*protocol.Point, 0, len(points))
	for idx, point := range points {
		resultPoint := &protocol.Point{}
		// Timestamp and sequence number come from whichever input
		// series the point originated in (s1's points come first).
		// Hoisted out of the per-field loop below — it is per-point
		// work the original needlessly repeated for every column.
		if idx < len(s1.Points) {
			resultPoint.Timestamp = s1.Points[idx].Timestamp
			resultPoint.SequenceNumber = s1.Points[idx].SequenceNumber
		} else {
			resultPoint.Timestamp = s2.Points[idx-len(s1.Points)].Timestamp
			resultPoint.SequenceNumber = s2.Points[idx-len(s1.Points)].SequenceNumber
		}
		for _, field := range fieldsSlice {
			value := point[field]
			if value == nil {
				// Column absent from this point's source series.
				value = &protocol.FieldValue{
					IsNull: &TRUE,
				}
			}
			resultPoint.Values = append(resultPoint.Values, value)
		}
		resultPoints = append(resultPoints, resultPoint)
	}

	return &protocol.Series{
		Name:   s1.Name,
		Fields: fieldsSlice,
		Points: resultPoints,
	}
}

View File

@ -6,7 +6,10 @@ import (
"regexp"
)
var TRUE = true
var (
TRUE = true
FALSE = false
)
type TimePrecision int

View File

@ -3,8 +3,10 @@ package engine
// This engine buffers points and passes them through without modification. Works for queries
// that can't be aggregated locally or queries that don't require it like deletes and drops.
import (
log "code.google.com/p/log4go"
"common"
"protocol"
log "code.google.com/p/log4go"
)
type PassthroughEngine struct {
@ -45,34 +47,8 @@ func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPoin
}
func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool {
self.responseType = &queryResponse
series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}
self.limiter.calculateLimitAndSlicePoints(series)
if len(series.Points) == 0 {
return false
}
if self.response == nil {
self.response = &protocol.Response{
Type: self.responseType,
Series: series,
}
} else if *self.response.Series.Name != *seriesName {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: self.responseType,
Series: series,
}
} else if len(self.response.Series.Points) > self.maxPointsInResponse {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: self.responseType,
Series: series,
}
} else {
self.response.Series.Points = append(self.response.Series.Points, point)
}
return !self.limiter.hitLimit(*seriesName)
return self.YieldSeries(series)
}
func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
@ -108,7 +84,7 @@ func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool
Series: seriesIncoming,
}
} else {
self.response.Series.Points = append(self.response.Series.Points, seriesIncoming.Points...)
self.response.Series = common.MergeSeries(self.response.Series, seriesIncoming)
}
return !self.limiter.hitLimit(seriesIncoming.GetName())
//return true