diff --git a/tsdb/engine/inmem/index.go b/tsdb/engine/inmem/index.go deleted file mode 100644 index b51e365683..0000000000 --- a/tsdb/engine/inmem/index.go +++ /dev/null @@ -1,67 +0,0 @@ -package inmem - -import ( - "sort" - - "github.com/influxdata/influxdb/models" -) - -// Index represents an in-memory index. -type Index struct { - series Series - measurements map[string]Measurement -} - -// NewIndex returns a new instance of Index. -func NewIndex() *Index { - return &Index{ - measurements: make(map[string]Measurement), - } -} - -// MeasurementNames returns a sorted list of measurement names. -func (i *Index) MeasurementNames() []string { - a := make([]string, 0, len(m)) - for name := range m { - a = append(a, name) - } - sort.Strings(a) - return a -} - -// Measurement represents a measurement in the index. -type Measurement struct { - Name []byte - Deleted bool - TagSet TagSet -} - -// TagSet represents a collection of tags. -type TagSet map[string]Tag - -// Tag represents a tag key and its associated values. -type Tag struct { - Name []byte - Deleted bool - Values TagValues -} - -// TagValue represents a collection of tag values. -type TagValues map[string]TagValue - -// TagValue represents a single tag value and its associated series. -type TagValue struct { - Name []byte - Deleted bool - Series []Serie -} - -// Series represents a sorted list of serie. -type Series []Serie - -// Serie represents an individual series. -type Serie struct { - Name []byte - Tags models.Tags - Deleted bool -} diff --git a/tsdb/engine/tsi1/TODO.go b/tsdb/engine/tsi1/TODO.go new file mode 100644 index 0000000000..876e5c2c4f --- /dev/null +++ b/tsdb/engine/tsi1/TODO.go @@ -0,0 +1,473 @@ +package tsi1 + +/* +import ( + "github.com/influxdata/influxdb/influxql" +) + +// TagSets returns the unique tag sets that exist for the given tag keys. This +// is used to determine what composite series will be created by a group by. +// +// i.e. "group by region" should return: {"region":"uswest"}, +// {"region":"useast"} or region, service returns {"region": "uswest", +// "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc... +// +// This will also populate the TagSet objects with the series IDs that match +// each tagset and any influx filter expression that goes with the series TODO: +// this shouldn't be exported. However, until tx.go and the engine get +// refactored into tsdb, we need it. +func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { + m.mu.RLock() + + // TODO(benbjohnson): + // Iterators are needed at the series id level and the series level. The + // series id will allow us to union faster. We can't intersect at the + // series id level because that could remove series which would intersect + // at a higher cross-file level. + // + // - IndexFile.SeriesIteratorByExpr(condition) + // - LogFile.SeriesIteratorByExpr(condition) + // + // - UnionSeriesIterators() + // - IntersectSeriesIterators() + // - unionSeriesIDIterators() + + // TODO(benbjohnson): + // Create series iterator based on condition. If condition is time-only + // the return all measurement series ids. Otherwise walk condition and merge + // series via walkWhereForSeriesIds()/idsForExpr(). + + // get the unique set of series ids and the filters that should be applied to each + ids, filters, err := m.filters(condition) + if err != nil { + m.mu.RUnlock() + return nil, err + } + + // TODO(benbjohnson): + // Iterate over each series and build tagsets with dimensions. 
+ // Limit and offset as needed. + + // For every series, get the tag values for the requested tag keys i.e. dimensions. This is the + // TagSet for that series. Series with the same TagSet are then grouped together, because for the + // purpose of GROUP BY they are part of the same composite series. + tagSets := make(map[string]*influxql.TagSet, 64) + for _, id := range ids { + s := m.seriesByID[id] + tags := make(map[string]string, len(dimensions)) + + // Build the TagSet for this series. + for _, dim := range dimensions { + tags[dim] = s.Tags.GetString(dim) + } + // Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled + // as a set. + tagsAsKey := MarshalTags(tags) + tagSet, ok := tagSets[string(tagsAsKey)] + if !ok { + // This TagSet is new, create a new entry for it. + tagSet = &influxql.TagSet{ + Tags: tags, + Key: tagsAsKey, + } + } + // Associate the series and filter with the Tagset. + tagSet.AddFilter(m.seriesByID[id].Key, filters[id]) + + // Ensure it's back in the map. + tagSets[string(tagsAsKey)] = tagSet + } + // Release the lock while we sort all the tags + m.mu.RUnlock() + + // Sort the series in each tag set. + for _, t := range tagSets { + sort.Sort(t) + } + + // The TagSets have been created, as a map of TagSets. Just send + // the values back as a slice, sorting for consistency. + sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets)) + for _, v := range tagSets { + sortedTagsSets = append(sortedTagsSets, v) + } + sort.Sort(byTagKey(sortedTagsSets)) + + return sortedTagsSets, nil +} + +// filters walks the where clause of a select statement and returns a map with all series ids +// matching the where clause and any filter expression that should be applied to each +func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) { + if condition == nil || influxql.OnlyTimeExpr(condition) { + return m.seriesIDs, nil, nil + } + return m.walkWhereForSeriesIds(condition) +} + +// walkWhereForSeriesIds recursively walks the WHERE clause and returns an +// ordered set of series IDs and a map from those series IDs to filter +// expressions that should be used to limit points returned in the final query +// result. +func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) { + switch n := expr.(type) { + case *influxql.BinaryExpr: + switch n.Op { + case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX: + // Get the series IDs and filter expression for the tag or field comparison. + ids, expr, err := m.idsForExpr(n) + if err != nil { + return nil, nil, err + } + + if len(ids) == 0 { + return ids, nil, nil + } + + // If the expression is a boolean literal that is true, ignore it. + if b, ok := expr.(*influxql.BooleanLiteral); ok && b.Val { + expr = nil + } + + var filters FilterExprs + if expr != nil { + filters = make(FilterExprs, len(ids)) + for _, id := range ids { + filters[id] = expr + } + } + + return ids, filters, nil + case influxql.AND, influxql.OR: + // Get the series IDs and filter expressions for the LHS. + lids, lfilters, err := m.walkWhereForSeriesIds(n.LHS) + if err != nil { + return nil, nil, err + } + + // Get the series IDs and filter expressions for the RHS. + rids, rfilters, err := m.walkWhereForSeriesIds(n.RHS) + if err != nil { + return nil, nil, err + } + + // Combine the series IDs from the LHS and RHS. 
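// Illustrative sketch: the TagSets grouping above reduces each series to the
// values of the requested dimensions and buckets series with identical values
// into one composite group. A minimal standalone version, with plain maps and
// slices standing in for influxql.TagSet (groupByDimensions is an invented
// helper name, standard library only):

package main

import (
	"fmt"
	"sort"
	"strings"
)

type series struct {
	Key  string
	Tags map[string]string
}

// groupByDimensions maps "dim=value,..." keys to the sorted series keys that
// share those dimension values, mirroring the map[string]*influxql.TagSet above.
func groupByDimensions(all []series, dims []string) map[string][]string {
	groups := make(map[string][]string)
	for _, s := range all {
		parts := make([]string, 0, len(dims))
		for _, d := range dims {
			parts = append(parts, d+"="+s.Tags[d])
		}
		k := strings.Join(parts, ",")
		groups[k] = append(groups[k], s.Key)
	}
	for _, keys := range groups {
		sort.Strings(keys) // deterministic order within each group
	}
	return groups
}

func main() {
	all := []series{
		{Key: "cpu,host=a,region=uswest", Tags: map[string]string{"host": "a", "region": "uswest"}},
		{Key: "cpu,host=b,region=uswest", Tags: map[string]string{"host": "b", "region": "uswest"}},
		{Key: "cpu,host=c,region=useast", Tags: map[string]string{"host": "c", "region": "useast"}},
	}
	// GROUP BY region -> two composite groups.
	fmt.Println(groupByDimensions(all, []string{"region"}))
}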
+ if n.Op == influxql.AND { + ids, filters := intersectSeriesFilters(lids, rids, lfilters, rfilters) + return ids, filters, nil + } else { + ids, filters := unionSeriesFilters(lids, rids, lfilters, rfilters) + return ids, filters, nil + } + } + + ids, _, err := m.idsForExpr(n) + return ids, nil, err + case *influxql.ParenExpr: + // walk down the tree + return m.walkWhereForSeriesIds(n.Expr) + default: + return nil, nil, nil + } +} + +// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions. +func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { + // We only want to allocate a slice and map of the smaller size. + var ids []uint64 + if len(lids) > len(rids) { + ids = make([]uint64, 0, len(rids)) + } else { + ids = make([]uint64, 0, len(lids)) + } + + var filters FilterExprs + if len(lfilters) > len(rfilters) { + filters = make(FilterExprs, len(rfilters)) + } else { + filters = make(FilterExprs, len(lfilters)) + } + + // They're in sorted order so advance the counter as needed. + // This is, don't run comparisons against lower values that we've already passed. + for len(lids) > 0 && len(rids) > 0 { + lid, rid := lids[0], rids[0] + if lid == rid { + ids = append(ids, lid) + + var expr influxql.Expr + lfilter := lfilters[lid] + rfilter := rfilters[rid] + + if lfilter != nil && rfilter != nil { + be := &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: lfilter, + RHS: rfilter, + } + expr = influxql.Reduce(be, nil) + } else if lfilter != nil { + expr = lfilter + } else if rfilter != nil { + expr = rfilter + } + + if expr != nil { + filters[lid] = expr + } + lids, rids = lids[1:], rids[1:] + } else if lid < rid { + lids = lids[1:] + } else { + rids = rids[1:] + } + } + return ids, filters +} + +// unionSeriesFilters performs a union for two sets of ids and filter expressions. +func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { + ids := make([]uint64, 0, len(lids)+len(rids)) + + // Setup the filters with the smallest size since we will discard filters + // that do not have a match on the other side. + var filters FilterExprs + if len(lfilters) < len(rfilters) { + filters = make(FilterExprs, len(lfilters)) + } else { + filters = make(FilterExprs, len(rfilters)) + } + + for len(lids) > 0 && len(rids) > 0 { + lid, rid := lids[0], rids[0] + if lid == rid { + ids = append(ids, lid) + + // If one side does not have a filter, then the series has been + // included on one side of the OR with no condition. Eliminate the + // filter in this case. + var expr influxql.Expr + lfilter := lfilters[lid] + rfilter := rfilters[rid] + if lfilter != nil && rfilter != nil { + be := &influxql.BinaryExpr{ + Op: influxql.OR, + LHS: lfilter, + RHS: rfilter, + } + expr = influxql.Reduce(be, nil) + } + + if expr != nil { + filters[lid] = expr + } + lids, rids = lids[1:], rids[1:] + } else if lid < rid { + ids = append(ids, lid) + + filter := lfilters[lid] + if filter != nil { + filters[lid] = filter + } + lids = lids[1:] + } else { + ids = append(ids, rid) + + filter := rfilters[rid] + if filter != nil { + filters[rid] = filter + } + rids = rids[1:] + } + } + + // Now append the remainder. 
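// Illustrative sketch: unionSeriesFilters above walks two already-sorted ID
// slices with a compare-and-advance loop and then appends whatever remains on
// the non-empty side. The same shape without the filter-expression bookkeeping
// (unionSorted is an invented helper name, standard library only):

package main

import "fmt"

// unionSorted merges two ascending, duplicate-free uint64 slices.
func unionSorted(lids, rids []uint64) []uint64 {
	ids := make([]uint64, 0, len(lids)+len(rids))
	for len(lids) > 0 && len(rids) > 0 {
		switch lid, rid := lids[0], rids[0]; {
		case lid == rid:
			ids = append(ids, lid)
			lids, rids = lids[1:], rids[1:]
		case lid < rid:
			ids = append(ids, lid)
			lids = lids[1:]
		default:
			ids = append(ids, rid)
			rids = rids[1:]
		}
	}
	// Append the remainder; at most one side is still non-empty here.
	ids = append(ids, lids...)
	return append(ids, rids...)
}

func main() {
	fmt.Println(unionSorted([]uint64{1, 4}, []uint64{2, 4, 9})) // [1 2 4 9]
}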
+ if len(lids) > 0 { + for i := 0; i < len(lids); i++ { + ids = append(ids, lids[i]) + + filter := lfilters[lids[i]] + if filter != nil { + filters[lids[i]] = filter + } + } + } else if len(rids) > 0 { + for i := 0; i < len(rids); i++ { + ids = append(ids, rids[i]) + + filter := rfilters[rids[i]] + if filter != nil { + filters[rids[i]] = filter + } + } + } + return ids, filters +} + +// idsForExpr will return a collection of series ids and a filter expression that should +// be used to filter points from those series. +func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Expr, error) { + // If this binary expression has another binary expression, then this + // is some expression math and we should just pass it to the underlying query. + if _, ok := n.LHS.(*influxql.BinaryExpr); ok { + return m.seriesIDs, n, nil + } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok { + return m.seriesIDs, n, nil + } + + // Retrieve the variable reference from the correct side of the expression. + name, ok := n.LHS.(*influxql.VarRef) + value := n.RHS + if !ok { + name, ok = n.RHS.(*influxql.VarRef) + if !ok { + return nil, nil, fmt.Errorf("invalid expression: %s", n.String()) + } + value = n.LHS + } + + // For time literals, return all series IDs and "true" as the filter. + if _, ok := value.(*influxql.TimeLiteral); ok || name.Val == "time" { + return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil + } + + // For fields, return all series IDs from this measurement and return + // the expression passed in, as the filter. + if name.Val != "_name" && ((name.Type == influxql.Unknown && m.hasField(name.Val)) || name.Type == influxql.AnyField || (name.Type != influxql.Tag && name.Type != influxql.Unknown)) { + return m.seriesIDs, n, nil + } else if value, ok := value.(*influxql.VarRef); ok { + // Check if the RHS is a variable and if it is a field. + if value.Val != "_name" && ((value.Type == influxql.Unknown && m.hasField(value.Val)) || name.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) { + return m.seriesIDs, n, nil + } + } + + // Retrieve list of series with this tag key. + tagVals := m.seriesByTagKeyValue[name.Val] + + // if we're looking for series with a specific tag value + if str, ok := value.(*influxql.StringLiteral); ok { + var ids SeriesIDs + + // Special handling for "_name" to match measurement name. + if name.Val == "_name" { + if (n.Op == influxql.EQ && str.Val == m.Name) || (n.Op == influxql.NEQ && str.Val != m.Name) { + return m.seriesIDs, nil, nil + } + return nil, nil, nil + } + + if n.Op == influxql.EQ { + if str.Val != "" { + // return series that have a tag of specific value. + ids = tagVals[str.Val] + } else { + // Make a copy of all series ids and mark the ones we need to evict. + seriesIDs := newEvictSeriesIDs(m.seriesIDs) + + // Go through each slice and mark the values we find as zero so + // they can be removed later. + for _, a := range tagVals { + seriesIDs.mark(a) + } + + // Make a new slice with only the remaining ids. + ids = seriesIDs.evict() + } + } else if n.Op == influxql.NEQ { + if str.Val != "" { + ids = m.seriesIDs.Reject(tagVals[str.Val]) + } else { + for k := range tagVals { + ids = append(ids, tagVals[k]...) + } + sort.Sort(ids) + } + } + return ids, nil, nil + } + + // if we're looking for series with a tag value that matches a regex + if re, ok := value.(*influxql.RegexLiteral); ok { + var ids SeriesIDs + + // Special handling for "_name" to match measurement name. 
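// Illustrative sketch: the newEvictSeriesIDs/mark/evict sequence above answers
// `tag = ''` by starting from every series in the measurement, marking every
// series that has some value for the tag key, and keeping the rest. A
// simplified equivalent using a plain map (seriesWithoutTag is an invented
// helper name, standard library only):

package main

import "fmt"

func seriesWithoutTag(all []uint64, tagVals map[string][]uint64) []uint64 {
	marked := make(map[uint64]bool)
	for _, ids := range tagVals {
		for _, id := range ids {
			marked[id] = true // this series has a value for the tag key
		}
	}
	out := make([]uint64, 0, len(all))
	for _, id := range all {
		if !marked[id] {
			out = append(out, id) // series is missing the tag entirely
		}
	}
	return out // still sorted, because `all` was sorted
}

func main() {
	all := []uint64{1, 2, 3, 4}
	tagVals := map[string][]uint64{"uswest": {1, 3}, "useast": {4}}
	fmt.Println(seriesWithoutTag(all, tagVals)) // [2]
}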
+ if name.Val == "_name" { + match := re.Val.MatchString(m.Name) + if (n.Op == influxql.EQREGEX && match) || (n.Op == influxql.NEQREGEX && !match) { + return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil + } + return nil, nil, nil + } + + // Check if we match the empty string to see if we should include series + // that are missing the tag. + empty := re.Val.MatchString("") + + // Gather the series that match the regex. If we should include the empty string, + // start with the list of all series and reject series that don't match our condition. + // If we should not include the empty string, include series that match our condition. + if empty && n.Op == influxql.EQREGEX { + // See comments above for EQ with a StringLiteral. + seriesIDs := newEvictSeriesIDs(m.seriesIDs) + for k := range tagVals { + if !re.Val.MatchString(k) { + seriesIDs.mark(tagVals[k]) + } + } + ids = seriesIDs.evict() + } else if empty && n.Op == influxql.NEQREGEX { + ids = make(SeriesIDs, 0, len(m.seriesIDs)) + for k := range tagVals { + if !re.Val.MatchString(k) { + ids = append(ids, tagVals[k]...) + } + } + sort.Sort(ids) + } else if !empty && n.Op == influxql.EQREGEX { + ids = make(SeriesIDs, 0, len(m.seriesIDs)) + for k := range tagVals { + if re.Val.MatchString(k) { + ids = append(ids, tagVals[k]...) + } + } + sort.Sort(ids) + } else if !empty && n.Op == influxql.NEQREGEX { + // See comments above for EQ with a StringLiteral. + seriesIDs := newEvictSeriesIDs(m.seriesIDs) + for k := range tagVals { + if re.Val.MatchString(k) { + seriesIDs.mark(tagVals[k]) + } + } + ids = seriesIDs.evict() + } + return ids, nil, nil + } + + // compare tag values + if ref, ok := value.(*influxql.VarRef); ok { + var ids SeriesIDs + + if n.Op == influxql.NEQ { + ids = m.seriesIDs + } + + rhsTagVals := m.seriesByTagKeyValue[ref.Val] + for k := range tagVals { + tags := tagVals[k].Intersect(rhsTagVals[k]) + if n.Op == influxql.EQ { + ids = ids.Union(tags) + } else if n.Op == influxql.NEQ { + ids = ids.Reject(tags) + } + } + return ids, nil, nil + } + + if n.Op == influxql.NEQ || n.Op == influxql.NEQREGEX { + return m.seriesIDs, nil, nil + } + return nil, nil, nil +} +*/ diff --git a/tsdb/engine/tsi1/index.go b/tsdb/engine/tsi1/index.go index 4549a32b50..12397247bc 100644 --- a/tsdb/engine/tsi1/index.go +++ b/tsdb/engine/tsi1/index.go @@ -1,6 +1,7 @@ package tsi1 import ( + "bytes" "fmt" "regexp" "sort" @@ -16,11 +17,8 @@ var _ tsdb.Index = &Index{} // Index represents a collection of layered index files and WAL. type Index struct { - file *IndexFile - - // TODO(benbjohnson): Use layered list of index files. - - // TODO(benbjohnson): Add write ahead log. + logFiles []*LogFile + indexFiles IndexFiles } // Open opens the index. @@ -29,11 +27,23 @@ func (i *Index) Open() error { panic("TODO") } // Close closes the index. func (i *Index) Close() error { panic("TODO") } -// SetFile explicitly sets a file in the index. -func (i *Index) SetFile(f *IndexFile) { i.file = f } +// SetLogFiles explicitly sets log files. +// TEMPORARY: For testing only. +func (i *Index) SetLogFiles(a ...*LogFile) { i.logFiles = a } -func (i *Index) CreateMeasurementIndexIfNotExists(name string) (*tsdb.Measurement, error) { - panic("TODO: Requires WAL") +// SetIndexFiles explicitly sets index files +// TEMPORARY: For testing only. +func (i *Index) SetIndexFiles(a ...*IndexFile) { i.indexFiles = IndexFiles(a) } + +// FileN returns the number of log and index files within the index. 
+func (i *Index) FileN() int { return len(i.logFiles) + len(i.indexFiles) } + +func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) (*tsdb.Measurement, error) { + // FIXME(benbjohnson): Read lock log file during lookup. + if mm := i.measurement(name); mm == nil { + return mm, nil + } + return i.logFiles[0].CreateMeasurementIndexIfNotExists(name) } // Measurement retrieves a measurement by name. @@ -45,7 +55,7 @@ func (i *Index) measurement(name []byte) *tsdb.Measurement { m := tsdb.NewMeasurement(string(name)) // Iterate over measurement series. - itr := i.file.MeasurementSeriesIterator(name) + itr := i.MeasurementSeriesIterator(name) var id uint64 // TEMPORARY for e := itr.Next(); e != nil; e = itr.Next() { @@ -69,16 +79,40 @@ func (i *Index) measurement(name []byte) *tsdb.Measurement { return m } +// MeasurementSeriesIterator returns an iterator over all series in the index. +func (i *Index) MeasurementSeriesIterator(name []byte) SeriesIterator { + a := make([]SeriesIterator, 0, i.FileN()) + for _, f := range i.logFiles { + a = append(a, f.MeasurementSeriesIterator(name)) + } + for _, f := range i.indexFiles { + a = append(a, f.MeasurementSeriesIterator(name)) + } + return MergeSeriesIterators(a...) +} + // Measurements returns a list of all measurements. func (i *Index) Measurements() (tsdb.Measurements, error) { var mms tsdb.Measurements - itr := i.file.MeasurementIterator() + itr := i.MeasurementIterator() for e := itr.Next(); e != nil; e = itr.Next() { mms = append(mms, i.measurement(e.Name())) } return mms, nil } +// MeasurementIterator returns an iterator over all measurements in the index. +func (i *Index) MeasurementIterator() MeasurementIterator { + a := make([]MeasurementIterator, 0, i.FileN()) + for _, f := range i.logFiles { + a = append(a, f.MeasurementIterator()) + } + for _, f := range i.indexFiles { + a = append(a, f.MeasurementIterator()) + } + return MergeMeasurementIterators(a...) +} + func (i *Index) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) { return i.measurementsByExpr(expr) } @@ -159,7 +193,7 @@ func (i *Index) measurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, // measurementsByNameFilter returns the sorted measurements matching a name. 
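// Illustrative sketch: MeasurementIterator and MeasurementSeriesIterator above
// open one sorted iterator per log file and per index file and hand them to
// the Merge* helpers. The core of that is a k-way merge over sorted streams;
// a toy version over string slices, collapsing duplicates that appear in more
// than one file (mergeSorted is an invented helper name, standard library only):

package main

import "fmt"

func mergeSorted(files ...[]string) []string {
	var out []string
	for {
		// Find the smallest head across all non-empty slices.
		min, found := "", false
		for _, f := range files {
			if len(f) > 0 && (!found || f[0] < min) {
				min, found = f[0], true
			}
		}
		if !found {
			return out
		}
		out = append(out, min)
		// Advance every slice whose head equals the emitted value.
		for i, f := range files {
			if len(f) > 0 && f[0] == min {
				files[i] = f[1:]
			}
		}
	}
}

func main() {
	logFile := []string{"cpu", "mem"}
	indexFile := []string{"cpu", "disk"}
	fmt.Println(mergeSorted(logFile, indexFile)) // [cpu disk mem]
}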
func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) tsdb.Measurements { var mms tsdb.Measurements - itr := i.file.MeasurementIterator() + itr := i.MeasurementIterator() for e := itr.Next(); e != nil; e = itr.Next() { var matched bool switch op { @@ -183,7 +217,7 @@ func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *r func (i *Index) measurementsByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) tsdb.Measurements { var mms tsdb.Measurements - itr := i.file.MeasurementIterator() + itr := i.MeasurementIterator() for e := itr.Next(); e != nil; e = itr.Next() { mm := i.measurement(e.Name()) @@ -228,12 +262,12 @@ func (i *Index) measurementsByTagFilter(op influxql.Token, key, val string, rege return mms } -func (i *Index) MeasurementsByName(names []string) ([]*tsdb.Measurement, error) { - itr := i.file.MeasurementIterator() +func (i *Index) MeasurementsByName(names [][]byte) ([]*tsdb.Measurement, error) { + itr := i.MeasurementIterator() mms := make([]*tsdb.Measurement, 0, len(names)) for e := itr.Next(); e != nil; e = itr.Next() { for _, name := range names { - if string(e.Name()) == name { + if bytes.Equal(e.Name(), name) { mms = append(mms, i.measurement(e.Name())) break } @@ -243,7 +277,7 @@ func (i *Index) MeasurementsByName(names []string) ([]*tsdb.Measurement, error) } func (i *Index) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) { - itr := i.file.MeasurementIterator() + itr := i.MeasurementIterator() var mms tsdb.Measurements for e := itr.Next(); e != nil; e = itr.Next() { if re.Match(e.Name()) { @@ -257,7 +291,7 @@ func (i *Index) DropMeasurement(name []byte) error { panic("TODO: Requires WAL") } -func (i *Index) CreateSeriesIndexIfNotExists(measurement string, series *tsdb.Series) (*tsdb.Series, error) { +func (i *Index) CreateSeriesIndexIfNotExists(measurement []byte, series *tsdb.Series) (*tsdb.Series, error) { panic("TODO: Requires WAL") } @@ -265,7 +299,7 @@ func (i *Index) Series(key []byte) (*tsdb.Series, error) { panic("TODO") } -func (i *Index) DropSeries(keys []string) error { +func (i *Index) DropSeries(keys [][]byte) error { panic("TODO: Requires WAL") } @@ -279,7 +313,7 @@ func (i *Index) SeriesN() (n uint64, err error) { // return n, nil } -func (i *Index) TagsForSeries(key string) (models.Tags, error) { +func (i *Index) TagsForSeries(key []byte) (models.Tags, error) { ss, err := i.Series([]byte(key)) if err != nil { return nil, err @@ -297,3 +331,263 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro // Dereference is a nop. func (i *Index) Dereference([]byte) {} + +// TagKeySeriesIterator returns a series iterator for all values across a single key. +func (i *Index) TagKeySeriesIterator(name, key []byte) SeriesIterator { + panic("TODO") +} + +// TagValueSeriesIterator returns a series iterator for a single tag value. +func (i *Index) TagValueSeriesIterator(name, key, value []byte) SeriesIterator { + panic("TODO") +} + +// MatchTagValueSeriesIterator returns a series iterator for tags which match value. +// If matches is false, returns iterators which do not match value. +func (i *Index) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Regexp, matches bool) SeriesIterator { + panic("TODO") + + /* + // Check if we match the empty string to see if we should include series + // that are missing the tag. + empty := value.MatchString("") + + // Gather the series that match the regex. 
If we should include the empty string, + // start with the list of all series and reject series that don't match our condition. + // If we should not include the empty string, include series that match our condition. + if op == influxql.EQREGEX { + + if empty { + // See comments above for EQ with a StringLiteral. + seriesIDs := newEvictSeriesIDs(m.seriesIDs) + for k := range tagVals { + if !re.Val.MatchString(k) { + seriesIDs.mark(tagVals[k]) + } + } + return seriesIDs.evict(), nil, nil + } + ids = make(SeriesIDs, 0, len(m.seriesIDs)) + for k := range tagVals { + if re.Val.MatchString(k) { + ids = append(ids, tagVals[k]...) + } + } + sort.Sort(ids) + return ids, nil, nil + + } + + // Compare not-equal to empty string. + if empty { + ids = make(SeriesIDs, 0, len(m.seriesIDs)) + for k := range tagVals { + if !re.Val.MatchString(k) { + ids = append(ids, tagVals[k]...) + } + } + sort.Sort(ids) + return ids, nil, nil + } + + // Compare not-equal to empty string. + seriesIDs := newEvictSeriesIDs(m.seriesIDs) + for k := range tagVals { + if re.Val.MatchString(k) { + seriesIDs.mark(tagVals[k]) + } + } + + return seriesIDs.evict(), nil, nil + */ +} + +// TagSets returns an ordered list of tag sets for a measurement by dimension +// and filtered by an optional conditional expression. +func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { + var tagSets []*influxql.TagSet + // TODO(benbjohnson): Iterate over filtered series and build tag sets. + return tagSets, nil +} + +// MeasurementSeriesByExprIterator returns a series iterator for a measurement +// that is filtered by expr. If expr only contains time expressions then this +// call is equivalent to MeasurementSeriesIterator(). +func (i *Index) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error) { + // Return all series for the measurement if there are no tag expressions. + if expr == nil || influxql.OnlyTimeExpr(expr) { + return i.MeasurementSeriesIterator(name), nil + } + return i.seriesByExprIterator(name, expr) +} + +func (i *Index) seriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error) { + switch expr := expr.(type) { + case *influxql.BinaryExpr: + switch expr.Op { + case influxql.AND, influxql.OR: + // Get the series IDs and filter expressions for the LHS. + litr, err := i.seriesByExprIterator(name, expr.LHS) + if err != nil { + return nil, err + } + + // Get the series IDs and filter expressions for the RHS. + ritr, err := i.seriesByExprIterator(name, expr.RHS) + if err != nil { + return nil, err + } + + // Intersect iterators if expression is "AND". + if expr.Op == influxql.AND { + return IntersectSeriesIterators(litr, ritr), nil + } + + // Union iterators if expression is "OR". + return UnionSeriesIterators(litr, ritr), nil + + default: + return i.seriesByBinaryExprIterator(name, expr) + } + + case *influxql.ParenExpr: + return i.seriesByExprIterator(name, expr.Expr) + + default: + return nil, nil + } +} + +// seriesByBinaryExprIterator returns a series iterator and a filtering expression. +func (i *Index) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (SeriesIterator, error) { + // If this binary expression has another binary expression, then this + // is some expression math and we should just pass it to the underlying query. 
+ if _, ok := n.LHS.(*influxql.BinaryExpr); ok { + return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil + } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok { + return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil + } + + // Retrieve the variable reference from the correct side of the expression. + key, ok := n.LHS.(*influxql.VarRef) + value := n.RHS + if !ok { + key, ok = n.RHS.(*influxql.VarRef) + if !ok { + return nil, fmt.Errorf("invalid expression: %s", n.String()) + } + value = n.LHS + } + + // For time literals, return all series and "true" as the filter. + if _, ok := value.(*influxql.TimeLiteral); ok || key.Val == "time" { + return newSeriesExprIterator(i.MeasurementSeriesIterator(name), &influxql.BooleanLiteral{Val: true}), nil + } + + /* + // For fields, return all series from this measurement. + if key.Val != "_name" && ((key.Type == influxql.Unknown && i.hasField(key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) { + return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil + } else if value, ok := value.(*influxql.VarRef); ok { + // Check if the RHS is a variable and if it is a field. + if value.Val != "_name" && ((value.Type == influxql.Unknown && i.hasField(value.Val)) || key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) { + return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil + } + } + */ + + // Create iterator based on value type. + switch value := value.(type) { + case *influxql.StringLiteral: + return i.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op) + case *influxql.RegexLiteral: + return i.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op) + case *influxql.VarRef: + return i.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op) + default: + if n.Op == influxql.NEQ || n.Op == influxql.NEQREGEX { + return i.MeasurementSeriesIterator(name), nil + } + return nil, nil + } +} + +func (i *Index) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIterator, error) { + // Special handling for "_name" to match measurement name. + if bytes.Equal(key, []byte("_name")) { + if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) { + return i.MeasurementSeriesIterator(name), nil + } + return nil, nil + } + + if op == influxql.EQ { + // Match a specific value. + if len(value) != 0 { + return i.TagValueSeriesIterator(name, key, value), nil + } + + // Return all measurement series that have no values from this tag key. + return DifferenceSeriesIterators( + i.MeasurementSeriesIterator(name), + i.TagKeySeriesIterator(name, key), + ), nil + } + + // Return all measurement series without this tag value. + if len(value) != 0 { + return DifferenceSeriesIterators( + i.MeasurementSeriesIterator(name), + i.TagValueSeriesIterator(name, key, value), + ), nil + } + + // Return all series across all values of this tag key. + return i.TagKeySeriesIterator(name, key), nil +} + +func (i *Index) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIterator, error) { + // Special handling for "_name" to match measurement name. 
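// Illustrative sketch: seriesByBinaryExprStringIterator above reduces to four
// compositions of the primitive iterators:
//
//	tag = 'v'   -> TagValueSeriesIterator(name, tag, v)
//	tag = ''    -> MeasurementSeriesIterator(name) minus TagKeySeriesIterator(name, tag)
//	tag != 'v'  -> MeasurementSeriesIterator(name) minus TagValueSeriesIterator(name, tag, v)
//	tag != ''   -> TagKeySeriesIterator(name, tag)
//
// The "minus" is DifferenceSeriesIterators over two sorted streams; a toy of
// that subtraction over sorted series keys (differenceSorted is an invented
// helper name, standard library only):

package main

import "fmt"

func differenceSorted(a, b []string) []string {
	var out []string
	for len(a) > 0 {
		switch {
		case len(b) == 0 || a[0] < b[0]:
			out = append(out, a[0]) // only in a: keep
			a = a[1:]
		case a[0] == b[0]:
			a, b = a[1:], b[1:] // in both: drop
		default:
			b = b[1:] // only in b: skip
		}
	}
	return out
}

func main() {
	all := []string{"cpu,host=a", "cpu,host=b", "cpu,host=c"}
	hostB := []string{"cpu,host=b"}
	// Series matching `host != 'b'`: every series minus the host=b series.
	fmt.Println(differenceSorted(all, hostB)) // [cpu,host=a cpu,host=c]
}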
+ if bytes.Equal(key, []byte("_name")) { + match := value.Match(name) + if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) { + return newSeriesExprIterator(i.MeasurementSeriesIterator(name), &influxql.BooleanLiteral{Val: true}), nil + } + return nil, nil + } + return i.MatchTagValueSeriesIterator(name, key, value, op == influxql.EQREGEX), nil +} + +func (i *Index) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIterator, error) { + if op == influxql.EQ { + return IntersectSeriesIterators( + i.TagKeySeriesIterator(name, key), + i.TagKeySeriesIterator(name, []byte(value.Val)), + ), nil + } + + return DifferenceSeriesIterators( + i.TagKeySeriesIterator(name, key), + i.TagKeySeriesIterator(name, []byte(value.Val)), + ), nil +} + +// FilterExprs represents a map of series IDs to filter expressions. +type FilterExprs map[uint64]influxql.Expr + +// DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true. +func (fe FilterExprs) DeleteBoolLiteralTrues() { + for id, expr := range fe { + if e, ok := expr.(*influxql.BooleanLiteral); ok && e.Val == true { + delete(fe, id) + } + } +} + +// Len returns the number of elements. +func (fe FilterExprs) Len() int { + if fe == nil { + return 0 + } + return len(fe) +} diff --git a/tsdb/engine/tsi1/index_test.go b/tsdb/engine/tsi1/index_test.go index 27888637ee..056059c4bd 100644 --- a/tsdb/engine/tsi1/index_test.go +++ b/tsdb/engine/tsi1/index_test.go @@ -21,7 +21,7 @@ func TestIndex_Measurement(t *testing.T) { // Create an index from the single file. var idx tsi1.Index - idx.SetFile(f) + idx.SetIndexFiles(f) // Verify measurement is correct. if mm, err := idx.Measurement([]byte("cpu")); err != nil { @@ -52,7 +52,7 @@ func TestIndex_Measurements(t *testing.T) { // Create an index from the single file. var idx tsi1.Index - idx.SetFile(f) + idx.SetIndexFiles(f) // Retrieve measurements and verify. if mms, err := idx.Measurements(); err != nil { diff --git a/tsdb/engine/tsi1/log_file.go b/tsdb/engine/tsi1/log_file.go index bcd16b838b..ccbe195f09 100644 --- a/tsdb/engine/tsi1/log_file.go +++ b/tsdb/engine/tsi1/log_file.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/mmap" + "github.com/influxdata/influxdb/tsdb" ) // Log entry flag constants. @@ -96,6 +97,10 @@ func (f *LogFile) Close() error { return nil } +func (f *LogFile) CreateMeasurementIndexIfNotExists(name []byte) (*tsdb.Measurement, error) { + panic("TODO") +} + // DeleteMeasurement adds a tombstone for a measurement to the log file. func (f *LogFile) DeleteMeasurement(name []byte) error { // Append log entry. @@ -223,6 +228,11 @@ func (f *LogFile) MeasurementIterator() MeasurementIterator { return &itr } +// MeasurementSeriesIterator returns an iterator over all series in the log file. +func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator { + panic("TODO") +} + // CompactTo compacts the log file and writes it to w. 
func (f *LogFile) CompactTo(w io.Writer) (n int64, err error) { var t IndexFileTrailer diff --git a/tsdb/engine/tsi1/series_block.go b/tsdb/engine/tsi1/series_block.go index 1c33729dd6..e0c3b4f914 100644 --- a/tsdb/engine/tsi1/series_block.go +++ b/tsdb/engine/tsi1/series_block.go @@ -9,6 +9,7 @@ import ( "math" "sort" + "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" ) @@ -303,6 +304,10 @@ func (e *seriesBlockElem) Name() []byte { return e.name } // Tags returns the tag set. func (e *seriesBlockElem) Tags() models.Tags { return e.tags } +// Expr always returns a nil expression. +// This is only used by higher level query planning. +func (e *seriesBlockElem) Expr() influxql.Expr { return nil } + // SeriesBlockWriter writes a SeriesBlock. type SeriesBlockWriter struct { terms map[string]int // term frequency diff --git a/tsdb/engine/tsi1/tsi1.go b/tsdb/engine/tsi1/tsi1.go index d9242c6f4e..22c59dccf4 100644 --- a/tsdb/engine/tsi1/tsi1.go +++ b/tsdb/engine/tsi1/tsi1.go @@ -8,6 +8,7 @@ import ( "os" "github.com/cespare/xxhash" + "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" ) @@ -348,6 +349,17 @@ type SeriesElem interface { Name() []byte Tags() models.Tags Deleted() bool + + // InfluxQL expression associated with series during filtering. + Expr() influxql.Expr +} + +// CompareSeriesElem returns -1 if a < b, 1 if a > b, and 0 if equal. +func CompareSeriesElem(a, b SeriesElem) int { + if cmp := bytes.Compare(a.Name(), b.Name()); cmp != 0 { + return cmp + } + return models.CompareTags(a.Tags(), b.Tags()) } // SeriesIterator represents a iterator over a list of series. @@ -363,19 +375,13 @@ func MergeSeriesIterators(itrs ...SeriesIterator) SeriesIterator { return nil } - itr := &seriesMergeIterator{ + return &seriesMergeIterator{ buf: make([]SeriesElem, len(itrs)), itrs: itrs, } - - // Initialize buffers. - for i := range itr.itrs { - itr.buf[i] = itr.itrs[i].Next() - } - - return itr } +// seriesMergeIterator is an iterator that merges multiple iterators together. type seriesMergeIterator struct { buf []SeriesElem itrs []SeriesIterator @@ -434,63 +440,230 @@ func (itr *seriesMergeIterator) Next() SeriesElem { return e } -// seriesIDIterator represents a iterator over a list of series ids. -type seriesIDIterator interface { - next() uint32 -} - -// unionSeriesIDIterators returns an iterator returns a union of iterators. -func unionSeriesIDIterators(itrs ...seriesIDIterator) seriesIDIterator { - if len(itrs) == 0 { +// IntersectSeriesIterators returns an iterator that only returns series which +// occur in both iterators. If both series have associated expressions then +// they are combined together. +func IntersectSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator { + if itr0 == nil || itr1 == nil { return nil } - itr := &unionIterator{ - buf: make([]uint32, len(itrs)), - itrs: itrs, - } - - // Initialize buffers. - for i := range itr.itrs { - itr.buf[i] = itr.itrs[i].next() - } - - return itr + return &seriesIntersectIterator{itrs: [2]SeriesIterator{itr0, itr1}} } -type unionIterator struct { - buf []uint32 - itrs []seriesIDIterator +// seriesIntersectIterator is an iterator that merges two iterators together. +type seriesIntersectIterator struct { + e seriesExprElem + buf [2]SeriesElem + itrs [2]SeriesIterator } -// next returns the next series id. Duplicates are combined. -func (itr *unionIterator) next() uint32 { - // Find next series id in the buffers. 
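// Illustrative sketch: CompareSeriesElem above orders series first by
// measurement name and then by tag set, and that single global order is what
// lets the merge/intersect/union/difference iterators walk several files in
// lock-step. A simplified version where the tag set is already flattened to a
// byte slice (elem and compareElem are invented for illustration):

package main

import (
	"bytes"
	"fmt"
)

type elem struct{ name, tags []byte }

func compareElem(a, b elem) int {
	if cmp := bytes.Compare(a.name, b.name); cmp != 0 {
		return cmp
	}
	return bytes.Compare(a.tags, b.tags)
}

func main() {
	a := elem{[]byte("cpu"), []byte("host=a,region=uswest")}
	b := elem{[]byte("cpu"), []byte("host=b,region=uswest")}
	c := elem{[]byte("mem"), []byte("host=a,region=uswest")}
	fmt.Println(compareElem(a, b), compareElem(b, c), compareElem(a, a)) // -1 -1 0
}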
- var id uint32 - for i := range itr.buf { - // Skip empty buffers. - if itr.buf[i] == 0 { +// Next returns the next element which occurs in both iterators. +func (itr *seriesIntersectIterator) Next() (e SeriesElem) { + // Fill buffers. + if itr.buf[0] == nil { + itr.buf[0] = itr.itrs[0].Next() + } + if itr.buf[1] == nil { + itr.buf[1] = itr.itrs[1].Next() + } + + // Exit if either buffer is still empty. + if itr.buf[0] == nil || itr.buf[1] == nil { + return nil + } + + // Return lesser series. + if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 { + e, itr.buf[0] = itr.buf[0], nil + return e + } else if cmp == 1 { + e, itr.buf[1] = itr.buf[1], nil + return e + } + + // Merge series together if equal. + itr.e.SeriesElem = itr.buf[0] + + // Attach expression. + expr0 := itr.buf[0].Expr() + expr1 := itr.buf[0].Expr() + if expr0 == nil { + itr.e.expr = expr1 + } else if expr1 == nil { + itr.e.expr = expr0 + } else { + itr.e.expr = &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: expr0, + RHS: expr1, + } + } + + itr.buf[0], itr.buf[1] = nil, nil + return &itr.e +} + +// UnionSeriesIterators returns an iterator that returns series from both +// both iterators. If both series have associated expressions then they are +// combined together. +func UnionSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator { + // Return other iterator if either one is nil. + if itr0 == nil { + return itr1 + } else if itr1 == nil { + return itr0 + } + + return &seriesUnionIterator{itrs: [2]SeriesIterator{itr0, itr1}} +} + +// seriesUnionIterator is an iterator that unions two iterators together. +type seriesUnionIterator struct { + e seriesExprElem + buf [2]SeriesElem + itrs [2]SeriesIterator +} + +// Next returns the next element which occurs in both iterators. +func (itr *seriesUnionIterator) Next() (e SeriesElem) { + // Fill buffers. + if itr.buf[0] == nil { + itr.buf[0] = itr.itrs[0].Next() + } + if itr.buf[1] == nil { + itr.buf[1] = itr.itrs[1].Next() + } + + // Return the other iterator if either one is empty. + if itr.buf[0] == nil { + e, itr.buf[1] = itr.buf[1], nil + return e + } else if itr.buf[1] == nil { + e, itr.buf[0] = itr.buf[0], nil + return e + } + + // Return lesser series. + if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 { + e, itr.buf[0] = itr.buf[0], nil + return e + } else if cmp == 1 { + e, itr.buf[1] = itr.buf[1], nil + return e + } + + // Attach element. + itr.e.SeriesElem = itr.buf[0] + + // Attach expression. + expr0 := itr.buf[0].Expr() + expr1 := itr.buf[0].Expr() + if expr0 == nil { + itr.e.expr = expr1 + } else if expr1 == nil { + itr.e.expr = expr0 + } else { + itr.e.expr = &influxql.BinaryExpr{ + Op: influxql.OR, + LHS: expr0, + RHS: expr1, + } + } + + itr.buf[0], itr.buf[1] = nil, nil + return &itr.e +} + +// DifferenceSeriesIterators returns an iterator that only returns series which +// occur the first iterator but not the second iterator. +func DifferenceSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator { + if itr0 != nil && itr1 == nil { + return itr0 + } else if itr0 == nil { + return nil + } + return &seriesDifferenceIterator{itrs: [2]SeriesIterator{itr0, itr1}} +} + +// seriesDifferenceIterator is an iterator that merges two iterators together. +type seriesDifferenceIterator struct { + buf [2]SeriesElem + itrs [2]SeriesIterator +} + +// Next returns the next element which occurs only in the first iterator. +func (itr *seriesDifferenceIterator) Next() (e SeriesElem) { + for { + // Fill buffers. 
+ if itr.buf[0] == nil { + itr.buf[0] = itr.itrs[0].Next() + } + if itr.buf[1] == nil { + itr.buf[1] = itr.itrs[1].Next() + } + + // Exit if first buffer is still empty. + if itr.buf[0] == nil { + return nil + } else if itr.buf[1] == nil { + e, itr.buf[0] = itr.buf[0], nil + return e + } + + // Return first series if it's less. + // If second series is less then skip it. + // If both series are equal then skip both. + if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 { + e, itr.buf[0] = itr.buf[0], nil + return e + } else if cmp == 1 { + itr.buf[1] = nil + continue + } else { + itr.buf[0], itr.buf[1] = nil, nil continue } - - // If the name is not set the pick the first non-empty name. - if id == 0 || itr.buf[i] < id { - id = itr.buf[i] - } } +} - // Return zero if no elements remaining. - if id == 0 { - return 0 - } +// seriesExprElem holds a series and its associated filter expression. +type seriesExprElem struct { + SeriesElem + expr influxql.Expr +} - // Refill buffer. - for i := range itr.buf { - if itr.buf[i] == id { - itr.buf[i] = itr.itrs[i].next() - } +// Expr returns the associated expression. +func (e *seriesExprElem) Expr() influxql.Expr { return e.expr } + +// seriesExprIterator is an iterator that attaches an associated expression. +type seriesExprIterator struct { + itr SeriesIterator + e seriesExprElem +} + +// newSeriesExprIterator returns a new instance of seriesExprIterator. +func newSeriesExprIterator(itr SeriesIterator, expr influxql.Expr) *seriesExprIterator { + return &seriesExprIterator{ + itr: itr, + e: seriesExprElem{ + expr: expr, + }, } - return id +} + +// Next returns the next element in the iterator. +func (itr *seriesExprIterator) Next() SeriesElem { + itr.e.SeriesElem = itr.Next() + if itr.e.SeriesElem == nil { + return nil + } + return &itr.e +} + +// seriesIDIterator represents a iterator over a list of series ids. +type seriesIDIterator interface { + next() uint32 } // writeTo writes write v into w. Updates n. diff --git a/tsdb/engine/tsi1/tsi1_test.go b/tsdb/engine/tsi1/tsi1_test.go index 0ef77846c6..1826000637 100644 --- a/tsdb/engine/tsi1/tsi1_test.go +++ b/tsdb/engine/tsi1/tsi1_test.go @@ -5,6 +5,7 @@ import ( "reflect" "testing" + "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb/engine/tsi1" ) @@ -274,11 +275,13 @@ type SeriesElem struct { name []byte tags models.Tags deleted bool + expr influxql.Expr } -func (e *SeriesElem) Name() []byte { return e.name } -func (e *SeriesElem) Tags() models.Tags { return e.tags } -func (e *SeriesElem) Deleted() bool { return e.deleted } +func (e *SeriesElem) Name() []byte { return e.name } +func (e *SeriesElem) Tags() models.Tags { return e.tags } +func (e *SeriesElem) Deleted() bool { return e.deleted } +func (e *SeriesElem) Expr() influxql.Expr { return e.expr } // SeriesIterator represents an iterator over a slice of tag values. 
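// Illustrative sketch: per the doc comments on IntersectSeriesIterators and
// UnionSeriesIterators, when the same series arrives from both inputs its two
// filter expressions are combined (AND for intersect, OR for union), and a
// side with no expression contributes no extra constraint. A toy intersection
// over sorted elements, with strings standing in for series keys and for
// influxql.Expr (elem and intersectSorted are invented for illustration):

package main

import "fmt"

type elem struct{ key, expr string }

// intersectSorted keeps only keys present in both ascending slices and ANDs
// their filter expressions when both are set.
func intersectSorted(a, b []elem) []elem {
	var out []elem
	for len(a) > 0 && len(b) > 0 {
		switch {
		case a[0].key < b[0].key:
			a = a[1:]
		case a[0].key > b[0].key:
			b = b[1:]
		default:
			e := elem{key: a[0].key}
			switch {
			case a[0].expr == "":
				e.expr = b[0].expr
			case b[0].expr == "":
				e.expr = a[0].expr
			default:
				e.expr = "(" + a[0].expr + " AND " + b[0].expr + ")"
			}
			out = append(out, e)
			a, b = a[1:], b[1:]
		}
	}
	return out
}

func main() {
	left := []elem{{"cpu,host=a", "host = 'a'"}, {"cpu,host=b", "host = 'b'"}}
	right := []elem{{"cpu,host=b", "value > 10"}, {"cpu,host=c", ""}}
	fmt.Println(intersectSorted(left, right)) // [{cpu,host=b (host = 'b' AND value > 10)}]
}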
type SeriesIterator struct { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index f2af8e0ba5..cc2682d807 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1351,24 +1351,13 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo var itrs []influxql.Iterator if err := func() error { - mByName, err := e.index.MeasurementsByName(influxql.Sources(opt.Sources).Names()) - if err != nil { - return err - } - mms := tsdb.Measurements(mByName) - - for _, mm := range mms { - // Determine tagsets for this measurement based on dimensions and filters. - tagSets, err := mm.TagSets(opt.Dimensions, opt.Condition) - if err != nil { - return err - } - - // Calculate tag sets and apply SLIMIT/SOFFSET. - tagSets = influxql.LimitTagSets(tagSets, opt.SLimit, opt.SOffset) + for _, name := range influxql.Sources(opt.Sources).Names() { + // Generate tag sets from index. + tagSets := e.index.TagSets(name, opt.Dimensions, opt.Condition, opt.SLimit, opt.SOffset) + // Create iterators for each tagset. for _, t := range tagSets { - inputs, err := e.createTagSetIterators(ref, mm, t, opt) + inputs, err := e.createTagSetIterators(ref, name, t, opt) if err != nil { return err } @@ -1390,7 +1379,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo } // createTagSetIterators creates a set of iterators for a tagset. -func (e *Engine) createTagSetIterators(ref *influxql.VarRef, mm *tsdb.Measurement, t *influxql.TagSet, opt influxql.IteratorOptions) ([]influxql.Iterator, error) { +func (e *Engine) createTagSetIterators(ref *influxql.VarRef, name string, t *influxql.TagSet, opt influxql.IteratorOptions) ([]influxql.Iterator, error) { // Set parallelism by number of logical cpus. parallelism := runtime.GOMAXPROCS(0) if parallelism > len(t.SeriesKeys) { @@ -1427,7 +1416,7 @@ func (e *Engine) createTagSetIterators(ref *influxql.VarRef, mm *tsdb.Measuremen wg.Add(1) go func(i int) { defer wg.Done() - groups[i].itrs, groups[i].err = e.createTagSetGroupIterators(ref, mm, groups[i].keys, t, groups[i].filters, opt) + groups[i].itrs, groups[i].err = e.createTagSetGroupIterators(ref, name, groups[i].keys, t, groups[i].filters, opt) }(i) } wg.Wait() @@ -1458,7 +1447,7 @@ func (e *Engine) createTagSetIterators(ref *influxql.VarRef, mm *tsdb.Measuremen } // createTagSetGroupIterators creates a set of iterators for a subset of a tagset's series. 
-func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKeys []string, t *influxql.TagSet, filters []influxql.Expr, opt influxql.IteratorOptions) ([]influxql.Iterator, error) { +func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, name string, seriesKeys []string, t *influxql.TagSet, filters []influxql.Expr, opt influxql.IteratorOptions) ([]influxql.Iterator, error) { conditionFields := make([]influxql.VarRef, len(influxql.ExprNames(opt.Condition))) itrs := make([]influxql.Iterator, 0, len(seriesKeys)) @@ -1472,7 +1461,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu } } - itr, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, filters[i], conditionFields[:fields], opt) + itr, err := e.createVarRefSeriesIterator(ref, name, seriesKey, t, filters[i], conditionFields[:fields], opt) if err != nil { return itrs, err } else if itr == nil { @@ -1484,7 +1473,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu } // createVarRefSeriesIterator creates an iterator for a variable reference for a series. -func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) { +func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) { tfs, err := e.index.TagsForSeries(seriesKey) if err != nil { return nil, err @@ -1504,7 +1493,7 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu for i, ref := range opt.Aux { // Create cursor from field if a tag wasn't requested. if ref.Type != influxql.Tag { - cur := e.buildCursor(mm.Name, seriesKey, &ref, opt) + cur := e.buildCursor(name, seriesKey, &ref, opt) if cur != nil { aux[i] = newBufCursor(cur, opt.Ascending) continue @@ -1545,7 +1534,7 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu for i, ref := range conditionFields { // Create cursor from field if a tag wasn't requested. if ref.Type != influxql.Tag { - cur := e.buildCursor(mm.Name, seriesKey, &ref, opt) + cur := e.buildCursor(name, seriesKey, &ref, opt) if cur != nil { conds[i] = newBufCursor(cur, opt.Ascending) continue @@ -1584,11 +1573,11 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu // If it's only auxiliary fields then it doesn't matter what type of iterator we use. if ref == nil { - return newFloatIterator(mm.Name, tags, itrOpt, nil, aux, conds, condNames), nil + return newFloatIterator(name, tags, itrOpt, nil, aux, conds, condNames), nil } // Build main cursor. - cur := e.buildCursor(mm.Name, seriesKey, ref, opt) + cur := e.buildCursor(name, seriesKey, ref, opt) // If the field doesn't exist then don't build an iterator. 
if cur == nil { @@ -1597,13 +1586,13 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu switch cur := cur.(type) { case floatCursor: - return newFloatIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil + return newFloatIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil case integerCursor: - return newIntegerIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil + return newIntegerIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil case stringCursor: - return newStringIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil + return newStringIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil case booleanCursor: - return newBooleanIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil + return newBooleanIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil default: panic("unreachable") } diff --git a/tsdb/index.go b/tsdb/index.go index d884d83362..ee483f2f0d 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -12,22 +12,22 @@ type Index interface { Open() error Close() error - CreateMeasurementIndexIfNotExists(name string) (*Measurement, error) + CreateMeasurementIndexIfNotExists(name []byte) (*Measurement, error) Measurement(name []byte) (*Measurement, error) Measurements() (Measurements, error) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) - MeasurementsByName(names []string) ([]*Measurement, error) + MeasurementsByName(names [][]byte) ([]*Measurement, error) MeasurementsByRegex(re *regexp.Regexp) (Measurements, error) DropMeasurement(name []byte) error - CreateSeriesIndexIfNotExists(measurement string, series *Series) (*Series, error) + CreateSeriesIndexIfNotExists(measurement []byte, series *Series) (*Series, error) Series(key []byte) (*Series, error) - DropSeries(keys []string) error + DropSeries(keys [][]byte) error SeriesN() (uint64, error) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) - TagsForSeries(key string) (models.Tags, error) + TagsForSeries(key []byte) (models.Tags, error) Dereference(b []byte) } diff --git a/tsdb/meta.go b/tsdb/meta.go index e84df8c9fc..54d69bc9e0 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -692,15 +692,6 @@ func (m *Measurement) DropSeries(series *Series) { return } -// filters walks the where clause of a select statement and returns a map with all series ids -// matching the where clause and any filter expression that should be applied to each. -func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) { - if condition == nil || influxql.OnlyTimeExpr(condition) { - return m.seriesIDs, nil, nil - } - return m.walkWhereForSeriesIds(condition) -} - // TagSets returns the unique tag sets that exist for the given tag keys. This // is used to determine what composite series will be created by a group by. // @@ -713,194 +704,7 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf // this shouldn't be exported. However, until tx.go and the engine get // refactored into tsdb, we need it. func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { - m.mu.RLock() - - // get the unique set of series ids and the filters that should be applied to each - ids, filters, err := m.filters(condition) - if err != nil { - m.mu.RUnlock() - return nil, err - } - - // For every series, get the tag values for the requested tag keys i.e. dimensions. 
This is the - // TagSet for that series. Series with the same TagSet are then grouped together, because for the - // purpose of GROUP BY they are part of the same composite series. - tagSets := make(map[string]*influxql.TagSet, 64) - for _, id := range ids { - s := m.seriesByID[id] - tags := make(map[string]string, len(dimensions)) - - // Build the TagSet for this series. - for _, dim := range dimensions { - tags[dim] = s.Tags.GetString(dim) - } - // Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled - // as a set. - tagsAsKey := MarshalTags(tags) - tagSet, ok := tagSets[string(tagsAsKey)] - if !ok { - // This TagSet is new, create a new entry for it. - tagSet = &influxql.TagSet{ - Tags: tags, - Key: tagsAsKey, - } - } - // Associate the series and filter with the Tagset. - tagSet.AddFilter(m.seriesByID[id].Key, filters[id]) - - // Ensure it's back in the map. - tagSets[string(tagsAsKey)] = tagSet - } - // Release the lock while we sort all the tags - m.mu.RUnlock() - - // Sort the series in each tag set. - for _, t := range tagSets { - sort.Sort(t) - } - - // The TagSets have been created, as a map of TagSets. Just send - // the values back as a slice, sorting for consistency. - sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets)) - for _, v := range tagSets { - sortedTagsSets = append(sortedTagsSets, v) - } - sort.Sort(byTagKey(sortedTagsSets)) - - return sortedTagsSets, nil -} - -// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions. -func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { - // We only want to allocate a slice and map of the smaller size. - var ids []uint64 - if len(lids) > len(rids) { - ids = make([]uint64, 0, len(rids)) - } else { - ids = make([]uint64, 0, len(lids)) - } - - var filters FilterExprs - if len(lfilters) > len(rfilters) { - filters = make(FilterExprs, len(rfilters)) - } else { - filters = make(FilterExprs, len(lfilters)) - } - - // They're in sorted order so advance the counter as needed. - // This is, don't run comparisons against lower values that we've already passed. - for len(lids) > 0 && len(rids) > 0 { - lid, rid := lids[0], rids[0] - if lid == rid { - ids = append(ids, lid) - - var expr influxql.Expr - lfilter := lfilters[lid] - rfilter := rfilters[rid] - - if lfilter != nil && rfilter != nil { - be := &influxql.BinaryExpr{ - Op: influxql.AND, - LHS: lfilter, - RHS: rfilter, - } - expr = influxql.Reduce(be, nil) - } else if lfilter != nil { - expr = lfilter - } else if rfilter != nil { - expr = rfilter - } - - if expr != nil { - filters[lid] = expr - } - lids, rids = lids[1:], rids[1:] - } else if lid < rid { - lids = lids[1:] - } else { - rids = rids[1:] - } - } - return ids, filters -} - -// unionSeriesFilters performs a union for two sets of ids and filter expressions. -func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { - ids := make([]uint64, 0, len(lids)+len(rids)) - - // Setup the filters with the smallest size since we will discard filters - // that do not have a match on the other side. 
- var filters FilterExprs - if len(lfilters) < len(rfilters) { - filters = make(FilterExprs, len(lfilters)) - } else { - filters = make(FilterExprs, len(rfilters)) - } - - for len(lids) > 0 && len(rids) > 0 { - lid, rid := lids[0], rids[0] - if lid == rid { - ids = append(ids, lid) - - // If one side does not have a filter, then the series has been - // included on one side of the OR with no condition. Eliminate the - // filter in this case. - var expr influxql.Expr - lfilter := lfilters[lid] - rfilter := rfilters[rid] - if lfilter != nil && rfilter != nil { - be := &influxql.BinaryExpr{ - Op: influxql.OR, - LHS: lfilter, - RHS: rfilter, - } - expr = influxql.Reduce(be, nil) - } - - if expr != nil { - filters[lid] = expr - } - lids, rids = lids[1:], rids[1:] - } else if lid < rid { - ids = append(ids, lid) - - filter := lfilters[lid] - if filter != nil { - filters[lid] = filter - } - lids = lids[1:] - } else { - ids = append(ids, rid) - - filter := rfilters[rid] - if filter != nil { - filters[rid] = filter - } - rids = rids[1:] - } - } - - // Now append the remainder. - if len(lids) > 0 { - for i := 0; i < len(lids); i++ { - ids = append(ids, lids[i]) - - filter := lfilters[lids[i]] - if filter != nil { - filters[lids[i]] = filter - } - } - } else if len(rids) > 0 { - for i := 0; i < len(rids); i++ { - ids = append(ids, rids[i]) - - filter := rfilters[rids[i]] - if filter != nil { - filters[rids[i]] = filter - } - } - } - return ids, filters + panic("MOVED") } // IDsForExpr returns the series IDs that are candidates to match the given expression. @@ -1096,72 +900,6 @@ func (fe FilterExprs) Len() int { return len(fe) } -// walkWhereForSeriesIds recursively walks the WHERE clause and returns an -// ordered set of series IDs and a map from those series IDs to filter -// expressions that should be used to limit points returned in the final query -// result. -func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) { - switch n := expr.(type) { - case *influxql.BinaryExpr: - switch n.Op { - case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX: - // Get the series IDs and filter expression for the tag or field comparison. - ids, expr, err := m.idsForExpr(n) - if err != nil { - return nil, nil, err - } - - if len(ids) == 0 { - return ids, nil, nil - } - - // If the expression is a boolean literal that is true, ignore it. - if b, ok := expr.(*influxql.BooleanLiteral); ok && b.Val { - expr = nil - } - - var filters FilterExprs - if expr != nil { - filters = make(FilterExprs, len(ids)) - for _, id := range ids { - filters[id] = expr - } - } - - return ids, filters, nil - case influxql.AND, influxql.OR: - // Get the series IDs and filter expressions for the LHS. - lids, lfilters, err := m.walkWhereForSeriesIds(n.LHS) - if err != nil { - return nil, nil, err - } - - // Get the series IDs and filter expressions for the RHS. - rids, rfilters, err := m.walkWhereForSeriesIds(n.RHS) - if err != nil { - return nil, nil, err - } - - // Combine the series IDs from the LHS and RHS. 
- if n.Op == influxql.AND { - ids, filters := intersectSeriesFilters(lids, rids, lfilters, rfilters) - return ids, filters, nil - } else { - ids, filters := unionSeriesFilters(lids, rids, lfilters, rfilters) - return ids, filters, nil - } - } - - ids, _, err := m.idsForExpr(n) - return ids, nil, err - case *influxql.ParenExpr: - // walk down the tree - return m.walkWhereForSeriesIds(n.Expr) - default: - return nil, nil, nil - } -} - // expandExpr returns a list of expressions expanded by all possible tag // combinations. func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr { @@ -1220,6 +958,7 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr, return exprs } +<<<<<<< HEAD // SeriesIDsAllOrByExpr walks an expressions for matching series IDs // or, if no expression is given, returns all series IDs for the measurement. func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) { @@ -1246,6 +985,8 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error return ids, nil } +======= +>>>>>>> df7cec1... intermediate // tagKeysByExpr extracts the tag keys wanted by the expression. func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) { switch e := expr.(type) { diff --git a/tsdb/shard.go b/tsdb/shard.go index 16220c448a..edb4b2a3d8 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -10,7 +10,6 @@ import ( "os" "path/filepath" "sort" - "strings" "sync" "sync/atomic" "time" @@ -1227,29 +1226,33 @@ func (itr *seriesIterator) Next() (*influxql.FloatPoint, error) { // nextKeys reads all keys for the next measurement. func (itr *seriesIterator) nextKeys() error { - for { - // Ensure previous keys are cleared out. - itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0] + panic("MOVE TO TSI") + + /* + for { + // Ensure previous keys are cleared out. + itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0] + + // Read next measurement. + if len(itr.mms) == 0 { + return nil + } + mm := itr.mms[0] + itr.mms = itr.mms[1:] + + // Read all series keys. + ids, err := mm.seriesIDsAllOrByExpr(itr.opt.Condition) + if err != nil { + return err + } else if len(ids) == 0 { + continue + } + itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids) + sort.Strings(itr.keys.buf) - // Read next measurement. - if len(itr.mms) == 0 { return nil } - mm := itr.mms[0] - itr.mms = itr.mms[1:] - - // Read all series keys. - ids, err := mm.seriesIDsAllOrByExpr(itr.opt.Condition) - if err != nil { - return err - } else if len(ids) == 0 { - continue - } - itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids) - sort.Strings(itr.keys.buf) - - return nil - } + */ } // NewTagKeysIterator returns a new instance of TagKeysIterator. @@ -1273,82 +1276,86 @@ type tagValuesIterator struct { // NewTagValuesIterator returns a new instance of TagValuesIterator. 
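The deleted walkWhereForSeriesIds is essentially a recursive fold over the WHERE clause: comparisons resolve to series-ID sets at the leaves (idsForExpr's job), AND intersects the two children, OR unions them. A toy sketch of that recursion, using plain ID sets and ignoring the filter bookkeeping the real code also carries:

package main

import "fmt"

// node is a miniature stand-in for an influxql expression tree.
type node struct {
	op       string          // "AND", "OR", or "" for a leaf comparison
	lhs, rhs *node
	ids      map[uint64]bool // IDs the leaf comparison matched
}

func walk(n *node) map[uint64]bool {
	if n.op == "" {
		return n.ids
	}
	l, r := walk(n.lhs), walk(n.rhs)
	out := map[uint64]bool{}
	if n.op == "AND" {
		for id := range l {
			if r[id] {
				out[id] = true
			}
		}
		return out
	}
	for id := range l {
		out[id] = true
	}
	for id := range r {
		out[id] = true
	}
	return out
}

func main() {
	// (host = 'a' OR host = 'b') AND region = 'west'
	expr := &node{op: "AND",
		lhs: &node{op: "OR",
			lhs: &node{ids: map[uint64]bool{1: true}},
			rhs: &node{ids: map[uint64]bool{2: true}},
		},
		rhs: &node{ids: map[uint64]bool{2: true, 3: true}},
	}
	fmt.Println(walk(expr)) // map[2:true]
}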
func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { - if opt.Condition == nil { - return nil, errors.New("a condition is required") - } + panic("MOVE") - measurementExpr := influxql.CloneExpr(opt.Condition) - measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || tag.Val != "_name" { - return nil + /* + if opt.Condition == nil { + return nil, errors.New("a condition is required") + } + + measurementExpr := influxql.CloneExpr(opt.Condition) + measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || tag.Val != "_name" { + return nil + } } } - } - return e - }), nil) + return e + }), nil) - mms, ok, err := sh.engine.MeasurementsByExpr(measurementExpr) - if err != nil { - return nil, err - } else if !ok { - if mms, err = sh.engine.Measurements(); err != nil { - return nil, err - } - sort.Sort(mms) - } - - // If there are no measurements, return immediately. - if len(mms) == 0 { - return &tagValuesIterator{}, nil - } - - filterExpr := influxql.CloneExpr(opt.Condition) - filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || strings.HasPrefix(tag.Val, "_") { - return nil - } - } - } - return e - }), nil) - - var series []*Series - keys := newStringSet() - for _, mm := range mms { - ss, ok, err := mm.TagKeysByExpr(opt.Condition) + mms, ok, err := sh.engine.MeasurementsByExpr(measurementExpr) if err != nil { return nil, err } else if !ok { - keys.add(mm.TagKeys()...) - } else { - keys = keys.union(ss) + if mms, err = sh.engine.Measurements(); err != nil { + return nil, err + } + sort.Sort(mms) } - ids, err := mm.seriesIDsAllOrByExpr(filterExpr) - if err != nil { - return nil, err + // If there are no measurements, return immediately. + if len(mms) == 0 { + return &tagValuesIterator{}, nil } - for _, id := range ids { - series = append(series, mm.SeriesByID(id)) - } - } + filterExpr := influxql.CloneExpr(opt.Condition) + filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || strings.HasPrefix(tag.Val, "_") { + return nil + } + } + } + return e + }), nil) - return &tagValuesIterator{ - series: series, - keys: keys.list(), - fields: influxql.VarRefs(opt.Aux).Strings(), - }, nil + var series []*Series + keys := newStringSet() + for _, mm := range mms { + ss, ok, err := mm.TagKeysByExpr(opt.Condition) + if err != nil { + return nil, err + } else if !ok { + keys.add(mm.TagKeys()...) 
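Both NewTagValuesIterator and Store.TagValues split the incoming condition the same way: RewriteExpr replaces every comparison that is not against the internal _name tag with nil, and Reduce folds the remaining tree. A sketch of that step, assuming influxql.RewriteExpr and influxql.Reduce behave as they are used throughout this diff; the condition is built directly as an AST to keep the example self-contained.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/influxql"
)

func main() {
	// _name = 'cpu' AND host = 'server01'
	cond := &influxql.BinaryExpr{
		Op:  influxql.AND,
		LHS: &influxql.BinaryExpr{Op: influxql.EQ, LHS: &influxql.VarRef{Val: "_name"}, RHS: &influxql.StringLiteral{Val: "cpu"}},
		RHS: &influxql.BinaryExpr{Op: influxql.EQ, LHS: &influxql.VarRef{Val: "host"}, RHS: &influxql.StringLiteral{Val: "server01"}},
	}

	// Keep only comparisons against the measurement name (_name); every other
	// comparison is rewritten to nil and folded away.
	measurementExpr := influxql.Reduce(influxql.RewriteExpr(cond, func(e influxql.Expr) influxql.Expr {
		if be, ok := e.(*influxql.BinaryExpr); ok {
			switch be.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
				if tag, ok := be.LHS.(*influxql.VarRef); !ok || tag.Val != "_name" {
					return nil
				}
			}
		}
		return e
	}), nil)

	fmt.Println(measurementExpr) // _name = 'cpu'
}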
+ } else { + keys = keys.union(ss) + } + + ids, err := mm.seriesIDsAllOrByExpr(filterExpr) + if err != nil { + return nil, err + } + + for _, id := range ids { + series = append(series, mm.SeriesByID(id)) + } + } + + return &tagValuesIterator{ + series: series, + keys: keys.list(), + fields: influxql.VarRefs(opt.Aux).Strings(), + }, nil + */ } // Stats returns stats about the points processed. diff --git a/tsdb/store.go b/tsdb/store.go index c6ff0fe996..60677154bd 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -10,7 +10,6 @@ import ( "runtime" "sort" "strconv" - "strings" "sync" "time" @@ -696,78 +695,82 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) { // DeleteSeries loops through the local shards and deletes the series data for // the passed in series keys. func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error { - // Expand regex expressions in the FROM clause. - a, err := s.ExpandSources(sources) - if err != nil { - return err - } else if sources != nil && len(sources) != 0 && len(a) == 0 { - return nil - } - sources = a + panic("MOVE TO TSI") - // Determine deletion time range. - min, max, err := influxql.TimeRangeAsEpochNano(condition) - if err != nil { - return err - } - - s.mu.RLock() - shards := s.filterShards(byDatabase(database)) - s.mu.RUnlock() - - mMap := make(map[string]*Measurement) - for _, shard := range shards { - shardMeasures := shard.Measurements() - for _, m := range shardMeasures { - mMap[m.Name] = m + /* + // Expand regex expressions in the FROM clause. + a, err := s.ExpandSources(sources) + if err != nil { + return err + } else if sources != nil && len(sources) != 0 && len(a) == 0 { + return nil } - } + sources = a - s.mu.RLock() - defer s.mu.RUnlock() - - measurements, err := measurementsFromSourcesOrDB(mMap, sources...) - if err != nil { - return err - } - - var seriesKeys [][]byte - for _, m := range measurements { - var ids SeriesIDs - var filters FilterExprs - if condition != nil { - // Get series IDs that match the WHERE clause. - ids, filters, err = m.walkWhereForSeriesIds(condition) - if err != nil { - return err - } - - // Delete boolean literal true filter expressions. - // These are returned for `WHERE tagKey = 'tagVal'` type expressions and are okay. - filters.DeleteBoolLiteralTrues() - - // Check for unsupported field filters. - // Any remaining filters means there were fields (e.g., `WHERE value = 1.2`). - if filters.Len() > 0 { - return errors.New("fields not supported in WHERE clause during deletion") - } - } else { - // No WHERE clause so get all series IDs for this measurement. - ids = m.seriesIDs - } - - for _, id := range ids { - seriesKeys = append(seriesKeys, []byte(m.seriesByID[id].Key)) - } - } - - // delete the raw series data. - return s.walkShards(shards, func(sh *Shard) error { - if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil { + // Determine deletion time range. + min, max, err := influxql.TimeRangeAsEpochNano(condition) + if err != nil { return err } - return nil - }) + + s.mu.RLock() + shards := s.filterShards(byDatabase(database)) + s.mu.RUnlock() + + mMap := make(map[string]*Measurement) + for _, shard := range shards { + shardMeasures := shard.Measurements() + for _, m := range shardMeasures { + mMap[m.Name] = m + } + } + + s.mu.RLock() + defer s.mu.RUnlock() + + measurements, err := measurementsFromSourcesOrDB(mMap, sources...) 
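The commented-out DeleteSeries body gathers measurements from every shard of the database into a single map keyed by name, so each measurement is walked exactly once even when it lives in many shards. A trivial standalone sketch of that dedup, with a stand-in measurement type:

package main

import "fmt"

// measurement is a stand-in for *Measurement; only the name matters here.
type measurement struct{ Name string }

func main() {
	// Each shard reports its own measurement objects; folding them into one
	// map keyed by name collapses the duplicates.
	shards := [][]measurement{
		{{"cpu"}, {"mem"}},
		{{"cpu"}, {"disk"}},
	}
	mMap := make(map[string]measurement)
	for _, shardMeasures := range shards {
		for _, m := range shardMeasures {
			mMap[m.Name] = m
		}
	}
	fmt.Println(len(mMap), "unique measurements") // 3 unique measurements
}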
+ if err != nil { + return err + } + + var seriesKeys [][]byte + for _, m := range measurements { + var ids SeriesIDs + var filters FilterExprs + if condition != nil { + // Get series IDs that match the WHERE clause. + ids, filters, err = m.walkWhereForSeriesIds(condition) + if err != nil { + return err + } + + // Delete boolean literal true filter expressions. + // These are returned for `WHERE tagKey = 'tagVal'` type expressions and are okay. + filters.DeleteBoolLiteralTrues() + + // Check for unsupported field filters. + // Any remaining filters means there were fields (e.g., `WHERE value = 1.2`). + if filters.Len() > 0 { + return errors.New("fields not supported in WHERE clause during deletion") + } + } else { + // No WHERE clause so get all series IDs for this measurement. + ids = m.seriesIDs + } + + for _, id := range ids { + seriesKeys = append(seriesKeys, []byte(m.seriesByID[id].Key)) + } + } + + // delete the raw series data. + return s.walkShards(shards, func(sh *Shard) error { + if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil { + return err + } + return nil + }) + */ } // ExpandSources expands sources against all local shards. @@ -880,110 +883,114 @@ type TagValues struct { // TagValues returns the tag keys and values in the given database, matching the condition. func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, error) { - if cond == nil { - return nil, errors.New("a condition is required") - } + panic("MOVE TO TSI") - measurementExpr := influxql.CloneExpr(cond) - measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || tag.Val != "_name" { - return nil + /* + if cond == nil { + return nil, errors.New("a condition is required") + } + + measurementExpr := influxql.CloneExpr(cond) + measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || tag.Val != "_name" { + return nil + } } } - } - return e - }), nil) + return e + }), nil) - // Get all measurements for the shards we're interested in. - s.mu.RLock() - shards := s.filterShards(byDatabase(database)) - s.mu.RUnlock() + // Get all measurements for the shards we're interested in. + s.mu.RLock() + shards := s.filterShards(byDatabase(database)) + s.mu.RUnlock() - var measures Measurements - for _, sh := range shards { - mms, ok, err := sh.MeasurementsByExpr(measurementExpr) - if err != nil { - return nil, err - } else if !ok { - // TODO(edd): can we simplify this so we don't have to check the - // ok value, and we can call sh.measurements with a shard filter - // instead? - mms = sh.Measurements() + var measures Measurements + for _, sh := range shards { + mms, ok, err := sh.MeasurementsByExpr(measurementExpr) + if err != nil { + return nil, err + } else if !ok { + // TODO(edd): can we simplify this so we don't have to check the + // ok value, and we can call sh.measurements with a shard filter + // instead? + mms = sh.Measurements() + } + + measures = append(measures, mms...) } - measures = append(measures, mms...) - } + // If there are no measurements, return immediately. 
+ if len(measures) == 0 { + return nil, nil + } + sort.Sort(measures) - // If there are no measurements, return immediately. - if len(measures) == 0 { - return nil, nil - } - sort.Sort(measures) - - filterExpr := influxql.CloneExpr(cond) - filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || strings.HasPrefix(tag.Val, "_") { - return nil + filterExpr := influxql.CloneExpr(cond) + filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || strings.HasPrefix(tag.Val, "_") { + return nil + } } } - } - return e - }), nil) + return e + }), nil) - tagValues := make([]TagValues, len(measures)) - for i, mm := range measures { - tagValues[i].Measurement = mm.Name + tagValues := make([]TagValues, len(measures)) + for i, mm := range measures { + tagValues[i].Measurement = mm.Name - ids, err := mm.SeriesIDsAllOrByExpr(filterExpr) - if err != nil { - return nil, err - } - ss := mm.SeriesByIDSlice(ids) - - // Determine a list of keys from condition. - keySet, ok, err := mm.TagKeysByExpr(cond) - if err != nil { - return nil, err - } - - // Loop over all keys for each series. - m := make(map[KeyValue]struct{}, len(ss)) - for _, series := range ss { - for _, t := range series.Tags { - if !ok { - // nop - } else if _, exists := keySet[string(t.Key)]; !exists { - continue - } - m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{} + ids, err := mm.SeriesIDsAllOrByExpr(filterExpr) + if err != nil { + return nil, err } + ss := mm.SeriesByIDSlice(ids) + + // Determine a list of keys from condition. + keySet, ok, err := mm.TagKeysByExpr(cond) + if err != nil { + return nil, err + } + + // Loop over all keys for each series. + m := make(map[KeyValue]struct{}, len(ss)) + for _, series := range ss { + for _, t := range series.Tags { + if !ok { + // nop + } else if _, exists := keySet[string(t.Key)]; !exists { + continue + } + m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{} + } + } + + // Return an empty slice if there are no key/value matches. + if len(m) == 0 { + continue + } + + // Sort key/value set. + a := make([]KeyValue, 0, len(m)) + for kv := range m { + a = append(a, kv) + } + sort.Sort(KeyValues(a)) + tagValues[i].Values = a } - // Return an empty slice if there are no key/value matches. - if len(m) == 0 { - continue - } - - // Sort key/value set. - a := make([]KeyValue, 0, len(m)) - for kv := range m { - a = append(a, kv) - } - sort.Sort(KeyValues(a)) - tagValues[i].Values = a - } - - return tagValues, nil + return tagValues, nil + */ } // KeyValue holds a string key and a string value.
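The commented-out tail of Store.TagValues collects each series' tag pairs into a map used as a set, then flattens and sorts it for a stable result. A standalone sketch of that pattern, using a local pair type and sort.Slice rather than the package's KeyValue/KeyValues, which are assumed to sort by key and then value:

package main

import (
	"fmt"
	"sort"
)

// keyValue mirrors the tag key/value pair collected per series.
type keyValue struct{ key, value string }

func main() {
	// Tag pairs seen while looping over a measurement's series; duplicates
	// collapse because the map is used as a set.
	seen := map[keyValue]struct{}{}
	for _, kv := range []keyValue{
		{"region", "uswest"}, {"host", "server01"},
		{"region", "uswest"}, {"region", "useast"},
	} {
		seen[kv] = struct{}{}
	}

	// Flatten the set and sort by key, then value, for a deterministic order.
	a := make([]keyValue, 0, len(seen))
	for kv := range seen {
		a = append(a, kv)
	}
	sort.Slice(a, func(i, j int) bool {
		if a[i].key != a[j].key {
			return a[i].key < a[j].key
		}
		return a[i].value < a[j].value
	})
	fmt.Println(a) // [{host server01} {region useast} {region uswest}]
}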