models: Added AppendString, PointSize, and Round to Point

This change also updates the UDP client to take advantage of these
improvements, as well as some code review changes.
pull/7305/head
Joe LeGasse 2016-09-14 11:55:31 -04:00
parent ee6816756a
commit 0d2b339d7c
4 changed files with 124 additions and 40 deletions

View File

@ -58,7 +58,8 @@ type BatchPointsConfig struct {
// Client is a client interface for writing & querying the database
type Client interface {
// Ping checks that status of cluster
// Ping checks that status of cluster, and will always return 0 time and no
// error for UDP clients
Ping(timeout time.Duration) (time.Duration, string, error)
// Write takes a BatchPoints object and writes all Points to InfluxDB.

View File

@ -1,23 +1,16 @@
package client
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"time"
)
var ErrLargePoint = errors.New("point exceeds allowed size")
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
// MaxPayloadSize is a safe maximum limit for a UDP payload over IPv4
MaxUDPPayloadSize = 65467
)
// UDPConfig is the config data needed to create a UDP Client
@ -27,7 +20,7 @@ type UDPConfig struct {
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPBufferSize.
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}
@ -56,12 +49,6 @@ func NewUDPClient(conf UDPConfig) (Client, error) {
}, nil
}
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
@ -73,45 +60,43 @@ type udpclient struct {
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b bytes.Buffer
var d time.Duration
d, _ = time.ParseDuration("1" + bp.Precision())
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
var d, _ = time.ParseDuration("1" + bp.Precision())
var delayedError error
var checkBuffer = func(s string) {
if b.Len() > 0 && b.Len()+len(s) > uc.payloadSize {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
var checkBuffer = func(n int) {
if len(b) > 0 && len(b)+n > uc.payloadSize {
if _, err := uc.conn.Write(b); err != nil {
delayedError = err
}
b.Reset()
b = b[:0]
}
}
for _, p := range bp.Points() {
point := p.pt.RoundedString(d) + "\n"
if len(point) > MaxUDPPayloadSize {
delayedError = ErrLargePoint
p.pt.Round(d)
pointSize := p.pt.StringSize() + 1 // include newline in size
//point := p.pt.RoundedString(d) + "\n"
checkBuffer(pointSize)
if p.Time().IsZero() || pointSize <= uc.payloadSize {
b = p.pt.AppendString(b)
b = append(b, '\n')
continue
}
checkBuffer(point)
if p.Time().IsZero() || len(point) <= uc.payloadSize {
b.WriteString(point)
continue
}
points := p.pt.Split(uc.payloadSize - 1) // newline will be added
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
for _, sp := range points {
point = sp.RoundedString(d) + "\n"
checkBuffer(point)
b.WriteString(point)
checkBuffer(sp.StringSize() + 1)
b = sp.AppendString(b)
b = append(b, '\n')
}
}
if b.Len() > 0 {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
if len(b) > 0 {
if _, err := uc.conn.Write(b); err != nil {
return err
}
}
@ -121,3 +106,7 @@ func (uc *udpclient) Write(bp BatchPoints) error {
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}

View File

@ -80,6 +80,16 @@ type Point interface {
// string representations are no longer than size. Points with a single field or
// a point without a timestamp may exceed the requested size.
Split(size int) []Point
// Round will round the timestamp of the point to the given duration
Round(d time.Duration)
// StringSize returns the length of the string that would be returned by String()
StringSize() int
// AppendString appends the result of String() to the provided buffer and returns
// the result, potentially reducing string allocations
AppendString(buf []byte) []byte
}
// Points represents a sortable list of points by timestamp.
@ -1234,6 +1244,11 @@ func (p *point) SetTime(t time.Time) {
p.time = t
}
// Round implements Point.Round
func (p *point) Round(d time.Duration) {
p.time = p.time.Round(d)
}
// Tags returns the tag set for the point
func (p *point) Tags() Tags {
return parseTags(p.key)
@ -1332,6 +1347,41 @@ func (p *point) String() string {
return string(p.Key()) + " " + string(p.fields) + " " + strconv.FormatInt(p.UnixNano(), 10)
}
// AppendString implements Point.AppendString
func (p *point) AppendString(buf []byte) []byte {
buf = append(buf, p.key...)
buf = append(buf, ' ')
buf = append(buf, p.fields...)
if !p.time.IsZero() {
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, p.UnixNano(), 10)
}
return buf
}
func (p *point) StringSize() int {
size := len(p.key) + len(p.fields) + 1
if !p.time.IsZero() {
digits := 1 // even "0" has one digit
t := p.UnixNano()
if t < 0 {
// account for negative sign, then negate
digits++
t = -t
}
for t > 9 { // already accounted for one digit
digits++
t /= 10
}
size += digits + 1 // digits and a space
}
return size
}
func (p *point) MarshalBinary() ([]byte, error) {
tb, err := p.time.MarshalBinary()
if err != nil {

View File

@ -21,8 +21,8 @@ var (
"uint32": uint32(math.MaxUint32),
"string": "String field that has a decent length, probably some log message or something",
"boolean": false,
"float64-tiny": math.SmallestNonzeroFloat64,
"float64-large": math.MaxFloat64,
"float64-tiny": float64(math.SmallestNonzeroFloat64),
"float64-large": float64(math.MaxFloat64),
}
maxFloat64 = strconv.FormatFloat(math.MaxFloat64, 'f', 1, 64)
minFloat64 = strconv.FormatFloat(-math.MaxFloat64, 'f', 1, 64)
@ -43,6 +43,50 @@ func BenchmarkMarshal(b *testing.B) {
}
}
func TestPoint_StringSize(t *testing.T) {
testPoint_cube(t, func(p models.Point) {
l := p.StringSize()
s := p.String()
if l != len(s) {
t.Errorf("Incorrect length for %q. got %v, exp %v", s, l, len(s))
}
})
}
func TestPoint_AppendString(t *testing.T) {
testPoint_cube(t, func(p models.Point) {
got := p.AppendString(nil)
exp := []byte(p.String())
if !reflect.DeepEqual(exp, got) {
t.Errorf("AppendString() didn't match String(): got %v, exp %v", got, exp)
}
})
}
func testPoint_cube(t *testing.T, f func(p models.Point)) {
// heard of a table-driven test? let's make a cube-driven test...
tagList := []models.Tags{nil, {{[]byte("foo"), []byte("bar")}}, tags}
fieldList := []models.Fields{{"a": 42.0}, {"a": 42, "b": "things"}, fields}
timeList := []time.Time{time.Time{}, time.Unix(0, 0), time.Unix(-34526, 0), time.Unix(231845, 0), time.Now()}
for _, tagSet := range tagList {
for _, fieldSet := range fieldList {
for _, pointTime := range timeList {
p, err := models.NewPoint("test", tagSet, fieldSet, pointTime)
if err != nil {
t.Errorf("unexpected error creating point: %v", err)
continue
}
f(p)
}
}
}
}
var p models.Point
func BenchmarkNewPoint(b *testing.B) {