feat(http): add http write client
parent
91dbd48258
commit
6f4bd54afc
|
@ -4,6 +4,7 @@ import (
|
|||
"compress/gzip"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
|
@ -30,6 +31,10 @@ type WriteHandler struct {
|
|||
PointsWriter storage.PointsWriter
|
||||
}
|
||||
|
||||
const (
|
||||
writePath = "/api/v2/write"
|
||||
)
|
||||
|
||||
// NewWriteHandler creates a new handler at /api/v2/write to receive line protocol.
|
||||
func NewWriteHandler(writer storage.PointsWriter) *WriteHandler {
|
||||
h := &WriteHandler{
|
||||
|
@ -38,7 +43,7 @@ func NewWriteHandler(writer storage.PointsWriter) *WriteHandler {
|
|||
PointsWriter: writer,
|
||||
}
|
||||
|
||||
h.HandlerFunc("POST", "/api/v2/write", h.handleWrite)
|
||||
h.HandlerFunc("POST", writePath, h.handleWrite)
|
||||
return h
|
||||
}
|
||||
|
||||
|
@ -178,3 +183,61 @@ type postWriteRequest struct {
|
|||
Org string
|
||||
Bucket string
|
||||
}
|
||||
|
||||
// WriteService sends data over HTTP to influxdb via line protocol.
|
||||
type WriteService struct {
|
||||
Addr string
|
||||
Token string
|
||||
InsecureSkipVerify bool
|
||||
}
|
||||
|
||||
var _ platform.WriteService = (*WriteService)(nil)
|
||||
|
||||
func (s *WriteService) Write(ctx context.Context, org, bucket platform.ID, r io.Reader) error {
|
||||
u, err := newURL(s.Addr, writePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r, err = compressWithGzip(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", u.String(), r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
SetToken(s.Token, req)
|
||||
|
||||
params := req.URL.Query()
|
||||
params.Set("org", string(org.Encode()))
|
||||
params.Set("bucket", string(bucket.Encode()))
|
||||
req.URL.RawQuery = params.Encode()
|
||||
|
||||
hc := newClient(u.Scheme, s.InsecureSkipVerify)
|
||||
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return CheckError(resp)
|
||||
}
|
||||
|
||||
func compressWithGzip(data io.Reader) (io.Reader, error) {
|
||||
pr, pw := io.Pipe()
|
||||
gw := gzip.NewWriter(pw)
|
||||
var err error
|
||||
|
||||
go func() {
|
||||
_, err = io.Copy(gw, data)
|
||||
gw.Close()
|
||||
pw.Close()
|
||||
}()
|
||||
|
||||
return pr, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/models"
|
||||
)
|
||||
|
||||
func TestWriteService_Write(t *testing.T) {
|
||||
type fields struct {
|
||||
Token string
|
||||
InsecureSkipVerify bool
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
org platform.ID
|
||||
bucket platform.ID
|
||||
r io.Reader
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
status int
|
||||
want []models.Point
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
r.URL.Query().Get(OrgID)
|
||||
w.WriteHeader(tt.status)
|
||||
}))
|
||||
s := &WriteService{
|
||||
Addr: ts.URL,
|
||||
Token: tt.fields.Token,
|
||||
InsecureSkipVerify: tt.fields.InsecureSkipVerify,
|
||||
}
|
||||
if err := s.Write(tt.args.ctx, tt.args.org, tt.args.bucket, tt.args.r); (err != nil) != tt.wantErr {
|
||||
t.Errorf("WriteService.Write() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/platform/models"
|
||||
)
|
||||
|
||||
// PointsWriter is a mock structure for writing points.
|
||||
type PointsWriter struct {
|
||||
mu sync.RWMutex
|
||||
CTX context.Context
|
||||
Points []models.Point
|
||||
Err error
|
||||
}
|
||||
|
||||
// ForceError is for error testing, if WritePoints is called after ForceError, it will return that error.
|
||||
func (p *PointsWriter) ForceError(err error) {
|
||||
p.mu.Lock()
|
||||
p.Err = err
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
||||
// WritePoints writes points to the PointsWriter that will be exposed in the Values.
|
||||
func (p *PointsWriter) WritePoints(ctx context.Context, points []models.Point) error {
|
||||
p.mu.Lock()
|
||||
p.CTX = ctx
|
||||
p.Points = append(p.Points, points...)
|
||||
err := p.Err
|
||||
p.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// Next returns the next (oldest) batch of values.
|
||||
func (p *PointsWriter) Next() models.Point {
|
||||
var points models.Point
|
||||
p.mu.RLock()
|
||||
if len(p.Points) == 0 {
|
||||
p.mu.RUnlock()
|
||||
return points
|
||||
}
|
||||
p.mu.RUnlock()
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
points, p.Points = p.Points[0], p.Points[1:]
|
||||
return points
|
||||
}
|
Loading…
Reference in New Issue