models: Add FieldIterator type

The FieldIterator is used to scan over the fields of a point, providing
information, and delaying parsing/decoding the value until it is needed.
This change uses this new type to avoid the allocation of a map for the
fields which is then thrown away as soon as the points get converted
into columns within the datastore.
pull/7389/head
Joe LeGasse 2016-09-21 13:12:09 -04:00 committed by Jason Wilder
parent aeb84b3737
commit 743946fafb
11 changed files with 578 additions and 128 deletions

View File

@ -117,12 +117,7 @@ func NewShardMapping() *ShardMapping {
// MapPoint maps a point to shard
func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
points, ok := s.Points[shardInfo.ID]
if !ok {
s.Points[shardInfo.ID] = []models.Point{p}
} else {
s.Points[shardInfo.ID] = append(points, p)
}
s.Points[shardInfo.ID] = append(s.Points[shardInfo.ID], p)
s.Shards[shardInfo.ID] = shardInfo
}
@ -270,8 +265,7 @@ func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistency
return err
}
// Write each shard in it's own goroutine and return as soon
// as one fails.
// Write each shard in its own goroutine and return as soon as one fails.
ch := make(chan error, len(shardMappings.Points))
for shardID, points := range shardMappings.Points {
go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {

View File

@ -6,18 +6,23 @@ import (
"unsafe"
)
// ParseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
func ParseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseInt(s, base, bitSize)
}
// ParseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func ParseFloatBytes(b []byte, bitSize int) (float64, error) {
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
s := unsafeBytesToString(b)
return strconv.ParseFloat(s, bitSize)
}
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
// It avoids the []byte -> string copy by going through unsafeBytesToString.
func parseBoolBytes(b []byte) (bool, error) {
	return strconv.ParseBool(unsafeBytesToString(b))
}
// unsafeBytesToString converts a []byte to a string without a heap allocation.
//
// It is unsafe, and is intended to prepare input to short-lived functions

View File

@ -1,38 +1,23 @@
package models_test
package models
import (
"strconv"
"testing"
"testing/quick"
"github.com/influxdata/influxdb/models"
)
func TestParseIntBytesEquivalenceFuzz(t *testing.T) {
f := func(b []byte, base int, bitSize int) bool {
wantI, wantErr := strconv.ParseInt(string(b), base, bitSize)
gotI, gotErr := models.ParseIntBytes(b, base, bitSize)
exp, expErr := strconv.ParseInt(string(b), base, bitSize)
got, gotErr := parseIntBytes(b, base, bitSize)
pred := wantI == gotI
// error objects are heap allocated so naive equality checking
// won't work here. naive pointer dereferencing will panic
// in the case of a nil error.
if wantErr != nil && gotErr == nil {
pred = false
} else if wantErr == nil && gotErr != nil {
pred = false
} else if wantErr != nil && gotErr != nil {
if wantErr.Error() != gotErr.Error() {
pred = false
}
}
return pred
return exp == got && checkErrs(expErr, gotErr)
}
cfg := &quick.Config{
MaxCount: 10000,
}
if err := quick.Check(f, cfg); err != nil {
t.Fatal(err)
}
@ -43,29 +28,16 @@ func TestParseIntBytesValid64bitBase10EquivalenceFuzz(t *testing.T) {
f := func(n int64) bool {
buf = strconv.AppendInt(buf[:0], n, 10)
wantI, wantErr := strconv.ParseInt(string(buf), 10, 64)
gotI, gotErr := models.ParseIntBytes(buf, 10, 64)
exp, expErr := strconv.ParseInt(string(buf), 10, 64)
got, gotErr := parseIntBytes(buf, 10, 64)
pred := wantI == gotI
// error objects are heap allocated so naive equality checking
// won't work here. naive pointer dereferencing will panic
// in the case of a nil error.
if wantErr != nil && gotErr == nil {
pred = false
} else if wantErr == nil && gotErr != nil {
pred = false
} else if wantErr != nil && gotErr != nil {
if wantErr.Error() != gotErr.Error() {
pred = false
}
}
return pred
return exp == got && checkErrs(expErr, gotErr)
}
cfg := &quick.Config{
MaxCount: 10000,
}
if err := quick.Check(f, cfg); err != nil {
t.Fatal(err)
}
@ -73,29 +45,16 @@ func TestParseIntBytesValid64bitBase10EquivalenceFuzz(t *testing.T) {
func TestParseFloatBytesEquivalenceFuzz(t *testing.T) {
f := func(b []byte, bitSize int) bool {
wantI, wantErr := strconv.ParseFloat(string(b), bitSize)
gotI, gotErr := models.ParseFloatBytes(b, bitSize)
exp, expErr := strconv.ParseFloat(string(b), bitSize)
got, gotErr := parseFloatBytes(b, bitSize)
pred := wantI == gotI
// error objects are heap allocated so naive equality checking
// won't work here. naive pointer dereferencing will panic
// in the case of a nil error.
if wantErr != nil && gotErr == nil {
pred = false
} else if wantErr == nil && gotErr != nil {
pred = false
} else if wantErr != nil && gotErr != nil {
if wantErr.Error() != gotErr.Error() {
pred = false
}
}
return pred
return exp == got && checkErrs(expErr, gotErr)
}
cfg := &quick.Config{
MaxCount: 10000,
}
if err := quick.Check(f, cfg); err != nil {
t.Fatal(err)
}
@ -106,30 +65,39 @@ func TestParseFloatBytesValid64bitEquivalenceFuzz(t *testing.T) {
f := func(n float64) bool {
buf = strconv.AppendFloat(buf[:0], n, 'f', -1, 64)
wantI, wantErr := strconv.ParseFloat(string(buf), 64)
gotI, gotErr := models.ParseFloatBytes(buf, 64)
exp, expErr := strconv.ParseFloat(string(buf), 64)
got, gotErr := parseFloatBytes(buf, 64)
pred := wantI == gotI
// error objects are heap allocated so naive equality checking
// won't work here. naive pointer dereferencing will panic
// in the case of a nil error.
if wantErr != nil && gotErr == nil {
pred = false
} else if wantErr == nil && gotErr != nil {
pred = false
} else if wantErr != nil && gotErr != nil {
if wantErr.Error() != gotErr.Error() {
pred = false
}
}
return pred
return exp == got && checkErrs(expErr, gotErr)
}
cfg := &quick.Config{
MaxCount: 10000,
}
if err := quick.Check(f, cfg); err != nil {
t.Fatal(err)
}
}
// TestParseBoolBytesEquivalence checks parseBoolBytes against
// strconv.ParseBool for valid, invalid, and empty inputs, reusing one
// buffer to mirror how callers avoid per-iteration allocations.
func TestParseBoolBytesEquivalence(t *testing.T) {
	var buf []byte
	for _, s := range []string{"1", "t", "T", "TRUE", "true", "True", "0", "f", "F", "FALSE", "false", "False", "fail", "TrUe", "FAlSE", "numbers", ""} {
		buf = append(buf[:0], s...)
		exp, expErr := strconv.ParseBool(s)
		got, gotErr := parseBoolBytes(buf)
		if got != exp || !checkErrs(expErr, gotErr) {
			t.Errorf("Failed to parse boolean value %q correctly: wanted (%t, %v), got (%t, %v)", s, exp, expErr, got, gotErr)
		}
	}
}
// checkErrs reports whether two errors are equivalent for test purposes:
// either both are nil, or both are non-nil with identical Error() strings.
// Direct == comparison is avoided because error values from different
// parse paths are distinct heap objects even when their messages match.
func checkErrs(a, b error) bool {
	switch {
	case a == nil && b == nil:
		return true
	case a == nil || b == nil:
		return false
	default:
		return a.Error() == b.Error()
	}
}

View File

@ -89,6 +89,33 @@ type Point interface {
// AppendString appends the result of String() to the provided buffer and returns
// the result, potentially reducing string allocations
AppendString(buf []byte) []byte
// FieldIterator returns a FieldIterator that can be used to traverse the
// fields of a point without constructing the in-memory map
FieldIterator() FieldIterator
}
// FieldType identifies the type of a field's value as detected while
// scanning the raw line-protocol bytes.
type FieldType int

const (
	// Integer is an int64 field value (line-protocol 'i' suffix).
	Integer FieldType = iota
	// Float is a float64 field value.
	Float
	// Boolean is a bool field value.
	Boolean
	// String is a double-quoted string field value.
	String
	// Empty indicates no value is present.
	Empty
)
// FieldIterator traverses the fields of a point one at a time, decoding a
// value only when the caller asks for it, so no intermediate field map is
// allocated.
type FieldIterator interface {
	// Next advances to the next field, returning false when none remain.
	Next() bool
	// FieldKey returns the key of the current field.
	FieldKey() []byte
	// Type returns the detected FieldType of the current field's value.
	Type() FieldType
	// StringValue decodes the current value as a string.
	StringValue() string
	// IntegerValue decodes the current value as an int64.
	IntegerValue() int64
	// BooleanValue decodes the current value as a bool.
	BooleanValue() bool
	// FloatValue decodes the current value as a float64.
	FloatValue() float64
	// Delete removes the current field from the underlying point.
	Delete()
	// Reset restarts iteration from the first field.
	Reset()
}
// Points represents a sortable list of points by timestamp.
@ -121,6 +148,8 @@ type point struct {
// cached version of parsed name from key
cachedName string
it fieldIterator
}
const (
@ -201,7 +230,7 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin
block = block[:len(block)-1]
}
pt, err := parsePoint(block[start:len(block)], defaultTime, precision)
pt, err := parsePoint(block[start:], defaultTime, precision)
if err != nil {
failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:len(block)]), err))
} else {
@ -259,7 +288,7 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err
pt.time = defaultTime
pt.SetPrecision(precision)
} else {
ts, err := ParseIntBytes(ts, 10, 64)
ts, err := parseIntBytes(ts, 10, 64)
if err != nil {
return nil, err
}
@ -796,14 +825,14 @@ func scanNumber(buf []byte, i int) (int, error) {
// Parse the int to check bounds the number of digits could be larger than the max range
// We subtract 1 from the index to remove the `i` from our tests
if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits {
if _, err := ParseIntBytes(buf[start:i-1], 10, 64); err != nil {
if _, err := parseIntBytes(buf[start:i-1], 10, 64); err != nil {
return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err)
}
}
} else {
// Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range
if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits {
if _, err := ParseFloatBytes(buf[start:i], 10); err != nil {
if _, err := parseFloatBytes(buf[start:i], 10); err != nil {
return i, fmt.Errorf("invalid float")
}
}
@ -1637,18 +1666,18 @@ type Fields map[string]interface{}
func parseNumber(val []byte) (interface{}, error) {
if val[len(val)-1] == 'i' {
val = val[:len(val)-1]
return ParseIntBytes(val, 10, 64)
return parseIntBytes(val, 10, 64)
}
for i := 0; i < len(val); i++ {
// If there is a decimal or an N (NaN), I (Inf), parse as float
if val[i] == '.' || val[i] == 'N' || val[i] == 'n' || val[i] == 'I' || val[i] == 'i' || val[i] == 'e' {
return ParseFloatBytes(val, 64)
return parseFloatBytes(val, 64)
}
if val[i] < '0' && val[i] > '9' {
return string(val), nil
}
}
return ParseFloatBytes(val, 64)
return parseFloatBytes(val, 64)
}
func newFieldsFromBinary(buf []byte) Fields {
@ -1698,6 +1727,122 @@ func newFieldsFromBinary(buf []byte) Fields {
return fields
}
// FieldIterator returns an iterator over the point's raw field bytes. The
// point itself is the iterator, so only one traversal can be in flight at a
// time; any previous iteration state is cleared via Reset.
func (p *point) FieldIterator() FieldIterator {
	p.Reset()
	return p
}
// fieldIterator holds the cursor state that point embeds to implement
// FieldIterator over its raw field bytes.
type fieldIterator struct {
	start, end int // byte offsets of the current field within point.fields

	key, keybuf []byte // current key; keybuf is scratch space for unescaping

	valueBuf []byte // raw bytes of the current value

	fieldType FieldType // type detected for the current value
}
// Next advances to the next field in the point's raw field block, scanning
// the key and value and classifying the value's type from its leading (and,
// for integers, trailing) bytes. It returns false once all fields have been
// consumed.
func (p *point) Next() bool {
	// Resume where the previous field ended.
	p.it.start = p.it.end
	if p.it.start >= len(p.fields) {
		return false
	}

	// Scan the key up to '=' and unescape it only when needed, reusing the
	// iterator's scratch buffer to avoid an allocation.
	p.it.end, p.it.key = scanTo(p.fields, p.it.start, '=')
	if escape.IsEscaped(p.it.key) {
		p.it.keybuf = escape.AppendUnescaped(p.it.keybuf[:0], p.it.key)
		p.it.key = p.it.keybuf
	}

	// Scan the raw value, then step past the trailing separator.
	p.it.end, p.it.valueBuf = scanFieldValue(p.fields, p.it.end+1)
	p.it.end++

	if len(p.it.valueBuf) == 0 {
		p.it.fieldType = Empty
		return true
	}

	c := p.it.valueBuf[0]

	if c == '"' {
		p.it.fieldType = String
		return true
	}

	if strings.IndexByte(`0123456789-.nNiI`, c) >= 0 {
		if p.it.valueBuf[len(p.it.valueBuf)-1] == 'i' {
			p.it.fieldType = Integer
			// Strip the 'i' suffix so IntegerValue can parse the digits
			// directly.
			p.it.valueBuf = p.it.valueBuf[:len(p.it.valueBuf)-1]
		} else {
			p.it.fieldType = Float
		}
		return true
	}

	// to keep the same behavior that currently exists, default to boolean
	p.it.fieldType = Boolean
	return true
}
// FieldKey returns the (unescaped) key of the current field.
func (p *point) FieldKey() []byte {
	return p.it.key
}
// Type returns the FieldType detected by the last call to Next.
func (p *point) Type() FieldType {
	return p.it.fieldType
}
// StringValue decodes the current value as a string, stripping the
// surrounding double quotes and unescaping the contents. It must only be
// called when Type() == String; the slice expression assumes the value is
// quoted.
func (p *point) StringValue() string {
	return unescapeStringField(string(p.it.valueBuf[1 : len(p.it.valueBuf)-1]))
}
// IntegerValue decodes the current value as an int64. Next has already
// stripped the 'i' suffix from valueBuf. It panics if the bytes do not
// parse, matching the behavior of the map-based field decoding path.
func (p *point) IntegerValue() int64 {
	n, err := parseIntBytes(p.it.valueBuf, 10, 64)
	if err != nil {
		panic(fmt.Sprintf("unable to parse integer value %q: %v", p.it.valueBuf, err))
	}
	return n
}
// BooleanValue decodes the current value as a bool. It panics if the bytes
// do not parse, matching the behavior of the map-based field decoding path.
func (p *point) BooleanValue() bool {
	b, err := parseBoolBytes(p.it.valueBuf)
	if err != nil {
		panic(fmt.Sprintf("unable to parse bool value %q: %v", p.it.valueBuf, err))
	}
	return b
}
// FloatValue decodes the current value as a float64. It panics if the bytes
// do not parse.
func (p *point) FloatValue() float64 {
	f, err := parseFloatBytes(p.it.valueBuf, 64)
	if err != nil {
		// panic because that's what the non-iterator code does
		panic(fmt.Sprintf("unable to parse floating point value %q: %v", p.it.valueBuf, err))
	}
	return f
}
// Delete removes the current field's bytes from the point's field block and
// rewinds the cursor so the next call to Next resumes with the following
// field. Calling it before any Next, or twice in a row, is a no-op.
func (p *point) Delete() {
	switch {
	case p.it.end == p.it.start:
		// Nothing consumed yet (or field already deleted): no-op.
	case p.it.end >= len(p.fields):
		// Current field is the last one: truncate it off.
		// NOTE(review): this can leave the preceding ',' as the final byte
		// of p.fields — harmless to this iterator, but confirm that other
		// consumers of p.fields tolerate a trailing separator.
		p.fields = p.fields[:p.it.start]
	case p.it.start == 0:
		// Current field is the first one: drop the leading bytes.
		p.fields = p.fields[p.it.end:]
	default:
		// Middle field: splice it out in place.
		p.fields = append(p.fields[:p.it.start], p.fields[p.it.end:]...)
	}

	// Rewind so iteration continues where the deleted field began, and
	// clear the per-field state.
	p.it.end = p.it.start
	p.it.key = nil
	p.it.valueBuf = nil
	p.it.fieldType = Empty
}
// Reset clears all iterator state so the next call to Next starts again at
// the first field.
func (p *point) Reset() {
	p.it.fieldType = Empty
	p.it.key = nil
	p.it.valueBuf = nil
	p.it.start = 0
	p.it.end = 0
}
// MarshalBinary encodes all the fields to their proper type and returns the binary
// representation
// NOTE: uint64 is specifically not supported due to potential overflow when we decode

View File

@ -1958,3 +1958,198 @@ func TestParseKeyEmpty(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
}
// TestPoint_FieldIterator_Simple verifies type detection and value decoding
// for an integer and a float field, and that the iterator terminates.
func TestPoint_FieldIterator_Simple(t *testing.T) {
	p, err := models.ParsePoints([]byte(`m v=42i,f=42 36`))
	if err != nil {
		t.Fatal(err)
	}

	if len(p) != 1 {
		t.Fatalf("wrong number of points, got %d, exp %d", len(p), 1)
	}

	fi := p[0].FieldIterator()

	if !fi.Next() {
		t.Fatal("field iterator terminated before first field")
	}

	if fi.Type() != models.Integer {
		t.Fatalf("'42i' should be an Integer, got %v", fi.Type())
	}

	if fi.IntegerValue() != 42 {
		t.Fatalf("'42i' should be 42, got %d", fi.IntegerValue())
	}

	if !fi.Next() {
		t.Fatalf("field iterator terminated before second field")
	}

	if fi.Type() != models.Float {
		t.Fatalf("'42' should be a Float, got %v", fi.Type())
	}

	if fi.FloatValue() != 42.0 {
		t.Fatalf("'42' should be %f, got %f", 42.0, fi.FloatValue())
	}

	if fi.Next() {
		t.Fatal("field iterator didn't terminate")
	}
}
// toFields drains fi into a models.Fields map, decoding each value via the
// accessor matching its reported type. It lets tests compare iterator output
// directly against Point.Fields().
func toFields(fi models.FieldIterator) models.Fields {
	m := make(models.Fields)
	for fi.Next() {
		var v interface{}
		switch fi.Type() {
		case models.Float:
			v = fi.FloatValue()
		case models.Integer:
			v = fi.IntegerValue()
		case models.String:
			v = fi.StringValue()
		case models.Boolean:
			v = fi.BooleanValue()
		case models.Empty:
			v = nil
		default:
			panic("unknown type")
		}
		m[string(fi.FieldKey())] = v
	}
	return m
}
// TestPoint_FieldIterator_FieldMap checks that iterating every field type
// (float, integer, string with escapes, boolean, multi-field points)
// reproduces exactly the map returned by Point.Fields().
func TestPoint_FieldIterator_FieldMap(t *testing.T) {

	points, err := models.ParsePointsString(`
m v=42
m v=42i
m v="string"
m v=true
m v="string\"with\"escapes"
m v=42i,f=42,g=42.314
m a=2i,b=3i,c=true,d="stuff",e=-0.23,f=123.456
`)

	if err != nil {
		t.Fatal("failed to parse test points:", err)
	}

	for _, p := range points {
		exp := p.Fields()
		got := toFields(p.FieldIterator())

		if !reflect.DeepEqual(got, exp) {
			t.Errorf("FieldIterator failed for %#q: got %#v, exp %#v", p.String(), got, exp)
		}
	}
}
// TestPoint_FieldIterator_Delete_Begin deletes the first field of a point
// and verifies the remaining fields survive.
func TestPoint_FieldIterator_Delete_Begin(t *testing.T) {
	points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
	if err != nil || len(points) != 1 {
		t.Fatal("failed parsing point")
	}

	fi := points[0].FieldIterator()
	fi.Next() // a
	fi.Delete()

	fi.Reset()

	got := toFields(fi)
	exp := models.Fields{"b": float64(2), "c": float64(3)}

	if !reflect.DeepEqual(got, exp) {
		t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
	}
}
// TestPoint_FieldIterator_Delete_Middle deletes a field between two others
// and verifies its neighbors survive.
func TestPoint_FieldIterator_Delete_Middle(t *testing.T) {
	points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
	if err != nil || len(points) != 1 {
		t.Fatal("failed parsing point")
	}

	fi := points[0].FieldIterator()
	fi.Next() // a
	fi.Next() // b
	fi.Delete()

	fi.Reset()

	got := toFields(fi)
	exp := models.Fields{"a": float64(1), "c": float64(3)}

	if !reflect.DeepEqual(got, exp) {
		t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
	}
}
// TestPoint_FieldIterator_Delete_End deletes the last field of a point and
// verifies the preceding fields survive.
func TestPoint_FieldIterator_Delete_End(t *testing.T) {
	points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
	if err != nil || len(points) != 1 {
		t.Fatal("failed parsing point")
	}

	fi := points[0].FieldIterator()
	fi.Next() // a
	fi.Next() // b
	fi.Next() // c
	fi.Delete()

	fi.Reset()

	got := toFields(fi)
	exp := models.Fields{"a": float64(1), "b": float64(2)}

	if !reflect.DeepEqual(got, exp) {
		t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
	}
}
// TestPoint_FieldIterator_Delete_Nothing calls Delete before any Next and
// verifies it is a no-op.
func TestPoint_FieldIterator_Delete_Nothing(t *testing.T) {
	points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
	if err != nil || len(points) != 1 {
		t.Fatal("failed parsing point")
	}

	fi := points[0].FieldIterator()
	fi.Delete()

	fi.Reset()

	got := toFields(fi)
	exp := models.Fields{"a": float64(1), "b": float64(2), "c": float64(3)}

	if !reflect.DeepEqual(got, exp) {
		t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
	}
}
// TestPoint_FieldIterator_Delete_Twice calls Delete twice on the same field
// and verifies the second call is a no-op.
func TestPoint_FieldIterator_Delete_Twice(t *testing.T) {
	points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
	if err != nil || len(points) != 1 {
		t.Fatal("failed parsing point")
	}

	fi := points[0].FieldIterator()
	fi.Next() // a
	fi.Next() // b
	fi.Delete()
	fi.Delete() // no-op

	fi.Reset()

	got := toFields(fi)
	exp := models.Fields{"a": float64(1), "c": float64(3)}

	if !reflect.DeepEqual(got, exp) {
		t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
	}
}

View File

@ -1,6 +1,9 @@
package escape // import "github.com/influxdata/influxdb/pkg/escape"
import "bytes"
import (
"bytes"
"strings"
)
func Bytes(in []byte) []byte {
for b, esc := range Codes {
@ -9,6 +12,45 @@ func Bytes(in []byte) []byte {
return in
}
// escapeChars is the set of characters that a backslash may escape in
// line-protocol identifiers: comma, double quote, space, and equals.
const escapeChars = `," =`

// IsEscaped reports whether b contains at least one backslash that escapes
// a character in escapeChars. A backslash followed by any other character,
// or a trailing backslash, does not count.
func IsEscaped(b []byte) bool {
	for i := bytes.IndexByte(b, '\\'); i >= 0; i = bytes.IndexByte(b, '\\') {
		if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
			return true
		}
		// Not an escape sequence; continue scanning after this backslash.
		b = b[i+1:]
	}
	return false
}
// AppendUnescaped appends the unescaped form of src to dst and returns the
// result. Only a backslash that precedes a character in escapeChars is
// removed; any other backslash is copied through verbatim.
func AppendUnescaped(dst, src []byte) []byte {
	var pos int // scan offset into src, past backslashes already ruled out
	for len(src) > 0 {
		next := bytes.IndexByte(src[pos:], '\\')
		if next < 0 || pos+next+1 >= len(src) {
			// No further backslash, or a trailing one with nothing after it:
			// the remainder of src is literal.
			return append(dst, src...)
		}

		if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
			// Real escape: copy everything before the backslash, drop the
			// backslash itself, and restart the scan on the remainder.
			if pos+next > 0 {
				dst = append(dst, src[:pos+next]...)
			}
			src = src[pos+next+1:]
			pos = 0
		} else {
			// Backslash not followed by an escapable character: skip it.
			pos += next + 1
		}
	}
	return dst
}
func Unescape(in []byte) []byte {
if len(in) == 0 {
return nil

View File

@ -1,7 +1,9 @@
package escape
import (
"bytes"
"reflect"
"strings"
"testing"
)
@ -43,3 +45,24 @@ func TestUnescape(t *testing.T) {
}
}
}
// TestAppendUnescaped checks AppendUnescaped against Unescape for a mix of
// plain, validly-escaped, and invalidly-escaped inputs.
func TestAppendUnescaped(t *testing.T) {
	cases := strings.Split(strings.TrimSpace(`
normal
inv\alid
goo\"d
sp\ ace
\,\"\ \=
f\\\ x
`), "\n")

	for _, c := range cases {
		exp := Unescape([]byte(c))
		got := AppendUnescaped(nil, []byte(c))

		if !bytes.Equal(got, exp) {
			t.Errorf("AppendUnescaped failed for %#q: got %#q, exp %#q", c, got, exp)
		}
	}
}

View File

@ -108,18 +108,33 @@ func NewValue(t int64, value interface{}) Value {
case string:
return &StringValue{unixnano: t, value: v}
}
return &EmptyValue{}
return EmptyValue{}
}
type EmptyValue struct {
// NewIntegerValue returns a Value holding an int64 at time t (unix nanos).
func NewIntegerValue(t int64, v int64) Value {
	return &IntegerValue{unixnano: t, value: v}
}
func (e *EmptyValue) UnixNano() int64 { return tsdb.EOF }
func (e *EmptyValue) Value() interface{} { return nil }
func (e *EmptyValue) Size() int { return 0 }
func (e *EmptyValue) String() string { return "" }
// NewFloatValue returns a Value holding a float64 at time t (unix nanos).
func NewFloatValue(t int64, v float64) Value {
	return &FloatValue{unixnano: t, value: v}
}
func (_ *EmptyValue) internalOnly() {}
// NewBooleanValue returns a Value holding a bool at time t (unix nanos).
func NewBooleanValue(t int64, v bool) Value {
	return &BooleanValue{unixnano: t, value: v}
}
// NewStringValue returns a Value holding a string at time t (unix nanos).
func NewStringValue(t int64, v string) Value {
	return &StringValue{unixnano: t, value: v}
}
// EmptyValue is a zero-size placeholder Value used when no concrete value
// type applies. Its zero size lets it be returned by value without a heap
// allocation.
type EmptyValue struct{}

// UnixNano returns tsdb.EOF, marking the value as empty.
func (e EmptyValue) UnixNano() int64 { return tsdb.EOF }

// Value returns nil.
func (e EmptyValue) Value() interface{} { return nil }

// Size returns 0.
func (e EmptyValue) Size() int { return 0 }

// String returns the empty string.
func (e EmptyValue) String() string { return "" }

func (_ EmptyValue) internalOnly() {}
func (_ *StringValue) internalOnly() {}
func (_ *IntegerValue) internalOnly() {}
func (_ *BooleanValue) internalOnly() {}

View File

@ -565,10 +565,30 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
// Returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []models.Point) error {
values := map[string][]Value{}
var keyBuf []byte
var baseLen int
for _, p := range points {
for k, v := range p.Fields() {
key := string(p.Key()) + keyFieldSeparator + k
values[key] = append(values[key], NewValue(p.Time().UnixNano(), v))
keyBuf = append(keyBuf[:0], p.Key()...)
keyBuf = append(keyBuf, keyFieldSeparator...)
baseLen = len(keyBuf)
iter := p.FieldIterator()
t := p.Time().UnixNano()
for iter.Next() {
keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
var v Value
switch iter.Type() {
case models.Float:
v = NewFloatValue(t, iter.FloatValue())
case models.Integer:
v = NewIntegerValue(t, iter.IntegerValue())
case models.String:
v = NewStringValue(t, iter.StringValue())
case models.Boolean:
v = NewBooleanValue(t, iter.BooleanValue())
default:
v = EmptyValue{}
}
values[string(keyBuf)] = append(values[string(keyBuf)], v)
}
}

View File

@ -1,6 +1,7 @@
package tsdb
import (
"bytes"
"errors"
"fmt"
"io"
@ -60,7 +61,7 @@ var (
var (
// Static objects to prevent small allocs.
timeTag = []byte("time")
timeBytes = []byte("time")
)
// A ShardError implements the error interface, and contains extra
@ -472,59 +473,94 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
for _, p := range points {
// verify the tags and fields
tags := p.Tags()
if v := tags.Get(timeTag); v != nil {
if v := tags.Get(timeBytes); v != nil {
s.logger.Printf("dropping tag 'time' from '%s'\n", p.PrecisionString(""))
tags.Delete(timeTag)
tags.Delete(timeBytes)
p.SetTags(tags)
}
fields := p.Fields()
if _, ok := fields["time"]; ok {
s.logger.Printf("dropping field 'time' from '%s'\n", p.PrecisionString(""))
delete(fields, "time")
if len(fields) == 0 {
var validField bool
iter := p.FieldIterator()
for iter.Next() {
if bytes.Equal(iter.FieldKey(), timeBytes) {
s.logger.Printf("dropping field 'time' from '%s'\n", p.PrecisionString(""))
iter.Delete()
continue
}
validField = true
}
if !validField {
continue
}
iter.Reset()
// see if the series should be added to the index
ss := s.index.SeriesBytes(p.Key())
if ss == nil {
key := string(p.Key())
if s.options.Config.MaxSeriesPerDatabase > 0 && s.index.SeriesN()+1 > s.options.Config.MaxSeriesPerDatabase {
return nil, fmt.Errorf("max series per database exceeded: %s", key)
return nil, fmt.Errorf("max series per database exceeded: %s", p.Key())
}
ss = NewSeries(key, tags)
ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), NewSeries(string(p.Key()), tags))
atomic.AddInt64(&s.stats.SeriesCreated, 1)
}
ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), ss)
s.index.AssignShard(ss.Key, s.id)
if !ss.Assigned(s.id) {
ss.AssignShard(s.id)
}
// see if the field definitions need to be saved to the shard
mf := s.engine.MeasurementFields(p.Name())
if mf == nil {
for name, value := range fields {
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: name, Type: influxql.InspectDataType(value)}})
var createType influxql.DataType
for iter.Next() {
switch iter.Type() {
case models.Float:
createType = influxql.Float
case models.Integer:
createType = influxql.Integer
case models.String:
createType = influxql.String
case models.Boolean:
createType = influxql.Boolean
default:
continue
}
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: string(iter.FieldKey()), Type: createType}})
}
continue // skip validation since all fields are new
}
iter.Reset()
// validate field types and encode data
for name, value := range fields {
if f := mf.Field(name); f != nil {
for iter.Next() {
var fieldType influxql.DataType
switch iter.Type() {
case models.Float:
fieldType = influxql.Float
case models.Integer:
fieldType = influxql.Integer
case models.Boolean:
fieldType = influxql.Boolean
case models.String:
fieldType = influxql.String
default:
continue
}
if f := mf.FieldBytes(iter.FieldKey()); f != nil {
// Field present in shard metadata, make sure there is no type conflict.
if f.Type != influxql.InspectDataType(value) {
return nil, fmt.Errorf("%s: input field \"%s\" on measurement \"%s\" is type %T, already exists as type %s", ErrFieldTypeConflict, name, p.Name(), value, f.Type)
if f.Type != fieldType {
return nil, fmt.Errorf("%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s", ErrFieldTypeConflict, iter.FieldKey(), p.Name(), fieldType, f.Type)
}
continue // Field is present, and it's of the same type. Nothing more to do.
}
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: name, Type: influxql.InspectDataType(value)}})
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: string(iter.FieldKey()), Type: fieldType}})
}
}
@ -828,6 +864,13 @@ func (m *MeasurementFields) Field(name string) *Field {
return f
}
// FieldBytes returns the field named by name, or nil if it does not exist.
// It is equivalent to Field but takes a []byte key; the map index
// m.fields[string(name)] is a compiler-recognized pattern that avoids
// allocating the intermediate string.
func (m *MeasurementFields) FieldBytes(name []byte) *Field {
	m.mu.RLock()
	f := m.fields[string(name)]
	m.mu.RUnlock()
	return f
}
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
m.mu.RLock()
defer m.mu.RUnlock()

View File

@ -818,8 +818,8 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
default:
}
sh, ok := s.shards[shardID]
if !ok {
sh := s.shards[shardID]
if sh == nil {
s.mu.RUnlock()
return ErrShardNotFound
}