refactor: add new label package (#18078)

pull/18194/head
Alirie Gray 2020-05-21 11:30:19 -07:00 committed by GitHub
parent 5ac45ccb8a
commit 7f4ddabe8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 3774 additions and 157 deletions

View File

@ -2,7 +2,6 @@ package authorization
import (
"context"
"fmt"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/metric"
@ -18,10 +17,10 @@ type AuthMetrics struct {
var _ influxdb.AuthorizationService = (*AuthMetrics)(nil)
func NewAuthMetrics(reg prometheus.Registerer, s influxdb.AuthorizationService, opts ...MetricsOption) *AuthMetrics {
o := applyOpts(opts...)
func NewAuthMetrics(reg prometheus.Registerer, s influxdb.AuthorizationService, opts ...metric.MetricsOption) *AuthMetrics {
o := metric.ApplyMetricOpts(opts...)
return &AuthMetrics{
rec: metric.New(reg, o.applySuffix("token")),
rec: metric.New(reg, o.ApplySuffix("token")),
authService: s,
}
}
@ -59,37 +58,3 @@ func (m *AuthMetrics) DeleteAuthorization(ctx context.Context, id influxdb.ID) e
err := m.authService.DeleteAuthorization(ctx, id)
return rec(err)
}
// Metrics options
type metricOpts struct {
serviceSuffix string
}
func defaultOpts() *metricOpts {
return &metricOpts{}
}
func (o *metricOpts) applySuffix(prefix string) string {
if o.serviceSuffix != "" {
return fmt.Sprintf("%s_%s", prefix, o.serviceSuffix)
}
return prefix
}
// MetricsOption is an option used by a metric middleware.
type MetricsOption func(*metricOpts)
// WithSuffix returns a metric option that applies a suffix to the service name of the metric.
func WithSuffix(suffix string) MetricsOption {
return func(opts *metricOpts) {
opts.serviceSuffix = suffix
}
}
func applyOpts(opts ...MetricsOption) *metricOpts {
o := defaultOpts()
for _, opt := range opts {
opt(o)
}
return o
}

View File

@ -5,7 +5,7 @@ import (
"encoding/json"
"github.com/buger/jsonparser"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
jsonp "github.com/influxdata/influxdb/v2/pkg/jsonparser"
)

View File

@ -31,6 +31,7 @@ import (
"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/influxdata/influxdb/v2/kit/feature"
overrideflagger "github.com/influxdata/influxdb/v2/kit/feature/override"
"github.com/influxdata/influxdb/v2/kit/metric"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/signals"
"github.com/influxdata/influxdb/v2/kit/tracing"
@ -617,11 +618,11 @@ func (m *Launcher) run(ctx context.Context) (err error) {
if m.enableNewMetaStore {
ts := tenant.NewService(store)
userSvc = tenant.NewUserLogger(m.log.With(zap.String("store", "new")), tenant.NewUserMetrics(m.reg, ts, tenant.WithSuffix("new")))
orgSvc = tenant.NewOrgLogger(m.log.With(zap.String("store", "new")), tenant.NewOrgMetrics(m.reg, ts, tenant.WithSuffix("new")))
userResourceSvc = tenant.NewURMLogger(m.log.With(zap.String("store", "new")), tenant.NewUrmMetrics(m.reg, ts, tenant.WithSuffix("new")))
bucketSvc = tenant.NewBucketLogger(m.log.With(zap.String("store", "new")), tenant.NewBucketMetrics(m.reg, ts, tenant.WithSuffix("new")))
passwdsSvc = tenant.NewPasswordLogger(m.log.With(zap.String("store", "new")), tenant.NewPasswordMetrics(m.reg, ts, tenant.WithSuffix("new")))
userSvc = tenant.NewUserLogger(m.log.With(zap.String("store", "new")), tenant.NewUserMetrics(m.reg, ts, metric.WithSuffix("new")))
orgSvc = tenant.NewOrgLogger(m.log.With(zap.String("store", "new")), tenant.NewOrgMetrics(m.reg, ts, metric.WithSuffix("new")))
userResourceSvc = tenant.NewURMLogger(m.log.With(zap.String("store", "new")), tenant.NewUrmMetrics(m.reg, ts, metric.WithSuffix("new")))
bucketSvc = tenant.NewBucketLogger(m.log.With(zap.String("store", "new")), tenant.NewBucketMetrics(m.reg, ts, metric.WithSuffix("new")))
passwdsSvc = tenant.NewPasswordLogger(m.log.With(zap.String("store", "new")), tenant.NewPasswordMetrics(m.reg, ts, metric.WithSuffix("new")))
}
switch m.secretStore {
@ -969,7 +970,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
{
onboardSvc := tenant.NewOnboardService(store, authSvc) // basic service
onboardSvc = tenant.NewAuthedOnboardSvc(onboardSvc) // with auth
onboardSvc = tenant.NewOnboardingMetrics(m.reg, onboardSvc, tenant.WithSuffix("new")) // with metrics
onboardSvc = tenant.NewOnboardingMetrics(m.reg, onboardSvc, metric.WithSuffix("new")) // with metrics
onboardSvc = tenant.NewOnboardingLogger(m.log.With(zap.String("handler", "onboard")), onboardSvc) // with logging
onboardHTTPServer = tenant.NewHTTPOnboardHandler(m.log, onboardSvc)

9
id.go
View File

@ -25,6 +25,15 @@ var (
}
)
// ErrCorruptID means the ID stored in the Store is corrupt.
func ErrCorruptID(err error) *Error {
return &Error{
Code: EInvalid,
Msg: "corrupt ID provided",
Err: err,
}
}
// ID is a unique identifier.
//
// Its zero value is not a valid ID.

View File

@ -1,4 +1,4 @@
package tenant
package metric
import "fmt"
@ -10,7 +10,7 @@ func defaultOpts() *metricOpts {
return &metricOpts{}
}
func (o *metricOpts) applySuffix(prefix string) string {
func (o *metricOpts) ApplySuffix(prefix string) string {
if o.serviceSuffix != "" {
return fmt.Sprintf("%s_%s", prefix, o.serviceSuffix)
}
@ -27,7 +27,7 @@ func WithSuffix(suffix string) MetricsOption {
}
}
func applyOpts(opts ...MetricsOption) *metricOpts {
func ApplyMetricOpts(opts ...MetricsOption) *metricOpts {
o := defaultOpts()
for _, opt := range opts {
opt(o)

View File

@ -1,11 +1,13 @@
package http
import (
"context"
"net/http"
"path"
"strings"
"time"
"github.com/go-chi/chi"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
ua "github.com/mileusna/useragent"
@ -130,3 +132,43 @@ func shiftPath(p string) (head, tail string) {
}
return p[1:i], p[i:]
}
type OrgContext string
const CtxOrgKey OrgContext = "orgID"
// ValidResource make sure a resource exists when a sub system needs to be mounted to an api
func ValidResource(api *API, lookupOrgByResourceID func(context.Context, influxdb.ID) (influxdb.ID, error)) Middleware {
return func(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
statusW := NewStatusResponseWriter(w)
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
if err != nil {
api.Err(w, r, influxdb.ErrCorruptID(err))
return
}
ctx := r.Context()
orgID, err := lookupOrgByResourceID(ctx, *id)
if err != nil {
api.Err(w, r, err)
return
}
// embed OrgID into context
next.ServeHTTP(statusW, r.WithContext(context.WithValue(ctx, CtxOrgKey, orgID)))
}
return http.HandlerFunc(fn)
}
}
// OrgIDFromContext ....
func OrgIDFromContext(ctx context.Context) *influxdb.ID {
v := ctx.Value(CtxOrgKey)
if v == nil {
return nil
}
id := v.(influxdb.ID)
return &id
}

View File

@ -311,6 +311,14 @@ func (s *Service) PutLabel(ctx context.Context, l *influxdb.Label) error {
})
}
// CreateUserResourceMappingForOrg is a public function that calls createUserResourceMappingForOrg used only for the label service
// it can be removed when URMs are removed from the label service
func (s *Service) CreateUserResourceMappingForOrg(ctx context.Context, tx Tx, orgID influxdb.ID, resID influxdb.ID, resType influxdb.ResourceType) error {
err := s.createUserResourceMappingForOrg(ctx, tx, orgID, resID, resType)
return err
}
func (s *Service) createUserResourceMappingForOrg(ctx context.Context, tx Tx, orgID influxdb.ID, resID influxdb.ID, resType influxdb.ResourceType) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

34
label/error.go Normal file
View File

@ -0,0 +1,34 @@
package label
import (
"github.com/influxdata/influxdb/v2"
)
var (
// NotUniqueIDError occurs when attempting to create a Label with an ID that already belongs to another one
NotUniqueIDError = &influxdb.Error{
Code: influxdb.EConflict,
Msg: "ID already exists",
}
// ErrFailureGeneratingID occurs ony when the random number generator
// cannot generate an ID in MaxIDGenerationN times.
ErrFailureGeneratingID = &influxdb.Error{
Code: influxdb.EInternal,
Msg: "unable to generate valid id",
}
// ErrLabelNotFound occurs when a label cannot be found by its ID
ErrLabelNotFound = &influxdb.Error{
Code: influxdb.ENotFound,
Msg: "label not found",
}
)
// ErrInternalServiceError is used when the error comes from an internal system.
func ErrInternalServiceError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Err: err,
}
}

135
label/http_client.go Normal file
View File

@ -0,0 +1,135 @@
package label
import (
"context"
"path"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/pkg/httpc"
)
var _ influxdb.LabelService = (*LabelClientService)(nil)
type LabelClientService struct {
Client *httpc.Client
}
func labelIDPath(id influxdb.ID) string {
return path.Join(prefixLabels, id.String())
}
func resourceIDPath(resourceType influxdb.ResourceType, resourceID influxdb.ID, p string) string {
return path.Join("/api/v2/", string(resourceType), resourceID.String(), p)
}
// CreateLabel creates a new label.
func (s *LabelClientService) CreateLabel(ctx context.Context, l *influxdb.Label) error {
var lr labelResponse
err := s.Client.
PostJSON(l, prefixLabels).
DecodeJSON(&lr).
Do(ctx)
if err != nil {
return err
}
*l = lr.Label
return nil
}
// FindLabelByID returns a single label by ID.
func (s *LabelClientService) FindLabelByID(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
var lr labelResponse
err := s.Client.
Get(labelIDPath(id)).
DecodeJSON(&lr).
Do(ctx)
if err != nil {
return nil, err
}
return &lr.Label, nil
}
// FindLabels is a client for the find labels response from the server.
func (s *LabelClientService) FindLabels(ctx context.Context, filter influxdb.LabelFilter, opt ...influxdb.FindOptions) ([]*influxdb.Label, error) {
params := influxdb.FindOptionParams(opt...)
if filter.OrgID != nil {
params = append(params, [2]string{"orgID", filter.OrgID.String()})
}
if filter.Name != "" {
params = append(params, [2]string{"name", filter.Name})
}
var lr labelsResponse
err := s.Client.
Get(prefixLabels).
QueryParams(params...).
DecodeJSON(&lr).
Do(ctx)
if err != nil {
return nil, err
}
return lr.Labels, nil
}
// FindResourceLabels returns a list of labels, derived from a label mapping filter.
func (s *LabelClientService) FindResourceLabels(ctx context.Context, filter influxdb.LabelMappingFilter) ([]*influxdb.Label, error) {
if err := filter.Valid(); err != nil {
return nil, err
}
var r labelsResponse
err := s.Client.
Get(resourceIDPath(filter.ResourceType, filter.ResourceID, "labels")).
DecodeJSON(&r).
Do(ctx)
if err != nil {
return nil, err
}
return r.Labels, nil
}
// UpdateLabel updates a label and returns the updated label.
func (s *LabelClientService) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) {
var lr labelResponse
err := s.Client.
PatchJSON(upd, labelIDPath(id)).
DecodeJSON(&lr).
Do(ctx)
if err != nil {
return nil, err
}
return &lr.Label, nil
}
// DeleteLabel removes a label by ID.
func (s *LabelClientService) DeleteLabel(ctx context.Context, id influxdb.ID) error {
return s.Client.
Delete(labelIDPath(id)).
Do(ctx)
}
// ******* Label Mappings ******* //
// CreateLabelMapping will create a labbel mapping
func (s *LabelClientService) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
if err := m.Validate(); err != nil {
return err
}
urlPath := resourceIDPath(m.ResourceType, m.ResourceID, "labels")
return s.Client.
PostJSON(m, urlPath).
DecodeJSON(m).
Do(ctx)
}
func (s *LabelClientService) DeleteLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
if err := m.Validate(); err != nil {
return err
}
return s.Client.
Delete(resourceIDPath(m.ResourceType, m.ResourceID, "labels")).
Do(ctx)
}

192
label/http_server.go Normal file
View File

@ -0,0 +1,192 @@
package label
import (
"encoding/json"
"fmt"
"net/http"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/influxdata/influxdb/v2"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"go.uber.org/zap"
)
type LabelHandler struct {
chi.Router
api *kithttp.API
log *zap.Logger
labelSvc influxdb.LabelService
}
const (
prefixLabels = "/api/v2/labels"
)
func (h *LabelHandler) Prefix() string {
return prefixLabels
}
func NewHTTPLabelHandler(log *zap.Logger, ls influxdb.LabelService) *LabelHandler {
h := &LabelHandler{
api: kithttp.NewAPI(kithttp.WithLog(log)),
log: log,
labelSvc: ls,
}
r := chi.NewRouter()
r.Use(
middleware.Recoverer,
middleware.RequestID,
middleware.RealIP,
)
r.Route("/", func(r chi.Router) {
r.Post("/", h.handlePostLabel)
r.Get("/", h.handleGetLabels)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.handleGetLabel)
r.Patch("/", h.handlePatchLabel)
r.Delete("/", h.handleDeleteLabel)
})
})
h.Router = r
return h
}
type labelResponse struct {
Links map[string]string `json:"links"`
Label influxdb.Label `json:"label"`
}
func newLabelResponse(l *influxdb.Label) *labelResponse {
return &labelResponse{
Links: map[string]string{
"self": fmt.Sprintf("/api/v2/labels/%s", l.ID),
},
Label: *l,
}
}
type labelsResponse struct {
Links map[string]string `json:"links"`
Labels []*influxdb.Label `json:"labels"`
}
func newLabelsResponse(ls []*influxdb.Label) *labelsResponse {
return &labelsResponse{
Links: map[string]string{
"self": fmt.Sprintf("/api/v2/labels"),
},
Labels: ls,
}
}
// handlePostLabel is the HTTP handler for the POST /api/v2/labels route.
func (h *LabelHandler) handlePostLabel(w http.ResponseWriter, r *http.Request) {
var label influxdb.Label
if err := h.api.DecodeJSON(r.Body, &label); err != nil {
h.api.Err(w, r, err)
return
}
if err := label.Validate(); err != nil {
h.api.Err(w, r, err)
return
}
if err := h.labelSvc.CreateLabel(r.Context(), &label); err != nil {
h.api.Err(w, r, err)
return
}
h.log.Debug("Label created", zap.String("label", fmt.Sprint(label)))
h.api.Respond(w, r, http.StatusCreated, newLabelResponse(&label))
}
// handleGetLabel is the HTTP handler for the GET /api/v2/labels/id route.
func (h *LabelHandler) handleGetLabel(w http.ResponseWriter, r *http.Request) {
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, err)
return
}
l, err := h.labelSvc.FindLabelByID(r.Context(), *id)
if err != nil {
h.api.Err(w, r, err)
return
}
h.log.Debug("Label retrieved", zap.String("label", fmt.Sprint(l)))
h.api.Respond(w, r, http.StatusOK, newLabelResponse(l))
}
// handleGetLabels is the HTTP handler for the GET /api/v2/labels route.
func (h *LabelHandler) handleGetLabels(w http.ResponseWriter, r *http.Request) {
var filter influxdb.LabelFilter
qp := r.URL.Query()
if name := qp.Get("name"); name != "" {
filter.Name = name
}
if orgID := qp.Get("orgID"); orgID != "" {
i, err := influxdb.IDFromString(orgID)
if err == nil {
filter.OrgID = i
}
}
labels, err := h.labelSvc.FindLabels(r.Context(), filter)
if err != nil {
h.api.Err(w, r, err)
return
}
h.log.Debug("Labels retrived", zap.String("labels", fmt.Sprint(labels)))
h.api.Respond(w, r, http.StatusOK, newLabelsResponse(labels))
}
// handlePatchLabel is the HTTP handler for the PATCH /api/v2/labels route.
func (h *LabelHandler) handlePatchLabel(w http.ResponseWriter, r *http.Request) {
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, err)
return
}
upd := &influxdb.LabelUpdate{}
if err := json.NewDecoder(r.Body).Decode(upd); err != nil {
h.api.Err(w, r, err)
return
}
l, err := h.labelSvc.UpdateLabel(r.Context(), *id, *upd)
if err != nil {
h.api.Err(w, r, err)
return
}
h.log.Debug("Label updated", zap.String("label", fmt.Sprint(l)))
h.api.Respond(w, r, http.StatusOK, newLabelResponse(l))
}
// handleDeleteLabel is the HTTP handler for the DELETE /api/v2/labels/:id route.
func (h *LabelHandler) handleDeleteLabel(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, err)
return
}
if err := h.labelSvc.DeleteLabel(ctx, *id); err != nil {
h.api.Err(w, r, err)
return
}
h.log.Debug("Label deleted", zap.String("labelID", fmt.Sprint(id)))
h.api.Respond(w, r, http.StatusNoContent, nil)
}

621
label/http_server_test.go Normal file
View File

@ -0,0 +1,621 @@
package label
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"github.com/go-chi/chi"
"github.com/google/go-cmp/cmp"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/mock"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"github.com/yudai/gojsondiff"
"github.com/yudai/gojsondiff/formatter"
"go.uber.org/zap/zaptest"
)
func TestService_handlePostLabel(t *testing.T) {
type fields struct {
LabelService influxdb.LabelService
}
type args struct {
label *influxdb.Label
}
type wants struct {
statusCode int
contentType string
body string
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "create a new label",
fields: fields{
&mock.LabelService{
CreateLabelFn: func(ctx context.Context, l *influxdb.Label) error {
l.ID = influxdbtesting.MustIDBase16("020f755c3c082000")
return nil
},
},
},
args: args{
label: &influxdb.Label{
Name: "mylabel",
OrgID: influxdbtesting.MustIDBase16("020f755c3c082008"),
},
},
wants: wants{
statusCode: http.StatusCreated,
contentType: "application/json; charset=utf-8",
body: `
{
"links": {
"self": "/api/v2/labels/020f755c3c082000"
},
"label": {
"id": "020f755c3c082000",
"name": "mylabel",
"orgID": "020f755c3c082008"
}
}
`,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := NewHTTPLabelHandler(zaptest.NewLogger(t), tt.fields.LabelService)
router := chi.NewRouter()
router.Mount(handler.Prefix(), handler)
l, err := json.Marshal(tt.args.label)
if err != nil {
t.Fatalf("failed to marshal label: %v", err)
}
r := httptest.NewRequest("GET", "http://any.url", bytes.NewReader(l))
w := httptest.NewRecorder()
handler.handlePostLabel(w, r)
res := w.Result()
content := res.Header.Get("Content-Type")
body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != tt.wants.statusCode {
t.Errorf("%q. handlePostLabel() = %v, want %v", tt.name, res.StatusCode, tt.wants.statusCode)
}
if tt.wants.contentType != "" && content != tt.wants.contentType {
t.Errorf("%q. handlePostLabel() = %v, want %v", tt.name, content, tt.wants.contentType)
}
if eq, diff, err := jsonEqual(string(body), tt.wants.body); err != nil || tt.wants.body != "" && !eq {
t.Errorf("%q. handlePostLabel() = ***%v***", tt.name, diff)
}
})
}
}
func TestService_handleGetLabel(t *testing.T) {
type fields struct {
LabelService influxdb.LabelService
}
type args struct {
id string
}
type wants struct {
statusCode int
contentType string
body string
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "get a label by id",
fields: fields{
&mock.LabelService{
FindLabelByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
if id == influxdbtesting.MustIDBase16("020f755c3c082000") {
return &influxdb.Label{
ID: influxdbtesting.MustIDBase16("020f755c3c082000"),
Name: "mylabel",
Properties: map[string]string{
"color": "fff000",
},
}, nil
}
return nil, fmt.Errorf("not found")
},
},
},
args: args{
id: "020f755c3c082000",
},
wants: wants{
statusCode: http.StatusOK,
contentType: "application/json; charset=utf-8",
body: `
{
"links": {
"self": "/api/v2/labels/020f755c3c082000"
},
"label": {
"id": "020f755c3c082000",
"name": "mylabel",
"properties": {
"color": "fff000"
}
}
}
`,
},
},
{
name: "not found",
fields: fields{
&mock.LabelService{
FindLabelByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: influxdb.ErrLabelNotFound,
}
},
},
},
args: args{
id: "020f755c3c082000",
},
wants: wants{
statusCode: http.StatusNotFound,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := NewHTTPLabelHandler(zaptest.NewLogger(t), tt.fields.LabelService)
router := chi.NewRouter()
router.Mount(handler.Prefix(), handler)
r := httptest.NewRequest("GET", "http://any.url", nil)
rctx := chi.NewRouteContext()
rctx.URLParams.Add("id", tt.args.id)
r = r.WithContext(context.WithValue(r.Context(), chi.RouteCtxKey, rctx))
w := httptest.NewRecorder()
handler.handleGetLabel(w, r)
res := w.Result()
content := res.Header.Get("Content-Type")
body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != tt.wants.statusCode {
t.Errorf("%q. handleGetLabel() = %v, want %v", tt.name, res.StatusCode, tt.wants.statusCode)
}
if tt.wants.contentType != "" && content != tt.wants.contentType {
t.Errorf("%q. handleGetLabel() = %v, want %v", tt.name, content, tt.wants.contentType)
}
if tt.wants.body != "" {
if eq, diff, err := jsonEqual(string(body), tt.wants.body); err != nil {
t.Errorf("%q, handleGetLabel(). error unmarshaling json %v", tt.name, err)
} else if !eq {
t.Errorf("%q. handleGetLabel() = ***%s***", tt.name, diff)
}
}
})
}
}
func TestService_handleGetLabels(t *testing.T) {
type fields struct {
LabelService influxdb.LabelService
}
type wants struct {
statusCode int
contentType string
body string
}
tests := []struct {
name string
fields fields
wants wants
}{
{
name: "get all labels",
fields: fields{
&mock.LabelService{
FindLabelsFn: func(ctx context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) {
return []*influxdb.Label{
{
ID: influxdbtesting.MustIDBase16("0b501e7e557ab1ed"),
Name: "hello",
Properties: map[string]string{
"color": "fff000",
},
},
{
ID: influxdbtesting.MustIDBase16("c0175f0077a77005"),
Name: "example",
Properties: map[string]string{
"color": "fff000",
},
},
}, nil
},
},
},
wants: wants{
statusCode: http.StatusOK,
contentType: "application/json; charset=utf-8",
body: `
{
"links": {
"self": "/api/v2/labels"
},
"labels": [
{
"id": "0b501e7e557ab1ed",
"name": "hello",
"properties": {
"color": "fff000"
}
},
{
"id": "c0175f0077a77005",
"name": "example",
"properties": {
"color": "fff000"
}
}
]
}
`,
},
},
{
name: "get all labels when there are none",
fields: fields{
&mock.LabelService{
FindLabelsFn: func(ctx context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) {
return []*influxdb.Label{}, nil
},
},
},
wants: wants{
statusCode: http.StatusOK,
contentType: "application/json; charset=utf-8",
body: `
{
"links": {
"self": "/api/v2/labels"
},
"labels": []
}`,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := NewHTTPLabelHandler(zaptest.NewLogger(t), tt.fields.LabelService)
router := chi.NewRouter()
router.Mount(handler.Prefix(), handler)
r := httptest.NewRequest("GET", "http://any.url", nil)
w := httptest.NewRecorder()
handler.handleGetLabels(w, r)
res := w.Result()
content := res.Header.Get("Content-Type")
body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != tt.wants.statusCode {
t.Errorf("%q. handleGetLabels() = %v, want %v", tt.name, res.StatusCode, tt.wants.statusCode)
}
if tt.wants.contentType != "" && content != tt.wants.contentType {
t.Errorf("%q. handleGetLabels() = %v, want %v", tt.name, content, tt.wants.contentType)
}
if eq, diff, err := jsonEqual(string(body), tt.wants.body); err != nil || tt.wants.body != "" && !eq {
t.Errorf("%q. handleGetLabels() = ***%v***", tt.name, diff)
}
})
}
}
func TestService_handlePatchLabel(t *testing.T) {
type fields struct {
LabelService influxdb.LabelService
}
type args struct {
id string
properties map[string]string
}
type wants struct {
statusCode int
contentType string
body string
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "update label properties",
fields: fields{
&mock.LabelService{
UpdateLabelFn: func(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) {
if id == influxdbtesting.MustIDBase16("020f755c3c082000") {
l := &influxdb.Label{
ID: influxdbtesting.MustIDBase16("020f755c3c082000"),
Name: "mylabel",
Properties: map[string]string{
"color": "fff000",
},
}
for k, v := range upd.Properties {
if v == "" {
delete(l.Properties, k)
} else {
l.Properties[k] = v
}
}
return l, nil
}
return nil, fmt.Errorf("not found")
},
},
},
args: args{
id: "020f755c3c082000",
properties: map[string]string{
"color": "aaabbb",
},
},
wants: wants{
statusCode: http.StatusOK,
contentType: "application/json; charset=utf-8",
body: `
{
"links": {
"self": "/api/v2/labels/020f755c3c082000"
},
"label": {
"id": "020f755c3c082000",
"name": "mylabel",
"properties": {
"color": "aaabbb"
}
}
}
`,
},
},
{
name: "label not found",
fields: fields{
&mock.LabelService{
UpdateLabelFn: func(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: influxdb.ErrLabelNotFound,
}
},
},
},
args: args{
id: "020f755c3c082000",
properties: map[string]string{
"color": "aaabbb",
},
},
wants: wants{
statusCode: http.StatusNotFound,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := NewHTTPLabelHandler(zaptest.NewLogger(t), tt.fields.LabelService)
router := chi.NewRouter()
router.Mount(handler.Prefix(), handler)
w := httptest.NewRecorder()
upd := influxdb.LabelUpdate{}
if len(tt.args.properties) > 0 {
upd.Properties = tt.args.properties
}
l, err := json.Marshal(upd)
if err != nil {
t.Fatalf("failed to marshal label update: %v", err)
}
r := httptest.NewRequest("GET", "http://any.url", bytes.NewReader(l))
rctx := chi.NewRouteContext()
rctx.URLParams.Add("id", tt.args.id)
r = r.WithContext(context.WithValue(r.Context(), chi.RouteCtxKey, rctx))
handler.handlePatchLabel(w, r)
res := w.Result()
content := res.Header.Get("Content-Type")
body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != tt.wants.statusCode {
t.Errorf("%q. handlePatchLabel() = %v, want %v", tt.name, res.StatusCode, tt.wants.statusCode)
}
if tt.wants.contentType != "" && content != tt.wants.contentType {
t.Errorf("%q. handlePatchLabel() = %v, want %v", tt.name, content, tt.wants.contentType)
}
if tt.wants.body != "" {
if eq, diff, err := jsonEqual(string(body), tt.wants.body); err != nil {
t.Errorf("%q, handlePatchLabel(). error unmarshaling json %v", tt.name, err)
} else if !eq {
t.Errorf("%q. handlePatchLabel() = ***%s***", tt.name, diff)
}
}
})
}
}
func TestService_handleDeleteLabel(t *testing.T) {
type fields struct {
LabelService influxdb.LabelService
}
type args struct {
id string
}
type wants struct {
statusCode int
contentType string
body string
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "remove a label by id",
fields: fields{
&mock.LabelService{
DeleteLabelFn: func(ctx context.Context, id influxdb.ID) error {
if id == influxdbtesting.MustIDBase16("020f755c3c082000") {
return nil
}
return fmt.Errorf("wrong id")
},
},
},
args: args{
id: "020f755c3c082000",
},
wants: wants{
statusCode: http.StatusNoContent,
},
},
{
name: "label not found",
fields: fields{
&mock.LabelService{
DeleteLabelFn: func(ctx context.Context, id influxdb.ID) error {
return &influxdb.Error{
Code: influxdb.ENotFound,
Msg: influxdb.ErrLabelNotFound,
}
},
},
},
args: args{
id: "020f755c3c082000",
},
wants: wants{
statusCode: http.StatusNotFound,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := NewHTTPLabelHandler(zaptest.NewLogger(t), tt.fields.LabelService)
router := chi.NewRouter()
router.Mount(handler.Prefix(), handler)
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "http://any.url", nil)
rctx := chi.NewRouteContext()
rctx.URLParams.Add("id", tt.args.id)
r = r.WithContext(context.WithValue(r.Context(), chi.RouteCtxKey, rctx))
handler.handleDeleteLabel(w, r)
res := w.Result()
content := res.Header.Get("Content-Type")
body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != tt.wants.statusCode {
t.Errorf("%q. handleDeleteLabel() = %v, want %v", tt.name, res.StatusCode, tt.wants.statusCode)
}
if tt.wants.contentType != "" && content != tt.wants.contentType {
t.Errorf("%q. handleDeleteLabel() = %v, want %v", tt.name, content, tt.wants.contentType)
}
if tt.wants.body != "" {
if eq, diff, err := jsonEqual(string(body), tt.wants.body); err != nil {
t.Errorf("%q, handleDeleteLabel(). error unmarshaling json %v", tt.name, err)
} else if !eq {
t.Errorf("%q. handleDeleteLabel() = ***%s***", tt.name, diff)
}
}
})
}
}
func jsonEqual(s1, s2 string) (eq bool, diff string, err error) {
if s1 == s2 {
return true, "", nil
}
if s1 == "" {
return false, s2, fmt.Errorf("s1 is empty")
}
if s2 == "" {
return false, s1, fmt.Errorf("s2 is empty")
}
var o1 interface{}
if err = json.Unmarshal([]byte(s1), &o1); err != nil {
return
}
var o2 interface{}
if err = json.Unmarshal([]byte(s2), &o2); err != nil {
return
}
differ := gojsondiff.New()
d, err := differ.Compare([]byte(s1), []byte(s2))
if err != nil {
return
}
config := formatter.AsciiFormatterConfig{}
formatter := formatter.NewAsciiFormatter(o1, config)
diff, err = formatter.Format(d)
return cmp.Equal(o1, o2), diff, err
}

133
label/middleware_auth.go Normal file
View File

@ -0,0 +1,133 @@
package label
import (
"context"
"errors"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorizer"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
)
var _ influxdb.LabelService = (*AuthedLabelService)(nil)
type AuthedLabelService struct {
s influxdb.LabelService
}
// NewAuthedLabelService constructs an instance of an authorizing label serivce.
func NewAuthedLabelService(s influxdb.LabelService) *AuthedLabelService {
return &AuthedLabelService{
s: s,
}
}
func (s *AuthedLabelService) CreateLabel(ctx context.Context, l *influxdb.Label) error {
if _, _, err := authorizer.AuthorizeCreate(ctx, influxdb.LabelsResourceType, l.OrgID); err != nil {
return err
}
return s.s.CreateLabel(ctx, l)
}
func (s *AuthedLabelService) FindLabels(ctx context.Context, filter influxdb.LabelFilter, opt ...influxdb.FindOptions) ([]*influxdb.Label, error) {
// TODO: we'll likely want to push this operation into the database eventually since fetching the whole list of data
// will likely be expensive.
ls, err := s.s.FindLabels(ctx, filter, opt...)
if err != nil {
return nil, err
}
ls, _, err = authorizer.AuthorizeFindLabels(ctx, ls)
return ls, err
}
// FindLabelByID checks to see if the authorizer on context has read access to the label id provided.
func (s *AuthedLabelService) FindLabelByID(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
l, err := s.s.FindLabelByID(ctx, id)
if err != nil {
return nil, err
}
if _, _, err := authorizer.AuthorizeRead(ctx, influxdb.LabelsResourceType, id, l.OrgID); err != nil {
return nil, err
}
return l, nil
}
// FindResourceLabels retrieves all labels belonging to the filtering resource if the authorizer on context has read access to it.
// Then it filters the list down to only the labels that are authorized.
func (s *AuthedLabelService) FindResourceLabels(ctx context.Context, filter influxdb.LabelMappingFilter) ([]*influxdb.Label, error) {
if err := filter.ResourceType.Valid(); err != nil {
return nil, err
}
// first fetch all labels for this resource
ls, err := s.s.FindResourceLabels(ctx, filter)
if err != nil {
return nil, err
}
// check the permissions for the resource by the org on the context
orgID := kithttp.OrgIDFromContext(ctx)
if orgID == nil {
return nil, errors.New("failed to find orgID on context")
}
if _, _, err := authorizer.AuthorizeRead(ctx, filter.ResourceType, filter.ResourceID, *orgID); err != nil {
return nil, err
}
// then filter the labels we got to return only the ones the user is authorized to read
ls, _, err = authorizer.AuthorizeFindLabels(ctx, ls)
return ls, err
}
// UpdateLabel checks to see if the authorizer on context has write access to the label provided.
func (s *AuthedLabelService) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) {
l, err := s.s.FindLabelByID(ctx, id)
if err != nil {
return nil, err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, influxdb.LabelsResourceType, l.ID, l.OrgID); err != nil {
return nil, err
}
return s.s.UpdateLabel(ctx, id, upd)
}
// DeleteLabel checks to see if the authorizer on context has write access to the label provided.
func (s *AuthedLabelService) DeleteLabel(ctx context.Context, id influxdb.ID) error {
l, err := s.s.FindLabelByID(ctx, id)
if err != nil {
return err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, influxdb.LabelsResourceType, l.ID, l.OrgID); err != nil {
return err
}
return s.s.DeleteLabel(ctx, id)
}
// CreateLabelMapping checks to see if the authorizer on context has write access to the label and the resource contained by the label mapping in creation.
func (s *AuthedLabelService) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
l, err := s.s.FindLabelByID(ctx, m.LabelID)
if err != nil {
return err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, influxdb.LabelsResourceType, m.LabelID, l.OrgID); err != nil {
return err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, m.ResourceType, m.ResourceID, l.OrgID); err != nil {
return err
}
return s.s.CreateLabelMapping(ctx, m)
}
// DeleteLabelMapping checks to see if the authorizer on context has write access to the label and the resource of the label mapping to delete.
func (s *AuthedLabelService) DeleteLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
l, err := s.s.FindLabelByID(ctx, m.LabelID)
if err != nil {
return err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, influxdb.LabelsResourceType, m.LabelID, l.OrgID); err != nil {
return err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, m.ResourceType, m.ResourceID, l.OrgID); err != nil {
return err
}
return s.s.DeleteLabelMapping(ctx, m)
}

File diff suppressed because it is too large Load Diff

127
label/middleware_logging.go Normal file
View File

@ -0,0 +1,127 @@
package label
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb/v2"
"go.uber.org/zap"
)
type LabelLogger struct {
logger *zap.Logger
labelService influxdb.LabelService
}
func NewLabelLogger(log *zap.Logger, s influxdb.LabelService) *LabelLogger {
return &LabelLogger{
logger: log,
labelService: s,
}
}
var _ influxdb.LabelService = (*LabelLogger)(nil)
func (l *LabelLogger) CreateLabel(ctx context.Context, label *influxdb.Label) (err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
l.logger.Debug("failed to create label", zap.Error(err), dur)
return
}
l.logger.Debug("label create", dur)
}(time.Now())
return l.labelService.CreateLabel(ctx, label)
}
func (l *LabelLogger) FindLabelByID(ctx context.Context, id influxdb.ID) (label *influxdb.Label, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
msg := fmt.Sprintf("failed to find label with ID %v", id)
l.logger.Debug(msg, zap.Error(err), dur)
return
}
l.logger.Debug("label find by ID", dur)
}(time.Now())
return l.labelService.FindLabelByID(ctx, id)
}
func (l *LabelLogger) FindLabels(ctx context.Context, filter influxdb.LabelFilter, opt ...influxdb.FindOptions) (ls []*influxdb.Label, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
l.logger.Debug("failed to find labels matching the given filter", zap.Error(err), dur)
return
}
l.logger.Debug("labels find", dur)
}(time.Now())
return l.labelService.FindLabels(ctx, filter, opt...)
}
func (l *LabelLogger) FindResourceLabels(ctx context.Context, filter influxdb.LabelMappingFilter) (ls []*influxdb.Label, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
l.logger.Debug("failed to find resource labels matching the given filter", zap.Error(err), dur)
return
}
l.logger.Debug("labels for resource find", dur)
}(time.Now())
return l.labelService.FindResourceLabels(ctx, filter)
}
func (l *LabelLogger) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (lbl *influxdb.Label, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
l.logger.Debug("failed to update label", zap.Error(err), dur)
return
}
l.logger.Debug("label update", dur)
}(time.Now())
return l.labelService.UpdateLabel(ctx, id, upd)
}
func (l *LabelLogger) DeleteLabel(ctx context.Context, id influxdb.ID) (err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
l.logger.Debug("failed to delete label", zap.Error(err), dur)
return
}
l.logger.Debug("label delete", dur)
}(time.Now())
return l.labelService.DeleteLabel(ctx, id)
}
func (l *LabelLogger) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) (err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
l.logger.Debug("failed to create label mapping", zap.Error(err), dur)
return
}
l.logger.Debug("label mapping create", dur)
}(time.Now())
return l.labelService.CreateLabelMapping(ctx, m)
}
func (l *LabelLogger) DeleteLabelMapping(ctx context.Context, m *influxdb.LabelMapping) (err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
l.logger.Debug("failed to delete label mapping", zap.Error(err), dur)
return
}
l.logger.Debug("label mapping delete", dur)
}(time.Now())
return l.labelService.DeleteLabelMapping(ctx, m)
}

View File

@ -0,0 +1,19 @@
package label_test
import (
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/label"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func TestLabelLoggingService(t *testing.T) {
influxdbtesting.LabelService(initBoltLabelLoggingService, t)
}
func initBoltLabelLoggingService(f influxdbtesting.LabelFields, t *testing.T) (influxdb.LabelService, string, func()) {
svc, s, closer := initBoltLabelService(f, t)
return label.NewLabelLogger(zaptest.NewLogger(t), svc), s, closer
}

View File

@ -0,0 +1,74 @@
package label
import (
"context"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/metric"
"github.com/prometheus/client_golang/prometheus"
)
type LabelMetrics struct {
// RED metrics
rec *metric.REDClient
labelService influxdb.LabelService
}
func NewLabelMetrics(reg prometheus.Registerer, s influxdb.LabelService, opts ...metric.MetricsOption) *LabelMetrics {
o := metric.ApplyMetricOpts(opts...)
return &LabelMetrics{
rec: metric.New(reg, o.ApplySuffix("org")),
labelService: s,
}
}
var _ influxdb.LabelService = (*LabelMetrics)(nil)
func (m *LabelMetrics) CreateLabel(ctx context.Context, l *influxdb.Label) (err error) {
rec := m.rec.Record("create_label")
err = m.labelService.CreateLabel(ctx, l)
return rec(err)
}
func (m *LabelMetrics) FindLabelByID(ctx context.Context, id influxdb.ID) (label *influxdb.Label, err error) {
rec := m.rec.Record("find_label_by_id")
l, err := m.labelService.FindLabelByID(ctx, id)
return l, rec(err)
}
func (m *LabelMetrics) FindLabels(ctx context.Context, filter influxdb.LabelFilter, opt ...influxdb.FindOptions) (ls []*influxdb.Label, err error) {
rec := m.rec.Record("find_labels")
l, err := m.labelService.FindLabels(ctx, filter, opt...)
return l, rec(err)
}
func (m *LabelMetrics) FindResourceLabels(ctx context.Context, filter influxdb.LabelMappingFilter) (ls []*influxdb.Label, err error) {
rec := m.rec.Record("find_labels_for_resource")
l, err := m.labelService.FindResourceLabels(ctx, filter)
return l, rec(err)
}
func (m *LabelMetrics) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (lbl *influxdb.Label, err error) {
rec := m.rec.Record("update_label")
l, err := m.labelService.UpdateLabel(ctx, id, upd)
return l, rec(err)
}
func (m *LabelMetrics) DeleteLabel(ctx context.Context, id influxdb.ID) (err error) {
rec := m.rec.Record("delete_label")
err = m.labelService.DeleteLabel(ctx, id)
return rec(err)
}
func (m *LabelMetrics) CreateLabelMapping(ctx context.Context, lm *influxdb.LabelMapping) (err error) {
rec := m.rec.Record("create_label_mapping")
err = m.labelService.CreateLabelMapping(ctx, lm)
return rec(err)
}
func (m *LabelMetrics) DeleteLabelMapping(ctx context.Context, lm *influxdb.LabelMapping) (err error) {
rec := m.rec.Record("delete_label_mapping")
err = m.labelService.DeleteLabelMapping(ctx, lm)
return rec(err)
}

View File

@ -0,0 +1,22 @@
package label_test
import (
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/label"
"go.uber.org/zap"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
)
func TestLabelMetricsService(t *testing.T) {
influxdbtesting.LabelService(initBoltLabelMetricsService, t)
}
func initBoltLabelMetricsService(f influxdbtesting.LabelFields, t *testing.T) (influxdb.LabelService, string, func()) {
svc, s, closer := initBoltLabelService(f, t)
reg := prom.NewRegistry(zap.NewNop())
return label.NewLabelMetrics(reg, svc), s, closer
}

221
label/service.go Normal file
View File

@ -0,0 +1,221 @@
package label
import (
"context"
"strings"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
)
type Service struct {
store *Store
urmCreator UserResourceMappingCreator
}
type UserResourceMappingCreator interface {
CreateUserResourceMappingForOrg(ctx context.Context, tx kv.Tx, orgID influxdb.ID, resID influxdb.ID, resType influxdb.ResourceType) error
}
func NewService(st *Store, urmCreator UserResourceMappingCreator) influxdb.LabelService {
return &Service{
store: st,
urmCreator: urmCreator, // todo (al) this can be removed once URMs are removed from the Label service
}
}
// CreateLabel creates a new label.
func (s *Service) CreateLabel(ctx context.Context, l *influxdb.Label) error {
if err := l.Validate(); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
l.Name = strings.TrimSpace(l.Name)
err := s.store.Update(ctx, func(tx kv.Tx) error {
if err := uniqueLabelName(ctx, tx, l); err != nil {
return err
}
if err := s.store.CreateLabel(ctx, tx, l); err != nil {
return err
}
if err := s.urmCreator.CreateUserResourceMappingForOrg(ctx, tx, l.OrgID, l.ID, influxdb.LabelsResourceType); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return err
}
// FindLabelByID finds a label by its ID
func (s *Service) FindLabelByID(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
var l *influxdb.Label
err := s.store.View(ctx, func(tx kv.Tx) error {
label, e := s.store.GetLabel(ctx, tx, id)
if e != nil {
return e
}
l = label
return nil
})
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return l, nil
}
// FindLabels returns a list of labels that match a filter.
func (s *Service) FindLabels(ctx context.Context, filter influxdb.LabelFilter, opt ...influxdb.FindOptions) ([]*influxdb.Label, error) {
ls := []*influxdb.Label{}
err := s.store.View(ctx, func(tx kv.Tx) error {
labels, err := s.store.ListLabels(ctx, tx, filter)
if err != nil {
return err
}
ls = labels
return nil
})
if err != nil {
return nil, err
}
return ls, nil
}
func (s *Service) FindResourceLabels(ctx context.Context, filter influxdb.LabelMappingFilter) ([]*influxdb.Label, error) {
ls := []*influxdb.Label{}
if err := s.store.View(ctx, func(tx kv.Tx) error {
return s.store.FindResourceLabels(ctx, tx, filter, &ls)
}); err != nil {
return nil, err
}
return ls, nil
}
// UpdateLabel updates a label.
func (s *Service) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) {
var label *influxdb.Label
err := s.store.Update(ctx, func(tx kv.Tx) error {
l, e := s.store.UpdateLabel(ctx, tx, id, upd)
if e != nil {
return &influxdb.Error{
Err: e,
}
}
label = l
return nil
})
return label, err
}
// DeleteLabel deletes a label.
func (s *Service) DeleteLabel(ctx context.Context, id influxdb.ID) error {
err := s.store.Update(ctx, func(tx kv.Tx) error {
return s.store.DeleteLabel(ctx, tx, id)
})
if err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
//******* Label Mappings *******//
// CreateLabelMapping creates a new mapping between a resource and a label.
func (s *Service) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
err := s.store.View(ctx, func(tx kv.Tx) error {
if _, err := s.store.GetLabel(ctx, tx, m.LabelID); err != nil {
return err
}
ls := []*influxdb.Label{}
err := s.store.FindResourceLabels(ctx, tx, influxdb.LabelMappingFilter{ResourceID: m.ResourceID, ResourceType: m.ResourceType}, &ls)
if err != nil {
return err
}
for i := 0; i < len(ls); i++ {
if ls[i].ID == m.LabelID {
return influxdb.ErrLabelExistsOnResource
}
}
return nil
})
if err != nil {
return err
}
return s.store.Update(ctx, func(tx kv.Tx) error {
return s.store.CreateLabelMapping(ctx, tx, m)
})
}
// DeleteLabelMapping deletes a label mapping.
func (s *Service) DeleteLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
err := s.store.Update(ctx, func(tx kv.Tx) error {
return s.store.DeleteLabelMapping(ctx, tx, m)
})
if err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
//******* helper functions *******//
func unique(ctx context.Context, tx kv.Tx, indexBucket, indexKey []byte) error {
bucket, err := tx.Bucket(indexBucket)
if err != nil {
return kv.UnexpectedIndexError(err)
}
_, err = bucket.Get(indexKey)
// if not found then this is _unique_.
if kv.IsNotFound(err) {
return nil
}
// no error means this is not unique
if err == nil {
return kv.NotUniqueError
}
// any other error is some sort of internal server error
return kv.UnexpectedIndexError(err)
}
func uniqueLabelName(ctx context.Context, tx kv.Tx, lbl *influxdb.Label) error {
key, err := labelIndexKey(lbl)
if err != nil {
return err
}
// labels are unique by `organization:label_name`
err = unique(ctx, tx, labelIndex, key)
if err == kv.NotUniqueError {
return labelAlreadyExistsError(lbl)
}
return err
}

86
label/service_test.go Normal file
View File

@ -0,0 +1,86 @@
package label_test
import (
"context"
"errors"
"io/ioutil"
"os"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/label"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func TestBoltLabelService(t *testing.T) {
influxdbtesting.LabelService(initBoltLabelService, t)
}
func NewTestBoltStore(t *testing.T) (kv.Store, func(), error) {
f, err := ioutil.TempFile("", "influxdata-bolt-")
if err != nil {
return nil, nil, errors.New("unable to open temporary boltdb file")
}
f.Close()
path := f.Name()
s := bolt.NewKVStore(zaptest.NewLogger(t), path)
if err := s.Open(context.Background()); err != nil {
return nil, nil, err
}
close := func() {
s.Close()
os.Remove(path)
}
return s, close, nil
}
func initBoltLabelService(f influxdbtesting.LabelFields, t *testing.T) (influxdb.LabelService, string, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initLabelService(s, f, t)
return svc, op, func() {
closeSvc()
closeBolt()
}
}
func initLabelService(s kv.Store, f influxdbtesting.LabelFields, t *testing.T) (influxdb.LabelService, string, func()) {
st, err := label.NewStore(s)
if err != nil {
t.Fatalf("failed to create label store: %v", err)
}
kvSvc := kv.NewService(zaptest.NewLogger(t), s)
svc := label.NewService(st, kvSvc)
ctx := context.Background()
for _, l := range f.Labels {
if err := svc.CreateLabel(ctx, l); err != nil {
t.Fatalf("failed to populate labels: %v", err)
}
}
for _, m := range f.Mappings {
if err := svc.CreateLabelMapping(ctx, m); err != nil {
t.Fatalf("failed to populate label mappings: %v", err)
}
}
return svc, kv.OpPrefix, func() {
for _, l := range f.Labels {
if err := svc.DeleteLabel(ctx, l.ID); err != nil {
t.Logf("failed to remove label: %v", err)
}
}
}
}

112
label/storage.go Normal file
View File

@ -0,0 +1,112 @@
package label
import (
"context"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/snowflake"
)
const MaxIDGenerationN = 100
const ReservedIDs = 1000
var (
labelBucket = []byte("labelsv1")
labelMappingBucket = []byte("labelmappingsv1")
labelIndex = []byte("labelindexv1")
)
type Store struct {
kvStore kv.Store
IDGenerator influxdb.IDGenerator
}
func NewStore(kvStore kv.Store) (*Store, error) {
st := &Store{
kvStore: kvStore,
IDGenerator: snowflake.NewDefaultIDGenerator(),
}
return st, st.setup()
}
// View opens up a transaction that will not write to any data. Implementing interfaces
// should take care to ensure that all view transactions do not mutate any data.
func (s *Store) View(ctx context.Context, fn func(kv.Tx) error) error {
return s.kvStore.View(ctx, fn)
}
// Update opens up a transaction that will mutate data.
func (s *Store) Update(ctx context.Context, fn func(kv.Tx) error) error {
return s.kvStore.Update(ctx, fn)
}
func (s *Store) setup() error {
return s.Update(context.Background(), func(tx kv.Tx) error {
if _, err := tx.Bucket(labelBucket); err != nil {
return err
}
if _, err := tx.Bucket(labelMappingBucket); err != nil {
return err
}
if _, err := tx.Bucket(labelIndex); err != nil {
return err
}
return nil
})
}
// generateSafeID attempts to create ids for buckets
// and orgs that are without backslash, commas, and spaces, BUT ALSO do not already exist.
func (s *Store) generateSafeID(ctx context.Context, tx kv.Tx, bucket []byte) (influxdb.ID, error) {
for i := 0; i < MaxIDGenerationN; i++ {
id := s.IDGenerator.ID()
// TODO: this is probably unnecessary but for testing we need to keep it in.
// After KV is cleaned out we can update the tests and remove this.
if id < ReservedIDs {
continue
}
err := s.uniqueID(ctx, tx, bucket, id)
if err == nil {
return id, nil
}
if err == NotUniqueIDError {
continue
}
return influxdb.InvalidID(), err
}
return influxdb.InvalidID(), ErrFailureGeneratingID
}
func (s *Store) uniqueID(ctx context.Context, tx kv.Tx, bucket []byte, id influxdb.ID) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
encodedID, err := id.Encode()
if err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
b, err := tx.Bucket(bucket)
if err != nil {
return err
}
_, err = b.Get(encodedID)
if kv.IsNotFound(err) {
return nil
}
return NotUniqueIDError
}

501
label/storage_label.go Normal file
View File

@ -0,0 +1,501 @@
package label
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
)
func (s *Store) CreateLabel(ctx context.Context, tx kv.Tx, l *influxdb.Label) error {
// if the provided ID is invalid, or already maps to an existing Auth, then generate a new one
if !l.ID.Valid() {
id, err := s.generateSafeID(ctx, tx, labelBucket)
if err != nil {
return nil
}
l.ID = id
} else if err := uniqueID(ctx, tx, l.ID); err != nil {
id, err := s.generateSafeID(ctx, tx, labelBucket)
if err != nil {
return nil
}
l.ID = id
}
v, err := json.Marshal(l)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
encodedID, err := l.ID.Encode()
if err != nil {
return &influxdb.Error{
Err: err,
}
}
idx, err := tx.Bucket(labelIndex)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
key, err := labelIndexKey(l)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
if err := idx.Put([]byte(key), encodedID); err != nil {
return &influxdb.Error{
Err: err,
}
}
b, err := tx.Bucket(labelBucket)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
if err := b.Put(encodedID, v); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
func (s *Store) ListLabels(ctx context.Context, tx kv.Tx, filter influxdb.LabelFilter) ([]*influxdb.Label, error) {
ls := []*influxdb.Label{}
filterFn := filterLabelsFn(filter)
err := forEachLabel(ctx, tx, func(l *influxdb.Label) bool {
if filterFn(l) {
ls = append(ls, l)
}
return true
})
if err != nil {
return nil, err
}
return ls, nil
}
func (s *Store) GetLabel(ctx context.Context, tx kv.Tx, id influxdb.ID) (*influxdb.Label, error) {
encodedID, err := id.Encode()
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
b, err := tx.Bucket(labelBucket)
if err != nil {
return nil, err
}
v, err := b.Get(encodedID)
if kv.IsNotFound(err) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: influxdb.ErrLabelNotFound,
}
}
if err != nil {
return nil, err
}
var l influxdb.Label
if err := json.Unmarshal(v, &l); err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return &l, nil
}
func (s *Store) UpdateLabel(ctx context.Context, tx kv.Tx, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) {
label, err := s.GetLabel(ctx, tx, id)
if err != nil {
return nil, err
}
if len(upd.Properties) > 0 && label.Properties == nil {
label.Properties = make(map[string]string)
}
for k, v := range upd.Properties {
if v == "" {
delete(label.Properties, k)
} else {
label.Properties[k] = v
}
}
if upd.Name != "" {
upd.Name = strings.TrimSpace(upd.Name)
idx, err := tx.Bucket(labelIndex)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
key, err := labelIndexKey(label)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
if err := idx.Delete(key); err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
label.Name = upd.Name
if err := uniqueLabelName(ctx, tx, label); err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
}
if err := label.Validate(); err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
v, err := json.Marshal(label)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
encodedID, err := label.ID.Encode()
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
idx, err := tx.Bucket(labelIndex)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
key, err := labelIndexKey(label)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
if err := idx.Put([]byte(key), encodedID); err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
b, err := tx.Bucket(labelBucket)
if err != nil {
return nil, err
}
if err := b.Put(encodedID, v); err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return label, nil
}
func (s *Store) DeleteLabel(ctx context.Context, tx kv.Tx, id influxdb.ID) error {
label, err := s.GetLabel(ctx, tx, id)
if err != nil {
return ErrLabelNotFound
}
encodedID, idErr := id.Encode()
if idErr != nil {
return &influxdb.Error{
Err: idErr,
}
}
b, err := tx.Bucket(labelBucket)
if err != nil {
return err
}
if err := b.Delete(encodedID); err != nil {
return &influxdb.Error{
Err: err,
}
}
idx, err := tx.Bucket(labelIndex)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
key, err := labelIndexKey(label)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
if err := idx.Delete(key); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
//********* Label Mappings *********//
func (s *Store) CreateLabelMapping(ctx context.Context, tx kv.Tx, m *influxdb.LabelMapping) error {
v, err := json.Marshal(m)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
key, err := labelMappingKey(m)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
idx, err := tx.Bucket(labelMappingBucket)
if err != nil {
return err
}
if err := idx.Put(key, v); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
func (s *Store) FindResourceLabels(ctx context.Context, tx kv.Tx, filter influxdb.LabelMappingFilter, ls *[]*influxdb.Label) error {
if !filter.ResourceID.Valid() {
return &influxdb.Error{Code: influxdb.EInvalid, Msg: "filter requires a valid resource id", Err: influxdb.ErrInvalidID}
}
idx, err := tx.Bucket(labelMappingBucket)
if err != nil {
return err
}
prefix, err := filter.ResourceID.Encode()
if err != nil {
return err
}
cur, err := idx.ForwardCursor(prefix, kv.WithCursorPrefix(prefix))
if err != nil {
return err
}
for k, _ := cur.Next(); k != nil; k, _ = cur.Next() {
_, id, err := decodeLabelMappingKey(k)
if err != nil {
return err
}
l, err := s.GetLabel(ctx, tx, id)
if l == nil && err != nil {
// TODO(jm): return error instead of continuing once orphaned mappings are fixed
// (see https://github.com/influxdata/influxdb/issues/11278)
continue
}
*ls = append(*ls, l)
}
if err := cur.Err(); err != nil {
return err
}
return cur.Close()
}
func (s *Store) DeleteLabelMapping(ctx context.Context, tx kv.Tx, m *influxdb.LabelMapping) error {
key, err := labelMappingKey(m)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
idx, err := tx.Bucket(labelMappingBucket)
if err != nil {
return err
}
if err := idx.Delete(key); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
//********* helper functions *********//
func labelMappingKey(m *influxdb.LabelMapping) ([]byte, error) {
lid, err := m.LabelID.Encode()
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
rid, err := m.ResourceID.Encode()
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
key := make([]byte, influxdb.IDLength+influxdb.IDLength) // len(rid) + len(lid)
copy(key, rid)
copy(key[len(rid):], lid)
return key, nil
}
// labelAlreadyExistsError is used when creating a new label with
// a name that has already been used. Label names must be unique.
func labelAlreadyExistsError(lbl *influxdb.Label) error {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("label with name %s already exists", lbl.Name),
}
}
func labelIndexKey(l *influxdb.Label) ([]byte, error) {
orgID, err := l.OrgID.Encode()
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
k := make([]byte, influxdb.IDLength+len(l.Name))
copy(k, orgID)
copy(k[influxdb.IDLength:], []byte(strings.ToLower((l.Name))))
return k, nil
}
func filterLabelsFn(filter influxdb.LabelFilter) func(l *influxdb.Label) bool {
return func(label *influxdb.Label) bool {
return (filter.Name == "" || (strings.EqualFold(filter.Name, label.Name))) &&
((filter.OrgID == nil) || (filter.OrgID != nil && *filter.OrgID == label.OrgID))
}
}
func decodeLabelMappingKey(key []byte) (resourceID influxdb.ID, labelID influxdb.ID, err error) {
if len(key) != 2*influxdb.IDLength {
return 0, 0, &influxdb.Error{Code: influxdb.EInvalid, Msg: "malformed label mapping key (please report this error)"}
}
if err := (&resourceID).Decode(key[:influxdb.IDLength]); err != nil {
return 0, 0, &influxdb.Error{Code: influxdb.EInvalid, Msg: "bad resource id", Err: influxdb.ErrInvalidID}
}
if err := (&labelID).Decode(key[influxdb.IDLength:]); err != nil {
return 0, 0, &influxdb.Error{Code: influxdb.EInvalid, Msg: "bad label id", Err: influxdb.ErrInvalidID}
}
return resourceID, labelID, nil
}
func forEachLabel(ctx context.Context, tx kv.Tx, fn func(*influxdb.Label) bool) error {
b, err := tx.Bucket(labelBucket)
if err != nil {
return err
}
cur, err := b.ForwardCursor(nil)
if err != nil {
return err
}
for k, v := cur.Next(); k != nil; k, v = cur.Next() {
l := &influxdb.Label{}
if err := json.Unmarshal(v, l); err != nil {
return err
}
if !fn(l) {
break
}
}
if err := cur.Err(); err != nil {
return err
}
return cur.Close()
}
// uniqueID returns nil if the ID provided is unique, returns an error otherwise
func uniqueID(ctx context.Context, tx kv.Tx, id influxdb.ID) error {
encodedID, err := id.Encode()
if err != nil {
return influxdb.ErrInvalidID
}
b, err := tx.Bucket(labelBucket)
if err != nil {
return ErrInternalServiceError(err)
}
_, err = b.Get(encodedID)
// if not found then the ID is unique
if kv.IsNotFound(err) {
return nil
}
// no error means this is not unique
if err == nil {
return kv.NotUniqueError
}
// any other error is some sort of internal server error
return kv.UnexpectedIndexError(err)
}

272
label/storage_test.go Normal file
View File

@ -0,0 +1,272 @@
package label_test
import (
"context"
"fmt"
"reflect"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/label"
)
func TestLabels(t *testing.T) {
s := func() kv.Store {
return inmem.NewKVStore()
}
setup := func(t *testing.T, store *label.Store, tx kv.Tx) {
for i := 1; i <= 10; i++ {
err := store.CreateLabel(context.Background(), tx, &influxdb.Label{
ID: influxdb.ID(i),
Name: fmt.Sprintf("labelname%d", i),
OrgID: influxdb.ID(i),
})
if err != nil {
t.Fatal(err)
}
}
}
setupForList := func(t *testing.T, store *label.Store, tx kv.Tx) {
setup(t, store, tx)
err := store.CreateLabel(context.Background(), tx, &influxdb.Label{
ID: influxdb.ID(11),
Name: fmt.Sprintf("labelname%d", 11),
OrgID: influxdb.ID(5),
})
if err != nil {
t.Fatal(err)
}
}
tt := []struct {
name string
setup func(*testing.T, *label.Store, kv.Tx)
update func(*testing.T, *label.Store, kv.Tx)
results func(*testing.T, *label.Store, kv.Tx)
}{
{
name: "create",
setup: setup,
results: func(t *testing.T, store *label.Store, tx kv.Tx) {
labels, err := store.ListLabels(context.Background(), tx, influxdb.LabelFilter{})
if err != nil {
t.Fatal(err)
}
if len(labels) != 10 {
t.Fatalf("expected 10 labels, got: %d", len(labels))
}
expected := []*influxdb.Label{}
for i := 1; i <= 10; i++ {
expected = append(expected, &influxdb.Label{
ID: influxdb.ID(i),
Name: fmt.Sprintf("labelname%d", i),
OrgID: influxdb.ID(i),
})
}
if !reflect.DeepEqual(labels, expected) {
t.Fatalf("expected identical labels: \n%+v\n%+v", labels, expected)
}
},
},
{
name: "get",
setup: setup,
results: func(t *testing.T, store *label.Store, tx kv.Tx) {
label, err := store.GetLabel(context.Background(), tx, influxdb.ID(1))
if err != nil {
t.Fatal(err)
}
expected := &influxdb.Label{
ID: influxdb.ID(1),
Name: "labelname1",
OrgID: influxdb.ID(1),
}
if !reflect.DeepEqual(label, expected) {
t.Fatalf("expected identical label: \n%+v\n%+v", label, expected)
}
},
},
{
name: "list",
setup: setupForList,
results: func(t *testing.T, store *label.Store, tx kv.Tx) {
// list all
labels, err := store.ListLabels(context.Background(), tx, influxdb.LabelFilter{})
if err != nil {
t.Fatal(err)
}
if len(labels) != 11 {
t.Fatalf("expected 11 labels, got: %d", len(labels))
}
expected := []*influxdb.Label{}
for i := 1; i <= 10; i++ {
expected = append(expected, &influxdb.Label{
ID: influxdb.ID(i),
Name: fmt.Sprintf("labelname%d", i),
OrgID: influxdb.ID(i),
})
}
expected = append(expected, &influxdb.Label{
ID: influxdb.ID(11),
Name: fmt.Sprintf("labelname%d", 11),
OrgID: influxdb.ID(5),
})
if !reflect.DeepEqual(labels, expected) {
t.Fatalf("expected identical labels: \n%+v\n%+v", labels, expected)
}
// filter by name
l, err := store.ListLabels(context.Background(), tx, influxdb.LabelFilter{Name: "labelname5"})
if err != nil {
t.Fatal(err)
}
if len(l) != 1 {
t.Fatalf("expected 1 label, got: %d", len(l))
}
expectedLabel := []*influxdb.Label{&influxdb.Label{
ID: influxdb.ID(5),
Name: "labelname5",
OrgID: influxdb.ID(5),
}}
if !reflect.DeepEqual(l, expectedLabel) {
t.Fatalf("label returned by list did not match expected: \n%+v\n%+v", l, expectedLabel)
}
// filter by org id
id := influxdb.ID(5)
l, err = store.ListLabels(context.Background(), tx, influxdb.LabelFilter{OrgID: &id})
if err != nil {
t.Fatal(err)
}
if len(l) != 2 {
t.Fatalf("expected 2 labels, got: %d", len(l))
}
expectedLabel = []*influxdb.Label{
&influxdb.Label{
ID: influxdb.ID(5),
Name: "labelname5",
OrgID: influxdb.ID(5)},
{
ID: influxdb.ID(11),
Name: "labelname11",
OrgID: influxdb.ID(5),
}}
if !reflect.DeepEqual(l, expectedLabel) {
t.Fatalf("label returned by list did not match expected: \n%+v\n%+v", l, expectedLabel)
}
},
},
{
name: "update",
setup: setup,
update: func(t *testing.T, store *label.Store, tx kv.Tx) {
upd := influxdb.LabelUpdate{Name: "newName"}
updated, err := store.UpdateLabel(context.Background(), tx, influxdb.ID(1), upd)
if err != nil {
t.Fatal(err)
}
if updated.Name != upd.Name {
t.Fatalf("expected updated name %s, got: %s", upd.Name, updated.Name)
}
},
results: func(t *testing.T, store *label.Store, tx kv.Tx) {
la, err := store.GetLabel(context.Background(), tx, influxdb.ID(1))
if err != nil {
t.Fatal(err)
}
if la.Name != "newName" {
t.Fatalf("expected update name to be %s, got: %s", "newName", la.Name)
}
},
},
{
name: "delete",
setup: setup,
update: func(t *testing.T, store *label.Store, tx kv.Tx) {
err := store.DeleteLabel(context.Background(), tx, influxdb.ID(5))
if err != nil {
t.Fatal(err)
}
err = store.DeleteLabel(context.Background(), tx, influxdb.ID(5))
if err != label.ErrLabelNotFound {
t.Fatal("expected label not found error when deleting bucket that has already been deleted, got: ", err)
}
},
results: func(t *testing.T, store *label.Store, tx kv.Tx) {
l, err := store.ListLabels(context.Background(), tx, influxdb.LabelFilter{})
if err != nil {
t.Fatal(err)
}
if len(l) != 9 {
t.Fatalf("expected 2 labels, got: %d", len(l))
}
},
},
}
for _, testScenario := range tt {
t.Run(testScenario.name, func(t *testing.T) {
ts, err := label.NewStore(s())
if err != nil {
t.Fatal(err)
}
// setup
if testScenario.setup != nil {
err := ts.Update(context.Background(), func(tx kv.Tx) error {
testScenario.setup(t, ts, tx)
return nil
})
if err != nil {
t.Fatal(err)
}
}
// update
if testScenario.update != nil {
err := ts.Update(context.Background(), func(tx kv.Tx) error {
testScenario.update(t, ts, tx)
return nil
})
if err != nil {
t.Fatal(err)
}
}
// results
if testScenario.results != nil {
err := ts.View(context.Background(), func(tx kv.Tx) error {
testScenario.results(t, ts, tx)
return nil
})
if err != nil {
t.Fatal(err)
}
}
})
}
}

View File

@ -45,15 +45,6 @@ var (
}
)
// ErrCorruptID the ID stored in the Store is corrupt.
func ErrCorruptID(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "corrupt ID provided",
Err: err,
}
}
// ErrInternalServiceError is used when the error comes from an internal system.
func ErrInternalServiceError(err error) *influxdb.Error {
return &influxdb.Error{

View File

@ -1,48 +0,0 @@
package tenant
import (
"context"
"net/http"
"github.com/go-chi/chi"
"github.com/influxdata/influxdb/v2"
kit "github.com/influxdata/influxdb/v2/kit/transport/http"
)
type tenantContext string
const ctxOrgKey tenantContext = "orgID"
// ValidResource make sure a resource exists when a sub system needs to be mounted to an api
func ValidResource(api *kit.API, lookupOrgByResourceID func(context.Context, influxdb.ID) (influxdb.ID, error)) kit.Middleware {
return func(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
statusW := kit.NewStatusResponseWriter(w)
id, err := influxdb.IDFromString(chi.URLParam(r, "id"))
if err != nil {
api.Err(w, r, ErrCorruptID(err))
return
}
ctx := r.Context()
orgID, err := lookupOrgByResourceID(ctx, *id)
if err != nil {
api.Err(w, r, err)
return
}
next.ServeHTTP(statusW, r.WithContext(context.WithValue(ctx, ctxOrgKey, orgID)))
}
return http.HandlerFunc(fn)
}
}
func orgIDFromContext(ctx context.Context) *influxdb.ID {
v := ctx.Value(ctxOrgKey)
if v == nil {
return nil
}
id := v.(influxdb.ID)
return &id
}

View File

@ -52,7 +52,7 @@ func NewHTTPBucketHandler(log *zap.Logger, bucketSvc influxdb.BucketService, urm
r.Delete("/", svr.handleDeleteBucket)
// mount embedded resources
mountableRouter := r.With(ValidResource(svr.api, svr.lookupOrgByBucketID))
mountableRouter := r.With(kithttp.ValidResource(svr.api, svr.lookupOrgByBucketID))
mountableRouter.Mount("/members", urmHandler)
mountableRouter.Mount("/owners", urmHandler)
mountableRouter.Mount("/labels", labelHandler)

View File

@ -53,7 +53,7 @@ func NewHTTPOrgHandler(log *zap.Logger, orgService influxdb.OrganizationService,
r.Delete("/", svr.handleDeleteOrg)
// mount embedded resources
mountableRouter := r.With(ValidResource(svr.api, svr.lookupOrgByID))
mountableRouter := r.With(kithttp.ValidResource(svr.api, svr.lookupOrgByID))
mountableRouter.Mount("/members", urm)
mountableRouter.Mount("/owners", urm)
mountableRouter.Mount("/labels", labelHandler)

View File

@ -18,10 +18,10 @@ type BucketMetrics struct {
var _ influxdb.BucketService = (*BucketMetrics)(nil)
// NewBucketMetrics returns a metrics service middleware for the Bucket Service.
func NewBucketMetrics(reg prometheus.Registerer, s influxdb.BucketService, opts ...MetricsOption) *BucketMetrics {
o := applyOpts(opts...)
func NewBucketMetrics(reg prometheus.Registerer, s influxdb.BucketService, opts ...metric.MetricsOption) *BucketMetrics {
o := metric.ApplyMetricOpts(opts...)
return &BucketMetrics{
rec: metric.New(reg, o.applySuffix("bucket")),
rec: metric.New(reg, o.ApplySuffix("bucket")),
bucketService: s,
}
}

View File

@ -18,10 +18,10 @@ type OnboardingMetrics struct {
}
// NewOnboardingMetrics returns a metrics service middleware for the User Service.
func NewOnboardingMetrics(reg prometheus.Registerer, s influxdb.OnboardingService, opts ...MetricsOption) *OnboardingMetrics {
o := applyOpts(opts...)
func NewOnboardingMetrics(reg prometheus.Registerer, s influxdb.OnboardingService, opts ...metric.MetricsOption) *OnboardingMetrics {
o := metric.ApplyMetricOpts(opts...)
return &OnboardingMetrics{
rec: metric.New(reg, o.applySuffix("onboard")),
rec: metric.New(reg, o.ApplySuffix("onboard")),
onboardingService: s,
}
}

View File

@ -18,10 +18,10 @@ type OrgMetrics struct {
var _ influxdb.OrganizationService = (*OrgMetrics)(nil)
// NewOrgMetrics returns a metrics service middleware for the Organization Service.
func NewOrgMetrics(reg prometheus.Registerer, s influxdb.OrganizationService, opts ...MetricsOption) *OrgMetrics {
o := applyOpts(opts...)
func NewOrgMetrics(reg prometheus.Registerer, s influxdb.OrganizationService, opts ...metric.MetricsOption) *OrgMetrics {
o := metric.ApplyMetricOpts(opts...)
return &OrgMetrics{
rec: metric.New(reg, o.applySuffix("org")),
rec: metric.New(reg, o.ApplySuffix("org")),
orgService: s,
}
}

View File

@ -5,6 +5,7 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorizer"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
)
type AuthedURMService struct {
@ -26,8 +27,8 @@ func (s *AuthedURMService) FindUserResourceMappings(ctx context.Context, filter
}
authedUrms := urms[:0]
orgID := kithttp.OrgIDFromContext(ctx)
for _, urm := range urms {
orgID := orgIDFromContext(ctx)
if orgID != nil {
if _, _, err := authorizer.AuthorizeRead(ctx, urm.ResourceType, urm.ResourceID, *orgID); err != nil {
continue
@ -44,7 +45,7 @@ func (s *AuthedURMService) FindUserResourceMappings(ctx context.Context, filter
}
func (s *AuthedURMService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error {
orgID := orgIDFromContext(ctx)
orgID := kithttp.OrgIDFromContext(ctx)
if orgID != nil {
if _, _, err := authorizer.AuthorizeWrite(ctx, m.ResourceType, m.ResourceID, *orgID); err != nil {
return err
@ -71,7 +72,7 @@ func (s *AuthedURMService) DeleteUserResourceMapping(ctx context.Context, resour
// There should only be one because resourceID and userID are used to create the primary key for urms
for _, urm := range urms {
orgID := orgIDFromContext(ctx)
orgID := kithttp.OrgIDFromContext(ctx)
if orgID != nil {
if _, _, err := authorizer.AuthorizeWrite(ctx, urm.ResourceType, urm.ResourceID, *orgID); err != nil {
return err

View File

@ -7,6 +7,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
influxdbcontext "github.com/influxdata/influxdb/v2/context"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/mock"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
)
@ -105,7 +106,7 @@ func TestURMService_FindUserResourceMappings(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := NewAuthedURMService(tt.fields.OrgService, tt.fields.UserResourceMappingService)
orgID := influxdbtesting.IDPtr(10)
ctx := context.WithValue(context.Background(), ctxOrgKey, *orgID)
ctx := context.WithValue(context.Background(), kithttp.CtxOrgKey, *orgID)
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, tt.args.permissions))
urms, _, err := s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{})

View File

@ -18,10 +18,10 @@ type UrmMetrics struct {
var _ influxdb.UserResourceMappingService = (*UrmMetrics)(nil)
// NewUrmMetrics returns a metrics service middleware for the User Resource Mapping Service.
func NewUrmMetrics(reg prometheus.Registerer, s influxdb.UserResourceMappingService, opts ...MetricsOption) *UrmMetrics {
o := applyOpts(opts...)
func NewUrmMetrics(reg prometheus.Registerer, s influxdb.UserResourceMappingService, opts ...metric.MetricsOption) *UrmMetrics {
o := metric.ApplyMetricOpts(opts...)
return &UrmMetrics{
rec: metric.New(reg, o.applySuffix("urm")),
rec: metric.New(reg, o.ApplySuffix("urm")),
urmService: s,
}
}

View File

@ -19,10 +19,10 @@ type UserMetrics struct {
}
// NewUserMetrics returns a metrics service middleware for the User Service.
func NewUserMetrics(reg prometheus.Registerer, s influxdb.UserService, opts ...MetricsOption) *UserMetrics {
o := applyOpts(opts...)
func NewUserMetrics(reg prometheus.Registerer, s influxdb.UserService, opts ...metric.MetricsOption) *UserMetrics {
o := metric.ApplyMetricOpts(opts...)
return &UserMetrics{
rec: metric.New(reg, o.applySuffix("user")),
rec: metric.New(reg, o.ApplySuffix("user")),
userService: s,
}
}
@ -71,10 +71,10 @@ type PasswordMetrics struct {
}
// NewPasswordMetrics returns a metrics service middleware for the Password Service.
func NewPasswordMetrics(reg prometheus.Registerer, s influxdb.PasswordsService, opts ...MetricsOption) *PasswordMetrics {
o := applyOpts(opts...)
func NewPasswordMetrics(reg prometheus.Registerer, s influxdb.PasswordsService, opts ...metric.MetricsOption) *PasswordMetrics {
o := metric.ApplyMetricOpts(opts...)
return &PasswordMetrics{
rec: metric.New(reg, o.applySuffix("password")),
rec: metric.New(reg, o.ApplySuffix("password")),
pwdService: s,
}
}

View File

@ -104,7 +104,7 @@ func (s *Store) GetOrgByName(ctx context.Context, tx kv.Tx, n string) (*influxdb
var id influxdb.ID
if err := id.Decode(uid); err != nil {
return nil, ErrCorruptID(err)
return nil, influxdb.ErrCorruptID(err)
}
return s.GetOrg(ctx, tx, id)
}

View File

@ -95,7 +95,7 @@ func (s *Store) GetUserByName(ctx context.Context, tx kv.Tx, n string) (*influxd
var id influxdb.ID
if err := id.Decode(uid); err != nil {
return nil, ErrCorruptID(err)
return nil, influxdb.ErrCorruptID(err)
}
return s.GetUser(ctx, tx, id)
}

View File

@ -243,6 +243,7 @@ func CreateLabel(
args: args{
label: &influxdb.Label{
Name: "Tag2",
ID: MustIDBase16(labelOneID),
OrgID: MustIDBase16(orgOneID),
Properties: map[string]string{
"color": "fff000",
@ -940,32 +941,6 @@ func CreateLabelMapping(
},
},
},
// {
// name: "duplicate label mappings",
// fields: LabelFields{
// IDGenerator: mock.NewIDGenerator(labelOneID, t),
// Labels: []*influxdb.Label{},
// },
// args: args{
// label: &influxdb.Label{
// Name: "Tag2",
// Properties: map[string]string{
// "color": "fff000",
// },
// },
// },
// wants: wants{
// labels: []*influxdb.Label{
// {
// ID: MustIDBase16(labelOneID),
// Name: "Tag2",
// Properties: map[string]string{
// "color": "fff000",
// },
// },
// },
// },
// },
}
for _, tt := range tests {