From 51f03d98e711fcd1e3c84544385fc867fa75ca31 Mon Sep 17 00:00:00 2001 From: Todd Persen Date: Fri, 21 Feb 2014 16:24:47 -0500 Subject: [PATCH] Fix #250. Continuous queries now get auto-incrementing sequence numbers assigned per timestamp. --- src/coordinator/coordinator.go | 18 ++++++++++++++---- src/coordinator/raft_server.go | 4 +++- src/integration/server_test.go | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 17949507e4..86ab521ca1 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -374,12 +374,22 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series } series.Points = series.Points[lastPointIndex:] - err := self.write(db, series, shardToWrite) - if err != nil { - log.Error("COORD error writing: ", err) + + if len(series.Points) > 0 { + if shardToWrite == nil { + shardToWrite, _ = self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, *series.Name, *series.Points[0].Timestamp) + } + + err := self.write(db, series, shardToWrite) + + if err != nil { + log.Error("COORD error writing: ", err) + } + + return err } - return err + return nil } func (self *CoordinatorImpl) write(db string, series *protocol.Series, shard cluster.Shard) error { diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index a34581f114..2a6cedf874 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -447,16 +447,18 @@ func (s *RaftServer) checkContinuousQueries() { } func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, start time.Time, end time.Time) { + sequenceMap := make(map[int64]int) clusterAdmin := s.clusterConfig.GetClusterAdmin("root") intoClause := query.GetIntoClause() targetName := intoClause.Target.Name - sequenceNumber := uint64(1) queryString := query.GetQueryStringForContinuousQuery(start, end) f := func(series *protocol.Series) error { interpolatedTargetName := strings.Replace(targetName, ":series_name", *series.Name, -1) series.Name = &interpolatedTargetName for _, point := range series.Points { + sequenceMap[*point.Timestamp] = sequenceMap[*point.Timestamp] + 1 + sequenceNumber := uint64(sequenceMap[*point.Timestamp]) point.SequenceNumber = &sequenceNumber } diff --git a/src/integration/server_test.go b/src/integration/server_test.go index ba70097fa6..929aa17aae 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -1121,3 +1121,36 @@ func dirExists(path string) (bool, error) { } return false, err } + +func (self *ServerSuite) TestContinuousQueryWithMixedGroupByOperations(c *C) { + data := fmt.Sprintf(`[ + { + "name": "cqtest", + "columns": ["time", "reqtime", "url"], + "points": [ + [0, 8.0, "/login"], + [0, 3.0, "/list"], + [0, 4.0, "/register"], + [5, 9.0, "/login"], + [5, 4.0, "/list"], + [5, 5.0, "/register"] + ] + } + ]`) + + self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass&time_precision=s", data, c) + + time.Sleep(time.Second) + + self.serverProcesses[0].QueryAsRoot("test_cq", "select mean(reqtime), url from cqtest group by time(10s), url into cqtest.10s", false, c) + + collection := self.serverProcesses[0].QueryAsRoot("test_cq", "select * from cqtest.10s", false, c) + series := collection.GetSeries("cqtest.10s", c) + + c.Assert(series.GetValueForPointAndColumn(0, "mean", c), Equals, float64(8.5)) + c.Assert(series.GetValueForPointAndColumn(0, "url", c), Equals, "/login") + c.Assert(series.GetValueForPointAndColumn(1, "mean", c), Equals, float64(3.5)) + c.Assert(series.GetValueForPointAndColumn(1, "url", c), Equals, "/list") + c.Assert(series.GetValueForPointAndColumn(2, "mean", c), Equals, float64(4.5)) + c.Assert(series.GetValueForPointAndColumn(2, "url", c), Equals, "/register") +}