return interface{} from nextChunk* functions
parent
c8f88f9a61
commit
5d26cfa4d7
|
@ -260,6 +260,9 @@ func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error {
|
|||
return fmt.Errorf("next chunk: %s", err)
|
||||
}
|
||||
|
||||
// NOTE: Even if the chunk is nil, we still need to send one
|
||||
// empty response to let the other side know we're out of data.
|
||||
|
||||
if chunk != nil {
|
||||
b, err := json.Marshal(chunk)
|
||||
if err != nil {
|
||||
|
@ -270,12 +273,11 @@ func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error {
|
|||
|
||||
// Write to connection.
|
||||
resp.SetCode(0)
|
||||
v := chunk.(*tsdb.MapperOutput)
|
||||
if err := writeMapShardResponseMessage(w, &resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if v == nil {
|
||||
if chunk == nil {
|
||||
// All mapper data sent.
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -271,7 +271,10 @@ func (lm *LocalMapper) NextChunk() (interface{}, error) {
|
|||
b, err := lm.remote.NextChunk()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if b == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
mo := &MapperOutput{}
|
||||
if err := json.Unmarshal(b.([]byte), mo); err != nil {
|
||||
return nil, err
|
||||
|
@ -293,7 +296,7 @@ func (lm *LocalMapper) NextChunk() (interface{}, error) {
|
|||
// nextChunkRaw returns the next chunk of data. Data comes in the same order as the
|
||||
// tags return by TagSets. A chunk never contains data for more than 1 tagset.
|
||||
// If there is no more data for any tagset, nil will be returned.
|
||||
func (lm *LocalMapper) nextChunkRaw() (*MapperOutput, error) {
|
||||
func (lm *LocalMapper) nextChunkRaw() (interface{}, error) {
|
||||
var output *MapperOutput
|
||||
for {
|
||||
if lm.currCursorIndex == len(lm.cursors) {
|
||||
|
@ -335,7 +338,7 @@ func (lm *LocalMapper) nextChunkRaw() (*MapperOutput, error) {
|
|||
// for the current tagset. Tagsets are always processed in the same order as that
|
||||
// returned by AvailTagsSets(). When there is no more data for any tagset nil
|
||||
// is returned.
|
||||
func (lm *LocalMapper) nextChunkAgg() (*MapperOutput, error) {
|
||||
func (lm *LocalMapper) nextChunkAgg() (interface{}, error) {
|
||||
var output *MapperOutput
|
||||
for {
|
||||
if lm.currCursorIndex == len(lm.cursors) {
|
||||
|
|
Loading…
Reference in New Issue