From 67307d804ede329c5f7a72933ec10680635a7deb Mon Sep 17 00:00:00 2001 From: George Date: Thu, 30 Apr 2020 17:55:17 +0100 Subject: [PATCH 1/4] chore(errors): interpret more cases as influxdb error type in http.CheckError (#17888) --- http/errors.go | 42 ++++++++++++++--------- http/errors_test.go | 45 +++++++++++++++++++++++-- kit/transport/http/api.go | 24 ++----------- kit/transport/http/error_handler.go | 52 ++++++++++++++++++++++++++--- 4 files changed, 117 insertions(+), 46 deletions(-) diff --git a/http/errors.go b/http/errors.go index 77261cb624..7f4d42034e 100644 --- a/http/errors.go +++ b/http/errors.go @@ -12,7 +12,7 @@ import ( "strings" platform "github.com/influxdata/influxdb/v2" - "github.com/pkg/errors" + khttp "github.com/influxdata/influxdb/v2/kit/transport/http" ) // AuthzError is returned for authorization errors. When this error type is returned, @@ -58,11 +58,13 @@ func CheckError(resp *http.Response) (err error) { } } + perr := &platform.Error{ + Code: khttp.StatusCodeToErrorCode(resp.StatusCode), + } + if resp.StatusCode == http.StatusUnsupportedMediaType { - return &platform.Error{ - Code: platform.EInvalid, - Msg: fmt.Sprintf("invalid media type: %q", resp.Header.Get("Content-Type")), - } + perr.Msg = fmt.Sprintf("invalid media type: %q", resp.Header.Get("Content-Type")) + return perr } contentType := resp.Header.Get("Content-Type") @@ -74,24 +76,32 @@ func CheckError(resp *http.Response) (err error) { var buf bytes.Buffer if _, err := io.Copy(&buf, resp.Body); err != nil { - return &platform.Error{ - Code: platform.EInternal, - Msg: err.Error(), - } + perr.Msg = "failed to read error response" + perr.Err = err + return perr } switch mediatype { case "application/json": - pe := new(platform.Error) - if err := json.Unmarshal(buf.Bytes(), pe); err != nil { - line, _ := buf.ReadString('\n') - return errors.Wrap(stderrors.New(strings.TrimSuffix(line, "\n")), err.Error()) + if err := json.Unmarshal(buf.Bytes(), perr); err != nil { + perr.Msg = fmt.Sprintf("attempted to unmarshal error as JSON but failed: %q", err) + perr.Err = firstLineAsError(buf) } - return pe default: - line, _ := buf.ReadString('\n') - return stderrors.New(strings.TrimSuffix(line, "\n")) + perr.Err = firstLineAsError(buf) } + + if perr.Code == "" { + // given it was unset during attempt to unmarshal as JSON + perr.Code = khttp.StatusCodeToErrorCode(resp.StatusCode) + } + + return perr +} + +func firstLineAsError(buf bytes.Buffer) error { + line, _ := buf.ReadString('\n') + return stderrors.New(strings.TrimSuffix(line, "\n")) } // UnauthorizedError encodes a error message and status code for unauthorized access. diff --git a/http/errors_test.go b/http/errors_test.go index 8f41d0001e..1110bfa988 100644 --- a/http/errors_test.go +++ b/http/errors_test.go @@ -12,7 +12,6 @@ import ( "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/http" kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" - "github.com/pkg/errors" ) func TestCheckError(t *testing.T) { @@ -43,7 +42,10 @@ func TestCheckError(t *testing.T) { w.WriteHeader(500) _, _ = io.WriteString(w, "upstream timeout\n") }, - want: stderrors.New("upstream timeout"), + want: &influxdb.Error{ + Code: influxdb.EInternal, + Err: stderrors.New("upstream timeout"), + }, }, { name: "error with bad json", @@ -52,7 +54,44 @@ func TestCheckError(t *testing.T) { w.WriteHeader(500) _, _ = io.WriteString(w, "upstream timeout\n") }, - want: errors.Wrap(stderrors.New("upstream timeout"), "invalid character 'u' looking for beginning of value"), + want: &influxdb.Error{ + Code: influxdb.EInternal, + Msg: `attempted to unmarshal error as JSON but failed: "invalid character 'u' looking for beginning of value"`, + Err: stderrors.New("upstream timeout"), + }, + }, + { + name: "error with no content-type (encoded as json - with code)", + write: func(w *httptest.ResponseRecorder) { + w.WriteHeader(500) + _, _ = io.WriteString(w, `{"error": "service unavailable", "code": "unavailable"}`) + }, + want: &influxdb.Error{ + Code: influxdb.EUnavailable, + Err: stderrors.New("service unavailable"), + }, + }, + { + name: "error with no content-type (encoded as json - no code)", + write: func(w *httptest.ResponseRecorder) { + w.WriteHeader(503) + _, _ = io.WriteString(w, `{"error": "service unavailable"}`) + }, + want: &influxdb.Error{ + Code: influxdb.EUnavailable, + Err: stderrors.New("service unavailable"), + }, + }, + { + name: "error with no content-type (not json encoded)", + write: func(w *httptest.ResponseRecorder) { + w.WriteHeader(503) + }, + want: &influxdb.Error{ + Code: influxdb.EUnavailable, + Msg: `attempted to unmarshal error as JSON but failed: "unexpected end of JSON input"`, + Err: stderrors.New(""), + }, }, } { t.Run(tt.name, func(t *testing.T) { diff --git a/kit/transport/http/api.go b/kit/transport/http/api.go index e5942ab369..eac985f499 100644 --- a/kit/transport/http/api.go +++ b/kit/transport/http/api.go @@ -87,19 +87,15 @@ func NewAPI(opts ...APIOptFn) *API { } }, errFn: func(err error) (interface{}, int, error) { - code := influxdb.ErrorCode(err) - httpStatusCode, ok := statusCodePlatformError[code] - if !ok { - httpStatusCode = http.StatusBadRequest - } msg := err.Error() if msg == "" { msg = "an internal error has occurred" } + code := influxdb.ErrorCode(err) return ErrBody{ Code: code, Msg: msg, - }, httpStatusCode, nil + }, ErrorCodeToStatusCode(code), nil }, } for _, o := range opts { @@ -242,19 +238,3 @@ type ErrBody struct { Code string `json:"code"` Msg string `json:"message"` } - -// statusCodePlatformError is the map convert platform.Error to error -var statusCodePlatformError = map[string]int{ - influxdb.EInternal: http.StatusInternalServerError, - influxdb.EInvalid: http.StatusBadRequest, - influxdb.EUnprocessableEntity: http.StatusUnprocessableEntity, - influxdb.EEmptyValue: http.StatusBadRequest, - influxdb.EConflict: http.StatusUnprocessableEntity, - influxdb.ENotFound: http.StatusNotFound, - influxdb.EUnavailable: http.StatusServiceUnavailable, - influxdb.EForbidden: http.StatusForbidden, - influxdb.ETooManyRequests: http.StatusTooManyRequests, - influxdb.EUnauthorized: http.StatusUnauthorized, - influxdb.EMethodNotAllowed: http.StatusMethodNotAllowed, - influxdb.ETooLarge: http.StatusRequestEntityTooLarge, -} diff --git a/kit/transport/http/error_handler.go b/kit/transport/http/error_handler.go index 52767835c0..61dca7d206 100644 --- a/kit/transport/http/error_handler.go +++ b/kit/transport/http/error_handler.go @@ -21,13 +21,9 @@ func (h ErrorHandler) HandleHTTPError(ctx context.Context, err error, w http.Res } code := influxdb.ErrorCode(err) - httpCode, ok := statusCodePlatformError[code] - if !ok { - httpCode = http.StatusBadRequest - } w.Header().Set(PlatformErrorCodeHeader, code) w.Header().Set("Content-Type", "application/json; charset=utf-8") - w.WriteHeader(httpCode) + w.WriteHeader(ErrorCodeToStatusCode(code)) var e struct { Code string `json:"code"` Message string `json:"message"` @@ -41,3 +37,49 @@ func (h ErrorHandler) HandleHTTPError(ctx context.Context, err error, w http.Res b, _ := json.Marshal(e) _, _ = w.Write(b) } + +// StatusCodeToErrorCode maps a http status code integer to an +// influxdb error code string. +func StatusCodeToErrorCode(statusCode int) string { + errorCode, ok := httpStatusCodeToInfluxDBError[statusCode] + if ok { + return errorCode + } + + return influxdb.EInternal +} + +// ErrorCodeToStatusCode maps an influxdb error code string to a +// http status code integer. +func ErrorCodeToStatusCode(code string) int { + statusCode, ok := influxDBErrorToStatusCode[code] + if ok { + return statusCode + } + + return http.StatusInternalServerError +} + +// influxDBErrorToStatusCode is a mapping of ErrorCode to http status code. +var influxDBErrorToStatusCode = map[string]int{ + influxdb.EInternal: http.StatusInternalServerError, + influxdb.EInvalid: http.StatusBadRequest, + influxdb.EUnprocessableEntity: http.StatusUnprocessableEntity, + influxdb.EEmptyValue: http.StatusBadRequest, + influxdb.EConflict: http.StatusUnprocessableEntity, + influxdb.ENotFound: http.StatusNotFound, + influxdb.EUnavailable: http.StatusServiceUnavailable, + influxdb.EForbidden: http.StatusForbidden, + influxdb.ETooManyRequests: http.StatusTooManyRequests, + influxdb.EUnauthorized: http.StatusUnauthorized, + influxdb.EMethodNotAllowed: http.StatusMethodNotAllowed, + influxdb.ETooLarge: http.StatusRequestEntityTooLarge, +} + +var httpStatusCodeToInfluxDBError = map[int]string{} + +func init() { + for k, v := range influxDBErrorToStatusCode { + httpStatusCodeToInfluxDBError[v] = k + } +} From 097d761c3a3c77011f8134dc39c7c1c3a15f6dfa Mon Sep 17 00:00:00 2001 From: Johnny Steenbergen Date: Wed, 29 Apr 2020 21:50:52 -0700 Subject: [PATCH 2/4] feat(pkger): extend store with stack lists operation references: #17544 --- pkger/service.go | 1 + pkger/service_auth.go | 8 +++ pkger/service_logging.go | 25 ++++++- pkger/service_metrics.go | 6 ++ pkger/service_test.go | 4 ++ pkger/service_tracing.go | 14 ++++ pkger/store.go | 88 ++++++++++++++++++++++++ pkger/store_test.go | 140 +++++++++++++++++++++++++++++++++++++++ 8 files changed, 285 insertions(+), 1 deletion(-) diff --git a/pkger/service.go b/pkger/service.go index 950fbb67e1..2b44631335 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -199,6 +199,7 @@ func WithVariableSVC(varSVC influxdb.VariableService) ServiceSetterFn { // Store is the storage behavior the Service depends on. type Store interface { CreateStack(ctx context.Context, stack Stack) error + ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) UpdateStack(ctx context.Context, stack Stack) error DeleteStack(ctx context.Context, id influxdb.ID) error diff --git a/pkger/service_auth.go b/pkger/service_auth.go index e257a6e439..f871672a4a 100644 --- a/pkger/service_auth.go +++ b/pkger/service_auth.go @@ -36,6 +36,14 @@ func (s *authMW) InitStack(ctx context.Context, userID influxdb.ID, newStack Sta return s.next.InitStack(ctx, userID, newStack) } +func (s *authMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + err := s.authAgent.OrgPermissions(ctx, orgID, influxdb.ReadAction) + if err != nil { + return nil, err + } + return s.next.ListStacks(ctx, orgID, f) +} + func (s *authMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) { return s.next.CreatePkg(ctx, setters...) } diff --git a/pkger/service_logging.go b/pkger/service_logging.go index 4270ff1ecc..65cb1edfec 100644 --- a/pkger/service_logging.go +++ b/pkger/service_logging.go @@ -34,15 +34,38 @@ func (s *loggingMW) InitStack(ctx context.Context, userID influxdb.ID, newStack s.logger.Error( "failed to init stack", zap.Error(err), - zap.Duration("took", time.Since(start)), zap.Stringer("orgID", newStack.OrgID), zap.Stringer("userID", userID), zap.Strings("urls", newStack.URLs), + zap.Duration("took", time.Since(start)), ) }(time.Now()) return s.next.InitStack(ctx, userID, newStack) } +func (s *loggingMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) (stacks []Stack, err error) { + defer func(start time.Time) { + if err == nil { + return + } + + var stackIDs []string + for _, id := range f.StackIDs { + stackIDs = append(stackIDs, id.String()) + } + + s.logger.Error( + "failed to list stacks", + zap.Error(err), + zap.Stringer("orgID", orgID), + zap.Strings("stackIDs", stackIDs), + zap.Strings("names", f.Names), + zap.Duration("took", time.Since(start)), + ) + }(time.Now()) + return s.next.ListStacks(ctx, orgID, f) +} + func (s *loggingMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) { defer func(start time.Time) { dur := zap.Duration("took", time.Since(start)) diff --git a/pkger/service_metrics.go b/pkger/service_metrics.go index 49fcd1dccc..e573b6ed59 100644 --- a/pkger/service_metrics.go +++ b/pkger/service_metrics.go @@ -33,6 +33,12 @@ func (s *mwMetrics) InitStack(ctx context.Context, userID influxdb.ID, newStack return stack, rec(err) } +func (s *mwMetrics) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + rec := s.rec.Record("list_stacks") + stacks, err := s.next.ListStacks(ctx, orgID, f) + return stacks, rec(err) +} + func (s *mwMetrics) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) { rec := s.rec.Record("create_pkg") pkg, err := s.next.CreatePkg(ctx, setters...) diff --git a/pkger/service_test.go b/pkger/service_test.go index fe72c0d296..2d98354e1b 100644 --- a/pkger/service_test.go +++ b/pkger/service_test.go @@ -3234,6 +3234,10 @@ func (s *fakeStore) CreateStack(ctx context.Context, stack Stack) error { panic("not implemented") } +func (s *fakeStore) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + panic("not implemented") +} + func (s *fakeStore) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) { panic("not implemented") } diff --git a/pkger/service_tracing.go b/pkger/service_tracing.go index ad7eaf0223..2c5e3304e7 100644 --- a/pkger/service_tracing.go +++ b/pkger/service_tracing.go @@ -3,6 +3,8 @@ package pkger import ( "context" + "github.com/opentracing/opentracing-go/log" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/tracing" ) @@ -26,6 +28,18 @@ func (s *traceMW) InitStack(ctx context.Context, userID influxdb.ID, newStack St return s.next.InitStack(ctx, userID, newStack) } +func (s *traceMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "ListStacks") + defer span.Finish() + + stacks, err := s.next.ListStacks(ctx, orgID, f) + span.LogFields( + log.String("org_id", orgID.String()), + log.Int("num_stacks", len(stacks)), + ) + return stacks, err +} + func (s *traceMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) { span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "CreatePkg") defer span.Finish() diff --git a/pkger/store.go b/pkger/store.go index 61e830d348..6b8ff74033 100644 --- a/pkger/store.go +++ b/pkger/store.go @@ -1,6 +1,7 @@ package pkger import ( + "bytes" "context" "encoding/json" "time" @@ -75,6 +76,93 @@ func (s *StoreKV) CreateStack(ctx context.Context, stack Stack) error { return s.put(ctx, stack, kv.PutNew()) } +// ListStacks returns a list of stacks. +func (s *StoreKV) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + if len(f.StackIDs) > 0 && len(f.Names) == 0 { + return s.listStacksByID(ctx, orgID, f.StackIDs) + } + + filterFn, err := storeListFilterFn(orgID, f) + if err != nil { + return nil, err + } + + var stacks []Stack + err = s.kvStore.View(ctx, func(tx kv.Tx) error { + return s.indexBase.Find(ctx, tx, kv.FindOpts{ + CaptureFn: func(key []byte, decodedVal interface{}) error { + stack, err := convertStackEntToStack(decodedVal.(*entStack)) + if err != nil { + return err + } + stacks = append(stacks, stack) + return nil + }, + FilterEntFn: func(key []byte, decodedVal interface{}) bool { + st := decodedVal.(*entStack) + return filterFn(st) + }, + }) + }) + if err != nil { + return nil, err + } + return stacks, nil +} + +func storeListFilterFn(orgID influxdb.ID, f ListFilter) (func(*entStack) bool, error) { + orgIDEncoded, err := orgID.Encode() + if err != nil { + return nil, err + } + + mIDs := make(map[string]bool) + for _, id := range f.StackIDs { + b, err := id.Encode() + if err != nil { + return nil, err + } + mIDs[string(b)] = true + } + + mNames := make(map[string]bool) + for _, name := range f.Names { + mNames[name] = true + } + + optionalFieldFilterFn := func(ent *entStack) bool { + id := string(ent.ID) + if len(mIDs) > 0 || len(mNames) > 0 { + return mIDs[id] || mNames[ent.Name] + } + return true + } + return func(st *entStack) bool { + return bytes.Equal(orgIDEncoded, st.OrgID) && optionalFieldFilterFn(st) + }, nil +} + +func (s *StoreKV) listStacksByID(ctx context.Context, orgID influxdb.ID, stackIDs []influxdb.ID) ([]Stack, error) { + var stacks []Stack + for _, id := range stackIDs { + st, err := s.ReadStackByID(ctx, id) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + // since the stackIDs are a filter, if it is not found, we just continue + // on. If the user wants to verify the existence of a particular stack + // then it would be upon them to use the ReadByID call. + continue + } + if err != nil { + return nil, err + } + if orgID != st.OrgID { + continue + } + stacks = append(stacks, st) + } + return stacks, nil +} + // ReadStackByID reads a stack by the provided ID. func (s *StoreKV) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) { var stack Stack diff --git a/pkger/store_test.go b/pkger/store_test.go index 1120234920..6c30d47b0b 100644 --- a/pkger/store_test.go +++ b/pkger/store_test.go @@ -83,6 +83,146 @@ func TestStoreKV(t *testing.T) { }) }) + t.Run("list stacks", func(t *testing.T) { + defer inMemStore.Flush(context.Background()) + + storeKV := pkger.NewStoreKV(inMemStore) + + const orgID1 = 1 + const orgID2 = 2 + seedEntities(t, storeKV, + pkger.Stack{ + ID: 1, + OrgID: orgID1, + Name: "first_name", + }, + pkger.Stack{ + ID: 2, + OrgID: orgID2, + Name: "first_name", + }, + pkger.Stack{ + ID: 3, + OrgID: orgID1, + Name: "second_name", + }, + pkger.Stack{ + ID: 4, + OrgID: orgID2, + Name: "second_name", + }, + ) + + tests := []struct { + name string + orgID influxdb.ID + filter pkger.ListFilter + expected []pkger.Stack + }{ + { + name: "by org id", + orgID: orgID1, + expected: []pkger.Stack{ + { + ID: 1, + OrgID: orgID1, + Name: "first_name", + }, + { + ID: 3, + OrgID: orgID1, + Name: "second_name", + }, + }, + }, + { + name: "by stack ids", + orgID: orgID1, + filter: pkger.ListFilter{ + StackIDs: []influxdb.ID{1, 3}, + }, + expected: []pkger.Stack{ + { + ID: 1, + OrgID: orgID1, + Name: "first_name", + }, + { + ID: 3, + OrgID: orgID1, + Name: "second_name", + }, + }, + }, + { + name: "by stack ids skips ids that belong to different organization", + orgID: orgID1, + filter: pkger.ListFilter{ + StackIDs: []influxdb.ID{1, 2, 4}, + }, + expected: []pkger.Stack{{ + ID: 1, + OrgID: orgID1, + Name: "first_name", + }}, + }, + { + name: "stack ids that do not exist are skipped", + orgID: orgID1, + filter: pkger.ListFilter{ + StackIDs: []influxdb.ID{1, 9000}, + }, + expected: []pkger.Stack{{ + ID: 1, + OrgID: orgID1, + Name: "first_name", + }}, + }, + { + name: "by name", + orgID: orgID1, + filter: pkger.ListFilter{ + Names: []string{"first_name"}, + }, + expected: []pkger.Stack{{ + ID: 1, + OrgID: orgID1, + Name: "first_name", + }}, + }, + { + name: "by name and id", + orgID: orgID1, + filter: pkger.ListFilter{ + StackIDs: []influxdb.ID{3}, + Names: []string{"first_name"}, + }, + expected: []pkger.Stack{ + { + ID: 1, + OrgID: orgID1, + Name: "first_name", + }, + { + ID: 3, + OrgID: orgID1, + Name: "second_name", + }, + }, + }, + } + + for _, tt := range tests { + fn := func(t *testing.T) { + stacks, err := storeKV.ListStacks(context.Background(), tt.orgID, tt.filter) + require.NoError(t, err) + assert.Equal(t, tt.expected, stacks) + } + + t.Run(tt.name, fn) + } + }) + t.Run("read a stack", func(t *testing.T) { defer inMemStore.Flush(context.Background()) From c7e97f1625ab3d5af93f82f50136328d07fb4dca Mon Sep 17 00:00:00 2001 From: Johnny Steenbergen Date: Wed, 29 Apr 2020 23:37:39 -0700 Subject: [PATCH 3/4] feat(pkger): add service and http server integreations for stack lists references: #17554 --- cmd/influx/pkg_test.go | 4 + cmd/influxd/launcher/pkger_test.go | 50 ++++++++ http/swagger.yml | 78 ++++++++++++ pkger/http_remote_service.go | 21 +++ pkger/http_server.go | 64 +++++++++- pkger/http_server_test.go | 197 ++++++++++++++++++++++++++++- pkger/service.go | 13 ++ pkger/service_tracing.go | 3 +- pkger/store.go | 3 +- 9 files changed, 425 insertions(+), 8 deletions(-) diff --git a/cmd/influx/pkg_test.go b/cmd/influx/pkg_test.go index 483ed33f81..1f4ed77ada 100644 --- a/cmd/influx/pkg_test.go +++ b/cmd/influx/pkg_test.go @@ -715,6 +715,10 @@ func (f *fakePkgSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pk panic("not implemented") } +func (f *fakePkgSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) { + panic("not implemented") +} + func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) { if f.createFn != nil { return f.createFn(ctx, setters...) diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 5672cd3f95..c82cc37c9a 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -179,6 +179,56 @@ func TestLauncher_Pkger(t *testing.T) { assert.NotZero(t, newStack.CRUDLog) }) + t.Run("list stacks", func(t *testing.T) { + // seed platform with stacks + newStack1, err := svc.InitStack(ctx, l.User.ID, pkger.Stack{ + OrgID: l.Org.ID, + Name: "first stack", + }) + require.NoError(t, err) + newStack2, err := svc.InitStack(ctx, l.User.ID, pkger.Stack{ + OrgID: l.Org.ID, + Name: "second stack", + }) + require.NoError(t, err) + + containsStack := func(t *testing.T, haystack []pkger.Stack, needle pkger.Stack) { + t.Helper() + for _, hay := range haystack { + if hay.ID == needle.ID { + return + } + } + require.FailNowf(t, "did not find expected stack", "got: %+v", needle) + } + + t.Run("returns all stacks when no filter args provided", func(t *testing.T) { + stacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{}) + require.NoError(t, err) + + containsStack(t, stacks, newStack1) + containsStack(t, stacks, newStack2) + }) + + t.Run("filters stacks by ID filter", func(t *testing.T) { + stacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{ + StackIDs: []influxdb.ID{newStack1.ID}, + }) + require.NoError(t, err) + require.Len(t, stacks, 1) + containsStack(t, stacks, newStack1) + }) + + t.Run("filter stacks by names", func(t *testing.T) { + stacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{ + Names: []string{newStack2.Name}, + }) + require.NoError(t, err) + require.Len(t, stacks, 1) + containsStack(t, stacks, newStack2) + }) + }) + t.Run("apply with only a stackID succeeds when stack has URLs", func(t *testing.T) { svr := httptest.NewServer(nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) { pkg := newPkg(newBucketObject("bucket_0", "", "")) diff --git a/http/swagger.yml b/http/swagger.yml index 60ddba4046..d21de512a8 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -4375,6 +4375,84 @@ paths: schema: $ref: "#/components/schemas/Error" /packages/stacks: + get: + operationId: ListStacks + tags: + - InfluxPackages + summary: Grab a list of installed Influx packages + parameters: + - in: query + name: orgID + required: true + schema: + type: string + description: The organization id of the stacks + - in: query + name: name + schema: + type: string + description: A collection of names to filter the list by. + - in: query + name: stackID + schema: + type: string + description: A collection of stackIDs to filter the list by. + responses: + '200': + description: Influx stacks found + content: + application/json: + schema: + type: array + items: + type: object + properties: + id: + type: string + orgID: + type: string + name: + type: string + description: + type: string + urls: + type: array + items: + type: string + createdAt: + type: string + format: date-time + readOnly: true + updatedAt: + type: string + format: date-time + readOnly: true + resources: + type: object + properties: + apiVersion: + type: string + resourceID: + type: string + kind: + type: string + pkgName: + type: string + associations: + type: array + items: + type: object + properties: + kind: + type: string + pkgName: + type: string + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" post: operationId: CreateStack tags: diff --git a/pkger/http_remote_service.go b/pkger/http_remote_service.go index 2d6b1103db..8161d85f0e 100644 --- a/pkger/http_remote_service.go +++ b/pkger/http_remote_service.go @@ -55,6 +55,27 @@ func (s *HTTPRemoteService) InitStack(ctx context.Context, userID influxdb.ID, s return newStack, nil } +func (s *HTTPRemoteService) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + queryParams := [][2]string{{"orgID", orgID.String()}} + for _, name := range f.Names { + queryParams = append(queryParams, [2]string{"name", name}) + } + for _, stackID := range f.StackIDs { + queryParams = append(queryParams, [2]string{"stackID", stackID.String()}) + } + + var resp RespListStacks + err := s.Client. + Get(RoutePrefix, "/stacks"). + QueryParams(queryParams...). + DecodeJSON(&resp). + Do(ctx) + if err != nil { + return nil, err + } + return resp.Stacks, nil +} + // CreatePkg will produce a pkg from the parameters provided. func (s *HTTPRemoteService) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) { var opt CreateOpt diff --git a/pkger/http_server.go b/pkger/http_server.go index be90dbeae6..7be0749a24 100644 --- a/pkger/http_server.go +++ b/pkger/http_server.go @@ -50,7 +50,10 @@ func NewHTTPServer(log *zap.Logger, svc SVC) *HTTPServer { Post("/", svr.createPkg) r.With(middleware.SetHeader("Content-Type", "application/json; charset=utf-8")). Post("/apply", svr.applyPkg) - r.Post("/stacks", svr.createStack) + r.Route("/stacks", func(r chi.Router) { + r.Post("/", svr.createStack) + r.Get("/", svr.listStacks) + }) } svr.Router = r @@ -62,6 +65,65 @@ func (s *HTTPServer) Prefix() string { return RoutePrefix } +// RespListStacks is the HTTP response for a stack list call. +type RespListStacks struct { + Stacks []Stack `json:"stacks"` +} + +func (s *HTTPServer) listStacks(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + + rawOrgID := q.Get("orgID") + orgID, err := influxdb.IDFromString(rawOrgID) + if err != nil { + s.api.Err(w, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("organization id[%q] is invalid", rawOrgID), + Err: err, + }) + return + } + + if err := r.ParseForm(); err != nil { + s.api.Err(w, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "failed to parse form from encoded url", + Err: err, + }) + return + } + + filter := ListFilter{ + Names: r.Form["name"], + } + + for _, idRaw := range r.Form["stackID"] { + id, err := influxdb.IDFromString(idRaw) + if err != nil { + s.api.Err(w, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("stack ID[%q] provided is invalid", idRaw), + Err: err, + }) + return + } + filter.StackIDs = append(filter.StackIDs, *id) + } + + stacks, err := s.svc.ListStacks(r.Context(), *orgID, filter) + if err != nil { + s.api.Err(w, err) + return + } + if stacks == nil { + stacks = []Stack{} + } + + s.api.Respond(w, http.StatusOK, RespListStacks{ + Stacks: stacks, + }) +} + // ReqCreateStack is a request body for a create stack call. type ReqCreateStack struct { OrgID string `json:"orgID"` diff --git a/pkger/http_server_test.go b/pkger/http_server_test.go index 321cea7727..a396ee15c9 100644 --- a/pkger/http_server_test.go +++ b/pkger/http_server_test.go @@ -559,6 +559,189 @@ func TestPkgerHTTPServer(t *testing.T) { } }) }) + + t.Run("list a stack", func(t *testing.T) { + t.Run("should successfully return with valid req body", func(t *testing.T) { + const expectedOrgID = 3 + + svc := &fakeSVC{ + listStacksFn: func(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) { + if orgID != expectedOrgID { + return nil, nil + } + + if len(filter.Names) > 0 && len(filter.StackIDs) == 0 { + var stacks []pkger.Stack + for i, name := range filter.Names { + stacks = append(stacks, pkger.Stack{ + ID: influxdb.ID(i + 1), + OrgID: expectedOrgID, + Name: name, + }) + } + return stacks, nil + } + + if len(filter.StackIDs) > 0 && len(filter.Names) == 0 { + var stacks []pkger.Stack + for _, stackID := range filter.StackIDs { + stacks = append(stacks, pkger.Stack{ + ID: stackID, + OrgID: expectedOrgID, + }) + } + return stacks, nil + } + + return []pkger.Stack{{ + ID: 1, + OrgID: expectedOrgID, + Name: "stack_1", + }}, nil + }, + } + pkgHandler := pkger.NewHTTPServer(zap.NewNop(), svc) + svr := newMountedHandler(pkgHandler, 1) + + tests := []struct { + name string + queryArgs string + expectedStacks []pkger.Stack + }{ + { + name: "with org ID that has stacks", + queryArgs: "orgID=" + influxdb.ID(expectedOrgID).String(), + expectedStacks: []pkger.Stack{{ + ID: 1, + OrgID: expectedOrgID, + Name: "stack_1", + }}, + }, + { + name: "with orgID with no stacks", + queryArgs: "orgID=" + influxdb.ID(9000).String(), + expectedStacks: []pkger.Stack{}, + }, + { + name: "with names", + queryArgs: "name=name_stack&name=threeve&orgID=" + influxdb.ID(expectedOrgID).String(), + expectedStacks: []pkger.Stack{ + { + ID: 1, + OrgID: expectedOrgID, + Name: "name_stack", + }, + { + ID: 2, + OrgID: expectedOrgID, + Name: "threeve", + }, + }, + }, + { + name: "with ids", + queryArgs: fmt.Sprintf("stackID=%s&stackID=%s&orgID=%s", influxdb.ID(1), influxdb.ID(2), influxdb.ID(expectedOrgID)), + expectedStacks: []pkger.Stack{ + { + ID: 1, + OrgID: expectedOrgID, + }, + { + ID: 2, + OrgID: expectedOrgID, + }, + }, + }, + } + + for _, tt := range tests { + fn := func(t *testing.T) { + testttp. + Get(t, "/api/v2/packages/stacks?"+tt.queryArgs). + Headers("Content-Type", "application/x-www-form-urlencoded"). + Do(svr). + ExpectStatus(http.StatusOK). + ExpectBody(func(buf *bytes.Buffer) { + var resp pkger.RespListStacks + decodeBody(t, buf, &resp) + + assert.Equal(t, tt.expectedStacks, resp.Stacks) + }) + } + + t.Run(tt.name, fn) + } + }) + + t.Run("error cases", func(t *testing.T) { + tests := []struct { + name string + reqBody pkger.ReqCreateStack + expectedStatus int + svc pkger.SVC + }{ + { + name: "bad org id", + reqBody: pkger.ReqCreateStack{ + OrgID: "invalid id", + }, + expectedStatus: http.StatusBadRequest, + }, + { + name: "bad url", + reqBody: pkger.ReqCreateStack{ + OrgID: influxdb.ID(3).String(), + URLs: []string{"invalid @% url"}, + }, + expectedStatus: http.StatusBadRequest, + }, + { + name: "translates svc conflict error", + reqBody: pkger.ReqCreateStack{OrgID: influxdb.ID(3).String()}, + svc: &fakeSVC{ + initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) { + return pkger.Stack{}, &influxdb.Error{Code: influxdb.EConflict} + }, + }, + expectedStatus: http.StatusUnprocessableEntity, + }, + { + name: "translates svc internal error", + reqBody: pkger.ReqCreateStack{OrgID: influxdb.ID(3).String()}, + svc: &fakeSVC{ + initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) { + return pkger.Stack{}, &influxdb.Error{Code: influxdb.EInternal} + }, + }, + expectedStatus: http.StatusInternalServerError, + }, + } + + for _, tt := range tests { + fn := func(t *testing.T) { + svc := tt.svc + if svc == nil { + svc = &fakeSVC{ + initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) { + return stack, nil + }, + } + } + + pkgHandler := pkger.NewHTTPServer(zap.NewNop(), svc) + svr := newMountedHandler(pkgHandler, 1) + + testttp. + PostJSON(t, "/api/v2/packages/stacks", tt.reqBody). + Headers("Content-Type", "application/json"). + Do(svr). + ExpectStatus(tt.expectedStatus) + } + + t.Run(tt.name, fn) + } + }) + }) } func bucketPkgKinds(t *testing.T, encoding pkger.Encoding) []byte { @@ -639,9 +822,10 @@ func decodeBody(t *testing.T, r io.Reader, v interface{}) { } type fakeSVC struct { - initStack func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) - dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) - applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) + initStack func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) + listStacksFn func(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) + dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) + applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) } var _ pkger.SVC = (*fakeSVC)(nil) @@ -653,6 +837,13 @@ func (f *fakeSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger return f.initStack(ctx, userID, stack) } +func (f *fakeSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) { + if f.listStacksFn == nil { + panic("not implemented") + } + return f.listStacksFn(ctx, orgID, filter) +} + func (f *fakeSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) { panic("not implemented") } diff --git a/pkger/service.go b/pkger/service.go index 2b44631335..a6939c283e 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -59,6 +59,8 @@ const ResourceTypeStack influxdb.ResourceType = "stack" // SVC is the packages service interface. type SVC interface { InitStack(ctx context.Context, userID influxdb.ID, stack Stack) (Stack, error) + ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error) + CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) @@ -296,6 +298,17 @@ func (s *Service) InitStack(ctx context.Context, userID influxdb.ID, stack Stack return stack, nil } +// ListFilter are filter options for filtering stacks from being returned. +type ListFilter struct { + StackIDs []influxdb.ID + Names []string +} + +// ListStacks returns a list of stacks. +func (s *Service) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + return s.store.ListStacks(ctx, orgID, f) +} + type ( // CreatePkgSetFn is a functional input for setting the pkg fields. CreatePkgSetFn func(opt *CreateOpt) error diff --git a/pkger/service_tracing.go b/pkger/service_tracing.go index 2c5e3304e7..c57fc8aa32 100644 --- a/pkger/service_tracing.go +++ b/pkger/service_tracing.go @@ -3,10 +3,9 @@ package pkger import ( "context" - "github.com/opentracing/opentracing-go/log" - "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/tracing" + "github.com/opentracing/opentracing-go/log" ) type traceMW struct { diff --git a/pkger/store.go b/pkger/store.go index 6b8ff74033..756fd7e9d4 100644 --- a/pkger/store.go +++ b/pkger/store.go @@ -131,9 +131,8 @@ func storeListFilterFn(orgID influxdb.ID, f ListFilter) (func(*entStack) bool, e } optionalFieldFilterFn := func(ent *entStack) bool { - id := string(ent.ID) if len(mIDs) > 0 || len(mNames) > 0 { - return mIDs[id] || mNames[ent.Name] + return mIDs[string(ent.ID)] || mNames[ent.Name] } return true } From 22899aee259055b113a7b124a7dcdce448e79950 Mon Sep 17 00:00:00 2001 From: Alex Boatwright Date: Thu, 30 Apr 2020 13:59:38 -0700 Subject: [PATCH 4/4] fix: load variables one at a time (#17875) --- ui/cypress/e2e/dashboardsView.test.ts | 111 ++++++++++++++++++ ui/cypress/support/commands.ts | 10 +- ui/src/shared/components/RefreshingView.tsx | 28 +---- ui/src/shared/components/TimeSeries.tsx | 43 ++++++- ui/src/variables/actions/thunks.ts | 43 +++---- .../variables/components/VariableDropdown.tsx | 25 +++- ui/src/variables/selectors/index.tsx | 10 +- ui/src/variables/utils/hydrateVars.test.ts | 1 - ui/src/variables/utils/hydrateVars.ts | 94 ++++++++++++--- 9 files changed, 276 insertions(+), 89 deletions(-) diff --git a/ui/cypress/e2e/dashboardsView.test.ts b/ui/cypress/e2e/dashboardsView.test.ts index 07a0483015..072466abfe 100644 --- a/ui/cypress/e2e/dashboardsView.test.ts +++ b/ui/cypress/e2e/dashboardsView.test.ts @@ -296,6 +296,117 @@ describe('Dashboard', () => { }) }) + /*\ + built to approximate an instance with docker metrics, + operating with the variables: + + depbuck: + from(bucket: v.buckets) + |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> filter(fn: (r) => r["_measurement"] == "docker_container_cpu") + |> keep(columns: ["container_name"]) + |> rename(columns: {"container_name": "_value"}) + |> last() + |> group() + + buckets: + buckets() + |> filter(fn: (r) => r.name !~ /^_/) + |> rename(columns: {name: "_value"}) + |> keep(columns: ["_value"]) + + and a dashboard built of : + cell one: + from(bucket: v.buckets) + |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> filter(fn: (r) => r["_measurement"] == "docker_container_cpu") + |> filter(fn: (r) => r["_field"] == "usage_percent") + + cell two: + from(bucket: v.buckets) + |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> filter(fn: (r) => r["_measurement"] == "docker_container_cpu") + |> filter(fn: (r) => r["_field"] == "usage_percent") + |> filter(fn: (r) => r["container_name"] == v.depbuck) + + with only 4 api queries being sent to fulfill it all + + \*/ + it('can load dependent queries without much fuss', () => { + cy.get('@org').then(({id: orgID}: Organization) => { + cy.createDashboard(orgID).then(({body: dashboard}) => { + const now = Date.now() + cy.writeData([ + `test,container_name=cool dopeness=12 ${now - 1000}000000`, + `test,container_name=beans dopeness=18 ${now - 1200}000000`, + `test,container_name=cool dopeness=14 ${now - 1400}000000`, + `test,container_name=beans dopeness=10 ${now - 1600}000000`, + ]) + cy.createCSVVariable(orgID, 'static', ['beans', 'defbuck']) + cy.createQueryVariable( + orgID, + 'dependent', + `from(bucket: v.static) + |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> filter(fn: (r) => r["_measurement"] == "test") + |> keep(columns: ["container_name"]) + |> rename(columns: {"container_name": "_value"}) + |> last() + |> group()` + ) + + cy.fixture('routes').then(({orgs}) => { + cy.visit(`${orgs}/${orgID}/dashboards/${dashboard.id}`) + }) + }) + }) + + cy.getByTestID('add-cell--button').click() + cy.getByTestID('switch-to-script-editor').should('be.visible') + cy.getByTestID('switch-to-script-editor').click() + cy.getByTestID('toolbar-tab').click() + + cy + .getByTestID('flux-editor') + .should('be.visible') + .click() + .focused().type(`from(bucket: v.static) +|> range(start: v.timeRangeStart, stop: v.timeRangeStop) +|> filter(fn: (r) => r["_measurement"] == "test") +|> filter(fn: (r) => r["_field"] == "dopeness") +|> filter(fn: (r) => r["container_name"] == v.dependent)`) + cy.getByTestID('save-cell--button').click() + + // the default bucket selection should have no results + cy.getByTestID('variable-dropdown') + .eq(0) + .should('contain', 'beans') + + // and cause the rest to exist in loading states + cy.getByTestID('variable-dropdown') + .eq(1) + .should('contain', 'Loading') + + cy.getByTestID('cell--view-empty') + + // But selecting a nonempty bucket should load some data + cy.getByTestID('variable-dropdown--button') + .eq(0) + .click() + cy.get(`#defbuck`).click() + + // default select the first result + cy.getByTestID('variable-dropdown') + .eq(1) + .should('contain', 'beans') + + // and also load the second result + cy.getByTestID('variable-dropdown--button') + .eq(1) + .click() + cy.get(`#cool`).click() + }) + it('can create a view through the API', () => { cy.get('@org').then(({id: orgID}: Organization) => { cy.createDashWithViewAndVar(orgID).then(() => { diff --git a/ui/cypress/support/commands.ts b/ui/cypress/support/commands.ts index 1d774638b3..cde6102006 100644 --- a/ui/cypress/support/commands.ts +++ b/ui/cypress/support/commands.ts @@ -173,13 +173,14 @@ export const createTask = ( export const createQueryVariable = ( orgID?: string, - name: string = 'Little Variable' + name: string = 'Little Variable', + query?: string ): Cypress.Chainable => { const argumentsObj = { type: 'query', values: { language: 'flux', - query: `filter(fn: (r) => r._field == "cpu")`, + query: query || `filter(fn: (r) => r._field == "cpu")`, }, } @@ -196,11 +197,12 @@ export const createQueryVariable = ( export const createCSVVariable = ( orgID?: string, - name: string = 'CSVVariable' + name: string = 'CSVVariable', + csv?: string[] ): Cypress.Chainable => { const argumentsObj = { type: 'constant', - values: ['c1', 'c2', 'c3', 'c4'], + values: csv || ['c1', 'c2', 'c3', 'c4'], } return cy.request({ diff --git a/ui/src/shared/components/RefreshingView.tsx b/ui/src/shared/components/RefreshingView.tsx index fe01082dfd..6d62fd3a42 100644 --- a/ui/src/shared/components/RefreshingView.tsx +++ b/ui/src/shared/components/RefreshingView.tsx @@ -11,11 +11,8 @@ import ViewSwitcher from 'src/shared/components/ViewSwitcher' // Utils import {GlobalAutoRefresher} from 'src/utils/AutoRefresher' import {getTimeRange} from 'src/dashboards/selectors' -import {getRangeVariable} from 'src/variables/utils/getTimeRangeVars' -import {getVariables, asAssignment} from 'src/variables/selectors' import {checkResultsLength} from 'src/shared/utils/vis' import {getActiveTimeRange} from 'src/timeMachine/selectors/index' -import {TIME_RANGE_START, TIME_RANGE_STOP} from 'src/variables/constants' // Types import { @@ -23,7 +20,6 @@ import { TimeZone, AppState, DashboardQuery, - VariableAssignment, QueryViewProperties, Theme, } from 'src/types' @@ -38,7 +34,6 @@ interface StateProps { timeRange: TimeRange ranges: TimeRange | null timeZone: TimeZone - variableAssignments: VariableAssignment[] } interface State { @@ -68,14 +63,7 @@ class RefreshingView extends PureComponent { } public render() { - const { - ranges, - properties, - manualRefresh, - timeZone, - variableAssignments, - theme, - } = this.props + const {ranges, properties, manualRefresh, timeZone, theme} = this.props const {submitToken} = this.state return ( @@ -83,7 +71,6 @@ class RefreshingView extends PureComponent { submitToken={submitToken} queries={this.queries} key={manualRefresh} - variables={variableAssignments} > {({ giraffeResult, @@ -152,18 +139,6 @@ class RefreshingView extends PureComponent { const mstp = (state: AppState, ownProps: OwnProps): StateProps => { const timeRange = getTimeRange(state) - - // NOTE: cannot use getAllVariables here because the TimeSeries - // component appends it automatically. That should be fixed - const vars = getVariables(state) - const variableAssignments = [ - ...vars, - getRangeVariable(TIME_RANGE_START, timeRange), - getRangeVariable(TIME_RANGE_STOP, timeRange), - ] - .map(v => asAssignment(v)) - .filter(v => !!v) - const ranges = getActiveTimeRange(timeRange, ownProps.properties.queries) const {timeZone, theme} = state.app.persisted @@ -171,7 +146,6 @@ const mstp = (state: AppState, ownProps: OwnProps): StateProps => { timeRange, ranges, timeZone, - variableAssignments, theme, } } diff --git a/ui/src/shared/components/TimeSeries.tsx b/ui/src/shared/components/TimeSeries.tsx index c2d5c19abb..caf1d7bd44 100644 --- a/ui/src/shared/components/TimeSeries.tsx +++ b/ui/src/shared/components/TimeSeries.tsx @@ -19,6 +19,10 @@ import { import {runStatusesQuery} from 'src/alerting/utils/statusEvents' // Utils +import {getTimeRange} from 'src/dashboards/selectors' +import {getVariables, asAssignment} from 'src/variables/selectors' +import {getRangeVariable} from 'src/variables/utils/getTimeRangeVars' +import {isInQuery} from 'src/variables/utils/hydrateVars' import {getWindowVars} from 'src/variables/utils/getWindowVars' import {buildVarsOption} from 'src/variables/utils/buildVarsOption' import 'intersection-observer' @@ -27,6 +31,7 @@ import {getOrgIDFromBuckets} from 'src/timeMachine/actions/queries' // Constants import {rateLimitReached, resultTooLarge} from 'src/shared/copy/notifications' +import {TIME_RANGE_START, TIME_RANGE_STOP} from 'src/variables/constants' // Actions import {notify as notifyAction} from 'src/shared/actions/notifications' @@ -39,6 +44,7 @@ import { Bucket, ResourceType, DashboardQuery, + Variable, VariableAssignment, AppState, CancelBox, @@ -57,6 +63,7 @@ interface QueriesState { interface StateProps { queryLink: string buckets: Bucket[] + variables: Variable[] } interface OwnProps { @@ -187,14 +194,23 @@ class TimeSeries extends Component { // Cancel any existing queries this.pendingResults.forEach(({cancel}) => cancel()) + const usedVars = variables.filter(v => v.arguments.type !== 'system') + const waitList = usedVars.filter(v => v.status !== RemoteDataState.Done) + // If a variable is loading, and a cell requires it, leave the cell to never resolve, + // keeping it in a loading state until the variable is resolved + if (usedVars.length && waitList.length) { + await new Promise(() => {}) + } + + const vars = variables.map(v => asAssignment(v)) // Issue new queries this.pendingResults = queries.map(({text}) => { const orgID = getOrgIDFromBuckets(text, buckets) || this.props.params.orgID - const windowVars = getWindowVars(text, variables) - const extern = buildVarsOption([...variables, ...windowVars]) + const windowVars = getWindowVars(text, vars) + const extern = buildVarsOption([...vars, ...windowVars]) return runQuery(orgID, text, extern) }) @@ -204,7 +220,7 @@ class TimeSeries extends Component { let statuses = [] as StatusRow[][] if (check) { - const extern = buildVarsOption(variables) + const extern = buildVarsOption(vars) this.pendingCheckStatuses = runStatusesQuery( this.props.params.orgID, check.id, @@ -291,12 +307,27 @@ class TimeSeries extends Component { } } -const mstp = (state: AppState): StateProps => { - const {links} = state +const mstp = (state: AppState, props: OwnProps): StateProps => { + const timeRange = getTimeRange(state) + + // NOTE: cannot use getAllVariables here because the TimeSeries + // component appends it automatically. That should be fixed + // NOTE: limit the variables returned to those that are used, + // as this prevents resending when other queries get sent + const queries = props.queries.map(q => q.text).filter(t => !!t.trim()) + const vars = getVariables(state).filter(v => + queries.some(t => isInQuery(t, v)) + ) + const variables = [ + ...vars, + getRangeVariable(TIME_RANGE_START, timeRange), + getRangeVariable(TIME_RANGE_STOP, timeRange), + ] return { - queryLink: links.query.self, + queryLink: state.links.query.self, buckets: getAll(state, ResourceType.Buckets), + variables, } } diff --git a/ui/src/variables/actions/thunks.ts b/ui/src/variables/actions/thunks.ts index 74384ef6a8..06d829471e 100644 --- a/ui/src/variables/actions/thunks.ts +++ b/ui/src/variables/actions/thunks.ts @@ -118,6 +118,7 @@ export const getVariables = () => async ( } } +// TODO: make this context aware export const hydrateVariables = (skipCache?: boolean) => async ( dispatch: Dispatch, getState: GetState @@ -125,37 +126,25 @@ export const hydrateVariables = (skipCache?: boolean) => async ( const state = getState() const org = getOrg(state) const vars = getVariablesFromState(state) - const vals = await hydrateVars(vars, getAllVariablesFromState(state), { + const hydration = hydrateVars(vars, getAllVariablesFromState(state), { orgID: org.id, url: state.links.query.self, skipCache, - }).promise - - const lookup = vals.reduce((prev, curr) => { - prev[curr.id] = curr - return prev - }, {}) - - const updated = vars.map(vari => { - if (!lookup.hasOwnProperty(vari.id)) { - return vari - } - - return lookup[vari.id] }) - - await dispatch( - setVariables(RemoteDataState.Done, { - result: updated.map(v => v.id), - entities: { - variables: updated.reduce((prev, curr) => { - prev[curr.id] = curr - - return prev - }, {}), - }, - }) - ) + hydration.on('status', (variable, status) => { + if (status === RemoteDataState.Loading) { + dispatch(setVariable(variable.id, status)) + return + } + if (status === RemoteDataState.Done) { + const _variable = normalize( + variable, + variableSchema + ) + dispatch(setVariable(variable.id, RemoteDataState.Done, _variable)) + } + }) + await hydration.promise } export const getVariable = (id: string) => async ( diff --git a/ui/src/variables/components/VariableDropdown.tsx b/ui/src/variables/components/VariableDropdown.tsx index 95bc54e1c4..7abcdbfb2f 100644 --- a/ui/src/variables/components/VariableDropdown.tsx +++ b/ui/src/variables/components/VariableDropdown.tsx @@ -16,11 +16,12 @@ import {selectValue} from 'src/variables/actions/thunks' import {getVariable, normalizeValues} from 'src/variables/selectors' // Types -import {AppState} from 'src/types' +import {AppState, RemoteDataState} from 'src/types' interface StateProps { values: string[] selectedValue: string + status: RemoteDataState } interface DispatchProps { @@ -56,7 +57,7 @@ class VariableDropdown extends PureComponent { testID="variable-dropdown--button" status={dropdownStatus} > - {selectedValue || 'No Values'} + {this.selectedText} )} menu={onCollapse => ( @@ -94,15 +95,31 @@ class VariableDropdown extends PureComponent { onSelect() } } + + private get selectedText() { + const {selectedValue, status} = this.props + if (status === RemoteDataState.Loading) { + return 'Loading' + } + + if (selectedValue) { + return selectedValue + } + + return 'No Values' + } } const mstp = (state: AppState, props: OwnProps): StateProps => { const {variableID} = props - const variable = getVariable(state, variableID) + const selected = + variable.selected && variable.selected.length ? variable.selected[0] : null + return { + status: variable.status, values: normalizeValues(variable), - selectedValue: variable.selected[0], + selectedValue: selected, } } diff --git a/ui/src/variables/selectors/index.tsx b/ui/src/variables/selectors/index.tsx index 8b8385afbf..500ba8089a 100644 --- a/ui/src/variables/selectors/index.tsx +++ b/ui/src/variables/selectors/index.tsx @@ -165,12 +165,16 @@ export const getVariable = (state: AppState, variableID: string): Variable => { // Now validate that the selected value makes sense for // the current situation const vals = normalizeValues(vari) - if (vari.selected && !vals.includes(vari.selected[0])) { + vari = {...vari} + if ( + !vari.selected || + (vari.selected && vari.selected.length && !vals.includes(vari.selected[0])) + ) { vari.selected = [] } - if ((!vari.selected || !vari.selected.length) && vals.length) { - vari.selected = [vals[0]] + if (!vari.selected.length && vals.length) { + vari.selected.push(vals[0]) } return vari diff --git a/ui/src/variables/utils/hydrateVars.test.ts b/ui/src/variables/utils/hydrateVars.test.ts index 168aa80bb6..50b058fc41 100644 --- a/ui/src/variables/utils/hydrateVars.test.ts +++ b/ui/src/variables/utils/hydrateVars.test.ts @@ -237,7 +237,6 @@ describe('hydrate vars', () => { // Basic test for now, we would need an icky mock to assert that the // appropriate substitution is actually taking place - expect( actual.filter(v => v.id === 'a')[0].arguments.values.results ).toEqual(['aVal']) diff --git a/ui/src/variables/utils/hydrateVars.ts b/ui/src/variables/utils/hydrateVars.ts index 9d57226900..30f91f8f13 100644 --- a/ui/src/variables/utils/hydrateVars.ts +++ b/ui/src/variables/utils/hydrateVars.ts @@ -32,17 +32,25 @@ interface HydrateVarsOptions { skipCache?: boolean } +export interface EventedCancelBox extends CancelBox { + on?: any +} + export const createVariableGraph = ( allVariables: Variable[] ): VariableNode[] => { const nodesByID: {[variableID: string]: VariableNode} = allVariables.reduce( (prev, curr) => { + let status = RemoteDataState.Done + if (curr.arguments.type === 'query') { + status = RemoteDataState.NotStarted + } prev[curr.id] = { variable: curr, values: null, parents: [], children: [], - status: RemoteDataState.NotStarted, + status, cancel: () => {}, } return prev @@ -179,9 +187,11 @@ export const collectDescendants = ( This assumes that every descendant of this node has already been hydrated. */ +// TODO: figure out how to type the `on` function const hydrateVarsHelper = async ( node: VariableNode, - options: HydrateVarsOptions + options: HydrateVarsOptions, + on?: any ): Promise => { const variableType = node.variable.arguments.type @@ -202,6 +212,28 @@ const hydrateVarsHelper = async ( } } + if (variableType === 'system') { + return { + valueType: 'string', + values: node.variable.arguments.values, + selected: node.variable.selected, + } + } + + if (node.status !== RemoteDataState.Loading) { + node.status = RemoteDataState.Loading + on.fire('status', node.variable, node.status) + + collectAncestors(node) + .filter(parent => parent.variable.arguments.type === 'query') + .forEach(parent => { + if (parent.status !== RemoteDataState.Loading) { + parent.status = RemoteDataState.Loading + on.fire('status', parent.variable, parent.status) + } + }) + } + const descendants = collectDescendants(node) const assignments = descendants .map(node => asAssignment(node.variable)) @@ -225,6 +257,9 @@ const hydrateVarsHelper = async ( const values = await request.promise + // NOTE: do not fire `done` event here, as the value + // has not been properly hydrated yet + node.status = RemoteDataState.Done return values } @@ -233,17 +268,13 @@ const hydrateVarsHelper = async ( resolved (successfully or not). */ const readyToResolve = (parent: VariableNode): boolean => - parent.status === RemoteDataState.NotStarted && parent.children.every(child => child.status === RemoteDataState.Done) /* Find all `NotStarted` nodes in the graph that have no children. */ const findLeaves = (graph: VariableNode[]): VariableNode[] => - graph.filter( - node => - node.children.length === 0 && node.status === RemoteDataState.NotStarted - ) + graph.filter(node => node.children.length === 0) /* Given a node, attempt to find a cycle that the node is a part of. If no cycle @@ -359,8 +390,11 @@ export const hydrateVars = ( variables: Variable[], allVariables: Variable[], options: HydrateVarsOptions -): CancelBox => { - const graph = findSubgraph(createVariableGraph(allVariables), variables) +): EventedCancelBox => { + const graph = findSubgraph( + createVariableGraph(allVariables), + variables + ).filter(n => n.variable.arguments.type !== 'system') invalidateCycles(graph) let isCancelled = false @@ -370,11 +404,9 @@ export const hydrateVars = ( return } - node.status === RemoteDataState.Loading - try { // TODO: terminate the concept of node.values at the fetcher and just use variables - node.values = await hydrateVarsHelper(node, options) + node.values = await hydrateVarsHelper(node, options, on) if (node.variable.arguments.type === 'query') { node.variable.arguments.values.results = node.values.values as string[] @@ -407,7 +439,7 @@ export const hydrateVars = ( node.variable.selected = node.values.selected } - node.status = RemoteDataState.Done + on.fire('status', node.variable, node.status) return Promise.all(node.parents.filter(readyToResolve).map(resolve)) } catch (e) { @@ -430,9 +462,37 @@ export const hydrateVars = ( deferred.reject(new CancellationError()) } - Promise.all(findLeaves(graph).map(resolve)).then(() => { - deferred.resolve(extractResult(graph)) - }) + const on = (function() { + const callbacks = {} + const ret = (evt, cb) => { + if (!callbacks.hasOwnProperty(evt)) { + callbacks[evt] = [] + } - return {promise: deferred.promise, cancel} + callbacks[evt].push(cb) + } + + ret.fire = (evt, ...args) => { + if (!callbacks.hasOwnProperty(evt)) { + return + } + + callbacks[evt].forEach(cb => cb.apply(cb, args)) + } + + return ret + })() + + // NOTE: wrapping in a resolve disconnects the following findLeaves + // from the main execution thread, allowing external services to + // register listeners for the loading state changes + Promise.resolve() + .then(() => { + return Promise.all(findLeaves(graph).map(resolve)) + }) + .then(() => { + deferred.resolve(extractResult(graph)) + }) + + return {promise: deferred.promise, cancel, on} }