Merge point parse & explode (#12377)

Merge point parse & explode
pull/13611/head
Ben Johnson 2019-04-24 10:30:16 -06:00 committed by GitHub
commit 01bfcf822b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 915 additions and 695 deletions

View File

@ -4,8 +4,6 @@ import (
"context"
"encoding/json"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/nats"
"github.com/influxdata/influxdb/storage"
"go.uber.org/zap"
@ -22,10 +20,6 @@ func (s PointWriter) Record(collected MetricsCollection) error {
if err != nil {
return err
}
ps, err = tsdb.ExplodePoints(collected.OrgID, collected.BucketID, ps)
if err != nil {
return err
}
return s.Writer.WritePoints(context.TODO(), ps)
}

View File

@ -223,7 +223,9 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
}
requestBytes = len(data)
points, err := models.ParsePointsWithPrecision(data, time.Now(), req.Precision)
encoded := tsdb.EncodeName(org.ID, bucket.ID)
mm := models.EscapeMeasurement(encoded[:])
points, err := models.ParsePointsWithPrecision(data, mm, time.Now(), req.Precision)
if err != nil {
logger.Error("Error parsing points", zap.Error(err))
EncodeError(ctx, &platform.Error{
@ -235,19 +237,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
return
}
exploded, err := tsdb.ExplodePoints(org.ID, bucket.ID, points)
if err != nil {
logger.Error("Error exploding points", zap.Error(err))
EncodeError(ctx, &platform.Error{
Code: platform.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to convert points to internal structures: %v", err),
Err: err,
}, w)
return
}
if err := h.PointsWriter.WritePoints(ctx, exploded); err != nil {
if err := h.PointsWriter.WritePoints(ctx, points); err != nil {
logger.Error("Error writing points", zap.Error(err))
EncodeError(ctx, &platform.Error{
Code: platform.EInternal,

View File

@ -18,8 +18,7 @@ import (
"github.com/influxdata/influxdb/pkg/escape"
)
// Values used to store the field key and measurement name as special internal
// tags.
// Values used to store the field key and measurement name as special internal tags.
const (
FieldKeyTagKey = "\xff"
MeasurementTagKey = "\x00"
@ -289,13 +288,17 @@ const (
// ParsePoints returns a slice of Points from a text representation of a point
// with each point separated by newlines. If any points fail to parse, a non-nil error
// will be returned in addition to the points that parsed successfully.
func ParsePoints(buf []byte) ([]Point, error) {
return ParsePointsWithPrecision(buf, time.Now().UTC(), "n")
//
// The mm argument supplies the new measurement which is generated by calling
// EscapeMeasurement(EncodeName(orgID, bucketID)). The existing measurement is
// moved to the "_m" tag.
// ParsePoints parses points from buf using the current UTC time as the
// default timestamp and nanosecond precision. The mm argument supplies the
// new measurement; the existing measurement is moved to the "\x00" tag.
func ParsePoints(buf, mm []byte) ([]Point, error) {
	now := time.Now().UTC()
	return ParsePointsWithPrecision(buf, mm, now, "n")
}
// ParsePointsString is identical to ParsePoints but accepts a string.
func ParsePointsString(buf string) ([]Point, error) {
return ParsePoints([]byte(buf))
// ParsePointsString is identical to ParsePoints but accepts string arguments.
func ParsePointsString(buf, mm string) ([]Point, error) {
	data := []byte(buf)
	measurement := []byte(mm)
	return ParsePoints(data, measurement)
}
// ParseKey returns the measurement name and tags from a point.
@ -355,12 +358,22 @@ func ValidPrecision(precision string) bool {
}
}
// ParsePointsWithPrecisionV1 parses points like ParsePointsWithPrecision but
// leaves the measurement & field keys untouched (no key rewriting).
func ParsePointsWithPrecisionV1(buf []byte, mm []byte, defaultTime time.Time, precision string) (_ []Point, err error) {
	const rewrite = false
	return parsePointsWithPrecision(buf, mm, defaultTime, precision, rewrite)
}
// ParsePointsWithPrecision is similar to ParsePoints, but allows the
// caller to provide a precision for time.
//
// NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf.
// This can have the unintended effect of preventing buf from being garbage collected.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
// ParsePointsWithPrecision is similar to ParsePoints, but allows the caller
// to provide a precision for time. It rewrites each point's measurement and
// field key into the v2 series-key form.
//
// NOTE: to minimize heap allocations, the returned Points will refer to
// subslices of buf, which can keep buf from being garbage collected.
func ParsePointsWithPrecision(buf []byte, mm []byte, defaultTime time.Time, precision string) (_ []Point, err error) {
	const rewrite = true
	return parsePointsWithPrecision(buf, mm, defaultTime, precision, rewrite)
}
func parsePointsWithPrecision(buf []byte, mm []byte, defaultTime time.Time, precision string, rewrite bool) (_ []Point, err error) {
points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
var (
pos int
@ -392,22 +405,19 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin
block = block[:len(block)-1]
}
pt, err := parsePoint(block[start:], defaultTime, precision)
points, err = parsePointsAppend(points, block[start:], mm, defaultTime, precision, rewrite)
if err != nil {
failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:]), err))
} else {
points = append(points, pt)
}
}
if len(failed) > 0 {
return points, fmt.Errorf("%s", strings.Join(failed, "\n"))
}
return points, nil
return points, nil
}
func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) {
func parsePointsAppend(points []Point, buf []byte, mm []byte, defaultTime time.Time, precision string, rewrite bool) ([]Point, error) {
// scan the first block which is measurement[,tag1=value1,tag2=value=2...]
pos, key, err := scanKey(buf, 0)
if err != nil {
@ -416,52 +426,43 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err
// measurement name is required
if len(key) == 0 {
return nil, fmt.Errorf("missing measurement")
return points, fmt.Errorf("missing measurement")
}
if len(key) > MaxKeyLength {
return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength)
return points, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength)
}
// Since the measurement is converted to a tag and measurements & tags have
// different escaping rules, we need to check if the measurement needs escaping.
_, i, _ := scanMeasurement(key, 0)
keyMeasurement := key[:i-1]
if rewrite && bytes.IndexByte(keyMeasurement, '=') != -1 {
escapedKeyMeasurement := bytes.Replace(keyMeasurement, []byte("="), []byte(`\=`), -1)
newKey := make([]byte, len(escapedKeyMeasurement)+(len(key)-len(keyMeasurement)))
copy(newKey, escapedKeyMeasurement)
copy(newKey[len(escapedKeyMeasurement):], key[len(keyMeasurement):])
key = newKey
}
// scan the second block is which is field1=value1[,field2=value2,...]
// at least one field is required
pos, fields, err := scanFields(buf, pos)
if err != nil {
return nil, err
}
// at least one field is required
if len(fields) == 0 {
return nil, fmt.Errorf("missing fields")
}
var maxKeyErr error
err = walkFields(fields, func(k, v []byte) bool {
if sz := seriesKeySize(key, k); sz > MaxKeyLength {
maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
return false
}
return true
})
if err != nil {
return nil, err
}
if maxKeyErr != nil {
return nil, maxKeyErr
return points, err
} else if len(fields) == 0 {
return points, fmt.Errorf("missing fields")
}
// scan the last block which is an optional integer timestamp
pos, ts, err := scanTime(buf, pos)
if err != nil {
return nil, err
return points, err
}
pt := &point{
key: key,
fields: fields,
ts: ts,
}
// Build point with timestamp only.
pt := point{ts: ts}
if len(ts) == 0 {
pt.time = defaultTime
@ -469,23 +470,68 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err
} else {
ts, err := parseIntBytes(ts, 10, 64)
if err != nil {
return nil, err
return points, err
}
pt.time, err = SafeCalcTime(ts, precision)
if err != nil {
return nil, err
return points, err
}
// Determine if there are illegal non-whitespace characters after the
// timestamp block.
for pos < len(buf) {
if buf[pos] != ' ' {
return nil, ErrInvalidPoint
return points, ErrInvalidPoint
}
pos++
}
}
return pt, nil
// Loop over fields and split points while validating field.
var maxKeyErr error
if err := walkFields(fields, func(k, v, fieldBuf []byte) bool {
newKey := key
// Build new key with measurement & field as keys.
if rewrite {
newKey = newV2Key(key, mm, k)
if sz := seriesKeySizeV2(key, mm, k); sz > MaxKeyLength {
maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
return false
}
}
other := pt
other.key = newKey
other.fields = fieldBuf
points = append(points, &other)
return true
}); err != nil {
return points, err
} else if maxKeyErr != nil {
return points, maxKeyErr
}
return points, nil
}
// newV2Key returns a new series key that stores the old measurement and the
// field key as special internal tags on the mm measurement:
//
//	mm,\x00=<oldKey>,\xff=<field>
func newV2Key(oldKey, mm, field []byte) []byte {
	// Size the buffer exactly: mm + ",\x00=" + oldKey + ",\xff=" + field.
	size := len(mm) + 1 + len(MeasurementTagKey) + 1 + len(oldKey) +
		1 + len(FieldKeyTagKey) + 1 + len(field)
	newKey := make([]byte, 0, size)
	newKey = append(newKey, mm...)
	newKey = append(newKey, ',', MeasurementTagKeyBytes[0], '=')
	newKey = append(newKey, oldKey...)
	newKey = append(newKey, ',', FieldKeyTagKeyBytes[0], '=')
	newKey = append(newKey, field...)
	return newKey
}
// GetPrecisionMultiplier will return a multiplier for the precision specified.
@ -1414,7 +1460,7 @@ func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte
key := MakeKey([]byte(measurement), tags)
for field := range fields {
sz := seriesKeySize(key, []byte(field))
sz := seriesKeySizeV1(key, []byte(field))
if sz > MaxKeyLength {
return nil, fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
}
@ -1423,10 +1469,12 @@ func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte
return key, nil
}
func seriesKeySize(key, field []byte) int {
// 4 is the length of the tsm1.fieldKeySeparator constant. It's inlined here to avoid a circular
// dependency.
return len(key) + 4 + len(field)
// seriesKeySizeV1 returns the encoded size of a v1 series key: the series
// key, the tsm1 field-key separator, and the field key.
func seriesKeySizeV1(key, field []byte) int {
	// 4 == len("#!~#"), the tsm1 field-key separator; inlined to avoid a
	// circular dependency on the tsm1 package.
	const sepLen = 4
	return len(key) + sepLen + len(field)
}
// seriesKeySizeV2 returns the encoded size of a v2 series key: the encoded
// measurement, the field tag and measurement tag pairs, and the tsm1
// field-key separator plus the field key.
func seriesKeySizeV2(key, mm, field []byte) int {
	const (
		tagOverhead = 3 // len(",\xFF=") and len(",\x00=")
		sepLen      = 4 // len("#!~#"), the tsm1 field-key separator
	)
	return len(mm) + tagOverhead + len(field) +
		tagOverhead + len(key) +
		sepLen + len(field)
}
// NewPointFromBytes returns a new Point from a marshalled Point.
@ -1595,10 +1643,12 @@ func walkTags(buf []byte, fn func(key, value []byte) bool) {
// walkFields walks each field key and value via fn. If fn returns false, the iteration
// is stopped. The values are the raw byte slices and not the converted types.
func walkFields(buf []byte, fn func(key, value []byte) bool) error {
func walkFields(buf []byte, fn func(key, value, data []byte) bool) error {
var i int
var key, val []byte
for len(buf) > 0 {
data := buf
i, key = scanTo(buf, 0, '=')
if i > len(buf)-2 {
return fmt.Errorf("invalid value: field-key=%s", key)
@ -1606,7 +1656,7 @@ func walkFields(buf []byte, fn func(key, value []byte) bool) error {
buf = buf[i+1:]
i, val = scanFieldValue(buf, 0)
buf = buf[i:]
if !fn(key, val) {
if !fn(key, val, data[:len(data)-len(buf)]) {
break
}

View File

@ -3,7 +3,7 @@ package models
import "testing"
func TestMarshalPointNoFields(t *testing.T) {
points, err := ParsePointsString("m,k=v f=0i")
points, err := ParsePointsString("m,k=v f=0i", "foo")
if err != nil {
t.Fatal(err)
}

File diff suppressed because it is too large Load Diff

View File

@ -583,19 +583,31 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
measurementStats[measurementName].Update(mstats)
}
pt, err := models.NewPoint(measurementName, tags, fields, pointTime)
if err != nil {
return err
name := tsdb.EncodeNameString(*orgID, *bucketID)
fieldNames := make([]string, 0, len(fields))
for k := range fields {
fieldNames = append(fieldNames, k)
}
points = append(points, pt)
sort.Strings(fieldNames)
for _, k := range fieldNames {
v := fields[k]
pointTags := models.Tags{{Key: []byte("\x00"), Value: []byte(measurementName)}}
pointTags = append(pointTags, tags...)
pointTags = append(pointTags, models.Tag{Key: []byte("\xff"), Value: []byte(k)})
pt, err := models.NewPoint(name, pointTags, models.Fields{k: v}, pointTime)
if err != nil {
return err
}
points = append(points, pt)
}
if err := execute.AppendRecord(i, er, builder); err != nil {
return err
}
}
points, err = tsdb.ExplodePoints(*orgID, *bucketID, points)
if err != nil {
return err
}
return d.PointsWriter.WritePoints(context.TODO(), points)
})
}

View File

@ -157,11 +157,11 @@ func TestTo_Process(t *testing.T) {
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a _value=2.0 11
a _value=2.0 21
b _value=1.0 21
a _value=3.0 31
c _value=4.0 41`),
Points: mockPoints(oid, bid, `a _value=2 11
a _value=2 21
b _value=1 21
a _value=3 31
c _value=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
@ -210,11 +210,11 @@ c _value=4.0 41`),
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a,tag2=aa _value=2.0 11
a,tag2=bb _value=2.0 21
b,tag2=cc _value=1.0 21
a,tag2=dd _value=3.0 31
c,tag2=ee _value=4.0 41`),
Points: mockPoints(oid, bid, `a,tag2=aa _value=2 11
a,tag2=bb _value=2 21
b,tag2=cc _value=1 21
a,tag2=dd _value=3 31
c,tag2=ee _value=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
@ -264,11 +264,11 @@ c,tag2=ee _value=4.0 41`),
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `m,tag1=a,tag2=aa _value=2.0 11
m,tag1=a,tag2=bb _value=2.0 21
m,tag1=b,tag2=cc _value=1.0 21
m,tag1=a,tag2=dd _value=3.0 31
m,tag1=c,tag2=ee _value=4.0 41`),
Points: mockPoints(oid, bid, `m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3 31
m,tag1=c,tag2=ee _value=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
@ -337,11 +337,11 @@ m,tag1=c,tag2=ee _value=4.0 41`),
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a temperature=2.0 11
a temperature=2.0 21
b temperature=1.0 21
a temperature=3.0 31
c temperature=4.0 41`),
Points: mockPoints(oid, bid, `a temperature=2 11
a temperature=2 21
b temperature=1 21
a temperature=3 31
c temperature=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
@ -437,11 +437,11 @@ c temperature=4.0 41`),
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a day="Monday",humidity=1.0,ratio=2.0,temperature=2.0 11
a day="Tuesday",humidity=2.0,ratio=1.0,temperature=2.0 21
b day="Wednesday",humidity=4.0,ratio=0.25,temperature=1.0 21
a day="Thursday",humidity=3.0,ratio=1.0,temperature=3.0 31
c day="Friday",humidity=5.0,ratio=0.8,temperature=4.0 41`),
Points: mockPoints(oid, bid, `a day="Monday",humidity=1,ratio=2,temperature=2 11
a day="Tuesday",humidity=2,ratio=1,temperature=2 21
b day="Wednesday",humidity=4,ratio=0.25,temperature=1 21
a day="Thursday",humidity=3,ratio=1,temperature=3 31
c day="Friday",humidity=5,ratio=0.8,temperature=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
@ -523,11 +523,11 @@ c day="Friday",humidity=5.0,ratio=0.8,temperature=4.0 41`),
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a,tag2=d humidity=50i,temperature=2.0 11
a,tag2=d humidity=50i,temperature=2.0 21
b,tag2=d humidity=50i,temperature=1.0 21
a,tag2=e humidity=60i,temperature=3.0 31
c,tag2=e humidity=65i,temperature=4.0 41`),
Points: mockPoints(oid, bid, `a,tag2=d humidity=50i,temperature=2 11
a,tag2=d humidity=50i,temperature=2 21
b,tag2=d humidity=50i,temperature=1 21
a,tag2=e humidity=60i,temperature=3 31
c,tag2=e humidity=65i,temperature=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
@ -576,7 +576,6 @@ c,tag2=e humidity=65i,temperature=4.0 41`),
wantStr := pointsToStr(tc.want.result.Points)
if !cmp.Equal(gotStr, wantStr) {
t.Errorf("got other than expected %s", cmp.Diff(gotStr, wantStr))
}
})
@ -618,15 +617,10 @@ func pointsToStr(points []models.Point) string {
}
func mockPoints(org, bucket platform.ID, pointdata string) []models.Point {
points, err := models.ParsePoints([]byte(pointdata))
name := tsdb.EncodeName(org, bucket)
points, err := models.ParsePoints([]byte(pointdata), name[:])
if err != nil {
return nil
}
exploded, err := tsdb.ExplodePoints(org, bucket, points)
if err != nil {
return nil
}
return exploded
return points
}

View File

@ -21,7 +21,7 @@ func TestEngine_WriteAndIndex(t *testing.T) {
// Calling WritePoints when the engine is not open will return
// ErrEngineClosed.
if got, exp := engine.Write1xPoints(nil), storage.ErrEngineClosed; got != exp {
if got, exp := engine.Engine.WritePoints(context.TODO(), nil), storage.ErrEngineClosed; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
@ -29,17 +29,21 @@ func TestEngine_WriteAndIndex(t *testing.T) {
pt := models.MustNewPoint(
"cpu",
models.Tags{{Key: []byte("host"), Value: []byte("server")}},
models.Tags{
{Key: models.MeasurementTagKeyBytes, Value: []byte("cpu")},
{Key: []byte("host"), Value: []byte("server")},
{Key: models.FieldKeyTagKeyBytes, Value: []byte("value")},
},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
if err := engine.Write1xPoints([]models.Point{pt}); err != nil {
if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err != nil {
t.Fatal(err)
}
pt.SetTime(time.Unix(2, 3))
if err := engine.Write1xPoints([]models.Point{pt}); err != nil {
if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err != nil {
t.Fatal(err)
}
@ -57,7 +61,7 @@ func TestEngine_WriteAndIndex(t *testing.T) {
// and ensure that we can still write data
pt.SetTime(time.Unix(2, 6))
if err := engine.Write1xPoints([]models.Point{pt}); err != nil {
if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err != nil {
t.Fatal(err)
}
}
@ -74,7 +78,7 @@ func TestEngine_TimeTag(t *testing.T) {
time.Unix(1, 2),
)
if err := engine.Write1xPoints([]models.Point{pt}); err == nil {
if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err == nil {
t.Fatal("expected error: got nil")
}
@ -85,7 +89,7 @@ func TestEngine_TimeTag(t *testing.T) {
time.Unix(1, 2),
)
if err := engine.Write1xPoints([]models.Point{pt}); err == nil {
if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err == nil {
t.Fatalf("unexpected error: %v", err)
}
}
@ -102,7 +106,7 @@ func TestEngine_InvalidTag(t *testing.T) {
time.Unix(1, 2),
)
if err := engine.Write1xPoints([]models.Point{pt}); err == nil {
if err := engine.WritePoints(context.TODO(), []models.Point{pt}); err == nil {
fmt.Println(pt.String())
t.Fatal("expected error: got nil")
}
@ -114,7 +118,7 @@ func TestEngine_InvalidTag(t *testing.T) {
time.Unix(1, 2),
)
if err := engine.Write1xPoints([]models.Point{pt}); err == nil {
if err := engine.WritePoints(context.TODO(), []models.Point{pt}); err == nil {
t.Fatalf("unexpected error: %v", err)
}
}
@ -124,25 +128,34 @@ func TestWrite_TimeField(t *testing.T) {
defer engine.Close()
engine.MustOpen()
name := tsdb.EncodeNameString(engine.org, engine.bucket)
pt := models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{}),
name,
models.NewTags(map[string]string{models.FieldKeyTagKey: "time", models.MeasurementTagKey: "cpu"}),
map[string]interface{}{"time": 1.0},
time.Unix(1, 2),
)
if err := engine.Write1xPoints([]models.Point{pt}); err == nil {
if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err == nil {
t.Fatal("expected error: got nil")
}
pt = models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{}),
map[string]interface{}{"value": 1.1, "time": 1.0},
var points []models.Point
points = append(points, models.MustNewPoint(
name,
models.NewTags(map[string]string{models.FieldKeyTagKey: "time", models.MeasurementTagKey: "cpu"}),
map[string]interface{}{"time": 1.0},
time.Unix(1, 2),
)
))
points = append(points, models.MustNewPoint(
name,
models.NewTags(map[string]string{models.FieldKeyTagKey: "value", models.MeasurementTagKey: "cpu"}),
map[string]interface{}{"value": 1.1},
time.Unix(1, 2),
))
if err := engine.Write1xPoints([]models.Point{pt}); err == nil {
if err := engine.Engine.WritePoints(context.TODO(), points); err == nil {
t.Fatal("expected error: got nil")
}
}
@ -152,28 +165,32 @@ func TestEngine_WriteAddNewField(t *testing.T) {
defer engine.Close()
engine.MustOpen()
pt := models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
name := tsdb.EncodeNameString(engine.org, engine.bucket)
if err := engine.Engine.WritePoints(context.TODO(), []models.Point{models.MustNewPoint(
name,
models.NewTags(map[string]string{models.FieldKeyTagKey: "value", models.MeasurementTagKey: "cpu", "host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
err := engine.Write1xPoints([]models.Point{pt})
if err != nil {
t.Fatal(err)
)}); err != nil {
t.Fatalf(err.Error())
}
pt = models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0, "value2": 2.0},
time.Unix(1, 2),
)
err = engine.Write1xPoints([]models.Point{pt})
if err != nil {
t.Fatal(err)
if err := engine.Engine.WritePoints(context.TODO(), []models.Point{
models.MustNewPoint(
name,
models.NewTags(map[string]string{models.FieldKeyTagKey: "value", models.MeasurementTagKey: "cpu", "host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
),
models.MustNewPoint(
name,
models.NewTags(map[string]string{models.FieldKeyTagKey: "value2", models.MeasurementTagKey: "cpu", "host": "server"}),
map[string]interface{}{"value2": 2.0},
time.Unix(1, 2),
),
}); err != nil {
t.Fatalf(err.Error())
}
if got, exp := engine.SeriesCardinality(), int64(2); got != exp {
@ -186,27 +203,34 @@ func TestEngine_DeleteBucket(t *testing.T) {
defer engine.Close()
engine.MustOpen()
pt := models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
orgID, _ := influxdb.IDFromString("3131313131313131")
bucketID, _ := influxdb.IDFromString("8888888888888888")
err := engine.Engine.WritePoints(context.TODO(), []models.Point{models.MustNewPoint(
tsdb.EncodeNameString(engine.org, engine.bucket),
models.NewTags(map[string]string{models.FieldKeyTagKey: "value", models.MeasurementTagKey: "cpu", "host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
err := engine.Write1xPoints([]models.Point{pt})
)})
if err != nil {
t.Fatal(err)
}
pt = models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0, "value2": 2.0},
time.Unix(1, 3),
)
// Same org, different bucket.
err = engine.Write1xPointsWithOrgBucket([]models.Point{pt}, "3131313131313131", "8888888888888888")
err = engine.Engine.WritePoints(context.TODO(), []models.Point{
models.MustNewPoint(
tsdb.EncodeNameString(*orgID, *bucketID),
models.NewTags(map[string]string{models.FieldKeyTagKey: "value", models.MeasurementTagKey: "cpu", "host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 3),
),
models.MustNewPoint(
tsdb.EncodeNameString(*orgID, *bucketID),
models.NewTags(map[string]string{models.FieldKeyTagKey: "value2", models.MeasurementTagKey: "cpu", "host": "server"}),
map[string]interface{}{"value2": 2.0},
time.Unix(1, 3),
),
})
if err != nil {
t.Fatal(err)
}
@ -252,12 +276,16 @@ func TestEngineClose_RemoveIndex(t *testing.T) {
pt := models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
models.Tags{
{Key: models.MeasurementTagKeyBytes, Value: []byte("cpu")},
{Key: []byte("host"), Value: []byte("server")},
{Key: models.FieldKeyTagKeyBytes, Value: []byte("value")},
},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
err := engine.Write1xPoints([]models.Point{pt})
err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt})
if err != nil {
t.Fatal(err)
}
@ -285,12 +313,16 @@ func TestEngine_WALDisabled(t *testing.T) {
pt := models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
models.Tags{
{Key: models.MeasurementTagKeyBytes, Value: []byte("cpu")},
{Key: []byte("host"), Value: []byte("server")},
{Key: models.FieldKeyTagKeyBytes, Value: []byte("value")},
},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
if err := engine.Write1xPoints([]models.Point{pt}); err != nil {
if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err != nil {
t.Fatal(err)
}
}
@ -300,20 +332,22 @@ func TestEngine_WriteConflictingBatch(t *testing.T) {
defer engine.Close()
engine.MustOpen()
pt1 := models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
pt2 := models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 2},
time.Unix(1, 2),
)
name := tsdb.EncodeNameString(engine.org, engine.bucket)
err := engine.Write1xPoints([]models.Point{pt1, pt2})
err := engine.Engine.WritePoints(context.TODO(), []models.Point{
models.MustNewPoint(
name,
models.NewTags(map[string]string{models.FieldKeyTagKey: "value", models.MeasurementTagKey: "cpu", "host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
),
models.MustNewPoint(
name,
models.NewTags(map[string]string{models.FieldKeyTagKey: "value", models.MeasurementTagKey: "cpu", "host": "server"}),
map[string]interface{}{"value": 2},
time.Unix(1, 2),
),
})
if _, ok := err.(tsdb.PartialWriteError); !ok {
t.Fatal("expected partial write error. got:", err)
}
@ -335,7 +369,7 @@ func BenchmarkDeleteBucket(b *testing.B) {
)
}
if err := engine.Write1xPoints(points); err != nil {
if err := engine.Engine.WritePoints(context.TODO(), points); err != nil {
panic(err)
}
}
@ -405,6 +439,7 @@ func (e *Engine) MustOpen() {
}
}
/*
// Write1xPoints converts old style points into the new 2.0 engine format.
// This allows us to use the old `models` package helper functions and still write
// the points in the correct format.
@ -434,6 +469,7 @@ func (e *Engine) Write1xPointsWithOrgBucket(pts []models.Point, org, bucket stri
}
return e.Engine.WritePoints(context.TODO(), points)
}
*/
// Close closes the engine and removes all temporary data.
func (e *Engine) Close() error {

View File

@ -22,6 +22,12 @@ func EncodeName(org, bucket platform.ID) [16]byte {
return nameBytes
}
// EncodeNameString converts an org/bucket pair to the tsdb internal
// serialization and returns it as a string.
func EncodeNameString(org, bucket platform.ID) string {
	encoded := EncodeName(org, bucket)
	return string(encoded[:])
}
// ExplodePoints creates a list of points that only contains one field per point. It also
// moves the measurement to a tag, and changes the measurement to be the provided argument.
func ExplodePoints(org, bucket platform.ID, points []models.Point) ([]models.Point, error) {

View File

@ -2,13 +2,9 @@ package tsdb_test
import (
"fmt"
"reflect"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
)
@ -44,44 +40,3 @@ func TestNames(t *testing.T) {
})
}
}
func TestExplodePoints(t *testing.T) {
points, err := models.ParsePointsString(`
cpu,t1=a,t2=q f1=5,f2="f" 9
mem,t1=b,t2=w f1=6,f2="g",f3=true 8
cpu,t3=e,t1=c f3=7,f4="h" 7
mem,t1=d,t2=r,t4=g f1=8,f2="i" 6
`)
if err != nil {
t.Fatal(err)
}
org := platform.ID(0x4F4F4F4F4F4F4F4F)
bucket := platform.ID(0x4242424242424242)
points, err = tsdb.ExplodePoints(org, bucket, points)
if err != nil {
t.Fatal(err)
}
var lines []string
for _, point := range points {
lines = append(lines, point.String())
}
sort.Strings(lines)
expected := []string{
"OOOOOOOOBBBBBBBB,\x00=cpu,t1=a,t2=q,\xff=f1 f1=5 9",
"OOOOOOOOBBBBBBBB,\x00=cpu,t1=a,t2=q,\xff=f2 f2=\"f\" 9",
"OOOOOOOOBBBBBBBB,\x00=cpu,t1=c,t3=e,\xff=f3 f3=7 7",
"OOOOOOOOBBBBBBBB,\x00=cpu,t1=c,t3=e,\xff=f4 f4=\"h\" 7",
"OOOOOOOOBBBBBBBB,\x00=mem,t1=b,t2=w,\xff=f1 f1=6 8",
"OOOOOOOOBBBBBBBB,\x00=mem,t1=b,t2=w,\xff=f2 f2=\"g\" 8",
"OOOOOOOOBBBBBBBB,\x00=mem,t1=b,t2=w,\xff=f3 f3=true 8",
"OOOOOOOOBBBBBBBB,\x00=mem,t1=d,t2=r,t4=g,\xff=f1 f1=8 6",
"OOOOOOOOBBBBBBBB,\x00=mem,t1=d,t2=r,t4=g,\xff=f2 f2=\"i\" 6",
}
if !reflect.DeepEqual(lines, expected) {
t.Fatal("bad output:\n", cmp.Diff(lines, expected))
}
}

View File

@ -186,7 +186,7 @@ func BenchmarkIndex_TagSets(b *testing.B) {
b.Fatal(err)
}
points, err := models.ParsePoints(data)
points, err := models.ParsePoints(data, []byte("mm"))
if err != nil {
b.Fatal(err)
}
@ -269,7 +269,7 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
b.Fatal(err)
}
points, err := models.ParsePoints(data)
points, err := models.ParsePoints(data, []byte("mm"))
if err != nil {
b.Fatal(err)
}

View File

@ -11,14 +11,14 @@ import (
func TestEngine_DeleteBucket(t *testing.T) {
// Create a few points.
p1 := MustParsePointString("cpu,host=0 value=1.1 6")
p2 := MustParsePointString("cpu,host=A value=1.2 2")
p3 := MustParsePointString("cpu,host=A value=1.3 3")
p4 := MustParsePointString("cpu,host=B value=1.3 4")
p5 := MustParsePointString("cpu,host=B value=1.3 5")
p6 := MustParsePointString("cpu,host=C value=1.3 1")
p7 := MustParsePointString("mem,host=C value=1.3 1")
p8 := MustParsePointString("disk,host=C value=1.3 1")
p1 := MustParsePointString("cpu,host=0 value=1.1 6", "mm0")
p2 := MustParsePointString("cpu,host=A value=1.2 2", "mm0")
p3 := MustParsePointString("cpu,host=A value=1.3 3", "mm0")
p4 := MustParsePointString("cpu,host=B value=1.3 4", "mm0")
p5 := MustParsePointString("cpu,host=B value=1.3 5", "mm0")
p6 := MustParsePointString("cpu,host=C value=1.3 1", "mm0")
p7 := MustParsePointString("mem,host=C value=1.3 1", "mm1")
p8 := MustParsePointString("disk,host=C value=1.3 1", "mm2")
e, err := NewEngine()
if err != nil {
@ -42,7 +42,7 @@ func TestEngine_DeleteBucket(t *testing.T) {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}
if err := e.DeleteBucketRange([]byte("cpu"), 0, 3); err != nil {
if err := e.DeleteBucketRange([]byte("mm0"), 0, 3); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@ -52,17 +52,17 @@ func TestEngine_DeleteBucket(t *testing.T) {
}
exp := map[string]byte{
"cpu,host=0#!~#value": 0,
"cpu,host=B#!~#value": 0,
"disk,host=C#!~#value": 0,
"mem,host=C#!~#value": 0,
"mm0,\x00=cpu,host=0,\xff=value#!~#value": 0,
"mm0,\x00=cpu,host=B,\xff=value#!~#value": 0,
"mm1,\x00=mem,host=C,\xff=value#!~#value": 0,
"mm2,\x00=disk,host=C,\xff=value#!~#value": 0,
}
if !reflect.DeepEqual(keys, exp) {
t.Fatalf("unexpected series in file store: %v != %v", keys, exp)
}
// Check that the series still exists in the index
iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
iter, err := e.index.MeasurementSeriesIDIterator([]byte("mm0"))
if err != nil {
t.Fatalf("iterator error: %v", err)
}
@ -78,17 +78,17 @@ func TestEngine_DeleteBucket(t *testing.T) {
// Lookup series.
name, tags := e.sfile.Series(elem.SeriesID)
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
if got, exp := name, []byte("mm0"); !bytes.Equal(got, exp) {
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
}
if !tags.Equal(models.NewTags(map[string]string{"host": "0"})) && !tags.Equal(models.NewTags(map[string]string{"host": "B"})) {
if !tags.Equal(models.NewTags(map[string]string{models.FieldKeyTagKey: "value", models.MeasurementTagKey: "cpu", "host": "0"})) && !tags.Equal(models.NewTags(map[string]string{models.FieldKeyTagKey: "value", models.MeasurementTagKey: "cpu", "host": "B"})) {
t.Fatalf(`series mismatch: got %s, exp either "host=0" or "host=B"`, tags)
}
iter.Close()
// Deleting remaining series should remove them from the series.
if err := e.DeleteBucketRange([]byte("cpu"), 0, 9); err != nil {
if err := e.DeleteBucketRange([]byte("mm0"), 0, 9); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@ -98,14 +98,14 @@ func TestEngine_DeleteBucket(t *testing.T) {
}
exp = map[string]byte{
"disk,host=C#!~#value": 0,
"mem,host=C#!~#value": 0,
"mm1,\x00=mem,host=C,\xff=value#!~#value": 0,
"mm2,\x00=disk,host=C,\xff=value#!~#value": 0,
}
if !reflect.DeepEqual(keys, exp) {
t.Fatalf("unexpected series in file store: %v != %v", keys, exp)
}
if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
if iter, err = e.index.MeasurementSeriesIDIterator([]byte("mm0")); err != nil {
t.Fatalf("iterator error: %v", err)
}
if iter != nil {

View File

@ -150,7 +150,7 @@ func TestEngine_ShouldCompactCache(t *testing.T) {
t.Fatalf("got status %v, exp status %v - nothing written to cache, so should not compact", got, exp)
}
if err := e.WritePointsString("m,k=v f=3i"); err != nil {
if err := e.WritePointsString("mm", "m,k=v f=3i"); err != nil {
t.Fatal(err)
}
@ -245,7 +245,7 @@ func BenchmarkEngine_WritePoints(b *testing.B) {
e := MustOpenEngine()
pp := make([]models.Point, 0, sz)
for i := 0; i < sz; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i))
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i), "mm")
pp = append(pp, p)
}
@ -270,7 +270,7 @@ func BenchmarkEngine_WritePoints_Parallel(b *testing.B) {
cpus := runtime.GOMAXPROCS(0)
pp := make([]models.Point, 0, sz*cpus)
for i := 0; i < sz*cpus; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2,other=%di", i, i))
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2,other=%di", i, i), "mm")
pp = append(pp, p)
}
@ -435,8 +435,8 @@ func (e *Engine) AddSeries(name string, tags map[string]string) error {
// WritePointsString calls WritePointsString on the underlying engine, but also
// adds the associated series to the index.
func (e *Engine) WritePointsString(ptstr ...string) error {
points, err := models.ParsePointsString(strings.Join(ptstr, "\n"))
func (e *Engine) WritePointsString(mm string, ptstr ...string) error {
points, err := models.ParsePointsString(strings.Join(ptstr, "\n"), mm)
if err != nil {
return err
}
@ -532,8 +532,8 @@ func (f *SeriesFile) Close() {
}
// MustParsePointsString parses points from a string. Panic on error.
func MustParsePointsString(buf string) []models.Point {
a, err := models.ParsePointsString(buf)
func MustParsePointsString(buf, mm string) []models.Point {
a, err := models.ParsePointsString(buf, mm)
if err != nil {
panic(err)
}
@ -543,16 +543,13 @@ func MustParsePointsString(buf string) []models.Point {
// MustParseExplodePoints parses points from a string and transforms using
// ExplodePoints using the provided org and bucket. Panic on error.
func MustParseExplodePoints(org, bucket influxdb.ID, buf string) []models.Point {
a := MustParsePointsString(buf)
a, err := tsdb.ExplodePoints(org, bucket, a)
if err != nil {
panic(err)
}
return a
encoded := tsdb.EncodeName(org, bucket)
name := models.EscapeMeasurement(encoded[:])
return MustParsePointsString(buf, string(name))
}
// MustParsePointString parses the first point from a string. Panic on error.
func MustParsePointString(buf string) models.Point { return MustParsePointsString(buf)[0] }
func MustParsePointString(buf, mm string) models.Point { return MustParsePointsString(buf, mm)[0] }
type mockPlanner struct{}