diff --git a/CHANGELOG.md b/CHANGELOG.md index 36b7d06b07..4e60c81d76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - [Issue #538](https://github.com/influxdb/influxdb/issues/538). Don't panic if the same series existed twice in the request with different columns - [Issue #536](https://github.com/influxdb/influxdb/issues/536). Joining the cluster after shards are creating shouldn't cause new nodes to panic - [Issue #539](https://github.com/influxdb/influxdb/issues/539). count(distinct()) with fill shouldn't panic on empty groups +- [Issue #534](https://github.com/influxdb/influxdb/issues/534). Create a new series when interpolating ## v0.6.2 [2014-05-09] diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 4f3d0ac6bc..da24c493fa 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -536,11 +536,17 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, fieldsInTargetName[i] = f } + fields := make([]string, 0, len(series.Fields)-len(fieldsIndeces)) + // remove the fields used in the target name from the series fields - for i, fi := range fieldsIndeces { - // move the fields to the left - copy(series.Fields[fi-i:], series.Fields[fi-i+1:]) - series.Fields = series.Fields[:len(series.Fields)-1] +nextfield: + for i, f := range series.Fields { + for _, fi := range fieldsIndeces { + if fi == i { + continue nextfield + } + } + fields = append(fields, f) } if r.MatchString(targetName) { @@ -553,27 +559,37 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, return value }) + p := &protocol.Point{ + Values: make([]*protocol.FieldValue, 0, len(point.Values)-len(fieldsIndeces)), + Timestamp: point.Timestamp, + SequenceNumber: point.SequenceNumber, + } + // remove the fields used in the target name from the series fields - for i, fi := range fieldsIndeces { - // move the fields to the left - copy(point.Values[fi-i:], point.Values[fi-i+1:]) - point.Values = point.Values[:len(point.Values)-1] + nextvalue: + for i, v := range point.Values { + for _, fi := range fieldsIndeces { + if fi == i { + continue nextvalue + } + } + p.Values = append(p.Values, v) } if assignSequenceNumbers { - key := sequenceKey{targetNameWithValues, *point.Timestamp} + key := sequenceKey{targetNameWithValues, *p.Timestamp} sequenceMap[key] += 1 sequenceNumber := uint64(sequenceMap[key]) - point.SequenceNumber = &sequenceNumber + p.SequenceNumber = &sequenceNumber } newSeries := serieses[targetNameWithValues] if newSeries == nil { - newSeries = &protocol.Series{Name: &targetNameWithValues, Fields: series.Fields, Points: []*protocol.Point{point}} + newSeries = &protocol.Series{Name: &targetNameWithValues, Fields: fields, Points: []*protocol.Point{p}} serieses[targetNameWithValues] = newSeries continue } - newSeries.Points = append(newSeries.Points, point) + newSeries.Points = append(newSeries.Points, p) } seriesSlice := make([]*protocol.Series, 0, len(serieses)) for _, s := range serieses { @@ -583,7 +599,7 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, log.Error("Couldn't write data for continuous query: ", e) } } else { - newSeries := &protocol.Series{Name: &targetName, Fields: series.Fields, Points: series.Points} + newSeries := &protocol.Series{Name: &targetName, Fields: fields, Points: series.Points} if assignSequenceNumbers { for _, point := range newSeries.Points { diff --git a/src/integration/multiple_servers_test.go b/src/integration/multiple_servers_test.go index a6eaf5a1c1..c33cc2be05 100644 --- a/src/integration/multiple_servers_test.go +++ b/src/integration/multiple_servers_test.go @@ -568,6 +568,7 @@ func (self *ServerSuite) TestContinuousQueryFanoutOperations(c *C) { self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s1 into d1;", false, c) self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s2 into d2;", false, c) + self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s1 into d3.[c1];", false, c) self.serverProcesses[0].QueryAsRoot("test_cq", "select * from /s\\d/ into d3;", false, c) self.serverProcesses[0].QueryAsRoot("test_cq", "select * from silly_name into :series_name.foo;", false, c) defer self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c) @@ -577,7 +578,7 @@ func (self *ServerSuite) TestContinuousQueryFanoutOperations(c *C) { self.serverProcesses[0].WaitForServerToSync() collection := self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c) series := collection.GetSeries("continuous queries", c) - c.Assert(series.Points, HasLen, 4) + c.Assert(series.Points, HasLen, 5) data := `[ {"name": "s1", "columns": ["c1", "c2"], "points": [[1, "a"], [2, "b"]]}, @@ -622,6 +623,16 @@ func (self *ServerSuite) TestContinuousQueryFanoutOperations(c *C) { series = collection.GetSeries("silly_name.foo", c) c.Assert(series.GetValueForPointAndColumn(0, "c4", c), Equals, 4.0) c.Assert(series.GetValueForPointAndColumn(0, "c5", c), Equals, 5.0) + + for i, v := range map[int]string{1: "a", 2: "b"} { + query := fmt.Sprintf("select * from d3.%d", i) + collection = self.serverProcesses[0].Query("test_cq", query, false, c) + series = collection.GetSeries(fmt.Sprintf("d3.%d", i), c) + c.Assert(collection.Members, HasLen, 1) + // time, sequence number and c2 + c.Assert(collection.Members[0].Columns, HasLen, 3) + c.Assert(series.GetValueForPointAndColumn(0, "c2", c), Equals, v) + } } func (self *ServerSuite) TestContinuousQueryGroupByOperations(c *C) {