Merge pull request #181 from influxdb/fix-180-column-names

Fix #180. Column names aren't returned when running a multinode cluster
pull/185/head
Paul Dix 2014-01-13 11:13:11 -08:00
commit 4b2bef87f8
3 changed files with 17 additions and 10 deletions

View File

@ -95,7 +95,7 @@ func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query
} }
local := make(chan *protocol.Response) local := make(chan *protocol.Response)
nextPointMap := make(map[string]*protocol.Point) nextPointMap := make(map[string]*NextPoint)
// TODO: this style of wrapping the series in response objects with the // TODO: this style of wrapping the series in response objects with the
// last point time is duplicated in the request handler. Refactor... // last point time is duplicated in the request handler. Refactor...

View File

@ -224,11 +224,17 @@ func (self *ProtobufRequestHandler) handleReplay(request *protocol.Request, conn
} }
} }
func createResponse(nextPointMap map[string]*protocol.Point, series *protocol.Series, id *uint32) *protocol.Response { type NextPoint struct {
fields []string
point *protocol.Point
}
func createResponse(nextPointMap map[string]*NextPoint, series *protocol.Series, id *uint32) *protocol.Response {
pointCount := len(series.Points) pointCount := len(series.Points)
if pointCount <= 1 { if pointCount < 1 {
if nextPoint := nextPointMap[*series.Name]; nextPoint != nil { if nextPoint := nextPointMap[*series.Name]; nextPoint != nil {
series.Points = append(series.Points, nextPoint) series.Points = append(series.Points, nextPoint.point)
series.Fields = nextPoint.fields
} }
response := &protocol.Response{Type: &queryResponse, Series: series, RequestId: id} response := &protocol.Response{Type: &queryResponse, Series: series, RequestId: id}
@ -239,7 +245,7 @@ func createResponse(nextPointMap map[string]*protocol.Point, series *protocol.Se
series.Points[pointCount-1] = nil series.Points[pointCount-1] = nil
if oldNextPoint != nil { if oldNextPoint != nil {
copy(series.Points[1:], series.Points[0:]) copy(series.Points[1:], series.Points[0:])
series.Points[0] = oldNextPoint series.Points[0] = oldNextPoint.point
} else { } else {
series.Points = series.Points[:len(series.Points)-1] series.Points = series.Points[:len(series.Points)-1]
} }
@ -247,13 +253,13 @@ func createResponse(nextPointMap map[string]*protocol.Point, series *protocol.Se
response := &protocol.Response{Series: series, Type: &queryResponse, RequestId: id} response := &protocol.Response{Series: series, Type: &queryResponse, RequestId: id}
if nextPoint != nil { if nextPoint != nil {
response.NextPointTime = nextPoint.Timestamp response.NextPointTime = nextPoint.Timestamp
nextPointMap[*series.Name] = nextPoint nextPointMap[*series.Name] = &NextPoint{series.Fields, nextPoint}
} }
return response return response
} }
func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn net.Conn) { func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn net.Conn) {
nextPointMap := make(map[string]*protocol.Point) nextPointMap := make(map[string]*NextPoint)
assignNextPointTimesAndSend := func(series *protocol.Series) error { assignNextPointTimesAndSend := func(series *protocol.Series) error {
response := createResponse(nextPointMap, series, request.Id) response := createResponse(nextPointMap, series, request.Id)
return self.WriteResponse(conn, response) return self.WriteResponse(conn, response)

View File

@ -415,14 +415,15 @@ func (self *ServerSuite) TestFailureAndDeleteReplays(c *C) {
func (self *ServerSuite) TestColumnNamesReturnInDistributedQuery(c *C) { func (self *ServerSuite) TestColumnNamesReturnInDistributedQuery(c *C) {
data := `[{ data := `[{
"name": "cluster_query_with_columns", "name": "cluster_query_with_columns",
"columns": ["asdf"], "columns": ["col1"],
"points": [[1]] "points": [[1], [2]]
}]` }]`
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c) self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
for _, s := range self.serverProcesses { for _, s := range self.serverProcesses {
collection := s.Query("test_rep", "select * from cluster_query_with_columns", false, c) collection := s.Query("test_rep", "select * from cluster_query_with_columns", false, c)
series := collection.GetSeries("cluster_query_with_columns", c) series := collection.GetSeries("cluster_query_with_columns", c)
c.Assert(series.GetValueForPointAndColumn(0, "asdf", c), Equals, float64(1)) c.Assert(series.GetValueForPointAndColumn(0, "col1", c), Equals, float64(2))
c.Assert(series.GetValueForPointAndColumn(1, "col1", c), Equals, float64(1))
} }
} }