Merge pull request #15393 from influxdata/flux-staging/v0.50.x

build(flux): update Flux to v0.50.0
pull/15412/head
Jonathan A. Sternberg 2019-10-15 07:41:50 -07:00 committed by GitHub
commit 0cddc7254c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 804 additions and 166 deletions

2
go.mod
View File

@ -38,7 +38,7 @@ require (
github.com/hashicorp/raft v1.0.0 // indirect
github.com/hashicorp/vault/api v1.0.2
github.com/influxdata/cron v0.0.0-20190812233253-38faece03642
github.com/influxdata/flux v0.49.0
github.com/influxdata/flux v0.50.0
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jessevdk/go-flags v1.4.0

8
go.sum
View File

@ -233,12 +233,12 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/changelog v1.0.0 h1:RstJD6H48zLQj0GdE6E6k/6RPwtUjkyzIe/T1E/xuWU=
github.com/influxdata/changelog v1.0.0/go.mod h1:uzpGWE/qehT8L426YuXwpMQub+a63vIINhIeEI9mnSM=
github.com/influxdata/changelog v1.1.0 h1:HXhmLZDrbuC+Ca5YX7g8B8cH5DmJpaOjd844d9Y7aTQ=
github.com/influxdata/changelog v1.1.0/go.mod h1:uzpGWE/qehT8L426YuXwpMQub+a63vIINhIeEI9mnSM=
github.com/influxdata/cron v0.0.0-20190812233253-38faece03642 h1:ae+mZOcsOpcD0GyaVpqAzR/2t2tffQ2cWArPGohs3A8=
github.com/influxdata/cron v0.0.0-20190812233253-38faece03642/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og=
github.com/influxdata/flux v0.49.0 h1:uRIUWqPNhAfy3RfTdidnHFjFRL8q5fHZoUumz3qW1Wo=
github.com/influxdata/flux v0.49.0/go.mod h1:jnRutnpW4auRnMYcZQdRhhUKI2xrDAf4X1qjlfSdN6c=
github.com/influxdata/flux v0.50.0 h1:O+WfqDJzzN3/+KBuszODM3QOiOH600eR1Cg8+XjycOg=
github.com/influxdata/flux v0.50.0/go.mod h1:da7xWKIC4aUxqecO/frxZs4wr+0jXqV+pSOYBTuVYoA=
github.com/influxdata/goreleaser v0.97.0-influx h1:jT5OrcW7WfS0e2QxfwmTBjhLvpIC9CDLRhNgZJyhj8s=
github.com/influxdata/goreleaser v0.97.0-influx/go.mod h1:MnjA0e0Uq6ISqjG1WxxMAl+3VS1QYjILSWVnMYDxasE=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=

View File

@ -55,8 +55,7 @@ type Controller struct {
done chan struct{}
abortOnce sync.Once
abort chan struct{}
memoryBytesQuotaPerQuery int64
memory *memoryManager
metrics *controllerMetrics
labelKeys []string
@ -69,12 +68,26 @@ type Controller struct {
type Config struct {
// ConcurrencyQuota is the number of queries that are allowed to execute concurrently.
ConcurrencyQuota int
// InitialMemoryBytesQuotaPerQuery is the initial number of bytes allocated for a query
// when it is started. If this is unset, then the MemoryBytesQuotaPerQuery will be used.
InitialMemoryBytesQuotaPerQuery int64
// MemoryBytesQuotaPerQuery is the maximum number of bytes (in table memory) a query is allowed to use at
// any given time.
//
// The maximum amount of memory the controller is allowed to consume is
// ConcurrencyQuota * MemoryBytesQuotaPerQuery
// A query may not be able to use its entire quota of memory if requesting more memory would conflict
// with the maximum amount of memory that the controller can request.
MemoryBytesQuotaPerQuery int64
// MaxMemoryBytes is the maximum amount of memory the controller is allowed to
// allocated to queries.
//
// If this is unset, then this number is ConcurrencyQuota * MemoryBytesQuotaPerQuery.
// This number must be greater than or equal to the ConcurrencyQuota * InitialMemoryBytesQuotaPerQuery.
// This number may be less than the ConcurrencyQuota * MemoryBytesQuotaPerQuery.
MaxMemoryBytes int64
// QueueSize is the number of queries that are allowed to be awaiting execution before new queries are
// rejected.
QueueSize int
@ -87,23 +100,54 @@ type Config struct {
ExecutorDependencies []flux.Dependency
}
func (c *Config) Validate() error {
// complete will fill in the defaults, validate the configuration, and
// return the new Config.
func (c *Config) complete() (Config, error) {
config := *c
if config.InitialMemoryBytesQuotaPerQuery == 0 {
config.InitialMemoryBytesQuotaPerQuery = config.MemoryBytesQuotaPerQuery
}
if err := config.validate(true); err != nil {
return Config{}, err
}
return config, nil
}
func (c *Config) validate(isComplete bool) error {
if c.ConcurrencyQuota <= 0 {
return errors.New("ConcurrencyQuota must be positive")
}
if c.MemoryBytesQuotaPerQuery <= 0 {
return errors.New("MemoryBytesQuotaPerQuery must be positive")
}
if c.InitialMemoryBytesQuotaPerQuery < 0 || (isComplete && c.InitialMemoryBytesQuotaPerQuery == 0) {
return errors.New("InitialMemoryBytesQuotaPerQuery must be positive")
}
if c.MaxMemoryBytes < 0 {
return errors.New("MaxMemoryBytes must be positive")
}
if c.MaxMemoryBytes != 0 {
if minMemory := int64(c.ConcurrencyQuota) * c.InitialMemoryBytesQuotaPerQuery; c.MaxMemoryBytes < minMemory {
return fmt.Errorf("MaxMemoryBytes must be greater than or equal to the ConcurrencyQuota * InitialMemoryBytesQuotaPerQuery: %d < %d (%d * %d)", c.MaxMemoryBytes, minMemory, c.ConcurrencyQuota, c.InitialMemoryBytesQuotaPerQuery)
}
}
if c.QueueSize <= 0 {
return errors.New("QueueSize must be positive")
}
return nil
}
// Validate will validate that the controller configuration is valid.
func (c *Config) Validate() error {
return c.validate(false)
}
type QueryID uint64
func New(c Config) (*Controller, error) {
if err := c.Validate(); err != nil {
func New(config Config) (*Controller, error) {
c, err := config.complete()
if err != nil {
return nil, errors.Wrap(err, "invalid controller config")
}
c.MetricLabelKeys = append(c.MetricLabelKeys, orgLabel) //lint:ignore SA1029 this is a temporary ignore until we have time to create an appropriate type
@ -113,18 +157,30 @@ func New(c Config) (*Controller, error) {
}
logger.Info("Starting query controller",
zap.Int("concurrency_quota", c.ConcurrencyQuota),
zap.Int64("initial_memory_bytes_quota_per_query", c.InitialMemoryBytesQuotaPerQuery),
zap.Int64("memory_bytes_quota_per_query", c.MemoryBytesQuotaPerQuery),
zap.Int64("max_memory_bytes", c.MaxMemoryBytes),
zap.Int("queue_size", c.QueueSize))
mm := &memoryManager{
initialBytesQuotaPerQuery: c.InitialMemoryBytesQuotaPerQuery,
memoryBytesQuotaPerQuery: c.MemoryBytesQuotaPerQuery,
}
if c.MaxMemoryBytes > 0 {
mm.unusedMemoryBytes = c.MaxMemoryBytes - (int64(c.ConcurrencyQuota) * c.InitialMemoryBytesQuotaPerQuery)
} else {
mm.unlimited = true
}
ctrl := &Controller{
queries: make(map[QueryID]*Query),
queryQueue: make(chan *Query, c.QueueSize),
done: make(chan struct{}),
abort: make(chan struct{}),
memoryBytesQuotaPerQuery: c.MemoryBytesQuotaPerQuery,
logger: logger,
metrics: newControllerMetrics(c.MetricLabelKeys),
labelKeys: c.MetricLabelKeys,
dependencies: c.ExecutorDependencies,
queries: make(map[QueryID]*Query),
queryQueue: make(chan *Query, c.QueueSize),
done: make(chan struct{}),
abort: make(chan struct{}),
memory: mm,
logger: logger,
metrics: newControllerMetrics(c.MetricLabelKeys),
labelKeys: c.MetricLabelKeys,
dependencies: c.ExecutorDependencies,
}
ctrl.wg.Add(c.ConcurrencyQuota)
for i := 0; i < c.ConcurrencyQuota; i++ {
@ -326,7 +382,9 @@ func (c *Controller) processQueryQueue() {
}
}
// executeQuery will execute a compiled program and wait for its completion.
func (c *Controller) executeQuery(q *Query) {
defer c.waitForQuery(q)
defer func() {
if e := recover(); e != nil {
var ok bool
@ -355,8 +413,7 @@ func (c *Controller) executeQuery(q *Query) {
return
}
q.alloc = new(memory.Allocator)
q.alloc.Limit = func(v int64) *int64 { return &v }(c.memoryBytesQuotaPerQuery)
q.c.createAllocator(q)
exec, err := q.program.Start(ctx, q.alloc)
if err != nil {
q.setErr(err)
@ -366,6 +423,14 @@ func (c *Controller) executeQuery(q *Query) {
q.pump(exec, ctx.Done())
}
// waitForQuery will wait until the query is done.
func (c *Controller) waitForQuery(q *Query) {
select {
case <-q.doneCh:
case <-c.done:
}
}
func (c *Controller) finish(q *Query) {
c.queriesMu.Lock()
delete(c.queries, q.id)
@ -461,7 +526,9 @@ type Query struct {
program flux.Program
exec flux.Query
results chan flux.Result
alloc *memory.Allocator
memoryManager *queryMemoryManager
alloc *memory.Allocator
}
// ID reports an ephemeral unique ID for the query.
@ -538,6 +605,11 @@ func (q *Query) Done() {
// Mark the query as finished so it is removed from the query map.
q.c.finish(q)
// Release the additional memory associated with this query.
if q.memoryManager != nil {
q.memoryManager.Release()
}
// count query request
if q.err != nil || len(q.runtimeErrs) > 0 {
q.c.countQueryRequest(q, labelRuntimeError)

View File

@ -3,6 +3,7 @@ package control_test
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"testing"
@ -895,6 +896,380 @@ func TestController_DoneWithoutRead(t *testing.T) {
wg.Wait()
}
// This tests what happens when there is memory remaining,
// but we would go above the maximum amount of available memory.
func TestController_Error_MaxMemory(t *testing.T) {
config := config
config.InitialMemoryBytesQuotaPerQuery = config.MemoryBytesQuotaPerQuery / 2
config.MaxMemoryBytes = config.MemoryBytesQuotaPerQuery * 2
ctrl, err := control.New(config)
if err != nil {
t.Fatal(err)
}
defer shutdown(t, ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
// Allocate memory continuously to hit the memory limit.
for i := 0; i < 16; i++ {
size := config.MemoryBytesQuotaPerQuery / 16
if err := alloc.Account(int(size)); err != nil {
q.SetErr(err)
return
}
}
// This final allocation should cause an error even though
// we haven't reached the maximum memory usage for the system.
if err := alloc.Account(32); err == nil {
t.Fatal("expected error")
}
},
}, nil
},
}
q, err := ctrl.Query(context.Background(), makeRequest(compiler))
if err != nil {
t.Errorf("unexpected error: %s", err)
return
}
consumeResults(t, q)
}
// This tests that we can continuously run queries that do not use
// more than their initial memory allocation with some noisy neighbors.
// The noisy neighbors may occasionally fail because they are competing
// with each other, but they will never cause a small query to fail.
func TestController_NoisyNeighbor(t *testing.T) {
config := config
// We are fine using up to 1024 without an additional allocation.
config.InitialMemoryBytesQuotaPerQuery = 1024
// Effectively no maximum quota per query.
config.MemoryBytesQuotaPerQuery = config.InitialMemoryBytesQuotaPerQuery * 100
// The maximum number is about double what is needed to run
// all of the queries.
config.MaxMemoryBytes = config.InitialMemoryBytesQuotaPerQuery * 20
// The concurrency is 10 which means at most 10 queries can run
// at any given time.
config.ConcurrencyQuota = 10
// Set the queue length to something that can accommodate the input.
config.QueueSize = 1000
ctrl, err := control.New(config)
if err != nil {
t.Fatal(err)
}
defer shutdown(t, ctrl)
wellBehavedNeighbor := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
// Allocate memory until we hit our initial memory limit so we should
// never request more memory.
for amount := int64(0); amount < config.InitialMemoryBytesQuotaPerQuery; amount += 16 {
if err := alloc.Account(16); err != nil {
q.SetErr(fmt.Errorf("well behaved query affected by noisy neighbor: %s", err))
return
}
}
},
}, nil
},
}
noisyNeighbor := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
// Allocate memory continuously to use up what we can and be as noisy as possible.
// Turn up the stereo and party on.
for {
if err := alloc.Account(16); err != nil {
// Whoops, party shut down.
return
}
}
},
}, nil
},
}
var wg sync.WaitGroup
// Launch 100 queriers that are well behaved. They should never fail.
errCh := make(chan error, 1)
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
q, err := ctrl.Query(context.Background(), makeRequest(wellBehavedNeighbor))
if err != nil {
select {
case errCh <- err:
default:
}
return
}
consumeResults(t, q)
}
}()
}
// Launch 10 noisy neighbors. They will fail continuously.
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
q, err := ctrl.Query(context.Background(), makeRequest(noisyNeighbor))
if err != nil {
select {
case errCh <- err:
default:
}
return
}
consumeResults(t, q)
}
}()
}
wg.Wait()
close(errCh)
for err := range errCh {
t.Fatalf("unexpected error: %s", err)
}
}
// This tests that a query that should be allowed is killed
// when it attempts to use more memory available than the
// system has.
func TestController_Error_NoRemainingMemory(t *testing.T) {
config := config
// We are fine using up to 1024 without an additional allocation.
config.InitialMemoryBytesQuotaPerQuery = 1024
// Effectively no maximum quota per query.
config.MemoryBytesQuotaPerQuery = config.InitialMemoryBytesQuotaPerQuery * 100
// The maximum memory available on the system is double the initial quota.
config.MaxMemoryBytes = config.InitialMemoryBytesQuotaPerQuery * 2
ctrl, err := control.New(config)
if err != nil {
t.Fatal(err)
}
defer shutdown(t, ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
// Allocate memory continuously to use up what we can until denied.
for size := int64(0); ; size += 16 {
if err := alloc.Account(16); err != nil {
// We were not allowed to allocate more.
// Ensure that the size never exceeded the
// MaxMemoryBytes value.
if size > config.MaxMemoryBytes {
t.Errorf("query was allowed to allocate more than the maximum memory: %d > %d", size, config.MaxMemoryBytes)
}
return
}
}
},
}, nil
},
}
q, err := ctrl.Query(context.Background(), makeRequest(compiler))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
consumeResults(t, q)
}
// This test ensures the memory that the extra memory allocated
// for a query is properly returned when the query exits.
func TestController_MemoryRelease(t *testing.T) {
config := config
config.InitialMemoryBytesQuotaPerQuery = 16
config.MaxMemoryBytes = config.MemoryBytesQuotaPerQuery * 2
ctrl, err := control.New(config)
if err != nil {
t.Fatal(err)
}
defer shutdown(t, ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
// Allocate some amount of memory and never release it.
if err := alloc.Account(int(config.MemoryBytesQuotaPerQuery) / 2); err != nil {
q.SetErr(err)
return
}
},
}, nil
},
}
// Run 100 queries. If we do not release the memory properly,
// this would fail.
for i := 0; i < 100; i++ {
q, err := ctrl.Query(context.Background(), makeRequest(compiler))
if err != nil {
t.Errorf("unexpected error: %s", err)
return
}
consumeResults(t, q)
if t.Failed() {
return
}
}
}
// Set an irregular memory quota so that doubling the limit continuously
// would send us over the memory quota limit and make sure that
// the quota is still enforced correctly.
func TestController_IrregularMemoryQuota(t *testing.T) {
config := config
config.InitialMemoryBytesQuotaPerQuery = 64
config.MemoryBytesQuotaPerQuery = 768
config.MaxMemoryBytes = config.MemoryBytesQuotaPerQuery * 2
ctrl, err := control.New(config)
if err != nil {
t.Fatal(err)
}
defer shutdown(t, ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
// Allocate memory continuously to hit the memory limit.
for size := 0; size < 768; size += 16 {
if err := alloc.Account(16); err != nil {
q.SetErr(err)
return
}
}
// This final allocation should cause an error since we reached the
// memory quota. If the code for setting the limit is faulty, this
// would end up being allowed since the limit was set incorrectly.
if err := alloc.Account(16); err == nil {
t.Fatal("expected error")
}
},
}, nil
},
}
q, err := ctrl.Query(context.Background(), makeRequest(compiler))
if err != nil {
t.Errorf("unexpected error: %s", err)
return
}
consumeResults(t, q)
}
// This tests that if we run a bunch of queries that reserve memory,
// we don't encounter any race conditions that cause the currently
// in use memory from the pool to be accounted incorrectly.
func TestController_ReserveMemoryWithoutExceedingMax(t *testing.T) {
config := config
// Small initial memory bytes allocation so most of the query
// is handled by the memory pool.
config.InitialMemoryBytesQuotaPerQuery = 16
// We will allocate 1024. This is needed to ensure the queries do not
// allocate too much.
config.MemoryBytesQuotaPerQuery = 1024
// The maximum amount of memory. We will run with a concurrency of
// 100 and each of these queries will allocate exactly 1024.
config.MaxMemoryBytes = 1024 * 100
// The concurrency is 100 which means at most 100 queries can run
// at any given time.
config.ConcurrencyQuota = 100
// Set the queue length to something that can accommodate the input.
config.QueueSize = 1000
ctrl, err := control.New(config)
if err != nil {
t.Fatal(err)
}
defer shutdown(t, ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
// Allocate memory continuously to use up what we can and be as noisy as possible.
// Turn up the stereo and party on.
for size := 0; size < 1024; size += 16 {
if err := alloc.Account(16); err != nil {
q.SetErr(err)
return
}
}
},
}, nil
},
}
var wg sync.WaitGroup
// Launch double the number of running queriers to ensure saturation.
errCh := make(chan error, 1)
for i := 0; i < 300; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
q, err := ctrl.Query(context.Background(), makeRequest(compiler))
if err != nil {
select {
case errCh <- err:
default:
}
return
}
consumeResults(t, q)
}
}()
}
wg.Wait()
close(errCh)
for err := range errCh {
t.Fatalf("unexpected error: %s", err)
}
}
func consumeResults(tb testing.TB, q flux.Query) {
tb.Helper()
for res := range q.Results() {
if err := res.Tables().Do(func(table flux.Table) error {
return nil
}); err != nil {
tb.Errorf("unexpected error: %s", err)
}
}
q.Done()
if err := q.Err(); err != nil {
tb.Errorf("unexpected error: %s", err)
}
}
func shutdown(t *testing.T, ctrl *control.Controller) {
t.Helper()

145
query/control/memory.go Normal file
View File

@ -0,0 +1,145 @@
package control
import (
"errors"
"math"
"sync/atomic"
"github.com/influxdata/flux/memory"
)
type memoryManager struct {
// initialBytesQuotaPerQuery is the initial amount of memory
// allocated for each query. It does not count against the
// memory pool.
initialBytesQuotaPerQuery int64
// memoryBytesQuotaPerQuery is the maximum amount of memory
// that may be allocated to each query.
memoryBytesQuotaPerQuery int64
// unusedMemoryBytes is the amount of memory that may be used
// when a query requests more memory. This value is only used
// when unlimited is set to false.
unusedMemoryBytes int64
// unlimited indicates that the memory manager should indicate
// there is an unlimited amount of free memory available.
unlimited bool
}
// createAllocator will construct an allocator and memory manager
// for the given query.
func (c *Controller) createAllocator(q *Query) {
q.memoryManager = &queryMemoryManager{
m: c.memory,
limit: c.memory.initialBytesQuotaPerQuery,
}
q.alloc = &memory.Allocator{
// Use an anonymous function to ensure the value is copied.
Limit: func(v int64) *int64 { return &v }(q.memoryManager.limit),
Manager: q.memoryManager,
}
}
// queryMemoryManager is a memory manager for a specific query.
type queryMemoryManager struct {
m *memoryManager
limit int64
given int64
}
// RequestMemory will determine if the query can be given more memory
// when it is requested.
//
// Note: This function accesses the memoryManager whose attributes
// may be modified concurrently. Atomic operations are used to keep
// it lockless. The data associated with this specific query are only
// invoked from within a lock so they are safe to modify.
// Second Note: The errors here are discarded anyway so don't worry
// too much about the specific message or structure.
func (q *queryMemoryManager) RequestMemory(want int64) (got int64, err error) {
// It can be determined statically if we are going to violate
// the memoryBytesQuotaPerQuery.
if q.limit+want > q.m.memoryBytesQuotaPerQuery {
return 0, errors.New("query hit hard limit")
}
for {
unused := int64(math.MaxInt64)
if !q.m.unlimited {
unused = atomic.LoadInt64(&q.m.unusedMemoryBytes)
if unused < want {
// We do not have the capacity for this query to
// be given more memory.
return 0, errors.New("not enough capacity")
}
}
// The memory allocator will only request the bare amount of
// memory it needs, but it will probably ask for more memory
// so, if possible, give it more so it isn't repeatedly calling
// this method.
given := q.giveMemory(want, unused)
// Reserve this memory for our own use.
if !q.m.unlimited {
if !atomic.CompareAndSwapInt64(&q.m.unusedMemoryBytes, unused, unused-given) {
// The unused value has changed so someone may have taken
// the memory that we wanted. Retry.
continue
}
}
// Successfully reserved the memory so update our own internal
// counter for the limit.
q.limit += given
q.given += given
return given, nil
}
}
// giveMemory will determine an appropriate amount of memory to give
// a query based on what it wants and how much it has allocated in
// the past. It will always return a number greater than or equal
// to want.
func (q *queryMemoryManager) giveMemory(want, unused int64) int64 {
// If we can safely double the limit, then just do that.
if q.limit > want && q.limit < unused {
if q.limit*2 <= q.m.memoryBytesQuotaPerQuery {
return q.limit
}
// Doubling the limit sends us over the quota.
// Determine what would be our maximum amount.
max := q.m.memoryBytesQuotaPerQuery - q.limit
if max > want {
return max
}
}
// If we can't double because there isn't enough space
// in unused, maybe we can just use everything.
if unused > want && unused < q.limit {
return unused
}
// Otherwise we have already determined we can give the
// wanted number of bytes so just give that.
return want
}
func (q *queryMemoryManager) FreeMemory(bytes int64) {
// Not implemented. There is no problem with invoking
// this method, but the controller won't recognize that
// the memory has been declared as returned.
}
// Release will release all of the allocated memory to the
// memory manager.
func (q *queryMemoryManager) Release() {
if !q.m.unlimited {
atomic.AddInt64(&q.m.unusedMemoryBytes, q.given)
}
q.limit = q.m.initialBytesQuotaPerQuery
q.given = 0
}

View File

@ -43,35 +43,60 @@ func runEndToEnd(t *testing.T, pkgs []*ast.Package) {
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
for _, pkg := range pkgs {
pkg := pkg.Copy().(*ast.Package)
name := strings.TrimSuffix(pkg.Files[0].Name, "_test.flux")
t.Run(name, func(t *testing.T) {
if reason, ok := itesting.FluxEndToEndSkipList[name]; ok {
t.Skip(reason)
test := func(t *testing.T, f func(t *testing.T)) {
t.Run(pkg.Path, f)
}
if pkg.Path == "universe" {
test = func(t *testing.T, f func(t *testing.T)) {
f(t)
}
}
test(t, func(t *testing.T) {
for _, file := range pkg.Files {
name := strings.TrimSuffix(file.Name, "_test.flux")
t.Run(name, func(t *testing.T) {
if reason, ok := itesting.FluxEndToEndSkipList[pkg.Path][name]; ok {
t.Skip(reason)
}
testFlux(t, l, file)
})
}
testFlux(t, l, pkg)
})
}
}
func benchEndToEnd(b *testing.B, pkgs []*ast.Package) {
l := launcher.RunTestLauncherOrFail(b, ctx)
l.SetupOrFail(b)
defer l.ShutdownOrFail(b, ctx)
for _, pkg := range pkgs {
pkg := pkg.Copy().(*ast.Package)
name := pkg.Files[0].Name
b.Run(name, func(b *testing.B) {
if reason, ok := itesting.FluxEndToEndSkipList[strings.TrimSuffix(name, ".flux")]; ok {
b.Skip(reason)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
testFlux(b, l, pkg)
}
})
// TODO(jsternberg): These benchmarks don't run properly
// and need to be fixed. Commenting out the code for now.
b.Skip("https://github.com/influxdata/influxdb/issues/15391")
// l := launcher.RunTestLauncherOrFail(b, ctx)
// l.SetupOrFail(b)
// defer l.ShutdownOrFail(b, ctx)
// for _, pkg := range pkgs {
// pkg := pkg.Copy().(*ast.Package)
// name := pkg.Files[0].Name
// b.Run(name, func(b *testing.B) {
// if reason, ok := itesting.FluxEndToEndSkipList[strings.TrimSuffix(name, ".flux")]; ok {
// b.Skip(reason)
// }
// b.ResetTimer()
// b.ReportAllocs()
// for i := 0; i < b.N; i++ {
// testFlux(b, l, pkg)
// }
// })
// }
}
func makeTestPackage(file *ast.File) *ast.Package {
file = file.Copy().(*ast.File)
file.Package.Name.Name = "main"
pkg := &ast.Package{
Package: "main",
Files: []*ast.File{file},
}
return pkg
}
var optionsSource = `
@ -95,7 +120,7 @@ func init() {
optionsAST = pkg.Files[0]
}
func testFlux(t testing.TB, l *launcher.TestLauncher, pkg *ast.Package) {
func testFlux(t testing.TB, l *launcher.TestLauncher, file *ast.File) {
// Query server to ensure write persists.
@ -127,6 +152,7 @@ func testFlux(t testing.TB, l *launcher.TestLauncher, pkg *ast.Package) {
options.Body = append([]ast.Statement{bucketOpt, orgOpt}, options.Body...)
// Add options to pkg
pkg := makeTestPackage(file)
pkg.Files = append(pkg.Files, options)
// Add testing.inspect call to ensure the data is loaded

View File

@ -1,132 +1,151 @@
package testing
var FluxEndToEndSkipList = map[string]string{
// TODO(adam) determine the reason for these test failures.
"cov": "Reason TBD",
"covariance": "Reason TBD",
"cumulative_sum": "Reason TBD",
"cumulative_sum_default": "Reason TBD",
"cumulative_sum_noop": "Reason TBD",
"drop_non_existent": "Reason TBD",
"first": "Reason TBD",
"highestAverage": "Reason TBD",
"highestMax": "Reason TBD",
"histogram": "Reason TBD",
"histogram_normalize": "Reason TBD",
"histogram_quantile": "Reason TBD",
"join": "Reason TBD",
"join_across_measurements": "Reason TBD",
"join_agg": "Reason TBD",
"keep_non_existent": "Reason TBD",
"key_values": "Reason TBD",
"key_values_host_name": "Reason TBD",
"last": "Reason TBD",
"lowestAverage": "Reason TBD",
"max": "Reason TBD",
"min": "Reason TBD",
"sample": "Reason TBD",
"selector_preserve_time": "Reason TBD",
"shift": "Reason TBD",
"shift_negative_duration": "Reason TBD",
"task_per_line": "Reason TBD",
"top": "Reason TBD",
"union": "Reason TBD",
"union_heterogeneous": "Reason TBD",
"unique": "Reason TBD",
"distinct": "Reason TBD",
var FluxEndToEndSkipList = map[string]map[string]string{
"universe": {
// TODO(adam) determine the reason for these test failures.
"cov": "Reason TBD",
"covariance": "Reason TBD",
"cumulative_sum": "Reason TBD",
"cumulative_sum_default": "Reason TBD",
"cumulative_sum_noop": "Reason TBD",
"drop_non_existent": "Reason TBD",
"first": "Reason TBD",
"highestAverage": "Reason TBD",
"highestMax": "Reason TBD",
"histogram": "Reason TBD",
"histogram_normalize": "Reason TBD",
"histogram_quantile": "Reason TBD",
"join": "Reason TBD",
"join_across_measurements": "Reason TBD",
"join_agg": "Reason TBD",
"keep_non_existent": "Reason TBD",
"key_values": "Reason TBD",
"key_values_host_name": "Reason TBD",
"last": "Reason TBD",
"lowestAverage": "Reason TBD",
"max": "Reason TBD",
"min": "Reason TBD",
"sample": "Reason TBD",
"selector_preserve_time": "Reason TBD",
"shift": "Reason TBD",
"shift_negative_duration": "Reason TBD",
"task_per_line": "Reason TBD",
"top": "Reason TBD",
"union": "Reason TBD",
"union_heterogeneous": "Reason TBD",
"unique": "Reason TBD",
"distinct": "Reason TBD",
// it appears these occur when writing the input data. `to` may not be null safe.
"fill_bool": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_float": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_int": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_string": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_time": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_uint": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"window_null": "failed to read meta data: panic: interface conversion: interface {} is nil, not float64",
"fill_default": "unknown field type for f1",
// it appears these occur when writing the input data. `to` may not be null safe.
"fill_bool": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_float": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_int": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_string": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_time": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_uint": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"window_null": "failed to read meta data: panic: interface conversion: interface {} is nil, not float64",
// these may just be missing calls to range() in the tests. easy to fix in a new PR.
"group_nulls": "unbounded test",
"integral": "unbounded test",
"integral_columns": "unbounded test",
"map": "unbounded test",
"buckets": "unbounded test",
"join_missing_on_col": "unbounded test",
// these may just be missing calls to range() in the tests. easy to fix in a new PR.
"group_nulls": "unbounded test",
"integral": "unbounded test",
"integral_columns": "unbounded test",
"map": "unbounded test",
"join_missing_on_col": "unbounded test",
"rowfn_with_import": "unbounded test",
// the following tests have a difference between the CSV-decoded input table, and the storage-retrieved version of that table
"columns": "group key mismatch",
"set": "column order mismatch",
"simple_max": "_stop missing from expected output",
"derivative": "time bounds mismatch (engine uses now() instead of bounds on input table)",
"difference_columns": "data write/read path loses columns x and y",
"keys": "group key mismatch",
// the following tests have a difference between the CSV-decoded input table, and the storage-retrieved version of that table
"columns": "group key mismatch",
"set": "column order mismatch",
"simple_max": "_stop missing from expected output",
"derivative": "time bounds mismatch (engine uses now() instead of bounds on input table)",
"difference_columns": "data write/read path loses columns x and y",
"keys": "group key mismatch",
// failed to read meta data errors: the CSV encoding is incomplete probably due to data schema errors. needs more detailed investigation to find root cause of error
"filter_by_regex": "failed to read metadata",
"filter_by_tags": "failed to read metadata",
"group": "failed to read metadata",
"group_except": "failed to read metadata",
"group_ungroup": "failed to read metadata",
"pivot_mean": "failed to read metadata",
"histogram_quantile_minvalue": "failed to read meta data: no column with label _measurement exists",
"increase": "failed to read meta data: table has no _value column",
// failed to read meta data errors: the CSV encoding is incomplete probably due to data schema errors. needs more detailed investigation to find root cause of error
"filter_by_regex": "failed to read metadata",
"filter_by_tags": "failed to read metadata",
"group": "failed to read metadata",
"group_except": "failed to read metadata",
"group_ungroup": "failed to read metadata",
"pivot_mean": "failed to read metadata",
"histogram_quantile_minvalue": "failed to read meta data: no column with label _measurement exists",
"increase": "failed to read meta data: table has no _value column",
"string_max": "error: invalid use of function: *functions.MaxSelector has no implementation for type string (https://github.com/influxdata/platform/issues/224)",
"null_as_value": "null not supported as value in influxql (https://github.com/influxdata/platform/issues/353)",
"string_interp": "string interpolation not working as expected in flux (https://github.com/influxdata/platform/issues/404)",
"to": "to functions are not supported in the testing framework (https://github.com/influxdata/flux/issues/77)",
"covariance_missing_column_1": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"covariance_missing_column_2": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"drop_before_rename": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"drop_referenced": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"yield": "yield requires special test case (https://github.com/influxdata/flux/issues/535)",
"rowfn_with_import": "imported libraries are not visible in user-defined functions (https://github.com/influxdata/flux/issues/1000)",
"string_trim": "imported libraries are not visible in user-defined functions (https://github.com/influxdata/flux/issues/1000)",
"string_max": "error: invalid use of function: *functions.MaxSelector has no implementation for type string (https://github.com/influxdata/platform/issues/224)",
"null_as_value": "null not supported as value in influxql (https://github.com/influxdata/platform/issues/353)",
"string_interp": "string interpolation not working as expected in flux (https://github.com/influxdata/platform/issues/404)",
"to": "to functions are not supported in the testing framework (https://github.com/influxdata/flux/issues/77)",
"covariance_missing_column_1": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"covariance_missing_column_2": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"drop_before_rename": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"drop_referenced": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"yield": "yield requires special test case (https://github.com/influxdata/flux/issues/535)",
"window_group_mean_ungroup": "window trigger optimization modifies sort order of its output tables (https://github.com/influxdata/flux/issues/1067)",
"window_group_mean_ungroup": "window trigger optimization modifies sort order of its output tables (https://github.com/influxdata/flux/issues/1067)",
"median_column": "failing in different ways (https://github.com/influxdata/influxdb/issues/13909)",
"dynamic_query": "panic when executing",
"median_column": "failing in different ways (https://github.com/influxdata/influxdb/issues/13909)",
"dynamic_query": "panic when executing",
"regexp_replaceAllString": "Reason TBD",
"extract_regexp_findStringIndex": "pandas. map does not correctly handled returned arrays (https://github.com/influxdata/flux/issues/1387)",
"partition_strings_splitN": "pandas. map does not correctly handled returned arrays (https://github.com/influxdata/flux/issues/1387)",
"to_int": "dateTime conversion issue: https://github.com/influxdata/influxdb/issues/14575",
"to_uint": "dateTime conversion issue: https://github.com/influxdata/influxdb/issues/14575",
"check": "Cannot see overridden options from inside stdlib functions (https://github.com/influxdata/flux/issues/1720)",
"http_endpoint": "need ability to test side effects in e2e tests: (https://github.com/influxdata/flux/issues/1723)",
"holt_winters_panic": "Expected output is an empty table which breaks the testing framework (https://github.com/influxdata/influxdb/issues/14749)",
"secrets": "Cannot inject custom deps into the test framework so the secrets don't lookup correctly",
"to_int": "dateTime conversion issue: https://github.com/influxdata/influxdb/issues/14575",
"to_uint": "dateTime conversion issue: https://github.com/influxdata/influxdb/issues/14575",
"holt_winters_panic": "Expected output is an empty table which breaks the testing framework (https://github.com/influxdata/influxdb/issues/14749)",
},
"experimental": {
"set": "Reason TBD",
},
"regexp": {
"replaceAllString": "Reason TBD",
},
"http": {
"http_endpoint": "need ability to test side effects in e2e tests: (https://github.com/influxdata/flux/issues/1723)",
},
"influxdata/influxdb/monitor": {
"check": "Cannot see overridden options from inside stdlib functions (https://github.com/influxdata/flux/issues/1720)",
},
"influxdata/influxdb/secrets": {
"secrets": "Cannot inject custom deps into the test framework so the secrets don't lookup correctly",
},
"internal/promql": {
"join": "unbounded test",
},
"testing/chronograf": {
"buckets": "unbounded test",
},
"testing/kapacitor": {
"fill_default": "unknown field type for f1",
},
"testing/pandas": {
"extract_regexp_findStringIndex": "pandas. map does not correctly handled returned arrays (https://github.com/influxdata/flux/issues/1387)",
"partition_strings_splitN": "pandas. map does not correctly handled returned arrays (https://github.com/influxdata/flux/issues/1387)",
},
// PromQL tests don't work. Missing `range` or `_measurement`.
"promql_changes": "unbounded or missing _measurement column",
"promql_dayOfMonth": "unbounded or missing _measurement column",
"promql_dayOfWeek": "unbounded or missing _measurement column",
"promql_daysInMonth": "unbounded or missing _measurement column",
"promql_emptyTable": "unbounded or missing _measurement column",
"promql_extrapolatedRate_counter_rate": "unbounded or missing _measurement column",
"promql_extrapolatedRate_nocounter": "unbounded or missing _measurement column",
"promql_extrapolatedRate_norate": "unbounded or missing _measurement column",
"promql_histogramQuantile": "unbounded or missing _measurement column",
"promql_holtWinters": "unbounded or missing _measurement column",
"promql_hour": "unbounded or missing _measurement column",
"promql_instantRate": "unbounded or missing _measurement column",
"promql_labelReplace_empty_dst": "unbounded or missing _measurement column",
"promql_labelReplace_full_string_match": "unbounded or missing _measurement column",
"promql_labelReplace_multiple_groups": "unbounded or missing _measurement column",
"promql_labelReplace_src_empty": "unbounded or missing _measurement column",
"promql_labelReplace_src_nonexistent": "unbounded or missing _measurement column",
"promql_labelReplace_src_not_matched": "unbounded or missing _measurement column",
"promql_labelReplace_sub_string_match": "unbounded or missing _measurement column",
"promql_linearRegression_nopredict": "unbounded or missing _measurement column",
"promql_linearRegression_predict": "unbounded or missing _measurement column",
"promql_minute": "unbounded or missing _measurement column",
"promql_month": "unbounded or missing _measurement column",
"promql_resets": "unbounded or missing _measurement column",
"promql_timestamp": "unbounded or missing _measurement column",
"promql_year": "unbounded or missing _measurement column",
"testing/promql": {
"changes": "unbounded or missing _measurement column",
"dayOfMonth": "unbounded or missing _measurement column",
"dayOfWeek": "unbounded or missing _measurement column",
"daysInMonth": "unbounded or missing _measurement column",
"emptyTable": "unbounded or missing _measurement column",
"extrapolatedRate_counter_rate": "unbounded or missing _measurement column",
"extrapolatedRate_nocounter": "unbounded or missing _measurement column",
"extrapolatedRate_norate": "unbounded or missing _measurement column",
"histogramQuantile": "unbounded or missing _measurement column",
"holtWinters": "unbounded or missing _measurement column",
"hour": "unbounded or missing _measurement column",
"instantRate": "unbounded or missing _measurement column",
"labelReplace_empty_dst": "unbounded or missing _measurement column",
"labelReplace_full_string_match": "unbounded or missing _measurement column",
"labelReplace_multiple_groups": "unbounded or missing _measurement column",
"labelReplace_src_empty": "unbounded or missing _measurement column",
"labelReplace_src_nonexistent": "unbounded or missing _measurement column",
"labelReplace_src_not_matched": "unbounded or missing _measurement column",
"labelReplace_sub_string_match": "unbounded or missing _measurement column",
"linearRegression_nopredict": "unbounded or missing _measurement column",
"linearRegression_predict": "unbounded or missing _measurement column",
"minute": "unbounded or missing _measurement column",
"month": "unbounded or missing _measurement column",
"resets": "unbounded or missing _measurement column",
"timestamp": "unbounded or missing _measurement column",
"year": "unbounded or missing _measurement column",
},
}

View File

@ -240,6 +240,7 @@ func TestScheduler_CreateNextRunOnTick(t *testing.T) {
}
func TestScheduler_LogStatisticsOnSuccess(t *testing.T) {
t.Skip("flaky test: https://github.com/influxdata/influxdb/issues/15394")
t.Parallel()
tcs := mock.NewTaskControlService()