udp client: large points will now be split, if possible

The v2 UDP client will attempt to split points that exceed the
configured payload size. It will only do this for points that have a
timestamp specified.
pull/7305/head
Joe LeGasse 2016-09-13 21:15:41 -04:00
parent 86f3300a46
commit ee6816756a
5 changed files with 338 additions and 342 deletions

View File

@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"time"
@ -15,15 +14,6 @@ import (
"github.com/influxdata/influxdb/models"
)
var ErrLargePoint = errors.New("point exceeds allowed size")
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
const (
UDPPayloadSize = 512
MaxPointSize = 65507
)
// HTTPConfig is the config data needed to create an HTTP Client
type HTTPConfig struct {
// Addr should be of the form "http://host:port"
@ -51,17 +41,6 @@ type HTTPConfig struct {
TLSConfig *tls.Config
}
// UDPConfig is the config data needed to create a UDP Client
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPBufferSize.
PayloadSize int
}
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct
type BatchPointsConfig struct {
// Precision is the write precision of the points, defaults to "ns"
@ -180,42 +159,6 @@ func (c *client) Close() error {
return nil
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config.
func NewUDPClient(conf UDPConfig) (Client, error) {
var udpAddr *net.UDPAddr
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
payloadSize := conf.PayloadSize
if payloadSize == 0 {
payloadSize = UDPPayloadSize
}
return &udpclient{
conn: conn,
payloadSize: payloadSize,
}, nil
}
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
}
// client is safe for concurrent use as the fields are all read-only
// once the client is instantiated.
type client struct {
@ -229,11 +172,6 @@ type client struct {
transport *http.Transport
}
type udpclient struct {
conn *net.UDPConn
payloadSize int
}
// BatchPoints is an interface into a batched grouping of points to write into
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
// batch for each goroutine.
@ -408,59 +346,6 @@ func NewPointFrom(pt models.Point) *Point {
return &Point{pt: pt}
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b bytes.Buffer
var d time.Duration
d, _ = time.ParseDuration("1" + bp.Precision())
for _, p := range bp.Points() {
pointstring := p.pt.RoundedString(d) + "\n"
if len(pointstring) > MaxPointSize {
return ErrLargePoint
}
if b.Len() > 0 && b.Len()+len(pointstring) > uc.payloadSize {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
return err
}
b.Reset()
}
if b.Len()+len(pointstring) <= uc.payloadSize {
b.WriteString(pointstring)
continue
}
if p.Time().IsZero() {
b.WriteString(pointstring)
continue
}
points, err := p.pt.SplitN(uc.payloadSize - 2) // -2 because of `+ "\n"`
if err != nil {
return err
}
for _, sp := range points {
pointstring := sp.RoundedString(d) + "\n"
if b.Len() > 0 && b.Len()+len(pointstring) > uc.payloadSize {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
return err
}
b.Reset()
}
b.WriteString(pointstring)
}
}
if b.Len() > 0 {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
return err
}
}
return nil
}
func (c *client) Write(bp BatchPoints) error {
var b bytes.Buffer
@ -563,10 +448,6 @@ type Result struct {
Err string `json:"error,omitempty"`
}
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
// Query sends a command to the server and returns the Response
func (c *client) Query(q Query) (*Response, error) {
u := c.url

View File

@ -2,7 +2,6 @@ package client
import (
"encoding/json"
"net"
"net/http"
"net/http/httptest"
"reflect"
@ -10,8 +9,6 @@ import (
"sync"
"testing"
"time"
"github.com/influxdata/influxdb/models"
)
func TestUDPClient_Query(t *testing.T) {
@ -75,6 +72,68 @@ func TestUDPClient_BadAddr(t *testing.T) {
}
}
func TestUDPClient_Batches(t *testing.T) {
var logger writeLogger
var cl udpclient
cl.conn = &logger
cl.payloadSize = 20 // should allow for two points per batch
// expected point should look like this: "cpu a=1i"
fields := map[string]interface{}{"a": 1}
p, _ := NewPoint("cpu", nil, fields, time.Time{})
bp, _ := NewBatchPoints(BatchPointsConfig{})
for i := 0; i < 9; i++ {
bp.AddPoint(p)
}
if err := cl.Write(bp); err != nil {
t.Fatalf("Unexpected error during Write: %v", err)
}
if len(logger.writes) != 5 {
t.Errorf("Mismatched write count: got %v, exp %v", len(logger.writes), 5)
}
}
func TestUDPClient_Split(t *testing.T) {
var logger writeLogger
var cl udpclient
cl.conn = &logger
cl.payloadSize = 1 // force one field per point
fields := map[string]interface{}{"a": 1, "b": 2, "c": 3, "d": 4}
p, _ := NewPoint("cpu", nil, fields, time.Unix(1, 0))
bp, _ := NewBatchPoints(BatchPointsConfig{})
bp.AddPoint(p)
if err := cl.Write(bp); err != nil {
t.Fatalf("Unexpected error during Write: %v", err)
}
if len(logger.writes) != len(fields) {
t.Errorf("Mismatched write count: got %v, exp %v", len(logger.writes), len(fields))
}
}
type writeLogger struct {
writes [][]byte
}
func (w *writeLogger) Write(b []byte) (int, error) {
w.writes = append(w.writes, append([]byte(nil), b...))
return len(b), nil
}
func (w *writeLogger) Close() error { return nil }
func TestClient_Query(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
@ -419,107 +478,3 @@ func TestBatchPoints_SettersGetters(t *testing.T) {
t.Errorf("Expected: %s, got %s", bp.WriteConsistency(), "wc2")
}
}
var testPoints = func() (input []*Point, outputPointsCount int) {
tags := map[string]string{"tag1": "blabla"}
timeNow := time.Now()
fields := map[string]interface{}{
"aaaaaaaaa": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"bbbbbbbbb": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
}
pt, _ := NewPoint("point 1", tags, fields, timeNow)
input = append(input, pt)
fields = map[string]interface{}{
"aaaaaaaaa": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"bbbbbbbbb": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
"ccccccccc": "cccccccccccccccccccccccccccccccccccccccccccccccc",
"ddddddddd": "dddddddddddddddddddddddddddddddddddddddddddddddd",
"eeeeeeeee": "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
"fffffffff": "ffffffffffffffffffffffffffffffffffffffffffffffff",
"ggggggggg": "gggggggggggggggggggggggggggggggggggggggggggggggg",
"hhhhhhhhh": "hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh",
"iiiiiiiii": "iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
}
pt, _ = NewPoint("point 2", tags, fields, timeNow)
input = append(input, pt)
pt, _ = NewPoint("point 3", tags, fields)
input = append(input, pt)
outputPointsCount = 4
return
}
func TestWriteChunks(t *testing.T) {
var (
UDPAddr = "localhost:8888"
done = make(chan bool)
input, output = testPoints()
)
addr, err := net.ResolveUDPAddr("udp", UDPAddr)
if err != nil {
t.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
go func() {
var counter int
for {
buf := make([]byte, 1024*4)
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
t.Fatal(err)
}
points, err := models.ParsePoints([]byte(buf[:n]))
if err != nil {
t.Fatal(err)
}
counter += len(points)
if counter == output {
done <- true
return
}
}
}()
client, err := NewUDPClient(UDPConfig{Addr: UDPAddr})
if err != nil {
t.Fatal(err)
}
bp, err := NewBatchPoints(BatchPointsConfig{
Precision: "ns",
Database: "db",
RetentionPolicy: "rp",
WriteConsistency: "wc",
})
if err != nil {
t.Fatal(err)
}
for _, p := range input {
bp.AddPoint(p)
}
err = client.Write(bp)
if err != nil {
t.Fatal(err)
}
select {
case <-done:
case <-time.After(time.Second * 3):
t.Fatal("Input points count is not equal output points count")
}
}

123
client/v2/udp.go Normal file
View File

@ -0,0 +1,123 @@
package client
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"time"
)
var ErrLargePoint = errors.New("point exceeds allowed size")
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
// MaxPayloadSize is a safe maximum limit for a UDP payload over IPv4
MaxUDPPayloadSize = 65467
)
// UDPConfig is the config data needed to create a UDP Client
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPBufferSize.
PayloadSize int
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config.
func NewUDPClient(conf UDPConfig) (Client, error) {
var udpAddr *net.UDPAddr
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
payloadSize := conf.PayloadSize
if payloadSize == 0 {
payloadSize = UDPPayloadSize
}
return &udpclient{
conn: conn,
payloadSize: payloadSize,
}, nil
}
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
}
type udpclient struct {
conn io.WriteCloser
payloadSize int
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b bytes.Buffer
var d time.Duration
d, _ = time.ParseDuration("1" + bp.Precision())
var delayedError error
var checkBuffer = func(s string) {
if b.Len() > 0 && b.Len()+len(s) > uc.payloadSize {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
delayedError = err
}
b.Reset()
}
}
for _, p := range bp.Points() {
point := p.pt.RoundedString(d) + "\n"
if len(point) > MaxUDPPayloadSize {
delayedError = ErrLargePoint
continue
}
checkBuffer(point)
if p.Time().IsZero() || len(point) <= uc.payloadSize {
b.WriteString(point)
continue
}
points := p.pt.Split(uc.payloadSize - 1) // newline will be added
for _, sp := range points {
point = sp.RoundedString(d) + "\n"
checkBuffer(point)
b.WriteString(point)
}
}
if b.Len() > 0 {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
return err
}
}
return delayedError
}
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}

View File

@ -76,10 +76,10 @@ type Point interface {
// given duration
RoundedString(d time.Duration) string
// SplitN splits a point into multiple points and ensures that
// the string length of any returned point is no larger than the input size,
// except points which consists from one field, they can be larger than the input size.
SplitN(size int) ([]Point, error)
// Split will attempt to return multiple points with the same timestamp whose
// string representations are no longer than size. Points with a single field or
// a point without a timestamp may exceed the requested size.
Split(size int) []Point
}
// Points represents a sortable list of points by timestamp.
@ -996,11 +996,7 @@ func scanTagValue(buf []byte, i int) (int, []byte) {
func scanFieldValue(buf []byte, i int) (int, []byte) {
start := i
quoted := false
for {
if i >= len(buf) {
break
}
for i < len(buf) {
// Only escape char for a field value is a double-quote
if buf[i] == '\\' && i+1 < len(buf) && buf[i+1] == '"' {
i += 2
@ -1122,20 +1118,42 @@ func unescapeStringField(in string) string {
// NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If
// an unsupported field value (NaN) or out of range time is passed, this function returns an error.
func NewPoint(name string, tags Tags, fields Fields, time time.Time) (Point, error) {
func NewPoint(name string, tags Tags, fields Fields, t time.Time) (Point, error) {
key, err := pointKey(name, tags, fields, t)
if err != nil {
return nil, err
}
return &point{
key: key,
time: t,
fields: fields.MarshalBinary(),
}, nil
}
// pointKey checks some basic requirements for valid points, and returns the
// key, along with an possible error
func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte, error) {
if len(fields) == 0 {
return nil, ErrPointMustHaveAField
}
if !time.IsZero() {
if err := CheckTime(time); err != nil {
if !t.IsZero() {
if err := CheckTime(t); err != nil {
return nil, err
}
}
for key, value := range fields {
if fv, ok := value.(float64); ok {
switch value := value.(type) {
case float64:
// Ensure the caller validates and handles invalid field values
if math.IsNaN(fv) {
if math.IsNaN(value) {
return nil, fmt.Errorf("NaN is an unsupported value for field %s", key)
}
case float32:
// Ensure the caller validates and handles invalid field values
if math.IsNaN(float64(value)) {
return nil, fmt.Errorf("NaN is an unsupported value for field %s", key)
}
}
@ -1144,16 +1162,12 @@ func NewPoint(name string, tags Tags, fields Fields, time time.Time) (Point, err
}
}
key := MakeKey([]byte(name), tags)
key := MakeKey([]byte(measurement), tags)
if len(key) > MaxKeyLength {
return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength)
}
return &point{
key: key,
time: time,
fields: fields.MarshalBinary(),
}, nil
return key, nil
}
// NewPointFromBytes returns a new Point from a marshalled Point.
@ -1391,47 +1405,40 @@ func (p *point) UnixNano() int64 {
return p.Time().UnixNano()
}
func (p *point) SplitN(size int) ([]Point, error) {
if len(p.String()) <= size {
return []Point{p}, nil
func (p *point) Split(size int) []Point {
if p.time.IsZero() || len(p.String()) <= size {
return []Point{p}
}
points := []Point{}
tags := p.Tags()
fields := make(Fields)
// key string, timestamp string, spaces
size -= len(p.key) + len(strconv.FormatInt(p.time.UnixNano(), 10)) + 2
for k, v := range p.Fields() {
fields[k] = v
next, err := NewPoint(p.Name(), tags, fields, p.Time())
if err != nil {
return []Point{}, fmt.Errorf("failed to create new point: %s", err)
var points []Point
var start, cur int
for cur < len(p.fields) {
end, _ := scanTo(p.fields, cur, '=')
end, _ = scanFieldValue(p.fields, end+1)
if cur > start && end-start > size {
points = append(points, &point{
key: p.key,
time: p.time,
fields: p.fields[start : cur-1],
})
start = cur
}
if len(next.String()) > size {
if len(fields) == 1 {
points = append(points, next)
fields = make(Fields)
} else {
delete(fields, k)
pt, err := NewPoint(p.Name(), tags, fields, p.Time())
if err != nil {
return []Point{}, fmt.Errorf("failed to create new point: %s", err)
}
points = append(points, pt)
fields = Fields{k: v}
}
}
cur = end + 1
}
if len(fields) != 0 {
pt, err := NewPoint(p.Name(), tags, fields, p.Time())
if err != nil {
return []Point{}, fmt.Errorf("failed to create new point: %s", err)
}
points = append(points, pt)
}
points = append(points, &point{
key: p.key,
time: p.time,
fields: p.fields[start:],
})
return points, nil
return points
}
// Tag represents a single key/value tag pair.
@ -1445,6 +1452,9 @@ type Tags []Tag
// NewTags returns a new Tags from a map.
func NewTags(m map[string]string) Tags {
if len(m) == 0 {
return nil
}
a := make(Tags, 0, len(m))
for k, v := range m {
a = append(a, Tag{Key: []byte(k), Value: []byte(v)})
@ -1643,62 +1653,76 @@ func newFieldsFromBinary(buf []byte) Fields {
// represenation
// NOTE: uint64 is specifically not supported due to potential overflow when we decode
// again later to an int64
// NOTE2: uint is accepted, and may be 64 bits, and is for some reason accepted...
func (p Fields) MarshalBinary() []byte {
b := []byte{}
keys := make([]string, len(p))
i := 0
var b []byte
keys := make([]string, 0, len(p))
for k := range p {
keys[i] = k
i++
keys = append(keys, k)
}
// Not really necessary, can probably be removed.
sort.Strings(keys)
for _, k := range keys {
v := p[k]
for i, k := range keys {
if i > 0 {
b = append(b, ',')
}
b = appendField(b, k, p[k])
}
return b
}
func appendField(b []byte, k string, v interface{}) []byte {
b = append(b, []byte(escape.String(k))...)
b = append(b, '=')
switch t := v.(type) {
case int:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case int8:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case int16:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case int32:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case int64:
b = append(b, []byte(strconv.FormatInt(t, 10))...)
b = append(b, 'i')
case uint:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case uint8:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case uint16:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case uint32:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case float32:
val := []byte(strconv.FormatFloat(float64(t), 'f', -1, 32))
b = append(b, val...)
// check popular types first
switch v := v.(type) {
case float64:
val := []byte(strconv.FormatFloat(t, 'f', -1, 64))
b = append(b, val...)
case bool:
b = append(b, []byte(strconv.FormatBool(t))...)
case []byte:
b = append(b, t...)
b = strconv.AppendFloat(b, v, 'f', -1, 64)
case int64:
b = strconv.AppendInt(b, v, 10)
b = append(b, 'i')
case string:
b = append(b, '"')
b = append(b, []byte(EscapeStringField(t))...)
b = append(b, []byte(EscapeStringField(v))...)
b = append(b, '"')
case bool:
b = strconv.AppendBool(b, v)
case int32:
b = strconv.AppendInt(b, int64(v), 10)
b = append(b, 'i')
case int16:
b = strconv.AppendInt(b, int64(v), 10)
b = append(b, 'i')
case int8:
b = strconv.AppendInt(b, int64(v), 10)
b = append(b, 'i')
case int:
b = strconv.AppendInt(b, int64(v), 10)
b = append(b, 'i')
case uint32:
b = strconv.AppendInt(b, int64(v), 10)
b = append(b, 'i')
case uint16:
b = strconv.AppendInt(b, int64(v), 10)
b = append(b, 'i')
case uint8:
b = strconv.AppendInt(b, int64(v), 10)
b = append(b, 'i')
// TODO: 'uint' should be considered just as "dangerous" as a uint64,
// perhaps the value should be checked and capped at MaxInt64? We could
// then include uint64 as an accepted value
case uint:
b = strconv.AppendInt(b, int64(v), 10)
b = append(b, 'i')
case float32:
b = strconv.AppendFloat(b, float64(v), 'f', -1, 32)
case []byte:
b = append(b, v...)
case nil:
// skip
default:
@ -1708,11 +1732,7 @@ func (p Fields) MarshalBinary() []byte {
b = append(b, '"')
}
b = append(b, ',')
}
if len(b) > 0 {
return b[0 : len(b)-1]
}
return b
}

View File

@ -16,6 +16,14 @@ import (
var (
tags = models.NewTags(map[string]string{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"})
fields = models.Fields{
"int64": int64(math.MaxInt64),
"uint32": uint32(math.MaxUint32),
"string": "String field that has a decent length, probably some log message or something",
"boolean": false,
"float64-tiny": math.SmallestNonzeroFloat64,
"float64-large": math.MaxFloat64,
}
maxFloat64 = strconv.FormatFloat(math.MaxFloat64, 'f', 1, 64)
minFloat64 = strconv.FormatFloat(-math.MaxFloat64, 'f', 1, 64)
)
@ -35,6 +43,15 @@ func BenchmarkMarshal(b *testing.B) {
}
}
var p models.Point
func BenchmarkNewPoint(b *testing.B) {
ts := time.Now()
for i := 0; i < b.N; i++ {
p, _ = models.NewPoint("measurement", tags, fields, ts)
}
}
func BenchmarkParsePointNoTags(b *testing.B) {
line := `cpu value=1i 1000000000`
for i := 0; i < b.N; i++ {