package influxql

import (
	"bytes"
	"errors"
	"hash/fnv"
	"math"
	"sort"
	"time"
)

// DB represents an interface for creating transactions.
type DB interface {
	Begin() (Tx, error)
}

const (
	// Return an error if the user is trying to select more than this number of points in a group by statement.
	// Most likely they specified a group by interval without time boundaries.
	MaxGroupByPoints = 100000

	// Since time is always selected, the column count when selecting only a single other value will be 2
	SelectColumnCountWithOneValue = 2
)

// Tx represents a transaction.
// The Tx must be opened before being used.
type Tx interface {
	// Create MapReduceJobs for the given select statement. One MRJob will be created per unique tagset that matches the query
	CreateMapReduceJobs(stmt *SelectStatement, tagKeys []string) ([]*MapReduceJob, error)
}

type MapReduceJob struct {
	MeasurementName string
	TagSet          *TagSet
	Mappers         []Mapper         // the mappers to hit all shards for this MRJob
	TMin            int64            // minimum time specified in the query
	TMax            int64            // maximum time specified in the query
	key             []byte           // a key that identifies the MRJob so it can be sorted
	interval        int64            // the group by interval of the query
	stmt            *SelectStatement // the select statement this job was created for
	chunkSize       int              // the number of points to buffer in raw queries before returning a chunked response
}

func (m *MapReduceJob) Open() error {
	for _, mm := range m.Mappers {
		if err := mm.Open(); err != nil {
			m.Close()
			return err
		}
	}
	return nil
}

func (m *MapReduceJob) Close() {
	for _, mm := range m.Mappers {
		mm.Close()
	}
}

func (m *MapReduceJob) Key() []byte {
	if m.key == nil {
		m.key = append([]byte(m.MeasurementName), m.TagSet.Key...)
	}
	return m.key
}

func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
	// if it's a raw query we handle processing differently
	if m.stmt.IsRawQuery {
		m.processRawQuery(out, filterEmptyResults)
		return
	}

	// get the aggregates and the associated reduce functions
	aggregates := m.stmt.FunctionCalls()
	reduceFuncs := make([]ReduceFunc, len(aggregates))
	for i, c := range aggregates {
		reduceFunc, err := InitializeReduceFunc(c)
		if err != nil {
			out <- &Row{Err: err}
			return
		}
		reduceFuncs[i] = reduceFunc
	}

	// we'll have a fixed number of points with timestamps in buckets.
	// Initialize those times and a slice to hold the associated values
	var pointCountInResult int

	// if the user didn't specify a start time or a group by interval, we're returning a single point that describes the entire range
	if m.TMin == 0 || m.interval == 0 {
		// they want a single aggregate point for the entire time range
		m.interval = m.TMax - m.TMin
		pointCountInResult = 1
	} else {
		intervalTop := m.TMax/m.interval*m.interval + m.interval
		intervalBottom := m.TMin / m.interval * m.interval
		pointCountInResult = int((intervalTop - intervalBottom) / m.interval)
	}

	// For group by time queries, limit the number of data points returned by the limit and offset
	// raw query limits are handled elsewhere
	if m.stmt.Limit > 0 || m.stmt.Offset > 0 {
		// ensure that the offset isn't higher than the number of points we'd get
		if m.stmt.Offset > pointCountInResult {
			return
		}

		// take the lesser of either the pre-computed number of group by buckets that
		// will be in the result or the limit passed in by the user
		if m.stmt.Limit < pointCountInResult {
			pointCountInResult = m.stmt.Limit
		}
	}

	// If we are exceeding our MaxGroupByPoints and we aren't a raw query, error out
	if pointCountInResult > MaxGroupByPoints {
		out <- &Row{
			Err: errors.New("too many points in the group by interval. maybe you forgot to specify a where time clause?"),
		}
		return
	}

	// initialize the times of the aggregate points
	resultValues := make([][]interface{}, pointCountInResult)

	// ensure that the start time for the results is on the start of the window
	startTimeBucket := m.TMin / m.interval * m.interval

	for i := range resultValues {
		var t int64
		if m.stmt.Offset > 0 {
			t = startTimeBucket + (int64(i+1) * m.interval * int64(m.stmt.Offset))
		} else {
			t = startTimeBucket + (int64(i+1) * m.interval) - m.interval
		}

		// If we start getting out of our max time range, then truncate values and return
		if t > m.TMax {
			resultValues = resultValues[:i]
			break
		}

		// we always include time so we need one more column than we have aggregates
		vals := make([]interface{}, 0, len(aggregates)+1)
		resultValues[i] = append(vals, time.Unix(0, t).UTC())
	}

	// This just makes sure that if they specify a start time less than what the start time would be with the offset,
	// we just reset the start time to the later time to avoid going over data that won't show up in the result.
	if m.stmt.Offset > 0 {
		m.TMin = resultValues[0][0].(time.Time).UnixNano()
	}

	// now loop through the aggregate functions and populate everything
	for i, c := range aggregates {
		if err := m.processAggregate(c, reduceFuncs[i], resultValues); err != nil {
			out <- &Row{
				Name: m.MeasurementName,
				Tags: m.TagSet.Tags,
				Err:  err,
			}
			return
		}
	}

	// filter out empty results
	if filterEmptyResults && m.resultsEmpty(resultValues) {
		return
	}

	// put together the row to return
	columnNames := make([]string, len(m.stmt.Fields)+1)
	columnNames[0] = "time"
	for i, f := range m.stmt.Fields {
		columnNames[i+1] = f.Name()
	}

	// process the result values if there's any math in there
	resultValues = m.processResults(resultValues)

	// handle any fill options
	resultValues = m.processFill(resultValues)

	row := &Row{
		Name:    m.MeasurementName,
		Tags:    m.TagSet.Tags,
		Columns: columnNames,
		Values:  resultValues,
	}

	// and send the row out
	out <- row
}
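// bucketCount is an illustrative sketch, not part of the original code: it isolates the
// group by window arithmetic used in Execute above. The window edges are rounded outward
// to interval boundaries, so tmin=12, tmax=37, interval=10 rounds to [10, 40) and yields
// three buckets.
func bucketCount(tmin, tmax, interval int64) int {
	intervalTop := tmax/interval*interval + interval
	intervalBottom := tmin / interval * interval
	return int((intervalTop - intervalBottom) / interval)
}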
// processRawQuery will handle running the mappers and then reducing their output
// for queries that pull back raw data values without computing any kind of aggregates.
func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
	// initialize the mappers
	for _, mm := range m.Mappers {
		if err := mm.Begin(nil, m.TMin); err != nil {
			out <- &Row{Err: err}
			return
		}
	}

	mapperOutputs := make([][]*rawQueryMapOutput, len(m.Mappers))
	// markers for which mappers have been completely emptied
	mapperComplete := make([]bool, len(m.Mappers))

	// for limit and offset we need to track how many values we've swallowed for the offset and how many we've already sent for the limit.
	// we track the number sent for the limit because they could be getting chunks. For instance if your limit is 10k, but chunk size is 1k
	valuesSent := 0
	valuesOffset := 0
	valuesToReturn := make([]*rawQueryMapOutput, 0)

	// loop until we've emptied out all the mappers and sent everything out
	for {
		// collect up to the limit for each mapper
		for j, mm := range m.Mappers {
			// only pull from mappers that potentially have more data and whose last output has been completely sent out.
			if mapperOutputs[j] != nil || mapperComplete[j] {
				continue
			}

			mm.SetLimit(m.chunkSize)
			res, err := mm.NextInterval(m.TMax)
			if err != nil {
				out <- &Row{Err: err}
				return
			}

			if res != nil {
				mapperOutputs[j] = res.([]*rawQueryMapOutput)
			} else {
				// if we got a nil from the mapper it means that we've emptied all data from it
				mapperComplete[j] = true
			}
		}

		// process the mapper outputs. we can send out everything up to the min of the last time in the mappers
		min := int64(math.MaxInt64)
		for _, o := range mapperOutputs {
			// some of the mappers could empty out before others so ignore them because they'll be nil
			if o == nil {
				continue
			}

			// find the min of the last point in each mapper
			t := o[len(o)-1].timestamp
			if t < min {
				min = t
			}
		}

		// now empty out all the mapper outputs up to the min time
		var values []*rawQueryMapOutput
		for j, o := range mapperOutputs {
			// find the index of the point up to the min
			ind := len(o)
			for i, mo := range o {
				if mo.timestamp > min {
					ind = i
					break
				}
			}

			// add up to the index to the values
			values = append(values, o[:ind]...)

			// if we emptied out all the values, set this output to nil so that the mapper will get run again on the next loop
			if ind == len(o) {
				mapperOutputs[j] = nil
			}
		}

		// if we didn't pull out any values, we're done here
		if values == nil {
			break
		}

		// sort the values by time first so we can then handle offset and limit
		sort.Sort(rawOutputs(values))

		// get rid of any points that need to be offset
		if valuesOffset < m.stmt.Offset {
			offset := m.stmt.Offset - valuesOffset

			// if offset is bigger than the number of values we have, move to the next batch from the mappers
			if offset > len(values) {
				valuesOffset += len(values)
				continue
			}

			values = values[offset:]
			valuesOffset += offset
		}

		// ensure we don't send more than the limit
		if valuesSent < m.stmt.Limit {
			limit := m.stmt.Limit - valuesSent
			if len(values) > limit {
				values = values[:limit]
			}
			valuesSent += len(values)
		}

		valuesToReturn = append(valuesToReturn, values...)

		// hit the chunk size? Send out what has been accumulated, but keep
		// processing.
		if len(valuesToReturn) >= m.chunkSize {
			row := m.processRawResults(valuesToReturn)
			// perform post-processing, such as math.
			row.Values = m.processResults(row.Values)
			out <- row
			valuesToReturn = make([]*rawQueryMapOutput, 0)
		}

		// stop processing if we've hit the limit
		if m.stmt.Limit != 0 && valuesSent >= m.stmt.Limit {
			break
		}
	}

	if len(valuesToReturn) == 0 {
		if !filterEmptyResults {
			out <- m.processRawResults(nil)
		}
	} else {
		row := m.processRawResults(valuesToReturn)
		// perform post-processing, such as math.
		row.Values = m.processResults(row.Values)
		out <- row
	}
}

// processResults will apply any math that was specified in the select statement against the passed in results
func (m *MapReduceJob) processResults(results [][]interface{}) [][]interface{} {
	hasMath := false
	for _, f := range m.stmt.Fields {
		if _, ok := f.Expr.(*BinaryExpr); ok {
			hasMath = true
		} else if _, ok := f.Expr.(*ParenExpr); ok {
			hasMath = true
		}
	}

	if !hasMath {
		return results
	}

	processors := make([]processor, len(m.stmt.Fields))
	startIndex := 1
	for i, f := range m.stmt.Fields {
		processors[i], startIndex = getProcessor(f.Expr, startIndex)
	}

	mathResults := make([][]interface{}, len(results))
	for i := range mathResults {
		mathResults[i] = make([]interface{}, len(m.stmt.Fields)+1)
		// put the time in
		mathResults[i][0] = results[i][0]
		for j, p := range processors {
			mathResults[i][j+1] = p(results[i])
		}
	}

	return mathResults
}

// processFill will take the results and return new results (or the same if no fill modifications are needed) with whatever fill options the query has.
func (m *MapReduceJob) processFill(results [][]interface{}) [][]interface{} {
	// don't do anything if we're supposed to leave the nulls
	if m.stmt.Fill == NullFill {
		return results
	}

	if m.stmt.Fill == NoFill {
		// remove any rows that have even one nil value. This one is tricky because they could have multiple
		// aggregates, but this option means that any row that has even one nil gets purged.
		newResults := make([][]interface{}, 0, len(results))
		for _, vals := range results {
			hasNil := false
			// start at 1 because the first value is always time
			for j := 1; j < len(vals); j++ {
				if vals[j] == nil {
					hasNil = true
					break
				}
			}
			if !hasNil {
				newResults = append(newResults, vals)
			}
		}
		return newResults
	}

	// they're either filling with previous values or a specific number
	for i, vals := range results {
		// start at 1 because the first value is always time
		for j := 1; j < len(vals); j++ {
			if vals[j] == nil {
				switch m.stmt.Fill {
				case PreviousFill:
					if i != 0 {
						vals[j] = results[i-1][j]
					}
				case NumberFill:
					vals[j] = m.stmt.FillValue
				}
			}
		}
	}
	return results
}

func getProcessor(expr Expr, startIndex int) (processor, int) {
	switch expr := expr.(type) {
	case *VarRef:
		return newEchoProcessor(startIndex), startIndex + 1
	case *Call:
		return newEchoProcessor(startIndex), startIndex + 1
	case *BinaryExpr:
		return getBinaryProcessor(expr, startIndex)
	case *ParenExpr:
		return getProcessor(expr.Expr, startIndex)
	case *NumberLiteral:
		return newLiteralProcessor(expr.Val), startIndex
	case *StringLiteral:
		return newLiteralProcessor(expr.Val), startIndex
	case *BooleanLiteral:
		return newLiteralProcessor(expr.Val), startIndex
	case *TimeLiteral:
		return newLiteralProcessor(expr.Val), startIndex
	case *DurationLiteral:
		return newLiteralProcessor(expr.Val), startIndex
	}
	panic("unreachable")
}

type processor func(values []interface{}) interface{}

func newEchoProcessor(index int) processor {
	return func(values []interface{}) interface{} {
		return values[index]
	}
}

func newLiteralProcessor(val interface{}) processor {
	return func(values []interface{}) interface{} {
		return val
	}
}

func getBinaryProcessor(expr *BinaryExpr, startIndex int) (processor, int) {
	lhs, index := getProcessor(expr.LHS, startIndex)
	rhs, index := getProcessor(expr.RHS, index)

	return newBinaryExprEvaluator(expr.Op, lhs, rhs), index
}

func newBinaryExprEvaluator(op Token, lhs, rhs processor) processor {
	switch op {
	case ADD:
		return func(values []interface{}) interface{} {
			l := lhs(values)
			r := rhs(values)
			if lv, ok := l.(float64); ok {
				if rv, ok := r.(float64); ok {
					return lv + rv
				}
			}
			return nil
		}
	case SUB:
		return func(values []interface{}) interface{} {
			l := lhs(values)
			r := rhs(values)
			if lv, ok := l.(float64); ok {
				if rv, ok := r.(float64); ok {
					return lv - rv
				}
			}
			return nil
		}
	case MUL:
		return func(values []interface{}) interface{} {
			l := lhs(values)
			r := rhs(values)
			if lv, ok := l.(float64); ok {
				if rv, ok := r.(float64); ok {
					return lv * rv
				}
			}
			return nil
		}
	case DIV:
		return func(values []interface{}) interface{} {
			l := lhs(values)
			r := rhs(values)
			if lv, ok := l.(float64); ok {
				if rv, ok := r.(float64); ok {
					// guard against division by zero
					if rv != 0 {
						return lv / rv
					}
				}
			}
			return nil
		}
	default:
		// we shouldn't get here, but give them back nils if it goes this way
		return func(values []interface{}) interface{} {
			return nil
		}
	}
}

// resultsEmpty will return true if all the result values are empty or contain only nulls
func (m *MapReduceJob) resultsEmpty(resultValues [][]interface{}) bool {
	for _, vals := range resultValues {
		// start the loop at 1 because we want to skip over the time value
		for i := 1; i < len(vals); i++ {
			if vals[i] != nil {
				return false
			}
		}
	}
	return true
}
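// exampleBinaryEval is an illustrative sketch, not part of the original code: it shows how
// the processor machinery above evaluates a derived column from a single result row. Column
// 0 is the timestamp and columns 1 and 2 hold aggregate values; the evaluator adds them and
// returns 4.0 for the row below.
func exampleBinaryEval() interface{} {
	add := newBinaryExprEvaluator(ADD, newEchoProcessor(1), newEchoProcessor(2))
	return add([]interface{}{time.Unix(0, 0).UTC(), 1.5, 2.5})
}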
// processRawResults will handle converting the reduce results from a raw query into a Row
func (m *MapReduceJob) processRawResults(values []*rawQueryMapOutput) *Row {
	selectNames := m.stmt.NamesInSelect()

	// ensure that time is in the select names and in the first position
	hasTime := false
	for i, n := range selectNames {
		if n == "time" {
			if i != 0 {
				tmp := selectNames[0]
				selectNames[0] = "time"
				selectNames[i] = tmp
			}
			hasTime = true
		}
	}

	// time should always be in the list of names they get back
	if !hasTime {
		selectNames = append([]string{"time"}, selectNames...)
	}

	// if they've selected only a single value we have to handle things a little differently
	singleValue := len(selectNames) == SelectColumnCountWithOneValue

	row := &Row{
		Name:    m.MeasurementName,
		Tags:    m.TagSet.Tags,
		Columns: selectNames,
	}

	// return an empty row if there are no results
	if len(values) == 0 {
		return row
	}

	// the results will have all of the raw mapper results, convert into the row
	for _, v := range values {
		vals := make([]interface{}, len(selectNames))

		if singleValue {
			vals[0] = time.Unix(0, v.timestamp).UTC()
			vals[1] = v.values
		} else {
			fields := v.values.(map[string]interface{})

			// time is always the first value
			vals[0] = time.Unix(0, v.timestamp).UTC()

			// populate the other values
			for i := 1; i < len(selectNames); i++ {
				vals[i] = fields[selectNames[i]]
			}
		}

		row.Values = append(row.Values, vals)
	}

	return row
}

func (m *MapReduceJob) processAggregate(c *Call, reduceFunc ReduceFunc, resultValues [][]interface{}) error {
	mapperOutputs := make([]interface{}, len(m.Mappers))

	// initialize the mappers
	for _, mm := range m.Mappers {
		if err := mm.Begin(c, m.TMin); err != nil {
			return err
		}
	}

	// the first interval in a query with a group by may be smaller than the others. This happens when they have a
	// where time > clause that is in the middle of the bucket that the group by time creates
	firstInterval := (m.TMin/m.interval*m.interval + m.interval) - m.TMin

	// populate the result values for each interval of time
	for i := range resultValues {
		// collect the results from each mapper
		for j, mm := range m.Mappers {
			interval := m.interval
			if i == 0 {
				interval = firstInterval
			}
			res, err := mm.NextInterval(interval)
			if err != nil {
				return err
			}
			mapperOutputs[j] = res
		}
		resultValues[i] = append(resultValues[i], reduceFunc(mapperOutputs))
	}

	return nil
}

type MapReduceJobs []*MapReduceJob

func (a MapReduceJobs) Len() int           { return len(a) }
func (a MapReduceJobs) Less(i, j int) bool { return bytes.Compare(a[i].Key(), a[j].Key()) == -1 }
func (a MapReduceJobs) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

// Mapper will run through a map function. A single mapper will be created
// for each shard for each tagset that must be hit to satisfy a query.
// Mappers can either point to a local shard or could point to a remote server.
type Mapper interface {
	// Open will open the necessary resources to begin the map job. Could be connections to remote servers or
	// hitting the local bolt store
	Open() error

	// Close will close the mapper (either the bolt transaction or the request)
	Close()

	// Begin will set up the mapper to run the map function for a given aggregate call starting at the passed in time
	Begin(*Call, int64) error

	// NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a
	// forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read.
	// We pass the interval in here so that it can be varied over the period of the query. This is useful for queries that
	// must respect natural time boundaries like months or queries that span daylight savings time borders. Note that if
	// a limit is set on the mapper, the interval passed here should represent the MaxTime in a nano epoch.
	NextInterval(interval int64) (interface{}, error)

	// SetLimit will limit the number of data points yielded on the next interval. If a limit is set, the interval
	// passed into NextInterval will be used as the MaxTime to scan until.
	SetLimit(limit int)
}
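// nopMapper is an illustrative sketch, not part of the original code: it satisfies the
// Mapper interface above but always reports that it has no data, which is how a real
// mapper signals completion to the MapReduceJob that drives it.
type nopMapper struct{}

func (nopMapper) Open() error                                      { return nil }
func (nopMapper) Close()                                           {}
func (nopMapper) Begin(c *Call, startTime int64) error             { return nil }
func (nopMapper) NextInterval(interval int64) (interface{}, error) { return nil, nil }
func (nopMapper) SetLimit(limit int)                               {}

// compile-time check that the sketch actually implements Mapper
var _ Mapper = nopMapper{}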
type TagSet struct {
	Tags      map[string]string
	Filters   []Expr
	SeriesIDs []uint32
	Key       []byte
}

func (t *TagSet) AddFilter(id uint32, filter Expr) {
	t.SeriesIDs = append(t.SeriesIDs, id)
	t.Filters = append(t.Filters, filter)
}

// Planner represents an object for creating execution plans.
type Planner struct {
	DB DB

	// Returns the current time. Defaults to time.Now().
	Now func() time.Time
}

// NewPlanner returns a new instance of Planner.
func NewPlanner(db DB) *Planner {
	return &Planner{
		DB:  db,
		Now: time.Now,
	}
}

// Plan creates an execution plan for the given SelectStatement and returns an Executor.
func (p *Planner) Plan(stmt *SelectStatement, chunkSize int) (*Executor, error) {
	now := p.Now().UTC()

	// Replace instances of "now()" with the current time.
	stmt.Condition = Reduce(stmt.Condition, &nowValuer{Now: now})

	// Begin an unopened transaction.
	tx, err := p.DB.Begin()
	if err != nil {
		return nil, err
	}

	// Determine group by tag keys.
	interval, tags, err := stmt.Dimensions.Normalize()
	if err != nil {
		return nil, err
	}

	// TODO: handle queries that select from multiple measurements. This assumes that we're only selecting from a single one
	jobs, err := tx.CreateMapReduceJobs(stmt, tags)
	if err != nil {
		return nil, err
	}

	// LIMIT and OFFSET the unique series
	if stmt.SLimit > 0 || stmt.SOffset > 0 {
		if stmt.SOffset > len(jobs) {
			jobs = nil
		} else {
			if stmt.SOffset+stmt.SLimit > len(jobs) {
				stmt.SLimit = len(jobs) - stmt.SOffset
			}

			jobs = jobs[stmt.SOffset : stmt.SOffset+stmt.SLimit]
		}
	}

	for _, j := range jobs {
		j.interval = interval.Nanoseconds()
		j.stmt = stmt
		j.chunkSize = chunkSize
	}

	return &Executor{tx: tx, stmt: stmt, jobs: jobs, interval: interval.Nanoseconds()}, nil
}

// Executor represents a query executor.
// It executes all reducers and combines their result into a row.
type Executor struct {
	tx       Tx               // transaction
	stmt     *SelectStatement // original statement
	jobs     []*MapReduceJob  // one job per unique tag set that will return in the query
	interval int64            // the group by interval of the query in nanoseconds
}

// Execute begins execution of the query and returns a channel to receive rows.
func (e *Executor) Execute() (<-chan *Row, error) {
	// Open transaction.
	for _, j := range e.jobs {
		if err := j.Open(); err != nil {
			e.close()
			return nil, err
		}
	}

	// Create output channel and stream data in a separate goroutine.
	out := make(chan *Row, 0)
	go e.execute(out)

	return out, nil
}

func (e *Executor) close() {
	for _, j := range e.jobs {
		j.Close()
	}
}

// execute runs in a separate goroutine and streams data from processors.
func (e *Executor) execute(out chan *Row) {
	// Ensure the MRJobs close after execution.
	defer e.close()

	// If we have multiple tag sets we'll want to filter out the empty ones
	filterEmptyResults := len(e.jobs) > 1

	// Execute each MRJob serially
	for _, j := range e.jobs {
		j.Execute(out, filterEmptyResults)
	}

	// Mark the end of the output channel.
	close(out)
}

// Row represents a single row returned from the execution of a statement.
type Row struct {
	Name    string            `json:"name,omitempty"`
	Tags    map[string]string `json:"tags,omitempty"`
	Columns []string          `json:"columns"`
	Values  [][]interface{}   `json:"values,omitempty"`
	Err     error             `json:"err,omitempty"`
}
// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
	h := fnv.New64a()
	keys := r.tagsKeys()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte(r.Tags[k]))
	}
	return h.Sum64()
}

// tagsKeys returns a sorted list of tag keys.
func (r *Row) tagsKeys() []string {
	a := make([]string, 0, len(r.Tags))
	for k := range r.Tags {
		a = append(a, k)
	}
	sort.Strings(a)
	return a
}

// Rows represents a list of rows that can be sorted consistently by name/tag.
type Rows []*Row

func (p Rows) Len() int { return len(p) }

func (p Rows) Less(i, j int) bool {
	// Sort by name first.
	if p[i].Name != p[j].Name {
		return p[i].Name < p[j].Name
	}

	// Sort by tag set hash. Tags don't have a meaningful sort order so we
	// just compute a hash and sort by that instead. This allows the tests
	// to receive rows in a predictable order every time.
	return p[i].tagsHash() < p[j].tagsHash()
}

func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
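// planAndRun is an illustrative sketch, not part of the original code: it wires the pieces
// in this file together. The caller supplies a DB implementation and an already parsed
// SELECT statement; rows are read from the executor's channel until it is closed. The
// chunk size of 1000 is an arbitrary choice for the example.
func planAndRun(db DB, stmt *SelectStatement) ([]*Row, error) {
	e, err := NewPlanner(db).Plan(stmt, 1000)
	if err != nil {
		return nil, err
	}

	ch, err := e.Execute()
	if err != nil {
		return nil, err
	}

	var rows []*Row
	for row := range ch {
		if row.Err != nil {
			return nil, row.Err
		}
		rows = append(rows, row)
	}
	return rows, nil
}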