Correctly check if raw derivative is required
The multiple checks for Mapper and Executor type -- the lack of DRYness in this code -- meant the same checks would need to be copied. Therefore this change, as well as fixing the bug, improves the situation a little bit by *asking* the Mappers what type of Executor is required. This code is still not ideal. Fixes #3355.pull/3361/head
parent
dfdffb6401
commit
0d6c6bbe6f
|
@ -1202,6 +1202,45 @@ func TestServer_Query_SelectRelativeTime(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the server can handle various simple calculus queries.
|
||||
func TestServer_Query_SelectRawCalculus(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = fmt.Sprintf("cpu value=210 1278010021000000000\ncpu value=10 1278010022000000000")
|
||||
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: "calculate single derivate",
|
||||
command: `SELECT derivative(value) from db0.rp0.cpu`,
|
||||
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2010-07-01T18:47:02Z",-200]]}]}]}`,
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
if i == 0 {
|
||||
if err := test.init(s); err != nil {
|
||||
t.Fatalf("test init failed: %s", err)
|
||||
}
|
||||
}
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
if err := query.Execute(s); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// mergeMany ensures that when merging many series together and some of them have a different number
|
||||
// of points than others in a group by interval the results are correct
|
||||
func TestServer_Query_MergeMany(t *testing.T) {
|
||||
|
|
|
@ -184,7 +184,7 @@ func (rm *RawMapper) NextChunk() (interface{}, error) {
|
|||
|
||||
// Close closes the mapper.
|
||||
func (rm *RawMapper) Close() {
|
||||
if rm.tx != nil {
|
||||
if rm != nil && rm.tx != nil {
|
||||
_ = rm.tx.Rollback()
|
||||
}
|
||||
}
|
||||
|
@ -469,7 +469,7 @@ func (am *AggMapper) TagSets() []string {
|
|||
|
||||
// Close closes the mapper.
|
||||
func (am *AggMapper) Close() {
|
||||
if am.tx != nil {
|
||||
if am != nil && am.tx != nil {
|
||||
_ = am.tx.Rollback()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -251,10 +251,19 @@ func (q *QueryExecutor) plan(stmt *influxql.SelectStatement, chunkSize int) (Exe
|
|||
mappers = append(mappers, m)
|
||||
}
|
||||
|
||||
if stmt.IsRawQuery && !stmt.HasDistinct() {
|
||||
return NewRawExecutor(stmt, mappers, chunkSize), nil
|
||||
var executor Executor
|
||||
if len(mappers) > 0 {
|
||||
// All Mapper are of same type, so check first to determine correct Executor type.
|
||||
if _, ok := mappers[0].(*RawMapper); ok {
|
||||
executor = NewRawExecutor(stmt, mappers, chunkSize)
|
||||
} else {
|
||||
executor = NewAggregateExecutor(stmt, mappers)
|
||||
}
|
||||
} else {
|
||||
// With no mappers, the Executor type doesn't matter.
|
||||
executor = NewRawExecutor(stmt, nil, chunkSize)
|
||||
}
|
||||
return NewAggregateExecutor(stmt, mappers), nil
|
||||
return executor, nil
|
||||
}
|
||||
|
||||
// executeSelectStatement plans and executes a select statement against a database.
|
||||
|
|
|
@ -309,7 +309,7 @@ func (s *Store) CreateMapper(shardID uint64, query string, chunkSize int) (Mappe
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
if stmt.IsRawQuery && !stmt.HasDistinct() {
|
||||
if (stmt.IsRawQuery && !stmt.HasDistinct()) || stmt.IsSimpleDerivative() {
|
||||
return NewRawMapper(shard, stmt, chunkSize), nil
|
||||
}
|
||||
return NewAggMapper(shard, stmt), nil
|
||||
|
|
Loading…
Reference in New Issue