From 4383375fe8f0bc36be06b2c4573220d958218075 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Wed, 20 Aug 2014 11:59:45 -0400 Subject: [PATCH] Split the different engines in their own types --- api/coordinator.go | 4 +- api/http/api.go | 2 +- api/http/api_test.go | 8 +- api/http/null_series_writer.go | 5 - api/http/series_writer.go | 15 +- cluster/cluster_server.go | 17 +- cluster/nil_processor.go | 21 + cluster/response_channel_processor.go | 35 ++ cluster/response_channel_wrapper.go | 16 + cluster/response_processor.go | 7 + cluster/shard.go | 61 +- coordinator/client_server_test.go | 9 +- coordinator/continuous_query_writer.go | 15 +- coordinator/coordinator.go | 201 ++---- coordinator/merge_channel_processor.go | 104 ++++ coordinator/protobuf_client.go | 67 +- coordinator/protobuf_client_test.go | 4 +- coordinator/protobuf_request_handler.go | 28 +- coordinator/series_writer.go | 8 - datastore/shard.go | 26 +- engine/aggregator_engine.go | 400 ++++++++++++ engine/arithmetic_engine.go | 80 +++ engine/{merge.go => common_merge_engine.go} | 151 ++--- engine/constants.go | 5 + engine/engine.go | 655 +------------------- engine/filtering_engine.go | 38 +- engine/join_engine.go | 85 +++ engine/list_series_engine.go | 70 --- engine/merge_engine.go | 36 ++ engine/passthrough_engine.go | 84 +-- engine/point_range.go | 18 + engine/processor.go | 16 + engine/query_processor.go | 19 - integration/data_test.go | 4 +- protocol/protocol.proto | 9 +- 35 files changed, 1127 insertions(+), 1196 deletions(-) delete mode 100644 api/http/null_series_writer.go create mode 100644 cluster/nil_processor.go create mode 100644 cluster/response_channel_processor.go create mode 100644 cluster/response_channel_wrapper.go create mode 100644 cluster/response_processor.go create mode 100644 coordinator/merge_channel_processor.go delete mode 100644 coordinator/series_writer.go create mode 100644 engine/aggregator_engine.go create mode 100644 engine/arithmetic_engine.go rename engine/{merge.go => common_merge_engine.go} (59%) create mode 100644 engine/constants.go create mode 100644 engine/join_engine.go delete mode 100644 engine/list_series_engine.go create mode 100644 engine/merge_engine.go create mode 100644 engine/point_range.go create mode 100644 engine/processor.go delete mode 100644 engine/query_processor.go diff --git a/api/coordinator.go b/api/coordinator.go index 6cc6b8760c..b9d8dd9015 100644 --- a/api/coordinator.go +++ b/api/coordinator.go @@ -3,7 +3,7 @@ package api import ( "github.com/influxdb/influxdb/cluster" cmn "github.com/influxdb/influxdb/common" - "github.com/influxdb/influxdb/coordinator" + "github.com/influxdb/influxdb/engine" "github.com/influxdb/influxdb/protocol" ) @@ -14,7 +14,7 @@ type Coordinator interface { ForceCompaction(cmn.User) error // Data related api - RunQuery(cmn.User, string, string, coordinator.SeriesWriter) error + RunQuery(cmn.User, string, string, engine.Processor) error WriteSeriesData(cmn.User, string, []*protocol.Series) error // Administration related api diff --git a/api/http/api.go b/api/http/api.go index 34c97061e3..e25153da64 100644 --- a/api/http/api.go +++ b/api/http/api.go @@ -1173,7 +1173,7 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R } } for _, queryString := range databaseConfig.ContinuousQueries { - err := self.coordinator.RunQuery(u, database, queryString, nullSeriesWriter) + err := self.coordinator.RunQuery(u, database, queryString, cluster.NilProcessor{}) if err != nil { return libhttp.StatusInternalServerError, err.Error() } diff --git a/api/http/api_test.go b/api/http/api_test.go index db15f38468..bc54002730 100644 --- a/api/http/api_test.go +++ b/api/http/api_test.go @@ -16,6 +16,7 @@ import ( . "github.com/influxdb/influxdb/common" "github.com/influxdb/influxdb/configuration" "github.com/influxdb/influxdb/coordinator" + "github.com/influxdb/influxdb/engine" "github.com/influxdb/influxdb/parser" "github.com/influxdb/influxdb/protocol" . "launchpad.net/gocheck" @@ -35,7 +36,7 @@ type ApiSuite struct { var _ = Suite(&ApiSuite{}) -func (self *MockCoordinator) RunQuery(_ User, _ string, query string, yield coordinator.SeriesWriter) error { +func (self *MockCoordinator) RunQuery(_ User, _ string, query string, yield engine.Processor) error { if self.returnedError != nil { return self.returnedError } @@ -87,10 +88,11 @@ func (self *MockCoordinator) RunQuery(_ User, _ string, query string, yield coor if err != nil { return err } - if err := yield.Write(series[0]); err != nil { + if _, err := yield.Yield(series[0]); err != nil { return err } - return yield.Write(series[1]) + _, err = yield.Yield(series[1]) + return err } type MockCoordinator struct { diff --git a/api/http/null_series_writer.go b/api/http/null_series_writer.go deleted file mode 100644 index 53d8559891..0000000000 --- a/api/http/null_series_writer.go +++ /dev/null @@ -1,5 +0,0 @@ -package http - -import "github.com/influxdb/influxdb/protocol" - -var nullSeriesWriter = NewSeriesWriter(func(_ *protocol.Series) error { return nil }) diff --git a/api/http/series_writer.go b/api/http/series_writer.go index 60758e36b9..0ac529d9a2 100644 --- a/api/http/series_writer.go +++ b/api/http/series_writer.go @@ -14,9 +14,18 @@ func NewSeriesWriter(yield func(*protocol.Series) error) *SeriesWriter { return &SeriesWriter{yield} } -func (self *SeriesWriter) Write(series *protocol.Series) error { - return self.yield(series) +func (self *SeriesWriter) Yield(series *protocol.Series) (bool, error) { + err := self.yield(series) + if err != nil { + return false, err + } + return true, nil } -func (self *SeriesWriter) Close() { +func (self *SeriesWriter) Close() error { + return nil +} + +func (self *SeriesWriter) Name() string { + return "SeriesWriter" } diff --git a/cluster/cluster_server.go b/cluster/cluster_server.go index 007679eb41..17cb870cdf 100644 --- a/cluster/cluster_server.go +++ b/cluster/cluster_server.go @@ -35,7 +35,8 @@ type ServerConnection interface { Connect() Close() ClearRequests() - MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error + MakeRequest(*protocol.Request, ResponseChannel) error + CancelRequest(*protocol.Request) } type ServerState int @@ -108,21 +109,19 @@ func (self *ClusterServer) Connect() { self.connection.Connect() } -func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) { - err := self.connection.MakeRequest(request, responseStream) +func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan<- *protocol.Response) { + rc := NewResponseChannelWrapper(responseStream) + err := self.connection.MakeRequest(request, rc) if err != nil { - message := err.Error() - select { - case responseStream <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}: - default: - } + self.connection.CancelRequest(request) self.markServerAsDown() } } func (self *ClusterServer) Write(request *protocol.Request) error { responseChan := make(chan *protocol.Response, 1) - err := self.connection.MakeRequest(request, responseChan) + rc := NewResponseChannelWrapper(responseChan) + err := self.connection.MakeRequest(request, rc) if err != nil { return err } diff --git a/cluster/nil_processor.go b/cluster/nil_processor.go new file mode 100644 index 0000000000..cf4cb607aa --- /dev/null +++ b/cluster/nil_processor.go @@ -0,0 +1,21 @@ +package cluster + +import ( + "fmt" + + "github.com/influxdb/influxdb/protocol" +) + +type NilProcessor struct{} + +func (np NilProcessor) Name() string { + return "NilProcessor" +} + +func (np NilProcessor) Yield(s *protocol.Series) (bool, error) { + return false, fmt.Errorf("Shouldn't get any data") +} + +func (np NilProcessor) Close() error { + return nil +} diff --git a/cluster/response_channel_processor.go b/cluster/response_channel_processor.go new file mode 100644 index 0000000000..f760fdd802 --- /dev/null +++ b/cluster/response_channel_processor.go @@ -0,0 +1,35 @@ +package cluster + +import "github.com/influxdb/influxdb/protocol" + +type ResponseChannelProcessor struct { + r ResponseChannel +} + +var ( + QueryResponse = protocol.Response_QUERY + EndStreamResponse = protocol.Response_END_STREAM +) + +func NewResponseChannelProcessor(r ResponseChannel) *ResponseChannelProcessor { + return &ResponseChannelProcessor{r} +} + +func (p *ResponseChannelProcessor) Yield(s *protocol.Series) (bool, error) { + ok := p.r.Yield(&protocol.Response{ + Type: &QueryResponse, + MultiSeries: []*protocol.Series{s}, + }) + return ok, nil +} + +func (p *ResponseChannelProcessor) Close() error { + p.r.Yield(&protocol.Response{ + Type: &EndStreamResponse, + }) + return nil +} + +func (p *ResponseChannelProcessor) Name() string { + return "ResponseChannelProcessor" +} diff --git a/cluster/response_channel_wrapper.go b/cluster/response_channel_wrapper.go new file mode 100644 index 0000000000..74e7d00c03 --- /dev/null +++ b/cluster/response_channel_wrapper.go @@ -0,0 +1,16 @@ +package cluster + +import "github.com/influxdb/influxdb/protocol" + +type ResponseChannelWrapper struct { + c chan<- *protocol.Response +} + +func NewResponseChannelWrapper(c chan<- *protocol.Response) ResponseChannel { + return &ResponseChannelWrapper{c} +} + +func (w *ResponseChannelWrapper) Yield(r *protocol.Response) bool { + w.c <- r + return true +} diff --git a/cluster/response_processor.go b/cluster/response_processor.go new file mode 100644 index 0000000000..e59aff553d --- /dev/null +++ b/cluster/response_processor.go @@ -0,0 +1,7 @@ +package cluster + +import "github.com/influxdb/influxdb/protocol" + +type ResponseChannel interface { + Yield(r *protocol.Response) bool +} diff --git a/cluster/shard.go b/cluster/shard.go index 6e1a92b583..57ef73ea96 100644 --- a/cluster/shard.go +++ b/cluster/shard.go @@ -25,26 +25,11 @@ type Shard interface { EndTime() time.Time Write(*p.Request) error SyncWrite(req *p.Request, assignSeqNum bool) error - Query(querySpec *parser.QuerySpec, response chan *p.Response) + Query(querySpec *parser.QuerySpec, response chan<- *p.Response) ReplicationFactor() int IsMicrosecondInRange(t int64) bool } -// Passed to a shard (local datastore or whatever) that gets yielded points from series. -type QueryProcessor interface { - // This method returns true if the query should continue. If the query should be stopped, - // like maybe the limit was hit, it should return false - YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool - YieldSeries(seriesIncoming *p.Series) bool - Close() - - // Set by the shard, so EXPLAIN query can know query against which shard is being measured - SetShardInfo(shardId int, shardLocal bool) - - // Let QueryProcessor identify itself. What if it is a spy and we can't check that? - GetName() string -} - type NewShardData struct { Id uint32 `json:",omitempty"` SpaceName string @@ -112,7 +97,7 @@ var ( type LocalShardDb interface { Write(database string, series []*p.Series) error - Query(*parser.QuerySpec, QueryProcessor) error + Query(*parser.QuerySpec, engine.Processor) error DropFields(fields []*metastore.Field) error IsClosed() bool } @@ -241,7 +226,7 @@ func (self *ShardData) WriteLocalOnly(request *p.Request) error { return nil } -func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Response) { +func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan<- *p.Response) { log.Debug("QUERY: shard %d, query '%s'", self.Id(), querySpec.GetQueryString()) defer common.RecoverFunc(querySpec.Database(), querySpec.GetQueryString(), func(err interface{}) { response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(fmt.Sprintf("%s", err))} @@ -258,33 +243,30 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo } if self.IsLocal { - var processor QueryProcessor + var processor engine.Processor = NewResponseChannelProcessor(NewResponseChannelWrapper(response)) var err error - if querySpec.IsListSeriesQuery() { - processor = engine.NewListSeriesEngine(response) - } else if querySpec.IsDeleteFromSeriesQuery() || querySpec.IsDropSeriesQuery() || querySpec.IsSinglePointQuery() { + if querySpec.IsDeleteFromSeriesQuery() || querySpec.IsDropSeriesQuery() || querySpec.IsSinglePointQuery() { maxDeleteResults := 10000 - processor = engine.NewPassthroughEngine(response, maxDeleteResults) + processor = engine.NewPassthroughEngine(processor, maxDeleteResults) } else { query := querySpec.SelectQuery() if self.ShouldAggregateLocally(querySpec) { log.Debug("creating a query engine") - processor, err = engine.NewQueryEngine(query, response) + processor, err = engine.NewQueryEngine(processor, query) if err != nil { response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())} log.Error("Error while creating engine: %s", err) return } - processor.SetShardInfo(int(self.Id()), self.IsLocal) } else if query.HasAggregates() { maxPointsToBufferBeforeSending := 1000 log.Debug("creating a passthrough engine") - processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending) + processor = engine.NewPassthroughEngine(processor, maxPointsToBufferBeforeSending) } else { maxPointsToBufferBeforeSending := 1000 log.Debug("creating a passthrough engine with limit") - processor = engine.NewPassthroughEngineWithLimit(response, maxPointsToBufferBeforeSending, query.Limit) + processor = engine.NewPassthroughEngineWithLimit(processor, maxPointsToBufferBeforeSending, query.Limit) } if query.GetFromClause().Type != parser.FromClauseInnerJoin { @@ -422,35 +404,28 @@ func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batc return tickCount } -func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *p.Response) { +func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan<- *p.Response) { queryString := querySpec.GetQueryStringWithTimeCondition() request := self.createRequest(querySpec) request.Query = &queryString self.LogAndHandleDestructiveQuery(querySpec, request, response, false) } -func (self *ShardData) logAndHandleDropSeriesQuery(querySpec *parser.QuerySpec, response chan *p.Response) { +func (self *ShardData) logAndHandleDropSeriesQuery(querySpec *parser.QuerySpec, response chan<- *p.Response) { self.LogAndHandleDestructiveQuery(querySpec, self.createRequest(querySpec), response, false) } -func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) { +func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, runLocalOnly bool) { self.HandleDestructiveQuery(querySpec, request, response, runLocalOnly) } -func (self *ShardData) deleteDataLocally(querySpec *parser.QuerySpec) (<-chan *p.Response, error) { - localResponses := make(chan *p.Response, 1) - - // this doesn't really apply at this point since destructive queries don't output anything, but it may later - maxPointsFromDestructiveQuery := 1000 - processor := engine.NewPassthroughEngine(localResponses, maxPointsFromDestructiveQuery) +func (self *ShardData) deleteDataLocally(querySpec *parser.QuerySpec) error { shard, err := self.store.GetOrCreateShard(self.id) if err != nil { - return nil, err + return err } defer self.store.ReturnShard(self.id) - err = shard.Query(querySpec, processor) - processor.Close() - return localResponses, err + return shard.Query(querySpec, NilProcessor{}) } func (self *ShardData) forwardRequest(request *p.Request) ([]<-chan *p.Response, []uint32, error) { @@ -468,7 +443,7 @@ func (self *ShardData) forwardRequest(request *p.Request) ([]<-chan *p.Response, return responses, ids, nil } -func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) { +func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, runLocalOnly bool) { if !self.IsLocal && runLocalOnly { panic("WTF islocal is false and runLocalOnly is true") } @@ -477,15 +452,13 @@ func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, reque serverIds := []uint32{} if self.IsLocal { - channel, err := self.deleteDataLocally(querySpec) + err := self.deleteDataLocally(querySpec) if err != nil { msg := err.Error() response <- &p.Response{Type: &endStreamResponse, ErrorMessage: &msg} log.Error(msg) return } - responseChannels = append(responseChannels, channel) - serverIds = append(serverIds, self.localServerId) } log.Debug("request %s, runLocalOnly: %v", request.GetDescription(), runLocalOnly) diff --git a/coordinator/client_server_test.go b/coordinator/client_server_test.go index 008dc17038..9804e9f61a 100644 --- a/coordinator/client_server_test.go +++ b/coordinator/client_server_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/protocol" . "launchpad.net/gocheck" ) @@ -21,8 +22,6 @@ const DB_DIR = "/tmp/influxdb/datastore_test" type MockRequestHandler struct { } -var writeOk = protocol.Response_WRITE_OK - func Test(t *testing.T) { TestingT(t) } @@ -35,7 +34,7 @@ func stringToSeries(seriesString string, c *C) *protocol.Series { } func (self *MockRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error { - response := &protocol.Response{RequestId: request.Id, Type: &writeOk} + response := &protocol.Response{RequestId: request.Id, Type: protocol.Response_END_STREAM.Enum()} data, _ := response.Encode() binary.Write(conn, binary.LittleEndian, uint32(len(data))) conn.Write(data) @@ -71,14 +70,14 @@ func (self *ClientServerSuite) TestClientCanMakeRequests(c *C) { request := &protocol.Request{Id: &id, Type: &proxyWrite, Database: &database, MultiSeries: []*protocol.Series{series}} time.Sleep(time.Second * 1) - err := protobufClient.MakeRequest(request, responseStream) + err := protobufClient.MakeRequest(request, cluster.NewResponseChannelWrapper(responseStream)) c.Assert(err, IsNil) timer := time.NewTimer(time.Second) select { case <-timer.C: c.Error("Timed out waiting for response") case response := <-responseStream: - c.Assert(*response.Type, Equals, protocol.Response_WRITE_OK) + c.Assert(*response.Type, Equals, protocol.Response_END_STREAM) } } diff --git a/coordinator/continuous_query_writer.go b/coordinator/continuous_query_writer.go index 9d21587b62..6f9eeaaa15 100644 --- a/coordinator/continuous_query_writer.go +++ b/coordinator/continuous_query_writer.go @@ -14,9 +14,18 @@ func NewContinuousQueryWriter(yield func(*protocol.Series) error) *ContinuousQue return &ContinuousQueryWriter{yield} } -func (self *ContinuousQueryWriter) Write(series *protocol.Series) error { - return self.yield(series) +func (self *ContinuousQueryWriter) Yield(series *protocol.Series) (bool, error) { + err := self.yield(series) + if err != nil { + return false, err + } + return true, nil } -func (self *ContinuousQueryWriter) Close() { +func (self *ContinuousQueryWriter) Close() error { + return nil +} + +func (self *ContinuousQueryWriter) Name() string { + return "ContinuousQueryWriter" } diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 23f7802ebc..9b99aeb988 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -45,7 +45,7 @@ func NewCoordinator( return coordinator } -func (self *Coordinator) RunQuery(user common.User, database string, queryString string, seriesWriter SeriesWriter) (err error) { +func (self *Coordinator) RunQuery(user common.User, database string, queryString string, p engine.Processor) (err error) { log.Info("Start Query: db: %s, u: %s, q: %s", database, user.GetName(), queryString) defer func(t time.Time) { log.Debug("End Query: db: %s, u: %s, q: %s, t: %s", database, user.GetName(), queryString, time.Now().Sub(t)) @@ -59,16 +59,17 @@ func (self *Coordinator) RunQuery(user common.User, database string, queryString } for _, query := range q { - err := self.runSingleQuery(user, database, query, seriesWriter) + // runSingleQuery shouldn't close the processor in case there are + // other queries to be run + err := self.runSingleQuery(user, database, query, p) if err != nil { return err } } - seriesWriter.Close() return nil } -func (self *Coordinator) runSingleQuery(user common.User, db string, q *parser.Query, sw SeriesWriter) error { +func (self *Coordinator) runSingleQuery(user common.User, db string, q *parser.Query, p engine.Processor) error { querySpec := parser.NewQuerySpec(user, db, q) if ok, err := self.permissions.CheckQueryPermissions(user, db, querySpec); !ok { @@ -80,37 +81,37 @@ func (self *Coordinator) runSingleQuery(user common.User, db string, q *parser.Q case parser.DropContinuousQuery: return self.runDropContinuousQuery(user, db, uint32(q.DropQuery.Id)) case parser.ListContinuousQueries: - return self.runListContinuousQueries(user, db, sw) + return self.runListContinuousQueries(user, db, p) case parser.Continuous: return self.runContinuousQuery(user, db, q.GetQueryString()) case parser.ListSeries: - return self.runListSeriesQuery(querySpec, sw) + return self.runListSeriesQuery(querySpec, p) // Data queries case parser.Delete: - return self.runDeleteQuery(querySpec, sw) + return self.runDeleteQuery(querySpec, p) case parser.DropSeries: - return self.runDropSeriesQuery(querySpec, sw) + return self.runDropSeriesQuery(querySpec) case parser.Select: - return self.runQuerySpec(querySpec, sw) + return self.runQuerySpec(querySpec, p) default: return fmt.Errorf("Can't handle query %s", qt) } } -func (self *Coordinator) runListContinuousQueries(user common.User, db string, sw SeriesWriter) error { +func (self *Coordinator) runListContinuousQueries(user common.User, db string, p engine.Processor) error { queries, err := self.ListContinuousQueries(user, db) if err != nil { return err } for _, q := range queries { - if err := sw.Write(q); err != nil { + if ok, err := p.Yield(q); !ok || err != nil { return err } } return nil } -func (self *Coordinator) runListSeriesQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { +func (self *Coordinator) runListSeriesQuery(querySpec *parser.QuerySpec, p engine.Processor) error { allSeries := self.clusterConfiguration.MetaStore.GetSeriesForDatabase(querySpec.Database()) matchingSeries := allSeries if q := querySpec.Query().GetListSeriesQuery(); q.HasRegex() { @@ -133,27 +134,25 @@ func (self *Coordinator) runListSeriesQuery(querySpec *parser.QuerySpec, seriesW } seriesResult := &protocol.Series{Name: &name, Fields: fields, Points: points} - seriesWriter.Write(seriesResult) - seriesWriter.Close() - return nil + _, err := p.Yield(seriesResult) + return err } -func (self *Coordinator) runDeleteQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { +func (self *Coordinator) runDeleteQuery(querySpec *parser.QuerySpec, p engine.Processor) error { if err := self.clusterConfiguration.CreateCheckpoint(); err != nil { return err } querySpec.RunAgainstAllServersInShard = true - return self.runQuerySpec(querySpec, seriesWriter) + return self.runQuerySpec(querySpec, p) } -func (self *Coordinator) runDropSeriesQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { +func (self *Coordinator) runDropSeriesQuery(querySpec *parser.QuerySpec) error { user := querySpec.User() db := querySpec.Database() series := querySpec.Query().DropSeriesQuery.GetTableName() if ok, err := self.permissions.AuthorizeDropSeries(user, db, series); !ok { return err } - defer seriesWriter.Close() err := self.raftServer.DropSeries(db, series) if err != nil { return err @@ -209,144 +208,58 @@ func (self *Coordinator) shouldQuerySequentially(shards cluster.Shards, querySpe return false } -func (self *Coordinator) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, <-chan bool, error) { +func (self *Coordinator) getShardsAndProcessor(querySpec *parser.QuerySpec, writer engine.Processor) ([]*cluster.ShardData, engine.Processor, error) { shards := self.clusterConfiguration.GetShardsForQuery(querySpec) shouldAggregateLocally := shards.ShouldAggregateLocally(querySpec) var err error - var processor cluster.QueryProcessor - - responseChan := make(chan *protocol.Response) - seriesClosed := make(chan bool) selectQuery := querySpec.SelectQuery() if selectQuery != nil { if !shouldAggregateLocally { // if we should aggregate in the coordinator (i.e. aggregation // isn't happening locally at the shard level), create an engine - processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), responseChan) + writer, err = engine.NewQueryEngine(writer, querySpec.SelectQuery()) } else { // if we have a query with limit, then create an engine, or we can // make the passthrough limit aware - processor = engine.NewPassthroughEngineWithLimit(responseChan, 100, selectQuery.Limit) + writer = engine.NewPassthroughEngineWithLimit(writer, 100, selectQuery.Limit) } } else if !shouldAggregateLocally { - processor = engine.NewPassthroughEngine(responseChan, 100) + writer = engine.NewPassthroughEngine(writer, 100) } if err != nil { - return nil, nil, nil, err + return nil, nil, err } - if processor == nil { - return shards, nil, nil, nil - } - - go func() { - for { - response := <-responseChan - - if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse { - writer.Close() - seriesClosed <- true - return - } - if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) { - if response.Series != nil && len(response.Series.Points) > 0 { - writer.Write(response.Series) - } - } - } - }() - - return shards, processor, seriesClosed, nil + return shards, writer, nil } -func (self *Coordinator) readFromResponseChannels(processor cluster.QueryProcessor, - writer SeriesWriter, - isExplainQuery bool, - errors chan<- error, - responseChannels <-chan (<-chan *protocol.Response)) { - - defer close(errors) - - for responseChan := range responseChannels { - for response := range responseChan { - - //log.Debug("GOT RESPONSE: ", response.Type, response.Series) - log.Debug("GOT RESPONSE: %v", response.Type) - if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse { - if response.ErrorMessage == nil { - break - } - - err := common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) - log.Error("Error while executing query: %s", err) - errors <- err - return - } - - if response.Series == nil || len(response.Series.Points) == 0 { - log.Debug("Series has no points, continue") - continue - } - - // if we don't have a processor, yield the point to the writer - // this happens if shard took care of the query - // otherwise client will get points from passthrough engine - if processor != nil { - // if the data wasn't aggregated at the shard level, aggregate - // the data here - log.Debug("YIELDING: %d points with %d columns for %s", len(response.Series.Points), len(response.Series.Fields), response.Series.GetName()) - processor.YieldSeries(response.Series) - continue - } - - // If we have EXPLAIN query, we don't write actual points (of - // response.Type Query) to the client - if !(*response.Type == queryResponse && isExplainQuery) { - writer.Write(response.Series) - } - } - - // once we're done with a response channel signal queryShards to - // start querying a new shard - errors <- nil - } - return -} - -func (self *Coordinator) queryShards(querySpec *parser.QuerySpec, shards []*cluster.ShardData, - errors <-chan error, - responseChannels chan<- (<-chan *protocol.Response)) error { - defer close(responseChannels) - - for i := 0; i < len(shards); i++ { +func (self *Coordinator) queryShards(querySpec *parser.QuerySpec, shards []*cluster.ShardData, p *MergeChannelProcessor) error { + for i, s := range shards { // readFromResponseChannels will insert an error if an error // occured while reading the response. This should immediately // stop reading from shards - err := <-errors - if err != nil { - return err - } - shard := shards[i] - bufferSize := shard.QueryResponseBufferSize(querySpec, self.config.StoragePointBatchSize) + bufferSize := s.QueryResponseBufferSize(querySpec, self.config.StoragePointBatchSize) if bufferSize > self.config.ClusterMaxResponseBufferSize { bufferSize = self.config.ClusterMaxResponseBufferSize } - responseChan := make(chan *protocol.Response, bufferSize) + c, err := p.NextChannel(bufferSize) + if err != nil { + return err + } // We query shards for data and stream them to query processor - log.Debug("QUERYING: shard: %d %v", i, shard.String()) - go shard.Query(querySpec, responseChan) - responseChannels <- responseChan + log.Debug("QUERYING: shard: %d %v", i, s.String()) + go s.Query(querySpec, c) } return nil } // We call this function only if we have a Select query (not continuous) or Delete query -func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { - shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter) +func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, p engine.Processor) error { + shards, processor, err := self.getShardsAndProcessor(querySpec, p) if err != nil { return err } @@ -355,15 +268,6 @@ func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter return fmt.Errorf("Couldn't look up columns") } - defer func() { - if processor != nil { - processor.Close() - <-seriesClosed - } else { - seriesWriter.Close() - } - }() - shardConcurrentLimit := self.config.ConcurrentShardQueryLimit if self.shouldQuerySequentially(shards, querySpec) { log.Debug("Querying shards sequentially") @@ -371,35 +275,22 @@ func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter } log.Debug("Shard concurrent limit: %d", shardConcurrentLimit) - errors := make(chan error, shardConcurrentLimit) - for i := 0; i < shardConcurrentLimit; i++ { - errors <- nil - } - responseChannels := make(chan (<-chan *protocol.Response), shardConcurrentLimit) + mcp := NewMergeChannelProcessor(processor, shardConcurrentLimit) - go self.readFromResponseChannels(processor, seriesWriter, querySpec.IsExplainQuery(), errors, responseChannels) + go mcp.ProcessChannels() - err = self.queryShards(querySpec, shards, errors, responseChannels) - - // make sure we read the rest of the errors and responses - for _err := range errors { - if err == nil { - err = _err - } + if err := self.queryShards(querySpec, shards, mcp); err != nil { + log.Error("Error while querying shards: %s", err) + mcp.Close() + return err } - for responsechan := range responseChannels { - for response := range responsechan { - if response.GetType() != endStreamResponse { - continue - } - if response.ErrorMessage != nil && err == nil { - err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) - } - break - } + if err := mcp.Close(); err != nil { + log.Error("Error while querying shards: %s", err) + return err } - return err + + return processor.Close() } func (self *Coordinator) ForceCompaction(user common.User) error { diff --git a/coordinator/merge_channel_processor.go b/coordinator/merge_channel_processor.go new file mode 100644 index 0000000000..2df64ce32c --- /dev/null +++ b/coordinator/merge_channel_processor.go @@ -0,0 +1,104 @@ +package coordinator + +import ( + "fmt" + + "code.google.com/p/log4go" + + "github.com/influxdb/influxdb/common" + "github.com/influxdb/influxdb/engine" + "github.com/influxdb/influxdb/protocol" +) + +type MergeChannelProcessor struct { + next engine.Processor + c chan (<-chan *protocol.Response) + e chan error +} + +func NewMergeChannelProcessor(next engine.Processor, concurrency int) *MergeChannelProcessor { + p := &MergeChannelProcessor{ + next: next, + e: make(chan error, concurrency), + c: make(chan (<-chan *protocol.Response), concurrency), + } + for i := 0; i < concurrency; i++ { + p.e <- nil + } + return p +} + +func (p *MergeChannelProcessor) Close() (err error) { + close(p.c) + + for e := range p.e { + if e != nil { + err = e + } + } + + for c := range p.c { + nextChannel: + for r := range c { + switch r.GetType() { + case protocol.Response_END_STREAM, + protocol.Response_ACCESS_DENIED, + protocol.Response_WRITE_OK, + protocol.Response_HEARTBEAT: + break nextChannel + } + } + } + return err +} + +func (p *MergeChannelProcessor) NextChannel(bs int) (chan<- *protocol.Response, error) { + err := <-p.e + if err != nil { + return nil, err + } + c := make(chan *protocol.Response, bs) + p.c <- c + return c, nil +} + +func (p *MergeChannelProcessor) String() string { + return fmt.Sprintf("MergeChannelProcessor (%d)", cap(p.e)) +} + +func (p *MergeChannelProcessor) ProcessChannels() { + defer close(p.e) + + for channel := range p.c { + nextChannel: + for response := range channel { + log4go.Debug("%s received %s", p, response) + + switch response.GetType() { + + // all these four types end the stream + case protocol.Response_WRITE_OK, + protocol.Response_HEARTBEAT, + protocol.Response_ACCESS_DENIED, + protocol.Response_END_STREAM: + + var err error + if m := response.ErrorMessage; m != nil { + err = common.NewQueryError(common.InvalidArgument, *m) + } + p.e <- err + break nextChannel + + case protocol.Response_QUERY: + for _, s := range response.MultiSeries { + log4go.Debug("Yielding to %s: %s", p.next.Name(), s) + _, err := p.next.Yield(s) + if err != nil { + p.e <- err + return + } + } + } + } + } +} diff --git a/coordinator/protobuf_client.go b/coordinator/protobuf_client.go index 17adf20696..77a1b6cbec 100644 --- a/coordinator/protobuf_client.go +++ b/coordinator/protobuf_client.go @@ -11,6 +11,7 @@ import ( "time" log "code.google.com/p/log4go" + "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/protocol" ) @@ -32,9 +33,9 @@ type ProtobufClient struct { } type runningRequest struct { - timeMade time.Time - responseChan chan *protocol.Response - request *protocol.Request + timeMade time.Time + r cluster.ResponseChannel + request *protocol.Request } const ( @@ -91,27 +92,38 @@ func (self *ProtobufClient) ClearRequests() { self.requestBufferLock.Lock() defer self.requestBufferLock.Unlock() - message := "clearing all requests" for _, req := range self.requestBuffer { - select { - case req.responseChan <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}: - default: - log.Debug("Cannot send response on channel") - } + self.cancelRequest(req.request) } self.requestBuffer = map[uint32]*runningRequest{} } +func (self *ProtobufClient) CancelRequest(request *protocol.Request) { + self.requestBufferLock.Lock() + defer self.requestBufferLock.Unlock() + self.cancelRequest(request) +} + +func (self *ProtobufClient) cancelRequest(request *protocol.Request) { + req, ok := self.requestBuffer[*request.Id] + if !ok { + return + } + message := "cancelling request" + req.r.Yield(&protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}) + delete(self.requestBuffer, *request.Id) +} + // Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server // with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms means // that an attempt to make a request to a downed server will take 300ms to time out. -func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error { +func (self *ProtobufClient) MakeRequest(request *protocol.Request, r cluster.ResponseChannel) error { if request.Id == nil { id := atomic.AddUint32(&self.lastRequestId, uint32(1)) request.Id = &id } - if responseStream != nil { + if r != nil { self.requestBufferLock.Lock() // this should actually never happen. The sweeper should clear out dead requests @@ -119,9 +131,9 @@ func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStrea if oldReq, alreadyHasRequestById := self.requestBuffer[*request.Id]; alreadyHasRequestById { message := "already has a request with this id, must have timed out" log.Error(message) - oldReq.responseChan <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message} + oldReq.r.Yield(&protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}) } - self.requestBuffer[*request.Id] = &runningRequest{timeMade: time.Now(), responseChan: responseStream, request: request} + self.requestBuffer[*request.Id] = &runningRequest{timeMade: time.Now(), r: r, request: request} self.requestBufferLock.Unlock() } @@ -197,14 +209,29 @@ func (self *ProtobufClient) sendResponse(response *protocol.Response) { self.requestBufferLock.RLock() req, ok := self.requestBuffer[*response.RequestId] self.requestBufferLock.RUnlock() - if ok { - if *response.Type == protocol.Response_END_STREAM || *response.Type == protocol.Response_WRITE_OK || *response.Type == protocol.Response_HEARTBEAT || *response.Type == protocol.Response_ACCESS_DENIED { - self.requestBufferLock.Lock() - delete(self.requestBuffer, *response.RequestId) - self.requestBufferLock.Unlock() - } - req.responseChan <- response + if !ok { + return } + + switch response.GetType() { + case protocol.Response_END_STREAM, + protocol.Response_WRITE_OK, + protocol.Response_HEARTBEAT, + protocol.Response_ACCESS_DENIED: + // continue and delete the request + default: + return + } + + self.requestBufferLock.Lock() + req, ok = self.requestBuffer[*response.RequestId] + delete(self.requestBuffer, *response.RequestId) + self.requestBufferLock.Unlock() + if !ok { + return + } + + req.r.Yield(response) } func (self *ProtobufClient) reconnect() net.Conn { diff --git a/coordinator/protobuf_client_test.go b/coordinator/protobuf_client_test.go index 7215afece1..89268df45b 100644 --- a/coordinator/protobuf_client_test.go +++ b/coordinator/protobuf_client_test.go @@ -9,6 +9,7 @@ import ( "time" log "code.google.com/p/log4go" + "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/protocol" ) @@ -98,7 +99,8 @@ func BenchmarkSingle(b *testing.B) { Type: &HEARTBEAT_TYPE, Database: protocol.String(""), } - client.MakeRequest(heartbeatRequest, responseChan) + rcw := cluster.NewResponseChannelWrapper(responseChan) + client.MakeRequest(heartbeatRequest, rcw) <-responseChan } } diff --git a/coordinator/protobuf_request_handler.go b/coordinator/protobuf_request_handler.go index ba16107ba6..6ee435cf59 100644 --- a/coordinator/protobuf_request_handler.go +++ b/coordinator/protobuf_request_handler.go @@ -107,16 +107,12 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error { if response.Size() >= MAX_RESPONSE_SIZE { - l := len(response.Series.Points) - firstHalfPoints := response.Series.Points[:l/2] - secondHalfPoints := response.Series.Points[l/2:] - response.Series.Points = firstHalfPoints - err := self.WriteResponse(conn, response) + f, s := splitResponse(response) + err := self.WriteResponse(conn, f) if err != nil { return err } - response.Series.Points = secondHalfPoints - return self.WriteResponse(conn, response) + return self.WriteResponse(conn, s) } data, err := response.Encode() @@ -134,3 +130,21 @@ func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *proto } return nil } + +func splitResponse(response *protocol.Response) (f, s *protocol.Response) { + f = &protocol.Response{} + s = &protocol.Response{} + *f = *response + *s = *response + + if l := len(response.MultiSeries); l > 1 { + f.MultiSeries = f.MultiSeries[:l/2] + s.MultiSeries = s.MultiSeries[l/2:] + return + } + + l := len(response.MultiSeries[0].Points) + f.MultiSeries[0].Points = f.MultiSeries[0].Points[:l/2] + s.MultiSeries[0].Points = s.MultiSeries[0].Points[l/2:] + return +} diff --git a/coordinator/series_writer.go b/coordinator/series_writer.go deleted file mode 100644 index c089a05c6e..0000000000 --- a/coordinator/series_writer.go +++ /dev/null @@ -1,8 +0,0 @@ -package coordinator - -import "github.com/influxdb/influxdb/protocol" - -type SeriesWriter interface { - Write(*protocol.Series) error - Close() -} diff --git a/datastore/shard.go b/datastore/shard.go index 75b7b4d7df..322c2431d9 100644 --- a/datastore/shard.go +++ b/datastore/shard.go @@ -12,9 +12,9 @@ import ( "code.google.com/p/goprotobuf/proto" log "code.google.com/p/log4go" - "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/common" "github.com/influxdb/influxdb/datastore/storage" + "github.com/influxdb/influxdb/engine" "github.com/influxdb/influxdb/metastore" "github.com/influxdb/influxdb/parser" "github.com/influxdb/influxdb/protocol" @@ -100,13 +100,12 @@ func (self *Shard) Write(database string, series []*protocol.Series) error { return self.db.BatchPut(wb) } -func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { +func (self *Shard) Query(querySpec *parser.QuerySpec, processor engine.Processor) error { self.closeLock.RLock() defer self.closeLock.RUnlock() if self.closed { return fmt.Errorf("Shard is closed") } - if querySpec.IsListSeriesQuery() { return fmt.Errorf("List series queries should never come to the shard") } else if querySpec.IsDeleteFromSeriesQuery() { @@ -145,7 +144,7 @@ func (self *Shard) IsClosed() bool { return self.closed } -func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName string, columns []string, processor cluster.QueryProcessor) error { +func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName string, columns []string, processor engine.Processor) error { startTimeBytes := self.byteArrayForTime(querySpec.GetStartTime()) endTimeBytes := self.byteArrayForTime(querySpec.GetEndTime()) @@ -167,7 +166,8 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName return err } if len(series.Points) > 0 { - processor.YieldPoint(series.Name, series.Fields, series.Points[0]) + _, err := processor.Yield(series) + return err } return nil } @@ -291,9 +291,13 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName Fields: fieldNames, Points: seriesOutgoing.Points, } - if !processor.YieldSeries(series) { - log.Debug("Stopping processing") + if ok, err := processor.Yield(series); !ok || err != nil { + log.Debug("Stopping processing.") shouldContinue = false + if err != nil { + log.Error("Error while processing data: %v", err) + return err + } } } seriesOutgoing = &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)} @@ -308,8 +312,12 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName for _, alias := range aliases { log.Debug("Final Flush %s", alias) series := &protocol.Series{Name: protocol.String(alias), Fields: seriesOutgoing.Fields, Points: seriesOutgoing.Points} - if !processor.YieldSeries(series) { + if ok, err := processor.Yield(series); !ok || err != nil { log.Debug("Cancelled...") + if err != nil { + log.Error("Error while processing data: %v", err) + return err + } } } @@ -317,7 +325,7 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName return nil } -func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { +func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor engine.Processor) error { query := querySpec.DeleteQuery() series := query.GetFromClause() database := querySpec.Database() diff --git a/engine/aggregator_engine.go b/engine/aggregator_engine.go new file mode 100644 index 0000000000..5421588ab9 --- /dev/null +++ b/engine/aggregator_engine.go @@ -0,0 +1,400 @@ +package engine + +import ( + "fmt" + "math" + "strings" + "time" + + "github.com/influxdb/influxdb/common" + "github.com/influxdb/influxdb/parser" + "github.com/influxdb/influxdb/protocol" +) + +type AggregatorEngine struct { + // query information + query *parser.SelectQuery + isAggregateQuery bool + fields []string + where *parser.WhereCondition + fillWithZero bool + + // was start time set in the query, e.g. time > now() - 1d + startTimeSpecified bool + startTime int64 + endTime int64 + + // output fields + next Processor + limiter *Limiter + + // variables for aggregate queries + aggregators []Aggregator + elems []*parser.Value // group by columns other than time() + duration *time.Duration // the time by duration if any + seriesStates map[string]*SeriesState +} + +func (self *AggregatorEngine) Name() string { + return "Aggregator Engine" +} + +func (self *AggregatorEngine) yieldToNext(seriesIncoming *protocol.Series) (bool, error) { + if ok, err := self.next.Yield(seriesIncoming); err != nil || !ok { + return ok, err + } + if self.limiter.hitLimit(seriesIncoming.GetName()) { + return false, nil + } + return true, nil +} + +func (self *AggregatorEngine) Close() error { + if self.isAggregateQuery { + if _, err := self.runAggregates(); err != nil { + return err + } + } + + return self.next.Close() +} + +func (self *AggregatorEngine) getTimestampFromPoint(point *protocol.Point) int64 { + return self.getTimestampBucket(uint64(*point.GetTimestampInMicroseconds())) +} + +func (self *AggregatorEngine) getTimestampBucket(timestampMicroseconds uint64) int64 { + timestampMicroseconds *= 1000 // convert to nanoseconds + multiplier := uint64(*self.duration) + return int64(timestampMicroseconds / multiplier * multiplier / 1000) +} + +func (self *AggregatorEngine) Yield(s *protocol.Series) (bool, error) { + if len(s.Points) == 0 { + return true, nil + } + + return self.aggregateValuesForSeries(s) +} + +func (self *AggregatorEngine) initializeFields() { + for _, aggregator := range self.aggregators { + columnNames := aggregator.ColumnNames() + self.fields = append(self.fields, columnNames...) + } + + if self.elems == nil { + return + } + + for _, value := range self.elems { + tempName := value.Name + self.fields = append(self.fields, tempName) + } +} + +var _count = 0 + +func (self *AggregatorEngine) getSeriesState(name string) *SeriesState { + state := self.seriesStates[name] + if state == nil { + levels := len(self.elems) + if self.duration != nil && self.fillWithZero { + levels++ + } + + state = &SeriesState{ + started: false, + trie: NewTrie(levels, len(self.aggregators)), + lastTimestamp: 0, + pointsRange: &PointRange{math.MaxInt64, math.MinInt64}, + } + self.seriesStates[name] = state + } + return state +} + +// We have three types of queries: +// 1. time() without fill +// 2. time() with fill +// 3. no time() +// +// For (1) we flush as soon as a new bucket start, the prefix tree +// keeps track of the other group by columns without the time +// bucket. We reset the trie once the series is yielded. For (2), we +// keep track of all group by columns with time being the last level +// in the prefix tree. At the end of the query we step through [start +// time, end time] in self.duration steps and get the state from the +// prefix tree, using default values for groups without state in the +// prefix tree. For the last case we keep the groups in the prefix +// tree and on close() we loop through the groups and flush their +// values with a timestamp equal to now() +func (self *AggregatorEngine) aggregateValuesForSeries(series *protocol.Series) (bool, error) { + for _, aggregator := range self.aggregators { + if err := aggregator.InitializeFieldsMetadata(series); err != nil { + return false, err + } + } + + seriesState := self.getSeriesState(series.GetName()) + currentRange := seriesState.pointsRange + + includeTimestampInGroup := self.duration != nil && self.fillWithZero + var group []*protocol.FieldValue + if !includeTimestampInGroup { + group = make([]*protocol.FieldValue, len(self.elems)) + } else { + group = make([]*protocol.FieldValue, len(self.elems)+1) + } + + for _, point := range series.Points { + currentRange.UpdateRange(point) + + // this is a groupby with time() and no fill, flush as soon as we + // start a new bucket + if self.duration != nil && !self.fillWithZero { + timestamp := self.getTimestampFromPoint(point) + // this is the timestamp aggregator + if seriesState.started && seriesState.lastTimestamp != timestamp { + self.runAggregatesForTable(series.GetName()) + } + seriesState.lastTimestamp = timestamp + seriesState.started = true + } + + // get the group this point belongs to + for idx, elem := range self.elems { + // TODO: create an index from fieldname to index + value, err := GetValue(elem, series.Fields, point) + if err != nil { + return false, err + } + group[idx] = value + } + + // if this is a fill() query, add the timestamp at the end + if includeTimestampInGroup { + timestamp := self.getTimestampFromPoint(point) + group[len(self.elems)] = &protocol.FieldValue{Int64Value: protocol.Int64(timestamp)} + } + + // update the state of the given group + node := seriesState.trie.GetNode(group) + var err error + for idx, aggregator := range self.aggregators { + node.states[idx], err = aggregator.AggregatePoint(node.states[idx], point) + if err != nil { + return false, err + } + } + } + + return true, nil +} + +func (self *AggregatorEngine) runAggregates() (bool, error) { + for t := range self.seriesStates { + if ok, err := self.runAggregatesForTable(t); !ok || err != nil { + return ok, err + } + } + return true, nil +} + +func (self *AggregatorEngine) calculateSummariesForTable(table string) { + trie := self.getSeriesState(table).trie + err := trie.Traverse(func(group []*protocol.FieldValue, node *Node) error { + for idx, aggregator := range self.aggregators { + aggregator.CalculateSummaries(node.states[idx]) + } + return nil + }) + if err != nil { + panic("Error while calculating summaries") + } +} + +func (self *AggregatorEngine) runAggregatesForTable(table string) (bool, error) { + // TODO: if this is a fill query, step through [start,end] in duration + // steps and flush the groups for the given bucket + + self.calculateSummariesForTable(table) + + state := self.getSeriesState(table) + trie := state.trie + points := make([]*protocol.Point, 0, trie.CountLeafNodes()) + f := func(group []*protocol.FieldValue, node *Node) error { + points = append(points, self.getValuesForGroup(table, group, node)...) + return nil + } + + var err error + if self.duration != nil && self.fillWithZero { + timestampRange := state.pointsRange + if self.startTimeSpecified { + timestampRange = &PointRange{startTime: self.startTime, endTime: self.endTime} + } + + // TODO: DRY this + if self.query.Ascending { + bucket := self.getTimestampBucket(uint64(timestampRange.startTime)) + for bucket <= timestampRange.endTime { + timestamp := &protocol.FieldValue{Int64Value: protocol.Int64(bucket)} + defaultChildNode := &Node{states: make([]interface{}, len(self.aggregators))} + err = trie.TraverseLevel(len(self.elems), func(v []*protocol.FieldValue, node *Node) error { + childNode := node.GetChildNode(timestamp) + if childNode == nil { + childNode = defaultChildNode + } + return f(append(v, timestamp), childNode) + }) + bucket += self.duration.Nanoseconds() / 1000 + } + } else { + bucket := self.getTimestampBucket(uint64(timestampRange.endTime)) + for { + timestamp := &protocol.FieldValue{Int64Value: protocol.Int64(bucket)} + defaultChildNode := &Node{states: make([]interface{}, len(self.aggregators))} + err = trie.TraverseLevel(len(self.elems), func(v []*protocol.FieldValue, node *Node) error { + childNode := node.GetChildNode(timestamp) + if childNode == nil { + childNode = defaultChildNode + } + return f(append(v, timestamp), childNode) + }) + if bucket <= timestampRange.startTime { + break + } + bucket -= self.duration.Nanoseconds() / 1000 + } + } + } else { + err = trie.Traverse(f) + } + if err != nil { + panic(err) + } + trie.Clear() + return self.yieldToNext(&protocol.Series{ + Name: &table, + Fields: self.fields, + Points: points, + }) +} + +func (self *AggregatorEngine) getValuesForGroup(table string, group []*protocol.FieldValue, node *Node) []*protocol.Point { + + values := [][][]*protocol.FieldValue{} + + var timestamp int64 + useTimestamp := false + if self.duration != nil && !self.fillWithZero { + // if there's a group by time(), then the timestamp is the lastTimestamp + timestamp = self.getSeriesState(table).lastTimestamp + useTimestamp = true + } else if self.duration != nil && self.fillWithZero { + // if there's no group by time(), but a fill value was specified, + // the timestamp is the last value in the group + timestamp = group[len(group)-1].GetInt64Value() + useTimestamp = true + } + + for idx, aggregator := range self.aggregators { + values = append(values, aggregator.GetValues(node.states[idx])) + node.states[idx] = nil + } + + // do cross product of all the values + var _values [][]*protocol.FieldValue + if len(values) == 1 { + _values = values[0] + } else { + _values = crossProduct(values) + } + + points := []*protocol.Point{} + + for _, v := range _values { + /* groupPoints := []*protocol.Point{} */ + point := &protocol.Point{ + Values: v, + } + + if useTimestamp { + point.SetTimestampInMicroseconds(timestamp) + } else { + point.SetTimestampInMicroseconds(0) + } + + // FIXME: this should be looking at the fields slice not the group by clause + // FIXME: we should check whether the selected columns are in the group by clause + for idx := range self.elems { + point.Values = append(point.Values, group[idx]) + } + + points = append(points, point) + } + return points +} + +func (self *AggregatorEngine) init() error { + duration, err := self.query.GetGroupByClause().GetGroupByTime() + if err != nil { + return err + } + + self.isAggregateQuery = true + self.duration = duration + self.aggregators = []Aggregator{} + + for _, value := range self.query.GetColumnNames() { + if !value.IsFunctionCall() { + continue + } + lowerCaseName := strings.ToLower(value.Name) + initializer := registeredAggregators[lowerCaseName] + if initializer == nil { + return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown function %s", value.Name)) + } + aggregator, err := initializer(self.query, value, self.query.GetGroupByClause().FillValue) + if err != nil { + return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("%s", err)) + } + self.aggregators = append(self.aggregators, aggregator) + } + + for _, elem := range self.query.GetGroupByClause().Elems { + if elem.IsFunctionCall() { + continue + } + self.elems = append(self.elems, elem) + } + + self.fillWithZero = self.query.GetGroupByClause().FillWithZero + + // This is a special case for issue #426. If the start time is + // specified and there's a group by clause and fill with zero, then + // we need to fill the entire range from start time to end time + if self.query.IsStartTimeSpecified() && self.duration != nil && self.fillWithZero { + self.startTimeSpecified = true + self.startTime = self.query.GetStartTime().Truncate(*self.duration).UnixNano() / 1000 + self.endTime = self.query.GetEndTime().Truncate(*self.duration).UnixNano() / 1000 + } + + self.initializeFields() + return nil +} + +func NewAggregatorEngine(query *parser.SelectQuery, next Processor) (*AggregatorEngine, error) { + queryEngine := &AggregatorEngine{ + query: query, + where: query.GetWhereCondition(), + next: next, + limiter: NewLimiter(query.Limit), + // stats stuff + duration: nil, + seriesStates: make(map[string]*SeriesState), + } + + return queryEngine, queryEngine.init() +} diff --git a/engine/arithmetic_engine.go b/engine/arithmetic_engine.go new file mode 100644 index 0000000000..448c99828b --- /dev/null +++ b/engine/arithmetic_engine.go @@ -0,0 +1,80 @@ +package engine + +import ( + "strconv" + + log "code.google.com/p/log4go" + "github.com/influxdb/influxdb/parser" + "github.com/influxdb/influxdb/protocol" +) + +type ArithmeticEngine struct { + next Processor + names map[string]*parser.Value +} + +func NewArithmeticEngine(query *parser.SelectQuery, next Processor) (*ArithmeticEngine, error) { + + names := map[string]*parser.Value{} + for idx, v := range query.GetColumnNames() { + switch v.Type { + case parser.ValueSimpleName: + names[v.Name] = v + case parser.ValueFunctionCall: + names[v.Name] = v + case parser.ValueExpression: + if v.Alias != "" { + names[v.Alias] = v + } else { + names["expr"+strconv.Itoa(idx)] = v + } + } + } + + return &ArithmeticEngine{ + next: next, + names: names, + }, nil +} + +func (ae *ArithmeticEngine) Yield(s *protocol.Series) (bool, error) { + if len(s.Points) == 0 { + return ae.next.Yield(s) + } + + newSeries := &protocol.Series{ + Name: s.Name, + } + + // create the new column names + for name := range ae.names { + newSeries.Fields = append(newSeries.Fields, name) + } + + for _, point := range s.Points { + newPoint := &protocol.Point{ + Timestamp: point.Timestamp, + SequenceNumber: point.SequenceNumber, + } + for _, field := range newSeries.Fields { + value := ae.names[field] + v, err := GetValue(value, s.Fields, point) + if err != nil { + log.Error("Error in arithmetic computation: %s", err) + return false, err + } + newPoint.Values = append(newPoint.Values, v) + } + newSeries.Points = append(newSeries.Points, newPoint) + } + + return ae.next.Yield(newSeries) +} + +func (self *ArithmeticEngine) Close() error { + return self.next.Close() +} + +func (self *ArithmeticEngine) Name() string { + return "Arithmetic Engine" +} diff --git a/engine/merge.go b/engine/common_merge_engine.go similarity index 59% rename from engine/merge.go rename to engine/common_merge_engine.go index ffe4f9a3fb..4434dcb110 100644 --- a/engine/merge.go +++ b/engine/common_merge_engine.go @@ -1,78 +1,6 @@ package engine -import ( - "github.com/influxdb/influxdb/parser" - "github.com/influxdb/influxdb/protocol" -) - -func getJoinYield(query *parser.SelectQuery, yield func(*protocol.Series) error) func(*protocol.Series) error { - var lastPoint1 *protocol.Point - var lastFields1 []string - var lastPoint2 *protocol.Point - var lastFields2 []string - - table1 := query.GetFromClause().Names[0].GetAlias() - table2 := query.GetFromClause().Names[1].GetAlias() - name := table1 + "_join_" + table2 - - return mergeYield(table1, table2, false, query.Ascending, func(s *protocol.Series) error { - if *s.Name == table1 { - lastPoint1 = s.Points[len(s.Points)-1] - if lastFields1 == nil { - for _, f := range s.Fields { - lastFields1 = append(lastFields1, table1+"."+f) - } - } - } - - if *s.Name == table2 { - lastPoint2 = s.Points[len(s.Points)-1] - if lastFields2 == nil { - for _, f := range s.Fields { - lastFields2 = append(lastFields2, table2+"."+f) - } - } - } - - if lastPoint1 == nil || lastPoint2 == nil { - return nil - } - - newSeries := &protocol.Series{ - Name: &name, - Fields: append(lastFields1, lastFields2...), - Points: []*protocol.Point{ - { - Values: append(lastPoint1.Values, lastPoint2.Values...), - Timestamp: lastPoint2.Timestamp, - }, - }, - } - - lastPoint1 = nil - lastPoint2 = nil - - filteredSeries, _ := Filter(query, newSeries) - if len(filteredSeries.Points) > 0 { - return yield(newSeries) - } - return nil - }) -} - -func getMergeYield(table1, table2 string, ascending bool, yield func(*protocol.Series) error) func(*protocol.Series) error { - name := table1 + "_merge_" + table2 - - return mergeYield(table1, table2, true, ascending, func(s *protocol.Series) error { - oldName := s.Name - s.Name = &name - s.Fields = append(s.Fields, "_orig_series") - for _, p := range s.Points { - p.Values = append(p.Values, &protocol.FieldValue{StringValue: oldName}) - } - return yield(s) - }) -} +import "github.com/influxdb/influxdb/protocol" type seriesMergeState struct { name string @@ -93,16 +21,16 @@ func isLater(first *seriesMergeState, other *seriesMergeState) bool { return *first.series[0].Points[0].Timestamp > *other.series[0].Points[0].Timestamp } -func (self *seriesMergeState) flush(state *mergeState, yield func(*protocol.Series) error) error { +func (self *seriesMergeState) flush(state *CommonMergeEngine) (bool, error) { for _, s := range self.series { s := state.mergeColumnsInSeries(s) - err := yield(s) - if err != nil { - return err + ok, err := state.next.Yield(s) + if !ok || err != nil { + return ok, err } } self.series = nil - return nil + return true, nil } // update the state, the points belong to this seriesMergeState (i.e. the name of the timeseries matches) @@ -124,7 +52,8 @@ func (self *seriesMergeState) updateState(p *protocol.Series) { } } -type mergeState struct { +type CommonMergeEngine struct { + next Processor leftGoFirst func(_, _ *seriesMergeState) bool fields map[string]int left *seriesMergeState @@ -136,7 +65,7 @@ type mergeState struct { // the order of the null values match the order of the field // definitions, i.e. left timeseries first followed by values from the // right timeseries -func (self *mergeState) mergeColumnsInSeries(s *protocol.Series) *protocol.Series { +func (self *CommonMergeEngine) mergeColumnsInSeries(s *protocol.Series) *protocol.Series { if !self.mergeColumns { return s } @@ -168,7 +97,7 @@ func (self *mergeState) mergeColumnsInSeries(s *protocol.Series) *protocol.Serie // the order of the null values match the order of the field // definitions, i.e. left timeseries first followed by values from the // right timeseries -func (self *mergeState) getFields() []string { +func (self *CommonMergeEngine) getFields() []string { fields := make([]string, len(self.fields)) for f, i := range self.fields { fields[i] = f @@ -176,7 +105,7 @@ func (self *mergeState) getFields() []string { return fields } -func (self *mergeState) yieldNextPoints(yield func(*protocol.Series) error) error { +func (self *CommonMergeEngine) yieldNextPoints() (bool, error) { // see which point should be returned next and remove it from the // series for self.left.hasPoints() && self.right.hasPoints() { @@ -188,29 +117,33 @@ func (self *mergeState) yieldNextPoints(yield func(*protocol.Series) error) erro } s := next.removeAndGetFirstPoint() - err := yield(self.mergeColumnsInSeries(s)) - if err != nil { - return err + ok, err := self.next.Yield(self.mergeColumnsInSeries(s)) + if err != nil || !ok { + return ok, err } } - return nil + return true, nil } // if `other` state is done (i.e. we'll receive no more points for its // timeseries) then we know that we won't get any points that are // older than what's in `self` so we can safely flush all `self` // points. -func (self *mergeState) flushIfNecessary(yield func(*protocol.Series) error) error { +func (self *CommonMergeEngine) flushIfNecessary() (bool, error) { if self.left.done && len(self.left.series) == 0 { - self.right.flush(self, yield) + if ok, err := self.right.flush(self); err != nil || !ok { + return ok, err + } } if self.right.done && len(self.right.series) == 0 { - self.left.flush(self, yield) + if ok, err := self.left.flush(self); err != nil || !ok { + return ok, err + } } - return nil + return true, nil } -func (self *mergeState) updateState(p *protocol.Series) { +func (self *CommonMergeEngine) updateState(p *protocol.Series) { self.left.updateState(p) self.right.updateState(p) @@ -249,7 +182,7 @@ func (self *seriesMergeState) removeAndGetFirstPoint() *protocol.Series { // returns a yield function that will sort points from table1 and // table2 no matter what the order in which they are received. -func mergeYield(table1, table2 string, mergeColumns bool, ascending bool, yield func(*protocol.Series) error) func(*protocol.Series) error { +func NewCommonMergeEngine(table1, table2 string, mergeColumns bool, ascending bool, next Processor) *CommonMergeEngine { state1 := &seriesMergeState{ name: table1, } @@ -262,21 +195,31 @@ func mergeYield(table1, table2 string, mergeColumns bool, ascending bool, yield whoGoFirst = isLater } - state := &mergeState{ + return &CommonMergeEngine{ + next: next, left: state1, right: state2, leftGoFirst: whoGoFirst, mergeColumns: mergeColumns, } - - return func(p *protocol.Series) error { - - state.updateState(p) - - if err := state.flushIfNecessary(yield); err != nil { - return err - } - - return state.yieldNextPoints(yield) - } +} + +func (e *CommonMergeEngine) Close() error { + e.Yield(&protocol.Series{Name: &e.left.name, Fields: []string{}}) + e.Yield(&protocol.Series{Name: &e.right.name, Fields: []string{}}) + return e.next.Close() +} + +func (e *CommonMergeEngine) Name() string { + return "CommonMergeEngine" +} + +func (e *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) { + e.updateState(s) + + if ok, err := e.flushIfNecessary(); !ok || err != nil { + return ok, err + } + + return e.yieldNextPoints() } diff --git a/engine/constants.go b/engine/constants.go new file mode 100644 index 0000000000..9d471e5cfb --- /dev/null +++ b/engine/constants.go @@ -0,0 +1,5 @@ +package engine + +import "github.com/influxdb/influxdb/protocol" + +var queryResponse = protocol.Response_QUERY diff --git a/engine/engine.go b/engine/engine.go index 281142c1ea..a2df4be50d 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -1,14 +1,6 @@ package engine import ( - "fmt" - "math" - "strconv" - "strings" - "time" - - log "code.google.com/p/log4go" - "github.com/influxdb/influxdb/common" "github.com/influxdb/influxdb/parser" "github.com/influxdb/influxdb/protocol" ) @@ -24,256 +16,33 @@ type SeriesState struct { lastTimestamp int64 } -type QueryEngine struct { - // query information - query *parser.SelectQuery - isAggregateQuery bool - fields []string - where *parser.WhereCondition - fillWithZero bool - - // was start time set in the query, e.g. time > now() - 1d - startTimeSpecified bool - startTime int64 - endTime int64 - - // output fields - responseChan chan *protocol.Response - limiter *Limiter - seriesToPoints map[string]*protocol.Series - yield func(*protocol.Series) error - aggregateYield func(*protocol.Series) error - - // variables for aggregate queries - aggregators []Aggregator - elems []*parser.Value // group by columns other than time() - duration *time.Duration // the time by duration if any - seriesStates map[string]*SeriesState - - // query statistics - explain bool - runStartTime float64 - runEndTime float64 - pointsRead int64 - pointsWritten int64 - shardId int - shardLocal bool -} - -var ( - endStreamResponse = protocol.Response_END_STREAM - explainQueryResponse = protocol.Response_EXPLAIN_QUERY -) - const ( POINT_BATCH_SIZE = 64 ) -// distribute query and possibly do the merge/join before yielding the points -func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) error { - // see if this is a merge query - fromClause := query.GetFromClause() - if fromClause.Type == parser.FromClauseMerge { - yield = getMergeYield(fromClause.Names[0].Name.Name, fromClause.Names[1].Name.Name, query.Ascending, yield) - } - - if fromClause.Type == parser.FromClauseInnerJoin { - yield = getJoinYield(query, yield) - } - - self.yield = yield - return nil -} - -func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Response) (*QueryEngine, error) { +func NewQueryEngine(next Processor, query *parser.SelectQuery) (Processor, error) { limit := query.Limit - queryEngine := &QueryEngine{ - query: query, - where: query.GetWhereCondition(), - limiter: NewLimiter(limit), - responseChan: responseChan, - seriesToPoints: make(map[string]*protocol.Series), - // stats stuff - explain: query.IsExplainQuery(), - runStartTime: 0, - runEndTime: 0, - pointsRead: 0, - pointsWritten: 0, - shardId: 0, - shardLocal: false, //that really doesn't matter if it is not EXPLAIN query - duration: nil, - seriesStates: make(map[string]*SeriesState), - } - - if queryEngine.explain { - queryEngine.runStartTime = float64(time.Now().UnixNano()) / float64(time.Millisecond) - } - - yield := func(series *protocol.Series) error { - var response *protocol.Response - - queryEngine.limiter.calculateLimitAndSlicePoints(series) - if len(series.Points) == 0 { - return nil - } - if queryEngine.explain { - //TODO: We may not have to send points, just count them - queryEngine.pointsWritten += int64(len(series.Points)) - } - response = &protocol.Response{Type: &queryResponse, Series: series} - responseChan <- response - return nil - } + var engine Processor = NewPassthroughEngineWithLimit(next, 1, limit) var err error if query.HasAggregates() { - err = queryEngine.executeCountQueryWithGroupBy(query, yield) + engine, err = NewAggregatorEngine(query, engine) } else if containsArithmeticOperators(query) { - err = queryEngine.executeArithmeticQuery(query, yield) - } else { - err = queryEngine.distributeQuery(query, yield) + engine, err = NewArithmeticEngine(query, engine) + } + + fromClause := query.GetFromClause() + if fromClause.Type == parser.FromClauseMerge { + engine = NewMergeEngine(fromClause.Names[0].Name.Name, fromClause.Names[1].Name.Name, query.Ascending, engine) + } else if fromClause.Type == parser.FromClauseInnerJoin { + engine = NewJoinEngine(query, engine) } if err != nil { return nil, err } - return queryEngine, nil -} - -// Shard will call this method for EXPLAIN query -func (self *QueryEngine) SetShardInfo(shardId int, shardLocal bool) { - self.shardId = shardId - self.shardLocal = shardLocal -} - -// Returns false if the query should be stopped (either because of limit or error) -func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, point *protocol.Point) (shouldContinue bool) { - shouldContinue = true - series := self.seriesToPoints[*seriesName] - if series == nil { - series = &protocol.Series{Name: protocol.String(*seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, POINT_BATCH_SIZE)} - self.seriesToPoints[*seriesName] = series - } else if len(series.Points) >= POINT_BATCH_SIZE { - shouldContinue = self.yieldSeriesData(series) - series = &protocol.Series{Name: protocol.String(*seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, POINT_BATCH_SIZE)} - self.seriesToPoints[*seriesName] = series - } - series.Points = append(series.Points, point) - - if self.explain { - self.pointsRead++ - } - - return shouldContinue -} - -func (self *QueryEngine) YieldSeries(seriesIncoming *protocol.Series) (shouldContinue bool) { - if self.explain { - self.pointsRead += int64(len(seriesIncoming.Points)) - } - seriesName := seriesIncoming.GetName() - self.seriesToPoints[seriesName] = &protocol.Series{Name: &seriesName, Fields: seriesIncoming.Fields} - return self.yieldSeriesData(seriesIncoming) && !self.limiter.hitLimit(seriesIncoming.GetName()) -} - -func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool { - err := self.yield(series) - if err != nil { - log.Error(err) - return false - } - return true -} - -func (self *QueryEngine) Close() { - for _, series := range self.seriesToPoints { - if len(series.Points) == 0 { - continue - } - self.yieldSeriesData(series) - } - - var err error - for _, series := range self.seriesToPoints { - s := &protocol.Series{ - Name: series.Name, - Fields: series.Fields, - } - err = self.yield(s) - if err != nil { - break - } - } - - // make sure we yield an empty series for series without points - fromClause := self.query.GetFromClause() - if fromClause.Type == parser.FromClauseMerge { - for _, s := range []string{fromClause.Names[0].Name.Name, fromClause.Names[1].Name.Name} { - if _, ok := self.seriesToPoints[s]; ok { - continue - } - - err := self.yield(&protocol.Series{ - Name: &s, - Fields: []string{}, - }) - if err != nil { - log.Error("Error while closing engine: %s", err) - } - } - } - - if self.isAggregateQuery { - self.runAggregates() - } - - if self.explain { - self.runEndTime = float64(time.Now().UnixNano()) / float64(time.Millisecond) - log.Debug("QueryEngine: %.3f R:%d W:%d", self.runEndTime-self.runStartTime, self.pointsRead, self.pointsWritten) - - self.SendQueryStats() - } - response := &protocol.Response{Type: &endStreamResponse} - if err != nil { - message := err.Error() - response.ErrorMessage = &message - } - self.responseChan <- response -} - -func (self *QueryEngine) SendQueryStats() { - timestamp := time.Now().UnixNano() / int64(time.Microsecond) - - runTime := self.runEndTime - self.runStartTime - points := []*protocol.Point{} - pointsRead := self.pointsRead - pointsWritten := self.pointsWritten - shardId := int64(self.shardId) - shardLocal := self.shardLocal - engineName := "QueryEngine" - - point := &protocol.Point{ - Values: []*protocol.FieldValue{ - {StringValue: &engineName}, - {Int64Value: &shardId}, - {BoolValue: &shardLocal}, - {DoubleValue: &runTime}, - {Int64Value: &pointsRead}, - {Int64Value: &pointsWritten}, - }, - Timestamp: ×tamp, - } - points = append(points, point) - - seriesName := "explain query" - series := &protocol.Series{ - Name: &seriesName, - Fields: []string{"engine_name", "shard_id", "shard_local", "run_time", "points_read", "points_written"}, - Points: points, - } - response := &protocol.Response{Type: &explainQueryResponse, Series: series} - self.responseChan <- response + return engine, nil } func containsArithmeticOperators(query *parser.SelectQuery) bool { @@ -285,31 +54,6 @@ func containsArithmeticOperators(query *parser.SelectQuery) bool { return false } -func (self *QueryEngine) getTimestampFromPoint(point *protocol.Point) int64 { - return self.getTimestampBucket(uint64(*point.GetTimestampInMicroseconds())) -} - -func (self *QueryEngine) getTimestampBucket(timestampMicroseconds uint64) int64 { - timestampMicroseconds *= 1000 // convert to nanoseconds - multiplier := uint64(*self.duration) - return int64(timestampMicroseconds / multiplier * multiplier / 1000) -} - -type PointRange struct { - startTime int64 - endTime int64 -} - -func (self *PointRange) UpdateRange(point *protocol.Point) { - timestamp := *point.GetTimestampInMicroseconds() - if timestamp < self.startTime { - self.startTime = timestamp - } - if timestamp > self.endTime { - self.endTime = timestamp - } -} - func crossProduct(values [][][]*protocol.FieldValue) [][]*protocol.FieldValue { if len(values) == 0 { return [][]*protocol.FieldValue{{}} @@ -324,378 +68,3 @@ func crossProduct(values [][][]*protocol.FieldValue) [][]*protocol.FieldValue { } return returnValues } - -func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.SelectQuery, yield func(*protocol.Series) error) error { - self.aggregateYield = yield - duration, err := query.GetGroupByClause().GetGroupByTime() - if err != nil { - return err - } - - self.isAggregateQuery = true - self.duration = duration - self.aggregators = []Aggregator{} - - for _, value := range query.GetColumnNames() { - if !value.IsFunctionCall() { - continue - } - lowerCaseName := strings.ToLower(value.Name) - initializer := registeredAggregators[lowerCaseName] - if initializer == nil { - return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown function %s", value.Name)) - } - aggregator, err := initializer(query, value, query.GetGroupByClause().FillValue) - if err != nil { - return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("%s", err)) - } - self.aggregators = append(self.aggregators, aggregator) - } - - for _, elem := range query.GetGroupByClause().Elems { - if elem.IsFunctionCall() { - continue - } - self.elems = append(self.elems, elem) - } - - self.fillWithZero = query.GetGroupByClause().FillWithZero - - // This is a special case for issue #426. If the start time is - // specified and there's a group by clause and fill with zero, then - // we need to fill the entire range from start time to end time - if query.IsStartTimeSpecified() && self.duration != nil && self.fillWithZero { - self.startTimeSpecified = true - self.startTime = query.GetStartTime().Truncate(*self.duration).UnixNano() / 1000 - self.endTime = query.GetEndTime().Truncate(*self.duration).UnixNano() / 1000 - } - - self.initializeFields() - - err = self.distributeQuery(query, func(series *protocol.Series) error { - if len(series.Points) == 0 { - return nil - } - - return self.aggregateValuesForSeries(series) - }) - - return err -} - -func (self *QueryEngine) initializeFields() { - for _, aggregator := range self.aggregators { - columnNames := aggregator.ColumnNames() - self.fields = append(self.fields, columnNames...) - } - - if self.elems == nil { - return - } - - for _, value := range self.elems { - tempName := value.Name - self.fields = append(self.fields, tempName) - } -} - -var _count = 0 - -func (self *QueryEngine) getSeriesState(name string) *SeriesState { - state := self.seriesStates[name] - if state == nil { - levels := len(self.elems) - if self.duration != nil && self.fillWithZero { - levels++ - } - - state = &SeriesState{ - started: false, - trie: NewTrie(levels, len(self.aggregators)), - lastTimestamp: 0, - pointsRange: &PointRange{math.MaxInt64, math.MinInt64}, - } - self.seriesStates[name] = state - } - return state -} - -// We have three types of queries: -// 1. time() without fill -// 2. time() with fill -// 3. no time() -// -// For (1) we flush as soon as a new bucket start, the prefix tree -// keeps track of the other group by columns without the time -// bucket. We reset the trie once the series is yielded. For (2), we -// keep track of all group by columns with time being the last level -// in the prefix tree. At the end of the query we step through [start -// time, end time] in self.duration steps and get the state from the -// prefix tree, using default values for groups without state in the -// prefix tree. For the last case we keep the groups in the prefix -// tree and on close() we loop through the groups and flush their -// values with a timestamp equal to now() -func (self *QueryEngine) aggregateValuesForSeries(series *protocol.Series) error { - for _, aggregator := range self.aggregators { - if err := aggregator.InitializeFieldsMetadata(series); err != nil { - return err - } - } - - seriesState := self.getSeriesState(series.GetName()) - currentRange := seriesState.pointsRange - - includeTimestampInGroup := self.duration != nil && self.fillWithZero - var group []*protocol.FieldValue - if !includeTimestampInGroup { - group = make([]*protocol.FieldValue, len(self.elems)) - } else { - group = make([]*protocol.FieldValue, len(self.elems)+1) - } - - for _, point := range series.Points { - currentRange.UpdateRange(point) - - // this is a groupby with time() and no fill, flush as soon as we - // start a new bucket - if self.duration != nil && !self.fillWithZero { - timestamp := self.getTimestampFromPoint(point) - // this is the timestamp aggregator - if seriesState.started && seriesState.lastTimestamp != timestamp { - self.runAggregatesForTable(series.GetName()) - } - seriesState.lastTimestamp = timestamp - seriesState.started = true - } - - // get the group this point belongs to - for idx, elem := range self.elems { - // TODO: create an index from fieldname to index - value, err := GetValue(elem, series.Fields, point) - if err != nil { - return err - } - group[idx] = value - } - - // if this is a fill() query, add the timestamp at the end - if includeTimestampInGroup { - timestamp := self.getTimestampFromPoint(point) - group[len(self.elems)] = &protocol.FieldValue{Int64Value: protocol.Int64(timestamp)} - } - - // update the state of the given group - node := seriesState.trie.GetNode(group) - var err error - for idx, aggregator := range self.aggregators { - node.states[idx], err = aggregator.AggregatePoint(node.states[idx], point) - if err != nil { - return err - } - } - } - - return nil -} - -func (self *QueryEngine) runAggregates() { - for t := range self.seriesStates { - self.runAggregatesForTable(t) - } -} - -func (self *QueryEngine) calculateSummariesForTable(table string) { - trie := self.getSeriesState(table).trie - err := trie.Traverse(func(group []*protocol.FieldValue, node *Node) error { - for idx, aggregator := range self.aggregators { - aggregator.CalculateSummaries(node.states[idx]) - } - return nil - }) - if err != nil { - panic("Error while calculating summaries") - } -} - -func (self *QueryEngine) runAggregatesForTable(table string) { - // TODO: if this is a fill query, step through [start,end] in duration - // steps and flush the groups for the given bucket - - self.calculateSummariesForTable(table) - - state := self.getSeriesState(table) - trie := state.trie - points := make([]*protocol.Point, 0, trie.CountLeafNodes()) - f := func(group []*protocol.FieldValue, node *Node) error { - points = append(points, self.getValuesForGroup(table, group, node)...) - return nil - } - - var err error - if self.duration != nil && self.fillWithZero { - timestampRange := state.pointsRange - if self.startTimeSpecified { - timestampRange = &PointRange{startTime: self.startTime, endTime: self.endTime} - } - - // TODO: DRY this - if self.query.Ascending { - bucket := self.getTimestampBucket(uint64(timestampRange.startTime)) - for bucket <= timestampRange.endTime { - timestamp := &protocol.FieldValue{Int64Value: protocol.Int64(bucket)} - defaultChildNode := &Node{states: make([]interface{}, len(self.aggregators))} - err = trie.TraverseLevel(len(self.elems), func(v []*protocol.FieldValue, node *Node) error { - childNode := node.GetChildNode(timestamp) - if childNode == nil { - childNode = defaultChildNode - } - return f(append(v, timestamp), childNode) - }) - bucket += self.duration.Nanoseconds() / 1000 - } - } else { - bucket := self.getTimestampBucket(uint64(timestampRange.endTime)) - for { - timestamp := &protocol.FieldValue{Int64Value: protocol.Int64(bucket)} - defaultChildNode := &Node{states: make([]interface{}, len(self.aggregators))} - err = trie.TraverseLevel(len(self.elems), func(v []*protocol.FieldValue, node *Node) error { - childNode := node.GetChildNode(timestamp) - if childNode == nil { - childNode = defaultChildNode - } - return f(append(v, timestamp), childNode) - }) - if bucket <= timestampRange.startTime { - break - } - bucket -= self.duration.Nanoseconds() / 1000 - } - } - } else { - err = trie.Traverse(f) - } - if err != nil { - panic(err) - } - trie.Clear() - self.aggregateYield(&protocol.Series{ - Name: &table, - Fields: self.fields, - Points: points, - }) -} - -func (self *QueryEngine) getValuesForGroup(table string, group []*protocol.FieldValue, node *Node) []*protocol.Point { - - values := [][][]*protocol.FieldValue{} - - var timestamp int64 - useTimestamp := false - if self.duration != nil && !self.fillWithZero { - // if there's a group by time(), then the timestamp is the lastTimestamp - timestamp = self.getSeriesState(table).lastTimestamp - useTimestamp = true - } else if self.duration != nil && self.fillWithZero { - // if there's no group by time(), but a fill value was specified, - // the timestamp is the last value in the group - timestamp = group[len(group)-1].GetInt64Value() - useTimestamp = true - } - - for idx, aggregator := range self.aggregators { - values = append(values, aggregator.GetValues(node.states[idx])) - node.states[idx] = nil - } - - // do cross product of all the values - var _values [][]*protocol.FieldValue - if len(values) == 1 { - _values = values[0] - } else { - _values = crossProduct(values) - } - - points := []*protocol.Point{} - - for _, v := range _values { - /* groupPoints := []*protocol.Point{} */ - point := &protocol.Point{ - Values: v, - } - - if useTimestamp { - point.SetTimestampInMicroseconds(timestamp) - } else { - point.SetTimestampInMicroseconds(0) - } - - // FIXME: this should be looking at the fields slice not the group by clause - // FIXME: we should check whether the selected columns are in the group by clause - for idx := range self.elems { - point.Values = append(point.Values, group[idx]) - } - - points = append(points, point) - } - return points -} - -func (self *QueryEngine) executeArithmeticQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) error { - - names := map[string]*parser.Value{} - for idx, v := range query.GetColumnNames() { - switch v.Type { - case parser.ValueSimpleName: - names[v.Name] = v - case parser.ValueFunctionCall: - names[v.Name] = v - case parser.ValueExpression: - if v.Alias != "" { - names[v.Alias] = v - } else { - names["expr"+strconv.Itoa(idx)] = v - } - } - } - - return self.distributeQuery(query, func(series *protocol.Series) error { - if len(series.Points) == 0 { - yield(series) - return nil - } - - newSeries := &protocol.Series{ - Name: series.Name, - } - - // create the new column names - for name := range names { - newSeries.Fields = append(newSeries.Fields, name) - } - - for _, point := range series.Points { - newPoint := &protocol.Point{ - Timestamp: point.Timestamp, - SequenceNumber: point.SequenceNumber, - } - for _, field := range newSeries.Fields { - value := names[field] - v, err := GetValue(value, series.Fields, point) - if err != nil { - log.Error("Error in arithmetic computation: %s", err) - return err - } - newPoint.Values = append(newPoint.Values, v) - } - newSeries.Points = append(newSeries.Points, newPoint) - } - - yield(newSeries) - - return nil - }) -} - -func (self *QueryEngine) GetName() string { - return "QueryEngine" -} diff --git a/engine/filtering_engine.go b/engine/filtering_engine.go index bee91105cf..21904ab91d 100644 --- a/engine/filtering_engine.go +++ b/engine/filtering_engine.go @@ -1,54 +1,42 @@ package engine import ( - log "code.google.com/p/log4go" + "fmt" + "github.com/influxdb/influxdb/parser" p "github.com/influxdb/influxdb/protocol" ) type FilteringEngine struct { query *parser.SelectQuery - processor QueryProcessor + processor Processor shouldFilter bool } -func NewFilteringEngine(query *parser.SelectQuery, processor QueryProcessor) *FilteringEngine { +func NewFilteringEngine(query *parser.SelectQuery, processor Processor) *FilteringEngine { shouldFilter := query.GetWhereCondition() != nil return &FilteringEngine{query, processor, shouldFilter} } -// optimize for yield series and use it here -func (self *FilteringEngine) YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool { - return self.YieldSeries(&p.Series{ - Name: seriesName, - Fields: columnNames, - Points: []*p.Point{point}, - }) -} - -func (self *FilteringEngine) YieldSeries(seriesIncoming *p.Series) bool { +func (self *FilteringEngine) Yield(seriesIncoming *p.Series) (bool, error) { if !self.shouldFilter { - return self.processor.YieldSeries(seriesIncoming) + return self.processor.Yield(seriesIncoming) } series, err := Filter(self.query, seriesIncoming) if err != nil { - log.Error("Error while filtering points: %s [query = %s]", err, self.query.GetQueryString()) - return false + return false, fmt.Errorf("Error while filtering points: %s [query = %s]", err, self.query.GetQueryString()) } if len(series.Points) == 0 { - return true + return true, nil } - return self.processor.YieldSeries(series) + return self.processor.Yield(series) } -func (self *FilteringEngine) Close() { - self.processor.Close() +func (self *FilteringEngine) Close() error { + return self.processor.Close() } -func (self *FilteringEngine) SetShardInfo(shardId int, shardLocal bool) { - self.processor.SetShardInfo(shardId, shardLocal) -} -func (self *FilteringEngine) GetName() string { - return self.processor.GetName() +func (self *FilteringEngine) Name() string { + return self.processor.Name() } diff --git a/engine/join_engine.go b/engine/join_engine.go new file mode 100644 index 0000000000..041432362d --- /dev/null +++ b/engine/join_engine.go @@ -0,0 +1,85 @@ +package engine + +import ( + "github.com/influxdb/influxdb/parser" + "github.com/influxdb/influxdb/protocol" +) + +type JoinEngine struct { + query *parser.SelectQuery + next Processor + table1, table2 string + name string // the output table name + lastPoint1, lastPoint2 *protocol.Point + lastFields1, lastFields2 []string +} + +func NewJoinEngine(query *parser.SelectQuery, next Processor) Processor { + table1 := query.GetFromClause().Names[0].GetAlias() + table2 := query.GetFromClause().Names[1].GetAlias() + name := table1 + "_join_" + table2 + + joinEngine := &JoinEngine{ + next: next, + name: name, + table1: table1, + table2: table2, + query: query, + } + mergeEngine := NewCommonMergeEngine(table1, table2, false, query.Ascending, joinEngine) + return mergeEngine +} + +func (_ *JoinEngine) SetShardInfo(shardId int, shardLocal bool) {} + +func (je *JoinEngine) Name() string { + return "JoinEngine" +} + +func (je *JoinEngine) Close() error { + return je.next.Close() +} + +func (je *JoinEngine) Yield(s *protocol.Series) (bool, error) { + if *s.Name == je.table1 { + je.lastPoint1 = s.Points[len(s.Points)-1] + if je.lastFields1 == nil { + for _, f := range s.Fields { + je.lastFields1 = append(je.lastFields1, je.table1+"."+f) + } + } + } + + if *s.Name == je.table2 { + je.lastPoint2 = s.Points[len(s.Points)-1] + if je.lastFields2 == nil { + for _, f := range s.Fields { + je.lastFields2 = append(je.lastFields2, je.table2+"."+f) + } + } + } + + if je.lastPoint1 == nil || je.lastPoint2 == nil { + return true, nil + } + + newSeries := &protocol.Series{ + Name: &je.name, + Fields: append(je.lastFields1, je.lastFields2...), + Points: []*protocol.Point{ + { + Values: append(je.lastPoint1.Values, je.lastPoint2.Values...), + Timestamp: je.lastPoint2.Timestamp, + }, + }, + } + + je.lastPoint1 = nil + je.lastPoint2 = nil + + filteredSeries, _ := Filter(je.query, newSeries) + if len(filteredSeries.Points) > 0 { + return je.next.Yield(newSeries) + } + return true, nil +} diff --git a/engine/list_series_engine.go b/engine/list_series_engine.go deleted file mode 100644 index 798d02c02e..0000000000 --- a/engine/list_series_engine.go +++ /dev/null @@ -1,70 +0,0 @@ -package engine - -import ( - "github.com/influxdb/influxdb/protocol" -) - -const ( - MAX_SERIES_IN_RESPONSE = 10000 -) - -var ( - queryResponse = protocol.Response_QUERY -) - -type ListSeriesEngine struct { - responseChan chan *protocol.Response - response *protocol.Response -} - -func NewListSeriesEngine(responseChan chan *protocol.Response) *ListSeriesEngine { - response := &protocol.Response{ - Type: &queryResponse, - MultiSeries: make([]*protocol.Series, 0), - } - - return &ListSeriesEngine{ - responseChan: responseChan, - response: response, - } -} - -func (self *ListSeriesEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool { - if len(self.response.MultiSeries) > MAX_SERIES_IN_RESPONSE { - self.responseChan <- self.response - self.response = &protocol.Response{ - Type: &queryResponse, - MultiSeries: make([]*protocol.Series, 0), - } - } - self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesName}) - return true -} - -func (self *ListSeriesEngine) YieldSeries(seriesIncoming *protocol.Series) bool { - if len(self.response.MultiSeries) > MAX_SERIES_IN_RESPONSE { - self.responseChan <- self.response - self.response = &protocol.Response{ - Type: &queryResponse, - MultiSeries: make([]*protocol.Series, 0), - } - } - self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesIncoming.Name}) - return true -} - -func (self *ListSeriesEngine) Close() { - if len(self.response.MultiSeries) > 0 { - self.responseChan <- self.response - } - response := &protocol.Response{Type: &endStreamResponse} - self.responseChan <- response -} - -func (self *ListSeriesEngine) SetShardInfo(shardId int, shardLocal bool) { - //EXPLAIN doens't work with this query -} - -func (self *ListSeriesEngine) GetName() string { - return "ListSeriesEngine" -} diff --git a/engine/merge_engine.go b/engine/merge_engine.go new file mode 100644 index 0000000000..813548348d --- /dev/null +++ b/engine/merge_engine.go @@ -0,0 +1,36 @@ +package engine + +import "github.com/influxdb/influxdb/protocol" + +type MergeEngine struct { + name string + next Processor +} + +func NewMergeEngine(table1, table2 string, ascending bool, next Processor) Processor { + name := table1 + "_merge_" + table2 + + me := &MergeEngine{name: name, next: next} + + return NewCommonMergeEngine(table1, table2, true, ascending, me) +} + +func (me *MergeEngine) Yield(s *protocol.Series) (bool, error) { + oldName := s.Name + s.Name = &me.name + s.Fields = append(s.Fields, "_orig_series") + for _, p := range s.Points { + p.Values = append(p.Values, &protocol.FieldValue{StringValue: oldName}) + } + return me.next.Yield(s) +} + +func (_ *MergeEngine) SetShardInfo(shardId int, shardLocal bool) {} + +func (me *MergeEngine) Close() error { + return me.next.Close() +} + +func (me *MergeEngine) Name() string { + return "MergeEngine" +} diff --git a/engine/passthrough_engine.go b/engine/passthrough_engine.go index 438d37d0ad..40685fe6b1 100644 --- a/engine/passthrough_engine.go +++ b/engine/passthrough_engine.go @@ -8,12 +8,11 @@ import ( "github.com/influxdb/influxdb/protocol" ) -type PassthroughEngine struct { - responseChan chan *protocol.Response - response *protocol.Response +type Passthrough struct { + next Processor + series *protocol.Series maxPointsInResponse int limiter *Limiter - responseType *protocol.Response_Type // query statistics runStartTime float64 @@ -24,16 +23,15 @@ type PassthroughEngine struct { shardLocal bool } -func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInResponse int) *PassthroughEngine { - return NewPassthroughEngineWithLimit(responseChan, maxPointsInResponse, 0) +func NewPassthroughEngine(next Processor, maxPointsInResponse int) *Passthrough { + return NewPassthroughEngineWithLimit(next, maxPointsInResponse, 0) } -func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPointsInResponse, limit int) *PassthroughEngine { - passthroughEngine := &PassthroughEngine{ - responseChan: responseChan, +func NewPassthroughEngineWithLimit(next Processor, maxPointsInResponse, limit int) *Passthrough { + passthroughEngine := &Passthrough{ + next: next, maxPointsInResponse: maxPointsInResponse, limiter: NewLimiter(limit), - responseType: &queryResponse, runStartTime: 0, runEndTime: 0, pointsRead: 0, @@ -45,62 +43,44 @@ func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPoin return passthroughEngine } -func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool { - series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames} - return self.YieldSeries(series) -} - -func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool { +func (self *Passthrough) Yield(seriesIncoming *protocol.Series) (bool, error) { log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points)) - if *seriesIncoming.Name == "explain query" { - self.responseType = &explainQueryResponse - log.Debug("Response Changed!") - } else { - self.responseType = &queryResponse - } self.limiter.calculateLimitAndSlicePoints(seriesIncoming) if len(seriesIncoming.Points) == 0 { - log.Debug("Not sent == 0") - return false + return false, nil } - if self.response == nil { - self.response = &protocol.Response{ - Type: self.responseType, - Series: seriesIncoming, + if self.series == nil { + self.series = seriesIncoming + } else if self.series.GetName() != seriesIncoming.GetName() { + ok, err := self.next.Yield(self.series) + if !ok || err != nil { + return ok, err } - } else if self.response.Series.GetName() != seriesIncoming.GetName() { - self.responseChan <- self.response - self.response = &protocol.Response{ - Type: self.responseType, - Series: seriesIncoming, - } - } else if len(self.response.Series.Points) > self.maxPointsInResponse { - self.responseChan <- self.response - self.response = &protocol.Response{ - Type: self.responseType, - Series: seriesIncoming, + self.series = seriesIncoming + } else if len(self.series.Points) > self.maxPointsInResponse { + ok, err := self.next.Yield(self.series) + if !ok || err != nil { + return ok, err } + self.series = seriesIncoming } else { - self.response.Series = common.MergeSeries(self.response.Series, seriesIncoming) + self.series = common.MergeSeries(self.series, seriesIncoming) } - return !self.limiter.hitLimit(seriesIncoming.GetName()) - //return true + return !self.limiter.hitLimit(seriesIncoming.GetName()), nil } -func (self *PassthroughEngine) Close() { - if self.response != nil && self.response.Series != nil && self.response.Series.Name != nil { - self.responseChan <- self.response +func (self *Passthrough) Close() error { + if self.series != nil && self.series.Name != nil { + _, err := self.next.Yield(self.series) + if err != nil { + return err + } } - response := &protocol.Response{Type: &endStreamResponse} - self.responseChan <- response + return self.next.Close() } -func (self *PassthroughEngine) SetShardInfo(shardId int, shardLocal bool) { - //EXPLAIN doens't really work with this query (yet ?) -} - -func (self *PassthroughEngine) GetName() string { +func (self *Passthrough) Name() string { return "PassthroughEngine" } diff --git a/engine/point_range.go b/engine/point_range.go new file mode 100644 index 0000000000..5cb8037d76 --- /dev/null +++ b/engine/point_range.go @@ -0,0 +1,18 @@ +package engine + +import "github.com/influxdb/influxdb/protocol" + +type PointRange struct { + startTime int64 + endTime int64 +} + +func (self *PointRange) UpdateRange(point *protocol.Point) { + timestamp := *point.GetTimestampInMicroseconds() + if timestamp < self.startTime { + self.startTime = timestamp + } + if timestamp > self.endTime { + self.endTime = timestamp + } +} diff --git a/engine/processor.go b/engine/processor.go new file mode 100644 index 0000000000..8dfc06e099 --- /dev/null +++ b/engine/processor.go @@ -0,0 +1,16 @@ +package engine + +import "github.com/influxdb/influxdb/protocol" + +// Passed to a shard (local datastore or whatever) that gets yielded points from series. +type Processor interface { + // (true, nil) if the query should continue. False if processing + // should stop, because of an error in which case error isn't nil or + // because the desired data was read succesfully and no more data is + // needed. + Yield(s *protocol.Series) (bool, error) + Name() string + + // Flush any data that could be in the queue + Close() error +} diff --git a/engine/query_processor.go b/engine/query_processor.go deleted file mode 100644 index 42f0046f0c..0000000000 --- a/engine/query_processor.go +++ /dev/null @@ -1,19 +0,0 @@ -package engine - -import ( - p "github.com/influxdb/influxdb/protocol" -) - -type QueryProcessor interface { - // This method returns true if the query should continue. If the query should be stopped, - // like maybe the limit was hit, it should return false - YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool - YieldSeries(seriesIncoming *p.Series) bool - Close() - - // Set by the shard, so EXPLAIN query can know query against which shard is being measured - SetShardInfo(shardId int, shardLocal bool) - - // Let QueryProcessor identify itself. What if it is a spy and we can't check that? - GetName() string -} diff --git a/integration/data_test.go b/integration/data_test.go index 2e723c66e8..ed09eb7a15 100644 --- a/integration/data_test.go +++ b/integration/data_test.go @@ -1049,9 +1049,7 @@ func (self *DataTestSuite) FilterWithInvalidCondition(c *C) (Fun, Fun) { data := CreatePoints("test_invalid_where_condition", 1, 1) client.WriteData(data, c) }, func(client Client) { - data := client.RunQuery("select * from test_invalid_where_condition where column0 > 0.1s", c, "m") - // TODO: this should return an error - c.Assert(data, HasLen, 0) + client.RunInvalidQuery("select * from test_invalid_where_condition where column0 > 0.1s", c, "m") } } diff --git a/protocol/protocol.proto b/protocol/protocol.proto index 1614f00b2e..104345f2b1 100644 --- a/protocol/protocol.proto +++ b/protocol/protocol.proto @@ -52,9 +52,9 @@ message Response { QUERY = 1; WRITE_OK = 2; END_STREAM = 3; - REPLICATION_REPLAY = 4; - REPLICATION_REPLAY_END = 5; - SEQUENCE_NUMBER = 7; + // REPLICATION_REPLAY = 4; + // REPLICATION_REPLAY_END = 5; + // SEQUENCE_NUMBER = 7; // Access denied also serves as an end of stream response ACCESS_DENIED = 8; HEARTBEAT = 9; @@ -66,10 +66,9 @@ message Response { } required Type type = 1; required uint32 request_id = 2; - optional Series series = 3; + repeated Series multi_series = 3; optional ErrorCode error_code = 4; optional string error_message = 5; optional int64 nextPointTime = 6; optional Request request = 7; - repeated Series multi_series = 8; }