pull/10090/head
Edd Robinson 2018-07-13 19:05:59 +01:00
parent 80dc07cbcb
commit afa581ea08
4 changed files with 74 additions and 10 deletions

View File

@@ -409,11 +409,12 @@ func (e *QueryExecutor) ExecuteQuery(q, database string, chunkSize int) <-chan *
 }
 
 type MockShard struct {
-	Measurements      []string
-	FieldDimensionsFn func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
-	CreateIteratorFn  func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error)
-	IteratorCostFn    func(m string, opt query.IteratorOptions) (query.IteratorCost, error)
-	ExpandSourcesFn   func(sources influxql.Sources) (influxql.Sources, error)
+	Measurements             []string
+	FieldDimensionsFn        func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
+	FieldKeysByMeasurementFn func(name []byte) []string
+	CreateIteratorFn         func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error)
+	IteratorCostFn           func(m string, opt query.IteratorOptions) (query.IteratorCost, error)
+	ExpandSourcesFn          func(sources influxql.Sources) (influxql.Sources, error)
 }
 
 func (sh *MockShard) MeasurementsByRegex(re *regexp.Regexp) []string {
@@ -426,6 +427,10 @@ func (sh *MockShard) MeasurementsByRegex(re *regexp.Regexp) []string {
 	return names
 }
 
+func (sh *MockShard) FieldKeysByMeasurement(name []byte) []string {
+	return sh.FieldKeysByMeasurementFn(name)
+}
+
 func (sh *MockShard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
	return sh.FieldDimensionsFn(measurements)
 }
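
For context, a minimal sketch of how a test might exercise the new hook. The test name, measurement, and field keys below are hypothetical and not part of this change; it assumes it sits in the same package as MockShard with the file's existing testing import.

func TestMockShardFieldKeysByMeasurement(t *testing.T) {
	// Stub the new hook so the mock returns a fixed set of field keys.
	sh := &MockShard{
		FieldKeysByMeasurementFn: func(name []byte) []string {
			return []string{"usage_idle", "usage_user"}
		},
	}

	if got := sh.FieldKeysByMeasurement([]byte("cpu")); len(got) != 2 {
		t.Fatalf("unexpected field keys: %v", got)
	}
}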

View File

@@ -264,7 +264,7 @@ func HasSingleMeasurementNoOR(expr influxql.Expr) (string, bool) {
 			return
 		}
 
-		if be, ok := expr.(*influxql.BinaryExpr); ok {
+		if be, ok := node.(*influxql.BinaryExpr); ok {
 			if be.Op == influxql.OR {
 				invalidOP = true
 				return
@@ -289,7 +289,7 @@ func HasSingleMeasurementNoOR(expr influxql.Expr) (string, bool) {
 			}
 		}
 	})
-	return lastMeasurement, foundOnce && !invalidOP
+	return lastMeasurement, len(lastMeasurement) > 0 && foundOnce && !invalidOP
 }
 
 func HasFieldKeyOrValue(expr influxql.Expr) (bool, bool) {
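
The added len(lastMeasurement) > 0 guard covers predicates that never reference the measurement at all, such as tag1 != 'foo': the walk finds no OR operator and foundOnce stays true, so the old return value would have been ok == true with an empty name. A rough sketch of the intended contract, assuming it sits alongside HasSingleMeasurementNoOR in the storage package; the helper name is hypothetical and the expressions are taken from the tests below.

func sketchSingleMeasurementContract() {
	// Exactly one equality on _name and no OR operators: the optimisation applies.
	if name, ok := HasSingleMeasurementNoOR(influxql.MustParseExpr(`_something = 'f' AND _name = 'm0'`)); ok {
		_ = name // "m0"
	}

	// No _name clause at all: with the new guard this now reports false
	// instead of an empty measurement name with ok == true.
	if _, ok := HasSingleMeasurementNoOR(influxql.MustParseExpr(`tag1 != 'foo'`)); !ok {
		// Expected: the caller falls back to the query-engine path.
	}
}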

View File

@@ -149,6 +149,65 @@ func TestNodeToExpr(t *testing.T) {
 	}
 }
 
+func TestHasSingleMeasurementNoOR(t *testing.T) {
+	cases := []struct {
+		expr influxql.Expr
+		name string
+		ok   bool
+	}{
+		{
+			expr: influxql.MustParseExpr(`_name = 'm0'`),
+			name: "m0",
+			ok:   true,
+		},
+		{
+			expr: influxql.MustParseExpr(`_something = 'f' AND _name = 'm0'`),
+			name: "m0",
+			ok:   true,
+		},
+		{
+			expr: influxql.MustParseExpr(`_something = 'f' AND (a =~ /x0/ AND _name = 'm0')`),
+			name: "m0",
+			ok:   true,
+		},
+		{
+			expr: influxql.MustParseExpr(`tag1 != 'foo'`),
+			ok:   false,
+		},
+		{
+			expr: influxql.MustParseExpr(`_name = 'm0' OR tag1 != 'foo'`),
+			ok:   false,
+		},
+		{
+			expr: influxql.MustParseExpr(`_name = 'm0' AND tag1 != 'foo' AND _name = 'other'`),
+			ok:   false,
+		},
+		{
+			expr: influxql.MustParseExpr(`_name = 'm0' AND tag1 != 'foo' OR _name = 'other'`),
+			ok:   false,
+		},
+		{
+			expr: influxql.MustParseExpr(`_name = 'm0' AND (tag1 != 'foo' OR tag2 = 'other')`),
+			ok:   false,
+		},
+		{
+			expr: influxql.MustParseExpr(`(tag1 != 'foo' OR tag2 = 'other') OR _name = 'm0'`),
+			ok:   false,
+		},
+	}
+
+	for _, tc := range cases {
+		name, ok := storage.HasSingleMeasurementNoOR(tc.expr)
+		if ok != tc.ok {
+			t.Fatalf("got %q, %v for expression %q, expected %q, %v", name, ok, tc.expr, tc.name, tc.ok)
+		}
+
+		if ok && name != tc.name {
+			t.Fatalf("got %q, %v for expression %q, expected %q, %v", name, ok, tc.expr, tc.name, tc.ok)
+		}
+	}
+}
+
 func TestRewriteExprRemoveFieldKeyAndValue(t *testing.T) {
 	node := &storage.Node{
 		NodeType: storage.NodeTypeLogicalExpression,

View File

@@ -105,8 +105,8 @@ func newIndexSeriesCursor(ctx context.Context, predicate *Predicate, shards []*t
 	}
 
 	var mitr tsdb.MeasurementIterator
-	name, ok := HasSingleMeasurementNoOR(p.measurementCond)
-	if ok {
+	name, singleMeasurement := HasSingleMeasurementNoOR(p.measurementCond)
+	if singleMeasurement {
 		mitr = tsdb.NewMeasurementSliceIterator([][]byte{[]byte(name)})
 	}
@@ -117,7 +117,7 @@ func newIndexSeriesCursor(ctx context.Context, predicate *Predicate, shards []*t
 	// single measurement. In this case we can efficiently produce all known
 	// field keys from the collection of shards without having to go via
 	// the query engine.
-	if ok {
+	if singleMeasurement {
 		fkeys := sg.FieldKeysByMeasurement([]byte(name))
 		if len(fkeys) == 0 {
 			goto CLEANUP
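
Read together with the FieldKeysByMeasurement plumbing added in the first file, the shape of the fast path is roughly the following. This is a condensed sketch of the two hunks above, not the full function; mitr, sg, p, and the CLEANUP label come from the surrounding code in newIndexSeriesCursor.

	name, singleMeasurement := HasSingleMeasurementNoOR(p.measurementCond)
	if singleMeasurement {
		// The predicate pins down exactly one measurement, so the measurement
		// iterator is a one-element slice rather than an index scan.
		mitr = tsdb.NewMeasurementSliceIterator([][]byte{[]byte(name)})

		// Field keys can then be read straight from the shard group, skipping
		// the query engine; an empty result short-circuits to cleanup.
		fkeys := sg.FieldKeysByMeasurement([]byte(name))
		if len(fkeys) == 0 {
			goto CLEANUP
		}
	}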