Pass series IDs for shards in DQ instead of all series IDs
parent
e3fcdd2d01
commit
9268ddfefc
6
tx.go
6
tx.go
|
@ -149,7 +149,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
|
|||
shards[shard] = append(shards[shard], sid)
|
||||
}
|
||||
|
||||
for shard, _ := range shards {
|
||||
for shard, sids := range shards {
|
||||
var mapper influxql.Mapper
|
||||
|
||||
// create either a remote or local mapper for this shard
|
||||
|
@ -167,7 +167,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
|
|||
MeasurementName: m.Name,
|
||||
TMin: tmin.UnixNano(),
|
||||
TMax: tmax.UnixNano(),
|
||||
SeriesIDs: t.SeriesIDs,
|
||||
SeriesIDs: sids,
|
||||
ShardID: shard.ID,
|
||||
WhereFields: whereFields,
|
||||
SelectFields: selectFields,
|
||||
|
@ -179,7 +179,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
|
|||
mapper.(*RemoteMapper).SetFilters(t.Filters)
|
||||
} else {
|
||||
mapper = &LocalMapper{
|
||||
seriesIDs: t.SeriesIDs,
|
||||
seriesIDs: sids,
|
||||
db: shard.store,
|
||||
job: job,
|
||||
decoder: NewFieldCodec(m),
|
||||
|
|
Loading…
Reference in New Issue