influxdb/remote_mapper.go

204 lines
5.1 KiB
Go

package influxdb
import (
"bytes"
"encoding/json"
"errors"
"io"
"net/http"
"github.com/influxdb/influxdb/influxql"
)
const (
MAX_MAP_RESPONSE_SIZE = 1024 * 1024 * 1024
)
// RemoteMapper implements the influxql.Mapper interface. The engine uses the remote mapper
// to pull map results from shards that only exist on other servers in the cluster.
type RemoteMapper struct {
dataNodes Balancer
resp *http.Response
results chan interface{}
unmarshal influxql.UnmarshalFunc
complete bool
decoder *json.Decoder
Call string `json:",omitempty"`
Database string `json:",omitempty"`
MeasurementName string `json:",omitempty"`
TMin int64 `json:",omitempty"`
TMax int64 `json:",omitempty"`
SeriesIDs []uint64 `json:",omitempty"`
ShardID uint64 `json:",omitempty"`
Filters []string `json:",omitempty"`
WhereFields []*Field `json:",omitempty"`
SelectFields []*Field `json:",omitempty"`
SelectTags []string `json:",omitempty"`
Limit int `json:",omitempty"`
Offset int `json:",omitempty"`
Interval int64 `json:",omitempty"`
ChunkSize int `json:",omitempty"`
}
// Responses get streamed back to the remote mapper from the remote machine that runs a local mapper
type MapResponse struct {
Err string `json:",omitempty"`
Data []byte
Completed bool `json:",omitempty"`
}
// Open is a no op, real work is done starting with Being
func (m *RemoteMapper) Open() error { return nil }
// Close the response body
func (m *RemoteMapper) Close() {
if m.resp != nil && m.resp.Body != nil {
m.resp.Body.Close()
}
}
// Begin sends a request to the remote server to start streaming map results
func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) error {
// get the function for unmarshaling results
f, err := influxql.InitializeUnmarshaller(c)
if err != nil {
return err
}
m.unmarshal = f
if c != nil {
m.Call = c.String()
}
m.ChunkSize = chunkSize
m.TMin = startingTime
// send the request to map to the remote server
b, err := json.Marshal(m)
if err != nil {
return err
}
var resp *http.Response
for {
node := m.dataNodes.Next()
if node == nil {
// no data nodes are available to service this query
return ErrNoDataNodeAvailable
}
// request to start streaming results
resp, err = http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b))
if err != nil {
node.Down()
continue
}
// Mark the node as up
node.Up()
break
}
m.resp = resp
lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE)
m.decoder = json.NewDecoder(lr)
return nil
}
// NextInterval is part of the mapper interface. In this case we read the next chunk from the remote mapper
func (m *RemoteMapper) NextInterval() (interface{}, error) {
// just return nil if the mapper has completed its run
if m.complete {
return nil, nil
}
mr := &MapResponse{}
err := m.decoder.Decode(&mr)
if err != nil {
return nil, err
}
if mr.Err != "" {
return nil, errors.New(mr.Err)
}
// if it's a complete message, we've emptied this mapper of all data
if mr.Completed {
m.complete = true
return nil, nil
}
// marshal the data that came from the MapFN
v, err := m.unmarshal(mr.Data)
if err != nil {
return nil, err
}
return v, nil
}
// CallExpr will parse the Call string into an expression or return nil
func (m *RemoteMapper) CallExpr() (*influxql.Call, error) {
if m.Call == "" {
return nil, nil
}
c, err := influxql.ParseExpr(m.Call)
if err != nil {
return nil, err
}
call, ok := c.(*influxql.Call)
if !ok {
return nil, errors.New("unable to marshal aggregate call")
}
return call, nil
}
// FilterExprs will parse the filter strings and return any expressions. This array
// will be the same size as the SeriesIDs array with each element having a filter (which could be nil)
func (m *RemoteMapper) FilterExprs() []influxql.Expr {
exprs := make([]influxql.Expr, len(m.SeriesIDs), len(m.SeriesIDs))
// if filters is empty, they're all nil. if filters has one element, all filters
// should be set to that. Otherwise marshal each filter
if len(m.Filters) == 1 {
f, _ := influxql.ParseExpr(m.Filters[0])
for i, _ := range exprs {
exprs[i] = f
}
} else if len(m.Filters) > 1 {
for i, s := range m.Filters {
f, _ := influxql.ParseExpr(s)
exprs[i] = f
}
}
return exprs
}
// SetFilters will convert the given arrray of filters into filters that can be marshaled and sent to the remote system
func (m *RemoteMapper) SetFilters(filters []influxql.Expr) {
l := filters[0]
allFiltersTheSame := true
for _, f := range filters {
if l != f {
allFiltersTheSame = false
break
}
}
// we don't need anything if they're all the same and nil
if l == nil && allFiltersTheSame {
return
} else if allFiltersTheSame { // just set one filter element since they're all the same
m.Filters = []string{l.String()}
return
}
// marshal all of them since there are different ones
m.Filters = make([]string, len(filters), len(filters))
for i, f := range filters {
m.Filters[i] = f.String()
}
}