test: fix flaky enqueue test (#23035)
parent
11c00813f1
commit
c1d384de19
|
@ -5,6 +5,7 @@ import (
|
|||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -83,20 +84,23 @@ func TestEnqueueScan(t *testing.T) {
|
|||
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
|
||||
require.NoError(t, err)
|
||||
rq := qm.replicationQueues[id1]
|
||||
rq.remoteWriter = getTestRemoteWriter(t, data, tt.writeFuncReturn)
|
||||
var writeCounter sync.WaitGroup
|
||||
rq.remoteWriter = getTestRemoteWriter(t, data, tt.writeFuncReturn, &writeCounter)
|
||||
|
||||
// Enqueue the data
|
||||
for _, dat := range tt.testData {
|
||||
writeCounter.Add(1)
|
||||
err = qm.EnqueueData(id1, []byte(dat), 1)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
writeCounter.Wait()
|
||||
|
||||
// Check queue position
|
||||
closeRq(rq)
|
||||
scan, err := rq.queue.NewScanner()
|
||||
|
||||
if tt.writeFuncReturn == nil {
|
||||
require.ErrorIs(t, io.EOF, err)
|
||||
require.ErrorIs(t, err, io.EOF)
|
||||
} else {
|
||||
// Queue should not have advanced at all
|
||||
for range tt.testData {
|
||||
|
@ -353,11 +357,14 @@ func (tw *testRemoteWriter) Write(data []byte) error {
|
|||
return tw.writeFn(data)
|
||||
}
|
||||
|
||||
func getTestRemoteWriter(t *testing.T, expected string, returning error) remoteWriter {
|
||||
func getTestRemoteWriter(t *testing.T, expected string, returning error, wg *sync.WaitGroup) remoteWriter {
|
||||
t.Helper()
|
||||
|
||||
writeFn := func(b []byte) error {
|
||||
require.Equal(t, expected, string(b))
|
||||
if wg != nil {
|
||||
wg.Done()
|
||||
}
|
||||
return returning
|
||||
}
|
||||
|
||||
|
@ -424,7 +431,7 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
|
|||
data := "some fake data"
|
||||
numPointsPerData := 3
|
||||
numDataToAdd := 4
|
||||
rq.remoteWriter = getTestRemoteWriter(t, data, nil)
|
||||
rq.remoteWriter = getTestRemoteWriter(t, data, nil, nil)
|
||||
|
||||
for i := 1; i <= numDataToAdd; i++ {
|
||||
go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock
|
||||
|
|
Loading…
Reference in New Issue