parent
1335eaba33
commit
e25fe4997a
|
@ -151,6 +151,7 @@
|
|||
- [Issue #78](https://github.com/influxdb/influxdb/issues/78). Sequence numbers persist across restarts so they're not reused
|
||||
- [Issue #102](https://github.com/influxdb/influxdb/issues/102). Support expressions in where condition
|
||||
- [Issue #101](https://github.com/influxdb/influxdb/issues/101). Support expressions in aggregates
|
||||
- [Issue #62](https://github.com/influxdb/influxdb/issues/62). Support updating and deleting column values
|
||||
|
||||
## Bugfixes
|
||||
|
||||
|
|
|
@ -234,20 +234,25 @@ func (self *HttpServer) query(w libhttp.ResponseWriter, r *libhttp.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func removeTimestampFieldDefinition(fields []string) []string {
|
||||
timestampIdx := -1
|
||||
func removeField(fields []string, name string) []string {
|
||||
index := -1
|
||||
for idx, field := range fields {
|
||||
if field == "time" {
|
||||
timestampIdx = idx
|
||||
if field == name {
|
||||
index = idx
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if timestampIdx == -1 {
|
||||
if index == -1 {
|
||||
return fields
|
||||
}
|
||||
|
||||
return append(fields[:timestampIdx], fields[timestampIdx+1:]...)
|
||||
return append(fields[:index], fields[index+1:]...)
|
||||
}
|
||||
|
||||
func removeTimestampFieldDefinition(fields []string) []string {
|
||||
fields = removeField(fields, "time")
|
||||
return removeField(fields, "sequence_number")
|
||||
}
|
||||
|
||||
func convertToDataStoreSeries(s *SerializedSeries, precision TimePrecision) (*protocol.Series, error) {
|
||||
|
@ -259,6 +264,7 @@ func convertToDataStoreSeries(s *SerializedSeries, precision TimePrecision) (*pr
|
|||
for _, point := range s.Points {
|
||||
values := []*protocol.FieldValue{}
|
||||
var timestamp *int64
|
||||
var sequence *uint64
|
||||
|
||||
for idx, field := range s.Columns {
|
||||
value := point[idx]
|
||||
|
@ -281,6 +287,17 @@ func convertToDataStoreSeries(s *SerializedSeries, precision TimePrecision) (*pr
|
|||
}
|
||||
}
|
||||
|
||||
if field == "sequence_number" {
|
||||
switch value.(type) {
|
||||
case float64:
|
||||
_sequenceNumber := uint64(value.(float64))
|
||||
sequence = &_sequenceNumber
|
||||
continue
|
||||
default:
|
||||
return nil, fmt.Errorf("sequence_number field must be float but is %T (%v)", value, value)
|
||||
}
|
||||
}
|
||||
|
||||
switch v := value.(type) {
|
||||
case string:
|
||||
values = append(values, &protocol.FieldValue{StringValue: &v})
|
||||
|
@ -300,8 +317,9 @@ func convertToDataStoreSeries(s *SerializedSeries, precision TimePrecision) (*pr
|
|||
}
|
||||
}
|
||||
points = append(points, &protocol.Point{
|
||||
Values: values,
|
||||
Timestamp: timestamp,
|
||||
Values: values,
|
||||
Timestamp: timestamp,
|
||||
SequenceNumber: sequence,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -281,10 +281,8 @@ func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol.
|
|||
binary.Write(sequenceNumberBuffer, binary.BigEndian, *point.SequenceNumber)
|
||||
pointKey := append(append(id, timestampBuffer.Bytes()...), sequenceNumberBuffer.Bytes()...)
|
||||
|
||||
// TODO: we should remove the column value if timestamp and sequence number
|
||||
// were provided.
|
||||
// Paul: since these are assigned in the coordinator, we'll have to figure out how to represent this.
|
||||
if point.Values[fieldIndex] == nil {
|
||||
wb.Delete(pointKey)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
@ -683,6 +684,57 @@ func (self *IntegrationSuite) TestAggregateWithExpression(c *C) {
|
|||
c.Assert(data[0].Points[0][1], Equals, 3.0)
|
||||
}
|
||||
|
||||
func (self *IntegrationSuite) verifyWrite(series string, value, sequence interface{}, c *C) interface{} {
|
||||
valueString := "null"
|
||||
if value != nil {
|
||||
valueString = strconv.Itoa(int(value.(float64)))
|
||||
}
|
||||
|
||||
columns := `["time", "a"]`
|
||||
points := fmt.Sprintf(`[[1386299093602, %s]]`, valueString)
|
||||
if sequence != nil {
|
||||
columns = `["time", "sequence_number", "a"]`
|
||||
points = fmt.Sprintf(`[[1386299093602, %.0f, %s]]`, sequence, valueString)
|
||||
}
|
||||
|
||||
payload := fmt.Sprintf(`
|
||||
[
|
||||
{
|
||||
"name": "%s",
|
||||
"columns": %s,
|
||||
"points": %s
|
||||
}
|
||||
]`, series, columns, points)
|
||||
err := self.server.WriteData(payload)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
bs, err := self.server.RunQuery("select * from " + series)
|
||||
c.Assert(err, IsNil)
|
||||
data := []*h.SerializedSeries{}
|
||||
err = json.Unmarshal(bs, &data)
|
||||
c.Assert(data, HasLen, 1)
|
||||
c.Assert(data[0].Columns, HasLen, 3)
|
||||
|
||||
if value != nil {
|
||||
c.Assert(data[0].Points, HasLen, 1)
|
||||
p := toMap(data[0])
|
||||
c.Assert(p[0]["a"], Equals, value)
|
||||
return p[0]["sequence_number"]
|
||||
}
|
||||
c.Assert(data[0].Points, HasLen, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *IntegrationSuite) TestUpdatePoint(c *C) {
|
||||
sequence := self.verifyWrite("test_updating_point", 1.0, nil, c)
|
||||
self.verifyWrite("test_updating_point", 2.0, sequence, c)
|
||||
}
|
||||
|
||||
func (self *IntegrationSuite) TestDeletePoint(c *C) {
|
||||
sequence := self.verifyWrite("test_deleting_point", 1.0, nil, c)
|
||||
self.verifyWrite("test_deleting_point", nil, sequence, c)
|
||||
}
|
||||
|
||||
// test for issue #41
|
||||
func (self *IntegrationSuite) TestDbDelete(c *C) {
|
||||
err := self.server.WriteData(`
|
||||
|
|
Loading…
Reference in New Issue