diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 089b2f52cb..ebfd32e3fa 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -46,9 +46,9 @@ type Index struct { mu sync.RWMutex // In-memory metadata index, built on load and updated when new series come in - measurements map[string]*tsdb.Measurement // measurement name to object and index - series map[string]*tsdb.Series // map series key to the Series object - lastID uint64 // last used series ID. They're in memory only for this shard + measurements map[string]*Measurement // measurement name to object and index + series map[string]*Series // map series key to the Series object + lastID uint64 // last used series ID. They're in memory only for this shard seriesSketch, seriesTSSketch *hll.Plus measurementsSketch, measurementsTSSketch *hll.Plus @@ -57,8 +57,8 @@ type Index struct { // NewIndex returns a new initialized Index. func NewIndex() *Index { index := &Index{ - measurements: make(map[string]*tsdb.Measurement), - series: make(map[string]*tsdb.Series), + measurements: make(map[string]*Measurement), + series: make(map[string]*Series), } index.seriesSketch = hll.NewDefaultPlus() @@ -74,7 +74,7 @@ func (i *Index) Open() (err error) { return nil } func (i *Index) Close() error { return nil } // Series returns a series by key. 
-func (i *Index) Series(key []byte) (*tsdb.Series, error) { +func (i *Index) Series(key []byte) (*Series, error) { i.mu.RLock() s := i.series[string(key)] i.mu.RUnlock() @@ -99,7 +99,7 @@ func (i *Index) SeriesN() int64 { } // Measurement returns the measurement object from the index by the name -func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) { +func (i *Index) Measurement(name []byte) (*Measurement, error) { i.mu.RLock() defer i.mu.RUnlock() return i.measurements[string(name)], nil @@ -120,11 +120,11 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro } // MeasurementsByName returns a list of measurements. -func (i *Index) MeasurementsByName(names [][]byte) ([]*tsdb.Measurement, error) { +func (i *Index) MeasurementsByName(names [][]byte) ([]*Measurement, error) { i.mu.RLock() defer i.mu.RUnlock() - a := make([]*tsdb.Measurement, 0, len(names)) + a := make([]*Measurement, 0, len(names)) for _, name := range names { if m := i.measurements[string(name)]; m != nil { a = append(a, m) @@ -168,7 +168,7 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags m // set the in memory ID for query processing on this shard // The series key and tags are clone to prevent a memory leak - series := tsdb.NewSeries([]byte(string(key)), tags.Clone()) + series := NewSeries([]byte(string(key)), tags.Clone()) series.ID = i.lastID + 1 i.lastID++ @@ -187,7 +187,7 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags m // CreateMeasurementIndexIfNotExists creates or retrieves an in memory index // object for the measurement -func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *tsdb.Measurement { +func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *Measurement { name = escape.Unescape(name) // See if the measurement exists using a read-lock @@ -207,7 +207,7 @@ func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *tsdb.Measurement // and acquire the 
write lock m = i.measurements[string(name)] if m == nil { - m = tsdb.NewMeasurement(string(name)) + m = NewMeasurement(string(name)) i.measurements[string(name)] = m // Add the measurement to the measurements sketch. @@ -343,7 +343,7 @@ func (i *Index) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) } - tf := &tsdb.TagFilter{ + tf := &TagFilter{ Op: e.Op, Key: tag.Val, } @@ -420,7 +420,7 @@ func (i *Index) measurementNamesByNameFilter(op influxql.Token, val string, rege } // measurementNamesByTagFilters returns the sorted measurements matching the filters on tag values. -func (i *Index) measurementNamesByTagFilters(filter *tsdb.TagFilter) [][]byte { +func (i *Index) measurementNamesByTagFilters(filter *TagFilter) [][]byte { // Build a list of measurements matching the filters. var names [][]byte var tagMatch bool @@ -599,7 +599,7 @@ func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error { i.mu.RLock() defer i.mu.RUnlock() - mms := make(tsdb.Measurements, 0, len(i.measurements)) + mms := make(Measurements, 0, len(i.measurements)) for _, m := range i.measurements { mms = append(mms, m) } @@ -649,7 +649,7 @@ func (i *Index) MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr // SeriesPointIterator returns an influxql iterator over all series. func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { // Read and sort all measurements. - mms := make(tsdb.Measurements, 0, len(i.measurements)) + mms := make(Measurements, 0, len(i.measurements)) for _, mm := range i.measurements { mms = append(mms, mm) } @@ -838,7 +838,7 @@ func NewShardIndex(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index { // seriesPointIterator emits series as influxql points. 
type seriesPointIterator struct { - mms tsdb.Measurements + mms Measurements keys struct { buf []string i int diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go new file mode 100644 index 0000000000..da73313962 --- /dev/null +++ b/tsdb/index/inmem/meta.go @@ -0,0 +1,1566 @@ +package inmem + +import ( + "bytes" + "fmt" + "regexp" + "sort" + "sync" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb" +) + +// Measurement represents a collection of time series in a database. It also +// contains in memory structures for indexing tags. Exported functions are +// goroutine safe while un-exported functions assume the caller will use the +// appropriate locks. +type Measurement struct { + mu sync.RWMutex + Name string `json:"name,omitempty"` + fieldNames map[string]struct{} + + // in-memory index fields + seriesByID map[uint64]*Series // lookup table for series by their id + seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids + + // lazyily created sorted series IDs + sortedSeriesIDs SeriesIDs // sorted list of series IDs in this measurement +} + +// NewMeasurement allocates and initializes a new Measurement. +func NewMeasurement(name string) *Measurement { + return &Measurement{ + Name: name, + fieldNames: make(map[string]struct{}), + + seriesByID: make(map[uint64]*Series), + seriesByTagKeyValue: make(map[string]map[string]SeriesIDs), + } +} + +func (m *Measurement) HasField(name string) bool { + m.mu.RLock() + _, hasField := m.fieldNames[name] + m.mu.RUnlock() + return hasField +} + +// SeriesByID returns a series by identifier. +func (m *Measurement) SeriesByID(id uint64) *Series { + m.mu.RLock() + defer m.mu.RUnlock() + return m.seriesByID[id] +} + +// SeriesByIDMap returns the internal seriesByID map. 
+func (m *Measurement) SeriesByIDMap() map[uint64]*Series { + m.mu.RLock() + defer m.mu.RUnlock() + return m.seriesByID +} + +// SeriesByIDSlice returns a list of series by identifiers. +func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series { + m.mu.RLock() + defer m.mu.RUnlock() + a := make([]*Series, len(ids)) + for i, id := range ids { + a[i] = m.seriesByID[id] + } + return a +} + +// AppendSeriesKeysByID appends keys for a list of series ids to a buffer. +func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string { + m.mu.RLock() + defer m.mu.RUnlock() + for _, id := range ids { + if s := m.seriesByID[id]; s != nil { + dst = append(dst, s.Key) + } + } + return dst +} + +// SeriesKeysByID returns the a list of keys for a set of ids. +func (m *Measurement) SeriesKeysByID(ids SeriesIDs) [][]byte { + m.mu.RLock() + defer m.mu.RUnlock() + keys := make([][]byte, 0, len(ids)) + for _, id := range ids { + s := m.seriesByID[id] + if s == nil { + continue + } + keys = append(keys, []byte(s.Key)) + } + return keys +} + +// SeriesKeys returns the keys of every series in this measurement +func (m *Measurement) SeriesKeys() [][]byte { + m.mu.RLock() + defer m.mu.RUnlock() + keys := make([][]byte, 0, len(m.seriesByID)) + for _, s := range m.seriesByID { + keys = append(keys, []byte(s.Key)) + } + return keys +} + +func (m *Measurement) SeriesIDs() SeriesIDs { + m.mu.RLock() + if len(m.sortedSeriesIDs) == len(m.seriesByID) { + s := m.sortedSeriesIDs + m.mu.RUnlock() + return s + } + m.mu.RUnlock() + + m.mu.Lock() + if len(m.sortedSeriesIDs) == len(m.seriesByID) { + s := m.sortedSeriesIDs + m.mu.Unlock() + return s + } + + if cap(m.sortedSeriesIDs) < len(m.seriesByID) { + m.sortedSeriesIDs = make(SeriesIDs, 0, len(m.seriesByID)) + } + for k := range m.seriesByID { + m.sortedSeriesIDs = append(m.sortedSeriesIDs, k) + } + sort.Sort(m.sortedSeriesIDs) + s := m.sortedSeriesIDs + m.mu.Unlock() + return s +} + +// HasTagKey returns true if at least one 
series in this measurement has written a value for the passed in tag key +func (m *Measurement) HasTagKey(k string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + _, hasTag := m.seriesByTagKeyValue[k] + return hasTag +} + +func (m *Measurement) HasTagKeyValue(k, v []byte) bool { + m.mu.RLock() + if vals, ok := m.seriesByTagKeyValue[string(k)]; ok { + _, ok := vals[string(v)] + m.mu.RUnlock() + return ok + } + m.mu.RUnlock() + return false +} + +// HasSeries returns true if there is at least 1 series under this measurement. +func (m *Measurement) HasSeries() bool { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.seriesByID) > 0 +} + +// Cardinality returns the number of values associated with the given tag key. +func (m *Measurement) Cardinality(key string) int { + var n int + m.mu.RLock() + n = m.cardinality(key) + m.mu.RUnlock() + return n +} + +func (m *Measurement) cardinality(key string) int { + return len(m.seriesByTagKeyValue[key]) +} + +// CardinalityBytes returns the number of values associated with the given tag key. +func (m *Measurement) CardinalityBytes(key []byte) int { + var n int + m.mu.RLock() + n = len(m.seriesByTagKeyValue[string(key)]) + m.mu.RUnlock() + return n +} + +// AddSeries adds a series to the measurement's index. +// It returns true if the series was added successfully or false if the series was already present. 
+func (m *Measurement) AddSeries(s *Series) bool { + m.mu.RLock() + if _, ok := m.seriesByID[s.ID]; ok { + m.mu.RUnlock() + return false + } + m.mu.RUnlock() + + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.seriesByID[s.ID]; ok { + return false + } + + m.seriesByID[s.ID] = s + + if len(m.sortedSeriesIDs) == 0 || s.ID > m.sortedSeriesIDs[len(m.sortedSeriesIDs)-1] { + m.sortedSeriesIDs = append(m.sortedSeriesIDs, s.ID) + } + + // add this series id to the tag index on the measurement + s.ForEachTag(func(t models.Tag) { + valueMap := m.seriesByTagKeyValue[string(t.Key)] + if valueMap == nil { + valueMap = make(map[string]SeriesIDs) + m.seriesByTagKeyValue[string(t.Key)] = valueMap + } + ids := valueMap[string(t.Value)] + ids = append(ids, s.ID) + + // most of the time the series ID will be higher than all others because it's a new + // series. So don't do the sort if we don't have to. + if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] { + sort.Sort(ids) + } + valueMap[string(t.Value)] = ids + }) + + return true +} + +// DropSeries removes a series from the measurement's index. 
+func (m *Measurement) DropSeries(series *Series) { + seriesID := series.ID + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.seriesByID[seriesID]; !ok { + return + } + delete(m.seriesByID, seriesID) + + // clear our lazily sorted set of ids + m.sortedSeriesIDs = m.sortedSeriesIDs[:0] + + // remove this series id from the tag index on the measurement + // s.seriesByTagKeyValue is defined as map[string]map[string]SeriesIDs + series.ForEachTag(func(t models.Tag) { + values := m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] + ids := filter(values, seriesID) + // Check to see if we have any ids, if not, remove the key + if len(ids) == 0 { + delete(m.seriesByTagKeyValue[string(t.Key)], string(t.Value)) + } else { + m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] = ids + } + + // If we have no values, then we delete the key + if len(m.seriesByTagKeyValue[string(t.Key)]) == 0 { + delete(m.seriesByTagKeyValue, string(t.Key)) + } + }) + + return +} + +// filters walks the where clause of a select statement and returns a map with all series ids +// matching the where clause and any filter expression that should be applied to each +func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) { + if condition == nil || influxql.OnlyTimeExpr(condition) { + return m.SeriesIDs(), nil, nil + } + return m.WalkWhereForSeriesIds(condition) +} + +// ForEachSeriesByExpr iterates over all series filtered by condition. +func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags models.Tags) error) error { + // Retrieve matching series ids. + ids, _, err := m.filters(condition) + if err != nil { + return err + } + + // Iterate over each series. + for _, id := range ids { + s := m.SeriesByID(id) + if err := fn(s.Tags()); err != nil { + return err + } + } + + return nil +} + +// TagSets returns the unique tag sets that exist for the given tag keys. 
This is used to determine +// what composite series will be created by a group by. i.e. "group by region" should return: +// {"region":"uswest"}, {"region":"useast"} +// or region, service returns +// {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc... +// This will also populate the TagSet objects with the series IDs that match each tagset and any +// influx filter expression that goes with the series +// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it. +func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*influxql.TagSet, error) { + // get the unique set of series ids and the filters that should be applied to each + ids, filters, err := m.filters(opt.Condition) + if err != nil { + return nil, err + } + + m.mu.RLock() + // For every series, get the tag values for the requested tag keys i.e. dimensions. This is the + // TagSet for that series. Series with the same TagSet are then grouped together, because for the + // purpose of GROUP BY they are part of the same composite series. + tagSets := make(map[string]*influxql.TagSet, 64) + var seriesN int + for _, id := range ids { + // Abort if the query was killed + select { + case <-opt.InterruptCh: + return nil, influxql.ErrQueryInterrupted + default: + } + + if opt.MaxSeriesN > 0 && seriesN > opt.MaxSeriesN { + return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN) + } + + s := m.seriesByID[id] + if !s.Assigned(shardID) { + continue + } + tags := make(map[string]string, len(opt.Dimensions)) + + // Build the TagSet for this series. + for _, dim := range opt.Dimensions { + tags[dim] = s.GetTagString(dim) + } + + // Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled + // as a set. 
+ tagsAsKey := tsdb.MarshalTags(tags) + tagSet, ok := tagSets[string(tagsAsKey)] + if !ok { + // This TagSet is new, create a new entry for it. + tagSet = &influxql.TagSet{ + Tags: tags, + Key: tagsAsKey, + } + } + // Associate the series and filter with the Tagset. + tagSet.AddFilter(m.seriesByID[id].Key, filters[id]) + seriesN++ + + // Ensure it's back in the map. + tagSets[string(tagsAsKey)] = tagSet + } + // Release the lock while we sort all the tags + m.mu.RUnlock() + + // Sort the series in each tag set. + for _, t := range tagSets { + // Abort if the query was killed + select { + case <-opt.InterruptCh: + return nil, influxql.ErrQueryInterrupted + default: + } + + sort.Sort(t) + } + + // The TagSets have been created, as a map of TagSets. Just send + // the values back as a slice, sorting for consistency. + sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets)) + for _, v := range tagSets { + sortedTagsSets = append(sortedTagsSets, v) + } + sort.Sort(byTagKey(sortedTagsSets)) + + return sortedTagsSets, nil +} + +// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions. +func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { + // We only want to allocate a slice and map of the smaller size. + var ids []uint64 + if len(lids) > len(rids) { + ids = make([]uint64, 0, len(rids)) + } else { + ids = make([]uint64, 0, len(lids)) + } + + var filters FilterExprs + if len(lfilters) > len(rfilters) { + filters = make(FilterExprs, len(rfilters)) + } else { + filters = make(FilterExprs, len(lfilters)) + } + + // They're in sorted order so advance the counter as needed. + // This is, don't run comparisons against lower values that we've already passed. 
+ for len(lids) > 0 && len(rids) > 0 { + lid, rid := lids[0], rids[0] + if lid == rid { + ids = append(ids, lid) + + var expr influxql.Expr + lfilter := lfilters[lid] + rfilter := rfilters[rid] + + if lfilter != nil && rfilter != nil { + be := &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: lfilter, + RHS: rfilter, + } + expr = influxql.Reduce(be, nil) + } else if lfilter != nil { + expr = lfilter + } else if rfilter != nil { + expr = rfilter + } + + if expr != nil { + filters[lid] = expr + } + lids, rids = lids[1:], rids[1:] + } else if lid < rid { + lids = lids[1:] + } else { + rids = rids[1:] + } + } + return ids, filters +} + +// unionSeriesFilters performs a union for two sets of ids and filter expressions. +func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { + ids := make([]uint64, 0, len(lids)+len(rids)) + + // Setup the filters with the smallest size since we will discard filters + // that do not have a match on the other side. + var filters FilterExprs + if len(lfilters) < len(rfilters) { + filters = make(FilterExprs, len(lfilters)) + } else { + filters = make(FilterExprs, len(rfilters)) + } + + for len(lids) > 0 && len(rids) > 0 { + lid, rid := lids[0], rids[0] + if lid == rid { + ids = append(ids, lid) + + // If one side does not have a filter, then the series has been + // included on one side of the OR with no condition. Eliminate the + // filter in this case. 
+ var expr influxql.Expr + lfilter := lfilters[lid] + rfilter := rfilters[rid] + if lfilter != nil && rfilter != nil { + be := &influxql.BinaryExpr{ + Op: influxql.OR, + LHS: lfilter, + RHS: rfilter, + } + expr = influxql.Reduce(be, nil) + } + + if expr != nil { + filters[lid] = expr + } + lids, rids = lids[1:], rids[1:] + } else if lid < rid { + ids = append(ids, lid) + + filter := lfilters[lid] + if filter != nil { + filters[lid] = filter + } + lids = lids[1:] + } else { + ids = append(ids, rid) + + filter := rfilters[rid] + if filter != nil { + filters[rid] = filter + } + rids = rids[1:] + } + } + + // Now append the remainder. + if len(lids) > 0 { + for i := 0; i < len(lids); i++ { + ids = append(ids, lids[i]) + + filter := lfilters[lids[i]] + if filter != nil { + filters[lids[i]] = filter + } + } + } else if len(rids) > 0 { + for i := 0; i < len(rids); i++ { + ids = append(ids, rids[i]) + + filter := rfilters[rids[i]] + if filter != nil { + filters[rids[i]] = filter + } + } + } + return ids, filters +} + +// IDsForExpr returns the series IDs that are candidates to match the given expression. +func (m *Measurement) IDsForExpr(n *influxql.BinaryExpr) SeriesIDs { + ids, _, _ := m.idsForExpr(n) + return ids +} + +// idsForExpr returns a collection of series ids and a filter expression that should +// be used to filter points from those series. +func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Expr, error) { + // If this binary expression has another binary expression, then this + // is some expression math and we should just pass it to the underlying query. + if _, ok := n.LHS.(*influxql.BinaryExpr); ok { + return m.SeriesIDs(), n, nil + } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok { + return m.SeriesIDs(), n, nil + } + + // Retrieve the variable reference from the correct side of the expression. 
+ name, ok := n.LHS.(*influxql.VarRef) + value := n.RHS + if !ok { + name, ok = n.RHS.(*influxql.VarRef) + if !ok { + return nil, nil, fmt.Errorf("invalid expression: %s", n.String()) + } + value = n.LHS + } + + // For time literals, return all series IDs and "true" as the filter. + if _, ok := value.(*influxql.TimeLiteral); ok || name.Val == "time" { + return m.SeriesIDs(), &influxql.BooleanLiteral{Val: true}, nil + } + + // For fields, return all series IDs from this measurement and return + // the expression passed in, as the filter. + if name.Val != "_name" && ((name.Type == influxql.Unknown && m.HasField(name.Val)) || name.Type == influxql.AnyField || (name.Type != influxql.Tag && name.Type != influxql.Unknown)) { + return m.SeriesIDs(), n, nil + } else if value, ok := value.(*influxql.VarRef); ok { + // Check if the RHS is a variable and if it is a field. + if value.Val != "_name" && ((value.Type == influxql.Unknown && m.HasField(value.Val)) || name.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) { + return m.SeriesIDs(), n, nil + } + } + + // Retrieve list of series with this tag key. + tagVals := m.seriesByTagKeyValue[name.Val] + + // if we're looking for series with a specific tag value + if str, ok := value.(*influxql.StringLiteral); ok { + var ids SeriesIDs + + // Special handling for "_name" to match measurement name. + if name.Val == "_name" { + if (n.Op == influxql.EQ && str.Val == m.Name) || (n.Op == influxql.NEQ && str.Val != m.Name) { + return m.SeriesIDs(), nil, nil + } + return nil, nil, nil + } + + if n.Op == influxql.EQ { + if str.Val != "" { + // return series that have a tag of specific value. + ids = tagVals[str.Val] + } else { + // Make a copy of all series ids and mark the ones we need to evict. + seriesIDs := newEvictSeriesIDs(m.SeriesIDs()) + + // Go through each slice and mark the values we find as zero so + // they can be removed later. 
+ for _, a := range tagVals { + seriesIDs.mark(a) + } + + // Make a new slice with only the remaining ids. + ids = seriesIDs.evict() + } + } else if n.Op == influxql.NEQ { + if str.Val != "" { + ids = m.SeriesIDs().Reject(tagVals[str.Val]) + } else { + for k := range tagVals { + ids = append(ids, tagVals[k]...) + } + sort.Sort(ids) + } + } + return ids, nil, nil + } + + // if we're looking for series with a tag value that matches a regex + if re, ok := value.(*influxql.RegexLiteral); ok { + var ids SeriesIDs + + // Special handling for "_name" to match measurement name. + if name.Val == "_name" { + match := re.Val.MatchString(m.Name) + if (n.Op == influxql.EQREGEX && match) || (n.Op == influxql.NEQREGEX && !match) { + return m.SeriesIDs(), &influxql.BooleanLiteral{Val: true}, nil + } + return nil, nil, nil + } + + // Check if we match the empty string to see if we should include series + // that are missing the tag. + empty := re.Val.MatchString("") + + // Gather the series that match the regex. If we should include the empty string, + // start with the list of all series and reject series that don't match our condition. + // If we should not include the empty string, include series that match our condition. + if empty && n.Op == influxql.EQREGEX { + // See comments above for EQ with a StringLiteral. + seriesIDs := newEvictSeriesIDs(m.SeriesIDs()) + for k := range tagVals { + if !re.Val.MatchString(k) { + seriesIDs.mark(tagVals[k]) + } + } + ids = seriesIDs.evict() + } else if empty && n.Op == influxql.NEQREGEX { + ids = make(SeriesIDs, 0, len(m.SeriesIDs())) + for k := range tagVals { + if !re.Val.MatchString(k) { + ids = append(ids, tagVals[k]...) + } + } + sort.Sort(ids) + } else if !empty && n.Op == influxql.EQREGEX { + ids = make(SeriesIDs, 0, len(m.SeriesIDs())) + for k := range tagVals { + if re.Val.MatchString(k) { + ids = append(ids, tagVals[k]...) 
+ } + } + sort.Sort(ids) + } else if !empty && n.Op == influxql.NEQREGEX { + // See comments above for EQ with a StringLiteral. + seriesIDs := newEvictSeriesIDs(m.SeriesIDs()) + for k := range tagVals { + if re.Val.MatchString(k) { + seriesIDs.mark(tagVals[k]) + } + } + ids = seriesIDs.evict() + } + return ids, nil, nil + } + + // compare tag values + if ref, ok := value.(*influxql.VarRef); ok { + var ids SeriesIDs + + if n.Op == influxql.NEQ { + ids = m.SeriesIDs() + } + + rhsTagVals := m.seriesByTagKeyValue[ref.Val] + for k := range tagVals { + tags := tagVals[k].Intersect(rhsTagVals[k]) + if n.Op == influxql.EQ { + ids = ids.Union(tags) + } else if n.Op == influxql.NEQ { + ids = ids.Reject(tags) + } + } + return ids, nil, nil + } + + if n.Op == influxql.NEQ || n.Op == influxql.NEQREGEX { + return m.SeriesIDs(), nil, nil + } + return nil, nil, nil +} + +// FilterExprs represents a map of series IDs to filter expressions. +type FilterExprs map[uint64]influxql.Expr + +// DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true. +func (fe FilterExprs) DeleteBoolLiteralTrues() { + for id, expr := range fe { + if e, ok := expr.(*influxql.BooleanLiteral); ok && e.Val { + delete(fe, id) + } + } +} + +// Len returns the number of elements. +func (fe FilterExprs) Len() int { + if fe == nil { + return 0 + } + return len(fe) +} + +// WalkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and +// a map from those series IDs to filter expressions that should be used to limit points returned in +// the final query result. +func (m *Measurement) WalkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) { + switch n := expr.(type) { + case *influxql.BinaryExpr: + switch n.Op { + case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX: + // Get the series IDs and filter expression for the tag or field comparison. 
+ ids, expr, err := m.idsForExpr(n) + if err != nil { + return nil, nil, err + } + + if len(ids) == 0 { + return ids, nil, nil + } + + // If the expression is a boolean literal that is true, ignore it. + if b, ok := expr.(*influxql.BooleanLiteral); ok && b.Val { + expr = nil + } + + var filters FilterExprs + if expr != nil { + filters = make(FilterExprs, len(ids)) + for _, id := range ids { + filters[id] = expr + } + } + + return ids, filters, nil + case influxql.AND, influxql.OR: + // Get the series IDs and filter expressions for the LHS. + lids, lfilters, err := m.WalkWhereForSeriesIds(n.LHS) + if err != nil { + return nil, nil, err + } + + // Get the series IDs and filter expressions for the RHS. + rids, rfilters, err := m.WalkWhereForSeriesIds(n.RHS) + if err != nil { + return nil, nil, err + } + + // Combine the series IDs from the LHS and RHS. + if n.Op == influxql.AND { + ids, filters := intersectSeriesFilters(lids, rids, lfilters, rfilters) + return ids, filters, nil + } else { + ids, filters := unionSeriesFilters(lids, rids, lfilters, rfilters) + return ids, filters, nil + } + } + + ids, _, err := m.idsForExpr(n) + return ids, nil, err + case *influxql.ParenExpr: + // walk down the tree + return m.WalkWhereForSeriesIds(n.Expr) + default: + return nil, nil, nil + } +} + +// expandExpr returns a list of expressions expanded by all possible tag +// combinations. +func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr { + // Retrieve list of unique values for each tag. + valuesByTagKey := m.uniqueTagValues(expr) + + // Convert keys to slices. + keys := make([]string, 0, len(valuesByTagKey)) + for key := range valuesByTagKey { + keys = append(keys, key) + } + sort.Strings(keys) + + // Order uniques by key. + uniques := make([][]string, len(keys)) + for i, key := range keys { + uniques[i] = valuesByTagKey[key] + } + + // Reduce a condition for each combination of tag values. 
+ return expandExprWithValues(expr, keys, []tagExpr{}, uniques, 0) +} + +func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr, uniques [][]string, index int) []tagSetExpr { + // If we have no more keys left then execute the reduction and return. + if index == len(keys) { + // Create a map of tag key/values. + m := make(map[string]*string, len(keys)) + for i, key := range keys { + if tagExprs[i].op == influxql.EQ { + m[key] = &tagExprs[i].values[0] + } else { + m[key] = nil + } + } + + // TODO: Rewrite full expressions instead of VarRef replacement. + + // Reduce using the current tag key/value set. + // Ignore it if reduces down to "false". + e := influxql.Reduce(expr, &tagValuer{tags: m}) + if e, ok := e.(*influxql.BooleanLiteral); ok && !e.Val { + return nil + } + + return []tagSetExpr{{values: copyTagExprs(tagExprs), expr: e}} + } + + // Otherwise expand for each possible equality value of the key. + var exprs []tagSetExpr + for _, v := range uniques[index] { + exprs = append(exprs, expandExprWithValues(expr, keys, append(tagExprs, tagExpr{keys[index], []string{v}, influxql.EQ}), uniques, index+1)...) + } + exprs = append(exprs, expandExprWithValues(expr, keys, append(tagExprs, tagExpr{keys[index], uniques[index], influxql.NEQ}), uniques, index+1)...) + + return exprs +} + +// SeriesIDsAllOrByExpr walks an expressions for matching series IDs +// or, if no expressions is given, returns all series IDs for the measurement. +func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) { + // If no expression given or the measurement has no series, + // we can take just return the ids or nil accordingly. + if expr == nil { + return m.SeriesIDs(), nil + } + + m.mu.RLock() + defer m.mu.RUnlock() + + if len(m.seriesByID) == 0 { + return nil, nil + } + + // Get series IDs that match the WHERE clause. 
+ ids, _, err := m.WalkWhereForSeriesIds(expr) + if err != nil { + return nil, err + } + + return ids, nil +} + +// tagKeysByExpr extracts the tag keys wanted by the expression. +func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) { + switch e := expr.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok { + return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) + } else if tag.Val != "_tagKey" { + return nil, nil + } + + if influxql.IsRegexOp(e.Op) { + re, ok := e.RHS.(*influxql.RegexLiteral) + if !ok { + return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) + } + return m.tagKeysByFilter(e.Op, "", re.Val), nil + } + + s, ok := e.RHS.(*influxql.StringLiteral) + if !ok { + return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) + } + return m.tagKeysByFilter(e.Op, s.Val, nil), nil + + case influxql.AND, influxql.OR: + lhs, err := m.TagKeysByExpr(e.LHS) + if err != nil { + return nil, err + } + + rhs, err := m.TagKeysByExpr(e.RHS) + if err != nil { + return nil, err + } + + if lhs != nil && rhs != nil { + if e.Op == influxql.OR { + return stringSet(lhs).union(rhs), nil + } + return stringSet(lhs).intersect(rhs), nil + } else if lhs != nil { + return lhs, nil + } else if rhs != nil { + return rhs, nil + } + return nil, nil + default: + return nil, fmt.Errorf("invalid operator") + } + + case *influxql.ParenExpr: + return m.TagKeysByExpr(e.Expr) + } + + return nil, fmt.Errorf("%#v", expr) +} + +// tagKeysByFilter will filter the tag keys for the measurement. 
func (m *Measurement) tagKeysByFilter(op influxql.Token, val string, regex *regexp.Regexp) stringSet {
	ss := newStringSet()
	for _, key := range m.TagKeys() {
		// val is consulted only for EQ/NEQ and regex only for EQREGEX/NEQREGEX.
		// Any other operator leaves matched false, so no key is selected.
		var matched bool
		switch op {
		case influxql.EQ:
			matched = key == val
		case influxql.NEQ:
			matched = key != val
		case influxql.EQREGEX:
			matched = regex.MatchString(key)
		case influxql.NEQREGEX:
			matched = !regex.MatchString(key)
		}

		if !matched {
			continue
		}
		ss.add(key)
	}
	return ss
}

// tagValuer is used during expression expansion to evaluate all sets of tag values.
// A key mapped to a nil pointer is reported by Value as present with a nil value.
type tagValuer struct {
	tags map[string]*string
}

// Value returns the string value of a tag and true if it's listed in the tagset.
// A listed tag whose pointer is nil yields (nil, true); an unlisted tag yields
// (nil, false).
func (v *tagValuer) Value(name string) (interface{}, bool) {
	if value, ok := v.tags[name]; ok {
		if value == nil {
			return nil, true
		}
		return *value, true
	}
	return nil, false
}

// tagSetExpr represents a set of tag keys/values and associated expression.
type tagSetExpr struct {
	values []tagExpr
	expr   influxql.Expr
}

// tagExpr represents one or more values assigned to a given tag.
type tagExpr struct {
	key    string
	values []string
	op     influxql.Token // EQ or NEQ
}

// copyTagExprs returns a new slice holding the same elements as a. The copy is
// shallow: each element's values slice is shared with the original element.
func copyTagExprs(a []tagExpr) []tagExpr {
	other := make([]tagExpr, len(a))
	copy(other, a)
	return other
}

// uniqueTagValues returns, per tag key, the sorted list of unique tag values
// used in an expression. Only EQ comparisons between a VarRef and a
// StringLiteral (in either order) contribute; all other nodes are ignored.
func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
	// Track unique value per tag.
	tags := make(map[string]map[string]struct{})

	// Find all tag values referenced in the expression.
	influxql.WalkFunc(expr, func(n influxql.Node) {
		switch n := n.(type) {
		case *influxql.BinaryExpr:
			// Ignore operators that are not equality.
			if n.Op != influxql.EQ {
				return
			}

			// Extract ref and string literal.
			var key, value string
			switch lhs := n.LHS.(type) {
			case *influxql.VarRef:
				if rhs, ok := n.RHS.(*influxql.StringLiteral); ok {
					key, value = lhs.Val, rhs.Val
				}
			case *influxql.StringLiteral:
				if rhs, ok := n.RHS.(*influxql.VarRef); ok {
					key, value = rhs.Val, lhs.Val
				}
			}
			// An empty key means the comparison did not pair a VarRef with a
			// StringLiteral (or the reference name itself is empty).
			if key == "" {
				return
			}

			// Add value to set.
			if tags[key] == nil {
				tags[key] = make(map[string]struct{})
			}
			tags[key][value] = struct{}{}
		}
	})

	// Convert to map of slices.
	out := make(map[string][]string)
	for k, values := range tags {
		out[k] = make([]string, 0, len(values))
		for v := range values {
			out[k] = append(out[k], v)
		}
		// Map iteration order is random; sort so the output is deterministic.
		sort.Strings(out[k])
	}
	return out
}

// Measurements represents a list of *Measurement.
type Measurements []*Measurement

// Len implements sort.Interface.
func (a Measurements) Len() int { return len(a) }

// Less implements sort.Interface. Measurements are ordered by Name.
func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name }

// Swap implements sort.Interface.
func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Intersect returns the measurements whose Name occurs in both a and other.
// The merge below relies on both lists already being sorted by Name.
func (a Measurements) Intersect(other Measurements) Measurements {
	l := a
	r := other

	// we want to iterate through the shortest one and stop
	if len(other) < len(a) {
		l = other
		r = a
	}

	// they're in sorted order so advance the counter as needed.
	// That is, don't run comparisons against lower values that we've already passed
	var i, j int

	result := make(Measurements, 0, len(l))
	for i < len(l) && j < len(r) {
		if l[i].Name == r[j].Name {
			result = append(result, l[i])
			i++
			j++
		} else if l[i].Name < r[j].Name {
			i++
		} else {
			j++
		}
	}

	return result
}

// Union returns the measurements present in a or other, merged in Name order.
// When a Name occurs in both lists the element from a is kept. The merge
// relies on both lists already being sorted by Name.
func (a Measurements) Union(other Measurements) Measurements {
	result := make(Measurements, 0, len(a)+len(other))
	var i, j int
	for i < len(a) && j < len(other) {
		if a[i].Name == other[j].Name {
			result = append(result, a[i])
			i++
			j++
		} else if a[i].Name < other[j].Name {
			result = append(result, a[i])
			i++
		} else {
			result = append(result, other[j])
			j++
		}
	}

	// now append the remainder (the loop exits with at least one side exhausted)
	if i < len(a) {
		result = append(result, a[i:]...)
	} else if j < len(other) {
		result = append(result, other[j:]...)
	}

	return result
}

// Series belong to a Measurement and represent unique time series in a database.
type Series struct {
	mu          sync.RWMutex
	Key         string
	tags        models.Tags
	ID          uint64
	measurement *Measurement
	shardIDs    map[uint64]struct{} // shards that have this series defined
}

// NewSeries returns an initialized series struct. The tags value is stored as
// given; it is not cloned here.
func NewSeries(key []byte, tags models.Tags) *Series {
	return &Series{
		Key:      string(key),
		tags:     tags,
		shardIDs: make(map[uint64]struct{}),
	}
}

// AssignShard records shardID in the set of shards that have this series
// defined. The common already-assigned case only takes the read lock.
func (s *Series) AssignShard(shardID uint64) {
	s.mu.RLock()
	_, ok := s.shardIDs[shardID]
	s.mu.RUnlock()

	if ok {
		return
	}

	s.mu.Lock()
	// Skip the existence check under the write lock because we're just storing
	// an empty struct.
	s.shardIDs[shardID] = struct{}{}
	s.mu.Unlock()
}

// UnassignShard removes shardID from the series' shard set, under the write lock.
func (s *Series) UnassignShard(shardID uint64) {
	s.mu.Lock()
	delete(s.shardIDs, shardID)
	s.mu.Unlock()
}

// Assigned reports whether shardID is in the series' shard set.
func (s *Series) Assigned(shardID uint64) bool {
	s.mu.RLock()
	_, ok := s.shardIDs[shardID]
	s.mu.RUnlock()
	return ok
}

// ShardN returns the number of shards in the series' shard set.
func (s *Series) ShardN() int {
	s.mu.RLock()
	n := len(s.shardIDs)
	s.mu.RUnlock()
	return n
}

// Measurement returns the measurement on the series.
// NOTE(review): the field is read without taking s.mu — confirm callers
// do not race with SetMeasurement.
func (s *Series) Measurement() *Measurement {
	return s.measurement
}

// SetMeasurement sets the measurement on the series.
// NOTE(review): the field is written without taking s.mu — confirm this is
// only called before the series is shared.
func (s *Series) SetMeasurement(m *Measurement) {
	s.measurement = m
}

// ForEachTag executes fn for every tag. Iteration occurs under the read lock,
// which is held for the duration of every fn call.
func (s *Series) ForEachTag(fn func(models.Tag)) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, t := range s.tags {
		fn(t)
	}
}

// Tags returns the series' tags, read under the read lock. The value returned
// is s.tags itself; no clone is made here.
func (s *Series) Tags() models.Tags {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.tags
}

// CopyTags replaces the series' tags with a clone of themselves, under the
// write lock.
func (s *Series) CopyTags() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tags = s.tags.Clone()
}

// GetTagString returns a tag value under lock.
func (s *Series) GetTagString(key string) string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.tags.GetString(key)
}

// SeriesIDs is a convenience type for sorting, checking equality, and doing
// union and intersection of collections of series ids.
type SeriesIDs []uint64

// Len implements sort.Interface.
func (a SeriesIDs) Len() int { return len(a) }

// Less implements sort.Interface.
func (a SeriesIDs) Less(i, j int) bool { return a[i] < a[j] }

// Swap implements sort.Interface.
func (a SeriesIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Equals assumes that both are sorted.
+func (a SeriesIDs) Equals(other SeriesIDs) bool { + if len(a) != len(other) { + return false + } + for i, s := range other { + if a[i] != s { + return false + } + } + return true +} + +// Intersect returns a new collection of series ids in sorted order that is the intersection of the two. +// The two collections must already be sorted. +func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs { + l := a + r := other + + // we want to iterate through the shortest one and stop + if len(other) < len(a) { + l = other + r = a + } + + // they're in sorted order so advance the counter as needed. + // That is, don't run comparisons against lower values that we've already passed + var i, j int + + ids := make([]uint64, 0, len(l)) + for i < len(l) && j < len(r) { + if l[i] == r[j] { + ids = append(ids, l[i]) + i++ + j++ + } else if l[i] < r[j] { + i++ + } else { + j++ + } + } + + return SeriesIDs(ids) +} + +// Union returns a new collection of series ids in sorted order that is the union of the two. +// The two collections must already be sorted. +func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs { + l := a + r := other + ids := make([]uint64, 0, len(l)+len(r)) + var i, j int + for i < len(l) && j < len(r) { + if l[i] == r[j] { + ids = append(ids, l[i]) + i++ + j++ + } else if l[i] < r[j] { + ids = append(ids, l[i]) + i++ + } else { + ids = append(ids, r[j]) + j++ + } + } + + // now append the remainder + if i < len(l) { + ids = append(ids, l[i:]...) + } else if j < len(r) { + ids = append(ids, r[j:]...) + } + + return ids +} + +// Reject returns a new collection of series ids in sorted order with the passed in set removed from the original. +// This is useful for the NOT operator. The two collections must already be sorted. 
+func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs { + l := a + r := other + var i, j int + + ids := make([]uint64, 0, len(l)) + for i < len(l) && j < len(r) { + if l[i] == r[j] { + i++ + j++ + } else if l[i] < r[j] { + ids = append(ids, l[i]) + i++ + } else { + j++ + } + } + + // Append the remainder + if i < len(l) { + ids = append(ids, l[i:]...) + } + + return SeriesIDs(ids) +} + +// seriesID is a series id that may or may not have been evicted from the +// current id list. +type seriesID struct { + val uint64 + evict bool +} + +// evictSeriesIDs is a slice of SeriesIDs with an extra field to mark if the +// field should be evicted or not. +type evictSeriesIDs struct { + ids []seriesID + sz int +} + +// newEvictSeriesIDs copies the ids into a new slice that can be used for +// evicting series from the slice. +func newEvictSeriesIDs(ids []uint64) evictSeriesIDs { + a := make([]seriesID, len(ids)) + for i, id := range ids { + a[i].val = id + } + return evictSeriesIDs{ + ids: a, + sz: len(a), + } +} + +// mark marks all of the ids in the sorted slice to be evicted from the list of +// series ids. If an id to be evicted does not exist, it just gets ignored. +func (a *evictSeriesIDs) mark(ids []uint64) { + seriesIDs := a.ids + for _, id := range ids { + if len(seriesIDs) == 0 { + break + } + + // Perform a binary search of the remaining slice if + // the first element does not match the value we're + // looking for. + i := 0 + if seriesIDs[0].val < id { + i = sort.Search(len(seriesIDs), func(i int) bool { + return seriesIDs[i].val >= id + }) + } + + if i >= len(seriesIDs) { + break + } else if seriesIDs[i].val == id { + if !seriesIDs[i].evict { + seriesIDs[i].evict = true + a.sz-- + } + // Skip over this series since it has been evicted and won't be + // encountered again. + i++ + } + seriesIDs = seriesIDs[i:] + } +} + +// evict creates a new slice with only the series that have not been evicted. 
+func (a *evictSeriesIDs) evict() (ids SeriesIDs) { + if a.sz == 0 { + return ids + } + + // Make a new slice with only the remaining ids. + ids = make([]uint64, 0, a.sz) + for _, id := range a.ids { + if id.evict { + continue + } + ids = append(ids, id.val) + } + return ids +} + +// TagFilter represents a tag filter when looking up other tags or measurements. +type TagFilter struct { + Op influxql.Token + Key string + Value string + Regex *regexp.Regexp +} + +// WalkTagKeys calls fn for each tag key associated with m. The order of the +// keys is undefined. +func (m *Measurement) WalkTagKeys(fn func(k string)) { + m.mu.RLock() + defer m.mu.RUnlock() + + for k := range m.seriesByTagKeyValue { + fn(k) + } +} + +// TagKeys returns a list of the measurement's tag names, in sorted order. +func (m *Measurement) TagKeys() []string { + m.mu.RLock() + keys := make([]string, 0, len(m.seriesByTagKeyValue)) + for k := range m.seriesByTagKeyValue { + keys = append(keys, k) + } + m.mu.RUnlock() + sort.Strings(keys) + return keys +} + +// TagValues returns all the values for the given tag key, in an arbitrary order. +func (m *Measurement) TagValues(key string) []string { + m.mu.RLock() + defer m.mu.RUnlock() + values := make([]string, 0, len(m.seriesByTagKeyValue[key])) + for v := range m.seriesByTagKeyValue[key] { + values = append(values, v) + } + return values +} + +// SetFieldName adds the field name to the measurement. +func (m *Measurement) SetFieldName(name string) { + m.mu.RLock() + if _, ok := m.fieldNames[name]; ok { + m.mu.RUnlock() + return + } + m.mu.RUnlock() + + m.mu.Lock() + m.fieldNames[name] = struct{}{} + m.mu.Unlock() +} + +// FieldNames returns a list of the measurement's field names, in an arbitrary order. 
+func (m *Measurement) FieldNames() []string { + m.mu.RLock() + defer m.mu.RUnlock() + + a := make([]string, 0, len(m.fieldNames)) + for n := range m.fieldNames { + a = append(a, n) + } + return a +} + +func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids SeriesIDs) map[string]stringSet { + // If no tag keys were passed, get all tag keys for the measurement. + if len(tagKeys) == 0 { + for k := range m.seriesByTagKeyValue { + tagKeys = append(tagKeys, k) + } + } + + // Mapping between tag keys to all existing tag values. + tagValues := make(map[string]stringSet, 0) + + // Iterate all series to collect tag values. + for _, id := range ids { + s, ok := m.seriesByID[id] + if !ok { + continue + } + + // Iterate the tag keys we're interested in and collect values + // from this series, if they exist. + tags := s.Tags() + for _, tagKey := range tagKeys { + if tagVal := tags.GetString(tagKey); tagVal != "" { + if _, ok = tagValues[tagKey]; !ok { + tagValues[tagKey] = newStringSet() + } + tagValues[tagKey].add(tagVal) + } + } + } + + return tagValues +} + +func (m *Measurement) SeriesByTagKeyValue(key string) map[string]SeriesIDs { + m.mu.RLock() + ret := m.seriesByTagKeyValue[key] + m.mu.RUnlock() + return ret +} + +// stringSet represents a set of strings. +type stringSet map[string]struct{} + +// newStringSet returns an empty stringSet. +func newStringSet() stringSet { + return make(map[string]struct{}) +} + +// add adds strings to the set. +func (s stringSet) add(ss ...string) { + for _, n := range ss { + s[n] = struct{}{} + } +} + +// list returns the current elements in the set, in sorted order. +func (s stringSet) list() []string { + l := make([]string, 0, len(s)) + for k := range s { + l = append(l, k) + } + sort.Strings(l) + return l +} + +// union returns the union of this set and another. 
+func (s stringSet) union(o stringSet) stringSet { + ns := newStringSet() + for k := range s { + ns[k] = struct{}{} + } + for k := range o { + ns[k] = struct{}{} + } + return ns +} + +// intersect returns the intersection of this set and another. +func (s stringSet) intersect(o stringSet) stringSet { + shorter, longer := s, o + if len(longer) < len(shorter) { + shorter, longer = longer, shorter + } + + ns := newStringSet() + for k := range shorter { + if _, ok := longer[k]; ok { + ns[k] = struct{}{} + } + } + return ns +} + +// filter removes v from a if it exists. a must be sorted in ascending +// order. +func filter(a []uint64, v uint64) []uint64 { + // binary search for v + i := sort.Search(len(a), func(i int) bool { return a[i] >= v }) + if i >= len(a) || a[i] != v { + return a + } + + // we found it, so shift the right half down one, overwriting v's position. + copy(a[i:], a[i+1:]) + return a[:len(a)-1] +} + +type byTagKey []*influxql.TagSet + +func (t byTagKey) Len() int { return len(t) } +func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 } +func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] } diff --git a/tsdb/index/inmem/meta_test.go b/tsdb/index/inmem/meta_test.go new file mode 100644 index 0000000000..e59d86554f --- /dev/null +++ b/tsdb/index/inmem/meta_test.go @@ -0,0 +1,255 @@ +package inmem_test + +import ( + "fmt" + "strings" + "testing" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb/index/inmem" +) + +// Test comparing SeriesIDs for equality. +func TestSeriesIDs_Equals(t *testing.T) { + ids1 := inmem.SeriesIDs([]uint64{1, 2, 3}) + ids2 := inmem.SeriesIDs([]uint64{1, 2, 3}) + ids3 := inmem.SeriesIDs([]uint64{4, 5, 6}) + + if !ids1.Equals(ids2) { + t.Fatal("expected ids1 == ids2") + } else if ids1.Equals(ids3) { + t.Fatal("expected ids1 != ids3") + } +} + +// Test intersecting sets of SeriesIDs. 
+func TestSeriesIDs_Intersect(t *testing.T) { + // Test swaping l & r, all branches of if-else, and exit loop when 'j < len(r)' + ids1 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6}) + ids2 := inmem.SeriesIDs([]uint64{1, 2, 3, 7}) + exp := inmem.SeriesIDs([]uint64{1, 3}) + got := ids1.Intersect(ids2) + + if !exp.Equals(got) { + t.Fatalf("exp=%v, got=%v", exp, got) + } + + // Test exit for loop when 'i < len(l)' + ids1 = inmem.SeriesIDs([]uint64{1}) + ids2 = inmem.SeriesIDs([]uint64{1, 2}) + exp = inmem.SeriesIDs([]uint64{1}) + got = ids1.Intersect(ids2) + + if !exp.Equals(got) { + t.Fatalf("exp=%v, got=%v", exp, got) + } +} + +// Test union sets of SeriesIDs. +func TestSeriesIDs_Union(t *testing.T) { + // Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left. + ids1 := inmem.SeriesIDs([]uint64{1, 2, 3, 7}) + ids2 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6}) + exp := inmem.SeriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7}) + got := ids1.Union(ids2) + + if !exp.Equals(got) { + t.Fatalf("exp=%v, got=%v", exp, got) + } + + // Test exit because of 'i < len(l)' and append remainder from right. + ids1 = inmem.SeriesIDs([]uint64{1}) + ids2 = inmem.SeriesIDs([]uint64{1, 2}) + exp = inmem.SeriesIDs([]uint64{1, 2}) + got = ids1.Union(ids2) + + if !exp.Equals(got) { + t.Fatalf("exp=%v, got=%v", exp, got) + } +} + +// Test removing one set of SeriesIDs from another. +func TestSeriesIDs_Reject(t *testing.T) { + // Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left. + ids1 := inmem.SeriesIDs([]uint64{1, 2, 3, 7}) + ids2 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6}) + exp := inmem.SeriesIDs([]uint64{2, 7}) + got := ids1.Reject(ids2) + + if !exp.Equals(got) { + t.Fatalf("exp=%v, got=%v", exp, got) + } + + // Test exit because of 'i < len(l)'. 
+ ids1 = inmem.SeriesIDs([]uint64{1}) + ids2 = inmem.SeriesIDs([]uint64{1, 2}) + exp = inmem.SeriesIDs{} + got = ids1.Reject(ids2) + + if !exp.Equals(got) { + t.Fatalf("exp=%v, got=%v", exp, got) + } +} + +func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) { + m := inmem.NewMeasurement("cpu") + var dst []string + dst = m.AppendSeriesKeysByID(dst, []uint64{1}) + if exp, got := 0, len(dst); exp != got { + t.Fatalf("series len mismatch: exp %v, got %v", exp, got) + } +} + +func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) { + m := inmem.NewMeasurement("cpu") + s := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) + s.ID = 1 + m.AddSeries(s) + + var dst []string + dst = m.AppendSeriesKeysByID(dst, []uint64{1}) + if exp, got := 1, len(dst); exp != got { + t.Fatalf("series len mismatch: exp %v, got %v", exp, got) + } + + if exp, got := "cpu,host=foo", dst[0]; exp != got { + t.Fatalf("series mismatch: exp %v, got %v", exp, got) + } +} + +func TestMeasurement_TagsSet_Deadlock(t *testing.T) { + m := inmem.NewMeasurement("cpu") + s1 := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) + s1.ID = 1 + m.AddSeries(s1) + + s2 := inmem.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))}) + s2.ID = 2 + m.AddSeries(s2) + + m.DropSeries(s1) + + // This was deadlocking + m.TagSets(1, influxql.IteratorOptions{}) + if got, exp := len(m.SeriesIDs()), 1; got != exp { + t.Fatalf("series count mismatch: got %v, exp %v", got, exp) + } +} + +func TestMeasurement_ForEachSeriesByExpr_Deadlock(t *testing.T) { + m := inmem.NewMeasurement("cpu") + s1 := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) + s1.ID = 1 + m.AddSeries(s1) + + s2 := inmem.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))}) + s2.ID = 2 + m.AddSeries(s2) + + 
m.DropSeries(s1) + + // This was deadlocking + m.ForEachSeriesByExpr(nil, func(tags models.Tags) error { + return nil + }) + if got, exp := len(m.SeriesIDs()), 1; got != exp { + t.Fatalf("series count mismatch: got %v, exp %v", got, exp) + } +} + +func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) { + m := inmem.NewMeasurement("cpu") + for i := 0; i < 100000; i++ { + s := inmem.NewSeries([]byte("cpu"), models.Tags{models.NewTag( + []byte("host"), + []byte(fmt.Sprintf("host%d", i)))}) + s.ID = uint64(i) + m.AddSeries(s) + } + + if exp, got := 100000, len(m.SeriesKeys()); exp != got { + b.Fatalf("series count mismatch: exp %v got %v", exp, got) + } + + stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host =~ /host\d+/`)).ParseStatement() + if err != nil { + b.Fatalf("invalid statement: %s", err) + } + + selectStmt := stmt.(*influxql.SelectStatement) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr)) + if exp, got := 100000, len(ids); exp != got { + b.Fatalf("series count mismatch: exp %v got %v", exp, got) + } + + } +} + +func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) { + m := inmem.NewMeasurement("cpu") + for i := 0; i < 100000; i++ { + s := inmem.NewSeries([]byte("cpu"), models.Tags{models.Tag{ + Key: []byte("host"), + Value: []byte(fmt.Sprintf("host%d", i))}}) + s.ID = uint64(i) + m.AddSeries(s) + } + + if exp, got := 100000, len(m.SeriesKeys()); exp != got { + b.Fatalf("series count mismatch: exp %v got %v", exp, got) + } + + stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host !~ /foo\d+/`)).ParseStatement() + if err != nil { + b.Fatalf("invalid statement: %s", err) + } + + selectStmt := stmt.(*influxql.SelectStatement) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr)) + if exp, got := 100000, len(ids); exp != got { + b.Fatalf("series count mismatch: exp 
%v got %v", exp, got) + } + + } + +} + +/* +func BenchmarkCreateSeriesIndex_1K(b *testing.B) { + benchmarkCreateSeriesIndex(b, genTestSeries(38, 3, 3)) +} + +func BenchmarkCreateSeriesIndex_100K(b *testing.B) { + benchmarkCreateSeriesIndex(b, genTestSeries(32, 5, 5)) +} + +func BenchmarkCreateSeriesIndex_1M(b *testing.B) { + benchmarkCreateSeriesIndex(b, genTestSeries(330, 5, 5)) +} + +func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) { + idxs := make([]*inmem.DatabaseIndex, 0, b.N) + for i := 0; i < b.N; i++ { + index, err := inmem.NewDatabaseIndex(fmt.Sprintf("db%d", i)) + if err != nil { + b.Fatal(err) + } + idxs = append(idxs, index) + } + + b.ResetTimer() + for n := 0; n < b.N; n++ { + idx := idxs[n] + for _, s := range series { + idx.CreateSeriesIndexIfNotExists(s.Measurement, s.Series, false) + } + } +} +*/ diff --git a/tsdb/meta.go b/tsdb/meta.go index c5f7ecc804..e5dfc2e899 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -1,1430 +1,13 @@ package tsdb -import ( - "bytes" - "fmt" - "regexp" - "sort" - "sync" - - "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/pkg/escape" - internal "github.com/influxdata/influxdb/tsdb/internal" - - "github.com/gogo/protobuf/proto" -) - //go:generate protoc --gogo_out=. internal/meta.proto -// Measurement represents a collection of time series in a database. It also -// contains in memory structures for indexing tags. Exported functions are -// goroutine safe while un-exported functions assume the caller will use the -// appropriate locks. 
-type Measurement struct { - mu sync.RWMutex - Name string `json:"name,omitempty"` - fieldNames map[string]struct{} +import ( + "sort" - // in-memory index fields - seriesByID map[uint64]*Series // lookup table for series by their id - seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids - - // lazyily created sorted series IDs - sortedSeriesIDs SeriesIDs // sorted list of series IDs in this measurement -} - -// NewMeasurement allocates and initializes a new Measurement. -func NewMeasurement(name string) *Measurement { - return &Measurement{ - Name: name, - fieldNames: make(map[string]struct{}), - - seriesByID: make(map[uint64]*Series), - seriesByTagKeyValue: make(map[string]map[string]SeriesIDs), - } -} - -func (m *Measurement) HasField(name string) bool { - m.mu.RLock() - _, hasField := m.fieldNames[name] - m.mu.RUnlock() - return hasField -} - -// SeriesByID returns a series by identifier. -func (m *Measurement) SeriesByID(id uint64) *Series { - m.mu.RLock() - defer m.mu.RUnlock() - return m.seriesByID[id] -} - -// SeriesByIDMap returns the internal seriesByID map. -func (m *Measurement) SeriesByIDMap() map[uint64]*Series { - m.mu.RLock() - defer m.mu.RUnlock() - return m.seriesByID -} - -// SeriesByIDSlice returns a list of series by identifiers. -func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series { - m.mu.RLock() - defer m.mu.RUnlock() - a := make([]*Series, len(ids)) - for i, id := range ids { - a[i] = m.seriesByID[id] - } - return a -} - -// AppendSeriesKeysByID appends keys for a list of series ids to a buffer. -func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string { - m.mu.RLock() - defer m.mu.RUnlock() - for _, id := range ids { - if s := m.seriesByID[id]; s != nil { - dst = append(dst, s.Key) - } - } - return dst -} - -// SeriesKeysByID returns the a list of keys for a set of ids. 
-func (m *Measurement) SeriesKeysByID(ids SeriesIDs) [][]byte { - m.mu.RLock() - defer m.mu.RUnlock() - keys := make([][]byte, 0, len(ids)) - for _, id := range ids { - s := m.seriesByID[id] - if s == nil { - continue - } - keys = append(keys, []byte(s.Key)) - } - return keys -} - -// SeriesKeys returns the keys of every series in this measurement -func (m *Measurement) SeriesKeys() [][]byte { - m.mu.RLock() - defer m.mu.RUnlock() - keys := make([][]byte, 0, len(m.seriesByID)) - for _, s := range m.seriesByID { - keys = append(keys, []byte(s.Key)) - } - return keys -} - -func (m *Measurement) SeriesIDs() SeriesIDs { - m.mu.RLock() - if len(m.sortedSeriesIDs) == len(m.seriesByID) { - s := m.sortedSeriesIDs - m.mu.RUnlock() - return s - } - m.mu.RUnlock() - - m.mu.Lock() - if len(m.sortedSeriesIDs) == len(m.seriesByID) { - s := m.sortedSeriesIDs - m.mu.Unlock() - return s - } - - if cap(m.sortedSeriesIDs) < len(m.seriesByID) { - m.sortedSeriesIDs = make(SeriesIDs, 0, len(m.seriesByID)) - } - for k := range m.seriesByID { - m.sortedSeriesIDs = append(m.sortedSeriesIDs, k) - } - sort.Sort(m.sortedSeriesIDs) - s := m.sortedSeriesIDs - m.mu.Unlock() - return s -} - -// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key -func (m *Measurement) HasTagKey(k string) bool { - m.mu.RLock() - defer m.mu.RUnlock() - _, hasTag := m.seriesByTagKeyValue[k] - return hasTag -} - -func (m *Measurement) HasTagKeyValue(k, v []byte) bool { - m.mu.RLock() - if vals, ok := m.seriesByTagKeyValue[string(k)]; ok { - _, ok := vals[string(v)] - m.mu.RUnlock() - return ok - } - m.mu.RUnlock() - return false -} - -// HasSeries returns true if there is at least 1 series under this measurement. -func (m *Measurement) HasSeries() bool { - m.mu.RLock() - defer m.mu.RUnlock() - return len(m.seriesByID) > 0 -} - -// Cardinality returns the number of values associated with the given tag key. 
-func (m *Measurement) Cardinality(key string) int { - var n int - m.mu.RLock() - n = m.cardinality(key) - m.mu.RUnlock() - return n -} - -func (m *Measurement) cardinality(key string) int { - return len(m.seriesByTagKeyValue[key]) -} - -// CardinalityBytes returns the number of values associated with the given tag key. -func (m *Measurement) CardinalityBytes(key []byte) int { - var n int - m.mu.RLock() - n = len(m.seriesByTagKeyValue[string(key)]) - m.mu.RUnlock() - return n -} - -// AddSeries adds a series to the measurement's index. -// It returns true if the series was added successfully or false if the series was already present. -func (m *Measurement) AddSeries(s *Series) bool { - m.mu.RLock() - if _, ok := m.seriesByID[s.ID]; ok { - m.mu.RUnlock() - return false - } - m.mu.RUnlock() - - m.mu.Lock() - defer m.mu.Unlock() - - if _, ok := m.seriesByID[s.ID]; ok { - return false - } - - m.seriesByID[s.ID] = s - - if len(m.sortedSeriesIDs) == 0 || s.ID > m.sortedSeriesIDs[len(m.sortedSeriesIDs)-1] { - m.sortedSeriesIDs = append(m.sortedSeriesIDs, s.ID) - } - - // add this series id to the tag index on the measurement - s.ForEachTag(func(t models.Tag) { - valueMap := m.seriesByTagKeyValue[string(t.Key)] - if valueMap == nil { - valueMap = make(map[string]SeriesIDs) - m.seriesByTagKeyValue[string(t.Key)] = valueMap - } - ids := valueMap[string(t.Value)] - ids = append(ids, s.ID) - - // most of the time the series ID will be higher than all others because it's a new - // series. So don't do the sort if we don't have to. - if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] { - sort.Sort(ids) - } - valueMap[string(t.Value)] = ids - }) - - return true -} - -// DropSeries removes a series from the measurement's index. 
-func (m *Measurement) DropSeries(series *Series) { - seriesID := series.ID - m.mu.Lock() - defer m.mu.Unlock() - - if _, ok := m.seriesByID[seriesID]; !ok { - return - } - delete(m.seriesByID, seriesID) - - // clear our lazily sorted set of ids - m.sortedSeriesIDs = m.sortedSeriesIDs[:0] - - // remove this series id from the tag index on the measurement - // s.seriesByTagKeyValue is defined as map[string]map[string]SeriesIDs - series.ForEachTag(func(t models.Tag) { - values := m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] - ids := filter(values, seriesID) - // Check to see if we have any ids, if not, remove the key - if len(ids) == 0 { - delete(m.seriesByTagKeyValue[string(t.Key)], string(t.Value)) - } else { - m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] = ids - } - - // If we have no values, then we delete the key - if len(m.seriesByTagKeyValue[string(t.Key)]) == 0 { - delete(m.seriesByTagKeyValue, string(t.Key)) - } - }) - - return -} - -// filters walks the where clause of a select statement and returns a map with all series ids -// matching the where clause and any filter expression that should be applied to each -func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) { - if condition == nil || influxql.OnlyTimeExpr(condition) { - return m.SeriesIDs(), nil, nil - } - return m.WalkWhereForSeriesIds(condition) -} - -// ForEachSeriesByExpr iterates over all series filtered by condition. -func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags models.Tags) error) error { - // Retrieve matching series ids. - ids, _, err := m.filters(condition) - if err != nil { - return err - } - - // Iterate over each series. - for _, id := range ids { - s := m.SeriesByID(id) - if err := fn(s.Tags()); err != nil { - return err - } - } - - return nil -} - -// TagSets returns the unique tag sets that exist for the given tag keys. 
This is used to determine -// what composite series will be created by a group by. i.e. "group by region" should return: -// {"region":"uswest"}, {"region":"useast"} -// or region, service returns -// {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc... -// This will also populate the TagSet objects with the series IDs that match each tagset and any -// influx filter expression that goes with the series -// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it. -func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*influxql.TagSet, error) { - // get the unique set of series ids and the filters that should be applied to each - ids, filters, err := m.filters(opt.Condition) - if err != nil { - return nil, err - } - - m.mu.RLock() - // For every series, get the tag values for the requested tag keys i.e. dimensions. This is the - // TagSet for that series. Series with the same TagSet are then grouped together, because for the - // purpose of GROUP BY they are part of the same composite series. - tagSets := make(map[string]*influxql.TagSet, 64) - var seriesN int - for _, id := range ids { - // Abort if the query was killed - select { - case <-opt.InterruptCh: - return nil, influxql.ErrQueryInterrupted - default: - } - - if opt.MaxSeriesN > 0 && seriesN > opt.MaxSeriesN { - return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN) - } - - s := m.seriesByID[id] - if !s.Assigned(shardID) { - continue - } - tags := make(map[string]string, len(opt.Dimensions)) - - // Build the TagSet for this series. - for _, dim := range opt.Dimensions { - tags[dim] = s.GetTagString(dim) - } - - // Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled - // as a set. - tagsAsKey := MarshalTags(tags) - tagSet, ok := tagSets[string(tagsAsKey)] - if !ok { - // This TagSet is new, create a new entry for it. 
- tagSet = &influxql.TagSet{ - Tags: tags, - Key: tagsAsKey, - } - } - // Associate the series and filter with the Tagset. - tagSet.AddFilter(m.seriesByID[id].Key, filters[id]) - seriesN++ - - // Ensure it's back in the map. - tagSets[string(tagsAsKey)] = tagSet - } - // Release the lock while we sort all the tags - m.mu.RUnlock() - - // Sort the series in each tag set. - for _, t := range tagSets { - // Abort if the query was killed - select { - case <-opt.InterruptCh: - return nil, influxql.ErrQueryInterrupted - default: - } - - sort.Sort(t) - } - - // The TagSets have been created, as a map of TagSets. Just send - // the values back as a slice, sorting for consistency. - sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets)) - for _, v := range tagSets { - sortedTagsSets = append(sortedTagsSets, v) - } - sort.Sort(byTagKey(sortedTagsSets)) - - return sortedTagsSets, nil -} - -// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions. -func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { - // We only want to allocate a slice and map of the smaller size. - var ids []uint64 - if len(lids) > len(rids) { - ids = make([]uint64, 0, len(rids)) - } else { - ids = make([]uint64, 0, len(lids)) - } - - var filters FilterExprs - if len(lfilters) > len(rfilters) { - filters = make(FilterExprs, len(rfilters)) - } else { - filters = make(FilterExprs, len(lfilters)) - } - - // They're in sorted order so advance the counter as needed. - // This is, don't run comparisons against lower values that we've already passed. 
- for len(lids) > 0 && len(rids) > 0 { - lid, rid := lids[0], rids[0] - if lid == rid { - ids = append(ids, lid) - - var expr influxql.Expr - lfilter := lfilters[lid] - rfilter := rfilters[rid] - - if lfilter != nil && rfilter != nil { - be := &influxql.BinaryExpr{ - Op: influxql.AND, - LHS: lfilter, - RHS: rfilter, - } - expr = influxql.Reduce(be, nil) - } else if lfilter != nil { - expr = lfilter - } else if rfilter != nil { - expr = rfilter - } - - if expr != nil { - filters[lid] = expr - } - lids, rids = lids[1:], rids[1:] - } else if lid < rid { - lids = lids[1:] - } else { - rids = rids[1:] - } - } - return ids, filters -} - -// unionSeriesFilters performs a union for two sets of ids and filter expressions. -func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { - ids := make([]uint64, 0, len(lids)+len(rids)) - - // Setup the filters with the smallest size since we will discard filters - // that do not have a match on the other side. - var filters FilterExprs - if len(lfilters) < len(rfilters) { - filters = make(FilterExprs, len(lfilters)) - } else { - filters = make(FilterExprs, len(rfilters)) - } - - for len(lids) > 0 && len(rids) > 0 { - lid, rid := lids[0], rids[0] - if lid == rid { - ids = append(ids, lid) - - // If one side does not have a filter, then the series has been - // included on one side of the OR with no condition. Eliminate the - // filter in this case. 
- var expr influxql.Expr - lfilter := lfilters[lid] - rfilter := rfilters[rid] - if lfilter != nil && rfilter != nil { - be := &influxql.BinaryExpr{ - Op: influxql.OR, - LHS: lfilter, - RHS: rfilter, - } - expr = influxql.Reduce(be, nil) - } - - if expr != nil { - filters[lid] = expr - } - lids, rids = lids[1:], rids[1:] - } else if lid < rid { - ids = append(ids, lid) - - filter := lfilters[lid] - if filter != nil { - filters[lid] = filter - } - lids = lids[1:] - } else { - ids = append(ids, rid) - - filter := rfilters[rid] - if filter != nil { - filters[rid] = filter - } - rids = rids[1:] - } - } - - // Now append the remainder. - if len(lids) > 0 { - for i := 0; i < len(lids); i++ { - ids = append(ids, lids[i]) - - filter := lfilters[lids[i]] - if filter != nil { - filters[lids[i]] = filter - } - } - } else if len(rids) > 0 { - for i := 0; i < len(rids); i++ { - ids = append(ids, rids[i]) - - filter := rfilters[rids[i]] - if filter != nil { - filters[rids[i]] = filter - } - } - } - return ids, filters -} - -// IDsForExpr returns the series IDs that are candidates to match the given expression. -func (m *Measurement) IDsForExpr(n *influxql.BinaryExpr) SeriesIDs { - ids, _, _ := m.idsForExpr(n) - return ids -} - -// idsForExpr returns a collection of series ids and a filter expression that should -// be used to filter points from those series. -func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Expr, error) { - // If this binary expression has another binary expression, then this - // is some expression math and we should just pass it to the underlying query. - if _, ok := n.LHS.(*influxql.BinaryExpr); ok { - return m.SeriesIDs(), n, nil - } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok { - return m.SeriesIDs(), n, nil - } - - // Retrieve the variable reference from the correct side of the expression. 
- name, ok := n.LHS.(*influxql.VarRef) - value := n.RHS - if !ok { - name, ok = n.RHS.(*influxql.VarRef) - if !ok { - return nil, nil, fmt.Errorf("invalid expression: %s", n.String()) - } - value = n.LHS - } - - // For time literals, return all series IDs and "true" as the filter. - if _, ok := value.(*influxql.TimeLiteral); ok || name.Val == "time" { - return m.SeriesIDs(), &influxql.BooleanLiteral{Val: true}, nil - } - - // For fields, return all series IDs from this measurement and return - // the expression passed in, as the filter. - if name.Val != "_name" && ((name.Type == influxql.Unknown && m.HasField(name.Val)) || name.Type == influxql.AnyField || (name.Type != influxql.Tag && name.Type != influxql.Unknown)) { - return m.SeriesIDs(), n, nil - } else if value, ok := value.(*influxql.VarRef); ok { - // Check if the RHS is a variable and if it is a field. - if value.Val != "_name" && ((value.Type == influxql.Unknown && m.HasField(value.Val)) || name.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) { - return m.SeriesIDs(), n, nil - } - } - - // Retrieve list of series with this tag key. - tagVals := m.seriesByTagKeyValue[name.Val] - - // if we're looking for series with a specific tag value - if str, ok := value.(*influxql.StringLiteral); ok { - var ids SeriesIDs - - // Special handling for "_name" to match measurement name. - if name.Val == "_name" { - if (n.Op == influxql.EQ && str.Val == m.Name) || (n.Op == influxql.NEQ && str.Val != m.Name) { - return m.SeriesIDs(), nil, nil - } - return nil, nil, nil - } - - if n.Op == influxql.EQ { - if str.Val != "" { - // return series that have a tag of specific value. - ids = tagVals[str.Val] - } else { - // Make a copy of all series ids and mark the ones we need to evict. - seriesIDs := newEvictSeriesIDs(m.SeriesIDs()) - - // Go through each slice and mark the values we find as zero so - // they can be removed later. 
- for _, a := range tagVals { - seriesIDs.mark(a) - } - - // Make a new slice with only the remaining ids. - ids = seriesIDs.evict() - } - } else if n.Op == influxql.NEQ { - if str.Val != "" { - ids = m.SeriesIDs().Reject(tagVals[str.Val]) - } else { - for k := range tagVals { - ids = append(ids, tagVals[k]...) - } - sort.Sort(ids) - } - } - return ids, nil, nil - } - - // if we're looking for series with a tag value that matches a regex - if re, ok := value.(*influxql.RegexLiteral); ok { - var ids SeriesIDs - - // Special handling for "_name" to match measurement name. - if name.Val == "_name" { - match := re.Val.MatchString(m.Name) - if (n.Op == influxql.EQREGEX && match) || (n.Op == influxql.NEQREGEX && !match) { - return m.SeriesIDs(), &influxql.BooleanLiteral{Val: true}, nil - } - return nil, nil, nil - } - - // Check if we match the empty string to see if we should include series - // that are missing the tag. - empty := re.Val.MatchString("") - - // Gather the series that match the regex. If we should include the empty string, - // start with the list of all series and reject series that don't match our condition. - // If we should not include the empty string, include series that match our condition. - if empty && n.Op == influxql.EQREGEX { - // See comments above for EQ with a StringLiteral. - seriesIDs := newEvictSeriesIDs(m.SeriesIDs()) - for k := range tagVals { - if !re.Val.MatchString(k) { - seriesIDs.mark(tagVals[k]) - } - } - ids = seriesIDs.evict() - } else if empty && n.Op == influxql.NEQREGEX { - ids = make(SeriesIDs, 0, len(m.SeriesIDs())) - for k := range tagVals { - if !re.Val.MatchString(k) { - ids = append(ids, tagVals[k]...) - } - } - sort.Sort(ids) - } else if !empty && n.Op == influxql.EQREGEX { - ids = make(SeriesIDs, 0, len(m.SeriesIDs())) - for k := range tagVals { - if re.Val.MatchString(k) { - ids = append(ids, tagVals[k]...) 
- } - } - sort.Sort(ids) - } else if !empty && n.Op == influxql.NEQREGEX { - // See comments above for EQ with a StringLiteral. - seriesIDs := newEvictSeriesIDs(m.SeriesIDs()) - for k := range tagVals { - if re.Val.MatchString(k) { - seriesIDs.mark(tagVals[k]) - } - } - ids = seriesIDs.evict() - } - return ids, nil, nil - } - - // compare tag values - if ref, ok := value.(*influxql.VarRef); ok { - var ids SeriesIDs - - if n.Op == influxql.NEQ { - ids = m.SeriesIDs() - } - - rhsTagVals := m.seriesByTagKeyValue[ref.Val] - for k := range tagVals { - tags := tagVals[k].Intersect(rhsTagVals[k]) - if n.Op == influxql.EQ { - ids = ids.Union(tags) - } else if n.Op == influxql.NEQ { - ids = ids.Reject(tags) - } - } - return ids, nil, nil - } - - if n.Op == influxql.NEQ || n.Op == influxql.NEQREGEX { - return m.SeriesIDs(), nil, nil - } - return nil, nil, nil -} - -// FilterExprs represents a map of series IDs to filter expressions. -type FilterExprs map[uint64]influxql.Expr - -// DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true. -func (fe FilterExprs) DeleteBoolLiteralTrues() { - for id, expr := range fe { - if e, ok := expr.(*influxql.BooleanLiteral); ok && e.Val { - delete(fe, id) - } - } -} - -// Len returns the number of elements. -func (fe FilterExprs) Len() int { - if fe == nil { - return 0 - } - return len(fe) -} - -// WalkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and -// a map from those series IDs to filter expressions that should be used to limit points returned in -// the final query result. -func (m *Measurement) WalkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) { - switch n := expr.(type) { - case *influxql.BinaryExpr: - switch n.Op { - case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX: - // Get the series IDs and filter expression for the tag or field comparison. 
- ids, expr, err := m.idsForExpr(n) - if err != nil { - return nil, nil, err - } - - if len(ids) == 0 { - return ids, nil, nil - } - - // If the expression is a boolean literal that is true, ignore it. - if b, ok := expr.(*influxql.BooleanLiteral); ok && b.Val { - expr = nil - } - - var filters FilterExprs - if expr != nil { - filters = make(FilterExprs, len(ids)) - for _, id := range ids { - filters[id] = expr - } - } - - return ids, filters, nil - case influxql.AND, influxql.OR: - // Get the series IDs and filter expressions for the LHS. - lids, lfilters, err := m.WalkWhereForSeriesIds(n.LHS) - if err != nil { - return nil, nil, err - } - - // Get the series IDs and filter expressions for the RHS. - rids, rfilters, err := m.WalkWhereForSeriesIds(n.RHS) - if err != nil { - return nil, nil, err - } - - // Combine the series IDs from the LHS and RHS. - if n.Op == influxql.AND { - ids, filters := intersectSeriesFilters(lids, rids, lfilters, rfilters) - return ids, filters, nil - } else { - ids, filters := unionSeriesFilters(lids, rids, lfilters, rfilters) - return ids, filters, nil - } - } - - ids, _, err := m.idsForExpr(n) - return ids, nil, err - case *influxql.ParenExpr: - // walk down the tree - return m.WalkWhereForSeriesIds(n.Expr) - default: - return nil, nil, nil - } -} - -// expandExpr returns a list of expressions expanded by all possible tag -// combinations. -func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr { - // Retrieve list of unique values for each tag. - valuesByTagKey := m.uniqueTagValues(expr) - - // Convert keys to slices. - keys := make([]string, 0, len(valuesByTagKey)) - for key := range valuesByTagKey { - keys = append(keys, key) - } - sort.Strings(keys) - - // Order uniques by key. - uniques := make([][]string, len(keys)) - for i, key := range keys { - uniques[i] = valuesByTagKey[key] - } - - // Reduce a condition for each combination of tag values. 
- return expandExprWithValues(expr, keys, []tagExpr{}, uniques, 0) -} - -func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr, uniques [][]string, index int) []tagSetExpr { - // If we have no more keys left then execute the reduction and return. - if index == len(keys) { - // Create a map of tag key/values. - m := make(map[string]*string, len(keys)) - for i, key := range keys { - if tagExprs[i].op == influxql.EQ { - m[key] = &tagExprs[i].values[0] - } else { - m[key] = nil - } - } - - // TODO: Rewrite full expressions instead of VarRef replacement. - - // Reduce using the current tag key/value set. - // Ignore it if reduces down to "false". - e := influxql.Reduce(expr, &tagValuer{tags: m}) - if e, ok := e.(*influxql.BooleanLiteral); ok && !e.Val { - return nil - } - - return []tagSetExpr{{values: copyTagExprs(tagExprs), expr: e}} - } - - // Otherwise expand for each possible equality value of the key. - var exprs []tagSetExpr - for _, v := range uniques[index] { - exprs = append(exprs, expandExprWithValues(expr, keys, append(tagExprs, tagExpr{keys[index], []string{v}, influxql.EQ}), uniques, index+1)...) - } - exprs = append(exprs, expandExprWithValues(expr, keys, append(tagExprs, tagExpr{keys[index], uniques[index], influxql.NEQ}), uniques, index+1)...) - - return exprs -} - -// SeriesIDsAllOrByExpr walks an expressions for matching series IDs -// or, if no expressions is given, returns all series IDs for the measurement. -func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) { - // If no expression given or the measurement has no series, - // we can take just return the ids or nil accordingly. - if expr == nil { - return m.SeriesIDs(), nil - } - - m.mu.RLock() - defer m.mu.RUnlock() - - if len(m.seriesByID) == 0 { - return nil, nil - } - - // Get series IDs that match the WHERE clause. 
- ids, _, err := m.WalkWhereForSeriesIds(expr) - if err != nil { - return nil, err - } - - return ids, nil -} - -// tagKeysByExpr extracts the tag keys wanted by the expression. -func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) { - switch e := expr.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok { - return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) - } else if tag.Val != "_tagKey" { - return nil, nil - } - - if influxql.IsRegexOp(e.Op) { - re, ok := e.RHS.(*influxql.RegexLiteral) - if !ok { - return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) - } - return m.tagKeysByFilter(e.Op, "", re.Val), nil - } - - s, ok := e.RHS.(*influxql.StringLiteral) - if !ok { - return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) - } - return m.tagKeysByFilter(e.Op, s.Val, nil), nil - - case influxql.AND, influxql.OR: - lhs, err := m.TagKeysByExpr(e.LHS) - if err != nil { - return nil, err - } - - rhs, err := m.TagKeysByExpr(e.RHS) - if err != nil { - return nil, err - } - - if lhs != nil && rhs != nil { - if e.Op == influxql.OR { - return stringSet(lhs).union(rhs), nil - } - return stringSet(lhs).intersect(rhs), nil - } else if lhs != nil { - return lhs, nil - } else if rhs != nil { - return rhs, nil - } - return nil, nil - default: - return nil, fmt.Errorf("invalid operator") - } - - case *influxql.ParenExpr: - return m.TagKeysByExpr(e.Expr) - } - - return nil, fmt.Errorf("%#v", expr) -} - -// tagKeysByFilter will filter the tag keys for the measurement. 
-func (m *Measurement) tagKeysByFilter(op influxql.Token, val string, regex *regexp.Regexp) stringSet { - ss := newStringSet() - for _, key := range m.TagKeys() { - var matched bool - switch op { - case influxql.EQ: - matched = key == val - case influxql.NEQ: - matched = key != val - case influxql.EQREGEX: - matched = regex.MatchString(key) - case influxql.NEQREGEX: - matched = !regex.MatchString(key) - } - - if !matched { - continue - } - ss.add(key) - } - return ss -} - -// tagValuer is used during expression expansion to evaluate all sets of tag values. -type tagValuer struct { - tags map[string]*string -} - -// Value returns the string value of a tag and true if it's listed in the tagset. -func (v *tagValuer) Value(name string) (interface{}, bool) { - if value, ok := v.tags[name]; ok { - if value == nil { - return nil, true - } - return *value, true - } - return nil, false -} - -// tagSetExpr represents a set of tag keys/values and associated expression. -type tagSetExpr struct { - values []tagExpr - expr influxql.Expr -} - -// tagExpr represents one or more values assigned to a given tag. -type tagExpr struct { - key string - values []string - op influxql.Token // EQ or NEQ -} - -func copyTagExprs(a []tagExpr) []tagExpr { - other := make([]tagExpr, len(a)) - copy(other, a) - return other -} - -// uniqueTagValues returns a list of unique tag values used in an expression. -func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string { - // Track unique value per tag. - tags := make(map[string]map[string]struct{}) - - // Find all tag values referenced in the expression. - influxql.WalkFunc(expr, func(n influxql.Node) { - switch n := n.(type) { - case *influxql.BinaryExpr: - // Ignore operators that are not equality. - if n.Op != influxql.EQ { - return - } - - // Extract ref and string literal. 
- var key, value string - switch lhs := n.LHS.(type) { - case *influxql.VarRef: - if rhs, ok := n.RHS.(*influxql.StringLiteral); ok { - key, value = lhs.Val, rhs.Val - } - case *influxql.StringLiteral: - if rhs, ok := n.RHS.(*influxql.VarRef); ok { - key, value = rhs.Val, lhs.Val - } - } - if key == "" { - return - } - - // Add value to set. - if tags[key] == nil { - tags[key] = make(map[string]struct{}) - } - tags[key][value] = struct{}{} - } - }) - - // Convert to map of slices. - out := make(map[string][]string) - for k, values := range tags { - out[k] = make([]string, 0, len(values)) - for v := range values { - out[k] = append(out[k], v) - } - sort.Strings(out[k]) - } - return out -} - -// Measurements represents a list of *Measurement. -type Measurements []*Measurement - -// Len implements sort.Interface. -func (a Measurements) Len() int { return len(a) } - -// Less implements sort.Interface. -func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name } - -// Swap implements sort.Interface. -func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -func (a Measurements) Intersect(other Measurements) Measurements { - l := a - r := other - - // we want to iterate through the shortest one and stop - if len(other) < len(a) { - l = other - r = a - } - - // they're in sorted order so advance the counter as needed. 
- // That is, don't run comparisons against lower values that we've already passed - var i, j int - - result := make(Measurements, 0, len(l)) - for i < len(l) && j < len(r) { - if l[i].Name == r[j].Name { - result = append(result, l[i]) - i++ - j++ - } else if l[i].Name < r[j].Name { - i++ - } else { - j++ - } - } - - return result -} - -func (a Measurements) Union(other Measurements) Measurements { - result := make(Measurements, 0, len(a)+len(other)) - var i, j int - for i < len(a) && j < len(other) { - if a[i].Name == other[j].Name { - result = append(result, a[i]) - i++ - j++ - } else if a[i].Name < other[j].Name { - result = append(result, a[i]) - i++ - } else { - result = append(result, other[j]) - j++ - } - } - - // now append the remainder - if i < len(a) { - result = append(result, a[i:]...) - } else if j < len(other) { - result = append(result, other[j:]...) - } - - return result -} - -// Series belong to a Measurement and represent unique time series in a database. -type Series struct { - mu sync.RWMutex - Key string - tags models.Tags - ID uint64 - measurement *Measurement - shardIDs map[uint64]struct{} // shards that have this series defined -} - -// NewSeries returns an initialized series struct -func NewSeries(key []byte, tags models.Tags) *Series { - return &Series{ - Key: string(key), - tags: tags, - shardIDs: make(map[uint64]struct{}), - } -} - -func (s *Series) AssignShard(shardID uint64) { - s.mu.RLock() - _, ok := s.shardIDs[shardID] - s.mu.RUnlock() - - if ok { - return - } - - s.mu.Lock() - // Skip the existence check under the write lock because we're just storing - // and empty struct. 
- s.shardIDs[shardID] = struct{}{} - s.mu.Unlock() -} - -func (s *Series) UnassignShard(shardID uint64) { - s.mu.Lock() - delete(s.shardIDs, shardID) - s.mu.Unlock() -} - -func (s *Series) Assigned(shardID uint64) bool { - s.mu.RLock() - _, ok := s.shardIDs[shardID] - s.mu.RUnlock() - return ok -} - -func (s *Series) ShardN() int { - s.mu.RLock() - n := len(s.shardIDs) - s.mu.RUnlock() - return n -} - -// Measurement returns the measurement on the series. -func (s *Series) Measurement() *Measurement { - return s.measurement -} - -// SetMeasurement sets the measurement on the series. -func (s *Series) SetMeasurement(m *Measurement) { - s.measurement = m -} - -// ForEachTag executes fn for every tag. Iteration occurs under lock. -func (s *Series) ForEachTag(fn func(models.Tag)) { - s.mu.RLock() - defer s.mu.RUnlock() - for _, t := range s.tags { - fn(t) - } -} - -// Tags returns a copy of the tags under lock. -func (s *Series) Tags() models.Tags { - s.mu.RLock() - defer s.mu.RUnlock() - return s.tags -} - -// CopyTags clones the tags on the series in-place, -func (s *Series) CopyTags() { - s.mu.Lock() - defer s.mu.Unlock() - s.tags = s.tags.Clone() -} - -// GetTagString returns a tag value under lock. -func (s *Series) GetTagString(key string) string { - s.mu.RLock() - defer s.mu.RUnlock() - return s.tags.GetString(key) -} - -// MarshalBinary encodes the object to a binary format. -func (s *Series) MarshalBinary() ([]byte, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - var pb internal.Series - pb.Key = &s.Key - for _, t := range s.tags { - pb.Tags = append(pb.Tags, &internal.Tag{Key: proto.String(string(t.Key)), Value: proto.String(string(t.Value))}) - } - return proto.Marshal(&pb) -} - -// UnmarshalBinary decodes the object from a binary format. 
-func (s *Series) UnmarshalBinary(buf []byte) error { - s.mu.Lock() - defer s.mu.Unlock() - - var pb internal.Series - if err := proto.Unmarshal(buf, &pb); err != nil { - return err - } - s.Key = pb.GetKey() - s.tags = make(models.Tags, len(pb.Tags)) - for i, t := range pb.Tags { - s.tags[i] = models.Tag{Key: []byte(t.GetKey()), Value: []byte(t.GetValue())} - } - return nil -} - -// SeriesIDs is a convenience type for sorting, checking equality, and doing -// union and intersection of collections of series ids. -type SeriesIDs []uint64 - -// Len implements sort.Interface. -func (a SeriesIDs) Len() int { return len(a) } - -// Less implements sort.Interface. -func (a SeriesIDs) Less(i, j int) bool { return a[i] < a[j] } - -// Swap implements sort.Interface. -func (a SeriesIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -// Equals assumes that both are sorted. -func (a SeriesIDs) Equals(other SeriesIDs) bool { - if len(a) != len(other) { - return false - } - for i, s := range other { - if a[i] != s { - return false - } - } - return true -} - -// Intersect returns a new collection of series ids in sorted order that is the intersection of the two. -// The two collections must already be sorted. -func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs { - l := a - r := other - - // we want to iterate through the shortest one and stop - if len(other) < len(a) { - l = other - r = a - } - - // they're in sorted order so advance the counter as needed. - // That is, don't run comparisons against lower values that we've already passed - var i, j int - - ids := make([]uint64, 0, len(l)) - for i < len(l) && j < len(r) { - if l[i] == r[j] { - ids = append(ids, l[i]) - i++ - j++ - } else if l[i] < r[j] { - i++ - } else { - j++ - } - } - - return SeriesIDs(ids) -} - -// Union returns a new collection of series ids in sorted order that is the union of the two. -// The two collections must already be sorted. 
-func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs { - l := a - r := other - ids := make([]uint64, 0, len(l)+len(r)) - var i, j int - for i < len(l) && j < len(r) { - if l[i] == r[j] { - ids = append(ids, l[i]) - i++ - j++ - } else if l[i] < r[j] { - ids = append(ids, l[i]) - i++ - } else { - ids = append(ids, r[j]) - j++ - } - } - - // now append the remainder - if i < len(l) { - ids = append(ids, l[i:]...) - } else if j < len(r) { - ids = append(ids, r[j:]...) - } - - return ids -} - -// Reject returns a new collection of series ids in sorted order with the passed in set removed from the original. -// This is useful for the NOT operator. The two collections must already be sorted. -func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs { - l := a - r := other - var i, j int - - ids := make([]uint64, 0, len(l)) - for i < len(l) && j < len(r) { - if l[i] == r[j] { - i++ - j++ - } else if l[i] < r[j] { - ids = append(ids, l[i]) - i++ - } else { - j++ - } - } - - // Append the remainder - if i < len(l) { - ids = append(ids, l[i:]...) - } - - return SeriesIDs(ids) -} - -// seriesID is a series id that may or may not have been evicted from the -// current id list. -type seriesID struct { - val uint64 - evict bool -} - -// evictSeriesIDs is a slice of SeriesIDs with an extra field to mark if the -// field should be evicted or not. -type evictSeriesIDs struct { - ids []seriesID - sz int -} - -// newEvictSeriesIDs copies the ids into a new slice that can be used for -// evicting series from the slice. -func newEvictSeriesIDs(ids []uint64) evictSeriesIDs { - a := make([]seriesID, len(ids)) - for i, id := range ids { - a[i].val = id - } - return evictSeriesIDs{ - ids: a, - sz: len(a), - } -} - -// mark marks all of the ids in the sorted slice to be evicted from the list of -// series ids. If an id to be evicted does not exist, it just gets ignored. 
-func (a *evictSeriesIDs) mark(ids []uint64) { - seriesIDs := a.ids - for _, id := range ids { - if len(seriesIDs) == 0 { - break - } - - // Perform a binary search of the remaining slice if - // the first element does not match the value we're - // looking for. - i := 0 - if seriesIDs[0].val < id { - i = sort.Search(len(seriesIDs), func(i int) bool { - return seriesIDs[i].val >= id - }) - } - - if i >= len(seriesIDs) { - break - } else if seriesIDs[i].val == id { - if !seriesIDs[i].evict { - seriesIDs[i].evict = true - a.sz-- - } - // Skip over this series since it has been evicted and won't be - // encountered again. - i++ - } - seriesIDs = seriesIDs[i:] - } -} - -// evict creates a new slice with only the series that have not been evicted. -func (a *evictSeriesIDs) evict() (ids SeriesIDs) { - if a.sz == 0 { - return ids - } - - // Make a new slice with only the remaining ids. - ids = make([]uint64, 0, a.sz) - for _, id := range a.ids { - if id.evict { - continue - } - ids = append(ids, id.val) - } - return ids -} - -// TagFilter represents a tag filter when looking up other tags or measurements. -type TagFilter struct { - Op influxql.Token - Key string - Value string - Regex *regexp.Regexp -} + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/escape" +) // MarshalTags converts a tag set to bytes for use as a lookup key. func MarshalTags(tags map[string]string) []byte { @@ -1461,174 +44,6 @@ func MarshalTags(tags map[string]string) []byte { return b } -// WalkTagKeys calls fn for each tag key associated with m. The order of the -// keys is undefined. -func (m *Measurement) WalkTagKeys(fn func(k string)) { - m.mu.RLock() - defer m.mu.RUnlock() - - for k := range m.seriesByTagKeyValue { - fn(k) - } -} - -// TagKeys returns a list of the measurement's tag names, in sorted order. 
-func (m *Measurement) TagKeys() []string { - m.mu.RLock() - keys := make([]string, 0, len(m.seriesByTagKeyValue)) - for k := range m.seriesByTagKeyValue { - keys = append(keys, k) - } - m.mu.RUnlock() - sort.Strings(keys) - return keys -} - -// TagValues returns all the values for the given tag key, in an arbitrary order. -func (m *Measurement) TagValues(key string) []string { - m.mu.RLock() - defer m.mu.RUnlock() - values := make([]string, 0, len(m.seriesByTagKeyValue[key])) - for v := range m.seriesByTagKeyValue[key] { - values = append(values, v) - } - return values -} - -// SetFieldName adds the field name to the measurement. -func (m *Measurement) SetFieldName(name string) { - m.mu.RLock() - if _, ok := m.fieldNames[name]; ok { - m.mu.RUnlock() - return - } - m.mu.RUnlock() - - m.mu.Lock() - m.fieldNames[name] = struct{}{} - m.mu.Unlock() -} - -// FieldNames returns a list of the measurement's field names, in an arbitrary order. -func (m *Measurement) FieldNames() []string { - m.mu.RLock() - defer m.mu.RUnlock() - - a := make([]string, 0, len(m.fieldNames)) - for n := range m.fieldNames { - a = append(a, n) - } - return a -} - -func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids SeriesIDs) map[string]stringSet { - // If no tag keys were passed, get all tag keys for the measurement. - if len(tagKeys) == 0 { - for k := range m.seriesByTagKeyValue { - tagKeys = append(tagKeys, k) - } - } - - // Mapping between tag keys to all existing tag values. - tagValues := make(map[string]stringSet, 0) - - // Iterate all series to collect tag values. - for _, id := range ids { - s, ok := m.seriesByID[id] - if !ok { - continue - } - - // Iterate the tag keys we're interested in and collect values - // from this series, if they exist. 
- tags := s.Tags() - for _, tagKey := range tagKeys { - if tagVal := tags.GetString(tagKey); tagVal != "" { - if _, ok = tagValues[tagKey]; !ok { - tagValues[tagKey] = newStringSet() - } - tagValues[tagKey].add(tagVal) - } - } - } - - return tagValues -} - -func (m *Measurement) SeriesByTagKeyValue(key string) map[string]SeriesIDs { - m.mu.RLock() - ret := m.seriesByTagKeyValue[key] - m.mu.RUnlock() - return ret -} - -// stringSet represents a set of strings. -type stringSet map[string]struct{} - -// newStringSet returns an empty stringSet. -func newStringSet() stringSet { - return make(map[string]struct{}) -} - -// add adds strings to the set. -func (s stringSet) add(ss ...string) { - for _, n := range ss { - s[n] = struct{}{} - } -} - -// list returns the current elements in the set, in sorted order. -func (s stringSet) list() []string { - l := make([]string, 0, len(s)) - for k := range s { - l = append(l, k) - } - sort.Strings(l) - return l -} - -// union returns the union of this set and another. -func (s stringSet) union(o stringSet) stringSet { - ns := newStringSet() - for k := range s { - ns[k] = struct{}{} - } - for k := range o { - ns[k] = struct{}{} - } - return ns -} - -// intersect returns the intersection of this set and another. -func (s stringSet) intersect(o stringSet) stringSet { - shorter, longer := s, o - if len(longer) < len(shorter) { - shorter, longer = longer, shorter - } - - ns := newStringSet() - for k := range shorter { - if _, ok := longer[k]; ok { - ns[k] = struct{}{} - } - } - return ns -} - -// filter removes v from a if it exists. a must be sorted in ascending -// order. -func filter(a []uint64, v uint64) []uint64 { - // binary search for v - i := sort.Search(len(a), func(i int) bool { return a[i] >= v }) - if i >= len(a) || a[i] != v { - return a - } - - // we found it, so shift the right half down one, overwriting v's position. 
- copy(a[i:], a[i+1:]) - return a[:len(a)-1] -} - // MeasurementFromSeriesKey returns the name of the measurement from a key that // contains a measurement name. func MeasurementFromSeriesKey(key []byte) []byte { @@ -1636,15 +51,3 @@ func MeasurementFromSeriesKey(key []byte) []byte { k, _ := models.ParseName(key) return escape.Unescape(k) } - -type uint64Slice []uint64 - -func (a uint64Slice) Len() int { return len(a) } -func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } - -type byTagKey []*influxql.TagSet - -func (t byTagKey) Len() int { return len(t) } -func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 } -func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] } diff --git a/tsdb/meta_test.go b/tsdb/meta_test.go index a9a97f600e..e53b35903d 100644 --- a/tsdb/meta_test.go +++ b/tsdb/meta_test.go @@ -3,225 +3,13 @@ package tsdb_test import ( "bytes" "fmt" - "strings" "testing" - "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/index/inmem" ) -// Test comparing SeriesIDs for equality. -func TestSeriesIDs_Equals(t *testing.T) { - ids1 := tsdb.SeriesIDs([]uint64{1, 2, 3}) - ids2 := tsdb.SeriesIDs([]uint64{1, 2, 3}) - ids3 := tsdb.SeriesIDs([]uint64{4, 5, 6}) - - if !ids1.Equals(ids2) { - t.Fatal("expected ids1 == ids2") - } else if ids1.Equals(ids3) { - t.Fatal("expected ids1 != ids3") - } -} - -// Test intersecting sets of SeriesIDs. 
-func TestSeriesIDs_Intersect(t *testing.T) { - // Test swaping l & r, all branches of if-else, and exit loop when 'j < len(r)' - ids1 := tsdb.SeriesIDs([]uint64{1, 3, 4, 5, 6}) - ids2 := tsdb.SeriesIDs([]uint64{1, 2, 3, 7}) - exp := tsdb.SeriesIDs([]uint64{1, 3}) - got := ids1.Intersect(ids2) - - if !exp.Equals(got) { - t.Fatalf("exp=%v, got=%v", exp, got) - } - - // Test exit for loop when 'i < len(l)' - ids1 = tsdb.SeriesIDs([]uint64{1}) - ids2 = tsdb.SeriesIDs([]uint64{1, 2}) - exp = tsdb.SeriesIDs([]uint64{1}) - got = ids1.Intersect(ids2) - - if !exp.Equals(got) { - t.Fatalf("exp=%v, got=%v", exp, got) - } -} - -// Test union sets of SeriesIDs. -func TestSeriesIDs_Union(t *testing.T) { - // Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left. - ids1 := tsdb.SeriesIDs([]uint64{1, 2, 3, 7}) - ids2 := tsdb.SeriesIDs([]uint64{1, 3, 4, 5, 6}) - exp := tsdb.SeriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7}) - got := ids1.Union(ids2) - - if !exp.Equals(got) { - t.Fatalf("exp=%v, got=%v", exp, got) - } - - // Test exit because of 'i < len(l)' and append remainder from right. - ids1 = tsdb.SeriesIDs([]uint64{1}) - ids2 = tsdb.SeriesIDs([]uint64{1, 2}) - exp = tsdb.SeriesIDs([]uint64{1, 2}) - got = ids1.Union(ids2) - - if !exp.Equals(got) { - t.Fatalf("exp=%v, got=%v", exp, got) - } -} - -// Test removing one set of SeriesIDs from another. -func TestSeriesIDs_Reject(t *testing.T) { - // Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left. - ids1 := tsdb.SeriesIDs([]uint64{1, 2, 3, 7}) - ids2 := tsdb.SeriesIDs([]uint64{1, 3, 4, 5, 6}) - exp := tsdb.SeriesIDs([]uint64{2, 7}) - got := ids1.Reject(ids2) - - if !exp.Equals(got) { - t.Fatalf("exp=%v, got=%v", exp, got) - } - - // Test exit because of 'i < len(l)'. 
- ids1 = tsdb.SeriesIDs([]uint64{1}) - ids2 = tsdb.SeriesIDs([]uint64{1, 2}) - exp = tsdb.SeriesIDs{} - got = ids1.Reject(ids2) - - if !exp.Equals(got) { - t.Fatalf("exp=%v, got=%v", exp, got) - } -} - -func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) { - m := tsdb.NewMeasurement("cpu") - var dst []string - dst = m.AppendSeriesKeysByID(dst, []uint64{1}) - if exp, got := 0, len(dst); exp != got { - t.Fatalf("series len mismatch: exp %v, got %v", exp, got) - } -} - -func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) { - m := tsdb.NewMeasurement("cpu") - s := tsdb.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) - s.ID = 1 - m.AddSeries(s) - - var dst []string - dst = m.AppendSeriesKeysByID(dst, []uint64{1}) - if exp, got := 1, len(dst); exp != got { - t.Fatalf("series len mismatch: exp %v, got %v", exp, got) - } - - if exp, got := "cpu,host=foo", dst[0]; exp != got { - t.Fatalf("series mismatch: exp %v, got %v", exp, got) - } -} - -func TestMeasurement_TagsSet_Deadlock(t *testing.T) { - m := tsdb.NewMeasurement("cpu") - s1 := tsdb.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) - s1.ID = 1 - m.AddSeries(s1) - - s2 := tsdb.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))}) - s2.ID = 2 - m.AddSeries(s2) - - m.DropSeries(s1) - - // This was deadlocking - m.TagSets(1, influxql.IteratorOptions{}) - if got, exp := len(m.SeriesIDs()), 1; got != exp { - t.Fatalf("series count mismatch: got %v, exp %v", got, exp) - } -} - -func TestMeasurement_ForEachSeriesByExpr_Deadlock(t *testing.T) { - m := tsdb.NewMeasurement("cpu") - s1 := tsdb.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) - s1.ID = 1 - m.AddSeries(s1) - - s2 := tsdb.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))}) - s2.ID = 2 - m.AddSeries(s2) - - m.DropSeries(s1) - - // 
This was deadlocking - m.ForEachSeriesByExpr(nil, func(tags models.Tags) error { - return nil - }) - if got, exp := len(m.SeriesIDs()), 1; got != exp { - t.Fatalf("series count mismatch: got %v, exp %v", got, exp) - } -} - -func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) { - m := tsdb.NewMeasurement("cpu") - for i := 0; i < 100000; i++ { - s := tsdb.NewSeries([]byte("cpu"), models.Tags{models.NewTag( - []byte("host"), - []byte(fmt.Sprintf("host%d", i)))}) - s.ID = uint64(i) - m.AddSeries(s) - } - - if exp, got := 100000, len(m.SeriesKeys()); exp != got { - b.Fatalf("series count mismatch: exp %v got %v", exp, got) - } - - stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host =~ /host\d+/`)).ParseStatement() - if err != nil { - b.Fatalf("invalid statement: %s", err) - } - - selectStmt := stmt.(*influxql.SelectStatement) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr)) - if exp, got := 100000, len(ids); exp != got { - b.Fatalf("series count mismatch: exp %v got %v", exp, got) - } - - } -} - -func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) { - m := tsdb.NewMeasurement("cpu") - for i := 0; i < 100000; i++ { - s := tsdb.NewSeries([]byte("cpu"), models.Tags{models.Tag{ - Key: []byte("host"), - Value: []byte(fmt.Sprintf("host%d", i))}}) - s.ID = uint64(i) - m.AddSeries(s) - } - - if exp, got := 100000, len(m.SeriesKeys()); exp != got { - b.Fatalf("series count mismatch: exp %v got %v", exp, got) - } - - stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host !~ /foo\d+/`)).ParseStatement() - if err != nil { - b.Fatalf("invalid statement: %s", err) - } - - selectStmt := stmt.(*influxql.SelectStatement) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr)) - if exp, got := 100000, len(ids); exp != got { - b.Fatalf("series count mismatch: exp %v got %v", exp, got) - } - - 
} - -} - // Ensure tags can be marshaled into a byte slice. func TestMarshalTags(t *testing.T) { for i, tt := range []struct { @@ -273,42 +61,9 @@ func benchmarkMarshalTags(b *testing.B, keyN int) { } } -/* -func BenchmarkCreateSeriesIndex_1K(b *testing.B) { - benchmarkCreateSeriesIndex(b, genTestSeries(38, 3, 3)) -} - -func BenchmarkCreateSeriesIndex_100K(b *testing.B) { - benchmarkCreateSeriesIndex(b, genTestSeries(32, 5, 5)) -} - -func BenchmarkCreateSeriesIndex_1M(b *testing.B) { - benchmarkCreateSeriesIndex(b, genTestSeries(330, 5, 5)) -} - -func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) { - idxs := make([]*tsdb.DatabaseIndex, 0, b.N) - for i := 0; i < b.N; i++ { - index, err := tsdb.NewDatabaseIndex(fmt.Sprintf("db%d", i)) - if err != nil { - b.Fatal(err) - } - idxs = append(idxs, index) - } - - b.ResetTimer() - for n := 0; n < b.N; n++ { - idx := idxs[n] - for _, s := range series { - idx.CreateSeriesIndexIfNotExists(s.Measurement, s.Series, false) - } - } -} -*/ - type TestSeries struct { Measurement string - Series *tsdb.Series + Series *inmem.Series } func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries { @@ -319,7 +74,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries { for _, ts := range tagSets { series = append(series, &TestSeries{ Measurement: m, - Series: tsdb.NewSeries([]byte(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts)))), models.NewTags(ts)), + Series: inmem.NewSeries([]byte(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts)))), models.NewTags(ts)), }) } } diff --git a/tsdb/shard.go b/tsdb/shard.go index 7de022cf49..01c069bcd0 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -429,12 +429,6 @@ type FieldCreate struct { Field *Field } -// SeriesCreate holds information for a series to create. -type SeriesCreate struct { - Measurement string - Series *Series -} - // WritePoints will write the raw data points and any new metadata to the index in the shard. 
func (s *Shard) WritePoints(points []models.Point) error { if err := s.ready(); err != nil {