Merge pull request #7305 from influxdata/jl-udp-split

UDP Client: Split large points
pull/7327/head
joelegasse 2016-09-23 15:30:21 -04:00 committed by GitHub
commit c2315b208c
7 changed files with 457 additions and 168 deletions

View File

@ -12,6 +12,7 @@
- [#7136](https://github.com/influxdata/influxdb/pull/7136): Update jwt-go dependency to version 3.
- [#6962](https://github.com/influxdata/influxdb/issues/6962): Support ON and use default database for SHOW commands.
- [#7268](https://github.com/influxdata/influxdb/pull/7268): More man pages for the other tools we package and compress man pages fully.
- [#7305](https://github.com/influxdata/influxdb/pull/7305): UDP Client: Split large points. Thanks @vlasad
### Bugfixes

View File

@ -255,6 +255,28 @@ func WriteUDP() {
}
```
### Point Splitting
The UDP client now supports splitting single points that exceed the configured
payload size. The logic for processing each point is listed here, starting with
an empty payload.
1. If adding the point to the current (non-empty) payload would exceed the
configured size, send the current payload. Otherwise, add it to the current
payload.
1. If the point is smaller than the configured size, add it to the payload.
1. If the point has no timestamp, just try to send the entire point as a single
UDP payload, and process the next point.
1. Since the point has a timestamp, re-use the existing measurement name,
tagset, and timestamp and create multiple new points by splitting up the
fields. The per-point length will be kept close to the configured size,
staying under it if possible. This does mean that one large field, maybe a
long string, could be sent as a larger-than-configured payload.
The above logic attempts to respect configured payload sizes, but not sacrifice
any data integrity. Points without a timestamp can't be split, as that may
cause fields to have differing timestamps when processed by the server.
## Go Docs
Please refer to

View File

@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"time"
@ -15,12 +14,6 @@ import (
"github.com/influxdata/influxdb/models"
)
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
const (
UDPPayloadSize = 512
)
// HTTPConfig is the config data needed to create an HTTP Client
type HTTPConfig struct {
// Addr should be of the form "http://host:port"
@ -48,17 +41,6 @@ type HTTPConfig struct {
TLSConfig *tls.Config
}
// UDPConfig is the config data needed to create a UDP Client
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPBufferSize.
PayloadSize int
}
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct
type BatchPointsConfig struct {
// Precision is the write precision of the points, defaults to "ns"
@ -76,7 +58,8 @@ type BatchPointsConfig struct {
// Client is a client interface for writing & querying the database
type Client interface {
// Ping checks that status of cluster
// Ping checks that status of cluster, and will always return 0 time and no
// error for UDP clients
Ping(timeout time.Duration) (time.Duration, string, error)
// Write takes a BatchPoints object and writes all Points to InfluxDB.
@ -177,42 +160,6 @@ func (c *client) Close() error {
return nil
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config.
func NewUDPClient(conf UDPConfig) (Client, error) {
var udpAddr *net.UDPAddr
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
payloadSize := conf.PayloadSize
if payloadSize == 0 {
payloadSize = UDPPayloadSize
}
return &udpclient{
conn: conn,
payloadSize: payloadSize,
}, nil
}
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
}
// client is safe for concurrent use as the fields are all read-only
// once the client is instantiated.
type client struct {
@ -226,11 +173,6 @@ type client struct {
transport *http.Transport
}
type udpclient struct {
conn *net.UDPConn
payloadSize int
}
// BatchPoints is an interface into a batched grouping of points to write into
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
// batch for each goroutine.
@ -405,31 +347,6 @@ func NewPointFrom(pt models.Point) *Point {
return &Point{pt: pt}
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b bytes.Buffer
var d time.Duration
d, _ = time.ParseDuration("1" + bp.Precision())
for _, p := range bp.Points() {
pointstring := p.pt.RoundedString(d) + "\n"
// Write and reset the buffer if we reach the max size
if b.Len()+len(pointstring) >= uc.payloadSize {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
return err
}
b.Reset()
}
if _, err := b.WriteString(pointstring); err != nil {
return err
}
}
_, err := uc.conn.Write(b.Bytes())
return err
}
func (c *client) Write(bp BatchPoints) error {
var b bytes.Buffer
@ -532,10 +449,6 @@ type Result struct {
Err string `json:"error,omitempty"`
}
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
// Query sends a command to the server and returns the Response
func (c *client) Query(q Query) (*Response, error) {
u := c.url

View File

@ -72,6 +72,68 @@ func TestUDPClient_BadAddr(t *testing.T) {
}
}
// TestUDPClient_Batches verifies that Write flushes a datagram whenever the
// next point would push the payload past payloadSize: nine small points with
// a 20-byte limit must go out as five separate writes (two points per write,
// plus the final partial batch).
func TestUDPClient_Batches(t *testing.T) {
	var recorder writeLogger
	client := udpclient{conn: &recorder, payloadSize: 20} // should allow for two points per batch

	// expected point should look like this: "cpu a=1i"
	point, _ := NewPoint("cpu", nil, map[string]interface{}{"a": 1}, time.Time{})

	batch, _ := NewBatchPoints(BatchPointsConfig{})
	for i := 0; i < 9; i++ {
		batch.AddPoint(point)
	}

	if err := client.Write(batch); err != nil {
		t.Fatalf("Unexpected error during Write: %v", err)
	}

	if len(recorder.writes) != 5 {
		t.Errorf("Mismatched write count: got %v, exp %v", len(recorder.writes), 5)
	}
}
// TestUDPClient_Split verifies that one oversized, timestamped point is split
// into a separate datagram per field when payloadSize cannot hold even a
// single field pair.
func TestUDPClient_Split(t *testing.T) {
	var recorder writeLogger
	client := udpclient{conn: &recorder, payloadSize: 1} // force one field per point

	fieldSet := map[string]interface{}{"a": 1, "b": 2, "c": 3, "d": 4}
	point, _ := NewPoint("cpu", nil, fieldSet, time.Unix(1, 0))

	batch, _ := NewBatchPoints(BatchPointsConfig{})
	batch.AddPoint(point)

	if err := client.Write(batch); err != nil {
		t.Fatalf("Unexpected error during Write: %v", err)
	}

	if len(recorder.writes) != len(fieldSet) {
		t.Errorf("Mismatched write count: got %v, exp %v", len(recorder.writes), len(fieldSet))
	}
}
// writeLogger is an io.WriteCloser that records a private copy of every
// payload written to it, so tests can inspect the individual UDP "packets"
// a udpclient produced.
type writeLogger struct {
	writes [][]byte
}

// Write logs a copy of b; copying ensures later buffer reuse by the caller
// cannot mutate already-recorded payloads.
func (w *writeLogger) Write(b []byte) (int, error) {
	dup := make([]byte, len(b))
	copy(dup, b)
	w.writes = append(w.writes, dup)
	return len(b), nil
}

// Close is a no-op, present only to satisfy io.WriteCloser.
func (w *writeLogger) Close() error { return nil }
func TestClient_Query(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response

112
client/v2/udp.go Normal file
View File

@ -0,0 +1,112 @@
package client
import (
"fmt"
"io"
"net"
"time"
)
const (
	// UDPPayloadSize is a reasonable default payload size for UDP packets that
	// could be travelling over the internet.
	UDPPayloadSize = 512
)

// UDPConfig is the config data needed to create a UDP Client.
type UDPConfig struct {
	// Addr should be of the form "host:port"
	// or "[ipv6-host%zone]:port".
	Addr string

	// PayloadSize is the maximum size of a UDP client message, optional.
	// Tune this based on your network. Defaults to UDPPayloadSize.
	PayloadSize int
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config. It resolves and dials conf.Addr up front,
// so address errors surface here rather than on the first Write.
func NewUDPClient(conf UDPConfig) (Client, error) {
	addr, err := net.ResolveUDPAddr("udp", conf.Addr)
	if err != nil {
		return nil, err
	}

	conn, err := net.DialUDP("udp", nil, addr)
	if err != nil {
		return nil, err
	}

	// Fall back to the package default when no payload size was configured.
	size := conf.PayloadSize
	if size == 0 {
		size = UDPPayloadSize
	}

	return &udpclient{
		conn:        conn,
		payloadSize: size,
	}, nil
}
// Close releases the udpclient's resources: it closes the underlying
// connection. The client must not be used after Close.
func (uc *udpclient) Close() error {
	return uc.conn.Close()
}

// udpclient implements Client over a datagram connection. conn is typed as
// io.WriteCloser rather than *net.UDPConn so tests can substitute an
// in-memory recorder.
type udpclient struct {
	conn        io.WriteCloser
	payloadSize int // maximum bytes per datagram; see UDPConfig.PayloadSize
}
// Write sends the points in bp over UDP, packing as many whole points as fit
// within payloadSize into each datagram. A timestamped point whose serialized
// form exceeds payloadSize is split by field (see Point.Split); a point
// without a timestamp is never split and may produce an oversized datagram.
// Timestamps are rounded to the batch's precision before sizing.
func (uc *udpclient) Write(bp BatchPoints) error {
	var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
	var d, _ = time.ParseDuration("1" + bp.Precision())

	// A mid-batch send failure is remembered rather than returned
	// immediately, so the remaining points still get a chance to go out.
	var delayedError error

	// checkBuffer flushes the pending datagram if appending n more bytes
	// would push it past payloadSize.
	var checkBuffer = func(n int) {
		if len(b) > 0 && len(b)+n > uc.payloadSize {
			if _, err := uc.conn.Write(b); err != nil {
				delayedError = err
			}
			b = b[:0]
		}
	}

	for _, p := range bp.Points() {
		p.pt.Round(d)
		pointSize := p.pt.StringSize() + 1 // include newline in size
		checkBuffer(pointSize)

		// Points that fit, and untimestamped points (which cannot be split
		// without risking divergent server-assigned timestamps), are
		// appended whole.
		if p.Time().IsZero() || pointSize <= uc.payloadSize {
			b = p.pt.AppendString(b)
			b = append(b, '\n')
			continue
		}

		// Oversized, timestamped point: split into several points sharing
		// the same key and timestamp, each targeted at payloadSize.
		points := p.pt.Split(uc.payloadSize - 1) // account for newline character
		for _, sp := range points {
			checkBuffer(sp.StringSize() + 1)
			b = sp.AppendString(b)
			b = append(b, '\n')
		}
	}

	// Flush whatever remains in the buffer.
	if len(b) > 0 {
		if _, err := uc.conn.Write(b); err != nil {
			return err
		}
	}
	return delayedError
}
// Query is not supported over UDP; it always returns an error.
func (uc *udpclient) Query(q Query) (*Response, error) {
	return nil, fmt.Errorf("Querying via UDP is not supported")
}

// Ping is a no-op for UDP clients: the transport is connectionless, so there
// is no server round-trip to time and no version to report. It always
// returns a zero duration, an empty version, and no error.
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
	return 0, "", nil
}

View File

@ -75,6 +75,21 @@ type Point interface {
// is a timestamp associated with the point, then it will be rounded to the
// given duration
RoundedString(d time.Duration) string
// Split will attempt to return multiple points with the same timestamp whose
// string representations are no longer than size. A point with a single
// field, or one without a timestamp, may exceed the requested size.
Split(size int) []Point
// Round will round the timestamp of the point to the given duration
Round(d time.Duration)
// StringSize returns the length of the string that would be returned by String()
StringSize() int
// AppendString appends the result of String() to the provided buffer and returns
// the result, potentially reducing string allocations
AppendString(buf []byte) []byte
}
// Points represents a sortable list of points by timestamp.
@ -991,11 +1006,7 @@ func scanTagValue(buf []byte, i int) (int, []byte) {
func scanFieldValue(buf []byte, i int) (int, []byte) {
start := i
quoted := false
for {
if i >= len(buf) {
break
}
for i < len(buf) {
// Only escape char for a field value is a double-quote
if buf[i] == '\\' && i+1 < len(buf) && buf[i+1] == '"' {
i += 2
@ -1117,20 +1128,42 @@ func unescapeStringField(in string) string {
// NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If
// an unsupported field value (NaN) or out of range time is passed, this function returns an error.
func NewPoint(name string, tags Tags, fields Fields, time time.Time) (Point, error) {
func NewPoint(name string, tags Tags, fields Fields, t time.Time) (Point, error) {
key, err := pointKey(name, tags, fields, t)
if err != nil {
return nil, err
}
return &point{
key: key,
time: t,
fields: fields.MarshalBinary(),
}, nil
}
// pointKey checks some basic requirements for valid points, and returns the
// key, along with a possible error
func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte, error) {
if len(fields) == 0 {
return nil, ErrPointMustHaveAField
}
if !time.IsZero() {
if err := CheckTime(time); err != nil {
if !t.IsZero() {
if err := CheckTime(t); err != nil {
return nil, err
}
}
for key, value := range fields {
if fv, ok := value.(float64); ok {
switch value := value.(type) {
case float64:
// Ensure the caller validates and handles invalid field values
if math.IsNaN(fv) {
if math.IsNaN(value) {
return nil, fmt.Errorf("NaN is an unsupported value for field %s", key)
}
case float32:
// Ensure the caller validates and handles invalid field values
if math.IsNaN(float64(value)) {
return nil, fmt.Errorf("NaN is an unsupported value for field %s", key)
}
}
@ -1139,16 +1172,12 @@ func NewPoint(name string, tags Tags, fields Fields, time time.Time) (Point, err
}
}
key := MakeKey([]byte(name), tags)
key := MakeKey([]byte(measurement), tags)
if len(key) > MaxKeyLength {
return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength)
}
return &point{
key: key,
time: time,
fields: fields.MarshalBinary(),
}, nil
return key, nil
}
// NewPointFromBytes returns a new Point from a marshalled Point.
@ -1215,6 +1244,11 @@ func (p *point) SetTime(t time.Time) {
p.time = t
}
// Round implements Point.Round: it rounds the point's timestamp to the
// nearest multiple of d, mutating the point in place.
func (p *point) Round(d time.Duration) {
	p.time = p.time.Round(d)
}
// Tags returns the tag set for the point
func (p *point) Tags() Tags {
return parseTags(p.key)
@ -1313,6 +1347,41 @@ func (p *point) String() string {
return string(p.Key()) + " " + string(p.fields) + " " + strconv.FormatInt(p.UnixNano(), 10)
}
// AppendString implements Point.AppendString. It serializes the point in
// line-protocol form ("key fields[ timestamp]") directly into buf and
// returns the (possibly reallocated) slice, avoiding the intermediate
// string allocation that String() incurs. The timestamp is omitted when it
// is the zero time.
func (p *point) AppendString(buf []byte) []byte {
	buf = append(buf, p.key...)
	buf = append(buf, ' ')
	buf = append(buf, p.fields...)

	if !p.time.IsZero() {
		buf = append(buf, ' ')
		buf = strconv.AppendInt(buf, p.UnixNano(), 10)
	}

	return buf
}
// StringSize returns the exact number of bytes String() would produce,
// without building the string: key, a space, and the fields, plus — when a
// timestamp is set — another space and the decimal digits of the Unix-nano
// timestamp (including a possible minus sign).
func (p *point) StringSize() int {
	n := len(p.key) + 1 + len(p.fields) // key, separating space, fields

	if p.time.IsZero() {
		return n
	}

	// Count decimal digits of the timestamp without formatting it.
	ts := p.UnixNano()
	digits := 1 // even "0" has one digit
	if ts < 0 {
		digits++ // account for the minus sign, then negate
		ts = -ts
	}
	for ts > 9 { // the first digit is already counted
		ts /= 10
		digits++
	}

	return n + 1 + digits // space plus timestamp digits
}
func (p *point) MarshalBinary() ([]byte, error) {
tb, err := p.time.MarshalBinary()
if err != nil {
@ -1386,6 +1455,42 @@ func (p *point) UnixNano() int64 {
return p.Time().UnixNano()
}
// Split implements Point.Split: it partitions the point's fields into
// several points that share the same key and timestamp, aiming to keep each
// resulting point's string form within size bytes. A point with no
// timestamp, or one that already fits, is returned unchanged as a
// single-element slice. A single very large field may still exceed size.
func (p *point) Split(size int) []Point {
	if p.time.IsZero() || len(p.String()) <= size {
		return []Point{p}
	}

	// Budget for the fields only: subtract the key string, timestamp string,
	// and the two separating spaces from the allowed size.
	size -= len(p.key) + len(strconv.FormatInt(p.time.UnixNano(), 10)) + 2

	var points []Point
	var start, cur int

	for cur < len(p.fields) {
		// Advance end past one "key=value" pair.
		end, _ := scanTo(p.fields, cur, '=')
		end, _ = scanFieldValue(p.fields, end+1)

		// If including this pair would overflow the budget, emit the pairs
		// accumulated so far; cur-1 appears to exclude the comma separating
		// the emitted pairs from this one.
		if cur > start && end-start > size {
			points = append(points, &point{
				key:    p.key,
				time:   p.time,
				fields: p.fields[start : cur-1],
			})
			start = cur
		}

		cur = end + 1 // step over the comma between field pairs
	}

	// Whatever remains (always at least one pair) becomes the final point.
	points = append(points, &point{
		key:    p.key,
		time:   p.time,
		fields: p.fields[start:],
	})

	return points
}
// Tag represents a single key/value tag pair.
type Tag struct {
Key []byte
@ -1397,6 +1502,9 @@ type Tags []Tag
// NewTags returns a new Tags from a map.
func NewTags(m map[string]string) Tags {
if len(m) == 0 {
return nil
}
a := make(Tags, 0, len(m))
for k, v := range m {
a = append(a, Tag{Key: []byte(k), Value: []byte(v)})
@ -1595,76 +1703,86 @@ func newFieldsFromBinary(buf []byte) Fields {
// representation
// NOTE: uint64 is specifically not supported due to potential overflow when we decode
// again later to an int64
// NOTE2: uint is accepted even though it may be 64 bits wide, carrying the same overflow risk
func (p Fields) MarshalBinary() []byte {
b := []byte{}
keys := make([]string, len(p))
i := 0
var b []byte
keys := make([]string, 0, len(p))
for k := range p {
keys[i] = k
i++
keys = append(keys, k)
}
// Not really necessary, can probably be removed.
sort.Strings(keys)
for _, k := range keys {
v := p[k]
b = append(b, []byte(escape.String(k))...)
b = append(b, '=')
switch t := v.(type) {
case int:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case int8:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case int16:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case int32:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case int64:
b = append(b, []byte(strconv.FormatInt(t, 10))...)
b = append(b, 'i')
case uint:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case uint8:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case uint16:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case uint32:
b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
b = append(b, 'i')
case float32:
val := []byte(strconv.FormatFloat(float64(t), 'f', -1, 32))
b = append(b, val...)
case float64:
val := []byte(strconv.FormatFloat(t, 'f', -1, 64))
b = append(b, val...)
case bool:
b = append(b, []byte(strconv.FormatBool(t))...)
case []byte:
b = append(b, t...)
case string:
b = append(b, '"')
b = append(b, []byte(EscapeStringField(t))...)
b = append(b, '"')
case nil:
// skip
default:
// Can't determine the type, so convert to string
b = append(b, '"')
b = append(b, []byte(EscapeStringField(fmt.Sprintf("%v", v)))...)
b = append(b, '"')
for i, k := range keys {
if i > 0 {
b = append(b, ',')
}
b = append(b, ',')
b = appendField(b, k, p[k])
}
if len(b) > 0 {
return b[0 : len(b)-1]
return b
}
// appendField serializes a single field as "key=value" in line-protocol form
// and appends it to b, returning the extended slice. Integer kinds get an
// 'i' suffix, strings are escaped and double-quoted, booleans render as
// true/false, and any unrecognized type falls back to its quoted
// fmt.Sprintf("%v") representation. A nil value contributes nothing after
// "key=".
func appendField(b []byte, k string, v interface{}) []byte {
	b = append(b, escape.String(k)...)
	b = append(b, '=')

	// Dispatch on the concrete type; the most common kinds come first.
	switch val := v.(type) {
	case float64:
		b = strconv.AppendFloat(b, val, 'f', -1, 64)
	case int64:
		b = strconv.AppendInt(b, val, 10)
		b = append(b, 'i')
	case string:
		b = append(b, '"')
		b = append(b, EscapeStringField(val)...)
		b = append(b, '"')
	case bool:
		b = strconv.AppendBool(b, val)
	case int32:
		b = strconv.AppendInt(b, int64(val), 10)
		b = append(b, 'i')
	case int16:
		b = strconv.AppendInt(b, int64(val), 10)
		b = append(b, 'i')
	case int8:
		b = strconv.AppendInt(b, int64(val), 10)
		b = append(b, 'i')
	case int:
		b = strconv.AppendInt(b, int64(val), 10)
		b = append(b, 'i')
	case uint32:
		b = strconv.AppendInt(b, int64(val), 10)
		b = append(b, 'i')
	case uint16:
		b = strconv.AppendInt(b, int64(val), 10)
		b = append(b, 'i')
	case uint8:
		b = strconv.AppendInt(b, int64(val), 10)
		b = append(b, 'i')
	// TODO: 'uint' should be considered just as "dangerous" as a uint64,
	// perhaps the value should be checked and capped at MaxInt64? We could
	// then include uint64 as an accepted value
	case uint:
		b = strconv.AppendInt(b, int64(val), 10)
		b = append(b, 'i')
	case float32:
		b = strconv.AppendFloat(b, float64(val), 'f', -1, 32)
	case []byte:
		b = append(b, val...)
	case nil:
		// deliberately emit nothing for a nil value
	default:
		// Can't determine the type, so convert to string
		b = append(b, '"')
		b = append(b, EscapeStringField(fmt.Sprintf("%v", v))...)
		b = append(b, '"')
	}

	return b
}

View File

@ -15,7 +15,15 @@ import (
)
var (
tags = models.NewTags(map[string]string{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"})
tags = models.NewTags(map[string]string{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"})
fields = models.Fields{
"int64": int64(math.MaxInt64),
"uint32": uint32(math.MaxUint32),
"string": "String field that has a decent length, probably some log message or something",
"boolean": false,
"float64-tiny": float64(math.SmallestNonzeroFloat64),
"float64-large": float64(math.MaxFloat64),
}
maxFloat64 = strconv.FormatFloat(math.MaxFloat64, 'f', 1, 64)
minFloat64 = strconv.FormatFloat(-math.MaxFloat64, 'f', 1, 64)
)
@ -35,6 +43,59 @@ func BenchmarkMarshal(b *testing.B) {
}
}
// TestPoint_StringSize checks, across the full point cube, that StringSize
// reports exactly the length of the string String() produces.
func TestPoint_StringSize(t *testing.T) {
	testPoint_cube(t, func(p models.Point) {
		s := p.String()
		l := p.StringSize()

		if l != len(s) {
			t.Errorf("Incorrect length for %q. got %v, exp %v", s, l, len(s))
		}
	})
}
// TestPoint_AppendString checks, across the full point cube, that
// AppendString produces byte-for-byte the same serialization as String().
func TestPoint_AppendString(t *testing.T) {
	testPoint_cube(t, func(p models.Point) {
		got := p.AppendString(nil)
		exp := []byte(p.String())

		if string(got) != string(exp) {
			t.Errorf("AppendString() didn't match String(): got %v, exp %v", got, exp)
		}
	})
}
// testPoint_cube runs f against the cross product of representative tag
// sets, field sets, and timestamps: points with no/one/many tags, one/two/
// many fields, and zero, epoch, negative, positive, and current timestamps.
func testPoint_cube(t *testing.T, f func(p models.Point)) {
	// heard of a table-driven test? let's make a cube-driven test...
	// Use keyed fields in the Tag literal so `go vet` is satisfied and the
	// test survives any reordering of models.Tag's fields.
	tagList := []models.Tags{nil, {models.Tag{Key: []byte("foo"), Value: []byte("bar")}}, tags}
	fieldList := []models.Fields{{"a": 42.0}, {"a": 42, "b": "things"}, fields}
	timeList := []time.Time{time.Time{}, time.Unix(0, 0), time.Unix(-34526, 0), time.Unix(231845, 0), time.Now()}

	for _, tagSet := range tagList {
		for _, fieldSet := range fieldList {
			for _, pointTime := range timeList {
				p, err := models.NewPoint("test", tagSet, fieldSet, pointTime)
				if err != nil {
					t.Errorf("unexpected error creating point: %v", err)
					continue
				}

				f(p)
			}
		}
	}
}
// p is a package-level sink for the benchmark result; assigning to it keeps
// the compiler from optimizing the benchmarked call away.
var p models.Point

// BenchmarkNewPoint measures point construction (key building, field
// marshalling, validation) using the package-level tags and fields fixtures.
func BenchmarkNewPoint(b *testing.B) {
	ts := time.Now()
	for i := 0; i < b.N; i++ {
		p, _ = models.NewPoint("measurement", tags, fields, ts)
	}
}
func BenchmarkParsePointNoTags(b *testing.B) {
line := `cpu value=1i 1000000000`
for i := 0; i < b.N; i++ {