chore: don't depend on details of what platform.ID is
It may become a uint64 in the future, for example. This does mean that we have to call Decode on some data that we just Encoded, but we can fix that later.pull/10616/head
parent
b867eabf65
commit
260ed3eb13
|
@ -33,8 +33,8 @@ type WriteHandler struct {
|
|||
// NewWriteHandler creates a new handler at /api/v2/write to receive line protocol.
|
||||
func NewWriteHandler(writer storage.PointsWriter) *WriteHandler {
|
||||
h := &WriteHandler{
|
||||
Router: httprouter.New(),
|
||||
Logger: zap.NewNop(),
|
||||
Router: httprouter.New(),
|
||||
Logger: zap.NewNop(),
|
||||
PointsWriter: writer,
|
||||
}
|
||||
|
||||
|
@ -136,32 +136,31 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
// TODO(jeff): we should be publishing with the org and bucket instead of
|
||||
// parsing, rewriting, and publishing, but the interface isn't quite there yet.
|
||||
// be sure to remove this when it is there!
|
||||
data, err := ioutil.ReadAll(in)
|
||||
if err != nil {
|
||||
logger.Info("Error reading body", zap.Error(err))
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
data, err := ioutil.ReadAll(in)
|
||||
if err != nil {
|
||||
logger.Info("Error reading body", zap.Error(err))
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
points, err := models.ParsePoints(data)
|
||||
if err != nil {
|
||||
logger.Info("Error parsing points", zap.Error(err))
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
points, err := models.ParsePoints(data)
|
||||
if err != nil {
|
||||
logger.Info("Error parsing points", zap.Error(err))
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
exploded, err := tsdb.ExplodePoints([]byte(org.ID), []byte(bucket.ID), points)
|
||||
if err != nil {
|
||||
logger.Info("Error exploding points", zap.Error(err))
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.PointsWriter.WritePoints(exploded); err != nil {
|
||||
EncodeError(ctx, errors.BadRequestError(err.Error()), w)
|
||||
return
|
||||
}
|
||||
exploded, err := tsdb.ExplodePoints(org.ID, bucket.ID, points)
|
||||
if err != nil {
|
||||
logger.Info("Error exploding points", zap.Error(err))
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.PointsWriter.WritePoints(exploded); err != nil {
|
||||
EncodeError(ctx, errors.BadRequestError(err.Error()), w)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/models"
|
||||
"github.com/influxdata/platform/storage"
|
||||
"github.com/influxdata/platform/tsdb"
|
||||
|
@ -215,7 +216,9 @@ func (e *Engine) MustOpen() {
|
|||
// This allows us to use the old `models` package helper functions and still write
|
||||
// the points in the correct format.
|
||||
func (e *Engine) Write1xPoints(pts []models.Point) error {
|
||||
points, err := tsdb.ExplodePoints([]byte("11111111"), []byte("22222222"), pts)
|
||||
org, _ := platform.IDFromString("1111111111111111")
|
||||
bucket, _ := platform.IDFromString("2222222222222222")
|
||||
points, err := tsdb.ExplodePoints(*org, *bucket, pts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/models"
|
||||
)
|
||||
|
||||
|
@ -19,13 +20,22 @@ var (
|
|||
|
||||
// ExplodePoints creates a list of points that only contains one field per point. It also
|
||||
// moves the measurement to a tag, and changes the measurement to be the provided argument.
|
||||
func ExplodePoints(org, bucket []byte, points []models.Point) ([]models.Point, error) {
|
||||
if len(org) != 8 || len(bucket) != 8 {
|
||||
return nil, errors.New("invalid org/bucket")
|
||||
}
|
||||
|
||||
func ExplodePoints(org, bucket platform.ID, points []models.Point) ([]models.Point, error) {
|
||||
out := make([]models.Point, 0, len(points))
|
||||
name := string(org) + string(bucket)
|
||||
|
||||
// TODO(jeff): We should add a RawEncode() method or something to the platform.ID type
|
||||
// or we should use hex encoded measurement names. Either way, we shouldn't be doing a
|
||||
// decode of the encode here, and we don't want to depend on details of how the ID type
|
||||
// is represented.
|
||||
|
||||
var nameBytes [16]byte
|
||||
if _, err := hex.Decode(nameBytes[0:8], org.Encode()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := hex.Decode(nameBytes[8:16], bucket.Encode()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
name := string(nameBytes[:])
|
||||
|
||||
var tags models.Tags
|
||||
for _, pt := range points {
|
||||
|
|
Loading…
Reference in New Issue