diff --git a/CHANGELOG.md b/CHANGELOG.md index 81161514e4..1d7e88bd98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -151,6 +151,7 @@ - [Issue #78](https://github.com/influxdb/influxdb/issues/78). Sequence numbers persist across restarts so they're not reused - [Issue #102](https://github.com/influxdb/influxdb/issues/102). Support expressions in where condition - [Issue #101](https://github.com/influxdb/influxdb/issues/101). Support expressions in aggregates +- [Issue #62](https://github.com/influxdb/influxdb/issues/62). Support updating and deleting column values ## Bugfixes diff --git a/src/api/http/api.go b/src/api/http/api.go index d035e5dcf1..e6cc2016ff 100644 --- a/src/api/http/api.go +++ b/src/api/http/api.go @@ -234,20 +234,25 @@ func (self *HttpServer) query(w libhttp.ResponseWriter, r *libhttp.Request) { } } -func removeTimestampFieldDefinition(fields []string) []string { - timestampIdx := -1 +func removeField(fields []string, name string) []string { + index := -1 for idx, field := range fields { - if field == "time" { - timestampIdx = idx + if field == name { + index = idx break } } - if timestampIdx == -1 { + if index == -1 { return fields } - return append(fields[:timestampIdx], fields[timestampIdx+1:]...) + return append(fields[:index], fields[index+1:]...) +} + +func removeTimestampFieldDefinition(fields []string) []string { + fields = removeField(fields, "time") + return removeField(fields, "sequence_number") } func convertToDataStoreSeries(s *SerializedSeries, precision TimePrecision) (*protocol.Series, error) { @@ -259,6 +264,7 @@ func convertToDataStoreSeries(s *SerializedSeries, precision TimePrecision) (*pr for _, point := range s.Points { values := []*protocol.FieldValue{} var timestamp *int64 + var sequence *uint64 for idx, field := range s.Columns { value := point[idx] @@ -281,6 +287,17 @@ func convertToDataStoreSeries(s *SerializedSeries, precision TimePrecision) (*pr } } + if field == "sequence_number" { + switch value.(type) { + case float64: + _sequenceNumber := uint64(value.(float64)) + sequence = &_sequenceNumber + continue + default: + return nil, fmt.Errorf("sequence_number field must be float but is %T (%v)", value, value) + } + } + switch v := value.(type) { case string: values = append(values, &protocol.FieldValue{StringValue: &v}) @@ -300,8 +317,9 @@ func convertToDataStoreSeries(s *SerializedSeries, precision TimePrecision) (*pr } } points = append(points, &protocol.Point{ - Values: values, - Timestamp: timestamp, + Values: values, + Timestamp: timestamp, + SequenceNumber: sequence, }) } diff --git a/src/datastore/leveldb_datastore.go b/src/datastore/leveldb_datastore.go index 97f6b98c6e..d3c4dba1a3 100644 --- a/src/datastore/leveldb_datastore.go +++ b/src/datastore/leveldb_datastore.go @@ -281,10 +281,8 @@ func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol. binary.Write(sequenceNumberBuffer, binary.BigEndian, *point.SequenceNumber) pointKey := append(append(id, timestampBuffer.Bytes()...), sequenceNumberBuffer.Bytes()...) - // TODO: we should remove the column value if timestamp and sequence number - // were provided. - // Paul: since these are assigned in the coordinator, we'll have to figure out how to represent this. if point.Values[fieldIndex] == nil { + wb.Delete(pointKey) continue } diff --git a/src/integration/benchmark_test.go b/src/integration/benchmark_test.go index 48fb1c11ba..6c3d14ff80 100644 --- a/src/integration/benchmark_test.go +++ b/src/integration/benchmark_test.go @@ -14,6 +14,7 @@ import ( "net/url" "os" "path/filepath" + "strconv" "strings" "syscall" "testing" @@ -683,6 +684,57 @@ func (self *IntegrationSuite) TestAggregateWithExpression(c *C) { c.Assert(data[0].Points[0][1], Equals, 3.0) } +func (self *IntegrationSuite) verifyWrite(series string, value, sequence interface{}, c *C) interface{} { + valueString := "null" + if value != nil { + valueString = strconv.Itoa(int(value.(float64))) + } + + columns := `["time", "a"]` + points := fmt.Sprintf(`[[1386299093602, %s]]`, valueString) + if sequence != nil { + columns = `["time", "sequence_number", "a"]` + points = fmt.Sprintf(`[[1386299093602, %.0f, %s]]`, sequence, valueString) + } + + payload := fmt.Sprintf(` +[ + { + "name": "%s", + "columns": %s, + "points": %s + } +]`, series, columns, points) + err := self.server.WriteData(payload) + c.Assert(err, IsNil) + + bs, err := self.server.RunQuery("select * from " + series) + c.Assert(err, IsNil) + data := []*h.SerializedSeries{} + err = json.Unmarshal(bs, &data) + c.Assert(data, HasLen, 1) + c.Assert(data[0].Columns, HasLen, 3) + + if value != nil { + c.Assert(data[0].Points, HasLen, 1) + p := toMap(data[0]) + c.Assert(p[0]["a"], Equals, value) + return p[0]["sequence_number"] + } + c.Assert(data[0].Points, HasLen, 0) + return nil +} + +func (self *IntegrationSuite) TestUpdatePoint(c *C) { + sequence := self.verifyWrite("test_updating_point", 1.0, nil, c) + self.verifyWrite("test_updating_point", 2.0, sequence, c) +} + +func (self *IntegrationSuite) TestDeletePoint(c *C) { + sequence := self.verifyWrite("test_deleting_point", 1.0, nil, c) + self.verifyWrite("test_deleting_point", nil, sequence, c) +} + // test for issue #41 func (self *IntegrationSuite) TestDbDelete(c *C) { err := self.server.WriteData(`