fix(http): provides a reusable http client instead of creating/GCing one each time
parent
51f1b73738
commit
0225160814
|
@ -92,7 +92,7 @@ func (b *cmdPkgBuilder) cmdPkgApply() *cobra.Command {
|
|||
cmd.Flags().StringVarP(&b.file, "file", "f", "", "Path to package file")
|
||||
cmd.MarkFlagFilename("file", "yaml", "yml", "json")
|
||||
cmd.Flags().BoolVarP(&b.quiet, "quiet", "q", false, "disable output printing")
|
||||
cmd.Flags().IntVarP(&b.applyReqLimit, "req-limit", "r", 0, "TTY input, if package will have destructive changes, proceed if set true.")
|
||||
cmd.Flags().IntVarP(&b.applyReqLimit, "req-limit", "r", 0, "Request limit for applying a pkg, defaults to 5(recommended for OSS).")
|
||||
cmd.Flags().StringVar(&b.applyOpts.force, "force", "", `TTY input, if package will have destructive changes, proceed if set "true".`)
|
||||
|
||||
cmd.Flags().StringVarP(&b.orgID, "org-id", "o", "", "The ID of the organization that owns the bucket")
|
||||
|
|
|
@ -337,8 +337,13 @@ func (tl *TestLauncher) LabelService() *http.LabelService {
|
|||
return &http.LabelService{Addr: tl.URL(), Token: tl.Auth.Token}
|
||||
}
|
||||
|
||||
func (tl *TestLauncher) TelegrafService() *http.TelegrafService {
|
||||
return http.NewTelegrafService(tl.URL(), tl.Auth.Token, false)
|
||||
func (tl *TestLauncher) TelegrafService(t *testing.T) *http.TelegrafService {
|
||||
t.Helper()
|
||||
teleSVC, err := http.NewTelegrafService(tl.URL(), tl.Auth.Token, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return teleSVC
|
||||
}
|
||||
|
||||
func (tl *TestLauncher) VariableService() *http.VariableService {
|
||||
|
|
|
@ -3,6 +3,7 @@ package launcher_test
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -22,7 +23,7 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
pkger.WithBucketSVC(l.BucketService()),
|
||||
pkger.WithDashboardSVC(l.DashboardService()),
|
||||
pkger.WithLabelSVC(l.LabelService()),
|
||||
pkger.WithTelegrafSVC(l.TelegrafService()),
|
||||
pkger.WithTelegrafSVC(l.TelegrafService(t)),
|
||||
pkger.WithVariableSVC(l.VariableService()),
|
||||
)
|
||||
|
||||
|
@ -49,7 +50,7 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
LabelService: l.LabelService(),
|
||||
killCount: 2, // hits error on 3rd attempt at creating a mapping
|
||||
}),
|
||||
pkger.WithTelegrafSVC(l.TelegrafService()),
|
||||
pkger.WithTelegrafSVC(l.TelegrafService(t)),
|
||||
pkger.WithVariableSVC(l.VariableService()),
|
||||
)
|
||||
|
||||
|
@ -326,7 +327,7 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
}),
|
||||
pkger.WithDashboardSVC(l.DashboardService()),
|
||||
pkger.WithLabelSVC(l.LabelService()),
|
||||
pkger.WithTelegrafSVC(l.TelegrafService()),
|
||||
pkger.WithTelegrafSVC(l.TelegrafService(t)),
|
||||
pkger.WithVariableSVC(l.VariableService()),
|
||||
)
|
||||
|
||||
|
@ -461,28 +462,36 @@ spec:
|
|||
|
||||
type fakeBucketSVC struct {
|
||||
influxdb.BucketService
|
||||
countMu sync.Mutex
|
||||
callCount int
|
||||
killCount int
|
||||
}
|
||||
|
||||
func (f *fakeBucketSVC) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
|
||||
f.countMu.Lock()
|
||||
if f.callCount == f.killCount {
|
||||
f.countMu.Unlock()
|
||||
return nil, errors.New("reached kill count")
|
||||
}
|
||||
f.callCount++
|
||||
f.countMu.Unlock()
|
||||
return f.BucketService.UpdateBucket(ctx, id, upd)
|
||||
}
|
||||
|
||||
type fakeLabelSVC struct {
|
||||
influxdb.LabelService
|
||||
countMu sync.Mutex
|
||||
callCount int
|
||||
killCount int
|
||||
}
|
||||
|
||||
func (f *fakeLabelSVC) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
|
||||
f.countMu.Lock()
|
||||
if f.callCount == f.killCount {
|
||||
f.countMu.Unlock()
|
||||
return errors.New("reached kill count")
|
||||
}
|
||||
f.callCount++
|
||||
f.countMu.Unlock()
|
||||
return f.LabelService.CreateLabelMapping(ctx, m)
|
||||
}
|
||||
|
|
128
http/client.go
128
http/client.go
|
@ -1,8 +1,13 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
)
|
||||
|
@ -90,3 +95,126 @@ func (c *traceClient) Do(r *http.Request) (*http.Response, error) {
|
|||
tracing.InjectToHTTPRequest(span, r)
|
||||
return c.Client.Do(r)
|
||||
}
|
||||
|
||||
// HTTPClient is a basic http client that can make cReqs with out having to juggle
|
||||
// the token and so forth. It provides sane defaults for checking response
|
||||
// statuses, sets auth token when provided, and sets the content type to
|
||||
// application/json for each request. The token, response checker, and
|
||||
// content type can be overidden on the cReq as well.
|
||||
type HTTPClient struct {
|
||||
addr url.URL
|
||||
token string
|
||||
client *traceClient
|
||||
}
|
||||
|
||||
// NewHTTPClient creates a new HTTPClient(client).
|
||||
func NewHTTPClient(addr, token string, insecureSkipVerify bool) (*HTTPClient, error) {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &HTTPClient{
|
||||
addr: *u,
|
||||
token: token,
|
||||
client: NewClient(u.Scheme, insecureSkipVerify),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *HTTPClient) delete(urlPath string) *cReq {
|
||||
return c.newClientReq(http.MethodDelete, urlPath, nil)
|
||||
}
|
||||
|
||||
func (c *HTTPClient) get(urlPath string) *cReq {
|
||||
return c.newClientReq(http.MethodGet, urlPath, nil)
|
||||
}
|
||||
|
||||
func (c *HTTPClient) post(urlPath string, body io.Reader) *cReq {
|
||||
return c.newClientReq(http.MethodPost, urlPath, body)
|
||||
}
|
||||
|
||||
func (c *HTTPClient) newClientReq(method, urlPath string, body io.Reader) *cReq {
|
||||
u := c.addr
|
||||
u.Path = path.Join(u.Path, urlPath)
|
||||
req, err := http.NewRequest(method, u.String(), body)
|
||||
if err != nil {
|
||||
return &cReq{err: err}
|
||||
}
|
||||
if c.token != "" {
|
||||
SetToken(c.token, req)
|
||||
}
|
||||
|
||||
cr := &cReq{
|
||||
client: c.client,
|
||||
req: req,
|
||||
respFn: CheckError,
|
||||
}
|
||||
return cr.ContentType("application/json")
|
||||
}
|
||||
|
||||
type cReq struct {
|
||||
client interface {
|
||||
Do(*http.Request) (*http.Response, error)
|
||||
}
|
||||
req *http.Request
|
||||
respFn func(*http.Response) error
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func (r *cReq) Header(k, v string) *cReq {
|
||||
if r.err != nil {
|
||||
return r
|
||||
}
|
||||
r.req.Header.Add(k, v)
|
||||
return r
|
||||
}
|
||||
|
||||
type queryPair struct {
|
||||
k, v string
|
||||
}
|
||||
|
||||
func (r *cReq) Queries(pairs ...queryPair) *cReq {
|
||||
if r.err != nil || len(pairs) == 0 {
|
||||
return r
|
||||
}
|
||||
params := r.req.URL.Query()
|
||||
for _, p := range pairs {
|
||||
params.Add(p.k, p.v)
|
||||
}
|
||||
r.req.URL.RawQuery = params.Encode()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *cReq) ContentType(ct string) *cReq {
|
||||
return r.Header("Content-Type", ct)
|
||||
}
|
||||
|
||||
func (r *cReq) DecodeJSON(v interface{}) *cReq {
|
||||
return r.RespFn(func(resp *http.Response) error {
|
||||
return json.NewDecoder(resp.Body).Decode(v)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *cReq) RespFn(fn func(*http.Response) error) *cReq {
|
||||
r.respFn = fn
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *cReq) Do(ctx context.Context) error {
|
||||
if r.err != nil {
|
||||
return r.err
|
||||
}
|
||||
r.req = r.req.WithContext(ctx)
|
||||
|
||||
resp, err := r.client.Do(r.req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
io.Copy(ioutil.Discard, resp.Body) // drain body completely
|
||||
resp.Body.Close()
|
||||
}()
|
||||
|
||||
return r.respFn(resp)
|
||||
}
|
||||
|
|
128
http/telegraf.go
128
http/telegraf.go
|
@ -5,8 +5,6 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
|
@ -431,24 +429,24 @@ func (h *TelegrafHandler) handleDeleteTelegraf(w http.ResponseWriter, r *http.Re
|
|||
|
||||
// TelegrafService is an http client that speaks to the telegraf service via HTTP.
|
||||
type TelegrafService struct {
|
||||
client C
|
||||
client *HTTPClient
|
||||
*UserResourceMappingService
|
||||
}
|
||||
|
||||
// NewTelegrafService is a constructor for a telegraf service.
|
||||
func NewTelegrafService(addr, token string, insecureSkipVerify bool) *TelegrafService {
|
||||
func NewTelegrafService(addr, token string, insecureSkipVerify bool) (*TelegrafService, error) {
|
||||
client, err := NewHTTPClient(addr, token, insecureSkipVerify)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &TelegrafService{
|
||||
client: C{
|
||||
Addr: addr,
|
||||
Token: token,
|
||||
InsecureSkipVerify: insecureSkipVerify,
|
||||
},
|
||||
client: client,
|
||||
UserResourceMappingService: &UserResourceMappingService{
|
||||
Addr: addr,
|
||||
Token: token,
|
||||
InsecureSkipVerify: insecureSkipVerify,
|
||||
},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
var _ platform.TelegrafConfigStore = (*TelegrafService)(nil)
|
||||
|
@ -537,113 +535,3 @@ func (s *TelegrafService) UpdateTelegrafConfig(ctx context.Context, id platform.
|
|||
func (s *TelegrafService) DeleteTelegrafConfig(ctx context.Context, id platform.ID) error {
|
||||
return s.client.delete(path.Join(telegrafsPath, id.String())).Do(ctx)
|
||||
}
|
||||
|
||||
// C is a basic http client that can make cReqs with out having to juggle
|
||||
// the token and so forth. It provides sane defaults for checking response
|
||||
// statuses, sets auth token when provided, and sets the content type to
|
||||
// application/json for each request. The token, response checker, and
|
||||
// content type can be overidden on the cReq as well.
|
||||
type C struct {
|
||||
Addr string
|
||||
Token string
|
||||
InsecureSkipVerify bool
|
||||
}
|
||||
|
||||
func (c *C) delete(urlPath string) *cReq {
|
||||
return c.newClientReq(http.MethodDelete, urlPath, nil)
|
||||
}
|
||||
|
||||
func (c *C) get(urlPath string) *cReq {
|
||||
return c.newClientReq(http.MethodGet, urlPath, nil)
|
||||
}
|
||||
|
||||
func (c *C) post(urlPath string, body io.Reader) *cReq {
|
||||
return c.newClientReq(http.MethodPost, urlPath, body)
|
||||
}
|
||||
|
||||
func (c *C) newClientReq(method, urlPath string, body io.Reader) *cReq {
|
||||
u, err := NewURL(c.Addr, urlPath)
|
||||
if err != nil {
|
||||
return &cReq{err: err}
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(method, u.String(), body)
|
||||
if err != nil {
|
||||
return &cReq{err: err}
|
||||
}
|
||||
if c.Token != "" {
|
||||
SetToken(c.Token, req)
|
||||
}
|
||||
|
||||
cr := &cReq{
|
||||
insecureSkip: c.InsecureSkipVerify,
|
||||
req: req,
|
||||
respFn: CheckError,
|
||||
}
|
||||
return cr.ContentType("application/json")
|
||||
}
|
||||
|
||||
type cReq struct {
|
||||
req *http.Request
|
||||
insecureSkip bool
|
||||
respFn func(*http.Response) error
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func (r *cReq) Header(k, v string) *cReq {
|
||||
if r.err != nil {
|
||||
return r
|
||||
}
|
||||
r.req.Header.Add(k, v)
|
||||
return r
|
||||
}
|
||||
|
||||
type queryPair struct {
|
||||
k, v string
|
||||
}
|
||||
|
||||
func (r *cReq) Queries(pairs ...queryPair) *cReq {
|
||||
if r.err != nil || len(pairs) == 0 {
|
||||
return r
|
||||
}
|
||||
params := r.req.URL.Query()
|
||||
for _, p := range pairs {
|
||||
params.Add(p.k, p.v)
|
||||
}
|
||||
r.req.URL.RawQuery = params.Encode()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *cReq) ContentType(ct string) *cReq {
|
||||
return r.Header("Content-Type", ct)
|
||||
}
|
||||
|
||||
func (r *cReq) DecodeJSON(v interface{}) *cReq {
|
||||
return r.RespFn(func(resp *http.Response) error {
|
||||
return json.NewDecoder(resp.Body).Decode(v)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *cReq) RespFn(fn func(*http.Response) error) *cReq {
|
||||
r.respFn = fn
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *cReq) Do(ctx context.Context) error {
|
||||
if r.err != nil {
|
||||
return r.err
|
||||
}
|
||||
r.req = r.req.WithContext(ctx)
|
||||
|
||||
resp, err := NewClient(r.req.URL.Scheme, r.insecureSkip).Do(r.req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
io.Copy(ioutil.Discard, resp.Body) // drain body completely
|
||||
resp.Body.Close()
|
||||
}()
|
||||
|
||||
return r.respFn(resp)
|
||||
}
|
||||
|
|
|
@ -1412,12 +1412,6 @@ func (r *rollbackCoordinator) rollback(l *zap.Logger, err *error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *rollbackCoordinator) close() {
|
||||
if r.sem != nil {
|
||||
close(r.sem)
|
||||
}
|
||||
}
|
||||
|
||||
type errMsg struct {
|
||||
resource string
|
||||
err applyErrBody
|
||||
|
|
Loading…
Reference in New Issue