Merge pull request #9487 from influxdata/sgc-tagsets

fallback to inmem TagSets implementation
pull/9495/head v1.5.0rc4
Stuart Carnie 2018-02-27 09:06:54 -07:00 committed by GitHub
commit 48fb2a4cc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 98 additions and 50 deletions

View File

@ -2169,6 +2169,10 @@ func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt que
return newMergeFinalizerIterator(ctx, itrs, opt, e.logger)
}
// indexTagSets is implemented by index types (such as the inmem index)
// that provide their own TagSets computation; the engine type-asserts to
// it when e.index.Type() == "inmem" and falls back to the IndexSet
// implementation otherwise.
type indexTagSets interface {
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
}
func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) {
ref, _ := call.Args[0].(*influxql.VarRef)
@ -2179,8 +2183,18 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
}
// Determine tagsets for this measurement based on dimensions and filters.
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
tagSets, err := indexSet.TagSets(e.sfile, []byte(measurement), opt)
var (
tagSets []*query.TagSet
err error
)
if e.index.Type() == "inmem" {
ts := e.index.(indexTagSets)
tagSets, err = ts.TagSets([]byte(measurement), opt)
} else {
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
}
if err != nil {
return nil, err
}
@ -2249,9 +2263,18 @@ func (e *Engine) createVarRefIterator(ctx context.Context, measurement string, o
return nil, nil
}
// Determine tagsets for this measurement based on dimensions and filters.
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
tagSets, err := indexSet.TagSets(e.sfile, []byte(measurement), opt)
var (
tagSets []*query.TagSet
err error
)
if e.index.Type() == "inmem" {
ts := e.index.(indexTagSets)
tagSets, err = ts.TagSets([]byte(measurement), opt)
} else {
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
}
if err != nil {
return nil, err
}

View File

@ -2310,8 +2310,16 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
itr, err := is.measurementSeriesByExprIterator(name, opt.Condition)
if err != nil {
return nil, err
} else if itr != nil {
defer itr.Close()
} else if itr == nil {
return nil, nil
}
defer itr.Close()
var dims []string
if len(opt.Dimensions) > 0 {
dims = make([]string, len(opt.Dimensions))
copy(dims, opt.Dimensions)
sort.Strings(dims)
}
// For every series, get the tag values for the requested tag keys i.e.
@ -2319,52 +2327,69 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
// TagSet are then grouped together, because for the purpose of GROUP BY
// they are part of the same composite series.
tagSets := make(map[string]*query.TagSet, 64)
var (
seriesN, maxSeriesN int
db = is.Database()
)
if itr != nil {
for {
e, err := itr.Next()
if err != nil {
return nil, err
} else if e.SeriesID == 0 {
break
}
if opt.MaxSeriesN > 0 {
maxSeriesN = opt.MaxSeriesN
} else {
maxSeriesN = int(^uint(0) >> 1)
}
// Skip if the series has been tombstoned.
key := sfile.SeriesKey(e.SeriesID)
if len(key) == 0 {
continue
}
_, tags := ParseSeriesKey(key)
if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(is.Database(), name, tags) {
continue
}
tagsMap := make(map[string]string, len(opt.Dimensions))
// Build the TagSet for this series.
for _, dim := range opt.Dimensions {
tagsMap[dim] = tags.GetString(dim)
}
// Convert the TagSet to a string, so it can be added to a map
// allowing TagSets to be handled as a set.
tagsAsKey := MarshalTags(tagsMap)
tagSet, ok := tagSets[string(tagsAsKey)]
if !ok {
// This TagSet is new, create a new entry for it.
tagSet = &query.TagSet{
Tags: tagsMap,
Key: tagsAsKey,
}
}
// Associate the series and filter with the Tagset.
tagSet.AddFilter(string(models.MakeKey(name, tags)), e.Expr)
// Ensure it's back in the map.
tagSets[string(tagsAsKey)] = tagSet
for {
se, err := itr.Next()
if err != nil {
return nil, err
} else if se.SeriesID == 0 {
break
}
// Skip if the series has been tombstoned.
key := sfile.SeriesKey(se.SeriesID)
if len(key) == 0 {
continue
}
if seriesN&0x3fff == 0x3fff {
// check every 16384 series if the query has been canceled
select {
case <-opt.InterruptCh:
return nil, query.ErrQueryInterrupted
default:
}
}
if seriesN > maxSeriesN {
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
}
_, tags := ParseSeriesKey(key)
if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tags) {
continue
}
var tagsAsKey []byte
if len(dims) > 0 {
tagsAsKey = MakeTagsKey(dims, tags)
}
tagSet, ok := tagSets[string(tagsAsKey)]
if !ok {
// This TagSet is new, create a new entry for it.
tagSet = &query.TagSet{
Tags: nil,
Key: tagsAsKey,
}
}
// Associate the series and filter with the Tagset.
tagSet.AddFilter(string(models.MakeKey(name, tags)), se.Expr)
// Ensure it's back in the map.
tagSets[string(tagsAsKey)] = tagSet
seriesN++
}
// Sort the series in each tag set.