Merge pull request #4833 from li-ang/fix_show_measurements

Fix SHOW MEASUREMENTS across the cluster
pull/4913/head
Philip O'Toole 2015-11-25 10:47:44 -08:00
commit bce68ed43d
1 changed files with 32 additions and 15 deletions

View File

@ -57,15 +57,20 @@ func (e *ShowMeasurementsExecutor) Execute(closing <-chan struct{}) <-chan *mode
continue continue
} }
// Convert the mapper chunk to a string array of measurement names. // Convert the mapper chunk to MapperOutput type.
mms, ok := c.([]string) mop, ok := c.(*MapperOutput)
if !ok { if !ok {
out <- &models.Row{Err: fmt.Errorf("show measurements mapper returned invalid type: %T", c)} out <- &models.Row{Err: fmt.Errorf("show measurements mapper returned invalid type: %T", c)}
return return
} }
// Add the measurement names to the set. // Add the measurement names to the set.
for _, mm := range mms { for _, mv := range mop.Values {
mm, ok := mv.Value.(string)
if !ok {
out <- &models.Row{Err: fmt.Errorf("show measurements mapper returned invalid type: %T", mop)}
return
}
set[mm] = struct{}{} set[mm] = struct{}{}
} }
} }
@ -215,21 +220,25 @@ func (m *ShowMeasurementsMapper) NextChunk() (interface{}, error) {
return nil, nil return nil, nil
} }
names := []string{} mop := &MapperOutput{
if err := json.Unmarshal(b.([]byte), &names); err != nil { Name: "measurements",
return nil, err Fields: []string{"name"},
} else if len(names) == 0 { Values: make([]*MapperValue, 0),
// Mapper on other node sent 0 values so it's done.
return nil, nil
} }
return names, nil
if err := json.Unmarshal(b.([]byte), &mop); err != nil {
return nil, err
}
return mop, nil
} }
return m.nextChunk() return m.nextChunk()
} }
// nextChunk implements next chunk logic for a local shard. // nextChunk implements next chunk logic for a local shard.
func (m *ShowMeasurementsMapper) nextChunk() (interface{}, error) { func (m *ShowMeasurementsMapper) nextChunk() (interface{}, error) {
// Allocate array to hold measurement names. var output *MapperOutput
names := make([]string, 0, m.ChunkSize) names := make([]string, 0, m.ChunkSize)
// Get the channel of measurement names from the state. // Get the channel of measurement names from the state.
@ -242,12 +251,20 @@ func (m *ShowMeasurementsMapper) nextChunk() (interface{}, error) {
break break
} }
} }
// See if we've read all the names.
if len(names) == 0 { output = &MapperOutput{
return nil, nil Name: "measurements",
Fields: []string{"name"},
Values: make([]*MapperValue, 0, len(names)),
} }
return names, nil for _, v := range names {
output.Values = append(output.Values, &MapperValue{
Value: v,
})
}
return output, nil
} }
// Close closes the mapper. // Close closes the mapper.