diff --git a/cmd/influx/pkg.go b/cmd/influx/pkg.go index e3591863cb..a3cde35600 100644 --- a/cmd/influx/pkg.go +++ b/cmd/influx/pkg.go @@ -92,7 +92,7 @@ func (b *cmdPkgBuilder) cmdPkgApply() *cobra.Command { cmd.Flags().StringVarP(&b.file, "file", "f", "", "Path to package file") cmd.MarkFlagFilename("file", "yaml", "yml", "json") cmd.Flags().BoolVarP(&b.quiet, "quiet", "q", false, "disable output printing") - cmd.Flags().IntVarP(&b.applyReqLimit, "req-limit", "r", 0, "TTY input, if package will have destructive changes, proceed if set true.") + cmd.Flags().IntVarP(&b.applyReqLimit, "req-limit", "r", 0, "Request limit for applying a pkg, defaults to 5(recommended for OSS).") cmd.Flags().StringVar(&b.applyOpts.force, "force", "", `TTY input, if package will have destructive changes, proceed if set "true".`) cmd.Flags().StringVarP(&b.orgID, "org-id", "o", "", "The ID of the organization that owns the bucket") diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index 9bb5ba0006..99a2839ec0 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -337,8 +337,13 @@ func (tl *TestLauncher) LabelService() *http.LabelService { return &http.LabelService{Addr: tl.URL(), Token: tl.Auth.Token} } -func (tl *TestLauncher) TelegrafService() *http.TelegrafService { - return http.NewTelegrafService(tl.URL(), tl.Auth.Token, false) +func (tl *TestLauncher) TelegrafService(t *testing.T) *http.TelegrafService { + t.Helper() + teleSVC, err := http.NewTelegrafService(tl.URL(), tl.Auth.Token, false) + if err != nil { + t.Fatal(err) + } + return teleSVC } func (tl *TestLauncher) VariableService() *http.VariableService { diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 75f2c9b432..2efe4eac48 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -3,6 +3,7 @@ package launcher_test import ( "context" "errors" + "sync" "testing" "time" @@ -22,7 +23,7 @@ func TestLauncher_Pkger(t *testing.T) { pkger.WithBucketSVC(l.BucketService()), pkger.WithDashboardSVC(l.DashboardService()), pkger.WithLabelSVC(l.LabelService()), - pkger.WithTelegrafSVC(l.TelegrafService()), + pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService()), ) @@ -49,7 +50,7 @@ func TestLauncher_Pkger(t *testing.T) { LabelService: l.LabelService(), killCount: 2, // hits error on 3rd attempt at creating a mapping }), - pkger.WithTelegrafSVC(l.TelegrafService()), + pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService()), ) @@ -326,7 +327,7 @@ func TestLauncher_Pkger(t *testing.T) { }), pkger.WithDashboardSVC(l.DashboardService()), pkger.WithLabelSVC(l.LabelService()), - pkger.WithTelegrafSVC(l.TelegrafService()), + pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService()), ) @@ -461,28 +462,36 @@ spec: type fakeBucketSVC struct { influxdb.BucketService + countMu sync.Mutex callCount int killCount int } func (f *fakeBucketSVC) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) { + f.countMu.Lock() if f.callCount == f.killCount { + f.countMu.Unlock() return nil, errors.New("reached kill count") } f.callCount++ + f.countMu.Unlock() return f.BucketService.UpdateBucket(ctx, id, upd) } type fakeLabelSVC struct { influxdb.LabelService + countMu sync.Mutex callCount int killCount int } func (f *fakeLabelSVC) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error { + f.countMu.Lock() if f.callCount == f.killCount { + f.countMu.Unlock() return errors.New("reached kill count") } f.callCount++ + f.countMu.Unlock() return f.LabelService.CreateLabelMapping(ctx, m) } diff --git a/http/client.go b/http/client.go index 0690c28f13..49102fa6ed 100644 --- a/http/client.go +++ b/http/client.go @@ -1,8 +1,13 @@ package http import ( + "context" + "encoding/json" + "io" + "io/ioutil" "net/http" "net/url" + "path" "github.com/influxdata/influxdb/kit/tracing" ) @@ -90,3 +95,126 @@ func (c *traceClient) Do(r *http.Request) (*http.Response, error) { tracing.InjectToHTTPRequest(span, r) return c.Client.Do(r) } + +// HTTPClient is a basic http client that can make cReqs with out having to juggle +// the token and so forth. It provides sane defaults for checking response +// statuses, sets auth token when provided, and sets the content type to +// application/json for each request. The token, response checker, and +// content type can be overidden on the cReq as well. +type HTTPClient struct { + addr url.URL + token string + client *traceClient +} + +// NewHTTPClient creates a new HTTPClient(client). +func NewHTTPClient(addr, token string, insecureSkipVerify bool) (*HTTPClient, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + + return &HTTPClient{ + addr: *u, + token: token, + client: NewClient(u.Scheme, insecureSkipVerify), + }, nil +} + +func (c *HTTPClient) delete(urlPath string) *cReq { + return c.newClientReq(http.MethodDelete, urlPath, nil) +} + +func (c *HTTPClient) get(urlPath string) *cReq { + return c.newClientReq(http.MethodGet, urlPath, nil) +} + +func (c *HTTPClient) post(urlPath string, body io.Reader) *cReq { + return c.newClientReq(http.MethodPost, urlPath, body) +} + +func (c *HTTPClient) newClientReq(method, urlPath string, body io.Reader) *cReq { + u := c.addr + u.Path = path.Join(u.Path, urlPath) + req, err := http.NewRequest(method, u.String(), body) + if err != nil { + return &cReq{err: err} + } + if c.token != "" { + SetToken(c.token, req) + } + + cr := &cReq{ + client: c.client, + req: req, + respFn: CheckError, + } + return cr.ContentType("application/json") +} + +type cReq struct { + client interface { + Do(*http.Request) (*http.Response, error) + } + req *http.Request + respFn func(*http.Response) error + + err error +} + +func (r *cReq) Header(k, v string) *cReq { + if r.err != nil { + return r + } + r.req.Header.Add(k, v) + return r +} + +type queryPair struct { + k, v string +} + +func (r *cReq) Queries(pairs ...queryPair) *cReq { + if r.err != nil || len(pairs) == 0 { + return r + } + params := r.req.URL.Query() + for _, p := range pairs { + params.Add(p.k, p.v) + } + r.req.URL.RawQuery = params.Encode() + return r +} + +func (r *cReq) ContentType(ct string) *cReq { + return r.Header("Content-Type", ct) +} + +func (r *cReq) DecodeJSON(v interface{}) *cReq { + return r.RespFn(func(resp *http.Response) error { + return json.NewDecoder(resp.Body).Decode(v) + }) +} + +func (r *cReq) RespFn(fn func(*http.Response) error) *cReq { + r.respFn = fn + return r +} + +func (r *cReq) Do(ctx context.Context) error { + if r.err != nil { + return r.err + } + r.req = r.req.WithContext(ctx) + + resp, err := r.client.Do(r.req) + if err != nil { + return err + } + defer func() { + io.Copy(ioutil.Discard, resp.Body) // drain body completely + resp.Body.Close() + }() + + return r.respFn(resp) +} diff --git a/http/telegraf.go b/http/telegraf.go index 2eec10de3f..0f2fae7da6 100644 --- a/http/telegraf.go +++ b/http/telegraf.go @@ -5,8 +5,6 @@ import ( "context" "encoding/json" "fmt" - "io" - "io/ioutil" "net/http" "path" "strings" @@ -431,24 +429,24 @@ func (h *TelegrafHandler) handleDeleteTelegraf(w http.ResponseWriter, r *http.Re // TelegrafService is an http client that speaks to the telegraf service via HTTP. type TelegrafService struct { - client C + client *HTTPClient *UserResourceMappingService } // NewTelegrafService is a constructor for a telegraf service. -func NewTelegrafService(addr, token string, insecureSkipVerify bool) *TelegrafService { +func NewTelegrafService(addr, token string, insecureSkipVerify bool) (*TelegrafService, error) { + client, err := NewHTTPClient(addr, token, insecureSkipVerify) + if err != nil { + return nil, err + } return &TelegrafService{ - client: C{ - Addr: addr, - Token: token, - InsecureSkipVerify: insecureSkipVerify, - }, + client: client, UserResourceMappingService: &UserResourceMappingService{ Addr: addr, Token: token, InsecureSkipVerify: insecureSkipVerify, }, - } + }, nil } var _ platform.TelegrafConfigStore = (*TelegrafService)(nil) @@ -537,113 +535,3 @@ func (s *TelegrafService) UpdateTelegrafConfig(ctx context.Context, id platform. func (s *TelegrafService) DeleteTelegrafConfig(ctx context.Context, id platform.ID) error { return s.client.delete(path.Join(telegrafsPath, id.String())).Do(ctx) } - -// C is a basic http client that can make cReqs with out having to juggle -// the token and so forth. It provides sane defaults for checking response -// statuses, sets auth token when provided, and sets the content type to -// application/json for each request. The token, response checker, and -// content type can be overidden on the cReq as well. -type C struct { - Addr string - Token string - InsecureSkipVerify bool -} - -func (c *C) delete(urlPath string) *cReq { - return c.newClientReq(http.MethodDelete, urlPath, nil) -} - -func (c *C) get(urlPath string) *cReq { - return c.newClientReq(http.MethodGet, urlPath, nil) -} - -func (c *C) post(urlPath string, body io.Reader) *cReq { - return c.newClientReq(http.MethodPost, urlPath, body) -} - -func (c *C) newClientReq(method, urlPath string, body io.Reader) *cReq { - u, err := NewURL(c.Addr, urlPath) - if err != nil { - return &cReq{err: err} - } - - req, err := http.NewRequest(method, u.String(), body) - if err != nil { - return &cReq{err: err} - } - if c.Token != "" { - SetToken(c.Token, req) - } - - cr := &cReq{ - insecureSkip: c.InsecureSkipVerify, - req: req, - respFn: CheckError, - } - return cr.ContentType("application/json") -} - -type cReq struct { - req *http.Request - insecureSkip bool - respFn func(*http.Response) error - - err error -} - -func (r *cReq) Header(k, v string) *cReq { - if r.err != nil { - return r - } - r.req.Header.Add(k, v) - return r -} - -type queryPair struct { - k, v string -} - -func (r *cReq) Queries(pairs ...queryPair) *cReq { - if r.err != nil || len(pairs) == 0 { - return r - } - params := r.req.URL.Query() - for _, p := range pairs { - params.Add(p.k, p.v) - } - r.req.URL.RawQuery = params.Encode() - return r -} - -func (r *cReq) ContentType(ct string) *cReq { - return r.Header("Content-Type", ct) -} - -func (r *cReq) DecodeJSON(v interface{}) *cReq { - return r.RespFn(func(resp *http.Response) error { - return json.NewDecoder(resp.Body).Decode(v) - }) -} - -func (r *cReq) RespFn(fn func(*http.Response) error) *cReq { - r.respFn = fn - return r -} - -func (r *cReq) Do(ctx context.Context) error { - if r.err != nil { - return r.err - } - r.req = r.req.WithContext(ctx) - - resp, err := NewClient(r.req.URL.Scheme, r.insecureSkip).Do(r.req) - if err != nil { - return err - } - defer func() { - io.Copy(ioutil.Discard, resp.Body) // drain body completely - resp.Body.Close() - }() - - return r.respFn(resp) -} diff --git a/pkger/service.go b/pkger/service.go index 66f15440f0..e4a5f12d98 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -1412,12 +1412,6 @@ func (r *rollbackCoordinator) rollback(l *zap.Logger, err *error) { } } -func (r *rollbackCoordinator) close() { - if r.sem != nil { - close(r.sem) - } -} - type errMsg struct { resource string err applyErrBody