diff --git a/http/write_handler.go b/http/write_handler.go index bce06180c0..69b1fe985d 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -196,7 +196,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { h.HandleHTTPError(ctx, err, sw) return } - requestBytes = parsed.Bytes + requestBytes = parsed.RawSize if err := h.PointsWriter.WritePoints(ctx, parsed.Points); err != nil { h.HandleHTTPError(ctx, &influxdb.Error{ @@ -211,6 +211,8 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +// checkBucketWritePermissions checks an Authorizer for write permissions to a +// specific Bucket. func checkBucketWritePermissions(auth influxdb.Authorizer, orgID, bucketID influxdb.ID) error { p, err := influxdb.NewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID) if err != nil { @@ -232,7 +234,9 @@ func checkBucketWritePermissions(auth influxdb.Authorizer, orgID, bucketID influ return nil } -func BatchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (io.ReadCloser, error) { +// PointBatchReadCloser (potentially) wraps an io.ReadCloser in Gzip +// decompression and limits the reading to a specific number of bytes. +func PointBatchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (io.ReadCloser, error) { switch encoding { case "gzip", "x-gzip": var err error @@ -247,25 +251,29 @@ func BatchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) return rc, nil } +// NewPointsParser returns a new PointsParser func NewPointsParser(parserOptions ...models.ParserOption) *PointsParser { return &PointsParser{ ParserOptions: parserOptions, } } +// ParsedPoints contains the points parsed as well as the total number of bytes +// after decompression. type ParsedPoints struct { - Points models.Points - Bytes int + Points models.Points + RawSize int } +// PointsParser parses batches of Points. type PointsParser struct { ParserOptions []models.ParserOption } +// ParsePoints parses the points from an io.ReadCloser for a specific Bucket. func (pw *PointsParser) ParsePoints(ctx context.Context, orgID, bucketID influxdb.ID, rc io.ReadCloser) (*ParsedPoints, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "write points") defer span.Finish() - return pw.parsePoints(ctx, orgID, bucketID, rc) } @@ -321,8 +329,8 @@ func (pw *PointsParser) parsePoints(ctx context.Context, orgID, bucketID influxd } return &ParsedPoints{ - Points: points, - Bytes: requestBytes, + Points: points, + RawSize: requestBytes, }, nil } @@ -351,6 +359,8 @@ func readAll(ctx context.Context, rc io.ReadCloser) (data []byte, err error) { return data, nil } +// writeRequest is a request object holding information about a batch of points +// to be written to a Bucket. type writeRequest struct { Org string Bucket string @@ -358,6 +368,8 @@ type writeRequest struct { Body io.ReadCloser } +// decodeWriteRequest extracts information from an http.Request object to +// produce a writeRequest. func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes int64) (*writeRequest, error) { qp := r.URL.Query() precision := qp.Get("precision") @@ -383,7 +395,7 @@ func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes } encoding := r.Header.Get("Content-Encoding") - body, err := BatchReadCloser(r.Body, encoding, maxBatchSizeBytes) + body, err := PointBatchReadCloser(r.Body, encoding, maxBatchSizeBytes) if err != nil { return nil, err } @@ -430,7 +442,7 @@ func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r return err } - req, err := http.NewRequestWithContext(ctx, "POST", u.String(), r) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r) if err != nil { return err }