chore: Tidy up some documentation and naming.
parent
414aef261b
commit
22b569fcd8
|
@ -196,7 +196,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
h.HandleHTTPError(ctx, err, sw)
|
||||
return
|
||||
}
|
||||
requestBytes = parsed.Bytes
|
||||
requestBytes = parsed.RawSize
|
||||
|
||||
if err := h.PointsWriter.WritePoints(ctx, parsed.Points); err != nil {
|
||||
h.HandleHTTPError(ctx, &influxdb.Error{
|
||||
|
@ -211,6 +211,8 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// checkBucketWritePermissions checks an Authorizer for write permissions to a
|
||||
// specific Bucket.
|
||||
func checkBucketWritePermissions(auth influxdb.Authorizer, orgID, bucketID influxdb.ID) error {
|
||||
p, err := influxdb.NewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID)
|
||||
if err != nil {
|
||||
|
@ -232,7 +234,9 @@ func checkBucketWritePermissions(auth influxdb.Authorizer, orgID, bucketID influ
|
|||
return nil
|
||||
}
|
||||
|
||||
func BatchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (io.ReadCloser, error) {
|
||||
// PointBatchReadCloser (potentially) wraps an io.ReadCloser in Gzip
|
||||
// decompression and limits the reading to a specific number of bytes.
|
||||
func PointBatchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (io.ReadCloser, error) {
|
||||
switch encoding {
|
||||
case "gzip", "x-gzip":
|
||||
var err error
|
||||
|
@ -247,25 +251,29 @@ func BatchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64)
|
|||
return rc, nil
|
||||
}
|
||||
|
||||
// NewPointsParser returns a new PointsParser
|
||||
func NewPointsParser(parserOptions ...models.ParserOption) *PointsParser {
|
||||
return &PointsParser{
|
||||
ParserOptions: parserOptions,
|
||||
}
|
||||
}
|
||||
|
||||
// ParsedPoints contains the points parsed as well as the total number of bytes
|
||||
// after decompression.
|
||||
type ParsedPoints struct {
|
||||
Points models.Points
|
||||
Bytes int
|
||||
Points models.Points
|
||||
RawSize int
|
||||
}
|
||||
|
||||
// PointsParser parses batches of Points.
|
||||
type PointsParser struct {
|
||||
ParserOptions []models.ParserOption
|
||||
}
|
||||
|
||||
// ParsePoints parses the points from an io.ReadCloser for a specific Bucket.
|
||||
func (pw *PointsParser) ParsePoints(ctx context.Context, orgID, bucketID influxdb.ID, rc io.ReadCloser) (*ParsedPoints, error) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "write points")
|
||||
defer span.Finish()
|
||||
|
||||
return pw.parsePoints(ctx, orgID, bucketID, rc)
|
||||
}
|
||||
|
||||
|
@ -321,8 +329,8 @@ func (pw *PointsParser) parsePoints(ctx context.Context, orgID, bucketID influxd
|
|||
}
|
||||
|
||||
return &ParsedPoints{
|
||||
Points: points,
|
||||
Bytes: requestBytes,
|
||||
Points: points,
|
||||
RawSize: requestBytes,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -351,6 +359,8 @@ func readAll(ctx context.Context, rc io.ReadCloser) (data []byte, err error) {
|
|||
return data, nil
|
||||
}
|
||||
|
||||
// writeRequest is a request object holding information about a batch of points
|
||||
// to be written to a Bucket.
|
||||
type writeRequest struct {
|
||||
Org string
|
||||
Bucket string
|
||||
|
@ -358,6 +368,8 @@ type writeRequest struct {
|
|||
Body io.ReadCloser
|
||||
}
|
||||
|
||||
// decodeWriteRequest extracts information from an http.Request object to
|
||||
// produce a writeRequest.
|
||||
func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes int64) (*writeRequest, error) {
|
||||
qp := r.URL.Query()
|
||||
precision := qp.Get("precision")
|
||||
|
@ -383,7 +395,7 @@ func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes
|
|||
}
|
||||
|
||||
encoding := r.Header.Get("Content-Encoding")
|
||||
body, err := BatchReadCloser(r.Body, encoding, maxBatchSizeBytes)
|
||||
body, err := PointBatchReadCloser(r.Body, encoding, maxBatchSizeBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -430,7 +442,7 @@ func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r
|
|||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), r)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue