diff --git a/cluster/service.go b/cluster/service.go index fd583b2eac..186cf8434c 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -260,6 +260,9 @@ func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error { return fmt.Errorf("next chunk: %s", err) } + // NOTE: Even if the chunk is nil, we still need to send one + // empty response to let the other side know we're out of data. + if chunk != nil { b, err := json.Marshal(chunk) if err != nil { @@ -270,12 +273,11 @@ func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error { // Write to connection. resp.SetCode(0) - v := chunk.(*tsdb.MapperOutput) if err := writeMapShardResponseMessage(w, &resp); err != nil { return err } - if v == nil { + if chunk == nil { // All mapper data sent. return nil } diff --git a/tsdb/mapper.go b/tsdb/mapper.go index b3e2d7d077..22ad9b74d4 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -271,7 +271,10 @@ func (lm *LocalMapper) NextChunk() (interface{}, error) { b, err := lm.remote.NextChunk() if err != nil { return nil, err + } else if b == nil { + return nil, nil } + mo := &MapperOutput{} if err := json.Unmarshal(b.([]byte), mo); err != nil { return nil, err @@ -293,7 +296,7 @@ func (lm *LocalMapper) NextChunk() (interface{}, error) { // nextChunkRaw returns the next chunk of data. Data comes in the same order as the // tags return by TagSets. A chunk never contains data for more than 1 tagset. // If there is no more data for any tagset, nil will be returned. -func (lm *LocalMapper) nextChunkRaw() (*MapperOutput, error) { +func (lm *LocalMapper) nextChunkRaw() (interface{}, error) { var output *MapperOutput for { if lm.currCursorIndex == len(lm.cursors) { @@ -335,7 +338,7 @@ func (lm *LocalMapper) nextChunkRaw() (*MapperOutput, error) { // for the current tagset. Tagsets are always processed in the same order as that // returned by AvailTagsSets(). When there is no more data for any tagset nil // is returned. -func (lm *LocalMapper) nextChunkAgg() (*MapperOutput, error) { +func (lm *LocalMapper) nextChunkAgg() (interface{}, error) { var output *MapperOutput for { if lm.currCursorIndex == len(lm.cursors) {