Combine values across mappers before emit

pull/2142/head
Philip O'Toole 2015-04-02 14:59:10 -07:00
parent 8661155a7d
commit e4cf36fe29
1 changed files with 23 additions and 18 deletions

View File

@ -209,13 +209,11 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
// markers for which mappers have been completely emptied
mapperComplete := make([]bool, len(m.Mappers))
// we need to make sure that we send at least one row, even for queries with empty results
oneRowSent := false
// for limit and offset we need to track how many values we've swallowed for the offset and how many we've already set for the limit.
// we track the number set for the limit because they could be getting chunks. For instance if your limit is 10k, but chunk size is 1k
valuesSent := 0
valuesOffset := 0
valuesToReturn := make([]*rawQueryMapOutput, 0)
// loop until we've emptied out all the mappers and sent everything out
for {
@ -277,10 +275,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
// if we didn't pull out any values, we're done here
if values == nil {
if !oneRowSent && !filterEmptyResults {
out <- m.processRawResults(nil)
}
return
break
}
// sort the values by time first so we can then handle offset and limit
@ -308,24 +303,34 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
}
valuesSent += len(values)
}
valuesToReturn = append(valuesToReturn, values...)
// convert the raw results into rows
row := m.processRawResults(values)
if filterEmptyResults && m.resultsEmpty(row.Values) {
return
// hit the chunk size? Send out what has been accumulated, but keep
// processing.
if len(valuesToReturn) >= m.chunkSize {
row := m.processRawResults(valuesToReturn)
// perform post-processing, such as math.
row.Values = m.processResults(row.Values)
out <- row
valuesToReturn = make([]*rawQueryMapOutput, 0)
}
// do any post processing like math and stuff
row.Values = m.processResults(row.Values)
oneRowSent = true
out <- row
// stop processing if we've hit the limit
if m.stmt.Limit != 0 && valuesSent >= m.stmt.Limit {
return
break
}
}
if len(valuesToReturn) == 0 {
if !filterEmptyResults {
out <- m.processRawResults(nil)
}
} else {
row := m.processRawResults(valuesToReturn)
// perform post-processing, such as math.
row.Values = m.processResults(row.Values)
out <- row
}
}
// processResults will apply any math that was specified in the select statement against the passed in results