chore(http): refactor user http service to use the httpc.Client
parent
1c2b900687
commit
f527636fe0
|
|
@ -52,10 +52,13 @@ func newUserService() (platform.UserService, error) {
|
|||
if flags.local {
|
||||
return newLocalKVService()
|
||||
}
|
||||
|
||||
client, err := newHTTPClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &http.UserService{
|
||||
Addr: flags.host,
|
||||
Token: flags.token,
|
||||
InsecureSkipVerify: flags.skipVerify,
|
||||
Client: client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -207,11 +210,7 @@ func userCreateF(cmd *cobra.Command, args []string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
passSVC := &http.PasswordService{
|
||||
Addr: flags.host,
|
||||
Token: flags.token,
|
||||
InsecureSkipVerify: flags.skipVerify,
|
||||
}
|
||||
passSVC := &http.PasswordService{Client: c}
|
||||
|
||||
ctx := context.Background()
|
||||
if err := passSVC.SetPassword(ctx, user.ID, pass); err != nil {
|
||||
|
|
|
|||
|
|
@ -63,11 +63,8 @@ func NewService(addr, token string) (*Service, error) {
|
|||
BucketService: &BucketService{Client: httpClient},
|
||||
DashboardService: &DashboardService{Client: httpClient},
|
||||
OrganizationService: &OrganizationService{Client: httpClient},
|
||||
UserService: &UserService{
|
||||
Addr: addr,
|
||||
Token: token,
|
||||
},
|
||||
VariableService: &VariableService{Client: httpClient},
|
||||
UserService: &UserService{Client: httpClient},
|
||||
VariableService: &VariableService{Client: httpClient},
|
||||
WriteService: &WriteService{
|
||||
Addr: addr,
|
||||
Token: token,
|
||||
|
|
|
|||
|
|
@ -1,16 +1,15 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb"
|
||||
icontext "github.com/influxdata/influxdb/context"
|
||||
"github.com/influxdata/influxdb/pkg/httpc"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
|
@ -337,6 +336,72 @@ func decodeDeleteUserRequest(ctx context.Context, r *http.Request) (*deleteUserR
|
|||
}, nil
|
||||
}
|
||||
|
||||
// hanldeGetUserLog retrieves a user log by the users ID.
|
||||
func (h *UserHandler) handleGetUserLog(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
req, err := decodeGetUserLogRequest(ctx, r)
|
||||
if err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
log, _, err := h.UserOperationLogService.GetUserOperationLog(ctx, req.UserID, req.opts)
|
||||
if err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
h.log.Debug("User log retrieved", zap.String("log", fmt.Sprint(log)))
|
||||
|
||||
if err := encodeResponse(ctx, w, http.StatusOK, newUserLogResponse(req.UserID, log)); err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type getUserLogRequest struct {
|
||||
UserID influxdb.ID
|
||||
opts influxdb.FindOptions
|
||||
}
|
||||
|
||||
func decodeGetUserLogRequest(ctx context.Context, r *http.Request) (*getUserLogRequest, error) {
|
||||
params := httprouter.ParamsFromContext(ctx)
|
||||
id := params.ByName("id")
|
||||
if id == "" {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "url missing id",
|
||||
}
|
||||
}
|
||||
|
||||
var i influxdb.ID
|
||||
if err := i.DecodeFromString(id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts, err := decodeFindOptions(ctx, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &getUserLogRequest{
|
||||
UserID: i,
|
||||
opts: *opts,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newUserLogResponse(id influxdb.ID, es []*influxdb.OperationLogEntry) *operationLogResponse {
|
||||
logs := make([]*operationLogEntryResponse, 0, len(es))
|
||||
for _, e := range es {
|
||||
logs = append(logs, newOperationLogEntryResponse(e))
|
||||
}
|
||||
return &operationLogResponse{
|
||||
Links: map[string]string{
|
||||
"self": fmt.Sprintf("/api/v2/users/%s/logs", id),
|
||||
},
|
||||
Logs: logs,
|
||||
}
|
||||
}
|
||||
|
||||
type usersResponse struct {
|
||||
Links map[string]string `json:"links"`
|
||||
Users []*UserResponse `json:"users"`
|
||||
|
|
@ -484,39 +549,19 @@ func decodePatchUserRequest(ctx context.Context, r *http.Request) (*patchUserReq
|
|||
|
||||
// UserService connects to Influx via HTTP using tokens to manage users
|
||||
type UserService struct {
|
||||
Addr string
|
||||
Token string
|
||||
InsecureSkipVerify bool
|
||||
Client *httpc.Client
|
||||
// OpPrefix is the ops of not found error.
|
||||
OpPrefix string
|
||||
}
|
||||
|
||||
// FindMe returns user information about the owner of the token
|
||||
func (s *UserService) FindMe(ctx context.Context, id influxdb.ID) (*influxdb.User, error) {
|
||||
url, err := NewURL(s.Addr, prefixMe)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", url.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
SetToken(s.Token, req)
|
||||
|
||||
hc := NewClient(url.Scheme, s.InsecureSkipVerify)
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if err := CheckError(resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res UserResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
|
||||
err := s.Client.
|
||||
Get(prefixMe).
|
||||
DecodeJSON(&res).
|
||||
Do(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &res.User, nil
|
||||
|
|
@ -524,33 +569,14 @@ func (s *UserService) FindMe(ctx context.Context, id influxdb.ID) (*influxdb.Use
|
|||
|
||||
// FindUserByID returns a single user by ID.
|
||||
func (s *UserService) FindUserByID(ctx context.Context, id influxdb.ID) (*influxdb.User, error) {
|
||||
url, err := NewURL(s.Addr, userIDPath(id))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", url.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
SetToken(s.Token, req)
|
||||
|
||||
hc := NewClient(url.Scheme, s.InsecureSkipVerify)
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if err := CheckError(resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res UserResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
|
||||
err := s.Client.
|
||||
Get(prefixUsers, id.String()).
|
||||
DecodeJSON(&res).
|
||||
Do(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &res.User, nil
|
||||
}
|
||||
|
||||
|
|
@ -584,40 +610,21 @@ func (s *UserService) FindUser(ctx context.Context, filter influxdb.UserFilter)
|
|||
// FindUsers returns a list of users that match filter and the total count of matching users.
|
||||
// Additional options provide pagination & sorting.
|
||||
func (s *UserService) FindUsers(ctx context.Context, filter influxdb.UserFilter, opt ...influxdb.FindOptions) ([]*influxdb.User, int, error) {
|
||||
url, err := NewURL(s.Addr, prefixUsers)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
query := url.Query()
|
||||
|
||||
req, err := http.NewRequest("GET", url.String(), nil)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
params := findOptionParams(opt...)
|
||||
if filter.ID != nil {
|
||||
query.Add("id", filter.ID.String())
|
||||
params = append(params, [2]string{"id", filter.ID.String()})
|
||||
}
|
||||
if filter.Name != nil {
|
||||
query.Add("name", *filter.Name)
|
||||
}
|
||||
|
||||
req.URL.RawQuery = query.Encode()
|
||||
SetToken(s.Token, req)
|
||||
|
||||
hc := NewClient(url.Scheme, s.InsecureSkipVerify)
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if err := CheckError(resp); err != nil {
|
||||
return nil, 0, err
|
||||
params = append(params, [2]string{"name", *filter.Name})
|
||||
}
|
||||
|
||||
var r usersResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
|
||||
err := s.Client.
|
||||
Get(prefixUsers).
|
||||
QueryParams(params...).
|
||||
DecodeJSON(&r).
|
||||
Do(ctx)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
|
|
@ -627,219 +634,50 @@ func (s *UserService) FindUsers(ctx context.Context, filter influxdb.UserFilter,
|
|||
|
||||
// CreateUser creates a new user and sets u.ID with the new identifier.
|
||||
func (s *UserService) CreateUser(ctx context.Context, u *influxdb.User) error {
|
||||
url, err := NewURL(s.Addr, prefixUsers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
octets, err := json.Marshal(u)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", url.String(), bytes.NewReader(octets))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
SetToken(s.Token, req)
|
||||
|
||||
hc := NewClient(url.Scheme, s.InsecureSkipVerify)
|
||||
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// TODO(jsternberg): Should this check for a 201 explicitly?
|
||||
if err := CheckError(resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(u); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return s.Client.
|
||||
PostJSON(u, prefixUsers).
|
||||
DecodeJSON(u).
|
||||
Do(ctx)
|
||||
}
|
||||
|
||||
// UpdateUser updates a single user with changeset.
|
||||
// Returns the new user state after update.
|
||||
func (s *UserService) UpdateUser(ctx context.Context, id influxdb.ID, upd influxdb.UserUpdate) (*influxdb.User, error) {
|
||||
url, err := NewURL(s.Addr, userIDPath(id))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
octets, err := json.Marshal(upd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("PATCH", url.String(), bytes.NewReader(octets))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
SetToken(s.Token, req)
|
||||
|
||||
hc := NewClient(url.Scheme, s.InsecureSkipVerify)
|
||||
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if err := CheckError(resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res UserResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
|
||||
err := s.Client.
|
||||
PatchJSON(upd, prefixUsers, id.String()).
|
||||
DecodeJSON(&res).
|
||||
Do(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &res.User, nil
|
||||
}
|
||||
|
||||
// DeleteUser removes a user by ID.
|
||||
func (s *UserService) DeleteUser(ctx context.Context, id influxdb.ID) error {
|
||||
url, err := NewURL(s.Addr, userIDPath(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("DELETE", url.String(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
SetToken(s.Token, req)
|
||||
|
||||
hc := NewClient(url.Scheme, s.InsecureSkipVerify)
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return CheckErrorStatus(http.StatusNoContent, resp)
|
||||
}
|
||||
|
||||
func userIDPath(id influxdb.ID) string {
|
||||
return path.Join(prefixUsers, id.String())
|
||||
}
|
||||
|
||||
// hanldeGetUserLog retrieves a user log by the users ID.
|
||||
func (h *UserHandler) handleGetUserLog(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
req, err := decodeGetUserLogRequest(ctx, r)
|
||||
if err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
log, _, err := h.UserOperationLogService.GetUserOperationLog(ctx, req.UserID, req.opts)
|
||||
if err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
h.log.Debug("User log retrieved", zap.String("log", fmt.Sprint(log)))
|
||||
|
||||
if err := encodeResponse(ctx, w, http.StatusOK, newUserLogResponse(req.UserID, log)); err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type getUserLogRequest struct {
|
||||
UserID influxdb.ID
|
||||
opts influxdb.FindOptions
|
||||
}
|
||||
|
||||
func decodeGetUserLogRequest(ctx context.Context, r *http.Request) (*getUserLogRequest, error) {
|
||||
params := httprouter.ParamsFromContext(ctx)
|
||||
id := params.ByName("id")
|
||||
if id == "" {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "url missing id",
|
||||
}
|
||||
}
|
||||
|
||||
var i influxdb.ID
|
||||
if err := i.DecodeFromString(id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts, err := decodeFindOptions(ctx, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &getUserLogRequest{
|
||||
UserID: i,
|
||||
opts: *opts,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newUserLogResponse(id influxdb.ID, es []*influxdb.OperationLogEntry) *operationLogResponse {
|
||||
logs := make([]*operationLogEntryResponse, 0, len(es))
|
||||
for _, e := range es {
|
||||
logs = append(logs, newOperationLogEntryResponse(e))
|
||||
}
|
||||
return &operationLogResponse{
|
||||
Links: map[string]string{
|
||||
"self": fmt.Sprintf("/api/v2/users/%s/logs", id),
|
||||
},
|
||||
Logs: logs,
|
||||
}
|
||||
return s.Client.
|
||||
Delete(prefixUsers, id.String()).
|
||||
StatusFn(func(resp *http.Response) error {
|
||||
return CheckErrorStatus(http.StatusNoContent, resp)
|
||||
}).
|
||||
Do(ctx)
|
||||
}
|
||||
|
||||
// PasswordService is an http client to speak to the password service.
|
||||
type PasswordService struct {
|
||||
Addr string
|
||||
Token string
|
||||
InsecureSkipVerify bool
|
||||
Client *httpc.Client
|
||||
}
|
||||
|
||||
var _ influxdb.PasswordsService = (*PasswordService)(nil)
|
||||
|
||||
// SetPassword sets the user's password.
|
||||
func (s *PasswordService) SetPassword(ctx context.Context, userID influxdb.ID, password string) error {
|
||||
u, err := NewURL(s.Addr, path.Join("/api/v2/users", userID.String(), "password"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newPass := passwordSetRequest{
|
||||
Password: password,
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := json.NewEncoder(&buf).Encode(newPass); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, u.String(), &buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
SetToken(s.Token, req)
|
||||
|
||||
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
|
||||
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return CheckError(resp)
|
||||
return s.Client.
|
||||
PostJSON(passwordSetRequest{
|
||||
Password: password,
|
||||
}, prefixUsers, userID.String(), "password").
|
||||
Do(ctx)
|
||||
}
|
||||
|
||||
// ComparePassword compares the user new password with existing. Note: is not implemented.
|
||||
|
|
|
|||
|
|
@ -44,14 +44,18 @@ func initUserService(f platformtesting.UserFields, t *testing.T) (platform.UserS
|
|||
userBackend.UserService = svc
|
||||
handler := NewUserHandler(zaptest.NewLogger(t), userBackend)
|
||||
server := httptest.NewServer(handler)
|
||||
|
||||
httpClient, err := NewHTTPClient(server.URL, "", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
client := UserService{
|
||||
Addr: server.URL,
|
||||
Client: httpClient,
|
||||
OpPrefix: inmem.OpPrefix,
|
||||
}
|
||||
|
||||
done := server.Close
|
||||
|
||||
return &client, inmem.OpPrefix, done
|
||||
return &client, inmem.OpPrefix, server.Close
|
||||
}
|
||||
|
||||
func TestUserService(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue