mirror of https://github.com/milvus-io/milvus.git
85 lines
2.6 KiB
Go
85 lines
2.6 KiB
Go
package discover
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/mock"
|
|
|
|
"github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
|
|
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
|
|
"github.com/milvus-io/milvus/pkg/v2/mocks/proto/mock_streamingpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
func TestAssignmentDiscover(t *testing.T) {
|
|
resource.InitForTest()
|
|
b := mock_balancer.NewMockBalancer(t)
|
|
b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error {
|
|
versions := []typeutil.VersionInt64Pair{
|
|
{Global: 1, Local: 2},
|
|
{Global: 1, Local: 3},
|
|
}
|
|
pchans := [][]types.PChannelInfoAssigned{
|
|
{
|
|
types.PChannelInfoAssigned{
|
|
Channel: types.PChannelInfo{Name: "pchannel", Term: 1},
|
|
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
|
|
},
|
|
},
|
|
{
|
|
types.PChannelInfoAssigned{
|
|
Channel: types.PChannelInfo{Name: "pchannel", Term: 1},
|
|
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
|
|
},
|
|
types.PChannelInfoAssigned{
|
|
Channel: types.PChannelInfo{Name: "pchannel2", Term: 1},
|
|
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
|
|
},
|
|
},
|
|
}
|
|
for i := 0; i < len(versions); i++ {
|
|
cb(versions[i], pchans[i])
|
|
}
|
|
<-ctx.Done()
|
|
return context.Cause(ctx)
|
|
})
|
|
b.EXPECT().MarkAsUnavailable(mock.Anything, mock.Anything).Return(nil)
|
|
|
|
streamServer := mock_streamingpb.NewMockStreamingCoordAssignmentService_AssignmentDiscoverServer(t)
|
|
streamServer.EXPECT().Context().Return(context.Background())
|
|
k := 0
|
|
reqs := []*streamingpb.AssignmentDiscoverRequest{
|
|
{
|
|
Command: &streamingpb.AssignmentDiscoverRequest_ReportError{
|
|
ReportError: &streamingpb.ReportAssignmentErrorRequest{
|
|
Pchannel: &streamingpb.PChannelInfo{
|
|
Name: "pchannel",
|
|
Term: 1,
|
|
},
|
|
Err: &streamingpb.StreamingError{
|
|
Code: streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Command: &streamingpb.AssignmentDiscoverRequest_Close{},
|
|
},
|
|
}
|
|
streamServer.EXPECT().Recv().RunAndReturn(func() (*streamingpb.AssignmentDiscoverRequest, error) {
|
|
if k >= len(reqs) {
|
|
return nil, io.EOF
|
|
}
|
|
req := reqs[k]
|
|
k++
|
|
return req, nil
|
|
})
|
|
streamServer.EXPECT().Send(mock.Anything).Return(nil)
|
|
ads := NewAssignmentDiscoverServer(b, streamServer)
|
|
ads.Execute()
|
|
}
|