Merge pull request #17925 from influxdata/chore/merge-master

chore: merge master into algo-w branch
pull/17948/head
jlapacik 2020-04-30 14:28:46 -07:00 committed by GitHub
commit 4fba49fde4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1099 additions and 140 deletions

View File

@ -715,6 +715,10 @@ func (f *fakePkgSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pk
panic("not implemented")
}
func (f *fakePkgSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) {
panic("not implemented")
}
func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) {
if f.createFn != nil {
return f.createFn(ctx, setters...)

View File

@ -179,6 +179,56 @@ func TestLauncher_Pkger(t *testing.T) {
assert.NotZero(t, newStack.CRUDLog)
})
t.Run("list stacks", func(t *testing.T) {
// seed platform with stacks
newStack1, err := svc.InitStack(ctx, l.User.ID, pkger.Stack{
OrgID: l.Org.ID,
Name: "first stack",
})
require.NoError(t, err)
newStack2, err := svc.InitStack(ctx, l.User.ID, pkger.Stack{
OrgID: l.Org.ID,
Name: "second stack",
})
require.NoError(t, err)
containsStack := func(t *testing.T, haystack []pkger.Stack, needle pkger.Stack) {
t.Helper()
for _, hay := range haystack {
if hay.ID == needle.ID {
return
}
}
require.FailNowf(t, "did not find expected stack", "got: %+v", needle)
}
t.Run("returns all stacks when no filter args provided", func(t *testing.T) {
stacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{})
require.NoError(t, err)
containsStack(t, stacks, newStack1)
containsStack(t, stacks, newStack2)
})
t.Run("filters stacks by ID filter", func(t *testing.T) {
stacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{
StackIDs: []influxdb.ID{newStack1.ID},
})
require.NoError(t, err)
require.Len(t, stacks, 1)
containsStack(t, stacks, newStack1)
})
t.Run("filter stacks by names", func(t *testing.T) {
stacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{
Names: []string{newStack2.Name},
})
require.NoError(t, err)
require.Len(t, stacks, 1)
containsStack(t, stacks, newStack2)
})
})
t.Run("apply with only a stackID succeeds when stack has URLs", func(t *testing.T) {
svr := httptest.NewServer(nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) {
pkg := newPkg(newBucketObject("bucket_0", "", ""))

View File

@ -12,7 +12,7 @@ import (
"strings"
platform "github.com/influxdata/influxdb/v2"
"github.com/pkg/errors"
khttp "github.com/influxdata/influxdb/v2/kit/transport/http"
)
// AuthzError is returned for authorization errors. When this error type is returned,
@ -58,11 +58,13 @@ func CheckError(resp *http.Response) (err error) {
}
}
perr := &platform.Error{
Code: khttp.StatusCodeToErrorCode(resp.StatusCode),
}
if resp.StatusCode == http.StatusUnsupportedMediaType {
return &platform.Error{
Code: platform.EInvalid,
Msg: fmt.Sprintf("invalid media type: %q", resp.Header.Get("Content-Type")),
}
perr.Msg = fmt.Sprintf("invalid media type: %q", resp.Header.Get("Content-Type"))
return perr
}
contentType := resp.Header.Get("Content-Type")
@ -74,24 +76,32 @@ func CheckError(resp *http.Response) (err error) {
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return &platform.Error{
Code: platform.EInternal,
Msg: err.Error(),
}
perr.Msg = "failed to read error response"
perr.Err = err
return perr
}
switch mediatype {
case "application/json":
pe := new(platform.Error)
if err := json.Unmarshal(buf.Bytes(), pe); err != nil {
line, _ := buf.ReadString('\n')
return errors.Wrap(stderrors.New(strings.TrimSuffix(line, "\n")), err.Error())
if err := json.Unmarshal(buf.Bytes(), perr); err != nil {
perr.Msg = fmt.Sprintf("attempted to unmarshal error as JSON but failed: %q", err)
perr.Err = firstLineAsError(buf)
}
return pe
default:
line, _ := buf.ReadString('\n')
return stderrors.New(strings.TrimSuffix(line, "\n"))
perr.Err = firstLineAsError(buf)
}
if perr.Code == "" {
// given it was unset during attempt to unmarshal as JSON
perr.Code = khttp.StatusCodeToErrorCode(resp.StatusCode)
}
return perr
}
func firstLineAsError(buf bytes.Buffer) error {
line, _ := buf.ReadString('\n')
return stderrors.New(strings.TrimSuffix(line, "\n"))
}
// UnauthorizedError encodes a error message and status code for unauthorized access.

View File

@ -12,7 +12,6 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/http"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/pkg/errors"
)
func TestCheckError(t *testing.T) {
@ -43,7 +42,10 @@ func TestCheckError(t *testing.T) {
w.WriteHeader(500)
_, _ = io.WriteString(w, "upstream timeout\n")
},
want: stderrors.New("upstream timeout"),
want: &influxdb.Error{
Code: influxdb.EInternal,
Err: stderrors.New("upstream timeout"),
},
},
{
name: "error with bad json",
@ -52,7 +54,44 @@ func TestCheckError(t *testing.T) {
w.WriteHeader(500)
_, _ = io.WriteString(w, "upstream timeout\n")
},
want: errors.Wrap(stderrors.New("upstream timeout"), "invalid character 'u' looking for beginning of value"),
want: &influxdb.Error{
Code: influxdb.EInternal,
Msg: `attempted to unmarshal error as JSON but failed: "invalid character 'u' looking for beginning of value"`,
Err: stderrors.New("upstream timeout"),
},
},
{
name: "error with no content-type (encoded as json - with code)",
write: func(w *httptest.ResponseRecorder) {
w.WriteHeader(500)
_, _ = io.WriteString(w, `{"error": "service unavailable", "code": "unavailable"}`)
},
want: &influxdb.Error{
Code: influxdb.EUnavailable,
Err: stderrors.New("service unavailable"),
},
},
{
name: "error with no content-type (encoded as json - no code)",
write: func(w *httptest.ResponseRecorder) {
w.WriteHeader(503)
_, _ = io.WriteString(w, `{"error": "service unavailable"}`)
},
want: &influxdb.Error{
Code: influxdb.EUnavailable,
Err: stderrors.New("service unavailable"),
},
},
{
name: "error with no content-type (not json encoded)",
write: func(w *httptest.ResponseRecorder) {
w.WriteHeader(503)
},
want: &influxdb.Error{
Code: influxdb.EUnavailable,
Msg: `attempted to unmarshal error as JSON but failed: "unexpected end of JSON input"`,
Err: stderrors.New(""),
},
},
} {
t.Run(tt.name, func(t *testing.T) {

View File

@ -4375,6 +4375,84 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/packages/stacks:
get:
operationId: ListStacks
tags:
- InfluxPackages
summary: Grab a list of installed Influx packages
parameters:
- in: query
name: orgID
required: true
schema:
type: string
description: The organization id of the stacks
- in: query
name: name
schema:
type: string
description: A collection of names to filter the list by.
- in: query
name: stackID
schema:
type: string
description: A collection of stackIDs to filter the list by.
responses:
'200':
description: Influx stacks found
content:
application/json:
schema:
type: array
items:
type: object
properties:
id:
type: string
orgID:
type: string
name:
type: string
description:
type: string
urls:
type: array
items:
type: string
createdAt:
type: string
format: date-time
readOnly: true
updatedAt:
type: string
format: date-time
readOnly: true
resources:
type: object
properties:
apiVersion:
type: string
resourceID:
type: string
kind:
type: string
pkgName:
type: string
associations:
type: array
items:
type: object
properties:
kind:
type: string
pkgName:
type: string
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
post:
operationId: CreateStack
tags:

View File

@ -87,19 +87,15 @@ func NewAPI(opts ...APIOptFn) *API {
}
},
errFn: func(err error) (interface{}, int, error) {
code := influxdb.ErrorCode(err)
httpStatusCode, ok := statusCodePlatformError[code]
if !ok {
httpStatusCode = http.StatusBadRequest
}
msg := err.Error()
if msg == "" {
msg = "an internal error has occurred"
}
code := influxdb.ErrorCode(err)
return ErrBody{
Code: code,
Msg: msg,
}, httpStatusCode, nil
}, ErrorCodeToStatusCode(code), nil
},
}
for _, o := range opts {
@ -242,19 +238,3 @@ type ErrBody struct {
Code string `json:"code"`
Msg string `json:"message"`
}
// statusCodePlatformError is the map convert platform.Error to error
var statusCodePlatformError = map[string]int{
influxdb.EInternal: http.StatusInternalServerError,
influxdb.EInvalid: http.StatusBadRequest,
influxdb.EUnprocessableEntity: http.StatusUnprocessableEntity,
influxdb.EEmptyValue: http.StatusBadRequest,
influxdb.EConflict: http.StatusUnprocessableEntity,
influxdb.ENotFound: http.StatusNotFound,
influxdb.EUnavailable: http.StatusServiceUnavailable,
influxdb.EForbidden: http.StatusForbidden,
influxdb.ETooManyRequests: http.StatusTooManyRequests,
influxdb.EUnauthorized: http.StatusUnauthorized,
influxdb.EMethodNotAllowed: http.StatusMethodNotAllowed,
influxdb.ETooLarge: http.StatusRequestEntityTooLarge,
}

View File

@ -21,13 +21,9 @@ func (h ErrorHandler) HandleHTTPError(ctx context.Context, err error, w http.Res
}
code := influxdb.ErrorCode(err)
httpCode, ok := statusCodePlatformError[code]
if !ok {
httpCode = http.StatusBadRequest
}
w.Header().Set(PlatformErrorCodeHeader, code)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(httpCode)
w.WriteHeader(ErrorCodeToStatusCode(code))
var e struct {
Code string `json:"code"`
Message string `json:"message"`
@ -41,3 +37,49 @@ func (h ErrorHandler) HandleHTTPError(ctx context.Context, err error, w http.Res
b, _ := json.Marshal(e)
_, _ = w.Write(b)
}
// StatusCodeToErrorCode maps a http status code integer to an
// influxdb error code string.
func StatusCodeToErrorCode(statusCode int) string {
errorCode, ok := httpStatusCodeToInfluxDBError[statusCode]
if ok {
return errorCode
}
return influxdb.EInternal
}
// ErrorCodeToStatusCode maps an influxdb error code string to a
// http status code integer.
func ErrorCodeToStatusCode(code string) int {
statusCode, ok := influxDBErrorToStatusCode[code]
if ok {
return statusCode
}
return http.StatusInternalServerError
}
// influxDBErrorToStatusCode is a mapping of ErrorCode to http status code.
var influxDBErrorToStatusCode = map[string]int{
influxdb.EInternal: http.StatusInternalServerError,
influxdb.EInvalid: http.StatusBadRequest,
influxdb.EUnprocessableEntity: http.StatusUnprocessableEntity,
influxdb.EEmptyValue: http.StatusBadRequest,
influxdb.EConflict: http.StatusUnprocessableEntity,
influxdb.ENotFound: http.StatusNotFound,
influxdb.EUnavailable: http.StatusServiceUnavailable,
influxdb.EForbidden: http.StatusForbidden,
influxdb.ETooManyRequests: http.StatusTooManyRequests,
influxdb.EUnauthorized: http.StatusUnauthorized,
influxdb.EMethodNotAllowed: http.StatusMethodNotAllowed,
influxdb.ETooLarge: http.StatusRequestEntityTooLarge,
}
var httpStatusCodeToInfluxDBError = map[int]string{}
func init() {
for k, v := range influxDBErrorToStatusCode {
httpStatusCodeToInfluxDBError[v] = k
}
}

View File

@ -55,6 +55,27 @@ func (s *HTTPRemoteService) InitStack(ctx context.Context, userID influxdb.ID, s
return newStack, nil
}
func (s *HTTPRemoteService) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
queryParams := [][2]string{{"orgID", orgID.String()}}
for _, name := range f.Names {
queryParams = append(queryParams, [2]string{"name", name})
}
for _, stackID := range f.StackIDs {
queryParams = append(queryParams, [2]string{"stackID", stackID.String()})
}
var resp RespListStacks
err := s.Client.
Get(RoutePrefix, "/stacks").
QueryParams(queryParams...).
DecodeJSON(&resp).
Do(ctx)
if err != nil {
return nil, err
}
return resp.Stacks, nil
}
// CreatePkg will produce a pkg from the parameters provided.
func (s *HTTPRemoteService) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) {
var opt CreateOpt

View File

@ -50,7 +50,10 @@ func NewHTTPServer(log *zap.Logger, svc SVC) *HTTPServer {
Post("/", svr.createPkg)
r.With(middleware.SetHeader("Content-Type", "application/json; charset=utf-8")).
Post("/apply", svr.applyPkg)
r.Post("/stacks", svr.createStack)
r.Route("/stacks", func(r chi.Router) {
r.Post("/", svr.createStack)
r.Get("/", svr.listStacks)
})
}
svr.Router = r
@ -62,6 +65,65 @@ func (s *HTTPServer) Prefix() string {
return RoutePrefix
}
// RespListStacks is the HTTP response for a stack list call.
type RespListStacks struct {
Stacks []Stack `json:"stacks"`
}
func (s *HTTPServer) listStacks(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
rawOrgID := q.Get("orgID")
orgID, err := influxdb.IDFromString(rawOrgID)
if err != nil {
s.api.Err(w, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("organization id[%q] is invalid", rawOrgID),
Err: err,
})
return
}
if err := r.ParseForm(); err != nil {
s.api.Err(w, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "failed to parse form from encoded url",
Err: err,
})
return
}
filter := ListFilter{
Names: r.Form["name"],
}
for _, idRaw := range r.Form["stackID"] {
id, err := influxdb.IDFromString(idRaw)
if err != nil {
s.api.Err(w, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("stack ID[%q] provided is invalid", idRaw),
Err: err,
})
return
}
filter.StackIDs = append(filter.StackIDs, *id)
}
stacks, err := s.svc.ListStacks(r.Context(), *orgID, filter)
if err != nil {
s.api.Err(w, err)
return
}
if stacks == nil {
stacks = []Stack{}
}
s.api.Respond(w, http.StatusOK, RespListStacks{
Stacks: stacks,
})
}
// ReqCreateStack is a request body for a create stack call.
type ReqCreateStack struct {
OrgID string `json:"orgID"`

View File

@ -559,6 +559,189 @@ func TestPkgerHTTPServer(t *testing.T) {
}
})
})
t.Run("list a stack", func(t *testing.T) {
t.Run("should successfully return with valid req body", func(t *testing.T) {
const expectedOrgID = 3
svc := &fakeSVC{
listStacksFn: func(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) {
if orgID != expectedOrgID {
return nil, nil
}
if len(filter.Names) > 0 && len(filter.StackIDs) == 0 {
var stacks []pkger.Stack
for i, name := range filter.Names {
stacks = append(stacks, pkger.Stack{
ID: influxdb.ID(i + 1),
OrgID: expectedOrgID,
Name: name,
})
}
return stacks, nil
}
if len(filter.StackIDs) > 0 && len(filter.Names) == 0 {
var stacks []pkger.Stack
for _, stackID := range filter.StackIDs {
stacks = append(stacks, pkger.Stack{
ID: stackID,
OrgID: expectedOrgID,
})
}
return stacks, nil
}
return []pkger.Stack{{
ID: 1,
OrgID: expectedOrgID,
Name: "stack_1",
}}, nil
},
}
pkgHandler := pkger.NewHTTPServer(zap.NewNop(), svc)
svr := newMountedHandler(pkgHandler, 1)
tests := []struct {
name string
queryArgs string
expectedStacks []pkger.Stack
}{
{
name: "with org ID that has stacks",
queryArgs: "orgID=" + influxdb.ID(expectedOrgID).String(),
expectedStacks: []pkger.Stack{{
ID: 1,
OrgID: expectedOrgID,
Name: "stack_1",
}},
},
{
name: "with orgID with no stacks",
queryArgs: "orgID=" + influxdb.ID(9000).String(),
expectedStacks: []pkger.Stack{},
},
{
name: "with names",
queryArgs: "name=name_stack&name=threeve&orgID=" + influxdb.ID(expectedOrgID).String(),
expectedStacks: []pkger.Stack{
{
ID: 1,
OrgID: expectedOrgID,
Name: "name_stack",
},
{
ID: 2,
OrgID: expectedOrgID,
Name: "threeve",
},
},
},
{
name: "with ids",
queryArgs: fmt.Sprintf("stackID=%s&stackID=%s&orgID=%s", influxdb.ID(1), influxdb.ID(2), influxdb.ID(expectedOrgID)),
expectedStacks: []pkger.Stack{
{
ID: 1,
OrgID: expectedOrgID,
},
{
ID: 2,
OrgID: expectedOrgID,
},
},
},
}
for _, tt := range tests {
fn := func(t *testing.T) {
testttp.
Get(t, "/api/v2/packages/stacks?"+tt.queryArgs).
Headers("Content-Type", "application/x-www-form-urlencoded").
Do(svr).
ExpectStatus(http.StatusOK).
ExpectBody(func(buf *bytes.Buffer) {
var resp pkger.RespListStacks
decodeBody(t, buf, &resp)
assert.Equal(t, tt.expectedStacks, resp.Stacks)
})
}
t.Run(tt.name, fn)
}
})
t.Run("error cases", func(t *testing.T) {
tests := []struct {
name string
reqBody pkger.ReqCreateStack
expectedStatus int
svc pkger.SVC
}{
{
name: "bad org id",
reqBody: pkger.ReqCreateStack{
OrgID: "invalid id",
},
expectedStatus: http.StatusBadRequest,
},
{
name: "bad url",
reqBody: pkger.ReqCreateStack{
OrgID: influxdb.ID(3).String(),
URLs: []string{"invalid @% url"},
},
expectedStatus: http.StatusBadRequest,
},
{
name: "translates svc conflict error",
reqBody: pkger.ReqCreateStack{OrgID: influxdb.ID(3).String()},
svc: &fakeSVC{
initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
return pkger.Stack{}, &influxdb.Error{Code: influxdb.EConflict}
},
},
expectedStatus: http.StatusUnprocessableEntity,
},
{
name: "translates svc internal error",
reqBody: pkger.ReqCreateStack{OrgID: influxdb.ID(3).String()},
svc: &fakeSVC{
initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
return pkger.Stack{}, &influxdb.Error{Code: influxdb.EInternal}
},
},
expectedStatus: http.StatusInternalServerError,
},
}
for _, tt := range tests {
fn := func(t *testing.T) {
svc := tt.svc
if svc == nil {
svc = &fakeSVC{
initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
return stack, nil
},
}
}
pkgHandler := pkger.NewHTTPServer(zap.NewNop(), svc)
svr := newMountedHandler(pkgHandler, 1)
testttp.
PostJSON(t, "/api/v2/packages/stacks", tt.reqBody).
Headers("Content-Type", "application/json").
Do(svr).
ExpectStatus(tt.expectedStatus)
}
t.Run(tt.name, fn)
}
})
})
}
func bucketPkgKinds(t *testing.T, encoding pkger.Encoding) []byte {
@ -639,9 +822,10 @@ func decodeBody(t *testing.T, r io.Reader, v interface{}) {
}
type fakeSVC struct {
initStack func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
initStack func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error)
listStacksFn func(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
}
var _ pkger.SVC = (*fakeSVC)(nil)
@ -653,6 +837,13 @@ func (f *fakeSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger
return f.initStack(ctx, userID, stack)
}
func (f *fakeSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) {
if f.listStacksFn == nil {
panic("not implemented")
}
return f.listStacksFn(ctx, orgID, filter)
}
func (f *fakeSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) {
panic("not implemented")
}

View File

@ -59,6 +59,8 @@ const ResourceTypeStack influxdb.ResourceType = "stack"
// SVC is the packages service interface.
type SVC interface {
InitStack(ctx context.Context, userID influxdb.ID, stack Stack) (Stack, error)
ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error)
CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error)
DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error)
Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error)
@ -199,6 +201,7 @@ func WithVariableSVC(varSVC influxdb.VariableService) ServiceSetterFn {
// Store is the storage behavior the Service depends on.
type Store interface {
CreateStack(ctx context.Context, stack Stack) error
ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error)
ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error)
UpdateStack(ctx context.Context, stack Stack) error
DeleteStack(ctx context.Context, id influxdb.ID) error
@ -295,6 +298,17 @@ func (s *Service) InitStack(ctx context.Context, userID influxdb.ID, stack Stack
return stack, nil
}
// ListFilter are filter options for filtering stacks from being returned.
type ListFilter struct {
StackIDs []influxdb.ID
Names []string
}
// ListStacks returns a list of stacks.
func (s *Service) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
return s.store.ListStacks(ctx, orgID, f)
}
type (
// CreatePkgSetFn is a functional input for setting the pkg fields.
CreatePkgSetFn func(opt *CreateOpt) error

View File

@ -36,6 +36,14 @@ func (s *authMW) InitStack(ctx context.Context, userID influxdb.ID, newStack Sta
return s.next.InitStack(ctx, userID, newStack)
}
func (s *authMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
err := s.authAgent.OrgPermissions(ctx, orgID, influxdb.ReadAction)
if err != nil {
return nil, err
}
return s.next.ListStacks(ctx, orgID, f)
}
func (s *authMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) {
return s.next.CreatePkg(ctx, setters...)
}

View File

@ -34,15 +34,38 @@ func (s *loggingMW) InitStack(ctx context.Context, userID influxdb.ID, newStack
s.logger.Error(
"failed to init stack",
zap.Error(err),
zap.Duration("took", time.Since(start)),
zap.Stringer("orgID", newStack.OrgID),
zap.Stringer("userID", userID),
zap.Strings("urls", newStack.URLs),
zap.Duration("took", time.Since(start)),
)
}(time.Now())
return s.next.InitStack(ctx, userID, newStack)
}
func (s *loggingMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) (stacks []Stack, err error) {
defer func(start time.Time) {
if err == nil {
return
}
var stackIDs []string
for _, id := range f.StackIDs {
stackIDs = append(stackIDs, id.String())
}
s.logger.Error(
"failed to list stacks",
zap.Error(err),
zap.Stringer("orgID", orgID),
zap.Strings("stackIDs", stackIDs),
zap.Strings("names", f.Names),
zap.Duration("took", time.Since(start)),
)
}(time.Now())
return s.next.ListStacks(ctx, orgID, f)
}
func (s *loggingMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))

View File

@ -33,6 +33,12 @@ func (s *mwMetrics) InitStack(ctx context.Context, userID influxdb.ID, newStack
return stack, rec(err)
}
func (s *mwMetrics) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
rec := s.rec.Record("list_stacks")
stacks, err := s.next.ListStacks(ctx, orgID, f)
return stacks, rec(err)
}
func (s *mwMetrics) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) {
rec := s.rec.Record("create_pkg")
pkg, err := s.next.CreatePkg(ctx, setters...)

View File

@ -3234,6 +3234,10 @@ func (s *fakeStore) CreateStack(ctx context.Context, stack Stack) error {
panic("not implemented")
}
func (s *fakeStore) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
panic("not implemented")
}
func (s *fakeStore) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) {
panic("not implemented")
}

View File

@ -5,6 +5,7 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/opentracing/opentracing-go/log"
)
type traceMW struct {
@ -26,6 +27,18 @@ func (s *traceMW) InitStack(ctx context.Context, userID influxdb.ID, newStack St
return s.next.InitStack(ctx, userID, newStack)
}
func (s *traceMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "ListStacks")
defer span.Finish()
stacks, err := s.next.ListStacks(ctx, orgID, f)
span.LogFields(
log.String("org_id", orgID.String()),
log.Int("num_stacks", len(stacks)),
)
return stacks, err
}
func (s *traceMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) {
span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "CreatePkg")
defer span.Finish()

View File

@ -1,6 +1,7 @@
package pkger
import (
"bytes"
"context"
"encoding/json"
"time"
@ -75,6 +76,92 @@ func (s *StoreKV) CreateStack(ctx context.Context, stack Stack) error {
return s.put(ctx, stack, kv.PutNew())
}
// ListStacks returns a list of stacks.
func (s *StoreKV) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
if len(f.StackIDs) > 0 && len(f.Names) == 0 {
return s.listStacksByID(ctx, orgID, f.StackIDs)
}
filterFn, err := storeListFilterFn(orgID, f)
if err != nil {
return nil, err
}
var stacks []Stack
err = s.kvStore.View(ctx, func(tx kv.Tx) error {
return s.indexBase.Find(ctx, tx, kv.FindOpts{
CaptureFn: func(key []byte, decodedVal interface{}) error {
stack, err := convertStackEntToStack(decodedVal.(*entStack))
if err != nil {
return err
}
stacks = append(stacks, stack)
return nil
},
FilterEntFn: func(key []byte, decodedVal interface{}) bool {
st := decodedVal.(*entStack)
return filterFn(st)
},
})
})
if err != nil {
return nil, err
}
return stacks, nil
}
func storeListFilterFn(orgID influxdb.ID, f ListFilter) (func(*entStack) bool, error) {
orgIDEncoded, err := orgID.Encode()
if err != nil {
return nil, err
}
mIDs := make(map[string]bool)
for _, id := range f.StackIDs {
b, err := id.Encode()
if err != nil {
return nil, err
}
mIDs[string(b)] = true
}
mNames := make(map[string]bool)
for _, name := range f.Names {
mNames[name] = true
}
optionalFieldFilterFn := func(ent *entStack) bool {
if len(mIDs) > 0 || len(mNames) > 0 {
return mIDs[string(ent.ID)] || mNames[ent.Name]
}
return true
}
return func(st *entStack) bool {
return bytes.Equal(orgIDEncoded, st.OrgID) && optionalFieldFilterFn(st)
}, nil
}
func (s *StoreKV) listStacksByID(ctx context.Context, orgID influxdb.ID, stackIDs []influxdb.ID) ([]Stack, error) {
var stacks []Stack
for _, id := range stackIDs {
st, err := s.ReadStackByID(ctx, id)
if influxdb.ErrorCode(err) == influxdb.ENotFound {
// since the stackIDs are a filter, if it is not found, we just continue
// on. If the user wants to verify the existence of a particular stack
// then it would be upon them to use the ReadByID call.
continue
}
if err != nil {
return nil, err
}
if orgID != st.OrgID {
continue
}
stacks = append(stacks, st)
}
return stacks, nil
}
// ReadStackByID reads a stack by the provided ID.
func (s *StoreKV) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) {
var stack Stack

View File

@ -83,6 +83,146 @@ func TestStoreKV(t *testing.T) {
})
})
t.Run("list stacks", func(t *testing.T) {
defer inMemStore.Flush(context.Background())
storeKV := pkger.NewStoreKV(inMemStore)
const orgID1 = 1
const orgID2 = 2
seedEntities(t, storeKV,
pkger.Stack{
ID: 1,
OrgID: orgID1,
Name: "first_name",
},
pkger.Stack{
ID: 2,
OrgID: orgID2,
Name: "first_name",
},
pkger.Stack{
ID: 3,
OrgID: orgID1,
Name: "second_name",
},
pkger.Stack{
ID: 4,
OrgID: orgID2,
Name: "second_name",
},
)
tests := []struct {
name string
orgID influxdb.ID
filter pkger.ListFilter
expected []pkger.Stack
}{
{
name: "by org id",
orgID: orgID1,
expected: []pkger.Stack{
{
ID: 1,
OrgID: orgID1,
Name: "first_name",
},
{
ID: 3,
OrgID: orgID1,
Name: "second_name",
},
},
},
{
name: "by stack ids",
orgID: orgID1,
filter: pkger.ListFilter{
StackIDs: []influxdb.ID{1, 3},
},
expected: []pkger.Stack{
{
ID: 1,
OrgID: orgID1,
Name: "first_name",
},
{
ID: 3,
OrgID: orgID1,
Name: "second_name",
},
},
},
{
name: "by stack ids skips ids that belong to different organization",
orgID: orgID1,
filter: pkger.ListFilter{
StackIDs: []influxdb.ID{1, 2, 4},
},
expected: []pkger.Stack{{
ID: 1,
OrgID: orgID1,
Name: "first_name",
}},
},
{
name: "stack ids that do not exist are skipped",
orgID: orgID1,
filter: pkger.ListFilter{
StackIDs: []influxdb.ID{1, 9000},
},
expected: []pkger.Stack{{
ID: 1,
OrgID: orgID1,
Name: "first_name",
}},
},
{
name: "by name",
orgID: orgID1,
filter: pkger.ListFilter{
Names: []string{"first_name"},
},
expected: []pkger.Stack{{
ID: 1,
OrgID: orgID1,
Name: "first_name",
}},
},
{
name: "by name and id",
orgID: orgID1,
filter: pkger.ListFilter{
StackIDs: []influxdb.ID{3},
Names: []string{"first_name"},
},
expected: []pkger.Stack{
{
ID: 1,
OrgID: orgID1,
Name: "first_name",
},
{
ID: 3,
OrgID: orgID1,
Name: "second_name",
},
},
},
}
for _, tt := range tests {
fn := func(t *testing.T) {
stacks, err := storeKV.ListStacks(context.Background(), tt.orgID, tt.filter)
require.NoError(t, err)
assert.Equal(t, tt.expected, stacks)
}
t.Run(tt.name, fn)
}
})
t.Run("read a stack", func(t *testing.T) {
defer inMemStore.Flush(context.Background())

View File

@ -296,6 +296,117 @@ describe('Dashboard', () => {
})
})
/*\
built to approximate an instance with docker metrics,
operating with the variables:
depbuck:
from(bucket: v.buckets)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "docker_container_cpu")
|> keep(columns: ["container_name"])
|> rename(columns: {"container_name": "_value"})
|> last()
|> group()
buckets:
buckets()
|> filter(fn: (r) => r.name !~ /^_/)
|> rename(columns: {name: "_value"})
|> keep(columns: ["_value"])
and a dashboard built of :
cell one:
from(bucket: v.buckets)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "docker_container_cpu")
|> filter(fn: (r) => r["_field"] == "usage_percent")
cell two:
from(bucket: v.buckets)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "docker_container_cpu")
|> filter(fn: (r) => r["_field"] == "usage_percent")
|> filter(fn: (r) => r["container_name"] == v.depbuck)
with only 4 api queries being sent to fulfill it all
\*/
it('can load dependent queries without much fuss', () => {
cy.get('@org').then(({id: orgID}: Organization) => {
cy.createDashboard(orgID).then(({body: dashboard}) => {
const now = Date.now()
cy.writeData([
`test,container_name=cool dopeness=12 ${now - 1000}000000`,
`test,container_name=beans dopeness=18 ${now - 1200}000000`,
`test,container_name=cool dopeness=14 ${now - 1400}000000`,
`test,container_name=beans dopeness=10 ${now - 1600}000000`,
])
cy.createCSVVariable(orgID, 'static', ['beans', 'defbuck'])
cy.createQueryVariable(
orgID,
'dependent',
`from(bucket: v.static)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "test")
|> keep(columns: ["container_name"])
|> rename(columns: {"container_name": "_value"})
|> last()
|> group()`
)
cy.fixture('routes').then(({orgs}) => {
cy.visit(`${orgs}/${orgID}/dashboards/${dashboard.id}`)
})
})
})
cy.getByTestID('add-cell--button').click()
cy.getByTestID('switch-to-script-editor').should('be.visible')
cy.getByTestID('switch-to-script-editor').click()
cy.getByTestID('toolbar-tab').click()
cy
.getByTestID('flux-editor')
.should('be.visible')
.click()
.focused().type(`from(bucket: v.static)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "test")
|> filter(fn: (r) => r["_field"] == "dopeness")
|> filter(fn: (r) => r["container_name"] == v.dependent)`)
cy.getByTestID('save-cell--button').click()
// the default bucket selection should have no results
cy.getByTestID('variable-dropdown')
.eq(0)
.should('contain', 'beans')
// and cause the rest to exist in loading states
cy.getByTestID('variable-dropdown')
.eq(1)
.should('contain', 'Loading')
cy.getByTestID('cell--view-empty')
// But selecting a nonempty bucket should load some data
cy.getByTestID('variable-dropdown--button')
.eq(0)
.click()
cy.get(`#defbuck`).click()
// default select the first result
cy.getByTestID('variable-dropdown')
.eq(1)
.should('contain', 'beans')
// and also load the second result
cy.getByTestID('variable-dropdown--button')
.eq(1)
.click()
cy.get(`#cool`).click()
})
it('can create a view through the API', () => {
cy.get('@org').then(({id: orgID}: Organization) => {
cy.createDashWithViewAndVar(orgID).then(() => {

View File

@ -173,13 +173,14 @@ export const createTask = (
export const createQueryVariable = (
orgID?: string,
name: string = 'Little Variable'
name: string = 'Little Variable',
query?: string
): Cypress.Chainable<Cypress.Response> => {
const argumentsObj = {
type: 'query',
values: {
language: 'flux',
query: `filter(fn: (r) => r._field == "cpu")`,
query: query || `filter(fn: (r) => r._field == "cpu")`,
},
}
@ -196,11 +197,12 @@ export const createQueryVariable = (
export const createCSVVariable = (
orgID?: string,
name: string = 'CSVVariable'
name: string = 'CSVVariable',
csv?: string[]
): Cypress.Chainable<Cypress.Response> => {
const argumentsObj = {
type: 'constant',
values: ['c1', 'c2', 'c3', 'c4'],
values: csv || ['c1', 'c2', 'c3', 'c4'],
}
return cy.request({

View File

@ -11,11 +11,8 @@ import ViewSwitcher from 'src/shared/components/ViewSwitcher'
// Utils
import {GlobalAutoRefresher} from 'src/utils/AutoRefresher'
import {getTimeRange} from 'src/dashboards/selectors'
import {getRangeVariable} from 'src/variables/utils/getTimeRangeVars'
import {getVariables, asAssignment} from 'src/variables/selectors'
import {checkResultsLength} from 'src/shared/utils/vis'
import {getActiveTimeRange} from 'src/timeMachine/selectors/index'
import {TIME_RANGE_START, TIME_RANGE_STOP} from 'src/variables/constants'
// Types
import {
@ -23,7 +20,6 @@ import {
TimeZone,
AppState,
DashboardQuery,
VariableAssignment,
QueryViewProperties,
Theme,
} from 'src/types'
@ -38,7 +34,6 @@ interface StateProps {
timeRange: TimeRange
ranges: TimeRange | null
timeZone: TimeZone
variableAssignments: VariableAssignment[]
}
interface State {
@ -68,14 +63,7 @@ class RefreshingView extends PureComponent<Props, State> {
}
public render() {
const {
ranges,
properties,
manualRefresh,
timeZone,
variableAssignments,
theme,
} = this.props
const {ranges, properties, manualRefresh, timeZone, theme} = this.props
const {submitToken} = this.state
return (
@ -83,7 +71,6 @@ class RefreshingView extends PureComponent<Props, State> {
submitToken={submitToken}
queries={this.queries}
key={manualRefresh}
variables={variableAssignments}
>
{({
giraffeResult,
@ -152,18 +139,6 @@ class RefreshingView extends PureComponent<Props, State> {
const mstp = (state: AppState, ownProps: OwnProps): StateProps => {
const timeRange = getTimeRange(state)
// NOTE: cannot use getAllVariables here because the TimeSeries
// component appends it automatically. That should be fixed
const vars = getVariables(state)
const variableAssignments = [
...vars,
getRangeVariable(TIME_RANGE_START, timeRange),
getRangeVariable(TIME_RANGE_STOP, timeRange),
]
.map(v => asAssignment(v))
.filter(v => !!v)
const ranges = getActiveTimeRange(timeRange, ownProps.properties.queries)
const {timeZone, theme} = state.app.persisted
@ -171,7 +146,6 @@ const mstp = (state: AppState, ownProps: OwnProps): StateProps => {
timeRange,
ranges,
timeZone,
variableAssignments,
theme,
}
}

View File

@ -19,6 +19,10 @@ import {
import {runStatusesQuery} from 'src/alerting/utils/statusEvents'
// Utils
import {getTimeRange} from 'src/dashboards/selectors'
import {getVariables, asAssignment} from 'src/variables/selectors'
import {getRangeVariable} from 'src/variables/utils/getTimeRangeVars'
import {isInQuery} from 'src/variables/utils/hydrateVars'
import {getWindowVars} from 'src/variables/utils/getWindowVars'
import {buildVarsOption} from 'src/variables/utils/buildVarsOption'
import 'intersection-observer'
@ -27,6 +31,7 @@ import {getOrgIDFromBuckets} from 'src/timeMachine/actions/queries'
// Constants
import {rateLimitReached, resultTooLarge} from 'src/shared/copy/notifications'
import {TIME_RANGE_START, TIME_RANGE_STOP} from 'src/variables/constants'
// Actions
import {notify as notifyAction} from 'src/shared/actions/notifications'
@ -39,6 +44,7 @@ import {
Bucket,
ResourceType,
DashboardQuery,
Variable,
VariableAssignment,
AppState,
CancelBox,
@ -57,6 +63,7 @@ interface QueriesState {
interface StateProps {
queryLink: string
buckets: Bucket[]
variables: Variable[]
}
interface OwnProps {
@ -187,14 +194,23 @@ class TimeSeries extends Component<Props & WithRouterProps, State> {
// Cancel any existing queries
this.pendingResults.forEach(({cancel}) => cancel())
const usedVars = variables.filter(v => v.arguments.type !== 'system')
const waitList = usedVars.filter(v => v.status !== RemoteDataState.Done)
// If a variable is loading, and a cell requires it, leave the cell to never resolve,
// keeping it in a loading state until the variable is resolved
if (usedVars.length && waitList.length) {
await new Promise(() => {})
}
const vars = variables.map(v => asAssignment(v))
// Issue new queries
this.pendingResults = queries.map(({text}) => {
const orgID =
getOrgIDFromBuckets(text, buckets) || this.props.params.orgID
const windowVars = getWindowVars(text, variables)
const extern = buildVarsOption([...variables, ...windowVars])
const windowVars = getWindowVars(text, vars)
const extern = buildVarsOption([...vars, ...windowVars])
return runQuery(orgID, text, extern)
})
@ -204,7 +220,7 @@ class TimeSeries extends Component<Props & WithRouterProps, State> {
let statuses = [] as StatusRow[][]
if (check) {
const extern = buildVarsOption(variables)
const extern = buildVarsOption(vars)
this.pendingCheckStatuses = runStatusesQuery(
this.props.params.orgID,
check.id,
@ -291,12 +307,27 @@ class TimeSeries extends Component<Props & WithRouterProps, State> {
}
}
const mstp = (state: AppState): StateProps => {
const {links} = state
const mstp = (state: AppState, props: OwnProps): StateProps => {
const timeRange = getTimeRange(state)
// NOTE: cannot use getAllVariables here because the TimeSeries
// component appends it automatically. That should be fixed
// NOTE: limit the variables returned to those that are used,
// as this prevents resending when other queries get sent
const queries = props.queries.map(q => q.text).filter(t => !!t.trim())
const vars = getVariables(state).filter(v =>
queries.some(t => isInQuery(t, v))
)
const variables = [
...vars,
getRangeVariable(TIME_RANGE_START, timeRange),
getRangeVariable(TIME_RANGE_STOP, timeRange),
]
return {
queryLink: links.query.self,
queryLink: state.links.query.self,
buckets: getAll<Bucket>(state, ResourceType.Buckets),
variables,
}
}

View File

@ -118,6 +118,7 @@ export const getVariables = () => async (
}
}
// TODO: make this context aware
export const hydrateVariables = (skipCache?: boolean) => async (
dispatch: Dispatch<Action>,
getState: GetState
@ -125,37 +126,25 @@ export const hydrateVariables = (skipCache?: boolean) => async (
const state = getState()
const org = getOrg(state)
const vars = getVariablesFromState(state)
const vals = await hydrateVars(vars, getAllVariablesFromState(state), {
const hydration = hydrateVars(vars, getAllVariablesFromState(state), {
orgID: org.id,
url: state.links.query.self,
skipCache,
}).promise
const lookup = vals.reduce((prev, curr) => {
prev[curr.id] = curr
return prev
}, {})
const updated = vars.map(vari => {
if (!lookup.hasOwnProperty(vari.id)) {
return vari
}
return lookup[vari.id]
})
await dispatch(
setVariables(RemoteDataState.Done, {
result: updated.map(v => v.id),
entities: {
variables: updated.reduce((prev, curr) => {
prev[curr.id] = curr
return prev
}, {}),
},
})
)
hydration.on('status', (variable, status) => {
if (status === RemoteDataState.Loading) {
dispatch(setVariable(variable.id, status))
return
}
if (status === RemoteDataState.Done) {
const _variable = normalize<Variable, VariableEntities, string>(
variable,
variableSchema
)
dispatch(setVariable(variable.id, RemoteDataState.Done, _variable))
}
})
await hydration.promise
}
export const getVariable = (id: string) => async (

View File

@ -16,11 +16,12 @@ import {selectValue} from 'src/variables/actions/thunks'
import {getVariable, normalizeValues} from 'src/variables/selectors'
// Types
import {AppState} from 'src/types'
import {AppState, RemoteDataState} from 'src/types'
interface StateProps {
values: string[]
selectedValue: string
status: RemoteDataState
}
interface DispatchProps {
@ -56,7 +57,7 @@ class VariableDropdown extends PureComponent<Props> {
testID="variable-dropdown--button"
status={dropdownStatus}
>
{selectedValue || 'No Values'}
{this.selectedText}
</Dropdown.Button>
)}
menu={onCollapse => (
@ -94,15 +95,31 @@ class VariableDropdown extends PureComponent<Props> {
onSelect()
}
}
private get selectedText() {
const {selectedValue, status} = this.props
if (status === RemoteDataState.Loading) {
return 'Loading'
}
if (selectedValue) {
return selectedValue
}
return 'No Values'
}
}
const mstp = (state: AppState, props: OwnProps): StateProps => {
const {variableID} = props
const variable = getVariable(state, variableID)
const selected =
variable.selected && variable.selected.length ? variable.selected[0] : null
return {
status: variable.status,
values: normalizeValues(variable),
selectedValue: variable.selected[0],
selectedValue: selected,
}
}

View File

@ -165,12 +165,16 @@ export const getVariable = (state: AppState, variableID: string): Variable => {
// Now validate that the selected value makes sense for
// the current situation
const vals = normalizeValues(vari)
if (vari.selected && !vals.includes(vari.selected[0])) {
vari = {...vari}
if (
!vari.selected ||
(vari.selected && vari.selected.length && !vals.includes(vari.selected[0]))
) {
vari.selected = []
}
if ((!vari.selected || !vari.selected.length) && vals.length) {
vari.selected = [vals[0]]
if (!vari.selected.length && vals.length) {
vari.selected.push(vals[0])
}
return vari

View File

@ -237,7 +237,6 @@ describe('hydrate vars', () => {
// Basic test for now, we would need an icky mock to assert that the
// appropriate substitution is actually taking place
expect(
actual.filter(v => v.id === 'a')[0].arguments.values.results
).toEqual(['aVal'])

View File

@ -32,17 +32,25 @@ interface HydrateVarsOptions {
skipCache?: boolean
}
export interface EventedCancelBox<T> extends CancelBox<T> {
on?: any
}
export const createVariableGraph = (
allVariables: Variable[]
): VariableNode[] => {
const nodesByID: {[variableID: string]: VariableNode} = allVariables.reduce(
(prev, curr) => {
let status = RemoteDataState.Done
if (curr.arguments.type === 'query') {
status = RemoteDataState.NotStarted
}
prev[curr.id] = {
variable: curr,
values: null,
parents: [],
children: [],
status: RemoteDataState.NotStarted,
status,
cancel: () => {},
}
return prev
@ -179,9 +187,11 @@ export const collectDescendants = (
This assumes that every descendant of this node has already been hydrated.
*/
// TODO: figure out how to type the `on` function
const hydrateVarsHelper = async (
node: VariableNode,
options: HydrateVarsOptions
options: HydrateVarsOptions,
on?: any
): Promise<VariableValues> => {
const variableType = node.variable.arguments.type
@ -202,6 +212,28 @@ const hydrateVarsHelper = async (
}
}
if (variableType === 'system') {
return {
valueType: 'string',
values: node.variable.arguments.values,
selected: node.variable.selected,
}
}
if (node.status !== RemoteDataState.Loading) {
node.status = RemoteDataState.Loading
on.fire('status', node.variable, node.status)
collectAncestors(node)
.filter(parent => parent.variable.arguments.type === 'query')
.forEach(parent => {
if (parent.status !== RemoteDataState.Loading) {
parent.status = RemoteDataState.Loading
on.fire('status', parent.variable, parent.status)
}
})
}
const descendants = collectDescendants(node)
const assignments = descendants
.map(node => asAssignment(node.variable))
@ -225,6 +257,9 @@ const hydrateVarsHelper = async (
const values = await request.promise
// NOTE: do not fire `done` event here, as the value
// has not been properly hydrated yet
node.status = RemoteDataState.Done
return values
}
@ -233,17 +268,13 @@ const hydrateVarsHelper = async (
resolved (successfully or not).
*/
const readyToResolve = (parent: VariableNode): boolean =>
parent.status === RemoteDataState.NotStarted &&
parent.children.every(child => child.status === RemoteDataState.Done)
/*
Find all `NotStarted` nodes in the graph that have no children.
*/
const findLeaves = (graph: VariableNode[]): VariableNode[] =>
graph.filter(
node =>
node.children.length === 0 && node.status === RemoteDataState.NotStarted
)
graph.filter(node => node.children.length === 0)
/*
Given a node, attempt to find a cycle that the node is a part of. If no cycle
@ -359,8 +390,11 @@ export const hydrateVars = (
variables: Variable[],
allVariables: Variable[],
options: HydrateVarsOptions
): CancelBox<Variable[]> => {
const graph = findSubgraph(createVariableGraph(allVariables), variables)
): EventedCancelBox<Variable[]> => {
const graph = findSubgraph(
createVariableGraph(allVariables),
variables
).filter(n => n.variable.arguments.type !== 'system')
invalidateCycles(graph)
let isCancelled = false
@ -370,11 +404,9 @@ export const hydrateVars = (
return
}
node.status === RemoteDataState.Loading
try {
// TODO: terminate the concept of node.values at the fetcher and just use variables
node.values = await hydrateVarsHelper(node, options)
node.values = await hydrateVarsHelper(node, options, on)
if (node.variable.arguments.type === 'query') {
node.variable.arguments.values.results = node.values.values as string[]
@ -407,7 +439,7 @@ export const hydrateVars = (
node.variable.selected = node.values.selected
}
node.status = RemoteDataState.Done
on.fire('status', node.variable, node.status)
return Promise.all(node.parents.filter(readyToResolve).map(resolve))
} catch (e) {
@ -430,9 +462,37 @@ export const hydrateVars = (
deferred.reject(new CancellationError())
}
Promise.all(findLeaves(graph).map(resolve)).then(() => {
deferred.resolve(extractResult(graph))
})
const on = (function() {
const callbacks = {}
const ret = (evt, cb) => {
if (!callbacks.hasOwnProperty(evt)) {
callbacks[evt] = []
}
return {promise: deferred.promise, cancel}
callbacks[evt].push(cb)
}
ret.fire = (evt, ...args) => {
if (!callbacks.hasOwnProperty(evt)) {
return
}
callbacks[evt].forEach(cb => cb.apply(cb, args))
}
return ret
})()
// NOTE: wrapping in a resolve disconnects the following findLeaves
// from the main execution thread, allowing external services to
// register listeners for the loading state changes
Promise.resolve()
.then(() => {
return Promise.all(findLeaves(graph).map(resolve))
})
.then(() => {
deferred.resolve(extractResult(graph))
})
return {promise: deferred.promise, cancel, on}
}