Add tsi1 series system iterator.

pull/7913/head
Ben Johnson 2016-11-28 09:59:36 -07:00
parent 87f4e0ec0a
commit e7940cc556
No known key found for this signature in database
GPG Key ID: 81741CD251883081
8 changed files with 362 additions and 202 deletions

View File

@ -55,6 +55,9 @@ type Engine interface {
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
MeasurementFields(measurement string) *MeasurementFields MeasurementFields(measurement string) *MeasurementFields
// InfluxQL system iterators
SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
// Statistics will return statistics relevant to this engine. // Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time LastModified() time.Time

View File

@ -101,9 +101,8 @@ type Engine struct {
traceLogger zap.Logger // Logger to be used when trace-logging is on. traceLogger zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool traceLogging bool
index tsdb.Index index tsdb.Index
fieldsMu sync.RWMutex fieldset *tsdb.MeasurementFieldSet
measurementFields map[string]*tsdb.MeasurementFields
WAL *WAL WAL *WAL
Cache *Cache Cache *Cache
@ -149,7 +148,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb.
logOutput: os.Stderr, logOutput: os.Stderr,
traceLogging: opt.Config.TraceLoggingEnabled, traceLogging: opt.Config.TraceLoggingEnabled,
measurementFields: make(map[string]*tsdb.MeasurementFields), fieldset: tsdb.NewMeasurementFieldSet(),
WAL: w, WAL: w,
Cache: cache, Cache: cache,
@ -167,6 +166,9 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb.
stats: &EngineStatistics{}, stats: &EngineStatistics{},
} }
// Attach fieldset to index.
e.index.SetFieldSet(e.fieldset)
if e.traceLogging { if e.traceLogging {
fs.enableTraceLogging(true) fs.enableTraceLogging(true)
w.enableTraceLogging(true) w.enableTraceLogging(true)
@ -302,22 +304,7 @@ func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
// MeasurementFields returns the measurement fields for a measurement. // MeasurementFields returns the measurement fields for a measurement.
func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields { func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields {
e.fieldsMu.RLock() return e.fieldset.CreateFieldsIfNotExists(measurement)
m := e.measurementFields[measurement]
e.fieldsMu.RUnlock()
if m != nil {
return m
}
e.fieldsMu.Lock()
m = e.measurementFields[measurement]
if m == nil {
m = tsdb.NewMeasurementFields()
e.measurementFields[measurement] = m
}
e.fieldsMu.Unlock()
return m
} }
func (e *Engine) SeriesN() (uint64, error) { func (e *Engine) SeriesN() (uint64, error) {
@ -655,14 +642,7 @@ func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType, inde
seriesKey, field := SeriesAndFieldFromCompositeKey(key) seriesKey, field := SeriesAndFieldFromCompositeKey(key)
name := tsdb.MeasurementFromSeriesKey(string(seriesKey)) name := tsdb.MeasurementFromSeriesKey(string(seriesKey))
e.fieldsMu.Lock() mf := e.fieldset.CreateFieldsIfNotExists(name)
mf := e.measurementFields[name]
if mf == nil {
mf = tsdb.NewMeasurementFields()
e.measurementFields[name] = mf
}
e.fieldsMu.Unlock()
if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil { if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
return err return err
} }
@ -848,9 +828,7 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// DeleteMeasurement deletes a measurement and all related series. // DeleteMeasurement deletes a measurement and all related series.
func (e *Engine) DeleteMeasurement(name []byte) error { func (e *Engine) DeleteMeasurement(name []byte) error {
e.mu.Lock() e.fieldset.Delete(string(name))
delete(e.measurementFields, string(name))
e.mu.Unlock()
// Attempt to find the series keys. // Attempt to find the series keys.
m, err := e.Measurement(name) m, err := e.Measurement(name)
@ -1335,18 +1313,8 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo
var itrs []influxql.Iterator var itrs []influxql.Iterator
if err := func() error { if err := func() error {
for _, name := range influxql.Sources(opt.Sources).Names() { for _, name := range influxql.Sources(opt.Sources).Names() {
// Retrieve measurement fields.
e.mu.Lock()
mf := e.measurementFields[name]
e.mu.Unlock()
// Skip if there are no fields.
if mf == nil {
continue
}
// Generate tag sets from index. // Generate tag sets from index.
tagSets, err := e.index.TagSets([]byte(name), opt.Dimensions, opt.Condition, mf) tagSets, err := e.index.TagSets([]byte(name), opt.Dimensions, opt.Condition)
if err != nil { if err != nil {
return err return err
} }
@ -1594,10 +1562,7 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, s
// buildCursor creates an untyped cursor for a field. // buildCursor creates an untyped cursor for a field.
func (e *Engine) buildCursor(measurement, seriesKey string, ref *influxql.VarRef, opt influxql.IteratorOptions) cursor { func (e *Engine) buildCursor(measurement, seriesKey string, ref *influxql.VarRef, opt influxql.IteratorOptions) cursor {
// Look up fields for measurement. // Look up fields for measurement.
e.fieldsMu.RLock() mf := e.fieldset.Fields(measurement)
mf := e.measurementFields[measurement]
e.fieldsMu.RUnlock()
if mf == nil { if mf == nil {
return nil return nil
} }
@ -1671,6 +1636,10 @@ func (e *Engine) buildBooleanCursor(measurement, seriesKey, field string, opt in
return newBooleanCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor) return newBooleanCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
} }
func (e *Engine) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return e.index.SeriesPointIterator(opt)
}
// SeriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID. // SeriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID.
func SeriesFieldKey(seriesKey, field string) string { func SeriesFieldKey(seriesKey, field string) string {
return seriesKey + keyFieldSeparator + field return seriesKey + keyFieldSeparator + field

View File

@ -31,7 +31,13 @@ type Index interface {
Dereference(b []byte) Dereference(b []byte)
TagSets(name []byte, dimensions []string, condition influxql.Expr, mf *MeasurementFields) ([]*influxql.TagSet, error) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
// InfluxQL system iterators
SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
// Sets a shared fieldset from the engine.
SetFieldSet(fs *MeasurementFieldSet)
// To be removed w/ tsi1. // To be removed w/ tsi1.
SetFieldName(measurement, name string) SetFieldName(measurement, name string)

View File

@ -493,7 +493,7 @@ func (i *Index) Dereference(b []byte) {
} }
// TagSets returns a list of tag sets. // TagSets returns a list of tag sets.
func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr, mf *tsdb.MeasurementFields) ([]*influxql.TagSet, error) { func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
i.mu.RLock() i.mu.RLock()
defer i.mu.RUnlock() defer i.mu.RUnlock()
@ -520,8 +520,102 @@ func (i *Index) SeriesKeys() []string {
return s return s
} }
// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(*tsdb.MeasurementFieldSet) {}
// SetFieldName adds a field name to a measurement. // SetFieldName adds a field name to a measurement.
func (i *Index) SetFieldName(measurement, name string) { func (i *Index) SetFieldName(measurement, name string) {
m := i.CreateMeasurementIndexIfNotExists(measurement) m := i.CreateMeasurementIndexIfNotExists(measurement)
m.SetFieldName(name) m.SetFieldName(name)
} }
// SeriesPointIterator returns an influxql iterator over all series.
func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
// Read and sort all measurements.
mms := make(tsdb.Measurements, 0, len(i.measurements))
for _, mm := range i.measurements {
mms = append(mms, mm)
}
sort.Sort(mms)
return &seriesPointIterator{
mms: mms,
point: influxql.FloatPoint{
Aux: make([]interface{}, len(opt.Aux)),
},
opt: opt,
}, nil
}
// seriesPointIterator emits series as influxql points.
type seriesPointIterator struct {
mms tsdb.Measurements
keys struct {
buf []string
i int
}
point influxql.FloatPoint // reusable point
opt influxql.IteratorOptions
}
// Stats returns stats about the points processed.
func (itr *seriesPointIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} }
// Close closes the iterator.
func (itr *seriesPointIterator) Close() error { return nil }
// Next emits the next point in the iterator.
func (itr *seriesPointIterator) Next() (*influxql.FloatPoint, error) {
for {
// Load next measurement's keys if there are no more remaining.
if itr.keys.i >= len(itr.keys.buf) {
if err := itr.nextKeys(); err != nil {
return nil, err
}
if len(itr.keys.buf) == 0 {
return nil, nil
}
}
// Read the next key.
key := itr.keys.buf[itr.keys.i]
itr.keys.i++
// Write auxiliary fields.
for i, f := range itr.opt.Aux {
switch f.Val {
case "key":
itr.point.Aux[i] = key
}
}
return &itr.point, nil
}
}
// nextKeys reads all keys for the next measurement.
func (itr *seriesPointIterator) nextKeys() error {
for {
// Ensure previous keys are cleared out.
itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0]
// Read next measurement.
if len(itr.mms) == 0 {
return nil
}
mm := itr.mms[0]
itr.mms = itr.mms[1:]
// Read all series keys.
ids, err := mm.SeriesIDsAllOrByExpr(itr.opt.Condition)
if err != nil {
return err
} else if len(ids) == 0 {
continue
}
itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids)
sort.Strings(itr.keys.buf)
return nil
}
}

View File

@ -37,6 +37,10 @@ type Index struct {
mu sync.RWMutex mu sync.RWMutex
logFiles []*LogFile logFiles []*LogFile
indexFiles IndexFiles indexFiles IndexFiles
// Fieldset shared with engine.
// TODO: Move field management into index.
fieldset *tsdb.MeasurementFieldSet
} }
// Open opens the index. // Open opens the index.
@ -124,6 +128,13 @@ func (i *Index) Close() error {
return nil return nil
} }
// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
i.mu.Lock()
i.fieldset = fs
i.mu.Unlock()
}
// SetLogFiles explicitly sets log files. // SetLogFiles explicitly sets log files.
// TEMPORARY: For testing only. // TEMPORARY: For testing only.
func (i *Index) SetLogFiles(a ...*LogFile) { i.logFiles = a } func (i *Index) SetLogFiles(a ...*LogFile) { i.logFiles = a }
@ -149,6 +160,19 @@ func (i *Index) files() []File {
return a return a
} }
// SeriesIterator returns an iterator over all series in the index.
func (i *Index) SeriesIterator() SeriesIterator {
a := make([]SeriesIterator, 0, i.FileN())
for _, f := range i.files() {
itr := f.SeriesIterator()
if itr == nil {
continue
}
a = append(a, itr)
}
return MergeSeriesIterators(a...)
}
// Measurement retrieves a measurement by name. // Measurement retrieves a measurement by name.
func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) { func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) {
return i.measurement(name), nil return i.measurement(name), nil
@ -587,8 +611,8 @@ func (i *Index) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte, va
// TagSets returns an ordered list of tag sets for a measurement by dimension // TagSets returns an ordered list of tag sets for a measurement by dimension
// and filtered by an optional conditional expression. // and filtered by an optional conditional expression.
func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr, mf *tsdb.MeasurementFields) ([]*influxql.TagSet, error) { func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
itr, err := i.MeasurementSeriesByExprIterator(name, condition, mf) itr, err := i.MeasurementSeriesByExprIterator(name, condition)
if err != nil { if err != nil {
return nil, err return nil, err
} else if itr == nil { } else if itr == nil {
@ -646,12 +670,12 @@ func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Exp
// MeasurementSeriesByExprIterator returns a series iterator for a measurement // MeasurementSeriesByExprIterator returns a series iterator for a measurement
// that is filtered by expr. If expr only contains time expressions then this // that is filtered by expr. If expr only contains time expressions then this
// call is equivalent to MeasurementSeriesIterator(). // call is equivalent to MeasurementSeriesIterator().
func (i *Index) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { func (i *Index) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error) {
// Return all series for the measurement if there are no tag expressions. // Return all series for the measurement if there are no tag expressions.
if expr == nil || influxql.OnlyTimeExpr(expr) { if expr == nil || influxql.OnlyTimeExpr(expr) {
return i.MeasurementSeriesIterator(name), nil return i.MeasurementSeriesIterator(name), nil
} }
return i.seriesByExprIterator(name, expr, mf) return i.seriesByExprIterator(name, expr, i.fieldset.CreateFieldsIfNotExists(string(name)))
} }
func (i *Index) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { func (i *Index) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (SeriesIterator, error) {
@ -808,6 +832,11 @@ func (i *Index) RemoveShard(shardID uint64) {}
func (i *Index) AssignShard(k string, shardID uint64) {} func (i *Index) AssignShard(k string, shardID uint64) {}
func (i *Index) UnassignShard(k string, shardID uint64) {} func (i *Index) UnassignShard(k string, shardID uint64) {}
// SeriesPointIterator returns an influxql iterator over all series.
func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return newSeriesPointIterator(i, opt), nil
}
// File represents a log or index file. // File represents a log or index file.
type File interface { type File interface {
Measurement(name []byte) MeasurementElem Measurement(name []byte) MeasurementElem
@ -815,6 +844,8 @@ type File interface {
TagValueIterator(name, key []byte) (itr TagValueIterator, deleted bool) TagValueIterator(name, key []byte) (itr TagValueIterator, deleted bool)
SeriesIterator() SeriesIterator
MeasurementSeriesIterator(name []byte) SeriesIterator
TagKeySeriesIterator(name, key []byte) (itr SeriesIterator, deleted bool) TagKeySeriesIterator(name, key []byte) (itr SeriesIterator, deleted bool)
TagValueSeriesIterator(name, key, value []byte) (itr SeriesIterator, deleted bool) TagValueSeriesIterator(name, key, value []byte) (itr SeriesIterator, deleted bool)
} }
@ -838,3 +869,72 @@ func (fe FilterExprs) Len() int {
} }
return len(fe) return len(fe)
} }
// seriesPointIterator adapts SeriesIterator to an influxql.Iterator.
type seriesPointIterator struct {
index *Index
mitr MeasurementIterator
sitr SeriesIterator
opt influxql.IteratorOptions
point influxql.FloatPoint // reusable point
}
// newSeriesPointIterator returns a new instance of seriesPointIterator.
func newSeriesPointIterator(index *Index, opt influxql.IteratorOptions) *seriesPointIterator {
return &seriesPointIterator{
index: index,
mitr: index.MeasurementIterator(),
point: influxql.FloatPoint{
Aux: make([]interface{}, len(opt.Aux)),
},
opt: opt,
}
}
// Stats returns stats about the points processed.
func (itr *seriesPointIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} }
// Close closes the iterator.
func (itr *seriesPointIterator) Close() error { return nil }
// Next emits the next point in the iterator.
func (itr *seriesPointIterator) Next() (*influxql.FloatPoint, error) {
for {
// Create new series iterator, if necessary.
// Exit if there are no measurements remaining.
if itr.sitr == nil {
m := itr.mitr.Next()
if m == nil {
return nil, nil
}
sitr, err := itr.index.MeasurementSeriesByExprIterator(m.Name(), itr.opt.Condition)
if err != nil {
return nil, err
} else if sitr == nil {
continue
}
itr.sitr = sitr
}
// Read next series element.
e := itr.sitr.Next()
if e == nil {
itr.sitr = nil
continue
}
// Convert to a key.
key := string(models.MakeKey(e.Name(), e.Tags()))
// Write auxiliary fields.
for i, f := range itr.opt.Aux {
switch f.Val {
case "key":
itr.point.Aux[i] = key
}
}
return &itr.point, nil
}
}

View File

@ -429,6 +429,29 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {
f.mms[string(e.Name)] = mm f.mms[string(e.Name)] = mm
} }
// SeriesIterator returns an iterator over all series in the log file.
func (f *LogFile) SeriesIterator() SeriesIterator {
f.mu.RLock()
defer f.mu.RUnlock()
// Sort measurement names determine total series count.
var n int
names := make([][]byte, 0, len(f.mms))
for _, mm := range f.mms {
names = append(names, mm.name)
n += len(mm.series)
}
sort.Sort(byteSlices(names))
// Combine series across all measurements.
series := make(logSeries, 0, n)
for _, name := range names {
series = append(series, f.mms[string(name)].series...)
}
return newLogSeriesIterator(series)
}
// measurement returns a measurement by name. // measurement returns a measurement by name.
func (f *LogFile) measurement(name []byte) logMeasurement { func (f *LogFile) measurement(name []byte) logMeasurement {
mm, ok := f.mms[string(name)] mm, ok := f.mms[string(name)]
@ -451,7 +474,7 @@ func (f *LogFile) MeasurementIterator() MeasurementIterator {
return &itr return &itr
} }
// MeasurementSeriesIterator returns an iterator over all series in the log file. // MeasurementSeriesIterator returns an iterator over all series for a measurement.
func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator { func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator {
f.mu.RLock() f.mu.RLock()
defer f.mu.RUnlock() defer f.mu.RUnlock()

View File

@ -473,48 +473,50 @@ type seriesIntersectIterator struct {
// Next returns the next element which occurs in both iterators. // Next returns the next element which occurs in both iterators.
func (itr *seriesIntersectIterator) Next() (e SeriesElem) { func (itr *seriesIntersectIterator) Next() (e SeriesElem) {
// Fill buffers. for {
if itr.buf[0] == nil { // Fill buffers.
itr.buf[0] = itr.itrs[0].Next() if itr.buf[0] == nil {
} itr.buf[0] = itr.itrs[0].Next()
if itr.buf[1] == nil { }
itr.buf[1] = itr.itrs[1].Next() if itr.buf[1] == nil {
} itr.buf[1] = itr.itrs[1].Next()
// Exit if either buffer is still empty.
if itr.buf[0] == nil || itr.buf[1] == nil {
return nil
}
// Return lesser series.
if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 {
e, itr.buf[0] = itr.buf[0], nil
return e
} else if cmp == 1 {
e, itr.buf[1] = itr.buf[1], nil
return e
}
// Merge series together if equal.
itr.e.SeriesElem = itr.buf[0]
// Attach expression.
expr0 := itr.buf[0].Expr()
expr1 := itr.buf[0].Expr()
if expr0 == nil {
itr.e.expr = expr1
} else if expr1 == nil {
itr.e.expr = expr0
} else {
itr.e.expr = &influxql.BinaryExpr{
Op: influxql.AND,
LHS: expr0,
RHS: expr1,
} }
}
itr.buf[0], itr.buf[1] = nil, nil // Exit if either buffer is still empty.
return &itr.e if itr.buf[0] == nil || itr.buf[1] == nil {
return nil
}
// Skip if both series are not equal.
if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 {
itr.buf[0] = nil
continue
} else if cmp == 1 {
itr.buf[1] = nil
continue
}
// Merge series together if equal.
itr.e.SeriesElem = itr.buf[0]
// Attach expression.
expr0 := itr.buf[0].Expr()
expr1 := itr.buf[0].Expr()
if expr0 == nil {
itr.e.expr = expr1
} else if expr1 == nil {
itr.e.expr = expr0
} else {
itr.e.expr = &influxql.BinaryExpr{
Op: influxql.AND,
LHS: expr0,
RHS: expr1,
}
}
itr.buf[0], itr.buf[1] = nil, nil
return &itr.e
}
} }
// UnionSeriesIterators returns an iterator that returns series from both // UnionSeriesIterators returns an iterator that returns series from both

View File

@ -705,7 +705,7 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
case "_fieldKeys": case "_fieldKeys":
return NewFieldKeysIterator(s, opt) return NewFieldKeysIterator(s, opt)
case "_series": case "_series":
return NewSeriesIterator(s, opt) return s.createSeriesIterator(opt)
case "_tagKeys": case "_tagKeys":
return NewTagKeysIterator(s, opt) return NewTagKeysIterator(s, opt)
default: default:
@ -713,6 +713,28 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
} }
} }
// createSeriesIterator returns a new instance of SeriesIterator.
func (s *Shard) createSeriesIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
// Only equality operators are allowed.
var err error
influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
switch n := n.(type) {
case *influxql.BinaryExpr:
switch n.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
influxql.OR, influxql.AND:
default:
err = errors.New("invalid tag comparison operator")
}
}
})
if err != nil {
return nil, err
}
return s.engine.SeriesPointIterator(opt)
}
// FieldDimensions returns unique sets of fields and dimensions across a list of sources. // FieldDimensions returns unique sets of fields and dimensions across a list of sources.
func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
if err := s.ready(); err != nil { if err := s.ready(); err != nil {
@ -1033,6 +1055,55 @@ func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
return fields return fields
} }
// MeasurementFieldSet represents a collection of fields by measurement.
// This safe for concurrent use.
type MeasurementFieldSet struct {
mu sync.RWMutex
fields map[string]*MeasurementFields
}
// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func NewMeasurementFieldSet() *MeasurementFieldSet {
return &MeasurementFieldSet{
fields: make(map[string]*MeasurementFields),
}
}
// Fields returns fields for a measurement by name.
func (fs *MeasurementFieldSet) Fields(name string) *MeasurementFields {
fs.mu.RLock()
mf := fs.fields[name]
fs.mu.RUnlock()
return mf
}
// CreateFieldsIfNotExists returns fields for a measurement by name.
func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name string) *MeasurementFields {
fs.mu.RLock()
mf := fs.fields[name]
fs.mu.RUnlock()
if mf != nil {
return mf
}
fs.mu.Lock()
mf = fs.fields[name]
if mf == nil {
mf = NewMeasurementFields()
fs.fields[name] = mf
}
fs.mu.Unlock()
return mf
}
// Delete removes a field set for a measurement.
func (fs *MeasurementFieldSet) Delete(name string) {
fs.mu.Lock()
delete(fs.fields, name)
fs.mu.Unlock()
}
// Field represents a series field. // Field represents a series field.
type Field struct { type Field struct {
ID uint8 `json:"id,omitempty"` ID uint8 `json:"id,omitempty"`
@ -1159,114 +1230,6 @@ func (itr *fieldKeysIterator) Next() (*influxql.FloatPoint, error) {
} }
} }
// seriesIterator emits series ids.
type seriesIterator struct {
mms Measurements
keys struct {
buf []string
i int
}
point influxql.FloatPoint // reusable point
opt influxql.IteratorOptions
}
// NewSeriesIterator returns a new instance of SeriesIterator.
func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
// Only equality operators are allowed.
var err error
influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
switch n := n.(type) {
case *influxql.BinaryExpr:
switch n.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
influxql.OR, influxql.AND:
default:
err = errors.New("invalid tag comparison operator")
}
}
})
if err != nil {
return nil, err
}
// Read and sort all measurements.
mms, err := sh.engine.Measurements()
if err != nil {
return nil, err
}
sort.Sort(mms)
return &seriesIterator{
mms: mms,
point: influxql.FloatPoint{
Aux: make([]interface{}, len(opt.Aux)),
},
opt: opt,
}, nil
}
// Stats returns stats about the points processed.
func (itr *seriesIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} }
// Close closes the iterator.
func (itr *seriesIterator) Close() error { return nil }
// Next emits the next point in the iterator.
func (itr *seriesIterator) Next() (*influxql.FloatPoint, error) {
for {
// Load next measurement's keys if there are no more remaining.
if itr.keys.i >= len(itr.keys.buf) {
if err := itr.nextKeys(); err != nil {
return nil, err
}
if len(itr.keys.buf) == 0 {
return nil, nil
}
}
// Read the next key.
key := itr.keys.buf[itr.keys.i]
itr.keys.i++
// Write auxiliary fields.
for i, f := range itr.opt.Aux {
switch f.Val {
case "key":
itr.point.Aux[i] = key
}
}
return &itr.point, nil
}
}
// nextKeys reads all keys for the next measurement.
func (itr *seriesIterator) nextKeys() error {
for {
// Ensure previous keys are cleared out.
itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0]
// Read next measurement.
if len(itr.mms) == 0 {
return nil
}
mm := itr.mms[0]
itr.mms = itr.mms[1:]
// Read all series keys.
ids, err := mm.seriesIDsAllOrByExpr(itr.opt.Condition)
if err != nil {
return err
} else if len(ids) == 0 {
continue
}
itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids)
sort.Strings(itr.keys.buf)
return nil
}
}
// NewTagKeysIterator returns a new instance of TagKeysIterator. // NewTagKeysIterator returns a new instance of TagKeysIterator.
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
fn := func(m *Measurement) []string { fn := func(m *Measurement) []string {