Convert Point.Tags to Point.Tags()
parent
5dcab443dc
commit
f8d599cda9
|
@ -274,17 +274,17 @@ func TestUnmarshal_Points(t *testing.T) {
|
||||||
t.Errorf("point value mismatch. expected %v, got %v", pv, mv)
|
t.Errorf("point value mismatch. expected %v, got %v", pv, mv)
|
||||||
}
|
}
|
||||||
// test tags
|
// test tags
|
||||||
if test.packet.Hostname != m.Tags["host"] {
|
if test.packet.Hostname != m.Tags()["host"] {
|
||||||
t.Errorf(`point tags["host"] mismatch. expected %q, got %q`, test.packet.Hostname, m.Tags["host"])
|
t.Errorf(`point tags["host"] mismatch. expected %q, got %q`, test.packet.Hostname, m.Tags()["host"])
|
||||||
}
|
}
|
||||||
if test.packet.PluginInstance != m.Tags["instance"] {
|
if test.packet.PluginInstance != m.Tags()["instance"] {
|
||||||
t.Errorf(`point tags["instance"] mismatch. expected %q, got %q`, test.packet.PluginInstance, m.Tags["instance"])
|
t.Errorf(`point tags["instance"] mismatch. expected %q, got %q`, test.packet.PluginInstance, m.Tags()["instance"])
|
||||||
}
|
}
|
||||||
if test.packet.Type != m.Tags["type"] {
|
if test.packet.Type != m.Tags()["type"] {
|
||||||
t.Errorf(`point tags["type"] mismatch. expected %q, got %q`, test.packet.Type, m.Tags["type"])
|
t.Errorf(`point tags["type"] mismatch. expected %q, got %q`, test.packet.Type, m.Tags()["type"])
|
||||||
}
|
}
|
||||||
if test.packet.TypeInstance != m.Tags["type_instance"] {
|
if test.packet.TypeInstance != m.Tags()["type_instance"] {
|
||||||
t.Errorf(`point tags["type_instance"] mismatch. expected %q, got %q`, test.packet.TypeInstance, m.Tags["type_instance"])
|
t.Errorf(`point tags["type_instance"] mismatch. expected %q, got %q`, test.packet.TypeInstance, m.Tags()["type_instance"])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,7 +97,7 @@ func (w *WriteShardRequest) marshalPoints(points []tsdb.Point) []*internal.Point
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := []*internal.Tag{}
|
tags := []*internal.Tag{}
|
||||||
for k, v := range p.Tags {
|
for k, v := range p.Tags() {
|
||||||
key := k
|
key := k
|
||||||
value := v
|
value := v
|
||||||
tags = append(tags, &internal.Tag{
|
tags = append(tags, &internal.Tag{
|
||||||
|
@ -149,9 +149,11 @@ func (w *WriteShardRequest) unmarhalPoints() []tsdb.Point {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tags := tsdb.Tags{}
|
||||||
for _, t := range p.GetTags() {
|
for _, t := range p.GetTags() {
|
||||||
pt.Tags[t.GetKey()] = t.GetValue()
|
tags[t.GetKey()] = t.GetValue()
|
||||||
}
|
}
|
||||||
|
pt.SetTags(tags)
|
||||||
points[i] = pt
|
points[i] = pt
|
||||||
}
|
}
|
||||||
return points
|
return points
|
||||||
|
|
|
@ -55,8 +55,8 @@ func TestWriteShardRequestBinary(t *testing.T) {
|
||||||
t.Errorf("Point #%d HashID() mismatch: got %v, exp %v", i, g.HashID(), p.HashID())
|
t.Errorf("Point #%d HashID() mismatch: got %v, exp %v", i, g.HashID(), p.HashID())
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range p.Tags {
|
for k, v := range p.Tags() {
|
||||||
if g.Tags[k] != v {
|
if g.Tags()[k] != v {
|
||||||
t.Errorf("Point #%d tag mismatch: got %v, exp %v", i, k, v)
|
t.Errorf("Point #%d tag mismatch: got %v, exp %v", i, k, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -208,8 +208,8 @@ func Test_DecodeMetric(t *testing.T) {
|
||||||
if point.Name() != test.name {
|
if point.Name() != test.name {
|
||||||
t.Fatalf("name parse failer. expected %v, got %v", test.name, point.Name())
|
t.Fatalf("name parse failer. expected %v, got %v", test.name, point.Name())
|
||||||
}
|
}
|
||||||
if len(point.Tags) != len(test.tags) {
|
if len(point.Tags()) != len(test.tags) {
|
||||||
t.Fatalf("tags len mismatch. expected %d, got %d", len(test.tags), len(point.Tags))
|
t.Fatalf("tags len mismatch. expected %d, got %d", len(test.tags), len(point.Tags()))
|
||||||
}
|
}
|
||||||
f := point.Fields["value"].(float64)
|
f := point.Fields["value"].(float64)
|
||||||
if point.Fields["value"] != f {
|
if point.Fields["value"] != f {
|
||||||
|
|
13
server.go
13
server.go
|
@ -428,7 +428,8 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D
|
||||||
)
|
)
|
||||||
// Specifically create a new map.
|
// Specifically create a new map.
|
||||||
for k, v := range tags {
|
for k, v := range tags {
|
||||||
point.Tags[k] = v
|
tags[k] = v
|
||||||
|
point.AddTag(k, v)
|
||||||
}
|
}
|
||||||
points = append(points, point)
|
points = append(points, point)
|
||||||
})
|
})
|
||||||
|
@ -468,7 +469,7 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, p := range points {
|
for _, p := range points {
|
||||||
p.Tags = map[string]string{"serverID": strconv.FormatUint(s.ID(), 10)}
|
p.AddTag("serverID", strconv.FormatUint(s.ID(), 10))
|
||||||
}
|
}
|
||||||
batch = append(batch, points...)
|
batch = append(batch, points...)
|
||||||
}
|
}
|
||||||
|
@ -1879,9 +1880,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []tsdb.Poi
|
||||||
return ErrDatabaseNotFound(database)
|
return ErrDatabaseNotFound(database)
|
||||||
}
|
}
|
||||||
for _, p := range points {
|
for _, p := range points {
|
||||||
measurement, series := db.MeasurementAndSeries(p.Name(), p.Tags)
|
measurement, series := db.MeasurementAndSeries(p.Name(), p.Tags())
|
||||||
if series == nil {
|
if series == nil {
|
||||||
s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name(), p.Tags)
|
s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name(), p.Tags())
|
||||||
return ErrSeriesNotFound
|
return ErrSeriesNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1972,11 +1973,11 @@ func (s *Server) createMeasurementsIfNotExists(database, retentionPolicy string,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, p := range points {
|
for _, p := range points {
|
||||||
measurement, series := db.MeasurementAndSeries(p.Name(), p.Tags)
|
measurement, series := db.MeasurementAndSeries(p.Name(), p.Tags())
|
||||||
|
|
||||||
if series == nil {
|
if series == nil {
|
||||||
// Series does not exist in Metastore, add it so it's created cluster-wide.
|
// Series does not exist in Metastore, add it so it's created cluster-wide.
|
||||||
c.addSeriesIfNotExists(p.Name(), p.Tags)
|
c.addSeriesIfNotExists(p.Name(), p.Tags())
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range p.Fields {
|
for k, v := range p.Fields {
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
// Point defines the values that will be written to the database
|
// Point defines the values that will be written to the database
|
||||||
type Point struct {
|
type Point struct {
|
||||||
name string
|
name string
|
||||||
Tags Tags
|
tags Tags
|
||||||
time time.Time
|
time time.Time
|
||||||
Fields map[string]interface{}
|
Fields map[string]interface{}
|
||||||
key string
|
key string
|
||||||
|
@ -20,7 +20,7 @@ type Point struct {
|
||||||
func NewPoint(name string, tags Tags, fields map[string]interface{}, time time.Time) Point {
|
func NewPoint(name string, tags Tags, fields map[string]interface{}, time time.Time) Point {
|
||||||
return Point{
|
return Point{
|
||||||
name: name,
|
name: name,
|
||||||
Tags: tags,
|
tags: tags,
|
||||||
time: time,
|
time: time,
|
||||||
Fields: fields,
|
Fields: fields,
|
||||||
}
|
}
|
||||||
|
@ -28,7 +28,7 @@ func NewPoint(name string, tags Tags, fields map[string]interface{}, time time.T
|
||||||
|
|
||||||
func (p *Point) Key() string {
|
func (p *Point) Key() string {
|
||||||
if p.key == "" {
|
if p.key == "" {
|
||||||
p.key = p.Name() + "," + string(p.Tags.HashKey())
|
p.key = p.Name() + "," + string(p.tags.HashKey())
|
||||||
}
|
}
|
||||||
return p.key
|
return p.key
|
||||||
}
|
}
|
||||||
|
@ -53,11 +53,26 @@ func (p *Point) SetTime(t time.Time) {
|
||||||
p.time = t
|
p.time = t
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tags returns the tag set for the point
|
||||||
|
func (p *Point) Tags() Tags {
|
||||||
|
return p.tags
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetTags replaces the tags for the point
|
||||||
|
func (p *Point) SetTags(tags Tags) {
|
||||||
|
p.tags = tags
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddTag adds or replaces a tag value for a point
|
||||||
|
func (p *Point) AddTag(key, value string) {
|
||||||
|
p.tags[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Point) HashID() uint64 {
|
func (p *Point) HashID() uint64 {
|
||||||
|
|
||||||
// <measurementName>|<tagKey>|<tagKey>|<tagValue>|<tagValue>
|
// <measurementName>|<tagKey>|<tagKey>|<tagValue>|<tagValue>
|
||||||
// cpu|host|servera
|
// cpu|host|servera
|
||||||
encodedTags := p.Tags.HashKey()
|
encodedTags := p.tags.HashKey()
|
||||||
size := len(p.Name()) + len(encodedTags)
|
size := len(p.Name()) + len(encodedTags)
|
||||||
if len(encodedTags) > 0 {
|
if len(encodedTags) > 0 {
|
||||||
size++
|
size++
|
||||||
|
|
|
@ -102,7 +102,7 @@ func (s *Shard) WritePoints(points []*Point) error {
|
||||||
var measurement *Measurement
|
var measurement *Measurement
|
||||||
|
|
||||||
if ss := s.series[p.Key()]; ss == nil {
|
if ss := s.series[p.Key()]; ss == nil {
|
||||||
series := &Series{Key: p.Key(), Tags: p.Tags}
|
series := &Series{Key: p.Key(), Tags: p.Tags()}
|
||||||
seriesToCreate = append(seriesToCreate, &seriesCreate{p.Name(), series})
|
seriesToCreate = append(seriesToCreate, &seriesCreate{p.Name(), series})
|
||||||
|
|
||||||
// if the measurement doesn't exist, all fields need to be created
|
// if the measurement doesn't exist, all fields need to be created
|
||||||
|
|
|
@ -44,8 +44,8 @@ func TestShardWriteAndIndex(t *testing.T) {
|
||||||
t.Fatalf("series wasn't in index")
|
t.Fatalf("series wasn't in index")
|
||||||
}
|
}
|
||||||
seriesTags := sh.series[pt.Key()].Tags
|
seriesTags := sh.series[pt.Key()].Tags
|
||||||
if len(seriesTags) != len(pt.Tags) || pt.Tags["host"] != seriesTags["host"] {
|
if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] {
|
||||||
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags, sh.series[pt.Key()].Tags)
|
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), sh.series[pt.Key()].Tags)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(sh.measurements["cpu"].tagKeys(), []string{"host"}) {
|
if !reflect.DeepEqual(sh.measurements["cpu"].tagKeys(), []string{"host"}) {
|
||||||
t.Fatalf("tag key wasn't saved to measurement index")
|
t.Fatalf("tag key wasn't saved to measurement index")
|
||||||
|
|
Loading…
Reference in New Issue