feat: switch to use the new tenant bucket service (#18738)
parent
e5fa7eb571
commit
5776350a53
|
@ -1103,6 +1103,13 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
orgHTTPServer = tenant.NewHTTPOrgHandler(m.log.With(zap.String("handler", "org")), tenant.NewAuthedOrgService(orgSvc), urmHandler, labelHandler, secretHandler)
|
||||
}
|
||||
|
||||
var bucketHTTPServer *tenant.BucketHandler
|
||||
{
|
||||
urmHandler := tenant.NewURMHandler(m.log.With(zap.String("handler", "urm")), platform.OrgsResourceType, "id", userSvc, tenant.NewAuthedURMService(orgSvc, userResourceSvc))
|
||||
labelHandler := label.NewHTTPEmbeddedHandler(m.log.With(zap.String("handler", "label")), platform.BucketsResourceType, labelSvc)
|
||||
bucketHTTPServer = tenant.NewHTTPBucketHandler(m.log.With(zap.String("handler", "bucket")), tenant.NewAuthedBucketService(bucketSvc), labelSvc, urmHandler, labelHandler)
|
||||
}
|
||||
|
||||
{
|
||||
platformHandler := http.NewPlatformHandler(m.apibackend,
|
||||
http.WithResourceHandler(pkgHTTPServerDeprecated),
|
||||
|
@ -1116,6 +1123,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
http.WithResourceHandler(userHTTPServer.MeResourceHandler()),
|
||||
http.WithResourceHandler(userHTTPServer.UserResourceHandler()),
|
||||
http.WithResourceHandler(orgHTTPServer),
|
||||
http.WithResourceHandler(bucketHTTPServer),
|
||||
)
|
||||
|
||||
httpLogger := m.log.With(zap.String("service", "http"))
|
||||
|
|
|
@ -130,10 +130,6 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {
|
|||
|
||||
h.Mount("/api/v2", serveLinksHandler(b.HTTPErrorHandler))
|
||||
|
||||
bucketBackend := NewBucketBackend(b.Logger.With(zap.String("handler", "bucket")), b)
|
||||
bucketBackend.BucketService = authorizer.NewBucketService(b.BucketService, noAuthUserResourceMappingService)
|
||||
h.Mount(prefixBuckets, NewBucketHandler(b.Logger, bucketBackend))
|
||||
|
||||
checkBackend := NewCheckBackend(b.Logger.With(zap.String("handler", "check")), b)
|
||||
checkBackend.CheckService = authorizer.NewCheckService(b.CheckService,
|
||||
b.UserResourceMappingService, b.OrganizationService)
|
||||
|
|
|
@ -152,7 +152,11 @@ func ValidResource(api *API, lookupOrgByResourceID func(context.Context, influxd
|
|||
|
||||
orgID, err := lookupOrgByResourceID(ctx, *id)
|
||||
if err != nil {
|
||||
api.Err(w, r, err)
|
||||
// if this function returns an error we will squash the error message and replace it with a not found error
|
||||
api.Err(w, r, &influxdb.Error{
|
||||
Code: influxdb.ENotFound,
|
||||
Msg: "404 page not found",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/go-chi/chi"
|
||||
"github.com/go-chi/chi/middleware"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
ihttp "github.com/influxdata/influxdb/v2/http"
|
||||
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -28,7 +29,7 @@ func NewHTTPEmbeddedHandler(log *zap.Logger, rt influxdb.ResourceType, ls influx
|
|||
rt: rt,
|
||||
}
|
||||
|
||||
r := chi.NewRouter()
|
||||
r := ihttp.NewBaseChiRouter(h.api)
|
||||
r.Use(
|
||||
middleware.Recoverer,
|
||||
middleware.RequestID,
|
||||
|
|
|
@ -20,6 +20,7 @@ type BucketHandler struct {
|
|||
api *kithttp.API
|
||||
log *zap.Logger
|
||||
bucketSvc influxdb.BucketService
|
||||
labelSvc influxdb.LabelService // we may need this for now but we dont want it perminantly
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -27,11 +28,12 @@ const (
|
|||
)
|
||||
|
||||
// NewHTTPBucketHandler constructs a new http server.
|
||||
func NewHTTPBucketHandler(log *zap.Logger, bucketSvc influxdb.BucketService, urmHandler, labelHandler http.Handler) *BucketHandler {
|
||||
func NewHTTPBucketHandler(log *zap.Logger, bucketSvc influxdb.BucketService, labelSvc influxdb.LabelService, urmHandler, labelHandler http.Handler) *BucketHandler {
|
||||
svr := &BucketHandler{
|
||||
api: kithttp.NewAPI(kithttp.WithLog(log)),
|
||||
log: log,
|
||||
bucketSvc: bucketSvc,
|
||||
labelSvc: labelSvc,
|
||||
}
|
||||
|
||||
r := chi.NewRouter()
|
||||
|
@ -211,10 +213,11 @@ func newBucketUpdate(pb *influxdb.BucketUpdate) *bucketUpdate {
|
|||
|
||||
type bucketResponse struct {
|
||||
bucket
|
||||
Links map[string]string `json:"links"`
|
||||
Links map[string]string `json:"links"`
|
||||
Labels []influxdb.Label `json:"labels"`
|
||||
}
|
||||
|
||||
func NewBucketResponse(b *influxdb.Bucket) *bucketResponse {
|
||||
func NewBucketResponse(b *influxdb.Bucket, labels ...*influxdb.Label) *bucketResponse {
|
||||
res := &bucketResponse{
|
||||
Links: map[string]string{
|
||||
"self": fmt.Sprintf("/api/v2/buckets/%s", b.ID),
|
||||
|
@ -225,6 +228,10 @@ func NewBucketResponse(b *influxdb.Bucket) *bucketResponse {
|
|||
"write": fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", b.OrgID, b.ID),
|
||||
},
|
||||
bucket: *newBucket(b),
|
||||
Labels: []influxdb.Label{},
|
||||
}
|
||||
for _, l := range labels {
|
||||
res.Labels = append(res.Labels, *l)
|
||||
}
|
||||
|
||||
return res
|
||||
|
@ -235,10 +242,14 @@ type bucketsResponse struct {
|
|||
Buckets []*bucketResponse `json:"buckets"`
|
||||
}
|
||||
|
||||
func newBucketsResponse(ctx context.Context, opts influxdb.FindOptions, f influxdb.BucketFilter, bs []*influxdb.Bucket) *bucketsResponse {
|
||||
func newBucketsResponse(ctx context.Context, opts influxdb.FindOptions, f influxdb.BucketFilter, bs []*influxdb.Bucket, labelSvc influxdb.LabelService) *bucketsResponse {
|
||||
rs := make([]*bucketResponse, 0, len(bs))
|
||||
for _, b := range bs {
|
||||
rs = append(rs, NewBucketResponse(b))
|
||||
var labels []*influxdb.Label
|
||||
if labelSvc != nil { // allow for no label svc
|
||||
labels, _ = labelSvc.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: b.ID, ResourceType: influxdb.BucketsResourceType})
|
||||
}
|
||||
rs = append(rs, NewBucketResponse(b, labels...))
|
||||
}
|
||||
return &bucketsResponse{
|
||||
Links: influxdb.NewPagingLinks(prefixBuckets, opts, f, len(bs)),
|
||||
|
@ -345,8 +356,11 @@ func (h *BucketHandler) handleGetBucket(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
|
||||
h.log.Debug("Bucket retrieved", zap.String("bucket", fmt.Sprint(b)))
|
||||
|
||||
h.api.Respond(w, r, http.StatusOK, NewBucketResponse(b))
|
||||
var labels []*influxdb.Label
|
||||
if h.labelSvc != nil { // allow for no label svc
|
||||
labels, _ = h.labelSvc.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: b.ID, ResourceType: influxdb.BucketsResourceType})
|
||||
}
|
||||
h.api.Respond(w, r, http.StatusOK, NewBucketResponse(b, labels...))
|
||||
}
|
||||
|
||||
// handleDeleteBucket is the HTTP handler for the DELETE /api/v2/buckets/:id route.
|
||||
|
@ -382,7 +396,7 @@ func (h *BucketHandler) handleGetBuckets(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
h.log.Debug("Buckets retrieved", zap.String("buckets", fmt.Sprint(bs)))
|
||||
|
||||
h.api.Respond(w, r, http.StatusOK, newBucketsResponse(r.Context(), bucketsRequest.opts, bucketsRequest.filter, bs))
|
||||
h.api.Respond(w, r, http.StatusOK, newBucketsResponse(r.Context(), bucketsRequest.opts, bucketsRequest.filter, bs, h.labelSvc))
|
||||
}
|
||||
|
||||
type getBucketsRequest struct {
|
||||
|
|
|
@ -39,7 +39,7 @@ func initBucketHttpService(f itesting.BucketFields, t *testing.T) (influxdb.Buck
|
|||
}
|
||||
}
|
||||
|
||||
handler := tenant.NewHTTPBucketHandler(zaptest.NewLogger(t), svc, nil, nil)
|
||||
handler := tenant.NewHTTPBucketHandler(zaptest.NewLogger(t), svc, nil, nil, nil)
|
||||
r := chi.NewRouter()
|
||||
r.Mount(handler.Prefix(), handler)
|
||||
server := httptest.NewServer(r)
|
||||
|
|
|
@ -16,14 +16,12 @@ var _ influxdb.BucketService = (*AuthedBucketService)(nil)
|
|||
// against it appropriately.
|
||||
type AuthedBucketService struct {
|
||||
s influxdb.BucketService
|
||||
u influxdb.UserResourceMappingService
|
||||
}
|
||||
|
||||
// NewAuthedBucketService constructs an instance of an authorizing bucket serivce.
|
||||
func NewAuthedBucketService(s influxdb.BucketService, u influxdb.UserResourceMappingService) *AuthedBucketService {
|
||||
func NewAuthedBucketService(s influxdb.BucketService) *AuthedBucketService {
|
||||
return &AuthedBucketService{
|
||||
s: s,
|
||||
u: u,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -104,7 +104,7 @@ func TestBucketService_FindBucketByID(t *testing.T) {
|
|||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService, nil)
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, []influxdb.Permission{tt.args.permission}))
|
||||
|
@ -189,7 +189,7 @@ func TestBucketService_FindBucket(t *testing.T) {
|
|||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService, nil)
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, []influxdb.Permission{tt.args.permission}))
|
||||
|
@ -314,7 +314,7 @@ func TestBucketService_FindBuckets(t *testing.T) {
|
|||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService, nil)
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, []influxdb.Permission{tt.args.permission}))
|
||||
|
@ -429,7 +429,7 @@ func TestBucketService_UpdateBucket(t *testing.T) {
|
|||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService, nil)
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, tt.args.permissions))
|
||||
|
@ -534,7 +534,7 @@ func TestBucketService_DeleteBucket(t *testing.T) {
|
|||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService, nil)
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, tt.args.permissions))
|
||||
|
@ -616,7 +616,7 @@ func TestBucketService_CreateBucket(t *testing.T) {
|
|||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService, nil)
|
||||
s := tenant.NewAuthedBucketService(tt.fields.BucketService)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, []influxdb.Permission{tt.args.permission}))
|
||||
|
|
Loading…
Reference in New Issue