409 lines
10 KiB
Go
409 lines
10 KiB
Go
package http
|
|
|
|
import (
|
|
"compress/gzip"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
|
|
"github.com/influxdata/httprouter"
|
|
"github.com/influxdata/influxdb/v2"
|
|
pcontext "github.com/influxdata/influxdb/v2/context"
|
|
"github.com/influxdata/influxdb/v2/http/metric"
|
|
"github.com/influxdata/influxdb/v2/http/points"
|
|
"github.com/influxdata/influxdb/v2/kit/tracing"
|
|
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
|
|
"github.com/influxdata/influxdb/v2/models"
|
|
"github.com/influxdata/influxdb/v2/storage"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// WriteBackend is all services and associated parameters required to construct
|
|
// the WriteHandler.
|
|
type WriteBackend struct {
|
|
influxdb.HTTPErrorHandler
|
|
log *zap.Logger
|
|
WriteEventRecorder metric.EventRecorder
|
|
|
|
PointsWriter storage.PointsWriter
|
|
BucketService influxdb.BucketService
|
|
OrganizationService influxdb.OrganizationService
|
|
}
|
|
|
|
// NewWriteBackend returns a new instance of WriteBackend.
|
|
func NewWriteBackend(log *zap.Logger, b *APIBackend) *WriteBackend {
|
|
return &WriteBackend{
|
|
HTTPErrorHandler: b.HTTPErrorHandler,
|
|
log: log,
|
|
WriteEventRecorder: b.WriteEventRecorder,
|
|
|
|
PointsWriter: b.PointsWriter,
|
|
BucketService: b.BucketService,
|
|
OrganizationService: b.OrganizationService,
|
|
}
|
|
}
|
|
|
|
// WriteHandler receives line protocol and sends to a publish function.
|
|
type WriteHandler struct {
|
|
influxdb.HTTPErrorHandler
|
|
BucketService influxdb.BucketService
|
|
OrganizationService influxdb.OrganizationService
|
|
PointsWriter storage.PointsWriter
|
|
EventRecorder metric.EventRecorder
|
|
|
|
router *httprouter.Router
|
|
log *zap.Logger
|
|
maxBatchSizeBytes int64
|
|
// parserOptions []models.ParserOption
|
|
}
|
|
|
|
// WriteHandlerOption is a functional option for a *WriteHandler
|
|
type WriteHandlerOption func(*WriteHandler)
|
|
|
|
// WithMaxBatchSizeBytes configures the maximum size for a
|
|
// (decompressed) points batch allowed by the write handler
|
|
func WithMaxBatchSizeBytes(n int64) WriteHandlerOption {
|
|
return func(w *WriteHandler) {
|
|
w.maxBatchSizeBytes = n
|
|
}
|
|
}
|
|
|
|
//func WithParserOptions(opts ...models.ParserOption) WriteHandlerOption {
|
|
// return func(w *WriteHandler) {
|
|
// w.parserOptions = opts
|
|
// }
|
|
//}
|
|
|
|
// Prefix provides the route prefix.
|
|
func (*WriteHandler) Prefix() string {
|
|
return prefixWrite
|
|
}
|
|
|
|
const (
|
|
prefixWrite = "/api/v2/write"
|
|
msgInvalidGzipHeader = "gzipped HTTP body contains an invalid header"
|
|
msgInvalidPrecision = "invalid precision; valid precision units are ns, us, ms, and s"
|
|
|
|
opWriteHandler = "http/writeHandler"
|
|
)
|
|
|
|
// NewWriteHandler creates a new handler at /api/v2/write to receive line protocol.
|
|
func NewWriteHandler(log *zap.Logger, b *WriteBackend, opts ...WriteHandlerOption) *WriteHandler {
|
|
h := &WriteHandler{
|
|
HTTPErrorHandler: b.HTTPErrorHandler,
|
|
PointsWriter: b.PointsWriter,
|
|
BucketService: b.BucketService,
|
|
OrganizationService: b.OrganizationService,
|
|
EventRecorder: b.WriteEventRecorder,
|
|
|
|
router: NewRouter(b.HTTPErrorHandler),
|
|
log: log,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(h)
|
|
}
|
|
|
|
h.router.HandlerFunc(http.MethodPost, prefixWrite, h.handleWrite)
|
|
return h
|
|
}
|
|
|
|
func (h *WriteHandler) findBucket(ctx context.Context, orgID influxdb.ID, bucket string) (*influxdb.Bucket, error) {
|
|
if id, err := influxdb.IDFromString(bucket); err == nil {
|
|
b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
|
|
OrganizationID: &orgID,
|
|
ID: id,
|
|
})
|
|
if err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
|
|
return nil, err
|
|
} else if err == nil {
|
|
return b, err
|
|
}
|
|
}
|
|
|
|
return h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
|
|
OrganizationID: &orgID,
|
|
Name: &bucket,
|
|
})
|
|
}
|
|
|
|
func (h *WriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
h.router.ServeHTTP(w, r)
|
|
}
|
|
|
|
func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|
span, r := tracing.ExtractFromHTTPRequest(r, "WriteHandler")
|
|
defer span.Finish()
|
|
|
|
ctx := r.Context()
|
|
auth, err := pcontext.GetAuthorizer(ctx)
|
|
if err != nil {
|
|
h.HandleHTTPError(ctx, err, w)
|
|
return
|
|
}
|
|
|
|
req, err := decodeWriteRequest(ctx, r, h.maxBatchSizeBytes)
|
|
if err != nil {
|
|
h.HandleHTTPError(ctx, err, w)
|
|
return
|
|
}
|
|
|
|
org, err := queryOrganization(ctx, r, h.OrganizationService)
|
|
if err != nil {
|
|
h.HandleHTTPError(ctx, err, w)
|
|
return
|
|
}
|
|
span.LogKV("org_id", org.ID)
|
|
|
|
sw := kithttp.NewStatusResponseWriter(w)
|
|
recorder := NewWriteUsageRecorder(sw, h.EventRecorder)
|
|
var requestBytes int
|
|
defer func() {
|
|
// Close around the requestBytes variable to placate the linter.
|
|
recorder.Record(ctx, requestBytes, org.ID, r.URL.Path)
|
|
}()
|
|
|
|
bucket, err := h.findBucket(ctx, org.ID, req.Bucket)
|
|
if err != nil {
|
|
h.HandleHTTPError(ctx, err, sw)
|
|
return
|
|
}
|
|
span.LogKV("bucket_id", bucket.ID)
|
|
|
|
if err := checkBucketWritePermissions(auth, org.ID, bucket.ID); err != nil {
|
|
h.HandleHTTPError(ctx, err, sw)
|
|
return
|
|
}
|
|
|
|
// TODO: Backport?
|
|
//opts := append([]models.ParserOption{}, h.parserOptions...)
|
|
//opts = append(opts, models.WithParserPrecision(req.Precision))
|
|
parsed, err := points.NewParser(req.Precision).Parse(ctx, org.ID, bucket.ID, req.Body)
|
|
if err != nil {
|
|
h.HandleHTTPError(ctx, err, sw)
|
|
return
|
|
}
|
|
requestBytes = parsed.RawSize
|
|
|
|
if err := h.PointsWriter.WritePoints(ctx, org.ID, bucket.ID, parsed.Points); err != nil {
|
|
h.HandleHTTPError(ctx, &influxdb.Error{
|
|
Code: influxdb.EInternal,
|
|
Op: opWriteHandler,
|
|
Msg: "unexpected error writing points to database",
|
|
Err: err,
|
|
}, sw)
|
|
return
|
|
}
|
|
|
|
sw.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
// checkBucketWritePermissions checks an Authorizer for write permissions to a
|
|
// specific Bucket.
|
|
func checkBucketWritePermissions(auth influxdb.Authorizer, orgID, bucketID influxdb.ID) error {
|
|
p, err := influxdb.NewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID)
|
|
if err != nil {
|
|
return &influxdb.Error{
|
|
Code: influxdb.EInternal,
|
|
Op: opWriteHandler,
|
|
Msg: fmt.Sprintf("unable to create permission for bucket: %v", err),
|
|
Err: err,
|
|
}
|
|
}
|
|
if pset, err := auth.PermissionSet(); err != nil || !pset.Allowed(*p) {
|
|
return &influxdb.Error{
|
|
Code: influxdb.EForbidden,
|
|
Op: opWriteHandler,
|
|
Msg: "insufficient permissions for write",
|
|
Err: err,
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// writeRequest is a request object holding information about a batch of points
|
|
// to be written to a Bucket.
|
|
type writeRequest struct {
|
|
Org string
|
|
Bucket string
|
|
Precision string
|
|
Body io.ReadCloser
|
|
}
|
|
|
|
// decodeWriteRequest extracts information from an http.Request object to
|
|
// produce a writeRequest.
|
|
func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes int64) (*writeRequest, error) {
|
|
qp := r.URL.Query()
|
|
precision := qp.Get("precision")
|
|
if precision == "" {
|
|
precision = "ns"
|
|
}
|
|
|
|
if !models.ValidPrecision(precision) {
|
|
return nil, &influxdb.Error{
|
|
Code: influxdb.EInvalid,
|
|
Op: "http/newWriteRequest",
|
|
Msg: msgInvalidPrecision,
|
|
}
|
|
}
|
|
|
|
bucket := qp.Get("bucket")
|
|
if bucket == "" {
|
|
return nil, &influxdb.Error{
|
|
Code: influxdb.ENotFound,
|
|
Op: "http/newWriteRequest",
|
|
Msg: "bucket not found",
|
|
}
|
|
}
|
|
|
|
encoding := r.Header.Get("Content-Encoding")
|
|
body, err := points.BatchReadCloser(r.Body, encoding, maxBatchSizeBytes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &writeRequest{
|
|
Bucket: qp.Get("bucket"),
|
|
Org: qp.Get("org"),
|
|
Precision: precision,
|
|
Body: body,
|
|
}, nil
|
|
}
|
|
|
|
// WriteService sends data over HTTP to influxdb via line protocol.
|
|
type WriteService struct {
|
|
Addr string
|
|
Token string
|
|
Precision string
|
|
InsecureSkipVerify bool
|
|
}
|
|
|
|
var _ influxdb.WriteService = (*WriteService)(nil)
|
|
|
|
func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r io.Reader) error {
|
|
precision := s.Precision
|
|
if precision == "" {
|
|
precision = "ns"
|
|
}
|
|
|
|
if !models.ValidPrecision(precision) {
|
|
return &influxdb.Error{
|
|
Code: influxdb.EInvalid,
|
|
Op: "http/Write",
|
|
Msg: msgInvalidPrecision,
|
|
}
|
|
}
|
|
|
|
u, err := NewURL(s.Addr, prefixWrite)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r, err = compressWithGzip(r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
|
req.Header.Set("Content-Encoding", "gzip")
|
|
SetToken(s.Token, req)
|
|
|
|
org, err := orgID.Encode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
bucket, err := bucketID.Encode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
params := req.URL.Query()
|
|
params.Set("org", string(org))
|
|
params.Set("bucket", string(bucket))
|
|
params.Set("precision", string(precision))
|
|
req.URL.RawQuery = params.Encode()
|
|
|
|
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
|
|
|
|
resp, err := hc.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
return CheckError(resp)
|
|
}
|
|
|
|
func compressWithGzip(data io.Reader) (io.Reader, error) {
|
|
pr, pw := io.Pipe()
|
|
gw := gzip.NewWriter(pw)
|
|
var err error
|
|
|
|
go func() {
|
|
_, err = io.Copy(gw, data)
|
|
gw.Close()
|
|
pw.Close()
|
|
}()
|
|
|
|
return pr, err
|
|
}
|
|
|
|
// WriteTo writes to the bucket matching the filter.
|
|
func (s *WriteService) WriteTo(ctx context.Context, filter influxdb.BucketFilter, r io.Reader) error {
|
|
precision := s.Precision
|
|
if precision == "" {
|
|
precision = "ns"
|
|
}
|
|
|
|
if !models.ValidPrecision(precision) {
|
|
return &influxdb.Error{
|
|
Code: influxdb.EInvalid,
|
|
Op: "http/Write",
|
|
Msg: msgInvalidPrecision,
|
|
}
|
|
}
|
|
|
|
u, err := NewURL(s.Addr, prefixWrite)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r, err = compressWithGzip(r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
|
req.Header.Set("Content-Encoding", "gzip")
|
|
SetToken(s.Token, req)
|
|
|
|
params := req.URL.Query()
|
|
for key, param := range filter.QueryParams() {
|
|
params[key] = param
|
|
}
|
|
params.Set("precision", string(precision))
|
|
req.URL.RawQuery = params.Encode()
|
|
|
|
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
|
|
|
|
resp, err := hc.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
return CheckError(resp)
|
|
}
|