milvus/internal/streamingcoord/server/service/discover/discover_server_test.go

85 lines
2.6 KiB
Go

package discover
import (
"context"
"io"
"testing"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/pkg/v2/mocks/proto/mock_streamingpb"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func TestAssignmentDiscover(t *testing.T) {
resource.InitForTest()
b := mock_balancer.NewMockBalancer(t)
b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error {
versions := []typeutil.VersionInt64Pair{
{Global: 1, Local: 2},
{Global: 1, Local: 3},
}
pchans := [][]types.PChannelInfoAssigned{
{
types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "pchannel", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
},
},
{
types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "pchannel", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
},
types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "pchannel2", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
},
},
}
for i := 0; i < len(versions); i++ {
cb(versions[i], pchans[i])
}
<-ctx.Done()
return context.Cause(ctx)
})
b.EXPECT().MarkAsUnavailable(mock.Anything, mock.Anything).Return(nil)
streamServer := mock_streamingpb.NewMockStreamingCoordAssignmentService_AssignmentDiscoverServer(t)
streamServer.EXPECT().Context().Return(context.Background())
k := 0
reqs := []*streamingpb.AssignmentDiscoverRequest{
{
Command: &streamingpb.AssignmentDiscoverRequest_ReportError{
ReportError: &streamingpb.ReportAssignmentErrorRequest{
Pchannel: &streamingpb.PChannelInfo{
Name: "pchannel",
Term: 1,
},
Err: &streamingpb.StreamingError{
Code: streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST,
},
},
},
},
{
Command: &streamingpb.AssignmentDiscoverRequest_Close{},
},
}
streamServer.EXPECT().Recv().RunAndReturn(func() (*streamingpb.AssignmentDiscoverRequest, error) {
if k >= len(reqs) {
return nil, io.EOF
}
req := reqs[k]
k++
return req, nil
})
streamServer.EXPECT().Send(mock.Anything).Return(nil)
ads := NewAssignmentDiscoverServer(b, streamServer)
ads.Execute()
}