Merge point parse & explode.

pull/11617/head
Ben Johnson 2019-01-25 11:08:01 -07:00
parent ca3d66a7a1
commit 1004abc3e1
No known key found for this signature in database
GPG Key ID: 048846D1E3EB6818
13 changed files with 824 additions and 620 deletions

View File

@ -4,8 +4,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/nats" "github.com/influxdata/influxdb/nats"
"github.com/influxdata/influxdb/storage" "github.com/influxdata/influxdb/storage"
"go.uber.org/zap" "go.uber.org/zap"
@ -22,10 +20,6 @@ func (s PointWriter) Record(collected MetricsCollection) error {
if err != nil { if err != nil {
return err return err
} }
ps, err = tsdb.ExplodePoints(collected.OrgID, collected.BucketID, ps)
if err != nil {
return err
}
return s.Writer.WritePoints(context.TODO(), ps) return s.Writer.WritePoints(context.TODO(), ps)
} }

View File

@ -194,7 +194,8 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
return return
} }
points, err := models.ParsePointsWithPrecision(data, time.Now(), req.Precision) mm := tsdb.EncodeName(org.ID, bucket.ID)
points, err := models.ParsePointsWithPrecision(data, mm[:], time.Now(), req.Precision)
if err != nil { if err != nil {
logger.Error("Error parsing points", zap.Error(err)) logger.Error("Error parsing points", zap.Error(err))
EncodeError(ctx, &platform.Error{ EncodeError(ctx, &platform.Error{
@ -206,19 +207,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
return return
} }
exploded, err := tsdb.ExplodePoints(org.ID, bucket.ID, points) if err := h.PointsWriter.WritePoints(ctx, points); err != nil {
if err != nil {
logger.Error("Error exploding points", zap.Error(err))
EncodeError(ctx, &platform.Error{
Code: platform.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to convert points to internal structures: %v", err),
Err: err,
}, w)
return
}
if err := h.PointsWriter.WritePoints(ctx, exploded); err != nil {
logger.Error("Error writing points", zap.Error(err)) logger.Error("Error writing points", zap.Error(err))
EncodeError(ctx, &platform.Error{ EncodeError(ctx, &platform.Error{
Code: platform.EInternal, Code: platform.EInternal,

View File

@ -18,6 +18,12 @@ import (
"github.com/influxdata/influxdb/pkg/escape" "github.com/influxdata/influxdb/pkg/escape"
) )
// Values used to store the field key and measurement name as tags.
const (
FieldKeyTagKey = "_f"
MeasurementTagKey = "_m"
)
type escapeSet struct { type escapeSet struct {
k [1]byte k [1]byte
esc [2]byte esc [2]byte
@ -276,13 +282,13 @@ const (
// ParsePoints returns a slice of Points from a text representation of a point // ParsePoints returns a slice of Points from a text representation of a point
// with each point separated by newlines. If any points fail to parse, a non-nil error // with each point separated by newlines. If any points fail to parse, a non-nil error
// will be returned in addition to the points that parsed successfully. // will be returned in addition to the points that parsed successfully.
func ParsePoints(buf []byte) ([]Point, error) { func ParsePoints(buf, mm []byte) ([]Point, error) {
return ParsePointsWithPrecision(buf, time.Now().UTC(), "n") return ParsePointsWithPrecision(buf, mm, time.Now().UTC(), "n")
} }
// ParsePointsString is identical to ParsePoints but accepts a string. // ParsePointsString is identical to ParsePoints but accepts a string.
func ParsePointsString(buf string) ([]Point, error) { func ParsePointsString(buf, mm string) ([]Point, error) {
return ParsePoints([]byte(buf)) return ParsePoints([]byte(buf), []byte(mm))
} }
// ParseKey returns the measurement name and tags from a point. // ParseKey returns the measurement name and tags from a point.
@ -347,7 +353,7 @@ func ValidPrecision(precision string) bool {
// //
// NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf. // NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf.
// This can have the unintended effect preventing buf from being garbage collected. // This can have the unintended effect preventing buf from being garbage collected.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { func ParsePointsWithPrecision(buf []byte, mm []byte, defaultTime time.Time, precision string) (_ []Point, err error) {
points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1) points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
var ( var (
pos int pos int
@ -379,13 +385,10 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin
block = block[:len(block)-1] block = block[:len(block)-1]
} }
pt, err := parsePoint(block[start:], defaultTime, precision) points, err = parsePointsAppend(points, block[start:], mm, defaultTime, precision)
if err != nil { if err != nil {
failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:]), err)) failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:]), err))
} else {
points = append(points, pt)
} }
} }
if len(failed) > 0 { if len(failed) > 0 {
return points, fmt.Errorf("%s", strings.Join(failed, "\n")) return points, fmt.Errorf("%s", strings.Join(failed, "\n"))
@ -394,61 +397,52 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin
} }
func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) { func parsePointsAppend(points []Point, buf []byte, mm []byte, defaultTime time.Time, precision string) ([]Point, error) {
// scan the first block which is measurement[,tag1=value1,tag2=value=2...] // scan the first block which is measurement[,tag1=value1,tag2=value=2...]
pos, key, err := scanKey(buf, 0) pos, key, err := scanKey(buf, 0)
if err != nil { if err != nil {
return nil, err return points, err
} }
// measurement name is required // measurement name is required
if len(key) == 0 { if len(key) == 0 {
return nil, fmt.Errorf("missing measurement") return points, fmt.Errorf("missing measurement")
} }
if len(key) > MaxKeyLength { if len(key) > MaxKeyLength {
return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength) return points, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength)
}
// Since the measurement is converted to a tag and measurements & tags have
// different escaping rules, we need to check if the measurement needs escaping.
_, i, _ := scanMeasurement(key, 0)
keyMeasurement := key[:i-1]
if bytes.IndexByte(keyMeasurement, '=') != -1 {
escapedKeyMeasurement := bytes.Replace(keyMeasurement, []byte("="), []byte(`\=`), -1)
newKey := make([]byte, len(escapedKeyMeasurement)+(len(key)-len(keyMeasurement)))
copy(newKey, escapedKeyMeasurement)
copy(newKey[len(escapedKeyMeasurement):], key[len(keyMeasurement):])
key = newKey
} }
// scan the second block is which is field1=value1[,field2=value2,...] // scan the second block is which is field1=value1[,field2=value2,...]
// at least one field is required
pos, fields, err := scanFields(buf, pos) pos, fields, err := scanFields(buf, pos)
if err != nil { if err != nil {
return nil, err return points, err
} } else if len(fields) == 0 {
return points, fmt.Errorf("missing fields")
// at least one field is required
if len(fields) == 0 {
return nil, fmt.Errorf("missing fields")
}
var maxKeyErr error
err = walkFields(fields, func(k, v []byte) bool {
if sz := seriesKeySize(key, k); sz > MaxKeyLength {
maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
return false
}
return true
})
if err != nil {
return nil, err
}
if maxKeyErr != nil {
return nil, maxKeyErr
} }
// scan the last block which is an optional integer timestamp // scan the last block which is an optional integer timestamp
pos, ts, err := scanTime(buf, pos) pos, ts, err := scanTime(buf, pos)
if err != nil { if err != nil {
return nil, err return points, err
} }
pt := &point{ // Build point with timestamp only.
key: key, pt := point{ts: ts}
fields: fields,
ts: ts,
}
if len(ts) == 0 { if len(ts) == 0 {
pt.time = defaultTime pt.time = defaultTime
@ -456,23 +450,67 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err
} else { } else {
ts, err := parseIntBytes(ts, 10, 64) ts, err := parseIntBytes(ts, 10, 64)
if err != nil { if err != nil {
return nil, err return points, err
} }
pt.time, err = SafeCalcTime(ts, precision) pt.time, err = SafeCalcTime(ts, precision)
if err != nil { if err != nil {
return nil, err return points, err
} }
// Determine if there are illegal non-whitespace characters after the // Determine if there are illegal non-whitespace characters after the
// timestamp block. // timestamp block.
for pos < len(buf) { for pos < len(buf) {
if buf[pos] != ' ' { if buf[pos] != ' ' {
return nil, ErrInvalidPoint return points, ErrInvalidPoint
} }
pos++ pos++
} }
} }
return pt, nil
// Loop over fields and split points while validating field.
var maxKeyErr error
if err := walkFields(fields, func(k, v, fieldBuf []byte) bool {
// Build new key with measurement & field as keys.
newKey := newV2Key(key, mm, k)
if sz := seriesKeySizeV2(key, mm, k); sz > MaxKeyLength {
maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
return false
}
other := pt
other.key = newKey
other.fields = fieldBuf
points = append(points, &other)
return true
}); err != nil {
return points, err
} else if maxKeyErr != nil {
return points, maxKeyErr
}
return points, nil
}
// newV2Key returns a new key by converting the old measurement & field into keys.
func newV2Key(oldKey, mm, field []byte) []byte {
newKey := make([]byte, len(mm)+1+len(MeasurementTagKey)+1+len(oldKey)+1+len(FieldKeyTagKey)+1+len(field))
buf := newKey
copy(buf, mm)
buf = buf[len(mm):]
buf[0], buf = ',', buf[1:]
buf[0], buf[1], buf[2], buf = '_', 'f', '=', buf[3:]
copy(buf, field)
buf = buf[len(field):]
buf[0], buf = ',', buf[1:]
buf[0], buf[1], buf[2], buf = '_', 'm', '=', buf[3:]
copy(buf, oldKey)
return newKey
} }
// GetPrecisionMultiplier will return a multiplier for the precision specified. // GetPrecisionMultiplier will return a multiplier for the precision specified.
@ -1401,7 +1439,7 @@ func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte
key := MakeKey([]byte(measurement), tags) key := MakeKey([]byte(measurement), tags)
for field := range fields { for field := range fields {
sz := seriesKeySize(key, []byte(field)) sz := seriesKeySizeV1(key, []byte(field))
if sz > MaxKeyLength { if sz > MaxKeyLength {
return nil, fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength) return nil, fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
} }
@ -1410,10 +1448,12 @@ func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte
return key, nil return key, nil
} }
func seriesKeySize(key, field []byte) int { func seriesKeySizeV1(key, field []byte) int {
// 4 is the length of the tsm1.fieldKeySeparator constant. It's inlined here to avoid a circular return len(key) + len("#!~#") + len(field)
// dependency. }
return len(key) + 4 + len(field)
func seriesKeySizeV2(key, mm, field []byte) int {
return len(mm) + len(",_f=") + len(field) + len(",_m=") + len(key) + len("#!~#") + len(field)
} }
// NewPointFromBytes returns a new Point from a marshalled Point. // NewPointFromBytes returns a new Point from a marshalled Point.
@ -1582,10 +1622,12 @@ func walkTags(buf []byte, fn func(key, value []byte) bool) {
// walkFields walks each field key and value via fn. If fn returns false, the iteration // walkFields walks each field key and value via fn. If fn returns false, the iteration
// is stopped. The values are the raw byte slices and not the converted types. // is stopped. The values are the raw byte slices and not the converted types.
func walkFields(buf []byte, fn func(key, value []byte) bool) error { func walkFields(buf []byte, fn func(key, value, data []byte) bool) error {
var i int var i int
var key, val []byte var key, val []byte
for len(buf) > 0 { for len(buf) > 0 {
data := buf
i, key = scanTo(buf, 0, '=') i, key = scanTo(buf, 0, '=')
if i > len(buf)-2 { if i > len(buf)-2 {
return fmt.Errorf("invalid value: field-key=%s", key) return fmt.Errorf("invalid value: field-key=%s", key)
@ -1593,7 +1635,7 @@ func walkFields(buf []byte, fn func(key, value []byte) bool) error {
buf = buf[i+1:] buf = buf[i+1:]
i, val = scanFieldValue(buf, 0) i, val = scanFieldValue(buf, 0)
buf = buf[i:] buf = buf[i:]
if !fn(key, val) { if !fn(key, val, data[:len(data)-len(buf)]) {
break break
} }

View File

@ -3,7 +3,7 @@ package models
import "testing" import "testing"
func TestMarshalPointNoFields(t *testing.T) { func TestMarshalPointNoFields(t *testing.T) {
points, err := ParsePointsString("m,k=v f=0i") points, err := ParsePointsString("m,k=v f=0i", "foo")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

File diff suppressed because it is too large Load Diff

View File

@ -579,16 +579,32 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
measurementStats[measurementName].Update(mstats) measurementStats[measurementName].Update(mstats)
} }
pt, err := models.NewPoint(measurementName, tags, fields, pointTime) name := tsdb.EncodeNameString(*orgID, *bucketID)
if err != nil {
return err fieldNames := make([]string, 0, len(fields))
for k := range fields {
fieldNames = append(fieldNames, k)
} }
points = append(points, pt) sort.Strings(fieldNames)
for _, k := range fieldNames {
v := fields[k]
pointTags := append(models.Tags{
{Key: []byte("_f"), Value: []byte(k)},
{Key: []byte("_m"), Value: []byte(measurementName)},
}, tags...)
pt, err := models.NewPoint(name, pointTags, models.Fields{k: v}, pointTime)
if err != nil {
return err
}
points = append(points, pt)
}
if err := execute.AppendRecord(i, er, builder); err != nil { if err := execute.AppendRecord(i, er, builder); err != nil {
return err return err
} }
} }
points, err = tsdb.ExplodePoints(*orgID, *bucketID, points)
return d.PointsWriter.WritePoints(context.TODO(), points) return d.PointsWriter.WritePoints(context.TODO(), points)
}) })
} }

View File

@ -156,11 +156,11 @@ func TestTo_Process(t *testing.T) {
})}, })},
want: wanted{ want: wanted{
result: &mock.PointsWriter{ result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a _value=2.0 11 Points: mockPoints(oid, bid, `a _value=2 11
a _value=2.0 21 a _value=2 21
b _value=1.0 21 b _value=1 21
a _value=3.0 31 a _value=3 31
c _value=4.0 41`), c _value=4 41`),
}, },
tables: []*executetest.Table{{ tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{ ColMeta: []flux.ColMeta{
@ -209,11 +209,11 @@ c _value=4.0 41`),
})}, })},
want: wanted{ want: wanted{
result: &mock.PointsWriter{ result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a,tag2=aa _value=2.0 11 Points: mockPoints(oid, bid, `a,tag2=aa _value=2 11
a,tag2=bb _value=2.0 21 a,tag2=bb _value=2 21
b,tag2=cc _value=1.0 21 b,tag2=cc _value=1 21
a,tag2=dd _value=3.0 31 a,tag2=dd _value=3 31
c,tag2=ee _value=4.0 41`), c,tag2=ee _value=4 41`),
}, },
tables: []*executetest.Table{{ tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{ ColMeta: []flux.ColMeta{
@ -263,11 +263,11 @@ c,tag2=ee _value=4.0 41`),
})}, })},
want: wanted{ want: wanted{
result: &mock.PointsWriter{ result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `m,tag1=a,tag2=aa _value=2.0 11 Points: mockPoints(oid, bid, `m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2.0 21 m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1.0 21 m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3.0 31 m,tag1=a,tag2=dd _value=3 31
m,tag1=c,tag2=ee _value=4.0 41`), m,tag1=c,tag2=ee _value=4 41`),
}, },
tables: []*executetest.Table{{ tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{ ColMeta: []flux.ColMeta{
@ -336,11 +336,11 @@ m,tag1=c,tag2=ee _value=4.0 41`),
})}, })},
want: wanted{ want: wanted{
result: &mock.PointsWriter{ result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a temperature=2.0 11 Points: mockPoints(oid, bid, `a temperature=2 11
a temperature=2.0 21 a temperature=2 21
b temperature=1.0 21 b temperature=1 21
a temperature=3.0 31 a temperature=3 31
c temperature=4.0 41`), c temperature=4 41`),
}, },
tables: []*executetest.Table{{ tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{ ColMeta: []flux.ColMeta{
@ -436,11 +436,11 @@ c temperature=4.0 41`),
})}, })},
want: wanted{ want: wanted{
result: &mock.PointsWriter{ result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a day="Monday",humidity=1.0,ratio=2.0,temperature=2.0 11 Points: mockPoints(oid, bid, `a day="Monday",humidity=1,ratio=2,temperature=2 11
a day="Tuesday",humidity=2.0,ratio=1.0,temperature=2.0 21 a day="Tuesday",humidity=2,ratio=1,temperature=2 21
b day="Wednesday",humidity=4.0,ratio=0.25,temperature=1.0 21 b day="Wednesday",humidity=4,ratio=0.25,temperature=1 21
a day="Thursday",humidity=3.0,ratio=1.0,temperature=3.0 31 a day="Thursday",humidity=3,ratio=1,temperature=3 31
c day="Friday",humidity=5.0,ratio=0.8,temperature=4.0 41`), c day="Friday",humidity=5,ratio=0.8,temperature=4 41`),
}, },
tables: []*executetest.Table{{ tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{ ColMeta: []flux.ColMeta{
@ -522,11 +522,11 @@ c day="Friday",humidity=5.0,ratio=0.8,temperature=4.0 41`),
})}, })},
want: wanted{ want: wanted{
result: &mock.PointsWriter{ result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a,tag2=d humidity=50i,temperature=2.0 11 Points: mockPoints(oid, bid, `a,tag2=d humidity=50i,temperature=2 11
a,tag2=d humidity=50i,temperature=2.0 21 a,tag2=d humidity=50i,temperature=2 21
b,tag2=d humidity=50i,temperature=1.0 21 b,tag2=d humidity=50i,temperature=1 21
a,tag2=e humidity=60i,temperature=3.0 31 a,tag2=e humidity=60i,temperature=3 31
c,tag2=e humidity=65i,temperature=4.0 41`), c,tag2=e humidity=65i,temperature=4 41`),
}, },
tables: []*executetest.Table{{ tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{ ColMeta: []flux.ColMeta{
@ -575,7 +575,6 @@ c,tag2=e humidity=65i,temperature=4.0 41`),
wantStr := pointsToStr(tc.want.result.Points) wantStr := pointsToStr(tc.want.result.Points)
if !cmp.Equal(gotStr, wantStr) { if !cmp.Equal(gotStr, wantStr) {
t.Errorf("got other than expected %s", cmp.Diff(gotStr, wantStr)) t.Errorf("got other than expected %s", cmp.Diff(gotStr, wantStr))
} }
}) })
@ -617,15 +616,10 @@ func pointsToStr(points []models.Point) string {
} }
func mockPoints(org, bucket platform.ID, pointdata string) []models.Point { func mockPoints(org, bucket platform.ID, pointdata string) []models.Point {
points, err := models.ParsePoints([]byte(pointdata)) name := tsdb.EncodeName(org, bucket)
points, err := models.ParsePoints([]byte(pointdata), name[:])
if err != nil { if err != nil {
return nil return nil
} }
return points
exploded, err := tsdb.ExplodePoints(org, bucket, points)
if err != nil {
return nil
}
return exploded
} }

View File

@ -21,7 +21,7 @@ func TestEngine_WriteAndIndex(t *testing.T) {
// Calling WritePoints when the engine is not open will return // Calling WritePoints when the engine is not open will return
// ErrEngineClosed. // ErrEngineClosed.
if got, exp := engine.Write1xPoints(nil), storage.ErrEngineClosed; got != exp { if got, exp := engine.Engine.WritePoints(context.TODO(), nil), storage.ErrEngineClosed; got != exp {
t.Fatalf("got %v, expected %v", got, exp) t.Fatalf("got %v, expected %v", got, exp)
} }
@ -34,12 +34,12 @@ func TestEngine_WriteAndIndex(t *testing.T) {
time.Unix(1, 2), time.Unix(1, 2),
) )
if err := engine.Write1xPoints([]models.Point{pt}); err != nil { if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
pt.SetTime(time.Unix(2, 3)) pt.SetTime(time.Unix(2, 3))
if err := engine.Write1xPoints([]models.Point{pt}); err != nil { if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -57,7 +57,7 @@ func TestEngine_WriteAndIndex(t *testing.T) {
// and ensure that we can still write data // and ensure that we can still write data
pt.SetTime(time.Unix(2, 6)) pt.SetTime(time.Unix(2, 6))
if err := engine.Write1xPoints([]models.Point{pt}); err != nil { if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -74,7 +74,7 @@ func TestEngine_TimeTag(t *testing.T) {
time.Unix(1, 2), time.Unix(1, 2),
) )
if err := engine.Write1xPoints([]models.Point{pt}); err == nil { if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err == nil {
t.Fatal("expected error: got nil") t.Fatal("expected error: got nil")
} }
@ -85,7 +85,7 @@ func TestEngine_TimeTag(t *testing.T) {
time.Unix(1, 2), time.Unix(1, 2),
) )
if err := engine.Write1xPoints([]models.Point{pt}); err == nil { if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err == nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
} }
@ -95,25 +95,34 @@ func TestWrite_TimeField(t *testing.T) {
defer engine.Close() defer engine.Close()
engine.MustOpen() engine.MustOpen()
name := tsdb.EncodeNameString(engine.org, engine.bucket)
pt := models.MustNewPoint( pt := models.MustNewPoint(
"cpu", name,
models.NewTags(map[string]string{}), models.NewTags(map[string]string{"_f": "time", "_m": "cpu"}),
map[string]interface{}{"time": 1.0}, map[string]interface{}{"time": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
if err := engine.Write1xPoints([]models.Point{pt}); err == nil { if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err == nil {
t.Fatal("expected error: got nil") t.Fatal("expected error: got nil")
} }
pt = models.MustNewPoint( var points []models.Point
"cpu", points = append(points, models.MustNewPoint(
models.NewTags(map[string]string{}), name,
map[string]interface{}{"value": 1.1, "time": 1.0}, models.NewTags(map[string]string{"_f": "time", "_m": "cpu"}),
map[string]interface{}{"time": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) ))
points = append(points, models.MustNewPoint(
name,
models.NewTags(map[string]string{"_f": "value", "_m": "cpu"}),
map[string]interface{}{"value": 1.1},
time.Unix(1, 2),
))
if err := engine.Write1xPoints([]models.Point{pt}); err == nil { if err := engine.Engine.WritePoints(context.TODO(), points); err == nil {
t.Fatal("expected error: got nil") t.Fatal("expected error: got nil")
} }
} }
@ -123,27 +132,31 @@ func TestEngine_WriteAddNewField(t *testing.T) {
defer engine.Close() defer engine.Close()
engine.MustOpen() engine.MustOpen()
pt := models.MustNewPoint( name := tsdb.EncodeNameString(engine.org, engine.bucket)
"cpu",
models.NewTags(map[string]string{"host": "server"}), if err := engine.Engine.WritePoints(context.TODO(), []models.Point{models.MustNewPoint(
name,
models.NewTags(map[string]string{"_f": "value", "_m": "cpu", "host": "server"}),
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )}); err != nil {
err := engine.Write1xPoints([]models.Point{pt})
if err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
pt = models.MustNewPoint( if err := engine.Engine.WritePoints(context.TODO(), []models.Point{
"cpu", models.MustNewPoint(
models.NewTags(map[string]string{"host": "server"}), name,
map[string]interface{}{"value": 1.0, "value2": 2.0}, models.NewTags(map[string]string{"_f": "value", "_m": "cpu", "host": "server"}),
time.Unix(1, 2), map[string]interface{}{"value": 1.0},
) time.Unix(1, 2),
),
err = engine.Write1xPoints([]models.Point{pt}) models.MustNewPoint(
if err != nil { name,
models.NewTags(map[string]string{"_f": "value2", "_m": "cpu", "host": "server"}),
map[string]interface{}{"value2": 2.0},
time.Unix(1, 2),
),
}); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -157,27 +170,34 @@ func TestEngine_DeleteBucket(t *testing.T) {
defer engine.Close() defer engine.Close()
engine.MustOpen() engine.MustOpen()
pt := models.MustNewPoint( orgID, _ := influxdb.IDFromString("3131313131313131")
"cpu", bucketID, _ := influxdb.IDFromString("8888888888888888")
models.NewTags(map[string]string{"host": "server"}),
err := engine.Engine.WritePoints(context.TODO(), []models.Point{models.MustNewPoint(
tsdb.EncodeNameString(engine.org, engine.bucket),
models.NewTags(map[string]string{"_f": "value", "_m": "cpu", "host": "server"}),
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )})
err := engine.Write1xPoints([]models.Point{pt})
if err != nil { if err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
pt = models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0, "value2": 2.0},
time.Unix(1, 3),
)
// Same org, different bucket. // Same org, different bucket.
err = engine.Write1xPointsWithOrgBucket([]models.Point{pt}, "3131313131313131", "8888888888888888") err = engine.Engine.WritePoints(context.TODO(), []models.Point{
models.MustNewPoint(
tsdb.EncodeNameString(*orgID, *bucketID),
models.NewTags(map[string]string{"_f": "value", "_m": "cpu", "host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 3),
),
models.MustNewPoint(
tsdb.EncodeNameString(*orgID, *bucketID),
models.NewTags(map[string]string{"_f": "value2", "_m": "cpu", "host": "server"}),
map[string]interface{}{"value2": 2.0},
time.Unix(1, 3),
),
})
if err != nil { if err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -228,7 +248,7 @@ func TestEngineClose_RemoveIndex(t *testing.T) {
time.Unix(1, 2), time.Unix(1, 2),
) )
err := engine.Write1xPoints([]models.Point{pt}) err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt})
if err != nil { if err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -261,7 +281,7 @@ func TestEngine_WALDisabled(t *testing.T) {
time.Unix(1, 2), time.Unix(1, 2),
) )
if err := engine.Write1xPoints([]models.Point{pt}); err != nil { if err := engine.Engine.WritePoints(context.TODO(), []models.Point{pt}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -282,7 +302,7 @@ func BenchmarkDeleteBucket(b *testing.B) {
) )
} }
if err := engine.Write1xPoints(points); err != nil { if err := engine.Engine.WritePoints(context.TODO(), points); err != nil {
panic(err) panic(err)
} }
} }
@ -352,6 +372,7 @@ func (e *Engine) MustOpen() {
} }
} }
/*
// Write1xPoints converts old style points into the new 2.0 engine format. // Write1xPoints converts old style points into the new 2.0 engine format.
// This allows us to use the old `models` package helper functions and still write // This allows us to use the old `models` package helper functions and still write
// the points in the correct format. // the points in the correct format.
@ -381,6 +402,7 @@ func (e *Engine) Write1xPointsWithOrgBucket(pts []models.Point, org, bucket stri
} }
return e.Engine.WritePoints(context.TODO(), points) return e.Engine.WritePoints(context.TODO(), points)
} }
*/
// Close closes the engine and removes all temporary data. // Close closes the engine and removes all temporary data.
func (e *Engine) Close() error { func (e *Engine) Close() error {

View File

@ -33,6 +33,12 @@ func EncodeName(org, bucket platform.ID) [16]byte {
return nameBytes return nameBytes
} }
// EncodeNameString converts org/bucket pairs to the tsdb internal serialization
func EncodeNameString(org, bucket platform.ID) string {
name := EncodeName(org, bucket)
return string(name[:])
}
// ExplodePoints creates a list of points that only contains one field per point. It also // ExplodePoints creates a list of points that only contains one field per point. It also
// moves the measurement to a tag, and changes the measurement to be the provided argument. // moves the measurement to a tag, and changes the measurement to be the provided argument.
func ExplodePoints(org, bucket platform.ID, points []models.Point) ([]models.Point, error) { func ExplodePoints(org, bucket platform.ID, points []models.Point) ([]models.Point, error) {

View File

@ -2,13 +2,9 @@ package tsdb_test
import ( import (
"fmt" "fmt"
"reflect"
"sort"
"testing" "testing"
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb" platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb"
) )
@ -44,44 +40,3 @@ func TestNames(t *testing.T) {
}) })
} }
} }
func TestExplodePoints(t *testing.T) {
points, err := models.ParsePointsString(`
cpu,t1=a,t2=q f1=5,f2="f" 9
mem,t1=b,t2=w f1=6,f2="g",f3=true 8
cpu,t3=e,t1=c f3=7,f4="h" 7
mem,t1=d,t2=r,t4=g f1=8,f2="i" 6
`)
if err != nil {
t.Fatal(err)
}
org := platform.ID(0x4F4F4F4F4F4F4F4F)
bucket := platform.ID(0x4242424242424242)
points, err = tsdb.ExplodePoints(org, bucket, points)
if err != nil {
t.Fatal(err)
}
var lines []string
for _, point := range points {
lines = append(lines, point.String())
}
sort.Strings(lines)
expected := []string{
`OOOOOOOOBBBBBBBB,_f=f1,_m=cpu,t1=a,t2=q f1=5 9`,
`OOOOOOOOBBBBBBBB,_f=f1,_m=mem,t1=b,t2=w f1=6 8`,
`OOOOOOOOBBBBBBBB,_f=f1,_m=mem,t1=d,t2=r,t4=g f1=8 6`,
`OOOOOOOOBBBBBBBB,_f=f2,_m=cpu,t1=a,t2=q f2="f" 9`,
`OOOOOOOOBBBBBBBB,_f=f2,_m=mem,t1=b,t2=w f2="g" 8`,
`OOOOOOOOBBBBBBBB,_f=f2,_m=mem,t1=d,t2=r,t4=g f2="i" 6`,
`OOOOOOOOBBBBBBBB,_f=f3,_m=cpu,t1=c,t3=e f3=7 7`,
`OOOOOOOOBBBBBBBB,_f=f3,_m=mem,t1=b,t2=w f3=true 8`,
`OOOOOOOOBBBBBBBB,_f=f4,_m=cpu,t1=c,t3=e f4="h" 7`,
}
if !reflect.DeepEqual(lines, expected) {
t.Fatal("bad output:\n", cmp.Diff(lines, expected))
}
}

View File

@ -185,7 +185,7 @@ func BenchmarkIndex_TagSets(b *testing.B) {
b.Fatal(err) b.Fatal(err)
} }
points, err := models.ParsePoints(data) points, err := models.ParsePoints(data, []byte("mm"))
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
@ -268,7 +268,7 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
b.Fatal(err) b.Fatal(err)
} }
points, err := models.ParsePoints(data) points, err := models.ParsePoints(data, []byte("mm"))
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }

View File

@ -10,14 +10,14 @@ import (
func TestEngine_DeleteBucket(t *testing.T) { func TestEngine_DeleteBucket(t *testing.T) {
// Create a few points. // Create a few points.
p1 := MustParsePointString("cpu,host=0 value=1.1 6") p1 := MustParsePointString("cpu,host=0 value=1.1 6", "mm0")
p2 := MustParsePointString("cpu,host=A value=1.2 2") p2 := MustParsePointString("cpu,host=A value=1.2 2", "mm0")
p3 := MustParsePointString("cpu,host=A value=1.3 3") p3 := MustParsePointString("cpu,host=A value=1.3 3", "mm0")
p4 := MustParsePointString("cpu,host=B value=1.3 4") p4 := MustParsePointString("cpu,host=B value=1.3 4", "mm0")
p5 := MustParsePointString("cpu,host=B value=1.3 5") p5 := MustParsePointString("cpu,host=B value=1.3 5", "mm0")
p6 := MustParsePointString("cpu,host=C value=1.3 1") p6 := MustParsePointString("cpu,host=C value=1.3 1", "mm0")
p7 := MustParsePointString("mem,host=C value=1.3 1") p7 := MustParsePointString("mem,host=C value=1.3 1", "mm1")
p8 := MustParsePointString("disk,host=C value=1.3 1") p8 := MustParsePointString("disk,host=C value=1.3 1", "mm2")
e, err := NewEngine() e, err := NewEngine()
if err != nil { if err != nil {
@ -44,7 +44,7 @@ func TestEngine_DeleteBucket(t *testing.T) {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got) t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
} }
if err := e.DeleteBucketRange([]byte("cpu"), 0, 3); err != nil { if err := e.DeleteBucketRange([]byte("mm0"), 0, 3); err != nil {
t.Fatalf("failed to delete series: %v", err) t.Fatalf("failed to delete series: %v", err)
} }
@ -54,17 +54,17 @@ func TestEngine_DeleteBucket(t *testing.T) {
} }
exp := map[string]byte{ exp := map[string]byte{
"cpu,host=0#!~#value": 0, "mm0,_f=value,_m=cpu,host=0#!~#value": 0,
"cpu,host=B#!~#value": 0, "mm0,_f=value,_m=cpu,host=B#!~#value": 0,
"disk,host=C#!~#value": 0, "mm1,_f=value,_m=mem,host=C#!~#value": 0,
"mem,host=C#!~#value": 0, "mm2,_f=value,_m=disk,host=C#!~#value": 0,
} }
if !reflect.DeepEqual(keys, exp) { if !reflect.DeepEqual(keys, exp) {
t.Fatalf("unexpected series in file store: %v != %v", keys, exp) t.Fatalf("unexpected series in file store: %v != %v", keys, exp)
} }
// Check that the series still exists in the index // Check that the series still exists in the index
iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) iter, err := e.index.MeasurementSeriesIDIterator([]byte("mm0"))
if err != nil { if err != nil {
t.Fatalf("iterator error: %v", err) t.Fatalf("iterator error: %v", err)
} }
@ -80,17 +80,17 @@ func TestEngine_DeleteBucket(t *testing.T) {
// Lookup series. // Lookup series.
name, tags := e.sfile.Series(elem.SeriesID) name, tags := e.sfile.Series(elem.SeriesID)
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { if got, exp := name, []byte("mm0"); !bytes.Equal(got, exp) {
t.Fatalf("series mismatch: got %s, exp %s", got, exp) t.Fatalf("series mismatch: got %s, exp %s", got, exp)
} }
if !tags.Equal(models.NewTags(map[string]string{"host": "0"})) && !tags.Equal(models.NewTags(map[string]string{"host": "B"})) { if !tags.Equal(models.NewTags(map[string]string{"_f": "value", "_m": "cpu", "host": "0"})) && !tags.Equal(models.NewTags(map[string]string{"host": "B"})) {
t.Fatalf(`series mismatch: got %s, exp either "host=0" or "host=B"`, tags) t.Fatalf(`series mismatch: got %s, exp either "host=0" or "host=B"`, tags)
} }
iter.Close() iter.Close()
// Deleting remaining series should remove them from the series. // Deleting remaining series should remove them from the series.
if err := e.DeleteBucketRange([]byte("cpu"), 0, 9); err != nil { if err := e.DeleteBucketRange([]byte("mm0"), 0, 9); err != nil {
t.Fatalf("failed to delete series: %v", err) t.Fatalf("failed to delete series: %v", err)
} }
@ -100,14 +100,14 @@ func TestEngine_DeleteBucket(t *testing.T) {
} }
exp = map[string]byte{ exp = map[string]byte{
"disk,host=C#!~#value": 0, "mm1,_f=value,_m=mem,host=C#!~#value": 0,
"mem,host=C#!~#value": 0, "mm2,_f=value,_m=disk,host=C#!~#value": 0,
} }
if !reflect.DeepEqual(keys, exp) { if !reflect.DeepEqual(keys, exp) {
t.Fatalf("unexpected series in file store: %v != %v", keys, exp) t.Fatalf("unexpected series in file store: %v != %v", keys, exp)
} }
if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { if iter, err = e.index.MeasurementSeriesIDIterator([]byte("mm0")); err != nil {
t.Fatalf("iterator error: %v", err) t.Fatalf("iterator error: %v", err)
} }
if iter == nil { if iter == nil {

View File

@ -150,7 +150,7 @@ func TestEngine_ShouldCompactCache(t *testing.T) {
t.Fatal("nothing written to cache, so should not compact") t.Fatal("nothing written to cache, so should not compact")
} }
if err := e.WritePointsString("m,k=v f=3i"); err != nil { if err := e.WritePointsString("mm", "m,k=v f=3i"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -234,7 +234,7 @@ func BenchmarkEngine_WritePoints(b *testing.B) {
e := MustOpenEngine() e := MustOpenEngine()
pp := make([]models.Point, 0, sz) pp := make([]models.Point, 0, sz)
for i := 0; i < sz; i++ { for i := 0; i < sz; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i)) p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i), "mm")
pp = append(pp, p) pp = append(pp, p)
} }
@ -259,7 +259,7 @@ func BenchmarkEngine_WritePoints_Parallel(b *testing.B) {
cpus := runtime.GOMAXPROCS(0) cpus := runtime.GOMAXPROCS(0)
pp := make([]models.Point, 0, sz*cpus) pp := make([]models.Point, 0, sz*cpus)
for i := 0; i < sz*cpus; i++ { for i := 0; i < sz*cpus; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2,other=%di", i, i)) p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2,other=%di", i, i), "mm")
pp = append(pp, p) pp = append(pp, p)
} }
@ -420,8 +420,8 @@ func (e *Engine) AddSeries(name string, tags map[string]string) error {
// WritePointsString calls WritePointsString on the underlying engine, but also // WritePointsString calls WritePointsString on the underlying engine, but also
// adds the associated series to the index. // adds the associated series to the index.
func (e *Engine) WritePointsString(ptstr ...string) error { func (e *Engine) WritePointsString(mm string, ptstr ...string) error {
points, err := models.ParsePointsString(strings.Join(ptstr, "\n")) points, err := models.ParsePointsString(strings.Join(ptstr, "\n"), mm)
if err != nil { if err != nil {
return err return err
} }
@ -494,8 +494,8 @@ func (f *SeriesFile) Close() {
} }
// MustParsePointsString parses points from a string. Panic on error. // MustParsePointsString parses points from a string. Panic on error.
func MustParsePointsString(buf string) []models.Point { func MustParsePointsString(buf, mm string) []models.Point {
a, err := models.ParsePointsString(buf) a, err := models.ParsePointsString(buf, mm)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -503,7 +503,7 @@ func MustParsePointsString(buf string) []models.Point {
} }
// MustParsePointString parses the first point from a string. Panic on error. // MustParsePointString parses the first point from a string. Panic on error.
func MustParsePointString(buf string) models.Point { return MustParsePointsString(buf)[0] } func MustParsePointString(buf, mm string) models.Point { return MustParsePointsString(buf, mm)[0] }
type mockPlanner struct{} type mockPlanner struct{}