Split the different engines into their own types

pull/893/head
John Shahid 2014-08-20 11:59:45 -04:00
parent d68a798454
commit 4383375fe8
35 changed files with 1127 additions and 1196 deletions
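
The central change: the coordinator's SeriesWriter and the cluster's QueryProcessor are folded into one engine.Processor interface that every stage of the query pipeline implements. The interface file itself is not part of this excerpt; the following is a minimal sketch reconstructed from the implementations in the diff:

    package engine

    import "github.com/influxdb/influxdb/protocol"

    // Yield hands one batch of points downstream and returns false when the
    // producer should stop early (e.g. a limit was hit). Close flushes any
    // buffered state and closes the next Processor in the chain.
    type Processor interface {
        Name() string
        Yield(s *protocol.Series) (bool, error)
        Close() error
    }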

@ -3,7 +3,7 @@ package api
import (
"github.com/influxdb/influxdb/cluster"
cmn "github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/coordinator"
"github.com/influxdb/influxdb/engine"
"github.com/influxdb/influxdb/protocol"
)
@ -14,7 +14,7 @@ type Coordinator interface {
ForceCompaction(cmn.User) error
// Data related api
RunQuery(cmn.User, string, string, coordinator.SeriesWriter) error
RunQuery(cmn.User, string, string, engine.Processor) error
WriteSeriesData(cmn.User, string, []*protocol.Series) error
// Administration related api

@ -1173,7 +1173,7 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R
}
}
for _, queryString := range databaseConfig.ContinuousQueries {
err := self.coordinator.RunQuery(u, database, queryString, nullSeriesWriter)
err := self.coordinator.RunQuery(u, database, queryString, cluster.NilProcessor{})
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}

@ -16,6 +16,7 @@ import (
. "github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/configuration"
"github.com/influxdb/influxdb/coordinator"
"github.com/influxdb/influxdb/engine"
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
. "launchpad.net/gocheck"
@ -35,7 +36,7 @@ type ApiSuite struct {
var _ = Suite(&ApiSuite{})
func (self *MockCoordinator) RunQuery(_ User, _ string, query string, yield coordinator.SeriesWriter) error {
func (self *MockCoordinator) RunQuery(_ User, _ string, query string, yield engine.Processor) error {
if self.returnedError != nil {
return self.returnedError
}
@ -87,10 +88,11 @@ func (self *MockCoordinator) RunQuery(_ User, _ string, query string, yield coor
if err != nil {
return err
}
if err := yield.Write(series[0]); err != nil {
if _, err := yield.Yield(series[0]); err != nil {
return err
}
return yield.Write(series[1])
_, err = yield.Yield(series[1])
return err
}
type MockCoordinator struct {

@ -1,5 +0,0 @@
package http
import "github.com/influxdb/influxdb/protocol"
var nullSeriesWriter = NewSeriesWriter(func(_ *protocol.Series) error { return nil })

@ -14,9 +14,18 @@ func NewSeriesWriter(yield func(*protocol.Series) error) *SeriesWriter {
return &SeriesWriter{yield}
}
func (self *SeriesWriter) Write(series *protocol.Series) error {
return self.yield(series)
func (self *SeriesWriter) Yield(series *protocol.Series) (bool, error) {
err := self.yield(series)
if err != nil {
return false, err
}
return true, nil
}
func (self *SeriesWriter) Close() {
func (self *SeriesWriter) Close() error {
return nil
}
func (self *SeriesWriter) Name() string {
return "SeriesWriter"
}
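
The adapter still wraps a plain yield function, now exposing it as a Processor. A hypothetical use (the fmt and engine imports are assumed):

    w := NewSeriesWriter(func(s *protocol.Series) error {
        fmt.Printf("%s: %d points\n", s.GetName(), len(s.Points))
        return nil
    })
    var _ engine.Processor = w // the adapter satisfies the new interface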

@ -35,7 +35,8 @@ type ServerConnection interface {
Connect()
Close()
ClearRequests()
MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error
MakeRequest(*protocol.Request, ResponseChannel) error
CancelRequest(*protocol.Request)
}
type ServerState int
@ -108,21 +109,19 @@ func (self *ClusterServer) Connect() {
self.connection.Connect()
}
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) {
err := self.connection.MakeRequest(request, responseStream)
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan<- *protocol.Response) {
rc := NewResponseChannelWrapper(responseStream)
err := self.connection.MakeRequest(request, rc)
if err != nil {
message := err.Error()
select {
case responseStream <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}:
default:
}
self.connection.CancelRequest(request)
self.markServerAsDown()
}
}
func (self *ClusterServer) Write(request *protocol.Request) error {
responseChan := make(chan *protocol.Response, 1)
err := self.connection.MakeRequest(request, responseChan)
rc := NewResponseChannelWrapper(responseChan)
err := self.connection.MakeRequest(request, rc)
if err != nil {
return err
}

cluster/nil_processor.go (new file, 21 lines)

@ -0,0 +1,21 @@
package cluster
import (
"fmt"
"github.com/influxdb/influxdb/protocol"
)
type NilProcessor struct{}
func (np NilProcessor) Name() string {
return "NilProcessor"
}
func (np NilProcessor) Yield(s *protocol.Series) (bool, error) {
return false, fmt.Errorf("Shouldn't get any data")
}
func (np NilProcessor) Close() error {
return nil
}
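
NilProcessor is the sink for queries whose output is discarded: the HTTP server passes it when kicking off continuous queries (above), and deleteDataLocally hands it to shard.Query (below). A sketch, with coordinator, user, db and q assumed to be in scope:

    if err := coordinator.RunQuery(user, db, q, cluster.NilProcessor{}); err != nil {
        return err
    }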

@ -0,0 +1,35 @@
package cluster
import "github.com/influxdb/influxdb/protocol"
type ResponseChannelProcessor struct {
r ResponseChannel
}
var (
QueryResponse = protocol.Response_QUERY
EndStreamResponse = protocol.Response_END_STREAM
)
func NewResponseChannelProcessor(r ResponseChannel) *ResponseChannelProcessor {
return &ResponseChannelProcessor{r}
}
func (p *ResponseChannelProcessor) Yield(s *protocol.Series) (bool, error) {
ok := p.r.Yield(&protocol.Response{
Type: &QueryResponse,
MultiSeries: []*protocol.Series{s},
})
return ok, nil
}
func (p *ResponseChannelProcessor) Close() error {
p.r.Yield(&protocol.Response{
Type: &EndStreamResponse,
})
return nil
}
func (p *ResponseChannelProcessor) Name() string {
return "ResponseChannelProcessor"
}

@ -0,0 +1,16 @@
package cluster
import "github.com/influxdb/influxdb/protocol"
type ResponseChannelWrapper struct {
c chan<- *protocol.Response
}
func NewResponseChannelWrapper(c chan<- *protocol.Response) ResponseChannel {
return &ResponseChannelWrapper{c}
}
func (w *ResponseChannelWrapper) Yield(r *protocol.Response) bool {
w.c <- r
return true
}
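
Together with ResponseChannelProcessor above, the wrapper adapts a plain response channel to the Processor interface, which is exactly how shard.go composes them below. A minimal sketch; series is an assumed *protocol.Series:

    responses := make(chan *protocol.Response, 16)
    var p engine.Processor = NewResponseChannelProcessor(NewResponseChannelWrapper(responses))
    if _, err := p.Yield(series); err != nil { // sends a Response_QUERY carrying the series
        return err
    }
    p.Close() // sends a final Response_END_STREAM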

@ -0,0 +1,7 @@
package cluster
import "github.com/influxdb/influxdb/protocol"
type ResponseChannel interface {
Yield(r *protocol.Response) bool
}

@ -25,26 +25,11 @@ type Shard interface {
EndTime() time.Time
Write(*p.Request) error
SyncWrite(req *p.Request, assignSeqNum bool) error
Query(querySpec *parser.QuerySpec, response chan *p.Response)
Query(querySpec *parser.QuerySpec, response chan<- *p.Response)
ReplicationFactor() int
IsMicrosecondInRange(t int64) bool
}
// Passed to a shard (local datastore or whatever) that gets yielded points from series.
type QueryProcessor interface {
// This method returns true if the query should continue. If the query should be stopped,
// like maybe the limit was hit, it should return false
YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
YieldSeries(seriesIncoming *p.Series) bool
Close()
// Set by the shard, so EXPLAIN query can know query against which shard is being measured
SetShardInfo(shardId int, shardLocal bool)
// Let QueryProcessor identify itself. What if it is a spy and we can't check that?
GetName() string
}
type NewShardData struct {
Id uint32 `json:",omitempty"`
SpaceName string
@ -112,7 +97,7 @@ var (
type LocalShardDb interface {
Write(database string, series []*p.Series) error
Query(*parser.QuerySpec, QueryProcessor) error
Query(*parser.QuerySpec, engine.Processor) error
DropFields(fields []*metastore.Field) error
IsClosed() bool
}
@ -241,7 +226,7 @@ func (self *ShardData) WriteLocalOnly(request *p.Request) error {
return nil
}
func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Response) {
func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan<- *p.Response) {
log.Debug("QUERY: shard %d, query '%s'", self.Id(), querySpec.GetQueryString())
defer common.RecoverFunc(querySpec.Database(), querySpec.GetQueryString(), func(err interface{}) {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(fmt.Sprintf("%s", err))}
@ -258,33 +243,30 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
}
if self.IsLocal {
var processor QueryProcessor
var processor engine.Processor = NewResponseChannelProcessor(NewResponseChannelWrapper(response))
var err error
if querySpec.IsListSeriesQuery() {
processor = engine.NewListSeriesEngine(response)
} else if querySpec.IsDeleteFromSeriesQuery() || querySpec.IsDropSeriesQuery() || querySpec.IsSinglePointQuery() {
if querySpec.IsDeleteFromSeriesQuery() || querySpec.IsDropSeriesQuery() || querySpec.IsSinglePointQuery() {
maxDeleteResults := 10000
processor = engine.NewPassthroughEngine(response, maxDeleteResults)
processor = engine.NewPassthroughEngine(processor, maxDeleteResults)
} else {
query := querySpec.SelectQuery()
if self.ShouldAggregateLocally(querySpec) {
log.Debug("creating a query engine")
processor, err = engine.NewQueryEngine(query, response)
processor, err = engine.NewQueryEngine(processor, query)
if err != nil {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
log.Error("Error while creating engine: %s", err)
return
}
processor.SetShardInfo(int(self.Id()), self.IsLocal)
} else if query.HasAggregates() {
maxPointsToBufferBeforeSending := 1000
log.Debug("creating a passthrough engine")
processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
processor = engine.NewPassthroughEngine(processor, maxPointsToBufferBeforeSending)
} else {
maxPointsToBufferBeforeSending := 1000
log.Debug("creating a passthrough engine with limit")
processor = engine.NewPassthroughEngineWithLimit(response, maxPointsToBufferBeforeSending, query.Limit)
processor = engine.NewPassthroughEngineWithLimit(processor, maxPointsToBufferBeforeSending, query.Limit)
}
if query.GetFromClause().Type != parser.FromClauseInnerJoin {
@ -422,35 +404,28 @@ func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batc
return tickCount
}
func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *p.Response) {
func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan<- *p.Response) {
queryString := querySpec.GetQueryStringWithTimeCondition()
request := self.createRequest(querySpec)
request.Query = &queryString
self.LogAndHandleDestructiveQuery(querySpec, request, response, false)
}
func (self *ShardData) logAndHandleDropSeriesQuery(querySpec *parser.QuerySpec, response chan *p.Response) {
func (self *ShardData) logAndHandleDropSeriesQuery(querySpec *parser.QuerySpec, response chan<- *p.Response) {
self.LogAndHandleDestructiveQuery(querySpec, self.createRequest(querySpec), response, false)
}
func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) {
func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, runLocalOnly bool) {
self.HandleDestructiveQuery(querySpec, request, response, runLocalOnly)
}
func (self *ShardData) deleteDataLocally(querySpec *parser.QuerySpec) (<-chan *p.Response, error) {
localResponses := make(chan *p.Response, 1)
// this doesn't really apply at this point since destructive queries don't output anything, but it may later
maxPointsFromDestructiveQuery := 1000
processor := engine.NewPassthroughEngine(localResponses, maxPointsFromDestructiveQuery)
func (self *ShardData) deleteDataLocally(querySpec *parser.QuerySpec) error {
shard, err := self.store.GetOrCreateShard(self.id)
if err != nil {
return nil, err
return err
}
defer self.store.ReturnShard(self.id)
err = shard.Query(querySpec, processor)
processor.Close()
return localResponses, err
return shard.Query(querySpec, NilProcessor{})
}
func (self *ShardData) forwardRequest(request *p.Request) ([]<-chan *p.Response, []uint32, error) {
@ -468,7 +443,7 @@ func (self *ShardData) forwardRequest(request *p.Request) ([]<-chan *p.Response,
return responses, ids, nil
}
func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) {
func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, runLocalOnly bool) {
if !self.IsLocal && runLocalOnly {
panic("WTF islocal is false and runLocalOnly is true")
}
@ -477,15 +452,13 @@ func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, reque
serverIds := []uint32{}
if self.IsLocal {
channel, err := self.deleteDataLocally(querySpec)
err := self.deleteDataLocally(querySpec)
if err != nil {
msg := err.Error()
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: &msg}
log.Error(msg)
return
}
responseChannels = append(responseChannels, channel)
serverIds = append(serverIds, self.localServerId)
}
log.Debug("request %s, runLocalOnly: %v", request.GetDescription(), runLocalOnly)

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/protocol"
. "launchpad.net/gocheck"
)
@ -21,8 +22,6 @@ const DB_DIR = "/tmp/influxdb/datastore_test"
type MockRequestHandler struct {
}
var writeOk = protocol.Response_WRITE_OK
func Test(t *testing.T) {
TestingT(t)
}
@ -35,7 +34,7 @@ func stringToSeries(seriesString string, c *C) *protocol.Series {
}
func (self *MockRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error {
response := &protocol.Response{RequestId: request.Id, Type: &writeOk}
response := &protocol.Response{RequestId: request.Id, Type: protocol.Response_END_STREAM.Enum()}
data, _ := response.Encode()
binary.Write(conn, binary.LittleEndian, uint32(len(data)))
conn.Write(data)
@ -71,14 +70,14 @@ func (self *ClientServerSuite) TestClientCanMakeRequests(c *C) {
request := &protocol.Request{Id: &id, Type: &proxyWrite, Database: &database, MultiSeries: []*protocol.Series{series}}
time.Sleep(time.Second * 1)
err := protobufClient.MakeRequest(request, responseStream)
err := protobufClient.MakeRequest(request, cluster.NewResponseChannelWrapper(responseStream))
c.Assert(err, IsNil)
timer := time.NewTimer(time.Second)
select {
case <-timer.C:
c.Error("Timed out waiting for response")
case response := <-responseStream:
c.Assert(*response.Type, Equals, protocol.Response_WRITE_OK)
c.Assert(*response.Type, Equals, protocol.Response_END_STREAM)
}
}

@ -14,9 +14,18 @@ func NewContinuousQueryWriter(yield func(*protocol.Series) error) *ContinuousQue
return &ContinuousQueryWriter{yield}
}
func (self *ContinuousQueryWriter) Write(series *protocol.Series) error {
return self.yield(series)
func (self *ContinuousQueryWriter) Yield(series *protocol.Series) (bool, error) {
err := self.yield(series)
if err != nil {
return false, err
}
return true, nil
}
func (self *ContinuousQueryWriter) Close() {
func (self *ContinuousQueryWriter) Close() error {
return nil
}
func (self *ContinuousQueryWriter) Name() string {
return "ContinuousQueryWriter"
}

@ -45,7 +45,7 @@ func NewCoordinator(
return coordinator
}
func (self *Coordinator) RunQuery(user common.User, database string, queryString string, seriesWriter SeriesWriter) (err error) {
func (self *Coordinator) RunQuery(user common.User, database string, queryString string, p engine.Processor) (err error) {
log.Info("Start Query: db: %s, u: %s, q: %s", database, user.GetName(), queryString)
defer func(t time.Time) {
log.Debug("End Query: db: %s, u: %s, q: %s, t: %s", database, user.GetName(), queryString, time.Now().Sub(t))
@ -59,16 +59,17 @@ func (self *Coordinator) RunQuery(user common.User, database string, queryString
}
for _, query := range q {
err := self.runSingleQuery(user, database, query, seriesWriter)
// runSingleQuery shouldn't close the processor in case there are
// other queries to be run
err := self.runSingleQuery(user, database, query, p)
if err != nil {
return err
}
}
seriesWriter.Close()
return nil
}
func (self *Coordinator) runSingleQuery(user common.User, db string, q *parser.Query, sw SeriesWriter) error {
func (self *Coordinator) runSingleQuery(user common.User, db string, q *parser.Query, p engine.Processor) error {
querySpec := parser.NewQuerySpec(user, db, q)
if ok, err := self.permissions.CheckQueryPermissions(user, db, querySpec); !ok {
@ -80,37 +81,37 @@ func (self *Coordinator) runSingleQuery(user common.User, db string, q *parser.Q
case parser.DropContinuousQuery:
return self.runDropContinuousQuery(user, db, uint32(q.DropQuery.Id))
case parser.ListContinuousQueries:
return self.runListContinuousQueries(user, db, sw)
return self.runListContinuousQueries(user, db, p)
case parser.Continuous:
return self.runContinuousQuery(user, db, q.GetQueryString())
case parser.ListSeries:
return self.runListSeriesQuery(querySpec, sw)
return self.runListSeriesQuery(querySpec, p)
// Data queries
case parser.Delete:
return self.runDeleteQuery(querySpec, sw)
return self.runDeleteQuery(querySpec, p)
case parser.DropSeries:
return self.runDropSeriesQuery(querySpec, sw)
return self.runDropSeriesQuery(querySpec)
case parser.Select:
return self.runQuerySpec(querySpec, sw)
return self.runQuerySpec(querySpec, p)
default:
return fmt.Errorf("Can't handle query %s", qt)
}
}
func (self *Coordinator) runListContinuousQueries(user common.User, db string, sw SeriesWriter) error {
func (self *Coordinator) runListContinuousQueries(user common.User, db string, p engine.Processor) error {
queries, err := self.ListContinuousQueries(user, db)
if err != nil {
return err
}
for _, q := range queries {
if err := sw.Write(q); err != nil {
if ok, err := p.Yield(q); !ok || err != nil {
return err
}
}
return nil
}
func (self *Coordinator) runListSeriesQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
func (self *Coordinator) runListSeriesQuery(querySpec *parser.QuerySpec, p engine.Processor) error {
allSeries := self.clusterConfiguration.MetaStore.GetSeriesForDatabase(querySpec.Database())
matchingSeries := allSeries
if q := querySpec.Query().GetListSeriesQuery(); q.HasRegex() {
@ -133,27 +134,25 @@ func (self *Coordinator) runListSeriesQuery(querySpec *parser.QuerySpec, seriesW
}
seriesResult := &protocol.Series{Name: &name, Fields: fields, Points: points}
seriesWriter.Write(seriesResult)
seriesWriter.Close()
return nil
_, err := p.Yield(seriesResult)
return err
}
func (self *Coordinator) runDeleteQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
func (self *Coordinator) runDeleteQuery(querySpec *parser.QuerySpec, p engine.Processor) error {
if err := self.clusterConfiguration.CreateCheckpoint(); err != nil {
return err
}
querySpec.RunAgainstAllServersInShard = true
return self.runQuerySpec(querySpec, seriesWriter)
return self.runQuerySpec(querySpec, p)
}
func (self *Coordinator) runDropSeriesQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
func (self *Coordinator) runDropSeriesQuery(querySpec *parser.QuerySpec) error {
user := querySpec.User()
db := querySpec.Database()
series := querySpec.Query().DropSeriesQuery.GetTableName()
if ok, err := self.permissions.AuthorizeDropSeries(user, db, series); !ok {
return err
}
defer seriesWriter.Close()
err := self.raftServer.DropSeries(db, series)
if err != nil {
return err
@ -209,144 +208,58 @@ func (self *Coordinator) shouldQuerySequentially(shards cluster.Shards, querySpe
return false
}
func (self *Coordinator) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, <-chan bool, error) {
func (self *Coordinator) getShardsAndProcessor(querySpec *parser.QuerySpec, writer engine.Processor) ([]*cluster.ShardData, engine.Processor, error) {
shards := self.clusterConfiguration.GetShardsForQuery(querySpec)
shouldAggregateLocally := shards.ShouldAggregateLocally(querySpec)
var err error
var processor cluster.QueryProcessor
responseChan := make(chan *protocol.Response)
seriesClosed := make(chan bool)
selectQuery := querySpec.SelectQuery()
if selectQuery != nil {
if !shouldAggregateLocally {
// if we should aggregate in the coordinator (i.e. aggregation
// isn't happening locally at the shard level), create an engine
processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), responseChan)
writer, err = engine.NewQueryEngine(writer, querySpec.SelectQuery())
} else {
// if we have a query with limit, then create an engine, or we can
// make the passthrough limit aware
processor = engine.NewPassthroughEngineWithLimit(responseChan, 100, selectQuery.Limit)
writer = engine.NewPassthroughEngineWithLimit(writer, 100, selectQuery.Limit)
}
} else if !shouldAggregateLocally {
processor = engine.NewPassthroughEngine(responseChan, 100)
writer = engine.NewPassthroughEngine(writer, 100)
}
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
if processor == nil {
return shards, nil, nil, nil
}
go func() {
for {
response := <-responseChan
if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
writer.Close()
seriesClosed <- true
return
}
if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
if response.Series != nil && len(response.Series.Points) > 0 {
writer.Write(response.Series)
}
}
}
}()
return shards, processor, seriesClosed, nil
return shards, writer, nil
}
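
getShardsAndProcessor now wraps the caller's Processor directly instead of bridging through a response channel and a pumping goroutine. The resulting data flow, sketched as a comment (the exact composition depends on the query):

    // shard.Query --> response channel --> MergeChannelProcessor
    //             --> QueryEngine (aggregator / arithmetic / merge / join)
    //             --> PassthroughEngineWithLimit --> SeriesWriter --> client
    //
    // Each stage implements engine.Processor and yields into the next one.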
func (self *Coordinator) readFromResponseChannels(processor cluster.QueryProcessor,
writer SeriesWriter,
isExplainQuery bool,
errors chan<- error,
responseChannels <-chan (<-chan *protocol.Response)) {
defer close(errors)
for responseChan := range responseChannels {
for response := range responseChan {
//log.Debug("GOT RESPONSE: ", response.Type, response.Series)
log.Debug("GOT RESPONSE: %v", response.Type)
if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
if response.ErrorMessage == nil {
break
}
err := common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
log.Error("Error while executing query: %s", err)
errors <- err
return
}
if response.Series == nil || len(response.Series.Points) == 0 {
log.Debug("Series has no points, continue")
continue
}
// if we don't have a processor, yield the point to the writer
// this happens if shard took care of the query
// otherwise client will get points from passthrough engine
if processor != nil {
// if the data wasn't aggregated at the shard level, aggregate
// the data here
log.Debug("YIELDING: %d points with %d columns for %s", len(response.Series.Points), len(response.Series.Fields), response.Series.GetName())
processor.YieldSeries(response.Series)
continue
}
// If we have EXPLAIN query, we don't write actual points (of
// response.Type Query) to the client
if !(*response.Type == queryResponse && isExplainQuery) {
writer.Write(response.Series)
}
}
// once we're done with a response channel signal queryShards to
// start querying a new shard
errors <- nil
}
return
}
func (self *Coordinator) queryShards(querySpec *parser.QuerySpec, shards []*cluster.ShardData,
errors <-chan error,
responseChannels chan<- (<-chan *protocol.Response)) error {
defer close(responseChannels)
for i := 0; i < len(shards); i++ {
func (self *Coordinator) queryShards(querySpec *parser.QuerySpec, shards []*cluster.ShardData, p *MergeChannelProcessor) error {
for i, s := range shards {
// readFromResponseChannels will insert an error if an error
// occurred while reading the response. This should immediately
// stop reading from shards
err := <-errors
if err != nil {
return err
}
shard := shards[i]
bufferSize := shard.QueryResponseBufferSize(querySpec, self.config.StoragePointBatchSize)
bufferSize := s.QueryResponseBufferSize(querySpec, self.config.StoragePointBatchSize)
if bufferSize > self.config.ClusterMaxResponseBufferSize {
bufferSize = self.config.ClusterMaxResponseBufferSize
}
responseChan := make(chan *protocol.Response, bufferSize)
c, err := p.NextChannel(bufferSize)
if err != nil {
return err
}
// We query shards for data and stream them to query processor
log.Debug("QUERYING: shard: %d %v", i, shard.String())
go shard.Query(querySpec, responseChan)
responseChannels <- responseChan
log.Debug("QUERYING: shard: %d %v", i, s.String())
go s.Query(querySpec, c)
}
return nil
}
// We call this function only if we have a Select query (not continuous) or Delete query
func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter)
func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, p engine.Processor) error {
shards, processor, err := self.getShardsAndProcessor(querySpec, p)
if err != nil {
return err
}
@ -355,15 +268,6 @@ func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter
return fmt.Errorf("Couldn't look up columns")
}
defer func() {
if processor != nil {
processor.Close()
<-seriesClosed
} else {
seriesWriter.Close()
}
}()
shardConcurrentLimit := self.config.ConcurrentShardQueryLimit
if self.shouldQuerySequentially(shards, querySpec) {
log.Debug("Querying shards sequentially")
@ -371,35 +275,22 @@ func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter
}
log.Debug("Shard concurrent limit: %d", shardConcurrentLimit)
errors := make(chan error, shardConcurrentLimit)
for i := 0; i < shardConcurrentLimit; i++ {
errors <- nil
}
responseChannels := make(chan (<-chan *protocol.Response), shardConcurrentLimit)
mcp := NewMergeChannelProcessor(processor, shardConcurrentLimit)
go self.readFromResponseChannels(processor, seriesWriter, querySpec.IsExplainQuery(), errors, responseChannels)
go mcp.ProcessChannels()
err = self.queryShards(querySpec, shards, errors, responseChannels)
// make sure we read the rest of the errors and responses
for _err := range errors {
if err == nil {
err = _err
}
if err := self.queryShards(querySpec, shards, mcp); err != nil {
log.Error("Error while querying shards: %s", err)
mcp.Close()
return err
}
for responsechan := range responseChannels {
for response := range responsechan {
if response.GetType() != endStreamResponse {
continue
}
if response.ErrorMessage != nil && err == nil {
err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
}
break
}
if err := mcp.Close(); err != nil {
log.Error("Error while querying shards: %s", err)
return err
}
return err
return processor.Close()
}
func (self *Coordinator) ForceCompaction(user common.User) error {

@ -0,0 +1,104 @@
package coordinator
import (
"fmt"
"code.google.com/p/log4go"
"github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/engine"
"github.com/influxdb/influxdb/protocol"
)
type MergeChannelProcessor struct {
next engine.Processor
c chan (<-chan *protocol.Response)
e chan error
}
func NewMergeChannelProcessor(next engine.Processor, concurrency int) *MergeChannelProcessor {
p := &MergeChannelProcessor{
next: next,
e: make(chan error, concurrency),
c: make(chan (<-chan *protocol.Response), concurrency),
}
for i := 0; i < concurrency; i++ {
p.e <- nil
}
return p
}
func (p *MergeChannelProcessor) Close() (err error) {
close(p.c)
for e := range p.e {
if e != nil {
err = e
}
}
for c := range p.c {
nextChannel:
for r := range c {
switch r.GetType() {
case protocol.Response_END_STREAM,
protocol.Response_ACCESS_DENIED,
protocol.Response_WRITE_OK,
protocol.Response_HEARTBEAT:
break nextChannel
}
}
}
return err
}
func (p *MergeChannelProcessor) NextChannel(bs int) (chan<- *protocol.Response, error) {
err := <-p.e
if err != nil {
return nil, err
}
c := make(chan *protocol.Response, bs)
p.c <- c
return c, nil
}
func (p *MergeChannelProcessor) String() string {
return fmt.Sprintf("MergeChannelProcessor (%d)", cap(p.e))
}
func (p *MergeChannelProcessor) ProcessChannels() {
defer close(p.e)
for channel := range p.c {
nextChannel:
for response := range channel {
log4go.Debug("%s received %s", p, response)
switch response.GetType() {
// all these four types end the stream
case protocol.Response_WRITE_OK,
protocol.Response_HEARTBEAT,
protocol.Response_ACCESS_DENIED,
protocol.Response_END_STREAM:
var err error
if m := response.ErrorMessage; m != nil {
err = common.NewQueryError(common.InvalidArgument, *m)
}
p.e <- err
break nextChannel
case protocol.Response_QUERY:
for _, s := range response.MultiSeries {
log4go.Debug("Yielding to %s: %s", p.next.Name(), s)
_, err := p.next.Yield(s)
if err != nil {
p.e <- err
return
}
}
}
}
}
}
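
Usage mirrors Coordinator.runQuerySpec above; next, concurrency, shards, querySpec and bufferSize are assumed to be in scope. NextChannel doubles as a throttle: it blocks until one of the concurrency slots frees up, surfacing any error from an earlier shard's stream:

    mcp := NewMergeChannelProcessor(next, concurrency)
    go mcp.ProcessChannels()
    for _, s := range shards {
        c, err := mcp.NextChannel(bufferSize)
        if err != nil {
            mcp.Close()
            return err
        }
        go s.Query(querySpec, c) // each shard streams responses into its own channel
    }
    return mcp.Close() // drains remaining channels and reports any stream error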

@ -11,6 +11,7 @@ import (
"time"
log "code.google.com/p/log4go"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/protocol"
)
@ -32,9 +33,9 @@ type ProtobufClient struct {
}
type runningRequest struct {
timeMade time.Time
responseChan chan *protocol.Response
request *protocol.Request
timeMade time.Time
r cluster.ResponseChannel
request *protocol.Request
}
const (
@ -91,27 +92,38 @@ func (self *ProtobufClient) ClearRequests() {
self.requestBufferLock.Lock()
defer self.requestBufferLock.Unlock()
message := "clearing all requests"
for _, req := range self.requestBuffer {
select {
case req.responseChan <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}:
default:
log.Debug("Cannot send response on channel")
}
self.cancelRequest(req.request)
}
self.requestBuffer = map[uint32]*runningRequest{}
}
func (self *ProtobufClient) CancelRequest(request *protocol.Request) {
self.requestBufferLock.Lock()
defer self.requestBufferLock.Unlock()
self.cancelRequest(request)
}
func (self *ProtobufClient) cancelRequest(request *protocol.Request) {
req, ok := self.requestBuffer[*request.Id]
if !ok {
return
}
message := "cancelling request"
req.r.Yield(&protocol.Response{Type: &endStreamResponse, ErrorMessage: &message})
delete(self.requestBuffer, *request.Id)
}
// Makes a request to the server. If the ResponseChannel is not nil it will expect a response from the server
// with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms means
// that an attempt to make a request to a downed server will take 300ms to time out.
func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error {
func (self *ProtobufClient) MakeRequest(request *protocol.Request, r cluster.ResponseChannel) error {
if request.Id == nil {
id := atomic.AddUint32(&self.lastRequestId, uint32(1))
request.Id = &id
}
if responseStream != nil {
if r != nil {
self.requestBufferLock.Lock()
// this should actually never happen. The sweeper should clear out dead requests
@ -119,9 +131,9 @@ func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStrea
if oldReq, alreadyHasRequestById := self.requestBuffer[*request.Id]; alreadyHasRequestById {
message := "already has a request with this id, must have timed out"
log.Error(message)
oldReq.responseChan <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}
oldReq.r.Yield(&protocol.Response{Type: &endStreamResponse, ErrorMessage: &message})
}
self.requestBuffer[*request.Id] = &runningRequest{timeMade: time.Now(), responseChan: responseStream, request: request}
self.requestBuffer[*request.Id] = &runningRequest{timeMade: time.Now(), r: r, request: request}
self.requestBufferLock.Unlock()
}
@ -197,14 +209,29 @@ func (self *ProtobufClient) sendResponse(response *protocol.Response) {
self.requestBufferLock.RLock()
req, ok := self.requestBuffer[*response.RequestId]
self.requestBufferLock.RUnlock()
if ok {
if *response.Type == protocol.Response_END_STREAM || *response.Type == protocol.Response_WRITE_OK || *response.Type == protocol.Response_HEARTBEAT || *response.Type == protocol.Response_ACCESS_DENIED {
self.requestBufferLock.Lock()
delete(self.requestBuffer, *response.RequestId)
self.requestBufferLock.Unlock()
}
req.responseChan <- response
if !ok {
return
}
switch response.GetType() {
case protocol.Response_END_STREAM,
protocol.Response_WRITE_OK,
protocol.Response_HEARTBEAT,
protocol.Response_ACCESS_DENIED:
// continue and delete the request
default:
return
}
self.requestBufferLock.Lock()
req, ok = self.requestBuffer[*response.RequestId]
delete(self.requestBuffer, *response.RequestId)
self.requestBufferLock.Unlock()
if !ok {
return
}
req.r.Yield(response)
}
func (self *ProtobufClient) reconnect() net.Conn {

@ -9,6 +9,7 @@ import (
"time"
log "code.google.com/p/log4go"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/protocol"
)
@ -98,7 +99,8 @@ func BenchmarkSingle(b *testing.B) {
Type: &HEARTBEAT_TYPE,
Database: protocol.String(""),
}
client.MakeRequest(heartbeatRequest, responseChan)
rcw := cluster.NewResponseChannelWrapper(responseChan)
client.MakeRequest(heartbeatRequest, rcw)
<-responseChan
}
}

@ -107,16 +107,12 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn
func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error {
if response.Size() >= MAX_RESPONSE_SIZE {
l := len(response.Series.Points)
firstHalfPoints := response.Series.Points[:l/2]
secondHalfPoints := response.Series.Points[l/2:]
response.Series.Points = firstHalfPoints
err := self.WriteResponse(conn, response)
f, s := splitResponse(response)
err := self.WriteResponse(conn, f)
if err != nil {
return err
}
response.Series.Points = secondHalfPoints
return self.WriteResponse(conn, response)
return self.WriteResponse(conn, s)
}
data, err := response.Encode()
@ -134,3 +130,21 @@ func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *proto
}
return nil
}
func splitResponse(response *protocol.Response) (f, s *protocol.Response) {
f = &protocol.Response{}
s = &protocol.Response{}
*f = *response
*s = *response
if l := len(response.MultiSeries); l > 1 {
f.MultiSeries = f.MultiSeries[:l/2]
s.MultiSeries = s.MultiSeries[l/2:]
return
}
l := len(response.MultiSeries[0].Points)
f.MultiSeries[0].Points = f.MultiSeries[0].Points[:l/2]
s.MultiSeries[0].Points = s.MultiSeries[0].Points[l/2:]
return
}

@ -1,8 +0,0 @@
package coordinator
import "github.com/influxdb/influxdb/protocol"
type SeriesWriter interface {
Write(*protocol.Series) error
Close()
}

@ -12,9 +12,9 @@ import (
"code.google.com/p/goprotobuf/proto"
log "code.google.com/p/log4go"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/datastore/storage"
"github.com/influxdb/influxdb/engine"
"github.com/influxdb/influxdb/metastore"
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
@ -100,13 +100,12 @@ func (self *Shard) Write(database string, series []*protocol.Series) error {
return self.db.BatchPut(wb)
}
func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
func (self *Shard) Query(querySpec *parser.QuerySpec, processor engine.Processor) error {
self.closeLock.RLock()
defer self.closeLock.RUnlock()
if self.closed {
return fmt.Errorf("Shard is closed")
}
if querySpec.IsListSeriesQuery() {
return fmt.Errorf("List series queries should never come to the shard")
} else if querySpec.IsDeleteFromSeriesQuery() {
@ -145,7 +144,7 @@ func (self *Shard) IsClosed() bool {
return self.closed
}
func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName string, columns []string, processor cluster.QueryProcessor) error {
func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName string, columns []string, processor engine.Processor) error {
startTimeBytes := self.byteArrayForTime(querySpec.GetStartTime())
endTimeBytes := self.byteArrayForTime(querySpec.GetEndTime())
@ -167,7 +166,8 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName
return err
}
if len(series.Points) > 0 {
processor.YieldPoint(series.Name, series.Fields, series.Points[0])
_, err := processor.Yield(series)
return err
}
return nil
}
@ -291,9 +291,13 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName
Fields: fieldNames,
Points: seriesOutgoing.Points,
}
if !processor.YieldSeries(series) {
log.Debug("Stopping processing")
if ok, err := processor.Yield(series); !ok || err != nil {
log.Debug("Stopping processing.")
shouldContinue = false
if err != nil {
log.Error("Error while processing data: %v", err)
return err
}
}
}
seriesOutgoing = &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)}
@ -308,8 +312,12 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName
for _, alias := range aliases {
log.Debug("Final Flush %s", alias)
series := &protocol.Series{Name: protocol.String(alias), Fields: seriesOutgoing.Fields, Points: seriesOutgoing.Points}
if !processor.YieldSeries(series) {
if ok, err := processor.Yield(series); !ok || err != nil {
log.Debug("Cancelled...")
if err != nil {
log.Error("Error while processing data: %v", err)
return err
}
}
}
@ -317,7 +325,7 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName
return nil
}
func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor engine.Processor) error {
query := querySpec.DeleteQuery()
series := query.GetFromClause()
database := querySpec.Database()

engine/aggregator_engine.go (new file, 400 lines)

@ -0,0 +1,400 @@
package engine
import (
"fmt"
"math"
"strings"
"time"
"github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
)
type AggregatorEngine struct {
// query information
query *parser.SelectQuery
isAggregateQuery bool
fields []string
where *parser.WhereCondition
fillWithZero bool
// was start time set in the query, e.g. time > now() - 1d
startTimeSpecified bool
startTime int64
endTime int64
// output fields
next Processor
limiter *Limiter
// variables for aggregate queries
aggregators []Aggregator
elems []*parser.Value // group by columns other than time()
duration *time.Duration // the time by duration if any
seriesStates map[string]*SeriesState
}
func (self *AggregatorEngine) Name() string {
return "Aggregator Engine"
}
func (self *AggregatorEngine) yieldToNext(seriesIncoming *protocol.Series) (bool, error) {
if ok, err := self.next.Yield(seriesIncoming); err != nil || !ok {
return ok, err
}
if self.limiter.hitLimit(seriesIncoming.GetName()) {
return false, nil
}
return true, nil
}
func (self *AggregatorEngine) Close() error {
if self.isAggregateQuery {
if _, err := self.runAggregates(); err != nil {
return err
}
}
return self.next.Close()
}
func (self *AggregatorEngine) getTimestampFromPoint(point *protocol.Point) int64 {
return self.getTimestampBucket(uint64(*point.GetTimestampInMicroseconds()))
}
func (self *AggregatorEngine) getTimestampBucket(timestampMicroseconds uint64) int64 {
timestampMicroseconds *= 1000 // convert to nanoseconds
multiplier := uint64(*self.duration)
return int64(timestampMicroseconds / multiplier * multiplier / 1000)
}
func (self *AggregatorEngine) Yield(s *protocol.Series) (bool, error) {
if len(s.Points) == 0 {
return true, nil
}
return self.aggregateValuesForSeries(s)
}
func (self *AggregatorEngine) initializeFields() {
for _, aggregator := range self.aggregators {
columnNames := aggregator.ColumnNames()
self.fields = append(self.fields, columnNames...)
}
if self.elems == nil {
return
}
for _, value := range self.elems {
tempName := value.Name
self.fields = append(self.fields, tempName)
}
}
var _count = 0
func (self *AggregatorEngine) getSeriesState(name string) *SeriesState {
state := self.seriesStates[name]
if state == nil {
levels := len(self.elems)
if self.duration != nil && self.fillWithZero {
levels++
}
state = &SeriesState{
started: false,
trie: NewTrie(levels, len(self.aggregators)),
lastTimestamp: 0,
pointsRange: &PointRange{math.MaxInt64, math.MinInt64},
}
self.seriesStates[name] = state
}
return state
}
// We have three types of queries:
// 1. time() without fill
// 2. time() with fill
// 3. no time()
//
// For (1) we flush as soon as a new bucket starts; the prefix tree
// keeps track of the other group by columns without the time
// bucket. We reset the trie once the series is yielded. For (2), we
// keep track of all group by columns with time being the last level
// in the prefix tree. At the end of the query we step through [start
// time, end time] in self.duration steps and get the state from the
// prefix tree, using default values for groups without state in the
// prefix tree. For the last case we keep the groups in the prefix
// tree and on close() we loop through the groups and flush their
// values with a timestamp equal to now()
func (self *AggregatorEngine) aggregateValuesForSeries(series *protocol.Series) (bool, error) {
for _, aggregator := range self.aggregators {
if err := aggregator.InitializeFieldsMetadata(series); err != nil {
return false, err
}
}
seriesState := self.getSeriesState(series.GetName())
currentRange := seriesState.pointsRange
includeTimestampInGroup := self.duration != nil && self.fillWithZero
var group []*protocol.FieldValue
if !includeTimestampInGroup {
group = make([]*protocol.FieldValue, len(self.elems))
} else {
group = make([]*protocol.FieldValue, len(self.elems)+1)
}
for _, point := range series.Points {
currentRange.UpdateRange(point)
// this is a groupby with time() and no fill, flush as soon as we
// start a new bucket
if self.duration != nil && !self.fillWithZero {
timestamp := self.getTimestampFromPoint(point)
// this is the timestamp aggregator
if seriesState.started && seriesState.lastTimestamp != timestamp {
self.runAggregatesForTable(series.GetName())
}
seriesState.lastTimestamp = timestamp
seriesState.started = true
}
// get the group this point belongs to
for idx, elem := range self.elems {
// TODO: create an index from fieldname to index
value, err := GetValue(elem, series.Fields, point)
if err != nil {
return false, err
}
group[idx] = value
}
// if this is a fill() query, add the timestamp at the end
if includeTimestampInGroup {
timestamp := self.getTimestampFromPoint(point)
group[len(self.elems)] = &protocol.FieldValue{Int64Value: protocol.Int64(timestamp)}
}
// update the state of the given group
node := seriesState.trie.GetNode(group)
var err error
for idx, aggregator := range self.aggregators {
node.states[idx], err = aggregator.AggregatePoint(node.states[idx], point)
if err != nil {
return false, err
}
}
}
return true, nil
}
func (self *AggregatorEngine) runAggregates() (bool, error) {
for t := range self.seriesStates {
if ok, err := self.runAggregatesForTable(t); !ok || err != nil {
return ok, err
}
}
return true, nil
}
func (self *AggregatorEngine) calculateSummariesForTable(table string) {
trie := self.getSeriesState(table).trie
err := trie.Traverse(func(group []*protocol.FieldValue, node *Node) error {
for idx, aggregator := range self.aggregators {
aggregator.CalculateSummaries(node.states[idx])
}
return nil
})
if err != nil {
panic("Error while calculating summaries")
}
}
func (self *AggregatorEngine) runAggregatesForTable(table string) (bool, error) {
// TODO: if this is a fill query, step through [start,end] in duration
// steps and flush the groups for the given bucket
self.calculateSummariesForTable(table)
state := self.getSeriesState(table)
trie := state.trie
points := make([]*protocol.Point, 0, trie.CountLeafNodes())
f := func(group []*protocol.FieldValue, node *Node) error {
points = append(points, self.getValuesForGroup(table, group, node)...)
return nil
}
var err error
if self.duration != nil && self.fillWithZero {
timestampRange := state.pointsRange
if self.startTimeSpecified {
timestampRange = &PointRange{startTime: self.startTime, endTime: self.endTime}
}
// TODO: DRY this
if self.query.Ascending {
bucket := self.getTimestampBucket(uint64(timestampRange.startTime))
for bucket <= timestampRange.endTime {
timestamp := &protocol.FieldValue{Int64Value: protocol.Int64(bucket)}
defaultChildNode := &Node{states: make([]interface{}, len(self.aggregators))}
err = trie.TraverseLevel(len(self.elems), func(v []*protocol.FieldValue, node *Node) error {
childNode := node.GetChildNode(timestamp)
if childNode == nil {
childNode = defaultChildNode
}
return f(append(v, timestamp), childNode)
})
bucket += self.duration.Nanoseconds() / 1000
}
} else {
bucket := self.getTimestampBucket(uint64(timestampRange.endTime))
for {
timestamp := &protocol.FieldValue{Int64Value: protocol.Int64(bucket)}
defaultChildNode := &Node{states: make([]interface{}, len(self.aggregators))}
err = trie.TraverseLevel(len(self.elems), func(v []*protocol.FieldValue, node *Node) error {
childNode := node.GetChildNode(timestamp)
if childNode == nil {
childNode = defaultChildNode
}
return f(append(v, timestamp), childNode)
})
if bucket <= timestampRange.startTime {
break
}
bucket -= self.duration.Nanoseconds() / 1000
}
}
} else {
err = trie.Traverse(f)
}
if err != nil {
panic(err)
}
trie.Clear()
return self.yieldToNext(&protocol.Series{
Name: &table,
Fields: self.fields,
Points: points,
})
}
func (self *AggregatorEngine) getValuesForGroup(table string, group []*protocol.FieldValue, node *Node) []*protocol.Point {
values := [][][]*protocol.FieldValue{}
var timestamp int64
useTimestamp := false
if self.duration != nil && !self.fillWithZero {
// if there's a group by time(), then the timestamp is the lastTimestamp
timestamp = self.getSeriesState(table).lastTimestamp
useTimestamp = true
} else if self.duration != nil && self.fillWithZero {
// if there's a group by time() with fill, the timestamp is the last
// value in the group (it was appended as the last group-by level)
timestamp = group[len(group)-1].GetInt64Value()
useTimestamp = true
}
for idx, aggregator := range self.aggregators {
values = append(values, aggregator.GetValues(node.states[idx]))
node.states[idx] = nil
}
// do cross product of all the values
var _values [][]*protocol.FieldValue
if len(values) == 1 {
_values = values[0]
} else {
_values = crossProduct(values)
}
points := []*protocol.Point{}
for _, v := range _values {
/* groupPoints := []*protocol.Point{} */
point := &protocol.Point{
Values: v,
}
if useTimestamp {
point.SetTimestampInMicroseconds(timestamp)
} else {
point.SetTimestampInMicroseconds(0)
}
// FIXME: this should be looking at the fields slice not the group by clause
// FIXME: we should check whether the selected columns are in the group by clause
for idx := range self.elems {
point.Values = append(point.Values, group[idx])
}
points = append(points, point)
}
return points
}
func (self *AggregatorEngine) init() error {
duration, err := self.query.GetGroupByClause().GetGroupByTime()
if err != nil {
return err
}
self.isAggregateQuery = true
self.duration = duration
self.aggregators = []Aggregator{}
for _, value := range self.query.GetColumnNames() {
if !value.IsFunctionCall() {
continue
}
lowerCaseName := strings.ToLower(value.Name)
initializer := registeredAggregators[lowerCaseName]
if initializer == nil {
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown function %s", value.Name))
}
aggregator, err := initializer(self.query, value, self.query.GetGroupByClause().FillValue)
if err != nil {
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("%s", err))
}
self.aggregators = append(self.aggregators, aggregator)
}
for _, elem := range self.query.GetGroupByClause().Elems {
if elem.IsFunctionCall() {
continue
}
self.elems = append(self.elems, elem)
}
self.fillWithZero = self.query.GetGroupByClause().FillWithZero
// This is a special case for issue #426. If the start time is
// specified and there's a group by clause and fill with zero, then
// we need to fill the entire range from start time to end time
if self.query.IsStartTimeSpecified() && self.duration != nil && self.fillWithZero {
self.startTimeSpecified = true
self.startTime = self.query.GetStartTime().Truncate(*self.duration).UnixNano() / 1000
self.endTime = self.query.GetEndTime().Truncate(*self.duration).UnixNano() / 1000
}
self.initializeFields()
return nil
}
func NewAggregatorEngine(query *parser.SelectQuery, next Processor) (*AggregatorEngine, error) {
queryEngine := &AggregatorEngine{
query: query,
where: query.GetWhereCondition(),
next: next,
limiter: NewLimiter(query.Limit),
// stats stuff
duration: nil,
seriesStates: make(map[string]*SeriesState),
}
return queryEngine, queryEngine.init()
}
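
Illustrative queries for the three modes described in the comment above aggregateValuesForSeries (assumed InfluxQL of this era, not taken from the commit):

    _ = []string{
        "select count(value) from events group by time(1h)",         // (1) flush whenever a new time bucket starts
        "select count(value) from events group by time(1h) fill(0)", // (2) step through [start, end] on Close
        "select count(value) from events group by type",             // (3) flush all groups on Close
    }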

@ -0,0 +1,80 @@
package engine
import (
"strconv"
log "code.google.com/p/log4go"
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
)
type ArithmeticEngine struct {
next Processor
names map[string]*parser.Value
}
func NewArithmeticEngine(query *parser.SelectQuery, next Processor) (*ArithmeticEngine, error) {
names := map[string]*parser.Value{}
for idx, v := range query.GetColumnNames() {
switch v.Type {
case parser.ValueSimpleName:
names[v.Name] = v
case parser.ValueFunctionCall:
names[v.Name] = v
case parser.ValueExpression:
if v.Alias != "" {
names[v.Alias] = v
} else {
names["expr"+strconv.Itoa(idx)] = v
}
}
}
return &ArithmeticEngine{
next: next,
names: names,
}, nil
}
func (ae *ArithmeticEngine) Yield(s *protocol.Series) (bool, error) {
if len(s.Points) == 0 {
return ae.next.Yield(s)
}
newSeries := &protocol.Series{
Name: s.Name,
}
// create the new column names
for name := range ae.names {
newSeries.Fields = append(newSeries.Fields, name)
}
for _, point := range s.Points {
newPoint := &protocol.Point{
Timestamp: point.Timestamp,
SequenceNumber: point.SequenceNumber,
}
for _, field := range newSeries.Fields {
value := ae.names[field]
v, err := GetValue(value, s.Fields, point)
if err != nil {
log.Error("Error in arithmetic computation: %s", err)
return false, err
}
newPoint.Values = append(newPoint.Values, v)
}
newSeries.Points = append(newSeries.Points, newPoint)
}
return ae.next.Yield(newSeries)
}
func (self *ArithmeticEngine) Close() error {
return self.next.Close()
}
func (self *ArithmeticEngine) Name() string {
return "Arithmetic Engine"
}
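
Illustrative queries that would be routed to this engine (assumed syntax, not from the commit); an expression without an alias is exposed as exprN by the naming loop in the constructor:

    _ = []string{
        "select input + output from network",          // output column named "expr0"
        "select input + output as total from network", // the alias keeps the name "total"
    }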

@ -1,78 +1,6 @@
package engine
import (
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
)
func getJoinYield(query *parser.SelectQuery, yield func(*protocol.Series) error) func(*protocol.Series) error {
var lastPoint1 *protocol.Point
var lastFields1 []string
var lastPoint2 *protocol.Point
var lastFields2 []string
table1 := query.GetFromClause().Names[0].GetAlias()
table2 := query.GetFromClause().Names[1].GetAlias()
name := table1 + "_join_" + table2
return mergeYield(table1, table2, false, query.Ascending, func(s *protocol.Series) error {
if *s.Name == table1 {
lastPoint1 = s.Points[len(s.Points)-1]
if lastFields1 == nil {
for _, f := range s.Fields {
lastFields1 = append(lastFields1, table1+"."+f)
}
}
}
if *s.Name == table2 {
lastPoint2 = s.Points[len(s.Points)-1]
if lastFields2 == nil {
for _, f := range s.Fields {
lastFields2 = append(lastFields2, table2+"."+f)
}
}
}
if lastPoint1 == nil || lastPoint2 == nil {
return nil
}
newSeries := &protocol.Series{
Name: &name,
Fields: append(lastFields1, lastFields2...),
Points: []*protocol.Point{
{
Values: append(lastPoint1.Values, lastPoint2.Values...),
Timestamp: lastPoint2.Timestamp,
},
},
}
lastPoint1 = nil
lastPoint2 = nil
filteredSeries, _ := Filter(query, newSeries)
if len(filteredSeries.Points) > 0 {
return yield(newSeries)
}
return nil
})
}
func getMergeYield(table1, table2 string, ascending bool, yield func(*protocol.Series) error) func(*protocol.Series) error {
name := table1 + "_merge_" + table2
return mergeYield(table1, table2, true, ascending, func(s *protocol.Series) error {
oldName := s.Name
s.Name = &name
s.Fields = append(s.Fields, "_orig_series")
for _, p := range s.Points {
p.Values = append(p.Values, &protocol.FieldValue{StringValue: oldName})
}
return yield(s)
})
}
import "github.com/influxdb/influxdb/protocol"
type seriesMergeState struct {
name string
@ -93,16 +21,16 @@ func isLater(first *seriesMergeState, other *seriesMergeState) bool {
return *first.series[0].Points[0].Timestamp > *other.series[0].Points[0].Timestamp
}
func (self *seriesMergeState) flush(state *mergeState, yield func(*protocol.Series) error) error {
func (self *seriesMergeState) flush(state *CommonMergeEngine) (bool, error) {
for _, s := range self.series {
s := state.mergeColumnsInSeries(s)
err := yield(s)
if err != nil {
return err
ok, err := state.next.Yield(s)
if !ok || err != nil {
return ok, err
}
}
self.series = nil
return nil
return true, nil
}
// update the state, the points belong to this seriesMergeState (i.e. the name of the timeseries matches)
@ -124,7 +52,8 @@ func (self *seriesMergeState) updateState(p *protocol.Series) {
}
}
type mergeState struct {
type CommonMergeEngine struct {
next Processor
leftGoFirst func(_, _ *seriesMergeState) bool
fields map[string]int
left *seriesMergeState
@ -136,7 +65,7 @@ type mergeState struct {
// the order of the null values match the order of the field
// definitions, i.e. left timeseries first followed by values from the
// right timeseries
func (self *mergeState) mergeColumnsInSeries(s *protocol.Series) *protocol.Series {
func (self *CommonMergeEngine) mergeColumnsInSeries(s *protocol.Series) *protocol.Series {
if !self.mergeColumns {
return s
}
@ -168,7 +97,7 @@ func (self *mergeState) mergeColumnsInSeries(s *protocol.Series) *protocol.Serie
// the order of the null values match the order of the field
// definitions, i.e. left timeseries first followed by values from the
// right timeseries
func (self *mergeState) getFields() []string {
func (self *CommonMergeEngine) getFields() []string {
fields := make([]string, len(self.fields))
for f, i := range self.fields {
fields[i] = f
@ -176,7 +105,7 @@ func (self *mergeState) getFields() []string {
return fields
}
func (self *mergeState) yieldNextPoints(yield func(*protocol.Series) error) error {
func (self *CommonMergeEngine) yieldNextPoints() (bool, error) {
// see which point should be returned next and remove it from the
// series
for self.left.hasPoints() && self.right.hasPoints() {
@ -188,29 +117,33 @@ func (self *mergeState) yieldNextPoints(yield func(*protocol.Series) error) erro
}
s := next.removeAndGetFirstPoint()
err := yield(self.mergeColumnsInSeries(s))
if err != nil {
return err
ok, err := self.next.Yield(self.mergeColumnsInSeries(s))
if err != nil || !ok {
return ok, err
}
}
return nil
return true, nil
}
// if `other` state is done (i.e. we'll receive no more points for its
// timeseries) then we know that we won't get any points that are
// older than what's in `self` so we can safely flush all `self`
// points.
func (self *mergeState) flushIfNecessary(yield func(*protocol.Series) error) error {
func (self *CommonMergeEngine) flushIfNecessary() (bool, error) {
if self.left.done && len(self.left.series) == 0 {
self.right.flush(self, yield)
if ok, err := self.right.flush(self); err != nil || !ok {
return ok, err
}
}
if self.right.done && len(self.right.series) == 0 {
self.left.flush(self, yield)
if ok, err := self.left.flush(self); err != nil || !ok {
return ok, err
}
}
return nil
return true, nil
}
func (self *mergeState) updateState(p *protocol.Series) {
func (self *CommonMergeEngine) updateState(p *protocol.Series) {
self.left.updateState(p)
self.right.updateState(p)
@ -249,7 +182,7 @@ func (self *seriesMergeState) removeAndGetFirstPoint() *protocol.Series {
// returns a yield function that will sort points from table1 and
// table2 no matter what the order in which they are received.
func mergeYield(table1, table2 string, mergeColumns bool, ascending bool, yield func(*protocol.Series) error) func(*protocol.Series) error {
func NewCommonMergeEngine(table1, table2 string, mergeColumns bool, ascending bool, next Processor) *CommonMergeEngine {
state1 := &seriesMergeState{
name: table1,
}
@ -262,21 +195,31 @@ func mergeYield(table1, table2 string, mergeColumns bool, ascending bool, yield
whoGoFirst = isLater
}
state := &mergeState{
return &CommonMergeEngine{
next: next,
left: state1,
right: state2,
leftGoFirst: whoGoFirst,
mergeColumns: mergeColumns,
}
return func(p *protocol.Series) error {
state.updateState(p)
if err := state.flushIfNecessary(yield); err != nil {
return err
}
return state.yieldNextPoints(yield)
}
}
func (e *CommonMergeEngine) Close() error {
e.Yield(&protocol.Series{Name: &e.left.name, Fields: []string{}})
e.Yield(&protocol.Series{Name: &e.right.name, Fields: []string{}})
return e.next.Close()
}
func (e *CommonMergeEngine) Name() string {
return "CommonMergeEngine"
}
func (e *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) {
e.updateState(s)
if ok, err := e.flushIfNecessary(); !ok || err != nil {
return ok, err
}
return e.yieldNextPoints()
}

engine/constants.go (new file, 5 lines)

@ -0,0 +1,5 @@
package engine
import "github.com/influxdb/influxdb/protocol"
var queryResponse = protocol.Response_QUERY

@ -1,14 +1,6 @@
package engine
import (
"fmt"
"math"
"strconv"
"strings"
"time"
log "code.google.com/p/log4go"
"github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
)
@ -24,256 +16,33 @@ type SeriesState struct {
lastTimestamp int64
}
type QueryEngine struct {
// query information
query *parser.SelectQuery
isAggregateQuery bool
fields []string
where *parser.WhereCondition
fillWithZero bool
// was start time set in the query, e.g. time > now() - 1d
startTimeSpecified bool
startTime int64
endTime int64
// output fields
responseChan chan *protocol.Response
limiter *Limiter
seriesToPoints map[string]*protocol.Series
yield func(*protocol.Series) error
aggregateYield func(*protocol.Series) error
// variables for aggregate queries
aggregators []Aggregator
elems []*parser.Value // group by columns other than time()
duration *time.Duration // the time by duration if any
seriesStates map[string]*SeriesState
// query statistics
explain bool
runStartTime float64
runEndTime float64
pointsRead int64
pointsWritten int64
shardId int
shardLocal bool
}
var (
endStreamResponse = protocol.Response_END_STREAM
explainQueryResponse = protocol.Response_EXPLAIN_QUERY
)
const (
POINT_BATCH_SIZE = 64
)
// distribute query and possibly do the merge/join before yielding the points
func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) error {
// see if this is a merge query
fromClause := query.GetFromClause()
if fromClause.Type == parser.FromClauseMerge {
yield = getMergeYield(fromClause.Names[0].Name.Name, fromClause.Names[1].Name.Name, query.Ascending, yield)
}
if fromClause.Type == parser.FromClauseInnerJoin {
yield = getJoinYield(query, yield)
}
self.yield = yield
return nil
}
func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Response) (*QueryEngine, error) {
func NewQueryEngine(next Processor, query *parser.SelectQuery) (Processor, error) {
limit := query.Limit
queryEngine := &QueryEngine{
query: query,
where: query.GetWhereCondition(),
limiter: NewLimiter(limit),
responseChan: responseChan,
seriesToPoints: make(map[string]*protocol.Series),
// stats stuff
explain: query.IsExplainQuery(),
runStartTime: 0,
runEndTime: 0,
pointsRead: 0,
pointsWritten: 0,
shardId: 0,
shardLocal: false, // irrelevant unless this is an EXPLAIN query
duration: nil,
seriesStates: make(map[string]*SeriesState),
}
if queryEngine.explain {
queryEngine.runStartTime = float64(time.Now().UnixNano()) / float64(time.Millisecond)
}
yield := func(series *protocol.Series) error {
var response *protocol.Response
queryEngine.limiter.calculateLimitAndSlicePoints(series)
if len(series.Points) == 0 {
return nil
}
if queryEngine.explain {
//TODO: We may not have to send points, just count them
queryEngine.pointsWritten += int64(len(series.Points))
}
response = &protocol.Response{Type: &queryResponse, Series: series}
responseChan <- response
return nil
}
var engine Processor = NewPassthroughEngineWithLimit(next, 1, limit)
var err error
if query.HasAggregates() {
err = queryEngine.executeCountQueryWithGroupBy(query, yield)
engine, err = NewAggregatorEngine(query, engine)
} else if containsArithmeticOperators(query) {
err = queryEngine.executeArithmeticQuery(query, yield)
} else {
err = queryEngine.distributeQuery(query, yield)
engine, err = NewArithmeticEngine(query, engine)
}
fromClause := query.GetFromClause()
if fromClause.Type == parser.FromClauseMerge {
engine = NewMergeEngine(fromClause.Names[0].Name.Name, fromClause.Names[1].Name.Name, query.Ascending, engine)
} else if fromClause.Type == parser.FromClauseInnerJoin {
engine = NewJoinEngine(query, engine)
}
if err != nil {
return nil, err
}
return queryEngine, nil
}
// Shard will call this method for EXPLAIN query
func (self *QueryEngine) SetShardInfo(shardId int, shardLocal bool) {
self.shardId = shardId
self.shardLocal = shardLocal
}
// Returns false if the query should be stopped (either because of limit or error)
func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, point *protocol.Point) (shouldContinue bool) {
shouldContinue = true
series := self.seriesToPoints[*seriesName]
if series == nil {
series = &protocol.Series{Name: protocol.String(*seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, POINT_BATCH_SIZE)}
self.seriesToPoints[*seriesName] = series
} else if len(series.Points) >= POINT_BATCH_SIZE {
shouldContinue = self.yieldSeriesData(series)
series = &protocol.Series{Name: protocol.String(*seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, POINT_BATCH_SIZE)}
self.seriesToPoints[*seriesName] = series
}
series.Points = append(series.Points, point)
if self.explain {
self.pointsRead++
}
return shouldContinue
}
func (self *QueryEngine) YieldSeries(seriesIncoming *protocol.Series) (shouldContinue bool) {
if self.explain {
self.pointsRead += int64(len(seriesIncoming.Points))
}
seriesName := seriesIncoming.GetName()
self.seriesToPoints[seriesName] = &protocol.Series{Name: &seriesName, Fields: seriesIncoming.Fields}
return self.yieldSeriesData(seriesIncoming) && !self.limiter.hitLimit(seriesIncoming.GetName())
}
func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
err := self.yield(series)
if err != nil {
log.Error(err)
return false
}
return true
}
func (self *QueryEngine) Close() {
for _, series := range self.seriesToPoints {
if len(series.Points) == 0 {
continue
}
self.yieldSeriesData(series)
}
var err error
for _, series := range self.seriesToPoints {
s := &protocol.Series{
Name: series.Name,
Fields: series.Fields,
}
err = self.yield(s)
if err != nil {
break
}
}
// make sure we yield an empty series for series without points
fromClause := self.query.GetFromClause()
if fromClause.Type == parser.FromClauseMerge {
for _, s := range []string{fromClause.Names[0].Name.Name, fromClause.Names[1].Name.Name} {
if _, ok := self.seriesToPoints[s]; ok {
continue
}
err := self.yield(&protocol.Series{
Name: &s,
Fields: []string{},
})
if err != nil {
log.Error("Error while closing engine: %s", err)
}
}
}
if self.isAggregateQuery {
self.runAggregates()
}
if self.explain {
self.runEndTime = float64(time.Now().UnixNano()) / float64(time.Millisecond)
log.Debug("QueryEngine: %.3f R:%d W:%d", self.runEndTime-self.runStartTime, self.pointsRead, self.pointsWritten)
self.SendQueryStats()
}
response := &protocol.Response{Type: &endStreamResponse}
if err != nil {
message := err.Error()
response.ErrorMessage = &message
}
self.responseChan <- response
}
func (self *QueryEngine) SendQueryStats() {
timestamp := time.Now().UnixNano() / int64(time.Microsecond)
runTime := self.runEndTime - self.runStartTime
points := []*protocol.Point{}
pointsRead := self.pointsRead
pointsWritten := self.pointsWritten
shardId := int64(self.shardId)
shardLocal := self.shardLocal
engineName := "QueryEngine"
point := &protocol.Point{
Values: []*protocol.FieldValue{
{StringValue: &engineName},
{Int64Value: &shardId},
{BoolValue: &shardLocal},
{DoubleValue: &runTime},
{Int64Value: &pointsRead},
{Int64Value: &pointsWritten},
},
Timestamp: &timestamp,
}
points = append(points, point)
seriesName := "explain query"
series := &protocol.Series{
Name: &seriesName,
Fields: []string{"engine_name", "shard_id", "shard_local", "run_time", "points_read", "points_written"},
Points: points,
}
response := &protocol.Response{Type: &explainQueryResponse, Series: series}
self.responseChan <- response
return engine, nil
}
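For orientation, the chain that NewQueryEngine builds for a merge query with aggregates looks like the following sketch, where sink stands for whatever Processor the caller passed in:

// select count(value) from t1 merge t2 group by time(1m) limit 100
// builds, from data source to sink:
//
//   CommonMergeEngine -> MergeEngine -> AggregatorEngine -> Passthrough(limit) -> sink
//
// The shard pushes raw series into the merge engines, aggregation runs
// next, and the limit-enforcing Passthrough sits closest to the sink.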
func containsArithmeticOperators(query *parser.SelectQuery) bool {
@ -285,31 +54,6 @@ func containsArithmeticOperators(query *parser.SelectQuery) bool {
return false
}
func (self *QueryEngine) getTimestampFromPoint(point *protocol.Point) int64 {
return self.getTimestampBucket(uint64(*point.GetTimestampInMicroseconds()))
}
func (self *QueryEngine) getTimestampBucket(timestampMicroseconds uint64) int64 {
timestampMicroseconds *= 1000 // convert to nanoseconds
multiplier := uint64(*self.duration)
return int64(timestampMicroseconds / multiplier * multiplier / 1000)
}
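A quick worked example of the bucket arithmetic above, with illustrative numbers:

// duration = 1m, so multiplier = 60e9 (nanoseconds)
// point timestamp = 90s = 90_000_000 µs = 90_000_000_000 ns
// 90e9 / 60e9 * 60e9 = 60e9 ns   (integer division truncates to the bucket start)
// 60e9 / 1000 = 60_000_000 µs, i.e. the point falls in the bucket starting at 60s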
type PointRange struct {
startTime int64
endTime int64
}
func (self *PointRange) UpdateRange(point *protocol.Point) {
timestamp := *point.GetTimestampInMicroseconds()
if timestamp < self.startTime {
self.startTime = timestamp
}
if timestamp > self.endTime {
self.endTime = timestamp
}
}
func crossProduct(values [][][]*protocol.FieldValue) [][]*protocol.FieldValue {
if len(values) == 0 {
return [][]*protocol.FieldValue{{}}
@ -324,378 +68,3 @@ func crossProduct(values [][][]*protocol.FieldValue) [][]*protocol.FieldValue {
}
return returnValues
}
func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.SelectQuery, yield func(*protocol.Series) error) error {
self.aggregateYield = yield
duration, err := query.GetGroupByClause().GetGroupByTime()
if err != nil {
return err
}
self.isAggregateQuery = true
self.duration = duration
self.aggregators = []Aggregator{}
for _, value := range query.GetColumnNames() {
if !value.IsFunctionCall() {
continue
}
lowerCaseName := strings.ToLower(value.Name)
initializer := registeredAggregators[lowerCaseName]
if initializer == nil {
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown function %s", value.Name))
}
aggregator, err := initializer(query, value, query.GetGroupByClause().FillValue)
if err != nil {
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("%s", err))
}
self.aggregators = append(self.aggregators, aggregator)
}
for _, elem := range query.GetGroupByClause().Elems {
if elem.IsFunctionCall() {
continue
}
self.elems = append(self.elems, elem)
}
self.fillWithZero = query.GetGroupByClause().FillWithZero
// This is a special case for issue #426. If the start time is
// specified and there's a group by clause and fill with zero, then
// we need to fill the entire range from start time to end time
if query.IsStartTimeSpecified() && self.duration != nil && self.fillWithZero {
self.startTimeSpecified = true
self.startTime = query.GetStartTime().Truncate(*self.duration).UnixNano() / 1000
self.endTime = query.GetEndTime().Truncate(*self.duration).UnixNano() / 1000
}
self.initializeFields()
err = self.distributeQuery(query, func(series *protocol.Series) error {
if len(series.Points) == 0 {
return nil
}
return self.aggregateValuesForSeries(series)
})
return err
}
func (self *QueryEngine) initializeFields() {
for _, aggregator := range self.aggregators {
columnNames := aggregator.ColumnNames()
self.fields = append(self.fields, columnNames...)
}
if self.elems == nil {
return
}
for _, value := range self.elems {
tempName := value.Name
self.fields = append(self.fields, tempName)
}
}
var _count = 0
func (self *QueryEngine) getSeriesState(name string) *SeriesState {
state := self.seriesStates[name]
if state == nil {
levels := len(self.elems)
if self.duration != nil && self.fillWithZero {
levels++
}
state = &SeriesState{
started: false,
trie: NewTrie(levels, len(self.aggregators)),
lastTimestamp: 0,
pointsRange: &PointRange{math.MaxInt64, math.MinInt64},
}
self.seriesStates[name] = state
}
return state
}
// We have three types of queries:
// 1. time() without fill
// 2. time() with fill
// 3. no time()
//
// For (1) we flush as soon as a new bucket starts; the prefix tree
// keeps track of the other group by columns without the time
// bucket. We reset the trie once the series is yielded. For (2) we
// keep track of all group by columns, with time being the last level
// in the prefix tree. At the end of the query we step through [start
// time, end time] in self.duration steps and get the state from the
// prefix tree, using default values for groups without state in the
// prefix tree. For (3) we keep the groups in the prefix tree and on
// close() we loop through the groups and flush their values with a
// timestamp equal to now().
func (self *QueryEngine) aggregateValuesForSeries(series *protocol.Series) error {
for _, aggregator := range self.aggregators {
if err := aggregator.InitializeFieldsMetadata(series); err != nil {
return err
}
}
seriesState := self.getSeriesState(series.GetName())
currentRange := seriesState.pointsRange
includeTimestampInGroup := self.duration != nil && self.fillWithZero
var group []*protocol.FieldValue
if !includeTimestampInGroup {
group = make([]*protocol.FieldValue, len(self.elems))
} else {
group = make([]*protocol.FieldValue, len(self.elems)+1)
}
for _, point := range series.Points {
currentRange.UpdateRange(point)
// this is a groupby with time() and no fill, flush as soon as we
// start a new bucket
if self.duration != nil && !self.fillWithZero {
timestamp := self.getTimestampFromPoint(point)
// this is the timestamp aggregator
if seriesState.started && seriesState.lastTimestamp != timestamp {
self.runAggregatesForTable(series.GetName())
}
seriesState.lastTimestamp = timestamp
seriesState.started = true
}
// get the group this point belongs to
for idx, elem := range self.elems {
// TODO: create an index from fieldname to index
value, err := GetValue(elem, series.Fields, point)
if err != nil {
return err
}
group[idx] = value
}
// if this is a fill() query, add the timestamp at the end
if includeTimestampInGroup {
timestamp := self.getTimestampFromPoint(point)
group[len(self.elems)] = &protocol.FieldValue{Int64Value: protocol.Int64(timestamp)}
}
// update the state of the given group
node := seriesState.trie.GetNode(group)
var err error
for idx, aggregator := range self.aggregators {
node.states[idx], err = aggregator.AggregatePoint(node.states[idx], point)
if err != nil {
return err
}
}
}
return nil
}
func (self *QueryEngine) runAggregates() {
for t := range self.seriesStates {
self.runAggregatesForTable(t)
}
}
func (self *QueryEngine) calculateSummariesForTable(table string) {
trie := self.getSeriesState(table).trie
err := trie.Traverse(func(group []*protocol.FieldValue, node *Node) error {
for idx, aggregator := range self.aggregators {
aggregator.CalculateSummaries(node.states[idx])
}
return nil
})
if err != nil {
panic("Error while calculating summaries")
}
}
func (self *QueryEngine) runAggregatesForTable(table string) {
// TODO: if this is a fill query, step through [start,end] in duration
// steps and flush the groups for the given bucket
self.calculateSummariesForTable(table)
state := self.getSeriesState(table)
trie := state.trie
points := make([]*protocol.Point, 0, trie.CountLeafNodes())
f := func(group []*protocol.FieldValue, node *Node) error {
points = append(points, self.getValuesForGroup(table, group, node)...)
return nil
}
var err error
if self.duration != nil && self.fillWithZero {
timestampRange := state.pointsRange
if self.startTimeSpecified {
timestampRange = &PointRange{startTime: self.startTime, endTime: self.endTime}
}
// TODO: DRY this
if self.query.Ascending {
bucket := self.getTimestampBucket(uint64(timestampRange.startTime))
for bucket <= timestampRange.endTime {
timestamp := &protocol.FieldValue{Int64Value: protocol.Int64(bucket)}
defaultChildNode := &Node{states: make([]interface{}, len(self.aggregators))}
err = trie.TraverseLevel(len(self.elems), func(v []*protocol.FieldValue, node *Node) error {
childNode := node.GetChildNode(timestamp)
if childNode == nil {
childNode = defaultChildNode
}
return f(append(v, timestamp), childNode)
})
bucket += self.duration.Nanoseconds() / 1000
}
} else {
bucket := self.getTimestampBucket(uint64(timestampRange.endTime))
for {
timestamp := &protocol.FieldValue{Int64Value: protocol.Int64(bucket)}
defaultChildNode := &Node{states: make([]interface{}, len(self.aggregators))}
err = trie.TraverseLevel(len(self.elems), func(v []*protocol.FieldValue, node *Node) error {
childNode := node.GetChildNode(timestamp)
if childNode == nil {
childNode = defaultChildNode
}
return f(append(v, timestamp), childNode)
})
if bucket <= timestampRange.startTime {
break
}
bucket -= self.duration.Nanoseconds() / 1000
}
}
} else {
err = trie.Traverse(f)
}
if err != nil {
panic(err)
}
trie.Clear()
self.aggregateYield(&protocol.Series{
Name: &table,
Fields: self.fields,
Points: points,
})
}
func (self *QueryEngine) getValuesForGroup(table string, group []*protocol.FieldValue, node *Node) []*protocol.Point {
values := [][][]*protocol.FieldValue{}
var timestamp int64
useTimestamp := false
if self.duration != nil && !self.fillWithZero {
// if there's a group by time() without fill, the timestamp is the
// last timestamp seen for this series
timestamp = self.getSeriesState(table).lastTimestamp
useTimestamp = true
} else if self.duration != nil && self.fillWithZero {
// if there's a group by time() with fill, the timestamp is the last
// value in the group key
timestamp = group[len(group)-1].GetInt64Value()
useTimestamp = true
}
for idx, aggregator := range self.aggregators {
values = append(values, aggregator.GetValues(node.states[idx]))
node.states[idx] = nil
}
// do cross product of all the values
var _values [][]*protocol.FieldValue
if len(values) == 1 {
_values = values[0]
} else {
_values = crossProduct(values)
}
points := []*protocol.Point{}
for _, v := range _values {
/* groupPoints := []*protocol.Point{} */
point := &protocol.Point{
Values: v,
}
if useTimestamp {
point.SetTimestampInMicroseconds(timestamp)
} else {
point.SetTimestampInMicroseconds(0)
}
// FIXME: this should be looking at the fields slice not the group by clause
// FIXME: we should check whether the selected columns are in the group by clause
for idx := range self.elems {
point.Values = append(point.Values, group[idx])
}
points = append(points, point)
}
return points
}
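As a sketch of what the cross product step produces (the aggregator outputs below are hypothetical):

// two aggregators, e.g. min and max, each yielding one row of one value:
//   values = [ [[1]], [[10]] ]
//   crossProduct(values) -> [ [1, 10] ]   // a single point with both columns
// if an aggregator yields several rows, every combination of rows across
// aggregators becomes its own point.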
func (self *QueryEngine) executeArithmeticQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) error {
names := map[string]*parser.Value{}
for idx, v := range query.GetColumnNames() {
switch v.Type {
case parser.ValueSimpleName:
names[v.Name] = v
case parser.ValueFunctionCall:
names[v.Name] = v
case parser.ValueExpression:
if v.Alias != "" {
names[v.Alias] = v
} else {
names["expr"+strconv.Itoa(idx)] = v
}
}
}
return self.distributeQuery(query, func(series *protocol.Series) error {
if len(series.Points) == 0 {
yield(series)
return nil
}
newSeries := &protocol.Series{
Name: series.Name,
}
// create the new column names
for name := range names {
newSeries.Fields = append(newSeries.Fields, name)
}
for _, point := range series.Points {
newPoint := &protocol.Point{
Timestamp: point.Timestamp,
SequenceNumber: point.SequenceNumber,
}
for _, field := range newSeries.Fields {
value := names[field]
v, err := GetValue(value, series.Fields, point)
if err != nil {
log.Error("Error in arithmetic computation: %s", err)
return err
}
newPoint.Values = append(newPoint.Values, v)
}
newSeries.Points = append(newSeries.Points, newPoint)
}
yield(newSeries)
return nil
})
}
func (self *QueryEngine) GetName() string {
return "QueryEngine"
}

View File

@ -1,54 +1,42 @@
package engine
import (
log "code.google.com/p/log4go"
"fmt"
"github.com/influxdb/influxdb/parser"
p "github.com/influxdb/influxdb/protocol"
)
type FilteringEngine struct {
query *parser.SelectQuery
processor QueryProcessor
processor Processor
shouldFilter bool
}
func NewFilteringEngine(query *parser.SelectQuery, processor QueryProcessor) *FilteringEngine {
func NewFilteringEngine(query *parser.SelectQuery, processor Processor) *FilteringEngine {
shouldFilter := query.GetWhereCondition() != nil
return &FilteringEngine{query, processor, shouldFilter}
}
// TODO: optimize for YieldSeries and use it here
func (self *FilteringEngine) YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool {
return self.YieldSeries(&p.Series{
Name: seriesName,
Fields: columnNames,
Points: []*p.Point{point},
})
}
func (self *FilteringEngine) YieldSeries(seriesIncoming *p.Series) bool {
func (self *FilteringEngine) Yield(seriesIncoming *p.Series) (bool, error) {
if !self.shouldFilter {
return self.processor.YieldSeries(seriesIncoming)
return self.processor.Yield(seriesIncoming)
}
series, err := Filter(self.query, seriesIncoming)
if err != nil {
log.Error("Error while filtering points: %s [query = %s]", err, self.query.GetQueryString())
return false
return false, fmt.Errorf("Error while filtering points: %s [query = %s]", err, self.query.GetQueryString())
}
if len(series.Points) == 0 {
return true
return true, nil
}
return self.processor.YieldSeries(series)
return self.processor.Yield(series)
}
func (self *FilteringEngine) Close() {
self.processor.Close()
func (self *FilteringEngine) Close() error {
return self.processor.Close()
}
func (self *FilteringEngine) SetShardInfo(shardId int, shardLocal bool) {
self.processor.SetShardInfo(shardId, shardLocal)
}
func (self *FilteringEngine) GetName() string {
return self.processor.GetName()
func (self *FilteringEngine) Name() string {
return self.processor.Name()
}
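A minimal sketch of how the filtering engine sits in a chain (the withFiltering helper and the sink argument are assumptions):

func withFiltering(query *parser.SelectQuery, sink Processor) Processor {
	// Series are filtered against the query's where clause before being
	// passed on; without a where clause Yield is a direct pass-through.
	return NewFilteringEngine(query, sink)
}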

85
engine/join_engine.go Normal file
View File

@ -0,0 +1,85 @@
package engine
import (
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
)
type JoinEngine struct {
query *parser.SelectQuery
next Processor
table1, table2 string
name string // the output table name
lastPoint1, lastPoint2 *protocol.Point
lastFields1, lastFields2 []string
}
func NewJoinEngine(query *parser.SelectQuery, next Processor) Processor {
table1 := query.GetFromClause().Names[0].GetAlias()
table2 := query.GetFromClause().Names[1].GetAlias()
name := table1 + "_join_" + table2
joinEngine := &JoinEngine{
next: next,
name: name,
table1: table1,
table2: table2,
query: query,
}
mergeEngine := NewCommonMergeEngine(table1, table2, false, query.Ascending, joinEngine)
return mergeEngine
}
func (_ *JoinEngine) SetShardInfo(shardId int, shardLocal bool) {}
func (je *JoinEngine) Name() string {
return "JoinEngine"
}
func (je *JoinEngine) Close() error {
return je.next.Close()
}
func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) {
if *s.Name == je.table1 {
je.lastPoint1 = s.Points[len(s.Points)-1]
if je.lastFields1 == nil {
for _, f := range s.Fields {
je.lastFields1 = append(je.lastFields1, je.table1+"."+f)
}
}
}
if *s.Name == je.table2 {
je.lastPoint2 = s.Points[len(s.Points)-1]
if je.lastFields2 == nil {
for _, f := range s.Fields {
je.lastFields2 = append(je.lastFields2, je.table2+"."+f)
}
}
}
if je.lastPoint1 == nil || je.lastPoint2 == nil {
return true, nil
}
newSeries := &protocol.Series{
Name: &je.name,
Fields: append(je.lastFields1, je.lastFields2...),
Points: []*protocol.Point{
{
Values: append(je.lastPoint1.Values, je.lastPoint2.Values...),
Timestamp: je.lastPoint2.Timestamp,
},
},
}
je.lastPoint1 = nil
je.lastPoint2 = nil
// newSeries holds exactly one point, so the filter acts as a
// keep/drop predicate: if the point survives the where clause we
// forward the original series unchanged.
filteredSeries, _ := Filter(je.query, newSeries)
if len(filteredSeries.Points) > 0 {
return je.next.Yield(newSeries)
}
return true, nil
}
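For illustration, given select * from t1 inner join t2, the engine pairs the most recent point of each table into one output series shaped roughly like this (values assumed):

// input:  t1 {value: 1, time: 10}    t2 {value: 2, time: 11}
// output: series "t1_join_t2"
//   fields: ["t1.value", "t2.value"]
//   points: [{Values: [1, 2], Timestamp: 11}]   // timestamp taken from t2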

View File

@ -1,70 +0,0 @@
package engine
import (
"github.com/influxdb/influxdb/protocol"
)
const (
MAX_SERIES_IN_RESPONSE = 10000
)
var (
queryResponse = protocol.Response_QUERY
)
type ListSeriesEngine struct {
responseChan chan *protocol.Response
response *protocol.Response
}
func NewListSeriesEngine(responseChan chan *protocol.Response) *ListSeriesEngine {
response := &protocol.Response{
Type: &queryResponse,
MultiSeries: make([]*protocol.Series, 0),
}
return &ListSeriesEngine{
responseChan: responseChan,
response: response,
}
}
func (self *ListSeriesEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool {
if len(self.response.MultiSeries) > MAX_SERIES_IN_RESPONSE {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
MultiSeries: make([]*protocol.Series, 0),
}
}
self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesName})
return true
}
func (self *ListSeriesEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
if len(self.response.MultiSeries) > MAX_SERIES_IN_RESPONSE {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
MultiSeries: make([]*protocol.Series, 0),
}
}
self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesIncoming.Name})
return true
}
func (self *ListSeriesEngine) Close() {
if len(self.response.MultiSeries) > 0 {
self.responseChan <- self.response
}
response := &protocol.Response{Type: &endStreamResponse}
self.responseChan <- response
}
func (self *ListSeriesEngine) SetShardInfo(shardId int, shardLocal bool) {
// EXPLAIN doesn't work with this query
}
func (self *ListSeriesEngine) GetName() string {
return "ListSeriesEngine"
}

36
engine/merge_engine.go Normal file
View File

@ -0,0 +1,36 @@
package engine
import "github.com/influxdb/influxdb/protocol"
type MergeEngine struct {
name string
next Processor
}
func NewMergeEngine(table1, table2 string, ascending bool, next Processor) Processor {
name := table1 + "_merge_" + table2
me := &MergeEngine{name: name, next: next}
return NewCommonMergeEngine(table1, table2, true, ascending, me)
}
func (me *MergeEngine) Yield(s *protocol.Series) (bool, error) {
oldName := s.Name
s.Name = &me.name
s.Fields = append(s.Fields, "_orig_series")
for _, p := range s.Points {
p.Values = append(p.Values, &protocol.FieldValue{StringValue: oldName})
}
return me.next.Yield(s)
}
func (_ *MergeEngine) SetShardInfo(shardId int, shardLocal bool) {}
func (me *MergeEngine) Close() error {
return me.next.Close()
}
func (me *MergeEngine) Name() string {
return "MergeEngine"
}
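As a sketch, for select * from t1 merge t2 the output series looks like this (values assumed):

// input:  t1 {value: 1}    t2 {value: 2}
// output: series "t1_merge_t2" with an extra "_orig_series" column:
//   fields: ["value", "_orig_series"]
//   points: [{1, "t1"}, {2, "t2"}]   // in timestamp order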

View File

@ -8,12 +8,11 @@ import (
"github.com/influxdb/influxdb/protocol"
)
type PassthroughEngine struct {
responseChan chan *protocol.Response
response *protocol.Response
type Passthrough struct {
next Processor
series *protocol.Series
maxPointsInResponse int
limiter *Limiter
responseType *protocol.Response_Type
// query statistics
runStartTime float64
@ -24,16 +23,15 @@ type PassthroughEngine struct {
shardLocal bool
}
func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInResponse int) *PassthroughEngine {
return NewPassthroughEngineWithLimit(responseChan, maxPointsInResponse, 0)
func NewPassthroughEngine(next Processor, maxPointsInResponse int) *Passthrough {
return NewPassthroughEngineWithLimit(next, maxPointsInResponse, 0)
}
func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPointsInResponse, limit int) *PassthroughEngine {
passthroughEngine := &PassthroughEngine{
responseChan: responseChan,
func NewPassthroughEngineWithLimit(next Processor, maxPointsInResponse, limit int) *Passthrough {
passthroughEngine := &Passthrough{
next: next,
maxPointsInResponse: maxPointsInResponse,
limiter: NewLimiter(limit),
responseType: &queryResponse,
runStartTime: 0,
runEndTime: 0,
pointsRead: 0,
@ -45,62 +43,44 @@ func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPoin
return passthroughEngine
}
func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool {
series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}
return self.YieldSeries(series)
}
func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
func (self *Passthrough) Yield(seriesIncoming *protocol.Series) (bool, error) {
log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points))
if *seriesIncoming.Name == "explain query" {
self.responseType = &explainQueryResponse
log.Debug("Response Changed!")
} else {
self.responseType = &queryResponse
}
self.limiter.calculateLimitAndSlicePoints(seriesIncoming)
if len(seriesIncoming.Points) == 0 {
log.Debug("Not sent == 0")
return false
return false, nil
}
if self.response == nil {
self.response = &protocol.Response{
Type: self.responseType,
Series: seriesIncoming,
if self.series == nil {
self.series = seriesIncoming
} else if self.series.GetName() != seriesIncoming.GetName() {
ok, err := self.next.Yield(self.series)
if !ok || err != nil {
return ok, err
}
} else if self.response.Series.GetName() != seriesIncoming.GetName() {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: self.responseType,
Series: seriesIncoming,
}
} else if len(self.response.Series.Points) > self.maxPointsInResponse {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: self.responseType,
Series: seriesIncoming,
self.series = seriesIncoming
} else if len(self.series.Points) > self.maxPointsInResponse {
ok, err := self.next.Yield(self.series)
if !ok || err != nil {
return ok, err
}
self.series = seriesIncoming
} else {
self.response.Series = common.MergeSeries(self.response.Series, seriesIncoming)
self.series = common.MergeSeries(self.series, seriesIncoming)
}
return !self.limiter.hitLimit(seriesIncoming.GetName())
//return true
return !self.limiter.hitLimit(seriesIncoming.GetName()), nil
}
func (self *PassthroughEngine) Close() {
if self.response != nil && self.response.Series != nil && self.response.Series.Name != nil {
self.responseChan <- self.response
func (self *Passthrough) Close() error {
if self.series != nil && self.series.Name != nil {
_, err := self.next.Yield(self.series)
if err != nil {
return err
}
}
response := &protocol.Response{Type: &endStreamResponse}
self.responseChan <- response
return self.next.Close()
}
func (self *PassthroughEngine) SetShardInfo(shardId int, shardLocal bool) {
// EXPLAIN doesn't really work with this query (yet?)
}
func (self *PassthroughEngine) GetName() string {
func (self *Passthrough) Name() string {
return "PassthroughEngine"
}
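A hedged usage sketch (the sink and the numbers are assumptions); note that the parameter is still called maxPointsInResponse even though the engine now yields to a Processor instead of writing protocol responses directly:

// Batch series downstream in chunks of roughly 1000 points and stop once
// the query's limit of 10000 points is reached (limit 0 disables the cap).
pt := NewPassthroughEngineWithLimit(sink, 1000, 10000)
ok, err := pt.Yield(incoming) // buffered; flushed on name change, size, or Close
if err == nil && ok {
	err = pt.Close()
}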

18
engine/point_range.go Normal file
View File

@ -0,0 +1,18 @@
package engine
import "github.com/influxdb/influxdb/protocol"
type PointRange struct {
startTime int64
endTime int64
}
func (self *PointRange) UpdateRange(point *protocol.Point) {
timestamp := *point.GetTimestampInMicroseconds()
if timestamp < self.startTime {
self.startTime = timestamp
}
if timestamp > self.endTime {
self.endTime = timestamp
}
}
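A tiny usage sketch, assuming code inside the engine package with math imported and a series in scope:

r := &PointRange{startTime: math.MaxInt64, endTime: math.MinInt64}
for _, p := range series.Points {
	r.UpdateRange(p) // widens [startTime, endTime] to cover every point
}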

16
engine/processor.go Normal file
View File

@ -0,0 +1,16 @@
package engine
import "github.com/influxdb/influxdb/protocol"
// Processor is passed to a shard (e.g. the local datastore), which
// yields the series it reads to the processor.
type Processor interface {
// Yield returns (true, nil) if the query should continue. It returns
// false if processing should stop, either because of an error (in
// which case the error isn't nil) or because the desired data was
// read successfully and no more data is needed.
Yield(s *protocol.Series) (bool, error)
Name() string
// Close flushes any data still buffered in the pipeline
Close() error
}
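A minimal conforming implementation, for illustration only (the type name is an assumption):

type discardProcessor struct{}

// Yield accepts and drops every series; returning true asks for more data.
func (discardProcessor) Yield(s *protocol.Series) (bool, error) { return true, nil }
func (discardProcessor) Name() string { return "discardProcessor" }
func (discardProcessor) Close() error { return nil }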

View File

@ -1,19 +0,0 @@
package engine
import (
p "github.com/influxdb/influxdb/protocol"
)
type QueryProcessor interface {
// This method returns true if the query should continue. If the query should be stopped,
// like maybe the limit was hit, it should return false
YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
YieldSeries(seriesIncoming *p.Series) bool
Close()
// Set by the shard, so EXPLAIN query can know query against which shard is being measured
SetShardInfo(shardId int, shardLocal bool)
// Let QueryProcessor identify itself. What if it is a spy and we can't check that?
GetName() string
}

View File

@ -1049,9 +1049,7 @@ func (self *DataTestSuite) FilterWithInvalidCondition(c *C) (Fun, Fun) {
data := CreatePoints("test_invalid_where_condition", 1, 1)
client.WriteData(data, c)
}, func(client Client) {
data := client.RunQuery("select * from test_invalid_where_condition where column0 > 0.1s", c, "m")
// TODO: this should return an error
c.Assert(data, HasLen, 0)
client.RunInvalidQuery("select * from test_invalid_where_condition where column0 > 0.1s", c, "m")
}
}

View File

@ -52,9 +52,9 @@ message Response {
QUERY = 1;
WRITE_OK = 2;
END_STREAM = 3;
REPLICATION_REPLAY = 4;
REPLICATION_REPLAY_END = 5;
SEQUENCE_NUMBER = 7;
// REPLICATION_REPLAY = 4;
// REPLICATION_REPLAY_END = 5;
// SEQUENCE_NUMBER = 7;
// Access denied also serves as an end of stream response
ACCESS_DENIED = 8;
HEARTBEAT = 9;
@ -66,10 +66,9 @@ message Response {
}
required Type type = 1;
required uint32 request_id = 2;
optional Series series = 3;
repeated Series multi_series = 3;
optional ErrorCode error_code = 4;
optional string error_message = 5;
optional int64 nextPointTime = 6;
optional Request request = 7;
repeated Series multi_series = 8;
}
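With the singular series field replaced by the repeated multi_series at tag 3, a response now carries zero or more series. A sketch of building one in Go, assuming the usual goprotobuf-generated helpers (proto is the goprotobuf runtime package; series1 and series2 are placeholders):

resp := &protocol.Response{
	Type:        protocol.Response_QUERY.Enum(),
	RequestId:   proto.Uint32(1),
	MultiSeries: []*protocol.Series{series1, series2},
}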