feat(query): add unused memory bytes prom metric to controller

pull/17414/head
Lorenzo Affetti 2020-03-25 18:03:38 +01:00
parent bdd6dc644a
commit 7f70353b18
No known key found for this signature in database
GPG Key ID: 76FFBBDF6F0ADC4C
4 changed files with 64 additions and 6 deletions

View File

@ -420,6 +420,8 @@ func (c *Controller) executeQuery(q *Query) {
}
q.c.createAllocator(q)
// Record unused memory before start.
q.recordUnusedMemory()
exec, err := q.program.Start(ctx, q.alloc)
if err != nil {
q.setErr(err)
@ -561,6 +563,11 @@ func (q *Query) Results() <-chan flux.Result {
return q.results
}
func (q *Query) recordUnusedMemory() {
unused := q.memoryManager.getUnusedMemoryBytes()
q.c.metrics.memoryUnused.WithLabelValues(q.labelValues...).Set(float64(unused))
}
// Done signals to the Controller that this query is no longer
// being used and resources related to the query may be freed.
func (q *Query) Done() {
@ -614,14 +621,17 @@ func (q *Query) Done() {
// Release the additional memory associated with this query.
if q.memoryManager != nil {
q.memoryManager.Release()
// Record unused memory after finish.
q.recordUnusedMemory()
}
// count query request
// Count query request.
if q.err != nil || len(q.runtimeErrs) > 0 {
q.c.countQueryRequest(q, labelRuntimeError)
} else {
q.c.countQueryRequest(q, labelSuccess)
}
})
<-q.doneCh
}

View File

@ -95,6 +95,29 @@ func validateRequestTotals(t testing.TB, reg *prometheus.Registry, success, comp
validate("queue_error", queue)
}
func validateUnusedMemory(t testing.TB, reg *prometheus.Registry, c control.Config) {
t.Helper()
metrics, err := reg.Gather()
if err != nil {
t.Fatal(err)
}
m := FindMetric(
metrics,
"query_control_memory_unused_bytes",
map[string]string{
"org": "",
},
)
var got int64
if m != nil {
got = int64(*m.Gauge.Value)
}
want := c.MaxMemoryBytes - (int64(c.ConcurrencyQuota) * c.InitialMemoryBytesQuotaPerQuery)
if got != want {
t.Errorf("unexpected memory unused bytes: got %d want: %d", got, want)
}
}
func TestController_QuerySuccess(t *testing.T) {
ctrl, err := control.New(config)
if err != nil {
@ -908,6 +931,7 @@ func TestController_Error_MaxMemory(t *testing.T) {
t.Fatal(err)
}
defer shutdown(t, ctrl)
reg := setupPromRegistry(ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
@ -938,6 +962,7 @@ func TestController_Error_MaxMemory(t *testing.T) {
return
}
consumeResults(t, q)
validateUnusedMemory(t, reg, config)
}
// This tests that we can continuously run queries that do not use
@ -964,6 +989,7 @@ func TestController_NoisyNeighbor(t *testing.T) {
t.Fatal(err)
}
defer shutdown(t, ctrl)
reg := setupPromRegistry(ctrl)
wellBehavedNeighbor := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
@ -1045,6 +1071,7 @@ func TestController_NoisyNeighbor(t *testing.T) {
for err := range errCh {
t.Fatalf("unexpected error: %s", err)
}
validateUnusedMemory(t, reg, config)
}
// This tests that a query that should be allowed is killed
@ -1064,6 +1091,7 @@ func TestController_Error_NoRemainingMemory(t *testing.T) {
t.Fatal(err)
}
defer shutdown(t, ctrl)
reg := setupPromRegistry(ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
@ -1091,6 +1119,7 @@ func TestController_Error_NoRemainingMemory(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
consumeResults(t, q)
validateUnusedMemory(t, reg, config)
}
// This test ensures the memory that the extra memory allocated
@ -1105,6 +1134,7 @@ func TestController_MemoryRelease(t *testing.T) {
t.Fatal(err)
}
defer shutdown(t, ctrl)
reg := setupPromRegistry(ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
@ -1134,6 +1164,7 @@ func TestController_MemoryRelease(t *testing.T) {
return
}
}
validateUnusedMemory(t, reg, config)
}
// Set an irregular memory quota so that doubling the limit continuously
@ -1150,6 +1181,7 @@ func TestController_IrregularMemoryQuota(t *testing.T) {
t.Fatal(err)
}
defer shutdown(t, ctrl)
reg := setupPromRegistry(ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
@ -1180,6 +1212,7 @@ func TestController_IrregularMemoryQuota(t *testing.T) {
return
}
consumeResults(t, q)
validateUnusedMemory(t, reg, config)
}
// This tests that if we run a bunch of queries that reserve memory,
@ -1207,6 +1240,7 @@ func TestController_ReserveMemoryWithoutExceedingMax(t *testing.T) {
t.Fatal(err)
}
defer shutdown(t, ctrl)
reg := setupPromRegistry(ctrl)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
@ -1252,6 +1286,7 @@ func TestController_ReserveMemoryWithoutExceedingMax(t *testing.T) {
for err := range errCh {
t.Fatalf("unexpected error: %s", err)
}
validateUnusedMemory(t, reg, config)
}
func consumeResults(tb testing.TB, q flux.Query) {

View File

@ -49,6 +49,10 @@ type queryMemoryManager struct {
given int64
}
func (q *queryMemoryManager) getUnusedMemoryBytes() int64 {
return atomic.LoadInt64(&q.m.unusedMemoryBytes)
}
// RequestMemory will determine if the query can be given more memory
// when it is requested.
//

View File

@ -7,10 +7,11 @@ type controllerMetrics struct {
requests *prometheus.CounterVec
functions *prometheus.CounterVec
all *prometheus.GaugeVec
compiling *prometheus.GaugeVec
queueing *prometheus.GaugeVec
executing *prometheus.GaugeVec
all *prometheus.GaugeVec
compiling *prometheus.GaugeVec
queueing *prometheus.GaugeVec
executing *prometheus.GaugeVec
memoryUnused *prometheus.GaugeVec
allDur *prometheus.HistogramVec
compilingDur *prometheus.HistogramVec
@ -76,6 +77,13 @@ func newControllerMetrics(labels []string) *controllerMetrics {
Help: "Number of queries actively executing",
}, labels),
memoryUnused: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_unused_bytes",
Help: "The free memory as seen by the internal memory manager",
}, labels),
allDur: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -110,7 +118,7 @@ func newControllerMetrics(labels []string) *controllerMetrics {
}
}
// PrometheusCollectors satisifies the prom.PrometheusCollector interface.
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (cm *controllerMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
cm.requests,
@ -120,6 +128,7 @@ func (cm *controllerMetrics) PrometheusCollectors() []prometheus.Collector {
cm.compiling,
cm.queueing,
cm.executing,
cm.memoryUnused,
cm.allDur,
cm.compilingDur,