package influxql import ( "fmt" "time" "github.com/influxdata/influxdb/models" ) // Emitter groups values together by name, type Emitter struct { buf []Point itrs []Iterator ascending bool tags Tags row *models.Row // The columns to attach to each row. Columns []string // Removes the "time" column from output. // Used for meta queries where time does not apply. OmitTime bool } // NewEmitter returns a new instance of Emitter that pulls from itrs. func NewEmitter(itrs []Iterator, ascending bool) *Emitter { return &Emitter{ buf: make([]Point, len(itrs)), itrs: itrs, ascending: ascending, } } // Close closes the underlying iterators. func (e *Emitter) Close() error { return Iterators(e.itrs).Close() } // Emit returns the next row from the iterators. func (e *Emitter) Emit() *models.Row { // Immediately end emission if there are no iterators. if len(e.itrs) == 0 { return nil } // Continually read from iterators until they are exhausted. for { // Fill buffer. Return row if no more points remain. t, name, tags := e.loadBuf() if t == ZeroTime { row := e.row e.row = nil return row } // Read next set of values from all iterators at a given time/name/tags. // If no values are returned then return row. values := e.readAt(t, name, tags) if values == nil { row := e.row e.row = nil return row } // If there's no row yet then create one. // If the name and tags match the existing row, append to that row. // Otherwise return existing row and add values to next emitted row. if e.row == nil { e.createRow(name, tags, values) } else if e.row.Name == name && e.tags.Equals(&tags) { e.row.Values = append(e.row.Values, values) } else { row := e.row e.createRow(name, tags, values) return row } } } // loadBuf reads in points into empty buffer slots. // Returns the next time/name/tags to emit for. func (e *Emitter) loadBuf() (t int64, name string, tags Tags) { t = ZeroTime for i := range e.itrs { // Load buffer, if empty. if e.buf[i] == nil { e.buf[i] = e.readIterator(e.itrs[i]) } // Skip if buffer is empty. p := e.buf[i] if p == nil { continue } itrTime, itrName, itrTags := p.time(), p.name(), p.tags() // Initialize range values if not set. if t == ZeroTime { t, name, tags = itrTime, itrName, itrTags continue } // Update range values if lower and emitter is in time ascending order. if e.ascending { if (itrTime < t) || (itrTime == t && itrName < name) || (itrTime == t && itrName == name && itrTags.ID() < tags.ID()) { t, name, tags = itrTime, itrName, itrTags } continue } // Update range values if higher and emitter is in time descending order. if (itrTime > t) || (itrTime == t && itrName > name) || (itrTime == t && itrName == name && itrTags.ID() > tags.ID()) { t, name, tags = itrTime, itrName, itrTags } } return } // createRow creates a new row attached to the emitter. func (e *Emitter) createRow(name string, tags Tags, values []interface{}) { e.tags = tags e.row = &models.Row{ Name: name, Tags: tags.KeyValues(), Columns: e.Columns, Values: [][]interface{}{values}, } } // readAt returns the next slice of values from the iterators at time/name/tags. // Returns nil values once the iterators are exhausted. func (e *Emitter) readAt(t int64, name string, tags Tags) []interface{} { // If time is included then move colums over by one. offset := 1 if e.OmitTime { offset = 0 } values := make([]interface{}, len(e.itrs)+offset) if !e.OmitTime { values[0] = time.Unix(0, t).UTC() } for i, p := range e.buf { // Skip if buffer is empty. if p == nil { values[i+offset] = nil continue } // Skip point if it doesn't match time/name/tags. pTags := p.tags() if p.time() != t || p.name() != name || !pTags.Equals(&tags) { values[i+offset] = nil continue } // Read point value. values[i+offset] = p.value() // Clear buffer. e.buf[i] = nil } return values } // readIterator reads the next point from itr. func (e *Emitter) readIterator(itr Iterator) Point { if itr == nil { return nil } switch itr := itr.(type) { case FloatIterator: if p := itr.Next(); p != nil { return p } case IntegerIterator: if p := itr.Next(); p != nil { return p } case StringIterator: if p := itr.Next(); p != nil { return p } case BooleanIterator: if p := itr.Next(); p != nil { return p } default: panic(fmt.Sprintf("unsupported iterator: %T", itr)) } return nil }