From 2bdc9404efd199495d2408d149e0a6456afb91ff Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 3 Feb 2016 10:23:31 -0700 Subject: [PATCH] revert meta execution --- tsdb/query_executor.go | 405 ++++++++++++++++++++++++++++++++--------- tsdb/shard.go | 5 +- tsdb/store.go | 4 +- 3 files changed, 327 insertions(+), 87 deletions(-) diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index ca65aa77a3..b2b05ddf2b 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -20,11 +20,12 @@ import ( type QueryExecutor struct { // Local data store. Store interface { + DatabaseIndex(name string) *DatabaseIndex Shards(ids []uint64) []*Shard ExpandSources(sources influxql.Sources) (influxql.Sources, error) DeleteDatabase(name string, shardIDs []uint64) error DeleteMeasurement(database, name string) error - DeleteSeries(database string, sources influxql.Sources, condition influxql.Expr) error + DeleteSeries(database string, seriesKeys []string) error } // The meta store for accessing and updating cluster and schema data. @@ -176,10 +177,7 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu // TODO: handle this in a cluster res = q.executeDropSeriesStatement(stmt, database) case *influxql.ShowSeriesStatement: - if err := q.executeStatement(i, stmt, database, results, chunkSize, closing); err != nil { - results <- &influxql.Result{Err: err} - break - } + res = q.executeShowSeriesStatement(stmt, database) case *influxql.DropMeasurementStatement: // TODO: handle this in a cluster res = q.executeDropMeasurementStatement(stmt, database) @@ -194,15 +192,9 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu break } case *influxql.ShowTagValuesStatement: - if err := q.executeStatement(i, stmt, database, results, chunkSize, closing); err != nil { - results <- &influxql.Result{Err: err} - break - } + res = q.executeShowTagValuesStatement(stmt, database) case *influxql.ShowFieldKeysStatement: - if err := q.executeStatement(i, stmt, database, results, chunkSize, closing); err != nil { - results <- &influxql.Result{Err: err} - break - } + res = q.executeShowFieldKeysStatement(stmt, database) case *influxql.DeleteStatement: res = &influxql.Result{Err: ErrInvalidQuery} case *influxql.DropDatabaseStatement: @@ -343,25 +335,157 @@ func (q *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStat return &influxql.Result{Err: errors.New("DROP SERIES doesn't support time in WHERE clause")} } - if err := q.Store.DeleteSeries(database, stmt.Sources, stmt.Condition); err != nil { + // Find the database. + db := q.Store.DatabaseIndex(database) + if db == nil { + return &influxql.Result{} + } + + // Expand regex expressions in the FROM clause. + sources, err := q.Store.ExpandSources(stmt.Sources) + if err != nil { + return &influxql.Result{Err: err} + } else if stmt.Sources != nil && len(stmt.Sources) != 0 && len(sources) == 0 { + return &influxql.Result{} + } + + measurements, err := measurementsFromSourcesOrDB(db, sources...) + if err != nil { return &influxql.Result{Err: err} } + + var seriesKeys []string + for _, m := range measurements { + var ids SeriesIDs + var filters FilterExprs + if stmt.Condition != nil { + // Get series IDs that match the WHERE clause. + ids, filters, err = m.walkWhereForSeriesIds(stmt.Condition) + if err != nil { + return &influxql.Result{Err: err} + } + + // Delete boolean literal true filter expressions. + // These are returned for `WHERE tagKey = 'tagVal'` type expressions and are okay. + filters.DeleteBoolLiteralTrues() + + // Check for unsupported field filters. + // Any remaining filters means there were fields (e.g., `WHERE value = 1.2`). + if filters.Len() > 0 { + return &influxql.Result{Err: errors.New("DROP SERIES doesn't support fields in WHERE clause")} + } + } else { + // No WHERE clause so get all series IDs for this measurement. + ids = m.seriesIDs + } + + for _, id := range ids { + seriesKeys = append(seriesKeys, m.seriesByID[id].Key) + } + } + + // delete the raw series data + if err := q.Store.DeleteSeries(database, seriesKeys); err != nil { + return &influxql.Result{Err: err} + } + // remove them from the index + db.DropSeries(seriesKeys) + return &influxql.Result{} } -func (q *QueryExecutor) planShowSeries(stmt *influxql.ShowSeriesStatement, database string, chunkSize int) (Executor, error) { - return q.PlanSelect(&influxql.SelectStatement{ - Fields: influxql.Fields{ - {Expr: &influxql.Wildcard{}}, - }, - Sources: influxql.Sources{ - &influxql.Measurement{Database: database, Name: "_series"}, - }, - Condition: stmt.Condition, - Offset: stmt.Offset, - Limit: stmt.Limit, - SortFields: stmt.SortFields, - }, chunkSize) +func (q *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) *influxql.Result { + // Check for time in WHERE clause (not supported). + if influxql.HasTimeExpr(stmt.Condition) { + return &influxql.Result{Err: errors.New("SHOW SERIES doesn't support time in WHERE clause")} + } + + // Find the database. + db := q.Store.DatabaseIndex(database) + if db == nil { + return &influxql.Result{} + } + + // Expand regex expressions in the FROM clause. + sources, err := q.Store.ExpandSources(stmt.Sources) + if err != nil { + return &influxql.Result{Err: err} + } + + // Get the list of measurements we're interested in. + measurements, err := measurementsFromSourcesOrDB(db, sources...) + if err != nil { + return &influxql.Result{Err: err} + } + + // Create result struct that will be populated and returned. + result := &influxql.Result{ + Series: make(models.Rows, 0, len(measurements)), + } + + // Loop through measurements to build result. One result row / measurement. + for _, m := range measurements { + var ids SeriesIDs + var filters FilterExprs + + if stmt.Condition != nil { + // Get series IDs that match the WHERE clause. + ids, filters, err = m.walkWhereForSeriesIds(stmt.Condition) + if err != nil { + return &influxql.Result{Err: err} + } + + // Delete boolean literal true filter expressions. + filters.DeleteBoolLiteralTrues() + + // Check for unsupported field filters. + if filters.Len() > 0 { + return &influxql.Result{Err: errors.New("SHOW SERIES doesn't support fields in WHERE clause")} + } + + // If no series matched, then go to the next measurement. + if len(ids) == 0 { + continue + } + } else { + // No WHERE clause so get all series IDs for this measurement. + ids = m.seriesIDs + } + + // Make a new row for this measurement. + r := &models.Row{ + Name: m.Name, + Columns: m.TagKeys(), + } + + // Loop through series IDs getting matching tag sets. + for _, id := range ids { + if s, ok := m.seriesByID[id]; ok { + values := make([]interface{}, 0, len(r.Columns)) + + // make the series key the first value + values = append(values, s.Key) + + for _, column := range r.Columns { + values = append(values, s.Tags[column]) + } + + // Add the tag values to the row. + r.Values = append(r.Values, values) + } + } + // make the id the first column + r.Columns = append([]string{"_key"}, r.Columns...) + + // Append the row to the result. + result.Series = append(result.Series, r) + } + + if stmt.Limit > 0 || stmt.Offset > 0 { + result.Series = q.filterShowSeriesResult(stmt.Limit, stmt.Offset, result.Series) + } + + return result } // filterShowSeriesResult will limit the number of series returned based on the limit and the offset. @@ -399,14 +523,8 @@ func (q *QueryExecutor) planStatement(stmt influxql.Statement, database string, return q.PlanSelect(stmt, chunkSize) case *influxql.ShowMeasurementsStatement: return q.planShowMeasurements(stmt, database, chunkSize) - case *influxql.ShowSeriesStatement: - return q.planShowSeries(stmt, database, chunkSize) case *influxql.ShowTagKeysStatement: return q.planShowTagKeys(stmt, database, chunkSize) - case *influxql.ShowTagValuesStatement: - return q.planShowTagValues(stmt, database, chunkSize) - case *influxql.ShowFieldKeysStatement: - return q.planShowFieldKeys(stmt, database, chunkSize) default: return nil, fmt.Errorf("can't plan statement type: %v", stmt) } @@ -419,18 +537,35 @@ func (q *QueryExecutor) planShowMeasurements(stmt *influxql.ShowMeasurementsStat return nil, errors.New("SHOW MEASUREMENTS doesn't support time in WHERE clause") } - return q.PlanSelect(&influxql.SelectStatement{ - Fields: influxql.Fields{ - {Expr: &influxql.VarRef{Val: "name"}}, - }, - Sources: influxql.Sources{ - &influxql.Measurement{Database: database, Name: "_measurements"}, - }, - Condition: stmt.Condition, - Offset: stmt.Offset, - Limit: stmt.Limit, - SortFields: stmt.SortFields, - }, chunkSize) + panic("FIXME: Implement SHOW MEASUREMENTS") + /* + // Get the database info. + di, err := q.MetaClient.Database(database) + if err != nil { + return nil, err + } else if di == nil { + return nil, ErrDatabaseNotFound(database) + } + + // Get info for all shards in the database. + shards := di.ShardInfos() + + // Build the Mappers, one per shard. + mappers := []Mapper{} + for _, sh := range shards { + m, err := q.ShardMapper.CreateMapper(sh, stmt, chunkSize) + if err != nil { + return nil, err + } + if m == nil { + // No data for this shard, skip it. + continue + } + mappers = append(mappers, m) + } + + return NewShowMeasurementsExecutor(stmt, mappers, chunkSize), nil + */ } // planShowTagKeys creates an execution plan for a SHOW MEASUREMENTS statement and returns an Executor. @@ -440,18 +575,22 @@ func (q *QueryExecutor) planShowTagKeys(stmt *influxql.ShowTagKeysStatement, dat return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause") } - return q.PlanSelect(&influxql.SelectStatement{ - Fields: influxql.Fields{ - {Expr: &influxql.VarRef{Val: "tagKey"}}, - }, - Sources: influxql.Sources{ - &influxql.Measurement{Database: database, Name: "_tagkeys"}, - }, - Condition: stmt.Condition, - Offset: stmt.Offset, - Limit: stmt.Limit, - SortFields: stmt.SortFields, - }, chunkSize) + panic("FIXME: Implement SHOW TAG KEYS") + + /* + return q.PlanSelect(&influxql.SelectStatement{ + Fields: influxql.Fields{ + {Expr: &influxql.VarRef{Val: "tagKey"}}, + }, + Sources: influxql.Sources{ + &influxql.Measurement{Database: database, Name: "_tagkeys"}, + }, + Condition: stmt.Condition, + Offset: stmt.Offset, + Limit: stmt.Limit, + SortFields: stmt.SortFields, + }, chunkSize) + */ } func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statement, database string, results chan *influxql.Result, chunkSize int, closing chan struct{}) error { @@ -546,33 +685,135 @@ func (q *QueryExecutor) writeInto(row *models.Row, selectstmt *influxql.SelectSt return nil } -func (q *QueryExecutor) planShowTagValues(stmt *influxql.ShowTagValuesStatement, database string, chunkSize int) (Executor, error) { - return q.PlanSelect(&influxql.SelectStatement{ - Fields: influxql.Fields{ - {Expr: &influxql.Wildcard{}}, - }, - Sources: influxql.Sources{ - &influxql.Measurement{Database: database, Name: "_tagvalues"}, - }, - Condition: stmt.Condition, - Offset: stmt.Offset, - Limit: stmt.Limit, - SortFields: stmt.SortFields, - }, chunkSize) +func (q *QueryExecutor) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) *influxql.Result { + // Check for time in WHERE clause (not supported). + if influxql.HasTimeExpr(stmt.Condition) { + return &influxql.Result{Err: errors.New("SHOW TAG VALUES doesn't support time in WHERE clause")} + } + + // Find the database. + db := q.Store.DatabaseIndex(database) + if db == nil { + return &influxql.Result{} + } + + // Expand regex expressions in the FROM clause. + sources, err := q.Store.ExpandSources(stmt.Sources) + if err != nil { + return &influxql.Result{Err: err} + } + + // Get the list of measurements we're interested in. + measurements, err := measurementsFromSourcesOrDB(db, sources...) + if err != nil { + return &influxql.Result{Err: err} + } + + // Make result. + result := &influxql.Result{ + Series: make(models.Rows, 0), + } + + tagValues := make(map[string]stringSet) + for _, m := range measurements { + var ids SeriesIDs + + if stmt.Condition != nil { + // Get series IDs that match the WHERE clause. + ids, _, err = m.walkWhereForSeriesIds(stmt.Condition) + if err != nil { + return &influxql.Result{Err: err} + } + + // If no series matched, then go to the next measurement. + if len(ids) == 0 { + continue + } + + // TODO: check return of walkWhereForSeriesIds for fields + } else { + // No WHERE clause so get all series IDs for this measurement. + ids = m.seriesIDs + } + + for k, v := range m.tagValuesByKeyAndSeriesID(stmt.TagKeys, ids) { + _, ok := tagValues[k] + if !ok { + tagValues[k] = v + } + tagValues[k] = tagValues[k].union(v) + } + } + + for k, v := range tagValues { + r := &models.Row{ + Name: k + "TagValues", + Columns: []string{k}, + } + + vals := v.list() + sort.Strings(vals) + + for _, val := range vals { + v := interface{}(val) + r.Values = append(r.Values, []interface{}{v}) + } + + result.Series = append(result.Series, r) + } + + sort.Sort(result.Series) + return result } -func (q *QueryExecutor) planShowFieldKeys(stmt *influxql.ShowFieldKeysStatement, database string, chunkSize int) (Executor, error) { - return q.PlanSelect(&influxql.SelectStatement{ - Fields: influxql.Fields{ - {Expr: &influxql.VarRef{Val: "fieldKey"}}, - }, - Sources: influxql.Sources{ - &influxql.Measurement{Database: database, Name: "_fieldkeys"}, - }, - Offset: stmt.Offset, - Limit: stmt.Limit, - SortFields: stmt.SortFields, - }, chunkSize) +func (q *QueryExecutor) executeShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) *influxql.Result { + var err error + + // Find the database. + db := q.Store.DatabaseIndex(database) + if db == nil { + return &influxql.Result{} + } + + // Expand regex expressions in the FROM clause. + sources, err := q.Store.ExpandSources(stmt.Sources) + if err != nil { + return &influxql.Result{Err: err} + } + + measurements, err := measurementsFromSourcesOrDB(db, sources...) + if err != nil { + return &influxql.Result{Err: err} + } + + // Make result. + result := &influxql.Result{ + Series: make(models.Rows, 0, len(measurements)), + } + + // Loop through measurements, adding a result row for each. + for _, m := range measurements { + // Create a new row. + r := &models.Row{ + Name: m.Name, + Columns: []string{"fieldKey"}, + } + + // Get a list of field names from the measurement then sort them. + names := m.FieldNames() + sort.Strings(names) + + // Add the field names to the result row values. + for _, n := range names { + v := interface{}(n) + r.Values = append(r.Values, []interface{}{v}) + } + + // Append the row to the result. + result.Series = append(result.Series, r) + } + + return result } // measurementsFromSourcesOrDB returns a list of measurements from the diff --git a/tsdb/shard.go b/tsdb/shard.go index 1caa450fd3..c9c7dcb340 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -263,9 +263,8 @@ func (s *Shard) WritePoints(points []models.Point) error { } // DeleteSeries deletes a list of series. -func (s *Shard) DeleteSeries(sources influxql.Sources, condition influxql.Expr) error { - panic("FIXME: implement delete series") - //return s.engine.DeleteSeries(sources, condition) +func (s *Shard) DeleteSeries(seriesKeys []string) error { + return s.engine.DeleteSeries(seriesKeys) } // DeleteMeasurement deletes a measurement and all underlying series. diff --git a/tsdb/store.go b/tsdb/store.go index d9a600cf65..2869478fbb 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -418,7 +418,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) { } // DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys -func (s *Store) DeleteSeries(database string, sources influxql.Sources, condition influxql.Expr) error { +func (s *Store) DeleteSeries(database string, seriesKeys []string) error { s.mu.RLock() defer s.mu.RUnlock() @@ -431,7 +431,7 @@ func (s *Store) DeleteSeries(database string, sources influxql.Sources, conditio if sh.index != db { continue } - if err := sh.DeleteSeries(sources, condition); err != nil { + if err := sh.DeleteSeries(seriesKeys); err != nil { return err } }