parent
997020963a
commit
d633d7683f
|
@ -217,7 +217,7 @@ func TestUnmarshal_Points(t *testing.T) {
|
|||
},
|
||||
},
|
||||
points: []data.Point{
|
||||
{Name: "disk_read", Fields: map[string]interface{}{"value": float64(1)}},
|
||||
data.NewPoint("disk_read", nil, map[string]interface{}{"value": float64(1)}, time.Unix(0, 0)),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -230,8 +230,8 @@ func TestUnmarshal_Points(t *testing.T) {
|
|||
},
|
||||
},
|
||||
points: []data.Point{
|
||||
{Name: "disk_read", Fields: map[string]interface{}{"value": float64(1)}},
|
||||
{Name: "disk_write", Fields: map[string]interface{}{"value": float64(5)}},
|
||||
data.NewPoint("disk_read", nil, map[string]interface{}{"value": float64(1)}, time.Unix(0, 0)),
|
||||
data.NewPoint("disk_write", nil, map[string]interface{}{"value": float64(5)}, time.Unix(0, 0)),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -247,11 +247,10 @@ func TestUnmarshal_Points(t *testing.T) {
|
|||
},
|
||||
},
|
||||
points: []data.Point{
|
||||
{
|
||||
Name: "disk_read",
|
||||
Tags: map[string]string{"host": "server01", "instance": "sdk", "type": "disk_octets", "type_instance": "single"},
|
||||
Fields: map[string]interface{}{"value": float64(1)},
|
||||
},
|
||||
data.NewPoint("disk_read",
|
||||
map[string]string{"host": "server01", "instance": "sdk", "type": "disk_octets", "type_instance": "single"},
|
||||
map[string]interface{}{"value": float64(1)},
|
||||
time.Unix(0, 0)),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -265,8 +264,8 @@ func TestUnmarshal_Points(t *testing.T) {
|
|||
for i, m := range test.points {
|
||||
// test name
|
||||
name := fmt.Sprintf("%s_%s", test.packet.Plugin, test.packet.Values[i].Name)
|
||||
if m.Name != name {
|
||||
t.Errorf("point name mismatch. expected %q, got %q", name, m.Name)
|
||||
if m.Name() != name {
|
||||
t.Errorf("point name mismatch. expected %q, got %q", name, m.Name())
|
||||
}
|
||||
// test value
|
||||
mv := m.Fields["value"].(float64)
|
||||
|
|
|
@ -106,7 +106,7 @@ func (w *WriteShardRequest) marshalPoints(points []Point) []*internal.Point {
|
|||
Value: &value,
|
||||
})
|
||||
}
|
||||
name := p.Name
|
||||
name := p.Name()
|
||||
pts[i] = &internal.Point{
|
||||
Name: &name,
|
||||
Time: proto.Int64(p.Time().UnixNano()),
|
||||
|
@ -190,7 +190,7 @@ func (w *WriteShardResponse) UnmarshalBinary(buf []byte) error {
|
|||
|
||||
// Point defines the values that will be written to the database
|
||||
type Point struct {
|
||||
Name string
|
||||
name string
|
||||
Tags Tags
|
||||
time time.Time
|
||||
Fields map[string]interface{}
|
||||
|
@ -199,13 +199,23 @@ type Point struct {
|
|||
// NewPoint returns a new point with the given measurement name, tags, fiels and timestamp
|
||||
func NewPoint(name string, tags Tags, fields map[string]interface{}, time time.Time) Point {
|
||||
return Point{
|
||||
Name: name,
|
||||
name: name,
|
||||
Tags: tags,
|
||||
time: time,
|
||||
Fields: fields,
|
||||
}
|
||||
}
|
||||
|
||||
// Name return the measurement name for the point
|
||||
func (p *Point) Name() string {
|
||||
return p.name
|
||||
}
|
||||
|
||||
// SetName updates the measurement name for the point
|
||||
func (p *Point) SetName(name string) {
|
||||
p.name = name
|
||||
}
|
||||
|
||||
// Time return the timesteamp for the point
|
||||
func (p *Point) Time() time.Time {
|
||||
return p.time
|
||||
|
@ -221,12 +231,12 @@ func (p *Point) HashID() uint64 {
|
|||
// <measurementName>|<tagKey>|<tagKey>|<tagValue>|<tagValue>
|
||||
// cpu|host|servera
|
||||
encodedTags := p.Tags.Marshal()
|
||||
size := len(p.Name) + len(encodedTags)
|
||||
size := len(p.Name()) + len(encodedTags)
|
||||
if len(encodedTags) > 0 {
|
||||
size++
|
||||
}
|
||||
b := make([]byte, 0, size)
|
||||
b = append(b, p.Name...)
|
||||
b = append(b, p.Name()...)
|
||||
if len(encodedTags) > 0 {
|
||||
b = append(b, '|')
|
||||
}
|
||||
|
|
|
@ -60,8 +60,8 @@ func TestWriteShardRequestBinary(t *testing.T) {
|
|||
for i, p := range srPoints {
|
||||
g := gotPoints[i]
|
||||
|
||||
if g.Name != p.Name {
|
||||
t.Errorf("Point %d name mismatch: got %v, exp %v", i, g.Name, p.Name)
|
||||
if g.Name() != p.Name() {
|
||||
t.Errorf("Point %d name mismatch: got %v, exp %v", i, g.Name(), p.Name())
|
||||
}
|
||||
|
||||
if !g.Time().Equal(p.Time()) {
|
||||
|
|
|
@ -205,8 +205,8 @@ func Test_DecodeMetric(t *testing.T) {
|
|||
// If we erred out,it was intended and the following tests won't work
|
||||
continue
|
||||
}
|
||||
if point.Name != test.name {
|
||||
t.Fatalf("name parse failer. expected %v, got %v", test.name, point.Name)
|
||||
if point.Name() != test.name {
|
||||
t.Fatalf("name parse failer. expected %v, got %v", test.name, point.Name())
|
||||
}
|
||||
if len(point.Tags) != len(test.tags) {
|
||||
t.Fatalf("tags len mismatch. expected %d, got %d", len(test.tags), len(point.Tags))
|
||||
|
|
10
server.go
10
server.go
|
@ -1878,9 +1878,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []data.Poi
|
|||
return ErrDatabaseNotFound(database)
|
||||
}
|
||||
for _, p := range points {
|
||||
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
|
||||
measurement, series := db.MeasurementAndSeries(p.Name(), p.Tags)
|
||||
if series == nil {
|
||||
s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name, p.Tags)
|
||||
s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name(), p.Tags)
|
||||
return ErrSeriesNotFound
|
||||
}
|
||||
|
||||
|
@ -1971,11 +1971,11 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
|
|||
}
|
||||
|
||||
for _, p := range points {
|
||||
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
|
||||
measurement, series := db.MeasurementAndSeries(p.Name(), p.Tags)
|
||||
|
||||
if series == nil {
|
||||
// Series does not exist in Metastore, add it so it's created cluster-wide.
|
||||
c.addSeriesIfNotExists(p.Name, p.Tags)
|
||||
c.addSeriesIfNotExists(p.Name(), p.Tags)
|
||||
}
|
||||
|
||||
for k, v := range p.Fields {
|
||||
|
@ -1989,7 +1989,7 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
|
|||
}
|
||||
}
|
||||
// Field isn't in Metastore. Add it to command so it's created cluster-wide.
|
||||
if err := c.addFieldIfNotExists(p.Name, k, influxql.InspectDataType(v)); err != nil {
|
||||
if err := c.addFieldIfNotExists(p.Name(), k, influxql.InspectDataType(v)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1439,7 +1439,7 @@ func TestServer_CopyShard(t *testing.T) {
|
|||
s.SetDefaultRetentionPolicy("foo", "raw")
|
||||
|
||||
// Write series with one point to the database to ensure shard 1 is created.
|
||||
s.MustWriteSeries("foo", "raw", []data.Point{{Name: "series1", Fields: map[string]interface{}{"value": float64(20)}}})
|
||||
s.MustWriteSeries("foo", "raw", []data.Point{data.NewPoint("series1", nil, map[string]interface{}{"value": float64(20)}, time.Unix(0, 0))})
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
|
||||
err := s.CopyShard(ioutil.Discard, 1234)
|
||||
|
|
Loading…
Reference in New Issue