Use local RNG in SampleReducer

The reducers already had a local RNG but mistakenly did not use it when
sampling points.

Because the local RNG is not protected by a mutex, there is a slight
speedup as a result of this change:

benchmark                          old ns/op     new ns/op     delta
BenchmarkSampleIterator_1k-4       418           418           +0.00%
BenchmarkSampleIterator_100k-4     434           422           -2.76%
BenchmarkSampleIterator_1M-4       449           439           -2.23%

benchmark                          old allocs     new allocs     delta
BenchmarkSampleIterator_1k-4       3              3              +0.00%
BenchmarkSampleIterator_100k-4     3              3              +0.00%
BenchmarkSampleIterator_1M-4       3              3              +0.00%

benchmark                          old bytes     new bytes     delta
BenchmarkSampleIterator_1k-4       304           304           +0.00%
BenchmarkSampleIterator_100k-4     304           304           +0.00%
BenchmarkSampleIterator_1M-4       304           304           +0.00%

The speedup would presumably increase when multiple sample iterators are
used concurrently.
pull/7717/head
Mark Rushakoff 2016-12-13 10:06:33 -08:00
parent 049f9b42e9
commit a29781286b
3 changed files with 23 additions and 18 deletions

View File

@ -409,7 +409,7 @@ func (r *FloatSampleReducer) AggregateFloat(p *FloatPoint) {
// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
rnd := rand.Intn(r.count)
rnd := r.rng.Intn(r.count)
if rnd < len(r.points) {
r.points[rnd] = *p
}
@ -823,7 +823,7 @@ func (r *IntegerSampleReducer) AggregateInteger(p *IntegerPoint) {
// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
rnd := rand.Intn(r.count)
rnd := r.rng.Intn(r.count)
if rnd < len(r.points) {
r.points[rnd] = *p
}
@ -1237,7 +1237,7 @@ func (r *StringSampleReducer) AggregateString(p *StringPoint) {
// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
rnd := rand.Intn(r.count)
rnd := r.rng.Intn(r.count)
if rnd < len(r.points) {
r.points[rnd] = *p
}
@ -1651,7 +1651,7 @@ func (r *BooleanSampleReducer) AggregateBoolean(p *BooleanPoint) {
// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
rnd := rand.Intn(r.count)
rnd := r.rng.Intn(r.count)
if rnd < len(r.points) {
r.points[rnd] = *p
}

View File

@ -198,7 +198,7 @@ func (r *{{$k.Name}}SampleReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
rnd := rand.Intn(r.count)
rnd := r.rng.Intn(r.count)
if rnd < len(r.points) {
r.points[rnd] = *p
}

View File

@ -390,10 +390,9 @@ func TestHoltWinters_MaxTime(t *testing.T) {
// TestSample_AllSamplesSeen attempts to verify that it is possible
// to get every subsample in a reasonable number of iterations.
//
// The idea here is that 6 iterations should be enough to hit every possible
// sequence atleast once.
// The idea here is that 30 iterations should be enough to hit every possible
// sequence at least once.
func TestSample_AllSamplesSeen(t *testing.T) {
ps := []influxql.FloatPoint{
{Time: 1, Value: 1},
{Time: 2, Value: 2},
@ -416,9 +415,9 @@ func TestSample_AllSamplesSeen(t *testing.T) {
},
}
// 6 iterations should be more than sufficient to garentee that
// 30 iterations should be sufficient to guarantee that
// we hit every possible subsample.
for i := 0; i < 6; i++ {
for i := 0; i < 30; i++ {
s := influxql.NewFloatSampleReducer(2)
for _, p := range ps {
s.AggregateFloat(&p)
@ -426,26 +425,32 @@ func TestSample_AllSamplesSeen(t *testing.T) {
points := s.Emit()
// if samples is empty we've seen every sample, so we're done
if len(samples) == 0 {
return
}
for i, sample := range samples {
// if we find a sample that it matches, remove it from
// this list of possible samples
if deep.Equal(sample, points) {
samples = append(samples[:i], samples[i+1:]...)
break
}
}
// if samples is empty we've seen every sample, so we're done
if len(samples) == 0 {
return
}
// The FloatSampleReducer is seeded with time.Now().UnixNano(), and without this sleep,
// this test will fail on machines where UnixNano doesn't return full resolution.
// Specifically, some Windows machines will only return timestamps accurate to 100ns.
// While iterating through this test without an explicit sleep,
// we would only see one or two unique seeds across all the calls to NewFloatSampleReducer.
time.Sleep(time.Millisecond)
}
// If we missed a sample, report the error
if exp, got := 0, len(samples); exp != got {
t.Fatalf("expected to get every sample: got %d, exp %d", got, exp)
if len(samples) != 0 {
t.Fatalf("expected all samples to be seen; unseen samples: %#v", samples)
}
}
func TestSample_SampleSizeLessThanNumPoints(t *testing.T) {