fix #3414: shard mappers perform query re-writing

pull/3495/head
David Norton 2015-07-28 18:49:18 -04:00
parent d810682019
commit d661bf1a06
13 changed files with 332 additions and 156 deletions

View File

@ -13,6 +13,7 @@
- [#3411](https://github.com/influxdb/influxdb/issues/3411): 500 timeout on write
- [#3420](https://github.com/influxdb/influxdb/pull/3420): Catch opentsdb malformed tags. Thanks @nathanielc.
- [#3404](https://github.com/influxdb/influxdb/pull/3404): Added support for escaped single quotes in query string. Thanks @jhorwit2
- [#3414](https://github.com/influxdb/influxdb/issues/3414): Shard mappers perform query re-writing
## v0.9.2 [2015-07-24]

View File

@ -239,6 +239,7 @@ type MapShardResponse struct {
Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
Data []byte `protobuf:"bytes,3,opt" json:"Data,omitempty"`
TagSets []string `protobuf:"bytes,4,rep" json:"TagSets,omitempty"`
Fields []string `protobuf:"bytes,5,rep" json:"Fields,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -274,5 +275,12 @@ func (m *MapShardResponse) GetTagSets() []string {
return nil
}
func (m *MapShardResponse) GetFields() []string {
if m != nil {
return m.Fields
}
return nil
}
func init() {
}

View File

@ -45,4 +45,5 @@ message MapShardResponse {
optional string Message = 2;
optional bytes Data = 3;
repeated string TagSets = 4;
repeated string Fields = 5;
}

View File

@ -51,11 +51,13 @@ func NewMapShardResponse(code int, message string) *MapShardResponse {
func (r *MapShardResponse) Code() int { return int(r.pb.GetCode()) }
func (r *MapShardResponse) Message() string { return r.pb.GetMessage() }
func (r *MapShardResponse) TagSets() []string { return r.pb.GetTagSets() }
func (r *MapShardResponse) Fields() []string { return r.pb.GetFields() }
func (r *MapShardResponse) Data() []byte { return r.pb.GetData() }
func (r *MapShardResponse) SetCode(code int) { r.pb.Code = proto.Int32(int32(code)) }
func (r *MapShardResponse) SetMessage(message string) { r.pb.Message = &message }
func (r *MapShardResponse) SetTagSets(tagsets []string) { r.pb.TagSets = tagsets }
func (r *MapShardResponse) SetFields(fields []string) { r.pb.Fields = fields }
func (r *MapShardResponse) SetData(data []byte) { r.pb.Data = data }
// MarshalBinary encodes the object to a binary format.

View File

@ -243,13 +243,14 @@ func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error {
}
defer m.Close()
var tagSetsSent bool
var metaSent bool
for {
var resp MapShardResponse
if !tagSetsSent {
if !metaSent {
resp.SetTagSets(m.TagSets())
tagSetsSent = true
resp.SetFields(m.Fields())
metaSent = true
}
chunk, err := m.NextChunk()

View File

@ -94,6 +94,7 @@ type RemoteMapper struct {
chunkSize int
tagsets []string
fields []string
conn remoteShardConn
bufferedResponse *MapShardResponse
@ -161,6 +162,10 @@ func (r *RemoteMapper) TagSets() []string {
return r.tagsets
}
func (r *RemoteMapper) Fields() []string {
return r.fields
}
// NextChunk returns the next chunk read from the remote node to the client.
func (r *RemoteMapper) NextChunk() (chunk interface{}, err error) {
output := &tsdb.MapperOutput{}

View File

@ -908,7 +908,7 @@ func TestServer_Query_Tags(t *testing.T) {
&Query{
name: "field with two tags should succeed",
command: `SELECT host, value, core FROM db0.rp0.cpu`,
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value","core"],"values":[["%s",100,4]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","value","core"],"values":[["%s",50,2]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
exp: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","core","value"],"values":[["%s",4,100]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","core","value"],"values":[["%s",2,50]]}]}]}`, now.Format(time.RFC3339Nano), now.Add(1).Format(time.RFC3339Nano)),
},
&Query{
name: "select * with tags should succeed",
@ -2120,7 +2120,7 @@ func TestServer_Query_Where_Fields(t *testing.T) {
name: "missing measurement with group by",
params: url.Values{"db": []string{"db0"}},
command: `SELECT load from missing group by *`,
exp: `{"results":[{"error":"measurement not found: \"db0\".\"rp0\".missing"}]}`,
exp: `{"results":[{}]}`,
},
// string
@ -2134,7 +2134,7 @@ func TestServer_Query_Where_Fields(t *testing.T) {
name: "string AND query, all fields in SELECT",
params: url.Values{"db": []string{"db0"}},
command: `SELECT alert_id,tenant_id,_cust FROM cpu WHERE alert_id='alert' AND tenant_id='tenant'`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","alert_id","tenant_id","_cust"],"values":[["2015-02-28T01:03:36.703820946Z","alert","tenant","johnson brothers"]]}]}]}`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","_cust","alert_id","tenant_id"],"values":[["2015-02-28T01:03:36.703820946Z","johnson brothers","alert","tenant"]]}]}]}`,
},
&Query{
name: "string AND query, all fields in SELECT, one in parenthesis",
@ -2617,7 +2617,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
&Query{
name: "verify cpu measurement is gone",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"error":"measurement not found: \"db0\".\"rp0\".cpu"}]}`,
exp: `{"results":[{}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{

View File

@ -25,6 +25,7 @@ const (
type Mapper interface {
Open() error
TagSets() []string
Fields() []string
NextChunk() (interface{}, error)
Close()
}
@ -78,6 +79,11 @@ func (e *Executor) Execute() <-chan *influxql.Row {
// Create output channel and stream data in a separate goroutine.
out := make(chan *influxql.Row, 0)
// Certain operations on the SELECT statement can be performed by the Executor without
// assistance from the Mappers. This allows the Executor to prepare aggregation functions
// and mathematical functions.
e.stmt.RewriteDistinct()
if (e.stmt.IsRawQuery && !e.stmt.HasDistinct()) || e.stmt.IsSimpleDerivative() {
go e.executeRaw(out)
} else {
@ -151,6 +157,12 @@ func (e *Executor) executeRaw(out chan *influxql.Row) {
}
}
// Get the union of SELECT fields across all mappers.
selectFields := newStringSet()
for _, m := range e.mappers {
selectFields.add(m.Fields()...)
}
// Used to read ahead chunks from mappers.
var rowWriter *limitedRowWriter
var currTagset string
@ -177,6 +189,20 @@ func (e *Executor) executeRaw(out chan *influxql.Row) {
m.drained = true
break
}
// If the SELECT query is on more than 1 field, but the chunks values from the Mappers
// only contain a single value, create k-v pairs using the field name of the chunk
// and the value of the chunk. If there is only 1 SELECT field across all mappers then
// there is no need to create k-v pairs, and there is no need to distinguish field data,
// as it is all for the *same* field.
if len(selectFields) > 1 && len(m.bufferedChunk.Fields) == 1 {
fieldKey := m.bufferedChunk.Fields[0]
for i := range m.bufferedChunk.Values {
field := map[string]interface{}{fieldKey: m.bufferedChunk.Values[i].Value}
m.bufferedChunk.Values[i].Value = field
}
}
}
if e.tagSetIsLimited(m.bufferedChunk.Name) {
@ -264,7 +290,7 @@ func (e *Executor) executeRaw(out chan *influxql.Row) {
chunkSize: e.chunkSize,
name: chunkedOutput.Name,
tags: chunkedOutput.Tags,
selectNames: e.stmt.NamesInSelect(),
selectNames: selectFields.list(),
fields: e.stmt.Fields,
c: out,
}

View File

@ -2,7 +2,6 @@ package tsdb_test
import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"os"
@ -249,6 +248,78 @@ func TestWritePointsAndExecuteTwoShardsAlign(t *testing.T) {
}
}
// Test to ensure the engine handles query re-writing across stores.
func TestWritePointsAndExecuteTwoShardsQueryRewrite(t *testing.T) {
// Create two distinct stores, ensuring shard mappers will shard nothing.
store0 := testStore()
defer os.RemoveAll(store0.Path())
store1 := testStore()
defer os.RemoveAll(store1.Path())
// Create a shard in each store.
database := "foo"
retentionPolicy := "bar"
store0.CreateShard(database, retentionPolicy, sID0)
store1.CreateShard(database, retentionPolicy, sID1)
// Write two points across shards.
pt1time := time.Unix(1, 0).UTC()
if err := store0.WriteToShard(sID0, []tsdb.Point{tsdb.NewPoint(
"cpu",
map[string]string{"host": "serverA"},
map[string]interface{}{"value1": 100},
pt1time,
)}); err != nil {
t.Fatalf(err.Error())
}
pt2time := time.Unix(2, 0).UTC()
if err := store1.WriteToShard(sID1, []tsdb.Point{tsdb.NewPoint(
"cpu",
map[string]string{"host": "serverB"},
map[string]interface{}{"value2": 200},
pt2time,
)}); err != nil {
t.Fatalf(err.Error())
}
var tests = []struct {
skip bool // Skip test
stmt string // Query statement
chunkSize int // Chunk size for driving the executor
expected string // Expected results, rendered as a string
}{
{
stmt: `SELECT * FROM cpu`,
expected: `[{"name":"cpu","tags":{"host":"serverA"},"columns":["time","value1","value2"],"values":[["1970-01-01T00:00:01Z",100,null]]},{"name":"cpu","tags":{"host":"serverB"},"columns":["time","value1","value2"],"values":[["1970-01-01T00:00:02Z",null,200]]}]`,
},
}
for _, tt := range tests {
if tt.skip {
t.Logf("Skipping test %s", tt.stmt)
continue
}
parsedSelectStmt := mustParseSelectStatement(tt.stmt)
// Create Mappers and Executor.
mapper0, err := store0.CreateMapper(sID0, tt.stmt, tt.chunkSize)
if err != nil {
t.Fatalf("failed to create mapper0: %s", err.Error())
}
mapper1, err := store1.CreateMapper(sID1, tt.stmt, tt.chunkSize)
if err != nil {
t.Fatalf("failed to create mapper1: %s", err.Error())
}
executor := tsdb.NewExecutor(parsedSelectStmt, []tsdb.Mapper{mapper0, mapper1}, tt.chunkSize)
// Check the results.
got := executeAndGetResults(executor)
if got != tt.expected {
t.Fatalf("Test %s\nexp: %s\ngot: %s\n", tt.stmt, tt.expected, got)
}
}
}
// Test that executor correctly orders data across shards when the tagsets
// are not presented in alphabetically order across shards.
func TestWritePointsAndExecuteTwoShardsTagSetOrdering(t *testing.T) {
@ -749,7 +820,6 @@ func TestProcessRawQueryDerivative(t *testing.T) {
}
for i := 0; i < len(test.exp); i++ {
fmt.Println("Times:", test.exp[i].Time, got[i].Time)
if test.exp[i].Time != got[i].Time || math.Abs((test.exp[i].Value.(float64)-got[i].Value.(float64))) > 0.0000001 {
t.Fatalf("RawQueryDerivativeProcessor - %s results mismatch:\ngot %v\nexp %v", test.name, got, test.exp)
}

View File

@ -29,6 +29,7 @@ func (a MapperValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type MapperOutput struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Fields []string `json:"fields,omitempty"` // Field names of returned data.
Values []*MapperValue `json:"values,omitempty"` // For aggregates contains a single value at [0]
}
@ -64,18 +65,12 @@ type LocalMapper struct {
// NewLocalMapper returns a mapper for the given shard, which will return data for the SELECT statement.
func NewLocalMapper(shard *Shard, stmt influxql.Statement, chunkSize int) *LocalMapper {
m := &LocalMapper{
return &LocalMapper{
shard: shard,
stmt: stmt,
chunkSize: chunkSize,
cursors: make([]*tagSetCursor, 0),
}
if s, ok := stmt.(*influxql.SelectStatement); ok {
m.selectStmt = s
m.rawMode = (s.IsRawQuery && !s.HasDistinct()) || s.IsSimpleDerivative()
}
return m
}
// openMeta opens the mapper for a meta query.
@ -94,7 +89,14 @@ func (lm *LocalMapper) Open() error {
}
lm.tx = tx
if lm.selectStmt == nil {
if s, ok := lm.stmt.(*influxql.SelectStatement); ok {
stmt, err := lm.rewriteSelectStatement(s)
if err != nil {
return err
}
lm.selectStmt = stmt
lm.rawMode = (s.IsRawQuery && !s.HasDistinct()) || s.IsSimpleDerivative()
} else {
return lm.openMeta()
}
@ -278,8 +280,9 @@ func (lm *LocalMapper) nextChunkRaw() (interface{}, error) {
if output == nil {
output = &MapperOutput{
Name: cursor.measurement,
Tags: cursor.tags,
Name: cursor.measurement,
Tags: cursor.tags,
Fields: lm.selectFields,
}
}
value := &MapperValue{Time: k, Value: v}
@ -317,6 +320,7 @@ func (lm *LocalMapper) nextChunkAgg() (interface{}, error) {
output = &MapperOutput{
Name: tsc.measurement,
Tags: tsc.tags,
Fields: lm.selectFields,
Values: make([]*MapperValue, 1),
}
// Aggregate values only use the first entry in the Values field. Set the time
@ -419,11 +423,125 @@ func (lm *LocalMapper) initializeMapFunctions() error {
return nil
}
// rewriteSelectStatement performs any necessary query re-writing.
func (lm *LocalMapper) rewriteSelectStatement(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
var err error
// Expand regex expressions in the FROM clause.
sources, err := lm.expandSources(stmt.Sources)
if err != nil {
return nil, err
}
stmt.Sources = sources
// Expand wildcards in the fields or GROUP BY.
stmt, err = lm.expandWildcards(stmt)
if err != nil {
return nil, err
}
stmt.RewriteDistinct()
return stmt, nil
}
// expandWildcards returns a new SelectStatement with wildcards in the fields
// and/or GROUP BY expanded with actual field names.
func (lm *LocalMapper) expandWildcards(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
// If there are no wildcards in the statement, return it as-is.
if !stmt.HasWildcard() {
return stmt, nil
}
// Use sets to avoid duplicate field names.
fieldSet := map[string]struct{}{}
dimensionSet := map[string]struct{}{}
var fields influxql.Fields
var dimensions influxql.Dimensions
// Iterate measurements in the FROM clause getting the fields & dimensions for each.
for _, src := range stmt.Sources {
if m, ok := src.(*influxql.Measurement); ok {
// Lookup the measurement in the database.
mm := lm.shard.index.Measurement(m.Name)
if mm == nil {
// This shard have never received data for the measurement. No Mapper
// required.
return stmt, nil
}
// Get the fields for this measurement.
for _, name := range mm.FieldNames() {
if _, ok := fieldSet[name]; ok {
continue
}
fieldSet[name] = struct{}{}
fields = append(fields, &influxql.Field{Expr: &influxql.VarRef{Val: name}})
}
// Get the dimensions for this measurement.
for _, t := range mm.TagKeys() {
if _, ok := dimensionSet[t]; ok {
continue
}
dimensionSet[t] = struct{}{}
dimensions = append(dimensions, &influxql.Dimension{Expr: &influxql.VarRef{Val: t}})
}
}
}
// Return a new SelectStatement with the wild cards rewritten.
return stmt.RewriteWildcards(fields, dimensions), nil
}
// expandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (lm *LocalMapper) expandSources(sources influxql.Sources) (influxql.Sources, error) {
// Use a map as a set to prevent duplicates. Two regexes might produce
// duplicates when expanded.
set := map[string]influxql.Source{}
names := []string{}
// Iterate all sources, expanding regexes when they're found.
for _, source := range sources {
switch src := source.(type) {
case *influxql.Measurement:
if src.Regex == nil {
name := src.String()
set[name] = src
names = append(names, name)
continue
}
// Get measurements from the database that match the regex.
measurements := lm.shard.index.measurementsByRegex(src.Regex.Val)
// Add those measurements to the set.
for _, m := range measurements {
m2 := &influxql.Measurement{
Database: src.Database,
RetentionPolicy: src.RetentionPolicy,
Name: m.Name,
}
name := m2.String()
if _, ok := set[name]; !ok {
set[name] = m2
names = append(names, name)
}
}
default:
return nil, fmt.Errorf("expandSources: unsuported source type: %T", source)
}
}
// Sort the list of source names.
sort.Strings(names)
// Convert set to a list of Sources.
expanded := make(influxql.Sources, 0, len(set))
for _, name := range names {
expanded = append(expanded, set[name])
}
return expanded, nil
}
// TagSets returns the list of TagSets for which this mapper has data.
func (lm *LocalMapper) TagSets() []string {
return tagSetCursors(lm.cursors).Keys()
}
// Fields returns any SELECT fields. If this Mapper is not processing a SELECT query
// then an empty slice is returned.
func (lm *LocalMapper) Fields() []string {
return lm.selectFields
}
// Close closes the mapper.
func (lm *LocalMapper) Close() {
if lm != nil && lm.tx != nil {

View File

@ -15,7 +15,7 @@ import (
"github.com/influxdb/influxdb/tsdb"
)
func TestShardMapper_RawMapperTagSets(t *testing.T) {
func TestShardMapper_RawMapperTagSetsFields(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
shard := mustCreateShard(tmpDir)
@ -24,7 +24,7 @@ func TestShardMapper_RawMapperTagSets(t *testing.T) {
pt1 := tsdb.NewPoint(
"cpu",
map[string]string{"host": "serverA", "region": "us-east"},
map[string]interface{}{"load": 42},
map[string]interface{}{"idle": 60},
pt1time,
)
pt2time := time.Unix(2, 0).UTC()
@ -40,41 +40,67 @@ func TestShardMapper_RawMapperTagSets(t *testing.T) {
}
var tests = []struct {
stmt string
expected []string
stmt string
expectedTags []string
expectedFields []string
}{
{
stmt: `SELECT load FROM cpu`,
expected: []string{"cpu"},
stmt: `SELECT load FROM cpu`,
expectedTags: []string{"cpu"},
expectedFields: []string{"load"},
},
{
stmt: `SELECT load FROM cpu GROUP BY host`,
expected: []string{"cpu|host|serverA", "cpu|host|serverB"},
stmt: `SELECT derivative(load) FROM cpu`,
expectedTags: []string{"cpu"},
expectedFields: []string{"load"},
},
{
stmt: `SELECT load FROM cpu GROUP BY region`,
expected: []string{"cpu|region|us-east"},
stmt: `SELECT idle,load FROM cpu`,
expectedTags: []string{"cpu"},
expectedFields: []string{"idle", "load"},
},
{
stmt: `SELECT load FROM cpu WHERE host='serverA'`,
expected: []string{"cpu"},
stmt: `SELECT load,idle FROM cpu`,
expectedTags: []string{"cpu"},
expectedFields: []string{"idle", "load"},
},
{
stmt: `SELECT load FROM cpu WHERE host='serverB'`,
expected: []string{"cpu"},
stmt: `SELECT load FROM cpu GROUP BY host`,
expectedTags: []string{"cpu|host|serverA", "cpu|host|serverB"},
expectedFields: []string{"load"},
},
{
stmt: `SELECT load FROM cpu WHERE host='serverC'`,
expected: []string{},
stmt: `SELECT load FROM cpu GROUP BY region`,
expectedTags: []string{"cpu|region|us-east"},
expectedFields: []string{"load"},
},
{
stmt: `SELECT load FROM cpu WHERE host='serverA'`,
expectedTags: []string{"cpu"},
expectedFields: []string{"load"},
},
{
stmt: `SELECT load FROM cpu WHERE host='serverB'`,
expectedTags: []string{"cpu"},
expectedFields: []string{"load"},
},
{
stmt: `SELECT load FROM cpu WHERE host='serverC'`,
expectedTags: []string{},
expectedFields: []string{"load"},
},
}
for _, tt := range tests {
stmt := mustParseSelectStatement(tt.stmt)
mapper := openRawMapperOrFail(t, shard, stmt, 0)
got := mapper.TagSets()
if !reflect.DeepEqual(got, tt.expected) {
t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, got, tt.expected)
tags := mapper.TagSets()
if !reflect.DeepEqual(tags, tt.expectedTags) {
t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, tags, tt.expectedTags)
}
fields := mapper.Fields()
if !reflect.DeepEqual(fields, tt.expectedFields) {
t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, fields, tt.expectedFields)
}
}
}
@ -110,38 +136,41 @@ func TestShardMapper_WriteAndSingleMapperRawQuery(t *testing.T) {
}{
{
stmt: `SELECT load FROM cpu`,
expected: []string{`{"name":"cpu","values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`, `null`},
},
{
stmt: `SELECT load FROM cpu`,
chunkSize: 1,
expected: []string{`{"name":"cpu","values":[{"time":1000000000,"value":42}]}`, `{"name":"cpu","values":[{"time":2000000000,"value":60}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":1000000000,"value":42}]}`, `{"name":"cpu","fields":["load"],"values":[{"time":2000000000,"value":60}]}`, `null`},
},
{
stmt: `SELECT load FROM cpu`,
chunkSize: 2,
expected: []string{`{"name":"cpu","values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`},
},
{
stmt: `SELECT load FROM cpu`,
chunkSize: 3,
expected: []string{`{"name":"cpu","values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`},
},
{
stmt: `SELECT load FROM cpu GROUP BY host`,
expected: []string{`{"name":"cpu","tags":{"host":"serverA"},"values":[{"time":1000000000,"value":42}]}`, `{"name":"cpu","tags":{"host":"serverB"},"values":[{"time":2000000000,"value":60}]}`, `null`},
stmt: `SELECT load FROM cpu GROUP BY host`,
expected: []string{
`{"name":"cpu","tags":{"host":"serverA"},"fields":["load"],"values":[{"time":1000000000,"value":42}]}`,
`{"name":"cpu","tags":{"host":"serverB"},"fields":["load"],"values":[{"time":2000000000,"value":60}]}`,
`null`},
},
{
stmt: `SELECT load FROM cpu GROUP BY region`,
expected: []string{`{"name":"cpu","tags":{"region":"us-east"},"values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`, `null`},
expected: []string{`{"name":"cpu","tags":{"region":"us-east"},"fields":["load"],"values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`, `null`},
},
{
stmt: `SELECT load FROM cpu WHERE host='serverA'`,
expected: []string{`{"name":"cpu","values":[{"time":1000000000,"value":42}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":1000000000,"value":42}]}`, `null`},
},
{
stmt: `SELECT load FROM cpu WHERE host='serverB'`,
expected: []string{`{"name":"cpu","values":[{"time":2000000000,"value":60}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":2000000000,"value":60}]}`, `null`},
},
{
stmt: `SELECT load FROM cpu WHERE host='serverC'`,
@ -149,19 +178,19 @@ func TestShardMapper_WriteAndSingleMapperRawQuery(t *testing.T) {
},
{
stmt: `SELECT load FROM cpu WHERE load = 60`,
expected: []string{`{"name":"cpu","values":[{"time":2000000000,"value":60}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":2000000000,"value":60}]}`, `null`},
},
{
stmt: `SELECT load FROM cpu WHERE load != 60`,
expected: []string{`{"name":"cpu","values":[{"time":1000000000,"value":42}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":1000000000,"value":42}]}`, `null`},
},
{
stmt: fmt.Sprintf(`SELECT load FROM cpu WHERE time = '%s'`, pt1time.Format(influxql.DateTimeFormat)),
expected: []string{`{"name":"cpu","values":[{"time":1000000000,"value":42}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":1000000000,"value":42}]}`, `null`},
},
{
stmt: fmt.Sprintf(`SELECT load FROM cpu WHERE time > '%s'`, pt1time.Format(influxql.DateTimeFormat)),
expected: []string{`{"name":"cpu","values":[{"time":2000000000,"value":60}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["load"],"values":[{"time":2000000000,"value":60}]}`, `null`},
},
{
stmt: fmt.Sprintf(`SELECT load FROM cpu WHERE time > '%s'`, pt2time.Format(influxql.DateTimeFormat)),
@ -214,11 +243,11 @@ func TestShardMapper_WriteAndSingleMapperRawQueryMultiValue(t *testing.T) {
}{
{
stmt: `SELECT foo FROM cpu`,
expected: []string{`{"name":"cpu","values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["foo"],"values":[{"time":1000000000,"value":42},{"time":2000000000,"value":60}]}`, `null`},
},
{
stmt: `SELECT foo,bar FROM cpu`,
expected: []string{`{"name":"cpu","values":[{"time":1000000000,"value":{"bar":43,"foo":42}},{"time":2000000000,"value":{"bar":61,"foo":60}}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["bar","foo"],"values":[{"time":1000000000,"value":{"bar":43,"foo":42}},{"time":2000000000,"value":{"bar":61,"foo":60}}]}`, `null`},
},
}
@ -226,10 +255,10 @@ func TestShardMapper_WriteAndSingleMapperRawQueryMultiValue(t *testing.T) {
stmt := mustParseSelectStatement(tt.stmt)
mapper := openRawMapperOrFail(t, shard, stmt, tt.chunkSize)
for _, s := range tt.expected {
for i, s := range tt.expected {
got := nextRawChunkAsJson(t, mapper)
if got != s {
t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, got, tt.expected)
t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, got, tt.expected[i])
break
}
}
@ -267,15 +296,15 @@ func TestShardMapper_WriteAndSingleMapperRawQueryMultiSource(t *testing.T) {
}{
{
stmt: `SELECT foo FROM cpu0,cpu1`,
expected: []string{`{"name":"cpu0","values":[{"time":1000000000,"value":42}]}`, `null`},
expected: []string{`{"name":"cpu0","fields":["foo"],"values":[{"time":1000000000,"value":42}]}`, `null`},
},
{
stmt: `SELECT foo FROM cpu0,cpu1 WHERE foo=42`,
expected: []string{`{"name":"cpu0","values":[{"time":1000000000,"value":42}]}`, `null`},
expected: []string{`{"name":"cpu0","fields":["foo"],"values":[{"time":1000000000,"value":42}]}`, `null`},
},
{
stmt: `SELECT bar FROM cpu0,cpu1`,
expected: []string{`{"name":"cpu1","values":[{"time":2000000000,"value":60}]}`, `null`},
expected: []string{`{"name":"cpu1","fields":["bar"],"values":[{"time":2000000000,"value":60}]}`, `null`},
},
{
stmt: `SELECT bar FROM cpu0,cpu1 WHERE foo=42`,
@ -331,54 +360,54 @@ func TestShardMapper_WriteAndSingleMapperAggregateQuery(t *testing.T) {
}{
{
stmt: `SELECT sum(value) FROM cpu`,
expected: []string{`{"name":"cpu","values":[{"value":[61]}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["value"],"values":[{"value":[61]}]}`, `null`},
},
{
stmt: `SELECT sum(value),mean(value) FROM cpu`,
expected: []string{`{"name":"cpu","values":[{"value":[61,{"Count":2,"Mean":30.5,"ResultType":1}]}]}`, `null`},
expected: []string{`{"name":"cpu","fields":["value"],"values":[{"value":[61,{"Count":2,"Mean":30.5,"ResultType":1}]}]}`, `null`},
},
{
stmt: `SELECT sum(value) FROM cpu GROUP BY host`,
expected: []string{
`{"name":"cpu","tags":{"host":"serverA"},"values":[{"value":[1]}]}`,
`{"name":"cpu","tags":{"host":"serverB"},"values":[{"value":[60]}]}`,
`{"name":"cpu","tags":{"host":"serverA"},"fields":["value"],"values":[{"value":[1]}]}`,
`{"name":"cpu","tags":{"host":"serverB"},"fields":["value"],"values":[{"value":[60]}]}`,
`null`},
},
{
stmt: `SELECT sum(value) FROM cpu GROUP BY region`,
expected: []string{
`{"name":"cpu","tags":{"region":"us-east"},"values":[{"value":[61]}]}`,
`{"name":"cpu","tags":{"region":"us-east"},"fields":["value"],"values":[{"value":[61]}]}`,
`null`},
},
{
stmt: `SELECT sum(value) FROM cpu GROUP BY region,host`,
expected: []string{
`{"name":"cpu","tags":{"host":"serverA","region":"us-east"},"values":[{"value":[1]}]}`,
`{"name":"cpu","tags":{"host":"serverB","region":"us-east"},"values":[{"value":[60]}]}`,
`{"name":"cpu","tags":{"host":"serverA","region":"us-east"},"fields":["value"],"values":[{"value":[1]}]}`,
`{"name":"cpu","tags":{"host":"serverB","region":"us-east"},"fields":["value"],"values":[{"value":[60]}]}`,
`null`},
},
{
stmt: `SELECT sum(value) FROM cpu WHERE host='serverB'`,
expected: []string{
`{"name":"cpu","values":[{"value":[60]}]}`,
`{"name":"cpu","fields":["value"],"values":[{"value":[60]}]}`,
`null`},
},
{
stmt: fmt.Sprintf(`SELECT sum(value) FROM cpu WHERE time = '%s'`, pt1time.Format(influxql.DateTimeFormat)),
expected: []string{
`{"name":"cpu","values":[{"time":10000000000,"value":[1]}]}`,
`{"name":"cpu","fields":["value"],"values":[{"time":10000000000,"value":[1]}]}`,
`null`},
},
{
stmt: fmt.Sprintf(`SELECT sum(value) FROM cpu WHERE time > '%s'`, pt1time.Format(influxql.DateTimeFormat)),
expected: []string{
`{"name":"cpu","values":[{"time":10000000001,"value":[60]}]}`,
`{"name":"cpu","fields":["value"],"values":[{"time":10000000001,"value":[60]}]}`,
`null`},
},
{
stmt: fmt.Sprintf(`SELECT sum(value) FROM cpu WHERE time > '%s'`, pt2time.Format(influxql.DateTimeFormat)),
expected: []string{
`{"name":"cpu","values":[{"time":20000000001,"value":[null]}]}`,
`{"name":"cpu","fields":["value"],"values":[{"time":20000000001,"value":[null]}]}`,
`null`},
},
}

View File

@ -251,12 +251,6 @@ func (q *QueryExecutor) Plan(stmt *influxql.SelectStatement, chunkSize int) (*Ex
// executeSelectStatement plans and executes a select statement against a database.
func (q *QueryExecutor) executeSelectStatement(statementID int, stmt *influxql.SelectStatement, results chan *influxql.Result, chunkSize int) error {
// Perform any necessary query re-writing.
stmt, err := q.rewriteSelectStatement(stmt)
if err != nil {
return err
}
// Plan statement execution.
e, err := q.Plan(stmt, chunkSize)
if err != nil {
@ -283,85 +277,6 @@ func (q *QueryExecutor) executeSelectStatement(statementID int, stmt *influxql.S
return nil
}
// rewriteSelectStatement performs any necessary query re-writing.
func (q *QueryExecutor) rewriteSelectStatement(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
var err error
// Expand regex expressions in the FROM clause.
sources, err := q.expandSources(stmt.Sources)
if err != nil {
return nil, err
}
stmt.Sources = sources
// Expand wildcards in the fields or GROUP BY.
if stmt.HasWildcard() {
stmt, err = q.expandWildcards(stmt)
if err != nil {
return nil, err
}
}
stmt.RewriteDistinct()
return stmt, nil
}
// expandWildcards returns a new SelectStatement with wildcards in the fields
// and/or GROUP BY expanded with actual field names.
func (q *QueryExecutor) expandWildcards(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
// If there are no wildcards in the statement, return it as-is.
if !stmt.HasWildcard() {
return stmt, nil
}
// Use sets to avoid duplicate field names.
fieldSet := map[string]struct{}{}
dimensionSet := map[string]struct{}{}
var fields influxql.Fields
var dimensions influxql.Dimensions
// Iterate measurements in the FROM clause getting the fields & dimensions for each.
for _, src := range stmt.Sources {
if m, ok := src.(*influxql.Measurement); ok {
// Lookup the database. The database may not exist if no data for this database
// was ever written to the shard.
db := q.Store.DatabaseIndex(m.Database)
if db == nil {
return stmt, nil
}
// Lookup the measurement in the database.
mm := db.measurements[m.Name]
if mm == nil {
return nil, ErrMeasurementNotFound(m.String())
}
// Get the fields for this measurement.
for _, name := range mm.FieldNames() {
if _, ok := fieldSet[name]; ok {
continue
}
fieldSet[name] = struct{}{}
fields = append(fields, &influxql.Field{Expr: &influxql.VarRef{Val: name}})
}
// Get the dimensions for this measurement.
for _, t := range mm.TagKeys() {
if _, ok := dimensionSet[t]; ok {
continue
}
dimensionSet[t] = struct{}{}
dimensions = append(dimensions, &influxql.Dimension{Expr: &influxql.VarRef{Val: t}})
}
}
}
// Return a new SelectStatement with the wild cards rewritten.
return stmt.RewriteWildcards(fields, dimensions), nil
}
// expandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (q *QueryExecutor) expandSources(sources influxql.Sources) (influxql.Sources, error) {

View File

@ -160,7 +160,7 @@ func TestDropMeasurementStatement(t *testing.T) {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}
got = executeAndGetJSON("select * from memory", executor)
exepected = `[{"error":"measurement not found: \"foo\".\"foo\".memory"}]`
exepected = `[{}]`
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}