mirror of https://github.com/milvus-io/milvus.git
Make querycoord segment allocator respect context (#17452)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/17472/head
parent
dfaed5acdd
commit
97a871cc82
|
@ -202,10 +202,25 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
|
|||
}
|
||||
}
|
||||
|
||||
time.Sleep(shuffleWaitInterval)
|
||||
err := waitWithContext(ctx, shuffleWaitInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// waitWithContext util function to wait for provided duration or context done.
|
||||
func waitWithContext(ctx context.Context, d time.Duration) error {
|
||||
timer := time.NewTimer(d)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func nodeIncluded(nodeID int64, includeNodeIDs []int64) bool {
|
||||
for _, id := range includeNodeIDs {
|
||||
if id == nodeID {
|
||||
|
|
|
@ -18,9 +18,11 @@ package querycoord
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
|
@ -132,6 +134,44 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
|
|||
assert.Equal(t, node2ID, secondReq.DstNodeID)
|
||||
})
|
||||
|
||||
cluster.StopNode(node2ID)
|
||||
|
||||
t.Run("Test shuffleSegmentsToQueryNodeV2 ctx", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
err = shuffleSegmentsToQueryNodeV2(ctx, reqs, cluster, meta, true, nil, nil, -1)
|
||||
assert.Error(t, err)
|
||||
|
||||
assert.True(t, errors.Is(err, context.Canceled))
|
||||
})
|
||||
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func Test_waitWithContext(t *testing.T) {
|
||||
t.Run("normal wait", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
err := waitWithContext(ctx, time.Millisecond)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("context canceled", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
err := waitWithContext(ctx, time.Second)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, context.Canceled))
|
||||
})
|
||||
|
||||
t.Run("context deadline", func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
err := waitWithContext(ctx, time.Second)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, context.DeadlineExceeded))
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue