125 lines
3.3 KiB
Go
125 lines
3.3 KiB
Go
package gather
|
|
|
|
import (
|
|
"context"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
|
"github.com/influxdata/influxdb/v2/mock"
|
|
"github.com/influxdata/influxdb/v2/models"
|
|
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
|
|
dto "github.com/prometheus/client_model/go"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap/zaptest"
|
|
)
|
|
|
|
func TestScheduler(t *testing.T) {
|
|
totalGatherJobs := 20
|
|
|
|
// Create top level logger
|
|
logger := zaptest.NewLogger(t)
|
|
ts := httptest.NewServer(&mockHTTPHandler{
|
|
responseMap: map[string]string{
|
|
"/metrics": sampleRespSmall,
|
|
},
|
|
})
|
|
defer ts.Close()
|
|
|
|
storage := &mockStorage{
|
|
Metrics: make(map[time.Time]Metrics),
|
|
Targets: []influxdb.ScraperTarget{
|
|
{
|
|
ID: influxdbtesting.MustIDBase16("3a0d0a6365646120"),
|
|
Type: influxdb.PrometheusScraperType,
|
|
URL: ts.URL + "/metrics",
|
|
OrgID: *orgID,
|
|
BucketID: *bucketID,
|
|
},
|
|
},
|
|
}
|
|
|
|
gatherJobs := make(chan []models.Point)
|
|
done := make(chan struct{})
|
|
writer := &mock.PointsWriter{}
|
|
writer.WritePointsFn = func(ctx context.Context, orgID platform.ID, bucketID platform.ID, points []models.Point) error {
|
|
select {
|
|
case gatherJobs <- points:
|
|
case <-done:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
scheduler, err := NewScheduler(logger, 10, 2, storage, writer, 1*time.Millisecond)
|
|
require.NoError(t, err)
|
|
defer scheduler.Close()
|
|
defer close(done) //don't block the points writer forever
|
|
|
|
// make sure all jobs are done
|
|
pointWrites := [][]models.Point{}
|
|
for i := 0; i < totalGatherJobs; i++ {
|
|
newWrite := <-gatherJobs
|
|
pointWrites = append(pointWrites, newWrite)
|
|
assert.Equal(t, 1, len(newWrite))
|
|
newWrite[0].SetTime(time.Unix(0, 0)) // zero out the time so we don't have to compare it
|
|
assert.Equal(t, "go_goroutines gauge=36 0", newWrite[0].String())
|
|
}
|
|
|
|
if len(pointWrites) < totalGatherJobs {
|
|
t.Fatalf("metrics stored less than expected, got len %d", len(storage.Metrics))
|
|
}
|
|
}
|
|
|
|
// sampleRespSmall is a minimal Prometheus text-format payload containing one
// gauge sample (go_goroutines = 36) preceded by its HELP and TYPE lines.
// TestScheduler serves it from the /metrics endpoint of its test server.
const sampleRespSmall = `
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 36
`
|
|
|
|
func TestMetricsToPoints(t *testing.T) {
|
|
const overflow = 3
|
|
const goodPoints = 2
|
|
tags := map[string]string{"one": "first", "two": "second", "three": "third"}
|
|
fields := map[string]interface{}{"first_field": 32.2}
|
|
|
|
ms := MetricsSlice{
|
|
{
|
|
Name: "a",
|
|
Tags: tags,
|
|
Fields: fields,
|
|
Timestamp: time.Now(),
|
|
Type: dto.MetricType_GAUGE,
|
|
},
|
|
{
|
|
Name: "b",
|
|
Tags: tags,
|
|
Fields: fields,
|
|
Timestamp: time.Now(),
|
|
Type: dto.MetricType_GAUGE,
|
|
}, {
|
|
Name: strings.Repeat("c", models.MaxKeyLength+overflow),
|
|
Tags: tags,
|
|
Fields: fields,
|
|
Timestamp: time.Now(),
|
|
Type: dto.MetricType_GAUGE,
|
|
},
|
|
{
|
|
Name: "d",
|
|
Tags: tags,
|
|
Fields: fields,
|
|
Timestamp: time.Now(),
|
|
Type: dto.MetricType_GAUGE,
|
|
},
|
|
}
|
|
ps, err := ms.Points()
|
|
assert.ErrorContains(t, err, "max key length exceeded", "MetricSlice.Points did not have a 'max key length exceeded' error")
|
|
assert.Equal(t, goodPoints, len(ps), "wrong number of Points returned from MetricSlice.Points")
|
|
for _, p := range ps {
|
|
assert.NotNil(t, p, "nil Point object returned from MetricSlice.Points")
|
|
}
|
|
}
|