revert meta execution

pull/5196/head
Ben Johnson 2016-02-03 10:23:31 -07:00
parent 5605bbb22e
commit 2bdc9404ef
3 changed files with 327 additions and 87 deletions

View File

@ -20,11 +20,12 @@ import (
type QueryExecutor struct {
// Local data store.
Store interface {
DatabaseIndex(name string) *DatabaseIndex
Shards(ids []uint64) []*Shard
ExpandSources(sources influxql.Sources) (influxql.Sources, error)
DeleteDatabase(name string, shardIDs []uint64) error
DeleteMeasurement(database, name string) error
DeleteSeries(database string, sources influxql.Sources, condition influxql.Expr) error
DeleteSeries(database string, seriesKeys []string) error
}
// The meta store for accessing and updating cluster and schema data.
@ -176,10 +177,7 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
// TODO: handle this in a cluster
res = q.executeDropSeriesStatement(stmt, database)
case *influxql.ShowSeriesStatement:
if err := q.executeStatement(i, stmt, database, results, chunkSize, closing); err != nil {
results <- &influxql.Result{Err: err}
break
}
res = q.executeShowSeriesStatement(stmt, database)
case *influxql.DropMeasurementStatement:
// TODO: handle this in a cluster
res = q.executeDropMeasurementStatement(stmt, database)
@ -194,15 +192,9 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
break
}
case *influxql.ShowTagValuesStatement:
if err := q.executeStatement(i, stmt, database, results, chunkSize, closing); err != nil {
results <- &influxql.Result{Err: err}
break
}
res = q.executeShowTagValuesStatement(stmt, database)
case *influxql.ShowFieldKeysStatement:
if err := q.executeStatement(i, stmt, database, results, chunkSize, closing); err != nil {
results <- &influxql.Result{Err: err}
break
}
res = q.executeShowFieldKeysStatement(stmt, database)
case *influxql.DeleteStatement:
res = &influxql.Result{Err: ErrInvalidQuery}
case *influxql.DropDatabaseStatement:
@ -343,25 +335,157 @@ func (q *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStat
return &influxql.Result{Err: errors.New("DROP SERIES doesn't support time in WHERE clause")}
}
if err := q.Store.DeleteSeries(database, stmt.Sources, stmt.Condition); err != nil {
// Find the database.
db := q.Store.DatabaseIndex(database)
if db == nil {
return &influxql.Result{}
}
// Expand regex expressions in the FROM clause.
sources, err := q.Store.ExpandSources(stmt.Sources)
if err != nil {
return &influxql.Result{Err: err}
} else if stmt.Sources != nil && len(stmt.Sources) != 0 && len(sources) == 0 {
return &influxql.Result{}
}
measurements, err := measurementsFromSourcesOrDB(db, sources...)
if err != nil {
return &influxql.Result{Err: err}
}
var seriesKeys []string
for _, m := range measurements {
var ids SeriesIDs
var filters FilterExprs
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
ids, filters, err = m.walkWhereForSeriesIds(stmt.Condition)
if err != nil {
return &influxql.Result{Err: err}
}
// Delete boolean literal true filter expressions.
// These are returned for `WHERE tagKey = 'tagVal'` type expressions and are okay.
filters.DeleteBoolLiteralTrues()
// Check for unsupported field filters.
// Any remaining filters means there were fields (e.g., `WHERE value = 1.2`).
if filters.Len() > 0 {
return &influxql.Result{Err: errors.New("DROP SERIES doesn't support fields in WHERE clause")}
}
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
for _, id := range ids {
seriesKeys = append(seriesKeys, m.seriesByID[id].Key)
}
}
// delete the raw series data
if err := q.Store.DeleteSeries(database, seriesKeys); err != nil {
return &influxql.Result{Err: err}
}
// remove them from the index
db.DropSeries(seriesKeys)
return &influxql.Result{}
}
func (q *QueryExecutor) planShowSeries(stmt *influxql.ShowSeriesStatement, database string, chunkSize int) (Executor, error) {
return q.PlanSelect(&influxql.SelectStatement{
Fields: influxql.Fields{
{Expr: &influxql.Wildcard{}},
},
Sources: influxql.Sources{
&influxql.Measurement{Database: database, Name: "_series"},
},
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
}, chunkSize)
func (q *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) *influxql.Result {
// Check for time in WHERE clause (not supported).
if influxql.HasTimeExpr(stmt.Condition) {
return &influxql.Result{Err: errors.New("SHOW SERIES doesn't support time in WHERE clause")}
}
// Find the database.
db := q.Store.DatabaseIndex(database)
if db == nil {
return &influxql.Result{}
}
// Expand regex expressions in the FROM clause.
sources, err := q.Store.ExpandSources(stmt.Sources)
if err != nil {
return &influxql.Result{Err: err}
}
// Get the list of measurements we're interested in.
measurements, err := measurementsFromSourcesOrDB(db, sources...)
if err != nil {
return &influxql.Result{Err: err}
}
// Create result struct that will be populated and returned.
result := &influxql.Result{
Series: make(models.Rows, 0, len(measurements)),
}
// Loop through measurements to build result. One result row / measurement.
for _, m := range measurements {
var ids SeriesIDs
var filters FilterExprs
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
ids, filters, err = m.walkWhereForSeriesIds(stmt.Condition)
if err != nil {
return &influxql.Result{Err: err}
}
// Delete boolean literal true filter expressions.
filters.DeleteBoolLiteralTrues()
// Check for unsupported field filters.
if filters.Len() > 0 {
return &influxql.Result{Err: errors.New("SHOW SERIES doesn't support fields in WHERE clause")}
}
// If no series matched, then go to the next measurement.
if len(ids) == 0 {
continue
}
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
// Make a new row for this measurement.
r := &models.Row{
Name: m.Name,
Columns: m.TagKeys(),
}
// Loop through series IDs getting matching tag sets.
for _, id := range ids {
if s, ok := m.seriesByID[id]; ok {
values := make([]interface{}, 0, len(r.Columns))
// make the series key the first value
values = append(values, s.Key)
for _, column := range r.Columns {
values = append(values, s.Tags[column])
}
// Add the tag values to the row.
r.Values = append(r.Values, values)
}
}
// make the id the first column
r.Columns = append([]string{"_key"}, r.Columns...)
// Append the row to the result.
result.Series = append(result.Series, r)
}
if stmt.Limit > 0 || stmt.Offset > 0 {
result.Series = q.filterShowSeriesResult(stmt.Limit, stmt.Offset, result.Series)
}
return result
}
// filterShowSeriesResult will limit the number of series returned based on the limit and the offset.
@ -399,14 +523,8 @@ func (q *QueryExecutor) planStatement(stmt influxql.Statement, database string,
return q.PlanSelect(stmt, chunkSize)
case *influxql.ShowMeasurementsStatement:
return q.planShowMeasurements(stmt, database, chunkSize)
case *influxql.ShowSeriesStatement:
return q.planShowSeries(stmt, database, chunkSize)
case *influxql.ShowTagKeysStatement:
return q.planShowTagKeys(stmt, database, chunkSize)
case *influxql.ShowTagValuesStatement:
return q.planShowTagValues(stmt, database, chunkSize)
case *influxql.ShowFieldKeysStatement:
return q.planShowFieldKeys(stmt, database, chunkSize)
default:
return nil, fmt.Errorf("can't plan statement type: %v", stmt)
}
@ -419,18 +537,35 @@ func (q *QueryExecutor) planShowMeasurements(stmt *influxql.ShowMeasurementsStat
return nil, errors.New("SHOW MEASUREMENTS doesn't support time in WHERE clause")
}
return q.PlanSelect(&influxql.SelectStatement{
Fields: influxql.Fields{
{Expr: &influxql.VarRef{Val: "name"}},
},
Sources: influxql.Sources{
&influxql.Measurement{Database: database, Name: "_measurements"},
},
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
}, chunkSize)
panic("FIXME: Implement SHOW MEASUREMENTS")
/*
// Get the database info.
di, err := q.MetaClient.Database(database)
if err != nil {
return nil, err
} else if di == nil {
return nil, ErrDatabaseNotFound(database)
}
// Get info for all shards in the database.
shards := di.ShardInfos()
// Build the Mappers, one per shard.
mappers := []Mapper{}
for _, sh := range shards {
m, err := q.ShardMapper.CreateMapper(sh, stmt, chunkSize)
if err != nil {
return nil, err
}
if m == nil {
// No data for this shard, skip it.
continue
}
mappers = append(mappers, m)
}
return NewShowMeasurementsExecutor(stmt, mappers, chunkSize), nil
*/
}
// planShowTagKeys creates an execution plan for a SHOW MEASUREMENTS statement and returns an Executor.
@ -440,18 +575,22 @@ func (q *QueryExecutor) planShowTagKeys(stmt *influxql.ShowTagKeysStatement, dat
return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause")
}
return q.PlanSelect(&influxql.SelectStatement{
Fields: influxql.Fields{
{Expr: &influxql.VarRef{Val: "tagKey"}},
},
Sources: influxql.Sources{
&influxql.Measurement{Database: database, Name: "_tagkeys"},
},
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
}, chunkSize)
panic("FIXME: Implement SHOW TAG KEYS")
/*
return q.PlanSelect(&influxql.SelectStatement{
Fields: influxql.Fields{
{Expr: &influxql.VarRef{Val: "tagKey"}},
},
Sources: influxql.Sources{
&influxql.Measurement{Database: database, Name: "_tagkeys"},
},
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
}, chunkSize)
*/
}
func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statement, database string, results chan *influxql.Result, chunkSize int, closing chan struct{}) error {
@ -546,33 +685,135 @@ func (q *QueryExecutor) writeInto(row *models.Row, selectstmt *influxql.SelectSt
return nil
}
func (q *QueryExecutor) planShowTagValues(stmt *influxql.ShowTagValuesStatement, database string, chunkSize int) (Executor, error) {
return q.PlanSelect(&influxql.SelectStatement{
Fields: influxql.Fields{
{Expr: &influxql.Wildcard{}},
},
Sources: influxql.Sources{
&influxql.Measurement{Database: database, Name: "_tagvalues"},
},
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
}, chunkSize)
func (q *QueryExecutor) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) *influxql.Result {
// Check for time in WHERE clause (not supported).
if influxql.HasTimeExpr(stmt.Condition) {
return &influxql.Result{Err: errors.New("SHOW TAG VALUES doesn't support time in WHERE clause")}
}
// Find the database.
db := q.Store.DatabaseIndex(database)
if db == nil {
return &influxql.Result{}
}
// Expand regex expressions in the FROM clause.
sources, err := q.Store.ExpandSources(stmt.Sources)
if err != nil {
return &influxql.Result{Err: err}
}
// Get the list of measurements we're interested in.
measurements, err := measurementsFromSourcesOrDB(db, sources...)
if err != nil {
return &influxql.Result{Err: err}
}
// Make result.
result := &influxql.Result{
Series: make(models.Rows, 0),
}
tagValues := make(map[string]stringSet)
for _, m := range measurements {
var ids SeriesIDs
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
ids, _, err = m.walkWhereForSeriesIds(stmt.Condition)
if err != nil {
return &influxql.Result{Err: err}
}
// If no series matched, then go to the next measurement.
if len(ids) == 0 {
continue
}
// TODO: check return of walkWhereForSeriesIds for fields
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
for k, v := range m.tagValuesByKeyAndSeriesID(stmt.TagKeys, ids) {
_, ok := tagValues[k]
if !ok {
tagValues[k] = v
}
tagValues[k] = tagValues[k].union(v)
}
}
for k, v := range tagValues {
r := &models.Row{
Name: k + "TagValues",
Columns: []string{k},
}
vals := v.list()
sort.Strings(vals)
for _, val := range vals {
v := interface{}(val)
r.Values = append(r.Values, []interface{}{v})
}
result.Series = append(result.Series, r)
}
sort.Sort(result.Series)
return result
}
func (q *QueryExecutor) planShowFieldKeys(stmt *influxql.ShowFieldKeysStatement, database string, chunkSize int) (Executor, error) {
return q.PlanSelect(&influxql.SelectStatement{
Fields: influxql.Fields{
{Expr: &influxql.VarRef{Val: "fieldKey"}},
},
Sources: influxql.Sources{
&influxql.Measurement{Database: database, Name: "_fieldkeys"},
},
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
}, chunkSize)
func (q *QueryExecutor) executeShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) *influxql.Result {
var err error
// Find the database.
db := q.Store.DatabaseIndex(database)
if db == nil {
return &influxql.Result{}
}
// Expand regex expressions in the FROM clause.
sources, err := q.Store.ExpandSources(stmt.Sources)
if err != nil {
return &influxql.Result{Err: err}
}
measurements, err := measurementsFromSourcesOrDB(db, sources...)
if err != nil {
return &influxql.Result{Err: err}
}
// Make result.
result := &influxql.Result{
Series: make(models.Rows, 0, len(measurements)),
}
// Loop through measurements, adding a result row for each.
for _, m := range measurements {
// Create a new row.
r := &models.Row{
Name: m.Name,
Columns: []string{"fieldKey"},
}
// Get a list of field names from the measurement then sort them.
names := m.FieldNames()
sort.Strings(names)
// Add the field names to the result row values.
for _, n := range names {
v := interface{}(n)
r.Values = append(r.Values, []interface{}{v})
}
// Append the row to the result.
result.Series = append(result.Series, r)
}
return result
}
// measurementsFromSourcesOrDB returns a list of measurements from the

View File

@ -263,9 +263,8 @@ func (s *Shard) WritePoints(points []models.Point) error {
}
// DeleteSeries deletes a list of series.
func (s *Shard) DeleteSeries(sources influxql.Sources, condition influxql.Expr) error {
panic("FIXME: implement delete series")
//return s.engine.DeleteSeries(sources, condition)
func (s *Shard) DeleteSeries(seriesKeys []string) error {
return s.engine.DeleteSeries(seriesKeys)
}
// DeleteMeasurement deletes a measurement and all underlying series.

View File

@ -418,7 +418,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
}
// DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys
func (s *Store) DeleteSeries(database string, sources influxql.Sources, condition influxql.Expr) error {
func (s *Store) DeleteSeries(database string, seriesKeys []string) error {
s.mu.RLock()
defer s.mu.RUnlock()
@ -431,7 +431,7 @@ func (s *Store) DeleteSeries(database string, sources influxql.Sources, conditio
if sh.index != db {
continue
}
if err := sh.DeleteSeries(sources, condition); err != nil {
if err := sh.DeleteSeries(seriesKeys); err != nil {
return err
}
}