influxdb/gather/scheduler_test.go

125 lines
3.3 KiB
Go

package gather
import (
"context"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/models"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestScheduler(t *testing.T) {
totalGatherJobs := 20
// Create top level logger
logger := zaptest.NewLogger(t)
ts := httptest.NewServer(&mockHTTPHandler{
responseMap: map[string]string{
"/metrics": sampleRespSmall,
},
})
defer ts.Close()
storage := &mockStorage{
Metrics: make(map[time.Time]Metrics),
Targets: []influxdb.ScraperTarget{
{
ID: influxdbtesting.MustIDBase16("3a0d0a6365646120"),
Type: influxdb.PrometheusScraperType,
URL: ts.URL + "/metrics",
OrgID: *orgID,
BucketID: *bucketID,
},
},
}
gatherJobs := make(chan []models.Point)
done := make(chan struct{})
writer := &mock.PointsWriter{}
writer.WritePointsFn = func(ctx context.Context, orgID platform.ID, bucketID platform.ID, points []models.Point) error {
select {
case gatherJobs <- points:
case <-done:
}
return nil
}
scheduler, err := NewScheduler(logger, 10, 2, storage, writer, 1*time.Millisecond)
require.NoError(t, err)
defer scheduler.Close()
defer close(done) //don't block the points writer forever
// make sure all jobs are done
pointWrites := [][]models.Point{}
for i := 0; i < totalGatherJobs; i++ {
newWrite := <-gatherJobs
pointWrites = append(pointWrites, newWrite)
assert.Equal(t, 1, len(newWrite))
newWrite[0].SetTime(time.Unix(0, 0)) // zero out the time so we don't have to compare it
assert.Equal(t, "go_goroutines gauge=36 0", newWrite[0].String())
}
if len(pointWrites) < totalGatherJobs {
t.Fatalf("metrics stored less than expected, got len %d", len(storage.Metrics))
}
}
const sampleRespSmall = `
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 36
`
func TestMetricsToPoints(t *testing.T) {
const overflow = 3
const goodPoints = 2
tags := map[string]string{"one": "first", "two": "second", "three": "third"}
fields := map[string]interface{}{"first_field": 32.2}
ms := MetricsSlice{
{
Name: "a",
Tags: tags,
Fields: fields,
Timestamp: time.Now(),
Type: dto.MetricType_GAUGE,
},
{
Name: "b",
Tags: tags,
Fields: fields,
Timestamp: time.Now(),
Type: dto.MetricType_GAUGE,
}, {
Name: strings.Repeat("c", models.MaxKeyLength+overflow),
Tags: tags,
Fields: fields,
Timestamp: time.Now(),
Type: dto.MetricType_GAUGE,
},
{
Name: "d",
Tags: tags,
Fields: fields,
Timestamp: time.Now(),
Type: dto.MetricType_GAUGE,
},
}
ps, err := ms.Points()
assert.ErrorContains(t, err, "max key length exceeded", "MetricSlice.Points did not have a 'max key length exceeded' error")
assert.Equal(t, goodPoints, len(ps), "wrong number of Points returned from MetricSlice.Points")
for _, p := range ps {
assert.NotNil(t, p, "nil Point object returned from MetricSlice.Points")
}
}