// influxdb/http/points/points_parser.go

package points

import (
	"compress/gzip"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	io2 "github.com/influxdata/influxdb/v2/kit/io"
	"github.com/influxdata/influxdb/v2/kit/platform"
	errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors"
	"github.com/influxdata/influxdb/v2/kit/tracing"
	"github.com/influxdata/influxdb/v2/models"
	"github.com/opentracing/opentracing-go"
)

var (
	// ErrMaxBatchSizeExceeded is returned when a points batch exceeds
	// the defined upper limit in bytes. This pertains to the size of the
	// batch after inflation from any compression (i.e. ungzipped).
	ErrMaxBatchSizeExceeded = errors.New("points batch is too large")
)

const (
	opPointsWriter      = "http/pointsWriter"
	msgUnableToReadData = "unable to read data"
)

// ParsedPoints contains the points parsed as well as the total number of bytes
// after decompression.
type ParsedPoints struct {
	Points  models.Points
	RawSize int
}

// Parser parses batches of Points.
type Parser struct {
	Precision string
	// ParserOptions []models.ParserOption
}

// Parse parses the points from an io.ReadCloser for a specific Bucket.
func (pw *Parser) Parse(ctx context.Context, orgID, bucketID platform.ID, rc io.ReadCloser) (*ParsedPoints, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "write points")
	defer span.Finish()
	return pw.parsePoints(ctx, orgID, bucketID, rc)
}
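
// Hedged usage sketch, not part of the original file: wiring a gzip-encoded
// request body into Parse. The helper name parseGzippedBody and the maxBytes
// parameter are assumptions, and NewLimitedReadCloser is assumed to be the
// kit/io constructor; the limit is what lets readAll turn an oversized
// decompressed payload into ErrMaxBatchSizeExceeded.
func parseGzippedBody(ctx context.Context, p *Parser, orgID, bucketID platform.ID, body io.ReadCloser, maxBytes int64) (*ParsedPoints, error) {
	zr, err := gzip.NewReader(body)
	if err != nil {
		// gzip.ErrHeader surfaces here for a malformed stream; gzip.ErrChecksum
		// surfaces later, inside readAll, where parsePoints maps it to EInvalid.
		return nil, err
	}
	// Cap the decompressed byte count, not the wire size: per the doc comment
	// on ErrMaxBatchSizeExceeded, the limit applies after inflation.
	limited := io2.NewLimitedReadCloser(zr, maxBytes)
	return p.Parse(ctx, orgID, bucketID, limited)
}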

func (pw *Parser) parsePoints(ctx context.Context, orgID, bucketID platform.ID, rc io.ReadCloser) (*ParsedPoints, error) {
	data, err := readAll(ctx, rc)
	if err != nil {
		code := errors2.EInternal
		if errors.Is(err, ErrMaxBatchSizeExceeded) {
			code = errors2.ETooLarge
		} else if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) {
			code = errors2.EInvalid
		}
		return nil, &errors2.Error{
			Code: code,
			Op:   opPointsWriter,
			Msg:  msgUnableToReadData,
			Err:  err,
		}
	}

	span, _ := tracing.StartSpanFromContextWithOperationName(ctx, "encoding and parsing")
	points, err := models.ParsePointsWithPrecision(data, time.Now().UTC(), pw.Precision)
	span.LogKV("values_total", len(points))
	if err != nil {
		// Record the parse failure on the span before it is finished.
		tracing.LogError(span, fmt.Errorf("error parsing points: %v", err))
		span.Finish()
		code := errors2.EInvalid
		// TODO - backport these
		// if errors.Is(err, models.ErrLimitMaxBytesExceeded) ||
		// 	errors.Is(err, models.ErrLimitMaxLinesExceeded) ||
		// 	errors.Is(err, models.ErrLimitMaxValuesExceeded) {
		// 	code = influxdb.ETooLarge
		// }
		return nil, &errors2.Error{
			Code: code,
			Op:   opPointsWriter,
			Msg:  "",
			Err:  err,
		}
	}
	span.Finish()

	return &ParsedPoints{
		Points:  points,
		RawSize: len(data),
	}, nil
}
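
// Hedged sketch, not part of the original file: a direct call to the models
// parser used by parsePoints above. The line-protocol literal and the "s"
// precision token are illustrative assumptions; "s" tells the parser to read
// the bare trailing timestamp as seconds rather than nanoseconds.
func exampleParseSeconds() (models.Points, error) {
	line := []byte("cpu,host=server01 usage=0.64 1609459200")
	return models.ParsePointsWithPrecision(line, time.Now().UTC(), "s")
}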

func readAll(ctx context.Context, rc io.ReadCloser) (data []byte, err error) {
	// Capture the Close error via the named return so that a read limit
	// violation, surfaced on Close, is reported as ErrMaxBatchSizeExceeded.
	defer func() {
		if cerr := rc.Close(); cerr != nil && err == nil {
			if errors.Is(cerr, io2.ErrReadLimitExceeded) {
				cerr = ErrMaxBatchSizeExceeded
			}
			err = cerr
		}
	}()

	span, _ := tracing.StartSpanFromContextWithOperationName(ctx, "read request body")
	defer func() {
		span.LogKV("request_bytes", len(data))
		span.Finish()
	}()

	data, err = io.ReadAll(rc)
	if err != nil {
		return nil, err
	}
	return data, nil
}
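
// Hedged sketch, not part of the original file: the mapping above is
// exercised by wrapping the request body in kit/io's limited reader (the
// constructor name NewLimitedReadCloser and the maxBatchBytes value are
// assumptions). An oversized payload then surfaces io2.ErrReadLimitExceeded
// through the deferred Close, which readAll rewrites to
// ErrMaxBatchSizeExceeded:
//
//	limited := io2.NewLimitedReadCloser(req.Body, maxBatchBytes)
//	_, err := readAll(ctx, limited)
//	// errors.Is(err, ErrMaxBatchSizeExceeded) reports true when the limit is hit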

// NewParser returns a new Parser.
func NewParser(precision string /*parserOptions ...models.ParserOption*/) *Parser {
	return &Parser{
		Precision: precision,
		// ParserOptions: parserOptions,
	}
}
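
// Hedged usage sketch (assumed caller code, not part of the original file):
// an HTTP handler would typically build the Parser once and hand Parse the
// request body. The identifiers req, orgID, and bucketID are assumptions.
//
//	p := NewParser("ns")
//	parsed, err := p.Parse(req.Context(), orgID, bucketID, req.Body)
//	if err != nil {
//		return err // an *errors2.Error with a code such as ETooLarge or EInvalid
//	}
//	fmt.Printf("parsed %d points from %d raw bytes\n", len(parsed.Points), parsed.RawSize)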