intermediate
parent
8863e3c0f3
commit
5f5b02e052
|
@ -1,67 +0,0 @@
|
|||
package inmem
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
// Index represents an in-memory index.
|
||||
type Index struct {
|
||||
series Series
|
||||
measurements map[string]Measurement
|
||||
}
|
||||
|
||||
// NewIndex returns a new instance of Index.
|
||||
func NewIndex() *Index {
|
||||
return &Index{
|
||||
measurements: make(map[string]Measurement),
|
||||
}
|
||||
}
|
||||
|
||||
// MeasurementNames returns a sorted list of measurement names.
|
||||
func (i *Index) MeasurementNames() []string {
|
||||
a := make([]string, 0, len(m))
|
||||
for name := range m {
|
||||
a = append(a, name)
|
||||
}
|
||||
sort.Strings(a)
|
||||
return a
|
||||
}
|
||||
|
||||
// Measurement represents a measurement in the index.
|
||||
type Measurement struct {
|
||||
Name []byte
|
||||
Deleted bool
|
||||
TagSet TagSet
|
||||
}
|
||||
|
||||
// TagSet represents a collection of tags.
|
||||
type TagSet map[string]Tag
|
||||
|
||||
// Tag represents a tag key and its associated values.
|
||||
type Tag struct {
|
||||
Name []byte
|
||||
Deleted bool
|
||||
Values TagValues
|
||||
}
|
||||
|
||||
// TagValue represents a collection of tag values.
|
||||
type TagValues map[string]TagValue
|
||||
|
||||
// TagValue represents a single tag value and its associated series.
|
||||
type TagValue struct {
|
||||
Name []byte
|
||||
Deleted bool
|
||||
Series []Serie
|
||||
}
|
||||
|
||||
// Series represents a sorted list of serie.
|
||||
type Series []Serie
|
||||
|
||||
// Serie represents an individual series.
|
||||
type Serie struct {
|
||||
Name []byte
|
||||
Tags models.Tags
|
||||
Deleted bool
|
||||
}
|
|
@ -0,0 +1,473 @@
|
|||
package tsi1
|
||||
|
||||
/*
|
||||
import (
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
)
|
||||
|
||||
// TagSets returns the unique tag sets that exist for the given tag keys. This
|
||||
// is used to determine what composite series will be created by a group by.
|
||||
//
|
||||
// i.e. "group by region" should return: {"region":"uswest"},
|
||||
// {"region":"useast"} or region, service returns {"region": "uswest",
|
||||
// "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc...
|
||||
//
|
||||
// This will also populate the TagSet objects with the series IDs that match
|
||||
// each tagset and any influx filter expression that goes with the series TODO:
|
||||
// this shouldn't be exported. However, until tx.go and the engine get
|
||||
// refactored into tsdb, we need it.
|
||||
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
|
||||
m.mu.RLock()
|
||||
|
||||
// TODO(benbjohnson):
|
||||
// Iterators are needed at the series id level and the series level. The
|
||||
// series id will allow us to union faster. We can't intersect at the
|
||||
// series id level because that could remove series which would intersect
|
||||
// at a higher cross-file level.
|
||||
//
|
||||
// - IndexFile.SeriesIteratorByExpr(condition)
|
||||
// - LogFile.SeriesIteratorByExpr(condition)
|
||||
//
|
||||
// - UnionSeriesIterators()
|
||||
// - IntersectSeriesIterators()
|
||||
// - unionSeriesIDIterators()
|
||||
|
||||
// TODO(benbjohnson):
|
||||
// Create series iterator based on condition. If condition is time-only
|
||||
// the return all measurement series ids. Otherwise walk condition and merge
|
||||
// series via walkWhereForSeriesIds()/idsForExpr().
|
||||
|
||||
// get the unique set of series ids and the filters that should be applied to each
|
||||
ids, filters, err := m.filters(condition)
|
||||
if err != nil {
|
||||
m.mu.RUnlock()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO(benbjohnson):
|
||||
// Iterate over each series and build tagsets with dimensions.
|
||||
// Limit and offset as needed.
|
||||
|
||||
// For every series, get the tag values for the requested tag keys i.e. dimensions. This is the
|
||||
// TagSet for that series. Series with the same TagSet are then grouped together, because for the
|
||||
// purpose of GROUP BY they are part of the same composite series.
|
||||
tagSets := make(map[string]*influxql.TagSet, 64)
|
||||
for _, id := range ids {
|
||||
s := m.seriesByID[id]
|
||||
tags := make(map[string]string, len(dimensions))
|
||||
|
||||
// Build the TagSet for this series.
|
||||
for _, dim := range dimensions {
|
||||
tags[dim] = s.Tags.GetString(dim)
|
||||
}
|
||||
// Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled
|
||||
// as a set.
|
||||
tagsAsKey := MarshalTags(tags)
|
||||
tagSet, ok := tagSets[string(tagsAsKey)]
|
||||
if !ok {
|
||||
// This TagSet is new, create a new entry for it.
|
||||
tagSet = &influxql.TagSet{
|
||||
Tags: tags,
|
||||
Key: tagsAsKey,
|
||||
}
|
||||
}
|
||||
// Associate the series and filter with the Tagset.
|
||||
tagSet.AddFilter(m.seriesByID[id].Key, filters[id])
|
||||
|
||||
// Ensure it's back in the map.
|
||||
tagSets[string(tagsAsKey)] = tagSet
|
||||
}
|
||||
// Release the lock while we sort all the tags
|
||||
m.mu.RUnlock()
|
||||
|
||||
// Sort the series in each tag set.
|
||||
for _, t := range tagSets {
|
||||
sort.Sort(t)
|
||||
}
|
||||
|
||||
// The TagSets have been created, as a map of TagSets. Just send
|
||||
// the values back as a slice, sorting for consistency.
|
||||
sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets))
|
||||
for _, v := range tagSets {
|
||||
sortedTagsSets = append(sortedTagsSets, v)
|
||||
}
|
||||
sort.Sort(byTagKey(sortedTagsSets))
|
||||
|
||||
return sortedTagsSets, nil
|
||||
}
|
||||
|
||||
// filters walks the where clause of a select statement and returns a map with all series ids
|
||||
// matching the where clause and any filter expression that should be applied to each
|
||||
func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) {
|
||||
if condition == nil || influxql.OnlyTimeExpr(condition) {
|
||||
return m.seriesIDs, nil, nil
|
||||
}
|
||||
return m.walkWhereForSeriesIds(condition)
|
||||
}
|
||||
|
||||
// walkWhereForSeriesIds recursively walks the WHERE clause and returns an
|
||||
// ordered set of series IDs and a map from those series IDs to filter
|
||||
// expressions that should be used to limit points returned in the final query
|
||||
// result.
|
||||
func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) {
|
||||
switch n := expr.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch n.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
// Get the series IDs and filter expression for the tag or field comparison.
|
||||
ids, expr, err := m.idsForExpr(n)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if len(ids) == 0 {
|
||||
return ids, nil, nil
|
||||
}
|
||||
|
||||
// If the expression is a boolean literal that is true, ignore it.
|
||||
if b, ok := expr.(*influxql.BooleanLiteral); ok && b.Val {
|
||||
expr = nil
|
||||
}
|
||||
|
||||
var filters FilterExprs
|
||||
if expr != nil {
|
||||
filters = make(FilterExprs, len(ids))
|
||||
for _, id := range ids {
|
||||
filters[id] = expr
|
||||
}
|
||||
}
|
||||
|
||||
return ids, filters, nil
|
||||
case influxql.AND, influxql.OR:
|
||||
// Get the series IDs and filter expressions for the LHS.
|
||||
lids, lfilters, err := m.walkWhereForSeriesIds(n.LHS)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Get the series IDs and filter expressions for the RHS.
|
||||
rids, rfilters, err := m.walkWhereForSeriesIds(n.RHS)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Combine the series IDs from the LHS and RHS.
|
||||
if n.Op == influxql.AND {
|
||||
ids, filters := intersectSeriesFilters(lids, rids, lfilters, rfilters)
|
||||
return ids, filters, nil
|
||||
} else {
|
||||
ids, filters := unionSeriesFilters(lids, rids, lfilters, rfilters)
|
||||
return ids, filters, nil
|
||||
}
|
||||
}
|
||||
|
||||
ids, _, err := m.idsForExpr(n)
|
||||
return ids, nil, err
|
||||
case *influxql.ParenExpr:
|
||||
// walk down the tree
|
||||
return m.walkWhereForSeriesIds(n.Expr)
|
||||
default:
|
||||
return nil, nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions.
|
||||
func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) {
|
||||
// We only want to allocate a slice and map of the smaller size.
|
||||
var ids []uint64
|
||||
if len(lids) > len(rids) {
|
||||
ids = make([]uint64, 0, len(rids))
|
||||
} else {
|
||||
ids = make([]uint64, 0, len(lids))
|
||||
}
|
||||
|
||||
var filters FilterExprs
|
||||
if len(lfilters) > len(rfilters) {
|
||||
filters = make(FilterExprs, len(rfilters))
|
||||
} else {
|
||||
filters = make(FilterExprs, len(lfilters))
|
||||
}
|
||||
|
||||
// They're in sorted order so advance the counter as needed.
|
||||
// This is, don't run comparisons against lower values that we've already passed.
|
||||
for len(lids) > 0 && len(rids) > 0 {
|
||||
lid, rid := lids[0], rids[0]
|
||||
if lid == rid {
|
||||
ids = append(ids, lid)
|
||||
|
||||
var expr influxql.Expr
|
||||
lfilter := lfilters[lid]
|
||||
rfilter := rfilters[rid]
|
||||
|
||||
if lfilter != nil && rfilter != nil {
|
||||
be := &influxql.BinaryExpr{
|
||||
Op: influxql.AND,
|
||||
LHS: lfilter,
|
||||
RHS: rfilter,
|
||||
}
|
||||
expr = influxql.Reduce(be, nil)
|
||||
} else if lfilter != nil {
|
||||
expr = lfilter
|
||||
} else if rfilter != nil {
|
||||
expr = rfilter
|
||||
}
|
||||
|
||||
if expr != nil {
|
||||
filters[lid] = expr
|
||||
}
|
||||
lids, rids = lids[1:], rids[1:]
|
||||
} else if lid < rid {
|
||||
lids = lids[1:]
|
||||
} else {
|
||||
rids = rids[1:]
|
||||
}
|
||||
}
|
||||
return ids, filters
|
||||
}
|
||||
|
||||
// unionSeriesFilters performs a union for two sets of ids and filter expressions.
|
||||
func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) {
|
||||
ids := make([]uint64, 0, len(lids)+len(rids))
|
||||
|
||||
// Setup the filters with the smallest size since we will discard filters
|
||||
// that do not have a match on the other side.
|
||||
var filters FilterExprs
|
||||
if len(lfilters) < len(rfilters) {
|
||||
filters = make(FilterExprs, len(lfilters))
|
||||
} else {
|
||||
filters = make(FilterExprs, len(rfilters))
|
||||
}
|
||||
|
||||
for len(lids) > 0 && len(rids) > 0 {
|
||||
lid, rid := lids[0], rids[0]
|
||||
if lid == rid {
|
||||
ids = append(ids, lid)
|
||||
|
||||
// If one side does not have a filter, then the series has been
|
||||
// included on one side of the OR with no condition. Eliminate the
|
||||
// filter in this case.
|
||||
var expr influxql.Expr
|
||||
lfilter := lfilters[lid]
|
||||
rfilter := rfilters[rid]
|
||||
if lfilter != nil && rfilter != nil {
|
||||
be := &influxql.BinaryExpr{
|
||||
Op: influxql.OR,
|
||||
LHS: lfilter,
|
||||
RHS: rfilter,
|
||||
}
|
||||
expr = influxql.Reduce(be, nil)
|
||||
}
|
||||
|
||||
if expr != nil {
|
||||
filters[lid] = expr
|
||||
}
|
||||
lids, rids = lids[1:], rids[1:]
|
||||
} else if lid < rid {
|
||||
ids = append(ids, lid)
|
||||
|
||||
filter := lfilters[lid]
|
||||
if filter != nil {
|
||||
filters[lid] = filter
|
||||
}
|
||||
lids = lids[1:]
|
||||
} else {
|
||||
ids = append(ids, rid)
|
||||
|
||||
filter := rfilters[rid]
|
||||
if filter != nil {
|
||||
filters[rid] = filter
|
||||
}
|
||||
rids = rids[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// Now append the remainder.
|
||||
if len(lids) > 0 {
|
||||
for i := 0; i < len(lids); i++ {
|
||||
ids = append(ids, lids[i])
|
||||
|
||||
filter := lfilters[lids[i]]
|
||||
if filter != nil {
|
||||
filters[lids[i]] = filter
|
||||
}
|
||||
}
|
||||
} else if len(rids) > 0 {
|
||||
for i := 0; i < len(rids); i++ {
|
||||
ids = append(ids, rids[i])
|
||||
|
||||
filter := rfilters[rids[i]]
|
||||
if filter != nil {
|
||||
filters[rids[i]] = filter
|
||||
}
|
||||
}
|
||||
}
|
||||
return ids, filters
|
||||
}
|
||||
|
||||
// idsForExpr will return a collection of series ids and a filter expression that should
|
||||
// be used to filter points from those series.
|
||||
func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Expr, error) {
|
||||
// If this binary expression has another binary expression, then this
|
||||
// is some expression math and we should just pass it to the underlying query.
|
||||
if _, ok := n.LHS.(*influxql.BinaryExpr); ok {
|
||||
return m.seriesIDs, n, nil
|
||||
} else if _, ok := n.RHS.(*influxql.BinaryExpr); ok {
|
||||
return m.seriesIDs, n, nil
|
||||
}
|
||||
|
||||
// Retrieve the variable reference from the correct side of the expression.
|
||||
name, ok := n.LHS.(*influxql.VarRef)
|
||||
value := n.RHS
|
||||
if !ok {
|
||||
name, ok = n.RHS.(*influxql.VarRef)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid expression: %s", n.String())
|
||||
}
|
||||
value = n.LHS
|
||||
}
|
||||
|
||||
// For time literals, return all series IDs and "true" as the filter.
|
||||
if _, ok := value.(*influxql.TimeLiteral); ok || name.Val == "time" {
|
||||
return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
|
||||
// For fields, return all series IDs from this measurement and return
|
||||
// the expression passed in, as the filter.
|
||||
if name.Val != "_name" && ((name.Type == influxql.Unknown && m.hasField(name.Val)) || name.Type == influxql.AnyField || (name.Type != influxql.Tag && name.Type != influxql.Unknown)) {
|
||||
return m.seriesIDs, n, nil
|
||||
} else if value, ok := value.(*influxql.VarRef); ok {
|
||||
// Check if the RHS is a variable and if it is a field.
|
||||
if value.Val != "_name" && ((value.Type == influxql.Unknown && m.hasField(value.Val)) || name.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) {
|
||||
return m.seriesIDs, n, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve list of series with this tag key.
|
||||
tagVals := m.seriesByTagKeyValue[name.Val]
|
||||
|
||||
// if we're looking for series with a specific tag value
|
||||
if str, ok := value.(*influxql.StringLiteral); ok {
|
||||
var ids SeriesIDs
|
||||
|
||||
// Special handling for "_name" to match measurement name.
|
||||
if name.Val == "_name" {
|
||||
if (n.Op == influxql.EQ && str.Val == m.Name) || (n.Op == influxql.NEQ && str.Val != m.Name) {
|
||||
return m.seriesIDs, nil, nil
|
||||
}
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
if n.Op == influxql.EQ {
|
||||
if str.Val != "" {
|
||||
// return series that have a tag of specific value.
|
||||
ids = tagVals[str.Val]
|
||||
} else {
|
||||
// Make a copy of all series ids and mark the ones we need to evict.
|
||||
seriesIDs := newEvictSeriesIDs(m.seriesIDs)
|
||||
|
||||
// Go through each slice and mark the values we find as zero so
|
||||
// they can be removed later.
|
||||
for _, a := range tagVals {
|
||||
seriesIDs.mark(a)
|
||||
}
|
||||
|
||||
// Make a new slice with only the remaining ids.
|
||||
ids = seriesIDs.evict()
|
||||
}
|
||||
} else if n.Op == influxql.NEQ {
|
||||
if str.Val != "" {
|
||||
ids = m.seriesIDs.Reject(tagVals[str.Val])
|
||||
} else {
|
||||
for k := range tagVals {
|
||||
ids = append(ids, tagVals[k]...)
|
||||
}
|
||||
sort.Sort(ids)
|
||||
}
|
||||
}
|
||||
return ids, nil, nil
|
||||
}
|
||||
|
||||
// if we're looking for series with a tag value that matches a regex
|
||||
if re, ok := value.(*influxql.RegexLiteral); ok {
|
||||
var ids SeriesIDs
|
||||
|
||||
// Special handling for "_name" to match measurement name.
|
||||
if name.Val == "_name" {
|
||||
match := re.Val.MatchString(m.Name)
|
||||
if (n.Op == influxql.EQREGEX && match) || (n.Op == influxql.NEQREGEX && !match) {
|
||||
return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// Check if we match the empty string to see if we should include series
|
||||
// that are missing the tag.
|
||||
empty := re.Val.MatchString("")
|
||||
|
||||
// Gather the series that match the regex. If we should include the empty string,
|
||||
// start with the list of all series and reject series that don't match our condition.
|
||||
// If we should not include the empty string, include series that match our condition.
|
||||
if empty && n.Op == influxql.EQREGEX {
|
||||
// See comments above for EQ with a StringLiteral.
|
||||
seriesIDs := newEvictSeriesIDs(m.seriesIDs)
|
||||
for k := range tagVals {
|
||||
if !re.Val.MatchString(k) {
|
||||
seriesIDs.mark(tagVals[k])
|
||||
}
|
||||
}
|
||||
ids = seriesIDs.evict()
|
||||
} else if empty && n.Op == influxql.NEQREGEX {
|
||||
ids = make(SeriesIDs, 0, len(m.seriesIDs))
|
||||
for k := range tagVals {
|
||||
if !re.Val.MatchString(k) {
|
||||
ids = append(ids, tagVals[k]...)
|
||||
}
|
||||
}
|
||||
sort.Sort(ids)
|
||||
} else if !empty && n.Op == influxql.EQREGEX {
|
||||
ids = make(SeriesIDs, 0, len(m.seriesIDs))
|
||||
for k := range tagVals {
|
||||
if re.Val.MatchString(k) {
|
||||
ids = append(ids, tagVals[k]...)
|
||||
}
|
||||
}
|
||||
sort.Sort(ids)
|
||||
} else if !empty && n.Op == influxql.NEQREGEX {
|
||||
// See comments above for EQ with a StringLiteral.
|
||||
seriesIDs := newEvictSeriesIDs(m.seriesIDs)
|
||||
for k := range tagVals {
|
||||
if re.Val.MatchString(k) {
|
||||
seriesIDs.mark(tagVals[k])
|
||||
}
|
||||
}
|
||||
ids = seriesIDs.evict()
|
||||
}
|
||||
return ids, nil, nil
|
||||
}
|
||||
|
||||
// compare tag values
|
||||
if ref, ok := value.(*influxql.VarRef); ok {
|
||||
var ids SeriesIDs
|
||||
|
||||
if n.Op == influxql.NEQ {
|
||||
ids = m.seriesIDs
|
||||
}
|
||||
|
||||
rhsTagVals := m.seriesByTagKeyValue[ref.Val]
|
||||
for k := range tagVals {
|
||||
tags := tagVals[k].Intersect(rhsTagVals[k])
|
||||
if n.Op == influxql.EQ {
|
||||
ids = ids.Union(tags)
|
||||
} else if n.Op == influxql.NEQ {
|
||||
ids = ids.Reject(tags)
|
||||
}
|
||||
}
|
||||
return ids, nil, nil
|
||||
}
|
||||
|
||||
if n.Op == influxql.NEQ || n.Op == influxql.NEQREGEX {
|
||||
return m.seriesIDs, nil, nil
|
||||
}
|
||||
return nil, nil, nil
|
||||
}
|
||||
*/
|
|
@ -1,6 +1,7 @@
|
|||
package tsi1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
|
@ -16,11 +17,8 @@ var _ tsdb.Index = &Index{}
|
|||
|
||||
// Index represents a collection of layered index files and WAL.
|
||||
type Index struct {
|
||||
file *IndexFile
|
||||
|
||||
// TODO(benbjohnson): Use layered list of index files.
|
||||
|
||||
// TODO(benbjohnson): Add write ahead log.
|
||||
logFiles []*LogFile
|
||||
indexFiles IndexFiles
|
||||
}
|
||||
|
||||
// Open opens the index.
|
||||
|
@ -29,11 +27,23 @@ func (i *Index) Open() error { panic("TODO") }
|
|||
// Close closes the index.
|
||||
func (i *Index) Close() error { panic("TODO") }
|
||||
|
||||
// SetFile explicitly sets a file in the index.
|
||||
func (i *Index) SetFile(f *IndexFile) { i.file = f }
|
||||
// SetLogFiles explicitly sets log files.
|
||||
// TEMPORARY: For testing only.
|
||||
func (i *Index) SetLogFiles(a ...*LogFile) { i.logFiles = a }
|
||||
|
||||
func (i *Index) CreateMeasurementIndexIfNotExists(name string) (*tsdb.Measurement, error) {
|
||||
panic("TODO: Requires WAL")
|
||||
// SetIndexFiles explicitly sets index files
|
||||
// TEMPORARY: For testing only.
|
||||
func (i *Index) SetIndexFiles(a ...*IndexFile) { i.indexFiles = IndexFiles(a) }
|
||||
|
||||
// FileN returns the number of log and index files within the index.
|
||||
func (i *Index) FileN() int { return len(i.logFiles) + len(i.indexFiles) }
|
||||
|
||||
func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) (*tsdb.Measurement, error) {
|
||||
// FIXME(benbjohnson): Read lock log file during lookup.
|
||||
if mm := i.measurement(name); mm == nil {
|
||||
return mm, nil
|
||||
}
|
||||
return i.logFiles[0].CreateMeasurementIndexIfNotExists(name)
|
||||
}
|
||||
|
||||
// Measurement retrieves a measurement by name.
|
||||
|
@ -45,7 +55,7 @@ func (i *Index) measurement(name []byte) *tsdb.Measurement {
|
|||
m := tsdb.NewMeasurement(string(name))
|
||||
|
||||
// Iterate over measurement series.
|
||||
itr := i.file.MeasurementSeriesIterator(name)
|
||||
itr := i.MeasurementSeriesIterator(name)
|
||||
|
||||
var id uint64 // TEMPORARY
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
|
@ -69,16 +79,40 @@ func (i *Index) measurement(name []byte) *tsdb.Measurement {
|
|||
return m
|
||||
}
|
||||
|
||||
// MeasurementSeriesIterator returns an iterator over all series in the index.
|
||||
func (i *Index) MeasurementSeriesIterator(name []byte) SeriesIterator {
|
||||
a := make([]SeriesIterator, 0, i.FileN())
|
||||
for _, f := range i.logFiles {
|
||||
a = append(a, f.MeasurementSeriesIterator(name))
|
||||
}
|
||||
for _, f := range i.indexFiles {
|
||||
a = append(a, f.MeasurementSeriesIterator(name))
|
||||
}
|
||||
return MergeSeriesIterators(a...)
|
||||
}
|
||||
|
||||
// Measurements returns a list of all measurements.
|
||||
func (i *Index) Measurements() (tsdb.Measurements, error) {
|
||||
var mms tsdb.Measurements
|
||||
itr := i.file.MeasurementIterator()
|
||||
itr := i.MeasurementIterator()
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
mms = append(mms, i.measurement(e.Name()))
|
||||
}
|
||||
return mms, nil
|
||||
}
|
||||
|
||||
// MeasurementIterator returns an iterator over all measurements in the index.
|
||||
func (i *Index) MeasurementIterator() MeasurementIterator {
|
||||
a := make([]MeasurementIterator, 0, i.FileN())
|
||||
for _, f := range i.logFiles {
|
||||
a = append(a, f.MeasurementIterator())
|
||||
}
|
||||
for _, f := range i.indexFiles {
|
||||
a = append(a, f.MeasurementIterator())
|
||||
}
|
||||
return MergeMeasurementIterators(a...)
|
||||
}
|
||||
|
||||
func (i *Index) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
|
||||
return i.measurementsByExpr(expr)
|
||||
}
|
||||
|
@ -159,7 +193,7 @@ func (i *Index) measurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool,
|
|||
// measurementsByNameFilter returns the sorted measurements matching a name.
|
||||
func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) tsdb.Measurements {
|
||||
var mms tsdb.Measurements
|
||||
itr := i.file.MeasurementIterator()
|
||||
itr := i.MeasurementIterator()
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
var matched bool
|
||||
switch op {
|
||||
|
@ -183,7 +217,7 @@ func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *r
|
|||
|
||||
func (i *Index) measurementsByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) tsdb.Measurements {
|
||||
var mms tsdb.Measurements
|
||||
itr := i.file.MeasurementIterator()
|
||||
itr := i.MeasurementIterator()
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
mm := i.measurement(e.Name())
|
||||
|
||||
|
@ -228,12 +262,12 @@ func (i *Index) measurementsByTagFilter(op influxql.Token, key, val string, rege
|
|||
return mms
|
||||
}
|
||||
|
||||
func (i *Index) MeasurementsByName(names []string) ([]*tsdb.Measurement, error) {
|
||||
itr := i.file.MeasurementIterator()
|
||||
func (i *Index) MeasurementsByName(names [][]byte) ([]*tsdb.Measurement, error) {
|
||||
itr := i.MeasurementIterator()
|
||||
mms := make([]*tsdb.Measurement, 0, len(names))
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
for _, name := range names {
|
||||
if string(e.Name()) == name {
|
||||
if bytes.Equal(e.Name(), name) {
|
||||
mms = append(mms, i.measurement(e.Name()))
|
||||
break
|
||||
}
|
||||
|
@ -243,7 +277,7 @@ func (i *Index) MeasurementsByName(names []string) ([]*tsdb.Measurement, error)
|
|||
}
|
||||
|
||||
func (i *Index) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) {
|
||||
itr := i.file.MeasurementIterator()
|
||||
itr := i.MeasurementIterator()
|
||||
var mms tsdb.Measurements
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
if re.Match(e.Name()) {
|
||||
|
@ -257,7 +291,7 @@ func (i *Index) DropMeasurement(name []byte) error {
|
|||
panic("TODO: Requires WAL")
|
||||
}
|
||||
|
||||
func (i *Index) CreateSeriesIndexIfNotExists(measurement string, series *tsdb.Series) (*tsdb.Series, error) {
|
||||
func (i *Index) CreateSeriesIndexIfNotExists(measurement []byte, series *tsdb.Series) (*tsdb.Series, error) {
|
||||
panic("TODO: Requires WAL")
|
||||
}
|
||||
|
||||
|
@ -265,7 +299,7 @@ func (i *Index) Series(key []byte) (*tsdb.Series, error) {
|
|||
panic("TODO")
|
||||
}
|
||||
|
||||
func (i *Index) DropSeries(keys []string) error {
|
||||
func (i *Index) DropSeries(keys [][]byte) error {
|
||||
panic("TODO: Requires WAL")
|
||||
}
|
||||
|
||||
|
@ -279,7 +313,7 @@ func (i *Index) SeriesN() (n uint64, err error) {
|
|||
// return n, nil
|
||||
}
|
||||
|
||||
func (i *Index) TagsForSeries(key string) (models.Tags, error) {
|
||||
func (i *Index) TagsForSeries(key []byte) (models.Tags, error) {
|
||||
ss, err := i.Series([]byte(key))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -297,3 +331,263 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
|
|||
|
||||
// Dereference is a nop.
|
||||
func (i *Index) Dereference([]byte) {}
|
||||
|
||||
// TagKeySeriesIterator returns a series iterator for all values across a single key.
|
||||
func (i *Index) TagKeySeriesIterator(name, key []byte) SeriesIterator {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
// TagValueSeriesIterator returns a series iterator for a single tag value.
|
||||
func (i *Index) TagValueSeriesIterator(name, key, value []byte) SeriesIterator {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
// MatchTagValueSeriesIterator returns a series iterator for tags which match value.
|
||||
// If matches is false, returns iterators which do not match value.
|
||||
func (i *Index) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Regexp, matches bool) SeriesIterator {
|
||||
panic("TODO")
|
||||
|
||||
/*
|
||||
// Check if we match the empty string to see if we should include series
|
||||
// that are missing the tag.
|
||||
empty := value.MatchString("")
|
||||
|
||||
// Gather the series that match the regex. If we should include the empty string,
|
||||
// start with the list of all series and reject series that don't match our condition.
|
||||
// If we should not include the empty string, include series that match our condition.
|
||||
if op == influxql.EQREGEX {
|
||||
|
||||
if empty {
|
||||
// See comments above for EQ with a StringLiteral.
|
||||
seriesIDs := newEvictSeriesIDs(m.seriesIDs)
|
||||
for k := range tagVals {
|
||||
if !re.Val.MatchString(k) {
|
||||
seriesIDs.mark(tagVals[k])
|
||||
}
|
||||
}
|
||||
return seriesIDs.evict(), nil, nil
|
||||
}
|
||||
ids = make(SeriesIDs, 0, len(m.seriesIDs))
|
||||
for k := range tagVals {
|
||||
if re.Val.MatchString(k) {
|
||||
ids = append(ids, tagVals[k]...)
|
||||
}
|
||||
}
|
||||
sort.Sort(ids)
|
||||
return ids, nil, nil
|
||||
|
||||
}
|
||||
|
||||
// Compare not-equal to empty string.
|
||||
if empty {
|
||||
ids = make(SeriesIDs, 0, len(m.seriesIDs))
|
||||
for k := range tagVals {
|
||||
if !re.Val.MatchString(k) {
|
||||
ids = append(ids, tagVals[k]...)
|
||||
}
|
||||
}
|
||||
sort.Sort(ids)
|
||||
return ids, nil, nil
|
||||
}
|
||||
|
||||
// Compare not-equal to empty string.
|
||||
seriesIDs := newEvictSeriesIDs(m.seriesIDs)
|
||||
for k := range tagVals {
|
||||
if re.Val.MatchString(k) {
|
||||
seriesIDs.mark(tagVals[k])
|
||||
}
|
||||
}
|
||||
|
||||
return seriesIDs.evict(), nil, nil
|
||||
*/
|
||||
}
|
||||
|
||||
// TagSets returns an ordered list of tag sets for a measurement by dimension
|
||||
// and filtered by an optional conditional expression.
|
||||
func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
|
||||
var tagSets []*influxql.TagSet
|
||||
// TODO(benbjohnson): Iterate over filtered series and build tag sets.
|
||||
return tagSets, nil
|
||||
}
|
||||
|
||||
// MeasurementSeriesByExprIterator returns a series iterator for a measurement
|
||||
// that is filtered by expr. If expr only contains time expressions then this
|
||||
// call is equivalent to MeasurementSeriesIterator().
|
||||
func (i *Index) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error) {
|
||||
// Return all series for the measurement if there are no tag expressions.
|
||||
if expr == nil || influxql.OnlyTimeExpr(expr) {
|
||||
return i.MeasurementSeriesIterator(name), nil
|
||||
}
|
||||
return i.seriesByExprIterator(name, expr)
|
||||
}
|
||||
|
||||
func (i *Index) seriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error) {
|
||||
switch expr := expr.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch expr.Op {
|
||||
case influxql.AND, influxql.OR:
|
||||
// Get the series IDs and filter expressions for the LHS.
|
||||
litr, err := i.seriesByExprIterator(name, expr.LHS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get the series IDs and filter expressions for the RHS.
|
||||
ritr, err := i.seriesByExprIterator(name, expr.RHS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Intersect iterators if expression is "AND".
|
||||
if expr.Op == influxql.AND {
|
||||
return IntersectSeriesIterators(litr, ritr), nil
|
||||
}
|
||||
|
||||
// Union iterators if expression is "OR".
|
||||
return UnionSeriesIterators(litr, ritr), nil
|
||||
|
||||
default:
|
||||
return i.seriesByBinaryExprIterator(name, expr)
|
||||
}
|
||||
|
||||
case *influxql.ParenExpr:
|
||||
return i.seriesByExprIterator(name, expr.Expr)
|
||||
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// seriesByBinaryExprIterator returns a series iterator and a filtering expression.
|
||||
func (i *Index) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (SeriesIterator, error) {
|
||||
// If this binary expression has another binary expression, then this
|
||||
// is some expression math and we should just pass it to the underlying query.
|
||||
if _, ok := n.LHS.(*influxql.BinaryExpr); ok {
|
||||
return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil
|
||||
} else if _, ok := n.RHS.(*influxql.BinaryExpr); ok {
|
||||
return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil
|
||||
}
|
||||
|
||||
// Retrieve the variable reference from the correct side of the expression.
|
||||
key, ok := n.LHS.(*influxql.VarRef)
|
||||
value := n.RHS
|
||||
if !ok {
|
||||
key, ok = n.RHS.(*influxql.VarRef)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid expression: %s", n.String())
|
||||
}
|
||||
value = n.LHS
|
||||
}
|
||||
|
||||
// For time literals, return all series and "true" as the filter.
|
||||
if _, ok := value.(*influxql.TimeLiteral); ok || key.Val == "time" {
|
||||
return newSeriesExprIterator(i.MeasurementSeriesIterator(name), &influxql.BooleanLiteral{Val: true}), nil
|
||||
}
|
||||
|
||||
/*
|
||||
// For fields, return all series from this measurement.
|
||||
if key.Val != "_name" && ((key.Type == influxql.Unknown && i.hasField(key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) {
|
||||
return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil
|
||||
} else if value, ok := value.(*influxql.VarRef); ok {
|
||||
// Check if the RHS is a variable and if it is a field.
|
||||
if value.Val != "_name" && ((value.Type == influxql.Unknown && i.hasField(value.Val)) || key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) {
|
||||
return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// Create iterator based on value type.
|
||||
switch value := value.(type) {
|
||||
case *influxql.StringLiteral:
|
||||
return i.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op)
|
||||
case *influxql.RegexLiteral:
|
||||
return i.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op)
|
||||
case *influxql.VarRef:
|
||||
return i.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op)
|
||||
default:
|
||||
if n.Op == influxql.NEQ || n.Op == influxql.NEQREGEX {
|
||||
return i.MeasurementSeriesIterator(name), nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (i *Index) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIterator, error) {
|
||||
// Special handling for "_name" to match measurement name.
|
||||
if bytes.Equal(key, []byte("_name")) {
|
||||
if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) {
|
||||
return i.MeasurementSeriesIterator(name), nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if op == influxql.EQ {
|
||||
// Match a specific value.
|
||||
if len(value) != 0 {
|
||||
return i.TagValueSeriesIterator(name, key, value), nil
|
||||
}
|
||||
|
||||
// Return all measurement series that have no values from this tag key.
|
||||
return DifferenceSeriesIterators(
|
||||
i.MeasurementSeriesIterator(name),
|
||||
i.TagKeySeriesIterator(name, key),
|
||||
), nil
|
||||
}
|
||||
|
||||
// Return all measurement series without this tag value.
|
||||
if len(value) != 0 {
|
||||
return DifferenceSeriesIterators(
|
||||
i.MeasurementSeriesIterator(name),
|
||||
i.TagValueSeriesIterator(name, key, value),
|
||||
), nil
|
||||
}
|
||||
|
||||
// Return all series across all values of this tag key.
|
||||
return i.TagKeySeriesIterator(name, key), nil
|
||||
}
|
||||
|
||||
func (i *Index) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIterator, error) {
|
||||
// Special handling for "_name" to match measurement name.
|
||||
if bytes.Equal(key, []byte("_name")) {
|
||||
match := value.Match(name)
|
||||
if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) {
|
||||
return newSeriesExprIterator(i.MeasurementSeriesIterator(name), &influxql.BooleanLiteral{Val: true}), nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
return i.MatchTagValueSeriesIterator(name, key, value, op == influxql.EQREGEX), nil
|
||||
}
|
||||
|
||||
func (i *Index) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIterator, error) {
|
||||
if op == influxql.EQ {
|
||||
return IntersectSeriesIterators(
|
||||
i.TagKeySeriesIterator(name, key),
|
||||
i.TagKeySeriesIterator(name, []byte(value.Val)),
|
||||
), nil
|
||||
}
|
||||
|
||||
return DifferenceSeriesIterators(
|
||||
i.TagKeySeriesIterator(name, key),
|
||||
i.TagKeySeriesIterator(name, []byte(value.Val)),
|
||||
), nil
|
||||
}
|
||||
|
||||
// FilterExprs represents a map of series IDs to filter expressions.
|
||||
type FilterExprs map[uint64]influxql.Expr
|
||||
|
||||
// DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true.
|
||||
func (fe FilterExprs) DeleteBoolLiteralTrues() {
|
||||
for id, expr := range fe {
|
||||
if e, ok := expr.(*influxql.BooleanLiteral); ok && e.Val == true {
|
||||
delete(fe, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Len returns the number of elements.
|
||||
func (fe FilterExprs) Len() int {
|
||||
if fe == nil {
|
||||
return 0
|
||||
}
|
||||
return len(fe)
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ func TestIndex_Measurement(t *testing.T) {
|
|||
|
||||
// Create an index from the single file.
|
||||
var idx tsi1.Index
|
||||
idx.SetFile(f)
|
||||
idx.SetIndexFiles(f)
|
||||
|
||||
// Verify measurement is correct.
|
||||
if mm, err := idx.Measurement([]byte("cpu")); err != nil {
|
||||
|
@ -52,7 +52,7 @@ func TestIndex_Measurements(t *testing.T) {
|
|||
|
||||
// Create an index from the single file.
|
||||
var idx tsi1.Index
|
||||
idx.SetFile(f)
|
||||
idx.SetIndexFiles(f)
|
||||
|
||||
// Retrieve measurements and verify.
|
||||
if mms, err := idx.Measurements(); err != nil {
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/mmap"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// Log entry flag constants.
|
||||
|
@ -96,6 +97,10 @@ func (f *LogFile) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (f *LogFile) CreateMeasurementIndexIfNotExists(name []byte) (*tsdb.Measurement, error) {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
// DeleteMeasurement adds a tombstone for a measurement to the log file.
|
||||
func (f *LogFile) DeleteMeasurement(name []byte) error {
|
||||
// Append log entry.
|
||||
|
@ -223,6 +228,11 @@ func (f *LogFile) MeasurementIterator() MeasurementIterator {
|
|||
return &itr
|
||||
}
|
||||
|
||||
// MeasurementSeriesIterator returns an iterator over all series in the log file.
|
||||
func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
// CompactTo compacts the log file and writes it to w.
|
||||
func (f *LogFile) CompactTo(w io.Writer) (n int64, err error) {
|
||||
var t IndexFileTrailer
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
|
@ -303,6 +304,10 @@ func (e *seriesBlockElem) Name() []byte { return e.name }
|
|||
// Tags returns the tag set.
|
||||
func (e *seriesBlockElem) Tags() models.Tags { return e.tags }
|
||||
|
||||
// Expr always returns a nil expression.
|
||||
// This is only used by higher level query planning.
|
||||
func (e *seriesBlockElem) Expr() influxql.Expr { return nil }
|
||||
|
||||
// SeriesBlockWriter writes a SeriesBlock.
|
||||
type SeriesBlockWriter struct {
|
||||
terms map[string]int // term frequency
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"os"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
|
@ -348,6 +349,17 @@ type SeriesElem interface {
|
|||
Name() []byte
|
||||
Tags() models.Tags
|
||||
Deleted() bool
|
||||
|
||||
// InfluxQL expression associated with series during filtering.
|
||||
Expr() influxql.Expr
|
||||
}
|
||||
|
||||
// CompareSeriesElem returns -1 if a < b, 1 if a > b, and 0 if equal.
|
||||
func CompareSeriesElem(a, b SeriesElem) int {
|
||||
if cmp := bytes.Compare(a.Name(), b.Name()); cmp != 0 {
|
||||
return cmp
|
||||
}
|
||||
return models.CompareTags(a.Tags(), b.Tags())
|
||||
}
|
||||
|
||||
// SeriesIterator represents a iterator over a list of series.
|
||||
|
@ -363,19 +375,13 @@ func MergeSeriesIterators(itrs ...SeriesIterator) SeriesIterator {
|
|||
return nil
|
||||
}
|
||||
|
||||
itr := &seriesMergeIterator{
|
||||
return &seriesMergeIterator{
|
||||
buf: make([]SeriesElem, len(itrs)),
|
||||
itrs: itrs,
|
||||
}
|
||||
|
||||
// Initialize buffers.
|
||||
for i := range itr.itrs {
|
||||
itr.buf[i] = itr.itrs[i].Next()
|
||||
}
|
||||
|
||||
return itr
|
||||
}
|
||||
|
||||
// seriesMergeIterator is an iterator that merges multiple iterators together.
|
||||
type seriesMergeIterator struct {
|
||||
buf []SeriesElem
|
||||
itrs []SeriesIterator
|
||||
|
@ -434,63 +440,230 @@ func (itr *seriesMergeIterator) Next() SeriesElem {
|
|||
return e
|
||||
}
|
||||
|
||||
// seriesIDIterator represents a iterator over a list of series ids.
|
||||
type seriesIDIterator interface {
|
||||
next() uint32
|
||||
}
|
||||
|
||||
// unionSeriesIDIterators returns an iterator returns a union of iterators.
|
||||
func unionSeriesIDIterators(itrs ...seriesIDIterator) seriesIDIterator {
|
||||
if len(itrs) == 0 {
|
||||
// IntersectSeriesIterators returns an iterator that only returns series which
|
||||
// occur in both iterators. If both series have associated expressions then
|
||||
// they are combined together.
|
||||
func IntersectSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator {
|
||||
if itr0 == nil || itr1 == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
itr := &unionIterator{
|
||||
buf: make([]uint32, len(itrs)),
|
||||
itrs: itrs,
|
||||
}
|
||||
|
||||
// Initialize buffers.
|
||||
for i := range itr.itrs {
|
||||
itr.buf[i] = itr.itrs[i].next()
|
||||
}
|
||||
|
||||
return itr
|
||||
return &seriesIntersectIterator{itrs: [2]SeriesIterator{itr0, itr1}}
|
||||
}
|
||||
|
||||
type unionIterator struct {
|
||||
buf []uint32
|
||||
itrs []seriesIDIterator
|
||||
// seriesIntersectIterator is an iterator that merges two iterators together.
|
||||
type seriesIntersectIterator struct {
|
||||
e seriesExprElem
|
||||
buf [2]SeriesElem
|
||||
itrs [2]SeriesIterator
|
||||
}
|
||||
|
||||
// next returns the next series id. Duplicates are combined.
|
||||
func (itr *unionIterator) next() uint32 {
|
||||
// Find next series id in the buffers.
|
||||
var id uint32
|
||||
for i := range itr.buf {
|
||||
// Skip empty buffers.
|
||||
if itr.buf[i] == 0 {
|
||||
// Next returns the next element which occurs in both iterators.
|
||||
func (itr *seriesIntersectIterator) Next() (e SeriesElem) {
|
||||
// Fill buffers.
|
||||
if itr.buf[0] == nil {
|
||||
itr.buf[0] = itr.itrs[0].Next()
|
||||
}
|
||||
if itr.buf[1] == nil {
|
||||
itr.buf[1] = itr.itrs[1].Next()
|
||||
}
|
||||
|
||||
// Exit if either buffer is still empty.
|
||||
if itr.buf[0] == nil || itr.buf[1] == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Return lesser series.
|
||||
if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 {
|
||||
e, itr.buf[0] = itr.buf[0], nil
|
||||
return e
|
||||
} else if cmp == 1 {
|
||||
e, itr.buf[1] = itr.buf[1], nil
|
||||
return e
|
||||
}
|
||||
|
||||
// Merge series together if equal.
|
||||
itr.e.SeriesElem = itr.buf[0]
|
||||
|
||||
// Attach expression.
|
||||
expr0 := itr.buf[0].Expr()
|
||||
expr1 := itr.buf[0].Expr()
|
||||
if expr0 == nil {
|
||||
itr.e.expr = expr1
|
||||
} else if expr1 == nil {
|
||||
itr.e.expr = expr0
|
||||
} else {
|
||||
itr.e.expr = &influxql.BinaryExpr{
|
||||
Op: influxql.AND,
|
||||
LHS: expr0,
|
||||
RHS: expr1,
|
||||
}
|
||||
}
|
||||
|
||||
itr.buf[0], itr.buf[1] = nil, nil
|
||||
return &itr.e
|
||||
}
|
||||
|
||||
// UnionSeriesIterators returns an iterator that returns series from both
|
||||
// both iterators. If both series have associated expressions then they are
|
||||
// combined together.
|
||||
func UnionSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator {
|
||||
// Return other iterator if either one is nil.
|
||||
if itr0 == nil {
|
||||
return itr1
|
||||
} else if itr1 == nil {
|
||||
return itr0
|
||||
}
|
||||
|
||||
return &seriesUnionIterator{itrs: [2]SeriesIterator{itr0, itr1}}
|
||||
}
|
||||
|
||||
// seriesUnionIterator is an iterator that unions two iterators together.
|
||||
type seriesUnionIterator struct {
|
||||
e seriesExprElem
|
||||
buf [2]SeriesElem
|
||||
itrs [2]SeriesIterator
|
||||
}
|
||||
|
||||
// Next returns the next element which occurs in both iterators.
|
||||
func (itr *seriesUnionIterator) Next() (e SeriesElem) {
|
||||
// Fill buffers.
|
||||
if itr.buf[0] == nil {
|
||||
itr.buf[0] = itr.itrs[0].Next()
|
||||
}
|
||||
if itr.buf[1] == nil {
|
||||
itr.buf[1] = itr.itrs[1].Next()
|
||||
}
|
||||
|
||||
// Return the other iterator if either one is empty.
|
||||
if itr.buf[0] == nil {
|
||||
e, itr.buf[1] = itr.buf[1], nil
|
||||
return e
|
||||
} else if itr.buf[1] == nil {
|
||||
e, itr.buf[0] = itr.buf[0], nil
|
||||
return e
|
||||
}
|
||||
|
||||
// Return lesser series.
|
||||
if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 {
|
||||
e, itr.buf[0] = itr.buf[0], nil
|
||||
return e
|
||||
} else if cmp == 1 {
|
||||
e, itr.buf[1] = itr.buf[1], nil
|
||||
return e
|
||||
}
|
||||
|
||||
// Attach element.
|
||||
itr.e.SeriesElem = itr.buf[0]
|
||||
|
||||
// Attach expression.
|
||||
expr0 := itr.buf[0].Expr()
|
||||
expr1 := itr.buf[0].Expr()
|
||||
if expr0 == nil {
|
||||
itr.e.expr = expr1
|
||||
} else if expr1 == nil {
|
||||
itr.e.expr = expr0
|
||||
} else {
|
||||
itr.e.expr = &influxql.BinaryExpr{
|
||||
Op: influxql.OR,
|
||||
LHS: expr0,
|
||||
RHS: expr1,
|
||||
}
|
||||
}
|
||||
|
||||
itr.buf[0], itr.buf[1] = nil, nil
|
||||
return &itr.e
|
||||
}
|
||||
|
||||
// DifferenceSeriesIterators returns an iterator that only returns series which
|
||||
// occur the first iterator but not the second iterator.
|
||||
func DifferenceSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator {
|
||||
if itr0 != nil && itr1 == nil {
|
||||
return itr0
|
||||
} else if itr0 == nil {
|
||||
return nil
|
||||
}
|
||||
return &seriesDifferenceIterator{itrs: [2]SeriesIterator{itr0, itr1}}
|
||||
}
|
||||
|
||||
// seriesDifferenceIterator is an iterator that merges two iterators together.
|
||||
type seriesDifferenceIterator struct {
|
||||
buf [2]SeriesElem
|
||||
itrs [2]SeriesIterator
|
||||
}
|
||||
|
||||
// Next returns the next element which occurs only in the first iterator.
|
||||
func (itr *seriesDifferenceIterator) Next() (e SeriesElem) {
|
||||
for {
|
||||
// Fill buffers.
|
||||
if itr.buf[0] == nil {
|
||||
itr.buf[0] = itr.itrs[0].Next()
|
||||
}
|
||||
if itr.buf[1] == nil {
|
||||
itr.buf[1] = itr.itrs[1].Next()
|
||||
}
|
||||
|
||||
// Exit if first buffer is still empty.
|
||||
if itr.buf[0] == nil {
|
||||
return nil
|
||||
} else if itr.buf[1] == nil {
|
||||
e, itr.buf[0] = itr.buf[0], nil
|
||||
return e
|
||||
}
|
||||
|
||||
// Return first series if it's less.
|
||||
// If second series is less then skip it.
|
||||
// If both series are equal then skip both.
|
||||
if cmp := CompareSeriesElem(itr.buf[0], itr.buf[1]); cmp == -1 {
|
||||
e, itr.buf[0] = itr.buf[0], nil
|
||||
return e
|
||||
} else if cmp == 1 {
|
||||
itr.buf[1] = nil
|
||||
continue
|
||||
} else {
|
||||
itr.buf[0], itr.buf[1] = nil, nil
|
||||
continue
|
||||
}
|
||||
|
||||
// If the name is not set the pick the first non-empty name.
|
||||
if id == 0 || itr.buf[i] < id {
|
||||
id = itr.buf[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return zero if no elements remaining.
|
||||
if id == 0 {
|
||||
return 0
|
||||
}
|
||||
// seriesExprElem holds a series and its associated filter expression.
|
||||
type seriesExprElem struct {
|
||||
SeriesElem
|
||||
expr influxql.Expr
|
||||
}
|
||||
|
||||
// Refill buffer.
|
||||
for i := range itr.buf {
|
||||
if itr.buf[i] == id {
|
||||
itr.buf[i] = itr.itrs[i].next()
|
||||
}
|
||||
// Expr returns the associated expression.
|
||||
func (e *seriesExprElem) Expr() influxql.Expr { return e.expr }
|
||||
|
||||
// seriesExprIterator is an iterator that attaches an associated expression.
|
||||
type seriesExprIterator struct {
|
||||
itr SeriesIterator
|
||||
e seriesExprElem
|
||||
}
|
||||
|
||||
// newSeriesExprIterator returns a new instance of seriesExprIterator.
|
||||
func newSeriesExprIterator(itr SeriesIterator, expr influxql.Expr) *seriesExprIterator {
|
||||
return &seriesExprIterator{
|
||||
itr: itr,
|
||||
e: seriesExprElem{
|
||||
expr: expr,
|
||||
},
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
// Next returns the next element in the iterator.
|
||||
func (itr *seriesExprIterator) Next() SeriesElem {
|
||||
itr.e.SeriesElem = itr.Next()
|
||||
if itr.e.SeriesElem == nil {
|
||||
return nil
|
||||
}
|
||||
return &itr.e
|
||||
}
|
||||
|
||||
// seriesIDIterator represents a iterator over a list of series ids.
|
||||
type seriesIDIterator interface {
|
||||
next() uint32
|
||||
}
|
||||
|
||||
// writeTo writes write v into w. Updates n.
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb/engine/tsi1"
|
||||
)
|
||||
|
@ -274,11 +275,13 @@ type SeriesElem struct {
|
|||
name []byte
|
||||
tags models.Tags
|
||||
deleted bool
|
||||
expr influxql.Expr
|
||||
}
|
||||
|
||||
func (e *SeriesElem) Name() []byte { return e.name }
|
||||
func (e *SeriesElem) Tags() models.Tags { return e.tags }
|
||||
func (e *SeriesElem) Deleted() bool { return e.deleted }
|
||||
func (e *SeriesElem) Name() []byte { return e.name }
|
||||
func (e *SeriesElem) Tags() models.Tags { return e.tags }
|
||||
func (e *SeriesElem) Deleted() bool { return e.deleted }
|
||||
func (e *SeriesElem) Expr() influxql.Expr { return e.expr }
|
||||
|
||||
// SeriesIterator represents an iterator over a slice of tag values.
|
||||
type SeriesIterator struct {
|
||||
|
|
|
@ -1351,24 +1351,13 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo
|
|||
|
||||
var itrs []influxql.Iterator
|
||||
if err := func() error {
|
||||
mByName, err := e.index.MeasurementsByName(influxql.Sources(opt.Sources).Names())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mms := tsdb.Measurements(mByName)
|
||||
|
||||
for _, mm := range mms {
|
||||
// Determine tagsets for this measurement based on dimensions and filters.
|
||||
tagSets, err := mm.TagSets(opt.Dimensions, opt.Condition)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Calculate tag sets and apply SLIMIT/SOFFSET.
|
||||
tagSets = influxql.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)
|
||||
for _, name := range influxql.Sources(opt.Sources).Names() {
|
||||
// Generate tag sets from index.
|
||||
tagSets := e.index.TagSets(name, opt.Dimensions, opt.Condition, opt.SLimit, opt.SOffset)
|
||||
|
||||
// Create iterators for each tagset.
|
||||
for _, t := range tagSets {
|
||||
inputs, err := e.createTagSetIterators(ref, mm, t, opt)
|
||||
inputs, err := e.createTagSetIterators(ref, name, t, opt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1390,7 +1379,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo
|
|||
}
|
||||
|
||||
// createTagSetIterators creates a set of iterators for a tagset.
|
||||
func (e *Engine) createTagSetIterators(ref *influxql.VarRef, mm *tsdb.Measurement, t *influxql.TagSet, opt influxql.IteratorOptions) ([]influxql.Iterator, error) {
|
||||
func (e *Engine) createTagSetIterators(ref *influxql.VarRef, name string, t *influxql.TagSet, opt influxql.IteratorOptions) ([]influxql.Iterator, error) {
|
||||
// Set parallelism by number of logical cpus.
|
||||
parallelism := runtime.GOMAXPROCS(0)
|
||||
if parallelism > len(t.SeriesKeys) {
|
||||
|
@ -1427,7 +1416,7 @@ func (e *Engine) createTagSetIterators(ref *influxql.VarRef, mm *tsdb.Measuremen
|
|||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
groups[i].itrs, groups[i].err = e.createTagSetGroupIterators(ref, mm, groups[i].keys, t, groups[i].filters, opt)
|
||||
groups[i].itrs, groups[i].err = e.createTagSetGroupIterators(ref, name, groups[i].keys, t, groups[i].filters, opt)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
@ -1458,7 +1447,7 @@ func (e *Engine) createTagSetIterators(ref *influxql.VarRef, mm *tsdb.Measuremen
|
|||
}
|
||||
|
||||
// createTagSetGroupIterators creates a set of iterators for a subset of a tagset's series.
|
||||
func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKeys []string, t *influxql.TagSet, filters []influxql.Expr, opt influxql.IteratorOptions) ([]influxql.Iterator, error) {
|
||||
func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, name string, seriesKeys []string, t *influxql.TagSet, filters []influxql.Expr, opt influxql.IteratorOptions) ([]influxql.Iterator, error) {
|
||||
conditionFields := make([]influxql.VarRef, len(influxql.ExprNames(opt.Condition)))
|
||||
|
||||
itrs := make([]influxql.Iterator, 0, len(seriesKeys))
|
||||
|
@ -1472,7 +1461,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
}
|
||||
}
|
||||
|
||||
itr, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, filters[i], conditionFields[:fields], opt)
|
||||
itr, err := e.createVarRefSeriesIterator(ref, name, seriesKey, t, filters[i], conditionFields[:fields], opt)
|
||||
if err != nil {
|
||||
return itrs, err
|
||||
} else if itr == nil {
|
||||
|
@ -1484,7 +1473,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
}
|
||||
|
||||
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
|
||||
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
tfs, err := e.index.TagsForSeries(seriesKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1504,7 +1493,7 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
for i, ref := range opt.Aux {
|
||||
// Create cursor from field if a tag wasn't requested.
|
||||
if ref.Type != influxql.Tag {
|
||||
cur := e.buildCursor(mm.Name, seriesKey, &ref, opt)
|
||||
cur := e.buildCursor(name, seriesKey, &ref, opt)
|
||||
if cur != nil {
|
||||
aux[i] = newBufCursor(cur, opt.Ascending)
|
||||
continue
|
||||
|
@ -1545,7 +1534,7 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
for i, ref := range conditionFields {
|
||||
// Create cursor from field if a tag wasn't requested.
|
||||
if ref.Type != influxql.Tag {
|
||||
cur := e.buildCursor(mm.Name, seriesKey, &ref, opt)
|
||||
cur := e.buildCursor(name, seriesKey, &ref, opt)
|
||||
if cur != nil {
|
||||
conds[i] = newBufCursor(cur, opt.Ascending)
|
||||
continue
|
||||
|
@ -1584,11 +1573,11 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
|
||||
// If it's only auxiliary fields then it doesn't matter what type of iterator we use.
|
||||
if ref == nil {
|
||||
return newFloatIterator(mm.Name, tags, itrOpt, nil, aux, conds, condNames), nil
|
||||
return newFloatIterator(name, tags, itrOpt, nil, aux, conds, condNames), nil
|
||||
}
|
||||
|
||||
// Build main cursor.
|
||||
cur := e.buildCursor(mm.Name, seriesKey, ref, opt)
|
||||
cur := e.buildCursor(name, seriesKey, ref, opt)
|
||||
|
||||
// If the field doesn't exist then don't build an iterator.
|
||||
if cur == nil {
|
||||
|
@ -1597,13 +1586,13 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
|
||||
switch cur := cur.(type) {
|
||||
case floatCursor:
|
||||
return newFloatIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
return newFloatIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
case integerCursor:
|
||||
return newIntegerIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
return newIntegerIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
case stringCursor:
|
||||
return newStringIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
return newStringIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
case booleanCursor:
|
||||
return newBooleanIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
return newBooleanIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
|
|
|
@ -12,22 +12,22 @@ type Index interface {
|
|||
Open() error
|
||||
Close() error
|
||||
|
||||
CreateMeasurementIndexIfNotExists(name string) (*Measurement, error)
|
||||
CreateMeasurementIndexIfNotExists(name []byte) (*Measurement, error)
|
||||
Measurement(name []byte) (*Measurement, error)
|
||||
Measurements() (Measurements, error)
|
||||
MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
|
||||
MeasurementsByName(names []string) ([]*Measurement, error)
|
||||
MeasurementsByName(names [][]byte) ([]*Measurement, error)
|
||||
MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
|
||||
DropMeasurement(name []byte) error
|
||||
|
||||
CreateSeriesIndexIfNotExists(measurement string, series *Series) (*Series, error)
|
||||
CreateSeriesIndexIfNotExists(measurement []byte, series *Series) (*Series, error)
|
||||
Series(key []byte) (*Series, error)
|
||||
DropSeries(keys []string) error
|
||||
DropSeries(keys [][]byte) error
|
||||
|
||||
SeriesN() (uint64, error)
|
||||
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
|
||||
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
|
||||
|
||||
TagsForSeries(key string) (models.Tags, error)
|
||||
TagsForSeries(key []byte) (models.Tags, error)
|
||||
Dereference(b []byte)
|
||||
}
|
||||
|
|
267
tsdb/meta.go
267
tsdb/meta.go
|
@ -692,15 +692,6 @@ func (m *Measurement) DropSeries(series *Series) {
|
|||
return
|
||||
}
|
||||
|
||||
// filters walks the where clause of a select statement and returns a map with all series ids
|
||||
// matching the where clause and any filter expression that should be applied to each.
|
||||
func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) {
|
||||
if condition == nil || influxql.OnlyTimeExpr(condition) {
|
||||
return m.seriesIDs, nil, nil
|
||||
}
|
||||
return m.walkWhereForSeriesIds(condition)
|
||||
}
|
||||
|
||||
// TagSets returns the unique tag sets that exist for the given tag keys. This
|
||||
// is used to determine what composite series will be created by a group by.
|
||||
//
|
||||
|
@ -713,194 +704,7 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf
|
|||
// this shouldn't be exported. However, until tx.go and the engine get
|
||||
// refactored into tsdb, we need it.
|
||||
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
|
||||
m.mu.RLock()
|
||||
|
||||
// get the unique set of series ids and the filters that should be applied to each
|
||||
ids, filters, err := m.filters(condition)
|
||||
if err != nil {
|
||||
m.mu.RUnlock()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// For every series, get the tag values for the requested tag keys i.e. dimensions. This is the
|
||||
// TagSet for that series. Series with the same TagSet are then grouped together, because for the
|
||||
// purpose of GROUP BY they are part of the same composite series.
|
||||
tagSets := make(map[string]*influxql.TagSet, 64)
|
||||
for _, id := range ids {
|
||||
s := m.seriesByID[id]
|
||||
tags := make(map[string]string, len(dimensions))
|
||||
|
||||
// Build the TagSet for this series.
|
||||
for _, dim := range dimensions {
|
||||
tags[dim] = s.Tags.GetString(dim)
|
||||
}
|
||||
// Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled
|
||||
// as a set.
|
||||
tagsAsKey := MarshalTags(tags)
|
||||
tagSet, ok := tagSets[string(tagsAsKey)]
|
||||
if !ok {
|
||||
// This TagSet is new, create a new entry for it.
|
||||
tagSet = &influxql.TagSet{
|
||||
Tags: tags,
|
||||
Key: tagsAsKey,
|
||||
}
|
||||
}
|
||||
// Associate the series and filter with the Tagset.
|
||||
tagSet.AddFilter(m.seriesByID[id].Key, filters[id])
|
||||
|
||||
// Ensure it's back in the map.
|
||||
tagSets[string(tagsAsKey)] = tagSet
|
||||
}
|
||||
// Release the lock while we sort all the tags
|
||||
m.mu.RUnlock()
|
||||
|
||||
// Sort the series in each tag set.
|
||||
for _, t := range tagSets {
|
||||
sort.Sort(t)
|
||||
}
|
||||
|
||||
// The TagSets have been created, as a map of TagSets. Just send
|
||||
// the values back as a slice, sorting for consistency.
|
||||
sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets))
|
||||
for _, v := range tagSets {
|
||||
sortedTagsSets = append(sortedTagsSets, v)
|
||||
}
|
||||
sort.Sort(byTagKey(sortedTagsSets))
|
||||
|
||||
return sortedTagsSets, nil
|
||||
}
|
||||
|
||||
// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions.
|
||||
func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) {
|
||||
// We only want to allocate a slice and map of the smaller size.
|
||||
var ids []uint64
|
||||
if len(lids) > len(rids) {
|
||||
ids = make([]uint64, 0, len(rids))
|
||||
} else {
|
||||
ids = make([]uint64, 0, len(lids))
|
||||
}
|
||||
|
||||
var filters FilterExprs
|
||||
if len(lfilters) > len(rfilters) {
|
||||
filters = make(FilterExprs, len(rfilters))
|
||||
} else {
|
||||
filters = make(FilterExprs, len(lfilters))
|
||||
}
|
||||
|
||||
// They're in sorted order so advance the counter as needed.
|
||||
// This is, don't run comparisons against lower values that we've already passed.
|
||||
for len(lids) > 0 && len(rids) > 0 {
|
||||
lid, rid := lids[0], rids[0]
|
||||
if lid == rid {
|
||||
ids = append(ids, lid)
|
||||
|
||||
var expr influxql.Expr
|
||||
lfilter := lfilters[lid]
|
||||
rfilter := rfilters[rid]
|
||||
|
||||
if lfilter != nil && rfilter != nil {
|
||||
be := &influxql.BinaryExpr{
|
||||
Op: influxql.AND,
|
||||
LHS: lfilter,
|
||||
RHS: rfilter,
|
||||
}
|
||||
expr = influxql.Reduce(be, nil)
|
||||
} else if lfilter != nil {
|
||||
expr = lfilter
|
||||
} else if rfilter != nil {
|
||||
expr = rfilter
|
||||
}
|
||||
|
||||
if expr != nil {
|
||||
filters[lid] = expr
|
||||
}
|
||||
lids, rids = lids[1:], rids[1:]
|
||||
} else if lid < rid {
|
||||
lids = lids[1:]
|
||||
} else {
|
||||
rids = rids[1:]
|
||||
}
|
||||
}
|
||||
return ids, filters
|
||||
}
|
||||
|
||||
// unionSeriesFilters performs a union for two sets of ids and filter expressions.
|
||||
func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) {
|
||||
ids := make([]uint64, 0, len(lids)+len(rids))
|
||||
|
||||
// Setup the filters with the smallest size since we will discard filters
|
||||
// that do not have a match on the other side.
|
||||
var filters FilterExprs
|
||||
if len(lfilters) < len(rfilters) {
|
||||
filters = make(FilterExprs, len(lfilters))
|
||||
} else {
|
||||
filters = make(FilterExprs, len(rfilters))
|
||||
}
|
||||
|
||||
for len(lids) > 0 && len(rids) > 0 {
|
||||
lid, rid := lids[0], rids[0]
|
||||
if lid == rid {
|
||||
ids = append(ids, lid)
|
||||
|
||||
// If one side does not have a filter, then the series has been
|
||||
// included on one side of the OR with no condition. Eliminate the
|
||||
// filter in this case.
|
||||
var expr influxql.Expr
|
||||
lfilter := lfilters[lid]
|
||||
rfilter := rfilters[rid]
|
||||
if lfilter != nil && rfilter != nil {
|
||||
be := &influxql.BinaryExpr{
|
||||
Op: influxql.OR,
|
||||
LHS: lfilter,
|
||||
RHS: rfilter,
|
||||
}
|
||||
expr = influxql.Reduce(be, nil)
|
||||
}
|
||||
|
||||
if expr != nil {
|
||||
filters[lid] = expr
|
||||
}
|
||||
lids, rids = lids[1:], rids[1:]
|
||||
} else if lid < rid {
|
||||
ids = append(ids, lid)
|
||||
|
||||
filter := lfilters[lid]
|
||||
if filter != nil {
|
||||
filters[lid] = filter
|
||||
}
|
||||
lids = lids[1:]
|
||||
} else {
|
||||
ids = append(ids, rid)
|
||||
|
||||
filter := rfilters[rid]
|
||||
if filter != nil {
|
||||
filters[rid] = filter
|
||||
}
|
||||
rids = rids[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// Now append the remainder.
|
||||
if len(lids) > 0 {
|
||||
for i := 0; i < len(lids); i++ {
|
||||
ids = append(ids, lids[i])
|
||||
|
||||
filter := lfilters[lids[i]]
|
||||
if filter != nil {
|
||||
filters[lids[i]] = filter
|
||||
}
|
||||
}
|
||||
} else if len(rids) > 0 {
|
||||
for i := 0; i < len(rids); i++ {
|
||||
ids = append(ids, rids[i])
|
||||
|
||||
filter := rfilters[rids[i]]
|
||||
if filter != nil {
|
||||
filters[rids[i]] = filter
|
||||
}
|
||||
}
|
||||
}
|
||||
return ids, filters
|
||||
panic("MOVED")
|
||||
}
|
||||
|
||||
// IDsForExpr returns the series IDs that are candidates to match the given expression.
|
||||
|
@ -1096,72 +900,6 @@ func (fe FilterExprs) Len() int {
|
|||
return len(fe)
|
||||
}
|
||||
|
||||
// walkWhereForSeriesIds recursively walks the WHERE clause and returns an
|
||||
// ordered set of series IDs and a map from those series IDs to filter
|
||||
// expressions that should be used to limit points returned in the final query
|
||||
// result.
|
||||
func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) {
|
||||
switch n := expr.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch n.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
// Get the series IDs and filter expression for the tag or field comparison.
|
||||
ids, expr, err := m.idsForExpr(n)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if len(ids) == 0 {
|
||||
return ids, nil, nil
|
||||
}
|
||||
|
||||
// If the expression is a boolean literal that is true, ignore it.
|
||||
if b, ok := expr.(*influxql.BooleanLiteral); ok && b.Val {
|
||||
expr = nil
|
||||
}
|
||||
|
||||
var filters FilterExprs
|
||||
if expr != nil {
|
||||
filters = make(FilterExprs, len(ids))
|
||||
for _, id := range ids {
|
||||
filters[id] = expr
|
||||
}
|
||||
}
|
||||
|
||||
return ids, filters, nil
|
||||
case influxql.AND, influxql.OR:
|
||||
// Get the series IDs and filter expressions for the LHS.
|
||||
lids, lfilters, err := m.walkWhereForSeriesIds(n.LHS)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Get the series IDs and filter expressions for the RHS.
|
||||
rids, rfilters, err := m.walkWhereForSeriesIds(n.RHS)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Combine the series IDs from the LHS and RHS.
|
||||
if n.Op == influxql.AND {
|
||||
ids, filters := intersectSeriesFilters(lids, rids, lfilters, rfilters)
|
||||
return ids, filters, nil
|
||||
} else {
|
||||
ids, filters := unionSeriesFilters(lids, rids, lfilters, rfilters)
|
||||
return ids, filters, nil
|
||||
}
|
||||
}
|
||||
|
||||
ids, _, err := m.idsForExpr(n)
|
||||
return ids, nil, err
|
||||
case *influxql.ParenExpr:
|
||||
// walk down the tree
|
||||
return m.walkWhereForSeriesIds(n.Expr)
|
||||
default:
|
||||
return nil, nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// expandExpr returns a list of expressions expanded by all possible tag
|
||||
// combinations.
|
||||
func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr {
|
||||
|
@ -1220,6 +958,7 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr,
|
|||
return exprs
|
||||
}
|
||||
|
||||
<<<<<<< HEAD
|
||||
// SeriesIDsAllOrByExpr walks an expressions for matching series IDs
|
||||
// or, if no expression is given, returns all series IDs for the measurement.
|
||||
func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
|
||||
|
@ -1246,6 +985,8 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error
|
|||
return ids, nil
|
||||
}
|
||||
|
||||
=======
|
||||
>>>>>>> df7cec1... intermediate
|
||||
// tagKeysByExpr extracts the tag keys wanted by the expression.
|
||||
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) {
|
||||
switch e := expr.(type) {
|
||||
|
|
177
tsdb/shard.go
177
tsdb/shard.go
|
@ -10,7 +10,6 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -1227,29 +1226,33 @@ func (itr *seriesIterator) Next() (*influxql.FloatPoint, error) {
|
|||
|
||||
// nextKeys reads all keys for the next measurement.
|
||||
func (itr *seriesIterator) nextKeys() error {
|
||||
for {
|
||||
// Ensure previous keys are cleared out.
|
||||
itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0]
|
||||
panic("MOVE TO TSI")
|
||||
|
||||
/*
|
||||
for {
|
||||
// Ensure previous keys are cleared out.
|
||||
itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0]
|
||||
|
||||
// Read next measurement.
|
||||
if len(itr.mms) == 0 {
|
||||
return nil
|
||||
}
|
||||
mm := itr.mms[0]
|
||||
itr.mms = itr.mms[1:]
|
||||
|
||||
// Read all series keys.
|
||||
ids, err := mm.seriesIDsAllOrByExpr(itr.opt.Condition)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(ids) == 0 {
|
||||
continue
|
||||
}
|
||||
itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids)
|
||||
sort.Strings(itr.keys.buf)
|
||||
|
||||
// Read next measurement.
|
||||
if len(itr.mms) == 0 {
|
||||
return nil
|
||||
}
|
||||
mm := itr.mms[0]
|
||||
itr.mms = itr.mms[1:]
|
||||
|
||||
// Read all series keys.
|
||||
ids, err := mm.seriesIDsAllOrByExpr(itr.opt.Condition)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(ids) == 0 {
|
||||
continue
|
||||
}
|
||||
itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids)
|
||||
sort.Strings(itr.keys.buf)
|
||||
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
// NewTagKeysIterator returns a new instance of TagKeysIterator.
|
||||
|
@ -1273,82 +1276,86 @@ type tagValuesIterator struct {
|
|||
|
||||
// NewTagValuesIterator returns a new instance of TagValuesIterator.
|
||||
func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
if opt.Condition == nil {
|
||||
return nil, errors.New("a condition is required")
|
||||
}
|
||||
panic("MOVE")
|
||||
|
||||
measurementExpr := influxql.CloneExpr(opt.Condition)
|
||||
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || tag.Val != "_name" {
|
||||
return nil
|
||||
/*
|
||||
if opt.Condition == nil {
|
||||
return nil, errors.New("a condition is required")
|
||||
}
|
||||
|
||||
measurementExpr := influxql.CloneExpr(opt.Condition)
|
||||
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || tag.Val != "_name" {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return e
|
||||
}), nil)
|
||||
return e
|
||||
}), nil)
|
||||
|
||||
mms, ok, err := sh.engine.MeasurementsByExpr(measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
if mms, err = sh.engine.Measurements(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sort.Sort(mms)
|
||||
}
|
||||
|
||||
// If there are no measurements, return immediately.
|
||||
if len(mms) == 0 {
|
||||
return &tagValuesIterator{}, nil
|
||||
}
|
||||
|
||||
filterExpr := influxql.CloneExpr(opt.Condition)
|
||||
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || strings.HasPrefix(tag.Val, "_") {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return e
|
||||
}), nil)
|
||||
|
||||
var series []*Series
|
||||
keys := newStringSet()
|
||||
for _, mm := range mms {
|
||||
ss, ok, err := mm.TagKeysByExpr(opt.Condition)
|
||||
mms, ok, err := sh.engine.MeasurementsByExpr(measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
keys.add(mm.TagKeys()...)
|
||||
} else {
|
||||
keys = keys.union(ss)
|
||||
if mms, err = sh.engine.Measurements(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sort.Sort(mms)
|
||||
}
|
||||
|
||||
ids, err := mm.seriesIDsAllOrByExpr(filterExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// If there are no measurements, return immediately.
|
||||
if len(mms) == 0 {
|
||||
return &tagValuesIterator{}, nil
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
series = append(series, mm.SeriesByID(id))
|
||||
}
|
||||
}
|
||||
filterExpr := influxql.CloneExpr(opt.Condition)
|
||||
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || strings.HasPrefix(tag.Val, "_") {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return e
|
||||
}), nil)
|
||||
|
||||
return &tagValuesIterator{
|
||||
series: series,
|
||||
keys: keys.list(),
|
||||
fields: influxql.VarRefs(opt.Aux).Strings(),
|
||||
}, nil
|
||||
var series []*Series
|
||||
keys := newStringSet()
|
||||
for _, mm := range mms {
|
||||
ss, ok, err := mm.TagKeysByExpr(opt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
keys.add(mm.TagKeys()...)
|
||||
} else {
|
||||
keys = keys.union(ss)
|
||||
}
|
||||
|
||||
ids, err := mm.seriesIDsAllOrByExpr(filterExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
series = append(series, mm.SeriesByID(id))
|
||||
}
|
||||
}
|
||||
|
||||
return &tagValuesIterator{
|
||||
series: series,
|
||||
keys: keys.list(),
|
||||
fields: influxql.VarRefs(opt.Aux).Strings(),
|
||||
}, nil
|
||||
*/
|
||||
}
|
||||
|
||||
// Stats returns stats about the points processed.
|
||||
|
|
321
tsdb/store.go
321
tsdb/store.go
|
@ -10,7 +10,6 @@ import (
|
|||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -696,78 +695,82 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
|
|||
// DeleteSeries loops through the local shards and deletes the series data for
|
||||
// the passed in series keys.
|
||||
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
|
||||
// Expand regex expressions in the FROM clause.
|
||||
a, err := s.ExpandSources(sources)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if sources != nil && len(sources) != 0 && len(a) == 0 {
|
||||
return nil
|
||||
}
|
||||
sources = a
|
||||
panic("MOVE TO TSI")
|
||||
|
||||
// Determine deletion time range.
|
||||
min, max, err := influxql.TimeRangeAsEpochNano(condition)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
shards := s.filterShards(byDatabase(database))
|
||||
s.mu.RUnlock()
|
||||
|
||||
mMap := make(map[string]*Measurement)
|
||||
for _, shard := range shards {
|
||||
shardMeasures := shard.Measurements()
|
||||
for _, m := range shardMeasures {
|
||||
mMap[m.Name] = m
|
||||
/*
|
||||
// Expand regex expressions in the FROM clause.
|
||||
a, err := s.ExpandSources(sources)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if sources != nil && len(sources) != 0 && len(a) == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
sources = a
|
||||
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
measurements, err := measurementsFromSourcesOrDB(mMap, sources...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var seriesKeys [][]byte
|
||||
for _, m := range measurements {
|
||||
var ids SeriesIDs
|
||||
var filters FilterExprs
|
||||
if condition != nil {
|
||||
// Get series IDs that match the WHERE clause.
|
||||
ids, filters, err = m.walkWhereForSeriesIds(condition)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete boolean literal true filter expressions.
|
||||
// These are returned for `WHERE tagKey = 'tagVal'` type expressions and are okay.
|
||||
filters.DeleteBoolLiteralTrues()
|
||||
|
||||
// Check for unsupported field filters.
|
||||
// Any remaining filters means there were fields (e.g., `WHERE value = 1.2`).
|
||||
if filters.Len() > 0 {
|
||||
return errors.New("fields not supported in WHERE clause during deletion")
|
||||
}
|
||||
} else {
|
||||
// No WHERE clause so get all series IDs for this measurement.
|
||||
ids = m.seriesIDs
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
seriesKeys = append(seriesKeys, []byte(m.seriesByID[id].Key))
|
||||
}
|
||||
}
|
||||
|
||||
// delete the raw series data.
|
||||
return s.walkShards(shards, func(sh *Shard) error {
|
||||
if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil {
|
||||
// Determine deletion time range.
|
||||
min, max, err := influxql.TimeRangeAsEpochNano(condition)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
s.mu.RLock()
|
||||
shards := s.filterShards(byDatabase(database))
|
||||
s.mu.RUnlock()
|
||||
|
||||
mMap := make(map[string]*Measurement)
|
||||
for _, shard := range shards {
|
||||
shardMeasures := shard.Measurements()
|
||||
for _, m := range shardMeasures {
|
||||
mMap[m.Name] = m
|
||||
}
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
measurements, err := measurementsFromSourcesOrDB(mMap, sources...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var seriesKeys [][]byte
|
||||
for _, m := range measurements {
|
||||
var ids SeriesIDs
|
||||
var filters FilterExprs
|
||||
if condition != nil {
|
||||
// Get series IDs that match the WHERE clause.
|
||||
ids, filters, err = m.walkWhereForSeriesIds(condition)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete boolean literal true filter expressions.
|
||||
// These are returned for `WHERE tagKey = 'tagVal'` type expressions and are okay.
|
||||
filters.DeleteBoolLiteralTrues()
|
||||
|
||||
// Check for unsupported field filters.
|
||||
// Any remaining filters means there were fields (e.g., `WHERE value = 1.2`).
|
||||
if filters.Len() > 0 {
|
||||
return errors.New("fields not supported in WHERE clause during deletion")
|
||||
}
|
||||
} else {
|
||||
// No WHERE clause so get all series IDs for this measurement.
|
||||
ids = m.seriesIDs
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
seriesKeys = append(seriesKeys, []byte(m.seriesByID[id].Key))
|
||||
}
|
||||
}
|
||||
|
||||
// delete the raw series data.
|
||||
return s.walkShards(shards, func(sh *Shard) error {
|
||||
if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
*/
|
||||
}
|
||||
|
||||
// ExpandSources expands sources against all local shards.
|
||||
|
@ -880,110 +883,114 @@ type TagValues struct {
|
|||
|
||||
// TagValues returns the tag keys and values in the given database, matching the condition.
|
||||
func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, error) {
|
||||
if cond == nil {
|
||||
return nil, errors.New("a condition is required")
|
||||
}
|
||||
panic("MOVE TO TSI")
|
||||
|
||||
measurementExpr := influxql.CloneExpr(cond)
|
||||
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || tag.Val != "_name" {
|
||||
return nil
|
||||
/*
|
||||
if cond == nil {
|
||||
return nil, errors.New("a condition is required")
|
||||
}
|
||||
|
||||
measurementExpr := influxql.CloneExpr(cond)
|
||||
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || tag.Val != "_name" {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return e
|
||||
}), nil)
|
||||
return e
|
||||
}), nil)
|
||||
|
||||
// Get all measurements for the shards we're interested in.
|
||||
s.mu.RLock()
|
||||
shards := s.filterShards(byDatabase(database))
|
||||
s.mu.RUnlock()
|
||||
// Get all measurements for the shards we're interested in.
|
||||
s.mu.RLock()
|
||||
shards := s.filterShards(byDatabase(database))
|
||||
s.mu.RUnlock()
|
||||
|
||||
var measures Measurements
|
||||
for _, sh := range shards {
|
||||
mms, ok, err := sh.MeasurementsByExpr(measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
// TODO(edd): can we simplify this so we don't have to check the
|
||||
// ok value, and we can call sh.measurements with a shard filter
|
||||
// instead?
|
||||
mms = sh.Measurements()
|
||||
var measures Measurements
|
||||
for _, sh := range shards {
|
||||
mms, ok, err := sh.MeasurementsByExpr(measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
// TODO(edd): can we simplify this so we don't have to check the
|
||||
// ok value, and we can call sh.measurements with a shard filter
|
||||
// instead?
|
||||
mms = sh.Measurements()
|
||||
}
|
||||
|
||||
measures = append(measures, mms...)
|
||||
}
|
||||
|
||||
measures = append(measures, mms...)
|
||||
}
|
||||
// If there are no measurements, return immediately.
|
||||
if len(measures) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
sort.Sort(measures)
|
||||
|
||||
// If there are no measurements, return immediately.
|
||||
if len(measures) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
sort.Sort(measures)
|
||||
|
||||
filterExpr := influxql.CloneExpr(cond)
|
||||
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || strings.HasPrefix(tag.Val, "_") {
|
||||
return nil
|
||||
filterExpr := influxql.CloneExpr(cond)
|
||||
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || strings.HasPrefix(tag.Val, "_") {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return e
|
||||
}), nil)
|
||||
return e
|
||||
}), nil)
|
||||
|
||||
tagValues := make([]TagValues, len(measures))
|
||||
for i, mm := range measures {
|
||||
tagValues[i].Measurement = mm.Name
|
||||
tagValues := make([]TagValues, len(measures))
|
||||
for i, mm := range measures {
|
||||
tagValues[i].Measurement = mm.Name
|
||||
|
||||
ids, err := mm.SeriesIDsAllOrByExpr(filterExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ss := mm.SeriesByIDSlice(ids)
|
||||
|
||||
// Determine a list of keys from condition.
|
||||
keySet, ok, err := mm.TagKeysByExpr(cond)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Loop over all keys for each series.
|
||||
m := make(map[KeyValue]struct{}, len(ss))
|
||||
for _, series := range ss {
|
||||
for _, t := range series.Tags {
|
||||
if !ok {
|
||||
// nop
|
||||
} else if _, exists := keySet[string(t.Key)]; !exists {
|
||||
continue
|
||||
}
|
||||
m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{}
|
||||
ids, err := mm.SeriesIDsAllOrByExpr(filterExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ss := mm.SeriesByIDSlice(ids)
|
||||
|
||||
// Determine a list of keys from condition.
|
||||
keySet, ok, err := mm.TagKeysByExpr(cond)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Loop over all keys for each series.
|
||||
m := make(map[KeyValue]struct{}, len(ss))
|
||||
for _, series := range ss {
|
||||
for _, t := range series.Tags {
|
||||
if !ok {
|
||||
// nop
|
||||
} else if _, exists := keySet[string(t.Key)]; !exists {
|
||||
continue
|
||||
}
|
||||
m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// Return an empty slice if there are no key/value matches.
|
||||
if len(m) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Sort key/value set.
|
||||
a := make([]KeyValue, 0, len(m))
|
||||
for kv := range m {
|
||||
a = append(a, kv)
|
||||
}
|
||||
sort.Sort(KeyValues(a))
|
||||
tagValues[i].Values = a
|
||||
}
|
||||
|
||||
// Return an empty slice if there are no key/value matches.
|
||||
if len(m) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Sort key/value set.
|
||||
a := make([]KeyValue, 0, len(m))
|
||||
for kv := range m {
|
||||
a = append(a, kv)
|
||||
}
|
||||
sort.Sort(KeyValues(a))
|
||||
tagValues[i].Values = a
|
||||
}
|
||||
|
||||
return tagValues, nil
|
||||
return tagValues, nil
|
||||
*/
|
||||
}
|
||||
|
||||
// KeyValue holds a string key and a string value.
|
||||
|
|
Loading…
Reference in New Issue