diff --git a/http/write_handler.go b/http/write_handler.go index 3087edbf43..663105ae4c 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -4,6 +4,7 @@ import ( "compress/gzip" "context" "fmt" + "io" "io/ioutil" "net/http" @@ -30,6 +31,10 @@ type WriteHandler struct { PointsWriter storage.PointsWriter } +const ( + writePath = "/api/v2/write" +) + // NewWriteHandler creates a new handler at /api/v2/write to receive line protocol. func NewWriteHandler(writer storage.PointsWriter) *WriteHandler { h := &WriteHandler{ @@ -38,7 +43,7 @@ func NewWriteHandler(writer storage.PointsWriter) *WriteHandler { PointsWriter: writer, } - h.HandlerFunc("POST", "/api/v2/write", h.handleWrite) + h.HandlerFunc("POST", writePath, h.handleWrite) return h } @@ -178,3 +183,61 @@ type postWriteRequest struct { Org string Bucket string } + +// WriteService sends data over HTTP to influxdb via line protocol. +type WriteService struct { + Addr string + Token string + InsecureSkipVerify bool +} + +var _ platform.WriteService = (*WriteService)(nil) + +func (s *WriteService) Write(ctx context.Context, org, bucket platform.ID, r io.Reader) error { + u, err := newURL(s.Addr, writePath) + if err != nil { + return err + } + + r, err = compressWithGzip(r) + if err != nil { + return err + } + + req, err := http.NewRequest("POST", u.String(), r) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "text/plain; charset=utf-8") + req.Header.Set("Content-Encoding", "gzip") + SetToken(s.Token, req) + + params := req.URL.Query() + params.Set("org", string(org.Encode())) + params.Set("bucket", string(bucket.Encode())) + req.URL.RawQuery = params.Encode() + + hc := newClient(u.Scheme, s.InsecureSkipVerify) + + resp, err := hc.Do(req) + if err != nil { + return err + } + + return CheckError(resp) +} + +func compressWithGzip(data io.Reader) (io.Reader, error) { + pr, pw := io.Pipe() + gw := gzip.NewWriter(pw) + var err error + + go func() { + _, err = io.Copy(gw, data) + gw.Close() + pw.Close() + }() + + return pr, err +} diff --git a/http/write_handler_test.go b/http/write_handler_test.go new file mode 100644 index 0000000000..65022135f9 --- /dev/null +++ b/http/write_handler_test.go @@ -0,0 +1,51 @@ +package http + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdata/platform" + "github.com/influxdata/platform/models" +) + +func TestWriteService_Write(t *testing.T) { + type fields struct { + Token string + InsecureSkipVerify bool + } + type args struct { + ctx context.Context + org platform.ID + bucket platform.ID + r io.Reader + } + tests := []struct { + name string + fields fields + args args + status int + want []models.Point + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r.URL.Query().Get(OrgID) + w.WriteHeader(tt.status) + })) + s := &WriteService{ + Addr: ts.URL, + Token: tt.fields.Token, + InsecureSkipVerify: tt.fields.InsecureSkipVerify, + } + if err := s.Write(tt.args.ctx, tt.args.org, tt.args.bucket, tt.args.r); (err != nil) != tt.wantErr { + t.Errorf("WriteService.Write() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/mock/points_writer.go b/mock/points_writer.go new file mode 100644 index 0000000000..74cc960c35 --- /dev/null +++ b/mock/points_writer.go @@ -0,0 +1,49 @@ +package mock + +import ( + "context" + "sync" + + "github.com/influxdata/platform/models" +) + +// PointsWriter is a mock structure for writing points. +type PointsWriter struct { + mu sync.RWMutex + CTX context.Context + Points []models.Point + Err error +} + +// ForceError is for error testing, if WritePoints is called after ForceError, it will return that error. +func (p *PointsWriter) ForceError(err error) { + p.mu.Lock() + p.Err = err + p.mu.Unlock() +} + +// WritePoints writes points to the PointsWriter that will be exposed in the Values. +func (p *PointsWriter) WritePoints(ctx context.Context, points []models.Point) error { + p.mu.Lock() + p.CTX = ctx + p.Points = append(p.Points, points...) + err := p.Err + p.mu.Unlock() + return err +} + +// Next returns the next (oldest) batch of values. +func (p *PointsWriter) Next() models.Point { + var points models.Point + p.mu.RLock() + if len(p.Points) == 0 { + p.mu.RUnlock() + return points + } + p.mu.RUnlock() + + p.mu.Lock() + defer p.mu.Unlock() + points, p.Points = p.Points[0], p.Points[1:] + return points +} diff --git a/write.go b/write.go new file mode 100644 index 0000000000..8782f5964f --- /dev/null +++ b/write.go @@ -0,0 +1,11 @@ +package platform + +import ( + "context" + "io" +) + +// WriteService writes data read from the reader. +type WriteService interface { + Write(ctx context.Context, org, bucket ID, r io.Reader) error +}