Ensure all shards checked for fields within an IndexSet

pull/9551/head
Edd Robinson 2018-03-12 15:25:45 +00:00
parent c1e1412dae
commit ec93b0eb0c
1 changed file with 41 additions and 21 deletions


@@ -1092,8 +1092,9 @@ func (itr *tagValueMergeIterator) Next() (_ []byte, err error) {
 
 // IndexSet represents a list of indexes.
 type IndexSet struct {
-	Indexes    []Index
-	SeriesFile *SeriesFile
+	Indexes    []Index                // The set of indexes comprising this IndexSet.
+	SeriesFile *SeriesFile            // The Series File associated with the db for this set.
+	fieldSets  []*MeasurementFieldSet // field sets for _all_ indexes in this set's DB.
 }
 
 // Database returns the database name of the first index.
@@ -1104,20 +1105,40 @@ func (is IndexSet) Database() string {
 	return is.Indexes[0].Database()
 }
 
-// FieldSet returns the fieldset of the first index.
-func (is IndexSet) FieldSet() *MeasurementFieldSet {
+// HasField determines if any of the field sets on the set of indexes in the
+// IndexSet have the provided field for the provided measurement.
+func (is IndexSet) HasField(measurement []byte, field string) bool {
 	if len(is.Indexes) == 0 {
-		return nil
+		return false
 	}
-	return is.Indexes[0].FieldSet()
+
+	if len(is.fieldSets) == 0 {
+		// field sets may not have been initialised yet.
+		is.fieldSets = make([]*MeasurementFieldSet, 0, len(is.Indexes))
+		for _, idx := range is.Indexes {
+			is.fieldSets = append(is.fieldSets, idx.FieldSet())
+		}
+	}
+
+	for _, fs := range is.fieldSets {
+		if fs.Fields(measurement).HasField(field) {
+			return true
+		}
+	}
+	return false
 }
 
 // DedupeInmemIndexes returns an index set which removes duplicate in-memory indexes.
 func (is IndexSet) DedupeInmemIndexes() IndexSet {
-	other := IndexSet{Indexes: make([]Index, 0, len(is.Indexes)), SeriesFile: is.SeriesFile}
+	other := IndexSet{
+		Indexes:    make([]Index, 0, len(is.Indexes)),
+		SeriesFile: is.SeriesFile,
+		fieldSets:  make([]*MeasurementFieldSet, 0, len(is.Indexes)),
+	}
 
 	var hasInmem bool
 	for _, idx := range is.Indexes {
+		other.fieldSets = append(other.fieldSets, idx.FieldSet())
 		if idx.Type() == "inmem" {
 			if !hasInmem {
 				other.Indexes = append(other.Indexes, idx)
@@ -1699,9 +1720,8 @@ func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Ex
 	if expr == nil {
 		return is.measurementSeriesIDIterator(name)
 	}
 
-	fieldset := is.FieldSet()
-	itr, err := is.seriesByExprIterator(name, expr, fieldset.CreateFieldsIfNotExists(name))
+	itr, err := is.seriesByExprIterator(name, expr)
 	if err != nil {
 		return nil, err
 	}
@@ -1754,19 +1774,19 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr)
 	return keys, nil
 }
 
-func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *MeasurementFields) (SeriesIDIterator, error) {
+func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
 	switch expr := expr.(type) {
 	case *influxql.BinaryExpr:
 		switch expr.Op {
 		case influxql.AND, influxql.OR:
 			// Get the series IDs and filter expressions for the LHS.
-			litr, err := is.seriesByExprIterator(name, expr.LHS, mf)
+			litr, err := is.seriesByExprIterator(name, expr.LHS)
 			if err != nil {
 				return nil, err
 			}
 
 			// Get the series IDs and filter expressions for the RHS.
-			ritr, err := is.seriesByExprIterator(name, expr.RHS, mf)
+			ritr, err := is.seriesByExprIterator(name, expr.RHS)
 			if err != nil {
 				if litr != nil {
 					litr.Close()
@@ -1783,11 +1803,11 @@ func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *Mea
 			return UnionSeriesIDIterators(litr, ritr), nil
 
 		default:
-			return is.seriesByBinaryExprIterator(name, expr, mf)
+			return is.seriesByBinaryExprIterator(name, expr)
 		}
 
 	case *influxql.ParenExpr:
-		return is.seriesByExprIterator(name, expr.Expr, mf)
+		return is.seriesByExprIterator(name, expr.Expr)
 
 	case *influxql.BooleanLiteral:
 		if expr.Val {
@@ -1801,7 +1821,7 @@ func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *Mea
 }
 
 // seriesByBinaryExprIterator returns a series iterator and a filtering expression.
-func (is IndexSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr, mf *MeasurementFields) (SeriesIDIterator, error) {
+func (is IndexSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (SeriesIDIterator, error) {
 	// If this binary expression has another binary expression, then this
 	// is some expression math and we should just pass it to the underlying query.
 	if _, ok := n.LHS.(*influxql.BinaryExpr); ok {
@@ -1830,7 +1850,7 @@ func (is IndexSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExp
 	}
 
 	// For fields, return all series from this measurement.
-	if key.Val != "_name" && ((key.Type == influxql.Unknown && mf.HasField(key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) {
+	if key.Val != "_name" && ((key.Type == influxql.Unknown && is.HasField(name, key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) {
 		itr, err := is.measurementSeriesIDIterator(name)
 		if err != nil {
 			return nil, err
@@ -1838,7 +1858,7 @@ func (is IndexSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExp
 		return newSeriesIDExprIterator(itr, n), nil
 	} else if value, ok := value.(*influxql.VarRef); ok {
 		// Check if the RHS is a variable and if it is a field.
-		if value.Val != "_name" && ((value.Type == influxql.Unknown && mf.HasField(value.Val)) || key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) {
+		if value.Val != "_name" && ((value.Type == influxql.Unknown && is.HasField(name, value.Val)) || key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) {
 			itr, err := is.measurementSeriesIDIterator(name)
 			if err != nil {
 				return nil, err
@@ -2130,7 +2150,7 @@ func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byt
 func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) {
 	release := is.SeriesFile.Retain()
 	defer release()
-	return is.tagValuesByKeyAndExpr(auth, name, keys, expr, fieldset)
+	return is.tagValuesByKeyAndExpr(auth, name, keys, expr)
 }
 
 // tagValuesByKeyAndExpr retrieves tag values for the provided tag keys. See
@@ -2138,10 +2158,10 @@ func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key
 //
 // tagValuesByKeyAndExpr guarantees to never take any locks on the underlying
 // series file.
-func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) {
+func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) {
 	database := is.Database()
 
-	itr, err := is.seriesByExprIterator(name, expr, fieldset.Fields(string(name)))
+	itr, err := is.seriesByExprIterator(name, expr)
 	if err != nil {
 		return nil, err
 	} else if itr == nil {
@@ -2290,7 +2310,7 @@ func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []b
 	// This is the case where we have filtered series by some WHERE condition.
 	// We only care about the tag values for the keys given the
 	// filtered set of series ids.
-	resultSet, err := is.tagValuesByKeyAndExpr(auth, name, keys, expr, is.FieldSet())
+	resultSet, err := is.tagValuesByKeyAndExpr(auth, name, keys, expr)
 	if err != nil {
 		return nil, err
 	}
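
The idea behind the change can be illustrated outside the tsdb package. Below is a minimal, self-contained Go sketch — the types are simplified stand-ins, not the real Index, IndexSet, or MeasurementFieldSet — showing the same pattern as the new HasField method: lazily collect one field set per shard index, then report a field as present if any shard knows about it, rather than consulting only the first index.

package main

import "fmt"

// fieldSet is a simplified stand-in for a per-shard measurement field set:
// measurement name -> set of field names.
type fieldSet map[string]map[string]struct{}

func (fs fieldSet) hasField(measurement, field string) bool {
	_, ok := fs[measurement][field]
	return ok
}

// index is a simplified stand-in for a per-shard index.
type index struct{ fields fieldSet }

// indexSet mirrors the shape of the patched IndexSet: the per-shard field
// sets are gathered lazily and all of them are consulted, not just the first.
type indexSet struct {
	indexes   []index
	fieldSets []fieldSet
}

func (is *indexSet) hasField(measurement, field string) bool {
	if len(is.indexes) == 0 {
		return false
	}
	if len(is.fieldSets) == 0 {
		// Field sets may not have been initialised yet; cache one per index.
		for _, idx := range is.indexes {
			is.fieldSets = append(is.fieldSets, idx.fields)
		}
	}
	for _, fs := range is.fieldSets {
		if fs.hasField(measurement, field) {
			return true
		}
	}
	return false
}

func main() {
	// Only the second shard's index knows about cpu.idle; checking just the
	// first index (the behaviour of the removed FieldSet helper) would miss it.
	shard1 := index{fields: fieldSet{"cpu": {"usage_user": {}}}}
	shard2 := index{fields: fieldSet{"cpu": {"idle": {}}}}

	is := &indexSet{indexes: []index{shard1, shard2}}
	fmt.Println(is.hasField("cpu", "idle"))        // true
	fmt.Println(is.hasField("cpu", "nonexistent")) // false
}

In the diff itself this shows up as seriesByBinaryExprIterator no longer consulting a single MeasurementFields value threaded down from FieldSet(); it now asks the IndexSet via is.HasField, which checks every index's field set before deciding whether a VarRef refers to a field.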