Refactoring based on PR comments
Changing WriteSeries to take a slice of points instead of variable arguments Adding godoc comment for public type Pointpull/1322/head
parent
af64bafba9
commit
e4604fec36
|
@ -17,7 +17,7 @@ const DefaultPort = 25826
|
|||
|
||||
// SeriesWriter defines the interface for the destination of the data.
|
||||
type SeriesWriter interface {
|
||||
WriteSeries(database, retentionPolicy string, points ...influxdb.Point) (uint64, error)
|
||||
WriteSeries(database, retentionPolicy string, points []influxdb.Point) (uint64, error)
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
|
@ -112,7 +112,7 @@ func (s *Server) handleMessage(buffer []byte) {
|
|||
for _, packet := range *packets {
|
||||
points := Unmarshal(&packet)
|
||||
for _, p := range points {
|
||||
_, err := s.writer.WriteSeries(s.Database, "", p)
|
||||
_, err := s.writer.WriteSeries(s.Database, "", []influxdb.Point{p})
|
||||
if err != nil {
|
||||
log.Printf("Collectd cannot write data: %s", err)
|
||||
continue
|
||||
|
|
|
@ -22,7 +22,7 @@ type serverResponse struct {
|
|||
|
||||
var responses = make(chan *serverResponse, 1024)
|
||||
|
||||
func (testServer) WriteSeries(database, retentionPolicy string, points ...influxdb.Point) (uint64, error) {
|
||||
func (testServer) WriteSeries(database, retentionPolicy string, points []influxdb.Point) (uint64, error) {
|
||||
responses <- &serverResponse{
|
||||
database: database,
|
||||
retentionPolicy: retentionPolicy,
|
||||
|
|
|
@ -35,7 +35,7 @@ var (
|
|||
|
||||
// SeriesWriter defines the interface for the destination of the data.
|
||||
type SeriesWriter interface {
|
||||
WriteSeries(database, retentionPolicy string, points ...influxdb.Point) (uint64, error)
|
||||
WriteSeries(database, retentionPolicy string, points []influxdb.Point) (uint64, error)
|
||||
}
|
||||
|
||||
// Parser encapulates a Graphite Parser.
|
||||
|
|
|
@ -5,6 +5,8 @@ import (
|
|||
"log"
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
)
|
||||
|
||||
// TCPServer processes Graphite data received over TCP connections.
|
||||
|
@ -72,6 +74,6 @@ func (t *TCPServer) handleConnection(conn net.Conn) {
|
|||
}
|
||||
|
||||
// Send the data to database
|
||||
t.writer.WriteSeries(t.Database, "", point)
|
||||
t.writer.WriteSeries(t.Database, "", []influxdb.Point{point})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,8 @@ package graphite
|
|||
import (
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -59,7 +61,7 @@ func (u *UDPServer) ListenAndServe(iface string) error {
|
|||
}
|
||||
|
||||
// Send the data to database
|
||||
u.writer.WriteSeries(u.Database, "", point)
|
||||
u.writer.WriteSeries(u.Database, "", []influxdb.Point{point})
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -1211,6 +1211,7 @@ type createSeriesIfNotExistsCommand struct {
|
|||
Tags map[string]string `json:"tags"`
|
||||
}
|
||||
|
||||
// Point defines the values that will be written to the database
|
||||
type Point struct {
|
||||
Name string
|
||||
Tags map[string]string
|
||||
|
@ -1220,7 +1221,7 @@ type Point struct {
|
|||
|
||||
// WriteSeries writes series data to the database.
|
||||
// Returns the messaging index the data was written to.
|
||||
func (s *Server) WriteSeries(database, retentionPolicy string, points ...Point) (uint64, error) {
|
||||
func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) {
|
||||
// TODO corylanou: implement batch writing
|
||||
if len(points) != 1 {
|
||||
return 0, errors.New("batching WriteSeries has not been implemented yet")
|
||||
|
|
|
@ -506,7 +506,7 @@ func TestServer_WriteSeries(t *testing.T) {
|
|||
// Write series with one point to the database.
|
||||
tags := map[string]string{"host": "servera.influx.com", "region": "uswest"}
|
||||
values := map[string]interface{}{"value": 23.2}
|
||||
index, err := s.WriteSeries("foo", "mypolicy", influxdb.Point{Name: "cpu_load", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: values})
|
||||
index, err := s.WriteSeries("foo", "mypolicy", []influxdb.Point{influxdb.Point{Name: "cpu_load", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: values}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err = s.Sync(index); err != nil {
|
||||
|
@ -514,7 +514,7 @@ func TestServer_WriteSeries(t *testing.T) {
|
|||
}
|
||||
|
||||
// Write another point 10 seconds later so it goes through "raw series".
|
||||
index, err = s.WriteSeries("foo", "mypolicy", influxdb.Point{Name: "cpu_load", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:10Z"), Values: values})
|
||||
index, err = s.WriteSeries("foo", "mypolicy", []influxdb.Point{influxdb.Point{Name: "cpu_load", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:10Z"), Values: values}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err = s.Sync(index); err != nil {
|
||||
|
@ -567,7 +567,7 @@ func TestServer_Measurements(t *testing.T) {
|
|||
tags := map[string]string{"host": "servera.influx.com", "region": "uswest"}
|
||||
values := map[string]interface{}{"value": 23.2}
|
||||
|
||||
index, err := s.WriteSeries("foo", "mypolicy", influxdb.Point{Name: "cpu_load", Tags: tags, Timestamp: timestamp, Values: values})
|
||||
index, err := s.WriteSeries("foo", "mypolicy", []influxdb.Point{influxdb.Point{Name: "cpu_load", Tags: tags, Timestamp: timestamp, Values: values}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err = s.Sync(index); err != nil {
|
||||
|
|
Loading…
Reference in New Issue