Merge branch 'pr-318'

pull/344/head
John Shahid 2014-03-13 16:37:05 -04:00
commit d3124b3da5
24 changed files with 461 additions and 94 deletions

View File

@ -1,3 +1,4 @@
## v0.0.1 [2013-10-22]
* Initial Release
@ -296,3 +297,4 @@
- [Issue #333](https://github.com/influxdb/influxdb/issues/333). Better
error message when password is invalid and don't create the user if
the password is invalid
- [Issue #318](https://github.com/influxdb/influxdb/issues/318). Support EXPLAIN queries

View File

@ -9,7 +9,7 @@ bind-address = "0.0.0.0"
[logging]
# logging level can be one of "debug", "info", "warn" or "error"
level = "info"
level = "debug"
file = "influxdb.log" # stdout to log to standard out
# Configure the admin server

View File

@ -31,8 +31,14 @@ type QueryProcessor interface {
// This method returns true if the query should continue. If the query should be stopped,
// like maybe the limit was hit, it should return false
YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
YieldSeries(seriesName *string, columnNames []string, seriesIncoming *p.Series) bool
YieldSeries(seriesIncoming *p.Series) bool
Close()
// Set by the shard, so EXPLAIN query can know query against which shard is being measured
SetShardInfo(shardId int, shardLocal bool)
// Let QueryProcessor identify itself. What if it is a spy and we can't check that?
GetName() string
}
type NewShardData struct {
@ -211,21 +217,18 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
processor = engine.NewPassthroughEngine(response, maxDeleteResults)
} else {
if self.ShouldAggregateLocally(querySpec) {
fmt.Printf("creating a query engine\n")
processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response)
if err != nil {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
log.Error("Error while creating engine: %s", err)
return
}
if querySpec.IsExplainQuery() {
processor.SetShardInfo(int(self.Id()), self.IsLocal)
}
processor.SetShardInfo(int(self.Id()), self.IsLocal)
} else {
maxPointsToBufferBeforeSending := 1000
fmt.Printf("creating a passthrough engine\n")
processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
if querySpec.IsExplainQuery() {
processor.SetShardInfo(int(self.Id()), self.IsLocal)
}
}
}
shard, err := self.store.GetOrCreateShard(self.id)

View File

@ -42,12 +42,13 @@ var (
// shorter constants for readability
var (
dropDatabase = protocol.Request_DROP_DATABASE
queryRequest = protocol.Request_QUERY
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
heartbeatResponse = protocol.Response_HEARTBEAT
write = protocol.Request_WRITE
dropDatabase = protocol.Request_DROP_DATABASE
queryRequest = protocol.Request_QUERY
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
heartbeatResponse = protocol.Response_HEARTBEAT
explainQueryResponse = protocol.Response_EXPLAIN_QUERY
write = protocol.Request_WRITE
)
type SeriesWriter interface {
@ -77,7 +78,7 @@ func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterC
}
func (self *CoordinatorImpl) RunQuery(user common.User, database string, queryString string, seriesWriter SeriesWriter) (err error) {
log.Debug("COORD: RunQuery: ", queryString)
log.Debug("COORD: RunQuery: %s", queryString)
// don't let a panic pass beyond RunQuery
defer recoverFunc(database, queryString)
@ -254,14 +255,17 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
go func() {
for {
res := <-responseChan
if *res.Type == endStreamResponse || *res.Type == accessDeniedResponse {
response := <-responseChan
if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
writer.Close()
seriesClosed <- true
return
}
if res.Series != nil && len(res.Series.Points) > 0 {
writer.Write(res.Series)
if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
if response.Series != nil && len(response.Series.Points) > 0 {
writer.Write(response.Series)
}
}
}
}()
@ -278,6 +282,7 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
responses := make([]chan *protocol.Response, 0)
for _, shard := range shards {
responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
// We query shards for data and stream them to query processor
go shard.Query(querySpec, responseChan)
responses = append(responses, responseChan)
}
@ -296,20 +301,27 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
break
}
// if we don't have a processor, yield the point to the writer
if processor == nil {
log.Debug("WRITING: ", len(response.Series.Points))
seriesWriter.Write(response.Series)
log.Debug("WRITING (done)")
if response.Series == nil || len(response.Series.Points) == 0 {
log.Debug("Series has no points, continue")
continue
}
// if the data wasn't aggregated at the shard level, aggregate
// the data here
if response.Series != nil {
log.Debug("YIELDING: ", len(response.Series.Points))
processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series)
}
// if we don't have a processor, yield the point to the writer
// this happens if shard took care of the query
// otherwise client will get points from passthrough engine
if processor != nil {
// if the data wasn't aggregated at the shard level, aggregate
// the data here
log.Debug("YIELDING: %d points", len(response.Series.Points))
processor.YieldSeries(response.Series)
continue
}
// If we have EXPLAIN query, we don't write actual points (of
// response.Type Query) to the client
if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
seriesWriter.Write(response.Series)
}
}
log.Debug("DONE: shard: ", shards[i].String())
}

View File

@ -58,7 +58,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn
// the query should always parse correctly since it was parsed at the originating server.
queries, err := parser.ParseQuery(*request.Query)
if err != nil || len(queries) < 1 {
log.Error("Erorr parsing query: ", err)
log.Error("Error parsing query: ", err)
errorMsg := fmt.Sprintf("Cannot find user %s", *request.UserName)
response := &protocol.Response{Type: &endStreamResponse, ErrorMessage: &errorMsg, RequestId: request.Id}
self.WriteResponse(conn, response)
@ -80,6 +80,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn
}
shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
querySpec := parser.NewQuerySpec(user, *request.Database, query)
responseChan := make(chan *protocol.Response)

View File

@ -281,8 +281,12 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
if len(seriesOutgoing.Points) >= self.pointBatchSize {
for _, alias := range aliases {
_alias := alias
if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
series := &protocol.Series{
Name: proto.String(alias),
Fields: fieldNames,
Points: seriesOutgoing.Points,
}
if !processor.YieldSeries(series) {
shouldContinue = false
}
}
@ -296,9 +300,9 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
//Yield remaining data
for _, alias := range aliases {
_alias := alias
log.Debug("Final Flush %s", _alias)
if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
log.Debug("Final Flush %s", alias)
series := &protocol.Series{Name: protocol.String(alias), Fields: seriesOutgoing.Fields, Points: seriesOutgoing.Points}
if !processor.YieldSeries(series) {
log.Debug("Cancelled...")
}
}

View File

@ -1,6 +1,6 @@
package engine
import (
import (
"common"
"fmt"
"math"
@ -554,7 +554,7 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{}
if timestamps == nil {
timestamps = make(map[interface{}]int64)
self.timestamps[series] = timestamps
}
}
if self.duration != nil {
timestamps[group] = *p.GetTimestampInMicroseconds() / *self.duration * *self.duration
} else {
@ -571,18 +571,19 @@ func (self *TimestampAggregator) AggregateSeries(series string, group interface{
}
if len(s.Points) > 0 {
if self.duration != nil {
timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds() / *self.duration * *self.duration
timestamps[group] = *(s.Points[len(s.Points)-1]).GetTimestampInMicroseconds() / *self.duration * *self.duration
} else {
timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds()
timestamps[group] = *(s.Points[len(s.Points)-1]).GetTimestampInMicroseconds()
}
}
return nil
}
/*
//TODO: to be optimized
func (self *TimestampAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error {
//log.Error("Timestamp: ", len(s.Points))
for _, p := range s.Points {
for _, p := range s.Points {
//log.Error("Point: ", p)
self.AggregatePoint(series, group, p)
}
@ -614,12 +615,9 @@ func NewTimestampAggregator(query *parser.SelectQuery, _ *parser.Value) (Aggrega
var durationPtr *int64
//log.Error("Duration: ", duration)
if duration != nil {
newDuration := int64(*duration / time.Microsecond)
durationPtr = &newDuration
// log.Error("Woohoo! ", durationPtr)
}
return &TimestampAggregator{

View File

@ -29,17 +29,26 @@ type QueryEngine struct {
pointsRange map[string]*PointRange
groupBy *parser.GroupByClause
aggregateYield func(*protocol.Series) error
explain bool
// query statistics
runStartTime float64
runEndTime float64
pointsRead int64
pointsWritten int64
shardId int
shardLocal bool
}
var (
endStreamResponse = protocol.Response_END_STREAM
explainQueryResponse = protocol.Response_EXPLAIN_QUERY
)
const (
POINT_BATCH_SIZE = 64
)
var (
responseQuery = protocol.Response_QUERY
responseEndStream = protocol.Response_END_STREAM
)
// distribute query and possibly do the merge/join before yielding the points
func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) error {
// see if this is a merge query
@ -69,11 +78,30 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo
where: query.GetWhereCondition(),
limiter: NewLimiter(limit),
responseChan: responseChan,
seriesToPoints: make(map[string]*protocol.Series),
seriesToPoints: make(map[string]*protocol.Series),
// stats stuff
explain: query.IsExplainQuery(),
runStartTime: 0,
runEndTime: 0,
pointsRead: 0,
pointsWritten: 0,
shardId: 0,
shardLocal: false, //that really doesn't matter if it is not EXPLAIN query
}
if queryEngine.explain {
queryEngine.runStartTime = float64(time.Now().UnixNano()) / float64(time.Millisecond)
}
yield := func(series *protocol.Series) error {
response := &protocol.Response{Type: &responseQuery, Series: series}
var response *protocol.Response
if queryEngine.explain {
//TODO: We may not have to send points, just count them
queryEngine.pointsWritten += int64(len(series.Points))
}
response = &protocol.Response{Type: &queryResponse, Series: series}
responseChan <- response
return nil
}
@ -93,6 +121,12 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo
return queryEngine, nil
}
// Shard will call this method for EXPLAIN query
func (self *QueryEngine) SetShardInfo(shardId int, shardLocal bool) {
self.shardId = shardId
self.shardLocal = shardLocal
}
// Returns false if the query should be stopped (either because of limit or error)
func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, point *protocol.Point) (shouldContinue bool) {
shouldContinue = true
@ -107,10 +141,20 @@ func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, poi
}
series.Points = append(series.Points, point)
if self.explain {
self.pointsRead++
}
fmt.Printf("self.seriesToPoints: %#v\n", self.seriesToPoints)
return shouldContinue
}
func (self *QueryEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) (shouldContinue bool) {
func (self *QueryEngine) YieldSeries(seriesIncoming *protocol.Series) (shouldContinue bool) {
if self.explain {
self.pointsRead += int64(len(seriesIncoming.Points))
}
seriesName := seriesIncoming.GetName()
self.seriesToPoints[seriesName] = &protocol.Series{Name: &seriesName, Fields: seriesIncoming.Fields}
return self.yieldSeriesData(seriesIncoming)
}
@ -169,6 +213,8 @@ func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, er
}
func (self *QueryEngine) Close() {
fmt.Printf("Closing: %#v\n", self.seriesToPoints)
for _, series := range self.seriesToPoints {
if len(series.Points) == 0 {
continue
@ -182,6 +228,7 @@ func (self *QueryEngine) Close() {
Name: series.Name,
Fields: series.Fields,
}
fmt.Printf("yielding empty series for %s\n", series.GetName())
err = self.yield(s)
if err != nil {
break
@ -191,7 +238,14 @@ func (self *QueryEngine) Close() {
if self.isAggregateQuery {
self.runAggregates()
}
response := &protocol.Response{Type: &responseEndStream}
if self.explain {
self.runEndTime = float64(time.Now().UnixNano()) / float64(time.Millisecond)
log.Debug("QueryEngine: %.3f R:%d W:%d", self.runEndTime-self.runStartTime, self.pointsRead, self.pointsWritten)
self.SendQueryStats()
}
response := &protocol.Response{Type: &endStreamResponse}
if err != nil {
message := err.Error()
response.ErrorMessage = &message
@ -199,6 +253,40 @@ func (self *QueryEngine) Close() {
self.responseChan <- response
}
func (self *QueryEngine) SendQueryStats() {
timestamp := time.Now().UnixNano() / int64(time.Microsecond)
runTime := self.runEndTime - self.runStartTime
points := []*protocol.Point{}
pointsRead := self.pointsRead
pointsWritten := self.pointsWritten
shardId := int64(self.shardId)
shardLocal := self.shardLocal
engineName := "QueryEngine"
point := &protocol.Point{
Values: []*protocol.FieldValue{
&protocol.FieldValue{StringValue: &engineName},
&protocol.FieldValue{Int64Value: &shardId},
&protocol.FieldValue{BoolValue: &shardLocal},
&protocol.FieldValue{DoubleValue: &runTime},
&protocol.FieldValue{Int64Value: &pointsRead},
&protocol.FieldValue{Int64Value: &pointsWritten},
},
Timestamp: &timestamp,
}
points = append(points, point)
seriesName := "explain query"
series := &protocol.Series{
Name: &seriesName,
Fields: []string{"engine_name", "shard_id", "shard_local", "run_time", "points_read", "points_written"},
Points: points,
}
response := &protocol.Response{Type: &explainQueryResponse, Series: series}
self.responseChan <- response
}
func containsArithmeticOperators(query *parser.SelectQuery) bool {
for _, column := range query.GetColumnNames() {
if column.Type == parser.ValueExpression {
@ -612,3 +700,7 @@ func (self *QueryEngine) executeArithmeticQuery(query *parser.SelectQuery, yield
return nil
})
}
func (self *QueryEngine) GetName() string {
return "QueryEngine"
}

View File

@ -1,7 +1,7 @@
package engine
import (
"protocol"
"protocol"
)
type Limiter struct {
@ -18,10 +18,10 @@ func NewLimiter(limit int) *Limiter {
}
}
func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) {
func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) {
if self.shouldLimit {
// if the limit is 0, stop returning any points
limit := self.limitForSeries(*series.Name)
// if the limit is 0, stop returning any points
limit := self.limitForSeries(*series.Name)
defer func() { self.limits[*series.Name] = limit }()
if limit == 0 {
series.Points = nil
@ -33,7 +33,7 @@ func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) {
series.Points = series.Points[0:sliceTo]
limit = 0
}
}
}
}
func (self *Limiter) hitLimit(seriesName string) bool {

View File

@ -9,8 +9,7 @@ const (
)
var (
queryResponse = protocol.Response_QUERY
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
)
type ListSeriesEngine struct {
@ -42,7 +41,7 @@ func (self *ListSeriesEngine) YieldPoint(seriesName *string, columnNames []strin
return true
}
func (self *ListSeriesEngine) YieldSeries(seriesName *string, columnNames []string, seriesIncoming *protocol.Series) bool {
func (self *ListSeriesEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
if len(self.response.MultiSeries) > MAX_SERIES_IN_RESPONSE {
self.responseChan <- self.response
self.response = &protocol.Response{
@ -50,7 +49,7 @@ func (self *ListSeriesEngine) YieldSeries(seriesName *string, columnNames []stri
MultiSeries: make([]*protocol.Series, 0),
}
}
self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesName})
self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesIncoming.Name})
return true
}
@ -61,3 +60,11 @@ func (self *ListSeriesEngine) Close() {
response := &protocol.Response{Type: &endStreamResponse}
self.responseChan <- response
}
func (self *ListSeriesEngine) SetShardInfo(shardId int, shardLocal bool) {
//EXPLAIN doens't work with this query
}
func (self *ListSeriesEngine) GetName() string {
return "ListSeriesEngine"
}

View File

@ -1,6 +1,7 @@
package engine
import (
"fmt"
"parser"
"protocol"
)
@ -16,6 +17,7 @@ func getJoinYield(query *parser.SelectQuery, yield func(*protocol.Series) error)
name := table1 + "_join_" + table2
return mergeYield(table1, table2, false, query.Ascending, func(s *protocol.Series) error {
fmt.Printf("join series: %d\n", len(s.Points))
if *s.Name == table1 {
lastPoint1 = s.Points[len(s.Points)-1]
if lastFields1 == nil {
@ -269,6 +271,8 @@ func mergeYield(table1, table2 string, modifyValues bool, ascending bool, yield
}
return func(p *protocol.Series) error {
fmt.Printf("series: %v\n", len(p.Points))
state.updateState(p)
if err := state.flushIfNecessary(yield); err != nil {

View File

@ -12,6 +12,15 @@ type PassthroughEngine struct {
response *protocol.Response
maxPointsInResponse int
limiter *Limiter
responseType *protocol.Response_Type
// query statistics
runStartTime float64
runEndTime float64
pointsRead int64
pointsWritten int64
shardId int
shardLocal bool
}
func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInResponse int) *PassthroughEngine {
@ -19,14 +28,24 @@ func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInRespo
}
func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPointsInResponse, limit int) *PassthroughEngine {
return &PassthroughEngine{
passthroughEngine := &PassthroughEngine{
responseChan: responseChan,
maxPointsInResponse: maxPointsInResponse,
limiter: NewLimiter(limit),
responseType: &queryResponse,
runStartTime: 0,
runEndTime: 0,
pointsRead: 0,
pointsWritten: 0,
shardId: 0,
shardLocal: false, //that really doesn't matter if it is not EXPLAIN query
}
return passthroughEngine
}
func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool {
self.responseType = &queryResponse
series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}
self.limiter.calculateLimitAndSlicePoints(series)
if len(series.Points) == 0 {
@ -35,19 +54,19 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri
if self.response == nil {
self.response = &protocol.Response{
Type: &queryResponse,
Type: self.responseType,
Series: series,
}
} else if self.response.Series.Name != seriesName {
} else if *self.response.Series.Name != *seriesName {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
Type: self.responseType,
Series: series,
}
} else if len(self.response.Series.Points) > self.maxPointsInResponse {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
Type: self.responseType,
Series: series,
}
} else {
@ -56,54 +75,45 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri
return !self.limiter.hitLimit(*seriesName)
}
func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) bool {
func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points))
/*
seriesCopy := &protocol.Series{Name: protocol.String(*seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, POINT_BATCH_SIZE)}
for _, point := range seriesIncoming.Points {
seriesCopy.Points = append(seriesCopy.Points, point)
if *seriesIncoming.Name == "explain query" {
self.responseType = &explainQueryResponse
log.Debug("Response Changed!")
} else {
self.responseType = &queryResponse
}
*/
//log.Debug("PT Copied %d %d", len(seriesIncoming.Points), POINT_BATCH_SIZE)
self.limiter.calculateLimitAndSlicePoints(seriesIncoming)
if len(seriesIncoming.Points) == 0 {
log.Error("Not sent == 0")
return false
}
}
//log.Debug("PassthroughEngine", seriesCopy)
/*
self.response = &protocol.Response{
Type: &queryResponse,
Series: seriesIncoming,
}
self.responseChan <- self.response
*/
if self.response == nil {
self.response = &protocol.Response{
Type: &queryResponse,
Type: self.responseType,
Series: seriesIncoming,
}
} else if *self.response.Series.Name != *seriesName {
} else if self.response.Series.GetName() != seriesIncoming.GetName() {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
Type: self.responseType,
Series: seriesIncoming,
}
} else if len(self.response.Series.Points) > self.maxPointsInResponse {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
Type: self.responseType,
Series: seriesIncoming,
}
} else {
self.response.Series.Points = append(self.response.Series.Points, seriesIncoming.Points...)
}
return !self.limiter.hitLimit(*seriesName)
return !self.limiter.hitLimit(seriesIncoming.GetName())
//return true
}
func (self *PassthroughEngine) Close() {
if self.response != nil && self.response.Series != nil && self.response.Series.Name != nil {
self.responseChan <- self.response
@ -111,3 +121,11 @@ func (self *PassthroughEngine) Close() {
response := &protocol.Response{Type: &endStreamResponse}
self.responseChan <- response
}
func (self *PassthroughEngine) SetShardInfo(shardId int, shardLocal bool) {
//EXPLAIN doens't really work with this query (yet ?)
}
func (self *PassthroughEngine) GetName() string {
return "PassthroughEngine"
}

View File

@ -298,6 +298,177 @@ func (self *IntegrationSuite) TestShortPasswords(c *C) {
c.Assert(resp.StatusCode, Equals, http.StatusOK)
}
func (self *IntegrationSuite) TestExplainsWithPassthrough(c *C) {
data := `
[{
"points": [
["val1", 2],
["val1", 3]
],
"name": "test_explain_passthrough",
"columns": ["val_1", "val_2"]
}]`
c.Assert(self.server.WriteData(data), IsNil)
bs, err := self.server.RunQuery("explain select val_1 from test_explain_passthrough where time > now() - 1h", "m")
c.Assert(err, IsNil)
series := []*SerializedSeries{}
err = json.Unmarshal(bs, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "explain query")
c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
c.Assert(series[0].Points[0][5], Equals, float64(2.0))
c.Assert(series[0].Points[0][6], Equals, float64(2.0))
}
func (self *IntegrationSuite) TestExplainsWithPassthroughAndLimit(c *C) {
points := []string{}
for i := 0; i < 101; i++ {
points = append(points, fmt.Sprintf(`["val1", %d]`, i))
}
data := fmt.Sprintf(`
[{
"points": [%s],
"name": "test_explain_passthrough_limit",
"columns": ["val_1", "val_2"]
}]`, strings.Join(points, ","))
c.Assert(self.server.WriteData(data), IsNil)
bs, err := self.server.RunQuery("explain select val_1 from test_explain_passthrough_limit where time > now() - 1h limit 1", "m")
c.Assert(err, IsNil)
series := []*SerializedSeries{}
err = json.Unmarshal(bs, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "explain query")
c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
// we can read at most point-batch-size points, which is set to 100
// by default
c.Assert(series[0].Points[0][5], Equals, float64(100.0))
c.Assert(series[0].Points[0][6], Equals, float64(1.0))
}
func (self *IntegrationSuite) TestExplainsWithNonLocalAggregator(c *C) {
data := `
[{
"points": [
["val1", 2],
["val1", 3],
["val1", 4]
],
"name": "test_explain_non_local",
"columns": ["val_1", "val_2"]
}]`
c.Assert(self.server.WriteData(data), IsNil)
bs, err := self.server.RunQuery("explain select count(val_1) from test_explain_non_local where time > now() - 1h", "m")
c.Assert(err, IsNil)
series := []*SerializedSeries{}
err = json.Unmarshal(bs, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "explain query")
c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
c.Assert(series[0].Points[0][5], Equals, float64(3.0))
c.Assert(series[0].Points[0][6], Equals, float64(1.0))
}
func (self *IntegrationSuite) TestExplainsWithNonLocalAggregatorAndRegex(c *C) {
data := `
[{
"points": [
["val1", 2],
["val1", 3],
["val1", 4]
],
"name": "test_explain_non_local_regex",
"columns": ["val_1", "val_2"]
}]`
c.Assert(self.server.WriteData(data), IsNil)
bs, err := self.server.RunQuery("explain select count(val_1) from /.*test_explain_non_local_regex.*/ where time > now() - 1h", "m")
c.Assert(err, IsNil)
series := []*SerializedSeries{}
err = json.Unmarshal(bs, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "explain query")
c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
c.Assert(series[0].Points[0][5], Equals, float64(3.0))
c.Assert(series[0].Points[0][6], Equals, float64(1.0))
}
func (self *IntegrationSuite) TestExplainsWithLocalAggregator(c *C) {
data := `
[{
"points": [
["val1", 2],
["val1", 3],
["val1", 4]
],
"name": "test_local_aggregator",
"columns": ["val_1", "val_2"]
}]`
c.Assert(self.server.WriteData(data), IsNil)
bs, err := self.server.RunQuery("explain select count(val_1) from test_local_aggregator group by time(1h) where time > now() - 1h", "m")
c.Assert(err, IsNil)
series := []*SerializedSeries{}
err = json.Unmarshal(bs, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "explain query")
c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
c.Assert(series[0].Points[0][5], Equals, float64(3.0))
c.Assert(series[0].Points[0][6], Equals, float64(1.0))
}
func (self *IntegrationSuite) TestExplainsWithLocalAggregatorAndRegex(c *C) {
data := `
[
{
"points": [
["val1", 2],
["val1", 3],
["val1", 4]
],
"name": "test_local_aggregator_regex_1",
"columns": ["val_1", "val_2"]
},
{
"points": [
["val1", 2],
["val1", 3],
["val1", 4]
],
"name": "test_local_aggregator_regex_2",
"columns": ["val_1", "val_2"]
}
]`
c.Assert(self.server.WriteData(data), IsNil)
bs, err := self.server.RunQuery("explain select count(val_1) from /.*test_local_aggregator_regex.*/ group by time(1h) where time > now() - 1h", "m")
c.Assert(err, IsNil)
series := []*SerializedSeries{}
err = json.Unmarshal(bs, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "explain query")
c.Assert(series[0].Columns, HasLen, 7) // 6 columns plus the time column
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0][1], Equals, "QueryEngine")
c.Assert(series[0].Points[0][5], Equals, float64(6.0))
c.Assert(series[0].Points[0][6], Equals, float64(2.0))
}
func (self *IntegrationSuite) TestMedians(c *C) {
for i := 0; i < 3; i++ {
err := self.server.WriteData(fmt.Sprintf(`

View File

@ -7,7 +7,7 @@
[logging]
# logging level can be one of "debug", "info", "warn" or "error"
level = "info"
level = "debug"
file = "/tmp/influxdb/test/1/influxdb.log"
# Configure the admin server

View File

@ -7,7 +7,7 @@
[logging]
# logging level can be one of "debug", "info", "warn" or "error"
level = "info"
level = "debug"
file = "/tmp/influxdb/test/2/influxdb.log"
# Configure the admin server

View File

@ -7,7 +7,7 @@
[logging]
# logging level can be one of "debug", "info", "warn" or "error"
level = "info"
level = "debug"
file = "/tmp/influxdb/test/3/influxdb.log"
# Configure the admin server

View File

@ -150,6 +150,7 @@ type SelectQuery struct {
IntoClause *IntoClause
Limit int
Ascending bool
Explain bool
}
type ListType int
@ -203,6 +204,10 @@ func (self *Query) IsListQuery() bool {
return self.ListQuery != nil
}
func (self *Query) IsExplainQuery() bool {
return self.SelectQuery != nil && self.SelectQuery.Explain
}
func (self *Query) IsListSeriesQuery() bool {
return self.ListQuery != nil && self.ListQuery.Type == Series
}
@ -219,6 +224,10 @@ func (self *SelectQuery) GetColumnNames() []*Value {
return self.ColumnNames
}
func (self *SelectQuery) IsExplainQuery() bool {
return self.Explain
}
func (self *SelectQuery) IsSinglePointQuery() bool {
w := self.GetWhereCondition()
if w == nil {
@ -617,6 +626,7 @@ func parseSelectQuery(queryString string, q *C.select_query) (*SelectQuery, erro
SelectDeleteCommonQuery: basicQuery,
Limit: int(limit),
Ascending: q.ascending != 0,
Explain: q.explain != 0,
}
// get the column names

View File

@ -31,6 +31,23 @@ func (self *QueryParserSuite) TestInvalidFromClause(c *C) {
c.Assert(err, ErrorMatches, ".*\\$undefined.*")
}
func (self *QueryParserSuite) TestInvalidExplainQueries(c *C) {
query := "explain select foo, baz group by time(1d)"
_, err := ParseQuery(query)
c.Assert(err, NotNil)
}
func (self *QueryParserSuite) TestExplainQueries(c *C) {
query := "explain select foo, bar from baz group by time(1d)"
queries, err := ParseQuery(query)
c.Assert(err, IsNil)
c.Assert(queries, HasLen, 1)
c.Assert(queries[0].SelectQuery, NotNil)
c.Assert(queries[0].SelectQuery.IsExplainQuery(), Equals, true)
}
func (self *QueryParserSuite) TestParseBasicSelectQuery(c *C) {
for _, query := range []string{
"select value from t where c = '5';",

View File

@ -58,6 +58,7 @@ static int yycolumn = 1;
"where" { BEGIN(INITIAL); return WHERE; }
"as" { return AS; }
"select" { return SELECT; }
"explain" { return EXPLAIN; }
"delete" { return DELETE; }
"drop series" { return DROP_SERIES; }
"drop" { return DROP; }

View File

@ -74,7 +74,7 @@ value *create_expression_value(char *operator, size_t size, ...) {
%lex-param {void *scanner}
// define types of tokens (terminals)
%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES INTO CONTINUOUS_QUERIES CONTINUOUS_QUERY DROP DROP_SERIES
%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES INTO CONTINUOUS_QUERIES CONTINUOUS_QUERY DROP DROP_SERIES EXPLAIN
%token <string> STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME INTO_NAME REGEX_OP
%token <string> NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION
@ -105,6 +105,7 @@ value *create_expression_value(char *operator, size_t size, ...) {
%type <drop_series_query> DROP_SERIES_QUERY
%type <select_query> SELECT_QUERY
%type <drop_query> DROP_QUERY
%type <select_query> EXPLAIN_QUERY
// the initial token
%start ALL_QUERIES
@ -176,6 +177,12 @@ QUERY:
$$ = calloc(1, sizeof(query));
$$->list_continuous_queries_query = TRUE;
}
|
EXPLAIN_QUERY
{
$$ = calloc(1, sizeof(query));
$$->select_query = $1;
}
DROP_QUERY:
DROP CONTINUOUS_QUERY INT_VALUE
@ -200,6 +207,13 @@ DROP_SERIES_QUERY:
$$->name = $2;
}
EXPLAIN_QUERY:
EXPLAIN SELECT_QUERY
{
$$ = $2;
$$->explain = TRUE;
}
SELECT_QUERY:
SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE
{
@ -211,6 +225,7 @@ SELECT_QUERY:
$$->limit = $6.limit;
$$->ascending = $6.ascending;
$$->into_clause = $7;
$$->explain = FALSE;
}
|
SELECT COLUMN_NAMES FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE
@ -223,6 +238,7 @@ SELECT_QUERY:
$$->limit = $6.limit;
$$->ascending = $6.ascending;
$$->into_clause = $7;
$$->explain = FALSE;
}
LIMIT_AND_ORDER_CLAUSES:

View File

@ -109,6 +109,13 @@ func (self *QuerySpec) IsSinglePointQuery() bool {
return false
}
func (self *QuerySpec) IsExplainQuery() bool {
if self.query.SelectQuery != nil {
return self.query.SelectQuery.IsExplainQuery()
}
return false
}
func (self *QuerySpec) SelectQuery() *SelectQuery {
return self.query.SelectQuery
}

View File

@ -88,6 +88,7 @@ typedef struct {
condition *where_condition;
int limit;
char ascending;
char explain;
} select_query;
typedef struct {
@ -124,4 +125,3 @@ void free_error (error *error);
// this is the api that is used in GO
query parse_query(char *const query_s);
void close_query (query *q);

View File

@ -8,6 +8,9 @@ int main(int argc, char **argv) {
query q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time>now()-1d;");
close_query(&q);
q = parse_query("explain select users.events group_by user_email,time(1h) where time>now()-1d;");
close_query(&q);
// test freeing on error
q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time >> now()-1d;");
close_query(&q);

View File

@ -57,6 +57,7 @@ message Response {
// Access denied also serves as an end of stream response
ACCESS_DENIED = 8;
HEARTBEAT = 9;
EXPLAIN_QUERY = 10;
}
enum ErrorCode {
REQUEST_TOO_LARGE = 1;