test: fix flaky replications tests (#22973)
* fix: fix test and run 20 times * fix: unfix and run test 20 times * test: wait for rq run fn to return in testspull/22983/head
parent
e5cbd279ee
commit
e3ff434f81
|
@ -87,8 +87,7 @@ func TestEnqueueScan(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check queue position
|
// Check queue position
|
||||||
close(rq.done)
|
closeRq(rq)
|
||||||
rq.wg.Wait()
|
|
||||||
scan, err := rq.queue.NewScanner()
|
scan, err := rq.queue.NewScanner()
|
||||||
|
|
||||||
if tt.writeFuncReturn == nil {
|
if tt.writeFuncReturn == nil {
|
||||||
|
@ -371,7 +370,7 @@ func TestEnqueueData(t *testing.T) {
|
||||||
// close the scanner goroutine to specifically test EnqueueData()
|
// close the scanner goroutine to specifically test EnqueueData()
|
||||||
rq, ok := qm.replicationQueues[id1]
|
rq, ok := qm.replicationQueues[id1]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
close(rq.done)
|
closeRq(rq)
|
||||||
go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock
|
go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock
|
||||||
|
|
||||||
require.NoError(t, qm.EnqueueData(id1, []byte(data), 1))
|
require.NoError(t, qm.EnqueueData(id1, []byte(data), 1))
|
||||||
|
@ -396,7 +395,7 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
|
||||||
// close the scanner goroutine to specifically test EnqueueData()
|
// close the scanner goroutine to specifically test EnqueueData()
|
||||||
rq, ok := qm.replicationQueues[id1]
|
rq, ok := qm.replicationQueues[id1]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
close(rq.done)
|
closeRq(rq)
|
||||||
|
|
||||||
reg := prom.NewRegistry(zaptest.NewLogger(t))
|
reg := prom.NewRegistry(zaptest.NewLogger(t))
|
||||||
reg.MustRegister(qm.metrics.PrometheusCollectors()...)
|
reg.MustRegister(qm.metrics.PrometheusCollectors()...)
|
||||||
|
@ -471,7 +470,7 @@ func TestGoroutineReceives(t *testing.T) {
|
||||||
rq, ok := qm.replicationQueues[id1]
|
rq, ok := qm.replicationQueues[id1]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.NotNil(t, rq)
|
require.NotNil(t, rq)
|
||||||
close(rq.done) // atypical from normal behavior, but lets us receive channels to test
|
closeRq(rq) // atypical from normal behavior, but lets us receive channels to test
|
||||||
|
|
||||||
go func() { require.NoError(t, qm.EnqueueData(id1, []byte("1234"), 1)) }()
|
go func() { require.NoError(t, qm.EnqueueData(id1, []byte("1234"), 1)) }()
|
||||||
select {
|
select {
|
||||||
|
@ -500,3 +499,10 @@ func TestGoroutineCloses(t *testing.T) {
|
||||||
// if this does not panic, then the routine is still active
|
// if this does not panic, then the routine is still active
|
||||||
require.Panics(t, func() { rq.wg.Add(-1) })
|
require.Panics(t, func() { rq.wg.Add(-1) })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// closeRq closes the done channel of a replication queue so that the run() function returns, but keeps the underlying
|
||||||
|
// queue open for testing purposes.
|
||||||
|
func closeRq(rq *replicationQueue) {
|
||||||
|
close(rq.done)
|
||||||
|
rq.wg.Wait() // wait for run() function to return
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue