Fix #250. Continuous queries now get auto-incrementing sequence numbers assigned per timestamp.

pull/269/head
Todd Persen 2014-02-21 16:24:47 -05:00
parent 370140335f
commit 51f03d98e7
3 changed files with 50 additions and 5 deletions

View File

@ -374,12 +374,22 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series
}
series.Points = series.Points[lastPointIndex:]
err := self.write(db, series, shardToWrite)
if err != nil {
log.Error("COORD error writing: ", err)
if len(series.Points) > 0 {
if shardToWrite == nil {
shardToWrite, _ = self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, *series.Name, *series.Points[0].Timestamp)
}
err := self.write(db, series, shardToWrite)
if err != nil {
log.Error("COORD error writing: ", err)
}
return err
}
return err
return nil
}
func (self *CoordinatorImpl) write(db string, series *protocol.Series, shard cluster.Shard) error {

View File

@ -447,16 +447,18 @@ func (s *RaftServer) checkContinuousQueries() {
}
func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, start time.Time, end time.Time) {
sequenceMap := make(map[int64]int)
clusterAdmin := s.clusterConfig.GetClusterAdmin("root")
intoClause := query.GetIntoClause()
targetName := intoClause.Target.Name
sequenceNumber := uint64(1)
queryString := query.GetQueryStringForContinuousQuery(start, end)
f := func(series *protocol.Series) error {
interpolatedTargetName := strings.Replace(targetName, ":series_name", *series.Name, -1)
series.Name = &interpolatedTargetName
for _, point := range series.Points {
sequenceMap[*point.Timestamp] = sequenceMap[*point.Timestamp] + 1
sequenceNumber := uint64(sequenceMap[*point.Timestamp])
point.SequenceNumber = &sequenceNumber
}

View File

@ -1121,3 +1121,36 @@ func dirExists(path string) (bool, error) {
}
return false, err
}
func (self *ServerSuite) TestContinuousQueryWithMixedGroupByOperations(c *C) {
data := fmt.Sprintf(`[
{
"name": "cqtest",
"columns": ["time", "reqtime", "url"],
"points": [
[0, 8.0, "/login"],
[0, 3.0, "/list"],
[0, 4.0, "/register"],
[5, 9.0, "/login"],
[5, 4.0, "/list"],
[5, 5.0, "/register"]
]
}
]`)
self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass&time_precision=s", data, c)
time.Sleep(time.Second)
self.serverProcesses[0].QueryAsRoot("test_cq", "select mean(reqtime), url from cqtest group by time(10s), url into cqtest.10s", false, c)
collection := self.serverProcesses[0].QueryAsRoot("test_cq", "select * from cqtest.10s", false, c)
series := collection.GetSeries("cqtest.10s", c)
c.Assert(series.GetValueForPointAndColumn(0, "mean", c), Equals, float64(8.5))
c.Assert(series.GetValueForPointAndColumn(0, "url", c), Equals, "/login")
c.Assert(series.GetValueForPointAndColumn(1, "mean", c), Equals, float64(3.5))
c.Assert(series.GetValueForPointAndColumn(1, "url", c), Equals, "/list")
c.Assert(series.GetValueForPointAndColumn(2, "mean", c), Equals, float64(4.5))
c.Assert(series.GetValueForPointAndColumn(2, "url", c), Equals, "/register")
}