2014-12-08 05:08:39 +00:00
|
|
|
package influxql
|
|
|
|
|
|
|
|
import (
|
2014-12-18 15:44:21 +00:00
|
|
|
"encoding/binary"
|
2014-12-08 05:08:39 +00:00
|
|
|
"fmt"
|
2014-12-18 15:44:21 +00:00
|
|
|
"hash/fnv"
|
2015-01-27 00:42:29 +00:00
|
|
|
"math"
|
2014-12-18 15:44:21 +00:00
|
|
|
"sort"
|
2014-12-08 05:08:39 +00:00
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
2015-02-11 22:11:45 +00:00
|
|
|
// emitBatchSize is how many values a mapper will buffer before emitting
// them downstream; batching bounds memory use during mapping.
const emitBatchSize = 1000
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// DB represents an interface for creating transactions.
type DB interface {
	// Begin returns a new, unopened transaction.
	Begin() (Tx, error)
}
|
|
|
|
|
|
|
|
// Tx represents a transaction.
// The Tx must be opened before being used.
type Tx interface {
	// Opens and closes the transaction.
	Open() error
	Close() error

	// SetNow sets the current time to be used throughout the transaction.
	SetNow(time.Time)

	// Creates a list of iterators for a simple select statement.
	//
	// The statement must adhere to the following rules:
	// 1. It can only have a single VarRef field.
	// 2. It can only have a single source measurement.
	CreateIterators(*SelectStatement) ([]Iterator, error)
}
|
|
|
|
|
|
|
|
// Iterator represents a forward-only iterator over a set of points.
type Iterator interface {
	// Tags returns the encoded dimensional tag values.
	Tags() string

	// Next returns the next value from the iterator.
	// A zero key is the exhaustion sentinel — consumers (e.g. MapCount,
	// bufIterator.EOF) loop until key == 0.
	Next() (key int64, data []byte, value interface{})
}
|
|
|
|
|
|
|
|
// Planner represents an object for creating execution plans.
type Planner struct {
	// DB creates the transaction each plan executes against.
	DB DB

	// Returns the current time. Defaults to time.Now().
	// Overridable so "now()" resolution in Plan is deterministic in tests.
	Now func() time.Time
}
|
|
|
|
|
|
|
|
// NewPlanner returns a new instance of Planner.
|
2015-01-26 12:19:35 +00:00
|
|
|
func NewPlanner(db DB) *Planner {
|
2014-12-11 06:32:45 +00:00
|
|
|
return &Planner{
|
|
|
|
DB: db,
|
|
|
|
Now: time.Now,
|
|
|
|
}
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
2015-02-01 18:47:48 +00:00
|
|
|
// Plan creates an execution plan for the given SelectStatement and returns an Executor.
func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
	// Capture the time once so the whole plan shares a single "now".
	now := p.Now()

	// Clone the statement to be planned.
	// Replace instances of "now()" with the current time.
	stmt = stmt.Clone()
	stmt.Condition = Reduce(stmt.Condition, &nowValuer{Now: now})

	// Begin an unopened transaction; it is opened later by Executor.Execute.
	tx, err := p.DB.Begin()
	if err != nil {
		return nil, err
	}

	// Create the executor.
	e := newExecutor(tx, stmt)

	// Determine group by tag keys.
	interval, tags, err := stmt.Dimensions.Normalize()
	if err != nil {
		return nil, err
	}
	e.interval = interval
	e.tags = tags

	// Generate a processor for each field.
	e.processors = make([]Processor, len(stmt.Fields))
	for i, f := range stmt.Fields {
		p, err := p.planField(e, f)
		if err != nil {
			return nil, err
		}
		e.processors[i] = p
	}

	return e, nil
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// planField generates a processor for a single SELECT field by planning
// its expression.
func (p *Planner) planField(e *Executor, f *Field) (Processor, error) {
	return p.planExpr(e, f.Expr)
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
func (p *Planner) planExpr(e *Executor, expr Expr) (Processor, error) {
|
2014-12-08 05:08:39 +00:00
|
|
|
switch expr := expr.(type) {
|
|
|
|
case *VarRef:
|
2015-01-27 05:08:36 +00:00
|
|
|
return p.planRawQuery(e, expr)
|
2014-12-08 05:08:39 +00:00
|
|
|
case *Call:
|
2014-12-09 15:45:29 +00:00
|
|
|
return p.planCall(e, expr)
|
2014-12-08 05:08:39 +00:00
|
|
|
case *BinaryExpr:
|
2014-12-09 15:45:29 +00:00
|
|
|
return p.planBinaryExpr(e, expr)
|
2014-12-08 05:08:39 +00:00
|
|
|
case *ParenExpr:
|
2014-12-09 15:45:29 +00:00
|
|
|
return p.planExpr(e, expr.Expr)
|
2014-12-08 05:08:39 +00:00
|
|
|
case *NumberLiteral:
|
|
|
|
return newLiteralProcessor(expr.Val), nil
|
|
|
|
case *StringLiteral:
|
|
|
|
return newLiteralProcessor(expr.Val), nil
|
|
|
|
case *BooleanLiteral:
|
|
|
|
return newLiteralProcessor(expr.Val), nil
|
|
|
|
case *TimeLiteral:
|
|
|
|
return newLiteralProcessor(expr.Val), nil
|
|
|
|
case *DurationLiteral:
|
|
|
|
return newLiteralProcessor(expr.Val), nil
|
|
|
|
}
|
|
|
|
panic("unreachable")
|
|
|
|
}
|
|
|
|
|
2015-01-27 05:08:36 +00:00
|
|
|
// planRawQuery generates a processor for a raw (non-aggregate) field
// reference. (The previous comment said "planCall" — a copy/paste error.)
func (p *Planner) planRawQuery(e *Executor, v *VarRef) (Processor, error) {
	// Convert the statement to a simplified substatement for the single field.
	stmt, err := e.stmt.Substatement(v)
	if err != nil {
		return nil, err
	}

	// Retrieve a list of iterators for the substatement.
	itrs, err := e.tx.CreateIterators(stmt)
	if err != nil {
		return nil, err
	}

	// Create mapper and reducer.
	mappers := make([]*Mapper, len(itrs))
	for i, itr := range itrs {
		mappers[i] = NewMapper(MapRawQuery, itr, e.interval)
	}
	r := NewReducer(ReduceRawQuery, mappers)
	// Row name is the unqualified measurement name.
	r.name = lastIdent(stmt.Source.(*Measurement).Name)

	return r, nil
}
|
|
|
|
|
2014-12-08 05:08:39 +00:00
|
|
|
// planCall generates a processor for a function call.
func (p *Planner) planCall(e *Executor, c *Call) (Processor, error) {
	// Ensure there is a single argument.
	// percentile() is special-cased: it takes the field plus a number.
	if c.Name == "percentile" {
		if len(c.Args) != 2 {
			return nil, fmt.Errorf("expected two arguments for percentile()")
		}
	} else if len(c.Args) != 1 {
		return nil, fmt.Errorf("expected one argument for %s()", c.Name)
	}

	// Ensure the argument is a variable reference.
	ref, ok := c.Args[0].(*VarRef)
	if !ok {
		return nil, fmt.Errorf("expected field argument in %s()", c.Name)
	}

	// Convert the statement to a simplified substatement for the single field.
	stmt, err := e.stmt.Substatement(ref)
	if err != nil {
		return nil, err
	}

	// Retrieve a list of iterators for the substatement.
	itrs, err := e.tx.CreateIterators(stmt)
	if err != nil {
		return nil, err
	}

	// Retrieve map & reduce functions by name.
	var mapFn MapFunc
	var reduceFn ReduceFunc
	switch strings.ToLower(c.Name) {
	case "count":
		mapFn, reduceFn = MapCount, ReduceSum
	case "sum":
		mapFn, reduceFn = MapSum, ReduceSum
	case "mean":
		mapFn, reduceFn = MapMean, ReduceMean
	case "min":
		mapFn, reduceFn = MapMin, ReduceMin
	case "max":
		mapFn, reduceFn = MapMax, ReduceMax
	case "spread":
		mapFn, reduceFn = MapSpread, ReduceSpread
	case "stddev":
		mapFn, reduceFn = MapStddev, ReduceStddev
	case "first":
		mapFn, reduceFn = MapFirst, ReduceFirst
	case "last":
		mapFn, reduceFn = MapLast, ReduceLast
	case "percentile":
		// The second argument is the percentile level; it parameterizes
		// the reducer while the mapper just echoes values through.
		lit, ok := c.Args[1].(*NumberLiteral)
		if !ok {
			return nil, fmt.Errorf("expected float argument in percentile()")
		}
		mapFn, reduceFn = MapEcho, ReducePercentile(lit.Val)
	default:
		return nil, fmt.Errorf("function not found: %q", c.Name)
	}

	// Create mapper and reducer.
	mappers := make([]*Mapper, len(itrs))
	for i, itr := range itrs {
		mappers[i] = NewMapper(mapFn, itr, e.interval)
	}
	r := NewReducer(reduceFn, mappers)
	// Row name is the unqualified measurement name.
	r.name = lastIdent(stmt.Source.(*Measurement).Name)

	return r, nil
}
|
|
|
|
|
|
|
|
// planBinaryExpr generates a processor for a binary expression.
|
|
|
|
// A binary expression represents a join operator between two processors.
|
2015-01-26 12:19:35 +00:00
|
|
|
func (p *Planner) planBinaryExpr(e *Executor, expr *BinaryExpr) (Processor, error) {
|
2014-12-20 04:36:52 +00:00
|
|
|
// Create processor for LHS.
|
|
|
|
lhs, err := p.planExpr(e, expr.LHS)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("lhs: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create processor for RHS.
|
|
|
|
rhs, err := p.planExpr(e, expr.RHS)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("rhs: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Combine processors.
|
|
|
|
return newBinaryExprEvaluator(e, expr.Op, lhs, rhs), nil
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Executor represents the implementation of Executor.
// It executes all reducers and combines their result into a row.
type Executor struct {
	tx         Tx               // transaction
	stmt       *SelectStatement // original statement
	processors []Processor      // per-field processors
	interval   time.Duration    // group by interval
	tags       []string         // dimensional tag keys
}
|
|
|
|
|
|
|
|
// newExecutor returns an executor associated with a transaction and statement.
|
|
|
|
func newExecutor(tx Tx, stmt *SelectStatement) *Executor {
|
|
|
|
return &Executor{
|
|
|
|
tx: tx,
|
|
|
|
stmt: stmt,
|
|
|
|
}
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Execute begins execution of the query and returns a channel to receive rows.
|
|
|
|
func (e *Executor) Execute() (<-chan *Row, error) {
|
2015-01-26 12:19:35 +00:00
|
|
|
// Open transaction.
|
|
|
|
if err := e.tx.Open(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2014-12-08 05:08:39 +00:00
|
|
|
// Initialize processors.
|
|
|
|
for _, p := range e.processors {
|
2015-01-26 12:19:35 +00:00
|
|
|
p.Process()
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Create output channel and stream data in a separate goroutine.
|
|
|
|
out := make(chan *Row, 0)
|
|
|
|
go e.execute(out)
|
|
|
|
|
|
|
|
return out, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// execute runs in a separate goroutine and streams data from processors.
func (e *Executor) execute(out chan *Row) {
	// Ensure the transaction closes after execution.
	defer e.tx.Close()

	// TODO: Support multi-value rows.

	// Initialize map of rows by encoded tagset.
	rows := make(map[string]*Row)

	// Combine values from each processor.
loop:
	for {
		// Retrieve values from processors and write them to the appropriate
		// row based on their tagset. Exits when any processor's channel closes.
		for i, p := range e.processors {
			// Retrieve data from the processor.
			m, ok := <-p.C()
			if !ok {
				break loop
			}

			// Set values on returned row.
			for k, v := range m {
				// Lookup row values and populate data.
				// Slot 0 of values is the timestamp; field i goes at i+1.
				values := e.createRowValuesIfNotExists(rows, e.processors[0].Name(), k.Timestamp, k.Values)
				values[i+1] = v
			}
		}
	}

	// Normalize rows and values.
	// Convert all times to timestamps
	a := make(Rows, 0, len(rows))
	for _, row := range rows {
		for _, values := range row.Values {
			t := time.Unix(0, values[0].(int64))
			values[0] = t.UTC()
		}
		a = append(a, row)
	}
	sort.Sort(a)

	// Send rows to the channel.
	for _, row := range a {
		out <- row
	}

	// Mark the end of the output channel.
	close(out)
}
|
|
|
|
|
2014-12-18 15:44:21 +00:00
|
|
|
// createRowValuesIfNotExists creates a new value set if one does not already
// exist for a given tagset + timestamp. It returns the value slice for that
// timestamp: index 0 holds the timestamp, the remaining slots hold one value
// per processor.
func (e *Executor) createRowValuesIfNotExists(rows map[string]*Row, name string, timestamp int64, tagset string) []interface{} {
	// TODO: Add "name" to lookup key.

	// Find row by tagset.
	var row *Row
	if row = rows[tagset]; row == nil {
		row = &Row{Name: name}

		// Create tag map, pairing decoded tag values with the executor's
		// dimensional tag keys by position.
		row.Tags = make(map[string]string)
		for i, v := range UnmarshalStrings([]byte(tagset)) {
			row.Tags[e.tags[i]] = v
		}

		// Create column names: "time" first, then one column per field,
		// falling back to "colN" for unnamed fields.
		row.Columns = make([]string, 1, len(e.stmt.Fields)+1)
		row.Columns[0] = "time"
		for i, f := range e.stmt.Fields {
			name := f.Name()
			if name == "" {
				name = fmt.Sprintf("col%d", i)
			}
			row.Columns = append(row.Columns, name)
		}

		// Save to lookup.
		rows[tagset] = row
	}

	// If no values exist or last value doesn't match the timestamp then create new.
	// Only the last entry is checked: timestamps arrive in order per tagset.
	if len(row.Values) == 0 || row.Values[len(row.Values)-1][0] != timestamp {
		values := make([]interface{}, len(e.processors)+1)
		values[0] = timestamp
		row.Values = append(row.Values, values)
	}

	return row.Values[len(row.Values)-1]
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Mapper represents an object for processing iterators.
type Mapper struct {
	fn       MapFunc  // map function
	itr      Iterator // iterator feeding the map function
	interval int64    // grouping interval in nanoseconds; 0 means no grouping
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// NewMapper returns a new instance of Mapper with a given function and interval.
|
2015-01-27 05:08:36 +00:00
|
|
|
func NewMapper(fn MapFunc, itr Iterator, interval time.Duration) *Mapper {
|
2015-01-26 12:19:35 +00:00
|
|
|
return &Mapper{
|
|
|
|
fn: fn,
|
2015-01-27 05:08:36 +00:00
|
|
|
itr: itr,
|
2015-01-26 12:19:35 +00:00
|
|
|
interval: interval.Nanoseconds(),
|
|
|
|
}
|
|
|
|
}
|
2014-12-09 15:45:29 +00:00
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Map executes the mapper's function against the iterator.
|
|
|
|
// Returns a nil emitter if no data was found.
|
|
|
|
func (m *Mapper) Map() *Emitter {
|
|
|
|
e := NewEmitter(1)
|
|
|
|
go m.run(e)
|
|
|
|
return e
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// run drives the map function over the iterator, one grouping interval at a
// time, and closes the emitter when the data is exhausted.
func (m *Mapper) run(e *Emitter) {
	// Close emitter when we're done.
	defer func() { _ = e.Close() }()

	// Wrap iterator with buffer so a value read past an interval boundary
	// can be pushed back for the next interval.
	bufItr := &bufIterator{itr: m.itr}

	// Determine the start time.
	var tmin int64
	if m.interval > 0 {
		// Align start time to interval.
		tmin, _, _ = bufItr.Peek()
		tmin -= (tmin % m.interval)
	}

	for {
		// Set the upper bound of the interval.
		// (With no interval, tmax stays 0 = unbounded and one pass maps everything.)
		if m.interval > 0 {
			bufItr.tmax = tmin + m.interval - 1
		}

		// Exit if there was only one interval or no more data is available.
		if bufItr.EOF() {
			break
		}

		// Execute the map function over the current interval.
		m.fn(bufItr, e, tmin)

		// Move the interval forward.
		tmin += m.interval
	}
}
|
2014-12-08 05:08:39 +00:00
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// bufIterator represents a buffer iterator: it wraps an Iterator with a
// one-element push-back buffer and an upper key bound, so reads can be
// limited to one time interval at a time.
type bufIterator struct {
	itr  Iterator // underlying iterator
	tmax int64    // maximum key; 0 means unbounded

	// buf holds the most recently read triple; returned again when
	// buffered is true (push-back).
	buf struct {
		key   int64
		data  []byte
		value interface{}
	}
	buffered bool
}
|
2014-12-08 05:08:39 +00:00
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Tags returns the encoded dimensional values for the underlying iterator.
func (i *bufIterator) Tags() string { return i.itr.Tags() }
|
2014-12-08 05:08:39 +00:00
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Next returns the next key/value pair from the iterator.
// A pair whose key exceeds tmax is pushed back onto the buffer and a zero
// key is returned instead, acting as an end-of-interval sentinel.
func (i *bufIterator) Next() (key int64, data []byte, value interface{}) {
	// Read the key/value pair off the buffer or underlying iterator.
	if i.buffered {
		i.buffered = false
	} else {
		i.buf.key, i.buf.data, i.buf.value = i.itr.Next()
	}
	key, data, value = i.buf.key, i.buf.data, i.buf.value

	// If key is greater than tmax then put it back on the buffer.
	// tmax == 0 means no upper bound (no grouping interval set).
	if i.tmax != 0 && key > i.tmax {
		i.buffered = true
		return 0, nil, nil
	}

	return key, data, value
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Peek returns the next key/value pair but does not move the iterator
// forward: it reads via Next and immediately marks the value as buffered.
func (i *bufIterator) Peek() (key int64, data []byte, value interface{}) {
	key, data, value = i.Next()
	i.buffered = true
	return
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// EOF returns true if there is no more data in the underlying iterator.
// It relies on the zero-key sentinel: Peek fills the buffer, and a zero key
// means the iterator (or the current interval) is exhausted.
func (i *bufIterator) EOF() bool { i.Peek(); return i.buf.key == 0 }
|
|
|
|
|
|
|
|
// MapFunc represents a function used for mapping iterators. It consumes
// values from the iterator (bounded to one interval) and emits intermediate
// results keyed by the interval start time passed as the int64.
type MapFunc func(Iterator, *Emitter, int64)
|
2014-12-08 05:08:39 +00:00
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// MapCount computes the number of values in an iterator.
|
|
|
|
func MapCount(itr Iterator, e *Emitter, tmin int64) {
|
2014-12-08 05:08:39 +00:00
|
|
|
n := 0
|
2015-02-14 22:12:38 +00:00
|
|
|
for k, _, _ := itr.Next(); k != 0; k, _, _ = itr.Next() {
|
2014-12-08 05:08:39 +00:00
|
|
|
n++
|
|
|
|
}
|
2015-01-26 12:19:35 +00:00
|
|
|
e.Emit(Key{tmin, itr.Tags()}, float64(n))
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// MapSum computes the summation of values in an iterator.
|
|
|
|
func MapSum(itr Iterator, e *Emitter, tmin int64) {
|
2014-12-15 15:34:32 +00:00
|
|
|
n := float64(0)
|
2015-02-14 22:12:38 +00:00
|
|
|
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
2014-12-15 15:34:32 +00:00
|
|
|
n += v.(float64)
|
|
|
|
}
|
2015-01-26 12:19:35 +00:00
|
|
|
e.Emit(Key{tmin, itr.Tags()}, n)
|
2014-12-15 15:34:32 +00:00
|
|
|
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Processor represents an object for joining reducer output.
type Processor interface {
	// Process starts the processor's pipeline.
	Process()
	// Name returns the source name for rows produced by this processor.
	Name() string
	// C returns the processor's output channel.
	C() <-chan map[Key]interface{}
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Reducer represents an object for processing mapper output.
// Implements processor.
type Reducer struct {
	name    string     // source name reported via Name()
	fn      ReduceFunc // reduce function
	mappers []*Mapper  // child mappers

	// c carries the reduced output; wired up inside Reduce().
	c <-chan map[Key]interface{}
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// NewReducer returns a new instance of reducer.
|
|
|
|
func NewReducer(fn ReduceFunc, mappers []*Mapper) *Reducer {
|
|
|
|
return &Reducer{
|
|
|
|
fn: fn,
|
|
|
|
mappers: mappers,
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// C returns the output channel (set when Reduce is started).
func (r *Reducer) C() <-chan map[Key]interface{} { return r.c }
|
|
|
|
|
|
|
|
// Name returns the source name.
func (r *Reducer) Name() string { return r.name }
|
2014-12-08 05:08:39 +00:00
|
|
|
|
2015-02-01 18:47:48 +00:00
|
|
|
// Process processes the Reducer. It satisfies the Processor interface;
// the Emitter returned by Reduce is discarded because r.c is wired up
// inside Reduce itself.
func (r *Reducer) Process() { r.Reduce() }
|
|
|
|
|
|
|
|
// Reduce executes the reducer's function against all output from the mappers.
func (r *Reducer) Reduce() *Emitter {
	// Start every mapper and collect their output channels.
	inputs := make([]<-chan map[Key]interface{}, len(r.mappers))
	for i, m := range r.mappers {
		inputs[i] = m.Map().C()
	}

	// Wire the reducer's output channel before starting the goroutine so
	// C() is valid immediately after this call.
	e := NewEmitter(1)
	r.c = e.C()
	go r.run(e, inputs)
	return e
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// run merges the output of all mapper inputs, groups values that share a
// key, applies the reduce function per key, and closes the emitter when
// every input is exhausted.
func (r *Reducer) run(e *Emitter, inputs []<-chan map[Key]interface{}) {
	// Close emitter when we're done.
	defer func() { _ = e.Close() }()

	// Buffer all the inputs so records can be peeked and pushed back.
	bufInputs := make([]*bufInput, len(inputs))
	for i, input := range inputs {
		bufInputs[i] = &bufInput{c: input}
	}

	// Stream data from the inputs and reduce.
	for {
		// Pick the lowest timestamp available across the inputs; only
		// records at that timestamp are consumed this pass.
		timestamp := int64(0)
		for _, bufInput := range bufInputs {
			rec := bufInput.peek()
			if rec == nil {
				continue
			}
			if timestamp == 0 || rec.Key.Timestamp < timestamp {
				timestamp = rec.Key.Timestamp
			}
		}

		// Gather every value at the chosen timestamp, grouped by full key.
		data := make(map[Key][]interface{})
		for _, bufInput := range bufInputs {
			for {
				rec := bufInput.read()
				if rec == nil {
					break
				}

				// Records for a later timestamp belong to the next pass.
				if rec.Key.Timestamp != timestamp {
					bufInput.unread(rec)
					break
				}

				data[rec.Key] = append(data[rec.Key], rec.Value)
			}
		}

		// No data at all means every input is exhausted.
		if len(data) == 0 {
			break
		}

		// Sort keys for deterministic reduce order (map iteration is random).
		keys := make(keySlice, 0, len(data))
		for k := range data {
			keys = append(keys, k)
		}
		sort.Sort(keys)

		// Reduce each key.
		for _, k := range keys {
			r.fn(k, data[k], e)
		}
	}
}
|
|
|
|
|
|
|
|
// bufInput wraps an input channel with a one-record push-back buffer.
type bufInput struct {
	buf *Record // pushed-back record, returned before reading from c
	c   <-chan map[Key]interface{}
}
|
|
|
|
|
2015-01-27 05:08:36 +00:00
|
|
|
func (i *bufInput) read() *Record {
|
2015-01-26 12:19:35 +00:00
|
|
|
if i.buf != nil {
|
2015-01-27 05:08:36 +00:00
|
|
|
rec := i.buf
|
2015-01-26 12:19:35 +00:00
|
|
|
i.buf = nil
|
2015-01-27 05:08:36 +00:00
|
|
|
return rec
|
2015-01-26 12:19:35 +00:00
|
|
|
}
|
2014-12-08 05:08:39 +00:00
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
m, _ := <-i.c
|
2015-01-27 05:08:36 +00:00
|
|
|
return mapToRecord(m)
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
2015-01-27 05:08:36 +00:00
|
|
|
// unread pushes a record back so the next read returns it first.
func (i *bufInput) unread(rec *Record) { i.buf = rec }
|
2015-01-26 12:19:35 +00:00
|
|
|
|
2015-01-27 05:08:36 +00:00
|
|
|
// peek returns the next record without consuming it (read + unread).
func (i *bufInput) peek() *Record {
	rec := i.read()
	i.unread(rec)
	return rec
}
|
|
|
|
|
|
|
|
// Record is a single key/value pair produced by a mapper.
type Record struct {
	Key   Key
	Value interface{}
}
|
|
|
|
|
|
|
|
func mapToRecord(m map[Key]interface{}) *Record {
|
|
|
|
for k, v := range m {
|
|
|
|
return &Record{k, v}
|
|
|
|
}
|
|
|
|
return nil
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// ReduceFunc represents a function used for reducing mapper output:
// it combines all values collected for a key and emits the final result.
type ReduceFunc func(Key, []interface{}, *Emitter)
|
2014-12-08 05:08:39 +00:00
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// ReduceSum computes the sum of values for each key.
|
|
|
|
func ReduceSum(key Key, values []interface{}, e *Emitter) {
|
2014-12-11 06:32:45 +00:00
|
|
|
var n float64
|
|
|
|
for _, v := range values {
|
|
|
|
n += v.(float64)
|
|
|
|
}
|
2015-01-26 12:19:35 +00:00
|
|
|
e.Emit(key, n)
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
2015-01-26 23:18:24 +00:00
|
|
|
// MapMean computes the count and sum of values in an iterator to be combined by the reducer.
|
|
|
|
func MapMean(itr Iterator, e *Emitter, tmin int64) {
|
|
|
|
out := &meanMapOutput{}
|
|
|
|
|
2015-02-14 22:12:38 +00:00
|
|
|
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
2015-01-26 23:18:24 +00:00
|
|
|
out.Count++
|
|
|
|
out.Sum += v.(float64)
|
|
|
|
}
|
|
|
|
e.Emit(Key{tmin, itr.Tags()}, out)
|
|
|
|
}
|
|
|
|
|
|
|
|
// meanMapOutput is the intermediate result of MapMean: a partial count
// and sum that ReduceMean combines across mappers.
type meanMapOutput struct {
	Count int
	Sum   float64
}
|
|
|
|
|
|
|
|
// ReduceMean computes the mean of values for each key by combining the
// partial counts and sums produced by MapMean.
func ReduceMean(key Key, values []interface{}, e *Emitter) {
	out := &meanMapOutput{}
	for _, v := range values {
		val := v.(*meanMapOutput)
		out.Count += val.Count
		out.Sum += val.Sum
	}
	// NOTE(review): MapMean emits even for empty intervals, so Count can be
	// 0 here, making this 0/0 = NaN — confirm that is intended.
	e.Emit(key, out.Sum/float64(out.Count))
}
|
|
|
|
|
2015-02-10 19:10:37 +00:00
|
|
|
// MapMin collects the minimum value in the interval to pass to the reducer.
func MapMin(itr Iterator, e *Emitter, tmin int64) {
	var min float64
	pointsYielded := false

	for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
		val := v.(float64)
		// Initialize min from the first point; the math.Min below is then
		// a no-op for that point.
		if !pointsYielded {
			min = val
			pointsYielded = true
		}
		min = math.Min(min, val)
	}
	// Emit nothing when the interval held no points.
	if pointsYielded {
		e.Emit(Key{tmin, itr.Tags()}, min)
	}
}
|
|
|
|
|
|
|
|
// ReduceMin computes the min of value.
|
|
|
|
func ReduceMin(key Key, values []interface{}, e *Emitter) {
|
2015-02-11 17:45:44 +00:00
|
|
|
var min float64
|
|
|
|
pointsYielded := false
|
2015-02-11 16:34:59 +00:00
|
|
|
|
2015-02-11 17:45:44 +00:00
|
|
|
for _, v := range values {
|
|
|
|
val := v.(float64)
|
2015-02-11 16:34:59 +00:00
|
|
|
// Initialize min
|
2015-02-11 17:45:44 +00:00
|
|
|
if !pointsYielded {
|
|
|
|
min = val
|
|
|
|
pointsYielded = true
|
2015-02-10 19:10:37 +00:00
|
|
|
}
|
2015-02-11 17:45:44 +00:00
|
|
|
m := math.Min(min, val)
|
|
|
|
min = m
|
|
|
|
}
|
|
|
|
if pointsYielded {
|
|
|
|
e.Emit(key, min)
|
2015-02-10 19:10:37 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-02-10 23:14:22 +00:00
|
|
|
// MapMax collects the maximum value in the interval to pass to the reducer.
func MapMax(itr Iterator, e *Emitter, tmax int64) {
	var max float64
	pointsYielded := false

	for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
		val := v.(float64)
		// Initialize max from the first point; the math.Max below is then
		// a no-op for that point.
		if !pointsYielded {
			max = val
			pointsYielded = true
		}
		max = math.Max(max, val)
	}
	// Emit nothing when the interval held no points.
	if pointsYielded {
		e.Emit(Key{tmax, itr.Tags()}, max)
	}
}
|
|
|
|
|
|
|
|
// ReduceMax computes the max of value.
|
|
|
|
func ReduceMax(key Key, values []interface{}, e *Emitter) {
|
2015-02-11 17:45:44 +00:00
|
|
|
var max float64
|
|
|
|
pointsYielded := false
|
2015-02-11 16:34:59 +00:00
|
|
|
|
2015-02-11 17:45:44 +00:00
|
|
|
for _, v := range values {
|
|
|
|
val := v.(float64)
|
2015-02-11 16:34:59 +00:00
|
|
|
// Initialize max
|
2015-02-11 17:45:44 +00:00
|
|
|
if !pointsYielded {
|
|
|
|
max = val
|
|
|
|
pointsYielded = true
|
2015-02-10 23:14:22 +00:00
|
|
|
}
|
2015-02-11 17:45:44 +00:00
|
|
|
max = math.Max(max, val)
|
|
|
|
}
|
|
|
|
if pointsYielded {
|
|
|
|
e.Emit(key, max)
|
2015-02-10 23:14:22 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-02-12 16:39:41 +00:00
|
|
|
// spreadMapOutput is the intermediate result of MapSpread: per-interval
// bounds that ReduceSpread combines into max - min.
type spreadMapOutput struct {
	Min, Max float64
}
|
|
|
|
|
|
|
|
// MapSpread tracks the min/max of values in the interval to pass to the reducer.
func MapSpread(itr Iterator, e *Emitter, tmax int64) {
	var out spreadMapOutput
	pointsYielded := false

	for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
		val := v.(float64)
		// Initialize both bounds from the first point.
		if !pointsYielded {
			out.Max = val
			out.Min = val
			pointsYielded = true
		}
		out.Max = math.Max(out.Max, val)
		out.Min = math.Min(out.Min, val)
	}
	// Emit nothing when the interval held no points.
	if pointsYielded {
		e.Emit(Key{tmax, itr.Tags()}, out)
	}
}
|
|
|
|
|
2015-02-12 16:56:39 +00:00
|
|
|
// ReduceSpread computes the spread of values.
|
2015-02-12 16:39:41 +00:00
|
|
|
func ReduceSpread(key Key, values []interface{}, e *Emitter) {
|
|
|
|
var result spreadMapOutput
|
|
|
|
pointsYielded := false
|
|
|
|
|
|
|
|
for _, v := range values {
|
|
|
|
val := v.(spreadMapOutput)
|
|
|
|
// Initialize
|
|
|
|
if !pointsYielded {
|
2015-02-12 22:12:14 +00:00
|
|
|
result.Max = val.Max
|
|
|
|
result.Min = val.Min
|
2015-02-12 16:39:41 +00:00
|
|
|
pointsYielded = true
|
|
|
|
}
|
2015-02-12 22:12:14 +00:00
|
|
|
result.Max = math.Max(result.Max, val.Max)
|
|
|
|
result.Min = math.Min(result.Min, val.Min)
|
2015-02-12 16:39:41 +00:00
|
|
|
}
|
|
|
|
if pointsYielded {
|
2015-02-12 22:12:14 +00:00
|
|
|
e.Emit(key, result.Max-result.Min)
|
2015-02-12 16:39:41 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-02-11 17:16:46 +00:00
|
|
|
// MapStddev collects the values to pass to the reducer
|
|
|
|
func MapStddev(itr Iterator, e *Emitter, tmax int64) {
|
|
|
|
var values []float64
|
|
|
|
|
2015-02-14 22:12:38 +00:00
|
|
|
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
2015-02-11 17:16:46 +00:00
|
|
|
values = append(values, v.(float64))
|
2015-02-11 23:03:01 +00:00
|
|
|
// Emit in batches.
|
|
|
|
// unbounded emission of data can lead to excessive memory use
|
|
|
|
// or other potential performance problems.
|
2015-02-11 22:11:45 +00:00
|
|
|
if len(values) == emitBatchSize {
|
2015-02-11 22:00:39 +00:00
|
|
|
e.Emit(Key{tmax, itr.Tags()}, values)
|
|
|
|
values = []float64{}
|
|
|
|
}
|
2015-02-11 17:16:46 +00:00
|
|
|
}
|
2015-02-11 19:37:14 +00:00
|
|
|
if len(values) > 0 {
|
|
|
|
e.Emit(Key{tmax, itr.Tags()}, values)
|
|
|
|
}
|
2015-02-11 17:16:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ReduceStddev computes the stddev of values.
|
|
|
|
func ReduceStddev(key Key, values []interface{}, e *Emitter) {
|
|
|
|
var data []float64
|
|
|
|
// Collect all the data points
|
|
|
|
for _, value := range values {
|
|
|
|
data = append(data, value.([]float64)...)
|
|
|
|
}
|
2015-02-11 19:37:14 +00:00
|
|
|
// If no data, leave
|
|
|
|
if len(data) == 0 {
|
|
|
|
return
|
|
|
|
}
|
2015-02-11 19:33:10 +00:00
|
|
|
// If we only have one data point, the std dev is undefined
|
|
|
|
if len(data) == 1 {
|
|
|
|
e.Emit(key, "undefined")
|
|
|
|
return
|
|
|
|
}
|
2015-02-11 17:16:46 +00:00
|
|
|
// Get the sum
|
|
|
|
var sum float64
|
|
|
|
for _, v := range data {
|
|
|
|
sum += v
|
|
|
|
}
|
|
|
|
// Get the mean
|
|
|
|
mean := sum / float64(len(data))
|
|
|
|
// Get the variance
|
|
|
|
var variance float64
|
|
|
|
for _, v := range data {
|
|
|
|
dif := v - mean
|
|
|
|
sq := math.Pow(dif, 2)
|
|
|
|
variance += sq
|
|
|
|
}
|
2015-02-11 19:33:10 +00:00
|
|
|
variance = variance / float64(len(data)-1)
|
2015-02-11 17:16:46 +00:00
|
|
|
stddev := math.Sqrt(variance)
|
|
|
|
|
|
|
|
e.Emit(key, stddev)
|
|
|
|
}
|
|
|
|
|
2015-02-11 22:55:51 +00:00
|
|
|
// firstLastMapOutput carries a candidate point between the first/last
// mappers and their reducers.
type firstLastMapOutput struct {
	Time int64       // timestamp of the candidate point
	Val  interface{} // value recorded at Time
}
|
|
|
|
|
|
|
|
// MapFirst collects the values to pass to the reducer
|
|
|
|
func MapFirst(itr Iterator, e *Emitter, tmax int64) {
|
|
|
|
out := firstLastMapOutput{}
|
|
|
|
pointsYielded := false
|
|
|
|
|
2015-02-14 22:12:38 +00:00
|
|
|
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
2015-02-11 22:55:51 +00:00
|
|
|
// Initialize first
|
|
|
|
if !pointsYielded {
|
2015-02-12 22:12:14 +00:00
|
|
|
out.Time = k
|
|
|
|
out.Val = v
|
2015-02-11 22:55:51 +00:00
|
|
|
pointsYielded = true
|
|
|
|
}
|
2015-02-12 22:12:14 +00:00
|
|
|
if k < out.Time {
|
|
|
|
out.Time = k
|
|
|
|
out.Val = v
|
2015-02-11 22:55:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if pointsYielded {
|
|
|
|
e.Emit(Key{tmax, itr.Tags()}, out)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReduceFirst computes the first of value.
|
|
|
|
func ReduceFirst(key Key, values []interface{}, e *Emitter) {
|
|
|
|
out := firstLastMapOutput{}
|
|
|
|
pointsYielded := false
|
|
|
|
|
|
|
|
for _, v := range values {
|
|
|
|
val := v.(firstLastMapOutput)
|
|
|
|
// Initialize first
|
|
|
|
if !pointsYielded {
|
2015-02-12 22:12:14 +00:00
|
|
|
out.Time = val.Time
|
|
|
|
out.Val = val.Val
|
2015-02-11 22:55:51 +00:00
|
|
|
pointsYielded = true
|
|
|
|
}
|
2015-02-12 22:12:14 +00:00
|
|
|
if val.Time < out.Time {
|
|
|
|
out.Time = val.Time
|
|
|
|
out.Val = val.Val
|
2015-02-11 22:55:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if pointsYielded {
|
2015-02-12 22:12:14 +00:00
|
|
|
e.Emit(key, out.Val)
|
2015-02-11 22:55:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// MapLast collects the values to pass to the reducer
|
|
|
|
func MapLast(itr Iterator, e *Emitter, tmax int64) {
|
|
|
|
out := firstLastMapOutput{}
|
|
|
|
pointsYielded := false
|
|
|
|
|
2015-02-14 22:12:38 +00:00
|
|
|
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
2015-02-11 22:55:51 +00:00
|
|
|
// Initialize last
|
|
|
|
if !pointsYielded {
|
2015-02-12 22:12:14 +00:00
|
|
|
out.Time = k
|
|
|
|
out.Val = v
|
2015-02-11 22:55:51 +00:00
|
|
|
pointsYielded = true
|
|
|
|
}
|
2015-02-12 22:12:14 +00:00
|
|
|
if k > out.Time {
|
|
|
|
out.Time = k
|
|
|
|
out.Val = v
|
2015-02-11 22:55:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if pointsYielded {
|
|
|
|
e.Emit(Key{tmax, itr.Tags()}, out)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReduceLast computes the last of value.
|
|
|
|
func ReduceLast(key Key, values []interface{}, e *Emitter) {
|
|
|
|
out := firstLastMapOutput{}
|
|
|
|
pointsYielded := false
|
|
|
|
|
|
|
|
for _, v := range values {
|
|
|
|
val := v.(firstLastMapOutput)
|
|
|
|
// Initialize last
|
|
|
|
if !pointsYielded {
|
2015-02-12 22:12:14 +00:00
|
|
|
out.Time = val.Time
|
|
|
|
out.Val = val.Val
|
2015-02-11 22:55:51 +00:00
|
|
|
pointsYielded = true
|
|
|
|
}
|
2015-02-12 22:12:14 +00:00
|
|
|
if val.Time > out.Time {
|
|
|
|
out.Time = val.Time
|
|
|
|
out.Val = val.Val
|
2015-02-11 22:55:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if pointsYielded {
|
2015-02-12 22:12:14 +00:00
|
|
|
e.Emit(key, out.Val)
|
2015-02-11 22:55:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-01-27 00:42:29 +00:00
|
|
|
// MapEcho emits the data points for each group by interval
|
|
|
|
func MapEcho(itr Iterator, e *Emitter, tmin int64) {
|
|
|
|
var values []interface{}
|
|
|
|
|
2015-02-14 22:12:38 +00:00
|
|
|
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
2015-01-27 00:42:29 +00:00
|
|
|
values = append(values, v)
|
|
|
|
}
|
|
|
|
e.Emit(Key{tmin, itr.Tags()}, values)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReducePercentile computes the percentile of values for each key.
|
|
|
|
func ReducePercentile(percentile float64) ReduceFunc {
|
|
|
|
return func(key Key, values []interface{}, e *Emitter) {
|
|
|
|
var allValues []float64
|
|
|
|
|
|
|
|
for _, v := range values {
|
|
|
|
vals := v.([]interface{})
|
|
|
|
for _, v := range vals {
|
|
|
|
allValues = append(allValues, v.(float64))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Float64s(allValues)
|
|
|
|
length := len(allValues)
|
|
|
|
index := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
|
|
|
|
|
|
|
|
if index < 0 || index >= len(allValues) {
|
|
|
|
e.Emit(key, 0.0)
|
|
|
|
}
|
|
|
|
|
|
|
|
e.Emit(key, allValues[index])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-01-27 05:08:36 +00:00
|
|
|
func MapRawQuery(itr Iterator, e *Emitter, tmin int64) {
|
2015-02-14 22:12:38 +00:00
|
|
|
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
2015-01-27 05:08:36 +00:00
|
|
|
e.Emit(Key{k, itr.Tags()}, v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// rawQueryMapOutput pairs a timestamp with its raw value.
// NOTE(review): not referenced by the visible raw-query map/reduce code;
// confirm it is used elsewhere before removing.
type rawQueryMapOutput struct {
	timestamp int64
	value     interface{}
}
|
|
|
|
|
|
|
|
func ReduceRawQuery(key Key, values []interface{}, e *Emitter) {
|
|
|
|
for _, v := range values {
|
|
|
|
e.Emit(key, v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-12-20 04:36:52 +00:00
|
|
|
// binaryExprEvaluator represents a processor for combining two processors.
type binaryExprEvaluator struct {
	executor *Executor // parent executor
	lhs, rhs Processor // processors
	op       Token     // operation

	// c streams the merged result maps produced by run().
	c chan map[Key]interface{}
}
|
|
|
|
|
|
|
|
// newBinaryExprEvaluator returns a new instance of binaryExprEvaluator.
|
2015-01-26 12:19:35 +00:00
|
|
|
func newBinaryExprEvaluator(e *Executor, op Token, lhs, rhs Processor) *binaryExprEvaluator {
|
2014-12-20 04:36:52 +00:00
|
|
|
return &binaryExprEvaluator{
|
|
|
|
executor: e,
|
|
|
|
op: op,
|
|
|
|
lhs: lhs,
|
|
|
|
rhs: rhs,
|
2015-01-26 12:19:35 +00:00
|
|
|
c: make(chan map[Key]interface{}, 0),
|
2014-12-20 04:36:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Process begins streaming values from the lhs/rhs processors
|
|
|
|
func (e *binaryExprEvaluator) Process() {
|
|
|
|
e.lhs.Process()
|
|
|
|
e.rhs.Process()
|
2014-12-20 04:36:52 +00:00
|
|
|
go e.run()
|
|
|
|
}
|
|
|
|
|
|
|
|
// C returns the streaming data channel.
|
2015-01-26 12:19:35 +00:00
|
|
|
func (e *binaryExprEvaluator) C() <-chan map[Key]interface{} { return e.c }
|
2014-12-20 04:36:52 +00:00
|
|
|
|
|
|
|
// name returns the source name.
|
2015-01-26 12:19:35 +00:00
|
|
|
func (e *binaryExprEvaluator) Name() string { return "" }
|
2014-12-20 04:36:52 +00:00
|
|
|
|
|
|
|
// run runs the processor loop to read subprocessor output and combine it.
|
|
|
|
func (e *binaryExprEvaluator) run() {
|
|
|
|
for {
|
|
|
|
// Read LHS value.
|
|
|
|
lhs, ok := <-e.lhs.C()
|
|
|
|
if !ok {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read RHS value.
|
|
|
|
rhs, ok := <-e.rhs.C()
|
|
|
|
if !ok {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
// Merge maps.
|
2015-01-26 12:19:35 +00:00
|
|
|
m := make(map[Key]interface{})
|
2014-12-20 04:36:52 +00:00
|
|
|
for k, v := range lhs {
|
|
|
|
m[k] = e.eval(v, rhs[k])
|
|
|
|
}
|
|
|
|
for k, v := range rhs {
|
|
|
|
// Skip value if already processed in lhs loop.
|
|
|
|
if _, ok := m[k]; ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
m[k] = e.eval(float64(0), v)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Return value.
|
|
|
|
e.c <- m
|
|
|
|
}
|
|
|
|
|
|
|
|
// Mark the channel as complete.
|
|
|
|
close(e.c)
|
|
|
|
}
|
|
|
|
|
|
|
|
// eval evaluates two values using the evaluator's operation.
|
|
|
|
func (e *binaryExprEvaluator) eval(lhs, rhs interface{}) interface{} {
|
|
|
|
switch e.op {
|
|
|
|
case ADD:
|
|
|
|
return lhs.(float64) + rhs.(float64)
|
|
|
|
case SUB:
|
|
|
|
return lhs.(float64) - rhs.(float64)
|
|
|
|
case MUL:
|
|
|
|
return lhs.(float64) * rhs.(float64)
|
|
|
|
case DIV:
|
|
|
|
rhs := rhs.(float64)
|
|
|
|
if rhs == 0 {
|
|
|
|
return float64(0)
|
|
|
|
}
|
|
|
|
return lhs.(float64) / rhs
|
|
|
|
default:
|
|
|
|
// TODO: Validate operation & data types.
|
|
|
|
panic("invalid operation: " + e.op.String())
|
|
|
|
}
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// literalProcessor represents a processor that continually sends a literal value.
type literalProcessor struct {
	val  interface{}              // the literal value to emit on every send
	c    chan map[Key]interface{} // output stream of {zero Key: val} maps
	done chan chan struct{}       // stop handshake; see stop() and syncClose
}
|
|
|
|
|
|
|
|
// newLiteralProcessor returns a literalProcessor for a given value.
|
|
|
|
func newLiteralProcessor(val interface{}) *literalProcessor {
|
|
|
|
return &literalProcessor{
|
|
|
|
val: val,
|
2015-01-26 12:19:35 +00:00
|
|
|
c: make(chan map[Key]interface{}, 0),
|
2014-12-08 05:08:39 +00:00
|
|
|
done: make(chan chan struct{}, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// C returns the streaming data channel.
|
2015-01-26 12:19:35 +00:00
|
|
|
func (p *literalProcessor) C() <-chan map[Key]interface{} { return p.c }
|
2014-12-08 05:08:39 +00:00
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Process continually returns a literal value with a "0" key.
|
|
|
|
func (p *literalProcessor) Process() { go p.run() }
|
2014-12-08 05:08:39 +00:00
|
|
|
|
|
|
|
// run executes the processor loop.
|
|
|
|
func (p *literalProcessor) run() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case ch := <-p.done:
|
|
|
|
close(ch)
|
|
|
|
return
|
2015-01-26 12:19:35 +00:00
|
|
|
case p.c <- map[Key]interface{}{Key{}: p.val}:
|
2014-12-08 05:08:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-12-09 15:45:29 +00:00
|
|
|
// stop stops the processor from sending values.
|
|
|
|
func (p *literalProcessor) stop() { syncClose(p.done) }
|
|
|
|
|
|
|
|
// name returns the source name.
|
2015-01-26 12:19:35 +00:00
|
|
|
func (p *literalProcessor) Name() string { return "" }
|
2014-12-08 05:08:39 +00:00
|
|
|
|
|
|
|
// syncClose performs a synchronous shutdown handshake: it sends an ack
// channel over done and blocks until the receiver closes it.
func syncClose(done chan chan struct{}) {
	ack := make(chan struct{}, 0)
	done <- ack
	<-ack
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// Key represents a key returned by a Mapper or Reducer.
type Key struct {
	Timestamp int64  // point timestamp (primary sort key)
	Values    string // encoded dimensional tag values (secondary sort key)
}

// keySlice sorts keys by timestamp, breaking ties on the encoded tag values.
type keySlice []Key

func (p keySlice) Len() int { return len(p) }

// Less orders keys by Timestamp first and by Values only on a tie.
// BUG FIX: the previous `tsLess || valuesLess` expression was not a strict
// weak ordering — Less(a,b) and Less(b,a) could both be true, which makes
// sort.Sort's behavior undefined.
func (p keySlice) Less(i, j int) bool {
	if p[i].Timestamp != p[j].Timestamp {
		return p[i].Timestamp < p[j].Timestamp
	}
	return p[i].Values < p[j].Values
}

func (p keySlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
|
|
|
|
|
|
|
// Emitter provides buffered emit/flush of key/value pairs.
type Emitter struct {
	// c carries one single-entry map per Emit call; its buffer size is
	// set by NewEmitter.
	c chan map[Key]interface{}
}
|
|
|
|
|
|
|
|
// NewEmitter returns a new instance of Emitter with a buffer size of n.
|
|
|
|
func NewEmitter(n int) *Emitter {
|
|
|
|
return &Emitter{
|
|
|
|
c: make(chan map[Key]interface{}, n),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the emitter's output channel.
|
|
|
|
func (e *Emitter) Close() error { close(e.c); return nil }
|
|
|
|
|
|
|
|
// C returns the emitter's output channel.
|
|
|
|
func (e *Emitter) C() <-chan map[Key]interface{} { return e.c }
|
|
|
|
|
|
|
|
// Emit sets a key and value on the emitter's bufferred data.
|
|
|
|
func (e *Emitter) Emit(key Key, value interface{}) { e.c <- map[Key]interface{}{key: value} }
|
|
|
|
|
2014-12-08 05:08:39 +00:00
|
|
|
// Row represents a single row returned from the execution of a statement.
type Row struct {
	Name    string            `json:"name,omitempty"`
	Tags    map[string]string `json:"tags,omitempty"` // tag key/value pairs; hashed by tagsHash for stable sorting
	Columns []string          `json:"columns"`        // always serialized, even when empty
	Values  [][]interface{}   `json:"values,omitempty"`
	Err     error             `json:"err,omitempty"`
}
|
|
|
|
|
2014-12-18 15:44:21 +00:00
|
|
|
// tagsHash returns a hash of tag key/value pairs.
|
|
|
|
func (r *Row) tagsHash() uint64 {
|
|
|
|
h := fnv.New64a()
|
|
|
|
keys := r.tagsKeys()
|
|
|
|
for _, k := range keys {
|
|
|
|
h.Write([]byte(k))
|
|
|
|
h.Write([]byte(r.Tags[k]))
|
|
|
|
}
|
|
|
|
return h.Sum64()
|
|
|
|
}
|
|
|
|
|
|
|
|
// tagKeys returns a sorted list of tag keys.
|
|
|
|
func (r *Row) tagsKeys() []string {
|
|
|
|
a := make([]string, len(r.Tags))
|
|
|
|
for k := range r.Tags {
|
|
|
|
a = append(a, k)
|
|
|
|
}
|
|
|
|
sort.Strings(a)
|
|
|
|
return a
|
|
|
|
}
|
|
|
|
|
|
|
|
// Rows represents a list of rows that can be sorted consistently by name/tag.
|
|
|
|
type Rows []*Row
|
|
|
|
|
|
|
|
func (p Rows) Len() int { return len(p) }
|
|
|
|
|
|
|
|
func (p Rows) Less(i, j int) bool {
|
|
|
|
// Sort by name first.
|
|
|
|
if p[i].Name != p[j].Name {
|
|
|
|
return p[i].Name < p[j].Name
|
|
|
|
}
|
|
|
|
|
2014-12-21 17:05:15 +00:00
|
|
|
// Sort by tag set hash. Tags don't have a meaningful sort order so we
|
|
|
|
// just compute a hash and sort by that instead. This allows the tests
|
|
|
|
// to receive rows in a predictable order every time.
|
2014-12-18 15:44:21 +00:00
|
|
|
return p[i].tagsHash() < p[j].tagsHash()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// MarshalStrings encodes an array of strings into a byte slice.
// Each entry is a big-endian uint16 length prefix followed by the raw bytes.
func MarshalStrings(a []string) (ret []byte) {
	for _, s := range a {
		// Length prefix.
		var size [2]byte
		binary.BigEndian.PutUint16(size[:], uint16(len(s)))
		ret = append(ret, size[:]...)

		// Payload.
		ret = append(ret, s...)
	}
	return
}
|
|
|
|
|
2015-01-26 12:19:35 +00:00
|
|
|
// UnmarshalStrings decodes a byte slice into an array of strings.
// Input format: repeated records of a big-endian uint16 length followed
// by that many bytes (see MarshalStrings).
func UnmarshalStrings(b []byte) (ret []string) {
	for len(b) > 0 {
		// Decode size + data, then advance past this record.
		n := binary.BigEndian.Uint16(b[0:2])
		ret = append(ret, string(b[2:n+2]))
		b = b[n+2:]
	}
	return
}
|