diff --git a/cluster/service.go b/cluster/service.go index 186cf8434c..a81fae46f5 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -11,6 +11,7 @@ import ( "strings" "sync" + "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/tsdb" ) @@ -37,7 +38,7 @@ type Service struct { TSDBStore interface { CreateShard(database, policy string, shardID uint64) error WriteToShard(shardID uint64, points []tsdb.Point) error - CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error) + CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) } Logger *log.Logger @@ -232,7 +233,15 @@ func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error { return err } - m, err := s.TSDBStore.CreateMapper(req.ShardID(), req.Query(), int(req.ChunkSize())) + // Parse the statement. + q, err := influxql.ParseQuery(req.Query()) + if err != nil { + return fmt.Errorf("processing map shard: %s", err) + } else if len(q.Statements) != 1 { + return fmt.Errorf("processing map shard: expected 1 statement but got %d", len(q.Statements)) + } + + m, err := s.TSDBStore.CreateMapper(req.ShardID(), q.Statements[0], int(req.ChunkSize())) if err != nil { return fmt.Errorf("create mapper: %s", err) } diff --git a/cluster/service_test.go b/cluster/service_test.go index 114135db86..3006bc4fe6 100644 --- a/cluster/service_test.go +++ b/cluster/service_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/influxdb/influxdb/cluster" + "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/tcp" "github.com/influxdb/influxdb/tsdb" @@ -28,7 +29,7 @@ type testService struct { muxln net.Listener writeShardFunc func(shardID uint64, points []tsdb.Point) error createShardFunc func(database, policy string, shardID uint64) error - createMapperFunc func(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error) + createMapperFunc func(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) } func newTestWriteService(f func(shardID uint64, points []tsdb.Point) error) testService { @@ -69,8 +70,8 @@ func (t testService) CreateShard(database, policy string, shardID uint64) error return t.createShardFunc(database, policy, shardID) } -func (t testService) CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error) { - return t.createMapperFunc(shardID, query, chunkSize) +func (t testService) CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) { + return t.createMapperFunc(shardID, stmt, chunkSize) } func writeShardSuccess(shardID uint64, points []tsdb.Point) error { diff --git a/cluster/shard_mapper.go b/cluster/shard_mapper.go index 9ed4fab5b8..1526c3e4ad 100644 --- a/cluster/shard_mapper.go +++ b/cluster/shard_mapper.go @@ -7,6 +7,7 @@ import ( "net" "time" + "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/tsdb" "gopkg.in/fatih/pool.v2" @@ -24,7 +25,7 @@ type ShardMapper struct { } TSDBStore interface { - CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error) + CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) } timeout time.Duration @@ -40,7 +41,7 @@ func NewShardMapper(timeout time.Duration) *ShardMapper { } // CreateMapper returns a Mapper for the given shard ID. -func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt string, chunkSize int) (tsdb.Mapper, error) { +func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) { m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize) if err != nil { return nil, err @@ -86,7 +87,7 @@ type remoteShardConn interface { // sends a query, and interprets the stream of data that comes back. type RemoteMapper struct { shardID uint64 - stmt string + stmt influxql.Statement chunkSize int tagsets []string @@ -97,7 +98,7 @@ type RemoteMapper struct { } // NewRemoteMapper returns a new remote mapper using the given connection. -func NewRemoteMapper(c remoteShardConn, shardID uint64, stmt string, chunkSize int) *RemoteMapper { +func NewRemoteMapper(c remoteShardConn, shardID uint64, stmt influxql.Statement, chunkSize int) *RemoteMapper { return &RemoteMapper{ conn: c, shardID: shardID, @@ -116,7 +117,7 @@ func (r *RemoteMapper) Open() (err error) { // Build Map request. var request MapShardRequest request.SetShardID(r.shardID) - request.SetQuery(r.stmt) + request.SetQuery(r.stmt.String()) request.SetChunkSize(int32(r.chunkSize)) // Marshal into protocol buffers. diff --git a/cluster/shard_mapper_test.go b/cluster/shard_mapper_test.go index 60f97cf24e..922934273c 100644 --- a/cluster/shard_mapper_test.go +++ b/cluster/shard_mapper_test.go @@ -3,9 +3,11 @@ package cluster import ( "bytes" "encoding/json" + "fmt" "io" "testing" + "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/tsdb" ) @@ -63,7 +65,7 @@ func TestShardWriter_RemoteMapper_Success(t *testing.T) { c := newRemoteShardResponder([]*tsdb.MapperOutput{expOutput, nil}, expTagSets) - r := NewRemoteMapper(c, 1234, "SELECT * FROM CPU", 10) + r := NewRemoteMapper(c, 1234, mustParseStmt("SELECT * FROM CPU"), 10) if err := r.Open(); err != nil { t.Fatalf("failed to open remote mapper: %s", err.Error()) } @@ -98,3 +100,14 @@ func TestShardWriter_RemoteMapper_Success(t *testing.T) { t.Fatal("received more chunks when none expected") } } + +// mustParseStmt parses a single statement or panics. +func mustParseStmt(stmt string) influxql.Statement { + q, err := influxql.ParseQuery(stmt) + if err != nil { + panic(err) + } else if len(q.Statements) != 1 { + panic(fmt.Sprintf("expected 1 statement but got %d", len(q.Statements))) + } + return q.Statements[0] +} diff --git a/meta/data.go b/meta/data.go index 2b9168901f..6e01f864aa 100644 --- a/meta/data.go +++ b/meta/data.go @@ -662,6 +662,31 @@ func (di DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo { return nil } +// ShardInfos returns a list of all shards' info for the database. +func (di DatabaseInfo) ShardInfos() []ShardInfo { + shards := map[uint64]*ShardInfo{} + for i := range di.RetentionPolicies { + for j := range di.RetentionPolicies[i].ShardGroups { + sg := di.RetentionPolicies[i].ShardGroups[j] + // Skip deleted shard groups + if sg.Deleted() { + continue + } + for k := range sg.Shards { + si := &di.RetentionPolicies[i].ShardGroups[j].Shards[k] + shards[si.ID] = si + } + } + } + + infos := make([]ShardInfo, 0, len(shards)) + for _, info := range shards { + infos = append(infos, *info) + } + + return infos +} + // clone returns a deep copy of di. func (di DatabaseInfo) clone() DatabaseInfo { other := di diff --git a/tsdb/engine/bz1/bz1.go b/tsdb/engine/bz1/bz1.go index 5ab1483c73..498f192fd0 100644 --- a/tsdb/engine/bz1/bz1.go +++ b/tsdb/engine/bz1/bz1.go @@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields return err } - // now flush the metadata that was in the WAL, but hand't yet been flushed + // now flush the metadata that was in the WAL, but hadn't yet been flushed if err := e.WAL.LoadMetadataIndex(index, measurementFields); err != nil { return err } diff --git a/tsdb/executor.go b/tsdb/executor.go index 3d62507d16..72dbeb676e 100644 --- a/tsdb/executor.go +++ b/tsdb/executor.go @@ -21,6 +21,11 @@ const ( IgnoredChunkSize = 0 ) +// Executor is an interface for a query executor. +type Executor interface { + Execute() <-chan *influxql.Row +} + // Mapper is the interface all Mapper types must implement. type Mapper interface { Open() error @@ -54,20 +59,20 @@ func (sm *StatefulMapper) NextChunk() (*MapperOutput, error) { return chunk, nil } -type Executor struct { +type SelectExecutor struct { stmt *influxql.SelectStatement mappers []*StatefulMapper chunkSize int limitedTagSets map[string]struct{} // Set tagsets for which data has reached the LIMIT. } -// NewExecutor returns a new Executor. -func NewExecutor(stmt *influxql.SelectStatement, mappers []Mapper, chunkSize int) *Executor { +// NewSelectExecutor returns a new SelectExecutor. +func NewSelectExecutor(stmt *influxql.SelectStatement, mappers []Mapper, chunkSize int) *SelectExecutor { a := []*StatefulMapper{} for _, m := range mappers { a = append(a, &StatefulMapper{m, nil, false}) } - return &Executor{ + return &SelectExecutor{ stmt: stmt, mappers: a, chunkSize: chunkSize, @@ -76,12 +81,12 @@ func NewExecutor(stmt *influxql.SelectStatement, mappers []Mapper, chunkSize int } // Execute begins execution of the query and returns a channel to receive rows. -func (e *Executor) Execute() <-chan *influxql.Row { +func (e *SelectExecutor) Execute() <-chan *influxql.Row { // Create output channel and stream data in a separate goroutine. out := make(chan *influxql.Row, 0) - // Certain operations on the SELECT statement can be performed by the Executor without - // assistance from the Mappers. This allows the Executor to prepare aggregation functions + // Certain operations on the SELECT statement can be performed by the SelectExecutor without + // assistance from the Mappers. This allows the SelectExecutor to prepare aggregation functions // and mathematical functions. e.stmt.RewriteDistinct() @@ -94,7 +99,7 @@ func (e *Executor) Execute() <-chan *influxql.Row { } // mappersDrained returns whether all the executors Mappers have been drained of data. -func (e *Executor) mappersDrained() bool { +func (e *SelectExecutor) mappersDrained() bool { for _, m := range e.mappers { if !m.drained { return false @@ -104,7 +109,7 @@ func (e *Executor) mappersDrained() bool { } // nextMapperTagset returns the alphabetically lowest tagset across all Mappers. -func (e *Executor) nextMapperTagSet() string { +func (e *SelectExecutor) nextMapperTagSet() string { tagset := "" for _, m := range e.mappers { if m.bufferedChunk != nil { @@ -119,7 +124,7 @@ func (e *Executor) nextMapperTagSet() string { } // nextMapperLowestTime returns the lowest minimum time across all Mappers, for the given tagset. -func (e *Executor) nextMapperLowestTime(tagset string) int64 { +func (e *SelectExecutor) nextMapperLowestTime(tagset string) int64 { minTime := int64(math.MaxInt64) for _, m := range e.mappers { if !m.drained && m.bufferedChunk != nil { @@ -136,17 +141,17 @@ func (e *Executor) nextMapperLowestTime(tagset string) int64 { } // tagSetIsLimited returns whether data for the given tagset has been LIMITed. -func (e *Executor) tagSetIsLimited(tagset string) bool { +func (e *SelectExecutor) tagSetIsLimited(tagset string) bool { _, ok := e.limitedTagSets[tagset] return ok } // limitTagSet marks the given taset as LIMITed. -func (e *Executor) limitTagSet(tagset string) { +func (e *SelectExecutor) limitTagSet(tagset string) { e.limitedTagSets[tagset] = struct{}{} } -func (e *Executor) executeRaw(out chan *influxql.Row) { +func (e *SelectExecutor) executeRaw(out chan *influxql.Row) { // It's important that all resources are released when execution completes. defer e.close() @@ -329,7 +334,7 @@ func (e *Executor) executeRaw(out chan *influxql.Row) { close(out) } -func (e *Executor) executeAggregate(out chan *influxql.Row) { +func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) { // It's important to close all resources when execution completes. defer e.close() @@ -493,7 +498,7 @@ func (e *Executor) executeAggregate(out chan *influxql.Row) { // processFill will take the results and return new results (or the same if no fill modifications are needed) // with whatever fill options the query has. -func (e *Executor) processFill(results [][]interface{}) [][]interface{} { +func (e *SelectExecutor) processFill(results [][]interface{}) [][]interface{} { // don't do anything if we're supposed to leave the nulls if e.stmt.Fill == influxql.NullFill { return results @@ -539,7 +544,7 @@ func (e *Executor) processFill(results [][]interface{}) [][]interface{} { } // processDerivative returns the derivatives of the results -func (e *Executor) processDerivative(results [][]interface{}) [][]interface{} { +func (e *SelectExecutor) processDerivative(results [][]interface{}) [][]interface{} { // Return early if we're not supposed to process the derivatives if e.stmt.HasDerivative() { interval, err := derivativeInterval(e.stmt) @@ -556,7 +561,7 @@ func (e *Executor) processDerivative(results [][]interface{}) [][]interface{} { // Close closes the executor such that all resources are released. Once closed, // an executor may not be re-used. -func (e *Executor) close() { +func (e *SelectExecutor) close() { if e != nil { for _, m := range e.mappers { m.Close() diff --git a/tsdb/executor_test.go b/tsdb/executor_test.go index b6c9342644..3bc911bc6c 100644 --- a/tsdb/executor_test.go +++ b/tsdb/executor_test.go @@ -139,7 +139,7 @@ func TestWritePointsAndExecuteTwoShards(t *testing.T) { t.Logf("Skipping test %s", tt.stmt) continue } - executor, err := query_executor.Plan(mustParseSelectStatement(tt.stmt), tt.chunkSize) + executor, err := query_executor.PlanSelect(mustParseSelectStatement(tt.stmt), tt.chunkSize) if err != nil { t.Fatalf("failed to plan query: %s", err.Error()) } @@ -238,7 +238,7 @@ func TestWritePointsAndExecuteTwoShardsAlign(t *testing.T) { t.Logf("Skipping test %s", tt.stmt) continue } - executor, err := query_executor.Plan(mustParseSelectStatement(tt.stmt), tt.chunkSize) + executor, err := query_executor.PlanSelect(mustParseSelectStatement(tt.stmt), tt.chunkSize) if err != nil { t.Fatalf("failed to plan query: %s", err.Error()) } @@ -306,15 +306,15 @@ func TestWritePointsAndExecuteTwoShardsQueryRewrite(t *testing.T) { parsedSelectStmt := mustParseSelectStatement(tt.stmt) // Create Mappers and Executor. - mapper0, err := store0.CreateMapper(sID0, tt.stmt, tt.chunkSize) + mapper0, err := store0.CreateMapper(sID0, parsedSelectStmt, tt.chunkSize) if err != nil { t.Fatalf("failed to create mapper0: %s", err.Error()) } - mapper1, err := store1.CreateMapper(sID1, tt.stmt, tt.chunkSize) + mapper1, err := store1.CreateMapper(sID1, parsedSelectStmt, tt.chunkSize) if err != nil { t.Fatalf("failed to create mapper1: %s", err.Error()) } - executor := tsdb.NewExecutor(parsedSelectStmt, []tsdb.Mapper{mapper0, mapper1}, tt.chunkSize) + executor := tsdb.NewSelectExecutor(parsedSelectStmt, []tsdb.Mapper{mapper0, mapper1}, tt.chunkSize) // Check the results. got := executeAndGetResults(executor) @@ -421,7 +421,7 @@ func TestWritePointsAndExecuteTwoShardsTagSetOrdering(t *testing.T) { t.Logf("Skipping test %s", tt.stmt) continue } - executor, err := query_executor.Plan(mustParseSelectStatement(tt.stmt), tt.chunkSize) + executor, err := query_executor.PlanSelect(mustParseSelectStatement(tt.stmt), tt.chunkSize) if err != nil { t.Fatalf("failed to plan query: %s", err.Error()) } @@ -432,6 +432,86 @@ func TestWritePointsAndExecuteTwoShardsTagSetOrdering(t *testing.T) { } } +// Test to ensure the engine handles measurements across stores. +func TestWritePointsAndExecuteTwoShardsShowMeasurements(t *testing.T) { + // Create two distinct stores, ensuring shard mappers will shard nothing. + store0 := testStore() + defer os.RemoveAll(store0.Path()) + store1 := testStore() + defer os.RemoveAll(store1.Path()) + + // Create a shard in each store. + database := "foo" + retentionPolicy := "bar" + store0.CreateShard(database, retentionPolicy, sID0) + store1.CreateShard(database, retentionPolicy, sID1) + + // Write two points across shards. + pt1time := time.Unix(1, 0).UTC() + if err := store0.WriteToShard(sID0, []tsdb.Point{tsdb.NewPoint( + "cpu", + map[string]string{"host": "serverA"}, + map[string]interface{}{"value1": 100}, + pt1time, + )}); err != nil { + t.Fatalf(err.Error()) + } + pt2time := time.Unix(2, 0).UTC() + if err := store1.WriteToShard(sID1, []tsdb.Point{tsdb.NewPoint( + "mem", + map[string]string{"host": "serverB"}, + map[string]interface{}{"value2": 200}, + pt2time, + )}); err != nil { + t.Fatalf(err.Error()) + } + var tests = []struct { + skip bool // Skip test + stmt string // Query statement + chunkSize int // Chunk size for driving the executor + expected string // Expected results, rendered as a string + }{ + { + stmt: `SHOW MEASUREMENTS`, + expected: `[{"name":"measurements","columns":["name"],"values":[["cpu"],["mem"]]}]`, + }, + { + stmt: `SHOW MEASUREMENTS WHERE host='serverB'`, + expected: `[{"name":"measurements","columns":["name"],"values":[["mem"]]}]`, + }, + { + stmt: `SHOW MEASUREMENTS WHERE host='serverX'`, + expected: `null`, + }, + } + for _, tt := range tests { + if tt.skip { + t.Logf("Skipping test %s", tt.stmt) + continue + } + + parsedStmt := mustParseStatement(tt.stmt).(*influxql.ShowMeasurementsStatement) + + // Create Mappers and Executor. + mapper0, err := store0.CreateMapper(sID0, parsedStmt, tt.chunkSize) + if err != nil { + t.Fatalf("failed to create mapper0: %s", err.Error()) + } + mapper1, err := store1.CreateMapper(sID1, parsedStmt, tt.chunkSize) + if err != nil { + t.Fatalf("failed to create mapper1: %s", err.Error()) + } + executor := tsdb.NewShowMeasurementsExecutor(parsedStmt, []tsdb.Mapper{mapper0, mapper1}, tt.chunkSize) + + // Check the results. + got := executeAndGetResults(executor) + if got != tt.expected { + t.Fatalf("Test %s\nexp: %s\ngot: %s\n", tt.stmt, tt.expected, got) + } + + } +} + // TestProccessAggregateDerivative tests the RawQueryDerivativeProcessor transformation function on the engine. // The is called for a query with a GROUP BY. func TestProcessAggregateDerivative(t *testing.T) { @@ -974,11 +1054,11 @@ type testQEShardMapper struct { store *tsdb.Store } -func (t *testQEShardMapper) CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (tsdb.Mapper, error) { +func (t *testQEShardMapper) CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) { return t.store.CreateMapper(shard.ID, stmt, chunkSize) } -func executeAndGetResults(executor *tsdb.Executor) string { +func executeAndGetResults(executor tsdb.Executor) string { ch := executor.Execute() var rows []*influxql.Row diff --git a/tsdb/mapper.go b/tsdb/mapper.go index 634e4ae4d5..6bcff36251 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -40,8 +40,8 @@ func (mo *MapperOutput) key() string { return mo.cursorKey } -// LocalMapper is for retrieving data for a query, from a given shard. -type LocalMapper struct { +// SelectMapper is for retrieving data for a query, from a given shard. +type SelectMapper struct { shard *Shard remote Mapper stmt influxql.Statement @@ -67,9 +67,9 @@ type LocalMapper struct { fieldNames []string // the field name being read for mapping. } -// NewLocalMapper returns a mapper for the given shard, which will return data for the SELECT statement. -func NewLocalMapper(shard *Shard, stmt influxql.Statement, chunkSize int) *LocalMapper { - return &LocalMapper{ +// NewSelectMapper returns a mapper for the given shard, which will return data for the SELECT statement. +func NewSelectMapper(shard *Shard, stmt influxql.Statement, chunkSize int) *SelectMapper { + return &SelectMapper{ shard: shard, stmt: stmt, chunkSize: chunkSize, @@ -78,12 +78,12 @@ func NewLocalMapper(shard *Shard, stmt influxql.Statement, chunkSize int) *Local } // openMeta opens the mapper for a meta query. -func (lm *LocalMapper) openMeta() error { +func (lm *SelectMapper) openMeta() error { return errors.New("not implemented") } // Open opens the local mapper. -func (lm *LocalMapper) Open() error { +func (lm *SelectMapper) Open() error { if lm.remote != nil { return lm.remote.Open() } @@ -260,12 +260,12 @@ func (lm *LocalMapper) Open() error { return nil } -func (lm *LocalMapper) SetRemote(m Mapper) error { +func (lm *SelectMapper) SetRemote(m Mapper) error { lm.remote = m return nil } -func (lm *LocalMapper) NextChunk() (interface{}, error) { +func (lm *SelectMapper) NextChunk() (interface{}, error) { // If set, use remote mapper. if lm.remote != nil { b, err := lm.remote.NextChunk() @@ -296,7 +296,7 @@ func (lm *LocalMapper) NextChunk() (interface{}, error) { // nextChunkRaw returns the next chunk of data. Data comes in the same order as the // tags return by TagSets. A chunk never contains data for more than 1 tagset. // If there is no more data for any tagset, nil will be returned. -func (lm *LocalMapper) nextChunkRaw() (interface{}, error) { +func (lm *SelectMapper) nextChunkRaw() (interface{}, error) { var output *MapperOutput for { if lm.currCursorIndex == len(lm.cursors) { @@ -338,7 +338,7 @@ func (lm *LocalMapper) nextChunkRaw() (interface{}, error) { // for the current tagset. Tagsets are always processed in the same order as that // returned by AvailTagsSets(). When there is no more data for any tagset nil // is returned. -func (lm *LocalMapper) nextChunkAgg() (interface{}, error) { +func (lm *SelectMapper) nextChunkAgg() (interface{}, error) { var output *MapperOutput for { if lm.currCursorIndex == len(lm.cursors) { @@ -418,7 +418,7 @@ func (lm *LocalMapper) nextChunkAgg() (interface{}, error) { // nextInterval returns the next interval for which to return data. If start is less than 0 // there are no more intervals. -func (lm *LocalMapper) nextInterval() (start, end int64) { +func (lm *SelectMapper) nextInterval() (start, end int64) { t := lm.queryTMinWindow + int64(lm.currInterval+lm.selectStmt.Offset)*lm.intervalSize // Onto next interval. @@ -433,7 +433,7 @@ func (lm *LocalMapper) nextInterval() (start, end int64) { // initializeMapFunctions initialize the mapping functions for the mapper. This only applies // to aggregate queries. -func (lm *LocalMapper) initializeMapFunctions() error { +func (lm *SelectMapper) initializeMapFunctions() error { var err error // Set up each mapping function for this statement. aggregates := lm.selectStmt.FunctionCalls() @@ -467,10 +467,10 @@ func (lm *LocalMapper) initializeMapFunctions() error { } // rewriteSelectStatement performs any necessary query re-writing. -func (lm *LocalMapper) rewriteSelectStatement(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) { +func (lm *SelectMapper) rewriteSelectStatement(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) { var err error // Expand regex expressions in the FROM clause. - sources, err := lm.expandSources(stmt.Sources) + sources, err := expandSources(stmt.Sources, lm.shard.index) if err != nil { return nil, err } @@ -488,7 +488,7 @@ func (lm *LocalMapper) rewriteSelectStatement(stmt *influxql.SelectStatement) (* // If only a `SELECT *` is present, without a `GROUP BY *`, both tags and fields expand in the SELECT // If a `SELECT *` and a `GROUP BY *` are both present, then only fiels are expanded in the `SELECT` and only // tags are expanded in the `GROUP BY` -func (lm *LocalMapper) expandWildcards(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) { +func (lm *SelectMapper) expandWildcards(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) { // If there are no wildcards in the statement, return it as-is. if !stmt.HasWildcard() { return stmt, nil @@ -550,54 +550,8 @@ func (lm *LocalMapper) expandWildcards(stmt *influxql.SelectStatement) (*influxq return stmt.RewriteWildcards(fields, dimensions), nil } -// expandSources expands regex sources and removes duplicates. -// NOTE: sources must be normalized (db and rp set) before calling this function. -func (lm *LocalMapper) expandSources(sources influxql.Sources) (influxql.Sources, error) { - // Use a map as a set to prevent duplicates. Two regexes might produce - // duplicates when expanded. - set := map[string]influxql.Source{} - names := []string{} - // Iterate all sources, expanding regexes when they're found. - for _, source := range sources { - switch src := source.(type) { - case *influxql.Measurement: - if src.Regex == nil { - name := src.String() - set[name] = src - names = append(names, name) - continue - } - // Get measurements from the database that match the regex. - measurements := lm.shard.index.measurementsByRegex(src.Regex.Val) - // Add those measurements to the set. - for _, m := range measurements { - m2 := &influxql.Measurement{ - Database: src.Database, - RetentionPolicy: src.RetentionPolicy, - Name: m.Name, - } - name := m2.String() - if _, ok := set[name]; !ok { - set[name] = m2 - names = append(names, name) - } - } - default: - return nil, fmt.Errorf("expandSources: unsuported source type: %T", source) - } - } - // Sort the list of source names. - sort.Strings(names) - // Convert set to a list of Sources. - expanded := make(influxql.Sources, 0, len(set)) - for _, name := range names { - expanded = append(expanded, set[name]) - } - return expanded, nil -} - // TagSets returns the list of TagSets for which this mapper has data. -func (lm *LocalMapper) TagSets() []string { +func (lm *SelectMapper) TagSets() []string { if lm.remote != nil { return lm.remote.TagSets() } @@ -606,7 +560,7 @@ func (lm *LocalMapper) TagSets() []string { // Fields returns any SELECT fields. If this Mapper is not processing a SELECT query // then an empty slice is returned. -func (lm *LocalMapper) Fields() []string { +func (lm *SelectMapper) Fields() []string { if lm.remote != nil { return lm.remote.Fields() } @@ -614,7 +568,7 @@ func (lm *LocalMapper) Fields() []string { } // Close closes the mapper. -func (lm *LocalMapper) Close() { +func (lm *SelectMapper) Close() { if lm.remote != nil { lm.remote.Close() return @@ -855,6 +809,52 @@ type tagSetsAndFields struct { whereFields []string } +// expandSources expands regex sources and removes duplicates. +// NOTE: sources must be normalized (db and rp set) before calling this function. +func expandSources(sources influxql.Sources, di *DatabaseIndex) (influxql.Sources, error) { + // Use a map as a set to prevent duplicates. Two regexes might produce + // duplicates when expanded. + set := map[string]influxql.Source{} + names := []string{} + // Iterate all sources, expanding regexes when they're found. + for _, source := range sources { + switch src := source.(type) { + case *influxql.Measurement: + if src.Regex == nil { + name := src.String() + set[name] = src + names = append(names, name) + continue + } + // Get measurements from the database that match the regex. + measurements := di.measurementsByRegex(src.Regex.Val) + // Add those measurements to the set. + for _, m := range measurements { + m2 := &influxql.Measurement{ + Database: src.Database, + RetentionPolicy: src.RetentionPolicy, + Name: m.Name, + } + name := m2.String() + if _, ok := set[name]; !ok { + set[name] = m2 + names = append(names, name) + } + } + default: + return nil, fmt.Errorf("expandSources: unsuported source type: %T", source) + } + } + // Sort the list of source names. + sort.Strings(names) + // Convert set to a list of Sources. + expanded := make(influxql.Sources, 0, len(set)) + for _, name := range names { + expanded = append(expanded, set[name]) + } + return expanded, nil +} + // createTagSetsAndFields returns the tagsets and various fields given a measurement and // SELECT statement. func createTagSetsAndFields(m *Measurement, stmt *influxql.SelectStatement) (*tagSetsAndFields, error) { diff --git a/tsdb/mapper_test.go b/tsdb/mapper_test.go index 150bc15ad2..1002d333fa 100644 --- a/tsdb/mapper_test.go +++ b/tsdb/mapper_test.go @@ -415,7 +415,7 @@ func TestShardMapper_WriteAndSingleMapperAggregateQuery(t *testing.T) { for _, tt := range tests { stmt := mustParseSelectStatement(tt.stmt) - mapper := openLocalMapperOrFail(t, shard, stmt) + mapper := openSelectMapperOrFail(t, shard, stmt) for i := range tt.expected { got := aggIntervalAsJson(t, mapper) @@ -427,7 +427,7 @@ func TestShardMapper_WriteAndSingleMapperAggregateQuery(t *testing.T) { } } -func TestShardMapper_LocalMapperTagSetsFields(t *testing.T) { +func TestShardMapper_SelectMapperTagSetsFields(t *testing.T) { tmpDir, _ := ioutil.TempDir("", "shard_test") defer os.RemoveAll(tmpDir) shard := mustCreateShard(tmpDir) @@ -490,7 +490,7 @@ func TestShardMapper_LocalMapperTagSetsFields(t *testing.T) { for _, tt := range tests { stmt := mustParseSelectStatement(tt.stmt) - mapper := openLocalMapperOrFail(t, shard, stmt) + mapper := openSelectMapperOrFail(t, shard, stmt) fields := mapper.Fields() if !reflect.DeepEqual(fields, tt.expectedFields) { @@ -526,8 +526,17 @@ func mustParseSelectStatement(s string) *influxql.SelectStatement { return stmt.(*influxql.SelectStatement) } +// mustParseStatement parses a statement. Panic on error. +func mustParseStatement(s string) influxql.Statement { + stmt, err := influxql.NewParser(strings.NewReader(s)).ParseStatement() + if err != nil { + panic(err) + } + return stmt +} + func openRawMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement, chunkSize int) tsdb.Mapper { - mapper := tsdb.NewLocalMapper(shard, stmt, chunkSize) + mapper := tsdb.NewSelectMapper(shard, stmt, chunkSize) if err := mapper.Open(); err != nil { t.Fatalf("failed to open raw mapper: %s", err.Error()) @@ -547,8 +556,8 @@ func nextRawChunkAsJson(t *testing.T, mapper tsdb.Mapper) string { return string(b) } -func openLocalMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement) *tsdb.LocalMapper { - mapper := tsdb.NewLocalMapper(shard, stmt, 0) +func openSelectMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement) *tsdb.SelectMapper { + mapper := tsdb.NewSelectMapper(shard, stmt, 0) if err := mapper.Open(); err != nil { t.Fatalf("failed to open aggregate mapper: %s", err.Error()) @@ -556,7 +565,7 @@ func openLocalMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.Selec return mapper } -func aggIntervalAsJson(t *testing.T, mapper *tsdb.LocalMapper) string { +func aggIntervalAsJson(t *testing.T, mapper *tsdb.SelectMapper) string { r, err := mapper.NextChunk() if err != nil { t.Fatalf("failed to get chunk from aggregate mapper: %s", err.Error()) diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index 14acc11b2e..d4c9196ed2 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -38,7 +38,7 @@ type QueryExecutor struct { // Maps shards for queries. ShardMapper interface { - CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (Mapper, error) + CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (Mapper, error) } Logger *log.Logger @@ -156,7 +156,10 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu // TODO: handle this in a cluster res = q.executeDropMeasurementStatement(stmt, database) case *influxql.ShowMeasurementsStatement: - res = q.executeShowMeasurementsStatement(stmt, database) + if err := q.executeShowMeasurementsStatement(i, stmt, database, results, chunkSize); err != nil { + results <- &influxql.Result{Err: err} + break + } case *influxql.ShowTagKeysStatement: res = q.executeShowTagKeysStatement(stmt, database) case *influxql.ShowTagValuesStatement: @@ -199,7 +202,7 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu } // Plan creates an execution plan for the given SelectStatement and returns an Executor. -func (q *QueryExecutor) Plan(stmt *influxql.SelectStatement, chunkSize int) (*Executor, error) { +func (q *QueryExecutor) PlanSelect(stmt *influxql.SelectStatement, chunkSize int) (Executor, error) { shards := map[uint64]meta.ShardInfo{} // Shards requiring mappers. // Replace instances of "now()" with the current time, and check the resultant times. @@ -234,7 +237,7 @@ func (q *QueryExecutor) Plan(stmt *influxql.SelectStatement, chunkSize int) (*Ex // Build the Mappers, one per shard. mappers := []Mapper{} for _, sh := range shards { - m, err := q.ShardMapper.CreateMapper(sh, stmt.String(), chunkSize) + m, err := q.ShardMapper.CreateMapper(sh, stmt, chunkSize) if err != nil { return nil, err } @@ -245,14 +248,14 @@ func (q *QueryExecutor) Plan(stmt *influxql.SelectStatement, chunkSize int) (*Ex mappers = append(mappers, m) } - executor := NewExecutor(stmt, mappers, chunkSize) + executor := NewSelectExecutor(stmt, mappers, chunkSize) return executor, nil } // executeSelectStatement plans and executes a select statement against a database. func (q *QueryExecutor) executeSelectStatement(statementID int, stmt *influxql.SelectStatement, results chan *influxql.Result, chunkSize int) error { // Plan statement execution. - e, err := q.Plan(stmt, chunkSize) + e, err := q.PlanSelect(stmt, chunkSize) if err != nil { return err } @@ -547,63 +550,62 @@ func (q *QueryExecutor) filterShowSeriesResult(limit, offset int, rows influxql. return filteredSeries } -func (q *QueryExecutor) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string) *influxql.Result { - // Find the database. - db := q.Store.DatabaseIndex(database) - if db == nil { - return &influxql.Result{} +// PlanShowMeasurements creates an execution plan for the given SelectStatement and returns an Executor. +func (q *QueryExecutor) PlanShowMeasurements(stmt *influxql.ShowMeasurementsStatement, database string, chunkSize int) (Executor, error) { + // Get the database info. + di, err := q.MetaStore.Database(database) + if err != nil { + return nil, err + } else if di == nil { + return nil, ErrDatabaseNotFound(database) } - var measurements Measurements + // Get info for all shards in the database. + shards := di.ShardInfos() - // If a WHERE clause was specified, filter the measurements. - if stmt.Condition != nil { - var err error - measurements, err = db.measurementsByExpr(stmt.Condition) + // Build the Mappers, one per shard. + mappers := []Mapper{} + for _, sh := range shards { + m, err := q.ShardMapper.CreateMapper(sh, stmt, chunkSize) if err != nil { - return &influxql.Result{Err: err} + return nil, err } - } else { - // Otherwise, get all measurements from the database. - measurements = db.Measurements() - } - sort.Sort(measurements) - - offset := stmt.Offset - limit := stmt.Limit - - // If OFFSET is past the end of the array, return empty results. - if offset > len(measurements)-1 { - return &influxql.Result{} + if m == nil { + // No data for this shard, skip it. + continue + } + mappers = append(mappers, m) } - // Calculate last index based on LIMIT. - end := len(measurements) - if limit > 0 && offset+limit < end { - limit = offset + limit - } else { - limit = end + executor := NewShowMeasurementsExecutor(stmt, mappers, chunkSize) + return executor, nil +} + +func (q *QueryExecutor) executeShowMeasurementsStatement(statementID int, stmt *influxql.ShowMeasurementsStatement, database string, results chan *influxql.Result, chunkSize int) error { + // Plan statement execution. + e, err := q.PlanShowMeasurements(stmt, database, chunkSize) + if err != nil { + return err } - // Make a result row to hold all measurement names. - row := &influxql.Row{ - Name: "measurements", - Columns: []string{"name"}, + // Execute plan. + ch := e.Execute() + + // Stream results from the channel. We should send an empty result if nothing comes through. + resultSent := false + for row := range ch { + if row.Err != nil { + return row.Err + } + resultSent = true + results <- &influxql.Result{StatementID: statementID, Series: []*influxql.Row{row}} } - // Add one value to the row for each measurement name. - for i := offset; i < limit; i++ { - m := measurements[i] - v := interface{}(m.Name) - row.Values = append(row.Values, []interface{}{v}) + if !resultSent { + results <- &influxql.Result{StatementID: statementID, Series: make([]*influxql.Row, 0)} } - // Make a result. - result := &influxql.Result{ - Series: []*influxql.Row{row}, - } - - return result + return nil } func (q *QueryExecutor) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement, database string) *influxql.Result { diff --git a/tsdb/query_executor_test.go b/tsdb/query_executor_test.go index 2a0d3e581f..89f06a1007 100644 --- a/tsdb/query_executor_test.go +++ b/tsdb/query_executor_test.go @@ -18,7 +18,7 @@ var sgID = uint64(2) var shardID = uint64(1) func TestWritePointsAndExecuteQuery(t *testing.T) { - store, executor := testStoreAndExecutor() + store, executor := testStoreAndExecutor("") defer os.RemoveAll(store.Path()) // Write first point. @@ -71,7 +71,7 @@ func TestWritePointsAndExecuteQuery(t *testing.T) { // Ensure writing a point and updating it results in only a single point. func TestWritePointsAndExecuteQuery_Update(t *testing.T) { - store, executor := testStoreAndExecutor() + store, executor := testStoreAndExecutor("") defer os.RemoveAll(store.Path()) // Write original point. @@ -113,7 +113,7 @@ func TestWritePointsAndExecuteQuery_Update(t *testing.T) { } func TestDropSeriesStatement(t *testing.T) { - store, executor := testStoreAndExecutor() + store, executor := testStoreAndExecutor("") defer os.RemoveAll(store.Path()) pt := tsdb.NewPoint( @@ -169,7 +169,7 @@ func TestDropSeriesStatement(t *testing.T) { } func TestDropMeasurementStatement(t *testing.T) { - store, executor := testStoreAndExecutor() + store, executor := testStoreAndExecutor("") defer os.RemoveAll(store.Path()) pt := tsdb.NewPoint( @@ -221,11 +221,7 @@ func TestDropMeasurementStatement(t *testing.T) { validateDrop() store.Close() - conf := store.EngineOptions.Config - store = tsdb.NewStore(store.Path()) - store.EngineOptions.Config = conf - store.Open() - executor.Store = store + store, executor = testStoreAndExecutor(store.Path()) validateDrop() } @@ -239,7 +235,7 @@ func (m *metaExec) ExecuteStatement(stmt influxql.Statement) *influxql.Result { } func TestDropDatabase(t *testing.T) { - store, executor := testStoreAndExecutor() + store, executor := testStoreAndExecutor("") defer os.RemoveAll(store.Path()) pt := tsdb.NewPoint( @@ -301,7 +297,7 @@ func TestDropDatabase(t *testing.T) { // Ensure that queries for which there is no data result in an empty set. func TestQueryNoData(t *testing.T) { - store, executor := testStoreAndExecutor() + store, executor := testStoreAndExecutor("") defer os.RemoveAll(store.Path()) got := executeAndGetJSON("select * from /.*/", executor) @@ -322,7 +318,7 @@ func TestQueryNoData(t *testing.T) { // ensure that authenticate doesn't return an error if the user count is zero and they're attempting // to create a user. func TestAuthenticateIfUserCountZeroAndCreateUser(t *testing.T) { - store, executor := testStoreAndExecutor() + store, executor := testStoreAndExecutor("") defer os.RemoveAll(store.Path()) ms := &testMetastore{userCount: 0} executor.MetaStore = ms @@ -350,11 +346,13 @@ func TestAuthenticateIfUserCountZeroAndCreateUser(t *testing.T) { } } -func testStoreAndExecutor() (*tsdb.Store, *tsdb.QueryExecutor) { - path, _ := ioutil.TempDir("", "") +func testStoreAndExecutor(storePath string) (*tsdb.Store, *tsdb.QueryExecutor) { + if storePath == "" { + storePath, _ = ioutil.TempDir("", "") + } - store := tsdb.NewStore(path) - store.EngineOptions.Config.WALDir = filepath.Join(path, "wal") + store := tsdb.NewStore(storePath) + store.EngineOptions.Config.WALDir = filepath.Join(storePath, "wal") err := store.Open() if err != nil { @@ -479,7 +477,7 @@ type testShardMapper struct { store *tsdb.Store } -func (t *testShardMapper) CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (tsdb.Mapper, error) { +func (t *testShardMapper) CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) { m, err := t.store.CreateMapper(shard.ID, stmt, chunkSize) return m, err } diff --git a/tsdb/show_measurements.go b/tsdb/show_measurements.go new file mode 100644 index 0000000000..3326c6a32d --- /dev/null +++ b/tsdb/show_measurements.go @@ -0,0 +1,236 @@ +package tsdb + +import ( + "encoding/json" + "fmt" + "sort" + + "github.com/influxdb/influxdb/influxql" +) + +// ShowMeasurementsExecutor implements the Executor interface for a SHOW MEASUREMENTS statement. +type ShowMeasurementsExecutor struct { + stmt *influxql.ShowMeasurementsStatement + mappers []Mapper + chunkSize int +} + +// NewShowMeasurementsExecutor returns a new ShowMeasurementsExecutor. +func NewShowMeasurementsExecutor(stmt *influxql.ShowMeasurementsStatement, mappers []Mapper, chunkSize int) *ShowMeasurementsExecutor { + return &ShowMeasurementsExecutor{ + stmt: stmt, + mappers: mappers, + chunkSize: chunkSize, + } +} + +// Execute begins execution of the query and returns a channel to receive rows. +func (e *ShowMeasurementsExecutor) Execute() <-chan *influxql.Row { + // Create output channel and stream data in a separate goroutine. + out := make(chan *influxql.Row, 0) + + // It's important that all resources are released when execution completes. + defer e.close() + + go func() { + // Open the mappers. + for _, m := range e.mappers { + if err := m.Open(); err != nil { + out <- &influxql.Row{Err: err} + return + } + } + + // Create a set to hold measurement names from mappers. + set := map[string]struct{}{} + // Iterate through mappers collecting measurement names. + for _, m := range e.mappers { + // Get the data from the mapper. + c, err := m.NextChunk() + if err != nil { + out <- &influxql.Row{Err: err} + return + } else if c == nil { + // Mapper had no data. + continue + } + + // Convert the mapper chunk to a string array of measurement names. + mms, ok := c.([]string) + if !ok { + out <- &influxql.Row{Err: fmt.Errorf("show measurements mapper returned invalid type: %T", c)} + return + } + + // Add the measurement names to the set. + for _, mm := range mms { + set[mm] = struct{}{} + } + } + + // Convert the set into an array of measurement names. + measurements := make([]string, 0, len(set)) + for mm := range set { + measurements = append(measurements, mm) + } + // Sort the names. + sort.Strings(measurements) + + // Calculate OFFSET and LIMIT + off := e.stmt.Offset + lim := len(measurements) + stmtLim := e.stmt.Limit + + if stmtLim > 0 && off+stmtLim < lim { + lim = off + stmtLim + } else if off > lim { + off, lim = 0, 0 + } + + // Put the results in a row and send it. + row := &influxql.Row{ + Name: "measurements", + Columns: []string{"name"}, + Values: make([][]interface{}, 0, len(measurements)), + } + + for _, m := range measurements[off:lim] { + v := []interface{}{m} + row.Values = append(row.Values, v) + } + + if len(row.Values) > 0 { + out <- row + } + + close(out) + }() + return out +} + +// Close closes the executor such that all resources are released. Once closed, +// an executor may not be re-used. +func (e *ShowMeasurementsExecutor) close() { + if e != nil { + for _, m := range e.mappers { + m.Close() + } + } +} + +// ShowMeasurementsMapper is a mapper for collecting measurement names from a shard. +type ShowMeasurementsMapper struct { + remote Mapper + shard *Shard + stmt *influxql.ShowMeasurementsStatement + chunkSize int + state interface{} +} + +// NewShowMeasurementsMapper returns a mapper for the given shard, which will return data for the meta statement. +func NewShowMeasurementsMapper(shard *Shard, stmt *influxql.ShowMeasurementsStatement, chunkSize int) *ShowMeasurementsMapper { + return &ShowMeasurementsMapper{ + shard: shard, + stmt: stmt, + chunkSize: chunkSize, + } +} + +// Open opens the mapper for use. +func (m *ShowMeasurementsMapper) Open() error { + if m.remote != nil { + return m.remote.Open() + } + + var measurements Measurements + + // If a WHERE clause was specified, filter the measurements. + if m.stmt.Condition != nil { + var err error + measurements, err = m.shard.index.measurementsByExpr(m.stmt.Condition) + if err != nil { + return err + } + } else { + // Otherwise, get all measurements from the database. + measurements = m.shard.index.Measurements() + } + sort.Sort(measurements) + + // Create a channel to send measurement names on. + ch := make(chan string) + // Start a goroutine to send the names over the channel as needed. + go func() { + for _, mm := range measurements { + ch <- mm.Name + } + close(ch) + }() + + // Store the channel as the state of the mapper. + m.state = ch + + return nil +} + +// SetRemote sets the remote mapper to use. +func (m *ShowMeasurementsMapper) SetRemote(remote Mapper) error { + m.remote = remote + return nil +} + +// TagSets is only implemented on this mapper to satisfy the Mapper interface. +func (m *ShowMeasurementsMapper) TagSets() []string { return nil } + +// Fields returns a list of field names for this mapper. +func (m *ShowMeasurementsMapper) Fields() []string { return []string{"name"} } + +// NextChunk returns the next chunk of measurement names. +func (m *ShowMeasurementsMapper) NextChunk() (interface{}, error) { + if m.remote != nil { + b, err := m.remote.NextChunk() + if err != nil { + return nil, err + } else if b == nil { + return nil, nil + } + + names := []string{} + if err := json.Unmarshal(b.([]byte), &names); err != nil { + return nil, err + } else if len(names) == 0 { + // Mapper on other node sent 0 values so it's done. + return nil, nil + } + return names, nil + } + return m.nextChunk() +} + +// nextChunk implements next chunk logic for a local shard. +func (m *ShowMeasurementsMapper) nextChunk() (interface{}, error) { + // Allocate array to hold measurement names. + names := make([]string, 0, m.chunkSize) + // Get the channel of measurement names from the state. + measurementNames := m.state.(chan string) + // Get the next chunk of names. + for n := range measurementNames { + names = append(names, n) + if len(names) == m.chunkSize { + break + } + } + // See if we've read all the names. + if len(names) == 0 { + return nil, nil + } + + return names, nil +} + +// Close closes the mapper. +func (m *ShowMeasurementsMapper) Close() { + if m.remote != nil { + m.remote.Close() + } +} diff --git a/tsdb/store.go b/tsdb/store.go index b2ade9a276..2bf20996c6 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -303,23 +303,21 @@ func (s *Store) WriteToShard(shardID uint64, points []Point) error { return sh.WritePoints(points) } -func (s *Store) CreateMapper(shardID uint64, query string, chunkSize int) (Mapper, error) { - q, err := influxql.NewParser(strings.NewReader(query)).ParseStatement() - if err != nil { - return nil, err - } - stmt, ok := q.(*influxql.SelectStatement) - if !ok { - return nil, fmt.Errorf("query is not a SELECT statement: %s", err.Error()) - } - +func (s *Store) CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (Mapper, error) { shard := s.Shard(shardID) if shard == nil { // This can happen if the shard has been assigned, but hasn't actually been created yet. return nil, nil } - return NewLocalMapper(shard, stmt, chunkSize), nil + switch st := stmt.(type) { + case *influxql.SelectStatement: + return NewSelectMapper(shard, st, chunkSize), nil + case *influxql.ShowMeasurementsStatement: + return NewShowMeasurementsMapper(shard, st, chunkSize), nil + default: + return nil, fmt.Errorf("can't create mapper for statement type: %v", st) + } } func (s *Store) Close() error {