feat(authorization): Create a v1 authorization service
This service is a private API for managing authorization tokens for v1 API requests. Note that this commit does not hook up the service to the v1 `/query` and `/write`, which will occur in a subsequent PR. Closes #19812pull/19834/head
parent
a2dbb572fe
commit
5c63c2163d
|
@ -69,6 +69,7 @@ import (
|
||||||
"github.com/influxdata/influxdb/v2/tenant"
|
"github.com/influxdata/influxdb/v2/tenant"
|
||||||
_ "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" // needed for tsm1
|
_ "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" // needed for tsm1
|
||||||
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" // needed for tsi1
|
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" // needed for tsi1
|
||||||
|
authv1 "github.com/influxdata/influxdb/v2/v1/authorization"
|
||||||
iqlcoordinator "github.com/influxdata/influxdb/v2/v1/coordinator"
|
iqlcoordinator "github.com/influxdata/influxdb/v2/v1/coordinator"
|
||||||
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
||||||
storage2 "github.com/influxdata/influxdb/v2/v1/services/storage"
|
storage2 "github.com/influxdata/influxdb/v2/v1/services/storage"
|
||||||
|
@ -1277,6 +1278,27 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
||||||
authHTTPServer = authorization.NewHTTPAuthHandler(m.log, authService, ts)
|
authHTTPServer = authorization.NewHTTPAuthHandler(m.log, authService, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var v1AuthHTTPServer *authv1.AuthHandler
|
||||||
|
{
|
||||||
|
var v1AuthSvc platform.AuthorizationService
|
||||||
|
{
|
||||||
|
authStore, err := authv1.NewStore(m.kvStore)
|
||||||
|
if err != nil {
|
||||||
|
m.log.Error("Failed creating new authorization store", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
v1AuthSvc = authv1.NewService(authStore, ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
authLogger := m.log.With(zap.String("handler", "v1_authorization"))
|
||||||
|
|
||||||
|
var authService platform.AuthorizationService
|
||||||
|
authService = authorization.NewAuthedAuthorizationService(v1AuthSvc, ts)
|
||||||
|
authService = authorization.NewAuthLogger(authLogger, authService)
|
||||||
|
|
||||||
|
v1AuthHTTPServer = authv1.NewHTTPAuthHandler(m.log, authService, ts)
|
||||||
|
}
|
||||||
|
|
||||||
var sessionHTTPServer *session.SessionHandler
|
var sessionHTTPServer *session.SessionHandler
|
||||||
{
|
{
|
||||||
sessionHTTPServer = session.NewSessionHandler(m.log.With(zap.String("handler", "session")), sessionSvc, ts.UserService, ts.PasswordsService)
|
sessionHTTPServer = session.NewSessionHandler(m.log.With(zap.String("handler", "session")), sessionSvc, ts.UserService, ts.PasswordsService)
|
||||||
|
@ -1299,6 +1321,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
||||||
http.WithResourceHandler(userHTTPServer.UserResourceHandler()),
|
http.WithResourceHandler(userHTTPServer.UserResourceHandler()),
|
||||||
http.WithResourceHandler(orgHTTPServer),
|
http.WithResourceHandler(orgHTTPServer),
|
||||||
http.WithResourceHandler(bucketHTTPServer),
|
http.WithResourceHandler(bucketHTTPServer),
|
||||||
|
http.WithResourceHandler(v1AuthHTTPServer),
|
||||||
)
|
)
|
||||||
|
|
||||||
httpLogger := m.log.With(zap.String("service", "http"))
|
httpLogger := m.log.With(zap.String("service", "http"))
|
||||||
|
|
|
@ -69,7 +69,8 @@ func (h *PlatformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
// of the platform API.
|
// of the platform API.
|
||||||
if !strings.HasPrefix(r.URL.Path, "/v1") &&
|
if !strings.HasPrefix(r.URL.Path, "/v1") &&
|
||||||
!strings.HasPrefix(r.URL.Path, "/api/v2") &&
|
!strings.HasPrefix(r.URL.Path, "/api/v2") &&
|
||||||
!strings.HasPrefix(r.URL.Path, "/chronograf/") {
|
!strings.HasPrefix(r.URL.Path, "/chronograf/") &&
|
||||||
|
!strings.HasPrefix(r.URL.Path, "/private/") {
|
||||||
h.AssetHandler.ServeHTTP(w, r)
|
h.AssetHandler.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
package all
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/influxdata/influxdb/v2/kv/migration"
|
||||||
|
)
|
||||||
|
|
||||||
|
var Migration0008_LegacyAuthBuckets = migration.CreateBuckets(
|
||||||
|
"Create Legacy authorization buckets",
|
||||||
|
[]byte("legacy/authorizationsv1"), []byte("legacy/authorizationindexv1"))
|
|
@ -21,5 +21,7 @@ var Migrations = [...]migration.Spec{
|
||||||
Migration0006_DeleteBucketSessionsv1,
|
Migration0006_DeleteBucketSessionsv1,
|
||||||
// CreateMetaDataBucket
|
// CreateMetaDataBucket
|
||||||
Migration0007_CreateMetaDataBucket,
|
Migration0007_CreateMetaDataBucket,
|
||||||
|
// LegacyAuthBuckets
|
||||||
|
Migration0008_LegacyAuthBuckets,
|
||||||
// {{ do_not_edit . }}
|
// {{ do_not_edit . }}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
package authorization
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrInvalidAuthID is used when the Authorization's ID cannot be encoded
|
||||||
|
ErrInvalidAuthID = &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Msg: "authorization ID is invalid",
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrAuthNotFound is used when the specified auth cannot be found
|
||||||
|
ErrAuthNotFound = &influxdb.Error{
|
||||||
|
Code: influxdb.ENotFound,
|
||||||
|
Msg: "authorization not found",
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotUniqueIDError occurs when attempting to create an Authorization with an ID that already belongs to another one
|
||||||
|
NotUniqueIDError = &influxdb.Error{
|
||||||
|
Code: influxdb.EConflict,
|
||||||
|
Msg: "ID already exists",
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrFailureGeneratingID occurs ony when the random number generator
|
||||||
|
// cannot generate an ID in MaxIDGenerationN times.
|
||||||
|
ErrFailureGeneratingID = &influxdb.Error{
|
||||||
|
Code: influxdb.EInternal,
|
||||||
|
Msg: "unable to generate valid id",
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrTokenAlreadyExistsError is used when attempting to create an authorization
|
||||||
|
// with a token that already exists
|
||||||
|
ErrTokenAlreadyExistsError = &influxdb.Error{
|
||||||
|
Code: influxdb.EConflict,
|
||||||
|
Msg: "token already exists",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrInvalidAuthIDError is used when a service was provided an invalid ID.
|
||||||
|
func ErrInvalidAuthIDError(err error) *influxdb.Error {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Msg: "auth id provided is invalid",
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrInternalServiceError is used when the error comes from an internal system.
|
||||||
|
func ErrInternalServiceError(err error) *influxdb.Error {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Code: influxdb.EInternal,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnexpectedAuthIndexError is used when the error comes from an internal system.
|
||||||
|
func UnexpectedAuthIndexError(err error) *influxdb.Error {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Code: influxdb.EInternal,
|
||||||
|
Msg: fmt.Sprintf("unexpected error retrieving auth index; Err: %v", err),
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
package authorization
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
|
"github.com/influxdata/influxdb/v2/pkg/httpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ influxdb.AuthorizationService = (*Client)(nil)
|
||||||
|
|
||||||
|
// Client connects to Influx via HTTP using tokens to manage authorizations
|
||||||
|
type Client struct {
|
||||||
|
Client *httpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateAuthorization creates a new authorization and sets b.ID with the new identifier.
|
||||||
|
func (s *Client) CreateAuthorization(ctx context.Context, a *influxdb.Authorization) error {
|
||||||
|
newAuth, err := newPostAuthorizationRequest(a)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.Client.
|
||||||
|
PostJSON(newAuth, prefixAuthorization).
|
||||||
|
DecodeJSON(a).
|
||||||
|
Do(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindAuthorizations returns a list of authorizations that match filter and the total count of matching authorizations.
|
||||||
|
// Additional options provide pagination & sorting.
|
||||||
|
func (s *Client) FindAuthorizations(ctx context.Context, filter influxdb.AuthorizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Authorization, int, error) {
|
||||||
|
params := influxdb.FindOptionParams(opt...)
|
||||||
|
if filter.ID != nil {
|
||||||
|
params = append(params, [2]string{"id", filter.ID.String()})
|
||||||
|
}
|
||||||
|
if filter.UserID != nil {
|
||||||
|
params = append(params, [2]string{"userID", filter.UserID.String()})
|
||||||
|
}
|
||||||
|
if filter.User != nil {
|
||||||
|
params = append(params, [2]string{"user", *filter.User})
|
||||||
|
}
|
||||||
|
if filter.OrgID != nil {
|
||||||
|
params = append(params, [2]string{"orgID", filter.OrgID.String()})
|
||||||
|
}
|
||||||
|
if filter.Org != nil {
|
||||||
|
params = append(params, [2]string{"org", *filter.Org})
|
||||||
|
}
|
||||||
|
|
||||||
|
var as authsResponse
|
||||||
|
err := s.Client.
|
||||||
|
Get(prefixAuthorization).
|
||||||
|
QueryParams(params...).
|
||||||
|
DecodeJSON(&as).
|
||||||
|
Do(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
auths := make([]*influxdb.Authorization, 0, len(as.Auths))
|
||||||
|
for _, a := range as.Auths {
|
||||||
|
auths = append(auths, a.toInfluxdb())
|
||||||
|
}
|
||||||
|
|
||||||
|
return auths, len(auths), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindAuthorizationByToken is not supported by the HTTP authorization service.
|
||||||
|
func (s *Client) FindAuthorizationByToken(ctx context.Context, token string) (*influxdb.Authorization, error) {
|
||||||
|
return nil, errors.New("not supported in HTTP authorization service")
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindAuthorizationByID finds a single Authorization by its ID against a remote influx server.
|
||||||
|
func (s *Client) FindAuthorizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Authorization, error) {
|
||||||
|
var b influxdb.Authorization
|
||||||
|
err := s.Client.
|
||||||
|
Get(prefixAuthorization, id.String()).
|
||||||
|
DecodeJSON(&b).
|
||||||
|
Do(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &b, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateAuthorization updates the status and description if available.
|
||||||
|
func (s *Client) UpdateAuthorization(ctx context.Context, id influxdb.ID, upd *influxdb.AuthorizationUpdate) (*influxdb.Authorization, error) {
|
||||||
|
var res authResponse
|
||||||
|
err := s.Client.
|
||||||
|
PatchJSON(upd, prefixAuthorization, id.String()).
|
||||||
|
DecodeJSON(&res).
|
||||||
|
Do(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.toInfluxdb(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteAuthorization removes a authorization by id.
|
||||||
|
func (s *Client) DeleteAuthorization(ctx context.Context, id influxdb.ID) error {
|
||||||
|
return s.Client.
|
||||||
|
Delete(prefixAuthorization, id.String()).
|
||||||
|
Do(ctx)
|
||||||
|
}
|
|
@ -0,0 +1,625 @@
|
||||||
|
package authorization
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-chi/chi"
|
||||||
|
"github.com/go-chi/chi/middleware"
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
|
icontext "github.com/influxdata/influxdb/v2/context"
|
||||||
|
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TenantService is used to look up the Organization and User for an Authorization
|
||||||
|
type TenantService interface {
|
||||||
|
FindOrganizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error)
|
||||||
|
FindOrganization(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error)
|
||||||
|
FindUserByID(ctx context.Context, id influxdb.ID) (*influxdb.User, error)
|
||||||
|
FindUser(ctx context.Context, filter influxdb.UserFilter) (*influxdb.User, error)
|
||||||
|
FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type AuthHandler struct {
|
||||||
|
chi.Router
|
||||||
|
api *kithttp.API
|
||||||
|
log *zap.Logger
|
||||||
|
authSvc influxdb.AuthorizationService
|
||||||
|
tenantService TenantService
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHTTPAuthHandler constructs a new http server.
|
||||||
|
func NewHTTPAuthHandler(log *zap.Logger, authService influxdb.AuthorizationService, tenantService TenantService) *AuthHandler {
|
||||||
|
h := &AuthHandler{
|
||||||
|
api: kithttp.NewAPI(kithttp.WithLog(log)),
|
||||||
|
log: log,
|
||||||
|
authSvc: authService,
|
||||||
|
tenantService: tenantService,
|
||||||
|
}
|
||||||
|
|
||||||
|
r := chi.NewRouter()
|
||||||
|
r.Use(
|
||||||
|
middleware.Recoverer,
|
||||||
|
middleware.RequestID,
|
||||||
|
middleware.RealIP,
|
||||||
|
)
|
||||||
|
|
||||||
|
r.Route("/", func(r chi.Router) {
|
||||||
|
r.Post("/", h.handlePostAuthorization)
|
||||||
|
r.Get("/", h.handleGetAuthorizations)
|
||||||
|
|
||||||
|
r.Route("/{id}", func(r chi.Router) {
|
||||||
|
r.Get("/", h.handleGetAuthorization)
|
||||||
|
r.Patch("/", h.handleUpdateAuthorization)
|
||||||
|
r.Delete("/", h.handleDeleteAuthorization)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
h.Router = r
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
const prefixAuthorization = "/private/legacy/authorizations"
|
||||||
|
|
||||||
|
func (h *AuthHandler) Prefix() string {
|
||||||
|
return prefixAuthorization
|
||||||
|
}
|
||||||
|
|
||||||
|
// handlePostAuthorization is the HTTP handler for the POST prefixAuthorization route.
|
||||||
|
func (h *AuthHandler) handlePostAuthorization(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
a, err := decodePostAuthorizationRequest(ctx, r)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
user, err := getAuthorizedUser(r, h.tenantService)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, influxdb.ErrUnableToCreateToken)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
userID := user.ID
|
||||||
|
if a.UserID != nil && a.UserID.Valid() {
|
||||||
|
userID = *a.UserID
|
||||||
|
}
|
||||||
|
|
||||||
|
auth := a.toInfluxdb(userID)
|
||||||
|
|
||||||
|
if err := h.authSvc.CreateAuthorization(ctx, auth); err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
perms, err := h.newPermissionsResponse(ctx, auth.Permissions)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.log.Debug("Auth created ", zap.String("auth", fmt.Sprint(auth)))
|
||||||
|
|
||||||
|
resp, err := h.newAuthResponse(ctx, auth, perms)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.api.Respond(w, r, http.StatusCreated, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAuthorizedUser(r *http.Request, ts TenantService) (*influxdb.User, error) {
|
||||||
|
ctx := r.Context()
|
||||||
|
|
||||||
|
a, err := icontext.GetAuthorizer(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ts.FindUserByID(ctx, a.GetUserID())
|
||||||
|
}
|
||||||
|
|
||||||
|
type postAuthorizationRequest struct {
|
||||||
|
Token string `json:"token"`
|
||||||
|
Status influxdb.Status `json:"status"`
|
||||||
|
OrgID influxdb.ID `json:"orgID"`
|
||||||
|
UserID *influxdb.ID `json:"userID,omitempty"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
Permissions []influxdb.Permission `json:"permissions"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type authResponse struct {
|
||||||
|
ID influxdb.ID `json:"id"`
|
||||||
|
Status influxdb.Status `json:"status"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
OrgID influxdb.ID `json:"orgID"`
|
||||||
|
Org string `json:"org"`
|
||||||
|
UserID influxdb.ID `json:"userID"`
|
||||||
|
User string `json:"user"`
|
||||||
|
Permissions []permissionResponse `json:"permissions"`
|
||||||
|
Links map[string]string `json:"links"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
UpdatedAt time.Time `json:"updatedAt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// In the future, we would like only the service layer to look up the user and org to see if they are valid
|
||||||
|
// but for now we need to look up the User and Org here because the API expects the response
|
||||||
|
// to have the names of the Org and User
|
||||||
|
func (h *AuthHandler) newAuthResponse(ctx context.Context, a *influxdb.Authorization, ps []permissionResponse) (*authResponse, error) {
|
||||||
|
org, err := h.tenantService.FindOrganizationByID(ctx, a.OrgID)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Info("Failed to get org", zap.String("handler", "getAuthorizations"), zap.String("orgID", a.OrgID.String()), zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
user, err := h.tenantService.FindUserByID(ctx, a.UserID)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Info("Failed to get user", zap.String("userID", a.UserID.String()), zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res := &authResponse{
|
||||||
|
ID: a.ID,
|
||||||
|
Status: a.Status,
|
||||||
|
Description: a.Description,
|
||||||
|
OrgID: a.OrgID,
|
||||||
|
UserID: a.UserID,
|
||||||
|
User: user.Name,
|
||||||
|
Org: org.Name,
|
||||||
|
Permissions: ps,
|
||||||
|
Links: map[string]string{
|
||||||
|
"self": fmt.Sprintf(prefixAuthorization+"/%s", a.ID),
|
||||||
|
"user": fmt.Sprintf("/api/v2/users/%s", a.UserID),
|
||||||
|
},
|
||||||
|
CreatedAt: a.CreatedAt,
|
||||||
|
UpdatedAt: a.UpdatedAt,
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *postAuthorizationRequest) toInfluxdb(userID influxdb.ID) *influxdb.Authorization {
|
||||||
|
hash := sha256.New()
|
||||||
|
hash.Write([]byte(p.Token))
|
||||||
|
var buf [sha256.Size]byte
|
||||||
|
token := hash.Sum(buf[:0])
|
||||||
|
|
||||||
|
t := &influxdb.Authorization{
|
||||||
|
OrgID: p.OrgID,
|
||||||
|
Token: base64.URLEncoding.EncodeToString(token),
|
||||||
|
Status: p.Status,
|
||||||
|
Description: p.Description,
|
||||||
|
Permissions: p.Permissions,
|
||||||
|
UserID: userID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *authResponse) toInfluxdb() *influxdb.Authorization {
|
||||||
|
res := &influxdb.Authorization{
|
||||||
|
ID: a.ID,
|
||||||
|
Status: a.Status,
|
||||||
|
Description: a.Description,
|
||||||
|
OrgID: a.OrgID,
|
||||||
|
UserID: a.UserID,
|
||||||
|
CRUDLog: influxdb.CRUDLog{
|
||||||
|
CreatedAt: a.CreatedAt,
|
||||||
|
UpdatedAt: a.UpdatedAt,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, p := range a.Permissions {
|
||||||
|
res.Permissions = append(res.Permissions, influxdb.Permission{Action: p.Action, Resource: p.Resource.Resource})
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
type authsResponse struct {
|
||||||
|
Links map[string]string `json:"links"`
|
||||||
|
Auths []*authResponse `json:"authorizations"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAuthsResponse(as []*authResponse) *authsResponse {
|
||||||
|
return &authsResponse{
|
||||||
|
// TODO(desa): update links to include paging and filter information
|
||||||
|
Links: map[string]string{
|
||||||
|
"self": prefixAuthorization,
|
||||||
|
},
|
||||||
|
Auths: as,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPostAuthorizationRequest(a *influxdb.Authorization) (*postAuthorizationRequest, error) {
|
||||||
|
res := &postAuthorizationRequest{
|
||||||
|
OrgID: a.OrgID,
|
||||||
|
Description: a.Description,
|
||||||
|
Permissions: a.Permissions,
|
||||||
|
Token: a.Token,
|
||||||
|
Status: a.Status,
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.UserID.Valid() {
|
||||||
|
res.UserID = &a.UserID
|
||||||
|
}
|
||||||
|
|
||||||
|
res.SetDefaults()
|
||||||
|
|
||||||
|
return res, res.Validate()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *postAuthorizationRequest) SetDefaults() {
|
||||||
|
if p.Status == "" {
|
||||||
|
p.Status = influxdb.Active
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *postAuthorizationRequest) Validate() error {
|
||||||
|
if len(p.Permissions) == 0 {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Msg: "authorization must include permissions",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, perm := range p.Permissions {
|
||||||
|
if err := perm.Valid(); err != nil {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !p.OrgID.Valid() {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Err: influxdb.ErrInvalidID,
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Msg: "org id required",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Status == "" {
|
||||||
|
p.Status = influxdb.Active
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p.Status.Valid(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Token == "" {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Msg: "token required for v1_user authorization type",
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.IndexByte(p.Token, ':') == -1 {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Msg: "token format invalid for v1_user authorization type: must be username:password",
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type permissionResponse struct {
|
||||||
|
Action influxdb.Action `json:"action"`
|
||||||
|
Resource resourceResponse `json:"resource"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type resourceResponse struct {
|
||||||
|
influxdb.Resource
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
Organization string `json:"org,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *AuthHandler) newPermissionsResponse(ctx context.Context, ps []influxdb.Permission) ([]permissionResponse, error) {
|
||||||
|
res := make([]permissionResponse, len(ps))
|
||||||
|
for i, p := range ps {
|
||||||
|
res[i] = permissionResponse{
|
||||||
|
Action: p.Action,
|
||||||
|
Resource: resourceResponse{
|
||||||
|
Resource: p.Resource,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Resource.ID != nil {
|
||||||
|
name, err := h.getNameForResource(ctx, p.Resource.Type, *p.Resource.ID)
|
||||||
|
if influxdb.ErrorCode(err) == influxdb.ENotFound {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res[i].Resource.Name = name
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Resource.OrgID != nil {
|
||||||
|
name, err := h.getNameForResource(ctx, influxdb.OrgsResourceType, *p.Resource.OrgID)
|
||||||
|
if influxdb.ErrorCode(err) == influxdb.ENotFound {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res[i].Resource.Organization = name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *AuthHandler) getNameForResource(ctx context.Context, resource influxdb.ResourceType, id influxdb.ID) (string, error) {
|
||||||
|
if err := resource.Valid(); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok := id.Valid(); !ok {
|
||||||
|
return "", influxdb.ErrInvalidID
|
||||||
|
}
|
||||||
|
|
||||||
|
switch resource {
|
||||||
|
case influxdb.BucketsResourceType:
|
||||||
|
r, err := h.tenantService.FindBucketByID(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return r.Name, nil
|
||||||
|
case influxdb.OrgsResourceType:
|
||||||
|
r, err := h.tenantService.FindOrganizationByID(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return r.Name, nil
|
||||||
|
case influxdb.UsersResourceType:
|
||||||
|
r, err := h.tenantService.FindUserByID(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return r.Name, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodePostAuthorizationRequest(ctx context.Context, r *http.Request) (*postAuthorizationRequest, error) {
|
||||||
|
a := &postAuthorizationRequest{}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(a); err != nil {
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Msg: "invalid json structure",
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a.SetDefaults()
|
||||||
|
|
||||||
|
return a, a.Validate()
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGetAuthorizations is the HTTP handler for the GET prefixAuthorization route.
|
||||||
|
func (h *AuthHandler) handleGetAuthorizations(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
req, err := decodeGetAuthorizationsRequest(ctx, r)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Info("Failed to decode request", zap.String("handler", "getAuthorizations"), zap.Error(err))
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := influxdb.FindOptions{}
|
||||||
|
as, _, err := h.authSvc.FindAuthorizations(ctx, req.filter, opts)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
f := req.filter
|
||||||
|
// If the user or org name was provided, look up the ID first
|
||||||
|
if f.User != nil {
|
||||||
|
u, err := h.tenantService.FindUser(ctx, influxdb.UserFilter{Name: f.User})
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.UserID = &u.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.Org != nil {
|
||||||
|
o, err := h.tenantService.FindOrganization(ctx, influxdb.OrganizationFilter{Name: f.Org})
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.OrgID = &o.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
auths := make([]*authResponse, 0, len(as))
|
||||||
|
for _, a := range as {
|
||||||
|
ps, err := h.newPermissionsResponse(ctx, a.Permissions)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := h.newAuthResponse(ctx, a, ps)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Info("Failed to create auth response", zap.String("handler", "getAuthorizations"))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
auths = append(auths, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.log.Debug("Auths retrieved ", zap.String("auths", fmt.Sprint(auths)))
|
||||||
|
|
||||||
|
h.api.Respond(w, r, http.StatusOK, newAuthsResponse(auths))
|
||||||
|
}
|
||||||
|
|
||||||
|
type getAuthorizationsRequest struct {
|
||||||
|
filter influxdb.AuthorizationFilter
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeGetAuthorizationsRequest(ctx context.Context, r *http.Request) (*getAuthorizationsRequest, error) {
|
||||||
|
qp := r.URL.Query()
|
||||||
|
|
||||||
|
req := &getAuthorizationsRequest{}
|
||||||
|
|
||||||
|
userID := qp.Get("userID")
|
||||||
|
if userID != "" {
|
||||||
|
id, err := influxdb.IDFromString(userID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.filter.UserID = id
|
||||||
|
}
|
||||||
|
|
||||||
|
user := qp.Get("user")
|
||||||
|
if user != "" {
|
||||||
|
req.filter.User = &user
|
||||||
|
}
|
||||||
|
|
||||||
|
orgID := qp.Get("orgID")
|
||||||
|
if orgID != "" {
|
||||||
|
id, err := influxdb.IDFromString(orgID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.filter.OrgID = id
|
||||||
|
}
|
||||||
|
|
||||||
|
org := qp.Get("org")
|
||||||
|
if org != "" {
|
||||||
|
req.filter.Org = &org
|
||||||
|
}
|
||||||
|
|
||||||
|
authID := qp.Get("id")
|
||||||
|
if authID != "" {
|
||||||
|
id, err := influxdb.IDFromString(authID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.filter.ID = id
|
||||||
|
}
|
||||||
|
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *AuthHandler) handleGetAuthorization(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
|
||||||
|
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
|
||||||
|
if err != nil {
|
||||||
|
h.log.Info("Failed to decode request", zap.String("handler", "getAuthorization"), zap.Error(err))
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
a, err := h.authSvc.FindAuthorizationByID(ctx, *id)
|
||||||
|
if err != nil {
|
||||||
|
// Don't log here, it should already be handled by the service
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ps, err := h.newPermissionsResponse(ctx, a.Permissions)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.log.Debug("Auth retrieved ", zap.String("auth", fmt.Sprint(a)))
|
||||||
|
|
||||||
|
resp, err := h.newAuthResponse(ctx, a, ps)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.api.Respond(w, r, http.StatusOK, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleUpdateAuthorization is the HTTP handler for the PATCH /api/v2/authorizations/:id route that updates the authorization's status and desc.
|
||||||
|
func (h *AuthHandler) handleUpdateAuthorization(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
req, err := decodeUpdateAuthorizationRequest(ctx, r)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Info("Failed to decode request", zap.String("handler", "updateAuthorization"), zap.Error(err))
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
a, err := h.authSvc.FindAuthorizationByID(ctx, req.ID)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
a, err = h.authSvc.UpdateAuthorization(ctx, a.ID, req.AuthorizationUpdate)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ps, err := h.newPermissionsResponse(ctx, a.Permissions)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h.log.Debug("Auth updated", zap.String("auth", fmt.Sprint(a)))
|
||||||
|
|
||||||
|
resp, err := h.newAuthResponse(ctx, a, ps)
|
||||||
|
if err != nil {
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.api.Respond(w, r, http.StatusOK, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
type updateAuthorizationRequest struct {
|
||||||
|
ID influxdb.ID
|
||||||
|
*influxdb.AuthorizationUpdate
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeUpdateAuthorizationRequest(ctx context.Context, r *http.Request) (*updateAuthorizationRequest, error) {
|
||||||
|
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
upd := &influxdb.AuthorizationUpdate{}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(upd); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &updateAuthorizationRequest{
|
||||||
|
ID: *id,
|
||||||
|
AuthorizationUpdate: upd,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleDeleteAuthorization is the HTTP handler for the DELETE prefixAuthorization/:id route.
|
||||||
|
func (h *AuthHandler) handleDeleteAuthorization(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
|
||||||
|
if err != nil {
|
||||||
|
h.log.Info("Failed to decode request", zap.String("handler", "deleteAuthorization"), zap.Error(err))
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.authSvc.DeleteAuthorization(r.Context(), *id); err != nil {
|
||||||
|
// Don't log here, it should already be handled by the service
|
||||||
|
h.api.Err(w, r, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.log.Debug("Auth deleted", zap.String("authID", fmt.Sprint(id)))
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,40 @@
|
||||||
|
package authorization
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// tenantService is a mock implementation of an authorization.tenantService
|
||||||
|
type tenantService struct {
|
||||||
|
FindUserByIDFn func(context.Context, influxdb.ID) (*influxdb.User, error)
|
||||||
|
FindUserFn func(context.Context, influxdb.UserFilter) (*influxdb.User, error)
|
||||||
|
FindOrganizationByIDF func(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error)
|
||||||
|
FindOrganizationF func(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error)
|
||||||
|
FindBucketByIDFn func(context.Context, influxdb.ID) (*influxdb.Bucket, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindUserByID returns a single User by ID.
|
||||||
|
func (s *tenantService) FindUserByID(ctx context.Context, id influxdb.ID) (*influxdb.User, error) {
|
||||||
|
return s.FindUserByIDFn(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindUsers returns a list of Users that match filter and the total count of matching Users.
|
||||||
|
func (s *tenantService) FindUser(ctx context.Context, filter influxdb.UserFilter) (*influxdb.User, error) {
|
||||||
|
return s.FindUserFn(ctx, filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
//FindOrganizationByID calls FindOrganizationByIDF.
|
||||||
|
func (s *tenantService) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error) {
|
||||||
|
return s.FindOrganizationByIDF(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
//FindOrganization calls FindOrganizationF.
|
||||||
|
func (s *tenantService) FindOrganization(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) {
|
||||||
|
return s.FindOrganizationF(ctx, filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *tenantService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
|
||||||
|
return s.FindBucketByIDFn(ctx, id)
|
||||||
|
}
|
|
@ -0,0 +1,210 @@
|
||||||
|
package authorization
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
|
"github.com/influxdata/influxdb/v2/kv"
|
||||||
|
"github.com/influxdata/influxdb/v2/rand"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ influxdb.AuthorizationService = (*Service)(nil)
|
||||||
|
|
||||||
|
type Service struct {
|
||||||
|
store *Store
|
||||||
|
tokenGenerator influxdb.TokenGenerator
|
||||||
|
tenantService TenantService
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewService(st *Store, ts TenantService) influxdb.AuthorizationService {
|
||||||
|
return &Service{
|
||||||
|
store: st,
|
||||||
|
tokenGenerator: rand.NewTokenGenerator(64),
|
||||||
|
tenantService: ts,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) CreateAuthorization(ctx context.Context, a *influxdb.Authorization) error {
|
||||||
|
if err := a.Valid(); err != nil {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.Token == "" {
|
||||||
|
return influxdb.ErrUnableToCreateToken
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := s.tenantService.FindUserByID(ctx, a.UserID); err != nil {
|
||||||
|
return influxdb.ErrUnableToCreateToken
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := s.tenantService.FindOrganizationByID(ctx, a.OrgID); err != nil {
|
||||||
|
return influxdb.ErrUnableToCreateToken
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.store.View(ctx, func(tx kv.Tx) error {
|
||||||
|
if err := s.store.uniqueAuthToken(ctx, tx, a); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return ErrTokenAlreadyExistsError
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
a.SetCreatedAt(now)
|
||||||
|
a.SetUpdatedAt(now)
|
||||||
|
|
||||||
|
return s.store.Update(ctx, func(tx kv.Tx) error {
|
||||||
|
return s.store.CreateAuthorization(ctx, tx, a)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) FindAuthorizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Authorization, error) {
|
||||||
|
var a *influxdb.Authorization
|
||||||
|
err := s.store.View(ctx, func(tx kv.Tx) error {
|
||||||
|
auth, err := s.store.GetAuthorizationByID(ctx, tx, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a = auth
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindAuthorizationByToken returns a authorization by token for a particular authorization.
|
||||||
|
func (s *Service) FindAuthorizationByToken(ctx context.Context, n string) (*influxdb.Authorization, error) {
|
||||||
|
var a *influxdb.Authorization
|
||||||
|
err := s.store.View(ctx, func(tx kv.Tx) error {
|
||||||
|
auth, err := s.store.GetAuthorizationByToken(ctx, tx, n)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a = auth
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindAuthorizations retrives all authorizations that match an arbitrary authorization filter.
|
||||||
|
// Filters using ID, or Token should be efficient.
|
||||||
|
// Other filters will do a linear scan across all authorizations searching for a match.
|
||||||
|
func (s *Service) FindAuthorizations(ctx context.Context, filter influxdb.AuthorizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Authorization, int, error) {
|
||||||
|
if filter.ID != nil {
|
||||||
|
var auth *influxdb.Authorization
|
||||||
|
err := s.store.View(ctx, func(tx kv.Tx) error {
|
||||||
|
a, e := s.store.GetAuthorizationByID(ctx, tx, *filter.ID)
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
auth = a
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, &influxdb.Error{
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return []*influxdb.Authorization{auth}, 1, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if filter.Token != nil {
|
||||||
|
var auth *influxdb.Authorization
|
||||||
|
err := s.store.View(ctx, func(tx kv.Tx) error {
|
||||||
|
a, e := s.store.GetAuthorizationByToken(ctx, tx, *filter.Token)
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
auth = a
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, &influxdb.Error{
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return []*influxdb.Authorization{auth}, 1, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
as := []*influxdb.Authorization{}
|
||||||
|
err := s.store.View(ctx, func(tx kv.Tx) error {
|
||||||
|
auths, err := s.store.ListAuthorizations(ctx, tx, filter)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
as = auths
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, &influxdb.Error{
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return as, len(as), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateAuthorization updates the status and description if available.
|
||||||
|
func (s *Service) UpdateAuthorization(ctx context.Context, id influxdb.ID, upd *influxdb.AuthorizationUpdate) (*influxdb.Authorization, error) {
|
||||||
|
var auth *influxdb.Authorization
|
||||||
|
err := s.store.View(ctx, func(tx kv.Tx) error {
|
||||||
|
a, e := s.store.GetAuthorizationByID(ctx, tx, id)
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
auth = a
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Code: influxdb.ENotFound,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if upd.Status != nil {
|
||||||
|
auth.Status = *upd.Status
|
||||||
|
}
|
||||||
|
if upd.Description != nil {
|
||||||
|
auth.Description = *upd.Description
|
||||||
|
}
|
||||||
|
|
||||||
|
auth.SetUpdatedAt(time.Now())
|
||||||
|
|
||||||
|
err = s.store.Update(ctx, func(tx kv.Tx) error {
|
||||||
|
a, e := s.store.UpdateAuthorization(ctx, tx, id, auth)
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
auth = a
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return auth, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) DeleteAuthorization(ctx context.Context, id influxdb.ID) error {
|
||||||
|
return s.store.Update(ctx, func(tx kv.Tx) (err error) {
|
||||||
|
return s.store.DeleteAuthorization(ctx, tx, id)
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
package authorization
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
|
"github.com/influxdata/influxdb/v2/kit/tracing"
|
||||||
|
"github.com/influxdata/influxdb/v2/kv"
|
||||||
|
"github.com/influxdata/influxdb/v2/snowflake"
|
||||||
|
)
|
||||||
|
|
||||||
|
const MaxIDGenerationN = 100
|
||||||
|
const ReservedIDs = 1000
|
||||||
|
|
||||||
|
var (
|
||||||
|
authBucket = []byte("legacy/authorizationsv1")
|
||||||
|
authIndex = []byte("legacy/authorizationindexv1")
|
||||||
|
)
|
||||||
|
|
||||||
|
type Store struct {
|
||||||
|
kvStore kv.Store
|
||||||
|
IDGen influxdb.IDGenerator
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStore(kvStore kv.Store) (*Store, error) {
|
||||||
|
st := &Store{
|
||||||
|
kvStore: kvStore,
|
||||||
|
IDGen: snowflake.NewDefaultIDGenerator(),
|
||||||
|
}
|
||||||
|
return st, st.setup()
|
||||||
|
}
|
||||||
|
|
||||||
|
// View opens up a transaction that will not write to any data. Implementing interfaces
|
||||||
|
// should take care to ensure that all view transactions do not mutate any data.
|
||||||
|
func (s *Store) View(ctx context.Context, fn func(kv.Tx) error) error {
|
||||||
|
return s.kvStore.View(ctx, fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update opens up a transaction that will mutate data.
|
||||||
|
func (s *Store) Update(ctx context.Context, fn func(kv.Tx) error) error {
|
||||||
|
return s.kvStore.Update(ctx, fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) setup() error {
|
||||||
|
return s.Update(context.Background(), func(tx kv.Tx) error {
|
||||||
|
if _, err := tx.Bucket(authBucket); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := authIndexBucket(tx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// generateSafeID attempts to create ids for buckets
|
||||||
|
// and orgs that are without backslash, commas, and spaces, BUT ALSO do not already exist.
|
||||||
|
func (s *Store) generateSafeID(ctx context.Context, tx kv.Tx, bucket []byte) (influxdb.ID, error) {
|
||||||
|
for i := 0; i < MaxIDGenerationN; i++ {
|
||||||
|
id := s.IDGen.ID()
|
||||||
|
|
||||||
|
// TODO: this is probably unnecessary but for testing we need to keep it in.
|
||||||
|
// After KV is cleaned out we can update the tests and remove this.
|
||||||
|
if id < ReservedIDs {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.uniqueID(ctx, tx, bucket, id)
|
||||||
|
if err == nil {
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == NotUniqueIDError {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return influxdb.InvalidID(), err
|
||||||
|
}
|
||||||
|
return influxdb.InvalidID(), ErrFailureGeneratingID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) uniqueID(ctx context.Context, tx kv.Tx, bucket []byte, id influxdb.ID) error {
|
||||||
|
span, _ := tracing.StartSpanFromContext(ctx)
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
encodedID, err := id.Encode()
|
||||||
|
if err != nil {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := tx.Bucket(bucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = b.Get(encodedID)
|
||||||
|
if kv.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return NotUniqueIDError
|
||||||
|
}
|
|
@ -0,0 +1,453 @@
|
||||||
|
package authorization
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/buger/jsonparser"
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
|
"github.com/influxdata/influxdb/v2/kv"
|
||||||
|
jsonp "github.com/influxdata/influxdb/v2/pkg/jsonparser"
|
||||||
|
)
|
||||||
|
|
||||||
|
func authIndexKey(n string) []byte {
|
||||||
|
return []byte(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
func authIndexBucket(tx kv.Tx) (kv.Bucket, error) {
|
||||||
|
b, err := tx.Bucket([]byte(authIndex))
|
||||||
|
if err != nil {
|
||||||
|
return nil, UnexpectedAuthIndexError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return b, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func encodeAuthorization(a *influxdb.Authorization) ([]byte, error) {
|
||||||
|
switch a.Status {
|
||||||
|
case influxdb.Active, influxdb.Inactive:
|
||||||
|
case "":
|
||||||
|
a.Status = influxdb.Active
|
||||||
|
default:
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Msg: "unknown authorization status",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(a)
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeAuthorization(b []byte, a *influxdb.Authorization) error {
|
||||||
|
if err := json.Unmarshal(b, a); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if a.Status == "" {
|
||||||
|
a.Status = influxdb.Active
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateAuthorization takes an Authorization object and saves it in storage using its token
|
||||||
|
// using its token property as an index
|
||||||
|
func (s *Store) CreateAuthorization(ctx context.Context, tx kv.Tx, a *influxdb.Authorization) error {
|
||||||
|
// if the provided ID is invalid, or already maps to an existing Auth, then generate a new one
|
||||||
|
if !a.ID.Valid() {
|
||||||
|
id, err := s.generateSafeID(ctx, tx, authBucket)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
a.ID = id
|
||||||
|
} else if err := uniqueID(ctx, tx, a.ID); err != nil {
|
||||||
|
id, err := s.generateSafeID(ctx, tx, authBucket)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
a.ID = id
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.uniqueAuthToken(ctx, tx, a); err != nil {
|
||||||
|
return ErrTokenAlreadyExistsError
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := encodeAuthorization(a)
|
||||||
|
if err != nil {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
encodedID, err := a.ID.Encode()
|
||||||
|
if err != nil {
|
||||||
|
return ErrInvalidAuthIDError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, err := authIndexBucket(tx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := idx.Put(authIndexKey(a.Token), encodedID); err != nil {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Code: influxdb.EInternal,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := tx.Bucket(authBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.Put(encodedID, v); err != nil {
|
||||||
|
return &influxdb.Error{
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAuthorization gets an authorization by its ID from the auth bucket in kv
|
||||||
|
func (s *Store) GetAuthorizationByID(ctx context.Context, tx kv.Tx, id influxdb.ID) (*influxdb.Authorization, error) {
|
||||||
|
encodedID, err := id.Encode()
|
||||||
|
if err != nil {
|
||||||
|
return nil, ErrInvalidAuthID
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := tx.Bucket(authBucket)
|
||||||
|
if err != nil {
|
||||||
|
return nil, ErrInternalServiceError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := b.Get(encodedID)
|
||||||
|
if kv.IsNotFound(err) {
|
||||||
|
return nil, ErrAuthNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, ErrInternalServiceError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a := &influxdb.Authorization{}
|
||||||
|
if err := decodeAuthorization(v, a); err != nil {
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) GetAuthorizationByToken(ctx context.Context, tx kv.Tx, token string) (*influxdb.Authorization, error) {
|
||||||
|
idx, err := authIndexBucket(tx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// use the token to look up the authorization's ID
|
||||||
|
idKey, err := idx.Get(authIndexKey(token))
|
||||||
|
if kv.IsNotFound(err) {
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Code: influxdb.ENotFound,
|
||||||
|
Msg: "authorization not found",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var id influxdb.ID
|
||||||
|
if err := id.Decode(idKey); err != nil {
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.GetAuthorizationByID(ctx, tx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListAuthorizations returns all the authorizations matching a set of FindOptions. This function is used for
|
||||||
|
// FindAuthorizationByID, FindAuthorizationByToken, and FindAuthorizations in the AuthorizationService implementation
|
||||||
|
func (s *Store) ListAuthorizations(ctx context.Context, tx kv.Tx, f influxdb.AuthorizationFilter) ([]*influxdb.Authorization, error) {
|
||||||
|
var as []*influxdb.Authorization
|
||||||
|
pred := authorizationsPredicateFn(f)
|
||||||
|
filterFn := filterAuthorizationsFn(f)
|
||||||
|
err := s.forEachAuthorization(ctx, tx, pred, func(a *influxdb.Authorization) bool {
|
||||||
|
if filterFn(a) {
|
||||||
|
as = append(as, a)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return as, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// forEachAuthorization will iterate through all authorizations while fn returns true.
|
||||||
|
func (s *Store) forEachAuthorization(ctx context.Context, tx kv.Tx, pred kv.CursorPredicateFunc, fn func(*influxdb.Authorization) bool) error {
|
||||||
|
b, err := tx.Bucket(authBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var cur kv.Cursor
|
||||||
|
if pred != nil {
|
||||||
|
cur, err = b.Cursor(kv.WithCursorHintPredicate(pred))
|
||||||
|
} else {
|
||||||
|
cur, err = b.Cursor()
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := cur.First(); k != nil; k, v = cur.Next() {
|
||||||
|
// preallocate Permissions to reduce multiple slice re-allocations
|
||||||
|
a := &influxdb.Authorization{
|
||||||
|
Permissions: make([]influxdb.Permission, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := decodeAuthorization(v, a); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !fn(a) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateAuthorization updates the status and description only of an authorization
|
||||||
|
func (s *Store) UpdateAuthorization(ctx context.Context, tx kv.Tx, id influxdb.ID, a *influxdb.Authorization) (*influxdb.Authorization, error) {
|
||||||
|
v, err := encodeAuthorization(a)
|
||||||
|
if err != nil {
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
encodedID, err := a.ID.Encode()
|
||||||
|
if err != nil {
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Code: influxdb.ENotFound,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, err := authIndexBucket(tx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := idx.Put(authIndexKey(a.Token), encodedID); err != nil {
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Code: influxdb.EInternal,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := tx.Bucket(authBucket)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.Put(encodedID, v); err != nil {
|
||||||
|
return nil, &influxdb.Error{
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return a, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteAuthorization removes an authorization from storage
|
||||||
|
func (s *Store) DeleteAuthorization(ctx context.Context, tx kv.Tx, id influxdb.ID) error {
|
||||||
|
a, err := s.GetAuthorizationByID(ctx, tx, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
encodedID, err := id.Encode()
|
||||||
|
if err != nil {
|
||||||
|
return ErrInvalidAuthID
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, err := authIndexBucket(tx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := tx.Bucket(authBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := idx.Delete([]byte(a.Token)); err != nil {
|
||||||
|
return ErrInternalServiceError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.Delete(encodedID); err != nil {
|
||||||
|
return ErrInternalServiceError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) uniqueAuthToken(ctx context.Context, tx kv.Tx, a *influxdb.Authorization) error {
|
||||||
|
err := unique(ctx, tx, authIndex, authIndexKey(a.Token))
|
||||||
|
if err == kv.NotUniqueError {
|
||||||
|
// by returning a generic error we are trying to hide when
|
||||||
|
// a token is non-unique.
|
||||||
|
return influxdb.ErrUnableToCreateToken
|
||||||
|
}
|
||||||
|
// otherwise, this is some sort of internal server error and we
|
||||||
|
// should provide some debugging information.
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func unique(ctx context.Context, tx kv.Tx, indexBucket, indexKey []byte) error {
|
||||||
|
bucket, err := tx.Bucket(indexBucket)
|
||||||
|
if err != nil {
|
||||||
|
return kv.UnexpectedIndexError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = bucket.Get(indexKey)
|
||||||
|
// if not found then this token is unique.
|
||||||
|
if kv.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// no error means this is not unique
|
||||||
|
if err == nil {
|
||||||
|
return kv.NotUniqueError
|
||||||
|
}
|
||||||
|
|
||||||
|
// any other error is some sort of internal server error
|
||||||
|
return kv.UnexpectedIndexError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// uniqueID returns nil if the ID provided is unique, returns an error otherwise
|
||||||
|
func uniqueID(ctx context.Context, tx kv.Tx, id influxdb.ID) error {
|
||||||
|
encodedID, err := id.Encode()
|
||||||
|
if err != nil {
|
||||||
|
return ErrInvalidAuthID
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := tx.Bucket(authBucket)
|
||||||
|
if err != nil {
|
||||||
|
return ErrInternalServiceError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = b.Get(encodedID)
|
||||||
|
// if not found then the ID is unique
|
||||||
|
if kv.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// no error means this is not unique
|
||||||
|
if err == nil {
|
||||||
|
return kv.NotUniqueError
|
||||||
|
}
|
||||||
|
|
||||||
|
// any other error is some sort of internal server error
|
||||||
|
return kv.UnexpectedIndexError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func authorizationsPredicateFn(f influxdb.AuthorizationFilter) kv.CursorPredicateFunc {
|
||||||
|
// if any errors occur reading the JSON data, the predicate will always return true
|
||||||
|
// to ensure the value is included and handled higher up.
|
||||||
|
|
||||||
|
if f.ID != nil {
|
||||||
|
exp := *f.ID
|
||||||
|
return func(_, value []byte) bool {
|
||||||
|
got, err := jsonp.GetID(value, "id")
|
||||||
|
if err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return got == exp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.Token != nil {
|
||||||
|
exp := *f.Token
|
||||||
|
return func(_, value []byte) bool {
|
||||||
|
// it is assumed that token never has escaped string data
|
||||||
|
got, _, _, err := jsonparser.Get(value, "token")
|
||||||
|
if err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return string(got) == exp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var pred kv.CursorPredicateFunc
|
||||||
|
if f.OrgID != nil {
|
||||||
|
exp := *f.OrgID
|
||||||
|
pred = func(_, value []byte) bool {
|
||||||
|
got, err := jsonp.GetID(value, "orgID")
|
||||||
|
if err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return got == exp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.UserID != nil {
|
||||||
|
exp := *f.UserID
|
||||||
|
prevFn := pred
|
||||||
|
pred = func(key, value []byte) bool {
|
||||||
|
prev := prevFn == nil || prevFn(key, value)
|
||||||
|
got, exists, err := jsonp.GetOptionalID(value, "userID")
|
||||||
|
return prev && ((exp == got && exists) || err != nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pred
|
||||||
|
}
|
||||||
|
|
||||||
|
type predicateFunc func(a *influxdb.Authorization) bool
|
||||||
|
|
||||||
|
func filterAuthorizationsFn(filter influxdb.AuthorizationFilter) predicateFunc {
|
||||||
|
|
||||||
|
if filter.ID != nil {
|
||||||
|
return func(a *influxdb.Authorization) bool {
|
||||||
|
return a.ID == *filter.ID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if filter.Token != nil {
|
||||||
|
return func(a *influxdb.Authorization) bool {
|
||||||
|
return a.Token == *filter.Token
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var pred predicateFunc
|
||||||
|
if filter.OrgID != nil {
|
||||||
|
exp := *filter.OrgID
|
||||||
|
prevFn := pred
|
||||||
|
pred = func(a *influxdb.Authorization) bool {
|
||||||
|
prev := prevFn == nil || prevFn(a)
|
||||||
|
return prev && a.OrgID == exp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if filter.UserID != nil {
|
||||||
|
exp := *filter.UserID
|
||||||
|
prevFn := pred
|
||||||
|
pred = func(a *influxdb.Authorization) bool {
|
||||||
|
prev := prevFn == nil || prevFn(a)
|
||||||
|
return prev && a.UserID == exp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if pred == nil {
|
||||||
|
pred = func(a *influxdb.Authorization) bool { return true }
|
||||||
|
}
|
||||||
|
|
||||||
|
return pred
|
||||||
|
}
|
|
@ -0,0 +1,342 @@
|
||||||
|
package authorization
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
|
"github.com/influxdata/influxdb/v2/inmem"
|
||||||
|
"github.com/influxdata/influxdb/v2/kv"
|
||||||
|
"github.com/influxdata/influxdb/v2/kv/migration/all"
|
||||||
|
"github.com/influxdata/influxdb/v2/pkg/pointer"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.uber.org/zap/zaptest"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAuth(t *testing.T) {
|
||||||
|
setup := func(t *testing.T, store *Store, tx kv.Tx) {
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
err := store.CreateAuthorization(context.Background(), tx, &influxdb.Authorization{
|
||||||
|
ID: influxdb.ID(i),
|
||||||
|
Token: fmt.Sprintf("randomtoken%d", i),
|
||||||
|
OrgID: influxdb.ID(i),
|
||||||
|
UserID: influxdb.ID(i),
|
||||||
|
Status: influxdb.Active,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tt := []struct {
|
||||||
|
name string
|
||||||
|
setup func(*testing.T, *Store, kv.Tx)
|
||||||
|
update func(*testing.T, *Store, kv.Tx)
|
||||||
|
results func(*testing.T, *Store, kv.Tx)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "create",
|
||||||
|
setup: setup,
|
||||||
|
results: func(t *testing.T, store *Store, tx kv.Tx) {
|
||||||
|
auths, err := store.ListAuthorizations(context.Background(), tx, influxdb.AuthorizationFilter{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(auths) != 10 {
|
||||||
|
t.Fatalf("expected 10 authorizations, got: %d", len(auths))
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := []*influxdb.Authorization{}
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
expected = append(expected, &influxdb.Authorization{
|
||||||
|
ID: influxdb.ID(i),
|
||||||
|
Token: fmt.Sprintf("randomtoken%d", i),
|
||||||
|
OrgID: influxdb.ID(i),
|
||||||
|
UserID: influxdb.ID(i),
|
||||||
|
Status: "active",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(auths, expected) {
|
||||||
|
t.Fatalf("expected identical authorizations: \n%+v\n%+v", auths, expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// should not be able to create two authorizations with identical tokens
|
||||||
|
err = store.CreateAuthorization(context.Background(), tx, &influxdb.Authorization{
|
||||||
|
ID: influxdb.ID(1),
|
||||||
|
Token: fmt.Sprintf("randomtoken%d", 1),
|
||||||
|
OrgID: influxdb.ID(1),
|
||||||
|
UserID: influxdb.ID(1),
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected to be unable to create authorizations with identical tokens")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "read",
|
||||||
|
setup: setup,
|
||||||
|
results: func(t *testing.T, store *Store, tx kv.Tx) {
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
expectedAuth := &influxdb.Authorization{
|
||||||
|
ID: influxdb.ID(i),
|
||||||
|
Token: fmt.Sprintf("randomtoken%d", i),
|
||||||
|
OrgID: influxdb.ID(i),
|
||||||
|
UserID: influxdb.ID(i),
|
||||||
|
Status: influxdb.Active,
|
||||||
|
}
|
||||||
|
|
||||||
|
authByID, err := store.GetAuthorizationByID(context.Background(), tx, influxdb.ID(i))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpectedly could not acquire Authorization by ID [Error]: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(authByID, expectedAuth) {
|
||||||
|
t.Fatalf("ID TEST: expected identical authorizations:\n[Expected]: %+#v\n[Got]: %+#v", expectedAuth, authByID)
|
||||||
|
}
|
||||||
|
|
||||||
|
authByToken, err := store.GetAuthorizationByToken(context.Background(), tx, fmt.Sprintf("randomtoken%d", i))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("cannot get authorization by Token [Error]: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(authByToken, expectedAuth) {
|
||||||
|
t.Fatalf("TOKEN TEST: expected identical authorizations:\n[Expected]: %+#v\n[Got]: %+#v", expectedAuth, authByToken)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "update",
|
||||||
|
setup: setup,
|
||||||
|
update: func(t *testing.T, store *Store, tx kv.Tx) {
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
auth, err := store.GetAuthorizationByID(context.Background(), tx, influxdb.ID(i))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not get authorization [Error]: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
auth.Status = influxdb.Inactive
|
||||||
|
|
||||||
|
_, err = store.UpdateAuthorization(context.Background(), tx, influxdb.ID(i), auth)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not get updated authorization [Error]: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
results: func(t *testing.T, store *Store, tx kv.Tx) {
|
||||||
|
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
auth, err := store.GetAuthorizationByID(context.Background(), tx, influxdb.ID(i))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not get authorization [Error]: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedAuth := &influxdb.Authorization{
|
||||||
|
ID: influxdb.ID(i),
|
||||||
|
Token: fmt.Sprintf("randomtoken%d", i),
|
||||||
|
OrgID: influxdb.ID(i),
|
||||||
|
UserID: influxdb.ID(i),
|
||||||
|
Status: influxdb.Inactive,
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(auth, expectedAuth) {
|
||||||
|
t.Fatalf("expected identical authorizations:\n[Expected] %+#v\n[Got] %+#v", expectedAuth, auth)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delete",
|
||||||
|
setup: setup,
|
||||||
|
update: func(t *testing.T, store *Store, tx kv.Tx) {
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
err := store.DeleteAuthorization(context.Background(), tx, influxdb.ID(i))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not delete authorization [Error]: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
results: func(t *testing.T, store *Store, tx kv.Tx) {
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
_, err := store.GetAuthorizationByID(context.Background(), tx, influxdb.ID(i))
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("Authorization was not deleted correctly")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testScenario := range tt {
|
||||||
|
t.Run(testScenario.name, func(t *testing.T) {
|
||||||
|
store := inmem.NewKVStore()
|
||||||
|
if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ts, err := NewStore(store)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// setup
|
||||||
|
if testScenario.setup != nil {
|
||||||
|
err := ts.Update(context.Background(), func(tx kv.Tx) error {
|
||||||
|
testScenario.setup(t, ts, tx)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// update
|
||||||
|
if testScenario.update != nil {
|
||||||
|
err := ts.Update(context.Background(), func(tx kv.Tx) error {
|
||||||
|
testScenario.update(t, ts, tx)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// results
|
||||||
|
if testScenario.results != nil {
|
||||||
|
err := ts.View(context.Background(), func(tx kv.Tx) error {
|
||||||
|
testScenario.results(t, ts, tx)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_filterAuthorizationsFn(t *testing.T) {
|
||||||
|
var (
|
||||||
|
otherID = influxdb.ID(999)
|
||||||
|
)
|
||||||
|
|
||||||
|
auth := influxdb.Authorization{
|
||||||
|
ID: 1000,
|
||||||
|
Token: "foo",
|
||||||
|
Status: influxdb.Active,
|
||||||
|
OrgID: 2000,
|
||||||
|
UserID: 3000,
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
filt influxdb.AuthorizationFilter
|
||||||
|
auth influxdb.Authorization
|
||||||
|
exp bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "default is true",
|
||||||
|
filt: influxdb.AuthorizationFilter{},
|
||||||
|
auth: auth,
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "match id",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
ID: &auth.ID,
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no match id",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
ID: &otherID,
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "match token",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
Token: &auth.Token,
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no match token",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
Token: pointer.String("2"),
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "match org",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
OrgID: &auth.OrgID,
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no match org",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
OrgID: &otherID,
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "match user",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
UserID: &auth.UserID,
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no match user",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
UserID: &otherID,
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "match org and user",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
OrgID: &auth.OrgID,
|
||||||
|
UserID: &auth.UserID,
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no match org and user",
|
||||||
|
filt: influxdb.AuthorizationFilter{
|
||||||
|
OrgID: &otherID,
|
||||||
|
UserID: &auth.UserID,
|
||||||
|
},
|
||||||
|
auth: auth,
|
||||||
|
exp: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
pred := filterAuthorizationsFn(tc.filt)
|
||||||
|
got := pred(&tc.auth)
|
||||||
|
assert.Equal(t, tc.exp, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue