feat(httpc): add httpc pkg to resuse code for http client type

pull/16177/head
Johnny Steenbergen 2019-12-07 20:10:22 -08:00 committed by Johnny Steenbergen
parent 648a14d148
commit 5cc02de988
16 changed files with 1186 additions and 321 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/internal/fs"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/pkg/httpc"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
@ -29,10 +30,10 @@ func main() {
}
var (
httpClient *http.HTTPClient
httpClient *httpc.Client
)
func newHTTPClient() (*http.HTTPClient, error) {
func newHTTPClient() (*httpc.Client, error) {
if httpClient != nil {
return httpClient, nil
}

View File

@ -21,6 +21,7 @@ import (
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/pkg/httpc"
"github.com/influxdata/influxdb/query"
)
@ -37,7 +38,7 @@ type TestLauncher struct {
Bucket *platform.Bucket
Auth *platform.Authorization
httpClient *http.HTTPClient
httpClient *httpc.Client
// Standard in/out/err buffers.
Stdin bytes.Buffer
@ -361,11 +362,15 @@ func (tl *TestLauncher) TaskService() *http.TaskService {
return &http.TaskService{Addr: tl.URL(), Token: tl.Auth.Token}
}
func (tl *TestLauncher) HTTPClient(tb testing.TB) *http.HTTPClient {
func (tl *TestLauncher) HTTPClient(tb testing.TB) *httpc.Client {
tb.Helper()
if tl.httpClient == nil {
client, err := http.NewHTTPClient(tl.URL(), tl.Auth.Token, false)
token := ""
if tl.Auth != nil {
token = tl.Auth.Token
}
client, err := http.NewHTTPClient(tl.URL(), token, false)
if err != nil {
tb.Fatal(err)
}

View File

@ -10,10 +10,10 @@ import (
"time"
"github.com/influxdata/httprouter"
"go.uber.org/zap"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/pkg/httpc"
"go.uber.org/zap"
)
// BucketBackend is all services and associated parameters required to construct
@ -721,7 +721,7 @@ func decodePatchBucketRequest(ctx context.Context, r *http.Request) (*patchBucke
// BucketService connects to Influx via HTTP using tokens to manage buckets
type BucketService struct {
Client *HTTPClient
Client *httpc.Client
// OpPrefix is an additional property for error
// find bucket service, when finds nothing.
OpPrefix string
@ -765,7 +765,8 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*in
defer span.Finish()
var br bucketResponse
err := s.Client.get(bucketIDPath(id)).
err := s.Client.
Get(bucketIDPath(id)).
DecodeJSON(&br).
Do(ctx)
if err != nil {
@ -830,8 +831,9 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketF
}
var bs bucketsResponse
err := s.Client.get(bucketsPath).
Queries(queryPairs...).
err := s.Client.
Get(bucketsPath).
QueryParams(queryPairs...).
DecodeJSON(&bs).
Do(ctx)
if err != nil {
@ -857,7 +859,8 @@ func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) er
defer span.Finish()
var br bucketResponse
err := s.Client.post(bucketsPath, bodyJSON(newBucket(b))).
err := s.Client.
Post(httpc.BodyJSON(newBucket(b)), bucketsPath).
DecodeJSON(&br).
Do(ctx)
if err != nil {
@ -876,7 +879,8 @@ func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) er
// Returns the new bucket state after update.
func (s *BucketService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
var br bucketResponse
err := s.Client.patch(bucketIDPath(id), bodyJSON(newBucketUpdate(&upd))).
err := s.Client.
Patch(httpc.BodyJSON(newBucketUpdate(&upd)), bucketIDPath(id)).
DecodeJSON(&br).
Do(ctx)
if err != nil {
@ -887,7 +891,9 @@ func (s *BucketService) UpdateBucket(ctx context.Context, id influxdb.ID, upd in
// DeleteBucket removes a bucket by ID.
func (s *BucketService) DeleteBucket(ctx context.Context, id influxdb.ID) error {
return s.Client.delete(bucketIDPath(id)).Do(ctx)
return s.Client.
Delete(bucketIDPath(id)).
Do(ctx)
}
// validBucketName reports any errors with bucket names

View File

@ -17,6 +17,7 @@ import (
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/pkg/httpc"
platformtesting "github.com/influxdata/influxdb/testing"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
@ -1215,7 +1216,7 @@ func TestBucketService(t *testing.T) {
platformtesting.BucketService(initBucketService, t)
}
func mustNewHTTPClient(t *testing.T, addr, token string) *HTTPClient {
func mustNewHTTPClient(t *testing.T, addr, token string) *httpc.Client {
t.Helper()
httpClient, err := NewHTTPClient(addr, token, false)

View File

@ -1,19 +1,38 @@
package http
import (
"bytes"
"context"
"encoding/json"
"io"
"io/ioutil"
"crypto/tls"
"net"
"net/http"
"net/url"
"path"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/pkg/httpc"
)
// NewHTTPClient creates a new httpc.Client type. This call sets all
// the options that are important to the http pkg on the httpc client.
// The default status fn and so forth will all be set for the caller.
func NewHTTPClient(addr, token string, insecureSkipVerify bool) (*httpc.Client, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
opts := []httpc.ClientOptFn{
httpc.WithAddr(addr),
httpc.WithContentType("application/json"),
httpc.WithHTTPClient(NewClient(u.Scheme, insecureSkipVerify)),
httpc.WithInsecureSkipVerify(insecureSkipVerify),
httpc.WithStatusFn(CheckError),
}
if token != "" {
opts = append(opts, httpc.WithAuthToken(token))
}
return httpc.New(opts...)
}
// Service connects to an InfluxDB via HTTP.
type Service struct {
Addr string
@ -73,206 +92,44 @@ func NewURL(addr, path string) (*url.URL, error) {
}
// NewClient returns an http.Client that pools connections and injects a span.
func NewClient(scheme string, insecure bool) *traceClient {
hc := &traceClient{
Client: http.Client{
Transport: defaultTransport,
},
}
if scheme == "https" && insecure {
hc.Transport = skipVerifyTransport
}
return hc
func NewClient(scheme string, insecure bool) *http.Client {
return httpClient(scheme, insecure)
}
// traceClient always injects any opentracing trace into the client requests.
type traceClient struct {
http.Client
// SpanTransport injects the http.RoundTripper.RoundTrip() request
// with a span.
type SpanTransport struct {
base http.RoundTripper
}
// Do injects the trace and then performs the request.
func (c *traceClient) Do(r *http.Request) (*http.Response, error) {
// RoundTrip implements the http.RoundTripper, intercepting the base
// round trippers call and injecting a span.
func (s *SpanTransport) RoundTrip(r *http.Request) (*http.Response, error) {
span, _ := tracing.StartSpanFromContext(r.Context())
defer span.Finish()
tracing.InjectToHTTPRequest(span, r)
return c.Client.Do(r)
return s.base.RoundTrip(r)
}
// HTTPClient is a basic http client that can make cReqs with out having to juggle
// the token and so forth. It provides sane defaults for checking response
// statuses, sets auth token when provided, and sets the content type to
// application/json for each request. The token, response checker, and
// content type can be overidden on the cReq as well.
type HTTPClient struct {
addr url.URL
token string
client *traceClient
}
// NewHTTPClient creates a new HTTPClient(client).
func NewHTTPClient(addr, token string, insecureSkipVerify bool) (*HTTPClient, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
func httpClient(scheme string, insecure bool) *http.Client {
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
return &HTTPClient{
addr: *u,
token: token,
client: NewClient(u.Scheme, insecureSkipVerify),
}, nil
}
func (c *HTTPClient) delete(urlPath string) *cReq {
return c.newClientReq(http.MethodDelete, urlPath, bodyEmpty())
}
func (c *HTTPClient) get(urlPath string) *cReq {
return c.newClientReq(http.MethodGet, urlPath, bodyEmpty())
}
func (c *HTTPClient) patch(urlPath string, bFn bodyFn) *cReq {
return c.newClientReq(http.MethodPatch, urlPath, bFn)
}
func (c *HTTPClient) post(urlPath string, bFn bodyFn) *cReq {
return c.newClientReq(http.MethodPost, urlPath, bFn)
}
func (c *HTTPClient) put(urlPath string, bFn bodyFn) *cReq {
return c.newClientReq(http.MethodPut, urlPath, bFn)
}
type bodyFn func() (io.Reader, error)
func bodyEmpty() bodyFn {
return func() (io.Reader, error) {
return nil, nil
if scheme == "https" && insecure {
tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
return &http.Client{
Transport: &SpanTransport{
base: tr,
},
}
}
// TODO(@jsteenb2): discussion add a inspection for an OK() or Valid() method, then enforce
// that across all consumers?
func bodyJSON(v interface{}) bodyFn {
return func() (io.Reader, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(v); err != nil {
return nil, err
}
return &buf, nil
}
}
func (c *HTTPClient) newClientReq(method, urlPath string, bFn bodyFn) *cReq {
body, err := bFn()
if err != nil {
return &cReq{err: err}
}
u := c.addr
u.Path = path.Join(u.Path, urlPath)
req, err := http.NewRequest(method, u.String(), body)
if err != nil {
return &cReq{err: err}
}
if c.token != "" {
SetToken(c.token, req)
}
cr := &cReq{
client: c.client,
req: req,
statusFn: CheckError,
}
return cr.ContentType("application/json")
}
type cReq struct {
client interface {
Do(*http.Request) (*http.Response, error)
}
req *http.Request
decodeFn func(*http.Response) error
respFn func(*http.Response) error
statusFn func(*http.Response) error
err error
}
func (r *cReq) Header(k, v string) *cReq {
if r.err != nil {
return r
}
r.req.Header.Add(k, v)
return r
}
func (r *cReq) Queries(pairs ...[2]string) *cReq {
if r.err != nil || len(pairs) == 0 {
return r
}
params := r.req.URL.Query()
for _, p := range pairs {
params.Add(p[0], p[1])
}
r.req.URL.RawQuery = params.Encode()
return r
}
func (r *cReq) ContentType(ct string) *cReq {
return r.Header("Content-Type", ct)
}
func (r *cReq) DecodeJSON(v interface{}) *cReq {
r.decodeFn = func(resp *http.Response) error {
if err := json.NewDecoder(resp.Body).Decode(v); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
return nil
}
return r
}
func (r *cReq) RespFn(fn func(*http.Response) error) *cReq {
r.respFn = fn
return r
}
func (r *cReq) StatusFn(fn func(*http.Response) error) *cReq {
r.statusFn = fn
return r
}
func (r *cReq) Do(ctx context.Context) error {
if r.err != nil {
return r.err
}
r.req = r.req.WithContext(ctx)
resp, err := r.client.Do(r.req)
if err != nil {
return err
}
defer func() {
io.Copy(ioutil.Discard, resp.Body) // drain body completely
resp.Body.Close()
}()
responseFns := []func(*http.Response) error{
r.statusFn,
r.decodeFn,
r.respFn,
}
for _, fn := range responseFns {
if fn != nil {
if err := fn(resp); err != nil {
return err
}
}
}
return nil
}

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/httprouter"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/pkg/httpc"
"go.uber.org/zap"
)
@ -1080,14 +1081,15 @@ func (h *DashboardHandler) handlePatchDashboardCell(w http.ResponseWriter, r *ht
// DashboardService is a dashboard service over HTTP to the influxdb server.
type DashboardService struct {
Client *HTTPClient
Client *httpc.Client
}
// FindDashboardByID returns a single dashboard by ID.
func (s *DashboardService) FindDashboardByID(ctx context.Context, id platform.ID) (*platform.Dashboard, error) {
var dr dashboardResponse
err := s.Client.get(dashboardIDPath(id)).
Queries([2]string{"include", "properties"}).
err := s.Client.
Get(dashboardsPath, id.String()).
QueryParams([2]string{"include", "properties"}).
DecodeJSON(&dr).
Do(ctx)
if err != nil {
@ -1116,8 +1118,9 @@ func (s *DashboardService) FindDashboards(ctx context.Context, filter platform.D
}
var dr getDashboardsResponse
err := s.Client.get(dashboardsPath).
Queries(queryPairs...).
err := s.Client.
Get(dashboardsPath).
QueryParams(queryPairs...).
DecodeJSON(&dr).
Do(ctx)
if err != nil {
@ -1130,7 +1133,8 @@ func (s *DashboardService) FindDashboards(ctx context.Context, filter platform.D
// CreateDashboard creates a new dashboard and sets b.ID with the new identifier.
func (s *DashboardService) CreateDashboard(ctx context.Context, d *platform.Dashboard) error {
return s.Client.post(dashboardsPath, bodyJSON(d)).
return s.Client.
Post(httpc.BodyJSON(d), dashboardsPath).
DecodeJSON(d).
Do(ctx)
}
@ -1139,7 +1143,8 @@ func (s *DashboardService) CreateDashboard(ctx context.Context, d *platform.Dash
// Returns the new dashboard state after update.
func (s *DashboardService) UpdateDashboard(ctx context.Context, id platform.ID, upd platform.DashboardUpdate) (*platform.Dashboard, error) {
var d platform.Dashboard
err := s.Client.patch(dashboardIDPath(id), bodyJSON(upd)).
err := s.Client.
Patch(httpc.BodyJSON(upd), dashboardsPath, id.String()).
DecodeJSON(&d).
Do(ctx)
if err != nil {
@ -1156,19 +1161,24 @@ func (s *DashboardService) UpdateDashboard(ctx context.Context, id platform.ID,
// DeleteDashboard removes a dashboard by ID.
func (s *DashboardService) DeleteDashboard(ctx context.Context, id platform.ID) error {
return s.Client.delete(dashboardIDPath(id)).Do(ctx)
return s.Client.
Delete(dashboardIDPath(id)).
Do(ctx)
}
// AddDashboardCell adds a cell to a dashboard.
func (s *DashboardService) AddDashboardCell(ctx context.Context, id platform.ID, c *platform.Cell, opts platform.AddDashboardCellOptions) error {
return s.Client.post(cellPath(id), bodyJSON(c)).
return s.Client.
Post(httpc.BodyJSON(c), cellPath(id)).
DecodeJSON(c).
Do(ctx)
}
// RemoveDashboardCell removes a dashboard.
func (s *DashboardService) RemoveDashboardCell(ctx context.Context, dashboardID, cellID platform.ID) error {
return s.Client.delete(dashboardCellIDPath(dashboardID, cellID)).Do(ctx)
return s.Client.
Delete(dashboardCellIDPath(dashboardID, cellID)).
Do(ctx)
}
// UpdateDashboardCell replaces the dashboard cell with the provided ID.
@ -1180,7 +1190,8 @@ func (s *DashboardService) UpdateDashboardCell(ctx context.Context, dashboardID,
}
var c platform.Cell
err := s.Client.patch(dashboardCellIDPath(dashboardID, cellID), bodyJSON(upd)).
err := s.Client.
Patch(httpc.BodyJSON(upd), dashboardCellIDPath(dashboardID, cellID)).
DecodeJSON(&c).
Do(ctx)
if err != nil {
@ -1193,7 +1204,8 @@ func (s *DashboardService) UpdateDashboardCell(ctx context.Context, dashboardID,
// GetDashboardCellView retrieves the view for a dashboard cell.
func (s *DashboardService) GetDashboardCellView(ctx context.Context, dashboardID, cellID platform.ID) (*platform.View, error) {
var dcv dashboardCellViewResponse
err := s.Client.get(cellViewPath(dashboardID, cellID)).
err := s.Client.
Get(cellViewPath(dashboardID, cellID)).
DecodeJSON(&dcv).
Do(ctx)
if err != nil {
@ -1206,7 +1218,8 @@ func (s *DashboardService) GetDashboardCellView(ctx context.Context, dashboardID
// UpdateDashboardCellView updates the view for a dashboard cell.
func (s *DashboardService) UpdateDashboardCellView(ctx context.Context, dashboardID, cellID platform.ID, upd platform.ViewUpdate) (*platform.View, error) {
var dcv dashboardCellViewResponse
err := s.Client.patch(cellViewPath(dashboardID, cellID), bodyJSON(upd)).
err := s.Client.
Patch(httpc.BodyJSON(upd), cellViewPath(dashboardID, cellID)).
DecodeJSON(&dcv).
Do(ctx)
if err != nil {
@ -1217,7 +1230,8 @@ func (s *DashboardService) UpdateDashboardCellView(ctx context.Context, dashboar
// ReplaceDashboardCells replaces all cells in a dashboard
func (s *DashboardService) ReplaceDashboardCells(ctx context.Context, id platform.ID, cs []*platform.Cell) error {
return s.Client.put(cellPath(id), bodyJSON(cs)).
return s.Client.
Put(httpc.BodyJSON(cs), cellPath(id)).
// TODO: previous implementation did not do anything with the response except validate it is valid json.
// seems likely we should have to overwrite (:sadpanda:) the incoming cs...
DecodeJSON(&dashboardCellsResponse{}).
@ -1233,7 +1247,7 @@ func cellPath(id platform.ID) string {
}
func cellViewPath(dashboardID, cellID platform.ID) string {
return path.Join(dashboardIDPath(dashboardID), "cells", cellID.String(), "view")
return path.Join(dashboardCellIDPath(dashboardID, cellID), "view")
}
func dashboardCellIDPath(id platform.ID, cellID platform.ID) string {

View File

@ -8,10 +8,10 @@ import (
"net/url"
"path"
"go.uber.org/zap"
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/pkg/httpc"
"go.uber.org/zap"
)
// LabelHandler represents an HTTP API handler for labels
@ -522,14 +522,15 @@ func labelIDPath(id influxdb.ID) string {
// LabelService connects to Influx via HTTP using tokens to manage labels
type LabelService struct {
Client *HTTPClient
Client *httpc.Client
OpPrefix string
}
// FindLabelByID returns a single label by ID.
func (s *LabelService) FindLabelByID(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
var lr labelResponse
err := s.Client.get(labelIDPath(id)).
err := s.Client.
Get(labelIDPath(id)).
DecodeJSON(&lr).
Do(ctx)
if err != nil {
@ -549,8 +550,9 @@ func (s *LabelService) FindLabels(ctx context.Context, filter influxdb.LabelFilt
}
var lr labelsResponse
err := s.Client.get(labelsPath).
Queries(queryPairs...).
err := s.Client.
Get(labelsPath).
QueryParams(queryPairs...).
DecodeJSON(&lr).
Do(ctx)
if err != nil {
@ -566,7 +568,8 @@ func (s *LabelService) FindResourceLabels(ctx context.Context, filter influxdb.L
}
var r labelsResponse
err := s.Client.get(resourceIDPath(filter.ResourceType, filter.ResourceID, "labels")).
err := s.Client.
Get(resourceIDPath(filter.ResourceType, filter.ResourceID, "labels")).
DecodeJSON(&r).
Do(ctx)
if err != nil {
@ -578,7 +581,8 @@ func (s *LabelService) FindResourceLabels(ctx context.Context, filter influxdb.L
// CreateLabel creates a new label.
func (s *LabelService) CreateLabel(ctx context.Context, l *influxdb.Label) error {
var lr labelResponse
err := s.Client.post(labelsPath, bodyJSON(l)).
err := s.Client.
Post(httpc.BodyJSON(l), labelsPath).
DecodeJSON(&lr).
Do(ctx)
if err != nil {
@ -593,7 +597,8 @@ func (s *LabelService) CreateLabel(ctx context.Context, l *influxdb.Label) error
// UpdateLabel updates a label and returns the updated label.
func (s *LabelService) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) {
var lr labelResponse
err := s.Client.patch(labelIDPath(id), bodyJSON(upd)).
err := s.Client.
Patch(httpc.BodyJSON(upd), labelIDPath(id)).
DecodeJSON(&lr).
Do(ctx)
if err != nil {
@ -604,7 +609,9 @@ func (s *LabelService) UpdateLabel(ctx context.Context, id influxdb.ID, upd infl
// DeleteLabel removes a label by ID.
func (s *LabelService) DeleteLabel(ctx context.Context, id influxdb.ID) error {
return s.Client.delete(labelIDPath(id)).Do(ctx)
return s.Client.
Delete(labelIDPath(id)).
Do(ctx)
}
// CreateLabelMapping will create a labbel mapping
@ -614,7 +621,8 @@ func (s *LabelService) CreateLabelMapping(ctx context.Context, m *influxdb.Label
}
urlPath := resourceIDPath(m.ResourceType, m.ResourceID, "labels")
return s.Client.post(urlPath, bodyJSON(m)).
return s.Client.
Post(httpc.BodyJSON(m), urlPath).
DecodeJSON(m).
Do(ctx)
}
@ -624,5 +632,7 @@ func (s *LabelService) DeleteLabelMapping(ctx context.Context, m *influxdb.Label
return err
}
return s.Client.delete(resourceIDPath(m.ResourceType, m.ResourceID, "labels")).Do(ctx)
return s.Client.
Delete(resourceIDPath(m.ResourceType, m.ResourceID, "labels")).
Do(ctx)
}

View File

@ -5,13 +5,13 @@ import (
"encoding/json"
"fmt"
"net/http"
"path"
"strings"
"github.com/golang/gddo/httputil"
"github.com/influxdata/httprouter"
platform "github.com/influxdata/influxdb"
pctx "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/pkg/httpc"
"github.com/influxdata/influxdb/telegraf/plugins"
"go.uber.org/zap"
)
@ -428,12 +428,12 @@ func (h *TelegrafHandler) handleDeleteTelegraf(w http.ResponseWriter, r *http.Re
// TelegrafService is an http client that speaks to the telegraf service via HTTP.
type TelegrafService struct {
client *HTTPClient
client *httpc.Client
*UserResourceMappingService
}
// NewTelegrafService is a constructor for a telegraf service.
func NewTelegrafService(httpClient *HTTPClient) *TelegrafService {
func NewTelegrafService(httpClient *httpc.Client) *TelegrafService {
return &TelegrafService{
client: httpClient,
UserResourceMappingService: &UserResourceMappingService{
@ -447,7 +447,8 @@ var _ platform.TelegrafConfigStore = (*TelegrafService)(nil)
// FindTelegrafConfigByID returns a single telegraf config by ID.
func (s *TelegrafService) FindTelegrafConfigByID(ctx context.Context, id platform.ID) (*platform.TelegrafConfig, error) {
var cfg platform.TelegrafConfig
err := s.client.get(path.Join(telegrafsPath, id.String())).
err := s.client.
Get(telegrafsPath, id.String()).
Header("Accept", "application/json").
DecodeJSON(&cfg).
Do(ctx)
@ -477,8 +478,9 @@ func (s *TelegrafService) FindTelegrafConfigs(ctx context.Context, f platform.Te
var resp struct {
Configs []*platform.TelegrafConfig `json:"configurations"`
}
err := s.client.get(telegrafsPath).
Queries(queryPairs...).
err := s.client.
Get(telegrafsPath).
QueryParams(queryPairs...).
DecodeJSON(&resp).
Do(ctx)
if err != nil {
@ -491,7 +493,8 @@ func (s *TelegrafService) FindTelegrafConfigs(ctx context.Context, f platform.Te
// CreateTelegrafConfig creates a new telegraf config and sets b.ID with the new identifier.
func (s *TelegrafService) CreateTelegrafConfig(ctx context.Context, tc *platform.TelegrafConfig, userID platform.ID) error {
var teleResp platform.TelegrafConfig
err := s.client.post(telegrafsPath, bodyJSON(tc)).
err := s.client.
Post(httpc.BodyJSON(tc), telegrafsPath).
DecodeJSON(&teleResp).
Do(ctx)
if err != nil {
@ -509,5 +512,7 @@ func (s *TelegrafService) UpdateTelegrafConfig(ctx context.Context, id platform.
// DeleteTelegrafConfig removes a telegraf config by ID.
func (s *TelegrafService) DeleteTelegrafConfig(ctx context.Context, id platform.ID) error {
return s.client.delete(path.Join(telegrafsPath, id.String())).Do(ctx)
return s.client.
Delete(telegrafsPath, id.String()).
Do(ctx)
}

View File

@ -1,51 +0,0 @@
// Shared transports for all clients to prevent leaking connections
package http
import (
"crypto/tls"
"net"
"net/http"
"time"
)
// defaultTransport is the default implementation of Transport and is
// used by traceClient. It establishes network connections as needed
// and caches them for reuse by subsequent calls. It uses HTTP proxies
// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and
// $no_proxy) environment variables.
// This is the same as http.DefaultTransport.
//
var defaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
// skipVerifyTransport is the default implementation of Transport and is
// used by traceClient (NewClient with insecure set to true). It establishes network connections as needed
// and caches them for reuse by subsequent calls. It uses HTTP proxies
// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and
// $no_proxy) environment variables.
// This is the same as http.DefaultTransport but with TLS skip verify.
//
var skipVerifyTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// This is the value that changes between this and http.DefaultTransport
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}

View File

@ -7,10 +7,10 @@ import (
"net/http"
"path"
"go.uber.org/zap"
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/pkg/httpc"
"go.uber.org/zap"
)
type resourceUserResponse struct {
@ -264,13 +264,14 @@ func decodeDeleteMemberRequest(ctx context.Context, r *http.Request) (*deleteMem
// UserResourceMappingService is the struct of urm service
type UserResourceMappingService struct {
Client *HTTPClient
Client *httpc.Client
}
// FindUserResourceMappings returns the user resource mappings
func (s *UserResourceMappingService) FindUserResourceMappings(ctx context.Context, filter influxdb.UserResourceMappingFilter, opt ...influxdb.FindOptions) ([]*influxdb.UserResourceMapping, int, error) {
func (s *UserResourceMappingService) FindUserResourceMappings(ctx context.Context, f influxdb.UserResourceMappingFilter, opt ...influxdb.FindOptions) ([]*influxdb.UserResourceMapping, int, error) {
var results resourceUsersResponse
err := s.Client.get(resourceIDPath(filter.ResourceType, filter.ResourceID, string(filter.UserType)+"s")).
err := s.Client.
Get(resourceIDPath(f.ResourceType, f.ResourceID, string(f.UserType)+"s")).
DecodeJSON(&results).
Do(ctx)
if err != nil {
@ -280,8 +281,8 @@ func (s *UserResourceMappingService) FindUserResourceMappings(ctx context.Contex
urs := make([]*influxdb.UserResourceMapping, len(results.Users))
for k, item := range results.Users {
urs[k] = &influxdb.UserResourceMapping{
ResourceID: filter.ResourceID,
ResourceType: filter.ResourceType,
ResourceID: f.ResourceID,
ResourceType: f.ResourceType,
UserID: item.User.ID,
UserType: item.Role,
}
@ -296,7 +297,8 @@ func (s *UserResourceMappingService) CreateUserResourceMapping(ctx context.Conte
}
urlPath := resourceIDPath(m.ResourceType, m.ResourceID, string(m.UserType)+"s")
return s.Client.post(urlPath, bodyJSON(influxdb.User{ID: m.UserID})).
return s.Client.
Post(httpc.BodyJSON(influxdb.User{ID: m.UserID}), urlPath).
DecodeJSON(m).
Do(ctx)
}
@ -304,7 +306,9 @@ func (s *UserResourceMappingService) CreateUserResourceMapping(ctx context.Conte
// DeleteUserResourceMapping will delete user resource mapping based in criteria.
func (s *UserResourceMappingService) DeleteUserResourceMapping(ctx context.Context, resourceID influxdb.ID, userID influxdb.ID) error {
urlPath := resourceIDUserPath(influxdb.OrgsResourceType, resourceID, influxdb.Member, userID)
return s.Client.delete(urlPath).Do(ctx)
return s.Client.
Delete(urlPath).
Do(ctx)
}
func resourceIDPath(resourceType influxdb.ResourceType, resourceID influxdb.ID, p string) string {

View File

@ -5,10 +5,10 @@ import (
"encoding/json"
"fmt"
"net/http"
"path"
"github.com/influxdata/httprouter"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/pkg/httpc"
"go.uber.org/zap"
)
@ -438,13 +438,14 @@ func (h *VariableHandler) handleDeleteVariable(w http.ResponseWriter, r *http.Re
// VariableService is a variable service over HTTP to the influxdb server
type VariableService struct {
Client *HTTPClient
Client *httpc.Client
}
// FindVariableByID finds a single variable from the store by its ID
func (s *VariableService) FindVariableByID(ctx context.Context, id platform.ID) (*platform.Variable, error) {
var mr variableResponse
err := s.Client.get(variableIDPath(id)).
err := s.Client.
Get(variablePath, id.String()).
DecodeJSON(&mr).
Do(ctx)
if err != nil {
@ -469,8 +470,9 @@ func (s *VariableService) FindVariables(ctx context.Context, filter platform.Var
}
var ms getVariablesResponse
err := s.Client.get(variablePath).
Queries(queryPairs...).
err := s.Client.
Get(variablePath).
QueryParams(queryPairs...).
DecodeJSON(&ms).
Do(ctx)
if err != nil {
@ -489,7 +491,8 @@ func (s *VariableService) CreateVariable(ctx context.Context, m *platform.Variab
}
}
return s.Client.post(variablePath, bodyJSON(m)).
return s.Client.
Post(httpc.BodyJSON(m), variablePath).
DecodeJSON(m).
Do(ctx)
}
@ -497,7 +500,8 @@ func (s *VariableService) CreateVariable(ctx context.Context, m *platform.Variab
// UpdateVariable updates a single variable with a changeset
func (s *VariableService) UpdateVariable(ctx context.Context, id platform.ID, update *platform.VariableUpdate) (*platform.Variable, error) {
var m platform.Variable
err := s.Client.patch(variableIDPath(id), bodyJSON(update)).
err := s.Client.
Patch(httpc.BodyJSON(update), variablePath, id.String()).
DecodeJSON(&m).
Do(ctx)
if err != nil {
@ -509,16 +513,15 @@ func (s *VariableService) UpdateVariable(ctx context.Context, id platform.ID, up
// ReplaceVariable replaces a single variable
func (s *VariableService) ReplaceVariable(ctx context.Context, variable *platform.Variable) error {
return s.Client.put(variableIDPath(variable.ID), bodyJSON(variable)).
return s.Client.
Put(httpc.BodyJSON(variable), variablePath, variable.ID.String()).
DecodeJSON(variable).
Do(ctx)
}
// DeleteVariable removes a variable from the store
func (s *VariableService) DeleteVariable(ctx context.Context, id platform.ID) error {
return s.Client.delete(variableIDPath(id)).Do(ctx)
}
func variableIDPath(id platform.ID) string {
return path.Join(variablePath, id.String())
return s.Client.
Delete(variablePath, id.String()).
Do(ctx)
}

41
pkg/httpc/body_fns.go Normal file
View File

@ -0,0 +1,41 @@
package httpc
import (
"bytes"
"encoding/gob"
"encoding/json"
"io"
)
// BodyFn provides a writer to which a value will be written to
// that will make it's way into the HTTP request.
type BodyFn func(w io.Writer) (header string, headerVal string, err error)
// BodyEmpty returns an empty body.
func BodyEmpty(io.Writer) (string, string, error) {
return "", "", nil
}
// BodyGob gob encodes the value provided for the HTTP request. Sets the
// Content-Encoding to application/gob.
func BodyGob(v interface{}) BodyFn {
return func(w io.Writer) (string, string, error) {
return headerContentEncoding, "application/gob", gob.NewEncoder(w).Encode(v)
}
}
// BodyJSON JSON encodes the value provided for the HTTP request. Sets the
// Content-Type to application/json.
func BodyJSON(v interface{}) BodyFn {
return func(w io.Writer) (string, string, error) {
return headerContentType, "application/json", json.NewEncoder(w).Encode(v)
}
}
type nopBufCloser struct {
bytes.Buffer
}
func (*nopBufCloser) Close() error {
return nil
}

196
pkg/httpc/client.go Normal file
View File

@ -0,0 +1,196 @@
package httpc
import (
"errors"
"io"
"net/http"
"net/url"
"path"
"github.com/influxdata/influxdb"
)
type (
// WriteCloserFn is a write closer wrapper than indicates the type of writer by
// returning the header and header value associated with the writer closer.
// i.e. GZIP writer returns header Content-Encoding with value gzip alongside
// the writer.
WriteCloserFn func(closer io.WriteCloser) (string, string, io.WriteCloser)
// doer provides an abstraction around the actual http client behavior. The doer
// can be faked out in tests or another http client provided in its place.
doer interface {
Do(*http.Request) (*http.Response, error)
}
)
// Client is a basic http client that can make cReqs with out having to juggle
// the token and so forth. It provides sane defaults for checking response
// statuses, sets auth token when provided, and sets the content type to
// application/json for each request. The token, response checker, and
// content type can be overridden on the Req as well.
type Client struct {
addr url.URL
doer doer
defaultHeaders http.Header
writerFns []WriteCloserFn
authFn func(*http.Request)
respFn func(*http.Response) error
statusFn func(*http.Response) error
}
// New creates a new httpc client.
func New(opts ...ClientOptFn) (*Client, error) {
opt := clientOpt{
authFn: func(*http.Request) {},
}
for _, o := range opts {
if err := o(&opt); err != nil {
return nil, err
}
}
if opt.addr == "" {
return nil, errors.New("must provide a non empty host address")
}
u, err := url.Parse(opt.addr)
if err != nil {
return nil, err
}
if opt.doer == nil {
opt.doer = defaultHTTPClient(u.Scheme, opt.insecureSkipVerify)
}
return &Client{
addr: *u,
doer: opt.doer,
defaultHeaders: opt.headers,
authFn: opt.authFn,
statusFn: opt.statusFn,
writerFns: opt.writerFns,
}, nil
}
// Delete generates a DELETE request.
func (c *Client) Delete(urlPath string, rest ...string) *Req {
return c.Req(http.MethodDelete, nil, urlPath, rest...)
}
// Get generates a GET request.
func (c *Client) Get(urlPath string, rest ...string) *Req {
return c.Req(http.MethodGet, nil, urlPath, rest...)
}
// Patch generates a PATCH request.
func (c *Client) Patch(bFn BodyFn, urlPath string, rest ...string) *Req {
return c.Req(http.MethodPatch, bFn, urlPath, rest...)
}
// Post generates a POST request.
func (c *Client) Post(bFn BodyFn, urlPath string, rest ...string) *Req {
return c.Req(http.MethodPost, bFn, urlPath, rest...)
}
// Put generates a PUT request.
func (c *Client) Put(bFn BodyFn, urlPath string, rest ...string) *Req {
return c.Req(http.MethodPut, bFn, urlPath, rest...)
}
// Req constructs a request.
func (c *Client) Req(method string, bFn BodyFn, urlPath string, rest ...string) *Req {
bodyF := BodyEmpty
if bFn != nil {
bodyF = bFn
}
headers := make(http.Header, len(c.defaultHeaders))
for header, vals := range c.defaultHeaders {
for _, v := range vals {
headers.Add(header, v)
}
}
var buf nopBufCloser
var w io.WriteCloser = &buf
for _, writerFn := range c.writerFns {
header, headerVal, ww := writerFn(w)
w = ww
headers.Add(header, headerVal)
}
header, headerVal, err := bodyF(w)
if err != nil {
// TODO(@jsteenb2): add a inspection for an OK() or Valid() method, then enforce
// that across all consumers? Same for all bodyFns for that matter.
return &Req{
err: &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
},
}
}
if header != "" {
headers.Add(header, headerVal)
}
// w.Close here is necessary since we have to close any gzip writer
// or other writer that requires closing.
if err := w.Close(); err != nil {
return &Req{err: err}
}
var body io.Reader
if buf.Len() > 0 {
body = &buf
}
req, err := http.NewRequest(method, c.buildURL(urlPath, rest...), body)
if err != nil {
return &Req{err: err}
}
cr := &Req{
client: c.doer,
req: req,
authFn: c.authFn,
respFn: c.respFn,
statusFn: c.statusFn,
}
return cr.Headers(headers)
}
// Clone creates a new *Client type from an existing client. This may be
// useful if you want to have a shared base client, then take a specific
// client from that base and tack on some extra goodies like specific headers
// and whatever else that suits you.
// Note: a new net.http.Client type will not be created. It will share the existing
// http.Client from the parent httpc.Client. Same connection pool, different specifics.
func (c *Client) Clone(opts ...ClientOptFn) (*Client, error) {
existingOpts := []ClientOptFn{
WithAuth(c.authFn),
withDoer(c.doer),
WithRespFn(c.respFn),
WithStatusFn(c.statusFn),
}
for h, vals := range c.defaultHeaders {
for _, v := range vals {
existingOpts = append(existingOpts, WithHeader(h, v))
}
}
for _, fn := range c.writerFns {
existingOpts = append(existingOpts, WithWriterFn(fn))
}
return New(append(existingOpts, opts...)...)
}
func (c *Client) buildURL(urlPath string, rest ...string) string {
u := c.addr
u.Path = path.Join(u.Path, urlPath)
if len(rest) > 0 {
u.Path = path.Join(u.Path, path.Join(rest...))
}
return u.String()
}

423
pkg/httpc/client_test.go Normal file
View File

@ -0,0 +1,423 @@
package httpc
import (
"bytes"
"compress/gzip"
"context"
"encoding/gob"
"encoding/json"
"errors"
"io"
"io/ioutil"
"net/http"
"sort"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestClient(t *testing.T) {
newClient := func(t *testing.T, addr string, opts ...ClientOptFn) *Client {
t.Helper()
client, err := New(append(opts, WithAddr(addr))...)
require.NoError(t, err)
return client
}
type (
respFn func(status int, req *http.Request) (resp *http.Response, err error)
authFn func(status int, respFn respFn, opts ...ClientOptFn) (*Client, *fakeDoer)
newReqFn func(*Client, string, reqBody) *Req
testCase struct {
method string
status int
clientOpts []ClientOptFn
reqFn newReqFn
queryParams [][2]string
reqBody reqBody
}
)
tokenAuthClient := func(status int, respFn respFn, opts ...ClientOptFn) (*Client, *fakeDoer) {
const token = "secrettoken"
fakeDoer := &fakeDoer{
doFn: func(r *http.Request) (*http.Response, error) {
if r.Header.Get("Authorization") != "Token "+token {
return nil, errors.New("unauthed token")
}
return respFn(status, r)
},
}
client := newClient(t, "http://example.com", append(opts, WithAuthToken(token))...)
client.doer = fakeDoer
return client, fakeDoer
}
noAuthClient := func(status int, respFn respFn, opts ...ClientOptFn) (*Client, *fakeDoer) {
fakeDoer := &fakeDoer{
doFn: func(r *http.Request) (*http.Response, error) {
return respFn(status, r)
},
}
client := newClient(t, "http://example.com", opts...)
client.doer = fakeDoer
return client, fakeDoer
}
authTests := []struct {
name string
clientFn authFn
}{
{
name: "no auth",
clientFn: noAuthClient,
},
{
name: "token auth",
clientFn: tokenAuthClient,
},
}
encodingTests := []struct {
name string
respFn respFn
decodeFn func(v interface{}) func(r *Req) *Req
}{
{
name: "json response",
respFn: stubRespNJSONBody,
decodeFn: func(v interface{}) func(r *Req) *Req {
return func(r *Req) *Req { return r.DecodeJSON(v) }
},
},
{
name: "gzipped json response",
respFn: stubRespNGZippedJSON,
decodeFn: func(v interface{}) func(r *Req) *Req {
return func(r *Req) *Req { return r.DecodeJSON(v) }
},
},
{
name: "gob response",
respFn: stubRespNGobBody,
decodeFn: func(v interface{}) func(r *Req) *Req {
return func(r *Req) *Req { return r.DecodeGob(v) }
},
},
}
testWithRespBody := func(tt testCase) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
for _, encTest := range encodingTests {
t.Run(encTest.name, func(t *testing.T) {
for _, authTest := range authTests {
fn := func(t *testing.T) {
client, fakeDoer := authTest.clientFn(tt.status, encTest.respFn, tt.clientOpts...)
req := tt.reqFn(client, "/new/path/heres", tt.reqBody).
Accept("application/json").
Header("X-Code", "Code").
QueryParams(tt.queryParams...).
StatusFn(StatusIn(tt.status))
var actual echoResp
req = encTest.decodeFn(&actual)(req)
err := req.Do(context.TODO())
require.NoError(t, err)
expectedResp := echoResp{
Method: tt.method,
Scheme: "http",
Host: "example.com",
Path: "/new/path/heres",
Queries: tt.queryParams,
ReqBody: tt.reqBody,
}
assert.Equal(t, expectedResp, actual)
require.Len(t, fakeDoer.args, 1)
assert.Equal(t, "application/json", fakeDoer.args[0].Header.Get("Accept"))
assert.Equal(t, "Code", fakeDoer.args[0].Header.Get("X-Code"))
}
t.Run(authTest.name, fn)
}
})
}
}
}
newGet := func(client *Client, urlPath string, _ reqBody) *Req {
return client.Get(urlPath)
}
t.Run("Delete", func(t *testing.T) {
for _, authTest := range authTests {
fn := func(t *testing.T) {
client, fakeDoer := authTest.clientFn(204, stubResp)
err := client.Delete("/new/path/heres").
Header("X-Code", "Code").
StatusFn(StatusIn(204)).
Do(context.TODO())
require.NoError(t, err)
require.Len(t, fakeDoer.args, 1)
assert.Equal(t, "Code", fakeDoer.args[0].Header.Get("X-Code"))
}
t.Run(authTest.name, fn)
}
})
t.Run("Get", func(t *testing.T) {
tests := []struct {
name string
testCase
}{
{
name: "handles basic call",
testCase: testCase{
status: 200,
},
},
{
name: "handles query values",
testCase: testCase{
queryParams: [][2]string{{"q1", "v1"}, {"q2", "v2"}},
status: 202,
},
},
}
for _, tt := range tests {
tt.method = "GET"
tt.reqFn = newGet
t.Run(tt.name, testWithRespBody(tt.testCase))
}
})
t.Run("Patch Post Put with request bodies", func(t *testing.T) {
methods := []struct {
name string
methodCallFn func(client *Client, urlPath string, bFn BodyFn) *Req
}{
{
name: "PATCH",
methodCallFn: func(client *Client, urlPath string, bFn BodyFn) *Req {
return client.Patch(bFn, urlPath)
},
},
{
name: "POST",
methodCallFn: func(client *Client, urlPath string, bFn BodyFn) *Req {
return client.Post(bFn, urlPath)
},
},
{
name: "PUT",
methodCallFn: func(client *Client, urlPath string, bFn BodyFn) *Req {
return client.Put(bFn, urlPath)
},
},
}
for _, method := range methods {
t.Run(method.name, func(t *testing.T) {
tests := []struct {
name string
testCase
}{
{
name: "handles json req body",
testCase: testCase{
status: 200,
reqFn: func(client *Client, urlPath string, body reqBody) *Req {
return method.methodCallFn(client, urlPath, BodyJSON(body))
},
reqBody: reqBody{
Foo: "foo 1",
Bar: 31,
},
},
},
{
name: "handles gob req body",
testCase: testCase{
status: 201,
reqFn: func(client *Client, urlPath string, body reqBody) *Req {
return method.methodCallFn(client, urlPath, BodyGob(body))
},
reqBody: reqBody{
Foo: "foo 1",
Bar: 31,
},
},
},
{
name: "handles gzipped json req body",
testCase: testCase{
status: 201,
clientOpts: []ClientOptFn{WithWriterGZIP()},
reqFn: func(client *Client, urlPath string, body reqBody) *Req {
return method.methodCallFn(client, urlPath, BodyJSON(body))
},
reqBody: reqBody{
Foo: "foo",
Bar: 31,
},
},
},
}
for _, tt := range tests {
tt.method = method.name
t.Run(tt.name, testWithRespBody(tt.testCase))
}
})
}
})
}
type fakeDoer struct {
doFn func(*http.Request) (*http.Response, error)
args []*http.Request
callCount int
}
func (f *fakeDoer) Do(r *http.Request) (*http.Response, error) {
f.callCount++
f.args = append(f.args, r)
return f.doFn(r)
}
func stubResp(status int, _ *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: status,
Body: ioutil.NopCloser(new(bytes.Buffer)),
}, nil
}
func stubRespNGZippedJSON(status int, r *http.Request) (*http.Response, error) {
e, err := decodeFromContentType(r)
if err != nil {
return nil, err
}
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
defer w.Close()
if err := json.NewEncoder(w).Encode(e); err != nil {
return nil, err
}
if err := w.Flush(); err != nil {
return nil, err
}
return &http.Response{
StatusCode: status,
Body: ioutil.NopCloser(&buf),
Header: http.Header{
"Content-Encoding": []string{"gzip"},
headerContentType: []string{"application/json"},
},
}, nil
}
func stubRespNJSONBody(status int, r *http.Request) (*http.Response, error) {
e, err := decodeFromContentType(r)
if err != nil {
return nil, err
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(e); err != nil {
return nil, err
}
return &http.Response{
StatusCode: status,
Body: ioutil.NopCloser(&buf),
Header: http.Header{headerContentType: []string{"application/json"}},
}, nil
}
func stubRespNGobBody(status int, r *http.Request) (*http.Response, error) {
e, err := decodeFromContentType(r)
if err != nil {
return nil, err
}
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(e); err != nil {
return nil, err
}
return &http.Response{
StatusCode: status,
Body: ioutil.NopCloser(&buf),
Header: http.Header{headerContentEncoding: []string{"application/gob"}},
}, nil
}
type (
reqBody struct {
Foo string
Bar int
}
echoResp struct {
Method string
Scheme string
Host string
Path string
Queries [][2]string
ReqBody reqBody
}
)
func decodeFromContentType(r *http.Request) (echoResp, error) {
e := echoResp{
Method: r.Method,
Scheme: r.URL.Scheme,
Host: r.URL.Host,
Path: r.URL.Path,
}
for key, vals := range r.URL.Query() {
for _, v := range vals {
e.Queries = append(e.Queries, [2]string{key, v})
}
}
sort.Slice(e.Queries, func(i, j int) bool {
qi, qj := e.Queries[i], e.Queries[j]
if qi[0] == qj[0] {
return qi[1] < qj[1]
}
return qi[0] < qj[0]
})
var reader io.Reader = r.Body
if r.Header.Get(headerContentEncoding) == "gzip" {
gr, err := gzip.NewReader(reader)
if err != nil {
return echoResp{}, err
}
reader = gr
}
if r.Header.Get(headerContentEncoding) == "application/gob" {
return e, gob.NewDecoder(reader).Decode(&e.ReqBody)
}
if r.Header.Get(headerContentType) == "application/json" {
return e, json.NewDecoder(reader).Decode(&e.ReqBody)
}
return e, nil
}

152
pkg/httpc/options.go Normal file
View File

@ -0,0 +1,152 @@
package httpc
import (
"compress/gzip"
"crypto/tls"
"io"
"net"
"net/http"
"time"
)
// ClientOptFn are options to set different parameters on the Client.
type ClientOptFn func(*clientOpt) error
type clientOpt struct {
addr string
insecureSkipVerify bool
doer doer
headers http.Header
authFn func(*http.Request)
respFn func(*http.Response) error
statusFn func(*http.Response) error
writerFns []WriteCloserFn
}
// WithAddr sets the host address on the client.
func WithAddr(addr string) ClientOptFn {
return func(opt *clientOpt) error {
opt.addr = addr
return nil
}
}
// WithAuth provides a means to set a custom auth that doesn't match
// the provided auth types here.
func WithAuth(fn func(r *http.Request)) ClientOptFn {
return func(opt *clientOpt) error {
opt.authFn = fn
return nil
}
}
// WithAuthToken provides token auth for requests.
func WithAuthToken(token string) ClientOptFn {
return func(opts *clientOpt) error {
fn := func(r *http.Request) {
r.Header.Set("Authorization", "Token "+token)
}
return WithAuth(fn)(opts)
}
}
// WithContentType sets the content type that will be applied to the requests created
// by the Client.
func WithContentType(ct string) ClientOptFn {
return func(opt *clientOpt) error {
return WithHeader(headerContentType, ct)(opt)
}
}
func withDoer(d doer) ClientOptFn {
return func(opt *clientOpt) error {
opt.doer = d
return nil
}
}
// WithHeader sets a default header that will be applied to all requests created
// by the client.
func WithHeader(header, val string) ClientOptFn {
return func(opt *clientOpt) error {
if opt.headers == nil {
opt.headers = make(http.Header)
}
opt.headers.Add(header, val)
return nil
}
}
// WithHTTPClient sets the raw http client on the httpc Client.
func WithHTTPClient(c *http.Client) ClientOptFn {
return func(opt *clientOpt) error {
opt.doer = c
return nil
}
}
// WithInsecureSkipVerify sets the insecure skip verify on the http client's htp transport.
func WithInsecureSkipVerify(b bool) ClientOptFn {
return func(opts *clientOpt) error {
opts.insecureSkipVerify = b
return nil
}
}
// WithRespFn sets the default resp fn for the client that will be applied to all requests
// generated from it.
func WithRespFn(fn func(*http.Response) error) ClientOptFn {
return func(opt *clientOpt) error {
opt.respFn = fn
return nil
}
}
// WithStatusFn sets the default status fn for the client that will be applied to all requests
// generated from it.
func WithStatusFn(fn func(*http.Response) error) ClientOptFn {
return func(opt *clientOpt) error {
opt.statusFn = fn
return nil
}
}
// WithWriterFn applies the provided writer behavior to all the request bodies'
// generated from the client.
func WithWriterFn(fn WriteCloserFn) ClientOptFn {
return func(opt *clientOpt) error {
opt.writerFns = append(opt.writerFns, fn)
return nil
}
}
// WithWriterGZIP gzips the request body generated from this client.
func WithWriterGZIP() ClientOptFn {
return func(opt *clientOpt) error {
fn := func(w io.WriteCloser) (string, string, io.WriteCloser) {
return headerContentEncoding, "gzip", gzip.NewWriter(w)
}
return WithWriterFn(fn)(opt)
}
}
func defaultHTTPClient(scheme string, insecure bool) *http.Client {
tr := http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
if scheme == "https" && insecure {
tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
return &http.Client{
Transport: &tr,
}
}

198
pkg/httpc/req.go Normal file
View File

@ -0,0 +1,198 @@
package httpc
import (
"compress/gzip"
"context"
"encoding/gob"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"github.com/influxdata/influxdb"
)
const (
headerContentType = "Content-Type"
headerContentEncoding = "Content-Encoding"
)
// Req is a request type.
type Req struct {
client doer
req *http.Request
authFn func(*http.Request)
decodeFn func(*http.Response) error
respFn func(*http.Response) error
statusFn func(*http.Response) error
err error
}
// Accept sets the Accept header to the provided content type on the request.
func (r *Req) Accept(contentType string) *Req {
return r.Header("Accept", contentType)
}
// ContentType sets the Content-Type header to the provided content type on the request.
func (r *Req) ContentType(contentType string) *Req {
return r.Header("Content-Type", contentType)
}
// Decode sets the decoding functionality for the request. All Decode calls are called
// after the status and response functions are called. Decoding will not happen if error
// encountered in the status check.
func (r *Req) Decode(fn func(resp *http.Response) error) *Req {
if r.err != nil {
return r
}
r.decodeFn = fn
return r
}
// DecodeGob sets the decoding functionality to decode gob for the request.
func (r *Req) DecodeGob(v interface{}) *Req {
return r.Decode(func(resp *http.Response) error {
r := decodeReader(resp.Body, resp.Header)
return gob.NewDecoder(r).Decode(v)
})
}
// DecodeJSON sets the decoding functionality to decode json for the request.
func (r *Req) DecodeJSON(v interface{}) *Req {
return r.Decode(func(resp *http.Response) error {
r := decodeReader(resp.Body, resp.Header)
return json.NewDecoder(r).Decode(v)
})
}
// Header adds the header to the http request.
func (r *Req) Header(k, v string) *Req {
if r.err != nil {
return r
}
r.req.Header.Add(k, v)
return r
}
// Headers adds all the headers to the http request.
func (r *Req) Headers(m map[string][]string) *Req {
if r.err != nil {
return r
}
for header, vals := range m {
if header == "" {
continue
}
for _, v := range vals {
r = r.Header(header, v)
}
}
return r
}
// QueryParams adds the query params to the http request.
func (r *Req) QueryParams(pairs ...[2]string) *Req {
if r.err != nil || len(pairs) == 0 {
return r
}
params := r.req.URL.Query()
for _, p := range pairs {
params.Add(p[0], p[1])
}
r.req.URL.RawQuery = params.Encode()
return r
}
// RespFn provides a means to inspect the entire http response. This function runs first
// before the status and decode functions are called.
func (r *Req) RespFn(fn func(*http.Response) error) *Req {
r.respFn = fn
return r
}
// StatusFn sets a status check function. This runs after the resp func
// but before the decode fn.
func (r *Req) StatusFn(fn func(*http.Response) error) *Req {
r.statusFn = fn
return r
}
// Do makes the HTTP request. Any errors that had been encountered in
// the lifetime of the Req type will be returned here first, in place of
// the call. This makes it safe to call Do at anytime.
func (r *Req) Do(ctx context.Context) error {
if r.err != nil {
return r.err
}
r.authFn(r.req)
// TODO(@jsteenb2): wrap do with retry/backoff policy.
return r.do(ctx)
}
func (r *Req) do(ctx context.Context) error {
resp, err := r.client.Do(r.req.WithContext(ctx))
if err != nil {
return err
}
defer func() {
io.Copy(ioutil.Discard, resp.Body) // drain body completely
resp.Body.Close()
}()
if r.respFn != nil {
if err := r.respFn(resp); err != nil {
return err
}
}
if r.statusFn != nil {
if err := r.statusFn(resp); err != nil {
return err
}
}
if r.decodeFn != nil {
if err := r.decodeFn(resp); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
}
return nil
}
// StatusIn validates the status code matches one of the provided statuses.
func StatusIn(code int, rest ...int) func(*http.Response) error {
return func(resp *http.Response) error {
for _, code := range append(rest, code) {
if code == resp.StatusCode {
return nil
}
}
return fmt.Errorf("recieved unexpected status: %s %d", resp.Status, resp.StatusCode)
}
}
var encodingReaders = map[string]func(io.Reader) io.Reader{
"gzip": func(r io.Reader) io.Reader {
if gr, err := gzip.NewReader(r); err == nil {
return gr
}
return r
},
}
func decodeReader(r io.Reader, headers http.Header) io.Reader {
contentEncoding := strings.TrimSpace(headers.Get(headerContentEncoding))
fn, ok := encodingReaders[contentEncoding]
if ok {
return fn(r)
}
return r
}