Remove the interval setting from NextInterval to make remote mappers work.
parent 6e8ea9ae91
commit d41b85a715
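The core of the change: NextInterval previously took the interval size as an argument on every call, which a RemoteMapper cannot honor — the remote side streams pre-computed chunks over HTTP and has no way to re-bucket them per call. The interval now travels with the mapper itself, set once from the query's GROUP BY clause when the mapper is created. A minimal sketch of the shape of the change (the real hunks follow):

	// before: the caller chose the bucket size on every call
	NextInterval(interval int64) (interface{}, error)

	// after: the mapper owns its interval; callers just iterate until nil
	NextInterval() (interface{}, error)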
@@ -753,7 +753,7 @@ func (h *Handler) serveRunMapper(w http.ResponseWriter, r *http.Request) {
 	// write results to the client until the next interval is empty
 	for {
 		fmt.Println("start interval")
-		v, err := lm.NextInterval(m.TMax)
+		v, err := lm.NextInterval()
 		if err != nil {
 			mapError(w, err)
 			return
@@ -763,7 +763,7 @@ func (h *Handler) serveRunMapper(w http.ResponseWriter, r *http.Request) {
 		// see if we're done
 		if v == nil {
 			fmt.Println("DONE")
-			return
+			break
 		}

 		// marshal and write out
@@ -782,6 +782,14 @@ func (h *Handler) serveRunMapper(w http.ResponseWriter, r *http.Request) {
 		w.Write(b)
 		w.(http.Flusher).Flush()
 	}

+	d, err := json.Marshal(&influxdb.MapResponse{Completed: true})
+	if err != nil {
+		mapError(w, err)
+	} else {
+		w.Write(d)
+		w.(http.Flusher).Flush()
+	}
 }

 type dataNodeJSON struct {
@@ -809,6 +817,7 @@ func isFieldNotFoundError(err error) bool {

 // mapError writes an error result after trying to start a mapper
 func mapError(w http.ResponseWriter, err error) {
+	fmt.Println("mapError: ", err.Error())
 	b, _ := json.Marshal(&influxdb.MapResponse{Err: err.Error()})
 	w.Write(b)
 }
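Assembled from the three serveRunMapper hunks above, the new server-side loop looks roughly like this (a sketch; elided parts are marked with comments):

	for {
		v, err := lm.NextInterval()
		if err != nil {
			mapError(w, err)
			return
		}
		// a nil interval means the local mapper is exhausted
		if v == nil {
			break
		}
		// ... marshal v into a MapResponse, w.Write(b), then Flush ...
	}

	// tell the remote mapper the stream is complete
	d, err := json.Marshal(&influxdb.MapResponse{Completed: true})
	if err != nil {
		mapError(w, err)
	} else {
		w.Write(d)
		w.(http.Flusher).Flush()
	}

Switching the inner `return` to `break` is what makes the trailing Completed message reachable; before this change the handler simply returned, giving the remote side no explicit end-of-stream marker.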
@@ -234,7 +234,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
 			continue
 		}

-		res, err := mm.NextInterval(m.TMax)
+		res, err := mm.NextInterval()
 		if err != nil {
 			out <- &Row{Err: err}
 			return
@@ -614,19 +614,11 @@ func (m *MapReduceJob) processAggregate(c *Call, reduceFunc ReduceFunc, resultVa
 		}
 	}

-	// the first interval in a query with a group by may be smaller than the others. This happens when they have a
-	// where time > clause that is in the middle of the bucket that the group by time creates
-	firstInterval := (m.TMin/m.interval*m.interval + m.interval) - m.TMin
-
 	// populate the result values for each interval of time
 	for i, _ := range resultValues {
 		// collect the results from each mapper
 		for j, mm := range m.Mappers {
-			interval := m.interval
-			if i == 0 {
-				interval = firstInterval
-			}
-			res, err := mm.NextInterval(interval)
+			res, err := mm.NextInterval()
 			if err != nil {
 				return err
 			}
@@ -661,10 +653,8 @@ type Mapper interface {

 	// NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a
 	// forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read.
-	// We pass the interval in here so that it can be varied over the period of the query. This is useful for queries that
-	// must respect natural time boundaries like months or queries that span daylight savings time borders. Note that if
-	// a limit is set on the mapper, the interval passed here should represent the MaxTime in a nano epoch.
-	NextInterval(interval int64) (interface{}, error)
+	// Interval periods can be different based on time boundaries (months, daylight savings, etc) of the query.
+	NextInterval() (interface{}, error)
 }

 type TagSet struct {
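The firstInterval computation removed here is not lost — it moves into LocalMapper.NextInterval in tx.go below, where the mapper itself detects a tmin that is not aligned to a GROUP BY bucket. A worked example with illustrative small numbers (these are nanosecond epochs in practice):

	// interval = 60, TMin/tmin = 130 (a WHERE time > clause landing mid-bucket)
	//
	// old engine.go, computed by the caller:
	//   firstInterval = (130/60*60 + 60) - 130 = 180 - 130 = 50
	//
	// new tx.go, computed inside the mapper:
	//   130 % 60 != 0, so nextMin = 130/60*60 + 60 = 180 and tmax = 179
	//
	// either way the first bucket covers [130, 179] and the next one
	// starts on the natural boundary at 180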
@@ -21,6 +21,7 @@ type RemoteMapper struct {
 	resp      *http.Response
 	results   chan interface{}
 	unmarshal influxql.UnmarshalFunc
+	complete  bool

 	Call     string
 	Database string
@@ -34,11 +35,13 @@ type RemoteMapper struct {
 	SelectFields []*Field
 	SelectTags   []string
 	Limit        int
+	Interval     int64
 }

 type MapResponse struct {
-	Err  string
+	Err  string `json:",omitempty"`
 	Data []byte
+	Completed bool `json:",omitempty"`
 }

 // Open is a no op, real work is done starting with Being
@@ -80,10 +83,15 @@ func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, limit int) er
 	return nil
 }

-func (m *RemoteMapper) NextInterval(interval int64) (interface{}, error) {
+func (m *RemoteMapper) NextInterval() (interface{}, error) {
+	if m.complete {
+		return nil, nil
+	}
+
 	chunk := make([]byte, MAX_MAP_RESPONSE_SIZE, MAX_MAP_RESPONSE_SIZE)
 	n, err := m.resp.Body.Read(chunk)
 	if err != nil {
+		warn("NextInterval err:", n, err.Error())
 		return nil, err
 	}
 	if n == 0 {
@@ -98,6 +106,10 @@ func (m *RemoteMapper) NextInterval(interval int64) (interface{}, error) {
 	if mr.Err != "" {
 		return nil, errors.New(mr.Err)
 	}
+	if mr.Completed {
+		m.complete = true
+		return nil, nil
+	}
 	v, err := m.unmarshal(mr.Data)
 	if err != nil {
 		return nil, err
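With these changes the response body is a stream of JSON-encoded MapResponse messages. Data is a []byte, so encoding/json renders it as base64; a successful stream might look roughly like this (payloads illustrative):

	{"Data":"eyJ2YWx1ZSI6MX0="}
	{"Data":"eyJ2YWx1ZSI6Mn0="}
	{"Completed":true}

An error at any point arrives as {"Err":"..."}, which NextInterval turns back into a Go error via errors.New(mr.Err). The new complete flag makes every call after the Completed message return (nil, nil), matching the Mapper contract that nil means no more data.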
@@ -3133,6 +3133,7 @@ func (s *Server) StartLocalMapper(rm *RemoteMapper) (*LocalMapper, error) {
 		whereFields:  rm.WhereFields,
 		selectFields: rm.SelectFields,
 		selectTags:   rm.SelectTags,
+		interval:     rm.Interval,
 	}

 	return lm, nil
tx.go
@@ -114,6 +114,14 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
 		return nil, nil
 	}

+	// get the group by interval, if there is one
+	var interval int64
+	if d, err := stmt.GroupByInterval(); err != nil {
+		return nil, err
+	} else {
+		interval = d.Nanoseconds()
+	}
+
 	// get the sorted unique tag sets for this query.
 	tagSets := m.tagSets(stmt, tagKeys)

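stmt.GroupByInterval() yields the duration of a GROUP BY time(...) clause, so interval ends up in nanoseconds. An illustrative case (query and values hypothetical, and assuming GroupByInterval returns a zero duration when the clause is absent):

	// SELECT mean(value) FROM cpu GROUP BY time(10m)
	//   d = 10 * time.Minute
	//   interval = d.Nanoseconds() // 600000000000
	//
	// with no GROUP BY time clause, interval stays 0, so the
	// l.interval > 0 branch in NextInterval below never fires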
@@ -159,7 +167,8 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
 				WhereFields:  whereFields,
 				SelectFields: selectFields,
 				SelectTags:   selectTags,
-				Limit:        stmt.Limit,
+				Limit:        stmt.Limit + stmt.Offset,
+				Interval:     interval,
 			}
 			mapper.(*RemoteMapper).SetFilters(t.Filters)
 		} else {
@@ -172,6 +181,8 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
 				whereFields:  whereFields,
 				selectFields: selectFields,
 				selectTags:   selectTags,
+				tmax:         tmax.UnixNano(),
+				interval:     interval,
 				// multiple mappers may need to be merged together to get the results
 				// for a raw query. So each mapper will have to read at least the
 				// limit plus the offset in data points to ensure we've hit our mark
@@ -253,6 +264,7 @@ type LocalMapper struct {
 	selectFields     []*Field // field names that occur in the select clause
 	selectTags       []string // tag keys that occur in the select clause
 	isRaw            bool     // if the query is a non-aggregate query
+	interval         int64    // the group by interval of the query, if any
 	limit            uint64   // used for raw queries for LIMIT
 	perIntervalLimit int      // used for raw queries to determine how far into a chunk we are
 	chunkSize        int      // used for raw queries to determine how much data to read before flushing to client
@@ -297,8 +309,8 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int)
 	l.mapFunc = mapFunc
 	l.keyBuffer = make([]int64, len(l.cursors))
 	l.valueBuffer = make([][]byte, len(l.cursors))
-	l.tmin = startingTime
 	l.chunkSize = chunkSize
+	l.tmin = startingTime

 	// determine if this is a raw data query with a single field, multiple fields, or an aggregate
 	var fieldName string
@@ -355,18 +367,28 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int)
 // NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a
 // forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read.
 // If this is a raw query, interval should be the max time to hit in the query
-func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) {
+func (l *LocalMapper) NextInterval() (interface{}, error) {
 	if l.cursorsEmpty || l.tmin > l.job.TMax {
 		return nil, nil
 	}

+	// after we call to the mapper, this will be the tmin for the next interval.
+	nextMin := l.tmin + l.interval
+
 	// Set the upper bound of the interval.
 	if l.isRaw {
-		l.tmax = interval
 		l.perIntervalLimit = l.chunkSize
-	} else if interval > 0 {
-		// Make sure the bottom of the interval lands on a natural boundary.
-		l.tmax = l.tmin + interval - 1
+	} else if l.interval > 0 {
+		// Set tmax to ensure that the interval lands on the boundary of the interval
+		if l.tmin%l.interval != 0 {
+			// the first interval in a query with a group by may be smaller than the others. This happens when they have a
+			// where time > clause that is in the middle of the bucket that the group by time creates. That will be the
+			// case on the first interval when the tmin % the interval isn't equal to zero
+			nextMin = l.tmin/l.interval*l.interval + l.interval
+			l.tmax = nextMin - 1
+		} else {
+			l.tmax = l.tmin + l.interval - 1
+		}
 	}

 	// Execute the map function. This local mapper acts as the iterator
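For aggregate queries the mapper now advances itself: the @@ -383 hunk below sets l.tmin = nextMin after the map call, so repeated calls walk bucket by bucket until l.tmin passes l.job.TMax. A caller therefore reduces to a plain iteration loop, mirroring processRawQuery above (a sketch, assuming a mapper m already opened with Begin):

	for {
		v, err := m.NextInterval()
		if err != nil {
			return err
		}
		if v == nil {
			break // mapper exhausted
		}
		// ... reduce v into the result values for this interval ...
	}

Raw queries keep tmin fixed and instead page through the interval with perIntervalLimit/chunkSize, which is why the advance is guarded by !l.isRaw.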
@@ -383,7 +405,7 @@ func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) {

 	// Move the interval forward if it's not a raw query. For raw queries we use the limit to advance intervals.
 	if !l.isRaw {
-		l.tmin += interval
+		l.tmin = nextMin
 	}

 	return val, nil