diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 7d92fea03b..d4a5dca68a 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -95,7 +95,7 @@ func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query } local := make(chan *protocol.Response) - nextPointMap := make(map[string]*protocol.Point) + nextPointMap := make(map[string]*NextPoint) // TODO: this style of wrapping the series in response objects with the // last point time is duplicated in the request handler. Refactor... diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index 199009791c..8b3e00b317 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -224,11 +224,17 @@ func (self *ProtobufRequestHandler) handleReplay(request *protocol.Request, conn } } -func createResponse(nextPointMap map[string]*protocol.Point, series *protocol.Series, id *uint32) *protocol.Response { +type NextPoint struct { + fields []string + point *protocol.Point +} + +func createResponse(nextPointMap map[string]*NextPoint, series *protocol.Series, id *uint32) *protocol.Response { pointCount := len(series.Points) - if pointCount <= 1 { + if pointCount < 1 { if nextPoint := nextPointMap[*series.Name]; nextPoint != nil { - series.Points = append(series.Points, nextPoint) + series.Points = append(series.Points, nextPoint.point) + series.Fields = nextPoint.fields } response := &protocol.Response{Type: &queryResponse, Series: series, RequestId: id} @@ -239,7 +245,7 @@ func createResponse(nextPointMap map[string]*protocol.Point, series *protocol.Se series.Points[pointCount-1] = nil if oldNextPoint != nil { copy(series.Points[1:], series.Points[0:]) - series.Points[0] = oldNextPoint + series.Points[0] = oldNextPoint.point } else { series.Points = series.Points[:len(series.Points)-1] } @@ -247,13 +253,13 @@ func createResponse(nextPointMap map[string]*protocol.Point, series *protocol.Se response := &protocol.Response{Series: series, Type: &queryResponse, RequestId: id} if nextPoint != nil { response.NextPointTime = nextPoint.Timestamp - nextPointMap[*series.Name] = nextPoint + nextPointMap[*series.Name] = &NextPoint{series.Fields, nextPoint} } return response } func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn net.Conn) { - nextPointMap := make(map[string]*protocol.Point) + nextPointMap := make(map[string]*NextPoint) assignNextPointTimesAndSend := func(series *protocol.Series) error { response := createResponse(nextPointMap, series, request.Id) return self.WriteResponse(conn, response) diff --git a/src/integration/server_test.go b/src/integration/server_test.go index 9d1ffa45e8..9cf8ca2956 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -415,14 +415,15 @@ func (self *ServerSuite) TestFailureAndDeleteReplays(c *C) { func (self *ServerSuite) TestColumnNamesReturnInDistributedQuery(c *C) { data := `[{ "name": "cluster_query_with_columns", - "columns": ["asdf"], - "points": [[1]] + "columns": ["col1"], + "points": [[1], [2]] }]` self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c) for _, s := range self.serverProcesses { collection := s.Query("test_rep", "select * from cluster_query_with_columns", false, c) series := collection.GetSeries("cluster_query_with_columns", c) - c.Assert(series.GetValueForPointAndColumn(0, "asdf", c), Equals, float64(1)) + c.Assert(series.GetValueForPointAndColumn(0, "col1", c), Equals, float64(2)) + c.Assert(series.GetValueForPointAndColumn(1, "col1", c), Equals, float64(1)) } }