Fix #534. Create a new series when interpolating

pull/546/head v0.6.3
John Shahid 2014-05-13 18:18:25 -04:00
parent 2d11e2271c
commit 2e0f6c1700
3 changed files with 42 additions and 14 deletions

@@ -13,6 +13,7 @@
- [Issue #538](https://github.com/influxdb/influxdb/issues/538). Don't panic if the same series existed twice in the request with different columns
- [Issue #536](https://github.com/influxdb/influxdb/issues/536). Joining the cluster after shards are created shouldn't cause new nodes to panic
- [Issue #539](https://github.com/influxdb/influxdb/issues/539). count(distinct()) with fill shouldn't panic on empty groups
- [Issue #534](https://github.com/influxdb/influxdb/issues/534). Create a new series when interpolating
## v0.6.2 [2014-05-09]

@@ -536,11 +536,17 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
fieldsInTargetName[i] = f
}
fields := make([]string, 0, len(series.Fields)-len(fieldsIndeces))
// remove the fields used in the target name from the series fields
for i, fi := range fieldsIndeces {
// move the fields to the left
copy(series.Fields[fi-i:], series.Fields[fi-i+1:])
series.Fields = series.Fields[:len(series.Fields)-1]
nextfield:
for i, f := range series.Fields {
for _, fi := range fieldsIndeces {
if fi == i {
continue nextfield
}
}
fields = append(fields, f)
}
if r.MatchString(targetName) {
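The rewritten loop filters into a fresh `fields` slice instead of shifting entries out of `series.Fields` with `copy`, so the source series is left intact for anything else that still reads it. A minimal, standalone sketch of the same labeled-continue pattern (the function and parameter names here are illustrative, not the coordinator's):

```go
package main

import "fmt"

// filterFields returns a new slice containing every field whose index is not
// in skip, leaving the input slice untouched. The labeled continue mirrors
// the "continue nextfield" loop in the hunk above.
func filterFields(fields []string, skip []int) []string {
	kept := make([]string, 0, len(fields)-len(skip))
nextfield:
	for i, f := range fields {
		for _, s := range skip {
			if s == i {
				// this column was interpolated into the target name; drop it
				continue nextfield
			}
		}
		kept = append(kept, f)
	}
	return kept
}

func main() {
	fields := []string{"c1", "c2", "c3"}
	fmt.Println(filterFields(fields, []int{0})) // [c2 c3]
	fmt.Println(fields)                         // [c1 c2 c3] -- left intact
}
```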
@@ -553,27 +559,37 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
return value
})
p := &protocol.Point{
Values: make([]*protocol.FieldValue, 0, len(point.Values)-len(fieldsIndeces)),
Timestamp: point.Timestamp,
SequenceNumber: point.SequenceNumber,
}
// remove the fields used in the target name from the series fields
for i, fi := range fieldsIndeces {
// move the fields to the left
copy(point.Values[fi-i:], point.Values[fi-i+1:])
point.Values = point.Values[:len(point.Values)-1]
nextvalue:
for i, v := range point.Values {
for _, fi := range fieldsIndeces {
if fi == i {
continue nextvalue
}
}
p.Values = append(p.Values, v)
}
if assignSequenceNumbers {
key := sequenceKey{targetNameWithValues, *point.Timestamp}
key := sequenceKey{targetNameWithValues, *p.Timestamp}
sequenceMap[key] += 1
sequenceNumber := uint64(sequenceMap[key])
point.SequenceNumber = &sequenceNumber
p.SequenceNumber = &sequenceNumber
}
newSeries := serieses[targetNameWithValues]
if newSeries == nil {
newSeries = &protocol.Series{Name: &targetNameWithValues, Fields: series.Fields, Points: []*protocol.Point{point}}
newSeries = &protocol.Series{Name: &targetNameWithValues, Fields: fields, Points: []*protocol.Point{p}}
serieses[targetNameWithValues] = newSeries
continue
}
newSeries.Points = append(newSeries.Points, point)
newSeries.Points = append(newSeries.Points, p)
}
seriesSlice := make([]*protocol.Series, 0, len(serieses))
for _, s := range serieses {
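The same copy-instead-of-mutate change applies to the points: each source point is copied into a new protocol.Point with the interpolated values dropped, the sequence counter is keyed by the interpolated target name and the copy's timestamp (and the number is stored on `p`, not on the source point), and the copies are grouped into per-target series in the `serieses` map. A self-contained sketch of that grouping, using simplified stand-in types rather than the generated protocol structs:

```go
package main

import "fmt"

// Simplified stand-ins for the protocol types; only the fields the diff
// touches (Values/Timestamp/SequenceNumber and Name/Fields/Points) are kept.
type point struct {
	Values         []string
	Timestamp      int64
	SequenceNumber uint64
}

type series struct {
	Name   string
	Fields []string
	Points []*point
}

// sequenceKey plays the same role as in the diff: one counter per
// (interpolated target name, timestamp) pair.
type sequenceKey struct {
	name      string
	timestamp int64
}

func main() {
	serieses := map[string]*series{}
	sequenceMap := map[sequenceKey]int{}

	// write assigns the next sequence number for the target/timestamp pair
	// and appends the point to that target's series, creating it if needed.
	write := func(target string, fields []string, p *point) {
		key := sequenceKey{target, p.Timestamp}
		sequenceMap[key]++
		p.SequenceNumber = uint64(sequenceMap[key])

		s := serieses[target]
		if s == nil {
			serieses[target] = &series{Name: target, Fields: fields, Points: []*point{p}}
			return
		}
		s.Points = append(s.Points, p)
	}

	write("d3.1", []string{"c2"}, &point{Values: []string{"a"}, Timestamp: 1})
	write("d3.2", []string{"c2"}, &point{Values: []string{"b"}, Timestamp: 1})

	for name, s := range serieses {
		fmt.Println(name, s.Fields, len(s.Points))
	}
}
```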
@@ -583,7 +599,7 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
log.Error("Couldn't write data for continuous query: ", e)
}
} else {
newSeries := &protocol.Series{Name: &targetName, Fields: series.Fields, Points: series.Points}
newSeries := &protocol.Series{Name: &targetName, Fields: fields, Points: series.Points}
if assignSequenceNumbers {
for _, point := range newSeries.Points {

@@ -568,6 +568,7 @@ func (self *ServerSuite) TestContinuousQueryFanoutOperations(c *C) {
self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s1 into d1;", false, c)
self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s2 into d2;", false, c)
self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s1 into d3.[c1];", false, c)
self.serverProcesses[0].QueryAsRoot("test_cq", "select * from /s\\d/ into d3;", false, c)
self.serverProcesses[0].QueryAsRoot("test_cq", "select * from silly_name into :series_name.foo;", false, c)
defer self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c)
@@ -577,7 +578,7 @@ func (self *ServerSuite) TestContinuousQueryFanoutOperations(c *C) {
self.serverProcesses[0].WaitForServerToSync()
collection := self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c)
series := collection.GetSeries("continuous queries", c)
c.Assert(series.Points, HasLen, 4)
c.Assert(series.Points, HasLen, 5)
data := `[
{"name": "s1", "columns": ["c1", "c2"], "points": [[1, "a"], [2, "b"]]},
@@ -622,6 +623,16 @@ func (self *ServerSuite) TestContinuousQueryFanoutOperations(c *C) {
series = collection.GetSeries("silly_name.foo", c)
c.Assert(series.GetValueForPointAndColumn(0, "c4", c), Equals, 4.0)
c.Assert(series.GetValueForPointAndColumn(0, "c5", c), Equals, 5.0)
for i, v := range map[int]string{1: "a", 2: "b"} {
query := fmt.Sprintf("select * from d3.%d", i)
collection = self.serverProcesses[0].Query("test_cq", query, false, c)
series = collection.GetSeries(fmt.Sprintf("d3.%d", i), c)
c.Assert(collection.Members, HasLen, 1)
// time, sequence number and c2
c.Assert(collection.Members[0].Columns, HasLen, 3)
c.Assert(series.GetValueForPointAndColumn(0, "c2", c), Equals, v)
}
}
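The new assertions check that `select * from s1 into d3.[c1]` fans the two s1 points out into series d3.1 and d3.2, with c1 consumed by the target name and only time, sequence_number, and c2 left on the written points. A rough sketch of the name substitution the test relies on, assuming a simple `[column]` token syntax (the coordinator's actual regular expression is built outside the hunks shown above):

```go
package main

import (
	"fmt"
	"regexp"
)

// interpolate substitutes each [column] token in the target name with that
// point's value for the column. The coordinator additionally records which
// column indices were consumed so they can be dropped from the written series.
func interpolate(target string, columns, values []string) string {
	r := regexp.MustCompile(`\[.*?\]`)
	return r.ReplaceAllStringFunc(target, func(match string) string {
		name := match[1 : len(match)-1] // strip the surrounding brackets
		for i, c := range columns {
			if c == name {
				return values[i]
			}
		}
		return match // unknown column: leave the token alone
	})
}

func main() {
	fmt.Println(interpolate("d3.[c1]", []string{"c1", "c2"}, []string{"1", "a"})) // d3.1
	fmt.Println(interpolate("d3.[c1]", []string{"c1", "c2"}, []string{"2", "b"})) // d3.2
}
```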
func (self *ServerSuite) TestContinuousQueryGroupByOperations(c *C) {