Merge pull request #461 from influxdb/fix-458-continuous-query-sequence-numbers
Fix #458. Correct sequence number calculation during continuous query interpolation.pull/463/head
commit
e671755ad4
|
@ -538,15 +538,17 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
|
|||
fieldIndex := series.GetFieldIndex(fieldName)
|
||||
return point.GetFieldValueAsString(fieldIndex)
|
||||
})
|
||||
cleanedTargetName := strings.Map(replaceInvalidCharacters, targetNameWithValues)
|
||||
|
||||
sanitizedTargetName := strings.Map(replaceInvalidCharacters, targetNameWithValues)
|
||||
|
||||
if assignSequenceNumbers {
|
||||
sequenceMap[sequenceKey{targetName, *point.Timestamp}] += 1
|
||||
sequenceNumber := uint64(sequenceMap[sequenceKey{targetName, *point.Timestamp}])
|
||||
key := sequenceKey{sanitizedTargetName, *point.Timestamp}
|
||||
sequenceMap[key] += 1
|
||||
sequenceNumber := uint64(sequenceMap[key])
|
||||
point.SequenceNumber = &sequenceNumber
|
||||
}
|
||||
|
||||
newSeries := &protocol.Series{Name: &cleanedTargetName, Fields: series.Fields, Points: []*protocol.Point{point}}
|
||||
newSeries := &protocol.Series{Name: &sanitizedTargetName, Fields: series.Fields, Points: []*protocol.Point{point}}
|
||||
if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}); e != nil {
|
||||
log.Error("Couldn't write data for continuous query: ", e)
|
||||
}
|
||||
|
|
|
@ -502,6 +502,13 @@ func (self ServerSuite) RemoveAllContinuousQueries(db string, c *C) {
|
|||
}
|
||||
}
|
||||
|
||||
func (self ServerSuite) AssertContinuousQueryCount(db string, count int, c *C) {
|
||||
client := self.serverProcesses[0].GetClient(db, c)
|
||||
queries, err := client.GetContinuousQueries()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(queries, HasLen, count)
|
||||
}
|
||||
|
||||
func (self *ServerSuite) TestContinuousQueryManagement(c *C) {
|
||||
defer self.RemoveAllContinuousQueries("test_cq", c)
|
||||
|
||||
|
@ -771,6 +778,71 @@ func (self *ServerSuite) TestContinuousQueryInterpolation(c *C) {
|
|||
/* self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 4;", false, c) */
|
||||
}
|
||||
|
||||
func (self *ServerSuite) TestContinuousQuerySequenceNumberAssignmentWithInterpolation(c *C) {
|
||||
defer self.RemoveAllContinuousQueries("test_cq", c)
|
||||
|
||||
currentTime := time.Now()
|
||||
t0 := currentTime.Truncate(10 * time.Second)
|
||||
t1 := time.Unix(t0.Unix()-5, 0).Unix()
|
||||
t2 := time.Unix(t0.Unix()-10, 0).Unix()
|
||||
|
||||
data := fmt.Sprintf(`[
|
||||
{ "name": "points",
|
||||
"columns": ["c1", "c2", "time"],
|
||||
"points": [
|
||||
[1, "a", %d],
|
||||
[2, "a", %d],
|
||||
[3, "a", %d],
|
||||
[7, "b", %d],
|
||||
[8, "b", %d],
|
||||
[9, "b", %d]
|
||||
]}
|
||||
]`, t1, t1, t1, t2, t2, t2)
|
||||
|
||||
self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass", data, c)
|
||||
|
||||
self.serverProcesses[0].QueryAsRoot("test_cq", "select count(c1) from points group by time(5s), c2 into :series_name.count.[c2];", false, c)
|
||||
self.AssertContinuousQueryCount("test_cq", 1, c)
|
||||
self.serverProcesses[0].WaitForServerToSync()
|
||||
time.Sleep(time.Second)
|
||||
|
||||
self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c)
|
||||
|
||||
data = fmt.Sprintf(`[
|
||||
{ "name": "points",
|
||||
"columns": ["c1", "c2", "time"],
|
||||
"points": [
|
||||
[1, "aa", %d],
|
||||
[2, "aa", %d],
|
||||
[3, "aa", %d],
|
||||
[7, "bb", %d],
|
||||
[8, "bb", %d],
|
||||
[9, "bb", %d]
|
||||
]}
|
||||
]`, t1, t1, t1, t2, t2, t2)
|
||||
|
||||
self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass", data, c)
|
||||
|
||||
self.serverProcesses[0].QueryAsRoot("test_cq", "select count(c1) from points group by time(5s), c2 into :series_name.count.[c2];", false, c)
|
||||
self.AssertContinuousQueryCount("test_cq", 1, c)
|
||||
self.serverProcesses[0].WaitForServerToSync()
|
||||
time.Sleep(time.Second)
|
||||
|
||||
collection := self.serverProcesses[0].Query("test_cq", "select * from points;", false, c)
|
||||
series := collection.GetSeries("points", c)
|
||||
c.Assert(series.Points, HasLen, 12)
|
||||
|
||||
collection = self.serverProcesses[0].Query("test_cq", "select * from /points.count.*/;", false, c)
|
||||
c.Assert(collection.Members, HasLen, 4)
|
||||
|
||||
subseries := []string{"a", "aa", "b", "bb"}
|
||||
for i := range subseries {
|
||||
series = collection.GetSeries("points.count."+subseries[i], c)
|
||||
c.Assert(series.Points, HasLen, 1)
|
||||
c.Assert(series.Points[0][1], Equals, float64(1))
|
||||
}
|
||||
}
|
||||
|
||||
func (self *ServerSuite) TestGetServers(c *C) {
|
||||
body := self.serverProcesses[0].Get("/cluster/servers?u=root&p=root", c)
|
||||
|
||||
|
|
Loading…
Reference in New Issue