Merge pull request #3808 from influxdb/dmq-show-measurements2

convert SHOW MEASUREMENTS to a distributed query
pull/3853/head
dgnorton 2015-08-26 11:43:38 -04:00
commit 2cf6233cbc
14 changed files with 564 additions and 187 deletions

View File

@ -11,6 +11,7 @@ import (
"strings"
"sync"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
)
@ -37,7 +38,7 @@ type Service struct {
TSDBStore interface {
CreateShard(database, policy string, shardID uint64) error
WriteToShard(shardID uint64, points []tsdb.Point) error
CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error)
}
Logger *log.Logger
@ -232,7 +233,15 @@ func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error {
return err
}
m, err := s.TSDBStore.CreateMapper(req.ShardID(), req.Query(), int(req.ChunkSize()))
// Parse the statement.
q, err := influxql.ParseQuery(req.Query())
if err != nil {
return fmt.Errorf("processing map shard: %s", err)
} else if len(q.Statements) != 1 {
return fmt.Errorf("processing map shard: expected 1 statement but got %d", len(q.Statements))
}
m, err := s.TSDBStore.CreateMapper(req.ShardID(), q.Statements[0], int(req.ChunkSize()))
if err != nil {
return fmt.Errorf("create mapper: %s", err)
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
@ -28,7 +29,7 @@ type testService struct {
muxln net.Listener
writeShardFunc func(shardID uint64, points []tsdb.Point) error
createShardFunc func(database, policy string, shardID uint64) error
createMapperFunc func(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
createMapperFunc func(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error)
}
func newTestWriteService(f func(shardID uint64, points []tsdb.Point) error) testService {
@ -69,8 +70,8 @@ func (t testService) CreateShard(database, policy string, shardID uint64) error
return t.createShardFunc(database, policy, shardID)
}
func (t testService) CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error) {
return t.createMapperFunc(shardID, query, chunkSize)
func (t testService) CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
return t.createMapperFunc(shardID, stmt, chunkSize)
}
func writeShardSuccess(shardID uint64, points []tsdb.Point) error {

View File

@ -7,6 +7,7 @@ import (
"net"
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
"gopkg.in/fatih/pool.v2"
@ -24,7 +25,7 @@ type ShardMapper struct {
}
TSDBStore interface {
CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error)
}
timeout time.Duration
@ -40,7 +41,7 @@ func NewShardMapper(timeout time.Duration) *ShardMapper {
}
// CreateMapper returns a Mapper for the given shard ID.
func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt string, chunkSize int) (tsdb.Mapper, error) {
func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
@ -86,7 +87,7 @@ type remoteShardConn interface {
// sends a query, and interprets the stream of data that comes back.
type RemoteMapper struct {
shardID uint64
stmt string
stmt influxql.Statement
chunkSize int
tagsets []string
@ -97,7 +98,7 @@ type RemoteMapper struct {
}
// NewRemoteMapper returns a new remote mapper using the given connection.
func NewRemoteMapper(c remoteShardConn, shardID uint64, stmt string, chunkSize int) *RemoteMapper {
func NewRemoteMapper(c remoteShardConn, shardID uint64, stmt influxql.Statement, chunkSize int) *RemoteMapper {
return &RemoteMapper{
conn: c,
shardID: shardID,
@ -116,7 +117,7 @@ func (r *RemoteMapper) Open() (err error) {
// Build Map request.
var request MapShardRequest
request.SetShardID(r.shardID)
request.SetQuery(r.stmt)
request.SetQuery(r.stmt.String())
request.SetChunkSize(int32(r.chunkSize))
// Marshal into protocol buffers.

View File

@ -3,9 +3,11 @@ package cluster
import (
"bytes"
"encoding/json"
"fmt"
"io"
"testing"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
)
@ -63,7 +65,7 @@ func TestShardWriter_RemoteMapper_Success(t *testing.T) {
c := newRemoteShardResponder([]*tsdb.MapperOutput{expOutput, nil}, expTagSets)
r := NewRemoteMapper(c, 1234, "SELECT * FROM CPU", 10)
r := NewRemoteMapper(c, 1234, mustParseStmt("SELECT * FROM CPU"), 10)
if err := r.Open(); err != nil {
t.Fatalf("failed to open remote mapper: %s", err.Error())
}
@ -98,3 +100,14 @@ func TestShardWriter_RemoteMapper_Success(t *testing.T) {
t.Fatal("received more chunks when none expected")
}
}
// mustParseStmt parses a single statement or panics.
func mustParseStmt(stmt string) influxql.Statement {
q, err := influxql.ParseQuery(stmt)
if err != nil {
panic(err)
} else if len(q.Statements) != 1 {
panic(fmt.Sprintf("expected 1 statement but got %d", len(q.Statements)))
}
return q.Statements[0]
}

View File

@ -662,6 +662,31 @@ func (di DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo {
return nil
}
// ShardInfos returns a list of all shards' info for the database.
func (di DatabaseInfo) ShardInfos() []ShardInfo {
shards := map[uint64]*ShardInfo{}
for i := range di.RetentionPolicies {
for j := range di.RetentionPolicies[i].ShardGroups {
sg := di.RetentionPolicies[i].ShardGroups[j]
// Skip deleted shard groups
if sg.Deleted() {
continue
}
for k := range sg.Shards {
si := &di.RetentionPolicies[i].ShardGroups[j].Shards[k]
shards[si.ID] = si
}
}
}
infos := make([]ShardInfo, 0, len(shards))
for _, info := range shards {
infos = append(infos, *info)
}
return infos
}
// clone returns a deep copy of di.
func (di DatabaseInfo) clone() DatabaseInfo {
other := di

View File

@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
return err
}
// now flush the metadata that was in the WAL, but hand't yet been flushed
// now flush the metadata that was in the WAL, but hadn't yet been flushed
if err := e.WAL.LoadMetadataIndex(index, measurementFields); err != nil {
return err
}

View File

@ -21,6 +21,11 @@ const (
IgnoredChunkSize = 0
)
// Executor is an interface for a query executor.
type Executor interface {
Execute() <-chan *influxql.Row
}
// Mapper is the interface all Mapper types must implement.
type Mapper interface {
Open() error
@ -54,20 +59,20 @@ func (sm *StatefulMapper) NextChunk() (*MapperOutput, error) {
return chunk, nil
}
type Executor struct {
type SelectExecutor struct {
stmt *influxql.SelectStatement
mappers []*StatefulMapper
chunkSize int
limitedTagSets map[string]struct{} // Set tagsets for which data has reached the LIMIT.
}
// NewExecutor returns a new Executor.
func NewExecutor(stmt *influxql.SelectStatement, mappers []Mapper, chunkSize int) *Executor {
// NewSelectExecutor returns a new SelectExecutor.
func NewSelectExecutor(stmt *influxql.SelectStatement, mappers []Mapper, chunkSize int) *SelectExecutor {
a := []*StatefulMapper{}
for _, m := range mappers {
a = append(a, &StatefulMapper{m, nil, false})
}
return &Executor{
return &SelectExecutor{
stmt: stmt,
mappers: a,
chunkSize: chunkSize,
@ -76,12 +81,12 @@ func NewExecutor(stmt *influxql.SelectStatement, mappers []Mapper, chunkSize int
}
// Execute begins execution of the query and returns a channel to receive rows.
func (e *Executor) Execute() <-chan *influxql.Row {
func (e *SelectExecutor) Execute() <-chan *influxql.Row {
// Create output channel and stream data in a separate goroutine.
out := make(chan *influxql.Row, 0)
// Certain operations on the SELECT statement can be performed by the Executor without
// assistance from the Mappers. This allows the Executor to prepare aggregation functions
// Certain operations on the SELECT statement can be performed by the SelectExecutor without
// assistance from the Mappers. This allows the SelectExecutor to prepare aggregation functions
// and mathematical functions.
e.stmt.RewriteDistinct()
@ -94,7 +99,7 @@ func (e *Executor) Execute() <-chan *influxql.Row {
}
// mappersDrained returns whether all the executors Mappers have been drained of data.
func (e *Executor) mappersDrained() bool {
func (e *SelectExecutor) mappersDrained() bool {
for _, m := range e.mappers {
if !m.drained {
return false
@ -104,7 +109,7 @@ func (e *Executor) mappersDrained() bool {
}
// nextMapperTagset returns the alphabetically lowest tagset across all Mappers.
func (e *Executor) nextMapperTagSet() string {
func (e *SelectExecutor) nextMapperTagSet() string {
tagset := ""
for _, m := range e.mappers {
if m.bufferedChunk != nil {
@ -119,7 +124,7 @@ func (e *Executor) nextMapperTagSet() string {
}
// nextMapperLowestTime returns the lowest minimum time across all Mappers, for the given tagset.
func (e *Executor) nextMapperLowestTime(tagset string) int64 {
func (e *SelectExecutor) nextMapperLowestTime(tagset string) int64 {
minTime := int64(math.MaxInt64)
for _, m := range e.mappers {
if !m.drained && m.bufferedChunk != nil {
@ -136,17 +141,17 @@ func (e *Executor) nextMapperLowestTime(tagset string) int64 {
}
// tagSetIsLimited returns whether data for the given tagset has been LIMITed.
func (e *Executor) tagSetIsLimited(tagset string) bool {
func (e *SelectExecutor) tagSetIsLimited(tagset string) bool {
_, ok := e.limitedTagSets[tagset]
return ok
}
// limitTagSet marks the given taset as LIMITed.
func (e *Executor) limitTagSet(tagset string) {
func (e *SelectExecutor) limitTagSet(tagset string) {
e.limitedTagSets[tagset] = struct{}{}
}
func (e *Executor) executeRaw(out chan *influxql.Row) {
func (e *SelectExecutor) executeRaw(out chan *influxql.Row) {
// It's important that all resources are released when execution completes.
defer e.close()
@ -329,7 +334,7 @@ func (e *Executor) executeRaw(out chan *influxql.Row) {
close(out)
}
func (e *Executor) executeAggregate(out chan *influxql.Row) {
func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) {
// It's important to close all resources when execution completes.
defer e.close()
@ -493,7 +498,7 @@ func (e *Executor) executeAggregate(out chan *influxql.Row) {
// processFill will take the results and return new results (or the same if no fill modifications are needed)
// with whatever fill options the query has.
func (e *Executor) processFill(results [][]interface{}) [][]interface{} {
func (e *SelectExecutor) processFill(results [][]interface{}) [][]interface{} {
// don't do anything if we're supposed to leave the nulls
if e.stmt.Fill == influxql.NullFill {
return results
@ -539,7 +544,7 @@ func (e *Executor) processFill(results [][]interface{}) [][]interface{} {
}
// processDerivative returns the derivatives of the results
func (e *Executor) processDerivative(results [][]interface{}) [][]interface{} {
func (e *SelectExecutor) processDerivative(results [][]interface{}) [][]interface{} {
// Return early if we're not supposed to process the derivatives
if e.stmt.HasDerivative() {
interval, err := derivativeInterval(e.stmt)
@ -556,7 +561,7 @@ func (e *Executor) processDerivative(results [][]interface{}) [][]interface{} {
// Close closes the executor such that all resources are released. Once closed,
// an executor may not be re-used.
func (e *Executor) close() {
func (e *SelectExecutor) close() {
if e != nil {
for _, m := range e.mappers {
m.Close()

View File

@ -139,7 +139,7 @@ func TestWritePointsAndExecuteTwoShards(t *testing.T) {
t.Logf("Skipping test %s", tt.stmt)
continue
}
executor, err := query_executor.Plan(mustParseSelectStatement(tt.stmt), tt.chunkSize)
executor, err := query_executor.PlanSelect(mustParseSelectStatement(tt.stmt), tt.chunkSize)
if err != nil {
t.Fatalf("failed to plan query: %s", err.Error())
}
@ -238,7 +238,7 @@ func TestWritePointsAndExecuteTwoShardsAlign(t *testing.T) {
t.Logf("Skipping test %s", tt.stmt)
continue
}
executor, err := query_executor.Plan(mustParseSelectStatement(tt.stmt), tt.chunkSize)
executor, err := query_executor.PlanSelect(mustParseSelectStatement(tt.stmt), tt.chunkSize)
if err != nil {
t.Fatalf("failed to plan query: %s", err.Error())
}
@ -306,15 +306,15 @@ func TestWritePointsAndExecuteTwoShardsQueryRewrite(t *testing.T) {
parsedSelectStmt := mustParseSelectStatement(tt.stmt)
// Create Mappers and Executor.
mapper0, err := store0.CreateMapper(sID0, tt.stmt, tt.chunkSize)
mapper0, err := store0.CreateMapper(sID0, parsedSelectStmt, tt.chunkSize)
if err != nil {
t.Fatalf("failed to create mapper0: %s", err.Error())
}
mapper1, err := store1.CreateMapper(sID1, tt.stmt, tt.chunkSize)
mapper1, err := store1.CreateMapper(sID1, parsedSelectStmt, tt.chunkSize)
if err != nil {
t.Fatalf("failed to create mapper1: %s", err.Error())
}
executor := tsdb.NewExecutor(parsedSelectStmt, []tsdb.Mapper{mapper0, mapper1}, tt.chunkSize)
executor := tsdb.NewSelectExecutor(parsedSelectStmt, []tsdb.Mapper{mapper0, mapper1}, tt.chunkSize)
// Check the results.
got := executeAndGetResults(executor)
@ -421,7 +421,7 @@ func TestWritePointsAndExecuteTwoShardsTagSetOrdering(t *testing.T) {
t.Logf("Skipping test %s", tt.stmt)
continue
}
executor, err := query_executor.Plan(mustParseSelectStatement(tt.stmt), tt.chunkSize)
executor, err := query_executor.PlanSelect(mustParseSelectStatement(tt.stmt), tt.chunkSize)
if err != nil {
t.Fatalf("failed to plan query: %s", err.Error())
}
@ -432,6 +432,86 @@ func TestWritePointsAndExecuteTwoShardsTagSetOrdering(t *testing.T) {
}
}
// Test to ensure the engine handles measurements across stores.
func TestWritePointsAndExecuteTwoShardsShowMeasurements(t *testing.T) {
// Create two distinct stores, ensuring shard mappers will shard nothing.
store0 := testStore()
defer os.RemoveAll(store0.Path())
store1 := testStore()
defer os.RemoveAll(store1.Path())
// Create a shard in each store.
database := "foo"
retentionPolicy := "bar"
store0.CreateShard(database, retentionPolicy, sID0)
store1.CreateShard(database, retentionPolicy, sID1)
// Write two points across shards.
pt1time := time.Unix(1, 0).UTC()
if err := store0.WriteToShard(sID0, []tsdb.Point{tsdb.NewPoint(
"cpu",
map[string]string{"host": "serverA"},
map[string]interface{}{"value1": 100},
pt1time,
)}); err != nil {
t.Fatalf(err.Error())
}
pt2time := time.Unix(2, 0).UTC()
if err := store1.WriteToShard(sID1, []tsdb.Point{tsdb.NewPoint(
"mem",
map[string]string{"host": "serverB"},
map[string]interface{}{"value2": 200},
pt2time,
)}); err != nil {
t.Fatalf(err.Error())
}
var tests = []struct {
skip bool // Skip test
stmt string // Query statement
chunkSize int // Chunk size for driving the executor
expected string // Expected results, rendered as a string
}{
{
stmt: `SHOW MEASUREMENTS`,
expected: `[{"name":"measurements","columns":["name"],"values":[["cpu"],["mem"]]}]`,
},
{
stmt: `SHOW MEASUREMENTS WHERE host='serverB'`,
expected: `[{"name":"measurements","columns":["name"],"values":[["mem"]]}]`,
},
{
stmt: `SHOW MEASUREMENTS WHERE host='serverX'`,
expected: `null`,
},
}
for _, tt := range tests {
if tt.skip {
t.Logf("Skipping test %s", tt.stmt)
continue
}
parsedStmt := mustParseStatement(tt.stmt).(*influxql.ShowMeasurementsStatement)
// Create Mappers and Executor.
mapper0, err := store0.CreateMapper(sID0, parsedStmt, tt.chunkSize)
if err != nil {
t.Fatalf("failed to create mapper0: %s", err.Error())
}
mapper1, err := store1.CreateMapper(sID1, parsedStmt, tt.chunkSize)
if err != nil {
t.Fatalf("failed to create mapper1: %s", err.Error())
}
executor := tsdb.NewShowMeasurementsExecutor(parsedStmt, []tsdb.Mapper{mapper0, mapper1}, tt.chunkSize)
// Check the results.
got := executeAndGetResults(executor)
if got != tt.expected {
t.Fatalf("Test %s\nexp: %s\ngot: %s\n", tt.stmt, tt.expected, got)
}
}
}
// TestProccessAggregateDerivative tests the RawQueryDerivativeProcessor transformation function on the engine.
// The is called for a query with a GROUP BY.
func TestProcessAggregateDerivative(t *testing.T) {
@ -974,11 +1054,11 @@ type testQEShardMapper struct {
store *tsdb.Store
}
func (t *testQEShardMapper) CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (tsdb.Mapper, error) {
func (t *testQEShardMapper) CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
return t.store.CreateMapper(shard.ID, stmt, chunkSize)
}
func executeAndGetResults(executor *tsdb.Executor) string {
func executeAndGetResults(executor tsdb.Executor) string {
ch := executor.Execute()
var rows []*influxql.Row

View File

@ -40,8 +40,8 @@ func (mo *MapperOutput) key() string {
return mo.cursorKey
}
// LocalMapper is for retrieving data for a query, from a given shard.
type LocalMapper struct {
// SelectMapper is for retrieving data for a query, from a given shard.
type SelectMapper struct {
shard *Shard
remote Mapper
stmt influxql.Statement
@ -67,9 +67,9 @@ type LocalMapper struct {
fieldNames []string // the field name being read for mapping.
}
// NewLocalMapper returns a mapper for the given shard, which will return data for the SELECT statement.
func NewLocalMapper(shard *Shard, stmt influxql.Statement, chunkSize int) *LocalMapper {
return &LocalMapper{
// NewSelectMapper returns a mapper for the given shard, which will return data for the SELECT statement.
func NewSelectMapper(shard *Shard, stmt influxql.Statement, chunkSize int) *SelectMapper {
return &SelectMapper{
shard: shard,
stmt: stmt,
chunkSize: chunkSize,
@ -78,12 +78,12 @@ func NewLocalMapper(shard *Shard, stmt influxql.Statement, chunkSize int) *Local
}
// openMeta opens the mapper for a meta query.
func (lm *LocalMapper) openMeta() error {
func (lm *SelectMapper) openMeta() error {
return errors.New("not implemented")
}
// Open opens the local mapper.
func (lm *LocalMapper) Open() error {
func (lm *SelectMapper) Open() error {
if lm.remote != nil {
return lm.remote.Open()
}
@ -260,12 +260,12 @@ func (lm *LocalMapper) Open() error {
return nil
}
func (lm *LocalMapper) SetRemote(m Mapper) error {
func (lm *SelectMapper) SetRemote(m Mapper) error {
lm.remote = m
return nil
}
func (lm *LocalMapper) NextChunk() (interface{}, error) {
func (lm *SelectMapper) NextChunk() (interface{}, error) {
// If set, use remote mapper.
if lm.remote != nil {
b, err := lm.remote.NextChunk()
@ -296,7 +296,7 @@ func (lm *LocalMapper) NextChunk() (interface{}, error) {
// nextChunkRaw returns the next chunk of data. Data comes in the same order as the
// tags return by TagSets. A chunk never contains data for more than 1 tagset.
// If there is no more data for any tagset, nil will be returned.
func (lm *LocalMapper) nextChunkRaw() (interface{}, error) {
func (lm *SelectMapper) nextChunkRaw() (interface{}, error) {
var output *MapperOutput
for {
if lm.currCursorIndex == len(lm.cursors) {
@ -338,7 +338,7 @@ func (lm *LocalMapper) nextChunkRaw() (interface{}, error) {
// for the current tagset. Tagsets are always processed in the same order as that
// returned by AvailTagsSets(). When there is no more data for any tagset nil
// is returned.
func (lm *LocalMapper) nextChunkAgg() (interface{}, error) {
func (lm *SelectMapper) nextChunkAgg() (interface{}, error) {
var output *MapperOutput
for {
if lm.currCursorIndex == len(lm.cursors) {
@ -418,7 +418,7 @@ func (lm *LocalMapper) nextChunkAgg() (interface{}, error) {
// nextInterval returns the next interval for which to return data. If start is less than 0
// there are no more intervals.
func (lm *LocalMapper) nextInterval() (start, end int64) {
func (lm *SelectMapper) nextInterval() (start, end int64) {
t := lm.queryTMinWindow + int64(lm.currInterval+lm.selectStmt.Offset)*lm.intervalSize
// Onto next interval.
@ -433,7 +433,7 @@ func (lm *LocalMapper) nextInterval() (start, end int64) {
// initializeMapFunctions initialize the mapping functions for the mapper. This only applies
// to aggregate queries.
func (lm *LocalMapper) initializeMapFunctions() error {
func (lm *SelectMapper) initializeMapFunctions() error {
var err error
// Set up each mapping function for this statement.
aggregates := lm.selectStmt.FunctionCalls()
@ -467,10 +467,10 @@ func (lm *LocalMapper) initializeMapFunctions() error {
}
// rewriteSelectStatement performs any necessary query re-writing.
func (lm *LocalMapper) rewriteSelectStatement(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
func (lm *SelectMapper) rewriteSelectStatement(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
var err error
// Expand regex expressions in the FROM clause.
sources, err := lm.expandSources(stmt.Sources)
sources, err := expandSources(stmt.Sources, lm.shard.index)
if err != nil {
return nil, err
}
@ -488,7 +488,7 @@ func (lm *LocalMapper) rewriteSelectStatement(stmt *influxql.SelectStatement) (*
// If only a `SELECT *` is present, without a `GROUP BY *`, both tags and fields expand in the SELECT
// If a `SELECT *` and a `GROUP BY *` are both present, then only fiels are expanded in the `SELECT` and only
// tags are expanded in the `GROUP BY`
func (lm *LocalMapper) expandWildcards(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
func (lm *SelectMapper) expandWildcards(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
// If there are no wildcards in the statement, return it as-is.
if !stmt.HasWildcard() {
return stmt, nil
@ -550,54 +550,8 @@ func (lm *LocalMapper) expandWildcards(stmt *influxql.SelectStatement) (*influxq
return stmt.RewriteWildcards(fields, dimensions), nil
}
// expandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (lm *LocalMapper) expandSources(sources influxql.Sources) (influxql.Sources, error) {
// Use a map as a set to prevent duplicates. Two regexes might produce
// duplicates when expanded.
set := map[string]influxql.Source{}
names := []string{}
// Iterate all sources, expanding regexes when they're found.
for _, source := range sources {
switch src := source.(type) {
case *influxql.Measurement:
if src.Regex == nil {
name := src.String()
set[name] = src
names = append(names, name)
continue
}
// Get measurements from the database that match the regex.
measurements := lm.shard.index.measurementsByRegex(src.Regex.Val)
// Add those measurements to the set.
for _, m := range measurements {
m2 := &influxql.Measurement{
Database: src.Database,
RetentionPolicy: src.RetentionPolicy,
Name: m.Name,
}
name := m2.String()
if _, ok := set[name]; !ok {
set[name] = m2
names = append(names, name)
}
}
default:
return nil, fmt.Errorf("expandSources: unsuported source type: %T", source)
}
}
// Sort the list of source names.
sort.Strings(names)
// Convert set to a list of Sources.
expanded := make(influxql.Sources, 0, len(set))
for _, name := range names {
expanded = append(expanded, set[name])
}
return expanded, nil
}
// TagSets returns the list of TagSets for which this mapper has data.
func (lm *LocalMapper) TagSets() []string {
func (lm *SelectMapper) TagSets() []string {
if lm.remote != nil {
return lm.remote.TagSets()
}
@ -606,7 +560,7 @@ func (lm *LocalMapper) TagSets() []string {
// Fields returns any SELECT fields. If this Mapper is not processing a SELECT query
// then an empty slice is returned.
func (lm *LocalMapper) Fields() []string {
func (lm *SelectMapper) Fields() []string {
if lm.remote != nil {
return lm.remote.Fields()
}
@ -614,7 +568,7 @@ func (lm *LocalMapper) Fields() []string {
}
// Close closes the mapper.
func (lm *LocalMapper) Close() {
func (lm *SelectMapper) Close() {
if lm.remote != nil {
lm.remote.Close()
return
@ -855,6 +809,52 @@ type tagSetsAndFields struct {
whereFields []string
}
// expandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func expandSources(sources influxql.Sources, di *DatabaseIndex) (influxql.Sources, error) {
// Use a map as a set to prevent duplicates. Two regexes might produce
// duplicates when expanded.
set := map[string]influxql.Source{}
names := []string{}
// Iterate all sources, expanding regexes when they're found.
for _, source := range sources {
switch src := source.(type) {
case *influxql.Measurement:
if src.Regex == nil {
name := src.String()
set[name] = src
names = append(names, name)
continue
}
// Get measurements from the database that match the regex.
measurements := di.measurementsByRegex(src.Regex.Val)
// Add those measurements to the set.
for _, m := range measurements {
m2 := &influxql.Measurement{
Database: src.Database,
RetentionPolicy: src.RetentionPolicy,
Name: m.Name,
}
name := m2.String()
if _, ok := set[name]; !ok {
set[name] = m2
names = append(names, name)
}
}
default:
return nil, fmt.Errorf("expandSources: unsuported source type: %T", source)
}
}
// Sort the list of source names.
sort.Strings(names)
// Convert set to a list of Sources.
expanded := make(influxql.Sources, 0, len(set))
for _, name := range names {
expanded = append(expanded, set[name])
}
return expanded, nil
}
// createTagSetsAndFields returns the tagsets and various fields given a measurement and
// SELECT statement.
func createTagSetsAndFields(m *Measurement, stmt *influxql.SelectStatement) (*tagSetsAndFields, error) {

View File

@ -415,7 +415,7 @@ func TestShardMapper_WriteAndSingleMapperAggregateQuery(t *testing.T) {
for _, tt := range tests {
stmt := mustParseSelectStatement(tt.stmt)
mapper := openLocalMapperOrFail(t, shard, stmt)
mapper := openSelectMapperOrFail(t, shard, stmt)
for i := range tt.expected {
got := aggIntervalAsJson(t, mapper)
@ -427,7 +427,7 @@ func TestShardMapper_WriteAndSingleMapperAggregateQuery(t *testing.T) {
}
}
func TestShardMapper_LocalMapperTagSetsFields(t *testing.T) {
func TestShardMapper_SelectMapperTagSetsFields(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
shard := mustCreateShard(tmpDir)
@ -490,7 +490,7 @@ func TestShardMapper_LocalMapperTagSetsFields(t *testing.T) {
for _, tt := range tests {
stmt := mustParseSelectStatement(tt.stmt)
mapper := openLocalMapperOrFail(t, shard, stmt)
mapper := openSelectMapperOrFail(t, shard, stmt)
fields := mapper.Fields()
if !reflect.DeepEqual(fields, tt.expectedFields) {
@ -526,8 +526,17 @@ func mustParseSelectStatement(s string) *influxql.SelectStatement {
return stmt.(*influxql.SelectStatement)
}
// mustParseStatement parses a statement. Panic on error.
func mustParseStatement(s string) influxql.Statement {
stmt, err := influxql.NewParser(strings.NewReader(s)).ParseStatement()
if err != nil {
panic(err)
}
return stmt
}
func openRawMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement, chunkSize int) tsdb.Mapper {
mapper := tsdb.NewLocalMapper(shard, stmt, chunkSize)
mapper := tsdb.NewSelectMapper(shard, stmt, chunkSize)
if err := mapper.Open(); err != nil {
t.Fatalf("failed to open raw mapper: %s", err.Error())
@ -547,8 +556,8 @@ func nextRawChunkAsJson(t *testing.T, mapper tsdb.Mapper) string {
return string(b)
}
func openLocalMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement) *tsdb.LocalMapper {
mapper := tsdb.NewLocalMapper(shard, stmt, 0)
func openSelectMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement) *tsdb.SelectMapper {
mapper := tsdb.NewSelectMapper(shard, stmt, 0)
if err := mapper.Open(); err != nil {
t.Fatalf("failed to open aggregate mapper: %s", err.Error())
@ -556,7 +565,7 @@ func openLocalMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.Selec
return mapper
}
func aggIntervalAsJson(t *testing.T, mapper *tsdb.LocalMapper) string {
func aggIntervalAsJson(t *testing.T, mapper *tsdb.SelectMapper) string {
r, err := mapper.NextChunk()
if err != nil {
t.Fatalf("failed to get chunk from aggregate mapper: %s", err.Error())

View File

@ -38,7 +38,7 @@ type QueryExecutor struct {
// Maps shards for queries.
ShardMapper interface {
CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (Mapper, error)
CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (Mapper, error)
}
Logger *log.Logger
@ -156,7 +156,10 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
// TODO: handle this in a cluster
res = q.executeDropMeasurementStatement(stmt, database)
case *influxql.ShowMeasurementsStatement:
res = q.executeShowMeasurementsStatement(stmt, database)
if err := q.executeShowMeasurementsStatement(i, stmt, database, results, chunkSize); err != nil {
results <- &influxql.Result{Err: err}
break
}
case *influxql.ShowTagKeysStatement:
res = q.executeShowTagKeysStatement(stmt, database)
case *influxql.ShowTagValuesStatement:
@ -199,7 +202,7 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
}
// Plan creates an execution plan for the given SelectStatement and returns an Executor.
func (q *QueryExecutor) Plan(stmt *influxql.SelectStatement, chunkSize int) (*Executor, error) {
func (q *QueryExecutor) PlanSelect(stmt *influxql.SelectStatement, chunkSize int) (Executor, error) {
shards := map[uint64]meta.ShardInfo{} // Shards requiring mappers.
// Replace instances of "now()" with the current time, and check the resultant times.
@ -234,7 +237,7 @@ func (q *QueryExecutor) Plan(stmt *influxql.SelectStatement, chunkSize int) (*Ex
// Build the Mappers, one per shard.
mappers := []Mapper{}
for _, sh := range shards {
m, err := q.ShardMapper.CreateMapper(sh, stmt.String(), chunkSize)
m, err := q.ShardMapper.CreateMapper(sh, stmt, chunkSize)
if err != nil {
return nil, err
}
@ -245,14 +248,14 @@ func (q *QueryExecutor) Plan(stmt *influxql.SelectStatement, chunkSize int) (*Ex
mappers = append(mappers, m)
}
executor := NewExecutor(stmt, mappers, chunkSize)
executor := NewSelectExecutor(stmt, mappers, chunkSize)
return executor, nil
}
// executeSelectStatement plans and executes a select statement against a database.
func (q *QueryExecutor) executeSelectStatement(statementID int, stmt *influxql.SelectStatement, results chan *influxql.Result, chunkSize int) error {
// Plan statement execution.
e, err := q.Plan(stmt, chunkSize)
e, err := q.PlanSelect(stmt, chunkSize)
if err != nil {
return err
}
@ -547,63 +550,62 @@ func (q *QueryExecutor) filterShowSeriesResult(limit, offset int, rows influxql.
return filteredSeries
}
func (q *QueryExecutor) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string) *influxql.Result {
// Find the database.
db := q.Store.DatabaseIndex(database)
if db == nil {
return &influxql.Result{}
// PlanShowMeasurements creates an execution plan for the given SelectStatement and returns an Executor.
func (q *QueryExecutor) PlanShowMeasurements(stmt *influxql.ShowMeasurementsStatement, database string, chunkSize int) (Executor, error) {
// Get the database info.
di, err := q.MetaStore.Database(database)
if err != nil {
return nil, err
} else if di == nil {
return nil, ErrDatabaseNotFound(database)
}
var measurements Measurements
// Get info for all shards in the database.
shards := di.ShardInfos()
// If a WHERE clause was specified, filter the measurements.
if stmt.Condition != nil {
var err error
measurements, err = db.measurementsByExpr(stmt.Condition)
// Build the Mappers, one per shard.
mappers := []Mapper{}
for _, sh := range shards {
m, err := q.ShardMapper.CreateMapper(sh, stmt, chunkSize)
if err != nil {
return &influxql.Result{Err: err}
return nil, err
}
} else {
// Otherwise, get all measurements from the database.
measurements = db.Measurements()
}
sort.Sort(measurements)
offset := stmt.Offset
limit := stmt.Limit
// If OFFSET is past the end of the array, return empty results.
if offset > len(measurements)-1 {
return &influxql.Result{}
if m == nil {
// No data for this shard, skip it.
continue
}
mappers = append(mappers, m)
}
// Calculate last index based on LIMIT.
end := len(measurements)
if limit > 0 && offset+limit < end {
limit = offset + limit
} else {
limit = end
executor := NewShowMeasurementsExecutor(stmt, mappers, chunkSize)
return executor, nil
}
func (q *QueryExecutor) executeShowMeasurementsStatement(statementID int, stmt *influxql.ShowMeasurementsStatement, database string, results chan *influxql.Result, chunkSize int) error {
// Plan statement execution.
e, err := q.PlanShowMeasurements(stmt, database, chunkSize)
if err != nil {
return err
}
// Make a result row to hold all measurement names.
row := &influxql.Row{
Name: "measurements",
Columns: []string{"name"},
// Execute plan.
ch := e.Execute()
// Stream results from the channel. We should send an empty result if nothing comes through.
resultSent := false
for row := range ch {
if row.Err != nil {
return row.Err
}
resultSent = true
results <- &influxql.Result{StatementID: statementID, Series: []*influxql.Row{row}}
}
// Add one value to the row for each measurement name.
for i := offset; i < limit; i++ {
m := measurements[i]
v := interface{}(m.Name)
row.Values = append(row.Values, []interface{}{v})
if !resultSent {
results <- &influxql.Result{StatementID: statementID, Series: make([]*influxql.Row, 0)}
}
// Make a result.
result := &influxql.Result{
Series: []*influxql.Row{row},
}
return result
return nil
}
func (q *QueryExecutor) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement, database string) *influxql.Result {

View File

@ -18,7 +18,7 @@ var sgID = uint64(2)
var shardID = uint64(1)
func TestWritePointsAndExecuteQuery(t *testing.T) {
store, executor := testStoreAndExecutor()
store, executor := testStoreAndExecutor("")
defer os.RemoveAll(store.Path())
// Write first point.
@ -71,7 +71,7 @@ func TestWritePointsAndExecuteQuery(t *testing.T) {
// Ensure writing a point and updating it results in only a single point.
func TestWritePointsAndExecuteQuery_Update(t *testing.T) {
store, executor := testStoreAndExecutor()
store, executor := testStoreAndExecutor("")
defer os.RemoveAll(store.Path())
// Write original point.
@ -113,7 +113,7 @@ func TestWritePointsAndExecuteQuery_Update(t *testing.T) {
}
func TestDropSeriesStatement(t *testing.T) {
store, executor := testStoreAndExecutor()
store, executor := testStoreAndExecutor("")
defer os.RemoveAll(store.Path())
pt := tsdb.NewPoint(
@ -169,7 +169,7 @@ func TestDropSeriesStatement(t *testing.T) {
}
func TestDropMeasurementStatement(t *testing.T) {
store, executor := testStoreAndExecutor()
store, executor := testStoreAndExecutor("")
defer os.RemoveAll(store.Path())
pt := tsdb.NewPoint(
@ -221,11 +221,7 @@ func TestDropMeasurementStatement(t *testing.T) {
validateDrop()
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store
store, executor = testStoreAndExecutor(store.Path())
validateDrop()
}
@ -239,7 +235,7 @@ func (m *metaExec) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
}
func TestDropDatabase(t *testing.T) {
store, executor := testStoreAndExecutor()
store, executor := testStoreAndExecutor("")
defer os.RemoveAll(store.Path())
pt := tsdb.NewPoint(
@ -301,7 +297,7 @@ func TestDropDatabase(t *testing.T) {
// Ensure that queries for which there is no data result in an empty set.
func TestQueryNoData(t *testing.T) {
store, executor := testStoreAndExecutor()
store, executor := testStoreAndExecutor("")
defer os.RemoveAll(store.Path())
got := executeAndGetJSON("select * from /.*/", executor)
@ -322,7 +318,7 @@ func TestQueryNoData(t *testing.T) {
// ensure that authenticate doesn't return an error if the user count is zero and they're attempting
// to create a user.
func TestAuthenticateIfUserCountZeroAndCreateUser(t *testing.T) {
store, executor := testStoreAndExecutor()
store, executor := testStoreAndExecutor("")
defer os.RemoveAll(store.Path())
ms := &testMetastore{userCount: 0}
executor.MetaStore = ms
@ -350,11 +346,13 @@ func TestAuthenticateIfUserCountZeroAndCreateUser(t *testing.T) {
}
}
func testStoreAndExecutor() (*tsdb.Store, *tsdb.QueryExecutor) {
path, _ := ioutil.TempDir("", "")
func testStoreAndExecutor(storePath string) (*tsdb.Store, *tsdb.QueryExecutor) {
if storePath == "" {
storePath, _ = ioutil.TempDir("", "")
}
store := tsdb.NewStore(path)
store.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
store := tsdb.NewStore(storePath)
store.EngineOptions.Config.WALDir = filepath.Join(storePath, "wal")
err := store.Open()
if err != nil {
@ -479,7 +477,7 @@ type testShardMapper struct {
store *tsdb.Store
}
func (t *testShardMapper) CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (tsdb.Mapper, error) {
func (t *testShardMapper) CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
m, err := t.store.CreateMapper(shard.ID, stmt, chunkSize)
return m, err
}

236
tsdb/show_measurements.go Normal file
View File

@ -0,0 +1,236 @@
package tsdb
import (
"encoding/json"
"fmt"
"sort"
"github.com/influxdb/influxdb/influxql"
)
// ShowMeasurementsExecutor implements the Executor interface for a SHOW MEASUREMENTS statement.
type ShowMeasurementsExecutor struct {
stmt *influxql.ShowMeasurementsStatement
mappers []Mapper
chunkSize int
}
// NewShowMeasurementsExecutor returns a new ShowMeasurementsExecutor.
func NewShowMeasurementsExecutor(stmt *influxql.ShowMeasurementsStatement, mappers []Mapper, chunkSize int) *ShowMeasurementsExecutor {
return &ShowMeasurementsExecutor{
stmt: stmt,
mappers: mappers,
chunkSize: chunkSize,
}
}
// Execute begins execution of the query and returns a channel to receive rows.
func (e *ShowMeasurementsExecutor) Execute() <-chan *influxql.Row {
// Create output channel and stream data in a separate goroutine.
out := make(chan *influxql.Row, 0)
// It's important that all resources are released when execution completes.
defer e.close()
go func() {
// Open the mappers.
for _, m := range e.mappers {
if err := m.Open(); err != nil {
out <- &influxql.Row{Err: err}
return
}
}
// Create a set to hold measurement names from mappers.
set := map[string]struct{}{}
// Iterate through mappers collecting measurement names.
for _, m := range e.mappers {
// Get the data from the mapper.
c, err := m.NextChunk()
if err != nil {
out <- &influxql.Row{Err: err}
return
} else if c == nil {
// Mapper had no data.
continue
}
// Convert the mapper chunk to a string array of measurement names.
mms, ok := c.([]string)
if !ok {
out <- &influxql.Row{Err: fmt.Errorf("show measurements mapper returned invalid type: %T", c)}
return
}
// Add the measurement names to the set.
for _, mm := range mms {
set[mm] = struct{}{}
}
}
// Convert the set into an array of measurement names.
measurements := make([]string, 0, len(set))
for mm := range set {
measurements = append(measurements, mm)
}
// Sort the names.
sort.Strings(measurements)
// Calculate OFFSET and LIMIT
off := e.stmt.Offset
lim := len(measurements)
stmtLim := e.stmt.Limit
if stmtLim > 0 && off+stmtLim < lim {
lim = off + stmtLim
} else if off > lim {
off, lim = 0, 0
}
// Put the results in a row and send it.
row := &influxql.Row{
Name: "measurements",
Columns: []string{"name"},
Values: make([][]interface{}, 0, len(measurements)),
}
for _, m := range measurements[off:lim] {
v := []interface{}{m}
row.Values = append(row.Values, v)
}
if len(row.Values) > 0 {
out <- row
}
close(out)
}()
return out
}
// Close closes the executor such that all resources are released. Once closed,
// an executor may not be re-used.
func (e *ShowMeasurementsExecutor) close() {
if e != nil {
for _, m := range e.mappers {
m.Close()
}
}
}
// ShowMeasurementsMapper is a mapper for collecting measurement names from a shard.
type ShowMeasurementsMapper struct {
remote Mapper
shard *Shard
stmt *influxql.ShowMeasurementsStatement
chunkSize int
state interface{}
}
// NewShowMeasurementsMapper returns a mapper for the given shard, which will return data for the meta statement.
func NewShowMeasurementsMapper(shard *Shard, stmt *influxql.ShowMeasurementsStatement, chunkSize int) *ShowMeasurementsMapper {
return &ShowMeasurementsMapper{
shard: shard,
stmt: stmt,
chunkSize: chunkSize,
}
}
// Open opens the mapper for use.
func (m *ShowMeasurementsMapper) Open() error {
if m.remote != nil {
return m.remote.Open()
}
var measurements Measurements
// If a WHERE clause was specified, filter the measurements.
if m.stmt.Condition != nil {
var err error
measurements, err = m.shard.index.measurementsByExpr(m.stmt.Condition)
if err != nil {
return err
}
} else {
// Otherwise, get all measurements from the database.
measurements = m.shard.index.Measurements()
}
sort.Sort(measurements)
// Create a channel to send measurement names on.
ch := make(chan string)
// Start a goroutine to send the names over the channel as needed.
go func() {
for _, mm := range measurements {
ch <- mm.Name
}
close(ch)
}()
// Store the channel as the state of the mapper.
m.state = ch
return nil
}
// SetRemote sets the remote mapper to use.
func (m *ShowMeasurementsMapper) SetRemote(remote Mapper) error {
m.remote = remote
return nil
}
// TagSets is only implemented on this mapper to satisfy the Mapper interface.
func (m *ShowMeasurementsMapper) TagSets() []string { return nil }
// Fields returns a list of field names for this mapper.
func (m *ShowMeasurementsMapper) Fields() []string { return []string{"name"} }
// NextChunk returns the next chunk of measurement names.
func (m *ShowMeasurementsMapper) NextChunk() (interface{}, error) {
if m.remote != nil {
b, err := m.remote.NextChunk()
if err != nil {
return nil, err
} else if b == nil {
return nil, nil
}
names := []string{}
if err := json.Unmarshal(b.([]byte), &names); err != nil {
return nil, err
} else if len(names) == 0 {
// Mapper on other node sent 0 values so it's done.
return nil, nil
}
return names, nil
}
return m.nextChunk()
}
// nextChunk implements next chunk logic for a local shard.
func (m *ShowMeasurementsMapper) nextChunk() (interface{}, error) {
// Allocate array to hold measurement names.
names := make([]string, 0, m.chunkSize)
// Get the channel of measurement names from the state.
measurementNames := m.state.(chan string)
// Get the next chunk of names.
for n := range measurementNames {
names = append(names, n)
if len(names) == m.chunkSize {
break
}
}
// See if we've read all the names.
if len(names) == 0 {
return nil, nil
}
return names, nil
}
// Close closes the mapper.
func (m *ShowMeasurementsMapper) Close() {
if m.remote != nil {
m.remote.Close()
}
}

View File

@ -303,23 +303,21 @@ func (s *Store) WriteToShard(shardID uint64, points []Point) error {
return sh.WritePoints(points)
}
func (s *Store) CreateMapper(shardID uint64, query string, chunkSize int) (Mapper, error) {
q, err := influxql.NewParser(strings.NewReader(query)).ParseStatement()
if err != nil {
return nil, err
}
stmt, ok := q.(*influxql.SelectStatement)
if !ok {
return nil, fmt.Errorf("query is not a SELECT statement: %s", err.Error())
}
func (s *Store) CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (Mapper, error) {
shard := s.Shard(shardID)
if shard == nil {
// This can happen if the shard has been assigned, but hasn't actually been created yet.
return nil, nil
}
return NewLocalMapper(shard, stmt, chunkSize), nil
switch st := stmt.(type) {
case *influxql.SelectStatement:
return NewSelectMapper(shard, st, chunkSize), nil
case *influxql.ShowMeasurementsStatement:
return NewShowMeasurementsMapper(shard, st, chunkSize), nil
default:
return nil, fmt.Errorf("can't create mapper for statement type: %v", st)
}
}
func (s *Store) Close() error {