Convert Point.Fields to Point.Fields()
parent
f8d599cda9
commit
528f47e093
|
@ -268,7 +268,7 @@ func TestUnmarshal_Points(t *testing.T) {
|
|||
t.Errorf("point name mismatch. expected %q, got %q", name, m.Name())
|
||||
}
|
||||
// test value
|
||||
mv := m.Fields["value"].(float64)
|
||||
mv := m.Fields()["value"].(float64)
|
||||
pv := test.packet.Values[i].Value
|
||||
if mv != pv {
|
||||
t.Errorf("point value mismatch. expected %v, got %v", pv, mv)
|
||||
|
|
14
data/rpc.go
14
data/rpc.go
|
@ -72,7 +72,7 @@ func (w *WriteShardRequest) marshalPoints(points []tsdb.Point) []*internal.Point
|
|||
pts := make([]*internal.Point, len(points))
|
||||
for i, p := range points {
|
||||
fields := []*internal.Field{}
|
||||
for k, v := range p.Fields {
|
||||
for k, v := range p.Fields() {
|
||||
name := k
|
||||
f := &internal.Field{
|
||||
Name: &name,
|
||||
|
@ -135,17 +135,17 @@ func (w *WriteShardRequest) unmarhalPoints() []tsdb.Point {
|
|||
for _, f := range p.GetFields() {
|
||||
n := f.GetName()
|
||||
if f.Int32 != nil {
|
||||
pt.Fields[n] = f.GetInt32()
|
||||
pt.AddField(n, f.GetInt32())
|
||||
} else if f.Int64 != nil {
|
||||
pt.Fields[n] = f.GetInt64()
|
||||
pt.AddField(n, f.GetInt64())
|
||||
} else if f.Float64 != nil {
|
||||
pt.Fields[n] = f.GetFloat64()
|
||||
pt.AddField(n, f.GetFloat64())
|
||||
} else if f.Bool != nil {
|
||||
pt.Fields[n] = f.GetBool()
|
||||
pt.AddField(n, f.GetBool())
|
||||
} else if f.String_ != nil {
|
||||
pt.Fields[n] = f.GetString_()
|
||||
pt.AddField(n, f.GetString_())
|
||||
} else {
|
||||
pt.Fields[n] = f.GetBytes()
|
||||
pt.AddField(n, f.GetBytes())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -61,13 +61,13 @@ func TestWriteShardRequestBinary(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if len(p.Fields) != len(g.Fields) {
|
||||
t.Errorf("Point %d field count mismatch: got %v, exp %v", i, len(g.Fields), len(p.Fields))
|
||||
if len(p.Fields()) != len(g.Fields()) {
|
||||
t.Errorf("Point %d field count mismatch: got %v, exp %v", i, len(g.Fields()), len(p.Fields()))
|
||||
}
|
||||
|
||||
for j, f := range p.Fields {
|
||||
if g.Fields[j] != f {
|
||||
t.Errorf("Point %d field mismatch: got %v, exp %v", i, g.Fields[j], f)
|
||||
for j, f := range p.Fields() {
|
||||
if g.Fields()[j] != f {
|
||||
t.Errorf("Point %d field mismatch: got %v, exp %v", i, g.Fields()[j], f)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -211,8 +211,8 @@ func Test_DecodeMetric(t *testing.T) {
|
|||
if len(point.Tags()) != len(test.tags) {
|
||||
t.Fatalf("tags len mismatch. expected %d, got %d", len(test.tags), len(point.Tags()))
|
||||
}
|
||||
f := point.Fields["value"].(float64)
|
||||
if point.Fields["value"] != f {
|
||||
f := point.Fields()["value"].(float64)
|
||||
if point.Fields()["value"] != f {
|
||||
t.Fatalf("floatValue value mismatch. expected %v, got %v", test.value, f)
|
||||
}
|
||||
if point.Time().UnixNano()/1000000 != test.time.UnixNano()/1000000 {
|
||||
|
|
10
server.go
10
server.go
|
@ -1829,11 +1829,11 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []tsdb.Poi
|
|||
|
||||
// Make sure every point is valid.
|
||||
for _, p := range points {
|
||||
if len(p.Fields) == 0 {
|
||||
if len(p.Fields()) == 0 {
|
||||
return 0, ErrFieldsRequired
|
||||
}
|
||||
|
||||
for _, f := range p.Fields {
|
||||
for _, f := range p.Fields() {
|
||||
if f == nil {
|
||||
return 0, ErrFieldIsNull
|
||||
}
|
||||
|
@ -1910,7 +1910,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []tsdb.Poi
|
|||
}
|
||||
|
||||
// Convert string-key/values to encoded fields.
|
||||
encodedFields, err := codec.EncodeFields(p.Fields)
|
||||
encodedFields, err := codec.EncodeFields(p.Fields())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1980,7 +1980,7 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
|
|||
c.addSeriesIfNotExists(p.Name(), p.Tags())
|
||||
}
|
||||
|
||||
for k, v := range p.Fields {
|
||||
for k, v := range p.Fields() {
|
||||
if measurement != nil {
|
||||
if f := measurement.FieldByName(k); f != nil {
|
||||
// Field present in Metastore, make sure there is no type conflict.
|
||||
|
@ -4086,7 +4086,7 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
|||
|
||||
if len(points) > 0 {
|
||||
for _, p := range points {
|
||||
for _, v := range p.Fields {
|
||||
for _, v := range p.Fields() {
|
||||
if v == nil {
|
||||
// If we have any nil values, we can't write the data
|
||||
// This happens the CQ is created and running before we write data to the measurement
|
||||
|
|
|
@ -11,7 +11,7 @@ type Point struct {
|
|||
name string
|
||||
tags Tags
|
||||
time time.Time
|
||||
Fields map[string]interface{}
|
||||
fields map[string]interface{}
|
||||
key string
|
||||
data []byte
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ func NewPoint(name string, tags Tags, fields map[string]interface{}, time time.T
|
|||
name: name,
|
||||
tags: tags,
|
||||
time: time,
|
||||
Fields: fields,
|
||||
fields: fields,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -68,6 +68,16 @@ func (p *Point) AddTag(key, value string) {
|
|||
p.tags[key] = value
|
||||
}
|
||||
|
||||
// Fields returns the fiels for the point
|
||||
func (p *Point) Fields() map[string]interface{} {
|
||||
return p.fields
|
||||
}
|
||||
|
||||
// AddField adds or replaces a field value for a point
|
||||
func (p *Point) AddField(name string, value interface{}) {
|
||||
p.fields[name] = value
|
||||
}
|
||||
|
||||
func (p *Point) HashID() uint64 {
|
||||
|
||||
// <measurementName>|<tagKey>|<tagKey>|<tagValue>|<tagValue>
|
||||
|
|
|
@ -107,7 +107,7 @@ func (s *Shard) WritePoints(points []*Point) error {
|
|||
|
||||
// if the measurement doesn't exist, all fields need to be created
|
||||
if m := s.measurements[p.Name()]; m == nil {
|
||||
for name, value := range p.Fields {
|
||||
for name, value := range p.Fields() {
|
||||
fieldsToCreate = append(fieldsToCreate, &fieldCreate{p.Name(), &Field{Name: name, Type: influxql.InspectDataType(value)}})
|
||||
}
|
||||
continue // no need to validate since they're all new fields
|
||||
|
@ -119,14 +119,14 @@ func (s *Shard) WritePoints(points []*Point) error {
|
|||
}
|
||||
|
||||
// validate field types
|
||||
for name, value := range p.Fields {
|
||||
for name, value := range p.Fields() {
|
||||
if f := measurement.FieldByName(name); f != nil {
|
||||
// Field present in Metastore, make sure there is no type conflict.
|
||||
if f.Type != influxql.InspectDataType(value) {
|
||||
return fmt.Errorf("input field \"%s\" is type %T, already exists as type %s", name, value, f.Type)
|
||||
}
|
||||
|
||||
data, err := measurement.fieldCodec.EncodeFields(p.Fields)
|
||||
data, err := measurement.fieldCodec.EncodeFields(p.Fields())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ func (s *Shard) WritePoints(points []*Point) error {
|
|||
// marshal the raw data if it hasn't been marshaled already
|
||||
if p.data == nil {
|
||||
// this was populated earlier, don't need to validate that it's there.
|
||||
data, err := s.series[p.Key()].measurement.fieldCodec.EncodeFields(p.Fields)
|
||||
data, err := s.series[p.Key()].measurement.fieldCodec.EncodeFields(p.Fields())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue